@@ -15,6 +15,7 @@ use std::future;
1515use std:: io;
1616use std:: io:: Write ;
1717use std:: os:: unix:: process:: ExitStatusExt ;
18+ use std:: path:: Path ;
1819use std:: path:: PathBuf ;
1920use std:: process:: Stdio ;
2021use std:: sync:: Arc ;
@@ -34,6 +35,7 @@ use hyperactor::ProcId;
3435use hyperactor:: attrs:: Attrs ;
3536use hyperactor:: channel;
3637use hyperactor:: channel:: ChannelAddr ;
38+ use hyperactor:: channel:: ChannelError ;
3739use hyperactor:: channel:: ChannelTransport ;
3840use hyperactor:: channel:: Rx ;
3941use hyperactor:: channel:: Tx ;
@@ -48,10 +50,13 @@ use hyperactor::host::HostError;
4850use hyperactor:: host:: ProcHandle ;
4951use hyperactor:: host:: ProcManager ;
5052use hyperactor:: host:: TerminateSummary ;
53+ use hyperactor:: mailbox:: IntoBoxedMailboxSender ;
54+ use hyperactor:: mailbox:: MailboxClient ;
5155use hyperactor:: mailbox:: MailboxServer ;
5256use hyperactor:: proc:: Proc ;
5357use serde:: Deserialize ;
5458use serde:: Serialize ;
59+ use tempfile:: TempDir ;
5560use tokio:: process:: Child ;
5661use tokio:: process:: Command ;
5762use tokio:: sync:: oneshot;
@@ -64,6 +69,8 @@ use crate::v1;
6469use crate :: v1:: host_mesh:: mesh_agent:: HostAgentMode ;
6570use crate :: v1:: host_mesh:: mesh_agent:: HostMeshAgent ;
6671
72+ mod mailbox;
73+
6774declare_attrs ! {
6875 /// If enabled (default), bootstrap child processes install
6976 /// `PR_SET_PDEATHSIG(SIGKILL)` so the kernel reaps them if the
@@ -212,6 +219,10 @@ pub enum Bootstrap {
212219 backend_addr : ChannelAddr ,
213220 /// The callback address used to indicate successful spawning.
214221 callback_addr : ChannelAddr ,
222+ /// Directory for storing proc socket files. Procs place their sockets
223+ /// in this directory, so that they can be looked up by other procs
224+ /// for direct transfer.
225+ socket_dir_path : PathBuf ,
215226 /// Optional config snapshot (`hyperactor::config::Attrs`)
216227 /// captured by the parent. If present, the child installs it
217228 /// as the `Runtime` layer so the parent's effective config
@@ -324,6 +335,7 @@ impl Bootstrap {
324335 proc_id,
325336 backend_addr,
326337 callback_addr,
338+ socket_dir_path,
327339 config,
328340 } => {
329341 if let Some ( attrs) = config {
@@ -343,15 +355,39 @@ impl Bootstrap {
343355 eprintln ! ( "(bootstrap) PDEATHSIG disabled via config" ) ;
344356 }
345357
346- let result =
347- host:: spawn_proc ( proc_id, backend_addr, callback_addr, |proc| async move {
348- ProcMeshAgent :: boot_v1 ( proc) . await
349- } )
350- . await ;
351- match result {
352- Ok ( _proc) => halt ( ) . await ,
353- Err ( e) => e. into ( ) ,
354- }
358+ let ( local_addr, name) = ok ! ( proc_id
359+ . as_direct( )
360+ . ok_or_else( || anyhow:: anyhow!( "invalid proc id type: {}" , proc_id) ) ) ;
361+ // TODO provide a direct way to construct these
362+ let serve_addr = format ! ( "unix:{}" , socket_dir_path. join( name) . display( ) ) ;
363+ let serve_addr = serve_addr. parse ( ) . unwrap ( ) ;
364+
365+ // The following is a modified host::spawn_proc to support direct
366+ // dialing between local procs: 1) we bind each proc to a deterministic
367+ // address in socket_dir_path; 2) we use LocalProcDialer to dial these
368+ // addresses for local procs.
369+ let proc_sender = mailbox:: LocalProcDialer :: new (
370+ local_addr. clone ( ) ,
371+ socket_dir_path,
372+ ok ! ( MailboxClient :: dial( backend_addr) ) ,
373+ ) ;
374+
375+ let proc = Proc :: new ( proc_id. clone ( ) , proc_sender. into_boxed ( ) ) ;
376+
377+ let agent_handle = ok ! ( ProcMeshAgent :: boot_v1( proc. clone( ) )
378+ . await
379+ . map_err( |e| HostError :: AgentSpawnFailure ( proc_id, e) ) ) ;
380+
381+ // Finally serve the proc on the same transport as the backend address,
382+ // and call back.
383+ let ( proc_addr, proc_rx) = ok ! ( channel:: serve( serve_addr) ) ;
384+ proc. clone ( ) . serve ( proc_rx) ;
385+ ok ! ( ok!( channel:: dial( callback_addr) )
386+ . send( ( proc_addr, agent_handle. bind:: <ProcMeshAgent >( ) ) )
387+ . await
388+ . map_err( ChannelError :: from) ) ;
389+
390+ halt ( ) . await
355391 }
356392 Bootstrap :: Host {
357393 addr,
@@ -369,7 +405,7 @@ impl Bootstrap {
369405 Some ( command) => command,
370406 None => ok ! ( BootstrapCommand :: current( ) ) ,
371407 } ;
372- let manager = BootstrapProcManager :: new ( command) ;
408+ let manager = BootstrapProcManager :: new ( command) . unwrap ( ) ;
373409 let ( host, _handle) = ok ! ( Host :: serve( manager, addr) . await ) ;
374410 let addr = host. addr ( ) . clone ( ) ;
375411 let host_mesh_agent = ok ! ( host
@@ -1402,6 +1438,11 @@ pub struct BootstrapProcManager {
14021438 /// exclusively in the [`Drop`] impl to send `SIGKILL` without
14031439 /// needing async context.
14041440 pid_table : Arc < std:: sync:: Mutex < HashMap < ProcId , u32 > > > ,
1441+
1442+ /// Directory for storing proc socket files. Procs place their sockets
1443+ /// in this directory, so that they can be looked up by other procs
1444+ /// for direct transfer.
1445+ socket_dir : TempDir ,
14051446}
14061447
14071448impl Drop for BootstrapProcManager {
@@ -1451,12 +1492,13 @@ impl BootstrapProcManager {
14511492 /// This is the general entry point when you want to manage procs
14521493 /// backed by a specific binary path (e.g. a bootstrap
14531494 /// trampoline).
1454- pub ( crate ) fn new ( command : BootstrapCommand ) -> Self {
1455- Self {
1495+ pub ( crate ) fn new ( command : BootstrapCommand ) -> Result < Self , io :: Error > {
1496+ Ok ( Self {
14561497 command,
14571498 children : Arc :: new ( tokio:: sync:: Mutex :: new ( HashMap :: new ( ) ) ) ,
14581499 pid_table : Arc :: new ( std:: sync:: Mutex :: new ( HashMap :: new ( ) ) ) ,
1459- }
1500+ socket_dir : tempfile:: tempdir ( ) ?,
1501+ } )
14601502 }
14611503
14621504 /// The bootstrap command used to launch processes.
@@ -1628,6 +1670,7 @@ impl ProcManager for BootstrapProcManager {
16281670 proc_id : proc_id. clone ( ) ,
16291671 backend_addr,
16301672 callback_addr,
1673+ socket_dir_path : self . socket_dir . path ( ) . to_owned ( ) ,
16311674 config : Some ( cfg) ,
16321675 } ;
16331676 let mut cmd = Command :: new ( & self . command . program ) ;
@@ -2062,6 +2105,7 @@ mod tests {
20622105 proc_id : id ! ( foo[ 0 ] ) ,
20632106 backend_addr : ChannelAddr :: any ( ChannelTransport :: Tcp ) ,
20642107 callback_addr : ChannelAddr :: any ( ChannelTransport :: Unix ) ,
2108+ socket_dir_path : PathBuf :: from ( "notexist" ) ,
20652109 config : None ,
20662110 } ,
20672111 ] ;
@@ -2119,13 +2163,16 @@ mod tests {
21192163 attrs[ MESH_TAIL_LOG_LINES ] = 123 ;
21202164 attrs[ MESH_BOOTSTRAP_ENABLE_PDEATHSIG ] = false ;
21212165
2166+ let socket_dir = tempfile:: tempdir ( ) . unwrap ( ) ;
2167+
21222168 // Proc case
21232169 {
21242170 let original = Bootstrap :: Proc {
21252171 proc_id : id ! ( foo[ 42 ] ) ,
21262172 backend_addr : ChannelAddr :: any ( ChannelTransport :: Unix ) ,
21272173 callback_addr : ChannelAddr :: any ( ChannelTransport :: Unix ) ,
21282174 config : Some ( attrs. clone ( ) ) ,
2175+ socket_dir_path : socket_dir. path ( ) . to_owned ( ) ,
21292176 } ;
21302177 let env_str = original. to_env_safe_string ( ) . expect ( "encode bootstrap" ) ;
21312178 let decoded = Bootstrap :: from_env_safe_string ( & env_str) . expect ( "decode bootstrap" ) ;
@@ -2165,14 +2212,13 @@ mod tests {
21652212 use std:: process:: Stdio ;
21662213
21672214 use tokio:: process:: Command ;
2168- use tokio:: time:: Duration ;
21692215
21702216 // Manager; program path is irrelevant for this test.
21712217 let command = BootstrapCommand {
21722218 program : PathBuf :: from ( "/bin/true" ) ,
21732219 ..Default :: default ( )
21742220 } ;
2175- let manager = BootstrapProcManager :: new ( command) ;
2221+ let manager = BootstrapProcManager :: new ( command) . unwrap ( ) ;
21762222
21772223 // Spawn a long-running child process (sleep 30) with
21782224 // kill_on_drop(true).
@@ -2552,7 +2598,7 @@ mod tests {
25522598 program : PathBuf :: from ( "/bin/true" ) ,
25532599 ..Default :: default ( )
25542600 } ;
2555- let manager = BootstrapProcManager :: new ( command) ;
2601+ let manager = BootstrapProcManager :: new ( command) . unwrap ( ) ;
25562602
25572603 // Spawn a fast-exiting child.
25582604 let mut cmd = Command :: new ( "true" ) ;
@@ -2586,7 +2632,7 @@ mod tests {
25862632 program : PathBuf :: from ( "/bin/sleep" ) ,
25872633 ..Default :: default ( )
25882634 } ;
2589- let manager = BootstrapProcManager :: new ( command) ;
2635+ let manager = BootstrapProcManager :: new ( command) . unwrap ( ) ;
25902636
25912637 // Spawn a process that will live long enough to kill.
25922638 let mut cmd = Command :: new ( "/bin/sleep" ) ;
@@ -2703,7 +2749,8 @@ mod tests {
27032749 let manager = BootstrapProcManager :: new ( BootstrapCommand {
27042750 program : PathBuf :: from ( "/bin/true" ) ,
27052751 ..Default :: default ( )
2706- } ) ;
2752+ } )
2753+ . unwrap ( ) ;
27072754 let unknown = ProcId :: Direct ( ChannelAddr :: any ( ChannelTransport :: Unix ) , "nope" . into ( ) ) ;
27082755 assert ! ( manager. status( & unknown) . await . is_none( ) ) ;
27092756 }
@@ -2713,7 +2760,8 @@ mod tests {
27132760 let manager = BootstrapProcManager :: new ( BootstrapCommand {
27142761 program : PathBuf :: from ( "/bin/sleep" ) ,
27152762 ..Default :: default ( )
2716- } ) ;
2763+ } )
2764+ . unwrap ( ) ;
27172765
27182766 // Long-ish child so it's alive while we "steal" it.
27192767 let mut cmd = Command :: new ( "/bin/sleep" ) ;
@@ -2752,7 +2800,8 @@ mod tests {
27522800 let manager = BootstrapProcManager :: new ( BootstrapCommand {
27532801 program : PathBuf :: from ( "/bin/sleep" ) ,
27542802 ..Default :: default ( )
2755- } ) ;
2803+ } )
2804+ . unwrap ( ) ;
27562805
27572806 let mut cmd = Command :: new ( "/bin/sleep" ) ;
27582807 cmd. arg ( "5" ) . stdout ( Stdio :: null ( ) ) . stderr ( Stdio :: null ( ) ) ;
@@ -3105,8 +3154,6 @@ mod tests {
31053154 instance : & hyperactor:: Instance < ( ) > ,
31063155 _tag : & str ,
31073156 ) -> ( ProcId , ChannelAddr ) {
3108- let proc_id = id ! ( bootstrap_child[ 0 ] ) ;
3109-
31103157 // Serve a Unix channel as the "backend_addr" and hook it into
31113158 // this test proc.
31123159 let ( backend_addr, rx) = channel:: serve ( ChannelAddr :: any ( ChannelTransport :: Unix ) ) . unwrap ( ) ;
@@ -3116,6 +3163,9 @@ mod tests {
31163163 // router.
31173164 instance. proc ( ) . clone ( ) . serve ( rx) ;
31183165
3166+ // We return an arbitrary (but unbound!) unix direct proc id here;
3167+ // it is okay, as we're not testing connectivity.
3168+ let proc_id = ProcId :: Direct ( ChannelTransport :: Unix . any ( ) , "test" . to_string ( ) ) ;
31193169 ( proc_id, backend_addr)
31203170 }
31213171
@@ -3127,7 +3177,7 @@ mod tests {
31273177 . unwrap ( ) ;
31283178 let ( instance, _handle) = root. instance ( "client" ) . unwrap ( ) ;
31293179
3130- let mgr = BootstrapProcManager :: new ( BootstrapCommand :: test ( ) ) ;
3180+ let mgr = BootstrapProcManager :: new ( BootstrapCommand :: test ( ) ) . unwrap ( ) ;
31313181 let ( proc_id, backend_addr) = make_proc_id_and_backend_addr ( & instance, "t_term" ) . await ;
31323182 let handle = mgr
31333183 . spawn ( proc_id. clone ( ) , backend_addr. clone ( ) )
@@ -3183,7 +3233,7 @@ mod tests {
31833233 . unwrap ( ) ;
31843234 let ( instance, _handle) = root. instance ( "client" ) . unwrap ( ) ;
31853235
3186- let mgr = BootstrapProcManager :: new ( BootstrapCommand :: test ( ) ) ;
3236+ let mgr = BootstrapProcManager :: new ( BootstrapCommand :: test ( ) ) . unwrap ( ) ;
31873237
31883238 // Proc identity + host backend channel the child will dial.
31893239 let ( proc_id, backend_addr) = make_proc_id_and_backend_addr ( & instance, "t_kill" ) . await ;
@@ -3382,7 +3432,8 @@ mod tests {
33823432 let manager = BootstrapProcManager :: new ( BootstrapCommand {
33833433 program : std:: path:: PathBuf :: from ( "/bin/true" ) , // unused in this test
33843434 ..Default :: default ( )
3385- } ) ;
3435+ } )
3436+ . unwrap ( ) ;
33863437 manager. spawn_exit_monitor ( proc_id. clone ( ) , handle. clone ( ) ) ;
33873438
33883439 // Await terminal status and assert on exit code and stderr
0 commit comments