diff --git a/docs/contracts.md b/docs/contracts.md index bad8a81..7697e92 100644 --- a/docs/contracts.md +++ b/docs/contracts.md @@ -101,7 +101,7 @@ A single shared bearer token, used by every Murmur boundary. Required on every request as `Authorization: Bearer <token>`: - Agents → Murmur (MCP transport + publisher API). -- Publishers → Murmur (`POST /pipelines`, `POST /pipelines/{id}/runs`, `GET /runs/{id}`). +- Publishers → Murmur (`POST /pipelines`, `POST /pipelines/{id}/runs`, `GET /runs/{id}`, `GET /runs`). - Murmur → publisher subcommand endpoints (proxied `task_tool` calls). - Murmur → publisher webhook URL. @@ -440,6 +440,56 @@ Resulting `final_output`: --- +## 8. `GET /runs` — list runs + +Companion to `GET /runs/{run_id}` (DESIGN.md §3.2). Lets an agent +discover the right `run_id` for a user's natural-language request +(e.g. *"add Stripe to jobseek"*) without the user pasting a run id. + +### 8.1 Request + +``` +GET /runs?status=running&pipeline_id=jobseek-add-company&initial_input.company_name=Stripe&limit=10 HTTP/1.1 +Authorization: Bearer <token> +``` + +All query params are optional. Multiple filters AND-combine. + +| Param | Meaning | +|---|---| +| `status` | Exact match against `runs.status`. | +| `pipeline_id` | Exact match against `runs.pipeline_id`. | +| `initial_input.<field>` | Equality against `JSON_EXTRACT(initial_input_json, '$.<field>')`. `<field>` MUST match `^[A-Za-z0-9_]+$`. Repeating with different `<field>` segments AND-combines. | +| `limit` | Integer ≥ 1. Server clamps at 100; default 25. | +| `offset` | Integer ≥ 0; default 0. | + +### 8.2 Response + +`200 { ok: true, data: { runs: RunListItem[] } }`. Always 200 on a +well-formed query — no matches yields `{"runs": []}` (NEVER 404). + +```ts +interface RunListItem { + run_id: string; + pipeline_id: string; + status: string; // e.g. 
"running" | "completed" + initial_input: unknown; // rehydrated from initial_input_json + created_at: string; // RFC 3339 UTC + webhook_status: string | null; // see §6 + GET /runs/{id} +} +``` + +Order: `created_at DESC, id ASC` (newest first; tie-break by id). + +### 8.3 Errors + +- 400 `{ ok: false, errors: ["limit_invalid"] }` — non-integer or `< 1` limit. +- 400 `{ ok: false, errors: ["offset_invalid"] }` — non-integer or `< 0` offset. +- 400 `{ ok: false, errors: ["initial_input_field_invalid:<field>"] }` — `<field>` outside `^[A-Za-z0-9_]+$`. +- 401 — missing/bad bearer (handled by the shared auth middleware). + +--- + ## Appendix A — Single-envelope grep gate `scripts/grep-no-accepted-key.sh` searches under `src/` and `test/` for diff --git a/src/api/publisher/runs.test.ts b/src/api/publisher/runs.test.ts new file mode 100644 index 0000000..a023cbf --- /dev/null +++ b/src/api/publisher/runs.test.ts @@ -0,0 +1,366 @@ +/** + * `GET /runs` (list) tests — colophon-group/murmur#76. + * + * Drives the route end-to-end through `app.request(...)` against a + * fresh in-memory SQLite (migrations applied per test). The fixture + * pipeline is the same `jobseek-add-company` shape used by the other + * publisher tests; runs are created via `POST /pipelines/{id}/runs` so + * the rows exercise the real ingest path (status='running', + * initial_input_json populated, created_at filled). 
/**
 * `GET /runs` (list) tests — colophon-group/murmur#76.
 *
 * Drives the route end-to-end through `app.request(...)` against a
 * fresh in-memory SQLite (migrations applied per test). The fixture
 * pipeline is the same `jobseek-add-company` shape used by the other
 * publisher tests; runs are created via `POST /pipelines/{id}/runs` so
 * the rows exercise the real ingest path (status='running',
 * initial_input_json populated, created_at filled).
 */

import Database from "better-sqlite3";
import { beforeEach, describe, expect, it } from "vitest";

import type { EnvelopeResponse } from "@murmur/contracts-types";

import { runMigrations } from "../../db/migrate.js";
import { createServer } from "../../server.js";
import type { RunListItem, RunListView } from "./runs.js";

const TEST_TOKEN = "test-murmur-token-secret";
const TEST_TOKEN_BUF = Buffer.from(TEST_TOKEN, "utf8");
const AUTH_HEADERS = { Authorization: `Bearer ${TEST_TOKEN}` };

/** The app object under test, exactly as `createServer` returns it. */
type TestApp = ReturnType<typeof createServer>;

const VALID_PIPELINE_YAML = `
id: jobseek-add-company
initial_input:
  type: object
  required: [company_name, website]
  properties:
    company_name: { type: string, minLength: 1 }
    website: { type: string, format: uri }
subtasks:
  - id: pre-verify
    instructions: |
      Confirm the company exists.
    output_schema:
      type: object
      required: [verified]
      properties:
        verified: { type: boolean }
final_output:
  composes:
    - "pre-verify.*"
  webhook: https://example.com/webhook
`;

const SECOND_PIPELINE_YAML = `
id: other-pipeline
initial_input:
  type: object
  required: [target]
  properties:
    target: { type: string, minLength: 1 }
subtasks:
  - id: only
    instructions: do
    output_schema:
      type: object
      required: [done]
      properties:
        done: { type: boolean }
final_output:
  composes:
    - "only.*"
  webhook: https://example.com/webhook2
`;

/** Build a server backed by a brand-new, fully migrated in-memory DB. */
function freshServer(): {
  db: Database.Database;
  app: TestApp;
} {
  const db = new Database(":memory:");
  db.pragma("foreign_keys = ON");
  runMigrations(db);
  const app = createServer({ token: TEST_TOKEN_BUF, db });
  return { db, app };
}

/** Register a pipeline definition through the real `POST /pipelines` route. */
async function registerPipeline(
  app: TestApp,
  yaml: string,
  id: string,
): Promise<void> {
  const r = await app.request("/pipelines", {
    method: "POST",
    headers: { ...AUTH_HEADERS, "Content-Type": "application/json" },
    body: JSON.stringify({ id, def_yaml: yaml }),
  });
  expect(r.status).toBe(200);
}

/**
 * Create a run through `POST /pipelines/{id}/runs` and return its id.
 * Throws when the envelope is not a success envelope carrying data.
 */
async function createRun(
  app: TestApp,
  pipelineId: string,
  initialInput: Record<string, unknown>,
): Promise<string> {
  const r = await app.request(`/pipelines/${pipelineId}/runs`, {
    method: "POST",
    headers: { ...AUTH_HEADERS, "Content-Type": "application/json" },
    body: JSON.stringify({ initial_input: initialInput }),
  });
  expect(r.status).toBe(200);
  const body = (await r.json()) as EnvelopeResponse<{ run_id: string }>;
  if (body.ok !== true || body.data === undefined) {
    throw new Error("create-run failed");
  }
  return body.data.run_id;
}

/**
 * Direct SQL hatch: flip a run's status so we can exercise the
 * `status` filter without having to drive a full pipeline through
 * `submit_result` (which the M-tasks owning that loop haven't all
 * landed yet at the time this issue was scoped).
 */
function setRunStatus(
  db: Database.Database,
  runId: string,
  status: string,
): void {
  db.prepare("UPDATE runs SET status = ? WHERE id = ?").run(status, runId);
}

/**
 * Backdate `created_at` so DESC ordering is deterministic in the test.
 * Without this two runs minted in the same ms can flip order based on
 * insertion timing.
 */
function setRunCreatedAt(
  db: Database.Database,
  runId: string,
  createdAt: string,
): void {
  db.prepare("UPDATE runs SET created_at = ? WHERE id = ?").run(
    createdAt,
    runId,
  );
}

/**
 * Issue `GET /runs` with the given raw query string (no leading `?`).
 * Pass `authed = false` to omit the bearer header.
 */
async function listRuns(
  app: TestApp,
  query: string,
  authed = true,
): Promise<{ status: number; body: EnvelopeResponse<RunListView> }> {
  const url = `/runs${query.length > 0 ? `?${query}` : ""}`;
  const headers = authed ? AUTH_HEADERS : {};
  const response = await app.request(url, { headers });
  return {
    status: response.status,
    body: (await response.json()) as EnvelopeResponse<RunListView>,
  };
}

/**
 * Narrow a list envelope to its `runs` array, failing the test with a
 * readable message when the envelope is not a success envelope.
 */
function runsOf(
  body: EnvelopeResponse<RunListView>,
): ReadonlyArray<RunListItem> {
  if (body.ok !== true || body.data === undefined) {
    throw new Error("expected an ok envelope carrying data");
  }
  return body.data.runs;
}

describe("GET /runs", () => {
  let server: ReturnType<typeof freshServer>;

  beforeEach(async () => {
    server = freshServer();
    await registerPipeline(
      server.app,
      VALID_PIPELINE_YAML,
      "jobseek-add-company",
    );
    await registerPipeline(server.app, SECOND_PIPELINE_YAML, "other-pipeline");
  });

  it("returns 401 without an Authorization header", async () => {
    const { status } = await listRuns(server.app, "", false);
    expect(status).toBe(401);
  });

  it("returns 200 with empty runs[] when no rows match (NOT 404)", async () => {
    const { status, body } = await listRuns(
      server.app,
      "status=running&pipeline_id=does-not-exist",
    );
    expect(status).toBe(200);
    expect(body.ok).toBe(true);
    expect(runsOf(body)).toEqual([]);
  });

  it("?status=running returns only running runs ordered by created_at DESC", async () => {
    const r1 = await createRun(server.app, "jobseek-add-company", {
      company_name: "Alpha",
      website: "https://alpha.example",
    });
    const r2 = await createRun(server.app, "jobseek-add-company", {
      company_name: "Bravo",
      website: "https://bravo.example",
    });
    const r3 = await createRun(server.app, "jobseek-add-company", {
      company_name: "Charlie",
      website: "https://charlie.example",
    });

    // Backdate created_at: r1 oldest, r3 newest.
    setRunCreatedAt(server.db, r1, "2026-01-01T00:00:00.000Z");
    setRunCreatedAt(server.db, r2, "2026-01-02T00:00:00.000Z");
    setRunCreatedAt(server.db, r3, "2026-01-03T00:00:00.000Z");

    // Mark r2 as completed; r1 + r3 stay running.
    setRunStatus(server.db, r2, "completed");

    const { status, body } = await listRuns(server.app, "status=running");
    expect(status).toBe(200);
    const runs = runsOf(body);
    expect(runs.map((r) => r.run_id)).toEqual([r3, r1]);
    for (const r of runs) expect(r.status).toBe("running");
  });

  it("?pipeline_id=...&status=running AND-combines filters", async () => {
    const a1 = await createRun(server.app, "jobseek-add-company", {
      company_name: "Alpha",
      website: "https://alpha.example",
    });
    const a2 = await createRun(server.app, "jobseek-add-company", {
      company_name: "Bravo",
      website: "https://bravo.example",
    });
    const b1 = await createRun(server.app, "other-pipeline", {
      target: "x",
    });
    const b2 = await createRun(server.app, "other-pipeline", {
      target: "y",
    });

    // Mark one of each pipeline as completed; one of each stays running.
    setRunStatus(server.db, a2, "completed");
    setRunStatus(server.db, b2, "completed");

    const { status, body } = await listRuns(
      server.app,
      "status=running&pipeline_id=jobseek-add-company",
    );
    expect(status).toBe(200);
    expect(runsOf(body).map((r) => r.run_id)).toEqual([a1]);

    // Also confirm the other-pipeline filter works on its own.
    const second = await listRuns(
      server.app,
      "status=running&pipeline_id=other-pipeline",
    );
    expect(second.status).toBe(200);
    expect(runsOf(second.body).map((r) => r.run_id)).toEqual([b1]);
  });

  it("?initial_input.<field>=<value> matches runs whose initial_input has that field equal to that value", async () => {
    const stripeRun = await createRun(server.app, "jobseek-add-company", {
      company_name: "Stripe",
      website: "https://stripe.com",
    });
    await createRun(server.app, "jobseek-add-company", {
      company_name: "Plaid",
      website: "https://plaid.com",
    });

    const { status, body } = await listRuns(
      server.app,
      "initial_input.company_name=Stripe",
    );
    expect(status).toBe(200);
    const runs = runsOf(body);
    expect(runs.map((r) => r.run_id)).toEqual([stripeRun]);
    const item = runs[0] as RunListItem;
    // initial_input is rehydrated as the original JSON object.
    expect(item.initial_input).toEqual({
      company_name: "Stripe",
      website: "https://stripe.com",
    });
  });

  it("?limit=5&offset=10 paginates", async () => {
    const ids: string[] = [];
    for (let i = 0; i < 20; i++) {
      const id = await createRun(server.app, "jobseek-add-company", {
        company_name: `Co-${i}`,
        website: `https://co-${i}.example`,
      });
      // Stagger created_at so DESC order is deterministic and matches
      // creation order: i=0 oldest, i=19 newest.
      setRunCreatedAt(
        server.db,
        id,
        `2026-01-${(i + 1).toString().padStart(2, "0")}T00:00:00.000Z`,
      );
      ids.push(id);
    }

    const { status, body } = await listRuns(server.app, "limit=5&offset=10");
    expect(status).toBe(200);
    const runs = runsOf(body);
    expect(runs).toHaveLength(5);
    // DESC order: id at index 19 is row 0; offset 10 skips ids[19..10],
    // returning ids[9..5] inclusive (5 rows).
    const expected = [ids[9], ids[8], ids[7], ids[6], ids[5]];
    expect(runs.map((r) => r.run_id)).toEqual(expected);
  });

  it("clamps limit at 100 server-side regardless of caller value", async () => {
    // Seed 105 runs so we can prove we get back at most 100.
    for (let i = 0; i < 105; i++) {
      await createRun(server.app, "jobseek-add-company", {
        company_name: `Co-${i}`,
        website: `https://co-${i}.example`,
      });
    }
    const { status, body } = await listRuns(server.app, "limit=10000");
    expect(status).toBe(200);
    expect(runsOf(body).length).toBe(100);
  });

  it("rejects non-integer limit with 400", async () => {
    const { status, body } = await listRuns(server.app, "limit=abc");
    expect(status).toBe(400);
    // Pin the documented error code, not just the failure flag.
    expect(body).toMatchObject({ ok: false, errors: ["limit_invalid"] });
  });

  it("rejects negative offset with 400", async () => {
    const { status, body } = await listRuns(server.app, "offset=-1");
    expect(status).toBe(400);
    expect(body).toMatchObject({ ok: false, errors: ["offset_invalid"] });
  });

  it("rejects an initial_input.<field> name that has special characters", async () => {
    // `initial_input.bad-field` would interpolate `-` into the
    // JSON_EXTRACT path; we whitelist `[A-Za-z0-9_]+` to keep that
    // surface clean.
    const { status, body } = await listRuns(
      server.app,
      "initial_input.bad-field=x",
    );
    expect(status).toBe(400);
    expect(body).toMatchObject({
      ok: false,
      errors: ["initial_input_field_invalid:bad-field"],
    });
  });

  it("returns the documented row shape with all required fields", async () => {
    const runId = await createRun(server.app, "jobseek-add-company", {
      company_name: "Stripe",
      website: "https://stripe.com",
    });
    const { status, body } = await listRuns(
      server.app,
      "pipeline_id=jobseek-add-company",
    );
    expect(status).toBe(200);
    const item = runsOf(body).find((r) => r.run_id === runId);
    expect(item).toBeDefined();
    if (item === undefined) return;
    expect(item.pipeline_id).toBe("jobseek-add-company");
    expect(item.status).toBe("running");
    expect(item.webhook_status).toBeNull();
    expect(typeof item.created_at).toBe("string");
    expect(item.initial_input).toEqual({
      company_name: "Stripe",
      website: "https://stripe.com",
    });
  });
});
/**
 * `GET /runs/{run_id}` and `GET /runs` route handlers.
 *
 * `GET /runs/{run_id}` returns the publisher-facing run status: the
 * run's current status, its final output (when completed), and the
 * per-subtask audit trail. Each `agent_actions` row's
 * `args_json`/`response_json` payloads are truncated per `./truncate.ts`
 * so the response stays scannable for a long-running pipeline.
 *
 * `GET /runs` lists runs filtered by `status`, `pipeline_id`, and
 * arbitrary `initial_input.<field>` equality predicates against the
 * `initial_input_json` blob, paginated by `limit`/`offset`. See the
 * companion section in `docs/contracts.md`.
 *
 * @see DESIGN.md §3.2 — GET /runs/{run_id}
 * @see colophon-group/murmur#76 — GET /runs (list)
 */

/**
 * One row returned by `GET /runs`. A trimmed projection of `runs` —
 * just enough for an agent to disambiguate a natural-language request
 * (e.g. *"add Stripe to jobseek"*) into a concrete `run_id`.
 */
export interface RunListItem {
  /** The run's id (`runs.id`). */
  readonly run_id: string;
  /** Id of the pipeline the run belongs to. */
  readonly pipeline_id: string;
  /** Current run status, e.g. `"running"` or `"completed"`. */
  readonly status: string;
  /** The run's initial input, parsed back from `initial_input_json`. */
  readonly initial_input: unknown;
  /** Creation timestamp as stored on the row. */
  readonly created_at: string;
  /** Webhook delivery status, or `null` when none is recorded. */
  readonly webhook_status: string | null;
}

/**
 * Body shape returned by `GET /runs` on success. An empty `runs` array
 * is the documented "no matches" outcome — never 404.
 */
export interface RunListView {
  readonly runs: ReadonlyArray<RunListItem>;
}

/**
 * Hard server-side cap on `?limit=`. Callers asking for more silently
 * receive at most this many rows. Sized for the demo: a single user's
 * pending-run carousel never exceeds a handful.
 */
export const RUN_LIST_MAX_LIMIT = 100;

/**
 * Default `?limit=` when the caller omits the param.
 */
export const RUN_LIST_DEFAULT_LIMIT = 25;

/**
 * Whitelist regex for the `<field>` segment of an
 * `initial_input.<field>=<value>` query param. The field name is
 * interpolated (behind the `$.` JSON path prefix) into the SQL text
 * passed to `JSON_EXTRACT`, so anything outside this charset would
 * open the door to SQL injection. The value side stays bound.
 *
 * The pattern is anchored at both ends and requires at least one
 * character, so the empty field name (`initial_input.=x`) is rejected.
 */
export const RUN_LIST_INITIAL_INPUT_FIELD_RE = /^[A-Za-z0-9_]+$/;
* @@ -114,6 +163,8 @@ export function mountRunRoutes(app: Hono, db: Database.Database): void { ORDER BY a.ts ASC, a.id ASC`, ); + mountRunListRoute(app, db); + app.get("/runs/:run_id", (c) => { const runId = c.req.param("run_id"); if (runId === undefined || runId === "") { @@ -173,3 +224,144 @@ export function mountRunRoutes(app: Hono, db: Database.Database): void { return c.json(ok, 200); }); } + +/** + * Mount the `GET /runs` (list) route onto the given Hono sub-app. + * + * Query params (all optional): + * - `status` — exact match against `runs.status`. + * - `pipeline_id` — exact match. + * - `initial_input.` — equality against + * `JSON_EXTRACT(initial_input_json, '$.')`. `` MUST + * match {@link RUN_LIST_INITIAL_INPUT_FIELD_RE}; otherwise 400. + * Multiple `initial_input.*` params are AND-combined. + * - `limit` — integer in `[1, RUN_LIST_MAX_LIMIT]`. Values above the + * cap are clamped silently. Default {@link RUN_LIST_DEFAULT_LIMIT}. + * - `offset` — non-negative integer. Default 0. + * + * Response: + * - 200 `{ ok: true, data: { runs: RunListItem[] } }`. `runs` may be + * empty (NEVER 404 on an empty result set). + * - 400 `{ ok: false, errors: [...] }` on malformed params. + * + * Auth is inherited from the bearer-auth middleware mounted at the + * server root by `createServer`; this handler does not re-check. + * + * @param app the publisher sub-app. + * @param db the open SQLite handle. + */ +export function mountRunListRoute(app: Hono, db: Database.Database): void { + app.get("/runs", (c) => { + // Hono returns query params as Record for first-only + // wins; that's fine for our scalar params. We re-read the URL when we + // need to walk the full set of `initial_input.*` keys. 
+ const url = new URL(c.req.url); + const params = url.searchParams; + + // --- Numeric params ----------------------------------------------- + const limitRaw = params.get("limit"); + let limit = RUN_LIST_DEFAULT_LIMIT; + if (limitRaw !== null) { + const parsed = parseStrictInt(limitRaw); + if (parsed === null || parsed < 1) { + const err: Err = { ok: false, errors: ["limit_invalid"] }; + return c.json(err, 400); + } + // Silent clamp at the server-side cap (per issue scope). + limit = Math.min(parsed, RUN_LIST_MAX_LIMIT); + } + + const offsetRaw = params.get("offset"); + let offset = 0; + if (offsetRaw !== null) { + const parsed = parseStrictInt(offsetRaw); + if (parsed === null || parsed < 0) { + const err: Err = { ok: false, errors: ["offset_invalid"] }; + return c.json(err, 400); + } + offset = parsed; + } + + // --- Filter params ------------------------------------------------ + const wherePieces: string[] = []; + const bindings: Array = []; + + const status = params.get("status"); + if (status !== null && status.length > 0) { + wherePieces.push("status = ?"); + bindings.push(status); + } + + const pipelineId = params.get("pipeline_id"); + if (pipelineId !== null && pipelineId.length > 0) { + wherePieces.push("pipeline_id = ?"); + bindings.push(pipelineId); + } + + // initial_input.= — collected by walking every query + // key. Multiple entries AND-combine. + for (const [key, value] of params.entries()) { + if (!key.startsWith("initial_input.")) continue; + const field = key.slice("initial_input.".length); + if (!RUN_LIST_INITIAL_INPUT_FIELD_RE.test(field)) { + const err: Err = { + ok: false, + errors: [`initial_input_field_invalid:${field}`], + }; + return c.json(err, 400); + } + // The field name is interpolated into the SQL string; the value + // stays bound. The regex above is the SQL-injection guard. 
+ wherePieces.push( + `JSON_EXTRACT(initial_input_json, '$.${field}') = ?`, + ); + bindings.push(value); + } + + // --- Build + run query -------------------------------------------- + const whereSql = + wherePieces.length > 0 ? ` WHERE ${wherePieces.join(" AND ")}` : ""; + const sql = + `SELECT id, pipeline_id, status, initial_input_json, created_at,` + + ` webhook_status FROM runs${whereSql} ORDER BY created_at DESC,` + + ` id ASC LIMIT ? OFFSET ?`; + bindings.push(limit, offset); + + interface Row { + readonly id: string; + readonly pipeline_id: string; + readonly status: string; + readonly initial_input_json: string; + readonly created_at: string; + readonly webhook_status: string | null; + } + // `prepare` is not cached because the SQL shape varies with the + // filter set; the prepare cost is negligible at demo scale and the + // alternative (cache by SQL string) adds complexity without benefit. + const rows = db.prepare(sql).all(...bindings) as ReadonlyArray; + + const runs: RunListItem[] = rows.map((row) => ({ + run_id: row.id, + pipeline_id: row.pipeline_id, + status: row.status, + initial_input: JSON.parse(row.initial_input_json) as unknown, + created_at: row.created_at, + webhook_status: row.webhook_status, + })); + + const ok: Ok = { ok: true, data: { runs } }; + return c.json(ok, 200); + }); +} + +/** + * Strict integer parser: rejects floats, scientific notation, leading + * `+`, whitespace, and any string that doesn't round-trip through + * `String(parseInt(...))`. Returns `null` on bad input. + */ +function parseStrictInt(raw: string): number | null { + if (!/^-?\d+$/.test(raw)) return null; + const n = Number(raw); + if (!Number.isFinite(n) || !Number.isInteger(n)) return null; + return n; +}