RabbitMQ.Client Consume and send to another queue


I am trying to consume RabbitMQ messages in C# using the RabbitMQ.Client library. I want to collect the messages in a list, return that list, then process it and send the results to a new queue. My first approach is below:

public async Task<List<string>> ProcessMessages()
{
    var factory = new ConnectionFactory { HostName = "local" };
    using var connection = factory.CreateConnection();
    using var channel = connection.CreateModel();

    channel.QueueDeclare(queue: "myqueue");

    var consumer = new EventingBasicConsumer(channel);

    var messages = new List<string>();

    consumer.Received += async (model, ea) =>
    {
        var body = ea.Body.ToString();

        // Do something with the body...

        messages.Add(body);
    };

    channel.BasicConsume(queue: "myqueue", autoAck: acknowledge, consumer: consumer);

    return messages;
}

The returned messages list is always empty. Could anyone help me understand why?

The other approach I tried is to publish to the new queue from within the Received handler:

public async Task ProcessMessages()
{
    var factory = new ConnectionFactory { HostName = "local" };
    using var connection = factory.CreateConnection();
    using var channel = connection.CreateModel();

    channel.QueueDeclare(queue: "myqueue");

    var consumer = new EventingBasicConsumer(channel);

    consumer.Received += (model, ea) =>
    {
        var body = ea.Body.ToString();
        using var channel = connection.CreateModel();

        channel.QueueDeclare(queue: "NewQueue1",
                                durable: true,
                                exclusive: false,
                                autoDelete: false,
                                arguments: null);


        // Do some processing of the message and send the new message to the new queue
        var newMsgBytes = new byte[0]; // placeholder for the processed message bytes

        channel.BasicPublish(exchange: string.Empty,
                             routingKey: "NewQueue1",
                             basicProperties: null,
                             body: newMsgBytes);

    };

    channel.BasicConsume(queue: "myqueue", autoAck: acknowledge, consumer: consumer);

}

This does not seem to work either: the new messages are not sent to the new queue. Any suggestions, please? Thanks.

Sarvesh Mishra On

Try reading the body this way: instead of ea.Body.ToString(), call ea.Body.ToArray() and decode the bytes.

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
    var body = ea.Body.ToArray();
    var message = Encoding.UTF8.GetString(body);
    Console.WriteLine($" [x] Received {message}");
};

More details are in the RabbitMQ .NET tutorial: https://www.rabbitmq.com/tutorials/tutorial-one-dotnet.html
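The difference between the two calls can be seen without a broker. The sketch below assumes RabbitMQ.Client 6.x, where ea.Body is a ReadOnlyMemory<byte>; the body variable and the BodyDecodingDemo class are made up for this demo and only simulate a delivered payload.

```csharp
using System;
using System.Text;

public static class BodyDecodingDemo
{
    // Decodes a message body the way the answer above suggests:
    // copy the bytes out, then interpret them as UTF-8 text.
    public static string Decode(ReadOnlyMemory<byte> body) =>
        Encoding.UTF8.GetString(body.ToArray());

    public static void Main()
    {
        // Simulated payload standing in for ea.Body (an assumption for this demo).
        ReadOnlyMemory<byte> body = Encoding.UTF8.GetBytes("hello");

        // ToString() describes the memory object; it does not decode the bytes.
        Console.WriteLine(body.ToString());

        // Copying the bytes out and decoding them yields the original text.
        Console.WriteLine(Decode(body));
    }
}
```

This only holds if the producer encoded the message as UTF-8; a different encoding on the sending side needs the matching decoder here.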

Spikeh On

In your first example you are not adding anything to the messages list (which is being returned), only receivedQMessages. Unless you are modifying messages elsewhere, that might be why you're not seeing anything "in the queue".

Secondly, note that if you set autoAck to true, the message will be deleted once you have received it.

sep7696 On

You should convert the eventArgs Body to an array. This is a simple consumer:

var factory = new ConnectionFactory
{
    HostName = "localhost"
};
// Create the RabbitMQ connection using the connection factory details
var connection = factory.CreateConnection();

var channel = connection.CreateModel();
// Declare the queue, giving its name and a few related properties
channel.QueueDeclare("myqueue", exclusive: false);
// Set up the event object that listens on the channel for messages sent by the producer
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, eventArgs) =>
{
    var body = eventArgs.Body.ToArray();
    var message = Encoding.UTF8.GetString(body);
    Console.WriteLine($"myqueue message received: {message}");
};
// Start reading messages
channel.BasicConsume(queue: "myqueue", autoAck: true, consumer: consumer);

R.Abbasi On

The Received handler you assign to the consumer runs on another thread, so its results must be shared safely between that thread and whatever processes them. You cannot simply return the result, because you don't know when the connection will be established and when the consumer will start receiving; the method returns before any message arrives. Instead, use a thread-safe collection such as ConcurrentBag: create a static ConcurrentBag property, add your messages to it in the handler, and process it in a BackgroundService.
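The hand-off between the handler thread and a background worker can be sketched without a broker. This example swaps ConcurrentBag for BlockingCollection<string>, which preserves arrival order by default and lets the worker block until a message shows up. The HandoffDemo class, the simulated receiver task and the upper-casing "processing" step are all assumptions for illustration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class HandoffDemo
{
    // Runs a simulated receive callback on one task and a processor on another,
    // then returns what the processor produced.
    public static List<string> Run(IEnumerable<string> incoming)
    {
        using var buffer = new BlockingCollection<string>(); // FIFO by default
        var processed = new List<string>();

        // Stand-in for the Received handler, which runs on a library thread.
        var receiver = Task.Run(() =>
        {
            foreach (var message in incoming)
                buffer.Add(message);
            buffer.CompleteAdding(); // a real consumer would call this on shutdown
        });

        // Stand-in for the background worker that processes messages.
        var processor = Task.Run(() =>
        {
            foreach (var message in buffer.GetConsumingEnumerable())
                processed.Add(message.ToUpperInvariant()); // placeholder processing
        });

        Task.WaitAll(receiver, processor);
        return processed;
    }

    public static void Main()
    {
        foreach (var item in Run(new[] { "a", "b", "c" }))
            Console.WriteLine(item);
    }
}
```

In a real service the receiver side would be the consumer.Received handler and the processor loop would live in the BackgroundService, with the connection kept open for the lifetime of the service.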

The caveat is that acknowledgment and processing do not happen transactionally. With automatic acknowledgment, a message can be collected in the ConcurrentBag (and deleted from the queue) and then lost if the processing phase fails.
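One way to narrow the gap between acknowledgment and processing is to turn off automatic acknowledgment and acknowledge each message only after it has been forwarded. The sketch below assumes RabbitMQ.Client 6.x (IModel API), a broker on localhost, and the queue names from the question. The ForwardingConsumer class and the upper-casing step are placeholders, and the QueueDeclare arguments must match however the queues were originally declared.

```csharp
using System;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

public static class ForwardingConsumer
{
    public static void Main()
    {
        var factory = new ConnectionFactory { HostName = "localhost" };
        using var connection = factory.CreateConnection();
        using var channel = connection.CreateModel();

        // Declaration arguments are assumptions; they must match any existing queue.
        channel.QueueDeclare(queue: "myqueue", durable: false, exclusive: false,
                             autoDelete: false, arguments: null);
        channel.QueueDeclare(queue: "NewQueue1", durable: true, exclusive: false,
                             autoDelete: false, arguments: null);

        var consumer = new EventingBasicConsumer(channel);
        consumer.Received += (model, ea) =>
        {
            try
            {
                var text = Encoding.UTF8.GetString(ea.Body.ToArray());
                var forwarded = Encoding.UTF8.GetBytes(text.ToUpperInvariant()); // placeholder processing

                channel.BasicPublish(exchange: string.Empty, routingKey: "NewQueue1",
                                     basicProperties: null, body: forwarded);

                // Acknowledge only after the forward succeeded.
                channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
            }
            catch (Exception)
            {
                // Put the message back on the queue so it can be retried.
                channel.BasicNack(deliveryTag: ea.DeliveryTag, multiple: false, requeue: true);
            }
        };

        channel.BasicConsume(queue: "myqueue", autoAck: false, consumer: consumer);

        // Keep the connection open; returning here would dispose it and stop consumption.
        Console.WriteLine("Press Enter to stop.");
        Console.ReadLine();
    }
}
```

Publishing and acknowledging are still two separate steps, so a crash between them can deliver a message twice. The downstream consumer should tolerate duplicates.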