Airflow 2: Check if a task still exists or not with task_id


We have created a sensor task whose name is generated dynamically, i.e. f"{table_name}_s3_exists". In one scenario we have to check the same table's location twice, but if the task already exists we don't need to create the sensor again. Is there a way to find out whether a task already exists within the DAG while the DAG is being built?


There are 3 best solutions below


You could try the get_tasks endpoint of the Airflow REST API. It returns detailed information about every task in a given DAG, so you can look for your task_id in the response.
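As a rough sketch of how that check could look from a client script: the snippet below builds the get_tasks URL and searches a response body for a task_id. The path /api/v1/dags/{dag_id}/tasks is the one used by the Airflow 2 stable REST API; the helper names (tasks_url, task_exists), the base URL, and the sample body are made up for illustration, and the body is trimmed to the only fields the code reads. Note that the REST API reports DAGs that Airflow has already parsed, so this answers the question from outside the DAG file rather than while the DAG is being built.

```python
import json
from urllib.parse import quote


def tasks_url(base_url: str, dag_id: str) -> str:
    """Build the URL of the get_tasks endpoint for one DAG."""
    return f"{base_url.rstrip('/')}/api/v1/dags/{quote(dag_id, safe='')}/tasks"


def task_exists(response_body: str, task_id: str) -> bool:
    """Return True if task_id appears in a get_tasks JSON response body."""
    payload = json.loads(response_body)
    # Assumes the response holds a "tasks" list whose items carry "task_id".
    return any(t.get("task_id") == task_id for t in payload.get("tasks", []))


# Hypothetical response body, trimmed to the fields used above.
sample_body = json.dumps({
    "tasks": [{"task_id": "orders_s3_exists"}, {"task_id": "load_orders"}],
    "total_entries": 2,
})
```

In practice you would fetch tasks_url(...) with an authenticated HTTP client of your choice and pass the response text to task_exists.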


The CLI command

airflow tasks list [-h] [-S SUBDIR] [-t] [-v] dag_id

will give you a list of all the tasks in the given DAG.

https://airflow.apache.org/docs/apache-airflow/stable/cli-and-env-variables-ref.html#list_repeat6

You can also use the REST API to get the same info: https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html#operation/get_tasks


If you are looking for a programmatic way to solve this problem, why not maintain a list of the task ids that you have utilized and check if the id is present in the list or not?

Example: say you have a list of bucket ids and you loop over it to create the tasks in Airflow

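from airflow.operators.bash import BashOperator

# Run this inside your `with DAG(...)` block so the tasks attach to the DAG.
buckets = ['1', '2', '3'] # and so on
task_ids = []             # ids for which a task has already been created
previous_task = None
for b in buckets:
    if b not in task_ids:
        # task_id and bash_command are placeholders; use your own
        task = BashOperator(task_id=f"bucket_{b}", bash_command=f"echo {b}")
        if previous_task is not None:
            previous_task >> task
        previous_task = task
        task_ids.append(b)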
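
The same tracking idea can be wrapped in a small helper keyed by the dynamic task name from the question. This is only a sketch that runs without Airflow: get_or_create and fake_sensor are hypothetical names, and fake_sensor stands in for whatever sensor constructor you actually use.

```python
from typing import Callable, Dict, List, TypeVar

T = TypeVar("T")


def get_or_create(registry: Dict[str, T], task_id: str,
                  factory: Callable[[str], T]) -> T:
    """Return the task registered under task_id, creating it on first use."""
    if task_id not in registry:
        registry[task_id] = factory(task_id)
    return registry[task_id]


created: List[str] = []  # records every call to the factory


def fake_sensor(task_id: str) -> dict:
    # Stand-in for a real sensor constructor (assumption, not an Airflow API).
    created.append(task_id)
    return {"task_id": task_id}


sensors: Dict[str, dict] = {}
# "orders" appears twice, but its sensor is only built once.
for table_name in ["orders", "customers", "orders"]:
    get_or_create(sensors, f"{table_name}_s3_exists", fake_sensor)
```

Inside a real DAG file, Airflow 2 also lets you ask the DAG object directly with dag.has_task(task_id), which may remove the need for a separate registry.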

Hope this helps.