Azure Function Dapr publish: how to set a scheduling option on an Azure Service Bus message

I want to create an Azure Function (using C# and the Dapr API) that is triggered when a message arrives on an Azure Service Bus and then publishes a test message to a topic in Azure Service Bus. However, I don't want the message to be published immediately. I have investigated but don't see any scheduling option that I can set on DaprPubSubEvent. Below is the function code.

public static class TestFunc
{
    [FunctionName("TestFunc")]
    public static async Task<IActionResult> Run(
        [HttpTrigger(AuthorizationLevel.Anonymous, "get", "post", Route = "TestFunc")] HttpRequest req,
        [DaprPublish(PubSubName = "testSub", Topic = "testTopic")] IAsyncCollector<DaprPubSubEvent> pubEvent,            
        ILogger log)
    {

        log.LogInformation(" HTTP trigger function processed a request.");
       
        await pubEvent.AddAsync(new DaprPubSubEvent("Test Message"));
        return new OkObjectResult("TestResponse" );
    }
}

Please suggest whether there is a way to set up message scheduling in Dapr.

Update: I have modified the code and can now add the ScheduledEnqueueTime metadata to the message, but when the message is published its payload is wrapped in two nested data fields. Below is the new code.

public static class TestFunc
{
   [FunctionName("TestFunc")]
   public static async Task<IActionResult> Run(
    [HttpTrigger(AuthorizationLevel.Anonymous, "get", "post", Route = "TestFunc")] HttpRequest req,
    [DaprPublish(PubSubName = "testSub", Topic = "testTopic")] IAsyncCollector<DaprPubSubEvent> pubEvent,            
    ILogger log)
   {

       log.LogInformation(" HTTP trigger function processed a request.");
    
       var d = new Dictionary<string, object> ()
                        {
                            { "ScheduledEnqueueTime", DateTimeOffset.Now.AddMinutes(5)}
                        };

       DaprBindingMessage dm = new DaprBindingMessage("Test Message", d);

       await pubEvent.AddAsync(new DaprPubSubEvent(dm));
       return new OkObjectResult("TestResponse" );
   }
}

The message in the Service Bus queue looks like this:

{
    "data": {
        "data": {
            "Test Message"          
        },
        "metadata": {
            "ScheduledEnqueueTime": "2023-12-07T20:01:38.677943+00:00"
        }
    },
    "datacontenttype": "application/json; charset=utf-8",
    "id": "20ad432360",
    "pubsubname": "pubsub",
    "source": "ca",
    "specversion": "1.0",
    "time": "2023-12-07T19:56:38Z",
    "topic": "testTopic",
    "traceid": "004b7d4b-00",
    "traceparent": "00d4b-00",
    "tracestate": "",
    "type": "com.dapr.event.sent"

}

I can't work out why the payload is being wrapped in data twice. If anyone has an idea, please suggest.

There is 1 best solution below

One way is to schedule the message directly in Azure Service Bus using its .NET SDK (the Azure.Messaging.ServiceBus package).

  • A scheduled message is accepted by the queue/topic right away but only becomes available to receivers at its scheduled enqueue time.
  • The code below does two things: it sends a message to an Azure Service Bus topic with the ScheduledEnqueueTime property set, so that message is delayed, and it publishes an event to the Dapr pub/sub topic, which is not delayed.

// Requires: using Azure.Messaging.ServiceBus;
[FunctionName("TestFunc")]
public static async Task<IActionResult> Run(
    [HttpTrigger(AuthorizationLevel.Anonymous, "get", "post", Route = "TestFunc")] HttpRequest req,
    [DaprPublish(PubSubName = "testSub", Topic = "testTopic")] IAsyncCollector<DaprPubSubEvent> pubEvent,
    ILogger log)
{
    log.LogInformation("HTTP trigger function processed a request.");

    try
    {
        // Placeholders: read these from app settings in real code
        string connectionString = "your_service_bus_connection_string";
        string topicName = "your_service_bus_topic";

        // Create a ServiceBusClient and a sender for the topic;
        // both are disposed when the method exits
        await using var client = new ServiceBusClient(connectionString);
        await using ServiceBusSender sender = client.CreateSender(topicName);

        // Create the message to send
        var message = new ServiceBusMessage("Test Message");

        // Set ScheduledEnqueueTime to the moment the message should become
        // visible to receivers (example: 2 minutes from now)
        message.ScheduledEnqueueTime = DateTimeOffset.UtcNow.AddMinutes(2);

        // Send the message; Service Bus holds it until the scheduled time
        await sender.SendMessageAsync(message);

        // Publish the Dapr event. Note: this goes through the Dapr pub/sub
        // component as a separate message and is NOT delayed
        await pubEvent.AddAsync(new DaprPubSubEvent("Test Message"));

        return new OkObjectResult("TestResponse");
    }
    catch (Exception ex)
    {
        // Pass the exception itself so the stack trace is logged
        log.LogError(ex, "Error while sending the scheduled message");
        return new StatusCodeResult(500);
    }
}
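
If you would rather keep everything inside Dapr, a possible alternative is to pass the schedule as publish metadata instead of putting it in the payload. Judging by the JSON in the question, the DaprBindingMessage object was used as the payload of DaprPubSubEvent, so its data and metadata properties were serialized together inside the event's data field, which would explain the double nesting. The sketch below only builds the URL for the Dapr sidecar's HTTP publish endpoint with a metadata.ScheduledEnqueueTimeUtc query parameter. It assumes that the Azure Service Bus pub/sub component in your Dapr version honours that metadata key and expects an RFC 1123 timestamp (please verify this against the component documentation), and that the sidecar listens on the default HTTP port 3500. The class and method names are hypothetical.

```csharp
using System;
using System.Globalization;

// Hypothetical helper, not part of the Dapr SDK or the Functions extension.
public static class DaprScheduledPublish
{
    // Builds the sidecar publish URL with a scheduled-enqueue metadata entry.
    // Assumption: the Service Bus pub/sub component reads
    // metadata.ScheduledEnqueueTimeUtc from the query string.
    public static string BuildPublishUrl(
        string pubSubName, string topic, DateTimeOffset enqueueAt, int daprHttpPort = 3500)
    {
        // RFC 1123 timestamp in UTC, e.g. "Thu, 07 Dec 2023 20:01:38 GMT"
        string rfc1123 = enqueueAt.UtcDateTime.ToString("R", CultureInfo.InvariantCulture);

        // Escape every dynamic part so spaces, commas and colons are URL-safe
        return $"http://localhost:{daprHttpPort}/v1.0/publish/" +
               $"{Uri.EscapeDataString(pubSubName)}/{Uri.EscapeDataString(topic)}" +
               $"?metadata.ScheduledEnqueueTimeUtc={Uri.EscapeDataString(rfc1123)}";
    }
}
```

You would then POST the plain payload (for example the JSON string "Test Message") to the returned URL with HttpClient, e.g. BuildPublishUrl("testSub", "testTopic", DateTimeOffset.UtcNow.AddMinutes(5)). The payload stays a single value, and the schedule travels alongside it rather than inside it.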
