diff --git a/DESIGN.md b/DESIGN.md index 5d6d38f..5b984d4 100644 --- a/DESIGN.md +++ b/DESIGN.md @@ -276,7 +276,7 @@ Subtasks are pure decisions: `instructions + input → output`. No side effects ### 3.3 Server endpoints (agent-facing, behind MCP) -- `GET /work/next` — atomically claim the oldest unclaimed subtask instance across all pipelines, or 204. Implementation: single statement `UPDATE subtask_instances SET claim_token=?, expires_at=? WHERE id=(SELECT id FROM subtask_instances WHERE claim_token IS NULL AND status='ready' ORDER BY created_at LIMIT 1) RETURNING …` inside `BEGIN IMMEDIATE` on a WAL-mode SQLite. No SELECT-then-UPDATE race. +- `GET /work/next` — atomically claim the oldest unclaimed subtask instance across all pipelines, or 204. Implementation: single statement `UPDATE subtask_instances SET claim_token=?, expires_at=? WHERE id=(SELECT id FROM subtask_instances WHERE claim_token IS NULL AND status='ready' ORDER BY created_at LIMIT 1) RETURNING …` inside `BEGIN IMMEDIATE` on a WAL-mode SQLite. No SELECT-then-UPDATE race. Accepts an optional `?run_id=…` query param; when supplied, the inner SELECT adds `AND run_id = ?` so the agent only picks up subtasks of that run (used by Claude-Code-driven flows that drive a single run end-to-end without consuming unrelated stale work — issue #75). Without the param, the legacy global FIFO applies. - `POST /work/{claim_token}/result` — submit a structured result. Validates against the subtask's `output_schema` and the claim is consumed atomically: `UPDATE subtask_instances SET result=?, status='done' WHERE claim_token=? AND status='claimed' AND expires_at>now() RETURNING …`. If the row no longer matches (TTL expired, already submitted), the call returns `{accepted: false, reason: 'claim_lost'}` and the agent's submission is discarded. **Claim semantics (MVP):** @@ -297,7 +297,7 @@ No retries on schema-validation failure for MVP — the run fails. Three static tools, fixed for the lifetime of the connection: -- `pull_task()` → `{ instructions, input, output_schema, claim }` or `null`. +- `pull_task({ run_id?: string })` → `{ instructions, input, output_schema, claim }` or `null`. `run_id` is optional; when supplied, the claim is restricted to ready subtasks of that run (issue #75 — drives a single run end-to-end without picking up unrelated queued work). Forwards to `GET /work/next?run_id=…`. - `submit_result(claim, result, notes?)` → `{ accepted: true } | { accepted: false, errors: [...] }`. `notes` is an optional free-text reflection persisted in the audit log alongside the structured `result`. - `task_tool(subcommand: string, claim: string, args?: object)` → `string | object` — universal dispatcher. Invokes a publisher-declared subcommand for a claim (see §3.1). `claim` is required (no session-based fallback in MVP). Static description (visible to the host's tool catalog): diff --git a/src/api/agent/agent.test.ts b/src/api/agent/agent.test.ts index b328980..8520740 100644 --- a/src/api/agent/agent.test.ts +++ b/src/api/agent/agent.test.ts @@ -759,6 +759,169 @@ describe("default seams", () => { }); }); +/* ---------- run_id-scoped claim (issue #75) ---------- */ + +describe("GET /work/next?run_id=... — scope claim to a specific run", () => { + /** + * Helper: seed a second pipeline + run (the default `TEST_PIPELINE_DEF` + * only declares `first` / `second`; the run-scope tests need a separate + * subtask id so we can probe pipeline-def lookup with a parallel run). + * Returns nothing; the caller observes via `/work/next`. + */ + function seedSecondPipelineAndRun( + db: Database.Database, + runId: string, + subtaskId: string, + createdAt: string, + ): void { + const pipelineId = `${runId}-pipe`; + const def = { + id: pipelineId, + subtasks: [ + { + id: subtaskId, + instructions: `do ${subtaskId}`, + output_schema: { + type: "object", + properties: { ok: { type: "boolean" } }, + required: ["ok"], + additionalProperties: false, + }, + }, + ], + }; + db.prepare( + `INSERT INTO pipelines (id, version, def_json, created_at, updated_at) + VALUES (?, 1, ?, ?, ?)`, + ).run(pipelineId, JSON.stringify(def), createdAt, createdAt); + db.prepare( + `INSERT INTO runs + (id, pipeline_id, pipeline_version, status, initial_input_json, + webhook_url, created_at) + VALUES (?, ?, 1, 'running', '{}', 'https://example.test/webhook', ?)`, + ).run(runId, pipelineId, createdAt); + db.prepare( + `INSERT INTO subtask_instances + (id, run_id, subtask_id, status, input_json, created_at, updated_at) + VALUES (?, ?, ?, 'ready', '{}', ?, ?)`, + ).run(`${runId}-${subtaskId}`, runId, subtaskId, createdAt, createdAt); + } + + it("with no `run_id` claims oldest ready row globally (legacy behaviour)", async () => { + const h = makeHarness(); + try { + // Two runs: run-OLD seeded first (older created_at), run-NEW seeded + // second (newer). Without run_id, the legacy global FIFO must pick + // run-OLD's `first` ahead of run-NEW. + const oldNow = "2026-04-29T12:00:00.000Z"; + const newNow = "2026-04-29T12:00:01.000Z"; + seedRun(h.db, "run-OLD", ["first", "second"], oldNow); + seedRun(h.db, "run-NEW", ["first", "second"], newNow); + + const { status, body } = await getJson(h.app, "/next"); + expect(status).toBe(200); + if (!body.ok || !body.data) throw new Error("expected claim"); + // The claim's projected payload doesn't include run_id, but we can + // verify by reading back from the DB which instance was claimed. + const claimed = h.db + .prepare( + `SELECT run_id FROM subtask_instances WHERE claim_token = ?`, + ) + .get(body.data.claim) as { run_id: string }; + expect(claimed.run_id).toBe("run-OLD"); + } finally { + h.db.close(); + } + }); + + it("with `run_id=r_X` claims oldest ready row whose run_id == r_X, ignoring older rows from other runs", async () => { + const h = makeHarness(); + try { + // Two runs: run-OLD is older + globally first in FIFO; run-NEW is + // newer. Asking for run-NEW's id MUST skip run-OLD even though it's + // older. + const oldNow = "2026-04-29T12:00:00.000Z"; + const newNow = "2026-04-29T12:00:01.000Z"; + seedRun(h.db, "run-OLD", ["first", "second"], oldNow); + seedRun(h.db, "run-NEW", ["first", "second"], newNow); + + const { status, body } = await getJson( + h.app, + "/next?run_id=run-NEW", + ); + expect(status).toBe(200); + if (!body.ok || !body.data) throw new Error("expected claim"); + const claimed = h.db + .prepare( + `SELECT run_id, subtask_id FROM subtask_instances WHERE claim_token = ?`, + ) + .get(body.data.claim) as { run_id: string; subtask_id: string }; + expect(claimed.run_id).toBe("run-NEW"); + expect(claimed.subtask_id).toBe("first"); + + // run-OLD's `first` must remain `ready` (not claimed by this call). + const oldStatus = h.db + .prepare( + `SELECT status FROM subtask_instances WHERE id = 'run-OLD-first'`, + ) + .get() as { status: string }; + expect(oldStatus.status).toBe("ready"); + } finally { + h.db.close(); + } + }); + + it("with `run_id=r_unknown` returns { ok: true, data: null }", async () => { + const h = makeHarness(); + try { + // Even with a populated global queue, an unknown run_id matches no + // rows — must return null, not the default-FIFO row. + seedRun(h.db, "run-OLD", ["first"], h.nowFn()); + + const { status, body } = await getJson( + h.app, + "/next?run_id=r_unknown", + ); + expect(status).toBe(200); + expect(body).toEqual({ ok: true, data: null }); + + // run-OLD's row stays ready (not consumed). + const row = h.db + .prepare( + `SELECT status FROM subtask_instances WHERE id = 'run-OLD-first'`, + ) + .get() as { status: string }; + expect(row.status).toBe("ready"); + } finally { + h.db.close(); + } + }); + + it("works across distinct pipelines — run_id filter is independent of pipeline_id", async () => { + // Sanity-check: the inner SELECT filters on run_id only, so two runs + // belonging to different pipelines are still distinguishable. This + // mirrors the demo scenario where a stale rehearsal run from + // pipeline-A and a fresh user-driven run on pipeline-B share the + // queue. + const h = makeHarness(); + try { + const oldNow = "2026-04-29T12:00:00.000Z"; + const newNow = "2026-04-29T12:00:01.000Z"; + seedRun(h.db, "run-OLD", ["first"], oldNow); + seedSecondPipelineAndRun(h.db, "run-NEW", "alpha", newNow); + + const { body } = await getJson( + h.app, + "/next?run_id=run-NEW", + ); + if (!body.ok || !body.data) throw new Error("expected claim"); + expect(body.data.instructions).toBe("do alpha"); + } finally { + h.db.close(); + } + }); +}); + /* ---------- Envelope grep gate ---------- */ describe("envelope grep gate", () => { diff --git a/src/api/agent/sql.ts b/src/api/agent/sql.ts index ab64b95..e59586e 100644 --- a/src/api/agent/sql.ts +++ b/src/api/agent/sql.ts @@ -74,6 +74,39 @@ UPDATE subtask_instances RETURNING id, run_id, subtask_id, input_json, claim_token, expires_at `; +/** + * Run-scoped variant of {@link CLAIM_SQL}. Same shape; the inner SELECT + * adds `AND run_id = ?` so an agent can drive a specific run end-to-end + * without picking up unrelated stale claims when the global queue has + * other runs interleaved (common during demo rehearsals or when several + * publishers share the murmur instance — issue #75). + * + * Bound parameters: + * 1. claim_token (TEXT, fresh) + * 2. expires_at (TEXT, RFC 3339 UTC; now + ttlMs) + * 3. updated_at (TEXT, RFC 3339 UTC; now) + * 4. run_id (TEXT, the run to scope the pickup to) + * + * The `(status='ready', created_at)` part of the inner SELECT predicate + * matches `idx_subtask_instances_ready`; SQLite's planner tolerates the + * extra `AND run_id = ?` filter without dropping the index — the + * predicate is evaluated row-by-row off the leaf, which is fine because + * the leaf is already narrowed to `status='ready' AND claim_token IS NULL`. + */ +export const CLAIM_BY_RUN_SQL = ` +UPDATE subtask_instances + SET claim_token = ?, + expires_at = ?, + status = 'claimed', + updated_at = ? + WHERE id = ( + SELECT id FROM subtask_instances + WHERE claim_token IS NULL AND status = 'ready' AND run_id = ? + ORDER BY created_at LIMIT 1 + ) +RETURNING id, run_id, subtask_id, input_json, claim_token, expires_at +`; + /** * Atomic CAS submit. Bound parameters: * 1. updated_at (TEXT, RFC 3339 UTC; now) diff --git a/src/api/agent/work.ts b/src/api/agent/work.ts index bf1fa99..93a9ab1 100644 --- a/src/api/agent/work.ts +++ b/src/api/agent/work.ts @@ -42,6 +42,7 @@ import { newInstanceId } from "../publisher/ids.js"; import { CAS_SQL, + CLAIM_BY_RUN_SQL, CLAIM_SQL, INSERT_AGENT_ACTION_SQL, INSERT_RESULT_SQL, @@ -176,6 +177,7 @@ export function createWorkRoutes(options: CreateWorkRoutesOptions): Hono { // Compile the prepared statements once at construction time. Re-binding // happens on every call but the parse step runs once. const claimStmt = db.prepare(CLAIM_SQL); + const claimByRunStmt = db.prepare(CLAIM_BY_RUN_SQL); const casStmt = db.prepare(CAS_SQL); const insertResultStmt = db.prepare(INSERT_RESULT_SQL); const insertActionStmt = db.prepare(INSERT_AGENT_ACTION_SQL); @@ -194,6 +196,22 @@ export function createWorkRoutes(options: CreateWorkRoutesOptions): Hono { const expiresAt = new Date(Date.parse(now) + ttlMs).toISOString(); const token = claimTokenFn(); + // Optional run-scope filter (issue #75). When supplied, the inner + // SELECT in `CLAIM_BY_RUN_SQL` adds `AND run_id = ?` so the agent + // only picks up rows from that run. Useful for end-to-end demos + // where stale claims from earlier rehearsals would otherwise be + // FIFO'd ahead of the fresh run's pre-verify. The default (no + // param, or empty string) preserves the legacy global-FIFO + // behaviour. We treat an empty string the same as missing — Hono's + // `c.req.query` returns `undefined` for missing params, but a bare + // `?run_id=` URL surfaces as `""` and an empty filter would never + // match a real run id. + const runIdParam = c.req.query("run_id"); + const runIdFilter = + typeof runIdParam === "string" && runIdParam.length > 0 + ? runIdParam + : null; + // Atomic claim: BEGIN IMMEDIATE upgrades to RESERVED on entry; the // single UPDATE … RETURNING below is the only write inside the txn. // better-sqlite3 serialises connection-level writes via its native @@ -212,7 +230,12 @@ export function createWorkRoutes(options: CreateWorkRoutesOptions): Hono { db.exec("BEGIN IMMEDIATE"); let row: ClaimedRow | undefined; try { - row = claimStmt.get(token, expiresAt, now) as ClaimedRow | undefined; + row = + runIdFilter === null + ? (claimStmt.get(token, expiresAt, now) as ClaimedRow | undefined) + : (claimByRunStmt.get(token, expiresAt, now, runIdFilter) as + | ClaimedRow + | undefined); db.exec("COMMIT"); } catch (err) { try { diff --git a/src/mcp/server.test.ts b/src/mcp/server.test.ts index 43196cc..2e5026b 100644 --- a/src/mcp/server.test.ts +++ b/src/mcp/server.test.ts @@ -237,7 +237,15 @@ describe("MCP server — pull_task delegates to /work/next", () => { it("`pull_task` MCP call delegates to `/work/next` and returns the same shape (envelope intact)", async () => { const h = await makeInMemoryHarness(); try { - const result = await h.client.callTool({ name: TOOL_PULL_TASK }); + // Pass `arguments: {}` so the SDK's input-schema validator accepts + // the call (issue #75 added an optional `run_id` to the schema; the + // SDK requires `arguments` to be an object even when all fields + // are optional — passing `{}` is semantically identical to omitting + // arguments and exercises the same legacy global-FIFO path). + const result = await h.client.callTool({ + name: TOOL_PULL_TASK, + arguments: {}, + }); // The handler emits both a structuredContent (envelope) and a // matching textual rendering. We assert against the structured // form — that's the contract the host actually consumes. @@ -275,7 +283,11 @@ describe("MCP server — pull_task delegates to /work/next", () => { await client.connect(clientTransport); try { - const result = await client.callTool({ name: TOOL_PULL_TASK }); + // See note above about `arguments: {}` and the optional `run_id`. + const result = await client.callTool({ + name: TOOL_PULL_TASK, + arguments: {}, + }); const envelope = result.structuredContent as EnvelopeResponse; expect(envelope.ok).toBe(true); if (!envelope.ok) throw new Error("unreachable"); @@ -293,7 +305,10 @@ describe("MCP server — submit_result delegates to /work/{claim}/result", () => const h = await makeInMemoryHarness(); try { // First claim a task to obtain a claim token. - const pull = await h.client.callTool({ name: TOOL_PULL_TASK }); + const pull = await h.client.callTool({ + name: TOOL_PULL_TASK, + arguments: {}, + }); const pullEnv = pull.structuredContent as EnvelopeResponse<{ claim: string; }>; @@ -342,6 +357,116 @@ describe("MCP server — submit_result delegates to /work/{claim}/result", () => }); }); +describe("MCP server — pull_task with run_id filter (issue #75)", () => { + /** + * Build a harness that seeds two ready runs: an older "run-OLD" and a + * newer "run-NEW", both on the default test pipeline (so the projected + * payload is the same `Do the first thing.`). Without `run_id`, the + * legacy global FIFO would pick run-OLD; with `run_id: 'run-NEW'`, the + * agent must skip run-OLD and claim run-NEW's row. + */ + async function makeTwoRunHarness(): Promise<{ + client: Client; + db: Database.Database; + cleanup(): Promise; + }> { + const fixture = setupDb(); + const oldNow = "2026-04-29T11:59:00.000Z"; + const newNow = "2026-04-29T12:00:00.000Z"; + + function seed(runId: string, createdAt: string): void { + fixture.db + .prepare( + `INSERT INTO runs + (id, pipeline_id, pipeline_version, status, initial_input_json, + webhook_url, created_at) + VALUES (?, ?, ?, 'running', '{}', 'https://example.test/webhook', ?)`, + ) + .run(runId, TEST_PIPELINE_ID, TEST_PIPELINE_VERSION, createdAt); + fixture.db + .prepare( + `INSERT INTO subtask_instances + (id, run_id, subtask_id, status, input_json, created_at, updated_at) + VALUES (?, ?, ?, 'ready', '{}', ?, ?)`, + ) + .run(`${runId}-first`, runId, "first", createdAt, createdAt); + } + seed("run-OLD", oldNow); + seed("run-NEW", newNow); + + const agentApp = createAgentApp({ db: fixture.db }); + const server = new McpServer({ name: "murmur-test", version: "0.0.0" }); + registerMcpTools(server, { agentApp }); + const [c, s] = InMemoryTransport.createLinkedPair(); + await server.connect(s); + const client = new Client( + { name: "murmur-test-client", version: "0.0.0" }, + { capabilities: {} }, + ); + await client.connect(c); + return { + client, + db: fixture.db, + async cleanup() { + await client.close(); + await server.close(); + fixture.db.close(); + }, + }; + } + + it("`pull_task({ run_id: 'run-NEW' })` claims only that run's work, skipping older rows from other runs", async () => { + const h = await makeTwoRunHarness(); + try { + const result = await h.client.callTool({ + name: TOOL_PULL_TASK, + arguments: { run_id: "run-NEW" }, + }); + const envelope = result.structuredContent as EnvelopeResponse<{ + instructions: string; + claim: string; + }>; + expect(envelope.ok).toBe(true); + if (!envelope.ok || envelope.data === undefined || envelope.data === null) { + throw new Error("expected claim"); + } + // Verify by reading back from the DB which run was claimed. + const claimed = h.db + .prepare( + `SELECT run_id FROM subtask_instances WHERE claim_token = ?`, + ) + .get(envelope.data.claim) as { run_id: string }; + expect(claimed.run_id).toBe("run-NEW"); + + // run-OLD's row remains ready — not picked up despite being older. + const oldRow = h.db + .prepare( + `SELECT status FROM subtask_instances WHERE id = 'run-OLD-first'`, + ) + .get() as { status: string }; + expect(oldRow.status).toBe("ready"); + } finally { + await h.cleanup(); + } + }); + + it("`pull_task({ run_id: 'r_unknown' })` returns null even when the global queue is non-empty", async () => { + const h = await makeTwoRunHarness(); + try { + const result = await h.client.callTool({ + name: TOOL_PULL_TASK, + arguments: { run_id: "r_unknown" }, + }); + const envelope = result.structuredContent as EnvelopeResponse; + expect(envelope.ok).toBe(true); + if (!envelope.ok) throw new Error("unreachable"); + expect(envelope.data).toBeNull(); + } finally { + await h.cleanup(); + } + }); +}); + describe("MCP server — task_tool stub (M7 stand-in)", () => { it("`task_tool` returns `{ ok: false, errors: ['not_implemented'] }` until M7 lands", async () => { const h = await makeInMemoryHarness(); diff --git a/src/mcp/tools.ts b/src/mcp/tools.ts index eaa6e76..873876c 100644 --- a/src/mcp/tools.ts +++ b/src/mcp/tools.ts @@ -64,7 +64,7 @@ import type { EnvelopeResponse } from "@murmur/contracts-types"; * sentence. Stable for the demo; do not change without a DESIGN.md update. */ export const PULL_TASK_DESCRIPTION = - "Atomically claim the oldest unclaimed subtask instance across all pipelines. Returns `{ instructions, input, output_schema, claim }` or `null` when the queue is empty. Use the returned `claim` for any subsequent `task_tool` and `submit_result` calls."; + "Atomically claim the oldest unclaimed subtask instance. Pass `run_id` to scope the pickup to a specific run (useful for driving one pipeline end-to-end without touching unrelated queued work). Without `run_id`, picks the oldest ready subtask across all pipelines. Returns `{ instructions, input, output_schema, claim }` or `null` when the (filtered) queue is empty. Use the returned `claim` for any subsequent `task_tool` and `submit_result` calls."; /** * `submit_result` description (DESIGN.md §3.4 second bullet). @@ -91,6 +91,28 @@ export const TOOL_TASK_TOOL = "task_tool"; /* Tool input schemas (Zod, surfaces as JSON Schema in tools/list) */ /* -------------------------------------------------------------------------- */ +/** + * `pull_task` accepts an optional `run_id` to scope the claim pickup to + * a specific run (issue #75). Omitted → global FIFO over all ready + * subtasks (legacy default). Supplied → only ready subtasks of that run + * are eligible. + * + * The agent flow for a single demoed pipeline is: + * 1. Operator triggers `POST /pipelines/{id}/runs` and shares `run_id`. + * 2. Agent calls `pull_task({ run_id })` on a loop until it returns null. + * This avoids interleaving with stale work from earlier rehearsals or + * unrelated publishers without resorting to a server restart. + */ +export const PULL_TASK_INPUT_SHAPE = { + run_id: z + .string() + .min(1) + .optional() + .describe( + "Optional run id (e.g. `r_abc123`) to restrict the claim pickup to a specific run. Omit to claim the oldest ready subtask across all pipelines.", + ), +} as const; + /** * `submit_result` requires `claim` and `result`; `notes` is optional. * @@ -279,19 +301,59 @@ function isEnvelope(value: unknown): value is EnvelopeResponse { /* ---------------------------- pull_task ---------------------------------- */ function registerPullTask(server: McpServer, agentApp: Hono): void { - // Zero-argument tool: when `inputSchema` is omitted, the SDK's - // `ToolCallback` signature is `(extra) => CallToolResult` - // (NOT `(args, extra)`). See @modelcontextprotocol/sdk's - // `BaseToolCallback` discriminating on `Args extends undefined`. + // Issue #75: `pull_task` now accepts an optional `run_id`. With + // `inputSchema`, the SDK's `ToolCallback` signature becomes + // `(args, extra) => CallToolResult` (vs the no-schema variant's + // single-arg `(extra) =>`). + // + // Defensive shape-check: under some test harnesses (and historical + // SDK versions when the schema is fully-optional) the callback can + // be invoked with a single `(extra)` parameter instead of + // `(args, extra)` — the heads-up from issue #75. We discriminate by + // checking whether the first param has `RequestHandlerExtra`-like + // slots (`requestId`, `sendNotification`); if so, treat it as + // `extra` and `args` is `{}` (legacy zero-arg call). Otherwise + // it's the real Zod-parsed `args` object and `maybeExtra` is the + // `extra`. server.registerTool( TOOL_PULL_TASK, { description: PULL_TASK_DESCRIPTION, - // No inputSchema: zero-argument tool. + inputSchema: PULL_TASK_INPUT_SHAPE, }, - async (extra) => { + async ( + maybeArgs: { run_id?: string | undefined }, + maybeExtra: RequestHandlerExtra, + ) => { + // Defensive shape-check (issue #75 heads-up): some in-process test + // harnesses invoke a fully-optional-schema callback with a single + // `(extra)` argument instead of `(args, extra)`. Discriminate by + // checking whether the first param carries `RequestHandlerExtra` + // marker slots — if so, treat it as `extra` and synthesise an + // empty `args` object. + const firstArgIsExtra = + maybeArgs !== null && + typeof maybeArgs === "object" && + ("requestId" in maybeArgs || "sendNotification" in maybeArgs); + const args = firstArgIsExtra + ? ({} as { run_id?: string | undefined }) + : maybeArgs; + const extra = firstArgIsExtra + ? (maybeArgs as unknown as RequestHandlerExtra< + ServerRequest, + ServerNotification + >) + : maybeExtra; const auth = authHeaderFromExtra(extra); - const env = await callAgentApp(agentApp, "GET", "/next", auth); + const runId = + typeof args.run_id === "string" && args.run_id.length > 0 + ? args.run_id + : null; + const path = + runId === null + ? "/next" + : `/next?run_id=${encodeURIComponent(runId)}`; + const env = await callAgentApp(agentApp, "GET", path, auth); return envelopeResult(env); }, );