diff --git a/apps/rowboat/app/scripts/rag-worker.ts b/apps/rowboat/app/scripts/rag-worker.ts index 95f85fa2b..be2105837 100644 --- a/apps/rowboat/app/scripts/rag-worker.ts +++ b/apps/rowboat/app/scripts/rag-worker.ts @@ -61,6 +61,27 @@ async function retryable(fn: () => Promise, maxAttempts: number = 3): Prom } } +// Fixed namespace for deriving deterministic Qdrant point IDs (RFC 4122 v5). +const EMBEDDING_ID_NAMESPACE = 'a4d1f3e2-7b6c-5a4d-9e8f-1c2b3a4d5e6f'; + +/** + * Derives a stable, deterministic point ID for an embedding from its document + * id and chunk index. Re-processing the same document chunk (e.g. when a job is + * retried after the Qdrant upsert succeeded but a later step failed) produces + * the same id, so the upsert overwrites the existing point instead of inserting + * a duplicate. Returns a valid UUID, as required by `EmbeddingRecord.id`. + */ +function embeddingPointId(docId: string, chunkIndex: number): string { + const namespace = Buffer.from(EMBEDDING_ID_NAMESPACE.replace(/-/g, ''), 'hex'); + const name = Buffer.from(`${docId}-chunk-${chunkIndex}`, 'utf8'); + const hash = crypto.createHash('sha1').update(namespace).update(name).digest(); + const bytes = hash.subarray(0, 16); + bytes[6] = (bytes[6] & 0x0f) | 0x50; // version 5 + bytes[8] = (bytes[8] & 0x3f) | 0x80; // RFC 4122 variant + const hex = bytes.toString('hex'); + return `${hex.slice(0, 8)}-${hex.slice(8, 12)}-${hex.slice(12, 16)}-${hex.slice(16, 20)}-${hex.slice(20)}`; +} + async function runProcessFilePipeline(_logger: PrefixLogger, usageTracker: UsageTracker, job: z.infer, doc: z.infer) { if (doc.data.type !== 'file_local' && doc.data.type !== 'file_s3') { throw new Error("Invalid data source type"); @@ -153,7 +174,7 @@ async function runProcessFilePipeline(_logger: PrefixLogger, usageTracker: Usage // store embeddings in qdrant logger.log("Storing embeddings in Qdrant"); const points: z.infer[] = embeddings.map((embedding, i) => ({ - id: crypto.randomUUID(), + id: embeddingPointId(doc.id, i), vector: embedding, payload: { projectId: job.projectId, @@ -222,7 +243,7 @@ async function runScrapePipeline(_logger: PrefixLogger, usageTracker: UsageTrack // store embeddings in qdrant logger.log("Storing embeddings in Qdrant"); const points: z.infer[] = embeddings.map((embedding, i) => ({ - id: crypto.randomUUID(), + id: embeddingPointId(doc.id, i), vector: embedding, payload: { projectId: job.projectId, @@ -274,7 +295,7 @@ async function runProcessTextPipeline(_logger: PrefixLogger, usageTracker: Usage // store embeddings in qdrant logger.log("Storing embeddings in Qdrant"); const points: z.infer[] = embeddings.map((embedding, i) => ({ - id: crypto.randomUUID(), + id: embeddingPointId(doc.id, i), vector: embedding, payload: { projectId: job.projectId,