When running the code below it prints too many results.
My suspicion is that the completed
event listens to all the previous jobs made in the current queue instance.
How can I manage the completed event to listen just to the current job completion?
producer.js
The producer creates a job with a default numerical id and listens to the global completion in order to return a response when the job is done.
const BullQ = require('bull');
let bullQ = BullQ('my-first-queue', {
redis: {
host: process.env.REDISHOST || 'localhost',
password: process.env.REDISPASSWORD || ''
}
});
app.get('/search/:term', async (req, res) => {
const job = await bullQ.add({
searchTerm: req.params.term
});
// Listen to the global completion of the queue in order to return result.
bullQ.on('global:completed', (jobId, result) => {
// Check if id is a number without any additions
if (/^\d+$/.test(jobId) && !res.headersSent) {
console.log(`Producer get: Job ${jobId} completed! Result: ${result}`);
res.json(`Job is completed with result: ${result}`);
}
});
});
consumer.js
The consumer has 2 roles.
- To consume the jobs as it should be by the book
- To create new jobs based on the result of the last job.
const BullQ = require('bull');
let bullQ = BullQ('my-first-queue', {
redis: {
host: process.env.REDISHOST || 'localhost',
password: process.env.REDISPASSWORD || ''
}
});
bullQ.process((job, done) => {
// Simulate asynchronous server request.
setTimeout(async () => {
// Done the first job and return an answer to the producer after the timeout.
done(null, `Search result for ${job.data.searchTerm}`);
// next job run
if (counter < 10) {
// For the first run the id is just a number if not changed via the jobId in JobOpts,
// the next time the job id will be set to {{id}}_next_{{counter}} we need only the first number in order not to end with a long and not clear concatenated string.
let jobID = (/^\d+$/.test(job.id)) ? job.id : job.id.replace(/[^\d].*/,'');
await createNextJob(jobID, ++counter);
}
}, 100);
});
// Create next job and add it to the queue.
// Listen to the completed jobs (locally)
const createNextJob = async (id, counter) => {
const nextJob = bullQ.add({
searchTerm: "Next job"
}, {
jobId: `${id}_next_${counter}`
});
await bullQ.on('completed', (job, result) => {
job.finished();
console.log(`Consumer(next): Job ${job.id} completed! Result: ${result}`);
});
};
You can
await job.finished()
to get your result specific to ajob
object returned byqueue.add()
.Here's a simple, runnable example without Express to illustrate:
With Express:
Sample run: