Skip to content

Commit 9b48962

Browse files
mariusaemeta-codesync[bot]
authored andcommitted
don't abort in-progress proc state checks (#1670)
Summary: Pull Request resolved: #1670 These are not currently cancellation safe because dropping the rx will cause messages to be undelivered, supervision to kick in, and the job to fail. Instead, we thread through a cancellation token so that we can safely cancel these tasks, resolving the race. ghstack-source-id: 318994668 Reviewed By: dulinriley Differential Revision: D85577198 fbshipit-source-id: dcf3d5f91cd95c5f783131ed9511eda1db045efc
1 parent ac1d6fb commit 9b48962

File tree

2 files changed

+16
-6
lines changed

2 files changed

+16
-6
lines changed

monarch_hyperactor/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ serde_multipart = { version = "0.0.0", path = "../serde_multipart" }
4848
tempfile = "3.22"
4949
thiserror = "2.0.12"
5050
tokio = { version = "1.47.1", features = ["full", "test-util", "tracing"] }
51+
tokio-util = { version = "0.7.15", features = ["full"] }
5152
tracing = { version = "0.1.41", features = ["attributes", "valuable"] }
5253

5354
[dev-dependencies]

monarch_hyperactor/src/v1/actor_mesh.rs

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ use pyo3::prelude::*;
4646
use pyo3::types::PyBytes;
4747
use tokio::sync::watch;
4848
use tokio::task::JoinHandle;
49+
use tokio_util::sync::CancellationToken;
4950

5051
use crate::actor::PythonActor;
5152
use crate::actor::PythonMessage;
@@ -90,15 +91,15 @@ impl RootHealthState {
9091

9192
#[derive(Debug)]
9293
struct SupervisionMonitor {
93-
task: JoinHandle<()>,
94+
cancel: CancellationToken,
9495
receiver: watch::Receiver<Option<PyErr>>,
9596
}
9697

9798
impl Drop for SupervisionMonitor {
9899
fn drop(&mut self) {
99100
// The task is continuously polling for events on this mesh, but when
100101
// the mesh is no longer available we can stop querying it.
101-
self.task.abort();
102+
self.cancel.cancel();
102103
}
103104
}
104105

@@ -306,6 +307,8 @@ impl PythonActorMeshImpl {
306307
// not share the health state. This is fine because requerying a slice
307308
// of a mesh will still return any failed state.
308309
let (sender, receiver) = watch::channel(None);
310+
let cancel = CancellationToken::new();
311+
let canceled = cancel.clone();
309312
let task = get_tokio_runtime().spawn(async move {
310313
// 3 seconds is chosen to not penalize short-lived successful calls,
311314
// while still able to catch issues before they look like a hang or timeout.
@@ -322,7 +325,8 @@ impl PythonActorMeshImpl {
322325
unhandled,
323326
health_state,
324327
time_between_checks,
325-
sender.clone(),
328+
sender,
329+
canceled,
326330
)
327331
.await;
328332
}
@@ -342,13 +346,14 @@ impl PythonActorMeshImpl {
342346
unhandled,
343347
health_state,
344348
time_between_checks,
345-
sender.clone(),
349+
sender,
350+
canceled,
346351
)
347352
.await;
348353
}
349354
};
350355
});
351-
SupervisionMonitor { task, receiver }
356+
SupervisionMonitor { cancel, receiver }
352357
}
353358
}
354359

@@ -466,6 +471,7 @@ async fn actor_states_monitor<A, F>(
466471
health_state: Arc<RootHealthState>,
467472
time_between_checks: tokio::time::Duration,
468473
sender: watch::Sender<Option<PyErr>>,
474+
canceled: CancellationToken,
469475
) where
470476
A: Actor + RemotableActor + Referable,
471477
A::Params: RemoteMessage,
@@ -479,7 +485,10 @@ async fn actor_states_monitor<A, F>(
479485
let mut existing_states: HashMap<Point, resource::State<ActorState>> = HashMap::new();
480486
loop {
481487
// Wait in between checking to avoid using too much network.
482-
RealClock.sleep(time_between_checks).await;
488+
tokio::select! {
489+
_ = RealClock.sleep(time_between_checks) => (),
490+
_ = canceled.cancelled() => break,
491+
}
483492
// First check if the proc mesh is dead before trying to query their agents.
484493
let proc_states = mesh.proc_mesh().proc_states(cx).await;
485494
if let Err(e) = proc_states {

0 commit comments

Comments
 (0)