Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion engine/artifacts/config-schema.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions engine/packages/config/src/config/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ pub struct Worker {
/// | . \.
/// 5% | . \_____
/// |_____.___.______
/// 0 60% 80%
/// 0 70% 90%
/// avg cpu usage
load_shedding_curve: Option<[(u64, u64); 2]>,
/// Time (in seconds) to allow for the gasoline worker engine to stop gracefully after receiving SIGTERM.
Expand All @@ -66,7 +66,7 @@ pub struct Worker {

impl Worker {
pub fn load_shedding_curve(&self) -> [(u64, u64); 2] {
self.load_shedding_curve.unwrap_or([(600, 1000), (800, 50)])
self.load_shedding_curve.unwrap_or([(700, 1000), (900, 50)])
}

pub fn shutdown_duration(&self) -> Duration {
Expand Down
12 changes: 10 additions & 2 deletions engine/packages/pegboard/src/workflows/actor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,8 +260,6 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()>
ctx.listen_n::<Main>(256).await?
};

let now = util::timestamp::now();

for sig in signals {
match sig {
// NOTE: This is only received when allocated to mk1 runner
Expand Down Expand Up @@ -584,6 +582,8 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()>
Main::Wake(sig) => {
// Clear alarm
if let Some(alarm_ts) = state.alarm_ts {
let now = ctx.v(3).activity(GetTsInput {}).await?;

if now >= alarm_ts {
state.alarm_ts = None;
}
Expand Down Expand Up @@ -1225,6 +1225,14 @@ async fn handle_stopped(
Ok(StoppedResult::Continue)
}

#[derive(Debug, Serialize, Deserialize, Hash)]
struct GetTsInput {}

#[activity(GetTs)]
async fn get_ts(ctx: &ActivityCtx, input: &GetTsInput) -> Result<i64> {
Ok(util::timestamp::now())
}

#[message("pegboard_actor_create_complete")]
pub struct CreateComplete {}

Expand Down
68 changes: 59 additions & 9 deletions engine/packages/pegboard/src/workflows/actor/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -883,6 +883,31 @@ pub async fn spawn_actor(
})
.await?;
}
// Bump the pool so it can scale down
else if allocate_res.serverless {
let res = ctx
.v(2)
.signal(crate::workflows::runner_pool::Bump::default())
.to_workflow::<crate::workflows::runner_pool::Workflow>()
.tag("namespace_id", input.namespace_id)
.tag("runner_name", input.runner_name_selector.clone())
.send()
.await;

if let Some(WorkflowError::WorkflowNotFound) = res
.as_ref()
.err()
.and_then(|x| x.chain().find_map(|x| x.downcast_ref::<WorkflowError>()))
{
tracing::warn!(
namespace_id=%input.namespace_id,
runner_name=%input.runner_name_selector,
"serverless pool workflow not found, respective runner config likely deleted"
);
} else {
res?;
}
}

Ok(SpawnActorOutput::Destroy)
}
Expand Down Expand Up @@ -969,6 +994,31 @@ pub async fn spawn_actor(
runner_protocol_version,
})
} else {
// Bump the pool so it can scale down
if allocate_res.serverless {
let res = ctx
.v(2)
.signal(crate::workflows::runner_pool::Bump::default())
.to_workflow::<crate::workflows::runner_pool::Workflow>()
.tag("namespace_id", input.namespace_id)
.tag("runner_name", input.runner_name_selector.clone())
.send()
.await;

if let Some(WorkflowError::WorkflowNotFound) =
res.as_ref().err().and_then(|x| {
x.chain().find_map(|x| x.downcast_ref::<WorkflowError>())
}) {
tracing::warn!(
namespace_id=%input.namespace_id,
runner_name=%input.runner_name_selector,
"serverless pool workflow not found, respective runner config likely deleted"
);
} else {
res?;
}
}

Ok(SpawnActorOutput::Sleep)
}
}
Expand Down Expand Up @@ -1078,18 +1128,18 @@ pub async fn clear_pending_allocation(
let cleared = ctx
.udb()?
.run(|tx| async move {
let pending_alloc_key =
keys::subspace().pack(&keys::ns::PendingActorByRunnerNameSelectorKey::new(
input.namespace_id,
input.runner_name_selector.clone(),
input.pending_allocation_ts,
input.actor_id,
));
let tx = tx.with_subspace(keys::subspace());

let exists = tx.get(&pending_alloc_key, Serializable).await?.is_some();
let pending_alloc_key = keys::ns::PendingActorByRunnerNameSelectorKey::new(
input.namespace_id,
input.runner_name_selector.clone(),
input.pending_allocation_ts,
input.actor_id,
);
let exists = tx.exists(&pending_alloc_key, Serializable).await?;

if exists {
tx.clear(&pending_alloc_key);
tx.delete(&pending_alloc_key);

// If the pending actor key still exists, we must clear its desired slot because after this
// activity the actor will go to sleep or be destroyed. We don't clear the slot if the key
Expand Down
8 changes: 7 additions & 1 deletion engine/packages/pegboard/src/workflows/runner2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -636,6 +636,8 @@ pub(crate) struct AllocatePendingActorsInput {
#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct AllocatePendingActorsOutput {
pub allocations: Vec<ActorAllocation>,
#[serde(default)]
pub attempted: usize,
}

#[derive(Debug, Serialize, Deserialize)]
Expand Down Expand Up @@ -697,6 +699,7 @@ pub(crate) async fn allocate_pending_actors(
// Shuffle for good measure
pending_actors.shuffle(&mut rand::thread_rng());

let attempted = pending_actors.len();
let runner_eligible_threshold = ctx.config().pegboard().runner_eligible_threshold();
let actor_allocation_candidate_sample_size = ctx
.config()
Expand Down Expand Up @@ -875,7 +878,10 @@ pub(crate) async fn allocate_pending_actors(
.collect()
.await;

Ok(AllocatePendingActorsOutput { allocations })
Ok(AllocatePendingActorsOutput {
allocations,
attempted,
})
}

#[derive(Debug, Serialize, Deserialize, Hash)]
Expand Down
Loading
Loading