From 1dfa0e8034d18374f16f52387bd96c958e3c8cab Mon Sep 17 00:00:00 2001 From: brittianwarner Date: Sat, 4 Apr 2026 12:27:45 -0400 Subject: [PATCH] feat: add configurable activity-aware ACP request timeout --- packages/core/README.md | 4 +- packages/core/src/acp-client.ts | 63 ++-- packages/core/src/agent-os.ts | 368 +++++++++++------------ packages/core/tests/acp-protocol.test.ts | 180 +++++++++-- 4 files changed, 392 insertions(+), 223 deletions(-) diff --git a/packages/core/README.md b/packages/core/README.md index f08caa706..835881cdc 100644 --- a/packages/core/README.md +++ b/packages/core/README.md @@ -136,8 +136,8 @@ await vm.dispose(); ### Exported Types **VM & Options** -- `AgentOsOptions` — VM creation options (commandDirs, loopbackExemptPorts, moduleAccessCwd, mounts, additionalInstructions) -- `CreateSessionOptions` — Session options (cwd, env, mcpServers, skipOsInstructions, additionalInstructions) +- `AgentOsOptions` — VM creation options (software, loopbackExemptPorts, moduleAccessCwd, rootFilesystem, mounts, additionalInstructions, scheduleDriver, toolKits, permissions, acpTimeoutMs) +- `CreateSessionOptions` — Session options (cwd, env, mcpServers, skipOsInstructions, additionalInstructions, acpTimeoutMs) **Mount Configurations** - `MountConfig` — Union of all mount types diff --git a/packages/core/src/acp-client.ts b/packages/core/src/acp-client.ts index 4a52983a9..c8e7fc592 100644 --- a/packages/core/src/acp-client.ts +++ b/packages/core/src/acp-client.ts @@ -4,13 +4,13 @@ import { isRequest, isResponse, type JsonRpcError, - type JsonRpcRequest, type JsonRpcNotification, + type JsonRpcRequest, type JsonRpcResponse, serializeMessage, } from "./protocol.js"; -const DEFAULT_TIMEOUT_MS = 120_000; +const DEFAULT_TIMEOUT_MS = 900_000; // 15 minutes const EXIT_DRAIN_GRACE_MS = 50; const LEGACY_PERMISSION_METHOD = "request/permission"; const ACP_PERMISSION_METHOD = "session/request_permission"; @@ -22,6 +22,7 @@ interface PendingRequest { resolve: (response: JsonRpcResponse) => void; reject: (error: Error) => void; timer: ReturnType; + method: string; } interface PendingPermissionRequest { @@ -31,9 +32,7 @@ interface PendingPermissionRequest { } export type NotificationHandler = (notification: JsonRpcNotification) => void; -export type InboundRequestHandler = ( - request: JsonRpcRequest, -) => +export type InboundRequestHandler = (request: JsonRpcRequest) => | Promise< | { result?: unknown; @@ -52,14 +51,17 @@ export type InboundRequestHandler = ( export class AcpClient { private _process: ManagedProcess; private _nextId = 1; - private _pending = new Map(); + private _pending = new Map(); private _seenInboundRequestIds = new Set(); private _notificationHandlers: NotificationHandler[] = []; private _closed = false; private _timeoutMs: number; private _stdoutIterator: AsyncIterator | null = null; private _readerClosed = false; - private _pendingPermissionRequests = new Map(); + private _pendingPermissionRequests = new Map< + string, + PendingPermissionRequest + >(); private _requestHandler?: InboundRequestHandler; private _recentActivity: string[] = []; @@ -72,7 +74,13 @@ export class AcpClient { }, ) { this._process = process; - this._timeoutMs = options?.timeoutMs ?? DEFAULT_TIMEOUT_MS; + const timeoutMs = options?.timeoutMs ?? DEFAULT_TIMEOUT_MS; + if (timeoutMs <= 0) { + throw new Error( + `ACP timeoutMs must be a positive number, got ${timeoutMs}`, + ); + } + this._timeoutMs = timeoutMs; this._requestHandler = options?.requestHandler; this._startReading(stdoutLines); this._watchExit(); @@ -102,7 +110,7 @@ export class AcpClient { reject(this._createTimeoutError(method, id)); }, this._timeoutMs); - this._pending.set(id, { resolve, reject, timer }); + this._pending.set(id, { resolve, reject, timer, method }); }); if (method !== ACP_CANCEL_METHOD) { @@ -167,9 +175,11 @@ export class AcpClient { this._recordActivity(summarizeInboundMessage(msg)); if (isResponse(msg)) { - const pending = this._pending.get(msg.id); - if (pending) { - this._pending.delete(msg.id); + const id = typeof msg.id === "number" ? msg.id : undefined; + const pending = + id !== undefined ? this._pending.get(id) : undefined; + if (id !== undefined && pending) { + this._pending.delete(id); clearTimeout(pending.timer); pending.resolve(msg); } @@ -180,6 +190,10 @@ export class AcpClient { handler(msg); } } + + // Reset inactivity timers for remaining pending requests + // whenever any valid JSON-RPC message arrives from the agent. + this._resetPendingTimers(); } } catch { // Stream ended or errored @@ -244,11 +258,10 @@ export class AcpClient { id: msg.id, method: msg.method, options: Array.isArray(requestParams.options) - ? requestParams.options - .filter( - (option): option is Record => - option !== null && typeof option === "object", - ) + ? requestParams.options.filter( + (option): option is Record => + option !== null && typeof option === "object", + ) : undefined, }); const params = { @@ -354,6 +367,22 @@ export class AcpClient { } } + /** + * Reset all pending request inactivity timers. Called after any + * valid inbound JSON-RPC message to implement activity-aware timeout. + * Replaces each timer with a fresh one starting from now. + */ + private _resetPendingTimers(): void { + if (this._closed) return; + for (const [id, pending] of this._pending) { + clearTimeout(pending.timer); + pending.timer = setTimeout(() => { + this._pending.delete(id); + pending.reject(this._createTimeoutError(pending.method, id)); + }, this._timeoutMs); + } + } + private _createTimeoutError(method: string, id: number | string): Error { const processState = `process exitCode=${String( this._getProcessExitCode(), diff --git a/packages/core/src/agent-os.ts b/packages/core/src/agent-os.ts index 5b3f546c7..f30c7ff20 100644 --- a/packages/core/src/agent-os.ts +++ b/packages/core/src/agent-os.ts @@ -9,11 +9,11 @@ import { } from "node:fs"; import { tmpdir } from "node:os"; import { + sep as hostPathSeparator, join, posix as posixPath, relative as relativeHostPath, resolve as resolveHostPath, - sep as hostPathSeparator, } from "node:path"; import { allowAll, @@ -34,8 +34,8 @@ import { import { type ToolKit, validateToolkits } from "./host-tools.js"; import { generateToolReference } from "./host-tools-prompt.js"; import { - startHostToolsServer, type HostToolsServer, + startHostToolsServer, } from "./host-tools-server.js"; import { createShimFilesystem, @@ -92,21 +92,33 @@ export interface AgentRegistryEntry { installed: boolean; } +import { createWasmVmRuntime } from "@rivet-dev/agent-os-posix"; +import { createPythonRuntime } from "@rivet-dev/agent-os-python"; import { createNodeHostNetworkAdapter, createNodeRuntime, } from "@secure-exec/nodejs"; -import { createPythonRuntime } from "@rivet-dev/agent-os-python"; -import { createWasmVmRuntime } from "@rivet-dev/agent-os-posix"; import { AcpClient } from "./acp-client.js"; +import { AGENT_CONFIGS, type AgentConfig, type AgentType } from "./agents.js"; +import { getHostDirBackendMeta } from "./backends/host-dir-backend.js"; import { createBootstrapAwareFilesystem, getBaseEnvironment, getBaseFilesystemEntries, } from "./base-filesystem.js"; +import { CronManager } from "./cron/cron-manager.js"; +import type { ScheduleDriver } from "./cron/schedule-driver.js"; +import { TimerScheduleDriver } from "./cron/timer-driver.js"; +import type { + CronEvent, + CronEventHandler, + CronJob, + CronJobInfo, + CronJobOptions, +} from "./cron/types.js"; import { - snapshotVirtualFilesystem, type FilesystemEntry, + snapshotVirtualFilesystem, } from "./filesystem-snapshot.js"; import { createDefaultRootLowerInput, @@ -117,40 +129,28 @@ import { type RootSnapshotExport, type SnapshotLayerHandle, } from "./layers.js"; -import { AGENT_CONFIGS, type AgentConfig, type AgentType } from "./agents.js"; -import { getHostDirBackendMeta } from "./backends/host-dir-backend.js"; +import { getOsInstructions } from "./os-instructions.js"; import { + processSoftware, type SoftwareInput, type SoftwareRoot, - processSoftware, } from "./packages.js"; -import { CronManager } from "./cron/cron-manager.js"; -import type { ScheduleDriver } from "./cron/schedule-driver.js"; -import { TimerScheduleDriver } from "./cron/timer-driver.js"; -import type { - CronEvent, - CronEventHandler, - CronJob, - CronJobInfo, - CronJobOptions, -} from "./cron/types.js"; -import { getOsInstructions } from "./os-instructions.js"; +import type { JsonRpcRequest, JsonRpcResponse } from "./protocol.js"; import { - Session, - type SessionInitData, type AgentCapabilities, type AgentInfo, type GetEventsOptions, type PermissionReply, + type PermissionRequestHandler, type SequencedEvent, + Session, type SessionConfigOption, type SessionEventHandler, + type SessionInitData, type SessionModeState, - type PermissionRequestHandler, } from "./session.js"; -import type { JsonRpcRequest, JsonRpcResponse } from "./protocol.js"; -import { createStdoutLineIterable } from "./stdout-lines.js"; import { createSqliteBindings } from "./sqlite-bindings.js"; +import { createStdoutLineIterable } from "./stdout-lines.js"; interface HostMountInfo { vmPath: string; @@ -231,6 +231,13 @@ export interface AgentOsOptions { * network, child process, and environment operations. Defaults to allowAll. */ permissions?: Permissions; + /** + * Default ACP request inactivity timeout in milliseconds for all + * sessions created by this VM. The timer resets whenever the agent + * sends any JSON-RPC message. Per-session override available via + * CreateSessionOptions.acpTimeoutMs. Defaults to 900 000 (15 min). + */ + acpTimeoutMs?: number; } /** Configuration for a local MCP server (spawned as a child process). */ @@ -266,6 +273,13 @@ export interface CreateSessionOptions { skipOsInstructions?: boolean; /** Additional instructions appended to the base OS instructions. */ additionalInstructions?: string; + /** + * ACP request inactivity timeout in milliseconds for this session. + * The timer resets whenever the agent sends any JSON-RPC message + * (responses, notifications, inbound requests). Overrides the + * VM-level AgentOsOptions.acpTimeoutMs. Defaults to 900 000 (15 min). + */ + acpTimeoutMs?: number; } export interface SessionInfo { @@ -290,7 +304,9 @@ export interface SpawnedProcessInfo { exitCode: number | null; } -function isOverlayMountConfig(config: MountConfig): config is OverlayMountConfig { +function isOverlayMountConfig( + config: MountConfig, +): config is OverlayMountConfig { return "filesystem" in config; } @@ -343,11 +359,11 @@ function isWasmBinaryFile(path: string): boolean { try { const header = readFileSync(path); return ( - header.length >= 4 - && header[0] === 0x00 - && header[1] === 0x61 - && header[2] === 0x73 - && header[3] === 0x6d + header.length >= 4 && + header[0] === 0x00 && + header[1] === 0x61 && + header[2] === 0x73 && + header[3] === 0x6d ); } catch { return false; @@ -392,7 +408,9 @@ function collectBootstrapWasmCommands(commandDirs: string[]): string[] { return commands; } -function collectConfiguredLowerPaths(config?: RootFilesystemConfig): Set { +function collectConfiguredLowerPaths( + config?: RootFilesystemConfig, +): Set { const paths = new Set(); for (const lower of config?.lowers ?? []) { @@ -453,7 +471,9 @@ function createKernelBootstrapLower( }); } - const uniqueCommands = [...new Set(commandNames)].sort((a, b) => a.localeCompare(b)); + const uniqueCommands = [...new Set(commandNames)].sort((a, b) => + a.localeCompare(b), + ); for (const command of uniqueCommands) { const stubPath = `/bin/${command}`; if (existingPaths.has(stubPath)) { @@ -496,22 +516,25 @@ async function createRootFilesystem( } const lowers = await Promise.all( - lowerInputs.map((lower) => rootStore.importSnapshot( - lower.kind === "bundled-base-filesystem" - ? createDefaultRootLowerInput() - : lower, - )), + lowerInputs.map((lower) => + rootStore.importSnapshot( + lower.kind === "bundled-base-filesystem" + ? createDefaultRootLowerInput() + : lower, + ), + ), ); - const rootView = normalizedConfig.mode === "read-only" - ? rootStore.createOverlayFilesystem({ - mode: "read-only", - lowers, - }) - : rootStore.createOverlayFilesystem({ - upper: await rootStore.createWritableLayer(), - lowers, - }); + const rootView = + normalizedConfig.mode === "read-only" + ? rootStore.createOverlayFilesystem({ + mode: "read-only", + lowers, + }) + : rootStore.createOverlayFilesystem({ + upper: await rootStore.createWritableLayer(), + lowers, + }); if (normalizedConfig.mode === "read-only") { return { @@ -540,32 +563,35 @@ async function resolveMounts( return []; } - return Promise.all(mounts.map(async (mount) => { - if (!isOverlayMountConfig(mount)) { + return Promise.all( + mounts.map(async (mount) => { + if (!isOverlayMountConfig(mount)) { + return { + path: mount.path, + fs: mount.driver, + readOnly: mount.readOnly, + }; + } + + const mode = mount.filesystem.mode ?? "ephemeral"; + const fs = + mode === "read-only" + ? mount.filesystem.store.createOverlayFilesystem({ + mode: "read-only", + lowers: mount.filesystem.lowers, + }) + : mount.filesystem.store.createOverlayFilesystem({ + upper: await mount.filesystem.store.createWritableLayer(), + lowers: mount.filesystem.lowers, + }); + return { path: mount.path, - fs: mount.driver, - readOnly: mount.readOnly, + fs, + readOnly: mode === "read-only", }; - } - - const mode = mount.filesystem.mode ?? "ephemeral"; - const fs = mode === "read-only" - ? mount.filesystem.store.createOverlayFilesystem({ - mode: "read-only", - lowers: mount.filesystem.lowers, - }) - : mount.filesystem.store.createOverlayFilesystem({ - upper: await mount.filesystem.store.createWritableLayer(), - lowers: mount.filesystem.lowers, - }); - - return { - path: mount.path, - fs, - readOnly: mode === "read-only", - }; - })); + }), + ); } export class AgentOs { @@ -600,6 +626,7 @@ export class AgentOs { private _hostMounts: HostMountInfo[]; private _acpTerminals = new Map(); private _acpTerminalCounter = 0; + private _acpTimeoutMs: number | undefined; private _env: Record; private _rootFilesystem: VirtualFileSystem; @@ -625,19 +652,13 @@ export class AgentOs { // Process software descriptors first so the root lower can include the // exact command stubs Secure Exec will register during boot. const processed = processSoftware(options?.software ?? []); - const bootstrapLower = createKernelBootstrapLower( - options?.rootFilesystem, - [ - ...collectBootstrapWasmCommands(processed.commandDirs), - ...NODE_RUNTIME_BOOTSTRAP_COMMANDS, - ...PYTHON_RUNTIME_BOOTSTRAP_COMMANDS, - ], - ); - const { - filesystem, - finishKernelBootstrap, - rootView, - } = await createRootFilesystem(options?.rootFilesystem, bootstrapLower); + const bootstrapLower = createKernelBootstrapLower(options?.rootFilesystem, [ + ...collectBootstrapWasmCommands(processed.commandDirs), + ...NODE_RUNTIME_BOOTSTRAP_COMMANDS, + ...PYTHON_RUNTIME_BOOTSTRAP_COMMANDS, + ]); + const { filesystem, finishKernelBootstrap, rootView } = + await createRootFilesystem(options?.rootFilesystem, bootstrapLower); const hostNetworkAdapter = createNodeHostNetworkAdapter(); const moduleAccessCwd = options?.moduleAccessCwd ?? process.cwd(); @@ -708,9 +729,9 @@ export class AgentOs { createWasmVmRuntime( processed.commandDirs.length > 0 ? { - commandDirs: processed.commandDirs, - permissions: processed.commandPermissions, - } + commandDirs: processed.commandDirs, + permissions: processed.commandPermissions, + } : undefined, ), ); @@ -719,9 +740,10 @@ export class AgentOs { bindings: createSqliteBindings(kernel), loopbackExemptPorts, moduleAccessCwd, - packageRoots: processed.softwareRoots.length > 0 - ? processed.softwareRoots - : undefined, + packageRoots: + processed.softwareRoots.length > 0 + ? processed.softwareRoots + : undefined, }), ); await kernel.mount(createPythonRuntime()); @@ -739,6 +761,7 @@ export class AgentOs { vm._toolsServer = toolsServer; vm._toolKits = toolKits ?? []; vm._shimFs = shimFs; + vm._acpTimeoutMs = options?.acpTimeoutMs; vm._cronManager = new CronManager( vm, options?.scheduleDriver ?? new TimerScheduleDriver(), @@ -853,10 +876,7 @@ export class AgentOs { } /** Subscribe to process exit. Returns an unsubscribe function. */ - onProcessExit( - pid: number, - handler: (exitCode: number) => void, - ): () => void { + onProcessExit(pid: number, handler: (exitCode: number) => void): () => void { const entry = this._processes.get(pid); if (!entry) throw new Error(`Process not found: ${pid}`); // If already exited, call immediately. @@ -935,10 +955,7 @@ export class AgentOs { try { this._assertSafeAbsolutePath(entry.path); // Create parent directories as needed - const parentDir = entry.path.substring( - 0, - entry.path.lastIndexOf("/"), - ); + const parentDir = entry.path.substring(0, entry.path.lastIndexOf("/")); if (parentDir) { await this._mkdirp(parentDir); } @@ -986,10 +1003,7 @@ export class AgentOs { } } - async mkdir( - path: string, - options?: { recursive?: boolean }, - ): Promise { + async mkdir(path: string, options?: { recursive?: boolean }): Promise { if (options?.recursive) { return this._mkdirp(path); } @@ -1024,8 +1038,7 @@ export class AgentOs { if (name === "." || name === "..") continue; if (exclude?.has(name)) continue; - const fullPath = - dirPath === "/" ? `/${name}` : `${dirPath}/${name}`; + const fullPath = dirPath === "/" ? `/${name}` : `${dirPath}/${name}`; const s = await this.kernel.stat(fullPath); if (s.isSymbolicLink) { @@ -1072,7 +1085,11 @@ export class AgentOs { ); } - mountFs(path: string, driver: VirtualFileSystem, options?: { readOnly?: boolean }): void { + mountFs( + path: string, + driver: VirtualFileSystem, + options?: { readOnly?: boolean }, + ): void { this._assertSafeAbsolutePath(path); this.kernel.mountFs(path, driver, { readOnly: options?.readOnly }); } @@ -1093,10 +1110,7 @@ export class AgentOs { await this.delete(from, { recursive: true }); } - async delete( - path: string, - options?: { recursive?: boolean }, - ): Promise { + async delete(path: string, options?: { recursive?: boolean }): Promise { this._assertSafeAbsolutePath(path); const s = await this.kernel.stat(path); if (s.isDirectory) { @@ -1188,10 +1202,7 @@ export class AgentOs { if (!relativePath) { return mount.hostPath; } - return join( - mount.hostPath, - ...relativePath.split("/").filter(Boolean), - ); + return join(mount.hostPath, ...relativePath.split("/").filter(Boolean)); } } return null; @@ -1202,9 +1213,7 @@ export class AgentOs { for (const mount of this._hostMounts) { if ( normalizedHostPath === mount.hostPath || - normalizedHostPath.startsWith( - `${mount.hostPath}${hostPathSeparator}`, - ) + normalizedHostPath.startsWith(`${mount.hostPath}${hostPathSeparator}`) ) { const relativePath = relativeHostPath( mount.hostPath, @@ -1243,7 +1252,9 @@ export class AgentOs { return; } - while (Buffer.byteLength(terminal.output, "utf8") > terminal.outputByteLimit) { + while ( + Buffer.byteLength(terminal.output, "utf8") > terminal.outputByteLimit + ) { terminal.output = terminal.output.slice(1); terminal.truncated = true; } @@ -1252,11 +1263,10 @@ export class AgentOs { private async _handleInboundAcpRequest( request: JsonRpcRequest, ): Promise<{ result?: unknown } | null> { - const params = ( + const params = request.params && typeof request.params === "object" ? (request.params as Record) - : {} - ); + : {}; switch (request.method) { case "fs/read_text_file": { @@ -1315,10 +1325,8 @@ export class AgentOs { (entry as { value: string }).value, ]; }) - .filter( - ( - entry, - ): entry is [string, string] => Array.isArray(entry), + .filter((entry): entry is [string, string] => + Array.isArray(entry), ), ) : undefined; @@ -1522,41 +1530,43 @@ export class AgentOs { ...Object.keys(AGENT_CONFIGS), ]); - return [...allIds].map((id) => { - const config = this._resolveAgentConfig(id); - if (!config) return null; - - let installed = false; - try { - // Check package roots first, then CWD-based node_modules. - const vmPrefix = `/root/node_modules/${config.acpAdapter}`; - let hostPkgJsonPath: string | null = null; - for (const root of this._softwareRoots) { - if (root.vmPath === vmPrefix) { - hostPkgJsonPath = join(root.hostPath, "package.json"); - break; + return [...allIds] + .map((id) => { + const config = this._resolveAgentConfig(id); + if (!config) return null; + + let installed = false; + try { + // Check package roots first, then CWD-based node_modules. + const vmPrefix = `/root/node_modules/${config.acpAdapter}`; + let hostPkgJsonPath: string | null = null; + for (const root of this._softwareRoots) { + if (root.vmPath === vmPrefix) { + hostPkgJsonPath = join(root.hostPath, "package.json"); + break; + } } + if (!hostPkgJsonPath) { + hostPkgJsonPath = join( + this._moduleAccessCwd, + "node_modules", + config.acpAdapter, + "package.json", + ); + } + readFileSync(hostPkgJsonPath); + installed = true; + } catch { + // Package not installed } - if (!hostPkgJsonPath) { - hostPkgJsonPath = join( - this._moduleAccessCwd, - "node_modules", - config.acpAdapter, - "package.json", - ); - } - readFileSync(hostPkgJsonPath); - installed = true; - } catch { - // Package not installed - } - return { - id: id as AgentType, - acpAdapter: config.acpAdapter, - agentPackage: config.agentPackage, - installed, - }; - }).filter((entry): entry is AgentRegistryEntry => entry !== null); + return { + id: id as AgentType, + acpAdapter: config.acpAdapter, + agentPackage: config.agentPackage, + installed, + }; + }) + .filter((entry): entry is AgentRegistryEntry => entry !== null); } private _deriveSessionConfigOptions( @@ -1664,8 +1674,8 @@ export class AgentOs { // Create stdout line iterable wired via onStdout callback const { iterable, onStdout } = createStdoutLineIterable(); const launchArgs = [...(config.launchArgs ?? []), ...extraArgs]; - let launchEnv = { ...config.defaultEnv, ...extraEnv, ...options?.env }; - let sessionCwd = options?.cwd ?? "/home/user"; + const launchEnv = { ...config.defaultEnv, ...extraEnv, ...options?.env }; + const sessionCwd = options?.cwd ?? "/home/user"; const binPath = this._resolveAdapterBin(config.acpAdapter); const pid = this.spawn("node", [binPath, ...launchArgs], { streamStdin: true, @@ -1675,7 +1685,9 @@ export class AgentOs { }).pid; const proc = this._processes.get(pid)!.proc; + const acpTimeout = options?.acpTimeoutMs ?? this._acpTimeoutMs; const client = new AcpClient(proc, iterable, { + ...(acpTimeout !== undefined && { timeoutMs: acpTimeout }), requestHandler: (request) => this._handleInboundAcpRequest(request), }); @@ -1759,24 +1771,18 @@ export class AgentOs { ]; } - const session = new Session( - client, - sessionId, - agentType, - initData, - () => { - for (const [terminalId, terminal] of this._acpTerminals) { - if (terminal.sessionId !== sessionId) { - continue; - } - if (this.getProcess(terminal.pid).exitCode === null) { - this.killProcess(terminal.pid); - } - this._acpTerminals.delete(terminalId); + const session = new Session(client, sessionId, agentType, initData, () => { + for (const [terminalId, terminal] of this._acpTerminals) { + if (terminal.sessionId !== sessionId) { + continue; } - this._sessions.delete(sessionId); - }, - ); + if (this.getProcess(terminal.pid).exitCode === null) { + this.killProcess(terminal.pid); + } + this._acpTerminals.delete(terminalId); + } + this._sessions.delete(sessionId); + }); this._sessions.set(sessionId, session); return { sessionId }; @@ -1817,9 +1823,7 @@ export class AgentOs { } if (!binEntry) { - throw new Error( - `No bin entry found in ${adapterPackage}/package.json`, - ); + throw new Error(`No bin entry found in ${adapterPackage}/package.json`); } return `${vmPrefix}/${binEntry}`; @@ -1870,10 +1874,7 @@ export class AgentOs { /** Send a prompt to the agent and wait for the final response. * Returns the raw JSON-RPC response and the accumulated agent text. */ - async prompt( - sessionId: string, - text: string, - ): Promise { + async prompt(sessionId: string, text: string): Promise { const session = this._requireSession(sessionId); // Collect streamed text while the prompt is running @@ -1981,10 +1982,7 @@ export class AgentOs { } /** Subscribe to session/update notifications for a session. Returns an unsubscribe function. */ - onSessionEvent( - sessionId: string, - handler: SessionEventHandler, - ): () => void { + onSessionEvent(sessionId: string, handler: SessionEventHandler): () => void { const session = this._requireSession(sessionId); session.onSessionEvent(handler); return () => { diff --git a/packages/core/tests/acp-protocol.test.ts b/packages/core/tests/acp-protocol.test.ts index aba11c25c..d048134e5 100644 --- a/packages/core/tests/acp-protocol.test.ts +++ b/packages/core/tests/acp-protocol.test.ts @@ -832,9 +832,7 @@ describe("ACP protocol comprehensive tests", () => { // VM stdout can deliver lines twice (known duplication); check >=2 and verify content expect(notifications.length).toBeGreaterThanOrEqual(2); - const updates = notifications.filter( - (n) => n.method === "session/update", - ); + const updates = notifications.filter((n) => n.method === "session/update"); expect(updates.length).toBeGreaterThanOrEqual(2); const types = updates.map((n) => (n.params as { type: string }).type); expect(types).toContain("status"); @@ -890,7 +888,11 @@ describe("ACP protocol comprehensive tests", () => { expect(response.error).toBeUndefined(); expect( - response.result as { cancelled: boolean; requested: boolean; via: string }, + response.result as { + cancelled: boolean; + requested: boolean; + via: string; + }, ).toMatchObject({ cancelled: false, requested: true, @@ -899,7 +901,9 @@ describe("ACP protocol comprehensive tests", () => { const countResponse = await client.request("custom/cancel-count"); expect(countResponse.error).toBeUndefined(); - expect((countResponse.result as { cancelCount: number }).cancelCount).toBe(1); + expect((countResponse.result as { cancelCount: number }).cancelCount).toBe( + 1, + ); client.close(); }, 30_000); @@ -1051,7 +1055,8 @@ describe("ACP protocol comprehensive tests", () => { expect(permResponse.error).toBeUndefined(); expect( - (permResponse.result as { outcome: { optionId: string } }).outcome.optionId, + (permResponse.result as { outcome: { optionId: string } }).outcome + .optionId, ).toBe("allow_always"); const promptResponse = await promptPromise; @@ -1068,7 +1073,10 @@ describe("ACP protocol comprehensive tests", () => { }, 30_000); test("duplicate session/request_permission requests are deduped by request ID", async () => { - const { client } = await spawnAdapter(vm, DUPLICATE_MODERN_PERMISSION_MOCK_ADAPTER); + const { client } = await spawnAdapter( + vm, + DUPLICATE_MODERN_PERMISSION_MOCK_ADAPTER, + ); await client.request("initialize", { protocolVersion: 1, @@ -1111,7 +1119,8 @@ describe("ACP protocol comprehensive tests", () => { expect(permResponse.error).toBeUndefined(); expect( - (permResponse.result as { outcome: { optionId: string } }).outcome.optionId, + (permResponse.result as { outcome: { optionId: string } }).outcome + .optionId, ).toBe("allow_once"); const promptResponse = await promptPromise; @@ -1167,7 +1176,8 @@ describe("ACP protocol comprehensive tests", () => { expect(permResponse.error).toBeUndefined(); expect( - (permResponse.result as { outcome: { optionId: string } }).outcome.optionId, + (permResponse.result as { outcome: { optionId: string } }).outcome + .optionId, ).toBe("once"); const promptResponse = await promptPromise; @@ -1341,6 +1351,145 @@ process.stdin.on('data', () => { client.close(); }, 30_000); + test("activity-aware timeout resets on inbound notifications", async () => { + // Mock adapter that sends periodic notifications for ~1200ms total, + // then responds. With a 2000ms inactivity timeout, this should NOT + // time out because each 300ms gap is well within the timeout window. + // Without activity-aware reset, the total 1200ms would exceed a + // hypothetical 500ms fixed timeout — but we use 2000ms for CI margin. + const periodicNotificationScript = ` +let buffer = ''; +process.stdin.resume(); +process.stdin.on('data', (chunk) => { + const str = chunk instanceof Uint8Array ? new TextDecoder().decode(chunk) : String(chunk); + buffer += str; + while (true) { + const idx = buffer.indexOf('\\n'); + if (idx === -1) break; + const line = buffer.substring(0, idx); + buffer = buffer.substring(idx + 1); + if (!line.trim()) continue; + try { + const msg = JSON.parse(line); + if (msg.method === 'initialize') { + process.stdout.write(JSON.stringify({ + jsonrpc: '2.0', id: msg.id, + result: { protocolVersion: 1, agentInfo: { name: 'periodic-agent' } }, + }) + '\\n'); + } else if (msg.method === 'session/prompt') { + // Send 4 notifications at 300ms intervals (1200ms total), + // then respond. Total > 2000ms is not the point — the point is + // each gap (300ms) is within the timeout window (2000ms). + let count = 0; + const iv = setInterval(() => { + count++; + process.stdout.write(JSON.stringify({ + jsonrpc: '2.0', + method: 'session/update', + params: { update: { sessionUpdate: 'agent_message_chunk', content: { text: 'chunk' + count } } }, + }) + '\\n'); + if (count >= 4) { + clearInterval(iv); + process.stdout.write(JSON.stringify({ + jsonrpc: '2.0', id: msg.id, + result: { text: 'done' }, + }) + '\\n'); + } + }, 300); + } + } catch (e) {} + } +}); +`; + + const { client } = await spawnAdapterWithTimeout( + vm, + periodicNotificationScript, + 2000, + "/tmp/periodic-adapter.mjs", + ); + + await client.request("initialize", { protocolVersion: 1 }); + + // This takes ~1200ms total but each gap is only ~300ms. + // With a 2000ms activity-aware timeout, it should succeed. + const response = await client.request("session/prompt", { + sessionId: "test", + prompt: [{ type: "text", text: "go" }], + }); + + expect(response.error).toBeUndefined(); + expect((response.result as { text: string }).text).toBe("done"); + + client.close(); + }, 30_000); + + test("activity-aware timeout still fires after activity then silence", async () => { + // Mock adapter that sends a notification after 400ms, then goes silent. + // With a 500ms inactivity timeout and the notification at T+400ms: + // - Without reset: timeout fires at T+500ms (~500ms total) + // - With reset: timeout fires at T+400ms+500ms (~900ms total) + // We assert elapsed >= 750ms to prove the timer was actually reset. + const activityThenSilenceScript = ` +let buffer = ''; +process.stdin.resume(); +process.stdin.on('data', (chunk) => { + const str = chunk instanceof Uint8Array ? new TextDecoder().decode(chunk) : String(chunk); + buffer += str; + while (true) { + const idx = buffer.indexOf('\\n'); + if (idx === -1) break; + const line = buffer.substring(0, idx); + buffer = buffer.substring(idx + 1); + if (!line.trim()) continue; + try { + const msg = JSON.parse(line); + if (msg.method === 'initialize') { + process.stdout.write(JSON.stringify({ + jsonrpc: '2.0', id: msg.id, + result: { protocolVersion: 1, agentInfo: { name: 'silence-agent' } }, + }) + '\\n'); + } else if (msg.method === 'session/prompt') { + // Send one notification after 400ms, then go silent — never respond. + // The 400ms delay ensures the timer reset is observable via wall-clock. + setTimeout(() => { + process.stdout.write(JSON.stringify({ + jsonrpc: '2.0', + method: 'session/update', + params: { update: { sessionUpdate: 'agent_message_chunk', content: { text: 'thinking...' } } }, + }) + '\\n'); + }, 400); + } + } catch (e) {} + } +}); +`; + + const { client } = await spawnAdapterWithTimeout( + vm, + activityThenSilenceScript, + 500, + "/tmp/silence-adapter.mjs", + ); + + await client.request("initialize", { protocolVersion: 1 }); + + const start = Date.now(); + await expect( + client.request("session/prompt", { + sessionId: "test", + prompt: [{ type: "text", text: "go" }], + }), + ).rejects.toThrow(/timed out after 500ms/); + const elapsed = Date.now() - start; + + // Without reset: timeout at ~500ms. With reset: timeout at ~900ms. + // Assert >= 750ms to prove the timer was actually reset by the notification. + expect(elapsed).toBeGreaterThan(750); + + client.close(); + }, 30_000); + test("concurrent requests are correlated correctly by id", async () => { const { client } = await spawnAdapter(vm, FULL_MOCK_ACP_ADAPTER); @@ -1369,17 +1518,13 @@ process.stdin.on('data', () => { // Each response should have the correct result for its method expect(cancelRes.error).toBeUndefined(); - expect((cancelRes.result as { cancelled: boolean }).cancelled).toBe( - true, - ); + expect((cancelRes.result as { cancelled: boolean }).cancelled).toBe(true); expect(modeRes.error).toBeUndefined(); expect((modeRes.result as { modeId: string }).modeId).toBe("plan"); expect(configRes.error).toBeUndefined(); - expect((configRes.result as { configId: string }).configId).toBe( - "model", - ); + expect((configRes.result as { configId: string }).configId).toBe("model"); expect((configRes.result as { value: string }).value).toBe("opus"); client.close(); @@ -1442,10 +1587,7 @@ process.stdin.on('data', () => { for (const n of notifications) { expect(n.method).toBe("session/update"); const seq = (n.params as { seq: number }).seq; - if ( - seenSeqs.length === 0 || - seenSeqs[seenSeqs.length - 1] !== seq - ) { + if (seenSeqs.length === 0 || seenSeqs[seenSeqs.length - 1] !== seq) { seenSeqs.push(seq); } }