diff --git a/src/services/api-handlers.ts b/src/services/api-handlers.ts index b3a767a..51a2591 100644 --- a/src/services/api-handlers.ts +++ b/src/services/api-handlers.ts @@ -75,6 +75,10 @@ function safeJSONParse(jsonString: any): any { } } +function toBlob(vector?: Float32Array): Uint8Array | null { /* byte view of `vector` for BLOB storage; null when no vector is given */ + return vector ? new Uint8Array(vector.buffer, vector.byteOffset, vector.byteLength) : null; /* window onto the view's own bytes, not the whole backing buffer */ +} + function extractScopeFromTag(tag: string): { scope: "project"; hash: string } { const parts = tag.split("_"); if (parts.length >= 3) { @@ -331,7 +335,49 @@ export async function handleAddMemory(data: { metadata: JSON.stringify({ source: "api" }), }; const db = connectionManager.getConnection(shard.dbPath); - await vectorSearch.insertVector(db, record, shard); + + // Use transaction for atomic SQLite insert + const insertMemory = db.transaction(() => { + const insertStmt = db.prepare(` + INSERT INTO memories ( + id, content, vector, tags_vector, container_tag, tags, type, created_at, updated_at, + metadata, display_name, user_name, user_email, project_path, project_name, git_repo_url + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 
+ `); + insertStmt.run( + record.id, + record.content, + toBlob(record.vector), + toBlob(record.tagsVector), + record.containerTag, + record.tags || null, + record.type || null, + record.createdAt, + record.updatedAt, + record.metadata || null, + record.displayName || null, + record.userName || null, + record.userEmail || null, + record.projectPath || null, + record.projectName || null, + record.gitRepoUrl || null + ); + }); + insertMemory(); + + // Vector index update (outside transaction — vector backend is async) + try { + const backend = await (vectorSearch as any).getBackend(); + await backend.insert({ id: record.id, vector: record.vector, shard, kind: "content" }); + if (record.tagsVector) { + await backend.insert({ id: record.id, vector: record.tagsVector, shard, kind: "tags" }); + } + } catch (error) { + // Rollback SQLite insert on vector backend failure + db.prepare(`DELETE FROM memories WHERE id = ?`).run(record.id); + throw error; + } + shardManager.incrementVectorCount(shard.id); return { success: true, data: { id } }; } catch (error) { @@ -396,6 +442,8 @@ export async function handleUpdateMemory( try { if (!id) return { success: false, error: "id is required" }; await embeddingService.warmup(); + + // Find the existing memory first (read-only — no data modified yet) const projectShards = shardManager.getAllShards("project", ""); let foundShard = null, existingMemory = null; @@ -409,39 +457,64 @@ export async function handleUpdateMemory( } } if (!foundShard || !existingMemory) return { success: false, error: "Memory not found" }; - const db = connectionManager.getConnection(foundShard.dbPath); - await vectorSearch.deleteVector(db, id, foundShard); - shardManager.decrementVectorCount(foundShard.id); + // STEP 1: Generate new embeddings FIRST (safe — no data deleted yet) const newContent = data.content || existingMemory.content; - const tags = data.tags || (existingMemory.tags ? 
existingMemory.tags.split(",") : []); - + const tags = + data.tags || + (existingMemory.tags ? existingMemory.tags.split(",").map((t: string) => t.trim()) : []); const vector = await embeddingService.embedWithTimeout(newContent); let tagsVector: Float32Array | undefined = undefined; if (tags.length > 0) { tagsVector = await embeddingService.embedWithTimeout(tags.join(", ")); } - const updatedRecord = { - id, - content: newContent, - vector, - tagsVector, - containerTag: existingMemory.container_tag, - tags: tags.length > 0 ? tags.join(",") : undefined, - type: data.type || existingMemory.type, - createdAt: existingMemory.created_at, - updatedAt: Date.now(), - metadata: existingMemory.metadata, - displayName: existingMemory.display_name, - userName: existingMemory.user_name, - userEmail: existingMemory.user_email, - projectPath: existingMemory.project_path, - projectName: existingMemory.project_name, - gitRepoUrl: existingMemory.git_repo_url, - }; - await vectorSearch.insertVector(db, updatedRecord, foundShard); - shardManager.incrementVectorCount(foundShard.id); + const db = connectionManager.getConnection(foundShard.dbPath); + + // STEP 2: Wrap SQLite delete + insert in a transaction + const updateTransaction = db.transaction(() => { + // Delete old record + db.prepare(`DELETE FROM memories WHERE id = ?`).run(id); + + // Insert updated record + const insertStmt = db.prepare(` + INSERT INTO memories ( + id, content, vector, tags_vector, container_tag, tags, type, created_at, updated_at, + metadata, display_name, user_name, user_email, project_path, project_name, git_repo_url + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + `); + insertStmt.run( + id, + newContent, + toBlob(vector), + toBlob(tagsVector), + existingMemory.container_tag, + tags.length > 0 ? 
tags.join(",") : null, + data.type || existingMemory.type, + existingMemory.created_at, + Date.now(), + existingMemory.metadata, + existingMemory.display_name, + existingMemory.user_name, + existingMemory.user_email, + existingMemory.project_path, + existingMemory.project_name, + existingMemory.git_repo_url + ); + }); + + // Execute the SQLite transaction atomically + updateTransaction(); + + // STEP 3: Update vector index (outside transaction — vector backend is async/in-memory) + const backend = await (vectorSearch as any).getBackend(); + await backend.delete({ id, shard: foundShard, kind: "content" }); + await backend.delete({ id, shard: foundShard, kind: "tags" }); + await backend.insert({ id, vector, shard: foundShard, kind: "content" }); + if (tagsVector) { + await backend.insert({ id, vector: tagsVector, shard: foundShard, kind: "tags" }); + } + return { success: true }; } catch (error) { log("handleUpdateMemory: error", { error: String(error) }); diff --git a/src/services/client.ts b/src/services/client.ts index 351ac65..16258a4 100644 --- a/src/services/client.ts +++ b/src/services/client.ts @@ -37,6 +37,10 @@ function safeJSONParse(jsonString: any): any { } } +function toBlob(vector?: Float32Array): Uint8Array | null { /* byte view of `vector` for BLOB storage; null when no vector is given */ + return vector ? new Uint8Array(vector.buffer, vector.byteOffset, vector.byteLength) : null; /* window onto the view's own bytes, not the whole backing buffer */ +} + function extractScopeFromContainerTag(containerTag: string): { scope: "user" | "project"; hash: string; @@ -207,7 +211,49 @@ export class LocalMemoryClient { }; const db = connectionManager.getConnection(shard.dbPath); - await vectorSearch.insertVector(db, record, shard); + + // Use transaction for atomic SQLite insert + const insertMemory = db.transaction(() => { + const insertStmt = db.prepare(` + INSERT INTO memories ( + id, content, vector, tags_vector, container_tag, tags, type, created_at, updated_at, + metadata, display_name, user_name, user_email, project_path, project_name, git_repo_url + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 
+ `); + insertStmt.run( + record.id, + record.content, + toBlob(record.vector), + toBlob(record.tagsVector), + record.containerTag, + record.tags || null, + record.type || null, + record.createdAt, + record.updatedAt, + record.metadata || null, + record.displayName || null, + record.userName || null, + record.userEmail || null, + record.projectPath || null, + record.projectName || null, + record.gitRepoUrl || null + ); + }); + insertMemory(); + + // Vector index update (outside transaction — vector backend is async/in-memory) + try { + const backend = await (vectorSearch as any).getBackend(); + await backend.insert({ id: record.id, vector: record.vector, shard, kind: "content" }); + if (record.tagsVector) { + await backend.insert({ id: record.id, vector: record.tagsVector, shard, kind: "tags" }); + } + } catch (error) { + // Rollback SQLite insert on vector backend failure + db.prepare(`DELETE FROM memories WHERE id = ?`).run(record.id); + throw error; + } + shardManager.incrementVectorCount(shard.id); return { success: true as const, id }; diff --git a/src/services/embedding.ts b/src/services/embedding.ts index aeb3819..26ec811 100644 --- a/src/services/embedding.ts +++ b/src/services/embedding.ts @@ -67,9 +67,34 @@ export class EmbeddingService { private async initializeModel(progressCallback?: (progress: any) => void): Promise { try { if (CONFIG.embeddingApiUrl && CONFIG.embeddingApiKey) { + // Send a probe request to verify the API endpoint is actually reachable + // Uses a minimal embedding of "ping" to test the full request pipeline + const probeResponse = await withTimeout( + fetch(`${CONFIG.embeddingApiUrl}/embeddings`, { + method: "POST", + headers: { + "Content-Type": "application/json", + Authorization: `Bearer ${CONFIG.embeddingApiKey}`, + }, + body: JSON.stringify({ + input: "ping", + model: CONFIG.embeddingModel, + }), + }), + TIMEOUT_MS + ); + + if (!probeResponse.ok) { + throw new Error( + `Embedding API health check failed: ${probeResponse.status} 
${probeResponse.statusText}` + ); + } + this.isWarmedUp = true; return; } + + // Local model path const { pipeline } = await ensureTransformersLoaded(); this.pipe = await pipeline("feature-extraction", CONFIG.embeddingModel, { progress_callback: progressCallback,