From 24fc6779f2c053397000d3d5bdc3ddfaa53f362a Mon Sep 17 00:00:00 2001
From: serhiizghama
Date: Sun, 14 Jun 2026 08:49:51 +0700
Subject: [PATCH 1/2] refactor(rag-worker): add deterministic embedding point id helper

Derive Qdrant point IDs from the document id and chunk index via an
RFC 4122 v5 UUID instead of a random UUID per run. This is a pure
addition; call sites are updated in the next commit.
---
 apps/rowboat/app/scripts/rag-worker.ts | 21 +++++++++++++++++++++
 1 file changed, 21 insertions(+)

diff --git a/apps/rowboat/app/scripts/rag-worker.ts b/apps/rowboat/app/scripts/rag-worker.ts
index 95f85fa2b..baf05817c 100644
--- a/apps/rowboat/app/scripts/rag-worker.ts
+++ b/apps/rowboat/app/scripts/rag-worker.ts
@@ -61,6 +61,27 @@ async function retryable(fn: () => Promise, maxAttempts: number = 3): Prom
     }
 }
 
+// Fixed namespace for deriving deterministic Qdrant point IDs (RFC 4122 v5).
+const EMBEDDING_ID_NAMESPACE = 'a4d1f3e2-7b6c-5a4d-9e8f-1c2b3a4d5e6f';
+
+/**
+ * Derives a stable, deterministic point ID for an embedding from its document
+ * id and chunk index. Re-processing the same document chunk (e.g. when a job is
+ * retried after the Qdrant upsert succeeded but a later step failed) produces
+ * the same id, so the upsert overwrites the existing point instead of inserting
+ * a duplicate. Returns a valid UUID, as required by `EmbeddingRecord.id`.
+ */
+function embeddingPointId(docId: string, chunkIndex: number): string {
+    const namespace = Buffer.from(EMBEDDING_ID_NAMESPACE.replace(/-/g, ''), 'hex');
+    const name = Buffer.from(`${docId}-chunk-${chunkIndex}`, 'utf8');
+    const hash = crypto.createHash('sha1').update(namespace).update(name).digest();
+    const bytes = hash.subarray(0, 16);
+    bytes[6] = (bytes[6] & 0x0f) | 0x50; // version 5
+    bytes[8] = (bytes[8] & 0x3f) | 0x80; // RFC 4122 variant
+    const hex = bytes.toString('hex');
+    return `${hex.slice(0, 8)}-${hex.slice(8, 12)}-${hex.slice(12, 16)}-${hex.slice(16, 20)}-${hex.slice(20)}`;
+}
+
 async function runProcessFilePipeline(_logger: PrefixLogger, usageTracker: UsageTracker, job: z.infer, doc: z.infer) {
     if (doc.data.type !== 'file_local' && doc.data.type !== 'file_s3') {
         throw new Error("Invalid data source type");

From 0199b4bf6e45b5f1c38afd5bc29cd6db4a9e21e1 Mon Sep 17 00:00:00 2001
From: serhiizghama
Date: Sun, 14 Jun 2026 08:50:29 +0700
Subject: [PATCH 2/2] fix(rag-worker): use deterministic embedding point ids to avoid duplicates

The file, scrape and text RAG pipelines generated a fresh random UUID
for every Qdrant point on each run. When a job is retried after the
Qdrant upsert succeeded but a later step (e.g. the MongoDB status
update) failed, the retry wrote a brand-new set of points while the
originals stayed in the collection, leaving orphaned duplicate vectors
that degrade retrieval and are no longer tracked by the app.

Derive each point id from the document id and chunk index so retries
upsert over the same points and stay idempotent.
Fixes #603
---
 apps/rowboat/app/scripts/rag-worker.ts | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/apps/rowboat/app/scripts/rag-worker.ts b/apps/rowboat/app/scripts/rag-worker.ts
index baf05817c..be2105837 100644
--- a/apps/rowboat/app/scripts/rag-worker.ts
+++ b/apps/rowboat/app/scripts/rag-worker.ts
@@ -174,7 +174,7 @@ async function runProcessFilePipeline(_logger: PrefixLogger, usageTracker: Usage
     // store embeddings in qdrant
     logger.log("Storing embeddings in Qdrant");
     const points: z.infer[] = embeddings.map((embedding, i) => ({
-        id: crypto.randomUUID(),
+        id: embeddingPointId(doc.id, i),
         vector: embedding,
         payload: {
             projectId: job.projectId,
@@ -243,7 +243,7 @@ async function runScrapePipeline(_logger: PrefixLogger, usageTracker: UsageTrack
     // store embeddings in qdrant
     logger.log("Storing embeddings in Qdrant");
     const points: z.infer[] = embeddings.map((embedding, i) => ({
-        id: crypto.randomUUID(),
+        id: embeddingPointId(doc.id, i),
         vector: embedding,
         payload: {
             projectId: job.projectId,
@@ -295,7 +295,7 @@ async function runProcessTextPipeline(_logger: PrefixLogger, usageTracker: Usage
     // store embeddings in qdrant
     logger.log("Storing embeddings in Qdrant");
     const points: z.infer[] = embeddings.map((embedding, i) => ({
-        id: crypto.randomUUID(),
+        id: embeddingPointId(doc.id, i),
         vector: embedding,
         payload: {
             projectId: job.projectId,