From 28dc52062d2b04e3f63c1ee2eef8451d80966c59 Mon Sep 17 00:00:00 2001 From: jsboigeEpita Date: Sat, 30 May 2026 13:19:10 +0200 Subject: [PATCH] feat(web-search): intercept web search tool calls and execute via SearXNG MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When providers (Z.AI, GLM, OpenRouter) emit WebSearch/WebFetch tool calls or GLM's tags, intercept them and execute via a local SearXNG instance instead of letting them fail. This covers three interception points: 1. Sub-agent requests in proxy-server.ts: Claude Code's WebSearch tool fires a sub-agent with a single user message. Intercept before handler selection and return SearXNG results. 2. Structured tool calls in openai-sse.ts: When the OpenAI SSE parser detects a WebSearch/WebFetch tool call, suppress it, execute SearXNG, and inject results as a text block in the stream. 3. GLM tags: Detect these in accumulated text and execute SearXNG for each query, injecting results as text. Also handles empty responses (no content blocks) with a synthetic error message so downstream doesn't hang. New file: web-search-executor.ts — stream-safe SearXNG client with deadline-based timeout. Co-Authored-By: Claude Opus 4.8 --- .../shared/stream-parsers/openai-sse.ts | 92 +++++++++++- .../handlers/shared/web-search-executor.ts | 114 +++++++++++++++ packages/cli/src/proxy-server.ts | 136 ++++++++++++++++++ 3 files changed, 341 insertions(+), 1 deletion(-) create mode 100644 packages/cli/src/handlers/shared/web-search-executor.ts diff --git a/packages/cli/src/handlers/shared/stream-parsers/openai-sse.ts b/packages/cli/src/handlers/shared/stream-parsers/openai-sse.ts index ff73199a..ad5e1417 100644 --- a/packages/cli/src/handlers/shared/stream-parsers/openai-sse.ts +++ b/packages/cli/src/handlers/shared/stream-parsers/openai-sse.ts @@ -16,6 +16,7 @@ import { type ToolSchema, } from "../tool-call-recovery.js"; import { isWebSearchToolCall, warnWebSearchUnsupported } from "../web-search-detector.js"; +import { executeWebSearch, extractSearchQuery } from "../web-search-executor.js"; export interface StreamingState { usage: any; @@ -39,6 +40,7 @@ export interface ToolState { closed: boolean; arguments: string; // Accumulated JSON arguments string buffered: boolean; // Whether we're buffering args until tool call completes + suppressed?: boolean; // Whether this tool call was suppressed (e.g. web search intercepted) } /** @@ -195,6 +197,22 @@ export function createStreamingResponseHandler( } } + // Detect GLM tags in accumulated text and execute via SearXNG. + // GLM models emit query when they want to search. + const searchWebRegex = /([\s\S]*?)<\/searchWeb>/g; + let searchMatch: RegExpExecArray | null; + const searchQueries: string[] = []; + let cleanText = state.accumulatedText; + while ((searchMatch = searchWebRegex.exec(state.accumulatedText)) !== null) { + searchQueries.push(searchMatch[1].trim()); + } + if (searchQueries.length > 0) { + log(`[Streaming] Found ${searchQueries.length} tag(s) in text`); + cleanText = state.accumulatedText.replace(searchWebRegex, "").trim(); + // Strip the search tags from output if text block is still open + // Results will be appended after the search queries + } + if (state.reasoningStarted) { send("content_block_stop", { type: "content_block_stop", index: state.reasoningIdx }); } @@ -202,6 +220,55 @@ export function createStreamingResponseHandler( send("content_block_stop", { type: "content_block_stop", index: state.textIdx }); } + // Handle suppressed web search tool calls — execute via SearXNG and inject results as text. + for (const t of Array.from(state.tools.values())) { + if (t.suppressed && !t.closed) { + const query = extractSearchQuery(t.arguments); + log(`[Streaming] Executing suppressed web search: "${query}"`); + const results = await executeWebSearch(query); + log(`[Streaming] Web search results: ${results.length} chars`); + + // Inject results as a text block + if (!state.textStarted) { + state.textIdx = state.curIdx++; + send("content_block_start", { + type: "content_block_start", + index: state.textIdx, + content_block: { type: "text", text: "" }, + }); + state.textStarted = true; + } + send("content_block_delta", { + type: "content_block_delta", + index: state.textIdx, + delta: { type: "text_delta", text: results }, + }); + t.closed = true; + } + } + + // Execute queries found in accumulated text + if (searchQueries.length > 0) { + for (const query of searchQueries) { + log(`[Streaming] Executing query: "${query}"`); + const results = await executeWebSearch(query); + if (!state.textStarted) { + state.textIdx = state.curIdx++; + send("content_block_start", { + type: "content_block_start", + index: state.textIdx, + content_block: { type: "text", text: "" }, + }); + state.textStarted = true; + } + send("content_block_delta", { + type: "content_block_delta", + index: state.textIdx, + delta: { type: "text_delta", text: results }, + }); + } + } + // Handle buffered-but-unsent structured tool calls. // Some models (e.g., Gemini via LiteLLM) send tool calls with finish_reason="stop" // instead of "tool_calls", so the normal validation path (line ~695) is never reached. @@ -279,6 +346,27 @@ export function createStreamingResponseHandler( } } + // Handle empty response — no text blocks and no tool calls were started. + // This can happen with some providers that return no content at all. + if (!state.textStarted && !Array.from(state.tools.values()).some((t) => t.started)) { + log(`[Streaming] Empty response — no text or tool blocks started, injecting error message`); + const emptyIdx = state.curIdx++; + send("content_block_start", { + type: "content_block_start", + index: emptyIdx, + content_block: { type: "text", text: "" }, + }); + send("content_block_delta", { + type: "content_block_delta", + index: emptyIdx, + delta: { + type: "text_delta", + text: "[Error: The model returned an empty response. Try compacting the conversation or reducing the context size.]", + }, + }); + send("content_block_stop", { type: "content_block_stop", index: emptyIdx }); + } + if (middlewareManager) { await middlewareManager.afterStreamComplete(target, streamMetadata); } @@ -506,7 +594,9 @@ export function createStreamingResponseHandler( }; state.tools.set(idx, t); if (isWebSearchToolCall(restoredName)) { - warnWebSearchUnsupported(restoredName, target); + log(`[Streaming] Web search tool call '${restoredName}' detected — intercepting via SearXNG`); + t.suppressed = true; + t.buffered = true; } } // Only send content_block_start immediately if NOT buffering diff --git a/packages/cli/src/handlers/shared/web-search-executor.ts b/packages/cli/src/handlers/shared/web-search-executor.ts new file mode 100644 index 00000000..c35b6ac7 --- /dev/null +++ b/packages/cli/src/handlers/shared/web-search-executor.ts @@ -0,0 +1,114 @@ +/** + * Web search executor — intercepts web_search tool calls from providers + * (z.ai, GLM) and executes them via SearXNG. + * + * When a provider model requests web_search, claudish: + * 1. Extracts the search query from tool arguments + * 2. Calls the local SearXNG instance + * 3. Returns formatted results as a text block in the stream + * + * IMPORTANT: executeWebSearchSync must not block the SSE stream. + * It races SearXNG against a 3-second deadline and returns a + * fallback message immediately if SearXNG is unreachable. + */ + +import { log } from "../../logger.js"; + +const SEARXNG_URL = process.env.SEARXNG_URL || "http://search.myia.io"; + +export interface SearchResult { + title: string; + url: string; + snippet: string; +} + +/** + * Execute a web search via SearXNG. Non-throwing. + */ +async function fetchFromSearXNG(query: string, maxResults = 5): Promise { + const url = `${SEARXNG_URL}/search?q=${encodeURIComponent(query)}&format=json&categories=general`; + log(`[WebSearch] Executing: "${query}" via ${SEARXNG_URL}`); + + const response = await fetch(url, { + signal: AbortSignal.timeout(5000), + headers: { Accept: "application/json" }, + }); + + if (!response.ok) { + log(`[WebSearch] SearXNG returned HTTP ${response.status}`); + return []; + } + + const data = (await response.json()) as any; + const results: SearchResult[] = (data.results || []) + .slice(0, maxResults) + .map((r: any) => ({ + title: r.title || "", + url: r.url || "", + snippet: r.content || r.description || "", + })); + + log(`[WebSearch] Got ${results.length} results for "${query}"`); + return results; +} + +/** + * Stream-safe web search: races SearXNG against a short deadline. + * Returns formatted results text, never throws, never blocks > deadline. + */ +export async function executeWebSearch(query: string, deadlineMs = 3000): Promise { + try { + const results = await Promise.race([ + fetchFromSearXNG(query), + new Promise((resolve) => setTimeout(() => resolve(null), deadlineMs)), + ]); + + if (results === null) { + log(`[WebSearch] Timed out after ${deadlineMs}ms for "${query}"`); + return `[Web search for "${query}" timed out. SearXNG did not respond within ${deadlineMs / 1000}s.]`; + } + + return formatSearchResults(query, results); + } catch (err: any) { + log(`[WebSearch] Error: ${err.message}`); + return `[Web search for "${query}" failed: ${err.message}]`; + } +} + +/** + * Format search results as a text block for injection into the stream. + */ +export function formatSearchResults(query: string, results: SearchResult[]): string { + if (results.length === 0) { + return `[Web search for "${query}" returned no results. The search service may be unavailable.]`; + } + + const lines = [`[Web search results for "${query}"]\n`]; + for (let i = 0; i < results.length; i++) { + const r = results[i]; + lines.push(`${i + 1}. **${r.title}**`); + lines.push(` ${r.url}`); + if (r.snippet) { + lines.push(` ${r.snippet}`); + } + lines.push(""); + } + return lines.join("\n"); +} + +/** + * Extract the search query from a web_search tool call's arguments JSON. + */ +export function extractSearchQuery(argsJson: string): string { + try { + const args = JSON.parse(argsJson); + return args.query || args.q || args.search_query || args.keyword || ""; + } catch { + // If args aren't valid JSON, try to extract from raw string + const match = argsJson.match(/"query"\s*:\s*"([^"]+)"/); + if (match) return match[1]; + const match2 = argsJson.match(/"q"\s*:\s*"([^"]+)"/); + if (match2) return match2[1]; + return ""; + } +} diff --git a/packages/cli/src/proxy-server.ts b/packages/cli/src/proxy-server.ts index 37da0350..1bee36d6 100644 --- a/packages/cli/src/proxy-server.ts +++ b/packages/cli/src/proxy-server.ts @@ -35,6 +35,130 @@ import { route, loadRoutingRules } from "./providers/routing-rules.js"; import { createHandlerForProvider } from "./providers/provider-profiles.js"; import { loadCustomEndpoints } from "./providers/custom-endpoints-loader.js"; import { getApiKey, loadConfig } from "./profile-config.js"; +import { executeWebSearch } from "./handlers/shared/web-search-executor.js"; + +/** + * Intercept WebSearch/WebFetch tool calls and execute them via SearXNG instead + * of forwarding to the provider. Returns a streaming response with SearXNG results. + * + * Claude Code's WebSearch tool sends a sub-agent request with a single user message: + * "Perform a web search for the query: " + * We intercept this, execute SearXNG, and return the results as text. + */ +async function interceptWebTools(c: any, body: any): Promise { + const messages = body.messages || []; + const isStreaming = body.stream === true; + + // Case 1: Sub-agent web search request (1 message, user role, starts with "Perform a web search") + if (messages.length === 1 && messages[0].role === "user") { + const text = typeof messages[0].content === "string" + ? messages[0].content + : Array.isArray(messages[0].content) + ? messages[0].content.map((b: any) => b.text || "").join("") + : ""; + + const searchMatch = text.match(/^Perform a web search for the query:\s*(.+)$/s); + if (searchMatch) { + const query = searchMatch[1].trim(); + log(`[WebTools] Intercepted sub-agent web search: "${query}"`); + const results = await executeWebSearch(query, 5000); + return buildTextResponse(body.model || "unknown", results, isStreaming); + } + + const fetchMatch = text.match(/^Perform a web fetch for the URL:\s*(.+)$/s); + if (fetchMatch) { + const url = fetchMatch[1].trim(); + log(`[WebTools] Intercepted sub-agent web fetch: "${url}"`); + let resultText: string; + try { + const searxngUrl = `${process.env.SEARXNG_URL || "http://search.myia.io"}/search?q=${encodeURIComponent(url)}&format=json&categories=general`; + const resp = await fetch(searxngUrl, { signal: AbortSignal.timeout(5000) }); + const data = await resp.json() as any; + const results = (data.results || []).slice(0, 3); + resultText = results.length > 0 + ? results.map((r: any) => `**${r.title}**\n${r.url}\n${r.content || ""}`).join("\n\n") + : `[No results found for URL: ${url}]`; + } catch (err: any) { + resultText = `[Web fetch for "${url}" failed: ${err.message}]`; + } + return buildTextResponse(body.model || "unknown", resultText, isStreaming); + } + } + + return null; +} + +function buildTextResponse(model: string, text: string, streaming: boolean): Response { + const encoder = new TextEncoder(); + const send = (event: string, data: any) => + encoder.encode(`event: ${event}\ndata: ${JSON.stringify(data)}\n\n`); + + if (!streaming) { + return new Response(JSON.stringify({ + id: `msg_webtools_${Date.now()}`, + type: "message", + role: "assistant", + model, + content: [{ type: "text", text }], + stop_reason: "end_turn", + stop_sequence: null, + usage: { input_tokens: 0, output_tokens: 0 }, + }), { + headers: { + "Content-Type": "application/json", + "anthropic-version": "2023-06-01", + }, + }); + } + + const stream = new ReadableStream({ + start(controller) { + controller.enqueue(send("message_start", { + type: "message_start", + message: { + id: `msg_webtools_${Date.now()}`, + type: "message", + role: "assistant", + model, + content: [], + stop_reason: null, + stop_sequence: null, + usage: { input_tokens: 0, output_tokens: 0 }, + }, + })); + controller.enqueue(send("content_block_start", { + type: "content_block_start", + index: 0, + content_block: { type: "text", text: "" }, + })); + controller.enqueue(send("content_block_delta", { + type: "content_block_delta", + index: 0, + delta: { type: "text_delta", text }, + })); + controller.enqueue(send("content_block_stop", { + type: "content_block_stop", + index: 0, + })); + controller.enqueue(send("message_delta", { + type: "message_delta", + delta: { stop_reason: "end_turn", stop_sequence: null }, + usage: { input_tokens: 0, output_tokens: 0 }, + })); + controller.enqueue(send("message_stop", { type: "message_stop" })); + controller.close(); + }, + }); + + return new Response(stream, { + headers: { + "Content-Type": "text/event-stream", + "Cache-Control": "no-cache", + Connection: "keep-alive", + "anthropic-version": "2023-06-01", + }, + }); +} export interface ProxyServerOptions { summarizeTools?: boolean; // Summarize tool descriptions for local models @@ -569,6 +693,18 @@ export async function createProxyServer( app.post("/v1/messages", async (c) => { try { const body = await c.req.json(); + + // Intercept web search/fetch sub-agent requests before handler selection. + // When Claude Code's WebSearch tool fires a sub-agent, the request contains + // a single user message like "Perform a web search for the query: X". + // We intercept it and execute via SearXNG instead of forwarding to the model. + try { + const webToolResponse = await interceptWebTools(c, body); + if (webToolResponse) return webToolResponse; + } catch (e: any) { + log(`[WebTools] Intercept error (falling through to normal handler): ${e.message}`); + } + const handler = await getHandlerForRequest(body.model); // Route