diff --git a/src/memory/__tests__/semantic.test.ts b/src/memory/__tests__/semantic.test.ts index f2f555dc..d796d867 100644 --- a/src/memory/__tests__/semantic.test.ts +++ b/src/memory/__tests__/semantic.test.ts @@ -59,6 +59,16 @@ describe("SemanticStore", () => { return Promise.resolve(new Response(JSON.stringify({ result: { points: [] } }), { status: 200 })); } + // Exact duplicate scroll returns no match + if (urlStr.includes("/points/scroll")) { + return Promise.resolve( + new Response( + JSON.stringify({ result: { points: [], next_page_offset: null } }), + { status: 200, headers: { "Content-Type": "application/json" } }, + ), + ); + } + if (urlStr.includes("/points") && init?.method === "PUT") { upsertBody = JSON.parse(init.body as string); } @@ -244,6 +254,128 @@ describe("SemanticStore", () => { expect(contradictions.length).toBe(0); }); + test("store() skips upsert and merges episode IDs for exact duplicate", async () => { + const vec = make768dVector(); + let upsertCalled = false; + let updatePayloadBody: Record | null = null; + + globalThis.fetch = mock((url: string | Request, init?: RequestInit) => { + const urlStr = typeof url === "string" ? url : url.url; + + if (urlStr.includes("/api/embed")) { + return Promise.resolve(new Response(JSON.stringify({ embeddings: [vec] }), { status: 200 })); + } + + // Contradiction search returns no results + if (urlStr.includes("/points/query")) { + return Promise.resolve(new Response(JSON.stringify({ result: { points: [] } }), { status: 200 })); + } + + // Exact duplicate scroll returns an existing fact + if (urlStr.includes("/points/scroll")) { + return Promise.resolve( + new Response( + JSON.stringify({ + result: { + points: [ + { + id: "existing-fact-id", + payload: { + subject: "staging server", + predicate: "runs on", + object: "port 3001", + natural_language: "The staging server runs on port 3001", + source_episode_ids: ["ep-001"], + confidence: 0.9, + valid_from: Date.now(), + valid_until: null, + version: 1, + category: "domain_knowledge", + tags: ["infrastructure"], + }, + }, + ], + }, + }), + { status: 200, headers: { "Content-Type": "application/json" } }, + ), + ); + } + + // Track upsert calls (should NOT be called) + if (urlStr.includes("/points") && init?.method === "PUT") { + upsertCalled = true; + } + + // Track updatePayload calls + if (urlStr.includes("/points/payload") && init?.method === "POST") { + updatePayloadBody = JSON.parse(init.body as string); + } + + return Promise.resolve(new Response(JSON.stringify({ status: "ok" }), { status: 200 })); + }) as unknown as typeof fetch; + + const qdrant = new QdrantClient(TEST_CONFIG); + const embedder = new EmbeddingClient(TEST_CONFIG); + const store = new SemanticStore(qdrant, embedder, TEST_CONFIG); + + const fact = makeTestFact({ id: "new-fact-id", source_episode_ids: ["ep-002"] }); + const id = await store.store(fact); + + expect(id).toBe("existing-fact-id"); + expect(upsertCalled).toBe(false); + expect(updatePayloadBody).not.toBeNull(); + const payload = (updatePayloadBody as Record).payload as Record; + const mergedEpisodes = payload.source_episode_ids as string[]; + expect(mergedEpisodes).toContain("ep-001"); + expect(mergedEpisodes).toContain("ep-002"); + expect(mergedEpisodes.length).toBe(2); + }); + + test("store() creates new point when subject matches but object differs", async () => { + const vec = make768dVector(); + let upsertCalled = false; + + globalThis.fetch = mock((url: string | Request, init?: RequestInit) => { + const urlStr = typeof url === "string" ? url : url.url; + + if (urlStr.includes("/api/embed")) { + return Promise.resolve(new Response(JSON.stringify({ embeddings: [vec] }), { status: 200 })); + } + + // Contradiction search returns no results + if (urlStr.includes("/points/query")) { + return Promise.resolve(new Response(JSON.stringify({ result: { points: [] } }), { status: 200 })); + } + + // Exact duplicate scroll returns no match (different object) + if (urlStr.includes("/points/scroll")) { + return Promise.resolve( + new Response( + JSON.stringify({ result: { points: [], next_page_offset: null } }), + { status: 200, headers: { "Content-Type": "application/json" } }, + ), + ); + } + + if (urlStr.includes("/points") && init?.method === "PUT") { + upsertCalled = true; + } + + return Promise.resolve(new Response(JSON.stringify({ status: "ok" }), { status: 200 })); + }) as unknown as typeof fetch; + + const qdrant = new QdrantClient(TEST_CONFIG); + const embedder = new EmbeddingClient(TEST_CONFIG); + const store = new SemanticStore(qdrant, embedder, TEST_CONFIG); + + const fact = makeTestFact({ id: "new-fact-id", object: "port 4000" }); + const id = await store.store(fact); + + expect(id).toBe("new-fact-id"); + expect(upsertCalled).toBe(true); + }); + test("resolveContradiction() invalidates old fact when new has higher confidence", async () => { let updatePayloadCalled = false; let updateBody: Record | null = null; diff --git a/src/memory/semantic.ts b/src/memory/semantic.ts index e332acbf..d4d20c4c 100644 --- a/src/memory/semantic.ts +++ b/src/memory/semantic.ts @@ -15,6 +15,7 @@ const COLLECTION_SCHEMA = { const PAYLOAD_INDEXES: { field: string; type: "keyword" | "integer" | "float" }[] = [ { field: "subject", type: "keyword" }, { field: "predicate", type: "keyword" }, + { field: "object", type: "keyword" }, { field: "category", type: "keyword" }, { field: "confidence", type: "float" }, { field: "valid_from", type: "integer" }, @@ -55,6 +56,16 @@ export class SemanticStore { await this.resolveContradiction(fact, existing); } + // Check for exact duplicates (same subject + object, still valid) + const duplicate = await this.findExactDuplicate(fact); + if (duplicate) { + const mergedEpisodes = [...new Set([...duplicate.source_episode_ids, ...fact.source_episode_ids])]; + await this.qdrant.updatePayload(this.collectionName, duplicate.id, { + source_episode_ids: mergedEpisodes, + }); + return duplicate.id; + } + const factVec = await this.embedder.embed(fact.natural_language); const sparse = textToSparseVector(`${fact.subject} ${fact.predicate} ${fact.object} ${fact.natural_language}`); @@ -133,6 +144,23 @@ export class SemanticStore { .map((r) => this.payloadToFact(r)); } + async findExactDuplicate(newFact: SemanticFact): Promise { + const { points } = await this.qdrant.scroll(this.collectionName, { + limit: 1, + filter: { + must: [ + { key: "subject", match: { value: newFact.subject } }, + { key: "object", match: { value: newFact.object } }, + { is_null: { key: "valid_until" } }, + ], + }, + withPayload: true, + }); + + if (points.length === 0) return null; + return this.payloadToFact(points[0]); + } + async resolveContradiction(newFact: SemanticFact, existingFact: SemanticFact): Promise { // Newer fact with higher or equal confidence supersedes the old one if (newFact.confidence >= existingFact.confidence) {