Can a flow be terminated if a specific task fails?


I have a long flow and I would like to mark its state as Failed if a specific task fails.

I read the docs here, but they don't seem to cover this scenario. My guess is that I need to specify this somehow in the @task() decorator, but I don't know how.

Any guidance is much appreciated.

Best answer

This can be achieved by declaring your specific task a "reference task" of your Flow. Reference tasks are the tasks that ultimately determine the final state for each flow run.

For example, the following snippet creates a flow with two independent tasks that each fail randomly about half the time. The overall flow run state is determined solely by the state of task_one, because it is set as the only reference task:

import random
from prefect import task, Flow


@task
def task_one():
    if random.random() > 0.5:
        raise ValueError("Random failure")

@task
def task_two():
    if random.random() > 0.5:
        raise ValueError("Random failure")


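# Register both tasks with the flow; they have no dependency on each other.
flow = Flow("Two Task Flow", tasks=[task_one, task_two])
# Only task_one decides the final state of each flow run.
flow.set_reference_tasks([task_one])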
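
To make the rule concrete, here is a rough plain-Python sketch of how reference tasks decide the final state. It is not Prefect's implementation: the function name final_flow_state and the string states "Success" and "Failed" are made up for illustration, and it only models the simple case where every task either succeeds or fails.

```python
def final_flow_state(task_states, reference_tasks):
    """Toy model: the flow is Failed if any reference task failed.

    task_states maps a task name to "Success" or "Failed". These are
    hypothetical string states, not Prefect State objects.
    """
    failed = [name for name in reference_tasks if task_states[name] == "Failed"]
    return "Failed" if failed else "Success"


# task_two fails, but only task_one is a reference task.
states = {"task_one": "Success", "task_two": "Failed"}
print(final_flow_state(states, ["task_one"]))              # Success
print(final_flow_state(states, ["task_one", "task_two"]))  # Failed
```

With task_one as the only reference task, a failure in task_two does not change the outcome, which matches the flow above.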