Airflow: Task-generated Mapping Over a Task Group: Branching Behavior Not Functioning as Expected

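import logging
from datetime import datetime, timedelta

from airflow import DAG
from airflow.decorators import task, task_group
from airflow.utils.trigger_rule import TriggerRule

default_args = {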
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 11, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}
logger = logging.getLogger('airflow')


with DAG(
    'test_mapping_branch',
    default_args=default_args,
    schedule_interval=None,
    catchup=False,
) as dag:

    @task
    def get_list():
        return [1, 2, {"a": "b"}, "str"]

    @task_group(group_id='my_group')
    def my_group(result):
        @task.branch
        def choice(result):
            if isinstance(result, int):
                return 'my_group.first'
            else:
                return 'my_group.second'

        @task
        def first():
            print('First')

        @task
        def second():
            print('Second')

        @task(trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS)
        def task_end():
            print('End')

        choice(result) >> [first(), second()] >> task_end()

    # Case 1: expand over the output of an upstream task (task-generated mapping)
    my_group.expand(result=get_list())

    # Case 2: expand over a literal list
    my_group.expand(result=[1, 2, {"a": "b"}, "str"])

Why does my_group.expand(result=get_list()) behave incorrectly, while my_group.expand(result=[1, 2, {"a": "b"}, "str"]) runs as expected? In the first case, both the first and second tasks are executed, whereas I expect only one of them to run, depending on the condition in the choice task.

Case 1: [screenshot]

Case 2: [screenshot]

I am using task-generated mapping over a task group in Airflow, together with branching inside the group. I expected that, based on the condition in the choice task, only one of the tasks (first or second) would be executed when calling my_group.expand(result=get_list()). Instead, both first and second were executed. When calling my_group.expand(result=[1, 2, {"a": "b"}, "str"]), only the appropriate task was executed.
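To make the expected behavior concrete, here is a minimal sketch in plain Python (no Airflow involved) that applies the same condition as the choice task to each element of the list. The `expected` dictionary and the `map_index` name are only illustrative assumptions about how the mapped instances are numbered; the sketch shows which branch I expect to be followed for each item, not how Airflow resolves it internally.

```python
def choice(result):
    """Same condition as the `choice` branch task, evaluated outside Airflow."""
    if isinstance(result, int):
        return 'my_group.first'
    return 'my_group.second'


# The same list that get_list() returns in Case 1 and that is passed literally in Case 2.
items = [1, 2, {"a": "b"}, "str"]

# Hypothetical mapping from the position of each item to the task id the branch should follow.
expected = {map_index: choice(item) for map_index, item in enumerate(items)}

for map_index, task_id in expected.items():
    print(map_index, task_id)
```

So for the two integers only first should run, and for the dict and the string only second should run, regardless of whether the list comes from get_list() or is passed literally.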
