Bull Queue Concurrency Questions


I need help understanding how Bull Queue (bull.js) processes concurrent jobs.

Suppose I have 10 Node.js instances that each instantiate a Bull Queue connected to the same Redis instance:

const bullQueue = require('bull');
const queue = new bullQueue('taskqueue', {...});
const concurrency = 5;
queue.process('jobTypeA', concurrency, job => {...do something...});

Does this mean that globally across all 10 node instances there will be a maximum of 5 (concurrency) concurrently running jobs of type jobTypeA? Or am I misunderstanding and the concurrency setting is per-Node instance?

What happens if one Node instance specifies a different concurrency value?

Can I be certain that jobs will not be processed by more than one Node instance?

There are 5 best solutions below


I spent a bunch of time digging into it as a result of facing a problem with too many processor threads.

The short story is that Bull's concurrency is set at the queue-object level, not at the level of the underlying (Redis-backed) queue.

If you dig into the code, the concurrency setting is invoked at the point at which you call .process on your queue object. This means that even within the same Node application, if you create multiple queues and call .process multiple times, they will add to the number of concurrent jobs that can be processed.

One contributor posted the following:

Yes, it was a little surprising for me too when I used Bull for the first time. Queue options are never persisted in Redis. You can have as many Queue instances per application as you want, each can have different settings. The concurrency setting is set when you're registering a processor; it is in fact specific to each process() function call, not to the Queue. If you use named processors, you can call process() multiple times. Each call will register N event loop handlers (with Node's process.nextTick()), by the amount of concurrency (default is 1).

So the answer to your question is: yes, your jobs WILL be processed by multiple Node instances if you register process handlers in multiple Node instances.
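To make the arithmetic concrete, here is a small sketch (plain Node, no Redis required; the helper function and its name are mine, not part of Bull's API) of how the effective ceiling grows when concurrency is summed per process() call and then multiplied across Node instances:

```javascript
// Effective global concurrency ceiling in Bull: each process() call
// registers its own concurrency, so within one Node instance the values
// add up; across instances, they multiply by the instance count.
function globalConcurrencyCeiling(perProcessCallConcurrency, instanceCount) {
  const perInstance = perProcessCallConcurrency.reduce((sum, c) => sum + c, 0);
  return perInstance * instanceCount;
}

// The question's setup: one process('jobTypeA', 5, ...) call per instance,
// replicated across 10 Node instances:
console.log(globalConcurrencyCeiling([5], 10)); // 50, not 5

// Two named processors (concurrency 5 and 3) per instance, 10 instances:
console.log(globalConcurrencyCeiling([5, 3], 10)); // 80
```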


Ah, welcome! This is a meta answer and probably not what you were hoping for, but here is a general process for approaching this:

You can specify a concurrency argument. Bull will then call your handler in parallel respecting this maximum value.

I personally don't fully understand this or the guarantees that Bull provides here, since the documentation isn't super clear. IMO the biggest question is:

Can I be certain that jobs will not be processed by more than one Node instance?

If exclusive message processing is an invariant whose violation would result in incorrectness for your application, then even with great documentation, I would highly recommend performing due diligence on the library :p


Looking into it more, I think Bull doesn't handle being distributed across multiple Node instances at all, so the behavior is at best undefined.


Bull is designed for processing jobs concurrently with "at least once" semantics, although if the processors are working correctly, i.e. not stalling or crashing, it in fact delivers "exactly once". However, you can set the maximum stalled retries to 0 (maxStalledCount, see https://github.com/OptimalBits/bull/blob/develop/REFERENCE.md#queue) and then the semantics will be "at most once".

Having said that, I will try to answer the two questions asked by the poster:

What happens if one Node instance specifies a different concurrency value?

I will assume you mean "queue instance". If so, the concurrency is specified in the processor. If the concurrency is X, what happens is that at most X jobs will be processed concurrently by that given processor.

Can I be certain that jobs will not be processed by more than one Node instance?

Yes, as long as your job does not crash or your max stalled jobs setting is 0.
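A minimal sketch of that maxStalledCount tweak, assuming a local Redis on the default port (the queue name is taken from the question):

```javascript
const Queue = require('bull');

// Setting maxStalledCount to 0 fails a stalled job instead of retrying
// it, trading "at least once" delivery for "at most once".
const queue = new Queue('taskqueue', 'redis://127.0.0.1:6379', {
  settings: {
    maxStalledCount: 0, // default is 1
  },
});
```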


The TL;DR is: under normal conditions, jobs are processed only once. If things go wrong (say the Node.js process crashes), jobs may be double processed.

Quoting from Bull's official README.md:

Important Notes

The queue aims for an "at least once" working strategy. This means that in some situations, a job could be processed more than once. This mostly happens when a worker fails to keep a lock for a given job during the total duration of the processing.

When a worker is processing a job it will keep the job "locked" so other workers can't process it.

It's important to understand how locking works to prevent your jobs from losing their lock - becoming stalled - and being restarted as a result. Locking is implemented internally by creating a lock for lockDuration on interval lockRenewTime (which is usually half lockDuration). If lockDuration elapses before the lock can be renewed, the job will be considered stalled and is automatically restarted; it will be double processed. This can happen when:

  1. The Node process running your job processor unexpectedly terminates.
  2. Your job processor was too CPU-intensive and stalled the Node event loop, and as a result, Bull couldn't renew the job lock (see #488 for how we might better detect this). You can fix this by breaking your job processor into smaller parts so that no single part can block the Node event loop. Alternatively, you can pass a larger value for the lockDuration setting (with the tradeoff being that it will take longer to recognize a real stalled job).

As such, you should always listen for the stalled event and log this to your error monitoring system, as this means your jobs are likely getting double-processed.

As a safeguard so problematic jobs won't get restarted indefinitely (e.g. if the job processor always crashes its Node process), jobs will be recovered from a stalled state a maximum of maxStalledCount times (default: 1).
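Putting the README's advice together, here is a sketch (values are illustrative, and it assumes a local Redis on the default port) that both raises lockDuration for CPU-heavy processors and logs stalled jobs:

```javascript
const Queue = require('bull');

const queue = new Queue('taskqueue', 'redis://127.0.0.1:6379', {
  settings: {
    lockDuration: 60000,  // hold the job lock for 60s (default is 30s)
    lockRenewTime: 30000, // renew the lock every 30s (default is lockDuration / 2)
  },
});

// 'stalled' fires when a job's lock expired and the job was picked up
// again; log it, since the job may have been processed more than once.
queue.on('stalled', job => {
  console.error(`Bull job ${job.id} stalled and may be double-processed`);
});
```

The tradeoff in lockDuration is the one the README describes: a larger value tolerates longer event-loop blocking, but a genuinely dead worker's job waits that much longer before being detected as stalled.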