From 06bdd41765875df5cc9f50bc24fcf69610b7b9ab Mon Sep 17 00:00:00 2001 From: MasterPtato Date: Fri, 27 Feb 2026 15:43:53 -0800 Subject: [PATCH] chore: flatten runtime config --- engine/artifacts/config-schema.json | 167 +++++++----------- engine/packages/config/src/config/mod.rs | 2 +- engine/packages/config/src/config/runtime.rs | 94 +++++----- .../gasoline-runtime/src/workflows/pruner.rs | 4 +- engine/packages/gasoline/src/db/kv/mod.rs | 4 +- engine/packages/gasoline/src/worker.rs | 2 +- 6 files changed, 110 insertions(+), 163 deletions(-) diff --git a/engine/artifacts/config-schema.json b/engine/artifacts/config-schema.json index 320ab0b625..535f01a143 100644 --- a/engine/artifacts/config-schema.json +++ b/engine/artifacts/config-schema.json @@ -132,16 +132,12 @@ "default": { "allow_version_rollback": null, "force_shutdown_duration": null, - "gasoline": { - "prune_eligibility_duration": null, - "prune_interval_duration": null - }, + "gasoline_prune_eligibility_duration": null, + "gasoline_prune_interval_duration": null, "guard_shutdown_duration": null, - "worker": { - "cpu_max": null, - "load_shedding_curve": null, - "shutdown_duration": null - } + "worker_cpu_max": null, + "worker_load_shedding_curve": null, + "worker_shutdown_duration": null }, "allOf": [ { @@ -352,30 +348,6 @@ }, "additionalProperties": false }, - "Gasoline": { - "type": "object", - "properties": { - "prune_eligibility_duration": { - "description": "Time (in seconds) after completion before considering a workflow eligible for pruning. Defaults to 7 days. Set to 0 to never prune workflow data.", - "type": [ - "integer", - "null" - ], - "format": "uint64", - "minimum": 0.0 - }, - "prune_interval_duration": { - "description": "Time (in seconds) to periodically check for workflows to prune. Defaults to 12 hours.", - "type": [ - "integer", - "null" - ], - "format": "uint64", - "minimum": 0.0 - } - }, - "additionalProperties": false - }, "Guard": { "type": "object", "properties": { @@ -893,16 +865,23 @@ "format": "uint32", "minimum": 0.0 }, - "gasoline": { - "default": { - "prune_eligibility_duration": null, - "prune_interval_duration": null - }, - "allOf": [ - { - "$ref": "#/definitions/Gasoline" - } - ] + "gasoline_prune_eligibility_duration": { + "description": "Time (in seconds) after completion before considering a workflow eligible for pruning. Defaults to 7 days. Set to 0 to never prune workflow data.", + "type": [ + "integer", + "null" + ], + "format": "uint64", + "minimum": 0.0 + }, + "gasoline_prune_interval_duration": { + "description": "Time (in seconds) to periodically check for workflows to prune. Defaults to 12 hours.", + "type": [ + "integer", + "null" + ], + "format": "uint64", + "minimum": 0.0 }, "guard_shutdown_duration": { "description": "Time (in seconds) to allow for guard to wait for pending requests after receiving SIGTERM. Defaults to 1 hour.", @@ -913,17 +892,49 @@ "format": "uint32", "minimum": 0.0 }, - "worker": { - "default": { - "cpu_max": null, - "load_shedding_curve": null, - "shutdown_duration": null + "worker_cpu_max": { + "description": "Adjusts worker curve around this value (in millicores, i.e. 1000 = 1 core). Is not a hard limit. When unset, uses /sys/fs/cgroup/cpu.max, and if that is unset uses total host cpu.", + "type": [ + "integer", + "null" + ], + "format": "uint", + "minimum": 0.0 + }, + "worker_load_shedding_curve": { + "description": "Determine load shedding ratio based on linear mapping on cpu usage. We will gradually pull less workflows as the cpu usage increases. Units are in (permilli overall cpu usage, permilli) Default: | . . 100% | _____ . | .\\ . % wfs | . \\ . | . \\. 5% | . \\_____ |_____.___.______ 0 70% 90% avg cpu usage", + "type": [ + "array", + "null" + ], + "items": { + "type": "array", + "items": [ + { + "type": "integer", + "format": "uint64", + "minimum": 0.0 + }, + { + "type": "integer", + "format": "uint64", + "minimum": 0.0 + } + ], + "maxItems": 2, + "minItems": 2 }, - "allOf": [ - { - "$ref": "#/definitions/Worker" - } - ] + "maxItems": 2, + "minItems": 2 + }, + "worker_shutdown_duration": { + "description": "Time (in seconds) to allow for the gasoline worker engine to stop gracefully after receiving SIGTERM. Defaults to 30 seconds.", + "type": [ + "integer", + "null" + ], + "format": "uint32", + "minimum": 0.0 } }, "additionalProperties": false @@ -989,56 +1000,6 @@ } }, "additionalProperties": false - }, - "Worker": { - "type": "object", - "properties": { - "cpu_max": { - "description": "Adjusts worker curve around this value (in millicores, i.e. 1000 = 1 core). Is not a hard limit. When unset, uses /sys/fs/cgroup/cpu.max, and if that is unset uses total host cpu.", - "type": [ - "integer", - "null" - ], - "format": "uint", - "minimum": 0.0 - }, - "load_shedding_curve": { - "description": "Determine load shedding ratio based on linear mapping on cpu usage. We will gradually pull less workflows as the cpu usage increases. Units are in (permilli overall cpu usage, permilli) Default: | . . 100% | _____ . | .\\ . % wfs | . \\ . | . \\. 5% | . \\_____ |_____.___.______ 0 70% 90% avg cpu usage", - "type": [ - "array", - "null" - ], - "items": { - "type": "array", - "items": [ - { - "type": "integer", - "format": "uint64", - "minimum": 0.0 - }, - { - "type": "integer", - "format": "uint64", - "minimum": 0.0 - } - ], - "maxItems": 2, - "minItems": 2 - }, - "maxItems": 2, - "minItems": 2 - }, - "shutdown_duration": { - "description": "Time (in seconds) to allow for the gasoline worker engine to stop gracefully after receiving SIGTERM. Defaults to 30 seconds.", - "type": [ - "integer", - "null" - ], - "format": "uint32", - "minimum": 0.0 - } - }, - "additionalProperties": false } } } \ No newline at end of file diff --git a/engine/packages/config/src/config/mod.rs b/engine/packages/config/src/config/mod.rs index 88c60d782d..63eb3630fa 100644 --- a/engine/packages/config/src/config/mod.rs +++ b/engine/packages/config/src/config/mod.rs @@ -204,7 +204,7 @@ impl Root { } // Validate force_shutdown_duration is greater than worker and guard shutdown durations - let worker = self.runtime.worker.shutdown_duration(); + let worker = self.runtime.worker_shutdown_duration(); let guard = self.runtime.guard_shutdown_duration(); let force = self.runtime.force_shutdown_duration(); let max_graceful = worker.max(guard); diff --git a/engine/packages/config/src/config/runtime.rs b/engine/packages/config/src/config/runtime.rs index 31b9e92ac5..60919abfea 100644 --- a/engine/packages/config/src/config/runtime.rs +++ b/engine/packages/config/src/config/runtime.rs @@ -6,10 +6,25 @@ use serde::{Deserialize, Serialize}; #[derive(Debug, Clone, Serialize, Deserialize, Default, JsonSchema)] #[serde(deny_unknown_fields)] pub struct Runtime { - #[serde(default)] - pub worker: Worker, - #[serde(default)] - pub gasoline: Gasoline, + /// Adjusts worker curve around this value (in millicores, i.e. 1000 = 1 core). Is not a hard limit. When + /// unset, uses /sys/fs/cgroup/cpu.max, and if that is unset uses total host cpu. + pub worker_cpu_max: Option, + /// Determine load shedding ratio based on linear mapping on cpu usage. We will gradually + /// pull less workflows as the cpu usage increases. Units are in (permilli overall cpu usage, permilli) + /// Default: + /// | . . + /// 100% | _____ . + /// | .\ . + /// % wfs | . \ . + /// | . \. + /// 5% | . \_____ + /// |_____.___.______ + /// 0 70% 90% + /// avg cpu usage + worker_load_shedding_curve: Option<[(u64, u64); 2]>, + /// Time (in seconds) to allow for the gasoline worker engine to stop gracefully after receiving SIGTERM. + /// Defaults to 30 seconds. + worker_shutdown_duration: Option, /// Time (in seconds) to allow for guard to wait for pending requests after receiving SIGTERM. Defaults /// to 1 hour. guard_shutdown_duration: Option, @@ -20,9 +35,23 @@ pub struct Runtime { /// Whether or not to allow running the engine when the previous version that was run is higher than /// the current version. allow_version_rollback: Option, + /// Time (in seconds) after completion before considering a workflow eligible for pruning. Defaults to 7 + /// days. Set to 0 to never prune workflow data. + gasoline_prune_eligibility_duration: Option, + /// Time (in seconds) to periodically check for workflows to prune. Defaults to 12 hours. + gasoline_prune_interval_duration: Option, } impl Runtime { + pub fn worker_load_shedding_curve(&self) -> [(u64, u64); 2] { + self.worker_load_shedding_curve + .unwrap_or([(700, 1000), (900, 50)]) + } + + pub fn worker_shutdown_duration(&self) -> Duration { + Duration::from_secs(self.worker_shutdown_duration.unwrap_or(30) as u64) + } + pub fn guard_shutdown_duration(&self) -> Duration { Duration::from_secs(self.guard_shutdown_duration.unwrap_or(60 * 60) as u64) } @@ -38,55 +67,9 @@ impl Runtime { pub fn allow_version_rollback(&self) -> bool { self.allow_version_rollback.unwrap_or_default() } -} -#[derive(Debug, Clone, Serialize, Deserialize, Default, JsonSchema)] -#[serde(deny_unknown_fields)] -pub struct Worker { - /// Adjusts worker curve around this value (in millicores, i.e. 1000 = 1 core). Is not a hard limit. When - /// unset, uses /sys/fs/cgroup/cpu.max, and if that is unset uses total host cpu. - pub cpu_max: Option, - /// Determine load shedding ratio based on linear mapping on cpu usage. We will gradually - /// pull less workflows as the cpu usage increases. Units are in (permilli overall cpu usage, permilli) - /// Default: - /// | . . - /// 100% | _____ . - /// | .\ . - /// % wfs | . \ . - /// | . \. - /// 5% | . \_____ - /// |_____.___.______ - /// 0 70% 90% - /// avg cpu usage - load_shedding_curve: Option<[(u64, u64); 2]>, - /// Time (in seconds) to allow for the gasoline worker engine to stop gracefully after receiving SIGTERM. - /// Defaults to 30 seconds. - shutdown_duration: Option, -} - -impl Worker { - pub fn load_shedding_curve(&self) -> [(u64, u64); 2] { - self.load_shedding_curve.unwrap_or([(700, 1000), (900, 50)]) - } - - pub fn shutdown_duration(&self) -> Duration { - Duration::from_secs(self.shutdown_duration.unwrap_or(30) as u64) - } -} - -#[derive(Debug, Clone, Serialize, Deserialize, Default, JsonSchema)] -#[serde(deny_unknown_fields)] -pub struct Gasoline { - /// Time (in seconds) after completion before considering a workflow eligible for pruning. Defaults to 7 - /// days. Set to 0 to never prune workflow data. - prune_eligibility_duration: Option, - /// Time (in seconds) to periodically check for workflows to prune. Defaults to 12 hours. - prune_interval_duration: Option, -} - -impl Gasoline { - pub fn prune_eligibility_duration(&self) -> Option { - if let Some(prune_eligibility_duration) = self.prune_eligibility_duration { + pub fn gasoline_prune_eligibility_duration(&self) -> Option { + if let Some(prune_eligibility_duration) = self.gasoline_prune_eligibility_duration { if prune_eligibility_duration == 0 { None } else { @@ -97,7 +80,10 @@ impl Gasoline { } } - pub fn prune_interval_duration(&self) -> Duration { - Duration::from_secs(self.prune_interval_duration.unwrap_or(60 * 60 * 12)) + pub fn gasoline_prune_interval_duration(&self) -> Duration { + Duration::from_secs( + self.gasoline_prune_interval_duration + .unwrap_or(60 * 60 * 12), + ) } } diff --git a/engine/packages/gasoline-runtime/src/workflows/pruner.rs b/engine/packages/gasoline-runtime/src/workflows/pruner.rs index a0419a1cc6..68ada307d5 100644 --- a/engine/packages/gasoline-runtime/src/workflows/pruner.rs +++ b/engine/packages/gasoline-runtime/src/workflows/pruner.rs @@ -13,7 +13,7 @@ pub async fn gasoline_pruner(ctx: &mut WorkflowCtx, input: &Input) -> Result<()> async move { ctx.activity(PruneInput {}).await?; - ctx.sleep(ctx.config().runtime.gasoline.prune_interval_duration()) + ctx.sleep(ctx.config().runtime.gasoline_prune_interval_duration()) .await?; Ok(Loop::<()>::Continue) @@ -41,7 +41,7 @@ async fn prune(ctx: &ActivityCtx, _input: &PruneInput) -> Result { // Check if pruning is enabled let Some(prune_eligibility_duration) = - ctx.config().runtime.gasoline.prune_eligibility_duration() + ctx.config().runtime.gasoline_prune_eligibility_duration() else { return Ok(PruneOutput { prune_count: 0 }); }; diff --git a/engine/packages/gasoline/src/db/kv/mod.rs b/engine/packages/gasoline/src/db/kv/mod.rs index 3be187cd45..0346510cbc 100644 --- a/engine/packages/gasoline/src/db/kv/mod.rs +++ b/engine/packages/gasoline/src/db/kv/mod.rs @@ -1068,9 +1068,9 @@ impl Database for DatabaseKv { self.system .lock() .await - .cpu_usage_ratio(self.config.runtime.worker.cpu_max) + .cpu_usage_ratio(self.config.runtime.worker_cpu_max) }; - let load_shed_curve = self.config.runtime.worker.load_shedding_curve(); + let load_shed_curve = self.config.runtime.worker_load_shedding_curve(); let load_shed_ratio_x1000 = calc_pull_ratio( (cpu_usage_ratio * 1000.0) as u64, load_shed_curve[0].0, diff --git a/engine/packages/gasoline/src/worker.rs b/engine/packages/gasoline/src/worker.rs index 6631b401bb..5d196eed60 100644 --- a/engine/packages/gasoline/src/worker.rs +++ b/engine/packages/gasoline/src/worker.rs @@ -289,7 +289,7 @@ impl Worker { #[tracing::instrument(skip_all)] async fn shutdown(mut self, mut term_signal: TermSignal) { - let shutdown_duration = self.config.runtime.worker.shutdown_duration(); + let shutdown_duration = self.config.runtime.worker_shutdown_duration(); tracing::info!( duration=?shutdown_duration,