Skip to content

Instantly share code, notes, and snippets.

@LucasMMota
Last active August 24, 2021 23:19
Show Gist options
  • Save LucasMMota/7624d22bf7614b66b338e3d9367c613c to your computer and use it in GitHub Desktop.
Save LucasMMota/7624d22bf7614b66b338e3d9367c613c to your computer and use it in GitHub Desktop.
Example for testing SLA in Airflow with a custom function (sla_miss_callback)
from datetime import datetime, timedelta
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
def my_alerting_function(dag, task_list, blocking_task_list, slas, blocking_tis):
"""Executes the user defined code (for example, post on Slack)"""
message = (
f":alert: *Airflow SLA Miss*\n\n"
f"DAG: {slas[0].dag_id}\n"
f"Task: {slas[0].task_id}\n"
f"Execution Date: {slas[0].execution_date.isoformat()}"
)
post_msg_in_slack(message)
dag = DAG(
dag_id="my_dag_to_test_sla",
catchup=False,
start_date=datetime(2021, 8, 28, 0, 0, 0),
# If schedule_interval=None, SLA won't be evaluated for this DAG!
schedule_interval="*/5 * * * *",
sla_miss_callback=my_alerting_function,
)
task_1 = BashOperator(
task_id='task_1',
bash_command='sleep 150',
dag=dag,
)
task_2 = BashOperator(
dag=dag,
task_id='task_2',
bash_command='echo 1',
# If the task finishes (or starts) after 120 seconds,
# it will register an SLA miss
sla=timedelta(seconds=120)
)
# The task `task_2` will start after 150 seconds
# so Airflow will register an SLA miss
task_1 >> task_2
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment