There are 4 tasks to run, and a condition filter.
I expect this to happen:
- if t0 returns True, skip t1, run t2, then t3
- if t0 returns False, run t1, and when t1 is done, run t2, then t3
But why does the flow also skip t2 and t3 when t1 is skipped?
flow.py file:
from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from datetime import datetime


def f0():
    print("this is function0")
    return False
    # return True


def f1():
    print("this is function1")


def f2():
    print("this is function2")


def f3():
    print("this is function3")


def branch_chooser(**kwargs):
    return 't2' if kwargs['ti'].xcom_pull(task_ids='t0') else 't1'


with DAG(
    'TEST',
    default_args={
        'depends_on_past': False,
        'retries': 1,
    },
    description='test process',
    schedule_interval=None,
    start_date=datetime(2023, 1, 1),
    catchup=False,
) as dag:
    t0 = PythonOperator(
        task_id='t0',
        python_callable=f0,
        provide_context=True,
        dag=dag,
    )
    branch_task = BranchPythonOperator(
        task_id='Branch_condition',
        python_callable=branch_chooser,
        provide_context=True,
        dag=dag,
    )
    t1 = PythonOperator(
        task_id='t1',
        python_callable=f1,
        dag=dag,
    )
    t2 = PythonOperator(
        task_id='t2',
        python_callable=f2,
        dag=dag,
    )
    t3 = PythonOperator(
        task_id='t3',
        python_callable=f3,
        dag=dag,
    )

    # Set up the DAG with two distinct paths
    t0 >> branch_task
    branch_task >> t1
    branch_task >> t2
    t1 >> t2
    t2 >> t3
I've tried many combinations but can't get what I expect.
It's simpler than you think:

With branch_task we only decide whether to run t1 or not. If it runs, t2 and t3 run sequentially as usual. If not (branch_chooser returned None), t1 is skipped.

Now say hello to trigger rules. With the default setting of all_success, t2 runs only if its predecessor t1 finished successfully. If t1 was skipped, t2 and all downstream tasks are skipped as well.

However, with trigger_rule='none_failed' it's enough that t1 hasn't failed: skipped is fine, and t2 runs normally, followed by t3.
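
To make the difference between the two trigger rules concrete, here is a minimal pure-Python sketch that mimics how task states would propagate through the DAG from the question. It is a toy model, not Airflow's scheduler: the helpers resolve and run_flow are hypothetical names made up for this illustration, and it assumes every task that actually runs succeeds. In the real DAG, the corresponding change would presumably be passing trigger_rule='none_failed' to the t2 operator.

```python
# Toy model of trigger-rule evaluation (NOT Airflow code; names are hypothetical).
SUCCESS, SKIPPED, FAILED, UPSTREAM_FAILED = (
    "success", "skipped", "failed", "upstream_failed"
)


def resolve(trigger_rule, upstream_states):
    """State a task ends in, assuming its own callable succeeds if it runs."""
    if any(s in (FAILED, UPSTREAM_FAILED) for s in upstream_states):
        # Both rules refuse to run when something upstream failed.
        return UPSTREAM_FAILED
    if trigger_rule == "all_success":
        # A skipped upstream task makes this task skip as well.
        return SKIPPED if SKIPPED in upstream_states else SUCCESS
    if trigger_rule == "none_failed":
        # Skipped upstream tasks are tolerated.
        return SUCCESS
    raise ValueError(f"rule not modelled in this sketch: {trigger_rule}")


def run_flow(t0_result, t2_rule):
    """Simulate t0 >> Branch_condition >> [t1, t2], t1 >> t2, t2 >> t3."""
    states = {"t0": SUCCESS, "Branch_condition": SUCCESS}
    # The branch picks 't2' when t0 returned True, so t1 is skipped.
    states["t1"] = SKIPPED if t0_result else SUCCESS
    states["t2"] = resolve(t2_rule, [states["Branch_condition"], states["t1"]])
    states["t3"] = resolve("all_success", [states["t2"]])
    return states


if __name__ == "__main__":
    print(run_flow(True, "all_success"))
    print(run_flow(True, "none_failed"))
```

With the default rule, the skip of t1 cascades into t2 and t3. With none_failed on t2 alone, t2 runs, and t3 can keep its default rule because its only upstream task succeeded.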