How do I receive only the Service Bus session messages that have a particular session id?

  1. I am using .NET Core 3.1 and Microsoft.Azure.ServiceBus (version 5.1.3).
  2. I have a Service Bus topic with one subscription that handles session messages only.

A topic client sends 3 messages with one session id (say ABCD) and then another 4 messages with a different session id (XYZ). It is easy to write a subscription client that receives all 7 messages along with their session ids. However, I want to receive only the messages with session id XYZ. I don't care about the messages with session id ABCD and don't want to receive them at all.

The following example code, which receives all messages for all session ids, works as expected:

using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus;

static async Task Main(string[] args)
{
    try
    {
        // The builder's EntityPath is used as the topic path, so connectionString must include it.
        ServiceBusConnectionStringBuilder builder = new ServiceBusConnectionStringBuilder(connectionString);

        SubscriptionClient client = new SubscriptionClient(builder, subscriptionName, ReceiveMode.PeekLock);

        // The session handler processes every session it can lock, not one specific session id.
        var sessionHandler = new SessionHandlerOptions(ExceptionHandler);
        sessionHandler.AutoComplete = true;
        client.RegisterSessionHandler(SessionMessageHandler, sessionHandler);

        Console.WriteLine("Press any key to exit!");
        Console.ReadKey();

        await client.CloseAsync();
    }
    catch (Exception ex)
    {
        Console.WriteLine(ex);
    }        
}

static Task SessionMessageHandler(IMessageSession session, Message message, CancellationToken cancellationToken)
{
    var bodyText = System.Text.Encoding.Unicode.GetString(message.Body);
    Console.WriteLine($"Session id: {message.SessionId}; Body: {bodyText}");
    return Task.CompletedTask;
}

static Task ExceptionHandler(ExceptionReceivedEventArgs args)
{
    var context = args.ExceptionReceivedContext;
    Console.WriteLine($"Exception context: {context.Action}, {context.ClientId}, {context.Endpoint}, {context.EntityPath}");
    Console.WriteLine($"Exception: {args.Exception}");
    return Task.CompletedTask;
}

Questions:

  1. How do I change the above code so that I receive only the messages with session id XYZ (without ever receiving the messages with session id ABCD)?
  2. If that is not possible with the above code, is there another way to achieve this with the same library? If so, please provide examples.

There are 2 best solutions below

Best answer:

The code above uses a session handler, which is designed to process multiple sessions rather than a single one. To process only the session with a specific ID, use SessionClient and its AcceptMessageSessionAsync(String) method, which takes the session ID as its parameter.
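
As a rough illustration of that suggestion, here is a minimal sketch that accepts only session XYZ and drains it. The connection string, topic name, and subscription name are placeholders rather than values from the question, and the 5-second receive timeout is an arbitrary choice. It assumes the message bodies are UTF-16 text, matching the handler in the question.

```csharp
using System;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus;

static class SingleSessionReceiver
{
    // Placeholders (assumptions): substitute your own values.
    const string ConnectionString = "<namespace connection string without EntityPath>";
    const string TopicName = "<topic name>";
    const string SubscriptionName = "<subscription name>";

    static async Task Main()
    {
        // Builds the subscription entity path: "<topic>/Subscriptions/<subscription>".
        string entityPath = EntityNameHelper.FormatSubscriptionPath(TopicName, SubscriptionName);
        var sessionClient = new SessionClient(ConnectionString, entityPath, ReceiveMode.PeekLock);

        IMessageSession session = null;
        try
        {
            // Lock exactly one session; messages from other sessions are not delivered here.
            session = await sessionClient.AcceptMessageSessionAsync("XYZ");

            // ReceiveAsync returns null when no message arrives within the timeout.
            Message message;
            while ((message = await session.ReceiveAsync(TimeSpan.FromSeconds(5))) != null)
            {
                string bodyText = Encoding.Unicode.GetString(message.Body);
                Console.WriteLine($"Session id: {message.SessionId}; Body: {bodyText}");

                // PeekLock mode: settle the message explicitly.
                await session.CompleteAsync(message.SystemProperties.LockToken);
            }
        }
        catch (SessionCannotBeLockedException ex)
        {
            // Thrown when another receiver currently holds the lock on this session.
            Console.WriteLine($"Session is locked elsewhere: {ex.Message}");
        }
        finally
        {
            // Closing the session releases its lock.
            if (session != null)
            {
                await session.CloseAsync();
            }
            await sessionClient.CloseAsync();
        }
    }
}
```

Note that the messages with session id ABCD are not discarded by this approach. They stay in the subscription until some receiver accepts that session or the messages expire.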

Second answer:

Following Sean's advice, I changed the code to the following, and it works. Thanks, Sean.

    static async Task Main(string[] args)
    {
        try
        {
            Console.WriteLine("Enter session id to listen on ...");
            var sessionId = Console.ReadLine();
            // Console.ReadLine returns null when input is closed, so check for null as well.
            if (string.IsNullOrEmpty(sessionId))
            {
                sessionId = "12345";
            }
            Console.WriteLine($"Reading messages with session id: {sessionId}");

            // subscriberPath is the subscription's entity path: "<topic>/Subscriptions/<subscription>".
            var sessionClient = new SessionClient(connectionStringWithoutEntityPath, subscriberPath, ReceiveMode.PeekLock);

            var messageSession = await sessionClient.AcceptMessageSessionAsync(sessionId);

            if (messageSession != null)
            {
                while(true)
                {
                    Message message = await messageSession.ReceiveAsync();

                    if (message != null)
                    {
                        var bodyText = System.Text.Encoding.Unicode.GetString(message.Body);
                        Console.WriteLine($"Session id: {message.SessionId}; Body: {bodyText}");
                        await messageSession.CompleteAsync(message.SystemProperties.LockToken);
                    }
                    else
                    {
                        // A null message means nothing arrived before the receive timed out.
                        Console.WriteLine("Press Enter to keep reading, or type anything and press Enter to exit.");
                        if (!string.IsNullOrEmpty(Console.ReadLine()))
                        {
                            break;
                        }
                    }
                }
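                // Close the session to release its lock before closing the client.
                await messageSession.CloseAsync();
            }

            await sessionClient.CloseAsync();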
            Console.WriteLine("Press any key to exit!");
            Console.ReadKey();
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex);
        }        
    }