Skip to content

Commit 1458634

Browse files
authored
batch: add metrics for total batches and timeout batches (#97)
Allow evaluating the efficiency of the batch flush settings. Signed-off-by: Matt Klein <[email protected]>
1 parent 138c147 commit 1458634

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

44 files changed

+715
-728
lines changed

Cargo.lock

Lines changed: 638 additions & 649 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 21 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,15 @@ ahash = "0.8"
1616
anyhow = "1"
1717
assert_matches = "1.5.0"
1818
async-trait = "0.1"
19-
aws-config = "1.8.6"
20-
aws-credential-types = "1.2.6"
21-
aws-sdk-sqs = "1.83.0"
22-
aws-sigv4 = "1.3.4"
23-
aws-smithy-async = "1.2.5"
24-
aws-smithy-http = "0.62.3"
25-
aws-smithy-runtime-api = { version = "1.9.0", features = ["test-util"] }
26-
aws-smithy-types = "1.3.2"
27-
axum = "0.8.4"
19+
aws-config = "1.8.8"
20+
aws-credential-types = "1.2.8"
21+
aws-sdk-sqs = "1.86.0"
22+
aws-sigv4 = "1.3.5"
23+
aws-smithy-async = "1.2.6"
24+
aws-smithy-http = "0.62.4"
25+
aws-smithy-runtime-api = { version = "1.9.1", features = ["test-util"] }
26+
aws-smithy-types = "1.3.3"
27+
axum = "0.8.6"
2828
backoff = { version = "0.4.0", features = ["tokio"] }
2929
base64ct = "1.8.0"
3030
bd-grpc = { git = "https://github.com/bitdriftlabs/shared-core.git" }
@@ -42,12 +42,12 @@ bd-test-helpers = { git = "https://github.com/bitdriftlabs/shared-core.gi
4242
bd-time = { git = "https://github.com/bitdriftlabs/shared-core.git" }
4343
built = { version = "0.8", features = ["git2"] }
4444
bytes = "1"
45-
cc = "1.2.37"
46-
clap = { version = "4.5.47", features = ["derive", "env"] }
45+
cc = "1.2.41"
46+
clap = { version = "4.5.50", features = ["derive", "env"] }
4747
comfy-table = "7.2.1"
4848
console-subscriber = "0.4.1"
4949
criterion = { version = "0.7", features = ["html_reports"] }
50-
ctor = "0.5.0"
50+
ctor = "0.6.0"
5151
cuckoofilter = "0.5.0"
5252
dashmap = { version = "6", features = ["raw-api"] }
5353
deadpool = { version = "0.12", features = ["managed", "rt_tokio_1"] }
@@ -69,13 +69,13 @@ hyper-rustls = { version = "0.27.7", default-features = false, features = [
6969
"aws-lc-rs",
7070
] }
7171

72-
hyper-util = { version = "0.1.16", features = ["client", "client-legacy"] }
72+
hyper-util = { version = "0.1.17", features = ["client", "client-legacy"] }
7373
hyperloglogplus = "0.4.1"
7474
intrusive-collections = "0.9.7"
7575
itertools = "0.14.0"
7676
k8s-openapi = { version = "0.26.0", features = ["v1_30"] }
7777

78-
kube = { version = "2.0.0", features = [
78+
kube = { version = "2.0.1", features = [
7979
"runtime",
8080
"derive",
8181
"rustls-tls",
@@ -106,24 +106,24 @@ rand = { version = "0.9", features = ["small_rng"] }
106106
rand_xoshiro = "0.7"
107107
regex = "1"
108108

109-
reqwest = { version = "0.12.23", default-features = false, features = [
109+
reqwest = { version = "0.12.24", default-features = false, features = [
110110
"rustls-tls-webpki-roots",
111111
"json",
112112
] }
113113

114114
reusable-fmt = "0.2.0"
115-
rustls = "0.23.31"
115+
rustls = "0.23.34"
116116
serde = { version = "1", features = ["derive"] }
117117
serde_json = "1"
118118
serde_yaml = "0.9.34"
119119
snap = "1"
120-
socket2 = "0.6.0"
121-
tempfile = "3.22"
120+
socket2 = "0.6.1"
121+
tempfile = "3.23"
122122
thiserror = "2"
123-
tikv-jemalloc-ctl = "0.6.0"
123+
tikv-jemalloc-ctl = "0.6.1"
124124
tikv-jemallocator.features = ["profiling", "stats", "unprefixed_malloc_on_supported_platforms"]
125-
tikv-jemallocator.version = "0.6.0"
126-
time = { version = "0.3.43", features = ["formatting"] }
125+
tikv-jemallocator.version = "0.6.1"
126+
time = { version = "0.3.44", features = ["formatting"] }
127127
tokio = { version = "1", features = ["full", "parking_lot", "tracing", "test-util"] }
128128
tokio-stream = "0.1.17"
129129
tokio-test = "0.4"

deny.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
[graph]
22
all-features = false
3+
exclude = ["bd-workspace-hack"]
34
no-default-features = false
45
targets = []
56

pulse-metrics/src/batch.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,8 @@ struct PerBatchLockedData<T> {
6060
struct Stats {
6161
dropped_bytes: IntCounter,
6262
queued_bytes: IntGauge,
63+
total_batches: IntCounter,
64+
timeout_batches: IntCounter,
6365
}
6466

6567
//
@@ -99,6 +101,8 @@ impl<I: Send + Sync + 'static, B: Batch<I> + Send + Sync + 'static> BatchBuilder
99101
let stats = Stats {
100102
dropped_bytes: scope.counter("dropped_bytes"),
101103
queued_bytes: scope.gauge("queued_bytes"),
104+
total_batches: scope.counter("total_batches"),
105+
timeout_batches: scope.counter("timeout_batches"),
102106
};
103107

104108
let batch_builder = Arc::new(Self {
@@ -151,6 +155,7 @@ impl<I: Send + Sync + 'static, B: Batch<I> + Send + Sync + 'static> BatchBuilder
151155
&cloned_batch_builder.notify_on_data,
152156
batch,
153157
size,
158+
true,
154159
);
155160
}
156161

@@ -255,6 +260,7 @@ impl<I: Send + Sync + 'static, B: Batch<I> + Send + Sync + 'static> BatchBuilder
255260
&self.notify_on_data,
256261
pending_batch,
257262
finished_size,
263+
false,
258264
);
259265
} else if locked_pending_data.batch_fill_wait_task.is_none() {
260266
// If there is no fill task, start one.
@@ -286,6 +292,7 @@ impl<I: Send + Sync + 'static, B: Batch<I> + Send + Sync + 'static> BatchBuilder
286292
&cloned_self.notify_on_data,
287293
pending_batch,
288294
size,
295+
true,
289296
);
290297
}));
291298
}
@@ -299,8 +306,13 @@ impl<I: Send + Sync + 'static, B: Batch<I> + Send + Sync + 'static> BatchBuilder
299306
notify_on_data: &Notify,
300307
pending_batch: B,
301308
size: usize,
309+
timeout_batch: bool,
302310
) {
303311
Self::inc_total_size(locked_data, stats, size);
312+
stats.total_batches.inc();
313+
if timeout_batch {
314+
stats.timeout_batches.inc();
315+
}
304316

305317
// In order to avoid spurious wakeups, we keep track of whether there are any waiters.
306318
// TODO(mattklein123): Due to the use of multiple batch builders in the Lyft specific config

pulse-metrics/src/pipeline/processor/internode/convert_test.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,6 @@ fn parsed_metric_roundtrip_metric(metric: Metric) -> anyhow::Result<()> {
164164
if m.eq(&m2) {
165165
Ok(())
166166
} else {
167-
Err(anyhow::anyhow!("m = {:?}, m2 = {:?}", m, m2))
167+
Err(anyhow::anyhow!("m = {m:?}, m2 = {m2:?}"))
168168
}
169169
}

pulse-metrics/src/protos/metric_test.rs

Lines changed: 3 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -129,22 +129,14 @@ fn metrics_roundtrip_write_request(input: Vec<ParsedMetric>) -> anyhow::Result<(
129129
);
130130
assert!(errors.is_empty());
131131
if input.len() != output.len() {
132-
bail!(
133-
"mismatched lengths: input {:?}, output: {:?}",
134-
input,
135-
output
136-
);
132+
bail!("mismatched lengths: input {input:?}, output: {output:?}");
137133
}
138134
for input_metric in input.clone() {
139135
let matching = output
140136
.iter()
141137
.find(|output_metric| &input_metric.metric == *output_metric);
142138
if Some(&input_metric.metric.clone()) != matching {
143-
bail!(
144-
"input missing from output: input {:?}, output: {:?}",
145-
input_metric,
146-
output
147-
);
139+
bail!("input missing from output: input {input_metric:?}, output: {output:?}");
148140
}
149141
}
150142
for output_metric in output {
@@ -153,11 +145,7 @@ fn metrics_roundtrip_write_request(input: Vec<ParsedMetric>) -> anyhow::Result<(
153145
.find(|input_metric| input_metric.metric == output_metric)
154146
.map(|metric| &metric.metric);
155147
if Some(&output_metric.clone()) != matching {
156-
bail!(
157-
"output missing from input: output {:?}, input: {:?}",
158-
output_metric,
159-
input
160-
);
148+
bail!("output missing from input: output {output_metric:?}, input: {input:?}");
161149
}
162150
}
163151
Ok(())

pulse-protobuf/src/protos/opentelemetry/common.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
// https://polyformproject.org/wp-content/uploads/2020/06/PolyForm-Shield-1.0.0.txt
77

88
// This file is generated by rust-protobuf 4.0.0-alpha.0. Do not edit
9-
// .proto file is parsed by protoc 32.1
9+
// .proto file is parsed by protoc 33.0
1010
// @generated
1111

1212
// https://github.com/rust-lang/rust-clippy/issues/702

pulse-protobuf/src/protos/opentelemetry/metrics.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
// https://polyformproject.org/wp-content/uploads/2020/06/PolyForm-Shield-1.0.0.txt
77

88
// This file is generated by rust-protobuf 4.0.0-alpha.0. Do not edit
9-
// .proto file is parsed by protoc 32.1
9+
// .proto file is parsed by protoc 33.0
1010
// @generated
1111

1212
// https://github.com/rust-lang/rust-clippy/issues/702

pulse-protobuf/src/protos/opentelemetry/metrics_service.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
// https://polyformproject.org/wp-content/uploads/2020/06/PolyForm-Shield-1.0.0.txt
77

88
// This file is generated by rust-protobuf 4.0.0-alpha.0. Do not edit
9-
// .proto file is parsed by protoc 32.1
9+
// .proto file is parsed by protoc 33.0
1010
// @generated
1111

1212
// https://github.com/rust-lang/rust-clippy/issues/702

pulse-protobuf/src/protos/opentelemetry/resource.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
// https://polyformproject.org/wp-content/uploads/2020/06/PolyForm-Shield-1.0.0.txt
77

88
// This file is generated by rust-protobuf 4.0.0-alpha.0. Do not edit
9-
// .proto file is parsed by protoc 32.1
9+
// .proto file is parsed by protoc 33.0
1010
// @generated
1111

1212
// https://github.com/rust-lang/rust-clippy/issues/702

0 commit comments

Comments
 (0)