Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add histogram to metrics container #33043

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,16 @@ message MonitoringInfoSpecs {
}
]
}];

// Spec for user-defined histogram metrics. Values are reported with the
// histogram_int64 type URN, and every MonitoringInfo using this spec must
// carry the PTRANSFORM, NAMESPACE and NAME labels.
USER_HISTOGRAM = 23 [(monitoring_info_spec) = {
urn: "beam:metric:user:histogram_int64:v1",
type: "beam:metrics:histogram_int64:v1",
required_labels: ["PTRANSFORM", "NAMESPACE", "NAME"],
annotations: [{
key: "description",
value: "URN utilized to report histogram metric."
}]
}];
}
}

Expand Down Expand Up @@ -593,6 +603,12 @@ message MonitoringInfoTypeUrns {
BOUNDED_TRIE_TYPE = 12 [(org.apache.beam.model.pipeline.v1.beam_urn) =
"beam:metrics:bounded_trie:v1"];

// Represents a histogram.
//
// Encoding: DataflowHistogram proto
HISTOGRAM = 13 [(org.apache.beam.model.pipeline.v1.beam_urn) =
"beam:metrics:histogram_int64:v1"];

// General monitored state information which contains structured information
// which does not fit into a typical metric format. See MonitoringTableData
// for more details.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.beam.sdk.metrics.MetricResults;
import org.apache.beam.sdk.metrics.MetricsFilter;
import org.apache.beam.sdk.metrics.StringSetResult;
import org.apache.beam.sdk.util.HistogramData;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.checkerframework.checker.nullness.qual.Nullable;

Expand All @@ -44,18 +45,21 @@ public class DefaultMetricResults extends MetricResults {
private final Iterable<MetricResult<GaugeResult>> gauges;
private final Iterable<MetricResult<StringSetResult>> stringSets;
private final Iterable<MetricResult<BoundedTrieResult>> boundedTries;
private final Iterable<MetricResult<HistogramData>> histograms;

public DefaultMetricResults(
Iterable<MetricResult<Long>> counters,
Iterable<MetricResult<DistributionResult>> distributions,
Iterable<MetricResult<GaugeResult>> gauges,
Iterable<MetricResult<StringSetResult>> stringSets,
Iterable<MetricResult<BoundedTrieResult>> boundedTries) {
Iterable<MetricResult<BoundedTrieResult>> boundedTries,
Iterable<MetricResult<HistogramData>> histograms) {
this.counters = counters;
this.distributions = distributions;
this.gauges = gauges;
this.stringSets = stringSets;
this.boundedTries = boundedTries;
this.histograms = histograms;
}

@Override
Expand All @@ -68,6 +72,8 @@ public MetricQueryResults queryMetrics(@Nullable MetricsFilter filter) {
Iterables.filter(
stringSets, stringSets -> MetricFiltering.matches(filter, stringSets.getKey())),
Iterables.filter(
boundedTries, boundedTries -> MetricFiltering.matches(filter, boundedTries.getKey())));
boundedTries, boundedTries -> MetricFiltering.matches(filter, boundedTries.getKey())),
Iterables.filter(
histograms, histogram -> MetricFiltering.matches(filter, histogram.getKey())));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,12 @@ public void update(HistogramCell other) {
dirty.afterModification();
}

/**
 * Applies {@code data} to this cell's underlying histogram value and then records the
 * modification on the cell's dirty state.
 *
 * @param data the histogram data to apply to this cell
 */
@Override
public void update(HistogramData data) {
  value.update(data);
  dirty.afterModification();
}

// TODO(https://github.com/apache/beam/issues/20853): Update this function to allow incrementing
// the infinite buckets as well.
// and remove the incTopBucketCount and incBotBucketCount methods.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.io.Serializable;
import java.util.Collections;
import org.apache.beam.sdk.metrics.MetricKey;
import org.apache.beam.sdk.util.HistogramData;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;

/** Representation of multiple metric updates. */
Expand All @@ -35,6 +36,7 @@ public abstract class MetricUpdates {
Collections.emptyList(),
Collections.emptyList(),
Collections.emptyList(),
Collections.emptyList(),
Collections.emptyList());

/**
Expand Down Expand Up @@ -69,15 +71,24 @@ public static <T> MetricUpdate<T> create(MetricKey key, T update) {

/** All the bounded trie updates. */
public abstract Iterable<MetricUpdate<BoundedTrieData>> boundedTrieUpdates();

/** All the histogram updates, each carrying its metric key and a {@link HistogramData} value. */
public abstract Iterable<MetricUpdate<HistogramData>> histogramsUpdates();

/** Create a new {@link MetricUpdates} bundle. */
public static MetricUpdates create(
Iterable<MetricUpdate<Long>> counterUpdates,
Iterable<MetricUpdate<DistributionData>> distributionUpdates,
Iterable<MetricUpdate<GaugeData>> gaugeUpdates,
Iterable<MetricUpdate<StringSetData>> stringSetUpdates,
Iterable<MetricUpdate<BoundedTrieData>> boundedTrieUpdates) {
Iterable<MetricUpdate<BoundedTrieData>> boundedTrieUpdates,
Iterable<MetricUpdate<HistogramData>> histogramsUpdates) {
return new AutoValue_MetricUpdates(
counterUpdates, distributionUpdates, gaugeUpdates, stringSetUpdates, boundedTrieUpdates);
counterUpdates,
distributionUpdates,
gaugeUpdates,
stringSetUpdates,
boundedTrieUpdates,
histogramsUpdates);
}

/** Returns true if there are no updates in this MetricUpdates object. */
Expand All @@ -86,6 +97,7 @@ public boolean isEmpty() {
&& Iterables.isEmpty(distributionUpdates())
&& Iterables.isEmpty(gaugeUpdates())
&& Iterables.isEmpty(stringSetUpdates())
&& Iterables.isEmpty(boundedTrieUpdates());
&& Iterables.isEmpty(boundedTrieUpdates())
&& Iterables.isEmpty(histogramsUpdates());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,21 @@

import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.BOUNDED_TRIE_TYPE;
import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.DISTRIBUTION_INT64_TYPE;
import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.HISTOGRAM_TYPE;
import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.LATEST_INT64_TYPE;
import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.SET_STRING_TYPE;
import static org.apache.beam.runners.core.metrics.MonitoringInfoConstants.TypeUrns.SUM_INT64_TYPE;
import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeBoundedTrie;
import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Counter;
import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Distribution;
import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Gauge;
import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Histogram;
import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeStringSet;
import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeBoundedTrie;
import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeInt64Counter;
import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeInt64Distribution;
import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeInt64Gauge;
import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeInt64Histogram;
import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeStringSet;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;

Expand All @@ -47,6 +50,7 @@
import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
import org.apache.beam.runners.core.metrics.MetricUpdates.MetricUpdate;
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.Histogram;
import org.apache.beam.sdk.metrics.Metric;
import org.apache.beam.sdk.metrics.MetricKey;
import org.apache.beam.sdk.metrics.MetricName;
Expand Down Expand Up @@ -240,6 +244,10 @@ public BoundedTrieCell getBoundedTrie(MetricName metricName) {
return boundedTries.tryGet(metricName);
}

/**
 * Returns the histogram cells held by this container.
 *
 * <p>The returned map is the container's own field, not a copy. Entries are keyed by the pair of
 * metric name and bucket type.
 *
 * @return the map of histogram cells
 */
public MetricsMap<KV<MetricName, HistogramData.BucketType>, HistogramCell> getHistogram() {
  return this.histograms;
}

private <UpdateT, CellT extends MetricCell<UpdateT>>
ImmutableList<MetricUpdate<UpdateT>> extractUpdates(MetricsMap<MetricName, CellT> cells) {
ImmutableList.Builder<MetricUpdate<UpdateT>> updates = ImmutableList.builder();
Expand All @@ -253,6 +261,22 @@ ImmutableList<MetricUpdate<UpdateT>> extractUpdates(MetricsMap<MetricName, CellT
return updates.build();
}

/**
 * Histogram counterpart of {@code extractUpdates}.
 *
 * <p>Histogram cells are keyed by a (metric name, bucket type) pair instead of a bare {@link
 * MetricName}, so they cannot go through {@code extractUpdates}. Only the metric-name half of the
 * key is used when building the {@link MetricKey} of each update.
 *
 * @param cells the histogram cells to inspect
 * @return one update, holding the cell's cumulative value, for every cell whose dirty state
 *     reports {@code beforeCommit()} as true
 */
private <UpdateT, CellT extends MetricCell<UpdateT>>
    ImmutableList<MetricUpdate<UpdateT>> extractHistogramUpdates(
        MetricsMap<KV<MetricName, HistogramData.BucketType>, CellT> cells) {
  ImmutableList.Builder<MetricUpdate<UpdateT>> pendingUpdates = ImmutableList.builder();
  cells.forEach(
      (nameAndBuckets, cell) -> {
        if (!cell.getDirty().beforeCommit()) {
          // Nothing to report for this cell.
          return;
        }
        MetricKey metricKey = MetricKey.create(stepName, nameAndBuckets.getKey());
        pendingUpdates.add(MetricUpdate.create(metricKey, cell.getCumulative()));
      });
  return pendingUpdates.build();
}

/**
* Return the cumulative values for any metrics that have changed since the last time updates were
* committed.
Expand All @@ -263,7 +287,8 @@ public MetricUpdates getUpdates() {
extractUpdates(distributions),
extractUpdates(gauges),
extractUpdates(stringSets),
extractUpdates(boundedTries));
extractUpdates(boundedTries),
extractHistogramUpdates(histograms));
}

/** @return The MonitoringInfo metadata from the metric. */
Expand Down Expand Up @@ -296,6 +321,20 @@ public MetricUpdates getUpdates() {
return builder;
}

/**
 * Converts a histogram metric update into a {@link MonitoringInfo}.
 *
 * @param metricUpdate the histogram update whose key and value are converted
 * @return the MonitoringInfo generated from the histogram metricUpdate, or {@code null} when no
 *     monitoring metadata could be produced for the update's key
 */
private @Nullable MonitoringInfo histogramUpdateToMonitoringInfo(
    MetricUpdate<HistogramData> metricUpdate) {
  SimpleMonitoringInfoBuilder metadata = histogramToMonitoringMetadata(metricUpdate.getKey());
  if (metadata != null) {
    metadata.setInt64HistogramValue(metricUpdate.getUpdate());
    return metadata.build();
  }
  return null;
}

/** @return The MonitoringInfo metadata from the counter metric. */
private @Nullable SimpleMonitoringInfoBuilder counterToMonitoringMetadata(MetricKey metricKey) {
return metricToMonitoringMetadata(
Expand Down Expand Up @@ -376,6 +415,14 @@ public MetricUpdates getUpdates() {
MonitoringInfoConstants.Urns.USER_BOUNDED_TRIE);
}

/**
 * Builds the MonitoringInfo metadata for a histogram metric by delegating to {@code
 * metricToMonitoringMetadata} with the histogram type URN and the user histogram URN.
 *
 * @param metricKey the key of the histogram metric
 * @return The MonitoringInfo metadata from the histogram metric, or {@code null} if the delegate
 *     returns none.
 */
private @Nullable SimpleMonitoringInfoBuilder histogramToMonitoringMetadata(MetricKey metricKey) {
  final String histogramTypeUrn = MonitoringInfoConstants.TypeUrns.HISTOGRAM_TYPE;
  final String userHistogramUrn = MonitoringInfoConstants.Urns.USER_HISTOGRAM;
  return metricToMonitoringMetadata(metricKey, histogramTypeUrn, userHistogramUrn);
}

/**
* @param metricUpdate
* @return The MonitoringInfo generated from the string set metricUpdate.
Expand Down Expand Up @@ -444,6 +491,14 @@ public Iterable<MonitoringInfo> getMonitoringInfos() {
monitoringInfos.add(mi);
}
}

for (MetricUpdate<HistogramData> metricUpdate : metricUpdates.histogramsUpdates()) {
MonitoringInfo mi = histogramUpdateToMonitoringInfo(metricUpdate);
if (mi != null) {
monitoringInfos.add(mi);
}
}

return monitoringInfos;
}

Expand Down Expand Up @@ -496,6 +551,16 @@ public Map<String, ByteString> getMonitoringData(ShortIdMap shortIds) {
}
}
});
histograms.forEach(
(metricName, histogramCell) -> {
if (histogramCell.getDirty().beforeCommit()) {
String shortId =
getShortId(metricName.getKey(), this::histogramToMonitoringMetadata, shortIds);
if (shortId != null) {
builder.put(shortId, encodeInt64Histogram(histogramCell.getCumulative()));
}
}
});
return builder.build();
}

Expand Down Expand Up @@ -532,6 +597,10 @@ public void commitUpdates() {
gauges.forEachValue(gauge -> gauge.getDirty().afterCommit());
stringSets.forEachValue(sSets -> sSets.getDirty().afterCommit());
boundedTries.forEachValue(bTrie -> bTrie.getDirty().afterCommit());
histograms.forEachValue(
histogram -> {
histogram.getDirty().afterCommit();
});
}

private <UserT extends Metric, UpdateT, CellT extends MetricCell<UpdateT>>
Expand All @@ -545,6 +614,18 @@ ImmutableList<MetricUpdate<UpdateT>> extractCumulatives(MetricsMap<MetricName, C
return updates.build();
}

/**
 * Histogram counterpart of {@code extractCumulatives}.
 *
 * <p>Emits one update per histogram cell, whether or not the cell has changed. Cells are keyed by
 * a (metric name, bucket type) pair; only the metric name is used to build each {@link
 * MetricKey}.
 *
 * @param cells the histogram cells to read
 * @return the cumulative value of every cell, wrapped as a {@link MetricUpdate}
 * @throws NullPointerException if any cell reports a null cumulative value
 */
private <UserT extends Metric, UpdateT, CellT extends MetricCell<UpdateT>>
    ImmutableList<MetricUpdate<UpdateT>> extractHistogramCumulatives(
        MetricsMap<KV<MetricName, HistogramData.BucketType>, CellT> cells) {
  ImmutableList.Builder<MetricUpdate<UpdateT>> cumulatives = ImmutableList.builder();
  cells.forEach(
      (nameAndBuckets, cell) -> {
        UpdateT cumulative = checkNotNull(cell.getCumulative());
        MetricKey metricKey = MetricKey.create(stepName, nameAndBuckets.getKey());
        cumulatives.add(MetricUpdate.create(metricKey, cumulative));
      });
  return cumulatives.build();
}

/**
* Return the {@link MetricUpdates} representing the cumulative values of all metrics in this
* container.
Expand All @@ -555,7 +636,8 @@ public MetricUpdates getCumulative() {
extractCumulatives(distributions),
extractCumulatives(gauges),
extractCumulatives(stringSets),
extractCumulatives(boundedTries));
extractCumulatives(boundedTries),
extractHistogramCumulatives(histograms));
}

/** Update values of this {@link MetricsContainerImpl} by merging the value of another cell. */
Expand All @@ -577,7 +659,6 @@ private void updateForSumInt64Type(MonitoringInfo monitoringInfo) {
private void updateForDistributionInt64Type(MonitoringInfo monitoringInfo) {
MetricName metricName = MonitoringInfoMetricName.of(monitoringInfo);
Distribution distribution = getDistribution(metricName);

DistributionData data = decodeInt64Distribution(monitoringInfo.getPayload());
distribution.update(data.sum(), data.count(), data.min(), data.max());
}
Expand All @@ -600,6 +681,14 @@ private void updateForBoundedTrieType(MonitoringInfo monitoringInfo) {
boundedTrie.update(decodeBoundedTrie(monitoringInfo.getPayload()));
}

/**
 * Updates the histogram identified by {@code monitoringInfo} with the histogram data decoded
 * from its payload.
 *
 * <p>The payload is decoded first so that the target cell is looked up with the bucket type
 * carried by the decoded data. A fixed bucket layout here would apply payloads that use any
 * other layout to a cell with mismatched buckets.
 *
 * @param monitoringInfo the histogram-typed MonitoringInfo to apply to this container
 */
private void updateForHistogramInt64(MonitoringInfo monitoringInfo) {
  MetricName metricName = MonitoringInfoMetricName.of(monitoringInfo);
  HistogramData data = decodeInt64Histogram(monitoringInfo.getPayload());
  // Use the bucket layout from the payload itself rather than assuming one.
  Histogram histogram = getHistogram(metricName, data.getBucketType());
  histogram.update(data);
}

/** Update values of this {@link MetricsContainerImpl} by reading from {@code monitoringInfos}. */
public void update(Iterable<MonitoringInfo> monitoringInfos) {
for (MonitoringInfo monitoringInfo : monitoringInfos) {
Expand Down Expand Up @@ -628,6 +717,9 @@ public void update(Iterable<MonitoringInfo> monitoringInfos) {
updateForBoundedTrieType(monitoringInfo);
break;

case HISTOGRAM_TYPE:
updateForHistogramInt64(monitoringInfo); // use type, and not urn info
break;
default:
LOG.warn("Unsupported metric type {}", monitoringInfo.getType());
}
Expand Down Expand Up @@ -677,14 +769,16 @@ public boolean equals(@Nullable Object object) {
&& Objects.equals(distributions, metricsContainerImpl.distributions)
&& Objects.equals(gauges, metricsContainerImpl.gauges)
&& Objects.equals(stringSets, metricsContainerImpl.stringSets)
&& Objects.equals(boundedTries, metricsContainerImpl.boundedTries);
&& Objects.equals(boundedTries, metricsContainerImpl.boundedTries)
&& Objects.equals(histograms, metricsContainerImpl.histograms);
}
return false;
}

@Override
public int hashCode() {
return Objects.hash(stepName, counters, distributions, gauges, stringSets, boundedTries);
return Objects.hash(
stepName, counters, distributions, gauges, stringSets, boundedTries, histograms);
}

/**
Expand Down Expand Up @@ -816,6 +910,7 @@ public static MetricsContainerImpl deltaContainer(
deltaValueCell.incTopBucketCount(
currValue.getTopBucketCount() - prevValue.getTopBucketCount());
}

for (Map.Entry<MetricName, StringSetCell> cell : curr.stringSets.entries()) {
// Simply take the most recent value for stringSets, no need to count deltas.
deltaContainer.stringSets.get(cell.getKey()).update(cell.getValue().getCumulative());
Expand Down
Loading
Loading