Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
93099c2
Fix live stream session replay and progress contract
May 28, 2026
d0e3f57
Fix running stream reattach progress duplication
May 28, 2026
2810a17
Fix running stream temporary reattach duplicates
May 28, 2026
32c3099
Fix running stream reattach live assistant projection
May 28, 2026
c9a369f
Align live stream progress activity boundaries
May 28, 2026
a9e75a2
Bind live tool activity to progress segments
May 28, 2026
d25545e
Align settled live activity with progress bursts
May 28, 2026
cc54f0d
Keep live activity groups anchored to progress text
May 28, 2026
76f1b93
Fix live activity burst anchor mismatch
May 28, 2026
11533ae
Keep PR3005 scoped to live stream replay
May 28, 2026
38959b7
Update live stream regression tests for anchored replay
May 28, 2026
5775f8b
Keep reattach replay cursor tied to consumed events
May 28, 2026
96bd6e2
Normalize empty live activity bursts
May 28, 2026
ffc1c25
Preserve live tool burst metadata when settling
May 28, 2026
0f2be16
Keep settled tool metadata aligned with live bursts
May 28, 2026
e15f827
Fold empty assistant tool activity into progress anchors
May 28, 2026
1a8379d
Preserve segmented live progress on reconnect
May 28, 2026
5c2c83a
Persist inactive interim activity boundaries
May 28, 2026
886e754
Keep reconnect rendering on live tail segment
May 28, 2026
88601a2
Keep reattach state authoritative over stale snapshots
May 28, 2026
1011d3c
Preserve all live progress segments on reattach
May 28, 2026
ddff720
Flush live progress before inserting tool activity
May 28, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,13 @@

## [Unreleased]

### Fixed

- WebUI progress guidance now restores a firm visible interim-progress contract for long tool-running chat turns, and session-switch reattach now backfills active stream gaps from the run journal with per-event SSE ids so visible live progress stays stable after switching away and back. (#3014, #2924)
- Live Stream Activity now keeps the same progress-text/tool-burst structure after a run settles: the top Run Activity owns the whole-turn timer, while per-segment tool Activity cards stay bound to the progress text that produced them instead of collapsing back into one late `Activity: N tools` block.
- Live Stream reattach now resumes multi-segment process text from the full assistant accumulator instead of the latest visible segment, preventing older progress text anchors from disappearing after repeated session switches.
- Live Stream now records interim progress boundaries even when a session is being switched away, so tools emitted during that inactive-pane window still reattach to the progress text that produced them.

## [v0.51.152] — 2026-05-28 — Release DX (stage-batch34 — single-PR optional gateway-backed browser chat)

### Added
Expand Down
37 changes: 34 additions & 3 deletions api/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -4591,9 +4591,21 @@ class StreamChannel:
def __init__(self):
self._lock = threading.Lock()
self._subscribers: list[queue.Queue] = []
self._offline_buffer: list[tuple[str, object]] = []
self._offline_buffer: list[tuple[str, object] | tuple[str, object, str | None]] = []
self._last_event_id: str | None = None

@staticmethod
def _event_id_from_item(item) -> str | None:
if isinstance(item, tuple) and len(item) >= 3:
event_id = item[2]
return str(event_id) if event_id else None
return None

def subscribe(self) -> queue.Queue:
q, _snapshot = self.subscribe_with_snapshot()
return q

def subscribe_with_snapshot(self) -> tuple[queue.Queue, dict[str, object]]:
q: queue.Queue = queue.Queue()
with self._lock:
# Replay buffered events to the new subscriber INSIDE the lock so a
Expand All @@ -4604,7 +4616,17 @@ def subscribe(self) -> queue.Queue:
for item in self._offline_buffer:
q.put_nowait(item)
self._subscribers.append(q)
return q
snapshot = {
"subscriber_count": len(self._subscribers),
"offline_buffered_events": len(self._offline_buffer),
"offline_buffered_event_ids": [
event_id
for event_id in (self._event_id_from_item(item) for item in self._offline_buffer)
if event_id
],
"last_event_id": self._last_event_id,
}
return q, snapshot

def unsubscribe(self, q: queue.Queue) -> None:
with self._lock:
Expand All @@ -4613,8 +4635,11 @@ def unsubscribe(self, q: queue.Queue) -> None:
except ValueError:
pass

def put_nowait(self, item: tuple[str, object]) -> None:
def put_nowait(self, item: tuple[str, object] | tuple[str, object, str | None]) -> None:
with self._lock:
event_id = self._event_id_from_item(item)
if event_id:
self._last_event_id = event_id
subscribers = list(self._subscribers)
if not subscribers:
self._offline_buffer.append(item)
Expand All @@ -4629,6 +4654,12 @@ def diagnostic_snapshot(self) -> dict[str, int]:
return {
"subscriber_count": len(self._subscribers),
"offline_buffered_events": len(self._offline_buffer),
"offline_buffered_event_ids": [
event_id
for event_id in (self._event_id_from_item(item) for item in self._offline_buffer)
if event_id
],
"last_event_id": self._last_event_id,
}


Expand Down
2 changes: 1 addition & 1 deletion api/gateway_chat.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ def put_gateway_event(event, data):
except Exception:
logger.debug("Failed to append gateway event %s for stream %s", event, stream_id, exc_info=True)
try:
q.put_nowait((event, data))
q.put_nowait((event, data, event_id))
except Exception:
logger.debug("Failed to put gateway event to queue")

Expand Down
80 changes: 71 additions & 9 deletions api/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -7068,14 +7068,42 @@ def _parse_run_journal_after_seq(qs: dict) -> int | None:
return 0


def _replay_run_journal(handler, stream_id: str, after_seq: int | None) -> bool:
def _parse_stream_event_seq(event_id) -> int | None:
raw = str(event_id or "").strip()
if not raw:
return None
tail = raw.rsplit(":", 1)[-1]
try:
return max(0, int(tail))
except (TypeError, ValueError):
return None


def _stream_queue_item_parts(item) -> tuple[str, object, str | None]:
if isinstance(item, tuple):
if len(item) >= 3:
return item[0], item[1], (str(item[2]) if item[2] else None)
if len(item) >= 2:
return item[0], item[1], None
return "message", item, None


def _replay_run_journal(
handler,
stream_id: str,
after_seq: int | None,
*,
max_seq: int | None = None,
emit_stale_interrupted: bool = True,
) -> bool:
summary = find_run_summary(stream_id)
if not summary:
return False
journal = read_run_events(
str(summary.get("session_id") or ""),
stream_id,
after_seq=after_seq,
max_seq=max_seq,
)
for entry in journal.get("events") or []:
_sse_with_id(
Expand All @@ -7084,7 +7112,7 @@ def _replay_run_journal(handler, stream_id: str, after_seq: int | None) -> bool:
entry.get("payload"),
entry.get("event_id"),
)
if not summary.get("terminal"):
if emit_stale_interrupted and not summary.get("terminal"):
stale = stale_interrupted_event(
str(summary.get("session_id") or ""),
stream_id,
Expand All @@ -7095,6 +7123,20 @@ def _replay_run_journal(handler, stream_id: str, after_seq: int | None) -> bool:
return True


def _run_journal_terminal_at_or_before(stream_id: str, max_seq: int | None) -> bool:
if max_seq is None:
return False
summary = find_run_summary(stream_id)
if not summary or not summary.get("terminal"):
return False
journal = read_run_events(
str(summary.get("session_id") or ""),
stream_id,
max_seq=max_seq,
)
return any(bool(entry.get("terminal")) for entry in journal.get("events") or [])


def _handle_sse_stream(handler, parsed):
qs = parse_qs(parsed.query)
stream_id = qs.get("stream_id", [""])[0]
Expand All @@ -7117,26 +7159,46 @@ def _handle_sse_stream(handler, parsed):
except _CLIENT_DISCONNECT_ERRORS:
pass
return True
subscriber = stream.subscribe() if hasattr(stream, "subscribe") else stream
active_replay = qs.get("replay", [""])[0] == "1"
subscriber = None
replay_cutoff_seq = None
if hasattr(stream, "subscribe_with_snapshot"):
subscriber, snapshot = stream.subscribe_with_snapshot()
if active_replay:
replay_cutoff_seq = _parse_stream_event_seq((snapshot or {}).get("last_event_id"))
else:
subscriber = stream.subscribe() if hasattr(stream, "subscribe") else stream
handler.send_response(200)
handler.send_header("Content-Type", "text/event-stream; charset=utf-8")
handler.send_header("Cache-Control", "no-cache")
handler.send_header("X-Accel-Buffering", "no")
handler.send_header("Connection", "close")
handler.end_headers()
try:
if active_replay:
replayed = _replay_run_journal(
handler,
stream_id,
_parse_run_journal_after_seq(qs),
max_seq=replay_cutoff_seq,
emit_stale_interrupted=False,
)
if not replayed:
replay_cutoff_seq = None
elif _run_journal_terminal_at_or_before(stream_id, replay_cutoff_seq):
return True
while True:
try:
event, data = subscriber.get(timeout=_SSE_HEARTBEAT_INTERVAL_SECONDS)
item = subscriber.get(timeout=_SSE_HEARTBEAT_INTERVAL_SECONDS)
except queue.Empty:
handler.wfile.write(b": heartbeat\n\n")
handler.wfile.flush()
continue
# Stage-364: emit `id:` from STREAM_LAST_EVENT_ID side-channel so
# the frontend's `_lastRunJournalSeq` cursor advances during live
# streaming. Without this, mid-stream error→replay would arrive
# with after_seq=0 and double-render every journaled event.
event_id = STREAM_LAST_EVENT_ID.get(stream_id)
event, data, event_id = _stream_queue_item_parts(item)
if active_replay and replay_cutoff_seq is not None:
item_seq = _parse_stream_event_seq(event_id)
if item_seq is not None and item_seq <= replay_cutoff_seq:
continue
if event_id:
_sse_with_id(handler, event, data, event_id)
else:
Expand Down
3 changes: 3 additions & 0 deletions api/run_journal.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,12 +199,15 @@ def read_run_events(
run_id: str,
*,
after_seq: int | None = None,
max_seq: int | None = None,
session_dir: Path | None = None,
) -> dict:
path = _run_path(session_id, run_id, session_dir=session_dir)
events, malformed = _read_jsonl(path)
if after_seq is not None:
events = [event for event in events if int(event.get("seq") or 0) > int(after_seq)]
if max_seq is not None:
events = [event for event in events if int(event.get("seq") or 0) <= int(max_seq)]
return {
"session_id": str(session_id),
"run_id": str(run_id),
Expand Down
57 changes: 45 additions & 12 deletions api/streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -3104,6 +3104,43 @@ def _nearest_assistant_msg_idx(messages, msg_idx: int) -> int:
return -1


def _assistant_message_has_visible_content(msg) -> bool:
if not isinstance(msg, dict) or msg.get('role') != 'assistant':
return False
content = msg.get('content', '')
if isinstance(content, str):
return bool(content.strip())
if not isinstance(content, list):
return False
for part in content:
if isinstance(part, str) and part.strip():
return True
if not isinstance(part, dict):
continue
if part.get('type') in {'text', 'input_text', 'output_text'}:
if str(part.get('text') or part.get('content') or '').strip():
return True
return False


def _nearest_visible_assistant_msg_idx(messages, msg_idx: int) -> int:
"""Find the closest preceding assistant message with visible progress text."""
for idx in range(msg_idx - 1, -1, -1):
msg = messages[idx]
if _assistant_message_has_visible_content(msg):
return idx
return -1


def _assistant_tool_anchor_msg_idx(messages, msg_idx: int) -> int:
"""Anchor empty assistant tool-call messages to the prior visible assistant."""
msg = messages[msg_idx] if isinstance(messages, list) and 0 <= msg_idx < len(messages) else None
if _assistant_message_has_visible_content(msg):
return msg_idx
visible_idx = _nearest_visible_assistant_msg_idx(messages, msg_idx)
return visible_idx if visible_idx >= 0 else msg_idx


def _extract_tool_calls_from_messages(messages, live_tool_calls=None):
"""Build persisted tool-call summaries from final messages plus live progress fallback."""
tool_calls = []
Expand All @@ -3125,7 +3162,7 @@ def _extract_tool_calls_from_messages(messages, live_tool_calls=None):
if tid:
pending_names[tid] = part.get('name', '')
pending_args[tid] = part.get('input', {})
pending_asst_idx[tid] = msg_idx
pending_asst_idx[tid] = _assistant_tool_anchor_msg_idx(messages, msg_idx)
for tc in m.get('tool_calls', []):
if not isinstance(tc, dict):
continue
Expand All @@ -3139,7 +3176,7 @@ def _extract_tool_calls_from_messages(messages, live_tool_calls=None):
if tid and name:
pending_names[tid] = name
pending_args[tid] = args
pending_asst_idx[tid] = msg_idx
pending_asst_idx[tid] = _assistant_tool_anchor_msg_idx(messages, msg_idx)
elif role == 'tool':
tid = m.get('tool_call_id') or m.get('tool_use_id', '')
raw = m.get('content', '')
Expand All @@ -3165,11 +3202,14 @@ def _extract_tool_calls_from_messages(messages, live_tool_calls=None):
if seq_idx >= len(live):
break
live_tc = live[seq_idx]
anchor_idx = _nearest_visible_assistant_msg_idx(messages, seq.get('msg_idx', -1))
if anchor_idx < 0:
anchor_idx = _nearest_assistant_msg_idx(messages, seq.get('msg_idx', -1))
tool_calls.append({
'name': live_tc.get('name', 'tool'),
'snippet': _tool_result_snippet(seq.get('raw', '')),
'tid': live_tc.get('tid', '') or '',
'assistant_msg_idx': _nearest_assistant_msg_idx(messages, seq.get('msg_idx', -1)),
'assistant_msg_idx': anchor_idx,
'args': _truncate_tool_args(live_tc.get('args', {}), limit=4),
})

Expand Down Expand Up @@ -3704,24 +3744,17 @@ def put(event, data):
# If cancelled, drop all further events except the cancel event itself
if cancel_event.is_set() and event not in ('cancel', 'error'):
return
event_id = None
if run_journal is not None:
try:
journaled = run_journal.append_sse_event(event, data)
# Stage-364: propagate journal event_id via a side-channel dict
# (STREAM_LAST_EVENT_ID) instead of changing the queue tuple
# shape — keeping the 2-tuple shape preserves backward
# compatibility for tests and any non-SSE queue consumer. The
# SSE handler reads this dict at emit time to populate `id:`
# on every live frame, which lets the frontend's cursor
# advance during live streaming and prevents replay from
# double-rendering tokens after a mid-stream error→reconnect.
event_id = (journaled or {}).get('event_id') if isinstance(journaled, dict) else None
if event_id:
STREAM_LAST_EVENT_ID[stream_id] = event_id
except Exception:
logger.debug("Failed to append run journal event %s for stream %s", event, stream_id, exc_info=True)
try:
q.put_nowait((event, data))
q.put_nowait((event, data, event_id))
except Exception:
logger.debug("Failed to put event to queue")

Expand Down
Loading