Unable to consume message - MassTransit with ActiveMQ Artemis

49 Views Asked by At
public class MessagePublisher : BackgroundService
{
    private readonly ILogger<MessagePublisher> _logger;
    private readonly IBusControl _busControl;
    private readonly IMessageIcd _messageIcd;
    private readonly IArtemisMqOptions _artemisMqOptions;
    private ITestHarness _testHarness;

    public MessagePublisher(ILogger<MessagePublisher> logger, IArtemisMqOptions artemisMqOptions, IBusControl busControl, IMessageIcd messageIcd, ITestHarness testHarness)
    {
        _logger = logger;
        _artemisMqOptions = artemisMqOptions ?? throw new ArgumentNullException(nameof(artemisMqOptions));
        _busControl = busControl ?? throw new ArgumentNullException(nameof(busControl));
        _messageIcd = messageIcd ?? throw new ArgumentNullException(nameof(messageIcd));
        _testHarness = testHarness;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation("Message Publisher started");

        try
        {
            await _testHarness.Start();

            while (!stoppingToken.IsCancellationRequested)
            {
                if (_messageIcd != null)
                {
                    await _testHarness.Bus.Publish(_messageIcd, ctx => ctx.DestinationAddress = new Uri($"queue:{_artemisMqOptions.InQueue}"));

                    var consumerTestHarness = _testHarness.GetConsumerHarness<MessageConsumer>();
                    Assert.True(await _testHarness.Published.Any<IMessageIcd>(), "Message not published");
                    Assert.True(await consumerTestHarness.Consumed.Any<IMessageIcd>(), "Message not consumed");

                    _logger.LogInformation("Message published successfully.");
                }
                else
                {
                    _logger.LogWarning("MessageIcd is null. Skipping publishing.");
                }

                await Task.Delay(1000, stoppingToken); // Adjust the delay as needed
            }
        }
        catch (OperationCanceledException)
        {
            _logger.LogInformation("Publishing operation cancelled.");
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "An error occurred while publishing the message.");
            throw; // Rethrow the exception for further handling at a higher level
        }
    }
}

public class MessageConsumer : IConsumer<IMessageIcd>
{
    private readonly ILogger<MessageConsumer> _logger;
    private readonly SemaphoreSlim _consumedSignal;

    public MessageConsumer(ILogger<MessageConsumer> logger)
    {
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
        _consumedSignal = new SemaphoreSlim(0);
    }

    public async Task Consume(ConsumeContext<IMessageIcd> context)
    {
        // Implement your message handling logic here
        var message = context.Message;

        // Test logic
        _logger.LogInformation("Message consumed: {Message}", message);

        // Signal that message has been consumed
        _consumedSignal.Release();

        await Task.CompletedTask;
    }

    public async Task WaitForMessageConsumption(TimeSpan timeout)
    {
        // Wait for the consumed signal or timeout
        await _consumedSignal.WaitAsync(timeout);
    }
}
 [Fact]
public async Task TestSendMessageAsync1()
{
    try
    {
        ICommonLogicMethods common = new CommonLogicMethods(_messageIcd);
        var sfa = _sfaTestData.GenerateData();
        var messageicd = common.GenerateIcd<ISingleFormAuthority>(sfa, MessageType.Authority, 1001, "000", "SystemCRC", 23000, GlobalEnums.MessageAction.Create, GlobalEnums.PayLoadDataType.Object);

        if (messageicd?.Result != null)
        {
            var timeout = TimeSpan.FromSeconds(30);
            using var source = new CancellationTokenSource(timeout);

            var publisher = new MessagePublisher(_loggerPublish, _artemisMqOptions, _busControl, messageicd.Result, _harness);
            await publisher.StartAsync(source.Token);
           
            await Task.Delay(5000);

            // Get consumer harness
            var consumerTestHarness = _harness.GetConsumerHarness<MessageConsumer>();

            // Wait for message consumption
            var consumedMessage = await consumerTestHarness.Consumed.Any<IMessageIcd>(source.Token);

            // Perform assertions
            Assert.True(consumedMessage, "Message not consumed");

            _loggerPublish.LogInformation("Message published and consumed successfully.");
        }
    }
    catch (Exception ex)
    {
        throw ex;
    }
    finally
    {
        await _harness.Stop();
    }
}

Here is my test case with publish and consume the message. Unable to consume the message. Can someone help me out what I am missing here? It is coming null as in context. The test case I am written in test project and publisher and consumer is in main project. How to consume with specific queue name?

0

There are 0 best solutions below