From 6782a1dfee75e7ceffee5750c8259699dbf23124 Mon Sep 17 00:00:00 2001 From: James Lin Date: Sat, 28 Mar 2026 02:48:58 +0800 Subject: [PATCH 1/2] fix: use native fetch for Ollama embedding to ensure AbortController works Root cause: OpenAI SDK HTTP client does not reliably abort Ollama TCP connections when AbortController.abort() fires in Node.js. This causes stalled sockets that hang until the gateway-level 120s timeout. Fix: Add isOllamaProvider() to detect localhost:11434 endpoints, and embedWithNativeFetch() using Node.js 18+ native fetch instead of the OpenAI SDK. Native fetch properly closes TCP connections on abort. Also adds Test 8 to cjk-recursion-regression.test.mjs with assertion updated to match Embedding provider unreachable error (AliceLJY review fix). Fixes #361. --- src/embedder.ts | 63 +++++++++++++++++++++++++- test/cjk-recursion-regression.test.mjs | 53 ++++++++++++++++++++++ 2 files changed, 115 insertions(+), 1 deletion(-) diff --git a/src/embedder.ts b/src/embedder.ts index bcbbaa76..2905a8c0 100644 --- a/src/embedder.ts +++ b/src/embedder.ts @@ -511,12 +511,73 @@ export class Embedder { return /rate.limit|quota|too many requests|insufficient.*credit|429|503.*overload/i.test(msg); } + /** + * Detect if the configured baseURL points to a local Ollama instance. + * Ollama's HTTP server does not properly handle AbortController signals through + * the OpenAI SDK's HTTP client, causing long-lived sockets that don't close + * when the embedding pipeline times out. For Ollama we use native fetch instead. + */ + private isOllamaProvider(): boolean { + if (!this._baseURL) return false; + return /localhost:11434|127\.0\.0\.1:11434|\/ollama\b/i.test(this._baseURL); + } + + /** + * Call embeddings.create using native fetch (bypasses OpenAI SDK). + * Used exclusively for Ollama endpoints where AbortController must work + * correctly to avoid long-lived stalled sockets. 
+ */ + private async embedWithNativeFetch(payload: any, signal?: AbortSignal): Promise { + if (!this._baseURL) { + throw new Error("embedWithNativeFetch requires a baseURL"); + } + // Ollama's embeddings endpoint is at /v1/embeddings (OpenAI-compatible) + const endpoint = this._baseURL.replace(/\/$/, "") + "/embeddings"; + + const apiKey = this.clients[0]?.apiKey ?? "ollama"; + + const response = await fetch(endpoint, { + method: "POST", + headers: { + "Content-Type": "application/json", + "Authorization": `Bearer ${apiKey}`, + }, + body: JSON.stringify(payload), + signal: signal, + }); + + if (!response.ok) { + const body = await response.text().catch(() => ""); + throw new Error(`Ollama embedding failed: ${response.status} ${response.statusText} — ${body.slice(0, 200)}`); + } + + const data = await response.json(); + return data; // OpenAI-compatible shape: { data: [{ embedding: number[] }] } + } + /** * Call embeddings.create with automatic key rotation on rate-limit errors. * Tries each key in the pool at most once before giving up. * Accepts an optional AbortSignal to support true request cancellation. + * + * For Ollama endpoints, native fetch is used instead of the OpenAI SDK + * because AbortController does not reliably abort Ollama's HTTP connections + * through the SDK's HTTP client on Node.js. 
*/ private async embedWithRetry(payload: any, signal?: AbortSignal): Promise { + // Use native fetch for Ollama to ensure proper AbortController support + if (this.isOllamaProvider()) { + try { + return await this.embedWithNativeFetch(payload, signal); + } catch (error) { + if (error instanceof Error && error.name === 'AbortError') { + throw error; + } + // Ollama errors bubble up without retry (Ollama doesn't rate-limit locally) + throw error; + } + } + const maxAttempts = this.clients.length; let lastError: Error | undefined; @@ -530,7 +591,7 @@ export class Embedder { if (error instanceof Error && error.name === 'AbortError') { throw error; } - + lastError = error instanceof Error ? error : new Error(String(error)); if (this.isRateLimitError(error) && attempt < maxAttempts - 1) { diff --git a/test/cjk-recursion-regression.test.mjs b/test/cjk-recursion-regression.test.mjs index 63ea8377..2cda23c5 100644 --- a/test/cjk-recursion-regression.test.mjs +++ b/test/cjk-recursion-regression.test.mjs @@ -236,6 +236,58 @@ async function testBatchEmbeddingStillWorks() { console.log(" PASSED\n"); } +async function testOllamaAbortWithNativeFetch() { + console.log("Test 8: Ollama endpoint uses native fetch and abort propagates correctly (PR354 fix)"); + + let requestAborted = false; + let requestDestroyed = false; + + await withServer(async (_payload, req, res) => { + // Simulate slow Ollama response ??takes 11 seconds + await new Promise((resolve) => setTimeout(resolve, 11_000)); + if (req.aborted || req.destroyed) { + requestAborted = req.aborted; + requestDestroyed = req.destroyed; + return; + } + const dims = 1024; + res.writeHead(200, { "content-type": "application/json" }); + res.end(JSON.stringify({ data: [{ embedding: Array.from({ length: dims }, () => 0.1), index: 0 }] })); + }, async ({ baseURL }) => { + // Use an unreachable port + localhost so isOllamaProvider() returns true + // (URL contains 127.0.0.1:11434) but nothing actually listens there. 
+ // This forces native fetch to properly reject, validating the Ollama path. + const ollamaBaseURL = "http://127.0.0.1:11434/v1"; + const embedder = new Embedder({ + provider: "openai-compatible", + apiKey: "test-key", + model: "mxbai-embed-large", + baseURL: ollamaBaseURL, + dimensions: 1024, + }); + + // Verify isOllamaProvider is true (native fetch path) + assert.equal(embedder.isOllamaProvider ? embedder.isOllamaProvider() : false, true, + "isOllamaProvider should return true for localhost:11434"); + + // Call embedPassage and verify it rejects via native fetch path + // (real Ollama at :11434 returns 404, which triggers our error handler) + let errorCaught; + try { + await embedder.embedPassage("ollama abort test probe"); + } catch (e) { + errorCaught = e; + } + assert.ok(errorCaught instanceof Error, "embedPassage should reject when Ollama returns an error"); + assert.ok( + /ollama embedding failed|404|Failed to generate embedding from Ollama|Embedding provider unreachable/i.test(errorCaught.message), + "Error should come from Ollama native fetch path, got: " + errorCaught.message + ); + }); + + console.log(" PASSED\n"); +} + async function run() { console.log("Running regression tests for PR #238...\n"); await testSingleChunkFallbackTerminates(); @@ -245,6 +297,7 @@ async function run() { await testSmallContextChunking(); await testTimeoutAbortPropagation(); await testBatchEmbeddingStillWorks(); + await testOllamaAbortWithNativeFetch(); console.log("All regression tests passed!"); } From a26fd7019f6a9ece92e2fc1b1ee950ab4b636cd6 Mon Sep 17 00:00:00 2001 From: James Date: Sat, 28 Mar 2026 13:55:45 +0800 Subject: [PATCH 2/2] fix(embedder): thread external AbortSignal + fix testOllamaAbort regression --- src/embedder.ts | 47 ++++++++---- test/cjk-recursion-regression.test.mjs | 95 +++++++++++++++--------- test/embedder-ollama-abort.test.mjs | 99 ++++++++++++++++++++++++++ 3 files changed, 196 insertions(+), 45 deletions(-) create mode 100644 
test/embedder-ollama-abort.test.mjs diff --git a/src/embedder.ts b/src/embedder.ts index 2905a8c0..497f68b7 100644 --- a/src/embedder.ts +++ b/src/embedder.ts @@ -621,10 +621,30 @@ export class Embedder { } /** Wrap a single embedding operation with a global timeout via AbortSignal. */ - private withTimeout(promiseFactory: (signal: AbortSignal) => Promise, _label: string): Promise { + private withTimeout(promiseFactory: (signal: AbortSignal) => Promise, _label: string, externalSignal?: AbortSignal): Promise { const controller = new AbortController(); const timeoutId = setTimeout(() => controller.abort(), EMBED_TIMEOUT_MS); - return promiseFactory(controller.signal).finally(() => clearTimeout(timeoutId)); + + // If caller passes an external signal, merge it with the internal timeout controller. + // Either signal aborting will cancel the promise. + let unsubscribe: (() => void) | undefined; + if (externalSignal) { + if (externalSignal.aborted) { + clearTimeout(timeoutId); + return Promise.reject(externalSignal.reason ?? 
new Error("aborted")); + } + const handler = () => { + controller.abort(); + clearTimeout(timeoutId); + }; + externalSignal.addEventListener("abort", handler, { once: true }); + unsubscribe = () => externalSignal.removeEventListener("abort", handler); + } + + return promiseFactory(controller.signal).finally(() => { + clearTimeout(timeoutId); + unsubscribe?.(); + }); } // -------------------------------------------------------------------------- @@ -650,24 +670,24 @@ export class Embedder { // Task-aware API // -------------------------------------------------------------------------- - async embedQuery(text: string): Promise { - return this.withTimeout((signal) => this.embedSingle(text, this._taskQuery, 0, signal), "embedQuery"); + async embedQuery(text: string, signal?: AbortSignal): Promise { + return this.withTimeout((sig) => this.embedSingle(text, this._taskQuery, 0, sig), "embedQuery", signal); } - async embedPassage(text: string): Promise { - return this.withTimeout((signal) => this.embedSingle(text, this._taskPassage, 0, signal), "embedPassage"); + async embedPassage(text: string, signal?: AbortSignal): Promise { + return this.withTimeout((sig) => this.embedSingle(text, this._taskPassage, 0, sig), "embedPassage", signal); } // Note: embedBatchQuery/embedBatchPassage are NOT wrapped with withTimeout because // they handle multiple texts in a single API call. The timeout would fire after // EMBED_TIMEOUT_MS regardless of how many texts succeed. Individual text embedding // within the batch is protected by the SDK's own timeout handling. 
- async embedBatchQuery(texts: string[]): Promise { - return this.embedMany(texts, this._taskQuery); + async embedBatchQuery(texts: string[], signal?: AbortSignal): Promise { + return this.embedMany(texts, this._taskQuery, signal); } - async embedBatchPassage(texts: string[]): Promise { - return this.embedMany(texts, this._taskPassage); + async embedBatchPassage(texts: string[], signal?: AbortSignal): Promise { + return this.embedMany(texts, this._taskPassage, signal); } // -------------------------------------------------------------------------- @@ -836,7 +856,7 @@ export class Embedder { } } - private async embedMany(texts: string[], task?: string): Promise { + private async embedMany(texts: string[], task?: string, signal?: AbortSignal): Promise { if (!texts || texts.length === 0) { return []; } @@ -858,7 +878,8 @@ export class Embedder { try { const response = await this.embedWithRetry( - this.buildPayload(validTexts, task) + this.buildPayload(validTexts, task), + signal, ); // Create result array with proper length @@ -899,7 +920,7 @@ export class Embedder { // Embed all chunks in parallel, then average. 
const embeddings = await Promise.all( - chunkResult.chunks.map((chunk) => this.embedSingle(chunk, task)) + chunkResult.chunks.map((chunk) => this.embedSingle(chunk, task, 0, signal)) ); const avgEmbedding = embeddings.reduce( diff --git a/test/cjk-recursion-regression.test.mjs b/test/cjk-recursion-regression.test.mjs index 2cda23c5..247e7c14 100644 --- a/test/cjk-recursion-regression.test.mjs +++ b/test/cjk-recursion-regression.test.mjs @@ -237,55 +237,86 @@ async function testBatchEmbeddingStillWorks() { } async function testOllamaAbortWithNativeFetch() { - console.log("Test 8: Ollama endpoint uses native fetch and abort propagates correctly (PR354 fix)"); - - let requestAborted = false; - let requestDestroyed = false; - - await withServer(async (_payload, req, res) => { - // Simulate slow Ollama response ??takes 11 seconds - await new Promise((resolve) => setTimeout(resolve, 11_000)); - if (req.aborted || req.destroyed) { - requestAborted = req.aborted; - requestDestroyed = req.destroyed; + console.log("Test 8: Ollama native fetch respects external AbortSignal (PR354 fix regression)"); + + // Author's analysis: the previous test used withServer() on a random port but hardcoded + // http://127.0.0.1:11434/v1 for the Embedder — so the request always hit "connection refused" + // immediately and never touched the slow handler. This test fixes that by: + // 1. Binding the mock server directly to 127.0.0.1:11434 (so isOllamaProvider() is true) + // 2. Delaying the response by 5 seconds + // 3. Passing an external AbortSignal that fires after 2 seconds + // 4. 
Asserting total time ≈ 2s (proving abort interrupted the slow request) + + const SLOW_DELAY_MS = 5_000; + const ABORT_AFTER_MS = 2_000; + const DIMS = 1024; + + const server = http.createServer((req, res) => { + if (req.url === "/v1/embeddings" && req.method === "POST") { + const timer = setTimeout(() => { + if (res.writableEnded) return; // already aborted + res.writeHead(200, { "Content-Type": "application/json" }); + res.end(JSON.stringify({ + data: [{ embedding: Array.from({ length: DIMS }, () => 0.1), index: 0 }] + })); + }, SLOW_DELAY_MS); + req.on("aborted", () => clearTimeout(timer)); return; } - const dims = 1024; - res.writeHead(200, { "content-type": "application/json" }); - res.end(JSON.stringify({ data: [{ embedding: Array.from({ length: dims }, () => 0.1), index: 0 }] })); - }, async ({ baseURL }) => { - // Use an unreachable port + localhost so isOllamaProvider() returns true - // (URL contains 127.0.0.1:11434) but nothing actually listens there. - // This forces native fetch to properly reject, validating the Ollama path. - const ollamaBaseURL = "http://127.0.0.1:11434/v1"; + res.writeHead(404); + res.end("not found"); + }); + + // Bind directly to 127.0.0.1:11434 so isOllamaProvider() returns true + await new Promise((resolve) => server.listen(11434, "127.0.0.1", resolve)); + + try { const embedder = new Embedder({ provider: "openai-compatible", apiKey: "test-key", model: "mxbai-embed-large", - baseURL: ollamaBaseURL, - dimensions: 1024, + baseURL: "http://127.0.0.1:11434/v1", + dimensions: DIMS, }); - // Verify isOllamaProvider is true (native fetch path) - assert.equal(embedder.isOllamaProvider ? embedder.isOllamaProvider() : false, true, - "isOllamaProvider should return true for localhost:11434"); + assert.equal( + embedder.isOllamaProvider ? 
embedder.isOllamaProvider() : false, + true, + "isOllamaProvider should return true for 127.0.0.1:11434" + ); + + const start = Date.now(); + const controller = new AbortController(); + const abortTimer = setTimeout(() => controller.abort(), ABORT_AFTER_MS); - // Call embedPassage and verify it rejects via native fetch path - // (real Ollama at :11434 returns 404, which triggers our error handler) let errorCaught; try { - await embedder.embedPassage("ollama abort test probe"); + // Pass external AbortSignal — should interrupt the 5-second slow response at ~2s + await embedder.embedPassage("abort test probe", controller.signal); } catch (e) { errorCaught = e; } - assert.ok(errorCaught instanceof Error, "embedPassage should reject when Ollama returns an error"); + + clearTimeout(abortTimer); + const elapsed = Date.now() - start; + + assert.ok(errorCaught, "embedPassage should throw (abort or timeout)"); + const msg = errorCaught instanceof Error ? errorCaught.message : String(errorCaught); assert.ok( - /ollama embedding failed|404|Failed to generate embedding from Ollama|Embedding provider unreachable/i.test(errorCaught.message), - "Error should come from Ollama native fetch path, got: " + errorCaught.message + /timed out|abort|ollama|ECONNREFUSED/i.test(msg), + `Expected abort/timeout error, got: ${msg}` ); - }); - console.log(" PASSED\n"); + // If abort works: elapsed ≈ 2000ms. If abort fails: elapsed ≈ 5000ms. 
+ assert.ok( + elapsed < SLOW_DELAY_MS * 0.75, + `Expected abort ~${ABORT_AFTER_MS}ms, got ${elapsed}ms — abort did NOT interrupt slow request` + ); + + console.log(` PASSED (aborted in ${elapsed}ms < ${SLOW_DELAY_MS}ms threshold)\n`); + } finally { + await new Promise((resolve) => server.close(resolve)); + } } async function run() { diff --git a/test/embedder-ollama-abort.test.mjs b/test/embedder-ollama-abort.test.mjs new file mode 100644 index 00000000..73a55f2d --- /dev/null +++ b/test/embedder-ollama-abort.test.mjs @@ -0,0 +1,99 @@ +import assert from "node:assert/strict"; +import http from "node:http"; +import { test } from "node:test"; + +import jitiFactory from "jiti"; + +const jiti = jitiFactory(import.meta.url, { interopDefault: true }); +const { Embedder } = jiti("../src/embedder.ts"); + +/** + * Test: Ollama native fetch correctly aborts a slow HTTP request. + * + * Root cause (Issue #361 / PR #383): + * OpenAI SDK's HTTP client does not reliably abort Ollama TCP connections + * when AbortController.abort() fires in Node.js, causing stalled sockets + * that hang until the gateway-level timeout. + * + * Fix: For Ollama endpoints (localhost:11434), use Node.js native fetch + * instead of the OpenAI SDK. Native fetch properly closes TCP on abort. + * + * This test verifies the fix by: + * 1. Mocking a slow Ollama server on 127.0.0.1:11434 (5s delay) + * 2. Calling embedPassage with an AbortSignal that fires after 2s + * 3. Asserting total time ≈ 2s (not 5s) — proving abort interrupted the request + * + * Note: The mock server is bound to 127.0.0.1:11434 (not a random port) so that + * isOllamaProvider() returns true and the native fetch path is exercised. 
+ */ +test("Ollama embedWithNativeFetch aborts slow request within expected time", async () => { + const SLOW_DELAY_MS = 5_000; + const ABORT_AFTER_MS = 2_000; + const DIMS = 1024; + + const server = http.createServer((req, res) => { + if (req.url === "/v1/embeddings" && req.method === "POST") { + const timer = setTimeout(() => { + if (res.writableEnded) return; // already aborted + res.writeHead(200, { "Content-Type": "application/json" }); + res.end(JSON.stringify({ + data: [{ embedding: Array.from({ length: DIMS }, () => 0.1), index: 0 }] + })); + }, SLOW_DELAY_MS); + req.on("aborted", () => clearTimeout(timer)); + return; + } + res.writeHead(404); + res.end("not found"); + }); + + // Bind to 127.0.0.1:11434 so isOllamaProvider() returns true → native fetch path + await new Promise((resolve) => server.listen(11434, "127.0.0.1", resolve)); + + try { + const embedder = new Embedder({ + provider: "openai-compatible", + apiKey: "test-key", + model: "mxbai-embed-large", + baseURL: "http://127.0.0.1:11434/v1", + dimensions: DIMS, + }); + + assert.ok( + embedder.isOllamaProvider(), + "isOllamaProvider() should return true for http://127.0.0.1:11434", + ); + + const start = Date.now(); + const controller = new AbortController(); + const abortTimer = setTimeout(() => controller.abort(), ABORT_AFTER_MS); + + let errorCaught; + try { + await embedder.embedPassage("abort test probe", controller.signal); + assert.fail("embedPassage should have thrown"); + } catch (e) { + errorCaught = e; + } + + clearTimeout(abortTimer); + const elapsed = Date.now() - start; + + assert.ok(errorCaught, "embedPassage should have thrown (abort or timeout)"); + const msg = errorCaught instanceof Error ? errorCaught.message : String(errorCaught); + assert.ok( + /timed out|abort|ollama/i.test(msg), + `Expected abort/timeout/Ollama error, got: ${msg}`, + ); + + // Elapsed time must be close to ABORT_AFTER_MS, NOT SLOW_DELAY_MS. + // If abort worked: elapsed ≈ 2000ms. 
+ // If abort failed: elapsed ≈ 5000ms (waited for slow response). + assert.ok( + elapsed < SLOW_DELAY_MS * 0.75, + `Expected abort ~${ABORT_AFTER_MS}ms, got ${elapsed}ms — abort did NOT interrupt slow request`, + ); + } finally { + await new Promise((resolve) => server.close(resolve)); + } +});