diff --git a/packages/common/chirp-workflow/core/src/ctx/activity.rs b/packages/common/chirp-workflow/core/src/ctx/activity.rs index ce4b9d4c30..6378b4bdbd 100644 --- a/packages/common/chirp-workflow/core/src/ctx/activity.rs +++ b/packages/common/chirp-workflow/core/src/ctx/activity.rs @@ -223,6 +223,11 @@ impl ActivityCtx { self.conn.clickhouse().await } + /// Access the SQLite database for this workflow. This cannot access any other database. + pub async fn sqlite(&self) -> Result { + self.conn.sqlite(format!("{}-data", self.workflow_id)).await + } + // Backwards compatibility pub fn op_ctx(&self) -> &rivet_operation::OperationContext<()> { &self.op_ctx diff --git a/packages/common/connection/src/lib.rs b/packages/common/connection/src/lib.rs index da43d8a19f..a9c7cb6e8e 100644 --- a/packages/common/connection/src/lib.rs +++ b/packages/common/connection/src/lib.rs @@ -94,6 +94,10 @@ impl Connection { self.pools.redis("ephemeral") } + pub async fn sqlite(&self, key: impl AsRef) -> Result { + self.pools.sqlite(key).await + } + pub fn perf(&self) -> &chirp_perf::PerfCtx { self.client.perf() } diff --git a/packages/common/operation/core/src/lib.rs b/packages/common/operation/core/src/lib.rs index 7f9cf30cb3..d61890518b 100644 --- a/packages/common/operation/core/src/lib.rs +++ b/packages/common/operation/core/src/lib.rs @@ -240,6 +240,7 @@ where self.conn.cache_handle() } + /// Used by compat layer for chirp workflow. pub fn pools(&self) -> &rivet_pools::Pools { self.conn.pools() } diff --git a/packages/services/edge/pegboard/src/workflows/actor/migrations.rs b/packages/services/edge/pegboard/src/workflows/actor/migrations.rs index b39a2cad04..5e65bc1948 100644 --- a/packages/services/edge/pegboard/src/workflows/actor/migrations.rs +++ b/packages/services/edge/pegboard/src/workflows/actor/migrations.rs @@ -1,8 +1,6 @@ use chirp_workflow::prelude::*; -use super::Input; - -pub fn run(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResult<()> { +pub async fn run(ctx: &mut WorkflowCtx) -> GlobalResult<()> { ctx.activity(MigrateInitInput {}).await?; Ok(()) @@ -12,15 +10,18 @@ pub fn run(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResult<()> { struct MigrateInitInput {} #[activity(MigrateInit)] -async fn migrate_init(ctx: &ActivityCtx, &MigrateInitInput) -> GlobalResult<()> { +async fn migrate_init(ctx: &ActivityCtx, _input: &MigrateInitInput) -> GlobalResult<()> { + let pool = ctx.sqlite().await?; + sql_execute!( - [ctx] + [ctx, pool] " CREATE TABLE test ( ) ", ) - .await - .map_err(Into::into) + .await?; + + Ok(()) } diff --git a/packages/services/edge/pegboard/src/workflows/actor/mod.rs b/packages/services/edge/pegboard/src/workflows/actor/mod.rs index 6a7c1131e0..e21c3775d8 100644 --- a/packages/services/edge/pegboard/src/workflows/actor/mod.rs +++ b/packages/services/edge/pegboard/src/workflows/actor/mod.rs @@ -1,5 +1,7 @@ use chirp_workflow::prelude::*; +mod migrations; + #[derive(Debug, Serialize, Deserialize)] pub struct Input { pub actor_id: Uuid, @@ -7,5 +9,7 @@ pub struct Input { #[workflow] pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResult<()> { + migrations::run(ctx).await?; + Ok(()) } diff --git a/packages/services/edge/pegboard/src/workflows/client/migrations.rs b/packages/services/edge/pegboard/src/workflows/client/migrations.rs index b39a2cad04..de65778390 100644 --- a/packages/services/edge/pegboard/src/workflows/client/migrations.rs +++ b/packages/services/edge/pegboard/src/workflows/client/migrations.rs @@ -1,8 +1,6 @@ use chirp_workflow::prelude::*; -use super::Input; - -pub fn run(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResult<()> { +pub async fn run(ctx: &mut WorkflowCtx) -> GlobalResult<()> { ctx.activity(MigrateInitInput {}).await?; Ok(()) @@ -12,15 +10,18 @@ pub fn run(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResult<()> { struct MigrateInitInput {} #[activity(MigrateInit)] -async fn migrate_init(ctx: &ActivityCtx, &MigrateInitInput) -> GlobalResult<()> { +async fn migrate_init(ctx: &ActivityCtx, _input: &MigrateInitInput) -> GlobalResult<()> { + let pool = ctx.sqlite().await?; + sql_execute!( - [ctx] + [ctx, pool] " CREATE TABLE test ( ) ", ) - .await - .map_err(Into::into) + .await?; + + Ok(()) } diff --git a/packages/services/edge/pegboard/src/workflows/client/mod.rs b/packages/services/edge/pegboard/src/workflows/client/mod.rs index ebaf2f2c78..a7c594134f 100644 --- a/packages/services/edge/pegboard/src/workflows/client/mod.rs +++ b/packages/services/edge/pegboard/src/workflows/client/mod.rs @@ -14,7 +14,11 @@ pub struct Input { #[workflow] pub async fn pegboard_client(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResult<()> { - migrations::run(ctx, input).await?; + migrations::run(ctx).await?; + + ctx.activity(PublishRegisteredInput { + client_id: input.client_id, + }).await?; ctx.repeat(|ctx| { let client_id = input.client_id; @@ -85,6 +89,17 @@ pub async fn pegboard_client(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResu Main::Command(command) => { handle_commands(ctx, client_id, vec![command]).await?; } + Main::PrewarmImage(sig) => { + ctx.msg(ToWs { + client_id, + inner: protocol::ToClient::PrewarmImage { + image_id: sig.image_id, + image_artifact_url_stub: sig.image_artifact_url_stub, + }, + }) + .send() + .await?; + } Main::Drain(_) => { ctx.activity(SetDrainInput { client_id, @@ -577,12 +592,20 @@ async fn fetch_all_actors( Ok(actor_ids) } +#[signal("pegboard_client_registered")] +pub struct Registered {} + #[message("pegboard_client_to_ws")] pub struct ToWs { pub client_id: Uuid, pub inner: protocol::ToClient, } +#[signal("pegboard_prewarm_image")] +pub struct PrewarmImage { + pub image_id: Uuid, + pub image_artifact_url_stub: String, +} #[message("pegboard_client_close_ws")] pub struct CloseWs { pub client_id: Uuid, @@ -606,6 +629,7 @@ join_signal!(Main { Command(protocol::Command), // Forwarded from the ws to this workflow Forward(protocol::ToServer), + PrewarmImage, Drain, Undrain, Destroy,