Celery ChordError propagation from deeply nested workflows and catching them in Error Handlers


A small bit of context: we are running Python 3.11.2 and Celery 5.3.

I have quite a complex use case for Celery within a new application our team has built. It nests a bunch of chains/groups/chords, which are all generally working as expected.

We wanted to add a light workflow wrapper to our task/workflow runner so that we can track when a given workflow is running. After some back and forth with my senior dev, we came up with the following wrapper. It's very simple and gives us exactly what we need.

return (  # Create a chain to start the workflow and return the AsyncResult
    workflow_factory(*args, **kwargs).on_error(workflow_error_handler.s(workflow.id))
    | workflow_success_handler.si(workflow.id)
).delay()

workflow_factory is any registered workflow factory from elsewhere in our app; it is expected to return a chain or group that we can then attach our on_error handler to. And assuming there are no errors, the workflow_success_handler task handles all the cleanup.
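For reference, the handlers themselves are ordinary tasks. A simplified sketch is below (the real ones update our workflow tracking records; the error handler is written in the request/exc/traceback errback form from the Celery calling docs, with workflow.id bound as the first argument via .s(workflow.id)):

from celery import shared_task

@shared_task
def workflow_success_handler(workflow_id):
    # Mark the workflow record as finished (simplified).
    ...

@shared_task
def workflow_error_handler(workflow_id, request, exc, traceback):
    # Celery appends the failing task's request, exception, and traceback
    # after the workflow_id we bound with .s(workflow.id).
    # Mark the workflow record as failed (simplified).
    ...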

Generally, this is working as expected. Either the workflow succeeds, our success handler is called, and everything happens how we expect; or there is a failure, the error handler is called, and again, everything is handled.

However, we have one workflow that is effectively the culmination of all other workflows:

return chain(
    notify_workflow_started.si('EXAMPLE MAIN', day, company_ids),
    get_Z_workflow(*args),
    chord(
        [
            get_A_workflow(*args),
            get_B_workflow(*args),
            get_C_workflow(*args),
            get_D_workflow(*args),
            get_E_workflow(*args),
        ],
        # chord body: F, then G
        get_F_workflow(*args) | get_G_workflow(*args, omit_if_data_exists=True),
    ),
    notify_workflow_finished.si('EXAMPLE MAIN', day, company_ids),
)

This is what our "main" workflow_factory returns. I've obfuscated names for obvious reasons; what's important here, I think, is the workflow structure, not the logic itself.

This is already quite nested, but it goes deeper. Let's take workflow C as the example, since it's the one that goes the deepest and where I am testing the most right now.

When calling get_C_workflow, it returns the following:

return chain(
    notify_workflow_started.si('EXAMPLE C', *args),
    get_example_c_stage_1_workflow(*args),
    get_example_c_stage_2_workflow(*args),
    notify_workflow_finished.si('EXAMPLE C', *args),
)

This continues even deeper. Let's look into stage_1 as, again, I am working down towards where I am forcing an exception at the task level.

stage_1 returns the following:

return chain(
    notify_workflow_started.si('EXAMPLE C STAGE 1', *args),
    chord(
        [
            tasks.calculate_A_subfactors.si(*args),
            tasks.calculate_B_subfactors.si(*args),
            tasks.calculate_C_subfactors.si(*args),
            tasks.calculate_D_subfactors.si(*args),
            tasks.calculate_E_subfactors.si(*args),
            tasks.calculate_F_subfactors.si(*args),
            tasks.calculate_G_subfactors.si(*args),
        ],
        notify_workflow_finished.si('EXAMPLE C STAGE 1', *args),
    )
)

And here we finally see some of the actual tasks that run within this large workflow. In my example, I have calculate_C_subfactors forcibly raising an exception so that we can test how the workflow wrapper itself handles errors that occur within the workflow.
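The forced failure itself is nothing exotic; roughly like this (the exception class and message are placeholders for our real test exception):

from celery import shared_task

class SubfactorTestError(Exception):
    pass

@shared_task
def calculate_C_subfactors(*args, **kwargs):
    # Deliberately fail so we can observe how the error propagates
    # up through the nested chord/chain structure.
    raise SubfactorTestError('forced failure in calculate_C_subfactors')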

The issue is that it seems like we are "too deeply nested": when the exception is raised, the error handler is not called. We clearly see the error raised once in the Celery logs, and it is our custom exception itself. A few log lines later, as there are other tasks running in parallel, we see a ChordError raised, triggered by the exception we raised, but it never reaches the error handler. And since something failed, when all the other tasks and sub-workflows finish, our success handler is not called either.

The part that is really interesting is that this workflow is made up of other, smaller workflows. So I can call and run get_C_workflow directly, which effectively removes one "layer" of groups/chains/chords, and in that case the error handler works as expected: it catches the error, does what it needs to, and everything is great.
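In other words, the same wrapper behaves differently depending on which factory we hand it (get_main_workflow is a placeholder name here for the "main" factory shown above):

# Works: the error handler fires when calculate_C_subfactors raises
(
    get_C_workflow(*args).on_error(workflow_error_handler.s(workflow.id))
    | workflow_success_handler.si(workflow.id)
).delay()

# Doesn't work: the same failure inside the fully nested "main" workflow
# never reaches the error handler, and the success handler never fires either
(
    get_main_workflow(*args).on_error(workflow_error_handler.s(workflow.id))
    | workflow_success_handler.si(workflow.id)
).delay()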

Is there something we are missing about how ChordErrors/task errors are propagated through nested chains/chords/groups that would cause this behavior?

We have tried swapping explicit chords out for implicit chords (a chain of a group followed by one other task), and we have tried a few settings mentioned in other posts related to how exceptions propagate; none of it had any effect on the behavior.
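To be concrete, by "implicit chord" I mean the group-followed-by-a-task form that Celery upgrades to a chord; task_a/task_b/callback here are placeholders:

from celery import chain, chord, group

# Explicit chord
chord([task_a.si(), task_b.si()], callback.si())

# Implicit chord: a group followed by another task in a chain
# is upgraded to a chord by Celery
chain(group(task_a.si(), task_b.si()), callback.si())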

We are running with a Redis result backend, so we know task statuses are being managed correctly, as a working result backend is a prerequisite for using chords.
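The app is configured along these lines (URLs are placeholders, and the broker is shown only for completeness):

from celery import Celery

app = Celery(
    'our_app',
    broker='redis://localhost:6379/0',   # placeholder
    backend='redis://localhost:6379/1',  # result backend; chords require one
)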

This has totally stumped me and two other devs for the past week, and we seemingly can't find a path forward. Would love to get some insight from anyone who might know what's going on here.
