diff --git a/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/queue.ts b/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/queue.ts new file mode 100644 index 0000000000..10f98b24a9 --- /dev/null +++ b/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/queue.ts @@ -0,0 +1,70 @@ +import { actor } from "rivetkit"; +import type { registry } from "./registry"; + +export const queueActor = actor({ + state: {}, + actions: { + receiveOne: async ( + c, + name: string, + opts?: { count?: number; timeout?: number }, + ) => { + const message = await c.queue.next(name, opts); + if (!message) { + return null; + } + return { name: message.name, body: message.body }; + }, + receiveMany: async ( + c, + names: string[], + opts?: { count?: number; timeout?: number }, + ) => { + const messages = await c.queue.next(names, opts); + return (messages ?? []).map( + (message: { name: string; body: unknown }) => ({ + name: message.name, + body: message.body, + }), + ); + }, + receiveRequest: async ( + c, + request: { + name: string | string[]; + count?: number; + timeout?: number; + }, + ) => { + const messages = await c.queue.next(request); + return (messages ?? []).map( + (message: { name: string; body: unknown }) => ({ + name: message.name, + body: message.body, + }), + ); + }, + sendToSelf: async (c, name: string, body: unknown) => { + const client = c.client(); + const handle = client.queueActor.getForId(c.actorId); + await handle.queue[name].send(body); + return true; + }, + waitForAbort: async (c) => { + setTimeout(() => { + c.destroy(); + }, 10); + await c.queue.next("abort", { timeout: 10_000 }); + return true; + }, + }, +}); + +export const queueLimitedActor = actor({ + state: {}, + actions: {}, + options: { + maxQueueSize: 1, + maxQueueMessageSize: 64, + }, +}); diff --git a/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/registry.ts b/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/registry.ts index 0fdc80f7a8..5ee3f7b440 100644 --- a/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/registry.ts +++ b/rivetkit-typescript/packages/rivetkit/fixtures/driver-test-suite/registry.ts @@ -26,6 +26,7 @@ import { kvActor } from "./kv"; import { largePayloadActor, largePayloadConnActor } from "./large-payloads"; import { counterWithLifecycle } from "./lifecycle"; import { metadataActor } from "./metadata"; +import { queueActor, queueLimitedActor } from "./queue"; import { rawHttpActor, rawHttpHonoActor, @@ -76,6 +77,9 @@ export const registry = setup({ inlineClientActor, // From kv.ts kvActor, + // From queue.ts + queueActor, + queueLimitedActor, // From action-inputs.ts inputActor, // From action-timeout.ts diff --git a/rivetkit-typescript/packages/rivetkit/package.json b/rivetkit-typescript/packages/rivetkit/package.json index ddf4623860..f5f02a8ec3 100644 --- a/rivetkit-typescript/packages/rivetkit/package.json +++ b/rivetkit-typescript/packages/rivetkit/package.json @@ -163,7 +163,7 @@ ], "scripts": { "build": "tsup src/mod.ts src/client/mod.ts src/common/log.ts src/common/websocket.ts src/actor/errors.ts src/topologies/coordinate/mod.ts src/topologies/partition/mod.ts src/utils.ts src/driver-helpers/mod.ts src/driver-test-suite/mod.ts src/serve-test-suite/mod.ts src/test/mod.ts src/inspector/mod.ts", - "build:schema": "./scripts/compile-bare.ts compile schemas/client-protocol/v1.bare -o dist/schemas/client-protocol/v1.ts && ./scripts/compile-bare.ts compile schemas/client-protocol/v2.bare -o dist/schemas/client-protocol/v2.ts && ./scripts/compile-bare.ts compile schemas/file-system-driver/v1.bare -o dist/schemas/file-system-driver/v1.ts && ./scripts/compile-bare.ts compile schemas/file-system-driver/v2.bare -o dist/schemas/file-system-driver/v2.ts && ./scripts/compile-bare.ts compile schemas/file-system-driver/v3.bare -o dist/schemas/file-system-driver/v3.ts && ./scripts/compile-bare.ts compile schemas/actor-persist/v1.bare -o dist/schemas/actor-persist/v1.ts && ./scripts/compile-bare.ts compile schemas/actor-persist/v2.bare -o dist/schemas/actor-persist/v2.ts && ./scripts/compile-bare.ts compile schemas/actor-persist/v3.bare -o dist/schemas/actor-persist/v3.ts && ./scripts/compile-bare.ts compile schemas/actor-inspector/v1.bare -o dist/schemas/actor-inspector/v1.ts", + "build:schema": "./scripts/compile-bare.ts compile schemas/client-protocol/v1.bare -o dist/schemas/client-protocol/v1.ts && ./scripts/compile-bare.ts compile schemas/client-protocol/v2.bare -o dist/schemas/client-protocol/v2.ts && ./scripts/compile-bare.ts compile schemas/client-protocol/v3.bare -o dist/schemas/client-protocol/v3.ts && ./scripts/compile-bare.ts compile schemas/file-system-driver/v1.bare -o dist/schemas/file-system-driver/v1.ts && ./scripts/compile-bare.ts compile schemas/file-system-driver/v2.bare -o dist/schemas/file-system-driver/v2.ts && ./scripts/compile-bare.ts compile schemas/file-system-driver/v3.bare -o dist/schemas/file-system-driver/v3.ts && ./scripts/compile-bare.ts compile schemas/actor-persist/v1.bare -o dist/schemas/actor-persist/v1.ts && ./scripts/compile-bare.ts compile schemas/actor-persist/v2.bare -o dist/schemas/actor-persist/v2.ts && ./scripts/compile-bare.ts compile schemas/actor-persist/v3.bare -o dist/schemas/actor-persist/v3.ts && ./scripts/compile-bare.ts compile schemas/actor-persist/v4.bare -o dist/schemas/actor-persist/v4.ts && ./scripts/compile-bare.ts compile schemas/actor-inspector/v1.bare -o dist/schemas/actor-inspector/v1.ts && ./scripts/compile-bare.ts compile schemas/actor-inspector/v2.bare -o dist/schemas/actor-inspector/v2.ts", "check-types": "tsc --noEmit", "lint": "biome check .", "lint:fix": "biome check --write .", diff --git a/rivetkit-typescript/packages/rivetkit/schemas/actor-inspector/v1.bare b/rivetkit-typescript/packages/rivetkit/schemas/actor-inspector/v1.bare index df6931979c..28e9424ca0 100644 --- a/rivetkit-typescript/packages/rivetkit/schemas/actor-inspector/v1.bare +++ b/rivetkit-typescript/packages/rivetkit/schemas/actor-inspector/v1.bare @@ -34,7 +34,7 @@ type ToServerBody union { PatchStateRequest | StateRequest | ConnectionsRequest | - ActionRequest | + ActionRequest | EventsRequest | ClearEventsRequest | RpcsListRequest @@ -159,4 +159,4 @@ type ToClientBody union { type ToClient struct { body: ToClientBody -} \ No newline at end of file +} diff --git a/rivetkit-typescript/packages/rivetkit/schemas/actor-inspector/v2.bare b/rivetkit-typescript/packages/rivetkit/schemas/actor-inspector/v2.bare new file mode 100644 index 0000000000..ef349e5d24 --- /dev/null +++ b/rivetkit-typescript/packages/rivetkit/schemas/actor-inspector/v2.bare @@ -0,0 +1,168 @@ +# MARK: Message To Server + +type PatchStateRequest struct { + state: data +} + +type ActionRequest struct { + id: uint + name: str + args: data +} + +type StateRequest struct { + id: uint +} + +type ConnectionsRequest struct { + id: uint +} + +type EventsRequest struct { + id: uint +} + +type ClearEventsRequest struct { + id: uint +} + +type RpcsListRequest struct { + id: uint +} + +type ToServerBody union { + PatchStateRequest | + StateRequest | + ConnectionsRequest | + ActionRequest | + EventsRequest | + ClearEventsRequest | + RpcsListRequest +} + +type ToServer struct { + body: ToServerBody +} + +# MARK: Message To Client + +type State data + +type Connection struct { + id: str + details: data +} + +type ActionEvent struct { + name: str + args: data + connId: str +} + +type BroadcastEvent struct { + eventName: str + args: data +} + +type SubscribeEvent struct { + eventName: str + connId: str +} + +type UnSubscribeEvent struct { + eventName: str + connId: str +} + +type FiredEvent struct { + eventName: str + args: data + connId: str +} + +type EventBody union { + ActionEvent | + BroadcastEvent | + SubscribeEvent | + UnSubscribeEvent | + FiredEvent +} + +type Event struct { + id: str + timestamp: uint + body: EventBody +} + +type Init struct { + connections: list + events: list + state: optional + isStateEnabled: bool + rpcs: list + isDatabaseEnabled: bool + queueSize: uint +} + +type ConnectionsResponse struct { + rid: uint + connections: list +} + +type StateResponse struct { + rid: uint + state: optional + isStateEnabled: bool +} + +type EventsResponse struct { + rid: uint + events: list +} + +type ActionResponse struct { + rid: uint + output: data +} + +type StateUpdated struct { + state: State +} + +type EventsUpdated struct { + events: list +} + +type QueueUpdated struct { + queueSize: uint +} + +type RpcsListResponse struct { + rid: uint + rpcs: list +} + +type ConnectionsUpdated struct { + connections: list +} +type Error struct { + message: str +} + +type ToClientBody union { + StateResponse | + ConnectionsResponse | + EventsResponse | + ActionResponse | + ConnectionsUpdated | + EventsUpdated | + QueueUpdated | + StateUpdated | + RpcsListResponse | + Error | + Init +} + +type ToClient struct { + body: ToClientBody +} diff --git a/rivetkit-typescript/packages/rivetkit/schemas/actor-persist/v4.bare b/rivetkit-typescript/packages/rivetkit/schemas/actor-persist/v4.bare new file mode 100644 index 0000000000..fa35295721 --- /dev/null +++ b/rivetkit-typescript/packages/rivetkit/schemas/actor-persist/v4.bare @@ -0,0 +1,56 @@ +type GatewayId data[4] +type RequestId data[4] +type MessageIndex u16 + +type Cbor data + +# MARK: Connection +type Subscription struct { + eventName: str +} + +# Connection associated with hibernatable WebSocket that should persist across lifecycles. +type Conn struct { + # Connection ID generated by RivetKit + id: str + parameters: Cbor + state: Cbor + subscriptions: list + + gatewayId: GatewayId + requestId: RequestId + serverMessageIndex: u16 + clientMessageIndex: u16 + + requestPath: str + requestHeaders: map +} + +# MARK: Schedule Event +type ScheduleEvent struct { + eventId: str + timestamp: i64 + action: str + args: optional +} + +# MARK: Actor +type Actor struct { + # Input data passed to the actor on initialization + input: optional + hasInitialized: bool + state: Cbor + scheduledEvents: list +} + +# MARK: Queue +type QueueMetadata struct { + nextId: u64 + size: u32 +} + +type QueueMessage struct { + name: str + body: Cbor + createdAt: i64 +} diff --git a/rivetkit-typescript/packages/rivetkit/schemas/client-protocol/v1.bare b/rivetkit-typescript/packages/rivetkit/schemas/client-protocol/v1.bare index 35423ef872..ea34364c8f 100644 --- a/rivetkit-typescript/packages/rivetkit/schemas/client-protocol/v1.bare +++ b/rivetkit-typescript/packages/rivetkit/schemas/client-protocol/v1.bare @@ -81,4 +81,3 @@ type HttpResolveRequest void type HttpResolveResponse struct { actorId: str } - diff --git a/rivetkit-typescript/packages/rivetkit/schemas/client-protocol/v3.bare b/rivetkit-typescript/packages/rivetkit/schemas/client-protocol/v3.bare new file mode 100644 index 0000000000..b6e1220f7e --- /dev/null +++ b/rivetkit-typescript/packages/rivetkit/schemas/client-protocol/v3.bare @@ -0,0 +1,93 @@ +# MARK: Message To Client +type Init struct { + actorId: str + connectionId: str +} + +type Error struct { + group: str + code: str + message: str + metadata: optional + actionId: optional +} + +type ActionResponse struct { + id: uint + output: data +} + +type Event struct { + name: str + # CBOR array + args: data +} + +type ToClientBody union { + Init | + Error | + ActionResponse | + Event +} + +type ToClient struct { + body: ToClientBody +} + +# MARK: Message To Server +type ActionRequest struct { + id: uint + name: str + # CBOR array + args: data +} + +type SubscriptionRequest struct { + eventName: str + subscribe: bool +} + +type ToServerBody union { + ActionRequest | + SubscriptionRequest +} + +type ToServer struct { + body: ToServerBody +} + +# MARK: HTTP Action +type HttpActionRequest struct { + # CBOR array + args: data +} + +type HttpActionResponse struct { + output: data +} + +# MARK: HTTP Queue + +type HttpQueueSendRequest struct { + name: str + body: data +} + +type HttpQueueSendResponse struct { + ok: bool +} + +# MARK: HTTP Error +type HttpResponseError struct { + group: str + code: str + message: str + metadata: optional +} + +# MARK: HTTP Resolve +type HttpResolveRequest void + +type HttpResolveResponse struct { + actorId: str +} diff --git a/rivetkit-typescript/packages/rivetkit/src/actor/config.ts b/rivetkit-typescript/packages/rivetkit/src/actor/config.ts index 882b72b890..836f31bbe1 100644 --- a/rivetkit-typescript/packages/rivetkit/src/actor/config.ts +++ b/rivetkit-typescript/packages/rivetkit/src/actor/config.ts @@ -83,6 +83,11 @@ export const ActorConfigSchema = z connectionLivenessInterval: z.number().positive().default(5000), noSleep: z.boolean().default(false), sleepTimeout: z.number().positive().default(30_000), + maxQueueSize: z.number().positive().default(1000), + maxQueueMessageSize: z + .number() + .positive() + .default(1024 * 1024), /** * Can hibernate WebSockets for onWebSocket. * @@ -689,6 +694,18 @@ export const DocActorOptionsSchema = z .describe( "Time in ms of inactivity before the actor sleeps. Default: 30000", ), + maxQueueSize: z + .number() + .optional() + .describe( + "Maximum number of queue messages before rejecting new messages. Default: 1000", + ), + maxQueueMessageSize: z + .number() + .optional() + .describe( + "Maximum size of each queue message in bytes. Default: 1048576", + ), canHibernateWebSocket: z .boolean() .optional() diff --git a/rivetkit-typescript/packages/rivetkit/src/actor/contexts/base/actor.ts b/rivetkit-typescript/packages/rivetkit/src/actor/contexts/base/actor.ts index da47afe0b0..52c07461a7 100644 --- a/rivetkit-typescript/packages/rivetkit/src/actor/contexts/base/actor.ts +++ b/rivetkit-typescript/packages/rivetkit/src/actor/contexts/base/actor.ts @@ -7,6 +7,7 @@ import type { AnyDatabaseProvider, InferDatabaseClient } from "../../database"; import type { ActorDefinition, AnyActorDefinition } from "../../definition"; import type { ActorInstance, SaveStateOptions } from "../../instance/mod"; import { ActorKv } from "../../instance/kv"; +import { ActorQueue } from "../../instance/queue"; import type { Schedule } from "../../schedule"; /** @@ -29,6 +30,9 @@ export class ActorContext< TDatabase >; #kv: ActorKv | undefined; + #queue: + | ActorQueue + | undefined; constructor( actor: ActorInstance< @@ -91,6 +95,26 @@ export class ActorContext< return this.#actor.log; } + /** + * Access to queue receive helpers. + */ + get queue(): ActorQueue< + TState, + TConnParams, + TConnState, + TVars, + TInput, + TDatabase + > { + if (!this.#queue) { + this.#queue = new ActorQueue( + this.#actor.queueManager, + this.#actor.abortSignal, + ); + } + return this.#queue; + } + /** * Gets actor ID. */ diff --git a/rivetkit-typescript/packages/rivetkit/src/actor/contexts/index.ts b/rivetkit-typescript/packages/rivetkit/src/actor/contexts/index.ts index ff11968672..93250d0bea 100644 --- a/rivetkit-typescript/packages/rivetkit/src/actor/contexts/index.ts +++ b/rivetkit-typescript/packages/rivetkit/src/actor/contexts/index.ts @@ -1,10 +1,10 @@ // Base contexts -export { ActorContext, type ActorContextOf } from "./base/actor"; -export { ConnContext, type ConnContextOf } from "./base/conn"; -export { ConnInitContext, type ConnInitContextOf } from "./base/conn-init"; // Lifecycle contexts export { ActionContext, type ActionContextOf } from "./action"; +export { ActorContext, type ActorContextOf } from "./base/actor"; +export { ConnContext, type ConnContextOf } from "./base/conn"; +export { ConnInitContext, type ConnInitContextOf } from "./base/conn-init"; export { BeforeActionResponseContext, type BeforeActionResponseContextOf, diff --git a/rivetkit-typescript/packages/rivetkit/src/actor/errors.ts b/rivetkit-typescript/packages/rivetkit/src/actor/errors.ts index 26b0b03a87..101a1df661 100644 --- a/rivetkit-typescript/packages/rivetkit/src/actor/errors.ts +++ b/rivetkit-typescript/packages/rivetkit/src/actor/errors.ts @@ -188,6 +188,45 @@ export class Unsupported extends ActorError { } } +export class QueueFull extends ActorError { + constructor(limit: number) { + super("queue", "full", `Queue is full. Limit is ${limit} messages.`, { + public: true, + metadata: { limit }, + }); + } +} + +export class QueueMessageTooLarge extends ActorError { + constructor(size: number, limit: number) { + super( + "queue", + "message_too_large", + `Queue message too large (${size} bytes). Limit is ${limit} bytes.`, + { public: true, metadata: { size, limit } }, + ); + } +} + +export class QueueMessageInvalid extends ActorError { + constructor(path?: string) { + super( + "queue", + "message_invalid", + path + ? `Queue message body contains unsupported type at ${path}.` + : "Queue message body contains unsupported type.", + { public: true, metadata: path ? { path } : undefined }, + ); + } +} + +export class ActorAborted extends ActorError { + constructor() { + super("actor", "aborted", "Actor aborted.", { public: true }); + } +} + /** * Options for the UserError class. */ diff --git a/rivetkit-typescript/packages/rivetkit/src/actor/instance/keys.ts b/rivetkit-typescript/packages/rivetkit/src/actor/instance/keys.ts index 641cfd027c..1f3f0fbae4 100644 --- a/rivetkit-typescript/packages/rivetkit/src/actor/instance/keys.ts +++ b/rivetkit-typescript/packages/rivetkit/src/actor/instance/keys.ts @@ -3,8 +3,12 @@ export const KEYS = { CONN_PREFIX: Uint8Array.from([2]), // Prefix for connection keys INSPECTOR_TOKEN: Uint8Array.from([3]), // Inspector token key KV: Uint8Array.from([4]), // Prefix for user-facing KV storage + QUEUE_PREFIX: Uint8Array.from([5]), // Prefix for queue message keys + QUEUE_METADATA: Uint8Array.from([6]), // Queue metadata key }; +const QUEUE_ID_BYTES = 8; + // Helper to create a prefixed key for user-facing KV storage export function makePrefixedKey(key: Uint8Array): Uint8Array { const prefixed = new Uint8Array(KEYS.KV.length + key.length); @@ -27,3 +31,26 @@ export function makeConnKey(connId: string): Uint8Array { key.set(connIdBytes, KEYS.CONN_PREFIX.length); return key; } + +// Helper to create a queue message key +export function makeQueueMessageKey(id: bigint): Uint8Array { + const key = new Uint8Array(KEYS.QUEUE_PREFIX.length + QUEUE_ID_BYTES); + key.set(KEYS.QUEUE_PREFIX, 0); + const view = new DataView(key.buffer, key.byteOffset, key.byteLength); + view.setBigUint64(KEYS.QUEUE_PREFIX.length, id, false); + return key; +} + +// Helper to decode a queue message key +export function decodeQueueMessageKey(key: Uint8Array): bigint { + const offset = KEYS.QUEUE_PREFIX.length; + if (key.length < offset + QUEUE_ID_BYTES) { + throw new Error("Queue key is too short"); + } + const view = new DataView( + key.buffer, + key.byteOffset + offset, + QUEUE_ID_BYTES, + ); + return view.getBigUint64(0, false); +} diff --git a/rivetkit-typescript/packages/rivetkit/src/actor/instance/mod.ts b/rivetkit-typescript/packages/rivetkit/src/actor/instance/mod.ts index a847e80271..7960aec9a5 100644 --- a/rivetkit-typescript/packages/rivetkit/src/actor/instance/mod.ts +++ b/rivetkit-typescript/packages/rivetkit/src/actor/instance/mod.ts @@ -30,6 +30,7 @@ import { RequestContext, WebSocketContext, } from "../contexts"; + import type { AnyDatabaseProvider, InferDatabaseClient } from "../database"; import type { ActorDriver } from "../driver"; import * as errors from "../errors"; @@ -49,6 +50,7 @@ import { convertActorFromBarePersisted, type PersistedActor, } from "./persisted"; +import { QueueManager } from "./queue-manager"; import { ScheduleManager } from "./schedule-manager"; import { type SaveStateOptions, StateManager } from "./state-manager"; @@ -101,6 +103,8 @@ export class ActorInstance { #scheduleManager!: ScheduleManager; + queueManager!: QueueManager; + // MARK: - Logging #log!: Logger; #rLog!: Logger; @@ -269,6 +273,7 @@ export class ActorInstance { this.connectionManager = new ConnectionManager(this); this.stateManager = new StateManager(this, actorDriver, this.#config); this.eventManager = new EventManager(this); + this.queueManager = new QueueManager(this, actorDriver); this.#scheduleManager = new ScheduleManager( this, actorDriver, @@ -281,6 +286,8 @@ export class ActorInstance { // Load state await this.#loadState(); + await this.queueManager.initialize(); + // Generate or load inspector token await this.#initializeInspectorToken(); diff --git a/rivetkit-typescript/packages/rivetkit/src/actor/instance/queue-manager.ts b/rivetkit-typescript/packages/rivetkit/src/actor/instance/queue-manager.ts new file mode 100644 index 0000000000..4591124d11 --- /dev/null +++ b/rivetkit-typescript/packages/rivetkit/src/actor/instance/queue-manager.ts @@ -0,0 +1,364 @@ +import * as cbor from "cbor-x"; +import { isCborSerializable } from "@/common/utils"; +import { + CURRENT_VERSION as ACTOR_PERSIST_CURRENT_VERSION, + QUEUE_MESSAGE_VERSIONED, + QUEUE_METADATA_VERSIONED, +} from "@/schemas/actor-persist/versioned"; +import { promiseWithResolvers } from "@/utils"; +import type { AnyDatabaseProvider } from "../database"; +import type { ActorDriver } from "../driver"; +import * as errors from "../errors"; +import { decodeQueueMessageKey, KEYS, makeQueueMessageKey } from "./keys"; +import type { ActorInstance } from "./mod"; + +export interface QueueMessage { + id: bigint; + name: string; + body: unknown; + createdAt: number; +} + +interface QueueMetadata { + nextId: bigint; + size: number; +} + +interface QueueWaiter { + id: string; + nameSet: Set; + count: number; + resolve: (messages: QueueMessage[]) => void; + reject: (error: Error) => void; + signal?: AbortSignal; + timeoutHandle?: ReturnType; +} + +const DEFAULT_METADATA: QueueMetadata = { + nextId: 1n, + size: 0, +}; + +export class QueueManager { + #actor: ActorInstance; + #driver: ActorDriver; + #waiters = new Map(); + #metadata: QueueMetadata = { ...DEFAULT_METADATA }; + + constructor( + actor: ActorInstance, + driver: ActorDriver, + ) { + this.#actor = actor; + this.#driver = driver; + } + + /** Returns the current number of messages in the queue. */ + get size(): number { + return this.#metadata.size; + } + + /** Loads queue metadata from storage and initializes internal state. */ + async initialize(): Promise { + const [metadataBuffer] = await this.#driver.kvBatchGet(this.#actor.id, [ + KEYS.QUEUE_METADATA, + ]); + if (!metadataBuffer) { + await this.#driver.kvBatchPut(this.#actor.id, [ + [KEYS.QUEUE_METADATA, this.#serializeMetadata()], + ]); + this.#actor.inspector.updateQueueSize(this.#metadata.size); + return; + } + try { + const decoded = + QUEUE_METADATA_VERSIONED.deserializeWithEmbeddedVersion( + metadataBuffer, + ); + this.#metadata.nextId = decoded.nextId; + this.#metadata.size = Number(decoded.size); + } catch (error) { + this.#actor.rLog.error({ + msg: "failed to decode queue metadata, rebuilding from messages", + error, + }); + await this.#rebuildMetadata(); + } + } + + /** Adds a message to the queue with the given name and body. */ + async enqueue(name: string, body: unknown): Promise { + this.#actor.assertReady(); + + const sizeLimit = this.#actor.config.options.maxQueueSize; + if (this.#metadata.size >= sizeLimit) { + throw new errors.QueueFull(sizeLimit); + } + + let invalidPath = ""; + if ( + !isCborSerializable(body, (path) => { + invalidPath = path; + }) + ) { + throw new errors.QueueMessageInvalid(invalidPath); + } + + const createdAt = Date.now(); + const bodyCborBuffer = cbor.encode(body); + const encodedMessage = + QUEUE_MESSAGE_VERSIONED.serializeWithEmbeddedVersion( + { + name, + body: new Uint8Array(bodyCborBuffer).buffer as ArrayBuffer, + createdAt: BigInt(createdAt), + }, + ACTOR_PERSIST_CURRENT_VERSION, + ); + const encodedSize = encodedMessage.byteLength; + if (encodedSize > this.#actor.config.options.maxQueueMessageSize) { + throw new errors.QueueMessageTooLarge( + encodedSize, + this.#actor.config.options.maxQueueMessageSize, + ); + } + + const id = this.#metadata.nextId; + const messageKey = makeQueueMessageKey(id); + + // Update metadata before writing so we can batch both writes + this.#metadata.nextId = id + 1n; + this.#metadata.size += 1; + const encodedMetadata = this.#serializeMetadata(); + + // Batch write message and metadata together + await this.#driver.kvBatchPut(this.#actor.id, [ + [messageKey, encodedMessage], + [KEYS.QUEUE_METADATA, encodedMetadata], + ]); + + this.#actor.inspector.updateQueueSize(this.#metadata.size); + + const message: QueueMessage = { + id, + name, + body, + createdAt, + }; + + this.#actor.resetSleepTimer(); + await this.#maybeResolveWaiters(); + + return message; + } + + /** Receives messages from the queue matching the given names. Waits until messages are available or timeout is reached. */ + async receive( + names: string[], + count: number, + timeout?: number, + abortSignal?: AbortSignal, + ): Promise { + this.#actor.assertReady(); + const limitedCount = Math.max(1, count); + const nameSet = new Set(names); + + const immediate = await this.#drainMessages(nameSet, limitedCount); + if (immediate.length > 0 || timeout === 0) { + return timeout === 0 && immediate.length === 0 ? [] : immediate; + } + + const { promise, resolve, reject } = + promiseWithResolvers(); + const waiterId = crypto.randomUUID(); + const waiter: QueueWaiter = { + id: waiterId, + nameSet, + count: limitedCount, + resolve, + reject, + signal: abortSignal, + }; + + if (timeout !== undefined) { + waiter.timeoutHandle = setTimeout(() => { + this.#waiters.delete(waiterId); + resolve([]); + }, timeout); + } + + const onAbort = () => { + this.#waiters.delete(waiterId); + if (waiter.timeoutHandle) { + clearTimeout(waiter.timeoutHandle); + } + reject(new errors.ActorAborted()); + }; + const onStop = () => { + this.#waiters.delete(waiterId); + if (waiter.timeoutHandle) { + clearTimeout(waiter.timeoutHandle); + } + reject(new errors.ActorAborted()); + }; + const actorAbortSignal = this.#actor.abortSignal; + if (actorAbortSignal.aborted) { + onStop(); + return promise; + } + actorAbortSignal.addEventListener("abort", onStop, { once: true }); + + if (abortSignal) { + if (abortSignal.aborted) { + onAbort(); + return promise; + } + abortSignal.addEventListener("abort", onAbort, { once: true }); + } + + this.#waiters.set(waiterId, waiter); + return promise; + } + + /** Returns all messages currently in the queue without removing them. */ + async getMessages(): Promise { + return await this.#loadQueueMessages(); + } + + async #drainMessages( + nameSet: Set, + count: number, + ): Promise { + if (this.#metadata.size === 0) { + return []; + } + const entries = await this.#loadQueueMessages(); + const matched = entries.filter((entry) => nameSet.has(entry.name)); + if (matched.length === 0) { + return []; + } + + const selected = matched.slice(0, count); + await this.#removeMessages(selected); + return selected; + } + + async #loadQueueMessages(): Promise { + const entries = await this.#driver.kvListPrefix( + this.#actor.id, + KEYS.QUEUE_PREFIX, + ); + const decoded: QueueMessage[] = []; + for (const [key, value] of entries) { + try { + const messageId = decodeQueueMessageKey(key); + const decodedPayload = + QUEUE_MESSAGE_VERSIONED.deserializeWithEmbeddedVersion( + value, + ); + const body = cbor.decode(new Uint8Array(decodedPayload.body)); + decoded.push({ + id: messageId, + name: decodedPayload.name, + body, + createdAt: Number(decodedPayload.createdAt), + }); + } catch (error) { + this.#actor.rLog.error({ + msg: "failed to decode queue message", + error, + }); + } + } + decoded.sort((a, b) => (a.id < b.id ? -1 : a.id > b.id ? 1 : 0)); + if (this.#metadata.size !== decoded.length) { + this.#metadata.size = decoded.length; + this.#actor.inspector.updateQueueSize(this.#metadata.size); + } + return decoded; + } + + async #removeMessages(messages: QueueMessage[]): Promise { + if (messages.length === 0) { + return; + } + const keys = messages.map((message) => makeQueueMessageKey(message.id)); + + // Update metadata + this.#metadata.size = Math.max(0, this.#metadata.size - messages.length); + + // Delete messages and update metadata + // Note: kvBatchDelete doesn't support mixed operations, so we do two calls + await this.#driver.kvBatchDelete(this.#actor.id, keys); + await this.#driver.kvBatchPut(this.#actor.id, [ + [KEYS.QUEUE_METADATA, this.#serializeMetadata()], + ]); + + this.#actor.inspector.updateQueueSize(this.#metadata.size); + } + + async #maybeResolveWaiters() { + if (this.#waiters.size === 0) { + return; + } + const pending = [...this.#waiters.values()]; + for (const waiter of pending) { + if (waiter.signal?.aborted) { + this.#waiters.delete(waiter.id); + waiter.reject(new errors.ActorAborted()); + continue; + } + + const messages = await this.#drainMessages( + waiter.nameSet, + waiter.count, + ); + if (messages.length === 0) { + continue; + } + this.#waiters.delete(waiter.id); + if (waiter.timeoutHandle) { + clearTimeout(waiter.timeoutHandle); + } + waiter.resolve(messages); + } + } + + /** Rebuilds metadata by scanning existing queue messages. Used when metadata is corrupted. */ + async #rebuildMetadata(): Promise { + const entries = await this.#driver.kvListPrefix( + this.#actor.id, + KEYS.QUEUE_PREFIX, + ); + + let maxId = 0n; + for (const [key] of entries) { + try { + const messageId = decodeQueueMessageKey(key); + if (messageId > maxId) { + maxId = messageId; + } + } catch { + // Skip malformed keys + } + } + + this.#metadata.nextId = maxId + 1n; + this.#metadata.size = entries.length; + + await this.#driver.kvBatchPut(this.#actor.id, [ + [KEYS.QUEUE_METADATA, this.#serializeMetadata()], + ]); + this.#actor.inspector.updateQueueSize(this.#metadata.size); + } + + #serializeMetadata(): Uint8Array { + return QUEUE_METADATA_VERSIONED.serializeWithEmbeddedVersion( + { + nextId: this.#metadata.nextId, + size: this.#metadata.size, + }, + ACTOR_PERSIST_CURRENT_VERSION, + ); + } +} diff --git a/rivetkit-typescript/packages/rivetkit/src/actor/instance/queue.ts b/rivetkit-typescript/packages/rivetkit/src/actor/instance/queue.ts new file mode 100644 index 0000000000..aac08c867b --- /dev/null +++ b/rivetkit-typescript/packages/rivetkit/src/actor/instance/queue.ts @@ -0,0 +1,74 @@ +import type { AnyDatabaseProvider } from "../database"; +import type { QueueManager, QueueMessage } from "./queue-manager"; + +/** Options for receiving messages from the queue. */ +export interface QueueReceiveOptions { + /** Maximum number of messages to receive. Defaults to 1. */ + count?: number; + /** Timeout in milliseconds to wait for messages. Waits indefinitely if not specified. */ + timeout?: number; +} + +/** Request object for receiving messages from the queue. */ +export interface QueueReceiveRequest extends QueueReceiveOptions { + /** Queue name or names to receive from. */ + name: string | string[]; +} + +/** User-facing queue interface exposed on ActorContext. */ +export class ActorQueue { + #queueManager: QueueManager; + #abortSignal: AbortSignal; + + constructor( + queueManager: QueueManager, + abortSignal: AbortSignal, + ) { + this.#queueManager = queueManager; + this.#abortSignal = abortSignal; + } + + /** Receives the next message from a single queue. Returns undefined if no message available. */ + next( + name: string, + opts?: QueueReceiveOptions, + ): Promise; + /** Receives messages from multiple queues. Returns messages matching any of the queue names. */ + next( + name: string[], + opts?: QueueReceiveOptions, + ): Promise; + /** Receives messages using a request object for full control over options. */ + next(request: QueueReceiveRequest): Promise; + async next( + nameOrRequest: string | string[] | QueueReceiveRequest, + opts: QueueReceiveOptions = {}, + ): Promise { + const request = + typeof nameOrRequest === "object" && !Array.isArray(nameOrRequest) + ? nameOrRequest + : { name: nameOrRequest }; + const mergedOptions = request === nameOrRequest ? request : opts; + const names = Array.isArray(request.name) + ? request.name + : [request.name]; + const count = mergedOptions.count ?? 1; + + const messages = await this.#queueManager.receive( + names, + count, + mergedOptions.timeout, + this.#abortSignal, + ); + + if (Array.isArray(request.name)) { + return messages; + } + + if (!messages || messages.length === 0) { + return undefined; + } + + return messages[0]; + } +} diff --git a/rivetkit-typescript/packages/rivetkit/src/actor/router-endpoints.ts b/rivetkit-typescript/packages/rivetkit/src/actor/router-endpoints.ts index dfc9c058b5..735f7d837a 100644 --- a/rivetkit-typescript/packages/rivetkit/src/actor/router-endpoints.ts +++ b/rivetkit-typescript/packages/rivetkit/src/actor/router-endpoints.ts @@ -13,17 +13,24 @@ import { WS_PROTOCOL_ENCODING, } from "@/common/actor-router-consts"; import { stringifyError } from "@/common/utils"; +import type { RegistryConfig } from "@/registry/config"; import type * as protocol from "@/schemas/client-protocol/mod"; import { CURRENT_VERSION as CLIENT_PROTOCOL_CURRENT_VERSION, HTTP_ACTION_REQUEST_VERSIONED, HTTP_ACTION_RESPONSE_VERSIONED, + HTTP_QUEUE_SEND_REQUEST_VERSIONED, + HTTP_QUEUE_SEND_RESPONSE_VERSIONED, } from "@/schemas/client-protocol/versioned"; import { type HttpActionRequest as HttpActionRequestJson, HttpActionRequestSchema, type HttpActionResponse as HttpActionResponseJson, HttpActionResponseSchema, + type HttpQueueSendRequest as HttpQueueSendRequestJson, + HttpQueueSendRequestSchema, + type HttpQueueSendResponse as HttpQueueSendResponseJson, + HttpQueueSendResponseSchema, } from "@/schemas/client-protocol-zod/mod"; import { contentTypeForEncoding, @@ -35,7 +42,6 @@ import { createHttpDriver } from "./conn/drivers/http"; import { createRawRequestDriver } from "./conn/drivers/raw-request"; import type { ActorDriver } from "./driver"; import { loggerWithoutContext } from "./log"; -import { RegistryConfig } from "@/registry/config"; export interface ActionOpts { req?: HonoRequest; @@ -61,6 +67,13 @@ export interface FetchOpts { actorId: string; } +export interface QueueSendOpts { + req?: HonoRequest; + name: string; + body: unknown; + actorId: string; +} + /** * Creates an action handler */ @@ -138,7 +151,10 @@ export async function handleAction( ); // Check outgoing message size - const messageSize = serialized instanceof Uint8Array ? serialized.byteLength : serialized.length; + const messageSize = + serialized instanceof Uint8Array + ? serialized.byteLength + : serialized.length; if (messageSize > config.maxOutgoingMessageSize) { throw new errors.OutgoingMessageTooLong(); } @@ -149,6 +165,61 @@ export async function handleAction( }); } +export async function handleQueueSend( + c: HonoContext, + config: RegistryConfig, + actorDriver: ActorDriver, + actorId: string, +) { + const encoding = getRequestEncoding(c.req); + const params = getRequestConnParams(c.req); + const arrayBuffer = await c.req.arrayBuffer(); + + if (arrayBuffer.byteLength > config.maxIncomingMessageSize) { + throw new errors.IncomingMessageTooLong(); + } + + const request = deserializeWithEncoding( + encoding, + new Uint8Array(arrayBuffer), + HTTP_QUEUE_SEND_REQUEST_VERSIONED, + HttpQueueSendRequestSchema, + (json: HttpQueueSendRequestJson) => json, + (bare: protocol.HttpQueueSendRequest) => ({ + name: bare.name, + body: cbor.decode(new Uint8Array(bare.body)), + }), + ); + + const actor = await actorDriver.loadActor(actorId); + const conn = await actor.connectionManager.prepareAndConnectConn( + createHttpDriver(), + params, + c.req.raw, + c.req.path, + c.req.header(), + ); + try { + await actor.queueManager.enqueue(request.name, request.body); + } finally { + conn.disconnect(); + } + + const response = serializeWithEncoding( + encoding, + { ok: true }, + HTTP_QUEUE_SEND_RESPONSE_VERSIONED, + CLIENT_PROTOCOL_CURRENT_VERSION, + HttpQueueSendResponseSchema, + (_value): HttpQueueSendResponseJson => ({ ok: true }), + (_value): protocol.HttpQueueSendResponse => ({ ok: true }), + ); + + return c.body(response as Uint8Array as any, 200, { + "Content-Type": contentTypeForEncoding(encoding), + }); +} + export async function handleRawRequest( c: HonoContext, req: Request, diff --git a/rivetkit-typescript/packages/rivetkit/src/actor/router.ts b/rivetkit-typescript/packages/rivetkit/src/actor/router.ts index d5a057eae1..bb12b91b19 100644 --- a/rivetkit-typescript/packages/rivetkit/src/actor/router.ts +++ b/rivetkit-typescript/packages/rivetkit/src/actor/router.ts @@ -4,8 +4,10 @@ import { type ActionOutput, type ConnsMessageOpts, handleAction, + handleQueueSend, handleRawRequest, } from "@/actor/router-endpoints"; + import { PATH_CONNECT, PATH_INSPECTOR_CONNECT, @@ -164,6 +166,10 @@ export function createActorRouter( return handleAction(c, config, actorDriver, actionName, c.env.actorId); }); + router.post("/queue", async (c) => { + return handleQueueSend(c, config, actorDriver, c.env.actorId); + }); + router.all("/request/*", async (c) => { // TODO: This is not a clean way of doing this since `/http/` might exist mid-path // Strip the /http prefix from the URL to get the original path diff --git a/rivetkit-typescript/packages/rivetkit/src/client/actor-conn.ts b/rivetkit-typescript/packages/rivetkit/src/client/actor-conn.ts index a75d19ae29..efdab8e9ca 100644 --- a/rivetkit-typescript/packages/rivetkit/src/client/actor-conn.ts +++ b/rivetkit-typescript/packages/rivetkit/src/client/actor-conn.ts @@ -30,6 +30,7 @@ import { checkForSchedulingError, queryActor } from "./actor-query"; import { ACTOR_CONNS_SYMBOL, type ClientRaw } from "./client"; import * as errors from "./errors"; import { logger } from "./log"; +import { createQueueProxy, createQueueSender } from "./queue"; import { type WebSocketMessage as ConnMessage, messageLength, @@ -135,6 +136,7 @@ export class ActorConnRaw { #statusChangeHandlers = new Set(); #actionIdCounter = 0; + #queueSender: ReturnType; /** * Interval that keeps the NodeJS process alive if this is the only thing running. @@ -175,10 +177,29 @@ export class ActorConnRaw { this.#params = params; this.#encoding = encoding; this.#actorQuery = actorQuery; + this.#queueSender = createQueueSender({ + encoding: this.#encoding, + params: this.#params, + customFetch: async (request: Request) => { + if (!this.#actorId) { + const { actorId } = await queryActor( + undefined, + this.#actorQuery, + this.#driver, + ); + this.#actorId = actorId; + } + return this.#driver.sendRequest(this.#actorId, request); + }, + }); this.#keepNodeAliveInterval = setInterval(() => 60_000); } + get queue() { + return createQueueProxy(this.#queueSender); + } + /** * Call a raw action connection. See {@link ActorConn} for type-safe action calls. * @@ -243,7 +264,6 @@ export class ActorConnRaw { /** * Do not call this directly. -enc * Establishes a connection to the server using the specified endpoint & encoding & driver. * * @protected diff --git a/rivetkit-typescript/packages/rivetkit/src/client/actor-handle.ts b/rivetkit-typescript/packages/rivetkit/src/client/actor-handle.ts index 7fe7cdb0ca..0a9733cddd 100644 --- a/rivetkit-typescript/packages/rivetkit/src/client/actor-handle.ts +++ b/rivetkit-typescript/packages/rivetkit/src/client/actor-handle.ts @@ -29,6 +29,7 @@ import { checkForSchedulingError, queryActor } from "./actor-query"; import { type ClientRaw, CREATE_ACTOR_CONN_PROXY } from "./client"; import { ActorError, isSchedulingError } from "./errors"; import { logger } from "./log"; +import { createQueueProxy, createQueueSender } from "./queue"; import { rawHttpFetch, rawWebSocket } from "./raw-utils"; import { sendHttpRequest } from "./utils"; @@ -44,6 +45,7 @@ export class ActorHandleRaw { #encoding: Encoding; #actorQuery: ActorQuery; #params: unknown; + #queueSender: ReturnType; /** * Do not call this directly. @@ -64,6 +66,22 @@ export class ActorHandleRaw { this.#encoding = encoding; this.#actorQuery = actorQuery; this.#params = params; + this.#queueSender = createQueueSender({ + encoding: this.#encoding, + params: this.#params, + customFetch: async (request: Request) => { + const { actorId } = await queryActor( + undefined, + this.#actorQuery, + this.#driver, + ); + return this.#driver.sendRequest(actorId, request); + }, + }); + } + + get queue() { + return createQueueProxy(this.#queueSender); } /** @@ -199,16 +217,10 @@ export class ActorHandleRaw { } /** - * Makes a raw HTTP request to the actor. - * - * @param input - The URL, path, or Request object - * @param init - Standard fetch RequestInit options - * @returns Promise - The raw HTTP response + * Fetches a resource from this actor via the /request endpoint. This is a + * convenience wrapper around the raw HTTP API. */ - async fetch( - input: string | URL | Request, - init?: RequestInit, - ): Promise { + fetch(input: string | URL | Request, init?: RequestInit) { return rawHttpFetch( this.#driver, this.#actorQuery, @@ -219,16 +231,9 @@ export class ActorHandleRaw { } /** - * Creates a raw WebSocket connection to the actor. - * - * @param path - The path for the WebSocket connection (e.g., "stream") - * @param protocols - Optional WebSocket subprotocols - * @returns WebSocket - A raw WebSocket connection + * Opens a raw WebSocket connection to this actor. */ - async websocket( - path?: string, - protocols?: string | string[], - ): Promise { + webSocket(path?: string, protocols?: string | string[]) { return rawWebSocket( this.#driver, this.#actorQuery, @@ -241,21 +246,12 @@ export class ActorHandleRaw { /** * Resolves the actor to get its unique actor ID. */ - async resolve({ signal }: { signal?: AbortSignal } = {}): Promise { - if ( - "getForKey" in this.#actorQuery || - "getOrCreateForKey" in this.#actorQuery - ) { - // TODO: - let name: string; - if ("getForKey" in this.#actorQuery) { - name = this.#actorQuery.getForKey.name; - } else if ("getOrCreateForKey" in this.#actorQuery) { - name = this.#actorQuery.getOrCreateForKey.name; - } else { - assertUnreachable(this.#actorQuery); - } + async resolve(): Promise { + if ("getForKey" in this.#actorQuery) { + const name = this.#actorQuery.getForKey.name; + const key = this.#actorQuery.getForKey.key; + // Query the actor to get the id const { actorId } = await queryActor( undefined, this.#actorQuery, diff --git a/rivetkit-typescript/packages/rivetkit/src/client/client.ts b/rivetkit-typescript/packages/rivetkit/src/client/client.ts index 1bd32d4aab..df298bc484 100644 --- a/rivetkit-typescript/packages/rivetkit/src/client/client.ts +++ b/rivetkit-typescript/packages/rivetkit/src/client/client.ts @@ -497,10 +497,14 @@ function createActorProxy( return value; } + if (prop === "queue") { + return Reflect.get(target, prop, receiver); + } + // Create action function that preserves 'this' context if (typeof prop === "string") { // If JS is attempting to calling this as a promise, ignore it - if (prop === "then") return undefined; + if (prop === "then" || prop === "queue") return undefined; let method = methodCache.get(prop); if (!method) { @@ -516,7 +520,7 @@ function createActorProxy( has(target: ActorHandleRaw, prop: string | symbol) { // All string properties are potentially action functions if (typeof prop === "string") { - return true; + return prop !== "queue"; } // For symbols, defer to the target's own has behavior return Reflect.has(target, prop); @@ -545,6 +549,9 @@ function createActorProxy( return targetDescriptor; } if (typeof prop === "string") { + if (prop === "queue") { + return undefined; + } // Make action methods appear non-enumerable return { configurable: true, diff --git a/rivetkit-typescript/packages/rivetkit/src/client/queue.ts b/rivetkit-typescript/packages/rivetkit/src/client/queue.ts new file mode 100644 index 0000000000..13b20d8a48 --- /dev/null +++ b/rivetkit-typescript/packages/rivetkit/src/client/queue.ts @@ -0,0 +1,124 @@ +import * as cbor from "cbor-x"; +import type { Encoding } from "@/actor/protocol/serde"; +import { HEADER_CONN_PARAMS, HEADER_ENCODING } from "@/driver-helpers/mod"; +import type * as protocol from "@/schemas/client-protocol/mod"; +import { + CURRENT_VERSION as CLIENT_PROTOCOL_CURRENT_VERSION, + HTTP_QUEUE_SEND_REQUEST_VERSIONED, + HTTP_QUEUE_SEND_RESPONSE_VERSIONED, +} from "@/schemas/client-protocol/versioned"; +import { + type HttpQueueSendRequest as HttpQueueSendRequestJson, + HttpQueueSendRequestSchema, + type HttpQueueSendResponse as HttpQueueSendResponseJson, + HttpQueueSendResponseSchema, +} from "@/schemas/client-protocol-zod/mod"; +import { bufferToArrayBuffer } from "@/utils"; +import { sendHttpRequest } from "./utils"; + +export interface QueueSender { + send(name: string, body: unknown, signal?: AbortSignal): Promise; +} + +export interface QueueNameSender { + send(body: unknown, signal?: AbortSignal): Promise; +} + +export type QueueProxy = QueueSender & { + [key: string]: QueueNameSender; +}; + +interface QueueSenderOptions { + encoding: Encoding; + params: unknown; + customFetch: (request: Request) => Promise; +} + +export function createQueueSender(options: QueueSenderOptions): QueueSender { + return { + async send( + name: string, + body: unknown, + signal?: AbortSignal, + ): Promise { + await sendHttpRequest< + protocol.HttpQueueSendRequest, + protocol.HttpQueueSendResponse, + HttpQueueSendRequestJson, + HttpQueueSendResponseJson, + { name: string; body: unknown }, + { ok: true } + >({ + url: "http://actor/queue", + method: "POST", + headers: { + [HEADER_ENCODING]: options.encoding, + ...(options.params !== undefined + ? { + [HEADER_CONN_PARAMS]: JSON.stringify( + options.params, + ), + } + : {}), + }, + body: { name, body }, + encoding: options.encoding, + customFetch: options.customFetch, + signal, + requestVersion: CLIENT_PROTOCOL_CURRENT_VERSION, + requestVersionedDataHandler: HTTP_QUEUE_SEND_REQUEST_VERSIONED, + responseVersion: CLIENT_PROTOCOL_CURRENT_VERSION, + responseVersionedDataHandler: + HTTP_QUEUE_SEND_RESPONSE_VERSIONED, + requestZodSchema: HttpQueueSendRequestSchema, + responseZodSchema: HttpQueueSendResponseSchema, + requestToJson: (value): HttpQueueSendRequestJson => value, + requestToBare: (value): protocol.HttpQueueSendRequest => ({ + name: value.name, + body: bufferToArrayBuffer(cbor.encode(value.body)), + }), + responseFromJson: (_json): { ok: true } => ({ ok: true }), + responseFromBare: (_bare): { ok: true } => ({ ok: true }), + }); + }, + }; +} + +export function createQueueProxy(sender: QueueSender): QueueProxy { + const methodCache = new Map(); + return new Proxy(sender, { + get(target, prop: string | symbol, receiver: unknown) { + if (typeof prop === "symbol") { + return Reflect.get(target, prop, receiver); + } + + if (prop in target) { + const value = Reflect.get(target, prop, target); + if (typeof value === "function") { + return value.bind(target); + } + return value; + } + + if (prop === "then") return undefined; + + if (typeof prop === "string") { + let method = methodCache.get(prop); + if (!method) { + method = { + send: (body: unknown, signal?: AbortSignal) => + target.send(prop, body, signal), + }; + methodCache.set(prop, method); + } + return method; + } + }, + has(target, prop: string | symbol) { + if (typeof prop === "string") { + return true; + } + return Reflect.has(target, prop); + }, + }) as QueueProxy; +} diff --git a/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/mod.ts b/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/mod.ts index a4eaa6490b..f7c112b66f 100644 --- a/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/mod.ts +++ b/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/mod.ts @@ -25,6 +25,7 @@ import { runActorInspectorTests } from "./tests/actor-inspector"; import { runActorKvTests } from "./tests/actor-kv"; import { runActorMetadataTests } from "./tests/actor-metadata"; import { runActorOnStateChangeTests } from "./tests/actor-onstatechange"; +import { runActorQueueTests } from "./tests/actor-queue"; import { runActorVarsTests } from "./tests/actor-vars"; import { runManagerDriverTests } from "./tests/manager-driver"; import { runRawHttpTests } from "./tests/raw-http"; @@ -121,6 +122,8 @@ export function runDriverTests( runActorErrorHandlingTests(driverTestConfig); + runActorQueueTests(driverTestConfig); + runActorInlineClientTests(driverTestConfig); runActorKvTests(driverTestConfig); diff --git a/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/tests/actor-queue.ts b/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/tests/actor-queue.ts new file mode 100644 index 0000000000..cf7467ffcb --- /dev/null +++ b/rivetkit-typescript/packages/rivetkit/src/driver-test-suite/tests/actor-queue.ts @@ -0,0 +1,126 @@ +import { describe, expect, test } from "vitest"; +import type { ActorError } from "@/client/mod"; +import type { DriverTestConfig } from "../mod"; +import { setupDriverTest, waitFor } from "../utils"; + +export function runActorQueueTests(driverTestConfig: DriverTestConfig) { + describe("Actor Queue Tests", () => { + test("client can send to actor queue", async (c) => { + const { client } = await setupDriverTest(c, driverTestConfig); + const handle = client.queueActor.getOrCreate(["client-send"]); + + await handle.queue.greeting.send({ hello: "world" }); + + const message = await handle.receiveOne("greeting"); + expect(message).toEqual({ + name: "greeting", + body: { hello: "world" }, + }); + }); + + test("actor can send to its own queue", async (c) => { + const { client } = await setupDriverTest(c, driverTestConfig); + const handle = client.queueActor.getOrCreate(["self-send"]); + + await handle.sendToSelf("self", { value: 42 }); + + const message = await handle.receiveOne("self"); + expect(message).toEqual({ name: "self", body: { value: 42 } }); + }); + + test("next supports name arrays and counts", async (c) => { + const { client } = await setupDriverTest(c, driverTestConfig); + const handle = client.queueActor.getOrCreate(["receive-array"]); + + await handle.queue.a.send(1); + await handle.queue.b.send(2); + await handle.queue.c.send(3); + + const messages = await handle.receiveMany(["a", "b"], { count: 2 }); + expect(messages).toEqual([ + { name: "a", body: 1 }, + { name: "b", body: 2 }, + ]); + }); + + test("next supports request objects", async (c) => { + const { client } = await setupDriverTest(c, driverTestConfig); + const handle = client.queueActor.getOrCreate(["receive-request"]); + + await handle.queue.one.send("first"); + await handle.queue.two.send("second"); + + const messages = await handle.receiveRequest({ + name: ["one", "two"], + count: 2, + }); + expect(messages).toEqual([ + { name: "one", body: "first" }, + { name: "two", body: "second" }, + ]); + }); + + test("next timeout returns empty array", async (c) => { + const { client } = await setupDriverTest(c, driverTestConfig); + const handle = client.queueActor.getOrCreate(["receive-timeout"]); + + const promise = handle.receiveMany(["missing"], { timeout: 50 }); + await waitFor(driverTestConfig, 60); + const messages = await promise; + expect(messages).toEqual([]); + }); + + test("abort throws ActorAborted", async (c) => { + const { client } = await setupDriverTest(c, driverTestConfig); + const handle = client.queueActor.getOrCreate(["abort-test"]); + + try { + await handle.waitForAbort(); + expect.fail("expected ActorAborted error"); + } catch (error) { + expect((error as ActorError).group).toBe("actor"); + expect((error as ActorError).code).toBe("aborted"); + } + }); + + test("enforces queue size limit", async (c) => { + const { client } = await setupDriverTest(c, driverTestConfig); + const key = `size-limit-${Date.now()}-${Math.random().toString(16).slice(2)}`; + const handle = client.queueLimitedActor.getOrCreate([key]); + + await handle.queue.message.send(1); + + await waitFor(driverTestConfig, 10); + + try { + await handle.queue.message.send(2); + expect.fail("expected queue full error"); + } catch (error) { + expect(error).toBeInstanceOf(Error); + expect((error as Error).message).toContain( + "Queue is full. Limit is", + ); + if (driverTestConfig.clientType !== "http") { + expect((error as ActorError).group).toBe("queue"); + expect((error as ActorError).code).toBe("full"); + } + } + }); + + test("enforces message size limit", async (c) => { + const { client } = await setupDriverTest(c, driverTestConfig); + const handle = client.queueLimitedActor.getOrCreate([ + "message-limit", + ]); + const largePayload = "a".repeat(200); + + try { + await handle.queue.oversize.send(largePayload); + expect.fail("expected message_too_large error"); + } catch (error) { + expect((error as ActorError).group).toBe("queue"); + expect((error as ActorError).code).toBe("message_too_large"); + } + }); + }); +} diff --git a/rivetkit-typescript/packages/rivetkit/src/inspector/actor-inspector.ts b/rivetkit-typescript/packages/rivetkit/src/inspector/actor-inspector.ts index 3730981522..6cc3dc8d35 100644 --- a/rivetkit-typescript/packages/rivetkit/src/inspector/actor-inspector.ts +++ b/rivetkit-typescript/packages/rivetkit/src/inspector/actor-inspector.ts @@ -17,6 +17,7 @@ interface ActorInspectorEmitterEvents { connectionsUpdated: () => void; eventFired: (event: EventDetails) => void; eventsChanged: () => void; + queueUpdated: () => void; } export type Connection = Omit & { @@ -55,8 +56,10 @@ export class ActorInspector { public readonly emitter = createNanoEvents(); #lastEvents: Event[] = []; + #lastQueueSize = 0; constructor(private readonly actor: AnyActorInstance) { + this.#lastQueueSize = actor.queueManager?.size ?? 0; this.emitter.on("eventFired", (event) => { const commonParams = { id: crypto.randomUUID(), @@ -84,6 +87,18 @@ export class ActorInspector { this.emitter.emit("eventsChanged"); } + getQueueSize() { + return this.#lastQueueSize; + } + + updateQueueSize(size: number) { + if (this.#lastQueueSize === size) { + return; + } + this.#lastQueueSize = size; + this.emitter.emit("queueUpdated"); + } + // actor accessor methods isDatabaseEnabled() { diff --git a/rivetkit-typescript/packages/rivetkit/src/inspector/handler.ts b/rivetkit-typescript/packages/rivetkit/src/inspector/handler.ts index 2bf904cc80..ce5b2ddc25 100644 --- a/rivetkit-typescript/packages/rivetkit/src/inspector/handler.ts +++ b/rivetkit-typescript/packages/rivetkit/src/inspector/handler.ts @@ -4,6 +4,7 @@ import type { UpgradeWebSocketArgs } from "@/actor/router-websocket-endpoints"; import type { AnyActorInstance, RivetMessageEvent } from "@/mod"; import type { ToClient } from "@/schemas/actor-inspector/mod"; import { + CURRENT_VERSION as INSPECTOR_CURRENT_VERSION, TO_CLIENT_VERSIONED as toClient, TO_SERVER_VERSIONED as toServer, } from "@/schemas/actor-inspector/versioned"; @@ -33,6 +34,7 @@ export async function handleWebSocketInspectorConnect({ : null, isStateEnabled: inspector.isStateEnabled(), isDatabaseEnabled: inspector.isDatabaseEnabled(), + queueSize: BigInt(inspector.getQueueSize()), }, }, }); @@ -70,6 +72,16 @@ export async function handleWebSocketInspectorConnect({ }, }); }), + inspector.emitter.on("queueUpdated", () => { + sendMessage(ws, { + body: { + tag: "QueueUpdated", + val: { + queueSize: BigInt(inspector.getQueueSize()), + }, + }, + }); + }), ); }, onMessage: async (evt: RivetMessageEvent, ws: WSContext) => { @@ -181,7 +193,7 @@ function sendMessage(ws: WSContext, message: ToClient) { ws.send( toClient.serializeWithEmbeddedVersion( message, - 1, + INSPECTOR_CURRENT_VERSION, ) as unknown as ArrayBuffer, ); } diff --git a/rivetkit-typescript/packages/rivetkit/src/schemas/actor-inspector/mod.ts b/rivetkit-typescript/packages/rivetkit/src/schemas/actor-inspector/mod.ts index f5c9cb363e..87a57d3b2c 100644 --- a/rivetkit-typescript/packages/rivetkit/src/schemas/actor-inspector/mod.ts +++ b/rivetkit-typescript/packages/rivetkit/src/schemas/actor-inspector/mod.ts @@ -1 +1 @@ -export * from "../../../dist/schemas/actor-inspector/v1"; +export * from "../../../dist/schemas/actor-inspector/v2"; diff --git a/rivetkit-typescript/packages/rivetkit/src/schemas/actor-inspector/versioned.ts b/rivetkit-typescript/packages/rivetkit/src/schemas/actor-inspector/versioned.ts index 2ecb81670f..509daaea6a 100644 --- a/rivetkit-typescript/packages/rivetkit/src/schemas/actor-inspector/versioned.ts +++ b/rivetkit-typescript/packages/rivetkit/src/schemas/actor-inspector/versioned.ts @@ -1,13 +1,62 @@ import { createVersionedDataHandler } from "vbare"; import * as v1 from "../../../dist/schemas/actor-inspector/v1"; +import * as v2 from "../../../dist/schemas/actor-inspector/v2"; +export const CURRENT_VERSION = 2; -export const TO_SERVER_VERSIONED = createVersionedDataHandler({ +// Converter from v1 to v2: Add queueSize field to Init message +const v1ToClientToV2 = (v1Data: v1.ToClient): v2.ToClient => { + if (v1Data.body.tag === "Init") { + const init = v1Data.body.val as v1.Init; + return { + body: { + tag: "Init", + val: { + ...init, + queueSize: 0n, + }, + }, + }; + } + return v1Data as unknown as v2.ToClient; +}; + +// Converter from v2 to v1: Remove queueSize field from Init, filter out QueueUpdated +const v2ToClientToV1 = (v2Data: v2.ToClient): v1.ToClient => { + if (v2Data.body.tag === "Init") { + const init = v2Data.body.val; + const { queueSize, ...rest } = init; + return { + body: { + tag: "Init", + val: rest, + }, + }; + } + // QueueUpdated doesn't exist in v1, so we can't convert it + if (v2Data.body.tag === "QueueUpdated") { + throw new Error("Cannot convert QueueUpdated to v1"); + } + return v2Data as unknown as v1.ToClient; +}; + +// ToServer is identical between v1 and v2 +const v1ToServerToV2 = (v1Data: v1.ToServer): v2.ToServer => { + return v1Data as unknown as v2.ToServer; +}; + +const v2ToServerToV1 = (v2Data: v2.ToServer): v1.ToServer => { + return v2Data as unknown as v1.ToServer; +}; + +export const TO_SERVER_VERSIONED = createVersionedDataHandler({ serializeVersion: (data, version) => { switch (version) { case 1: - return v1.encodeToServer(data); + return v1.encodeToServer(data as v1.ToServer); + case 2: + return v2.encodeToServer(data); default: throw new Error(`Unknown version ${version}`); } @@ -16,19 +65,23 @@ export const TO_SERVER_VERSIONED = createVersionedDataHandler({ switch (version) { case 1: return v1.decodeToServer(bytes); + case 2: + return v2.decodeToServer(bytes); default: throw new Error(`Unknown version ${version}`); } }, - deserializeConverters: () => [], - serializeConverters: () => [], + deserializeConverters: () => [v1ToServerToV2], + serializeConverters: () => [v2ToServerToV1], }); -export const TO_CLIENT_VERSIONED = createVersionedDataHandler({ +export const TO_CLIENT_VERSIONED = createVersionedDataHandler({ serializeVersion: (data, version) => { switch (version) { case 1: - return v1.encodeToClient(data); + return v1.encodeToClient(data as v1.ToClient); + case 2: + return v2.encodeToClient(data); default: throw new Error(`Unknown version ${version}`); } @@ -37,10 +90,12 @@ export const TO_CLIENT_VERSIONED = createVersionedDataHandler({ switch (version) { case 1: return v1.decodeToClient(bytes); + case 2: + return v2.decodeToClient(bytes); default: throw new Error(`Unknown version ${version}`); } }, - deserializeConverters: () => [], - serializeConverters: () => [], + deserializeConverters: () => [v1ToClientToV2], + serializeConverters: () => [v2ToClientToV1], }); diff --git a/rivetkit-typescript/packages/rivetkit/src/schemas/actor-persist/versioned.ts b/rivetkit-typescript/packages/rivetkit/src/schemas/actor-persist/versioned.ts index b5c9d94161..73ef7ffb20 100644 --- a/rivetkit-typescript/packages/rivetkit/src/schemas/actor-persist/versioned.ts +++ b/rivetkit-typescript/packages/rivetkit/src/schemas/actor-persist/versioned.ts @@ -2,8 +2,9 @@ import { createVersionedDataHandler } from "vbare"; import * as v1 from "../../../dist/schemas/actor-persist/v1"; import * as v2 from "../../../dist/schemas/actor-persist/v2"; import * as v3 from "../../../dist/schemas/actor-persist/v3"; +import * as v4 from "../../../dist/schemas/actor-persist/v4"; -export const CURRENT_VERSION = 3; +export const CURRENT_VERSION = 4; // Converter from v1 to v2 const v1ToV2 = (v1Data: v1.PersistedActor): v2.PersistedActor => ({ @@ -42,6 +43,16 @@ const v2ToV3 = (v2Data: v2.PersistedActor): v3.Actor => { }; }; +// Converter from v3 to v4: No changes to Actor structure +const v3ToV4 = (v3Data: v3.Actor): v4.Actor => { + return v3Data as unknown as v4.Actor; +}; + +// Converter from v4 to v3: No changes to Actor structure +const v4ToV3 = (v4Data: v4.Actor): v3.Actor => { + return v4Data as unknown as v3.Actor; +}; + // Converter from v3 to v2 const v3ToV2 = (v3Data: v3.Actor): v2.PersistedActor => { // Transform scheduled events from flat structure back to nested structure @@ -82,7 +93,7 @@ const v2ToV1 = (v2Data: v2.PersistedActor): v1.PersistedActor => { }; }; -export const ACTOR_VERSIONED = createVersionedDataHandler({ +export const ACTOR_VERSIONED = createVersionedDataHandler({ deserializeVersion: (bytes, version) => { switch (version) { case 1: @@ -91,6 +102,8 @@ export const ACTOR_VERSIONED = createVersionedDataHandler({ return v2.decodePersistedActor(bytes); case 3: return v3.decodeActor(bytes); + case 4: + return v4.decodeActor(bytes); default: throw new Error(`Unknown version ${version}`); } @@ -103,19 +116,32 @@ export const ACTOR_VERSIONED = createVersionedDataHandler({ return v2.encodePersistedActor(data as v2.PersistedActor); case 3: return v3.encodeActor(data as v3.Actor); + case 4: + return v4.encodeActor(data as v4.Actor); default: throw new Error(`Unknown version ${version}`); } }, - deserializeConverters: () => [v1ToV2, v2ToV3], - serializeConverters: () => [v3ToV2, v2ToV1], + deserializeConverters: () => [v1ToV2, v2ToV3, v3ToV4], + serializeConverters: () => [v4ToV3, v3ToV2, v2ToV1], }); -export const CONN_VERSIONED = createVersionedDataHandler({ +// Conn identity converters (Conn is identical between v3 and v4) +const v3ConnToV4 = (v3Data: v3.Conn): v4.Conn => { + return v3Data as unknown as v4.Conn; +}; + +const v4ConnToV3 = (v4Data: v4.Conn): v3.Conn => { + return v4Data as unknown as v3.Conn; +}; + +export const CONN_VERSIONED = createVersionedDataHandler({ deserializeVersion: (bytes, version) => { switch (version) { case 3: return v3.decodeConn(bytes); + case 4: + return v4.decodeConn(bytes); default: throw new Error( `Conn type only exists in version 3+, got version ${version}`, @@ -126,12 +152,66 @@ export const CONN_VERSIONED = createVersionedDataHandler({ switch (version) { case 3: return v3.encodeConn(data as v3.Conn); + case 4: + return v4.encodeConn(data as v4.Conn); default: throw new Error( `Conn type only exists in version 3+, got version ${version}`, ); } }, - deserializeConverters: () => [], - serializeConverters: () => [], + deserializeConverters: () => [v3ConnToV4], + serializeConverters: () => [v4ConnToV3], }); + +export const QUEUE_METADATA_VERSIONED = + createVersionedDataHandler({ + deserializeVersion: (bytes, version) => { + switch (version) { + case 4: + return v4.decodeQueueMetadata(bytes); + default: + throw new Error( + `QueueMetadata type only exists in version 4+, got version ${version}`, + ); + } + }, + serializeVersion: (data, version) => { + switch (version) { + case 4: + return v4.encodeQueueMetadata(data as v4.QueueMetadata); + default: + throw new Error( + `QueueMetadata type only exists in version 4+, got version ${version}`, + ); + } + }, + deserializeConverters: () => [], + serializeConverters: () => [], + }); + +export const QUEUE_MESSAGE_VERSIONED = + createVersionedDataHandler({ + deserializeVersion: (bytes, version) => { + switch (version) { + case 4: + return v4.decodeQueueMessage(bytes); + default: + throw new Error( + `QueueMessage type only exists in version 4+, got version ${version}`, + ); + } + }, + serializeVersion: (data, version) => { + switch (version) { + case 4: + return v4.encodeQueueMessage(data as v4.QueueMessage); + default: + throw new Error( + `QueueMessage type only exists in version 4+, got version ${version}`, + ); + } + }, + deserializeConverters: () => [], + serializeConverters: () => [], + }); diff --git a/rivetkit-typescript/packages/rivetkit/src/schemas/client-protocol-zod/mod.ts b/rivetkit-typescript/packages/rivetkit/src/schemas/client-protocol-zod/mod.ts index 8460f713ac..b2067028fe 100644 --- a/rivetkit-typescript/packages/rivetkit/src/schemas/client-protocol-zod/mod.ts +++ b/rivetkit-typescript/packages/rivetkit/src/schemas/client-protocol-zod/mod.ts @@ -84,6 +84,18 @@ export const HttpActionResponseSchema = z.object({ }); export type HttpActionResponse = z.infer; +// MARK: HTTP Queue +export const HttpQueueSendRequestSchema = z.object({ + name: z.string(), + body: z.unknown(), +}); +export type HttpQueueSendRequest = z.infer; + +export const HttpQueueSendResponseSchema = z.object({ + ok: z.boolean(), +}); +export type HttpQueueSendResponse = z.infer; + // MARK: HTTP Error export const HttpResponseErrorSchema = z.object({ group: z.string(), diff --git a/rivetkit-typescript/packages/rivetkit/src/schemas/client-protocol/mod.ts b/rivetkit-typescript/packages/rivetkit/src/schemas/client-protocol/mod.ts index 19c1c3877e..a831679919 100644 --- a/rivetkit-typescript/packages/rivetkit/src/schemas/client-protocol/mod.ts +++ b/rivetkit-typescript/packages/rivetkit/src/schemas/client-protocol/mod.ts @@ -1 +1 @@ -export * from "../../../dist/schemas/client-protocol/v2"; +export * from "../../../dist/schemas/client-protocol/v3"; diff --git a/rivetkit-typescript/packages/rivetkit/src/schemas/client-protocol/versioned.ts b/rivetkit-typescript/packages/rivetkit/src/schemas/client-protocol/versioned.ts index 8799261dc2..b5b7e6508e 100644 --- a/rivetkit-typescript/packages/rivetkit/src/schemas/client-protocol/versioned.ts +++ b/rivetkit-typescript/packages/rivetkit/src/schemas/client-protocol/versioned.ts @@ -1,8 +1,9 @@ import { createVersionedDataHandler } from "vbare"; import * as v1 from "../../../dist/schemas/client-protocol/v1"; import * as v2 from "../../../dist/schemas/client-protocol/v2"; +import * as v3 from "../../../dist/schemas/client-protocol/v3"; -export const CURRENT_VERSION = 2; +export const CURRENT_VERSION = 3; // Converter from v1 to v2: Remove connectionToken from Init message const v1ToV2 = (v1Data: v1.ToClient): v2.ToClient => { @@ -43,13 +44,42 @@ const v2ToV1 = (v2Data: v2.ToClient): v1.ToClient => { return v2Data as unknown as v1.ToClient; }; -export const TO_SERVER_VERSIONED = createVersionedDataHandler({ +// Converter from v2 to v3: No changes needed for ToClient +const v2ToV3 = (v2Data: v2.ToClient): v3.ToClient => { + return v2Data as unknown as v3.ToClient; +}; + +// Converter from v3 to v2: No changes needed for ToClient +const v3ToV2 = (v3Data: v3.ToClient): v2.ToClient => { + return v3Data as unknown as v2.ToClient; +}; + +// ToServer identity converters (ToServer is identical across v1, v2, and v3) +const v1ToServerV2 = (v1Data: v1.ToServer): v2.ToServer => { + return v1Data as unknown as v2.ToServer; +}; + +const v2ToServerV3 = (v2Data: v2.ToServer): v3.ToServer => { + return v2Data as unknown as v3.ToServer; +}; + +const v3ToServerV2 = (v3Data: v3.ToServer): v2.ToServer => { + return v3Data as unknown as v2.ToServer; +}; + +const v2ToServerV1 = (v2Data: v2.ToServer): v1.ToServer => { + return v2Data as unknown as v1.ToServer; +}; + +export const TO_SERVER_VERSIONED = createVersionedDataHandler({ deserializeVersion: (bytes, version) => { switch (version) { case 1: return v1.decodeToServer(bytes); case 2: return v2.decodeToServer(bytes); + case 3: + return v3.decodeToServer(bytes); default: throw new Error(`Unknown version ${version}`); } @@ -60,21 +90,25 @@ export const TO_SERVER_VERSIONED = createVersionedDataHandler({ return v1.encodeToServer(data as v1.ToServer); case 2: return v2.encodeToServer(data as v2.ToServer); + case 3: + return v3.encodeToServer(data as v3.ToServer); default: throw new Error(`Unknown version ${version}`); } }, - deserializeConverters: () => [v1ToV2], - serializeConverters: () => [v2ToV1], + deserializeConverters: () => [v1ToServerV2, v2ToServerV3], + serializeConverters: () => [v3ToServerV2, v2ToServerV1], }); -export const TO_CLIENT_VERSIONED = createVersionedDataHandler({ +export const TO_CLIENT_VERSIONED = createVersionedDataHandler({ deserializeVersion: (bytes, version) => { switch (version) { case 1: return v1.decodeToClient(bytes); case 2: return v2.decodeToClient(bytes); + case 3: + return v3.decodeToClient(bytes); default: throw new Error(`Unknown version ${version}`); } @@ -85,22 +119,26 @@ export const TO_CLIENT_VERSIONED = createVersionedDataHandler({ return v1.encodeToClient(data as v1.ToClient); case 2: return v2.encodeToClient(data as v2.ToClient); + case 3: + return v3.encodeToClient(data as v3.ToClient); default: throw new Error(`Unknown version ${version}`); } }, - deserializeConverters: () => [v1ToV2], - serializeConverters: () => [v2ToV1], + deserializeConverters: () => [v1ToV2, v2ToV3], + serializeConverters: () => [v3ToV2, v2ToV1], }); export const HTTP_ACTION_REQUEST_VERSIONED = - createVersionedDataHandler({ + createVersionedDataHandler({ deserializeVersion: (bytes, version) => { switch (version) { case 1: return v1.decodeHttpActionRequest(bytes); case 2: return v2.decodeHttpActionRequest(bytes); + case 3: + return v3.decodeHttpActionRequest(bytes); default: throw new Error(`Unknown version ${version}`); } @@ -115,6 +153,10 @@ export const HTTP_ACTION_REQUEST_VERSIONED = return v2.encodeHttpActionRequest( data as v2.HttpActionRequest, ); + case 3: + return v3.encodeHttpActionRequest( + data as v3.HttpActionRequest, + ); default: throw new Error(`Unknown version ${version}`); } @@ -124,13 +166,15 @@ export const HTTP_ACTION_REQUEST_VERSIONED = }); export const HTTP_ACTION_RESPONSE_VERSIONED = - createVersionedDataHandler({ + createVersionedDataHandler({ deserializeVersion: (bytes, version) => { switch (version) { case 1: return v1.decodeHttpActionResponse(bytes); case 2: return v2.decodeHttpActionResponse(bytes); + case 3: + return v3.decodeHttpActionResponse(bytes); default: throw new Error(`Unknown version ${version}`); } @@ -145,6 +189,10 @@ export const HTTP_ACTION_RESPONSE_VERSIONED = return v2.encodeHttpActionResponse( data as v2.HttpActionResponse, ); + case 3: + return v3.encodeHttpActionResponse( + data as v3.HttpActionResponse, + ); default: throw new Error(`Unknown version ${version}`); } @@ -153,14 +201,72 @@ export const HTTP_ACTION_RESPONSE_VERSIONED = serializeConverters: () => [], }); +export const HTTP_QUEUE_SEND_REQUEST_VERSIONED = + createVersionedDataHandler({ + deserializeVersion: (bytes, version) => { + switch (version) { + case 3: + return v3.decodeHttpQueueSendRequest(bytes); + default: + throw new Error( + `HttpQueueSendRequest only exists in version 3+, got version ${version}`, + ); + } + }, + serializeVersion: (data, version) => { + switch (version) { + case 3: + return v3.encodeHttpQueueSendRequest( + data as v3.HttpQueueSendRequest, + ); + default: + throw new Error( + `HttpQueueSendRequest only exists in version 3+, got version ${version}`, + ); + } + }, + deserializeConverters: () => [], + serializeConverters: () => [], + }); + +export const HTTP_QUEUE_SEND_RESPONSE_VERSIONED = + createVersionedDataHandler({ + deserializeVersion: (bytes, version) => { + switch (version) { + case 3: + return v3.decodeHttpQueueSendResponse(bytes); + default: + throw new Error( + `HttpQueueSendResponse only exists in version 3+, got version ${version}`, + ); + } + }, + serializeVersion: (data, version) => { + switch (version) { + case 3: + return v3.encodeHttpQueueSendResponse( + data as v3.HttpQueueSendResponse, + ); + default: + throw new Error( + `HttpQueueSendResponse only exists in version 3+, got version ${version}`, + ); + } + }, + deserializeConverters: () => [], + serializeConverters: () => [], + }); + export const HTTP_RESPONSE_ERROR_VERSIONED = - createVersionedDataHandler({ + createVersionedDataHandler({ deserializeVersion: (bytes, version) => { switch (version) { case 1: return v1.decodeHttpResponseError(bytes); case 2: return v2.decodeHttpResponseError(bytes); + case 3: + return v3.decodeHttpResponseError(bytes); default: throw new Error(`Unknown version ${version}`); } @@ -175,6 +281,10 @@ export const HTTP_RESPONSE_ERROR_VERSIONED = return v2.encodeHttpResponseError( data as v2.HttpResponseError, ); + case 3: + return v3.encodeHttpResponseError( + data as v3.HttpResponseError, + ); default: throw new Error(`Unknown version ${version}`); } @@ -184,13 +294,15 @@ export const HTTP_RESPONSE_ERROR_VERSIONED = }); export const HTTP_RESOLVE_RESPONSE_VERSIONED = - createVersionedDataHandler({ + createVersionedDataHandler({ deserializeVersion: (bytes, version) => { switch (version) { case 1: return v1.decodeHttpResolveResponse(bytes); case 2: return v2.decodeHttpResolveResponse(bytes); + case 3: + return v3.decodeHttpResolveResponse(bytes); default: throw new Error(`Unknown version ${version}`); } @@ -205,6 +317,10 @@ export const HTTP_RESOLVE_RESPONSE_VERSIONED = return v2.encodeHttpResolveResponse( data as v2.HttpResolveResponse, ); + case 3: + return v3.encodeHttpResolveResponse( + data as v3.HttpResolveResponse, + ); default: throw new Error(`Unknown version ${version}`); }