Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions packages/junior/src/chat/agent-dispatch/runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import { isRetryableTurnError } from "@/chat/runtime/turn";
import { scheduleDispatchCallback } from "./signing";
import {
getDispatchConversationId,
getDispatchDestinationLockId,
getDispatchStorageKey,
getDispatchTurnId,
isTerminalDispatchStatus,
Expand Down Expand Up @@ -192,14 +193,15 @@ export async function runAgentDispatchSlice(
}
let dispatch = claimedDispatch;

const conversationId = getDispatchConversationId(dispatch.destination);
const conversationId = getDispatchConversationId(dispatch);
const destinationLockId = getDispatchDestinationLockId(dispatch.destination);
const stateAdapter = getStateAdapter();
await stateAdapter.connect();
const conversationLock = await stateAdapter.acquireLock(
conversationId,
const destinationLock = await stateAdapter.acquireLock(
destinationLockId,
DISPATCH_SLICE_LEASE_MS,
);
if (!conversationLock) {
if (!destinationLock) {
await markDispatch({
dispatch,
status: "pending",
Expand Down Expand Up @@ -462,6 +464,6 @@ export async function runAgentDispatchSlice(
errorMessage: error instanceof Error ? error.message : String(error),
});
} finally {
await stateAdapter.releaseLock(conversationLock);
await stateAdapter.releaseLock(destinationLock);
}
}
11 changes: 9 additions & 2 deletions packages/junior/src/chat/agent-dispatch/store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,20 @@ function buildDispatchId(plugin: string, idempotencyKey: string): string {
return `dispatch_${digest}`;
}

/** Map a dispatch destination to the conversation lock and memory key it owns. */
export function getDispatchConversationId(
/** Map a dispatch destination to the lock key that serializes Slack delivery. */
export function getDispatchDestinationLockId(
destination: DispatchRecord["destination"],
): string {
return `slack:${destination.teamId}:${destination.channelId}`;
}

/** Return the isolated persisted conversation key for one dispatch run. */
export function getDispatchConversationId(
dispatch: Pick<DispatchRecord, "id">,
): string {
return `agent-dispatch:${dispatch.id}`;
}

/** Give dispatch slices stable turn ids for resumability and trace correlation. */
export function getDispatchTurnId(dispatchId: string): string {
return `dispatch:${dispatchId}`;
Expand Down
97 changes: 93 additions & 4 deletions packages/junior/tests/integration/agent-dispatch-runner.test.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import {
createOrGetDispatch,
getDispatchConversationId,
getDispatchDestinationLockId,
getDispatchRecord,
} from "@/chat/agent-dispatch/store";
import { runAgentDispatchSlice } from "@/chat/agent-dispatch/runner";
import { getPersistedThreadState } from "@/chat/runtime/thread-state";
import {
getPersistedThreadState,
persistThreadStateById,
} from "@/chat/runtime/thread-state";
import { RetryableTurnError } from "@/chat/runtime/turn";
import { coerceThreadConversationState } from "@/chat/state/conversation";
import { disconnectStateAdapter, getStateAdapter } from "@/chat/state/adapter";
import type { AssistantReply } from "@/chat/respond";
import { chatPostMessageOk } from "../fixtures/slack/factories/api";
Expand Down Expand Up @@ -70,11 +76,13 @@ describe("agent dispatch runner", () => {
metadata: { runId: "run-1" },
},
});
const dispatchConversationId = getDispatchConversationId(created.record);
const generateAssistantReply = vi.fn(async (_input, context) => {
expect(context.requester).toBeUndefined();
expect(context.authorizationFlowMode).toBe("disabled");
expect(context.correlation).toMatchObject({
conversationId: "slack:T123:C123",
conversationId: dispatchConversationId,
threadId: dispatchConversationId,
channelId: "C123",
teamId: "T123",
actorType: "system",
Expand Down Expand Up @@ -104,7 +112,7 @@ describe("agent dispatch runner", () => {
}),
]);
await expect(
getPersistedThreadState("slack:T123:C123"),
getPersistedThreadState(dispatchConversationId),
).resolves.toMatchObject({
conversation: {
messages: expect.arrayContaining([
Expand All @@ -125,6 +133,84 @@ describe("agent dispatch runner", () => {
]),
},
});
await expect(getPersistedThreadState("slack:T123:C123")).resolves.toEqual(
{},
);
});

it("starts dispatches without inherited destination conversation memory", async () => {
const destinationConversation = coerceThreadConversationState({
conversation: {
messages: [
{
id: "channel-message-1",
role: "user",
text: "Previous scheduled run failed with stale context.",
createdAtMs: Date.parse("2026-05-25T12:00:00.000Z"),
author: { userName: "alice" },
},
],
},
});
await persistThreadStateById("slack:T123:C123", {
conversation: destinationConversation,
});
queueSlackApiResponse("chat.postMessage", {
body: chatPostMessageOk({
channel: "C123",
ts: "1700000000.000003",
}),
});
const created = await createOrGetDispatch({
plugin: "scheduler",
nowMs: Date.parse("2026-05-26T12:00:00.000Z"),
options: {
idempotencyKey: "run-isolated-context",
destination: {
platform: "slack",
teamId: "T123",
channelId: "C123",
},
input: "Run the scheduled task.",
metadata: { runId: "run-isolated-context" },
},
});
const dispatchConversationId = getDispatchConversationId(created.record);
const generateAssistantReply = vi.fn(async (_input, context) => {
expect(context.conversationContext).toBeUndefined();
expect(context.piMessages).toEqual([]);
return createReply();
});

await runAgentDispatchSlice(
{
id: created.record.id,
expectedVersion: created.record.version,
},
{ generateAssistantReply },
);

const persistedDestination =
await getPersistedThreadState("slack:T123:C123");
expect(
coerceThreadConversationState(persistedDestination).messages.map(
(message) => message.id,
),
).toEqual(["channel-message-1"]);
await expect(
getPersistedThreadState(dispatchConversationId),
).resolves.toMatchObject({
conversation: {
messages: expect.arrayContaining([
expect.objectContaining({
id: `dispatch:${created.record.id}:user`,
}),
expect.objectContaining({
id: `dispatch:${created.record.id}:assistant`,
}),
]),
},
});
});

it("persists timeout resume state before scheduling the next slice", async () => {
Expand Down Expand Up @@ -236,7 +322,10 @@ describe("agent dispatch runner", () => {
});
const state = getStateAdapter();
await state.connect();
const lock = await state.acquireLock("slack:T123:C123", 5 * 60 * 1000);
const lock = await state.acquireLock(
getDispatchDestinationLockId(created.record.destination),
5 * 60 * 1000,
);
expect(lock).toBeTruthy();

try {
Expand Down
3 changes: 2 additions & 1 deletion specs/scheduler.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
## Metadata

- Created: 2026-05-18
- Last Edited: 2026-05-28
- Last Edited: 2026-05-31

## Purpose

Expand Down Expand Up @@ -48,6 +48,7 @@ The stored task must include:
The original user utterance may be retained for audit/debugging, but it must not be the sole execution input.

Slack destinations are conversations, not existing threads. A scheduled task may target the active Slack DM or channel, and scheduled output posts as a new message in that conversation.
Each scheduled execution gets fresh dispatch-scoped conversation state. The runner must not load destination-level Slack conversation history as prior thread context for a scheduled run that is creating its own new message.

The scheduler must distinguish conversation audience from visibility. A private direct conversation may use explicit user-delegated credentials for scheduled tools; a private group conversation or private channel is still multi-user and must not implicitly use one user's credentials. Unknown privacy or audience must fail closed for delegated user credentials.

Expand Down
10 changes: 6 additions & 4 deletions specs/trusted-plugin-dispatch.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
## Metadata

- Created: 2026-05-28
- Last Edited: 2026-05-28
- Last Edited: 2026-05-31

## Purpose

Expand Down Expand Up @@ -106,6 +106,8 @@ type Dispatch = {

Core derives and enforces system actor identity, auth mode, conversation identity, callback scheduling, timeout continuation, sandbox state persistence, delivery behavior, tool policy, logging, tracing, and redaction.

Dispatch conversation identity is scoped to the dispatch record, not to the Slack destination. A dispatch that posts a new Slack message must start with fresh persisted conversation state unless it is resuming the same dispatch id.

## Internal Callback

`agent.dispatch` persists a core-owned dispatch record, then fires a signed internal callback:
Expand Down Expand Up @@ -177,6 +179,7 @@ Lock order is always:
2. destination conversation lock

Code must not acquire those locks in reverse order. Stale recovery uses durable status, version, attempt, and lease fields rather than process memory.
The destination conversation lock serializes Slack delivery for the target conversation; dispatch conversation state remains isolated by dispatch id.

Dispatch leases are not renewed during a slice. Lease duration must exceed the maximum callback slice budget plus platform scheduling slack.

Expand All @@ -186,7 +189,7 @@ The internal callback runs a core-owned dispatched agent runner. The runner owns

- loading and claiming the dispatch record
- acquiring the destination conversation lock
- loading persisted conversation, artifact, sandbox, and channel config state
- loading dispatch-scoped persisted conversation/artifact/sandbox state and destination channel config state
- creating or reusing synthetic system-authored conversation messages
- building conversation context
- calling `generateAssistantReply`
Expand Down Expand Up @@ -224,8 +227,7 @@ Dispatched requests must not use the interactive Slack turn-resume route. Timeou

Continuation invariants:

- one stable conversation id and one stable turn id per dispatch
- turn id derived from dispatch id
- one stable conversation id and one stable turn id per dispatch, both derived from the dispatch id rather than the Slack destination
- duplicate callbacks must not run the same dispatch concurrently
- duplicate callbacks must not deliver assistant output twice
- timeout continuation preserves cumulative usage and duration
Expand Down
Loading