I have a use case where a source system drops files into the GCS bucket "landing_incoming". An Airflow DAG runs every hour: it first moves all available files from "landing_incoming" to "landing_inprocess", then runs a series of tasks (roughly 10-12 tasks in sequence) on each file, and finally loads the data into a BigQuery table. In the current approach, the files are processed one after another.
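For illustration, the hourly move step could be sketched with the GCS-to-GCS transfer operator. Only the two bucket names come from the setup above; the task id and object prefix are placeholders and would need to match the real layout.

from airflow.providers.google.cloud.transfers.gcs_to_gcs import GCSToGCSOperator

# Hypothetical sketch of the move step; task_id and prefix are placeholders.
move_files = GCSToGCSOperator(
    task_id="move_to_inprocess",
    source_bucket="landing_incoming",
    source_object="accounts/*",        # assumed prefix, adjust to the real layout
    destination_bucket="landing_inprocess",
    destination_object="accounts/",
    move_object=True,                  # delete from the source bucket after copy
)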
To introduce parallelism we used the TaskGroup concept: the first few tasks of the DAG are responsible for moving the files to the "landing_inprocess" bucket, and then a TaskGroup is created for each file available in that bucket, with the groups running in parallel.
In the sample DAG below, task_3, task_4 and task_5 are created dynamically for each file; since there are 3 files in my bucket, 3 parallel flows were created, and for n files it will create n parallel flows.
from datetime import timedelta

import airflow
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.utils.task_group import TaskGroup
from airflow.models.baseoperator import chain
from google.cloud import storage

storage_client = storage.Client()

default_args = {
    "start_date": airflow.utils.dates.days_ago(1),
    "retries": 0,
    "retry_delay": timedelta(minutes=1),
}

with DAG(
    "Task_Group_Job",
    default_args=default_args,
    description="Test Task group job",
    schedule_interval=None,
    dagrun_timeout=timedelta(minutes=60),
) as dag:

    task_1 = BashOperator(
        task_id='task_1',
        bash_command='echo "Running task 1"',
    )

    task_2 = BashOperator(
        task_id='task_2',
        bash_command='echo "Running task 2"',
    )

    task_7 = BashOperator(
        task_id='task_7',
        bash_command='echo "Running task 7"',
    )

    # Assume this function is responsible for moving files from
    # "landing_incoming" to "landing_inprocess".
    def push_function(**context):
        lineage_id = "ugugfvvi7576tggbug7t"
        context['ti'].xcom_push(key='lineage_id', value=lineage_id)

    push_task = PythonOperator(
        task_id='push_task',
        python_callable=push_function,
    )

    with TaskGroup(group_id='process_maf_files', prefix_group_id=False) as process_maf_files:
        # Note: this listing runs when the DAG file is parsed, not when it executes.
        source_bucket = storage_client.get_bucket("landing_inprocess")
        for blob in source_bucket.list_blobs(prefix="accounts/account"):
            file = str(blob.name.split('/')[-1].split(".")[0])
            # The lineage id is pulled from XCom at runtime via Jinja templating.
            echo_cmd = (
                f'echo "Processing file: {file} with lineage id '
                '{{ ti.xcom_pull(task_ids=\'push_task\', key=\'lineage_id\') }}"'
            )
            task_3 = BashOperator(
                task_id=f'task_3_{file}',
                bash_command=echo_cmd,
            )
            task_4 = BashOperator(
                task_id=f'task_4_{file}',
                bash_command=echo_cmd,
            )
            task_5 = BashOperator(
                task_id=f'task_5_{file}',
                bash_command=echo_cmd,
            )
            chain(task_3, task_4, task_5)

    task_1 >> task_2 >> push_task >> process_maf_files >> task_7
Now what's happening is: every time this DAG is triggered there are initially no files in the "landing_inprocess" bucket, so the TaskGroup does not generate any tasks at parse time, i.e. it ends up as an empty TaskGroup. The files do get moved just before the TaskGroup within the same DAG run, but the DAG still does not pick them up and runs task_7 directly, because no refresh has happened in between. When I add a wait of 2 minutes in "push_task", the TaskGroup generates the flows correctly.
I am not sure how this can be resolved, or why adding a wait in "push_task" helps the DAG create the TaskGroup with a flow for each file while it otherwise does not.
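For context on the parse-time vs. run-time distinction I am asking about, here is a minimal sketch, assuming Airflow 2.3+ dynamic task mapping, of how the same bucket listing could be evaluated at execution time instead of at parse time; the task and function names are placeholders, not part of my actual DAG.

from airflow.decorators import task
from google.cloud import storage

# Sketch only, assuming Airflow 2.3+; names are placeholders.
@task
def list_inprocess_files():
    # Runs when the task executes, i.e. after the files have been moved,
    # unlike the top-level list_blobs() call, which runs at DAG parse time.
    bucket = storage.Client().get_bucket("landing_inprocess")
    return [blob.name for blob in bucket.list_blobs(prefix="accounts/account")]

@task
def process_file(file_name: str):
    print(f"Processing file: {file_name}")

# One mapped task instance is created per file returned at runtime.
process_file.expand(file_name=list_inprocess_files())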