Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion src/commands/autopilot.ts
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,12 @@ async function installDaemon(engine: BrainEngine, args: string[]) {
}

function installLaunchd(wrapperPath: string, home: string, repoPath: string) {
// StartInterval (run every N seconds) instead of KeepAlive (restart
// immediately on exit). Prior behavior was a hot loop: each cycle exited
// in seconds and launchd respawned instantly, producing ~4200 cycles/day
// and ~400-600 GB/day of egress when the brain was fully embedded.
// 300s = 5 min cycles = 288/day max, plenty for any reasonable ingest
// cadence. Tunable per-user by editing the plist directly.
const plist = `<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE plist PUBLIC "-//Apple//DTD PLIST 1.0//EN" "http://www.apple.com/DTDs/PropertyList-1.0.dtd">
<plist version="1.0">
Expand All @@ -436,7 +442,7 @@ function installLaunchd(wrapperPath: string, home: string, repoPath: string) {
<string>${escapeXml(wrapperPath)}</string>
</array>
<key>RunAtLoad</key><true/>
<key>KeepAlive</key><true/>
<key>StartInterval</key><integer>300</integer>
<key>StandardOutPath</key><string>${escapeXml(home)}/.gbrain/autopilot.log</string>
<key>StandardErrorPath</key><string>${escapeXml(home)}/.gbrain/autopilot.err</string>
</dict>
Expand Down
41 changes: 38 additions & 3 deletions src/commands/embed.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,21 @@ export interface EmbedResult {
skipped: number;
/** Chunks that would be embedded if not for dryRun (0 in non-dryRun). */
would_embed: number;
/** Total chunks considered across all processed pages. */
/**
* Total chunks considered across all processed pages.
*
* In `--stale` mode, fully-embedded pages are pre-filtered out and never
* visited, so this count reflects only chunks on pages that had at least
* one stale chunk — not the entire brain.
*/
total_chunks: number;
/** Number of pages processed (whether or not they had stale chunks). */
/**
* Number of pages processed.
*
* In `--stale` mode, equals the number of pages with at least one stale
* chunk (pre-filtered via listStalePageSlugs). In `--all` mode, equals
* every page in the brain.
*/
pages_processed: number;
/** True if this run was a dry-run. */
dryRun: boolean;
Expand Down Expand Up @@ -220,7 +232,30 @@ async function embedAll(
result: EmbedResult,
onProgress?: (done: number, total: number, embedded: number) => void,
) {
const pages = await engine.listPages({ limit: 100000 });
// Stale-only path: ask the DB which pages actually have unembedded chunks
// before pulling the full page list. In steady state (brain fully
// embedded), this query returns 0 rows and we exit without a single
// getChunks round-trip — vs. the previous behavior of calling getChunks
// on every page just to filter to zero in memory. Cuts autopilot egress
// from ~100MB/cycle to ~few KB/cycle.
let pages: Awaited<ReturnType<BrainEngine['listPages']>>;
if (staleOnly) {
const staleSlugs = await engine.listStalePageSlugs();
if (staleSlugs.length === 0) {
onProgress?.(0, 0, 0);
if (dryRun) {
console.log(`[dry-run] Would embed 0 chunks across 0 pages`);
} else {
console.log(`Embedded 0 chunks across 0 pages`);
}
return;
}
const staleSet = new Set(staleSlugs);
const allPages = await engine.listPages({ limit: 100000 });
pages = allPages.filter(p => staleSet.has(p.slug));
} else {
pages = await engine.listPages({ limit: 100000 });
}
let processed = 0;

// Concurrency limit for parallel page embedding.
Expand Down
7 changes: 7 additions & 0 deletions src/core/engine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,13 @@ export interface BrainEngine {
upsertChunks(slug: string, chunks: ChunkInput[]): Promise<void>;
getChunks(slug: string): Promise<Chunk[]>;
deleteChunks(slug: string): Promise<void>;
/**
* Return page slugs that have at least one chunk with embedding IS NULL.
* Used by `embed --stale` to skip the all-pages scan when the brain is
* already fully embedded (avoids 682+ wasted getChunks round-trips per
* autopilot cycle).
*/
listStalePageSlugs(): Promise<string[]>;

// Links
/**
Expand Down
18 changes: 17 additions & 1 deletion src/core/pglite-engine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -343,8 +343,12 @@ export class PGLiteEngine implements BrainEngine {
}

async getChunks(slug: string): Promise<Chunk[]> {
// Explicit projection: omit embedding column. See postgres-engine.ts
// getChunks for rationale.
const { rows } = await this.db.query(
`SELECT cc.* FROM content_chunks cc
`SELECT cc.id, cc.page_id, cc.chunk_index, cc.chunk_text, cc.chunk_source,
cc.model, cc.token_count, cc.embedded_at
FROM content_chunks cc
JOIN pages p ON p.id = cc.page_id
WHERE p.slug = $1
ORDER BY cc.chunk_index`,
Expand All @@ -353,6 +357,18 @@ export class PGLiteEngine implements BrainEngine {
return (rows as Record<string, unknown>[]).map(r => rowToChunk(r));
}

async listStalePageSlugs(): Promise<string[]> {
const { rows } = await this.db.query(
`SELECT DISTINCT p.slug
FROM pages p
JOIN content_chunks cc ON cc.page_id = p.id
WHERE cc.embedding IS NULL
ORDER BY p.slug`,
[]
);
return (rows as { slug: string }[]).map(r => r.slug);
}

async deleteChunks(slug: string): Promise<void> {
await this.db.query(
`DELETE FROM content_chunks
Expand Down
21 changes: 20 additions & 1 deletion src/core/postgres-engine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -373,15 +373,34 @@ export class PostgresEngine implements BrainEngine {

async getChunks(slug: string): Promise<Chunk[]> {
const sql = this.sql;
// Explicit projection: omit the 1536-dim embedding column. rowToChunk's
// default (includeEmbedding=false) discards it anyway, but pulling it
// across the wire is the bulk of egress on hot-loop callers (embed
// --stale, autopilot). Callers that need the vector use
// getChunksWithEmbeddings() instead.
const rows = await sql`
SELECT cc.* FROM content_chunks cc
SELECT cc.id, cc.page_id, cc.chunk_index, cc.chunk_text, cc.chunk_source,
cc.model, cc.token_count, cc.embedded_at
FROM content_chunks cc
JOIN pages p ON p.id = cc.page_id
WHERE p.slug = ${slug}
ORDER BY cc.chunk_index
`;
return rows.map((r) => rowToChunk(r as Record<string, unknown>));
}

async listStalePageSlugs(): Promise<string[]> {
const sql = this.sql;
const rows = await sql`
SELECT DISTINCT p.slug
FROM pages p
JOIN content_chunks cc ON cc.page_id = p.id
WHERE cc.embedding IS NULL
ORDER BY p.slug
`;
return rows.map((r) => (r as { slug: string }).slug);
}

async deleteChunks(slug: string): Promise<void> {
const sql = this.sql;
await sql`
Expand Down
25 changes: 22 additions & 3 deletions test/embed.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,18 @@ mock.module('../src/core/embedding.ts', () => ({
// Import AFTER mocking.
const { runEmbed } = await import('../src/commands/embed.ts');

// Derive the slugs that have at least one stale (embedded_at: null/falsy)
// chunk from a test's chunksBySlug map. Test fixtures inject this as the
// listStalePageSlugs override so the embedAll early-exit path matches what
// the test data actually contains.
function deriveStaleSlugs(chunksBySlug: Map<string, any[]>): string[] {
const out: string[] = [];
for (const [slug, chunks] of chunksBySlug) {
if (chunks.some(c => !c.embedded_at)) out.push(slug);
}
return out.sort();
}

// Proxy-based mock engine that matches test/import-file.test.ts pattern.
function mockEngine(overrides: Partial<Record<string, any>> = {}): BrainEngine {
const calls: { method: string; args: any[] }[] = [];
Expand Down Expand Up @@ -115,6 +127,7 @@ describe('runEmbed --all (parallel)', () => {
const engine = mockEngine({
listPages: async () => pages,
getChunks: async (slug: string) => chunksBySlug.get(slug) || [],
listStalePageSlugs: async () => deriveStaleSlugs(chunksBySlug),
upsertChunks: async () => {},
});

Expand Down Expand Up @@ -150,6 +163,7 @@ describe('runEmbedCore --dry-run never calls the embedding model', () => {
const engine = mockEngine({
listPages: async () => pages,
getChunks: async (slug: string) => chunksBySlug.get(slug) || [],
listStalePageSlugs: async () => deriveStaleSlugs(chunksBySlug),
upsertChunks: async (slug: string) => { upserts.push(slug); },
});

Expand Down Expand Up @@ -189,6 +203,7 @@ describe('runEmbedCore --dry-run never calls the embedding model', () => {
const engine = mockEngine({
listPages: async () => pages,
getChunks: async (slug: string) => chunksBySlug.get(slug) || [],
listStalePageSlugs: async () => deriveStaleSlugs(chunksBySlug),
upsertChunks: async () => {},
});

Expand All @@ -197,9 +212,12 @@ describe('runEmbedCore --dry-run never calls the embedding model', () => {
expect(totalEmbedCalls).toBe(0);
expect(result.dryRun).toBe(true);
expect(result.would_embed).toBe(3); // 1 from 'partial' + 2 from 'all-stale'
expect(result.skipped).toBe(3); // 2 from 'fresh' + 1 from 'partial'
expect(result.total_chunks).toBe(6);
expect(result.pages_processed).toBe(3);
// With the stale-prefilter optimization, fully-embedded pages are
// skipped entirely (not visited), so their chunks no longer contribute
// to skipped/total_chunks/pages_processed. 'fresh' is excluded.
expect(result.skipped).toBe(1); // 1 already-embedded chunk from 'partial'
expect(result.total_chunks).toBe(4); // 2 from 'partial' + 2 from 'all-stale'
expect(result.pages_processed).toBe(2); // 'partial' + 'all-stale'
});

test('dry-run --slugs on a single page counts stale chunks, no API calls', async () => {
Expand Down Expand Up @@ -240,6 +258,7 @@ describe('runEmbedCore --dry-run never calls the embedding model', () => {
const engine = mockEngine({
listPages: async () => pages,
getChunks: async (slug: string) => chunksBySlug.get(slug) || [],
listStalePageSlugs: async () => deriveStaleSlugs(chunksBySlug),
upsertChunks: async () => {},
});

Expand Down