Skip to content

Commit 1ee6502

Browse files
pzhan9meta-codesync[bot]
authored andcommitted
Add reason to channel::serve (#1674)
Summary: Pull Request resolved: #1674 We need to have a way to associate the channel address to why it was served. This diff adds a reason parameter to `channel::serve` and log it along with the address. We probably do not need this diff in the future if we include "reason" or something similar in the channel's name directly. But before that, this diff is required otherwise we cannot debug channel errors in many cases. Reviewed By: shayne-fletcher Differential Revision: D85529357 fbshipit-source-id: d75146766ff93834d5781ee67833084ca5932854
1 parent 66eb22c commit 1ee6502

File tree

20 files changed

+158
-101
lines changed

20 files changed

+158
-101
lines changed

controller/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1573,7 +1573,7 @@ mod tests {
15731573
// Set up a local actor.
15741574
let local_proc_id = world_id.proc_id(rank);
15751575
let (local_proc_addr, local_proc_rx) =
1576-
channel::serve(ChannelAddr::any(ChannelTransport::Local)).unwrap();
1576+
channel::serve(ChannelAddr::any(ChannelTransport::Local), "mock_proc_actor").unwrap();
15771577
let local_proc_mbox = Mailbox::new_detached(
15781578
local_proc_id.actor_id(format!("test_dummy_proc{}", idx).to_string(), 0),
15791579
);

hyperactor/benches/main.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ fn bench_message_sizes(c: &mut Criterion) {
8787
assert!(!socket_addr.ip().is_loopback());
8888
}
8989

90-
let (listen_addr, mut rx) = serve::<Message>(addr).unwrap();
90+
let (listen_addr, mut rx) = serve::<Message>(addr, "bench").unwrap();
9191
let tx = dial::<Message>(listen_addr).unwrap();
9292
let msg = Message::new(0, size);
9393
let start = Instant::now();
@@ -127,7 +127,7 @@ fn bench_message_rates(c: &mut Criterion) {
127127
b.iter_custom(|iters| async move {
128128
let total_msgs = iters * rate;
129129
let addr = ChannelAddr::any(transport.clone());
130-
let (listen_addr, mut rx) = serve::<Message>(addr).unwrap();
130+
let (listen_addr, mut rx) = serve::<Message>(addr, "bench").unwrap();
131131
tokio::spawn(async move {
132132
let mut received_count = 0;
133133

@@ -212,9 +212,9 @@ async fn channel_ping_pong(
212212
struct Message(Part);
213213

214214
let (client_addr, mut client_rx) =
215-
channel::serve::<Message>(ChannelAddr::any(transport.clone())).unwrap();
215+
channel::serve::<Message>(ChannelAddr::any(transport.clone()), "ping_pong_client").unwrap();
216216
let (server_addr, mut server_rx) =
217-
channel::serve::<Message>(ChannelAddr::any(transport.clone())).unwrap();
217+
channel::serve::<Message>(ChannelAddr::any(transport.clone()), "ping_pong_server").unwrap();
218218

219219
let _server_handle: tokio::task::JoinHandle<Result<(), anyhow::Error>> =
220220
tokio::spawn(async move {

hyperactor/example/channel.rs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -64,8 +64,11 @@ async fn client(
6464
) -> anyhow::Result<()> {
6565
let server_tx = channel::dial(server_addr)?;
6666

67-
let (client_addr, mut client_rx) =
68-
channel::serve::<Message>(ChannelAddr::any(server_tx.addr().transport().clone())).unwrap();
67+
let (client_addr, mut client_rx) = channel::serve::<Message>(
68+
ChannelAddr::any(server_tx.addr().transport().clone()),
69+
"example",
70+
)
71+
.unwrap();
6972

7073
server_tx.post(Message::Hello(client_addr));
7174

@@ -164,7 +167,8 @@ async fn main() -> Result<(), anyhow::Error> {
164167
match args.command {
165168
Some(Commands::Server) => {
166169
let (server_addr, server_rx) =
167-
channel::serve::<Message>(ChannelAddr::any(args.transport.clone())).unwrap();
170+
channel::serve::<Message>(ChannelAddr::any(args.transport.clone()), "example")
171+
.unwrap();
168172
eprintln!("server listening on {}", server_addr);
169173
server(server_rx).await?;
170174
}
@@ -176,7 +180,8 @@ async fn main() -> Result<(), anyhow::Error> {
176180
// No command: run a self-contained benchmark.
177181
None => {
178182
let (server_addr, server_rx) =
179-
channel::serve::<Message>(ChannelAddr::any(args.transport.clone())).unwrap();
183+
channel::serve::<Message>(ChannelAddr::any(args.transport.clone()), "example")
184+
.unwrap();
180185
let _server_handle = tokio::spawn(server(server_rx));
181186
let client_handle = tokio::spawn(client(server_addr, args.message_size, args.num_iter));
182187

hyperactor/src/channel.rs

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
use core::net::SocketAddr;
1515
use std::fmt;
1616
use std::net::IpAddr;
17-
use std::net::Ipv4Addr;
1817
use std::net::Ipv6Addr;
1918
#[cfg(target_os = "linux")]
2019
use std::os::linux::net::SocketAddrExt;
@@ -35,7 +34,6 @@ use crate::Named;
3534
use crate::RemoteMessage;
3635
use crate::attrs::AttrValue;
3736
use crate::channel::sim::SimAddr;
38-
use crate::config;
3937
use crate::simnet::SimNetError;
4038

4139
pub(crate) mod local;
@@ -842,8 +840,8 @@ pub fn dial<M: RemoteMessage>(addr: ChannelAddr) -> Result<ChannelTx<M>, Channel
842840
#[crate::instrument]
843841
pub fn serve<M: RemoteMessage>(
844842
addr: ChannelAddr,
843+
reason: &str,
845844
) -> Result<(ChannelAddr, ChannelRx<M>), ChannelError> {
846-
tracing::debug!(name = "serve", "serving channel address {}", addr);
847845
match addr {
848846
ChannelAddr::Tcp(addr) => {
849847
let (addr, rx) = net::tcp::serve::<M>(addr)?;
@@ -870,7 +868,15 @@ pub fn serve<M: RemoteMessage>(
870868
a
871869
))),
872870
}
873-
.map(|(addr, inner)| (addr, ChannelRx { inner }))
871+
.map(|(addr, inner)| {
872+
tracing::debug!(
873+
name = "serve",
874+
"serving channel address {} for {}",
875+
addr,
876+
reason
877+
);
878+
(addr, ChannelRx { inner })
879+
})
874880
}
875881

876882
/// Serve on the local address. The server is turned down
@@ -900,6 +906,7 @@ mod tests {
900906
use super::*;
901907
use crate::clock::Clock;
902908
use crate::clock::RealClock;
909+
use crate::config;
903910

904911
#[test]
905912
fn test_channel_addr() {
@@ -1044,7 +1051,7 @@ mod tests {
10441051
#[tokio::test]
10451052
async fn test_multiple_connections() {
10461053
for addr in ChannelTransport::all().map(ChannelAddr::any) {
1047-
let (listen_addr, mut rx) = crate::channel::serve::<u64>(addr).unwrap();
1054+
let (listen_addr, mut rx) = crate::channel::serve::<u64>(addr, "test").unwrap();
10481055

10491056
let mut sends: JoinSet<()> = JoinSet::new();
10501057
for message in 0u64..100u64 {
@@ -1083,7 +1090,7 @@ mod tests {
10831090
continue;
10841091
}
10851092

1086-
let (listen_addr, rx) = crate::channel::serve::<u64>(addr).unwrap();
1093+
let (listen_addr, rx) = crate::channel::serve::<u64>(addr, "test").unwrap();
10871094

10881095
let tx = dial::<u64>(listen_addr).unwrap();
10891096
tx.try_post(123, oneshot::channel().0).unwrap();
@@ -1132,7 +1139,7 @@ mod tests {
11321139
#[cfg_attr(not(feature = "fb"), ignore)]
11331140
async fn test_dial_serve() {
11341141
for addr in addrs() {
1135-
let (listen_addr, mut rx) = crate::channel::serve::<i32>(addr).unwrap();
1142+
let (listen_addr, mut rx) = crate::channel::serve::<i32>(addr, "test").unwrap();
11361143
let tx = crate::channel::dial(listen_addr).unwrap();
11371144
tx.try_post(123, oneshot::channel().0).unwrap();
11381145
assert_eq!(rx.recv().await.unwrap(), 123);
@@ -1152,7 +1159,7 @@ mod tests {
11521159
);
11531160
let _guard2 = config.override_key(crate::config::MESSAGE_ACK_EVERY_N_MESSAGES, 1);
11541161
for addr in addrs() {
1155-
let (listen_addr, mut rx) = crate::channel::serve::<i32>(addr).unwrap();
1162+
let (listen_addr, mut rx) = crate::channel::serve::<i32>(addr, "test").unwrap();
11561163
let tx = crate::channel::dial(listen_addr).unwrap();
11571164
tx.send(123).await.unwrap();
11581165
assert_eq!(rx.recv().await.unwrap(), 123);

hyperactor/src/host.rs

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,6 @@ use crate::Proc;
6464
use crate::ProcId;
6565
use crate::actor::Binds;
6666
use crate::actor::Referable;
67-
use crate::attrs::Attrs;
6867
use crate::channel;
6968
use crate::channel::ChannelAddr;
7069
use crate::channel::ChannelError;
@@ -135,7 +134,7 @@ impl<M: ProcManager> Host<M> {
135134
manager: M,
136135
addr: ChannelAddr,
137136
) -> Result<(Self, MailboxServerHandle), HostError> {
138-
let (frontend_addr, frontend_rx) = channel::serve(addr)?;
137+
let (frontend_addr, frontend_rx) = channel::serve(addr, "host frontend")?;
139138

140139
// We set up a cascade of routers: first, the outer router supports
141140
// sending to the the system proc, while the dial router manages dialed
@@ -144,7 +143,8 @@ impl<M: ProcManager> Host<M> {
144143

145144
// Establish a backend channel on the preferred transport. We currently simply
146145
// serve the same router on both.
147-
let (backend_addr, backend_rx) = channel::serve(ChannelAddr::any(manager.transport()))?;
146+
let (backend_addr, backend_rx) =
147+
channel::serve(ChannelAddr::any(manager.transport()), "host backend")?;
148148

149149
// Set up a system proc. This is often used to manage the host itself.
150150
let service_proc_id = ProcId::Direct(frontend_addr.clone(), "service".to_string());
@@ -865,7 +865,10 @@ where
865865
proc_id.clone(),
866866
MailboxClient::dial(forwarder_addr)?.into_boxed(),
867867
);
868-
let (proc_addr, rx) = channel::serve(ChannelAddr::any(transport))?;
868+
let (proc_addr, rx) = channel::serve(
869+
ChannelAddr::any(transport),
870+
&format!("LocalProcManager spawning: {}", &proc_id),
871+
)?;
869872
self.procs
870873
.lock()
871874
.await
@@ -1036,8 +1039,10 @@ where
10361039
forwarder_addr: ChannelAddr,
10371040
_config: (),
10381041
) -> Result<Self::Handle, HostError> {
1039-
let (callback_addr, mut callback_rx) =
1040-
channel::serve(ChannelAddr::any(ChannelTransport::Unix))?;
1042+
let (callback_addr, mut callback_rx) = channel::serve(
1043+
ChannelAddr::any(ChannelTransport::Unix),
1044+
&format!("ProcessProcManager spawning: {}", &proc_id),
1045+
)?;
10411046

10421047
let mut cmd = Command::new(&self.program);
10431048
cmd.env("HYPERACTOR_HOST_PROC_ID", proc_id.to_string());
@@ -1144,11 +1149,14 @@ where
11441149

11451150
let agent_handle = spawn(proc.clone())
11461151
.await
1147-
.map_err(|e| HostError::AgentSpawnFailure(proc_id, e))?;
1152+
.map_err(|e| HostError::AgentSpawnFailure(proc_id.clone(), e))?;
11481153

11491154
// Finally serve the proc on the same transport as the backend address,
11501155
// and call back.
1151-
let (proc_addr, proc_rx) = channel::serve(ChannelAddr::any(backend_transport))?;
1156+
let (proc_addr, proc_rx) = channel::serve(
1157+
ChannelAddr::any(backend_transport),
1158+
&format!("proc addr of: {}", &proc_id),
1159+
)?;
11521160
proc.clone().serve(proc_rx);
11531161
channel::dial(callback_addr)?
11541162
.send((proc_addr, agent_handle.bind::<A>()))

hyperactor/src/mailbox.rs

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1051,10 +1051,16 @@ pub trait MailboxServer: MailboxSender + Clone + Sized + 'static {
10511051
}
10521052
}
10531053
result = stopped_rx.changed(), if !detached => {
1054-
tracing::debug!(
1055-
"the mailbox server is stopped"
1056-
);
10571054
detached = result.is_err();
1055+
if detached {
1056+
tracing::debug!(
1057+
"the mailbox server is detached for Rx {}", rx.addr()
1058+
);
1059+
} else {
1060+
tracing::debug!(
1061+
"the mailbox server is stopped for Rx {}", rx.addr()
1062+
);
1063+
}
10581064
}
10591065
}
10601066
}
@@ -2856,7 +2862,7 @@ mod tests {
28562862
.unwrap(),
28572863
);
28582864

2859-
let (_, rx) = serve::<MessageEnvelope>(ChannelAddr::Sim(dst_addr.clone())).unwrap();
2865+
let (_, rx) = serve::<MessageEnvelope>(ChannelAddr::Sim(dst_addr.clone()), "test").unwrap();
28602866
let tx = dial::<MessageEnvelope>(src_to_dst).unwrap();
28612867
let mbox = Mailbox::new_detached(id!(test[0].actor0));
28622868
let serve_handle = mbox.clone().serve(rx);
@@ -2985,7 +2991,8 @@ mod tests {
29852991

29862992
let mut handles = Vec::new(); // hold on to handles, or channels get closed
29872993
for mbox in mailboxes.iter() {
2988-
let (addr, rx) = channel::serve(ChannelAddr::any(ChannelTransport::Local)).unwrap();
2994+
let (addr, rx) =
2995+
channel::serve(ChannelAddr::any(ChannelTransport::Local), "test").unwrap();
29892996
let handle = (*mbox).clone().serve(rx);
29902997
handles.push(handle);
29912998

hyperactor/src/proc.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -334,7 +334,7 @@ impl Proc {
334334

335335
/// Create a new direct-addressed proc.
336336
pub async fn direct(addr: ChannelAddr, name: String) -> Result<Self, ChannelError> {
337-
let (addr, rx) = channel::serve(addr)?;
337+
let (addr, rx) = channel::serve(addr, &format!("creating Proc::direct: {}", name))?;
338338
let proc_id = ProcId::Direct(addr, name);
339339
let proc = Self::new(proc_id, DialMailboxRouter::new().into_boxed());
340340
proc.clone().serve(rx);
@@ -347,7 +347,10 @@ impl Proc {
347347
name: String,
348348
default: BoxedMailboxSender,
349349
) -> Result<Self, ChannelError> {
350-
let (addr, rx) = channel::serve(addr)?;
350+
let (addr, rx) = channel::serve(
351+
addr,
352+
&format!("creating Proc::direct_with_default: {}", name),
353+
)?;
351354
let proc_id = ProcId::Direct(addr, name);
352355
let proc = Self::new(
353356
proc_id,

hyperactor_mesh/src/actor_mesh.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1730,7 +1730,7 @@ mod tests {
17301730
let config = hyperactor::config::global::lock();
17311731
let _guard = config.override_key(MAX_CAST_DIMENSION_SIZE, 2);
17321732

1733-
let (_, mut rx) = serve::<usize>(addr).unwrap();
1733+
let (_, mut rx) = serve::<usize>(addr, "test").unwrap();
17341734

17351735
let expected_ranks = selection
17361736
.eval(

hyperactor_mesh/src/alloc.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -517,6 +517,7 @@ impl AllocAssignedAddr {
517517

518518
pub(crate) fn serve_with_config<M: RemoteMessage>(
519519
self,
520+
reason: &str,
520521
) -> anyhow::Result<(ChannelAddr, ChannelRx<M>)> {
521522
fn set_as_inaddr_any(original: &mut SocketAddr) {
522523
let inaddr_any: IpAddr = match &original {
@@ -551,7 +552,7 @@ impl AllocAssignedAddr {
551552
}
552553
};
553554

554-
let (mut bound, rx) = channel::serve(bind_to)?;
555+
let (mut bound, rx) = channel::serve(bind_to, reason)?;
555556

556557
// Restore the original IP address if we used INADDR_ANY.
557558
match &mut bound {
@@ -836,13 +837,14 @@ pub(crate) mod testing {
836837
transport: ChannelTransport,
837838
) -> (DialMailboxRouter, Instance<()>, Proc, ChannelAddr) {
838839
let (router_channel_addr, router_rx) =
839-
channel::serve(ChannelAddr::any(transport.clone())).unwrap();
840+
channel::serve(ChannelAddr::any(transport.clone()), "test").unwrap();
840841
let router =
841842
DialMailboxRouter::new_with_default((UndeliverableMailboxSender {}).into_boxed());
842843
router.clone().serve(router_rx);
843844

844845
let client_proc_id = ProcId::Ranked(WorldId("test_stuck".to_string()), 0);
845-
let (client_proc_addr, client_rx) = channel::serve(ChannelAddr::any(transport)).unwrap();
846+
let (client_proc_addr, client_rx) =
847+
channel::serve(ChannelAddr::any(transport), "test").unwrap();
846848
let client_proc = Proc::new(
847849
client_proc_id.clone(),
848850
BoxedMailboxSender::new(router.clone()),

hyperactor_mesh/src/alloc/local.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,10 @@ impl Alloc for LocalAlloc {
147147
match self.todo_rx.recv().await? {
148148
Action::Start(rank) => {
149149
let (addr, proc_rx) = loop {
150-
match channel::serve(ChannelAddr::any(self.transport())) {
150+
match channel::serve(
151+
ChannelAddr::any(self.transport()),
152+
"LocalAlloc next proc addr",
153+
) {
151154
Ok(addr_and_proc_rx) => break addr_and_proc_rx,
152155
Err(err) => {
153156
tracing::error!(

0 commit comments

Comments
 (0)