diff --git a/CHANGELOG.md b/CHANGELOG.md index 6498698ae4..7dcbbb23cd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,18 @@ ## [Unreleased] +## [v0.51.122] — 2026-05-24 — Release CT (stage-batch4 — 4-PR low-risk batch — stale cache tail / inflight UI / segment flush / reasoning accumulator) + +### Fixed + +- **PR #2802** by @ai-ag2026 — Drop stale inactive cached user tails when `/api/session` reloads a conversation whose saved sidecar already ends on an assistant answer. Supersedes #2733 (held due to async-compression interaction): the new guard bails out when `len(cached_messages) <= len(disk_messages)` and only drops a cached user tail whose text duplicates the last saved user message, so it never fires when the cache holds a genuinely new concurrent edit — only when the cache has an unsaved duplicate user row past the saved assistant tail. Adds `api/models._inactive_cache_tail_needs_disk_check()` + `_cache_has_stale_unsaved_user_tail()` helpers and 5 new tests in `tests/test_webui_state_db_reconciliation.py`. Previously-held test `test_session_compress_async_reports_stale_session_guard` now passes (verified). Closes umbrella #2361 partially. + +- **PR #2796** by @ai-ag2026 — Clear stale inflight UI state before starting a new send so blocked composer busy-state from failed/incomplete prior turns doesn't divert new turns into the invisible queue. Five-commit squashed fix: (1) drop stale optimistic sidebar rows once canonical session data arrives, (2) clear stale busy state before send via `_clearStaleBusyStateBeforeSend()`, (3) preserve server idle rows over stale optimistic local rows, (4) let `/api/chat/start` survive non-fatal pre-start UI errors via `_runOptionalPreStartUiStep()`, (5) keep those warnings console-only instead of throwing. Adds `_shouldKeepLocalOnlyOptimisticSessionRow()` in `static/sessions.js` and 8 new tests in `tests/test_inflight_send_start_race.py`. Closes #2795. Authorship preserved via `--author`. 
+ +- **PR #2777** by @b3nw — Flush pending render before segment reset at tool/interim_assistant boundaries so live tokens that arrived in the 66ms rAF throttle window don't get lost from the DOM when `_resetAssistantSegment()` clears `assistantBody`. New `_flushPendingSegmentRender()` helper writes via `smd`, `renderMd`, or `esc` fallback (same paths as `_doRender`) only when `_renderPending` is true. Completed transcripts were never affected — `renderMessages` rebuilds from the full `assistantText` accumulator on `done`. Adds `tests/test_issue2713_streaming_segment_flush.py`. Closes #2713. + +- **PR #2778** by @b3nw — Reset reasoning accumulator per turn and prefer `reasoning_content` over `reasoning` on read. Two related bugs: (1) `reasoningText` was initialized once when the SSE stream opened and never reset between turns, so the `done` event would assign the union of every turn's reasoning to the last assistant message in multi-turn agent sessions; now reset at both turn boundaries (`tool` + `interim_assistant`). (2) `static/ui.js renderMessages` preferred `m.reasoning` (potentially corrupted by bug 1) over `m.reasoning_content` (the clean per-turn backend value); the fallback now reads `m.reasoning_content || m.reasoning`. Updates `tests/test_streaming_race_fix.py` to scope the reconnect-accumulator guard to the `_wireSSE` preamble only (turn-boundary resets inside event listeners are intentional). Adds `tests/test_issue2565_reasoning_accumulation.py`. Closes #2565. 
+ ## [v0.51.121] — 2026-05-24 — Release CS (stage-batch3 — 4-PR low-risk batch — state.db merge / display counts / compression marker / Windows launcher) ### Fixed diff --git a/api/models.py b/api/models.py index 1fde1b510b..f6416650c6 100644 --- a/api/models.py +++ b/api/models.py @@ -1723,6 +1723,89 @@ def _repair_stale_pending(session) -> bool: return False +def _last_non_tool_role(messages) -> str: + if not isinstance(messages, list): + return '' + for message in reversed(messages): + role = _message_role(message) + if role and role != 'tool': + return role + return '' + + +def _last_non_tool_message(messages): + if not isinstance(messages, list): + return None + for message in reversed(messages): + role = _message_role(message) + if role and role != 'tool': + return message + return None + + +def _message_content_text(message) -> str: + if not isinstance(message, dict): + return '' + content = message.get('content') + if isinstance(content, str): + return content + if isinstance(content, list): + parts = [] + for item in content: + if isinstance(item, str): + parts.append(item) + elif isinstance(item, dict) and isinstance(item.get('text'), str): + parts.append(item['text']) + return ''.join(parts) + return '' + + +def _inactive_cache_tail_needs_disk_check(cached) -> bool: + if cached is None: + return False + if getattr(cached, 'active_stream_id', None) or getattr(cached, 'pending_user_message', None): + return False + return _last_non_tool_role(getattr(cached, 'messages', None) or []) == 'user' + + +def _cache_has_stale_unsaved_user_tail(cached, disk_session) -> bool: + """Return True when an inactive cached session has an unsaved user tail. + + A completed turn is saved to the sidecar before the browser reloads it. In + rare compaction/reconnect paths the in-process cache can retain a recovered + or optimistic user row after the saved assistant tail even though the row was + never persisted. 
If /api/session serves that cache entry, the visible + transcript appears to end on the old prompt and the saved assistant answer + looks missing until a fork/reload resets the cache. + """ + if cached is None or disk_session is None: + return False + if getattr(cached, 'active_stream_id', None) or getattr(cached, 'pending_user_message', None): + return False + cached_messages = getattr(cached, 'messages', None) or [] + disk_messages = getattr(disk_session, 'messages', None) or [] + if len(cached_messages) <= len(disk_messages): + return False + if _last_non_tool_role(cached_messages) != 'user': + return False + if _last_non_tool_role(disk_messages) != 'assistant': + return False + + cached_tail = _last_non_tool_message(cached_messages) + previous_disk_user = None + for message in reversed(disk_messages): + if _message_role(message) == 'user': + previous_disk_user = message + break + if previous_disk_user is None: + return False + + # Only drop tails that look like a duplicated optimistic/recovered user row. + # A genuinely new concurrent user edit must stay in memory so stale-session + # guards can report and preserve it. + return _message_content_text(cached_tail) == _message_content_text(previous_disk_user) + + def get_session(sid, metadata_only=False): """Load a session, optionally with metadata only (skipping the messages array). 
@@ -1736,6 +1819,19 @@ def get_session(sid, metadata_only=False): if cached is not None: SESSIONS.move_to_end(sid) # LRU: mark as recently used if cached is not None: + if not metadata_only and _inactive_cache_tail_needs_disk_check(cached): + try: + disk_session = Session.load(sid) + if _cache_has_stale_unsaved_user_tail(cached, disk_session): + with LOCK: + SESSIONS[sid] = disk_session + SESSIONS.move_to_end(sid) + cached = disk_session + except Exception: + logger.debug( + "stale cached user-tail check failed for session %s", + sid, exc_info=True, + ) if not metadata_only and _session_has_pending_journal_retry(cached): try: _try_retry_journal_recovery_in_place(cached) diff --git a/static/messages.js b/static/messages.js index 82f231b5f3..170952ddc2 100644 --- a/static/messages.js +++ b/static/messages.js @@ -190,6 +190,42 @@ let _sendInProgress = false; let _sendInProgressSid = null; // session_id of the in-flight send const _sessionTitleProvisionalBySid = new Map(); +function _clearStaleBusyStateBeforeSend({compressionRunning=false}={}){ + if(!S||!S.busy||compressionRunning) return false; + const session=S.session||{}; + const sid=session.session_id||''; + const hasRuntimeConfirmation=Boolean( + S.activeStreamId|| + session.active_stream_id|| + session.pending_user_message|| + session.pending_started_at + ); + if(hasRuntimeConfirmation) return false; + if(typeof INFLIGHT==='object'&&INFLIGHT&&sid&&INFLIGHT[sid]){ + delete INFLIGHT[sid]; + if(typeof clearInflightState==='function') clearInflightState(sid); + } + S.activeStreamId=null; + if(session) session.active_stream_id=null; + if(typeof setBusy==='function') setBusy(false); + else S.busy=false; + if(typeof setComposerStatus==='function') setComposerStatus(''); + if(typeof setStatus==='function') setStatus(''); + if(typeof updateSendBtn==='function') updateSendBtn(); + if(sid&&typeof clearOptimisticSessionStreaming==='function') clearOptimisticSessionStreaming(sid); + return true; +} + +function 
_runOptionalPreStartUiStep(label, fn){ + try{ + return typeof fn==='function'?fn():undefined; + }catch(e){ + const message=e&&e.message?e.message:String(e||'unknown error'); + try{console.warn('[webui] optional pre-start UI step failed', label, message);}catch(_){ } + return undefined; + } +} + function _sessionTitleLooksDefaultOrProvisional(titleText, provisionalText){ const title=String(titleText||'').replace(/\s+/g,' ').trim(); if(!title||title==='Untitled'||title==='New Chat')return true; @@ -262,6 +298,7 @@ async function send(){ } const compressionRunning=typeof isCompressionUiRunning==='function'&&isCompressionUiRunning(); + _clearStaleBusyStateBeforeSend({compressionRunning}); // If busy or a manual compression is still running, handle based on busy_input_mode if(S.busy||compressionRunning){ if(text){ @@ -409,39 +446,68 @@ async function send(){ const userMsg={role:'user',content:displayText,attachments:uploaded.length?uploadedNames:undefined,_ts:Date.now()/1000}; S.toolCalls=[]; // clear tool calls from previous turn clearLiveToolCards(); // clear any leftover live cards from last turn - S.messages.push(userMsg);renderMessages();appendThinking('',{pending:true});setBusy(true); - // First optimistic pass: make the local user turn visible before /api/chat/start - // can save pending state on the server. 
- if(typeof upsertActiveSessionForLocalTurn==='function'){ - upsertActiveSessionForLocalTurn({title:displayText.slice(0,64),messageCount:S.messages.length,timestampMs:Date.now()}); - } - const optimisticMessages=[...S.messages]; - INFLIGHT[activeSid]={messages:optimisticMessages,uploaded:uploadedNames,toolCalls:[]}; - if(typeof saveInflightState==='function'){ - saveInflightState(activeSid,{streamId:null,messages:INFLIGHT[activeSid].messages,uploaded:uploadedNames,toolCalls:[]}); - } - if(typeof renderSessionListFromCache==='function') renderSessionListFromCache(); - startApprovalPolling(activeSid); - startClarifyPolling(activeSid); - _fetchYoloState(activeSid); // sync YOLO pill with backend state - S.activeStreamId = null; // will be set after stream starts - if(typeof updateSendBtn==='function') updateSendBtn(); + let optimisticMessages; + try{ + S.messages.push(userMsg);renderMessages();appendThinking('',{pending:true});setBusy(true); + // First optimistic pass: make the local user turn visible before /api/chat/start + // can save pending state on the server. 
+ _runOptionalPreStartUiStep('upsertActiveSessionForLocalTurn.initial', ()=>{ + if(typeof upsertActiveSessionForLocalTurn==='function'){ + upsertActiveSessionForLocalTurn({title:displayText.slice(0,64),messageCount:S.messages.length,timestampMs:Date.now()}); + } + }); + optimisticMessages=[...S.messages]; + INFLIGHT[activeSid]={messages:optimisticMessages,uploaded:uploadedNames,toolCalls:[]}; + if(typeof saveInflightState==='function'){ + saveInflightState(activeSid,{streamId:null,messages:INFLIGHT[activeSid].messages,uploaded:uploadedNames,toolCalls:[]}); + } + _runOptionalPreStartUiStep('renderSessionListFromCache.initial', ()=>{ + if(typeof renderSessionListFromCache==='function') renderSessionListFromCache(); + }); + _runOptionalPreStartUiStep('startApprovalPolling.prestart', ()=>startApprovalPolling(activeSid)); + _runOptionalPreStartUiStep('startClarifyPolling.prestart', ()=>startClarifyPolling(activeSid)); + _runOptionalPreStartUiStep('fetchYoloState.prestart', ()=>_fetchYoloState(activeSid)); // sync YOLO pill with backend state + S.activeStreamId = null; // will be set after stream starts + _runOptionalPreStartUiStep('updateSendBtn.prestart', ()=>{ + if(typeof updateSendBtn==='function') updateSendBtn(); + }); - // Set provisional title from user message immediately so session appears - // in the sidebar right away with a meaningful name. /api/chat/start persists - // the server-side provisional title and may refine this optimistic text. - if(S.session&&(S.session.title==='Untitled'||!S.session.title)){ - const provisionalTitle=displayText.slice(0,64); - applySessionTitleUpdate(activeSid, provisionalTitle, {force:true, rememberProvisional:true}); - if(typeof upsertActiveSessionForLocalTurn==='function'){ - // Second optimistic pass: carry the provisional title into the cached row - // without re-fetching /api/sessions before pending state exists server-side. 
- upsertActiveSessionForLocalTurn({title:provisionalTitle,messageCount:S.messages.length,timestampMs:Date.now()}); + // Set provisional title from user message immediately so session appears + // in the sidebar right away with a meaningful name. /api/chat/start persists + // the server-side provisional title and may refine this optimistic text. + if(S.session&&(S.session.title==='Untitled'||!S.session.title)){ + const provisionalTitle=displayText.slice(0,64); + _runOptionalPreStartUiStep('applySessionTitleUpdate.provisional', ()=>{ + applySessionTitleUpdate(activeSid, provisionalTitle, {force:true, rememberProvisional:true}); + }); + _runOptionalPreStartUiStep('upsertActiveSessionForLocalTurn.provisional', ()=>{ + if(typeof upsertActiveSessionForLocalTurn==='function'){ + // Second optimistic pass: carry the provisional title into the cached row + // without re-fetching /api/sessions before pending state exists server-side. + upsertActiveSessionForLocalTurn({title:provisionalTitle,messageCount:S.messages.length,timestampMs:Date.now()}); + } + }); + } else if(typeof upsertActiveSessionForLocalTurn==='function'){ + _runOptionalPreStartUiStep('upsertActiveSessionForLocalTurn.titled', ()=>{ + upsertActiveSessionForLocalTurn({title:S.session&&S.session.title||displayText.slice(0,64),messageCount:S.messages.length,timestampMs:Date.now()}); + }); + } else { + _runOptionalPreStartUiStep('renderSessionListFromCache.prestart', ()=>{ + renderSessionListFromCache(); // ensure it's visible even if already titled + }); } - } else if(typeof upsertActiveSessionForLocalTurn==='function'){ - upsertActiveSessionForLocalTurn({title:S.session&&S.session.title||displayText.slice(0,64),messageCount:S.messages.length,timestampMs:Date.now()}); - } else { - renderSessionListFromCache(); // ensure it's visible even if already titled + }catch(preStartError){ + // The user turn must reach /api/chat/start even if local optimistic UI + // bookkeeping (render cache, storage quota, sidebar 
reconciliation, etc.) + // throws. Otherwise the pane can show a user bubble + spinner while the + // backend never receives the turn. + const message=preStartError&&preStartError.message?preStartError.message:String(preStartError||'unknown error'); + try{console.warn('[webui] pre-start optimistic UI failed; continuing to /api/chat/start', message);}catch(_){ } + if(!S.messages.includes(userMsg)) S.messages.push(userMsg); + optimisticMessages=[...S.messages]; + INFLIGHT[activeSid]={messages:optimisticMessages,uploaded:uploadedNames,toolCalls:[]}; + try{setBusy(true);}catch(_){S.busy=true;} + S.activeStreamId=null; } // Start the agent via POST, get a stream_id back @@ -1285,6 +1351,20 @@ function attachLiveStream(activeSid, streamId, uploaded=[], options={}){ }; step(); } + function _flushPendingSegmentRender(){ + if(!assistantBody||!_renderPending) return; + _cancelAnimationFramePendingStreamRender(); + const displayText=segmentStart===0 + ? _parseStreamState().displayText + : _stripXmlToolCalls(assistantText.slice(segmentStart)); + if(_smdParser){ + _smdWrite(displayText); + } else if(renderMd){ + assistantBody.innerHTML=renderMd(displayText); + } else { + assistantBody.innerHTML=esc(displayText); + } + } function _resetAssistantSegment(){ assistantRow=null; assistantBody=null; @@ -1410,6 +1490,8 @@ function attachLiveStream(activeSid, streamId, uploaded=[], options={}){ if(!visible){ return; } + reasoningText=''; + liveReasoningText=''; if(alreadyStreamed){ if(!S.session||S.session.session_id!==activeSid) return; _resetAssistantSegment(); @@ -1423,6 +1505,7 @@ function attachLiveStream(activeSid, streamId, uploaded=[], options={}){ if(typeof updateThinking==='function') updateThinking(_liveThinkingText()); else appendThinking(_liveThinkingText()); } + _flushPendingSegmentRender(); ensureAssistantRow(true); _resetAssistantSegment(); _scheduleRender(); @@ -1467,12 +1550,14 @@ function attachLiveStream(activeSid, streamId, uploaded=[], options={}){ // to be 
re-created below everything when reasoning resumed post-tool. if(typeof finalizeThinkingCard==='function') finalizeThinkingCard(); liveReasoningText=''; + reasoningText=''; const oldRow=$('toolRunningRow');if(oldRow)oldRow.remove(); appendLiveToolCard(tc); snapshotLiveTurn(); // Reset the live assistant row reference so that any text tokens arriving // after this tool call create a NEW segment appended below the tool card, // rather than updating the old segment that sits above it in the DOM. + _flushPendingSegmentRender(); _freshSegment=true; _smdEndParser(); _resetAssistantSegment(); diff --git a/static/sessions.js b/static/sessions.js index b8546624f0..8a55fef1a0 100644 --- a/static/sessions.js +++ b/static/sessions.js @@ -2024,6 +2024,31 @@ function _isOptimisticFirstTurnSessionRow(s){ ); } +function _shouldKeepLocalOnlyOptimisticSessionRow(local){ + if(!_isOptimisticFirstTurnSessionRow(local)) return false; + const sid=local.session_id; + if(typeof _sendInProgress!=='undefined'&&_sendInProgress&&sid===_sendInProgressSid) return true; + const activeSid=S&&S.session&&S.session.session_id; + const isActive=Boolean(activeSid&&activeSid===sid); + const hasRuntimeConfirmation=Boolean(local.active_stream_id||local.pending_user_message||local.pending_started_at); + if(isActive&&S.busy&&hasRuntimeConfirmation) return true; + const localTs=Number(local.last_message_at||local.updated_at||0); + const ageMs=localTs>0?Date.now()-(localTs*1000):Infinity; + return Boolean(isActive&&S.busy&&ageMs>=0&&ageMs<5000); +} + +function _dropStaleOptimisticSessionRow(sid){ + if(!sid) return; + if(INFLIGHT&&INFLIGHT[sid]){ + delete INFLIGHT[sid]; + if(typeof clearInflightState==='function') clearInflightState(sid); + } + if(typeof _sessionStreamingById!=='undefined'&&_sessionStreamingById&&typeof _sessionStreamingById.set==='function'){ + _sessionStreamingById.set(sid,false); + } + if(typeof _forgetObservedStreamingSession==='function') _forgetObservedStreamingSession(sid); +} + 
function _mergeOptimisticFirstTurnSessions(fetchedSessions){ const merged=Array.isArray(fetchedSessions)?[...fetchedSessions]:[]; const bySid=new Map(); @@ -2035,24 +2060,31 @@ function _mergeOptimisticFirstTurnSessions(fetchedSessions){ if(idx>=0){ const fetched=merged[idx]||{}; const fetchedIsServerIdle=_isServerIdleSessionRow(fetched); + const keepLocalOptimistic=fetchedIsServerIdle?false:_shouldKeepLocalOnlyOptimisticSessionRow(local); const localCount=Number(local.message_count||0); const fetchedCount=Number(fetched.message_count||0); const localTs=Number(local.last_message_at||local.updated_at||0); const fetchedTs=Number(fetched.last_message_at||fetched.updated_at||0); + if(!keepLocalOptimistic&&typeof _dropStaleOptimisticSessionRow==='function') _dropStaleOptimisticSessionRow(sid); merged[idx]={ ...local, ...fetched, - message_count:Math.max(localCount,fetchedCount), - last_message_at:Math.max(localTs,fetchedTs), - updated_at:Math.max(Number(local.updated_at||0),Number(fetched.updated_at||0),localTs,fetchedTs), - active_stream_id:fetchedIsServerIdle?null:(fetched.active_stream_id||local.active_stream_id||null), - pending_user_message:fetchedIsServerIdle?null:(fetched.pending_user_message||local.pending_user_message||null), - pending_started_at:fetchedIsServerIdle?null:(fetched.pending_started_at||local.pending_started_at||null), - is_streaming:fetchedIsServerIdle?false:Boolean(fetched.is_streaming||local.is_streaming||_isSessionLocallyStreaming(local)), + title:keepLocalOptimistic?(local.title||fetched.title):fetched.title, + message_count:keepLocalOptimistic?Math.max(localCount,fetchedCount):fetchedCount, + last_message_at:keepLocalOptimistic?Math.max(localTs,fetchedTs):fetchedTs, + updated_at:keepLocalOptimistic?Math.max(Number(local.updated_at||0),Number(fetched.updated_at||0),localTs,fetchedTs):Number(fetched.updated_at||fetchedTs||0), + 
active_stream_id:fetchedIsServerIdle?null:(keepLocalOptimistic?(fetched.active_stream_id||local.active_stream_id||null):null), + pending_user_message:fetchedIsServerIdle?null:(keepLocalOptimistic?(fetched.pending_user_message||local.pending_user_message||null):null), + pending_started_at:fetchedIsServerIdle?null:(keepLocalOptimistic?(fetched.pending_started_at||local.pending_started_at||null):null), + is_streaming:fetchedIsServerIdle?false:(keepLocalOptimistic&&Boolean(fetched.is_streaming||local.is_streaming||_isSessionLocallyStreaming(local))), }; }else{ - merged.push({...local,is_streaming:true}); - bySid.set(sid,merged.length-1); + if(_shouldKeepLocalOnlyOptimisticSessionRow(local)){ + merged.push({...local,is_streaming:true}); + bySid.set(sid,merged.length-1); + }else{ + _dropStaleOptimisticSessionRow(sid); + } } } return merged; diff --git a/static/ui.js b/static/ui.js index 3737fff89c..5dc03b7ac5 100644 --- a/static/ui.js +++ b/static/ui.js @@ -6098,7 +6098,7 @@ function renderMessages(options){ thinkingText=content.filter(p=>p&&(p.type==='thinking'||p.type==='reasoning')).map(p=>p.thinking||p.reasoning||p.text||'').join('\n'); content=content.filter(p=>p&&p.type==='text').map(p=>p.text||p.content||'').join('\n'); } - if(!thinkingText && m.reasoning) thinkingText=m.reasoning; + if(!thinkingText && (m.reasoning_content || m.reasoning)) thinkingText=m.reasoning_content || m.reasoning; if(!thinkingText && typeof content==='string'){ const thinkMatch=content.match(/^\s*([\s\S]*?)<\/think>\s*/); if(thinkMatch){ diff --git a/tests/test_inflight_send_start_race.py b/tests/test_inflight_send_start_race.py index 933e62a428..4b0c7ba500 100644 --- a/tests/test_inflight_send_start_race.py +++ b/tests/test_inflight_send_start_race.py @@ -24,7 +24,7 @@ def _function_body(src: str, name: str) -> str: def test_send_preserves_optimistic_messages_across_chat_start_await(): """send() must not dereference INFLIGHT[activeSid] after await without a fallback.""" body = 
_function_body(MESSAGES_JS, "send") - setup_idx = body.index("const optimisticMessages=[...S.messages];") + setup_idx = body.index("optimisticMessages=[...S.messages];") inflight_idx = body.index("INFLIGHT[activeSid]={messages:optimisticMessages") await_idx = body.index("const startData=await api('/api/chat/start'") save_idx = body.index("saveInflightState(activeSid,{streamId", await_idx) @@ -49,3 +49,90 @@ def test_stale_inflight_purge_preserves_current_send_before_stream_id_exists(): skip_idx = body.index("_sendInProgress") delete_idx = body.index("delete INFLIGHT[sid];") assert skip_idx < delete_idx, "the current-send skip must run before any purge deletion" + + +def test_send_clears_stale_busy_state_before_queue_branch(): + """A stale client-only busy flag must not divert a new user turn into the invisible queue.""" + body = _function_body(MESSAGES_JS, "send") + + assert "_clearStaleBusyStateBeforeSend" in body, ( + "send() should reconcile client-only stale busy state before deciding busy/queue mode" + ) + reconcile_idx = body.index("_clearStaleBusyStateBeforeSend") + busy_branch_idx = body.index("if(S.busy||compressionRunning)") + chat_start_idx = body.index("api('/api/chat/start'") + assert reconcile_idx < busy_branch_idx < chat_start_idx, ( + "stale busy reconciliation must run before the queue branch and before /api/chat/start" + ) + + +def test_pre_start_optimistic_ui_helpers_cannot_block_chat_start(): + """Optional optimistic UI helpers must not strand a local bubble before /api/chat/start.""" + body = _function_body(MESSAGES_JS, "send") + helper_body = _function_body(MESSAGES_JS, "_runOptionalPreStartUiStep") + + optimistic_idx = body.index("S.messages.push(userMsg);renderMessages();appendThinking('',{pending:true});setBusy(true);") + chat_start_idx = body.index("api('/api/chat/start'") + pre_start = body[optimistic_idx:chat_start_idx] + + assert "try" in helper_body and "catch" in helper_body, ( + "optional pre-start UI helper wrapper must catch errors 
before /api/chat/start" + ) + assert "setStatus(`UI warning before send:" not in helper_body, ( + "non-fatal pre-start UI helper failures should stay in the console; visible status flashes " + "look like real send errors even though /api/chat/start continues" + ) + assert "_runOptionalPreStartUiStep" in pre_start, ( + "send() should wrap optimistic sidebar/title/polling helpers before /api/chat/start" + ) + assert "upsertActiveSessionForLocalTurn" in pre_start and "applySessionTitleUpdate" in pre_start + + +def test_pre_start_optimistic_block_cannot_prevent_chat_start(): + """Any pre-start UI/storage exception must still fall through to /api/chat/start.""" + body = _function_body(MESSAGES_JS, "send") + optimistic_idx = body.index("S.messages.push(userMsg);renderMessages();appendThinking('',{pending:true});setBusy(true);") + chat_start_idx = body.index("api('/api/chat/start'") + pre_start = body[optimistic_idx:chat_start_idx] + + assert "}catch(preStartError){" in pre_start, ( + "The whole optimistic pre-start block needs a catch, not only individual optional helpers" + ) + assert "continuing to /api/chat/start" in pre_start, ( + "The recovery path should document that chat/start must still execute" + ) + assert pre_start.rindex("}catch(preStartError){") < chat_start_idx, ( + "pre-start catch must be before the /api/chat/start call" + ) + + +def test_server_absent_optimistic_first_turn_rows_are_not_kept_forever(): + """A local first-turn sidebar row must expire when /api/chat/start never persisted it.""" + body = _function_body(SESSIONS_JS, "_mergeOptimisticFirstTurnSessions") + + assert "_shouldKeepLocalOnlyOptimisticSessionRow(local)" in body, ( + "server-absent optimistic rows need an explicit keep/drop gate" + ) + keep_idx = body.index("if(_shouldKeepLocalOnlyOptimisticSessionRow(local))") + append_idx = body.index("merged.push({...local,is_streaming:true});") + drop_idx = body.index("_dropStaleOptimisticSessionRow(sid);", append_idx) + assert keep_idx < 
append_idx < drop_idx, ( + "local optimistic rows may only be appended inside the explicit keep gate" + ) + drop_body = _function_body(SESSIONS_JS, "_dropStaleOptimisticSessionRow") + assert "clearInflightState(sid)" in drop_body, ( + "dropping a phantom row should also clear persisted browser recovery state" + ) + + +def test_server_idle_row_wins_over_stale_optimistic_count(): + """If the server says the row is idle, stale local message_count/title must not win.""" + body = _function_body(SESSIONS_JS, "_mergeOptimisticFirstTurnSessions") + + assert "const keepLocalOptimistic=" in body + assert "message_count:keepLocalOptimistic?Math.max(localCount,fetchedCount):fetchedCount" in body, ( + "stale optimistic message_count must not override a confirmed idle server row" + ) + assert "title:keepLocalOptimistic?(local.title||fetched.title):fetched.title" in body, ( + "stale optimistic provisional title must not override a confirmed idle server row" + ) diff --git a/tests/test_issue2565_reasoning_accumulation.py b/tests/test_issue2565_reasoning_accumulation.py new file mode 100644 index 0000000000..c3475b681b --- /dev/null +++ b/tests/test_issue2565_reasoning_accumulation.py @@ -0,0 +1,154 @@ +"""Regression tests for issue #2565: reasoning display bugs. + +Issue 1: reasoningText accumulates across turns within a single SSE stream. + - reasoningText must be reset at each turn boundary (tool and interim_assistant + events) so the done event only persists the current turn's reasoning. + +Issue 2: ui.js display prefers m.reasoning over m.reasoning_content. + - The rendering path must prefer m.reasoning_content (the clean per-turn value + from the backend) over m.reasoning (which can be corrupted by Issue 1). + +Both fixes are needed: Issue 2 alone cannot cover providers that stream reasoning +events without populating reasoning_content on the final API message. 
+""" + +import pathlib +import re + +REPO = pathlib.Path(__file__).parent.parent + + +def read(rel): + return (REPO / rel).read_text(encoding='utf-8') + + +# ── Issue 1: reasoningText reset at turn boundaries ────────────────────────── + + +class TestReasoningTextResetOnTool: + """reasoningText must be reset alongside liveReasoningText in the tool + listener so multi-tool-turn sessions don't accumulate reasoning across + turns.""" + + def _tool_listener_body(self): + """Extract the full tool listener body between the tool and + tool_complete addEventListener calls.""" + src = read('static/messages.js') + tool_start = src.find("source.addEventListener('tool'") + assert tool_start >= 0, "tool listener not found" + tool_complete_start = src.find( + "source.addEventListener('tool_complete'", tool_start + 1, + ) + assert tool_complete_start >= 0, "tool_complete listener not found" + return src[tool_start:tool_complete_start] + + def test_reasoning_text_reset_in_tool_listener(self): + body = self._tool_listener_body() + assert "reasoningText=''" in body, ( + "reasoningText must be reset to '' inside the tool listener " + "(Issue 1: accumulated reasoning from prior turns was assigned " + "to the last assistant message on the done event)" + ) + + def test_live_reasoning_text_also_reset_in_tool_listener(self): + body = self._tool_listener_body() + assert "liveReasoningText=''" in body, ( + "liveReasoningText must also be reset in the tool listener" + ) + + +class TestReasoningTextResetOnInterimAssistant: + """reasoningText must be reset at the interim_assistant boundary — the + other turn boundary where the previous turn's reasoning closes out. 
+ Without this, providers that emit reasoning before an interim_assistant + event will still co-mingle reasoning across turns.""" + + def test_reasoning_text_reset_in_interim_assistant_listener(self): + src = read('static/messages.js') + m = re.search( + r"source\.addEventListener\('interim_assistant'\s*,\s*(?:e|ev)\s*=>\s*\{(.*?)\n\s*\}\);", + src, re.DOTALL, + ) + assert m, "interim_assistant listener not found in messages.js" + body = m.group(1) + assert "reasoningText=''" in body, ( + "reasoningText must be reset to '' inside the interim_assistant " + "listener (Issue 1: turn boundary where prior reasoning closes)" + ) + + def test_live_reasoning_text_reset_in_interim_assistant_listener(self): + src = read('static/messages.js') + m = re.search( + r"source\.addEventListener\('interim_assistant'\s*,\s*(?:e|ev)\s*=>\s*\{(.*?)\n\s*\}\);", + src, re.DOTALL, + ) + assert m + body = m.group(1) + assert "liveReasoningText=''" in body, ( + "liveReasoningText must be reset in the interim_assistant listener" + ) + + +# ── Issue 2: reasoning_content preference on read ──────────────────────────── + + +class TestReasoningContentPreference: + """The rendering path in ui.js must prefer m.reasoning_content (the clean + per-turn value from the backend) over m.reasoning (which can be corrupted + by Issue 1's accumulation bug).""" + + def test_reasoning_content_checked_before_reasoning(self): + src = read('static/ui.js') + assert 'm.reasoning_content' in src, ( + "ui.js must reference m.reasoning_content so the clean per-turn " + "value from the backend is used for thinking card display" + ) + + def test_reasoning_content_preferred_in_thinking_text_fallback(self): + src = read('static/ui.js') + lines = src.splitlines() + for line in lines: + if 'thinkingText' in line and 'm.reasoning' in line: + if 'm.reasoning_content' not in line and 'reasoning_content' not in line: + if 'Array.isArray' not in line: + raise AssertionError( + f"Line references m.reasoning without checking " + 
f"m.reasoning_content first: {line.strip()}" + ) + + def test_reasoning_content_has_priority_over_reasoning(self): + """The fallback expression must evaluate reasoning_content first.""" + src = read('static/ui.js') + m = re.search( + r"thinkingText\s*=\s*(m\.reasoning_content\s*\|\|\s*m\.reasoning)", + src, + ) + assert m, ( + "thinkingText assignment must use m.reasoning_content || m.reasoning " + "so the clean backend value takes priority over the potentially " + "corrupted frontend-accumulated value" + ) + + +# ── Cross-cutting: done event still has the persist-on-done guard ──────────── + + +class TestDoneEventReasoningPersist: + """The done event's reasoning persistence guard must still exist — + the reset fixes reduce the blast radius but the guard prevents double-write + when the backend already populated .reasoning.""" + + def test_done_event_has_reasoning_guard(self): + src = read('static/messages.js') + assert '!lastAsst.reasoning' in src, ( + "done event must guard reasoningText persistence with " + "!lastAsst.reasoning to avoid overwriting backend-populated values" + ) + + def test_done_event_persists_reasoning_text(self): + src = read('static/messages.js') + assert 'lastAsst.reasoning=reasoningText' in src, ( + "done event must still persist reasoningText to lastAsst.reasoning " + "for providers that stream reasoning events without populating " + "reasoning_content on the final API message" + ) diff --git a/tests/test_issue2713_streaming_segment_flush.py b/tests/test_issue2713_streaming_segment_flush.py new file mode 100644 index 0000000000..83259f9a61 --- /dev/null +++ b/tests/test_issue2713_streaming_segment_flush.py @@ -0,0 +1,178 @@ +"""Regression tests for #2713 — flush pending render before segment reset. + +During live streaming with tool calls, the rAF-throttled render callback could +be orphaned when _resetAssistantSegment() cleared assistantBody before the +pending callback fired. 
The fix introduces _flushPendingSegmentRender() which +synchronously writes any pending segment text to the DOM before the segment is +sealed. + +These tests use static analysis (same pattern as test_streaming_race_fix.py) +to pin the structural invariants so a future refactor cannot silently re-break +the flush guarantee. +""" +import pathlib +import re + +REPO = pathlib.Path(__file__).parent.parent + + +def read(rel): + return (REPO / rel).read_text(encoding="utf-8") + + +class TestFlushHelperExists: + """_flushPendingSegmentRender must exist and have the right shape.""" + + def test_flush_helper_declared(self): + src = read("static/messages.js") + assert "function _flushPendingSegmentRender()" in src, ( + "_flushPendingSegmentRender helper must be declared in messages.js" + ) + + def test_flush_helper_guards_on_assistant_body(self): + src = read("static/messages.js") + m = re.search( + r"function _flushPendingSegmentRender\(\)\{.*?\n \}", + src, + re.DOTALL, + ) + assert m, "_flushPendingSegmentRender not found" + fn = m.group(0) + assert "assistantBody" in fn, ( + "_flushPendingSegmentRender must guard on assistantBody" + ) + + def test_flush_helper_guards_on_render_pending(self): + src = read("static/messages.js") + m = re.search( + r"function _flushPendingSegmentRender\(\)\{.*?\n \}", + src, + re.DOTALL, + ) + assert m + fn = m.group(0) + assert "_renderPending" in fn, ( + "_flushPendingSegmentRender must guard on _renderPending" + ) + + def test_flush_helper_cancels_pending_raf(self): + src = read("static/messages.js") + m = re.search( + r"function _flushPendingSegmentRender\(\)\{.*?\n \}", + src, + re.DOTALL, + ) + assert m + fn = m.group(0) + assert "_cancelAnimationFramePendingStreamRender()" in fn, ( + "_flushPendingSegmentRender must cancel the pending rAF" + ) + + def test_flush_helper_uses_smd_write(self): + src = read("static/messages.js") + m = re.search( + r"function _flushPendingSegmentRender\(\)\{.*?\n \}", + src, + re.DOTALL, + ) + assert m + fn 
= m.group(0) + assert "_smdWrite(" in fn, ( + "_flushPendingSegmentRender must write via _smdWrite for smd path" + ) + + def test_flush_helper_has_render_md_fallback(self): + src = read("static/messages.js") + m = re.search( + r"function _flushPendingSegmentRender\(\)\{.*?\n \}", + src, + re.DOTALL, + ) + assert m + fn = m.group(0) + assert "renderMd" in fn, ( + "_flushPendingSegmentRender must have renderMd fallback" + ) + + def test_flush_helper_has_esc_fallback(self): + src = read("static/messages.js") + m = re.search( + r"function _flushPendingSegmentRender\(\)\{.*?\n \}", + src, + re.DOTALL, + ) + assert m + fn = m.group(0) + assert "esc(" in fn, ( + "_flushPendingSegmentRender must have esc() fallback" + ) + + +def _extract_handler(src, event_name): + """Extract a full SSE handler body by matching balanced indentation. + + Finds `source.addEventListener(''` and captures through the + matching ` });` closing (4-space indent, matching the addEventListener + call site inside _wireSSE). + """ + start_pattern = f"source.addEventListener('{event_name}'" + start = src.index(start_pattern) + # Find the closing ` });` that ends this handler at 6-space indent level + # (the handler bodies are indented 6 spaces inside _wireSSE) + end_marker = "\n });" + pos = start + while True: + idx = src.index(end_marker, pos + 1) + # Confirm the next line after `});` starts a new addEventListener or + # is at the same or lower indent. Accept first match after the handler + # body has at least some content. 
+ if idx > start + len(start_pattern) + 20: + return src[start : idx + len(end_marker)] + pos = idx + + +class TestToolHandlerFlush: + """The tool SSE handler must call _flushPendingSegmentRender before reset.""" + + def test_tool_handler_calls_flush(self): + src = read("static/messages.js") + fn = _extract_handler(src, "tool") + assert "_flushPendingSegmentRender()" in fn, ( + "tool handler must call _flushPendingSegmentRender() before " + "_resetAssistantSegment()" + ) + + def test_tool_handler_flush_before_reset(self): + src = read("static/messages.js") + fn = _extract_handler(src, "tool") + flush_pos = fn.index("_flushPendingSegmentRender()") + reset_pos = fn.index("_resetAssistantSegment()") + assert flush_pos < reset_pos, ( + "_flushPendingSegmentRender must be called BEFORE " + "_resetAssistantSegment in the tool handler" + ) + + +class TestInterimAssistantHandlerFlush: + """The interim_assistant handler must call _flushPendingSegmentRender.""" + + def test_interim_handler_calls_flush(self): + src = read("static/messages.js") + fn = _extract_handler(src, "interim_assistant") + assert "_flushPendingSegmentRender()" in fn, ( + "interim_assistant handler must call _flushPendingSegmentRender() " + "before _resetAssistantSegment()" + ) + + def test_interim_handler_flush_before_last_reset(self): + """The flush must precede the final _resetAssistantSegment that seals + the segment for new content (not the early alreadyStreamed branch).""" + src = read("static/messages.js") + fn = _extract_handler(src, "interim_assistant") + flush_pos = fn.index("_flushPendingSegmentRender()") + # Find the _resetAssistantSegment call that comes AFTER the flush + reset_pos = fn.index("_resetAssistantSegment()", flush_pos) + assert flush_pos < reset_pos, ( + "_flushPendingSegmentRender must be called BEFORE the final " + "_resetAssistantSegment in the interim_assistant handler" + ) diff --git a/tests/test_streaming_race_fix.py b/tests/test_streaming_race_fix.py index 
0780b0c03c..1702b379d3 100644 --- a/tests/test_streaming_race_fix.py +++ b/tests/test_streaming_race_fix.py @@ -114,19 +114,31 @@ class TestReconnectAccumulatorPreservation: """ def test_wire_sse_does_not_reset_accumulators(self): - """Regression guard: _wireSSE must not contain a literal - accumulator-reset statement. Preserves pre-reconnect content so - the user sees the full response across a drop+reconnect.""" + """Regression guard: the _wireSSE preamble (before any event + listeners are attached) must not contain a literal accumulator- + reset statement. Preserves pre-reconnect content so the user + sees the full response across a drop+reconnect. + + Turn-boundary resets inside event listeners (tool, + interim_assistant) are intentional (#2565) and not covered by + this guard — they prevent reasoning from accumulating across + multi-turn agent sessions.""" src = read('static/messages.js') m = re.search(r'function _wireSSE\(source\)\{.*?\n \}', src, re.DOTALL) assert m, "_wireSSE not found" fn = m.group(0) - assert "assistantText=''" not in fn and 'assistantText = ""' not in fn, ( - "_wireSSE must NOT reset assistantText — the server does not replay " - "events on reconnect, so the reset would wipe valid pre-drop content" - ) - assert "reasoningText=''" not in fn and 'reasoningText = ""' not in fn, ( - "_wireSSE must NOT reset reasoningText on reconnect" + # Check only the preamble before the first addEventListener — this is + # the reconnect path where resets would cause data loss. 
+ first_listener = fn.find("source.addEventListener(") + assert first_listener > 0, "no addEventListener in _wireSSE" + preamble = fn[:first_listener] + assert "assistantText=''" not in preamble and 'assistantText = ""' not in preamble, ( + "_wireSSE preamble must NOT reset assistantText — the server does " + "not replay events on reconnect, so the reset would wipe valid " + "pre-drop content" + ) + assert "reasoningText=''" not in preamble and 'reasoningText = ""' not in preamble, ( + "_wireSSE preamble must NOT reset reasoningText on reconnect" ) def test_closure_initialises_accumulators_empty(self): diff --git a/tests/test_webui_state_db_reconciliation.py b/tests/test_webui_state_db_reconciliation.py index e1858d7c8e..f977b6c20c 100644 --- a/tests/test_webui_state_db_reconciliation.py +++ b/tests/test_webui_state_db_reconciliation.py @@ -467,6 +467,51 @@ def test_metadata_fast_path_excludes_state_db_rows_filtered_by_reconciliation(mo assert session["last_message_at"] == 1001.0 +def test_api_session_reload_drops_stale_cached_user_tail_after_saved_assistant(monkeypatch, tmp_path): + import api.models as models + import api.routes as routes + + sid = "webui_reconcile_cached_user_tail" + _install_test_session( + monkeypatch, + tmp_path, + sid, + [ + {"role": "user", "content": "please audit phase c", "timestamp": 1000.0}, + {"role": "assistant", "content": "final audit complete", "timestamp": 1001.0}, + ], + ) + _make_state_db( + tmp_path / "state.db", + sid, + [ + {"role": "user", "content": "please audit phase c", "timestamp": 1000.0}, + {"role": "assistant", "content": "final audit complete", "timestamp": 1001.0}, + ], + ) + + cached = models.Session.load(sid) + cached.messages.append( + { + "role": "user", + "content": "please audit phase c", + "timestamp": 1002.0, + } + ) + cached.pending_user_message = None + cached.active_stream_id = None + models.SESSIONS[sid] = cached + + handler = _GetHandler(f"/api/session?session_id={sid}&messages=1&resolve_model=0") + 
routes.handle_get(handler, urlparse(handler.path)) + + assert handler.status == 200 + messages = handler.response_json["session"]["messages"] + assert messages[-1]["role"] == "assistant" + assert messages[-1]["content"] == "final audit complete" + assert handler.response_json["session"]["message_count"] == 2 + + def test_state_db_reconciliation_preserves_tool_metadata(monkeypatch, tmp_path): import api.routes as routes