Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 64 additions & 19 deletions src/api/handlers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import type { EngineEvent, EventSink } from "../engine/types.ts";
import { ingestFiles, type UploadedFile } from "../files/ingest.ts";
import { createFileStore } from "../files/store.ts";
import type { IdentityProvider, UserIdentity } from "../identity/provider.ts";
import { RunInProgressError } from "../runtime/errors.ts";
import { type RequestContext, runWithRequestContext } from "../runtime/request-context.ts";
import type { Runtime } from "../runtime/runtime.ts";
import type { ChatRequest } from "../runtime/types.ts";
Expand All @@ -32,11 +33,31 @@ export async function handleChat(
const parsed = await parseChatBody(request, runtime, features, identity, workspaceId);
if (parsed instanceof Response) return parsed;

const result = await runtime.chat(parsed);
return json({
...result,
...(parsed.workspaceId ? { workspaceId: parsed.workspaceId } : {}),
});
if (parsed.conversationId && runtime.isConversationActive(parsed.conversationId)) {
return runInProgressResponse(parsed.conversationId);
}

try {
const result = await runtime.chat(parsed);
return json({
...result,
...(parsed.workspaceId ? { workspaceId: parsed.workspaceId } : {}),
});
} catch (err) {
if (err instanceof RunInProgressError) {
return runInProgressResponse(err.conversationId);
}
throw err;
}
}

/**
 * Build the 409 Conflict response returned when a chat request targets a
 * conversation that already has an active run in flight.
 */
function runInProgressResponse(conversationId: string): Response {
  const message =
    "This conversation already has an active response. Wait for it to finish before sending another message.";
  return apiError(409, "run_in_progress", message, { conversationId });
}

/** Handle POST /v1/chat/stream — SSE streaming chat request. */
Expand All @@ -51,22 +72,12 @@ export async function handleChatStream(
const parsed = await parseChatBody(request, runtime, features, identity, workspaceId);
if (parsed instanceof Response) return parsed;

// Broadcast user.message to other participants (only for existing conversations)
const convId = parsed.conversationId;
if (convId && conversationEventManager && identity) {
conversationEventManager.broadcastToConversation(
convId,
"user.message",
{
userId: identity.id,
displayName: identity.displayName,
content: parsed.message,
timestamp: new Date().toISOString(),
},
identity.id,
);
if (parsed.conversationId && runtime.isConversationActive(parsed.conversationId)) {
return runInProgressResponse(parsed.conversationId);
}

const convId = parsed.conversationId;

const sink = new CallbackEventSink();
let markClosed: () => void;
const stream = new ReadableStream<Uint8Array>({
Expand All @@ -87,6 +98,29 @@ export async function handleChatStream(
controller.close();
};

// Defer the cross-participant user.message broadcast until the engine
// confirms the run actually started (first chat.start). If the call
// rejects with RunInProgressError, no broadcast fires and other
// participants never see a phantom message with no assistant reply.
let userMessageBroadcast = false;
const broadcastUserMessageOnce = () => {
if (userMessageBroadcast) return;
userMessageBroadcast = true;
if (convId && conversationEventManager && identity) {
conversationEventManager.broadcastToConversation(
convId,
"user.message",
{
userId: identity.id,
displayName: identity.displayName,
content: parsed.message,
timestamp: new Date().toISOString(),
},
identity.id,
);
}
};

const unsubscribe = sink.subscribe((event: EngineEvent) => {
if (
event.type === "chat.start" ||
Expand All @@ -96,6 +130,9 @@ export async function handleChatStream(
event.type === "llm.done" ||
event.type === "data.changed"
) {
if (event.type === "chat.start") {
broadcastUserMessageOnce();
}
send(event.type, event.data);
// Broadcast to other participants watching this conversation
if (convId && conversationEventManager && identity) {
Expand Down Expand Up @@ -139,6 +176,14 @@ export async function handleChatStream(
finish();
})
.catch((err) => {
if (err instanceof RunInProgressError) {
send("error", {
error: "run_in_progress",
message: "This conversation already has an active response.",
});
finish();
return;
}
console.error("[routes] handleChatStream failed:", err);
const raw = err instanceof Error ? err.message : String(err);
const friendly = friendlyError(raw);
Expand Down
8 changes: 8 additions & 0 deletions src/runtime/errors.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
/**
 * Thrown when a chat request arrives for a conversation that already has an
 * active run. Handlers match on `instanceof RunInProgressError` and translate
 * it into a 409 `run_in_progress` API error.
 */
export class RunInProgressError extends Error {
  /** Stable machine-readable error code surfaced to API clients. */
  readonly code = "run_in_progress";

  constructor(public readonly conversationId: string) {
    super(`Conversation ${conversationId} already has an active run`);
    // Restore the prototype chain explicitly: when `Error` is subclassed and
    // the compile target is below ES2015, `super()` can reset the prototype,
    // which silently breaks the `instanceof` checks the handlers rely on.
    // No-op on modern targets.
    Object.setPrototypeOf(this, new.target.prototype);
    this.name = "RunInProgressError";
  }
}
32 changes: 32 additions & 0 deletions src/runtime/runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ import type { ToolRegistry } from "../tools/registry.ts";
import { createSystemTools } from "../tools/system-tools.ts";
import { isResourceReader, type ResourceData, type ResourceReader } from "../tools/types.ts";
import { WorkspaceStore } from "../workspace/workspace-store.ts";
import { RunInProgressError } from "./errors.ts";
import { PlacementRegistry } from "./placement-registry.ts";
import {
getRequestContext,
Expand Down Expand Up @@ -181,6 +182,19 @@ export class Runtime {
| null = null;
private skillResourceCache = new Map<string, { content: string; fetchedAt: number }>();
private static readonly SKILL_CACHE_TTL = 5 * 60 * 1000; // 5 minutes
/**
* Conversation IDs with an in-flight chat() call. Prevents concurrent runs on
* the same conversation.
*
* Scope: single-process / single-pod. Correct today because each tenant runs
* with `platform.replicas: 1` — all chat traffic for a conversation lands on
* the same Runtime instance. If a tenant is ever scaled to multiple replicas,
* this lock stops being authoritative (concurrent requests can land on
* different pods) and this invariant needs to move to a shared store. The
* conversation JSONL on the shared PVC has the same single-writer assumption,
* so the two would need to be addressed together.
*/
private readonly activeConversations = new Set<string>();

private constructor(
_engine: AgentEngine,
Expand Down Expand Up @@ -481,8 +495,26 @@ export class Runtime {
return rt;
}

/**
 * True if a chat() is currently in flight on this conversation.
 *
 * Backed by the in-process `activeConversations` set, so the answer is only
 * authoritative for this Runtime instance (see the single-replica note on
 * that field). Handlers use this as a cheap pre-check before parsing a full
 * request body; chat() itself re-checks before acquiring the lock.
 */
isConversationActive(conversationId: string): boolean {
  return this.activeConversations.has(conversationId);
}

/** Process a chat message. Optional per-request EventSink for SSE streaming. */
async chat(request: ChatRequest, requestSink?: EventSink): Promise<ChatResult> {
const lockedConvId = request.conversationId;
if (lockedConvId && this.activeConversations.has(lockedConvId)) {
throw new RunInProgressError(lockedConvId);
}
if (lockedConvId) this.activeConversations.add(lockedConvId);
try {
return await this._chatInner(request, requestSink);
} finally {
if (lockedConvId) this.activeConversations.delete(lockedConvId);
}
}

private async _chatInner(request: ChatRequest, requestSink?: EventSink): Promise<ChatResult> {
if (!request.workspaceId) {
throw new Error("workspaceId is required. Every chat request must be workspace-scoped.");
}
Expand Down
21 changes: 14 additions & 7 deletions test/integration/api-integration.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ describe("integration: windowing under load", () => {

});

it("concurrent requests on a long conversation do not crash", async () => {
it("concurrent requests on a long conversation are rejected cleanly", async () => {
// Create a conversation with some history
const firstRes = await fetch(`${baseUrl}/v1/chat`, {
method: "POST",
Expand All @@ -304,7 +304,8 @@ describe("integration: windowing under load", () => {
});
}

// Now send 5 concurrent requests on the same conversation
// Fire 5 concurrent requests on the same conversation. Only one may run;
// the rest must fail with 409 run_in_progress rather than corrupting state.
const concurrentResults = await Promise.all(
Array.from({ length: 5 }, (_, i) =>
fetch(`${baseUrl}/v1/chat`, {
Expand All @@ -314,14 +315,20 @@ describe("integration: windowing under load", () => {
message: `Concurrent on long conv ${i}`,
conversationId: convId,
}),
}).then((r) => r.json()),
}).then(async (r) => ({ status: r.status, body: await r.json() })),
),
);

// All should succeed without crashing — concurrent requests may race, so only verify structure
for (let i = 0; i < 5; i++) {
expect(typeof concurrentResults[i].response).toBe("string");
expect(concurrentResults[i].conversationId).toBe(convId);
const ok = concurrentResults.filter((r) => r.status === 200);
const rejected = concurrentResults.filter((r) => r.status === 409);
expect(ok.length + rejected.length).toBe(5);
expect(ok.length).toBeGreaterThanOrEqual(1);
for (const r of ok) {
expect(typeof r.body.response).toBe("string");
expect(r.body.conversationId).toBe(convId);
}
for (const r of rejected) {
expect(r.body.error).toBe("run_in_progress");
}
});
});
Expand Down
Loading