I have a Rebus bus setup with a single worker and max parallelism of 1 that processes messages "sequentialy". In case an handler fails, or for specific business reason, I'd like the bus instance to immediately stop processing messages.
I tried using the Rebus.Event package to detect the exception in the AfterMessageHandled handler and set the number of workers to 0, but it seems other messages are processed before it can actually succeed in stoping the single worker instance.
Where in the event processing pipeline could I do
bus.Advanced.Workers.SetNumberOfWorkers(0);
in order to prevent further message processing?
I also tried setting the number of workers to 0 inside a catch block in the handler itself, but it doesn't seem like the right place to do it since SetNumberOfWorkers(0)
waits for handlers to complete before returning and the caller is the handler... Looks like a some kind of a deadlock to me.
Thank you
This particular situation is a little bit of a dilemma, because – as you've correctly observed –
SetNumberOfWorkers
is a blocking function, which will wait until the desired number of threads has been reached.In your case, since you're setting it to zero, it means your message handler needs to finish before the number of threads has reached zero... and then: ☠
I'm sorry to say this, because I bet your desire to do this is because you're in a pickle somehow – but generally, I must say that wanting to process messages sequentually and in order with message queues is begging for trouble, because there are so many things that can lead to messages being reordered.
But, I think you can solve your problem by installing a transport decorator, which will bypass the real transport when toggled. If the decorator then returns
null
from theReceive
method, it will trigger Rebus' built-in back-off strategy and start chilling (i.e. it will increase the waiting time between polling the transport).Check this out – first, let's create a simple, thread-safe toggle:
(which you'll probably want to wrap up and make pretty somehow, but this should do for now)
and then we'll register it as a singleton in the container (assuming Microsoft DI here):
We'll use the
ProcessMessages
flag to signal whether message processing should be enabled.Now, when you configure Rebus, you decorate the transport and give the decorator access to the toggle instance in the container:
So, now you'll just need to build the decorator:
As you can see, it'll just return
null
whenProcessMessages == false
. Only thing left is to decide when to resume processing messages again, pullMessageHandlingToggle
from the container somehow (probably by having it injected), and then flick thebool
back totrue
.I hope can work for you, or at least give you some inspiration on how you can solve your problem.