From 5330e2f22069fc33d225462c8692a6a5ced34dcc Mon Sep 17 00:00:00 2001 From: Marius Eriksen Date: Tue, 28 Oct 2025 18:49:25 -0700 Subject: [PATCH] [hyperactor] don't pass in "reason" to channel::serve; instead rely on span contexts We should be using spans much more, as they give us a cheap way to inject context. We ensure that all channel::serve sites have good spans. Differential Revision: [D85727450](https://our.internmc.facebook.com/intern/diff/D85727450/) **NOTE FOR REVIEWERS**: This PR has internal Meta-specific changes or comments, please review them on [Phabricator](https://our.internmc.facebook.com/intern/diff/D85727450/)! [ghstack-poisoned] --- controller/src/lib.rs | 2 +- hyperactor/benches/main.rs | 8 ++-- hyperactor/example/channel.rs | 13 ++---- hyperactor/src/channel.rs | 13 +++--- hyperactor/src/host.rs | 25 +++++------ hyperactor/src/mailbox.rs | 5 +-- hyperactor/src/proc.rs | 22 ++++++---- hyperactor_mesh/src/actor_mesh.rs | 2 +- hyperactor_mesh/src/alloc.rs | 8 ++-- hyperactor_mesh/src/alloc/local.rs | 5 +-- hyperactor_mesh/src/alloc/process.rs | 7 +-- hyperactor_mesh/src/alloc/remoteprocess.rs | 41 +++++++++--------- hyperactor_mesh/src/bootstrap.rs | 47 ++++++++++++++------- hyperactor_mesh/src/bootstrap/mailbox.rs | 4 +- hyperactor_mesh/src/logging.rs | 47 ++++++++++----------- hyperactor_mesh/src/proc_mesh.rs | 10 ++--- hyperactor_mesh/src/router.rs | 3 +- hyperactor_mesh/src/v1/proc_mesh.rs | 15 ++++--- hyperactor_multiprocess/src/proc_actor.rs | 2 +- hyperactor_multiprocess/src/system.rs | 5 ++- hyperactor_multiprocess/src/system_actor.rs | 5 +-- 21 files changed, 144 insertions(+), 145 deletions(-) diff --git a/controller/src/lib.rs b/controller/src/lib.rs index f6417643e..0dac1e42c 100644 --- a/controller/src/lib.rs +++ b/controller/src/lib.rs @@ -1573,7 +1573,7 @@ mod tests { // Set up a local actor. 
let local_proc_id = world_id.proc_id(rank); let (local_proc_addr, local_proc_rx) = - channel::serve(ChannelAddr::any(ChannelTransport::Local), "mock_proc_actor").unwrap(); + channel::serve(ChannelAddr::any(ChannelTransport::Local)).unwrap(); let local_proc_mbox = Mailbox::new_detached( local_proc_id.actor_id(format!("test_dummy_proc{}", idx).to_string(), 0), ); diff --git a/hyperactor/benches/main.rs b/hyperactor/benches/main.rs index dc15eec8c..bae362d65 100644 --- a/hyperactor/benches/main.rs +++ b/hyperactor/benches/main.rs @@ -87,7 +87,7 @@ fn bench_message_sizes(c: &mut Criterion) { assert!(!socket_addr.ip().is_loopback()); } - let (listen_addr, mut rx) = serve::(addr, "bench").unwrap(); + let (listen_addr, mut rx) = serve::(addr).unwrap(); let tx = dial::(listen_addr).unwrap(); let msg = Message::new(0, size); let start = Instant::now(); @@ -127,7 +127,7 @@ fn bench_message_rates(c: &mut Criterion) { b.iter_custom(|iters| async move { let total_msgs = iters * rate; let addr = ChannelAddr::any(transport.clone()); - let (listen_addr, mut rx) = serve::(addr, "bench").unwrap(); + let (listen_addr, mut rx) = serve::(addr).unwrap(); tokio::spawn(async move { let mut received_count = 0; @@ -212,9 +212,9 @@ async fn channel_ping_pong( struct Message(Part); let (client_addr, mut client_rx) = - channel::serve::(ChannelAddr::any(transport.clone()), "ping_pong_client").unwrap(); + channel::serve::(ChannelAddr::any(transport.clone())).unwrap(); let (server_addr, mut server_rx) = - channel::serve::(ChannelAddr::any(transport.clone()), "ping_pong_server").unwrap(); + channel::serve::(ChannelAddr::any(transport.clone())).unwrap(); let _server_handle: tokio::task::JoinHandle> = tokio::spawn(async move { diff --git a/hyperactor/example/channel.rs b/hyperactor/example/channel.rs index 32872d176..97f3d2955 100644 --- a/hyperactor/example/channel.rs +++ b/hyperactor/example/channel.rs @@ -64,11 +64,8 @@ async fn client( ) -> anyhow::Result<()> { let server_tx = 
channel::dial(server_addr)?; - let (client_addr, mut client_rx) = channel::serve::( - ChannelAddr::any(server_tx.addr().transport().clone()), - "example", - ) - .unwrap(); + let (client_addr, mut client_rx) = + channel::serve::(ChannelAddr::any(server_tx.addr().transport().clone())).unwrap(); server_tx.post(Message::Hello(client_addr)); @@ -167,8 +164,7 @@ async fn main() -> Result<(), anyhow::Error> { match args.command { Some(Commands::Server) => { let (server_addr, server_rx) = - channel::serve::(ChannelAddr::any(args.transport.clone()), "example") - .unwrap(); + channel::serve::(ChannelAddr::any(args.transport.clone())).unwrap(); eprintln!("server listening on {}", server_addr); server(server_rx).await?; } @@ -180,8 +176,7 @@ async fn main() -> Result<(), anyhow::Error> { // No command: run a self-contained benchmark. None => { let (server_addr, server_rx) = - channel::serve::(ChannelAddr::any(args.transport.clone()), "example") - .unwrap(); + channel::serve::(ChannelAddr::any(args.transport.clone())).unwrap(); let _server_handle = tokio::spawn(server(server_rx)); let client_handle = tokio::spawn(client(server_addr, args.message_size, args.num_iter)); diff --git a/hyperactor/src/channel.rs b/hyperactor/src/channel.rs index 65533713b..0c611a204 100644 --- a/hyperactor/src/channel.rs +++ b/hyperactor/src/channel.rs @@ -840,7 +840,6 @@ pub fn dial(addr: ChannelAddr) -> Result, Channel #[crate::instrument] pub fn serve( addr: ChannelAddr, - reason: &str, ) -> Result<(ChannelAddr, ChannelRx), ChannelError> { match addr { ChannelAddr::Tcp(addr) => { @@ -871,9 +870,7 @@ pub fn serve( .map(|(addr, inner)| { tracing::debug!( name = "serve", - "serving channel address {} for {}", - addr, - reason + %addr, ); (addr, ChannelRx { inner }) }) @@ -1051,7 +1048,7 @@ mod tests { #[tokio::test] async fn test_multiple_connections() { for addr in ChannelTransport::all().map(ChannelAddr::any) { - let (listen_addr, mut rx) = crate::channel::serve::(addr, "test").unwrap(); + let 
(listen_addr, mut rx) = crate::channel::serve::(addr).unwrap(); let mut sends: JoinSet<()> = JoinSet::new(); for message in 0u64..100u64 { @@ -1090,7 +1087,7 @@ mod tests { continue; } - let (listen_addr, rx) = crate::channel::serve::(addr, "test").unwrap(); + let (listen_addr, rx) = crate::channel::serve::(addr).unwrap(); let tx = dial::(listen_addr).unwrap(); tx.try_post(123, oneshot::channel().0).unwrap(); @@ -1139,7 +1136,7 @@ mod tests { #[cfg_attr(not(feature = "fb"), ignore)] async fn test_dial_serve() { for addr in addrs() { - let (listen_addr, mut rx) = crate::channel::serve::(addr, "test").unwrap(); + let (listen_addr, mut rx) = crate::channel::serve::(addr).unwrap(); let tx = crate::channel::dial(listen_addr).unwrap(); tx.try_post(123, oneshot::channel().0).unwrap(); assert_eq!(rx.recv().await.unwrap(), 123); @@ -1159,7 +1156,7 @@ mod tests { ); let _guard2 = config.override_key(crate::config::MESSAGE_ACK_EVERY_N_MESSAGES, 1); for addr in addrs() { - let (listen_addr, mut rx) = crate::channel::serve::(addr, "test").unwrap(); + let (listen_addr, mut rx) = crate::channel::serve::(addr).unwrap(); let tx = crate::channel::dial(listen_addr).unwrap(); tx.send(123).await.unwrap(); assert_eq!(rx.recv().await.unwrap(), 123); diff --git a/hyperactor/src/host.rs b/hyperactor/src/host.rs index e8bf9bfa9..745346ee6 100644 --- a/hyperactor/src/host.rs +++ b/hyperactor/src/host.rs @@ -130,11 +130,12 @@ impl Host { /// Serve a host using the provided ProcManager, on the provided `addr`. /// On success, the host will multiplex messages for procs on the host /// on the address of the host. 
+ #[tracing::instrument(skip(manager))] pub async fn serve( manager: M, addr: ChannelAddr, ) -> Result<(Self, MailboxServerHandle), HostError> { - let (frontend_addr, frontend_rx) = channel::serve(addr, "host frontend")?; + let (frontend_addr, frontend_rx) = channel::serve(addr)?; // We set up a cascade of routers: first, the outer router supports // sending to the the system proc, while the dial router manages dialed @@ -143,8 +144,7 @@ impl Host { // Establish a backend channel on the preferred transport. We currently simply // serve the same router on both. - let (backend_addr, backend_rx) = - channel::serve(ChannelAddr::any(manager.transport()), "host backend")?; + let (backend_addr, backend_rx) = channel::serve(ChannelAddr::any(manager.transport()))?; // Set up a system proc. This is often used to manage the host itself. let service_proc_id = ProcId::Direct(frontend_addr.clone(), "service".to_string()); @@ -854,6 +854,7 @@ where ChannelTransport::Local } + #[tracing::instrument(skip(self, _config))] async fn spawn( &self, proc_id: ProcId, @@ -865,10 +866,7 @@ where proc_id.clone(), MailboxClient::dial(forwarder_addr)?.into_boxed(), ); - let (proc_addr, rx) = channel::serve( - ChannelAddr::any(transport), - &format!("LocalProcManager spawning: {}", &proc_id), - )?; + let (proc_addr, rx) = channel::serve(ChannelAddr::any(transport))?; self.procs .lock() .await @@ -1033,16 +1031,15 @@ where ChannelTransport::Unix } + #[tracing::instrument(skip(self, _config))] async fn spawn( &self, proc_id: ProcId, forwarder_addr: ChannelAddr, _config: (), ) -> Result { - let (callback_addr, mut callback_rx) = channel::serve( - ChannelAddr::any(ChannelTransport::Unix), - &format!("ProcessProcManager spawning: {}", &proc_id), - )?; + let (callback_addr, mut callback_rx) = + channel::serve(ChannelAddr::any(ChannelTransport::Unix))?; let mut cmd = Command::new(&self.program); cmd.env("HYPERACTOR_HOST_PROC_ID", proc_id.to_string()); @@ -1128,6 +1125,7 @@ where /// forwarding 
messages to the provided `backend_addr`, /// and returning the proc's address and agent actor on /// the provided `callback_addr`. +#[tracing::instrument(skip(spawn))] pub async fn spawn_proc( proc_id: ProcId, backend_addr: ChannelAddr, @@ -1153,10 +1151,7 @@ where // Finally serve the proc on the same transport as the backend address, // and call back. - let (proc_addr, proc_rx) = channel::serve( - ChannelAddr::any(backend_transport), - &format!("proc addr of: {}", &proc_id), - )?; + let (proc_addr, proc_rx) = channel::serve(ChannelAddr::any(backend_transport))?; proc.clone().serve(proc_rx); channel::dial(callback_addr)? .send((proc_addr, agent_handle.bind::())) diff --git a/hyperactor/src/mailbox.rs b/hyperactor/src/mailbox.rs index ca6db2a7b..d59e36ce3 100644 --- a/hyperactor/src/mailbox.rs +++ b/hyperactor/src/mailbox.rs @@ -2868,7 +2868,7 @@ mod tests { .unwrap(), ); - let (_, rx) = serve::(ChannelAddr::Sim(dst_addr.clone()), "test").unwrap(); + let (_, rx) = serve::(ChannelAddr::Sim(dst_addr.clone())).unwrap(); let tx = dial::(src_to_dst).unwrap(); let mbox = Mailbox::new_detached(id!(test[0].actor0)); let serve_handle = mbox.clone().serve(rx); @@ -2997,8 +2997,7 @@ mod tests { let mut handles = Vec::new(); // hold on to handles, or channels get closed for mbox in mailboxes.iter() { - let (addr, rx) = - channel::serve(ChannelAddr::any(ChannelTransport::Local), "test").unwrap(); + let (addr, rx) = channel::serve(ChannelAddr::any(ChannelTransport::Local)).unwrap(); let handle = (*mbox).clone().serve(rx); handles.push(handle); diff --git a/hyperactor/src/proc.rs b/hyperactor/src/proc.rs index 5c2c8668b..014a3d368 100644 --- a/hyperactor/src/proc.rs +++ b/hyperactor/src/proc.rs @@ -46,6 +46,7 @@ use tokio::sync::mpsc; use tokio::sync::watch; use tokio::task::JoinHandle; use tracing::Instrument; +use tracing::Level; use uuid::Uuid; use crate as hyperactor; @@ -333,8 +334,9 @@ impl Proc { } /// Create a new direct-addressed proc. 
+ #[tracing::instrument] pub async fn direct(addr: ChannelAddr, name: String) -> Result { - let (addr, rx) = channel::serve(addr, &format!("creating Proc::direct: {}", name))?; + let (addr, rx) = channel::serve(addr)?; let proc_id = ProcId::Direct(addr, name); let proc = Self::new(proc_id, DialMailboxRouter::new().into_boxed()); proc.clone().serve(rx); @@ -342,15 +344,13 @@ impl Proc { } /// Create a new direct-addressed proc with a default sender for the forwarder. + #[tracing::instrument(skip(default))] pub fn direct_with_default( addr: ChannelAddr, name: String, default: BoxedMailboxSender, ) -> Result { - let (addr, rx) = channel::serve( - addr, - &format!("creating Proc::direct_with_default: {}", name), - )?; + let (addr, rx) = channel::serve(addr)?; let proc_id = ProcId::Direct(addr, name); let proc = Self::new( proc_id, @@ -494,15 +494,18 @@ impl Proc { params: A::Params, ) -> Result, anyhow::Error> { let actor_id = self.allocate_root_id(name)?; - let _ = tracing::debug_span!( + let span = tracing::span!( + Level::INFO, "spawn_actor", actor_name = name, actor_type = std::any::type_name::(), actor_id = actor_id.to_string(), ); - let (instance, mut actor_loop_receivers, work_rx) = - Instance::new(self.clone(), actor_id.clone(), false, None); - let actor = A::new(params).await?; + let (instance, mut actor_loop_receivers, work_rx) = { + let _guard = span.clone().entered(); + Instance::new(self.clone(), actor_id.clone(), false, None) + }; + let actor = A::new(params).instrument(span.clone()).await?; // Add this actor to the proc's actor ledger. We do not actively remove // inactive actors from ledger, because the actor's state can be inferred // from its weak cell. 
@@ -512,6 +515,7 @@ impl Proc { instance .start(actor, actor_loop_receivers.take().unwrap(), work_rx) + .instrument(span) .await } diff --git a/hyperactor_mesh/src/actor_mesh.rs b/hyperactor_mesh/src/actor_mesh.rs index 66b13a336..a68a57c59 100644 --- a/hyperactor_mesh/src/actor_mesh.rs +++ b/hyperactor_mesh/src/actor_mesh.rs @@ -1731,7 +1731,7 @@ mod tests { let config = hyperactor::config::global::lock(); let _guard = config.override_key(MAX_CAST_DIMENSION_SIZE, 2); - let (_, mut rx) = serve::(addr, "test").unwrap(); + let (_, mut rx) = serve::(addr).unwrap(); let expected_ranks = selection .eval( diff --git a/hyperactor_mesh/src/alloc.rs b/hyperactor_mesh/src/alloc.rs index 2eb7d45d2..f66097b87 100644 --- a/hyperactor_mesh/src/alloc.rs +++ b/hyperactor_mesh/src/alloc.rs @@ -517,7 +517,6 @@ impl AllocAssignedAddr { pub(crate) fn serve_with_config( self, - reason: &str, ) -> anyhow::Result<(ChannelAddr, ChannelRx)> { fn set_as_inaddr_any(original: &mut SocketAddr) { let inaddr_any: IpAddr = match &original { @@ -552,7 +551,7 @@ impl AllocAssignedAddr { } }; - let (mut bound, rx) = channel::serve(bind_to, reason)?; + let (mut bound, rx) = channel::serve(bind_to)?; // Restore the original IP address if we used INADDR_ANY. 
match &mut bound { @@ -837,14 +836,13 @@ pub(crate) mod testing { transport: ChannelTransport, ) -> (DialMailboxRouter, Instance<()>, Proc, ChannelAddr) { let (router_channel_addr, router_rx) = - channel::serve(ChannelAddr::any(transport.clone()), "test").unwrap(); + channel::serve(ChannelAddr::any(transport.clone())).unwrap(); let router = DialMailboxRouter::new_with_default((UndeliverableMailboxSender {}).into_boxed()); router.clone().serve(router_rx); let client_proc_id = ProcId::Ranked(WorldId("test_stuck".to_string()), 0); - let (client_proc_addr, client_rx) = - channel::serve(ChannelAddr::any(transport), "test").unwrap(); + let (client_proc_addr, client_rx) = channel::serve(ChannelAddr::any(transport)).unwrap(); let client_proc = Proc::new( client_proc_id.clone(), BoxedMailboxSender::new(router.clone()), diff --git a/hyperactor_mesh/src/alloc/local.rs b/hyperactor_mesh/src/alloc/local.rs index 87bfd8429..039442cae 100644 --- a/hyperactor_mesh/src/alloc/local.rs +++ b/hyperactor_mesh/src/alloc/local.rs @@ -147,10 +147,7 @@ impl Alloc for LocalAlloc { match self.todo_rx.recv().await? 
{ Action::Start(rank) => { let (addr, proc_rx) = loop { - match channel::serve( - ChannelAddr::any(self.transport()), - "LocalAlloc next proc addr", - ) { + match channel::serve(ChannelAddr::any(self.transport())) { Ok(addr_and_proc_rx) => break addr_and_proc_rx, Err(err) => { tracing::error!( diff --git a/hyperactor_mesh/src/alloc/process.rs b/hyperactor_mesh/src/alloc/process.rs index 65c918737..94d68b365 100644 --- a/hyperactor_mesh/src/alloc/process.rs +++ b/hyperactor_mesh/src/alloc/process.rs @@ -89,11 +89,8 @@ impl Allocator for ProcessAllocator { #[hyperactor::instrument(fields(name = "process_allocate", monarch_client_trace_id = spec.constraints.match_labels.get(CLIENT_TRACE_ID_LABEL).cloned().unwrap_or_else(|| "".to_string())))] async fn allocate(&mut self, spec: AllocSpec) -> Result { - let (bootstrap_addr, rx) = channel::serve( - ChannelAddr::any(ChannelTransport::Unix), - "ProcessAllocator allocate bootstrap_addr", - ) - .map_err(anyhow::Error::from)?; + let (bootstrap_addr, rx) = channel::serve(ChannelAddr::any(ChannelTransport::Unix)) + .map_err(anyhow::Error::from)?; if spec.transport == ChannelTransport::Local { return Err(AllocatorError::Other(anyhow::anyhow!( diff --git a/hyperactor_mesh/src/alloc/remoteprocess.rs b/hyperactor_mesh/src/alloc/remoteprocess.rs index 96e8addb7..f54a17fe1 100644 --- a/hyperactor_mesh/src/alloc/remoteprocess.rs +++ b/hyperactor_mesh/src/alloc/remoteprocess.rs @@ -173,6 +173,7 @@ impl RemoteProcessAllocator { /// Start a remote process allocator with given allocator listening for /// RemoteProcessAllocatorMessage on serve_addr. /// Used for testing. 
+ #[tracing::instrument(skip(self, process_allocator))] pub async fn start_with_allocator( &self, serve_addr: ChannelAddr, @@ -184,8 +185,7 @@ impl RemoteProcessAllocator { ::Alloc: Sync, { tracing::info!("starting remote allocator on: {}", serve_addr); - let (_, mut rx) = channel::serve(serve_addr.clone(), "RemoteProcessAllocator serve addr") - .map_err(anyhow::Error::from)?; + let (_, mut rx) = channel::serve(serve_addr.clone()).map_err(anyhow::Error::from)?; struct ActiveAllocation { handle: JoinHandle<()>, @@ -309,6 +309,7 @@ impl RemoteProcessAllocator { Ok(()) } + #[tracing::instrument(skip(alloc, cancel_token))] #[observe_async("RemoteProcessAllocator")] async fn handle_allocation_request( alloc: Box, @@ -320,14 +321,13 @@ impl RemoteProcessAllocator { ) { tracing::info!("handle allocation request, bootstrap_addr: {bootstrap_addr}"); // start proc message forwarder - let (forwarder_addr, forwarder_rx) = - match forwarder_addr.serve_with_config("handle_allocation_request: forwarder_addr") { - Ok(v) => v, - Err(e) => { - tracing::error!("failed to to bootstrap forwarder actor: {}", e); - return; - } - }; + let (forwarder_addr, forwarder_rx) = match forwarder_addr.serve_with_config() { + Ok(v) => v, + Err(e) => { + tracing::error!("failed to to bootstrap forwarder actor: {}", e); + return; + } + }; let router = DialMailboxRouter::new(); let mailbox_handle = router.clone().serve(forwarder_rx); tracing::info!("started forwarder on: {}", forwarder_addr); @@ -617,6 +617,7 @@ impl RemoteProcessAlloc { /// to obtain a list of allocate hosts. Then Allocate message will be sent to all /// RemoteProcessAllocator on all hosts. Heartbeats will be used to maintain health /// status of remote hosts. 
+ #[tracing::instrument(skip(initializer))] #[observe_result("RemoteProcessAlloc")] pub async fn new( spec: AllocSpec, @@ -629,7 +630,7 @@ impl RemoteProcessAlloc { None => AllocAssignedAddr::new(ChannelAddr::any(spec.transport.clone())), }; - let (bootstrap_addr, rx) = alloc_serve_addr.serve_with_config("alloc bootstrap_addr")?; + let (bootstrap_addr, rx) = alloc_serve_addr.serve_with_config()?; tracing::info!( "starting alloc for {} on: {}", @@ -1326,7 +1327,7 @@ mod test { hyperactor_telemetry::initialize_logging_for_test(); let serve_addr = ChannelAddr::any(ChannelTransport::Unix); let bootstrap_addr = ChannelAddr::any(ChannelTransport::Unix); - let (_, mut rx) = channel::serve(bootstrap_addr.clone(), "test").unwrap(); + let (_, mut rx) = channel::serve(bootstrap_addr.clone()).unwrap(); let extent = extent!(host = 1, gpu = 2); let tx = channel::dial(serve_addr.clone()).unwrap(); @@ -1482,7 +1483,7 @@ mod test { hyperactor_telemetry::initialize_logging_for_test(); let serve_addr = ChannelAddr::any(ChannelTransport::Unix); let bootstrap_addr = ChannelAddr::any(ChannelTransport::Unix); - let (_, mut rx) = channel::serve(bootstrap_addr.clone(), "test").unwrap(); + let (_, mut rx) = channel::serve(bootstrap_addr.clone()).unwrap(); let extent = extent!(host = 1, gpu = 2); let tx = channel::dial(serve_addr.clone()).unwrap(); @@ -1563,7 +1564,7 @@ mod test { hyperactor_telemetry::initialize_logging_for_test(); let serve_addr = ChannelAddr::any(ChannelTransport::Unix); let bootstrap_addr = ChannelAddr::any(ChannelTransport::Unix); - let (_, mut rx) = channel::serve(bootstrap_addr.clone(), "test").unwrap(); + let (_, mut rx) = channel::serve(bootstrap_addr.clone()).unwrap(); let extent = extent!(host = 1, gpu = 2); @@ -1702,7 +1703,7 @@ mod test { hyperactor_telemetry::initialize_logging_for_test(); let serve_addr = ChannelAddr::any(ChannelTransport::Unix); let bootstrap_addr = ChannelAddr::any(ChannelTransport::Unix); - let (_, mut rx) = 
channel::serve(bootstrap_addr.clone(), "test").unwrap(); + let (_, mut rx) = channel::serve(bootstrap_addr.clone()).unwrap(); let extent = extent!(host = 1, gpu = 2); @@ -1792,7 +1793,7 @@ mod test { hyperactor_telemetry::initialize_logging_for_test(); let serve_addr = ChannelAddr::any(ChannelTransport::Unix); let bootstrap_addr = ChannelAddr::any(ChannelTransport::Unix); - let (_, mut rx) = channel::serve(bootstrap_addr.clone(), "test").unwrap(); + let (_, mut rx) = channel::serve(bootstrap_addr.clone()).unwrap(); let extent = extent!(host = 1, gpu = 2); @@ -1894,7 +1895,7 @@ mod test { hyperactor_telemetry::initialize_logging(ClockKind::default()); let serve_addr = ChannelAddr::any(ChannelTransport::Unix); let bootstrap_addr = ChannelAddr::any(ChannelTransport::Unix); - let (_, mut rx) = channel::serve(bootstrap_addr.clone(), "test").unwrap(); + let (_, mut rx) = channel::serve(bootstrap_addr.clone()).unwrap(); let extent = extent!(host = 1, gpu = 1); let tx = channel::dial(serve_addr.clone()).unwrap(); @@ -1975,7 +1976,7 @@ mod test { hyperactor_telemetry::initialize_logging(ClockKind::default()); let serve_addr = ChannelAddr::any(ChannelTransport::Unix); let bootstrap_addr = ChannelAddr::any(ChannelTransport::Unix); - let (_, mut rx) = channel::serve(bootstrap_addr.clone(), "test").unwrap(); + let (_, mut rx) = channel::serve(bootstrap_addr.clone()).unwrap(); let extent = extent!(host = 1, gpu = 1); let tx = channel::dial(serve_addr.clone()).unwrap(); @@ -2455,7 +2456,7 @@ mod test_alloc { let hosts_per_proc_mesh = 5; let pid_addr = ChannelAddr::any(ChannelTransport::Unix); - let (pid_addr, mut pid_rx) = channel::serve::(pid_addr, "test").unwrap(); + let (pid_addr, mut pid_rx) = channel::serve::(pid_addr).unwrap(); let addresses = (0..(num_proc_meshes * hosts_per_proc_mesh)) .map(|_| ChannelAddr::any(ChannelTransport::Unix).to_string()) @@ -2477,7 +2478,7 @@ mod test_alloc { let done_allocating_addr = ChannelAddr::any(ChannelTransport::Unix); let 
(done_allocating_addr, mut done_allocating_rx) = - channel::serve::<()>(done_allocating_addr, "test").unwrap(); + channel::serve::<()>(done_allocating_addr).unwrap(); let mut remote_process_alloc = Command::new(crate::testresource::get( "monarch/hyperactor_mesh/remote_process_alloc", )) diff --git a/hyperactor_mesh/src/bootstrap.rs b/hyperactor_mesh/src/bootstrap.rs index 00ae00ac3..39a0b0264 100644 --- a/hyperactor_mesh/src/bootstrap.rs +++ b/hyperactor_mesh/src/bootstrap.rs @@ -65,6 +65,7 @@ use tokio::process::ChildStdout; use tokio::process::Command; use tokio::sync::oneshot; use tokio::sync::watch; +use tracing::Level; use crate::logging::OutputTarget; use crate::logging::StreamFwder; @@ -354,6 +355,15 @@ impl Bootstrap { socket_dir_path, config, } => { + let _span = tracing::span!( + Level::INFO, + "proc_bootstrap", + %proc_id, + %backend_addr, + %callback_addr, + socket_dir_path = %socket_dir_path.display(), + ) + .entered(); if let Some(attrs) = config { config::set(config::Source::ClientOverride, attrs); tracing::debug!("bootstrap: installed ClientOverride config snapshot (Proc)"); @@ -396,7 +406,7 @@ impl Bootstrap { // Finally serve the proc on the same transport as the backend address, // and call back. - let (proc_addr, proc_rx) = ok!(channel::serve(serve_addr, "proc_backend")); + let (proc_addr, proc_rx) = ok!(channel::serve(serve_addr)); proc.clone().serve(proc_rx); ok!(ok!(channel::dial(callback_addr)) .send((proc_addr, agent_handle.bind::())) @@ -1699,16 +1709,15 @@ impl ProcManager for BootstrapProcManager { /// Returns a [`BootstrapProcHandle`] that exposes the child /// process's lifecycle (status, wait/ready, termination). Errors /// are surfaced as [`HostError`]. 
+ #[tracing::instrument(skip(self, config))] async fn spawn( &self, proc_id: ProcId, backend_addr: ChannelAddr, config: BootstrapProcConfig, ) -> Result { - let (callback_addr, mut callback_rx) = channel::serve( - ChannelAddr::any(ChannelTransport::Unix), - &format!("BootstrapProcManager::spawn callback_addr: {}", &proc_id), - )?; + let (callback_addr, mut callback_rx) = + channel::serve(ChannelAddr::any(ChannelTransport::Unix))?; let mode = Bootstrap::Proc { proc_id: proc_id.clone(), @@ -1970,8 +1979,17 @@ async fn bootstrap_v0_proc_mesh() -> anyhow::Error { .map_err(|err| anyhow::anyhow!("read `{}`: {}", BOOTSTRAP_INDEX_ENV, err))? .parse()?; let listen_addr = ChannelAddr::any(bootstrap_addr.transport()); - let (serve_addr, mut rx) = - channel::serve(listen_addr, "bootstrap_v0_proc_mesh listen_addr")?; + + let _span = tracing::span!( + Level::INFO, + "bootstrap_v0_proc_mesh", + %bootstrap_addr, + %bootstrap_index, + %listen_addr, + ) + .entered(); + + let (serve_addr, mut rx) = channel::serve(listen_addr)?; let tx = channel::dial(bootstrap_addr.clone())?; let (rtx, mut return_channel) = oneshot::channel(); @@ -1999,14 +2017,14 @@ async fn bootstrap_v0_proc_mesh() -> anyhow::Error { } } loop { - let _ = hyperactor::tracing::info_span!("wait_for_next_message_from_mesh_agent"); + let _span = + tracing::span!(Level::INFO, "wait_for_next_message_from_mesh_agent").entered(); match the_msg? 
{ Allocator2Process::StartProc(proc_id, listen_transport) => { + let _span = + tracing::span!(Level::INFO, "Allocator2Process::StartProc", %proc_id, %listen_transport).entered(); let (proc, mesh_agent) = ProcMeshAgent::bootstrap(proc_id.clone()).await?; - let (proc_addr, proc_rx) = channel::serve( - ChannelAddr::any(listen_transport), - &format!("bootstrap_v0_proc_mesh proc_addr: {}", &proc_id,), - )?; + let (proc_addr, proc_rx) = channel::serve(ChannelAddr::any(listen_transport))?; let handle = proc.clone().serve(proc_rx); drop(handle); // linter appeasement; it is safe to drop this future tx.send(Process2Allocator( @@ -2403,7 +2421,7 @@ mod tests { let router = DialMailboxRouter::new(); let (proc_addr, proc_rx) = - channel::serve(ChannelAddr::any(ChannelTransport::Unix), "test").unwrap(); + channel::serve(ChannelAddr::any(ChannelTransport::Unix)).unwrap(); let proc = Proc::new(id!(client[0]), BoxedMailboxSender::new(router.clone())); proc.clone().serve(proc_rx); router.bind(id!(client[0]).into(), proc_addr.clone()); @@ -3244,8 +3262,7 @@ mod tests { ) -> (ProcId, ChannelAddr) { // Serve a Unix channel as the "backend_addr" and hook it into // this test proc. 
- let (backend_addr, rx) = - channel::serve(ChannelAddr::any(ChannelTransport::Unix), "test").unwrap(); + let (backend_addr, rx) = channel::serve(ChannelAddr::any(ChannelTransport::Unix)).unwrap(); // Route messages arriving on backend_addr into this test // proc's mailbox so the bootstrap child can reach the host diff --git a/hyperactor_mesh/src/bootstrap/mailbox.rs b/hyperactor_mesh/src/bootstrap/mailbox.rs index d351c0958..6d1fecc0b 100644 --- a/hyperactor_mesh/src/bootstrap/mailbox.rs +++ b/hyperactor_mesh/src/bootstrap/mailbox.rs @@ -120,18 +120,16 @@ mod tests { format!("unix:{}/first", dir.path().display()) .parse() .unwrap(), - "test", ) .unwrap(); let (second_addr, mut second_rx) = channel::serve::( format!("unix:{}/second", dir.path().display()) .parse() .unwrap(), - "test", ) .unwrap(); let (backend_addr, mut backend_rx) = - channel::serve::(ChannelTransport::Unix.any(), "test").unwrap(); + channel::serve::(ChannelTransport::Unix.any()).unwrap(); let local_addr: ChannelAddr = "tcp:3.4.5.6:123".parse().unwrap(); let first_actor_id = ActorId( diff --git a/hyperactor_mesh/src/logging.rs b/hyperactor_mesh/src/logging.rs index 8aef2f821..338515cf4 100644 --- a/hyperactor_mesh/src/logging.rs +++ b/hyperactor_mesh/src/logging.rs @@ -63,6 +63,7 @@ use tokio::sync::Notify; use tokio::sync::RwLock; use tokio::sync::watch::Receiver; use tokio::task::JoinHandle; +use tracing::Level; use crate::bootstrap::BOOTSTRAP_LOG_CHANNEL; use crate::shortuuid::ShortUuid; @@ -472,14 +473,14 @@ impl FileAppender { return None; } }; - let (stdout_addr, stdout_rx) = match channel::serve( - ChannelAddr::any(ChannelTransport::Unix), - "FileAppender stdout_addr", - ) { - Ok((addr, rx)) => (addr, rx), - Err(e) => { - tracing::warn!("failed to serve stdout channel: {}", e); - return None; + let (stdout_addr, stdout_rx) = { + let _guard = tracing::span!(Level::INFO, "appender", file = "stdout").entered(); + match channel::serve(ChannelAddr::any(ChannelTransport::Unix)) { + Ok((addr, rx)) 
=> (addr, rx), + Err(e) => { + tracing::warn!("failed to serve stdout channel: {}", e); + return None; + } } }; let stdout_stop = stop.clone(); @@ -499,14 +500,14 @@ impl FileAppender { return None; } }; - let (stderr_addr, stderr_rx) = match channel::serve( - ChannelAddr::any(ChannelTransport::Unix), - "FileAppender stderr_addr", - ) { - Ok((addr, rx)) => (addr, rx), - Err(e) => { - tracing::warn!("failed to serve stderr channel: {}", e); - return None; + let (stderr_addr, stderr_rx) = { + let _guard = tracing::span!(Level::INFO, "appender", file = "stderr").entered(); + match channel::serve(ChannelAddr::any(ChannelTransport::Unix)) { + Ok((addr, rx)) => (addr, rx), + Err(e) => { + tracing::warn!("failed to serve stderr channel: {}", e); + return None; + } } }; let stderr_stop = stop.clone(); @@ -1115,7 +1116,7 @@ impl Actor for LogForwardActor { log_channel ); - let rx = match channel::serve(log_channel.clone(), "LogForwardActor") { + let rx = match channel::serve(log_channel.clone()) { Ok((_, rx)) => rx, Err(err) => { // This can happen if we are not spanwed on a separate process like local. @@ -1125,11 +1126,7 @@ impl Actor for LogForwardActor { log_channel, err ); - channel::serve( - ChannelAddr::any(ChannelTransport::Unix), - "LogForwardActor Unix fallback", - )? 
- .1 + channel::serve(ChannelAddr::any(ChannelTransport::Unix))?.1 } }; @@ -1576,7 +1573,7 @@ mod tests { // Setup the basics let router = DialMailboxRouter::new(); let (proc_addr, client_rx) = - channel::serve(ChannelAddr::any(ChannelTransport::Unix), "test").unwrap(); + channel::serve(ChannelAddr::any(ChannelTransport::Unix)).unwrap(); let proc = Proc::new(id!(client[0]), BoxedMailboxSender::new(router.clone())); proc.clone().serve(client_rx); router.bind(id!(client[0]).into(), proc_addr.clone()); @@ -1790,7 +1787,7 @@ mod tests { let (mut writer, reader) = tokio::io::duplex(1024); let (log_channel, mut rx) = - channel::serve::(ChannelAddr::any(ChannelTransport::Unix), "test").unwrap(); + channel::serve::(ChannelAddr::any(ChannelTransport::Unix)).unwrap(); // Create a temporary file for testing the writer let temp_file = tempfile::NamedTempFile::new().unwrap(); @@ -1923,7 +1920,7 @@ mod tests { #[tokio::test] async fn test_local_log_sender_inactive_status() { let (log_channel, _) = - channel::serve::(ChannelAddr::any(ChannelTransport::Unix), "test").unwrap(); + channel::serve::(ChannelAddr::any(ChannelTransport::Unix)).unwrap(); let mut sender = LocalLogSender::new(log_channel, 12345).unwrap(); // This test verifies that the sender handles inactive status gracefully diff --git a/hyperactor_mesh/src/proc_mesh.rs b/hyperactor_mesh/src/proc_mesh.rs index 9c855316e..596a2aeb2 100644 --- a/hyperactor_mesh/src/proc_mesh.rs +++ b/hyperactor_mesh/src/proc_mesh.rs @@ -273,6 +273,7 @@ impl ProcMesh { C.get_or_init(|| AtomicUsize::new(0)) } + #[tracing::instrument(skip_all)] #[hyperactor::observe_result("ProcMesh")] async fn allocate_boxed_inner( mut alloc: Box, @@ -314,11 +315,8 @@ impl ProcMesh { // everything else, so now the whole mesh should be able to communicate. 
let client_proc_id = ProcId::Ranked(WorldId(format!("{}_client", alloc.world_id().name())), 0); - let (client_proc_addr, client_rx) = channel::serve( - ChannelAddr::any(alloc.transport()), - &format!("client_proc_addr: {}", &client_proc_id), - ) - .map_err(|err| AllocatorError::Other(err.into()))?; + let (client_proc_addr, client_rx) = channel::serve(ChannelAddr::any(alloc.transport())) + .map_err(|err| AllocatorError::Other(err.into()))?; tracing::info!( name = "ProcMesh::Allocate::ChannelServe", alloc_id = alloc_id, @@ -383,7 +381,7 @@ impl ProcMesh { // Ensure that the router is served so that agents may reach us. let (router_channel_addr, router_rx) = alloc .client_router_addr() - .serve_with_config("client_router_addr") + .serve_with_config() .map_err(AllocatorError::Other)?; router.serve(router_rx); tracing::info!("router channel started listening on addr: {router_channel_addr}"); diff --git a/hyperactor_mesh/src/router.rs b/hyperactor_mesh/src/router.rs index 3fab6b931..dba6ec480 100644 --- a/hyperactor_mesh/src/router.rs +++ b/hyperactor_mesh/src/router.rs @@ -57,13 +57,14 @@ impl Router { /// Servers are memoized, and we maintain only one per transport; thus /// subsequent calls using the same transport will return the same address. 
#[allow(dead_code)] + #[tracing::instrument(skip(self))] pub async fn serve(&self, transport: &ChannelTransport) -> Result { let mut servers = self.servers.lock().await; if let Some(addr) = servers.get(transport) { return Ok(addr.clone()); } - let (addr, rx) = channel::serve(ChannelAddr::any(transport.clone()), "Router::serve")?; + let (addr, rx) = channel::serve(ChannelAddr::any(transport.clone()))?; self.router.clone().serve(rx); servers.insert(transport.clone(), addr.clone()); Ok(addr) diff --git a/hyperactor_mesh/src/v1/proc_mesh.rs b/hyperactor_mesh/src/v1/proc_mesh.rs index e2c2b8764..e53f5d655 100644 --- a/hyperactor_mesh/src/v1/proc_mesh.rs +++ b/hyperactor_mesh/src/v1/proc_mesh.rs @@ -46,6 +46,7 @@ use ndslice::view::Region; use serde::Deserialize; use serde::Serialize; use tokio::sync::Notify; +use tracing::Level; use crate::CommActor; use crate::alloc::Alloc; @@ -281,6 +282,7 @@ impl ProcMesh { /// Allocate a new ProcMesh from the provided alloc. /// Allocate does not require an owning actor because references are not owned. 
+ #[tracing::instrument(skip_all)] pub async fn allocate( cx: &impl context::Actor, mut alloc: Box, @@ -296,11 +298,14 @@ impl ProcMesh { let proc = cx.instance().proc(); // First make sure we can serve the proc: - let (proc_channel_addr, rx) = channel::serve( - ChannelAddr::any(alloc.transport()), - &format!("proc_channel_addr for {}", proc.proc_id()), - )?; - proc.clone().serve(rx); + let proc_channel_addr = { + let _guard = + tracing::span!(Level::INFO, "allocate_serve_proc", proc_id = %proc.proc_id()) + .entered(); + let (addr, rx) = channel::serve(ChannelAddr::any(alloc.transport()))?; + proc.clone().serve(rx); + addr + }; let bind_allocated_procs = |router: &DialMailboxRouter| { // Route all of the allocated procs: diff --git a/hyperactor_multiprocess/src/proc_actor.rs b/hyperactor_multiprocess/src/proc_actor.rs index 972b5e335..8d35889eb 100644 --- a/hyperactor_multiprocess/src/proc_actor.rs +++ b/hyperactor_multiprocess/src/proc_actor.rs @@ -417,7 +417,7 @@ impl ProcActor { labels: HashMap, lifecycle_mode: ProcLifecycleMode, ) -> Result { - let (local_addr, rx) = channel::serve(listen_addr, "bootstrap_for_proc")?; + let (local_addr, rx) = channel::serve(listen_addr)?; let mailbox_handle = proc.clone().serve(rx); let (state_tx, mut state_rx) = watch::channel(ProcState::AwaitingJoin); diff --git a/hyperactor_multiprocess/src/system.rs b/hyperactor_multiprocess/src/system.rs index ad9dc622b..6b48a89db 100644 --- a/hyperactor_multiprocess/src/system.rs +++ b/hyperactor_multiprocess/src/system.rs @@ -45,6 +45,7 @@ impl System { /// Spawns a system actor and serves it at the provided channel /// address. This becomes a well-known address with which procs /// can bootstrap. 
+ #[tracing::instrument] pub async fn serve( addr: ChannelAddr, supervision_update_timeout: tokio::time::Duration, @@ -55,7 +56,7 @@ impl System { let (actor_handle, system_proc) = SystemActor::bootstrap_with_clock(params, clock).await?; actor_handle.bind::(); - let (local_addr, rx) = channel::serve(addr, "System::serve")?; + let (local_addr, rx) = channel::serve(addr)?; let mailbox_handle = system_proc.clone().serve(rx); Ok(ServerHandle { @@ -91,7 +92,7 @@ impl System { ); let (proc_addr, proc_rx) = - channel::serve(ChannelAddr::any(self.addr.transport()), "system").unwrap(); + channel::serve(ChannelAddr::any(self.addr.transport())).unwrap(); let _proc_serve_handle: MailboxServerHandle = proc.clone().serve(proc_rx); diff --git a/hyperactor_multiprocess/src/system_actor.rs b/hyperactor_multiprocess/src/system_actor.rs index b1e6c6b80..51bdfc05f 100644 --- a/hyperactor_multiprocess/src/system_actor.rs +++ b/hyperactor_multiprocess/src/system_actor.rs @@ -1878,8 +1878,7 @@ mod tests { host_id, ); let (local_proc_addr, local_proc_rx) = - channel::serve::(ChannelAddr::any(ChannelTransport::Local), "test") - .unwrap(); + channel::serve::(ChannelAddr::any(ChannelTransport::Local)).unwrap(); let local_proc_mbox = Mailbox::new_detached(local_proc_id.actor_id("test".to_string(), 0)); let (local_proc_message_port, local_proc_message_receiver) = local_proc_mbox.open_port(); let _local_proc_serve_handle = local_proc_mbox.clone().serve(local_proc_rx); @@ -2385,7 +2384,7 @@ mod tests { let src_id = id!(proc[0].actor); let src_addr = ChannelAddr::Sim(SimAddr::new("unix!@src".parse().unwrap()).unwrap()); let dst_addr = ChannelAddr::Sim(SimAddr::new("unix!@dst".parse().unwrap()).unwrap()); - let (_, mut rx) = channel::serve::(src_addr.clone(), "test").unwrap(); + let (_, mut rx) = channel::serve::(src_addr.clone()).unwrap(); let router = ReportingRouter::new();