Celery : Task not execute from queue after the first few times

1.6k Views Asked by At

[Prerequisite]

  • Django==1.5.8
  • celery==3.0.23
  • django-celery==3.0.23
  • amqp==1.4.6

[Steps]

  • django views.py send message via RabbitMQ to celery task queue
  • Celery read messages from queue and execute tasks in order
  • Each task wouldn't take long time

[Problem]

  • For the first few times, celery successfully executed tasks but number of tasks is not reduced from the queue (Task is constantly stacked up at the queue)
  • Finally, celery is not executing task from queue after the first few times

[Hint]

  • I thought that following could be hints to solve problem
  • For the first few times, task is executed well but it is not removed from the queue
  • Task utils.tasks.do_things_1[027918df-a6ef-4712-a8c8-5b380d5bd0f6] succeeded in 1415464878.25s: None => Why does it take so long?

[Detail description with log]

  • Check number of tasks and worker

$ sudo rabbitmqctl list_queues name messages consumers | grep celery

celery 0 0 (0 tasks, 0 worker)

  • Celery Information when starts with Debug mode

$ ./manage.py celery worker --loglevel=debug

celery@75e28d96-1a33-4e08-8283-a31813a4124d v3.0.23 (Chiastic Slide)

Linux-3.2.0-34-generic-pae-i686-with-Ubuntu-14.04-trusty

[config]

broker: amqp://guest@localhost:5672//

app: default:0xb6b2188c (djcelery.loaders.DjangoLoader)

concurrency: 1 (processes)

events: OFF (enable -E to monitor this worker)

[queues]

celery: exchange:celery(direct) binding:celery

[Tasks]

utils.tasks.do_things_1

utils.tasks.do_things_2

utils.tasks.do_things_3

[2014-11-16 14:09:13,769: DEBUG/MainProcess] [Worker] Loading modules.

[2014-11-16 14:09:13,771: DEBUG/MainProcess] [Worker] Claiming components.

[2014-11-16 14:09:13,771: DEBUG/MainProcess] [Worker] Building boot step graph.

[2014-11-16 14:09:13,772: DEBUG/MainProcess] [Worker] New boot order: {ev, queues, beat, pool, mediator, autoreloader, timers, state-db, autoscaler, consumer}

[2014-11-16 14:09:13,774: DEBUG/MainProcess] Starting celery.worker.hub.Hub...

[2014-11-16 14:09:13,774: DEBUG/MainProcess] celery.worker.hub.Hub OK!

[2014-11-16 14:09:13,774: DEBUG/MainProcess] Starting celery.concurrency.processes.TaskPool...

[2014-11-16 14:09:13,782: DEBUG/MainProcess] celery.concurrency.processes.TaskPool OK!

[2014-11-16 14:09:13,787: DEBUG/MainProcess] Starting celery.worker.mediator.Mediator...

[2014-11-16 14:09:13,788: DEBUG/MainProcess] celery.worker.mediator.Mediator OK!

[2014-11-16 14:09:13,788: DEBUG/MainProcess] Starting celery.worker.consumer.Consumer...

[2014-11-16 14:09:13,789: WARNING/MainProcess] celery@75e28d96-1a33-4e08-8283-a31813a4124d ready.

[2014-11-16 14:09:13,789: DEBUG/MainProcess] consumer: Re-establishing connection to the broker...

[2014-11-16 14:09:13,800: DEBUG/MainProcess] Start from server, version: 0.9, properties: {u'information': u'Licensed under the MPL. See http://www.rabbitmq.com/', u'product': u'RabbitMQ', u'copyright': u'Copyright (C) 2007-2013 GoPivotal, Inc.', u'capabilities': {u'exchange_exchange_bindings': True, u'connection.blocked': True, u'authentication_failure_close': True, u'basic.nack': True, u'consumer_priorities': True, u'consumer_cancel_notify': True, u'publisher_confirms': True}, u'platform': u'Erlang/OTP', u'version': u'3.2.4'}, mechanisms: [u'PLAIN', u'AMQPLAIN'], locales: [u'en_US']

[2014-11-16 14:09:13,801: DEBUG/MainProcess] Open OK!

[2014-11-16 14:09:13,801: INFO/MainProcess] consumer: Connected to amqp://[email protected]:5672//.

[2014-11-16 14:09:13,802: DEBUG/MainProcess] using channel_id: 1

[2014-11-16 14:09:13,803: DEBUG/MainProcess] Channel open

[2014-11-16 14:09:13,804: DEBUG/MainProcess] consumer: basic.qos: prefetch_count->4

[2014-11-16 14:09:13,805: DEBUG/MainProcess] using channel_id: 2

[2014-11-16 14:09:13,805: DEBUG/MainProcess] Channel open

[2014-11-16 14:09:13,812: DEBUG/MainProcess] consumer: Ready to accept tasks!

  • Check number of tasks and worker

$ sudo rabbitmqctl list_queues name messages consumers | grep celery

celery 0 1 (0 tasks, 1 worker)

  • Some event occured in views.py and send message to execute do_things_1, do_things_2, do_things_3(1st attempts)

[2014-11-16 14:53:51,874: INFO/MainProcess] Got task from broker: utils.tasks.do_things_1[027918df-a6ef-4712-a8c8-5b380d5bd0f6] eta:[2014-11-16 14:53:52.835084+09:00]

[2014-11-16 14:53:51,875: INFO/MainProcess] Got task from broker: utils.tasks.do_things_2[a1c09645-32fb-4399-802b-60ca826fa6bc] eta:[2014-11-16 14:53:52.851516+09:00]

[2014-11-16 14:53:51,880: INFO/MainProcess] Got task from broker: utils.tasks.do_things_3[5a66f885-baf2-49ab-b82f-733d2b68fe7b] eta:[2014-11-16 14:53:52.852527+09:00]

[2014-11-16 14:53:51,880: DEBUG/MainProcess] consumer: basic.qos: prefetch_count->7

[2014-11-16 14:53:52,882: DEBUG/MainProcess] consumer: basic.qos: prefetch_count->4

[2014-11-16 14:53:52,917: DEBUG/MainProcess] Mediator: Running callback for task: utils.tasks.do_things_1[027918df-a6ef-4712-a8c8-5b380d5bd0f6]

[2014-11-16 14:53:52,918: DEBUG/MainProcess] TaskPool: Apply (args:('utils.tasks.do_things_1', '027918df-a6ef-4712-a8c8-5b380d5bd0f6', [u'0301010101', u'1'], {}, {'utc': True, 'is_eager': False, 'chord': None, 'group': None, 'args': [u'0301010101', u'1'], 'retries': 0, 'delivery_info': {'priority': None, 'routing_key': u'celery', 'exchange': u'celery'}, 'expires': None, 'task': 'utils.tasks.do_things_1', 'callbacks': None, 'errbacks': None, 'hostname': '************************', 'taskset': None, 'kwargs': {}, 'eta': '2014-11-16T05:53:52.835084', 'id': '027918df-a6ef-4712-a8c8-5b380d5bd0f6'}) kwargs:{})

[2014-11-16 14:53:52,918: DEBUG/MainProcess] Mediator: Running callback for task: utils.tasks.do_things_2[a1c09645-32fb-4399-802b-60ca826fa6bc]

[2014-11-16 14:53:52,918: DEBUG/MainProcess] TaskPool: Apply (args:('utils.tasks.do_things_2', 'a1c09645-32fb-4399-802b-60ca826fa6bc', [u'0301010101', u'1'], {}, {'utc': True, 'is_eager': False, 'chord': None, 'group': None, 'args': [u'0301010101', u'1'], 'retries': 0, 'delivery_info': {'priority': None, 'routing_key': u'celery', 'exchange': u'celery'}, 'expires': None, 'task': 'utils.tasks.do_things_2', 'callbacks': None, 'errbacks': None, 'hostname': '*********************', 'taskset': None, 'kwargs': {}, 'eta': '2014-11-16T05:53:52.851516', 'id': 'a1c09645-32fb-4399-802b-60ca826fa6bc'}) kwargs:{})

[2014-11-16 14:53:52,918: DEBUG/MainProcess] Mediator: Running callback for task: utils.tasks.do_things_3[5a66f885-baf2-49ab-b82f-733d2b68fe7b]

[2014-11-16 14:53:52,918: DEBUG/MainProcess] TaskPool: Apply (args:('utils.tasks.do_things_3', '5a66f885-baf2-49ab-b82f-733d2b68fe7b', [u'0301010101', u'1', u'1'], {}, {'utc': True, 'is_eager': False, 'chord': None, 'group': None, 'args': [u'0301010101', u'1', u'1'], 'retries': 0, 'delivery_info': {'priority': None, 'routing_key': u'celery', 'exchange': u'celery'}, 'expires': None, 'task': 'utils.tasks.do_things_3', 'callbacks': None, 'errbacks': None, 'hostname': '*************************', 'taskset': None, 'kwargs': {}, 'eta': '2014-11-16T05:53:52.852527', 'id': '5a66f885-baf2-49ab-b82f-733d2b68fe7b'}) kwargs:{})

  • I've checked that do_things_1, do_thins_2, do_things_3 executed well from the browser

  • Check number of tasks and worker (As you can see, task is not reduced from the queue)

$ sudo rabbitmqctl list_queues name messages consumers | grep celery

celery 3 1 (3 tasks, 1 worker)

  • Repeat to execute do_things_1, do_things_2, do_things_3(2nd attempts)

[2014-11-16 15:27:54,060: DEBUG/MainProcess] Task accepted: utils.tasks.do_things_1[027918df-a6ef-4712-a8c8-5b380d5bd0f6] pid:23793

[2014-11-16 15:27:54,060: INFO/MainProcess] Got task from broker: utils.tasks.do_things_1[2ea06e92-c617-460e-9275-98d4c4f79394] eta:[2014-11-16 15:27:55.048622+09:00]

[2014-11-16 15:27:54,060: DEBUG/MainProcess] consumer: basic.qos: prefetch_count->5

[2014-11-16 15:27:54,063: INFO/MainProcess] Task utils.tasks.do_things_1[027918df-a6ef-4712-a8c8-5b380d5bd0f6] succeeded in 1415464878.25s: None

[2014-11-16 15:27:54,063: INFO/MainProcess] Got task from broker: utils.tasks.do_things_2[cad7b687-7ef6-4f99-a06e-4a038369837d] eta:[2014-11-16 15:27:55.061913+09:00]

[2014-11-16 15:27:54,063: DEBUG/MainProcess] consumer: basic.qos: prefetch_count->6

[2014-11-16 15:27:54,065: DEBUG/MainProcess] Task accepted: utils.tasks.do_things_2[a1c09645-32fb-4399-802b-60ca826fa6bc] pid:23793

[2014-11-16 15:27:54,066: INFO/MainProcess] Got task from broker: utils.tasks.do_things_3[ceee2b27-5cc0-45cc-9a00-e15a361fa92f] eta:[2014-11-16 15:27:55.064312+09:00]

[2014-11-16 15:27:54,066: DEBUG/MainProcess] consumer: basic.qos: prefetch_count->7

[2014-11-16 15:27:55,068: DEBUG/MainProcess] consumer: basic.qos: prefetch_count->4

[2014-11-16 15:27:55,101: DEBUG/MainProcess] Mediator: Running callback for task: utils.tasks.do_things_1[2ea06e92-c617-460e-9275-98d4c4f79394]

[2014-11-16 15:27:55,101: DEBUG/MainProcess] TaskPool: Apply (args:('utils.tasks.do_things_1', '2ea06e92-c617-460e-9275-98d4c4f79394', [u'0301010101', u'1'], {}, {'utc': True, 'is_eager': False, 'chord': None, 'group': None, 'args': [u'0301010101', u'1'], 'retries': 0, 'delivery_info': {'priority': None, 'routing_key': u'celery', 'exchange': u'celery'}, 'expires': None, 'task': 'utils.tasks.do_things_1', 'callbacks': None, 'errbacks': None, 'hostname': '******************************', 'taskset': None, 'kwargs': {}, 'eta': '2014-11-16T06:27:55.048622', 'id': '2ea06e92-c617-460e-9275-98d4c4f79394'}) kwargs:{})

[2014-11-16 15:27:55,105: DEBUG/MainProcess] Mediator: Running callback for task: utils.tasks.do_things_2[cad7b687-7ef6-4f99-a06e-4a038369837d]

[2014-11-16 15:27:55,105: DEBUG/MainProcess] TaskPool: Apply (args:('utils.tasks.do_things_2', 'cad7b687-7ef6-4f99-a06e-4a038369837d', [u'0301010101', u'1'], {}, {'utc': True, 'is_eager': False, 'chord': None, 'group': None, 'args': [u'0301010101', u'1'], 'retries': 0, 'delivery_info': {'priority': None, 'routing_key': u'celery', 'exchange': u'celery'}, 'expires': None, 'task': 'utils.tasks.do_things_2', 'callbacks': None, 'errbacks': None, 'hostname': '*************************', 'taskset': None, 'kwargs': {}, 'eta': '2014-11-16T06:27:55.061913', 'id': 'cad7b687-7ef6-4f99-a06e-4a038369837d'}) kwargs:{})

[2014-11-16 15:27:55,105: DEBUG/MainProcess] Mediator: Running callback for task: utils.tasks.do_things_3[ceee2b27-5cc0-45cc-9a00-e15a361fa92f]

[2014-11-16 15:27:55,106: DEBUG/MainProcess] TaskPool: Apply (args:('utils.tasks.do_things_3', 'ceee2b27-5cc0-45cc-9a00-e15a361fa92f', [u'0301010101', u'1', u'1'], {}, {'utc': True, 'is_eager': False, 'chord': None, 'group': None, 'args': [u'0301010101', u'1', u'1'], 'retries': 0, 'delivery_info': {'priority': None, 'routing_key': u'celery', 'exchange': u'celery'}, 'expires': None, 'task': 'utils.tasks.do_things_3', 'callbacks': None, 'errbacks': None, 'hostname': '*******************************', 'taskset': None, 'kwargs': {}, 'eta': '2014-11-16T06:27:55.064312', 'id': 'ceee2b27-5cc0-45cc-9a00-e15a361fa92f'}) kwargs:{})

  • I've checked that do_things_1, do_thins_2, do_things_3 executed well from the browser

  • Check number of tasks and worker (As you can see, task is reduced partially)

$ sudo rabbitmqctl list_queues name messages consumers | grep celery

celery 4 1 (4 tasks, 1 worker)

  • Repeat to execute do_things_1, do_things_2, do_things_3 (3rd attempts)

  • Celery didn't log anything and do_things_1, do_thins_2, do_things_3 not executed!

  • Check number of tasks and worker (As you can see, task is increased)

$ sudo rabbitmqctl list_queues name messages consumers | grep celery

celery 7 1 (7 tasks, 1 worker)

  • When I terminate it by "Ctrl+C", it logs like following

celeryd: Warm shutdown (MainProcess)

[2014-11-16 16:15:52,922: DEBUG/MainProcess] Stopping celery.worker.consumer.Consumer...

[2014-11-16 16:15:52,923: DEBUG/MainProcess] consumer: Stopping consumers...

[2014-11-16 16:15:52,923: DEBUG/MainProcess] Stopping celery.worker.mediator.Mediator...

[2014-11-16 16:15:53,638: DEBUG/MainProcess] Stopping celery.concurrency.processes.TaskPool...

[2014-11-16 16:15:53,638: INFO/MainProcess] Task utils.tasks.do_things_2[a1c09645-32fb-4399-802b-60ca826fa6bc] succeeded in 1415467757.76s: None

[2014-11-16 16:15:53,639: DEBUG/MainProcess] Task accepted: utils.tasks.do_things_3[5a66f885-baf2-49ab-b82f-733d2b68fe7b] pid:23793

[2014-11-16 16:15:53,639: INFO/MainProcess] Task utils.tasks.do_things_3[5a66f885-baf2-49ab-b82f-733d2b68fe7b] succeeded in 1415467757.75s: None

[2014-11-16 16:15:53,640: DEBUG/MainProcess] Task accepted: utils.tasks.do_things_1[2ea06e92-c617-460e-9275-98d4c4f79394] pid:23793

[2014-11-16 16:15:53,640: INFO/MainProcess] Task utils.tasks.do_things_1[2ea06e92-c617-460e-9275-98d4c4f79394] succeeded in 1415465715.65s: None

[2014-11-16 16:15:53,640: DEBUG/MainProcess] Task accepted: utils.tasks.do_things_2[cad7b687-7ef6-4f99-a06e-4a038369837d] pid:23793

[2014-11-16 16:15:53,641: INFO/MainProcess] Task utils.tasks.do_things_2[cad7b687-7ef6-4f99-a06e-4a038369837d] succeeded in 1415465715.64s: None

[2014-11-16 16:15:53,641: DEBUG/MainProcess] Task accepted: utils.tasks.do_things_3[ceee2b27-5cc0-45cc-9a00-e15a361fa92f] pid:23793

[2014-11-16 16:15:53,642: INFO/MainProcess] Task utils.tasks.update_apartboard_1st_load_page_cache[ceee2b27-5cc0-45cc-9a00-e15a361fa92f] succeeded in 1415465715.63s: None

  • do_things_1, do_things_2, do_things_3 not executed (Check from the browser)

  • Check number of tasks and worker (As you can see, task is reduced from the queue)

$ sudo rabbitmqctl list_queues name messages consumers | grep celery

celery 3 0 (3 tasks, 0 worker)

  • Purge task queue

$ ./manage.py celeryctl purge

$ sudo rabbitmqctl list_queues name messages consumers | grep celery

celery 0 0 (0 tasks, 0 worker)

1

There are 1 best solutions below

0
On

Upgrade Celery version from 3.0.23 to 3.0.25 solved this problem.

However, I still don't know the cause of the failure.