Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add histogram parsing in runner v2 #34017

Merged
merged 2 commits into from
Feb 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -628,6 +628,61 @@ message BoundedTrie {
repeated string singleton = 3;
}

// The message type used for encoding Histogram Data.
//
// A histogram is described by the total number of recorded values (`count`),
// the bucket layout (`bucket_options`) and the per-bucket tallies
// (`bucket_counts`).
message HistogramValue {
// Number of values recorded in this histogram.
optional int64 count = 1;

// Describes the bucket boundaries used in the histogram.
optional BucketOptions bucket_options = 2;

// The number of values in each bucket of the histogram, as described in
// `bucket_options`. `bucket_counts` should contain N values, where N is the
// number of buckets specified in `bucket_options`. If `bucket_counts` has
// fewer than N values, the remaining values are assumed to be 0.
repeated int64 bucket_counts = 3;

// `BucketOptions` describes the bucket boundaries used in the histogram.
// The layout is either `Linear` or `Base2Exponent`, selected through the
// `bucket_type` oneof below.
message BucketOptions {
// Linear buckets with the following boundaries for indices in 0 to n-1,
// where n is `number_of_buckets`.
// - i in [0, n-1]: [start + (i)*width, start + (i+1)*width)
message Linear {
// Must be greater than 0.
//
// (-- api-linter: core::0140::prepositions=disabled
// aip.dev/not-precedent: `bucket_count` would cause confusion with
// `bucket_counts` field --)
optional int32 number_of_buckets = 1;
// Distance between bucket boundaries. Must be greater than 0.
optional double width = 2;
// Lower bound of the first bucket.
optional double start = 3;
}

// Exponential buckets where the growth factor (`gf`) between buckets is
// `2**(2**-scale)`. e.g. for `scale=1` growth factor is
// `2**(2**(-1))=sqrt(2)`. `n` buckets will have the following boundaries.
// - 0th: [0, gf)
// - i in [1, n-1]: [gf^(i), gf^(i+1))
message Base2Exponent {
// Must be greater than 0.
//
// (-- api-linter: core::0140::prepositions=disabled
// aip.dev/not-precedent: `bucket_count` would cause confusion with
// `bucket_counts` field --)
optional int32 number_of_buckets = 1;
// Must be between -3 and 3. This forces the growth factor of the bucket
// boundaries to be between `2^(1/8)` and `256`: `scale=3` gives
// `2**(2**-3)=2^(1/8)` and `scale=-3` gives `2**(2**3)=256`.
optional int32 scale = 2;
}
// The bucket layout in use; a oneof holds at most one of these fields.
oneof bucket_type {
// Bucket boundaries grow linearly.
Linear linear = 1;
// Bucket boundaries grow exponentially.
Base2Exponent exponential = 2;
}
}
}

// General monitored state information which contains structured information
// which does not fit into a typical metric format.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@
import java.io.InputStream;
import java.util.Set;
import org.apache.beam.model.pipeline.v1.MetricsApi.BoundedTrie;
import org.apache.beam.model.pipeline.v1.MetricsApi.HistogramValue;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.DoubleCoder;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.util.ByteStringOutputStream;
import org.apache.beam.sdk.util.HistogramData;
import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
Expand Down Expand Up @@ -179,4 +181,18 @@ public static double decodeDoubleCounter(ByteString payload) {
throw new RuntimeException(e);
}
}

/**
 * Encodes to {@link MonitoringInfoConstants.TypeUrns#PER_WORKER_HISTOGRAM}.
 *
 * <p>The payload is the serialized form of the {@link HistogramValue} proto returned by {@link
 * HistogramData#toProto()}.
 *
 * @param inputHistogram the histogram to serialize
 * @return the serialized bytes of the histogram's proto
 */
public static ByteString encodeInt64Histogram(HistogramData inputHistogram) {
  HistogramValue histogramProto = inputHistogram.toProto();
  return histogramProto.toByteString();
}

/**
 * Decodes from {@link MonitoringInfoConstants.TypeUrns#PER_WORKER_HISTOGRAM}.
 *
 * <p>The payload is parsed as a {@link HistogramValue} proto, which is then used to construct the
 * {@link HistogramData}.
 *
 * @param payload serialized {@link HistogramValue} bytes
 * @return the histogram built from the parsed proto
 * @throws RuntimeException wrapping the {@link InvalidProtocolBufferException} when the payload
 *     cannot be parsed
 */
public static HistogramData decodeInt64Histogram(ByteString payload) {
  final HistogramValue histogramProto;
  try {
    histogramProto = HistogramValue.parseFrom(payload);
  } catch (InvalidProtocolBufferException e) {
    throw new RuntimeException(e);
  }
  return new HistogramData(histogramProto);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,31 +22,44 @@
import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Counter;
import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Distribution;
import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Gauge;
import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Histogram;
import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeStringSet;
import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeBoundedTrie;
import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeDoubleCounter;
import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeDoubleDistribution;
import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeInt64Counter;
import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeInt64Distribution;
import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeInt64Gauge;
import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeInt64Histogram;
import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeStringSet;
import static org.junit.Assert.assertEquals;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import org.apache.beam.runners.core.metrics.BoundedTrieData.BoundedTrieNode;
import org.apache.beam.sdk.testing.ExpectedLogs;
import org.apache.beam.sdk.util.HistogramData;
import org.apache.beam.sdk.util.HistogramData.HistogramParsingException;
import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
import org.joda.time.Instant;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

/** Tests for {@link MonitoringInfoEncodings}. */
@RunWith(JUnit4.class)
public class MonitoringInfoEncodingsTest {
// Log rule bound to MonitoringInfoEncodings.
// NOTE(review): presumably captures log output of that class for verification — confirm
// against ExpectedLogs' documentation.
@Rule
public ExpectedLogs monitoringInfoCodingsExpectedLogs =
ExpectedLogs.none(MonitoringInfoEncodings.class);

// Starts with no expectation; individual tests declare the exception type and message they
// expect through this rule before triggering the failure.
@Rule public ExpectedException thrown = ExpectedException.none();

@Test
public void testInt64DistributionEncoding() {
DistributionData data = DistributionData.create(1L, 2L, 3L, 4L);
Expand Down Expand Up @@ -143,4 +156,36 @@ public void testDoubleCounterEncoding() {
assertEquals(ByteString.copyFrom(new byte[] {0x3f, (byte) 0xf0, 0, 0, 0, 0, 0, 0}), payload);
assertEquals(1.0, decodeDoubleCounter(payload), 0.001);
}

/** Round-trips a linear-bucket histogram through encode/decode and expects an equal result. */
@Test
public void testHistgramInt64EncodingLinearHist() {
  // Arguments are (start, width, numBuckets).
  HistogramData original = new HistogramData(HistogramData.LinearBuckets.of(0, 5, 5));
  original.record(5, 10, 15, 20);

  HistogramData roundTripped = decodeInt64Histogram(encodeInt64Histogram(original));

  assertEquals(original, roundTripped);
}

/** Round-trips an exponential-bucket histogram through encode/decode and expects equality. */
@Test
public void testHistgramInt64EncodingExpHist() {
  // Arguments are (scale, numBuckets).
  HistogramData original = new HistogramData(HistogramData.ExponentialBuckets.of(1, 10));
  original.record(2, 4, 8, 16, 32);

  HistogramData roundTripped = decodeInt64Histogram(encodeInt64Histogram(original));

  assertEquals(original, roundTripped);
}

/** Encoding a histogram whose bucket type is not recognized must fail with a parsing error. */
@Test
public void testHistgramInt64EncodingUnsupportedBucket() {
  thrown.expect(HistogramParsingException.class);
  thrown.expectMessage("Unable to encode Int64 Histogram, bucket is not recognized");

  HistogramData unsupportedHistogram = new HistogramData(HistogramData.UnsupportedBuckets.of());
  unsupportedHistogram.record(2, 4, 8, 16, 32);

  encodeInt64Histogram(unsupportedHistogram);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@
import java.util.Arrays;
import java.util.Objects;
import javax.annotation.concurrent.GuardedBy;
import org.apache.beam.model.pipeline.v1.MetricsApi.HistogramValue;
import org.apache.beam.model.pipeline.v1.MetricsApi.HistogramValue.BucketOptions;
import org.apache.beam.model.pipeline.v1.MetricsApi.HistogramValue.BucketOptions.Base2Exponent;
import org.apache.beam.model.pipeline.v1.MetricsApi.HistogramValue.BucketOptions.Linear;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.math.DoubleMath;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.math.IntMath;
import org.checkerframework.checker.nullness.qual.Nullable;
Expand Down Expand Up @@ -74,6 +80,43 @@ public HistogramData(BucketType bucketType) {
this.sumOfSquaredDeviations = 0;
}

/**
 * Create a histogram from HistogramValue proto.
 *
 * <p>The bucket layout is taken from the proto's bucket options: a linear layout is used when
 * the proto has one, otherwise an exponential layout is assumed. Every entry of the proto's
 * bucket counts is copied into the bucket at the same index and added to the bounded-bucket
 * record total; buckets without a corresponding entry stay at zero.
 *
 * @param histogramProto HistogramValue proto used to populate stats for the histogram.
 * @throws HistogramParsingException if the proto carries more bucket counts than the bucket
 *     layout defines buckets.
 */
public HistogramData(HistogramValue histogramProto) {
  BucketOptions bucketOptions = histogramProto.getBucketOptions();
  if (bucketOptions.hasLinear()) {
    Linear linearOptions = bucketOptions.getLinear();
    this.bucketType =
        LinearBuckets.of(
            linearOptions.getStart(),
            linearOptions.getWidth(),
            linearOptions.getNumberOfBuckets());
  } else {
    // Assume it's an exponential histogram if it's not linear.
    Base2Exponent exponentialOptions = bucketOptions.getExponential();
    this.bucketType =
        ExponentialBuckets.of(
            exponentialOptions.getScale(), exponentialOptions.getNumberOfBuckets());
  }
  this.buckets = new long[bucketType.getNumBuckets()];

  // Fail with a descriptive error instead of an ArrayIndexOutOfBoundsException when the proto
  // is inconsistent with its own bucket options.
  int protoBucketCounts = histogramProto.getBucketCountsCount();
  if (protoBucketCounts > this.buckets.length) {
    throw new HistogramParsingException(
        "Unable to decode Int64 Histogram, proto has "
            + protoBucketCounts
            + " bucket counts but its bucket options define only "
            + this.buckets.length
            + " buckets");
  }

  for (int idx = 0; idx < protoBucketCounts; idx++) {
    long val = histogramProto.getBucketCounts(idx);
    this.buckets[idx] = val;
    this.numBoundedBucketRecords += val;
  }
}

/** Returns the {@link BucketType} this histogram was constructed with. */
public BucketType getBucketType() {
  return bucketType;
}
Expand Down Expand Up @@ -207,6 +250,10 @@ public synchronized HistogramData getAndReset() {
return other;
}

/**
 * Returns a snapshot of the per-bucket counts.
 *
 * <p>A copy is returned rather than the internal array, so callers can neither observe later
 * updates nor modify this histogram's buckets outside of its lock.
 *
 * @return a new array holding the current count of every bucket
 */
public synchronized long[] getBucketCount() {
  return buckets.clone();
}

public synchronized void record(double value) {
double rangeTo = bucketType.getRangeTo();
double rangeFrom = bucketType.getRangeFrom();
Expand Down Expand Up @@ -240,6 +287,64 @@ private synchronized void updateStatistics(double value) {
sumOfSquaredDeviations += (value - mean) * (value - oldMean);
}

/** Unchecked exception reporting a failed conversion between a histogram and its proto form. */
public static class HistogramParsingException extends RuntimeException {
  /**
   * Creates the exception.
   *
   * @param message description of what could not be converted
   */
  public HistogramParsingException(final String message) {
    super(message);
  }
}

/**
 * Converts this {@link HistogramData} to its proto {@link HistogramValue}.
 *
 * <p>The proto carries the bucket options matching this histogram's bucket type (linear or
 * exponential), the total count, and one bucket count per bucket in index order.
 *
 * @return the proto representation of this histogram
 * @throws HistogramParsingException if the bucket type is neither {@link LinearBuckets} nor
 *     {@link ExponentialBuckets}
 */
public synchronized HistogramValue toProto() {
  BucketType type = this.getBucketType();
  int numberOfBuckets = type.getNumBuckets();

  BucketOptions.Builder bucketOptions = BucketOptions.newBuilder();
  if (type instanceof LinearBuckets) {
    LinearBuckets linearBuckets = (LinearBuckets) type;
    bucketOptions.setLinear(
        Linear.newBuilder()
            .setNumberOfBuckets(numberOfBuckets)
            .setWidth(linearBuckets.getWidth())
            .setStart(linearBuckets.getStart())
            .build());
  } else if (type instanceof ExponentialBuckets) {
    ExponentialBuckets exponentialBuckets = (ExponentialBuckets) type;
    bucketOptions.setExponential(
        Base2Exponent.newBuilder()
            .setNumberOfBuckets(numberOfBuckets)
            .setScale(exponentialBuckets.getScale())
            .build());
  } else {
    throw new HistogramParsingException(
        "Unable to encode Int64 Histogram, bucket is not recognized");
  }

  HistogramValue.Builder builder = HistogramValue.newBuilder();
  builder.setBucketOptions(bucketOptions.build());
  builder.setCount(this.getTotalCount());

  // The lock is already held, so the bucket array can be read directly.
  for (long val : this.buckets) {
    builder.addBucketCounts(val);
  }
  return builder.build();
}

// /** Creates a {@link HistogramData} instance from its proto {@link HistogramValue}. */
// public static HistogramData fromProto(HistogramValue proto) {
// HistgramValue value = new HistgramValue();
// return new HistogramValue(proto);
// }

/**
* Increment the {@code numTopRecords} and update {@code topRecordsSum} when a new overflow value
* is recorded. This function should only be called when a Histogram is recording a value greater
Expand Down Expand Up @@ -573,6 +678,42 @@ public double getRangeTo() {
// Note: equals() and hashCode() are implemented by the AutoValue.
}

/**
 * Used for testing unsupported Bucket formats.
 *
 * <p>Every method implemented here returns zero, whatever its argument.
 */
@AutoValue
@Internal
@VisibleForTesting
public abstract static class UnsupportedBuckets implements BucketType {

  /**
   * Creates an instance through the AutoValue-generated subclass.
   *
   * <p>NOTE(review): the single {@code 0} argument is presumably the number of buckets inherited
   * from {@link BucketType} — confirm against the generated constructor.
   */
  public static UnsupportedBuckets of() {
    return new AutoValue_HistogramData_UnsupportedBuckets(0);
  }

  /** Always returns bucket index {@code 0}. */
  @Override
  public int getBucketIndex(double value) {
    return 0;
  }

  /** Always returns a size of {@code 0.0}. */
  @Override
  public double getBucketSize(int index) {
    return 0.0;
  }

  /** Always returns an accumulated size of {@code 0.0}. */
  @Override
  public double getAccumulatedBucketSize(int index) {
    return 0.0;
  }

  /** Always returns a lower bound of {@code 0.0}. */
  @Override
  public double getRangeFrom() {
    return 0.0;
  }

  /** Always returns an upper bound of {@code 0.0}. */
  @Override
  public double getRangeTo() {
    return 0.0;
  }
}

@Override
public synchronized boolean equals(@Nullable Object object) {
if (object instanceof HistogramData) {
Expand Down
Loading