I am building a microservices product that uses RabbitMQ as the connector between task creators and worker nodes. The task creators (producers) push tasks onto a single RabbitMQ queue. Multiple consumers should process these time-consuming jobs (15+ minutes) in parallel, with each task assigned to exactly one consumer and each consumer processing one job at a time.
The jobs finish within the default 30-minute delivery acknowledgement timeout, but the consumer's connection still gets closed because of missed heartbeats from the client (default heartbeat timeout of 60s). What is the difference between these two timeouts? Is there a way for my consumer to process the long-running job without consuming new tasks in the meantime, while continuing to send heartbeats so that the connection is not closed, and without having to increase the heartbeat timeout?
What is the recommended way to use RabbitMQ to queue and process time-consuming jobs in parallel?
Producer code (javascript):
var amqp = require('amqplib/callback_api');

amqp.connect('amqp://localhost', function(error0, connection) {
    if (error0) {
        throw error0;
    }
    connection.createChannel(function(error1, channel) {
        if (error1) {
            throw error1;
        }
        var queue = "task_queue";
        var msg = `Hello World ${Math.random()}`;

        // Durable queue plus persistent messages, so tasks survive a broker restart
        channel.assertQueue(queue, {
            durable: true,
        });
        channel.sendToQueue(queue, Buffer.from(msg), {
            persistent: true,
        });
        console.log(" [x] Sent %s", msg);
    });
    // Give the message time to be sent before closing the connection
    setTimeout(function() {
        connection.close();
        process.exit(0);
    }, 500);
});
Consumer code (Python):
import pika, sys, os, time

def main():
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()
    channel.queue_declare(queue="task_queue", durable=True)

    def callback(ch, method, properties, body):
        print(f" [x] Received {body}")
        # Simulates the long-running job; this blocks the connection's thread
        time.sleep(15*60)
        print(f" [x] Finished {body}")
        ch.basic_ack(delivery_tag=method.delivery_tag)

    # Deliver at most one unacknowledged message to this consumer at a time
    channel.basic_qos(prefetch_count=1)
    channel.basic_consume(queue="task_queue",
                          on_message_callback=callback)
    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()

if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        print("Interrupted")
        try:
            sys.exit(0)
        except SystemExit:
            os._exit(0)
Result in RabbitMQ console:
[error] <0.4929.0> missed heartbeats from client, timeout: 60s
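For reference, here is a minimal sketch of one approach that is often suggested for this situation. It is not a definitive answer. The idea is to run the job on a worker thread so that the thread owning the `BlockingConnection` stays free to service heartbeats, and to hand the ack back to the connection's thread with `add_callback_threadsafe`, since pika channels are not thread-safe. With `prefetch_count=1` and manual acks, the broker should not deliver a second task until the first is acked. The helper names (`ack_message`, `do_work`, `on_message`) and the `job`/`seconds` parameters are hypothetical and exist only for illustration.

```python
import functools
import threading
import time


def ack_message(ch, delivery_tag):
    """Runs on the connection's thread; acks only if the channel is still open."""
    if ch.is_open:
        ch.basic_ack(delivery_tag=delivery_tag)


def do_work(conn, ch, delivery_tag, body, job=time.sleep, seconds=15 * 60):
    """Runs on a worker thread, so the connection thread can keep sending heartbeats."""
    print(f" [x] Received {body}")
    job(seconds)  # stand-in for the real 15+ minute task (assumption)
    print(f" [x] Finished {body}")
    # Channels are not thread-safe: schedule the ack on the connection's own thread
    conn.add_callback_threadsafe(functools.partial(ack_message, ch, delivery_tag))


def on_message(ch, method, properties, body, conn, threads, **work_kwargs):
    """Consumer callback: start the job on a new thread and return immediately."""
    t = threading.Thread(
        target=do_work,
        args=(conn, ch, method.delivery_tag, body),
        kwargs=work_kwargs,
    )
    t.start()
    threads.append(t)


def main():
    import pika  # imported here so the helpers above can be tried without a broker

    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()
    channel.queue_declare(queue="task_queue", durable=True)
    # One unacked message at a time, so each consumer works on a single job
    channel.basic_qos(prefetch_count=1)

    threads = []
    channel.basic_consume(
        queue="task_queue",
        on_message_callback=functools.partial(on_message, conn=connection, threads=threads),
    )
    print(' [*] Waiting for messages. To exit press CTRL+C')
    try:
        channel.start_consuming()
    except KeyboardInterrupt:
        channel.stop_consuming()

    # Wait for any in-flight job before closing the connection
    for t in threads:
        t.join()
    connection.close()
```

Calling `main()` starts the consumer. The producer and queue settings are unchanged, and the heartbeat timeout stays at its default because the long job no longer blocks the connection's I/O loop.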