diff --git a/doc/user/content/sql/system-catalog/mz_internal.md b/doc/user/content/sql/system-catalog/mz_internal.md index fbd50bba42cb9..01c2914525c12 100644 --- a/doc/user/content/sql/system-catalog/mz_internal.md +++ b/doc/user/content/sql/system-catalog/mz_internal.md @@ -162,15 +162,13 @@ for all processes of all extant cluster replicas. At this time, we do not make any guarantees about the exactness or freshness of these numbers. -| Field | Type | Meaning -| ------------------- | ------------ | -------- -| `replica_id` | [`text`] | The ID of a cluster replica. -| `process_id` | [`uint8`] | The ID of a process within the replica. -| `cpu_nano_cores` | [`uint8`] | Approximate CPU usage, in billionths of a vCPU core. -| `memory_bytes` | [`uint8`] | Approximate RAM usage, in bytes. -| `disk_bytes` | [`uint8`] | Approximate disk usage, in bytes. -| `heap_bytes` | [`uint8`] | Approximate heap (RAM + swap) usage, in bytes. -| `heap_limit` | [`uint8`] | Available heap (RAM + swap) space, in bytes. +| Field | Type | Meaning | +| ------------------- | ------------ | -------- | +| `replica_id` | [`text`] | The ID of a cluster replica. | +| `process_id` | [`uint8`] | The ID of a process within the replica. | +| `cpu_nano_cores` | [`uint8`] | Approximate CPU usage, in billionths of a vCPU core. | +| `memory_bytes` | [`uint8`] | Approximate RAM usage, in bytes. | +| `disk_bytes` | [`uint8`] | Approximate disk usage in bytes. | ## `mz_cluster_replica_metrics_history` @@ -185,12 +183,10 @@ At this time, we do not make any guarantees about the exactness or freshness of | ---------------- | --------- | -------- | `replica_id` | [`text`] | The ID of a cluster replica. | `process_id` | [`uint8`] | The ID of a process within the replica. -| `cpu_nano_cores` | [`uint8`] | Approximate CPU usage, in billionths of a vCPU core. -| `memory_bytes` | [`uint8`] | Approximate memory usage, in bytes. -| `disk_bytes` | [`uint8`] | Approximate disk usage, in bytes. +| `cpu_nano_cores` | [`uint8`] | Approximate CPU usage in billionths of a vCPU core. +| `memory_bytes` | [`uint8`] | Approximate memory usage in bytes. +| `disk_bytes` | [`uint8`] | Approximate disk usage in bytes. | `occurred_at` | [`timestamp with time zone`] | Wall-clock timestamp at which the event occurred. -| `heap_bytes` | [`uint8`] | Approximate heap (RAM + swap) usage, in bytes. -| `heap_limit` | [`uint8`] | Available heap (RAM + swap) space, in bytes. ## `mz_cluster_replica_statuses` @@ -229,14 +225,13 @@ for all processes of all extant cluster replicas, as a percentage of the total r At this time, we do not make any guarantees about the exactness or freshness of these numbers. -| Field | Type | Meaning -|------------------|----------------------|--------- -| `replica_id` | [`text`] | The ID of a cluster replica. -| `process_id` | [`uint8`] | The ID of a process within the replica. -| `cpu_percent` | [`double precision`] | Approximate CPU usage, in percent of the total allocation. -| `memory_percent` | [`double precision`] | Approximate RAM usage, in percent of the total allocation. -| `disk_percent` | [`double precision`] | Approximate disk usage, in percent of the total allocation. -| `heap_percent` | [`double precision`] | Approximate heap (RAM + swap) usage, in percent of the total allocation. +| Field | Type | Meaning | +|------------------|----------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| `replica_id` | [`text`] | The ID of a cluster replica. | +| `process_id` | [`uint8`] | The ID of a process within the replica. | +| `cpu_percent` | [`double precision`] | Approximate CPU usage in percent of the total allocation. | +| `memory_percent` | [`double precision`] | Approximate RAM usage in percent of the total allocation. | +| `disk_percent` | [`double precision`] | Approximate disk usage in percent of the total allocation. | ## `mz_cluster_replica_utilization_history` @@ -251,10 +246,9 @@ At this time, we do not make any guarantees about the exactness or freshness of |------------------|----------------------|-------- | `replica_id` | [`text`] | The ID of a cluster replica. | `process_id` | [`uint8`] | The ID of a process within the replica. -| `cpu_percent` | [`double precision`] | Approximate CPU usage, in percent of the total allocation. -| `memory_percent` | [`double precision`] | Approximate RAM usage, in percent of the total allocation. -| `disk_percent` | [`double precision`] | Approximate disk usage, in percent of the total allocation. -| `heap_percent` | [`double precision`] | Approximate heap (RAM + swap) usage, in percent of the total allocation. +| `cpu_percent` | [`double precision`] | Approximate CPU usage in percent of the total allocation. +| `memory_percent` | [`double precision`] | Approximate RAM usage in percent of the total allocation. +| `disk_percent` | [`double precision`] | Approximate disk usage in percent of the total allocation. | `occurred_at` | [`timestamp with time zone`] | Wall-clock timestamp at which the event occurred. ## `mz_cluster_replica_history` diff --git a/src/catalog/src/builtin.rs b/src/catalog/src/builtin.rs index 5db103a0aec72..9ce8f524845c3 100644 --- a/src/catalog/src/builtin.rs +++ b/src/catalog/src/builtin.rs @@ -4893,19 +4893,14 @@ pub static MZ_CLUSTER_REPLICA_METRICS_HISTORY: LazyLock = ("process_id", "The ID of a process within the replica."), ( "cpu_nano_cores", - "Approximate CPU usage, in billionths of a vCPU core.", + "Approximate CPU usage in billionths of a vCPU core.", ), - ("memory_bytes", "Approximate memory usage, in bytes."), - ("disk_bytes", "Approximate disk usage, in bytes."), + ("memory_bytes", "Approximate memory usage in bytes."), + ("disk_bytes", "Approximate disk usage in bytes."), ( "occurred_at", "Wall-clock timestamp at which the event occurred.", ), - ( - "heap_bytes", - "Approximate heap (RAM + swap) usage, in bytes.", - ), - ("heap_limit", "Available heap (RAM + swap) space, in bytes."), ]), is_retained_metrics_object: false, access: vec![PUBLIC_SELECT], @@ -4939,8 +4934,6 @@ pub static MZ_CLUSTER_REPLICA_METRICS: LazyLock = LazyLock::new(|| .with_column("cpu_nano_cores", SqlScalarType::UInt64.nullable(true)) .with_column("memory_bytes", SqlScalarType::UInt64.nullable(true)) .with_column("disk_bytes", SqlScalarType::UInt64.nullable(true)) - .with_column("heap_bytes", SqlScalarType::UInt64.nullable(true)) - .with_column("heap_limit", SqlScalarType::UInt64.nullable(true)) .with_key(vec![0, 1]) .finish(), column_comments: BTreeMap::from_iter([ @@ -4951,12 +4944,7 @@ pub static MZ_CLUSTER_REPLICA_METRICS: LazyLock = LazyLock::new(|| "Approximate CPU usage, in billionths of a vCPU core.", ), ("memory_bytes", "Approximate RAM usage, in bytes."), - ("disk_bytes", "Approximate disk usage, in bytes."), - ( - "heap_bytes", - "Approximate heap (RAM + swap) usage, in bytes.", - ), - ("heap_limit", "Available heap (RAM + swap) space, in bytes."), + ("disk_bytes", "Approximate disk usage in bytes."), ]), sql: " SELECT @@ -4965,9 +4953,7 @@ SELECT process_id, cpu_nano_cores, memory_bytes, - disk_bytes, - heap_bytes, - heap_limit + disk_bytes FROM mz_internal.mz_cluster_replica_metrics_history JOIN mz_cluster_replicas r ON r.id = replica_id ORDER BY replica_id, process_id, occurred_at DESC", @@ -8866,26 +8852,21 @@ pub static MZ_CLUSTER_REPLICA_UTILIZATION: LazyLock = LazyLock::new .with_column("cpu_percent", SqlScalarType::Float64.nullable(true)) .with_column("memory_percent", SqlScalarType::Float64.nullable(true)) .with_column("disk_percent", SqlScalarType::Float64.nullable(true)) - .with_column("heap_percent", SqlScalarType::Float64.nullable(true)) .finish(), column_comments: BTreeMap::from_iter([ ("replica_id", "The ID of a cluster replica."), ("process_id", "The ID of a process within the replica."), ( "cpu_percent", - "Approximate CPU usage, in percent of the total allocation.", + "Approximate CPU usage in percent of the total allocation.", ), ( "memory_percent", - "Approximate RAM usage, in percent of the total allocation.", + "Approximate RAM usage in percent of the total allocation.", ), ( "disk_percent", - "Approximate disk usage, in percent of the total allocation.", - ), - ( - "heap_percent", - "Approximate heap (RAM + swap) usage, in percent of the total allocation.", + "Approximate disk usage in percent of the total allocation.", ), ]), sql: " @@ -8894,8 +8875,7 @@ SELECT m.process_id, m.cpu_nano_cores::float8 / NULLIF(s.cpu_nano_cores, 0) * 100 AS cpu_percent, m.memory_bytes::float8 / NULLIF(s.memory_bytes, 0) * 100 AS memory_percent, - m.disk_bytes::float8 / NULLIF(s.disk_bytes, 0) * 100 AS disk_percent, - m.heap_bytes::float8 / NULLIF(m.heap_limit, 0) * 100 AS heap_percent + m.disk_bytes::float8 / NULLIF(s.disk_bytes, 0) * 100 AS disk_percent FROM mz_catalog.mz_cluster_replicas AS r JOIN mz_catalog.mz_cluster_replica_sizes AS s ON r.size = s.size @@ -8914,7 +8894,6 @@ pub static MZ_CLUSTER_REPLICA_UTILIZATION_HISTORY: LazyLock = .with_column("cpu_percent", SqlScalarType::Float64.nullable(true)) .with_column("memory_percent", SqlScalarType::Float64.nullable(true)) .with_column("disk_percent", SqlScalarType::Float64.nullable(true)) - .with_column("heap_percent", SqlScalarType::Float64.nullable(true)) .with_column( "occurred_at", SqlScalarType::TimestampTz { precision: None }.nullable(false), @@ -8925,19 +8904,15 @@ pub static MZ_CLUSTER_REPLICA_UTILIZATION_HISTORY: LazyLock = ("process_id", "The ID of a process within the replica."), ( "cpu_percent", - "Approximate CPU usage, in percent of the total allocation.", + "Approximate CPU usage in percent of the total allocation.", ), ( "memory_percent", - "Approximate RAM usage, in percent of the total allocation.", + "Approximate RAM usage in percent of the total allocation.", ), ( "disk_percent", - "Approximate disk usage, in percent of the total allocation.", - ), - ( - "heap_percent", - "Approximate heap (RAM + swap) usage, in percent of the total allocation.", + "Approximate disk usage in percent of the total allocation.", ), ( "occurred_at", @@ -8951,7 +8926,6 @@ SELECT m.cpu_nano_cores::float8 / NULLIF(s.cpu_nano_cores, 0) * 100 AS cpu_percent, m.memory_bytes::float8 / NULLIF(s.memory_bytes, 0) * 100 AS memory_percent, m.disk_bytes::float8 / NULLIF(s.disk_bytes, 0) * 100 AS disk_percent, - m.heap_bytes::float8 / NULLIF(m.heap_limit, 0) * 100 AS heap_percent, m.occurred_at FROM mz_catalog.mz_cluster_replicas AS r diff --git a/src/clusterd/src/usage_metrics.rs b/src/clusterd/src/usage_metrics.rs index 2e77c4ec75be2..efaeb1cb41232 100644 --- a/src/clusterd/src/usage_metrics.rs +++ b/src/clusterd/src/usage_metrics.rs @@ -15,7 +15,7 @@ use std::path::PathBuf; use serde::Serialize; -use tracing::{debug, error}; +use tracing::error; /// A system usage metrics collector. pub(crate) struct Collector { @@ -25,15 +25,9 @@ pub(crate) struct Collector { impl Collector { /// Collect current system usage metrics. pub fn collect(&self) -> Usage { - let disk_bytes = self.collect_disk_usage(); - let (memory_bytes, swap_bytes) = collect_heap_usage(); - let heap_limit = collect_heap_limit(); - Usage { - disk_bytes, - memory_bytes, - swap_bytes, - heap_limit, + disk_bytes: self.collect_disk_usage(), + swap_bytes: self.collect_swap_usage(), } } @@ -55,176 +49,35 @@ impl Collector { let used_blocks = u64::from(stat.blocks() - stat.blocks_available()); let used_bytes = used_blocks * stat.fragment_size(); - debug!("disk usage: {used_bytes}"); - Some(used_bytes) } -} - -/// A system usage measurement. -#[derive(Serialize)] -pub(crate) struct Usage { - disk_bytes: Option, - memory_bytes: Option, - swap_bytes: Option, - heap_limit: Option, -} - -#[cfg(target_os = "linux")] -mod linux { - use std::fs; - use std::path::Path; - use anyhow::{anyhow, bail}; - use mz_compute::memory_limiter; - use mz_ore::cast::CastInto; - use tracing::{debug, error}; - - /// Collect memory and swap usage. - pub fn collect_heap_usage() -> (Option, Option) { + #[cfg(target_os = "linux")] + fn collect_swap_usage(&self) -> Option { + use mz_compute::memory_limiter::ProcStatus; use mz_ore::cast::CastInto; - match memory_limiter::ProcStatus::from_proc() { + match ProcStatus::from_proc() { Ok(status) => { - let memory_bytes = status.vm_rss.cast_into(); - let swap_bytes = status.vm_swap.cast_into(); - - debug!("memory usage: {memory_bytes}"); - debug!("swap usage: {swap_bytes}"); - - (Some(memory_bytes), Some(swap_bytes)) + let bytes = status.vm_swap.cast_into(); + Some(bytes) } Err(err) => { error!("error reading /proc/self/status: {err}"); - (None, None) + None } } } - /// Collect the heap limit, i.e. memory + swap limit. - pub fn collect_heap_limit() -> Option { - // If we don't know the physical limits, we can't know the heap limit. - let (phys_mem_limit, phys_swap_limit) = get_physical_limits()?; - - // Limits might be reduced by the cgroup. - let (cgroup_mem_limit, cgroup_swap_limit) = get_cgroup_limits(); - let mem_limit = cgroup_mem_limit.unwrap_or(u64::MAX).min(phys_mem_limit); - let swap_limit = cgroup_swap_limit.unwrap_or(u64::MAX).min(phys_swap_limit); - - let heap_limit = mem_limit + swap_limit; - - // Heap limit might be reduced by the memory limiter. - let limiter_limit = memory_limiter::get_memory_limit().map(CastInto::cast_into); - let heap_limit = limiter_limit.unwrap_or(u64::MAX).min(heap_limit); - - debug!("memory limit: {mem_limit} (phys={phys_mem_limit}, cgroup={cgroup_mem_limit:?})"); - debug!("swap limit: {swap_limit} (phys={phys_swap_limit}, cgroup={cgroup_swap_limit:?})"); - debug!("heap limit: {heap_limit} (limiter={limiter_limit:?})"); - - Some(heap_limit) - } - - /// Helper for parsing `/proc/meminfo`. - struct ProcMemInfo { - mem_total: u64, - swap_total: u64, - } - - impl ProcMemInfo { - fn from_proc() -> anyhow::Result { - let contents = fs::read_to_string("/proc/meminfo")?; - - fn parse_kib_line(line: &str) -> anyhow::Result { - if let Some(kib) = line - .split_whitespace() - .nth(1) - .and_then(|x| x.parse::().ok()) - { - Ok(kib * 1024) - } else { - bail!("invalid meminfo line: {line}"); - } - } - - let mut memory = None; - let mut swap = None; - for line in contents.lines() { - if line.starts_with("MemTotal:") { - memory = Some(parse_kib_line(line)?); - } else if line.starts_with("SwapTotal:") { - swap = Some(parse_kib_line(line)?); - } - } - - let mem_total = memory.ok_or_else(|| anyhow!("MemTotal not found"))?; - let swap_total = swap.ok_or_else(|| anyhow!("SwapTotal not found"))?; - - Ok(Self { - mem_total, - swap_total, - }) - } - } - - /// Collect the physical memory and swap limits. - fn get_physical_limits() -> Option<(u64, u64)> { - let meminfo = match ProcMemInfo::from_proc() { - Ok(meminfo) => meminfo, - Err(error) => { - error!("reading `/proc/meminfo`: {error}"); - return None; - } - }; - - Some((meminfo.mem_total, meminfo.swap_total)) - } - - /// Collect the memory and swap limits enforced by the current cgroup. - /// - /// We make the following simplifying assumptions that hold for a standard Kubernetes - /// environment: - // * The current process is a member of exactly one cgroups v2 hierarchy. - // * The cgroups hierarchy is mounted at `/sys/fs/cgroup`. - // * The limits are applied to the current cgroup directly (and not one of its ancestors). - fn get_cgroup_limits() -> (Option, Option) { - let Ok(proc_cgroup) = fs::read_to_string("/proc/self/cgroup") else { - return (None, None); - }; - let Some(cgroup_path) = proc_cgroup.split(':').nth(2) else { - error!("invalid `/proc/self/cgroup` format: {proc_cgroup}"); - return (None, None); - }; - - let root = Path::new("/sys/fs/cgroup").join(cgroup_path); - let memory_file = root.join("memory.max"); - let swap_file = root.join("memory.swap.max"); - - let memory = fs::read_to_string(memory_file) - .ok() - .and_then(|s| s.parse().ok()); - let swap = fs::read_to_string(swap_file) - .ok() - .and_then(|s| s.parse().ok()); - - (memory, swap) + #[cfg(not(target_os = "linux"))] + fn collect_swap_usage(&self) -> Option { + None } } -#[cfg(not(target_os = "linux"))] -mod macos { - use mz_compute::memory_limiter; - use mz_ore::cast::CastInto; - - pub fn collect_heap_usage() -> (Option, Option) { - (None, None) - } - - pub fn collect_heap_limit() -> Option { - memory_limiter::get_memory_limit().map(CastInto::cast_into) - } +/// A system usage measurement. +#[derive(Serialize)] +pub(crate) struct Usage { + disk_bytes: Option, + swap_bytes: Option, } - -#[cfg(target_os = "linux")] -use linux::*; -#[cfg(not(target_os = "linux"))] -use macos::*; diff --git a/src/compute/src/memory_limiter.rs b/src/compute/src/memory_limiter.rs index 30d3fae6d9a7c..35a26e6dcb5b4 100644 --- a/src/compute/src/memory_limiter.rs +++ b/src/compute/src/memory_limiter.rs @@ -51,8 +51,7 @@ pub fn start_limiter(memory_limit: usize, metrics_registry: &MetricsRegistry) { mz_ore::task::spawn(|| "memory-limiter", LimiterTask::run(config_rx, metrics)); *limiter = Some(Limiter { - base_memory_limit: memory_limit, - effective_memory_limit: memory_limit, + memory_limit, config_tx, }); } @@ -64,25 +63,17 @@ pub fn apply_limiter_config(config: &ConfigSet) { } } -/// Get the current effective memory limit. -pub fn get_memory_limit() -> Option { - let limiter = LIMITER.lock().expect("poisoned"); - limiter.as_ref().map(|l| l.effective_memory_limit) -} - /// A handle to a running memory limiter task. struct Limiter { - /// The base process memory limit. - base_memory_limit: usize, - /// The effective memory limit, obtained by applying dyncfgs to the base limit. - effective_memory_limit: usize, + /// The process memory limit. + memory_limit: usize, /// A sender for limiter configuration updates. config_tx: UnboundedSender, } impl Limiter { /// Apply the given configuration to the limiter. - fn apply_config(&mut self, config: &ConfigSet) { + fn apply_config(&self, config: &ConfigSet) { let mut interval = MEMORY_LIMITER_INTERVAL.get(config); // A zero duration means the limiter is disabled. Translate that into an ~infinite duration // so the limiter doesn't have to worry about the special case. @@ -91,14 +82,12 @@ impl Limiter { } let memory_limit = - f64::cast_lossy(self.base_memory_limit) * MEMORY_LIMITER_USAGE_BIAS.get(config); + f64::cast_lossy(self.memory_limit) * MEMORY_LIMITER_USAGE_BIAS.get(config); let memory_limit = usize::cast_lossy(memory_limit); let burst_budget = f64::cast_lossy(memory_limit) * MEMORY_LIMITER_BURST_FACTOR.get(config); let burst_budget = usize::cast_lossy(burst_budget); - self.effective_memory_limit = memory_limit; - self.config_tx .send(LimiterConfig { interval, diff --git a/src/controller/src/lib.rs b/src/controller/src/lib.rs index 499f2cf3cdfbf..af3e80137d2de 100644 --- a/src/controller/src/lib.rs +++ b/src/controller/src/lib.rs @@ -591,10 +591,8 @@ where Datum::UInt64(u64::cast_from(process_id)), m.cpu_nano_cores.into(), m.memory_bytes.into(), - m.disk_bytes.into(), + m.disk_usage_bytes.into(), Datum::TimestampTz(now_tz), - m.heap_bytes.into(), - m.heap_limit.into(), ]); (row.clone(), mz_repr::Diff::ONE) }) diff --git a/src/orchestrator-kubernetes/src/lib.rs b/src/orchestrator-kubernetes/src/lib.rs index 0670a1d1fa363..71bb5d1edf1d7 100644 --- a/src/orchestrator-kubernetes/src/lib.rs +++ b/src/orchestrator-kubernetes/src/lib.rs @@ -54,7 +54,7 @@ use mz_ore::task::AbortOnDropHandle; use serde::Deserialize; use sha2::{Digest, Sha256}; use tokio::sync::{mpsc, oneshot}; -use tracing::{error, info, warn}; +use tracing::{info, warn}; pub mod cloud_resource_controller; pub mod secrets; @@ -1423,15 +1423,6 @@ impl OrchestratorWorker { .collect(); } - /// Usage metrics reported by clusterd processes. - #[derive(Deserialize)] - pub(crate) struct ClusterdUsage { - disk_bytes: Option, - memory_bytes: Option, - swap_bytes: Option, - heap_limit: Option, - } - /// Get metrics for a particular service and process, converting them into a sane (i.e., numeric) format. /// /// Note that we want to keep going even if a lookup fails for whatever reason, @@ -1444,13 +1435,12 @@ impl OrchestratorWorker { ) -> ServiceProcessMetrics { let name = format!("{service_name}-{i}"); - let clusterd_usage_fut = get_clusterd_usage(self_, service_name, i); - let (metrics, clusterd_usage) = - match futures::future::join(self_.metrics_api.get(&name), clusterd_usage_fut).await - { - (Ok(metrics), Ok(clusterd_usage)) => (metrics, Some(clusterd_usage)), + let disk_usage_fut = get_disk_usage(self_, service_name, i); + let (metrics, disk_usage) = + match futures::future::join(self_.metrics_api.get(&name), disk_usage_fut).await { + (Ok(metrics), Ok(disk_usage)) => (metrics, disk_usage), (Ok(metrics), Err(e)) => { - warn!("Failed to fetch clusterd usage for {name}: {e}"); + warn!("Failed to fetch disk usage for {name}: {e}"); (metrics, None) } (Err(e), _) => { @@ -1471,58 +1461,56 @@ impl OrchestratorWorker { return ServiceProcessMetrics::default(); }; - let mut process_metrics = ServiceProcessMetrics::default(); - - match parse_k8s_quantity(cpu_str) { + let cpu = match parse_k8s_quantity(cpu_str) { Ok(q) => match q.try_to_integer(-9, true) { - Some(nano_cores) => process_metrics.cpu_nano_cores = Some(nano_cores), - None => error!("CPU value {q:?} out of range"), + Some(i) => Some(i), + None => { + tracing::error!("CPU value {q:? }out of range"); + None + } }, - Err(e) => error!("failed to parse CPU value {cpu_str}: {e}"), - } - match parse_k8s_quantity(mem_str) { + Err(e) => { + tracing::error!("Failed to parse CPU value {cpu_str}: {e}"); + None + } + }; + let memory = match parse_k8s_quantity(mem_str) { Ok(q) => match q.try_to_integer(0, false) { - Some(mem) => process_metrics.memory_bytes = Some(mem), - None => error!("memory value {q:?} out of range"), + Some(i) => Some(i), + None => { + tracing::error!("Memory value {q:?} out of range"); + None + } }, - Err(e) => error!("failed to parse memory value {mem_str}: {e}"), - } - - if let Some(usage) = clusterd_usage { - // clusterd may report disk usage as either `disk_bytes`, or `swap_bytes`, or both. - // - // For now the Console expects the swap size to be reported in `disk_bytes`. - // Once the Console has been ported to use `heap_bytes`/`heap_limit`, we can - // simplify things by setting `process_metrics.disk_bytes = usage.disk_bytes`. - process_metrics.disk_bytes = match (usage.disk_bytes, usage.swap_bytes) { - (Some(disk), Some(swap)) => Some(disk + swap), - (disk, swap) => disk.or(swap), - }; - - // clusterd may report heap usage as `memory_bytes` and optionally `swap_bytes`. - // If no `memory_bytes` is reported, we can't know the heap usage. - process_metrics.heap_bytes = match (usage.memory_bytes, usage.swap_bytes) { - (Some(memory), Some(swap)) => Some(memory + swap), - (Some(memory), None) => Some(memory), - (None, _) => None, - }; + Err(e) => { + tracing::error!("Failed to parse memory value {mem_str}: {e}"); + None + } + }; - process_metrics.heap_limit = usage.heap_limit; + ServiceProcessMetrics { + cpu_nano_cores: cpu, + memory_bytes: memory, + disk_usage_bytes: disk_usage, } - - process_metrics } - /// Get the current usage metrics exposed by a clusterd process. + /// Get the current disk usage for a particular service and process. /// - /// Usage metrics are collected by connecting to a metrics endpoint exposed by the process. - /// The endpoint is assumed to be reachable at the 'internal-http' under the HTTP path + /// Disk usage is collected by connecting to a metrics endpoint exposed by the process. The + /// endpoint is assumed to be reachable at the 'internal-http' under the HTTP path /// `/api/usage-metrics`. - async fn get_clusterd_usage( + async fn get_disk_usage( self_: &OrchestratorWorker, service_name: &str, i: usize, - ) -> anyhow::Result { + ) -> anyhow::Result> { + #[derive(Deserialize)] + pub(crate) struct Usage { + disk_bytes: Option, + swap_bytes: Option, + } + let service = self_ .service_api .get(service_name) @@ -1554,9 +1542,17 @@ impl OrchestratorWorker { .build() .context("error building HTTP client")?; let resp = http_client.get(metrics_url).send().await?; - let usage = resp.json().await?; + let Usage { + disk_bytes, + swap_bytes, + } = resp.json().await?; - Ok(usage) + let bytes = if let (Some(disk), Some(swap)) = (disk_bytes, swap_bytes) { + Some(disk + swap) + } else { + disk_bytes.or(swap_bytes) + }; + Ok(bytes) } let ret = diff --git a/src/orchestrator-process/src/lib.rs b/src/orchestrator-process/src/lib.rs index 9f7c8613c725a..b24cc041f85ff 100644 --- a/src/orchestrator-process/src/lib.rs +++ b/src/orchestrator-process/src/lib.rs @@ -553,10 +553,8 @@ impl OrchestratorWorker { metrics.push(ServiceProcessMetrics { cpu_nano_cores, memory_bytes, - // Process orchestrator does not support the remaining fields right now. - disk_bytes: None, - heap_bytes: None, - heap_limit: None, + // Process orchestrator does not support this right now. + disk_usage_bytes: None, }); } Ok(metrics) diff --git a/src/orchestrator/src/lib.rs b/src/orchestrator/src/lib.rs index 2879e82b64c4a..3645c9f591c08 100644 --- a/src/orchestrator/src/lib.rs +++ b/src/orchestrator/src/lib.rs @@ -145,9 +145,7 @@ pub trait Service: fmt::Debug + Send + Sync { pub struct ServiceProcessMetrics { pub cpu_nano_cores: Option, pub memory_bytes: Option, - pub disk_bytes: Option, - pub heap_bytes: Option, - pub heap_limit: Option, + pub disk_usage_bytes: Option, } /// A simple language for describing assertions about a label's existence and value. diff --git a/src/storage-client/src/healthcheck.rs b/src/storage-client/src/healthcheck.rs index 1c164a2f24dd3..ec81ef1c028c6 100644 --- a/src/storage-client/src/healthcheck.rs +++ b/src/storage-client/src/healthcheck.rs @@ -158,9 +158,6 @@ pub static REPLICA_STATUS_HISTORY_DESC: LazyLock = LazyLock::new(| .finish() }); -/// NOTE: We want to avoid breaking schema changes, as those would cause the builtin migrations to -/// drop all existing data. For details on what changes are compatible, see -/// [`mz_persist_types::schema::backward_compatible`]. pub static REPLICA_METRICS_HISTORY_DESC: LazyLock = LazyLock::new(|| { RelationDesc::builder() .with_column("replica_id", SqlScalarType::String.nullable(false)) @@ -172,8 +169,6 @@ pub static REPLICA_METRICS_HISTORY_DESC: LazyLock = LazyLock::new( "occurred_at", SqlScalarType::TimestampTz { precision: None }.nullable(false), ) - .with_column("heap_bytes", SqlScalarType::UInt64.nullable(true)) - .with_column("heap_limit", SqlScalarType::UInt64.nullable(true)) .finish() }); diff --git a/test/sqllogictest/autogenerated/mz_internal.slt b/test/sqllogictest/autogenerated/mz_internal.slt index 700f69f0ce187..7b2047fd8a068 100644 --- a/test/sqllogictest/autogenerated/mz_internal.slt +++ b/test/sqllogictest/autogenerated/mz_internal.slt @@ -118,8 +118,6 @@ SELECT position, name, type FROM objects WHERE schema = 'mz_internal' AND object 3 cpu_nano_cores uint8 4 memory_bytes uint8 5 disk_bytes uint8 -6 heap_bytes uint8 -7 heap_limit uint8 query ITT SELECT position, name, type FROM objects WHERE schema = 'mz_internal' AND object = 'mz_cluster_replica_metrics_history' ORDER BY position @@ -130,8 +128,6 @@ SELECT position, name, type FROM objects WHERE schema = 'mz_internal' AND object 4 memory_bytes uint8 5 disk_bytes uint8 6 occurred_at timestamp␠with␠time␠zone -7 heap_bytes uint8 -8 heap_limit uint8 query ITT SELECT position, name, type FROM objects WHERE schema = 'mz_internal' AND object = 'mz_cluster_replica_statuses' ORDER BY position @@ -159,7 +155,6 @@ SELECT position, name, type FROM objects WHERE schema = 'mz_internal' AND object 3 cpu_percent double␠precision 4 memory_percent double␠precision 5 disk_percent double␠precision -6 heap_percent double␠precision query ITT SELECT position, name, type FROM objects WHERE schema = 'mz_internal' AND object = 'mz_cluster_replica_utilization_history' ORDER BY position @@ -169,8 +164,7 @@ SELECT position, name, type FROM objects WHERE schema = 'mz_internal' AND object 3 cpu_percent double␠precision 4 memory_percent double␠precision 5 disk_percent double␠precision -6 heap_percent double␠precision -7 occurred_at timestamp␠with␠time␠zone +6 occurred_at timestamp␠with␠time␠zone query ITT SELECT position, name, type FROM objects WHERE schema = 'mz_internal' AND object = 'mz_cluster_replica_history' ORDER BY position diff --git a/test/sqllogictest/mz_catalog_server_index_accounting.slt b/test/sqllogictest/mz_catalog_server_index_accounting.slt index 4781bc80df6d4..f86a6b3d9f50d 100644 --- a/test/sqllogictest/mz_catalog_server_index_accounting.slt +++ b/test/sqllogictest/mz_catalog_server_index_accounting.slt @@ -224,15 +224,11 @@ mz_cluster_replica_history replica_name mz_cluster_replica_history size mz_cluster_replica_metrics cpu_nano_cores mz_cluster_replica_metrics disk_bytes -mz_cluster_replica_metrics heap_bytes -mz_cluster_replica_metrics heap_limit mz_cluster_replica_metrics memory_bytes mz_cluster_replica_metrics process_id mz_cluster_replica_metrics replica_id mz_cluster_replica_metrics_history cpu_nano_cores mz_cluster_replica_metrics_history disk_bytes -mz_cluster_replica_metrics_history heap_bytes -mz_cluster_replica_metrics_history heap_limit mz_cluster_replica_metrics_history memory_bytes mz_cluster_replica_metrics_history occurred_at mz_cluster_replica_metrics_history process_id diff --git a/test/sqllogictest/system-cluster.slt b/test/sqllogictest/system-cluster.slt index 9f7154ecaddf7..7df0c5b01b201 100644 --- a/test/sqllogictest/system-cluster.slt +++ b/test/sqllogictest/system-cluster.slt @@ -410,7 +410,7 @@ ORDER BY r.id; ---- Explained Query: Finish order_by=[#0{id} asc nulls_last] output=[#0..=#5] - Project (#0{id}..=#3{size}, #5{name}, #31) + Project (#0{id}..=#3{size}, #5{name}, #29) Map (((uint8_to_double(#27{memory_bytes}) / uint8_to_double(case when (0 = uint8_to_numeric(#21{memory_bytes})) then null else #21{memory_bytes} end)) * 100)) Join on=(#0{id} = #15{id} = #24{replica_id} AND #2{cluster_id} = #4{id} AND #16{size} = #17{size}) type=delta ArrangeBy keys=[[#0{id}], [#2{cluster_id}]]