I am working with Apache Airflow and need to create a DAG that behaves differently based on the outcome of a SqlSensor. Specifically, I want to perform Action A when the SqlSensor gets skipped (i.e., when the SQL check returns no data), and Action B when the SqlSensor succeeds (i.e., when the SQL check finds data). The sensor is configured with soft_fail=True.
oracle_sensor = SqlSensor(
    task_id='check_for_records_with_load_date',
    conn_id='oracle_db',
    sql="select count(*) etc",
    mode='poke',
    timeout=60,
    soft_fail=True
)
I don't know how to make the branch depend on the status of oracle_sensor, i.e. skipped or successful.
The only thing I've found is something like this:
def branch_function(**context):
    task_instance = context['task_instance']
    sensor_task_id = 'check_for_records_with_load_date'
    if task_instance.xcom_pull(task_ids=sensor_task_id) is None:
        return 'end_dag'
    else:
        return 'continue_processing'
(...)
branch_task = BranchPythonOperator(
    task_id='branch_task',
    python_callable=branch_function,
    provide_context=True,
    dag=dag,
)
But it depends on XComs. I checked the XCom for my sensor (I opened it in the web UI), and in both cases (skipped/success) the XCom is empty.
How do I achieve this? Basically, if the data isn't found, I want the DAG to end immediately and successfully. If the data is found, I want to proceed with the transformations.
Thank you
Sensors are intended for polling a resource until a certain condition is fulfilled. For branching on the result of a query, there is BranchSQLOperator, which is designed for exactly that.
Just be careful to provide a query that returns a boolean (or a string that can be interpreted as a boolean, i.e. true/false).