Skip to content

Commit f371bec

Browse files
IM.codesclaude
andcommitted
chore(wip): batch in-progress transport/sub-session/todo working-tree changes
Consolidates the remaining working-tree changes (transport session runtime + queue/ack behavior, session-manager, claude-code-sdk, command-handler, lifecycle, sub-session UI, agent-todos) and their tests, committed together at the user's request. Verified before commit: daemon/web/server typecheck clean; all touched test files pass (daemon 308, web 202). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
1 parent 9ed6afa commit f371bec

17 files changed

Lines changed: 1135 additions & 114 deletions

src/agent/providers/claude-code-sdk.ts

Lines changed: 92 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,24 @@ const CLAUDE_RUNTIME_SUBAGENT_SYSTEM_SUBTYPES = new Set([
9494
'runtime_subagent_notification',
9595
'runtime_subagent_status',
9696
]);
97+
const CLAUDE_CHECKLIST_TOOL_NAMES = new Set([
98+
'todowrite',
99+
'todo_write',
100+
'write_todos',
101+
'writetodos',
102+
'update_plan',
103+
'updateplan',
104+
'update_todo_list',
105+
'updatetodolist',
106+
'set_plan',
107+
'setplan',
108+
]);
109+
const CLAUDE_CHECKLIST_LIST_KEYS = ['todos', 'plan', 'tasks', 'steps'] as const;
110+
const CLAUDE_CHECKLIST_TEXT_KEYS = ['content', 'step', 'text', 'title', 'task', 'description', 'name'] as const;
111+
112+
type ClaudeChecklistInput = {
113+
plan: Array<{ content: string; status: 'pending' | 'in_progress' | 'completed' }>;
114+
};
97115

98116
interface ClaudeSdkSessionState {
99117
routeId: string;
@@ -1537,15 +1555,16 @@ export class ClaudeCodeSdkProvider implements TransportProvider, InteractiveQues
15371555
}
15381556

15391557
private emitToolCall(sessionId: string, state: ClaudeSdkSessionState, tool: ToolCallEvent): void {
1558+
const normalizedTool = this.normalizeChecklistToolCall(tool);
15401559
const signature = JSON.stringify({
1541-
status: tool.status,
1542-
name: tool.name,
1543-
input: tool.input ?? null,
1544-
output: tool.output ?? null,
1560+
status: normalizedTool.status,
1561+
name: normalizedTool.name,
1562+
input: normalizedTool.input ?? null,
1563+
output: normalizedTool.output ?? null,
15451564
});
1546-
if (state.emittedToolStates.get(tool.id) === signature) return;
1547-
state.emittedToolStates.set(tool.id, signature);
1548-
for (const cb of this.toolCallCallbacks) cb(sessionId, tool);
1565+
if (state.emittedToolStates.get(normalizedTool.id) === signature) return;
1566+
state.emittedToolStates.set(normalizedTool.id, signature);
1567+
for (const cb of this.toolCallCallbacks) cb(sessionId, normalizedTool);
15491568
}
15501569

15511570
private isClaudeTaskLifecycleMessage(msg: SDKMessage): msg is ClaudeTaskLifecycleMessage {
@@ -1587,6 +1606,72 @@ export class ClaudeCodeSdkProvider implements TransportProvider, InteractiveQues
15871606
};
15881607
}
15891608

1609+
private normalizeChecklistToolCall(tool: ToolCallEvent): ToolCallEvent {
1610+
const normalizedName = normalizeStatusName(tool.name);
1611+
if (!CLAUDE_CHECKLIST_TOOL_NAMES.has(normalizedName)) return tool;
1612+
const input = this.normalizeClaudeChecklistInput(tool.input)
1613+
?? this.normalizeClaudeChecklistInput(tool.detail?.input);
1614+
if (!input) return tool;
1615+
return {
1616+
...tool,
1617+
input,
1618+
detail: {
1619+
...(tool.detail ?? {}),
1620+
kind: 'plan',
1621+
summary: 'Plan',
1622+
input,
1623+
raw: tool.detail?.raw ?? tool.input,
1624+
},
1625+
};
1626+
}
1627+
1628+
private normalizeClaudeChecklistInput(value: unknown): ClaudeChecklistInput | null {
1629+
const rawItems = Array.isArray(value) ? value : this.rawChecklistItemsFromRecord(value);
1630+
if (!rawItems) return null;
1631+
const plan: ClaudeChecklistInput['plan'] = [];
1632+
for (const rawItem of rawItems) {
1633+
const content = this.claudeChecklistText(rawItem);
1634+
if (!content) continue;
1635+
const record = this.asRecord(rawItem);
1636+
const status = record
1637+
? this.normalizeClaudeChecklistStatus(record.status)
1638+
: 'pending';
1639+
plan.push({ content, status });
1640+
}
1641+
return { plan };
1642+
}
1643+
1644+
private rawChecklistItemsFromRecord(value: unknown): unknown[] | null {
1645+
const record = this.asRecord(value);
1646+
if (!record) return null;
1647+
for (const key of CLAUDE_CHECKLIST_LIST_KEYS) {
1648+
if (Array.isArray(record[key])) return record[key] as unknown[];
1649+
}
1650+
return null;
1651+
}
1652+
1653+
private claudeChecklistText(value: unknown): string {
1654+
if (typeof value === 'string') return value.trim();
1655+
const record = this.asRecord(value);
1656+
if (!record) return '';
1657+
for (const key of CLAUDE_CHECKLIST_TEXT_KEYS) {
1658+
const text = record[key];
1659+
if (typeof text === 'string' && text.trim()) return text.trim();
1660+
}
1661+
return '';
1662+
}
1663+
1664+
private normalizeClaudeChecklistStatus(value: unknown): 'pending' | 'in_progress' | 'completed' {
1665+
const normalized = normalizeStatusName(typeof value === 'string' ? value : undefined);
1666+
if (normalized === 'completed' || normalized === 'complete' || normalized === 'done' || normalized === 'finished' || normalized === 'checked') {
1667+
return 'completed';
1668+
}
1669+
if (normalized === 'inprogress' || normalized === 'active' || normalized === 'doing' || normalized === 'running' || normalized === 'started') {
1670+
return 'in_progress';
1671+
}
1672+
return 'pending';
1673+
}
1674+
15901675
private tryParsePartialJson(value: string): unknown {
15911676
try {
15921677
return JSON.parse(value);

src/agent/session-manager.ts

Lines changed: 69 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import { buildTransportResumeLaunchOpts } from './transport-resume-opts.js';
1212
import { RUNTIME_TYPES } from './session-runtime.js';
1313
import { TransportSessionRuntime } from './transport-session-runtime.js';
1414
import { ensureProviderConnected, getProvider } from './provider-registry.js';
15-
import type { SessionInfoUpdate } from './transport-provider.js';
15+
import { PROVIDER_ERROR_CODES, type SessionInfoUpdate } from './transport-provider.js';
1616
import { setupCCStopHook } from './signal.js';
1717
import { setupCodexNotify, setupOpenCodePlugin } from './notify-setup.js';
1818
import {
@@ -22,6 +22,7 @@ import {
2222
listSessions as storeSessions,
2323
updateSessionState,
2424
type SessionRecord,
25+
type SessionState,
2526
} from '../store/session-store.js';
2627
import logger from '../util/logger.js';
2728
import { mapWithConcurrency } from '../util/concurrency.js';
@@ -66,6 +67,12 @@ function isStoredTransportSession(record: Pick<SessionRecord, 'runtimeType' | 'a
6667
|| isTransportAgent(record.agentType as AgentType);
6768
}
6869

70+
function shouldAutoRelaunchTransportRuntimeAfterError(
71+
providerError: TransportSessionRuntime['lastProviderError'],
72+
): boolean {
73+
return providerError?.code === PROVIDER_ERROR_CODES.CONNECTION_LOST;
74+
}
75+
6976
function sanitizeCodexSdkStartupModel(value: string | null | undefined): string | undefined {
7077
const trimmed = value?.trim();
7178
if (!trimmed) return undefined;
@@ -1091,9 +1098,9 @@ const TRANSPORT_RESTORE_INTER_SESSION_DELAY_MS = (() => {
10911098
})();
10921099
const transportErrorRecoveryTimestamps = new Map<string, number[]>();
10931100

1094-
function pauseBetweenTransportRestores(index: number): Promise<void> {
1101+
function pauseBetweenTransportRestores(index: number, delayMs = TRANSPORT_RESTORE_INTER_SESSION_DELAY_MS): Promise<void> {
10951102
if (index <= 0) return new Promise((resolve) => setImmediate(resolve));
1096-
return new Promise((resolve) => setTimeout(resolve, TRANSPORT_RESTORE_INTER_SESSION_DELAY_MS));
1103+
return new Promise((resolve) => setTimeout(resolve, delayMs));
10971104
}
10981105

10991106
function buildTransportSessionEnv(
@@ -1139,8 +1146,31 @@ async function recoverTransportRuntimeAfterError(
11391146
return false;
11401147
}
11411148

1149+
const providerError = runtime.lastProviderError;
1150+
if (!shouldAutoRelaunchTransportRuntimeAfterError(providerError)) {
1151+
logger.warn(
1152+
{
1153+
sessionName,
1154+
providerError,
1155+
status: runtime.getStatus(),
1156+
pendingCount: runtime.pendingCount,
1157+
activeDispatchCount: runtime.activeDispatchEntries.length,
1158+
},
1159+
'Transport runtime error did not indicate provider connection loss; skipping provider relaunch',
1160+
);
1161+
return false;
1162+
}
1163+
11421164
const preservation = preserveTransportRuntimeQueuesToResend(sessionName, runtime);
11431165
const pendingCount = preservation.afterCount;
1166+
logger.warn(
1167+
{
1168+
sessionName,
1169+
providerError,
1170+
...preservation,
1171+
},
1172+
'Transport provider connection lost — preserving queues and relaunching provider runtime',
1173+
);
11441174

11451175
const now = Date.now();
11461176
const windowStart = now - RESTART_WINDOW_MS;
@@ -1169,7 +1199,7 @@ async function recoverTransportRuntimeAfterError(
11691199
if (pendingCount > 0) {
11701200
const queued = getResendEntries(sessionName);
11711201
timelineEmitter.emit(sessionName, 'assistant.text', {
1172-
text: `⏳ Provider error detectedrestarting and auto-resending ${pendingCount} queued message${pendingCount === 1 ? '' : 's'}.`,
1202+
text: `⏳ Provider connection lost — auto-resending ${pendingCount} queued message${pendingCount === 1 ? '' : 's'} after recovery.`,
11731203
streaming: false,
11741204
memoryExcluded: true,
11751205
}, { source: 'daemon', confidence: 'high' });
@@ -1368,12 +1398,21 @@ async function drainTransportResendQueueIntoRuntime(
13681398

13691399
function wireTransportCallbacks(runtime: TransportSessionRuntime, sessionName: string): void {
13701400
const transportUserEventId = (clientMessageId: string) => `transport-user:${clientMessageId}`;
1401+
const persistTransportState = (state: unknown): void => {
1402+
if (state !== 'running' && state !== 'idle' && state !== 'error') return;
1403+
const existing = getSession(sessionName);
1404+
if (!existing || existing.state === state) return;
1405+
const next: SessionRecord = { ...existing, state: state as SessionState, updatedAt: Date.now() };
1406+
upsertSession(next);
1407+
emitSessionPersist(next, sessionName);
1408+
};
13711409
runtime.onStatusChange = (status) => {
13721410
// Emit assistant.thinking for chat typing indicator (matches tmux watcher behavior)
13731411
if (status === 'thinking') {
13741412
timelineEmitter.emit(sessionName, 'assistant.thinking', { text: '' }, { source: 'daemon', confidence: 'high' });
13751413
}
13761414
const mapped = (status === 'streaming' || status === 'thinking') ? 'running' : status;
1415+
persistTransportState(mapped);
13771416
// Include pending info only on idle — the authoritative "turn done, queue empty" signal.
13781417
// During running/streaming, command-handler's 'queued' event is the sole queue-update
13791418
// authority. This keeps queued messages visible in the UI until the drained turn completes.
@@ -1416,6 +1455,7 @@ function wireTransportCallbacks(runtime: TransportSessionRuntime, sessionName: s
14161455
// been moved into the timeline via user.message emissions above, so they must
14171456
// leave the queue UI simultaneously. The runtime's pending queue is now [] (or
14181457
// contains any NEW messages queued since drain started).
1458+
persistTransportState('running');
14191459
timelineEmitter.emit(sessionName, 'session.state', {
14201460
state: 'running',
14211461
pendingCount: runtime.pendingCount,
@@ -1654,10 +1694,16 @@ export function getTransportRuntime(name: string): TransportSessionRuntime | und
16541694
*/
16551695
export async function restoreTransportSessions(
16561696
providerId: string,
1657-
options: { onlyWithPendingResend?: boolean } = {},
1697+
options: { onlyWithPendingResend?: boolean; concurrency?: number; interSessionDelayMs?: number } = {},
16581698
): Promise<void> {
16591699
const all = storeSessions();
16601700
const qwenRuntime = providerId === 'qwen' ? await getQwenRuntimeConfig().catch(() => null) : null;
1701+
const restoreConcurrency = Number.isFinite(options.concurrency) && (options.concurrency ?? 0) >= 1
1702+
? Math.trunc(options.concurrency!)
1703+
: TRANSPORT_RESTORE_CONCURRENCY;
1704+
const restoreInterSessionDelayMs = Number.isFinite(options.interSessionDelayMs) && (options.interSessionDelayMs ?? -1) >= 0
1705+
? Math.trunc(options.interSessionDelayMs!)
1706+
: TRANSPORT_RESTORE_INTER_SESSION_DELAY_MS;
16611707
// Restore with BOUNDED CONCURRENCY rather than one-at-a-time. Each session's
16621708
// restore is ~1s of mostly-I/O wait (context bootstrap has a 2.5s timeout +
16631709
// the provider's resume RPC), so a sequential loop over ~30 transport
@@ -1669,15 +1715,25 @@ export async function restoreTransportSessions(
16691715
// process; node:sqlite is synchronous so memory/context reads serialise on
16701716
// the main thread anyway; every store write is keyed by session name.
16711717
type Restorable = SessionRecord & { providerId: string; providerSessionId: string };
1672-
const pending = all.filter((s): s is Restorable =>
1718+
const pending = all.filter((s) =>
16731719
isStoredTransportSession(s)
1674-
&& s.providerId === providerId
1720+
&& (s.providerId ?? s.agentType) === providerId
16751721
&& !!s.providerSessionId
16761722
&& (!options.onlyWithPendingResend || getResendCount(s.name) > 0),
1677-
);
1723+
).map((s) => ({ ...s, providerId, providerSessionId: s.providerSessionId! } as Restorable));
16781724
const restoreOne = async (s: Restorable, index: number): Promise<void> => {
1679-
await pauseBetweenTransportRestores(index);
1680-
if (transportRuntimes.has(s.name)) return; // already rebuilt by oc-sync
1725+
await pauseBetweenTransportRestores(index, restoreInterSessionDelayMs);
1726+
const existingRuntime = transportRuntimes.get(s.name);
1727+
if (existingRuntime?.providerSessionId) return; // already rebuilt by oc-sync / warm restore
1728+
if (existingRuntime) {
1729+
const preservation = preserveTransportRuntimeQueuesToResend(s.name, existingRuntime);
1730+
if (preservation.preservedCount > 0) {
1731+
logger.info({ sessionName: s.name, ...preservation }, 'preserved unbound transport runtime queues before restore');
1732+
}
1733+
await stopTransportRuntimeSession(s.name).catch((err) => {
1734+
logger.warn({ err, session: s.name }, 'Failed to stop unbound transport runtime before restore');
1735+
});
1736+
}
16811737
try {
16821738
const provider = getProvider(s.providerId);
16831739
if (!provider) return;
@@ -1883,10 +1939,10 @@ export async function restoreTransportSessions(
18831939
logger.info({
18841940
providerId,
18851941
count: pending.length,
1886-
concurrency: TRANSPORT_RESTORE_CONCURRENCY,
1887-
interSessionDelayMs: TRANSPORT_RESTORE_INTER_SESSION_DELAY_MS,
1942+
concurrency: restoreConcurrency,
1943+
interSessionDelayMs: restoreInterSessionDelayMs,
18881944
}, 'Restoring transport session runtimes');
1889-
await mapWithConcurrency(pending, TRANSPORT_RESTORE_CONCURRENCY, restoreOne);
1945+
await mapWithConcurrency(pending, restoreConcurrency, restoreOne);
18901946
logger.info({ providerId, count: pending.length }, 'Transport session runtime restore completed');
18911947
}
18921948

0 commit comments

Comments
 (0)