Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "k8s-controller"
version = "0.7.0"
version = "0.8.0"
edition = "2024"
rust-version = "1.89.0"

Expand All @@ -17,6 +17,7 @@ kube = { version = "2.0.1", default-features = false, features = ["client"] }
kube-runtime = "2.0.1"
rand = "0.9.2"
serde = "1"
thiserror = "2.0.17"
tracing = "0.1"

[dev-dependencies]
Expand Down
65 changes: 49 additions & 16 deletions src/controller.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::collections::BTreeMap;
use std::error::Error;
use std::error::Error as _;
use std::sync::{Arc, Mutex};
use std::time::Duration;

Expand All @@ -14,6 +14,14 @@ use kube_runtime::watcher;
use rand::{Rng, rng};
use tracing::{Level, event};

#[derive(Debug, thiserror::Error)]
pub enum Error<E: std::error::Error + 'static> {
#[error("{0}")]
ControllerError(#[source] E),
#[error("{0}")]
FinalizerError(#[source] kube_runtime::finalizer::Error<E>),
}

/// The [`Controller`] watches a set of resources, calling methods on the
/// provided [`Context`] when events occur.
pub struct Controller<Ctx: Context>
Expand All @@ -36,7 +44,6 @@ impl<Ctx: Context> Controller<Ctx>
where
Ctx: Send + Sync + 'static,
Ctx::Error: Send + Sync + 'static,
Ctx::Resource: Send + Sync + 'static,
Ctx::Resource: Clone + std::fmt::Debug + serde::Serialize,
for<'de> Ctx::Resource: serde::Deserialize<'de>,
<Ctx::Resource as Resource>::DynamicType:
Expand Down Expand Up @@ -213,15 +220,18 @@ where
pub trait Context {
/// The type of Kubernetes [resource](Resource) that will be watched by
/// the [`Controller`] this context is passed to
type Resource: Resource;
type Resource: Resource + Send + Sync + 'static;
/// The error type which will be returned by the [`apply`](Self::apply)
/// and [`cleanup`](Self::apply) methods
/// and [`cleanup`](Self::cleanup) methods
type Error: std::error::Error;

/// The name to use for the finalizer. This must be unique across
/// controllers - if multiple controllers with the same finalizer name
/// run against the same resource, unexpected behavior can occur.
const FINALIZER_NAME: &'static str;
///
/// If this is None (the default), a finalizer will not be used, and
/// cleanup events will not be reported.
const FINALIZER_NAME: Option<&'static str> = None;

/// This method is called when a watched resource is created or updated.
/// The [`Client`] used by the controller is passed in to allow making
Expand All @@ -243,11 +253,19 @@ pub trait Context {
/// be performed, otherwise if `None` is returned,
/// [`success_action`](Self::success_action) will be called to find the
/// action to perform.
///
/// Note that this method will only be called if a finalizer is used.
async fn cleanup(
&self,
client: Client,
resource: &Self::Resource,
) -> Result<Option<Action>, Self::Error>;
) -> Result<Option<Action>, Self::Error> {
// use a better name for the parameter name in the docs
let _client = client;
let _resource = resource;

Ok(Some(Action::await_change()))
}

/// This method is called when a call to [`apply`](Self::apply) or
/// [`cleanup`](Self::cleanup) returns `Ok(None)`. It should return the
Expand All @@ -271,7 +289,7 @@ pub trait Context {
fn error_action(
self: Arc<Self>,
resource: Arc<Self::Resource>,
err: &kube_runtime::finalizer::Error<Self::Error>,
err: &Error<Self::Error>,
consecutive_errors: u32,
) -> Action {
// use a better name for the parameter name in the docs
Expand All @@ -290,7 +308,7 @@ pub trait Context {
client: Client,
api: Api<Self::Resource>,
resource: Arc<Self::Resource>,
) -> Result<Action, kube_runtime::finalizer::Error<Self::Error>>
) -> Result<Action, Error<Self::Error>>
where
Self: Send + Sync + 'static,
Self::Error: Send + Sync + 'static,
Expand All @@ -307,11 +325,8 @@ pub trait Context {
let dynamic_type = Default::default();
let kind = Self::Resource::kind(&dynamic_type).into_owned();
let mut ran = false;
let res = finalizer(
&api,
Self::FINALIZER_NAME,
Arc::clone(&resource),
|event| async {
let res = if let Some(finalizer_name) = Self::FINALIZER_NAME {
finalizer(&api, finalizer_name, Arc::clone(&resource), |event| async {
ran = true;
event!(
Level::INFO,
Expand Down Expand Up @@ -339,9 +354,27 @@ pub trait Context {
.unwrap_or_else(Action::await_change),
};
Ok(action)
},
)
.await;
})
.await
.map_err(Error::FinalizerError)
} else {
ran = true;
event!(
Level::INFO,
resource_name = %resource.name_unchecked().as_str(),
"Reconciling {} (apply).",
kind,
);
let action = self
.apply(client, &resource)
.await
.map_err(Error::ControllerError)?;
Ok(if let Some(action) = action {
action
} else {
self.success_action(&resource)
})
};
if !ran {
event!(
Level::INFO,
Expand Down
4 changes: 2 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@
//! type Resource = Pod;
//! type Error = kube::Error;
//!
//! const FINALIZER_NAME: &'static str = "example.com/pod-counter";
//! const FINALIZER_NAME: Option<&'static str> = Some("example.com/pod-counter");
//!
//! async fn apply(
//! &self,
Expand Down Expand Up @@ -137,7 +137,7 @@
//! # impl k8s_controller::Context for PodCounter {
//! # type Resource = Pod;
//! # type Error = kube::Error;
//! # const FINALIZER_NAME: &'static str = "example.com/pod-counter";
//! # const FINALIZER_NAME: Option<&'static str> = Some("example.com/pod-counter");
//! # async fn apply(
//! # &self,
//! # client: Client,
Expand Down
Loading