Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 99 additions & 26 deletions src/services/api-handlers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ function safeJSONParse(jsonString: any): any {
}
}

/**
 * Exposes the raw bytes of an embedding vector as a `Uint8Array` so it can be
 * bound as a BLOB parameter.
 *
 * The returned array is a view over the same memory as `vector` (no copy is
 * made). It covers exactly the bytes that belong to `vector`: a
 * `Float32Array` may itself be a window into a larger `ArrayBuffer` (for
 * example the result of `subarray`), so the view must honor `byteOffset` and
 * `byteLength` instead of spanning the whole backing buffer.
 *
 * @param vector - Embedding to serialize; may be omitted.
 * @returns A byte view of `vector`, or `null` when no vector was supplied.
 */
function toBlob(vector?: Float32Array): Uint8Array | null {
  if (!vector) {
    return null;
  }
  // Restrict the view to this vector's own slice of the backing buffer.
  return new Uint8Array(vector.buffer, vector.byteOffset, vector.byteLength);
}

function extractScopeFromTag(tag: string): { scope: "project"; hash: string } {
const parts = tag.split("_");
if (parts.length >= 3) {
Expand Down Expand Up @@ -331,7 +335,49 @@ export async function handleAddMemory(data: {
metadata: JSON.stringify({ source: "api" }),
};
const db = connectionManager.getConnection(shard.dbPath);
await vectorSearch.insertVector(db, record, shard);

// Use transaction for atomic SQLite insert
const insertMemory = db.transaction(() => {
const insertStmt = db.prepare(`
INSERT INTO memories (
id, content, vector, tags_vector, container_tag, tags, type, created_at, updated_at,
metadata, display_name, user_name, user_email, project_path, project_name, git_repo_url
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
`);
insertStmt.run(
record.id,
record.content,
toBlob(record.vector),
toBlob(record.tagsVector),
record.containerTag,
record.tags || null,
record.type || null,
record.createdAt,
record.updatedAt,
record.metadata || null,
record.displayName || null,
record.userName || null,
record.userEmail || null,
record.projectPath || null,
record.projectName || null,
record.gitRepoUrl || null
);
});
insertMemory();

// Vector index update (outside transaction — vector backend is async)
try {
const backend = await (vectorSearch as any).getBackend();
await backend.insert({ id: record.id, vector: record.vector, shard, kind: "content" });
if (record.tagsVector) {
await backend.insert({ id: record.id, vector: record.tagsVector, shard, kind: "tags" });
}
} catch (error) {
// Rollback SQLite insert on vector backend failure
db.prepare(`DELETE FROM memories WHERE id = ?`).run(record.id);
throw error;
}

shardManager.incrementVectorCount(shard.id);
return { success: true, data: { id } };
} catch (error) {
Expand Down Expand Up @@ -396,6 +442,8 @@ export async function handleUpdateMemory(
try {
if (!id) return { success: false, error: "id is required" };
await embeddingService.warmup();

// Find the existing memory first (read-only — no data modified yet)
const projectShards = shardManager.getAllShards("project", "");
let foundShard = null,
existingMemory = null;
Expand All @@ -409,39 +457,64 @@ export async function handleUpdateMemory(
}
}
if (!foundShard || !existingMemory) return { success: false, error: "Memory not found" };
const db = connectionManager.getConnection(foundShard.dbPath);
await vectorSearch.deleteVector(db, id, foundShard);
shardManager.decrementVectorCount(foundShard.id);

// STEP 1: Generate new embeddings FIRST (safe — no data deleted yet)
const newContent = data.content || existingMemory.content;
const tags = data.tags || (existingMemory.tags ? existingMemory.tags.split(",") : []);

const tags =
data.tags ||
(existingMemory.tags ? existingMemory.tags.split(",").map((t: string) => t.trim()) : []);
const vector = await embeddingService.embedWithTimeout(newContent);
let tagsVector: Float32Array | undefined = undefined;
if (tags.length > 0) {
tagsVector = await embeddingService.embedWithTimeout(tags.join(", "));
}

const updatedRecord = {
id,
content: newContent,
vector,
tagsVector,
containerTag: existingMemory.container_tag,
tags: tags.length > 0 ? tags.join(",") : undefined,
type: data.type || existingMemory.type,
createdAt: existingMemory.created_at,
updatedAt: Date.now(),
metadata: existingMemory.metadata,
displayName: existingMemory.display_name,
userName: existingMemory.user_name,
userEmail: existingMemory.user_email,
projectPath: existingMemory.project_path,
projectName: existingMemory.project_name,
gitRepoUrl: existingMemory.git_repo_url,
};
await vectorSearch.insertVector(db, updatedRecord, foundShard);
shardManager.incrementVectorCount(foundShard.id);
const db = connectionManager.getConnection(foundShard.dbPath);

// STEP 2: Wrap SQLite delete + insert in a transaction
const updateTransaction = db.transaction(() => {
// Delete old record
db.prepare(`DELETE FROM memories WHERE id = ?`).run(id);

// Insert updated record
const insertStmt = db.prepare(`
INSERT INTO memories (
id, content, vector, tags_vector, container_tag, tags, type, created_at, updated_at,
metadata, display_name, user_name, user_email, project_path, project_name, git_repo_url
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
`);
insertStmt.run(
id,
newContent,
toBlob(vector),
toBlob(tagsVector),
existingMemory.container_tag,
tags.length > 0 ? tags.join(",") : null,
data.type || existingMemory.type,
existingMemory.created_at,
Date.now(),
existingMemory.metadata,
existingMemory.display_name,
existingMemory.user_name,
existingMemory.user_email,
existingMemory.project_path,
existingMemory.project_name,
existingMemory.git_repo_url
);
});

// Execute the SQLite transaction atomically
updateTransaction();

// STEP 3: Update vector index (outside transaction — vector backend is async/in-memory)
const backend = await (vectorSearch as any).getBackend();
await backend.delete({ id, shard: foundShard, kind: "content" });
await backend.delete({ id, shard: foundShard, kind: "tags" });
await backend.insert({ id, vector, shard: foundShard, kind: "content" });
if (tagsVector) {
await backend.insert({ id, vector: tagsVector, shard: foundShard, kind: "tags" });
}

return { success: true };
} catch (error) {
log("handleUpdateMemory: error", { error: String(error) });
Expand Down
48 changes: 47 additions & 1 deletion src/services/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ function safeJSONParse(jsonString: any): any {
}
}

/**
 * Converts an embedding vector into a byte view suitable for binding as a
 * BLOB parameter.
 *
 * No bytes are copied: the result aliases the memory of `vector`. Because a
 * `Float32Array` can be a window into a larger `ArrayBuffer` (e.g. produced
 * by `subarray`), the view is built from `byteOffset` and `byteLength` so
 * that only the vector's own bytes are exposed, not the whole backing buffer.
 *
 * @param vector - Embedding to serialize; may be omitted.
 * @returns A byte view of `vector`, or `null` when no vector was supplied.
 */
function toBlob(vector?: Float32Array): Uint8Array | null {
  if (!vector) {
    return null;
  }
  // Honor the typed array's offset/length within its backing buffer.
  return new Uint8Array(vector.buffer, vector.byteOffset, vector.byteLength);
}

function extractScopeFromContainerTag(containerTag: string): {
scope: "user" | "project";
hash: string;
Expand Down Expand Up @@ -207,7 +211,49 @@ export class LocalMemoryClient {
};

const db = connectionManager.getConnection(shard.dbPath);
await vectorSearch.insertVector(db, record, shard);

// Use transaction for atomic SQLite insert
const insertMemory = db.transaction(() => {
const insertStmt = db.prepare(`
INSERT INTO memories (
id, content, vector, tags_vector, container_tag, tags, type, created_at, updated_at,
metadata, display_name, user_name, user_email, project_path, project_name, git_repo_url
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
`);
insertStmt.run(
record.id,
record.content,
toBlob(record.vector),
toBlob(record.tagsVector),
record.containerTag,
record.tags || null,
record.type || null,
record.createdAt,
record.updatedAt,
record.metadata || null,
record.displayName || null,
record.userName || null,
record.userEmail || null,
record.projectPath || null,
record.projectName || null,
record.gitRepoUrl || null
);
});
insertMemory();

// Vector index update (outside transaction — vector backend is async/in-memory)
try {
const backend = await (vectorSearch as any).getBackend();
await backend.insert({ id: record.id, vector: record.vector, shard, kind: "content" });
if (record.tagsVector) {
await backend.insert({ id: record.id, vector: record.tagsVector, shard, kind: "tags" });
}
} catch (error) {
// Rollback SQLite insert on vector backend failure
db.prepare(`DELETE FROM memories WHERE id = ?`).run(record.id);
throw error;
}

shardManager.incrementVectorCount(shard.id);

return { success: true as const, id };
Expand Down
25 changes: 25 additions & 0 deletions src/services/embedding.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,34 @@ export class EmbeddingService {
private async initializeModel(progressCallback?: (progress: any) => void): Promise<void> {
try {
if (CONFIG.embeddingApiUrl && CONFIG.embeddingApiKey) {
// Send a probe request to verify the API endpoint is actually reachable
// Uses a minimal embedding of "ping" to test the full request pipeline
const probeResponse = await withTimeout(
fetch(`${CONFIG.embeddingApiUrl}/embeddings`, {
method: "POST",
headers: {
"Content-Type": "application/json",
Authorization: `Bearer ${CONFIG.embeddingApiKey}`,
},
body: JSON.stringify({
input: "ping",
model: CONFIG.embeddingModel,
}),
}),
TIMEOUT_MS
);

if (!probeResponse.ok) {
throw new Error(
`Embedding API health check failed: ${probeResponse.status} ${probeResponse.statusText}`
);
}

this.isWarmedUp = true;
return;
}

// Local model path
const { pipeline } = await ensureTransformersLoaded();
this.pipe = await pipeline("feature-extraction", CONFIG.embeddingModel, {
progress_callback: progressCallback,
Expand Down