Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 97 additions & 31 deletions static/messages.js
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,42 @@ let _sendInProgress = false;
let _sendInProgressSid = null; // session_id of the in-flight send
const _sessionTitleProvisionalBySid = new Map();

function _clearStaleBusyStateBeforeSend({compressionRunning=false}={}){
if(!S||!S.busy||compressionRunning) return false;
const session=S.session||{};
const sid=session.session_id||'';
const hasRuntimeConfirmation=Boolean(
S.activeStreamId||
session.active_stream_id||
session.pending_user_message||
session.pending_started_at
);
if(hasRuntimeConfirmation) return false;
if(typeof INFLIGHT==='object'&&INFLIGHT&&sid&&INFLIGHT[sid]){
delete INFLIGHT[sid];
if(typeof clearInflightState==='function') clearInflightState(sid);
}
S.activeStreamId=null;
if(session) session.active_stream_id=null;
if(typeof setBusy==='function') setBusy(false);
else S.busy=false;
if(typeof setComposerStatus==='function') setComposerStatus('');
if(typeof setStatus==='function') setStatus('');
if(typeof updateSendBtn==='function') updateSendBtn();
if(sid&&typeof clearOptimisticSessionStreaming==='function') clearOptimisticSessionStreaming(sid);
return true;
}

function _runOptionalPreStartUiStep(label, fn){
try{
return typeof fn==='function'?fn():undefined;
}catch(e){
const message=e&&e.message?e.message:String(e||'unknown error');
try{console.warn('[webui] optional pre-start UI step failed', label, message);}catch(_){ }
return undefined;
}
}

function _sessionTitleLooksDefaultOrProvisional(titleText, provisionalText){
const title=String(titleText||'').replace(/\s+/g,' ').trim();
if(!title||title==='Untitled'||title==='New Chat')return true;
Expand Down Expand Up @@ -262,6 +298,7 @@ async function send(){
}

const compressionRunning=typeof isCompressionUiRunning==='function'&&isCompressionUiRunning();
_clearStaleBusyStateBeforeSend({compressionRunning});
// If busy or a manual compression is still running, handle based on busy_input_mode
if(S.busy||compressionRunning){
if(text){
Expand Down Expand Up @@ -409,39 +446,68 @@ async function send(){
const userMsg={role:'user',content:displayText,attachments:uploaded.length?uploadedNames:undefined,_ts:Date.now()/1000};
S.toolCalls=[]; // clear tool calls from previous turn
clearLiveToolCards(); // clear any leftover live cards from last turn
S.messages.push(userMsg);renderMessages();appendThinking('',{pending:true});setBusy(true);
// First optimistic pass: make the local user turn visible before /api/chat/start
// can save pending state on the server.
if(typeof upsertActiveSessionForLocalTurn==='function'){
upsertActiveSessionForLocalTurn({title:displayText.slice(0,64),messageCount:S.messages.length,timestampMs:Date.now()});
}
const optimisticMessages=[...S.messages];
INFLIGHT[activeSid]={messages:optimisticMessages,uploaded:uploadedNames,toolCalls:[]};
if(typeof saveInflightState==='function'){
saveInflightState(activeSid,{streamId:null,messages:INFLIGHT[activeSid].messages,uploaded:uploadedNames,toolCalls:[]});
}
if(typeof renderSessionListFromCache==='function') renderSessionListFromCache();
startApprovalPolling(activeSid);
startClarifyPolling(activeSid);
_fetchYoloState(activeSid); // sync YOLO pill with backend state
S.activeStreamId = null; // will be set after stream starts
if(typeof updateSendBtn==='function') updateSendBtn();
let optimisticMessages;
try{
S.messages.push(userMsg);renderMessages();appendThinking('',{pending:true});setBusy(true);
// First optimistic pass: make the local user turn visible before /api/chat/start
// can save pending state on the server.
_runOptionalPreStartUiStep('upsertActiveSessionForLocalTurn.initial', ()=>{
if(typeof upsertActiveSessionForLocalTurn==='function'){
upsertActiveSessionForLocalTurn({title:displayText.slice(0,64),messageCount:S.messages.length,timestampMs:Date.now()});
}
});
optimisticMessages=[...S.messages];
INFLIGHT[activeSid]={messages:optimisticMessages,uploaded:uploadedNames,toolCalls:[]};
if(typeof saveInflightState==='function'){
saveInflightState(activeSid,{streamId:null,messages:INFLIGHT[activeSid].messages,uploaded:uploadedNames,toolCalls:[]});
}
_runOptionalPreStartUiStep('renderSessionListFromCache.initial', ()=>{
if(typeof renderSessionListFromCache==='function') renderSessionListFromCache();
});
_runOptionalPreStartUiStep('startApprovalPolling.prestart', ()=>startApprovalPolling(activeSid));
_runOptionalPreStartUiStep('startClarifyPolling.prestart', ()=>startClarifyPolling(activeSid));
_runOptionalPreStartUiStep('fetchYoloState.prestart', ()=>_fetchYoloState(activeSid)); // sync YOLO pill with backend state
S.activeStreamId = null; // will be set after stream starts
_runOptionalPreStartUiStep('updateSendBtn.prestart', ()=>{
if(typeof updateSendBtn==='function') updateSendBtn();
});

// Set provisional title from user message immediately so session appears
// in the sidebar right away with a meaningful name. /api/chat/start persists
// the server-side provisional title and may refine this optimistic text.
if(S.session&&(S.session.title==='Untitled'||!S.session.title)){
const provisionalTitle=displayText.slice(0,64);
applySessionTitleUpdate(activeSid, provisionalTitle, {force:true, rememberProvisional:true});
if(typeof upsertActiveSessionForLocalTurn==='function'){
// Second optimistic pass: carry the provisional title into the cached row
// without re-fetching /api/sessions before pending state exists server-side.
upsertActiveSessionForLocalTurn({title:provisionalTitle,messageCount:S.messages.length,timestampMs:Date.now()});
// Set provisional title from user message immediately so session appears
// in the sidebar right away with a meaningful name. /api/chat/start persists
// the server-side provisional title and may refine this optimistic text.
if(S.session&&(S.session.title==='Untitled'||!S.session.title)){
const provisionalTitle=displayText.slice(0,64);
_runOptionalPreStartUiStep('applySessionTitleUpdate.provisional', ()=>{
applySessionTitleUpdate(activeSid, provisionalTitle, {force:true, rememberProvisional:true});
});
_runOptionalPreStartUiStep('upsertActiveSessionForLocalTurn.provisional', ()=>{
if(typeof upsertActiveSessionForLocalTurn==='function'){
// Second optimistic pass: carry the provisional title into the cached row
// without re-fetching /api/sessions before pending state exists server-side.
upsertActiveSessionForLocalTurn({title:provisionalTitle,messageCount:S.messages.length,timestampMs:Date.now()});
}
});
} else if(typeof upsertActiveSessionForLocalTurn==='function'){
_runOptionalPreStartUiStep('upsertActiveSessionForLocalTurn.titled', ()=>{
upsertActiveSessionForLocalTurn({title:S.session&&S.session.title||displayText.slice(0,64),messageCount:S.messages.length,timestampMs:Date.now()});
});
} else {
_runOptionalPreStartUiStep('renderSessionListFromCache.prestart', ()=>{
renderSessionListFromCache(); // ensure it's visible even if already titled
});
}
} else if(typeof upsertActiveSessionForLocalTurn==='function'){
upsertActiveSessionForLocalTurn({title:S.session&&S.session.title||displayText.slice(0,64),messageCount:S.messages.length,timestampMs:Date.now()});
} else {
renderSessionListFromCache(); // ensure it's visible even if already titled
}catch(preStartError){
// The user turn must reach /api/chat/start even if local optimistic UI
// bookkeeping (render cache, storage quota, sidebar reconciliation, etc.)
// throws. Otherwise the pane can show a user bubble + spinner while the
// backend never receives the turn.
const message=preStartError&&preStartError.message?preStartError.message:String(preStartError||'unknown error');
try{console.warn('[webui] pre-start optimistic UI failed; continuing to /api/chat/start', message);}catch(_){ }
if(!S.messages.includes(userMsg)) S.messages.push(userMsg);
optimisticMessages=[...S.messages];
INFLIGHT[activeSid]={messages:optimisticMessages,uploaded:uploadedNames,toolCalls:[]};
try{setBusy(true);}catch(_){S.busy=true;}
S.activeStreamId=null;
}

// Start the agent via POST, get a stream_id back
Expand Down
50 changes: 41 additions & 9 deletions static/sessions.js
Original file line number Diff line number Diff line change
Expand Up @@ -2023,6 +2023,31 @@ function _isOptimisticFirstTurnSessionRow(s){
);
}

function _shouldKeepLocalOnlyOptimisticSessionRow(local){
if(!_isOptimisticFirstTurnSessionRow(local)) return false;
const sid=local.session_id;
if(typeof _sendInProgress!=='undefined'&&_sendInProgress&&sid===_sendInProgressSid) return true;
const activeSid=S&&S.session&&S.session.session_id;
const isActive=Boolean(activeSid&&activeSid===sid);
const hasRuntimeConfirmation=Boolean(local.active_stream_id||local.pending_user_message||local.pending_started_at);
if(isActive&&S.busy&&hasRuntimeConfirmation) return true;
const localTs=Number(local.last_message_at||local.updated_at||0);
const ageMs=localTs>0?Date.now()-(localTs*1000):Infinity;
return Boolean(isActive&&S.busy&&ageMs>=0&&ageMs<5000);
}

function _dropStaleOptimisticSessionRow(sid){
if(!sid) return;
if(INFLIGHT&&INFLIGHT[sid]){
delete INFLIGHT[sid];
if(typeof clearInflightState==='function') clearInflightState(sid);
}
if(typeof _sessionStreamingById!=='undefined'&&_sessionStreamingById&&typeof _sessionStreamingById.set==='function'){
_sessionStreamingById.set(sid,false);
}
if(typeof _forgetObservedStreamingSession==='function') _forgetObservedStreamingSession(sid);
}

function _mergeOptimisticFirstTurnSessions(fetchedSessions){
const merged=Array.isArray(fetchedSessions)?[...fetchedSessions]:[];
const bySid=new Map();
Expand All @@ -2034,24 +2059,31 @@ function _mergeOptimisticFirstTurnSessions(fetchedSessions){
if(idx>=0){
const fetched=merged[idx]||{};
const fetchedIsServerIdle=_isServerIdleSessionRow(fetched);
const keepLocalOptimistic=fetchedIsServerIdle?false:_shouldKeepLocalOnlyOptimisticSessionRow(local);
const localCount=Number(local.message_count||0);
const fetchedCount=Number(fetched.message_count||0);
const localTs=Number(local.last_message_at||local.updated_at||0);
const fetchedTs=Number(fetched.last_message_at||fetched.updated_at||0);
if(!keepLocalOptimistic&&typeof _dropStaleOptimisticSessionRow==='function') _dropStaleOptimisticSessionRow(sid);
merged[idx]={
...local,
...fetched,
message_count:Math.max(localCount,fetchedCount),
last_message_at:Math.max(localTs,fetchedTs),
updated_at:Math.max(Number(local.updated_at||0),Number(fetched.updated_at||0),localTs,fetchedTs),
active_stream_id:fetchedIsServerIdle?null:(fetched.active_stream_id||local.active_stream_id||null),
pending_user_message:fetchedIsServerIdle?null:(fetched.pending_user_message||local.pending_user_message||null),
pending_started_at:fetchedIsServerIdle?null:(fetched.pending_started_at||local.pending_started_at||null),
is_streaming:fetchedIsServerIdle?false:Boolean(fetched.is_streaming||local.is_streaming||_isSessionLocallyStreaming(local)),
title:keepLocalOptimistic?(local.title||fetched.title):fetched.title,
message_count:keepLocalOptimistic?Math.max(localCount,fetchedCount):fetchedCount,
last_message_at:keepLocalOptimistic?Math.max(localTs,fetchedTs):fetchedTs,
updated_at:keepLocalOptimistic?Math.max(Number(local.updated_at||0),Number(fetched.updated_at||0),localTs,fetchedTs):Number(fetched.updated_at||fetchedTs||0),
active_stream_id:fetchedIsServerIdle?null:(keepLocalOptimistic?(fetched.active_stream_id||local.active_stream_id||null):null),
pending_user_message:fetchedIsServerIdle?null:(keepLocalOptimistic?(fetched.pending_user_message||local.pending_user_message||null):null),
pending_started_at:fetchedIsServerIdle?null:(keepLocalOptimistic?(fetched.pending_started_at||local.pending_started_at||null):null),
is_streaming:fetchedIsServerIdle?false:(keepLocalOptimistic&&Boolean(fetched.is_streaming||local.is_streaming||_isSessionLocallyStreaming(local))),
};
}else{
merged.push({...local,is_streaming:true});
bySid.set(sid,merged.length-1);
if(_shouldKeepLocalOnlyOptimisticSessionRow(local)){
merged.push({...local,is_streaming:true});
bySid.set(sid,merged.length-1);
}else{
_dropStaleOptimisticSessionRow(sid);
}
}
}
return merged;
Expand Down
89 changes: 88 additions & 1 deletion tests/test_inflight_send_start_race.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def _function_body(src: str, name: str) -> str:
def test_send_preserves_optimistic_messages_across_chat_start_await():
"""send() must not dereference INFLIGHT[activeSid] after await without a fallback."""
body = _function_body(MESSAGES_JS, "send")
setup_idx = body.index("const optimisticMessages=[...S.messages];")
setup_idx = body.index("optimisticMessages=[...S.messages];")
inflight_idx = body.index("INFLIGHT[activeSid]={messages:optimisticMessages")
await_idx = body.index("const startData=await api('/api/chat/start'")
save_idx = body.index("saveInflightState(activeSid,{streamId", await_idx)
Expand All @@ -49,3 +49,90 @@ def test_stale_inflight_purge_preserves_current_send_before_stream_id_exists():
skip_idx = body.index("_sendInProgress")
delete_idx = body.index("delete INFLIGHT[sid];")
assert skip_idx < delete_idx, "the current-send skip must run before any purge deletion"


def test_send_clears_stale_busy_state_before_queue_branch():
"""A stale client-only busy flag must not divert a new user turn into the invisible queue."""
body = _function_body(MESSAGES_JS, "send")

assert "_clearStaleBusyStateBeforeSend" in body, (
"send() should reconcile client-only stale busy state before deciding busy/queue mode"
)
reconcile_idx = body.index("_clearStaleBusyStateBeforeSend")
busy_branch_idx = body.index("if(S.busy||compressionRunning)")
chat_start_idx = body.index("api('/api/chat/start'")
assert reconcile_idx < busy_branch_idx < chat_start_idx, (
"stale busy reconciliation must run before the queue branch and before /api/chat/start"
)


def test_pre_start_optimistic_ui_helpers_cannot_block_chat_start():
"""Optional optimistic UI helpers must not strand a local bubble before /api/chat/start."""
body = _function_body(MESSAGES_JS, "send")
helper_body = _function_body(MESSAGES_JS, "_runOptionalPreStartUiStep")

optimistic_idx = body.index("S.messages.push(userMsg);renderMessages();appendThinking('',{pending:true});setBusy(true);")
chat_start_idx = body.index("api('/api/chat/start'")
pre_start = body[optimistic_idx:chat_start_idx]

assert "try" in helper_body and "catch" in helper_body, (
"optional pre-start UI helper wrapper must catch errors before /api/chat/start"
)
assert "setStatus(`UI warning before send:" not in helper_body, (
"non-fatal pre-start UI helper failures should stay in the console; visible status flashes "
"look like real send errors even though /api/chat/start continues"
)
assert "_runOptionalPreStartUiStep" in pre_start, (
"send() should wrap optimistic sidebar/title/polling helpers before /api/chat/start"
)
assert "upsertActiveSessionForLocalTurn" in pre_start and "applySessionTitleUpdate" in pre_start


def test_pre_start_optimistic_block_cannot_prevent_chat_start():
"""Any pre-start UI/storage exception must still fall through to /api/chat/start."""
body = _function_body(MESSAGES_JS, "send")
optimistic_idx = body.index("S.messages.push(userMsg);renderMessages();appendThinking('',{pending:true});setBusy(true);")
chat_start_idx = body.index("api('/api/chat/start'")
pre_start = body[optimistic_idx:chat_start_idx]

assert "}catch(preStartError){" in pre_start, (
"The whole optimistic pre-start block needs a catch, not only individual optional helpers"
)
assert "continuing to /api/chat/start" in pre_start, (
"The recovery path should document that chat/start must still execute"
)
assert pre_start.rindex("}catch(preStartError){") < chat_start_idx, (
"pre-start catch must be before the /api/chat/start call"
)


def test_server_absent_optimistic_first_turn_rows_are_not_kept_forever():
"""A local first-turn sidebar row must expire when /api/chat/start never persisted it."""
body = _function_body(SESSIONS_JS, "_mergeOptimisticFirstTurnSessions")

assert "_shouldKeepLocalOnlyOptimisticSessionRow(local)" in body, (
"server-absent optimistic rows need an explicit keep/drop gate"
)
keep_idx = body.index("if(_shouldKeepLocalOnlyOptimisticSessionRow(local))")
append_idx = body.index("merged.push({...local,is_streaming:true});")
drop_idx = body.index("_dropStaleOptimisticSessionRow(sid);", append_idx)
assert keep_idx < append_idx < drop_idx, (
"local optimistic rows may only be appended inside the explicit keep gate"
)
drop_body = _function_body(SESSIONS_JS, "_dropStaleOptimisticSessionRow")
assert "clearInflightState(sid)" in drop_body, (
"dropping a phantom row should also clear persisted browser recovery state"
)


def test_server_idle_row_wins_over_stale_optimistic_count():
"""If the server says the row is idle, stale local message_count/title must not win."""
body = _function_body(SESSIONS_JS, "_mergeOptimisticFirstTurnSessions")

assert "const keepLocalOptimistic=" in body
assert "message_count:keepLocalOptimistic?Math.max(localCount,fetchedCount):fetchedCount" in body, (
"stale optimistic message_count must not override a confirmed idle server row"
)
assert "title:keepLocalOptimistic?(local.title||fetched.title):fetched.title" in body, (
"stale optimistic provisional title must not override a confirmed idle server row"
)
Loading