So I have 2 DAGs: one is a simple one that fetches some data from an API and starts another, more complex DAG for each item.
This is my code:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.api.client.local_client import Client
from airflow.models import Variable
from airflow.operators.python_operator import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from nop_collector_fw_scripts.fetch_data_from_fw_module import \
    get_data_from_fwmodule
default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "start_date": datetime(2023, 10, 18),
    "retries": 0,
    "retry_delay": timedelta(minutes=5),
}
dag = DAG(
    "poll_cisco_asa_dag",
    default_args=default_args,
    schedule_interval=None,  # Set your desired schedule interval
)
def fetch_tenant_data_func(**kwargs):
    ti = kwargs["ti"]
    # …some logic to get the tenants…
    ti.xcom_push(key="tenant_data", value=tenant_data)
fetch_tenant_data_task = PythonOperator(
    task_id="fetch_tenant_data",
    python_callable=fetch_tenant_data_func,
    provide_context=True,
    dag=dag,
)
def fetch_device_data_func(**kwargs):
    ti = kwargs["ti"]
    # …some logic to get the device data…
    ti.xcom_push(key="device_data", value=device_data)
fetch_device_data_task = PythonOperator(
    task_id="fetch_device_data",
    python_callable=fetch_device_data_func,
    provide_context=True,
    dag=dag,
)
# Loop through the fetched data and trigger new DAG runs for each device
trigger_dag_tasks = []

def trigger_dag_task_func(**kwargs):
    ti = kwargs["ti"]
    device_data = ti.xcom_pull(task_ids="fetch_device_data_task", key="device_data")
    if device_data:
        for device in device_data:
            trigger_dag_task = TriggerDagRunOperator(
                task_id=f'trigger_device_dag_task_{device["name"]}',
                trigger_dag_id="device_dag",
                dag=dag,
                conf={
                    "device": device  # Pass the entire dictionary as a parameter
                },
            )
            trigger_dag_task.execute(context=kwargs)
            trigger_dag_tasks.append(trigger_dag_task)
trigger_dag_task = PythonOperator(
    task_id="trigger_dag_task",
    python_callable=trigger_dag_task_func,
    provide_context=True,
    dag=dag,
)
fetch_tenant_data_task >> fetch_device_data_task >> trigger_dag_task
If I run the DAG it completes with no error. I can see the data in XCom, but the other DAG is never triggered. I don't see any problems in the logs either; everything completes with success.
Any help would be very much appreciated.
Without having the full code and the same environment as you have, it's a tad difficult to fully test, but I believe your code (though a bit hacky, because it is sort of dynamically sticking a bunch of TriggerDagRunOperator(s) in the DAG, which is not something Airflow wants you to do and could be problematic with their visibility and naming) should work. In the sense that when you run trigger_dag_task.execute(...), a new DagRun will be created and started. Well... sorta/kinda, because what happens is that a DagRun object is added to the database with a state State.RUNNING, but let's not worry about that right now.

Make sure that:
- The DAG name... erm... ID... erm... the value of the trigger_dag_id argument in the TriggerDagRunOperator is correct and has no typos. (Just in case, though you should be getting an error if there were one.)
- The DAG that you want to programmatically trigger is Active in the UI (there's a slider: make sure it's on).
- The task that is going to push to XCom has the argument do_xcom_push=True. (It shouldn't be this, because I believe most operators have it active by default, but just in case.)

With all that in mind, you can try something like this:
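Here is a minimal, self-contained sketch of the idea; the DAG ids producer_dag and consumer_dag and the hard-coded device list are placeholders I made up, since I don't have your API:

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator


def produce_values(**kwargs):
    # Stand-in for your API call: push one entry per device to XCom
    devices = [{"name": "asa-1"}, {"name": "asa-2"}]
    kwargs["ti"].xcom_push(key="device_data", value=devices)


def trigger_consumers(**kwargs):
    # Note: task_ids must match the producing task's task_id exactly,
    # otherwise xcom_pull() silently returns None and nothing is triggered
    device_data = kwargs["ti"].xcom_pull(task_ids="produce_values", key="device_data")
    for device in device_data or []:
        TriggerDagRunOperator(
            task_id=f'trigger_consumer_{device["name"]}',
            trigger_dag_id="consumer_dag",
            conf={"device": device},
        ).execute(context=kwargs)


with DAG("producer_dag", start_date=datetime(2023, 10, 18), schedule_interval=None):
    produce = PythonOperator(task_id="produce_values", python_callable=produce_values)
    trigger = PythonOperator(task_id="trigger_consumers", python_callable=trigger_consumers)
    produce >> trigger


def consume(**kwargs):
    # The triggered DAG reads whatever was passed in conf
    print("received conf:", kwargs["dag_run"].conf)


with DAG("consumer_dag", start_date=datetime(2023, 10, 18), schedule_interval=None):
    PythonOperator(task_id="consume", python_callable=consume)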
And, sure enough, there will be a DAG producing values and triggering one run of the consumer DAG per value.
Notice that even though you "added" one TriggerDagRunOperator per datapoint returned, those don't show up in the DAG. That's one of the consequences of "dynamically" adding operators in the middle of the execution. As I mentioned before, the only thing you need from the TriggerDagRunOperator is its ability to create a new DagRun object in the database with a .RUNNING state. You could very well substitute it by:
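Something along these lines (a sketch, assuming Airflow >= 2.2, where trigger_dag lives under airflow.api.common; the run_id format is just an example of what you could choose):

from airflow.api.common.trigger_dag import trigger_dag
from airflow.utils import timezone


def trigger_consumers(**kwargs):
    device_data = kwargs["ti"].xcom_pull(task_ids="produce_values", key="device_data")
    for device in device_data or []:
        # Creates the DagRun row directly, with a run_id of your choosing
        trigger_dag(
            dag_id="consumer_dag",
            run_id=f'device_{device["name"]}_{timezone.utcnow().isoformat()}',
            conf={"device": device},
        )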
This could be interesting in the sense that it gives you control over the run_id naming structure, which could be beneficial later on in the UI, or to identify logs, or something like that. (Functionally you don't gain much over the TriggerDagRunOperator: you're just taking advantage of the fact that its .execute() method happens to do what you want.)

Now: if you could alter the producer data function (fetch_device_data_func in your code) a little bit so it returns a list of dicts (some iterable that can be expand-ed and that contains dicts), you could use the new-ish (it's been there since version 2.3) ability of Airflow to create dynamic tasks and pass those dictionaries as the triggered DAG's config:
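For example (a sketch: get_device_data and its hard-coded return value stand in for your real fetch logic):

from datetime import datetime

from airflow import DAG
from airflow.decorators import task
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

with DAG("producer_dag", start_date=datetime(2023, 10, 18), schedule_interval=None):

    @task
    def get_device_data():
        # One dict per DagRun you want to create
        return [{"device": {"name": "asa-1"}}, {"device": {"name": "asa-2"}}]

    # Airflow expands this into one mapped TriggerDagRunOperator per dict,
    # each passing its dict as the conf of a new consumer_dag run
    TriggerDagRunOperator.partial(
        task_id="trigger_consumer",
        trigger_dag_id="consumer_dag",
    ).expand(conf=get_device_data())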
Or... who knows... Now that you've seen the ability that newer (>= version 2.3) Airflows have to create dynamic tasks, maybe you'll decide that you don't need a separate DAG after all.
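For what it's worth, that could look something like this (a sketch; process_device is a made-up stand-in for whatever your device DAG currently does):

from datetime import datetime

from airflow import DAG
from airflow.decorators import task

with DAG("poll_cisco_asa_dag", start_date=datetime(2023, 10, 18), schedule_interval=None):

    @task
    def get_device_data():
        # Stand-in for your real fetch logic
        return [{"name": "asa-1"}, {"name": "asa-2"}]

    @task
    def process_device(device):
        # Whatever the separate device DAG used to do, as one mapped task per device
        print("processing", device["name"])

    process_device.expand(device=get_device_data())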