29 changes: 25 additions & 4 deletions src/commands/extract.ts
@@ -668,32 +668,53 @@ async function extractTimelineFromDir(

// --- Sync integration hooks ---

export async function extractLinksForSlugs(engine: BrainEngine, repoPath: string, slugs: string[]): Promise<number> {
export async function extractLinksForSlugs(
engine: BrainEngine,
repoPath: string,
slugs: string[],
opts?: { sourceId?: string },
): Promise<number> {
const allFiles = walkMarkdownFiles(repoPath);
const allSlugs = new Set(allFiles.map(f => f.relPath.replace('.md', '')));
// v0.18.0+ multi-source: post-sync extract reconciles same-source edges.
// Markdown→markdown links within one repo always live in the caller's
// sourceId. Cross-source extraction (rare) would need a per-repo source
// manifest; not in this PR's scope.
const linkOpts = opts?.sourceId
? { fromSourceId: opts.sourceId, toSourceId: opts.sourceId, originSourceId: opts.sourceId }
: undefined;
let created = 0;
for (const slug of slugs) {
const filePath = join(repoPath, slug + '.md');
if (!existsSync(filePath)) continue;
try {
const content = readFileSync(filePath, 'utf-8');
for (const link of await extractLinksFromFile(content, slug + '.md', allSlugs)) {
try { await engine.addLink(link.from_slug, link.to_slug, link.context, link.link_type); created++; } catch { /* skip */ }
try { await engine.addLink(link.from_slug, link.to_slug, link.context, link.link_type, undefined, undefined, undefined, linkOpts); created++; } catch { /* skip */ }
}
} catch { /* skip */ }
}
return created;
}

export async function extractTimelineForSlugs(engine: BrainEngine, repoPath: string, slugs: string[]): Promise<number> {
export async function extractTimelineForSlugs(
engine: BrainEngine,
repoPath: string,
slugs: string[],
opts?: { sourceId?: string },
): Promise<number> {
// v0.18.0+ multi-source: source-qualify so timeline rows don't fan out
// across every source containing the slug (addTimelineEntry's
// INSERT...SELECT-from-pages fan-out was Data R1's HIGH 2).
const entryOpts = opts?.sourceId ? { sourceId: opts.sourceId } : undefined;
let created = 0;
for (const slug of slugs) {
const filePath = join(repoPath, slug + '.md');
if (!existsSync(filePath)) continue;
try {
const content = readFileSync(filePath, 'utf-8');
for (const entry of extractTimelineFromContent(content, slug)) {
try { await engine.addTimelineEntry(entry.slug, { date: entry.date, source: entry.source, summary: entry.summary, detail: entry.detail }); created++; } catch { /* skip */ }
try { await engine.addTimelineEntry(entry.slug, { date: entry.date, source: entry.source, summary: entry.summary, detail: entry.detail }, entryOpts); created++; } catch { /* skip */ }
}
} catch { /* skip */ }
}
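A minimal usage sketch of the new `opts` parameter (not part of this diff; the source id, the `BrainEngine` import path, and the wrapper function are illustrative assumptions):

```ts
import { extractLinksForSlugs, extractTimelineForSlugs } from './extract.ts';
import type { BrainEngine } from '../core/engine.ts'; // import path assumed, not shown in this PR

// Sketch: a source-aware caller threads one sourceId into both extract helpers
// so link edges and timeline rows stay scoped to that source's pages instead of
// fanning out across every source that happens to contain the same slug.
async function extractForOneSource(
  engine: BrainEngine,
  repoPath: string,
  changedSlugs: string[],
): Promise<{ links: number; timeline: number }> {
  const opts = { sourceId: 'team-wiki' }; // hypothetical source id
  const links = await extractLinksForSlugs(engine, repoPath, changedSlugs, opts);
  const timeline = await extractTimelineForSlugs(engine, repoPath, changedSlugs, opts);
  return { links, timeline };
}
```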
32 changes: 25 additions & 7 deletions src/commands/import.ts
@@ -28,10 +28,26 @@ export interface RunImportResult {
failures: Array<{ path: string; error: string }>;
}

export async function runImport(engine: BrainEngine, args: string[], opts: { commit?: string } = {}): Promise<RunImportResult> {
export async function runImport(
engine: BrainEngine,
args: string[],
opts: { commit?: string; sourceId?: string; writeSyncConfig?: boolean } = {},
): Promise<RunImportResult> {
const noEmbed = args.includes('--no-embed');
const fresh = args.includes('--fresh');
const jsonOutput = args.includes('--json');
// v0.30.x follow-up to PR #707: programmatic sourceId support so internal
// callers (performFullSync, future Step 6 paths) can route to a named
// source. The CLI `gbrain import` deliberately has no --source flag per
// PR #707's design intent — only programmatic callers thread sourceId.
const sourceId = opts.sourceId;
// v0.30.x follow-up to PR #707: when called from performFullSync (which
// owns its own source-scoped sync anchors via writeSyncAnchor), gate the
// legacy global config writes (sync.last_commit / sync.last_run /
// sync.repo_path) so a `--source X --full` run can't overwrite the
// global repo path with X's repo and contaminate a later bare
// `gbrain sync` against the default source.
const writeSyncConfig = opts.writeSyncConfig !== false;
const workersIdx = args.indexOf('--workers');
const workersArg = workersIdx !== -1 ? args[workersIdx + 1] : null;
// v0.22.13 (PR #490 Q2): shared parseWorkers helper rejects bad input
@@ -110,8 +126,8 @@ export async function runImport(engine: BrainEngine, args: string[], opts: { com
// up images when GBRAIN_EMBEDDING_MULTIMODAL=true so this branch is
// unreachable when the gate is off; defense-in-depth check anyway.
const result = isImageFilePath(relativePath) && process.env.GBRAIN_EMBEDDING_MULTIMODAL === 'true'
? await importImageFile(eng, filePath, relativePath, { noEmbed })
: await importFile(eng, filePath, relativePath, { noEmbed });
? await importImageFile(eng, filePath, relativePath, { noEmbed, sourceId })
: await importFile(eng, filePath, relativePath, { noEmbed, sourceId });
if (result.status === 'imported') {
imported++;
chunksCreated += result.chunks;
@@ -275,17 +291,19 @@ export async function runImport(engine: BrainEngine, args: string[], opts: { com
const { recordSyncFailures } = await import('../core/sync.ts');
recordSyncFailures(failures, gitHead);
}
if (failures.length === 0) {
if (failures.length === 0 && writeSyncConfig) {
await engine.setConfig('sync.last_commit', gitHead);
} else {
} else if (failures.length > 0) {
console.error(
`\nImport completed with ${failures.length} failure(s). ` +
`sync.last_commit NOT advanced — re-run 'gbrain sync' to retry, or ` +
`'gbrain sync --skip-failed' to acknowledge and move past them.`,
);
}
await engine.setConfig('sync.last_run', new Date().toISOString());
await engine.setConfig('sync.repo_path', dir);
if (writeSyncConfig) {
await engine.setConfig('sync.last_run', new Date().toISOString());
await engine.setConfig('sync.repo_path', dir);
}
}

return { imported, skipped, errors, chunksCreated, failures };
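For reference, a sketch of the two call shapes the new options enable (values illustrative; `engine` and `headCommit` are assumed to exist in the caller's scope):

```ts
// CLI-style call: default source, legacy global sync.* config keys still written.
await runImport(engine, ['/repo/notes'], { commit: headCommit });

// performFullSync-style call for a named source ('team-wiki' is hypothetical):
// pages route to that source, and the global sync.last_commit / sync.last_run /
// sync.repo_path keys are left untouched because the caller writes its own
// source-scoped anchors.
await runImport(engine, ['/repo/notes', '--no-embed'], {
  commit: headCommit,
  sourceId: 'team-wiki',
  writeSyncConfig: false,
});
```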
12 changes: 7 additions & 5 deletions src/commands/migrate-engine.ts
@@ -117,11 +117,13 @@ export async function runMigrateEngine(sourceEngine: BrainEngine, args: string[]

if (targetStats.page_count > 0 && opts.force) {
console.log('--force: wiping target brain...');
// Delete all pages (cascades to chunks, links, tags, etc.)
const pages = await targetEngine.listPages({ limit: 100000 });
for (const p of pages) {
await targetEngine.deletePage(p.slug);
}
// v0.18.0+ multi-source: deletePage(slug) is now source-scoped (defaults
// to 'default'), so per-page iteration would skip non-default-source
// rows. migrate-engine --force is a destructive wipe across the entire
// brain — all sources, all pages — so we issue a raw DELETE that matches
// the original semantics. Cascades through content_chunks / page_links /
// tags / timeline_entries / page_versions via existing FKs.
await targetEngine.executeRaw('DELETE FROM pages');
}

// Load or create manifest for resume
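For contrast, a sketch of the before/after wipe semantics described in the comment above (annotated from the removed and added lines; the cascade behavior is as stated in the comment, not re-verified here):

```ts
// Removed: per-page loop. With deletePage(slug) now defaulting to the 'default'
// source, this would delete only default-source rows and leave pages owned by
// other sources behind, so it is no longer a complete wipe.
const pages = await targetEngine.listPages({ limit: 100000 });
for (const p of pages) {
  await targetEngine.deletePage(p.slug);
}

// Added: one raw DELETE that removes every page row regardless of source;
// chunks, links, tags, timeline entries, and versions go with it via FK cascades.
await targetEngine.executeRaw('DELETE FROM pages');
```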
20 changes: 15 additions & 5 deletions src/commands/reconcile-links.ts
@@ -89,8 +89,16 @@ export async function runReconcileLinks(
// Fetch pages one at a time via getPage (no bulk read helper exists yet).
// On a 47K-page brain this is the slow path; a v0.20.x follow-up can add
// getPagesBatch. For the typical 2K–5K markdown count it's fine.
// v0.18.0+ multi-source: source-scope getPage so reconcile picks up the
// intended-source row when a slug exists in both `default` and `<source>`.
// The link edges below propagate the same sourceId (Data R1 MED 1: the opt
// was declared on ReconcileLinksOpts but ignored end-to-end).
const getPageOpts = opts.sourceId ? { sourceId: opts.sourceId } : undefined;
const linkOpts = opts.sourceId
? { fromSourceId: opts.sourceId, toSourceId: opts.sourceId, originSourceId: opts.sourceId }
: undefined;
for (const mdSlug of mdSlugs) {
const page = await engine.getPage(mdSlug);
const page = await engine.getPage(mdSlug, getPageOpts);
if (!page) {
progress.tick(1, mdSlug);
continue;
@@ -113,10 +121,12 @@
const ctx = ref.line ? `cited at ${ref.path}:${ref.line}` : ref.path;
edgesAttempted++;
try {
// Forward: guide documents code. addLink's inner SELECT drops
// silently if codeSlug isn't a page yet (benign — counted below).
await engine.addLink(mdSlug, codeSlug, ctx, 'documents', 'markdown', mdSlug, 'compiled_truth');
await engine.addLink(codeSlug, mdSlug, ref.path, 'documented_by', 'markdown', mdSlug, 'compiled_truth');
// Forward: guide documents code. addLink's inner JOIN drops silently
// if codeSlug isn't a page yet (benign — counted below). Source-
// qualified per opts.sourceId; same-source assumption mirrors the
// import-file.ts:303 doc↔impl auto-link.
await engine.addLink(mdSlug, codeSlug, ctx, 'documents', 'markdown', mdSlug, 'compiled_truth', linkOpts);
await engine.addLink(codeSlug, mdSlug, ref.path, 'documented_by', 'markdown', mdSlug, 'compiled_truth', linkOpts);
} catch (e: unknown) {
// Per-link errors don't abort the batch. Track them for the summary.
const msg = e instanceof Error ? e.message : String(e);
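A sketch of the option object threaded into both directions of the doc↔code edge (the source id is hypothetical, and the per-field meanings are inferred from the names rather than from documented API):

```ts
// All three fields carry the same source id because reconcile only creates
// edges between pages that live in the same repository's source.
const linkOpts = {
  fromSourceId: 'team-wiki',   // source of the page on the "from" side
  toSourceId: 'team-wiki',     // source of the page on the "to" side
  originSourceId: 'team-wiki', // source recorded as the edge's origin (inferred)
};
await engine.addLink(mdSlug, codeSlug, ctx, 'documents', 'markdown', mdSlug, 'compiled_truth', linkOpts);
await engine.addLink(codeSlug, mdSlug, ref.path, 'documented_by', 'markdown', mdSlug, 'compiled_truth', linkOpts);
```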
1 change: 1 addition & 0 deletions src/commands/reindex-code.ts
@@ -200,6 +200,7 @@ export async function runReindexCode(
const result = await importCodeFile(engine, relPath, row.compiled_truth, {
noEmbed: opts.noEmbed,
force: opts.force,
sourceId: opts.sourceId,
});
if (result.status === 'imported') reindexed++;
else if (result.status === 'skipped') skipped++;
65 changes: 54 additions & 11 deletions src/commands/sync.ts
@@ -483,12 +483,16 @@ async function performSyncInner(engine: BrainEngine, opts: SyncOpts): Promise<Sy
// strategy=markdown) deletes the actual code-slug page, not a ghost
// markdown-slug that never existed.
const unsyncableModified = manifest.modified.filter(p => !isSyncable(p, syncOpts));
// v0.18.0+ multi-source: scope getPage + deletePage to opts.sourceId so
// unsyncable cleanup in source A doesn't accidentally sweep same-slug
// pages in sources B/C/D.
const pageOpts = opts.sourceId ? { sourceId: opts.sourceId } : undefined;
for (const path of unsyncableModified) {
const slug = resolveSlugForPath(path);
try {
const existing = await engine.getPage(slug);
const existing = await engine.getPage(slug, pageOpts);
if (existing) {
await engine.deletePage(slug);
await engine.deletePage(slug, pageOpts);
console.log(` Deleted un-syncable page: ${slug}`);
}
} catch { /* ignore */ }
@@ -550,11 +554,14 @@ async function performSyncInner(engine: BrainEngine, opts: SyncOpts): Promise<Sy

// Process deletes first (prevents slug conflicts). SP-5: resolveSlugForPath
// dispatches to the right slug shape so code file deletes hit the real page.
// v0.18.0+ multi-source: scope deletePage so we only delete the source-A
// row, not every same-slug row across all sources.
const deleteOpts = opts.sourceId ? { sourceId: opts.sourceId } : undefined;
if (filtered.deleted.length > 0) {
progress.start('sync.deletes', filtered.deleted.length);
for (const path of filtered.deleted) {
const slug = resolveSlugForPath(path);
await engine.deletePage(slug);
await engine.deletePage(slug, deleteOpts);
pagesAffected.push(slug);
progress.tick(1, slug);
}
@@ -567,18 +574,22 @@ async function performSyncInner(engine: BrainEngine, opts: SyncOpts): Promise<Sy
// all resolve to the right slug shape for each side.
if (filtered.renamed.length > 0) {
progress.start('sync.renames', filtered.renamed.length);
// v0.18.0+ multi-source: scope updateSlug so the rename only touches the
// source-A row, not every same-slug row across sources (which would
// either sweep them all OR violate (source_id, slug) UNIQUE).
const renameOpts = opts.sourceId ? { sourceId: opts.sourceId } : undefined;
for (const { from, to } of filtered.renamed) {
const oldSlug = resolveSlugForPath(from);
const newSlug = resolveSlugForPath(to);
try {
await engine.updateSlug(oldSlug, newSlug);
await engine.updateSlug(oldSlug, newSlug, renameOpts);
} catch {
// Slug doesn't exist or collision, treat as add
}
// Reimport at new path (picks up content changes)
const filePath = join(repoPath, to);
if (existsSync(filePath)) {
const result = await importFile(engine, filePath, to, { noEmbed });
const result = await importFile(engine, filePath, to, { noEmbed, sourceId: opts.sourceId });
if (result.status === 'imported') chunksCreated += result.chunks;
}
pagesAffected.push(newSlug);
@@ -633,7 +644,12 @@ async function performSyncInner(engine: BrainEngine, opts: SyncOpts): Promise<Sy
return;
}
try {
const result = await importFile(eng, filePath, path, { noEmbed });
// v0.18.0+ multi-source: thread `opts.sourceId` so per-page tx writes
// (putPage / getTags / addTag / removeTag / deleteChunks / upsertChunks
// / addLink) target (sourceId, slug). Pre-fix, the schema DEFAULT
// 'default' was applied even for non-default sources, fabricating
// duplicate rows that crashed bare-slug subqueries with Postgres 21000.
const result = await importFile(eng, filePath, path, { noEmbed, sourceId: opts.sourceId });
if (result.status === 'imported') {
chunksCreated += result.chunks;
pagesAffected.push(result.slug);
@@ -803,19 +819,32 @@ async function performSyncInner(engine: BrainEngine, opts: SyncOpts): Promise<Sy
summary: `Sync: +${filtered.added.length} ~${filtered.modified.length} -${filtered.deleted.length} R${filtered.renamed.length}, ${chunksCreated} chunks, ${elapsed}ms`,
});

// Auto-extract links + timeline (always, extraction is cheap CPU)
// Auto-extract links + timeline (always, extraction is cheap CPU).
// Thread opts.sourceId so the extract phase reconciles edges + timeline
// entries against the right source. Pre-fix (Data R1 HIGH 1), this phase
// bypassed sourceId entirely and the bare-slug subquery in addTimelineEntry
// (Data R1 HIGH 2) crashed with 21000 in multi-source brains.
const extractOpts = opts.sourceId ? { sourceId: opts.sourceId } : undefined;
if (!opts.noExtract && pagesAffected.length > 0) {
try {
const { extractLinksForSlugs, extractTimelineForSlugs } = await import('./extract.ts');
const linksCreated = await extractLinksForSlugs(engine, repoPath, pagesAffected);
const timelineCreated = await extractTimelineForSlugs(engine, repoPath, pagesAffected);
const linksCreated = await extractLinksForSlugs(engine, repoPath, pagesAffected, extractOpts);
const timelineCreated = await extractTimelineForSlugs(engine, repoPath, pagesAffected, extractOpts);
if (linksCreated > 0 || timelineCreated > 0) {
console.log(` Extracted: ${linksCreated} links, ${timelineCreated} timeline entries`);
}
} catch { /* extraction is best-effort */ }
}

// Auto-embed (skip for large syncs — embedding calls OpenAI)
// Auto-embed (skip for large syncs — embedding calls OpenAI).
// TODO(multi-source): runEmbed → src/commands/embed.ts:175 + :418 call
// upsertChunks defaulting to source='default'. For non-default-source syncs
// the page row lives at (sourceId, slug) so this fails with "Page not found"
// OR (when a same-slug 'default' row coexists) updates the wrong source's
// chunks. Data R1 MED 2 — deferred to a follow-up PR; threading sourceId
// through embed.ts is a larger refactor than this fix's scope. The current
// try/catch swallows the failure as best-effort, so the sync result still
// reports `embedded: 0` for the right reason.
let embedded = 0;
if (!noEmbed && pagesAffected.length > 0 && pagesAffected.length <= 100) {
try {
@@ -889,7 +918,21 @@ async function performFullSync(
const importArgs = [repoPath];
if (opts.noEmbed) importArgs.push('--no-embed');
if (fullConcurrency > 1) importArgs.push('--workers', String(fullConcurrency));
const result = await runImport(engine, importArgs, { commit: headCommit });
// v0.30.x follow-up to PR #707:
// - Thread sourceId through runImport's opts so performFullSync routes
// pages to the named source (the incremental sync path in this same
// file already does this).
// - Pass writeSyncConfig=false so runImport doesn't overwrite global
// `sync.last_commit` / `sync.last_run` / `sync.repo_path` keys with this
// source's values. performFullSync owns its own source-scoped anchors
// via writeSyncAnchor (below); without this gate, a `--source X --full`
// run leaves the global keys pointing at X, so a later bare
// `gbrain sync` reads X's repo path as the default-source repo.
const result = await runImport(engine, importArgs, {
commit: headCommit,
sourceId: opts.sourceId,
writeSyncConfig: !opts.sourceId,
});

// Bug 9 — gate the full-sync bookmark on success. runImport already
// writes its own sync.last_commit conditionally (import.ts), but
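Across performSyncInner the change follows one pattern: build a per-phase option object only when a sourceId is named, so single-source brains keep the exact legacy call shape (`undefined` opts). A condensed sketch, with the diff's separately named objects (pageOpts, deleteOpts, renameOpts, extractOpts) collapsed into one and the surrounding control flow omitted:

```ts
// One scope object, built once, reused by every phase that touches pages.
const scope = opts.sourceId ? { sourceId: opts.sourceId } : undefined;

await engine.deletePage(slug, scope);                                            // deletes
await engine.updateSlug(oldSlug, newSlug, scope);                                // renames
await importFile(engine, filePath, path, { noEmbed, sourceId: opts.sourceId });  // adds/modifies
await extractLinksForSlugs(engine, repoPath, pagesAffected, scope);              // post-sync extract
```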