Skip to content

Commit 1a3442a

Browse files
dulinrileymeta-codesync[bot]
authored andcommitted
Fix assert with undeliverable message from comm actor (#1606)
Summary: Pull Request resolved: #1606 Fix a panic in PythonActor::handle_undeliverable_message when the "sender" is the comm actor. We need to update the sender back to the original "self" actor by using the headers set in the envelope. After this fix, instead of a panic we get a supervision error: ``` SupervisionError( Actor ...wrapper_1xYczTZiTdb1[0] exited because of the following reason: <PyActorSupervisionEvent: ...wrapper_1xYczTZiTdb1[0]: failed: serving ...wrapper_1xYczTZiTdb1[0]: processing error: a message from ...wrapper_1xYczTZiTdb1[0] to ...fail_1am38hE5fnus[0] was undeliverable and returned: Some("send error: channel closed; multicast error: comm actor comm_1JgvjFbdpnUf[0] failed to deliver the cast message to the dest actor; return to its original sender's port ...wrapper_1xYczTZiTdb1[0] ) ``` Not very terse, but better than a panic! This also allows any custom override of handle_undeliverable_message to work. Reviewed By: pablorfb-meta Differential Revision: D84952942 fbshipit-source-id: b2cb36600ca89a03e7a85cfccd46ce6bcf2487cf
1 parent 5531f59 commit 1a3442a

File tree

2 files changed

+67
-2
lines changed

2 files changed

+67
-2
lines changed

hyperactor/src/mailbox.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -324,6 +324,13 @@ impl MessageEnvelope {
324324
self.errors.push(error)
325325
}
326326

327+
/// Change the sender on the envelope in case it was set incorrectly. This
328+
/// should only be used by CommActor since it is forwarding from another
329+
/// sender.
330+
pub fn update_sender(&mut self, sender: ActorId) {
331+
self.sender = sender;
332+
}
333+
327334
/// The message has been determined to be undeliverable with the
328335
/// provided error. Mark the envelope with the error and return to
329336
/// sender.

monarch_hyperactor/src/actor.rs

Lines changed: 60 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,17 @@ use hyperactor::Handler;
2020
use hyperactor::Instance;
2121
use hyperactor::Named;
2222
use hyperactor::OncePortHandle;
23+
use hyperactor::ProcId;
2324
use hyperactor::mailbox::MessageEnvelope;
2425
use hyperactor::mailbox::Undeliverable;
2526
use hyperactor::message::Bind;
2627
use hyperactor::message::Bindings;
2728
use hyperactor::message::Unbind;
29+
use hyperactor::reference::WorldId;
30+
use hyperactor_mesh::actor_mesh::CAST_ACTOR_MESH_ID;
31+
use hyperactor_mesh::comm::multicast::CAST_ORIGINATING_SENDER;
2832
use hyperactor_mesh::comm::multicast::CastInfo;
33+
use hyperactor_mesh::reference::ActorMeshId;
2934
use monarch_types::PickledPyObject;
3035
use monarch_types::SerializablePyErr;
3136
use pyo3::IntoPyObjectExt;
@@ -493,6 +498,47 @@ impl PythonActor {
493498
}
494499
}
495500

501+
/// An undeliverable might have its sender address set as the comm actor instead
502+
/// of the original sender. Update it based on the headers present in the message
503+
/// so it matches the sender.
504+
fn update_undeliverable_envelope_for_casting(
505+
mut envelope: Undeliverable<MessageEnvelope>,
506+
) -> Undeliverable<MessageEnvelope> {
507+
let old_actor = envelope.0.sender().clone();
508+
// v1 casting
509+
if let Some(actor_id) = envelope.0.headers().get(CAST_ORIGINATING_SENDER).cloned() {
510+
tracing::debug!(
511+
actor_id = %old_actor,
512+
"PythonActor::handle_undeliverable_message: remapped comm-actor id to id from CAST_ORIGINATING_SENDER {}", actor_id
513+
);
514+
envelope.0.update_sender(actor_id);
515+
// v0 casting
516+
} else if let Some(actor_mesh_id) = envelope.0.headers().get(CAST_ACTOR_MESH_ID) {
517+
match actor_mesh_id {
518+
ActorMeshId::V0(proc_mesh_id, actor_name) => {
519+
let actor_id = ActorId(
520+
ProcId::Ranked(WorldId(proc_mesh_id.0.clone()), 0),
521+
actor_name.clone(),
522+
0,
523+
);
524+
tracing::debug!(
525+
actor_id = %old_actor,
526+
"PythonActor::handle_undeliverable_message: remapped comm-actor id to mesh id from CAST_ACTOR_MESH_ID {}", actor_id
527+
);
528+
envelope.0.update_sender(actor_id);
529+
}
530+
ActorMeshId::V1(_) => {
531+
tracing::debug!(
532+
"PythonActor::handle_undeliverable_message: headers present but V1 ActorMeshId; leaving actor_id unchanged"
533+
);
534+
}
535+
}
536+
} else {
537+
// Do nothing, it wasn't from a comm actor.
538+
}
539+
envelope
540+
}
541+
496542
#[async_trait]
497543
impl Actor for PythonActor {
498544
type Params = PickledPyObject;
@@ -537,9 +583,21 @@ impl Actor for PythonActor {
537583
async fn handle_undeliverable_message(
538584
&mut self,
539585
ins: &Instance<Self>,
540-
envelope: Undeliverable<MessageEnvelope>,
586+
mut envelope: Undeliverable<MessageEnvelope>,
541587
) -> Result<(), anyhow::Error> {
542-
assert_eq!(envelope.0.sender(), ins.self_id());
588+
if envelope.0.sender() != ins.self_id() {
589+
// This can happen if the sender is comm. Update the envelope.
590+
envelope = update_undeliverable_envelope_for_casting(envelope);
591+
}
592+
assert_eq!(
593+
envelope.0.sender(),
594+
ins.self_id(),
595+
"undeliverable message was returned to the wrong actor. \
596+
Return address = {}, src actor = {}, dest actor port = {}",
597+
envelope.0.sender(),
598+
ins.self_id(),
599+
envelope.0.dest()
600+
);
543601

544602
let cx = Context::new(ins, envelope.0.headers().clone());
545603

0 commit comments

Comments
 (0)