-
Notifications
You must be signed in to change notification settings - Fork 14.6k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Swap CeleryExecutor over to use TaskSDK for execution.
Some points of note about this PR: - Logging is changed in Celery, but only for Airflow 3 Celery does it's own "capture stdout" logging, which conflicts with the ones we do in the TaskSDK, so we disable that; but to not change anything for Airflow 3. - Simplify task SDK logging redirection As part of this discovery that Celery captures stdout/stderr itself (and before disabling that) I discovered a simpler way to re-open the stdin/out/err so that the implementation needs fewer/no special casing. - Make JSON task logs more readable by giving them a consistent/useful order We re-order (by re-creating) the event_dict so that timestamp, level, and then even are always the first items in the dict - Makes the CeleryExecutor understand the concept of "workloads" instead a command tuple. This change isn't done in the best way, but until Kube executor is swapped over (and possibly the other in-tree executors, such as ECS) we need to support both styles concurrently. The change should be done in such a way that the provider still works with Airflow v2, if it's running on that version. - Upgrade Celery This turned out to not be 100% necessary but it does fix some deprecation warnings when running on Python 3.12 - Ensure that the forked process in TaskSDK _never ever_ exits Again, this isn't possible usually, but since the setup step of `_fork_main` died, it didn't call `os._exit()`, and was caught further up, which meant the process stayed alive as it never closed the sockets properly. We put and extra safety try/except block in place to catch that I have not yet included a newsfragment for changing the executor interface as the old style is _currently_ still supported.
- Loading branch information
Showing
10 changed files
with
201 additions
and
79 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.