TriggerDagRunOperator and ExternalTaskSensor with dynamic execution_date


I'm trying to trigger one DAG multiple times with different configs using TriggerDagRunOperator and ExternalTaskSensor. However, TriggerDagRunOperator uses the parent DAG's execution_date (logical_date), so the second trigger just reruns the same instance of the triggered DAG instead of starting a new instance with the new config. Sample code:
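To see why both triggers land on the same run, here is a minimal pure-Python sketch. The helper manual_run_id is hypothetical; it assumes Airflow 2.x builds a manual run id as "manual__" followed by the ISO-8601 logical date, which is what DagRun.generate_run_id(DagRunType.MANUAL, ...) produces when no trigger_run_id is given. The date is an example value.

```python
from datetime import datetime, timezone


def manual_run_id(logical_date: datetime) -> str:
    # Hypothetical helper mirroring the assumed Airflow 2.x format for
    # manual runs: "manual__" + ISO-8601 logical date
    return f"manual__{logical_date.isoformat()}"


# Both task groups template execution_date from the same parent run
parent_logical_date = datetime(2024, 1, 1, tzinfo=timezone.utc)

pre_prod_run_id = manual_run_id(parent_logical_date)
demo_run_id = manual_run_id(parent_logical_date)

# Same logical date -> same run id -> the second trigger hits the existing run
print(pre_prod_run_id)             # manual__2024-01-01T00:00:00+00:00
print(pre_prod_run_id == demo_run_id)  # True
```

With reset_dag_run=True the existing run is cleared and rerun rather than replaced by a new one, which matches the behaviour described above.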

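from airflow.decorators import task_group
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.sensors.external_task import ExternalTaskSensor

@task_group(group_id='refresh_pre-prod')
def refresh_pre_prod():
    # execution_date is pinned to the parent's logical date, so both
    # task groups resolve to the same run of util_clone_bq_env
    prod_to_pre_prod = TriggerDagRunOperator(
        task_id='prod_to_pre_prod',
        trigger_dag_id="util_clone_bq_env",
        execution_date='{{ dag_run.logical_date }}',
        conf={
            "src_project_id": "production",
            "trg_project_id": "pre-production",
        },
        reset_dag_run=True)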

    prod_to_pre_prod_sensor = ExternalTaskSensor(
        task_id='prod_to_pre_prod_sensor',
        external_dag_id='util_clone_bq_env',
        external_task_id='notify_completion',
        allowed_states=["success"],
        failed_states=["failed", "skipped", "upstream_failed"])

    prod_to_pre_prod >> prod_to_pre_prod_sensor

@task_group(group_id='refresh_demo')
def refresh_demo():
    prod_to_demo = TriggerDagRunOperator(
        task_id='prod_to_demo',
        trigger_dag_id="util_clone_bq_env",
        execution_date='{{ dag_run.logical_date }}',
        conf={
            "src_project_id": "production",
            "trg_project_id": "demo1",
        },
        reset_dag_run=True)

    prod_to_demo_sensor = ExternalTaskSensor(
        task_id='prod_to_demo_sensor',
        external_dag_id='util_clone_bq_env',
        external_task_id='notify_completion',
        allowed_states=["success"],
        failed_states=["failed", "skipped", "upstream_failed"])

    prod_to_demo >> prod_to_demo_sensor

I tried passing execution_date in different ways to TriggerDagRunOperator and ExternalTaskSensor, but with no luck.

I'm looking for a way to trigger the same DAG several times with different configs.

Answer:

Option 1:

Pass a distinct trigger_run_id to each TriggerDagRunOperator instance.

In Airflow's trigger_dagrun.py source you can see that when trigger_run_id is passed, run_id is set to its value instead of being generated from the execution date:

...
if self.trigger_run_id:
    run_id = str(self.trigger_run_id)
else:
    run_id = DagRun.generate_run_id(DagRunType.MANUAL, parsed_execution_date)
...
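A minimal pure-Python sketch of that branch, assuming each task group passes its own trigger_run_id. In the DAG this could be a Jinja template such as trigger_run_id="{{ run_id }}__pre-production" in one group and "{{ run_id }}__demo1" in the other (trigger_run_id is a templated field); that naming scheme and the resolve_run_id helper are hypothetical. The sketch models run ids only: in Airflow 2.x the duplicate-run check may also match on logical date, and ExternalTaskSensor locates runs by logical date rather than run id, so the execution_date passed to each trigger still matters.

```python
from datetime import datetime, timezone
from typing import Optional


def resolve_run_id(trigger_run_id: Optional[str], logical_date: datetime) -> str:
    # Mirrors the branch in the excerpt: an explicit trigger_run_id wins,
    # otherwise a manual run id is derived from the logical date
    if trigger_run_id:
        return str(trigger_run_id)
    return f"manual__{logical_date.isoformat()}"


# Example values standing in for the parent's {{ run_id }} and logical date
parent_run_id = "scheduled__2024-01-01T00:00:00+00:00"
parent_logical_date = datetime(2024, 1, 1, tzinfo=timezone.utc)
targets = ["pre-production", "demo1"]

# Without trigger_run_id, both task groups resolve to one shared run id
shared_ids = {resolve_run_id(None, parent_logical_date) for _ in targets}

# Hypothetical naming scheme: parent run id + "__" + target project
per_target_ids = {
    target: resolve_run_id(f"{parent_run_id}__{target}", parent_logical_date)
    for target in targets
}

print(len(shared_ids))                    # 1
print(len(set(per_target_ids.values())))  # 2
```

Deriving the id from the parent's run_id keeps it unique across parent runs and makes each triggered run easy to trace back to the run that started it.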

Option 2:

Pass a timestamp instead of the parent's execution date, as shown in this answer: How do we trigger single airflow dag multipler times using TriggerDagRunOperator?
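If each trigger gets its own timestamp, the ExternalTaskSensor also has to look for that timestamp instead of the parent's logical date. A minimal sketch, assuming a deterministic timestamp (parent logical date plus a fixed per-target offset) rather than the wall-clock time, and assuming the sensor looks for the external run at logical_date - execution_delta, so a negative execution_delta makes it look forward by the same offset. The OFFSETS values and both helper functions are hypothetical.

```python
from datetime import datetime, timedelta, timezone

# Hypothetical fixed offsets, one per target environment
OFFSETS = {
    "pre-production": timedelta(seconds=1),
    "demo1": timedelta(seconds=2),
}


def triggered_logical_date(parent_logical_date: datetime, target: str) -> datetime:
    # Timestamp that would be passed as execution_date to TriggerDagRunOperator
    return parent_logical_date + OFFSETS[target]


def sensor_lookup_date(parent_logical_date: datetime, execution_delta: timedelta) -> datetime:
    # Assumed ExternalTaskSensor rule: look at logical_date - execution_delta
    return parent_logical_date - execution_delta


parent = datetime(2024, 1, 1, tzinfo=timezone.utc)  # example parent logical date

for target, offset in OFFSETS.items():
    triggered = triggered_logical_date(parent, target)
    # Negative execution_delta -> the sensor looks forward by the same offset
    looked_up = sensor_lookup_date(parent, -offset)
    print(target, triggered.isoformat(), looked_up == triggered)
```

A fixed offset keeps the two triggered runs on distinct logical dates while still letting each sensor compute exactly which run to wait for; a wall-clock timestamp would need the sensor to learn the date some other way.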