Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions packages/core/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,8 @@ await vm.dispose();
### Exported Types

**VM & Options**
- `AgentOsOptions` — VM creation options (commandDirs, loopbackExemptPorts, moduleAccessCwd, mounts, additionalInstructions)
- `CreateSessionOptions` — Session options (cwd, env, mcpServers, skipOsInstructions, additionalInstructions)
- `AgentOsOptions` — VM creation options (software, loopbackExemptPorts, moduleAccessCwd, rootFilesystem, mounts, additionalInstructions, scheduleDriver, toolKits, permissions, acpTimeoutMs)
- `CreateSessionOptions` — Session options (cwd, env, mcpServers, skipOsInstructions, additionalInstructions, acpTimeoutMs)

**Mount Configurations**
- `MountConfig` — Union of all mount types
Expand Down
63 changes: 46 additions & 17 deletions packages/core/src/acp-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@ import {
isRequest,
isResponse,
type JsonRpcError,
type JsonRpcRequest,
type JsonRpcNotification,
type JsonRpcRequest,
type JsonRpcResponse,
serializeMessage,
} from "./protocol.js";

const DEFAULT_TIMEOUT_MS = 120_000;
const DEFAULT_TIMEOUT_MS = 900_000; // 15 minutes
const EXIT_DRAIN_GRACE_MS = 50;
const LEGACY_PERMISSION_METHOD = "request/permission";
const ACP_PERMISSION_METHOD = "session/request_permission";
Expand All @@ -22,6 +22,7 @@ interface PendingRequest {
resolve: (response: JsonRpcResponse) => void;
reject: (error: Error) => void;
timer: ReturnType<typeof setTimeout>;
method: string;
}

interface PendingPermissionRequest {
Expand All @@ -31,9 +32,7 @@ interface PendingPermissionRequest {
}

export type NotificationHandler = (notification: JsonRpcNotification) => void;
export type InboundRequestHandler = (
request: JsonRpcRequest,
) =>
export type InboundRequestHandler = (request: JsonRpcRequest) =>
| Promise<
| {
result?: unknown;
Expand All @@ -52,14 +51,17 @@ export type InboundRequestHandler = (
export class AcpClient {
private _process: ManagedProcess;
private _nextId = 1;
private _pending = new Map<number | string | null, PendingRequest>();
private _pending = new Map<number, PendingRequest>();
private _seenInboundRequestIds = new Set<number | string | null>();
private _notificationHandlers: NotificationHandler[] = [];
private _closed = false;
private _timeoutMs: number;
private _stdoutIterator: AsyncIterator<string> | null = null;
private _readerClosed = false;
private _pendingPermissionRequests = new Map<string, PendingPermissionRequest>();
private _pendingPermissionRequests = new Map<
string,
PendingPermissionRequest
>();
private _requestHandler?: InboundRequestHandler;
private _recentActivity: string[] = [];

Expand All @@ -72,7 +74,13 @@ export class AcpClient {
},
) {
this._process = process;
this._timeoutMs = options?.timeoutMs ?? DEFAULT_TIMEOUT_MS;
const timeoutMs = options?.timeoutMs ?? DEFAULT_TIMEOUT_MS;
if (timeoutMs <= 0) {
throw new Error(
`ACP timeoutMs must be a positive number, got ${timeoutMs}`,
);
}
this._timeoutMs = timeoutMs;
this._requestHandler = options?.requestHandler;
this._startReading(stdoutLines);
this._watchExit();
Expand Down Expand Up @@ -102,7 +110,7 @@ export class AcpClient {
reject(this._createTimeoutError(method, id));
}, this._timeoutMs);

this._pending.set(id, { resolve, reject, timer });
this._pending.set(id, { resolve, reject, timer, method });
});

if (method !== ACP_CANCEL_METHOD) {
Expand Down Expand Up @@ -167,9 +175,11 @@ export class AcpClient {
this._recordActivity(summarizeInboundMessage(msg));

if (isResponse(msg)) {
const pending = this._pending.get(msg.id);
if (pending) {
this._pending.delete(msg.id);
const id = typeof msg.id === "number" ? msg.id : undefined;
const pending =
id !== undefined ? this._pending.get(id) : undefined;
if (id !== undefined && pending) {
this._pending.delete(id);
clearTimeout(pending.timer);
pending.resolve(msg);
}
Expand All @@ -180,6 +190,10 @@ export class AcpClient {
handler(msg);
}
}

// Reset inactivity timers for remaining pending requests
// whenever any valid JSON-RPC message arrives from the agent.
this._resetPendingTimers();
}
} catch {
// Stream ended or errored
Expand Down Expand Up @@ -244,11 +258,10 @@ export class AcpClient {
id: msg.id,
method: msg.method,
options: Array.isArray(requestParams.options)
? requestParams.options
.filter(
(option): option is Record<string, unknown> =>
option !== null && typeof option === "object",
)
? requestParams.options.filter(
(option): option is Record<string, unknown> =>
option !== null && typeof option === "object",
)
: undefined,
});
const params = {
Expand Down Expand Up @@ -354,6 +367,22 @@ export class AcpClient {
}
}

/**
* Reset all pending request inactivity timers. Called after any
* valid inbound JSON-RPC message to implement activity-aware timeout.
* Replaces each timer with a fresh one starting from now.
*/
private _resetPendingTimers(): void {
if (this._closed) return;
for (const [id, pending] of this._pending) {
clearTimeout(pending.timer);
pending.timer = setTimeout(() => {
this._pending.delete(id);
pending.reject(this._createTimeoutError(pending.method, id));
}, this._timeoutMs);
}
}

private _createTimeoutError(method: string, id: number | string): Error {
const processState = `process exitCode=${String(
this._getProcessExitCode(),
Expand Down
Loading
Loading