Multiple vhost connections inside the same project (Subscribe / Publish)

Our requirement is as follows:

  1. Exchange 1 is a topic exchange with queue 1 bound to it. Both are on VHOST 1.
  2. The application subscribes to queue 1 and processes its messages. After processing a message from queue 1, we want to publish the next message to a different exchange on VHOST 2 (a different RabbitMQ connection).

I have the following questions:
a) Is it possible to implement this without federation?
b) Can I maintain two different RabbitMQ connections within the same application?

We are using EasyNetQ as the client to connect to RabbitMQ.

Could you please share a sample of this?

Thanks in advance.

There are 2 best solutions below

a) Yes. You can also create a shovel between the vhosts, which is simpler than federation.

b) Yes. I don't see a problem with creating multiple IBus instances, as long as each bus instance uses its own DI (sub)container, which adds some complexity.
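
A minimal sketch of point b), assuming an EasyNetQ release in which `IBus` exposes `Subscribe` and `Publish` directly (newer releases, as far as I know, group these under `bus.PubSub`, so the calls would need adjusting there). The message types `OrderReceived` and `OrderProcessed`, the subscription id, the host, the vhost names and the credentials are placeholders, not taken from the question. The typed API shown here declares exchanges and queues using EasyNetQ's own naming conventions, so consuming from an existing queue on a topic exchange would need the advanced API instead.

```csharp
using System;
using EasyNetQ;

// Hypothetical message types, for illustration only.
public class OrderReceived { public string OrderId { get; set; } }
public class OrderProcessed { public string OrderId { get; set; } }

public static class Program
{
    public static void Main()
    {
        // One IBus per virtual host. Connection details are placeholders.
        using (var sourceBus = RabbitHutch.CreateBus(
            "host=localhost;virtualHost=vhost1;username=guest;password=guest"))
        using (var targetBus = RabbitHutch.CreateBus(
            "host=localhost;virtualHost=vhost2;username=guest;password=guest"))
        {
            // Consume on VHOST 1, then publish the follow-up message on VHOST 2.
            sourceBus.Subscribe<OrderReceived>("order_processor", message =>
            {
                // ... process the incoming message here ...
                targetBus.Publish(new OrderProcessed { OrderId = message.OrderId });
            });

            Console.WriteLine("Listening. Press Enter to exit.");
            Console.ReadLine();
        }
    }
}
```

Both buses live for the lifetime of the application and are disposed on shutdown; the handler simply captures the second bus, so no federation or shovel is involved.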

Here is how I handle multiple connections. I couldn't find a solution built into EasyNetQ. I don't use the default DI adapters for MS DI; I use only the advanced API and inject the services I need manually. So far it seems to work, but it definitely needs more testing.

In Startup.cs, inside ConfigureServices:

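            services.AddBusStation(busStationBuilder =>
            {
                // Consumers inject IBusStation and look up a bus by its configured name.
                foreach (var c in appSettings.RabbitMQSettings.Connections)
                {
                    var taskQueueBus = RabbitHutch.CreateBus(c.ConnectionString, CustomServiceRegister.ServiceRegisterAction());
                    foreach (var e in c.Exchanges)
                    {
                        // ConfigureServices is synchronous, so block until each exchange is declared.
                        // An async lambda passed to List.ForEach would be fire-and-forget (async void).
                        taskQueueBus.Advanced.ExchangeDeclareAsync(e.Name, e.Type, e.Durable, e.AutoDelete)
                            .GetAwaiter().GetResult();
                    }
                    busStationBuilder.Add(c.Name, taskQueueBus.Advanced);
                    // Called once per connection, so the last configured connection becomes the default bus.
                    busStationBuilder.AddDefaultBus(taskQueueBus);
                }
            });
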
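    // Namespaces needed by the types below.
    using System;
    using System.Collections.Generic;
    using EasyNetQ;
    using Microsoft.Extensions.DependencyInjection;

    // Registry of named buses, one per configured connection.
    public interface IBusStation
    {
        IBus DefaultBus { get; }
        IAdvancedBus Get(string busName);
        void Add(string busName, IAdvancedBus advancedBus);
        void Add(IBus bus);
    }

    public class BusStation : IBusStation
    {
        private Dictionary<string, IAdvancedBus> BusList { get; set; } = new Dictionary<string, IAdvancedBus>();
        public IBus DefaultBus { get; private set; }

        // Returns the bus registered under busName, or null if there is none.
        public IAdvancedBus Get(string busName)
        {
            if (BusList.TryGetValue(busName, out IAdvancedBus advancedBus))
            {
                return advancedBus;
            }
            return null;
        }

        // Throws ArgumentException if busName is already registered.
        public void Add(string busName, IAdvancedBus advancedBus)
        {
            BusList.Add(busName, advancedBus);
        }

        // Sets (or replaces) the default bus.
        public void Add(IBus bus)
        {
            this.DefaultBus = bus;
        }
    }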

    public class BusStationBuilder
    {
        private readonly IBusStation _BusStation;

        public BusStationBuilder(IServiceCollection services, IBusStation busStation)
        {
            this._BusStation = busStation;
            services.AddSingleton(busStation);
        }
        public BusStationBuilder Add(string busName, IAdvancedBus advancedBus)
        {
            _BusStation.Add(busName, advancedBus);
            return this;
        }
        public BusStationBuilder AddDefaultBus(IBus bus)
        {
            _BusStation.Add(bus);
            return this;
        }
    }

    public static class DependencyExtensions
    {
        public static IServiceCollection AddBusStation(this IServiceCollection services, Action<BusStationBuilder> builder)
        {
            var busStationBuilder = new BusStationBuilder(services, new BusStation());
            builder(busStationBuilder);
            return services;
        }
    }

appsettings.json


    "RabbitMQSettings": {
        "DefaultQueue": "task.main",
        "Connections": [
            {
                "Name": "Task_Queue",
                "ConnectionString": "host=192.168.123.123;virtualHost=/;username=admin;password=password123;prefetchCount=1;persistentMessages=true;publisherConfirms=true",
                "Exchanges": [
                    {
                        "Name": "Direct_Task_Queue",
                        "Type": "direct",
                        "Passive": false,
                        "Durable": true,
                        "AutoDelete": false,
                        "Internal": false,
                        "AlternateExchange": null,
                        "Delayed": false
                    }
                ]
            }
        ]
    },