Skip to content

Commit 063fe55

Browse files
committed
[hyperactor] mesh: implement local proc bypasss
Pull Request resolved: #1442 This change allows procs local to a host to bypass the host multiplexer. This should improve multicast delivery, as local comm actors do not need to traverse the same (frontend) bottleneck. ghstack-source-id: 319535620 @exported-using-ghexport Differential Revision: [D83996264](https://our.internmc.facebook.com/intern/diff/D83996264/)
1 parent 8f3981c commit 063fe55

File tree

7 files changed

+310
-30
lines changed

7 files changed

+310
-30
lines changed

hyperactor/src/mailbox.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,8 @@ pub type Data = Vec<u8>;
164164
Deserialize,
165165
Named,
166166
Clone,
167-
PartialEq
167+
PartialEq,
168+
Eq
168169
)]
169170
pub enum DeliveryError {
170171
/// The destination address is not reachable.

hyperactor/src/mailbox/undeliverable.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,13 @@ use crate::supervision::ActorSupervisionEvent;
3232
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Named)]
3333
pub struct Undeliverable<M: Message>(pub M);
3434

35+
impl<M: Message> Undeliverable<M> {
36+
/// Return the inner M-typed message.
37+
pub fn into_inner(self) -> M {
38+
self.0
39+
}
40+
}
41+
3542
// Port handle and receiver for undeliverable messages.
3643
pub(crate) fn new_undeliverable_port() -> (
3744
PortHandle<Undeliverable<MessageEnvelope>>,

hyperactor_mesh/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ serde_bytes = "0.11"
7575
serde_json = { version = "1.0.140", features = ["alloc", "float_roundtrip", "raw_value", "unbounded_depth"] }
7676
serde_multipart = { version = "0.0.0", path = "../serde_multipart" }
7777
strum = { version = "0.27.1", features = ["derive"] }
78+
tempfile = "3.22"
7879
thiserror = "2.0.12"
7980
tokio = { version = "1.47.1", features = ["full", "test-util", "tracing"] }
8081
tokio-stream = { version = "0.1.17", features = ["fs", "io-util", "net", "signal", "sync", "time"] }
@@ -88,7 +89,6 @@ criterion = { version = "0.5.1", features = ["async_tokio", "csv_output"] }
8889
itertools = "0.14.0"
8990
maplit = "1.0"
9091
proptest = "1.5"
91-
tempfile = "3.22"
9292
timed_test = { version = "0.0.0", path = "../timed_test" }
9393
tracing-test = { version = "0.2.3", features = ["no-env-filter"] }
9494

hyperactor_mesh/src/bootstrap.rs

Lines changed: 94 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ use std::future;
1515
use std::io;
1616
use std::io::Write;
1717
use std::os::unix::process::ExitStatusExt;
18+
use std::path::Path;
1819
use std::path::PathBuf;
1920
use std::process::Stdio;
2021
use std::sync::Arc;
@@ -35,6 +36,7 @@ use hyperactor::ProcId;
3536
use hyperactor::attrs::Attrs;
3637
use hyperactor::channel;
3738
use hyperactor::channel::ChannelAddr;
39+
use hyperactor::channel::ChannelError;
3840
use hyperactor::channel::ChannelTransport;
3941
use hyperactor::channel::Rx;
4042
use hyperactor::channel::Tx;
@@ -50,10 +52,13 @@ use hyperactor::host::HostError;
5052
use hyperactor::host::ProcHandle;
5153
use hyperactor::host::ProcManager;
5254
use hyperactor::host::TerminateSummary;
55+
use hyperactor::mailbox::IntoBoxedMailboxSender;
56+
use hyperactor::mailbox::MailboxClient;
5357
use hyperactor::mailbox::MailboxServer;
5458
use hyperactor::proc::Proc;
5559
use serde::Deserialize;
5660
use serde::Serialize;
61+
use tempfile::TempDir;
5762
use tokio::process::Child;
5863
use tokio::process::ChildStderr;
5964
use tokio::process::ChildStdout;
@@ -68,6 +73,8 @@ use crate::v1;
6873
use crate::v1::host_mesh::mesh_agent::HostAgentMode;
6974
use crate::v1::host_mesh::mesh_agent::HostMeshAgent;
7075

76+
mod mailbox;
77+
7178
declare_attrs! {
7279
/// If enabled (default), bootstrap child processes install
7380
/// `PR_SET_PDEATHSIG(SIGKILL)` so the kernel reaps them if the
@@ -228,6 +235,10 @@ pub enum Bootstrap {
228235
backend_addr: ChannelAddr,
229236
/// The callback address used to indicate successful spawning.
230237
callback_addr: ChannelAddr,
238+
/// Directory for storing proc socket files. Procs place their sockets
239+
/// in this directory, so that they can be looked up by other procs
240+
/// for direct transfer.
241+
socket_dir_path: PathBuf,
231242
/// Optional config snapshot (`hyperactor::config::Attrs`)
232243
/// captured by the parent. If present, the child installs it
233244
/// as the `Runtime` layer so the parent's effective config
@@ -340,6 +351,7 @@ impl Bootstrap {
340351
proc_id,
341352
backend_addr,
342353
callback_addr,
354+
socket_dir_path,
343355
config,
344356
} => {
345357
if let Some(attrs) = config {
@@ -359,15 +371,39 @@ impl Bootstrap {
359371
eprintln!("(bootstrap) PDEATHSIG disabled via config");
360372
}
361373

362-
let result =
363-
host::spawn_proc(proc_id, backend_addr, callback_addr, |proc| async move {
364-
ProcMeshAgent::boot_v1(proc).await
365-
})
366-
.await;
367-
match result {
368-
Ok(_proc) => halt().await,
369-
Err(e) => e.into(),
370-
}
374+
let (local_addr, name) = ok!(proc_id
375+
.as_direct()
376+
.ok_or_else(|| anyhow::anyhow!("invalid proc id type: {}", proc_id)));
377+
// TODO provide a direct way to construct these
378+
let serve_addr = format!("unix:{}", socket_dir_path.join(name).display());
379+
let serve_addr = serve_addr.parse().unwrap();
380+
381+
// The following is a modified host::spawn_proc to support direct
382+
// dialing between local procs: 1) we bind each proc to a deterministic
383+
// address in socket_dir_path; 2) we use LocalProcDialer to dial these
384+
// addresses for local procs.
385+
let proc_sender = mailbox::LocalProcDialer::new(
386+
local_addr.clone(),
387+
socket_dir_path,
388+
ok!(MailboxClient::dial(backend_addr)),
389+
);
390+
391+
let proc = Proc::new(proc_id.clone(), proc_sender.into_boxed());
392+
393+
let agent_handle = ok!(ProcMeshAgent::boot_v1(proc.clone())
394+
.await
395+
.map_err(|e| HostError::AgentSpawnFailure(proc_id, e)));
396+
397+
// Finally serve the proc on the same transport as the backend address,
398+
// and call back.
399+
let (proc_addr, proc_rx) = ok!(channel::serve(serve_addr, "proc_backend"));
400+
proc.clone().serve(proc_rx);
401+
ok!(ok!(channel::dial(callback_addr))
402+
.send((proc_addr, agent_handle.bind::<ProcMeshAgent>()))
403+
.await
404+
.map_err(ChannelError::from));
405+
406+
halt().await
371407
}
372408
Bootstrap::Host {
373409
addr,
@@ -385,7 +421,8 @@ impl Bootstrap {
385421
Some(command) => command,
386422
None => ok!(BootstrapCommand::current()),
387423
};
388-
let manager = BootstrapProcManager::new(command);
424+
let manager = BootstrapProcManager::new(command).unwrap();
425+
389426
let (host, _handle) = ok!(Host::serve(manager, addr).await);
390427
let addr = host.addr().clone();
391428
let host_mesh_agent = ok!(host
@@ -1421,6 +1458,11 @@ pub struct BootstrapProcManager {
14211458
/// FileMonitor that aggregates logs from all children.
14221459
/// None if file monitor creation failed.
14231460
file_appender: Option<Arc<crate::logging::FileAppender>>,
1461+
1462+
/// Directory for storing proc socket files. Procs place their sockets
1463+
/// in this directory, so that they can be looked up by other procs
1464+
/// for direct transfer.
1465+
socket_dir: TempDir,
14241466
}
14251467

14261468
impl Drop for BootstrapProcManager {
@@ -1470,7 +1512,7 @@ impl BootstrapProcManager {
14701512
/// This is the general entry point when you want to manage procs
14711513
/// backed by a specific binary path (e.g. a bootstrap
14721514
/// trampoline).
1473-
pub(crate) fn new(command: BootstrapCommand) -> Self {
1515+
pub(crate) fn new(command: BootstrapCommand) -> Result<Self, io::Error> {
14741516
let log_monitor = match crate::logging::FileAppender::new() {
14751517
Some(fm) => {
14761518
tracing::info!("log monitor created successfully");
@@ -1482,19 +1524,25 @@ impl BootstrapProcManager {
14821524
}
14831525
};
14841526

1485-
Self {
1527+
Ok(Self {
14861528
command,
14871529
children: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
14881530
pid_table: Arc::new(std::sync::Mutex::new(HashMap::new())),
14891531
file_appender: log_monitor,
1490-
}
1532+
socket_dir: runtime_dir()?,
1533+
})
14911534
}
14921535

14931536
/// The bootstrap command used to launch processes.
14941537
pub fn command(&self) -> &BootstrapCommand {
14951538
&self.command
14961539
}
14971540

1541+
/// The socket directory, where per-proc Unix sockets are placed.
1542+
pub fn socket_dir(&self) -> &Path {
1543+
self.socket_dir.path()
1544+
}
1545+
14981546
/// Return the current [`ProcStatus`] for the given [`ProcId`], if
14991547
/// the proc is known to this manager.
15001548
///
@@ -1672,6 +1720,7 @@ impl ProcManager for BootstrapProcManager {
16721720
proc_id: proc_id.clone(),
16731721
backend_addr,
16741722
callback_addr,
1723+
socket_dir_path: self.socket_dir.path().to_owned(),
16751724
config: Some(config.client_config_override),
16761725
};
16771726
let mut cmd = Command::new(&self.command.program);
@@ -2107,6 +2156,18 @@ impl Write for Debug {
21072156
}
21082157
}
21092158

2159+
/// Create a new runtime [`TempDir`]. The directory is created in
2160+
/// `$XDG_RUNTIME_DIR`, otherwise falling back to the system tempdir.
2161+
fn runtime_dir() -> io::Result<TempDir> {
2162+
match std::env::var_os("XDG_RUNTIME_DIR") {
2163+
Some(runtime_dir) => {
2164+
let path = PathBuf::from(runtime_dir);
2165+
tempfile::tempdir_in(path)
2166+
}
2167+
None => tempfile::tempdir(),
2168+
}
2169+
}
2170+
21102171
#[cfg(test)]
21112172
mod tests {
21122173
use std::path::PathBuf;
@@ -2150,6 +2211,7 @@ mod tests {
21502211
proc_id: id!(foo[0]),
21512212
backend_addr: ChannelAddr::any(ChannelTransport::Tcp(TcpMode::Hostname)),
21522213
callback_addr: ChannelAddr::any(ChannelTransport::Unix),
2214+
socket_dir_path: PathBuf::from("notexist"),
21532215
config: None,
21542216
},
21552217
];
@@ -2207,13 +2269,16 @@ mod tests {
22072269
attrs[MESH_TAIL_LOG_LINES] = 123;
22082270
attrs[MESH_BOOTSTRAP_ENABLE_PDEATHSIG] = false;
22092271

2272+
let socket_dir = runtime_dir().unwrap();
2273+
22102274
// Proc case
22112275
{
22122276
let original = Bootstrap::Proc {
22132277
proc_id: id!(foo[42]),
22142278
backend_addr: ChannelAddr::any(ChannelTransport::Unix),
22152279
callback_addr: ChannelAddr::any(ChannelTransport::Unix),
22162280
config: Some(attrs.clone()),
2281+
socket_dir_path: socket_dir.path().to_owned(),
22172282
};
22182283
let env_str = original.to_env_safe_string().expect("encode bootstrap");
22192284
let decoded = Bootstrap::from_env_safe_string(&env_str).expect("decode bootstrap");
@@ -2253,14 +2318,13 @@ mod tests {
22532318
use std::process::Stdio;
22542319

22552320
use tokio::process::Command;
2256-
use tokio::time::Duration;
22572321

22582322
// Manager; program path is irrelevant for this test.
22592323
let command = BootstrapCommand {
22602324
program: PathBuf::from("/bin/true"),
22612325
..Default::default()
22622326
};
2263-
let manager = BootstrapProcManager::new(command);
2327+
let manager = BootstrapProcManager::new(command).unwrap();
22642328

22652329
// Spawn a long-running child process (sleep 30) with
22662330
// kill_on_drop(true).
@@ -2640,7 +2704,7 @@ mod tests {
26402704
program: PathBuf::from("/bin/true"),
26412705
..Default::default()
26422706
};
2643-
let manager = BootstrapProcManager::new(command);
2707+
let manager = BootstrapProcManager::new(command).unwrap();
26442708

26452709
// Spawn a fast-exiting child.
26462710
let mut cmd = Command::new("true");
@@ -2674,7 +2738,7 @@ mod tests {
26742738
program: PathBuf::from("/bin/sleep"),
26752739
..Default::default()
26762740
};
2677-
let manager = BootstrapProcManager::new(command);
2741+
let manager = BootstrapProcManager::new(command).unwrap();
26782742

26792743
// Spawn a process that will live long enough to kill.
26802744
let mut cmd = Command::new("/bin/sleep");
@@ -2791,7 +2855,8 @@ mod tests {
27912855
let manager = BootstrapProcManager::new(BootstrapCommand {
27922856
program: PathBuf::from("/bin/true"),
27932857
..Default::default()
2794-
});
2858+
})
2859+
.unwrap();
27952860
let unknown = ProcId::Direct(ChannelAddr::any(ChannelTransport::Unix), "nope".into());
27962861
assert!(manager.status(&unknown).await.is_none());
27972862
}
@@ -2801,7 +2866,8 @@ mod tests {
28012866
let manager = BootstrapProcManager::new(BootstrapCommand {
28022867
program: PathBuf::from("/bin/sleep"),
28032868
..Default::default()
2804-
});
2869+
})
2870+
.unwrap();
28052871

28062872
// Long-ish child so it's alive while we "steal" it.
28072873
let mut cmd = Command::new("/bin/sleep");
@@ -2840,7 +2906,8 @@ mod tests {
28402906
let manager = BootstrapProcManager::new(BootstrapCommand {
28412907
program: PathBuf::from("/bin/sleep"),
28422908
..Default::default()
2843-
});
2909+
})
2910+
.unwrap();
28442911

28452912
let mut cmd = Command::new("/bin/sleep");
28462913
cmd.arg("5").stdout(Stdio::null()).stderr(Stdio::null());
@@ -3193,8 +3260,6 @@ mod tests {
31933260
instance: &hyperactor::Instance<()>,
31943261
_tag: &str,
31953262
) -> (ProcId, ChannelAddr) {
3196-
let proc_id = id!(bootstrap_child[0]);
3197-
31983263
// Serve a Unix channel as the "backend_addr" and hook it into
31993264
// this test proc.
32003265
let (backend_addr, rx) =
@@ -3205,6 +3270,9 @@ mod tests {
32053270
// router.
32063271
instance.proc().clone().serve(rx);
32073272

3273+
// We return an arbitrary (but unbound!) unix direct proc id here;
3274+
// it is okay, as we're not testing connectivity.
3275+
let proc_id = ProcId::Direct(ChannelTransport::Unix.any(), "test".to_string());
32083276
(proc_id, backend_addr)
32093277
}
32103278

@@ -3216,7 +3284,7 @@ mod tests {
32163284
.unwrap();
32173285
let (instance, _handle) = root.instance("client").unwrap();
32183286

3219-
let mgr = BootstrapProcManager::new(BootstrapCommand::test());
3287+
let mgr = BootstrapProcManager::new(BootstrapCommand::test()).unwrap();
32203288
let (proc_id, backend_addr) = make_proc_id_and_backend_addr(&instance, "t_term").await;
32213289
let handle = mgr
32223290
.spawn(
@@ -3279,7 +3347,7 @@ mod tests {
32793347
.unwrap();
32803348
let (instance, _handle) = root.instance("client").unwrap();
32813349

3282-
let mgr = BootstrapProcManager::new(BootstrapCommand::test());
3350+
let mgr = BootstrapProcManager::new(BootstrapCommand::test()).unwrap();
32833351

32843352
// Proc identity + host backend channel the child will dial.
32853353
let (proc_id, backend_addr) = make_proc_id_and_backend_addr(&instance, "t_kill").await;
@@ -3502,7 +3570,8 @@ mod tests {
35023570
let manager = BootstrapProcManager::new(BootstrapCommand {
35033571
program: std::path::PathBuf::from("/bin/true"), // unused in this test
35043572
..Default::default()
3505-
});
3573+
})
3574+
.unwrap();
35063575

35073576
// Give the monitors time to start
35083577
RealClock.sleep(Duration::from_millis(1000)).await;

0 commit comments

Comments
 (0)