I have developed an Azure Function that is supposed to send a message to Azure Service Bus. I want the message to be scheduled so that it is picked up after 5 minutes; to accomplish that I am using DaprBindingMessage so that I can set the ScheduledEnqueueTime metadata. I have deployed the Azure Function in a cluster and am using Dapr pub/sub to publish the message to Azure Service Bus. Below is my sample code. The issue I am facing is that the message published to the Service Bus queue is double-encapsulated in the "data" field. Please suggest where I might be going wrong. I do not want to use the .NET SDK to publish the message.

public static class TestFunc
{
    [FunctionName("TestFunc")]
    public static async Task<IActionResult> Run(
        [HttpTrigger(AuthorizationLevel.Anonymous, "get", "post", Route = "TestFunc")] HttpRequest req,
        [DaprPublish(PubSubName = "testSub", Topic = "testTopic")] IAsyncCollector<DaprPubSubEvent> pubEvent,
        ILogger log)
    {
        log.LogInformation("HTTP trigger function processed a request.");

        // Metadata intended to delay delivery by 5 minutes.
        var d = new Dictionary<string, object>
        {
            { "ScheduledEnqueueTime", DateTimeOffset.Now.AddMinutes(5) }
        };

        DaprBindingMessage dm = new DaprBindingMessage("Test Message", d);

        // The binding message is passed as the payload of the pub/sub event.
        await pubEvent.AddAsync(new DaprPubSubEvent(dm));
        return new OkObjectResult("TestResponse");
    }
}

The published message in the Service Bus queue looks like this:

{
"data": {
    "data": {
        "Test Message"          
    },
    "metadata": {
        "ScheduledEnqueueTime": "2023-12-07T20:01:38.677943+00:00"
    }
},
"datacontenttype": "application/json; charset=utf-8",
"id": "20ad432360",
"pubsubname": "pubsub",
"source": "ca",
"specversion": "1.0",
"time": "2023-12-07T19:56:38Z",
"topic": "testTopic",
"traceid": "004b7d4b-00",
"traceparent": "00d4b-00",
"tracestate": "",
"type": "com.dapr.event.sent"
}
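To make the nesting easier to see, here is a minimal sketch that reproduces the same shape with plain System.Text.Json. The two record types are hypothetical stand-ins written for illustration, not the real DaprBindingMessage or DaprPubSubEvent classes, and the sketch assumes C# 9 or later. It only shows that when an object that already has its own "data" and "metadata" properties is used as the payload of an envelope, the result has a "data" inside a "data".

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

public static class EnvelopeSketch
{
    // Hypothetical stand-in for a binding-style message: a payload plus metadata.
    public sealed record BindingLikeMessage(
        object data,
        Dictionary<string, object> metadata);

    // Hypothetical stand-in for a pub/sub envelope: whatever is passed in as the
    // payload ends up under a single "data" property.
    public sealed record PubSubLikeEnvelope(object data, string topic);

    // Serializes the payload inside the envelope and returns the JSON text.
    public static string Wrap(object payload, string topic) =>
        JsonSerializer.Serialize(new PubSubLikeEnvelope(payload, topic));

    public static void Main()
    {
        var metadata = new Dictionary<string, object>
        {
            { "ScheduledEnqueueTime", "2023-12-07T20:01:38Z" }
        };

        // Wrapping a message object: "data" appears twice, one level apart.
        Console.WriteLine(Wrap(new BindingLikeMessage("Test Message", metadata), "testTopic"));

        // Wrapping the bare string: "data" appears once.
        // prints {"data":"Test Message","topic":"testTopic"}
        Console.WriteLine(Wrap("Test Message", "testTopic"));
    }
}
```

In this sketch the metadata travels as part of the message body rather than as something the broker acts on, which matches the shape of the JSON shown above.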
There is 1 answer below.

I tried passing dm directly to pubEvent.AddAsync, but ran into the same issue as you:-

DaprBindingMessage dm = new DaprBindingMessage("Test Message", d);

        await pubEvent.AddAsync(dm);
        return new OkObjectResult("TestResponse");


To resolve this, I passed the message directly as a string, as shown below:-

using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.Azure.WebJobs.Extensions.Dapr;
using Microsoft.Extensions.Logging;
using Microsoft.Azure.Functions.Extensions.Dapr.Core;

public static class TestFunc
{
    [FunctionName("TestFunc")]
    public static async Task<IActionResult> Run(
    [HttpTrigger(AuthorizationLevel.Anonymous, "get", "post", Route = "TestFunc")] HttpRequest req,
    [DaprPublish(PubSubName = "my-service-bus", Topic = "t1")] IAsyncCollector<string> pubEvent,
    ILogger log)
    {
        log.LogInformation("HTTP trigger function processed a request.");

        var messagePayload = "Test Message"; // Your message content
        await pubEvent.AddAsync(messagePayload);

        return new OkObjectResult("TestResponse");
    }
}

Command to run the function with dapr:-

dapr run --app-id my-function-app --app-port 7071 --dapr-http-port 3500 --components-path . func start


Code with ScheduledEnqueueTime = DateTimeOffset.Now.AddMinutes(5):-

using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.Azure.WebJobs.Extensions.Dapr;
using Microsoft.Extensions.Logging;
using Microsoft.Azure.Functions.Extensions.Dapr.Core;

public static class TestFunc
{
    [FunctionName("TestFunc")]
    public static async Task<IActionResult> Run(
    [HttpTrigger(AuthorizationLevel.Anonymous, "get", "post", Route = "TestFunc")] HttpRequest req,
    [DaprPublish(PubSubName = "my-service-bus", Topic = "t1")] IAsyncCollector<string> pubEvent,
    ILogger log)
    {
        log.LogInformation("HTTP trigger function processed a request.");

        var messagePayload = new
        {
            Data = "Test Message",
            Metadata = new
            {
                ScheduledEnqueueTime = DateTimeOffset.Now.AddMinutes(5)
            }
        };

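        // Only the Data string is published here; the Metadata object above
        // is not sent along with the message.
        await pubEvent.AddAsync(messagePayload.Data);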

        return new OkObjectResult("TestResponse");
    }
}
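Since the snippet above publishes only the string, the scheduling value still has to reach the broker some other way. One possibility, offered as an untested sketch, is to call the Dapr sidecar's HTTP publish endpoint (v1.0/publish/<pubsub>/<topic>) and pass the value as a metadata query parameter. The key name ScheduledEnqueueTimeUtc and the RFC 1123 timestamp format are assumptions that should be checked against the Azure Service Bus pub/sub component documentation for your Dapr version. The helper name DaprPublishUrl.Build is made up for this example, and port 3500 is taken from the dapr run command above.

```csharp
using System;
using System.Globalization;

public static class DaprPublishUrl
{
    // Builds the sidecar publish URL with a scheduling metadata query parameter.
    // Assumption: the pub/sub component honors "metadata.ScheduledEnqueueTimeUtc".
    public static string Build(
        int daprHttpPort,
        string pubSubName,
        string topic,
        DateTimeOffset enqueueAt)
    {
        // "R" formats the UTC time as RFC 1123, e.g. "Thu, 07 Dec 2023 20:01:38 GMT".
        string timestamp = enqueueAt.UtcDateTime.ToString("R", CultureInfo.InvariantCulture);

        return $"http://localhost:{daprHttpPort}/v1.0/publish/"
             + $"{Uri.EscapeDataString(pubSubName)}/{Uri.EscapeDataString(topic)}"
             + $"?metadata.ScheduledEnqueueTimeUtc={Uri.EscapeDataString(timestamp)}";
    }

    public static void Main()
    {
        // Names and port match the answer's example; adjust them to your setup.
        string url = Build(3500, "my-service-bus", "t1", DateTimeOffset.UtcNow.AddMinutes(5));
        Console.WriteLine(url);
    }
}
```

The resulting URL could then be used with an HttpClient POST whose body is the message payload. This does not use the Service Bus .NET SDK; it only talks to the local Dapr sidecar.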
