Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions apps/ade-cli/src/services/sync/syncHostService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3681,6 +3681,18 @@ export function createSyncHostService(args: SyncHostServiceArgs) {
peer.subscribedChatSessionIds.add(sessionId);

const session = args.sessionService.get(sessionId);
// Snapshots are byte-capped transcript tails — a long-running turn's
// `status: started` event can sit outside the tail, leaving a client
// that subscribes mid-turn unable to tell the session is streaming.
// Ship the live turn state on the ack so clients don't depend on the
// (slower) changeset pump for running/stop affordances. Resolved
// immediately before each send (getSessionSummary is microtask-only):
// computing it earlier leaves an I/O window (readTranscriptTail) where
// a terminal chat_event could overtake a stale `turnActive: true`.
const resolveLiveStatusFields = async (): Promise<{ turnActive?: boolean }> => {
const liveSummary = await args.agentChatService?.getSessionSummary(sessionId).catch(() => null);
return liveSummary ? { turnActive: liveSummary.status === "active" } : {};
};
const resumePlan = planChatEventResume(chatEventReplayBuffers.get(sessionId), payload?.sinceSeq);
if (resumePlan.mode === "replay") {
// The replay buffer covers everything the peer missed: skip the
Expand All @@ -3697,6 +3709,7 @@ export function createSyncHostService(args: SyncHostServiceArgs) {
truncated: false,
events: [],
resumed: true,
...(await resolveLiveStatusFields()),
};
sendRequired(peer, "chat_subscribe", resumeAck, envelope.requestId);
for (const entry of resumePlan.entries) {
Expand Down Expand Up @@ -3733,6 +3746,7 @@ export function createSyncHostService(args: SyncHostServiceArgs) {
capturedAt: nowIso(),
truncated: transcriptSize > maxBytes,
events,
...(await resolveLiveStatusFields()),
};
sendRequired(peer, "chat_subscribe", snapshot, envelope.requestId);
break;
Expand Down
4 changes: 4 additions & 0 deletions apps/ade-cli/src/services/sync/syncRemoteCommandService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2935,6 +2935,10 @@ function registerGitAndFileRemoteCommands({ args, register }: RemoteCommandRegis
requireService(args.gitService, "Git service not available.").rebaseContinue(parseConflictLaneArgs(payload, "git.rebaseContinue")));
register("git.rebaseAbort", { viewerAllowed: true, queueable: true }, async (payload) =>
requireService(args.gitService, "Git service not available.").rebaseAbort(parseConflictLaneArgs(payload, "git.rebaseAbort")));
register("git.mergeContinue", { viewerAllowed: true, queueable: true }, async (payload) =>
requireService(args.gitService, "Git service not available.").mergeContinue(parseConflictLaneArgs(payload, "git.mergeContinue")));
register("git.mergeAbort", { viewerAllowed: true, queueable: true }, async (payload) =>
requireService(args.gitService, "Git service not available.").mergeAbort(parseConflictLaneArgs(payload, "git.mergeAbort")));
register("git.listBranches", { viewerAllowed: true }, async (payload) =>
requireService(args.gitService, "Git service not available.").listBranches(parseGitListBranchesArgs(payload)));
register("git.checkoutBranch", { viewerAllowed: true, queueable: true }, async (payload) =>
Expand Down
53 changes: 52 additions & 1 deletion apps/desktop/src/main/services/chat/agentChatService.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ const mockState = vi.hoisted(() => ({
cursorSdkSendCalls: [] as Array<Record<string, unknown>>,
cursorSdkPolicyUpdates: [] as Array<Record<string, unknown>>,
cursorSdkPooled: null as any,
cursorSdkAgentIdForNextAcquire: null as string | null,
cursorSdkCloudRequests: [] as Array<{ type: string; payload: Record<string, unknown> }>,
cursorSdkCloudResponses: new Map<string, unknown>(),
cursorSendPromptGate: null as Promise<void> | null,
Expand Down Expand Up @@ -631,6 +632,8 @@ vi.mock("./cursorSdkPool", () => ({
),
acquireCursorSdkConnection: vi.fn(async (args: Record<string, unknown>) => {
mockState.cursorSdkAcquireCalls.push(args);
const agentId = mockState.cursorSdkAgentIdForNextAcquire ?? "cursor-sdk-agent-1";
mockState.cursorSdkAgentIdForNextAcquire = null;
const pooled: any = {
process: { exitCode: null, killed: false },
bridge: {
Expand All @@ -639,7 +642,7 @@ vi.mock("./cursorSdkPool", () => ({
onRunResult: null as any,
onHookRequest: null as any,
},
agentId: "cursor-sdk-agent-1",
agentId,
runId: null,
request: vi.fn(async (type: string, payload?: unknown) => {
if (type === "policy_update") {
Expand Down Expand Up @@ -1511,6 +1514,7 @@ beforeEach(() => {
mockState.cursorSdkSendCalls = [];
mockState.cursorSdkPolicyUpdates = [];
mockState.cursorSdkPooled = null;
mockState.cursorSdkAgentIdForNextAcquire = null;
mockState.cursorSdkCloudRequests = [];
mockState.cursorSdkCloudResponses = new Map<string, unknown>();
mockState.cursorSendPromptGate = null;
Expand Down Expand Up @@ -7014,6 +7018,53 @@ describe("createAgentChatService", () => {
expect(mockState.cursorSdkPooled.sendPrompt).toHaveBeenCalledTimes(1);
});

it("injects recent ADE context when Cursor SDK resume opens a new agent", async () => {
process.env.CURSOR_API_KEY = "cursor-test-key";
const { service } = createService();
const session = await service.createSession({
laneId: "lane-1",
provider: "cursor",
model: "composer-2",
modelId: "cursor/composer-2",
});

await service.runSessionTurn({
sessionId: session.id,
text: "Inspect the mobile files tab parity work.",
});
const firstPooled = mockState.cursorSdkPooled;
firstPooled.process.exitCode = 1;
mockState.cursorSdkAgentIdForNextAcquire = "cursor-sdk-agent-2";

await service.runSessionTurn({
sessionId: session.id,
text: "Did you finish the prior work?",
});

expect(mockState.cursorSdkAcquireCalls).toHaveLength(2);
expect(mockState.cursorSdkAcquireCalls[1]).toEqual(
expect.objectContaining({ agentId: "cursor-sdk-agent-1" }),
);
const promptText = String(mockState.cursorSdkSendCalls.at(-1)?.promptText ?? "");
expect(promptText).toContain("Cursor SDK continuity recovery");
expect(promptText).toContain("cursor-sdk-agent-1");
expect(promptText).toContain("cursor-sdk-agent-2");
expect(promptText).toContain("Recent Conversation Tail");
expect(promptText).toContain("User: Inspect the mobile files tab parity work.");
expect(promptText).toContain("Did you finish the prior work?");
// Prompts are prepared before the runtime is acquired, so the rotation
// turn itself stays deduped — but the rotated agent is brand new, so the
// lane execution directive must be re-emitted on the following turn
// instead of staying suppressed by lastLaneDirectiveKey.
expect(promptText).not.toContain("[ADE launch directive]");
await service.runSessionTurn({
sessionId: session.id,
text: "Continue with the next step.",
});
const postRotationPrompt = String(mockState.cursorSdkSendCalls.at(-1)?.promptText ?? "");
expect(postRotationPrompt).toContain("[ADE launch directive]");
});

it("reports active Droid SDK turns so project switching does not close the chat runtime", async () => {
const events: AgentChatEventEnvelope[] = [];
let finishTurn = () => {};
Expand Down
62 changes: 54 additions & 8 deletions apps/desktop/src/main/services/chat/agentChatService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7202,6 +7202,38 @@ export function createAgentChatService(args: {
managed.pendingReconstructionContext = nextContext.length ? nextContext : null;
};

const stageCursorSdkAgentRotationRecovery = (
managed: ManagedChatSession,
previousAgentId: string,
nextAgentId: string,
): void => {
const sections = [
[
"Cursor SDK continuity recovery",
`ADE attempted to resume Cursor SDK agent ${previousAgentId}, but the Cursor SDK opened agent ${nextAgentId} instead.`,
"Use this ADE transcript context to continue the user's work. Do not claim access to hidden Cursor SDK state that was not restored.",
].join("\n"),
];

if (managed.continuitySummary?.trim()) {
sections.push(["Continuity Summary", managed.continuitySummary.trim()].join("\n"));
}

const recentConversation = buildRecentConversationContext(managed);
if (recentConversation.length) {
sections.push(["Recent Conversation Tail", recentConversation].join("\n"));
}

const existing = managed.pendingReconstructionContext?.trim();
if (existing) sections.push(existing);

const nextContext = sections.map((section) => section.trim()).filter((section) => section.length > 0).join("\n\n");
managed.pendingReconstructionContext = nextContext.length ? nextContext : null;
// The rotated agent is brand new, so re-emit the lane execution directive on
// the next turn instead of letting the dedupe key suppress it.
clearLaneDirectiveKey(managed);
};

const detectAuth = async () => {
const snapshot = projectConfigService.get();
const configured = snapshot.effective.ai?.apiKeys;
Expand Down Expand Up @@ -9809,8 +9841,8 @@ export function createAgentChatService(args: {
return;
}

const preserveClaudeResumeState =
managed.runtime.kind === "claude" && reasonAllowsPreservation;
const preserveProviderResumeState =
(managed.runtime.kind === "claude" || managed.runtime.kind === "cursor") && reasonAllowsPreservation;
if (managed.runtime.kind === "codex") {
const runtime = managed.runtime;
const interruptedTurnId = runtime.activeTurnId ?? runtime.startedTurnId ?? null;
Expand Down Expand Up @@ -9868,7 +9900,7 @@ export function createAgentChatService(args: {
if (managed.runtime?.kind === "claude") {
// Mark interrupted so the streaming catch block takes the graceful path
managed.runtime.interrupted = true;
if (preserveClaudeResumeState) persistChatState(managed);
if (preserveProviderResumeState) persistChatState(managed);
cancelClaudeWarmup(managed, managed.runtime, "teardown");
try { managed.runtime.query?.close(); } catch { /* ignore */ }
managed.runtime.inputPump?.close();
Expand Down Expand Up @@ -9908,6 +9940,7 @@ export function createAgentChatService(args: {
cancelCursorPermissionWaiter(w, "Cursor tool approval was cancelled because the session closed.");
}
rt.permissionWaiters.clear();
if (preserveProviderResumeState) persistChatState(managed);
releaseCursorSdkConnection(rt.poolKey, rt.poolGeneration);
managed.runtime = null;
}
Expand All @@ -9920,8 +9953,8 @@ export function createAgentChatService(args: {
releaseDroidSdkConnection(rt.poolKey, rt.poolGeneration);
managed.runtime = null;
}
managed.runtimeInvalidated = !preserveClaudeResumeState;
if (!preserveClaudeResumeState) {
managed.runtimeInvalidated = !preserveProviderResumeState;
if (!preserveProviderResumeState) {
clearLaneDirectiveKey(managed);
}
};
Expand Down Expand Up @@ -19740,7 +19773,7 @@ export function createAgentChatService(args: {
killed: existing.sdk.process.killed,
connected: existing.sdk.process.connected,
});
teardownRuntime(managed, "handle_close");
teardownRuntime(managed, "pool_compaction");
} else {
existing.sdkPolicy = policy;
existing.currentModeId = displayModeId;
Expand Down Expand Up @@ -19836,12 +19869,25 @@ export function createAgentChatService(args: {
throw error;
}
const pooled = acquired.pooled;
const nextCursorSdkAgentId = pooled.agentId?.trim() || null;
if (
persistedCursorSdkAgentId
&& nextCursorSdkAgentId
&& nextCursorSdkAgentId !== persistedCursorSdkAgentId
) {
stageCursorSdkAgentRotationRecovery(managed, persistedCursorSdkAgentId, nextCursorSdkAgentId);
logger.warn("agent_chat.cursor_sdk_agent_rotated_after_resume", {
sessionId: managed.session.id,
previousAgentId: persistedCursorSdkAgentId,
nextAgentId: nextCursorSdkAgentId,
});
}
const rt: CursorRuntime = {
kind: "cursor",
poolKey,
poolGeneration: acquired.generation,
sdk: pooled,
sdkAgentId: pooled.agentId,
sdkAgentId: nextCursorSdkAgentId,
sdkRunId: pooled.runId,
sdkPolicy: policy,
sdkApprovedTools: new Set(),
Expand Down Expand Up @@ -19941,7 +19987,7 @@ export function createAgentChatService(args: {
const reconstructionContext = managed.pendingReconstructionContext?.trim() ?? "";
if (reconstructionContext.length) {
composed = [
"System context (CTO reconstruction, do not echo verbatim):",
"System context (ADE continuity, do not echo verbatim):",
reconstructionContext,
"",
composed,
Expand Down
105 changes: 105 additions & 0 deletions apps/desktop/src/main/services/sync/syncHostService.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1984,6 +1984,111 @@ describe.skipIf(!isCrsqliteAvailable())("syncHostService", () => {
await expect(clientB.queue.next("chat_event", 250)).rejects.toThrow(/Timed out waiting for chat_event/);
}, 15_000);

it("ships live turn state on the chat_subscribe ack so mid-turn subscribers see streaming state", async () => {
const brainDb = await openKvDb(makeDbPath("ade-sync-chat-turnstate-"), createLogger() as any);
const projectRoot = makeProjectRoot("ade-sync-chat-turnstate-project-");
const workspaceRoot = path.join(projectRoot, "workspace");
fs.mkdirSync(workspaceRoot, { recursive: true });
const chatService = createStubChatService();
// Snapshot tails can miss a long turn's `status: started` event, so the
// ack itself must carry the live "a turn is running" state.
chatService.service.getSessionSummary.mockImplementation(async (sessionId: string) =>
sessionId === "session-awaiting"
? { sessionId, status: "active", awaitingInput: true }
: { sessionId, status: "active" });

const host = createSyncHostService({
db: brainDb,
logger: createLogger() as any,
projectId: "project-1",
projectRoot,
port: 0,
fileService: createStubFileService(workspaceRoot) as any,
laneService: {
list: vi.fn().mockResolvedValue([]),
create: vi.fn(),
archive: vi.fn(),
} as any,
prService: {
listAll: vi.fn().mockResolvedValue([]),
refresh: vi.fn().mockResolvedValue([]),
listSnapshots: vi.fn().mockReturnValue([]),
getDetail: vi.fn(),
getStatus: vi.fn(),
getChecks: vi.fn(),
getReviews: vi.fn(),
getComments: vi.fn(),
getFiles: vi.fn(),
createFromLane: vi.fn(),
land: vi.fn(),
closePr: vi.fn(),
requestReviewers: vi.fn(),
} as any,
sessionService: {
list: () => [],
get: () => null,
readTranscriptTail: async () => "",
} as any,
ptyService: {
create: vi.fn(),
} as any,
agentChatService: chatService.service,
computerUseArtifactBrokerService: {
listArtifacts: () => [],
} as any,
pinStore: createStubPinStore(),
});
activeDisposers.push(async () => {
await host.dispose();
brainDb.close();
});

const port = await host.waitUntilListening();
const client = await connectClient({
port,
token: host.getBootstrapToken(),
deviceId: "peer-chat-turnstate",
deviceName: "Peer Chat Turn State",
siteId: brainDb.sync.getSiteId(),
dbVersion: brainDb.sync.getDbVersion(),
});
activeDisposers.push(client.close);

client.ws.send(encodeSyncEnvelope({
type: "chat_subscribe",
payload: { sessionId: "session-running" },
}));
const runningAck = await client.queue.next("chat_subscribe");
expect(runningAck.payload).toMatchObject({
sessionId: "session-running",
turnActive: true,
});

// Awaiting-input sessions still report an active turn — the turn is
// running, just paused on a prompt; clients keep their stop affordance.
client.ws.send(encodeSyncEnvelope({
type: "chat_subscribe",
payload: { sessionId: "session-awaiting" },
}));
const awaitingAck = await client.queue.next("chat_subscribe");
expect(awaitingAck.payload).toMatchObject({
sessionId: "session-awaiting",
turnActive: true,
});

// When the chat service has no summary for the session, the ack must
// omit the field rather than fabricate state — clients treat absence as
// "no live signal" and fall back to transcript-derived streaming state.
chatService.service.getSessionSummary.mockResolvedValue(null);
client.ws.send(encodeSyncEnvelope({
type: "chat_subscribe",
payload: { sessionId: "session-unknown" },
}));
const unknownAck = await client.queue.next("chat_subscribe");
expect(unknownAck.payload).toMatchObject({ sessionId: "session-unknown" });
expect(unknownAck.payload).not.toHaveProperty("turnActive");
}, 15_000);

it("resubscribes chat listeners after reconnect and routes chat remote commands", async () => {
const brainDb = await openKvDb(makeDbPath("ade-sync-chat-commands-"), createLogger() as any);
const projectRoot = makeProjectRoot("ade-sync-chat-commands-project-");
Expand Down
Loading
Loading