Skip to content

Commit 2be2e1d

Browse files
committed
[hyperactor] don't pass in "reason" to channel::serve; instead rely on span contexts
Pull Request resolved: #1690 We should be using spans much more, as they give us a cheap way to inject context. We ensure that all channel::serve sites have good spans. Differential Revision: [D85727450](https://our.internmc.facebook.com/intern/diff/D85727450/) **NOTE FOR REVIEWERS**: This PR has internal Meta-specific changes or comments, please review them on [Phabricator](https://our.internmc.facebook.com/intern/diff/D85727450/)! ghstack-source-id: 319513895
1 parent 141bc9a commit 2be2e1d

File tree

21 files changed

+144
-145
lines changed

21 files changed

+144
-145
lines changed

controller/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1573,7 +1573,7 @@ mod tests {
15731573
// Set up a local actor.
15741574
let local_proc_id = world_id.proc_id(rank);
15751575
let (local_proc_addr, local_proc_rx) =
1576-
channel::serve(ChannelAddr::any(ChannelTransport::Local), "mock_proc_actor").unwrap();
1576+
channel::serve(ChannelAddr::any(ChannelTransport::Local)).unwrap();
15771577
let local_proc_mbox = Mailbox::new_detached(
15781578
local_proc_id.actor_id(format!("test_dummy_proc{}", idx).to_string(), 0),
15791579
);

hyperactor/benches/main.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ fn bench_message_sizes(c: &mut Criterion) {
8787
assert!(!socket_addr.ip().is_loopback());
8888
}
8989

90-
let (listen_addr, mut rx) = serve::<Message>(addr, "bench").unwrap();
90+
let (listen_addr, mut rx) = serve::<Message>(addr).unwrap();
9191
let tx = dial::<Message>(listen_addr).unwrap();
9292
let msg = Message::new(0, size);
9393
let start = Instant::now();
@@ -127,7 +127,7 @@ fn bench_message_rates(c: &mut Criterion) {
127127
b.iter_custom(|iters| async move {
128128
let total_msgs = iters * rate;
129129
let addr = ChannelAddr::any(transport.clone());
130-
let (listen_addr, mut rx) = serve::<Message>(addr, "bench").unwrap();
130+
let (listen_addr, mut rx) = serve::<Message>(addr).unwrap();
131131
tokio::spawn(async move {
132132
let mut received_count = 0;
133133

@@ -212,9 +212,9 @@ async fn channel_ping_pong(
212212
struct Message(Part);
213213

214214
let (client_addr, mut client_rx) =
215-
channel::serve::<Message>(ChannelAddr::any(transport.clone()), "ping_pong_client").unwrap();
215+
channel::serve::<Message>(ChannelAddr::any(transport.clone())).unwrap();
216216
let (server_addr, mut server_rx) =
217-
channel::serve::<Message>(ChannelAddr::any(transport.clone()), "ping_pong_server").unwrap();
217+
channel::serve::<Message>(ChannelAddr::any(transport.clone())).unwrap();
218218

219219
let _server_handle: tokio::task::JoinHandle<Result<(), anyhow::Error>> =
220220
tokio::spawn(async move {

hyperactor/example/channel.rs

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -64,11 +64,8 @@ async fn client(
6464
) -> anyhow::Result<()> {
6565
let server_tx = channel::dial(server_addr)?;
6666

67-
let (client_addr, mut client_rx) = channel::serve::<Message>(
68-
ChannelAddr::any(server_tx.addr().transport().clone()),
69-
"example",
70-
)
71-
.unwrap();
67+
let (client_addr, mut client_rx) =
68+
channel::serve::<Message>(ChannelAddr::any(server_tx.addr().transport().clone())).unwrap();
7269

7370
server_tx.post(Message::Hello(client_addr));
7471

@@ -167,8 +164,7 @@ async fn main() -> Result<(), anyhow::Error> {
167164
match args.command {
168165
Some(Commands::Server) => {
169166
let (server_addr, server_rx) =
170-
channel::serve::<Message>(ChannelAddr::any(args.transport.clone()), "example")
171-
.unwrap();
167+
channel::serve::<Message>(ChannelAddr::any(args.transport.clone())).unwrap();
172168
eprintln!("server listening on {}", server_addr);
173169
server(server_rx).await?;
174170
}
@@ -180,8 +176,7 @@ async fn main() -> Result<(), anyhow::Error> {
180176
// No command: run a self-contained benchmark.
181177
None => {
182178
let (server_addr, server_rx) =
183-
channel::serve::<Message>(ChannelAddr::any(args.transport.clone()), "example")
184-
.unwrap();
179+
channel::serve::<Message>(ChannelAddr::any(args.transport.clone())).unwrap();
185180
let _server_handle = tokio::spawn(server(server_rx));
186181
let client_handle = tokio::spawn(client(server_addr, args.message_size, args.num_iter));
187182

hyperactor/src/channel.rs

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -840,7 +840,6 @@ pub fn dial<M: RemoteMessage>(addr: ChannelAddr) -> Result<ChannelTx<M>, Channel
840840
#[crate::instrument]
841841
pub fn serve<M: RemoteMessage>(
842842
addr: ChannelAddr,
843-
reason: &str,
844843
) -> Result<(ChannelAddr, ChannelRx<M>), ChannelError> {
845844
match addr {
846845
ChannelAddr::Tcp(addr) => {
@@ -871,9 +870,7 @@ pub fn serve<M: RemoteMessage>(
871870
.map(|(addr, inner)| {
872871
tracing::debug!(
873872
name = "serve",
874-
"serving channel address {} for {}",
875-
addr,
876-
reason
873+
%addr,
877874
);
878875
(addr, ChannelRx { inner })
879876
})
@@ -1051,7 +1048,7 @@ mod tests {
10511048
#[tokio::test]
10521049
async fn test_multiple_connections() {
10531050
for addr in ChannelTransport::all().map(ChannelAddr::any) {
1054-
let (listen_addr, mut rx) = crate::channel::serve::<u64>(addr, "test").unwrap();
1051+
let (listen_addr, mut rx) = crate::channel::serve::<u64>(addr).unwrap();
10551052

10561053
let mut sends: JoinSet<()> = JoinSet::new();
10571054
for message in 0u64..100u64 {
@@ -1090,7 +1087,7 @@ mod tests {
10901087
continue;
10911088
}
10921089

1093-
let (listen_addr, rx) = crate::channel::serve::<u64>(addr, "test").unwrap();
1090+
let (listen_addr, rx) = crate::channel::serve::<u64>(addr).unwrap();
10941091

10951092
let tx = dial::<u64>(listen_addr).unwrap();
10961093
tx.try_post(123, oneshot::channel().0).unwrap();
@@ -1139,7 +1136,7 @@ mod tests {
11391136
#[cfg_attr(not(feature = "fb"), ignore)]
11401137
async fn test_dial_serve() {
11411138
for addr in addrs() {
1142-
let (listen_addr, mut rx) = crate::channel::serve::<i32>(addr, "test").unwrap();
1139+
let (listen_addr, mut rx) = crate::channel::serve::<i32>(addr).unwrap();
11431140
let tx = crate::channel::dial(listen_addr).unwrap();
11441141
tx.try_post(123, oneshot::channel().0).unwrap();
11451142
assert_eq!(rx.recv().await.unwrap(), 123);
@@ -1159,7 +1156,7 @@ mod tests {
11591156
);
11601157
let _guard2 = config.override_key(crate::config::MESSAGE_ACK_EVERY_N_MESSAGES, 1);
11611158
for addr in addrs() {
1162-
let (listen_addr, mut rx) = crate::channel::serve::<i32>(addr, "test").unwrap();
1159+
let (listen_addr, mut rx) = crate::channel::serve::<i32>(addr).unwrap();
11631160
let tx = crate::channel::dial(listen_addr).unwrap();
11641161
tx.send(123).await.unwrap();
11651162
assert_eq!(rx.recv().await.unwrap(), 123);

hyperactor/src/host.rs

Lines changed: 10 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -130,11 +130,12 @@ impl<M: ProcManager> Host<M> {
130130
/// Serve a host using the provided ProcManager, on the provided `addr`.
131131
/// On success, the host will multiplex messages for procs on the host
132132
/// on the address of the host.
133+
#[tracing::instrument(skip(manager))]
133134
pub async fn serve(
134135
manager: M,
135136
addr: ChannelAddr,
136137
) -> Result<(Self, MailboxServerHandle), HostError> {
137-
let (frontend_addr, frontend_rx) = channel::serve(addr, "host frontend")?;
138+
let (frontend_addr, frontend_rx) = channel::serve(addr)?;
138139

139140
// We set up a cascade of routers: first, the outer router supports
140141
// sending to the the system proc, while the dial router manages dialed
@@ -143,8 +144,7 @@ impl<M: ProcManager> Host<M> {
143144

144145
// Establish a backend channel on the preferred transport. We currently simply
145146
// serve the same router on both.
146-
let (backend_addr, backend_rx) =
147-
channel::serve(ChannelAddr::any(manager.transport()), "host backend")?;
147+
let (backend_addr, backend_rx) = channel::serve(ChannelAddr::any(manager.transport()))?;
148148

149149
// Set up a system proc. This is often used to manage the host itself.
150150
let service_proc_id = ProcId::Direct(frontend_addr.clone(), "service".to_string());
@@ -854,6 +854,7 @@ where
854854
ChannelTransport::Local
855855
}
856856

857+
#[tracing::instrument(skip(self, _config))]
857858
async fn spawn(
858859
&self,
859860
proc_id: ProcId,
@@ -865,10 +866,7 @@ where
865866
proc_id.clone(),
866867
MailboxClient::dial(forwarder_addr)?.into_boxed(),
867868
);
868-
let (proc_addr, rx) = channel::serve(
869-
ChannelAddr::any(transport),
870-
&format!("LocalProcManager spawning: {}", &proc_id),
871-
)?;
869+
let (proc_addr, rx) = channel::serve(ChannelAddr::any(transport))?;
872870
self.procs
873871
.lock()
874872
.await
@@ -1033,16 +1031,15 @@ where
10331031
ChannelTransport::Unix
10341032
}
10351033

1034+
#[tracing::instrument(skip(self, _config))]
10361035
async fn spawn(
10371036
&self,
10381037
proc_id: ProcId,
10391038
forwarder_addr: ChannelAddr,
10401039
_config: (),
10411040
) -> Result<Self::Handle, HostError> {
1042-
let (callback_addr, mut callback_rx) = channel::serve(
1043-
ChannelAddr::any(ChannelTransport::Unix),
1044-
&format!("ProcessProcManager spawning: {}", &proc_id),
1045-
)?;
1041+
let (callback_addr, mut callback_rx) =
1042+
channel::serve(ChannelAddr::any(ChannelTransport::Unix))?;
10461043

10471044
let mut cmd = Command::new(&self.program);
10481045
cmd.env("HYPERACTOR_HOST_PROC_ID", proc_id.to_string());
@@ -1128,6 +1125,7 @@ where
11281125
/// forwarding messages to the provided `backend_addr`,
11291126
/// and returning the proc's address and agent actor on
11301127
/// the provided `callback_addr`.
1128+
#[tracing::instrument(skip(spawn))]
11311129
pub async fn spawn_proc<A, S, F>(
11321130
proc_id: ProcId,
11331131
backend_addr: ChannelAddr,
@@ -1153,10 +1151,7 @@ where
11531151

11541152
// Finally serve the proc on the same transport as the backend address,
11551153
// and call back.
1156-
let (proc_addr, proc_rx) = channel::serve(
1157-
ChannelAddr::any(backend_transport),
1158-
&format!("proc addr of: {}", &proc_id),
1159-
)?;
1154+
let (proc_addr, proc_rx) = channel::serve(ChannelAddr::any(backend_transport))?;
11601155
proc.clone().serve(proc_rx);
11611156
channel::dial(callback_addr)?
11621157
.send((proc_addr, agent_handle.bind::<A>()))

hyperactor/src/mailbox.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2868,7 +2868,7 @@ mod tests {
28682868
.unwrap(),
28692869
);
28702870

2871-
let (_, rx) = serve::<MessageEnvelope>(ChannelAddr::Sim(dst_addr.clone()), "test").unwrap();
2871+
let (_, rx) = serve::<MessageEnvelope>(ChannelAddr::Sim(dst_addr.clone())).unwrap();
28722872
let tx = dial::<MessageEnvelope>(src_to_dst).unwrap();
28732873
let mbox = Mailbox::new_detached(id!(test[0].actor0));
28742874
let serve_handle = mbox.clone().serve(rx);
@@ -2997,8 +2997,7 @@ mod tests {
29972997

29982998
let mut handles = Vec::new(); // hold on to handles, or channels get closed
29992999
for mbox in mailboxes.iter() {
3000-
let (addr, rx) =
3001-
channel::serve(ChannelAddr::any(ChannelTransport::Local), "test").unwrap();
3000+
let (addr, rx) = channel::serve(ChannelAddr::any(ChannelTransport::Local)).unwrap();
30023001
let handle = (*mbox).clone().serve(rx);
30033002
handles.push(handle);
30043003

hyperactor/src/proc.rs

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ use tokio::sync::mpsc;
4646
use tokio::sync::watch;
4747
use tokio::task::JoinHandle;
4848
use tracing::Instrument;
49+
use tracing::Level;
4950
use uuid::Uuid;
5051

5152
use crate as hyperactor;
@@ -333,24 +334,23 @@ impl Proc {
333334
}
334335

335336
/// Create a new direct-addressed proc.
337+
#[tracing::instrument]
336338
pub async fn direct(addr: ChannelAddr, name: String) -> Result<Self, ChannelError> {
337-
let (addr, rx) = channel::serve(addr, &format!("creating Proc::direct: {}", name))?;
339+
let (addr, rx) = channel::serve(addr)?;
338340
let proc_id = ProcId::Direct(addr, name);
339341
let proc = Self::new(proc_id, DialMailboxRouter::new().into_boxed());
340342
proc.clone().serve(rx);
341343
Ok(proc)
342344
}
343345

344346
/// Create a new direct-addressed proc with a default sender for the forwarder.
347+
#[tracing::instrument(skip(default))]
345348
pub fn direct_with_default(
346349
addr: ChannelAddr,
347350
name: String,
348351
default: BoxedMailboxSender,
349352
) -> Result<Self, ChannelError> {
350-
let (addr, rx) = channel::serve(
351-
addr,
352-
&format!("creating Proc::direct_with_default: {}", name),
353-
)?;
353+
let (addr, rx) = channel::serve(addr)?;
354354
let proc_id = ProcId::Direct(addr, name);
355355
let proc = Self::new(
356356
proc_id,
@@ -494,15 +494,18 @@ impl Proc {
494494
params: A::Params,
495495
) -> Result<ActorHandle<A>, anyhow::Error> {
496496
let actor_id = self.allocate_root_id(name)?;
497-
let _ = tracing::debug_span!(
497+
let span = tracing::span!(
498+
Level::INFO,
498499
"spawn_actor",
499500
actor_name = name,
500501
actor_type = std::any::type_name::<A>(),
501502
actor_id = actor_id.to_string(),
502503
);
503-
let (instance, mut actor_loop_receivers, work_rx) =
504-
Instance::new(self.clone(), actor_id.clone(), false, None);
505-
let actor = A::new(params).await?;
504+
let (instance, mut actor_loop_receivers, work_rx) = {
505+
let _guard = span.clone().entered();
506+
Instance::new(self.clone(), actor_id.clone(), false, None)
507+
};
508+
let actor = A::new(params).instrument(span.clone()).await?;
506509
// Add this actor to the proc's actor ledger. We do not actively remove
507510
// inactive actors from ledger, because the actor's state can be inferred
508511
// from its weak cell.
@@ -512,6 +515,7 @@ impl Proc {
512515

513516
instance
514517
.start(actor, actor_loop_receivers.take().unwrap(), work_rx)
518+
.instrument(span)
515519
.await
516520
}
517521

hyperactor_mesh/src/actor_mesh.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1731,7 +1731,7 @@ mod tests {
17311731
let config = hyperactor::config::global::lock();
17321732
let _guard = config.override_key(MAX_CAST_DIMENSION_SIZE, 2);
17331733

1734-
let (_, mut rx) = serve::<usize>(addr, "test").unwrap();
1734+
let (_, mut rx) = serve::<usize>(addr).unwrap();
17351735

17361736
let expected_ranks = selection
17371737
.eval(

hyperactor_mesh/src/alloc.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -517,7 +517,6 @@ impl AllocAssignedAddr {
517517

518518
pub(crate) fn serve_with_config<M: RemoteMessage>(
519519
self,
520-
reason: &str,
521520
) -> anyhow::Result<(ChannelAddr, ChannelRx<M>)> {
522521
fn set_as_inaddr_any(original: &mut SocketAddr) {
523522
let inaddr_any: IpAddr = match &original {
@@ -552,7 +551,7 @@ impl AllocAssignedAddr {
552551
}
553552
};
554553

555-
let (mut bound, rx) = channel::serve(bind_to, reason)?;
554+
let (mut bound, rx) = channel::serve(bind_to)?;
556555

557556
// Restore the original IP address if we used INADDR_ANY.
558557
match &mut bound {
@@ -837,14 +836,13 @@ pub(crate) mod testing {
837836
transport: ChannelTransport,
838837
) -> (DialMailboxRouter, Instance<()>, Proc, ChannelAddr) {
839838
let (router_channel_addr, router_rx) =
840-
channel::serve(ChannelAddr::any(transport.clone()), "test").unwrap();
839+
channel::serve(ChannelAddr::any(transport.clone())).unwrap();
841840
let router =
842841
DialMailboxRouter::new_with_default((UndeliverableMailboxSender {}).into_boxed());
843842
router.clone().serve(router_rx);
844843

845844
let client_proc_id = ProcId::Ranked(WorldId("test_stuck".to_string()), 0);
846-
let (client_proc_addr, client_rx) =
847-
channel::serve(ChannelAddr::any(transport), "test").unwrap();
845+
let (client_proc_addr, client_rx) = channel::serve(ChannelAddr::any(transport)).unwrap();
848846
let client_proc = Proc::new(
849847
client_proc_id.clone(),
850848
BoxedMailboxSender::new(router.clone()),

hyperactor_mesh/src/alloc/local.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -147,10 +147,7 @@ impl Alloc for LocalAlloc {
147147
match self.todo_rx.recv().await? {
148148
Action::Start(rank) => {
149149
let (addr, proc_rx) = loop {
150-
match channel::serve(
151-
ChannelAddr::any(self.transport()),
152-
"LocalAlloc next proc addr",
153-
) {
150+
match channel::serve(ChannelAddr::any(self.transport())) {
154151
Ok(addr_and_proc_rx) => break addr_and_proc_rx,
155152
Err(err) => {
156153
tracing::error!(

0 commit comments

Comments
 (0)