How to branch tasks in Airflow DAG based on SqlSensor success or skip status?


I am working with Apache Airflow and need to create a DAG that behaves differently based on the outcome of a SqlSensor. Specifically, I want to perform Action A when the SqlSensor gets skipped (i.e., when the SQL check returns no data), and Action B when the SqlSensor succeeds (i.e., when the SQL check finds data). The sensor is configured with soft_fail=True.

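from airflow.providers.common.sql.sensors.sql import SqlSensor

oracle_sensor = SqlSensor(
    task_id='check_for_records_with_load_date',
    conn_id='oracle_db',
    sql="select count(*) etc",
    mode='poke',
    timeout=60,
    soft_fail=True,  # on timeout, mark the task as skipped instead of failed
)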
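To make the two outcomes concrete, here is a simplified pure-Python model of how a poke-mode sensor with a timeout ends up succeeded, skipped or failed. It is a sketch under stated assumptions, not Airflow's implementation: `run_sensor_model`, its arguments and the string states are hypothetical, `max_pokes` stands in for `timeout` divided by the poke interval, a non-zero first cell is treated as "condition met", and retries, reschedule mode and real timing are ignored.

```python
from typing import Iterable


def run_sensor_model(poke_results: Iterable[int], max_pokes: int, soft_fail: bool) -> str:
    """Hypothetical model of a poke-mode sensor with a timeout.

    poke_results: the value each poke's query returns (e.g. a count(*)).
    max_pokes: assumed stand-in for timeout / poke_interval.
    soft_fail: if True, a timeout marks the task skipped instead of failed.
    """
    for attempt, value in enumerate(poke_results, start=1):
        if attempt > max_pokes:
            break
        # Simplified criterion: a non-zero count means the condition is met.
        if value:
            return "success"
    # No poke met the condition before the modelled timeout
    # (or the supplied results ran out).
    return "skipped" if soft_fail else "failed"


# Records appear on the third poke: the sensor succeeds.
print(run_sensor_model([0, 0, 5], max_pokes=6, soft_fail=True))
# The table stays empty: soft_fail turns the timeout into a skip.
print(run_sensor_model([0] * 10, max_pokes=6, soft_fail=True))
# Same situation without soft_fail: the task fails.
print(run_sensor_model([0] * 10, max_pokes=6, soft_fail=False))
```

In this model the only result of the sensor is its final state; nothing else is returned for downstream tasks to read.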

I don't know how to make the branch depend on the status of oracle_sensor, i.e. skipped or successful.

The only thing I've found is something like this:

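from airflow.operators.python import BranchPythonOperator

def branch_function(**context):
    task_instance = context['task_instance']
    sensor_task_id = 'check_for_records_with_load_date'
    # Choose the next task based on what the sensor pushed to XCom.
    if task_instance.xcom_pull(task_ids=sensor_task_id) is None:
        return 'end_dag'
    else:
        return 'continue_processing'
(...)

branch_task = BranchPythonOperator(
    task_id='branch_task',
    python_callable=branch_function,
    # Airflow 2 passes the context automatically; provide_context is no longer accepted.
    dag=dag,
)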

But this relies on XCom, and when I checked the sensor's XCom in the web UI, it was empty in both cases (skipped and success).

How do I achieve this? Basically, if the data isn't found, I want the DAG to end immediately and successfully. If the data is found, I want to proceed with the transformations.

Thank you

There is one answer below.

Sensors are intended for polling a resource until a certain condition is met. For branching on the result of a query, there is BranchSQLOperator, which is designed for exactly that.

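from airflow.providers.common.sql.operators.sql import BranchSQLOperator

# Runs the query once and follows one branch depending on its result.
branch_task = BranchSQLOperator(
    task_id='check_for_records_with_load_date',
    conn_id='oracle_db',
    # Returns 'true' when matching rows exist and 'false' otherwise.
    sql="select case when count(*) = 0 then 'false' else 'true' end etc",
    follow_task_ids_if_true="continue_processing",  # data found: run the transformations
    follow_task_ids_if_false="end_dag",  # no data: end the DAG run
)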

Just make sure the query returns a boolean (or a string that can be interpreted as a boolean, i.e. 'true'/'false').
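The rule about boolean-like results can be illustrated with a small pure-Python helper. `pick_branch` is hypothetical and only approximates the kind of check the operator performs on the value the query returns; the exact set of accepted strings is an assumption here, so check the BranchSQLOperator source for your provider version. The sketch deliberately rejects anything that is neither a boolean nor a recognised string, in line with the advice to return an explicit 'true'/'false'.

```python
def pick_branch(first_cell, if_true: str, if_false: str) -> str:
    """Hypothetical approximation of choosing a branch from a SQL result value."""
    truthy = {"true", "y", "yes", "on", "1"}  # assumed accepted spellings
    falsy = {"false", "n", "no", "off", "0"}
    if isinstance(first_cell, bool):
        return if_true if first_cell else if_false
    if isinstance(first_cell, str):
        value = first_cell.strip().lower()
        if value in truthy:
            return if_true
        if value in falsy:
            return if_false
    # Anything else is ambiguous, so refuse rather than guess a branch.
    raise ValueError(f"Cannot interpret {first_cell!r} as a boolean")


# The CASE expression in the query above yields 'true' or 'false'.
print(pick_branch("true", "continue_processing", "end_dag"))
print(pick_branch("false", "continue_processing", "end_dag"))
```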