From 5975c53ba0f69ffb7bfa040a7fee6584eecb26fa Mon Sep 17 00:00:00 2001 From: zdevito Date: Tue, 21 Oct 2025 11:59:06 -0700 Subject: [PATCH] order async messages, provide workaround flag Differential Revision: [D85172238](https://our.internmc.facebook.com/intern/diff/D85172238/) **NOTE FOR REVIEWERS**: This PR has internal Meta-specific changes or comments, please review them on [Phabricator](https://our.internmc.facebook.com/intern/diff/D85172238/)! [ghstack-poisoned] --- monarch_hyperactor/src/actor.rs | 33 ++++++++++++++++--------- monarch_hyperactor/src/config.rs | 3 +++ python/monarch/_src/actor/actor_mesh.py | 11 +++++++-- 3 files changed, 33 insertions(+), 14 deletions(-) diff --git a/monarch_hyperactor/src/actor.rs b/monarch_hyperactor/src/actor.rs index ab07af6a7..3b1534482 100644 --- a/monarch_hyperactor/src/actor.rs +++ b/monarch_hyperactor/src/actor.rs @@ -54,6 +54,7 @@ use tokio::sync::oneshot; use tracing::Instrument; use crate::buffers::FrozenBuffer; +use crate::config::MONARCH_OLD_ASYNC_WORKAROUND; use crate::config::SHARED_ASYNCIO_RUNTIME; use crate::context::PyInstance; use crate::local_state_broker::BrokerId; @@ -484,6 +485,9 @@ pub struct PythonActor { /// so that we can store information from the Init (spawn rank, controller controller) /// and provide it to other calls instance: Option>, + + /// Cached config value for old async workaround + old_async_workaround: bool, } impl PythonActor { @@ -559,6 +563,7 @@ impl Actor for PythonActor { panic_watcher: UnhandledErrorObserver::ForwardTo(rx), panic_sender: tx, instance: None, + old_async_workaround: hyperactor::config::global::get(MONARCH_OLD_ASYNC_WORKAROUND), }) })?) } @@ -809,19 +814,23 @@ impl Handler for PythonActor { .map_err(|err| err.into()) })?; - // Spawn a child actor to await the Python handler method. - tokio::spawn( - handle_async_endpoint_panic( - self.panic_sender.clone(), - PythonTask::new(future), - receiver, - ) - .instrument( - tracing::info_span!("py_panic_handler") - .follows_from(tracing::Span::current().id()) - .clone(), - ), + let future = handle_async_endpoint_panic( + self.panic_sender.clone(), + PythonTask::new(future), + receiver, + ) + .instrument( + tracing::info_span!("py_panic_handler") + .follows_from(tracing::Span::current().id()) + .clone(), ); + if self.old_async_workaround { + // Spawn a child actor to await the Python handler method. + tokio::spawn(future); + } else { + // things happen in order + future.await; + } Ok(()) } } diff --git a/monarch_hyperactor/src/config.rs b/monarch_hyperactor/src/config.rs index f3215c35f..def224a39 100644 --- a/monarch_hyperactor/src/config.rs +++ b/monarch_hyperactor/src/config.rs @@ -35,6 +35,9 @@ use crate::channel::PyChannelTransport; declare_attrs! { /// Use a single asyncio runtime for all Python actors, rather than one per actor pub attr SHARED_ASYNCIO_RUNTIME: bool = false; + + /// Use old async workaround that spawns Python handler methods in tokio tasks + pub attr MONARCH_OLD_ASYNC_WORKAROUND: bool = false; } /// Python API for configuration management diff --git a/python/monarch/_src/actor/actor_mesh.py b/python/monarch/_src/actor/actor_mesh.py index 43c3465ba..8bdef3ce8 100644 --- a/python/monarch/_src/actor/actor_mesh.py +++ b/python/monarch/_src/actor/actor_mesh.py @@ -13,10 +13,12 @@ import inspect import itertools import logging + +import os import threading from abc import abstractproperty - from dataclasses import dataclass +from functools import cache from pprint import pformat from textwrap import indent from traceback import TracebackException @@ -213,6 +215,11 @@ def _root_client_context() -> "Context": ... T = TypeVar("T") +@cache +def _old_async_workaround(): + return os.environ.get("MONARCH_OLD_ASYNC_WORKAROUND", "0") != "0" + + class _Lazy(Generic[T]): def __init__(self, init: Callable[[], T]) -> None: self._lock = threading.Lock() @@ -1141,7 +1148,7 @@ def __init__( else: sync_endpoints.append(attr_name) - if sync_endpoints and async_endpoints: + if (sync_endpoints and async_endpoints) or _old_async_workaround(): raise ValueError( f"{self._class} mixes both async and sync endpoints." "Synchronous endpoints cannot be mixed with async endpoints because they can cause the asyncio loop to deadlock if they wait."