diff --git a/apps/desktop/src-tauri/src/host.rs b/apps/desktop/src-tauri/src/host.rs index 974defd..cd5f0c0 100644 --- a/apps/desktop/src-tauri/src/host.rs +++ b/apps/desktop/src-tauri/src/host.rs @@ -13,7 +13,8 @@ use serde::Serialize; use crate::emote_index::{EmoteBundle, EmoteIndex}; use crate::message::{ - parse_kick_event, parse_twitch_envelope, parse_youtube_message, UnifiedMessage, + compute_effective_ts, parse_kick_event, parse_twitch_envelope, parse_youtube_message, + UnifiedMessage, }; use crate::ringbuf::RawHandle; @@ -60,6 +61,11 @@ pub struct TwitchCreds { /// which keeps the hot path allocation-free and avoids copying the same /// URL strings into every message. /// +/// The pure-data sort field [`UnifiedMessage::effective_ts`] is computed +/// here under the snap rule. The monotonic +/// [`UnifiedMessage::arrival_seq`] is left at zero; the drain loop owns +/// the counter and assigns it via [`crate::message::assign_arrival_seqs`] before emit. +/// /// Messages that fail to parse or that aren't chat notifications are dropped /// with a log. Each parse is wrapped in `catch_unwind` so a panicking parser /// cannot kill the drain loop (`docs/stability.md` §Rust Panic Handling). @@ -82,6 +88,7 @@ pub fn parse_batch(raw: &[Vec], batch: &mut Vec, emote_index match outcome { Ok(Ok(Some(mut msg))) => { emote_index.scan_into(&msg.message_text, &mut msg.emote_spans); + msg.effective_ts = compute_effective_ts(msg.timestamp, msg.arrival_time); batch.push(msg); } Ok(Ok(None)) => {} @@ -645,6 +652,33 @@ mod tests { assert!(batch.is_empty()); } + /// parse_batch is responsible for stamping `effective_ts`. The platform + /// timestamp here is from 2023, so it's far outside the snap window + /// from `arrival_time` (now), which means the rule must fall back to + /// `arrival_time`. + #[test] + fn parse_batch_stamps_effective_ts_using_snap_rule() { + let viewer = tag_twitch(br##"{ + "metadata": {"message_id":"m","message_type":"notification","message_timestamp":"2023-11-06T18:11:47.492Z"}, + "payload": { + "subscription": {"type":"channel.chat.message"}, + "event": { + "chatter_user_id":"1","chatter_user_login":"u","chatter_user_name":"U", + "message_id":"mid","message":{"text":"hi"} + } + } + }"##); + + let mut batch = Vec::new(); + let idx = EmoteIndex::new(); + parse_batch(std::slice::from_ref(&viewer), &mut batch, &idx); + assert_eq!(batch.len(), 1); + // Far-stale platform ts → effective_ts falls back to arrival_time. + assert_eq!(batch[0].effective_ts, batch[0].arrival_time); + // arrival_seq is left at 0; the supervisor stamps it before emit. + assert_eq!(batch[0].arrival_seq, 0); + } + #[cfg(windows)] #[test] fn mark_and_unmark_handle_inheritance_round_trip() { diff --git a/apps/desktop/src-tauri/src/message.rs b/apps/desktop/src-tauri/src/message.rs index dd2898a..1865c36 100644 --- a/apps/desktop/src-tauri/src/message.rs +++ b/apps/desktop/src-tauri/src/message.rs @@ -24,6 +24,18 @@ pub struct UnifiedMessage { pub platform: Platform, pub timestamp: i64, pub arrival_time: i64, + /// Effective sort timestamp for unified ordering across platforms. + /// Set by [`compute_effective_ts`] using the snap rule: trust the + /// platform-stamped `timestamp` when it agrees with `arrival_time` + /// within [`SNAP_WINDOW_MS`], else fall back to `arrival_time`. This + /// keeps cross-platform interleave coherent without ever reordering + /// already-rendered messages because of vendor clock disagreement. + pub effective_ts: i64, + /// Monotonic per-process arrival counter assigned by [`assign_arrival_seqs`]. + /// The frontend uses `(effective_ts, arrival_seq)` as a stable sort + /// key so two messages with identical effective timestamps never swap + /// position on re-render. + pub arrival_seq: u64, pub username: String, pub display_name: String, pub platform_user_id: String, @@ -40,6 +52,37 @@ pub struct UnifiedMessage { pub emote_spans: Vec, } +/// Tolerance window for snapping `effective_ts` to the platform-stamped +/// `timestamp`. Inside this window we trust the platform's clock; outside +/// it we fall back to local arrival time so cross-platform interleave +/// stays coherent. 500 ms is enough to absorb normal vendor clock skew +/// without letting badly delayed messages time-travel up the visible list. +pub const SNAP_WINDOW_MS: i64 = 500; + +/// Computes the effective sort timestamp using the snap rule documented +/// on [`UnifiedMessage::effective_ts`]. Pure so the rule can be tested +/// directly without going through a parser. +#[inline] +pub fn compute_effective_ts(timestamp: i64, arrival_time: i64) -> i64 { + if (timestamp - arrival_time).abs() <= SNAP_WINDOW_MS { + timestamp + } else { + arrival_time + } +} + +/// Stamps every message in `batch` with a monotonic `arrival_seq`, +/// advancing the caller-owned `next_seq` once per message. The drain loop +/// owns the counter so seq is unique for the lifetime of one sidecar run, +/// and the frontend's `(effective_ts, arrival_seq)` sort key remains +/// stable. +pub fn assign_arrival_seqs(batch: &mut [UnifiedMessage], next_seq: &mut u64) { + for msg in batch.iter_mut() { + msg.arrival_seq = *next_seq; + *next_seq = next_seq.wrapping_add(1); + } +} + #[derive(Debug)] pub enum ParseError { Json(serde_json::Error), @@ -189,6 +232,8 @@ pub fn parse_twitch_envelope(bytes: &[u8]) -> Result, Par platform: Platform::Twitch, timestamp: platform_ts, arrival_time, + effective_ts: 0, + arrival_seq: 0, username: event.chatter_user_login, display_name: event.chatter_user_name, platform_user_id: event.chatter_user_id, @@ -337,6 +382,8 @@ pub fn parse_youtube_message(bytes: &[u8]) -> Result, Par platform: Platform::YouTube, timestamp, arrival_time: chrono::Utc::now().timestamp_millis(), + effective_ts: 0, + arrival_seq: 0, username: channel_id.clone(), display_name, platform_user_id: channel_id, @@ -447,6 +494,8 @@ pub fn parse_kick_event(bytes: &[u8]) -> Result, ParseErr platform: Platform::Kick, timestamp: platform_ts, arrival_time, + effective_ts: 0, + arrival_seq: 0, username: msg.sender.username.clone(), display_name: msg.sender.username, platform_user_id: msg.sender.id.to_string(), @@ -945,4 +994,88 @@ mod tests { let err = parse_kick_event(b"not json").unwrap_err(); assert!(matches!(err, ParseError::Json(_))); } + + fn fake_msg(timestamp: i64, arrival_time: i64) -> UnifiedMessage { + UnifiedMessage { + id: String::new(), + platform: Platform::Twitch, + timestamp, + arrival_time, + effective_ts: 0, + arrival_seq: 0, + username: String::new(), + display_name: String::new(), + platform_user_id: String::new(), + message_text: String::new(), + badges: Vec::new(), + is_mod: false, + is_subscriber: false, + is_broadcaster: false, + color: None, + reply_to: None, + emote_spans: Vec::new(), + } + } + + #[test] + fn snap_uses_platform_ts_when_within_window() { + // Exactly at 0 delta → trust platform. + assert_eq!(compute_effective_ts(1_000, 1_000), 1_000); + // Inside window on either side. + assert_eq!(compute_effective_ts(1_000, 1_400), 1_000); + assert_eq!(compute_effective_ts(1_400, 1_000), 1_400); + } + + #[test] + fn snap_boundary_inclusive() { + // Delta exactly at SNAP_WINDOW_MS still trusts platform. + assert_eq!(compute_effective_ts(1_000, 1_000 + SNAP_WINDOW_MS), 1_000); + assert_eq!( + compute_effective_ts(1_000 + SNAP_WINDOW_MS, 1_000), + 1_000 + SNAP_WINDOW_MS + ); + } + + #[test] + fn snap_falls_back_to_arrival_when_outside_window() { + // One ms past the window → arrival wins. + assert_eq!( + compute_effective_ts(1_000, 1_000 + SNAP_WINDOW_MS + 1), + 1_000 + SNAP_WINDOW_MS + 1 + ); + // Negative delta past the window also flips to arrival. + assert_eq!(compute_effective_ts(10_000, 1_000), 1_000); + } + + #[test] + fn assign_arrival_seqs_assigns_in_order_and_advances_counter() { + let mut batch = vec![fake_msg(0, 0), fake_msg(0, 0), fake_msg(0, 0)]; + let mut counter: u64 = 100; + assign_arrival_seqs(&mut batch, &mut counter); + assert_eq!(batch[0].arrival_seq, 100); + assert_eq!(batch[1].arrival_seq, 101); + assert_eq!(batch[2].arrival_seq, 102); + assert_eq!(counter, 103); + } + + #[test] + fn assign_arrival_seqs_continues_across_batches() { + let mut counter: u64 = 0; + let mut first = vec![fake_msg(0, 0), fake_msg(0, 0)]; + assign_arrival_seqs(&mut first, &mut counter); + let mut second = vec![fake_msg(0, 0)]; + assign_arrival_seqs(&mut second, &mut counter); + assert_eq!(first[0].arrival_seq, 0); + assert_eq!(first[1].arrival_seq, 1); + assert_eq!(second[0].arrival_seq, 2); + assert_eq!(counter, 3); + } + + #[test] + fn assign_arrival_seqs_empty_batch_is_noop() { + let mut batch: Vec = Vec::new(); + let mut counter: u64 = 7; + assign_arrival_seqs(&mut batch, &mut counter); + assert_eq!(counter, 7); + } } diff --git a/apps/desktop/src-tauri/src/sidecar_supervisor.rs b/apps/desktop/src-tauri/src/sidecar_supervisor.rs index c1afa96..7334ffa 100644 --- a/apps/desktop/src-tauri/src/sidecar_supervisor.rs +++ b/apps/desktop/src-tauri/src/sidecar_supervisor.rs @@ -20,7 +20,7 @@ use tauri::{AppHandle, Emitter, Runtime}; #[cfg(windows)] use std::sync::{ - atomic::{AtomicBool, Ordering}, + atomic::{AtomicBool, AtomicU64, Ordering}, Arc, }; #[cfg(windows)] @@ -40,7 +40,7 @@ use crate::host::{ SidecarEvent, TwitchCreds, SIDECAR_BINARY, SIGNAL_WAIT_TIMEOUT, }; #[cfg(windows)] -use crate::message::UnifiedMessage; +use crate::message::{assign_arrival_seqs, UnifiedMessage}; #[cfg(windows)] use crate::ringbuf::{RawHandle, RingBufReader, WaitOutcome, DEFAULT_CAPACITY}; #[cfg(windows)] @@ -135,6 +135,11 @@ async fn supervise( // (ADR 29). let mut attempt: u32 = 0; let mut backoff = cfg.initial_backoff; + // Per-process monotonic arrival counter. Owned by `supervise` so it + // survives sidecar respawns; without this, every respawn would reset + // `arrival_seq` to 0 and break `(effective_ts, arrival_seq)` as a + // stable sort key across the boundary. + let arrival_seq = Arc::new(AtomicU64::new(0)); loop { attempt += 1; @@ -179,7 +184,17 @@ async fn supervise( }; let started = Instant::now(); - match run_once(&app, &cfg, attempt, Some(&creds), &sender, &auth).await { + match run_once( + &app, + &cfg, + attempt, + Some(&creds), + &sender, + &auth, + &arrival_seq, + ) + .await + { Ok(()) => tracing::info!(attempt, "sidecar iteration ended"), Err(e) => tracing::error!(error = %e, attempt, "sidecar iteration failed"), } @@ -259,6 +274,7 @@ async fn run_once( creds: Option<&TwitchCreds>, sender: &SidecarCommandSender, auth: &Arc, + arrival_seq: &Arc, ) -> Result<(), Box> { let reader = RingBufReader::create_owner(DEFAULT_CAPACITY)?; let handle = reader.raw_handle(); @@ -337,8 +353,9 @@ async fn run_once( let drain_shutdown = shutdown.clone(); let drain_app = app.clone(); let drain_index = emote_index.clone(); + let drain_seq = Arc::clone(arrival_seq); let drain_handle = tauri::async_runtime::spawn_blocking(move || { - run_drain_loop(reader, drain_app, drain_shutdown, drain_index); + run_drain_loop(reader, drain_app, drain_shutdown, drain_index, drain_seq); }); emit_status(app, "running", attempt, None); @@ -532,26 +549,36 @@ fn run_drain_loop( app: AppHandle, shutdown: Arc, emote_index: Arc, + arrival_seq: Arc, ) { let timeout_ms: u32 = SIGNAL_WAIT_TIMEOUT .as_millis() .try_into() .expect("signal wait timeout fits in u32 ms"); let mut batch: Vec = Vec::with_capacity(64); + // Local mirror of the shared atomic; loaded once, written back after + // each emit. Only one drain loop runs at a time across the + // supervisor lifetime (each respawn awaits the prior `run_once`), + // so there are no concurrent writers and Relaxed ordering is + // sufficient for the load/store pair. + let mut next_seq: u64 = arrival_seq.load(Ordering::Relaxed); loop { if shutdown.load(Ordering::Acquire) { - drain_and_emit(&mut reader, &app, &mut batch, &emote_index); + drain_and_emit(&mut reader, &app, &mut batch, &emote_index, &mut next_seq); + arrival_seq.store(next_seq, Ordering::Relaxed); return; } match reader.wait_for_signal(timeout_ms) { Ok(WaitOutcome::Signaled) | Ok(WaitOutcome::TimedOut) => {} Err(e) => { tracing::error!(error = %e, "wait_for_signal failed, drain loop exiting"); + arrival_seq.store(next_seq, Ordering::Relaxed); return; } } - drain_and_emit(&mut reader, &app, &mut batch, &emote_index); + drain_and_emit(&mut reader, &app, &mut batch, &emote_index, &mut next_seq); + arrival_seq.store(next_seq, Ordering::Relaxed); } } @@ -561,6 +588,7 @@ fn drain_and_emit( app: &AppHandle, batch: &mut Vec, emote_index: &EmoteIndex, + next_seq: &mut u64, ) { let raw = reader.drain(); if raw.is_empty() { @@ -571,6 +599,7 @@ fn drain_and_emit( if batch.is_empty() { return; } + assign_arrival_seqs(batch, next_seq); if let Err(e) = app.emit("chat_messages", &*batch) { tracing::error!(error = %e, "failed to emit chat_messages"); } diff --git a/apps/desktop/src/stores/chatStore.test.ts b/apps/desktop/src/stores/chatStore.test.ts index 8775495..659ebfe 100644 --- a/apps/desktop/src/stores/chatStore.test.ts +++ b/apps/desktop/src/stores/chatStore.test.ts @@ -7,6 +7,8 @@ function makeMsg(id: string, text = `msg ${id}`): ChatMessage { platform: "Twitch", timestamp: 0, arrival_time: 0, + effective_ts: 0, + arrival_seq: 0, username: "u", display_name: "U", platform_user_id: "1", diff --git a/apps/desktop/src/stores/chatStore.ts b/apps/desktop/src/stores/chatStore.ts index 90dcfdf..bb87474 100644 --- a/apps/desktop/src/stores/chatStore.ts +++ b/apps/desktop/src/stores/chatStore.ts @@ -39,6 +39,21 @@ export interface ChatMessage { platform: "Twitch" | "YouTube" | "Kick"; timestamp: number; arrival_time: number; + /** + * Sort timestamp under the unified-ordering snap rule (see + * `message.rs::compute_effective_ts`). Equals `timestamp` when the + * platform clock agrees with local arrival within the snap window, + * otherwise equals `arrival_time`. Use `(effective_ts, arrival_seq)` + * as a stable sort key when interleaving messages from different + * platforms or repositioning late arrivals. + */ + effective_ts: number; + /** + * Per-process monotonic arrival counter assigned by the Rust drain + * loop. Tie-breaks messages with identical `effective_ts` so two + * renderers always agree on order. + */ + arrival_seq: number; username: string; display_name: string; platform_user_id: string;