Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 32 additions & 4 deletions src/commands/import.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,35 @@ export interface RunImportResult {
failures: Array<{ path: string; error: string }>;
}

export async function runImport(engine: BrainEngine, args: string[], opts: { commit?: string } = {}): Promise<RunImportResult> {
/**
 * Persist one sync anchor written at the end of an import run.
 *
 * When `sourceId` is provided (anything other than `undefined`, including an
 * empty string) the anchor is stored on that row of the `sources` table:
 *   - `'last_commit'` updates `last_commit` and stamps `last_sync_at = now()`.
 *   - `'repo_path'` updates `local_path`.
 * When `sourceId` is `undefined` the anchor is stored as the config key
 * `sync.<which>` via `engine.setConfig`.
 *
 * @param engine   Engine used for the raw UPDATE or the config write.
 * @param sourceId Source row to update, or `undefined` for the config fallback.
 * @param which    Which anchor is being written.
 * @param value    New anchor value (commit hash or repository path).
 */
async function writeImportSyncAnchor(
  engine: BrainEngine,
  sourceId: string | undefined,
  which: 'repo_path' | 'last_commit',
  value: string,
): Promise<void> {
  // No source: fall back to the config-backed anchor.
  if (sourceId === undefined) {
    await engine.setConfig(`sync.${which}`, value);
    return;
  }

  if (which === 'last_commit') {
    // Advancing the commit bookmark also records when the sync happened.
    await engine.executeRaw(
      `UPDATE sources SET last_commit = $1, last_sync_at = now() WHERE id = $2`,
      [value, sourceId],
    );
    return;
  }

  // Only 'repo_path' can reach this point, so the column name is fixed and
  // does not need to be interpolated into the SQL text.
  await engine.executeRaw(
    `UPDATE sources SET local_path = $1 WHERE id = $2`,
    [value, sourceId],
  );
}

export async function runImport(
engine: BrainEngine,
args: string[],
opts: { commit?: string; sourceId?: string } = {},
): Promise<RunImportResult> {
const noEmbed = args.includes('--no-embed');
const fresh = args.includes('--fresh');
const jsonOutput = args.includes('--json');
Expand Down Expand Up @@ -105,7 +133,7 @@ export async function runImport(engine: BrainEngine, args: string[], opts: { com
async function processFile(eng: BrainEngine, filePath: string) {
const relativePath = relative(dir, filePath);
try {
const result = await importFile(eng, filePath, relativePath, { noEmbed });
const result = await importFile(eng, filePath, relativePath, { noEmbed, sourceId: opts.sourceId });
if (result.status === 'imported') {
imported++;
chunksCreated += result.chunks;
Expand Down Expand Up @@ -270,7 +298,7 @@ export async function runImport(engine: BrainEngine, args: string[], opts: { com
recordSyncFailures(failures, gitHead);
}
if (failures.length === 0) {
await engine.setConfig('sync.last_commit', gitHead);
await writeImportSyncAnchor(engine, opts.sourceId, 'last_commit', gitHead);
} else {
console.error(
`\nImport completed with ${failures.length} failure(s). ` +
Expand All @@ -279,7 +307,7 @@ export async function runImport(engine: BrainEngine, args: string[], opts: { com
);
}
await engine.setConfig('sync.last_run', new Date().toISOString());
await engine.setConfig('sync.repo_path', dir);
await writeImportSyncAnchor(engine, opts.sourceId, 'repo_path', dir);
}

return { imported, skipped, errors, chunksCreated, failures };
Expand Down
23 changes: 12 additions & 11 deletions src/commands/sync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ async function readSyncAnchor(
sourceId: string | undefined,
which: 'repo_path' | 'last_commit',
): Promise<string | null> {
if (sourceId) {
if (sourceId !== undefined) {
const col = which === 'repo_path' ? 'local_path' : 'last_commit';
const rows = await engine.executeRaw<Record<string, string | null>>(
`SELECT ${col} AS value FROM sources WHERE id = $1`,
Expand All @@ -223,7 +223,7 @@ async function writeSyncAnchor(
which: 'repo_path' | 'last_commit',
value: string,
): Promise<void> {
if (sourceId) {
if (sourceId !== undefined) {
const col = which === 'repo_path' ? 'local_path' : 'last_commit';
// last_sync_at bookmarked on every last_commit advance.
if (which === 'last_commit') {
Expand Down Expand Up @@ -259,7 +259,7 @@ async function readChunkerVersion(
engine: BrainEngine,
sourceId: string | undefined,
): Promise<string | null> {
if (!sourceId) return null;
if (sourceId === undefined) return null;
const rows = await engine.executeRaw<{ chunker_version: string | null }>(
`SELECT chunker_version FROM sources WHERE id = $1`,
[sourceId],
Expand All @@ -272,7 +272,7 @@ async function writeChunkerVersion(
sourceId: string | undefined,
version: string,
): Promise<void> {
if (!sourceId) return;
if (sourceId === undefined) return;
await engine.executeRaw(
`UPDATE sources SET chunker_version = $1 WHERE id = $2`,
[version, sourceId],
Expand Down Expand Up @@ -316,7 +316,7 @@ async function performSyncInner(engine: BrainEngine, opts: SyncOpts): Promise<Sy
// Resolve repo path
const repoPath = opts.repoPath || await readSyncAnchor(engine, opts.sourceId, 'repo_path');
if (!repoPath) {
const hint = opts.sourceId
const hint = opts.sourceId !== undefined
? `Source "${opts.sourceId}" has no local_path. Run: gbrain sources add ${opts.sourceId} --path <path>`
: `No repo path specified. Use --repo or run gbrain init with --repo first.`;
throw new Error(hint);
Expand Down Expand Up @@ -432,9 +432,10 @@ async function performSyncInner(engine: BrainEngine, opts: SyncOpts): Promise<Sy
for (const path of unsyncableModified) {
const slug = resolveSlugForPath(path);
try {
const existing = await engine.getPage(slug);
const sourceOpts = opts.sourceId !== undefined ? { sourceId: opts.sourceId } : undefined;
const existing = await engine.getPage(slug, sourceOpts);
if (existing) {
await engine.deletePage(slug);
await engine.deletePage(slug, sourceOpts);
console.log(` Deleted un-syncable page: ${slug}`);
}
} catch { /* ignore */ }
Expand Down Expand Up @@ -500,7 +501,7 @@ async function performSyncInner(engine: BrainEngine, opts: SyncOpts): Promise<Sy
progress.start('sync.deletes', filtered.deleted.length);
for (const path of filtered.deleted) {
const slug = resolveSlugForPath(path);
await engine.deletePage(slug);
await engine.deletePage(slug, opts.sourceId !== undefined ? { sourceId: opts.sourceId } : undefined);
pagesAffected.push(slug);
progress.tick(1, slug);
}
Expand All @@ -524,7 +525,7 @@ async function performSyncInner(engine: BrainEngine, opts: SyncOpts): Promise<Sy
// Reimport at new path (picks up content changes)
const filePath = join(repoPath, to);
if (existsSync(filePath)) {
const result = await importFile(engine, filePath, to, { noEmbed });
const result = await importFile(engine, filePath, to, { noEmbed, sourceId: opts.sourceId });
if (result.status === 'imported') chunksCreated += result.chunks;
}
pagesAffected.push(newSlug);
Expand Down Expand Up @@ -579,7 +580,7 @@ async function performSyncInner(engine: BrainEngine, opts: SyncOpts): Promise<Sy
return;
}
try {
const result = await importFile(eng, filePath, path, { noEmbed });
const result = await importFile(eng, filePath, path, { noEmbed, sourceId: opts.sourceId });
if (result.status === 'imported') {
chunksCreated += result.chunks;
pagesAffected.push(result.slug);
Expand Down Expand Up @@ -835,7 +836,7 @@ async function performFullSync(
const importArgs = [repoPath];
if (opts.noEmbed) importArgs.push('--no-embed');
if (fullConcurrency > 1) importArgs.push('--workers', String(fullConcurrency));
const result = await runImport(engine, importArgs, { commit: headCommit });
const result = await runImport(engine, importArgs, { commit: headCommit, sourceId: opts.sourceId });

// Bug 9 — gate the full-sync bookmark on success. runImport already
// writes its own sync.last_commit conditionally (import.ts), but
Expand Down
22 changes: 11 additions & 11 deletions src/core/engine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ export interface BrainEngine {
* by `restore_page` flow, and by operator diagnostics.
*/
getPage(slug: string, opts?: GetPageOpts): Promise<Page | null>;
putPage(slug: string, page: PageInput): Promise<Page>;
putPage(slug: string, page: PageInput, opts?: { sourceId?: string }): Promise<Page>;
/**
* Hard-delete a page row. Cascades to content_chunks, page_links,
* chunk_relations via existing FK ON DELETE CASCADE.
Expand All @@ -146,7 +146,7 @@ export interface BrainEngine {
* as the underlying primitive used by `purgeDeletedPages` and by callers
* that explicitly want hard-delete semantics (e.g. test setup teardown).
*/
deletePage(slug: string): Promise<void>;
deletePage(slug: string, opts?: { sourceId?: string }): Promise<void>;
/**
* v0.26.5 — set `deleted_at = now()` on a page. Returns the slug if a row
* was soft-deleted, null if no row matched (already soft-deleted OR not found).
Expand Down Expand Up @@ -185,8 +185,8 @@ export interface BrainEngine {
getEmbeddingsByChunkIds(ids: number[]): Promise<Map<number, Float32Array>>;

// Chunks
upsertChunks(slug: string, chunks: ChunkInput[]): Promise<void>;
getChunks(slug: string): Promise<Chunk[]>;
upsertChunks(slug: string, chunks: ChunkInput[], opts?: { sourceId?: string }): Promise<void>;
getChunks(slug: string, opts?: { sourceId?: string }): Promise<Chunk[]>;
/**
* Count chunks across the entire brain where embedded_at IS NULL.
* Pre-flight short-circuit for `embed --stale` so a 100%-embedded brain
Expand All @@ -202,7 +202,7 @@ export interface BrainEngine {
* Bounded by an internal LIMIT of 100000 to mirror listPages.
*/
listStaleChunks(): Promise<StaleChunkRow[]>;
deleteChunks(slug: string): Promise<void>;
deleteChunks(slug: string, opts?: { sourceId?: string }): Promise<void>;

// Links
/**
Expand Down Expand Up @@ -283,9 +283,9 @@ export interface BrainEngine {
findOrphanPages(): Promise<Array<{ slug: string; title: string; domain: string | null }>>;

// Tags
addTag(slug: string, tag: string): Promise<void>;
removeTag(slug: string, tag: string): Promise<void>;
getTags(slug: string): Promise<string[]>;
addTag(slug: string, tag: string, opts?: { sourceId?: string }): Promise<void>;
removeTag(slug: string, tag: string, opts?: { sourceId?: string }): Promise<void>;
getTags(slug: string, opts?: { sourceId?: string }): Promise<string[]>;

// Timeline
/**
Expand Down Expand Up @@ -319,9 +319,9 @@ export interface BrainEngine {
putDreamVerdict(filePath: string, contentHash: string, verdict: DreamVerdictInput): Promise<void>;

// Versions
createVersion(slug: string): Promise<PageVersion>;
getVersions(slug: string): Promise<PageVersion[]>;
revertToVersion(slug: string, versionId: number): Promise<void>;
createVersion(slug: string, opts?: { sourceId?: string }): Promise<PageVersion>;
getVersions(slug: string, opts?: { sourceId?: string }): Promise<PageVersion[]>;
revertToVersion(slug: string, versionId: number, opts?: { sourceId?: string }): Promise<void>;

// Stats + health
getStats(): Promise<BrainStats>;
Expand Down
53 changes: 33 additions & 20 deletions src/core/import-file.ts
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,17 @@ export interface ImportResult {
parsedPage?: ParsedPage;
}

/**
 * Options shared by the import entry points in this module
 * (`importFromContent`, `importFromFile`, `importCodeFile`).
 * Every field is optional; an empty object is a valid value.
 */
interface ImportOptions {
  /**
   * Source to scope the import to. When defined it is forwarded as
   * `{ sourceId }` to the engine calls made during the import; when
   * `undefined` no options object is passed.
   */
  sourceId?: string;
  /**
   * Set by the import command when `--no-embed` is given.
   * NOTE(review): presumably skips embedding generation — confirm in the importer body.
   */
  noEmbed?: boolean;
  /** In `importCodeFile`, re-import even when the stored content hash matches. */
  force?: boolean;
  /**
   * Accepted by `importFromFile`.
   * NOTE(review): exact effect is not visible here — confirm before relying on it.
   */
  inferFrontmatter?: boolean;
}

/**
 * Build the optional engine options object for a source.
 *
 * @param sourceId Source identifier, or `undefined` when none was supplied.
 * @returns `{ sourceId }` for any defined id (an empty string included),
 *          otherwise `undefined` so callers pass no options at all.
 */
function sourceOptions(sourceId: string | undefined): { sourceId: string } | undefined {
  if (sourceId === undefined) {
    return undefined;
  }
  return { sourceId };
}

const MAX_FILE_SIZE = 5_000_000; // 5MB

/**
Expand All @@ -185,7 +196,7 @@ export async function importFromContent(
engine: BrainEngine,
slug: string,
content: string,
opts: { noEmbed?: boolean } = {},
opts: ImportOptions = {},
): Promise<ImportResult> {
// Reject oversized payloads before any parsing, chunking, or embedding happens.
// Uses Buffer.byteLength to count UTF-8 bytes the same way disk size would,
Expand Down Expand Up @@ -223,7 +234,8 @@ export async function importFromContent(
tags: parsed.tags,
};

const existing = await engine.getPage(slug);
const sourceOpts = sourceOptions(opts.sourceId);
const existing = await engine.getPage(slug, sourceOpts);
if (existing?.content_hash === hash) {
return { slug, status: 'skipped', chunks: 0, parsedPage };
}
Expand Down Expand Up @@ -261,7 +273,7 @@ export async function importFromContent(

// Transaction wraps all DB writes
await engine.transaction(async (tx) => {
if (existing) await tx.createVersion(slug);
if (existing) await tx.createVersion(slug, sourceOpts);

await tx.putPage(slug, {
type: parsed.type,
Expand All @@ -270,23 +282,23 @@ export async function importFromContent(
timeline: parsed.timeline || '',
frontmatter: parsed.frontmatter,
content_hash: hash,
});
}, sourceOpts);

// Tag reconciliation: remove stale, add current
const existingTags = await tx.getTags(slug);
const existingTags = await tx.getTags(slug, sourceOpts);
const newTags = new Set(parsed.tags);
for (const old of existingTags) {
if (!newTags.has(old)) await tx.removeTag(slug, old);
if (!newTags.has(old)) await tx.removeTag(slug, old, sourceOpts);
}
for (const tag of parsed.tags) {
await tx.addTag(slug, tag);
await tx.addTag(slug, tag, sourceOpts);
}

if (chunks.length > 0) {
await tx.upsertChunks(slug, chunks);
await tx.upsertChunks(slug, chunks, sourceOpts);
} else {
// Content is empty — delete stale chunks so they don't ghost in search results
await tx.deleteChunks(slug);
await tx.deleteChunks(slug, sourceOpts);
}

// v0.19.0 E1 — doc↔impl linking: if this markdown page cites code paths
Expand Down Expand Up @@ -333,7 +345,7 @@ export async function importFromFile(
engine: BrainEngine,
filePath: string,
relativePath: string,
opts: { noEmbed?: boolean; inferFrontmatter?: boolean } = {},
opts: ImportOptions = {},
): Promise<ImportResult> {
// Defense-in-depth: reject symlinks before reading content.
const lstat = lstatSync(filePath);
Expand Down Expand Up @@ -398,7 +410,7 @@ export async function importCodeFile(
engine: BrainEngine,
relativePath: string,
content: string,
opts: { noEmbed?: boolean; force?: boolean } = {},
opts: ImportOptions = {},
): Promise<ImportResult> {
const slug = slugifyCodePath(relativePath);
const lang = detectCodeLanguage(relativePath) || 'unknown';
Expand All @@ -415,7 +427,8 @@ export async function importCodeFile(
.update(JSON.stringify({ title, type: 'code', content, lang, chunker_version: CHUNKER_VERSION }))
.digest('hex');

const existing = await engine.getPage(slug);
const sourceOpts = sourceOptions(opts.sourceId);
const existing = await engine.getPage(slug, sourceOpts);
if (!opts.force && existing?.content_hash === hash) {
return { slug, status: 'skipped', chunks: 0 };
}
Expand Down Expand Up @@ -453,7 +466,7 @@ export async function importCodeFile(
// OpenAI API. Order matters: our chunk_index is semantic (tree-sitter
// order), so a matching (chunk_index, text_hash) means a verbatim
// preserved symbol.
const existingChunks = existing ? await engine.getChunks(slug) : [];
const existingChunks = existing ? await engine.getChunks(slug, sourceOpts) : [];
const existingByKey = new Map<string, typeof existingChunks[number]>();
for (const ec of existingChunks) {
existingByKey.set(`${ec.chunk_index}:${ec.chunk_text}`, ec);
Expand Down Expand Up @@ -488,7 +501,7 @@ export async function importCodeFile(

// Store
await engine.transaction(async (tx) => {
if (existing) await tx.createVersion(slug);
if (existing) await tx.createVersion(slug, sourceOpts);

await tx.putPage(slug, {
type: 'code' as PageType,
Expand All @@ -498,15 +511,15 @@ export async function importCodeFile(
timeline: '',
frontmatter: { language: lang, file: relativePath },
content_hash: hash,
});
}, sourceOpts);

await tx.addTag(slug, 'code');
await tx.addTag(slug, lang);
await tx.addTag(slug, 'code', sourceOpts);
await tx.addTag(slug, lang, sourceOpts);

if (chunks.length > 0) {
await tx.upsertChunks(slug, chunks);
await tx.upsertChunks(slug, chunks, sourceOpts);
} else {
await tx.deleteChunks(slug);
await tx.deleteChunks(slug, sourceOpts);
}
});

Expand All @@ -517,7 +530,7 @@ export async function importCodeFile(
// chunk IDs are stable.
if (extractedEdges.length > 0 && chunks.length > 0) {
try {
const persistedChunks = await engine.getChunks(slug);
const persistedChunks = await engine.getChunks(slug, sourceOpts);
const byIndex = new Map<number, { id?: number; symbol_name_qualified?: string | null; start_line?: number | null; end_line?: number | null }>();
for (const pc of persistedChunks) {
byIndex.set(pc.chunk_index, pc);
Expand Down
Loading