Skip to content

Commit f0280b1

Browse files
committed
order async messages, provide workaround flag
Differential Revision: [D85172238](https://our.internmc.facebook.com/intern/diff/D85172238/) **NOTE FOR REVIEWERS**: This PR has internal Meta-specific changes or comments, please review them on [Phabricator](https://our.internmc.facebook.com/intern/diff/D85172238/)! ghstack-source-id: 317722005 Pull Request resolved: #1631
1 parent 45a646e commit f0280b1

File tree

3 files changed

+33
-14
lines changed

3 files changed

+33
-14
lines changed

monarch_hyperactor/src/actor.rs

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ use tokio::sync::oneshot;
5454
use tracing::Instrument;
5555

5656
use crate::buffers::FrozenBuffer;
57+
use crate::config::MONARCH_OLD_ASYNC_WORKAROUND;
5758
use crate::config::SHARED_ASYNCIO_RUNTIME;
5859
use crate::context::PyInstance;
5960
use crate::local_state_broker::BrokerId;
@@ -484,6 +485,9 @@ pub struct PythonActor {
484485
/// so that we can store information from the Init (spawn rank, controller controller)
485486
/// and provide it to other calls
486487
instance: Option<Py<crate::context::PyInstance>>,
488+
489+
/// Cached config value for old async workaround
490+
old_async_workaround: bool,
487491
}
488492

489493
impl PythonActor {
@@ -559,6 +563,7 @@ impl Actor for PythonActor {
559563
panic_watcher: UnhandledErrorObserver::ForwardTo(rx),
560564
panic_sender: tx,
561565
instance: None,
566+
old_async_workaround: hyperactor::config::global::get(MONARCH_OLD_ASYNC_WORKAROUND),
562567
})
563568
})?)
564569
}
@@ -809,19 +814,23 @@ impl Handler<PythonMessage> for PythonActor {
809814
.map_err(|err| err.into())
810815
})?;
811816

812-
// Spawn a child actor to await the Python handler method.
813-
tokio::spawn(
814-
handle_async_endpoint_panic(
815-
self.panic_sender.clone(),
816-
PythonTask::new(future),
817-
receiver,
818-
)
819-
.instrument(
820-
tracing::info_span!("py_panic_handler")
821-
.follows_from(tracing::Span::current().id())
822-
.clone(),
823-
),
817+
let future = handle_async_endpoint_panic(
818+
self.panic_sender.clone(),
819+
PythonTask::new(future),
820+
receiver,
821+
)
822+
.instrument(
823+
tracing::info_span!("py_panic_handler")
824+
.follows_from(tracing::Span::current().id())
825+
.clone(),
824826
);
827+
if self.old_async_workaround {
828+
// Spawn a child actor to await the Python handler method.
829+
tokio::spawn(future);
830+
} else {
831+
// things happen in order
832+
future.await;
833+
}
825834
Ok(())
826835
}
827836
}

monarch_hyperactor/src/config.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,9 @@ use crate::channel::PyChannelTransport;
3535
declare_attrs! {
3636
/// Use a single asyncio runtime for all Python actors, rather than one per actor
3737
pub attr SHARED_ASYNCIO_RUNTIME: bool = false;
38+
39+
/// Use old async workaround that spawns Python handler methods in tokio tasks
40+
pub attr MONARCH_OLD_ASYNC_WORKAROUND: bool = false;
3841
}
3942

4043
/// Python API for configuration management

python/monarch/_src/actor/actor_mesh.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,12 @@
1313
import inspect
1414
import itertools
1515
import logging
16+
17+
import os
1618
import threading
1719
from abc import abstractproperty
18-
1920
from dataclasses import dataclass
21+
from functools import cache
2022
from pprint import pformat
2123
from textwrap import indent
2224
from traceback import TracebackException
@@ -213,6 +215,11 @@ def _root_client_context() -> "Context": ...
213215
T = TypeVar("T")
214216

215217

218+
@cache
219+
def _old_async_workaround():
220+
return os.environ.get("MONARCH_OLD_ASYNC_WORKAROUND", "0") != "0"
221+
222+
216223
class _Lazy(Generic[T]):
217224
def __init__(self, init: Callable[[], T]) -> None:
218225
self._lock = threading.Lock()
@@ -1141,7 +1148,7 @@ def __init__(
11411148
else:
11421149
sync_endpoints.append(attr_name)
11431150

1144-
if sync_endpoints and async_endpoints:
1151+
if (sync_endpoints and async_endpoints) or _old_async_workaround():
11451152
raise ValueError(
11461153
f"{self._class} mixes both async and sync endpoints."
11471154
"Synchronous endpoints cannot be mixed with async endpoints because they can cause the asyncio loop to deadlock if they wait."

0 commit comments

Comments
 (0)