Basing my architecture off of this example:
Setup:
- Workers get one message at a time
- each Worker downloads a document, takes a few seconds
- once a Worker successfully downloads a doc, it acknowledges the message
- if a worker fails to download a document, it noAcks the message for re-queueing (max of three re-tries)
I'm looking into bottlenecks in my implementation that's causing slowdowns. Since I use noAck to re-queue failed workers. To enable this evenly across my worker threads I've set prefetch to 1.Looking at this question: RabbitMQ work queue is blocking consumers - They are seeing what I see in the below screenshots:
To make sure workers are only assigned a message at a time, I need the pre-fetch set to 1, but others say this causes workers to work sequentially rather than in parallel.
What does Running actually mean at the channel level? I see the queue and connection are running properly, but individual channels (one per thread) are idling.
EDIT #1: This note about passing a connection pool to the RabbitMQ connection looks promising. https://www.rabbitmq.com/api-guide.html#consumer-thread-pool I'm using Spring AMQP but I think a similar approach can be used here:
/**
* Configure a large thread pool for concurrent channels on the physical Connection
*/
@Bean
public org.springframework.amqp.rabbit.connection.CachingConnectionFactory rabbitConnectionFactory() {
logger.info("Configuring connection factory");
CachingConnectionFactory cf = new CachingConnectionFactory();
cf.setAddresses(this.rabbitMQProperties.getAddresses());
cf.setUsername(this.rabbitMQProperties.getUsername());
cf.setPassword(this.rabbitMQProperties.getPassword());
cf.setVirtualHost(this.rabbitMQProperties.getVirtualHost());
//configure a large thread pool for the connection thread
int threads = 30;
logger.info(String.format("Configuring thread pool with %d threads", threads));
ExecutorService connectionPool = Executors.newFixedThreadPool(threads);
cf.setExecutor(connectionPool);
logger.info(String.format("MQ cache mode: %s", cf.getCacheMode().toString()));
logger.info(String.format("MQ connection cache: %d", cf.getConnectionCacheSize()));
logger.info(String.format("MQ channel cache: %d", cf.getChannelCacheSize()));
return cf;
}
@Bean
AmqpTemplate rabbitTemplate(org.springframework.amqp.rabbit.connection.CachingConnectionFactory connectionFactory){
AmqpTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
return rabbitTemplate;
}
Apparantly in Spring AMQP the default thread pool for a single physical TCP/IP connection is defaulted to 5 threads:
Spring AMQP
I was able to replicate this by changing the number of threads assigned to RabbitMQ's connection pool:
In the above code I have the number of threads per connection mirroring the number of virtual channels i.e. one real physical thread for each virtual RabbitMQ channel as each channel references a worker thread processing one message each. This results in the channels no longer blocking on the default 5 connections and instead taking full advantage of the expanded number of threads:
Manipulating the number of threads available to RabbitMQ's connection will show channels blocking. Setting to 10 threads and opening 50 channels for example.