Airflow Task stuck at scheduled state


In Airflow I have an issue when running multiple tasks. I have two DAGs: the first has 50 tasks to run and the second has 5, as follows:

dynamic_Task_Concurrency.py

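from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator
from airflow.utils.task_group import TaskGroup

# one entry per ingestion task
sources = [
    {"num": i, "task_ingest_name": f"table_data{i}", "path_ingestion": "/tmp"}
    for i in range(1, 51)
]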

# define the DAG
with DAG(
    "dynamic_Task_Concurrency",
    default_args={
        'owner': 'airflow',
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 5,
        'retry_delay': timedelta(minutes=5)
    },

    start_date=datetime(2022, 7, 13),
    schedule_interval='0 17 * * *',
    catchup=False,
    tags=['daily'],
    max_active_runs=5,
) as dag:

    data_ingestion_start = DummyOperator(task_id="data_ingestion_start")

    with TaskGroup(group_id="group_data_ingestion") as group_data_ingestion:
        for source in sources:
            ingest_table = BashOperator(
                task_id=f"ingestion_table_{source['task_ingest_name']}",
                bash_command="{}/task_delay.sh ".format(source['path_ingestion']),
                dag=dag,
            )

    data_ingestion_end = DummyOperator(task_id="data_ingestion_end")

    data_ingestion_start >> group_data_ingestion >> data_ingestion_end

dynamic_Task_Concurrency_two.py

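from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator
from airflow.utils.task_group import TaskGroup

# one entry per ingestion task
sources = [
    {"num": i, "task_ingest_name": f"table_data{i}", "path_ingestion": "/tmp"}
    for i in range(1, 5)
]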

# define the DAG
with DAG(
    "dynamic_Task_Concurrency_two",
    default_args={
        'owner': 'airflow',
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 5,
        'retry_delay': timedelta(minutes=5)
    },

    start_date=datetime(2022, 7, 13),
    schedule_interval='0 17 * * *',
    catchup=False,
    tags=['daily'],
    max_active_runs=2,
) as dag:

    data_ingestion_start = DummyOperator(task_id="data_ingestion_start")

    with TaskGroup(group_id="group_data_ingestion") as group_data_ingestion:
        for source in sources:
            ingest_table = BashOperator(
                task_id=f"ingestion_table_{source['task_ingest_name']}",
                bash_command="{}/task_delay.sh ".format(source['path_ingestion']),
                dag=dag,
            )

    data_ingestion_end = DummyOperator(task_id="data_ingestion_end")

    data_ingestion_start >> group_data_ingestion >> data_ingestion_end

In airflow.cfg I have set:

parallelism = 36
max_active_tasks_per_dag = 12
dags_are_paused_at_creation = True
max_active_runs_per_dag = 5

While dynamic_Task_Concurrency is running, all tasks of dynamic_Task_Concurrency_two stay in the scheduled state until at least some tasks of dynamic_Task_Concurrency have finished.

Only once some tasks of dynamic_Task_Concurrency are done do the tasks of dynamic_Task_Concurrency_two leave the scheduled state.

I expected dynamic_Task_Concurrency and dynamic_Task_Concurrency_two to run together, with 12 tasks running at the same time. Which setting do I need to change? Thanks.
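To make that expectation concrete, here is a rough sketch of how I understand the limits to combine. It is a simplified model, not the scheduler's actual algorithm: the helper name expected_running is hypothetical, and pool_slots=128 assumes both DAGs use the default pool at its default size.

```python
# Simplified model only: this is NOT how the Airflow scheduler is implemented.
def expected_running(parallelism, max_active_tasks_per_dag,
                     runnable_per_dag, pool_slots=128):
    """Rough upper bound on simultaneously running tasks for each DAG."""
    # Global cap: the smaller of core.parallelism and the pool size
    # (pool_slots=128 is an assumption: default_pool left unchanged).
    remaining = min(parallelism, pool_slots)
    running = {}
    for dag_id, runnable in runnable_per_dag.items():
        # Each DAG is capped by its own limit and by what is left globally.
        allowed = min(runnable, max_active_tasks_per_dag, remaining)
        running[dag_id] = allowed
        remaining -= allowed
    return running


result = expected_running(
    parallelism=36,
    max_active_tasks_per_dag=12,
    runnable_per_dag={
        "dynamic_Task_Concurrency": 50,     # range(1, 51) -> 50 ingestion tasks
        "dynamic_Task_Concurrency_two": 4,  # range(1, 5) -> 4 ingestion tasks
    },
)
print(result)
# {'dynamic_Task_Concurrency': 12, 'dynamic_Task_Concurrency_two': 4}
```

Under this model only 16 of the 36 parallelism slots would be in use, so the second DAG should not have to wait for the first. That makes me suspect some other limit (for example executor or worker capacity) is involved, but I have not been able to confirm which.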
