Is it possible to use InMemory Transport for competing consumers in Rebus?

1k Views Asked by At

Is it possible to use Rebus InMemory Transport to implement competing consumers, like the sample RabbitScaleout ?

Context: I have a monolith application that I have to break up and scale certain parts. We have to maintain the monolith deployment, while working on container / cloud deployment.

My intention was to refactor, use Rebus for the coordination between the services and use InMemoryTransport for the monolith and RabbitMQ for the containerized services. Would this work ?

The code below results in both handlers being called.

class Program
    {
        static void Main(string[] args)
        {
            using (var activator = new BuiltinHandlerActivator())
            {
                activator.Register(() => new Handler1());
                activator.Register(() => new Handler2());

                Configure.With(activator)
                    .Transport(t => t.UseInMemoryTransport(new InMemNetwork(), "queue"))
                    .Logging( t=> t.None())
                    .Routing( r=> r.TypeBased().Map<string>("queue"))
                    .Start();


                activator.Bus.Send("test");

                Thread.Sleep(1000);
                    
            }
        }
    }


    class Handler1 : IHandleMessages<String>
    {
        public async Task Handle(String message)
        {
            Console.WriteLine("Handler1 " + message);
        }
    }

    class Handler2 : IHandleMessages<String>
    {
        public async Task Handle(string message)
        {
            Console.WriteLine("Handler2 " + message);
        }
    }
1

There are 1 best solutions below

1
On

The in-mem transport can most certainly be used in a competing consumers scenario.

The trick is to pass the same InMemNetwork instance to two bus instances that have the same input queue name – then they will distribute the messages among all of the instances.

// buses will be connected on this network
var network = new InMemNetwork();

using (var activator1 = new BuiltinHandlerActivator())
using (var activator2 = new BuiltinHandlerActivator())
{
    // when we're doing "competing consumers", we should probably execute
    // the same logic – therefore, we register the same handler twice here:
    activator1.Register(() => new Handler1());
    activator2.Register(() => new Handler1());

    // start bus 1
    Configure.With(activator1)
        .Transport(t => t.UseInMemoryTransport(network, "queue"))
        .Logging( t=> t.None())
        .Routing( r=> r.TypeBased().Map<string>("queue"))
        .Start();

    // start bus 2
    Configure.With(activator2)
        .Transport(t => t.UseInMemoryTransport(network, "queue"))
        .Logging( t=> t.None())
        .Routing( r=> r.TypeBased().Map<string>("queue"))
        .Start();

    // always AWAIT here
    await activator1.Bus.Send("test");
    //
    // - if that's not possible, use the sync bus:
    activator1.Bus.Advanced.SyncBus.Send("test");

    Thread.Sleep(1000);
}

While this will work, I'm not sure I will recommend it though – the in-mem transport, by virtue of being in memory, does not provide any kind of durability, so you'll lose all messages in all queues if the InMemNetwork instance is lost (e.g. due to the process restarting, or whatever).

Also, if the intent is to parallelize message processing, I'm not sure you would want to it that way, because the buses would be running in the same process and thus compete for the same resources – so the effect of doing this would simply be another kind of parallelism... and you could more easily parallelize message processing by either

a) increasing parallelism

Configure.With(...)
    .(...)
    .Options(o => o.SetMaxParallelism(100))
    .Start();

(here enabling up to 100 messages to be processed in parallel, if their work is Task-based – typically I/O bound stuff),

or b) increasing the number of worker threads

Configure.With(...)
    .(...)
    .Options(o => o.SetNumberOfWorkers(10))
    .Start();

(which mostly makes sense if your transport does not have a Task-based API) (here configuring Rebus to spawn 10 dedicated threads for message processing).