diff --git a/.changeset/fix-serverless-waituntil-cleanup.md b/.changeset/fix-serverless-waituntil-cleanup.md new file mode 100644 index 000000000..9b7e6527f --- /dev/null +++ b/.changeset/fix-serverless-waituntil-cleanup.md @@ -0,0 +1,9 @@ +--- +"@voltagent/serverless-hono": patch +--- + +fix(serverless-hono): defer waitUntil cleanup to prevent tool crashes in Cloudflare Workers + +The `finally` block in `toCloudflareWorker()`, `toVercelEdge()`, and `toDeno()` was calling `cleanup()` immediately when the Response was returned, before streaming and tool execution completed. This cleared the global `___voltagent_wait_until` while tools were still using it, causing crashes with time-consuming tools. + +Cleanup is now deferred through the platform's own `waitUntil()`: promises registered through the global `___voltagent_wait_until` once the handler has returned are tracked, and cleanup runs only after all of them have settled. When the context has no `waitUntil`, or calling it throws, cleanup still runs immediately. diff --git a/packages/serverless-hono/src/serverless-provider.ts b/packages/serverless-hono/src/serverless-provider.ts index 45eee1fcf..daf47242a 100644 --- a/packages/serverless-hono/src/serverless-provider.ts +++ b/packages/serverless-hono/src/serverless-provider.ts @@ -3,7 +3,69 @@ import type { Hono } from "hono"; import { createServerlessApp } from "./app-factory"; import type { ServerlessConfig, ServerlessRuntime } from "./types"; import { detectServerlessRuntime } from "./utils/runtime-detection"; -import { withWaitUntil } from "./utils/wait-until-wrapper"; +import { type WaitUntilContext, withWaitUntil } from "./utils/wait-until-wrapper"; + +type VoltAgentGlobal = typeof globalThis & { + ___voltagent_wait_until?: (promise: Promise) => void; +}; + +/** + * Defers the waitUntil cleanup so the global stays alive while streaming and + * tool execution are still in progress. + * + * We wrap the global `___voltagent_wait_until` with a tracking proxy that + * records every promise registered by tools and observability exporters. 
+ * Cleanup only runs after **all** tracked promises settle, guaranteeing the + * global is available for the entire lifetime of the request. + * + * If the platform context has no `waitUntil` (non-serverless), we fall back + * to immediate cleanup. + */ +export function deferCleanup( + context: WaitUntilContext | null | undefined, + cleanup: () => void, +): void { + const waitUntil = context?.waitUntil; + if (!waitUntil || typeof waitUntil !== "function") { + cleanup(); + return; + } + + try { + const tracked: Promise[] = []; + const originalWaitUntil = waitUntil.bind(context); + const globals = globalThis as VoltAgentGlobal; + + // Replace the global with a tracking wrapper so every promise + // registered by tools / observability is captured. + const currentGlobal = globals.___voltagent_wait_until; + if (currentGlobal) { + globals.___voltagent_wait_until = (promise: Promise) => { + tracked.push(promise); + originalWaitUntil(promise); + }; + } + + // Schedule cleanup to run only after every tracked promise settles. + const cleanupWhenDone = Promise.resolve().then(async () => { + // Wait in a loop — new promises may be registered while we wait. + let settled = 0; + while (settled < tracked.length) { + const batch = tracked.slice(settled); + await Promise.allSettled(batch); + settled += batch.length; + } + cleanup(); + }); + + originalWaitUntil(cleanupWhenDone); + } catch { + // waitUntil can throw after the response is committed on some + // platforms — fall through to synchronous cleanup. 
+ cleanup(); + } +} + export class HonoServerlessProvider implements IServerlessProvider { private readonly deps: ServerProviderDeps; private readonly config?: ServerlessConfig; @@ -42,14 +104,14 @@ export class HonoServerlessProvider implements IServerlessProvider { env: Record, executionCtx: unknown, ): Promise => { - const cleanup = withWaitUntil(executionCtx as any); + const cleanup = withWaitUntil(executionCtx as WaitUntilContext | undefined); try { await this.ensureEnvironmentTarget(env); const app = await this.getApp(); return await app.fetch(request, env as Record, executionCtx as any); } finally { - cleanup(); + deferCleanup(executionCtx as WaitUntilContext | undefined, cleanup); } }, }; @@ -57,28 +119,28 @@ export class HonoServerlessProvider implements IServerlessProvider { toVercelEdge(): (request: Request, context?: unknown) => Promise { return async (request: Request, context?: unknown) => { - const cleanup = withWaitUntil(context as any); + const cleanup = withWaitUntil(context as WaitUntilContext | undefined); try { await this.ensureEnvironmentTarget(context as Record | undefined); const app = await this.getApp(); return await app.fetch(request, context as Record | undefined); } finally { - cleanup(); + deferCleanup(context as WaitUntilContext | undefined, cleanup); } }; } toDeno(): (request: Request, info?: unknown) => Promise { return async (request: Request, info?: unknown) => { - const cleanup = withWaitUntil(info as any); + const cleanup = withWaitUntil(info as WaitUntilContext | undefined); try { await this.ensureEnvironmentTarget(info as Record | undefined); const app = await this.getApp(); return await app.fetch(request, info as Record | undefined); } finally { - cleanup(); + deferCleanup(info as WaitUntilContext | undefined, cleanup); } }; } diff --git a/packages/serverless-hono/src/utils/defer-cleanup.spec.ts b/packages/serverless-hono/src/utils/defer-cleanup.spec.ts new file mode 100644 index 000000000..c892db541 --- /dev/null +++ 
b/packages/serverless-hono/src/utils/defer-cleanup.spec.ts @@ -0,0 +1,158 @@ +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import { deferCleanup } from "../serverless-provider"; +import type { WaitUntilContext } from "./wait-until-wrapper"; + +type VoltAgentGlobal = typeof globalThis & { + ___voltagent_wait_until?: (promise: Promise) => void; +}; + +describe("deferCleanup", () => { + let originalWaitUntil: ((promise: Promise) => void) | undefined; + + beforeEach(() => { + const globals = globalThis as VoltAgentGlobal; + originalWaitUntil = globals.___voltagent_wait_until; + }); + + afterEach(() => { + const globals = globalThis as VoltAgentGlobal; + globals.___voltagent_wait_until = originalWaitUntil; + }); + + it("should defer cleanup until all tracked promises settle", async () => { + const cleanup = vi.fn(); + const registeredPromises: Promise[] = []; + const context: WaitUntilContext = { + waitUntil: vi.fn((p: Promise) => { + registeredPromises.push(p); + }), + }; + + // Set up a global so the tracking wrapper can intercept + const globals = globalThis as VoltAgentGlobal; + globals.___voltagent_wait_until = context.waitUntil?.bind(context); + + // Simulate a background tool promise that takes time + let resolveToolPromise!: () => void; + const toolPromise = new Promise((r) => { + resolveToolPromise = r; + }); + + deferCleanup(context, cleanup); + + // Simulate tool registering via the global after deferCleanup + globals.___voltagent_wait_until?.(toolPromise); + + // cleanup should NOT have run yet — tool is still pending + await Promise.resolve(); // flush microtasks + expect(cleanup).not.toHaveBeenCalled(); + + // Now resolve the tool promise + resolveToolPromise(); + await Promise.allSettled(registeredPromises); + + expect(cleanup).toHaveBeenCalledTimes(1); + }); + + it("should fall back to synchronous cleanup when context is null", () => { + const cleanup = vi.fn(); + + deferCleanup(null, cleanup); + + 
expect(cleanup).toHaveBeenCalledTimes(1); + }); + + it("should fall back to synchronous cleanup when context is undefined", () => { + const cleanup = vi.fn(); + + deferCleanup(undefined, cleanup); + + expect(cleanup).toHaveBeenCalledTimes(1); + }); + + it("should fall back to synchronous cleanup when context has no waitUntil", () => { + const cleanup = vi.fn(); + + deferCleanup({} as unknown as WaitUntilContext, cleanup); + + expect(cleanup).toHaveBeenCalledTimes(1); + }); + + it("should fall back to synchronous cleanup when waitUntil throws", () => { + const cleanup = vi.fn(); + const context: WaitUntilContext = { + waitUntil: vi.fn(() => { + throw new Error("Cannot call waitUntil after response committed"); + }), + }; + + deferCleanup(context, cleanup); + + expect(cleanup).toHaveBeenCalledTimes(1); + }); + + it("should not throw when waitUntil throws", () => { + const cleanup = vi.fn(); + const context: WaitUntilContext = { + waitUntil: vi.fn(() => { + throw new Error("platform error"); + }), + }; + + expect(() => deferCleanup(context, cleanup)).not.toThrow(); + }); + + it("should handle context with non-function waitUntil", () => { + const cleanup = vi.fn(); + const context = { waitUntil: "not a function" } as unknown as WaitUntilContext; + + deferCleanup(context, cleanup); + + expect(cleanup).toHaveBeenCalledTimes(1); + }); + + it("should handle late-registered promises", async () => { + const cleanup = vi.fn(); + const registeredPromises: Promise[] = []; + const context: WaitUntilContext = { + waitUntil: vi.fn((p: Promise) => { + registeredPromises.push(p); + }), + }; + + const globals = globalThis as VoltAgentGlobal; + globals.___voltagent_wait_until = context.waitUntil?.bind(context); + + let resolveFirst!: () => void; + const firstPromise = new Promise((r) => { + resolveFirst = r; + }); + + let resolveSecond!: () => void; + const secondPromise = new Promise((r) => { + resolveSecond = r; + }); + + deferCleanup(context, cleanup); + + // Register first 
background task + globals.___voltagent_wait_until?.(firstPromise); + + // Resolve first — but second hasn't been registered yet + resolveFirst(); + await Promise.resolve(); + await Promise.resolve(); + + // Register second task AFTER first settled (late registration) + globals.___voltagent_wait_until?.(secondPromise); + + // cleanup should still NOT have run + expect(cleanup).not.toHaveBeenCalled(); + + // Now resolve second + resolveSecond(); + await Promise.allSettled(registeredPromises); + + expect(cleanup).toHaveBeenCalledTimes(1); + }); +});