diff --git a/README.md b/README.md index 0965e2fd..4ab517ea 100644 --- a/README.md +++ b/README.md @@ -76,6 +76,8 @@ Optional: - `PIV_MAX_POLL_CYCLES` (optional; stop polling after this many cycles) - `PIV_EXIT_WHEN_IDLE` (optional; default `1`, set `0` to keep polling when no issues are found) - `PIV_DRY_RUN=1` to avoid Linear/GitHub mutations +- `PIV_STATE_STORE` (optional; `json` default, or `sqlite` for database-backed run-state storage) +- `PIV_SQLITE_PATH` (required when `PIV_STATE_STORE=sqlite`; absolute/relative path to run-state sqlite db file) - `PIV_DEV_MODE=1` to stream Codex stdout/stderr logs during runs - `CODEX_SANDBOX` (optional; leave empty to disable sandbox, or set `read-only`, `workspace-write`, `danger-full-access`) - `CODEX_MODEL_PLAN` (optional; overrides planning model) @@ -117,3 +119,4 @@ bun test - Run with authenticated `gh` (`gh auth status`). - Codex uses the default CLI home unless you explicitly set `CODEX_HOME`. - Linear integration uses the official `@linear/sdk` client. +- Config resolution stays file/env based in `src/config.ts`; only run-state persistence can use SQLite. diff --git a/src/config.ts b/src/config.ts index 0eeb035e..e96d9f88 100644 --- a/src/config.ts +++ b/src/config.ts @@ -91,6 +91,10 @@ function buildEnvBase(cwd: string): ProjectRuntimeConfig { sandbox, codexHome, }, + stateStore: { + type: normalizeStateStoreType(env.PIV_STATE_STORE), + sqlitePath: normalizeOptionalValue(env.PIV_SQLITE_PATH), + }, skills: { plan: path.join(cwd, "skills", "piv-plan", "SKILL.md"), implement: path.join(cwd, "skills", "piv-implement", "SKILL.md"), @@ -217,6 +221,11 @@ function mergeRuntime( ...(rootDefaults.codex ?? {}), ...(project.codex ?? {}), }, + stateStore: { + ...base.stateStore, + ...(rootDefaults.stateStore ?? {}), + ...(project.stateStore ?? {}), + }, skills: { ...base.skills, ...(rootDefaults.skills ?? {}), @@ -292,6 +301,21 @@ function normalizeSandboxValue( ); } +function normalizeStateStoreType( + input: string | undefined, +): ProjectRuntimeConfig["stateStore"]["type"] { + if (!input) { + return "json"; + } + const value = input.trim().toLowerCase(); + if (value === "json" || value === "sqlite") { + return value; + } + throw new Error( + `Invalid PIV_STATE_STORE value '${input}'. Use json or sqlite.`, + ); +} + function validateProject(project: ResolvedProjectConfig): void { if (!project.linear.apiKey) { throw new Error(`LINEAR_API_KEY is required for project '${project.id}'`); @@ -327,4 +351,9 @@ function validateProject(project: ResolvedProjectConfig): void { `Polling max cycles must be a positive integer for project '${project.id}'`, ); } + if (project.stateStore.type === "sqlite" && !project.stateStore.sqlitePath) { + throw new Error( + `PIV_SQLITE_PATH is required when state store is sqlite for project '${project.id}'`, + ); + } } diff --git a/src/index.ts b/src/index.ts index 28285560..2f50f56c 100644 --- a/src/index.ts +++ b/src/index.ts @@ -2,7 +2,7 @@ import { parseArgs } from "./args"; import { getProjectById, loadConfig } from "./config"; import { logger, normalizeError, setupProcessErrorHandlers } from "./logger"; -import { loadRunState, normalizeIssueKey } from "./state"; +import { createRunStateStore, normalizeIssueKey } from "./state"; import { runWorkflow } from "./workflow"; async function main(): Promise { @@ -41,7 +41,8 @@ async function main(): Promise { throw new Error(`Project '${command.projectId}' not found`); } const key = normalizeIssueKey(command.issueKey); - const state = await loadRunState(project.workspacePath, project.id, key); + const store = createRunStateStore(project); + const state = await store.load(project.id, key); if (!state) { process.stdout.write( `No run state found for ${key} in project ${project.id}\n`, diff --git a/src/state.ts b/src/state.ts index 39d65ee8..9379a0ec 100644 --- a/src/state.ts +++ b/src/state.ts @@ -1,10 +1,18 @@ +import { Database } from "bun:sqlite"; +import { mkdirSync } from "node:fs"; import { mkdir, readFile, readdir, writeFile } from "node:fs/promises"; import path from "node:path"; -import type { RunState, WorkflowStage } from "./types"; +import type { ResolvedProjectConfig, RunState, WorkflowStage } from "./types"; const LEGACY_STATE_DIR = path.join(".piv-loop", "runs"); const STATE_ROOT_DIR = path.join(".piv-loop", "projects"); +export interface RunStateStore { + load(projectId: string, issueKey: string): Promise; + save(state: RunState): Promise; + list(projectId: string): Promise; +} + export function normalizeIssueKey(input: string): string { const match = input.trim().match(/[A-Z]+-\d+/); if (!match) { @@ -27,62 +35,153 @@ export function stateFilePath( ); } -export async function loadRunState( - cwd: string, - projectId: string, - issueKey: string, -): Promise { - const file = stateFilePath(cwd, projectId, issueKey); - try { - const raw = await readFile(file, "utf8"); - return JSON.parse(raw) as RunState; - } catch { - if (projectId !== "default") { - return null; - } - const legacy = path.join( - cwd, - LEGACY_STATE_DIR, - `${normalizeIssueKey(issueKey)}.json`, - ); +function legacyStateFilePath(cwd: string, issueKey: string): string { + return path.join( + cwd, + LEGACY_STATE_DIR, + `${normalizeIssueKey(issueKey)}.json`, + ); +} + +class JsonRunStateStore implements RunStateStore { + constructor(private readonly cwd: string) {} + + async load(projectId: string, issueKey: string): Promise { + const file = stateFilePath(this.cwd, projectId, issueKey); try { - const raw = await readFile(legacy, "utf8"); + const raw = await readFile(file, "utf8"); return JSON.parse(raw) as RunState; } catch { + if (projectId !== "default") { + return null; + } + try { + const raw = await readFile( + legacyStateFilePath(this.cwd, issueKey), + "utf8", + ); + return JSON.parse(raw) as RunState; + } catch { + return null; + } + } + } + + async save(state: RunState): Promise { + const file = stateFilePath(this.cwd, state.projectId, state.issue.key); + await mkdir(path.dirname(file), { recursive: true }); + state.updatedAt = new Date().toISOString(); + await writeFile(file, `${JSON.stringify(state, null, 2)}\n`, "utf8"); + } + + async list(projectId: string): Promise { + const dir = path.join(this.cwd, STATE_ROOT_DIR, projectId, "runs"); + try { + const files = await readdir(dir); + const runs: RunState[] = []; + for (const file of files) { + if (!file.endsWith(".json")) { + continue; + } + const raw = await readFile(path.join(dir, file), "utf8"); + runs.push(JSON.parse(raw) as RunState); + } + return runs; + } catch { + return []; + } + } +} + +class SqliteRunStateStore implements RunStateStore { + private readonly db: Database; + + constructor(dbFile: string) { + mkdirSync(path.dirname(dbFile), { recursive: true }); + this.db = new Database(dbFile, { create: true }); + this.db.exec( + `CREATE TABLE IF NOT EXISTS run_states ( + project_id TEXT NOT NULL, + issue_key TEXT NOT NULL, + state_json TEXT NOT NULL, + updated_at TEXT NOT NULL, + PRIMARY KEY (project_id, issue_key) + )`, + ); + this.db.exec( + "CREATE INDEX IF NOT EXISTS idx_run_states_project_updated ON run_states(project_id, updated_at)", + ); + } + + async load(projectId: string, issueKey: string): Promise { + const key = normalizeIssueKey(issueKey); + const row = this.db + .query( + "SELECT state_json FROM run_states WHERE project_id = ?1 AND issue_key = ?2", + ) + .get(projectId, key) as { state_json: string } | null; + if (!row?.state_json) { return null; } + return JSON.parse(row.state_json) as RunState; + } + + async save(state: RunState): Promise { + state.updatedAt = new Date().toISOString(); + const key = normalizeIssueKey(state.issue.key); + this.db + .query( + `INSERT INTO run_states (project_id, issue_key, state_json, updated_at) + VALUES (?1, ?2, ?3, ?4) + ON CONFLICT(project_id, issue_key) DO UPDATE SET + state_json = excluded.state_json, + updated_at = excluded.updated_at`, + ) + .run(state.projectId, key, JSON.stringify(state), state.updatedAt); + } + + async list(projectId: string): Promise { + const rows = this.db + .query( + "SELECT state_json FROM run_states WHERE project_id = ?1 ORDER BY updated_at DESC", + ) + .all(projectId) as Array<{ state_json: string }>; + return rows.map((row) => JSON.parse(row.state_json) as RunState); } } +export function createRunStateStore( + config: Pick, +): RunStateStore { + if (config.stateStore.type === "sqlite") { + const dbPath = + config.stateStore.sqlitePath ?? + path.join(config.workspacePath, ".piv-loop", "run-state.sqlite"); + return new SqliteRunStateStore(dbPath); + } + return new JsonRunStateStore(config.workspacePath); +} + +export async function loadRunState( + cwd: string, + projectId: string, + issueKey: string, +): Promise { + return new JsonRunStateStore(cwd).load(projectId, issueKey); +} + export async function saveRunState( cwd: string, state: RunState, ): Promise { - const file = stateFilePath(cwd, state.projectId, state.issue.key); - await mkdir(path.dirname(file), { recursive: true }); - state.updatedAt = new Date().toISOString(); - await writeFile(file, `${JSON.stringify(state, null, 2)}\n`, "utf8"); + await new JsonRunStateStore(cwd).save(state); } export async function listRunStates( cwd: string, projectId: string, ): Promise { - const dir = path.join(cwd, STATE_ROOT_DIR, projectId, "runs"); - try { - const files = await readdir(dir); - const runs: RunState[] = []; - for (const file of files) { - if (!file.endsWith(".json")) { - continue; - } - const raw = await readFile(path.join(dir, file), "utf8"); - runs.push(JSON.parse(raw) as RunState); - } - return runs; - } catch { - return []; - } + return new JsonRunStateStore(cwd).list(projectId); } export function transitionStage( diff --git a/src/types.ts b/src/types.ts index b3a6d84b..093cb62d 100644 --- a/src/types.ts +++ b/src/types.ts @@ -78,6 +78,10 @@ export interface ProjectRuntimeConfig { sandbox?: "read-only" | "workspace-write" | "danger-full-access"; codexHome?: string; }; + stateStore: { + type: "json" | "sqlite"; + sqlitePath?: string; + }; skills: { plan: string; implement: string; diff --git a/src/workflow.ts b/src/workflow.ts index c067cd3d..e81a3d6b 100644 --- a/src/workflow.ts +++ b/src/workflow.ts @@ -13,9 +13,9 @@ import { buildReviewPrompt, } from "./prompts"; import { - loadRunState, + type RunStateStore, + createRunStateStore, normalizeIssueKey, - saveRunState, transitionStage, } from "./state"; import type { @@ -39,6 +39,7 @@ export async function runWorkflow( const projectContexts = projects.map((project) => ({ config: project, linear: new LinearClient(project), + runStateStore: createRunStateStore(project), polling: resolvePollingSettings(project, options), })); const globalPolling = resolveGlobalPollingSettings(projectContexts); @@ -53,6 +54,7 @@ export async function runWorkflow( context.config, options, context.linear, + context.runStateStore, cycle, context.polling, ); @@ -119,6 +121,7 @@ async function runProjectCycle( config: ResolvedProjectConfig, options: RunOptions, linear: LinearClient, + runStateStore: RunStateStore, cycle: number, polling: PollingSettings, ): Promise { @@ -134,7 +137,7 @@ async function runProjectCycle( } for (const issue of issues) { - await processIssue(config, linear, issue); + await processIssue(config, linear, runStateStore, issue); } return issues.length; @@ -143,6 +146,7 @@ async function runProjectCycle( async function processIssue( config: ResolvedProjectConfig, linear: LinearClient, + runStateStore: RunStateStore, issue: { id: string; identifier: string; @@ -156,7 +160,7 @@ async function processIssue( ): Promise { const key = normalizeIssueKey(issue.identifier); const issueLogger = logger.child({ projectId: config.id, issueKey: key }); - const existing = await loadRunState(config.workspacePath, config.id, key); + const existing = await runStateStore.load(config.id, key); const isAssignedState = await linear.isAssignedState(issue.state.id); if (!existing && !isAssignedState) { issueLogger.info( @@ -195,13 +199,13 @@ async function processIssue( ); try { - await executeIssue(config, linear, runState); + await executeIssue(config, linear, runStateStore, runState); issueLogger.info({ stage: runState.stage }, "Issue workflow finished"); } catch (error) { const message = error instanceof Error ? error.message : String(error); runState.lastError = message; runState.stage = "blocked"; - await saveRunState(config.workspacePath, runState); + await runStateStore.save(runState); await safeLinearMoveToCanceled(linear, runState.issue.id); await safeLinearComment( linear, @@ -268,6 +272,7 @@ export function buildIssueJobLogFields( async function executeIssue( config: ResolvedProjectConfig, linear: LinearClient, + runStateStore: RunStateStore, state: RunState, ): Promise { if (state.stage === "done" || state.stage === "blocked") { @@ -278,7 +283,7 @@ async function executeIssue( await linear.markStage(state.issue.id, "planning"); await linear.comment(state.issue.id, "PIV loop started planning."); Object.assign(state, transitionStage(state, "planning")); - await saveRunState(config.workspacePath, state); + await runStateStore.save(state); } if (state.stage === "planning") { @@ -289,7 +294,7 @@ async function executeIssue( state.planSummary = result.finalMessage || result.stdout; appendCodexUsage(state, "planning", result.usage); Object.assign(state, transitionStage(state, "implementing")); - await saveRunState(config.workspacePath, state); + await runStateStore.save(state); await linear.markStage(state.issue.id, "implementing"); await linear.comment( state.issue.id, @@ -330,7 +335,7 @@ async function executeIssue( } Object.assign(state, transitionStage(state, "pr_created")); - await saveRunState(config.workspacePath, state); + await runStateStore.save(state); await linear.markStage(state.issue.id, "pr_created"); await linear.applyStageLabel(state.issue.id, "pr_created"); await linear.comment( @@ -348,7 +353,7 @@ async function executeIssue( if (state.stage === "pr_created") { Object.assign(state, transitionStage(state, "reviewing")); - await saveRunState(config.workspacePath, state); + await runStateStore.save(state); await linear.markStage(state.issue.id, "reviewing"); await linear.applyStageLabel(state.issue.id, "reviewing"); } @@ -358,7 +363,7 @@ async function executeIssue( await linear.markStage(state.issue.id, "testing"); await linear.applyStageLabel(state.issue.id, "testing"); Object.assign(state, transitionStage(state, "testing")); - await saveRunState(config.workspacePath, state); + await runStateStore.save(state); const prompt = await buildReviewPrompt( config.skills.reviewTest, @@ -372,7 +377,7 @@ async function executeIssue( state.reviewSummary = outcome.summary; state.testingSummary = outcome.summary; state.bugs = outcome.bugs; - await saveRunState(config.workspacePath, state); + await runStateStore.save(state); const reviewComment = [ `PIV loop review for ${state.issue.key}`, @@ -405,7 +410,7 @@ async function executeIssue( ); } Object.assign(state, transitionStage(state, "blocked")); - await saveRunState(config.workspacePath, state); + await runStateStore.save(state); await linear.markCanceled(state.issue.id); await linear.comment( state.issue.id, @@ -421,7 +426,7 @@ async function executeIssue( } Object.assign(state, transitionStage(state, "done")); - await saveRunState(config.workspacePath, state); + await runStateStore.save(state); await linear.markStage(state.issue.id, "done"); await linear.comment(state.issue.id, "Review/testing passed. Marked done."); logger.info( diff --git a/tests/codex.test.ts b/tests/codex.test.ts index 1471821e..4e6e96f1 100644 --- a/tests/codex.test.ts +++ b/tests/codex.test.ts @@ -51,6 +51,9 @@ const config: ResolvedProjectConfig = { sandbox: "workspace-write", codexHome: "/tmp/codex", }, + stateStore: { + type: "json", + }, skills: { plan: "p", implement: "i", reviewTest: "r" }, dryRun: false, }; diff --git a/tests/config.test.ts b/tests/config.test.ts index 0c1254ee..35ded792 100644 --- a/tests/config.test.ts +++ b/tests/config.test.ts @@ -18,6 +18,8 @@ const envKeys = [ "CODEX_MODEL_PLAN", "CODEX_MODEL_IMPLEMENT", "CODEX_MODEL_REVIEW_TEST", + "PIV_STATE_STORE", + "PIV_SQLITE_PATH", "PIV_POLL_INTERVAL_MS", "PIV_MAX_POLL_CYCLES", "PIV_EXIT_WHEN_IDLE", @@ -40,7 +42,11 @@ describe("loadConfig", () => { ? "" : key === "PIV_EXIT_WHEN_IDLE" ? "1" - : key.toLowerCase(); + : key === "PIV_STATE_STORE" + ? "json" + : key === "PIV_SQLITE_PATH" + ? "" + : key.toLowerCase(); } }); @@ -105,4 +111,28 @@ describe("loadConfig", () => { expect(config.projects[0]?.codex.models?.implement).toBe("gpt-5.3-codex"); expect(config.projects[0]?.codex.models?.reviewTest).toBe("gpt-5.3-codex"); }); + + it("defaults state store to json", async () => { + process.env.PIV_STATE_STORE = undefined; + const config = await loadConfig(process.cwd()); + expect(config.projects[0]?.stateStore.type).toBe("json"); + expect(config.projects[0]?.stateStore.sqlitePath).toBeUndefined(); + }); + + it("loads sqlite state store config from env", async () => { + process.env.PIV_STATE_STORE = "sqlite"; + process.env.PIV_SQLITE_PATH = "/tmp/piv-state.sqlite"; + const config = await loadConfig(process.cwd()); + expect(config.projects[0]?.stateStore.type).toBe("sqlite"); + expect(config.projects[0]?.stateStore.sqlitePath).toBe( + "/tmp/piv-state.sqlite", + ); + }); + + it("throws on invalid state store type", async () => { + process.env.PIV_STATE_STORE = "redis"; + await expect(loadConfig(process.cwd())).rejects.toThrow( + "Invalid PIV_STATE_STORE value", + ); + }); }); diff --git a/tests/state.test.ts b/tests/state.test.ts index 0478cd2b..424a8d5c 100644 --- a/tests/state.test.ts +++ b/tests/state.test.ts @@ -1,7 +1,43 @@ -import { describe, expect, it } from "bun:test"; -import { normalizeIssueKey, transitionStage } from "../src/state"; +import { afterEach, describe, expect, it } from "bun:test"; +import { mkdir, mkdtemp, rm, writeFile } from "node:fs/promises"; +import os from "node:os"; +import path from "node:path"; +import { + createRunStateStore, + loadRunState, + normalizeIssueKey, + stateFilePath, + transitionStage, +} from "../src/state"; import type { RunState } from "../src/types"; +const tempDirs: string[] = []; + +afterEach(async () => { + for (const dir of tempDirs.splice(0)) { + await rm(dir, { recursive: true, force: true }); + } +}); + +function buildRunState(projectId: string, key: string): RunState { + const now = new Date().toISOString(); + return { + projectId, + projectName: projectId, + workspacePath: "/tmp/work", + repository: { + owner: "o", + name: "n", + baseBranch: "main", + }, + issue: { id: "1", key, title: "t", url: "u" }, + stage: "planning", + bugs: [], + startedAt: now, + updatedAt: now, + }; +} + describe("state helpers", () => { it("normalizes issue key from URL", () => { const key = normalizeIssueKey( @@ -11,23 +47,60 @@ describe("state helpers", () => { }); it("transitions stage", () => { - const now = new Date().toISOString(); - const state: RunState = { - projectId: "default", - projectName: "Default", - workspacePath: "/tmp/work", - repository: { - owner: "o", - name: "n", - baseBranch: "main", - }, - issue: { id: "1", key: "ENG-1", title: "t", url: "u" }, - stage: "planning", - bugs: [], - startedAt: now, - updatedAt: now, - }; + const state = buildRunState("default", "ENG-1"); const next = transitionStage(state, "implementing"); expect(next.stage).toBe("implementing"); }); + + it("json store saves in project-scoped path", async () => { + const cwd = await mkdtemp(path.join(os.tmpdir(), "piv-state-json-")); + tempDirs.push(cwd); + const store = createRunStateStore({ + workspacePath: cwd, + stateStore: { type: "json" }, + }); + const state = buildRunState("proj-a", "ENG-2"); + + await store.save(state); + + const file = stateFilePath(cwd, "proj-a", "ENG-2"); + const loaded = await Bun.file(file).json(); + expect(loaded.issue.key).toBe("ENG-2"); + }); + + it("json store keeps legacy fallback for default project", async () => { + const cwd = await mkdtemp(path.join(os.tmpdir(), "piv-state-legacy-")); + tempDirs.push(cwd); + await mkdir(path.join(cwd, ".piv-loop", "runs"), { recursive: true }); + const legacyState = buildRunState("default", "ENG-9"); + await writeFile( + path.join(cwd, ".piv-loop", "runs", "ENG-9.json"), + JSON.stringify(legacyState), + "utf8", + ); + + const loaded = await loadRunState(cwd, "default", "ENG-9"); + expect(loaded?.issue.key).toBe("ENG-9"); + }); + + it("sqlite store saves, loads, and lists by project", async () => { + const cwd = await mkdtemp(path.join(os.tmpdir(), "piv-state-sqlite-")); + tempDirs.push(cwd); + const dbPath = path.join(cwd, ".piv-loop", "run-state.sqlite"); + const store = createRunStateStore({ + workspacePath: cwd, + stateStore: { type: "sqlite", sqlitePath: dbPath }, + }); + + await store.save(buildRunState("proj-a", "ENG-3")); + await store.save(buildRunState("proj-a", "ENG-4")); + await store.save(buildRunState("proj-b", "ENG-5")); + + const one = await store.load("proj-a", "ENG-3"); + expect(one?.issue.key).toBe("ENG-3"); + + const listA = await store.list("proj-a"); + const keys = listA.map((state) => state.issue.key).sort(); + expect(keys).toEqual(["ENG-3", "ENG-4"]); + }); }); diff --git a/tests/workflow.test.ts b/tests/workflow.test.ts index dc5f9ef2..7943aa6c 100644 --- a/tests/workflow.test.ts +++ b/tests/workflow.test.ts @@ -124,6 +124,9 @@ describe("resolvePollingSettings", () => { codex: { binary: "codex", }, + stateStore: { + type: "json", + }, skills: { plan: "plan.md", implement: "implement.md",