Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 21 additions & 5 deletions src/browser.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,22 @@ import {
ENCRYPTION_KEY_ENV,
} from './state-utils.js';

/**
* Returns the default Playwright timeout for standard operations.
* Can be overridden via the AGENT_BROWSER_DEFAULT_TIMEOUT environment variable.
* CDP and recording contexts use a shorter fixed timeout (10s) and are not affected.
*/
function getDefaultTimeout(): number {
const envValue = process.env.AGENT_BROWSER_DEFAULT_TIMEOUT;
if (envValue) {
const parsed = parseInt(envValue, 10);
if (!isNaN(parsed) && parsed > 0) {
return parsed;
}
}
return 60000;
}

// Screencast frame data from CDP
export interface ScreencastFrame {
data: string; // base64 encoded image
Expand Down Expand Up @@ -264,7 +280,7 @@ export class BrowserManager {
context = await this.browser.newContext({
...(this.colorScheme && { colorScheme: this.colorScheme }),
});
context.setDefaultTimeout(60000);
context.setDefaultTimeout(getDefaultTimeout());
this.contexts.push(context);
this.setupContextTracking(context);
} else {
Expand Down Expand Up @@ -1018,7 +1034,7 @@ export class BrowserManager {
this.kernelSessionId = session.session_id;
this.kernelApiKey = kernelApiKey;
this.browser = browser;
context.setDefaultTimeout(60000);
context.setDefaultTimeout(getDefaultTimeout());
this.contexts.push(context);
this.pages.push(page);
this.activePageIndex = 0;
Expand Down Expand Up @@ -1091,7 +1107,7 @@ export class BrowserManager {
this.browserUseSessionId = session.id;
this.browserUseApiKey = browserUseApiKey;
this.browser = browser;
context.setDefaultTimeout(60000);
context.setDefaultTimeout(getDefaultTimeout());
this.contexts.push(context);
this.pages.push(page);
this.activePageIndex = 0;
Expand Down Expand Up @@ -1346,7 +1362,7 @@ export class BrowserManager {
});
}

context.setDefaultTimeout(60000);
context.setDefaultTimeout(getDefaultTimeout());
this.contexts.push(context);
this.setupContextTracking(context);

Expand Down Expand Up @@ -1666,7 +1682,7 @@ export class BrowserManager {
viewport: viewport === undefined ? { width: 1280, height: 720 } : viewport,
...(this.colorScheme && { colorScheme: this.colorScheme }),
});
context.setDefaultTimeout(60000);
context.setDefaultTimeout(getDefaultTimeout());
this.contexts.push(context);
this.setupContextTracking(context);

Expand Down
121 changes: 95 additions & 26 deletions src/daemon.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,47 @@ import {
// Manager type - either desktop browser or iOS
type Manager = BrowserManager | IOSManager;

/**
* Backpressure-aware socket write.
* If the kernel buffer is full (socket.write returns false),
* waits for the 'drain' event before resolving.
*/
function safeWrite(socket: net.Socket, payload: string): Promise<void> {
return new Promise((resolve, reject) => {
if (socket.destroyed) {
resolve();
return;
}
const canContinue = socket.write(payload);
if (canContinue) {
resolve();
} else if (socket.destroyed) {
resolve();
} else {
const cleanup = () => {
socket.removeListener('drain', onDrain);
socket.removeListener('error', onError);
socket.removeListener('close', onClose);
};
const onDrain = () => {
cleanup();
resolve();
};
const onError = (err: Error) => {
cleanup();
reject(err);
};
const onClose = () => {
cleanup();
resolve();
};
socket.once('drain', onDrain);
socket.once('error', onError);
socket.once('close', onClose);
}
});
}

// Platform detection
const isWindows = process.platform === 'win32';

Expand Down Expand Up @@ -321,35 +362,25 @@ export async function startDaemon(options?: {
let buffer = '';
let httpChecked = false;

socket.on('data', async (data) => {
buffer += data.toString();

// Security: Detect and reject HTTP requests to prevent cross-origin attacks.
// Browsers using fetch() must send HTTP headers (e.g., "POST / HTTP/1.1"),
// while legitimate clients send raw JSON starting with "{".
if (!httpChecked) {
httpChecked = true;
const trimmed = buffer.trimStart();
if (/^(GET|POST|PUT|DELETE|HEAD|OPTIONS|PATCH|CONNECT|TRACE)\s/i.test(trimmed)) {
socket.destroy();
return;
}
}
// Command serialization: queue incoming lines and process them one at a time.
// This prevents concurrent command execution which can cause socket.write
// buffer contention and EAGAIN errors on the Rust CLI side.
const commandQueue: string[] = [];
let processing = false;

// Process complete lines
while (buffer.includes('\n')) {
const newlineIdx = buffer.indexOf('\n');
const line = buffer.substring(0, newlineIdx);
buffer = buffer.substring(newlineIdx + 1);
async function processQueue(): Promise<void> {
if (processing) return;
processing = true;

if (!line.trim()) continue;
while (commandQueue.length > 0) {
const line = commandQueue.shift()!;

try {
const parseResult = parseCommand(line);

if (!parseResult.success) {
const resp = errorResponse(parseResult.id ?? 'unknown', parseResult.error);
socket.write(serializeResponse(resp) + '\n');
await safeWrite(socket, serializeResponse(resp) + '\n');
continue;
}

Expand All @@ -363,10 +394,11 @@ export async function startDaemon(options?: {
success: true as const,
data: { devices },
};
socket.write(serializeResponse(response) + '\n');
await safeWrite(socket, serializeResponse(response) + '\n');
} catch (err) {
const message = err instanceof Error ? err.message : String(err);
socket.write(
await safeWrite(
socket,
serializeResponse(errorResponse(parseResult.command.id, message)) + '\n'
);
}
Expand Down Expand Up @@ -493,7 +525,7 @@ export async function startDaemon(options?: {
isIOS && manager instanceof IOSManager
? await executeIOSCommand(parseResult.command, manager)
: await executeCommand(parseResult.command, manager as BrowserManager);
socket.write(serializeResponse(response) + '\n');
await safeWrite(socket, serializeResponse(response) + '\n');

if (!shuttingDown) {
shuttingDown = true;
Expand All @@ -503,6 +535,9 @@ export async function startDaemon(options?: {
process.exit(0);
}, 100);
}

commandQueue.length = 0;
processing = false;
return;
}

Expand All @@ -520,12 +555,46 @@ export async function startDaemon(options?: {
}
}

socket.write(serializeResponse(response) + '\n');
await safeWrite(socket, serializeResponse(response) + '\n');
} catch (err) {
const message = err instanceof Error ? err.message : String(err);
socket.write(serializeResponse(errorResponse('error', message)) + '\n');
await safeWrite(socket, serializeResponse(errorResponse('error', message)) + '\n').catch(
() => {}
); // Socket may already be destroyed
}
}

processing = false;
}

socket.on('data', (data) => {
buffer += data.toString();

// Security: Detect and reject HTTP requests to prevent cross-origin attacks.
// Browsers using fetch() must send HTTP headers (e.g., "POST / HTTP/1.1"),
// while legitimate clients send raw JSON starting with "{".
if (!httpChecked) {
httpChecked = true;
const trimmed = buffer.trimStart();
if (/^(GET|POST|PUT|DELETE|HEAD|OPTIONS|PATCH|CONNECT|TRACE)\s/i.test(trimmed)) {
socket.destroy();
return;
}
}

// Extract complete lines and enqueue them for serial processing
while (buffer.includes('\n')) {
const newlineIdx = buffer.indexOf('\n');
const line = buffer.substring(0, newlineIdx);
buffer = buffer.substring(newlineIdx + 1);

if (!line.trim()) continue;
commandQueue.push(line);
}

processQueue().catch(() => {
// Socket write failures during queue processing are non-fatal
});
});

socket.on('error', () => {
Expand Down