I have been trying to get a slack message callback to trigger on SLA misses. I've noticed that:
SLA misses get registered successfully in the Airflow web UI at
slamiss/list/on_failure_callbackworks successfully
However, the sla_miss_callback function itself will never get triggered.
What I've tried:
Different combinations adding
slaandsla_miss_callbackat thedefault_argslevel, the DAG level, and the task levelChecking logs on our scheduler and workers for SLA related messages (see also here), but we haven't seen anything
The slack message callback function works if called from any other
basic task or function
default_args = {
"owner": "airflow",
"depends_on_past": False,
'start_date': airflow.utils.dates.days_ago(n=0,minute=1),
'on_failure_callback': send_task_failed_msg_to_slack,
'sla': timedelta(minutes=1),
"retries": 0,
"pool": 'canary',
'priority_weight': 1
}
dag = airflow.DAG(
dag_id='sla_test',
default_args=default_args,
sla_miss_callback=send_sla_miss_message_to_slack,
schedule_interval='*/5 * * * *',
catchup=False,
max_active_runs=1,
dagrun_timeout=timedelta(minutes=5)
)
def sleep():
""" Sleep for 2 minutes """
time.sleep(90)
LOGGER.info("Slept for 2 minutes")
def simple_print(**context):
""" Prints a message """
print("Hello World!")
sleep = PythonOperator(
task_id="sleep",
python_callable=sleep,
dag=dag
)
simple_task = PythonOperator(
task_id="simple_task",
python_callable=simple_print,
provide_context=True,
dag=dag
)
sleep >> simple_task
I've run into this issue myself. Unlike the
on_failure_callbackthat is looking for a python callable function, it appears thatsla_miss_callbackneeds the full function call.An example that is working for me:
As far as I can tell, sla_miss_callback doesn't have access to context, which is unfortunate. Once I stopped looking for the context, I finally got my alerts.