Consume RabbitMQ messages using Pika and push them to clients using Socket.IO

764 Views Asked by At

I am building a service that receives messages from RabbitMQ using pika and pushes them to clients using socket.io.

The socket.io server and the pika consumer both block the main thread.
The same problem also applies to Celery with Flask or Django.

What is the proper approach to solving this and running them both under the same context?

1

There is 1 best solution below

0
Chandan On

You can use the Pub/Sub model: start the consumer in a separate thread, register the clients that want to receive messages from the queue, and send each incoming message to the subscribed clients.

import json
import threading
import time

import gevent
import pika
from flask import Flask
from flask_sockets import Sockets

# Name of the RabbitMQ queue handed to PubSubListener when it is created.
channel_queue = 'test'

# Host given to pika.ConnectionParameters when the listener connects.
connection_url = 'localhost'

class PubSubListener(threading.Thread):
    """Thread that consumes one RabbitMQ queue and fans messages out to clients.

    Clients are arbitrary objects exposing a ``send(data)`` method. They are
    registered with :meth:`subscribe` and dropped automatically the first
    time sending to them raises.

    NOTE(review): ``publish`` and the consumer loop in ``run`` share one
    channel. If ``publish`` is called from a thread other than this one,
    confirm against the pika documentation that this is safe for a
    ``BlockingConnection``.
    """

    def __init__(self, queue_name):
        """Connect to RabbitMQ, declare ``queue_name`` and register the consumer.

        Args:
            queue_name: Name of the queue to declare, consume from and
                publish to.
        """
        super().__init__()

        self.clients = []
        self.queue_name = queue_name

        connection = pika.BlockingConnection(pika.ConnectionParameters(connection_url))
        self.channel = connection.channel()
        self.channel.queue_declare(queue=self.queue_name)

        # Register the consumer directly. The previous code wrapped this call
        # in a threading.Thread(target=...) that was never started: the call
        # ran immediately anyway and its return value became the target of a
        # thread object that was thrown away.
        self.channel.basic_consume(
            queue=self.queue_name,
            auto_ack=True,
            on_message_callback=self._callback,
        )

    def run(self):
        """Thread body: consume messages on the channel."""
        self.channel.start_consuming()

    def publish(self, body):
        """Publish ``body`` to this listener's queue via the default exchange.

        Args:
            body: Message payload passed through to ``basic_publish``.
        """
        self.channel.basic_publish(exchange='',
            routing_key=self.queue_name,
            body=body)

    def subscribe(self, client):
        """Register ``client`` to receive every message delivered from now on.

        Args:
            client: Object with a ``send(data)`` method.
        """
        self.clients.append(client)

    def _callback(self, channel, method, properties, body):
        """Decode one delivered message as JSON and forward it to all clients.

        A body that is not valid JSON is reported and skipped, so that one
        bad message does not raise out of the consumer callback.
        """
        # NOTE(review): purpose of this short pause is not visible here;
        # presumably it yields to other threads - confirm before removing.
        time.sleep(0.001)
        try:
            message = json.loads(body)
        except ValueError as exc:
            # json.JSONDecodeError and UnicodeDecodeError are both ValueErrors.
            print("Skipping message that is not valid JSON: %s" % exc)
            return
        print(message)
        self.send(message)

    def send(self, data):
        """Send ``data`` to every subscribed client, dropping those that fail.

        Args:
            data: Value passed unchanged to each client's ``send`` method.
        """
        # Iterate over a snapshot: removing from the list being iterated
        # makes the loop skip the client that follows a failed one.
        for client in list(self.clients):
            try:
                client.send(data)
            except Exception:
                # Client type is unknown here, so any failure is treated as
                # "client is gone" and the client is unsubscribed.
                try:
                    self.clients.remove(client)
                except ValueError:
                    pass  # already removed by someone else

# HTTP application plus the flask_sockets wrapper used to declare
# WebSocket routes on it.
app = Flask(__name__)
sockets = Sockets(app)

# Single shared listener. Constructing it connects to RabbitMQ right away;
# consuming only begins once start() is called on it.
pslistener = PubSubListener(channel_queue)

@sockets.route('/echo')
def echo_socket(ws):
    """Subscribe a websocket to the queue listener for as long as it is open.

    Args:
        ws: The websocket object passed in for the '/echo' route.
    """
    pslistener.subscribe(ws)
    try:
        # Park this handler while the socket is open; messages are pushed
        # to ``ws`` by ``pslistener.send``, not from here.
        while not ws.closed:
            gevent.sleep(0.1)
    finally:
        # Unsubscribe on the way out so closed sockets do not pile up in the
        # client list waiting for a failed send to evict them.
        try:
            pslistener.clients.remove(ws)
        except ValueError:
            pass  # already dropped by pslistener.send

@app.route('/')
def hello():
    """Return the greeting text served at the root URL."""
    greeting = 'Hello World!'
    return greeting


if __name__ == "__main__":
    from gevent.pywsgi import WSGIServer
    from geventwebsocket.handler import WebSocketHandler

    # Start the queue listener thread before the web server is created
    # and run from this thread.
    pslistener.start()
    print("Started")

    # Empty host string and port 5000, with the websocket-aware handler
    # class so the '/echo' route can be served.
    listen_address = ('', 5000)
    server = WSGIServer(listen_address, app, handler_class=WebSocketHandler)
    server.serve_forever()