Skip to content

Commit 42b2f0b

Browse files
committed
fix(daemon): keep event stream alive on lag
1 parent fc62fac commit 42b2f0b

File tree

1 file changed

+25
-22
lines changed

1 file changed

+25
-22
lines changed

src-tauri/src/bin/codex_monitor_daemon.rs

Lines changed: 25 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -748,6 +748,27 @@ async fn handle_rpc_request(
748748
}
749749
}
750750

751+
async fn forward_events(
752+
mut rx: broadcast::Receiver<DaemonEvent>,
753+
out_tx_events: mpsc::UnboundedSender<String>,
754+
) {
755+
loop {
756+
let event = match rx.recv().await {
757+
Ok(event) => event,
758+
Err(broadcast::error::RecvError::Lagged(_)) => continue,
759+
Err(broadcast::error::RecvError::Closed) => break,
760+
};
761+
762+
let Some(payload) = build_event_notification(event) else {
763+
continue;
764+
};
765+
766+
if out_tx_events.send(payload).is_err() {
767+
break;
768+
}
769+
}
770+
}
771+
751772
async fn handle_client(
752773
socket: TcpStream,
753774
config: Arc<DaemonConfig>,
@@ -773,18 +794,9 @@ async fn handle_client(
773794
let mut events_task: Option<tokio::task::JoinHandle<()>> = None;
774795

775796
if authenticated {
776-
let mut rx = events.subscribe();
797+
let rx = events.subscribe();
777798
let out_tx_events = out_tx.clone();
778-
events_task = Some(tokio::spawn(async move {
779-
while let Ok(event) = rx.recv().await {
780-
let Some(payload) = build_event_notification(event) else {
781-
continue;
782-
};
783-
if out_tx_events.send(payload).is_err() {
784-
break;
785-
}
786-
}
787-
}));
799+
events_task = Some(tokio::spawn(forward_events(rx, out_tx_events)));
788800
}
789801

790802
while let Ok(Some(line)) = lines.next_line().await {
@@ -828,18 +840,9 @@ async fn handle_client(
828840
let _ = out_tx.send(response);
829841
}
830842

831-
let mut rx = events.subscribe();
843+
let rx = events.subscribe();
832844
let out_tx_events = out_tx.clone();
833-
events_task = Some(tokio::spawn(async move {
834-
while let Ok(event) = rx.recv().await {
835-
let Some(payload) = build_event_notification(event) else {
836-
continue;
837-
};
838-
if out_tx_events.send(payload).is_err() {
839-
break;
840-
}
841-
}
842-
}));
845+
events_task = Some(tokio::spawn(forward_events(rx, out_tx_events)));
843846

844847
continue;
845848
}

0 commit comments

Comments
 (0)