
[AIRFLOW-249] Refactor the SLA mechanism (Continuation from #3584 ) #8545

Open
wants to merge 150 commits into base: main
Conversation


@seanxwzhang seanxwzhang commented Apr 24, 2020

This PR tries to land #3584. Most changes are from @Eronarn, I rebased on master and added a few tests.

As per @Eronarn :

JIRA

AIRFLOW-249 Refactor the SLA mechanism

This is a fairly large patch that should also address some or all of the following:

AIRFLOW-1360 SLA miss goes to all emails in DAG
AIRFLOW-2236 Airflow SLA is triggered for all backfilled tasks
AIRFLOW-557 SLA notification does not work for manually triggered DAGs
AIRFLOW-133 SLAs don't seem to work with schedule_interval=None
AIRFLOW-1013 airflow/jobs.py:manage_slas() exception for @once dag

Description

At Quantopian we use Airflow to produce artifacts based on the previous day's stock market data. These artifacts are required for us to trade on today's stock market. Therefore, I've been investing time in improving Airflow notifications (such as writing PagerDuty and Slack integrations). My attention has turned to Airflow's SLA system, which has some drawbacks for our use case:

Defining SLAs can be awkward because they are relative to the execution date instead of the task start time. For any non-trivial DAG, there's no way to alert if a task runs for "more than an hour"; you can only express "more than an hour from the execution date". The financial data we use varies in when it arrives and in how long it takes to process (data volume changes frequently). We also run DAGs with mixed UTC and Eastern events, making Airflow SLA definitions depend on the time of year.

Execution timeouts can capture "more than an hour" but do not serve the same purpose as SLAs. We have tight timelines on long-duration tasks that make retries difficult, so we want to alert an operator while leaving the original task running, rather than failing and then alerting.

The way that SLA miss callbacks are defined is not intuitive: in contrast to other callbacks, they are defined on the DAG rather than on the task. This has led to a lot of confusion in JIRA and on the mailing list; many people think that SLAs are for DAG completion, when in reality it's a DAG-level attribute that handles batched task-level completions. Also, the call signature is poorly defined: for instance, two of its arguments are just strings produced from the other two.

SLA miss emails don't include any links back to the Airflow instance (important for us because we run the same DAGs in both staging and production) or the execution date they apply to. When opened, they can be hard to read for even a moderately sized DAG, because they include a flat, unsorted list of task instances (neither alphabetical nor topological).

SLA miss emails are sent to every email address associated with the DAG. This can lead to inadvertent paging of users associated with "forks" of the DAG unrelated to where the SLA miss occurred.

SLA emails are not callbacks and can't be turned off (other than by removing the SLA or removing the email attribute on the task instance).

This patch attempts to address the above issues by making some of the following changes:

The sla= parameter is split into:
expected_start: Timedelta after execution date, representing when this task must have started by.
expected_finish: Timedelta after execution date, representing when this task must have finished by.
expected_duration: Timedelta after task start, representing how long this task is expected to run, including all retries.
These parameters are set on a task (or DAG-level default args), and a task can have any combination of them, though there is some basic validation logic to warn you if you try to set an illogical combination. The SlaMiss object stores the type of SLA miss as a new database field, which is a component of the primary key.
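For illustration, here is a minimal sketch of how the three deadline types relate to the execution date and the task start time. All names here are hypothetical helpers, not the PR's actual code:

```python
from datetime import datetime, timedelta

def sla_deadlines(execution_date, task_start,
                  expected_start=None, expected_finish=None,
                  expected_duration=None):
    """Compute the wall-clock deadline for each SLA type that is set.

    expected_start / expected_finish are relative to the execution date;
    expected_duration is relative to when the task actually started.
    (Illustrative helper only.)
    """
    deadlines = {}
    if expected_start is not None:
        deadlines["start"] = execution_date + expected_start
    if expected_finish is not None:
        deadlines["finish"] = execution_date + expected_finish
    if expected_duration is not None and task_start is not None:
        deadlines["duration"] = task_start + expected_duration
    return deadlines

# Example: run scheduled for midnight, task actually started at 01:30.
dl = sla_deadlines(
    execution_date=datetime(2020, 4, 24, 0, 0),
    task_start=datetime(2020, 4, 24, 1, 30),
    expected_start=timedelta(hours=1),
    expected_finish=timedelta(hours=3),
    expected_duration=timedelta(hours=1),
)
```

Note how only the `expected_duration` deadline moves with the actual start time, which is what makes "runs for more than an hour" expressible.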

There is logic to convert the existing sla= parameter to expected_finish (as well as a migration), since that's the closest parallel, so it should be relatively backwards compatible.
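The backwards-compatibility conversion might look like the following sketch (a hypothetical helper, not the migration's actual code):

```python
from datetime import timedelta

def convert_legacy_sla(task_params):
    """Carry a legacy sla= value over to expected_finish.

    expected_finish (timedelta after execution date by which the task
    must have finished) is the closest parallel to the old sla= meaning.
    (Illustrative helper only.)
    """
    params = dict(task_params)
    legacy = params.pop("sla", None)
    if legacy is not None and "expected_finish" not in params:
        params["expected_finish"] = legacy
    return params

converted = convert_legacy_sla({"sla": timedelta(hours=2)})
```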

SLA misses are no longer grouped for checks or callbacks. While there have always been independent per-task SlaMiss objects, there was a lot of logic to poll for all SlaMisses that occurred at the same time, and to batch them into a single email.

As a consequence of 2), the sla_miss_callback is no longer set at the DAG level, which had been confusing. It now has a context-based signature, consistent with other task callbacks. This change is not backwards compatible for anyone using custom SLA miss callbacks, but the conversion should be fairly straightforward.
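A context-based callback might look like this sketch (the function name, the context keys, and the operator usage shown are all assumptions for illustration, not the PR's actual interface):

```python
# Hypothetical sla_miss_callback with a context-based signature, mirroring
# the style of other task callbacks such as on_failure_callback.
def notify_sla_miss(context):
    ti = context["task_instance"]        # key names assumed for illustration
    sla_type = context["sla_miss_type"]  # e.g. "expected_duration"
    return f"SLA miss ({sla_type}) on {ti['task_id']} for {ti['execution_date']}"

# A DAG author would then attach it per task rather than on the DAG, e.g.:
# PythonOperator(task_id="load", sla_miss_callback=notify_sla_miss, ...)
message = notify_sla_miss({
    "task_instance": {"task_id": "load", "execution_date": "2020-04-24"},
    "sla_miss_type": "expected_duration",
})
```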

The SLA miss email is now the default SLA miss callback on tasks. Previously it was an additional non-overrideable feature.

The SLA miss email has some improvements:

  1. Only one SLA miss per email
  2. SLA-miss-specific title
  3. Includes a link to the task instance
  4. Only includes potentially-blocked downstream tasks
  5. Sends to a list of "interested subscribers", defined as all email addresses on tasks downstream of the task that missed its SLA
  6. Additional ASCII art to help distinguish emails

Separately, the SLA miss code has largely moved out of the Scheduler: some into models (DAGs manage their own SlaMiss objects), and some into SLA helper functions.
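The "interested subscribers" idea can be sketched as a simple downstream traversal. The data structures here (plain dicts for task relationships and emails) stand in for Airflow's task objects and are purely illustrative:

```python
def interested_subscribers(downstream, emails, task_id):
    """Collect email addresses on all tasks downstream of task_id.

    `downstream` maps task_id -> list of direct downstream task_ids;
    `emails` maps task_id -> list of addresses.
    (Illustrative stand-ins for Airflow's task relationships.)
    """
    seen, stack, subs = set(), [task_id], set()
    while stack:
        current = stack.pop()
        for child in downstream.get(current, []):
            if child not in seen:
                seen.add(child)
                subs.update(emails.get(child, []))
                stack.append(child)
    return subs

downstream = {"extract": ["transform"], "transform": ["load"], "other": ["report"]}
emails = {"transform": ["etl@example.com"], "load": ["ops@example.com"],
          "report": ["bi@example.com"]}
subs = interested_subscribers(downstream, emails, "extract")
```

Owners of the unrelated "report" branch are not paged, addressing the fan-out problem described above.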

Overall, this attempts to reduce the complexity and improve the documentation of the SLA miss logic, even though the new implementation is a larger feature with more lines of code. The previous implementation was stuffed into one overloaded function responsible for checking for SLA misses, creating database objects for them, filtering tasks, selecting emails, rendering, and sending. These are now broken into multiple, more single-purpose functions.


Make sure to mark the boxes below before creating PR: [x]

  • Description above provides context of the change
  • Unit tests coverage for changes (not needed for documentation changes)
  • Target Github ISSUE in description if exists
  • Commits follow "How to write a good git commit message"
  • Relevant documentation is updated including usage instructions.
  • I will engage committers as explained in Contribution Workflow Example.

In case of fundamental code change, Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in UPDATING.md.
Read the Pull Request Guidelines for more information.

@boring-cyborg boring-cyborg bot added the area:Scheduler including HA (high availability) scheduler label Apr 24, 2020

boring-cyborg bot commented Apr 24, 2020

Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contribution Guide (https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst).
Here are some useful points:

  • Pay attention to the quality of your code (flake8, pylint and type annotations). Our pre-commits will help you with that.
  • In case of a new feature, add useful documentation (in docstrings or in the docs/ directory). Adding a new operator? Check this short guide and consider adding an example DAG that shows how users should use it.
  • Consider using the Breeze environment for testing locally; it's a heavy Docker image, but it ships with a working Airflow and a lot of integrations.
  • Be patient and persistent. It might take some time to get a review or get the final approval from Committers.
  • Be sure to read the Airflow Coding style.
    Apache Airflow is a community-driven project and together we are making it better 🚀.
    In case of doubts contact the developers at:
    Mailing List: [email protected]
    Slack: https://apache-airflow-slack.herokuapp.com/

@seanxwzhang seanxwzhang force-pushed the xzhang/airflow_sla_refactoring branch from 223783d to 261601a on April 24, 2020 17:47
@seanxwzhang seanxwzhang changed the title Xzhang/airflow sla refactoring [AIRFLOW-249] Refactor the SLA mechanism (Continuation from #3584 ) Apr 24, 2020
@seanxwzhang seanxwzhang force-pushed the xzhang/airflow_sla_refactoring branch 4 times, most recently from 064de0a to 62d8abd on April 27, 2020 06:36
@kaxil kaxil requested a review from BasPH April 27, 2020 14:38
@seanxwzhang seanxwzhang marked this pull request as ready for review April 27, 2020 15:35
@jmeickle
Contributor

Thank you for taking this on. I couldn't justify any more time on it but I think it's still very relevant to the project.

@seanxwzhang
Author

Thank you for taking this on. I couldn't justify any more time on it but I think it's still very relevant to the project.

Happy to contribute :)

Contributor

@BasPH BasPH left a comment


A very big change, lots to take in. Made a first round of comments.

Author

@seanxwzhang seanxwzhang left a comment


Thanks for the reviews @BasPH @houqp Will address them soon.

@seanxwzhang seanxwzhang force-pushed the xzhang/airflow_sla_refactoring branch 7 times, most recently from a5e3757 to 819c445 on May 4, 2020 05:02
@seanxwzhang
Author

Addressed most, if not all comments in the previous round of review. Played a bit with two cases regarding how we fetch DagRuns for SLA consideration:

  1. Use a fixed number (e.g., 100) for fetching DRs
  2. Add an sla_checked column to DR and use it to filter out DRs that have already been checked.

My conclusion is that option 1 is a better trade-off, because one has to go through all TIs in a DagRun to determine whether a DR can be freed from further checking (e.g., if a DR has 10 TIs, then each TI has to be checked for all possible SLA violations before the DR is sla_checked). This is not a cheap operation, since a single TI could have 3 SLAs, so the additional computation and IO could easily outweigh the benefit of filtering out sla_checked DRs.

@houqp
Member

houqp commented May 7, 2020

My conclusion is that option 1 is a better trade-off, because one has to go through all TIs in a DagRun to determine whether a DR can be freed from further checking (e.g., if a DR has 10 TIs, then each TI has to be checked for all possible SLA violations before the DR is sla_checked). This is not a cheap operation, since a single TI could have 3 SLAs, so the additional computation and IO could easily outweigh the benefit of filtering out sla_checked DRs.

Option 1 doesn't guarantee correctness, right? I.e., if there are more dagruns that need to be checked than the preset limit, some of them will be ignored?

With regards to performance comparison between option 1 and option 2, aren't we already checking all the TIs for the 100 fetched dag runs in option 1?

@seanxwzhang
Author

Option 1 doesn't guarantee correctness, right? I.e., if there are more dagruns that need to be checked than the preset limit, some of them will be ignored?

True. I guess the way to do it (if no additional column is added) would be to remove the fixed count and simply do:

    scheduled_dagruns = (
        session.query(DR)
        .filter(DR.dag_id == self.dag_id)
        .filter(DR.run_id.notlike(f"{DagRunType.BACKFILL_JOB.value}__%"))
        .filter(DR.external_trigger == False)  # SQLAlchemy needs ==, not `is`
        .filter(DR.state != State.SUCCESS)
        .order_by(desc(DR.execution_date))
        .all()
    )

This way we only get DRs that have yet to succeed (since we assume successful DRs are free from further SLA checks).

With regards to performance comparison between option 1 and option 2, aren't we already checking all the TIs for the 100 fetched dag runs in option 1?

We are checking whether these TIs are violating SLAs, not whether these TIs are free from SLAs; those are different checks (e.g., to check if a TI violates expected_duration, we compare the current duration with the SLA; to check if a TI is free from SLA violations, we assert that the TI has finished within the expected_duration). To do the latter would require adding another column to TI as well.
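The distinction can be sketched in plain Python (illustrative helpers, not the patch's code): a running TI may already violate expected_duration, but only a finished TI can be proven permanently free of that SLA:

```python
from datetime import datetime, timedelta

def violates_expected_duration(start, now, end, expected_duration):
    # A TI violates the SLA as soon as its elapsed (or final) runtime
    # exceeds expected_duration, even while it is still running.
    return ((end or now) - start) > expected_duration

def free_from_expected_duration(start, end, expected_duration):
    # A TI is provably free from this SLA only once it has finished
    # within expected_duration; a still-running TI can never be marked free.
    return end is not None and (end - start) <= expected_duration

start = datetime(2020, 5, 7, 0, 0)
now = datetime(2020, 5, 7, 2, 0)
one_hour = timedelta(hours=1)

# Still running after 2h with a 1h SLA: already a violation...
still_running_violation = violates_expected_duration(start, now, None, one_hour)
# ...while a TI that finished in 30 minutes is free from the SLA for good.
finished_ok = free_from_expected_duration(start, start + timedelta(minutes=30), one_hour)
```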

I'm slightly inclined towards option 1 (probably need to remove the 100 fixed limit), but definitely open to other opinions. :)

@houqp
Member

houqp commented May 10, 2020

If we can add .filter(DR.state != State.SUCCESS) to the query filter list, then I am also in favor of option 1 (without the fixed 100 limit, of course). It's simpler than option 2 and shouldn't run into performance issues for the majority of use cases :)

@tchow-notion

hi @seanxwzhang any updates on this patch?

@seanxwzhang
Author

seanxwzhang commented Sep 20, 2022

hi @seanxwzhang any updates on this patch?

Unfortunately, I won't be able to continue working on this patch; happy to hand it over to others.

@auvipy
Contributor

auvipy commented Jan 23, 2023

To take it over, where should someone focus? Fixing conflicts first?

@potiuk
Member

potiuk commented Jan 23, 2023

To take it over, where should someone focus? Fixing conflicts first?

I guess, given how far behind this change is and how many conflicts it has, starting from scratch following the ideas here is a far better idea.

@auvipy auvipy self-assigned this Jan 24, 2023
@sungwy
Contributor

sungwy commented Jan 26, 2023

@auvipy are you currently working on this PR? If not, I'm happy to take the ideas and open up a new one that works out of the DagFileProcessor/DagFileProcessingManager

@auvipy
Contributor

auvipy commented Jan 27, 2023

@auvipy are you currently working on this PR? If not, I'm happy to take the ideas and open up a new one that works out of the DagFileProcessor/DagFileProcessingManager

please go ahead. feel free to ping me for review

@auvipy auvipy removed their assignment Jan 27, 2023
@sungwy
Contributor

sungwy commented Mar 30, 2023

Hi everyone, I've spent a lot of time collecting all reported concerns that the community has had regarding SLAs to date. After much deliberation, I've reached the conclusion that we might be better off defining the Airflow-native SLA feature only at the DAG level, where it can be supported to users' expectations, and leaving task-level SLA definitions to users. There are three main reasons why I think task-level SLAs should be implemented by users instead of by Airflow.

  1. Today, users have the ability to monitor Task-level SLAs through the use of Deferrable Operators and Asynchronous DateTimeTriggers (and Task groups to organize these tasks on the UI).
  2. Reliably tracking task-level SLAs when the task actually misses the SLA (instead of only after the task succeeds) is only possible at the expense of overloading the work of the scheduler - which is not ideal because task-level SLA detection is not the primary function of a scheduler, and it wouldn't be beneficial for Airflow users to compromise the scheduler in any way.
  3. Some users want to customize the way they monitor task-level SLAs. Some want to use different definitions of the timedelta (from dagrun start versus from task start), some want to detect task SLA misses multiple times (different levels of warning for delays), and some want to detect the SLA miss only if the target task is in a certain state (unfinished: RUNNING; finished: SUCCESS/SKIPPED).
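Under this proposal, a user-side task-level SLA check might look like the following sketch, where the user picks the reference point, the threshold, and the alerting states themselves. All names here are illustrative, not part of any proposed Airflow API:

```python
from datetime import datetime, timedelta

def task_sla_missed(reference_time, now, threshold, state, alert_states):
    # User-defined SLA check: reference_time can be the dagrun start or the
    # task start, threshold is any timedelta, and alert_states selects which
    # task states (e.g. "running", "success") should trigger the alert.
    return state in alert_states and (now - reference_time) > threshold

# Example: alert on a task still running 1.5h after its start, with a 1h SLA.
task_start = datetime(2023, 3, 30, 9, 0)
late = task_sla_missed(task_start, datetime(2023, 3, 30, 10, 30),
                       timedelta(hours=1), "running", {"running"})
```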

In contrast, I believe DAG-level SLA will strictly be a positive feature. It will increase the general reliability of Airflow DAGs and even be able to alert us on job delays when undefined behaviors happen, all without negatively impacting the performance of the scheduler.

If you have been interested in the SLA mechanism, or have been actively using the current version of the SLA mechanism, I would love to get your feedback on this proposal. I would love to work with you to try to come up with an SLA solution that meets user expectations!

Airflow Improvement Proposal

Labels: area:Scheduler including HA (high availability) scheduler, pinned (Protect from Stalebot auto closing)
10 participants