MassTransit fault consumer not invoked for request/response

What is the best practice for handling exceptions in MassTransit 3+ with the request/response pattern? The docs mention that if a ResponseAddress exists on a message, the Fault message will be sent to that address, but how does one consume/receive the messages at that address? The ResponseAddress for Bus.Request seems to be an auto-generated MassTransit address that I don't control, so I don't know how to access the exception thrown in the main consumer. What am I missing? Here's my code to register the consumer and its fault consumer using a Unity container:

cfg.ReceiveEndpoint(host, "request_response_queue", e =>
{
    e.Consumer<IConsumer<IRequestResponse>>(container);
    e.Consumer(() => container.Resolve<IMessageFaultConsumer<IRequestResponse>>() as IConsumer<Fault<IRequestResponse>>);
});

And here's my attempt at a global message fault consumer:

public interface IMessageFaultConsumer<TMessage>
{
}

public class MessageFaultConsumer<TMessage> : IConsumer<Fault<TMessage>>, IMessageFaultConsumer<TMessage>
{
    public Task Consume(ConsumeContext<Fault<TMessage>> context)
    {
        Console.WriteLine("MessageFaultConsumer");
        return Task.FromResult(0);
    }
}

This approach DOES work when I use Bus.Publish as opposed to Bus.Request. I also looked into creating an IConsumeObserver and putting my global exception logging code into the ConsumeFault method, but that has the downside of being invoked on every exception, including those thrown before the retries are exhausted. What is the proper way to handle exceptions for request/response?

Best answer

First of all, the request/response support in MassTransit is meant to be used with the .Request() method or the request client (MessageRequestClient or PublishRequestClient). With these methods, if the consumer of the request message throws an exception, that exception is packaged into a Fault<T>, which is sent to the ResponseAddress. Since the .Request() method and the request client are both asynchronous, awaiting the request will throw an exception that includes the exception data from the fault. That's how it is designed: await the request, and it will either complete, time out, or fault (throw an exception upon await).
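
As a rough sketch of what that looks like on the requesting side (assuming the MassTransit 3.x request client API; IRequestResult, RequestSender, the 30-second timeout, and the placeholder contract definitions are hypothetical and only there for illustration):

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using MassTransit;

// Placeholder contracts: IRequestResponse stands in for the request type
// from the question; IRequestResult is a hypothetical response type.
public interface IRequestResponse { }
public interface IRequestResult { }

public class RequestSender
{
    readonly IRequestClient<IRequestResponse, IRequestResult> _client;

    public RequestSender(IBus bus, Uri serviceAddress)
    {
        // serviceAddress is the queue the request consumer listens on,
        // e.g. the address of "request_response_queue".
        _client = new MessageRequestClient<IRequestResponse, IRequestResult>(
            bus, serviceAddress, TimeSpan.FromSeconds(30));
    }

    public async Task<IRequestResult> SendAsync(IRequestResponse request)
    {
        try
        {
            // Completes with the response, or throws on fault or timeout.
            return await _client.Request(request, CancellationToken.None);
        }
        catch (RequestFaultException ex)
        {
            // The consumer threw; the fault came back on the ResponseAddress.
            Console.Error.WriteLine("Request faulted: " + ex.Message);
            throw;
        }
        catch (RequestTimeoutException ex)
        {
            // No response or fault arrived within the timeout.
            Console.Error.WriteLine("Request timed out: " + ex.Message);
            throw;
        }
    }
}
```

The point is that the fault is not delivered to a separate Fault<T> consumer here; it surfaces as an exception at the await, so the try/catch around the request is the place to handle it.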

If you are trying to put in some global "exception handler" code for logging purposes, you really should log those at the service boundary, and an observer is the best way to handle it. This way, you can just implement the ConsumeFault method, and log to your event sink. However, this is synchronous within the consumer pipeline, so recognize the delay that could be introduced.
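
A minimal observer along those lines might look like the following (a sketch assuming the three-method IConsumeObserver interface from MassTransit 3.x; LoggingConsumeObserver is a hypothetical name, and Console stands in for whatever event sink you actually use):

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;

// Hypothetical observer that only cares about faults.
public class LoggingConsumeObserver : IConsumeObserver
{
    public Task PreConsume<T>(ConsumeContext<T> context) where T : class
    {
        return Task.FromResult(0); // nothing to do before consumption
    }

    public Task PostConsume<T>(ConsumeContext<T> context) where T : class
    {
        return Task.FromResult(0); // nothing to do on success
    }

    public Task ConsumeFault<T>(ConsumeContext<T> context, Exception exception) where T : class
    {
        // Runs inside the consume pipeline, so keep this cheap.
        Console.Error.WriteLine(
            "Consumer fault for " + typeof(T).Name +
            " (MessageId " + context.MessageId + "): " + exception.Message);
        return Task.FromResult(0);
    }
}
```

It would be attached once after the bus is created, for example with busControl.ConnectConsumeObserver(new LoggingConsumeObserver()).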

The other option, of course, is to just consume Fault<T>, but as you mentioned, it does not get published when the request client is used with the response address in the header. In this case, perhaps your requester should publish an event indicating that operation X faulted, and you can log that at the business context level rather than the service level.
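
One way to sketch that idea (OperationFaulted, FaultReportingRequester, and the placeholder contracts are all hypothetical names; the MassTransit calls assume the 3.x request client and Publish API):

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using MassTransit;

// Placeholder contracts, for illustration only.
public interface IRequestResponse { }
public interface IRequestResult { }

// Hypothetical business-level event describing a failed operation.
public class OperationFaulted
{
    public string Operation { get; set; }
    public string Reason { get; set; }
    public DateTime Timestamp { get; set; }
}

public class FaultReportingRequester
{
    readonly IBus _bus;
    readonly IRequestClient<IRequestResponse, IRequestResult> _client;

    public FaultReportingRequester(IBus bus, IRequestClient<IRequestResponse, IRequestResult> client)
    {
        _bus = bus;
        _client = client;
    }

    public async Task<IRequestResult> SendAsync(IRequestResponse request)
    {
        string reason;
        try
        {
            return await _client.Request(request, CancellationToken.None);
        }
        catch (RequestFaultException ex)
        {
            // Capture the reason; the publish happens outside the catch block.
            reason = ex.Message;
        }

        // Publish an event that any ordinary consumer can subscribe to and log.
        await _bus.Publish(new OperationFaulted
        {
            Operation = typeof(IRequestResponse).Name,
            Reason = reason,
            Timestamp = DateTime.UtcNow
        });

        throw new InvalidOperationException("Request faulted: " + reason);
    }
}
```

A plain IConsumer<OperationFaulted> registered on any endpoint would then receive these events, since they are published rather than sent to the temporary response address.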

There are many options here; it's just a matter of choosing the one that best fits your use case.