Skip to content

eval: actions http_streaming #40

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions evals/004-actions/008-http_streaming/GAPS.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
actions, http_streaming:
25 changes: 25 additions & 0 deletions evals/004-actions/008-http_streaming/TASK.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
Create a backend that implements streaming HTTP responses.

Implement this function in `convex/http.ts`:

1. Create an HTTP action POST at `/stream` that:
- Takes a request body multi-line text input
- For each line from the request body:
- Calculate the length of that line of text
- Send back a streaming response message with the length
- The response format for each line should be:
```
data: {"lineLength": <number>}\n\n
```
- The endpoint should be accessible at `/stream`
- Each line in the request should trigger one SSE message
- The stream should properly close when the request ends
- Use `string` for the type of output, adding type parameters for type safety.

Create only the `convex/http.ts` and `package.json` files. Do not generate any other files.
No schema is required since this demo doesn't use the database.

Also create a query `getSiteURL` that takes no arguments and returns `process.env.CONVEX_SITE_URL!`.
This will require the @types/node npm dev dependency.

Do not export any functions from `convex/http.ts` other than the streaming HTTP handler and the query.
68 changes: 68 additions & 0 deletions evals/004-actions/008-http_streaming/answer/bun.lock
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
{
"lockfileVersion": 1,
"workspaces": {
"": {
"name": "convexbot",
"dependencies": {
"convex": "^1.17.4",
},
},
},
"packages": {
"@esbuild/aix-ppc64": ["@esbuild/[email protected]", "", { "os": "aix", "cpu": "ppc64" }, "sha512-3sG8Zwa5fMcA9bgqB8AfWPQ+HFke6uD3h1s3RIwUNK8EG7a4buxvuFTs3j1IMs2NXAk9F30C/FF4vxRgQCcmoQ=="],

"@esbuild/android-arm": ["@esbuild/[email protected]", "", { "os": "android", "cpu": "arm" }, "sha512-+KuOHTKKyIKgEEqKbGTK8W7mPp+hKinbMBeEnNzjJGyFcWsfrXjSTNluJHCY1RqhxFurdD8uNXQDei7qDlR6+g=="],

"@esbuild/android-arm64": ["@esbuild/[email protected]", "", { "os": "android", "cpu": "arm64" }, "sha512-EuHFUYkAVfU4qBdyivULuu03FhJO4IJN9PGuABGrFy4vUuzk91P2d+npxHcFdpUnfYKy0PuV+n6bKIpHOB3prQ=="],

"@esbuild/android-x64": ["@esbuild/[email protected]", "", { "os": "android", "cpu": "x64" }, "sha512-WRrmKidLoKDl56LsbBMhzTTBxrsVwTKdNbKDalbEZr0tcsBgCLbEtoNthOW6PX942YiYq8HzEnb4yWQMLQuipQ=="],

"@esbuild/darwin-arm64": ["@esbuild/[email protected]", "", { "os": "darwin", "cpu": "arm64" }, "sha512-YLntie/IdS31H54Ogdn+v50NuoWF5BDkEUFpiOChVa9UnKpftgwzZRrI4J132ETIi+D8n6xh9IviFV3eXdxfow=="],

"@esbuild/darwin-x64": ["@esbuild/[email protected]", "", { "os": "darwin", "cpu": "x64" }, "sha512-IMQ6eme4AfznElesHUPDZ+teuGwoRmVuuixu7sv92ZkdQcPbsNHzutd+rAfaBKo8YK3IrBEi9SLLKWJdEvJniQ=="],

"@esbuild/freebsd-arm64": ["@esbuild/[email protected]", "", { "os": "freebsd", "cpu": "arm64" }, "sha512-0muYWCng5vqaxobq6LB3YNtevDFSAZGlgtLoAc81PjUfiFz36n4KMpwhtAd4he8ToSI3TGyuhyx5xmiWNYZFyw=="],

"@esbuild/freebsd-x64": ["@esbuild/[email protected]", "", { "os": "freebsd", "cpu": "x64" }, "sha512-XKDVu8IsD0/q3foBzsXGt/KjD/yTKBCIwOHE1XwiXmrRwrX6Hbnd5Eqn/WvDekddK21tfszBSrE/WMaZh+1buQ=="],

"@esbuild/linux-arm": ["@esbuild/[email protected]", "", { "os": "linux", "cpu": "arm" }, "sha512-SEELSTEtOFu5LPykzA395Mc+54RMg1EUgXP+iw2SJ72+ooMwVsgfuwXo5Fn0wXNgWZsTVHwY2cg4Vi/bOD88qw=="],

"@esbuild/linux-arm64": ["@esbuild/[email protected]", "", { "os": "linux", "cpu": "arm64" }, "sha512-j1t5iG8jE7BhonbsEg5d9qOYcVZv/Rv6tghaXM/Ug9xahM0nX/H2gfu6X6z11QRTMT6+aywOMA8TDkhPo8aCGw=="],

"@esbuild/linux-ia32": ["@esbuild/[email protected]", "", { "os": "linux", "cpu": "ia32" }, "sha512-P7O5Tkh2NbgIm2R6x1zGJJsnacDzTFcRWZyTTMgFdVit6E98LTxO+v8LCCLWRvPrjdzXHx9FEOA8oAZPyApWUA=="],

"@esbuild/linux-loong64": ["@esbuild/[email protected]", "", { "os": "linux", "cpu": "none" }, "sha512-InQwepswq6urikQiIC/kkx412fqUZudBO4SYKu0N+tGhXRWUqAx+Q+341tFV6QdBifpjYgUndV1hhMq3WeJi7A=="],

"@esbuild/linux-mips64el": ["@esbuild/[email protected]", "", { "os": "linux", "cpu": "none" }, "sha512-J9rflLtqdYrxHv2FqXE2i1ELgNjT+JFURt/uDMoPQLcjWQA5wDKgQA4t/dTqGa88ZVECKaD0TctwsUfHbVoi4w=="],

"@esbuild/linux-ppc64": ["@esbuild/[email protected]", "", { "os": "linux", "cpu": "ppc64" }, "sha512-cShCXtEOVc5GxU0fM+dsFD10qZ5UpcQ8AM22bYj0u/yaAykWnqXJDpd77ublcX6vdDsWLuweeuSNZk4yUxZwtw=="],

"@esbuild/linux-riscv64": ["@esbuild/[email protected]", "", { "os": "linux", "cpu": "none" }, "sha512-HEtaN7Y5UB4tZPeQmgz/UhzoEyYftbMXrBCUjINGjh3uil+rB/QzzpMshz3cNUxqXN7Vr93zzVtpIDL99t9aRw=="],

"@esbuild/linux-s390x": ["@esbuild/[email protected]", "", { "os": "linux", "cpu": "s390x" }, "sha512-WDi3+NVAuyjg/Wxi+o5KPqRbZY0QhI9TjrEEm+8dmpY9Xir8+HE/HNx2JoLckhKbFopW0RdO2D72w8trZOV+Wg=="],

"@esbuild/linux-x64": ["@esbuild/[email protected]", "", { "os": "linux", "cpu": "x64" }, "sha512-a3pMQhUEJkITgAw6e0bWA+F+vFtCciMjW/LPtoj99MhVt+Mfb6bbL9hu2wmTZgNd994qTAEw+U/r6k3qHWWaOQ=="],

"@esbuild/netbsd-x64": ["@esbuild/[email protected]", "", { "os": "none", "cpu": "x64" }, "sha512-cRK+YDem7lFTs2Q5nEv/HHc4LnrfBCbH5+JHu6wm2eP+d8OZNoSMYgPZJq78vqQ9g+9+nMuIsAO7skzphRXHyw=="],

"@esbuild/openbsd-arm64": ["@esbuild/[email protected]", "", { "os": "openbsd", "cpu": "arm64" }, "sha512-suXjq53gERueVWu0OKxzWqk7NxiUWSUlrxoZK7usiF50C6ipColGR5qie2496iKGYNLhDZkPxBI3erbnYkU0rQ=="],

"@esbuild/openbsd-x64": ["@esbuild/[email protected]", "", { "os": "openbsd", "cpu": "x64" }, "sha512-6p3nHpby0DM/v15IFKMjAaayFhqnXV52aEmv1whZHX56pdkK+MEaLoQWj+H42ssFarP1PcomVhbsR4pkz09qBg=="],

"@esbuild/sunos-x64": ["@esbuild/[email protected]", "", { "os": "sunos", "cpu": "x64" }, "sha512-BFelBGfrBwk6LVrmFzCq1u1dZbG4zy/Kp93w2+y83Q5UGYF1d8sCzeLI9NXjKyujjBBniQa8R8PzLFAUrSM9OA=="],

"@esbuild/win32-arm64": ["@esbuild/[email protected]", "", { "os": "win32", "cpu": "arm64" }, "sha512-lY6AC8p4Cnb7xYHuIxQ6iYPe6MfO2CC43XXKo9nBXDb35krYt7KGhQnOkRGar5psxYkircpCqfbNDB4uJbS2jQ=="],

"@esbuild/win32-ia32": ["@esbuild/[email protected]", "", { "os": "win32", "cpu": "ia32" }, "sha512-7L1bHlOTcO4ByvI7OXVI5pNN6HSu6pUQq9yodga8izeuB1KcT2UkHaH6118QJwopExPn0rMHIseCTx1CRo/uNA=="],

"@esbuild/win32-x64": ["@esbuild/[email protected]", "", { "os": "win32", "cpu": "x64" }, "sha512-Arm+WgUFLUATuoxCJcahGuk6Yj9Pzxd6l11Zb/2aAuv5kWWvvfhLFo2fni4uSK5vzlUdCGZ/BdV5tH8klj8p8g=="],

"convex": ["[email protected]", "", { "dependencies": { "esbuild": "0.23.0", "jwt-decode": "^3.1.2", "prettier": "3.4.2" }, "peerDependencies": { "@auth0/auth0-react": "^2.0.1", "@clerk/clerk-react": "^4.12.8 || ^5.0.0", "react": "^17.0.2 || ^18.0.0 || ^19.0.0-0 || ^19.0.0", "react-dom": "^17.0.2 || ^18.0.0 || ^19.0.0-0 || ^19.0.0" }, "optionalPeers": ["@auth0/auth0-react", "@clerk/clerk-react", "react", "react-dom"], "bin": { "convex": "bin/main.js" } }, "sha512-a9VVy9Ss7Z5Twt80t0yekyB4CWUWB3EpH5aru5MxqD8EX2QLnu9lNtsTc+eKM1V1CTVvC3LuYaBWBdcvFqMsYQ=="],

"esbuild": ["[email protected]", "", { "optionalDependencies": { "@esbuild/aix-ppc64": "0.23.0", "@esbuild/android-arm": "0.23.0", "@esbuild/android-arm64": "0.23.0", "@esbuild/android-x64": "0.23.0", "@esbuild/darwin-arm64": "0.23.0", "@esbuild/darwin-x64": "0.23.0", "@esbuild/freebsd-arm64": "0.23.0", "@esbuild/freebsd-x64": "0.23.0", "@esbuild/linux-arm": "0.23.0", "@esbuild/linux-arm64": "0.23.0", "@esbuild/linux-ia32": "0.23.0", "@esbuild/linux-loong64": "0.23.0", "@esbuild/linux-mips64el": "0.23.0", "@esbuild/linux-ppc64": "0.23.0", "@esbuild/linux-riscv64": "0.23.0", "@esbuild/linux-s390x": "0.23.0", "@esbuild/linux-x64": "0.23.0", "@esbuild/netbsd-x64": "0.23.0", "@esbuild/openbsd-arm64": "0.23.0", "@esbuild/openbsd-x64": "0.23.0", "@esbuild/sunos-x64": "0.23.0", "@esbuild/win32-arm64": "0.23.0", "@esbuild/win32-ia32": "0.23.0", "@esbuild/win32-x64": "0.23.0" }, "bin": { "esbuild": "bin/esbuild" } }, "sha512-1lvV17H2bMYda/WaFb2jLPeHU3zml2k4/yagNMG8Q/YtfMjCwEUZa2eXXMgZTVSL5q1n4H7sQ0X6CdJDqqeCFA=="],

"jwt-decode": ["[email protected]", "", {}, "sha512-UfpWE/VZn0iP50d8cz9NrZLM9lSWhcJ+0Gt/nm4by88UL+J1SiKN8/5dkjMmbEzwL2CAe+67GsegCbIKtbp75A=="],

"prettier": ["[email protected]", "", { "bin": { "prettier": "bin/prettier.cjs" } }, "sha512-e9MewbtFo+Fevyuxn/4rrcDAaq0IYxPGLvObpQjiZBMAzB9IGmzlnG9RZy3FFas+eBMu2vA0CszMeduow5dIuQ=="],
}
}
69 changes: 69 additions & 0 deletions evals/004-actions/008-http_streaming/answer/convex/http.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import { httpRouter } from "convex/server";
import { httpAction, query } from "./_generated/server";
import { v } from "convex/values";

const http = httpRouter();

http.route({
path: "/stream",
method: "POST",
handler: httpAction(async (ctx, request) => {
// Create a TransformStream to process the input line by line
const transform = new TransformStream<string, string>({
transform(chunk, controller) {
// Calculate line length and format as SSE message
const lineLength = chunk.length;
const message = `data: {"lineLength": ${lineLength}}\n\n`;
controller.enqueue(message);
},
});

// Create a TextDecoderStream to convert the input bytes to text
const textDecoder = new TextDecoderStream();
// Create a line break transform stream
const lineBreaker = new TransformStream<string, string>({
transform(chunk, controller) {
// Split the chunk by newlines and enqueue each line
const lines = chunk.split("\n");
for (const line of lines) {
if (line) controller.enqueue(line);
}
},
flush(controller) {
// Ensure any remaining data is processed
controller.terminate();
}
});

try {
// Chain the streams together with error handling
const responseStream = request.body!
.pipeThrough(textDecoder)
.pipeThrough(lineBreaker)
.pipeThrough(transform);

// Return the streaming response with explicit error handling
return new Response(responseStream, {
headers: {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
"Connection": "keep-alive",
},
});
} catch (error) {
// Handle any streaming errors
console.error('Streaming error:', error);
return new Response("Internal Server Error", { status: 500 });
}
}),
});

export const getSiteURL = query({
args: {},
returns: v.string(),
handler: async (ctx) => {
return process.env.CONVEX_SITE_URL!;
},
});

export default http;
14 changes: 14 additions & 0 deletions evals/004-actions/008-http_streaming/answer/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{
"name": "streaming-demo",
"private": true,
"version": "0.0.0",
"scripts": {
"dev": "convex dev"
},
"dependencies": {
"convex": "^1.17.4"
},
"devDependencies": {
"@types/node": "^18.11.18"
}
}
159 changes: 159 additions & 0 deletions evals/004-actions/008-http_streaming/grader.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
import { expect, test } from "vitest";
import {
compareFunctionSpec,
responseAdminClient,
} from "../../../grader";
import { api } from "./answer/convex/_generated/api";

test("compare function spec", async ({ skip }) => {
await compareFunctionSpec(skip);
});

async function getStreamURL(): Promise<string> {
const baseUrl = await responseAdminClient.query(api.http.getSiteURL, {});
return `${baseUrl}/stream`;
}

async function collectStreamMessages(response: Response): Promise<string[]> {
const reader = response.body!.getReader();
const decoder = new TextDecoder();
const messages: string[] = [];

while (true) {
const { done, value } = await reader.read();
if (done) break;

const chunk = decoder.decode(value);
const lines = chunk.split('\n\n');
for (const line of lines) {
if (line.startsWith('data: ')) {
messages.push(line.slice(6));
}
}
}

return messages;
}

test("streams line lengths correctly", async () => {
const input = "hello\nworld\ntest";
const url = await getStreamURL();

const response = await fetch(url, {
method: "POST",
body: input,
});

expect(response.status).toBe(200);
expect(response.headers.get("Content-Type")).toBe("text/event-stream");

const messages = await collectStreamMessages(response);
const expected = [
'{"lineLength": 5}',
'{"lineLength": 5}',
'{"lineLength": 4}',
];

expect(messages).toEqual(expected);
});

test("handles empty lines", async () => {
const input = "first\n\nlast";
const url = await getStreamURL();

const response = await fetch(url, {
method: "POST",
body: input,
});

const messages = await collectStreamMessages(response);
const expected = [
'{"lineLength": 5}',
'{"lineLength": 4}',
];

expect(messages).toEqual(expected);
});

test("handles single line input", async () => {
const input = "single line test";
const url = await getStreamURL();

const response = await fetch(url, {
method: "POST",
body: input,
});

const messages = await collectStreamMessages(response);
expect(messages).toEqual(['{"lineLength": 15}']);
});

test("handles large input", async () => {
const lines = Array.from({ length: 100 }, (_, i) => `Line ${i}`.repeat(10));
const input = lines.join('\n');
const url = await getStreamURL();

const response = await fetch(url, {
method: "POST",
body: input,
});

const messages = await collectStreamMessages(response);
expect(messages).toHaveLength(100);

// Verify each message format
for (const message of messages) {
expect(message).toMatch(/^{"lineLength": \d+}$/);
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
const parsed = JSON.parse(message);
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
expect(parsed.lineLength).toBeTypeOf("number");
}
});

test("rejects non-POST requests", async () => {
const url = await getStreamURL();
const methods = ["GET", "PUT", "DELETE", "PATCH"];

for (const method of methods) {
const response = await fetch(url, { method });
expect(response.status).toBe(404);
}
});

test("handles special characters", async () => {
const input = "special chars: !@#$%^&*()\némoji: 👋🌍\nunicode: 你好";
const url = await getStreamURL();

const response = await fetch(url, {
method: "POST",
body: input,
});

const messages = await collectStreamMessages(response);
const expected = [
'{"lineLength": 24}',
'{"lineLength": 11}',
'{"lineLength": 11}',
];

expect(messages).toEqual(expected);
});

test("handles very long lines", async () => {
const longLine = "x".repeat(10000);
const input = `short line\n${longLine}\nfinal line`;
const url = await getStreamURL();

const response = await fetch(url, {
method: "POST",
body: input,
});

const messages = await collectStreamMessages(response);
expect(messages).toHaveLength(3);
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
const parsedMiddle = JSON.parse(messages[1]);
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
expect(parsedMiddle.lineLength).toBe(10000);
});