I'm following this pattern in the docs https://docs.bullmq.io/patterns/process-step-jobs
But it seems like the example code throws a Missing lock for job <jobId> failed
error. Following is a minimal reproducible version
import { Worker, Queue } from "bullmq";
const sleep = (ms: number) => new Promise((r) => setTimeout(r, ms));
enum Step {
Initial,
Second,
Finish,
}
type JobData = {
step: Step;
};
const worker = new Worker<JobData>("reproduce-error", async (job, token) => {
let step = job.data.step;
while (step !== Step.Finish) {
switch (step) {
case Step.Initial: {
await sleep(3 * 1000);
await job.moveToDelayed(Date.now() + 1000, token);
await job.update({
step: Step.Second,
});
await job.updateProgress(1);
step = Step.Second;
break;
}
case Step.Second: {
await sleep(3 * 1000);
await job.update({
step: Step.Finish,
});
await job.updateProgress(2);
step = Step.Finish;
return Step.Finish;
}
default: {
throw new Error("invalid step");
}
}
}
});
worker.on("error", (failedReason) => console.log(failedReason));
worker.on("progress", (job, progress) =>
console.log("progress", job.data, progress)
);
const queue = new Queue<JobData>("reproduce-error");
queue.add("Reproduce error", { step: Step.Initial });
Here's a stack trace of the error
at Scripts.finishedErrors (/Users/sumit/Coding/indiemaker/twips/reproduce-bullmq-error/node_modules/bullmq/src/classes/scripts.ts:356:16)
at Job.moveToFailed (/Users/sumit/Coding/indiemaker/twips/reproduce-bullmq-error/node_modules/bullmq/src/classes/job.ts:618:26)
at processTicksAndRejections (node:internal/process/task_queues:95:5)
at async handleFailed (/Users/sumit/Coding/indiemaker/twips/reproduce-bullmq-error/node_modules/bullmq/src/classes/worker.ts:642:11)
at async Worker.retryIfFailed (/Users/sumit/Coding/indiemaker/twips/reproduce-bullmq-error/node_modules/bullmq/src/classes/worker.ts:788:16)
at async Worker.run (/Users/sumit/Coding/indiemaker/twips/reproduce-bullmq-error/node_modules/bullmq/src/classes/worker.ts:385:34)
Here's what I've thought so far:
I suspect moveToDelayed
moves the job to the delayed set but it keeps running the current execution. That's why it tries to move the same job to the delayed set twice and thus fails. But, changing the break
to return
in the Initial
block should change that, but even that doesn't help and gives the same error. So there might be something more fundamental that's going wrong here.
Any help would be appreciated. Thanks in advance!