Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 35 additions & 1 deletion apps/desktop/src-tauri/src/host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ use serde::Serialize;

use crate::emote_index::{EmoteBundle, EmoteIndex};
use crate::message::{
parse_kick_event, parse_twitch_envelope, parse_youtube_message, UnifiedMessage,
compute_effective_ts, parse_kick_event, parse_twitch_envelope, parse_youtube_message,
UnifiedMessage,
};
use crate::ringbuf::RawHandle;

Expand Down Expand Up @@ -60,6 +61,11 @@ pub struct TwitchCreds {
/// which keeps the hot path allocation-free and avoids copying the same
/// URL strings into every message.
///
/// The pure-data sort field [`UnifiedMessage::effective_ts`] is computed
/// here under the snap rule. The monotonic
/// [`UnifiedMessage::arrival_seq`] is left at zero; the drain loop owns
/// the counter and assigns it via [`crate::message::assign_arrival_seqs`] before emit.
///
/// Messages that fail to parse or that aren't chat notifications are dropped
/// with a log. Each parse is wrapped in `catch_unwind` so a panicking parser
/// cannot kill the drain loop (`docs/stability.md` §Rust Panic Handling).
Expand All @@ -82,6 +88,7 @@ pub fn parse_batch(raw: &[Vec<u8>], batch: &mut Vec<UnifiedMessage>, emote_index
match outcome {
Ok(Ok(Some(mut msg))) => {
emote_index.scan_into(&msg.message_text, &mut msg.emote_spans);
msg.effective_ts = compute_effective_ts(msg.timestamp, msg.arrival_time);
batch.push(msg);
}
Ok(Ok(None)) => {}
Expand Down Expand Up @@ -645,6 +652,33 @@ mod tests {
assert!(batch.is_empty());
}

/// parse_batch is responsible for stamping `effective_ts`. The platform
/// timestamp here is from 2023, so it's far outside the snap window
/// from `arrival_time` (now), which means the rule must fall back to
/// `arrival_time`.
#[test]
fn parse_batch_stamps_effective_ts_using_snap_rule() {
let viewer = tag_twitch(br##"{
"metadata": {"message_id":"m","message_type":"notification","message_timestamp":"2023-11-06T18:11:47.492Z"},
"payload": {
"subscription": {"type":"channel.chat.message"},
"event": {
"chatter_user_id":"1","chatter_user_login":"u","chatter_user_name":"U",
"message_id":"mid","message":{"text":"hi"}
}
}
}"##);

let mut batch = Vec::new();
let idx = EmoteIndex::new();
parse_batch(std::slice::from_ref(&viewer), &mut batch, &idx);
assert_eq!(batch.len(), 1);
// Far-stale platform ts → effective_ts falls back to arrival_time.
assert_eq!(batch[0].effective_ts, batch[0].arrival_time);
// arrival_seq is left at 0; the supervisor stamps it before emit.
assert_eq!(batch[0].arrival_seq, 0);
}

#[cfg(windows)]
#[test]
fn mark_and_unmark_handle_inheritance_round_trip() {
Expand Down
133 changes: 133 additions & 0 deletions apps/desktop/src-tauri/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,18 @@ pub struct UnifiedMessage {
pub platform: Platform,
pub timestamp: i64,
pub arrival_time: i64,
/// Effective sort timestamp for unified ordering across platforms.
/// Set by [`compute_effective_ts`] using the snap rule: trust the
/// platform-stamped `timestamp` when it agrees with `arrival_time`
/// within [`SNAP_WINDOW_MS`], else fall back to `arrival_time`. This
/// keeps cross-platform interleave coherent without ever reordering
/// already-rendered messages because of vendor clock disagreement.
pub effective_ts: i64,
/// Monotonic per-process arrival counter assigned by [`assign_arrival_seqs`].
/// The frontend uses `(effective_ts, arrival_seq)` as a stable sort
/// key so two messages with identical effective timestamps never swap
/// position on re-render.
pub arrival_seq: u64,
pub username: String,
pub display_name: String,
pub platform_user_id: String,
Expand All @@ -40,6 +52,37 @@ pub struct UnifiedMessage {
pub emote_spans: Vec<EmoteSpan>,
}

/// Tolerance window for snapping `effective_ts` to the platform-stamped
/// `timestamp`. Inside this window we trust the platform's clock; outside
/// it we fall back to local arrival time so cross-platform interleave
/// stays coherent. 500 ms is enough to absorb normal vendor clock skew
/// without letting badly delayed messages time-travel up the visible list.
pub const SNAP_WINDOW_MS: i64 = 500;

/// Computes the effective sort timestamp using the snap rule documented
/// on [`UnifiedMessage::effective_ts`]. Pure so the rule can be tested
/// directly without going through a parser.
#[inline]
pub fn compute_effective_ts(timestamp: i64, arrival_time: i64) -> i64 {
if (timestamp - arrival_time).abs() <= SNAP_WINDOW_MS {
timestamp
} else {
arrival_time
}
}

/// Stamps every message in `batch` with a monotonic `arrival_seq`,
/// advancing the caller-owned `next_seq` once per message. The drain loop
/// owns the counter so seq is unique for the lifetime of one sidecar run,
/// and the frontend's `(effective_ts, arrival_seq)` sort key remains
/// stable.
pub fn assign_arrival_seqs(batch: &mut [UnifiedMessage], next_seq: &mut u64) {
for msg in batch.iter_mut() {
msg.arrival_seq = *next_seq;
*next_seq = next_seq.wrapping_add(1);
}
}

#[derive(Debug)]
pub enum ParseError {
Json(serde_json::Error),
Expand Down Expand Up @@ -189,6 +232,8 @@ pub fn parse_twitch_envelope(bytes: &[u8]) -> Result<Option<UnifiedMessage>, Par
platform: Platform::Twitch,
timestamp: platform_ts,
arrival_time,
effective_ts: 0,
arrival_seq: 0,
username: event.chatter_user_login,
display_name: event.chatter_user_name,
platform_user_id: event.chatter_user_id,
Expand Down Expand Up @@ -337,6 +382,8 @@ pub fn parse_youtube_message(bytes: &[u8]) -> Result<Option<UnifiedMessage>, Par
platform: Platform::YouTube,
timestamp,
arrival_time: chrono::Utc::now().timestamp_millis(),
effective_ts: 0,
arrival_seq: 0,
username: channel_id.clone(),
display_name,
platform_user_id: channel_id,
Expand Down Expand Up @@ -447,6 +494,8 @@ pub fn parse_kick_event(bytes: &[u8]) -> Result<Option<UnifiedMessage>, ParseErr
platform: Platform::Kick,
timestamp: platform_ts,
arrival_time,
effective_ts: 0,
arrival_seq: 0,
username: msg.sender.username.clone(),
display_name: msg.sender.username,
platform_user_id: msg.sender.id.to_string(),
Expand Down Expand Up @@ -945,4 +994,88 @@ mod tests {
let err = parse_kick_event(b"not json").unwrap_err();
assert!(matches!(err, ParseError::Json(_)));
}

fn fake_msg(timestamp: i64, arrival_time: i64) -> UnifiedMessage {
UnifiedMessage {
id: String::new(),
platform: Platform::Twitch,
timestamp,
arrival_time,
effective_ts: 0,
arrival_seq: 0,
username: String::new(),
display_name: String::new(),
platform_user_id: String::new(),
message_text: String::new(),
badges: Vec::new(),
is_mod: false,
is_subscriber: false,
is_broadcaster: false,
color: None,
reply_to: None,
emote_spans: Vec::new(),
}
}

#[test]
fn snap_uses_platform_ts_when_within_window() {
// Exactly at 0 delta → trust platform.
assert_eq!(compute_effective_ts(1_000, 1_000), 1_000);
// Inside window on either side.
assert_eq!(compute_effective_ts(1_000, 1_400), 1_000);
assert_eq!(compute_effective_ts(1_400, 1_000), 1_400);
}

#[test]
fn snap_boundary_inclusive() {
// Delta exactly at SNAP_WINDOW_MS still trusts platform.
assert_eq!(compute_effective_ts(1_000, 1_000 + SNAP_WINDOW_MS), 1_000);
assert_eq!(
compute_effective_ts(1_000 + SNAP_WINDOW_MS, 1_000),
1_000 + SNAP_WINDOW_MS
);
}

#[test]
fn snap_falls_back_to_arrival_when_outside_window() {
// One ms past the window → arrival wins.
assert_eq!(
compute_effective_ts(1_000, 1_000 + SNAP_WINDOW_MS + 1),
1_000 + SNAP_WINDOW_MS + 1
);
// Negative delta past the window also flips to arrival.
assert_eq!(compute_effective_ts(10_000, 1_000), 1_000);
}

#[test]
fn assign_arrival_seqs_assigns_in_order_and_advances_counter() {
let mut batch = vec![fake_msg(0, 0), fake_msg(0, 0), fake_msg(0, 0)];
let mut counter: u64 = 100;
assign_arrival_seqs(&mut batch, &mut counter);
assert_eq!(batch[0].arrival_seq, 100);
assert_eq!(batch[1].arrival_seq, 101);
assert_eq!(batch[2].arrival_seq, 102);
assert_eq!(counter, 103);
}

#[test]
fn assign_arrival_seqs_continues_across_batches() {
let mut counter: u64 = 0;
let mut first = vec![fake_msg(0, 0), fake_msg(0, 0)];
assign_arrival_seqs(&mut first, &mut counter);
let mut second = vec![fake_msg(0, 0)];
assign_arrival_seqs(&mut second, &mut counter);
assert_eq!(first[0].arrival_seq, 0);
assert_eq!(first[1].arrival_seq, 1);
assert_eq!(second[0].arrival_seq, 2);
assert_eq!(counter, 3);
}

#[test]
fn assign_arrival_seqs_empty_batch_is_noop() {
let mut batch: Vec<UnifiedMessage> = Vec::new();
let mut counter: u64 = 7;
assign_arrival_seqs(&mut batch, &mut counter);
assert_eq!(counter, 7);
}
}
41 changes: 35 additions & 6 deletions apps/desktop/src-tauri/src/sidecar_supervisor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use tauri::{AppHandle, Emitter, Runtime};

#[cfg(windows)]
use std::sync::{
atomic::{AtomicBool, Ordering},
atomic::{AtomicBool, AtomicU64, Ordering},
Arc,
};
#[cfg(windows)]
Expand All @@ -40,7 +40,7 @@ use crate::host::{
SidecarEvent, TwitchCreds, SIDECAR_BINARY, SIGNAL_WAIT_TIMEOUT,
};
#[cfg(windows)]
use crate::message::UnifiedMessage;
use crate::message::{assign_arrival_seqs, UnifiedMessage};
#[cfg(windows)]
use crate::ringbuf::{RawHandle, RingBufReader, WaitOutcome, DEFAULT_CAPACITY};
#[cfg(windows)]
Expand Down Expand Up @@ -135,6 +135,11 @@ async fn supervise<R: Runtime>(
// (ADR 29).
let mut attempt: u32 = 0;
let mut backoff = cfg.initial_backoff;
// Per-process monotonic arrival counter. Owned by `supervise` so it
// survives sidecar respawns; without this, every respawn would reset
// `arrival_seq` to 0 and break `(effective_ts, arrival_seq)` as a
// stable sort key across the boundary.
let arrival_seq = Arc::new(AtomicU64::new(0));

loop {
attempt += 1;
Expand Down Expand Up @@ -179,7 +184,17 @@ async fn supervise<R: Runtime>(
};

let started = Instant::now();
match run_once(&app, &cfg, attempt, Some(&creds), &sender, &auth).await {
match run_once(
&app,
&cfg,
attempt,
Some(&creds),
&sender,
&auth,
&arrival_seq,
)
.await
{
Ok(()) => tracing::info!(attempt, "sidecar iteration ended"),
Err(e) => tracing::error!(error = %e, attempt, "sidecar iteration failed"),
}
Expand Down Expand Up @@ -259,6 +274,7 @@ async fn run_once<R: Runtime>(
creds: Option<&TwitchCreds>,
sender: &SidecarCommandSender,
auth: &Arc<AuthManager>,
arrival_seq: &Arc<AtomicU64>,
) -> Result<(), Box<dyn std::error::Error>> {
let reader = RingBufReader::create_owner(DEFAULT_CAPACITY)?;
let handle = reader.raw_handle();
Expand Down Expand Up @@ -337,8 +353,9 @@ async fn run_once<R: Runtime>(
let drain_shutdown = shutdown.clone();
let drain_app = app.clone();
let drain_index = emote_index.clone();
let drain_seq = Arc::clone(arrival_seq);
let drain_handle = tauri::async_runtime::spawn_blocking(move || {
run_drain_loop(reader, drain_app, drain_shutdown, drain_index);
run_drain_loop(reader, drain_app, drain_shutdown, drain_index, drain_seq);
});

emit_status(app, "running", attempt, None);
Expand Down Expand Up @@ -532,26 +549,36 @@ fn run_drain_loop<R: Runtime>(
app: AppHandle<R>,
shutdown: Arc<AtomicBool>,
emote_index: Arc<EmoteIndex>,
arrival_seq: Arc<AtomicU64>,
) {
let timeout_ms: u32 = SIGNAL_WAIT_TIMEOUT
.as_millis()
.try_into()
.expect("signal wait timeout fits in u32 ms");
let mut batch: Vec<UnifiedMessage> = Vec::with_capacity(64);
// Local mirror of the shared atomic; loaded once, written back after
// each emit. Only one drain loop runs at a time across the
// supervisor lifetime (each respawn awaits the prior `run_once`),
// so there are no concurrent writers and Relaxed ordering is
// sufficient for the load/store pair.
let mut next_seq: u64 = arrival_seq.load(Ordering::Relaxed);

loop {
if shutdown.load(Ordering::Acquire) {
drain_and_emit(&mut reader, &app, &mut batch, &emote_index);
drain_and_emit(&mut reader, &app, &mut batch, &emote_index, &mut next_seq);
arrival_seq.store(next_seq, Ordering::Relaxed);
return;
}
match reader.wait_for_signal(timeout_ms) {
Ok(WaitOutcome::Signaled) | Ok(WaitOutcome::TimedOut) => {}
Err(e) => {
tracing::error!(error = %e, "wait_for_signal failed, drain loop exiting");
arrival_seq.store(next_seq, Ordering::Relaxed);
return;
}
}
drain_and_emit(&mut reader, &app, &mut batch, &emote_index);
drain_and_emit(&mut reader, &app, &mut batch, &emote_index, &mut next_seq);
arrival_seq.store(next_seq, Ordering::Relaxed);
}
}

Expand All @@ -561,6 +588,7 @@ fn drain_and_emit<R: Runtime>(
app: &AppHandle<R>,
batch: &mut Vec<UnifiedMessage>,
emote_index: &EmoteIndex,
next_seq: &mut u64,
) {
let raw = reader.drain();
if raw.is_empty() {
Expand All @@ -571,6 +599,7 @@ fn drain_and_emit<R: Runtime>(
if batch.is_empty() {
return;
}
assign_arrival_seqs(batch, next_seq);
if let Err(e) = app.emit("chat_messages", &*batch) {
tracing::error!(error = %e, "failed to emit chat_messages");
}
Expand Down
2 changes: 2 additions & 0 deletions apps/desktop/src/stores/chatStore.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ function makeMsg(id: string, text = `msg ${id}`): ChatMessage {
platform: "Twitch",
timestamp: 0,
arrival_time: 0,
effective_ts: 0,
arrival_seq: 0,
username: "u",
display_name: "U",
platform_user_id: "1",
Expand Down
15 changes: 15 additions & 0 deletions apps/desktop/src/stores/chatStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,21 @@ export interface ChatMessage {
platform: "Twitch" | "YouTube" | "Kick";
timestamp: number;
arrival_time: number;
/**
* Sort timestamp under the unified-ordering snap rule (see
* `message.rs::compute_effective_ts`). Equals `timestamp` when the
* platform clock agrees with local arrival within the snap window,
* otherwise equals `arrival_time`. Use `(effective_ts, arrival_seq)`
* as a stable sort key when interleaving messages from different
* platforms or repositioning late arrivals.
*/
effective_ts: number;
/**
* Per-process monotonic arrival counter assigned by the Rust drain
* loop. Tie-breaks messages with identical `effective_ts` so two
* renderers always agree on order.
*/
arrival_seq: number;
username: string;
display_name: string;
platform_user_id: string;
Expand Down
Loading