diff --git a/src/acp-agent.ts b/src/acp-agent.ts index 65b5afb1..5db2e6a6 100644 --- a/src/acp-agent.ts +++ b/src/acp-agent.ts @@ -149,6 +149,9 @@ type Session = { settingsManager: SettingsManager; accumulatedUsage: AccumulatedUsage; configOptions: SessionConfigOption[]; + promptRunning: boolean; + pendingMessages: Map void; order: number }>; + nextPendingOrder: number; }; type BackgroundTerminal = @@ -281,6 +284,11 @@ export class ClaudeAcpAgent implements Agent { return { protocolVersion: 1, agentCapabilities: { + _meta: { + claudeCode: { + promptQueueing: true, + }, + }, promptCapabilities: { image: true, embeddedContext: true, @@ -410,12 +418,13 @@ export class ClaudeAcpAgent implements Agent { } async prompt(params: PromptRequest): Promise { - if (!this.sessions[params.sessionId]) { + const session = this.sessions[params.sessionId]; + if (!session) { throw new Error("Session not found"); } - this.sessions[params.sessionId].cancelled = false; - this.sessions[params.sessionId].accumulatedUsage = { + session.cancelled = false; + session.accumulatedUsage = { inputTokens: 0, outputTokens: 0, cachedReadTokens: 0, @@ -424,250 +433,306 @@ export class ClaudeAcpAgent implements Agent { let lastAssistantTotalUsage: number | null = null; - const { query, input } = this.sessions[params.sessionId]; + const userMessage = promptToClaude(params); - input.push(promptToClaude(params)); - while (true) { - const { value: message, done } = await (query as AsyncGenerator).next(); - - if (done || !message) { - if (this.sessions[params.sessionId].cancelled) { - return { stopReason: "cancelled" }; - } - break; + if (session.promptRunning) { + const uuid = randomUUID(); + userMessage.uuid = uuid; + session.input.push(userMessage); + const order = session.nextPendingOrder++; + const cancelled = await new Promise((resolve) => { + session.pendingMessages.set(uuid, { resolve, order }); + }); + if (cancelled) { + return { stopReason: "cancelled" }; } + } else { + session.input.push(userMessage); + } - switch (message.type) { - case "system": - if (message.subtype === "compact_boundary") { - // We don't know the exact size, but since we compacted, - // we set it to zero. The client gets the exact size on the next message. - lastAssistantTotalUsage = 0; - } - switch (message.subtype) { - case "init": - break; - case "compact_boundary": - case "hook_started": - case "task_notification": - case "hook_progress": - case "hook_response": - case "status": - case "files_persisted": - case "task_started": - case "task_progress": - // Todo: process via status api: https://docs.claude.com/en/docs/claude-code/hooks#hook-output - break; - default: - unreachable(message, this.logger); - break; - } - break; - case "result": { - if (this.sessions[params.sessionId].cancelled) { + session.promptRunning = true; + let handedOff = false; + + try { + while (true) { + const { value: message, done } = await ( + session.query as AsyncGenerator + ).next(); + + if (done || !message) { + if (session.cancelled) { return { stopReason: "cancelled" }; } + break; + } - // Accumulate usage from this result - const session = this.sessions[params.sessionId]; - session.accumulatedUsage.inputTokens += message.usage.input_tokens; - session.accumulatedUsage.outputTokens += message.usage.output_tokens; - session.accumulatedUsage.cachedReadTokens += message.usage.cache_read_input_tokens; - session.accumulatedUsage.cachedWriteTokens += message.usage.cache_creation_input_tokens; - - // Calculate context window size from modelUsage (minimum across all models used) - const contextWindows = Object.values(message.modelUsage).map((m) => m.contextWindow); - const contextWindowSize = - contextWindows.length > 0 ? Math.min(...contextWindows) : 200000; - - // Send usage_update notification - if (lastAssistantTotalUsage !== null) { - await this.client.sessionUpdate({ - sessionId: params.sessionId, - update: { - sessionUpdate: "usage_update", - used: lastAssistantTotalUsage, - size: contextWindowSize, - cost: { - amount: message.total_cost_usd, - currency: "USD", + switch (message.type) { + case "system": + if (message.subtype === "compact_boundary") { + // We don't know the exact size, but since we compacted, + // we set it to zero. The client gets the exact size on the next message. + lastAssistantTotalUsage = 0; + } + switch (message.subtype) { + case "init": + break; + case "compact_boundary": + case "hook_started": + case "task_notification": + case "hook_progress": + case "hook_response": + case "status": + case "files_persisted": + case "task_started": + case "task_progress": + // Todo: process via status api: https://docs.claude.com/en/docs/claude-code/hooks#hook-output + break; + default: + unreachable(message, this.logger); + break; + } + break; + case "result": { + if (session.cancelled) { + return { stopReason: "cancelled" }; + } + + // Accumulate usage from this result + session.accumulatedUsage.inputTokens += message.usage.input_tokens; + session.accumulatedUsage.outputTokens += message.usage.output_tokens; + session.accumulatedUsage.cachedReadTokens += message.usage.cache_read_input_tokens; + session.accumulatedUsage.cachedWriteTokens += message.usage.cache_creation_input_tokens; + + // Calculate context window size from modelUsage (minimum across all models used) + const contextWindows = Object.values(message.modelUsage).map((m) => m.contextWindow); + const contextWindowSize = + contextWindows.length > 0 ? Math.min(...contextWindows) : 200000; + + // Send usage_update notification + if (lastAssistantTotalUsage !== null) { + await this.client.sessionUpdate({ + sessionId: params.sessionId, + update: { + sessionUpdate: "usage_update", + used: lastAssistantTotalUsage, + size: contextWindowSize, + cost: { + amount: message.total_cost_usd, + currency: "USD", + }, }, - }, - }); - } + }); + } - // Build the usage response - const usage: PromptResponse["usage"] = { - inputTokens: session.accumulatedUsage.inputTokens, - outputTokens: session.accumulatedUsage.outputTokens, - cachedReadTokens: session.accumulatedUsage.cachedReadTokens, - cachedWriteTokens: session.accumulatedUsage.cachedWriteTokens, - totalTokens: - session.accumulatedUsage.inputTokens + - session.accumulatedUsage.outputTokens + - session.accumulatedUsage.cachedReadTokens + - session.accumulatedUsage.cachedWriteTokens, - }; + // Build the usage response + const usage: PromptResponse["usage"] = { + inputTokens: session.accumulatedUsage.inputTokens, + outputTokens: session.accumulatedUsage.outputTokens, + cachedReadTokens: session.accumulatedUsage.cachedReadTokens, + cachedWriteTokens: session.accumulatedUsage.cachedWriteTokens, + totalTokens: + session.accumulatedUsage.inputTokens + + session.accumulatedUsage.outputTokens + + session.accumulatedUsage.cachedReadTokens + + session.accumulatedUsage.cachedWriteTokens, + }; - switch (message.subtype) { - case "success": { - if (message.result.includes("Please run /login")) { - throw RequestError.authRequired(); - } - if (message.is_error) { - throw RequestError.internalError(undefined, message.result); + switch (message.subtype) { + case "success": { + if (message.result.includes("Please run /login")) { + throw RequestError.authRequired(); + } + if (message.is_error) { + throw RequestError.internalError(undefined, message.result); + } + return { stopReason: "end_turn", usage }; } - return { stopReason: "end_turn", usage }; + case "error_during_execution": + if (message.is_error) { + throw RequestError.internalError( + undefined, + message.errors.join(", ") || message.subtype, + ); + } + return { stopReason: "end_turn", usage }; + case "error_max_budget_usd": + case "error_max_turns": + case "error_max_structured_output_retries": + if (message.is_error) { + throw RequestError.internalError( + undefined, + message.errors.join(", ") || message.subtype, + ); + } + return { stopReason: "max_turn_requests", usage }; + default: + unreachable(message, this.logger); + break; } - case "error_during_execution": - if (message.is_error) { - throw RequestError.internalError( - undefined, - message.errors.join(", ") || message.subtype, - ); - } - return { stopReason: "end_turn", usage }; - case "error_max_budget_usd": - case "error_max_turns": - case "error_max_structured_output_retries": - if (message.is_error) { - throw RequestError.internalError( - undefined, - message.errors.join(", ") || message.subtype, - ); - } - return { stopReason: "max_turn_requests", usage }; - default: - unreachable(message, this.logger); - break; - } - break; - } - case "stream_event": { - for (const notification of streamEventToAcpNotifications( - message, - params.sessionId, - this.toolUseCache, - this.client, - this.logger, - { clientCapabilities: this.clientCapabilities }, - )) { - await this.client.sessionUpdate(notification); + break; } - break; - } - case "user": - case "assistant": { - if (this.sessions[params.sessionId].cancelled) { + case "stream_event": { + for (const notification of streamEventToAcpNotifications( + message, + params.sessionId, + this.toolUseCache, + this.client, + this.logger, + { clientCapabilities: this.clientCapabilities }, + )) { + await this.client.sessionUpdate(notification); + } break; } + case "user": + case "assistant": { + if (session.cancelled) { + break; + } - // Store latest assistant usage (excluding subagents) - if ((message.message as any).usage && message.parent_tool_use_id === null) { - const messageWithUsage = message.message as unknown as SDKResultMessage; - lastAssistantTotalUsage = - messageWithUsage.usage.input_tokens + - messageWithUsage.usage.output_tokens + - messageWithUsage.usage.cache_read_input_tokens + - messageWithUsage.usage.cache_creation_input_tokens; - } + // Check for queued prompt replay + if (message.type === "user" && "uuid" in message && message.uuid) { + const pending = session.pendingMessages.get(message.uuid as string); + if (pending) { + pending.resolve(false); + session.pendingMessages.delete(message.uuid as string); + handedOff = true; + // the current loop stops with end_turn, + // the loop of the next prompt continues running + return { stopReason: "end_turn" }; + } + } - // Slash commands like /compact can generate invalid output... doesn't match - // their own docs: https://docs.anthropic.com/en/docs/claude-code/sdk/sdk-slash-commands#%2Fcompact-compact-conversation-history - if ( - typeof message.message.content === "string" && - message.message.content.includes("") - ) { - // Handle /context by sending its reply as regular agent message. - if (message.message.content.includes("Context Usage")) { - for (const notification of toAcpNotifications( - message.message.content - .replace("", "") - .replace("", ""), - "assistant", - params.sessionId, - this.toolUseCache, - this.client, - this.logger, - { clientCapabilities: this.clientCapabilities }, - )) { - await this.client.sessionUpdate(notification); + // Store latest assistant usage (excluding subagents) + if ((message.message as any).usage && message.parent_tool_use_id === null) { + const messageWithUsage = message.message as unknown as SDKResultMessage; + lastAssistantTotalUsage = + messageWithUsage.usage.input_tokens + + messageWithUsage.usage.output_tokens + + messageWithUsage.usage.cache_read_input_tokens + + messageWithUsage.usage.cache_creation_input_tokens; + } + + // Slash commands like /compact can generate invalid output... doesn't match + // their own docs: https://docs.anthropic.com/en/docs/claude-code/sdk/sdk-slash-commands#%2Fcompact-compact-conversation-history + if ( + typeof message.message.content === "string" && + message.message.content.includes("") + ) { + // Handle /context by sending its reply as regular agent message. + if (message.message.content.includes("Context Usage")) { + for (const notification of toAcpNotifications( + message.message.content + .replace("", "") + .replace("", ""), + "assistant", + params.sessionId, + this.toolUseCache, + this.client, + this.logger, + { clientCapabilities: this.clientCapabilities }, + )) { + await this.client.sessionUpdate(notification); + } } + this.logger.log(message.message.content); + break; + } + + if ( + typeof message.message.content === "string" && + message.message.content.includes("") + ) { + this.logger.error(message.message.content); + break; + } + // Skip these user messages for now, since they seem to just be messages we don't want in the feed + if ( + message.type === "user" && + (typeof message.message.content === "string" || + (Array.isArray(message.message.content) && + message.message.content.length === 1 && + message.message.content[0].type === "text")) + ) { + break; + } + + if ( + message.type === "assistant" && + message.message.model === "" && + Array.isArray(message.message.content) && + message.message.content.length === 1 && + message.message.content[0].type === "text" && + message.message.content[0].text.includes("Please run /login") + ) { + throw RequestError.authRequired(); } - this.logger.log(message.message.content); - break; - } - if ( - typeof message.message.content === "string" && - message.message.content.includes("") - ) { - this.logger.error(message.message.content); + const content = + message.type === "assistant" + ? // Handled by stream events above + message.message.content.filter( + (item) => !["text", "thinking"].includes(item.type), + ) + : message.message.content; + + for (const notification of toAcpNotifications( + content, + message.message.role, + params.sessionId, + this.toolUseCache, + this.client, + this.logger, + { + clientCapabilities: this.clientCapabilities, + parentToolUseId: message.parent_tool_use_id, + }, + )) { + await this.client.sessionUpdate(notification); + } break; } - // Skip these user messages for now, since they seem to just be messages we don't want in the feed - if ( - message.type === "user" && - (typeof message.message.content === "string" || - (Array.isArray(message.message.content) && - message.message.content.length === 1 && - message.message.content[0].type === "text")) - ) { + case "tool_progress": + case "tool_use_summary": break; + case "auth_status": + break; + default: + unreachable(message); + break; + } + } + throw new Error("Session did not end in result"); + } finally { + if (!handedOff) { + session.promptRunning = false; + // This usually should not happen, but in case the loop finishes + // without claude sending all message replays, we resolve the + // next pending prompt call to ensure no prompts get stuck. + if (session.pendingMessages.size > 0) { + const next = [...session.pendingMessages.entries()].sort( + (a, b) => a[1].order - b[1].order, + )[0]; + if (next) { + next[1].resolve(false); + session.pendingMessages.delete(next[0]); } - - if ( - message.type === "assistant" && - message.message.model === "" && - Array.isArray(message.message.content) && - message.message.content.length === 1 && - message.message.content[0].type === "text" && - message.message.content[0].text.includes("Please run /login") - ) { - throw RequestError.authRequired(); - } - - const content = - message.type === "assistant" - ? // Handled by stream events above - message.message.content.filter((item) => !["text", "thinking"].includes(item.type)) - : message.message.content; - - for (const notification of toAcpNotifications( - content, - message.message.role, - params.sessionId, - this.toolUseCache, - this.client, - this.logger, - { - clientCapabilities: this.clientCapabilities, - parentToolUseId: message.parent_tool_use_id, - }, - )) { - await this.client.sessionUpdate(notification); - } - break; } - case "tool_progress": - case "tool_use_summary": - break; - case "auth_status": - break; - default: - unreachable(message); - break; } } - throw new Error("Session did not end in result"); } async cancel(params: CancelNotification): Promise { - if (!this.sessions[params.sessionId]) { + const session = this.sessions[params.sessionId]; + if (!session) { throw new Error("Session not found"); } - this.sessions[params.sessionId].cancelled = true; - await this.sessions[params.sessionId].query.interrupt(); + session.cancelled = true; + for (const [, pending] of session.pendingMessages) { + pending.resolve(true); + } + session.pendingMessages.clear(); + await session.query.interrupt(); } async unstable_setSessionModel( @@ -1074,6 +1139,10 @@ export class ClaudeAcpAgent implements Agent { : isStaticBinary() ? { pathToClaudeCodeExecutable: process.execPath } : {}), + extraArgs: { + ...userProvidedOptions?.extraArgs, + "replay-user-messages": "", + }, disallowedTools: [...(userProvidedOptions?.disallowedTools || []), ...disallowedTools], tools: { type: "preset", preset: "claude_code" }, hooks: { @@ -1134,6 +1203,9 @@ export class ClaudeAcpAgent implements Agent { cachedWriteTokens: 0, }, configOptions: [], + promptRunning: false, + pendingMessages: new Map(), + nextPendingOrder: 0, }; const initializationResult = await q.initializationResult();