diff --git a/src/browser.ts b/src/browser.ts index 7e46a18f..ededf4e2 100644 --- a/src/browser.ts +++ b/src/browser.ts @@ -28,6 +28,22 @@ import { ENCRYPTION_KEY_ENV, } from './state-utils.js'; +/** + * Returns the default Playwright timeout for standard operations. + * Can be overridden via the AGENT_BROWSER_DEFAULT_TIMEOUT environment variable. + * CDP and recording contexts use a shorter fixed timeout (10s) and are not affected. + */ +function getDefaultTimeout(): number { + const envValue = process.env.AGENT_BROWSER_DEFAULT_TIMEOUT; + if (envValue) { + const parsed = parseInt(envValue, 10); + if (!isNaN(parsed) && parsed > 0) { + return parsed; + } + } + return 60000; +} + // Screencast frame data from CDP export interface ScreencastFrame { data: string; // base64 encoded image @@ -264,7 +280,7 @@ export class BrowserManager { context = await this.browser.newContext({ ...(this.colorScheme && { colorScheme: this.colorScheme }), }); - context.setDefaultTimeout(60000); + context.setDefaultTimeout(getDefaultTimeout()); this.contexts.push(context); this.setupContextTracking(context); } else { @@ -1018,7 +1034,7 @@ export class BrowserManager { this.kernelSessionId = session.session_id; this.kernelApiKey = kernelApiKey; this.browser = browser; - context.setDefaultTimeout(60000); + context.setDefaultTimeout(getDefaultTimeout()); this.contexts.push(context); this.pages.push(page); this.activePageIndex = 0; @@ -1091,7 +1107,7 @@ export class BrowserManager { this.browserUseSessionId = session.id; this.browserUseApiKey = browserUseApiKey; this.browser = browser; - context.setDefaultTimeout(60000); + context.setDefaultTimeout(getDefaultTimeout()); this.contexts.push(context); this.pages.push(page); this.activePageIndex = 0; @@ -1346,7 +1362,7 @@ export class BrowserManager { }); } - context.setDefaultTimeout(60000); + context.setDefaultTimeout(getDefaultTimeout()); this.contexts.push(context); this.setupContextTracking(context); @@ -1666,7 +1682,7 @@ export class BrowserManager { viewport: viewport === undefined ? { width: 1280, height: 720 } : viewport, ...(this.colorScheme && { colorScheme: this.colorScheme }), }); - context.setDefaultTimeout(60000); + context.setDefaultTimeout(getDefaultTimeout()); this.contexts.push(context); this.setupContextTracking(context); diff --git a/src/daemon.ts b/src/daemon.ts index 8fe416ec..bfd19f4d 100644 --- a/src/daemon.ts +++ b/src/daemon.ts @@ -21,6 +21,47 @@ import { // Manager type - either desktop browser or iOS type Manager = BrowserManager | IOSManager; +/** + * Backpressure-aware socket write. + * If the kernel buffer is full (socket.write returns false), + * waits for the 'drain' event before resolving. + */ +function safeWrite(socket: net.Socket, payload: string): Promise { + return new Promise((resolve, reject) => { + if (socket.destroyed) { + resolve(); + return; + } + const canContinue = socket.write(payload); + if (canContinue) { + resolve(); + } else if (socket.destroyed) { + resolve(); + } else { + const cleanup = () => { + socket.removeListener('drain', onDrain); + socket.removeListener('error', onError); + socket.removeListener('close', onClose); + }; + const onDrain = () => { + cleanup(); + resolve(); + }; + const onError = (err: Error) => { + cleanup(); + reject(err); + }; + const onClose = () => { + cleanup(); + resolve(); + }; + socket.once('drain', onDrain); + socket.once('error', onError); + socket.once('close', onClose); + } + }); +} + // Platform detection const isWindows = process.platform === 'win32'; @@ -321,35 +362,25 @@ export async function startDaemon(options?: { let buffer = ''; let httpChecked = false; - socket.on('data', async (data) => { - buffer += data.toString(); - - // Security: Detect and reject HTTP requests to prevent cross-origin attacks. - // Browsers using fetch() must send HTTP headers (e.g., "POST / HTTP/1.1"), - // while legitimate clients send raw JSON starting with "{". - if (!httpChecked) { - httpChecked = true; - const trimmed = buffer.trimStart(); - if (/^(GET|POST|PUT|DELETE|HEAD|OPTIONS|PATCH|CONNECT|TRACE)\s/i.test(trimmed)) { - socket.destroy(); - return; - } - } + // Command serialization: queue incoming lines and process them one at a time. + // This prevents concurrent command execution which can cause socket.write + // buffer contention and EAGAIN errors on the Rust CLI side. + const commandQueue: string[] = []; + let processing = false; - // Process complete lines - while (buffer.includes('\n')) { - const newlineIdx = buffer.indexOf('\n'); - const line = buffer.substring(0, newlineIdx); - buffer = buffer.substring(newlineIdx + 1); + async function processQueue(): Promise { + if (processing) return; + processing = true; - if (!line.trim()) continue; + while (commandQueue.length > 0) { + const line = commandQueue.shift()!; try { const parseResult = parseCommand(line); if (!parseResult.success) { const resp = errorResponse(parseResult.id ?? 'unknown', parseResult.error); - socket.write(serializeResponse(resp) + '\n'); + await safeWrite(socket, serializeResponse(resp) + '\n'); continue; } @@ -363,10 +394,11 @@ export async function startDaemon(options?: { success: true as const, data: { devices }, }; - socket.write(serializeResponse(response) + '\n'); + await safeWrite(socket, serializeResponse(response) + '\n'); } catch (err) { const message = err instanceof Error ? err.message : String(err); - socket.write( + await safeWrite( + socket, serializeResponse(errorResponse(parseResult.command.id, message)) + '\n' ); } @@ -493,7 +525,7 @@ export async function startDaemon(options?: { isIOS && manager instanceof IOSManager ? await executeIOSCommand(parseResult.command, manager) : await executeCommand(parseResult.command, manager as BrowserManager); - socket.write(serializeResponse(response) + '\n'); + await safeWrite(socket, serializeResponse(response) + '\n'); if (!shuttingDown) { shuttingDown = true; @@ -503,6 +535,9 @@ export async function startDaemon(options?: { process.exit(0); }, 100); } + + commandQueue.length = 0; + processing = false; return; } @@ -520,12 +555,46 @@ export async function startDaemon(options?: { } } - socket.write(serializeResponse(response) + '\n'); + await safeWrite(socket, serializeResponse(response) + '\n'); } catch (err) { const message = err instanceof Error ? err.message : String(err); - socket.write(serializeResponse(errorResponse('error', message)) + '\n'); + await safeWrite(socket, serializeResponse(errorResponse('error', message)) + '\n').catch( + () => {} + ); // Socket may already be destroyed + } + } + + processing = false; + } + + socket.on('data', (data) => { + buffer += data.toString(); + + // Security: Detect and reject HTTP requests to prevent cross-origin attacks. + // Browsers using fetch() must send HTTP headers (e.g., "POST / HTTP/1.1"), + // while legitimate clients send raw JSON starting with "{". + if (!httpChecked) { + httpChecked = true; + const trimmed = buffer.trimStart(); + if (/^(GET|POST|PUT|DELETE|HEAD|OPTIONS|PATCH|CONNECT|TRACE)\s/i.test(trimmed)) { + socket.destroy(); + return; } } + + // Extract complete lines and enqueue them for serial processing + while (buffer.includes('\n')) { + const newlineIdx = buffer.indexOf('\n'); + const line = buffer.substring(0, newlineIdx); + buffer = buffer.substring(newlineIdx + 1); + + if (!line.trim()) continue; + commandQueue.push(line); + } + + processQueue().catch(() => { + // Socket write failures during queue processing are non-fatal + }); }); socket.on('error', () => {