Skip to content

Commit faaaf56

Browse files
committed
clusterd: expose heap usage and limit
This commit extends the usage metrics provided by clusterd's `/api/usage-metrics` endpoint with the memory usage, as well as the effective heap (memory+swap) limit. As with the existing usage collection, we mainly focus on Linux support; for macOS we only provide the stubs required to make sure the code compiles and runs.
1 parent 60fe66e commit faaaf56

File tree

2 files changed

+169
-23
lines changed

2 files changed

+169
-23
lines changed

src/clusterd/src/usage_metrics.rs

Lines changed: 153 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
use std::path::PathBuf;
1616

1717
use serde::Serialize;
18-
use tracing::error;
18+
use tracing::{debug, error};
1919

2020
/// A system usage metrics collector.
2121
pub(crate) struct Collector {
@@ -25,9 +25,15 @@ pub(crate) struct Collector {
2525
impl Collector {
2626
/// Collect current system usage metrics.
2727
pub fn collect(&self) -> Usage {
28+
let disk_bytes = self.collect_disk_usage();
29+
let (memory_bytes, swap_bytes) = collect_heap_usage();
30+
let heap_limit = collect_heap_limit();
31+
2832
Usage {
29-
disk_bytes: self.collect_disk_usage(),
30-
swap_bytes: self.collect_swap_usage(),
33+
disk_bytes,
34+
memory_bytes,
35+
swap_bytes,
36+
heap_limit,
3137
}
3238
}
3339

@@ -49,35 +55,164 @@ impl Collector {
4955
let used_blocks = u64::from(stat.blocks() - stat.blocks_available());
5056
let used_bytes = used_blocks * stat.fragment_size();
5157

58+
debug!("disk usage: {used_bytes}");
59+
5260
Some(used_bytes)
5361
}
62+
}
63+
64+
/// A system usage measurement.
///
/// Each field is `None` when the corresponding value could not be collected or is not
/// available on the current platform.
65+
#[derive(Serialize)]
66+
pub(crate) struct Usage {
67+
/// Disk usage in bytes, as reported by `Collector::collect_disk_usage`.
disk_bytes: Option<u64>,
68+
/// Memory usage; the first component returned by `collect_heap_usage`.
memory_bytes: Option<u64>,
69+
/// Swap usage; the second component returned by `collect_heap_usage`.
swap_bytes: Option<u64>,
70+
/// Heap (memory + swap) limit, as returned by `collect_heap_limit`.
heap_limit: Option<u64>,
71+
}
72+
73+
#[cfg(target_os = "linux")]
74+
mod linux {
75+
use std::fs;
76+
use std::path::Path;
5477

55-
#[cfg(target_os = "linux")]
56-
fn collect_swap_usage(&self) -> Option<u64> {
57-
use mz_compute::memory_limiter::ProcStatus;
78+
use mz_compute::memory_limiter;
79+
use mz_ore::cast::CastInto;
80+
use tracing::{debug, error};
81+
82+
/// Collect memory and swap usage.
83+
pub fn collect_heap_usage() -> (Option<u64>, Option<u64>) {
5884
use mz_ore::cast::CastInto;
5985

60-
match ProcStatus::from_proc() {
86+
match memory_limiter::ProcStatus::from_proc() {
6187
Ok(status) => {
62-
let bytes = status.vm_swap.cast_into();
63-
Some(bytes)
88+
let memory_bytes = status.vm_rss.cast_into();
89+
let swap_bytes = status.vm_swap.cast_into();
90+
91+
debug!("memory usage: {memory_bytes}");
92+
debug!("swap usage: {swap_bytes}");
93+
94+
(Some(memory_bytes), Some(swap_bytes))
6495
}
6596
Err(err) => {
6697
error!("error reading /proc/self/status: {err}");
67-
None
98+
(None, None)
6899
}
69100
}
70101
}
71102

72-
#[cfg(not(target_os = "linux"))]
73-
fn collect_swap_usage(&self) -> Option<u64> {
74-
None
103+
/// Collect the heap limit, i.e. memory + swap limit.
104+
pub fn collect_heap_limit() -> Option<u64> {
105+
// If we don't know the physical limits, we can't know the heap limit.
106+
let (phys_mem_limit, phys_swap_limit) = get_physical_limits()?;
107+
108+
// Limits might be reduced by the cgroup.
109+
let (cgroup_mem_limit, cgroup_swap_limit) = get_cgroup_limits();
110+
let mem_limit = cgroup_mem_limit.unwrap_or(u64::MAX).min(phys_mem_limit);
111+
let swap_limit = cgroup_swap_limit.unwrap_or(u64::MAX).min(phys_swap_limit);
112+
113+
let heap_limit = mem_limit + swap_limit;
114+
115+
// Heap limit might be reduced by the memory limiter.
116+
let limiter_limit = memory_limiter::get_memory_limit().map(CastInto::cast_into);
117+
let heap_limit = limiter_limit.unwrap_or(u64::MAX).min(heap_limit);
118+
119+
debug!("memory limit: {mem_limit} (phys={phys_mem_limit}, cgroup={cgroup_mem_limit:?})");
120+
debug!("swap limit: {swap_limit} (phys={phys_swap_limit}, cgroup={cgroup_swap_limit:?})");
121+
debug!("heap limit: {heap_limit} (limiter={limiter_limit:?})");
122+
123+
Some(heap_limit)
124+
}
125+
126+
/// Collect the physical memory and swap limits.
127+
fn get_physical_limits() -> Option<(u64, u64)> {
128+
let meminfo = match fs::read_to_string("/proc/meminfo") {
129+
Ok(meminfo) => meminfo,
130+
Err(error) => {
131+
error!("reading `/proc/meminfo`: {error}");
132+
return None;
133+
}
134+
};
135+
136+
fn parse_kib_line(line: &str) -> Option<u64> {
137+
let result = line
138+
.split_whitespace()
139+
.nth(1)
140+
.and_then(|x| x.parse::<u64>().ok())
141+
.map(|kib| kib * 1024);
142+
if result.is_none() {
143+
error!("invalid `/proc/meminfo` line: {line}");
144+
}
145+
result
146+
}
147+
148+
let mut memory = None;
149+
let mut swap = None;
150+
for line in meminfo.lines() {
151+
if line.starts_with("MemTotal:") {
152+
memory = parse_kib_line(line);
153+
} else if line.starts_with("SwapTotal:") {
154+
swap = parse_kib_line(line);
155+
}
156+
}
157+
158+
let Some(memory) = memory else {
159+
error!("failed to collect physical memory limit");
160+
return None;
161+
};
162+
let Some(swap) = swap else {
163+
error!("failed to collect physical swap limit");
164+
return None;
165+
};
166+
167+
Some((memory, swap))
168+
}
169+
170+
/// Collect the memory and swap limits enforced by the current cgroup.
171+
///
172+
/// We make the following simplifying assumptions that hold for a standard Kubernetes
173+
/// environment:
174+
// * The current process is a member of exactly one cgroups v2 hierarchy.
175+
// * The cgroups hierarchy is mounted at `/sys/fs/cgroup`.
176+
// * The limits are applied to the current cgroup directly (and not one of its ancestors).
177+
fn get_cgroup_limits() -> (Option<u64>, Option<u64>) {
178+
let Ok(proc_cgroup) = fs::read_to_string("/proc/self/cgroup") else {
179+
return (None, None);
180+
};
181+
let Some(cgroup_path) = proc_cgroup.split(':').nth(2) else {
182+
error!("invalid `/proc/self/cgroup` format: {proc_cgroup}");
183+
return (None, None);
184+
};
185+
186+
let root = Path::new("/sys/fs/cgroup").join(cgroup_path);
187+
let memory_file = root.join("memory.max");
188+
let swap_file = root.join("memory.swap.max");
189+
190+
let memory = fs::read_to_string(memory_file)
191+
.ok()
192+
.and_then(|s| s.parse().ok());
193+
let swap = fs::read_to_string(swap_file)
194+
.ok()
195+
.and_then(|s| s.parse().ok());
196+
197+
(memory, swap)
75198
}
76199
}
77200

78-
/// A system usage measurement.
79-
#[derive(Serialize)]
80-
pub(crate) struct Usage {
81-
disk_bytes: Option<u64>,
82-
swap_bytes: Option<u64>,
201+
/// Stub implementations used on every target other than Linux.
#[cfg(not(target_os = "linux"))]
mod macos {
    use mz_compute::memory_limiter;
    use mz_ore::cast::CastInto;

    /// Collect memory and swap usage.
    ///
    /// Not implemented on this platform; both values are always `None`.
    pub fn collect_heap_usage() -> (Option<u64>, Option<u64>) {
        let memory_bytes = None;
        let swap_bytes = None;
        (memory_bytes, swap_bytes)
    }

    /// Collect the heap limit.
    ///
    /// On this platform the only source is the memory limiter, so the result is `None`
    /// whenever the limiter does not report a limit.
    pub fn collect_heap_limit() -> Option<u64> {
        let limiter_limit = memory_limiter::get_memory_limit();
        limiter_limit.map(|limit| limit.cast_into())
    }
}
214+
215+
#[cfg(target_os = "linux")]
216+
use linux::*;
217+
#[cfg(not(target_os = "linux"))]
218+
use macos::*;

src/compute/src/memory_limiter.rs

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,8 @@ pub fn start_limiter(memory_limit: usize, metrics_registry: &MetricsRegistry) {
5151
mz_ore::task::spawn(|| "memory-limiter", LimiterTask::run(config_rx, metrics));
5252

5353
*limiter = Some(Limiter {
54-
memory_limit,
54+
base_memory_limit: memory_limit,
55+
effective_memory_limit: memory_limit,
5556
config_tx,
5657
});
5758
}
@@ -63,17 +64,25 @@ pub fn apply_limiter_config(config: &ConfigSet) {
6364
}
6465
}
6566

67+
/// Get the current effective memory limit.
68+
pub fn get_memory_limit() -> Option<usize> {
69+
let limiter = LIMITER.lock().expect("poisoned");
70+
limiter.as_ref().map(|l| l.effective_memory_limit)
71+
}
72+
6673
/// A handle to a running memory limiter task.
6774
struct Limiter {
68-
/// The process memory limit.
69-
memory_limit: usize,
75+
/// The base process memory limit.
76+
base_memory_limit: usize,
77+
/// The effective memory limit, obtained by applying dyncfgs to the base limit.
78+
effective_memory_limit: usize,
7079
/// A sender for limiter configuration updates.
7180
config_tx: UnboundedSender<LimiterConfig>,
7281
}
7382

7483
impl Limiter {
7584
/// Apply the given configuration to the limiter.
76-
fn apply_config(&self, config: &ConfigSet) {
85+
fn apply_config(&mut self, config: &ConfigSet) {
7786
let mut interval = MEMORY_LIMITER_INTERVAL.get(config);
7887
// A zero duration means the limiter is disabled. Translate that into an ~infinite duration
7988
// so the limiter doesn't have to worry about the special case.
@@ -82,12 +91,14 @@ impl Limiter {
8291
}
8392

8493
let memory_limit =
85-
f64::cast_lossy(self.memory_limit) * MEMORY_LIMITER_USAGE_BIAS.get(config);
94+
f64::cast_lossy(self.base_memory_limit) * MEMORY_LIMITER_USAGE_BIAS.get(config);
8695
let memory_limit = usize::cast_lossy(memory_limit);
8796

8897
let burst_budget = f64::cast_lossy(memory_limit) * MEMORY_LIMITER_BURST_FACTOR.get(config);
8998
let burst_budget = usize::cast_lossy(burst_budget);
9099

100+
self.effective_memory_limit = memory_limit;
101+
91102
self.config_tx
92103
.send(LimiterConfig {
93104
interval,

0 commit comments

Comments
 (0)