Multiple workers consume same message from RabbitMQ queue

4.2k Views Asked by At

I use py-amqp module and Python 3.4 When I run more than 1 listener and start one producer to publish messages listeners takes one message and start to process it simultaniously. I do not need that kind of behaviour because messages should be written to DB only once. So fastest worker write message to DB and all other workers say that message already exists.

producer:

import json
import amqp
import random
from application.settings import RMQ_PASSWORD, RMQ_USER, RMQ_HOST, RMQ_EXCHANGE

def main():
    conn = amqp.Connection(RMQ_HOST, RMQ_USER,
                           RMQ_PASSWORD, ssl=False)
    ch = conn.channel()
    ch.exchange_declare(RMQ_EXCHANGE, 'fanout')
    req = {"request": {"transaction_number": random.randint(100000, 9999999999)}}
    message = json.dumps(req)    
    msg = amqp.Message(message)    
    ch.basic_publish(msg, RMQ_EXCHANGE)    
    ch.close()
    conn.close()

if __name__ == '__main__':
    for x in range(100):
        main()

worker:

from functools import
from pipeline import pipeline, dal
from settings import DB_CONNECTION_STRING, RMQ_EXCHANGE, RMQ_HOST, RMQ_PASSWORD, RMQ_USER    
import amqp


DB = dal.DAL(DB_CONNECTION_STRING)
message_processor = pipeline.Pipeline(DB)


def callback(channel, msg):
    channel.basic_ack(msg.delivery_tag)
    message_processor.process(msg)

    if msg.body == 'quit':
        channel.basic_cancel(msg.consumer_tag)


def main():
    conn = amqp.Connection(RMQ_HOST, RMQ_USER,
                           RMQ_PASSWORD, ssl=False)
    ch = conn.channel()
    ch.exchange_declare(RMQ_EXCHANGE, 'fanout')
    qname, _, _ = ch.queue_declare()
    ch.queue_bind(qname, RMQ_EXCHANGE)
    ch.basic_consume(qname, callback=partial(callback, ch))
    while ch.callbacks:
        ch.wait()
    ch.close()
    conn.close()

if __name__ == '__main__':
    print('Listener starting')
    main()

also:

user@RabbitMQ:~$ sudo rabbitmqctl list_bindings
Listing bindings ...
        exchange        amq.gen--crTjfeSlue6gw0LRwW7pQ  queue   amq.gen--crTjfeSlue6gw0LRwW7pQ  []
        exchange        amq.gen-1X3vwGF5OKn_gcnofpJKFg  queue   amq.gen-1X3vwGF5OKn_gcnofpJKFg  []
...
        exchange        amq.gen-yf8ieG1AK9x83Vz4GBj-ZA  queue   amq.gen-yf8ieG1AK9x83Vz4GBj-ZA  []
        exchange        entryapi.test   queue   entryapi.test   []
entryapi        exchange        entryapi.test   queue           []
azaza   exchange        amq.gen--crTjfeSlue6gw0LRwW7pQ  queue           []
azaza   exchange        amq.gen-1X3vwGF5OKn_gcnofpJKFg  queue           []
...
azaza   exchange        amq.gen-yf8ieG1AK9x83Vz4GBj-ZA  queue           []
azaza   exchange        entryapi.test   queue           []
...done.
1

There are 1 best solutions below

1
On

I think you are using the wrong type of set up for your use case. You have a publisher publishing to an exchange and you want to read the messages and write them to a DB. You want to do this with many consumers writing to the DB so that you increase throughput. Fanout exchanges replicate the message so multiple queues and consumers will result in multiple writes of the same data to the DB. You need to use 'Work Queues'. Each exchange will be a default (no type, or a direct exchange with all messages using the same routing key) exchange. All messages sent to the exchange will be directed to one single queue. Each queue will have multiple consumers. Each message will be read from the queue once and once only by a single consumer from your group of consumers, then will only be written once to the DB.

Read more here http://www.rabbitmq.com/tutorials/tutorial-two-python.html