How to release Azure service bus session lock?

178 Views Asked by At

I'm using service bus topics with session enabled subscriptions and I would like to release the session lock right after one message is handled. For example if the first ServiceBusSessionProcessor gets a message and calls CompleteMessageAsync another ServiceBusSessionProcessor should be able to get messages for the same sessionId.

Currently I have to wait 5 minutes so the lock expires and then the second processor can start getting the messages.

This is what I'm calling for each processor:

private void CreateSessionMessageProcessor(string sessionId)
    {
        IMessageProcessorFactory factory = new MessageProcessorFactory(_serviceProvider.GetRequiredService<IServiceBusPersisterConnection>());

        var serviceScope = _serviceProvider.CreateScope();

        factory.CreateSessionMessageProcessor(
        serviceScope.ServiceProvider.GetRequiredService<IMessageHandler<Message>>(),
        "topic",
        "subscription",
        sessionId,
        TimeSpan.FromSeconds(15),
        _serviceProvider.GetRequiredService<IMessageHelper>(),
        serviceScope.ServiceProvider.GetRequiredService<IAppLogger<Message>>());
    }

And this is how the processor is created:

public ISessionMessageProcessor CreateSessionMessageProcessor<T>(IMessageHandler<T> handler, string topicName, string subscriptionName, string sessionId, TimeSpan timeout, IMessageHelper messageHelper, IAppLogger<T> logger)
    {
        return new BaseSessionMessageProcessor<T>(handler, topicName, subscriptionName, sessionId, timeout, _serviceBusPersisterConnection, messageHelper, logger);
    }


public BaseSessionMessageProcessor(
    IMessageHandler<T> handler,
    string topicName,
    string subscriptionName,
    string sessionId,
    TimeSpan timeOut,
    IServiceBusPersisterConnection serviceBusPersisterConnection,
    IMessageHelper messageHelper,
    IAppLogger<T> logger)
{
    _handler = handler;
    _messageHelper = messageHelper;
    _logger = logger;

    var options = new ServiceBusSessionProcessorOptions
    {
        MaxConcurrentSessions = 2,
        MaxConcurrentCallsPerSession = 1,
        AutoCompleteMessages = false,
        SessionIds = { sessionId },
        SessionIdleTimeout = timeOut,
        MaxAutoLockRenewalDuration = TimeSpan.Zero
    };
    _processor = serviceBusPersisterConnection.ServiceBusClient.CreateSessionProcessor(topicName, subscriptionName, options);

    RegisterSessionMessageProcessor().GetAwaiter().GetResult();
}

public async Task RegisterSessionMessageProcessor()
{
    _processor.ProcessMessageAsync += SessionMessageHandler;
    _processor.ProcessErrorAsync += ErrorHandler;

    await _processor.StartProcessingAsync();
}

public async Task SessionMessageHandler(ProcessSessionMessageEventArgs args)
{
    _logger.LogInformation($"log-info: process started");
}

public Task ErrorHandler(ProcessErrorEventArgs args)
{
    var exception = args.Exception;
    var context = args.ErrorSource;

    _logger.LogError($"log-exception: An error occurred while trying to handle a message.", exception, context);

    return Task.CompletedTask;
}
1

There are 1 best solutions below

1
On

I tried the below C# console code to create a Service Bus session processor for handling messages with distinct session IDs with five messages to the specified topic and subscription. The session processor processes each message with a delay of 10 seconds for message handling before completing.

Code :

using System;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;

class Program
{
    const string ServiceBusConnectionString = "<ServiceBusTopic_connecString>";
    const string TopicName = "<topic_name>";
    const string SubscriptionName = "<subscription_name>";

    static async Task Main()
    {
        await SendMessageAsync();

        var serviceBusClient = new ServiceBusClient(ServiceBusConnectionString);
        var sessionProcessor = serviceBusClient.CreateSessionProcessor(TopicName, SubscriptionName, new ServiceBusSessionProcessorOptions
        {
            MaxConcurrentSessions = 1
        });

        sessionProcessor.ProcessMessageAsync += ProcessMessagesAsync;
        sessionProcessor.ProcessErrorAsync += ExceptionReceivedHandler;

        try
        {
            Console.WriteLine("Starting session processor...");
            await sessionProcessor.StartProcessingAsync();

            Console.WriteLine("Press any key to exit...");
            Console.ReadKey();
        }
        finally
        {
            Console.WriteLine("Closing session processor...");
            await sessionProcessor.CloseAsync();
            await serviceBusClient.DisposeAsync();
            Console.WriteLine("Exiting application...");
        }
    }

    static async Task SendMessageAsync()
    {
        var serviceBusClient = new ServiceBusClient(ServiceBusConnectionString);
        var sender = serviceBusClient.CreateSender(TopicName);

        try
        {
            for (int i = 1; i <= 5; i++)
            {
                var sessionId = $"session-{i}";
                var messageBody = $"Message {i}";
                var message = new ServiceBusMessage(messageBody)
                {
                    SessionId = sessionId
                };

                await sender.SendMessageAsync(message);
                Console.WriteLine($"Message sent with SessionId: {sessionId}");
            }
        }
        finally
        {
            await sender.CloseAsync();
            await serviceBusClient.DisposeAsync();
        }
    }

    static async Task ProcessMessagesAsync(ProcessSessionMessageEventArgs args)
    {
        try
        {
            var session = args.SessionId;
            var message = args.Message;

            Console.WriteLine($"Received message: SessionId = {session}, SequenceNumber = {message.SequenceNumber}");
            await Task.Delay(TimeSpan.FromSeconds(10));
            await args.CompleteMessageAsync(message);
            Console.WriteLine($"Completed message: SessionId = {session}, SequenceNumber = {message.SequenceNumber}");
        }
        catch (Exception ex)
        {
            Console.WriteLine($"Error processing message: {ex.Message}");
        }
    }

    static Task ExceptionReceivedHandler(ProcessErrorEventArgs args)
    {
        Console.WriteLine($"Message handler encountered an exception: {args.Exception}");
        return Task.CompletedTask;
    }
}

Output :

It ran successfully as below.

enter image description here