Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -186,8 +186,7 @@ public boolean consistentWithEquals() {

@Override
public boolean isRegisterByteSizeObserverCheap(IntervalWindow value) {
return instantCoder.isRegisterByteSizeObserverCheap(value.end)
&& durationCoder.isRegisterByteSizeObserverCheap(new Duration(value.start, value.end));
return true;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
* <p>Note: This does not uniquely identify a pane, and should not be used for comparisons.
*/
public final class PaneInfo {

/**
* Enumerates the possibilities for the timing of this pane firing related to the input and output
* watermarks for its computation.
Expand Down Expand Up @@ -322,6 +323,7 @@ public String toString() {

/** A Coder for encoding PaneInfo instances. */
public static class PaneInfoCoder extends AtomicCoder<PaneInfo> {

private static final byte ELEMENT_METADATA_MASK = (byte) 0x80;

private enum Encoding {
Expand Down Expand Up @@ -385,6 +387,21 @@ public void encode(PaneInfo value, final OutputStream outStream)
}
}

@Override
protected long getEncodedElementByteSize(PaneInfo value) throws Exception {
Encoding encoding = chooseEncoding(value);
switch (encoding) {
case FIRST:
return 1;
case ONE_INDEX:
return 1L + VarInt.getLength(value.index);
case TWO_INDICES:
return 1L + VarInt.getLength(value.index) + VarInt.getLength(value.nonSpeculativeIndex);
default:
throw new CoderException("Unknown encoding " + encoding);
}
}

@Override
public PaneInfo decode(final InputStream inStream) throws CoderException, IOException {
byte keyAndTag = (byte) inStream.read();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StructuredCoder;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
import org.checkerframework.checker.nullness.qual.Nullable;

Expand All @@ -40,6 +41,7 @@
*/
@Internal
public class ValueWithRecordId<ValueT> {

private final ValueT value;
private final byte[] id;

Expand Down Expand Up @@ -81,6 +83,7 @@ public int hashCode() {
/** A {@link Coder} for {@code ValueWithRecordId}, using a wrapped value {@code Coder}. */
public static class ValueWithRecordIdCoder<ValueT>
extends StructuredCoder<ValueWithRecordId<ValueT>> {

public static <ValueT> ValueWithRecordIdCoder<ValueT> of(Coder<ValueT> valueCoder) {
return new ValueWithRecordIdCoder<>(valueCoder);
}
Expand Down Expand Up @@ -124,6 +127,19 @@ public void verifyDeterministic() throws NonDeterministicException {
valueCoder.verifyDeterministic();
}

@Override
public boolean isRegisterByteSizeObserverCheap(ValueWithRecordId<ValueT> value) {
// idCoder is always cheap
return valueCoder.isRegisterByteSizeObserverCheap(value.value);
}

@Override
public void registerByteSizeObserver(
ValueWithRecordId<ValueT> value, ElementByteSizeObserver observer) throws Exception {
valueCoder.registerByteSizeObserver(value.getValue(), observer);
idCoder.registerByteSizeObserver(value.getId(), observer);
}

public Coder<ValueT> getValueCoder() {
return valueCoder;
}
Expand All @@ -134,6 +150,7 @@ public Coder<ValueT> getValueCoder() {

/** {@link DoFn} to turn a {@code ValueWithRecordId<T>} back to the value {@code T}. */
public static class StripIdsDoFn<T> extends DoFn<ValueWithRecordId<T>, T> {

@ProcessElement
public void processElement(ProcessContext c) {
c.output(c.element().getValue());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.junit.Assert.assertSame;

import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.Coder.Context;
import org.apache.beam.sdk.testing.CoderProperties;
import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
import org.junit.Test;
Expand Down Expand Up @@ -52,6 +53,33 @@ public void testEncodingRoundTrip() throws Exception {
}
}

@Test
public void testByteCount() throws Exception {
Coder<PaneInfo> coder = PaneInfo.PaneInfoCoder.INSTANCE;
for (Coder.Context context : CoderProperties.ALL_CONTEXTS) {
for (Timing timing : Timing.values()) {
long onTimeIndex = timing == Timing.EARLY ? -1 : 37;
testByteCount(coder, context, PaneInfo.createPane(false, false, timing, 389, onTimeIndex));
testByteCount(coder, context, PaneInfo.createPane(false, true, timing, 5077, onTimeIndex));
testByteCount(coder, context, PaneInfo.createPane(true, false, timing, 0, 0));
testByteCount(coder, context, PaneInfo.createPane(true, true, timing, 0, 0));

// With metadata
testByteCount(
coder, context, PaneInfo.createPane(false, false, timing, 389, onTimeIndex, true));
testByteCount(
coder, context, PaneInfo.createPane(false, true, timing, 5077, onTimeIndex, true));
testByteCount(coder, context, PaneInfo.createPane(true, false, timing, 0, 0, true));
testByteCount(coder, context, PaneInfo.createPane(true, true, timing, 0, 0, true));
}
}
}

private static void testByteCount(Coder<PaneInfo> coder, Context context, PaneInfo paneInfo)
throws Exception {
CoderProperties.testByteCount(coder, context, new PaneInfo[] {paneInfo});
}

@Test
public void testEncodingRoundTripWithElementMetadata() throws Exception {
Coder<PaneInfo> coder = PaneInfo.PaneInfoCoder.INSTANCE;
Expand Down
Loading