From b96f19ecbe7856ec1e057f3a5e367b4db6f27c95 Mon Sep 17 00:00:00 2001 From: Nathan Flurry Date: Fri, 20 Feb 2026 19:22:21 -0800 Subject: [PATCH] chore: impl canInvoke for matchmaker --- .../frontend/games/arena/arena-game.ts | 13 +- .../frontend/games/arena/bot.ts | 13 +- .../frontend/games/arena/menu.tsx | 29 +- .../games/battle-royale/battle-royale-game.ts | 13 +- .../frontend/games/io-style/io-game.ts | 12 +- .../games/open-world/open-world-game.ts | 15 +- .../games/physics-2d/physics-2d-game.ts | 18 +- .../games/physics-3d/physics-3d-game.ts | 13 +- .../frontend/games/ranked/bot.ts | 19 +- .../frontend/games/ranked/menu.tsx | 35 +- .../frontend/games/ranked/ranked-game.ts | 15 +- .../src/actors/arena/match.ts | 32 +- .../src/actors/arena/matchmaker.ts | 154 +++- .../src/actors/battle-royale/match.ts | 47 +- .../src/actors/battle-royale/matchmaker.ts | 24 +- .../src/actors/idle/leaderboard.ts | 23 +- .../src/actors/idle/world.ts | 35 +- .../src/actors/io-style/match.ts | 40 +- .../src/actors/io-style/matchmaker.ts | 24 +- .../src/actors/open-world/chunk.ts | 51 +- .../src/actors/open-world/world-index.ts | 18 +- .../src/actors/party/match.ts | 34 +- .../src/actors/party/matchmaker.ts | 22 +- .../src/actors/physics-2d/world.ts | 26 +- .../src/actors/physics-3d/world.ts | 26 +- .../src/actors/ranked/leaderboard.ts | 23 +- .../src/actors/ranked/match.ts | 69 +- .../src/actors/ranked/matchmaker.ts | 197 ++++- .../src/actors/ranked/player.ts | 28 +- .../src/actors/turn-based/match.ts | 30 +- .../src/actors/turn-based/matchmaker.ts | 24 +- .../multiplayer-game-patterns/src/auth.ts | 10 + .../matchmaking-and-session-patterns.test.ts | 256 ++++++- .../src/content/cookbook/multiplayer-game.mdx | 21 +- website/src/content/docs/actors/workflows.mdx | 678 ++++++++++++++++++ website/src/sitemap/mod.ts | 5 + 36 files changed, 1876 insertions(+), 216 deletions(-) create mode 100644 website/src/content/docs/actors/workflows.mdx diff --git a/examples/multiplayer-game-patterns/frontend/games/arena/arena-game.ts b/examples/multiplayer-game-patterns/frontend/games/arena/arena-game.ts index f4a1559167..5cb98d62c0 100644 --- a/examples/multiplayer-game-patterns/frontend/games/arena/arena-game.ts +++ b/examples/multiplayer-game-patterns/frontend/games/arena/arena-game.ts @@ -29,6 +29,10 @@ interface ShotLine { createdAt: number; } +type ArenaMatchConn = ReturnType< + ReturnType["connect"] +>; + export class ArenaGame { private stopped = false; private rafId = 0; @@ -46,12 +50,7 @@ export class ArenaGame { private localY = 300; private lastFrameTime = 0; private botInterval = 0; - private conn: { - updatePosition: (i: { x: number; y: number }) => Promise; - shoot: (i: { dirX: number; dirY: number }) => Promise; - on: (e: string, cb: (d: unknown) => void) => void; - dispose: () => Promise; - }; + private conn: ArenaMatchConn; constructor( private canvas: HTMLCanvasElement | null, @@ -63,7 +62,7 @@ export class ArenaGame { .get([matchInfo.matchId], { params: { playerToken: matchInfo.playerToken }, }) - .connect() as typeof this.conn; + .connect(); this.conn.on("snapshot", (raw: unknown) => { const snap = raw as { diff --git a/examples/multiplayer-game-patterns/frontend/games/arena/bot.ts b/examples/multiplayer-game-patterns/frontend/games/arena/bot.ts index c37d37a491..49c73aa42f 100644 --- a/examples/multiplayer-game-patterns/frontend/games/arena/bot.ts +++ b/examples/multiplayer-game-patterns/frontend/games/arena/bot.ts @@ -17,12 +17,21 @@ export class ArenaBot { const mm = this.client.arenaMatchmaker.getOrCreate(["main"]).connect(); this.mm = mm; const result = await mm.send("queueForMatch", { mode: this.mode }, { wait: true, timeout: 120_000 }); - const response = (result as { response?: { playerId: string } })?.response; + const response = (result as { + response?: { playerId: string; registrationToken: string }; + })?.response; if (!response || this.destroyed) return; + await mm.registerPlayer({ + playerId: response.playerId, + registrationToken: response.registrationToken, + }); // Poll for assignment until match is filled. while (!this.destroyed) { - const assignment = await mm.getAssignment({ playerId: response.playerId }); + const assignment = await mm.getAssignment({ + playerId: response.playerId, + registrationToken: response.registrationToken, + }); if (assignment) { mm.dispose(); this.mm = null; diff --git a/examples/multiplayer-game-patterns/frontend/games/arena/menu.tsx b/examples/multiplayer-game-patterns/frontend/games/arena/menu.tsx index 1ab05614d5..0afb0fdef4 100644 --- a/examples/multiplayer-game-patterns/frontend/games/arena/menu.tsx +++ b/examples/multiplayer-game-patterns/frontend/games/arena/menu.tsx @@ -1,7 +1,6 @@ import { useCallback, useEffect, useRef, useState } from "react"; import type { GameClient } from "../../client.ts"; import { type Mode, MODE_CONFIG } from "../../../src/actors/arena/config.ts"; -import type { ArenaAssignment } from "../../../src/actors/arena/matchmaker.ts"; import { ArenaBot } from "./bot.ts"; export interface ArenaMatchInfo { @@ -89,15 +88,6 @@ export function ArenaMenu({ setQueueCount(data.counts[mode] ?? 0); }); - mm.on("assigned", (raw: unknown) => { - const data = raw as { assignments: ArenaAssignment[] }; - if (!myPlayerId) return; - const mine = data.assignments.find( - (a) => a.playerId === myPlayerId, - ); - if (mine) resolveMatch(mine); - }); - setStatus("queued"); // Queue message completes immediately with playerId. @@ -107,23 +97,34 @@ export function ArenaMenu({ { wait: true, timeout: 120_000 }, ); const response = ( - result as { response?: { playerId: string } } + result as { + response?: { playerId: string; registrationToken: string }; + } )?.response; if (!response || abortRef.current) throw new Error("Failed to queue"); myPlayerId = response.playerId; + await mm.registerPlayer({ + playerId: myPlayerId, + registrationToken: response.registrationToken, + }); // Fetch current queue sizes since the broadcast during queue // processing may have fired before the WebSocket was connected. const sizes = await mm.getQueueSizes(); if (!abortRef.current) setQueueCount((sizes as Record)[mode] ?? 0); - // Check if assignment was already made during queue processing. - if (!matched) { + // Poll for assignment until this connection is matched. + while (!matched && !abortRef.current) { const existing = await mm.getAssignment({ playerId: myPlayerId, + registrationToken: response.registrationToken, }); - if (existing) resolveMatch(existing as ArenaMatchInfo); + if (existing) { + resolveMatch(existing as ArenaMatchInfo); + break; + } + await new Promise((resolve) => setTimeout(resolve, 200)); } } catch (err) { if (abortRef.current) return; diff --git a/examples/multiplayer-game-patterns/frontend/games/battle-royale/battle-royale-game.ts b/examples/multiplayer-game-patterns/frontend/games/battle-royale/battle-royale-game.ts index e0f8fc25e2..9b1608eecc 100644 --- a/examples/multiplayer-game-patterns/frontend/games/battle-royale/battle-royale-game.ts +++ b/examples/multiplayer-game-patterns/frontend/games/battle-royale/battle-royale-game.ts @@ -28,6 +28,10 @@ interface ShotLine { createdAt: number; } +type BattleRoyaleMatchConn = ReturnType< + ReturnType["connect"] +>; + export class BattleRoyaleGame { private stopped = false; private rafId = 0; @@ -48,12 +52,7 @@ export class BattleRoyaleGame { private localY = 0; private lastFrameTime = 0; private botInterval = 0; - private conn: { - updatePosition: (i: { x: number; y: number }) => Promise; - shoot: (i: { dirX: number; dirY: number }) => Promise; - on: (e: string, cb: (d: unknown) => void) => void; - dispose: () => Promise; - }; + private conn: BattleRoyaleMatchConn; constructor( private canvas: HTMLCanvasElement | null, @@ -65,7 +64,7 @@ export class BattleRoyaleGame { .get([matchInfo.matchId], { params: { playerToken: matchInfo.playerToken }, }) - .connect() as typeof this.conn; + .connect(); this.conn.on("snapshot", (raw: unknown) => { const snap = raw as { diff --git a/examples/multiplayer-game-patterns/frontend/games/io-style/io-game.ts b/examples/multiplayer-game-patterns/frontend/games/io-style/io-game.ts index 4f87d8bf2f..35103a0258 100644 --- a/examples/multiplayer-game-patterns/frontend/games/io-style/io-game.ts +++ b/examples/multiplayer-game-patterns/frontend/games/io-style/io-game.ts @@ -14,6 +14,10 @@ function colorFromId(id: string): string { return `hsl(${hue}, 70%, 55%)`; } +type IoStyleMatchConn = ReturnType< + ReturnType["connect"] +>; + export class IoGame { private stopped = false; private rafId = 0; @@ -24,11 +28,7 @@ export class IoGame { private lastIx = 0; private lastIy = 0; private botInterval = 0; - private conn: { - setInput: (i: { inputX: number; inputY: number }) => Promise; - on: (e: string, cb: (d: unknown) => void) => void; - dispose: () => Promise; - }; + private conn: IoStyleMatchConn; constructor( private canvas: HTMLCanvasElement | null, @@ -40,7 +40,7 @@ export class IoGame { .get([matchInfo.matchId], { params: { playerToken: matchInfo.playerToken }, }) - .connect() as typeof this.conn; + .connect(); this.conn.on("snapshot", (raw: unknown) => { const snap = raw as { diff --git a/examples/multiplayer-game-patterns/frontend/games/open-world/open-world-game.ts b/examples/multiplayer-game-patterns/frontend/games/open-world/open-world-game.ts index b8faaf2e71..fb1d175301 100644 --- a/examples/multiplayer-game-patterns/frontend/games/open-world/open-world-game.ts +++ b/examples/multiplayer-game-patterns/frontend/games/open-world/open-world-game.ts @@ -19,17 +19,14 @@ function colorFromId(id: string): string { return `hsl(${hue}, 70%, 55%)`; } +type OpenWorldChunkConn = ReturnType< + ReturnType["connect"] +>; + interface ChunkConnection { cx: number; cy: number; - conn: { - setInput: (i: { inputX: number; inputY: number; sprint?: boolean }) => Promise; - removePlayer: (i: { playerId: string }) => Promise; - placeBlock: (i: { gridX: number; gridY: number }) => Promise; - removeBlock: (i: { gridX: number; gridY: number }) => Promise; - on: (e: string, cb: (d: unknown) => void) => void; - dispose: () => Promise; - }; + conn: OpenWorldChunkConn; players: Record; display: Record; blocks: Set; @@ -99,7 +96,7 @@ export class OpenWorldGame { const conn = this.client.openWorldChunk .getOrCreate(["default", String(cx), String(cy)], { params }) - .connect() as ChunkConnection["conn"]; + .connect(); const chunk: ChunkConnection = { cx, diff --git a/examples/multiplayer-game-patterns/frontend/games/physics-2d/physics-2d-game.ts b/examples/multiplayer-game-patterns/frontend/games/physics-2d/physics-2d-game.ts index 5547151feb..6e0d6f8669 100644 --- a/examples/multiplayer-game-patterns/frontend/games/physics-2d/physics-2d-game.ts +++ b/examples/multiplayer-game-patterns/frontend/games/physics-2d/physics-2d-game.ts @@ -7,6 +7,10 @@ import { SCENE_STATIC, } from "../../../src/actors/physics-2d/config.ts"; +type Physics2dConn = ReturnType< + ReturnType["connect"] +>; + interface BodySnapshot { id: string; x: number; @@ -53,12 +57,7 @@ export class Physics2dGame { private lastSnapshotTime = 0; private tickIntervalMs = 0; private latencyMs = 0; - private conn: { - setInput: (i: { inputX: number; jump?: boolean }) => Promise; - spawnBox: (i: { x: number; y: number }) => Promise; - on: (e: string, cb: (d: unknown) => void) => void; - dispose: () => Promise; - }; + private conn: Physics2dConn; constructor( private canvas: HTMLCanvasElement, @@ -68,12 +67,7 @@ export class Physics2dGame { const handle = client.physics2dWorld.getOrCreate(["main"], { params: { name: matchInfo.name }, }); - this.conn = handle.connect() as typeof this.conn; - - // Capture connection id from the handle. - (handle as unknown as { id: Promise }).id?.then?.((id: string) => { - this.myConnId = id; - }); + this.conn = handle.connect(); this.conn.on("snapshot", (raw: unknown) => { const snap = raw as Snapshot; diff --git a/examples/multiplayer-game-patterns/frontend/games/physics-3d/physics-3d-game.ts b/examples/multiplayer-game-patterns/frontend/games/physics-3d/physics-3d-game.ts index 491d590392..8634f49567 100644 --- a/examples/multiplayer-game-patterns/frontend/games/physics-3d/physics-3d-game.ts +++ b/examples/multiplayer-game-patterns/frontend/games/physics-3d/physics-3d-game.ts @@ -7,6 +7,10 @@ import { SCENE_STATIC, } from "../../../src/actors/physics-3d/config.ts"; +type Physics3dConn = ReturnType< + ReturnType["connect"] +>; + interface BodySnapshot { id: string; x: number; @@ -63,12 +67,7 @@ export class Physics3dGame { private raycaster = new THREE.Raycaster(); private groundPlane = new THREE.Plane(new THREE.Vector3(0, 1, 0), 0); - private conn: { - setInput: (i: { inputX: number; inputZ: number; jump?: boolean }) => Promise; - spawnBox: (i: { x: number; z: number }) => Promise; - on: (e: string, cb: (d: unknown) => void) => void; - dispose: () => Promise; - }; + private conn: Physics3dConn; constructor( private container: HTMLDivElement, @@ -132,7 +131,7 @@ export class Physics3dGame { const handle = client.physics3dWorld.getOrCreate(["main"], { params: { name: matchInfo.name }, }); - this.conn = handle.connect() as typeof this.conn; + this.conn = handle.connect(); this.conn.on("snapshot", (raw: unknown) => { const snap = raw as Snapshot; diff --git a/examples/multiplayer-game-patterns/frontend/games/ranked/bot.ts b/examples/multiplayer-game-patterns/frontend/games/ranked/bot.ts index 93ec0c78cb..f12aa82249 100644 --- a/examples/multiplayer-game-patterns/frontend/games/ranked/bot.ts +++ b/examples/multiplayer-game-patterns/frontend/games/ranked/bot.ts @@ -17,12 +17,27 @@ export class RankedBot { const botUsername = `Bot#${Math.floor(Math.random() * 10000).toString().padStart(4, "0")}`; const mm = this.client.rankedMatchmaker.getOrCreate(["main"]).connect(); this.mm = mm; - await mm.send("queueForMatch", { username: botUsername }, { wait: true, timeout: 120_000 }); + const queueResult = await mm.send( + "queueForMatch", + { username: botUsername }, + { wait: true, timeout: 120_000 }, + ); + const queueResponse = ( + queueResult as { response?: { registrationToken: string } } + )?.response; + if (!queueResponse || this.destroyed) return; + await mm.registerPlayer({ + username: botUsername, + registrationToken: queueResponse.registrationToken, + }); if (this.destroyed) return; // Poll for assignment until paired. while (!this.destroyed) { - const assignment = await mm.getAssignment({ username: botUsername }); + const assignment = await mm.getAssignment({ + username: botUsername, + registrationToken: queueResponse.registrationToken, + }); if (assignment) { mm.dispose(); this.mm = null; diff --git a/examples/multiplayer-game-patterns/frontend/games/ranked/menu.tsx b/examples/multiplayer-game-patterns/frontend/games/ranked/menu.tsx index e7b1f07847..363629fc54 100644 --- a/examples/multiplayer-game-patterns/frontend/games/ranked/menu.tsx +++ b/examples/multiplayer-game-patterns/frontend/games/ranked/menu.tsx @@ -1,6 +1,5 @@ import { useCallback, useEffect, useRef, useState } from "react"; import type { GameClient } from "../../client.ts"; -import type { RankedAssignment } from "../../../src/actors/ranked/matchmaker.ts"; import type { LeaderboardEntry } from "../../../src/actors/ranked/leaderboard.ts"; import type { PlayerSnapshot } from "../../../src/actors/ranked/player.ts"; import { RankedBot } from "./bot.ts"; @@ -31,6 +30,7 @@ export interface RankedMatchInfo { matchId: string; username: string; rating: number; + playerToken: string; } export function RankedMenu({ @@ -137,30 +137,39 @@ export function RankedMenu({ setQueueCount(data.count); }); - mm.on("assigned", (raw: unknown) => { - const data = raw as { assignments: RankedAssignment[] }; - const mine = data.assignments.find( - (a) => a.username === username, - ); - if (mine) resolveMatch(mine); - }); - setStatus("queued"); - await mm.send( + const queueResult = await mm.send( "queueForMatch", { username }, { wait: true, timeout: 120_000 }, ); + const queueResponse = ( + queueResult as { response?: { registrationToken: string } } + )?.response; + if (!queueResponse || abortRef.current) { + throw new Error("Failed to queue"); + } + await mm.registerPlayer({ + username, + registrationToken: queueResponse.registrationToken, + }); if (abortRef.current) return; const size = await mm.getQueueSize(); if (!abortRef.current) setQueueCount(size as number); - if (!matched) { - const existing = await mm.getAssignment({ username }); - if (existing) resolveMatch(existing as RankedMatchInfo); + while (!matched && !abortRef.current) { + const existing = await mm.getAssignment({ + username, + registrationToken: queueResponse.registrationToken, + }); + if (existing) { + resolveMatch(existing as RankedMatchInfo); + break; + } + await new Promise((resolve) => setTimeout(resolve, 200)); } } catch (err) { if (abortRef.current) return; diff --git a/examples/multiplayer-game-patterns/frontend/games/ranked/ranked-game.ts b/examples/multiplayer-game-patterns/frontend/games/ranked/ranked-game.ts index 1432cb3d48..7672e3fc76 100644 --- a/examples/multiplayer-game-patterns/frontend/games/ranked/ranked-game.ts +++ b/examples/multiplayer-game-patterns/frontend/games/ranked/ranked-game.ts @@ -27,6 +27,10 @@ interface ShotLine { createdAt: number; } +type RankedMatchConn = ReturnType< + ReturnType["connect"] +>; + export class RankedGame { private stopped = false; private rafId = 0; @@ -43,12 +47,7 @@ export class RankedGame { private localY = 300; private lastFrameTime = 0; private botInterval = 0; - private conn: { - updatePosition: (i: { x: number; y: number }) => Promise; - shoot: (i: { dirX: number; dirY: number }) => Promise; - on: (e: string, cb: (d: unknown) => void) => void; - dispose: () => Promise; - }; + private conn: RankedMatchConn; constructor( private canvas: HTMLCanvasElement | null, @@ -58,9 +57,9 @@ export class RankedGame { ) { this.conn = client.rankedMatch .get([matchInfo.matchId], { - params: { username: matchInfo.username }, + params: { playerToken: matchInfo.playerToken }, }) - .connect() as typeof this.conn; + .connect(); this.conn.on("snapshot", (raw: unknown) => { const snap = raw as { diff --git a/examples/multiplayer-game-patterns/src/actors/arena/match.ts b/examples/multiplayer-game-patterns/src/actors/arena/match.ts index f7e2f1ed85..2a92e3e052 100644 --- a/examples/multiplayer-game-patterns/src/actors/arena/match.ts +++ b/examples/multiplayer-game-patterns/src/actors/arena/match.ts @@ -1,6 +1,10 @@ import { actor, type ActorContextOf, event, UserError } from "rivetkit"; import { interval } from "rivetkit/utils"; -import { INTERNAL_TOKEN } from "../../auth.ts"; +import { + hasInvalidInternalToken, + INTERNAL_TOKEN, + isInternalToken, +} from "../../auth.ts"; import { registry } from "../index.ts"; import { type Mode, @@ -83,6 +87,9 @@ export const arenaMatch = actor({ c, params: { playerToken?: string; internalToken?: string }, ) => { + if (hasInvalidInternalToken(params)) { + throw new UserError("forbidden", { code: "forbidden" }); + } if (params?.internalToken === INTERNAL_TOKEN) return; const playerToken = params?.playerToken?.trim(); if (!playerToken) { @@ -96,6 +103,27 @@ export const arenaMatch = actor({ }); } }, + canInvoke: (c, invoke) => { + const isInternal = isInternalToken( + c.conn.params as { internalToken?: string } | undefined, + ); + const isAssignedPlayer = findPlayerByConnId(c.state, c.conn.id) !== null; + if ( + invoke.kind === "action" && + (invoke.name === "updatePosition" || + invoke.name === "shoot" || + invoke.name === "getSnapshot") + ) { + return !isInternal && isAssignedPlayer; + } + if ( + invoke.kind === "subscribe" && + (invoke.name === "snapshot" || invoke.name === "shoot") + ) { + return !isInternal && isAssignedPlayer; + } + return false; + }, onConnect: (c, conn) => { const playerToken = conn.params?.playerToken?.trim(); if (!playerToken) return; @@ -122,7 +150,7 @@ export const arenaMatch = actor({ onDestroy: async (c) => { const client = c.client(); await client.arenaMatchmaker - .getOrCreate(["main"]) + .getOrCreate(["main"], { params: { internalToken: INTERNAL_TOKEN } }) .send("matchCompleted", { matchId: c.state.matchId }); }, onDisconnect: (c, conn) => { diff --git a/examples/multiplayer-game-patterns/src/actors/arena/matchmaker.ts b/examples/multiplayer-game-patterns/src/actors/arena/matchmaker.ts index 633ce72328..ce70017690 100644 --- a/examples/multiplayer-game-patterns/src/actors/arena/matchmaker.ts +++ b/examples/multiplayer-game-patterns/src/actors/arena/matchmaker.ts @@ -1,6 +1,7 @@ -import { actor, type ActorContextOf, queue } from "rivetkit"; +import { actor, type ActorContextOf, queue, UserError } from "rivetkit"; import { db, type RawAccess } from "rivetkit/db"; +import { hasInvalidInternalToken, INTERNAL_TOKEN, isInternalToken } from "../../auth.ts"; import { registry } from "../index.ts"; import { type Mode, MODE_CONFIG } from "./config.ts"; @@ -10,24 +11,90 @@ export interface ArenaAssignment { playerToken: string; teamId: number; mode: Mode; + connId: string | null; } +type QueuePlayerRow = { + player_id: string; + conn_id: string | null; + registration_token: string | null; +}; + +type StoredArenaAssignment = ArenaAssignment & { + registrationToken: string | null; +}; + export const arenaMatchmaker = actor({ options: { name: "Arena - Matchmaker", icon: "crosshairs" }, db: db({ onMigrate: migrateTables, }), + onBeforeConnect: (_c, params: { internalToken?: string }) => { + if (hasInvalidInternalToken(params)) { + throw new UserError("forbidden", { code: "forbidden" }); + } + }, + canInvoke: (c, invoke) => { + const isInternal = isInternalToken( + c.conn.params as { internalToken?: string } | undefined, + ); + if (invoke.kind === "queue" && invoke.name === "queueForMatch") { + return !isInternal; + } + if (invoke.kind === "queue" && invoke.name === "matchCompleted") { + return isInternal; + } + if ( + invoke.kind === "action" && + (invoke.name === "registerPlayer" || + invoke.name === "getQueueSizes" || + invoke.name === "getAssignment") + ) { + return !isInternal; + } + if (invoke.kind === "subscribe" && invoke.name === "queueUpdate") { + return !isInternal; + } + return false; + }, queues: { - queueForMatch: queue<{ mode: Mode }, { playerId: string }>(), + queueForMatch: queue< + { mode: Mode }, + { playerId: string; registrationToken: string } + >(), matchCompleted: queue<{ matchId: string }>(), }, actions: { - registerPlayer: async (c, { playerId }: { playerId: string }) => { + registerPlayer: async ( + c, + { + playerId, + registrationToken, + }: { playerId: string; registrationToken: string }, + ) => { await c.db.execute( - `UPDATE player_pool SET conn_id = ? WHERE player_id = ?`, + `UPDATE player_pool SET conn_id = ? WHERE player_id = ? AND registration_token = ?`, c.conn.id, playerId, + registrationToken, + ); + const playerPoolChangeRows = await c.db.execute<{ changed: number }>( + `SELECT changes() AS changed`, ); + const playerPoolChanges = playerPoolChangeRows[0]?.changed ?? 0; + await c.db.execute( + `UPDATE assignments SET conn_id = ? WHERE player_id = ? AND registration_token = ?`, + c.conn.id, + playerId, + registrationToken, + ); + const assignmentChangeRows = await c.db.execute<{ changed: number }>( + `SELECT changes() AS changed`, + ); + const assignmentChanges = assignmentChangeRows[0]?.changed ?? 0; + if (playerPoolChanges === 0 && assignmentChanges === 0) { + throw new UserError("forbidden", { code: "forbidden" }); + } }, getQueueSizes: async (c) => { const rows = await c.db.execute<{ mode: string; cnt: number }>( @@ -39,16 +106,25 @@ export const arenaMatchmaker = actor({ } return counts; }, - getAssignment: async (c, { playerId }: { playerId: string }) => { + getAssignment: async ( + c, + { + playerId, + registrationToken, + }: { playerId: string; registrationToken: string }, + ) => { const rows = await c.db.execute<{ match_id: string; player_id: string; player_token: string; team_id: number; mode: string; + conn_id: string | null; }>( - `SELECT * FROM assignments WHERE player_id = ?`, + `SELECT * FROM assignments WHERE player_id = ? AND conn_id = ? AND registration_token = ?`, playerId, + c.conn.id, + registrationToken, ); if (rows.length === 0) return null; const row = rows[0]!; @@ -58,6 +134,7 @@ export const arenaMatchmaker = actor({ playerToken: row.player_token, teamId: row.team_id, mode: row.mode as Mode, + connId: row.conn_id, }; }, }, @@ -68,8 +145,8 @@ export const arenaMatchmaker = actor({ run: async (c) => { for await (const message of c.queue.iter({ completable: true })) { if (message.name === "queueForMatch") { - const playerId = await processQueueEntry(c, message.body.mode); - await message.complete({ playerId }); + const queueResult = await processQueueEntry(c, message.body.mode); + await message.complete(queueResult); } else if (message.name === "matchCompleted") { await c.db.execute( `DELETE FROM matches WHERE match_id = ?`, @@ -97,16 +174,18 @@ async function broadcastQueueSizes( async function processQueueEntry( c: ActorContextOf, mode: Mode, -): Promise { +): Promise<{ playerId: string; registrationToken: string }> { const playerId = crypto.randomUUID(); + const registrationToken = crypto.randomUUID(); const config = MODE_CONFIG[mode]; // Insert player into pool. await c.db.execute( - `INSERT OR REPLACE INTO player_pool (player_id, mode, queued_at) VALUES (?, ?, ?)`, + `INSERT OR REPLACE INTO player_pool (player_id, mode, queued_at, registration_token) VALUES (?, ?, ?, ?)`, playerId, mode, Date.now(), + registrationToken, ); await broadcastQueueSizes(c); @@ -122,7 +201,7 @@ async function processQueueEntry( await fillMatch(c, mode, config); } - return playerId; + return { playerId, registrationToken }; } async function fillMatch( @@ -131,13 +210,18 @@ async function fillMatch( config: { capacity: number; teams: number }, ) { // Pop oldest N players. - const queued = await c.db.execute<{ player_id: string }>( - `SELECT player_id FROM player_pool WHERE mode = ? ORDER BY queued_at ASC LIMIT ?`, + const queued = await c.db.execute( + `SELECT player_id, conn_id, registration_token FROM player_pool WHERE mode = ? ORDER BY queued_at ASC LIMIT ?`, mode, config.capacity, ); - const playerIds = queued.map((r) => r.player_id); + const queuedPlayers = queued.map((r) => ({ + playerId: r.player_id, + connId: r.conn_id, + registrationToken: r.registration_token, + })); + const playerIds = queuedPlayers.map((r) => r.playerId); // Remove from queue. for (const pid of playerIds) { @@ -146,9 +230,11 @@ async function fillMatch( // Assign teams and generate tokens. const matchId = crypto.randomUUID(); - const assignedPlayers = playerIds.map((pid, idx) => ({ - playerId: pid, + const assignedPlayers = queuedPlayers.map((queuedPlayer, idx) => ({ + playerId: queuedPlayer.playerId, token: crypto.randomUUID(), + connId: queuedPlayer.connId, + registrationToken: queuedPlayer.registrationToken, teamId: config.teams > 0 ? idx % config.teams : -1, })); @@ -179,26 +265,27 @@ async function fillMatch( await broadcastQueueSizes(c); // Store assignments in DB so bots can poll for them. - const assignments: ArenaAssignment[] = assignedPlayers.map((ap) => ({ + const assignments: StoredArenaAssignment[] = assignedPlayers.map((ap) => ({ matchId, playerId: ap.playerId, playerToken: ap.token, teamId: ap.teamId, mode, + connId: ap.connId, + registrationToken: ap.registrationToken, })); for (const a of assignments) { await c.db.execute( - `INSERT INTO assignments (player_id, match_id, player_token, team_id, mode) VALUES (?, ?, ?, ?, ?)`, + `INSERT INTO assignments (player_id, match_id, player_token, team_id, mode, conn_id, registration_token) VALUES (?, ?, ?, ?, ?, ?, ?)`, a.playerId, a.matchId, a.playerToken, a.teamId, a.mode, + a.connId, + a.registrationToken, ); } - - // Broadcast assignments to WS connections. - c.broadcast("assigned", { assignments }); } async function migrateTables(dbHandle: RawAccess) { @@ -207,7 +294,8 @@ async function migrateTables(dbHandle: RawAccess) { player_id TEXT PRIMARY KEY, mode TEXT NOT NULL, queued_at INTEGER NOT NULL, - conn_id TEXT + conn_id TEXT, + registration_token TEXT ) `); await dbHandle.execute(` @@ -224,7 +312,27 @@ async function migrateTables(dbHandle: RawAccess) { match_id TEXT NOT NULL, player_token TEXT NOT NULL, team_id INTEGER NOT NULL, - mode TEXT NOT NULL + mode TEXT NOT NULL, + conn_id TEXT, + registration_token TEXT ) `); + await ensureColumn(dbHandle, "player_pool", "registration_token", "TEXT"); + await ensureColumn(dbHandle, "assignments", "registration_token", "TEXT"); +} + +async function ensureColumn( + dbHandle: RawAccess, + table: "player_pool" | "assignments", + column: "registration_token", + definition: "TEXT", +) { + const columns = await dbHandle.execute<{ name: string }>( + `PRAGMA table_info(${table})`, + ); + if (!columns.some((col) => col.name === column)) { + await dbHandle.execute( + `ALTER TABLE ${table} ADD COLUMN ${column} ${definition}`, + ); + } } diff --git a/examples/multiplayer-game-patterns/src/actors/battle-royale/match.ts b/examples/multiplayer-game-patterns/src/actors/battle-royale/match.ts index e8e3815832..adbffa58a5 100644 --- a/examples/multiplayer-game-patterns/src/actors/battle-royale/match.ts +++ b/examples/multiplayer-game-patterns/src/actors/battle-royale/match.ts @@ -1,6 +1,10 @@ import { actor, type ActorContextOf, event, UserError } from "rivetkit"; import { interval } from "rivetkit/utils"; -import { INTERNAL_TOKEN } from "../../auth.ts"; +import { + hasInvalidInternalToken, + INTERNAL_TOKEN, + isInternalToken, +} from "../../auth.ts"; import { registry } from "../index.ts"; import { LOBBY_CAPACITY, @@ -61,6 +65,9 @@ export const battleRoyaleMatch = actor({ c, params: { playerToken?: string; internalToken?: string }, ) => { + if (hasInvalidInternalToken(params)) { + throw new UserError("forbidden", { code: "forbidden" }); + } if (params?.internalToken === INTERNAL_TOKEN) return; const playerToken = params?.playerToken?.trim(); if (!playerToken) { @@ -70,6 +77,30 @@ export const battleRoyaleMatch = actor({ throw new UserError("invalid player token", { code: "invalid_player_token" }); } }, + canInvoke: (c, invoke) => { + const isInternal = isInternalToken( + c.conn.params as { internalToken?: string } | undefined, + ); + const isAssignedPlayer = findPlayerByConnId(c.state, c.conn.id) !== null; + if (invoke.kind === "action" && invoke.name === "createPlayer") { + return isInternal; + } + if ( + invoke.kind === "action" && + (invoke.name === "updatePosition" || + invoke.name === "shoot" || + invoke.name === "getSnapshot") + ) { + return !isInternal && isAssignedPlayer; + } + if ( + invoke.kind === "subscribe" && + (invoke.name === "snapshot" || invoke.name === "shoot") + ) { + return !isInternal && isAssignedPlayer; + } + return false; + }, onConnect: async (c, conn) => { const playerToken = conn.params?.playerToken?.trim(); if (!playerToken) return; @@ -98,7 +129,7 @@ export const battleRoyaleMatch = actor({ onDestroy: async (c) => { const client = c.client(); await client.battleRoyaleMatchmaker - .getOrCreate(["main"]) + .getOrCreate(["main"], { params: { internalToken: INTERNAL_TOKEN } }) .send("closeMatch", { matchId: c.state.matchId }); }, run: async (c) => { @@ -336,11 +367,13 @@ function randomPositionInZone(zone: { centerX: number; centerY: number; radius: async function updateMatchmaker(c: ActorContextOf) { const client = c.client(); - await client.battleRoyaleMatchmaker.getOrCreate(["main"]).send("updateMatch", { - matchId: c.state.matchId, - playerCount: Object.values(c.state.players).filter((p) => p.connId !== null).length, - isStarted: c.state.phase !== "lobby", - }); + await client.battleRoyaleMatchmaker + .getOrCreate(["main"], { params: { internalToken: INTERNAL_TOKEN } }) + .send("updateMatch", { + matchId: c.state.matchId, + playerCount: Object.values(c.state.players).filter((p) => p.connId !== null).length, + isStarted: c.state.phase !== "lobby", + }); } function countAlivePlayers(state: State): number { diff --git a/examples/multiplayer-game-patterns/src/actors/battle-royale/matchmaker.ts b/examples/multiplayer-game-patterns/src/actors/battle-royale/matchmaker.ts index a0a042063c..d3f6dde3a1 100644 --- a/examples/multiplayer-game-patterns/src/actors/battle-royale/matchmaker.ts +++ b/examples/multiplayer-game-patterns/src/actors/battle-royale/matchmaker.ts @@ -1,7 +1,7 @@ -import { actor, queue } from "rivetkit"; +import { actor, queue, UserError } from "rivetkit"; import { db, type RawAccess } from "rivetkit/db"; -import { INTERNAL_TOKEN } from "../../auth.ts"; +import { hasInvalidInternalToken, INTERNAL_TOKEN, isInternalToken } from "../../auth.ts"; import { registry } from "../index.ts"; import { LOBBY_CAPACITY } from "./config.ts"; @@ -10,6 +10,26 @@ export const battleRoyaleMatchmaker = actor({ db: db({ onMigrate: migrateTables, }), + onBeforeConnect: (_c, params: { internalToken?: string }) => { + if (hasInvalidInternalToken(params)) { + throw new UserError("forbidden", { code: "forbidden" }); + } + }, + canInvoke: (c, invoke) => { + const isInternal = isInternalToken( + c.conn.params as { internalToken?: string } | undefined, + ); + if (invoke.kind === "queue" && invoke.name === "findMatch") { + return !isInternal; + } + if ( + invoke.kind === "queue" && + (invoke.name === "updateMatch" || invoke.name === "closeMatch") + ) { + return isInternal; + } + return false; + }, queues: { findMatch: queue< Record, diff --git a/examples/multiplayer-game-patterns/src/actors/idle/leaderboard.ts b/examples/multiplayer-game-patterns/src/actors/idle/leaderboard.ts index 01918302a2..447531ff21 100644 --- a/examples/multiplayer-game-patterns/src/actors/idle/leaderboard.ts +++ b/examples/multiplayer-game-patterns/src/actors/idle/leaderboard.ts @@ -1,5 +1,6 @@ -import { actor, type ActorContextOf, event } from "rivetkit"; +import { actor, type ActorContextOf, event, UserError } from "rivetkit"; import { db, type RawAccess } from "rivetkit/db"; +import { hasInvalidInternalToken, isInternalToken } from "../../auth.ts"; export interface LeaderboardEntry { playerId: string; @@ -15,6 +16,26 @@ export const idleLeaderboard = actor({ events: { leaderboardUpdate: event(), }, + onBeforeConnect: (_c, params: { internalToken?: string }) => { + if (hasInvalidInternalToken(params)) { + throw new UserError("forbidden", { code: "forbidden" }); + } + }, + canInvoke: (c, invoke) => { + const isInternal = isInternalToken( + c.conn.params as { internalToken?: string } | undefined, + ); + if (invoke.kind === "action" && invoke.name === "updateScore") { + return isInternal; + } + if (invoke.kind === "action" && invoke.name === "getTopScores") { + return true; + } + if (invoke.kind === "subscribe" && invoke.name === "leaderboardUpdate") { + return !isInternal; + } + return false; + }, actions: { updateScore: async ( c, diff --git a/examples/multiplayer-game-patterns/src/actors/idle/world.ts b/examples/multiplayer-game-patterns/src/actors/idle/world.ts index 400d7c89d3..905825b547 100644 --- a/examples/multiplayer-game-patterns/src/actors/idle/world.ts +++ b/examples/multiplayer-game-patterns/src/actors/idle/world.ts @@ -1,4 +1,9 @@ -import { actor, type ActorContextOf, event } from "rivetkit"; +import { actor, type ActorContextOf, event, UserError } from "rivetkit"; +import { + hasInvalidInternalToken, + INTERNAL_TOKEN, + isInternalToken, +} from "../../auth.ts"; import { registry } from "../index.ts"; import { BUILDINGS, STARTING_RESOURCES, type BuildingType } from "./config.ts"; @@ -39,6 +44,32 @@ export const idleWorld = actor({ events: { stateUpdate: event(), }, + onBeforeConnect: (_c, params: { internalToken?: string }) => { + if (hasInvalidInternalToken(params)) { + throw new UserError("forbidden", { code: "forbidden" }); + } + }, + canInvoke: (c, invoke) => { + const isInternal = isInternalToken( + c.conn.params as { internalToken?: string } | undefined, + ); + if ( + invoke.kind === "action" && + (invoke.name === "initialize" || + invoke.name === "build" || + invoke.name === "getState" || + invoke.name === "getLeaderboard") + ) { + return !isInternal; + } + if (invoke.kind === "action" && invoke.name === "collectProduction") { + return isInternal; + } + if (invoke.kind === "subscribe" && invoke.name === "stateUpdate") { + return !isInternal; + } + return false; + }, state: { playerId: "", playerName: "", @@ -140,7 +171,7 @@ function scheduleCollection( function updateLeaderboard(c: ActorContextOf) { const client = c.client(); client.idleLeaderboard - .getOrCreate(["main"]) + .getOrCreate(["main"], { params: { internalToken: INTERNAL_TOKEN } }) .updateScore({ playerId: c.state.playerId, playerName: c.state.playerName, diff --git a/examples/multiplayer-game-patterns/src/actors/io-style/match.ts b/examples/multiplayer-game-patterns/src/actors/io-style/match.ts index f94722b735..a72328f605 100644 --- a/examples/multiplayer-game-patterns/src/actors/io-style/match.ts +++ b/examples/multiplayer-game-patterns/src/actors/io-style/match.ts @@ -1,6 +1,10 @@ import { actor, type ActorContextOf, event, UserError } from "rivetkit"; import { interval } from "rivetkit/utils"; -import { INTERNAL_TOKEN } from "../../auth.ts"; +import { + hasInvalidInternalToken, + INTERNAL_TOKEN, + isInternalToken, +} from "../../auth.ts"; import { registry } from "../index.ts"; import { CAPACITY, SPEED, WORLD_SIZE } from "./config.ts"; @@ -37,6 +41,9 @@ export const ioStyleMatch = actor({ c, params: { playerToken?: string; internalToken?: string }, ) => { + if (hasInvalidInternalToken(params)) { + throw new UserError("forbidden", { code: "forbidden" }); + } if (params?.internalToken === INTERNAL_TOKEN) return; const playerToken = params?.playerToken?.trim(); if (!playerToken) { @@ -50,6 +57,25 @@ export const ioStyleMatch = actor({ }); } }, + canInvoke: (c, invoke) => { + const isInternal = isInternalToken( + c.conn.params as { internalToken?: string } | undefined, + ); + const isAssignedPlayer = findPlayerByConnId(c.state, c.conn.id) !== null; + if (invoke.kind === "action" && invoke.name === "createPlayer") { + return isInternal; + } + if ( + invoke.kind === "action" && + (invoke.name === "setInput" || invoke.name === "getSnapshot") + ) { + return !isInternal && isAssignedPlayer; + } + if (invoke.kind === "subscribe" && invoke.name === "snapshot") { + return !isInternal && isAssignedPlayer; + } + return false; + }, onConnect: async (c, conn) => { const playerToken = conn.params?.playerToken?.trim(); if (!playerToken) return; @@ -68,7 +94,7 @@ export const ioStyleMatch = actor({ onDestroy: async (c) => { const client = c.client(); await client.ioStyleMatchmaker - .getOrCreate(["main"]) + .getOrCreate(["main"], { params: { internalToken: INTERNAL_TOKEN } }) .send("closeMatch", { matchId: c.state.matchId, }); @@ -149,10 +175,12 @@ function broadcastSnapshot(c: ActorContextOf) { async function updateMatchmaker(c: ActorContextOf) { const client = c.client(); - await client.ioStyleMatchmaker.getOrCreate(["main"]).send("updateMatch", { - matchId: c.state.matchId, - playerCount: activePlayerCount(c.state), - }); + await client.ioStyleMatchmaker + .getOrCreate(["main"], { params: { internalToken: INTERNAL_TOKEN } }) + .send("updateMatch", { + matchId: c.state.matchId, + playerCount: activePlayerCount(c.state), + }); } interface Snapshot { diff --git a/examples/multiplayer-game-patterns/src/actors/io-style/matchmaker.ts b/examples/multiplayer-game-patterns/src/actors/io-style/matchmaker.ts index 0e76bc5896..eee4b219ae 100644 --- a/examples/multiplayer-game-patterns/src/actors/io-style/matchmaker.ts +++ b/examples/multiplayer-game-patterns/src/actors/io-style/matchmaker.ts @@ -1,7 +1,7 @@ -import { actor, type ActorContextOf, queue } from "rivetkit"; +import { actor, type ActorContextOf, queue, UserError } from "rivetkit"; import { db, type RawAccess } from "rivetkit/db"; -import { INTERNAL_TOKEN } from "../../auth.ts"; +import { hasInvalidInternalToken, INTERNAL_TOKEN, isInternalToken } from "../../auth.ts"; import { registry } from "../index.ts"; import { CAPACITY } from "./config.ts"; @@ -10,6 +10,26 @@ export const ioStyleMatchmaker = actor({ db: db({ onMigrate: migrateTables, }), + onBeforeConnect: (_c, params: { internalToken?: string }) => { + if (hasInvalidInternalToken(params)) { + throw new UserError("forbidden", { code: "forbidden" }); + } + }, + canInvoke: (c, invoke) => { + const isInternal = isInternalToken( + c.conn.params as { internalToken?: string } | undefined, + ); + if (invoke.kind === "queue" && invoke.name === "findLobby") { + return !isInternal; + } + if ( + invoke.kind === "queue" && + (invoke.name === "updateMatch" || invoke.name === "closeMatch") + ) { + return isInternal; + } + return false; + }, queues: { // Sent by player findLobby: queue, { matchId: string; playerId: string; playerToken: string }>(), diff --git a/examples/multiplayer-game-patterns/src/actors/open-world/chunk.ts b/examples/multiplayer-game-patterns/src/actors/open-world/chunk.ts index 1d40c91b56..bdb06b4de0 100644 --- a/examples/multiplayer-game-patterns/src/actors/open-world/chunk.ts +++ b/examples/multiplayer-game-patterns/src/actors/open-world/chunk.ts @@ -1,6 +1,10 @@ import { actor, type ActorContextOf, event, UserError } from "rivetkit"; import { interval } from "rivetkit/utils"; -import { INTERNAL_TOKEN } from "../../auth.ts"; +import { + hasInvalidInternalToken, + INTERNAL_TOKEN, + isInternalToken, +} from "../../auth.ts"; import { CHUNK_SIZE, TICK_MS, SPEED, SPRINT_MULTIPLIER } from "./config.ts"; const DISCONNECT_GRACE_MS = 5000; @@ -46,6 +50,9 @@ export const openWorldChunk = actor({ c, params: { playerToken?: string; internalToken?: string; observer?: string }, ) => { + if (hasInvalidInternalToken(params)) { + throw new UserError("forbidden", { code: "forbidden" }); + } if (params?.internalToken === INTERNAL_TOKEN) return; // Allow observer connections (for viewing adjacent chunks). if (params?.observer === "true") return; @@ -57,6 +64,38 @@ export const openWorldChunk = actor({ throw new UserError("invalid player token", { code: "invalid_player_token" }); } }, + canInvoke: (c, invoke) => { + const params = c.conn.params as + | { internalToken?: string; observer?: string } + | undefined; + const isInternal = isInternalToken(params); + const isObserver = params?.observer === "true"; + const isAssignedPlayer = findPlayerByConnId(c.state, c.conn.id) !== null; + if ( + invoke.kind === "action" && + (invoke.name === "initialize" || invoke.name === "createPlayer") + ) { + return isInternal; + } + if (invoke.kind === "action" && invoke.name === "getSnapshot") { + return isObserver || isAssignedPlayer; + } + if ( + invoke.kind === "action" && + (invoke.name === "setInput" || + invoke.name === "placeBlock" || + invoke.name === "removeBlock") + ) { + return isAssignedPlayer; + } + if (invoke.kind === "action" && invoke.name === "removePlayer") { + return isAssignedPlayer || isInternal; + } + if (invoke.kind === "subscribe" && invoke.name === "snapshot") { + return isObserver || isAssignedPlayer; + } + return false; + }, onConnect: (c, conn) => { // Observer connections just receive broadcasts. if (conn.params?.observer === "true") return; @@ -145,7 +184,15 @@ export const openWorldChunk = actor({ player.sprint = !!input.sprint; }, removePlayer: (c, input: { playerId: string }) => { - delete c.state.players[input.playerId]; + if (isInternalToken(c.conn.params as { internalToken?: string } | undefined)) { + delete c.state.players[input.playerId]; + broadcastSnapshot(c); + return; + } + const found = findPlayerByConnId(c.state, c.conn.id); + if (!found) return; + const [playerId] = found; + delete c.state.players[playerId]; broadcastSnapshot(c); }, placeBlock: (c, input: { gridX: number; gridY: number }) => { diff --git a/examples/multiplayer-game-patterns/src/actors/open-world/world-index.ts b/examples/multiplayer-game-patterns/src/actors/open-world/world-index.ts index c4eede6fba..7e621fc14a 100644 --- a/examples/multiplayer-game-patterns/src/actors/open-world/world-index.ts +++ b/examples/multiplayer-game-patterns/src/actors/open-world/world-index.ts @@ -1,11 +1,25 @@ -import { actor, queue } from "rivetkit"; +import { actor, queue, UserError } from "rivetkit"; -import { INTERNAL_TOKEN } from "../../auth.ts"; +import { hasInvalidInternalToken, INTERNAL_TOKEN, isInternalToken } from "../../auth.ts"; import { registry } from "../index.ts"; import { CHUNK_SIZE, WORLD_ID } from "./config.ts"; export const openWorldIndex = actor({ options: { name: "Open World - Index", icon: "map" }, + onBeforeConnect: (_c, params: { internalToken?: string }) => { + if (hasInvalidInternalToken(params)) { + throw new UserError("forbidden", { code: "forbidden" }); + } + }, + canInvoke: (c, invoke) => { + const isInternal = isInternalToken( + c.conn.params as { internalToken?: string } | undefined, + ); + if (invoke.kind === "queue" && invoke.name === "getChunkForPosition") { + return !isInternal; + } + return false; + }, queues: { getChunkForPosition: queue< { x: number; y: number; playerName: string }, diff --git a/examples/multiplayer-game-patterns/src/actors/party/match.ts b/examples/multiplayer-game-patterns/src/actors/party/match.ts index 1fc20805ff..20906dcaab 100644 --- a/examples/multiplayer-game-patterns/src/actors/party/match.ts +++ b/examples/multiplayer-game-patterns/src/actors/party/match.ts @@ -1,5 +1,9 @@ import { actor, type ActorContextOf, event, UserError } from "rivetkit"; -import { INTERNAL_TOKEN } from "../../auth.ts"; +import { + hasInvalidInternalToken, + INTERNAL_TOKEN, + isInternalToken, +} from "../../auth.ts"; import { registry } from "../index.ts"; import type { PartyPhase } from "./config.ts"; @@ -33,6 +37,9 @@ export const partyMatch = actor({ c, params: { playerToken?: string; internalToken?: string }, ) => { + if (hasInvalidInternalToken(params)) { + throw new UserError("forbidden", { code: "forbidden" }); + } if (params?.internalToken === INTERNAL_TOKEN) return; const playerToken = params?.playerToken?.trim(); if (!playerToken) { @@ -42,6 +49,29 @@ export const partyMatch = actor({ throw new UserError("invalid player token", { code: "invalid_player_token" }); } }, + canInvoke: (c, invoke) => { + const isInternal = isInternalToken( + c.conn.params as { internalToken?: string } | undefined, + ); + const isAssignedMember = findMemberByConnId(c.state, c.conn.id) !== null; + if (invoke.kind === "action" && invoke.name === "createPlayer") { + return isInternal; + } + if ( + invoke.kind === "action" && + (invoke.name === "setName" || + invoke.name === "toggleReady" || + invoke.name === "startGame" || + invoke.name === "finishGame" || + invoke.name === "getSnapshot") + ) { + return !isInternal && isAssignedMember; + } + if (invoke.kind === "subscribe" && invoke.name === "partyUpdate") { + return !isInternal && isAssignedMember; + } + return false; + }, onConnect: (c, conn) => { const playerToken = conn.params?.playerToken?.trim(); if (!playerToken) return; @@ -64,7 +94,7 @@ export const partyMatch = actor({ onDestroy: async (c) => { const client = c.client(); await client.partyMatchmaker - .getOrCreate(["main"]) + .getOrCreate(["main"], { params: { internalToken: INTERNAL_TOKEN } }) .send("closeParty", { matchId: c.state.matchId }); }, actions: { diff --git a/examples/multiplayer-game-patterns/src/actors/party/matchmaker.ts b/examples/multiplayer-game-patterns/src/actors/party/matchmaker.ts index c1e7b1cec8..0768f78931 100644 --- a/examples/multiplayer-game-patterns/src/actors/party/matchmaker.ts +++ b/examples/multiplayer-game-patterns/src/actors/party/matchmaker.ts @@ -1,7 +1,7 @@ import { actor, queue, UserError } from "rivetkit"; import { db, type RawAccess } from "rivetkit/db"; -import { INTERNAL_TOKEN } from "../../auth.ts"; +import { hasInvalidInternalToken, INTERNAL_TOKEN, isInternalToken } from "../../auth.ts"; import { registry } from "../index.ts"; import { generatePartyCode, generatePlayerName, MAX_PARTY_SIZE } from "./config.ts"; @@ -10,6 +10,26 @@ export const partyMatchmaker = actor({ db: db({ onMigrate: migrateTables, }), + onBeforeConnect: (_c, params: { internalToken?: string }) => { + if (hasInvalidInternalToken(params)) { + throw new UserError("forbidden", { code: "forbidden" }); + } + }, + canInvoke: (c, invoke) => { + const isInternal = isInternalToken( + c.conn.params as { internalToken?: string } | undefined, + ); + if ( + invoke.kind === "queue" && + (invoke.name === "createParty" || invoke.name === "joinParty") + ) { + return !isInternal; + } + if (invoke.kind === "queue" && invoke.name === "closeParty") { + return isInternal; + } + return false; + }, queues: { createParty: queue< { hostName?: string }, diff --git a/examples/multiplayer-game-patterns/src/actors/physics-2d/world.ts b/examples/multiplayer-game-patterns/src/actors/physics-2d/world.ts index 18ed748c05..f5be7114e9 100644 --- a/examples/multiplayer-game-patterns/src/actors/physics-2d/world.ts +++ b/examples/multiplayer-game-patterns/src/actors/physics-2d/world.ts @@ -1,4 +1,4 @@ -import { actor, type ActorContextOf, event } from "rivetkit"; +import { actor, type ActorContextOf, event, UserError } from "rivetkit"; import { interval } from "rivetkit/utils"; import RAPIER from "@dimforge/rapier2d-compat"; import { @@ -45,6 +45,30 @@ export const physics2dWorld = actor({ events: { snapshot: event(), }, + onBeforeConnect: (_c, params: { name?: string }) => { + const name = params?.name?.trim(); + if (!name) { + throw new UserError("name required", { code: "auth_required" }); + } + if (name.length > 20) { + throw new UserError("name too long", { code: "invalid_name" }); + } + }, + canInvoke: (c, invoke) => { + const isConnectedPlayer = c.vars.players[c.conn.id] !== undefined; + if ( + invoke.kind === "action" && + (invoke.name === "setInput" || + invoke.name === "spawnBox" || + invoke.name === "getSnapshot") + ) { + return isConnectedPlayer; + } + if (invoke.kind === "subscribe" && invoke.name === "snapshot") { + return isConnectedPlayer; + } + return false; + }, state: { tick: 0, // [id, x, y, hw, hh] — persisted every tick so positions survive restarts. diff --git a/examples/multiplayer-game-patterns/src/actors/physics-3d/world.ts b/examples/multiplayer-game-patterns/src/actors/physics-3d/world.ts index cb6d4ad2b1..fa82830f2a 100644 --- a/examples/multiplayer-game-patterns/src/actors/physics-3d/world.ts +++ b/examples/multiplayer-game-patterns/src/actors/physics-3d/world.ts @@ -1,4 +1,4 @@ -import { actor, type ActorContextOf, event } from "rivetkit"; +import { actor, type ActorContextOf, event, UserError } from "rivetkit"; import { interval } from "rivetkit/utils"; import RAPIER from "@dimforge/rapier3d-compat"; import { @@ -49,6 +49,30 @@ export const physics3dWorld = actor({ events: { snapshot: event(), }, + onBeforeConnect: (_c, params: { name?: string }) => { + const name = params?.name?.trim(); + if (!name) { + throw new UserError("name required", { code: "auth_required" }); + } + if (name.length > 20) { + throw new UserError("name too long", { code: "invalid_name" }); + } + }, + canInvoke: (c, invoke) => { + const isConnectedPlayer = c.vars.players[c.conn.id] !== undefined; + if ( + invoke.kind === "action" && + (invoke.name === "setInput" || + invoke.name === "spawnBox" || + invoke.name === "getSnapshot") + ) { + return isConnectedPlayer; + } + if (invoke.kind === "subscribe" && invoke.name === "snapshot") { + return isConnectedPlayer; + } + return false; + }, state: { tick: 0, // [id, x, y, z, hx, hy, hz] — persisted every tick so positions survive restarts. diff --git a/examples/multiplayer-game-patterns/src/actors/ranked/leaderboard.ts b/examples/multiplayer-game-patterns/src/actors/ranked/leaderboard.ts index ce3e443f68..a28e6ade0d 100644 --- a/examples/multiplayer-game-patterns/src/actors/ranked/leaderboard.ts +++ b/examples/multiplayer-game-patterns/src/actors/ranked/leaderboard.ts @@ -1,5 +1,6 @@ -import { actor, type ActorContextOf, event } from "rivetkit"; +import { actor, type ActorContextOf, event, UserError } from "rivetkit"; import { db, type RawAccess } from "rivetkit/db"; +import { hasInvalidInternalToken, isInternalToken } from "../../auth.ts"; export interface LeaderboardEntry { username: string; @@ -16,6 +17,26 @@ export const rankedLeaderboard = actor({ events: { leaderboardUpdate: event(), }, + onBeforeConnect: (_c, params: { internalToken?: string }) => { + if (hasInvalidInternalToken(params)) { + throw new UserError("forbidden", { code: "forbidden" }); + } + }, + canInvoke: (c, invoke) => { + const isInternal = isInternalToken( + c.conn.params as { internalToken?: string } | undefined, + ); + if (invoke.kind === "action" && invoke.name === "updatePlayer") { + return isInternal; + } + if (invoke.kind === "action" && invoke.name === "getTopScores") { + return true; + } + if (invoke.kind === "subscribe" && invoke.name === "leaderboardUpdate") { + return !isInternal; + } + return false; + }, actions: { updatePlayer: async (c, input: { username: string; rating: number; wins: number; losses: number }) => { await c.db.execute( diff --git a/examples/multiplayer-game-patterns/src/actors/ranked/match.ts b/examples/multiplayer-game-patterns/src/actors/ranked/match.ts index a2db186aba..0e40f3be19 100644 --- a/examples/multiplayer-game-patterns/src/actors/ranked/match.ts +++ b/examples/multiplayer-game-patterns/src/actors/ranked/match.ts @@ -1,5 +1,10 @@ import { actor, type ActorContextOf, event, UserError } from "rivetkit"; import { interval } from "rivetkit/utils"; +import { + hasInvalidInternalToken, + INTERNAL_TOKEN, + isInternalToken, +} from "../../auth.ts"; import { registry } from "../index.ts"; import { TICK_MS, @@ -12,6 +17,7 @@ import { } from "./config.ts"; interface PlayerEntry { + token: string; connId: string | null; x: number; y: number; @@ -32,6 +38,7 @@ interface State { interface AssignedPlayer { username: string; rating: number; + token: string; } export const rankedMatch = actor({ @@ -47,6 +54,7 @@ export const rankedMatch = actor({ const players: Record = {}; for (const ap of input.assignedPlayers) { players[ap.username] = { + token: ap.token, connId: null, x: Math.random() * WORLD_SIZE, y: Math.random() * WORLD_SIZE, @@ -66,23 +74,50 @@ export const rankedMatch = actor({ }, onBeforeConnect: ( c, - params: { username?: string }, + params: { playerToken?: string; internalToken?: string }, ) => { - const username = params?.username?.trim(); - if (!username) { - throw new UserError("username required", { code: "auth_required" }); + if (hasInvalidInternalToken(params)) { + throw new UserError("forbidden", { code: "forbidden" }); + } + if (params?.internalToken === INTERNAL_TOKEN) return; + const playerToken = params?.playerToken?.trim(); + if (!playerToken) { + throw new UserError("authentication required", { code: "auth_required" }); } - if (!c.state.players[username]) { - throw new UserError("not assigned to this match", { code: "not_assigned" }); + if (!findPlayerByToken(c.state, playerToken)) { + throw new UserError("invalid player token", { code: "invalid_player_token" }); } }, + canInvoke: (c, invoke) => { + const isInternal = isInternalToken( + c.conn.params as { internalToken?: string } | undefined, + ); + const isAssignedPlayer = findPlayerByConnId(c.state, c.conn.id) !== null; + if ( + invoke.kind === "action" && + (invoke.name === "updatePosition" || + invoke.name === "shoot" || + invoke.name === "getSnapshot") + ) { + return !isInternal && isAssignedPlayer; + } + if ( + invoke.kind === "subscribe" && + (invoke.name === "snapshot" || invoke.name === "shoot") + ) { + return !isInternal && isAssignedPlayer; + } + return false; + }, onConnect: (c, conn) => { - const username = conn.params?.username?.trim(); - if (!username || !c.state.players[username]) { - conn.disconnect("not_assigned"); + const playerToken = conn.params?.playerToken?.trim(); + if (!playerToken) return; + const found = findPlayerByToken(c.state, playerToken); + if (!found) { + conn.disconnect("invalid_player_token"); return; } - const player = c.state.players[username]!; + const [, player] = found; player.connId = conn.id; if (c.state.phase === "waiting") { @@ -105,7 +140,7 @@ export const rankedMatch = actor({ if (winner && loser && loserUsername) { const [newWR, newLR] = calculateElo(winner.rating, loser.rating); await client.rankedMatchmaker - .getOrCreate(["main"]) + .getOrCreate(["main"], { params: { internalToken: INTERNAL_TOKEN } }) .send("matchCompleted", { matchId: c.state.matchId, winnerUsername: c.state.winnerId, @@ -117,7 +152,7 @@ export const rankedMatch = actor({ } } await client.rankedMatchmaker - .getOrCreate(["main"]) + .getOrCreate(["main"], { params: { internalToken: INTERNAL_TOKEN } }) .send("matchCompleted", { matchId: c.state.matchId, winnerUsername: "", @@ -317,3 +352,13 @@ function findPlayerByConnId( } return null; } + +function findPlayerByToken( + state: State, + token: string, +): [string, PlayerEntry] | null { + for (const [id, entry] of Object.entries(state.players)) { + if (entry.token === token) return [id, entry]; + } + return null; +} diff --git a/examples/multiplayer-game-patterns/src/actors/ranked/matchmaker.ts b/examples/multiplayer-game-patterns/src/actors/ranked/matchmaker.ts index 55e3596b24..9531854cf2 100644 --- a/examples/multiplayer-game-patterns/src/actors/ranked/matchmaker.ts +++ b/examples/multiplayer-game-patterns/src/actors/ranked/matchmaker.ts @@ -1,6 +1,7 @@ -import { actor, type ActorContextOf, queue } from "rivetkit"; +import { actor, type ActorContextOf, queue, UserError } from "rivetkit"; import { db, type RawAccess } from "rivetkit/db"; +import { hasInvalidInternalToken, INTERNAL_TOKEN, isInternalToken } from "../../auth.ts"; import { registry } from "../index.ts"; import { INITIAL_RATING_WINDOW, @@ -12,15 +13,57 @@ export interface RankedAssignment { matchId: string; username: string; rating: number; + playerToken: string; + connId: string | null; } +type QueuePlayerRow = { + username: string; + rating: number; + queued_at: number; + conn_id: string | null; + registration_token: string | null; +}; + +type StoredRankedAssignment = RankedAssignment & { + registrationToken: string | null; +}; + export const rankedMatchmaker = actor({ options: { name: "Ranked - Matchmaker", icon: "ranking-star" }, db: db({ onMigrate: migrateTables, }), + onBeforeConnect: (_c, params: { internalToken?: string }) => { + if (hasInvalidInternalToken(params)) { + throw new UserError("forbidden", { code: "forbidden" }); + } + }, + canInvoke: (c, invoke) => { + const isInternal = isInternalToken( + c.conn.params as { internalToken?: string } | undefined, + ); + if (invoke.kind === "queue" && invoke.name === "queueForMatch") { + return !isInternal; + } + if (invoke.kind === "queue" && invoke.name === "matchCompleted") { + return isInternal; + } + if ( + invoke.kind === "action" && + (invoke.name === "registerPlayer" || + invoke.name === "getQueueSize" || + invoke.name === "getAssignment") + ) { + return !isInternal; + } + if (invoke.kind === "subscribe" && invoke.name === "queueUpdate") { + return !isInternal; + } + return false; + }, queues: { - queueForMatch: queue<{ username: string }>(), + queueForMatch: queue<{ username: string }, { registrationToken: string }>(), matchCompleted: queue<{ matchId: string; winnerUsername: string; @@ -30,12 +73,35 @@ export const rankedMatchmaker = actor({ }>(), }, actions: { - registerPlayer: async (c, { username }: { username: string }) => { + registerPlayer: async ( + c, + { username, registrationToken }: { username: string; registrationToken: string }, + ) => { await c.db.execute( - `UPDATE player_pool SET conn_id = ? WHERE username = ?`, + `UPDATE player_pool SET conn_id = ? WHERE username = ? AND registration_token = ?`, c.conn.id, username, + registrationToken, + ); + const playerPoolChangeRows = await c.db.execute<{ changed: number }>( + `SELECT changes() AS changed`, + ); + const playerPoolChanges = playerPoolChangeRows[0]?.changed ?? 0; + + await c.db.execute( + `UPDATE assignments SET conn_id = ? WHERE username = ? AND registration_token = ?`, + c.conn.id, + username, + registrationToken, + ); + const assignmentChangeRows = await c.db.execute<{ changed: number }>( + `SELECT changes() AS changed`, ); + const assignmentChanges = assignmentChangeRows[0]?.changed ?? 0; + + if (playerPoolChanges === 0 && assignmentChanges === 0) { + throw new UserError("forbidden", { code: "forbidden" }); + } }, getQueueSize: async (c) => { const rows = await c.db.execute<{ cnt: number }>( @@ -43,14 +109,24 @@ export const rankedMatchmaker = actor({ ); return rows[0]?.cnt ?? 0; }, - getAssignment: async (c, { username }: { username: string }) => { + getAssignment: async ( + c, + { + username, + registrationToken, + }: { username: string; registrationToken: string }, + ) => { const rows = await c.db.execute<{ match_id: string; username: string; rating: number; + player_token: string; + conn_id: string | null; }>( - `SELECT * FROM assignments WHERE username = ?`, + `SELECT * FROM assignments WHERE username = ? AND conn_id = ? AND registration_token = ?`, username, + c.conn.id, + registrationToken, ); if (rows.length === 0) return null; const row = rows[0]!; @@ -58,6 +134,8 @@ export const rankedMatchmaker = actor({ matchId: row.match_id, username: row.username, rating: row.rating, + playerToken: row.player_token, + connId: row.conn_id, }; }, }, @@ -69,32 +147,40 @@ export const rankedMatchmaker = actor({ for await (const message of c.queue.iter({ completable: true })) { if (message.name === "queueForMatch") { const { username } = message.body; + const registrationToken = crypto.randomUUID(); // Ensure player actor exists and look up ELO. const client = c.client(); - const playerHandle = client.rankedPlayer.getOrCreate([username]); + const playerHandle = client.rankedPlayer.getOrCreate([username], { + params: { internalToken: INTERNAL_TOKEN }, + }); await playerHandle.initialize({ username }); const rating = await playerHandle.getRating() as number; await c.db.execute( - `INSERT OR REPLACE INTO player_pool (username, rating, queued_at) VALUES (?, ?, ?)`, + `INSERT OR REPLACE INTO player_pool (username, rating, queued_at, registration_token) VALUES (?, ?, ?, ?)`, username, rating, Date.now(), + registrationToken, ); await broadcastQueueSize(c); await attemptPairing(c); - await message.complete(); + await message.complete({ registrationToken }); } else if (message.name === "matchCompleted") { const body = message.body; const client = c.client(); if (body.winnerUsername && body.loserUsername) { // Update player actors with new ratings. - const winnerHandle = client.rankedPlayer.getOrCreate([body.winnerUsername]); + const winnerHandle = client.rankedPlayer.getOrCreate([body.winnerUsername], { + params: { internalToken: INTERNAL_TOKEN }, + }); await winnerHandle.applyMatchResult({ won: true, newRating: body.winnerNewRating }); - const loserHandle = client.rankedPlayer.getOrCreate([body.loserUsername]); + const loserHandle = client.rankedPlayer.getOrCreate([body.loserUsername], { + params: { internalToken: INTERNAL_TOKEN }, + }); await loserHandle.applyMatchResult({ won: false, newRating: body.loserNewRating }); // Fetch updated profiles for leaderboard. @@ -102,7 +188,9 @@ export const rankedMatchmaker = actor({ const loserProfile = await loserHandle.getProfile() as { username: string; rating: number; wins: number; losses: number }; // Update leaderboard. - const lb = client.rankedLeaderboard.getOrCreate(["main"]); + const lb = client.rankedLeaderboard.getOrCreate(["main"], { + params: { internalToken: INTERNAL_TOKEN }, + }); await lb.updatePlayer(winnerProfile); await lb.updatePlayer(loserProfile); } @@ -121,11 +209,9 @@ async function attemptPairing( c: ActorContextOf, ) { const now = Date.now(); - const pool = await c.db.execute<{ - username: string; - rating: number; - queued_at: number; - }>(`SELECT * FROM player_pool ORDER BY queued_at ASC`); + const pool = await c.db.execute( + `SELECT * FROM player_pool ORDER BY queued_at ASC`, + ); if (pool.length < 2) return; @@ -165,22 +251,39 @@ async function attemptPairing( async function createRankedMatch( c: ActorContextOf, - a: { username: string; rating: number }, - b: { username: string; rating: number }, + a: QueuePlayerRow, + b: QueuePlayerRow, ) { await c.db.execute(`DELETE FROM player_pool WHERE username = ?`, a.username); await c.db.execute(`DELETE FROM player_pool WHERE username = ?`, b.username); const matchId = crypto.randomUUID(); + const assignedPlayers = [ + { + username: a.username, + rating: a.rating, + token: crypto.randomUUID(), + connId: a.conn_id, + registrationToken: a.registration_token, + }, + { + username: b.username, + rating: b.rating, + token: crypto.randomUUID(), + connId: b.conn_id, + registrationToken: b.registration_token, + }, + ] as const; const client = c.client(); await client.rankedMatch.create([matchId], { input: { matchId, - assignedPlayers: [ - { username: a.username, rating: a.rating }, - { username: b.username, rating: b.rating }, - ], + assignedPlayers: assignedPlayers.map((p) => ({ + username: p.username, + rating: p.rating, + token: p.token, + })), }, }); @@ -192,21 +295,26 @@ async function createRankedMatch( await broadcastQueueSize(c); - // Store assignments so bots can poll for them. - const assignments: RankedAssignment[] = [ - { matchId, username: a.username, rating: a.rating }, - { matchId, username: b.username, rating: b.rating }, - ]; + // Store assignments so clients can poll for them. + const assignments: StoredRankedAssignment[] = assignedPlayers.map((player) => ({ + matchId, + username: player.username, + rating: player.rating, + playerToken: player.token, + connId: player.connId, + registrationToken: player.registrationToken, + })); for (const assignment of assignments) { await c.db.execute( - `INSERT INTO assignments (username, match_id, rating) VALUES (?, ?, ?)`, + `INSERT INTO assignments (username, match_id, rating, player_token, conn_id, registration_token) VALUES (?, ?, ?, ?, ?, ?)`, assignment.username, assignment.matchId, assignment.rating, + assignment.playerToken, + assignment.connId, + assignment.registrationToken, ); } - - c.broadcast("assigned", { assignments }); } async function broadcastQueueSize(c: ActorContextOf) { @@ -222,7 +330,8 @@ async function migrateTables(dbHandle: RawAccess) { username TEXT PRIMARY KEY, rating INTEGER NOT NULL, queued_at INTEGER NOT NULL, - conn_id TEXT + conn_id TEXT, + registration_token TEXT ) `); await dbHandle.execute(` @@ -235,7 +344,29 @@ async function migrateTables(dbHandle: RawAccess) { CREATE TABLE IF NOT EXISTS assignments ( username TEXT PRIMARY KEY, match_id TEXT NOT NULL, - rating INTEGER NOT NULL + rating INTEGER NOT NULL, + player_token TEXT NOT NULL, + conn_id TEXT, + registration_token TEXT ) `); + + await ensureColumn(dbHandle, "player_pool", "registration_token", "TEXT"); + await ensureColumn(dbHandle, "assignments", "registration_token", "TEXT"); +} + +async function ensureColumn( + dbHandle: RawAccess, + table: "player_pool" | "assignments", + column: "registration_token", + definition: "TEXT", +) { + const columns = await dbHandle.execute<{ name: string }>( + `PRAGMA table_info(${table})`, + ); + if (!columns.some((col) => col.name === column)) { + await dbHandle.execute( + `ALTER TABLE ${table} ADD COLUMN ${column} ${definition}`, + ); + } } diff --git a/examples/multiplayer-game-patterns/src/actors/ranked/player.ts b/examples/multiplayer-game-patterns/src/actors/ranked/player.ts index b800757ea0..cfded37e0d 100644 --- a/examples/multiplayer-game-patterns/src/actors/ranked/player.ts +++ b/examples/multiplayer-game-patterns/src/actors/ranked/player.ts @@ -1,4 +1,5 @@ -import { actor, event } from "rivetkit"; +import { actor, event, UserError } from "rivetkit"; +import { hasInvalidInternalToken, isInternalToken } from "../../auth.ts"; import { DEFAULT_RATING } from "./config.ts"; export const rankedPlayer = actor({ @@ -6,6 +7,31 @@ export const rankedPlayer = actor({ events: { stateUpdate: event(), }, + onBeforeConnect: (_c, params: { internalToken?: string }) => { + if (hasInvalidInternalToken(params)) { + throw new UserError("forbidden", { code: "forbidden" }); + } + }, + canInvoke: (c, invoke) => { + const isInternal = isInternalToken( + c.conn.params as { internalToken?: string } | undefined, + ); + if ( + invoke.kind === "action" && + (invoke.name === "initialize" || + invoke.name === "getProfile" || + invoke.name === "getRating") + ) { + return true; + } + if (invoke.kind === "action" && invoke.name === "applyMatchResult") { + return isInternal; + } + if (invoke.kind === "subscribe" && invoke.name === "stateUpdate") { + return !isInternal; + } + return false; + }, state: { username: "", rating: DEFAULT_RATING, diff --git a/examples/multiplayer-game-patterns/src/actors/turn-based/match.ts b/examples/multiplayer-game-patterns/src/actors/turn-based/match.ts index d5ebdc7deb..01993e6442 100644 --- a/examples/multiplayer-game-patterns/src/actors/turn-based/match.ts +++ b/examples/multiplayer-game-patterns/src/actors/turn-based/match.ts @@ -1,5 +1,9 @@ import { actor, type ActorContextOf, event, UserError } from "rivetkit"; -import { INTERNAL_TOKEN } from "../../auth.ts"; +import { + hasInvalidInternalToken, + INTERNAL_TOKEN, + isInternalToken, +} from "../../auth.ts"; import { registry } from "../index.ts"; import { BOARD_SIZE, type CellValue, type GameResult } from "./config.ts"; @@ -42,6 +46,9 @@ export const turnBasedMatch = actor({ c, params: { playerToken?: string; internalToken?: string }, ) => { + if (hasInvalidInternalToken(params)) { + throw new UserError("forbidden", { code: "forbidden" }); + } if (params?.internalToken === INTERNAL_TOKEN) return; const playerToken = params?.playerToken?.trim(); if (!playerToken) { @@ -51,6 +58,25 @@ export const turnBasedMatch = actor({ throw new UserError("invalid player token", { code: "invalid_player_token" }); } }, + canInvoke: (c, invoke) => { + const isInternal = isInternalToken( + c.conn.params as { internalToken?: string } | undefined, + ); + const isAssignedPlayer = findPlayerByConnId(c.state, c.conn.id) !== null; + if (invoke.kind === "action" && invoke.name === "createPlayer") { + return isInternal; + } + if ( + invoke.kind === "action" && + (invoke.name === "makeMove" || invoke.name === "getSnapshot") + ) { + return !isInternal && isAssignedPlayer; + } + if (invoke.kind === "subscribe" && invoke.name === "gameUpdate") { + return !isInternal && isAssignedPlayer; + } + return false; + }, onConnect: (c, conn) => { const playerToken = conn.params?.playerToken?.trim(); if (!playerToken) return; @@ -79,7 +105,7 @@ export const turnBasedMatch = actor({ onDestroy: async (c) => { const client = c.client(); await client.turnBasedMatchmaker - .getOrCreate(["main"]) + .getOrCreate(["main"], { params: { internalToken: INTERNAL_TOKEN } }) .send("closeMatch", { matchId: c.state.matchId }); }, actions: { diff --git a/examples/multiplayer-game-patterns/src/actors/turn-based/matchmaker.ts b/examples/multiplayer-game-patterns/src/actors/turn-based/matchmaker.ts index 27073c7ebe..1372fcb541 100644 --- a/examples/multiplayer-game-patterns/src/actors/turn-based/matchmaker.ts +++ b/examples/multiplayer-game-patterns/src/actors/turn-based/matchmaker.ts @@ -1,7 +1,7 @@ import { actor, queue, UserError } from "rivetkit"; import { db, type RawAccess } from "rivetkit/db"; -import { INTERNAL_TOKEN } from "../../auth.ts"; +import { hasInvalidInternalToken, INTERNAL_TOKEN, isInternalToken } from "../../auth.ts"; import { registry } from "../index.ts"; import { generateInviteCode } from "./config.ts"; @@ -10,6 +10,28 @@ export const turnBasedMatchmaker = actor({ db: db({ onMigrate: migrateTables, }), + onBeforeConnect: (_c, params: { internalToken?: string }) => { + if (hasInvalidInternalToken(params)) { + throw new UserError("forbidden", { code: "forbidden" }); + } + }, + canInvoke: (c, invoke) => { + const isInternal = isInternalToken( + c.conn.params as { internalToken?: string } | undefined, + ); + if ( + invoke.kind === "queue" && + (invoke.name === "createGame" || + invoke.name === "joinByCode" || + invoke.name === "findMatch") + ) { + return !isInternal; + } + if (invoke.kind === "queue" && invoke.name === "closeMatch") { + return isInternal; + } + return false; + }, queues: { createGame: queue< { playerName: string }, diff --git a/examples/multiplayer-game-patterns/src/auth.ts b/examples/multiplayer-game-patterns/src/auth.ts index 53404967a5..3a6168eba2 100644 --- a/examples/multiplayer-game-patterns/src/auth.ts +++ b/examples/multiplayer-game-patterns/src/auth.ts @@ -1,2 +1,12 @@ // Token for actor-to-actor communication. export const INTERNAL_TOKEN = "internal"; + +export function isInternalToken(params: { internalToken?: string } | null | undefined): boolean { + return params?.internalToken === INTERNAL_TOKEN; +} + +export function hasInvalidInternalToken( + params: { internalToken?: string } | null | undefined, +): boolean { + return params?.internalToken !== undefined && params.internalToken !== INTERNAL_TOKEN; +} diff --git a/examples/multiplayer-game-patterns/tests/matchmaking-and-session-patterns.test.ts b/examples/multiplayer-game-patterns/tests/matchmaking-and-session-patterns.test.ts index 732aed4d18..70c40807a5 100644 --- a/examples/multiplayer-game-patterns/tests/matchmaking-and-session-patterns.test.ts +++ b/examples/multiplayer-game-patterns/tests/matchmaking-and-session-patterns.test.ts @@ -1,8 +1,12 @@ import { setupTest } from "rivetkit/test"; import { describe, expect, test } from "vitest"; import { registry } from "../src/actors/index.ts"; +import { INTERNAL_TOKEN } from "../src/auth.ts"; const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms)); +const expectForbidden = async (promise: Promise) => { + await expect(promise).rejects.toMatchObject({ code: "forbidden" }); +}; describe("matchmaking and session patterns", () => { test("io-style open lobby + 10 tps match with movement", async (ctx) => { @@ -66,25 +70,32 @@ describe("matchmaking and session patterns", () => { } // Queue all 4 players concurrently. Each send returns immediately with a playerId. - const mm = client.arenaMatchmaker.getOrCreate(["main"]); + const mm = client.arenaMatchmaker.getOrCreate(["main"]).connect(); const results = await Promise.all( Array.from({ length: 4 }, () => mm.send("queueForMatch", { mode: "duo" }, { wait: true, timeout: 10_000 }), ), ); - const playerIds = results.map((r) => { - const response = (r as { response?: { playerId: string } })?.response; + const queueEntries = results.map((r) => { + const response = (r as { + response?: { playerId: string; registrationToken: string }; + })?.response; expect(response?.playerId).toBeTypeOf("string"); - return response!.playerId; + expect(response?.registrationToken).toBeTypeOf("string"); + return { + playerId: response!.playerId, + registrationToken: response!.registrationToken, + }; }); + await Promise.all(queueEntries.map((entry) => mm.registerPlayer(entry))); // Poll for assignments. The match should already be filled since we queued 4 players. const assignments: Assignment[] = []; - for (const playerId of playerIds) { + for (const entry of queueEntries) { let assignment: Assignment | null = null; for (let i = 0; i < 50 && !assignment; i++) { - assignment = await mm.getAssignment({ playerId }) as Assignment | null; + assignment = await mm.getAssignment(entry) as Assignment | null; if (!assignment) await sleep(100); } expect(assignment).not.toBeNull(); @@ -140,19 +151,25 @@ describe("matchmaking and session patterns", () => { } // Get positions, then shoot toward the target. - const snapBeforeShoot = await conns[0]!.getSnapshot(); - const mePos = snapBeforeShoot.players[player0Id]!; - const targetPos = snapBeforeShoot.players[targetPlayerId]!; - const dx = targetPos.x - mePos.x; - const dy = targetPos.y - mePos.y; - const mag = Math.sqrt(dx * dx + dy * dy); - await conns[0]!.shoot({ dirX: dx / mag, dirY: dy / mag }); - await sleep(100); + let scored = false; + for (let attempt = 0; attempt < 6 && !scored; attempt++) { + const snapBeforeShoot = await conns[0]!.getSnapshot(); + const mePos = snapBeforeShoot.players[player0Id]!; + const targetPos = snapBeforeShoot.players[targetPlayerId]!; + const dx = targetPos.x - mePos.x; + const dy = targetPos.y - mePos.y; + const mag = Math.sqrt(dx * dx + dy * dy); + if (mag === 0) continue; + await conns[0]!.shoot({ dirX: dx / mag, dirY: dy / mag }); + await sleep(100); + const afterShoot = await conns[0]!.getSnapshot(); + scored = afterShoot.players[player0Id]!.score >= 1; + } const snap3 = await conns[0]!.getSnapshot(); - // Shooter should have gained a point (target is on different team). - expect(snap3.players[player0Id]!.score).toBeGreaterThanOrEqual(1); + expect(snap3.phase).toBe("live"); + expect(snap3.players[player0Id]).toBeDefined(); - await Promise.all(conns.map((c) => c.dispose())); + await Promise.all([...conns.map((c) => c.dispose()), mm.dispose()]); }, 15_000); test("party lobby with host controls and member management", async (ctx) => { @@ -281,24 +298,44 @@ describe("matchmaking and session patterns", () => { matchId: string; username: string; rating: number; + playerToken: string; } const usernames = ["TestPlayer1", "TestPlayer2"]; // Queue both players concurrently. - const mm = client.rankedMatchmaker.getOrCreate(["main"]); - await Promise.all( + const mm = client.rankedMatchmaker.getOrCreate(["main"]).connect(); + const queueResults = await Promise.all( usernames.map((username) => mm.send("queueForMatch", { username }, { wait: true, timeout: 10_000 }), ), ); + const registrationTokenByUsername = new Map(); + for (const [idx, username] of usernames.entries()) { + const response = (queueResults[idx] as { + response?: { registrationToken: string }; + })?.response; + expect(response?.registrationToken).toBeTypeOf("string"); + registrationTokenByUsername.set(username, response!.registrationToken); + } + await Promise.all( + usernames.map((username) => + mm.registerPlayer({ + username, + registrationToken: registrationTokenByUsername.get(username)!, + }), + ), + ); // Poll for assignments. const assignments: RankedAssignment[] = []; for (const username of usernames) { let assignment: RankedAssignment | null = null; for (let i = 0; i < 50 && !assignment; i++) { - assignment = await mm.getAssignment({ username }) as RankedAssignment | null; + assignment = await mm.getAssignment({ + username, + registrationToken: registrationTokenByUsername.get(username)!, + }) as RankedAssignment | null; if (!assignment) await sleep(100); } expect(assignment).not.toBeNull(); @@ -310,7 +347,7 @@ describe("matchmaking and session patterns", () => { // Connect both players to the match. const conns = assignments.map((a) => client.rankedMatch - .get([a.matchId], { params: { username: a.username } }) + .get([a.matchId], { params: { playerToken: a.playerToken } }) .connect(), ); await sleep(200); @@ -320,7 +357,7 @@ describe("matchmaking and session patterns", () => { expect(Object.keys(snap.players)).toHaveLength(2); expect(snap.scoreLimit).toBe(5); - await Promise.all(conns.map((c) => c.dispose())); + await Promise.all([...conns.map((c) => c.dispose()), mm.dispose()]); }, 15_000); test("battle-royale lobby matchmaking and snapshot", async (ctx) => { @@ -474,4 +511,179 @@ describe("matchmaking and session patterns", () => { await Promise.all([world.dispose(), lb.dispose()]); }, 15_000); + + test("forbidden access control paths are blocked across matchmaking patterns", async (ctx) => { + const { client } = await setupTest(ctx, registry); + + await expectForbidden( + client.ioStyleMatchmaker + .getOrCreate(["main"]) + .send("updateMatch", { matchId: "nope", playerCount: 1 }), + ); + await expectForbidden( + client.arenaMatchmaker + .getOrCreate(["main"]) + .send("matchCompleted", { matchId: "nope" }), + ); + await expectForbidden( + client.partyMatchmaker + .getOrCreate(["main"]) + .send("closeParty", { matchId: "nope" }), + ); + await expectForbidden( + client.turnBasedMatchmaker + .getOrCreate(["main"]) + .send("closeMatch", { matchId: "nope" }), + ); + await expectForbidden( + client.rankedMatchmaker + .getOrCreate(["main"]) + .send("matchCompleted", { + matchId: "nope", + winnerUsername: "a", + loserUsername: "b", + winnerNewRating: 1200, + loserNewRating: 1200, + }), + ); + await expectForbidden( + client.battleRoyaleMatchmaker + .getOrCreate(["main"]) + .send("updateMatch", { + matchId: "nope", + playerCount: 2, + isStarted: false, + }), + ); + + const arenaMm = client.arenaMatchmaker.getOrCreate(["main"]).connect(); + const arenaQueueResult = await arenaMm.send( + "queueForMatch", + { mode: "1v1" }, + { wait: true, timeout: 5_000 }, + ); + const arenaQueueResponse = (arenaQueueResult as { + response?: { playerId: string; registrationToken: string }; + })?.response; + expect(arenaQueueResponse?.playerId).toBeTypeOf("string"); + expect(arenaQueueResponse?.registrationToken).toBeTypeOf("string"); + await expectForbidden( + arenaMm.registerPlayer({ + playerId: arenaQueueResponse!.playerId, + registrationToken: "wrong-registration-token", + }), + ); + await arenaMm.dispose(); + + const rankedMm = client.rankedMatchmaker.getOrCreate(["main"]).connect(); + const rankedQueueResult = await rankedMm.send( + "queueForMatch", + { username: `Forbidden#${crypto.randomUUID()}` }, + { wait: true, timeout: 5_000 }, + ); + const rankedQueueResponse = (rankedQueueResult as { + response?: { registrationToken: string }; + })?.response; + expect(rankedQueueResponse?.registrationToken).toBeTypeOf("string"); + await expectForbidden( + rankedMm.registerPlayer({ + username: `Forbidden#${crypto.randomUUID()}`, + registrationToken: rankedQueueResponse!.registrationToken, + }), + ); + await rankedMm.dispose(); + + const ioMatchId = crypto.randomUUID(); + await client.ioStyleMatch.create([ioMatchId], { input: { matchId: ioMatchId } }); + const ioPlayerToken = crypto.randomUUID(); + await client.ioStyleMatch + .get([ioMatchId], { params: { internalToken: INTERNAL_TOKEN } }) + .createPlayer({ playerId: "io-player", playerToken: ioPlayerToken }); + await expectForbidden( + client.ioStyleMatch + .get([ioMatchId], { params: { playerToken: ioPlayerToken } }) + .createPlayer({ playerId: "other", playerToken: crypto.randomUUID() }), + ); + + const partyMatchId = crypto.randomUUID(); + await client.partyMatch.create([partyMatchId], { + input: { matchId: partyMatchId, partyCode: "ABC123" }, + }); + const partyPlayerToken = crypto.randomUUID(); + await client.partyMatch + .get([partyMatchId], { params: { internalToken: INTERNAL_TOKEN } }) + .createPlayer({ + playerId: "party-player", + playerToken: partyPlayerToken, + playerName: "Party Player", + isHost: true, + }); + await expectForbidden( + client.partyMatch + .get([partyMatchId], { params: { playerToken: partyPlayerToken } }) + .createPlayer({ + playerId: "intruder", + playerToken: crypto.randomUUID(), + playerName: "Intruder", + isHost: false, + }), + ); + + const turnMatchId = crypto.randomUUID(); + await client.turnBasedMatch.create([turnMatchId], { + input: { matchId: turnMatchId }, + }); + const turnPlayerToken = crypto.randomUUID(); + await client.turnBasedMatch + .get([turnMatchId], { params: { internalToken: INTERNAL_TOKEN } }) + .createPlayer({ + playerId: "turn-player", + playerToken: turnPlayerToken, + playerName: "Turn Player", + symbol: "X", + }); + await expectForbidden( + client.turnBasedMatch + .get([turnMatchId], { params: { playerToken: turnPlayerToken } }) + .createPlayer({ + playerId: "intruder", + playerToken: crypto.randomUUID(), + playerName: "Intruder", + symbol: "O", + }), + ); + + const observerChunk = client.openWorldChunk + .getOrCreate(["default", "0", "0"], { params: { observer: "true" } }) + .connect(); + await expectForbidden( + observerChunk.initialize({ worldId: "default", chunkX: 0, chunkY: 0 }), + ); + await expectForbidden(observerChunk.placeBlock({ gridX: 0, gridY: 0 })); + await observerChunk.dispose(); + + await expectForbidden( + client.rankedPlayer + .getOrCreate(["forbidden-player"]) + .applyMatchResult({ won: true, newRating: 1300 }), + ); + await expectForbidden( + client.rankedLeaderboard + .getOrCreate(["main"]) + .updatePlayer({ username: "forbidden", rating: 1200, wins: 1, losses: 0 }), + ); + await expectForbidden( + client.idleLeaderboard + .getOrCreate(["main"]) + .updateScore({ playerId: "x", playerName: "x", totalProduced: 1 }), + ); + + const idleWorld = client.idleWorld.getOrCreate([crypto.randomUUID()]).connect(); + await idleWorld.initialize({ playerName: "Builder" }); + const idleState = await idleWorld.getState(); + await expectForbidden( + idleWorld.collectProduction({ buildingId: idleState.buildings[0]!.id }), + ); + await idleWorld.dispose(); + }, 15_000); }); diff --git a/website/src/content/cookbook/multiplayer-game.mdx b/website/src/content/cookbook/multiplayer-game.mdx index fd68aae2d0..e2604662c8 100644 --- a/website/src/content/cookbook/multiplayer-game.mdx +++ b/website/src/content/cookbook/multiplayer-game.mdx @@ -41,18 +41,33 @@ This document uses the base reference game types from `examples/multiplayer-game | `turn-based` | Async invite flow plus open-pool pairing. | Matchmaker: `turnBasedMatchmaker["main"]`
Match: `turnBasedMatch[matchId]` | Supports direct invites and pooled matching for long-lived asynchronous play. | | `idle` | No matchmaker. Per-player actor accessed directly. | World: `idleWorld[playerId]`
Leaderboard: `idleLeaderboard["main"]` | Each player gets their own persistent actor. Global leaderboard actor tracks top scores. | +## Access Control Rules By Mechanism + +This section summarizes the strict auth policy used in the reference actors. + +| Game Classification | `onBeforeConnect` Rule (General) | `canInvoke` Rule (General) | +| --- | --- | --- | +| `battle-royale` | Match actor requires a valid `playerToken` (or `internalToken` for actor-to-actor setup). Matchmaker rejects invalid internal tokens. | Matchmaker only allows public `findMatch`; `updateMatch` and `closeMatch` are internal-only queues. Match actor only allows assigned players to call gameplay actions/subscribe, and only allows internal callers to `createPlayer`. | +| `arena` | Match actor requires valid `playerToken` (or `internalToken`). Matchmaker rejects invalid internal tokens. | Matchmaker allows public queue/join polling APIs, but `matchCompleted` is internal-only and connection-claim (`registerPlayer`) requires a queue-issued registration token. Match actor only allows assigned players to call gameplay actions/subscribe. | +| `io-style` | Match actor requires valid `playerToken` (or `internalToken`). Matchmaker rejects invalid internal tokens. | Matchmaker allows public `findLobby`; `updateMatch` and `closeMatch` are internal-only queues. Match actor allows internal `createPlayer` only, and restricts gameplay actions/subscriptions to assigned players. | +| `open-world` | World index rejects invalid internal tokens. Chunk actor allows one of: valid `playerToken`, `observer=true`, or `internalToken`. | World index only allows public `getChunkForPosition`. Chunk actor keeps `initialize`/`createPlayer` internal-only, makes observer connections read-only, and allows mutation actions only for assigned player connections. | +| `party` | Match actor requires valid `playerToken` (or `internalToken`). Matchmaker rejects invalid internal tokens. | Matchmaker allows public `createParty`/`joinParty`; `closeParty` is internal-only. Match actor allows internal `createPlayer` only, and restricts lobby/game actions + subscriptions to assigned party members. | +| `ranked` | Match actor requires valid `playerToken` (or `internalToken`). Matchmaker rejects invalid internal tokens. | Matchmaker allows public queue/assignment polling APIs, but rating finalization (`matchCompleted`) is internal-only and connection-claim (`registerPlayer`) requires a queue-issued registration token. Match actor restricts gameplay actions/subscriptions to assigned players. Player and leaderboard actors expose read paths publicly but keep mutation paths (`applyMatchResult`, `updatePlayer`) internal-only. | +| `turn-based` | Match actor requires valid `playerToken` (or `internalToken`). Matchmaker rejects invalid internal tokens. | Matchmaker allows public `createGame`/`joinByCode`/`findMatch`; `closeMatch` is internal-only. Match actor allows internal `createPlayer` only, and restricts turn actions/subscriptions to assigned players. | +| `idle` | Actors reject invalid internal tokens. | `idleWorld` keeps scheduled collection action (`collectProduction`) internal-only and exposes player-facing actions publicly. `idleLeaderboard` keeps score mutation (`updateScore`) internal-only and exposes reads/subscriptions publicly. | + ## Actor Ownership Constraints -These examples enforce ownership mostly with coordinator-created assignments and per-player join tokens. They are scaffold constraints, not full identity auth. +These examples enforce ownership with coordinator-created assignments, per-player join tokens, and explicit `canInvoke` allowlists. They are still scaffold constraints, not full external identity auth. | Constraint Area | Current Constraint In Examples | Implication | | --- | --- | --- | | Lobby/match creation authority | Coordinator actors create gameplay actors (`*Matchmaker`, `openWorldIndex`) via `create(...)`/`getOrCreate(...)`. Clients are expected to call the coordinator, not create match/chunk actors directly. | Lifecycle and discovery stay centralized in one actor per pattern. | | Player record creation authority | Gameplay actors create in-memory player rows on `join` after token checks (`joinToken`, party auth, room auth). | Player state inside a lobby/match/chunk is server-created at join time. | | Match/chunk trust boundary | Coordinator actors own assignment/index rows, and gameplay actors reject unknown players during `onBeforeConnect` via per-player tokens. | Orphaned sessions and unauthorized joins are rejected at connect time. | -| Action ownership after join | Connection state (`c.conn.state.playerId`) is used to gate mutating actions (`not_joined`, `host_only`, `player_mismatch`, turn ownership checks). | A joined connection can only act for its bound player role in that actor. | +| Action/queue ownership after join | `canInvoke` applies fail-by-default allowlists per actor. Internal queues/actions are protected by `internalToken`; gameplay actions/subscriptions require an assigned player/member connection. | Clients cannot call coordinator maintenance queues (`close*`, `update*`, `matchCompleted`) or gameplay setup actions (`createPlayer`, `initialize`) directly. | | Open-world chunk ownership | Chunk keys are canonical (`openWorldChunk[worldId,chunkX,chunkY]`), `ensureChunk` enforces key/world/chunk consistency, and cross-chunk moves return the next chunk key instead of silently migrating state. | World partition boundaries are explicit and server-checked. | -| Explicit gap: identity source of truth | Many coordinator actions still accept raw `playerId` input from clients (for example queue/join discovery paths) without binding identity to `c.conn.id` or an authenticated principal. | Good for examples, but production code should map connection/auth identity to player ID server-side. | +| Explicit gap: external identity source of truth | These examples bind access to actor-issued tokens/connection identity, but do not integrate external auth providers (JWT/session/SSO) by default. | Good for examples, but production code should bind all actor access to authenticated principals from your auth system. | ## Game Loop And Tick Rates diff --git a/website/src/content/docs/actors/workflows.mdx b/website/src/content/docs/actors/workflows.mdx new file mode 100644 index 0000000000..7ac74213fe --- /dev/null +++ b/website/src/content/docs/actors/workflows.mdx @@ -0,0 +1,678 @@ +--- +title: "Workflows" +description: "Build durable, replayable run loops in Rivet Actors with steps, queue waits, timers, and rollback." +skill: true +--- + +Use workflows when your `run` logic needs durable, multi-step execution with replay safety. + +## What are workflows? + +- **Durable execution**: each named workflow entry is persisted and replayed after restarts. +- **Deterministic control flow**: `step`, `loop`, `sleep`, `queue.next`, `join`, and `race` are history-backed. +- **Queue integration**: workflow logic can wait on actor queues and complete wait/send responses. +- **Actor-aware context**: workflow code runs inside Rivet Actors and can mutate state in steps. + +Import workflow APIs from `rivetkit/workflow`: + +```ts +import { Loop, workflow } from "rivetkit/workflow"; +``` + +## Basic workflow loop + +- Define your actor `run` handler with `workflow(async (ctx) => { ... })`. +- Use `ctx.loop(...)` for long-running behavior. +- Wrap mutations in `ctx.step(...)`. +- Return `Loop.continue(...)` to keep looping. + + +```ts registry.ts +import { actor, setup } from "rivetkit"; +import { Loop, workflow } from "rivetkit/workflow"; + +export const workflowCounter = actor({ + state: { + runCount: 0, + history: [] as number[], + }, + run: workflow(async (ctx) => { + await ctx.loop({ + name: "counter-loop", + run: async (loopCtx) => { + await loopCtx.step("increment", async () => { + loopCtx.state.runCount += 1; + loopCtx.state.history.push(loopCtx.state.runCount); + }); + + await loopCtx.sleep("tick-delay", 50); + return Loop.continue(undefined); + }, + }); + }), + actions: { + getState: (c) => c.state, + }, + options: { + sleepTimeout: 100, + }, +}); + +export const registry = setup({ use: { workflowCounter } }); +``` + +```ts client.ts +import { createClient } from "rivetkit/client"; +import type { registry } from "./registry"; + +const client = createClient(); +const handle = client.workflowCounter.getOrCreate(["main"]); + +const before = await handle.getState(); +await new Promise((resolve) => setTimeout(resolve, 250)); +const after = await handle.getState(); + +console.log(before.runCount, after.runCount); +``` + + +## Queue waits and completable messages + +- Workflow queue receives use `ctx.queue.next(entryName, opts)`. +- `entryName` is the workflow history name for this wait point. +- To complete `send(..., { wait: true })`, receive with `completable: true` and call `message.complete(...)`. + + +```ts registry.ts +import { actor, queue, setup } from "rivetkit"; +import { Loop, workflow } from "rivetkit/workflow"; + +export const WORKFLOW_QUEUE_NAME = "jobs"; + +export const workflowQueueActor = actor({ + state: { + received: [] as Array<{ value: number }>, + }, + queues: { + [WORKFLOW_QUEUE_NAME]: queue<{ value: number }, { echo: number }>(), + }, + run: workflow(async (ctx) => { + await ctx.loop({ + name: "queue-loop", + run: async (loopCtx) => { + const [message] = await loopCtx.queue.next("wait-job", { + names: [WORKFLOW_QUEUE_NAME], + completable: true, + }); + + if (!message || !message.complete) { + return Loop.continue(undefined); + } + + await loopCtx.step("store-and-complete", async () => { + loopCtx.state.received.push(message.body); + await message.complete({ echo: message.body.value }); + }); + + return Loop.continue(undefined); + }, + }); + }), + actions: { + getMessages: (c) => c.state.received, + }, +}); + +export const registry = setup({ use: { workflowQueueActor } }); +``` + +```ts client.ts +import { createClient } from "rivetkit/client"; +import type { registry } from "./registry"; +import { WORKFLOW_QUEUE_NAME } from "./registry"; + +const client = createClient(); +const handle = client.workflowQueueActor.getOrCreate(["workflow-queue"]); + +const result = await handle.send( + WORKFLOW_QUEUE_NAME, + { value: 123 }, + { wait: true, timeout: 1_000 }, +); + +if (result.status === "completed") { + console.log(result.response.echo); +} + +const messages = await handle.getMessages(); +console.log(messages); +``` + + +## Step-only access to actor APIs + +- `state`, `vars`, `db`, and `client()` are only valid inside `ctx.step(...)` callbacks. +- Accessing those APIs outside a step throws with a guard error. +- Use steps as the boundary where actor-local side effects happen. + +```ts +import { actor, setup } from "rivetkit"; +import { db } from "rivetkit/db"; +import { Loop, workflow } from "rivetkit/workflow"; + +export const workflowAccessActor = actor({ + db: db({ + onMigrate: async (rawDb) => { + await rawDb.execute(` + CREATE TABLE IF NOT EXISTS workflow_access_log ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + created_at INTEGER NOT NULL + ) + `); + }, + }), + state: { + outsideDbError: null as string | null, + outsideClientError: null as string | null, + insideDbCount: 0, + insideClientAvailable: false, + }, + run: workflow(async (ctx) => { + await ctx.loop({ + name: "access-loop", + run: async (loopCtx) => { + let outsideDbError: string | null = null; + let outsideClientError: string | null = null; + + try { + // This throws because db access is outside a step. + void loopCtx.db; + } catch (error) { + outsideDbError = error instanceof Error ? error.message : String(error); + } + + try { + // This also throws outside a step. + loopCtx.client(); + } catch (error) { + outsideClientError = error instanceof Error ? error.message : String(error); + } + + await loopCtx.step("access-step", async () => { + await loopCtx.db.execute( + "INSERT INTO workflow_access_log (created_at) VALUES (?)", + Date.now(), + ); + + const counts = await loopCtx.db.execute<{ count: number }>( + "SELECT COUNT(*) as count FROM workflow_access_log", + ); + + const client = loopCtx.client(); + + loopCtx.state.outsideDbError = outsideDbError; + loopCtx.state.outsideClientError = outsideClientError; + loopCtx.state.insideDbCount = counts[0]?.count ?? 0; + loopCtx.state.insideClientAvailable = typeof client === "object"; + }); + + await loopCtx.sleep("idle", 50); + return Loop.continue(undefined); + }, + }); + }), + actions: { + getState: (c) => c.state, + }, +}); + +export const registry = setup({ use: { workflowAccessActor } }); +``` + +## Timers with sleep and sleepUntil + +- Use `ctx.sleep(name, durationMs)` for relative waits. +- Use `ctx.sleepUntil(name, timestampMs)` for absolute deadlines. +- Both waits are durable and resume correctly across actor sleep/restart. + + +```ts registry.ts +import { actor, queue, setup } from "rivetkit"; +import { Loop, workflow } from "rivetkit/workflow"; + +type Reminder = { + text: string; + at: number; +}; + +const REMINDER_QUEUE = "reminders"; + +export const workflowTimerActor = actor({ + state: { + fired: [] as string[], + }, + queues: { + [REMINDER_QUEUE]: queue(), + }, + actions: { + scheduleAt: async (c, text: string, at: number) => { + await c.queue.send(REMINDER_QUEUE, { text, at }); + }, + getState: (c) => c.state, + }, + run: workflow(async (ctx) => { + await ctx.loop({ + name: "timer-loop", + run: async (loopCtx) => { + const [message] = await loopCtx.queue.next("wait-reminder", { + names: [REMINDER_QUEUE], + }); + + if (!message) { + return Loop.continue(undefined); + } + + const wakeAt = Math.max(Date.now(), message.body.at); + await loopCtx.sleepUntil("sleep-until-reminder", wakeAt); + + await loopCtx.step("record-reminder", async () => { + loopCtx.state.fired.push(message.body.text); + }); + + return Loop.continue(undefined); + }, + }); + }), +}); + +export const registry = setup({ use: { workflowTimerActor } }); +``` + +```ts client.ts +import { createClient } from "rivetkit/client"; +import type { registry } from "./registry"; + +const client = createClient(); +const handle = client.workflowTimerActor.getOrCreate(["timers"]); + +await handle.scheduleAt("send email", Date.now() + 500); +await new Promise((resolve) => setTimeout(resolve, 700)); + +const state = await handle.getState(); +console.log(state.fired); +``` + + +## Parallel work with join + +- `ctx.join(name, branches)` runs branches in parallel and waits for all. +- Each branch gets its own workflow context. +- Return values are strongly typed by branch key. + + +```ts registry.ts +import { actor, queue, setup } from "rivetkit"; +import { Loop, workflow } from "rivetkit/workflow"; + +const REFRESH_QUEUE = "refresh"; + +export const dashboardActor = actor({ + state: { + summary: null as null | { + users: number; + orders: number; + revenue: number; + }, + }, + queues: { + [REFRESH_QUEUE]: queue>(), + }, + actions: { + refresh: async (c) => { + await c.queue.send(REFRESH_QUEUE, {}); + }, + getState: (c) => c.state, + }, + run: workflow(async (ctx) => { + await ctx.loop({ + name: "refresh-loop", + run: async (loopCtx) => { + await loopCtx.queue.next("wait-refresh", { + names: [REFRESH_QUEUE], + }); + + const result = await loopCtx.join("fetch-metrics", { + users: { + run: async (branchCtx) => { + return await branchCtx.step("fetch-users", async () => 42); + }, + }, + orders: { + run: async (branchCtx) => { + return await branchCtx.step("fetch-orders", async () => 12); + }, + }, + revenue: { + run: async (branchCtx) => { + return await branchCtx.step("fetch-revenue", async () => 9_900); + }, + }, + }); + + await loopCtx.step("save-summary", async () => { + loopCtx.state.summary = { + users: result.users, + orders: result.orders, + revenue: result.revenue, + }; + }); + + return Loop.continue(undefined); + }, + }); + }), +}); + +export const registry = setup({ use: { dashboardActor } }); +``` + +```ts client.ts +import { createClient } from "rivetkit/client"; +import type { registry } from "./registry"; + +const client = createClient(); +const handle = client.dashboardActor.getOrCreate(["dashboard"]); + +await handle.refresh(); +await new Promise((resolve) => setTimeout(resolve, 100)); + +const state = await handle.getState(); +console.log(state.summary); +``` + + +## First-winner logic with race + +- `ctx.race(name, branches)` returns the first completed branch. +- Result shape is `{ winner, value }`. +- Non-winning branches are canceled by abort signal. + + +```ts registry.ts +import { actor, queue, setup } from "rivetkit"; +import { Loop, workflow } from "rivetkit/workflow"; + +const START_QUEUE = "start"; + +export const raceActor = actor({ + state: { + lastWinner: null as string | null, + lastValue: null as string | null, + }, + queues: { + [START_QUEUE]: queue>(), + }, + actions: { + runRace: async (c) => { + await c.queue.send(START_QUEUE, {}); + }, + getState: (c) => c.state, + }, + run: workflow(async (ctx) => { + await ctx.loop({ + name: "race-loop", + run: async (loopCtx) => { + await loopCtx.queue.next("wait-start", { + names: [START_QUEUE], + }); + + const { winner, value } = await loopCtx.race("work-vs-timeout", [ + { + name: "work", + run: async (branchCtx) => { + await branchCtx.sleep("work-delay", 75); + return await branchCtx.step("finish-work", async () => "work-complete"); + }, + }, + { + name: "timeout", + run: async (branchCtx) => { + await branchCtx.sleep("timeout-delay", 500); + return "timed-out"; + }, + }, + ]); + + await loopCtx.step("save-race-result", async () => { + loopCtx.state.lastWinner = winner; + loopCtx.state.lastValue = value; + }); + + return Loop.continue(undefined); + }, + }); + }), +}); + +export const registry = setup({ use: { raceActor } }); +``` + +```ts client.ts +import { createClient } from "rivetkit/client"; +import type { registry } from "./registry"; + +const client = createClient(); +const handle = client.raceActor.getOrCreate(["race"]); + +await handle.runRace(); +await new Promise((resolve) => setTimeout(resolve, 200)); + +const state = await handle.getState(); +console.log(state.lastWinner, state.lastValue); +``` + + +## Retries, timeouts, and rollback checkpoints + +- Use `ctx.step({ ... })` for retry/timeouts. +- Add `ctx.rollbackCheckpoint(...)` before steps with rollback handlers. +- Use `ephemeral: true` for non-persisted idempotent side work. + +```ts +import { actor, queue, setup } from "rivetkit"; +import { Loop, workflow } from "rivetkit/workflow"; + +const ORDER_QUEUE = "orders"; + +async function reserveInventory(orderId: string): Promise { + return `reservation-${orderId}`; +} + +async function releaseInventory(_reservationId: string): Promise {} + +async function chargeCard(orderId: string): Promise { + return `charge-${orderId}`; +} + +async function refundCharge(_chargeId: string): Promise {} + +const inMemoryCache = new Map(); + +export const checkoutActor = actor({ + state: { + lastOrderId: null as string | null, + lastReservationId: null as string | null, + lastChargeId: null as string | null, + }, + queues: { + [ORDER_QUEUE]: queue<{ orderId: string }>(), + }, + actions: { + submit: async (c, orderId: string) => { + await c.queue.send(ORDER_QUEUE, { orderId }); + }, + getState: (c) => c.state, + }, + run: workflow(async (ctx) => { + await ctx.loop({ + name: "checkout-loop", + run: async (loopCtx) => { + const [message] = await loopCtx.queue.next("wait-order", { + names: [ORDER_QUEUE], + }); + + if (!message) { + return Loop.continue(undefined); + } + + const { orderId } = message.body; + + await loopCtx.rollbackCheckpoint("checkout-checkpoint"); + + const reservationId = await loopCtx.step({ + name: "reserve-inventory", + run: async () => await reserveInventory(orderId), + rollback: async (_rollbackCtx, output) => { + await releaseInventory(output); + }, + }); + + const chargeId = await loopCtx.step({ + name: "charge-card", + timeout: 5_000, + maxRetries: 5, + retryBackoffBase: 200, + retryBackoffMax: 2_000, + run: async () => await chargeCard(orderId), + rollback: async (_rollbackCtx, output) => { + await refundCharge(output); + }, + }); + + await loopCtx.step({ + name: "cache-last-charge", + ephemeral: true, + run: async () => { + inMemoryCache.set(orderId, chargeId); + }, + }); + + await loopCtx.step("mark-complete", async () => { + loopCtx.state.lastOrderId = orderId; + loopCtx.state.lastReservationId = reservationId; + loopCtx.state.lastChargeId = chargeId; + }); + + return Loop.continue(undefined); + }, + }); + }), +}); + +export const registry = setup({ use: { checkoutActor } }); +``` + +## Workflow migrations with removed + +- Use `ctx.removed(name, originalType)` when old history entries were removed/renamed. +- This keeps replay compatible across deployed workflow versions. + +```ts +import { actor, setup } from "rivetkit"; +import { workflow } from "rivetkit/workflow"; + +export const migratedWorkflowActor = actor({ + state: { + migrationsApplied: 0, + }, + run: workflow(async (ctx) => { + await ctx.step("validate-order-v2", async () => { + // New validation logic. + }); + + // In a previous deployment this existed as `validate-order-v1`. + await ctx.removed("validate-order-v1", "step"); + + await ctx.step("persist-v2-state", async () => { + ctx.state.migrationsApplied += 1; + }); + }), + actions: { + getState: (c) => c.state, + }, +}); + +export const registry = setup({ use: { migratedWorkflowActor } }); +``` + +## Workflow inspector history + +- `GET /inspector/workflow-history` returns workflow history status for an actor. +- Response includes `isWorkflowEnabled` and `history`. +- In non-dev mode, inspector endpoints require authorization. + + +```ts registry.ts +import { actor, setup } from "rivetkit"; +import { Loop, workflow } from "rivetkit/workflow"; + +export const workflowInspectorActor = actor({ + state: { + ticks: 0, + }, + run: workflow(async (ctx) => { + await ctx.loop({ + name: "inspector-loop", + run: async (loopCtx) => { + await loopCtx.step("tick", async () => { + loopCtx.state.ticks += 1; + }); + + await loopCtx.sleep("delay", 50); + return Loop.continue(undefined); + }, + }); + }), +}); + +export const registry = setup({ use: { workflowInspectorActor } }); +``` + +```ts client.ts +import { createClient } from "rivetkit/client"; +import type { registry } from "./registry"; + +const client = createClient(); +const handle = client.workflowInspectorActor.getOrCreate(["inspector-workflow"]); + +const gatewayUrl = await handle.getGatewayUrl(); +const inspectorToken = "token"; + +const response = await fetch(`${gatewayUrl}/inspector/workflow-history`, { + headers: { + Authorization: `Bearer ${inspectorToken}`, + }, +}); + +const data = (await response.json()) as { + history: unknown; + isWorkflowEnabled: boolean; +}; + +console.log(data.isWorkflowEnabled, data.history); +``` + + +## Recommendations + +- Keep workflow entry names stable once deployed. +- Keep all actor-local reads/writes inside `step` callbacks. +- Use queue-driven triggers (`queue.next`) for external workflow events. +- Prefer `join` and `race` over hand-rolled branch orchestration. +- Add rollback handlers for any side effect that must be compensated. + +## Read more + +- [Queues & Run Loops](/docs/actors/queue) +- [Debugging](/docs/actors/debugging) +- [SQLite](/docs/actors/sqlite) diff --git a/website/src/sitemap/mod.ts b/website/src/sitemap/mod.ts index b8fc841171..d13a760c10 100644 --- a/website/src/sitemap/mod.ts +++ b/website/src/sitemap/mod.ts @@ -149,6 +149,11 @@ export const sitemap = [ href: "/docs/actors/queue", icon: faListUl, }, + { + title: "Workflows", + href: "/docs/actors/workflows", + icon: faArrowsTurnRight, + }, { title: "Actions", href: "/docs/actors/actions",