Skip to content

Commit 141bc9a

Browse files
committed
[hyperactor] mesh: implement local proc bypasss
Pull Request resolved: #1442 This change allows procs local to a host to bypass the host multiplexer. This should improve multicast delivery, as local comm actors do not need to traverse the same (frontend) bottleneck. ghstack-source-id: 319513868 @exported-using-ghexport Differential Revision: [D83996264](https://our.internmc.facebook.com/intern/diff/D83996264/)
1 parent 8f3981c commit 141bc9a

File tree

6 files changed

+297
-29
lines changed

6 files changed

+297
-29
lines changed

hyperactor/src/mailbox.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,8 @@ pub type Data = Vec<u8>;
164164
Deserialize,
165165
Named,
166166
Clone,
167-
PartialEq
167+
PartialEq,
168+
Eq
168169
)]
169170
pub enum DeliveryError {
170171
/// The destination address is not reachable.

hyperactor/src/mailbox/undeliverable.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,13 @@ use crate::supervision::ActorSupervisionEvent;
3232
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Named)]
3333
pub struct Undeliverable<M: Message>(pub M);
3434

35+
impl<M: Message> Undeliverable<M> {
36+
/// Return the inner M-typed message.
37+
pub fn into_inner(self) -> M {
38+
self.0
39+
}
40+
}
41+
3542
// Port handle and receiver for undeliverable messages.
3643
pub(crate) fn new_undeliverable_port() -> (
3744
PortHandle<Undeliverable<MessageEnvelope>>,

hyperactor_mesh/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ serde_bytes = "0.11"
7575
serde_json = { version = "1.0.140", features = ["alloc", "float_roundtrip", "raw_value", "unbounded_depth"] }
7676
serde_multipart = { version = "0.0.0", path = "../serde_multipart" }
7777
strum = { version = "0.27.1", features = ["derive"] }
78+
tempfile = "3.22"
7879
thiserror = "2.0.12"
7980
tokio = { version = "1.47.1", features = ["full", "test-util", "tracing"] }
8081
tokio-stream = { version = "0.1.17", features = ["fs", "io-util", "net", "signal", "sync", "time"] }
@@ -88,7 +89,6 @@ criterion = { version = "0.5.1", features = ["async_tokio", "csv_output"] }
8889
itertools = "0.14.0"
8990
maplit = "1.0"
9091
proptest = "1.5"
91-
tempfile = "3.22"
9292
timed_test = { version = "0.0.0", path = "../timed_test" }
9393
tracing-test = { version = "0.2.3", features = ["no-env-filter"] }
9494

hyperactor_mesh/src/bootstrap.rs

Lines changed: 88 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ use std::future;
1515
use std::io;
1616
use std::io::Write;
1717
use std::os::unix::process::ExitStatusExt;
18+
use std::path::Path;
1819
use std::path::PathBuf;
1920
use std::process::Stdio;
2021
use std::sync::Arc;
@@ -35,6 +36,7 @@ use hyperactor::ProcId;
3536
use hyperactor::attrs::Attrs;
3637
use hyperactor::channel;
3738
use hyperactor::channel::ChannelAddr;
39+
use hyperactor::channel::ChannelError;
3840
use hyperactor::channel::ChannelTransport;
3941
use hyperactor::channel::Rx;
4042
use hyperactor::channel::Tx;
@@ -50,10 +52,13 @@ use hyperactor::host::HostError;
5052
use hyperactor::host::ProcHandle;
5153
use hyperactor::host::ProcManager;
5254
use hyperactor::host::TerminateSummary;
55+
use hyperactor::mailbox::IntoBoxedMailboxSender;
56+
use hyperactor::mailbox::MailboxClient;
5357
use hyperactor::mailbox::MailboxServer;
5458
use hyperactor::proc::Proc;
5559
use serde::Deserialize;
5660
use serde::Serialize;
61+
use tempfile::TempDir;
5762
use tokio::process::Child;
5863
use tokio::process::ChildStderr;
5964
use tokio::process::ChildStdout;
@@ -68,6 +73,8 @@ use crate::v1;
6873
use crate::v1::host_mesh::mesh_agent::HostAgentMode;
6974
use crate::v1::host_mesh::mesh_agent::HostMeshAgent;
7075

76+
mod mailbox;
77+
7178
declare_attrs! {
7279
/// If enabled (default), bootstrap child processes install
7380
/// `PR_SET_PDEATHSIG(SIGKILL)` so the kernel reaps them if the
@@ -228,6 +235,10 @@ pub enum Bootstrap {
228235
backend_addr: ChannelAddr,
229236
/// The callback address used to indicate successful spawning.
230237
callback_addr: ChannelAddr,
238+
/// Directory for storing proc socket files. Procs place their sockets
239+
/// in this directory, so that they can be looked up by other procs
240+
/// for direct transfer.
241+
socket_dir_path: PathBuf,
231242
/// Optional config snapshot (`hyperactor::config::Attrs`)
232243
/// captured by the parent. If present, the child installs it
233244
/// as the `Runtime` layer so the parent's effective config
@@ -340,6 +351,7 @@ impl Bootstrap {
340351
proc_id,
341352
backend_addr,
342353
callback_addr,
354+
socket_dir_path,
343355
config,
344356
} => {
345357
if let Some(attrs) = config {
@@ -359,15 +371,39 @@ impl Bootstrap {
359371
eprintln!("(bootstrap) PDEATHSIG disabled via config");
360372
}
361373

362-
let result =
363-
host::spawn_proc(proc_id, backend_addr, callback_addr, |proc| async move {
364-
ProcMeshAgent::boot_v1(proc).await
365-
})
366-
.await;
367-
match result {
368-
Ok(_proc) => halt().await,
369-
Err(e) => e.into(),
370-
}
374+
let (local_addr, name) = ok!(proc_id
375+
.as_direct()
376+
.ok_or_else(|| anyhow::anyhow!("invalid proc id type: {}", proc_id)));
377+
// TODO provide a direct way to construct these
378+
let serve_addr = format!("unix:{}", socket_dir_path.join(name).display());
379+
let serve_addr = serve_addr.parse().unwrap();
380+
381+
// The following is a modified host::spawn_proc to support direct
382+
// dialing between local procs: 1) we bind each proc to a deterministic
383+
// address in socket_dir_path; 2) we use LocalProcDialer to dial these
384+
// addresses for local procs.
385+
let proc_sender = mailbox::LocalProcDialer::new(
386+
local_addr.clone(),
387+
socket_dir_path,
388+
ok!(MailboxClient::dial(backend_addr)),
389+
);
390+
391+
let proc = Proc::new(proc_id.clone(), proc_sender.into_boxed());
392+
393+
let agent_handle = ok!(ProcMeshAgent::boot_v1(proc.clone())
394+
.await
395+
.map_err(|e| HostError::AgentSpawnFailure(proc_id, e)));
396+
397+
// Finally serve the proc on the same transport as the backend address,
398+
// and call back.
399+
let (proc_addr, proc_rx) = ok!(channel::serve(serve_addr, "proc_backend"));
400+
proc.clone().serve(proc_rx);
401+
ok!(ok!(channel::dial(callback_addr))
402+
.send((proc_addr, agent_handle.bind::<ProcMeshAgent>()))
403+
.await
404+
.map_err(ChannelError::from));
405+
406+
halt().await
371407
}
372408
Bootstrap::Host {
373409
addr,
@@ -385,7 +421,7 @@ impl Bootstrap {
385421
Some(command) => command,
386422
None => ok!(BootstrapCommand::current()),
387423
};
388-
let manager = BootstrapProcManager::new(command);
424+
let manager = BootstrapProcManager::new(command).unwrap();
389425
let (host, _handle) = ok!(Host::serve(manager, addr).await);
390426
let addr = host.addr().clone();
391427
let host_mesh_agent = ok!(host
@@ -1421,6 +1457,11 @@ pub struct BootstrapProcManager {
14211457
/// FileMonitor that aggregates logs from all children.
14221458
/// None if file monitor creation failed.
14231459
file_appender: Option<Arc<crate::logging::FileAppender>>,
1460+
1461+
/// Directory for storing proc socket files. Procs place their sockets
1462+
/// in this directory, so that they can be looked up by other procs
1463+
/// for direct transfer.
1464+
socket_dir: TempDir,
14241465
}
14251466

14261467
impl Drop for BootstrapProcManager {
@@ -1470,7 +1511,7 @@ impl BootstrapProcManager {
14701511
/// This is the general entry point when you want to manage procs
14711512
/// backed by a specific binary path (e.g. a bootstrap
14721513
/// trampoline).
1473-
pub(crate) fn new(command: BootstrapCommand) -> Self {
1514+
pub(crate) fn new(command: BootstrapCommand) -> Result<Self, io::Error> {
14741515
let log_monitor = match crate::logging::FileAppender::new() {
14751516
Some(fm) => {
14761517
tracing::info!("log monitor created successfully");
@@ -1482,12 +1523,13 @@ impl BootstrapProcManager {
14821523
}
14831524
};
14841525

1485-
Self {
1526+
Ok(Self {
14861527
command,
14871528
children: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
14881529
pid_table: Arc::new(std::sync::Mutex::new(HashMap::new())),
14891530
file_appender: log_monitor,
1490-
}
1531+
socket_dir: runtime_dir()?,
1532+
})
14911533
}
14921534

14931535
/// The bootstrap command used to launch processes.
@@ -1672,6 +1714,7 @@ impl ProcManager for BootstrapProcManager {
16721714
proc_id: proc_id.clone(),
16731715
backend_addr,
16741716
callback_addr,
1717+
socket_dir_path: self.socket_dir.path().to_owned(),
16751718
config: Some(config.client_config_override),
16761719
};
16771720
let mut cmd = Command::new(&self.command.program);
@@ -2107,6 +2150,18 @@ impl Write for Debug {
21072150
}
21082151
}
21092152

2153+
/// Create a new runtime [`TempDir`]. The directory is created in
2154+
/// `$XDG_RUNTIME_DIR`, otherwise falling back to the system tempdir.
2155+
fn runtime_dir() -> io::Result<TempDir> {
2156+
match std::env::var_os("XDG_RUNTIME_DIR") {
2157+
Some(runtime_dir) => {
2158+
let path = PathBuf::from(runtime_dir);
2159+
tempfile::tempdir_in(path)
2160+
}
2161+
None => tempfile::tempdir(),
2162+
}
2163+
}
2164+
21102165
#[cfg(test)]
21112166
mod tests {
21122167
use std::path::PathBuf;
@@ -2150,6 +2205,7 @@ mod tests {
21502205
proc_id: id!(foo[0]),
21512206
backend_addr: ChannelAddr::any(ChannelTransport::Tcp(TcpMode::Hostname)),
21522207
callback_addr: ChannelAddr::any(ChannelTransport::Unix),
2208+
socket_dir_path: PathBuf::from("notexist"),
21532209
config: None,
21542210
},
21552211
];
@@ -2207,13 +2263,16 @@ mod tests {
22072263
attrs[MESH_TAIL_LOG_LINES] = 123;
22082264
attrs[MESH_BOOTSTRAP_ENABLE_PDEATHSIG] = false;
22092265

2266+
let socket_dir = runtime_dir().unwrap();
2267+
22102268
// Proc case
22112269
{
22122270
let original = Bootstrap::Proc {
22132271
proc_id: id!(foo[42]),
22142272
backend_addr: ChannelAddr::any(ChannelTransport::Unix),
22152273
callback_addr: ChannelAddr::any(ChannelTransport::Unix),
22162274
config: Some(attrs.clone()),
2275+
socket_dir_path: socket_dir.path().to_owned(),
22172276
};
22182277
let env_str = original.to_env_safe_string().expect("encode bootstrap");
22192278
let decoded = Bootstrap::from_env_safe_string(&env_str).expect("decode bootstrap");
@@ -2253,14 +2312,13 @@ mod tests {
22532312
use std::process::Stdio;
22542313

22552314
use tokio::process::Command;
2256-
use tokio::time::Duration;
22572315

22582316
// Manager; program path is irrelevant for this test.
22592317
let command = BootstrapCommand {
22602318
program: PathBuf::from("/bin/true"),
22612319
..Default::default()
22622320
};
2263-
let manager = BootstrapProcManager::new(command);
2321+
let manager = BootstrapProcManager::new(command).unwrap();
22642322

22652323
// Spawn a long-running child process (sleep 30) with
22662324
// kill_on_drop(true).
@@ -2640,7 +2698,7 @@ mod tests {
26402698
program: PathBuf::from("/bin/true"),
26412699
..Default::default()
26422700
};
2643-
let manager = BootstrapProcManager::new(command);
2701+
let manager = BootstrapProcManager::new(command).unwrap();
26442702

26452703
// Spawn a fast-exiting child.
26462704
let mut cmd = Command::new("true");
@@ -2674,7 +2732,7 @@ mod tests {
26742732
program: PathBuf::from("/bin/sleep"),
26752733
..Default::default()
26762734
};
2677-
let manager = BootstrapProcManager::new(command);
2735+
let manager = BootstrapProcManager::new(command).unwrap();
26782736

26792737
// Spawn a process that will live long enough to kill.
26802738
let mut cmd = Command::new("/bin/sleep");
@@ -2791,7 +2849,8 @@ mod tests {
27912849
let manager = BootstrapProcManager::new(BootstrapCommand {
27922850
program: PathBuf::from("/bin/true"),
27932851
..Default::default()
2794-
});
2852+
})
2853+
.unwrap();
27952854
let unknown = ProcId::Direct(ChannelAddr::any(ChannelTransport::Unix), "nope".into());
27962855
assert!(manager.status(&unknown).await.is_none());
27972856
}
@@ -2801,7 +2860,8 @@ mod tests {
28012860
let manager = BootstrapProcManager::new(BootstrapCommand {
28022861
program: PathBuf::from("/bin/sleep"),
28032862
..Default::default()
2804-
});
2863+
})
2864+
.unwrap();
28052865

28062866
// Long-ish child so it's alive while we "steal" it.
28072867
let mut cmd = Command::new("/bin/sleep");
@@ -2840,7 +2900,8 @@ mod tests {
28402900
let manager = BootstrapProcManager::new(BootstrapCommand {
28412901
program: PathBuf::from("/bin/sleep"),
28422902
..Default::default()
2843-
});
2903+
})
2904+
.unwrap();
28442905

28452906
let mut cmd = Command::new("/bin/sleep");
28462907
cmd.arg("5").stdout(Stdio::null()).stderr(Stdio::null());
@@ -3193,8 +3254,6 @@ mod tests {
31933254
instance: &hyperactor::Instance<()>,
31943255
_tag: &str,
31953256
) -> (ProcId, ChannelAddr) {
3196-
let proc_id = id!(bootstrap_child[0]);
3197-
31983257
// Serve a Unix channel as the "backend_addr" and hook it into
31993258
// this test proc.
32003259
let (backend_addr, rx) =
@@ -3205,6 +3264,9 @@ mod tests {
32053264
// router.
32063265
instance.proc().clone().serve(rx);
32073266

3267+
// We return an arbitrary (but unbound!) unix direct proc id here;
3268+
// it is okay, as we're not testing connectivity.
3269+
let proc_id = ProcId::Direct(ChannelTransport::Unix.any(), "test".to_string());
32083270
(proc_id, backend_addr)
32093271
}
32103272

@@ -3216,7 +3278,7 @@ mod tests {
32163278
.unwrap();
32173279
let (instance, _handle) = root.instance("client").unwrap();
32183280

3219-
let mgr = BootstrapProcManager::new(BootstrapCommand::test());
3281+
let mgr = BootstrapProcManager::new(BootstrapCommand::test()).unwrap();
32203282
let (proc_id, backend_addr) = make_proc_id_and_backend_addr(&instance, "t_term").await;
32213283
let handle = mgr
32223284
.spawn(
@@ -3279,7 +3341,7 @@ mod tests {
32793341
.unwrap();
32803342
let (instance, _handle) = root.instance("client").unwrap();
32813343

3282-
let mgr = BootstrapProcManager::new(BootstrapCommand::test());
3344+
let mgr = BootstrapProcManager::new(BootstrapCommand::test()).unwrap();
32833345

32843346
// Proc identity + host backend channel the child will dial.
32853347
let (proc_id, backend_addr) = make_proc_id_and_backend_addr(&instance, "t_kill").await;
@@ -3502,7 +3564,8 @@ mod tests {
35023564
let manager = BootstrapProcManager::new(BootstrapCommand {
35033565
program: std::path::PathBuf::from("/bin/true"), // unused in this test
35043566
..Default::default()
3505-
});
3567+
})
3568+
.unwrap();
35063569

35073570
// Give the monitors time to start
35083571
RealClock.sleep(Duration::from_millis(1000)).await;

0 commit comments

Comments
 (0)