Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[AIRFLOW-249] Refactor the SLA mechanism (Continuation from #3584 ) #8545

Open
wants to merge 150 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
150 commits
Select commit Hold shift + click to select a range
94aa0fb
Refactor SLA management as several DAG methods with comments
jmeickle May 2, 2018
e27e66c
Dedent logic
jmeickle May 2, 2018
b6f1134
Cleanup log message
jmeickle May 2, 2018
819cac3
Do not send SLAs on skipped TIs.
jmeickle May 22, 2018
0981af5
Add expected_duration, expected_start, expected_finish. Deprecate SLA.
jmeickle May 22, 2018
dce956e
Reference the correct parameter name
jmeickle May 22, 2018
48abfe2
Warn on invalid combinations of SLA parameters.
jmeickle May 22, 2018
e3ce6fa
Add a helper for determining whether a task has SLAs set.
jmeickle May 23, 2018
3404bee
Rename function and add comments
jmeickle May 23, 2018
030cae6
Reimplement recording SLA misses with helper functions
jmeickle May 23, 2018
982131d
Add SLA helper functions in utils
jmeickle May 23, 2018
f349b1e
Move email default callback into a helper function
jmeickle May 23, 2018
f481c98
Update documentation.
jmeickle May 23, 2018
8608403
Move sla miss callback to tasks
jmeickle May 23, 2018
667c5e0
Remove unused SLA attribute
jmeickle May 23, 2018
e3bd868
Add type column and constants to SlaMiss
jmeickle May 23, 2018
6ce08ec
Fix imports and indentation
jmeickle Jun 22, 2018
64a7254
Remove unused import
jmeickle Jun 29, 2018
3ed866a
Fix logging
jmeickle Jun 29, 2018
df76755
Respect task end dates
jmeickle Jun 29, 2018
8b7f158
Support DAG end dates too.
jmeickle Jun 29, 2018
ea7a538
Better document this section
jmeickle Jun 29, 2018
38d4270
Improve comments.
jmeickle Jun 29, 2018
d7dc6c0
Implement expected start/finish SLA checks
jmeickle Jun 29, 2018
f55c95a
Get from db first, create fake second.
jmeickle Jun 29, 2018
866a9f4
Reorder imports
jmeickle Jun 29, 2018
fc40490
Spacing fix.
jmeickle Jun 29, 2018
ee4ebac
Comment fix
jmeickle Jun 29, 2018
a6909b8
type -> sla_type because reserved keyword.
jmeickle Jun 29, 2018
7a53bdc
Add email sub-functions
jmeickle Jun 29, 2018
941fa02
Add URL for task instance details
jmeickle Jul 3, 2018
abb84bf
Refactor email function API
jmeickle Jul 3, 2018
5b6110a
Clean up email function.
jmeickle Jul 3, 2018
bd20baa
Add missing import
jmeickle Jul 3, 2018
aaa26d4
Generalize.
jmeickle Jul 3, 2018
561dbfb
Implement other emails
jmeickle Jul 3, 2018
3b57116
Fix bug
jmeickle Jul 3, 2018
4406c31
Implement snail
jmeickle Jul 3, 2018
91e794b
Syntax and py2
jmeickle Jul 3, 2018
c6149de
Fix syntax error
jmeickle Jul 3, 2018
4d7e97b
Fix missing import
jmeickle Jul 3, 2018
ec02609
Get session in kwargs
jmeickle Jul 3, 2018
f69dbdb
Pass through session in more places.
jmeickle Jul 5, 2018
95881e2
Additional debug logging.
jmeickle Jul 5, 2018
bbe3724
Add a migration for SLA miss type.
jmeickle Jul 5, 2018
e5a0e5a
SQLalchemy syntax fix
jmeickle Jul 5, 2018
d177a81
Get matching ti/task
jmeickle Jul 5, 2018
9238c1f
Iterator based approach to TIs.
jmeickle Jul 6, 2018
8160a80
DagRun based approach to finding SLA TIs
jmeickle Jul 6, 2018
2de9f8a
Fix SQLAlchemy comparison
jmeickle Jul 6, 2018
fabac91
Handle TIs whether they exist yet or not.
jmeickle Jul 6, 2018
757fad6
Fix unit tests for SLAs
jmeickle Jul 6, 2018
0fb7ec1
Don't create new SLA misses if they already exist.
jmeickle Jul 9, 2018
02c27e5
flake8.
jmeickle Jul 9, 2018
21482a5
Remove extraneous backslash
jmeickle Oct 10, 2018
64a6739
Upgrade to warning since this is clearly incorrect
jmeickle Oct 10, 2018
c25496c
Explanatory comment
jmeickle Oct 10, 2018
951d6c4
Comment clarifications.
jmeickle Oct 10, 2018
3c598e8
Only SLA notify when duration is exceeded
jmeickle Oct 10, 2018
3453281
Explanatory comment.
jmeickle Oct 10, 2018
06aa8a4
Add a sqlalchemy not import
jmeickle Oct 10, 2018
5344282
Order by the DAGRun execution date to grab the most recent
jmeickle Oct 10, 2018
99bf058
Fix inverted operator
jmeickle Oct 10, 2018
29d3b0e
Update function documentation
jmeickle Oct 10, 2018
f31bf6b
Enhance function documentation.
jmeickle Oct 10, 2018
77d7abd
Rename function and better describe what it's checking
jmeickle Oct 10, 2018
1d7a78a
Consistently use snails for late SLAs
jmeickle Oct 10, 2018
480acc0
Documentation updates.
jmeickle Oct 10, 2018
0db2513
Increase logging to an exception, since missing SLA creation is a big…
jmeickle Oct 10, 2018
3a46f48
Documentation updates.
jmeickle Oct 12, 2018
6f61441
Improve validation of SLA parameters
jmeickle Oct 12, 2018
8307f56
flake8 no-qa
jmeickle Oct 12, 2018
de8db06
Use six instead of basestring directly.
jmeickle Oct 12, 2018
0f5cf90
Clearer logic for SLA parameters.
jmeickle Oct 17, 2018
2e6c2cb
Improve parameter setting logic
jmeickle Oct 17, 2018
f9eb662
More improvements to param setting logic.
jmeickle Oct 17, 2018
81d160d
Query SQLalchemy state correctly
jmeickle Oct 17, 2018
c317fff
Task instances, not tasks
jmeickle Oct 17, 2018
2fef6cf
Task method, not TI method
jmeickle Oct 17, 2018
d340923
Set Task on each TI
jmeickle Oct 17, 2018
18291f3
Fix template variable.
jmeickle Oct 17, 2018
e391d26
Include the SLA-missed TI as an impacted task
jmeickle Oct 17, 2018
1e52dd3
Add an initial test suite for the SLA utils.
jmeickle Oct 17, 2018
7c552b0
Add some test suite inspired fixes.
jmeickle Oct 17, 2018
5c763f8
Use warnings.warn to raise deprecation messages and add tests
jmeickle Oct 17, 2018
61cc988
Test that invalid args raise an exception in operator construction
jmeickle Oct 17, 2018
ebfe87e
Fix warning logic
jmeickle Oct 17, 2018
0a5daeb
Timestamp is clearer than ts
jmeickle Oct 17, 2018
29dc420
Logging improvements
jmeickle Oct 17, 2018
2c0e52c
Improve logging and exception messages
jmeickle Oct 17, 2018
dc5b60b
Use correct tense
jmeickle Oct 17, 2018
6f18787
Fix copypasta error
jmeickle Oct 17, 2018
b5c5b22
Fix broken query logic and improve readability
jmeickle Oct 17, 2018
a355564
Add explanatory note.
jmeickle Oct 17, 2018
e105750
Move SLA miss test from jobs into models.
jmeickle Oct 17, 2018
2ef0eb9
Yield a fake dagrun
jmeickle Oct 17, 2018
0e9059a
Add a test that addresses checking unscheduled dagruns
jmeickle Oct 17, 2018
14528c1
flake8
jmeickle Oct 17, 2018
9421510
Rename function for clarity.
jmeickle Oct 18, 2018
859d2f9
Skip this logic if it's not needed.
jmeickle Oct 18, 2018
1c8171d
Put a task on the TI
jmeickle Oct 18, 2018
4ba5850
Improve logging and comments
jmeickle Oct 18, 2018
882782e
Commit after each SLA modification
jmeickle Oct 18, 2018
e53478f
Improve string representation of SLA miss
jmeickle Oct 18, 2018
9420383
Test sending SLA notifications.
jmeickle Oct 18, 2018
87530a0
Order by task ID
jmeickle Oct 18, 2018
93a3f30
flake8
jmeickle Oct 18, 2018
6202d99
Catch exceptions when managing SLAs.
jmeickle Oct 18, 2018
440552a
finish rebase (part 1)
Apr 23, 2020
441734f
finish rebase (part 2) fix all tests
Apr 24, 2020
9a6f20b
finish rebase (part 3) remove models.py, jobs.py, and tests
Apr 24, 2020
6e7c946
fix static style checks
Apr 24, 2020
7fa726d
remove schedule job sla test; bump query count in query count test;
Apr 24, 2020
22f0577
add manage_sla tests for schedule job
Apr 24, 2020
670e7c7
remove unused import
Apr 24, 2020
1ab5311
fix static checks
Apr 24, 2020
283a906
import airflow.models to workaround dependency issue
Apr 24, 2020
2b20377
fix pylint
Apr 25, 2020
57e426a
flake8
seanxwzhang Apr 26, 2020
012b35b
pylint ignore circular imports for sla
seanxwzhang Apr 26, 2020
2d10161
fix sla tests in test_core.py
seanxwzhang Apr 27, 2020
45a239e
update alembic revision
seanxwzhang Apr 27, 2020
7aec742
add stats for manage_sla failures
seanxwzhang Apr 30, 2020
b1d7223
fix indentation and baseoperator docstring
seanxwzhang May 1, 2020
dd5522e
add type hint in baseopertor has_sla
seanxwzhang May 1, 2020
e7243d3
add alembic migration script for sla_checked column in dagrun
seanxwzhang May 1, 2020
73352a4
query only dagruns that have sla_checked=False
seanxwzhang May 1, 2020
52aa700
add type hint; use %s in log
seanxwzhang May 1, 2020
dd22a4c
set notification_sent directly on sla_miss
seanxwzhang May 1, 2020
89b1097
avoid creating a separate string in runtime when getting urls
seanxwzhang May 1, 2020
5b22338
rename to yield_uncreated_runs
seanxwzhang May 1, 2020
889de98
remove py2 compatibility bits
seanxwzhang May 1, 2020
a8062c1
use set for existing_sla_types
seanxwzhang May 1, 2020
edbb31b
use TI __repr__ for formatting, delete custom TI describer
seanxwzhang May 1, 2020
3180298
merge sla_checked query to DagRun.find
seanxwzhang May 1, 2020
2db2536
remove unused DagRunType
seanxwzhang May 1, 2020
ef5acd6
fix test_core
seanxwzhang May 1, 2020
32861b7
remove sla_checked to improve performance
seanxwzhang May 3, 2020
950d5c8
apply static checks
seanxwzhang May 3, 2020
bd6d51b
move sla.py to airflow/
seanxwzhang May 3, 2020
b3601eb
move test_sla.py to tests/
seanxwzhang May 3, 2020
e127e8d
fix dataproc import order
seanxwzhang May 3, 2020
e86f264
reorder setup.cfg
seanxwzhang May 3, 2020
68a6c8a
reorder import
seanxwzhang May 3, 2020
2d1880f
only fetch DRS where state != SUCCESS
seanxwzhang May 11, 2020
391a30d
add test for get_latest_run
seanxwzhang May 11, 2020
6701d76
remove unused TaskNotFound
seanxwzhang May 11, 2020
61ab469
adjust QueryCount
seanxwzhang May 11, 2020
e1723bf
remove unused MagicMock
seanxwzhang May 12, 2020
cd3408f
reabse; not tested
seanxwzhang Sep 22, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
172 changes: 16 additions & 156 deletions airflow/jobs/scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,10 @@

from airflow import models, settings
from airflow.configuration import conf
from airflow.exceptions import AirflowException, TaskNotFound
from airflow.exceptions import AirflowException
from airflow.executors.executor_loader import UNPICKLEABLE_EXECUTORS
from airflow.jobs.base_job import BaseJob
from airflow.models import DAG, DagModel, SlaMiss, errors
from airflow.models import DAG, DagModel, errors
from airflow.models.dagbag import DagBag
from airflow.models.dagrun import DagRun
from airflow.models.taskinstance import SimpleTaskInstance, TaskInstanceKey
Expand All @@ -52,11 +52,10 @@
from airflow.ti_deps.dep_context import DepContext
from airflow.ti_deps.dependencies_deps import SCHEDULED_DEPS
from airflow.ti_deps.dependencies_states import EXECUTION_STATES
from airflow.utils import asciiart, helpers, timezone
from airflow.utils import helpers, timezone
from airflow.utils.dag_processing import (
AbstractDagFileProcessorProcess, DagFileProcessorAgent, FailureCallbackRequest, SimpleDagBag,
)
from airflow.utils.email import get_email_address_list, send_email
from airflow.utils.log.logging_mixin import LoggingMixin, StreamLogWriter, set_context
from airflow.utils.mixins import MultiprocessingStartMethodMixin
from airflow.utils.session import provide_session
Expand Down Expand Up @@ -359,156 +358,6 @@ def __init__(self, dag_ids: Optional[List[str]], log: logging.Logger):
self.dag_ids = dag_ids
self._log = log

@provide_session
def manage_slas(self, dag: DAG, session: Session = None) -> None:
"""
Finding all tasks that have SLAs defined, and sending alert emails
where needed. New SLA misses are also recorded in the database.

We are assuming that the scheduler runs often, so we only check for
tasks that should have succeeded in the past hour.
"""
if not any([isinstance(ti.sla, timedelta) for ti in dag.tasks]):
self.log.info("Skipping SLA check for %s because no tasks in DAG have SLAs", dag)
return

qry = (
session
.query(
TI.task_id,
func.max(TI.execution_date).label('max_ti')
)
.with_hint(TI, 'USE INDEX (PRIMARY)', dialect_name='mysql')
.filter(TI.dag_id == dag.dag_id)
.filter(
or_(
TI.state == State.SUCCESS,
TI.state == State.SKIPPED
)
)
.filter(TI.task_id.in_(dag.task_ids))
.group_by(TI.task_id).subquery('sq')
)

max_tis: List[TI] = session.query(TI).filter(
TI.dag_id == dag.dag_id,
TI.task_id == qry.c.task_id,
TI.execution_date == qry.c.max_ti,
).all()

ts = timezone.utcnow()
for ti in max_tis:
task = dag.get_task(ti.task_id)
if not isinstance(task.sla, timedelta):
continue

dttm = dag.following_schedule(ti.execution_date)
while dttm < timezone.utcnow():
following_schedule = dag.following_schedule(dttm)
if following_schedule + task.sla < timezone.utcnow():
session.merge(SlaMiss(
task_id=ti.task_id,
dag_id=ti.dag_id,
execution_date=dttm,
timestamp=ts))
dttm = dag.following_schedule(dttm)
session.commit()

slas: List[SlaMiss] = (
session
.query(SlaMiss)
.filter(SlaMiss.notification_sent == False, SlaMiss.dag_id == dag.dag_id) # noqa pylint: disable=singleton-comparison
.all()
)

if slas: # pylint: disable=too-many-nested-blocks
sla_dates: List[datetime.datetime] = [sla.execution_date for sla in slas]
fetched_tis: List[TI] = (
session
.query(TI)
.filter(
TI.state != State.SUCCESS,
TI.execution_date.in_(sla_dates),
TI.dag_id == dag.dag_id
).all()
)
blocking_tis: List[TI] = []
for ti in fetched_tis:
if ti.task_id in dag.task_ids:
ti.task = dag.get_task(ti.task_id)
blocking_tis.append(ti)
else:
session.delete(ti)
session.commit()

task_list = "\n".join([
sla.task_id + ' on ' + sla.execution_date.isoformat()
for sla in slas])
blocking_task_list = "\n".join([
ti.task_id + ' on ' + ti.execution_date.isoformat()
for ti in blocking_tis])
# Track whether email or any alert notification sent
# We consider email or the alert callback as notifications
email_sent = False
notification_sent = False
if dag.sla_miss_callback:
# Execute the alert callback
self.log.info(' --------------> ABOUT TO CALL SLA MISS CALL BACK ')
try:
dag.sla_miss_callback(dag, task_list, blocking_task_list, slas,
blocking_tis)
notification_sent = True
except Exception: # pylint: disable=broad-except
self.log.exception("Could not call sla_miss_callback for DAG %s",
dag.dag_id)
email_content = """\
Here's a list of tasks that missed their SLAs:
<pre><code>{task_list}\n<code></pre>
Blocking tasks:
<pre><code>{blocking_task_list}\n{bug}<code></pre>
""".format(task_list=task_list, blocking_task_list=blocking_task_list,
bug=asciiart.bug)

tasks_missed_sla = []
for sla in slas:
try:
task = dag.get_task(sla.task_id)
except TaskNotFound:
# task already deleted from DAG, skip it
self.log.warning(
"Task %s doesn't exist in DAG anymore, skipping SLA miss notification.",
sla.task_id)
continue
tasks_missed_sla.append(task)

emails: Set[str] = set()
for task in tasks_missed_sla:
if task.email:
if isinstance(task.email, str):
emails |= set(get_email_address_list(task.email))
elif isinstance(task.email, (list, tuple)):
emails |= set(task.email)
if emails:
try:
send_email(
emails,
"[airflow] SLA miss on DAG=" + dag.dag_id,
email_content)
email_sent = True
notification_sent = True
except Exception: # pylint: disable=broad-except
Stats.incr('sla_email_notification_failure')
self.log.exception("Could not send SLA Miss email notification for"
" DAG %s", dag.dag_id)
# If we sent any notification, update the sla_miss table
if notification_sent:
for sla in slas:
if email_sent:
sla.email_sent = True
sla.notification_sent = True
session.merge(sla)
session.commit()

@staticmethod
def update_import_errors(session: Session, dagbag: DagBag) -> None:
"""
Expand Down Expand Up @@ -733,7 +582,7 @@ def _process_dags(self, dags: List[DAG], session: Session = None) -> List[TaskIn

1. Create appropriate DagRun(s) in the DB.
2. Create appropriate TaskInstance(s) in the DB.
3. Send emails for tasks that have missed SLAs (if CHECK_SLAS config enabled).
3. Create SLA misses and trigger callbacks for late TaskInstances. (if CHECK_SLAS config enabled).

:param dags: the DAGs from the DagBag to process
:type dags: List[airflow.models.DAG]
Expand Down Expand Up @@ -775,7 +624,18 @@ def _process_dags(self, dags: List[DAG], session: Session = None) -> List[TaskIn
if dag_runs_for_dag:
tis_out.extend(self._process_task_instances(dag, dag_runs_for_dag))
if check_slas:
self.manage_slas(dag)
# Look for SLA misses for all task instances associated with this
# DAG, whether or not corresponding runs have been created yet.
# Catch exceptions, though; we don't want SLA checking failures to
# prevent tasks from being scheduled!
try:
dag.manage_slas()
except Exception: # pylint: disable=broad-except
Stats.incr(
'dagrun.sla_manage_failure.{dag_id}'.format(dag_id=dag.dag_id))
self.log.exception(
"DAG %s was unable to successfully manage SLAs; continuing"
" with scheduling rather than raising exception.", dag)

return tis_out

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""Add SLA missing type

Revision ID: 14d508160edd
Revises: 952da73b5eff
Create Date: 2020-04-27 05:26:46.236705

"""

import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision = '14d508160edd'
down_revision = '952da73b5eff'
branch_labels = None
depends_on = None

SLA_MISS_TABLE = "sla_miss"
NEW_COLUMN = "sla_type"
OLD_SLA_MISS_TYPE = "task_late_finish"


def upgrade():
op.add_column(SLA_MISS_TABLE,
sa.Column(
NEW_COLUMN,
sa.String(50),
primary_key=True,
nullable=False,
server_default=OLD_SLA_MISS_TYPE
))


def downgrade():
op.drop_column(SLA_MISS_TABLE, NEW_COLUMN)
Loading