I'm working on a new monitoring system that can measure Celery queue throughput and alert the team when the queue is getting backed up. In the course of this work, I've come across some peculiar behaviors that I don't understand (and that are not well covered in the Celery documentation).

For testing purposes, I've set up an endpoint that populates the queue with 16 long-running tasks to simulate a backed-up queue. The framework is Flask and the queue broker is Redis. Celery is configured so that each worker works on up to 4 tasks in parallel, and I have 2 workers running.
api/health.py

```python
from flask import Blueprint, make_response

from jobs import sleepy_job


def health():
    health = Blueprint("health", __name__)

    @health.route("/api/debug/create-long-queue", methods=["GET"])
    def long_queue():
        # Enqueue 16 long-running tasks to back up the queue.
        for i in range(16):
            sleepy_job.delay()
        return make_response({}, 200)

    return health
```
jobs.py

```python
import time

# `celery` (the app instance) and HIGH_PRIORITY are defined elsewhere in the project.
@celery.task(priority=HIGH_PRIORITY)
def sleepy_job(*args, **kwargs):
    time.sleep(30)
```
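For reference, the Celery app and the HIGH_PRIORITY constant aren't shown above; a setup matching the description (Redis broker, up to 4 tasks per worker) would look roughly like the sketch below. The module name, broker URL, and the actual priority value are placeholders, not my real project code.

```python
# celery_app.py (illustrative only)
from celery import Celery

HIGH_PRIORITY = 9  # placeholder value; only the name is used in jobs.py

celery = Celery("app", broker="redis://localhost:6379/0")
celery.conf.worker_concurrency = 4  # each worker runs up to 4 tasks in parallel
```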
Here's what I do to simulate a backed-up production queue:

- I call /api/debug/create-long-queue to simulate a back-up in my queue. Based on the above math, the workers should be busy sleeping for about 1 minute (together they can handle 8 tasks concurrently, each task just sleeps for 30 seconds, and there are 16 tasks total).
- I make another API call shortly after (< 5 s), which kicks off a different job with real business logic (processing of an inbound webhook API call). We'll call this job handle_incoming_message; a simplified stub is shown after this list.
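For clarity, handle_incoming_message can be pictured as something like the stub below; the real task contains actual business logic for the webhook payload, so this is only illustrative.

```python
# jobs.py (continued) - illustrative stub, not the real implementation
@celery.task
def handle_incoming_message(payload, *args, **kwargs):
    # Real business logic: parse and persist the inbound webhook data.
    ...
```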
Here's what I see using Flower to inspect the queue:

- While all workers are blocked by the first 8 sleepy_job tasks, I see no sign of the new handle_incoming_message task on the queue, even though I am certain handle_incoming_message.delay() has been called as a result of the 2nd API call.
- After the first 8 sleepy_job tasks have completed (~30 s), I see the new handle_incoming_message task on the queue with state RECEIVED.
- After the second (and final) 8 sleepy_job tasks have completed, handle_incoming_message now has state STARTED (and I can confirm this, as the UI updates with the new data that was received and processed in that task).
Questions

So it seems clear that when the workers are momentarily unblocked after handling the first 8 sleepy_job tasks, they are doing something to mark/acknowledge the new handle_incoming_message task in a way that is visible to Flower. But this leaves several unanswered questions:

- What is the state of the new handle_incoming_message task while the workers are blocked?
- What changes after the workers are unblocked that gives Flower visibility into the new handle_incoming_message task?
- What does the "RECEIVED" state actually mean?
- (Bonus: How can I get visibility into tasks that are queued while workers are blocked?)
When all worker processes are blocked, SOME tasks can still be in the RECEIVED state because of prefetching (look it up in the Celery documentation). So chances are very high that your task is simply sitting in the queue, waiting to be received by a Celery worker (the coordinating process, not an actual worker process).
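You can also ask the workers themselves what they have already taken off the queue versus what they are actually executing. A rough sketch, assuming your app instance is importable as celery from a celery_app module (adjust the import to your layout):

```python
from celery_app import celery  # assumed import path for your app instance

insp = celery.control.inspect()

# Tasks a worker has prefetched but not started yet (these show up as RECEIVED).
print(insp.reserved())

# Tasks currently being executed by worker processes.
print(insp.active())
```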
Flower is a simple service built on top of a Celery feature called "task events". In simple terms, Flower subscribes itself as a receiver of all task events (received, started, succeeded, failed, etc.) and visually represents them to web clients. More about it here. So when a task gets received by a Celery worker, a "task-received" event is sent; Flower picks this event up and changes the state of that task in the dashboard.
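You can subscribe to the same event stream yourself if you want to build your monitoring on it. A minimal sketch (the broker URL is an assumption, and "task-sent" events are only emitted if task_send_sent_event is enabled in your configuration):

```python
from celery import Celery

app = Celery(broker="redis://localhost:6379/0")  # assumed broker URL

def announce(event):
    # Each event is a dict carrying the event type, task uuid, name, timestamps, etc.
    print(event["type"], event.get("uuid"), event.get("name"))

with app.connection() as connection:
    receiver = app.events.Receiver(connection, handlers={
        "task-sent": announce,       # published, not yet picked up (needs task_send_sent_event)
        "task-received": announce,   # a worker took it off the queue
        "task-started": announce,    # a worker process began executing it
        "task-succeeded": announce,
    })
    receiver.capture(limit=None, timeout=None, wakeup=True)
```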
When a task is "received" it means that particular Celery worker took that task off the queue and it may be executed immediately (if there is a free worker-process to execute it), or Celery worker will wait for a worker process to become ready to run the task. I have already mentioned prefetching - Celery workers will often take more tasks then available worker-processes.
Celery does not give users a way to list what is in a particular queue. That is why you will see many similar questions, including this one, which offers some answers (you will find my short answer there among others). In short, it depends on your broker of choice. If it is Redis, you simply go through the list of objects (a sketch follows below). If it is RabbitMQ, you can use their tools to inspect queues. I think the decision not to provide this is a good one, as the information is never reliable: by the time you list all the tasks in a particular queue, there may be thousands of new ones...
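For Redis, "going through the list of objects" can look roughly like this. It assumes the default queue name "celery" and the default JSON message serialization; adjust for custom queues or serializers:

```python
import json
import redis

r = redis.Redis(host="localhost", port=6379, db=0)  # assumed connection details

# Queued (not yet received) messages live in a Redis list named after the queue.
for raw in r.lrange("celery", 0, -1):
    message = json.loads(raw)
    headers = message.get("headers", {})
    print(headers.get("task"), headers.get("id"))
```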