Description
Currently, SLA handling in Airflow is not as straightforward as we would expect, and it has known bugs that prevent us from using it as intended in our production applications. I am aware of the effort done in this pull request to address the problem, but I want to propose a simpler approach.
Use case / motivation
In a current use case for our production workflow, we want to be notified when certain tasks take longer than a given time to complete. We thought that the SLA feature would fit our needs, but when testing it we noticed a behavior that was a bit counterintuitive compared to what we expected. According to the useful book Data Pipelines with Apache Airflow, the SLA actually works as described:
"SLAs function somewhat counter-intuitive. While you might expect it to function as a maximum runtime for the given task, it functions as the maximum time difference between the scheduled start of the DAG run and completion of the task."
In fact, we are interested in measuring only the maximum runtime of a task in order to send alerts via a callback. The following proposal aims to simplify the SLA feature so that it works per task instead of being a global attribute of the DAG, or at least to give the user an additional per-task option to handle SLAs at that level, in case there is still interest in DAG-level SLAs.
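For context, this is roughly how SLAs are declared today and why the current behavior surprised us. The DAG id, schedule, commands, and callback body below are made up for illustration, and import paths differ between Airflow versions; the sla_miss_callback signature is the one documented on DAG.

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash_operator import BashOperator  # path differs in Airflow 2.x


def sla_miss_alert(dag, task_list, blocking_task_list, slas, blocking_tis):
    # Called by the scheduler when an SLA is missed; alerting logic is a stand-in.
    print(f"SLA missed for: {task_list}")


with DAG(
    dag_id="example_sla_dag",  # made-up DAG for illustration
    start_date=datetime(2021, 1, 1),
    schedule_interval="@hourly",
    sla_miss_callback=sla_miss_alert,
) as dag:
    slow_upstream = BashOperator(task_id="slow_upstream", bash_command="sleep 2700")
    # The 30-minute SLA is measured from the scheduled start of the DAG run, so this
    # fast task is still reported as an SLA miss whenever the upstream task delays it
    # past the 30-minute mark, even though its own runtime is only about a minute.
    fast_task = BashOperator(
        task_id="fast_task",
        bash_command="sleep 60",
        sla=timedelta(minutes=30),
    )
    slow_upstream >> fast_task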
I took inspiration from how the task timeout is implemented in TaskInstance, which I believe is pretty straightforward. As a workaround, I implemented a custom operator that inherits from SSHOperator and reflects this approach:
import logging
import signal
from contextlib import contextmanager
from datetime import timedelta
from typing import Any, Callable, Dict, Iterator, Optional, Text

from airflow.contrib.operators.ssh_operator import SSHOperator  # path varies by Airflow version
from airflow.utils.decorators import apply_defaults


class SlaOperator(SSHOperator):
    # Same sla definition as BaseOperator, declared here just for visibility.
    sla: Optional[timedelta]
    long_run_callback: Optional[Callable[[Dict[Text, Any]], None]]

    logger = logging.getLogger("SlaOperator")

    @apply_defaults
    def __init__(
        self,
        sla: Optional[timedelta] = None,
        long_run_callback: Optional[Callable[[Dict[Text, Any]], None]] = None,
        *args: Any,
        **kwargs: Any,
    ) -> None:
        super(SlaOperator, self).__init__(*args, **kwargs)
        self.long_run_callback = long_run_callback
        self.sla = sla

    def execute(self, context: Dict[Text, Any]) -> Optional[Any]:
        result = None
        # If there is an sla defined, track the execution time; if not, execute as usual.
        if self.sla is not None:
            seconds = int(self.sla.total_seconds())
            # Context handling of the signal alarm.
            with self.time_limit(seconds, context):
                result = super(SlaOperator, self).execute(context=context)
        else:
            result = super(SlaOperator, self).execute(context=context)
        return result

    @contextmanager
    def time_limit(self, seconds: int, context: Dict[Text, Any]) -> Iterator[None]:
        """Set SIGALRM to monitor the execution time of the wrapped call. The
        long_run_callback handles the signal trigger; if the operator is
        initialized with the callback as None, a simple log message is emitted."""
        signal.signal(
            signal.SIGALRM,
            lambda signum, frame: self.long_run_callback(context)
            if self.long_run_callback is not None
            else self.logger.warning(f"Long running task with context {context}"),
        )
        signal.alarm(seconds)
        try:
            yield
        finally:
            signal.alarm(0)
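For illustration, the workaround could then be used like this in a DAG definition (the DAG id, task id, connection id, command, and callback below are made up, and SlaOperator is the class defined above):

from datetime import datetime, timedelta

from airflow import DAG


def notify_long_run(context):
    # Hypothetical alerting hook; in practice this would page or post to chat.
    print(f"Task exceeded its SLA and is still running: {context['task_instance_key_str']}")


with DAG(dag_id="sla_workaround_example", start_date=datetime(2021, 1, 1), schedule_interval="@daily") as dag:
    long_job = SlaOperator(
        task_id="long_job",               # made-up task id
        ssh_conn_id="my_ssh_connection",  # made-up connection id
        command="./run_nightly_job.sh",   # made-up command
        sla=timedelta(hours=1),
        long_run_callback=notify_long_run,
    )

One caveat worth noting: signal.SIGALRM is only available on Unix, and the handler can only be installed from the main thread of the task process, so this workaround assumes the task runs in a standard worker process rather than, say, a thread-based test harness.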
No exception is raised; there is just the optional execution of the long_run_callback, which receives the DAG run context dictionary. This would be the only difference from the execution timeout implementation. What I would try to achieve is to put this logic in TaskInstance and handle it in a similar way to the execution timeout. In particular, registering SLA misses in the database is not of interest to us from the use-case point of view, but I am not against complying with that requirement if the team deems it necessary.
If this sounds like a reasonable approach, I am more than happy to contribute a PR and propose a concrete implementation of the feature.
Related Issues
AIRFLOW-249 Refactor the SLA mechanism
AIRFLOW-1360 SLA miss goes to all emails in DAG
AIRFLOW-2236 Airflow SLA is triggered for all backfilled tasks
AIRFLOW-557 SLA notification does not work for manually triggered DAGs
AIRFLOW-133 SLAs don't seem to work with schedule_interval=None
AIRFLOW-1013 airflow/jobs.py:manage_slas() exception for @once dag