Skip to content

Commit 497f86b

Browse files
author
IM.codes
committed
Fix stale transport busy recovery
1 parent daba1e2 commit 497f86b

5 files changed

Lines changed: 221 additions & 25 deletions

File tree

src/agent/session-manager.ts

Lines changed: 54 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,15 @@ function isStoredTransportSession(record: Pick<SessionRecord, 'runtimeType' | 'a
7070
function shouldAutoRelaunchTransportRuntimeAfterError(
7171
providerError: TransportSessionRuntime['lastProviderError'],
7272
): boolean {
73-
return providerError?.code === PROVIDER_ERROR_CODES.CONNECTION_LOST;
73+
if (!providerError) return false;
74+
if (providerError.code === PROVIDER_ERROR_CODES.CONNECTION_LOST) return true;
75+
// Codex SDK can occasionally keep an internal "running turn" marker even
76+
// though the daemon has no active work left. A manual daemon restart clears the
77+
// stale provider state; treat repeated recoverable "already busy" failures as
78+
// the same relaunchable provider-wedged condition instead of leaving the UI in
79+
// a bare error state forever.
80+
return providerError.code === PROVIDER_ERROR_CODES.PROVIDER_ERROR
81+
&& /already busy|session is busy|provider is busy/i.test(providerError.message);
7482
}
7583

7684
function sanitizeCodexSdkStartupModel(value: string | null | undefined): string | undefined {
@@ -1156,7 +1164,7 @@ async function recoverTransportRuntimeAfterError(
11561164
pendingCount: runtime.pendingCount,
11571165
activeDispatchCount: runtime.activeDispatchEntries.length,
11581166
},
1159-
'Transport runtime error did not indicate provider connection loss; skipping provider relaunch',
1167+
'Transport runtime error did not indicate a relaunchable provider failure; skipping provider relaunch',
11601168
);
11611169
return false;
11621170
}
@@ -1169,7 +1177,7 @@ async function recoverTransportRuntimeAfterError(
11691177
providerError,
11701178
...preservation,
11711179
},
1172-
'Transport provider connection lost — preserving queues and relaunching provider runtime',
1180+
'Transport provider failure — preserving queues and relaunching provider runtime',
11731181
);
11741182

11751183
const now = Date.now();
@@ -1198,8 +1206,11 @@ async function recoverTransportRuntimeAfterError(
11981206

11991207
if (pendingCount > 0) {
12001208
const queued = getResendEntries(sessionName);
1209+
const recoveryReason = providerError?.code === PROVIDER_ERROR_CODES.CONNECTION_LOST
1210+
? 'Provider connection lost'
1211+
: 'Provider became stuck busy';
12011212
timelineEmitter.emit(sessionName, 'assistant.text', {
1202-
text: `⏳ Provider connection lost — auto-resending ${pendingCount} queued message${pendingCount === 1 ? '' : 's'} after recovery.`,
1213+
text: `⏳ ${recoveryReason} — auto-resending ${pendingCount} queued message${pendingCount === 1 ? '' : 's'} after recovery.`,
12031214
streaming: false,
12041215
memoryExcluded: true,
12051216
}, { source: 'daemon', confidence: 'high' });
@@ -1304,22 +1315,44 @@ async function drainTransportResendQueueIntoRuntime(
13041315
entry.commandId,
13051316
attachments.length > 0 ? attachments : undefined,
13061317
entry.messagePreamble,
1307-
sharedMetadata,
1318+
{
1319+
...sharedMetadata,
1320+
...(entry.timelineCommitted ? { timelineCommitted: true } : {}),
1321+
...(entry.historyCommitted ? { historyCommitted: true } : {}),
1322+
},
13081323
)
13091324
: runtime.send(
13101325
entry.text,
13111326
entry.commandId,
13121327
attachments.length > 0 ? attachments : undefined,
13131328
entry.messagePreamble,
1329+
{
1330+
...(entry.timelineCommitted ? { timelineCommitted: true } : {}),
1331+
...(entry.historyCommitted ? { historyCommitted: true } : {}),
1332+
},
13141333
))
13151334
: (attachments.length > 0
13161335
? (sharedMetadata
1317-
? runtime.send(entry.text, entry.commandId, attachments, undefined, sharedMetadata)
1318-
: runtime.send(entry.text, entry.commandId, attachments))
1336+
? runtime.send(entry.text, entry.commandId, attachments, undefined, {
1337+
...sharedMetadata,
1338+
...(entry.timelineCommitted ? { timelineCommitted: true } : {}),
1339+
...(entry.historyCommitted ? { historyCommitted: true } : {}),
1340+
})
1341+
: runtime.send(entry.text, entry.commandId, attachments, undefined, {
1342+
...(entry.timelineCommitted ? { timelineCommitted: true } : {}),
1343+
...(entry.historyCommitted ? { historyCommitted: true } : {}),
1344+
}))
13191345
: (sharedMetadata
1320-
? runtime.send(entry.text, entry.commandId, undefined, undefined, sharedMetadata)
1321-
: runtime.send(entry.text, entry.commandId)));
1322-
if (result === 'sent') {
1346+
? runtime.send(entry.text, entry.commandId, undefined, undefined, {
1347+
...sharedMetadata,
1348+
...(entry.timelineCommitted ? { timelineCommitted: true } : {}),
1349+
...(entry.historyCommitted ? { historyCommitted: true } : {}),
1350+
})
1351+
: runtime.send(entry.text, entry.commandId, undefined, undefined, {
1352+
...(entry.timelineCommitted ? { timelineCommitted: true } : {}),
1353+
...(entry.historyCommitted ? { historyCommitted: true } : {}),
1354+
})));
1355+
if (result === 'sent' && !entry.timelineCommitted) {
13231356
timelineEmitter.emit(
13241357
sessionName,
13251358
'user.message',
@@ -1341,6 +1374,14 @@ async function drainTransportResendQueueIntoRuntime(
13411374
pendingMessageEntries: runtime.pendingEntries,
13421375
pendingMessageVersion: observeTransportQueueRevision(sessionName, runtime.pendingVersion),
13431376
}, { source: 'daemon', confidence: 'high' });
1377+
} else if (result === 'sent') {
1378+
timelineEmitter.emit(sessionName, 'session.state', {
1379+
state: 'running',
1380+
pendingCount: runtime.pendingCount,
1381+
pendingMessages: runtime.pendingMessages,
1382+
pendingMessageEntries: runtime.pendingEntries,
1383+
pendingMessageVersion: observeTransportQueueRevision(sessionName, runtime.pendingVersion),
1384+
}, { source: 'daemon', confidence: 'high' });
13441385
} else if (result === 'queued') {
13451386
timelineEmitter.emit(sessionName, 'session.state', {
13461387
state: 'queued',
@@ -1422,6 +1463,8 @@ function wireTransportCallbacks(runtime: TransportSessionRuntime, sessionName: s
14221463
payload.pendingMessages = runtime.pendingMessages;
14231464
payload.pendingMessageEntries = runtime.pendingEntries;
14241465
payload.pendingMessageVersion = observeTransportQueueRevision(sessionName, runtime.pendingVersion);
1466+
} else if (mapped === 'error' && runtime.lastProviderError?.message) {
1467+
payload.error = runtime.lastProviderError.message;
14251468
}
14261469
timelineEmitter.emit(sessionName, 'session.state', payload, { source: 'daemon', confidence: 'high' });
14271470
if (status === 'error') {
@@ -1448,7 +1491,7 @@ function wireTransportCallbacks(runtime: TransportSessionRuntime, sessionName: s
14481491
{ source: 'daemon', confidence: 'high', eventId: transportUserEventId(entry.clientMessageId) },
14491492
);
14501493
}
1451-
if (messages.length === 0) {
1494+
if (messages.length === 0 && count === 0) {
14521495
timelineEmitter.emit(sessionName, 'user.message', { text: merged, batchedCount: count, allowDuplicate: true, pendingMessageVersion: drainedVersion });
14531496
}
14541497
// Include authoritative pending state after drain. The drained messages have

src/agent/transport-session-runtime.ts

Lines changed: 63 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,15 @@ export interface PendingTransportMessage {
5959
attachments?: TransportAttachment[];
6060
/** Server-authored share actor for attribution only; never injected into provider prompts. */
6161
sharedActor?: SharedActorEnvelope;
62+
/** @internal: this logical user event has already been written to the timeline. */
63+
timelineCommitted?: boolean;
64+
/** @internal: this logical user event has already been written to runtime history. */
65+
historyCommitted?: boolean;
66+
}
67+
68+
function publicPendingEntry(entry: PendingTransportMessage): PendingTransportMessage {
69+
const { timelineCommitted: _timelineCommitted, historyCommitted: _historyCommitted, ...publicEntry } = entry;
70+
return publicEntry;
6271
}
6372

6473
export interface TransportSendMetadata {
@@ -71,6 +80,10 @@ export interface TransportSendMetadata {
7180
* the tail of the FIFO queue.
7281
*/
7382
queuePlacement?: 'normal' | 'front';
83+
/** @internal: set when replaying entries that already have a visible user.message. */
84+
timelineCommitted?: boolean;
85+
/** @internal: set when replaying entries that already exist in runtime history. */
86+
historyCommitted?: boolean;
7487
}
7588

7689
export interface TransportRuntimeDiagnosticSnapshot {
@@ -115,6 +128,11 @@ const MAX_TRANSPORT_STALE_PENDING_RECOVERY_MS = 30 * 60_000;
115128
const MIN_TRANSPORT_STALE_PENDING_CANCEL_FALLBACK_MS = 50;
116129
const MAX_TRANSPORT_STALE_PENDING_CANCEL_FALLBACK_MS = 60_000;
117130

131+
function isRecoverableProviderBusyError(error: ProviderError): boolean {
132+
return error.code === PROVIDER_ERROR_CODES.PROVIDER_ERROR
133+
&& /already busy|session is busy|provider is busy/i.test(error.message);
134+
}
135+
118136
type TimeoutOutcome<T> =
119137
| { timedOut: false; value: T }
120138
| { timedOut: true };
@@ -638,9 +656,13 @@ export class TransportSessionRuntime implements SessionRuntime {
638656
/** Snapshot of queued messages waiting to be drained (legacy text-only view). */
639657
get pendingMessages(): string[] { return this._pendingMessages.map((entry) => entry.text); }
640658
/** Snapshot of queued messages waiting to be drained (stable entity ids for UI/edit/undo). */
641-
get pendingEntries(): PendingTransportMessage[] { return this._pendingMessages.map((entry) => ({ ...entry })); }
659+
get pendingEntries(): PendingTransportMessage[] { return this._pendingMessages.map(publicPendingEntry); }
660+
/** Snapshot of queued messages for internal resend preservation, including idempotency markers. */
661+
get pendingEntriesForResend(): PendingTransportMessage[] { return this._pendingMessages.map((entry) => ({ ...entry })); }
642662
/** Snapshot of the message entries currently being dispatched. */
643-
get activeDispatchEntries(): PendingTransportMessage[] { return this._activeDispatchEntries.map((entry) => ({ ...entry })); }
663+
get activeDispatchEntries(): PendingTransportMessage[] { return this._activeDispatchEntries.map(publicPendingEntry); }
664+
/** Snapshot of active entries for internal resend preservation, including idempotency markers. */
665+
get activeDispatchEntriesForResend(): PendingTransportMessage[] { return this._activeDispatchEntries.map((entry) => ({ ...entry })); }
644666

645667
getDiagnosticSnapshot(nowMs: number = Date.now()): TransportRuntimeDiagnosticSnapshot {
646668
let providerDiagnostics: Record<string, unknown> | null | undefined;
@@ -925,6 +947,8 @@ export class TransportSessionRuntime implements SessionRuntime {
925947
...(messagePreamble?.trim() ? { messagePreamble: messagePreamble.trim() } : {}),
926948
...(attachments?.length ? { attachments } : {}),
927949
...(metadata?.sharedActor ? { sharedActor: metadata.sharedActor } : {}),
950+
...(metadata?.timelineCommitted ? { timelineCommitted: true } : {}),
951+
...(metadata?.historyCommitted ? { historyCommitted: true } : {}),
928952
};
929953

930954
if (this.hasActiveTurnWork()) {
@@ -937,6 +961,12 @@ export class TransportSessionRuntime implements SessionRuntime {
937961
return 'queued';
938962
}
939963

964+
// Direct sends are rendered by command-handler / resend-drain after
965+
// runtime.send() returns 'sent'. If this provider-side send later fails
966+
// recoverably and gets retried from the queue, do not render the same
967+
// logical clientMessageId a second time during retry drain.
968+
entry.timelineCommitted = true;
969+
940970
// N-R8 defense-in-depth (audit 0419d1ac-1f4) — wrap direct dispatch so a
941971
// synchronous prologue throw inside `_dispatchTurn` (e.g. some future
942972
// listener regression in `setStatus → _onStatusChange`) cannot leave
@@ -1281,15 +1311,19 @@ export class TransportSessionRuntime implements SessionRuntime {
12811311
void promise.catch(() => {}); // prevent unhandled rejection
12821312
this._activeTurn = { promise, resolve, reject };
12831313

1284-
this._history.push({
1285-
id: randomUUID(),
1286-
sessionId: this._providerSessionId!,
1287-
kind: 'text',
1288-
role: 'user',
1289-
content: message,
1290-
timestamp: Date.now(),
1291-
status: 'complete',
1292-
});
1314+
const historyEntries = this._activeDispatchEntries.filter((entry) => !entry.historyCommitted);
1315+
if (historyEntries.length > 0) {
1316+
this._history.push({
1317+
id: randomUUID(),
1318+
sessionId: this._providerSessionId!,
1319+
kind: 'text',
1320+
role: 'user',
1321+
content: historyEntries.map((entry) => entry.text).join('\n\n'),
1322+
timestamp: Date.now(),
1323+
status: 'complete',
1324+
});
1325+
for (const entry of historyEntries) entry.historyCommitted = true;
1326+
}
12931327

12941328
this.setStatus('thinking');
12951329

@@ -1498,6 +1532,21 @@ export class TransportSessionRuntime implements SessionRuntime {
14981532
}
14991533
this._recoverableDispatchRetries = 0;
15001534
this.clearRecoverableRetryTimer();
1535+
if (providerError.recoverable
1536+
&& isRecoverableProviderBusyError(providerError)
1537+
&& this._activeDispatchEntries.length > 0) {
1538+
// The provider repeatedly claimed "already busy" until the retry
1539+
// budget was exhausted. This is usually a stale provider-side busy
1540+
// marker, not real daemon work. Keep the logical messages queued so
1541+
// session-manager can preserve them to resend before relaunching the
1542+
// provider runtime. Do NOT drain into the same wedged provider and
1543+
// do NOT drop the active entries.
1544+
this._pendingMessages.unshift(...this._activeDispatchEntries);
1545+
this._pendingVersion++;
1546+
this._activeDispatchEntries = [];
1547+
this.setStatus('error');
1548+
return;
1549+
}
15011550
this._activeDispatchEntries = [];
15021551
if (this._drainPending()) return;
15031552
// Cancellation → idle (the user stopped). Recoverable budget exhausted
@@ -1531,6 +1580,8 @@ export class TransportSessionRuntime implements SessionRuntime {
15311580
this.clearRecoverableRetryTimer();
15321581

15331582
const messages = this._pendingMessages.splice(0);
1583+
const timelineMessages = messages.filter((entry) => !entry.timelineCommitted);
1584+
for (const entry of timelineMessages) entry.timelineCommitted = true;
15341585
// Bump the queue version the moment the queue empties. The onDrain
15351586
// callback below emits this new version on both the per-entry
15361587
// `user.message` events and the cleared `session.state`, so a stale
@@ -1551,7 +1602,7 @@ export class TransportSessionRuntime implements SessionRuntime {
15511602
// `_pendingMessages` already spliced empty — runtime stuck forever,
15521603
// user-visible as bug 2 "bot stays asleep".
15531604
try {
1554-
this._onDrain?.(messages, merged, messages.length);
1605+
this._onDrain?.(timelineMessages.map(publicPendingEntry), merged, messages.length);
15551606
} catch (err) {
15561607
logger.warn(
15571608
{ err, providerSessionId: this._providerSessionId, count: messages.length },

src/daemon/transport-resend-preservation.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ function preserveEntries(
2323
commandId: entry.clientMessageId,
2424
...(entry.attachments?.length ? { attachments: entry.attachments } : {}),
2525
...(entry.sharedActor ? { sharedActor: entry.sharedActor } : {}),
26+
...(entry.timelineCommitted ? { timelineCommitted: true } : {}),
27+
...(entry.historyCommitted ? { historyCommitted: true } : {}),
2628
queuedAt: Date.now(),
2729
});
2830
seenCommandIds.add(entry.clientMessageId);
@@ -35,8 +37,8 @@ export function preserveTransportRuntimeQueuesToResend(
3537
sessionName: string,
3638
runtime: TransportSessionRuntime,
3739
): TransportRuntimeQueuePreservationResult {
38-
const activeEntries = runtime.activeDispatchEntries ?? [];
39-
const pendingEntries = runtime.pendingEntries ?? [];
40+
const activeEntries = runtime.activeDispatchEntriesForResend ?? runtime.activeDispatchEntries ?? [];
41+
const pendingEntries = runtime.pendingEntriesForResend ?? runtime.pendingEntries ?? [];
4042
const beforeCount = getResendCount(sessionName);
4143
const seenCommandIds = new Set(getResendEntries(sessionName).map((entry) => entry.commandId));
4244
const preservedActiveCount = preserveEntries(sessionName, activeEntries, seenCommandIds);

src/daemon/transport-resend-queue.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,10 @@ export interface ResendEntry {
4343
attachments?: TransportAttachment[];
4444
/** Server-authored share actor for attribution only; never injected into provider prompts. */
4545
sharedActor?: SharedActorEnvelope;
46+
/** @internal: this logical user event has already been written to the timeline. */
47+
timelineCommitted?: boolean;
48+
/** @internal: this logical user event has already been written to runtime history. */
49+
historyCommitted?: boolean;
4650
/** Enqueue timestamp for expiry calculation. */
4751
queuedAt: number;
4852
}

0 commit comments

Comments
 (0)