diff --git a/engine/packages/pegboard/src/workflows/serverless/conn.rs b/engine/packages/pegboard/src/workflows/serverless/conn.rs index ac35ae92ac..6646d45a2b 100644 --- a/engine/packages/pegboard/src/workflows/serverless/conn.rs +++ b/engine/packages/pegboard/src/workflows/serverless/conn.rs @@ -1,4 +1,3 @@ -use std::error::Error; use std::time::Instant; use anyhow::{Context, bail}; @@ -26,7 +25,7 @@ const X_RIVET_TOTAL_SLOTS: HeaderName = HeaderName::from_static("x-rivet-total-s const X_RIVET_RUNNER_NAME: HeaderName = HeaderName::from_static("x-rivet-runner-name"); const X_RIVET_NAMESPACE_NAME: HeaderName = HeaderName::from_static("x-rivet-namespace-name"); -const DRAIN_GRACE_PERIOD: Duration = Duration::from_secs(5); +const DRAIN_GRACE_PERIOD: Duration = Duration::from_secs(10); #[derive(Debug, Serialize, Deserialize, Clone)] pub struct Input { @@ -383,35 +382,25 @@ async fn outbound_req_inner( bail!("invalid status code ({code}):\n{}", body_slice); } - Err(sse::Error::Transport(err)) => { - report_error( - ctx, - input.namespace_id, - &input.runner_name, - RunnerPoolError::ServerlessConnectionError { - message: if let Some(source) = err.source() { - format!("{err}\n{source}") - } else { - err.to_string() - }, - }, - ) - .await; - - return Err(err.into()); - } Err(err) => { + let wrapped_err = anyhow::Error::from(err); + report_error( ctx, input.namespace_id, &input.runner_name, RunnerPoolError::ServerlessConnectionError { - message: err.to_string(), + // Print entire error chain + message: wrapped_err + .chain() + .map(|err| err.to_string()) + .collect::>() + .join("\n"), }, ) .await; - return Err(err.into()); + return Err(wrapped_err); } } } diff --git a/rivetkit-typescript/packages/rivetkit/src/actor/config.ts b/rivetkit-typescript/packages/rivetkit/src/actor/config.ts index e772dd5680..004e32e62b 100644 --- a/rivetkit-typescript/packages/rivetkit/src/actor/config.ts +++ b/rivetkit-typescript/packages/rivetkit/src/actor/config.ts @@ -210,8 +210,9 @@ export const ActorConfigSchema = z createVarsTimeout: z.number().positive().default(5000), createConnStateTimeout: z.number().positive().default(5000), onConnectTimeout: z.number().positive().default(5000), - // This must be less than ACTOR_STOP_THRESHOLD_MS + // This must be less than engine config > pegboard.actor_stop_threshold onSleepTimeout: z.number().positive().default(5000), + // This must be less than engine config > pegboard.actor_stop_threshold onDestroyTimeout: z.number().positive().default(5000), stateSaveInterval: z.number().positive().default(10_000), actionTimeout: z.number().positive().default(60_000), @@ -289,11 +290,11 @@ type CreateState< > = | { state: TState } | { - createState: ( - c: CreateContext, - input: TInput, - ) => TState | Promise; - } + createState: ( + c: CreateContext, + input: TInput, + ) => TState | Promise; + } | Record; // Creates connection state config @@ -313,18 +314,18 @@ type CreateConnState< > = | { connState: TConnState } | { - createConnState: ( - c: CreateConnStateContext< - TState, - TVars, - TInput, - TDatabase, - TEvents, - TQueues - >, - params: TConnParams, - ) => TConnState | Promise; - } + createConnState: ( + c: CreateConnStateContext< + TState, + TVars, + TInput, + TDatabase, + TEvents, + TQueues + >, + params: TConnParams, + ) => TConnState | Promise; + } | Record; // Creates vars config @@ -344,26 +345,26 @@ type CreateVars< TQueues extends QueueSchemaConfig, > = | { - /** - * @experimental - */ - vars: TVars; - } + /** + * @experimental + */ + vars: TVars; + } | { - /** - * @experimental - */ - createVars: ( - c: CreateVarsContext< - TState, - TInput, - TDatabase, - TEvents, - TQueues - >, - driverCtx: any, - ) => TVars | Promise; - } + /** + * @experimental + */ + createVars: ( + c: CreateVarsContext< + TState, + TInput, + TDatabase, + TEvents, + TQueues + >, + driverCtx: any, + ) => TVars | Promise; + } | Record; export interface Actions< @@ -417,43 +418,43 @@ export type CanInvokeTarget< TQueues extends QueueSchemaConfig, > = | { - kind: "action"; - name: CanInvokeActionName; - } + kind: "action"; + name: CanInvokeActionName; + } | { - kind: "subscribe"; - name: CanInvokeSubscribeName; - } + kind: "subscribe"; + name: CanInvokeSubscribeName; + } | { - kind: "queue"; - name: CanInvokeQueueName; - } + kind: "queue"; + name: CanInvokeQueueName; + } | { - kind: "request"; - } + kind: "request"; + } | { - kind: "websocket"; - }; + kind: "websocket"; + }; export type AnyCanInvokeTarget = | { - kind: "action"; - name: string; - } + kind: "action"; + name: string; + } | { - kind: "subscribe"; - name: string; - } + kind: "subscribe"; + name: string; + } | { - kind: "queue"; - name: string; - } + kind: "queue"; + name: string; + } | { - kind: "request"; - } + kind: "request"; + } | { - kind: "websocket"; - }; + kind: "websocket"; + }; interface BaseActorConfig< TState, @@ -570,28 +571,28 @@ interface BaseActorConfig< * @returns Void or a Promise. If the promise exits, the actor crashes. */ run?: - | (( - c: RunContext< - TState, - TConnParams, - TConnState, - TVars, - TInput, - TDatabase, - TEvents, - TQueues - >, - ) => void | Promise) - | RunConfig< - TState, - TConnParams, - TConnState, - TVars, - TInput, - TDatabase, - TEvents, - TQueues - >; + | (( + c: RunContext< + TState, + TConnParams, + TConnState, + TVars, + TInput, + TDatabase, + TEvents, + TQueues + >, + ) => void | Promise) + | RunConfig< + TState, + TConnParams, + TConnState, + TVars, + TInput, + TDatabase, + TEvents, + TQueues + >; /** * Called when the actor's state changes. @@ -819,11 +820,11 @@ interface BaseActorConfig< type ActorDatabaseConfig = | { - /** - * @experimental - */ - db: TDatabase; - } + /** + * @experimental + */ + db: TDatabase; + } | Record; // 1. Infer schema