diff --git a/src/commands/autopilot.ts b/src/commands/autopilot.ts index 2188d64f5..d967ea26d 100644 --- a/src/commands/autopilot.ts +++ b/src/commands/autopilot.ts @@ -427,6 +427,12 @@ async function installDaemon(engine: BrainEngine, args: string[]) { } function installLaunchd(wrapperPath: string, home: string, repoPath: string) { + // StartInterval (run every N seconds) instead of KeepAlive (restart + // immediately on exit). Prior behavior was a hot loop: each cycle exited + // in seconds and launchd respawned instantly, producing ~4200 cycles/day + // and ~400-600 GB/day of egress when the brain was fully embedded. + // 300s = 5 min cycles = 288/day max, plenty for any reasonable ingest + // cadence. Tunable per-user by editing the plist directly. const plist = ` @@ -436,7 +442,7 @@ function installLaunchd(wrapperPath: string, home: string, repoPath: string) { ${escapeXml(wrapperPath)} RunAtLoad - KeepAlive + StartInterval300 StandardOutPath${escapeXml(home)}/.gbrain/autopilot.log StandardErrorPath${escapeXml(home)}/.gbrain/autopilot.err diff --git a/src/commands/embed.ts b/src/commands/embed.ts index 0152ccf85..0ea968a7c 100644 --- a/src/commands/embed.ts +++ b/src/commands/embed.ts @@ -44,9 +44,21 @@ export interface EmbedResult { skipped: number; /** Chunks that would be embedded if not for dryRun (0 in non-dryRun). */ would_embed: number; - /** Total chunks considered across all processed pages. */ + /** + * Total chunks considered across all processed pages. + * + * In `--stale` mode, fully-embedded pages are pre-filtered out and never + * visited, so this count reflects only chunks on pages that had at least + * one stale chunk — not the entire brain. + */ total_chunks: number; - /** Number of pages processed (whether or not they had stale chunks). */ + /** + * Number of pages processed. + * + * In `--stale` mode, equals the number of pages with at least one stale + * chunk (pre-filtered via listStalePageSlugs). In `--all` mode, equals + * every page in the brain. + */ pages_processed: number; /** True if this run was a dry-run. */ dryRun: boolean; @@ -220,7 +232,30 @@ async function embedAll( result: EmbedResult, onProgress?: (done: number, total: number, embedded: number) => void, ) { - const pages = await engine.listPages({ limit: 100000 }); + // Stale-only path: ask the DB which pages actually have unembedded chunks + // before pulling the full page list. In steady state (brain fully + // embedded), this query returns 0 rows and we exit without a single + // getChunks round-trip — vs. the previous behavior of calling getChunks + // on every page just to filter to zero in memory. Cuts autopilot egress + // from ~100MB/cycle to ~few KB/cycle. + let pages: Awaited>; + if (staleOnly) { + const staleSlugs = await engine.listStalePageSlugs(); + if (staleSlugs.length === 0) { + onProgress?.(0, 0, 0); + if (dryRun) { + console.log(`[dry-run] Would embed 0 chunks across 0 pages`); + } else { + console.log(`Embedded 0 chunks across 0 pages`); + } + return; + } + const staleSet = new Set(staleSlugs); + const allPages = await engine.listPages({ limit: 100000 }); + pages = allPages.filter(p => staleSet.has(p.slug)); + } else { + pages = await engine.listPages({ limit: 100000 }); + } let processed = 0; // Concurrency limit for parallel page embedding. diff --git a/src/core/engine.ts b/src/core/engine.ts index 542324676..678dcdacf 100644 --- a/src/core/engine.ts +++ b/src/core/engine.ts @@ -133,6 +133,13 @@ export interface BrainEngine { upsertChunks(slug: string, chunks: ChunkInput[]): Promise; getChunks(slug: string): Promise; deleteChunks(slug: string): Promise; + /** + * Return page slugs that have at least one chunk with embedding IS NULL. + * Used by `embed --stale` to skip the all-pages scan when the brain is + * already fully embedded (avoids 682+ wasted getChunks round-trips per + * autopilot cycle). + */ + listStalePageSlugs(): Promise; // Links /** diff --git a/src/core/pglite-engine.ts b/src/core/pglite-engine.ts index d0c084dd7..3cc75821b 100644 --- a/src/core/pglite-engine.ts +++ b/src/core/pglite-engine.ts @@ -343,8 +343,12 @@ export class PGLiteEngine implements BrainEngine { } async getChunks(slug: string): Promise { + // Explicit projection: omit embedding column. See postgres-engine.ts + // getChunks for rationale. const { rows } = await this.db.query( - `SELECT cc.* FROM content_chunks cc + `SELECT cc.id, cc.page_id, cc.chunk_index, cc.chunk_text, cc.chunk_source, + cc.model, cc.token_count, cc.embedded_at + FROM content_chunks cc JOIN pages p ON p.id = cc.page_id WHERE p.slug = $1 ORDER BY cc.chunk_index`, @@ -353,6 +357,18 @@ export class PGLiteEngine implements BrainEngine { return (rows as Record[]).map(r => rowToChunk(r)); } + async listStalePageSlugs(): Promise { + const { rows } = await this.db.query( + `SELECT DISTINCT p.slug + FROM pages p + JOIN content_chunks cc ON cc.page_id = p.id + WHERE cc.embedding IS NULL + ORDER BY p.slug`, + [] + ); + return (rows as { slug: string }[]).map(r => r.slug); + } + async deleteChunks(slug: string): Promise { await this.db.query( `DELETE FROM content_chunks diff --git a/src/core/postgres-engine.ts b/src/core/postgres-engine.ts index 0deebeca6..c5ef589d2 100644 --- a/src/core/postgres-engine.ts +++ b/src/core/postgres-engine.ts @@ -373,8 +373,15 @@ export class PostgresEngine implements BrainEngine { async getChunks(slug: string): Promise { const sql = this.sql; + // Explicit projection: omit the 1536-dim embedding column. rowToChunk's + // default (includeEmbedding=false) discards it anyway, but pulling it + // across the wire is the bulk of egress on hot-loop callers (embed + // --stale, autopilot). Callers that need the vector use + // getChunksWithEmbeddings() instead. const rows = await sql` - SELECT cc.* FROM content_chunks cc + SELECT cc.id, cc.page_id, cc.chunk_index, cc.chunk_text, cc.chunk_source, + cc.model, cc.token_count, cc.embedded_at + FROM content_chunks cc JOIN pages p ON p.id = cc.page_id WHERE p.slug = ${slug} ORDER BY cc.chunk_index @@ -382,6 +389,18 @@ export class PostgresEngine implements BrainEngine { return rows.map((r) => rowToChunk(r as Record)); } + async listStalePageSlugs(): Promise { + const sql = this.sql; + const rows = await sql` + SELECT DISTINCT p.slug + FROM pages p + JOIN content_chunks cc ON cc.page_id = p.id + WHERE cc.embedding IS NULL + ORDER BY p.slug + `; + return rows.map((r) => (r as { slug: string }).slug); + } + async deleteChunks(slug: string): Promise { const sql = this.sql; await sql` diff --git a/test/embed.test.ts b/test/embed.test.ts index 0c9992307..33b100fe2 100644 --- a/test/embed.test.ts +++ b/test/embed.test.ts @@ -25,6 +25,18 @@ mock.module('../src/core/embedding.ts', () => ({ // Import AFTER mocking. const { runEmbed } = await import('../src/commands/embed.ts'); +// Derive the slugs that have at least one stale (embedded_at: null/falsy) +// chunk from a test's chunksBySlug map. Test fixtures inject this as the +// listStalePageSlugs override so the embedAll early-exit path matches what +// the test data actually contains. +function deriveStaleSlugs(chunksBySlug: Map): string[] { + const out: string[] = []; + for (const [slug, chunks] of chunksBySlug) { + if (chunks.some(c => !c.embedded_at)) out.push(slug); + } + return out.sort(); +} + // Proxy-based mock engine that matches test/import-file.test.ts pattern. function mockEngine(overrides: Partial> = {}): BrainEngine { const calls: { method: string; args: any[] }[] = []; @@ -115,6 +127,7 @@ describe('runEmbed --all (parallel)', () => { const engine = mockEngine({ listPages: async () => pages, getChunks: async (slug: string) => chunksBySlug.get(slug) || [], + listStalePageSlugs: async () => deriveStaleSlugs(chunksBySlug), upsertChunks: async () => {}, }); @@ -150,6 +163,7 @@ describe('runEmbedCore --dry-run never calls the embedding model', () => { const engine = mockEngine({ listPages: async () => pages, getChunks: async (slug: string) => chunksBySlug.get(slug) || [], + listStalePageSlugs: async () => deriveStaleSlugs(chunksBySlug), upsertChunks: async (slug: string) => { upserts.push(slug); }, }); @@ -189,6 +203,7 @@ describe('runEmbedCore --dry-run never calls the embedding model', () => { const engine = mockEngine({ listPages: async () => pages, getChunks: async (slug: string) => chunksBySlug.get(slug) || [], + listStalePageSlugs: async () => deriveStaleSlugs(chunksBySlug), upsertChunks: async () => {}, }); @@ -197,9 +212,12 @@ describe('runEmbedCore --dry-run never calls the embedding model', () => { expect(totalEmbedCalls).toBe(0); expect(result.dryRun).toBe(true); expect(result.would_embed).toBe(3); // 1 from 'partial' + 2 from 'all-stale' - expect(result.skipped).toBe(3); // 2 from 'fresh' + 1 from 'partial' - expect(result.total_chunks).toBe(6); - expect(result.pages_processed).toBe(3); + // With the stale-prefilter optimization, fully-embedded pages are + // skipped entirely (not visited), so their chunks no longer contribute + // to skipped/total_chunks/pages_processed. 'fresh' is excluded. + expect(result.skipped).toBe(1); // 1 already-embedded chunk from 'partial' + expect(result.total_chunks).toBe(4); // 2 from 'partial' + 2 from 'all-stale' + expect(result.pages_processed).toBe(2); // 'partial' + 'all-stale' }); test('dry-run --slugs on a single page counts stale chunks, no API calls', async () => { @@ -240,6 +258,7 @@ describe('runEmbedCore --dry-run never calls the embedding model', () => { const engine = mockEngine({ listPages: async () => pages, getChunks: async (slug: string) => chunksBySlug.get(slug) || [], + listStalePageSlugs: async () => deriveStaleSlugs(chunksBySlug), upsertChunks: async () => {}, });