I'm working on a new monitoring system that measures Celery queue throughput and alerts the team when the queue is getting backed up. Along the way, I've come across some peculiar behaviors that I don't understand and that are not well documented in the Celery docs.

For testing purposes, I've set up an endpoint that populates the queue with 16 long-running tasks to simulate a backed-up queue. The framework is Flask and the broker is Redis. Celery is configured so that each worker processes up to 4 tasks in parallel, and I have 2 workers running.

api/health.py

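from flask import Blueprint, make_response

from jobs import sleepy_job  # assumes jobs.py is importable as `jobs`


def health():
    health = Blueprint("health", __name__)

    @health.route("/api/debug/create-long-queue", methods=["GET"])
    def long_queue():
        # Enqueue 16 slow tasks to simulate a backed-up queue.
        for _ in range(16):
            sleepy_job.delay()

        return make_response({}, 200)

    return health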

jobs.py

import time

# `celery` (the Celery app) and HIGH_PRIORITY are defined elsewhere in the project.
@celery.task(priority=HIGH_PRIORITY)
def sleepy_job(*args, **kwargs):
    time.sleep(30)

Here's what I do to simulate a backed-up production queue:

  1. I call /api/debug/create-long-queue to simulate a backup in my queue. The workers should then be busy sleeping for about 1 minute: together, the 2 workers can handle 8 tasks at a time, each task sleeps for 30 seconds, and there are 16 tasks in total, so they run in two batches of 8.
  2. I make another API call shortly after (< 5 s), which kicks off a different job with real business logic (processing an inbound webhook API call). We'll call this job handle_incoming_message.

Here's what I see when using Flower to inspect the queue:

  • While all workers are blocked by the first 8 sleepy_job tasks, I see no sign of the new handle_incoming_message on the queue, even though I am certain handle_incoming_message.delay() has been called as a result of the 2nd API call.
  • After the first 8 sleepy_job tasks have been completed (~30s), I see the new handle_incoming_message on the queue with state RECEIVED.
  • After the second (and final) 8 sleepy_job tasks have been completed, I now see handle_incoming_message has state STARTED (and I can confirm this as the UI updates with the new data that was received and processed in that task.)

Questions

So it seems clear that when the workers are momentarily unblocked after handling the first 8 sleepy_job tasks, they do something to mark or acknowledge the new handle_incoming_message task in a way that is visible to Flower. But this leaves several unanswered questions:

  • What is the state of the new handle_incoming_message task when the workers are blocked?
  • What changes after the workers are unblocked that gives Flower visibility into the new handle_incoming_message task?
  • What does the "RECEIVED" state actually mean?
  • (Bonus: How can I get visibility into tasks that are queued while workers are blocked?)

Answer

  1. While all worker processes are busy, some tasks may already be in the received state because of prefetching (see the Celery documentation on prefetch limits). Any task that has not been prefetched, which is most likely the case for yours, is simply sitting in the queue, waiting to be received by a Celery worker. Here "worker" means the coordinating process, not one of the child processes that actually execute tasks.

  2. Flower is a simple service built on a Celery feature called "task events". In simple terms, Flower subscribes as a receiver of all events (received, started, succeeded, failed, etc.) and presents them visually to web clients. More about it here. So when a task is received by a Celery worker, a "task-received" event is sent. Flower picks up this event and updates the state of that task in the dashboard.

  3. When a task is "received", it means that a particular Celery worker took the task off the queue. It may be executed immediately (if there is a free worker process to execute it), or the Celery worker will wait for a worker process to become free to run it. I have already mentioned prefetching: Celery workers will often take more tasks than they have available worker processes.

  4. Celery does not give users a way to list what is in a particular queue. That is why you will see many similar questions, including this one, which offers answers. You will see my short answer there among others. In short, it depends on your broker of choice. If it is Redis, you simply go through the list of objects. If it is RabbitMQ, you can use its tools to inspect queues. I think the decision not to provide this is a good one, as this information is never reliable: by the time you have listed all the tasks in a particular queue, there may be thousands of new ones.
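
For the Redis case in point 4, here is a minimal sketch of what "going through the list of objects" could look like for a monitoring check. It is not something Celery itself provides. It assumes the default queue name "celery" and, because the tasks in the question are sent with a priority, it assumes kombu's Redis transport naming scheme for priority lists (the queue name, a "\x06\x16" separator, then the priority step, with default steps 0, 3, 6, 9). The helper name pending_counts and the connection details in the usage comment are hypothetical.

```python
from typing import Dict, Iterable

# Assumption: kombu's Redis transport keeps priority-0 messages in a list
# named after the queue, and other priority steps in lists named
# "<queue>\x06\x16<step>", with default steps 0, 3, 6, 9.
PRIORITY_SEP = "\x06\x16"
DEFAULT_PRIORITY_STEPS = (0, 3, 6, 9)


def pending_counts(client, queue: str = "celery",
                   steps: Iterable[int] = DEFAULT_PRIORITY_STEPS) -> Dict[str, int]:
    """Return {redis_key: number of messages no worker has fetched yet}.

    `client` is any object with a Redis-style llen(key) method,
    for example redis.Redis from redis-py.
    """
    counts = {}
    for step in steps:
        # Priority 0 uses the bare queue name; other steps get a suffix.
        key = queue if step == 0 else f"{queue}{PRIORITY_SEP}{step}"
        counts[key] = client.llen(key)
    return counts


# Hypothetical usage against a local broker (requires redis-py):
#   import redis
#   client = redis.Redis(host="localhost", port=6379, db=0)
#   print(sum(pending_counts(client).values()))
```

Messages that a worker has already prefetched are no longer in these lists, so the total only covers tasks that have not yet reached the received state. As noted above, it is a snapshot that may be stale by the time it is read.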