Use Node.js Bull queue with a separate processor


I'm trying to run the processor in a separate file, as described in the Bull documentation. I've added the file as shown below.

// -------- Queue.js ----------

formatQueue.process(__dirname + "/processors/format-worker.js");


// On Complete Handler

formatQueue.on('completed', function(job, result){
    console.log(result, "Format-Complete-job"); // result is undefined
    if(job.data.is_well_format){
        existenceQueue.add(job.data, { attempts: 3, backoff: 1000 });
    }else QueueModel.lastStep(job.data)
});

// -------- Queue.js ends ---------

//format-worker.js
Validator = require("../../validators");
module.exports = (job) => {
    Validator.Format.validate(job.data, (data) => {
        job.data = data;
        return Promise.resolve(data);
    });
}

Previously, in the job's completed handler, I received the job data with the updated parameters. Now I'm not getting the updated job data, and the second parameter described in the documentation, result, is undefined. How can I get the updated job data in this case?

The job and the processor work fine if I run the process as below.

formatQueue.process(function(job, done){
    Validator.Format.validate(job.data, (data) => {
        job.data = data;
        done();
    });
});

In this case, the job data itself gets updated, and that works as well.

I have multiple queues doing different jobs, and the next job should start only once the previous one has finished successfully. I have another question where I've described the use case; please check it here.

There are 1 best solutions below

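Let me help you and also give you some hints. First of all, there are some errors in your code. The processor function does not return anything: the promise you create inside the validate callback is returned to the validator's callback invocation, not to the caller of the processor, which is why result is undefined. The value the processor's promise resolves with is what the "completed" handler receives as result. I do not know what Validator.Format.validate returns, so to be on the safe side I would write it like this: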

module.exports = job => {
  return new Promise(resolve => {
    Validator.Format.validate(job.data, data => {
      resolve(data);
    });
  });
};
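
To see the difference without Bull or Redis, here is a minimal sketch that calls both shapes of the processor directly. The validate function is a hypothetical stand-in for Validator.Format.validate, assumed to be callback-based and asynchronous; the real validator may behave differently.

```javascript
// Hypothetical stand-in for Validator.Format.validate (callback-style, async).
function validate(data, callback) {
  setImmediate(() => callback({ ...data, is_well_format: true }));
}

// Original shape: the arrow function body has no return statement,
// so the processor returns undefined before the callback even runs.
const brokenProcessor = (job) => {
  validate(job.data, (data) => {
    return Promise.resolve(data); // returned to validate(), not to the caller
  });
};

// Fixed shape: the processor returns a promise that resolves with the data.
const fixedProcessor = (job) =>
  new Promise((resolve) => {
    validate(job.data, (data) => resolve(data));
  });

const job = { data: { id: 1 } };

const brokenResult = brokenProcessor(job);
console.log(brokenResult); // undefined

const fixedResult = fixedProcessor(job);
fixedResult.then((result) => console.log(result)); // { id: 1, is_well_format: true }
```

With the fixed shape, the "completed" handler can read the validated data from its result argument instead of relying on job.data having been mutated.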

Secondly, it is generally more robust to add the next job inside the process handler itself instead of in the "completed" event callback. If adding the next job fails for some reason, the current job fails too, so you can retry it or examine why it failed.

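// format-worker.js
// Assumes Validator, existenceQueue and QueueModel are in scope in this file.
module.exports = (job) => {
  return new Promise(resolve => {
    Validator.Format.validate(job.data, (data) => {
      if (data.is_well_format) {
        // Resolving with the promise from add() fails this job if enqueueing fails.
        resolve(existenceQueue.add(data, { attempts: 3, backoff: 1000 }));
      } else {
        resolve(QueueModel.lastStep(data));
      }
    });
  });
};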
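
The failure propagation described above can be checked in isolation. The following is only a sketch: validate and failingQueue are hypothetical stubs standing in for the real validator and for existenceQueue, and the stub's add always rejects to simulate an enqueue failure.

```javascript
// Hypothetical stand-ins; the real worker would use the validator and the
// Bull queue required by format-worker.js.
function validate(data, callback) {
  setImmediate(() => callback({ ...data, is_well_format: true }));
}

const failingQueue = {
  // Simulates a queue whose add() fails, e.g. because Redis is unreachable.
  add: () => Promise.reject(new Error("could not enqueue")),
};

const processor = (job) =>
  new Promise((resolve) => {
    validate(job.data, (data) => {
      // Resolving with a promise makes the outer promise adopt its outcome,
      // so a rejected add() rejects the processor's promise as well.
      resolve(failingQueue.add(data));
    });
  });

const outcome = processor({ data: { id: 1 } }).then(
  () => "completed",
  (err) => `failed: ${err.message}`
);

outcome.then((status) => console.log(status)); // failed: could not enqueue
```

Because the processor's promise rejects, a queue running this processor would mark the job as failed rather than completed, which is what makes retries and inspection possible.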