I need to do hourly table refreshes/loads using Airflow/Python from an Oracle data source, using a last_modified_dt timestamp column.
Airflow's airflow.models.taskinstance API exposes data from the task_instance metadata table, which has the following fields (shown with sample data, assuming the DAG/tasks first ran at 1/1/2020 05:00):
task_id, dag_id, execution_date (of dag), start_date, end_date, duration, state, ...
task_a, oracle, 1/1/2020 05:00:00, 1/1/2020 05:00:00, 1/1/2020 05:05:00, 0.5, success, ...
task_b, oracle, 1/1/2020 05:00:00, 1/1/2020 05:01:00, 1/1/2020 05:04:00, 0.3, success, ...
task_c, oracle, 1/1/2020 05:00:00, 1/1/2020 05:02:00, 1/1/2020 05:06:00, 0.4, success, ...
So I'm thinking of using this task_instance metadata table (or the API) to get the previous start datetime of each task, along with its state (success), and use that in a condition like the ones below when running an hour later at 1/1/2020 06:00:00:
select * from table_a where last_mod_dttm > prev(start_datetime of task_id=task_a) and state = 'success';
select * from table_b where last_mod_dttm > prev(start_datetime of task_id=task_b) and state = 'success';
select * from table_c where last_mod_dttm > prev(start_datetime of task_id=task_c) and state = 'success';
Is this approach right? If so, would querying the Airflow metadata task_instance table directly every time to get the previous (i.e. max) start_datetime of the task(s) have any performance implications? And how would I get the previous start_datetime and "success" state of a task via the airflow.models.taskinstance API (https://airflow.readthedocs.io/en/latest/_api/airflow/models/taskinstance/index.html)?
Thanks!
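To make the idea concrete, here is a minimal sketch of the metadata lookup I have in mind. It uses a throwaway in-memory SQLite copy of the task_instance table purely as a stand-in (a real deployment would connect to the actual Airflow metadata database, and the column set here is trimmed down):

```python
import sqlite3

# Stand-in for the Airflow metadata DB: a throwaway SQLite table with a
# trimmed-down schema. In reality you would connect to the real metadata
# database instead of building this.
conn = sqlite3.connect(":memory:")
conn.execute("""CREATE TABLE task_instance (
    task_id TEXT, dag_id TEXT, start_date TEXT, state TEXT)""")
conn.executemany(
    "INSERT INTO task_instance VALUES (?, ?, ?, ?)",
    [("task_a", "oracle", "2020-01-01 05:00:00", "success"),
     ("task_a", "oracle", "2020-01-01 06:00:00", "success"),
     ("task_a", "oracle", "2020-01-01 07:00:00", "failed")],
)

# Previous successful start for task_a: the latest start_date whose
# state is 'success'. This value would then feed the last_mod_dttm filter.
row = conn.execute(
    """SELECT MAX(start_date) FROM task_instance
       WHERE dag_id = 'oracle' AND task_id = 'task_a'
         AND state = 'success'"""
).fetchone()
print(row[0])  # 2020-01-01 06:00:00
```

The failed 07:00 run is deliberately excluded by the state filter, which is the behaviour I want: after a failure, the next run should re-pick rows modified since the last *successful* load.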
First it's important to understand how execution_date works; see the Scheduler docs. This means that by referencing execution_date you get exactly the time when the last run was triggered. Regarding the query: I wouldn't query the database to get the last execution date, but rather use the macros that come out of the box with Airflow - see this reference:
You should be able to just use {{ execution_date }} in your query and Airflow will replace it when the DAG run is triggered.
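To illustrate what that substitution looks like, here is a sketch that renders such a query with Jinja2 directly (the templating engine Airflow uses under the hood); in a real DAG you would just put the templated string in the operator's sql argument and Airflow renders it from the run's context automatically:

```python
from datetime import datetime
from jinja2 import Template

# Templated SQL as you would write it in an operator's `sql` argument.
# {{ execution_date }} is filled in by Airflow from the DAG run context.
sql = (
    "SELECT * FROM table_a "
    "WHERE last_mod_dttm > TO_DATE("
    "'{{ execution_date.strftime(\"%Y-%m-%d %H:%M:%S\") }}', "
    "'YYYY-MM-DD HH24:MI:SS')"
)

# Simulate what Airflow does at trigger time: render the template with
# the run's context (here a hand-built context for demonstration).
context = {"execution_date": datetime(2020, 1, 1, 5, 0, 0)}
rendered = Template(sql).render(**context)
print(rendered)
```

The rendered statement contains the literal timestamp 2020-01-01 05:00:00, so no query against the metadata database is needed at all.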