Skip to content

Commit 197b9d6

Browse files
mariusae authored and meta-codesync[bot] committed
mesh: implement local proc bypass (#1442)
Summary: Pull Request resolved: #1442 This change allows procs local to a host to bypass the host multiplexer. This should improve multicast delivery, as local comm actors do not need to traverse the same (frontend) bottleneck. ghstack-source-id: 320071694 exported-using-ghexport Reviewed By: shayne-fletcher Differential Revision: D83996264 fbshipit-source-id: 2f5695f75a70524f858e194e0362462e4ae4ecf8
1 parent 4b5409e commit 197b9d6

File tree

8 files changed

+331
-31
lines changed

8 files changed

+331
-31
lines changed

hyperactor/src/mailbox.rs

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -164,7 +164,8 @@ pub type Data = Vec<u8>;
164164
Deserialize,
165165
Named,
166166
Clone,
167-
PartialEq
167+
PartialEq,
168+
Eq
168169
)]
169170
pub enum DeliveryError {
170171
/// The destination address is not reachable.

hyperactor/src/mailbox/undeliverable.rs

Lines changed: 7 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -30,6 +30,13 @@ use crate::supervision::ActorSupervisionEvent;
3030
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Named)]
3131
pub struct Undeliverable<M: Message>(pub M);
3232

33+
impl<M: Message> Undeliverable<M> {
34+
/// Return the inner M-typed message.
35+
pub fn into_inner(self) -> M {
36+
self.0
37+
}
38+
}
39+
3340
// Port handle and receiver for undeliverable messages.
3441
pub(crate) fn new_undeliverable_port() -> (
3542
PortHandle<Undeliverable<MessageEnvelope>>,

hyperactor_mesh/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -74,6 +74,7 @@ serde_bytes = "0.11"
7474
serde_json = { version = "1.0.140", features = ["alloc", "float_roundtrip", "raw_value", "unbounded_depth"] }
7575
serde_multipart = { version = "0.0.0", path = "../serde_multipart" }
7676
strum = { version = "0.27.1", features = ["derive"] }
77+
tempfile = "3.22"
7778
thiserror = "2.0.12"
7879
tokio = { version = "1.47.1", features = ["full", "test-util", "tracing"] }
7980
tokio-stream = { version = "0.1.17", features = ["fs", "io-util", "net", "signal", "sync", "time"] }
@@ -87,7 +88,6 @@ criterion = { version = "0.5.1", features = ["async_tokio", "csv_output"] }
8788
itertools = "0.14.0"
8889
maplit = "1.0"
8990
proptest = "1.5"
90-
tempfile = "3.22"
9191
timed_test = { version = "0.0.0", path = "../timed_test" }
9292
tracing-test = { version = "0.2.3", features = ["no-env-filter"] }
9393

hyperactor_mesh/src/bootstrap.rs

Lines changed: 94 additions & 25 deletions
Original file line number | Diff line number | Diff line change
@@ -15,6 +15,7 @@ use std::future;
1515
use std::io;
1616
use std::io::Write;
1717
use std::os::unix::process::ExitStatusExt;
18+
use std::path::Path;
1819
use std::path::PathBuf;
1920
use std::process::Stdio;
2021
use std::sync::Arc;
@@ -35,6 +36,7 @@ use hyperactor::ProcId;
3536
use hyperactor::attrs::Attrs;
3637
use hyperactor::channel;
3738
use hyperactor::channel::ChannelAddr;
39+
use hyperactor::channel::ChannelError;
3840
use hyperactor::channel::ChannelTransport;
3941
use hyperactor::channel::Rx;
4042
use hyperactor::channel::Tx;
@@ -51,10 +53,13 @@ use hyperactor::host::HostError;
5153
use hyperactor::host::ProcHandle;
5254
use hyperactor::host::ProcManager;
5355
use hyperactor::host::TerminateSummary;
56+
use hyperactor::mailbox::IntoBoxedMailboxSender;
57+
use hyperactor::mailbox::MailboxClient;
5458
use hyperactor::mailbox::MailboxServer;
5559
use hyperactor::proc::Proc;
5660
use serde::Deserialize;
5761
use serde::Serialize;
62+
use tempfile::TempDir;
5863
use tokio::process::Child;
5964
use tokio::process::ChildStderr;
6065
use tokio::process::ChildStdout;
@@ -70,6 +75,8 @@ use crate::v1;
7075
use crate::v1::host_mesh::mesh_agent::HostAgentMode;
7176
use crate::v1::host_mesh::mesh_agent::HostMeshAgent;
7277

78+
mod mailbox;
79+
7380
declare_attrs! {
7481
/// Enable forwarding child stdout/stderr over the mesh log
7582
/// channel.
@@ -276,6 +283,10 @@ pub enum Bootstrap {
276283
backend_addr: ChannelAddr,
277284
/// The callback address used to indicate successful spawning.
278285
callback_addr: ChannelAddr,
286+
/// Directory for storing proc socket files. Procs place their sockets
287+
/// in this directory, so that they can be looked up by other procs
288+
/// for direct transfer.
289+
socket_dir_path: PathBuf,
279290
/// Optional config snapshot (`hyperactor::config::Attrs`)
280291
/// captured by the parent. If present, the child installs it
281292
/// as the `ClientOverride` layer so the parent's effective config
@@ -390,6 +401,7 @@ impl Bootstrap {
390401
proc_id,
391402
backend_addr,
392403
callback_addr,
404+
socket_dir_path,
393405
config,
394406
} => {
395407
if let Some(attrs) = config {
@@ -409,15 +421,39 @@ impl Bootstrap {
409421
eprintln!("(bootstrap) PDEATHSIG disabled via config");
410422
}
411423

412-
let result =
413-
host::spawn_proc(proc_id, backend_addr, callback_addr, |proc| async move {
414-
ProcMeshAgent::boot_v1(proc).await
415-
})
416-
.await;
417-
match result {
418-
Ok(_proc) => halt().await,
419-
Err(e) => e.into(),
420-
}
424+
let (local_addr, name) = ok!(proc_id
425+
.as_direct()
426+
.ok_or_else(|| anyhow::anyhow!("invalid proc id type: {}", proc_id)));
427+
// TODO provide a direct way to construct these
428+
let serve_addr = format!("unix:{}", socket_dir_path.join(name).display());
429+
let serve_addr = serve_addr.parse().unwrap();
430+
431+
// The following is a modified host::spawn_proc to support direct
432+
// dialing between local procs: 1) we bind each proc to a deterministic
433+
// address in socket_dir_path; 2) we use LocalProcDialer to dial these
434+
// addresses for local procs.
435+
let proc_sender = mailbox::LocalProcDialer::new(
436+
local_addr.clone(),
437+
socket_dir_path,
438+
ok!(MailboxClient::dial(backend_addr)),
439+
);
440+
441+
let proc = Proc::new(proc_id.clone(), proc_sender.into_boxed());
442+
443+
let agent_handle = ok!(ProcMeshAgent::boot_v1(proc.clone())
444+
.await
445+
.map_err(|e| HostError::AgentSpawnFailure(proc_id, e)));
446+
447+
// Finally serve the proc on its deterministic Unix socket address
448+
// (under socket_dir_path), and call back.
449+
let (proc_addr, proc_rx) = ok!(channel::serve(serve_addr, "proc_backend"));
450+
proc.clone().serve(proc_rx);
451+
ok!(ok!(channel::dial(callback_addr))
452+
.send((proc_addr, agent_handle.bind::<ProcMeshAgent>()))
453+
.await
454+
.map_err(ChannelError::from));
455+
456+
halt().await
421457
}
422458
Bootstrap::Host {
423459
addr,
@@ -435,7 +471,8 @@ impl Bootstrap {
435471
Some(command) => command,
436472
None => ok!(BootstrapCommand::current()),
437473
};
438-
let manager = BootstrapProcManager::new(command);
474+
let manager = BootstrapProcManager::new(command).unwrap();
475+
439476
let (host, _handle) = ok!(Host::serve(manager, addr).await);
440477
let addr = host.addr().clone();
441478
let host_mesh_agent = ok!(host
@@ -1512,6 +1549,11 @@ pub struct BootstrapProcManager {
15121549
/// FileMonitor that aggregates logs from all children.
15131550
/// None if file monitor creation failed.
15141551
file_appender: Option<Arc<crate::logging::FileAppender>>,
1552+
1553+
/// Directory for storing proc socket files. Procs place their sockets
1554+
/// in this directory, so that they can be looked up by other procs
1555+
/// for direct transfer.
1556+
socket_dir: TempDir,
15151557
}
15161558

15171559
impl Drop for BootstrapProcManager {
@@ -1561,7 +1603,7 @@ impl BootstrapProcManager {
15611603
/// This is the general entry point when you want to manage procs
15621604
/// backed by a specific binary path (e.g. a bootstrap
15631605
/// trampoline).
1564-
pub(crate) fn new(command: BootstrapCommand) -> Self {
1606+
pub(crate) fn new(command: BootstrapCommand) -> Result<Self, io::Error> {
15651607
let file_appender = if hyperactor::config::global::get(MESH_ENABLE_FILE_CAPTURE) {
15661608
match crate::logging::FileAppender::new() {
15671609
Some(fm) => {
@@ -1577,19 +1619,25 @@ impl BootstrapProcManager {
15771619
None
15781620
};
15791621

1580-
Self {
1622+
Ok(Self {
15811623
command,
15821624
children: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
15831625
pid_table: Arc::new(std::sync::Mutex::new(HashMap::new())),
15841626
file_appender,
1585-
}
1627+
socket_dir: runtime_dir()?,
1628+
})
15861629
}
15871630

15881631
/// The bootstrap command used to launch processes.
15891632
pub fn command(&self) -> &BootstrapCommand {
15901633
&self.command
15911634
}
15921635

1636+
/// The socket directory, where per-proc Unix sockets are placed.
1637+
pub fn socket_dir(&self) -> &Path {
1638+
self.socket_dir.path()
1639+
}
1640+
15931641
/// Return the current [`ProcStatus`] for the given [`ProcId`], if
15941642
/// the proc is known to this manager.
15951643
///
@@ -1767,6 +1815,7 @@ impl ProcManager for BootstrapProcManager {
17671815
proc_id: proc_id.clone(),
17681816
backend_addr,
17691817
callback_addr,
1818+
socket_dir_path: self.socket_dir.path().to_owned(),
17701819
config: Some(config.client_config_override),
17711820
};
17721821
let mut cmd = self.command.new();
@@ -2227,6 +2276,18 @@ impl Write for Debug {
22272276
}
22282277
}
22292278

2279+
/// Create a new runtime [`TempDir`]. The directory is created in
2280+
/// `$XDG_RUNTIME_DIR`, otherwise falling back to the system tempdir.
2281+
fn runtime_dir() -> io::Result<TempDir> {
2282+
match std::env::var_os("XDG_RUNTIME_DIR") {
2283+
Some(runtime_dir) => {
2284+
let path = PathBuf::from(runtime_dir);
2285+
tempfile::tempdir_in(path)
2286+
}
2287+
None => tempfile::tempdir(),
2288+
}
2289+
}
2290+
22302291
#[cfg(test)]
22312292
mod tests {
22322293
use std::path::PathBuf;
@@ -2270,6 +2331,7 @@ mod tests {
22702331
proc_id: id!(foo[0]),
22712332
backend_addr: ChannelAddr::any(ChannelTransport::Tcp(TcpMode::Hostname)),
22722333
callback_addr: ChannelAddr::any(ChannelTransport::Unix),
2334+
socket_dir_path: PathBuf::from("notexist"),
22732335
config: None,
22742336
},
22752337
];
@@ -2327,13 +2389,16 @@ mod tests {
23272389
attrs[MESH_TAIL_LOG_LINES] = 123;
23282390
attrs[MESH_BOOTSTRAP_ENABLE_PDEATHSIG] = false;
23292391

2392+
let socket_dir = runtime_dir().unwrap();
2393+
23302394
// Proc case
23312395
{
23322396
let original = Bootstrap::Proc {
23332397
proc_id: id!(foo[42]),
23342398
backend_addr: ChannelAddr::any(ChannelTransport::Unix),
23352399
callback_addr: ChannelAddr::any(ChannelTransport::Unix),
23362400
config: Some(attrs.clone()),
2401+
socket_dir_path: socket_dir.path().to_owned(),
23372402
};
23382403
let env_str = original.to_env_safe_string().expect("encode bootstrap");
23392404
let decoded = Bootstrap::from_env_safe_string(&env_str).expect("decode bootstrap");
@@ -2373,14 +2438,13 @@ mod tests {
23732438
use std::process::Stdio;
23742439

23752440
use tokio::process::Command;
2376-
use tokio::time::Duration;
23772441

23782442
// Manager; program path is irrelevant for this test.
23792443
let command = BootstrapCommand {
23802444
program: PathBuf::from("/bin/true"),
23812445
..Default::default()
23822446
};
2383-
let manager = BootstrapProcManager::new(command);
2447+
let manager = BootstrapProcManager::new(command).unwrap();
23842448

23852449
// Spawn a long-running child process (sleep 30) with
23862450
// kill_on_drop(true).
@@ -2760,7 +2824,7 @@ mod tests {
27602824
program: PathBuf::from("/bin/true"),
27612825
..Default::default()
27622826
};
2763-
let manager = BootstrapProcManager::new(command);
2827+
let manager = BootstrapProcManager::new(command).unwrap();
27642828

27652829
// Spawn a fast-exiting child.
27662830
let mut cmd = Command::new("true");
@@ -2794,7 +2858,7 @@ mod tests {
27942858
program: PathBuf::from("/bin/sleep"),
27952859
..Default::default()
27962860
};
2797-
let manager = BootstrapProcManager::new(command);
2861+
let manager = BootstrapProcManager::new(command).unwrap();
27982862

27992863
// Spawn a process that will live long enough to kill.
28002864
let mut cmd = Command::new("/bin/sleep");
@@ -2911,7 +2975,8 @@ mod tests {
29112975
let manager = BootstrapProcManager::new(BootstrapCommand {
29122976
program: PathBuf::from("/bin/true"),
29132977
..Default::default()
2914-
});
2978+
})
2979+
.unwrap();
29152980
let unknown = ProcId::Direct(ChannelAddr::any(ChannelTransport::Unix), "nope".into());
29162981
assert!(manager.status(&unknown).await.is_none());
29172982
}
@@ -2921,7 +2986,8 @@ mod tests {
29212986
let manager = BootstrapProcManager::new(BootstrapCommand {
29222987
program: PathBuf::from("/bin/sleep"),
29232988
..Default::default()
2924-
});
2989+
})
2990+
.unwrap();
29252991

29262992
// Long-ish child so it's alive while we "steal" it.
29272993
let mut cmd = Command::new("/bin/sleep");
@@ -2960,7 +3026,8 @@ mod tests {
29603026
let manager = BootstrapProcManager::new(BootstrapCommand {
29613027
program: PathBuf::from("/bin/sleep"),
29623028
..Default::default()
2963-
});
3029+
})
3030+
.unwrap();
29643031

29653032
let mut cmd = Command::new("/bin/sleep");
29663033
cmd.arg("5").stdout(Stdio::null()).stderr(Stdio::null());
@@ -3313,8 +3380,6 @@ mod tests {
33133380
instance: &hyperactor::Instance<()>,
33143381
_tag: &str,
33153382
) -> (ProcId, ChannelAddr) {
3316-
let proc_id = id!(bootstrap_child[0]);
3317-
33183383
// Serve a Unix channel as the "backend_addr" and hook it into
33193384
// this test proc.
33203385
let (backend_addr, rx) =
@@ -3325,6 +3390,9 @@ mod tests {
33253390
// router.
33263391
instance.proc().clone().serve(rx);
33273392

3393+
// We return an arbitrary (but unbound!) unix direct proc id here;
3394+
// it is okay, as we're not testing connectivity.
3395+
let proc_id = ProcId::Direct(ChannelTransport::Unix.any(), "test".to_string());
33283396
(proc_id, backend_addr)
33293397
}
33303398

@@ -3336,7 +3404,7 @@ mod tests {
33363404
.unwrap();
33373405
let (instance, _handle) = root.instance("client").unwrap();
33383406

3339-
let mgr = BootstrapProcManager::new(BootstrapCommand::test());
3407+
let mgr = BootstrapProcManager::new(BootstrapCommand::test()).unwrap();
33403408
let (proc_id, backend_addr) = make_proc_id_and_backend_addr(&instance, "t_term").await;
33413409
let handle = mgr
33423410
.spawn(
@@ -3399,7 +3467,7 @@ mod tests {
33993467
.unwrap();
34003468
let (instance, _handle) = root.instance("client").unwrap();
34013469

3402-
let mgr = BootstrapProcManager::new(BootstrapCommand::test());
3470+
let mgr = BootstrapProcManager::new(BootstrapCommand::test()).unwrap();
34033471

34043472
// Proc identity + host backend channel the child will dial.
34053473
let (proc_id, backend_addr) = make_proc_id_and_backend_addr(&instance, "t_kill").await;
@@ -3622,7 +3690,8 @@ mod tests {
36223690
let manager = BootstrapProcManager::new(BootstrapCommand {
36233691
program: std::path::PathBuf::from("/bin/true"), // unused in this test
36243692
..Default::default()
3625-
});
3693+
})
3694+
.unwrap();
36263695

36273696
// Give the monitors time to start
36283697
RealClock.sleep(Duration::from_millis(1000)).await;

0 commit comments

Comments
 (0)