Skip to content

Commit d6dee1c

Browse files
committed
[hyperactor] mesh: implement local proc bypasss
Pull Request resolved: #1442 This change allows procs local to a host to bypass the host multiplexer. This should improve multicast delivery, as local comm actors do not need to traverse the same (frontend) bottleneck. ghstack-source-id: 319797871 @exported-using-ghexport Differential Revision: [D83996264](https://our.internmc.facebook.com/intern/diff/D83996264/)
1 parent 434e447 commit d6dee1c

File tree

8 files changed

+312
-31
lines changed

8 files changed

+312
-31
lines changed

hyperactor/src/mailbox.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,8 @@ pub type Data = Vec<u8>;
164164
Deserialize,
165165
Named,
166166
Clone,
167-
PartialEq
167+
PartialEq,
168+
Eq
168169
)]
169170
pub enum DeliveryError {
170171
/// The destination address is not reachable.

hyperactor/src/mailbox/undeliverable.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,13 @@ use crate::supervision::ActorSupervisionEvent;
3232
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Named)]
3333
pub struct Undeliverable<M: Message>(pub M);
3434

35+
impl<M: Message> Undeliverable<M> {
36+
/// Return the inner M-typed message.
37+
pub fn into_inner(self) -> M {
38+
self.0
39+
}
40+
}
41+
3542
// Port handle and receiver for undeliverable messages.
3643
pub(crate) fn new_undeliverable_port() -> (
3744
PortHandle<Undeliverable<MessageEnvelope>>,

hyperactor_mesh/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ serde_bytes = "0.11"
7575
serde_json = { version = "1.0.140", features = ["alloc", "float_roundtrip", "raw_value", "unbounded_depth"] }
7676
serde_multipart = { version = "0.0.0", path = "../serde_multipart" }
7777
strum = { version = "0.27.1", features = ["derive"] }
78+
tempfile = "3.22"
7879
thiserror = "2.0.12"
7980
tokio = { version = "1.47.1", features = ["full", "test-util", "tracing"] }
8081
tokio-stream = { version = "0.1.17", features = ["fs", "io-util", "net", "signal", "sync", "time"] }
@@ -88,7 +89,6 @@ criterion = { version = "0.5.1", features = ["async_tokio", "csv_output"] }
8889
itertools = "0.14.0"
8990
maplit = "1.0"
9091
proptest = "1.5"
91-
tempfile = "3.22"
9292
timed_test = { version = "0.0.0", path = "../timed_test" }
9393
tracing-test = { version = "0.2.3", features = ["no-env-filter"] }
9494

hyperactor_mesh/src/bootstrap.rs

Lines changed: 94 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ use std::future;
1515
use std::io;
1616
use std::io::Write;
1717
use std::os::unix::process::ExitStatusExt;
18+
use std::path::Path;
1819
use std::path::PathBuf;
1920
use std::process::Stdio;
2021
use std::sync::Arc;
@@ -35,6 +36,7 @@ use hyperactor::ProcId;
3536
use hyperactor::attrs::Attrs;
3637
use hyperactor::channel;
3738
use hyperactor::channel::ChannelAddr;
39+
use hyperactor::channel::ChannelError;
3840
use hyperactor::channel::ChannelTransport;
3941
use hyperactor::channel::Rx;
4042
use hyperactor::channel::Tx;
@@ -50,10 +52,13 @@ use hyperactor::host::HostError;
5052
use hyperactor::host::ProcHandle;
5153
use hyperactor::host::ProcManager;
5254
use hyperactor::host::TerminateSummary;
55+
use hyperactor::mailbox::IntoBoxedMailboxSender;
56+
use hyperactor::mailbox::MailboxClient;
5357
use hyperactor::mailbox::MailboxServer;
5458
use hyperactor::proc::Proc;
5559
use serde::Deserialize;
5660
use serde::Serialize;
61+
use tempfile::TempDir;
5762
use tokio::process::Child;
5863
use tokio::process::ChildStderr;
5964
use tokio::process::ChildStdout;
@@ -68,6 +73,8 @@ use crate::v1;
6873
use crate::v1::host_mesh::mesh_agent::HostAgentMode;
6974
use crate::v1::host_mesh::mesh_agent::HostMeshAgent;
7075

76+
mod mailbox;
77+
7178
declare_attrs! {
7279
/// If enabled (default), bootstrap child processes install
7380
/// `PR_SET_PDEATHSIG(SIGKILL)` so the kernel reaps them if the
@@ -228,6 +235,10 @@ pub enum Bootstrap {
228235
backend_addr: ChannelAddr,
229236
/// The callback address used to indicate successful spawning.
230237
callback_addr: ChannelAddr,
238+
/// Directory for storing proc socket files. Procs place their sockets
239+
/// in this directory, so that they can be looked up by other procs
240+
/// for direct transfer.
241+
socket_dir_path: PathBuf,
231242
/// Optional config snapshot (`hyperactor::config::Attrs`)
232243
/// captured by the parent. If present, the child installs it
233244
/// as the `Runtime` layer so the parent's effective config
@@ -340,6 +351,7 @@ impl Bootstrap {
340351
proc_id,
341352
backend_addr,
342353
callback_addr,
354+
socket_dir_path,
343355
config,
344356
} => {
345357
if let Some(attrs) = config {
@@ -359,15 +371,39 @@ impl Bootstrap {
359371
eprintln!("(bootstrap) PDEATHSIG disabled via config");
360372
}
361373

362-
let result =
363-
host::spawn_proc(proc_id, backend_addr, callback_addr, |proc| async move {
364-
ProcMeshAgent::boot_v1(proc).await
365-
})
366-
.await;
367-
match result {
368-
Ok(_proc) => halt().await,
369-
Err(e) => e.into(),
370-
}
374+
let (local_addr, name) = ok!(proc_id
375+
.as_direct()
376+
.ok_or_else(|| anyhow::anyhow!("invalid proc id type: {}", proc_id)));
377+
// TODO provide a direct way to construct these
378+
let serve_addr = format!("unix:{}", socket_dir_path.join(name).display());
379+
let serve_addr = serve_addr.parse().unwrap();
380+
381+
// The following is a modified host::spawn_proc to support direct
382+
// dialing between local procs: 1) we bind each proc to a deterministic
383+
// address in socket_dir_path; 2) we use LocalProcDialer to dial these
384+
// addresses for local procs.
385+
let proc_sender = mailbox::LocalProcDialer::new(
386+
local_addr.clone(),
387+
socket_dir_path,
388+
ok!(MailboxClient::dial(backend_addr)),
389+
);
390+
391+
let proc = Proc::new(proc_id.clone(), proc_sender.into_boxed());
392+
393+
let agent_handle = ok!(ProcMeshAgent::boot_v1(proc.clone())
394+
.await
395+
.map_err(|e| HostError::AgentSpawnFailure(proc_id, e)));
396+
397+
// Finally serve the proc on the same transport as the backend address,
398+
// and call back.
399+
let (proc_addr, proc_rx) = ok!(channel::serve(serve_addr, "proc_backend"));
400+
proc.clone().serve(proc_rx);
401+
ok!(ok!(channel::dial(callback_addr))
402+
.send((proc_addr, agent_handle.bind::<ProcMeshAgent>()))
403+
.await
404+
.map_err(ChannelError::from));
405+
406+
halt().await
371407
}
372408
Bootstrap::Host {
373409
addr,
@@ -385,7 +421,8 @@ impl Bootstrap {
385421
Some(command) => command,
386422
None => ok!(BootstrapCommand::current()),
387423
};
388-
let manager = BootstrapProcManager::new(command);
424+
let manager = BootstrapProcManager::new(command).unwrap();
425+
389426
let (host, _handle) = ok!(Host::serve(manager, addr).await);
390427
let addr = host.addr().clone();
391428
let host_mesh_agent = ok!(host
@@ -1437,6 +1474,11 @@ pub struct BootstrapProcManager {
14371474
/// FileMonitor that aggregates logs from all children.
14381475
/// None if file monitor creation failed.
14391476
file_appender: Option<Arc<crate::logging::FileAppender>>,
1477+
1478+
/// Directory for storing proc socket files. Procs place their sockets
1479+
/// in this directory, so that they can be looked up by other procs
1480+
/// for direct transfer.
1481+
socket_dir: TempDir,
14401482
}
14411483

14421484
impl Drop for BootstrapProcManager {
@@ -1486,7 +1528,7 @@ impl BootstrapProcManager {
14861528
/// This is the general entry point when you want to manage procs
14871529
/// backed by a specific binary path (e.g. a bootstrap
14881530
/// trampoline).
1489-
pub(crate) fn new(command: BootstrapCommand) -> Self {
1531+
pub(crate) fn new(command: BootstrapCommand) -> Result<Self, io::Error> {
14901532
let log_monitor = match crate::logging::FileAppender::new() {
14911533
Some(fm) => {
14921534
tracing::info!("log monitor created successfully");
@@ -1498,19 +1540,25 @@ impl BootstrapProcManager {
14981540
}
14991541
};
15001542

1501-
Self {
1543+
Ok(Self {
15021544
command,
15031545
children: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
15041546
pid_table: Arc::new(std::sync::Mutex::new(HashMap::new())),
15051547
file_appender: log_monitor,
1506-
}
1548+
socket_dir: runtime_dir()?,
1549+
})
15071550
}
15081551

15091552
/// The bootstrap command used to launch processes.
15101553
pub fn command(&self) -> &BootstrapCommand {
15111554
&self.command
15121555
}
15131556

1557+
/// The socket directory, where per-proc Unix sockets are placed.
1558+
pub fn socket_dir(&self) -> &Path {
1559+
self.socket_dir.path()
1560+
}
1561+
15141562
/// Return the current [`ProcStatus`] for the given [`ProcId`], if
15151563
/// the proc is known to this manager.
15161564
///
@@ -1688,6 +1736,7 @@ impl ProcManager for BootstrapProcManager {
16881736
proc_id: proc_id.clone(),
16891737
backend_addr,
16901738
callback_addr,
1739+
socket_dir_path: self.socket_dir.path().to_owned(),
16911740
config: Some(config.client_config_override),
16921741
};
16931742
let mut cmd = self.command.new();
@@ -2114,6 +2163,18 @@ impl Write for Debug {
21142163
}
21152164
}
21162165

2166+
/// Create a new runtime [`TempDir`]. The directory is created in
2167+
/// `$XDG_RUNTIME_DIR`, otherwise falling back to the system tempdir.
2168+
fn runtime_dir() -> io::Result<TempDir> {
2169+
match std::env::var_os("XDG_RUNTIME_DIR") {
2170+
Some(runtime_dir) => {
2171+
let path = PathBuf::from(runtime_dir);
2172+
tempfile::tempdir_in(path)
2173+
}
2174+
None => tempfile::tempdir(),
2175+
}
2176+
}
2177+
21172178
#[cfg(test)]
21182179
mod tests {
21192180
use std::path::PathBuf;
@@ -2157,6 +2218,7 @@ mod tests {
21572218
proc_id: id!(foo[0]),
21582219
backend_addr: ChannelAddr::any(ChannelTransport::Tcp(TcpMode::Hostname)),
21592220
callback_addr: ChannelAddr::any(ChannelTransport::Unix),
2221+
socket_dir_path: PathBuf::from("notexist"),
21602222
config: None,
21612223
},
21622224
];
@@ -2214,13 +2276,16 @@ mod tests {
22142276
attrs[MESH_TAIL_LOG_LINES] = 123;
22152277
attrs[MESH_BOOTSTRAP_ENABLE_PDEATHSIG] = false;
22162278

2279+
let socket_dir = runtime_dir().unwrap();
2280+
22172281
// Proc case
22182282
{
22192283
let original = Bootstrap::Proc {
22202284
proc_id: id!(foo[42]),
22212285
backend_addr: ChannelAddr::any(ChannelTransport::Unix),
22222286
callback_addr: ChannelAddr::any(ChannelTransport::Unix),
22232287
config: Some(attrs.clone()),
2288+
socket_dir_path: socket_dir.path().to_owned(),
22242289
};
22252290
let env_str = original.to_env_safe_string().expect("encode bootstrap");
22262291
let decoded = Bootstrap::from_env_safe_string(&env_str).expect("decode bootstrap");
@@ -2260,14 +2325,13 @@ mod tests {
22602325
use std::process::Stdio;
22612326

22622327
use tokio::process::Command;
2263-
use tokio::time::Duration;
22642328

22652329
// Manager; program path is irrelevant for this test.
22662330
let command = BootstrapCommand {
22672331
program: PathBuf::from("/bin/true"),
22682332
..Default::default()
22692333
};
2270-
let manager = BootstrapProcManager::new(command);
2334+
let manager = BootstrapProcManager::new(command).unwrap();
22712335

22722336
// Spawn a long-running child process (sleep 30) with
22732337
// kill_on_drop(true).
@@ -2647,7 +2711,7 @@ mod tests {
26472711
program: PathBuf::from("/bin/true"),
26482712
..Default::default()
26492713
};
2650-
let manager = BootstrapProcManager::new(command);
2714+
let manager = BootstrapProcManager::new(command).unwrap();
26512715

26522716
// Spawn a fast-exiting child.
26532717
let mut cmd = Command::new("true");
@@ -2681,7 +2745,7 @@ mod tests {
26812745
program: PathBuf::from("/bin/sleep"),
26822746
..Default::default()
26832747
};
2684-
let manager = BootstrapProcManager::new(command);
2748+
let manager = BootstrapProcManager::new(command).unwrap();
26852749

26862750
// Spawn a process that will live long enough to kill.
26872751
let mut cmd = Command::new("/bin/sleep");
@@ -2798,7 +2862,8 @@ mod tests {
27982862
let manager = BootstrapProcManager::new(BootstrapCommand {
27992863
program: PathBuf::from("/bin/true"),
28002864
..Default::default()
2801-
});
2865+
})
2866+
.unwrap();
28022867
let unknown = ProcId::Direct(ChannelAddr::any(ChannelTransport::Unix), "nope".into());
28032868
assert!(manager.status(&unknown).await.is_none());
28042869
}
@@ -2808,7 +2873,8 @@ mod tests {
28082873
let manager = BootstrapProcManager::new(BootstrapCommand {
28092874
program: PathBuf::from("/bin/sleep"),
28102875
..Default::default()
2811-
});
2876+
})
2877+
.unwrap();
28122878

28132879
// Long-ish child so it's alive while we "steal" it.
28142880
let mut cmd = Command::new("/bin/sleep");
@@ -2847,7 +2913,8 @@ mod tests {
28472913
let manager = BootstrapProcManager::new(BootstrapCommand {
28482914
program: PathBuf::from("/bin/sleep"),
28492915
..Default::default()
2850-
});
2916+
})
2917+
.unwrap();
28512918

28522919
let mut cmd = Command::new("/bin/sleep");
28532920
cmd.arg("5").stdout(Stdio::null()).stderr(Stdio::null());
@@ -3200,8 +3267,6 @@ mod tests {
32003267
instance: &hyperactor::Instance<()>,
32013268
_tag: &str,
32023269
) -> (ProcId, ChannelAddr) {
3203-
let proc_id = id!(bootstrap_child[0]);
3204-
32053270
// Serve a Unix channel as the "backend_addr" and hook it into
32063271
// this test proc.
32073272
let (backend_addr, rx) =
@@ -3212,6 +3277,9 @@ mod tests {
32123277
// router.
32133278
instance.proc().clone().serve(rx);
32143279

3280+
// We return an arbitrary (but unbound!) unix direct proc id here;
3281+
// it is okay, as we're not testing connectivity.
3282+
let proc_id = ProcId::Direct(ChannelTransport::Unix.any(), "test".to_string());
32153283
(proc_id, backend_addr)
32163284
}
32173285

@@ -3223,7 +3291,7 @@ mod tests {
32233291
.unwrap();
32243292
let (instance, _handle) = root.instance("client").unwrap();
32253293

3226-
let mgr = BootstrapProcManager::new(BootstrapCommand::test());
3294+
let mgr = BootstrapProcManager::new(BootstrapCommand::test()).unwrap();
32273295
let (proc_id, backend_addr) = make_proc_id_and_backend_addr(&instance, "t_term").await;
32283296
let handle = mgr
32293297
.spawn(
@@ -3286,7 +3354,7 @@ mod tests {
32863354
.unwrap();
32873355
let (instance, _handle) = root.instance("client").unwrap();
32883356

3289-
let mgr = BootstrapProcManager::new(BootstrapCommand::test());
3357+
let mgr = BootstrapProcManager::new(BootstrapCommand::test()).unwrap();
32903358

32913359
// Proc identity + host backend channel the child will dial.
32923360
let (proc_id, backend_addr) = make_proc_id_and_backend_addr(&instance, "t_kill").await;
@@ -3509,7 +3577,8 @@ mod tests {
35093577
let manager = BootstrapProcManager::new(BootstrapCommand {
35103578
program: std::path::PathBuf::from("/bin/true"), // unused in this test
35113579
..Default::default()
3512-
});
3580+
})
3581+
.unwrap();
35133582

35143583
// Give the monitors time to start
35153584
RealClock.sleep(Duration::from_millis(1000)).await;

0 commit comments

Comments
 (0)