diff --git a/src/commands/extract.ts b/src/commands/extract.ts
index 9b9d08baa..b008f0393 100644
--- a/src/commands/extract.ts
+++ b/src/commands/extract.ts
@@ -668,9 +668,21 @@ async function extractTimelineFromDir(
 
 // --- Sync integration hooks ---
 
-export async function extractLinksForSlugs(engine: BrainEngine, repoPath: string, slugs: string[]): Promise<number> {
+export async function extractLinksForSlugs(
+  engine: BrainEngine,
+  repoPath: string,
+  slugs: string[],
+  opts?: { sourceId?: string },
+): Promise<number> {
   const allFiles = walkMarkdownFiles(repoPath);
   const allSlugs = new Set(allFiles.map(f => f.relPath.replace('.md', '')));
+  // v0.18.0+ multi-source: post-sync extract reconciles same-source edges.
+  // Markdown→markdown links within one repo always live in the caller's
+  // sourceId. Cross-source extraction (rare) would need a per-repo source
+  // manifest; not in this PR's scope.
+  const linkOpts = opts?.sourceId
+    ? { fromSourceId: opts.sourceId, toSourceId: opts.sourceId, originSourceId: opts.sourceId }
+    : undefined;
   let created = 0;
   for (const slug of slugs) {
     const filePath = join(repoPath, slug + '.md');
@@ -678,14 +690,23 @@ export async function extractLinksForSlugs(engine: BrainEngine, repoPath: string
     try {
       const content = readFileSync(filePath, 'utf-8');
       for (const link of await extractLinksFromFile(content, slug + '.md', allSlugs)) {
-        try { await engine.addLink(link.from_slug, link.to_slug, link.context, link.link_type); created++; } catch { /* skip */ }
+        try { await engine.addLink(link.from_slug, link.to_slug, link.context, link.link_type, undefined, undefined, undefined, linkOpts); created++; } catch { /* skip */ }
       }
     } catch { /* skip */ }
   }
   return created;
 }
 
-export async function extractTimelineForSlugs(engine: BrainEngine, repoPath: string, slugs: string[]): Promise<number> {
+export async function extractTimelineForSlugs(
+  engine: BrainEngine,
+  repoPath: string,
+  slugs: string[],
+  opts?: { sourceId?: string },
+): Promise<number> {
+  // v0.18.0+ multi-source: source-qualify so timeline rows don't fan out
+  // across every source containing the slug (addTimelineEntry's
+  // INSERT...SELECT-from-pages fan-out was Data R1's HIGH 2).
+  const entryOpts = opts?.sourceId ? { sourceId: opts.sourceId } : undefined;
   let created = 0;
   for (const slug of slugs) {
     const filePath = join(repoPath, slug + '.md');
@@ -693,7 +714,7 @@ export async function extractTimelineForSlugs(engine: BrainEngine, repoPath: str
     try {
       const content = readFileSync(filePath, 'utf-8');
       for (const entry of extractTimelineFromContent(content, slug)) {
-        try { await engine.addTimelineEntry(entry.slug, { date: entry.date, source: entry.source, summary: entry.summary, detail: entry.detail }); created++; } catch { /* skip */ }
+        try { await engine.addTimelineEntry(entry.slug, { date: entry.date, source: entry.source, summary: entry.summary, detail: entry.detail }, entryOpts); created++; } catch { /* skip */ }
       }
     } catch { /* skip */ }
   }
diff --git a/src/commands/import.ts b/src/commands/import.ts
index 58992f11a..8961d72b8 100644
--- a/src/commands/import.ts
+++ b/src/commands/import.ts
@@ -28,10 +28,26 @@ export interface RunImportResult {
   failures: Array<{ path: string; error: string }>;
 }
 
-export async function runImport(engine: BrainEngine, args: string[], opts: { commit?: string } = {}): Promise<RunImportResult> {
+export async function runImport(
+  engine: BrainEngine,
+  args: string[],
+  opts: { commit?: string; sourceId?: string; writeSyncConfig?: boolean } = {},
+): Promise<RunImportResult> {
   const noEmbed = args.includes('--no-embed');
   const fresh = args.includes('--fresh');
   const jsonOutput = args.includes('--json');
+  // v0.30.x follow-up to PR #707: programmatic sourceId support so internal
+  // callers (performFullSync, future Step 6 paths) can route to a named
+  // source. The CLI `gbrain import` deliberately has no --source flag per
+  // PR #707's design intent — only programmatic callers thread sourceId.
+  const sourceId = opts.sourceId;
+  // v0.30.x follow-up to PR #707: when called from performFullSync (which
+  // owns its own source-scoped sync anchors via writeSyncAnchor), gate the
+  // legacy global config writes (sync.last_commit / sync.last_run /
+  // sync.repo_path) so a `--source X --full` run can't overwrite the
+  // global repo path with X's repo and contaminate a later bare
+  // `gbrain sync` against the default source.
+  const writeSyncConfig = opts.writeSyncConfig !== false;
   const workersIdx = args.indexOf('--workers');
   const workersArg = workersIdx !== -1 ? args[workersIdx + 1] : null;
   // v0.22.13 (PR #490 Q2): shared parseWorkers helper rejects bad input
@@ -110,8 +126,8 @@ export async function runImport(engine: BrainEngine, args: string[], opts: { com
       // up images when GBRAIN_EMBEDDING_MULTIMODAL=true so this branch is
       // unreachable when the gate is off; defense-in-depth check anyway.
       const result = isImageFilePath(relativePath) && process.env.GBRAIN_EMBEDDING_MULTIMODAL === 'true'
-        ? await importImageFile(eng, filePath, relativePath, { noEmbed })
-        : await importFile(eng, filePath, relativePath, { noEmbed });
+        ? await importImageFile(eng, filePath, relativePath, { noEmbed, sourceId })
+        : await importFile(eng, filePath, relativePath, { noEmbed, sourceId });
       if (result.status === 'imported') {
         imported++;
         chunksCreated += result.chunks;
@@ -275,17 +291,19 @@ export async function runImport(engine: BrainEngine, args: string[], opts: { com
       const { recordSyncFailures } = await import('../core/sync.ts');
       recordSyncFailures(failures, gitHead);
     }
-    if (failures.length === 0) {
+    if (failures.length === 0 && writeSyncConfig) {
       await engine.setConfig('sync.last_commit', gitHead);
-    } else {
+    } else if (failures.length > 0) {
       console.error(
         `\nImport completed with ${failures.length} failure(s). ` +
         `sync.last_commit NOT advanced — re-run 'gbrain sync' to retry, or ` +
         `'gbrain sync --skip-failed' to acknowledge and move past them.`,
       );
     }
-    await engine.setConfig('sync.last_run', new Date().toISOString());
-    await engine.setConfig('sync.repo_path', dir);
+    if (writeSyncConfig) {
+      await engine.setConfig('sync.last_run', new Date().toISOString());
+      await engine.setConfig('sync.repo_path', dir);
+    }
   }
 
   return { imported, skipped, errors, chunksCreated, failures };
diff --git a/src/commands/migrate-engine.ts b/src/commands/migrate-engine.ts
index 5984d1c90..0d40e43fa 100644
--- a/src/commands/migrate-engine.ts
+++ b/src/commands/migrate-engine.ts
@@ -117,11 +117,13 @@ export async function runMigrateEngine(sourceEngine: BrainEngine, args: string[]
 
   if (targetStats.page_count > 0 && opts.force) {
     console.log('--force: wiping target brain...');
-    // Delete all pages (cascades to chunks, links, tags, etc.)
-    const pages = await targetEngine.listPages({ limit: 100000 });
-    for (const p of pages) {
-      await targetEngine.deletePage(p.slug);
-    }
+    // v0.18.0+ multi-source: deletePage(slug) is now source-scoped (defaults
+    // to 'default'), so per-page iteration would skip non-default-source
+    // rows. migrate-engine --force is a destructive wipe across the entire
+    // brain — all sources, all pages — so we issue a raw DELETE that matches
+    // the original semantics. Cascades through content_chunks / page_links /
+    // tags / timeline_entries / page_versions via existing FKs.
+    await targetEngine.executeRaw('DELETE FROM pages');
   }
 
   // Load or create manifest for resume
diff --git a/src/commands/reconcile-links.ts b/src/commands/reconcile-links.ts
index f41e77e2b..c2487bcfe 100644
--- a/src/commands/reconcile-links.ts
+++ b/src/commands/reconcile-links.ts
@@ -89,8 +89,16 @@ export async function runReconcileLinks(
   // Fetch pages one at a time via getPage (no bulk read helper exists yet).
   // On a 47K-page brain this is the slow path; a v0.20.x follow-up can add
   // getPagesBatch. For the typical 2K–5K markdown count it's fine.
+  // v0.18.0+ multi-source: source-scope getPage so reconcile picks up the
+  // intended-source row instead of tripping over the `default`-vs-named-source
+  // ambiguity. The link edges below also propagate the same sourceId
+  // (Data R1 MED 1: the option was declared on ReconcileLinksOpts but
+  // ignored end-to-end).
+  const getPageOpts = opts.sourceId ? { sourceId: opts.sourceId } : undefined;
+  const linkOpts = opts.sourceId
+    ? { fromSourceId: opts.sourceId, toSourceId: opts.sourceId, originSourceId: opts.sourceId }
+    : undefined;
   for (const mdSlug of mdSlugs) {
-    const page = await engine.getPage(mdSlug);
+    const page = await engine.getPage(mdSlug, getPageOpts);
     if (!page) {
       progress.tick(1, mdSlug);
       continue;
@@ -113,10 +121,12 @@ export async function runReconcileLinks(
       const ctx = ref.line ? `cited at ${ref.path}:${ref.line}` : ref.path;
       edgesAttempted++;
       try {
-        // Forward: guide documents code. addLink's inner SELECT drops
-        // silently if codeSlug isn't a page yet (benign — counted below).
-        await engine.addLink(mdSlug, codeSlug, ctx, 'documents', 'markdown', mdSlug, 'compiled_truth');
-        await engine.addLink(codeSlug, mdSlug, ref.path, 'documented_by', 'markdown', mdSlug, 'compiled_truth');
+        // Forward: guide documents code. addLink's pre-check now throws a
+        // clean missing-page error if codeSlug isn't a page yet (caught and
+        // counted below). Source-qualified per opts.sourceId; the
+        // same-source assumption mirrors the import-file.ts:303 doc↔impl
+        // auto-link.
+        await engine.addLink(mdSlug, codeSlug, ctx, 'documents', 'markdown', mdSlug, 'compiled_truth', linkOpts);
+        await engine.addLink(codeSlug, mdSlug, ref.path, 'documented_by', 'markdown', mdSlug, 'compiled_truth', linkOpts);
       } catch (e: unknown) {
         // Per-link errors don't abort the batch. Track them for the summary.
         const msg = e instanceof Error ? e.message : String(e);
diff --git a/src/commands/reindex-code.ts b/src/commands/reindex-code.ts
index 713d2c9ed..2fbfc6396 100644
--- a/src/commands/reindex-code.ts
+++ b/src/commands/reindex-code.ts
@@ -200,6 +200,7 @@ export async function runReindexCode(
     const result = await importCodeFile(engine, relPath, row.compiled_truth, {
       noEmbed: opts.noEmbed,
       force: opts.force,
+      sourceId: opts.sourceId,
     });
     if (result.status === 'imported') reindexed++;
     else if (result.status === 'skipped') skipped++;
diff --git a/src/commands/sync.ts b/src/commands/sync.ts
index 8d76e82fc..80c83552d 100644
--- a/src/commands/sync.ts
+++ b/src/commands/sync.ts
@@ -483,12 +483,16 @@ async function performSyncInner(engine: BrainEngine, opts: SyncOpts): Promise<SyncResult> {
   const unsyncableModified = filtered.modified.filter(p => !isSyncable(p, syncOpts));
+  // v0.18.0+ multi-source: scope getPage + deletePage to opts.sourceId so
+  // unsyncable cleanup in source A doesn't accidentally sweep same-slug
+  // pages in sources B/C/D.
+  const pageOpts = opts.sourceId ? { sourceId: opts.sourceId } : undefined;
   for (const path of unsyncableModified) {
     const slug = resolveSlugForPath(path);
     try {
-      const existing = await engine.getPage(slug);
+      const existing = await engine.getPage(slug, pageOpts);
       if (existing) {
-        await engine.deletePage(slug);
+        await engine.deletePage(slug, pageOpts);
         console.log(`  Deleted un-syncable page: ${slug}`);
       }
     } catch { /* ignore */ }
@@ -550,11 +554,14 @@ async function performSyncInner(engine: BrainEngine, opts: SyncOpts): Promise<SyncResult> {
+  const deleteOpts = opts.sourceId ? { sourceId: opts.sourceId } : undefined;
   if (filtered.deleted.length > 0) {
     progress.start('sync.deletes', filtered.deleted.length);
     for (const path of filtered.deleted) {
       const slug = resolveSlugForPath(path);
-      await engine.deletePage(slug);
+      await engine.deletePage(slug, deleteOpts);
       pagesAffected.push(slug);
       progress.tick(1, slug);
     }
@@ -567,18 +574,22 @@ async function performSyncInner(engine: BrainEngine, opts: SyncOpts): Promise<SyncResult> {
   if (filtered.renamed.length > 0) {
     progress.start('sync.renames', filtered.renamed.length);
+    // v0.18.0+ multi-source: scope updateSlug so the rename only touches the
+    // source-A row, not every same-slug row across sources (which would
+    // either sweep them all OR violate (source_id, slug) UNIQUE).
+    const renameOpts = opts.sourceId ? { sourceId: opts.sourceId } : undefined;
     for (const { from, to } of filtered.renamed) {
       const oldSlug = resolveSlugForPath(from);
       const newSlug = resolveSlugForPath(to);
       try {
-        await engine.updateSlug(oldSlug, newSlug);
+        await engine.updateSlug(oldSlug, newSlug, renameOpts);
       } catch {
         // Slug doesn't exist or collision, treat as add
       }
       // Reimport at new path (picks up content changes)
       const filePath = join(repoPath, to);
       if (existsSync(filePath)) {
-        const result = await importFile(engine, filePath, to, { noEmbed });
+        const result = await importFile(engine, filePath, to, { noEmbed, sourceId: opts.sourceId });
         if (result.status === 'imported') chunksCreated += result.chunks;
       }
       pagesAffected.push(newSlug);
@@ -633,7 +644,12 @@ async function performSyncInner(engine: BrainEngine, opts: SyncOpts): Promise<SyncResult> {
+  const extractOpts = opts.sourceId ? { sourceId: opts.sourceId } : undefined;
   if (pagesAffected.length > 0) {
     try {
       const { extractLinksForSlugs, extractTimelineForSlugs } = await import('./extract.ts');
-      const linksCreated = await extractLinksForSlugs(engine, repoPath, pagesAffected);
-      const timelineCreated = await extractTimelineForSlugs(engine, repoPath, pagesAffected);
+      const linksCreated = await extractLinksForSlugs(engine, repoPath, pagesAffected, extractOpts);
+      const timelineCreated = await extractTimelineForSlugs(engine, repoPath, pagesAffected, extractOpts);
       if (linksCreated > 0 || timelineCreated > 0) {
         console.log(`  Extracted: ${linksCreated} links, ${timelineCreated} timeline entries`);
       }
     } catch { /* extraction is best-effort */ }
   }
 
-  // Auto-embed (skip for large syncs — embedding calls OpenAI)
+  // Auto-embed (skip for large syncs — embedding calls OpenAI).
+  // TODO(multi-source): runEmbed → src/commands/embed.ts:175 + :418 call
+  // upsertChunks defaulting to source='default'. For non-default-source syncs
+  // the page row lives at (sourceId, slug) so this fails with "Page not found"
+  // OR (when a same-slug 'default' row coexists) updates the wrong source's
+  // chunks. Data R1 MED 2 — deferred to a follow-up PR; threading sourceId
+  // through embed.ts is a larger refactor than this fix's scope. The current
+  // try/catch swallows the failure as best-effort, so the sync result still
+  // reports `embedded: 0` for the right reason.
   let embedded = 0;
   if (!noEmbed && pagesAffected.length > 0 && pagesAffected.length <= 100) {
     try {
@@ -889,7 +918,21 @@ async function performFullSync(
   const importArgs = [repoPath];
   if (opts.noEmbed) importArgs.push('--no-embed');
   if (fullConcurrency > 1) importArgs.push('--workers', String(fullConcurrency));
-  const result = await runImport(engine, importArgs, { commit: headCommit });
+  // v0.30.x follow-up to PR #707:
+  // - Thread sourceId through runImport's opts so performFullSync routes
+  //   pages to the named source (the incremental sync path in this same
+  //   file already does this).
+  // - Pass writeSyncConfig=false so runImport doesn't overwrite global
+  //   `sync.last_commit` / `sync.last_run` / `sync.repo_path` keys with this
+  //   source's values. performFullSync owns its own source-scoped anchors
+  //   via writeSyncAnchor (below); without this gate, a `--source X --full`
+  //   run leaves the global keys pointing at X, so a later bare
+  //   `gbrain sync` reads X's repo path as the default-source repo.
+  const result = await runImport(engine, importArgs, {
+    commit: headCommit,
+    sourceId: opts.sourceId,
+    writeSyncConfig: !opts.sourceId,
+  });
   // Bug 9 — gate the full-sync bookmark on success. runImport already
   // writes its own sync.last_commit conditionally (import.ts), but
diff --git a/src/core/engine.ts b/src/core/engine.ts
index 4f1a6f774..feb3f2146 100644
--- a/src/core/engine.ts
+++ b/src/core/engine.ts
@@ -343,7 +343,14 @@ export interface BrainEngine {
    * by `restore_page` flow, and by operator diagnostics.
    */
   getPage(slug: string, opts?: GetPageOpts): Promise<Page | null>;
-  putPage(slug: string, page: PageInput): Promise<Page>;
+  /**
+   * Insert or update a page. When `opts.sourceId` is omitted, the row is
+   * written under the schema DEFAULT ('default'). When provided, `source_id`
+   * is included in the INSERT column list so ON CONFLICT (source_id, slug)
+   * DO UPDATE actually targets the intended row instead of fabricating a
+   * duplicate at (default, slug). Multi-source brains MUST pass sourceId.
+   */
+  putPage(slug: string, page: PageInput, opts?: { sourceId?: string }): Promise<Page>;
   /**
    * Hard-delete a page row. Cascades to content_chunks, page_links,
    * chunk_relations via existing FK ON DELETE CASCADE.
@@ -353,7 +360,13 @@
    * as the underlying primitive used by `purgeDeletedPages` and by callers
    * that explicitly want hard-delete semantics (e.g. test setup teardown).
    */
-  deletePage(slug: string): Promise<void>;
+  /**
+   * v0.18.0+ multi-source: `opts.sourceId` scopes the DELETE so a source-A
+   * delete doesn't hard-delete the same-slug pages in sources B/C/D. Without
+   * it, the bare DELETE matches every row with that slug across all sources.
+   * Cascades through content_chunks / page_links / chunk_relations via FKs.
+   */
+  deletePage(slug: string, opts?: { sourceId?: string }): Promise<void>;
   /**
    * v0.26.5 — set `deleted_at = now()` on a page. Returns the slug if a row
    * was soft-deleted, null if no row matched (already soft-deleted OR not found).
@@ -392,8 +405,20 @@
   getEmbeddingsByChunkIds(ids: number[]): Promise>;
 
   // Chunks
-  upsertChunks(slug: string, chunks: ChunkInput[]): Promise<void>;
-  getChunks(slug: string): Promise;
+  /**
+   * Replace the chunk set for a page. Internal page-id lookup is sourceId-
+   * scoped when `opts.sourceId` is given; without it, the schema DEFAULT
+   * matches and bare-slug lookup blows up if the same slug exists in
+   * multiple sources (Postgres 21000).
+   */
+  upsertChunks(slug: string, chunks: ChunkInput[], opts?: { sourceId?: string }): Promise<void>;
+  /**
+   * Read every chunk for a page. `opts.sourceId` source-scopes the page
+   * lookup; without it, multi-source brains return chunks from every
+   * same-slug source (importCodeFile uses this for incremental embedding
+   * reuse, which would then attach the wrong source's embeddings).
+   */
+  getChunks(slug: string, opts?: { sourceId?: string }): Promise;
   /**
    * Count chunks across the entire brain where embedded_at IS NULL.
    * Pre-flight short-circuit for `embed --stale` so a 100%-embedded brain
   */
@@ -409,7 +434,12 @@
    * Bounded by an internal LIMIT of 100000 to mirror listPages.
    */
   listStaleChunks(): Promise<StaleChunkRow[]>;
-  deleteChunks(slug: string): Promise<void>;
+  /**
+   * Delete every chunk for a page. Internal page-id lookup is sourceId-scoped
+   * when `opts.sourceId` is given; otherwise the bare-slug subquery returns
+   * the wrong row count in multi-source brains.
+   */
+  deleteChunks(slug: string, opts?: { sourceId?: string }): Promise<void>;
 
   // Links
   /**
   * Insert or update a link. `linkSource` defaults to 'markdown' for back-compat
   * with pre-v0.13 callers. Pass 'frontmatter' + originSlug + originField for
   * frontmatter-derived edges; 'manual' for user-initiated edges.
   */
+  /**
+   * v0.18.0+ multi-source: each endpoint can live in a different source.
+   * `opts.fromSourceId` / `opts.toSourceId` / `opts.originSourceId` default to
+   * 'default'. Without these, the original cross-product `FROM pages f, pages t`
+   * fanned out across every source containing the slug.
+   */
   addLink(
     from: string,
     to: string,
@@ -425,6 +461,7 @@
     linkSource?: string,
     originSlug?: string,
     originField?: string,
+    opts?: { fromSourceId?: string; toSourceId?: string; originSourceId?: string },
   ): Promise<void>;
   /**
    * Bulk insert links via a single multi-row INSERT...SELECT FROM (VALUES) JOIN pages
@@ -441,7 +478,13 @@
    * 'manual') — used by runAutoLink reconciliation to avoid deleting edges from
    * other provenances when pruning frontmatter-derived edges.
    */
-  removeLink(from: string, to: string, linkType?: string, linkSource?: string): Promise<void>;
+  removeLink(
+    from: string,
+    to: string,
+    linkType?: string,
+    linkSource?: string,
+    opts?: { fromSourceId?: string; toSourceId?: string },
+  ): Promise<void>;
   getLinks(slug: string): Promise;
   getBacklinks(slug: string): Promise;
   /**
@@ -519,9 +562,15 @@
   findOrphanPages(): Promise>;
 
   // Tags
-  addTag(slug: string, tag: string): Promise<void>;
-  removeTag(slug: string, tag: string): Promise<void>;
-  getTags(slug: string): Promise<string[]>;
+  /**
+   * v0.18.0+ multi-source: `opts.sourceId` scopes the page-id lookup. When
+   * omitted, the schema DEFAULT 'default' applies; in multi-source brains
+   * with the same slug across sources the bare-slug lookup returns >1 row
+   * and the INSERT/DELETE fails with Postgres 21000.
+   */
+  addTag(slug: string, tag: string, opts?: { sourceId?: string }): Promise<void>;
+  removeTag(slug: string, tag: string, opts?: { sourceId?: string }): Promise<void>;
+  getTags(slug: string, opts?: { sourceId?: string }): Promise<string[]>;
 
   // Timeline
   /**
@@ -530,10 +579,17 @@
    * known to exist (e.g., from a getAllSlugs() snapshot). Duplicates are silently
    * deduplicated by the (page_id, date, summary) UNIQUE index (ON CONFLICT DO NOTHING).
    */
+  /**
+   * Insert a timeline entry. By default verifies the page exists and throws if not.
+   * `opts.skipExistenceCheck` skips the pre-check for batch loops where the slug
+   * is already known to exist. `opts.sourceId` source-scopes both the existence
+   * check AND the page-id lookup inside the INSERT — required for multi-source
+   * brains where the slug exists in 2+ sources.
+   */
   addTimelineEntry(
     slug: string,
     entry: TimelineInput,
-    opts?: { skipExistenceCheck?: boolean },
+    opts?: { skipExistenceCheck?: boolean; sourceId?: string },
   ): Promise<void>;
   /**
    * Bulk insert timeline entries via a single multi-row INSERT...SELECT FROM (VALUES)
@@ -670,7 +726,12 @@
   putDreamVerdict(filePath: string, contentHash: string, verdict: DreamVerdictInput): Promise;
 
   // Versions
-  createVersion(slug: string): Promise<PageVersion>;
+  /**
+   * Snapshot a page row into page_versions. Source-scoped via `opts.sourceId`;
+   * without it the bare-slug lookup snapshots whichever row Postgres returns
+   * first when the slug exists across multiple sources.
+   */
+  createVersion(slug: string, opts?: { sourceId?: string }): Promise<PageVersion>;
   getVersions(slug: string): Promise;
   revertToVersion(slug: string, versionId: number): Promise;
@@ -683,7 +744,13 @@
   getIngestLog(opts?: { limit?: number }): Promise;
 
   // Sync
-  updateSlug(oldSlug: string, newSlug: string): Promise<void>;
+  /**
+   * Rename a page's slug (chunks + links + tags + timeline + versions all
+   * preserved via stable page_id). `opts.sourceId` scopes the UPDATE — without
+   * it, the bare `WHERE slug = old` matches every row across every source and
+   * would either rename them all OR violate the (source_id, slug) UNIQUE.
+   */
+  updateSlug(oldSlug: string, newSlug: string, opts?: { sourceId?: string }): Promise<void>;
   rewriteLinks(oldSlug: string, newSlug: string): Promise;
 
   // Config
diff --git a/src/core/import-file.ts b/src/core/import-file.ts
index 076af7a42..60bd84ce7 100644
--- a/src/core/import-file.ts
+++ b/src/core/import-file.ts
@@ -188,6 +188,7 @@ export async function importFromContent(
   content: string,
   opts: {
     noEmbed?: boolean;
+    sourceId?: string;
     /**
      * v0.29.1: basename without extension for filename-date precedence on
      * `daily/`, `meetings/` slugs. importFromFile threads this from the
@@ -196,6 +197,12 @@ export async function importFromContent(
     filename?: string;
   } = {},
 ): Promise {
+  // v0.18.0+ multi-source: when the caller is syncing under a non-default
+  // source, every per-page tx call must carry `sourceId` so writes target the
+  // right (source_id, slug) row. Pre-fix, putPage relied on the schema DEFAULT
+  // and silently fabricated a duplicate at (default, slug) — causing later
+  // bare-slug subqueries (getTags, deleteChunks, etc.) to crash with 21000.
+  const sourceId = opts.sourceId;
   // Reject oversized payloads before any parsing, chunking, or embedding happens.
   // Uses Buffer.byteLength to count UTF-8 bytes the same way disk size would,
   // so the network path behaves identically to the file path.
@@ -232,7 +239,7 @@ export async function importFromContent(
     tags: parsed.tags,
   };
 
-  const existing = await engine.getPage(slug);
+  const existing = await engine.getPage(slug, sourceId ? { sourceId } : undefined);
   if (existing?.content_hash === hash) {
     return { slug, status: 'skipped', chunks: 0, parsedPage };
   }
@@ -268,9 +275,13 @@ export async function importFromContent(
     }
   }
 
-  // Transaction wraps all DB writes
+  // Transaction wraps all DB writes. Every per-page tx call carries the
+  // caller's sourceId so writes target (sourceId, slug) rather than the
+  // schema DEFAULT — required for multi-source brains; harmless ('default')
+  // for single-source callers.
+  const txOpts = sourceId ? { sourceId } : undefined;
   await engine.transaction(async (tx) => {
-    if (existing) await tx.createVersion(slug);
+    if (existing) await tx.createVersion(slug, txOpts);
 
     // v0.29.1 — compute effective_date from frontmatter precedence chain.
     // Filename comes from the importFromFile path (basename) or the slug tail
@@ -299,23 +310,23 @@ export async function importFromContent(
       effective_date: effectiveDate,
       effective_date_source: effectiveDateSource,
       import_filename: filenameForChain,
-    });
+    }, txOpts);
 
     // Tag reconciliation: remove stale, add current
-    const existingTags = await tx.getTags(slug);
+    const existingTags = await tx.getTags(slug, txOpts);
     const newTags = new Set(parsed.tags);
     for (const old of existingTags) {
-      if (!newTags.has(old)) await tx.removeTag(slug, old);
+      if (!newTags.has(old)) await tx.removeTag(slug, old, txOpts);
     }
     for (const tag of parsed.tags) {
-      await tx.addTag(slug, tag);
+      await tx.addTag(slug, tag, txOpts);
    }
 
     if (chunks.length > 0) {
-      await tx.upsertChunks(slug, chunks);
+      await tx.upsertChunks(slug, chunks, txOpts);
     } else {
       // Content is empty — delete stale chunks so they don't ghost in search results
-      await tx.deleteChunks(slug);
+      await tx.deleteChunks(slug, txOpts);
     }
 
     // v0.19.0 E1 — doc↔impl linking: if this markdown page cites code paths
@@ -325,6 +336,15 @@ export async function importFromContent(
     // before their code repo syncs are common, and the missing edges land
     // later via `gbrain reconcile-links` (Layer 8 D3, v0.21.0).
     const codeRefs = extractCodeRefs(parsed.compiled_truth + '\n' + (parsed.timeline || ''));
+    // For doc↔impl edges, both endpoints are within the same source as the
+    // markdown page being imported. Cross-source edges (markdown in one
+    // source, code in another) currently fail with "page not found" — a
+    // faster failure mode than the pre-fix cross-product fan-out, which
+    // silently wired edges to whichever same-slug page Postgres returned
+    // first across sources.
+    const linkOpts = sourceId
+      ? { fromSourceId: sourceId, toSourceId: sourceId, originSourceId: sourceId }
+      : undefined;
     for (const ref of codeRefs) {
       const codeSlug = slugifyCodePath(ref.path);
       // Forward: markdown guide → code page (this guide documents that code)
       try {
         await tx.addLink(
           slug, codeSlug,
           ref.line ? `cited at ${ref.path}:${ref.line}` : ref.path,
           'documents', 'markdown', slug, 'compiled_truth',
+          linkOpts,
         );
       } catch { /* code page not yet imported — reconcile-links will catch it */ }
       // Reverse: code page → markdown guide (this code is documented by the guide)
       try {
         await tx.addLink(
           codeSlug, slug, ref.path, 'documented_by', 'markdown', slug, 'compiled_truth',
+          linkOpts,
         );
       } catch { /* same reason — silent skip */ }
     }
@@ -362,7 +384,7 @@ export async function importFromFile(
   engine: BrainEngine,
   filePath: string,
   relativePath: string,
-  opts: { noEmbed?: boolean; inferFrontmatter?: boolean } = {},
+  opts: { noEmbed?: boolean; inferFrontmatter?: boolean; sourceId?: string } = {},
 ): Promise {
   // Defense-in-depth: reject symlinks before reading content.
   const lstat = lstatSync(filePath);
@@ -379,7 +401,10 @@ export async function importFromFile(
 
   // Route code files through the code import path
   if (isCodeFilePath(relativePath)) {
-    return importCodeFile(engine, relativePath, content, opts);
+    return importCodeFile(engine, relativePath, content, {
+      noEmbed: opts.noEmbed,
+      sourceId: opts.sourceId,
+    });
   }
 
   // v0.22.8 — Frontmatter inference: if the file has no frontmatter and
@@ -431,11 +456,13 @@ export async function importCodeFile(
   engine: BrainEngine,
   relativePath: string,
   content: string,
-  opts: { noEmbed?: boolean; force?: boolean } = {},
+  opts: { noEmbed?: boolean; force?: boolean; sourceId?: string } = {},
 ): Promise {
   const slug = slugifyCodePath(relativePath);
   const lang = detectCodeLanguage(relativePath) || 'unknown';
   const title = `${relativePath} (${lang})`;
+  const sourceId = opts.sourceId;
+  const txOpts = sourceId ? { sourceId } : undefined;
 
   const byteLength = Buffer.byteLength(content, 'utf-8');
   if (byteLength > MAX_FILE_SIZE) {
@@ -448,7 +475,7 @@ export async function importCodeFile(
   const hash = createHash('sha256')
     .update(JSON.stringify({ title, type: 'code', content, lang, chunker_version: CHUNKER_VERSION }))
     .digest('hex');
 
-  const existing = await engine.getPage(slug);
+  const existing = await engine.getPage(slug, sourceId ? { sourceId } : undefined);
   if (!opts.force && existing?.content_hash === hash) {
     return { slug, status: 'skipped', chunks: 0 };
   }
@@ -486,7 +513,7 @@ export async function importCodeFile(
   // OpenAI API. Order matters: our chunk_index is semantic (tree-sitter
   // order), so a matching (chunk_index, text_hash) means a verbatim
   // preserved symbol.
-  const existingChunks = existing ? await engine.getChunks(slug) : [];
+  const existingChunks = existing ? await engine.getChunks(slug, sourceId ? { sourceId } : undefined) : [];
   const existingByKey = new Map();
   for (const ec of existingChunks) {
     existingByKey.set(`${ec.chunk_index}:${ec.chunk_text}`, ec);
@@ -519,9 +546,11 @@ export async function importCodeFile(
     }
   }
 
-  // Store
+  // Store. Every per-page tx call carries `txOpts.sourceId` so multi-source
+  // brains write to the correct (source_id, slug) row instead of duplicating
+  // under the schema DEFAULT.
   await engine.transaction(async (tx) => {
-    if (existing) await tx.createVersion(slug);
+    if (existing) await tx.createVersion(slug, txOpts);
 
     await tx.putPage(slug, {
       type: 'code' as PageType,
@@ -531,15 +560,15 @@ export async function importCodeFile(
       timeline: '',
       frontmatter: { language: lang, file: relativePath },
       content_hash: hash,
-    });
+    }, txOpts);
 
-    await tx.addTag(slug, 'code');
-    await tx.addTag(slug, lang);
+    await tx.addTag(slug, 'code', txOpts);
+    await tx.addTag(slug, lang, txOpts);
 
     if (chunks.length > 0) {
-      await tx.upsertChunks(slug, chunks);
+      await tx.upsertChunks(slug, chunks, txOpts);
     } else {
-      await tx.deleteChunks(slug);
+      await tx.deleteChunks(slug, txOpts);
     }
   });
 
@@ -550,7 +579,7 @@ export async function importCodeFile(
   // chunk IDs are stable.
   if (extractedEdges.length > 0 && chunks.length > 0) {
     try {
-      const persistedChunks = await engine.getChunks(slug);
+      const persistedChunks = await engine.getChunks(slug, sourceId ? { sourceId } : undefined);
       const byIndex = new Map();
       for (const pc of persistedChunks) {
         byIndex.set(pc.chunk_index, pc);
@@ -647,6 +676,14 @@ export interface ImportTransactionSpec {
   chunks?: ChunkInput[];
   /** Optional file-row insert (image ingest). Page link injected automatically. */
   file?: FileSpec;
+  /**
+   * v0.30.x follow-up to PR #707: source the entire transaction's writes to
+   * a named source. When omitted, every tx call defaults to source='default'.
+   * Mirrors importFromContent's source-aware threading; required for image
+   * imports under multi-source sync (otherwise pages, chunks, files, and
+   * sibling addLinks all silently route to default).
+   */
+  sourceId?: string;
   /** Inside-transaction hook for type-specific work (tags, links). */
   after?: (tx: BrainEngine) => Promise<void>;
 }
@@ -655,23 +692,25 @@ export async function withImportTransaction(
   engine: BrainEngine,
   spec: ImportTransactionSpec,
 ): Promise<void> {
+  const sourceOpts = spec.sourceId ? { sourceId: spec.sourceId } : undefined;
   await engine.transaction(async (tx) => {
-    if (spec.hadExisting) await tx.createVersion(spec.slug);
-    await tx.putPage(spec.slug, spec.page);
+    if (spec.hadExisting) await tx.createVersion(spec.slug, sourceOpts);
+    await tx.putPage(spec.slug, spec.page, sourceOpts);
     if (spec.file) {
       // page_id resolution after putPage so the new row's id is available.
-      const stored = await tx.getPage(spec.slug);
+      const stored = await tx.getPage(spec.slug, sourceOpts);
       await tx.upsertFile({
         ...spec.file,
+        source_id: spec.sourceId ?? spec.file.source_id,
         page_slug: spec.slug,
         page_id: stored?.id ?? null,
       });
     }
     if (spec.chunks !== undefined) {
       if (spec.chunks.length > 0) {
-        await tx.upsertChunks(spec.slug, spec.chunks);
+        await tx.upsertChunks(spec.slug, spec.chunks, sourceOpts);
       } else {
-        await tx.deleteChunks(spec.slug);
+        await tx.deleteChunks(spec.slug, sourceOpts);
       }
     }
     if (spec.after) await spec.after(tx);
@@ -857,6 +896,13 @@ export interface ImportImageOptions {
   ocrConcurrency?: number;
   /** Skip the embed call (for tests that want fast metadata-only inserts). */
   noEmbed?: boolean;
+  /**
+   * Route image-page, chunk, file, and sibling-link writes to a named source.
+   * Mirrors importFromContent's source-aware threading; without this, image
+   * imports invoked under multi-source full-sync would silently land all
+   * rows in source='default' even when the caller passed sourceId.
+   */
+  sourceId?: string;
 }
 
 /** Module-level limiter so concurrent imports across files share the budget. */
 
   const buf = readFileSync(filePath);
   const hash = createHash('sha256').update(buf).digest('hex');
 
-  const existing = await engine.getPage(imageSlug);
+  const sourceOpts = opts.sourceId ? { sourceId: opts.sourceId } : undefined;
+  const existing = await engine.getPage(imageSlug, sourceOpts);
   if (existing?.content_hash === hash) {
     return { slug: imageSlug, status: 'skipped', chunks: 0 };
   }
@@ -974,6 +1021,7 @@ export async function importImageFile(
   await withImportTransaction(engine, {
     slug: imageSlug,
     hadExisting: !!existing,
+    sourceId: opts.sourceId,
     page: {
       type: 'image',
       page_kind: 'image',
@@ -990,14 +1038,25 @@ export async function importImageFile(
       // matching candidate gets an image_of edge. Best-effort — addLink
       // throws when the target doesn't exist; we silently skip for now and
       // let `gbrain reconcile-links` pick up later additions.
+      //
+      // The sibling lookup + edge are scoped to opts.sourceId so a
+      // multi-source brain can't accidentally cross-link an image in source
+      // X to a same-named text page in source Y.
       for (const candidate of imageOfCandidates(imageSlug)) {
-        const sibling = await tx.getPage(candidate);
+        const sibling = await tx.getPage(candidate, sourceOpts);
         if (sibling) {
           try {
             await tx.addLink(
               imageSlug, candidate, filename, 'image_of', 'manual', imageSlug, 'frontmatter',
+              opts.sourceId
+                ? {
+                    fromSourceId: opts.sourceId,
+                    toSourceId: opts.sourceId,
+                    originSourceId: opts.sourceId,
+                  }
+                : undefined,
             );
           } catch { /* sibling vanished mid-tx; skip */ }
           break; // one canonical link per image
diff --git a/src/core/pglite-engine.ts b/src/core/pglite-engine.ts
index 7b3d4c066..e71fcedb0 100644
--- a/src/core/pglite-engine.ts
+++ b/src/core/pglite-engine.ts
@@ -461,12 +461,16 @@ export class PGLiteEngine implements BrainEngine {
     return rowToPage(rows[0] as Record<string, unknown>);
   }
 
-  async putPage(slug: string, page: PageInput): Promise<Page> {
+  async putPage(slug: string, page: PageInput, opts?: { sourceId?: string }): Promise<Page> {
     slug = validateSlug(slug);
     const hash = page.content_hash || contentHash(page);
     const frontmatter = page.frontmatter || {};
+    const sourceId = opts?.sourceId ?? 'default';
 
-    // v0.18.0 Step 2: source_id relies on the schema DEFAULT 'default'.
+    // v0.18.0 Step 5+: source_id is now in the INSERT column list so multi-
+    // source callers land on the intended (source_id, slug) row. Omitting it
+    // let the schema DEFAULT 'default' apply, fabricating duplicate slugs that
+    // later made bare-slug subqueries return multiple rows.
     // ON CONFLICT target is (source_id, slug); global UNIQUE(slug) dropped in v17.
     const pageKind = page.page_kind || 'markdown';
     // v0.29.1 — additive opt-in columns. COALESCE(EXCLUDED.x, pages.x)
     const effectiveDate = page.effective_date ?? null;
     const effectiveDateSource = page.effective_date_source ?? null;
     const importFilename = page.import_filename ?? null;
     const { rows } = await this.db.query(
-      `INSERT INTO pages (slug, type, page_kind, title, compiled_truth, timeline, frontmatter, content_hash, updated_at, effective_date, effective_date_source, import_filename)
-       VALUES ($1, $2, $3, $4, $5, $6, $7::jsonb, $8, now(), $9::timestamptz, $10, $11)
+      `INSERT INTO pages (source_id, slug, type, page_kind, title, compiled_truth, timeline, frontmatter, content_hash, updated_at, effective_date, effective_date_source, import_filename)
+       VALUES ($1, $2, $3, $4, $5, $6, $7, $8::jsonb, $9, now(), $10::timestamptz, $11, $12)
        ON CONFLICT (source_id, slug) DO UPDATE SET
          type = EXCLUDED.type,
          page_kind = EXCLUDED.page_kind,
@@ -493,13 +497,17 @@ export class PGLiteEngine implements BrainEngine {
          effective_date_source = COALESCE(EXCLUDED.effective_date_source, pages.effective_date_source),
          import_filename = COALESCE(EXCLUDED.import_filename, pages.import_filename)
        RETURNING id, slug, type, title, compiled_truth, timeline, frontmatter, content_hash, created_at, updated_at, effective_date, effective_date_source, import_filename`,
-      [slug, page.type, pageKind, page.title, page.compiled_truth, page.timeline || '', JSON.stringify(frontmatter), hash, effectiveDate, effectiveDateSource, importFilename]
+      [sourceId, slug, page.type, pageKind, page.title, page.compiled_truth, page.timeline || '', JSON.stringify(frontmatter), hash, effectiveDate, effectiveDateSource, importFilename]
     );
     return rowToPage(rows[0] as Record<string, unknown>);
   }
 
-  async deletePage(slug: string): Promise<void> {
-    await this.db.query('DELETE FROM pages WHERE slug = $1', [slug]);
+  async deletePage(slug: string, opts?: { sourceId?: string }): Promise<void> {
+    const sourceId = opts?.sourceId ?? 'default';
+    await this.db.query(
+      'DELETE FROM pages WHERE slug = $1 AND source_id = $2',
+      [slug, sourceId]
+    );
   }
 
   async softDeletePage(slug: string, opts?: { sourceId?: string }): Promise<{ slug: string } | null> {
@@ -896,10 +904,16 @@ export class PGLiteEngine implements BrainEngine {
   }
 
   // Chunks
-  async upsertChunks(slug: string, chunks: ChunkInput[]): Promise<void> {
-    // Get page_id
-    const pageResult = await this.db.query('SELECT id FROM pages WHERE slug = $1', [slug]);
-    if (pageResult.rows.length === 0) throw new Error(`Page not found: ${slug}`);
+  async upsertChunks(slug: string, chunks: ChunkInput[], opts?: { sourceId?: string }): Promise<void> {
+    const sourceId = opts?.sourceId ?? 'default';
+
+    // Source-scope the page-id lookup so duplicate slugs in different sources
+    // do not return multiple rows or target the wrong page.
+    const pageResult = await this.db.query(
+      'SELECT id FROM pages WHERE slug = $1 AND source_id = $2',
+      [slug, sourceId]
+    );
+    if (pageResult.rows.length === 0) throw new Error(`Page not found: ${slug} (source=${sourceId})`);
     const pageId = (pageResult.rows[0] as { id: number }).id;
 
     // Remove chunks that no longer exist
@@ -1000,13 +1014,14 @@ export class PGLiteEngine implements BrainEngine {
     );
   }
 
-  async getChunks(slug: string): Promise {
+  async getChunks(slug: string, opts?: { sourceId?: string }): Promise {
+    const sourceId = opts?.sourceId ?? 'default';
     const { rows } = await this.db.query(
       `SELECT cc.* FROM content_chunks cc
        JOIN pages p ON p.id = cc.page_id
-       WHERE p.slug = $1
+       WHERE p.slug = $1 AND p.source_id = $2
        ORDER BY cc.chunk_index`,
-      [slug]
+      [slug, sourceId]
     );
     return (rows as Record<string, unknown>[]).map(r => rowToChunk(r));
   }
@@ -1034,11 +1049,13 @@ export class PGLiteEngine implements BrainEngine {
     return rows as unknown as StaleChunkRow[];
   }
 
-  async deleteChunks(slug: string): Promise<void> {
+  async deleteChunks(slug: string, opts?: { sourceId?: string }): Promise<void> {
+    const sourceId = opts?.sourceId ?? 'default';
+    // Source-qualify the page-id subquery; slugs are only unique per source.
     await this.db.query(
       `DELETE FROM content_chunks
-       WHERE page_id = (SELECT id FROM pages WHERE slug = $1)`,
-      [slug]
+       WHERE page_id = (SELECT id FROM pages WHERE slug = $1 AND source_id = $2)`,
+      [slug, sourceId]
     );
   }
 
@@ -1051,19 +1068,38 @@ export class PGLiteEngine implements BrainEngine {
     linkSource?: string,
     originSlug?: string,
     originField?: string,
+    opts?: { fromSourceId?: string; toSourceId?: string; originSourceId?: string },
   ): Promise<void> {
+    const fromSrc = opts?.fromSourceId ?? 'default';
+    const toSrc = opts?.toSourceId ?? 'default';
+    const originSrc = opts?.originSourceId ?? 'default';
+
+    // Source-qualified pre-check gives a clean missing-page error before the
+    // INSERT...SELECT path can silently insert zero rows.
+    const exists = await this.db.query(
+      `SELECT 1 FROM pages WHERE slug = $1 AND source_id = $2
+       INTERSECT
+       SELECT 1 FROM pages WHERE slug = $3 AND source_id = $4`,
+      [from, fromSrc, to, toSrc]
+    );
+    if (exists.rows.length === 0) {
+      throw new Error(`addLink failed: page "${from}" (source=${fromSrc}) or "${to}" (source=${toSrc}) not found`);
+    }
     const src = linkSource ?? 'markdown';
+    // Mirror addLinksBatch's VALUES + composite JOIN shape. The old cross-
+    // product over pages f/t fanned out across sources containing the slugs.
     await this.db.query(
       `INSERT INTO links (from_page_id, to_page_id, link_type, context, link_source, origin_page_id, origin_field)
-       SELECT f.id, t.id, $3, $4, $5,
-         (SELECT id FROM pages WHERE slug = $6),
-         $7
-       FROM pages f, pages t
-       WHERE f.slug = $1 AND t.slug = $2
+       SELECT f.id, t.id, v.link_type, v.context, v.link_source, o.id, v.origin_field
+       FROM (VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10))
+         AS v(from_slug, to_slug, link_type, context, link_source, origin_slug, origin_field, from_source_id, to_source_id, origin_source_id)
+       JOIN pages f ON f.slug = v.from_slug AND f.source_id = v.from_source_id
+       JOIN pages t ON t.slug = v.to_slug AND t.source_id = v.to_source_id
+       LEFT JOIN pages o ON o.slug = v.origin_slug AND o.source_id = v.origin_source_id
        ON CONFLICT (from_page_id, to_page_id, link_type, link_source, origin_page_id)
        DO UPDATE SET context = EXCLUDED.context, origin_field = EXCLUDED.origin_field`,
-      [from, to, linkType || '', context || '', src, originSlug ?? null, originField ?? null]
+      [from, to, linkType || '', context || '', src, originSlug ?? null, originField ?? null, fromSrc, toSrc, originSrc]
     );
   }
 
@@ -1102,38 +1138,48 @@ export class PGLiteEngine implements BrainEngine {
     return result.rows.length;
   }
 
-  async removeLink(from: string, to: string, linkType?: string, linkSource?: string): Promise<void> {
+  async removeLink(
+    from: string,
+    to: string,
+    linkType?: string,
+    linkSource?: string,
+    opts?: { fromSourceId?: string; toSourceId?: string },
+  ): Promise<void> {
+    const fromSrc = opts?.fromSourceId ?? 'default';
+    const toSrc = opts?.toSourceId ?? 'default';
+    // Each branch source-qualifies page-id subqueries so a delete only targets
+    // the intended edge between per-source slug rows.
     if (linkType !== undefined && linkSource !== undefined) {
       await this.db.query(
         `DELETE FROM links
-         WHERE from_page_id = (SELECT id FROM pages WHERE slug = $1)
-           AND to_page_id = (SELECT id FROM pages WHERE slug = $2)
-           AND link_type = $3
-           AND link_source IS NOT DISTINCT FROM $4`,
-        [from, to, linkType, linkSource]
+         WHERE from_page_id = (SELECT id FROM pages WHERE slug = $1 AND source_id = $2)
+           AND to_page_id = (SELECT id FROM pages WHERE slug = $3 AND source_id = $4)
+           AND link_type = $5
+           AND link_source IS NOT DISTINCT FROM $6`,
+        [from, fromSrc, to, toSrc, linkType, linkSource]
       );
     } else if (linkType !== undefined) {
       await this.db.query(
         `DELETE FROM links
-         WHERE from_page_id = (SELECT id FROM pages WHERE slug = $1)
-           AND to_page_id = (SELECT id FROM pages WHERE slug = $2)
-           AND link_type = $3`,
-        [from, to, linkType]
+         WHERE from_page_id = (SELECT id FROM pages WHERE slug = $1 AND source_id = $2)
+           AND to_page_id = (SELECT id FROM pages WHERE slug = $3 AND source_id = $4)
+           AND link_type = $5`,
+        [from, fromSrc, to, toSrc, linkType]
      );
     } else if (linkSource !== undefined) {
       await this.db.query(
         `DELETE FROM links
-         WHERE from_page_id = (SELECT id FROM pages WHERE slug = $1)
-           AND to_page_id = (SELECT id FROM pages WHERE slug = $2)
-           AND link_source IS NOT DISTINCT FROM $3`,
-        [from, to, linkSource]
+         WHERE from_page_id = (SELECT id FROM pages WHERE slug = $1 AND source_id = $2)
+           AND to_page_id = (SELECT id FROM pages WHERE slug = $3 AND source_id = $4)
+           AND link_source IS NOT DISTINCT FROM $5`,
+        [from, fromSrc, to, toSrc, linkSource]
       );
     } else {
       await this.db.query(
         `DELETE FROM links
-         WHERE from_page_id = (SELECT id FROM pages WHERE slug = $1)
-           AND to_page_id = (SELECT id FROM pages WHERE slug = $2)`,
-        [from, to]
+         WHERE from_page_id = (SELECT id FROM pages WHERE slug = $1 AND source_id = $2)
+           AND to_page_id = (SELECT id FROM pages WHERE slug = $3 AND source_id = $4)`,
+        [from, fromSrc, to, toSrc]
       );
     }
   }
 
@@ -1431,30 +1477,42 @@ export class PGLiteEngine implements BrainEngine {
   }
 
   // Tags
-  async addTag(slug: string, tag: string): Promise<void> {
+  async addTag(slug: string, tag: string, opts?: { sourceId?: string }): Promise<void> {
+    const sourceId = opts?.sourceId ?? 'default';
+    // Pre-check source-scoped page existence; ON CONFLICT only handles the
+    // already-tagged case, not missing pages.
+    const page = await this.db.query(
+      'SELECT id FROM pages WHERE slug = $1 AND source_id = $2',
+      [slug, sourceId]
+    );
+    if (page.rows.length === 0) throw new Error(`addTag failed: page "${slug}" (source=${sourceId}) not found`);
     await this.db.query(
       `INSERT INTO tags (page_id, tag)
-       SELECT id, $2 FROM pages WHERE slug = $1
+       VALUES ($1, $2)
        ON CONFLICT (page_id, tag) DO NOTHING`,
-      [slug, tag]
+      [(page.rows[0] as { id: number }).id, tag]
     );
   }
 
-  async removeTag(slug: string, tag: string): Promise<void> {
+  async removeTag(slug: string, tag: string, opts?: { sourceId?: string }): Promise<void> {
+    const sourceId = opts?.sourceId ?? 'default';
+    // Source-qualify the page-id subquery; slugs are only unique per source.
     await this.db.query(
       `DELETE FROM tags
-       WHERE page_id = (SELECT id FROM pages WHERE slug = $1)
-         AND tag = $2`,
-      [slug, tag]
+       WHERE page_id = (SELECT id FROM pages WHERE slug = $1 AND source_id = $2)
+         AND tag = $3`,
+      [slug, sourceId, tag]
    );
   }
 
-  async getTags(slug: string): Promise<string[]> {
+  async getTags(slug: string, opts?: { sourceId?: string }): Promise<string[]> {
+    const sourceId = opts?.sourceId ?? 'default';
+    // Source-qualify the page-id subquery; slugs are only unique per source.
    const { rows } = await this.db.query(
      `SELECT tag FROM tags
-      WHERE page_id = (SELECT id FROM pages WHERE slug = $1)
+      WHERE page_id = (SELECT id FROM pages WHERE slug = $1 AND source_id = $2)
       ORDER BY tag`,
-      [slug]
+      [slug, sourceId]
    );
    return (rows as { tag: string }[]).map(r => r.tag);
  }
 
@@ -1463,22 +1521,27 @@ export class PGLiteEngine implements BrainEngine {
   // Timeline
   async addTimelineEntry(
     slug: string,
     entry: TimelineInput,
-    opts?: { skipExistenceCheck?: boolean },
+    opts?: { skipExistenceCheck?: boolean; sourceId?: string },
   ): Promise<void> {
+    const sourceId = opts?.sourceId ?? 'default';
     if (!opts?.skipExistenceCheck) {
-      const { rows } = await this.db.query('SELECT 1 FROM pages WHERE slug = $1', [slug]);
+      const { rows } = await this.db.query(
+        'SELECT 1 FROM pages WHERE slug = $1 AND source_id = $2',
+        [slug, sourceId]
+      );
       if (rows.length === 0) {
-        throw new Error(`Page not found: ${slug}`);
+        throw new Error(`addTimelineEntry failed: page "${slug}" (source=${sourceId}) not found`);
      }
    }
     // ON CONFLICT DO NOTHING via the (page_id, date, summary) unique index.
-    // If insert is a no-op (duplicate), no row is returned; that's intentional.
+    // Source-qualify the page-id lookup so multi-source brains don't fan
+    // timeline rows out across every source containing the slug.
await this.db.query( `INSERT INTO timeline_entries (page_id, date, source, summary, detail) SELECT id, $2::date, $3, $4, $5 - FROM pages WHERE slug = $1 + FROM pages WHERE slug = $1 AND source_id = $6 ON CONFLICT (page_id, date, summary) DO NOTHING`, - [slug, entry.date, entry.source || '', entry.summary, entry.detail || ''] + [slug, entry.date, entry.source || '', entry.summary, entry.detail || '', sourceId] ); } @@ -2038,14 +2101,16 @@ export class PGLiteEngine implements BrainEngine { } // Versions - async createVersion(slug: string): Promise { + async createVersion(slug: string, opts?: { sourceId?: string }): Promise { + const sourceId = opts?.sourceId ?? 'default'; const { rows } = await this.db.query( `INSERT INTO page_versions (page_id, compiled_truth, frontmatter) SELECT id, compiled_truth, frontmatter - FROM pages WHERE slug = $1 + FROM pages WHERE slug = $1 AND source_id = $2 RETURNING *`, - [slug] + [slug, sourceId] ); + if (rows.length === 0) throw new Error(`createVersion failed: page "${slug}" (source=${sourceId}) not found`); return rows[0] as unknown as PageVersion; } @@ -2213,11 +2278,14 @@ export class PGLiteEngine implements BrainEngine { } // Sync - async updateSlug(oldSlug: string, newSlug: string): Promise { + async updateSlug(oldSlug: string, newSlug: string, opts?: { sourceId?: string }): Promise { newSlug = validateSlug(newSlug); + const sourceId = opts?.sourceId ?? 'default'; + // Source-qualify so a rename in source A doesn't sweep up same-slug rows + // in sources B/C/D (mirrors postgres-engine.ts). await this.db.query( - `UPDATE pages SET slug = $1, updated_at = now() WHERE slug = $2`, - [newSlug, oldSlug] + `UPDATE pages SET slug = $1, updated_at = now() WHERE slug = $2 AND source_id = $3`, + [newSlug, oldSlug, sourceId] ); } diff --git a/src/core/postgres-engine.ts b/src/core/postgres-engine.ts index ad3f0ad6b..17d66d353 100644 --- a/src/core/postgres-engine.ts +++ b/src/core/postgres-engine.ts @@ -497,15 +497,20 @@ export class PostgresEngine implements BrainEngine { return rowToPage(rows[0]); } - async putPage(slug: string, page: PageInput): Promise { + async putPage(slug: string, page: PageInput, opts?: { sourceId?: string }): Promise { slug = validateSlug(slug); const sql = this.sql; const hash = page.content_hash || contentHash(page); const frontmatter = page.frontmatter || {}; - - // v0.18.0 Step 2: source_id relies on schema DEFAULT 'default'. ON - // CONFLICT target becomes (source_id, slug) since global UNIQUE(slug) - // was dropped in migration v17. + const sourceId = opts?.sourceId ?? 'default'; + + // v0.18.0 Step 5+: source_id is now in the INSERT column list so multi- + // source callers actually land on the (source_id, slug) row they intend. + // Pre-fix: omitting source_id let the schema DEFAULT 'default' apply, so + // a caller syncing under 'jarvis-memory' silently fabricated a duplicate + // at (default, slug); subsequent bare-slug subqueries (getTags, deleteChunks, + // etc.) then matched 2 rows and blew up with Postgres 21000. + // ON CONFLICT target is (source_id, slug); global UNIQUE(slug) dropped in v17. const pageKind = page.page_kind || 'markdown'; // v0.29.1 — effective_date / effective_date_source / import_filename are // additive opt-in inputs from the importer (computeEffectiveDate). When @@ -516,8 +521,8 @@ export class PostgresEngine implements BrainEngine { const effectiveDateSource = page.effective_date_source ?? null; const importFilename = page.import_filename ?? 
null; const rows = await sql` - INSERT INTO pages (slug, type, page_kind, title, compiled_truth, timeline, frontmatter, content_hash, updated_at, effective_date, effective_date_source, import_filename) - VALUES (${slug}, ${page.type}, ${pageKind}, ${page.title}, ${page.compiled_truth}, ${page.timeline || ''}, ${sql.json(frontmatter as Parameters[0])}, ${hash}, now(), ${effectiveDate}, ${effectiveDateSource}, ${importFilename}) + INSERT INTO pages (source_id, slug, type, page_kind, title, compiled_truth, timeline, frontmatter, content_hash, updated_at, effective_date, effective_date_source, import_filename) + VALUES (${sourceId}, ${slug}, ${page.type}, ${pageKind}, ${page.title}, ${page.compiled_truth}, ${page.timeline || ''}, ${sql.json(frontmatter as Parameters[0])}, ${hash}, now(), ${effectiveDate}, ${effectiveDateSource}, ${importFilename}) ON CONFLICT (source_id, slug) DO UPDATE SET type = EXCLUDED.type, page_kind = EXCLUDED.page_kind, @@ -535,9 +540,10 @@ export class PostgresEngine implements BrainEngine { return rowToPage(rows[0]); } - async deletePage(slug: string): Promise { + async deletePage(slug: string, opts?: { sourceId?: string }): Promise { const sql = this.sql; - await sql`DELETE FROM pages WHERE slug = ${slug}`; + const sourceId = opts?.sourceId ?? 'default'; + await sql`DELETE FROM pages WHERE slug = ${slug} AND source_id = ${sourceId}`; } async softDeletePage(slug: string, opts?: { sourceId?: string }): Promise<{ slug: string } | null> { @@ -1016,12 +1022,15 @@ export class PostgresEngine implements BrainEngine { } // Chunks - async upsertChunks(slug: string, chunks: ChunkInput[]): Promise { + async upsertChunks(slug: string, chunks: ChunkInput[], opts?: { sourceId?: string }): Promise { const sql = this.sql; + const sourceId = opts?.sourceId ?? 'default'; - // Get page_id - const pages = await sql`SELECT id FROM pages WHERE slug = ${slug}`; - if (pages.length === 0) throw new Error(`Page not found: ${slug}`); + // Source-scope the page-id lookup. Without this filter, multi-source + // brains where the slug exists in 2+ sources return >1 row and the + // chunk replacement targets the wrong page (or fans out across pages). + const pages = await sql`SELECT id FROM pages WHERE slug = ${slug} AND source_id = ${sourceId}`; + if (pages.length === 0) throw new Error(`Page not found: ${slug} (source=${sourceId})`); const pageId = pages[0].id; // Remove chunks that no longer exist (chunk_index beyond new count) @@ -1117,12 +1126,13 @@ export class PostgresEngine implements BrainEngine { ); } - async getChunks(slug: string): Promise { + async getChunks(slug: string, opts?: { sourceId?: string }): Promise { const sql = this.sql; + const sourceId = opts?.sourceId ?? 'default'; const rows = await sql` SELECT cc.* FROM content_chunks cc JOIN pages p ON p.id = cc.page_id - WHERE p.slug = ${slug} + WHERE p.slug = ${slug} AND p.source_id = ${sourceId} ORDER BY cc.chunk_index `; return rows.map((r) => rowToChunk(r as Record)); @@ -1152,11 +1162,12 @@ export class PostgresEngine implements BrainEngine { return rows as unknown as StaleChunkRow[]; } - async deleteChunks(slug: string): Promise { + async deleteChunks(slug: string, opts?: { sourceId?: string }): Promise { const sql = this.sql; + const sourceId = opts?.sourceId ?? 
'default'; await sql` DELETE FROM content_chunks - WHERE page_id = (SELECT id FROM pages WHERE slug = ${slug}) + WHERE page_id = (SELECT id FROM pages WHERE slug = ${slug} AND source_id = ${sourceId}) `; } @@ -1169,28 +1180,39 @@ export class PostgresEngine implements BrainEngine { linkSource?: string, originSlug?: string, originField?: string, + opts?: { fromSourceId?: string; toSourceId?: string; originSourceId?: string }, ): Promise { const sql = this.sql; + const fromSrc = opts?.fromSourceId ?? 'default'; + const toSrc = opts?.toSourceId ?? 'default'; + const originSrc = opts?.originSourceId ?? 'default'; + // Pre-check existence so we can throw a clear error (ON CONFLICT DO UPDATE - // returns 0 rows when source SELECT is empty, indistinguishable from missing page). + // returns 0 rows when source SELECT is empty, indistinguishable from missing + // page). Source-qualified — pre-v0.18 the bare slug check matched ANY source, + // letting addLink succeed even when the intended source row was missing. const exists = await sql` - SELECT 1 FROM pages WHERE slug = ${from} + SELECT 1 FROM pages WHERE slug = ${from} AND source_id = ${fromSrc} INTERSECT - SELECT 1 FROM pages WHERE slug = ${to} + SELECT 1 FROM pages WHERE slug = ${to} AND source_id = ${toSrc} `; if (exists.length === 0) { - throw new Error(`addLink failed: page "${from}" or "${to}" not found`); + throw new Error(`addLink failed: page "${from}" (source=${fromSrc}) or "${to}" (source=${toSrc}) not found`); } // Default link_source to 'markdown' for back-compat with pre-v0.13 callers. - // origin_page_id resolves from originSlug via the pages join (NULL if no slug). + // Mirror addLinksBatch's VALUES + JOIN-on-(slug, source_id) shape. The old + // `FROM pages f, pages t` cross-product fanned out across every source + // containing either slug, so a multi-source brain silently created edges + // pointing at the wrong pages. const src = linkSource ?? 'markdown'; await sql` INSERT INTO links (from_page_id, to_page_id, link_type, context, link_source, origin_page_id, origin_field) - SELECT f.id, t.id, ${linkType || ''}, ${context || ''}, ${src}, - (SELECT id FROM pages WHERE slug = ${originSlug ?? null}), - ${originField ?? null} - FROM pages f, pages t - WHERE f.slug = ${from} AND t.slug = ${to} + SELECT f.id, t.id, v.link_type, v.context, v.link_source, o.id, v.origin_field + FROM (VALUES (${from}, ${to}, ${linkType || ''}, ${context || ''}, ${src}, ${originSlug ?? null}, ${originField ?? null}, ${fromSrc}, ${toSrc}, ${originSrc})) + AS v(from_slug, to_slug, link_type, context, link_source, origin_slug, origin_field, from_source_id, to_source_id, origin_source_id) + JOIN pages f ON f.slug = v.from_slug AND f.source_id = v.from_source_id + JOIN pages t ON t.slug = v.to_slug AND t.source_id = v.to_source_id + LEFT JOIN pages o ON o.slug = v.origin_slug AND o.source_id = v.origin_source_id ON CONFLICT (from_page_id, to_page_id, link_type, link_source, origin_page_id) DO UPDATE SET context = EXCLUDED.context, origin_field = EXCLUDED.origin_field @@ -1236,37 +1258,47 @@ export class PostgresEngine implements BrainEngine { return result.length; } - async removeLink(from: string, to: string, linkType?: string, linkSource?: string): Promise { + async removeLink( + from: string, + to: string, + linkType?: string, + linkSource?: string, + opts?: { fromSourceId?: string; toSourceId?: string }, + ): Promise { const sql = this.sql; + const fromSrc = opts?.fromSourceId ?? 'default'; + const toSrc = opts?.toSourceId ?? 
     // Build up filters dynamically. linkType + linkSource are independent
-    // optional constraints; all four combinations are valid.
+    // optional constraints; all four combinations are valid. Each branch's
+    // page-id subquery is source-qualified so multi-source brains don't
+    // delete the wrong (from, to) pair.
     if (linkType !== undefined && linkSource !== undefined) {
       await sql`
         DELETE FROM links
-        WHERE from_page_id = (SELECT id FROM pages WHERE slug = ${from})
-          AND to_page_id = (SELECT id FROM pages WHERE slug = ${to})
+        WHERE from_page_id = (SELECT id FROM pages WHERE slug = ${from} AND source_id = ${fromSrc})
+          AND to_page_id = (SELECT id FROM pages WHERE slug = ${to} AND source_id = ${toSrc})
           AND link_type = ${linkType}
           AND link_source IS NOT DISTINCT FROM ${linkSource}
       `;
     } else if (linkType !== undefined) {
       await sql`
         DELETE FROM links
-        WHERE from_page_id = (SELECT id FROM pages WHERE slug = ${from})
-          AND to_page_id = (SELECT id FROM pages WHERE slug = ${to})
+        WHERE from_page_id = (SELECT id FROM pages WHERE slug = ${from} AND source_id = ${fromSrc})
+          AND to_page_id = (SELECT id FROM pages WHERE slug = ${to} AND source_id = ${toSrc})
           AND link_type = ${linkType}
       `;
     } else if (linkSource !== undefined) {
       await sql`
         DELETE FROM links
-        WHERE from_page_id = (SELECT id FROM pages WHERE slug = ${from})
-          AND to_page_id = (SELECT id FROM pages WHERE slug = ${to})
+        WHERE from_page_id = (SELECT id FROM pages WHERE slug = ${from} AND source_id = ${fromSrc})
+          AND to_page_id = (SELECT id FROM pages WHERE slug = ${to} AND source_id = ${toSrc})
           AND link_source IS NOT DISTINCT FROM ${linkSource}
       `;
     } else {
       await sql`
         DELETE FROM links
-        WHERE from_page_id = (SELECT id FROM pages WHERE slug = ${from})
-          AND to_page_id = (SELECT id FROM pages WHERE slug = ${to})
+        WHERE from_page_id = (SELECT id FROM pages WHERE slug = ${from} AND source_id = ${fromSrc})
+          AND to_page_id = (SELECT id FROM pages WHERE slug = ${to} AND source_id = ${toSrc})
       `;
     }
   }
@@ -1573,12 +1605,15 @@ export class PostgresEngine implements BrainEngine {
   }

   // Tags
-  async addTag(slug: string, tag: string): Promise<void> {
+  async addTag(slug: string, tag: string, opts?: { sourceId?: string }): Promise<void> {
     const sql = this.sql;
+    const sourceId = opts?.sourceId ?? 'default';
     // Verify page exists before attempting insert (ON CONFLICT DO NOTHING
-    // swallows the "already tagged" case, but we still need to detect missing pages)
-    const page = await sql`SELECT id FROM pages WHERE slug = ${slug}`;
-    if (page.length === 0) throw new Error(`addTag failed: page "${slug}" not found`);
+    // swallows the "already tagged" case, but we still need to detect missing
+    // pages). Source-scoped lookup — pre-v0.18 the bare-slug lookup matched
+    // every source containing the slug, so page[0].id silently tagged an
+    // arbitrary source's row in multi-source brains.
+    const page = await sql`SELECT id FROM pages WHERE slug = ${slug} AND source_id = ${sourceId}`;
+    if (page.length === 0) throw new Error(`addTag failed: page "${slug}" (source=${sourceId}) not found`);
     await sql`
       INSERT INTO tags (page_id, tag) VALUES (${page[0].id}, ${tag})
@@ -1586,20 +1621,22 @@
     `;
   }

-  async removeTag(slug: string, tag: string): Promise<void> {
+  async removeTag(slug: string, tag: string, opts?: { sourceId?: string }): Promise<void> {
     const sql = this.sql;
+    const sourceId = opts?.sourceId ?? 'default';
     await sql`
       DELETE FROM tags
-      WHERE page_id = (SELECT id FROM pages WHERE slug = ${slug})
+      WHERE page_id = (SELECT id FROM pages WHERE slug = ${slug} AND source_id = ${sourceId})
         AND tag = ${tag}
     `;
   }

-  async getTags(slug: string): Promise<string[]> {
+  async getTags(slug: string, opts?: { sourceId?: string }): Promise<string[]> {
     const sql = this.sql;
+    const sourceId = opts?.sourceId ?? 'default';
     const rows = await sql`
       SELECT tag FROM tags
-      WHERE page_id = (SELECT id FROM pages WHERE slug = ${slug})
+      WHERE page_id = (SELECT id FROM pages WHERE slug = ${slug} AND source_id = ${sourceId})
       ORDER BY tag
     `;
     return rows.map((r) => r.tag as string);
@@ -1609,22 +1646,25 @@

   async addTimelineEntry(
     slug: string,
     entry: TimelineInput,
-    opts?: { skipExistenceCheck?: boolean },
+    opts?: { skipExistenceCheck?: boolean; sourceId?: string },
   ): Promise<void> {
     const sql = this.sql;
+    const sourceId = opts?.sourceId ?? 'default';
     if (!opts?.skipExistenceCheck) {
-      const exists = await sql`SELECT 1 FROM pages WHERE slug = ${slug}`;
+      const exists = await sql`SELECT 1 FROM pages WHERE slug = ${slug} AND source_id = ${sourceId}`;
       if (exists.length === 0) {
-        throw new Error(`addTimelineEntry failed: page "${slug}" not found`);
+        throw new Error(`addTimelineEntry failed: page "${slug}" (source=${sourceId}) not found`);
       }
     }
     // ON CONFLICT DO NOTHING via the (page_id, date, summary) unique index.
     // Returning 0 rows means either page missing OR duplicate; skipExistenceCheck
-    // makes that ambiguity safe (caller asserts page exists).
+    // makes that ambiguity safe (caller asserts page exists). Source-qualify
+    // the page-id lookup so multi-source brains don't fan timeline rows out
+    // across every source containing the slug.
     await sql`
       INSERT INTO timeline_entries (page_id, date, source, summary, detail)
       SELECT id, ${entry.date}::date, ${entry.source || ''}, ${entry.summary}, ${entry.detail || ''}
-      FROM pages WHERE slug = ${slug}
+      FROM pages WHERE slug = ${slug} AND source_id = ${sourceId}
       ON CONFLICT (page_id, date, summary) DO NOTHING
     `;
   }
@@ -2146,15 +2186,16 @@ export class PostgresEngine implements BrainEngine {
   }

   // Versions
-  async createVersion(slug: string): Promise<PageVersion> {
+  async createVersion(slug: string, opts?: { sourceId?: string }): Promise<PageVersion> {
     const sql = this.sql;
+    const sourceId = opts?.sourceId ?? 'default';
     const rows = await sql`
       INSERT INTO page_versions (page_id, compiled_truth, frontmatter)
       SELECT id, compiled_truth, frontmatter
-      FROM pages WHERE slug = ${slug}
+      FROM pages WHERE slug = ${slug} AND source_id = ${sourceId}
       RETURNING *
     `;
-    if (rows.length === 0) throw new Error(`createVersion failed: page "${slug}" not found`);
+    if (rows.length === 0) throw new Error(`createVersion failed: page "${slug}" (source=${sourceId}) not found`);
     return rows[0] as unknown as PageVersion;
   }
@@ -2324,10 +2365,14 @@
   }

   // Sync
-  async updateSlug(oldSlug: string, newSlug: string): Promise<void> {
+  async updateSlug(oldSlug: string, newSlug: string, opts?: { sourceId?: string }): Promise<void> {
     newSlug = validateSlug(newSlug);
     const sql = this.sql;
-    await sql`UPDATE pages SET slug = ${newSlug}, updated_at = now() WHERE slug = ${oldSlug}`;
+    const sourceId = opts?.sourceId ?? 'default';
+    // Source-qualify so a rename in source A doesn't sweep up same-slug rows
+    // in sources B/C/D (which would either rename them all or fail the
+    // (source_id, slug) UNIQUE where the new slug already exists in that source).
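+    // e.g. with rows ('default', 'a') and ('work', 'a'), updateSlug('a', 'b',
+    // { sourceId: 'work' }) moves only ('work', 'a') to ('work', 'b') and
+    // leaves ('default', 'a') untouched.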
+    await sql`UPDATE pages SET slug = ${newSlug}, updated_at = now() WHERE slug = ${oldSlug} AND source_id = ${sourceId}`;
   }

   async rewriteLinks(_oldSlug: string, _newSlug: string): Promise<void> {
diff --git a/test/import-image-file.test.ts b/test/import-image-file.test.ts
index 8542e1d83..9b3d6e612 100644
--- a/test/import-image-file.test.ts
+++ b/test/import-image-file.test.ts
@@ -129,3 +129,67 @@
     expect(result.error).toMatch(/Image too large/);
   });
 });
+
+describe('importImageFile source_id routing (multi-source)', () => {
+  // Regression for the image-import body wiring. Pre-fix, importImageFile
+  // accepted opts.sourceId for typecheck but never routed page/chunk/file
+  // writes to that source — they all silently landed in 'default'. Under
+  // multi-source full-sync with GBRAIN_EMBEDDING_MULTIMODAL=true, that
+  // recreated the same silent-misroute bug the text-path fix addressed.
+
+  async function pageCountBySource(engine: PGLiteEngine): Promise<Record<string, number>> {
+    const rows = await engine.executeRaw<{ source_id: string; n: number }>(
+      `SELECT source_id, COUNT(*)::int AS n FROM pages WHERE type = 'image' GROUP BY source_id`,
+    );
+    const out: Record<string, number> = {};
+    for (const r of rows) out[r.source_id] = r.n;
+    return out;
+  }
+
+  test('imports image to a named source when sourceId is passed', async () => {
+    // Pre-create the named source so the FK on pages.source_id is satisfied.
+    await engine.executeRaw(
+      `INSERT INTO sources (id, name) VALUES ('img-src', 'Image Source') ON CONFLICT (id) DO NOTHING`,
+    );
+
+    const target = join(tmpDir, 'sourced-photo.png');
+    copyFileSync('test/fixtures/images/tiny.avif', target);
+
+    const result = await importImageFile(engine, target, 'photos/sourced-photo.png', {
+      noEmbed: true,
+      sourceId: 'img-src',
+    });
+    expect(result.status).toBe('imported');
+
+    const counts = await pageCountBySource(engine);
+    expect(counts['img-src']).toBe(1);
+    expect(counts['default'] ?? 0).toBe(0);
+
+    // File row also lands in img-src (FileSpec identity is (source_id, storage_path)).
+    const fileInSrc = await engine.getFile('img-src', 'photos/sourced-photo.png');
+    expect(fileInSrc).not.toBeNull();
+    const fileInDefault = await engine.getFile('default', 'photos/sourced-photo.png');
+    expect(fileInDefault).toBeNull();
+
+    // Page is reachable in img-src, not default.
+    const pageInSrc = await engine.getPage('photos/sourced-photo.png', { sourceId: 'img-src' });
+    expect(pageInSrc).not.toBeNull();
+    const pageInDefault = await engine.getPage('photos/sourced-photo.png', { sourceId: 'default' });
+    expect(pageInDefault).toBeNull();
+  });
+
+  test('omitting sourceId still targets default (back-compat preserved)', async () => {
+    const target = join(tmpDir, 'default-photo.png');
+    copyFileSync('test/fixtures/images/tiny.avif', target);
+
+    const result = await importImageFile(engine, target, 'photos/default-photo.png', { noEmbed: true });
+    expect(result.status).toBe('imported');
+
+    const counts = await pageCountBySource(engine);
+    expect(counts['default']).toBe(1);
+    expect(counts['img-src'] ?? 0).toBe(0);
+
+    const file = await engine.getFile('default', 'photos/default-photo.png');
+    expect(file).not.toBeNull();
+  });
+});
diff --git a/test/performfullsync-source-id.test.ts b/test/performfullsync-source-id.test.ts
new file mode 100644
index 000000000..299d44b55
--- /dev/null
+++ b/test/performfullsync-source-id.test.ts
@@ -0,0 +1,235 @@
+/**
+ * Regression test for performFullSync source_id threading.
+ *
+ * Pre-fix bug:
+ * - The engine-layer source_id thread (in this PR's earlier commit) plumbs
+ *   source_id through sync's incremental loop (sync.ts), but performFullSync
+ *   (the path `--full` invokes) called `runImport(engine, importArgs,
+ *   { commit: headCommit })` without threading sourceId.
+ * - Result: `gbrain sync --source X --full` updated `sources.last_sync_at` to
+ *   look like binding worked, but actual page rows landed in source_id='default'.
+ * - The existing engine-layer regression tests (test/source-id-tx-regression.test.ts)
+ *   validate the transaction surface (putPage / addTag / etc.) but do NOT
+ *   exercise performFullSync. Confirmed via:
+ *     grep -c 'performFullSync' test/source-id-tx-regression.test.ts → 0.
+ *
+ * Fix (this PR):
+ * - runImport accepts opts.sourceId (programmatic-only — no CLI flag,
+ *   preserves the design intent of `gbrain import` being default-only).
+ * - runImport threads sourceId to importFile + importImageFile.
+ * - performFullSync passes opts.sourceId to runImport.
+ *
+ * This test verifies the sync-command-layer fix end-to-end on PGLite.
+ */
+
+import { describe, test, expect, beforeAll, afterAll, beforeEach, afterEach } from 'bun:test';
+import { mkdtempSync, writeFileSync, rmSync, mkdirSync } from 'fs';
+import { execSync } from 'child_process';
+import { tmpdir } from 'os';
+import { join } from 'path';
+import { PGLiteEngine } from '../src/core/pglite-engine.ts';
+import { runSources } from '../src/commands/sources.ts';
+import { resetPgliteState } from './helpers/reset-pglite.ts';
+
+let engine: PGLiteEngine;
+let repoPath: string;
+
+async function pageCountBySource(): Promise<Record<string, number>> {
+  const rows = await engine.executeRaw<{ source_id: string; n: number }>(
+    `SELECT source_id, COUNT(*)::int AS n FROM pages GROUP BY source_id`,
+  );
+  const out: Record<string, number> = {};
+  for (const r of rows) out[r.source_id] = r.n;
+  return out;
+}
+
+describe('performFullSync threads sourceId end-to-end', () => {
+  beforeAll(async () => {
+    engine = new PGLiteEngine();
+    await engine.connect({});
+    await engine.initSchema();
+    await runSources(engine, ['add', 'testsrc-pfs', '--no-federated']);
+  }, 60_000);
+
+  afterAll(async () => {
+    if (engine) await engine.disconnect();
+  }, 60_000);
+
+  beforeEach(async () => {
+    await resetPgliteState(engine);
+    // resetPgliteState clears pages but doesn't drop the source row; re-add only if missing
+    const sources = await engine.executeRaw<{ id: string }>(`SELECT id FROM sources WHERE id = 'testsrc-pfs'`);
+    if (sources.length === 0) {
+      await runSources(engine, ['add', 'testsrc-pfs', '--no-federated']);
+    }
+
+    repoPath = mkdtempSync(join(tmpdir(), 'gbrain-pfs-'));
+    execSync('git init', { cwd: repoPath, stdio: 'pipe' });
+    execSync('git config user.email "test@test.com"', { cwd: repoPath, stdio: 'pipe' });
+    execSync('git config user.name "Test"', { cwd: repoPath, stdio: 'pipe' });
+    mkdirSync(join(repoPath, 'topics'), { recursive: true });
+    writeFileSync(join(repoPath, 'topics/foo.md'), [
+      '---',
+      'type: concept',
+      'title: Foo Topic',
+      '---',
+      '',
+      'Test content for performFullSync source binding.',
+    ].join('\n'));
+    writeFileSync(join(repoPath, 'topics/bar.md'), [
+      '---',
+      'type: concept',
+      'title: Bar Topic',
+      '---',
+      '',
+      'Second test page to verify multi-page routing.',
+    ].join('\n'));
+    execSync('git add -A && git commit -m "initial"', { cwd: repoPath, stdio: 'pipe' });
+  });
+
+  afterEach(() => {
+    if (repoPath) rmSync(repoPath, { recursive: true, force: true });
+  });
+
+  test('performFullSync 
with --source routes pages to named source (not default)', async () => { + const { performSync } = await import('../src/commands/sync.ts'); + const result = await performSync(engine, { + repoPath, + full: true, + sourceId: 'testsrc-pfs', + noPull: true, + noEmbed: true, + }); + + // status is 'first_sync' for fresh imports, 'synced' for incremental — accept both + expect(['first_sync', 'synced']).toContain(result.status); + expect(result.added).toBeGreaterThan(0); + + const counts = await pageCountBySource(); + // Pre-fix bug: pages would land in 'default' (sources.last_sync_at would still + // update on testsrc-pfs, making the gap silent at the sources-list level). + // Post-fix: pages land in 'testsrc-pfs'. + expect(counts['testsrc-pfs']).toBeGreaterThan(0); + expect(counts['default'] ?? 0).toBe(0); + }); + + test('performFullSync WITHOUT --source still targets default (back-compat preserved)', async () => { + const { performSync } = await import('../src/commands/sync.ts'); + const result = await performSync(engine, { + repoPath, + full: true, + // no sourceId — expect default-source behavior + noPull: true, + noEmbed: true, + }); + + // status is 'first_sync' for fresh imports, 'synced' for incremental — accept both + expect(['first_sync', 'synced']).toContain(result.status); + expect(result.added).toBeGreaterThan(0); + + const counts = await pageCountBySource(); + // Back-compat: callers that omit sourceId continue to target source 'default'. + expect(counts['default']).toBeGreaterThan(0); + expect(counts['testsrc-pfs'] ?? 0).toBe(0); + }); +}); + +describe('performFullSync global sync-config isolation under --source', () => { + // Regression for the writeSyncConfig opt: runImport writes three global + // (non-source-scoped) config keys — sync.last_commit / sync.last_run / + // sync.repo_path. Pre-fix, those wrote even when called from + // performFullSync with a sourceId, so a `--source X --full` run would + // overwrite global `sync.repo_path` with X's repo. A later bare + // `gbrain sync` then read X's path as the default-source repo and + // imported X content into source='default'. Silent contamination. + // + // Post-fix, performFullSync passes writeSyncConfig=false to runImport + // when sourceId is set, so the legacy global writes are gated. The + // source-scoped anchor (sources.X.last_commit) is owned by performFullSync + // itself via writeSyncAnchor. 
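+  //
+  // Contamination sequence this guards against (repo paths illustrative):
+  //   1. gbrain sync --source X --full   (pre-fix: global sync.repo_path := X's repo)
+  //   2. gbrain sync                     (reads X's repo as the default-source
+  //      repo and imports X's content into source='default')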
+ let isolationEngine: PGLiteEngine; + let isolationRepo: string; + + beforeAll(async () => { + isolationEngine = new PGLiteEngine(); + await isolationEngine.connect({}); + await isolationEngine.initSchema(); + await runSources(isolationEngine, ['add', 'isolated-src', '--no-federated']); + }, 60_000); + + afterAll(async () => { + if (isolationEngine) await isolationEngine.disconnect(); + }, 60_000); + + beforeEach(async () => { + await resetPgliteState(isolationEngine); + const sources = await isolationEngine.executeRaw<{ id: string }>( + `SELECT id FROM sources WHERE id = 'isolated-src'`, + ); + if (sources.length === 0) { + await runSources(isolationEngine, ['add', 'isolated-src', '--no-federated']); + } + isolationRepo = mkdtempSync(join(tmpdir(), 'gbrain-iso-')); + execSync('git init', { cwd: isolationRepo, stdio: 'pipe' }); + execSync('git config user.email "test@test.com"', { cwd: isolationRepo, stdio: 'pipe' }); + execSync('git config user.name "Test"', { cwd: isolationRepo, stdio: 'pipe' }); + mkdirSync(join(isolationRepo, 'topics'), { recursive: true }); + writeFileSync(join(isolationRepo, 'topics/iso.md'), [ + '---', 'type: concept', 'title: Iso Topic', '---', + '', 'Isolation test content.', + ].join('\n')); + execSync('git add -A && git commit -m "initial"', { cwd: isolationRepo, stdio: 'pipe' }); + }); + + afterEach(() => { + if (isolationRepo) rmSync(isolationRepo, { recursive: true, force: true }); + }); + + test('--source X --full does NOT overwrite global sync.repo_path or sync.last_commit', async () => { + // Pre-set the global keys to known sentinel values so we can detect + // whether runImport overwrote them. + await isolationEngine.setConfig('sync.repo_path', '/tmp/sentinel-default-repo'); + await isolationEngine.setConfig('sync.last_commit', 'sentinel-default-commit'); + + const { performSync } = await import('../src/commands/sync.ts'); + const result = await performSync(isolationEngine, { + repoPath: isolationRepo, + full: true, + sourceId: 'isolated-src', + noPull: true, + noEmbed: true, + }); + expect(['first_sync', 'synced']).toContain(result.status); + + // Pre-fix: these would now hold isolationRepo / its HEAD. Post-fix: untouched. + const repoPathAfter = await isolationEngine.getConfig('sync.repo_path'); + const lastCommitAfter = await isolationEngine.getConfig('sync.last_commit'); + expect(repoPathAfter).toBe('/tmp/sentinel-default-repo'); + expect(lastCommitAfter).toBe('sentinel-default-commit'); + + // The source-scoped anchor IS expected to be written (performFullSync owns + // it directly via writeSyncAnchor). Verify by reading sources.isolated-src. 
+ const srcRows = await isolationEngine.executeRaw<{ last_commit: string | null }>( + `SELECT last_commit FROM sources WHERE id = 'isolated-src'`, + ); + expect(srcRows[0]?.last_commit).not.toBeNull(); + expect(srcRows[0]?.last_commit).not.toBe('sentinel-default-commit'); + }); + + test('full sync WITHOUT --source DOES update global sync.repo_path (back-compat)', async () => { + await isolationEngine.setConfig('sync.repo_path', '/tmp/sentinel-default-repo'); + + const { performSync } = await import('../src/commands/sync.ts'); + const result = await performSync(isolationEngine, { + repoPath: isolationRepo, + full: true, + // no sourceId — back-compat path; runImport SHOULD write global config + noPull: true, + noEmbed: true, + }); + expect(['first_sync', 'synced']).toContain(result.status); + + const repoPathAfter = await isolationEngine.getConfig('sync.repo_path'); + expect(repoPathAfter).toBe(isolationRepo); + }); +}); diff --git a/test/source-id-tx-regression.test.ts b/test/source-id-tx-regression.test.ts new file mode 100644 index 000000000..b267672dc --- /dev/null +++ b/test/source-id-tx-regression.test.ts @@ -0,0 +1,466 @@ +/** + * v0.18.0+ Step 5+ regression — source_id threading through the per-page + * transaction surface (putPage / createVersion / getTags / addTag / removeTag / + * deleteChunks / upsertChunks / addLink / removeLink). + * + * Pre-fix bug: + * - putPage omitted source_id from its INSERT column list, so the schema + * DEFAULT 'default' was applied even when the caller meant to write under + * a non-default source (e.g. 'jarvis-memory'). When the same slug already + * existed under the intended source, putPage silently fabricated a + * duplicate row at (default, slug). Both rows then coexisted under the + * composite UNIQUE. + * - Subsequent bare-slug subqueries inside the same transaction — + * `(SELECT id FROM pages WHERE slug = $1)` in getTags / removeTag / + * deleteChunks / removeLink — returned 2 rows and crashed with Postgres + * 21000 ("more than one row returned by a subquery used as an expression"), + * rolling back the entire tx. + * + * Fix: + * - putPage adds source_id to the INSERT column list (defaults to 'default' + * when opts.sourceId is omitted, preserving back-compat). + * - Every bare-slug page-id subquery becomes source-qualified + * (`AND source_id = $X`), eliminating the multi-row fan-out. + * - addLink converts away from `FROM pages f, pages t` cross-product and + * mirrors addLinksBatch's VALUES + JOIN-on-(slug, source_id) shape. + * + * Backwards-compat: every method's opts param is optional. Existing callers + * that don't pass sourceId continue to target source 'default' (the schema + * default) and behave identically to pre-fix. + */ + +import { describe, test, expect, beforeAll, afterAll } from 'bun:test'; +import { PGLiteEngine } from '../src/core/pglite-engine.ts'; +import { runSources } from '../src/commands/sources.ts'; +import { importFromContent } from '../src/core/import-file.ts'; + +let engine: PGLiteEngine; + +beforeAll(async () => { + engine = new PGLiteEngine(); + await engine.connect({ type: 'pglite' } as never); + await engine.initSchema(); + // Add the second source up-front; tests below assume both 'default' and + // 'testsrc' exist. 
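+  // Layout every test below builds on (slug values vary):
+  //   pages('default', S) and pages('testsrc', S) coexist, which is legal
+  //   under the composite UNIQUE (source_id, slug); the pre-fix code assumed
+  //   slug alone identified a page.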
+ await runSources(engine, ['add', 'testsrc', '--no-federated']); +}, 60_000); + +afterAll(async () => { + if (engine) await engine.disconnect(); +}, 60_000); + +const SLUG = 'topics/source-id-regression'; + +describe('putPage threads source_id into the INSERT column list', () => { + test('putPage with opts.sourceId writes under the intended source', async () => { + await engine.putPage(SLUG, { + type: 'concept', + title: 'Default-source variant', + compiled_truth: 'Lives under source=default.', + }); + await engine.putPage(SLUG, { + type: 'concept', + title: 'Testsrc-source variant', + compiled_truth: 'Lives under source=testsrc.', + }, { sourceId: 'testsrc' }); + + const rows = await engine.executeRaw<{ source_id: string; title: string }>( + `SELECT source_id, title FROM pages WHERE slug = $1 ORDER BY source_id`, + [SLUG], + ); + expect(rows.length).toBe(2); + expect(rows[0].source_id).toBe('default'); + expect(rows[0].title).toBe('Default-source variant'); + expect(rows[1].source_id).toBe('testsrc'); + expect(rows[1].title).toBe('Testsrc-source variant'); + }); + + test('putPage without opts.sourceId still targets source=default (back-compat)', async () => { + // Call again under default to verify the no-opts path still hits the same + // (default, slug) row rather than fabricating a duplicate. + const updated = await engine.putPage(SLUG, { + type: 'concept', + title: 'Default-source updated', + compiled_truth: 'Updated content.', + }); + expect(updated.title).toBe('Default-source updated'); + + const rows = await engine.executeRaw<{ source_id: string; title: string }>( + `SELECT source_id, title FROM pages WHERE slug = $1 ORDER BY source_id`, + [SLUG], + ); + // Still exactly two rows — no duplicate fabricated. + expect(rows.length).toBe(2); + expect(rows.find(r => r.source_id === 'default')!.title).toBe('Default-source updated'); + expect(rows.find(r => r.source_id === 'testsrc')!.title).toBe('Testsrc-source variant'); + }); +}); + +describe('Per-page tx methods source-qualify their bare-slug subqueries', () => { + test('getTags(slug, { sourceId }) returns scoped tags without 21000', async () => { + // Pre-fix: this call would crash because the bare-slug subquery + // `(SELECT id FROM pages WHERE slug = $1)` matched both rows. 
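+    // Pre-fix crash shape, for reference (the scalar subquery sees two rows):
+    //   SELECT tag FROM tags
+    //   WHERE page_id = (SELECT id FROM pages WHERE slug = $1)  -- SQLSTATE 21000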
+ await engine.addTag(SLUG, 'shared-by-default', { sourceId: 'default' }); + await engine.addTag(SLUG, 'unique-to-testsrc', { sourceId: 'testsrc' }); + await engine.addTag(SLUG, 'also-shared', { sourceId: 'default' }); + await engine.addTag(SLUG, 'also-shared', { sourceId: 'testsrc' }); + + const defaultTags = await engine.getTags(SLUG, { sourceId: 'default' }); + expect(defaultTags.sort()).toEqual(['also-shared', 'shared-by-default']); + + const testsrcTags = await engine.getTags(SLUG, { sourceId: 'testsrc' }); + expect(testsrcTags.sort()).toEqual(['also-shared', 'unique-to-testsrc']); + }); + + test('removeTag(slug, tag, { sourceId }) only removes from one source', async () => { + await engine.removeTag(SLUG, 'also-shared', { sourceId: 'testsrc' }); + expect((await engine.getTags(SLUG, { sourceId: 'default' })).sort()) + .toEqual(['also-shared', 'shared-by-default']); + expect((await engine.getTags(SLUG, { sourceId: 'testsrc' })).sort()) + .toEqual(['unique-to-testsrc']); + }); + + test('deleteChunks(slug, { sourceId }) only deletes one source\'s chunks', async () => { + await engine.upsertChunks(SLUG, [ + { chunk_index: 0, chunk_text: 'default chunk 0', chunk_source: 'compiled_truth' }, + ], { sourceId: 'default' }); + await engine.upsertChunks(SLUG, [ + { chunk_index: 0, chunk_text: 'testsrc chunk 0', chunk_source: 'compiled_truth' }, + ], { sourceId: 'testsrc' }); + + const beforeRows = await engine.executeRaw<{ source_id: string; chunk_text: string }>( + `SELECT p.source_id, cc.chunk_text + FROM content_chunks cc + JOIN pages p ON p.id = cc.page_id + WHERE p.slug = $1 + ORDER BY p.source_id`, + [SLUG], + ); + expect(beforeRows.length).toBe(2); + + await engine.deleteChunks(SLUG, { sourceId: 'testsrc' }); + + const afterRows = await engine.executeRaw<{ source_id: string; chunk_text: string }>( + `SELECT p.source_id, cc.chunk_text + FROM content_chunks cc + JOIN pages p ON p.id = cc.page_id + WHERE p.slug = $1`, + [SLUG], + ); + expect(afterRows.length).toBe(1); + expect(afterRows[0].source_id).toBe('default'); + }); + + test('createVersion(slug, { sourceId }) snapshots the right row', async () => { + const v = await engine.createVersion(SLUG, { sourceId: 'testsrc' }); + expect(v).toBeDefined(); + const rows = await engine.executeRaw<{ source_id: string; compiled_truth: string }>( + `SELECT p.source_id, pv.compiled_truth + FROM page_versions pv + JOIN pages p ON p.id = pv.page_id + WHERE p.slug = $1 + ORDER BY pv.snapshot_at DESC + LIMIT 1`, + [SLUG], + ); + expect(rows.length).toBe(1); + expect(rows[0].source_id).toBe('testsrc'); + expect(rows[0].compiled_truth).toBe('Lives under source=testsrc.'); + }); +}); + +describe('addLink rewrites the cross-product into a source-qualified JOIN', () => { + const FROM_SLUG = 'topics/regression-link-from'; + const TO_SLUG = 'topics/regression-link-to'; + + test('addLink with opts.{from,to,origin}SourceId targets the right rows', async () => { + // Set up: same (from, to) slug pair under both default and testsrc. + await engine.putPage(FROM_SLUG, { type: 'concept', title: 'F default', compiled_truth: '' }); + await engine.putPage(TO_SLUG, { type: 'concept', title: 'T default', compiled_truth: '' }); + await engine.putPage(FROM_SLUG, { type: 'concept', title: 'F testsrc', compiled_truth: '' }, { sourceId: 'testsrc' }); + await engine.putPage(TO_SLUG, { type: 'concept', title: 'T testsrc', compiled_truth: '' }, { sourceId: 'testsrc' }); + + // Add an edge under testsrc only. 
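+    // (Positional args below: from, to, context, linkType, linkSource,
+    // originSlug, originField, opts; the two `undefined`s are
+    // originSlug/originField.)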
+    await engine.addLink(
+      FROM_SLUG, TO_SLUG, 'testsrc edge', 'documents', 'markdown', undefined, undefined,
+      { fromSourceId: 'testsrc', toSourceId: 'testsrc', originSourceId: 'testsrc' },
+    );
+
+    // Verify the link's endpoints both point at the testsrc rows, not the
+    // default rows. Pre-fix, the cross-product `FROM pages f, pages t`
+    // matched every source combination of (from, to) and inserted an edge
+    // for each; the source-qualified JOIN eliminates that fan-out.
+    const rows = await engine.executeRaw<{ from_src: string; to_src: string; context: string }>(
+      `SELECT f.source_id AS from_src, t.source_id AS to_src, l.context
+       FROM links l
+       JOIN pages f ON f.id = l.from_page_id
+       JOIN pages t ON t.id = l.to_page_id
+       WHERE l.context = 'testsrc edge'`,
+    );
+    expect(rows.length).toBe(1);
+    expect(rows[0].from_src).toBe('testsrc');
+    expect(rows[0].to_src).toBe('testsrc');
+  });
+
+  test('addLink with no opts defaults to source=default (back-compat)', async () => {
+    await engine.addLink(
+      FROM_SLUG, TO_SLUG, 'default edge', 'documents', 'markdown',
+    );
+    const rows = await engine.executeRaw<{ from_src: string; to_src: string }>(
+      `SELECT f.source_id AS from_src, t.source_id AS to_src
+       FROM links l
+       JOIN pages f ON f.id = l.from_page_id
+       JOIN pages t ON t.id = l.to_page_id
+       WHERE l.context = 'default edge'`,
+    );
+    expect(rows.length).toBe(1);
+    expect(rows[0].from_src).toBe('default');
+    expect(rows[0].to_src).toBe('default');
+  });
+
+  test('addLink fails fast when the source-qualified endpoint doesn\'t exist', async () => {
+    // Pre-fix: the cross-product silently matched whatever source rows
+    // existed and succeeded. Post-fix: the source-qualified INTERSECT
+    // pre-check finds no row for 'nonexistent-src' and throws before any
+    // insert is attempted.
+    let err: Error | null = null;
+    try {
+      await engine.addLink(
+        FROM_SLUG, TO_SLUG, 'phantom edge', 'documents', 'markdown', undefined, undefined,
+        { fromSourceId: 'nonexistent-src', toSourceId: 'nonexistent-src' },
+      );
+    } catch (e) {
+      err = e as Error;
+    }
+    expect(err).not.toBeNull();
+    expect(err!.message).toMatch(/not found/);
+  });
+});
+
+describe('importFromContent threads sourceId through the entire transaction body', () => {
+  const IMP_SLUG = 'topics/regression-import-thread';
+
+  test('importFromContent under source=testsrc does not fabricate a (default, slug) duplicate', async () => {
+    // Pre-seed a default-source row at the same slug to prove the fix actually
+    // discriminates: pre-fix, importing under testsrc would have ALSO touched
+    // the default row (or duplicated it) and the bare-slug getTags inside the
+    // tx would crash with 21000.
+    await engine.putPage(IMP_SLUG, {
+      type: 'concept',
+      title: 'Default-source seed',
+      compiled_truth: 'pre-existing default row',
+    });
+
+    const md = `---
+type: concept
+title: Imported under testsrc
+---
+
+# Imported under testsrc
+
+Body content; tags get reconciled inside the transaction.
+`;
+
+    // No 21000, no duplicate. Pre-fix this call would have either crashed
+    // mid-tx (rolling back) OR silently overwritten the (default, slug)
+    // seed row.
+ const result = await importFromContent(engine, IMP_SLUG, md, { + noEmbed: true, + sourceId: 'testsrc', + }); + expect(result.status).toBe('imported'); + + const rows = await engine.executeRaw<{ source_id: string; title: string }>( + `SELECT source_id, title FROM pages WHERE slug = $1 ORDER BY source_id`, + [IMP_SLUG], + ); + expect(rows.length).toBe(2); + expect(rows[0].source_id).toBe('default'); + expect(rows[0].title).toBe('Default-source seed'); + expect(rows[1].source_id).toBe('testsrc'); + expect(rows[1].title).toBe('Imported under testsrc'); + }); + + test('re-importing same content under same sourceId is idempotent (status=skipped)', async () => { + const md = `--- +type: concept +title: Imported under testsrc +--- + +# Imported under testsrc + +Body content; tags get reconciled inside the transaction. +`; + const result = await importFromContent(engine, IMP_SLUG, md, { + noEmbed: true, + sourceId: 'testsrc', + }); + expect(result.status).toBe('skipped'); + }); +}); + +describe('addTimelineEntry source-scoping (Data R1 HIGH 2 fix)', () => { + const TL_SLUG = 'topics/regression-timeline'; + + test('addTimelineEntry with opts.sourceId only writes to the intended source', async () => { + // Set up: same slug under both default and testsrc. + await engine.putPage(TL_SLUG, { type: 'concept', title: 'TL default', compiled_truth: '' }); + await engine.putPage(TL_SLUG, { type: 'concept', title: 'TL testsrc', compiled_truth: '' }, { sourceId: 'testsrc' }); + + // Pre-fix: bare-slug `INSERT ... SELECT id FROM pages WHERE slug = $1` + // would have inserted timeline rows for BOTH source rows, fanning out + // the entry across sources. + await engine.addTimelineEntry(TL_SLUG, { + date: '2026-05-07', + source: 'test', + summary: 'testsrc-only entry', + detail: 'Should land only under testsrc.', + }, { sourceId: 'testsrc' }); + + const rows = await engine.executeRaw<{ source_id: string; summary: string }>( + `SELECT p.source_id, te.summary + FROM timeline_entries te + JOIN pages p ON p.id = te.page_id + WHERE p.slug = $1`, + [TL_SLUG], + ); + expect(rows.length).toBe(1); + expect(rows[0].source_id).toBe('testsrc'); + expect(rows[0].summary).toBe('testsrc-only entry'); + }); + + test('addTimelineEntry rejects missing source-qualified page', async () => { + let err: Error | null = null; + try { + await engine.addTimelineEntry(TL_SLUG, { + date: '2026-05-08', + source: 'test', + summary: 'bad source', + detail: '', + }, { sourceId: 'nonexistent-src' }); + } catch (e) { + err = e as Error; + } + expect(err).not.toBeNull(); + expect(err!.message).toMatch(/not found/); + }); + + test('addTimelineEntry without opts defaults to source=default (back-compat)', async () => { + await engine.addTimelineEntry(TL_SLUG, { + date: '2026-05-09', + source: 'test', + summary: 'default-source entry', + detail: '', + }); + + const rows = await engine.executeRaw<{ source_id: string; summary: string }>( + `SELECT p.source_id, te.summary + FROM timeline_entries te + JOIN pages p ON p.id = te.page_id + WHERE p.slug = $1 AND te.summary = 'default-source entry'`, + [TL_SLUG], + ); + expect(rows.length).toBe(1); + expect(rows[0].source_id).toBe('default'); + }); +}); + +describe('deletePage + updateSlug source-scoping (Data R2 CRITICAL + HIGH fix)', () => { + const DEL_SLUG = 'topics/regression-delete'; + const REN_FROM = 'topics/regression-rename-from'; + const REN_TO = 'topics/regression-rename-to'; + + test('deletePage with opts.sourceId only deletes the intended source row', async () => { + // Set up: same slug under 
both default and testsrc.
+    await engine.putPage(DEL_SLUG, { type: 'concept', title: 'D default', compiled_truth: '' });
+    await engine.putPage(DEL_SLUG, { type: 'concept', title: 'D testsrc', compiled_truth: '' }, { sourceId: 'testsrc' });
+
+    // Pre-fix: bare `DELETE FROM pages WHERE slug = $1` would have hard-deleted
+    // BOTH rows across sources. Post-fix: only the testsrc row goes.
+    await engine.deletePage(DEL_SLUG, { sourceId: 'testsrc' });
+
+    const rows = await engine.executeRaw<{ source_id: string }>(
+      `SELECT source_id FROM pages WHERE slug = $1`,
+      [DEL_SLUG],
+    );
+    expect(rows.length).toBe(1);
+    expect(rows[0].source_id).toBe('default');
+  });
+
+  test('deletePage without opts targets source=default only (back-compat)', async () => {
+    // Recreate the testsrc row to test that default-source delete leaves it.
+    await engine.putPage(DEL_SLUG, { type: 'concept', title: 'D testsrc back', compiled_truth: '' }, { sourceId: 'testsrc' });
+    await engine.deletePage(DEL_SLUG); // no opts → defaults to 'default'
+
+    const rows = await engine.executeRaw<{ source_id: string }>(
+      `SELECT source_id FROM pages WHERE slug = $1`,
+      [DEL_SLUG],
+    );
+    expect(rows.length).toBe(1);
+    expect(rows[0].source_id).toBe('testsrc');
+  });
+
+  test('updateSlug with opts.sourceId only renames the intended source row', async () => {
+    // Set up: same slug under both default and testsrc.
+    await engine.putPage(REN_FROM, { type: 'concept', title: 'R default', compiled_truth: '' });
+    await engine.putPage(REN_FROM, { type: 'concept', title: 'R testsrc', compiled_truth: '' }, { sourceId: 'testsrc' });
+
+    // Pre-fix: bare `UPDATE pages SET slug = $new WHERE slug = $old` would have
+    // hit both rows; if REN_TO already existed in either source, the (source_id,
+    // slug) UNIQUE would fail. Post-fix: only the testsrc row gets renamed.
+    await engine.updateSlug(REN_FROM, REN_TO, { sourceId: 'testsrc' });
+
+    const fromRows = await engine.executeRaw<{ source_id: string }>(
+      `SELECT source_id FROM pages WHERE slug = $1 ORDER BY source_id`,
+      [REN_FROM],
+    );
+    expect(fromRows.length).toBe(1);
+    expect(fromRows[0].source_id).toBe('default');
+
+    const toRows = await engine.executeRaw<{ source_id: string }>(
+      `SELECT source_id FROM pages WHERE slug = $1`,
+      [REN_TO],
+    );
+    expect(toRows.length).toBe(1);
+    expect(toRows[0].source_id).toBe('testsrc');
+  });
+
+  test('getChunks with opts.sourceId only returns the intended source\'s chunks', async () => {
+    // Set up: same slug under both default and testsrc, each with distinct chunks.
+    const CHUNK_SLUG = 'topics/regression-getchunks';
+    await engine.putPage(CHUNK_SLUG, { type: 'concept', title: 'C default', compiled_truth: '' });
+    await engine.putPage(CHUNK_SLUG, { type: 'concept', title: 'C testsrc', compiled_truth: '' }, { sourceId: 'testsrc' });
+    await engine.upsertChunks(CHUNK_SLUG, [
+      { chunk_index: 0, chunk_text: 'default chunk text', chunk_source: 'compiled_truth' },
+    ], { sourceId: 'default' });
+    await engine.upsertChunks(CHUNK_SLUG, [
+      { chunk_index: 0, chunk_text: 'testsrc chunk text', chunk_source: 'compiled_truth' },
+    ], { sourceId: 'testsrc' });
+
+    // Pre-fix: bare-slug `WHERE p.slug = $1` returned BOTH sources' chunks
+    // mashed together. importCodeFile uses getChunks for incremental embedding
+    // reuse; pre-fix it would have grabbed the wrong source's embeddings.
+    const defaultChunks = await engine.getChunks(CHUNK_SLUG, { sourceId: 'default' });
+    expect(defaultChunks.length).toBe(1);
+    expect(defaultChunks[0].chunk_text).toBe('default chunk text');
+
+    const testsrcChunks = await engine.getChunks(CHUNK_SLUG, { sourceId: 'testsrc' });
+    expect(testsrcChunks.length).toBe(1);
+    expect(testsrcChunks[0].chunk_text).toBe('testsrc chunk text');
+  });
+
+  test('updateSlug without opts targets source=default only (back-compat)', async () => {
+    // Default still has REN_FROM (the testsrc row was renamed in the test
+    // above). Rename it without opts to a fresh slug and verify only the
+    // default row moves: opts-less callers keep targeting source='default',
+    // and the testsrc rows are untouched.
+    const REN_TO_2 = 'topics/regression-rename-to-2';
+    await engine.updateSlug(REN_FROM, REN_TO_2);
+
+    const rows = await engine.executeRaw<{ source_id: string; slug: string }>(
+      `SELECT source_id, slug FROM pages WHERE slug IN ($1, $2) ORDER BY source_id`,
+      [REN_FROM, REN_TO_2],
+    );
+    expect(rows.length).toBe(1);
+    expect(rows[0].source_id).toBe('default');
+    expect(rows[0].slug).toBe(REN_TO_2);
+  });
+});
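+
+describe('removeLink source-scoping (illustrative sketch)', () => {
+  // Sketch only, not part of this PR's original coverage: removeLink is on
+  // the same per-page tx surface (see the engine diff above) but has no
+  // dedicated test in this file. Signature assumed from that diff:
+  // removeLink(from, to, linkType?, linkSource?, opts?: { fromSourceId?;
+  // toSourceId? }). Slugs below are local to this sketch.
+  const RL_FROM = 'topics/regression-removelink-from';
+  const RL_TO = 'topics/regression-removelink-to';
+
+  test('removeLink with opts only deletes the named source\'s edge', async () => {
+    // Same (from, to) slug pair, with an identical edge under both sources.
+    await engine.putPage(RL_FROM, { type: 'concept', title: 'RL F default', compiled_truth: '' });
+    await engine.putPage(RL_TO, { type: 'concept', title: 'RL T default', compiled_truth: '' });
+    await engine.putPage(RL_FROM, { type: 'concept', title: 'RL F testsrc', compiled_truth: '' }, { sourceId: 'testsrc' });
+    await engine.putPage(RL_TO, { type: 'concept', title: 'RL T testsrc', compiled_truth: '' }, { sourceId: 'testsrc' });
+    await engine.addLink(RL_FROM, RL_TO, 'rl default', 'documents', 'markdown');
+    await engine.addLink(RL_FROM, RL_TO, 'rl testsrc', 'documents', 'markdown', undefined, undefined,
+      { fromSourceId: 'testsrc', toSourceId: 'testsrc', originSourceId: 'testsrc' });
+
+    await engine.removeLink(RL_FROM, RL_TO, 'documents', 'markdown',
+      { fromSourceId: 'testsrc', toSourceId: 'testsrc' });
+
+    // Only the default-source edge survives the scoped delete.
+    const rows = await engine.executeRaw<{ from_src: string; context: string }>(
+      `SELECT f.source_id AS from_src, l.context
+       FROM links l
+       JOIN pages f ON f.id = l.from_page_id
+       JOIN pages t ON t.id = l.to_page_id
+       WHERE f.slug = $1 AND t.slug = $2`,
+      [RL_FROM, RL_TO],
+    );
+    expect(rows.length).toBe(1);
+    expect(rows[0].from_src).toBe('default');
+    expect(rows[0].context).toBe('rl default');
+  });
+});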