I'm using service bus topics with session enabled subscriptions and I would like to release the session lock right after one message is handled. For example if the first ServiceBusSessionProcessor gets a message and calls CompleteMessageAsync another ServiceBusSessionProcessor should be able to get messages for the same sessionId.
Currently I have to wait 5 minutes so the lock expires and then the second processor can start getting the messages.
This is what I'm calling for each processor:
private void CreateSessionMessageProcessor(string sessionId)
{
IMessageProcessorFactory factory = new MessageProcessorFactory(_serviceProvider.GetRequiredService<IServiceBusPersisterConnection>());
var serviceScope = _serviceProvider.CreateScope();
factory.CreateSessionMessageProcessor(
serviceScope.ServiceProvider.GetRequiredService<IMessageHandler<Message>>(),
"topic",
"subscription",
sessionId,
TimeSpan.FromSeconds(15),
_serviceProvider.GetRequiredService<IMessageHelper>(),
serviceScope.ServiceProvider.GetRequiredService<IAppLogger<Message>>());
}
And this is how the processor is created:
public ISessionMessageProcessor CreateSessionMessageProcessor<T>(IMessageHandler<T> handler, string topicName, string subscriptionName, string sessionId, TimeSpan timeout, IMessageHelper messageHelper, IAppLogger<T> logger)
{
return new BaseSessionMessageProcessor<T>(handler, topicName, subscriptionName, sessionId, timeout, _serviceBusPersisterConnection, messageHelper, logger);
}
public BaseSessionMessageProcessor(
IMessageHandler<T> handler,
string topicName,
string subscriptionName,
string sessionId,
TimeSpan timeOut,
IServiceBusPersisterConnection serviceBusPersisterConnection,
IMessageHelper messageHelper,
IAppLogger<T> logger)
{
_handler = handler;
_messageHelper = messageHelper;
_logger = logger;
var options = new ServiceBusSessionProcessorOptions
{
MaxConcurrentSessions = 2,
MaxConcurrentCallsPerSession = 1,
AutoCompleteMessages = false,
SessionIds = { sessionId },
SessionIdleTimeout = timeOut,
MaxAutoLockRenewalDuration = TimeSpan.Zero
};
_processor = serviceBusPersisterConnection.ServiceBusClient.CreateSessionProcessor(topicName, subscriptionName, options);
RegisterSessionMessageProcessor().GetAwaiter().GetResult();
}
public async Task RegisterSessionMessageProcessor()
{
_processor.ProcessMessageAsync += SessionMessageHandler;
_processor.ProcessErrorAsync += ErrorHandler;
await _processor.StartProcessingAsync();
}
public async Task SessionMessageHandler(ProcessSessionMessageEventArgs args)
{
_logger.LogInformation($"log-info: process started");
}
public Task ErrorHandler(ProcessErrorEventArgs args)
{
var exception = args.Exception;
var context = args.ErrorSource;
_logger.LogError($"log-exception: An error occurred while trying to handle a message.", exception, context);
return Task.CompletedTask;
}
I tried the below C# console code to create a Service Bus session processor for handling messages with distinct session IDs with five messages to the specified topic and subscription. The session processor processes each message with a delay of 10 seconds for message handling before completing.
Code :
Output :
It ran successfully as below.