Bull Queue: Missing process handler for job type __default__

108 Views Asked by At

We are getting a 'failed' and 'error' event with the following message: Missing process handler for job type default. We are not defining named processors.

Versions in use:

bull: ^4.10.2
Nodejs: 18

Sample code for 1st microservice:

const Bull = require('bull');

function getBullQueueObj(queueName, logger) {
  const queue = new Bull(queueName, {
    redis: {
      port: process.env.PORT,
      host: process.env.URL,
      password: process.env.PASSWORD || '',
      maxRetriesPerRequest: null,
      enableReadyCheck: false,
      connectTimeout: 60000,
      showFriendlyErrorStack: true,
      reconnectOnError: () => {
        return true;
      },
      connectionName: 'testconnectionName'
    },
    prefix: 'testPrefix',
    defaultJobOptions:{
      removeOnComplete: true,
      removeOnFail: true,
      attempts: 10,
      delay: 2000
    }
  });

  queue.on('error', (error) => {
    logger.error(`Queue threw error `, {error});
  });
  queue.on('stalled', (job) => {
    logger.info(`Job (${job.id}) of queue (${job.queue.name}) got stalled`, {job});
  });
  queue.on('failed', (job, err) => {
    logger.error(`Job (${job.id}) of queue (${job.queue.name}) got failed`, {err, job});
  });
  queue.on('waiting', (jobId) => {
    logger.info(`Job id (${jobId}) is waiting to be processed`);
  });
  queue.on('active', (job, jobPromise) => {
    logger.info(`Job (${job.id}) of queue (${job.queue.name}) got started`, {job});
  });

  let reconnectingEvent = false;
  queue.clients.forEach((cli) => {
    cli.on('error', (err) => {
      logger.error(`[${queue.name}] Redis threw error `, err);
    });
    cli.on('ready', () => {
      logger.info(`[${queue.name}] Redis is connected ...`);
      if (reconnectingEvent) {
        reconnectingEvent = false;
        queue.run(1);
      }
    });
    cli.on('connect', () => {
      logger.info(`[${queue.name}] Redis is connecting ...`);
    });
    cli.on('close', () => {
      logger.info(`[${queue.name}] Redis is closing ...`);
    });
    cli.on('reconnecting', (error) => {
      logger.info(`[${queue.name}] Redis is reconnecting ...`, {
        name: error.name,
        msg: error.message,
      });
      reconnectingEvent = true;
    });
  });

  return queue;
}

const queue1 = bullQueue('queue1', logger);
queue1.add({name: 'Test'});
const queue2 = bullQueue('queue2', logger);
queue2.add({score: 20});

Sample code for 2nd microservice, it consumes messages pushed in queue

const Bull = require('bull');
function createQueue(processor, options = {autoClean: true}) {
  if (!processor.job || !processor.queueName)
    throw new Error(
      'A processor should have a name and job function, check your implementation'
    );

  const jobName = processor.queueName();

  const newQueue = new Bull(jobName, {
    redis: {
      port: process.env.PORT,
      host: process.env.URL,
      password: process.env.PASSWORD,
      maxRetriesPerRequest: null,
      enableReadyCheck: false,
      connectTimeout: 60000,
      showFriendlyErrorStack: true,
      reconnectOnError: () => {
        return true;
      },
      connectionName: 'testconnectionName'
    },
    prefix: 'testPrefix',
    defaultJobOptions:{
      removeOnComplete: true,
      removeOnFail: true,
      attempts: 10
    },
    settings: {
      lockDuration: 120000
    }
  });

  newQueue.name = jobName;
  newQueue.on('completed', async (job, result) => {
    logger.info(`Job (${job.id}) from Queue (${job.queue.name}) is completed`);
    if(options.autoClean) {
      await newQueue.clean(1000*5);
      logger.info(`Job (${job.id}) from Queue (${job.queue.name}) cleaning will be done in ${util.format(1000*5)} ms`);
    }
  });
  newQueue.on('error', (error) => {
    logger.error(`Job threw error`, {error});
  });
  newQueue.on('stalled', (job) => {
    logger.info(`Job (${job.id}) from Queue (${job.queue.name}) got stalled`, {job});
  });
  newQueue.on('failed', (job, err) => {
    logger.error(`Job (${job.id}) from Queue (${job.queue.name}) got failed`, {err, job});
  });
  newQueue.on('active', (job, jobPromise) => {
    logger.info(`Job (${job.id}) from Queue (${job.queue.name}) got started`, {job});
  });
  newQueue.on('waiting', (jobId) => {
    logger.info(`Job id (${jobId}) waiting for processed`);
  });

  newQueue.process(processor.job);

  let reconnectingEvent = false 
  newQueue.clients.forEach((cli) => {
    cli.on('error', (err) => {
      logger.error(`[${newQueue.name}] Redis threw error `, err);
    });
    cli.on('ready', () => {
      logger.info(`[${newQueue.name}] Redis is connected ...`);
      if (reconnectingEvent) { // this is actually the fix, we just set a flag to not kick run unless we are restarting
        reconnectingEvent = false
        newQueue.run(1)
      }
    });
    cli.on('connect', () => {
      logger.info(`[${newQueue.name}] Redis is connecting ...`);
    });
    cli.on('close', () => {
      logger.info(`[${newQueue.name}] Redis is connecting is closing ...`);
    });
    cli.on('reconnecting', error => {
      logger.info(`[${newQueue.name}] Redis is reconnecting ...`, {name: error.name, mag: error.message});
      reconnectingEvent = true
    })
  });

  if (processor.defaultJobs) {
    newQueue.defaultJobs = [
      ...newQueue.defaultJobs,
      ...processor.defaultJobs(),
    ].flatMap((s) => s);
  }

  jobQueues.push(newQueue);

  logger.info(`Created new job ${jobName}`);

  return newQueue;
}

let firstJob = {
  queueName: () => {
    return 'queue1';
  },

  job: async(jobData, done) => {
    try {
      let task = await runTask(jobData);
      done();
    } catch (error) {
      logger.error(`Error occurred while shooting automatic job`, {
        message: error.message,
        stack: error.stack
      });
    }
  }
}

let secondJob = {
  queueName: () => {
    return 'queue2';
  },

  job: async(jobData, done) => {
    try {
      let task = await runTask2(jobData);
      done();
    } catch (error) {
      logger.error(`Error occurred while shooting job`, {
        message: error.message,
        stack: error.stack
      });
    }
  }
}
createQueue(firstJob);
createQueue(secondJob);

Everything works fine when we run both the services in docker, data is pushed in redis & then consumed by other service, but we noticed that after sometime 1st microservice starts giving error

{"message":"Job (4) of queue (queue1) got failed","level":"error", meta: {errorMsg: "Missing process handler for job type __default__"}}
{"message":"Job (6) of queue (queue2) got failed","level":"error", meta: {errorMsg: "Missing process handler for job type __default__"}}

If we restart 1st microservice then it again starts working & again stops after sometime.

Not able to find any root cause, any help is appreciated.

0

There are 0 best solutions below