Duplicate messages from "reply-to" queues to another queue


I'm using RabbitMQ mainly in RPC mode, but I also want to copy both request and response messages to another queue.

In the end, what I want is for an external consumer to be able to see all the traffic by listening to a single queue; let's call it the "logging queue".

Copying the incoming (request) messages is easy: I just need to use a fanout exchange, or bind my logging queue to the exchange used by the RPC call with the same routing key.

But I can't manage to find a way to "fanout" messages sent via the direct reply-to feature.

As far as I understand, the response messages are sent to the default direct exchange with a generated routing key of the form amq.rabbitmq.reply-to.generatedName and, since the bindings of the default exchange cannot be changed, I can't duplicate those messages.

Do you know any way to do it?

I have one solution, which I would rather avoid: having the client re-send each response it receives via reply-to to the "logging queue".

But that makes my client responsible for this "logging" feature, and I would rather not.

By the way, even though I don't think it's relevant, because this is probably a RabbitMQ server configuration problem, I use the Spring AMQP client.


There are 2 best solutions below

Gary Russell On

You can't do it with direct reply-to because there is no real queue or exchange behind it.

You can, however, configure each RabbitTemplate to use a fixed reply queue and a reply container to direct replies from that queue to the template.

This is described in the Spring AMQP reference documentation on request/reply messaging.

Furthermore, when using this mechanism, you can set the template's replyAddress in the form exchange/routingKey, so replies pass through a real exchange to which a logging queue can also be bound.

/**
 * An address for replies; if not provided, a temporary exclusive, auto-delete queue will
 * be used for each reply, unless RabbitMQ supports 'amq.rabbitmq.reply-to' - see
 * https://www.rabbitmq.com/direct-reply-to.html
 * <p>The address can be a simple queue name (in which case the reply will be routed via the default
 * exchange), or with the form {@code exchange/routingKey} to route the reply using an explicit
 * exchange and routing key.
 * @param replyAddress the replyAddress to set
 */
public synchronized void setReplyAddress(String replyAddress) {...}
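
To make the two address forms in that Javadoc concrete, here is a minimal standalone sketch of how such a string can be split. It is an illustration only, not Spring AMQP's actual parsing code; the class name ReplyAddressSketch and the choice to split at the first '/' are assumptions made for this example.

```java
// Illustrative only: a simplified model of the two replyAddress forms.
// ReplyAddressSketch is a hypothetical class, not part of Spring AMQP.
final class ReplyAddressSketch {

    /** Exchange name; the empty string stands for the default exchange. */
    final String exchange;

    /** Routing key; for the simple form this is the queue name itself. */
    final String routingKey;

    private ReplyAddressSketch(String exchange, String routingKey) {
        this.exchange = exchange;
        this.routingKey = routingKey;
    }

    /** Splits "queueName" or "exchange/routingKey" into its two parts. */
    static ReplyAddressSketch parse(String replyAddress) {
        int slash = replyAddress.indexOf('/');
        if (slash < 0) {
            // Simple queue name: routed via the default exchange.
            return new ReplyAddressSketch("", replyAddress);
        }
        // Explicit exchange and routing key.
        return new ReplyAddressSketch(
                replyAddress.substring(0, slash),
                replyAddress.substring(slash + 1));
    }
}
```

With the second form, the reply goes through a named exchange, which is what makes an additional binding for a logging queue possible.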

You simply set up the template and container as normal, making the template the container's listener...

@Bean
public RabbitTemplate amqpTemplate() {
    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
    rabbitTemplate.setMessageConverter(msgConv());
    rabbitTemplate.setReplyAddress(replyQueue().getName());
    rabbitTemplate.setReplyTimeout(60000);
    rabbitTemplate.setUseDirectReplyToContainer(false);
    return rabbitTemplate;
}

@Bean
public SimpleMessageListenerContainer replyListenerContainer() {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory());
    container.setQueues(replyQueue());
    container.setMessageListener(amqpTemplate());
    return container;
}

Again, this is from the documentation.
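
To tie this back to the logging queue, the following is a rough sketch of the extra declarations that would let replies be copied. The exchange, queue, and routing key names are all hypothetical, and it assumes a RabbitAdmin is present in the context (Spring Boot auto-configures one) so that the beans are declared on the broker.

```java
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ReplyLoggingConfig {

    // Hypothetical names, chosen for this sketch only.
    static final String REPLY_EXCHANGE = "rpc.replies";
    static final String REPLY_ROUTING_KEY = "reply";

    // The template would use this value instead of a plain queue name:
    // rabbitTemplate.setReplyAddress(REPLY_ADDRESS);
    static final String REPLY_ADDRESS = REPLY_EXCHANGE + "/" + REPLY_ROUTING_KEY;

    @Bean
    public FanoutExchange replyExchange() {
        // A fanout exchange ignores the routing key and copies to every bound queue.
        return new FanoutExchange(REPLY_EXCHANGE);
    }

    @Bean
    public Queue replyQueue() {
        // The queue the reply listener container consumes from.
        return new Queue("rpc.replies.client");
    }

    @Bean
    public Queue loggingQueue() {
        // The queue an external consumer reads to observe the traffic.
        return new Queue("rpc.logging");
    }

    @Bean
    public Binding replyBinding() {
        return BindingBuilder.bind(replyQueue()).to(replyExchange());
    }

    @Bean
    public Binding loggingBinding() {
        return BindingBuilder.bind(loggingQueue()).to(replyExchange());
    }
}
```

The reply container keeps listening on replyQueue() while every reply is also copied to the logging queue. Because a fanout exchange delivers each message to all bound queues, this layout suits a single template; several templates sharing one exchange would need a direct or topic exchange with distinct routing keys.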

minioim On

I have found a way to achieve what I wanted.

It is not as flexible as the approach Gary Russell proposed: https://stackoverflow.com/a/59976806/2546702

However, I can use the firehose feature and bind a queue (or an exchange, for more control) to the amq.rabbitmq.trace exchange with the routing key set to "publish." (the trailing dot is important: the key has the form publish.exchangeName, and the default exchange's name is empty).

That makes it possible to log messages published to the default exchange, including reply-to messages.
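
As a small illustration of the firehose routing keys, the sketch below builds the binding keys in plain Java. The class and method names are hypothetical; the key formats (publish.exchangeName and deliver.queueName) and the amq.rabbitmq.trace exchange name follow the RabbitMQ firehose documentation.

```java
// Hypothetical helper that builds firehose binding keys; not part of any library.
final class FirehoseKeys {

    /** The topic exchange that the firehose tracer publishes to. */
    static final String TRACE_EXCHANGE = "amq.rabbitmq.trace";

    /** The default exchange is identified by the empty string. */
    static final String DEFAULT_EXCHANGE = "";

    /** Key for messages published to the given exchange. */
    static String publishKey(String exchangeName) {
        return "publish." + exchangeName;
    }

    /** Key for messages delivered from the given queue. */
    static String deliverKey(String queueName) {
        return "deliver." + queueName;
    }
}
```

Calling FirehoseKeys.publishKey(FirehoseKeys.DEFAULT_EXCHANGE) yields "publish.", which is why the trailing dot matters. Since amq.rabbitmq.trace is a topic exchange, a binding key of "publish.#" would capture publishes to every exchange instead. Tracing has to be switched on first, for example with rabbitmqctl trace_on.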

Of course, using the firehose has a performance impact, but in my case that is not a big deal since RabbitMQ is underused.

Since I have 16 queues to listen to, I'd rather not use a different template for each queue. I could use a single reply queue for all RPC queues, but that would be a bottleneck.

So, unless there is a hard no-go, the firehose seems like a good option.