MassTransit 6.3.2 Consumer is never called when using IBusControl.ConnectConsumer

131 Views Asked by At

When I attach consumers during intial message bus config, the consumers are called as expected.

When I attach the consumers after bus config, using ConnectConsumer the consumers are never called; The temporary queue/exchange is created, but it doesn't seen to know of the consumers that are supposed to be attached to that queue.

There is another service/consumer on the bus that is receiving Request messages being published here and publishing Response messages that should be consumed here.

Any idea why this is not working?

NOTE: I know that the "preferred" way of connecting consumers to the bus in the bus config (as in the working example); this is not an option for me as in practice, the bus is being creating/configed in a referenced assembly and the end-user programmers that are adding consumers to the bus do not have access to the bus configuration method. This is something that used to be trivial in version 2; it seems later version make such usecases much more difficult - not all use-cases have easy access to the bus creation/config methods.

Ex.

public class TestResponseConsumer : IConsumer<ITestResponse>
{
    public Task Consume(ConsumeContext<ITestResponse> context)
    {
        Console.WriteLine("TestResponse received");

        return Task.CompletedTask;
    }
}

    ...

This works (consumer gets called):

    public IBusControl ServiceBus;

    public IntegrationTestsBase()
    {
        ServiceBus = Bus.Factory.CreateUsingRabbitMq(cfg =>
        {
            cfg.Host("vmdevrab-bld", "/", h => {
                h.Username("guest");
                h.Password("guest");
            });

            cfg.ReceiveEndpoint("Int_Test", e =>
            {
                e.Consumer<TestResponseConsumer>();
            });

            cfg.AutoStart = true;
        });

        ServiceBus.Start();
    }

    ~IntegrationTestsBase()
    {
        ServiceBus.Stop();
    }
}

This does not work:

    [TestMethod]
    public void Can_Receive_SampleResponse()
    {
        try
        {
            ITestRequest request = new TestRequest(Guid.NewGuid(), Guid.NewGuid(), Guid.NewGuid());

            ServiceBus.ConnectConsumer<TestResponseConsumer>();

            ServiceBus.Publish<ITestRequest>(request);

            mre.WaitOne(60000);
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex.Message);
            Console.WriteLine(ex.StackTrace);
            Assert.Fail();
        }
    }
1

There are 1 best solutions below

2
On BEST ANSWER

It doesn't work because as explained in the documentation when you connect a consumer to the bus endpoint, no exchange bindings are created. Published messages will not be delivered to the bus endpoint.

If you want to connect consumers to the bus after it has been started, you should use ConnectReceiveEndpoint() instead, which is also covered in the documentation.

var handle = bus.ConnectReceiveEndpoint("secondary-queue", x =>
{
    x.Consumer<TestResponseConsumer>();
})

var ready = await handle.Ready;

The endpoint can be stopped when it is no longer needed, otherwise it will be stopped when the bus is stopped.