Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 10 additions & 21 deletions engine/packages/pegboard/src/workflows/serverless/conn.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use std::error::Error;
use std::time::Instant;

use anyhow::{Context, bail};
Expand Down Expand Up @@ -26,7 +25,7 @@ const X_RIVET_TOTAL_SLOTS: HeaderName = HeaderName::from_static("x-rivet-total-s
const X_RIVET_RUNNER_NAME: HeaderName = HeaderName::from_static("x-rivet-runner-name");
const X_RIVET_NAMESPACE_NAME: HeaderName = HeaderName::from_static("x-rivet-namespace-name");

const DRAIN_GRACE_PERIOD: Duration = Duration::from_secs(5);
const DRAIN_GRACE_PERIOD: Duration = Duration::from_secs(10);

#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct Input {
Expand Down Expand Up @@ -383,35 +382,25 @@ async fn outbound_req_inner(

bail!("invalid status code ({code}):\n{}", body_slice);
}
Err(sse::Error::Transport(err)) => {
report_error(
ctx,
input.namespace_id,
&input.runner_name,
RunnerPoolError::ServerlessConnectionError {
message: if let Some(source) = err.source() {
format!("{err}\n{source}")
} else {
err.to_string()
},
},
)
.await;

return Err(err.into());
}
Err(err) => {
let wrapped_err = anyhow::Error::from(err);

report_error(
ctx,
input.namespace_id,
&input.runner_name,
RunnerPoolError::ServerlessConnectionError {
message: err.to_string(),
// Print entire error chain
message: wrapped_err
.chain()
.map(|err| err.to_string())
.collect::<Vec<_>>()
.join("\n"),
},
)
.await;

return Err(err.into());
return Err(wrapped_err);
}
}
}
Expand Down
181 changes: 91 additions & 90 deletions rivetkit-typescript/packages/rivetkit/src/actor/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -210,8 +210,9 @@ export const ActorConfigSchema = z
createVarsTimeout: z.number().positive().default(5000),
createConnStateTimeout: z.number().positive().default(5000),
onConnectTimeout: z.number().positive().default(5000),
// This must be less than ACTOR_STOP_THRESHOLD_MS
// This must be less than engine config > pegboard.actor_stop_threshold
onSleepTimeout: z.number().positive().default(5000),
// This must be less than engine config > pegboard.actor_stop_threshold
onDestroyTimeout: z.number().positive().default(5000),
stateSaveInterval: z.number().positive().default(10_000),
actionTimeout: z.number().positive().default(60_000),
Expand Down Expand Up @@ -289,11 +290,11 @@ type CreateState<
> =
| { state: TState }
| {
createState: (
c: CreateContext<TState, TInput, TDatabase, TEvents, TQueues>,
input: TInput,
) => TState | Promise<TState>;
}
createState: (
c: CreateContext<TState, TInput, TDatabase, TEvents, TQueues>,
input: TInput,
) => TState | Promise<TState>;
}
| Record<never, never>;

// Creates connection state config
Expand All @@ -313,18 +314,18 @@ type CreateConnState<
> =
| { connState: TConnState }
| {
createConnState: (
c: CreateConnStateContext<
TState,
TVars,
TInput,
TDatabase,
TEvents,
TQueues
>,
params: TConnParams,
) => TConnState | Promise<TConnState>;
}
createConnState: (
c: CreateConnStateContext<
TState,
TVars,
TInput,
TDatabase,
TEvents,
TQueues
>,
params: TConnParams,
) => TConnState | Promise<TConnState>;
}
| Record<never, never>;

// Creates vars config
Expand All @@ -344,26 +345,26 @@ type CreateVars<
TQueues extends QueueSchemaConfig,
> =
| {
/**
* @experimental
*/
vars: TVars;
}
/**
* @experimental
*/
vars: TVars;
}
| {
/**
* @experimental
*/
createVars: (
c: CreateVarsContext<
TState,
TInput,
TDatabase,
TEvents,
TQueues
>,
driverCtx: any,
) => TVars | Promise<TVars>;
}
/**
* @experimental
*/
createVars: (
c: CreateVarsContext<
TState,
TInput,
TDatabase,
TEvents,
TQueues
>,
driverCtx: any,
) => TVars | Promise<TVars>;
}
| Record<never, never>;

export interface Actions<
Expand Down Expand Up @@ -417,43 +418,43 @@ export type CanInvokeTarget<
TQueues extends QueueSchemaConfig,
> =
| {
kind: "action";
name: CanInvokeActionName<TActions>;
}
kind: "action";
name: CanInvokeActionName<TActions>;
}
| {
kind: "subscribe";
name: CanInvokeSubscribeName<TEvents>;
}
kind: "subscribe";
name: CanInvokeSubscribeName<TEvents>;
}
| {
kind: "queue";
name: CanInvokeQueueName<TQueues>;
}
kind: "queue";
name: CanInvokeQueueName<TQueues>;
}
| {
kind: "request";
}
kind: "request";
}
| {
kind: "websocket";
};
kind: "websocket";
};

export type AnyCanInvokeTarget =
| {
kind: "action";
name: string;
}
kind: "action";
name: string;
}
| {
kind: "subscribe";
name: string;
}
kind: "subscribe";
name: string;
}
| {
kind: "queue";
name: string;
}
kind: "queue";
name: string;
}
| {
kind: "request";
}
kind: "request";
}
| {
kind: "websocket";
};
kind: "websocket";
};

interface BaseActorConfig<
TState,
Expand Down Expand Up @@ -570,28 +571,28 @@ interface BaseActorConfig<
* @returns Void or a Promise. If the promise exits, the actor crashes.
*/
run?:
| ((
c: RunContext<
TState,
TConnParams,
TConnState,
TVars,
TInput,
TDatabase,
TEvents,
TQueues
>,
) => void | Promise<void>)
| RunConfig<
TState,
TConnParams,
TConnState,
TVars,
TInput,
TDatabase,
TEvents,
TQueues
>;
| ((
c: RunContext<
TState,
TConnParams,
TConnState,
TVars,
TInput,
TDatabase,
TEvents,
TQueues
>,
) => void | Promise<void>)
| RunConfig<
TState,
TConnParams,
TConnState,
TVars,
TInput,
TDatabase,
TEvents,
TQueues
>;

/**
* Called when the actor's state changes.
Expand Down Expand Up @@ -819,11 +820,11 @@ interface BaseActorConfig<

type ActorDatabaseConfig<TDatabase extends AnyDatabaseProvider> =
| {
/**
* @experimental
*/
db: TDatabase;
}
/**
* @experimental
*/
db: TDatabase;
}
| Record<never, never>;

// 1. Infer schema
Expand Down
Loading