Skip to content

Commit 98f661a

Browse files
lovromazgon and hariso
authored
Aggregate metrics (#18)
* add env var documentation * export aggregated metrics in the end * ignore linter warning * explain results better --------- Co-authored-by: Haris Osmanagić <[email protected]>
1 parent ceb907e commit 98f661a

File tree

5 files changed

+234
-29
lines changed

5 files changed

+234
-29
lines changed

README.md

+44-12
Original file line numberDiff line numberDiff line change
@@ -40,26 +40,37 @@ curl https://raw.githubusercontent.com/ConduitIO/benchi/main/install.sh | sh
4040

4141
### Running Benchmarks
4242

43-
The repository includes an [example benchmark](./example). Use the following
44-
command to run the benchmark:
43+
Run benchi and point `-config` to a benchmark [configuration file](#configuration).
44+
The repository includes an [example benchmark](./example), which can be run
45+
using the following command:
4546

4647
```sh
4748
benchi -config ./example/bench-kafka-kafka/bench.yml
4849
```
4950

50-
Running the benchmark will store the results in the `results` folder. Inside the
51-
results folder, you will find a folder named after the current date and time
52-
(e.g. `results/20060102_150405`). This folder will contain logs and results:
51+
### Results
52+
53+
Running the benchmark will store the results in a folder named after the current
54+
date and time inside of `results` (e.g. `results/20060102_150405`). You can
55+
adjust the output folder using the `-out` flag.
56+
57+
The output folder will contain two files:
58+
59+
- `benchi.log`: Log file containing the full output of benchi.
60+
- `aggregated-results.csv`: Aggregated metric results from all collectors and
61+
all tests. The results are aggregated using a
62+
[trimmed mean](https://en.wikipedia.org/wiki/Truncated_mean), where values
63+
more than two standard deviations from the mean are removed. Benchi also disregards any 0 values
64+
from the start and end of the test, to accommodate for warm-up and cool-down
65+
periods.
66+
67+
The output folder will also contain one folder per benchmark run (i.e. per test
68+
and tool combination). Each benchmark run folder will contain:
5369

54-
- `benchi.log`: Log file containing the output of the benchmark run.
5570
- `infra.log`: Log file containing the output of the infrastructure docker containers.
5671
- `tools.log`: Log file containing the output of the tools docker containers.
57-
- `conduit.csv`: Metrics collected using the [Conduit](#conduit) collector.
58-
- `docker.csv`: Metrics collected using the [Docker](#docker) collector.
59-
- `kafka.csv`: Metrics collected using the [Kafka](#kafka) collector.
60-
61-
For details about the example benchmark, see the
62-
[example README](./example/README.md).
72+
- `COLLECTOR.csv`: Raw metrics collected using the corresponding
73+
[metrics collector](#collectors).
6374

6475
### Command-Line Flags
6576

@@ -90,6 +101,27 @@ networks:
90101
external: true
91102
```
92103
104+
### Environment variables
105+
106+
Benchi runs all Docker Compose commands using the same environment variables as
107+
the current shell. This means that you can use environment variables to pass
108+
values to your services.
109+
110+
For instance, having the following Docker Compose configuration:
111+
112+
```yaml
113+
services:
114+
my-service:
115+
environment:
116+
- MY_ENV_VAR=${MY_ENV_VAR}
117+
```
118+
119+
You can inject the environment variable by running Benchi as follows:
120+
121+
```sh
122+
MY_ENV_VAR=my-value benchi -config ./my-benchmark.yml
123+
```
124+
93125
### Configuration
94126

95127
Benchi uses a YAML configuration file to define the benchmark in combination

cmd/benchi/main.go

+46-5
Original file line numberDiff line numberDiff line change
@@ -142,9 +142,11 @@ type mainModel struct {
142142
config config.Config
143143
resultsDir string
144144
dockerClient client.APIClient
145+
testRunners benchi.TestRunners
145146

146147
tests []testModel
147148
currentTestIndex int
149+
executedTests []int // Indexes of tests that have been executed
148150

149151
// Log models for the CLI.
150152
infoLogModel internal.LogModel
@@ -168,6 +170,10 @@ type mainModelMsgNextTest struct {
168170
testIndex int
169171
}
170172

173+
type mainModelMsgExportedAggregatedMetrics struct {
174+
err error
175+
}
176+
171177
func newMainModel(infoReader, errorReader io.Reader) mainModel {
172178
ctx, ctxCancel := context.WithCancel(context.Background())
173179
cleanupCtx, cleanupCtxCancel := context.WithCancel(context.Background())
@@ -294,6 +300,23 @@ func (mainModel) runTestCmd(index int) tea.Cmd {
294300
}
295301
}
296302

303+
func (m mainModel) exportAggregatedMetricsCmd() tea.Cmd {
304+
return func() tea.Msg {
305+
if len(m.executedTests) == 0 {
306+
return mainModelMsgExportedAggregatedMetrics{}
307+
}
308+
309+
// Collect executed test runners
310+
testRunners := make(benchi.TestRunners, 0, len(m.executedTests))
311+
for _, i := range m.executedTests {
312+
testRunners = append(testRunners, m.testRunners[i])
313+
}
314+
315+
err := testRunners.ExportAggregatedMetrics(m.resultsDir)
316+
return mainModelMsgExportedAggregatedMetrics{err: err}
317+
}
318+
}
319+
297320
func (m mainModel) quitCmd() tea.Cmd {
298321
return func() tea.Msg {
299322
if m.dockerClient != nil {
@@ -321,6 +344,7 @@ func (m mainModel) Update(msg tea.Msg) (tea.Model, tea.Cmd) {
321344
m.config = msg.config
322345
m.resultsDir = msg.resultsDir
323346
m.dockerClient = msg.dockerClient
347+
m.testRunners = msg.testRunners
324348

325349
tests := make([]testModel, len(msg.testRunners))
326350
for i, tr := range msg.testRunners {
@@ -347,7 +371,7 @@ func (m mainModel) Update(msg tea.Msg) (tea.Model, tea.Cmd) {
347371
return m, m.runTestCmd(0)
348372
case mainModelMsgNextTest:
349373
if msg.testIndex >= len(m.tests) {
350-
return m, m.quitCmd()
374+
return m, m.exportAggregatedMetricsCmd()
351375
}
352376
m.currentTestIndex = msg.testIndex
353377
return m, m.tests[m.currentTestIndex].Init()
@@ -358,8 +382,21 @@ func (m mainModel) Update(msg tea.Msg) (tea.Model, tea.Cmd) {
358382
// Main context is cancelled, skip to the end.
359383
nextIndex = len(m.tests)
360384
}
385+
if msg.err != nil {
386+
m.lastError = errors.Join(m.lastError, msg.err)
387+
} else {
388+
// Only store the index of tests that have been executed successfully.
389+
m.executedTests = append(m.executedTests, m.currentTestIndex)
390+
}
361391
return m, m.runTestCmd(nextIndex)
362392

393+
case mainModelMsgExportedAggregatedMetrics:
394+
if msg.err != nil {
395+
slog.Error("Failed to export aggregated metrics", "error", msg.err)
396+
m.lastError = errors.Join(m.lastError, msg.err)
397+
}
398+
return m, m.quitCmd()
399+
363400
case tea.KeyMsg:
364401
if msg.String() == "ctrl+c" {
365402
switch {
@@ -446,7 +483,9 @@ type testModelMsgStep struct {
446483
err error
447484
}
448485

449-
type testModelMsgDone struct{}
486+
type testModelMsgDone struct {
487+
err error
488+
}
450489

451490
func newTestModel(ctx context.Context, cleanupCtx context.Context, client client.APIClient, runner *benchi.TestRunner) (testModel, error) {
452491
collectorModels := make([]internal.CollectorMonitorModel, 0, len(runner.Collectors()))
@@ -489,9 +528,11 @@ func (m testModel) stepCmd(ctx context.Context) tea.Cmd {
489528
}
490529
}
491530

492-
func (m testModel) doneCmd() tea.Cmd {
531+
func (m testModel) doneCmd(err error) tea.Cmd {
493532
return func() tea.Msg {
494-
return testModelMsgDone{}
533+
return testModelMsgDone{
534+
err: err,
535+
}
495536
}
496537
}
497538

@@ -508,7 +549,7 @@ func (m testModel) Update(msg tea.Msg) (testModel, tea.Cmd) {
508549

509550
switch {
510551
case m.currentStep == benchi.StepDone:
511-
return m, m.doneCmd()
552+
return m, m.doneCmd(errors.Join(m.errors...))
512553
case m.currentStep >= benchi.StepPreCleanup:
513554
// Cleanup steps use the cleanup context.
514555
return m, m.stepCmd(m.cleanupCtx)

example/README.md

+10-8
Original file line numberDiff line numberDiff line change
@@ -30,13 +30,15 @@ and Docker, displaying them in the CLI. After the benchmark is complete, the
3030
metrics will be exported to CSV files in the output folder (e.g.
3131
`results/20060102_150405`).
3232

33-
Benchi will store the results in the `results` folder. Inside the results folder,
34-
you will find a folder named after the current date and time (e.g.
35-
`results/20060102_150405`). This folder will contain logs and results:
33+
The output folder will contain logs and results:
3634

3735
- `benchi.log`: Log file containing the output of the benchmark run.
38-
- `infra.log`: Log file containing the output of the infrastructure docker containers.
39-
- `tools.log`: Log file containing the output of the tools docker containers.
40-
- `conduit.csv`: Metrics collected using the [Conduit](../README.md#conduit) collector.
41-
- `docker.csv`: Metrics collected using the [Docker](../README.md#docker) collector.
42-
- `kafka.csv`: Metrics collected using the [Kafka](../README.md#kafka) collector.
36+
- `aggregated-results.csv`: Aggregated metric results from all collectors.
37+
- `kafka-to-kafka_conduit`: Folder containing the logs and metrics for the
38+
`kafka-to-kafka` test and the `conduit` tool.
39+
- `infra.log`: Log file containing the output of the infrastructure docker
40+
containers.
41+
- `tools.log`: Log file containing the output of the tools docker containers.
42+
- `conduit.csv`: Metrics collected using the [Conduit](../README.md#conduit) collector.
43+
- `docker.csv`: Metrics collected using the [Docker](../README.md#docker) collector.
44+
- `kafka.csv`: Metrics collected using the [Kafka](../README.md#kafka) collector.

metrics/collector.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,7 @@ type Collector interface {
3232
// cancelled. The function should block until the context is cancelled, an
3333
// error occurs, or the Stop function is called.
3434
Run(ctx context.Context) error
35-
// Results returns the collected metrics. If the collector is collecting
36-
// multiple metrics, the key should be the name of the metric.
35+
// Results returns the collected metrics.
3736
Results() []Results
3837
}
3938

runner.go

+133-2
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"fmt"
2525
"log/slog"
2626
"maps"
27+
"math"
2728
"os"
2829
"path/filepath"
2930
"slices"
@@ -49,8 +50,6 @@ const (
4950
DefaultHookImage = "alpine:latest"
5051
)
5152

52-
type TestRunners []*TestRunner
53-
5453
type TestRunnerOptions struct {
5554
// ResultsDir is the directory where the test results are stored.
5655
ResultsDir string
@@ -227,6 +226,138 @@ func findContainerNames(ctx context.Context, files []string) ([]string, error) {
227226
return containers, nil
228227
}
229228

229+
type TestRunners []*TestRunner
230+
231+
func (runners TestRunners) ExportAggregatedMetrics(resultsDir string) error {
232+
path := filepath.Join(resultsDir, "aggregated-results.csv")
233+
slog.Info("Exporting aggregated metrics", "path", path)
234+
235+
headers, records := runners.aggregatedMetrics()
236+
237+
f, err := os.Create(path)
238+
if err != nil {
239+
return fmt.Errorf("error creating file: %w", err)
240+
}
241+
defer f.Close()
242+
243+
writer := csv.NewWriter(f)
244+
err = writer.Write(headers)
245+
if err != nil {
246+
return fmt.Errorf("error writing CSV header: %w", err)
247+
}
248+
249+
err = writer.WriteAll(records)
250+
if err != nil {
251+
return fmt.Errorf("error writing CSV records: %w", err)
252+
}
253+
254+
err = writer.Error()
255+
if err != nil {
256+
return fmt.Errorf("error writing CSV records: %w", err)
257+
}
258+
259+
return nil
260+
}
261+
262+
func (runners TestRunners) aggregatedMetrics() (headers []string, records [][]string) {
263+
headers = []string{"test", "tool"}
264+
265+
// colIndexes maps the collector name and metric name to the column index in
266+
// the records and headers slices.
267+
colIndexes := make(map[string]map[string]int)
268+
269+
for _, tr := range runners {
270+
recs := make([]string, len(headers))
271+
recs[0] = tr.Name()
272+
recs[1] = tr.Tool()
273+
274+
for _, c := range tr.Collectors() {
275+
indexes := colIndexes[c.Name()]
276+
if indexes == nil {
277+
indexes = make(map[string]int)
278+
colIndexes[c.Name()] = indexes
279+
}
280+
281+
for _, r := range c.Results() {
282+
idx := indexes[r.Name]
283+
if idx == 0 {
284+
idx = len(headers)
285+
indexes[r.Name] = idx
286+
headers = append(headers, r.Name)
287+
// Backfill records with empty values.
288+
for i := 0; i < len(records); i++ {
289+
records[i] = append(records[i], "")
290+
}
291+
recs = append(recs, "") //nolint:makezero // False positive.
292+
}
293+
294+
mean, ok := runners.trimmedMean(r.Samples)
295+
if ok {
296+
recs[idx] = fmt.Sprintf("%.2f", mean)
297+
}
298+
}
299+
}
300+
records = append(records, recs)
301+
}
302+
303+
return headers, records
304+
}
305+
306+
// trimmedMean calculates the trimmed mean of the samples. It trims any zeros
307+
// from the start and end of the samples, then trims any samples that are more
308+
// than 2 standard deviations from the mean. It returns the trimmed mean and a
309+
// boolean indicating if the trimmed mean was calculated successfully.
310+
func (runners TestRunners) trimmedMean(samples []metrics.Sample) (float64, bool) {
311+
if len(samples) == 0 {
312+
return 0, false
313+
}
314+
315+
// Trim any zeros from the start and end of the samples.
316+
for len(samples) > 0 && samples[0].Value == 0 {
317+
samples = samples[1:]
318+
}
319+
for len(samples) > 0 && samples[len(samples)-1].Value == 0 {
320+
samples = samples[:len(samples)-1]
321+
}
322+
323+
if len(samples) == 0 {
324+
return 0, true
325+
}
326+
327+
n := float64(len(samples)) // Number of samples as a float.
328+
329+
// Calculate mean and standard deviation
330+
var sum float64
331+
for _, s := range samples {
332+
sum += s.Value
333+
}
334+
mean := sum / n
335+
336+
var variance float64
337+
for _, s := range samples {
338+
variance += (s.Value - mean) * (s.Value - mean)
339+
}
340+
stddev := math.Sqrt(variance / n)
341+
342+
// Trim samples that are more than 2 standard deviations from the mean.
343+
var trimmed []float64
344+
lowerBound := mean - 2*stddev
345+
upperBound := mean + 2*stddev
346+
for _, s := range samples {
347+
if s.Value >= lowerBound && s.Value <= upperBound {
348+
trimmed = append(trimmed, s.Value)
349+
}
350+
}
351+
352+
// Calculate the trimmed mean.
353+
var trimmedSum float64
354+
for _, s := range trimmed {
355+
trimmedSum += s
356+
}
357+
358+
return trimmedSum / float64(len(trimmed)), true
359+
}
360+
230361
// TestRunner is a single test run for a single tool.
231362
type TestRunner struct {
232363
step Step

0 commit comments

Comments
 (0)