Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
167 changes: 64 additions & 103 deletions engine/artifacts/config-schema.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion engine/packages/config/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ impl Root {
}

// Validate force_shutdown_duration is greater than worker and guard shutdown durations
let worker = self.runtime.worker.shutdown_duration();
let worker = self.runtime.worker_shutdown_duration();
let guard = self.runtime.guard_shutdown_duration();
let force = self.runtime.force_shutdown_duration();
let max_graceful = worker.max(guard);
Expand Down
94 changes: 40 additions & 54 deletions engine/packages/config/src/config/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,25 @@ use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Serialize, Deserialize, Default, JsonSchema)]
#[serde(deny_unknown_fields)]
pub struct Runtime {
#[serde(default)]
pub worker: Worker,
#[serde(default)]
pub gasoline: Gasoline,
/// Adjusts worker curve around this value (in millicores, i.e. 1000 = 1 core). Is not a hard limit. When
/// unset, uses /sys/fs/cgroup/cpu.max, and if that is unset uses total host cpu.
pub worker_cpu_max: Option<usize>,
/// Determines the load shedding ratio via a linear mapping on CPU usage. We gradually pull
/// fewer workflows as CPU usage increases. Each point is (overall CPU usage in permille,
/// share of workflows to pull in permille).
/// Default:
/// | . .
/// 100% | _____ .
/// | .\ .
/// % wfs | . \ .
/// | . \.
/// 5% | . \_____
/// |_____.___.______
/// 0 70% 90%
/// avg cpu usage
worker_load_shedding_curve: Option<[(u64, u64); 2]>,
/// Time (in seconds) to allow for the gasoline worker engine to stop gracefully after receiving SIGTERM.
/// Defaults to 30 seconds.
worker_shutdown_duration: Option<u32>,
/// Time (in seconds) to allow for guard to wait for pending requests after receiving SIGTERM. Defaults
/// to 1 hour.
guard_shutdown_duration: Option<u32>,
Expand All @@ -20,9 +35,23 @@ pub struct Runtime {
/// Whether or not to allow running the engine when the previous version that was run is higher than
/// the current version.
allow_version_rollback: Option<bool>,
/// Time (in seconds) after completion before considering a workflow eligible for pruning. Defaults to 7
/// days. Set to 0 to never prune workflow data.
gasoline_prune_eligibility_duration: Option<u64>,
/// Time (in seconds) to periodically check for workflows to prune. Defaults to 12 hours.
gasoline_prune_interval_duration: Option<u64>,
}

impl Runtime {
/// Returns the configured two-point load shedding curve, or `[(700, 1000), (900, 50)]` when no
/// override is set.
pub fn worker_load_shedding_curve(&self) -> [(u64, u64); 2] {
    const DEFAULT_CURVE: [(u64, u64); 2] = [(700, 1000), (900, 50)];

    match self.worker_load_shedding_curve {
        Some(curve) => curve,
        None => DEFAULT_CURVE,
    }
}

/// Returns the worker's graceful shutdown window; 30 seconds when not configured.
pub fn worker_shutdown_duration(&self) -> Duration {
    let secs = self.worker_shutdown_duration.unwrap_or(30);
    // Widening u32 -> u64 is lossless.
    Duration::from_secs(u64::from(secs))
}

/// Returns how long guard waits for pending requests on shutdown; one hour when not configured.
pub fn guard_shutdown_duration(&self) -> Duration {
    const DEFAULT_SECS: u32 = 60 * 60;

    let secs = self.guard_shutdown_duration.unwrap_or(DEFAULT_SECS);
    // Widening u32 -> u64 is lossless.
    Duration::from_secs(u64::from(secs))
}
Expand All @@ -38,55 +67,9 @@ impl Runtime {
/// Returns `true` only when version rollback has been explicitly enabled; an unset value counts
/// as `false`.
pub fn allow_version_rollback(&self) -> bool {
    matches!(self.allow_version_rollback, Some(true))
}
}

// Worker tuning options: an optional CPU ceiling, the load shedding curve, and the graceful
// shutdown window. Every field is optional; unknown keys are rejected when deserializing
// (`deny_unknown_fields`).
#[derive(Debug, Clone, Serialize, Deserialize, Default, JsonSchema)]
#[serde(deny_unknown_fields)]
pub struct Worker {
/// Adjusts worker curve around this value (in millicores, i.e. 1000 = 1 core). Is not a hard limit. When
/// unset, uses /sys/fs/cgroup/cpu.max, and if that is unset uses total host cpu.
pub cpu_max: Option<usize>,
/// Determines the load shedding ratio via a linear mapping on CPU usage. We gradually pull
/// fewer workflows as CPU usage increases. Each point is (overall CPU usage in permille,
/// share of workflows to pull in permille).
/// Default:
/// | . .
/// 100% | _____ .
/// | .\ .
/// % wfs | . \ .
/// | . \.
/// 5% | . \_____
/// |_____.___.______
/// 0 70% 90%
/// avg cpu usage
load_shedding_curve: Option<[(u64, u64); 2]>,
/// Time (in seconds) to allow for the gasoline worker engine to stop gracefully after receiving SIGTERM.
/// Defaults to 30 seconds.
shutdown_duration: Option<u32>,
}

impl Worker {
    /// Returns the configured two-point load shedding curve, or `[(700, 1000), (900, 50)]` when
    /// no override is set.
    pub fn load_shedding_curve(&self) -> [(u64, u64); 2] {
        match self.load_shedding_curve {
            Some(points) => points,
            None => [(700, 1000), (900, 50)],
        }
    }

    /// Returns the graceful shutdown window; 30 seconds when not configured.
    pub fn shutdown_duration(&self) -> Duration {
        let secs = self.shutdown_duration.unwrap_or(30);
        // Widening u32 -> u64 is lossless.
        Duration::from_secs(u64::from(secs))
    }
}

// Pruning settings for completed workflow data. Both values are optional and expressed in
// seconds; unknown keys are rejected when deserializing (`deny_unknown_fields`).
#[derive(Debug, Clone, Serialize, Deserialize, Default, JsonSchema)]
#[serde(deny_unknown_fields)]
pub struct Gasoline {
/// Time (in seconds) after completion before considering a workflow eligible for pruning. Defaults to 7
/// days. Set to 0 to never prune workflow data.
prune_eligibility_duration: Option<u64>,
/// Time (in seconds) to periodically check for workflows to prune. Defaults to 12 hours.
prune_interval_duration: Option<u64>,
}

impl Gasoline {
pub fn prune_eligibility_duration(&self) -> Option<Duration> {
if let Some(prune_eligibility_duration) = self.prune_eligibility_duration {
pub fn gasoline_prune_eligibility_duration(&self) -> Option<Duration> {
if let Some(prune_eligibility_duration) = self.gasoline_prune_eligibility_duration {
if prune_eligibility_duration == 0 {
None
} else {
Expand All @@ -97,7 +80,10 @@ impl Gasoline {
}
}

pub fn prune_interval_duration(&self) -> Duration {
Duration::from_secs(self.prune_interval_duration.unwrap_or(60 * 60 * 12))
/// Returns the interval between pruning checks; 12 hours when not configured.
pub fn gasoline_prune_interval_duration(&self) -> Duration {
    const DEFAULT_SECS: u64 = 60 * 60 * 12;

    let secs = self.gasoline_prune_interval_duration.unwrap_or(DEFAULT_SECS);
    Duration::from_secs(secs)
}
}
4 changes: 2 additions & 2 deletions engine/packages/gasoline-runtime/src/workflows/pruner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ pub async fn gasoline_pruner(ctx: &mut WorkflowCtx, input: &Input) -> Result<()>
async move {
ctx.activity(PruneInput {}).await?;

ctx.sleep(ctx.config().runtime.gasoline.prune_interval_duration())
ctx.sleep(ctx.config().runtime.gasoline_prune_interval_duration())
.await?;

Ok(Loop::<()>::Continue)
Expand Down Expand Up @@ -41,7 +41,7 @@ async fn prune(ctx: &ActivityCtx, _input: &PruneInput) -> Result<PruneOutput> {

// Check if pruning is enabled
let Some(prune_eligibility_duration) =
ctx.config().runtime.gasoline.prune_eligibility_duration()
ctx.config().runtime.gasoline_prune_eligibility_duration()
else {
return Ok(PruneOutput { prune_count: 0 });
};
Expand Down
4 changes: 2 additions & 2 deletions engine/packages/gasoline/src/db/kv/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1068,9 +1068,9 @@ impl Database for DatabaseKv {
self.system
.lock()
.await
.cpu_usage_ratio(self.config.runtime.worker.cpu_max)
.cpu_usage_ratio(self.config.runtime.worker_cpu_max)
};
let load_shed_curve = self.config.runtime.worker.load_shedding_curve();
let load_shed_curve = self.config.runtime.worker_load_shedding_curve();
let load_shed_ratio_x1000 = calc_pull_ratio(
(cpu_usage_ratio * 1000.0) as u64,
load_shed_curve[0].0,
Expand Down
2 changes: 1 addition & 1 deletion engine/packages/gasoline/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ impl Worker {

#[tracing::instrument(skip_all)]
async fn shutdown(mut self, mut term_signal: TermSignal) {
let shutdown_duration = self.config.runtime.worker.shutdown_duration();
let shutdown_duration = self.config.runtime.worker_shutdown_duration();

tracing::info!(
duration=?shutdown_duration,
Expand Down
Loading