Airflow TriggerDagRunOperator does nothing


So I have 2 DAGs: a simple one that fetches some data from an API and then starts another, more complex DAG for each item.

This is my code:

from datetime import datetime, timedelta


from airflow import DAG
from airflow.api.client.local_client import Client
from airflow.models import Variable
from airflow.operators.python_operator import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
 
from nop_collector_fw_scripts.fetch_data_from_fw_module import \
    get_data_from_fwmodule
 
default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "start_date": datetime(2023, 10, 18),
    "retries": 0,
    "retry_delay": timedelta(minutes=5), }
 
dag = DAG(
    "poll_cisco_asa_dag",
    default_args=default_args,
    schedule_interval=None,  # Set your desired schedule interval
)
 
 
def fetch_tenant_data_func(**kwargs):
    ti = kwargs["ti"]
    # …some logic to get the tenants…
    ti.xcom_push(key="tenant_data", value=tenant_data)
 
fetch_tenant_data_task = PythonOperator(
    task_id="fetch_tenant_data",
    python_callable=fetch_tenant_data_func,
    provide_context=True,
    dag=dag,
)
 
def fetch_device_data_func(**kwargs):
    ti = kwargs["ti"]
    # …some logic to get the device data…
    ti.xcom_push(key="device_data", value=device_data)
 
fetch_device_data_task = PythonOperator(
    task_id="fetch_device_data",
    python_callable=fetch_device_data_func,
    provide_context=True,
    dag=dag,
)
 
 
# Loop through the fetched data and trigger new DAG runs for each device
trigger_dag_tasks = []
 
def trigger_dag_task_func(**kwargs):
    ti = kwargs["ti"]
    device_data = ti.xcom_pull(task_ids="fetch_device_data_task", key="device_data")
    if device_data:
        for device in device_data:
            trigger_dag_task = TriggerDagRunOperator(
                task_id=f'trigger_device_dag_task_{device["name"]}',
                trigger_dag_id="device_dag",
                dag=dag,
                conf={
                    "device": device  # Pass the entire dictionary as a parameter
                },
            )
            trigger_dag_task.execute(context=kwargs)
            trigger_dag_tasks.append(trigger_dag_task)
 
trigger_dag_task = PythonOperator(
    task_id="trigger_dag_task",
    python_callable=trigger_dag_task_func,
    provide_context=True,
    dag=dag,
)
 
fetch_tenant_data_task >> fetch_device_data_task >> trigger_dag_task

If I run the DAG, it completes with no errors and I can see the data in XCom. But the other DAG is never triggered. I don't see any problems in the logs either; everything completes successfully.

Any help would be very much appreciated.

Answer:

Without having the full code and the same environment as you have, it's a tad difficult to fully test, but I believe your code should work, even though it's a bit hacky: it sort of dynamically sticks a bunch of TriggerDagRunOperators into the DAG mid-execution, which is not something Airflow wants you to do and can be problematic for their visibility and naming. "Work" in the sense that when you run trigger_dag_task.execute(...) a new DagRun will be created and started. Well... sorta/kinda, because what actually happens is that a DagRun object is added to the database in state State.RUNNING, but let's not worry about that right now.
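
If you want to double-check that those DagRun rows are actually being created, here's a minimal sketch (assuming Airflow 2.x; "device_dag" stands in for your target dag_id) you can run in a scratch script on a host with your Airflow environment loaded:

from airflow.models import DagRun

# List the runs that the trigger should have created for the target DAG.
runs = DagRun.find(dag_id="device_dag")
for run in runs:
    # Freshly triggered runs show up here in state "running".
    print(run.run_id, run.run_type, run.state)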

Make sure that:

  1. The DAG name... erm... ID... erm... the value of the trigger_dag_id argument in the TriggerDagRunOperator is correct and has no typos. Just in case, although you should be getting an error if there were any.

  2. The DAG that you want to programmatically trigger is active in the UI (there's a slider: make sure it's on). A paused DAG will accept triggered runs but won't execute them until it's unpaused; see the programmatic check after this list.


  3. The task that is going to push to XCom has the argument do_xcom_push=True. It probably isn't this, because most operators have it enabled by default, but just in case.
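
For points 1 and 2, a minimal programmatic sanity check (again a sketch, assuming Airflow 2.x, with "device_dag" standing in for your target dag_id):

from airflow.models import DagBag, DagModel

dag_bag = DagBag()  # parses the configured DAGS_FOLDER

# Point 1: the dag_id must resolve to a parsed DAG (typos and import errors surface here).
assert dag_bag.get_dag("device_dag") is not None, "dag_id typo or import error"

# Point 2: a paused DAG accepts triggered runs but never executes them.
dag_model = DagModel.get_current("device_dag")
assert dag_model is not None and not dag_model.is_paused, "unpause the DAG in the UI"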

With all that in mind, you can try something like this:

import random
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator


# Producer:

def produce_data_fn(**context):
    my_data = [random.random() for _ in range(random.randint(1, 5))]
    print(f"Returning {len(my_data)} data points: {my_data}")
    context['task_instance'].xcom_push('my_data', my_data)


def trigger_consumers_fn(**context):
    produced_data = context['task_instance'].xcom_pull(
        task_ids='produce_data_task', 
        key='my_data')
    if not produced_data:
        return
    for i, d in enumerate(produced_data):
        trigger_dag_task = TriggerDagRunOperator(
            task_id=f'trigger_dag_task_{i}',
            trigger_dag_id='SO-77357880-3-consumer',
            dag=context['dag'],
            conf={
                "my_data": d
            }
        )
        trigger_dag_task.execute(context=context)


with DAG(dag_id='SO-77357880-3-producer', schedule=None,
         start_date=datetime(year=2021, month=1, day=1)) as dag_producer:
    produce_data_task = PythonOperator(
        task_id="produce_data_task",
        python_callable=produce_data_fn,
        provide_context=True,
        do_xcom_push=True,
        dag=dag_producer,
    )
    trigger_consumers_task = PythonOperator(
        task_id="trigger_consumers_task",
        python_callable=trigger_consumers_fn,
        provide_context=True,
        dag=dag_producer,
    )

    produce_data_task >> trigger_consumers_task


# Consumer:

def consume_data_fn(**context):
    print(f"Processing data point {context['dag_run'].conf.get('my_data')}")


with DAG(dag_id='SO-77357880-3-consumer', schedule=None,
         start_date=datetime(year=2021, month=1, day=1)) as dag_consumer:
    PythonOperator(
        task_id="consume_data_task",
        python_callable=consume_data_fn,
        provide_context=True,
        dag=dag_consumer,
    )

And, sure enough, there will be a DAG producing values and triggering one run of the consumer DAG per value.


Notice that even though you "added" one TriggerDagRunOperator per data point returned, those operators don't show up in the DAG. That's one of the consequences of "dynamically" adding operators in the middle of the execution. As I mentioned before, the only thing you need from the TriggerDagRunOperator is its ability to create a new DagRun object in the database in state State.RUNNING. You could very well substitute it with:

from airflow import settings
from airflow.models import DagBag
from airflow.utils import timezone as dt
from airflow.utils.state import State
from airflow.utils.types import DagRunType

def trigger_consumers_fn(**context):
    produced_data = context['task_instance'].xcom_pull(task_ids='produce_data_task', key='my_data')
    if not produced_data:
        return
    
    trigger_dag = DagBag(settings.DAGS_FOLDER).get_dag('SO-77357880-3-consumer')
    execute_date = dt.utcnow().replace(microsecond=0)
    for i, d in enumerate(produced_data):
        # This is pretty much the same as
        # what TriggerDagRunOperator.execute() does:
        trigger_dag.create_dagrun(
            run_id=f'trig__{i + 1}_{execute_date.isoformat()}',
            run_type=DagRunType.MANUAL,
            state=State.RUNNING,
            conf={'my_data': d},
            external_trigger=True,
        )

This could be interesting in the sense that

  1. It doesn't dynamically add tasks to your DAG (Airflow doesn't like DAGs whose structure can sort of randomly change over time)
  2. It gives you a bit more control over the run_id naming of the triggered runs, which could be beneficial later on in the UI, or to identify logs, or something like that.
  3. It might be less confusing for both Airflow and humans, who expect an Operator to produce tasks in the DAG (which is not quite what you're doing with the TriggerDagRunOperator: you're just taking advantage of the fact that its .execute() method happens to do what you want).

Now: if you could alter the producer function (fetch_device_data_func in your code) a little so that it returns a list of dicts (or some iterable of dicts that can be expand-ed), you could use Airflow's new-ish dynamic task mapping (available since version 2.3) to pass those dictionaries as the triggered DAG's conf:

import random
from datetime import datetime

from airflow import DAG
from airflow.decorators import task
from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

# Producer:
with DAG(dag_id='SO-77357880-1-producer', schedule=None,
         start_date=datetime(year=2021, month=1, day=1)):
    @task(task_id='produce_data_task')
    def produce_data():
        my_data = [{'my_data': random.random()} for _ in range(random.randint(1, 5))]
        print(f"Returning {len(my_data)} data points: {my_data}")
        return my_data


    run_dags = TriggerDagRunOperator.partial(
        task_id='trigger_consumers_task',
        trigger_dag_id='SO-77357880-1-consumer',
    ).expand(
        conf=produce_data()
    )


# Consumer:
def consume_data_fn(**context):
    print(f"Processing data point {context['dag_run'].conf.get('my_data')}")


with DAG(dag_id='SO-77357880-1-consumer', schedule=None,
         start_date=datetime(year=2021, month=1, day=1)) as dag_consumer:
    PythonOperator(
        task_id="consume_data",
        python_callable=consume_data_fn,
        provide_context=True,
        dag=dag_consumer,
    )

Or... who knows... Now that you've seen the ability that newer (>= 2.3) Airflow versions have to create dynamic tasks, maybe you'll decide that you don't need a separate DAG after all.
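
For illustration, a minimal sketch of that single-DAG idea (assuming Airflow >= 2.3; the dag_id is made up): the consumer becomes a dynamically mapped task in the same DAG, and no TriggerDagRunOperator is needed at all.

import random
from datetime import datetime

from airflow import DAG
from airflow.decorators import task

with DAG(dag_id='SO-77357880-single-dag', schedule=None,
         start_date=datetime(year=2021, month=1, day=1)):
    @task(task_id='produce_data_task')
    def produce_data():
        return [random.random() for _ in range(random.randint(1, 5))]

    @task(task_id='consume_data_task')
    def consume_data(d):
        print(f"Processing data point {d}")

    # One mapped task instance per produced value, all within one DAG.
    consume_data.expand(d=produce_data())

Each produced value then becomes its own mapped task instance, individually visible (and retryable) in the UI.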