Airflow: how to get the start date of the current dag_run (not of a specific task)?

Tasks 1, 2, 3 and 4 in the same DAG insert rows into a db table. I then want task 7 to update only those rows whose timestamp is >= the start time of the DAG run (not the start time of task 7).

Is there a Jinja macro, kwarg, or context variable I can use? I didn't see any example of getting the dag_run start_date (as opposed to the execution date).

There are 2 best solutions below


The task context contains a number of variables describing the running task, including the dag_run object. Its start_date attribute is the time the DAG run started:

context['dag_run'].start_date

kwargs['dag_run'].start_date provides the start date of the DAG run (as opposed to its execution date):

from datetime import datetime
from airflow import DAG
from airflow.decorators import task

with DAG(
        "demo_dag",  # DAG id
        start_date=datetime(2021, 1, 1),
        schedule_interval='* * * * *',  # every minute
        catchup=False
) as dag:
    @task(task_id="task")
    def demo(**kwargs):
        print("kwargs['dag_run'].start_date:")
        print(kwargs["dag_run"].start_date)
        print("kwargs['dag_run'].execution_date:")
        print(kwargs["dag_run"].execution_date)

    task1 = demo()

This results in log entries similar to:

[2023-02-08, 09:43:01 NZDT] {logging_mixin.py:115} INFO - kwargs['dag_run'].start_date:
[2023-02-08, 09:43:01 NZDT] {logging_mixin.py:115} INFO - 2023-02-07 20:43:00.996729+00:00
[2023-02-08, 09:43:01 NZDT] {logging_mixin.py:115} INFO - kwargs['dag_run'].execution_date:
[2023-02-08, 09:43:01 NZDT] {logging_mixin.py:115} INFO - 2023-02-07 20:42:00+00:00

A discussion of the difference between start_date and execution_date can be found here: https://infinitelambda.com/airflow-start-date-execution-date/
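To connect this back to the use case in the question, here is a minimal sketch of how a later task could use the run's start_date as the cutoff for an update. It is not taken from the answers above: the `events` table, its `created_at` and `processed` columns, the function name `update_rows_since_run_start`, and the in-memory SQLite database are all hypothetical, and a `SimpleNamespace` stands in for the real dag_run object so the snippet runs without Airflow installed.

```python
import sqlite3
from datetime import datetime, timezone
from types import SimpleNamespace


def update_rows_since_run_start(conn, **context):
    """Flag rows created at or after the DAG run's start time.

    `context` mimics the kwargs Airflow passes to a task callable;
    the table and column names are hypothetical.
    """
    # Start of the whole DAG run, not of the task calling this function.
    run_start = context["dag_run"].start_date
    cur = conn.execute(
        # Timestamps are stored as ISO-8601 text with the same UTC offset,
        # so string comparison matches chronological order here.
        "UPDATE events SET processed = 1 WHERE created_at >= ?",
        (run_start.isoformat(),),
    )
    conn.commit()
    return cur.rowcount


# Stand-in for the real context; in Airflow, dag_run is supplied for you.
fake_context = {
    "dag_run": SimpleNamespace(
        start_date=datetime(2023, 2, 7, 20, 43, tzinfo=timezone.utc)
    )
}

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE events ("
    "id INTEGER PRIMARY KEY, created_at TEXT, processed INTEGER DEFAULT 0)"
)
conn.executemany(
    "INSERT INTO events (created_at) VALUES (?)",
    [
        ("2023-02-07T20:42:30+00:00",),  # inserted before the run started
        ("2023-02-07T20:43:05+00:00",),  # inserted during the run
        ("2023-02-07T20:43:40+00:00",),  # inserted during the run
    ],
)

updated = update_rows_since_run_start(conn, **fake_context)
print(updated)
```

In a real DAG the same function body would sit inside a task that receives `**kwargs`, with the connection coming from whatever database hook you already use. For templated operator fields, the same object should be reachable as `{{ dag_run.start_date }}`.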