Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 91 additions & 1 deletion packages/cli/src/handlers/shared/stream-parsers/openai-sse.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import {
type ToolSchema,
} from "../tool-call-recovery.js";
import { isWebSearchToolCall, warnWebSearchUnsupported } from "../web-search-detector.js";
import { executeWebSearch, extractSearchQuery } from "../web-search-executor.js";

export interface StreamingState {
usage: any;
Expand All @@ -39,6 +40,7 @@ export interface ToolState {
closed: boolean;
arguments: string; // Accumulated JSON arguments string
buffered: boolean; // Whether we're buffering args until tool call completes
suppressed?: boolean; // Whether this tool call was suppressed (e.g. web search intercepted)
}

/**
Expand Down Expand Up @@ -195,13 +197,78 @@ export function createStreamingResponseHandler(
}
}

// Detect GLM <searchWeb> tags in accumulated text and execute via SearXNG.
// GLM models emit <searchWeb>query</searchWeb> when they want to search.
const searchWebRegex = /<searchWeb>([\s\S]*?)<\/searchWeb>/g;
let searchMatch: RegExpExecArray | null;
const searchQueries: string[] = [];
let cleanText = state.accumulatedText;
while ((searchMatch = searchWebRegex.exec(state.accumulatedText)) !== null) {
searchQueries.push(searchMatch[1].trim());
}
if (searchQueries.length > 0) {
log(`[Streaming] Found ${searchQueries.length} <searchWeb> tag(s) in text`);
cleanText = state.accumulatedText.replace(searchWebRegex, "").trim();
// Strip the search tags from output if text block is still open
// Results will be appended after the search queries
}

if (state.reasoningStarted) {
send("content_block_stop", { type: "content_block_stop", index: state.reasoningIdx });
}
if (state.textStarted) {
send("content_block_stop", { type: "content_block_stop", index: state.textIdx });
}

// Handle suppressed web search tool calls — execute via SearXNG and inject results as text.
for (const t of Array.from(state.tools.values())) {
if (t.suppressed && !t.closed) {
const query = extractSearchQuery(t.arguments);
log(`[Streaming] Executing suppressed web search: "${query}"`);
const results = await executeWebSearch(query);
log(`[Streaming] Web search results: ${results.length} chars`);

// Inject results as a text block
if (!state.textStarted) {
state.textIdx = state.curIdx++;
send("content_block_start", {
type: "content_block_start",
index: state.textIdx,
content_block: { type: "text", text: "" },
});
state.textStarted = true;
}
send("content_block_delta", {
type: "content_block_delta",
index: state.textIdx,
delta: { type: "text_delta", text: results },
});
t.closed = true;
}
}

// Execute <searchWeb> queries found in accumulated text
if (searchQueries.length > 0) {
for (const query of searchQueries) {
log(`[Streaming] Executing <searchWeb> query: "${query}"`);
const results = await executeWebSearch(query);
if (!state.textStarted) {
state.textIdx = state.curIdx++;
send("content_block_start", {
type: "content_block_start",
index: state.textIdx,
content_block: { type: "text", text: "" },
});
state.textStarted = true;
}
send("content_block_delta", {
type: "content_block_delta",
index: state.textIdx,
delta: { type: "text_delta", text: results },
});
}
}

// Handle buffered-but-unsent structured tool calls.
// Some models (e.g., Gemini via LiteLLM) send tool calls with finish_reason="stop"
// instead of "tool_calls", so the normal validation path (line ~695) is never reached.
Expand Down Expand Up @@ -279,6 +346,27 @@ export function createStreamingResponseHandler(
}
}

// Handle empty response — no text blocks and no tool calls were started.
// This can happen with some providers that return no content at all.
if (!state.textStarted && !Array.from(state.tools.values()).some((t) => t.started)) {
log(`[Streaming] Empty response — no text or tool blocks started, injecting error message`);
const emptyIdx = state.curIdx++;
send("content_block_start", {
type: "content_block_start",
index: emptyIdx,
content_block: { type: "text", text: "" },
});
send("content_block_delta", {
type: "content_block_delta",
index: emptyIdx,
delta: {
type: "text_delta",
text: "[Error: The model returned an empty response. Try compacting the conversation or reducing the context size.]",
},
});
send("content_block_stop", { type: "content_block_stop", index: emptyIdx });
}

if (middlewareManager) {
await middlewareManager.afterStreamComplete(target, streamMetadata);
}
Expand Down Expand Up @@ -506,7 +594,9 @@ export function createStreamingResponseHandler(
};
state.tools.set(idx, t);
if (isWebSearchToolCall(restoredName)) {
warnWebSearchUnsupported(restoredName, target);
log(`[Streaming] Web search tool call '${restoredName}' detected — intercepting via SearXNG`);
t.suppressed = true;
t.buffered = true;
}
}
// Only send content_block_start immediately if NOT buffering
Expand Down
114 changes: 114 additions & 0 deletions packages/cli/src/handlers/shared/web-search-executor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/**
* Web search executor — intercepts web_search tool calls from providers
* (z.ai, GLM) and executes them via SearXNG.
*
* When a provider model requests web_search, claudish:
* 1. Extracts the search query from tool arguments
* 2. Calls the local SearXNG instance
* 3. Returns formatted results as a text block in the stream
*
* IMPORTANT: executeWebSearchSync must not block the SSE stream.
* It races SearXNG against a 3-second deadline and returns a
* fallback message immediately if SearXNG is unreachable.
*/

import { log } from "../../logger.js";

const SEARXNG_URL = process.env.SEARXNG_URL || "http://search.myia.io";

export interface SearchResult {
title: string;
url: string;
snippet: string;
}

/**
* Execute a web search via SearXNG. Non-throwing.
*/
async function fetchFromSearXNG(query: string, maxResults = 5): Promise<SearchResult[]> {
const url = `${SEARXNG_URL}/search?q=${encodeURIComponent(query)}&format=json&categories=general`;
log(`[WebSearch] Executing: "${query}" via ${SEARXNG_URL}`);

const response = await fetch(url, {
signal: AbortSignal.timeout(5000),
headers: { Accept: "application/json" },
});

if (!response.ok) {
log(`[WebSearch] SearXNG returned HTTP ${response.status}`);
return [];
}

const data = (await response.json()) as any;
const results: SearchResult[] = (data.results || [])
.slice(0, maxResults)
.map((r: any) => ({
title: r.title || "",
url: r.url || "",
snippet: r.content || r.description || "",
}));

log(`[WebSearch] Got ${results.length} results for "${query}"`);
return results;
}

/**
* Stream-safe web search: races SearXNG against a short deadline.
* Returns formatted results text, never throws, never blocks > deadline.
*/
export async function executeWebSearch(query: string, deadlineMs = 3000): Promise<string> {
try {
const results = await Promise.race([
fetchFromSearXNG(query),
new Promise<null>((resolve) => setTimeout(() => resolve(null), deadlineMs)),
]);

if (results === null) {
log(`[WebSearch] Timed out after ${deadlineMs}ms for "${query}"`);
return `[Web search for "${query}" timed out. SearXNG did not respond within ${deadlineMs / 1000}s.]`;
}

return formatSearchResults(query, results);
} catch (err: any) {
log(`[WebSearch] Error: ${err.message}`);
return `[Web search for "${query}" failed: ${err.message}]`;
}
}

/**
* Format search results as a text block for injection into the stream.
*/
export function formatSearchResults(query: string, results: SearchResult[]): string {
if (results.length === 0) {
return `[Web search for "${query}" returned no results. The search service may be unavailable.]`;
}

const lines = [`[Web search results for "${query}"]\n`];
for (let i = 0; i < results.length; i++) {
const r = results[i];
lines.push(`${i + 1}. **${r.title}**`);
lines.push(` ${r.url}`);
if (r.snippet) {
lines.push(` ${r.snippet}`);
}
lines.push("");
}
return lines.join("\n");
}

/**
* Extract the search query from a web_search tool call's arguments JSON.
*/
export function extractSearchQuery(argsJson: string): string {
try {
const args = JSON.parse(argsJson);
return args.query || args.q || args.search_query || args.keyword || "";
} catch {
// If args aren't valid JSON, try to extract from raw string
const match = argsJson.match(/"query"\s*:\s*"([^"]+)"/);
if (match) return match[1];
const match2 = argsJson.match(/"q"\s*:\s*"([^"]+)"/);
if (match2) return match2[1];
return "";
}
}
136 changes: 136 additions & 0 deletions packages/cli/src/proxy-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,130 @@ import { route, loadRoutingRules } from "./providers/routing-rules.js";
import { createHandlerForProvider } from "./providers/provider-profiles.js";
import { loadCustomEndpoints } from "./providers/custom-endpoints-loader.js";
import { getApiKey, loadConfig } from "./profile-config.js";
import { executeWebSearch } from "./handlers/shared/web-search-executor.js";

/**
* Intercept WebSearch/WebFetch tool calls and execute them via SearXNG instead
* of forwarding to the provider. Returns a streaming response with SearXNG results.
*
* Claude Code's WebSearch tool sends a sub-agent request with a single user message:
* "Perform a web search for the query: <query>"
* We intercept this, execute SearXNG, and return the results as text.
*/
async function interceptWebTools(c: any, body: any): Promise<Response | null> {
const messages = body.messages || [];
const isStreaming = body.stream === true;

// Case 1: Sub-agent web search request (1 message, user role, starts with "Perform a web search")
if (messages.length === 1 && messages[0].role === "user") {
const text = typeof messages[0].content === "string"
? messages[0].content
: Array.isArray(messages[0].content)
? messages[0].content.map((b: any) => b.text || "").join("")
: "";

const searchMatch = text.match(/^Perform a web search for the query:\s*(.+)$/s);
if (searchMatch) {
const query = searchMatch[1].trim();
log(`[WebTools] Intercepted sub-agent web search: "${query}"`);
const results = await executeWebSearch(query, 5000);
return buildTextResponse(body.model || "unknown", results, isStreaming);
}

const fetchMatch = text.match(/^Perform a web fetch for the URL:\s*(.+)$/s);
if (fetchMatch) {
const url = fetchMatch[1].trim();
log(`[WebTools] Intercepted sub-agent web fetch: "${url}"`);
let resultText: string;
try {
const searxngUrl = `${process.env.SEARXNG_URL || "http://search.myia.io"}/search?q=${encodeURIComponent(url)}&format=json&categories=general`;
const resp = await fetch(searxngUrl, { signal: AbortSignal.timeout(5000) });
const data = await resp.json() as any;
const results = (data.results || []).slice(0, 3);
resultText = results.length > 0
? results.map((r: any) => `**${r.title}**\n${r.url}\n${r.content || ""}`).join("\n\n")
: `[No results found for URL: ${url}]`;
} catch (err: any) {
resultText = `[Web fetch for "${url}" failed: ${err.message}]`;
}
return buildTextResponse(body.model || "unknown", resultText, isStreaming);
}
}

return null;
}

function buildTextResponse(model: string, text: string, streaming: boolean): Response {
const encoder = new TextEncoder();
const send = (event: string, data: any) =>
encoder.encode(`event: ${event}\ndata: ${JSON.stringify(data)}\n\n`);

if (!streaming) {
return new Response(JSON.stringify({
id: `msg_webtools_${Date.now()}`,
type: "message",
role: "assistant",
model,
content: [{ type: "text", text }],
stop_reason: "end_turn",
stop_sequence: null,
usage: { input_tokens: 0, output_tokens: 0 },
}), {
headers: {
"Content-Type": "application/json",
"anthropic-version": "2023-06-01",
},
});
}

const stream = new ReadableStream({
start(controller) {
controller.enqueue(send("message_start", {
type: "message_start",
message: {
id: `msg_webtools_${Date.now()}`,
type: "message",
role: "assistant",
model,
content: [],
stop_reason: null,
stop_sequence: null,
usage: { input_tokens: 0, output_tokens: 0 },
},
}));
controller.enqueue(send("content_block_start", {
type: "content_block_start",
index: 0,
content_block: { type: "text", text: "" },
}));
controller.enqueue(send("content_block_delta", {
type: "content_block_delta",
index: 0,
delta: { type: "text_delta", text },
}));
controller.enqueue(send("content_block_stop", {
type: "content_block_stop",
index: 0,
}));
controller.enqueue(send("message_delta", {
type: "message_delta",
delta: { stop_reason: "end_turn", stop_sequence: null },
usage: { input_tokens: 0, output_tokens: 0 },
}));
controller.enqueue(send("message_stop", { type: "message_stop" }));
controller.close();
},
});

return new Response(stream, {
headers: {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
Connection: "keep-alive",
"anthropic-version": "2023-06-01",
},
});
}

export interface ProxyServerOptions {
summarizeTools?: boolean; // Summarize tool descriptions for local models
Expand Down Expand Up @@ -569,6 +693,18 @@ export async function createProxyServer(
app.post("/v1/messages", async (c) => {
try {
const body = await c.req.json();

// Intercept web search/fetch sub-agent requests before handler selection.
// When Claude Code's WebSearch tool fires a sub-agent, the request contains
// a single user message like "Perform a web search for the query: X".
// We intercept it and execute via SearXNG instead of forwarding to the model.
try {
const webToolResponse = await interceptWebTools(c, body);
if (webToolResponse) return webToolResponse;
} catch (e: any) {
log(`[WebTools] Intercept error (falling through to normal handler): ${e.message}`);
}

const handler = await getHandlerForRequest(body.model);

// Route
Expand Down