diff --git a/src/embedder.ts b/src/embedder.ts
index bcbbaa7..497f68b 100644
--- a/src/embedder.ts
+++ b/src/embedder.ts
@@ -511,12 +511,73 @@ export class Embedder {
     return /rate.limit|quota|too many requests|insufficient.*credit|429|503.*overload/i.test(msg);
   }
 
+  /**
+   * Detect if the configured baseURL points to a local Ollama instance.
+   * Ollama's HTTP server does not properly handle AbortController signals through
+   * the OpenAI SDK's HTTP client, causing long-lived sockets that don't close
+   * when the embedding pipeline times out. For Ollama we use native fetch instead.
+   */
+  private isOllamaProvider(): boolean {
+    if (!this._baseURL) return false;
+    return /localhost:11434|127\.0\.0\.1:11434|\/ollama\b/i.test(this._baseURL);
+  }
+
+  /**
+   * Call embeddings.create using native fetch (bypasses OpenAI SDK).
+   * Used exclusively for Ollama endpoints where AbortController must work
+   * correctly to avoid long-lived stalled sockets.
+   */
+  private async embedWithNativeFetch(payload: any, signal?: AbortSignal): Promise<any> {
+    if (!this._baseURL) {
+      throw new Error("embedWithNativeFetch requires a baseURL");
+    }
+    // Ollama's embeddings endpoint is at /v1/embeddings (OpenAI-compatible)
+    const endpoint = this._baseURL.replace(/\/$/, "") + "/embeddings";
+
+    const apiKey = this.clients[0]?.apiKey ?? "ollama";
+
+    const response = await fetch(endpoint, {
+      method: "POST",
+      headers: {
+        "Content-Type": "application/json",
+        "Authorization": `Bearer ${apiKey}`,
+      },
+      body: JSON.stringify(payload),
+      signal: signal,
+    });
+
+    if (!response.ok) {
+      const body = await response.text().catch(() => "");
+      throw new Error(`Ollama embedding failed: ${response.status} ${response.statusText} — ${body.slice(0, 200)}`);
+    }
+
+    const data = await response.json();
+    return data; // OpenAI-compatible shape: { data: [{ embedding: number[] }] }
+  }
+
   /**
    * Call embeddings.create with automatic key rotation on rate-limit errors.
    * Tries each key in the pool at most once before giving up.
    * Accepts an optional AbortSignal to support true request cancellation.
+   *
+   * For Ollama endpoints, native fetch is used instead of the OpenAI SDK
+   * because AbortController does not reliably abort Ollama's HTTP connections
+   * through the SDK's HTTP client on Node.js.
    */
   private async embedWithRetry(payload: any, signal?: AbortSignal): Promise<any> {
+    // Use native fetch for Ollama to ensure proper AbortController support
+    if (this.isOllamaProvider()) {
+      try {
+        return await this.embedWithNativeFetch(payload, signal);
+      } catch (error) {
+        if (error instanceof Error && error.name === 'AbortError') {
+          throw error;
+        }
+        // Ollama errors bubble up without retry (Ollama doesn't rate-limit locally)
+        throw error;
+      }
+    }
+
     const maxAttempts = this.clients.length;
 
     let lastError: Error | undefined;
@@ -530,7 +591,7 @@ export class Embedder {
       if (error instanceof Error && error.name === 'AbortError') {
         throw error;
       }
-
+
       lastError = error instanceof Error ? error : new Error(String(error));
 
       if (this.isRateLimitError(error) && attempt < maxAttempts - 1) {
@@ -560,10 +621,30 @@ export class Embedder {
   }
 
   /** Wrap a single embedding operation with a global timeout via AbortSignal. */
-  private withTimeout<T>(promiseFactory: (signal: AbortSignal) => Promise<T>, _label: string): Promise<T> {
+  private withTimeout<T>(promiseFactory: (signal: AbortSignal) => Promise<T>, _label: string, externalSignal?: AbortSignal): Promise<T> {
     const controller = new AbortController();
     const timeoutId = setTimeout(() => controller.abort(), EMBED_TIMEOUT_MS);
-    return promiseFactory(controller.signal).finally(() => clearTimeout(timeoutId));
+
+    // If caller passes an external signal, merge it with the internal timeout controller.
+    // Either signal aborting will cancel the promise.
+    let unsubscribe: (() => void) | undefined;
+    if (externalSignal) {
+      if (externalSignal.aborted) {
+        clearTimeout(timeoutId);
+        return Promise.reject(externalSignal.reason ?? new Error("aborted"));
+      }
+      const handler = () => {
+        controller.abort();
+        clearTimeout(timeoutId);
+      };
+      externalSignal.addEventListener("abort", handler, { once: true });
+      unsubscribe = () => externalSignal.removeEventListener("abort", handler);
+    }
+
+    return promiseFactory(controller.signal).finally(() => {
+      clearTimeout(timeoutId);
+      unsubscribe?.();
+    });
   }
 
   // --------------------------------------------------------------------------
@@ -589,24 +670,24 @@ export class Embedder {
   // Task-aware API
   // --------------------------------------------------------------------------
 
-  async embedQuery(text: string): Promise<number[]> {
-    return this.withTimeout((signal) => this.embedSingle(text, this._taskQuery, 0, signal), "embedQuery");
+  async embedQuery(text: string, signal?: AbortSignal): Promise<number[]> {
+    return this.withTimeout((sig) => this.embedSingle(text, this._taskQuery, 0, sig), "embedQuery", signal);
   }
 
-  async embedPassage(text: string): Promise<number[]> {
-    return this.withTimeout((signal) => this.embedSingle(text, this._taskPassage, 0, signal), "embedPassage");
+  async embedPassage(text: string, signal?: AbortSignal): Promise<number[]> {
+    return this.withTimeout((sig) => this.embedSingle(text, this._taskPassage, 0, sig), "embedPassage", signal);
   }
 
   // Note: embedBatchQuery/embedBatchPassage are NOT wrapped with withTimeout because
   // they handle multiple texts in a single API call. The timeout would fire after
   // EMBED_TIMEOUT_MS regardless of how many texts succeed. Individual text embedding
   // within the batch is protected by the SDK's own timeout handling.
-  async embedBatchQuery(texts: string[]): Promise<number[][]> {
-    return this.embedMany(texts, this._taskQuery);
+  async embedBatchQuery(texts: string[], signal?: AbortSignal): Promise<number[][]> {
+    return this.embedMany(texts, this._taskQuery, signal);
   }
 
-  async embedBatchPassage(texts: string[]): Promise<number[][]> {
-    return this.embedMany(texts, this._taskPassage);
+  async embedBatchPassage(texts: string[], signal?: AbortSignal): Promise<number[][]> {
+    return this.embedMany(texts, this._taskPassage, signal);
   }
 
   // --------------------------------------------------------------------------
@@ -775,7 +856,7 @@ export class Embedder {
     }
   }
 
-  private async embedMany(texts: string[], task?: string): Promise<number[][]> {
+  private async embedMany(texts: string[], task?: string, signal?: AbortSignal): Promise<number[][]> {
     if (!texts || texts.length === 0) {
       return [];
     }
@@ -797,7 +878,8 @@ export class Embedder {
 
     try {
       const response = await this.embedWithRetry(
-        this.buildPayload(validTexts, task)
+        this.buildPayload(validTexts, task),
+        signal,
       );
 
       // Create result array with proper length
@@ -838,7 +920,7 @@ export class Embedder {
 
       // Embed all chunks in parallel, then average.
       const embeddings = await Promise.all(
-        chunkResult.chunks.map((chunk) => this.embedSingle(chunk, task))
+        chunkResult.chunks.map((chunk) => this.embedSingle(chunk, task, 0, signal))
       );
 
       const avgEmbedding = embeddings.reduce(
diff --git a/test/cjk-recursion-regression.test.mjs b/test/cjk-recursion-regression.test.mjs
index 63ea837..247e7c1 100644
--- a/test/cjk-recursion-regression.test.mjs
+++ b/test/cjk-recursion-regression.test.mjs
@@ -236,6 +236,89 @@ async function testBatchEmbeddingStillWorks() {
   console.log(" PASSED\n");
 }
 
+async function testOllamaAbortWithNativeFetch() {
+  console.log("Test 8: Ollama native fetch respects external AbortSignal (PR354 fix regression)");
+
+  // Author's analysis: the previous test used withServer() on a random port but hardcoded
+  // http://127.0.0.1:11434/v1 for the Embedder — so the request always hit "connection refused"
+  // immediately and never touched the slow handler. This test fixes that by:
+  // 1. Binding the mock server directly to 127.0.0.1:11434 (so isOllamaProvider() is true)
+  // 2. Delaying the response by 5 seconds
+  // 3. Passing an external AbortSignal that fires after 2 seconds
+  // 4.
+  //    Asserting total time ≈ 2s (proving abort interrupted the slow request)
+
+  const SLOW_DELAY_MS = 5_000;
+  const ABORT_AFTER_MS = 2_000;
+  const DIMS = 1024;
+
+  const server = http.createServer((req, res) => {
+    if (req.url === "/v1/embeddings" && req.method === "POST") {
+      const timer = setTimeout(() => {
+        if (res.writableEnded) return; // already aborted
+        res.writeHead(200, { "Content-Type": "application/json" });
+        res.end(JSON.stringify({
+          data: [{ embedding: Array.from({ length: DIMS }, () => 0.1), index: 0 }]
+        }));
+      }, SLOW_DELAY_MS);
+      req.on("aborted", () => clearTimeout(timer));
+      return;
+    }
+    res.writeHead(404);
+    res.end("not found");
+  });
+
+  // Bind directly to 127.0.0.1:11434 so isOllamaProvider() returns true
+  await new Promise((resolve) => server.listen(11434, "127.0.0.1", resolve));
+
+  try {
+    const embedder = new Embedder({
+      provider: "openai-compatible",
+      apiKey: "test-key",
+      model: "mxbai-embed-large",
+      baseURL: "http://127.0.0.1:11434/v1",
+      dimensions: DIMS,
+    });
+
+    assert.equal(
+      embedder.isOllamaProvider ? embedder.isOllamaProvider() : false,
+      true,
+      "isOllamaProvider should return true for 127.0.0.1:11434"
+    );
+
+    const start = Date.now();
+    const controller = new AbortController();
+    const abortTimer = setTimeout(() => controller.abort(), ABORT_AFTER_MS);
+
+    let errorCaught;
+    try {
+      // Pass external AbortSignal — should interrupt the 5-second slow response at ~2s
+      await embedder.embedPassage("abort test probe", controller.signal);
+    } catch (e) {
+      errorCaught = e;
+    }
+
+    clearTimeout(abortTimer);
+    const elapsed = Date.now() - start;
+
+    assert.ok(errorCaught, "embedPassage should throw (abort or timeout)");
+    const msg = errorCaught instanceof Error ? errorCaught.message : String(errorCaught);
+    assert.ok(
+      /timed out|abort|ollama|ECONNREFUSED/i.test(msg),
+      `Expected abort/timeout error, got: ${msg}`
+    );
+
+    // If abort works: elapsed ≈ 2000ms. If abort fails: elapsed ≈ 5000ms.
+    assert.ok(
+      elapsed < SLOW_DELAY_MS * 0.75,
+      `Expected abort ~${ABORT_AFTER_MS}ms, got ${elapsed}ms — abort did NOT interrupt slow request`
+    );
+
+    console.log(` PASSED (aborted in ${elapsed}ms < ${SLOW_DELAY_MS}ms threshold)\n`);
+  } finally {
+    await new Promise((resolve) => server.close(resolve));
+  }
+}
+
 async function run() {
   console.log("Running regression tests for PR #238...\n");
   await testSingleChunkFallbackTerminates();
@@ -245,6 +328,7 @@ async function run() {
   await testSmallContextChunking();
   await testTimeoutAbortPropagation();
   await testBatchEmbeddingStillWorks();
+  await testOllamaAbortWithNativeFetch();
 
   console.log("All regression tests passed!");
 }
diff --git a/test/embedder-ollama-abort.test.mjs b/test/embedder-ollama-abort.test.mjs
new file mode 100644
index 0000000..73a55f2
--- /dev/null
+++ b/test/embedder-ollama-abort.test.mjs
@@ -0,0 +1,99 @@
+import assert from "node:assert/strict";
+import http from "node:http";
+import { test } from "node:test";
+
+import jitiFactory from "jiti";
+
+const jiti = jitiFactory(import.meta.url, { interopDefault: true });
+const { Embedder } = jiti("../src/embedder.ts");
+
+/**
+ * Test: Ollama native fetch correctly aborts a slow HTTP request.
+ *
+ * Root cause (Issue #361 / PR #383):
+ * OpenAI SDK's HTTP client does not reliably abort Ollama TCP connections
+ * when AbortController.abort() fires in Node.js, causing stalled sockets
+ * that hang until the gateway-level timeout.
+ *
+ * Fix: For Ollama endpoints (localhost:11434), use Node.js native fetch
+ * instead of the OpenAI SDK. Native fetch properly closes TCP on abort.
+ *
+ * This test verifies the fix by:
+ * 1. Mocking a slow Ollama server on 127.0.0.1:11434 (5s delay)
+ * 2. Calling embedPassage with an AbortSignal that fires after 2s
+ * 3.
+ *    Asserting total time ≈ 2s (not 5s) — proving abort interrupted the request
+ *
+ * Note: The mock server is bound to 127.0.0.1:11434 (not a random port) so that
+ * isOllamaProvider() returns true and the native fetch path is exercised.
+ */
+test("Ollama embedWithNativeFetch aborts slow request within expected time", async () => {
+  const SLOW_DELAY_MS = 5_000;
+  const ABORT_AFTER_MS = 2_000;
+  const DIMS = 1024;
+
+  const server = http.createServer((req, res) => {
+    if (req.url === "/v1/embeddings" && req.method === "POST") {
+      const timer = setTimeout(() => {
+        if (res.writableEnded) return; // already aborted
+        res.writeHead(200, { "Content-Type": "application/json" });
+        res.end(JSON.stringify({
+          data: [{ embedding: Array.from({ length: DIMS }, () => 0.1), index: 0 }]
+        }));
+      }, SLOW_DELAY_MS);
+      req.on("aborted", () => clearTimeout(timer));
+      return;
+    }
+    res.writeHead(404);
+    res.end("not found");
+  });
+
+  // Bind to 127.0.0.1:11434 so isOllamaProvider() returns true → native fetch path
+  await new Promise((resolve) => server.listen(11434, "127.0.0.1", resolve));
+
+  try {
+    const embedder = new Embedder({
+      provider: "openai-compatible",
+      apiKey: "test-key",
+      model: "mxbai-embed-large",
+      baseURL: "http://127.0.0.1:11434/v1",
+      dimensions: DIMS,
+    });
+
+    assert.ok(
+      embedder.isOllamaProvider(),
+      "isOllamaProvider() should return true for http://127.0.0.1:11434",
+    );
+
+    const start = Date.now();
+    const controller = new AbortController();
+    const abortTimer = setTimeout(() => controller.abort(), ABORT_AFTER_MS);
+
+    let errorCaught;
+    try {
+      await embedder.embedPassage("abort test probe", controller.signal);
+      assert.fail("embedPassage should have thrown");
+    } catch (e) {
+      errorCaught = e;
+    }
+
+    clearTimeout(abortTimer);
+    const elapsed = Date.now() - start;
+
+    assert.ok(errorCaught, "embedPassage should have thrown (abort or timeout)");
+    const msg = errorCaught instanceof Error ?
+      errorCaught.message : String(errorCaught);
+    assert.ok(
+      /timed out|abort|ollama/i.test(msg),
+      `Expected abort/timeout/Ollama error, got: ${msg}`,
+    );
+
+    // Elapsed time must be close to ABORT_AFTER_MS, NOT SLOW_DELAY_MS.
+    // If abort worked: elapsed ≈ 2000ms.
+    // If abort failed: elapsed ≈ 5000ms (waited for slow response).
+    assert.ok(
+      elapsed < SLOW_DELAY_MS * 0.75,
+      `Expected abort ~${ABORT_AFTER_MS}ms, got ${elapsed}ms — abort did NOT interrupt slow request`,
+    );
+  } finally {
+    await new Promise((resolve) => server.close(resolve));
+  }
+});