Skip to content

Commit a13253c

Browse files
committed
envd: collect and record heap metrics
This change makes the kubernetes orchestrator collect the new heap metrics reported by clusterd processes and report them to the controller, where they are added to the `mz_cluster_replica_metrics_history` relation.
1 parent faaaf56 commit a13253c

File tree

8 files changed

+84
-60
lines changed

8 files changed

+84
-60
lines changed

src/catalog/src/builtin.rs

Lines changed: 10 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -4945,6 +4945,8 @@ pub static MZ_CLUSTER_REPLICA_METRICS: LazyLock<BuiltinView> = LazyLock::new(||
49454945
.with_column("cpu_nano_cores", SqlScalarType::UInt64.nullable(true))
49464946
.with_column("memory_bytes", SqlScalarType::UInt64.nullable(true))
49474947
.with_column("disk_bytes", SqlScalarType::UInt64.nullable(true))
4948+
.with_column("heap_bytes", SqlScalarType::UInt64.nullable(true))
4949+
.with_column("heap_limit", SqlScalarType::UInt64.nullable(true))
49484950
.with_key(vec![0, 1])
49494951
.finish(),
49504952
column_comments: BTreeMap::from_iter([
@@ -4964,7 +4966,9 @@ SELECT
49644966
process_id,
49654967
cpu_nano_cores,
49664968
memory_bytes,
4967-
disk_bytes
4969+
disk_bytes,
4970+
heap_bytes,
4971+
heap_limit
49684972
FROM mz_internal.mz_cluster_replica_metrics_history
49694973
JOIN mz_cluster_replicas r ON r.id = replica_id
49704974
ORDER BY replica_id, process_id, occurred_at DESC",
@@ -8863,6 +8867,7 @@ pub static MZ_CLUSTER_REPLICA_UTILIZATION: LazyLock<BuiltinView> = LazyLock::new
88638867
.with_column("cpu_percent", SqlScalarType::Float64.nullable(true))
88648868
.with_column("memory_percent", SqlScalarType::Float64.nullable(true))
88658869
.with_column("disk_percent", SqlScalarType::Float64.nullable(true))
8870+
.with_column("heap_percent", SqlScalarType::Float64.nullable(true))
88668871
.finish(),
88678872
column_comments: BTreeMap::from_iter([
88688873
("replica_id", "The ID of a cluster replica."),
@@ -8886,7 +8891,8 @@ SELECT
88868891
m.process_id,
88878892
m.cpu_nano_cores::float8 / NULLIF(s.cpu_nano_cores, 0) * 100 AS cpu_percent,
88888893
m.memory_bytes::float8 / NULLIF(s.memory_bytes, 0) * 100 AS memory_percent,
8889-
m.disk_bytes::float8 / NULLIF(s.disk_bytes, 0) * 100 AS disk_percent
8894+
m.disk_bytes::float8 / NULLIF(s.disk_bytes, 0) * 100 AS disk_percent,
8895+
m.heap_bytes::float8 / NULLIF(m.heap_limit, 0) * 100 AS heap_percent
88908896
FROM
88918897
mz_catalog.mz_cluster_replicas AS r
88928898
JOIN mz_catalog.mz_cluster_replica_sizes AS s ON r.size = s.size
@@ -8905,6 +8911,7 @@ pub static MZ_CLUSTER_REPLICA_UTILIZATION_HISTORY: LazyLock<BuiltinView> =
89058911
.with_column("cpu_percent", SqlScalarType::Float64.nullable(true))
89068912
.with_column("memory_percent", SqlScalarType::Float64.nullable(true))
89078913
.with_column("disk_percent", SqlScalarType::Float64.nullable(true))
8914+
.with_column("heap_percent", SqlScalarType::Float64.nullable(true))
89088915
.with_column(
89098916
"occurred_at",
89108917
SqlScalarType::TimestampTz { precision: None }.nullable(false),
@@ -8937,6 +8944,7 @@ SELECT
89378944
m.cpu_nano_cores::float8 / NULLIF(s.cpu_nano_cores, 0) * 100 AS cpu_percent,
89388945
m.memory_bytes::float8 / NULLIF(s.memory_bytes, 0) * 100 AS memory_percent,
89398946
m.disk_bytes::float8 / NULLIF(s.disk_bytes, 0) * 100 AS disk_percent,
8947+
m.heap_bytes::float8 / NULLIF(m.heap_limit, 0) * 100 AS heap_percent,
89408948
m.occurred_at
89418949
FROM
89428950
mz_catalog.mz_cluster_replicas AS r

src/controller/src/lib.rs

Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -591,8 +591,10 @@ where
591591
Datum::UInt64(u64::cast_from(process_id)),
592592
m.cpu_nano_cores.into(),
593593
m.memory_bytes.into(),
594-
m.disk_usage_bytes.into(),
594+
m.disk_bytes.into(),
595595
Datum::TimestampTz(now_tz),
596+
m.heap_bytes.into(),
597+
m.heap_limit.into(),
596598
]);
597599
(row.clone(), mz_repr::Diff::ONE)
598600
})

src/orchestrator-kubernetes/src/lib.rs

Lines changed: 57 additions & 53 deletions
Original file line number | Diff line number | Diff line change
@@ -54,7 +54,7 @@ use mz_ore::task::AbortOnDropHandle;
5454
use serde::Deserialize;
5555
use sha2::{Digest, Sha256};
5656
use tokio::sync::{mpsc, oneshot};
57-
use tracing::{info, warn};
57+
use tracing::{error, info, warn};
5858

5959
pub mod cloud_resource_controller;
6060
pub mod secrets;
@@ -1423,6 +1423,15 @@ impl OrchestratorWorker {
14231423
.collect();
14241424
}
14251425

1426+
/// Usage metrics reported by clusterd processes.
1427+
#[derive(Deserialize)]
1428+
pub(crate) struct ClusterdUsage {
1429+
disk_bytes: Option<u64>,
1430+
memory_bytes: Option<u64>,
1431+
swap_bytes: Option<u64>,
1432+
heap_limit: Option<u64>,
1433+
}
1434+
14261435
/// Get metrics for a particular service and process, converting them into a sane (i.e., numeric) format.
14271436
///
14281437
/// Note that we want to keep going even if a lookup fails for whatever reason,
@@ -1435,12 +1444,13 @@ impl OrchestratorWorker {
14351444
) -> ServiceProcessMetrics {
14361445
let name = format!("{service_name}-{i}");
14371446

1438-
let disk_usage_fut = get_disk_usage(self_, service_name, i);
1439-
let (metrics, disk_usage) =
1440-
match futures::future::join(self_.metrics_api.get(&name), disk_usage_fut).await {
1441-
(Ok(metrics), Ok(disk_usage)) => (metrics, disk_usage),
1447+
let clusterd_usage_fut = get_clusterd_usage(self_, service_name, i);
1448+
let (metrics, clusterd_usage) =
1449+
match futures::future::join(self_.metrics_api.get(&name), clusterd_usage_fut).await
1450+
{
1451+
(Ok(metrics), Ok(clusterd_usage)) => (metrics, Some(clusterd_usage)),
14421452
(Ok(metrics), Err(e)) => {
1443-
warn!("Failed to fetch disk usage for {name}: {e}");
1453+
warn!("Failed to fetch clusterd usage for {name}: {e}");
14441454
(metrics, None)
14451455
}
14461456
(Err(e), _) => {
@@ -1461,56 +1471,58 @@ impl OrchestratorWorker {
14611471
return ServiceProcessMetrics::default();
14621472
};
14631473

1464-
let cpu = match parse_k8s_quantity(cpu_str) {
1474+
let mut process_metrics = ServiceProcessMetrics::default();
1475+
1476+
match parse_k8s_quantity(cpu_str) {
14651477
Ok(q) => match q.try_to_integer(-9, true) {
1466-
Some(i) => Some(i),
1467-
None => {
1468-
tracing::error!("CPU value {q:? }out of range");
1469-
None
1470-
}
1478+
Some(nano_cores) => process_metrics.cpu_nano_cores = Some(nano_cores),
1479+
None => error!("CPU value {q:?} out of range"),
14711480
},
1472-
Err(e) => {
1473-
tracing::error!("Failed to parse CPU value {cpu_str}: {e}");
1474-
None
1475-
}
1476-
};
1477-
let memory = match parse_k8s_quantity(mem_str) {
1481+
Err(e) => error!("failed to parse CPU value {cpu_str}: {e}"),
1482+
}
1483+
match parse_k8s_quantity(mem_str) {
14781484
Ok(q) => match q.try_to_integer(0, false) {
1479-
Some(i) => Some(i),
1480-
None => {
1481-
tracing::error!("Memory value {q:?} out of range");
1482-
None
1483-
}
1485+
Some(mem) => process_metrics.memory_bytes = Some(mem),
1486+
None => error!("memory value {q:?} out of range"),
14841487
},
1485-
Err(e) => {
1486-
tracing::error!("Failed to parse memory value {mem_str}: {e}");
1487-
None
1488-
}
1489-
};
1488+
Err(e) => error!("failed to parse memory value {mem_str}: {e}"),
1489+
}
1490+
1491+
if let Some(usage) = clusterd_usage {
1492+
// clusterd may report disk usage as either `disk_bytes`, or `swap_bytes`, or both.
1493+
//
1494+
// For now the Console expects the swap size to be reported in `disk_bytes`.
1495+
// Once the Console has been ported to use `heap_bytes`/`heap_limit`, we can
1496+
// simplify things by setting `process_metrics.disk_bytes = usage.disk_bytes`.
1497+
process_metrics.disk_bytes = match (usage.disk_bytes, usage.swap_bytes) {
1498+
(Some(disk), Some(swap)) => Some(disk + swap),
1499+
(disk, swap) => disk.or(swap),
1500+
};
1501+
1502+
// clusterd may report heap usage as `memory_bytes` and optionally `swap_bytes`.
1503+
// If no `memory_bytes` is reported, we can't know the heap usage.
1504+
process_metrics.heap_bytes = match (usage.memory_bytes, usage.swap_bytes) {
1505+
(Some(memory), Some(swap)) => Some(memory + swap),
1506+
(Some(memory), None) => Some(memory),
1507+
(None, _) => None,
1508+
};
14901509

1491-
ServiceProcessMetrics {
1492-
cpu_nano_cores: cpu,
1493-
memory_bytes: memory,
1494-
disk_usage_bytes: disk_usage,
1510+
process_metrics.heap_limit = usage.heap_limit;
14951511
}
1512+
1513+
process_metrics
14961514
}
14971515

1498-
/// Get the current disk usage for a particular service and process.
1516+
/// Get the current usage metrics exposed by a clusterd process.
14991517
///
1500-
/// Disk usage is collected by connecting to a metrics endpoint exposed by the process. The
1501-
/// endpoint is assumed to be reachable at the 'internal-http' under the HTTP path
1518+
/// Usage metrics are collected by connecting to a metrics endpoint exposed by the process.
1519+
/// The endpoint is assumed to be reachable at the 'internal-http' under the HTTP path
15021520
/// `/api/usage-metrics`.
1503-
async fn get_disk_usage(
1521+
async fn get_clusterd_usage(
15041522
self_: &OrchestratorWorker,
15051523
service_name: &str,
15061524
i: usize,
1507-
) -> anyhow::Result<Option<u64>> {
1508-
#[derive(Deserialize)]
1509-
pub(crate) struct Usage {
1510-
disk_bytes: Option<u64>,
1511-
swap_bytes: Option<u64>,
1512-
}
1513-
1525+
) -> anyhow::Result<ClusterdUsage> {
15141526
let service = self_
15151527
.service_api
15161528
.get(service_name)
@@ -1542,17 +1554,9 @@ impl OrchestratorWorker {
15421554
.build()
15431555
.context("error building HTTP client")?;
15441556
let resp = http_client.get(metrics_url).send().await?;
1545-
let Usage {
1546-
disk_bytes,
1547-
swap_bytes,
1548-
} = resp.json().await?;
1557+
let usage = resp.json().await?;
15491558

1550-
let bytes = if let (Some(disk), Some(swap)) = (disk_bytes, swap_bytes) {
1551-
Some(disk + swap)
1552-
} else {
1553-
disk_bytes.or(swap_bytes)
1554-
};
1555-
Ok(bytes)
1559+
Ok(usage)
15561560
}
15571561

15581562
let ret =

src/orchestrator-process/src/lib.rs

Lines changed: 4 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -553,8 +553,10 @@ impl OrchestratorWorker {
553553
metrics.push(ServiceProcessMetrics {
554554
cpu_nano_cores,
555555
memory_bytes,
556-
// Process orchestrator does not support this right now.
557-
disk_usage_bytes: None,
556+
// Process orchestrator does not support the remaining fields right now.
557+
disk_bytes: None,
558+
heap_bytes: None,
559+
heap_limit: None,
558560
});
559561
}
560562
Ok(metrics)

src/orchestrator/src/lib.rs

Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -145,7 +145,9 @@ pub trait Service: fmt::Debug + Send + Sync {
145145
pub struct ServiceProcessMetrics {
146146
pub cpu_nano_cores: Option<u64>,
147147
pub memory_bytes: Option<u64>,
148-
pub disk_usage_bytes: Option<u64>,
148+
pub disk_bytes: Option<u64>,
149+
pub heap_bytes: Option<u64>,
150+
pub heap_limit: Option<u64>,
149151
}
150152

151153
/// A simple language for describing assertions about a label's existence and value.

src/storage-client/src/healthcheck.rs

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -169,6 +169,8 @@ pub static REPLICA_METRICS_HISTORY_DESC: LazyLock<RelationDesc> = LazyLock::new(
169169
"occurred_at",
170170
SqlScalarType::TimestampTz { precision: None }.nullable(false),
171171
)
172+
.with_column("heap_bytes", SqlScalarType::UInt64.nullable(true))
173+
.with_column("heap_limit", SqlScalarType::UInt64.nullable(true))
172174
.finish()
173175
});
174176

test/sqllogictest/mz_catalog_server_index_accounting.slt

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -224,11 +224,15 @@ mz_cluster_replica_history replica_name
224224
mz_cluster_replica_history size
225225
mz_cluster_replica_metrics cpu_nano_cores
226226
mz_cluster_replica_metrics disk_bytes
227+
mz_cluster_replica_metrics heap_bytes
228+
mz_cluster_replica_metrics heap_limit
227229
mz_cluster_replica_metrics memory_bytes
228230
mz_cluster_replica_metrics process_id
229231
mz_cluster_replica_metrics replica_id
230232
mz_cluster_replica_metrics_history cpu_nano_cores
231233
mz_cluster_replica_metrics_history disk_bytes
234+
mz_cluster_replica_metrics_history heap_bytes
235+
mz_cluster_replica_metrics_history heap_limit
232236
mz_cluster_replica_metrics_history memory_bytes
233237
mz_cluster_replica_metrics_history occurred_at
234238
mz_cluster_replica_metrics_history process_id

test/sqllogictest/system-cluster.slt

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -410,7 +410,7 @@ ORDER BY r.id;
410410
----
411411
Explained Query:
412412
Finish order_by=[#0{id} asc nulls_last] output=[#0..=#5]
413-
Project (#0{id}..=#3{size}, #5{name}, #29)
413+
Project (#0{id}..=#3{size}, #5{name}, #31)
414414
Map (((uint8_to_double(#27{memory_bytes}) / uint8_to_double(case when (0 = uint8_to_numeric(#21{memory_bytes})) then null else #21{memory_bytes} end)) * 100))
415415
Join on=(#0{id} = #15{id} = #24{replica_id} AND #2{cluster_id} = #4{id} AND #16{size} = #17{size}) type=delta
416416
ArrangeBy keys=[[#0{id}], [#2{cluster_id}]]

0 commit comments

Comments (0)