I need to do hourly table refreshes/loads with Airflow/Python from an Oracle data source, using a last_modified_dt timestamp column for incremental extraction.

Airflow has the airflow.models.taskinstance API, which exposes data from the task_instance metadata table. It has the following fields (shown with sample data, assuming the DAG/tasks first ran at 1/1/2020 05:00):

task_id, dag_id, execution_date (of DAG), start_date, end_date, duration, state, ....
task_a, oracle, 1/1/2020 05:00:00, 1/1/2020 05:00:00, 1/1/2020 05:05:00, 0.5, success, ....
task_b, oracle, 1/1/2020 05:00:00, 1/1/2020 05:01:00, 1/1/2020 05:04:00, 0.3, success, ....
task_c, oracle, 1/1/2020 05:00:00, 1/1/2020 05:02:00, 1/1/2020 05:06:00, 0.4, success, ....

So I'm thinking of using this task_instance metadata table (or the API) to get the previous start datetime of each task along with its state (success), and using that in conditions like the ones below.

So, when the next run fires an hour later at 1/1/2020 06:00:00:

select * from table_a where last_mod_dttm > prev(start_datetime of task_id='task_a' with state='success');
select * from table_b where last_mod_dttm > prev(start_datetime of task_id='task_b' with state='success');
select * from table_c where last_mod_dttm > prev(start_datetime of task_id='task_c' with state='success');
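For the prev(start_datetime) lookup, this is roughly what I have in mind against the metadata DB via the ORM (a rough, untested sketch; get_prev_success_start is just a name I made up):

from airflow.models import TaskInstance
from airflow.utils.db import provide_session  # lives in airflow.utils.session on Airflow 2.x
from airflow.utils.state import State

@provide_session
def get_prev_success_start(dag_id, task_id, session=None):
    # Most recent successful instance of the task, newest start_date first.
    ti = (
        session.query(TaskInstance)
        .filter(
            TaskInstance.dag_id == dag_id,
            TaskInstance.task_id == task_id,
            TaskInstance.state == State.SUCCESS,
        )
        .order_by(TaskInstance.start_date.desc())
        .first()
    )
    return ti.start_date if ti else None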

Is this approach right? If so, would querying the Airflow metadata task_instance table directly every time to get the previous (max) start_datetime of each task have any performance implications? And how do we get the previous start_datetime and "success" state of a task via the airflow.models.taskinstance API (https://airflow.readthedocs.io/en/latest/_api/airflow/models/taskinstance/index.html)?

Thanks!

Best answer:

First, it's important to understand how execution_date works; see the Scheduler documentation:

The scheduler won’t trigger your tasks until the period it covers has ended e.g., A job with schedule_interval set as @daily runs after the day has ended. This technique makes sure that whatever data is required for that period is fully available before the dag is executed. In the UI, it appears as if Airflow is running your tasks a day late.

If you run a DAG on a schedule_interval of one day, the run with execution_date 2019-11-21 triggers soon after 2019-11-21T23:59.

Let’s repeat that: the scheduler runs your job one schedule_interval AFTER the start date, at the END of the period.

This means that by referencing execution_date you get exactly the time the previous run was triggered (the start of the period the current run covers).
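Applied to the hourly schedule in your question, the template values for the run triggered at 06:00 work out like this (a sketch; times follow the sample data above):

# schedule_interval="@hourly", DAG run triggered at 1/1/2020 06:00:
#   execution_date       -> 2020-01-01 05:00:00  (start of the period this run covers,
#                                                 i.e. when the previous run was triggered)
#   next_execution_date  -> 2020-01-01 06:00:00  (end of the period; this run's trigger time)
#   prev_execution_date  -> 2020-01-01 04:00:00  (start of the previous period)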

Regarding the query: I wouldn't query the database to get the last execution date, but rather use the macros that come out of the box with Airflow (see the macros reference in the Airflow docs).

You should be able to just use {{ execution_date }} in your query, and Airflow will substitute it when the DAG run is triggered.
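For example, with the templated sql field on an operator (the connection id and the TO_TIMESTAMP formatting here are placeholders I'm assuming, not from your question):

from datetime import datetime

from airflow import DAG
from airflow.operators.oracle_operator import OracleOperator

with DAG(
    dag_id="oracle",
    start_date=datetime(2020, 1, 1, 5),
    schedule_interval="@hourly",
) as dag:
    task_a = OracleOperator(
        task_id="task_a",
        oracle_conn_id="oracle_default",  # placeholder connection id
        # sql is a templated field; the macro renders when the run triggers,
        # so the run triggered at 06:00 loads rows modified since 05:00.
        sql="""
            SELECT *
            FROM table_a
            WHERE last_mod_dttm > TO_TIMESTAMP(
                '{{ execution_date.strftime("%Y-%m-%d %H:%M:%S") }}',
                'YYYY-MM-DD HH24:MI:SS')
        """,
    )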