Airflow Branch issue


There are 4 tasks to run, and a condition filter.

I expect this to happen:

  1. if t0 returns True, skip t1, then run t2 and t3
  2. if t0 returns False, run t1, and when t1 is done, run t2 and then t3

The logic looks like this: (image)

But why does the flow also skip t2 and t3 when t1 is skipped?

flow.py file

from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from datetime import datetime

def f0():
    print("this is function0")
    return False
    # return True

def f1():
    print("this is function1")

def f2():
    print("this is function2")

def f3():
    print("this is function3")

def branch_chooser(**kwargs):
    return 't2' if kwargs['ti'].xcom_pull(task_ids='t0') else 't1'

with DAG(
    'TEST',
    default_args={
        'depends_on_past': False,
        'retries': 1,
    },
    description='test process',
    schedule_interval=None,
    start_date=datetime(2023, 1, 1),
    catchup=False,
) as dag:
    t0 = PythonOperator(
        task_id='t0',
        python_callable=f0,
        provide_context=True,
        dag=dag,
    )

    branch_task = BranchPythonOperator(
        task_id='Branch_condition',
        python_callable=branch_chooser,
        provide_context=True,
        dag=dag,
    )

    t1 = PythonOperator(
        task_id='t1',
        python_callable=f1,
        dag=dag,
    )

    t2 = PythonOperator(
        task_id='t2',
        python_callable=f2,
        dag=dag,
    )

    t3 = PythonOperator(
        task_id='t3',
        python_callable=f3,
        dag=dag,
    )

    # Set up the DAG with two distinct paths
    t0 >> branch_task
    branch_task >> t1
    branch_task >> t2
    t1 >> t2
    t2 >> t3
    

I tried many combinations, but I can't get what I expect.

There is 1 best solution below

It's simpler than you think:

def branch_chooser(**kwargs):
    return None if kwargs['ti'].xcom_pull(task_ids='t0') else 't1'

with DAG(...):
    ...

    t2 = PythonOperator(
        task_id='t2',
        python_callable=f2,
        trigger_rule='none_failed',  # << important!
        dag=dag,
    )

    # Keep it simple!
    t0 >> branch_task >> t1 >> t2 >> t3

With branch_task we only decide to run t1 or not. If it runs, t2 and t3 run sequentially as usual. If not (branch_chooser returned None), t1 is skipped.
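
To see what the branch callable hands back in each case, here is a minimal sketch that can be run without Airflow. FakeTaskInstance and demo_branch are hypothetical helpers made up for this illustration; only branch_chooser comes from the answer, and the stub imitates nothing of the real task instance beyond xcom_pull(task_ids=...).

```python
class FakeTaskInstance:
    """Hypothetical stand-in for Airflow's task instance (illustration only)."""

    def __init__(self, xcoms):
        # Maps task_id -> the value that task "returned"
        self._xcoms = xcoms

    def xcom_pull(self, task_ids):
        return self._xcoms.get(task_ids)


def branch_chooser(**kwargs):
    # None means "follow no branch", so the directly downstream t1 is skipped
    return None if kwargs['ti'].xcom_pull(task_ids='t0') else 't1'


def demo_branch(t0_result):
    # Hypothetical helper: call branch_chooser as if t0 had returned t0_result
    return branch_chooser(ti=FakeTaskInstance({'t0': t0_result}))


print(demo_branch(True))
print(demo_branch(False))
```

When t0 returns True the callable returns None and t1 is skipped; when t0 returns False it returns 't1' and t1 runs.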

Now say hello to trigger rules. With the default setting of all_success, t2 runs only if its predecessor t1 finished successfully. If t1 is skipped, t2 and all downstream tasks are skipped as well.

However, with trigger_rule='none_failed' it's enough that t1 hasn't failed: skipped is fine, and t2 runs normally, followed by t3.
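
The difference between the two trigger rules can be sketched with a toy model. This is not Airflow's scheduler code, just a simplified simulation under two assumptions: every task that is allowed to run succeeds, and only the all_success and none_failed rules are modelled. The names resolve and run_chain are made up for this example.

```python
SUCCESS, SKIPPED, FAILED, UPSTREAM_FAILED = (
    'success', 'skipped', 'failed', 'upstream_failed'
)


def resolve(trigger_rule, upstream_states):
    """Toy model: final state of a task given its upstream states."""
    if any(s in (FAILED, UPSTREAM_FAILED) for s in upstream_states):
        return UPSTREAM_FAILED
    if trigger_rule == 'all_success':
        # Every upstream task must have succeeded; a skip propagates downstream
        if all(s == SUCCESS for s in upstream_states):
            return SUCCESS
        return SKIPPED
    if trigger_rule == 'none_failed':
        # Succeeded or skipped upstream tasks are both acceptable
        return SUCCESS
    raise ValueError(f'rule not modelled: {trigger_rule}')


def run_chain(t1_state, t2_rule):
    """Simulate t1 >> t2 >> t3, where t3 keeps the default all_success."""
    t2_state = resolve(t2_rule, [t1_state])
    t3_state = resolve('all_success', [t2_state])
    return t2_state, t3_state


print(run_chain(SKIPPED, 'all_success'))
print(run_chain(SKIPPED, 'none_failed'))
```

With t1 skipped, the default rule skips both t2 and t3, while none_failed on t2 lets t2 run, and t3 then runs because its only predecessor succeeded.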