From de207dcac31e101ac674a7587ea330dfd0ab2506 Mon Sep 17 00:00:00 2001 From: David Cramer Date: Sun, 31 May 2026 08:12:22 -0700 Subject: [PATCH] fix(scheduler): Isolate scheduled dispatch context Scope dispatched agent conversation state to the dispatch record instead of the Slack destination. This lets scheduled runs that post new Slack messages start with fresh context while preserving destination-level delivery locking. Fixes GH-451 Co-Authored-By: GPT-5 Codex --- .../junior/src/chat/agent-dispatch/runner.ts | 12 ++- .../junior/src/chat/agent-dispatch/store.ts | 11 ++- .../integration/agent-dispatch-runner.test.ts | 97 ++++++++++++++++++- specs/scheduler.md | 3 +- specs/trusted-plugin-dispatch.md | 10 +- 5 files changed, 117 insertions(+), 16 deletions(-) diff --git a/packages/junior/src/chat/agent-dispatch/runner.ts b/packages/junior/src/chat/agent-dispatch/runner.ts index de02b6fff..b2f227a6b 100644 --- a/packages/junior/src/chat/agent-dispatch/runner.ts +++ b/packages/junior/src/chat/agent-dispatch/runner.ts @@ -47,6 +47,7 @@ import { isRetryableTurnError } from "@/chat/runtime/turn"; import { scheduleDispatchCallback } from "./signing"; import { getDispatchConversationId, + getDispatchDestinationLockId, getDispatchStorageKey, getDispatchTurnId, isTerminalDispatchStatus, @@ -192,14 +193,15 @@ export async function runAgentDispatchSlice( } let dispatch = claimedDispatch; - const conversationId = getDispatchConversationId(dispatch.destination); + const conversationId = getDispatchConversationId(dispatch); + const destinationLockId = getDispatchDestinationLockId(dispatch.destination); const stateAdapter = getStateAdapter(); await stateAdapter.connect(); - const conversationLock = await stateAdapter.acquireLock( - conversationId, + const destinationLock = await stateAdapter.acquireLock( + destinationLockId, DISPATCH_SLICE_LEASE_MS, ); - if (!conversationLock) { + if (!destinationLock) { await markDispatch({ dispatch, status: "pending", @@ -462,6 +464,6 @@ export async function runAgentDispatchSlice( errorMessage: error instanceof Error ? error.message : String(error), }); } finally { - await stateAdapter.releaseLock(conversationLock); + await stateAdapter.releaseLock(destinationLock); } } diff --git a/packages/junior/src/chat/agent-dispatch/store.ts b/packages/junior/src/chat/agent-dispatch/store.ts index 43e38cce6..96145170a 100644 --- a/packages/junior/src/chat/agent-dispatch/store.ts +++ b/packages/junior/src/chat/agent-dispatch/store.ts @@ -56,13 +56,20 @@ function buildDispatchId(plugin: string, idempotencyKey: string): string { return `dispatch_${digest}`; } -/** Map a dispatch destination to the conversation lock and memory key it owns. */ -export function getDispatchConversationId( +/** Map a dispatch destination to the lock key that serializes Slack delivery. */ +export function getDispatchDestinationLockId( destination: DispatchRecord["destination"], ): string { return `slack:${destination.teamId}:${destination.channelId}`; } +/** Return the isolated persisted conversation key for one dispatch run. */ +export function getDispatchConversationId( + dispatch: Pick, +): string { + return `agent-dispatch:${dispatch.id}`; +} + /** Give dispatch slices stable turn ids for resumability and trace correlation. */ export function getDispatchTurnId(dispatchId: string): string { return `dispatch:${dispatchId}`; diff --git a/packages/junior/tests/integration/agent-dispatch-runner.test.ts b/packages/junior/tests/integration/agent-dispatch-runner.test.ts index dfd2b9ffb..2514dabcd 100644 --- a/packages/junior/tests/integration/agent-dispatch-runner.test.ts +++ b/packages/junior/tests/integration/agent-dispatch-runner.test.ts @@ -1,11 +1,17 @@ import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import { createOrGetDispatch, + getDispatchConversationId, + getDispatchDestinationLockId, getDispatchRecord, } from "@/chat/agent-dispatch/store"; import { runAgentDispatchSlice } from "@/chat/agent-dispatch/runner"; -import { getPersistedThreadState } from "@/chat/runtime/thread-state"; +import { + getPersistedThreadState, + persistThreadStateById, +} from "@/chat/runtime/thread-state"; import { RetryableTurnError } from "@/chat/runtime/turn"; +import { coerceThreadConversationState } from "@/chat/state/conversation"; import { disconnectStateAdapter, getStateAdapter } from "@/chat/state/adapter"; import type { AssistantReply } from "@/chat/respond"; import { chatPostMessageOk } from "../fixtures/slack/factories/api"; @@ -70,11 +76,13 @@ describe("agent dispatch runner", () => { metadata: { runId: "run-1" }, }, }); + const dispatchConversationId = getDispatchConversationId(created.record); const generateAssistantReply = vi.fn(async (_input, context) => { expect(context.requester).toBeUndefined(); expect(context.authorizationFlowMode).toBe("disabled"); expect(context.correlation).toMatchObject({ - conversationId: "slack:T123:C123", + conversationId: dispatchConversationId, + threadId: dispatchConversationId, channelId: "C123", teamId: "T123", actorType: "system", @@ -104,7 +112,7 @@ describe("agent dispatch runner", () => { }), ]); await expect( - getPersistedThreadState("slack:T123:C123"), + getPersistedThreadState(dispatchConversationId), ).resolves.toMatchObject({ conversation: { messages: expect.arrayContaining([ @@ -125,6 +133,84 @@ describe("agent dispatch runner", () => { ]), }, }); + await expect(getPersistedThreadState("slack:T123:C123")).resolves.toEqual( + {}, + ); + }); + + it("starts dispatches without inherited destination conversation memory", async () => { + const destinationConversation = coerceThreadConversationState({ + conversation: { + messages: [ + { + id: "channel-message-1", + role: "user", + text: "Previous scheduled run failed with stale context.", + createdAtMs: Date.parse("2026-05-25T12:00:00.000Z"), + author: { userName: "alice" }, + }, + ], + }, + }); + await persistThreadStateById("slack:T123:C123", { + conversation: destinationConversation, + }); + queueSlackApiResponse("chat.postMessage", { + body: chatPostMessageOk({ + channel: "C123", + ts: "1700000000.000003", + }), + }); + const created = await createOrGetDispatch({ + plugin: "scheduler", + nowMs: Date.parse("2026-05-26T12:00:00.000Z"), + options: { + idempotencyKey: "run-isolated-context", + destination: { + platform: "slack", + teamId: "T123", + channelId: "C123", + }, + input: "Run the scheduled task.", + metadata: { runId: "run-isolated-context" }, + }, + }); + const dispatchConversationId = getDispatchConversationId(created.record); + const generateAssistantReply = vi.fn(async (_input, context) => { + expect(context.conversationContext).toBeUndefined(); + expect(context.piMessages).toEqual([]); + return createReply(); + }); + + await runAgentDispatchSlice( + { + id: created.record.id, + expectedVersion: created.record.version, + }, + { generateAssistantReply }, + ); + + const persistedDestination = + await getPersistedThreadState("slack:T123:C123"); + expect( + coerceThreadConversationState(persistedDestination).messages.map( + (message) => message.id, + ), + ).toEqual(["channel-message-1"]); + await expect( + getPersistedThreadState(dispatchConversationId), + ).resolves.toMatchObject({ + conversation: { + messages: expect.arrayContaining([ + expect.objectContaining({ + id: `dispatch:${created.record.id}:user`, + }), + expect.objectContaining({ + id: `dispatch:${created.record.id}:assistant`, + }), + ]), + }, + }); }); it("persists timeout resume state before scheduling the next slice", async () => { @@ -236,7 +322,10 @@ describe("agent dispatch runner", () => { }); const state = getStateAdapter(); await state.connect(); - const lock = await state.acquireLock("slack:T123:C123", 5 * 60 * 1000); + const lock = await state.acquireLock( + getDispatchDestinationLockId(created.record.destination), + 5 * 60 * 1000, + ); expect(lock).toBeTruthy(); try { diff --git a/specs/scheduler.md b/specs/scheduler.md index 39006c2a7..39eec9b0d 100644 --- a/specs/scheduler.md +++ b/specs/scheduler.md @@ -3,7 +3,7 @@ ## Metadata - Created: 2026-05-18 -- Last Edited: 2026-05-28 +- Last Edited: 2026-05-31 ## Purpose @@ -48,6 +48,7 @@ The stored task must include: The original user utterance may be retained for audit/debugging, but it must not be the sole execution input. Slack destinations are conversations, not existing threads. A scheduled task may target the active Slack DM or channel, and scheduled output posts as a new message in that conversation. +Each scheduled execution gets fresh dispatch-scoped conversation state. The runner must not load destination-level Slack conversation history as prior thread context for a scheduled run that is creating its own new message. The scheduler must distinguish conversation audience from visibility. A private direct conversation may use explicit user-delegated credentials for scheduled tools; a private group conversation or private channel is still multi-user and must not implicitly use one user's credentials. Unknown privacy or audience must fail closed for delegated user credentials. diff --git a/specs/trusted-plugin-dispatch.md b/specs/trusted-plugin-dispatch.md index 00c60fabf..31617a548 100644 --- a/specs/trusted-plugin-dispatch.md +++ b/specs/trusted-plugin-dispatch.md @@ -3,7 +3,7 @@ ## Metadata - Created: 2026-05-28 -- Last Edited: 2026-05-28 +- Last Edited: 2026-05-31 ## Purpose @@ -106,6 +106,8 @@ type Dispatch = { Core derives and enforces system actor identity, auth mode, conversation identity, callback scheduling, timeout continuation, sandbox state persistence, delivery behavior, tool policy, logging, tracing, and redaction. +Dispatch conversation identity is scoped to the dispatch record, not to the Slack destination. A dispatch that posts a new Slack message must start with fresh persisted conversation state unless it is resuming the same dispatch id. + ## Internal Callback `agent.dispatch` persists a core-owned dispatch record, then fires a signed internal callback: @@ -177,6 +179,7 @@ Lock order is always: 2. destination conversation lock Code must not acquire those locks in reverse order. Stale recovery uses durable status, version, attempt, and lease fields rather than process memory. +The destination conversation lock serializes Slack delivery for the target conversation; dispatch conversation state remains isolated by dispatch id. Dispatch leases are not renewed during a slice. Lease duration must exceed the maximum callback slice budget plus platform scheduling slack. @@ -186,7 +189,7 @@ The internal callback runs a core-owned dispatched agent runner. The runner owns - loading and claiming the dispatch record - acquiring the destination conversation lock -- loading persisted conversation, artifact, sandbox, and channel config state +- loading dispatch-scoped persisted conversation/artifact/sandbox state and destination channel config state - creating or reusing synthetic system-authored conversation messages - building conversation context - calling `generateAssistantReply` @@ -224,8 +227,7 @@ Dispatched requests must not use the interactive Slack turn-resume route. Timeou Continuation invariants: -- one stable conversation id and one stable turn id per dispatch -- turn id derived from dispatch id +- one stable conversation id and one stable turn id per dispatch, both derived from the dispatch id rather than the Slack destination - duplicate callbacks must not run the same dispatch concurrently - duplicate callbacks must not deliver assistant output twice - timeout continuation preserves cumulative usage and duration