diff --git a/src/commands/import.ts b/src/commands/import.ts index 1cf691af7..70669b57a 100644 --- a/src/commands/import.ts +++ b/src/commands/import.ts @@ -28,7 +28,35 @@ export interface RunImportResult { failures: Array<{ path: string; error: string }>; } -export async function runImport(engine: BrainEngine, args: string[], opts: { commit?: string } = {}): Promise { +async function writeImportSyncAnchor( + engine: BrainEngine, + sourceId: string | undefined, + which: 'repo_path' | 'last_commit', + value: string, +): Promise { + if (sourceId !== undefined) { + const col = which === 'repo_path' ? 'local_path' : 'last_commit'; + if (which === 'last_commit') { + await engine.executeRaw( + `UPDATE sources SET last_commit = $1, last_sync_at = now() WHERE id = $2`, + [value, sourceId], + ); + } else { + await engine.executeRaw( + `UPDATE sources SET ${col} = $1 WHERE id = $2`, + [value, sourceId], + ); + } + return; + } + await engine.setConfig(`sync.${which}`, value); +} + +export async function runImport( + engine: BrainEngine, + args: string[], + opts: { commit?: string; sourceId?: string } = {}, +): Promise { const noEmbed = args.includes('--no-embed'); const fresh = args.includes('--fresh'); const jsonOutput = args.includes('--json'); @@ -105,7 +133,7 @@ export async function runImport(engine: BrainEngine, args: string[], opts: { com async function processFile(eng: BrainEngine, filePath: string) { const relativePath = relative(dir, filePath); try { - const result = await importFile(eng, filePath, relativePath, { noEmbed }); + const result = await importFile(eng, filePath, relativePath, { noEmbed, sourceId: opts.sourceId }); if (result.status === 'imported') { imported++; chunksCreated += result.chunks; @@ -270,7 +298,7 @@ export async function runImport(engine: BrainEngine, args: string[], opts: { com recordSyncFailures(failures, gitHead); } if (failures.length === 0) { - await engine.setConfig('sync.last_commit', gitHead); + await writeImportSyncAnchor(engine, opts.sourceId, 'last_commit', gitHead); } else { console.error( `\nImport completed with ${failures.length} failure(s). ` + @@ -279,7 +307,7 @@ export async function runImport(engine: BrainEngine, args: string[], opts: { com ); } await engine.setConfig('sync.last_run', new Date().toISOString()); - await engine.setConfig('sync.repo_path', dir); + await writeImportSyncAnchor(engine, opts.sourceId, 'repo_path', dir); } return { imported, skipped, errors, chunksCreated, failures }; diff --git a/src/commands/sync.ts b/src/commands/sync.ts index 00356c48e..c38727104 100644 --- a/src/commands/sync.ts +++ b/src/commands/sync.ts @@ -206,7 +206,7 @@ async function readSyncAnchor( sourceId: string | undefined, which: 'repo_path' | 'last_commit', ): Promise { - if (sourceId) { + if (sourceId !== undefined) { const col = which === 'repo_path' ? 'local_path' : 'last_commit'; const rows = await engine.executeRaw>( `SELECT ${col} AS value FROM sources WHERE id = $1`, @@ -223,7 +223,7 @@ async function writeSyncAnchor( which: 'repo_path' | 'last_commit', value: string, ): Promise { - if (sourceId) { + if (sourceId !== undefined) { const col = which === 'repo_path' ? 'local_path' : 'last_commit'; // last_sync_at bookmarked on every last_commit advance. if (which === 'last_commit') { @@ -259,7 +259,7 @@ async function readChunkerVersion( engine: BrainEngine, sourceId: string | undefined, ): Promise { - if (!sourceId) return null; + if (sourceId === undefined) return null; const rows = await engine.executeRaw<{ chunker_version: string | null }>( `SELECT chunker_version FROM sources WHERE id = $1`, [sourceId], @@ -272,7 +272,7 @@ async function writeChunkerVersion( sourceId: string | undefined, version: string, ): Promise { - if (!sourceId) return; + if (sourceId === undefined) return; await engine.executeRaw( `UPDATE sources SET chunker_version = $1 WHERE id = $2`, [version, sourceId], @@ -316,7 +316,7 @@ async function performSyncInner(engine: BrainEngine, opts: SyncOpts): Promise` : `No repo path specified. Use --repo or run gbrain init with --repo first.`; throw new Error(hint); @@ -432,9 +432,10 @@ async function performSyncInner(engine: BrainEngine, opts: SyncOpts): Promise 1) importArgs.push('--workers', String(fullConcurrency)); - const result = await runImport(engine, importArgs, { commit: headCommit }); + const result = await runImport(engine, importArgs, { commit: headCommit, sourceId: opts.sourceId }); // Bug 9 — gate the full-sync bookmark on success. runImport already // writes its own sync.last_commit conditionally (import.ts), but diff --git a/src/core/engine.ts b/src/core/engine.ts index 05bcc8ead..35587416e 100644 --- a/src/core/engine.ts +++ b/src/core/engine.ts @@ -136,7 +136,7 @@ export interface BrainEngine { * by `restore_page` flow, and by operator diagnostics. */ getPage(slug: string, opts?: GetPageOpts): Promise; - putPage(slug: string, page: PageInput): Promise; + putPage(slug: string, page: PageInput, opts?: { sourceId?: string }): Promise; /** * Hard-delete a page row. Cascades to content_chunks, page_links, * chunk_relations via existing FK ON DELETE CASCADE. @@ -146,7 +146,7 @@ export interface BrainEngine { * as the underlying primitive used by `purgeDeletedPages` and by callers * that explicitly want hard-delete semantics (e.g. test setup teardown). */ - deletePage(slug: string): Promise; + deletePage(slug: string, opts?: { sourceId?: string }): Promise; /** * v0.26.5 — set `deleted_at = now()` on a page. Returns the slug if a row * was soft-deleted, null if no row matched (already soft-deleted OR not found). @@ -185,8 +185,8 @@ export interface BrainEngine { getEmbeddingsByChunkIds(ids: number[]): Promise>; // Chunks - upsertChunks(slug: string, chunks: ChunkInput[]): Promise; - getChunks(slug: string): Promise; + upsertChunks(slug: string, chunks: ChunkInput[], opts?: { sourceId?: string }): Promise; + getChunks(slug: string, opts?: { sourceId?: string }): Promise; /** * Count chunks across the entire brain where embedded_at IS NULL. * Pre-flight short-circuit for `embed --stale` so a 100%-embedded brain @@ -202,7 +202,7 @@ export interface BrainEngine { * Bounded by an internal LIMIT of 100000 to mirror listPages. */ listStaleChunks(): Promise; - deleteChunks(slug: string): Promise; + deleteChunks(slug: string, opts?: { sourceId?: string }): Promise; // Links /** @@ -283,9 +283,9 @@ export interface BrainEngine { findOrphanPages(): Promise>; // Tags - addTag(slug: string, tag: string): Promise; - removeTag(slug: string, tag: string): Promise; - getTags(slug: string): Promise; + addTag(slug: string, tag: string, opts?: { sourceId?: string }): Promise; + removeTag(slug: string, tag: string, opts?: { sourceId?: string }): Promise; + getTags(slug: string, opts?: { sourceId?: string }): Promise; // Timeline /** @@ -319,9 +319,9 @@ export interface BrainEngine { putDreamVerdict(filePath: string, contentHash: string, verdict: DreamVerdictInput): Promise; // Versions - createVersion(slug: string): Promise; - getVersions(slug: string): Promise; - revertToVersion(slug: string, versionId: number): Promise; + createVersion(slug: string, opts?: { sourceId?: string }): Promise; + getVersions(slug: string, opts?: { sourceId?: string }): Promise; + revertToVersion(slug: string, versionId: number, opts?: { sourceId?: string }): Promise; // Stats + health getStats(): Promise; diff --git a/src/core/import-file.ts b/src/core/import-file.ts index 341d417da..14e36d7c5 100644 --- a/src/core/import-file.ts +++ b/src/core/import-file.ts @@ -167,6 +167,17 @@ export interface ImportResult { parsedPage?: ParsedPage; } +interface ImportOptions { + noEmbed?: boolean; + inferFrontmatter?: boolean; + force?: boolean; + sourceId?: string; +} + +function sourceOptions(sourceId: string | undefined): { sourceId: string } | undefined { + return sourceId !== undefined ? { sourceId } : undefined; +} + const MAX_FILE_SIZE = 5_000_000; // 5MB /** @@ -185,7 +196,7 @@ export async function importFromContent( engine: BrainEngine, slug: string, content: string, - opts: { noEmbed?: boolean } = {}, + opts: ImportOptions = {}, ): Promise { // Reject oversized payloads before any parsing, chunking, or embedding happens. // Uses Buffer.byteLength to count UTF-8 bytes the same way disk size would, @@ -223,7 +234,8 @@ export async function importFromContent( tags: parsed.tags, }; - const existing = await engine.getPage(slug); + const sourceOpts = sourceOptions(opts.sourceId); + const existing = await engine.getPage(slug, sourceOpts); if (existing?.content_hash === hash) { return { slug, status: 'skipped', chunks: 0, parsedPage }; } @@ -261,7 +273,7 @@ export async function importFromContent( // Transaction wraps all DB writes await engine.transaction(async (tx) => { - if (existing) await tx.createVersion(slug); + if (existing) await tx.createVersion(slug, sourceOpts); await tx.putPage(slug, { type: parsed.type, @@ -270,23 +282,23 @@ export async function importFromContent( timeline: parsed.timeline || '', frontmatter: parsed.frontmatter, content_hash: hash, - }); + }, sourceOpts); // Tag reconciliation: remove stale, add current - const existingTags = await tx.getTags(slug); + const existingTags = await tx.getTags(slug, sourceOpts); const newTags = new Set(parsed.tags); for (const old of existingTags) { - if (!newTags.has(old)) await tx.removeTag(slug, old); + if (!newTags.has(old)) await tx.removeTag(slug, old, sourceOpts); } for (const tag of parsed.tags) { - await tx.addTag(slug, tag); + await tx.addTag(slug, tag, sourceOpts); } if (chunks.length > 0) { - await tx.upsertChunks(slug, chunks); + await tx.upsertChunks(slug, chunks, sourceOpts); } else { // Content is empty — delete stale chunks so they don't ghost in search results - await tx.deleteChunks(slug); + await tx.deleteChunks(slug, sourceOpts); } // v0.19.0 E1 — doc↔impl linking: if this markdown page cites code paths @@ -333,7 +345,7 @@ export async function importFromFile( engine: BrainEngine, filePath: string, relativePath: string, - opts: { noEmbed?: boolean; inferFrontmatter?: boolean } = {}, + opts: ImportOptions = {}, ): Promise { // Defense-in-depth: reject symlinks before reading content. const lstat = lstatSync(filePath); @@ -398,7 +410,7 @@ export async function importCodeFile( engine: BrainEngine, relativePath: string, content: string, - opts: { noEmbed?: boolean; force?: boolean } = {}, + opts: ImportOptions = {}, ): Promise { const slug = slugifyCodePath(relativePath); const lang = detectCodeLanguage(relativePath) || 'unknown'; @@ -415,7 +427,8 @@ export async function importCodeFile( .update(JSON.stringify({ title, type: 'code', content, lang, chunker_version: CHUNKER_VERSION })) .digest('hex'); - const existing = await engine.getPage(slug); + const sourceOpts = sourceOptions(opts.sourceId); + const existing = await engine.getPage(slug, sourceOpts); if (!opts.force && existing?.content_hash === hash) { return { slug, status: 'skipped', chunks: 0 }; } @@ -453,7 +466,7 @@ export async function importCodeFile( // OpenAI API. Order matters: our chunk_index is semantic (tree-sitter // order), so a matching (chunk_index, text_hash) means a verbatim // preserved symbol. - const existingChunks = existing ? await engine.getChunks(slug) : []; + const existingChunks = existing ? await engine.getChunks(slug, sourceOpts) : []; const existingByKey = new Map(); for (const ec of existingChunks) { existingByKey.set(`${ec.chunk_index}:${ec.chunk_text}`, ec); @@ -488,7 +501,7 @@ export async function importCodeFile( // Store await engine.transaction(async (tx) => { - if (existing) await tx.createVersion(slug); + if (existing) await tx.createVersion(slug, sourceOpts); await tx.putPage(slug, { type: 'code' as PageType, @@ -498,15 +511,15 @@ export async function importCodeFile( timeline: '', frontmatter: { language: lang, file: relativePath }, content_hash: hash, - }); + }, sourceOpts); - await tx.addTag(slug, 'code'); - await tx.addTag(slug, lang); + await tx.addTag(slug, 'code', sourceOpts); + await tx.addTag(slug, lang, sourceOpts); if (chunks.length > 0) { - await tx.upsertChunks(slug, chunks); + await tx.upsertChunks(slug, chunks, sourceOpts); } else { - await tx.deleteChunks(slug); + await tx.deleteChunks(slug, sourceOpts); } }); @@ -517,7 +530,7 @@ export async function importCodeFile( // chunk IDs are stable. if (extractedEdges.length > 0 && chunks.length > 0) { try { - const persistedChunks = await engine.getChunks(slug); + const persistedChunks = await engine.getChunks(slug, sourceOpts); const byIndex = new Map(); for (const pc of persistedChunks) { byIndex.set(pc.chunk_index, pc); diff --git a/src/core/operations.ts b/src/core/operations.ts index b5ef2ab64..b9d077801 100644 --- a/src/core/operations.ts +++ b/src/core/operations.ts @@ -1088,9 +1088,13 @@ const get_versions: Operation = { description: 'Page version history', params: { slug: { type: 'string', required: true }, + source_id: { type: 'string' }, }, handler: async (ctx, p) => { - return ctx.engine.getVersions(p.slug as string); + return ctx.engine.getVersions( + p.slug as string, + p.source_id !== undefined ? { sourceId: p.source_id as string } : undefined, + ); }, scope: 'read', cliHints: { name: 'history', positional: ['slug'] }, @@ -1102,13 +1106,15 @@ const revert_version: Operation = { params: { slug: { type: 'string', required: true }, version_id: { type: 'number', required: true }, + source_id: { type: 'string' }, }, mutating: true, scope: 'write', handler: async (ctx, p) => { if (ctx.dryRun) return { dry_run: true, action: 'revert_version', slug: p.slug, version_id: p.version_id }; - await ctx.engine.createVersion(p.slug as string); - await ctx.engine.revertToVersion(p.slug as string, p.version_id as number); + const sourceOpts = p.source_id !== undefined ? { sourceId: p.source_id as string } : undefined; + await ctx.engine.createVersion(p.slug as string, sourceOpts); + await ctx.engine.revertToVersion(p.slug as string, p.version_id as number, sourceOpts); return { status: 'reverted' }; }, cliHints: { name: 'revert', positional: ['slug', 'version_id'] }, diff --git a/src/core/pglite-engine.ts b/src/core/pglite-engine.ts index a3cdf483d..e0ff3d78b 100644 --- a/src/core/pglite-engine.ts +++ b/src/core/pglite-engine.ts @@ -352,13 +352,9 @@ export class PGLiteEngine implements BrainEngine { async getPage(slug: string, opts?: { sourceId?: string; includeDeleted?: boolean }): Promise { // v0.26.5: hide soft-deleted by default; opt-in via opts.includeDeleted. const includeDeleted = opts?.includeDeleted === true; - const sourceId = opts?.sourceId; - const where: string[] = ['slug = $1']; - const params: unknown[] = [slug]; - if (sourceId) { - params.push(sourceId); - where.push(`source_id = $${params.length}`); - } + const sourceId = opts?.sourceId ?? 'default'; + const where: string[] = ['slug = $1', 'source_id = $2']; + const params: unknown[] = [slug, sourceId]; if (!includeDeleted) { where.push('deleted_at IS NULL'); } @@ -371,10 +367,11 @@ export class PGLiteEngine implements BrainEngine { return rowToPage(rows[0] as Record); } - async putPage(slug: string, page: PageInput): Promise { + async putPage(slug: string, page: PageInput, opts?: { sourceId?: string }): Promise { slug = validateSlug(slug); const hash = page.content_hash || contentHash(page); const frontmatter = page.frontmatter || {}; + const sourceId = opts?.sourceId ?? page.source_id ?? 'default'; // v0.18.0 Step 2: source_id relies on the schema DEFAULT 'default' so // existing callers still target the default source without threading @@ -383,8 +380,8 @@ export class PGLiteEngine implements BrainEngine { // surface an explicit sourceId param on putPage for multi-source sync. const pageKind = page.page_kind || 'markdown'; const { rows } = await this.db.query( - `INSERT INTO pages (slug, type, page_kind, title, compiled_truth, timeline, frontmatter, content_hash, updated_at) - VALUES ($1, $2, $3, $4, $5, $6, $7::jsonb, $8, now()) + `INSERT INTO pages (source_id, slug, type, page_kind, title, compiled_truth, timeline, frontmatter, content_hash, updated_at) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8::jsonb, $9, now()) ON CONFLICT (source_id, slug) DO UPDATE SET type = EXCLUDED.type, page_kind = EXCLUDED.page_kind, @@ -395,13 +392,13 @@ export class PGLiteEngine implements BrainEngine { content_hash = EXCLUDED.content_hash, updated_at = now() RETURNING id, slug, type, title, compiled_truth, timeline, frontmatter, content_hash, created_at, updated_at`, - [slug, page.type, pageKind, page.title, page.compiled_truth, page.timeline || '', JSON.stringify(frontmatter), hash] + [sourceId, slug, page.type, pageKind, page.title, page.compiled_truth, page.timeline || '', JSON.stringify(frontmatter), hash] ); return rowToPage(rows[0] as Record); } - async deletePage(slug: string): Promise { - await this.db.query('DELETE FROM pages WHERE slug = $1', [slug]); + async deletePage(slug: string, opts?: { sourceId?: string }): Promise { + await this.db.query('DELETE FROM pages WHERE slug = $1 AND source_id = $2', [slug, opts?.sourceId ?? 'default']); } async softDeletePage(slug: string, opts?: { sourceId?: string }): Promise<{ slug: string } | null> { @@ -748,9 +745,12 @@ export class PGLiteEngine implements BrainEngine { } // Chunks - async upsertChunks(slug: string, chunks: ChunkInput[]): Promise { + async upsertChunks(slug: string, chunks: ChunkInput[], opts?: { sourceId?: string }): Promise { // Get page_id - const pageResult = await this.db.query('SELECT id FROM pages WHERE slug = $1', [slug]); + const pageResult = await this.db.query( + 'SELECT id FROM pages WHERE slug = $1 AND source_id = $2', + [slug, opts?.sourceId ?? 'default'], + ); if (pageResult.rows.length === 0) throw new Error(`Page not found: ${slug}`); const pageId = (pageResult.rows[0] as { id: number }).id; @@ -835,13 +835,13 @@ export class PGLiteEngine implements BrainEngine { ); } - async getChunks(slug: string): Promise { + async getChunks(slug: string, opts?: { sourceId?: string }): Promise { const { rows } = await this.db.query( `SELECT cc.* FROM content_chunks cc JOIN pages p ON p.id = cc.page_id - WHERE p.slug = $1 + WHERE p.slug = $1 AND p.source_id = $2 ORDER BY cc.chunk_index`, - [slug] + [slug, opts?.sourceId ?? 'default'] ); return (rows as Record[]).map(r => rowToChunk(r)); } @@ -869,11 +869,11 @@ export class PGLiteEngine implements BrainEngine { return rows as unknown as StaleChunkRow[]; } - async deleteChunks(slug: string): Promise { + async deleteChunks(slug: string, opts?: { sourceId?: string }): Promise { await this.db.query( `DELETE FROM content_chunks - WHERE page_id = (SELECT id FROM pages WHERE slug = $1)`, - [slug] + WHERE page_id = (SELECT id FROM pages WHERE slug = $1 AND source_id = $2)`, + [slug, opts?.sourceId ?? 'default'] ); } @@ -1214,30 +1214,30 @@ export class PGLiteEngine implements BrainEngine { } // Tags - async addTag(slug: string, tag: string): Promise { + async addTag(slug: string, tag: string, opts?: { sourceId?: string }): Promise { await this.db.query( `INSERT INTO tags (page_id, tag) - SELECT id, $2 FROM pages WHERE slug = $1 + SELECT id, $2 FROM pages WHERE slug = $1 AND source_id = $3 ON CONFLICT (page_id, tag) DO NOTHING`, - [slug, tag] + [slug, tag, opts?.sourceId ?? 'default'] ); } - async removeTag(slug: string, tag: string): Promise { + async removeTag(slug: string, tag: string, opts?: { sourceId?: string }): Promise { await this.db.query( `DELETE FROM tags - WHERE page_id = (SELECT id FROM pages WHERE slug = $1) + WHERE page_id = (SELECT id FROM pages WHERE slug = $1 AND source_id = $3) AND tag = $2`, - [slug, tag] + [slug, tag, opts?.sourceId ?? 'default'] ); } - async getTags(slug: string): Promise { + async getTags(slug: string, opts?: { sourceId?: string }): Promise { const { rows } = await this.db.query( `SELECT tag FROM tags - WHERE page_id = (SELECT id FROM pages WHERE slug = $1) + WHERE page_id = (SELECT id FROM pages WHERE slug = $1 AND source_id = $2) ORDER BY tag`, - [slug] + [slug, opts?.sourceId ?? 'default'] ); return (rows as { tag: string }[]).map(r => r.tag); } @@ -1386,37 +1386,37 @@ export class PGLiteEngine implements BrainEngine { } // Versions - async createVersion(slug: string): Promise { + async createVersion(slug: string, opts?: { sourceId?: string }): Promise { const { rows } = await this.db.query( `INSERT INTO page_versions (page_id, compiled_truth, frontmatter) SELECT id, compiled_truth, frontmatter - FROM pages WHERE slug = $1 + FROM pages WHERE slug = $1 AND source_id = $2 RETURNING *`, - [slug] + [slug, opts?.sourceId ?? 'default'] ); return rows[0] as unknown as PageVersion; } - async getVersions(slug: string): Promise { + async getVersions(slug: string, opts?: { sourceId?: string }): Promise { const { rows } = await this.db.query( `SELECT pv.* FROM page_versions pv JOIN pages p ON p.id = pv.page_id - WHERE p.slug = $1 + WHERE p.slug = $1 AND p.source_id = $2 ORDER BY pv.snapshot_at DESC`, - [slug] + [slug, opts?.sourceId ?? 'default'] ); return rows as unknown as PageVersion[]; } - async revertToVersion(slug: string, versionId: number): Promise { + async revertToVersion(slug: string, versionId: number, opts?: { sourceId?: string }): Promise { await this.db.query( `UPDATE pages SET compiled_truth = pv.compiled_truth, frontmatter = pv.frontmatter, updated_at = now() FROM page_versions pv - WHERE pages.slug = $1 AND pv.id = $2 AND pv.page_id = pages.id`, - [slug, versionId] + WHERE pages.slug = $1 AND pages.source_id = $2 AND pv.id = $3 AND pv.page_id = pages.id`, + [slug, opts?.sourceId ?? 'default', versionId] ); } diff --git a/src/core/postgres-engine.ts b/src/core/postgres-engine.ts index 2b4394739..526aa587e 100644 --- a/src/core/postgres-engine.ts +++ b/src/core/postgres-engine.ts @@ -307,26 +307,26 @@ export class PostgresEngine implements BrainEngine { async getPage(slug: string, opts?: { sourceId?: string; includeDeleted?: boolean }): Promise { const sql = this.sql; const includeDeleted = opts?.includeDeleted === true; - const sourceId = opts?.sourceId; + const sourceId = opts?.sourceId ?? 'default'; // v0.26.5: default hides soft-deleted rows. Compose with optional sourceId // filter via fragment chaining (postgres.js supports sql`` composition). - const sourceCondition = sourceId ? sql`AND source_id = ${sourceId}` : sql``; const deletedCondition = includeDeleted ? sql`` : sql`AND deleted_at IS NULL`; const rows = await sql` SELECT id, slug, type, title, compiled_truth, timeline, frontmatter, content_hash, created_at, updated_at, deleted_at FROM pages - WHERE slug = ${slug} ${sourceCondition} ${deletedCondition} + WHERE slug = ${slug} AND source_id = ${sourceId} ${deletedCondition} LIMIT 1 `; if (rows.length === 0) return null; return rowToPage(rows[0]); } - async putPage(slug: string, page: PageInput): Promise { + async putPage(slug: string, page: PageInput, opts?: { sourceId?: string }): Promise { slug = validateSlug(slug); const sql = this.sql; const hash = page.content_hash || contentHash(page); const frontmatter = page.frontmatter || {}; + const sourceId = opts?.sourceId ?? page.source_id ?? 'default'; // v0.18.0 Step 2: source_id relies on schema DEFAULT 'default'. ON // CONFLICT target becomes (source_id, slug) since global UNIQUE(slug) @@ -334,8 +334,8 @@ export class PostgresEngine implements BrainEngine { // notes; multi-source sync (Step 5) will surface an explicit sourceId. const pageKind = page.page_kind || 'markdown'; const rows = await sql` - INSERT INTO pages (slug, type, page_kind, title, compiled_truth, timeline, frontmatter, content_hash, updated_at) - VALUES (${slug}, ${page.type}, ${pageKind}, ${page.title}, ${page.compiled_truth}, ${page.timeline || ''}, ${sql.json(frontmatter as Parameters[0])}, ${hash}, now()) + INSERT INTO pages (source_id, slug, type, page_kind, title, compiled_truth, timeline, frontmatter, content_hash, updated_at) + VALUES (${sourceId}, ${slug}, ${page.type}, ${pageKind}, ${page.title}, ${page.compiled_truth}, ${page.timeline || ''}, ${sql.json(frontmatter as Parameters[0])}, ${hash}, now()) ON CONFLICT (source_id, slug) DO UPDATE SET type = EXCLUDED.type, page_kind = EXCLUDED.page_kind, @@ -350,9 +350,9 @@ export class PostgresEngine implements BrainEngine { return rowToPage(rows[0]); } - async deletePage(slug: string): Promise { + async deletePage(slug: string, opts?: { sourceId?: string }): Promise { const sql = this.sql; - await sql`DELETE FROM pages WHERE slug = ${slug}`; + await sql`DELETE FROM pages WHERE slug = ${slug} AND source_id = ${opts?.sourceId ?? 'default'}`; } async softDeletePage(slug: string, opts?: { sourceId?: string }): Promise<{ slug: string } | null> { @@ -779,11 +779,11 @@ export class PostgresEngine implements BrainEngine { } // Chunks - async upsertChunks(slug: string, chunks: ChunkInput[]): Promise { + async upsertChunks(slug: string, chunks: ChunkInput[], opts?: { sourceId?: string }): Promise { const sql = this.sql; // Get page_id - const pages = await sql`SELECT id FROM pages WHERE slug = ${slug}`; + const pages = await sql`SELECT id FROM pages WHERE slug = ${slug} AND source_id = ${opts?.sourceId ?? 'default'}`; if (pages.length === 0) throw new Error(`Page not found: ${slug}`); const pageId = pages[0].id; @@ -868,12 +868,12 @@ export class PostgresEngine implements BrainEngine { ); } - async getChunks(slug: string): Promise { + async getChunks(slug: string, opts?: { sourceId?: string }): Promise { const sql = this.sql; const rows = await sql` SELECT cc.* FROM content_chunks cc JOIN pages p ON p.id = cc.page_id - WHERE p.slug = ${slug} + WHERE p.slug = ${slug} AND p.source_id = ${opts?.sourceId ?? 'default'} ORDER BY cc.chunk_index `; return rows.map((r) => rowToChunk(r as Record)); @@ -903,11 +903,11 @@ export class PostgresEngine implements BrainEngine { return rows as unknown as StaleChunkRow[]; } - async deleteChunks(slug: string): Promise { + async deleteChunks(slug: string, opts?: { sourceId?: string }): Promise { const sql = this.sql; await sql` DELETE FROM content_chunks - WHERE page_id = (SELECT id FROM pages WHERE slug = ${slug}) + WHERE page_id = (SELECT id FROM pages WHERE slug = ${slug} AND source_id = ${opts?.sourceId ?? 'default'}) `; } @@ -1264,11 +1264,11 @@ export class PostgresEngine implements BrainEngine { } // Tags - async addTag(slug: string, tag: string): Promise { + async addTag(slug: string, tag: string, opts?: { sourceId?: string }): Promise { const sql = this.sql; // Verify page exists before attempting insert (ON CONFLICT DO NOTHING // swallows the "already tagged" case, but we still need to detect missing pages) - const page = await sql`SELECT id FROM pages WHERE slug = ${slug}`; + const page = await sql`SELECT id FROM pages WHERE slug = ${slug} AND source_id = ${opts?.sourceId ?? 'default'}`; if (page.length === 0) throw new Error(`addTag failed: page "${slug}" not found`); await sql` INSERT INTO tags (page_id, tag) @@ -1277,20 +1277,20 @@ export class PostgresEngine implements BrainEngine { `; } - async removeTag(slug: string, tag: string): Promise { + async removeTag(slug: string, tag: string, opts?: { sourceId?: string }): Promise { const sql = this.sql; await sql` DELETE FROM tags - WHERE page_id = (SELECT id FROM pages WHERE slug = ${slug}) + WHERE page_id = (SELECT id FROM pages WHERE slug = ${slug} AND source_id = ${opts?.sourceId ?? 'default'}) AND tag = ${tag} `; } - async getTags(slug: string): Promise { + async getTags(slug: string, opts?: { sourceId?: string }): Promise { const sql = this.sql; const rows = await sql` SELECT tag FROM tags - WHERE page_id = (SELECT id FROM pages WHERE slug = ${slug}) + WHERE page_id = (SELECT id FROM pages WHERE slug = ${slug} AND source_id = ${opts?.sourceId ?? 'default'}) ORDER BY tag `; return rows.map((r) => r.tag as string); @@ -1440,30 +1440,30 @@ export class PostgresEngine implements BrainEngine { } // Versions - async createVersion(slug: string): Promise { + async createVersion(slug: string, opts?: { sourceId?: string }): Promise { const sql = this.sql; const rows = await sql` INSERT INTO page_versions (page_id, compiled_truth, frontmatter) SELECT id, compiled_truth, frontmatter - FROM pages WHERE slug = ${slug} + FROM pages WHERE slug = ${slug} AND source_id = ${opts?.sourceId ?? 'default'} RETURNING * `; if (rows.length === 0) throw new Error(`createVersion failed: page "${slug}" not found`); return rows[0] as unknown as PageVersion; } - async getVersions(slug: string): Promise { + async getVersions(slug: string, opts?: { sourceId?: string }): Promise { const sql = this.sql; const rows = await sql` SELECT pv.* FROM page_versions pv JOIN pages p ON p.id = pv.page_id - WHERE p.slug = ${slug} + WHERE p.slug = ${slug} AND p.source_id = ${opts?.sourceId ?? 'default'} ORDER BY pv.snapshot_at DESC `; return rows as unknown as PageVersion[]; } - async revertToVersion(slug: string, versionId: number): Promise { + async revertToVersion(slug: string, versionId: number, opts?: { sourceId?: string }): Promise { const sql = this.sql; await sql` UPDATE pages SET @@ -1471,7 +1471,10 @@ export class PostgresEngine implements BrainEngine { frontmatter = pv.frontmatter, updated_at = now() FROM page_versions pv - WHERE pages.slug = ${slug} AND pv.id = ${versionId} AND pv.page_id = pages.id + WHERE pages.slug = ${slug} + AND pages.source_id = ${opts?.sourceId ?? 'default'} + AND pv.id = ${versionId} + AND pv.page_id = pages.id `; } diff --git a/src/core/types.ts b/src/core/types.ts index f1b8e7565..3188048cc 100644 --- a/src/core/types.ts +++ b/src/core/types.ts @@ -40,8 +40,10 @@ export interface PageInput { * to 'markdown' when omitted so existing callers work unchanged. Set to * 'code' by importCodeFile; drives orphans filter, auto-link bypass, and * `query --lang` filtering. - */ + */ page_kind?: PageKind; + /** Target source for multi-source imports. Defaults to 'default'. */ + source_id?: string; } export interface PageFilters { @@ -70,7 +72,7 @@ export interface PageFilters { /** v0.26.5 — opts for getPage / softDeletePage / restorePage. */ export interface GetPageOpts { - /** Filter to a specific source. When omitted, getPage returns the first slug match across sources (pre-existing semantics). */ + /** Filter get/soft-delete/restore calls to a specific source. When omitted, uses the default source. */ sourceId?: string; /** Include soft-deleted pages. Default false. See PageFilters.includeDeleted. */ includeDeleted?: boolean; diff --git a/test/sync.test.ts b/test/sync.test.ts index 33a61c924..f38772b26 100644 --- a/test/sync.test.ts +++ b/test/sync.test.ts @@ -359,6 +359,153 @@ describe('performSync dry-run never writes', () => { // Structural assertion: the contract includes `embedded: number`. expect(typeof result.embedded).toBe('number'); }); + + test('full sync writes pages under the requested source, not default', async () => { + const { performSync } = await import('../src/commands/sync.ts'); + await engine.executeRaw( + `INSERT INTO sources (id, name, local_path, config) + VALUES ($1, $2, $3, '{}'::jsonb)`, + ['source-sync', 'source-sync', repoPath], + ); + + const result = await performSync(engine, { + repoPath, + noPull: true, + noEmbed: true, + sourceId: 'source-sync', + }); + + expect(result.status).toBe('first_sync'); + expect(await engine.getPage('people/alice', { sourceId: 'source-sync' })).not.toBeNull(); + expect(await engine.getPage('people/bob', { sourceId: 'source-sync' })).not.toBeNull(); + expect(await engine.getPage('people/alice')).toBeNull(); + + const rows = await engine.executeRaw<{ source_id: string; n: number }>( + `SELECT source_id, COUNT(*)::int AS n + FROM pages + WHERE slug IN ('people/alice', 'people/bob') + GROUP BY source_id + ORDER BY source_id`, + ); + expect(rows).toEqual([{ source_id: 'source-sync', n: 2 }]); + }); + + test('empty-string source id is still an explicit source scope', async () => { + const { performSync } = await import('../src/commands/sync.ts'); + await engine.executeRaw( + `INSERT INTO sources (id, name, local_path, config) + VALUES ($1, $2, $3, '{}'::jsonb)`, + ['', 'empty-source', repoPath], + ); + + const result = await performSync(engine, { + repoPath, + noPull: true, + noEmbed: true, + sourceId: '', + }); + + expect(result.status).toBe('first_sync'); + expect(await engine.getPage('people/alice', { sourceId: '' })).not.toBeNull(); + expect(await engine.getPage('people/alice')).toBeNull(); + expect(await engine.getConfig('sync.last_commit')).toBeNull(); + const rows = await engine.executeRaw<{ last_commit: string | null; chunker_version: string | null }>( + `SELECT last_commit, chunker_version FROM sources WHERE id = $1`, + [''], + ); + expect(rows[0].last_commit).toBeTruthy(); + expect(rows[0].chunker_version).toBeTruthy(); + }); + + test('runImport threads sourceId into imported pages, chunks, tags, and versions', async () => { + const { runImport } = await import('../src/commands/import.ts'); + await engine.executeRaw( + `INSERT INTO sources (id, name, local_path, config) + VALUES ($1, $2, $3, '{}'::jsonb)`, + ['source-import', 'source-import', repoPath], + ); + writeFileSync(join(repoPath, 'people/alice.md'), [ + '---', + 'type: person', + 'title: Alice', + 'tags: [original]', + '---', + '', + 'Alice is a person.', + ].join('\n')); + + const first = await runImport(engine, [repoPath, '--no-embed'], { sourceId: 'source-import' }); + expect(first.imported).toBe(2); + expect(await engine.getPage('people/alice', { sourceId: 'source-import' })).not.toBeNull(); + expect(await engine.getPage('people/alice')).toBeNull(); + expect(await engine.getChunks('people/alice', { sourceId: 'source-import' })).toHaveLength(1); + expect(await engine.getTags('people/alice', { sourceId: 'source-import' })).toEqual(['original']); + + writeFileSync(join(repoPath, 'people/alice.md'), [ + '---', + 'type: person', + 'title: Alice Updated', + 'tags: [updated]', + '---', + '', + 'Alice is a person with updated notes.', + ].join('\n')); + + const second = await runImport(engine, [repoPath, '--no-embed'], { sourceId: 'source-import' }); + expect(second.imported).toBe(1); + expect(await engine.getTags('people/alice', { sourceId: 'source-import' })).toEqual(['updated']); + const versions = await engine.executeRaw<{ n: number }>( + `SELECT COUNT(*)::int AS n + FROM page_versions pv + JOIN pages p ON p.id = pv.page_id + WHERE p.slug = 'people/alice' + AND p.source_id = 'source-import'`, + ); + expect(versions[0].n).toBe(1); + }); + + test('version history and revert are scoped by source id', async () => { + const { importFromContent } = await import('../src/core/import-file.ts'); + await engine.executeRaw( + `INSERT INTO sources (id, name, local_path, config) + VALUES ($1, $2, $3, '{}'::jsonb)`, + ['version-source', 'version-source', repoPath], + ); + + await importFromContent(engine, 'people/shared', [ + '---', + 'type: person', + 'title: Default Shared', + '---', + '', + 'Default source body.', + ].join('\n'), { noEmbed: true }); + await importFromContent(engine, 'people/shared', [ + '---', + 'type: person', + 'title: Named Shared', + '---', + '', + 'Named source first body.', + ].join('\n'), { noEmbed: true, sourceId: 'version-source' }); + await importFromContent(engine, 'people/shared', [ + '---', + 'type: person', + 'title: Named Shared Updated', + '---', + '', + 'Named source updated body.', + ].join('\n'), { noEmbed: true, sourceId: 'version-source' }); + + const defaultVersions = await engine.getVersions('people/shared'); + const namedVersions = await engine.getVersions('people/shared', { sourceId: 'version-source' }); + expect(defaultVersions).toHaveLength(0); + expect(namedVersions).toHaveLength(1); + + await engine.revertToVersion('people/shared', namedVersions[0].id, { sourceId: 'version-source' }); + expect((await engine.getPage('people/shared'))?.title).toBe('Default Shared'); + expect((await engine.getPage('people/shared', { sourceId: 'version-source' }))?.compiled_truth).toBe('Named source first body.'); + }); }); describe('sync regression — #132 nested transaction deadlock', () => {