Process dead letter message in Azure Service Bus Mass Transit

193 Views Asked by At

I want to reprocess dead-lettered messages using MassTransit with Azure Service Bus. Here is my configuration on the service collection:

// Registers MassTransit on the Azure Service Bus transport and attaches
// TestConsumer to a subscription endpoint for TestModel messages.
services.AddMassTransit(registration =>
{
    registration.UsingAzureServiceBus((busContext, busConfig) =>
    {
        // "test-topic" is the name handed to SubscriptionEndpoint; the consumer
        // is resolved through the registration context.
        busConfig.SubscriptionEndpoint<TestModel>(
            "test-topic",
            endpoint => endpoint.ConfigureConsumer<TestConsumer>(busContext));
    });
});

Here is my consumer class:

/// <summary>
/// Consumer that always fails; used to demonstrate what happens to a message
/// whose handler throws.
/// </summary>
public class TestConsumer : IConsumer<TestModel>
{
    /// <summary>
    /// Writes the received message to the console and then throws to simulate
    /// a processing failure.
    /// </summary>
    /// <param name="context">Consume context carrying the <see cref="TestModel"/> message.</param>
    /// <returns>Never returns normally; the method always throws.</returns>
    /// <exception cref="Exception">Always thrown to simulate a failed delivery.</exception>
    public Task Consume(ConsumeContext<TestModel> context)
    {
        // Log first: any statement placed after the throw would be unreachable.
        Console.WriteLine(context.Message);

        throw new Exception("Test Error will create a dead-lettered message");
    }
}

I have checked Service Bus Explorer, and the exception does create a dead-lettered message. I want to consume or reprocess that message, but I still want the exception to surface as if it were unhandled. Thank you.

1

There is 1 best solution below

0
On

Using this MSDOC, I was able to send a message to the dead-letter (error) queue and then receive it with MassTransit.

I used MassTransit with Azure Service Bus for message processing, including handling retries, moving messages to the dead-letter queue, and receiving messages.

Code:

 /// <summary>
 /// Builds the host: registers MassTransit on Azure Service Bus with one receive
 /// endpoint for the main queue and one for its "-error" queue, adds the MassTransit
 /// hosted service and <see cref="Worker"/>, and enables console logging.
 /// </summary>
 /// <param name="args">Command-line arguments forwarded to the default host builder.</param>
 /// <returns>The configured host builder.</returns>
 public static IHostBuilder CreateHostBuilder(string[] args) =>
            Host.CreateDefaultBuilder(args)
                .ConfigureServices((hostContext, services) =>
                {
                    services.AddMassTransit(registration =>
                    {
                        var serviceBusConnectionString = "YOUR_CONNECTION_STRING_HERE"; // Replace with your Azure Service Bus connection string
                        var queueName = "YOUR_QUEUE_NAME_HERE"; // Replace with your queue name
                        var errorQueueName = queueName + "-error"; // Queue the error consumer listens on

                        registration.UsingAzureServiceBus((context, bus) =>
                        {
                            bus.Host(serviceBusConnectionString);

                            // Main queue: handled by YourMessageConsumer.
                            bus.ReceiveEndpoint(queueName, endpoint =>
                            {
                                endpoint.PrefetchCount = 16; // Optional, set prefetch count
                                // NOTE(review): presumably 3 retries at a 1000 ms interval - confirm against MassTransit docs.
                                endpoint.UseMessageRetry(retry => retry.Interval(3, 1000));
                                endpoint.Consumer<YourMessageConsumer>();
                            });

                            // Error queue: handled by DeadLetterConsumer with the same endpoint settings.
                            bus.ReceiveEndpoint(errorQueueName, endpoint =>
                            {
                                endpoint.PrefetchCount = 16;
                                endpoint.UseMessageRetry(retry => retry.Interval(3, 1000));
                                endpoint.Consumer<DeadLetterConsumer>();
                            });
                        });
                    });

                    services.AddMassTransitHostedService();

                    services.AddHostedService<Worker>();
                })
                // Enable console logging
                .ConfigureLogging((hostContext, logging) => logging.AddConsole());
    }

    /// <summary>
    /// Hosted service that publishes one sample <see cref="YourMessageType"/> message
    /// when the host starts.
    /// </summary>
    public class Worker : IHostedService
    {
        private readonly IBusControl _bus;

        /// <summary>Creates the worker.</summary>
        /// <param name="bus">Bus used to publish the sample message.</param>
        /// <exception cref="ArgumentNullException"><paramref name="bus"/> is null.</exception>
        public Worker(IBusControl bus)
        {
            _bus = bus ?? throw new ArgumentNullException(nameof(bus));
        }

        /// <summary>Publishes the sample message and writes a confirmation to the console.</summary>
        /// <param name="cancellationToken">Token signalled when host startup is aborted.</param>
        /// <returns>A task that completes once the message has been published.</returns>
        public async Task StartAsync(CancellationToken cancellationToken)
        {
            var messageToSend = new YourMessageType
            {
                Property1 = "Hello",
                Property2 = "World"
            };

            // Flow the host's token so an aborted startup also cancels the publish.
            await _bus.Publish(messageToSend, cancellationToken);

            Console.WriteLine("Message sent to MassTransit");
        }

        /// <summary>No work to do on shutdown.</summary>
        /// <param name="cancellationToken">Unused.</param>
        /// <returns>An already-completed task.</returns>
        public Task StopAsync(CancellationToken cancellationToken)
        {
            return Task.CompletedTask;
        }
    }

    /// <summary>
    /// Sample message contract carrying two string values.
    /// </summary>
    public class YourMessageType
    {
        /// <summary>First string value of the message.</summary>
        public string Property1
        {
            get;
            set;
        }

        /// <summary>Second string value of the message.</summary>
        public string Property2
        {
            get;
            set;
        }
    }

    /// <summary>
    /// Consumes <see cref="YourMessageType"/> messages, logs them, and forwards any message
    /// whose "deliveryCount" header has reached the limit to the endpoint's "-error" queue.
    /// </summary>
    public class YourMessageConsumer : IConsumer<YourMessageType>
    {
        // Delivery count at which a message is forwarded to the error queue.
        private const int MaxDeliveryCount = 2;

        /// <summary>
        /// Logs the message and, when the delivery-count header is at or above the limit,
        /// sends a copy of the message to the "-error" queue.
        /// </summary>
        /// <param name="context">Consume context carrying the message and its headers.</param>
        /// <returns>A task that completes when handling (and any forwarding) is done.</returns>
        public async Task Consume(ConsumeContext<YourMessageType> context)
        {
            var message = context.Message;
            Console.WriteLine($"Received message: Property1 = {message.Property1}, Property2 = {message.Property2}");

            // Simulate exceeding the maximum delivery count.
            // The header arrives as object, so it is parsed defensively instead of unboxed with (int).
            if (!context.Headers.TryGetHeader("deliveryCount", out var rawDeliveryCount)
                || !TryReadCount(rawDeliveryCount, out var deliveryCount)
                || deliveryCount < MaxDeliveryCount)
            {
                return;
            }

            Console.WriteLine("Message exceeded maximum delivery count. Moving to DLQ.");

            // ConsumeContext exposes the endpoint address through ReceiveContext.InputAddress.
            // NOTE(review): assumes the address path is just the queue name - confirm if a scope/path prefix is configured.
            var queueName = context.ReceiveContext.InputAddress.AbsolutePath.TrimStart('/');
            var errorAddress = new Uri($"queue:{queueName}-error"); // Define the error queue address
            var errorEndpoint = await context.GetSendEndpoint(errorAddress);
            await errorEndpoint.Send(message, context.CancellationToken);
        }

        // Converts a header value (int, in-range long, or numeric string) to an int; returns false otherwise.
        private static bool TryReadCount(object value, out int count)
        {
            switch (value)
            {
                case int number:
                    count = number;
                    return true;
                case long wide when wide >= int.MinValue && wide <= int.MaxValue:
                    count = (int)wide;
                    return true;
                case string text:
                    return int.TryParse(
                        text,
                        System.Globalization.NumberStyles.Integer,
                        System.Globalization.CultureInfo.InvariantCulture,
                        out count);
                default:
                    count = 0;
                    return false;
            }
        }
    }

    /// <summary>
    /// Consumer for <see cref="YourMessageType"/> messages arriving on the "-error" queue;
    /// it only logs what it receives.
    /// </summary>
    public class DeadLetterConsumer : IConsumer<YourMessageType>
    {
        /// <summary>Writes the received message's properties to the console.</summary>
        /// <param name="context">Consume context carrying the message.</param>
        /// <returns>An already-completed task; no asynchronous work is performed.</returns>
        public Task Consume(ConsumeContext<YourMessageType> context)
        {
            var message = context.Message;
            Console.WriteLine($"Received dead-lettered message: Property1 = {message.Property1}, Property2 = {message.Property2}");

            // Nothing is awaited, so return a completed task rather than marking the method async.
            return Task.CompletedTask;
        }
    }
}     

Output: enter image description here

  • For more details refer to this git.