diff --git a/Cargo.toml b/Cargo.toml index 07a8618..5721fdd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "k8s-controller" -version = "0.7.0" +version = "0.8.0" edition = "2024" rust-version = "1.89.0" @@ -17,6 +17,7 @@ kube = { version = "2.0.1", default-features = false, features = ["client"] } kube-runtime = "2.0.1" rand = "0.9.2" serde = "1" +thiserror = "2.0.17" tracing = "0.1" [dev-dependencies] diff --git a/src/controller.rs b/src/controller.rs index 64d8431..890b43b 100644 --- a/src/controller.rs +++ b/src/controller.rs @@ -1,5 +1,5 @@ use std::collections::BTreeMap; -use std::error::Error; +use std::error::Error as _; use std::sync::{Arc, Mutex}; use std::time::Duration; @@ -14,6 +14,14 @@ use kube_runtime::watcher; use rand::{Rng, rng}; use tracing::{Level, event}; +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error("{0}")] + ControllerError(#[source] E), + #[error("{0}")] + FinalizerError(#[source] kube_runtime::finalizer::Error), +} + /// The [`Controller`] watches a set of resources, calling methods on the /// provided [`Context`] when events occur. pub struct Controller @@ -36,7 +44,6 @@ impl Controller where Ctx: Send + Sync + 'static, Ctx::Error: Send + Sync + 'static, - Ctx::Resource: Send + Sync + 'static, Ctx::Resource: Clone + std::fmt::Debug + serde::Serialize, for<'de> Ctx::Resource: serde::Deserialize<'de>, ::DynamicType: @@ -213,15 +220,18 @@ where pub trait Context { /// The type of Kubernetes [resource](Resource) that will be watched by /// the [`Controller`] this context is passed to - type Resource: Resource; + type Resource: Resource + Send + Sync + 'static; /// The error type which will be returned by the [`apply`](Self::apply) - /// and [`cleanup`](Self::apply) methods + /// and [`cleanup`](Self::cleanup) methods type Error: std::error::Error; /// The name to use for the finalizer. This must be unique across /// controllers - if multiple controllers with the same finalizer name /// run against the same resource, unexpected behavior can occur. - const FINALIZER_NAME: &'static str; + /// + /// If this is None (the default), a finalizer will not be used, and + /// cleanup events will not be reported. + const FINALIZER_NAME: Option<&'static str> = None; /// This method is called when a watched resource is created or updated. /// The [`Client`] used by the controller is passed in to allow making @@ -243,11 +253,19 @@ pub trait Context { /// be performed, otherwise if `None` is returned, /// [`success_action`](Self::success_action) will be called to find the /// action to perform. + /// + /// Note that this method will only be called if a finalizer is used. async fn cleanup( &self, client: Client, resource: &Self::Resource, - ) -> Result, Self::Error>; + ) -> Result, Self::Error> { + // use a better name for the parameter name in the docs + let _client = client; + let _resource = resource; + + Ok(Some(Action::await_change())) + } /// This method is called when a call to [`apply`](Self::apply) or /// [`cleanup`](Self::cleanup) returns `Ok(None)`. It should return the @@ -271,7 +289,7 @@ pub trait Context { fn error_action( self: Arc, resource: Arc, - err: &kube_runtime::finalizer::Error, + err: &Error, consecutive_errors: u32, ) -> Action { // use a better name for the parameter name in the docs @@ -290,7 +308,7 @@ pub trait Context { client: Client, api: Api, resource: Arc, - ) -> Result> + ) -> Result> where Self: Send + Sync + 'static, Self::Error: Send + Sync + 'static, @@ -307,11 +325,8 @@ pub trait Context { let dynamic_type = Default::default(); let kind = Self::Resource::kind(&dynamic_type).into_owned(); let mut ran = false; - let res = finalizer( - &api, - Self::FINALIZER_NAME, - Arc::clone(&resource), - |event| async { + let res = if let Some(finalizer_name) = Self::FINALIZER_NAME { + finalizer(&api, finalizer_name, Arc::clone(&resource), |event| async { ran = true; event!( Level::INFO, @@ -339,9 +354,27 @@ pub trait Context { .unwrap_or_else(Action::await_change), }; Ok(action) - }, - ) - .await; + }) + .await + .map_err(Error::FinalizerError) + } else { + ran = true; + event!( + Level::INFO, + resource_name = %resource.name_unchecked().as_str(), + "Reconciling {} (apply).", + kind, + ); + let action = self + .apply(client, &resource) + .await + .map_err(Error::ControllerError)?; + Ok(if let Some(action) = action { + action + } else { + self.success_action(&resource) + }) + }; if !ran { event!( Level::INFO, diff --git a/src/lib.rs b/src/lib.rs index 9d5952d..2e1a5df 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -89,7 +89,7 @@ //! type Resource = Pod; //! type Error = kube::Error; //! -//! const FINALIZER_NAME: &'static str = "example.com/pod-counter"; +//! const FINALIZER_NAME: Option<&'static str> = Some("example.com/pod-counter"); //! //! async fn apply( //! &self, @@ -137,7 +137,7 @@ //! # impl k8s_controller::Context for PodCounter { //! # type Resource = Pod; //! # type Error = kube::Error; -//! # const FINALIZER_NAME: &'static str = "example.com/pod-counter"; +//! # const FINALIZER_NAME: Option<&'static str> = Some("example.com/pod-counter"); //! # async fn apply( //! # &self, //! # client: Client,