Skip to content

Commit 53bc8cf

Browse files
author
IM.codes
committed
Fix Codex late event state pollution
1 parent 6283630 commit 53bc8cf

2 files changed

Lines changed: 203 additions & 13 deletions

File tree

src/agent/providers/codex-sdk.ts

Lines changed: 73 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,8 @@ const CANCEL_INTERRUPT_TIMEOUT_MS = 1_500;
6060
const COMPACT_START_ACCEPT_TIMEOUT_MS = 15_000;
6161
const COMPACT_NO_SIGNAL_SETTLE_MS = 5_000;
6262
const COMPACT_HARD_TIMEOUT_MS = 120_000;
63+
const TERMINATED_TURN_CACHE_LIMIT = 200;
64+
const TERMINATED_COMPACT_TURN_CACHE_LIMIT = 80;
6365
const DEFAULT_CODEX_SDK_CONTEXT_INJECTION_MAX_CHARS = 32_000;
6466
const MIN_CODEX_SDK_CONTEXT_INJECTION_MAX_CHARS = 4_000;
6567
const MAX_CODEX_SDK_CONTEXT_INJECTION_MAX_CHARS = 128_000;
@@ -435,6 +437,8 @@ interface CodexSdkSessionState {
435437
pendingSessionSystemTextUpdateTurnId?: string;
436438
completedTurnIds: Set<string>;
437439
completedCompactTurnIds: Set<string>;
440+
terminatedTurnIds: Set<string>;
441+
terminatedCompactTurnIds: Set<string>;
438442
generatedImageTracking: GeneratedImageTrackingSnapshot | null;
439443
generatedImagePaths: string[];
440444
rawChecklistStartedAt: number;
@@ -1570,6 +1574,8 @@ export class CodexSdkProvider implements TransportProvider {
15701574
pendingSessionSystemTextUpdateTurnId: undefined,
15711575
completedTurnIds: existing?.completedTurnIds ?? new Set(),
15721576
completedCompactTurnIds: existing?.completedCompactTurnIds ?? new Set(),
1577+
terminatedTurnIds: existing?.terminatedTurnIds ?? new Set(),
1578+
terminatedCompactTurnIds: existing?.terminatedCompactTurnIds ?? new Set(),
15731579
generatedImageTracking: null,
15741580
generatedImagePaths: [],
15751581
rawChecklistStartedAt: Date.now(),
@@ -1739,6 +1745,7 @@ export class CodexSdkProvider implements TransportProvider {
17391745
if (!this.sessions.has(sessionId)) return;
17401746
if (state.runningTurnId !== turnId) return;
17411747
this.clearStatus(sessionId, state);
1748+
this.rememberTerminatedTurn(state, turnId);
17421749
state.runningTurnId = undefined;
17431750
state.turnStartInFlight = false; state.activeItemIds.clear();
17441751
this.clearRawChecklistPollTimer(state);
@@ -1761,6 +1768,7 @@ export class CodexSdkProvider implements TransportProvider {
17611768
this.clearCompactTimers(state); this.clearRawChecklistPollTimer(state);
17621769
if (!options.clearSessions) {
17631770
this.clearStatus(sessionId, state);
1771+
this.rememberTerminatedActiveTurn(state);
17641772
state.loaded = false;
17651773
state.runningTurnId = undefined;
17661774
state.turnStartInFlight = false;
@@ -1926,6 +1934,7 @@ export class CodexSdkProvider implements TransportProvider {
19261934
state.nativePlanEventSeen = false;
19271935
if (state.runningTurnId) {
19281936
state.completedTurnIds.delete(state.runningTurnId);
1937+
state.terminatedTurnIds.delete(state.runningTurnId);
19291938
}
19301939
if (shouldInjectStableUpdate) {
19311940
state.pendingSessionSystemTextUpdate = desiredSessionSystemText;
@@ -1936,6 +1945,7 @@ export class CodexSdkProvider implements TransportProvider {
19361945
}
19371946
if (state.runningTurnId) this.armRawChecklistPolling(sessionId, state);
19381947
} catch (err) {
1948+
this.rememberTerminatedTurn(state, state.runningTurnId);
19391949
state.runningTurnId = undefined;
19401950
state.turnStartInFlight = false; state.activeItemIds.clear();
19411951
this.clearRawChecklistPollTimer(state);
@@ -1990,6 +2000,7 @@ export class CodexSdkProvider implements TransportProvider {
19902000
}
19912001
} catch (err) {
19922002
this.clearCompactTimers(state); this.clearStatus(sessionId, state);
2003+
this.rememberTerminatedCompactTurn(state, state.runningTurnId);
19932004
state.runningCompact = false;
19942005
state.runningTurnId = undefined;
19952006
state.turnStartInFlight = false;
@@ -2557,6 +2568,7 @@ export class CodexSdkProvider implements TransportProvider {
25572568
const state = sessionId ? this.sessions.get(sessionId) : null;
25582569
if (!sessionId || !state) return;
25592570
const turnId = readParamTurnId(params);
2571+
if (turnId && (state.cancelled || this.isClosedCodexTurn(state, turnId))) return;
25602572
if (turnId && state.runningTurnId && turnId !== state.runningTurnId) return;
25612573
// Native plan event (codex >= 0.139). Render it AND suppress the legacy
25622574
// rollout-file scan for this session so old (file-scrape) + new never
@@ -2576,14 +2588,15 @@ export class CodexSdkProvider implements TransportProvider {
25762588
if (!sessionId || !state) return;
25772589
if (state.cancelled) return;
25782590
const turnId = readParamTurnId(params);
2579-
if (turnId && state.completedTurnIds.has(turnId)) return;
2591+
const closedTurn = this.isClosedCodexTurn(state, turnId);
25802592
// NEVER drop live assistant text. If our turn bookkeeping lags the
25812593
// app-server (turn/start's result carried no turn id, so runningTurnId was
25822594
// never set, or this delta's turnId is shaped differently), adopt the
25832595
// delta's turnId and render anyway — a real text update must always reach
2584-
// the UI. Only a genuinely-completed turn (above) or an explicit cancel skips.
2585-
if (turnId && !state.runningTurnId) state.runningTurnId = turnId;
2586-
this.clearStatus(sessionId, state);
2596+
// the UI. Closed/terminated turns may still render late text, but they
2597+
// must never be adopted back into running state.
2598+
if (turnId && !closedTurn && !state.runningTurnId) state.runningTurnId = turnId;
2599+
if (!closedTurn) this.clearStatus(sessionId, state);
25872600
// Reset the streaming accumulator when a new agentMessage item starts so
25882601
// its deltas don't render prefixed with the previous message's full text
25892602
// (multi-message turns occur after every tool round). Guards the case
@@ -2609,17 +2622,20 @@ export class CodexSdkProvider implements TransportProvider {
26092622
const state = sessionId ? this.sessions.get(sessionId) : null;
26102623
if (!sessionId || !state) return;
26112624
const turnId = readParamTurnId(params);
2612-
if (turnId && (state.completedTurnIds.has(turnId) || state.completedCompactTurnIds.has(turnId))) return;
2625+
if (state.cancelled) return;
2626+
const closedTurn = this.isClosedCodexTurn(state, turnId);
26132627

26142628
const item = params.item as Record<string, any> | undefined;
26152629
if (!item) return;
2630+
if (closedTurn && item.type !== 'agentMessage') return;
26162631
// NEVER drop a real provider item. If our turn bookkeeping lags the
26172632
// app-server (turn/start's result carried no turn id, or this event's
26182633
// turnId is shaped differently), adopt the event's turnId and process it
26192634
// anyway rather than silently dropping tool calls / reasoning / final
2620-
// assistant text. Completed turns (above) and explicit cancels still skip.
2621-
if (turnId && !state.runningTurnId) state.runningTurnId = turnId;
2622-
this.trackCodexTurnItemActivity(sessionId, state, method, item);
2635+
// assistant text. Closed/terminated turns may still surface final
2636+
// assistant text, but they must never be adopted back into running state.
2637+
if (turnId && !closedTurn && !state.runningTurnId) state.runningTurnId = turnId;
2638+
if (!closedTurn) this.trackCodexTurnItemActivity(sessionId, state, method, item);
26232639

26242640
if (item.type === 'contextCompaction') {
26252641
state.runningCompact = true;
@@ -2637,8 +2653,6 @@ export class CodexSdkProvider implements TransportProvider {
26372653
return;
26382654
}
26392655

2640-
if (state.cancelled) return;
2641-
26422656
if (item.type === 'reasoning') {
26432657
this.emitStatus(sessionId, state, {
26442658
status: 'thinking',
@@ -2647,7 +2661,7 @@ export class CodexSdkProvider implements TransportProvider {
26472661
return;
26482662
}
26492663

2650-
this.clearStatus(sessionId, state);
2664+
if (!closedTurn) this.clearStatus(sessionId, state);
26512665

26522666
const tool = toolFromItem(sessionId, item, method === 'item/started' ? 'started' : 'completed');
26532667
if (tool) {
@@ -2689,14 +2703,15 @@ export class CodexSdkProvider implements TransportProvider {
26892703
const status = turn.status;
26902704
const turnId = readParamTurnId(params);
26912705

2692-
if (turnId && state.completedCompactTurnIds.has(turnId)) {
2706+
if (turnId && this.isClosedCompactTurn(state, turnId)) {
26932707
return;
26942708
}
2695-
if (turnId && state.completedTurnIds.has(turnId)) {
2709+
if (turnId && this.isClosedTurn(state, turnId)) {
26962710
return;
26972711
}
26982712

26992713
if (status === 'failed') {
2714+
this.rememberTerminatedActiveTurn(state, turnId);
27002715
this.clearCancelTimer(state);
27012716
this.clearCompactTimers(state);
27022717
this.clearRawChecklistPollTimer(state);
@@ -2716,6 +2731,7 @@ export class CodexSdkProvider implements TransportProvider {
27162731
return;
27172732
}
27182733
if (status === 'interrupted') {
2734+
this.rememberTerminatedActiveTurn(state, turnId);
27192735
this.clearCancelTimer(state);
27202736
this.clearCompactTimers(state);
27212737
this.clearRawChecklistPollTimer(state);
@@ -2740,6 +2756,7 @@ export class CodexSdkProvider implements TransportProvider {
27402756
}
27412757

27422758
if (state.cancelled) {
2759+
this.rememberTerminatedActiveTurn(state, turnId);
27432760
this.clearCancelTimer(state);
27442761
this.clearRawChecklistPollTimer(state);
27452762
this.clearStatus(sessionId, state);
@@ -2824,6 +2841,29 @@ export class CodexSdkProvider implements TransportProvider {
28242841
if (oldest) state.completedTurnIds.delete(oldest);
28252842
}
28262843

2844+
/**
 * Records `turnId` in the session's set of terminated (non-compact) turn ids.
 *
 * A missing/empty id is ignored. After the insert, if the set holds more than
 * TERMINATED_TURN_CACHE_LIMIT entries, the first id in iteration order is
 * removed so the cache stays bounded.
 *
 * @param state  Session state whose `terminatedTurnIds` set is updated.
 * @param turnId Id of the turn to remember; optional.
 */
private rememberTerminatedTurn(state: CodexSdkSessionState, turnId?: string): void {
  if (turnId) {
    const terminated = state.terminatedTurnIds;
    terminated.add(turnId);
    if (terminated.size > TERMINATED_TURN_CACHE_LIMIT) {
      // A Set iterates in insertion order, so the first value is the oldest entry.
      const evicted = terminated.values().next().value;
      if (evicted) terminated.delete(evicted);
    }
  }
}
2851+
2852+
/**
 * Reports whether `turnId` is recorded as a completed or a terminated turn.
 *
 * @param state  Session state holding `completedTurnIds` and `terminatedTurnIds`.
 * @param turnId Id to look up; a missing/empty id is never considered closed.
 * @returns `true` when the id is present in either set, otherwise `false`.
 */
private isClosedTurn(state: CodexSdkSessionState, turnId?: string): boolean {
  if (!turnId) return false;
  if (state.completedTurnIds.has(turnId)) return true;
  return state.terminatedTurnIds.has(turnId);
}
2855+
2856+
/**
 * Reports whether `turnId` is recorded as a completed or a terminated compact turn.
 *
 * @param state  Session state holding `completedCompactTurnIds` and
 *               `terminatedCompactTurnIds`.
 * @param turnId Id to look up; a missing/empty id is never considered closed.
 * @returns `true` when the id is present in either set, otherwise `false`.
 */
private isClosedCompactTurn(state: CodexSdkSessionState, turnId?: string): boolean {
  if (!turnId) return false;
  if (state.completedCompactTurnIds.has(turnId)) return true;
  return state.terminatedCompactTurnIds.has(turnId);
}
2862+
2863+
/**
 * Reports whether `turnId` is closed in any of the four bookkeeping sets,
 * i.e. closed as a regular turn or closed as a compact turn.
 *
 * @param state  Session state to consult.
 * @param turnId Id to look up; optional.
 * @returns `true` when either `isClosedTurn` or `isClosedCompactTurn` matches.
 */
private isClosedCodexTurn(state: CodexSdkSessionState, turnId?: string): boolean {
  if (this.isClosedTurn(state, turnId)) return true;
  return this.isClosedCompactTurn(state, turnId);
}
2866+
28272867
private request(method: string, params: Record<string, any>, timeoutMs?: number): Promise<any> {
28282868
if (!this.child?.stdin.writable) {
28292869
return Promise.reject(new Error('Codex app-server stdin is not writable'));
@@ -2893,6 +2933,24 @@ export class CodexSdkProvider implements TransportProvider {
28932933
if (oldest) state.completedCompactTurnIds.delete(oldest);
28942934
}
28952935

2936+
/**
 * Records `turnId` in the session's set of terminated compact turn ids.
 *
 * A missing/empty id is ignored. After the insert, if the set holds more than
 * TERMINATED_COMPACT_TURN_CACHE_LIMIT entries, the first id in iteration order
 * is removed so the cache stays bounded.
 *
 * @param state  Session state whose `terminatedCompactTurnIds` set is updated.
 * @param turnId Id of the compact turn to remember; optional.
 */
private rememberTerminatedCompactTurn(state: CodexSdkSessionState, turnId?: string): void {
  if (turnId) {
    const terminatedCompact = state.terminatedCompactTurnIds;
    terminatedCompact.add(turnId);
    if (terminatedCompact.size > TERMINATED_COMPACT_TURN_CACHE_LIMIT) {
      // A Set iterates in insertion order, so the first value is the oldest entry.
      const evicted = terminatedCompact.values().next().value;
      if (evicted) terminatedCompact.delete(evicted);
    }
  }
}
2943+
2944+
/**
 * Records the turn that is being torn down in the matching terminated set.
 *
 * The id is the explicit `turnId` when one is given (not null/undefined),
 * otherwise `state.runningTurnId`; nothing happens when that yields no id.
 * While `state.runningCompact` is set the id goes to the compact set,
 * otherwise to the regular terminated-turn set.
 *
 * @param state  Session state to read the active turn from and to update.
 * @param turnId Optional id taken from the event that ended the turn.
 */
private rememberTerminatedActiveTurn(state: CodexSdkSessionState, turnId?: string): void {
  const target = turnId ?? state.runningTurnId;
  if (target) {
    if (state.runningCompact) {
      this.rememberTerminatedCompactTurn(state, target);
    } else {
      this.rememberTerminatedTurn(state, target);
    }
  }
}
2953+
28962954
/**
28972955
* Expose the `account/rateLimits/read` RPC over the already-connected
28982956
* app-server so callers (e.g. the daemon's rate-limit probe) can reuse
@@ -3030,6 +3088,7 @@ export class CodexSdkProvider implements TransportProvider {
30303088
this.clearCancelTimer(state);
30313089
this.clearCompactTimers(state);
30323090
this.clearRawChecklistPollTimer(state); this.clearStatus(sessionId, state);
3091+
this.rememberTerminatedCompactTurn(state, state.runningTurnId);
30333092
state.runningCompact = false;
30343093
state.runningTurnId = undefined;
30353094
state.turnStartInFlight = false;
@@ -3107,6 +3166,7 @@ export class CodexSdkProvider implements TransportProvider {
31073166
if (!this.sessions.has(sessionId)) return;
31083167
if (!state.runningCompact) return;
31093168
this.clearCompactTimers(state); this.clearStatus(sessionId, state);
3169+
this.rememberTerminatedCompactTurn(state, state.runningTurnId);
31103170
state.runningCompact = false;
31113171
state.runningTurnId = undefined;
31123172
state.turnStartInFlight = false;

test/agent/codex-sdk-provider.test.ts

Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2844,6 +2844,136 @@ describe('CodexSdkProvider', () => {
28442844
expect(child.requests.filter((req) => req.method === 'turn/start')).toHaveLength(2);
28452845
});
28462846

2847+
// Regression: a late `item/started` for a turn that was already cancelled must
// not put the session back into a running state, and the next send must still
// issue a fresh `turn/start`.
it('does not let late Codex item events re-adopt a cancelled turn after the cancel watchdog', async () => {
  const sessionKey = 'route-cancel-late-item-turnid';
  vi.useFakeTimers();
  childProcessMock.setHoldTurnInterrupt(true);
  const provider = new CodexSdkProvider();
  await provider.connect({ binaryPath: 'codex' });
  await provider.createSession({ sessionKey, cwd: '/tmp/project' });

  await provider.send(sessionKey, 'hello');
  const appServer = childProcessMock.children[0];
  await provider.cancel(sessionKey);
  // 1_600 ms; per the test title this is meant to run past the cancel watchdog.
  await vi.advanceTimersByTimeAsync(1_600);

  // Late reasoning item that still carries the cancelled turn's id.
  appServer.emits({
    method: 'item/started',
    params: {
      threadId: 'thread-1',
      turnId: 'turn-1',
      item: { id: 'late-reasoning', type: 'reasoning', text: 'late' },
    },
  });
  await vi.advanceTimersByTimeAsync(0);

  const diagnostics = provider.getSessionDiagnostics(sessionKey);
  expect(diagnostics).toMatchObject({
    runningTurnId: null,
    runningCompact: false,
  });
  await provider.send(sessionKey, 'after-cancel');
  const turnStarts = appServer.requests.filter((req) => req.method === 'turn/start');
  expect(turnStarts).toHaveLength(2);
});
2876+
2877+
// Regression: a late `contextCompaction` item for a cancelled turn must leave
// both `runningTurnId` and `runningCompact` cleared, and the next send must
// still issue a fresh `turn/start`.
it('does not let late contextCompaction items re-enter compacting after cancel', async () => {
  const sessionKey = 'route-cancel-late-compact-item';
  vi.useFakeTimers();
  childProcessMock.setHoldTurnInterrupt(true);
  const provider = new CodexSdkProvider();
  await provider.connect({ binaryPath: 'codex' });
  await provider.createSession({ sessionKey, cwd: '/tmp/project' });

  await provider.send(sessionKey, 'hello');
  const appServer = childProcessMock.children[0];
  await provider.cancel(sessionKey);
  await vi.advanceTimersByTimeAsync(1_600);

  // Late compaction item that still carries the cancelled turn's id.
  appServer.emits({
    method: 'item/started',
    params: {
      threadId: 'thread-1',
      turnId: 'turn-1',
      item: { id: 'late-compact', type: 'contextCompaction' },
    },
  });
  await vi.advanceTimersByTimeAsync(0);

  const diagnostics = provider.getSessionDiagnostics(sessionKey);
  expect(diagnostics).toMatchObject({
    runningTurnId: null,
    runningCompact: false,
  });
  await provider.send(sessionKey, 'after-cancel');
  const turnStarts = appServer.requests.filter((req) => req.method === 'turn/start');
  expect(turnStarts).toHaveLength(2);
});
2906+
2907+
// Regression: after `turn/completed` reports `failed`, a late agentMessage item
// for the same turn must still deliver its text but must not mark the turn as
// running again; the next send must issue a second `turn/start`.
it('does not let late Codex item events re-adopt a failed turn', async () => {
  const sessionKey = 'route-failed-late-item-turnid';
  const provider = new CodexSdkProvider();
  await provider.connect({ binaryPath: 'codex' });
  await provider.createSession({ sessionKey, cwd: '/tmp/project' });

  const receivedText: string[] = [];
  provider.onDelta((_sessionId, update) => receivedText.push(update.delta));

  await provider.send(sessionKey, 'hello');
  const appServer = childProcessMock.children[0];
  // The turn fails first ...
  appServer.emits({
    method: 'turn/completed',
    params: {
      threadId: 'thread-1',
      turn: { id: 'turn-1', status: 'failed', error: { message: 'boom' } },
    },
  });
  await flush();

  // ... then a straggling assistant message arrives for that same turn.
  appServer.emits({
    method: 'item/completed',
    params: {
      threadId: 'thread-1',
      turnId: 'turn-1',
      item: { id: 'late-msg', type: 'agentMessage', text: 'late text' },
    },
  });
  await flush();

  const diagnostics = provider.getSessionDiagnostics(sessionKey);
  expect(diagnostics).toMatchObject({
    runningTurnId: null,
    runningCompact: false,
  });
  expect(receivedText).toContain('late text');
  await provider.send(sessionKey, 'after-failure');
  const turnStarts = appServer.requests.filter((req) => req.method === 'turn/start');
  expect(turnStarts).toHaveLength(2);
});
2944+
2945+
// Regression: a `turn/plan/updated` event that arrives after its turn already
// failed must not produce any tool-call event.
it('does not render late plan updates for a failed turn', async () => {
  const sessionKey = 'route-failed-late-plan';
  const provider = new CodexSdkProvider();
  await provider.connect({ binaryPath: 'codex' });
  await provider.createSession({ sessionKey, cwd: '/tmp/project' });

  const observedTools: ToolCallEvent[] = [];
  provider.onToolCall((_sessionId, event) => observedTools.push(event));

  await provider.send(sessionKey, 'hello');
  const appServer = childProcessMock.children[0];
  // The turn fails first ...
  appServer.emits({
    method: 'turn/completed',
    params: {
      threadId: 'thread-1',
      turn: { id: 'turn-1', status: 'failed', error: { message: 'boom' } },
    },
  });
  await flush();

  // ... then a straggling plan update arrives for that same turn.
  appServer.emits({
    method: 'turn/plan/updated',
    params: {
      threadId: 'thread-1',
      turnId: 'turn-1',
      plan: { steps: [{ text: 'late plan', status: 'in_progress' }] },
    },
  });
  await flush();

  expect(observedTools).toEqual([]);
});
2976+
28472977
it('emits WebSearch tool events for webSearch items (legacy top-level query)', async () => {
28482978
const provider = new CodexSdkProvider();
28492979
await provider.connect({ binaryPath: 'codex' });

0 commit comments

Comments
 (0)