Skip to content

Commit 97e24d2

Browse files
author
IM.codes
committed
Fix stale transport queue UI sync
1 parent fcacafd commit 97e24d2

5 files changed

Lines changed: 145 additions & 15 deletions

File tree

test/e2e/qwen-transport-flow.test.ts

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -716,6 +716,33 @@ describe('qwen transport flow e2e', () => {
716716
);
717717
expect(queuedStateAfterDrain).toBeUndefined();
718718

719+
const explicitEmptyRunningState = drainedEvents.find((e) =>
720+
e.session === SESSION
721+
&& e.type === 'session.state'
722+
&& e.payload.state === 'running'
723+
&& Array.isArray(e.payload.pendingMessageEntries)
724+
&& e.payload.pendingMessageEntries.length === 0
725+
&& Array.isArray(e.payload.pendingMessages)
726+
&& e.payload.pendingMessages.length === 0
727+
&& typeof e.payload.pendingMessageVersion === 'number'
728+
);
729+
expect(explicitEmptyRunningState).toBeDefined();
730+
731+
const postDrainSessionListStart = serverLink.send.mock.calls.length;
732+
handleWebCommand({ type: 'get_sessions' }, serverLink);
733+
await flushAsync();
734+
735+
const postDrainSessionList = serverLink.send.mock.calls
736+
.slice(postDrainSessionListStart)
737+
.find(([msg]: [Record<string, unknown>]) => msg.type === 'session_list')?.[0] as
738+
| { sessions?: Array<Record<string, unknown>> }
739+
| undefined;
740+
const postDrainSession = postDrainSessionList?.sessions?.find((session) => session.name === SESSION);
741+
expect(postDrainSession).toEqual(expect.objectContaining({
742+
transportPendingMessages: [],
743+
transportPendingMessageEntries: [],
744+
transportPendingMessageVersion: expect.any(Number),
745+
}));
719746

720747
const idleStateEvents = mocks.emitted.filter((e) =>
721748
e.session === SESSION

test/web/transport-queue.test.ts

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,4 +110,17 @@ describe('transport queue reconciliation', () => {
110110
expect(result.changed).toBe(false);
111111
expect(result.entries.map((entry) => entry.clientMessageId)).toEqual(['queued-a']);
112112
});
113+
114+
it('allows a real delivered id to clear a single matching legacy queued entry by text', () => {
115+
const result = removeTransportPendingEntryForUserMessage(
116+
[{ clientMessageId: 'session-a:legacy:0:same text', text: 'same text' }],
117+
['same text'],
118+
{ clientMessageId: 'real-command-id', text: 'same text' },
119+
'session-a',
120+
);
121+
122+
expect(result.changed).toBe(true);
123+
expect(result.entries).toEqual([]);
124+
expect(result.messages).toEqual([]);
125+
});
113126
});

web/src/components/SessionControls.tsx

Lines changed: 26 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -884,7 +884,8 @@ export function SessionControls({ ws, activeSession, inputRef, onAfterAction, on
884884
// Keep legacy messages visible when older state lacks structured
885885
// entries, but do not synthesize tails for partial structured entries.
886886
hasEntriesField: Array.isArray(activeSession?.transportPendingMessageEntries)
887-
&& activeSession.transportPendingMessageEntries.length > 0,
887+
&& (activeSession.transportPendingMessageEntries.length > 0
888+
|| typeof activeSession.transportPendingMessageVersion === 'number'),
888889
hasMessagesField: Array.isArray(activeSession?.transportPendingMessages),
889890
},
890891
)
@@ -1508,14 +1509,32 @@ export function SessionControls({ ws, activeSession, inputRef, onAfterAction, on
15081509
: typeof event.payload.clientMessageId === 'string'
15091510
? event.payload.clientMessageId
15101511
: '';
1511-
removeLocalQueuedEntry(commandId, typeof event.payload.text === 'string' ? event.payload.text : undefined);
1512+
const deliveredText = typeof event.payload.text === 'string' ? event.payload.text : undefined;
1513+
removeLocalQueuedEntry(commandId, deliveredText);
15121514
// Record the delivered id so a stale daemon pending snapshot can't keep
15131515
// showing it as queued (the incoming snapshot is not under our control,
1514-
// unlike the optimistic set cleared above).
1515-
if (commandId) {
1516+
// unlike the optimistic set cleared above). Legacy snapshots synthesize
1517+
// ids from text, so also settle the single matching legacy id when the
1518+
// authoritative timeline echo carries a newer real command/client id.
1519+
const idsToSettle = commandId ? [commandId] : [];
1520+
const normalizedDeliveredText = typeof deliveredText === 'string' ? normalizeQueuedText(deliveredText) : '';
1521+
if (normalizedDeliveredText) {
1522+
const legacyTextMatches = incomingQueuedTransportEntries
1523+
.filter((entry) => isLegacyTransportPendingMessageId(entry.clientMessageId, activeSession.name)
1524+
&& normalizeQueuedText(entry.text) === normalizedDeliveredText)
1525+
.map((entry) => entry.clientMessageId);
1526+
if (legacyTextMatches.length === 1) idsToSettle.push(legacyTextMatches[0]);
1527+
}
1528+
if (idsToSettle.length > 0) {
15161529
setSettledQueuedIds((prev) => {
1517-
if (prev.has(commandId)) return prev;
1518-
const ids = [...prev, commandId];
1530+
let changed = false;
1531+
const ids = [...prev];
1532+
for (const id of idsToSettle) {
1533+
if (!id || prev.has(id)) continue;
1534+
ids.push(id);
1535+
changed = true;
1536+
}
1537+
if (!changed) return prev;
15191538
return new Set(ids.length > 500 ? ids.slice(-500) : ids);
15201539
});
15211540
}
@@ -1539,7 +1558,7 @@ export function SessionControls({ ws, activeSession, inputRef, onAfterAction, on
15391558
});
15401559
}
15411560
});
1542-
}, [activeSession, ws]);
1561+
}, [activeSession, incomingQueuedTransportEntries, ws]);
15431562

15441563
// Reset P2P mode on session change
15451564
useEffect(() => { setP2pMode('solo'); setP2pOpen(false); }, [activeSession?.name]);

web/src/transport-queue.ts

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -178,15 +178,29 @@ export function removeTransportPendingEntryForUserMessage(
178178
const normalizedText = typeof payload.text === 'string'
179179
? normalizeTransportPendingText(payload.text)
180180
: '';
181-
const matchIndex = candidateIdSet.size > 0
181+
const idMatchIndex = candidateIdSet.size > 0
182182
? entries.findIndex((entry) => candidateIdSet.has(entry.clientMessageId))
183-
: (() => {
184-
if (!normalizedText) return -1;
185-
const matches = entries
186-
.map((entry, index) => ({ entry, index }))
187-
.filter(({ entry }) => normalizeTransportPendingText(entry.text) === normalizedText);
188-
return matches.length === 1 ? matches[0].index : -1;
189-
})();
183+
: -1;
184+
const textFallbackIndex = (() => {
185+
if (!normalizedText) return -1;
186+
const matches = entries
187+
.map((entry, index) => ({ entry, index }))
188+
.filter(({ entry }) => normalizeTransportPendingText(entry.text) === normalizedText);
189+
if (matches.length !== 1) return -1;
190+
const match = matches[0];
191+
// Stable ids normally win. However, legacy snapshots synthesize ids from
192+
// text (`session:legacy:*`) and can survive across client/daemon upgrades; a
193+
// later authoritative user.message carries the real command/client id, so an
194+
// id-only match leaves the old yellow queue card stuck beside the delivered
195+
// user bubble. Allow text fallback only for those synthetic legacy ids (or
196+
// when no id was provided at all), preserving the duplicate-text safety for
197+
// real structured queue entries.
198+
if (candidateIdSet.size === 0 || isLegacyTransportPendingMessageId(match.entry.clientMessageId, scopeKey)) {
199+
return match.index;
200+
}
201+
return -1;
202+
})();
203+
const matchIndex = idMatchIndex >= 0 ? idMatchIndex : textFallbackIndex;
190204
if (matchIndex < 0) return { messages, entries, changed: false };
191205
const nextEntries = entries.filter((_, index) => index !== matchIndex);
192206
return {

web/test/components/SessionControls.test.tsx

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2824,6 +2824,63 @@ afterEach(() => {
28242824
expect(screen.getByText('queued second')).toBeDefined();
28252825
});
28262826

2827+
it('does not resurrect stale pendingMessages when a versioned empty entries snapshot is present', () => {
2828+
const ws = makeWs();
2829+
render(
2830+
<SessionControls
2831+
ws={ws as any}
2832+
activeSession={makeSession({
2833+
runtimeType: 'transport',
2834+
transportPendingMessages: ['stale queued'],
2835+
transportPendingMessageEntries: [],
2836+
transportPendingMessageVersion: 12,
2837+
})}
2838+
quickData={makeQuickData() as any}
2839+
/>,
2840+
);
2841+
2842+
expect(document.querySelector('.controls-queued-hint')).toBeFalsy();
2843+
expect(screen.queryByText('stale queued')).toBeNull();
2844+
});
2845+
2846+
it('clears a legacy queued card by unique text when the delivered timeline message has the real id', () => {
2847+
const ws = makeWs();
2848+
render(
2849+
<SessionControls
2850+
ws={ws as any}
2851+
activeSession={makeSession({
2852+
name: 'qwen-session',
2853+
runtimeType: 'transport',
2854+
state: 'running',
2855+
transportPendingMessages: ['legacy stale send'],
2856+
transportPendingMessageEntries: [],
2857+
})}
2858+
quickData={makeQuickData() as any}
2859+
/>,
2860+
);
2861+
2862+
expect(screen.getByText('legacy stale send')).toBeDefined();
2863+
2864+
act(() => {
2865+
ws.emit({
2866+
type: 'timeline.event',
2867+
event: {
2868+
eventId: 'transport-user:real-command-id',
2869+
sessionId: 'qwen-session',
2870+
type: 'user.message',
2871+
ts: Date.now(),
2872+
seq: 1,
2873+
epoch: 1,
2874+
source: 'daemon',
2875+
confidence: 'high',
2876+
payload: { text: 'legacy stale send', commandId: 'real-command-id' },
2877+
},
2878+
});
2879+
});
2880+
2881+
expect(screen.queryByText('legacy stale send')).toBeNull();
2882+
});
2883+
28272884
it('treats partial queued transport entries as authoritative', () => {
28282885
const ws = makeWs();
28292886
render(

0 commit comments

Comments
 (0)