5 changes: 5 additions & 0 deletions CHANGELOG.md
@@ -4,6 +4,11 @@ All notable changes to this project will be documented in this file.

## [Unreleased]

- The Stackable scaler now requires that a `TrinoCluster` transitioned to `ready` more than 5 seconds
  ago before marking it as `ready` ([#68]).

[#68]: https://github.com/stackabletech/trino-lb/pull/68

## [0.5.0] - 2025-03-14

### Added
2 changes: 1 addition & 1 deletion trino-lb/src/main.rs
@@ -82,7 +82,7 @@ fn main() -> Result<(), MainError> {
assert!(n > 0, "{ENV_WORKER_THREADS:?} cannot be set to 0");
n
}
// We default to at least 2 workers
// We default to at least 3 workers
Err(std::env::VarError::NotPresent) => usize::max(3, num_cpus::get()),
Err(std::env::VarError::NotUnicode(e)) => {
panic!("{ENV_WORKER_THREADS:?} must be valid unicode, error: {e:?}")
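For context, here is a minimal, self-contained sketch of the worker-thread defaulting that the hunk above changes. Only the `ENV_WORKER_THREADS` pattern, the `num_cpus` crate, and the "at least 3 workers" default come from the diff; the env var value, the parsing step, and the `main` wrapper are illustrative assumptions, not the crate's actual code.

use std::env;

// The actual env var name is not shown in the diff; this value is purely illustrative.
const ENV_WORKER_THREADS: &str = "TRINO_LB_WORKER_THREADS";

fn worker_threads() -> usize {
    match env::var(ENV_WORKER_THREADS) {
        Ok(value) => {
            let n: usize = value
                .parse()
                .unwrap_or_else(|_| panic!("{ENV_WORKER_THREADS:?} must be a positive integer"));
            assert!(n > 0, "{ENV_WORKER_THREADS:?} cannot be set to 0");
            n
        }
        // We default to at least 3 workers, or one per CPU if the machine has more.
        Err(env::VarError::NotPresent) => usize::max(3, num_cpus::get()),
        Err(env::VarError::NotUnicode(e)) => {
            panic!("{ENV_WORKER_THREADS:?} must be valid unicode, error: {e:?}")
        }
    }
}

fn main() {
    println!("tokio worker threads: {}", worker_threads());
}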
107 changes: 84 additions & 23 deletions trino-lb/src/scaling/stackable.rs
@@ -1,5 +1,7 @@
use std::collections::HashMap;

use chrono::{DateTime, Utc};
use k8s_openapi::apimachinery::pkg::apis::meta::v1::Time;
use kube::{
Api, Client, Discovery,
api::{Patch, PatchParams},
@@ -16,6 +18,7 @@ use trino_lb_core::{
use super::ScalerTrait;

const K8S_FIELD_MANAGER: &str = "trino-lb";
const MIN_READY_SECONDS_SINCE_LAST_TRANSITION: i64 = 5;

#[derive(Snafu, Debug)]
pub enum Error {
@@ -123,6 +126,16 @@ pub enum Error {
cluster: TrinoClusterName,
namespace: String,
},

#[snafu(display(
"Could not parse the lastTransitionTime {last_transition_time:?} for the Trino cluster {cluster:?} in namespace {namespace:?}"
))]
ParseLastTransitionTime {
source: serde_json::Error,
last_transition_time: Value,
cluster: TrinoClusterName,
namespace: String,
},
}

pub struct StackableScaler {
@@ -160,7 +173,7 @@ impl StackableScaler {
};
let (trino_resource, _) = discovery
.resolve_gvk(&trino_gvk)
.context(ResolveGvkSnafu { gvk: trino_gvk })?;
.with_context(|| ResolveGvkSnafu { gvk: trino_gvk })?;

for cluster in trino_cluster_groups
.values()
@@ -185,13 +198,13 @@
let api: Api<DynamicObject> =
Api::namespaced_with(client.clone(), &cluster.namespace, &trino_resource);

let trino = api
.get_opt(&cluster.name)
.await
.context(ReadTrinoClusterSnafu {
cluster: &cluster.name,
namespace: &cluster.namespace,
})?;
let trino =
api.get_opt(&cluster.name)
.await
.with_context(|_| ReadTrinoClusterSnafu {
cluster: &cluster.name,
namespace: &cluster.namespace,
})?;

if trino.is_none() {
TrinoClusterNotFoundSnafu {
@@ -219,7 +232,7 @@ impl StackableScaler {
let cluster = self
.clusters
.get(cluster)
.context(ClusterNotFoundSnafu { cluster })?;
.with_context(|| ClusterNotFoundSnafu { cluster })?;

let patch = serde_json::json!({
"apiVersion": "trino.stackable.tech/v1alpha1",
@@ -241,7 +254,7 @@ impl StackableScaler {
.patch(&cluster.name, &params, &patch)
.instrument(debug_span!("Patching Trino cluster"))
.await
.context(PatchTrinoClusterSnafu {
.with_context(|_| PatchTrinoClusterSnafu {
cluster: &cluster.name,
namespace: &cluster.namespace,
})?;
@@ -266,14 +279,14 @@ impl ScalerTrait for StackableScaler {
let cluster = self
.clusters
.get(cluster)
.context(ClusterNotFoundSnafu { cluster })?;
.with_context(|| ClusterNotFoundSnafu { cluster })?;

let status = cluster
.api
.get_status(&cluster.name)
.instrument(debug_span!("Get Trino cluster status"))
.await
.context(GetTrinoClusterStatusSnafu {
.with_context(|_| GetTrinoClusterStatusSnafu {
cluster: &cluster.name,
namespace: &cluster.namespace,
})?;
@@ -286,47 +299,90 @@
let conditions = status
.data
.get("status")
.context(StatusFieldMissingInTrinoClusterSnafu {
.with_context(|| StatusFieldMissingInTrinoClusterSnafu {
cluster: &cluster.name,
namespace: &cluster.namespace,
})?
.get("conditions")
.context(StatusConditionsFieldMissingInTrinoClusterSnafu {
.with_context(|| StatusConditionsFieldMissingInTrinoClusterSnafu {
cluster: &cluster.name,
namespace: &cluster.namespace,
})?
.as_array()
.context(StatusConditionsFieldIsNotArraySnafu {
.with_context(|| StatusConditionsFieldIsNotArraySnafu {
cluster: &cluster.name,
namespace: &cluster.namespace,
})?;

let available = conditions
.iter()
.find(|c| c.get("type") == Some(&Value::String("Available".to_string())))
.context(NoAvailableEntryInStatusConditionsListSnafu {
.with_context(|| NoAvailableEntryInStatusConditionsListSnafu {
cluster: &cluster.name,
namespace: &cluster.namespace,
})?;

let available = available.get("status").context(
let status = available.get("status").with_context(|| {
NoStatusInAvailableEntryInStatusConditionsListSnafu {
cluster: &cluster.name,
namespace: &cluster.namespace,
},
)?;
}
})?;

let available = match available {
Value::String(available) if available == "True" => true,
Value::String(available) if available == "False" => false,
let is_available = match status {
Value::String(status) if status == "True" => true,
Value::String(status) if status == "False" => false,
_ => StatusNotParsableInAvailableEntryInStatusConditionsListSnafu {
cluster: &cluster.name,
namespace: &cluster.namespace,
}
.fail()?,
};

Ok(available)
// Return early if the cluster is not available
if !is_available {
return Ok(false);
}

// Careful investigation has shown that trino-lb reacts very quickly to TrinoClusters becoming
// available. When we immediately sent queries to Trino afterwards, we encountered errors such as:
//
// WARN trino_lb::http_server::v1::statement: Error while processing request
// error=SendQueryToTrino { source: ContactTrinoPostQuery { source: reqwest::Error { kind: Request,
// url: "https://trino-m-1-coordinator-default.default.svc.cluster.local:8443/v1/statement",
// source: hyper_util::client::legacy::Error(Connect, ConnectError("dns error", Custom { kind: Uncategorized,
// error: "failed to lookup address information: Name or service not known" })) } } }
//
// This is because the coordinator is ready, but it can take some additional time for the
// DNS record of its Service to propagate.
// To prevent this, we only consider a TrinoCluster healthy once a minimum number of seconds
// has passed since it was marked as healthy.

// It's valid for the lastTransitionTime to be unset; in that case we assume the cluster is
// old.
if let Some(last_transition_time) = available.get("lastTransitionTime") {
let last_transition_time: Time = serde_json::from_value(last_transition_time.clone())
.with_context(|_| ParseLastTransitionTimeSnafu {
last_transition_time: last_transition_time.clone(),
cluster: &cluster.name,
namespace: &cluster.namespace,
})?;

let seconds_since_last_transition = elapsed_seconds_since(last_transition_time.0);
if seconds_since_last_transition < MIN_READY_SECONDS_SINCE_LAST_TRANSITION {
tracing::debug!(
seconds_since_last_transition,
min_ready_seconds_since_last_transition =
MIN_READY_SECONDS_SINCE_LAST_TRANSITION,
"The trino cluster recently turned ready, not marking as ready yet"
);

return Ok(false);
}
}

// All checks succeeded; the TrinoCluster is ready
Ok(true)
}

#[instrument(name = "StackableScaler::is_activated", skip(self))]
@@ -358,3 +414,8 @@ impl ScalerTrait for StackableScaler {
})?)
}
}

fn elapsed_seconds_since(datetime: DateTime<Utc>) -> i64 {
let now = Utc::now();
(now - datetime).num_seconds()
}
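
To make the new readiness delay easier to follow in isolation, here is a minimal standalone sketch of the check this PR introduces. The threshold constant and the seconds-based comparison mirror the code above; the `ready_for_long_enough` helper, its `Option<DateTime<Utc>>` argument, and the `main` are illustrative assumptions that stand in for the condition parsing done in `stackable.rs`.

use chrono::{DateTime, Utc};

const MIN_READY_SECONDS_SINCE_LAST_TRANSITION: i64 = 5;

/// A cluster only counts as ready once its "Available" condition has been true for a
/// minimum number of seconds. A missing lastTransitionTime is treated as "long ago".
fn ready_for_long_enough(last_transition_time: Option<DateTime<Utc>>) -> bool {
    match last_transition_time {
        Some(t) => (Utc::now() - t).num_seconds() >= MIN_READY_SECONDS_SINCE_LAST_TRANSITION,
        None => true,
    }
}

fn main() {
    // A cluster that just became available is not handed queries yet.
    assert!(!ready_for_long_enough(Some(Utc::now())));
    // A cluster without a lastTransitionTime is assumed to have been ready for a while.
    assert!(ready_for_long_enough(None));
}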