diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java index 98624d54c2e6..99382c60ce11 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java @@ -186,8 +186,7 @@ public boolean consistentWithEquals() { @Override public boolean isRegisterByteSizeObserverCheap(IntervalWindow value) { - return instantCoder.isRegisterByteSizeObserverCheap(value.end) - && durationCoder.isRegisterByteSizeObserverCheap(new Duration(value.start, value.end)); + return true; } @Override diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java index bc83687bae4e..f253d1794837 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java @@ -43,6 +43,7 @@ *

Note: This does not uniquely identify a pane, and should not be used for comparisons. */ public final class PaneInfo { + /** * Enumerates the possibilities for the timing of this pane firing related to the input and output * watermarks for its computation. @@ -322,6 +323,7 @@ public String toString() { /** A Coder for encoding PaneInfo instances. */ public static class PaneInfoCoder extends AtomicCoder { + private static final byte ELEMENT_METADATA_MASK = (byte) 0x80; private enum Encoding { @@ -385,6 +387,21 @@ public void encode(PaneInfo value, final OutputStream outStream) } } + @Override + protected long getEncodedElementByteSize(PaneInfo value) throws Exception { + Encoding encoding = chooseEncoding(value); + switch (encoding) { + case FIRST: + return 1; + case ONE_INDEX: + return 1L + VarInt.getLength(value.index); + case TWO_INDICES: + return 1L + VarInt.getLength(value.index) + VarInt.getLength(value.nonSpeculativeIndex); + default: + throw new CoderException("Unknown encoding " + encoding); + } + } + @Override public PaneInfo decode(final InputStream inStream) throws CoderException, IOException { byte keyAndTag = (byte) inStream.read(); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueWithRecordId.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueWithRecordId.java index 024376691b3f..93f2976eaf1c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueWithRecordId.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueWithRecordId.java @@ -28,6 +28,7 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.StructuredCoder; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.util.common.ElementByteSizeObserver; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects; import org.checkerframework.checker.nullness.qual.Nullable; @@ -40,6 +41,7 @@ */ @Internal public class ValueWithRecordId { + private final ValueT value; private final byte[] id; @@ -81,6 +83,7 @@ public int hashCode() { /** A {@link Coder} for {@code ValueWithRecordId}, using a wrapped value {@code Coder}. */ public static class ValueWithRecordIdCoder extends StructuredCoder> { + public static ValueWithRecordIdCoder of(Coder valueCoder) { return new ValueWithRecordIdCoder<>(valueCoder); } @@ -124,6 +127,19 @@ public void verifyDeterministic() throws NonDeterministicException { valueCoder.verifyDeterministic(); } + @Override + public boolean isRegisterByteSizeObserverCheap(ValueWithRecordId value) { + // idCoder is always cheap + return valueCoder.isRegisterByteSizeObserverCheap(value.value); + } + + @Override + public void registerByteSizeObserver( + ValueWithRecordId value, ElementByteSizeObserver observer) throws Exception { + valueCoder.registerByteSizeObserver(value.getValue(), observer); + idCoder.registerByteSizeObserver(value.getId(), observer); + } + public Coder getValueCoder() { return valueCoder; } @@ -134,6 +150,7 @@ public Coder getValueCoder() { /** {@link DoFn} to turn a {@code ValueWithRecordId} back to the value {@code T}. */ public static class StripIdsDoFn extends DoFn, T> { + @ProcessElement public void processElement(ProcessContext c) { c.output(c.element().getValue()); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/PaneInfoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/PaneInfoTest.java index cda8ee1ea55c..e6e904289600 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/PaneInfoTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/PaneInfoTest.java @@ -21,6 +21,7 @@ import static org.junit.Assert.assertSame; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.Coder.Context; import org.apache.beam.sdk.testing.CoderProperties; import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing; import org.junit.Test; @@ -52,6 +53,33 @@ public void testEncodingRoundTrip() throws Exception { } } + @Test + public void testByteCount() throws Exception { + Coder coder = PaneInfo.PaneInfoCoder.INSTANCE; + for (Coder.Context context : CoderProperties.ALL_CONTEXTS) { + for (Timing timing : Timing.values()) { + long onTimeIndex = timing == Timing.EARLY ? -1 : 37; + testByteCount(coder, context, PaneInfo.createPane(false, false, timing, 389, onTimeIndex)); + testByteCount(coder, context, PaneInfo.createPane(false, true, timing, 5077, onTimeIndex)); + testByteCount(coder, context, PaneInfo.createPane(true, false, timing, 0, 0)); + testByteCount(coder, context, PaneInfo.createPane(true, true, timing, 0, 0)); + + // With metadata + testByteCount( + coder, context, PaneInfo.createPane(false, false, timing, 389, onTimeIndex, true)); + testByteCount( + coder, context, PaneInfo.createPane(false, true, timing, 5077, onTimeIndex, true)); + testByteCount(coder, context, PaneInfo.createPane(true, false, timing, 0, 0, true)); + testByteCount(coder, context, PaneInfo.createPane(true, true, timing, 0, 0, true)); + } + } + } + + private static void testByteCount(Coder coder, Context context, PaneInfo paneInfo) + throws Exception { + CoderProperties.testByteCount(coder, context, new PaneInfo[] {paneInfo}); + } + @Test public void testEncodingRoundTripWithElementMetadata() throws Exception { Coder coder = PaneInfo.PaneInfoCoder.INSTANCE;