Skip to content

Commit ead957b

Browse files
committed
[hyperactor] implement a grace period for stalled client loops
This change implements a grace period whenever the client loop stalls. Specifically, we reset the ack timeout when such stalls occur, giving the channel time to recover from underlying issues. Differential Revision: [D85187188](https://our.internmc.facebook.com/intern/diff/D85187188/) **NOTE FOR REVIEWERS**: This PR has internal Meta-specific changes or comments, please review them on [Phabricator](https://our.internmc.facebook.com/intern/diff/D85187188/)! ghstack-source-id: 317768313 Pull Request resolved: #1634
1 parent f067d77 commit ead957b

File tree

1 file changed

+14
-4
lines changed

1 file changed

+14
-4
lines changed

hyperactor/src/channel/net.rs

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -518,12 +518,17 @@ impl<'a, M: RemoteMessage> Unacked<'a, M> {
518518
/// Return when the oldest message has not been acked within the
519519
/// timeout limit. This method is used in tokio::select with other
520520
/// branches.
521-
async fn wait_for_timeout(&self) {
521+
///
522+
/// The provided floor sets a minimum "received at" time to consider
523+
/// in the calculation. This is used to give the channel grace time
524+
/// after watchdog or reconnection events.
525+
async fn wait_for_timeout(&self, floor: Option<Instant>) {
522526
match self.deque.front() {
523527
Some(msg) => {
524528
RealClock
525529
.sleep_until(
526-
msg.received_at + config::global::get(config::MESSAGE_DELIVERY_TIMEOUT),
530+
floor.map_or(msg.received_at, |f| f.max(msg.received_at))
531+
+ config::global::get(config::MESSAGE_DELIVERY_TIMEOUT),
527532
)
528533
.await
529534
}
@@ -672,6 +677,8 @@ impl<M: RemoteMessage> NetTx<M> {
672677

673678
let mut watchdog = Watchdog::spawn(config::global::get(config::CHANNEL_WATCHDOG_INTERVAL));
674679

680+
let mut last_recovered: Option<Instant> = None;
681+
675682
let (state, conn) = loop {
676683
let span = Self::state_span(&state, &conn, session_id, &link);
677684

@@ -680,6 +687,7 @@ impl<M: RemoteMessage> NetTx<M> {
680687
conn,
681688
session_id,
682689
&log_id,
690+
&mut last_recovered,
683691
&mut watchdog,
684692
&link,
685693
&mut receiver,
@@ -799,6 +807,7 @@ impl<M: RemoteMessage> NetTx<M> {
799807
conn: Conn<S>,
800808
session_id: u64,
801809
log_id: &'a str,
810+
last_recovered: &mut Option<Instant>,
802811
watchdog: &mut Watchdog,
803812
link: &L,
804813
receiver: &mut mpsc::UnboundedReceiver<(M, oneshot::Sender<M>, Instant)>,
@@ -894,14 +903,15 @@ impl<M: RemoteMessage> NetTx<M> {
894903
}
895904
}
896905
}
897-
(state, _conn) if !watchdog.ok() => {
906+
(state, conn) if conn.is_connected() && !watchdog.ok() => {
898907
// Reconnect on watchdog failure. Maybe the underlying session is stuck somehow.
899908
tracing::error!(
900909
"{log_id}: reconnecting after watchdog failure, last alive: {:?}",
901910
watchdog.last_alive().elapsed()
902911
);
903912
// This will recover the watchdog:
904913
watchdog.send();
914+
*last_recovered = Some(RealClock.now());
905915
(state, Conn::reconnect_with_default())
906916
}
907917
(
@@ -973,7 +983,7 @@ impl<M: RemoteMessage> NetTx<M> {
973983
},
974984

975985
// If acking message takes too long, consider the link broken.
976-
_ = unacked.wait_for_timeout(), if !unacked.is_empty() => {
986+
_ = unacked.wait_for_timeout(*last_recovered), if !unacked.is_empty() => {
977987
let error_msg = format!(
978988
"{log_id}: failed to receive ack within timeout {} secs; link is currently connected",
979989
config::global::get(config::MESSAGE_DELIVERY_TIMEOUT).as_secs(),

0 commit comments

Comments
 (0)