diff --git a/examples/effect/README.md b/examples/effect/README.md index d2e8cfb91c..5aa4422e96 100644 --- a/examples/effect/README.md +++ b/examples/effect/README.md @@ -1,37 +1,36 @@ -# Effect Integration for RivetKit +# Effect Integration -Demonstrates how to integrate [Effect](https://effect.website/) with RivetKit actors for functional, type-safe programming with powerful error handling and dependency injection. +Demonstrates how to integrate [Effect](https://effect.website/) with actors for functional, type-safe programming with powerful error handling and dependency injection. ## Getting Started ```sh -git clone https://github.com/rivet-dev/rivet +git clone https://github.com/rivet-dev/rivet.git cd rivet/examples/effect npm install npm run dev ``` + ## Features - **Effect-wrapped actions** - Write actor actions using Effect generators for composable, type-safe logic -- **Durable workflows** - Use `@effect/workflow` with RivetKit's `waitUntil` for reliable multi-step operations - **Actor context as Effect service** - Access actor state, broadcast, and other context via Effect's dependency injection -- **Structured logging** - Effect-based logging utilities integrated with RivetKit's actor logging +- **Structured logging** - Effect-based logging utilities integrated with actor logging ## Implementation -This example provides Effect bindings for RivetKit actors. The core implementation wraps RivetKit's actor context in Effect services, allowing you to write actions using Effect's generator syntax. +This example provides Effect bindings for actors. The core implementation wraps the actor context in Effect services, allowing you to write actions using Effect's generator syntax. Key files: -- [`src/effect/action.ts`](https://github.com/rivet-dev/rivet/tree/main/examples/effect/src/effect/action.ts) - Effect wrappers for action handlers with `Action.effect()` and `Action.workflow()` -- [`src/effect/actor.ts`](https://github.com/rivet-dev/rivet/tree/main/examples/effect/src/effect/actor.ts) - Effect-wrapped actor context methods (state, broadcast, etc.) -- [`src/actors.ts`](https://github.com/rivet-dev/rivet/tree/main/examples/effect/src/actors.ts) - Example actors using the Effect integration +- [`src/actors/fetch-actor.ts`](https://github.com/rivet-dev/rivet/tree/main/examples/effect/src/actors/fetch-actor.ts) - Multi-step workflows with error handling +- [`src/actors/queue-processor.ts`](https://github.com/rivet-dev/rivet/tree/main/examples/effect/src/actors/queue-processor.ts) - Background queue processing with the `run` handler Example usage: ```typescript import { actor } from "rivetkit"; -import { Action } from "./effect/index.ts"; +import { Action } from "@rivetkit/effect"; export const counter = actor({ state: { count: 0 }, @@ -49,9 +48,9 @@ export const counter = actor({ ## Resources - [Effect Documentation](https://effect.website/docs/introduction) -- [RivetKit Actions](/docs/actors/actions) -- [RivetKit State](/docs/actors/state) +- [Actions](/docs/actors/actions) +- [State](/docs/actors/state) ## License -Apache 2.0 +MIT diff --git a/examples/effect/package.json b/examples/effect/package.json index af9624073c..6c96332d2d 100644 --- a/examples/effect/package.json +++ b/examples/effect/package.json @@ -9,18 +9,17 @@ "tags": ["functional"], "noFrontend": true }, - "license": "Apache-2.0", + "license": "MIT", "scripts": { "dev": "npx srvx --import tsx src/server.ts", "start": "npx srvx --import tsx src/server.ts", "check-types": "tsc --noEmit", - "test": "vitest run", "build": "tsup" }, "dependencies": { - "@effect/workflow": "^0.15.1", "@hono/node-server": "^1.14.0", "@hono/node-ws": "^1.1.0", + "@rivetkit/effect": "*", "effect": "^3.19.9", "hono": "^4.7.6", "rivetkit": "*", @@ -31,7 +30,6 @@ "@types/node": "^22.13.9", "tsup": "^8.4.0", "tsx": "^3.12.7", - "typescript": "^5.7.3", - "vitest": "^3.2.4" + "typescript": "^5.7.3" } } diff --git a/examples/effect/src/actors/fetch-actor.ts b/examples/effect/src/actors/fetch-actor.ts new file mode 100644 index 0000000000..3bed591020 --- /dev/null +++ b/examples/effect/src/actors/fetch-actor.ts @@ -0,0 +1,178 @@ +import { Data, Effect } from "effect"; +import { actor } from "rivetkit"; +import { Action, Log } from "@rivetkit/effect"; + +// Custom error types using Effect's Data.TaggedError +class FetchError extends Data.TaggedError("FetchError")<{ + url: string; + message: string; +}> {} + +class ValidationError extends Data.TaggedError("ValidationError")<{ + field: string; + message: string; +}> {} + +// Simulated external API call +const fetchUserData = ( + userId: string, +): Effect.Effect<{ name: string; email: string }, FetchError> => + Effect.tryPromise({ + try: async () => { + // Simulate API call + await new Promise((resolve) => setTimeout(resolve, 10)); + return { + name: `User ${userId}`, + email: `user${userId}@example.com`, + }; + }, + catch: () => + new FetchError({ + url: `/api/users/${userId}`, + message: "Failed to fetch user", + }), + }); + +// Simulated notification service +const sendNotification = ( + email: string, + message: string, +): Effect.Effect => + Effect.tryPromise({ + try: async () => { + // Simulate notification + await new Promise((resolve) => setTimeout(resolve, 10)); + }, + catch: () => + new FetchError({ + url: "/api/notify", + message: "Failed to send notification", + }), + }); + +interface FetchActorState { + processedUsers: string[]; + lastProcessedAt: number | null; +} + +/** + * This actor demonstrates Effect for multi-step failable operations: + * - Fetching data from external services + * - Multi-step workflows with error handling + * - Actor-to-actor communication + */ +export const fetchActor = actor({ + state: { + processedUsers: [], + lastProcessedAt: null, + } as FetchActorState, + + actions: { + // Simple Effect-wrapped action + getStats: Action.effect(function* (c) { + const s = yield* Action.state(c); + return { + processedCount: s.processedUsers.length, + lastProcessedAt: s.lastProcessedAt, + }; + }), + + // Multi-step workflow with failable operations + processUser: Action.effect(function* (c, userId: string) { + yield* Log.info("Starting user processing", { userId }); + + // Step 1: Validate input + if (!userId || userId.length === 0) { + return yield* Effect.fail( + new ValidationError({ + field: "userId", + message: "User ID is required", + }), + ); + } + + // Step 2: Fetch user data (can fail) + yield* Log.info("Fetching user data", { userId }); + const userData = yield* fetchUserData(userId); + + // Step 3: Send notification (can fail) + yield* Log.info("Sending notification", { email: userData.email }); + yield* sendNotification( + userData.email, + `Welcome, ${userData.name}!`, + ); + + // Step 4: Update state + yield* Action.updateState(c, (s) => { + s.processedUsers.push(userId); + s.lastProcessedAt = Date.now(); + }); + + yield* Log.info("User processing complete", { userId }); + + return { + success: true, + user: userData, + }; + }), + + // Batch processing with error recovery + processBatch: Action.effect(function* (c, userIds: string[]) { + const results: Array<{ + userId: string; + success: boolean; + error?: string; + }> = []; + + for (const userId of userIds) { + // Use Effect.either to handle errors without stopping the batch + const result = yield* Effect.either( + Effect.gen(function* () { + const userData = yield* fetchUserData(userId); + yield* sendNotification( + userData.email, + `Batch welcome, ${userData.name}!`, + ); + return userData; + }), + ); + + if (result._tag === "Right") { + yield* Action.updateState(c, (s) => { + s.processedUsers.push(userId); + s.lastProcessedAt = Date.now(); + }); + results.push({ userId, success: true }); + } else { + results.push({ + userId, + success: false, + error: result.left.message, + }); + } + } + + return results; + }), + + // Actor-to-actor communication example + callQueueProcessor: Action.effect(function* (c, processorKey: string) { + yield* Log.info("Calling queueProcessor actor", { processorKey }); + + // Get the internal client for actor-to-actor communication + const client = yield* Action.getClient(c); + + // Call another actor's action + const stats = yield* Effect.promise(async () => { + const queueProcessor = (client as any).queueProcessor.getOrCreate([ + processorKey, + ]); + return queueProcessor.getStats(); + }); + + yield* Log.info("Got stats from queueProcessor", stats as Record); + + return { processorKey, stats }; + }), + }, +}); diff --git a/examples/effect/src/actors/index.ts b/examples/effect/src/actors/index.ts index 48fa520ae5..5296837e55 100644 --- a/examples/effect/src/actors/index.ts +++ b/examples/effect/src/actors/index.ts @@ -1,18 +1,12 @@ import { setup } from "rivetkit"; -import { counter } from "./counter.ts"; -import { user } from "./user.ts"; -import { lifecycleDemo } from "./lifecycle-demo.ts"; -import { simple } from "./simple.ts"; +import { fetchActor } from "./fetch-actor.ts"; +import { queueProcessor } from "./queue-processor.ts"; -// Re-export individual actors -export { counter } from "./counter.ts"; -export { user } from "./user.ts"; -export { lifecycleDemo } from "./lifecycle-demo.ts"; -export { simple } from "./simple.ts"; +export { fetchActor } from "./fetch-actor.ts"; +export { queueProcessor } from "./queue-processor.ts"; -// Registry setup with all actors export const registry = setup({ - use: { counter, user, lifecycleDemo, simple }, + use: { fetchActor, queueProcessor }, }); export type Registry = typeof registry; diff --git a/examples/effect/src/actors/lifecycle-demo.ts b/examples/effect/src/actors/lifecycle-demo.ts deleted file mode 100644 index 7af2ddf62d..0000000000 --- a/examples/effect/src/actors/lifecycle-demo.ts +++ /dev/null @@ -1,85 +0,0 @@ -import { actor } from "rivetkit"; -import { - Action, - Log, - OnCreate, - OnWake, - OnDestroy, - OnSleep, - OnConnect, - OnDisconnect, - OnStateChange, -} from "../effect/index.ts"; - -// Lifecycle demo actor - demonstrates all lifecycle hooks with Effect wrappers -interface LifecycleState { - events: string[]; - connectionCount: number; - lastStateChange: number; -} - -export const lifecycleDemo = actor({ - state: { - events: [], - connectionCount: 0, - lastStateChange: 0, - } as LifecycleState, - - onCreate: OnCreate.effect(function* (c, input) { - yield* Log.info("Actor created"); - c.state.events.push("onCreate"); - }), - - onWake: OnWake.effect(function* (c) { - yield* Log.info("Actor woke up"); - c.state.events.push("onWake"); - }), - - onDestroy: OnDestroy.effect(function* (c) { - yield* Log.info("Actor destroying"); - c.state.events.push("onDestroy"); - }), - - onSleep: OnSleep.effect(function* (c) { - yield* Log.info("Actor going to sleep"); - c.state.events.push("onSleep"); - }), - - onStateChange: OnStateChange.effect(function* (c, newState) { - // Note: OnStateChange is synchronous, so only use sync effects here - c.state.lastStateChange = Date.now(); - }), - - onConnect: OnConnect.effect(function* (c, conn) { - yield* Log.info("Client connected"); - c.state.connectionCount++; - c.state.events.push("onConnect"); - yield* Action.broadcast(c, "userJoined", { connId: conn.id }); - }), - - onDisconnect: OnDisconnect.effect(function* (c, conn) { - yield* Log.info("Client disconnected"); - c.state.connectionCount--; - c.state.events.push("onDisconnect"); - yield* Action.broadcast(c, "userLeft", { connId: conn.id }); - }), - - actions: { - getEvents: Action.effect(function* (c) { - const s = yield* Action.state(c); - return s.events; - }), - - getConnectionCount: Action.effect(function* (c) { - const s = yield* Action.state(c); - return s.connectionCount; - }), - - clearEvents: Action.effect(function* (c) { - yield* Action.updateState(c, (s) => { - s.events = []; - }); - yield* Log.info("Events cleared"); - }), - }, -}); diff --git a/examples/effect/src/actors/queue-processor.ts b/examples/effect/src/actors/queue-processor.ts new file mode 100644 index 0000000000..736fd4617c --- /dev/null +++ b/examples/effect/src/actors/queue-processor.ts @@ -0,0 +1,143 @@ +import { Effect } from "effect"; +import { actor } from "rivetkit"; +import { Action, Run, Log, Queue } from "@rivetkit/effect"; + +interface QueueProcessorState { + processedMessages: number; + lastMessageAt: number | null; + isRunning: boolean; +} + +interface Task { + type: "process" | "compute" | "notify"; + payload: unknown; +} + +/** + * This actor demonstrates: + * - The `run` handler for background processing + * - The queues API for task/message processing + * - Long-running actor patterns + */ +export const queueProcessor = actor({ + state: { + processedMessages: 0, + lastMessageAt: null, + isRunning: false, + } as QueueProcessorState, + + // The `run` handler is called when the actor starts + // It's perfect for background task processing loops + run: Run.effect(function* (c) { + yield* Log.info("Queue processor starting"); + + yield* Action.updateState(c, (s) => { + s.isRunning = true; + }); + + // Main processing loop + while (true) { + // Wait for the next message from the queue + // This will block until a message is available or the actor is stopped + const message = yield* Queue.next(c, "tasks", { timeout: 5000 }); + + if (!message) { + yield* Log.debug("No message received, continuing to wait"); + continue; + } + + yield* Log.info("Processing message", { + id: message.id, + name: message.name, + }); + + // Process the task based on its type + const task = message.body as Task; + const result = yield* Effect.either(processTask(task)); + + if (result._tag === "Right") { + yield* Log.info("Task processed successfully", { + type: task.type, + }); + } else { + yield* Log.error("Task processing failed", { + type: task.type, + error: result.left, + }); + } + + // Update state + yield* Action.updateState(c, (s) => { + s.processedMessages++; + s.lastMessageAt = Date.now(); + }); + + // Broadcast progress to connected clients + const state = yield* Action.state(c); + yield* Action.broadcast(c, "progress", { + processedMessages: state.processedMessages, + lastMessageAt: state.lastMessageAt, + }); + } + }), + + actions: { + // Get current processor stats + getStats: Action.effect(function* (c) { + const s = yield* Action.state(c); + return { + processedMessages: s.processedMessages, + lastMessageAt: s.lastMessageAt, + isRunning: s.isRunning, + }; + }), + + // Submit a task to the queue + // In a real app, this would be called by external clients or other actors + submitTask: Action.effect(function* (c, task: Task) { + yield* Log.info("Task submitted", { type: task.type }); + + // Note: In this example, we can't directly enqueue since enqueue + // is typically done via the client. This action simulates what + // an external client would do. + return { submitted: true, taskType: task.type }; + }), + + // Reset statistics + resetStats: Action.effect(function* (c) { + yield* Action.updateState(c, (s) => { + s.processedMessages = 0; + s.lastMessageAt = null; + }); + yield* Log.info("Stats reset"); + return { reset: true }; + }), + }, +}); + +// Helper function to process different task types +function processTask(task: Task): Effect.Effect { + return Effect.gen(function* () { + switch (task.type) { + case "process": + // Simulate data processing + yield* Effect.sleep(10); + return { processed: task.payload }; + + case "compute": + // Simulate computation + yield* Effect.sleep(20); + return { computed: true }; + + case "notify": + // Simulate notification + yield* Effect.sleep(5); + return { notified: true }; + + default: + return yield* Effect.fail( + new Error(`Unknown task type: ${task.type}`), + ); + } + }); +} diff --git a/examples/effect/src/actors/user.ts b/examples/effect/src/actors/user.ts deleted file mode 100644 index 44db1d7869..0000000000 --- a/examples/effect/src/actors/user.ts +++ /dev/null @@ -1,66 +0,0 @@ -import { Data, Effect } from "effect"; -import { actor } from "rivetkit"; -import { Activity } from "@effect/workflow"; -import { Action } from "../effect/index.ts"; - -// User actor - demonstrates Effect workflows with external service calls -interface UserInput { - email: string; -} - -interface UserState { - email: string; - customerId: string; -} - -class InvalidEmailError extends Data.TaggedError("InvalidEmailError")<{ - email: string; -}> {} - -function validateEmail(email: string): boolean { - return true; -} - -function updateStripeCustomerEmail( - customerId: string, - email: string, -): Effect.Effect { - return Effect.void; -} - -function sendResendEmailConfirmation(email: string): Effect.Effect { - return Effect.void; -} - -export const user = actor({ - createState: (c, input: UserInput): UserState => ({ - email: input.email, - customerId: crypto.randomUUID(), - }), - actions: { - getEmail: Action.effect(function* (c) { - const s = yield* Action.state(c); - return s.email; - }), - updateEmail: Action.workflow(function* (c, newEmail: string) { - if (!validateEmail(newEmail)) { - return yield* Effect.fail( - new InvalidEmailError({ email: newEmail }), - ); - } - - const s = yield* Action.state(c); - yield* Activity.make({ - name: "UpdateStripeEmail", - execute: updateStripeCustomerEmail(s.customerId, newEmail), - }); - yield* Action.updateState(c, (state) => { - state.email = newEmail; - }); - yield* Activity.make({ - name: "SendConfirmationEmail", - execute: sendResendEmailConfirmation(newEmail), - }); - }), - }, -}); diff --git a/examples/effect/src/effect/action.ts b/examples/effect/src/effect/action.ts deleted file mode 100644 index ff8bb1d47b..0000000000 --- a/examples/effect/src/effect/action.ts +++ /dev/null @@ -1,108 +0,0 @@ -import { Effect } from "effect"; -import type { ActorContext, ActionContext } from "rivetkit"; -import type { YieldWrap } from "effect/Utils"; -import { ActorContextTag } from "./actor.ts"; - -export * from "./actor.ts"; - -// Local type alias to work around AnyDatabaseProvider not being exported -type AnyDB = undefined; - -export const getConn = < - TState, - TConnParams, - TConnState, - TVars, - TInput, - TDatabase extends AnyDB = AnyDB, ->( - c: ActionContext, -) => Effect.succeed(c.conn); - -export function effect< - TState, - TConnParams, - TConnState, - TVars, - TInput, - TDatabase extends AnyDB = AnyDB, - AEff = void, - Args extends unknown[] = [], ->( - genFn: ( - c: ActorContext< - TState, - TConnParams, - TConnState, - TVars, - TInput, - TDatabase - >, - ...args: Args - ) => Generator>, AEff, never>, -): ( - c: ActionContext, - ...args: Args -) => Promise { - return (c, ...args) => { - const gen = genFn(c, ...args); - const eff = Effect.gen>, AEff>( - () => gen, - ); - - // Provide ActorContext via Effect Context - const withContext = Effect.provideService( - eff, - ActorContextTag, - c, - ) as Effect.Effect; - - return Effect.runPromise(withContext); - }; -} - -export function workflow< - TState, - TConnParams, - TConnState, - TVars, - TInput, - TDatabase extends AnyDB = AnyDB, - AEff = void, - Args extends unknown[] = [], ->( - genFn: ( - c: ActorContext< - TState, - TConnParams, - TConnState, - TVars, - TInput, - TDatabase - >, - ...args: Args - ) => Generator>, AEff, never>, -): ( - c: ActionContext, - ...args: Args -) => Promise { - return (c, ...args) => { - const gen = genFn(c, ...args); - const eff = Effect.gen>, AEff>( - () => gen, - ); - - // Provide ActorContext via Effect Context - const withContext = Effect.provideService( - eff, - ActorContextTag, - c, - ) as Effect.Effect; - - // Make workflow execution durable by using waitUntil - const workflowPromise = Effect.runPromise(withContext); - c.waitUntil(workflowPromise.then(() => {})); - - return workflowPromise; - }; -} diff --git a/examples/effect/src/effect/log.ts b/examples/effect/src/effect/log.ts deleted file mode 100644 index 30a09d067a..0000000000 --- a/examples/effect/src/effect/log.ts +++ /dev/null @@ -1,29 +0,0 @@ -import { Effect } from "effect"; -import { ActorContextTag, context } from "./actor.ts"; - -// Log namespace for structured logging -export namespace Log { - export const info = (message: string): Effect.Effect => - Effect.gen(function* () { - const c = yield* context(); - c.log.info(message); - }); - - export const warn = (message: string): Effect.Effect => - Effect.gen(function* () { - const c = yield* context(); - c.log.warn(message); - }); - - export const error = (message: string): Effect.Effect => - Effect.gen(function* () { - const c = yield* context(); - c.log.error(message); - }); - - export const debug = (message: string): Effect.Effect => - Effect.gen(function* () { - const c = yield* context(); - c.log.debug(message); - }); -} diff --git a/examples/effect/tests/effect.test.ts b/examples/effect/tests/effect.test.ts deleted file mode 100644 index 3d154260d4..0000000000 --- a/examples/effect/tests/effect.test.ts +++ /dev/null @@ -1,115 +0,0 @@ -import { setupTest } from "rivetkit/test"; -import { describe, expect, test } from "vitest"; -import { registry } from "../src/actors/index.ts"; - -describe("counter actor with Effect-wrapped actions", () => { - test("increment counter and get count", async (ctx) => { - const { client } = await setupTest(ctx, registry); - - const counter = client.counter.getOrCreate(["test-counter"]); - - // Initial count should be 0 - const initialCount = await counter.getCount(); - expect(initialCount).toBe(0); - - // Increment by 5 - const newCount = await counter.increment(5); - expect(newCount).toBe(5); - - // Increment again - const finalCount = await counter.increment(3); - expect(finalCount).toBe(8); - }); - - test("counter broadcasts events on increment", async (ctx) => { - const { client } = await setupTest(ctx, registry); - - const counter = client.counter.getOrCreate(["broadcast-counter"]); - - // Track broadcast events - const events: number[] = []; - counter.on("newCount", (count: number) => { - events.push(count); - }); - - await counter.increment(10); - await counter.increment(5); - - // Give time for events to propagate - await new Promise((resolve) => setTimeout(resolve, 100)); - - expect(events).toContain(10); - expect(events).toContain(15); - }); -}); - -describe("user actor with Effect workflows", () => { - test("create user with email and get email", async (ctx) => { - const { client } = await setupTest(ctx, registry); - - const user = await client.user.create(["user-1"], { - input: { email: "test@example.com" }, - }); - - const email = await user.getEmail(); - expect(email).toBe("test@example.com"); - }); - - test("update user email with workflow", async (ctx) => { - const { client } = await setupTest(ctx, registry); - - const user = await client.user.create(["user-2"], { - input: { email: "old@example.com" }, - }); - - // Update email through workflow - await user.updateEmail("new@example.com"); - - const email = await user.getEmail(); - expect(email).toBe("new@example.com"); - }); -}); - -describe("lifecycle-demo actor with Effect-wrapped hooks", () => { - test("onCreate hook is called on actor creation", async (ctx) => { - const { client } = await setupTest(ctx, registry); - - const demo = client.lifecycleDemo.getOrCreate(["lifecycle-1"]); - - // Give time for onCreate to complete - await new Promise((resolve) => setTimeout(resolve, 100)); - - const events = await demo.getEvents(); - expect(events).toContain("onCreate"); - }); - - test("onConnect and onDisconnect hooks track connections", async (ctx) => { - const { client } = await setupTest(ctx, registry); - - const demo = client.lifecycleDemo.getOrCreate(["lifecycle-2"]); - - // Give time for hooks to complete - await new Promise((resolve) => setTimeout(resolve, 100)); - - const count = await demo.getConnectionCount(); - // Connection count should be at least 1 (the current connection) - expect(count).toBeGreaterThanOrEqual(1); - }); - - test("clearEvents action works", async (ctx) => { - const { client } = await setupTest(ctx, registry); - - const demo = client.lifecycleDemo.getOrCreate(["lifecycle-3"]); - - // Get initial events (should have onCreate at minimum) - await new Promise((resolve) => setTimeout(resolve, 100)); - const initialEvents = await demo.getEvents(); - expect(initialEvents.length).toBeGreaterThan(0); - - // Clear events - await demo.clearEvents(); - - const clearedEvents = await demo.getEvents(); - expect(clearedEvents).toEqual([]); - }); -}); diff --git a/examples/effect/tests/simple.test.ts b/examples/effect/tests/simple.test.ts deleted file mode 100644 index e513398836..0000000000 --- a/examples/effect/tests/simple.test.ts +++ /dev/null @@ -1,23 +0,0 @@ -import { setupTest } from "rivetkit/test"; -import { describe, expect, test } from "vitest"; -import { registry } from "../src/actors/index.ts"; - -describe("simple actor without Effect wrappers", () => { - test("getValue and setValue work", async (ctx) => { - const { client } = await setupTest(ctx, registry); - - const simple = client.simple.getOrCreate(["test-simple"]); - - // Initial value should be 0 - const initialValue = await simple.getValue(); - expect(initialValue).toBe(0); - - // Set to 42 - const newValue = await simple.setValue(42); - expect(newValue).toBe(42); - - // Get should return 42 - const finalValue = await simple.getValue(); - expect(finalValue).toBe(42); - }); -}); diff --git a/examples/effect/tsconfig.json b/examples/effect/tsconfig.json index 6e7f7a21e6..22aa25df8f 100644 --- a/examples/effect/tsconfig.json +++ b/examples/effect/tsconfig.json @@ -4,10 +4,6 @@ "lib": ["esnext"], "module": "esnext", "moduleResolution": "bundler", - "baseUrl": ".", - "paths": { - "@rivetkit/effect": ["./src/effect/index.ts"] - }, "types": ["node"], "noEmit": true, "strict": true, diff --git a/frontend/packages/example-registry/src/_gen.ts b/frontend/packages/example-registry/src/_gen.ts index 8b48e6041d..e6125df8c1 100644 --- a/frontend/packages/example-registry/src/_gen.ts +++ b/frontend/packages/example-registry/src/_gen.ts @@ -241,6 +241,20 @@ export const templates: Template[] = [ ], "noFrontend": true }, + { + "name": "effect", + "displayName": "Effect Integration", + "description": "Demonstrates how to integrate [Effect](https://effect.website/) with actors for functional, type-safe programming with powerful error handling and dependency injection.", + "technologies": [ + "rivet", + "effect", + "typescript" + ], + "tags": [ + "functional" + ], + "noFrontend": true + }, { "name": "elysia", "displayName": "Elysia Integration", diff --git a/frontend/packages/example-registry/src/const.ts b/frontend/packages/example-registry/src/const.ts index e17191042e..fa5bb9ecd2 100644 --- a/frontend/packages/example-registry/src/const.ts +++ b/frontend/packages/example-registry/src/const.ts @@ -17,7 +17,7 @@ export const TECHNOLOGIES = [ { name: "websocket", displayName: "WebSocket" }, { name: "typescript", displayName: "TypeScript" }, { name: "durable-streams", displayName: "Durable Streams" }, - { name: "react", displayName: "React" }, + { name: "effect", displayName: "Effect" }, ] as const; export const TAGS = [ @@ -27,6 +27,7 @@ export const TAGS = [ { name: "database", displayName: "Database" }, { name: "gaming", displayName: "Gaming" }, { name: "experimental", displayName: "Experimental" }, + { name: "functional", displayName: "Functional" }, ] as const; export type Technology = (typeof TECHNOLOGIES)[number]["name"]; diff --git a/package.json b/package.json index df91358bcf..6ee58181fd 100644 --- a/package.json +++ b/package.json @@ -37,6 +37,7 @@ "@rivetkit/next-js": "workspace:*", "@rivetkit/db": "workspace:*", "@rivetkit/engine-api-full": "workspace:*", + "@rivetkit/effect": "workspace:*", "@types/react": "^19", "@types/react-dom": "^19", "@clerk/shared": "3.27.1" diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 0a525b01d5..478bad3bad 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -11,6 +11,7 @@ overrides: '@rivetkit/next-js': workspace:* '@rivetkit/db': workspace:* '@rivetkit/engine-api-full': workspace:* + '@rivetkit/effect': workspace:* '@types/react': ^19 '@types/react-dom': ^19 '@clerk/shared': 3.27.1 @@ -784,15 +785,15 @@ importers: examples/effect: dependencies: - '@effect/workflow': - specifier: ^0.15.1 - version: 0.15.2(@effect/experimental@0.57.11(@effect/platform@0.93.8(effect@3.19.14))(effect@3.19.14)(lmdb@3.4.4))(@effect/platform@0.93.8(effect@3.19.14))(@effect/rpc@0.72.2(@effect/platform@0.93.8(effect@3.19.14))(effect@3.19.14))(effect@3.19.14) '@hono/node-server': specifier: ^1.14.0 version: 1.19.7(hono@4.11.3) '@hono/node-ws': specifier: ^1.1.0 version: 1.3.0(@hono/node-server@1.19.7(hono@4.11.3))(hono@4.11.3) + '@rivetkit/effect': + specifier: workspace:* + version: link:../../rivetkit-typescript/packages/effect effect: specifier: ^3.19.9 version: 3.19.14 @@ -821,9 +822,6 @@ importers: typescript: specifier: ^5.7.3 version: 5.9.3 - vitest: - specifier: ^3.2.4 - version: 3.2.4(@types/debug@4.1.12)(@types/node@22.19.5)(less@4.4.1)(lightningcss@1.30.2)(sass@1.93.2)(stylus@0.62.0)(terser@5.44.1) examples/elysia: dependencies: @@ -2462,6 +2460,37 @@ importers: specifier: ^5.5.2 version: 5.9.3 + rivetkit-typescript/packages/effect: + dependencies: + rivetkit: + specifier: workspace:* + version: link:../rivetkit + devDependencies: + '@hono/node-server': + specifier: ^1.14.0 + version: 1.19.7(hono@4.11.3) + '@hono/node-ws': + specifier: ^1.1.0 + version: 1.3.0(@hono/node-server@1.19.7(hono@4.11.3))(hono@4.11.3) + '@types/node': + specifier: ^22.13.9 + version: 22.19.5 + effect: + specifier: ^3.19.9 + version: 3.19.14 + tsup: + specifier: ^8.4.0 + version: 8.5.1(@microsoft/api-extractor@7.53.2(@types/node@22.19.5))(jiti@1.21.7)(postcss@8.5.6)(tsx@4.20.6)(typescript@5.9.3)(yaml@2.8.2) + typescript: + specifier: ^5.7.3 + version: 5.9.3 + vite-tsconfig-paths: + specifier: ^5.1.4 + version: 5.1.4(typescript@5.9.3)(vite@7.2.2(@types/node@22.19.5)(jiti@1.21.7)(less@4.4.1)(lightningcss@1.30.2)(sass@1.93.2)(stylus@0.62.0)(terser@5.44.1)(tsx@4.20.6)(yaml@2.8.2)) + vitest: + specifier: ^3.2.4 + version: 3.2.4(@types/debug@4.1.12)(@types/node@22.19.5)(less@4.4.1)(lightningcss@1.30.2)(sass@1.93.2)(stylus@0.62.0)(terser@5.44.1) + rivetkit-typescript/packages/framework-base: dependencies: '@tanstack/store': @@ -4047,42 +4076,10 @@ packages: resolution: {tarball: https://pkg.pr.new/rivet-dev/durable-streams/@durable-streams/writer@0323b8bcf1c9b38f1014629e1a8b6c74cc662100} version: 0.0.0 - '@effect/experimental@0.57.11': - resolution: {integrity: sha512-M5uug3Drs/gyTHLfA+XzcIZQGUEV/Jn5yi1POki4oZswhpzNmsVTHl4THpxAordRKwa5lFvTSlsRP684YH7pSw==} - peerDependencies: - '@effect/platform': ^0.93.6 - effect: ^3.19.9 - ioredis: ^5 - lmdb: ^3 - peerDependenciesMeta: - ioredis: - optional: true - lmdb: - optional: true - '@effect/language-service@0.60.0': resolution: {integrity: sha512-elJDWHG5Naq3OkilPt9ZRn56JfSA3MhXUIlDx9RWJeScHm96kZ+HkZ3eFBxqROzXwD6Q2DTtFctFwOM0+QLZEA==} hasBin: true - '@effect/platform@0.93.8': - resolution: {integrity: sha512-xTEy6fyTy4ijmFC3afKgtvYtn/JyPoIov4ZSUWJZUv3VeOcUPNGrrqG6IJlWkXs3NhvSywKv7wc1kw3epCQVZw==} - peerDependencies: - effect: ^3.19.12 - - '@effect/rpc@0.72.2': - resolution: {integrity: sha512-BmTXybXCOq96D2r9mvSW/YdiTQs5CStnd4II+lfVKrMr3pMNERKLZ2LG37Tfm4Sy3Q8ire6IVVKO/CN+VR0uQQ==} - peerDependencies: - '@effect/platform': ^0.93.3 - effect: ^3.19.5 - - '@effect/workflow@0.15.2': - resolution: {integrity: sha512-UAo5QWEvyyKsnf4EQ7WL3zwiuZS4Wd5fmAxdpcpZSIxNOvsABp3DOuyRCiidD8l3sQhdPwES/UsVK4QOCQ7wew==} - peerDependencies: - '@effect/experimental': ^0.57.11 - '@effect/platform': ^0.93.6 - '@effect/rpc': ^0.72.2 - effect: ^3.19.10 - '@emnapi/runtime@1.7.1': resolution: {integrity: sha512-PVtJr5CmLwYAU9PZDMITZoR5iAOShYREoR45EyyLrbntV50mdePTgUn4AmOw90Ifcj+x2kRjdzr1HP3RrNiHGA==} @@ -10246,9 +10243,6 @@ packages: resolution: {integrity: sha512-aAWcW57uxVNrQZqFXjITpW3sIUQmHGG3qSb9mUah9MgMC4NeWhNOlNjXEYq3HjRAvL6arUviZGGJsBg6z0zsWA==} engines: {node: '>= 0.8'} - find-my-way-ts@0.1.6: - resolution: {integrity: sha512-a85L9ZoXtNAey3Y6Z+eBWW658kO/MwR7zIafkIUPUMf3isZG0NCs2pjW2wtjxAKuJPxMAsHUIP4ZPGv0o5gyTA==} - find-root@1.1.0: resolution: {integrity: sha512-NKfW6bec6GfKc0SGx1e07QZY9PE99u0Bft/0rzSD5k3sO/vwkVUpDUKVm5Gpp5Ue3YfShPFTX2070tDs5kB9Ng==} @@ -11939,9 +11933,6 @@ packages: muggle-string@0.3.1: resolution: {integrity: sha512-ckmWDJjphvd/FvZawgygcUeQCxzvohjFO5RxTjj4eq8kw359gFF3E1brjfI+viLMxss5JrHTDRHZvu2/tuy0Qg==} - multipasta@0.2.7: - resolution: {integrity: sha512-KPA58d68KgGil15oDqXjkUBEBYc00XvbPj5/X+dyzeo/lWm9Nc25pQRlf1D+gv4OpK7NM0J1odrbu9JNNGvynA==} - mz@2.7.0: resolution: {integrity: sha512-z81GNO7nnYMEhrGh9LeymoE4+Yr0Wn5McHIZMK5cfQCl+NDX08sCZgUc9/6MHni9IWuFLm1Z3HTCXu2z9fN62Q==} @@ -16537,36 +16528,8 @@ snapshots: '@durable-streams/client': https://pkg.pr.new/rivet-dev/durable-streams/@durable-streams/client@0323b8bcf1c9b38f1014629e1a8b6c74cc662100 fastq: 1.20.1 - '@effect/experimental@0.57.11(@effect/platform@0.93.8(effect@3.19.14))(effect@3.19.14)(lmdb@3.4.4)': - dependencies: - '@effect/platform': 0.93.8(effect@3.19.14) - effect: 3.19.14 - uuid: 11.1.0 - optionalDependencies: - lmdb: 3.4.4 - '@effect/language-service@0.60.0': {} - '@effect/platform@0.93.8(effect@3.19.14)': - dependencies: - effect: 3.19.14 - find-my-way-ts: 0.1.6 - msgpackr: 1.11.5 - multipasta: 0.2.7 - - '@effect/rpc@0.72.2(@effect/platform@0.93.8(effect@3.19.14))(effect@3.19.14)': - dependencies: - '@effect/platform': 0.93.8(effect@3.19.14) - effect: 3.19.14 - msgpackr: 1.11.5 - - '@effect/workflow@0.15.2(@effect/experimental@0.57.11(@effect/platform@0.93.8(effect@3.19.14))(effect@3.19.14)(lmdb@3.4.4))(@effect/platform@0.93.8(effect@3.19.14))(@effect/rpc@0.72.2(@effect/platform@0.93.8(effect@3.19.14))(effect@3.19.14))(effect@3.19.14)': - dependencies: - '@effect/experimental': 0.57.11(@effect/platform@0.93.8(effect@3.19.14))(effect@3.19.14)(lmdb@3.4.4) - '@effect/platform': 0.93.8(effect@3.19.14) - '@effect/rpc': 0.72.2(@effect/platform@0.93.8(effect@3.19.14))(effect@3.19.14) - effect: 3.19.14 - '@emnapi/runtime@1.7.1': dependencies: tslib: 2.8.1 @@ -23575,8 +23538,6 @@ snapshots: transitivePeerDependencies: - supports-color - find-my-way-ts@0.1.6: {} - find-root@1.1.0: {} find-up@4.1.0: @@ -25866,8 +25827,6 @@ snapshots: muggle-string@0.3.1: {} - multipasta@0.2.7: {} - mz@2.7.0: dependencies: any-promise: 1.3.0 @@ -28499,6 +28458,35 @@ snapshots: - tsx - yaml + tsup@8.5.1(@microsoft/api-extractor@7.53.2(@types/node@22.19.5))(jiti@1.21.7)(postcss@8.5.6)(tsx@4.20.6)(typescript@5.9.3)(yaml@2.8.2): + dependencies: + bundle-require: 5.1.0(esbuild@0.27.2) + cac: 6.7.14 + chokidar: 4.0.3 + consola: 3.4.2 + debug: 4.4.3 + esbuild: 0.27.2 + fix-dts-default-cjs-exports: 1.0.1 + joycon: 3.1.1 + picocolors: 1.1.1 + postcss-load-config: 6.0.1(jiti@1.21.7)(postcss@8.5.6)(tsx@4.20.6)(yaml@2.8.2) + resolve-from: 5.0.0 + rollup: 4.53.3 + source-map: 0.7.6 + sucrase: 3.35.1 + tinyexec: 0.3.2 + tinyglobby: 0.2.15 + tree-kill: 1.2.2 + optionalDependencies: + '@microsoft/api-extractor': 7.53.2(@types/node@22.19.5) + postcss: 8.5.6 + typescript: 5.9.3 + transitivePeerDependencies: + - jiti + - supports-color + - tsx + - yaml + tsup@8.5.1(@microsoft/api-extractor@7.53.2(@types/node@25.0.7))(jiti@1.21.7)(postcss@8.5.6)(tsx@4.20.6)(typescript@5.9.3)(yaml@2.8.2): dependencies: bundle-require: 5.1.0(esbuild@0.27.2) @@ -29229,7 +29217,7 @@ snapshots: vite-tsconfig-paths@5.1.4(typescript@5.9.2)(vite@5.4.20(@types/node@20.19.13)(less@4.4.1)(lightningcss@1.30.2)(sass@1.93.2)(stylus@0.62.0)(terser@5.44.1)): dependencies: - debug: 4.4.1 + debug: 4.4.3 globrex: 0.1.2 tsconfck: 3.1.6(typescript@5.9.2) optionalDependencies: @@ -29240,7 +29228,7 @@ snapshots: vite-tsconfig-paths@5.1.4(typescript@5.9.2)(vite@7.2.2(@types/node@22.18.1)(jiti@1.21.7)(less@4.4.1)(lightningcss@1.30.2)(sass@1.93.2)(stylus@0.62.0)(terser@5.44.1)(tsx@4.20.5)(yaml@2.8.2)): dependencies: - debug: 4.4.1 + debug: 4.4.3 globrex: 0.1.2 tsconfck: 3.1.6(typescript@5.9.2) optionalDependencies: @@ -29249,6 +29237,17 @@ snapshots: - supports-color - typescript + vite-tsconfig-paths@5.1.4(typescript@5.9.3)(vite@7.2.2(@types/node@22.19.5)(jiti@1.21.7)(less@4.4.1)(lightningcss@1.30.2)(sass@1.93.2)(stylus@0.62.0)(terser@5.44.1)(tsx@4.20.6)(yaml@2.8.2)): + dependencies: + debug: 4.4.3 + globrex: 0.1.2 + tsconfck: 3.1.6(typescript@5.9.3) + optionalDependencies: + vite: 7.2.2(@types/node@22.19.5)(jiti@1.21.7)(less@4.4.1)(lightningcss@1.30.2)(sass@1.93.2)(stylus@0.62.0)(terser@5.44.1)(tsx@4.20.6)(yaml@2.8.2) + transitivePeerDependencies: + - supports-color + - typescript + vite@5.4.20(@types/node@20.19.13)(less@4.4.1)(lightningcss@1.30.2)(sass@1.93.2)(stylus@0.62.0)(terser@5.44.1): dependencies: esbuild: 0.21.5 @@ -29414,6 +29413,27 @@ snapshots: yaml: 2.8.2 optional: true + vite@7.2.2(@types/node@22.19.5)(jiti@1.21.7)(less@4.4.1)(lightningcss@1.30.2)(sass@1.93.2)(stylus@0.62.0)(terser@5.44.1)(tsx@4.20.6)(yaml@2.8.2): + dependencies: + esbuild: 0.25.12 + fdir: 6.5.0(picomatch@4.0.3) + picomatch: 4.0.3 + postcss: 8.5.6 + rollup: 4.53.3 + tinyglobby: 0.2.15 + optionalDependencies: + '@types/node': 22.19.5 + fsevents: 2.3.3 + jiti: 1.21.7 + less: 4.4.1 + lightningcss: 1.30.2 + sass: 1.93.2 + stylus: 0.62.0 + terser: 5.44.1 + tsx: 4.20.6 + yaml: 2.8.2 + optional: true + vite@7.2.2(@types/node@25.0.7)(jiti@1.21.7)(less@4.4.1)(lightningcss@1.30.2)(sass@1.93.2)(stylus@0.62.0)(terser@5.44.1)(tsx@4.20.6)(yaml@2.8.1): dependencies: esbuild: 0.25.12 diff --git a/examples/effect/src/actors/counter.ts b/rivetkit-typescript/packages/effect/fixtures/counter.ts similarity index 91% rename from examples/effect/src/actors/counter.ts rename to rivetkit-typescript/packages/effect/fixtures/counter.ts index a7213d8ad5..d78124fb91 100644 --- a/examples/effect/src/actors/counter.ts +++ b/rivetkit-typescript/packages/effect/fixtures/counter.ts @@ -1,5 +1,5 @@ import { actor } from "rivetkit"; -import { Action } from "../effect/index.ts"; +import { Action } from "../src/mod.ts"; // Counter actor - demonstrates basic Effect-wrapped actions export const counter = actor({ diff --git a/rivetkit-typescript/packages/effect/fixtures/registry.ts b/rivetkit-typescript/packages/effect/fixtures/registry.ts new file mode 100644 index 0000000000..c36f2f7641 --- /dev/null +++ b/rivetkit-typescript/packages/effect/fixtures/registry.ts @@ -0,0 +1,7 @@ +import { setup } from "rivetkit"; +import { counter } from "./counter.ts"; +import { simple } from "./simple.ts"; + +export const registry = setup({ + use: { counter, simple }, +}); diff --git a/examples/effect/src/actors/simple.ts b/rivetkit-typescript/packages/effect/fixtures/simple.ts similarity index 77% rename from examples/effect/src/actors/simple.ts rename to rivetkit-typescript/packages/effect/fixtures/simple.ts index c750d726b2..a10e7f6c11 100644 --- a/examples/effect/src/actors/simple.ts +++ b/rivetkit-typescript/packages/effect/fixtures/simple.ts @@ -1,6 +1,6 @@ import { actor } from "rivetkit"; -// Simple actor - plain functions without Effect wrappers for debugging +// Simple actor - plain functions without Effect wrappers for comparison export const simple = actor({ state: { value: 0, diff --git a/rivetkit-typescript/packages/effect/package.json b/rivetkit-typescript/packages/effect/package.json new file mode 100644 index 0000000000..7174b6aa99 --- /dev/null +++ b/rivetkit-typescript/packages/effect/package.json @@ -0,0 +1,56 @@ +{ + "name": "@rivetkit/effect", + "version": "2.0.39", + "description": "Effect integration for RivetKit actors", + "license": "Apache-2.0", + "keywords": [ + "rivetkit", + "effect", + "effect-ts", + "actors", + "functional" + ], + "sideEffects": [ + "./dist/chunk-*.js", + "./dist/chunk-*.cjs" + ], + "files": [ + "dist", + "package.json" + ], + "type": "module", + "exports": { + ".": { + "import": { + "types": "./dist/mod.d.ts", + "default": "./dist/mod.js" + }, + "require": { + "types": "./dist/mod.d.cts", + "default": "./dist/mod.cjs" + } + } + }, + "scripts": { + "build": "tsup src/mod.ts", + "check-types": "tsc --noEmit", + "test": "vitest run" + }, + "dependencies": { + "rivetkit": "workspace:^" + }, + "peerDependencies": { + "effect": "^3.0.0" + }, + "devDependencies": { + "@hono/node-server": "^1.14.0", + "@hono/node-ws": "^1.1.0", + "@types/node": "^22.13.9", + "effect": "^3.19.9", + "tsup": "^8.4.0", + "typescript": "^5.7.3", + "vite-tsconfig-paths": "^5.1.4", + "vitest": "^3.2.4" + }, + "stableVersion": "0.8.0" +} diff --git a/rivetkit-typescript/packages/effect/src/action.ts b/rivetkit-typescript/packages/effect/src/action.ts new file mode 100644 index 0000000000..781cfc4e9e --- /dev/null +++ b/rivetkit-typescript/packages/effect/src/action.ts @@ -0,0 +1,58 @@ +import { Effect } from "effect"; +import type { ActorContext, ActionContext } from "rivetkit"; +import type { YieldWrap } from "effect/Utils"; +import { ActorContextTag } from "./actor.ts"; + +export * from "./actor.ts"; + +export const getConn = < + TState, + TConnParams, + TConnState, + TVars, + TInput, +>( + c: ActionContext, +) => Effect.succeed(c.conn); + +export function effect< + TState, + TConnParams, + TConnState, + TVars, + TInput, + AEff = void, + Args extends unknown[] = [], +>( + genFn: ( + c: ActorContext< + TState, + TConnParams, + TConnState, + TVars, + TInput, + undefined + >, + ...args: Args + ) => Generator>, AEff, never>, +): ( + c: ActionContext, + ...args: Args +) => Promise { + return (c, ...args) => { + const gen = genFn(c, ...args); + const eff = Effect.gen>, AEff>( + () => gen, + ); + + // Provide ActorContext via Effect Context + const withContext = Effect.provideService( + eff, + ActorContextTag, + c, + ) as Effect.Effect; + + return Effect.runPromise(withContext); + }; +} + diff --git a/examples/effect/src/effect/actor.ts b/rivetkit-typescript/packages/effect/src/actor.ts similarity index 69% rename from examples/effect/src/effect/actor.ts rename to rivetkit-typescript/packages/effect/src/actor.ts index 81510d6fbb..bef6594c76 100644 --- a/examples/effect/src/effect/actor.ts +++ b/rivetkit-typescript/packages/effect/src/actor.ts @@ -2,10 +2,6 @@ import { Context, Effect } from "effect"; import type { ActorContext } from "rivetkit"; import type { YieldWrap } from "effect/Utils"; -// Local type alias to work around AnyDatabaseProvider not being exported -// AnyDatabaseProvider = DatabaseProvider | undefined -type AnyDB = undefined; - // Context tag for accessing ActorContext within Effects export const ActorContextTag = Context.GenericTag>( @@ -18,9 +14,8 @@ export const context = < TConnState, TVars, TInput, - TDatabase extends AnyDB = AnyDB, >(): Effect.Effect< - ActorContext, + ActorContext, never, typeof ActorContextTag > => ActorContextTag as any; @@ -31,9 +26,8 @@ export const state = < TConnState, TVars, TInput, - TDatabase extends AnyDB = AnyDB, >( - c: ActorContext, + c: ActorContext, ): Effect.Effect => Effect.succeed(c.state); export const updateState = < @@ -42,9 +36,8 @@ export const updateState = < TConnState, TVars, TInput, - TDatabase extends AnyDB = AnyDB, >( - c: ActorContext, + c: ActorContext, f: (state: TState) => void, ): Effect.Effect => Effect.sync(() => f(c.state)); @@ -54,9 +47,8 @@ export const vars = < TConnState, TVars, TInput, - TDatabase extends AnyDB = AnyDB, >( - c: ActorContext, + c: ActorContext, ): Effect.Effect => Effect.succeed(c.vars); export const updateVars = < @@ -65,9 +57,8 @@ export const updateVars = < TConnState, TVars, TInput, - TDatabase extends AnyDB = AnyDB, >( - c: ActorContext, + c: ActorContext, f: (vars: TVars) => void, ): Effect.Effect => Effect.sync(() => f(c.vars)); @@ -77,10 +68,9 @@ export const broadcast = < TConnState, TVars, TInput, - TDatabase extends AnyDB = AnyDB, Args extends Array = unknown[], >( - c: ActorContext, + c: ActorContext, name: string, ...args: Args ): Effect.Effect => @@ -92,10 +82,9 @@ export const getLog = < TConnState, TVars, TInput, - TDatabase extends AnyDB = AnyDB, >( - c: ActorContext, -) => Effect.succeed(c.log); + c: ActorContext, +): Effect.Effect => Effect.succeed(c.log); export const getActorId = < TState, @@ -103,9 +92,8 @@ export const getActorId = < TConnState, TVars, TInput, - TDatabase extends AnyDB = AnyDB, >( - c: ActorContext, + c: ActorContext, ): Effect.Effect => Effect.succeed(c.actorId); export const getName = < @@ -114,9 +102,8 @@ export const getName = < TConnState, TVars, TInput, - TDatabase extends AnyDB = AnyDB, >( - c: ActorContext, + c: ActorContext, ): Effect.Effect => Effect.succeed(c.name); export const getKey = < @@ -125,10 +112,9 @@ export const getKey = < TConnState, TVars, TInput, - TDatabase extends AnyDB = AnyDB, >( - c: ActorContext, -) => Effect.succeed(c.key); + c: ActorContext, +): Effect.Effect => Effect.succeed(c.key); export const getRegion = < TState, @@ -136,9 +122,8 @@ export const getRegion = < TConnState, TVars, TInput, - TDatabase extends AnyDB = AnyDB, >( - c: ActorContext, + c: ActorContext, ): Effect.Effect => Effect.succeed(c.region); export const getSchedule = < @@ -147,10 +132,9 @@ export const getSchedule = < TConnState, TVars, TInput, - TDatabase extends AnyDB = AnyDB, >( - c: ActorContext, -) => Effect.succeed(c.schedule); + c: ActorContext, +): Effect.Effect => Effect.succeed(c.schedule); export const getConns = < TState, @@ -158,10 +142,9 @@ export const getConns = < TConnState, TVars, TInput, - TDatabase extends AnyDB = AnyDB, >( - c: ActorContext, -) => Effect.succeed(c.conns); + c: ActorContext, +): Effect.Effect => Effect.succeed(c.conns); export const getClient = < TState, @@ -169,10 +152,9 @@ export const getClient = < TConnState, TVars, TInput, - TDatabase extends AnyDB = AnyDB, >( - c: ActorContext, -) => Effect.succeed(c.client()); + c: ActorContext, +): Effect.Effect => Effect.succeed(c.client()); export const getDb = < TState, @@ -180,10 +162,29 @@ export const getDb = < TConnState, TVars, TInput, - TDatabase extends AnyDB = AnyDB, >( - c: ActorContext, -) => Effect.succeed(c.db); + c: ActorContext, +): Effect.Effect => Effect.succeed(c.db); + +export const getKv = < + TState, + TConnParams, + TConnState, + TVars, + TInput, +>( + c: ActorContext, +): Effect.Effect => Effect.succeed(c.kv); + +export const getQueue = < + TState, + TConnParams, + TConnState, + TVars, + TInput, +>( + c: ActorContext, +): Effect.Effect => Effect.succeed((c as any).queue); export const saveState = < TState, @@ -191,9 +192,8 @@ export const saveState = < TConnState, TVars, TInput, - TDatabase extends AnyDB = AnyDB, >( - c: ActorContext, + c: ActorContext, opts: Parameters[0], ): Effect.Effect => Effect.promise(() => c.saveState(opts)); @@ -203,11 +203,10 @@ export const waitUntil = < TConnState, TVars, TInput, - TDatabase extends AnyDB = AnyDB, A = any, E = any, >( - c: ActorContext, + c: ActorContext, effect: Effect.Effect, ): Effect.Effect => Effect.sync(() => { @@ -221,9 +220,8 @@ export const getAbortSignal = < TConnState, TVars, TInput, - TDatabase extends AnyDB = AnyDB, >( - c: ActorContext, + c: ActorContext, ): Effect.Effect => Effect.succeed(c.abortSignal); export const sleep = < @@ -232,9 +230,8 @@ export const sleep = < TConnState, TVars, TInput, - TDatabase extends AnyDB = AnyDB, >( - c: ActorContext, + c: ActorContext, ): Effect.Effect => Effect.sync(() => c.sleep()); export const destroy = < @@ -243,9 +240,8 @@ export const destroy = < TConnState, TVars, TInput, - TDatabase extends AnyDB = AnyDB, >( - c: ActorContext, + c: ActorContext, ): Effect.Effect => Effect.sync(() => c.destroy()); export function effect< @@ -254,7 +250,6 @@ export function effect< TConnState, TVars, TInput, - TDatabase extends AnyDB = AnyDB, AEff = void, >( genFn: ( @@ -264,11 +259,11 @@ export function effect< TConnState, TVars, TInput, - TDatabase + undefined >, ) => Generator>, AEff, never>, ): ( - c: ActorContext, + c: ActorContext, ) => Promise { return (c) => { const gen = genFn(c); @@ -283,53 +278,11 @@ export function effect< c, ) as Effect.Effect; - // Make workflow execution durable by using waitUntil - const workflowPromise = Effect.runPromise(withContext); - c.waitUntil(workflowPromise.then(() => {})); + // Make execution durable by using waitUntil + const effectPromise = Effect.runPromise(withContext); + c.waitUntil(effectPromise.then(() => {})); - return workflowPromise; + return effectPromise; }; } -export function workflow< - TState, - TConnParams, - TConnState, - TVars, - TInput, - TDatabase extends AnyDB = AnyDB, - AEff = void, ->( - genFn: ( - c: ActorContext< - TState, - TConnParams, - TConnState, - TVars, - TInput, - TDatabase - >, - ) => Generator>, AEff, never>, -): ( - c: ActorContext, -) => Promise { - return (c) => { - const gen = genFn(c); - const eff = Effect.gen>, AEff>( - () => gen, - ); - - // Provide ActorContext via Effect Context - const withContext = Effect.provideService( - eff, - ActorContextTag, - c, - ) as Effect.Effect; - - // Make workflow execution durable by using waitUntil - const workflowPromise = Effect.runPromise(withContext); - c.waitUntil(workflowPromise.then(() => {})); - - return workflowPromise; - }; -} diff --git a/examples/effect/src/effect/hooks.ts b/rivetkit-typescript/packages/effect/src/lifecycle.ts similarity index 84% rename from examples/effect/src/effect/hooks.ts rename to rivetkit-typescript/packages/effect/src/lifecycle.ts index c6e23f10d8..e0f9723105 100644 --- a/examples/effect/src/effect/hooks.ts +++ b/rivetkit-typescript/packages/effect/src/lifecycle.ts @@ -19,22 +19,19 @@ import type { import type { YieldWrap } from "effect/Utils"; import { ActorContextTag } from "./actor.ts"; -// Local type alias to work around AnyDatabaseProvider not being exported -type AnyDB = undefined; - // Pattern: Each namespace exports an `effect()` function that: // 1. Takes a generator function with the appropriate context // 2. Returns a function that RivetKit can call // 3. Runs the Effect and returns a Promise (or void for sync hooks) export namespace OnCreate { - export function effect( + export function effect( genFn: ( - c: CreateContext, + c: CreateContext, input: TInput, ) => Generator>, AEff, never>, ): ( - c: CreateContext, + c: CreateContext, input: TInput, ) => Promise { return (c, input) => { @@ -61,7 +58,6 @@ export namespace OnWake { TConnState, TVars, TInput, - TDatabase extends AnyDB = AnyDB, AEff = void, >( genFn: ( @@ -71,11 +67,50 @@ export namespace OnWake { TConnState, TVars, TInput, - TDatabase + undefined + >, + ) => Generator>, AEff, never>, + ): ( + c: WakeContext, + ) => Promise { + return (c) => { + const gen = genFn(c); + const eff = Effect.gen>, AEff>( + () => gen, + ); + + const withContext = Effect.provideService( + eff, + ActorContextTag, + c as any, + ) as Effect.Effect; + + return Effect.runPromise(withContext); + }; + } +} + +export namespace Run { + export function effect< + TState, + TConnParams, + TConnState, + TVars, + TInput, + AEff = void, + >( + genFn: ( + c: WakeContext< + TState, + TConnParams, + TConnState, + TVars, + TInput, + undefined >, ) => Generator>, AEff, never>, ): ( - c: WakeContext, + c: WakeContext, ) => Promise { return (c) => { const gen = genFn(c); @@ -101,7 +136,6 @@ export namespace OnDestroy { TConnState, TVars, TInput, - TDatabase extends AnyDB = AnyDB, AEff = void, >( genFn: ( @@ -111,7 +145,7 @@ export namespace OnDestroy { TConnState, TVars, TInput, - TDatabase + undefined >, ) => Generator>, AEff, never>, ): ( @@ -121,7 +155,7 @@ export namespace OnDestroy { TConnState, TVars, TInput, - TDatabase + undefined >, ) => Promise { return (c) => { @@ -148,7 +182,6 @@ export namespace OnSleep { TConnState, TVars, TInput, - TDatabase extends AnyDB = AnyDB, AEff = void, >( genFn: ( @@ -158,11 +191,11 @@ export namespace OnSleep { TConnState, TVars, TInput, - TDatabase + undefined >, ) => Generator>, AEff, never>, ): ( - c: SleepContext, + c: SleepContext, ) => Promise { return (c) => { const gen = genFn(c); @@ -189,7 +222,6 @@ export namespace OnStateChange { TConnState, TVars, TInput, - TDatabase extends AnyDB = AnyDB, >( genFn: ( c: StateChangeContext< @@ -198,7 +230,7 @@ export namespace OnStateChange { TConnState, TVars, TInput, - TDatabase + undefined >, newState: TState, ) => Generator>, void, never>, @@ -209,7 +241,7 @@ export namespace OnStateChange { TConnState, TVars, TInput, - TDatabase + undefined >, newState: TState, ) => void { @@ -236,15 +268,14 @@ export namespace OnBeforeConnect { TConnParams, TVars, TInput, - TDatabase extends AnyDB = AnyDB, AEff = void, >( genFn: ( - c: BeforeConnectContext, + c: BeforeConnectContext, params: TConnParams, ) => Generator>, AEff, never>, ): ( - c: BeforeConnectContext, + c: BeforeConnectContext, params: TConnParams, ) => Promise { return (c, params) => { @@ -271,7 +302,6 @@ export namespace OnConnect { TConnState, TVars, TInput, - TDatabase extends AnyDB = AnyDB, AEff = void, >( genFn: ( @@ -281,9 +311,9 @@ export namespace OnConnect { TConnState, TVars, TInput, - TDatabase + undefined >, - conn: Conn, + conn: Conn, ) => Generator>, AEff, never>, ): ( c: ConnectContext< @@ -292,9 +322,9 @@ export namespace OnConnect { TConnState, TVars, TInput, - TDatabase + undefined >, - conn: Conn, + conn: Conn, ) => Promise { return (c, conn) => { const gen = genFn(c, conn); @@ -320,7 +350,6 @@ export namespace OnDisconnect { TConnState, TVars, TInput, - TDatabase extends AnyDB = AnyDB, AEff = void, >( genFn: ( @@ -330,9 +359,9 @@ export namespace OnDisconnect { TConnState, TVars, TInput, - TDatabase + undefined >, - conn: Conn, + conn: Conn, ) => Generator>, AEff, never>, ): ( c: DisconnectContext< @@ -341,9 +370,9 @@ export namespace OnDisconnect { TConnState, TVars, TInput, - TDatabase + undefined >, - conn: Conn, + conn: Conn, ) => Promise { return (c, conn) => { const gen = genFn(c, conn); @@ -369,10 +398,9 @@ export namespace CreateConnState { TConnState, TVars, TInput, - TDatabase extends AnyDB = AnyDB, >( genFn: ( - c: CreateConnStateContext, + c: CreateConnStateContext, params: TConnParams, ) => Generator< YieldWrap>, @@ -380,7 +408,7 @@ export namespace CreateConnState { never >, ): ( - c: CreateConnStateContext, + c: CreateConnStateContext, params: TConnParams, ) => Promise { return (c, params) => { @@ -408,7 +436,6 @@ export namespace OnBeforeActionResponse { TConnState, TVars, TInput, - TDatabase extends AnyDB = AnyDB, Out = any, >( genFn: ( @@ -418,7 +445,7 @@ export namespace OnBeforeActionResponse { TConnState, TVars, TInput, - TDatabase + undefined >, name: string, args: unknown[], @@ -431,7 +458,7 @@ export namespace OnBeforeActionResponse { TConnState, TVars, TInput, - TDatabase + undefined >, name: string, args: unknown[], @@ -455,13 +482,13 @@ export namespace OnBeforeActionResponse { } export namespace CreateState { - export function effect( + export function effect( genFn: ( - c: CreateContext, + c: CreateContext, input: TInput, ) => Generator>, TState, never>, ): ( - c: CreateContext, + c: CreateContext, input: TInput, ) => Promise { return (c, input) => { @@ -482,13 +509,13 @@ export namespace CreateState { } export namespace CreateVars { - export function effect( + export function effect( genFn: ( - c: CreateVarsContext, + c: CreateVarsContext, driverCtx: any, ) => Generator>, TVars, never>, ): ( - c: CreateVarsContext, + c: CreateVarsContext, driverCtx: any, ) => Promise { return (c, driverCtx) => { @@ -515,7 +542,6 @@ export namespace OnRequest { TConnState, TVars, TInput, - TDatabase extends AnyDB = AnyDB, >( genFn: ( c: RequestContext< @@ -524,7 +550,7 @@ export namespace OnRequest { TConnState, TVars, TInput, - TDatabase + undefined >, request: Request, ) => Generator>, Response, never>, @@ -535,7 +561,7 @@ export namespace OnRequest { TConnState, TVars, TInput, - TDatabase + undefined >, request: Request, ) => Promise { @@ -563,7 +589,6 @@ export namespace OnWebSocket { TConnState, TVars, TInput, - TDatabase extends AnyDB = AnyDB, AEff = void, >( genFn: ( @@ -573,7 +598,7 @@ export namespace OnWebSocket { TConnState, TVars, TInput, - TDatabase + undefined >, websocket: UniversalWebSocket, ) => Generator>, AEff, never>, @@ -584,7 +609,7 @@ export namespace OnWebSocket { TConnState, TVars, TInput, - TDatabase + undefined >, websocket: UniversalWebSocket, ) => Promise { diff --git a/rivetkit-typescript/packages/effect/src/log.ts b/rivetkit-typescript/packages/effect/src/log.ts new file mode 100644 index 0000000000..49d838ad06 --- /dev/null +++ b/rivetkit-typescript/packages/effect/src/log.ts @@ -0,0 +1,26 @@ +import { Effect } from "effect"; +import { ActorContextTag } from "./actor.ts"; + +export const info = (message: string, props?: Record) => + Effect.gen(function* () { + const ctx = yield* ActorContextTag; + ctx.log.info({ msg: message, ...props }); + }); + +export const warn = (message: string, props?: Record) => + Effect.gen(function* () { + const ctx = yield* ActorContextTag; + ctx.log.warn({ msg: message, ...props }); + }); + +export const error = (message: string, props?: Record) => + Effect.gen(function* () { + const ctx = yield* ActorContextTag; + ctx.log.error({ msg: message, ...props }); + }); + +export const debug = (message: string, props?: Record) => + Effect.gen(function* () { + const ctx = yield* ActorContextTag; + ctx.log.debug({ msg: message, ...props }); + }); diff --git a/examples/effect/src/effect/index.ts b/rivetkit-typescript/packages/effect/src/mod.ts similarity index 51% rename from examples/effect/src/effect/index.ts rename to rivetkit-typescript/packages/effect/src/mod.ts index b9e5db84d1..19167077ac 100644 --- a/examples/effect/src/effect/index.ts +++ b/rivetkit-typescript/packages/effect/src/mod.ts @@ -1,11 +1,14 @@ +// Re-export actor context helpers export * as Hook from "./actor.ts"; export * as Action from "./action.ts"; -export * from "./log.ts"; +export * as Log from "./log.ts"; +export * as Queue from "./queue.ts"; -// Export lifecycle hook namespaces +// Re-export lifecycle hook namespaces export { OnCreate, OnWake, + Run, OnDestroy, OnSleep, OnStateChange, @@ -18,4 +21,7 @@ export { CreateVars, OnRequest, OnWebSocket, -} from "./hooks.ts"; +} from "./lifecycle.ts"; + +// Re-export ActorContextTag for advanced usage +export { ActorContextTag } from "./actor.ts"; diff --git a/rivetkit-typescript/packages/effect/src/queue.ts b/rivetkit-typescript/packages/effect/src/queue.ts new file mode 100644 index 0000000000..ce79e8c922 --- /dev/null +++ b/rivetkit-typescript/packages/effect/src/queue.ts @@ -0,0 +1,48 @@ +import { Effect } from "effect"; +import type { ActorContext } from "rivetkit"; + +interface QueueReceiveOptions { + count?: number; + timeout?: number; +} + +interface QueueMessage { + id: bigint; + name: string; + body: unknown; + createdAt: number; +} + +/** + * Receives the next message from a single queue. + * Returns undefined if no message available or timeout reached. + */ +export const next = < + TState, + TConnParams, + TConnState, + TVars, + TInput, +>( + c: ActorContext, + name: string, + opts?: QueueReceiveOptions, +): Effect.Effect => + Effect.promise(() => (c as any).queue.next(name, opts)); + +/** + * Receives messages from multiple queues. + * Returns messages matching any of the queue names. + */ +export const nextMultiple = < + TState, + TConnParams, + TConnState, + TVars, + TInput, +>( + c: ActorContext, + names: string[], + opts?: QueueReceiveOptions, +): Effect.Effect => + Effect.promise(() => (c as any).queue.next(names, opts)); diff --git a/rivetkit-typescript/packages/effect/tests/effect.test.ts b/rivetkit-typescript/packages/effect/tests/effect.test.ts new file mode 100644 index 0000000000..6934e5abcf --- /dev/null +++ b/rivetkit-typescript/packages/effect/tests/effect.test.ts @@ -0,0 +1,74 @@ +import { describe, expect, test } from "vitest"; +import { setupTest } from "rivetkit/test"; +import { registry } from "../fixtures/registry.ts"; + +describe("Effect-wrapped actors", () => { + describe("simple actor without Effect", () => { + test("getValue and setValue work", async (ctx) => { + const { client } = await setupTest(ctx, registry); + + const simple = client.simple.getOrCreate(["test-simple"]); + + // Initial value should be 0 + const initialValue = await simple.getValue(); + expect(initialValue).toBe(0); + + // Set to 42 + const newValue = await simple.setValue(42); + expect(newValue).toBe(42); + + // Get should return 42 + const finalValue = await simple.getValue(); + expect(finalValue).toBe(42); + }); + }); + + describe("counter actor with Effect-wrapped actions", () => { + test("increment counter and get count", async (ctx) => { + const { client } = await setupTest(ctx, registry); + + const counter = client.counter.getOrCreate(["test-counter"]); + + // Initial count should be 0 + const initialCount = await counter.getCount(); + expect(initialCount).toBe(0); + + // Increment by 5 + const newCount = await counter.increment(5); + expect(newCount).toBe(5); + + // Increment again + const finalCount = await counter.increment(3); + expect(finalCount).toBe(8); + }); + + test("counter broadcasts events on increment", async (ctx) => { + const { client } = await setupTest(ctx, registry); + + // Use a unique key to avoid state pollution + const uniqueKey = `broadcast-counter-${crypto.randomUUID()}`; + const counter = client.counter.getOrCreate([uniqueKey]); + + // Connect to receive events + const conn = counter.connect(); + + // Track broadcast events + const events: number[] = []; + conn.on("newCount", (count: number) => { + events.push(count); + }); + + // Wait for connection to be established + await new Promise((resolve) => setTimeout(resolve, 50)); + + await counter.increment(10); + await counter.increment(5); + + // Give time for events to propagate + await new Promise((resolve) => setTimeout(resolve, 100)); + + expect(events).toContain(10); + expect(events).toContain(15); + }); + }); +}); diff --git a/rivetkit-typescript/packages/effect/tsconfig.json b/rivetkit-typescript/packages/effect/tsconfig.json new file mode 100644 index 0000000000..807b7a8ea8 --- /dev/null +++ b/rivetkit-typescript/packages/effect/tsconfig.json @@ -0,0 +1,22 @@ +{ + "compilerOptions": { + "target": "ES2020", + "useDefineForClassFields": true, + "lib": ["ES2020"], + "module": "ESNext", + "skipLibCheck": true, + "moduleResolution": "bundler", + "allowImportingTsExtensions": true, + "resolveJsonModule": true, + "isolatedModules": true, + "noEmit": true, + "strict": true, + "noUnusedLocals": true, + "noUnusedParameters": true, + "noFallthroughCasesInSwitch": true, + "paths": { + "@/*": ["./src/*"] + } + }, + "include": ["src", "tests", "fixtures"] +} diff --git a/rivetkit-typescript/packages/effect/tsup.config.ts b/rivetkit-typescript/packages/effect/tsup.config.ts new file mode 100644 index 0000000000..f363b829fd --- /dev/null +++ b/rivetkit-typescript/packages/effect/tsup.config.ts @@ -0,0 +1,4 @@ +import { defineConfig } from "tsup"; +import defaultConfig from "../../../tsup.base.ts"; + +export default defineConfig(defaultConfig); diff --git a/rivetkit-typescript/packages/effect/turbo.json b/rivetkit-typescript/packages/effect/turbo.json new file mode 100644 index 0000000000..29d4cb2625 --- /dev/null +++ b/rivetkit-typescript/packages/effect/turbo.json @@ -0,0 +1,4 @@ +{ + "$schema": "https://turbo.build/schema.json", + "extends": ["//"] +} diff --git a/examples/effect/vitest.config.ts b/rivetkit-typescript/packages/effect/vitest.config.ts similarity index 56% rename from examples/effect/vitest.config.ts rename to rivetkit-typescript/packages/effect/vitest.config.ts index f913a97abd..b8384f9f65 100644 --- a/examples/effect/vitest.config.ts +++ b/rivetkit-typescript/packages/effect/vitest.config.ts @@ -1,10 +1,10 @@ +import tsconfigPaths from "vite-tsconfig-paths"; import { defineConfig } from "vitest/config"; export default defineConfig({ - server: { - port: 5173, - }, + plugins: [tsconfigPaths()], test: { include: ["tests/**/*.test.ts"], + testTimeout: 30000, }, }); diff --git a/rivetkit-typescript/packages/rivetkit/src/client/actor-handle.ts b/rivetkit-typescript/packages/rivetkit/src/client/actor-handle.ts index 0a9733cddd..f7c5f1d6d0 100644 --- a/rivetkit-typescript/packages/rivetkit/src/client/actor-handle.ts +++ b/rivetkit-typescript/packages/rivetkit/src/client/actor-handle.ts @@ -249,7 +249,6 @@ export class ActorHandleRaw { async resolve(): Promise { if ("getForKey" in this.#actorQuery) { const name = this.#actorQuery.getForKey.name; - const key = this.#actorQuery.getForKey.key; // Query the actor to get the id const { actorId } = await queryActor( @@ -260,9 +259,22 @@ export class ActorHandleRaw { this.#actorQuery = { getForId: { actorId, name } }; + return actorId; + } else if ("getOrCreateForKey" in this.#actorQuery) { + const name = this.#actorQuery.getOrCreateForKey.name; + + // Query the actor to get the id (will create if doesn't exist) + const { actorId } = await queryActor( + undefined, + this.#actorQuery, + this.#driver, + ); + + this.#actorQuery = { getForId: { actorId, name } }; + return actorId; } else if ("getForId" in this.#actorQuery) { - // SKip since it's already resolved + // Skip since it's already resolved return this.#actorQuery.getForId.actorId; } else if ("create" in this.#actorQuery) { // Cannot create a handle with this query