Creating Dynamic TaskGroup


I have a use case where a source system drops files into the GCS bucket "landing_incoming". An Airflow DAG runs every hour: it first moves all available files from "landing_incoming" to "landing_inprocess", then runs a series of tasks (roughly 10-12 tasks in sequence) on each file, and finally loads the data into a BQ table. In the current approach the files are processed one after another.

To introduce parallelism we used a TaskGroup: the first few tasks of the DAG are responsible for moving the files into the "landing_inprocess" bucket, and then we create a TaskGroup flow for each file found in that bucket, so all files are processed in parallel.

(Screenshot: graph view of the sample DAG, with three parallel task_3 >> task_4 >> task_5 flows inside the process_maf_files TaskGroup)

In the sample DAG above, task_3, task_4 and task_5 are created dynamically for each file. Since there are 3 files in my bucket, 3 parallel flows are created; for n files it would create n parallel flows.

from datetime import timedelta

import airflow
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator  # airflow.operators.python_operator is deprecated
from airflow.utils.task_group import TaskGroup
from airflow.models.baseoperator import chain

from google.cloud import storage

storage_client = storage.Client()

default_args = {
    "start_date": airflow.utils.dates.days_ago(1),
    "retries": 0,
    "retry_delay": timedelta(minutes=1)
}

with DAG(
        "Task_Group_Job",
        default_args=default_args,
        description="Test Task group job",
        schedule_interval = None,
        dagrun_timeout=timedelta(minutes=60),

) as dag:

    task_1 = BashOperator(
        task_id='task_1',
        bash_command='echo "Running task 1"',
        dag=dag
    )

    task_2 = BashOperator(
        task_id='task_2',
        bash_command='echo "Running task 2"',
        dag=dag
    )

    task_7 = BashOperator(
        task_id='task_7',
        bash_command='echo "Running task 7"',
        dag=dag
    )

    # Assume this function is responsible for moving files from "landing_incoming" to "landing_inprocess"
    def push_function(**context):
        lineage_id = "ugugfvvi7576tggbug7t"
        context['ti'].xcom_push(key='lineage_id', value=lineage_id)

    push_task = PythonOperator(
        task_id='push_task',
        python_callable=push_function,
        # provide_context is not needed in Airflow 2; the context is passed automatically
    )


    with TaskGroup(group_id='process_maf_files', prefix_group_id=False) as process_maf_files:
        source_bucket = storage_client.get_bucket("landing_inprocess")
        for blob in source_bucket.list_blobs(prefix="accounts/account"):
            file = str(blob.name.split('/')[-1].split(".")[0])
            # lineage_id only exists at runtime, so pull it from XCom via Jinja templating
            lineage_pull = '{{ ti.xcom_pull(task_ids="push_task", key="lineage_id") }}'

            task_3 = BashOperator(
                task_id=f'task_3_{file}',
                bash_command=f'echo "Processing file: {file} with lineage id {lineage_pull}"'
            )

            task_4 = BashOperator(
                task_id=f'task_4_{file}',
                bash_command=f'echo "Processing file: {file} with lineage id {lineage_pull}"'
            )

            task_5 = BashOperator(
                task_id=f'task_5_{file}',
                bash_command=f'echo "Processing file: {file} with lineage id {lineage_pull}"'
            )

            chain(task_3,task_4,task_5)

    task_1 >> task_2 >> push_task >> process_maf_files >> task_7

Now what's happening is: every time this DAG is triggered there are initially no files in the "landing_inprocess" bucket, so the TaskGroup does not generate any tasks at parse time, i.e. the TaskGroup is empty. The files are moved into the bucket just before the TaskGroup within the same DAG run, but the DAG still does not pick them up and goes straight to task_7, because no refresh has happened. When I add a wait time of 2 minutes in "push_task", the TaskGroup generates the flows for each file correctly.
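
To illustrate my understanding of the parse-time behaviour (a simplified, standalone sketch, not part of the DAG above):

# This mimics what happens when the scheduler parses the DAG file:
# the bucket listing runs at import/parse time, long before push_task
# has moved any files, so it normally sees an empty prefix.
from google.cloud import storage

storage_client = storage.Client()
bucket = storage_client.get_bucket("landing_inprocess")
files = list(bucket.list_blobs(prefix="accounts/account"))
print(len(files))  # usually 0 at parse time -> the TaskGroup is built empty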

I am not sure how this can be resolved, or why adding the wait in "push_task" helps the DAG build the TaskGroup with a flow per file while it otherwise does not.
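
For reference, the wait I mentioned is nothing more than a sleep at the top of push_function, roughly like this:

import time

def push_function(**context):
    # the ~2 minute wait mentioned above; with this in place the TaskGroup
    # ends up with a flow per file, without it the group stays empty
    time.sleep(120)
    lineage_id = "ugugfvvi7576tggbug7t"
    context['ti'].xcom_push(key='lineage_id', value=lineage_id)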

