Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 24 additions & 3 deletions apps/rowboat/app/scripts/rag-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,27 @@ async function retryable<T>(fn: () => Promise<T>, maxAttempts: number = 3): Prom
}
}

// Namespace UUID mixed into every embedding point ID hash. It must stay
// constant: changing it changes every derived ID (RFC 4122 v5 scheme).
const EMBEDDING_ID_NAMESPACE = 'a4d1f3e2-7b6c-5a4d-9e8f-1c2b3a4d5e6f';

/**
 * Builds the Qdrant point ID for one chunk of a document.
 *
 * The ID is a name-based (version 5) UUID: SHA-1 over the namespace bytes
 * followed by the UTF-8 name `<docId>-chunk-<chunkIndex>`, truncated to 16
 * bytes, with the version and variant bits overwritten. Identical inputs
 * always produce the identical ID, so re-running a job for the same document
 * chunk upserts over the existing point rather than adding a duplicate.
 *
 * @param docId - Identifier of the source document.
 * @param chunkIndex - Position of the chunk within that document.
 * @returns A lowercase UUID string in 8-4-4-4-12 form, suitable for
 * `EmbeddingRecord.id`.
 */
function embeddingPointId(docId: string, chunkIndex: number): string {
    const namespaceBytes = Buffer.from(EMBEDDING_ID_NAMESPACE.replace(/-/g, ''), 'hex');
    const nameBytes = Buffer.from(`${docId}-chunk-${chunkIndex}`, 'utf8');

    // Hashing the concatenation is equivalent to two successive update() calls.
    const digest = crypto
        .createHash('sha1')
        .update(Buffer.concat([namespaceBytes, nameBytes]))
        .digest();

    // Only the first 16 of SHA-1's 20 bytes are used; copy them so the
    // bit-twiddling below works on its own buffer.
    const uuidBytes = Buffer.from(digest.subarray(0, 16));
    uuidBytes[6] = (uuidBytes[6] & 0x0f) | 0x50; // high nibble -> version 5
    uuidBytes[8] = (uuidBytes[8] & 0x3f) | 0x80; // top two bits -> RFC 4122 variant

    // Split the 32 hex digits into the canonical 8-4-4-4-12 groups.
    const hexDigits = uuidBytes.toString('hex');
    const groups: string[] = [];
    let offset = 0;
    for (const width of [8, 4, 4, 4, 12]) {
        groups.push(hexDigits.slice(offset, offset + width));
        offset += width;
    }
    return groups.join('-');
}

async function runProcessFilePipeline(_logger: PrefixLogger, usageTracker: UsageTracker, job: z.infer<typeof DataSource>, doc: z.infer<typeof DataSourceDoc>) {
if (doc.data.type !== 'file_local' && doc.data.type !== 'file_s3') {
throw new Error("Invalid data source type");
Expand Down Expand Up @@ -153,7 +174,7 @@ async function runProcessFilePipeline(_logger: PrefixLogger, usageTracker: Usage
// store embeddings in qdrant
logger.log("Storing embeddings in Qdrant");
const points: z.infer<typeof EmbeddingRecord>[] = embeddings.map((embedding, i) => ({
id: crypto.randomUUID(),
id: embeddingPointId(doc.id, i),
vector: embedding,
payload: {
projectId: job.projectId,
Expand Down Expand Up @@ -222,7 +243,7 @@ async function runScrapePipeline(_logger: PrefixLogger, usageTracker: UsageTrack
// store embeddings in qdrant
logger.log("Storing embeddings in Qdrant");
const points: z.infer<typeof EmbeddingRecord>[] = embeddings.map((embedding, i) => ({
id: crypto.randomUUID(),
id: embeddingPointId(doc.id, i),
vector: embedding,
payload: {
projectId: job.projectId,
Expand Down Expand Up @@ -274,7 +295,7 @@ async function runProcessTextPipeline(_logger: PrefixLogger, usageTracker: Usage
// store embeddings in qdrant
logger.log("Storing embeddings in Qdrant");
const points: z.infer<typeof EmbeddingRecord>[] = embeddings.map((embedding, i) => ({
id: crypto.randomUUID(),
id: embeddingPointId(doc.id, i),
vector: embedding,
payload: {
projectId: job.projectId,
Expand Down