Why does the amqplib consume callback in Node.js behave like a closure?

I use the Node.js amqplib module to connect to RabbitMQ. I found that the callback I pass to consume behaves like a closure, but I can't understand why. I didn't intentionally use a closure.

My code is below. The corr inside returnOK keeps the value from the first call. When I trigger this function a second time, corr still has the first value. That seems odd to me. Could someone explain this?

  const corr = new Date().getTime();
  try {
    const params = JSON.stringify(req.body);
    console.log('corr first =', corr);
    await ch.sendToQueue(q, Buffer.from(params), {
      deliveryMode: true,
      correlationId: corr.toString(),
      replyTo: queue.queue,
    });

    const returnOK = (msg) => {
      if (msg.properties.correlationId === corr.toString()) {
        console.info('******* Proxy send message done *******');
        res.status(HTTPStatus.OK).json('Done');
      }
    };
    await ch.consume(queue.queue, returnOK, { noAck: true });
  } catch (error) {
    res.status(HTTPStatus.INTERNAL_SERVER_ERROR).json(error);
  }
There are 2 answers below

Best answer

It appears you're calling ch.consume on every request, in effect creating a new consumer every time. You should only do that once.

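What is happening is that returnOK references corr from the enclosing scope, so it is a closure whether you intended one or not. Every ch.consume call registers another consumer with its own copy of that callback, and the earlier consumers are never cancelled. RabbitMQ distributes messages across all consumers on the queue, so a reply can be picked up by the first consumer, whose corr still holds the first value.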
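To make the effect concrete, here is a small simulation that runs without a broker. It is only a sketch: createToyQueue and handleRequest are made-up names, and the strict rotation is a simplification of how a real broker spreads deliveries across consumers.

```javascript
// Toy stand-in for a queue (not amqplib): deliveries rotate across registered consumers.
function createToyQueue() {
  const consumers = [];
  let next = 0;
  return {
    consume(callback) {
      consumers.push(callback);
    },
    deliver(msg) {
      const callback = consumers[next % consumers.length];
      next += 1;
      return callback(msg);
    },
  };
}

// Mimics the request handler: each call registers one more consumer,
// and that consumer's callback captures its own corr.
function handleRequest(queue, corr) {
  queue.consume((msg) => `consumer for ${corr} got reply for ${msg.correlationId}`);
}

const toy = createToyQueue();
handleRequest(toy, 'first');
handleRequest(toy, 'second');

// The reply to the second request lands on the consumer left over from the first.
const seen = toy.deliver({ correlationId: 'second' });
console.log(seen); // consumer for first got reply for second
```

In the question's code the stale consumer then fails the correlationId check, and because noAck is true the message is gone, so the second request never gets its response.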

To fix this, you probably want to move ch.consume outside the request handler.
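One way to do that, as a rough sketch: register a single consumer at startup and let it hand each reply to whichever request is waiting for that correlationId. createReplyRouter, waitFor, handleMessage and pendingCount are hypothetical names, not part of amqplib. The wiring in the trailing comments assumes the ch, q, queue, params and res variables from the question.

```javascript
// Hypothetical helper: routes reply messages to the request waiting on each correlationId.
function createReplyRouter() {
  const pending = new Map(); // correlationId -> resolve function of the waiting request

  return {
    // Called by the request handler; the promise resolves when the matching reply arrives.
    waitFor(correlationId) {
      return new Promise((resolve) => pending.set(correlationId, resolve));
    },
    // Meant to be passed to ch.consume exactly once, at startup.
    handleMessage(msg) {
      if (msg === null) return false; // amqplib passes null when the broker cancels the consumer
      const id = msg.properties.correlationId;
      const resolve = pending.get(id);
      if (!resolve) return false; // nobody is waiting for this reply
      pending.delete(id);
      resolve(msg);
      return true;
    },
    pendingCount: () => pending.size,
  };
}

// Wiring sketch (not executed here):
//   const router = createReplyRouter();
//   await ch.consume(queue.queue, router.handleMessage, { noAck: true }); // once, at startup
//
//   // inside the request handler:
//   const corr = Date.now().toString();
//   const reply = router.waitFor(corr);
//   ch.sendToQueue(q, Buffer.from(params), { correlationId: corr, replyTo: queue.queue });
//   await reply;
//   res.status(HTTPStatus.OK).json('Done');
```

A real handler would probably also want a timeout so that a request whose reply never arrives does not wait forever, and a correlation id that cannot repeat within the same millisecond.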

Another answer

I realize this question is old, but I ran into a similar problem when writing a POST handler that waits for a message to be processed before sending a response to the user.

My case

At first I tried creating a new channel on each POST request, but that did not seem optimal. Then I found that channel.consume accepts a consumerTag in its options, and that you can cancel the subscription by that tag, like this:

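    const { v4: uuidv4 } = require('uuid');

    // Assumes `app` (Express), `channel` (amqplib channel) and `sendToQueue` are defined elsewhere.
    app.post('/process', async (req, res) => {
        const requestData = req.body;
        const requestId = uuidv4();

        await sendToQueue({ id: requestId, data: requestData });

        const responseData = await new Promise((resolve, reject) => {
            // A unique tag per request lets us cancel exactly this consumer later.
            const consumerTag = `${requestId}-consumer`;
            channel.consume(
                'responses',
                (message) => {
                    if (message === null) return; // consumer was cancelled by the broker
                    const data = JSON.parse(message.content.toString());
                    channel.ack(message);
                    channel.cancel(consumerTag);
                    // Note: this takes the next message on 'responses', whichever request it belongs to.
                    resolve(data);
                },
                { noAck: false, consumerTag: consumerTag }
            ).catch(reject); // surface subscription errors instead of hanging
        });

        console.log("responseData", responseData);

        res.status(200).json(responseData);
    });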

So for you it will be something like this, I guess:

    // Use a unique tag per request; a fixed tag would collide when requests overlap.
    const consumerTag = `${corr}-consumer`;
    const returnOK = (msg) => {
      if (msg.properties.correlationId === corr.toString()) {
        console.info('******* Proxy send message done *******');
        res.status(HTTPStatus.OK).json('Done');
        // Cancel this consumer only once its own reply has arrived.
        ch.cancel(consumerTag);
      }
    };
    await ch.consume(queue.queue, returnOK, { noAck: true, consumerTag: consumerTag });