Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 96 additions & 14 deletions src/embedder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -511,12 +511,73 @@ export class Embedder {
return /rate.limit|quota|too many requests|insufficient.*credit|429|503.*overload/i.test(msg);
}

/**
 * Determine whether the configured baseURL targets a local Ollama server.
 * Ollama's HTTP server does not properly honor AbortController signals when
 * driven through the OpenAI SDK's HTTP client, leaving long-lived sockets
 * open after the embedding pipeline times out — so Ollama endpoints are
 * routed through native fetch instead.
 */
private isOllamaProvider(): boolean {
const url = this._baseURL;
if (!url) {
return false;
}
// Matches the default Ollama port on either localhost spelling, or an
// explicit /ollama path segment in a proxied URL.
const ollamaEndpoint = /localhost:11434|127\.0\.0\.1:11434|\/ollama\b/i;
return ollamaEndpoint.test(url);
}

/**
 * Call the embeddings endpoint using native fetch (bypasses the OpenAI SDK).
 * Used exclusively for Ollama endpoints, where AbortController must work
 * correctly to avoid long-lived stalled sockets.
 *
 * @param payload OpenAI-compatible embeddings request body.
 * @param signal  Optional abort signal; aborting rejects the fetch promptly.
 * @returns Parsed OpenAI-compatible response: { data: [{ embedding: number[] }] }.
 * @throws Error when no baseURL is configured or the server answers non-2xx.
 */
private async embedWithNativeFetch(payload: any, signal?: AbortSignal): Promise<any> {
if (!this._baseURL) {
throw new Error("embedWithNativeFetch requires a baseURL");
}
// Ollama's embeddings endpoint is at <baseURL>/embeddings; the configured
// baseURL is expected to already end in /v1 (OpenAI-compatible layout).
const endpoint = this._baseURL.replace(/\/$/, "") + "/embeddings";

// Ollama ignores the key itself, but send one so auth-expecting proxies still work.
const apiKey = this.clients[0]?.apiKey ?? "ollama";

const response = await fetch(endpoint, {
method: "POST",
headers: {
"Content-Type": "application/json",
"Authorization": `Bearer ${apiKey}`,
},
body: JSON.stringify(payload),
signal: signal,
});

if (!response.ok) {
const body = await response.text().catch(() => "");
// Fix: the separator here was mojibake ("??" from a mangled dash); use a
// plain separator and truncate the body to keep logs readable.
throw new Error(`Ollama embedding failed: ${response.status} ${response.statusText} - ${body.slice(0, 200)}`);
}

const data = await response.json();
return data; // OpenAI-compatible shape: { data: [{ embedding: number[] }] }
}

/**
* Call embeddings.create with automatic key rotation on rate-limit errors.
* Tries each key in the pool at most once before giving up.
* Accepts an optional AbortSignal to support true request cancellation.
*
* For Ollama endpoints, native fetch is used instead of the OpenAI SDK
* because AbortController does not reliably abort Ollama's HTTP connections
* through the SDK's HTTP client on Node.js.
*/
private async embedWithRetry(payload: any, signal?: AbortSignal): Promise<any> {
// Use native fetch for Ollama to ensure proper AbortController support
if (this.isOllamaProvider()) {
try {
return await this.embedWithNativeFetch(payload, signal);
} catch (error) {
if (error instanceof Error && error.name === 'AbortError') {
throw error;
}
// Ollama errors bubble up without retry (Ollama doesn't rate-limit locally)
throw error;
}
}

const maxAttempts = this.clients.length;
let lastError: Error | undefined;

Expand All @@ -530,7 +591,7 @@ export class Embedder {
if (error instanceof Error && error.name === 'AbortError') {
throw error;
}

lastError = error instanceof Error ? error : new Error(String(error));

if (this.isRateLimitError(error) && attempt < maxAttempts - 1) {
Expand Down Expand Up @@ -560,10 +621,30 @@ export class Embedder {
}

/** Wrap a single embedding operation with a global timeout via AbortSignal. */
private withTimeout<T>(promiseFactory: (signal: AbortSignal) => Promise<T>, _label: string): Promise<T> {
private withTimeout<T>(promiseFactory: (signal: AbortSignal) => Promise<T>, _label: string, externalSignal?: AbortSignal): Promise<T> {
const controller = new AbortController();
const timeoutId = setTimeout(() => controller.abort(), EMBED_TIMEOUT_MS);
return promiseFactory(controller.signal).finally(() => clearTimeout(timeoutId));

// If caller passes an external signal, merge it with the internal timeout controller.
// Either signal aborting will cancel the promise.
let unsubscribe: (() => void) | undefined;
if (externalSignal) {
if (externalSignal.aborted) {
clearTimeout(timeoutId);
return Promise.reject(externalSignal.reason ?? new Error("aborted"));
}
const handler = () => {
controller.abort();
clearTimeout(timeoutId);
};
externalSignal.addEventListener("abort", handler, { once: true });
unsubscribe = () => externalSignal.removeEventListener("abort", handler);
}

return promiseFactory(controller.signal).finally(() => {
clearTimeout(timeoutId);
unsubscribe?.();
});
}

// --------------------------------------------------------------------------
Expand All @@ -589,24 +670,24 @@ export class Embedder {
// Task-aware API
// --------------------------------------------------------------------------

async embedQuery(text: string): Promise<number[]> {
return this.withTimeout((signal) => this.embedSingle(text, this._taskQuery, 0, signal), "embedQuery");
async embedQuery(text: string, signal?: AbortSignal): Promise<number[]> {
return this.withTimeout((sig) => this.embedSingle(text, this._taskQuery, 0, sig), "embedQuery", signal);
}

async embedPassage(text: string): Promise<number[]> {
return this.withTimeout((signal) => this.embedSingle(text, this._taskPassage, 0, signal), "embedPassage");
async embedPassage(text: string, signal?: AbortSignal): Promise<number[]> {
return this.withTimeout((sig) => this.embedSingle(text, this._taskPassage, 0, sig), "embedPassage", signal);
}

// Note: embedBatchQuery/embedBatchPassage are NOT wrapped with withTimeout because
// they handle multiple texts in a single API call. The timeout would fire after
// EMBED_TIMEOUT_MS regardless of how many texts succeed. Individual text embedding
// within the batch is protected by the SDK's own timeout handling.
async embedBatchQuery(texts: string[]): Promise<number[][]> {
return this.embedMany(texts, this._taskQuery);
async embedBatchQuery(texts: string[], signal?: AbortSignal): Promise<number[][]> {
return this.embedMany(texts, this._taskQuery, signal);
}

async embedBatchPassage(texts: string[]): Promise<number[][]> {
return this.embedMany(texts, this._taskPassage);
async embedBatchPassage(texts: string[], signal?: AbortSignal): Promise<number[][]> {
return this.embedMany(texts, this._taskPassage, signal);
}

// --------------------------------------------------------------------------
Expand Down Expand Up @@ -775,7 +856,7 @@ export class Embedder {
}
}

private async embedMany(texts: string[], task?: string): Promise<number[][]> {
private async embedMany(texts: string[], task?: string, signal?: AbortSignal): Promise<number[][]> {
if (!texts || texts.length === 0) {
return [];
}
Expand All @@ -797,7 +878,8 @@ export class Embedder {

try {
const response = await this.embedWithRetry(
this.buildPayload(validTexts, task)
this.buildPayload(validTexts, task),
signal,
);

// Create result array with proper length
Expand Down Expand Up @@ -838,7 +920,7 @@ export class Embedder {

// Embed all chunks in parallel, then average.
const embeddings = await Promise.all(
chunkResult.chunks.map((chunk) => this.embedSingle(chunk, task))
chunkResult.chunks.map((chunk) => this.embedSingle(chunk, task, 0, signal))
);

const avgEmbedding = embeddings.reduce(
Expand Down
84 changes: 84 additions & 0 deletions test/cjk-recursion-regression.test.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,89 @@ async function testBatchEmbeddingStillWorks() {
console.log(" PASSED\n");
}

/**
 * Test 8: regression for the Ollama native-fetch abort fix.
 * Verifies that an external AbortSignal interrupts a deliberately slow mock
 * Ollama response at ~2s instead of waiting the full 5s.
 *
 * NOTE(review): this binds to the real Ollama port 127.0.0.1:11434. If a
 * local Ollama is already running, listen() emits an unhandled 'error'
 * (EADDRINUSE) — TODO confirm the test environment never runs Ollama.
 */
async function testOllamaAbortWithNativeFetch() {
  console.log("Test 8: Ollama native fetch respects external AbortSignal (PR354 fix regression)");

  // Author's analysis: the previous test used withServer() on a random port but hardcoded
  // http://127.0.0.1:11434/v1 for the Embedder — so the request always hit "connection refused"
  // immediately and never touched the slow handler. This test fixes that by:
  // 1. Binding the mock server directly to 127.0.0.1:11434 (so isOllamaProvider() is true)
  // 2. Delaying the response by 5 seconds
  // 3. Passing an external AbortSignal that fires after 2 seconds
  // 4. Asserting total time ≈ 2s (proving abort interrupted the slow request)

  const SLOW_DELAY_MS = 5_000;
  const ABORT_AFTER_MS = 2_000;
  const DIMS = 1024;

  const server = http.createServer((req, res) => {
    if (req.url === "/v1/embeddings" && req.method === "POST") {
      // Respond only after SLOW_DELAY_MS, so only a working abort can end the call earlier.
      const timer = setTimeout(() => {
        if (res.writableEnded) return; // already aborted
        res.writeHead(200, { "Content-Type": "application/json" });
        res.end(JSON.stringify({
          data: [{ embedding: Array.from({ length: DIMS }, () => 0.1), index: 0 }]
        }));
      }, SLOW_DELAY_MS);
      // NOTE(review): the request 'aborted' event is deprecated in modern Node;
      // 'close' is the recommended replacement — TODO confirm target Node versions.
      req.on("aborted", () => clearTimeout(timer));
      return;
    }
    res.writeHead(404);
    res.end("not found");
  });

  // Bind directly to 127.0.0.1:11434 so isOllamaProvider() returns true
  await new Promise((resolve) => server.listen(11434, "127.0.0.1", resolve));

  try {
    const embedder = new Embedder({
      provider: "openai-compatible",
      apiKey: "test-key",
      model: "mxbai-embed-large",
      baseURL: "http://127.0.0.1:11434/v1",
      dimensions: DIMS,
    });

    // Guard against the private method being renamed/hidden at runtime.
    assert.equal(
      embedder.isOllamaProvider ? embedder.isOllamaProvider() : false,
      true,
      "isOllamaProvider should return true for 127.0.0.1:11434"
    );

    const start = Date.now();
    const controller = new AbortController();
    const abortTimer = setTimeout(() => controller.abort(), ABORT_AFTER_MS);

    let errorCaught;
    try {
      // Pass external AbortSignal — should interrupt the 5-second slow response at ~2s
      await embedder.embedPassage("abort test probe", controller.signal);
    } catch (e) {
      errorCaught = e;
    }

    clearTimeout(abortTimer);
    const elapsed = Date.now() - start;

    assert.ok(errorCaught, "embedPassage should throw (abort or timeout)");
    const msg = errorCaught instanceof Error ? errorCaught.message : String(errorCaught);
    // NOTE(review): accepting ECONNREFUSED means this assertion would also pass
    // if the server never bound — the elapsed-time check below is the real gate.
    assert.ok(
      /timed out|abort|ollama|ECONNREFUSED/i.test(msg),
      `Expected abort/timeout error, got: ${msg}`
    );

    // If abort works: elapsed ≈ 2000ms. If abort fails: elapsed ≈ 5000ms.
    assert.ok(
      elapsed < SLOW_DELAY_MS * 0.75,
      `Expected abort ~${ABORT_AFTER_MS}ms, got ${elapsed}ms — abort did NOT interrupt slow request`
    );

    console.log(` PASSED (aborted in ${elapsed}ms < ${SLOW_DELAY_MS}ms threshold)\n`);
  } finally {
    await new Promise((resolve) => server.close(resolve));
  }
}

async function run() {
console.log("Running regression tests for PR #238...\n");
await testSingleChunkFallbackTerminates();
Expand All @@ -245,6 +328,7 @@ async function run() {
await testSmallContextChunking();
await testTimeoutAbortPropagation();
await testBatchEmbeddingStillWorks();
await testOllamaAbortWithNativeFetch();
console.log("All regression tests passed!");
}

Expand Down
99 changes: 99 additions & 0 deletions test/embedder-ollama-abort.test.mjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
import assert from "node:assert/strict";
import http from "node:http";
import { test } from "node:test";

import jitiFactory from "jiti";

const jiti = jitiFactory(import.meta.url, { interopDefault: true });
const { Embedder } = jiti("../src/embedder.ts");

/**
* Test: Ollama native fetch correctly aborts a slow HTTP request.
*
* Root cause (Issue #361 / PR #383):
* OpenAI SDK's HTTP client does not reliably abort Ollama TCP connections
* when AbortController.abort() fires in Node.js, causing stalled sockets
* that hang until the gateway-level timeout.
*
* Fix: For Ollama endpoints (localhost:11434), use Node.js native fetch
* instead of the OpenAI SDK. Native fetch properly closes TCP on abort.
*
* This test verifies the fix by:
* 1. Mocking a slow Ollama server on 127.0.0.1:11434 (5s delay)
* 2. Calling embedPassage with an AbortSignal that fires after 2s
* 3. Asserting total time ≈ 2s (not 5s) — proving abort interrupted the request
*
* Note: The mock server is bound to 127.0.0.1:11434 (not a random port) so that
* isOllamaProvider() returns true and the native fetch path is exercised.
*/
test("Ollama embedWithNativeFetch aborts slow request within expected time", async () => {
  const SLOW_DELAY_MS = 5_000;
  const ABORT_AFTER_MS = 2_000;
  const DIMS = 1024;

  const server = http.createServer((req, res) => {
    if (req.url === "/v1/embeddings" && req.method === "POST") {
      // Delay the response so only a working abort can finish the call early.
      const timer = setTimeout(() => {
        if (res.writableEnded) return; // already aborted
        res.writeHead(200, { "Content-Type": "application/json" });
        res.end(JSON.stringify({
          data: [{ embedding: Array.from({ length: DIMS }, () => 0.1), index: 0 }]
        }));
      }, SLOW_DELAY_MS);
      // NOTE(review): the request 'aborted' event is deprecated in modern Node;
      // 'close' is the recommended replacement — TODO confirm target Node versions.
      req.on("aborted", () => clearTimeout(timer));
      return;
    }
    res.writeHead(404);
    res.end("not found");
  });

  // Bind to 127.0.0.1:11434 so isOllamaProvider() returns true → native fetch path
  // NOTE(review): collides with a genuinely running local Ollama (EADDRINUSE would
  // surface as an unhandled 'error' event) — TODO confirm CI never runs Ollama.
  await new Promise((resolve) => server.listen(11434, "127.0.0.1", resolve));

  try {
    const embedder = new Embedder({
      provider: "openai-compatible",
      apiKey: "test-key",
      model: "mxbai-embed-large",
      baseURL: "http://127.0.0.1:11434/v1",
      dimensions: DIMS,
    });

    assert.ok(
      embedder.isOllamaProvider(),
      "isOllamaProvider() should return true for http://127.0.0.1:11434",
    );

    const start = Date.now();
    const controller = new AbortController();
    const abortTimer = setTimeout(() => controller.abort(), ABORT_AFTER_MS);

    let errorCaught;
    try {
      await embedder.embedPassage("abort test probe", controller.signal);
      // NOTE(review): this AssertionError is swallowed by the catch below and
      // only fails the test indirectly via the message-regex assertion.
      assert.fail("embedPassage should have thrown");
    } catch (e) {
      errorCaught = e;
    }

    clearTimeout(abortTimer);
    const elapsed = Date.now() - start;

    assert.ok(errorCaught, "embedPassage should have thrown (abort or timeout)");
    const msg = errorCaught instanceof Error ? errorCaught.message : String(errorCaught);
    assert.ok(
      /timed out|abort|ollama/i.test(msg),
      `Expected abort/timeout/Ollama error, got: ${msg}`,
    );

    // Elapsed time must be close to ABORT_AFTER_MS, NOT SLOW_DELAY_MS.
    // If abort worked: elapsed ≈ 2000ms.
    // If abort failed: elapsed ≈ 5000ms (waited for slow response).
    assert.ok(
      elapsed < SLOW_DELAY_MS * 0.75,
      `Expected abort ~${ABORT_AFTER_MS}ms, got ${elapsed}ms — abort did NOT interrupt slow request`,
    );
  } finally {
    await new Promise((resolve) => server.close(resolve));
  }
});
Loading