diff --git a/src/sources/kubernetes_logs/mod.rs b/src/sources/kubernetes_logs/mod.rs
index fa001972a59d7..7a02b43b05496 100644
--- a/src/sources/kubernetes_logs/mod.rs
+++ b/src/sources/kubernetes_logs/mod.rs
@@ -8,8 +8,11 @@ use std::{cmp::min, path::PathBuf, time::Duration};

 use bytes::Bytes;
 use chrono::Utc;
-use futures::{future::FutureExt, stream::StreamExt};
-use futures_util::Stream;
+use futures_util::{
+    Stream,
+    future::{FutureExt, ready},
+    stream::StreamExt,
+};
 use http_1::{HeaderName, HeaderValue};
 use k8s_openapi::api::core::v1::{Namespace, Node, Pod};
 use k8s_paths_provider::K8sPathsProvider;
@@ -21,6 +24,9 @@ use kube::{
 };
 use lifecycle::Lifecycle;
 use serde_with::serde_as;
+use tokio::pin;
+use tokio_stream::wrappers::BroadcastStream;
+
 use vector_lib::{
     EstimatedJsonEncodedSizeOf, TimeZone,
     codecs::{BytesDeserializer, BytesDeserializerConfig},
@@ -53,8 +59,7 @@ use crate::{
     },
     kubernetes::{custom_reflector, meta_cache::MetaCache},
     shutdown::ShutdownSignal,
-    sources,
-    sources::kubernetes_logs::partial_events_merger::merge_partial_events,
+    sources::{self, kubernetes_logs::partial_events_merger::merge_partial_events},
     transforms::{FunctionTransform, OutputBuffer},
 };

@@ -65,7 +70,9 @@ mod node_metadata_annotator;
 mod parser;
 mod partial_events_merger;
 mod path_helpers;
+mod pod_info;
 mod pod_metadata_annotator;
+mod reconciler;
 mod transform_utils;
 mod util;

@@ -278,6 +285,20 @@ pub struct Config {
     #[configurable(metadata(docs::type_unit = "seconds"))]
     #[serde(default = "default_rotate_wait", rename = "rotate_wait_secs")]
     rotate_wait: Duration,
+
+    /// The strategy to use for log collection.
+    log_collection_strategy: LogCollectionStrategy,
+}
+
+/// Configuration for the log collection strategy.
+#[configurable_component]
+#[derive(Clone, Copy, Debug, PartialEq, Eq)]
+#[serde(rename_all = "snake_case")]
+enum LogCollectionStrategy {
+    /// Collect logs by reading log files from the filesystem.
+    File,
+    /// Collect logs via the Kubernetes Logs API.
+    Api,
 }

 const fn default_read_from() -> ReadFromConfig {
@@ -326,6 +347,7 @@ impl Default for Config {
             log_namespace: None,
             internal_metrics: Default::default(),
             rotate_wait: default_rotate_wait(),
+            log_collection_strategy: default_log_collection_strategy(),
         }
     }
 }
@@ -584,6 +606,7 @@ struct Source {
     delay_deletion: Duration,
     include_file_metric_tag: bool,
     rotate_wait: Duration,
+    log_collection_strategy: LogCollectionStrategy,
 }

 impl Source {
@@ -673,6 +696,7 @@ impl Source {
             delay_deletion,
             include_file_metric_tag: config.internal_metrics.include_file_tag,
             rotate_wait: config.rotate_wait,
+            log_collection_strategy: config.log_collection_strategy.clone(),
         })
     }

@@ -710,6 +734,7 @@ impl Source {
             delay_deletion,
             include_file_metric_tag,
             rotate_wait,
+            log_collection_strategy,
         } = self;

         let mut reflectors = Vec::new();
@@ -734,6 +759,40 @@ impl Source {
         )
         .backoff(watcher::DefaultBackoff::default());

+        // Create shared broadcast channel for pod events
+        let (pod_event_tx, _) = tokio::sync::broadcast::channel(1000);
+        let reflector_rx = pod_event_tx.subscribe();
+
+        // Spawn task to forward pod events to broadcast channel
+        let pod_forwarder_tx = pod_event_tx.clone();
+        let pod_forwarder = tokio::spawn(async move {
+            pin!(pod_watcher);
+            while let Some(event_result) = pod_watcher.next().await {
+                match event_result {
+                    Ok(event) => {
+                        // Only broadcast successful events
+                        if pod_forwarder_tx.send(event).is_err() {
+                            // All receivers have been dropped
+                            break;
+                        }
+                    }
+                    Err(e) => {
+                        warn!("Pod watcher error: {}", e);
+                        // Continue on errors to maintain resilience
+                    }
+                }
+            }
+        });
+        reflectors.push(pod_forwarder);
+
+        // Convert broadcast receiver to stream for reflector
+        let reflector_stream = BroadcastStream::new(reflector_rx).filter_map(|result| {
+            ready(match result {
+                Ok(event) => Some(Ok(event)),
+                Err(_) => None,
+            })
+        });
+
         let pod_store_w = reflector::store::Writer::default();
         let pod_state = pod_store_w.as_reader();
         let pod_cacher = MetaCache::new();
@@ -741,7 +800,7 @@ impl Source {
         reflectors.push(tokio::spawn(custom_reflector(
             pod_store_w,
             pod_cacher,
-            pod_watcher,
+            reflector_stream,
             delay_deletion,
         )));

@@ -772,7 +831,7 @@ impl Source {

         // -----------------------------------------------------------------

-        let nodes = Api::<Node>::all(client);
+        let nodes = Api::<Node>::all(client.clone());
         let node_watcher = watcher(
             nodes,
             watcher::Config {
@@ -888,8 +947,11 @@ impl Source {
             log_namespace,
         );

+        // TODO: annotate the logs with the pod's metadata
+        if log_collection_strategy == LogCollectionStrategy::Api {
+            return event;
+        }
         let file_info = annotator.annotate(&mut event, &line.filename);
-
         emit!(KubernetesLogsEventsReceived {
             file: &line.filename,
             byte_size: event.estimated_json_encoded_size_of(),
@@ -940,17 +1002,43 @@ impl Source {
         let event_processing_loop = out.send_event_stream(&mut stream);

         let mut lifecycle = Lifecycle::new();
-        {
+        // Only add the file server when log_collection_strategy is File
+        if log_collection_strategy == LogCollectionStrategy::File {
             let (slot, shutdown) = lifecycle.add();
-            let fut = util::run_file_server(file_server, file_source_tx, shutdown, checkpointer)
-                .map(|result| match result {
-                    Ok(FileServerShutdown) => info!(message = "File server completed gracefully."),
+            let fut =
+                util::run_file_server(file_server, file_source_tx.clone(), shutdown, checkpointer)
+                    .map(|result| match result {
+                        Ok(FileServerShutdown) => {
+                            info!(message = "File server completed gracefully.")
+                        }
+                        Err(error) => emit!(KubernetesLifecycleError {
+                            message: "File server exited with an error.",
+                            error,
+                            count: events_count,
+                        }),
+                    });
+            slot.bind(Box::pin(fut));
+        }
+        if log_collection_strategy == LogCollectionStrategy::Api {
+            let reconciler_rx = pod_event_tx.subscribe();
+            let reconciler =
+                reconciler::Reconciler::new(client.clone(), file_source_tx.clone(), reconciler_rx);
+            let (slot, shutdown) = lifecycle.add();
+            let fut = util::complete_with_deadline_on_signal(
+                reconciler.run(),
+                shutdown,
+                Duration::from_secs(30), // more than enough time to propagate
+            )
+            .map(|result| {
+                match result {
+                    Ok(_) => info!(message = "Reconciler completed gracefully."),
                     Err(error) => emit!(KubernetesLifecycleError {
-                        message: "File server exited with an error.",
                         error,
+                        message: "Reconciler timed out during the shutdown.",
                         count: events_count,
                     }),
-                });
+                };
+            });
             slot.bind(Box::pin(fut));
         }
         {
@@ -1093,6 +1181,9 @@ const fn default_delay_deletion_ms() -> Duration {
 const fn default_rotate_wait() -> Duration {
     Duration::from_secs(u64::MAX / 2)
 }
+const fn default_log_collection_strategy() -> LogCollectionStrategy {
+    LogCollectionStrategy::File
+}

 // This function constructs the patterns we include for file watching, created
 // from the defaults or user provided configuration.
diff --git a/src/sources/kubernetes_logs/parser/cri.rs b/src/sources/kubernetes_logs/parser/cri.rs
index 2d9741dd46c30..795907cf88dd2 100644
--- a/src/sources/kubernetes_logs/parser/cri.rs
+++ b/src/sources/kubernetes_logs/parser/cri.rs
@@ -8,9 +8,7 @@ use vector_lib::{

 use crate::{
     event::{self, Event, Value},
-    internal_events::{
-        DROP_EVENT, ParserConversionError, ParserMatchError, ParserMissingFieldError,
-    },
+    internal_events::{DROP_EVENT, ParserConversionError, ParserMissingFieldError},
     sources::kubernetes_logs::{Config, transform_utils::get_message_path},
     transforms::{FunctionTransform, OutputBuffer},
 };
@@ -58,8 +56,9 @@ impl FunctionTransform for Cri {
             }
             Some(s) => match parse_log_line(&s) {
                 None => {
-                    emit!(ParserMatchError { value: &s[..] });
-                    return;
+                    // TODO: fix this once `FunctionTransform` supports Api logs
+                    // emit!(ParserMatchError { value: &s[..] });
+                    drop(log.insert(&message_path, Value::Bytes(s)));
                 }
                 Some(parsed_log) => {
                     // For all fields except `timestamp`, simply treat them as `Value::Bytes`. For
diff --git a/src/sources/kubernetes_logs/pod_info.rs b/src/sources/kubernetes_logs/pod_info.rs
new file mode 100644
index 0000000000000..751bef7d19e16
--- /dev/null
+++ b/src/sources/kubernetes_logs/pod_info.rs
@@ -0,0 +1,45 @@
+use k8s_openapi::api::core::v1::Pod;
+use serde::{Deserialize, Serialize};
+
+/// Pod information struct that contains essential details for log fetching
+#[derive(Clone, Debug, Serialize, Deserialize, Eq, Hash, PartialEq)]
+pub struct PodInfo {
+    /// Pod name
+    pub name: String,
+    /// Pod namespace
+    pub namespace: String,
+    /// Pod phase (Running, Pending, etc.)
+    pub phase: Option<String>,
+    /// Container names within the pod
+    pub containers: Vec<String>,
+}
+
+impl From<&Pod> for PodInfo {
+    fn from(pod: &Pod) -> Self {
+        let metadata = &pod.metadata;
+
+        let name = metadata.name.as_ref().cloned().unwrap_or_default();
+
+        let namespace = metadata.namespace.as_ref().cloned().unwrap_or_default();
+
+        let phase = pod.status.as_ref().and_then(|status| status.phase.clone());
+
+        let containers = pod
+            .spec
+            .as_ref()
+            .map(|spec| {
+                spec.containers
+                    .iter()
+                    .map(|container| container.name.clone())
+                    .collect()
+            })
+            .unwrap_or_default();
+
+        PodInfo {
+            name,
+            namespace,
+            phase,
+            containers,
+        }
+    }
+}
diff --git a/src/sources/kubernetes_logs/reconciler.rs b/src/sources/kubernetes_logs/reconciler.rs
new file mode 100644
index 0000000000000..902cf43561c49
--- /dev/null
+++ b/src/sources/kubernetes_logs/reconciler.rs
@@ -0,0 +1,302 @@
+use super::pod_info::PodInfo;
+use bytes::Bytes;
+use chrono::{DateTime, FixedOffset, Utc};
+use futures::SinkExt;
+use futures::channel::mpsc;
+use futures::{AsyncBufReadExt, StreamExt};
+use futures_util::{Stream, future::ready};
+use k8s_openapi::api::core::v1::Pod;
+use kube::runtime::watcher;
+use kube::{Api, Client, api::LogParams};
+use std::collections::HashMap;
+use std::fmt;
+use std::pin::Pin;
+use tokio::sync::broadcast;
+use tokio_stream::wrappers::BroadcastStream;
+use tracing::{info, warn};
+use vector_lib::{file_source::file_server::Line, file_source_common::FileFingerprint};
+
+/// Container key for identifying unique container instances
+/// Format: "{namespace}/{pod_name}/{container_name}"
+#[derive(Clone, Debug, PartialEq, Eq, Hash)]
+pub struct ContainerKey(String);
+
+impl fmt::Display for ContainerKey {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "{}", self.0)
+    }
+}
+
+impl From<&ContainerInfo> for ContainerKey {
+    fn from(container_info: &ContainerInfo) -> Self {
+        ContainerKey(format!(
+            "{}/{}/{}",
+            container_info.namespace, container_info.pod_name, container_info.container_name
+        ))
+    }
+}
+
+impl From<(&PodInfo, &str)> for ContainerKey {
+    fn from((pod_info, container_name): (&PodInfo, &str)) -> Self {
+        ContainerKey(format!(
+            "{}/{}/{}",
+            pod_info.namespace, pod_info.name, container_name
+        ))
+    }
+}
+
+/// Container information for log tailing
+#[derive(Clone, Debug)]
+pub struct ContainerInfo {
+    /// Pod name containing this container
+    pub pod_name: String,
+    /// Pod namespace
+    pub namespace: String,
+    /// Container name
+    pub container_name: String,
+}
+
+/// Container log information with timestamp tracking
+/// Similar to docker_logs ContainerLogInfo for position tracking
+#[derive(Debug)]
+struct ContainerLogInfo<'a> {
+    /// Container information reference
+    container_info: &'a ContainerInfo,
+    /// Timestamp of when this tracking started
+    created: DateTime<Utc>,
+    /// Timestamp of last log message processed
+    last_log: Option<DateTime<FixedOffset>>,
+}
+
+impl<'a> ContainerLogInfo<'a> {
+    const fn new(container_info: &'a ContainerInfo, created: DateTime<Utc>) -> Self {
+        Self {
+            container_info,
+            created,
+            last_log: None,
+        }
+    }
+
+    fn log_since(&self) -> DateTime<Utc> {
+        self.last_log
+            .map(|dt| dt.with_timezone(&Utc))
+            .unwrap_or(self.created)
+    }
+}
+
+pub struct Reconciler {
+    esb: EventStreamBuilder,
+    states: HashMap<ContainerKey, TailerState>, // Keyed by ContainerKey
+    pod_watcher: Pin<Box<dyn Stream<Item = watcher::Event<Pod>> + Send>>,
+}
+
+impl Reconciler {
+    pub fn new(
+        client: Client,
+        line_sender: mpsc::Sender<Vec<Line>>,
+        pod_receiver: broadcast::Receiver<watcher::Event<Pod>>,
+    ) -> Self {
+        let esb = EventStreamBuilder {
+            client: client.clone(),
+            line_sender,
+        };
+
+        let pod_stream = BroadcastStream::new(pod_receiver).filter_map(|event| ready(event.ok()));
+
+        Self {
+            esb,
+            states: HashMap::new(),
+            pod_watcher: Box::pin(pod_stream),
+        }
+    }
+
+    pub async fn run(mut self) {
+        info!("Starting reconciler with pod watcher integration");
+
+        // Listen to pod watcher events for real-time reconciliation
+        while let Some(event) = self.pod_watcher.next().await {
+            match event {
+                watcher::Event::Delete(pod) => {
+                    let pod_info = PodInfo::from(&pod);
+                    info!("Pod '{}' deleted, cleaning up log tailers", pod_info.name);
+                    self.cleanup_pod_tailers(&pod_info).await;
+                }
+                watcher::Event::InitApply(pod) | watcher::Event::Apply(pod) => {
+                    let pod_info = PodInfo::from(&pod);
+                    if let Some(phase) = &pod_info.phase
+                        && phase == "Running"
+                    {
+                        info!(
+                            "Pod '{}' is running, starting log reconciliation",
+                            pod_info.name
+                        );
+                        if let Err(e) = self.reconcile_pod_containers(&pod_info).await {
+                            warn!("Failed to reconcile pod '{}': {}", pod_info.name, e);
+                        }
+                    }
+                }
+                _ => {}
+            }
+        }
+
+        info!("Reconciler pod watcher stream ended");
+    }
+
+    /// Reconcile containers for a specific pod
+    async fn reconcile_pod_containers(&mut self, pod_info: &PodInfo) -> crate::Result<()> {
+        for container_name in &pod_info.containers {
+            let container_info = ContainerInfo {
+                pod_name: pod_info.name.clone(),
+                namespace: pod_info.namespace.clone(),
+                container_name: container_name.clone(),
+            };
+
+            let key = ContainerKey::from(&container_info);
+
+            // Only start tailer if not already running
+            if !self.states.contains_key(&key) {
+                info!(
+                    "Starting tailer for container '{}' in pod '{}' (namespace '{}')",
+                    container_info.container_name,
+                    container_info.pod_name,
+                    container_info.namespace
+                );
+
+                self.states.insert(key, self.esb.start(container_info));
+            }
+        }
+        Ok(())
+    }
+
+    /// Clean up tailers for a deleted pod
+    async fn cleanup_pod_tailers(&mut self, pod_info: &PodInfo) {
+        for container_name in &pod_info.containers {
+            let key = ContainerKey::from((pod_info, container_name.as_str()));
+
+            if self.states.remove(&key).is_some() {
+                info!(
+                    "Cleaned up tailer for container '{}' in deleted pod '{}'",
+                    container_name, pod_info.name
+                );
+            }
+        }
+    }
+}
+
+#[derive(Clone)]
+struct EventStreamBuilder {
+    client: Client,
+    line_sender: mpsc::Sender<Vec<Line>>,
+}
+
+#[derive(Clone)]
+enum TailerState {
+    Running,
+}
+
+impl EventStreamBuilder {
+    pub fn start(&self, container_info: ContainerInfo) -> TailerState {
+        let this = self.clone();
+        tokio::spawn(async move {
+            let log_info = ContainerLogInfo::new(&container_info, Utc::now());
+            this.run_event_stream(log_info).await;
+        });
+        TailerState::Running
+    }
+
+    pub async fn run_event_stream(mut self, log_info: ContainerLogInfo<'_>) {
+        let pods: Api<Pod> =
+            Api::namespaced(self.client.clone(), &log_info.container_info.namespace);
+
+        info!(
+            "Starting streaming log tail for container '{}' in pod '{}' (namespace '{}') from timestamp {}",
+            log_info.container_info.container_name,
+            log_info.container_info.pod_name,
+            log_info.container_info.namespace,
+            log_info.log_since()
+        );
+
+        let log_params = LogParams {
+            container: Some(log_info.container_info.container_name.clone()),
+            follow: true,
+            since_time: Some(log_info.log_since()),
+            timestamps: true,
+            ..Default::default()
+        };
+
+        match pods
+            .log_stream(&log_info.container_info.pod_name, &log_params)
+            .await
+        {
+            Ok(log_stream) => {
+                info!(
+                    "Started streaming logs from container '{}' in pod '{}'",
+                    log_info.container_info.container_name, log_info.container_info.pod_name
+                );
+
+                let mut buffer = Vec::new();
+                let mut log_stream = log_stream;
+
+                // Process the stream by reading line by line
+                loop {
+                    match log_stream.read_until(b'\n', &mut buffer).await {
+                        Ok(0) => break, // EOF
+                        Ok(_) => {
+                            // Remove trailing newline if present
+                            if buffer.ends_with(b"\n") {
+                                buffer.pop();
+                            }
+                            // Remove trailing carriage return if present (for CRLF)
+                            if buffer.ends_with(b"\r") {
+                                buffer.pop();
+                            }
+
+                            let line_bytes = Bytes::from(std::mem::take(&mut buffer));
+
+                            // TODO: track last log timestamp
+
+                            let text_len = line_bytes.len() as u64;
+                            let line = Line {
+                                text: line_bytes,
+                                filename: String::new(), // Filename is not applicable for k8s logs
+                                file_id: FileFingerprint::FirstLinesChecksum(0),
+                                start_offset: 0,
+                                end_offset: text_len,
+                            };
+
+                            // TODO: Send batches of lines instead of one by one
+                            if self.line_sender.send(vec![line]).await.is_err() {
+                                warn!(
+                                    "Line channel closed for container '{}' in pod '{}', stopping stream",
+                                    log_info.container_info.container_name,
+                                    log_info.container_info.pod_name
+                                );
+                                break;
+                            }
+                        }
+                        Err(e) => {
+                            warn!(
+                                "Error reading from log stream for container '{}' in pod '{}': {}",
+                                log_info.container_info.container_name,
+                                log_info.container_info.pod_name,
+                                e
+                            );
+                            break;
+                        }
+                    }
+                }
+            }
+            Err(e) => {
+                warn!(
+                    "Failed to start log stream for container '{}' in pod '{}': {}",
+                    log_info.container_info.container_name, log_info.container_info.pod_name, e
+                );
+            }
+        }
+
+        info!(
+            "Completed streaming log tail for container '{}' in pod '{}'",
+            log_info.container_info.container_name, log_info.container_info.pod_name
+        );
+    }
+}
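Note (not part of the patch): a minimal sketch of how the two ContainerKey conversions above line up, written as a hypothetical #[cfg(test)] module that would sit at the bottom of reconciler.rs so the private types are reachable through use super::*. The pod and container names are made up for illustration.

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn container_key_matches_for_pod_info_and_container_info() {
        // Hypothetical pod, mirroring what PodInfo::from(&Pod) would produce.
        let pod = PodInfo {
            name: "nginx-0".to_string(),
            namespace: "default".to_string(),
            phase: Some("Running".to_string()),
            containers: vec!["nginx".to_string()],
        };

        // Key built from (&PodInfo, &str), as cleanup_pod_tailers does...
        let from_pod = ContainerKey::from((&pod, "nginx"));

        // ...and the key built from &ContainerInfo, as reconcile_pod_containers does.
        let from_container = ContainerKey::from(&ContainerInfo {
            pod_name: pod.name.clone(),
            namespace: pod.namespace.clone(),
            container_name: "nginx".to_string(),
        });

        // Both follow the "{namespace}/{pod_name}/{container_name}" format,
        // so start and cleanup address the same entry in Reconciler::states.
        assert_eq!(from_pod, from_container);
        assert_eq!(from_pod.to_string(), "default/nginx-0/nginx");
    }
}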