Skip to content

Commit 39bc08a

Browse files
committed
clusterd: expose heap usage and limit
This commit extends the usage metrics provided by clusterd's `/api/usage-metrics` endpoint with the memory usage, as well as the effective heap (memory + swap) limit. As with the existing usage collection, we mainly focus on Linux support; for macOS we only provide the stubs required to make sure the code compiles and runs.
1 parent 60fe66e commit 39bc08a

File tree

2 files changed

+181
-23
lines changed

2 files changed

+181
-23
lines changed

src/clusterd/src/usage_metrics.rs

Lines changed: 165 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
use std::path::PathBuf;
1616

1717
use serde::Serialize;
18-
use tracing::error;
18+
use tracing::{debug, error};
1919

2020
/// A system usage metrics collector.
2121
pub(crate) struct Collector {
@@ -25,9 +25,15 @@ pub(crate) struct Collector {
2525
impl Collector {
2626
/// Collect current system usage metrics.
2727
pub fn collect(&self) -> Usage {
28+
let disk_bytes = self.collect_disk_usage();
29+
let (memory_bytes, swap_bytes) = collect_heap_usage();
30+
let heap_limit = collect_heap_limit();
31+
2832
Usage {
29-
disk_bytes: self.collect_disk_usage(),
30-
swap_bytes: self.collect_swap_usage(),
33+
disk_bytes,
34+
memory_bytes,
35+
swap_bytes,
36+
heap_limit,
3137
}
3238
}
3339

@@ -49,35 +55,176 @@ impl Collector {
4955
let used_blocks = u64::from(stat.blocks() - stat.blocks_available());
5056
let used_bytes = used_blocks * stat.fragment_size();
5157

58+
debug!("disk usage: {used_bytes}");
59+
5260
Some(used_bytes)
5361
}
62+
}
63+
64+
/// A system usage measurement.
65+
#[derive(Serialize)]
66+
pub(crate) struct Usage {
67+
disk_bytes: Option<u64>,
68+
memory_bytes: Option<u64>,
69+
swap_bytes: Option<u64>,
70+
heap_limit: Option<u64>,
71+
}
72+
73+
#[cfg(target_os = "linux")]
74+
mod linux {
75+
use std::fs;
76+
use std::path::Path;
5477

55-
#[cfg(target_os = "linux")]
56-
fn collect_swap_usage(&self) -> Option<u64> {
57-
use mz_compute::memory_limiter::ProcStatus;
78+
use anyhow::{anyhow, bail};
79+
use mz_compute::memory_limiter;
80+
use mz_ore::cast::CastInto;
81+
use tracing::{debug, error};
82+
83+
/// Collect memory and swap usage.
84+
pub fn collect_heap_usage() -> (Option<u64>, Option<u64>) {
5885
use mz_ore::cast::CastInto;
5986

60-
match ProcStatus::from_proc() {
87+
match memory_limiter::ProcStatus::from_proc() {
6188
Ok(status) => {
62-
let bytes = status.vm_swap.cast_into();
63-
Some(bytes)
89+
let memory_bytes = status.vm_rss.cast_into();
90+
let swap_bytes = status.vm_swap.cast_into();
91+
92+
debug!("memory usage: {memory_bytes}");
93+
debug!("swap usage: {swap_bytes}");
94+
95+
(Some(memory_bytes), Some(swap_bytes))
6496
}
6597
Err(err) => {
6698
error!("error reading /proc/self/status: {err}");
67-
None
99+
(None, None)
68100
}
69101
}
70102
}
71103

72-
#[cfg(not(target_os = "linux"))]
73-
fn collect_swap_usage(&self) -> Option<u64> {
74-
None
104+
/// Collect the heap limit, i.e. memory + swap limit.
105+
pub fn collect_heap_limit() -> Option<u64> {
106+
// If we don't know the physical limits, we can't know the heap limit.
107+
let (phys_mem_limit, phys_swap_limit) = get_physical_limits()?;
108+
109+
// Limits might be reduced by the cgroup.
110+
let (cgroup_mem_limit, cgroup_swap_limit) = get_cgroup_limits();
111+
let mem_limit = cgroup_mem_limit.unwrap_or(u64::MAX).min(phys_mem_limit);
112+
let swap_limit = cgroup_swap_limit.unwrap_or(u64::MAX).min(phys_swap_limit);
113+
114+
let heap_limit = mem_limit + swap_limit;
115+
116+
// Heap limit might be reduced by the memory limiter.
117+
let limiter_limit = memory_limiter::get_memory_limit().map(CastInto::cast_into);
118+
let heap_limit = limiter_limit.unwrap_or(u64::MAX).min(heap_limit);
119+
120+
debug!("memory limit: {mem_limit} (phys={phys_mem_limit}, cgroup={cgroup_mem_limit:?})");
121+
debug!("swap limit: {swap_limit} (phys={phys_swap_limit}, cgroup={cgroup_swap_limit:?})");
122+
debug!("heap limit: {heap_limit} (limiter={limiter_limit:?})");
123+
124+
Some(heap_limit)
125+
}
126+
127+
/// Helper for parsing `/proc/meminfo`.
128+
struct ProcMemInfo {
129+
mem_total: u64,
130+
swap_total: u64,
131+
}
132+
133+
impl ProcMemInfo {
134+
fn from_proc() -> anyhow::Result<Self> {
135+
let contents = fs::read_to_string("/proc/meminfo")?;
136+
137+
fn parse_kib_line(line: &str) -> anyhow::Result<u64> {
138+
if let Some(kib) = line
139+
.split_whitespace()
140+
.nth(1)
141+
.and_then(|x| x.parse::<u64>().ok())
142+
{
143+
Ok(kib * 1024)
144+
} else {
145+
bail!("invalid meminfo line: {line}");
146+
}
147+
}
148+
149+
let mut memory = None;
150+
let mut swap = None;
151+
for line in contents.lines() {
152+
if line.starts_with("MemTotal:") {
153+
memory = Some(parse_kib_line(line)?);
154+
} else if line.starts_with("SwapTotal:") {
155+
swap = Some(parse_kib_line(line)?);
156+
}
157+
}
158+
159+
let mem_total = memory.ok_or_else(|| anyhow!("MemTotal not found"))?;
160+
let swap_total = swap.ok_or_else(|| anyhow!("SwapTotal not found"))?;
161+
162+
Ok(Self {
163+
mem_total,
164+
swap_total,
165+
})
166+
}
167+
}
168+
169+
/// Collect the physical memory and swap limits.
170+
fn get_physical_limits() -> Option<(u64, u64)> {
171+
let meminfo = match ProcMemInfo::from_proc() {
172+
Ok(meminfo) => meminfo,
173+
Err(error) => {
174+
error!("reading `/proc/meminfo`: {error}");
175+
return None;
176+
}
177+
};
178+
179+
Some((meminfo.mem_total, meminfo.swap_total))
180+
}
181+
182+
/// Collect the memory and swap limits enforced by the current cgroup.
183+
///
184+
/// We make the following simplifying assumptions that hold for a standard Kubernetes
185+
/// environment:
186+
// * The current process is a member of exactly one cgroups v2 hierarchy.
187+
// * The cgroups hierarchy is mounted at `/sys/fs/cgroup`.
188+
// * The limits are applied to the current cgroup directly (and not one of its ancestors).
189+
fn get_cgroup_limits() -> (Option<u64>, Option<u64>) {
190+
let Ok(proc_cgroup) = fs::read_to_string("/proc/self/cgroup") else {
191+
return (None, None);
192+
};
193+
let Some(cgroup_path) = proc_cgroup.split(':').nth(2) else {
194+
error!("invalid `/proc/self/cgroup` format: {proc_cgroup}");
195+
return (None, None);
196+
};
197+
198+
let root = Path::new("/sys/fs/cgroup").join(cgroup_path);
199+
let memory_file = root.join("memory.max");
200+
let swap_file = root.join("memory.swap.max");
201+
202+
let memory = fs::read_to_string(memory_file)
203+
.ok()
204+
.and_then(|s| s.parse().ok());
205+
let swap = fs::read_to_string(swap_file)
206+
.ok()
207+
.and_then(|s| s.parse().ok());
208+
209+
(memory, swap)
75210
}
76211
}
77212

78-
/// A system usage measurement.
79-
#[derive(Serialize)]
80-
pub(crate) struct Usage {
81-
disk_bytes: Option<u64>,
82-
swap_bytes: Option<u64>,
213+
#[cfg(not(target_os = "linux"))]
mod macos {
    //! Stand-ins for the Linux collectors, compiled on every non-Linux target.

    use mz_compute::memory_limiter;
    use mz_ore::cast::CastInto;

    /// Memory and swap usage are not collected on this platform; always `(None, None)`.
    pub fn collect_heap_usage() -> (Option<u64>, Option<u64>) {
        (None, None)
    }

    /// Report the memory limiter's limit as the heap limit, or `None` if it reports none.
    pub fn collect_heap_limit() -> Option<u64> {
        let limiter_limit = memory_limiter::get_memory_limit()?;
        Some(limiter_limit.cast_into())
    }
}
226+
227+
#[cfg(target_os = "linux")]
228+
use linux::*;
229+
#[cfg(not(target_os = "linux"))]
230+
use macos::*;

src/compute/src/memory_limiter.rs

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,8 @@ pub fn start_limiter(memory_limit: usize, metrics_registry: &MetricsRegistry) {
5151
mz_ore::task::spawn(|| "memory-limiter", LimiterTask::run(config_rx, metrics));
5252

5353
*limiter = Some(Limiter {
54-
memory_limit,
54+
base_memory_limit: memory_limit,
55+
effective_memory_limit: memory_limit,
5556
config_tx,
5657
});
5758
}
@@ -63,17 +64,25 @@ pub fn apply_limiter_config(config: &ConfigSet) {
6364
}
6465
}
6566

67+
/// Get the current effective memory limit.
68+
pub fn get_memory_limit() -> Option<usize> {
69+
let limiter = LIMITER.lock().expect("poisoned");
70+
limiter.as_ref().map(|l| l.effective_memory_limit)
71+
}
72+
6673
/// A handle to a running memory limiter task.
6774
struct Limiter {
68-
/// The process memory limit.
69-
memory_limit: usize,
75+
/// The base process memory limit.
76+
base_memory_limit: usize,
77+
/// The effective memory limit, obtained by applying dyncfgs to the base limit.
78+
effective_memory_limit: usize,
7079
/// A sender for limiter configuration updates.
7180
config_tx: UnboundedSender<LimiterConfig>,
7281
}
7382

7483
impl Limiter {
7584
/// Apply the given configuration to the limiter.
76-
fn apply_config(&self, config: &ConfigSet) {
85+
fn apply_config(&mut self, config: &ConfigSet) {
7786
let mut interval = MEMORY_LIMITER_INTERVAL.get(config);
7887
// A zero duration means the limiter is disabled. Translate that into an ~infinite duration
7988
// so the limiter doesn't have to worry about the special case.
@@ -82,12 +91,14 @@ impl Limiter {
8291
}
8392

8493
let memory_limit =
85-
f64::cast_lossy(self.memory_limit) * MEMORY_LIMITER_USAGE_BIAS.get(config);
94+
f64::cast_lossy(self.base_memory_limit) * MEMORY_LIMITER_USAGE_BIAS.get(config);
8695
let memory_limit = usize::cast_lossy(memory_limit);
8796

8897
let burst_budget = f64::cast_lossy(memory_limit) * MEMORY_LIMITER_BURST_FACTOR.get(config);
8998
let burst_budget = usize::cast_lossy(burst_budget);
9099

100+
self.effective_memory_limit = memory_limit;
101+
91102
self.config_tx
92103
.send(LimiterConfig {
93104
interval,

0 commit comments

Comments
 (0)