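import logging
from datetime import datetime, timedelta

from airflow import DAG
from airflow.decorators import task, task_group
from airflow.utils.trigger_rule import TriggerRule

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 11, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}
logger = logging.getLogger('airflow')

with DAG(
    'test_mapping_branch',
    default_args=default_args,
    schedule_interval=None,
    catchup=False,
) as dag:

    @task
    def get_list():
        # Upstream task whose XCom output feeds the mapped task group in Case 1
        return [1, 2, {"a": "b"}, "str"]

    @task_group(group_id='my_group')
    def my_group(result):

        @task.branch
        def choice(result):
            # Task IDs inside a task group are prefixed with the group_id
            if isinstance(result, int):
                return 'my_group.first'
            else:
                return 'my_group.second'

        @task
        def first():
            print('First')

        @task
        def second():
            print('Second')

        # Runs when no upstream task failed and at least one succeeded
        @task(trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS)
        def task_end():
            print('End')

        choice(result) >> [first(), second()] >> task_end()

    # Case 1: map over the output of an upstream task
    my_group.expand(result=get_list())

    # Case 2: map over a literal list (run one case at a time)
    # my_group.expand(result=[1, 2, {"a": "b"}, "str"])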
Why does my_group.expand(result=get_list()) go wrong, while my_group.expand(result=[1, 2, {"a": "b"}, "str"]) runs as expected? In the first case, both first and second are executed, whereas only one of them should run, depending on the condition checked in the choice task.
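To pin down the expected behavior independently of Airflow, the branch condition can be replayed in plain Python over the same input list. This is a minimal sketch: choose and expected_routes are hypothetical helpers that copy the logic of the choice task; they are not Airflow APIs.

```python
# Plain-Python sketch (no Airflow required). choose() copies the condition used
# by the `choice` branch task; both helpers are hypothetical, not Airflow APIs.


def choose(result):
    """Return the task_id the branch should follow for one mapped value."""
    if isinstance(result, int):
        return "my_group.first"
    return "my_group.second"


def expected_routes(values):
    """Map each map index to the single task_id its branch should select."""
    return {index: choose(value) for index, value in enumerate(values)}


if __name__ == "__main__":
    for index, task_id in expected_routes([1, 2, {"a": "b"}, "str"]).items():
        print(index, task_id)
```

For the sample list this yields my_group.first for map indexes 0 and 1 and my_group.second for indexes 2 and 3, so each mapped instance of the group should run exactly one of first or second.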
I attempted to use task-generated mapping over a task group in Airflow, combined with branching (@task.branch). I expected that, based on the condition in the choice task inside the group, only one of the tasks (first or second) would run for each mapped value when calling my_group.expand(result=get_list()). However, both first and second ran. This contrasts with my_group.expand(result=[1, 2, {"a": "b"}, "str"]), where only the appropriate task ran for each value.
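The expected per-instance outcome, including task_end, can be sketched with a simplified model of the NONE_FAILED_MIN_ONE_SUCCESS trigger rule (no upstream task failed or upstream_failed, and at least one succeeded). The helpers none_failed_min_one_success and expected_states are hypothetical illustrations, not Airflow APIs, and they ignore the other task states Airflow tracks.

```python
# Simplified, hypothetical model of the trigger rule on `task_end`;
# this is not Airflow's scheduler logic, just the rule's stated semantics.

FAILED_STATES = {"failed", "upstream_failed"}


def none_failed_min_one_success(upstream_states):
    """True when no upstream task failed and at least one succeeded."""
    states = list(upstream_states)
    no_failures = not any(state in FAILED_STATES for state in states)
    one_success = any(state == "success" for state in states)
    return no_failures and one_success


def expected_states(value):
    """Expected outcome of one mapped instance of my_group for a given input."""
    picks_first = isinstance(value, int)  # same condition as the `choice` task
    first = "success" if picks_first else "skipped"
    second = "skipped" if picks_first else "success"
    return {
        "first": first,
        "second": second,
        "task_end_runs": none_failed_min_one_success([first, second]),
    }


if __name__ == "__main__":
    for index, value in enumerate([1, 2, {"a": "b"}, "str"]):
        print(index, expected_states(value))
```

Under this model task_end runs whether one branch succeeded (expected) or both did (what Case 1 shows), so its state alone does not reveal whether the branch actually skipped the other task; the states of first and second do.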