How to listen to the completed event in bull queue - just for the current job

14k Views Asked by At

When running the code below it prints too many results.

My suspicion is that the completed event listens to all the previous jobs made in the current queue instance.

How can I manage the completed event to listen just to the current job completion?

producer.js

The producer creates a job with a default numerical id and listens to the global completion in order to return a response when the job is done.

const BullQ = require('bull');

let bullQ = BullQ('my-first-queue', {
  redis: {
    host: process.env.REDISHOST || 'localhost',
    password: process.env.REDISPASSWORD || ''
  }
});

app.get('/search/:term', async (req, res) => {
  const job = await bullQ.add({
    searchTerm: req.params.term
  });
  
  // Listen to the global completion of the queue in order to return result.
  bullQ.on('global:completed', (jobId, result) => {
    // Check if id is a number without any additions
    if (/^\d+$/.test(jobId) && !res.headersSent) {
      console.log(`Producer get: Job ${jobId} completed! Result: ${result}`);
      res.json(`Job is completed with result: ${result}`);
    }
  });
});

consumer.js

The consumer has 2 roles.

  1. To consume the jobs as it should be by the book
  2. To create new jobs based on the result of the last job.
const BullQ = require('bull');

let bullQ = BullQ('my-first-queue', {
  redis: {
    host: process.env.REDISHOST || 'localhost',
    password: process.env.REDISPASSWORD || ''
  }
});

bullQ.process((job, done) => {
  // Simulate asynchronous server request.
  setTimeout(async () => {
    // Done the first job and return an answer to the producer after the timeout.
    done(null, `Search result for ${job.data.searchTerm}`);
    // next job run
    if (counter < 10) {
      // For the first run the id is just a number if not changed via the jobId in JobOpts,
      // the next time the job id will be set to {{id}}_next_{{counter}} we need only the first number in order not to end with a long and not clear concatenated string.
      let jobID = (/^\d+$/.test(job.id)) ? job.id : job.id.replace(/[^\d].*/,'');
      await createNextJob(jobID, ++counter);
    }
  }, 100);
});

// Create next job and add it to the queue.
// Listen to the completed jobs (locally)
const createNextJob = async (id, counter) => {
  const nextJob = bullQ.add({
    searchTerm: "Next job"
  }, {
    jobId: `${id}_next_${counter}`
  });

  await bullQ.on('completed', (job, result) => {
    job.finished();
    console.log(`Consumer(next): Job ${job.id} completed! Result: ${result}`);
  });
};
1

There are 1 best solutions below

3
On

You can await job.finished() to get your result specific to a job object returned by queue.add().

Here's a simple, runnable example without Express to illustrate:

const Queue = require("bull"); // "bull": "^4.8.5"
const {setTimeout} = require("node:timers/promises");

const queue = new Queue("test", process.env.REDIS_URL);
queue.process(4, async job => {
  await setTimeout(job.data.seconds * 1000); // waste time
  return Promise.resolve(`job ${job.id} complete!`);
});

(async () => {
  const job = await queue.add({seconds: 5});
  const result = await job.finished();
  console.log(result); // => job 42 complete!
  await queue.close();
})();

With Express:

const Queue = require("bull");
const express = require("express");
const {setTimeout} = require("node:timers/promises");

const queue = new Queue("test", process.env.REDIS_URL);
queue.process(4, async job => {
  await setTimeout(job.data.seconds * 1000);
  return Promise.resolve(`job ${job.id} complete!`);
});

const app = express();
app
  .set("port", process.env.PORT || 5000)
  .get("/", async (req, res) => {
    try {
      const job = await queue.add({
        seconds: Math.abs(+req.query.seconds) || 10,
      });
      const result = await job.finished();
      res.send(result);
    }
    catch (err) {
      res.status(500).send({error: err.message});
    }
  })
  .listen(app.get("port"), () => 
    console.log("app running on port", app.get("port"))
  );

Sample run:

$ curl localhost:5000?seconds=2
job 42 complete!