Is it possible to commit a transaction in the BeforeTransportMessage event?

As the title says, I'd like to know whether it is possible to commit the transaction in the BeforeTransportMessage event. For example, if specific headers occur in the TransportMessage, I could wrap it in another message, send it somewhere else, and stop Rebus from further processing.

I didn't find any real-life example of using that event. Is there maybe something you could share?

Thanks in advance :)

Best answer

If you can safely deserialize all incoming messages, this is what you can do:

If you're trying to implement a filter that inspects all incoming messages, I suggest you create a "catch-all" message handler, i.e. an implementation of IHandleMessages<object>. Let's call it CatchAllHandler, although I'm sure you can come up with a better name ;)

Then you can configure the bus like this:

Configure.With(yourFavoriteContainerAdapter)
    .UseMsmq(...)
    .SpecifyOrderOfHandlers(o => o.First<CatchAllHandler>())
    .Create()
    .Start();

in order to have all incoming messages dispatched to your CatchAllHandler before any other handlers. Since it implements IHandleMessages<object>, it is capable of handling all incoming messages. Your CatchAllHandler can then do something like this:

public class CatchAllHandler : IHandleMessages<object> {
    readonly IMessageContext context;
    readonly IBus bus;

    public CatchAllHandler(IMessageContext context, IBus bus) {
        this.context = context;
        this.bus = bus;
    }

    public void Handle(object message) {
        if (MessageHasSpecialHeader(context.Headers)) {
            // forward the current transport message in its entirety
            bus.Advanced.Routing.ForwardCurrentMessage("somewhereElse");

            // abort further processing of the message (i.e. do not
            // dispatch to subsequent handlers in the pipeline)
            context.Abort();
        }
    }

    bool MessageHasSpecialHeader(IDictionary<string, object> headers) {
        // ... look for that special header down here
    }
}
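
The header check itself is left open above. As a minimal sketch only, it could be a plain dictionary lookup. The header key "x-special-routing" and the class name HeaderCheck are hypothetical, made up for illustration; substitute whatever header your messages actually carry:

```csharp
using System;
using System.Collections.Generic;

public static class HeaderCheck
{
    // Hypothetical header key, chosen only for illustration;
    // it is not a header that Rebus defines.
    const string SpecialHeaderKey = "x-special-routing";

    // Returns true when the headers contain the special key with a non-null value.
    public static bool MessageHasSpecialHeader(IDictionary<string, object> headers)
    {
        object value;
        return headers != null
            && headers.TryGetValue(SpecialHeaderKey, out value)
            && value != null;
    }
}
```

If the decision depends on the header's value rather than its mere presence, compare `value` against the expected content before returning.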

If, however, you don't know whether the incoming byte[] can be safely deserialized, you need to do something else. It's fairly straightforward to use Rebus' transport implementations without the rest of Rebus, but you'll have to implement the threading model yourself.

The following code is an example (written completely from memory, so you might need to correct some small bits) of what such a worker thread might look like if you're using MSMQ:

volatile bool _keepWorking = true;

public void ThreadLoop() {
    using(var transport = new MsmqMessageQueue("inputQueueName")) {
        while(_keepWorking) {
            using(var scope = new TransactionScope()) 
            using(var context = new AmbientTransactionContext())
            {
                var message = transport.ReceiveMessage(context);
                if (message == null) {
                    Thread.Sleep(1000); //< don't punish queues too much
                    continue;
                }

                var destination = GetDestinationBasedOnWhatever(message);

                // pass the same transaction context so the send enlists in the receive transaction
                transport.Send(destination, message.ToForwardableMessage(), context);

                scope.Complete();
            }
        }
    }
}

You'd need a separate queue for this special kind of content-based router to work though. Do you think that would work for you?