In Airflow I have an issue when running multiple tasks. In this case I have 2 DAGs: the first DAG has 50 tasks to be done, and the second has 5 tasks to be done, as follows:
dynamic_Task_Concurrency.py
# Ingestion sources table_data1 .. table_data50. Each entry carries the suffix
# used in the task id and the directory that holds the ingestion script.
sources = [
    dict(num=idx, task_ingest_name=f"table_data{idx}", path_ingestion="/tmp")
    for idx in range(1, 51)
]

# Daily DAG (cron "0 17 * * *") that fans out one bash ingestion task per
# source between a start marker and an end marker.
with DAG(
    "dynamic_Task_Concurrency",
    default_args=dict(
        owner="airflow",
        email_on_failure=False,
        email_on_retry=False,
        retries=5,
        retry_delay=timedelta(minutes=5),
    ),
    start_date=datetime(2022, 7, 13),
    schedule_interval="0 17 * * *",
    catchup=False,
    tags=["daily"],
    max_active_runs=5,
) as dag:
    data_ingestion_start = DummyOperator(task_id="data_ingestion_start")

    # All per-table tasks live in one task group so the group as a whole can
    # be wired between the start and end markers.
    with TaskGroup(group_id="group_data_ingestion") as group_data_ingestion:
        for source in sources:
            ingest_table = BashOperator(
                task_id="ingestion_table_" + source["task_ingest_name"],
                # The trailing space after ".sh" is intentional: without it
                # Airflow treats the value as a Jinja template file to load
                # (see the BashOperator documentation).
                bash_command=f"{source['path_ingestion']}/task_delay.sh ",
                dag=dag,
            )

    data_ingestion_end = DummyOperator(task_id="data_ingestion_end")

    # start -> every ingestion task in the group -> end
    data_ingestion_start >> group_data_ingestion >> data_ingestion_end
dynamic_Task_Concurrency_two.py
# Ingestion sources table_data1 .. table_data5. range()'s upper bound is
# exclusive, so range(1, 6) is required to get five sources (range(1, 5)
# would only produce four).
sources = [
    {"num": i, "task_ingest_name": f"table_data{i}", "path_ingestion": "/tmp"}
    for i in range(1, 6)
]

# Daily DAG (cron "0 17 * * *") that fans out one bash ingestion task per
# source between a start marker and an end marker.
with DAG(
    "dynamic_Task_Concurrency_two",
    default_args={
        'owner': 'airflow',
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 5,
        'retry_delay': timedelta(minutes=5),
    },
    start_date=datetime(2022, 7, 13),
    schedule_interval='0 17 * * *',
    catchup=False,
    tags=['daily'],
    max_active_runs=2,
) as dag:
    data_ingestion_start = DummyOperator(task_id="data_ingestion_start")

    # All per-table tasks live in one task group so the group as a whole can
    # be wired between the start and end markers.
    with TaskGroup(group_id="group_data_ingestion") as group_data_ingestion:
        for source in sources:
            ingest_table = BashOperator(
                task_id=f"ingestion_table_{source['task_ingest_name']}",
                # The trailing space after ".sh" is intentional: without it
                # Airflow treats the value as a Jinja template file to load
                # (see the BashOperator documentation).
                bash_command="{}/task_delay.sh ".format(source['path_ingestion']),
                dag=dag,
            )

    data_ingestion_end = DummyOperator(task_id="data_ingestion_end")

    # start -> every ingestion task in the group -> end
    data_ingestion_start >> group_data_ingestion >> data_ingestion_end
And in airflow.cfg I have set up:
# Per the Airflow configuration reference: maximum number of task instances
# that may run concurrently per scheduler, across all DAGs.
parallelism = 36
# Per the Airflow configuration reference: maximum number of task instances
# allowed to run concurrently within each DAG.
max_active_tasks_per_dag = 12
# Newly created DAGs start in the paused state.
dags_are_paused_at_creation = True
# Per the Airflow configuration reference: default maximum number of active
# DAG runs per DAG (a DAG's own max_active_runs argument takes precedence).
max_active_runs_per_dag = 5
This is when dynamic_Task_Concurrency is running: all tasks of dynamic_Task_Concurrency_two stay in the scheduled state until at least some tasks of dynamic_Task_Concurrency are done.
This is when some tasks of dynamic_Task_Concurrency are done.
I was expecting dynamic_Task_Concurrency and dynamic_Task_Concurrency_two to run together, with 12 tasks running at the same time. Which part must I change? Thanks.