Celery: How to use "link_error" if documentation has an error?

The Celery documentation (http://celery.readthedocs.org/en/latest/userguide/canvas.html#chains) gives this example of how to use link_error:

You can also add error callbacks using the link_error argument:

add.apply_async((2, 2), link_error=log_error.s())
add.subtask((2, 2), link_error=log_error.s())

Since exceptions can only be serialized when pickle is used the error callbacks take the id of the parent task as argument instead:

from __future__ import print_function 
import os 
from proj.celery import app

@app.task 
def log_error(task_id):
    result = app.AsyncResult(task_id)
    result.get(propagate=False)  # make sure result written.
    with open(os.path.join('/var/errors', task_id), 'a') as fh:
        print('--\n\n{0} {1} {2}'.format(
            task_id, result.result, result.traceback), file=fh)

But this example has a bug: it calls AsyncResult.get() inside a task, which can cause a deadlock and produces these log entries:

/opt/.virtualenvs/spark/lib/python3.4/site-packages/celery/result.py:45: RuntimeWarning: Never call result.get() within a task!
See http://docs.celeryq.org/en/latest/userguide/tasks.html#task-synchronous-subtasks

In Celery 3.2 this will result in an exception being
raised instead of just being a warning.

  warnings.warn(RuntimeWarning(E_WOULDBLOCK))

[2015-06-13 20:30:19,242: WARNING/Worker-4] /opt/.virtualenvs/spark/lib/python3.4/site-packages/celery/result.py:45: RuntimeWarning: Never call result.get() within a task!
See http://docs.celeryq.org/en/latest/userguide/tasks.html#task-synchronous-subtasks

In Celery 3.2 this will result in an exception being
raised instead of just being a warning.

  warnings.warn(RuntimeWarning(E_WOULDBLOCK))
There is 1 best solution below

Go through this check-list to make sure the configuration is OK:

  1. CELERY_IGNORE_RESULT must be set to False
  2. CELERY_RESULT_BACKEND must be set, e.g. 'amqp' (for RabbitMQ)
  3. The @task decorator must not have the option ignore_result=True on any task (but you can create immutable signatures)
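
The check-list above could be written down as a settings module. This is only a minimal sketch, assuming Celery 3.x-style uppercase setting names and a hypothetical module called celeryconfig.py; note that in Celery 3.x the result backend setting is spelled CELERY_RESULT_BACKEND:

```python
# celeryconfig.py (hypothetical module name), Celery 3.x-style settings.

# Results must be stored, otherwise the error callback has nothing to read.
CELERY_IGNORE_RESULT = False

# Any configured result backend will do; 'amqp' is the example from the list.
CELERY_RESULT_BACKEND = 'amqp'
```

Such a module could then be loaded with app.config_from_object('celeryconfig'), assuming it is importable from the worker's path.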

Finally, here is the task modified so that it can fetch the result, using allow_join_result:

from __future__ import print_function
import os

from celery.result import allow_join_result
from proj.celery import app

@app.task
def log_error(task_id):
    # allow_join_result() permits calling get() from inside a task.
    with allow_join_result():
        result = app.AsyncResult(task_id)
        result.get(propagate=False)  # make sure result written.
        with open(os.path.join('/var/errors', task_id), 'a') as fh:
            print('--\n\n{0} {1} {2}'.format(
                task_id, result.result, result.traceback), file=fh)

Note: in this case (a ZeroDivisionError), result.result is a dictionary:

{
    'exc_type': 'ZeroDivisionError',  # Name of Exception type
    'exc_message': 'division by zero'  # Reason of exception
}
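
Since result.result may arrive either as an exception instance or as a dictionary like the one above, the logging code may want to handle both shapes. A minimal sketch; the helper name describe_failure is hypothetical and not part of Celery:

```python
def describe_failure(result_value):
    """Return (type_name, message) for an exception or an exc-info dict."""
    if isinstance(result_value, dict):
        # Dictionary form with 'exc_type' and 'exc_message' keys.
        return (result_value.get('exc_type', 'UnknownError'),
                result_value.get('exc_message', ''))
    if isinstance(result_value, BaseException):
        # Exception instance: use its class name and its string form.
        return type(result_value).__name__, str(result_value)
    # Anything else: report its type and repr so nothing is lost.
    return type(result_value).__name__, repr(result_value)
```

Inside log_error, the returned pair could be written to the log file in place of the raw result.result value.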

Note:

This is issue #2652, which occurs when calling get() on an AsyncResult while Celery is ignoring the error.