Skip to content

Commit f3fdff4

Browse files
dcramer and codex committed
fix(chat): Requeue runnable conversation work
Preserve runnable conversation state when a leased worker completes after needsRun was marked during execution. This keeps continuation and late work recovery from waiting on heartbeat repair.

Scope worker and heartbeat queue idempotency keys to a specific wake-up attempt so provider dedupe cannot suppress later legitimate recovery nudges.

Move deterministic worker, lease, mailbox, and timeout-resume coverage into component tests and document the layer boundary.

Refs GH-470
Co-Authored-By: Codex GPT-5 <noreply@openai.com>
1 parent 013a049 commit f3fdff4

14 files changed

Lines changed: 284 additions & 88 deletions

File tree

packages/junior/package.json

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -48,7 +48,7 @@
4848
"build": "tsup && tsc -p tsconfig.build.json --emitDeclarationOnly",
4949
"lint": "oxlint --config .oxlintrc.json --deny-warnings src tests scripts bin tsup.config.ts",
5050
"lint:fix": "oxlint --config .oxlintrc.json --deny-warnings --fix src tests scripts bin tsup.config.ts",
51-
"test": "pnpm run test:slack-boundary && pnpm run test:arch-boundary && vitest run",
51+
"test": "pnpm run test:slack-boundary && pnpm run test:arch-boundary && vitest run --maxWorkers=4",
5252
"test:watch": "vitest",
5353
"test:slack-boundary": "node scripts/check-slack-test-boundary.mjs",
5454
"test:arch-boundary": "depcruise --config .dependency-cruiser.mjs src/chat",

packages/junior/src/chat/task-execution/heartbeat.ts

Lines changed: 12 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -20,8 +20,9 @@ export interface ConversationWorkRecoveryResult {
2020
function heartbeatIdempotencyKey(
2121
reason: string,
2222
conversationId: string,
23+
nowMs: number,
2324
): string {
24-
return `heartbeat:${reason}:${conversationId}`;
25+
return `heartbeat:${reason}:${conversationId}:${nowMs}`;
2526
}
2627

2728
async function sendRecoveryNudge(args: {
@@ -79,7 +80,11 @@ export async function recoverConversationWork(args: {
7980
}
8081
await sendRecoveryNudge({
8182
conversationId,
82-
idempotencyKey: heartbeatIdempotencyKey("lease", conversationId),
83+
idempotencyKey: heartbeatIdempotencyKey(
84+
"lease",
85+
conversationId,
86+
args.nowMs,
87+
),
8388
nowMs: args.nowMs,
8489
queue: args.queue,
8590
state: args.state,
@@ -106,7 +111,11 @@ export async function recoverConversationWork(args: {
106111

107112
await sendRecoveryNudge({
108113
conversationId,
109-
idempotencyKey: heartbeatIdempotencyKey("pending", conversationId),
114+
idempotencyKey: heartbeatIdempotencyKey(
115+
"pending",
116+
conversationId,
117+
args.nowMs,
118+
),
110119
nowMs: args.nowMs,
111120
queue: args.queue,
112121
state: args.state,

packages/junior/src/chat/task-execution/store.ts

Lines changed: 4 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -753,7 +753,7 @@ export async function releaseConversationWork(args: {
753753
});
754754
}
755755

756-
/** Finish a leased conversation when no pending mailbox work remains. */
756+
/** Finish a leased conversation and report whether runnable work remains. */
757757
export async function completeConversationWork(args: {
758758
conversationId: string;
759759
leaseToken: string;
@@ -767,13 +767,14 @@ export async function completeConversationWork(args: {
767767
return "lost_lease";
768768
}
769769
const hasPending = pendingMessages(current).length > 0;
770+
const hasRunnableWork = current.needsRun || hasPending;
770771
await writeWorkState(state, {
771772
...current,
772773
lease: undefined,
773-
needsRun: current.needsRun || hasPending,
774+
needsRun: hasRunnableWork,
774775
updatedAtMs: nowMs,
775776
});
776-
return hasPending ? "pending" : "completed";
777+
return hasRunnableWork ? "pending" : "completed";
777778
});
778779
}
779780

packages/junior/src/chat/task-execution/worker.ts

Lines changed: 25 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -55,8 +55,12 @@ function now(options: ProcessConversationWorkOptions): number {
5555
return options.nowMs?.() ?? Date.now();
5656
}
5757

58-
function nudgeIdempotencyKey(reason: string, conversationId: string): string {
59-
return `${reason}:${conversationId}`;
58+
function nudgeIdempotencyKey(
59+
reason: string,
60+
conversationId: string,
61+
nowMs: number,
62+
): string {
63+
return `${reason}:${conversationId}:${nowMs}`;
6064
}
6165

6266
async function sendWakeNudge(args: {
@@ -145,11 +149,12 @@ export async function processConversationWork(
145149
return { status: "no_work" };
146150
}
147151
if (lease.status === "active") {
152+
const nudgeNowMs = now(options);
148153
await sendWakeNudge({
149154
conversationId,
150155
delayMs: CONVERSATION_WORK_DEFER_DELAY_MS,
151-
idempotencyKey: nudgeIdempotencyKey("active", conversationId),
152-
nowMs: now(options),
156+
idempotencyKey: nudgeIdempotencyKey("active", conversationId, nudgeNowMs),
157+
nowMs: nudgeNowMs,
153158
options,
154159
});
155160
logInfo(
@@ -206,25 +211,30 @@ export async function processConversationWork(
206211
try {
207212
const result = await options.run(workerContext);
208213
if (result.status === "yielded") {
214+
const yieldNowMs = now(options);
209215
const continuationMarked = await requestConversationContinuation({
210216
conversationId,
211217
leaseToken: lease.leaseToken,
212-
nowMs: now(options),
218+
nowMs: yieldNowMs,
213219
state: options.state,
214220
});
215221
if (!continuationMarked) {
216222
return { status: "lost_lease" };
217223
}
218224
await sendWakeNudge({
219225
conversationId,
220-
idempotencyKey: nudgeIdempotencyKey("yield", conversationId),
221-
nowMs: now(options),
226+
idempotencyKey: nudgeIdempotencyKey(
227+
"yield",
228+
conversationId,
229+
yieldNowMs,
230+
),
231+
nowMs: yieldNowMs,
222232
options,
223233
});
224234
await releaseConversationWork({
225235
conversationId,
226236
leaseToken: lease.leaseToken,
227-
nowMs: now(options),
237+
nowMs: yieldNowMs,
228238
state: options.state,
229239
});
230240
logInfo(
@@ -249,10 +259,15 @@ export async function processConversationWork(
249259
return { status: "lost_lease" };
250260
}
251261
if (completion === "pending") {
262+
const nudgeNowMs = now(options);
252263
await sendWakeNudge({
253264
conversationId,
254-
idempotencyKey: nudgeIdempotencyKey("pending", conversationId),
255-
nowMs: now(options),
265+
idempotencyKey: nudgeIdempotencyKey(
266+
"pending",
267+
conversationId,
268+
nudgeNowMs,
269+
),
270+
nowMs: nudgeNowMs,
256271
options,
257272
});
258273
return { status: "pending_requeued" };

packages/junior/tests/unit/runtime/timeout-resume.test.ts renamed to packages/junior/tests/component/runtime/timeout-resume.test.ts

Lines changed: 22 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -8,6 +8,17 @@ import type { ConversationWorkQueue } from "@/chat/task-execution/queue";
88
import { getConversationWorkState } from "@/chat/task-execution/store";
99
import { disconnectStateAdapter } from "@/chat/state/adapter";
1010

11+
const ORIGINAL_ENV = vi.hoisted(() => {
12+
const original = {
13+
JUNIOR_SECRET: process.env.JUNIOR_SECRET,
14+
JUNIOR_STATE_ADAPTER: process.env.JUNIOR_STATE_ADAPTER,
15+
SLACK_SIGNING_SECRET: process.env.SLACK_SIGNING_SECRET,
16+
};
17+
process.env.JUNIOR_STATE_ADAPTER = "memory";
18+
process.env.JUNIOR_SECRET = "resume-secret";
19+
return original;
20+
});
21+
1122
class FakeQueue implements ConversationWorkQueue {
1223
sent: Array<{
1324
conversationId: string;
@@ -45,9 +56,15 @@ function makeSignedResumeRequest(body: Record<string, unknown>): Request {
4556
});
4657
}
4758

48-
describe("timeout resume callback signing", () => {
49-
const originalSlackSigningSecret = process.env.SLACK_SIGNING_SECRET;
59+
function restoreEnv(name: string, value: string | undefined): void {
60+
if (value === undefined) {
61+
delete process.env[name];
62+
return;
63+
}
64+
process.env[name] = value;
65+
}
5066

67+
describe("timeout resume callback signing", () => {
5168
beforeEach(async () => {
5269
process.env.JUNIOR_STATE_ADAPTER = "memory";
5370
process.env.JUNIOR_SECRET = "resume-secret";
@@ -56,13 +73,9 @@ describe("timeout resume callback signing", () => {
5673

5774
afterEach(async () => {
5875
await disconnectStateAdapter();
59-
delete process.env.JUNIOR_STATE_ADAPTER;
60-
delete process.env.JUNIOR_SECRET;
61-
if (originalSlackSigningSecret === undefined) {
62-
delete process.env.SLACK_SIGNING_SECRET;
63-
} else {
64-
process.env.SLACK_SIGNING_SECRET = originalSlackSigningSecret;
65-
}
76+
restoreEnv("JUNIOR_STATE_ADAPTER", ORIGINAL_ENV.JUNIOR_STATE_ADAPTER);
77+
restoreEnv("JUNIOR_SECRET", ORIGINAL_ENV.JUNIOR_SECRET);
78+
restoreEnv("SLACK_SIGNING_SECRET", ORIGINAL_ENV.SLACK_SIGNING_SECRET);
6679
vi.restoreAllMocks();
6780
});
6881

packages/junior/tests/integration/conversation-work.test.ts renamed to packages/junior/tests/component/task-execution/conversation-work.test.ts

Lines changed: 78 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -198,7 +198,7 @@ describe("conversation work execution", () => {
198198
expect(queue.sent).toEqual([
199199
{
200200
conversationId: CONVERSATION_ID,
201-
idempotencyKey: `heartbeat:pending:${CONVERSATION_ID}`,
201+
idempotencyKey: `heartbeat:pending:${CONVERSATION_ID}:62000`,
202202
},
203203
]);
204204
});
@@ -243,30 +243,73 @@ describe("conversation work execution", () => {
243243
await expect(first).resolves.toEqual({ status: "completed" });
244244
});
245245

246-
it("preserves work requested while a lease is running", async () => {
246+
it("requeues work requested while a lease is running", async () => {
247247
const queue = new FakeQueue();
248+
let currentNowMs = 1_000;
248249
await appendInboundMessage({ message: inboundMessage("m1"), nowMs: 1_000 });
249250

250251
await expect(
251252
processConversationWork(CONVERSATION_ID, {
253+
nowMs: () => currentNowMs,
252254
queue,
253255
run: async (context) => {
254256
await context.drainMailbox(async () => {});
257+
currentNowMs = 2_000;
255258
await requestConversationWork({
256259
conversationId: context.conversationId,
257-
nowMs: 2_000,
260+
nowMs: currentNowMs,
258261
});
259262
return { status: "completed" };
260263
},
261264
}),
262-
).resolves.toEqual({ status: "completed" });
265+
).resolves.toEqual({ status: "pending_requeued" });
263266

264267
const state = await getConversationWorkState({
265268
conversationId: CONVERSATION_ID,
266269
});
267270
expect(state?.lease).toBeUndefined();
268271
expect(state?.needsRun).toBe(true);
269272
expect(state ? countPendingConversationMessages(state) : 0).toBe(0);
273+
expect(queue.sent).toMatchObject([
274+
{
275+
conversationId: CONVERSATION_ID,
276+
idempotencyKey: `pending:${CONVERSATION_ID}:2000`,
277+
},
278+
]);
279+
});
280+
281+
it("uses fresh queue idempotency keys for repeated worker requeues", async () => {
282+
const queue = new FakeQueue();
283+
let currentNowMs = 1_000;
284+
await requestConversationWork({
285+
conversationId: CONVERSATION_ID,
286+
nowMs: currentNowMs,
287+
});
288+
289+
async function runSlice(nowMs: number): Promise<void> {
290+
currentNowMs = nowMs;
291+
await expect(
292+
processConversationWork(CONVERSATION_ID, {
293+
nowMs: () => currentNowMs,
294+
queue,
295+
run: async (context) => {
296+
await requestConversationWork({
297+
conversationId: context.conversationId,
298+
nowMs: currentNowMs,
299+
});
300+
return { status: "completed" };
301+
},
302+
}),
303+
).resolves.toEqual({ status: "pending_requeued" });
304+
}
305+
306+
await runSlice(2_000);
307+
await runSlice(63_000);
308+
309+
expect(queue.sent.map((send) => send.idempotencyKey)).toEqual([
310+
`pending:${CONVERSATION_ID}:2000`,
311+
`pending:${CONVERSATION_ID}:63000`,
312+
]);
270313
});
271314

272315
it("drains pending messages and completes the leased conversation", async () => {
@@ -353,7 +396,7 @@ describe("conversation work execution", () => {
353396
expect(queue.sent).toMatchObject([
354397
{
355398
conversationId: CONVERSATION_ID,
356-
idempotencyKey: `heartbeat:lease:${CONVERSATION_ID}`,
399+
idempotencyKey: `heartbeat:lease:${CONVERSATION_ID}:92000`,
357400
},
358401
]);
359402
});
@@ -371,6 +414,29 @@ describe("conversation work execution", () => {
371414
expect(queue.sent).toHaveLength(1);
372415
});
373416

417+
it("uses fresh queue idempotency keys for repeated heartbeat recovery", async () => {
418+
const queue = new FakeQueue();
419+
await appendInboundMessage({ message: inboundMessage("m1"), nowMs: 1_000 });
420+
421+
await expect(
422+
recoverConversationWork({
423+
nowMs: 62_000,
424+
queue,
425+
}),
426+
).resolves.toEqual({ expiredLeaseCount: 0, pendingCount: 1 });
427+
await expect(
428+
recoverConversationWork({
429+
nowMs: 122_001,
430+
queue,
431+
}),
432+
).resolves.toEqual({ expiredLeaseCount: 0, pendingCount: 1 });
433+
434+
expect(queue.sent.map((send) => send.idempotencyKey)).toEqual([
435+
`heartbeat:pending:${CONVERSATION_ID}:62000`,
436+
`heartbeat:pending:${CONVERSATION_ID}:122001`,
437+
]);
438+
});
439+
374440
it("runs conversation work recovery from the core heartbeat", async () => {
375441
const queue = new FakeQueue();
376442
await appendInboundMessage({ message: inboundMessage("m1"), nowMs: 1_000 });
@@ -383,7 +449,7 @@ describe("conversation work execution", () => {
383449
expect(queue.sent).toEqual([
384450
{
385451
conversationId: CONVERSATION_ID,
386-
idempotencyKey: `heartbeat:pending:${CONVERSATION_ID}`,
452+
idempotencyKey: `heartbeat:pending:${CONVERSATION_ID}:62000`,
387453
},
388454
]);
389455
});
@@ -447,19 +513,22 @@ describe("conversation work execution", () => {
447513

448514
it("requeues instead of completing when final mailbox work remains", async () => {
449515
const queue = new FakeQueue();
516+
let currentNowMs = 1_000;
450517
await appendInboundMessage({ message: inboundMessage("m1"), nowMs: 1_000 });
451518

452519
await expect(
453520
processConversationWork(CONVERSATION_ID, {
521+
nowMs: () => currentNowMs,
454522
queue,
455523
run: async (context) => {
456524
await context.drainMailbox(async () => {});
525+
currentNowMs = 2_100;
457526
await appendInboundMessage({
458527
message: inboundMessage("m2", {
459528
createdAtMs: 2_000,
460529
receivedAtMs: 2_100,
461530
}),
462-
nowMs: 2_100,
531+
nowMs: currentNowMs,
463532
});
464533
return { status: "completed" };
465534
},
@@ -468,7 +537,7 @@ describe("conversation work execution", () => {
468537
expect(queue.sent).toMatchObject([
469538
{
470539
conversationId: CONVERSATION_ID,
471-
idempotencyKey: `pending:${CONVERSATION_ID}`,
540+
idempotencyKey: `pending:${CONVERSATION_ID}:2100`,
472541
},
473542
]);
474543
});
@@ -499,7 +568,7 @@ describe("conversation work execution", () => {
499568
expect(queue.sent).toMatchObject([
500569
{
501570
conversationId: CONVERSATION_ID,
502-
idempotencyKey: `yield:${CONVERSATION_ID}`,
571+
idempotencyKey: `yield:${CONVERSATION_ID}:242000`,
503572
},
504573
]);
505574
});

packages/junior/vitest.config.ts

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -23,6 +23,7 @@ for (const envRoot of [workspaceRoot, packageRoot]) {
2323
}
2424

2525
process.env.JUNIOR_SECRET = "junior-test-secret";
26+
process.env.JUNIOR_STATE_ADAPTER = "memory";
2627
process.env.JUNIOR_STATE_KEY_PREFIX ??= `junior:test:${process.pid}`;
2728

2829
export default defineConfig({

0 commit comments

Comments
 (0)