Skip to content

Commit ef68915

Browse files
author
IM.codes
committed
Fix transport queue synchronization
1 parent a71f256 commit ef68915

12 files changed

Lines changed: 372 additions & 158 deletions

src/daemon/command-handler.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2099,8 +2099,8 @@ function markTransportCancelIdle(sessionName: string, error?: string): void {
20992099
pendingMessages: runtime?.pendingMessages ?? [],
21002100
pendingMessageEntries: runtime?.pendingEntries ?? [],
21012101
pendingMessageVersion: runtime
2102-
? (typeof runtime.pendingVersion === 'number' ? observeTransportQueueRevision(sessionName, runtime.pendingVersion) : 0)
2103-
: (getTransportQueueRevision(sessionName) ?? 0),
2102+
? observeTransportQueueRevision(sessionName, runtime.pendingVersion)
2103+
: (getTransportQueueRevision(sessionName) ?? observeTransportQueueRevision(sessionName, undefined)),
21042104
...(error ? { error } : {}),
21052105
}, { source: 'daemon', confidence: 'high' });
21062106
}

src/daemon/lifecycle.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ import { buildWorkerSessionPersistBody, mergeWorkerSessionSnapshot, shouldPersis
3838
import { replicatePendingProcessedContext } from '../context/processed-context-replication.js';
3939
import { configureSharedContextRuntime } from '../context/shared-context-runtime.js';
4040
import { fetchBackendSharedContextRuntimeConfig } from '../context/backend-runtime-config.js';
41+
import { observeTransportQueueRevision } from './transport-queue-revision.js';
4142
import { setContextModelRuntimeConfig } from '../context/context-model-config.js';
4243
import { closeLiveContextMaterializationAdmission, LiveContextIngestion } from '../context/live-context-ingestion.js';
4344
import { LocalSkillReviewWorker } from '../context/skill-review-worker.js';
@@ -746,7 +747,7 @@ export async function startup(): Promise<DaemonContext> {
746747
...(transportRuntime ? {
747748
transportPendingMessages: transportRuntime.pendingMessages,
748749
transportPendingMessageEntries: transportRuntime.pendingEntries,
749-
transportPendingMessageVersion: transportRuntime.pendingVersion,
750+
transportPendingMessageVersion: observeTransportQueueRevision(session.name, transportRuntime.pendingVersion),
750751
} : {}),
751752
});
752753
} catch { /* ignore */ }

src/daemon/transport-queue-revision.ts

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,12 @@ export function observeTransportQueueRevision(sessionName: string, observed: unk
2424
const current = revisions.get(sessionName);
2525
const normalized = normalizeRevision(observed);
2626
if (current === undefined) {
27-
// If the daemon just restarted and the first visible queue mutation already
28-
// has runtime version 1, emit 0 once so the web reset rule can establish a
29-
// fresh baseline instead of rejecting a lower-than-old-session number.
30-
const initial = normalized === undefined || normalized <= 1 ? 0 : normalized;
27+
// Never use 0 as an ordinary queue revision. Older UI code treated 0 as a
28+
// reset and would lower its baseline, allowing stale queued snapshots to
29+
// resurrect already-drained queue cards. A future epoch/generation field can
30+
// model true restarts explicitly; within this numeric namespace revisions
31+
// must be monotonic and positive.
32+
const initial = normalized === undefined || normalized < 1 ? 1 : normalized;
3133
revisions.set(sessionName, initial);
3234
if (normalized !== undefined) observedRuntimeVersions.set(sessionName, normalized);
3335
return initial;
@@ -50,7 +52,7 @@ export function observeTransportQueueRevision(sessionName: string, observed: unk
5052

5153
export function bumpTransportQueueRevision(sessionName: string): number {
5254
const current = revisions.get(sessionName);
53-
const next = current === undefined ? 0 : current + 1;
55+
const next = current === undefined ? 1 : current + 1;
5456
revisions.set(sessionName, next);
5557
return next;
5658
}

test/daemon/command-handler-transport-queue.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1012,7 +1012,7 @@ describe('handleWebCommand transport queue behavior', () => {
10121012
pendingCount: 3,
10131013
pendingMessages: ['a', 'b', 'c'],
10141014
pendingMessageEntries: [],
1015-
pendingMessageVersion: 0,
1015+
pendingMessageVersion: expect.any(Number),
10161016
},
10171017
expect.objectContaining({ source: 'daemon', confidence: 'high' }),
10181018
);

test/daemon/transport-resend-queue-emit.test.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -62,14 +62,14 @@ describe('transport-resend-queue user-visible signals (audit 0419d1ac-1f4)', ()
6262

6363
it('resend mutations advance a session queue revision', async () => {
6464
const first = enqueueResend('s-version', { text: 'first', commandId: 'c1', queuedAt: Date.now() });
65-
expect(first.pendingVersion).toBe(0);
65+
expect(first.pendingVersion).toBe(1);
6666
const second = enqueueResend('s-version', { text: 'second', commandId: 'c2', queuedAt: Date.now() });
67-
expect(second.pendingVersion).toBe(1);
68-
expect(getTransportQueueRevision('s-version')).toBe(1);
67+
expect(second.pendingVersion).toBe(2);
68+
expect(getTransportQueueRevision('s-version')).toBe(2);
6969

7070
await drainResend('s-version', () => 'sent');
7171
expect(getResendEntries('s-version')).toEqual([]);
72-
expect(getTransportQueueRevision('s-version')).toBe(2);
72+
expect(getTransportQueueRevision('s-version')).toBe(3);
7373
});
7474

7575
it('resend pending snapshots carry the queue revision', () => {

test/web/transport-queue.test.ts

Lines changed: 72 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
import { describe, expect, it } from 'vitest';
22
import {
3+
hasExplicitTransportPendingSnapshot,
4+
normalizeTransportPendingEntries,
5+
nextTransportQueueVersion,
36
removeTransportPendingEntryForUserMessage,
47
shouldApplyTransportQueueSnapshot,
58
shouldApplyTransportQueueSnapshotForPayload,
@@ -9,16 +12,18 @@ describe('transport queue reconciliation', () => {
912
it('does not apply unversioned snapshots after a versioned baseline exists', () => {
1013
expect(shouldApplyTransportQueueSnapshot(undefined, undefined)).toBe(true);
1114
expect(shouldApplyTransportQueueSnapshot(3, undefined)).toBe(false);
15+
expect(shouldApplyTransportQueueSnapshot(3, 0)).toBe(false);
1216
expect(shouldApplyTransportQueueSnapshot(3, 2)).toBe(false);
1317
expect(shouldApplyTransportQueueSnapshot(3, 3)).toBe(true);
1418
expect(shouldApplyTransportQueueSnapshot(3, 4)).toBe(true);
19+
expect(nextTransportQueueVersion(3, 0)).toBe(3);
1520
});
1621

17-
it('applies only explicit empty unversioned clear after a versioned baseline', () => {
22+
it('rejects explicit empty unversioned clear after a versioned baseline', () => {
1823
expect(shouldApplyTransportQueueSnapshotForPayload(3, undefined, {
1924
hasExplicitSnapshot: true,
2025
isExplicitEmpty: true,
21-
})).toBe(true);
26+
})).toBe(false);
2227
expect(shouldApplyTransportQueueSnapshotForPayload(3, undefined, {
2328
hasExplicitSnapshot: false,
2429
isExplicitEmpty: true,
@@ -29,7 +34,57 @@ describe('transport queue reconciliation', () => {
2934
})).toBe(false);
3035
});
3136

32-
it('text fallback consumes only one matching pending entry', () => {
37+
it('treats entries as authoritative when legacy messages disagree', () => {
38+
const entries = normalizeTransportPendingEntries(
39+
[{ clientMessageId: 'stable-a', text: 'same text' }],
40+
['same text'],
41+
'session-a',
42+
);
43+
expect(entries).toEqual([{ clientMessageId: 'stable-a', text: 'same text' }]);
44+
});
45+
46+
47+
48+
it('treats present entries as fully authoritative and does not append legacy message tails', () => {
49+
const entries = normalizeTransportPendingEntries(
50+
[{ clientMessageId: 'stable-a', text: 'A' }],
51+
['A', 'stale-B'],
52+
'session-a',
53+
{ hasEntriesField: true, hasMessagesField: true },
54+
);
55+
56+
expect(entries).toEqual([{ clientMessageId: 'stable-a', text: 'A' }]);
57+
});
58+
59+
it('treats present empty entries as an authoritative empty queue', () => {
60+
const entries = normalizeTransportPendingEntries(
61+
[],
62+
['stale-B'],
63+
'session-a',
64+
{ hasEntriesField: true, hasMessagesField: true },
65+
);
66+
67+
expect(entries).toEqual([]);
68+
});
69+
70+
it('falls back to legacy messages only when entries are absent', () => {
71+
const entries = normalizeTransportPendingEntries(
72+
undefined,
73+
['legacy-A'],
74+
'session-a',
75+
{ hasEntriesField: false, hasMessagesField: true },
76+
);
77+
78+
expect(entries).toEqual([{ clientMessageId: 'session-a:legacy:0:legacy-A', text: 'legacy-A' }]);
79+
});
80+
81+
it('detects explicit snapshots from entries-only payloads', () => {
82+
expect(hasExplicitTransportPendingSnapshot({ pendingMessageEntries: [] })).toBe(true);
83+
expect(hasExplicitTransportPendingSnapshot({ transportPendingMessageEntries: [] })).toBe(true);
84+
expect(hasExplicitTransportPendingSnapshot({ state: 'running' })).toBe(false);
85+
});
86+
87+
it('text fallback refuses ambiguous duplicate pending entries', () => {
3388
const result = removeTransportPendingEntryForUserMessage(
3489
[
3590
{ clientMessageId: 'a', text: 'same text' },
@@ -40,8 +95,19 @@ describe('transport queue reconciliation', () => {
4095
'session-a',
4196
);
4297

43-
expect(result.changed).toBe(true);
44-
expect(result.entries.map((entry) => entry.clientMessageId)).toEqual(['b']);
45-
expect(result.messages).toEqual(['same text']);
98+
expect(result.changed).toBe(false);
99+
expect(result.entries.map((entry) => entry.clientMessageId)).toEqual(['a', 'b']);
100+
});
101+
102+
it('does not let a wrong id fall back to deleting by text', () => {
103+
const result = removeTransportPendingEntryForUserMessage(
104+
[{ clientMessageId: 'queued-a', text: 'same text' }],
105+
['same text'],
106+
{ clientMessageId: 'delivered-other', text: 'same text' },
107+
'session-a',
108+
);
109+
110+
expect(result.changed).toBe(false);
111+
expect(result.entries.map((entry) => entry.clientMessageId)).toEqual(['queued-a']);
46112
});
47113
});
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
import { describe, expect, it } from 'vitest';
2+
import { buildTransportPendingSyncPatch, hasTransportPendingSyncSnapshot } from '../../web/src/transport-queue.js';
3+
4+
describe('sub-session transport queue sync patch', () => {
5+
it('derives messages from authoritative entries-only snapshots and drops stale message tails', () => {
6+
const patch = buildTransportPendingSyncPatch(
7+
{
8+
transportPendingMessages: ['A', 'stale-B'],
9+
transportPendingMessageEntries: [
10+
{ clientMessageId: 'a', text: 'A' },
11+
{ clientMessageId: 'b', text: 'stale-B' },
12+
],
13+
transportPendingMessageVersion: 3,
14+
},
15+
{
16+
transportPendingMessageEntries: [{ clientMessageId: 'a', text: 'A' }],
17+
transportPendingMessageVersion: 4,
18+
},
19+
'deck_sub_a',
20+
);
21+
22+
expect(patch).toMatchObject({
23+
transportPendingMessages: ['A'],
24+
transportPendingMessageEntries: [{ clientMessageId: 'a', text: 'A' }],
25+
transportPendingMessageVersion: 4,
26+
});
27+
});
28+
29+
it('treats entries-only empty snapshots as authoritative clears', () => {
30+
const patch = buildTransportPendingSyncPatch(
31+
{
32+
transportPendingMessages: ['stale-B'],
33+
transportPendingMessageEntries: [{ clientMessageId: 'b', text: 'stale-B' }],
34+
transportPendingMessageVersion: 3,
35+
},
36+
{
37+
transportPendingMessageEntries: [],
38+
transportPendingMessageVersion: 4,
39+
},
40+
'deck_sub_a',
41+
);
42+
43+
expect(patch).toMatchObject({
44+
transportPendingMessages: [],
45+
transportPendingMessageEntries: [],
46+
transportPendingMessageVersion: 4,
47+
});
48+
});
49+
50+
it('does not treat version-only payloads as pending queue content sync', () => {
51+
expect(hasTransportPendingSyncSnapshot({ transportPendingMessageVersion: 5 })).toBe(false);
52+
expect(buildTransportPendingSyncPatch(
53+
{
54+
transportPendingMessages: ['stale-B'],
55+
transportPendingMessageEntries: [{ clientMessageId: 'b', text: 'stale-B' }],
56+
transportPendingMessageVersion: 3,
57+
},
58+
{ transportPendingMessageVersion: 5 },
59+
'deck_sub_a',
60+
)).toEqual({});
61+
});
62+
});

web/src/app.tsx

Lines changed: 30 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -149,10 +149,9 @@ import { isP2pDiscussionVisibleInSubSessionBar } from './p2p-discussion-scope.js
149149
import {
150150
extractTransportPendingMessages,
151151
extractTransportPendingVersion,
152+
hasExplicitTransportPendingSnapshot,
152153
mergeTransportPendingEntriesForIdleState,
153154
mergeTransportPendingEntriesForRunningState,
154-
mergeTransportPendingMessagesForIdleState,
155-
mergeTransportPendingMessagesForRunningState,
156155
nextTransportQueueVersion,
157156
normalizeTransportPendingEntries,
158157
removeTransportPendingEntryForUserMessage,
@@ -2957,15 +2956,19 @@ export function App() {
29572956
// Sync session state from live timeline events (running/idle)
29582957
if (event.type === 'session.state' && !event.sessionId.startsWith('deck_sub_')) {
29592958
const liveState = String(event.payload.state ?? '');
2960-
const hasPendingMessagesField = Object.prototype.hasOwnProperty.call(event.payload ?? {}, 'pendingMessages');
2959+
const hasPendingSnapshot = hasExplicitTransportPendingSnapshot(event.payload);
29612960
const incomingVersion = extractTransportPendingVersion(event.payload.pendingMessageVersion);
29622961
if (liveState === 'queued') {
2963-
const pendingMessages = extractTransportPendingMessages(event.payload.pendingMessages);
2962+
const hasPendingEntriesField = Object.prototype.hasOwnProperty.call(event.payload, 'pendingMessageEntries');
2963+
const hasPendingMessagesField = Object.prototype.hasOwnProperty.call(event.payload, 'pendingMessages');
2964+
const parsedPendingMessages = extractTransportPendingMessages(event.payload.pendingMessages);
29642965
const pendingEntries = normalizeTransportPendingEntries(
29652966
event.payload.pendingMessageEntries,
2966-
pendingMessages,
2967+
parsedPendingMessages,
29672968
event.sessionId,
2969+
{ hasEntriesField: hasPendingEntriesField, hasMessagesField: hasPendingMessagesField },
29682970
);
2971+
const pendingMessages = hasPendingEntriesField ? pendingEntries.map((entry) => entry.text) : parsedPendingMessages;
29692972
setSessions((prev) => prev.map((s) => {
29702973
if (s.name !== event.sessionId) return s;
29712974
// Drop a stale `queued` snapshot wholesale: it would otherwise
@@ -2986,33 +2989,34 @@ export function App() {
29862989
} else if (liveState === 'running') {
29872990
setSessions((prev) => prev.map((s) => {
29882991
if (s.name !== event.sessionId) return s;
2989-
const incomingMessages = extractTransportPendingMessages(event.payload.pendingMessages);
2992+
const hasPendingEntriesField = Object.prototype.hasOwnProperty.call(event.payload, 'pendingMessageEntries');
2993+
const hasPendingMessagesField = Object.prototype.hasOwnProperty.call(event.payload, 'pendingMessages');
2994+
const parsedIncomingMessages = extractTransportPendingMessages(event.payload.pendingMessages);
29902995
const incomingEntries = normalizeTransportPendingEntries(
29912996
event.payload.pendingMessageEntries,
2992-
incomingMessages,
2997+
parsedIncomingMessages,
29932998
event.sessionId,
2999+
{ hasEntriesField: hasPendingEntriesField, hasMessagesField: hasPendingMessagesField },
29943000
);
3001+
const incomingMessages = hasPendingEntriesField ? incomingEntries.map((entry) => entry.text) : parsedIncomingMessages;
29953002
const applyPending = shouldApplyTransportQueueSnapshotForPayload(s.transportPendingMessageVersion, incomingVersion, {
2996-
hasExplicitSnapshot: hasPendingMessagesField,
2997-
isExplicitEmpty: hasPendingMessagesField && incomingMessages.length === 0 && incomingEntries.length === 0,
3003+
hasExplicitSnapshot: hasPendingSnapshot,
3004+
isExplicitEmpty: hasPendingSnapshot && incomingMessages.length === 0 && incomingEntries.length === 0,
29983005
});
29993006
return {
30003007
...s,
30013008
state: 'running' as SessionInfo['state'],
30023009
transportPendingMessages: applyPending
3003-
? mergeTransportPendingMessagesForRunningState(
3004-
s.transportPendingMessages,
3005-
event.payload.pendingMessages,
3006-
hasPendingMessagesField,
3007-
)
3010+
? incomingMessages
30083011
: (s.transportPendingMessages ?? []),
30093012
transportPendingMessageEntries: applyPending
30103013
? mergeTransportPendingEntriesForRunningState(
30113014
s.transportPendingMessageEntries,
30123015
event.payload.pendingMessageEntries,
30133016
event.payload.pendingMessages,
3014-
hasPendingMessagesField,
3017+
hasPendingSnapshot,
30153018
event.sessionId,
3019+
hasPendingEntriesField,
30163020
)
30173021
: (s.transportPendingMessageEntries ?? []),
30183022
transportPendingMessageVersion: applyPending
@@ -3023,33 +3027,34 @@ export function App() {
30233027
} else if (liveState === 'idle') {
30243028
setSessions((prev) => prev.map((s) => {
30253029
if (s.name !== event.sessionId) return s;
3026-
const incomingMessages = extractTransportPendingMessages(event.payload.pendingMessages);
3030+
const hasPendingEntriesField = Object.prototype.hasOwnProperty.call(event.payload, 'pendingMessageEntries');
3031+
const hasPendingMessagesField = Object.prototype.hasOwnProperty.call(event.payload, 'pendingMessages');
3032+
const parsedIncomingMessages = extractTransportPendingMessages(event.payload.pendingMessages);
30273033
const incomingEntries = normalizeTransportPendingEntries(
30283034
event.payload.pendingMessageEntries,
3029-
incomingMessages,
3035+
parsedIncomingMessages,
30303036
event.sessionId,
3037+
{ hasEntriesField: hasPendingEntriesField, hasMessagesField: hasPendingMessagesField },
30313038
);
3039+
const incomingMessages = hasPendingEntriesField ? incomingEntries.map((entry) => entry.text) : parsedIncomingMessages;
30323040
const applyPending = shouldApplyTransportQueueSnapshotForPayload(s.transportPendingMessageVersion, incomingVersion, {
3033-
hasExplicitSnapshot: hasPendingMessagesField,
3034-
isExplicitEmpty: hasPendingMessagesField && incomingMessages.length === 0 && incomingEntries.length === 0,
3041+
hasExplicitSnapshot: hasPendingSnapshot,
3042+
isExplicitEmpty: hasPendingSnapshot && incomingMessages.length === 0 && incomingEntries.length === 0,
30353043
});
30363044
return {
30373045
...s,
30383046
state: liveState as SessionInfo['state'],
30393047
transportPendingMessages: applyPending
3040-
? mergeTransportPendingMessagesForIdleState(
3041-
s.transportPendingMessages,
3042-
event.payload.pendingMessages,
3043-
hasPendingMessagesField,
3044-
)
3048+
? incomingMessages
30453049
: (s.transportPendingMessages ?? []),
30463050
transportPendingMessageEntries: applyPending
30473051
? mergeTransportPendingEntriesForIdleState(
30483052
s.transportPendingMessageEntries,
30493053
event.payload.pendingMessageEntries,
30503054
event.payload.pendingMessages,
3051-
hasPendingMessagesField,
3055+
hasPendingSnapshot,
30523056
event.sessionId,
3057+
hasPendingEntriesField,
30533058
)
30543059
: (s.transportPendingMessageEntries ?? []),
30553060
transportPendingMessageVersion: applyPending

0 commit comments

Comments
 (0)