Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 53 additions & 0 deletions lib/chat/__tests__/setupToolsForRequest.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,13 @@ vi.mock("@/lib/composio/toolRouter", () => ({
getComposioTools: vi.fn(),
}));

// Stub the local streaming tool factory with a recognizable sentinel so the
// tests below can detect (by description) that setupToolsForRequest wired it
// in, without ever touching the real sandbox implementation.
// NOTE(review): vi.mock is hoisted by vitest, so this runs before the
// "Import after mocks" imports below — confirm this matches project setup.
vi.mock("@/lib/chat/tools/createPromptSandboxStreamingTool", () => ({
createPromptSandboxStreamingTool: vi.fn(() => ({
description: "Mock streaming sandbox tool",
parameters: {},
})),
}));

// Import after mocks
import { setupToolsForRequest } from "../setupToolsForRequest";
import { getMcpTools } from "@/lib/mcp/getMcpTools";
Expand Down Expand Up @@ -293,4 +300,50 @@ describe("setupToolsForRequest", () => {
expect(mockGetComposioTools).toHaveBeenCalledTimes(1);
});
});

// Verifies the override behavior added in setupToolsForRequest: the mocked
// local streaming prompt_sandbox tool (see the vi.mock stub above) is
// registered only when an authToken is present, and wins over the MCP tool
// of the same name.
describe("local streaming tool override", () => {
// authToken present → local streaming tool registered under prompt_sandbox.
it("includes prompt_sandbox when authToken is provided", async () => {
const body: ChatRequestBody = {
accountId: "account-123",
orgId: null,
authToken: "test-token-123",
messages: [{ id: "1", role: "user", content: "Hello" }],
};

const result = await setupToolsForRequest(body);

expect(result).toHaveProperty("prompt_sandbox");
});

// Both MCP and local versions exist → the local one (spread last in the
// merge) must shadow the MCP version.
it("overrides MCP prompt_sandbox with local streaming version", async () => {
mockGetMcpTools.mockResolvedValue({
prompt_sandbox: { description: "MCP version", parameters: {} },
});

const body: ChatRequestBody = {
accountId: "account-123",
orgId: null,
authToken: "test-token-123",
messages: [{ id: "1", role: "user", content: "Hello" }],
};

const result = await setupToolsForRequest(body);

expect(result.prompt_sandbox).toEqual(
expect.objectContaining({ description: "Mock streaming sandbox tool" }),
);
});

// No authToken → the local tool is not built, and (with the default empty
// MCP mock) no prompt_sandbox key appears at all.
it("does not include prompt_sandbox when authToken is not provided", async () => {
const body: ChatRequestBody = {
accountId: "account-123",
orgId: null,
messages: [{ id: "1", role: "user", content: "Hello" }],
};

const result = await setupToolsForRequest(body);

expect(result).not.toHaveProperty("prompt_sandbox");
});
});
});
12 changes: 11 additions & 1 deletion lib/chat/setupToolsForRequest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ import { filterExcludedTools } from "./filterExcludedTools";
import { ChatRequestBody } from "./validateChatRequest";
import { getMcpTools } from "@/lib/mcp/getMcpTools";
import { getComposioTools } from "@/lib/composio/toolRouter";
import { createPromptSandboxStreamingTool } from "@/lib/chat/tools/createPromptSandboxStreamingTool";

/**
* Sets up and filters tools for a chat request.
* Aggregates tools from:
* - MCP server (via HTTP transport to /api/mcp for proper auth)
* - Composio Tool Router (Google Sheets, Google Drive, Google Docs, TikTok)
* - Local streaming tools (override MCP versions for real-time output)
*
* @param body - The chat request body
* @returns Filtered tool set ready for use
Expand All @@ -22,10 +24,18 @@ export async function setupToolsForRequest(body: ChatRequestBody): Promise<ToolS
getComposioTools(accountId, artistId, roomId),
]);

// Merge all tools
// Local streaming tools override MCP versions for real-time output
const localStreamingTools: ToolSet = {};
if (authToken) {
localStreamingTools.prompt_sandbox =
createPromptSandboxStreamingTool(accountId, authToken);
}

// Merge all tools — local streaming tools spread last to override MCP
const allTools: ToolSet = {
...mcpTools,
...composioTools,
...localStreamingTools,
};

const tools = filterExcludedTools(allTools, excludeTools);
Expand Down
151 changes: 151 additions & 0 deletions lib/chat/tools/__tests__/createPromptSandboxStreamingTool.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
import { describe, it, expect, vi, beforeEach } from "vitest";

import { createPromptSandboxStreamingTool } from "../createPromptSandboxStreamingTool";

// Spy that each test configures with a fake async generator.
const mockPromptSandboxStreaming = vi.fn();

// vi.mock factories are hoisted; forwarding through an arrow function defers
// the reference so `mockPromptSandboxStreaming` is initialized by the time
// the mocked module is actually loaded.
vi.mock("@/lib/sandbox/promptSandboxStreaming", () => ({
promptSandboxStreaming: (...args: unknown[]) =>
mockPromptSandboxStreaming(...args),
}));

// Exhausts an async iterable and returns every yielded value, in order.
// Generator *return* values are dropped, matching for-await-of semantics.
async function drainGenerator(iterable: AsyncIterable<unknown>) {
  const collected: unknown[] = [];
  const iterator = iterable[Symbol.asyncIterator]();
  for (
    let step = await iterator.next();
    !step.done;
    step = await iterator.next()
  ) {
    collected.push(step.value);
  }
  return collected;
}

// Exercises the generator-based execute() of the streaming sandbox tool by
// feeding it fake async generators through the mocked promptSandboxStreaming.
describe("createPromptSandboxStreamingTool", () => {
beforeEach(() => {
vi.clearAllMocks();
});

// Happy path: the tool yields booting first, one streaming update per
// chunk (accumulating stdout), then a final complete payload built from
// the generator's return value.
it("yields booting → streaming → complete statuses in order", async () => {
const finalResult = {
sandboxId: "sbx_123",
stdout: "Hello world",
stderr: "",
exitCode: 0,
created: false,
};

async function* fakeStreaming() {
yield { data: "Hello ", stream: "stdout" as const };
yield { data: "world", stream: "stdout" as const };
return finalResult;
}

mockPromptSandboxStreaming.mockReturnValue(fakeStreaming());

const tool = createPromptSandboxStreamingTool("acc_123", "api-key-123");
const iterable = tool.execute!({ prompt: "say hello" }, {
abortSignal: new AbortController().signal,
toolCallId: "tc_1",
messages: [],
}) as AsyncIterable<unknown>;

const yields = await drainGenerator(iterable);

// First yield: booting
expect(yields[0]).toEqual({
status: "booting",
output: "",
});

// Middle yields: streaming with accumulated stdout
expect(yields[1]).toEqual({
status: "streaming",
output: "Hello ",
});
expect(yields[2]).toEqual({
status: "streaming",
output: "Hello world",
});

// Last yield: complete
expect(yields[3]).toEqual({
status: "complete",
output: "Hello world",
stderr: "",
exitCode: 0,
});
});

// The factory arguments and the tool-call prompt must all reach the
// underlying sandbox helper, along with the abort signal from the SDK.
it("passes accountId, apiKey, and prompt to promptSandboxStreaming", async () => {
async function* fakeStreaming() {
return {
sandboxId: "sbx_123",
stdout: "",
stderr: "",
exitCode: 0,
created: false,
};
}

mockPromptSandboxStreaming.mockReturnValue(fakeStreaming());

const tool = createPromptSandboxStreamingTool("acc_456", "key_789");
const iterable = tool.execute!({ prompt: "do stuff" }, {
abortSignal: new AbortController().signal,
toolCallId: "tc_2",
messages: [],
}) as AsyncIterable<unknown>;

await drainGenerator(iterable);

expect(mockPromptSandboxStreaming).toHaveBeenCalledWith({
accountId: "acc_456",
apiKey: "key_789",
prompt: "do stuff",
abortSignal: expect.any(AbortSignal),
});
});

// stderr chunks still produce a streaming yield but never accumulate into
// `output`; stderr surfaces only in the final complete payload.
it("yields only stderr chunks in streaming status", async () => {
async function* fakeStreaming() {
yield { data: "warning!", stream: "stderr" as const };
return {
sandboxId: "sbx_123",
stdout: "",
stderr: "warning!",
exitCode: 1,
created: false,
};
}

mockPromptSandboxStreaming.mockReturnValue(fakeStreaming());

const tool = createPromptSandboxStreamingTool("acc_1", "key_1");
const iterable = tool.execute!({ prompt: "fail" }, {
abortSignal: new AbortController().signal,
toolCallId: "tc_3",
messages: [],
}) as AsyncIterable<unknown>;

const yields = await drainGenerator(iterable);

// booting
expect(yields[0]).toEqual({ status: "booting", output: "" });

// streaming — stderr doesn't change output (only stdout does)
expect(yields[1]).toEqual({ status: "streaming", output: "" });

// complete — stderr is included
expect(yields[2]).toEqual({
status: "complete",
output: "",
stderr: "warning!",
exitCode: 1,
});
});

// Static shape of the tool object itself (no execution involved).
it("has the correct tool description and input schema", () => {
const tool = createPromptSandboxStreamingTool("acc_1", "key_1");

expect(tool.description).toContain("sandbox");
expect(tool.inputSchema).toBeDefined();
});
});
88 changes: 88 additions & 0 deletions lib/chat/tools/createPromptSandboxStreamingTool.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
import { z } from "zod";
import type { Tool } from "ai";
import { promptSandboxStreaming } from "@/lib/sandbox/promptSandboxStreaming";

// Input schema for the prompt_sandbox tool: a single, non-empty prompt string.
// The .describe() text is surfaced to the model as the parameter description.
const promptSandboxSchema = z.object({
prompt: z
.string()
.min(1)
.describe("The prompt to send to OpenClaw running in the sandbox."),
});

// Progress payload yielded to the UI while the sandbox runs:
// - "booting": emitted once, before any sandbox output arrives
// - "streaming": emitted per chunk; `output` is accumulated stdout so far
// - "complete": final yield, carrying stderr and the process exit code
interface SandboxStreamProgress {
status: "booting" | "streaming" | "complete";
output: string; // accumulated stdout only — stderr is never merged in
stderr?: string; // present only on the final "complete" yield
exitCode?: number; // present only on the final "complete" yield
}

// Shape of the value *returned* (not yielded) by promptSandboxStreaming's
// generator when it finishes. NOTE(review): declared locally and applied via
// a cast below — confirm it stays in sync with promptSandboxStreaming's
// actual return type.
interface PromptSandboxFinalResult {
sandboxId: string;
stdout: string;
stderr: string;
exitCode: number;
// presumably true when a fresh sandbox was created rather than reused — confirm
created: boolean;
}

/**
 * Creates a local AI SDK generator tool that streams sandbox output to the UI.
 * Overrides the MCP prompt_sandbox tool with real-time streaming support.
 *
 * The execute generator yields SandboxStreamProgress values:
 * "booting" once, "streaming" once per chunk, then "complete".
 *
 * @param accountId - The account ID for sandbox lookup
 * @param apiKey - The API key passed as RECOUP_API_KEY to the sandbox
 * @returns An AI SDK tool with generator-based execute function
 */
export function createPromptSandboxStreamingTool(
  accountId: string,
  apiKey: string,
): Tool<z.infer<typeof promptSandboxSchema>, SandboxStreamProgress> {
  return {
    description:
      "Send a prompt to OpenClaw running in a persistent sandbox. " +
      "Reuses the account's existing running sandbox or creates one from the latest snapshot. " +
      "Streams output in real-time. The sandbox stays alive for follow-up prompts.",
    inputSchema: promptSandboxSchema,
    execute: async function* ({ prompt }, { abortSignal }) {
      // Let the UI show activity immediately, before the sandbox responds.
      yield { status: "booting" as const, output: "" };

      const gen = promptSandboxStreaming({
        accountId,
        apiKey,
        prompt,
        abortSignal,
      });

      // Drive the generator manually: for-await-of would discard the
      // generator's *return* value, which carries the final result.
      let stdout = "";
      let iterResult = await gen.next();

      while (!iterResult.done) {
        const chunk = iterResult.value as {
          data: string;
          stream: "stdout" | "stderr";
        };

        // Only stdout accumulates into the visible output; stderr chunks
        // still trigger a yield so the UI sees live progress.
        if (chunk.stream === "stdout") {
          stdout += chunk.data;
        }

        yield { status: "streaming" as const, output: stdout };
        iterResult = await gen.next();
      }

      // The loop exits only once done === true, so the final value is
      // definitely assigned here — no non-null assertions needed.
      const finalResult = iterResult.value as PromptSandboxFinalResult;

      yield {
        status: "complete" as const,
        output: finalResult.stdout,
        stderr: finalResult.stderr,
        exitCode: finalResult.exitCode,
      };

      // NOTE(review): `as never` appeases the Tool<INPUT, OUTPUT> generic,
      // whose execute typing is driven by the yielded progress values; the
      // runtime return value is still the final result object.
      return finalResult as never;
    },
  };
}
Loading