What's the isolated-process equivalent of in-process ServiceBusSender bindings in Azure Functions?


We've got a number of in-process Azure Functions that have bindings along the lines of:

  public async Task Run(
      [ServiceBusTrigger("%TriggerQueueName%", Connection = "TriggerQueueConnection")] InitMessageBody messageBody, string correlationId, ILogger log,
      [ServiceBus("%TriggerQueueName%", Connection = "TriggerQueueConnection", EntityType = ServiceBusEntityType.Queue)] ServiceBusSender originatingQueueSender,
      [ServiceBus("%OutputQueueName%", Connection = "OutputQueueConnection", EntityType = ServiceBusEntityType.Queue)] ServiceBusSender outputQueueSender
      )

We use ServiceBusSender instances rather than output bindings because they give us easy control over batch sending (in the case of the output sender). We also have an exponential back-off retry handler pattern that schedules a copy of the original message back onto the originating queue when a transient error occurs.

I've got some more functions to write, and now that isolated is the preferred model for Azure Functions (and currently the only one available for .NET 8), I thought I'd give it a go. However, I can't work out how to implement a similar strategy, because I don't believe ServiceBusSender is available as a binding in v5.x functions.

For output batching, from what I can tell, you can send multiple messages on the output by having the return type set as T[], but you can only supply the message body and have no control over metadata like messageId or correlationId, which I want.

For scheduling messages back onto the originating queue, the only way I can see is to spin up a ServiceBusClient in the function ctor and use that to generate a ServiceBusSender.

Is spinning clients and senders up and down manually the only (or correct) way of doing what I want?

If this is the correct way, how should disposal of the ServiceBusClient be handled? Am I best off using DI to provide the client and let that take care of disposal when appropriate (I do this already for web-app integration but unsure on how best to do this given the functions lifecycle)?

It feels like there's a lot of grind appearing that I guess was taken care of by the in-process model. I'm pondering whether I should drop back down to .NET 6 until the in-process model becomes available for .NET 8.

Answered by Dasari Kamali

I tried the sample code below in the .NET 8 isolated model. A Service Bus queue trigger function receives a message from the input queue and forwards it to the output queue using a ServiceBusClient.

Code :

Function1.cs :

using System;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;
using Microsoft.Azure.Functions.Worker;
using Microsoft.Extensions.Logging;

namespace FunctionApp101
{
    public class Function1
    {
        private readonly ILogger<Function1> _logger;
        private readonly ServiceBusClient _serviceBusClient;
        private readonly ServiceBusClient _outputQueueServiceBusClient;

        public Function1(ILogger<Function1> logger, ServiceBusClient serviceBusClient, ServiceBusClient outputQueueServiceBusClient)
        {
            _logger = logger;
            _serviceBusClient = serviceBusClient;
            _outputQueueServiceBusClient = outputQueueServiceBusClient;
        }

        [Function(nameof(Function1))]
        public async Task Run(
            [ServiceBusTrigger("<input_queueName>", Connection = "servicebus")]
            ServiceBusReceivedMessage message,
            ServiceBusMessageActions messageActions)
        {
            _logger.LogInformation("Message ID: {id}", message.MessageId);
            _logger.LogInformation("Message Body: {body}", message.Body);
            _logger.LogInformation("Message Content-Type: {contentType}", message.ContentType);
            await SendMessageToAnotherQueueAsync(message);
            await messageActions.CompleteMessageAsync(message);
        }

        private async Task SendMessageToAnotherQueueAsync(ServiceBusReceivedMessage message)
        {
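            try
            {
                // Dispose the sender after use so that each invocation releases its resources.
                await using ServiceBusSender sender = _outputQueueServiceBusClient.CreateSender("<output_queueName>");
                var newMessage = new ServiceBusMessage(message.Body)
                {
                    MessageId = Guid.NewGuid().ToString(),
                    // Correlate the forwarded copy with the message that triggered it.
                    CorrelationId = message.MessageId
                };
                await sender.SendMessageAsync(newMessage);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Error sending message to output queue.");
                // Rethrow so that Run does not complete a message that was never forwarded.
                throw;
            }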
        }
    }
}
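
The question also asks about batch sending with control over metadata. A minimal sketch of that, assuming a ServiceBusSender created from one of the injected clients: the class and method names (BatchSendSketch, SendInBatchesAsync, CreateMessage) are hypothetical, while CreateMessageBatchAsync, TryAddMessage and SendMessagesAsync come from Azure.Messaging.ServiceBus.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;

public static class BatchSendSketch
{
    // Hypothetical helper: builds a message with its own MessageId
    // and a caller-supplied CorrelationId.
    public static ServiceBusMessage CreateMessage(string body, string correlationId) =>
        new ServiceBusMessage(body)
        {
            MessageId = Guid.NewGuid().ToString(),
            CorrelationId = correlationId
        };

    // Hypothetical helper: sends the bodies in as few batches as the size limit allows.
    public static async Task SendInBatchesAsync(
        ServiceBusSender sender, IEnumerable<string> bodies, string correlationId)
    {
        ServiceBusMessageBatch batch = await sender.CreateMessageBatchAsync();
        try
        {
            foreach (string body in bodies)
            {
                ServiceBusMessage message = CreateMessage(body, correlationId);
                if (batch.TryAddMessage(message))
                {
                    continue;
                }

                // The current batch is full: send it and start a new one.
                if (batch.Count > 0)
                {
                    await sender.SendMessagesAsync(batch);
                }
                batch.Dispose();
                batch = await sender.CreateMessageBatchAsync();

                if (!batch.TryAddMessage(message))
                {
                    throw new InvalidOperationException(
                        "A single message is too large to fit in an empty batch.");
                }
            }

            // Send whatever is left in the final batch.
            if (batch.Count > 0)
            {
                await sender.SendMessagesAsync(batch);
            }
        }
        finally
        {
            batch.Dispose();
        }
    }
}
```

Inside the function this could be called as `await BatchSendSketch.SendInBatchesAsync(sender, bodies, message.CorrelationId)`, where `bodies` is whatever collection of payloads the function produces.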

Program.cs :

using Microsoft.Azure.Functions.Worker;
using Microsoft.Extensions.DependencyInjection;
using Azure.Messaging.ServiceBus;
using System;
using Microsoft.Extensions.Hosting;

var host = new HostBuilder()
    .ConfigureFunctionsWebApplication()
    .ConfigureServices(services =>
    {
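        // Note: both registrations below have the same service type (ServiceBusClient).
        // The default container resolves a single ServiceBusClient to the last registration,
        // so both constructor parameters of Function1 receive the output-queue client.
        services.AddSingleton(provider =>
        {
            var connectionString = Environment.GetEnvironmentVariable("servicebus");
            return new ServiceBusClient(connectionString);
        });
        services.AddSingleton(provider =>
        {
            var outputQueueConnectionString = Environment.GetEnvironmentVariable("outputQueueConnectionString");
            return new ServiceBusClient(outputQueueConnectionString);
        });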
    })
    .Build();

host.Run();
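
If the input and output queues really live behind different connections, one way to keep the two clients apart is to register them by name. The following is only a sketch, assuming the Microsoft.Extensions.Azure package is referenced; the names "input" and "output" and the types ServiceBusRegistration and NamedClientsConsumer are hypothetical.

```csharp
using Azure.Messaging.ServiceBus;
using Microsoft.Extensions.Azure;
using Microsoft.Extensions.DependencyInjection;

public static class ServiceBusRegistration
{
    // Hypothetical extension method: registers one named client per connection string.
    public static IServiceCollection AddNamedServiceBusClients(
        this IServiceCollection services, string inputConnection, string outputConnection)
    {
        services.AddAzureClients(builder =>
        {
            builder.AddServiceBusClient(inputConnection).WithName("input");
            builder.AddServiceBusClient(outputConnection).WithName("output");
        });
        return services;
    }
}

// Hypothetical consumer: a function class could take the factory in its constructor
// in the same way and resolve each client by name.
public class NamedClientsConsumer
{
    public ServiceBusClient InputClient { get; }
    public ServiceBusClient OutputClient { get; }

    public NamedClientsConsumer(IAzureClientFactory<ServiceBusClient> clientFactory)
    {
        InputClient = clientFactory.CreateClient("input");
        OutputClient = clientFactory.CreateClient("output");
    }
}
```

In Program.cs this would be called from ConfigureServices with the two connection strings read from the environment. The container then owns the client instances, so the function class does not create or dispose them itself.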

local.settings.json :

{
  "IsEncrypted": false,
  "Values": {
    "AzureWebJobsStorage": "UseDevelopmentStorage=true",
    "FUNCTIONS_WORKER_RUNTIME": "dotnet-isolated",
    "servicebus": "<inputqueue_connec>",
    "outputQueueConnectionString": "<outputqueue_connec>"
  }
}

Output :

The function started and ran as shown in the console log below.

Next, I sent a message to the input queue (kamqueue) in Azure Service Bus.

The same message then arrived in the output queue, as seen in the Azure Portal, so the message was successfully forwarded from the input queue (kamqueue) to the output queue.

Azure Functions Core Tools
Core Tools Version:       4.0.5504 Commit hash: N/A +f829589bcaxxxxxxxxxxxxxx (64-bit)
Function Runtime Version: 4.28.3.21820

[2024-02-17T02:04:55.530Z] Found C:\Users\xxxxxx\source\repos\FunctionApp101\FunctionApp101\FunctionApp101.csproj. Using for user secrets file configuration.
[2024-02-17T02:05:05.349Z] Azure Functions .NET Worker (PID: 4948) initialized in debug mode. Waiting for debugger to attach...
[2024-02-17T02:05:05.590Z] Worker process started and initialized.

Functions:

        Function1: serviceBusTrigger

For detailed output, run func with --verbose flag.
[2024-02-17T02:05:10.915Z] Host lock lease acquired by instance ID '0000000000xxxxxxxxxxxxxxxx'.
[2024-02-17T02:05:40.240Z] Executing 'Functions.Function1' (Reason='(null)', Id=4129fa78xxxxxxxxxxxxx)
[2024-02-17T02:05:40.244Z] Trigger Details: MessageId: b6103f31586540888341xxxxxxxxxxxx, SequenceNumber: 4, DeliveryCount: 1, EnqueuedTimeUtc: 2024-02-17T02:05:39.2850000+00:00, LockedUntilUtc: 2024-02-17T02:06:39.3010000+00:00, SessionId: (null)
[2024-02-17T02:05:41.060Z] Message Body: Hi Kamali
[2024-02-17T02:05:41.060Z] Message ID: b6103f31586540888341xxxxxxxxxxx
[2024-02-17T02:05:41.060Z] Message Content-Type: (null)
[2024-02-17T02:05:45.300Z] Start processing HTTP request POST http://127.0.0.1:50310/Settlement/Complete
[2024-02-17T02:05:45.305Z] Sending HTTP request POST http://127.0.0.1:50310/Settlement/Complete
[2024-02-17T02:05:45.914Z] Received HTTP response headers after 592.3766ms - 200
[2024-02-17T02:05:45.916Z] End processing HTTP request after 630.5462ms - 200
[2024-02-17T02:05:46.065Z] Executed 'Functions.Function1' (Succeeded, Id=4129fa78xxxxxxxxxxxxx, Duration=5982ms)
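
The sample above does not cover the retry requirement from the question. As a rough sketch of how a copy of the message could be scheduled back onto the originating queue with an exponential back-off: the RetryScheduler class, the "retry-attempt" property name and the 5 second / 10 minute limits are all assumptions made for illustration, while ScheduleMessageAsync and the ServiceBusMessage copy constructor are part of Azure.Messaging.ServiceBus.

```csharp
using System;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;

public static class RetryScheduler
{
    // Hypothetical application property that carries the attempt count.
    public const string AttemptProperty = "retry-attempt";

    // Exponential back-off: baseDelay * 2^attempt, capped at maxDelay.
    public static TimeSpan ComputeDelay(int attempt, TimeSpan baseDelay, TimeSpan maxDelay)
    {
        double seconds = baseDelay.TotalSeconds * Math.Pow(2, attempt);
        return TimeSpan.FromSeconds(Math.Min(seconds, maxDelay.TotalSeconds));
    }

    // Schedules a copy of the received message on the originating queue
    // and returns the sequence number of the scheduled message.
    public static async Task<long> RescheduleAsync(
        ServiceBusSender originatingQueueSender, ServiceBusReceivedMessage received)
    {
        // Read the attempt count from the received message, defaulting to 0.
        int attempt = 0;
        if (received.ApplicationProperties.TryGetValue(AttemptProperty, out object? value)
            && value is int previous)
        {
            attempt = previous;
        }

        // The copy constructor clones the body and properties of the received message.
        var copy = new ServiceBusMessage(received);
        copy.ApplicationProperties[AttemptProperty] = attempt + 1;

        // Assumed limits: start at 5 seconds, never wait longer than 10 minutes.
        TimeSpan delay = ComputeDelay(attempt, TimeSpan.FromSeconds(5), TimeSpan.FromMinutes(10));
        return await originatingQueueSender.ScheduleMessageAsync(
            copy, DateTimeOffset.UtcNow.Add(delay));
    }
}
```

After a successful reschedule, the function would normally complete the original message so that it is not also redelivered by the trigger.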
