diff --git a/src/api/handlers.ts b/src/api/handlers.ts
index 02442a3..b844399 100644
--- a/src/api/handlers.ts
+++ b/src/api/handlers.ts
@@ -7,6 +7,7 @@ import type { EngineEvent, EventSink } from "../engine/types.ts";
 import { ingestFiles, type UploadedFile } from "../files/ingest.ts";
 import { createFileStore } from "../files/store.ts";
 import type { IdentityProvider, UserIdentity } from "../identity/provider.ts";
+import { RunInProgressError } from "../runtime/errors.ts";
 import { type RequestContext, runWithRequestContext } from "../runtime/request-context.ts";
 import type { Runtime } from "../runtime/runtime.ts";
 import type { ChatRequest } from "../runtime/types.ts";
@@ -32,11 +33,31 @@ export async function handleChat(
   const parsed = await parseChatBody(request, runtime, features, identity, workspaceId);
   if (parsed instanceof Response) return parsed;
 
-  const result = await runtime.chat(parsed);
-  return json({
-    ...result,
-    ...(parsed.workspaceId ? { workspaceId: parsed.workspaceId } : {}),
-  });
+  if (parsed.conversationId && runtime.isConversationActive(parsed.conversationId)) {
+    return runInProgressResponse(parsed.conversationId);
+  }
+
+  try {
+    const result = await runtime.chat(parsed);
+    return json({
+      ...result,
+      ...(parsed.workspaceId ? { workspaceId: parsed.workspaceId } : {}),
+    });
+  } catch (err) {
+    if (err instanceof RunInProgressError) {
+      return runInProgressResponse(err.conversationId);
+    }
+    throw err;
+  }
+}
+
+function runInProgressResponse(conversationId: string): Response {
+  return apiError(
+    409,
+    "run_in_progress",
+    "This conversation already has an active response. Wait for it to finish before sending another message.",
+    { conversationId },
+  );
+}
 
 /** Handle POST /v1/chat/stream — SSE streaming chat request. */
 export async function handleChatStream(
@@ -51,22 +72,12 @@
   const parsed = await parseChatBody(request, runtime, features, identity, workspaceId);
   if (parsed instanceof Response) return parsed;
 
-  // Broadcast user.message to other participants (only for existing conversations)
-  const convId = parsed.conversationId;
-  if (convId && conversationEventManager && identity) {
-    conversationEventManager.broadcastToConversation(
-      convId,
-      "user.message",
-      {
-        userId: identity.id,
-        displayName: identity.displayName,
-        content: parsed.message,
-        timestamp: new Date().toISOString(),
-      },
-      identity.id,
-    );
+  if (parsed.conversationId && runtime.isConversationActive(parsed.conversationId)) {
+    return runInProgressResponse(parsed.conversationId);
   }
 
+  const convId = parsed.conversationId;
+
   const sink = new CallbackEventSink();
   let markClosed: () => void;
   const stream = new ReadableStream({
@@ -87,6 +98,29 @@
         controller.close();
       };
 
+      // Defer the cross-participant user.message broadcast until the engine
+      // confirms the run actually started (first chat.start). If the call
+      // rejects with RunInProgressError, no broadcast fires and other
+      // participants never see a phantom message with no assistant reply.
+      let userMessageBroadcast = false;
+      const broadcastUserMessageOnce = () => {
+        if (userMessageBroadcast) return;
+        userMessageBroadcast = true;
+        if (convId && conversationEventManager && identity) {
+          conversationEventManager.broadcastToConversation(
+            convId,
+            "user.message",
+            {
+              userId: identity.id,
+              displayName: identity.displayName,
+              content: parsed.message,
+              timestamp: new Date().toISOString(),
+            },
+            identity.id,
+          );
+        }
+      };
+
       const unsubscribe = sink.subscribe((event: EngineEvent) => {
         if (
           event.type === "chat.start" ||
           event.type === "chat.done" ||
           event.type === "llm.text" ||
           event.type === "llm.start" ||
           event.type === "llm.done" ||
           event.type === "data.changed"
         ) {
+          if (event.type === "chat.start") {
+            broadcastUserMessageOnce();
+          }
           send(event.type, event.data);
           // Broadcast to other participants watching this conversation
           if (convId && conversationEventManager && identity) {
@@ -139,6 +176,14 @@
         finish();
       })
       .catch((err) => {
+        if (err instanceof RunInProgressError) {
+          send("error", {
+            error: "run_in_progress",
+            message: "This conversation already has an active response.",
+          });
+          finish();
+          return;
+        }
         console.error("[routes] handleChatStream failed:", err);
         const raw = err instanceof Error ? err.message : String(err);
         const friendly = friendlyError(raw);
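The JSON 409 above is the non-streaming half of a deliberately stable contract: status 409, the error code "run_in_progress", and the conversation id echoed under details. A minimal sketch of a caller honoring that contract; sendChat is a hypothetical helper, not part of this diff, and a real request would also carry workspace scoping such as the X-Workspace-Id header the tests use:

    async function sendChat(baseUrl: string, message: string, conversationId?: string) {
      const res = await fetch(`${baseUrl}/v1/chat`, {
        method: "POST",
        headers: { "Content-Type": "application/json" },
        body: JSON.stringify({ message, conversationId }),
      });
      const body = await res.json();
      if (res.status === 409 && body.error === "run_in_progress") {
        // The conversation is busy, not broken — retry after the active run ends.
        throw new Error(`Run already active on conversation ${body.details?.conversationId}`);
      }
      return body;
    }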
diff --git a/src/runtime/errors.ts b/src/runtime/errors.ts
new file mode 100644
index 0000000..810db18
--- /dev/null
+++ b/src/runtime/errors.ts
@@ -0,0 +1,8 @@
+/** Thrown when a chat request arrives for a conversation that already has an active run. */
+export class RunInProgressError extends Error {
+  readonly code = "run_in_progress";
+  constructor(public readonly conversationId: string) {
+    super(`Conversation ${conversationId} already has an active run`);
+    this.name = "RunInProgressError";
+  }
+}
diff --git a/src/runtime/runtime.ts b/src/runtime/runtime.ts
index d67271f..25cc9a3 100644
--- a/src/runtime/runtime.ts
+++ b/src/runtime/runtime.ts
@@ -56,6 +56,7 @@ import type { ToolRegistry } from "../tools/registry.ts";
 import { createSystemTools } from "../tools/system-tools.ts";
 import { isResourceReader, type ResourceData, type ResourceReader } from "../tools/types.ts";
 import { WorkspaceStore } from "../workspace/workspace-store.ts";
+import { RunInProgressError } from "./errors.ts";
 import { PlacementRegistry } from "./placement-registry.ts";
 import {
   getRequestContext,
@@ -181,6 +182,19 @@
     | null = null;
   private skillResourceCache = new Map();
   private static readonly SKILL_CACHE_TTL = 5 * 60 * 1000; // 5 minutes
+  /**
+   * Conversation IDs with an in-flight chat() call. Prevents concurrent runs on
+   * the same conversation.
+   *
+   * Scope: single-process / single-pod. Correct today because each tenant runs
+   * with `platform.replicas: 1` — all chat traffic for a conversation lands on
+   * the same Runtime instance. If a tenant is ever scaled to multiple replicas,
+   * this lock stops being authoritative (concurrent requests can land on
+   * different pods) and this invariant needs to move to a shared store. The
+   * conversation JSONL on the shared PVC has the same single-writer assumption,
+   * so the two would need to be addressed together.
+   */
+  private readonly activeConversations = new Set<string>();
 
   private constructor(
     _engine: AgentEngine,
@@ -481,8 +495,26 @@
     return rt;
   }
 
+  /** True if a chat() is currently in flight on this conversation. */
+  isConversationActive(conversationId: string): boolean {
+    return this.activeConversations.has(conversationId);
+  }
+
   /** Process a chat message. Optional per-request EventSink for SSE streaming. */
   async chat(request: ChatRequest, requestSink?: EventSink): Promise<ChatResponse> {
+    const lockedConvId = request.conversationId;
+    if (lockedConvId && this.activeConversations.has(lockedConvId)) {
+      throw new RunInProgressError(lockedConvId);
+    }
+    if (lockedConvId) this.activeConversations.add(lockedConvId);
+    try {
+      return await this._chatInner(request, requestSink);
+    } finally {
+      if (lockedConvId) this.activeConversations.delete(lockedConvId);
+    }
+  }
+
+  private async _chatInner(request: ChatRequest, requestSink?: EventSink): Promise<ChatResponse> {
     if (!request.workspaceId) {
       throw new Error("workspaceId is required. Every chat request must be workspace-scoped.");
     }
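The locking in chat() above leans on JavaScript's run-to-completion scheduling: the Set is checked and updated with no await in between, so two calls in the same process can never both acquire the same conversation. The same pattern in isolation, as a sketch; withLock is an illustrative name, not code from this diff:

    const active = new Set<string>();

    async function withLock<T>(key: string, run: () => Promise<T>): Promise<T> {
      // has() and add() execute in one event-loop turn — no interleaving window.
      if (active.has(key)) throw new Error(`${key} already has an active run`);
      active.add(key);
      try {
        return await run();
      } finally {
        active.delete(key); // releases on success and on throw alike
      }
    }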
+ */ + +import { afterEach, describe, expect, test } from "bun:test"; +import type { ServerHandle } from "../../src/api/server.ts"; +import { startServer } from "../../src/api/server.ts"; +import { Runtime } from "../../src/runtime/runtime.ts"; +import { createEchoModel } from "../helpers/echo-model.ts"; +import { createMockModel } from "../helpers/mock-model.ts"; +import { TEST_WORKSPACE_ID, provisionTestWorkspace } from "../helpers/test-workspace.ts"; + +interface SSEEvent { + event: string; + data: string; +} + +function parseSSE(text: string): SSEEvent[] { + const events: SSEEvent[] = []; + for (const block of text.split("\n\n").filter((b) => b.trim())) { + let event = ""; + let data = ""; + for (const line of block.split("\n")) { + if (line.startsWith("event: ")) event = line.slice(7); + else if (line.startsWith("data: ")) data = line.slice(6); + } + if (event) events.push({ event, data }); + } + return events; +} + +describe("POST /v1/chat/stream — concurrency protection", () => { + let handle: ServerHandle | null = null; + let runtime: Runtime | null = null; + + afterEach(async () => { + handle?.stop(true); + await runtime?.shutdown(); + handle = null; + runtime = null; + }); + + test("returns HTTP 409 when pre-check sees an in-flight run on the same conversation", async () => { + // A gated model lets us hold runtime.chat() open deterministically. The + // first call to doGenerate awaits the gate; releasing it lets the seed + // chat complete. + let release!: () => void; + const gate = new Promise((resolve) => { + release = resolve; + }); + let callCount = 0; + const gatedModel = createMockModel(async () => { + callCount++; + if (callCount === 1) { + // Only the first call (the seed run we hold open) is gated. + return { content: [{ type: "text", text: "seeded" }] }; + } + await gate; + return { content: [{ type: "text", text: "unblocked" }] }; + }); + + runtime = await Runtime.start({ + model: { provider: "custom", adapter: gatedModel }, + noDefaultBundles: true, + logging: { disabled: true }, + }); + await provisionTestWorkspace(runtime); + handle = startServer({ runtime, port: 0 }); + const baseUrl = `http://localhost:${handle.port}`; + + // Seed a conversation (first doGenerate call returns immediately). + const seed = await runtime.chat({ + message: "seed", + workspaceId: TEST_WORKSPACE_ID, + }); + const convId = seed.conversationId; + + // Start a second runtime.chat() but don't await — the lock is acquired + // synchronously before the first internal await, and doGenerate will + // block on the gate, so the lock is held while we make the HTTP call. + const inFlight = runtime.chat({ + message: "holding the lock", + conversationId: convId, + workspaceId: TEST_WORKSPACE_ID, + }); + expect(runtime.isConversationActive(convId)).toBe(true); + + // Streaming request on the same conversation must be pre-checked and + // rejected with a JSON 409 — no SSE stream should be opened. + const res = await fetch(`${baseUrl}/v1/chat/stream`, { + method: "POST", + headers: { "Content-Type": "application/json", "X-Workspace-Id": TEST_WORKSPACE_ID }, + body: JSON.stringify({ message: "collides", conversationId: convId }), + }); + expect(res.status).toBe(409); + expect(res.headers.get("Content-Type")).toMatch(/application\/json/); + const body = await res.json(); + expect(body.error).toBe("run_in_progress"); + expect(body.details?.conversationId).toBe(convId); + + // Release the gate and let the in-flight call finish so teardown is clean. 
diff --git a/test/integration/chat-stream-concurrent.test.ts b/test/integration/chat-stream-concurrent.test.ts
new file mode 100644
index 0000000..38cd3fb
--- /dev/null
+++ b/test/integration/chat-stream-concurrent.test.ts
@@ -0,0 +1,176 @@
+/**
+ * HTTP-level tests for /v1/chat/stream concurrency protection.
+ *
+ * Covers both ways a concurrent stream request can be rejected:
+ * 1. Pre-check path — runtime.isConversationActive() returns true, handler
+ *    returns HTTP 409 without opening the SSE stream.
+ * 2. Race path — pre-check passes but runtime.chat() acquires the lock first
+ *    for another request; the losing request's stream opens, emits an SSE
+ *    error event with `error: "run_in_progress"`, and closes.
+ */
+
+import { afterEach, describe, expect, test } from "bun:test";
+import type { ServerHandle } from "../../src/api/server.ts";
+import { startServer } from "../../src/api/server.ts";
+import { Runtime } from "../../src/runtime/runtime.ts";
+import { createEchoModel } from "../helpers/echo-model.ts";
+import { createMockModel } from "../helpers/mock-model.ts";
+import { TEST_WORKSPACE_ID, provisionTestWorkspace } from "../helpers/test-workspace.ts";
+
+interface SSEEvent {
+  event: string;
+  data: string;
+}
+
+function parseSSE(text: string): SSEEvent[] {
+  const events: SSEEvent[] = [];
+  for (const block of text.split("\n\n").filter((b) => b.trim())) {
+    let event = "";
+    let data = "";
+    for (const line of block.split("\n")) {
+      if (line.startsWith("event: ")) event = line.slice(7);
+      else if (line.startsWith("data: ")) data = line.slice(6);
+    }
+    if (event) events.push({ event, data });
+  }
+  return events;
+}
+
+describe("POST /v1/chat/stream — concurrency protection", () => {
+  let handle: ServerHandle | null = null;
+  let runtime: Runtime | null = null;
+
+  afterEach(async () => {
+    handle?.stop(true);
+    await runtime?.shutdown();
+    handle = null;
+    runtime = null;
+  });
+
+  test("returns HTTP 409 when pre-check sees an in-flight run on the same conversation", async () => {
+    // A gated model lets us hold runtime.chat() open deterministically: the
+    // first doGenerate call (the seed run) returns immediately; every later
+    // call awaits the gate, so the chat we start next stays in flight until
+    // we release it.
+    let release!: () => void;
+    const gate = new Promise<void>((resolve) => {
+      release = resolve;
+    });
+    let callCount = 0;
+    const gatedModel = createMockModel(async () => {
+      callCount++;
+      if (callCount === 1) {
+        // The seed run's call returns immediately; later calls are gated.
+        return { content: [{ type: "text", text: "seeded" }] };
+      }
+      await gate;
+      return { content: [{ type: "text", text: "unblocked" }] };
+    });
+
+    runtime = await Runtime.start({
+      model: { provider: "custom", adapter: gatedModel },
+      noDefaultBundles: true,
+      logging: { disabled: true },
+    });
+    await provisionTestWorkspace(runtime);
+    handle = startServer({ runtime, port: 0 });
+    const baseUrl = `http://localhost:${handle.port}`;
+
+    // Seed a conversation (the first doGenerate call returns immediately).
+    const seed = await runtime.chat({
+      message: "seed",
+      workspaceId: TEST_WORKSPACE_ID,
+    });
+    const convId = seed.conversationId;
+
+    // Start a second runtime.chat() but don't await it — the lock is acquired
+    // synchronously before the first internal await, and doGenerate will
+    // block on the gate, so the lock is held while we make the HTTP call.
+    const inFlight = runtime.chat({
+      message: "holding the lock",
+      conversationId: convId,
+      workspaceId: TEST_WORKSPACE_ID,
+    });
+    expect(runtime.isConversationActive(convId)).toBe(true);
+
+    // A streaming request on the same conversation must be pre-checked and
+    // rejected with a JSON 409 — no SSE stream should be opened.
+    const res = await fetch(`${baseUrl}/v1/chat/stream`, {
+      method: "POST",
+      headers: { "Content-Type": "application/json", "X-Workspace-Id": TEST_WORKSPACE_ID },
+      body: JSON.stringify({ message: "collides", conversationId: convId }),
+    });
+    expect(res.status).toBe(409);
+    expect(res.headers.get("Content-Type")).toMatch(/application\/json/);
+    const body = await res.json();
+    expect(body.error).toBe("run_in_progress");
+    expect(body.details?.conversationId).toBe(convId);
+
+    // Release the gate and let the in-flight call finish so teardown is clean.
+    release();
+    await inFlight;
+  });
+
+  test("concurrent stream requests never overlap; every rejection carries run_in_progress", async () => {
+    runtime = await Runtime.start({
+      model: { provider: "custom", adapter: createEchoModel() },
+      noDefaultBundles: true,
+      logging: { disabled: true },
+    });
+    await provisionTestWorkspace(runtime);
+    handle = startServer({ runtime, port: 0 });
+    const baseUrl = `http://localhost:${handle.port}`;
+
+    // Seed a conversation we can contend on.
+    const seed = await runtime.chat({
+      message: "seed",
+      workspaceId: TEST_WORKSPACE_ID,
+    });
+    const convId = seed.conversationId;
+
+    // Fire 5 concurrent /v1/chat/stream requests on the same convId. Each will
+    // either:
+    //   a) get HTTP 409 from the pre-check, or
+    //   b) open the stream and get an SSE error `run_in_progress` from the
+    //      runtime.chat() reject path, or
+    //   c) win the lock and stream a `done` event.
+    // Both (a) and (b) are valid rejection shapes. With a fast echo model a
+    // request may also arrive after an earlier winner finishes and win in
+    // turn, so the invariant is not "exactly one done" — it is that runs never
+    // overlap, at least one request is rejected, and every rejection carries
+    // the stable error code.
+    const results = await Promise.all(
+      Array.from({ length: 5 }, (_, i) =>
+        fetch(`${baseUrl}/v1/chat/stream`, {
+          method: "POST",
+          headers: { "Content-Type": "application/json", "X-Workspace-Id": TEST_WORKSPACE_ID },
+          body: JSON.stringify({ message: `concurrent ${i}`, conversationId: convId }),
+        }).then(async (r) => {
+          if (r.status === 409) {
+            const body = await r.json();
+            return { kind: "http409" as const, error: body.error };
+          }
+          const text = await r.text();
+          const events = parseSSE(text);
+          const done = events.find((e) => e.event === "done");
+          const err = events.find((e) => e.event === "error");
+          return {
+            kind: done ? ("done" as const) : ("sseError" as const),
+            error: err ? (JSON.parse(err.data).error as string) : undefined,
+          };
+        }),
+      ),
+    );
+
+    const winners = results.filter((r) => r.kind === "done");
+    const http409 = results.filter((r) => r.kind === "http409");
+    const sseErrors = results.filter((r) => r.kind === "sseError");
+
+    // How many requests actually collide depends on timing — a request that
+    // lands after an earlier winner finishes will legitimately run. The hard
+    // invariants: at least one request must have been rejected (otherwise the
+    // lock did nothing), and every rejected request must carry the stable
+    // run_in_progress code.
+    expect(winners.length).toBeGreaterThanOrEqual(1);
+    expect(http409.length + sseErrors.length).toBeGreaterThanOrEqual(1);
+    expect(winners.length + http409.length + sseErrors.length).toBe(5);
+    for (const r of http409) expect(r.error).toBe("run_in_progress");
+    for (const r of sseErrors) expect(r.error).toBe("run_in_progress");
+  });
+});
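Note that the race-path rejection arrives inside an already-open stream rather than as an HTTP status, so a caller that buffers SSE output has to look for the error event explicitly. A sketch of that consumer side, reusing the parseSSE helper from the tests above; the branch bodies are illustrative, not part of this diff:

    const res = await fetch(`${baseUrl}/v1/chat/stream`, { /* request options as in the tests */ });
    if (res.status === 409) {
      // Pre-check rejection: plain JSON, no stream was opened.
    } else {
      for (const ev of parseSSE(await res.text())) {
        if (ev.event === "error" && JSON.parse(ev.data).error === "run_in_progress") {
          // Race-path rejection: treat it exactly like the HTTP 409.
        }
      }
    }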
diff --git a/test/integration/runtime/concurrent-chat.test.ts b/test/integration/runtime/concurrent-chat.test.ts
new file mode 100644
index 0000000..9003322
--- /dev/null
+++ b/test/integration/runtime/concurrent-chat.test.ts
@@ -0,0 +1,129 @@
+import { afterAll, describe, expect, it } from "bun:test";
+import { existsSync, mkdirSync, rmSync } from "node:fs";
+import { tmpdir } from "node:os";
+import { join } from "node:path";
+import { RunInProgressError } from "../../../src/runtime/errors.ts";
+import { Runtime } from "../../../src/runtime/runtime.ts";
+import { createEchoModel } from "../../helpers/echo-model.ts";
+import { TEST_WORKSPACE_ID, provisionTestWorkspace } from "../../helpers/test-workspace.ts";
+
+const testDir = join(tmpdir(), `nimblebrain-concurrent-chat-${Date.now()}`);
+
+afterAll(() => {
+  if (existsSync(testDir)) rmSync(testDir, { recursive: true });
+});
+
+describe("concurrent chat rejection", () => {
+  it("rejects a second chat on the same conversation while the first is in flight", async () => {
+    const workDir = join(testDir, "concurrent");
+    mkdirSync(workDir, { recursive: true });
+
+    const runtime = await Runtime.start({
+      model: { provider: "custom", adapter: createEchoModel() },
+      noDefaultBundles: true,
+      workDir,
+    });
+    await provisionTestWorkspace(runtime);
+
+    // First call — seed a conversation so we have an id to contend on.
+    const first = await runtime.chat({
+      message: "first",
+      workspaceId: TEST_WORKSPACE_ID,
+    });
+    const convId = first.conversationId;
+
+    // Start a second chat but do not await it — the lock is acquired
+    // synchronously before the first internal await, so by the time control
+    // returns here the conversation is registered as active.
+    const inFlight = runtime.chat({
+      message: "in-flight",
+      conversationId: convId,
+      workspaceId: TEST_WORKSPACE_ID,
+    });
+
+    expect(runtime.isConversationActive(convId)).toBe(true);
+
+    // A concurrent call on the same conversation must reject cleanly.
+    await expect(
+      runtime.chat({
+        message: "interrupting",
+        conversationId: convId,
+        workspaceId: TEST_WORKSPACE_ID,
+      }),
+    ).rejects.toBeInstanceOf(RunInProgressError);
+
+    // Let the in-flight call finish; the lock must release.
+    await inFlight;
+    expect(runtime.isConversationActive(convId)).toBe(false);
+
+    // And a subsequent call on the same conversation must succeed normally.
+    const third = await runtime.chat({
+      message: "after",
+      conversationId: convId,
+      workspaceId: TEST_WORKSPACE_ID,
+    });
+    expect(third.conversationId).toBe(convId);
+
+    await runtime.shutdown();
+  });
+
+  it("does not block concurrent chats on different conversations", async () => {
+    const workDir = join(testDir, "disjoint");
+    mkdirSync(workDir, { recursive: true });
+
+    const runtime = await Runtime.start({
+      model: { provider: "custom", adapter: createEchoModel() },
+      noDefaultBundles: true,
+      workDir,
+    });
+    await provisionTestWorkspace(runtime);
+
+    const a = await runtime.chat({ message: "a", workspaceId: TEST_WORKSPACE_ID });
+    const b = await runtime.chat({ message: "b", workspaceId: TEST_WORKSPACE_ID });
+    expect(a.conversationId).not.toBe(b.conversationId);
+
+    // Parallel resumes on the two distinct conversations must both succeed.
+    const [ra, rb] = await Promise.all([
+      runtime.chat({
+        message: "a2",
+        conversationId: a.conversationId,
+        workspaceId: TEST_WORKSPACE_ID,
+      }),
+      runtime.chat({
+        message: "b2",
+        conversationId: b.conversationId,
+        workspaceId: TEST_WORKSPACE_ID,
+      }),
+    ]);
+    expect(ra.conversationId).toBe(a.conversationId);
+    expect(rb.conversationId).toBe(b.conversationId);
+
+    await runtime.shutdown();
+  });
+
+  it("releases the lock when the first chat throws", async () => {
+    const workDir = join(testDir, "release-on-throw");
+    mkdirSync(workDir, { recursive: true });
+
+    const runtime = await Runtime.start({
+      model: { provider: "custom", adapter: createEchoModel() },
+      noDefaultBundles: true,
+      workDir,
+    });
+    await provisionTestWorkspace(runtime);
+
+    const seed = await runtime.chat({ message: "seed", workspaceId: TEST_WORKSPACE_ID });
+    const convId = seed.conversationId;
+
+    // A missing workspaceId makes the inner body throw — the lock must still release.
+    await expect(
+      runtime.chat({ message: "bad", conversationId: convId } as unknown as Parameters<
+        typeof runtime.chat
+      >[0]),
+    ).rejects.toThrow();
+
+    expect(runtime.isConversationActive(convId)).toBe(false);
+
+    await runtime.shutdown();
+  });
+});
diff --git a/web/src/api/format-error.ts b/web/src/api/format-error.ts
index 955545d..0cb0c5e 100644
--- a/web/src/api/format-error.ts
+++ b/web/src/api/format-error.ts
@@ -22,5 +22,8 @@ export function formatSendError(err: unknown): string {
     }
     return err.message;
   }
+  if (err instanceof ApiClientError && err.code === "run_in_progress") {
+    return "The assistant is still working on your previous message. Wait for it to finish, then try again.";
+  }
   return err instanceof Error ? err.message : "An unexpected error occurred";
 }
diff --git a/web/src/hooks/useChat.ts b/web/src/hooks/useChat.ts
index 9ce0952..7bfa7f4 100644
--- a/web/src/hooks/useChat.ts
+++ b/web/src/hooks/useChat.ts
@@ -1,5 +1,5 @@
 import { useCallback, useRef, useState } from "react";
-import { callTool, streamChat, streamChatMultipart } from "../api/client";
+import { ApiClientError, callTool, streamChat, streamChatMultipart } from "../api/client";
 import { formatSendError } from "../api/format-error";
 import { captureEvent } from "../telemetry";
 import type {
@@ -408,6 +408,16 @@
         has_app_context: !!appContext,
       });
     } catch (err) {
+      if (err instanceof ApiClientError && err.code === "run_in_progress") {
+        // Server rejected because a previous run is still in flight. Drop the
+        // optimistic user+assistant placeholders so the failed message doesn't
+        // stick in history as if it had succeeded.
+        setMessages((prev) => prev.slice(0, -2));
+        captureEvent("web.chat_run_in_progress", {
+          conversation_id: conversationId ?? null,
+          has_app_context: !!appContext,
+        });
+      }
       const msg = formatSendError(err);
       setError(msg);
     } finally {
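The slice(0, -2) rollback above is coupled to the shape of the optimistic append, which sits outside this diff. Assuming that append adds exactly two entries in this order (a hypothetical reconstruction), the pairing is what keeps the rollback safe:

    // Hypothetical optimistic append this rollback assumes (not shown in the diff):
    setMessages((prev) => [
      ...prev,
      { role: "user", content: text },    // what the user typed
      { role: "assistant", content: "" }, // placeholder the stream writes into
    ]);
    // slice(0, -2) removes exactly that pair; if the optimistic shape ever
    // gains or loses an entry, the rollback must change with it.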