Merged
12 changes: 12 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -2,6 +2,18 @@

## [Unreleased]

## [v0.50.249] — 2026-04-30

### Added
- **Real-time clarify notifications via SSE long-connection** — replaces the 1.5s HTTP polling loop for clarify (`/api/clarify/pending`) with a Server-Sent Events endpoint at `/api/clarify/stream?session_id=` that pushes clarify events to the browser the instant they fire. Mirrors the approval-SSE pattern shipped in v0.50.248 (#1350) including all the correctness lessons learned during that release: atomic subscribe + initial snapshot inside a single `with clarify._lock:` block (no snapshot/subscribe race), `_clarify_sse_notify` invoked from inside `_lock` in both `submit_pending` and `resolve_clarify` (no notify-ordering race), payload built from `q[0].data` head-of-queue (not the just-appended entry), and `resolve_clarify` re-emits the new head (or `None`/`0` when empty) so trailing clarify prompts never get stuck. Frontend uses `EventSource` with automatic 3s HTTP polling fallback on `onerror`, plus a 60s reconnect timer to recover from silently-broken connections. Bounded `queue.Queue(maxsize=16)` per subscriber with silent drop on full prevents memory leaks from slow tabs. 29 new static-analysis + unit + concurrency tests. (`api/clarify.py`, `api/routes.py`, `static/messages.js`, `tests/test_clarify_sse.py`) @fxd-jason — PR #1355
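
The ordering guarantees listed in the entry above (subscribe and snapshot under one lock, notify from inside the lock, always publish the head of the queue) can be sketched in isolation. This is a minimal illustration, not the code in `api/clarify.py`: the names `_prompts`, `_notify_locked`, `subscribe_with_snapshot`, `submit`, and `resolve` are hypothetical, and de-duplication, timeouts, and gateway callbacks are left out.

```python
import queue
import threading

_lock = threading.Lock()   # plain, non-reentrant lock
_prompts: dict = {}        # session_id -> FIFO list of pending prompts (hypothetical store)
_subscribers: dict = {}    # session_id -> list of subscriber queues


def _notify_locked(session_id):
    """Publish the current head of the FIFO. The caller must already hold _lock."""
    fifo = _prompts.get(session_id, [])
    payload = {
        "pending": dict(fifo[0]) if fifo else None,  # head of queue, not the newest entry
        "pending_count": len(fifo),
    }
    for q in _subscribers.get(session_id, ()):
        try:
            q.put_nowait(payload)
        except queue.Full:
            pass  # slow subscriber: drop instead of blocking the producer


def subscribe_with_snapshot(session_id):
    """Register a subscriber and read the initial state under the same lock."""
    q = queue.Queue(maxsize=16)
    with _lock:
        _subscribers.setdefault(session_id, []).append(q)
        fifo = _prompts.get(session_id, [])
        snapshot = {
            "pending": dict(fifo[0]) if fifo else None,
            "pending_count": len(fifo),
        }
    return q, snapshot


def submit(session_id, prompt):
    """Append a prompt and notify while still holding the lock."""
    with _lock:
        _prompts.setdefault(session_id, []).append(prompt)
        _notify_locked(session_id)


def resolve(session_id):
    """Pop the oldest prompt and re-emit the new head (or None/0 when empty)."""
    with _lock:
        fifo = _prompts.get(session_id)
        if not fifo:
            return
        fifo.pop(0)
        _notify_locked(session_id)
```

Because the snapshot and the registration share one critical section, a prompt submitted concurrently lands either in the snapshot or in the subscriber queue, never in neither. Re-emitting after `resolve` is what lets a second queued prompt surface without waiting for another `submit`.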

### Fixed
- **Context window indicator no longer shows misleading "100% used (0% left)" when context_length is missing from the live SSE payload** — the v0.50.247 / PR #1348 fallback to `agent.model_metadata.get_model_context_length()` was applied to the session-save path but NOT to the live SSE `usage` event. For sessions on large-context models (e.g. claude-sonnet-4.6 via OpenRouter, 1M tokens) where the agent didn't have a compressor configured, `usage.context_length` was omitted from the SSE payload, the JS frontend defaulted to 128K, and cumulative `input_tokens` over multiple turns overflowed against the 128K default — clamping the ring to 100% with a tooltip claiming the context was "0% left." The fix mirrors the session-save fallback exactly: when `usage.context_length` is missing, resolve via `get_model_context_length(model, base_url)` and write it onto the `usage` dict before serialization. Symmetric fallback added for `last_prompt_tokens` (uses `s.last_prompt_tokens` instead of the cumulative `input_tokens` counter). Frontend now tracks `rawPct` separately from the clamped `pct`; when `rawPct > 100` the tooltip shows `${rawPct}% used (context exceeded)` instead of misleading users. (`api/streaming.py`, `static/ui.js`) — PR #1356
- **"Uploading…" composer status no longer persists for the entire stream duration after a file upload** — `setComposerStatus('Uploading…')` was called before `uploadPendingFiles()` but the status was never cleared after the upload completed; only `setBusy(false)` at the end of the agent stream eventually wiped it, so users saw "Uploading…" throughout the agent response. The fix clears the status unconditionally once the upload await completes. UX defect only; no change to upload correctness or message text. (`static/messages.js`) — PR #1356
- **Imported CLI/gateway session metadata survives compact() round-trip** — `Session.load_metadata_only().compact()` was dropping `is_cli_session`, `source_tag`, `session_source`, and `source_label`, so imported agent/Telegram/messaging sessions in the sidebar lost their provenance after the metadata-only fast path. Adds these four fields to `Session.__init__`, the `METADATA_FIELDS` save round-trip, and `compact()` output. Without this, sidebar payloads couldn't distinguish imported sessions from native WebUI ones. (`api/models.py`, `tests/test_gateway_sync.py`) @dso2ng — PR #1357
- **Sidebar collapses compression-lineage segments instead of showing every segment as a separate row** — when an agent session has a compression lineage (`_lineage_root_id` populated by the gateway-import path in `api/agent_sessions.py:169`), the sidebar previously listed each segment as its own top-level conversation, cluttering the list with what the user perceives as a single conversation. Adds a pure client-side helper `_collapseSessionLineageForSidebar()` that groups by `_lineage_root_id`/`lineage_root_id`/`parent_session_id`, keeps only the most recently active tip per group, and stores `_lineage_collapsed_count` on the visible row for future UI affordances. Non-destructive — no session JSON or messages are merged, deleted, or rewritten. Only collapses rows when lineage metadata is present. (`static/sessions.js`, `tests/test_session_lineage_collapse.py`) @dso2ng — PR #1358
- **Active session synchronizes across multiple browser tabs** — multiple WebUI tabs sharing the same `localStorage` would diverge from each other when one tab switched sessions, leaving idle tabs with stale in-memory active-session state until their next user action wrote into the wrong session. Adds a `storage` event listener on the `hermes-webui-session` localStorage key. Idle tabs auto-load the new active session and re-render the sidebar cache. Busy tabs (currently mid-turn) do not auto-switch — they show a brief toast instead, so the user notices but the active turn isn't interrupted. (`static/sessions.js`, `tests/test_session_cross_tab_sync.py`) @dso2ng — PR #1359
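
The arithmetic behind the context-window entry above is easy to check by hand. The sketch below is a Python rendition of the idea only; the real logic lives in `static/ui.js` and is not shown in this diff, so the function name `context_indicator`, the rounding mode, and the returned dict shape are assumptions. The two tooltip strings follow the wording quoted in the changelog.

```python
def context_indicator(used_tokens: int, context_length: int) -> dict:
    """Compute the clamped ring percentage and an honest tooltip (illustrative only)."""
    if context_length <= 0:
        raise ValueError("context_length must be positive")
    raw_pct = round(used_tokens * 100 / context_length)  # unclamped, may exceed 100
    pct = min(100, raw_pct)                              # what the ring can draw
    if raw_pct > 100:
        tooltip = f"{raw_pct}% used (context exceeded)"
    else:
        tooltip = f"{pct}% used ({100 - pct}% left)"
    return {"pct": pct, "raw_pct": raw_pct, "tooltip": tooltip}
```

With 300,000 tokens measured against the 128K default, the raw value is 234%, which the old code clamped to a ring reading "100% used (0% left)". Measured against a 1M-token window, the same usage is 30%.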

## [v0.50.248] — 2026-04-30

### Added
61 changes: 50 additions & 11 deletions api/clarify.py
@@ -6,6 +6,7 @@

from __future__ import annotations

import queue
import threading
import time
from typing import Optional
@@ -17,6 +18,9 @@
_gateway_queues: dict[str, list] = {}
_gateway_notify_cbs: dict[str, object] = {}

# ── SSE subscriber registry ─────────────────────────────────────────────
_clarify_sse_subscribers: dict[str, list[queue.Queue]] = {}


class _ClarifyEntry:
"""One pending clarify request inside a session."""
@@ -70,15 +74,46 @@ def _with_timeout_metadata(data: dict) -> dict:
return item


def _clarify_sse_notify(session_id: str, head: dict | None, total: int) -> None:
"""Push a clarify event to all SSE subscribers for a session."""
payload = {"pending": dict(head) if head else None, "pending_count": total}
for q in _clarify_sse_subscribers.get(session_id, ()):
try:
q.put_nowait(payload)
except queue.Full:
pass # drop if subscriber is slow


def sse_subscribe(session_id: str) -> queue.Queue:
"""Register a bounded Queue for SSE push to a given session."""
q: queue.Queue = queue.Queue(maxsize=16)
with _lock:
_clarify_sse_subscribers.setdefault(session_id, []).append(q)
return q


def sse_unsubscribe(session_id: str, q: queue.Queue) -> None:
"""Remove a subscriber Queue; clean up empty session entries."""
with _lock:
subs = _clarify_sse_subscribers.get(session_id)
if subs:
try:
subs.remove(q)
except ValueError:
pass
if not subs:
_clarify_sse_subscribers.pop(session_id, None)
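
The bounded-queue behaviour that `_clarify_sse_notify` relies on can be seen with the standard library alone. The helpers `offer` and `drain` below are hypothetical names introduced for this sketch; only `queue.Queue(maxsize=16)`, `put_nowait`, and `queue.Full` come from the diff.

```python
import queue


def offer(q: queue.Queue, payload) -> bool:
    """Try to enqueue without blocking; report whether the payload was accepted."""
    try:
        q.put_nowait(payload)
        return True
    except queue.Full:
        return False  # same outcome as the silent drop in the notifier


def drain(q: queue.Queue) -> list:
    """Empty the queue without blocking and return what was buffered."""
    items = []
    while True:
        try:
            items.append(q.get_nowait())
        except queue.Empty:
            return items


# A tab that never reads: 20 events are offered, only the first 16 fit.
slow_tab = queue.Queue(maxsize=16)
accepted = [offer(slow_tab, {"pending_count": n}) for n in range(20)]
buffered = drain(slow_tab)
```

Memory per subscriber stays bounded at 16 payloads. Note that in this sketch the payloads that get dropped are the newest ones, so a subscriber that falls behind holds the oldest 16 events until it drains them.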


def submit_pending(session_key: str, data: dict) -> _ClarifyEntry:
"""Queue a pending clarify request and notify the UI callback if registered."""
data = _with_timeout_metadata(data)
with _lock:
queue = _gateway_queues.setdefault(session_key, [])
gw_queue = _gateway_queues.setdefault(session_key, [])
# De-duplicate while unresolved: if the most recent pending clarify is
# semantically identical, reuse it instead of stacking duplicates.
if queue:
last = queue[-1]
if gw_queue:
last = gw_queue[-1]
if (
str(last.data.get("question", "")) == str(data.get("question", ""))
and list(last.data.get("choices_offered") or [])
@@ -87,7 +122,7 @@ def submit_pending(session_key: str, data: dict) -> _ClarifyEntry:
entry = last
cb = _gateway_notify_cbs.get(session_key)
# Keep _pending aligned to the oldest unresolved entry.
_pending[session_key] = queue[0].data
_pending[session_key] = gw_queue[0].data
if cb:
try:
cb(dict(entry.data))
@@ -96,9 +131,11 @@ def submit_pending(session_key: str, data: dict) -> _ClarifyEntry:
return entry

entry = _ClarifyEntry(data)
queue.append(entry)
_pending[session_key] = queue[0].data
gw_queue.append(entry)
_pending[session_key] = gw_queue[0].data
cb = _gateway_notify_cbs.get(session_key)
# Notify SSE subscribers from inside _lock for ordering guarantees.
_clarify_sse_notify(session_key, dict(gw_queue[0].data), len(gw_queue))
if cb:
try:
cb(data)
Expand All @@ -125,15 +162,17 @@ def has_pending(session_key: str) -> bool:
def resolve_clarify(session_key: str, response: str, resolve_all: bool = False) -> int:
"""Resolve the oldest pending clarify request for a session."""
with _lock:
queue = _gateway_queues.get(session_key)
if not queue:
q = _gateway_queues.get(session_key)
if not q:
_pending.pop(session_key, None)
return 0
entries = list(queue) if resolve_all else [queue.pop(0)]
if queue:
_pending[session_key] = queue[0].data
entries = list(q) if resolve_all else [q.pop(0)]
if q:
_pending[session_key] = q[0].data
_clarify_sse_notify(session_key, dict(q[0].data), len(q))
else:
_clear_queue_locked(session_key)
_clarify_sse_notify(session_key, None, 0)
count = 0
for entry in entries:
entry.result = response
9 changes: 9 additions & 0 deletions api/models.py
@@ -347,6 +347,10 @@ def __init__(self, session_id: str=None, title: str='Untitled',
self.context_length = context_length
self.threshold_tokens = threshold_tokens
self.last_prompt_tokens = last_prompt_tokens
self.is_cli_session = bool(kwargs.get('is_cli_session', False))
self.source_tag = kwargs.get('source_tag')
self.session_source = kwargs.get('session_source')
self.source_label = kwargs.get('source_label')
self._metadata_message_count = None

@property
@@ -367,6 +371,7 @@ def save(self, touch_updated_at: bool = True, skip_index: bool = False) -> None:
'pending_user_message', 'pending_attachments', 'pending_started_at',
'compression_anchor_visible_idx', 'compression_anchor_message_key',
'context_length', 'threshold_tokens', 'last_prompt_tokens',
'is_cli_session', 'source_tag', 'session_source', 'source_label',
]
meta = {k: getattr(self, k, None) for k in METADATA_FIELDS}
meta['messages'] = self.messages
@@ -462,6 +467,10 @@ def compact(self, include_runtime=False, active_stream_ids=None) -> dict:
'threshold_tokens': self.threshold_tokens,
'last_prompt_tokens': self.last_prompt_tokens,
'active_stream_id': self.active_stream_id,
'is_cli_session': self.is_cli_session,
'source_tag': self.source_tag,
'session_source': self.session_source,
'source_label': self.source_label,
'is_streaming': _is_streaming_session(
self.active_stream_id, active_stream_ids
) if include_runtime else False,
78 changes: 78 additions & 0 deletions api/routes.py
@@ -643,10 +643,13 @@ def submit_pending(session_key: str, approval: dict) -> None:
submit_pending as submit_clarify_pending,
get_pending as get_clarify_pending,
resolve_clarify,
sse_subscribe as clarify_sse_subscribe,
sse_unsubscribe as clarify_sse_unsubscribe,
)
except ImportError:
submit_clarify_pending = lambda *a, **k: None
get_clarify_pending = lambda *a, **k: None
clarify_sse_subscribe = None
resolve_clarify = lambda *a, **k: 0


@@ -1271,6 +1274,9 @@ def handle_get(handler, parsed) -> bool:
if parsed.path == "/api/clarify/pending":
return _handle_clarify_pending(handler, parsed)

if parsed.path == "/api/clarify/stream":
return _handle_clarify_sse_stream(handler, parsed)

if parsed.path == "/api/clarify/inject_test":
# Loopback-only: used by automated tests; blocked from any remote client
if handler.client_address[0] != "127.0.0.1":
@@ -2879,6 +2885,78 @@ def _handle_clarify_pending(handler, parsed):
return j(handler, {"pending": None})


def _handle_clarify_sse_stream(handler, parsed):
"""SSE endpoint for real-time clarify notifications.

Long-lived connection that pushes clarify events the moment they arrive,
replacing the 1.5s polling loop. The frontend uses EventSource and falls
back to HTTP polling if the connection fails.
"""
if clarify_sse_subscribe is None:
return bad(handler, "clarify SSE not available")

sid = parse_qs(parsed.query).get("session_id", [""])[0]
if not sid:
return bad(handler, "session_id is required")

# Subscribe AND snapshot atomically. We import clarify's _lock so that
# subscribe and the snapshot read happen under the same mutex — same
# pattern as the approval SSE handler.
#
# NOTE: We must NOT call clarify.get_pending() here — it acquires _lock
# internally, which would deadlock since clarify._lock is a non-reentrant
# threading.Lock. Instead, read _gateway_queues / _pending inline under
# the lock we already hold.
from api.clarify import (
_lock as _clarify_lock,
_clarify_sse_subscribers as _clarify_subs,
_gateway_queues as _clarify_gateway_queues,
_pending as _clarify_pending,
)
q = queue.Queue(maxsize=16)
initial_pending = None
initial_count = 0
with _clarify_lock:
_clarify_subs.setdefault(sid, []).append(q)
gw_q = _clarify_gateway_queues.get(sid) or []
if gw_q:
initial_pending = dict(gw_q[0].data)
initial_count = len(gw_q)
else:
_legacy = _clarify_pending.get(sid)
if _legacy:
initial_pending = dict(_legacy)
initial_count = 1

handler.send_response(200)
handler.send_header('Content-Type', 'text/event-stream; charset=utf-8')
handler.send_header('Cache-Control', 'no-cache')
handler.send_header('X-Accel-Buffering', 'no')
handler.send_header('Connection', 'keep-alive')
handler.end_headers()

from api.streaming import _sse

# Push initial state immediately so the client doesn't miss anything.
_sse(handler, 'initial', {"pending": initial_pending, "pending_count": initial_count})

try:
while True:
try:
payload = q.get(timeout=30)
except queue.Empty:
handler.wfile.write(b': keepalive\n\n')
handler.wfile.flush()
continue
if payload is None:
break
_sse(handler, 'clarify', payload)
except _CLIENT_DISCONNECT_ERRORS:
pass
finally:
clarify_sse_unsubscribe(sid, q)
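
For readers unfamiliar with the wire format, the handler above emits named events plus a comment line as keepalive. The sketch below shows what such a stream might look like and why the keepalive is invisible to the client. `format_sse` is a hypothetical stand-in for `api.streaming._sse`, whose actual framing is not part of this diff, and `parse_sse` is a deliberately minimal parser written for this illustration.

```python
import json

KEEPALIVE = b": keepalive\n\n"  # comment line, as written by the handler on queue timeout


def format_sse(event: str, data: dict) -> bytes:
    """Frame one named event (assumed framing; the real _sse helper may differ)."""
    return f"event: {event}\ndata: {json.dumps(data)}\n\n".encode("utf-8")


def parse_sse(stream: bytes) -> list:
    """Return (event, data) tuples from a byte stream, skipping comment lines."""
    events = []
    for block in stream.decode("utf-8").split("\n\n"):
        event, data_lines = "message", []  # "message" is the default event type
        for line in block.split("\n"):
            if not line or line.startswith(":"):
                continue  # lines starting with ':' are comments and carry no event
            field, _, value = line.partition(":")
            if value.startswith(" "):
                value = value[1:]  # a single leading space after the colon is stripped
            if field == "event":
                event = value
            elif field == "data":
                data_lines.append(value)
        if data_lines:
            events.append((event, json.loads("\n".join(data_lines))))
    return events
```

Feeding an `initial` frame, a keepalive, and a `clarify` frame through `parse_sse` yields exactly two events. The keepalive only keeps intermediaries from closing an idle connection.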


def _handle_clarify_inject(handler, parsed):
"""Inject a fake pending clarify prompt -- loopback-only, used by automated tests."""
qs = parse_qs(parsed.query)
22 changes: 22 additions & 0 deletions api/streaming.py
@@ -2241,6 +2241,28 @@ def _periodic_checkpoint():
usage['context_length'] = getattr(_cc, 'context_length', 0) or 0
usage['threshold_tokens'] = getattr(_cc, 'threshold_tokens', 0) or 0
usage['last_prompt_tokens'] = getattr(_cc, 'last_prompt_tokens', 0) or 0
# Fallback: when the compressor is absent or reports context_length=0,
# resolve the model's context window from metadata so the UI indicator
# shows the correct percentage rather than overflowing against the 128K
# JS default. Mirrors the session-save fallback above (lines ~2205-2217).
if not usage.get('context_length'):
try:
from agent.model_metadata import get_model_context_length as _get_cl
_fb_cl = _get_cl(
getattr(agent, 'model', resolved_model or '') or '',
getattr(agent, 'base_url', '') or '',
)
if _fb_cl:
usage['context_length'] = _fb_cl
except Exception:
pass
# Fallback: when last_prompt_tokens is missing (no compressor), use the
# session-persisted value rather than letting the frontend fall back to
# the cumulative input_tokens counter, which overflows for long sessions.
if not usage.get('last_prompt_tokens'):
_sess_lpt = getattr(s, 'last_prompt_tokens', 0) or 0
if _sess_lpt:
usage['last_prompt_tokens'] = _sess_lpt
# (reasoning trace already attached + saved above, before s.save())
# Leftover-steer delivery: if a /steer was queued (via
# api/chat/steer) but the agent finished its turn before
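
The two fallbacks added in this hunk follow the same shape: fill a field only when it is missing or zero, and never let the lookup break the stream. A standalone sketch of that pattern follows. `fill_usage_fallbacks` and its `resolve_context_length` callable are hypothetical; the callable stands in for `get_model_context_length(model, base_url)` so the example runs without the agent package.

```python
def fill_usage_fallbacks(usage: dict, session_last_prompt_tokens: int,
                         resolve_context_length) -> dict:
    """Return a copy of usage with missing context fields filled in (illustrative)."""
    filled = dict(usage)

    # context_length: only consult model metadata when the compressor gave nothing.
    if not filled.get("context_length"):
        try:
            resolved = resolve_context_length()
        except Exception:
            resolved = 0  # best effort: a failed lookup leaves the field as it was
        if resolved:
            filled["context_length"] = resolved

    # last_prompt_tokens: prefer the session-persisted value over leaving it empty.
    if not filled.get("last_prompt_tokens") and session_last_prompt_tokens:
        filled["last_prompt_tokens"] = session_last_prompt_tokens

    return filled
```

Values reported by a compressor always win, and a resolver that raises simply leaves `context_length` untouched, which matches the `except Exception: pass` in the hunk.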