Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 21 additions & 12 deletions monarch_hyperactor/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ use tokio::sync::oneshot;
use tracing::Instrument;

use crate::buffers::FrozenBuffer;
use crate::config::MONARCH_OLD_ASYNC_WORKAROUND;
use crate::config::SHARED_ASYNCIO_RUNTIME;
use crate::context::PyInstance;
use crate::local_state_broker::BrokerId;
Expand Down Expand Up @@ -484,6 +485,9 @@ pub struct PythonActor {
/// so that we can store information from the Init (spawn rank, controller controller)
/// and provide it to other calls
instance: Option<Py<crate::context::PyInstance>>,

/// Cached config value for old async workaround
old_async_workaround: bool,
}

impl PythonActor {
Expand Down Expand Up @@ -559,6 +563,7 @@ impl Actor for PythonActor {
panic_watcher: UnhandledErrorObserver::ForwardTo(rx),
panic_sender: tx,
instance: None,
old_async_workaround: hyperactor::config::global::get(MONARCH_OLD_ASYNC_WORKAROUND),
})
})?)
}
Expand Down Expand Up @@ -809,19 +814,23 @@ impl Handler<PythonMessage> for PythonActor {
.map_err(|err| err.into())
})?;

// Spawn a child actor to await the Python handler method.
tokio::spawn(
handle_async_endpoint_panic(
self.panic_sender.clone(),
PythonTask::new(future),
receiver,
)
.instrument(
tracing::info_span!("py_panic_handler")
.follows_from(tracing::Span::current().id())
.clone(),
),
let future = handle_async_endpoint_panic(
self.panic_sender.clone(),
PythonTask::new(future),
receiver,
)
.instrument(
tracing::info_span!("py_panic_handler")
.follows_from(tracing::Span::current().id())
.clone(),
);
if self.old_async_workaround {
// Spawn a detached tokio task (not a child actor) to await the Python handler method.
tokio::spawn(future);
} else {
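// Await the Python handler inline so things happen in order: this handler
// does not return until the Python task has finished.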
future.await;
}
Ok(())
}
}
Expand Down
3 changes: 3 additions & 0 deletions monarch_hyperactor/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ use crate::channel::PyChannelTransport;
// Configuration attributes declared by this module. Both default to `false`.
declare_attrs! {
/// Use a single asyncio runtime for all Python actors, rather than one per actor
pub attr SHARED_ASYNCIO_RUNTIME: bool = false;

/// Use old async workaround that spawns Python handler methods in tokio tasks.
///
/// When this is `false` (the default), `PythonActor` awaits the Python
/// handler future inline in its message handler instead of passing it to
/// `tokio::spawn`. `PythonActor` reads this value when it is constructed and
/// keeps the result in a field, so the setting is sampled once per actor.
pub attr MONARCH_OLD_ASYNC_WORKAROUND: bool = false;
}

/// Python API for configuration management
Expand Down
11 changes: 9 additions & 2 deletions python/monarch/_src/actor/actor_mesh.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@
import inspect
import itertools
import logging

import os
import threading
from abc import abstractproperty

from dataclasses import dataclass
from functools import cache
from pprint import pformat
from textwrap import indent
from traceback import TracebackException
Expand Down Expand Up @@ -213,6 +215,11 @@ def _root_client_context() -> "Context": ...
T = TypeVar("T")


@cache
def _old_async_workaround() -> bool:
    """Return whether the old async-endpoint workaround is enabled.

    Reads the ``MONARCH_OLD_ASYNC_WORKAROUND`` environment variable. The
    workaround is off when the variable is unset, empty, or set to a
    conventional "disabled" spelling (``0``, ``false``, ``no``, ``off``);
    letter case and surrounding whitespace are ignored. Any other value
    (for example ``1`` or ``true``) turns it on.

    The result is memoized by ``functools.cache``: the environment is only
    consulted on the first call, and later changes to the variable are not
    observed unless ``_old_async_workaround.cache_clear()`` is called.
    """
    raw = os.environ.get("MONARCH_OLD_ASYNC_WORKAROUND", "0")
    # Normalize before comparing so values such as "false", "" or " 0" are
    # not mistaken for "enabled" just because they differ from the string "0".
    return raw.strip().lower() not in {"", "0", "false", "no", "off"}


class _Lazy(Generic[T]):
def __init__(self, init: Callable[[], T]) -> None:
self._lock = threading.Lock()
Expand Down Expand Up @@ -1141,7 +1148,7 @@ def __init__(
else:
sync_endpoints.append(attr_name)

if sync_endpoints and async_endpoints:
if (sync_endpoints and async_endpoints) or _old_async_workaround():
raise ValueError(
f"{self._class} mixes both async and sync endpoints."
"Synchronous endpoints cannot be mixed with async endpoints because they can cause the asyncio loop to deadlock if they wait."
Expand Down
Loading