Airflow sla_miss_callback function not triggering

I have been trying to get a slack message callback to trigger on SLA misses. I've noticed that:

  1. SLA misses get registered successfully in the Airflow web UI at slamiss/list/

  2. on_failure_callback works successfully

However, the sla_miss_callback function itself will never get triggered.

What I've tried:

  • Different combinations of adding sla and sla_miss_callback at the default_args level, the DAG level, and the task level

  • Checking logs on our scheduler and workers for SLA-related messages (see also here), but we haven't seen anything

  • The slack message callback function works if called from any other
    basic task or function

import logging
import time

import airflow
from datetime import timedelta
from airflow.operators.python_operator import PythonOperator

LOGGER = logging.getLogger(__name__)

default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    'start_date': airflow.utils.dates.days_ago(n=0,minute=1),
    'on_failure_callback': send_task_failed_msg_to_slack,
    'sla': timedelta(minutes=1),
    "retries": 0, 
    "pool": 'canary',
    'priority_weight': 1
}

dag = airflow.DAG(
    dag_id='sla_test',
    default_args=default_args,
    sla_miss_callback=send_sla_miss_message_to_slack,
    schedule_interval='*/5 * * * *',
    catchup=False,
    max_active_runs=1,
    dagrun_timeout=timedelta(minutes=5)
)

def sleep():
    """ Sleep for 90 seconds to exceed the 1-minute SLA """
    time.sleep(90)
    LOGGER.info("Slept for 90 seconds")

def simple_print(**context):
    """ Prints a message """
    print("Hello World!")


sleep_task = PythonOperator(
    task_id="sleep",
    python_callable=sleep,
    dag=dag
)

simple_task = PythonOperator(
    task_id="simple_task",
    python_callable=simple_print,
    provide_context=True,
    dag=dag
)

sleep_task >> simple_task

There are 7 answers below.

Answer 1

Many of these answers are 90% complete, so I wanted to share my example using bash operators, which combines what I found in the responses above with other resources.

The most important points are that sla_miss_callback must be defined in the DAG definition, not in default_args, and that no context is passed to the SLA function.

"""
A simple example showing the basics of using a custom SLA notification response.
"""
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import timedelta, datetime
from airflow.operators.slack_operator import SlackAPIPostOperator
from airflow.hooks.base_hook import BaseHook
import urllib.parse

# Slack alert for sla_miss
def slack_sla_miss(dag, task_list, blocking_task_list, slas, blocking_tis):
    dag_id = slas[0].dag_id
    task_id = slas[0].task_id
    execution_date = slas[0].execution_date.isoformat()
    base_url = 'webserver_url_here'
    encoded_execution_date = urllib.parse.quote_plus(execution_date)
    dag_url = (f'{base_url}/graph?dag_id={dag_id}'
               f'&execution_date={encoded_execution_date}')
    message = (f':alert: *Airflow SLA Miss*'
               f'\n\n'
               f'*DAG:* {dag_id}\n'
               f'*Task:* {task_id}\n'
               f'*Execution Date:* {execution_date}'
               f'\n\n'
               f'<{dag_url}|Click here to view DAG>')

    sla_miss_alert = SlackAPIPostOperator(
        task_id='slack_sla_miss',
        channel='airflow-alerts-test',
        token=str(BaseHook.get_connection("slack").password),
        text=message
    )
    return sla_miss_alert.execute()

# Slack alert for successful task completion
def slack_success_task(context):
    success_alert = SlackAPIPostOperator(
        task_id='slack_success',
        channel='airflow-alerts-test',
        token=str(BaseHook.get_connection("slack").password),
        text = "Test successful"
    )
    return success_alert.execute(context=context)


default_args = {
    "depends_on_past": False,
    'start_date': datetime(2020, 11, 18),
    "retries": 0
}

# Create a basic DAG with our args
# Note: Don't put sla_miss_callback=sla_miss_alert in default_args. It should be defined in the DAG definition itself.
dag = DAG(
    dag_id='sla_slack_v6',
    default_args=default_args,
    sla_miss_callback=slack_sla_miss,
    catchup=False,
    # A common interval to make the job fire when we run it
    schedule_interval=timedelta(minutes=3)
)

# Add a task that will always fail the SLA
t1 = BashOperator(
    task_id='timeout_test_sla_miss',
    # Sleep 60 seconds to guarantee we miss the SLA
    bash_command='sleep 60',
    # Do not retry so the SLA miss fires after the first execution
    retries=0,
    #on_success_callback = slack_success_task,
    # Set our task up with a 10 second SLA
    sla=timedelta(seconds=10),
    dag=dag
    )

t2 = BashOperator(
    task_id='timeout_test_sla_miss_task_2',
    # Sleep 30 seconds to guarantee we miss the SLA of 20 seconds set in this task
    bash_command='sleep 30',
    # Do not retry so the SLA miss fires after the first execution
    retries=0,
    #on_success_callback = slack_success_task,
    # Set our task up with a 20 second SLA
    sla=timedelta(seconds=20),
    dag=dag
    )

t3 = BashOperator(
    task_id='timeout_test_sla_miss_task_3',
    # Sleep 60 seconds to guarantee we miss the SLA
    bash_command='sleep 60',
    # Do not retry so the SLA miss fires after the first execution
    retries=0,
    #on_success_callback = slack_success_task,
    # Set our task up with a 30 second SLA
    sla=timedelta(seconds=30),
    dag=dag
    )
 
t1 >> t2 >> t3

Answer 2

I had the same issue, but was able to get it working with this code:


import logging as log
import airflow
import time
from datetime import timedelta
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# Operators; we need this to operate!
from airflow.operators.python_operator import PythonOperator
from airflow import configuration
import urllib.parse
from airflow.operators.slack_operator import SlackAPIPostOperator


def sleep():
    """ Sleep for 2 minutes """
    time.sleep(60*2)
    log.info("Slept for 2 minutes")


def simple_print(**context):
    """ Prints a message """
    print("Hello World!")


def slack_on_sla_miss(dag,
                      task_list,
                      blocking_task_list,
                      slas,
                      blocking_tis):

    log.info('Running slack_on_sla_miss')

    slack_conn_id = 'slack_default'
    slack_channel = '#general'

    dag_id = slas[0].dag_id
    task_id = slas[0].task_id
    execution_date = slas[0].execution_date.isoformat()

    base_url = configuration.get('webserver', 'BASE_URL')
    encoded_execution_date = urllib.parse.quote_plus(execution_date)
    dag_url = (f'{base_url}/graph?dag_id={dag_id}'
               f'&execution_date={encoded_execution_date}')

    message = (f':o: *Airflow SLA Miss*'
               f'\n\n'
               f'*DAG:* {dag_id}\n'
               f'*Task:* {task_id}\n'
               f'*Execution Date:* {execution_date}'
               f'\n\n'
               f'<{dag_url}|Click here to view>')

    slack_op = SlackAPIPostOperator(task_id='slack_failed',
                                    slack_conn_id=slack_conn_id,
                                    channel=slack_channel,
                                    text=message)
    slack_op.execute()


default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    'start_date': airflow.utils.dates.days_ago(n=0, minute=1),
    "retries": 0,
    'priority_weight': 1,
}

dag = DAG(
    dag_id='sla_test',
    default_args=default_args,
    sla_miss_callback=slack_on_sla_miss,
    schedule_interval='*/5 * * * *',
    catchup=False,
    max_active_runs=1,
)

with dag:
    sleep_task = PythonOperator(
        task_id="sleep",
        python_callable=sleep,
        )

    simple_task = PythonOperator(
        task_id="simple_task",
        python_callable=simple_print,
        provide_context=True,
        sla=timedelta(minutes=1),
        )

    sleep_task >> simple_task

Answer 3

I've run into this issue myself. Unlike on_failure_callback, which expects a Python callable, it appears that sla_miss_callback needs the full function call.

An example that is working for me:

def sla_miss_alert(dag_id):
    """
    Function that alerts me that dag_id missed sla
    """
    # <function code here>

def task_failure_alert(dag_id, context):
    """
    Function that alerts me that a task failed
    """
    # <function code here>


from functools import partial

dag_id = 'sla_test'
default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    'start_date': airflow.utils.dates.days_ago(n=0,minute=1),
    'on_failure_callback': partial(task_failure_alert, dag_id),
    'sla': timedelta(minutes=1),
    "retries": 0, 
    "pool": 'canary',
    'priority_weight': 1
}

dag = airflow.DAG(
    dag_id='sla_test',
    default_args=default_args,
    sla_miss_callback=sla_miss_alert(dag_id),
    schedule_interval='*/5 * * * *',
    catchup=False,
    max_active_runs=1,
    dagrun_timeout=timedelta(minutes=5)
)

As far as I can tell, sla_miss_callback doesn't have access to context, which is unfortunate. Once I stopped looking for the context, I finally got my alerts.
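
If you do need extra values inside the SLA callback, one option is to pre-bind them with functools.partial, the same pattern this answer uses for on_failure_callback. A minimal sketch of that idea, keeping the five positional parameters that later answers show the scheduler passing; the label argument here is hypothetical:

from functools import partial

def sla_miss_alert(label, dag, task_list, blocking_task_list, slas, blocking_tis):
    """Hypothetical callback: `label` is pre-bound; the rest are supplied by the scheduler."""
    print(f"[{label}] SLA missed in DAG {dag.dag_id}: {task_list}")

dag = airflow.DAG(
    dag_id='sla_test',
    default_args=default_args,
    sla_miss_callback=partial(sla_miss_alert, 'canary'),
    schedule_interval='*/5 * * * *',
)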

Answer 4

I think the airflow documentation is a bit fuzzy on this.

Instead of the method signature as

def slack_sla_miss(dag, task_list, blocking_task_list, slas, blocking_tis)

Modify your signature like this

def slack_sla_miss(*args, **kwargs)

This way all the parameters get passed, and you will not get the errors you are seeing in the logs.

Learnt this from: https://www.cloudwalker.io/2020/12/15/airflow-sla-management/
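
A minimal sketch of that flexible signature, if you just want to see what the scheduler actually passes:

import logging

def slack_sla_miss(*args, **kwargs):
    # Log whatever the scheduler supplies: in Airflow 1.x this is the five
    # positional arguments (dag, task_list, blocking_task_list, slas,
    # blocking_tis), but this signature won't break if that changes.
    logging.info("SLA miss callback args=%s kwargs=%s", args, kwargs)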

Answer 5

Airflow 1 (not tested on Airflow 2!)

An example of using SLA-miss and execution-timeout alerts:

  • At first, you'll get an SLA-miss alert after the task has been running for 2 minutes,
  • and then, after 4 minutes, the task will fail with an execution-timeout alert.
"sla": timedelta(minutes=2),  # Default Task SLA time
"execution_timeout": timedelta(minutes=4),  # Default Task Execution Timeout

Also, you have log_url right in the message, so you can easily open the task log in Airflow.

Example Slack message (screenshot omitted)

import time
from datetime import datetime, timedelta
from textwrap import dedent
from typing import Any, Dict, List, Optional, Tuple

from airflow import AirflowException
from airflow.contrib.operators.slack_webhook_operator import SlackWebhookOperator
from airflow.exceptions import AirflowTaskTimeout
from airflow.hooks.base_hook import BaseHook
from airflow.models import DAG, TaskInstance
from airflow.operators.python_operator import PythonOperator

SLACK_STATUS_TASK_FAILED = ":red_circle: Task Failed"
SLACK_STATUS_EXECUTION_TIMEOUT = ":alert: Task Failed by Execution Timeout."


def send_slack_alert_sla_miss(
        dag: DAG,
        task_list: str,
        blocking_task_list: str,
        slas: List[Tuple],
        blocking_tis: List[TaskInstance],
) -> None:
    """Send `SLA missed` alert to Slack"""
    task_instance: TaskInstance = blocking_tis[0]
    message = dedent(
        f"""
        :warning: Task SLA missed.
        *DAG*: {dag.dag_id}
        *Task*: {task_instance.task_id}
        *Execution Time*: {task_instance.execution_date.strftime("%Y-%m-%d %H:%M:%S")} UTC
        *SLA Time*: {task_instance.task.sla}
        _* Time by which the job is expected to succeed_
        *Task State*: `{task_instance.state}`
        *Blocking Task List*: {blocking_task_list}
        *Log URL*: {task_instance.log_url}
        """
    )
    send_slack_alert(message=message)


def send_slack_alert_task_failed(context: Dict[str, Any]) -> None:
    """Send `Task Failed` notification to Slack"""
    task_instance: TaskInstance = context.get("task_instance")
    exception: AirflowException = context.get("exception")

    status = SLACK_STATUS_TASK_FAILED
    if isinstance(exception, AirflowTaskTimeout):
        status = SLACK_STATUS_EXECUTION_TIMEOUT

    # Prepare formatted Slack message
    message = dedent(
        f"""
        {status}
        *DAG*: {task_instance.dag_id}
        *Task*: {task_instance.task_id}
        *Execution Time*: {context.get("execution_date").to_datetime_string()} UTC
        *SLA Time*: {task_instance.task.sla}
        _* Time by which the job is expected to succeed_
        *Execution Timeout*: {task_instance.task.execution_timeout}
        _** Max time allowed for the execution of this task instance_
        *Task Duration*: {timedelta(seconds=round(task_instance.duration))}
        *Task State*: `{task_instance.state}`
        *Exception*: {exception}
        *Log URL*: {task_instance.log_url}
        """
    )
    send_slack_alert(
        message=message,
        context=context,
    )


def send_slack_alert(
        message: str,
        context: Optional[Dict[str, Any]] = None,
) -> None:
    """Send prepared message to Slack"""
    slack_webhook_token = BaseHook.get_connection("slack").password
    notification = SlackWebhookOperator(
        task_id="slack_notification",
        http_conn_id="slack",
        webhook_token=slack_webhook_token,
        message=message,
        username="airflow",
    )
    notification.execute(context)


# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
    "owner": "airflow",
    "email": ["test@test,com"],
    "email_on_failure": True,
    "depends_on_past": False,
    "retry_delay": timedelta(minutes=5),
    "sla": timedelta(minutes=2),  # Default Task SLA time
    "execution_timeout": timedelta(minutes=4),  # Default Task Execution Timeout
    "on_failure_callback": send_slack_alert_task_failed,
}

with DAG(
        dag_id="test_sla",
        schedule_interval="*/5 * * * *",
        start_date=datetime(2021, 1, 11),
        default_args=default_args,
        sla_miss_callback=send_slack_alert_sla_miss,  # Must be set here, not in default_args!
) as dag:
    delay_python_task = PythonOperator(
        task_id="delay_five_minutes_python_task",
        # Task-level SLA; overrides the default set in default_args
        sla=timedelta(minutes=2),
        python_callable=lambda: time.sleep(300),
    )

Answer 6

It seems that the only way to make sla_miss_callback work is by explicitly declaring the arguments that it needs... nothing else has worked for me, and the arguments 'dag', 'task_list', 'blocking_task_list', 'slas', and 'blocking_tis' were simply not being received by the callback:

TypeError: print_sla_miss() missing 5 required positional arguments: 'dag', 'task_list', 'blocking_task_list', 'slas', and 'blocking_tis'

Answer 7

I was in a similar situation once.
On investigating the scheduler log, I found the following error:

[2020-07-08 09:14:32,781] {scheduler_job.py:534} INFO -  --------------> ABOUT TO CALL SLA MISS CALL BACK  
[2020-07-08 09:14:32,781] {scheduler_job.py:541} ERROR - Could not call sla_miss_callback for DAG 
sla_miss_alert() takes 1 positional arguments but 5 were given

The problem is that your sla_miss_callback function expects only one argument, but it should actually accept five, like this:

def sla_miss_alert(dag, task_list, blocking_task_list, slas, blocking_tis):
    """Function that alerts me that dag_id missed sla"""
    # <function code here>

For reference, check out the Airflow source code.

Note: Don't put sla_miss_callback=sla_miss_alert in default_args. It should be defined in the DAG definition itself.
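
To tie this together, here is a minimal sketch of a working setup (Airflow 1.10-style imports; the dag_id, schedule, and timings are illustrative only):

import logging
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash_operator import BashOperator


def sla_miss_alert(dag, task_list, blocking_task_list, slas, blocking_tis):
    """Logs every SLA miss; swap the body for your own notification."""
    logging.info("SLA missed in DAG %s. Tasks: %s Blocking: %s",
                 dag.dag_id, task_list, blocking_task_list)


dag = DAG(
    dag_id='sla_demo',
    start_date=datetime(2021, 1, 1),
    schedule_interval='*/5 * * * *',
    sla_miss_callback=sla_miss_alert,  # on the DAG itself, not in default_args
    catchup=False,
)

# Runs longer than its 10-second SLA, so the callback should fire
slow_task = BashOperator(
    task_id='slow_task',
    bash_command='sleep 60',
    sla=timedelta(seconds=10),
    dag=dag,
)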