diff --git a/tempo/agents/browser.py b/tempo/agents/browser.py index 550599d..2a4a29b 100644 --- a/tempo/agents/browser.py +++ b/tempo/agents/browser.py @@ -18,6 +18,7 @@ from tempo.tools.registry import ToolRegistry from tempo.prompts.builder import PromptBuilder from tempo.infra.protocols import FindingsUpdater + from tempo.tools.network_capture import NetworkRequestCapture @register_agent @@ -108,6 +109,7 @@ def create_tools( def create_browser_use_agent( self, container: "ContainerManager | None" = None, + network_capture: "NetworkRequestCapture | None" = None, ): """ Create a browser-use Agent with custom tempo-sec tools. @@ -117,9 +119,10 @@ def create_browser_use_agent( Args: container: Container manager for shell/file operations + network_capture: Optional NetworkRequestCapture for network request visibility Returns: - Tuple of (browser-use Agent instance, Browser instance, Tools instance) + Tuple of (browser-use Agent instance, Browser instance, Tools instance, NetworkRequestCapture or None) """ from browser_use import Agent, Browser from browser_use.tools.service import Tools @@ -148,6 +151,7 @@ def create_browser_use_agent( imap_config=self.imap_config, agent_email=self.agent_email, tools_instance=shared_tools, # Pass shared instance + network_capture=network_capture, # Pass network capture for tool registration ) # Select LLM based on configuration diff --git a/tempo/prompts/templates/browser_base.txt b/tempo/prompts/templates/browser_base.txt index d61d9c9..585e105 100644 --- a/tempo/prompts/templates/browser_base.txt +++ b/tempo/prompts/templates/browser_base.txt @@ -112,6 +112,31 @@ Review prior work to avoid duplication and build on existing knowledge. 
- Use `file_write` to create test scripts, payloads, or save extracted data - Use findings tools to document discoveries for the security assessment - Use `read_findings` to check what's already been tested +- Use `get_network_requests` to inspect API calls made by the page (see below) + +## Network Request Analysis + +Use `get_network_requests` to inspect XHR/fetch requests made by the web application: + +**When to use:** +- Analyzing API endpoints and request/response patterns +- Capturing authentication tokens (bearer tokens, API keys, session cookies) +- Understanding client-side API interactions on modern SPAs +- Finding hidden API endpoints not visible in the page source +- Bypassing WAF restrictions by replaying legitimate browser requests + +**Parameters:** +- `filter_type`: Filter by "xhr", "fetch", or "all" (default: both xhr and fetch) +- `include_bodies`: Set to `true` to include request/response bodies (useful for API analysis) +- `max_requests`: Maximum requests to return (default: 30) + +**Note:** If output is truncated, the full data is saved to a file. Use `file_read` on the saved path to access complete request history. + +**Example workflow:** +1. Navigate to a page that makes API calls +2. Call `get_network_requests()` to see captured requests +3. If you need request/response bodies, call `get_network_requests(include_bodies=true)` +4. 
Use captured tokens in `shell` commands for direct API testing ## Common Testing Scenarios diff --git a/tempo/tools/browser_task_tool.py b/tempo/tools/browser_task_tool.py index fefb29a..2d401fa 100644 --- a/tempo/tools/browser_task_tool.py +++ b/tempo/tools/browser_task_tool.py @@ -87,6 +87,7 @@ async def execute( import uuid import os from tempo.agents.browser import BrowserAgent + from tempo.tools.network_capture import NetworkRequestCapture # Fixed step limit for browser agent max_steps = 50 @@ -102,6 +103,12 @@ async def execute( "password": imap_password, } + # Create network request capture for API traffic visibility + network_capture = NetworkRequestCapture( + output_dir="/workspace", + capture_response_bodies=True, # Queue bodies for on-demand fetch via get_network_requests(include_bodies=True) + ) + # Create browser agent definition browser_agent_def = BrowserAgent( task=task, @@ -114,11 +121,29 @@ async def execute( use_browser_use_llm=use_browser_use_llm, ) - # Create browser-use agent with custom tools + # Create browser-use agent with custom tools (including network capture tool) browser_use_agent, browser, combined_tools = browser_agent_def.create_browser_use_agent( container=self._container_manager, + network_capture=network_capture, ) + # Attach network capture to browser session after it's available + # Note: The browser session exists on the agent, but CDP is set up during run() + # We'll attach after the browser session starts via a background hook + async def attach_network_capture_after_start(): + """Wait for browser session to have CDP client, then attach network capture.""" + for _ in range(50): # Wait up to 5 seconds + if hasattr(browser_use_agent, 'browser_session'): + session = browser_use_agent.browser_session + if session and getattr(session, '_cdp_client_root', None): + await network_capture.attach(session) + return + await asyncio.sleep(0.1) + + # Start the attachment in background (will complete during agent.run()) + import asyncio + 
attach_task = asyncio.create_task(attach_network_capture_after_start()) + # --- Visualization bridge for browser-use actions --- # browser-use executes browser interactions via combined_tools.registry.execute_action(...) # The Tempo visualization UI expects events via VisualizationCollector.on_step/on_tool_call_start/on_tool_call. @@ -322,6 +347,13 @@ async def _execute_action_with_viz(*args, **kwargs): ) finally: + # Clean up background attach task if still running + try: + if 'attach_task' in locals() and not attach_task.done(): + attach_task.cancel() + except Exception: + pass + # Clean up browser resources try: await browser.close() diff --git a/tempo/tools/browser_use_adapter.py b/tempo/tools/browser_use_adapter.py index 5bd13eb..6a6a4d1 100644 --- a/tempo/tools/browser_use_adapter.py +++ b/tempo/tools/browser_use_adapter.py @@ -36,6 +36,7 @@ def _run_async(coro: Coroutine[None, None, T]) -> T: if TYPE_CHECKING: from tempo.infra.container import ContainerManager from tempo.infra.protocols import FindingsUpdater + from tempo.tools.network_capture import NetworkRequestCapture class BrowserUseToolsAdapter: @@ -48,6 +49,7 @@ def __init__( imap_config: dict | None = None, agent_email: str | None = None, tools_instance: Tools | None = None, + network_capture: "NetworkRequestCapture | None" = None, ): """ Initialize the tools adapter. 
def _register_network_tools(self):
    """Register the ``get_network_requests`` tool on the shared Tools instance.

    Does nothing when the adapter was created without a network capture
    object, so the tool is only exposed when there is something to query.
    """
    network_capture = self.network_capture
    if not network_capture:
        return

    @self.tools.action(
        "Get captured network requests (XHR/fetch) from the current page"
    )
    def get_network_requests(
        filter_type: str = "",
        include_bodies: bool = False,
        max_requests: int = 30,
    ) -> str:
        """
        Get captured XHR/fetch network requests from the current page.

        Use this to analyze API calls made by the web application:
        - View API endpoints and request/response patterns
        - Capture authentication tokens and headers for security testing
        - Understand client-side API interactions

        Note: Requests are saved on page navigation. Use file_read on the
        saved file path to access complete request history if output is truncated.

        Args:
            filter_type: Filter by type: "xhr", "fetch", "all" (default: xhr+fetch)
            include_bodies: Include request/response bodies (use for API analysis)
            max_requests: Maximum requests to return (default: 30)

        Returns:
            Formatted list of network requests with headers and optional bodies
        """
        query = {
            "filter_type": filter_type,
            "include_bodies": include_bodies,
            "max_requests": max_requests,
        }

        if include_bodies:
            # Bodies are fetched over CDP, which is only available through the
            # coroutine variant.
            # NOTE(review): _run_async is defined elsewhere in this module;
            # confirm it runs the coroutine on the loop that owns the CDP client.
            output, _ = _run_async(network_capture.get_requests_async(**query))
        else:
            output, _ = network_capture.get_requests(**query)

        # The capture object only returns a saved-file path when it has
        # truncated the output, and in that case it has already appended the
        # path (and a file_read hint) to the text. Adding it again here only
        # duplicated the notice.
        return output
+ request_headers: dict[str, str] = field(default_factory=dict) + request_body: str | None = None # POST body if available + response_status: int | None = None + response_headers: dict[str, str] | None = None + response_body: str | None = None # Body if captured + timestamp: float = 0.0 + completed: bool = False + error: str | None = None + + def to_dict(self) -> dict[str, Any]: + """Convert to dictionary for serialization.""" + return { + "request_id": self.request_id, + "url": self.url, + "method": self.method, + "resource_type": self.resource_type, + "request_headers": self.request_headers, + "request_body": self.request_body, + "response_status": self.response_status, + "response_headers": self.response_headers, + "response_body": self.response_body, + "timestamp": self.timestamp, + "completed": self.completed, + "error": self.error, + } + + def format_summary(self, include_body: bool = False) -> str: + """Format request as a human-readable summary.""" + lines = [ + f"[{self.method}] {self.url}", + f" Type: {self.resource_type}", + ] + + if self.response_status: + lines.append(f" Status: {self.response_status}") + + if self.error: + lines.append(f" Error: {self.error}") + + # Show key headers + interesting_headers = [ + "authorization", + "x-api-key", + "cookie", + "content-type", + "x-csrf-token", + "x-requested-with", + ] + for header in interesting_headers: + if header in self.request_headers: + value = self.request_headers[header] + # Truncate long values + if len(value) > 100: + value = value[:100] + "..." + lines.append(f" {header}: {value}") + + if include_body and self.request_body: + body_preview = self.request_body[:500] + if len(self.request_body) > 500: + body_preview += f"... ({len(self.request_body)} chars total)" + lines.append(f" Request Body: {body_preview}") + + if include_body and self.response_body: + body_preview = self.response_body[:500] + if len(self.response_body) > 500: + body_preview += f"... 
({len(self.response_body)} chars total)" + lines.append(f" Response Body: {body_preview}") + + return "\n".join(lines) + + +class NetworkRequestCapture: + """ + Captures network requests via CDP Network domain events. + + Attaches to a browser session and listens for XHR/fetch requests. + On navigation, saves full history to file and clears in-memory buffer. + """ + + # Resource types to capture by default (XHR and Fetch for API calls) + DEFAULT_RESOURCE_TYPES = {"XHR", "Fetch"} + + # Maximum requests to keep in memory + MAX_REQUESTS = 100 + + # Character limit for displayed output + DISPLAY_CHAR_LIMIT = 45000 + + # Maximum response body size to capture (1MB limit to prevent memory issues) + MAX_BODY_SIZE = 1024 * 1024 + + # Content types worth capturing (skip images, fonts, etc.) + CAPTURABLE_CONTENT_TYPES = { + "application/json", + "application/xml", + "text/html", + "text/plain", + "text/xml", + "application/javascript", + "text/javascript", + } + + def __init__( + self, + output_dir: str = "/workspace", + capture_response_bodies: bool = False, + resource_types: set[str] | None = None, + ): + """ + Initialize network request capture. 
+ + Args: + output_dir: Directory for saving full request logs + capture_response_bodies: Whether to capture response bodies (memory intensive) + resource_types: Resource types to capture (default: XHR, Fetch) + """ + self._requests: deque[NetworkRequest] = deque(maxlen=self.MAX_REQUESTS) + self._request_map: dict[str, NetworkRequest] = {} # request_id -> request + self._pending_body_requests: set[str] = set() # request_ids needing body fetch + self._output_dir = output_dir + self._capture_response_bodies = capture_response_bodies + self._resource_types = resource_types or self.DEFAULT_RESOURCE_TYPES + self._navigation_count = 0 + self._session: "BrowserSession | None" = None + self._attached = False + + async def attach(self, browser_session: "BrowserSession") -> None: + """ + Attach to a browser session to capture network requests. + + Args: + browser_session: The browser-use BrowserSession instance + """ + if self._attached: + return + + self._session = browser_session + + # Get the CDP client from session + cdp_client = browser_session._cdp_client_root + if not cdp_client: + return + + # Register handlers for network events + cdp_client.register.Network.requestWillBeSent(self._on_request_will_be_sent) + cdp_client.register.Network.responseReceived(self._on_response_received) + cdp_client.register.Network.loadingFinished(self._on_loading_finished) + cdp_client.register.Network.loadingFailed(self._on_loading_failed) + + # Register handler for navigation (to clear/save requests) + cdp_client.register.Page.frameNavigated(self._on_frame_navigated) + + self._attached = True + + def _on_request_will_be_sent( + self, event: dict[str, Any], session_id: str | None = None + ) -> None: + """Handle Network.requestWillBeSent event.""" + request_id = event.get("requestId", "") + request = event.get("request", {}) + resource_type = event.get("type", "Other") + + # Filter by resource type + if resource_type not in self._resource_types: + return + + url = request.get("url", "") 
+ method = request.get("method", "GET") + headers = request.get("headers", {}) + post_data = request.get("postData") + + # Normalize headers to lowercase keys + normalized_headers = {k.lower(): v for k, v in headers.items()} + + network_request = NetworkRequest( + request_id=request_id, + url=url, + method=method, + resource_type=resource_type, + request_headers=normalized_headers, + request_body=post_data, + timestamp=datetime.now(timezone.utc).timestamp(), + ) + + self._requests.append(network_request) + self._request_map[request_id] = network_request + + def _on_response_received( + self, event: dict[str, Any], session_id: str | None = None + ) -> None: + """Handle Network.responseReceived event.""" + request_id = event.get("requestId", "") + + if request_id not in self._request_map: + return + + network_request = self._request_map[request_id] + response = event.get("response", {}) + + network_request.response_status = response.get("status") + + # Capture response headers + headers = response.get("headers", {}) + network_request.response_headers = {k.lower(): v for k, v in headers.items()} + + def _on_loading_finished( + self, event: dict[str, Any], session_id: str | None = None + ) -> None: + """Handle Network.loadingFinished event.""" + request_id = event.get("requestId", "") + + if request_id not in self._request_map: + return + + network_request = self._request_map[request_id] + network_request.completed = True + + # Queue for body capture if enabled and content type is worth capturing + if self._capture_response_bodies and network_request.response_headers: + content_type = network_request.response_headers.get("content-type", "") + # Check if content type starts with any capturable type + if any(ct in content_type for ct in self.CAPTURABLE_CONTENT_TYPES): + self._pending_body_requests.add(request_id) + + def _on_loading_failed( + self, event: dict[str, Any], session_id: str | None = None + ) -> None: + """Handle Network.loadingFailed event.""" + request_id = 
event.get("requestId", "") + + if request_id not in self._request_map: + return + + network_request = self._request_map[request_id] + network_request.completed = True + network_request.error = event.get("errorText", "Unknown error") + + def _on_frame_navigated( + self, event: dict[str, Any], session_id: str | None = None + ) -> None: + """Handle Page.frameNavigated event - save and clear requests.""" + # Only act on main frame navigations + frame = event.get("frame", {}) + if frame.get("parentId"): + # This is an iframe navigation, ignore + return + + # Save current requests to file before clearing + if self._requests: + self._save_requests_to_file() + + # Clear in-memory state + self._requests.clear() + self._request_map.clear() + self._pending_body_requests.clear() + self._navigation_count += 1 + + async def fetch_pending_bodies(self) -> int: + """ + Fetch response bodies for pending requests via CDP. + + This is called on-demand when include_bodies=True is requested. + Bodies are fetched asynchronously via Network.getResponseBody. + + Returns: + Number of bodies successfully fetched + """ + if not self._session or not self._pending_body_requests: + return 0 + + cdp_client = getattr(self._session, "_cdp_client_root", None) + if not cdp_client: + return 0 + + fetched = 0 + to_remove = set() + + for request_id in list(self._pending_body_requests): + if request_id not in self._request_map: + to_remove.add(request_id) + continue + + network_request = self._request_map[request_id] + if network_request.response_body is not None: + # Already fetched + to_remove.add(request_id) + continue + + try: + # Call CDP command to get response body + result = await cdp_client.send( + "Network.getResponseBody", + {"requestId": request_id}, + ) + + body = result.get("body", "") + is_base64 = result.get("base64Encoded", False) + + if is_base64: + # Skip binary content (images, etc.) 
+ network_request.response_body = "[Binary content - base64 encoded]" + elif len(body) > self.MAX_BODY_SIZE: + network_request.response_body = ( + body[: self.MAX_BODY_SIZE] + + f"\n[TRUNCATED - response was {len(body)} bytes]" + ) + else: + network_request.response_body = body + + fetched += 1 + to_remove.add(request_id) + + except Exception: + # Request may have been cleaned up by browser, skip it + to_remove.add(request_id) + + # Clean up processed requests + self._pending_body_requests -= to_remove + + return fetched + + def _save_requests_to_file(self) -> str | None: + """Save all captured requests to a JSON file.""" + if not self._requests: + return None + + timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S") + filename = f"network_requests_{timestamp}.json" + filepath = os.path.join(self._output_dir, filename) + + try: + data = { + "captured_at": timestamp, + "navigation_count": self._navigation_count, + "request_count": len(self._requests), + "requests": [r.to_dict() for r in self._requests], + } + + os.makedirs(self._output_dir, exist_ok=True) + with open(filepath, "w") as f: + json.dump(data, f, indent=2) + + return filepath + except Exception: + return None + + def get_requests( + self, + filter_type: str = "", + include_bodies: bool = False, + max_requests: int = 50, + ) -> tuple[str, str | None]: + """ + Get formatted summary of captured requests. 
+ + Args: + filter_type: Filter by resource type (xhr, fetch, all, or empty for default) + include_bodies: Include request/response bodies + max_requests: Maximum number of requests to return + + Returns: + Tuple of (formatted output, file path if truncated) + """ + # Determine which requests to include + requests_to_show = list(self._requests) + + if filter_type: + filter_type_upper = filter_type.upper() + if filter_type_upper != "ALL": + requests_to_show = [ + r for r in requests_to_show if r.resource_type == filter_type_upper + ] + + # Limit to max_requests (most recent) + if len(requests_to_show) > max_requests: + requests_to_show = requests_to_show[-max_requests:] + + if not requests_to_show: + return "No network requests captured.", None + + # Format output + lines = [ + f"Captured {len(requests_to_show)} network requests " + f"(showing {min(len(requests_to_show), max_requests)}):", + "", + ] + + for i, req in enumerate(requests_to_show, 1): + lines.append(f"--- Request {i} ---") + lines.append(req.format_summary(include_body=include_bodies)) + lines.append("") + + full_output = "\n".join(lines) + + # Check if we need to truncate and save to file + saved_file = None + if len(full_output) > self.DISPLAY_CHAR_LIMIT: + saved_file = self._save_requests_to_file() + truncated_output = full_output[: self.DISPLAY_CHAR_LIMIT] + truncated_output += ( + f"\n\n[OUTPUT TRUNCATED - {len(full_output)} chars total]\n" + ) + if saved_file: + truncated_output += ( + f"Full output saved to: {saved_file}\n" + f"Use file_read tool to view complete data." + ) + return truncated_output, saved_file + + return full_output, None + + async def get_requests_async( + self, + filter_type: str = "", + include_bodies: bool = False, + max_requests: int = 50, + ) -> tuple[str, str | None]: + """ + Async version of get_requests that fetches response bodies on-demand. + + When include_bodies=True, this will fetch any pending response bodies + via CDP before formatting the output. 
+ + Args: + filter_type: Filter by resource type (xhr, fetch, all, or empty for default) + include_bodies: Include request/response bodies (triggers async fetch) + max_requests: Maximum number of requests to return + + Returns: + Tuple of (formatted output, file path if truncated) + """ + # Fetch pending bodies if requested + if include_bodies and self._pending_body_requests: + await self.fetch_pending_bodies() + + # Delegate to sync method for formatting + return self.get_requests( + filter_type=filter_type, + include_bodies=include_bodies, + max_requests=max_requests, + ) + + def get_request_count(self) -> int: + """Get the number of captured requests.""" + return len(self._requests) + + def clear(self) -> None: + """Clear all captured requests.""" + self._requests.clear() + self._request_map.clear() + self._pending_body_requests.clear() diff --git a/tests/tools/test_network_capture.py b/tests/tools/test_network_capture.py new file mode 100644 index 0000000..b826420 --- /dev/null +++ b/tests/tools/test_network_capture.py @@ -0,0 +1,395 @@ +"""Tests for NetworkRequestCapture functionality.""" + +import pytest +from unittest.mock import MagicMock, AsyncMock +import asyncio + +from tempo.tools.network_capture import NetworkRequest, NetworkRequestCapture + + +class TestNetworkRequest: + """Tests for NetworkRequest dataclass.""" + + def test_create_network_request(self): + """Test creating a NetworkRequest.""" + req = NetworkRequest( + request_id="123", + url="https://api.example.com/users", + method="GET", + resource_type="XHR", + ) + assert req.request_id == "123" + assert req.url == "https://api.example.com/users" + assert req.method == "GET" + assert req.resource_type == "XHR" + assert req.completed is False + assert req.response_status is None + + def test_to_dict(self): + """Test converting NetworkRequest to dict.""" + req = NetworkRequest( + request_id="456", + url="https://api.example.com/data", + method="POST", + resource_type="Fetch", + request_body='{"key": 
"value"}', + response_status=200, + ) + data = req.to_dict() + assert data["request_id"] == "456" + assert data["method"] == "POST" + assert data["request_body"] == '{"key": "value"}' + assert data["response_status"] == 200 + + def test_format_summary_basic(self): + """Test basic summary formatting.""" + req = NetworkRequest( + request_id="789", + url="https://api.example.com/endpoint", + method="GET", + resource_type="XHR", + response_status=200, + ) + summary = req.format_summary() + assert "[GET]" in summary + assert "https://api.example.com/endpoint" in summary + assert "Status: 200" in summary + + def test_format_summary_with_headers(self): + """Test summary includes interesting headers.""" + req = NetworkRequest( + request_id="abc", + url="https://api.example.com/secure", + method="POST", + resource_type="XHR", + request_headers={ + "authorization": "Bearer token123", + "content-type": "application/json", + }, + ) + summary = req.format_summary() + assert "authorization:" in summary + assert "Bearer token123" in summary + assert "content-type:" in summary + + def test_format_summary_with_body(self): + """Test summary includes body when requested.""" + req = NetworkRequest( + request_id="def", + url="https://api.example.com/data", + method="POST", + resource_type="Fetch", + request_body='{"username": "test"}', + response_body='{"success": true}', + ) + summary = req.format_summary(include_body=True) + assert "Request Body:" in summary + assert "username" in summary + assert "Response Body:" in summary + assert "success" in summary + + +class TestNetworkRequestCapture: + """Tests for NetworkRequestCapture class.""" + + def test_init_defaults(self): + """Test default initialization.""" + capture = NetworkRequestCapture() + assert capture.get_request_count() == 0 + assert capture._capture_response_bodies is False + assert "XHR" in capture._resource_types + assert "Fetch" in capture._resource_types + + def test_init_custom_settings(self): + """Test custom 
class TestNetworkRequestCapture:
    """Tests for NetworkRequestCapture class."""

    @staticmethod
    def _send_request(
        capture,
        request_id,
        resource_type="XHR",
        url="https://api.example.com/test",
        method="GET",
        headers=None,
        post_data=None,
    ):
        """Feed a synthetic Network.requestWillBeSent event into *capture*."""
        request = {"url": url, "method": method, "headers": headers or {}}
        if post_data is not None:
            request["postData"] = post_data
        capture._on_request_will_be_sent(
            {"requestId": request_id, "type": resource_type, "request": request}
        )

    def test_init_defaults(self):
        """Test default initialization."""
        capture = NetworkRequestCapture()
        assert capture.get_request_count() == 0
        assert capture._capture_response_bodies is False
        assert "XHR" in capture._resource_types
        assert "Fetch" in capture._resource_types

    def test_init_custom_settings(self):
        """Test custom initialization."""
        capture = NetworkRequestCapture(
            output_dir="/tmp/test",
            capture_response_bodies=True,
            resource_types={"Script", "Document"},
        )
        assert capture._output_dir == "/tmp/test"
        assert capture._capture_response_bodies is True
        assert "Script" in capture._resource_types
        assert "XHR" not in capture._resource_types

    def test_on_request_will_be_sent_xhr(self):
        """Test handling XHR request event."""
        capture = NetworkRequestCapture()

        self._send_request(
            capture,
            "req-001",
            url="https://api.example.com/data",
            method="POST",
            headers={"Authorization": "Bearer xyz"},
            post_data='{"test": true}',
        )

        assert capture.get_request_count() == 1
        assert "req-001" in capture._request_map
        req = capture._request_map["req-001"]
        assert req.url == "https://api.example.com/data"
        assert req.method == "POST"
        assert req.request_body == '{"test": true}'
        assert "authorization" in req.request_headers

    def test_on_request_will_be_sent_filters_other_types(self):
        """Test that non-XHR/Fetch requests are filtered."""
        capture = NetworkRequestCapture()

        # Script request should be ignored
        self._send_request(
            capture,
            "req-002",
            resource_type="Script",
            url="https://example.com/app.js",
        )

        assert capture.get_request_count() == 0

    def test_on_response_received(self):
        """Test handling response event."""
        capture = NetworkRequestCapture()
        self._send_request(
            capture,
            "req-003",
            resource_type="Fetch",
            url="https://api.example.com/users",
        )

        capture._on_response_received({
            "requestId": "req-003",
            "response": {
                "status": 200,
                "headers": {"Content-Type": "application/json"},
            },
        })

        req = capture._request_map["req-003"]
        assert req.response_status == 200
        assert "content-type" in req.response_headers

    def test_on_loading_finished(self):
        """Test handling loading finished event."""
        capture = NetworkRequestCapture()
        self._send_request(capture, "req-004")

        assert capture._request_map["req-004"].completed is False

        capture._on_loading_finished({"requestId": "req-004"})

        assert capture._request_map["req-004"].completed is True

    def test_on_loading_failed(self):
        """Test handling loading failed event."""
        capture = NetworkRequestCapture()
        self._send_request(
            capture,
            "req-005",
            resource_type="Fetch",
            url="https://api.example.com/fail",
            method="POST",
        )

        capture._on_loading_failed({
            "requestId": "req-005",
            "errorText": "net::ERR_CONNECTION_REFUSED",
        })

        req = capture._request_map["req-005"]
        assert req.completed is True
        assert req.error == "net::ERR_CONNECTION_REFUSED"

    def test_get_requests_empty(self):
        """Test get_requests with no captured requests."""
        capture = NetworkRequestCapture()
        output, saved_file = capture.get_requests()
        assert "No network requests captured" in output
        assert saved_file is None

    def test_get_requests_with_data(self):
        """Test get_requests returns formatted data."""
        capture = NetworkRequestCapture()
        self._send_request(capture, "req-006", url="https://api.example.com/users")
        capture._on_response_received({
            "requestId": "req-006",
            "response": {"status": 200, "headers": {}},
        })

        output, saved_file = capture.get_requests()

        assert "Captured 1 network requests" in output
        assert "[GET]" in output
        assert "https://api.example.com/users" in output
        assert saved_file is None  # Small output shouldn't trigger file save

    def test_clear(self):
        """Test clearing captured requests."""
        capture = NetworkRequestCapture()
        self._send_request(capture, "req-007")

        assert capture.get_request_count() == 1

        capture.clear()

        assert capture.get_request_count() == 0
        assert len(capture._request_map) == 0

    def test_max_requests_limit(self):
        """Test that requests are limited by MAX_REQUESTS."""
        capture = NetworkRequestCapture()
        # Swap in a smaller bounded buffer instead of sending 100+ events
        capture._requests = type(capture._requests)(maxlen=5)

        for i in range(10):
            self._send_request(
                capture,
                f"req-{i:03d}",
                url=f"https://api.example.com/item/{i}",
            )

        # Should only keep the most recent 5
        assert capture.get_request_count() == 5

    def test_pending_body_requests_queued_on_loading_finished(self):
        """Test that requests with capturable content types are queued for body fetch."""
        capture = NetworkRequestCapture(capture_response_bodies=True)
        self._send_request(capture, "req-body-001", url="https://api.example.com/data")

        capture._on_response_received({
            "requestId": "req-body-001",
            "response": {
                "status": 200,
                "headers": {"Content-Type": "application/json; charset=utf-8"},
            },
        })
        capture._on_loading_finished({"requestId": "req-body-001"})

        assert "req-body-001" in capture._pending_body_requests

    def test_pending_body_requests_not_queued_for_binary(self):
        """Test that binary content types are not queued for body fetch."""
        capture = NetworkRequestCapture(capture_response_bodies=True)
        self._send_request(
            capture,
            "req-img-001",
            url="https://api.example.com/image.png",
        )

        capture._on_response_received({
            "requestId": "req-img-001",
            "response": {
                "status": 200,
                "headers": {"Content-Type": "image/png"},
            },
        })
        capture._on_loading_finished({"requestId": "req-img-001"})

        # Should NOT be queued for body capture
        assert "req-img-001" not in capture._pending_body_requests

    def test_clear_also_clears_pending_body_requests(self):
        """Test that clear() also clears pending body requests."""
        capture = NetworkRequestCapture(capture_response_bodies=True)
        capture._pending_body_requests.add("test-request-id")

        capture.clear()

        assert len(capture._pending_body_requests) == 0

    def test_get_requests_async(self):
        """Test async get_requests_async method."""
        capture = NetworkRequestCapture()
        self._send_request(
            capture,
            "async-001",
            resource_type="Fetch",
            url="https://api.example.com/async",
        )

        # Drive the coroutine explicitly. As a bare ``async def`` test without
        # an asyncio marker it is only awaited when a plugin's auto mode is
        # configured; otherwise pytest never runs the assertions below.
        output, saved_file = asyncio.run(capture.get_requests_async())

        assert "https://api.example.com/async" in output
        assert saved_file is None