We are getting a 'failed' and 'error' event with the following message: Missing process handler for job type default. We are not defining named processors.
Versions in use:
bull: ^4.10.2
Nodejs: 18
Sample code for 1st microservice:
const Bull = require('bull');
function getBullQueueObj(queueName, logger) {
const queue = new Bull(queueName, {
redis: {
port: process.env.PORT,
host: process.env.URL,
password: process.env.PASSWORD || '',
maxRetriesPerRequest: null,
enableReadyCheck: false,
connectTimeout: 60000,
showFriendlyErrorStack: true,
reconnectOnError: () => {
return true;
},
connectionName: 'testconnectionName'
},
prefix: 'testPrefix',
defaultJobOptions:{
removeOnComplete: true,
removeOnFail: true,
attempts: 10,
delay: 2000
}
});
queue.on('error', (error) => {
logger.error(`Queue threw error `, {error});
});
queue.on('stalled', (job) => {
logger.info(`Job (${job.id}) of queue (${job.queue.name}) got stalled`, {job});
});
queue.on('failed', (job, err) => {
logger.error(`Job (${job.id}) of queue (${job.queue.name}) got failed`, {err, job});
});
queue.on('waiting', (jobId) => {
logger.info(`Job id (${jobId}) is waiting to be processed`);
});
queue.on('active', (job, jobPromise) => {
logger.info(`Job (${job.id}) of queue (${job.queue.name}) got started`, {job});
});
let reconnectingEvent = false;
queue.clients.forEach((cli) => {
cli.on('error', (err) => {
logger.error(`[${queue.name}] Redis threw error `, err);
});
cli.on('ready', () => {
logger.info(`[${queue.name}] Redis is connected ...`);
if (reconnectingEvent) {
reconnectingEvent = false;
queue.run(1);
}
});
cli.on('connect', () => {
logger.info(`[${queue.name}] Redis is connecting ...`);
});
cli.on('close', () => {
logger.info(`[${queue.name}] Redis is closing ...`);
});
cli.on('reconnecting', (error) => {
logger.info(`[${queue.name}] Redis is reconnecting ...`, {
name: error.name,
msg: error.message,
});
reconnectingEvent = true;
});
});
return queue;
}
const queue1 = bullQueue('queue1', logger);
queue1.add({name: 'Test'});
const queue2 = bullQueue('queue2', logger);
queue2.add({score: 20});
Sample code for 2nd microservice, it consumes messages pushed in queue
const Bull = require('bull');
function createQueue(processor, options = {autoClean: true}) {
if (!processor.job || !processor.queueName)
throw new Error(
'A processor should have a name and job function, check your implementation'
);
const jobName = processor.queueName();
const newQueue = new Bull(jobName, {
redis: {
port: process.env.PORT,
host: process.env.URL,
password: process.env.PASSWORD,
maxRetriesPerRequest: null,
enableReadyCheck: false,
connectTimeout: 60000,
showFriendlyErrorStack: true,
reconnectOnError: () => {
return true;
},
connectionName: 'testconnectionName'
},
prefix: 'testPrefix',
defaultJobOptions:{
removeOnComplete: true,
removeOnFail: true,
attempts: 10
},
settings: {
lockDuration: 120000
}
});
newQueue.name = jobName;
newQueue.on('completed', async (job, result) => {
logger.info(`Job (${job.id}) from Queue (${job.queue.name}) is completed`);
if(options.autoClean) {
await newQueue.clean(1000*5);
logger.info(`Job (${job.id}) from Queue (${job.queue.name}) cleaning will be done in ${util.format(1000*5)} ms`);
}
});
newQueue.on('error', (error) => {
logger.error(`Job threw error`, {error});
});
newQueue.on('stalled', (job) => {
logger.info(`Job (${job.id}) from Queue (${job.queue.name}) got stalled`, {job});
});
newQueue.on('failed', (job, err) => {
logger.error(`Job (${job.id}) from Queue (${job.queue.name}) got failed`, {err, job});
});
newQueue.on('active', (job, jobPromise) => {
logger.info(`Job (${job.id}) from Queue (${job.queue.name}) got started`, {job});
});
newQueue.on('waiting', (jobId) => {
logger.info(`Job id (${jobId}) waiting for processed`);
});
newQueue.process(processor.job);
let reconnectingEvent = false
newQueue.clients.forEach((cli) => {
cli.on('error', (err) => {
logger.error(`[${newQueue.name}] Redis threw error `, err);
});
cli.on('ready', () => {
logger.info(`[${newQueue.name}] Redis is connected ...`);
if (reconnectingEvent) { // this is actually the fix, we just set a flag to not kick run unless we are restarting
reconnectingEvent = false
newQueue.run(1)
}
});
cli.on('connect', () => {
logger.info(`[${newQueue.name}] Redis is connecting ...`);
});
cli.on('close', () => {
logger.info(`[${newQueue.name}] Redis is connecting is closing ...`);
});
cli.on('reconnecting', error => {
logger.info(`[${newQueue.name}] Redis is reconnecting ...`, {name: error.name, mag: error.message});
reconnectingEvent = true
})
});
if (processor.defaultJobs) {
newQueue.defaultJobs = [
...newQueue.defaultJobs,
...processor.defaultJobs(),
].flatMap((s) => s);
}
jobQueues.push(newQueue);
logger.info(`Created new job ${jobName}`);
return newQueue;
}
let firstJob = {
queueName: () => {
return 'queue1';
},
job: async(jobData, done) => {
try {
let task = await runTask(jobData);
done();
} catch (error) {
logger.error(`Error occurred while shooting automatic job`, {
message: error.message,
stack: error.stack
});
}
}
}
let secondJob = {
queueName: () => {
return 'queue2';
},
job: async(jobData, done) => {
try {
let task = await runTask2(jobData);
done();
} catch (error) {
logger.error(`Error occurred while shooting job`, {
message: error.message,
stack: error.stack
});
}
}
}
createQueue(firstJob);
createQueue(secondJob);
Everything works fine when we run both the services in docker, data is pushed in redis & then consumed by other service, but we noticed that after sometime 1st microservice starts giving error
{"message":"Job (4) of queue (queue1) got failed","level":"error", meta: {errorMsg: "Missing process handler for job type __default__"}}
{"message":"Job (6) of queue (queue2) got failed","level":"error", meta: {errorMsg: "Missing process handler for job type __default__"}}
If we restart 1st microservice then it again starts working & again stops after sometime.
Not able to find any root cause, any help is appreciated.