pika with celery, connection closed


I'm using Celery with RabbitMQ to run some tasks. Sometimes I need to send a message from the workers back to RabbitMQ, so I'm using pika.

I'm currently using BlockingConnection() to connect to RabbitMQ, but after a while I get a "Connection Lost" exception.

I believe this happens because Celery is asynchronous and I'm using BlockingConnection().

This is my code:

import pika

# RABBITMQ_OUT_NAME (the output queue name) is defined elsewhere.
class RabbitConnection(object):
    def __init__(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
        self.channel = self.connection.channel()
        self.channel.queue_declare(queue=RABBITMQ_OUT_NAME, durable=True)
        # Enable publisher confirms on this channel.
        self.channel.confirm_delivery()

    def add_alert(self, new_alert):
        message = new_alert.to_json()
        delivered = self.channel.basic_publish(exchange='',
                                               routing_key=RABBITMQ_OUT_NAME,
                                               body=message,
                                               properties=pika.BasicProperties(
                                                   delivery_mode=2,  # persistent message
                                                   content_type='application/json',
                                               ))
        return delivered

Should I use a different connection? If so, how should I use it?


Best answer

It sounds like this could be a threading issue. You can handle requests with Pika over multiple threads, but ideally you should have one connection per thread, or use locking. Rather than adding complexity to your code, I would recommend a thread-safe library such as amqp-storm or rabbitpy.

If you implemented this with my AMQP-Storm library, the code would look something like this:

import amqpstorm

class RabbitConnection(object):
    def __init__(self):
        self.connection = amqpstorm.Connection('localhost', 'guest', 'guest')
        self.channel = self.connection.channel()
        self.channel.queue.declare(queue=RABBITMQ_OUT_NAME, durable=True)
        self.channel.confirm_deliveries()

    def add_alert(self, new_alert):
        message = new_alert.to_json()
        delivered = self.channel.basic.publish(exchange='',
                                               routing_key=RABBITMQ_OUT_NAME,
                                               body=message,
                                               properties={
                                                   'delivery_mode': 2,  # persistent message
                                                   'content_type': 'application/json',
                                               })
        return delivered
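
For completeness, the "one connection per thread" option mentioned above could look roughly like the sketch below. It is only an illustration, not part of AMQP-Storm or pika: `PerThreadResource` and its `factory` argument are hypothetical names, and the pika wiring is left as a comment because it needs a running broker.

```python
import threading


class PerThreadResource:
    """Lazily create one resource (for example a connection) per thread."""

    def __init__(self, factory):
        # factory: zero-argument callable that opens a new connection.
        self._factory = factory
        self._local = threading.local()

    def get(self):
        # Attributes on a threading.local object are separate for each
        # thread, so the first call in a thread creates that thread's own
        # resource and later calls in the same thread reuse it.
        resource = getattr(self._local, "resource", None)
        if resource is None:
            resource = self._factory()
            self._local.resource = resource
        return resource


# Hypothetical wiring with pika (commented out: requires RabbitMQ):
#
#   import pika
#   connections = PerThreadResource(
#       lambda: pika.BlockingConnection(
#           pika.ConnectionParameters(host='localhost')))
#   channel = connections.get().channel()
```

This avoids sharing a single BlockingConnection between threads. The alternative of guarding one shared connection with a `threading.Lock` keeps fewer connections open, at the cost of serializing every publish.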

Second answer

If you're willing to sacrifice ~5ms of latency per request, you can open a new connection for each message and close it afterwards:

import pika

class PikaClient:
    def __init__(self):
        self.connection = None
        self.channel = None

    def __enter__(self):
        connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
        channel = connection.channel()
        channel.queue_declare(queue='myqueue')
        self.connection = connection
        self.channel = channel
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Runs when the with-block ends, even if it raised.
        self.connection.close()

    def messageSent(self, msgStr):
        self.channel.basic_publish(exchange='', routing_key='myqueue', body=msgStr)

and then when you want to send a message:

with PikaClient() as pClient: 
    pClient.messageSent("my message")

Depending on your application, 5ms of latency may be a price worth paying. If your app becomes successful you'll probably want to rewrite it anyway, and then you can use a language with better multithreading features, such as Java.
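
The ~5ms figure is an estimate and will vary with the broker and network, so it may be worth measuring in your own setup. A minimal timing sketch follows; `average_seconds` and `send_once` are hypothetical helper names, and the part that talks to RabbitMQ is commented out because it needs a running broker.

```python
import time


def average_seconds(action, repeats=20):
    """Return the mean wall-clock time of calling action() `repeats` times."""
    start = time.perf_counter()
    for _ in range(repeats):
        action()
    return (time.perf_counter() - start) / repeats


# Hypothetical usage with the PikaClient class above (requires RabbitMQ):
#
#   def send_once():
#       with PikaClient() as client:
#           client.messageSent("ping")
#
#   print("%.1f ms per message" % (average_seconds(send_once) * 1000))
```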