diff --git a/packages/junior/src/chat/agent-dispatch/runner.ts b/packages/junior/src/chat/agent-dispatch/runner.ts index de02b6fff..b2f227a6b 100644 --- a/packages/junior/src/chat/agent-dispatch/runner.ts +++ b/packages/junior/src/chat/agent-dispatch/runner.ts @@ -47,6 +47,7 @@ import { isRetryableTurnError } from "@/chat/runtime/turn"; import { scheduleDispatchCallback } from "./signing"; import { getDispatchConversationId, + getDispatchDestinationLockId, getDispatchStorageKey, getDispatchTurnId, isTerminalDispatchStatus, @@ -192,14 +193,15 @@ export async function runAgentDispatchSlice( } let dispatch = claimedDispatch; - const conversationId = getDispatchConversationId(dispatch.destination); + const conversationId = getDispatchConversationId(dispatch); + const destinationLockId = getDispatchDestinationLockId(dispatch.destination); const stateAdapter = getStateAdapter(); await stateAdapter.connect(); - const conversationLock = await stateAdapter.acquireLock( - conversationId, + const destinationLock = await stateAdapter.acquireLock( + destinationLockId, DISPATCH_SLICE_LEASE_MS, ); - if (!conversationLock) { + if (!destinationLock) { await markDispatch({ dispatch, status: "pending", @@ -462,6 +464,6 @@ export async function runAgentDispatchSlice( errorMessage: error instanceof Error ? error.message : String(error), }); } finally { - await stateAdapter.releaseLock(conversationLock); + await stateAdapter.releaseLock(destinationLock); } } diff --git a/packages/junior/src/chat/agent-dispatch/store.ts b/packages/junior/src/chat/agent-dispatch/store.ts index 43e38cce6..96145170a 100644 --- a/packages/junior/src/chat/agent-dispatch/store.ts +++ b/packages/junior/src/chat/agent-dispatch/store.ts @@ -56,13 +56,20 @@ function buildDispatchId(plugin: string, idempotencyKey: string): string { return `dispatch_${digest}`; } -/** Map a dispatch destination to the conversation lock and memory key it owns. */ -export function getDispatchConversationId( +/** Map a dispatch destination to the lock key that serializes Slack delivery. */ +export function getDispatchDestinationLockId( destination: DispatchRecord["destination"], ): string { return `slack:${destination.teamId}:${destination.channelId}`; } +/** Return the isolated persisted conversation key for one dispatch run. */ +export function getDispatchConversationId( + dispatch: Pick, +): string { + return `agent-dispatch:${dispatch.id}`; +} + /** Give dispatch slices stable turn ids for resumability and trace correlation. */ export function getDispatchTurnId(dispatchId: string): string { return `dispatch:${dispatchId}`; diff --git a/packages/junior/tests/integration/agent-dispatch-runner.test.ts b/packages/junior/tests/integration/agent-dispatch-runner.test.ts index dfd2b9ffb..2514dabcd 100644 --- a/packages/junior/tests/integration/agent-dispatch-runner.test.ts +++ b/packages/junior/tests/integration/agent-dispatch-runner.test.ts @@ -1,11 +1,17 @@ import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import { createOrGetDispatch, + getDispatchConversationId, + getDispatchDestinationLockId, getDispatchRecord, } from "@/chat/agent-dispatch/store"; import { runAgentDispatchSlice } from "@/chat/agent-dispatch/runner"; -import { getPersistedThreadState } from "@/chat/runtime/thread-state"; +import { + getPersistedThreadState, + persistThreadStateById, +} from "@/chat/runtime/thread-state"; import { RetryableTurnError } from "@/chat/runtime/turn"; +import { coerceThreadConversationState } from "@/chat/state/conversation"; import { disconnectStateAdapter, getStateAdapter } from "@/chat/state/adapter"; import type { AssistantReply } from "@/chat/respond"; import { chatPostMessageOk } from "../fixtures/slack/factories/api"; @@ -70,11 +76,13 @@ describe("agent dispatch runner", () => { metadata: { runId: "run-1" }, }, }); + const dispatchConversationId = getDispatchConversationId(created.record); const generateAssistantReply = vi.fn(async (_input, context) => { expect(context.requester).toBeUndefined(); expect(context.authorizationFlowMode).toBe("disabled"); expect(context.correlation).toMatchObject({ - conversationId: "slack:T123:C123", + conversationId: dispatchConversationId, + threadId: dispatchConversationId, channelId: "C123", teamId: "T123", actorType: "system", @@ -104,7 +112,7 @@ describe("agent dispatch runner", () => { }), ]); await expect( - getPersistedThreadState("slack:T123:C123"), + getPersistedThreadState(dispatchConversationId), ).resolves.toMatchObject({ conversation: { messages: expect.arrayContaining([ @@ -125,6 +133,84 @@ describe("agent dispatch runner", () => { ]), }, }); + await expect(getPersistedThreadState("slack:T123:C123")).resolves.toEqual( + {}, + ); + }); + + it("starts dispatches without inherited destination conversation memory", async () => { + const destinationConversation = coerceThreadConversationState({ + conversation: { + messages: [ + { + id: "channel-message-1", + role: "user", + text: "Previous scheduled run failed with stale context.", + createdAtMs: Date.parse("2026-05-25T12:00:00.000Z"), + author: { userName: "alice" }, + }, + ], + }, + }); + await persistThreadStateById("slack:T123:C123", { + conversation: destinationConversation, + }); + queueSlackApiResponse("chat.postMessage", { + body: chatPostMessageOk({ + channel: "C123", + ts: "1700000000.000003", + }), + }); + const created = await createOrGetDispatch({ + plugin: "scheduler", + nowMs: Date.parse("2026-05-26T12:00:00.000Z"), + options: { + idempotencyKey: "run-isolated-context", + destination: { + platform: "slack", + teamId: "T123", + channelId: "C123", + }, + input: "Run the scheduled task.", + metadata: { runId: "run-isolated-context" }, + }, + }); + const dispatchConversationId = getDispatchConversationId(created.record); + const generateAssistantReply = vi.fn(async (_input, context) => { + expect(context.conversationContext).toBeUndefined(); + expect(context.piMessages).toEqual([]); + return createReply(); + }); + + await runAgentDispatchSlice( + { + id: created.record.id, + expectedVersion: created.record.version, + }, + { generateAssistantReply }, + ); + + const persistedDestination = + await getPersistedThreadState("slack:T123:C123"); + expect( + coerceThreadConversationState(persistedDestination).messages.map( + (message) => message.id, + ), + ).toEqual(["channel-message-1"]); + await expect( + getPersistedThreadState(dispatchConversationId), + ).resolves.toMatchObject({ + conversation: { + messages: expect.arrayContaining([ + expect.objectContaining({ + id: `dispatch:${created.record.id}:user`, + }), + expect.objectContaining({ + id: `dispatch:${created.record.id}:assistant`, + }), + ]), + }, + }); }); it("persists timeout resume state before scheduling the next slice", async () => { @@ -236,7 +322,10 @@ describe("agent dispatch runner", () => { }); const state = getStateAdapter(); await state.connect(); - const lock = await state.acquireLock("slack:T123:C123", 5 * 60 * 1000); + const lock = await state.acquireLock( + getDispatchDestinationLockId(created.record.destination), + 5 * 60 * 1000, + ); expect(lock).toBeTruthy(); try { diff --git a/specs/scheduler.md b/specs/scheduler.md index 39006c2a7..39eec9b0d 100644 --- a/specs/scheduler.md +++ b/specs/scheduler.md @@ -3,7 +3,7 @@ ## Metadata - Created: 2026-05-18 -- Last Edited: 2026-05-28 +- Last Edited: 2026-05-31 ## Purpose @@ -48,6 +48,7 @@ The stored task must include: The original user utterance may be retained for audit/debugging, but it must not be the sole execution input. Slack destinations are conversations, not existing threads. A scheduled task may target the active Slack DM or channel, and scheduled output posts as a new message in that conversation. +Each scheduled execution gets fresh dispatch-scoped conversation state. The runner must not load destination-level Slack conversation history as prior thread context for a scheduled run that is creating its own new message. The scheduler must distinguish conversation audience from visibility. A private direct conversation may use explicit user-delegated credentials for scheduled tools; a private group conversation or private channel is still multi-user and must not implicitly use one user's credentials. Unknown privacy or audience must fail closed for delegated user credentials. diff --git a/specs/trusted-plugin-dispatch.md b/specs/trusted-plugin-dispatch.md index 00c60fabf..31617a548 100644 --- a/specs/trusted-plugin-dispatch.md +++ b/specs/trusted-plugin-dispatch.md @@ -3,7 +3,7 @@ ## Metadata - Created: 2026-05-28 -- Last Edited: 2026-05-28 +- Last Edited: 2026-05-31 ## Purpose @@ -106,6 +106,8 @@ type Dispatch = { Core derives and enforces system actor identity, auth mode, conversation identity, callback scheduling, timeout continuation, sandbox state persistence, delivery behavior, tool policy, logging, tracing, and redaction. +Dispatch conversation identity is scoped to the dispatch record, not to the Slack destination. A dispatch that posts a new Slack message must start with fresh persisted conversation state unless it is resuming the same dispatch id. + ## Internal Callback `agent.dispatch` persists a core-owned dispatch record, then fires a signed internal callback: @@ -177,6 +179,7 @@ Lock order is always: 2. destination conversation lock Code must not acquire those locks in reverse order. Stale recovery uses durable status, version, attempt, and lease fields rather than process memory. +The destination conversation lock serializes Slack delivery for the target conversation; dispatch conversation state remains isolated by dispatch id. Dispatch leases are not renewed during a slice. Lease duration must exceed the maximum callback slice budget plus platform scheduling slack. @@ -186,7 +189,7 @@ The internal callback runs a core-owned dispatched agent runner. The runner owns - loading and claiming the dispatch record - acquiring the destination conversation lock -- loading persisted conversation, artifact, sandbox, and channel config state +- loading dispatch-scoped persisted conversation/artifact/sandbox state and destination channel config state - creating or reusing synthetic system-authored conversation messages - building conversation context - calling `generateAssistantReply` @@ -224,8 +227,7 @@ Dispatched requests must not use the interactive Slack turn-resume route. Timeou Continuation invariants: -- one stable conversation id and one stable turn id per dispatch -- turn id derived from dispatch id +- one stable conversation id and one stable turn id per dispatch, both derived from the dispatch id rather than the Slack destination - duplicate callbacks must not run the same dispatch concurrently - duplicate callbacks must not deliver assistant output twice - timeout continuation preserves cumulative usage and duration