Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions DESIGN.md
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ Subtasks are pure decisions: `instructions + input → output`. No side effects

### 3.3 Server endpoints (agent-facing, behind MCP)

- `GET /work/next` — atomically claim the oldest unclaimed subtask instance across all pipelines, or 204. Implementation: single statement `UPDATE subtask_instances SET claim_token=?, expires_at=? WHERE id=(SELECT id FROM subtask_instances WHERE claim_token IS NULL AND status='ready' ORDER BY created_at LIMIT 1) RETURNING …` inside `BEGIN IMMEDIATE` on a WAL-mode SQLite. No SELECT-then-UPDATE race.
- `GET /work/next` — atomically claim the oldest unclaimed subtask instance across all pipelines, or 204. Implementation: single statement `UPDATE subtask_instances SET claim_token=?, expires_at=? WHERE id=(SELECT id FROM subtask_instances WHERE claim_token IS NULL AND status='ready' ORDER BY created_at LIMIT 1) RETURNING …` inside `BEGIN IMMEDIATE` on a WAL-mode SQLite. No SELECT-then-UPDATE race. Accepts an optional `?run_id=…` query param; when supplied, the inner SELECT adds `AND run_id = ?` so the agent only picks up subtasks of that run (used by Claude-Code-driven flows that drive a single run end-to-end without consuming unrelated stale work — issue #75). Without the param, the legacy global FIFO applies.
- `POST /work/{claim_token}/result` — submit a structured result. Validates against the subtask's `output_schema` and the claim is consumed atomically: `UPDATE subtask_instances SET result=?, status='done' WHERE claim_token=? AND status='claimed' AND expires_at>now() RETURNING …`. If the row no longer matches (TTL expired, already submitted), the call returns `{accepted: false, reason: 'claim_lost'}` and the agent's submission is discarded.

**Claim semantics (MVP):**
Expand All @@ -297,7 +297,7 @@ No retries on schema-validation failure for MVP — the run fails.

Three static tools, fixed for the lifetime of the connection:

- `pull_task()` → `{ instructions, input, output_schema, claim }` or `null`.
- `pull_task({ run_id?: string })` → `{ instructions, input, output_schema, claim }` or `null`. `run_id` is optional; when supplied, the claim is restricted to ready subtasks of that run (issue #75 — drives a single run end-to-end without picking up unrelated queued work). Forwards to `GET /work/next?run_id=…`.
- `submit_result(claim, result, notes?)` → `{ accepted: true } | { accepted: false, errors: [...] }`. `notes` is an optional free-text reflection persisted in the audit log alongside the structured `result`.
- `task_tool(subcommand: string, claim: string, args?: object)` → `string | object` — universal dispatcher. Invokes a publisher-declared subcommand for a claim (see §3.1). `claim` is required (no session-based fallback in MVP). Static description (visible to the host's tool catalog):

Expand Down
163 changes: 163 additions & 0 deletions src/api/agent/agent.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -759,6 +759,169 @@ describe("default seams", () => {
});
});

/* ---------- run_id-scoped claim (issue #75) ---------- */

describe("GET /work/next?run_id=... — scope claim to a specific run", () => {
/**
* Helper: seed a second pipeline + run (the default `TEST_PIPELINE_DEF`
* only declares `first` / `second`; the run-scope tests need a separate
* subtask id so we can probe pipeline-def lookup with a parallel run).
* Returns nothing; the caller observes via `/work/next`.
*/
function seedSecondPipelineAndRun(
db: Database.Database,
runId: string,
subtaskId: string,
createdAt: string,
): void {
const pipelineId = `${runId}-pipe`;
const def = {
id: pipelineId,
subtasks: [
{
id: subtaskId,
instructions: `do ${subtaskId}`,
output_schema: {
type: "object",
properties: { ok: { type: "boolean" } },
required: ["ok"],
additionalProperties: false,
},
},
],
};
db.prepare(
`INSERT INTO pipelines (id, version, def_json, created_at, updated_at)
VALUES (?, 1, ?, ?, ?)`,
).run(pipelineId, JSON.stringify(def), createdAt, createdAt);
db.prepare(
`INSERT INTO runs
(id, pipeline_id, pipeline_version, status, initial_input_json,
webhook_url, created_at)
VALUES (?, ?, 1, 'running', '{}', 'https://example.test/webhook', ?)`,
).run(runId, pipelineId, createdAt);
db.prepare(
`INSERT INTO subtask_instances
(id, run_id, subtask_id, status, input_json, created_at, updated_at)
VALUES (?, ?, ?, 'ready', '{}', ?, ?)`,
).run(`${runId}-${subtaskId}`, runId, subtaskId, createdAt, createdAt);
}

it("with no `run_id` claims oldest ready row globally (legacy behaviour)", async () => {
const h = makeHarness();
try {
// Two runs: run-OLD seeded first (older created_at), run-NEW seeded
// second (newer). Without run_id, the legacy global FIFO must pick
// run-OLD's `first` ahead of run-NEW.
const oldNow = "2026-04-29T12:00:00.000Z";
const newNow = "2026-04-29T12:00:01.000Z";
seedRun(h.db, "run-OLD", ["first", "second"], oldNow);
seedRun(h.db, "run-NEW", ["first", "second"], newNow);

const { status, body } = await getJson<NextWorkData>(h.app, "/next");
expect(status).toBe(200);
if (!body.ok || !body.data) throw new Error("expected claim");
// The claim's projected payload doesn't include run_id, but we can
// verify by reading back from the DB which instance was claimed.
const claimed = h.db
.prepare(
`SELECT run_id FROM subtask_instances WHERE claim_token = ?`,
)
.get(body.data.claim) as { run_id: string };
expect(claimed.run_id).toBe("run-OLD");
} finally {
h.db.close();
}
});

it("with `run_id=r_X` claims oldest ready row whose run_id == r_X, ignoring older rows from other runs", async () => {
const h = makeHarness();
try {
// Two runs: run-OLD is older + globally first in FIFO; run-NEW is
// newer. Asking for run-NEW's id MUST skip run-OLD even though it's
// older.
const oldNow = "2026-04-29T12:00:00.000Z";
const newNow = "2026-04-29T12:00:01.000Z";
seedRun(h.db, "run-OLD", ["first", "second"], oldNow);
seedRun(h.db, "run-NEW", ["first", "second"], newNow);

const { status, body } = await getJson<NextWorkData>(
h.app,
"/next?run_id=run-NEW",
);
expect(status).toBe(200);
if (!body.ok || !body.data) throw new Error("expected claim");
const claimed = h.db
.prepare(
`SELECT run_id, subtask_id FROM subtask_instances WHERE claim_token = ?`,
)
.get(body.data.claim) as { run_id: string; subtask_id: string };
expect(claimed.run_id).toBe("run-NEW");
expect(claimed.subtask_id).toBe("first");

// run-OLD's `first` must remain `ready` (not claimed by this call).
const oldStatus = h.db
.prepare(
`SELECT status FROM subtask_instances WHERE id = 'run-OLD-first'`,
)
.get() as { status: string };
expect(oldStatus.status).toBe("ready");
} finally {
h.db.close();
}
});

it("with `run_id=r_unknown` returns { ok: true, data: null }", async () => {
const h = makeHarness();
try {
// Even with a populated global queue, an unknown run_id matches no
// rows — must return null, not the default-FIFO row.
seedRun(h.db, "run-OLD", ["first"], h.nowFn());

const { status, body } = await getJson<NextWorkData | null>(
h.app,
"/next?run_id=r_unknown",
);
expect(status).toBe(200);
expect(body).toEqual({ ok: true, data: null });

// run-OLD's row stays ready (not consumed).
const row = h.db
.prepare(
`SELECT status FROM subtask_instances WHERE id = 'run-OLD-first'`,
)
.get() as { status: string };
expect(row.status).toBe("ready");
} finally {
h.db.close();
}
});

it("works across distinct pipelines — run_id filter is independent of pipeline_id", async () => {
// Sanity-check: the inner SELECT filters on run_id only, so two runs
// belonging to different pipelines are still distinguishable. This
// mirrors the demo scenario where a stale rehearsal run from
// pipeline-A and a fresh user-driven run on pipeline-B share the
// queue.
const h = makeHarness();
try {
const oldNow = "2026-04-29T12:00:00.000Z";
const newNow = "2026-04-29T12:00:01.000Z";
seedRun(h.db, "run-OLD", ["first"], oldNow);
seedSecondPipelineAndRun(h.db, "run-NEW", "alpha", newNow);

const { body } = await getJson<NextWorkData>(
h.app,
"/next?run_id=run-NEW",
);
if (!body.ok || !body.data) throw new Error("expected claim");
expect(body.data.instructions).toBe("do alpha");
} finally {
h.db.close();
}
});
});

/* ---------- Envelope grep gate ---------- */

describe("envelope grep gate", () => {
Expand Down
33 changes: 33 additions & 0 deletions src/api/agent/sql.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,39 @@ UPDATE subtask_instances
RETURNING id, run_id, subtask_id, input_json, claim_token, expires_at
`;

/**
* Run-scoped variant of {@link CLAIM_SQL}. Same shape; the inner SELECT
* adds `AND run_id = ?` so an agent can drive a specific run end-to-end
* without picking up unrelated stale claims when the global queue has
* other runs interleaved (common during demo rehearsals or when several
* publishers share the murmur instance — issue #75).
*
* Bound parameters:
* 1. claim_token (TEXT, fresh)
* 2. expires_at (TEXT, RFC 3339 UTC; now + ttlMs)
* 3. updated_at (TEXT, RFC 3339 UTC; now)
* 4. run_id (TEXT, the run to scope the pickup to)
*
* The `(status='ready', created_at)` part of the inner SELECT predicate
* matches `idx_subtask_instances_ready`; SQLite's planner tolerates the
* extra `AND run_id = ?` filter without dropping the index — the
* predicate is evaluated row-by-row off the leaf, which is fine because
* the leaf is already narrowed to `status='ready' AND claim_token IS NULL`.
*/
export const CLAIM_BY_RUN_SQL = `
UPDATE subtask_instances
SET claim_token = ?,
expires_at = ?,
status = 'claimed',
updated_at = ?
WHERE id = (
SELECT id FROM subtask_instances
WHERE claim_token IS NULL AND status = 'ready' AND run_id = ?
ORDER BY created_at LIMIT 1
)
RETURNING id, run_id, subtask_id, input_json, claim_token, expires_at
`;

/**
* Atomic CAS submit. Bound parameters:
* 1. updated_at (TEXT, RFC 3339 UTC; now)
Expand Down
25 changes: 24 additions & 1 deletion src/api/agent/work.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import { newInstanceId } from "../publisher/ids.js";

import {
CAS_SQL,
CLAIM_BY_RUN_SQL,
CLAIM_SQL,
INSERT_AGENT_ACTION_SQL,
INSERT_RESULT_SQL,
Expand Down Expand Up @@ -176,6 +177,7 @@ export function createWorkRoutes(options: CreateWorkRoutesOptions): Hono {
// Compile the prepared statements once at construction time. Re-binding
// happens on every call but the parse step runs once.
const claimStmt = db.prepare(CLAIM_SQL);
const claimByRunStmt = db.prepare(CLAIM_BY_RUN_SQL);
const casStmt = db.prepare(CAS_SQL);
const insertResultStmt = db.prepare(INSERT_RESULT_SQL);
const insertActionStmt = db.prepare(INSERT_AGENT_ACTION_SQL);
Expand All @@ -194,6 +196,22 @@ export function createWorkRoutes(options: CreateWorkRoutesOptions): Hono {
const expiresAt = new Date(Date.parse(now) + ttlMs).toISOString();
const token = claimTokenFn();

// Optional run-scope filter (issue #75). When supplied, the inner
// SELECT in `CLAIM_BY_RUN_SQL` adds `AND run_id = ?` so the agent
// only picks up rows from that run. Useful for end-to-end demos
// where stale claims from earlier rehearsals would otherwise be
// FIFO'd ahead of the fresh run's pre-verify. The default (no
// param, or empty string) preserves the legacy global-FIFO
// behaviour. We treat an empty string the same as missing — Hono's
// `c.req.query` returns `undefined` for missing params, but a bare
// `?run_id=` URL surfaces as `""` and an empty filter would never
// match a real run id.
const runIdParam = c.req.query("run_id");
const runIdFilter =
typeof runIdParam === "string" && runIdParam.length > 0
? runIdParam
: null;

// Atomic claim: BEGIN IMMEDIATE upgrades to RESERVED on entry; the
// single UPDATE … RETURNING below is the only write inside the txn.
// better-sqlite3 serialises connection-level writes via its native
Expand All @@ -212,7 +230,12 @@ export function createWorkRoutes(options: CreateWorkRoutesOptions): Hono {
db.exec("BEGIN IMMEDIATE");
let row: ClaimedRow | undefined;
try {
row = claimStmt.get(token, expiresAt, now) as ClaimedRow | undefined;
row =
runIdFilter === null
? (claimStmt.get(token, expiresAt, now) as ClaimedRow | undefined)
: (claimByRunStmt.get(token, expiresAt, now, runIdFilter) as
| ClaimedRow
| undefined);
db.exec("COMMIT");
} catch (err) {
try {
Expand Down
Loading
Loading