Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 14 additions & 4 deletions hyperactor/src/channel/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -518,12 +518,17 @@ impl<'a, M: RemoteMessage> Unacked<'a, M> {
/// Return when the oldest message has not been acked within the
/// timeout limit. This method is used in tokio::select with other
/// branches.
async fn wait_for_timeout(&self) {
///
/// The provided floor sets a minimum "received at" time to consider
/// in the calculation. This is used to give the channel grace time
/// after watchdog or reconnection events.
async fn wait_for_timeout(&self, floor: Option<Instant>) {
match self.deque.front() {
Some(msg) => {
RealClock
.sleep_until(
msg.received_at + config::global::get(config::MESSAGE_DELIVERY_TIMEOUT),
floor.map_or(msg.received_at, |f| f.max(msg.received_at))
+ config::global::get(config::MESSAGE_DELIVERY_TIMEOUT),
)
.await
}
Expand Down Expand Up @@ -672,6 +677,8 @@ impl<M: RemoteMessage> NetTx<M> {

let mut watchdog = Watchdog::spawn(config::global::get(config::CHANNEL_WATCHDOG_INTERVAL));

let mut last_recovered: Option<Instant> = None;

let (state, conn) = loop {
let span = Self::state_span(&state, &conn, session_id, &link);

Expand All @@ -680,6 +687,7 @@ impl<M: RemoteMessage> NetTx<M> {
conn,
session_id,
&log_id,
&mut last_recovered,
&mut watchdog,
&link,
&mut receiver,
Expand Down Expand Up @@ -799,6 +807,7 @@ impl<M: RemoteMessage> NetTx<M> {
conn: Conn<S>,
session_id: u64,
log_id: &'a str,
last_recovered: &mut Option<Instant>,
watchdog: &mut Watchdog,
link: &L,
receiver: &mut mpsc::UnboundedReceiver<(M, oneshot::Sender<M>, Instant)>,
Expand Down Expand Up @@ -894,14 +903,15 @@ impl<M: RemoteMessage> NetTx<M> {
}
}
}
(state, _conn) if !watchdog.ok() => {
(state, conn) if conn.is_connected() && !watchdog.ok() => {
// Reconnect on watchdog failure. Maybe the underlying session is stuck somehow.
tracing::error!(
"{log_id}: reconnecting after watchdog failure, last alive: {:?}",
watchdog.last_alive().elapsed()
);
// This will recover the watchdog:
watchdog.send();
*last_recovered = Some(RealClock.now());
(state, Conn::reconnect_with_default())
}
(
Expand Down Expand Up @@ -973,7 +983,7 @@ impl<M: RemoteMessage> NetTx<M> {
},

// If acking message takes too long, consider the link broken.
_ = unacked.wait_for_timeout(), if !unacked.is_empty() => {
_ = unacked.wait_for_timeout(*last_recovered), if !unacked.is_empty() => {
let error_msg = format!(
"{log_id}: failed to receive ack within timeout {} secs; link is currently connected",
config::global::get(config::MESSAGE_DELIVERY_TIMEOUT).as_secs(),
Expand Down