AMQP RabbitMQ consumers blocking each other?


I have coded up a C (rabbitmq-c) worker app which consumes a queue published by a Python script (pika).

I have the following strange behaviour which I can't seem to solve:

  1. Starting all the workers before messages are published to the queue works as expected
  2. Starting 1 worker after the queue has been published works as expected
  3. HOWEVER: starting additional workers after a worker has started consuming from the queue means that those workers don't see any messages on the queue (message count = 0) and therefore just wait, even though there should still be many messages on the queue. Killing the first worker will suddenly start messages flowing to all the other (waiting) consumers.

Any ideas what could be going on?

I've tried making sure that each consumer has its own channel (is this necessary?), but still the same behaviour...

Here's the code for the consumer (worker):

conn = amqp_new_connection();
sock = amqp_tcp_socket_new(conn);   /* already returns amqp_socket_t *; no cast needed */
amqp_socket_open(sock, "localhost", 5672);
amqp_login(conn,
           "/",
           0,
           131072,
           0,
           AMQP_SASL_METHOD_PLAIN,
           "guest",
           "guest");

if (amqp_channel_open(conn, chan) == NULL)
    LOG_ERR(" [!] Failed to open amqp channel!\n");

if ((q = amqp_queue_declare(conn,
                            chan,
                            amqp_cstring_bytes("ranges"),
                            0,
                            0,
                            0,
                            0,
                            amqp_empty_table)) == NULL)
    LOG_ERR(" [!] Failed to declare queue!\n");

LOG_INFO(" [x] Queue (message count = %d)\n", q->message_count);

amqp_queue_bind(conn, chan, amqp_cstring_bytes("ranges"), amqp_empty_bytes, amqp_empty_bytes, amqp_empty_table);
amqp_basic_consume(conn, chan, amqp_cstring_bytes("ranges"), amqp_empty_bytes, 0, 0, 0, amqp_empty_table);

while(1) {
    amqp_maybe_release_buffers(conn);
    amqp_consume_message(conn, &e, NULL, 0);

    {
        int n;
        amqp_frame_t f;
        unsigned char buf[8];
        unsigned char *pbuf = buf;

        amqp_simple_wait_frame(conn, &f);       // METHOD frame
        amqp_simple_wait_frame(conn, &f);       // HEADER frame

        n = f.payload.properties.body_size;
        if (n != sizeof(range_buf))
            LOG_ERR(" [!] Invalid message size!");

        while (n) {
            amqp_simple_wait_frame(conn, &f);   // BODY frame
            memcpy(pbuf,
                   f.payload.body_fragment.bytes,
                   f.payload.body_fragment.len);
            n -= f.payload.body_fragment.len;
            pbuf += f.payload.body_fragment.len;
        }

        // do something with buf

        LOG_INFO(" [x] Message received from queue\n");
    }

    amqp_destroy_envelope(&e);

    amqp_maybe_release_buffers(conn);
}
2 Answers

Accepted Answer

The problem here is most likely that your consumer pre-fetches all available messages when it starts. This is RabbitMQ's default behaviour (no prefetch limit), but you can reduce the number of messages pre-fetched by each consumer, allowing the workload to be spread more evenly across multiple workers.

This simply means that one or more of the running consumers picks up all the messages, leaving none for the newly started consumers.

If you apply QoS to your consumer and limit the prefetch to, let's say, 10 messages, the consumer will only queue up the first 10 messages, and the new consumers can pick up the slack.

The function you are looking for to implement this is called amqp_basic_qos; in addition, you can read more in the RabbitMQ documentation on consumer prefetch.
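As a sketch of the fix, reusing the variable names from the question's code, the prefetch limit would be set on the channel before starting the consumer:

```c
/* Sketch: limit this consumer to 10 unacknowledged messages at a time,
 * so the remaining messages stay on the queue for other workers.
 * conn and chan are assumed to be set up as in the question's code. */
amqp_basic_qos(conn,
               chan,
               0,    /* prefetch_size: 0 = no byte limit */
               10,   /* prefetch_count: at most 10 unacked deliveries */
               0);   /* global: 0 = apply per-consumer, not per-channel */

/* ...then start consuming as before: */
amqp_basic_consume(conn, chan, amqp_cstring_bytes("ranges"),
                   amqp_empty_bytes, 0, 0, 0, amqp_empty_table);
```

With prefetch_count = 1, each worker holds only one message at a time, which gives the most even spread across workers at the cost of an extra round-trip per message.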

Answer

This might help you

Message acknowledgment

Doing a task can take a few seconds. You may wonder what happens if one of the consumers starts a long task and dies with it only partly done. With our current code, once RabbitMQ delivers a message to the consumer, it immediately removes it from memory. In this case, if you kill a worker, we lose the message it was just processing. We also lose all the messages that were dispatched to this particular worker but not yet handled.

But we don't want to lose any tasks. If a worker dies, we'd like the task to be delivered to another worker.

In order to make sure a message is never lost, RabbitMQ supports message acknowledgments. An ack(nowledgement) is sent back from the consumer to tell RabbitMQ that a particular message has been received, processed and that RabbitMQ is free to delete it.

If a consumer dies without sending an ack, RabbitMQ will understand that a message wasn't processed fully and will redeliver it to another consumer. That way you can be sure that no message is lost, even if the workers occasionally die.

There aren't any message timeouts; RabbitMQ will redeliver the message only when the worker connection dies. It's fine even if processing a message takes a very, very long time.

Manual message acknowledgments are turned on by default.
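In rabbitmq-c terms (a sketch, reusing the names from the question's code): the amqp_basic_consume call above already passes no_ack = 0, so the broker expects an explicit acknowledgement for every delivery. After finishing the work for an envelope, the worker would acknowledge it roughly like this:

```c
/* Sketch: acknowledge the envelope once the work is done, so RabbitMQ
 * may delete the message. If the worker dies before this call, the
 * message is redelivered to another consumer, as described above. */
amqp_basic_ack(conn,
               chan,
               e.delivery_tag,   /* identifies this delivery on the channel */
               0);               /* multiple: 0 = ack only this message */
```

Note that prefetch limits (amqp_basic_qos) only take effect in this manual-ack mode: the broker counts unacknowledged deliveries, so a worker that never acks will eventually stop receiving new messages once it reaches the prefetch limit.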