I have been trying to get a Slack message callback to trigger on SLA misses. I've noticed that:
- SLA misses get registered successfully in the Airflow web UI at slamiss/list/
- on_failure_callback works successfully
However, the sla_miss_callback function itself never gets triggered.
What I've tried:
- Different combinations of adding sla and sla_miss_callback at the default_args level, the DAG level, and the task level
- Checking logs on our scheduler and workers for SLA-related messages (see also here), but we haven't seen anything
- The Slack message callback function works if called from any other basic task or function
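import logging
import time
from datetime import timedelta

import airflow
import airflow.utils.dates
# Import path assumed for Airflow 1.x (provide_context is used below)
from airflow.operators.python_operator import PythonOperator

LOGGER = logging.getLogger(__name__)

# send_task_failed_msg_to_slack and send_sla_miss_message_to_slack
# are defined elsewhere (not shown).

default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "start_date": airflow.utils.dates.days_ago(n=0, minute=1),
    "on_failure_callback": send_task_failed_msg_to_slack,
    "sla": timedelta(minutes=1),
    "retries": 0,
    "pool": "canary",
    "priority_weight": 1,
}

dag = airflow.DAG(
    dag_id="sla_test",
    default_args=default_args,
    sla_miss_callback=send_sla_miss_message_to_slack,
    schedule_interval="*/5 * * * *",
    catchup=False,
    max_active_runs=1,
    dagrun_timeout=timedelta(minutes=5),
)


def sleep():
    """ Sleep for 90 seconds, longer than the 1-minute SLA """
    time.sleep(90)
    LOGGER.info("Slept for 90 seconds")


def simple_print(**context):
    """ Prints a message """
    print("Hello World!")


# Named sleep_task so the operator does not shadow the sleep() function
sleep_task = PythonOperator(
    task_id="sleep",
    python_callable=sleep,
    dag=dag,
)

simple_task = PythonOperator(
    task_id="simple_task",
    python_callable=simple_print,
    provide_context=True,
    dag=dag,
)

sleep_task >> simple_task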
A lot of these answers are 90% complete, so I wanted to share my example using bash operators, which combines what I found in the responses above and in other resources.
The most important points are to define sla_miss_callback in the DAG definition rather than in default_args, and not to pass context to the SLA function.
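To illustrate the point about not passing context: as far as I know, Airflow calls sla_miss_callback with five positional arguments (dag, task_list, blocking_task_list, slas, blocking_tis) rather than the single context dict that on_failure_callback receives. Below is a minimal sketch of a callback with that signature. The names format_sla_miss and sla_callback are hypothetical, the message format is my own, and the print call stands in for whatever Slack client you use. It deliberately does not import Airflow so the function can be exercised on its own.

```python
def format_sla_miss(dag, slas):
    """Build a short message from the DAG and the list of SLA-miss records.

    Assumes each record exposes task_id and execution_date attributes,
    as Airflow's SlaMiss model does.
    """
    missed = ", ".join(
        "{} ({})".format(s.task_id, s.execution_date) for s in slas
    )
    return "SLA missed in DAG {}: {}".format(dag.dag_id, missed)


def sla_callback(dag, task_list, blocking_task_list, slas, blocking_tis):
    """Hypothetical SLA-miss callback.

    Note the five positional parameters and the absence of **context.
    """
    message = format_sla_miss(dag, slas)
    # Placeholder: replace print with your own Slack notification call.
    print(message)
    return message


# Wiring (assumption, mirroring the question's DAG): pass the function to
# the DAG constructor, not to default_args, e.g.
#   dag = airflow.DAG(dag_id="sla_test", sla_miss_callback=sla_callback, ...)
```

A callback declared as `def sla_callback(context)` or `def sla_callback(**context)` would not match the five positional arguments, which may be one reason such a callback appears never to run.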