diff --git a/engine/artifacts/config-schema.json b/engine/artifacts/config-schema.json index d800b7c2ea..da87de9fb3 100644 --- a/engine/artifacts/config-schema.json +++ b/engine/artifacts/config-schema.json @@ -1201,7 +1201,7 @@ "minimum": 0.0 }, "load_shedding_curve": { - "description": "Determine load shedding ratio based on linear mapping on cpu usage. We will gradually pull less workflows as the cpu usage increases. Units are in (permilli overall cpu usage, permilli) Default: | . . 100% | _____ . | .\\ . % wfs | . \\ . | . \\. 5% | . \\_____ |_____.___.______ 0 60% 80% avg cpu usage", + "description": "Determine load shedding ratio based on linear mapping on cpu usage. We will gradually pull less workflows as the cpu usage increases. Units are in (permilli overall cpu usage, permilli) Default: | . . 100% | _____ . | .\\ . % wfs | . \\ . | . \\. 5% | . \\_____ |_____.___.______ 0 70% 90% avg cpu usage", "type": [ "array", "null" diff --git a/engine/packages/config/src/config/runtime.rs b/engine/packages/config/src/config/runtime.rs index 2f2c495b85..31b9e92ac5 100644 --- a/engine/packages/config/src/config/runtime.rs +++ b/engine/packages/config/src/config/runtime.rs @@ -56,7 +56,7 @@ pub struct Worker { /// | . \. /// 5% | . \_____ /// |_____.___.______ - /// 0 60% 80% + /// 0 70% 90% /// avg cpu usage load_shedding_curve: Option<[(u64, u64); 2]>, /// Time (in seconds) to allow for the gasoline worker engine to stop gracefully after receiving SIGTERM. 
@@ -66,7 +66,7 @@ pub struct Worker { impl Worker { pub fn load_shedding_curve(&self) -> [(u64, u64); 2] { - self.load_shedding_curve.unwrap_or([(600, 1000), (800, 50)]) + self.load_shedding_curve.unwrap_or([(700, 1000), (900, 50)]) } pub fn shutdown_duration(&self) -> Duration { diff --git a/engine/packages/pegboard/src/workflows/actor/mod.rs b/engine/packages/pegboard/src/workflows/actor/mod.rs index dc832ca020..c182ecb27d 100644 --- a/engine/packages/pegboard/src/workflows/actor/mod.rs +++ b/engine/packages/pegboard/src/workflows/actor/mod.rs @@ -260,8 +260,6 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()> ctx.listen_n::<Main>
(256).await? }; - let now = util::timestamp::now(); - for sig in signals { match sig { // NOTE: This is only received when allocated to mk1 runner @@ -584,6 +582,8 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()> Main::Wake(sig) => { // Clear alarm if let Some(alarm_ts) = state.alarm_ts { + let now = ctx.v(3).activity(GetTsInput {}).await?; + if now >= alarm_ts { state.alarm_ts = None; } @@ -1225,6 +1225,14 @@ async fn handle_stopped( Ok(StoppedResult::Continue) } +#[derive(Debug, Serialize, Deserialize, Hash)] +struct GetTsInput {} + +#[activity(GetTs)] +async fn get_ts(ctx: &ActivityCtx, input: &GetTsInput) -> Result { + Ok(util::timestamp::now()) +} + #[message("pegboard_actor_create_complete")] pub struct CreateComplete {} diff --git a/engine/packages/pegboard/src/workflows/actor/runtime.rs b/engine/packages/pegboard/src/workflows/actor/runtime.rs index ec701bc201..3655fc0079 100644 --- a/engine/packages/pegboard/src/workflows/actor/runtime.rs +++ b/engine/packages/pegboard/src/workflows/actor/runtime.rs @@ -883,6 +883,31 @@ pub async fn spawn_actor( }) .await?; } + // Bump the pool so it can scale down + else if allocate_res.serverless { + let res = ctx + .v(2) + .signal(crate::workflows::runner_pool::Bump::default()) + .to_workflow::() + .tag("namespace_id", input.namespace_id) + .tag("runner_name", input.runner_name_selector.clone()) + .send() + .await; + + if let Some(WorkflowError::WorkflowNotFound) = res + .as_ref() + .err() + .and_then(|x| x.chain().find_map(|x| x.downcast_ref::())) + { + tracing::warn!( + namespace_id=%input.namespace_id, + runner_name=%input.runner_name_selector, + "serverless pool workflow not found, respective runner config likely deleted" + ); + } else { + res?; + } + } Ok(SpawnActorOutput::Destroy) } @@ -969,6 +994,31 @@ pub async fn spawn_actor( runner_protocol_version, }) } else { + // Bump the pool so it can scale down + if allocate_res.serverless { + let res = ctx + .v(2) + 
.signal(crate::workflows::runner_pool::Bump::default()) + .to_workflow::() + .tag("namespace_id", input.namespace_id) + .tag("runner_name", input.runner_name_selector.clone()) + .send() + .await; + + if let Some(WorkflowError::WorkflowNotFound) = + res.as_ref().err().and_then(|x| { + x.chain().find_map(|x| x.downcast_ref::()) + }) { + tracing::warn!( + namespace_id=%input.namespace_id, + runner_name=%input.runner_name_selector, + "serverless pool workflow not found, respective runner config likely deleted" + ); + } else { + res?; + } + } + Ok(SpawnActorOutput::Sleep) } } @@ -1078,18 +1128,18 @@ pub async fn clear_pending_allocation( let cleared = ctx .udb()? .run(|tx| async move { - let pending_alloc_key = - keys::subspace().pack(&keys::ns::PendingActorByRunnerNameSelectorKey::new( - input.namespace_id, - input.runner_name_selector.clone(), - input.pending_allocation_ts, - input.actor_id, - )); + let tx = tx.with_subspace(keys::subspace()); - let exists = tx.get(&pending_alloc_key, Serializable).await?.is_some(); + let pending_alloc_key = keys::ns::PendingActorByRunnerNameSelectorKey::new( + input.namespace_id, + input.runner_name_selector.clone(), + input.pending_allocation_ts, + input.actor_id, + ); + let exists = tx.exists(&pending_alloc_key, Serializable).await?; if exists { - tx.clear(&pending_alloc_key); + tx.delete(&pending_alloc_key); // If the pending actor key still exists, we must clear its desired slot because after this // activity the actor will go to sleep or be destroyed. 
We don't clear the slot if the key diff --git a/engine/packages/pegboard/src/workflows/runner2.rs b/engine/packages/pegboard/src/workflows/runner2.rs index 860214d0a5..90133f4ed8 100644 --- a/engine/packages/pegboard/src/workflows/runner2.rs +++ b/engine/packages/pegboard/src/workflows/runner2.rs @@ -636,6 +636,8 @@ pub(crate) struct AllocatePendingActorsInput { #[derive(Debug, Serialize, Deserialize)] pub(crate) struct AllocatePendingActorsOutput { pub allocations: Vec, + #[serde(default)] + pub attempted: usize, } #[derive(Debug, Serialize, Deserialize)] @@ -697,6 +699,7 @@ pub(crate) async fn allocate_pending_actors( // Shuffle for good measure pending_actors.shuffle(&mut rand::thread_rng()); + let attempted = pending_actors.len(); let runner_eligible_threshold = ctx.config().pegboard().runner_eligible_threshold(); let actor_allocation_candidate_sample_size = ctx .config() @@ -875,7 +878,10 @@ pub(crate) async fn allocate_pending_actors( .collect() .await; - Ok(AllocatePendingActorsOutput { allocations }) + Ok(AllocatePendingActorsOutput { + allocations, + attempted, + }) } #[derive(Debug, Serialize, Deserialize, Hash)] diff --git a/engine/packages/pegboard/src/workflows/runner_pool.rs b/engine/packages/pegboard/src/workflows/runner_pool.rs index 8ea4bf9f1e..74e568712a 100644 --- a/engine/packages/pegboard/src/workflows/runner_pool.rs +++ b/engine/packages/pegboard/src/workflows/runner_pool.rs @@ -47,121 +47,124 @@ pub async fn pegboard_runner_pool(ctx: &mut WorkflowCtx, input: &Input) -> Resul .dispatch() .await?; - ctx.loope(LifecycleState::default(), |ctx, state| { - let input = input.clone(); - async move { - // Get desired count -> drain and start counts - let ReadDesiredOutput::Desired { - desired_count, - details_hash, - } = ctx.activity(ReadDesiredInput { - namespace_id: input.namespace_id, - runner_name: input.runner_name.clone(), - }) - .await? 
- else { - // Drain all - for runner in &state.runners { - ctx.signal(serverless::receiver::Drain {}) - .to_workflow_id(runner.receiver_wf_id) - .send() - .await?; - } - - return Ok(Loop::Break(())); - }; - - // Remove runners that have an outdated hash. This is done outside of the below draining mechanism - // because we drain specific runners, not just a number of runners - let (new, outdated) = std::mem::take(&mut state.runners) - .into_iter() - .partition::, _>(|r| r.details_hash == details_hash); - state.runners = new; - - for runner in outdated { - // TODO: Spawn sub wf to process these so this is not blocking the loop - ctx.signal(serverless::receiver::Drain {}) - .to_workflow_id(runner.receiver_wf_id) - .send() - .await?; - } + ctx.lupe() + .commit_interval(5) + .with_state(LifecycleState::default()) + .run(|ctx, state| { + let input = input.clone(); + async move { + // Get desired count -> drain and start counts + let ReadDesiredOutput::Desired { + desired_count, + details_hash, + } = ctx.activity(ReadDesiredInput { + namespace_id: input.namespace_id, + runner_name: input.runner_name.clone(), + }) + .await? + else { + // Drain all + for runner in &state.runners { + ctx.signal(serverless::receiver::Drain {}) + .to_workflow_id(runner.receiver_wf_id) + .send() + .await?; + } - let drain_count = state.runners.len().saturating_sub(desired_count); - let start_count = desired_count.saturating_sub(state.runners.len()); + return Ok(Loop::Break(())); + }; - // Drain unnecessary runners - if drain_count != 0 { - // TODO: Implement smart logic of draining runners with the lowest allocated actors - let draining_runners = state.runners.iter().take(drain_count).collect::>(); + // Remove runners that have an outdated hash. 
This is done outside of the below draining mechanism + // because we drain specific runners, not just a number of runners + let (new, outdated) = std::mem::take(&mut state.runners) + .into_iter() + .partition::, _>(|r| r.details_hash == details_hash); + state.runners = new; - // TODO: Spawn sub wf to process these so this is not blocking the loop - for runner in draining_runners { + for runner in outdated { + // TODO: Spawn sub wf to process these so this is not blocking the loop ctx.signal(serverless::receiver::Drain {}) .to_workflow_id(runner.receiver_wf_id) .send() .await?; } - } - // Dispatch new runner workflows - if start_count != 0 { - // TODO: Spawn sub wf to process these so this is not blocking the loop - for _ in 0..start_count { - let receiver_wf_id = ctx - .workflow(serverless::receiver::Input { - pool_wf_id: ctx.workflow_id(), - namespace_id: input.namespace_id, - runner_name: input.runner_name.clone(), - }) - .tag("namespace_id", input.namespace_id) - .tag("runner_name", input.runner_name.clone()) - .dispatch() - .await?; + let drain_count = state.runners.len().saturating_sub(desired_count); + let start_count = desired_count.saturating_sub(state.runners.len()); + + // Drain unnecessary runners + if drain_count != 0 { + // TODO: Implement smart logic of draining runners with the lowest allocated actors + let draining_runners = + state.runners.iter().take(drain_count).collect::>(); + + // TODO: Spawn sub wf to process these so this is not blocking the loop + for runner in draining_runners { + ctx.signal(serverless::receiver::Drain {}) + .to_workflow_id(runner.receiver_wf_id) + .send() + .await?; + } + } - state.runners.push(RunnerState { - receiver_wf_id, - details_hash, - }); + // Dispatch new runner workflows + if start_count != 0 { + // TODO: Spawn sub wf to process these so this is not blocking the loop + for _ in 0..start_count { + let receiver_wf_id = ctx + .workflow(serverless::receiver::Input { + pool_wf_id: ctx.workflow_id(), + namespace_id: 
input.namespace_id, + runner_name: input.runner_name.clone(), + }) + .tag("namespace_id", input.namespace_id) + .tag("runner_name", input.runner_name.clone()) + .dispatch() + .await?; + + state.runners.push(RunnerState { + receiver_wf_id, + details_hash, + }); + } } - } - // Wait for Bump or serverless signals until we tick again - for sig in ctx.listen_n::
(512).await? { - match sig { - Main::OutboundConnDrainStarted(sig) => { - let (new, drain_started) = - std::mem::take(&mut state.runners) + // Wait for Bump or serverless signals until we tick again + for sig in ctx.listen_n::<Main>
(256).await? { + match sig { + Main::OutboundConnDrainStarted(sig) => { + let (new, drain_started) = std::mem::take(&mut state.runners) .into_iter() .partition::, _>(|r| r.receiver_wf_id != sig.receiver_wf_id); - state.runners = new; - - for runner in drain_started { - // TODO: Spawn sub wf to process these so this is not blocking the loop - ctx.signal(serverless::receiver::Drain {}) - .to_workflow_id(runner.receiver_wf_id) - .send() - .await?; + state.runners = new; + + for runner in drain_started { + // TODO: Spawn sub wf to process these so this is not blocking the loop + ctx.signal(serverless::receiver::Drain {}) + .to_workflow_id(runner.receiver_wf_id) + .send() + .await?; + } } - } - Main::Bump(bump) => { - if bump.endpoint_config_changed { - // Forward to metadata poller to trigger immediate metadata fetch - ctx.signal(runner_pool_metadata_poller::EndpointConfigChanged {}) - .to_workflow::() - .tag("namespace_id", input.namespace_id) - .tag("runner_name", &input.runner_name) - .send() - .await?; + Main::Bump(bump) => { + if bump.endpoint_config_changed { + // Forward to metadata poller to trigger immediate metadata fetch + ctx.signal(runner_pool_metadata_poller::EndpointConfigChanged {}) + .to_workflow::() + .tag("namespace_id", input.namespace_id) + .tag("runner_name", &input.runner_name) + .send() + .await?; + } } } } - } - Ok(Loop::Continue) - } - .boxed() - }) - .await?; + Ok(Loop::Continue) + } + .boxed() + }) + .await?; Ok(()) } diff --git a/engine/packages/pegboard/src/workflows/serverless/conn.rs b/engine/packages/pegboard/src/workflows/serverless/conn.rs index a442a7a685..b00031139c 100644 --- a/engine/packages/pegboard/src/workflows/serverless/conn.rs +++ b/engine/packages/pegboard/src/workflows/serverless/conn.rs @@ -517,9 +517,7 @@ async fn finish_non_critical_draining( // Wait for runner to shut down tokio::select! 
{ res = wait_for_shutdown_fut => return res.map_err(Into::into), - _ = tokio::time::sleep(Duration::from_millis(ctx.config().pegboard().serverless_drain_grace_period())) => { - tracing::debug!(?runner_id, "reached drain grace period before runner shut down") - } + _ = tokio::time::sleep(Duration::from_millis(ctx.config().pegboard().serverless_drain_grace_period())) => {} _ = term_signal.recv() => {} } @@ -539,7 +537,7 @@ async fn finish_non_critical_draining( #[tracing::instrument(skip_all)] async fn drain_runner(ctx: &ActivityCtx, runner_id: Id) -> Result<()> { let res = ctx - .signal(crate::workflows::runner::Stop { + .signal(crate::workflows::runner2::Stop { reset_actor_rescheduling: true, }) // This is ok, because runner_id changes every retry of outbound_req diff --git a/engine/sdks/typescript/test-runner/src/index.ts b/engine/sdks/typescript/test-runner/src/index.ts index de576b2eba..938b14f1fe 100644 --- a/engine/sdks/typescript/test-runner/src/index.ts +++ b/engine/sdks/typescript/test-runner/src/index.ts @@ -70,6 +70,10 @@ app.get("/has-actor", async (c) => { return c.text("ok"); }); +app.get("/health", (c) => { + return c.text("ok"); +}); + app.get("/shutdown", async (c) => { await runner?.shutdown(true); return c.text("ok"); diff --git a/scripts/tests/load-test/actor-lifecycle.js b/scripts/tests/load-test/actor-lifecycle.js index c9e67fa09d..b26e81f52a 100644 --- a/scripts/tests/load-test/actor-lifecycle.js +++ b/scripts/tests/load-test/actor-lifecycle.js @@ -87,8 +87,6 @@ export const options = { 'actor_destroy_success': ['rate>0.95'], 'actor_ping_success': ['rate>0.95'], 'websocket_success': ['rate>0.90'], - 'http_req_duration': ['p(95)<5000', 'p(99)<10000'], - // 'actor_create_duration': ['p(95)<3000'], }, noConnectionReuse: false, userAgent: 'k6-actor-lifecycle-test',