
Kafka source offset-based deduplication.
tomstepp committed Jan 24, 2025
1 parent 3cb1440 commit 8578e89
Showing 8 changed files with 129 additions and 7 deletions.
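Illustration only, not part of the diff: the commit threads a new withOffsetDeduplication(boolean) option through KafkaIO.Read, its external-transform configuration, and the legacy KafkaUnboundedSource/KafkaUnboundedReader path. Below is a minimal sketch of how a pipeline might opt in; the bootstrap servers, topic, deserializers, and class name are placeholders, and per the OFFSET_DEDUPLICATION(LEGACY) compatibility entry further down, the option only applies to the legacy UnboundedSource read implementation.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class KafkaOffsetDedupReadExample {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create();

    pipeline.apply(
        "ReadFromKafka",
        KafkaIO.<Long, String>read()
            .withBootstrapServers("broker-1:9092") // placeholder
            .withTopic("events") // placeholder
            .withKeyDeserializer(LongDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            // Added by this commit: mark the source as supporting offset-based deduplication.
            .withOffsetDeduplication(true));

    pipeline.run().waitUntilFinish();
  }
}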
@@ -17,6 +17,8 @@
*/
package org.apache.beam.sdk.io.kafka;

import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;

import java.io.Serializable;
import java.util.List;
import java.util.Optional;
@@ -42,6 +44,8 @@ public class KafkaCheckpointMark implements UnboundedSource.CheckpointMark {
@SuppressWarnings("initialization") // Avro will set the fields by breaking abstraction
private KafkaCheckpointMark() {} // for Avro

private static final long OFFSET_DEDUP_PARTITIONS_PER_SPLIT = 1;

public KafkaCheckpointMark(
List<PartitionMark> partitions, Optional<KafkaUnboundedReader<?, ?>> reader) {
this.partitions = partitions;
@@ -66,6 +70,23 @@ public String toString() {
return "KafkaCheckpointMark{partitions=" + Joiner.on(",").join(partitions) + '}';
}

@Override
public byte[] getOffsetLimit() {
if (!reader.isPresent()) {
throw new RuntimeException(
"KafkaCheckpointMark reader is not present while calling getOffsetLimit().");
}
if (!reader.get().offsetBasedDeduplicationSupported()) {
throw new RuntimeException(
"getOffsetLimit() called, but KafkaUnboundedReader is not configured for offset-based deduplication.");
}

// KafkaUnboundedSource.split() must produce a 1:1 partition to split ratio.
checkState(partitions.size() == OFFSET_DEDUP_PARTITIONS_PER_SPLIT);
PartitionMark partition = partitions.get(/* index= */ 0);
return KafkaIOUtils.OffsetBasedDeduplication.encodeOffset(partition.getNextOffset());
}
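Illustration only, not part of the diff: because partitions holds exactly one PartitionMark on this code path, the returned limit is that partition's nextOffset in the 8-byte big-endian encoding from KafkaIOUtils.OffsetBasedDeduplication. A hypothetical test helper could invert it:

// Hypothetical helper, not in the diff; ByteBuffer.wrap(...).getLong() inverts encodeOffset().
static long decodeOffsetLimit(byte[] offsetLimit) {
  return java.nio.ByteBuffer.wrap(offsetLimit).getLong();
}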

/**
* A tuple to hold topic, partition, and offset that comprise the checkpoint for a single
* partition.
@@ -610,6 +610,7 @@ public static <K, V> Read<K, V> read() {
.setRedistributed(false)
.setAllowDuplicates(false)
.setRedistributeNumKeys(0)
.setOffsetDeduplication(false)
.build();
}

@@ -717,6 +718,9 @@ public abstract static class Read<K, V>
@Pure
public abstract int getRedistributeNumKeys();

@Pure
public abstract boolean isOffsetDeduplication();

@Pure
public abstract @Nullable Duration getWatchTopicPartitionDuration();

@@ -782,6 +786,8 @@ abstract Builder<K, V> setConsumerFactoryFn(

abstract Builder<K, V> setRedistributeNumKeys(int redistributeNumKeys);

abstract Builder<K, V> setOffsetDeduplication(boolean offsetDeduplication);

abstract Builder<K, V> setTimestampPolicyFactory(
TimestampPolicyFactory<K, V> timestampPolicyFactory);

@@ -892,6 +898,10 @@ static <K, V> void setupExternalBuilder(
builder.setRedistributeNumKeys(0);
builder.setAllowDuplicates(false);
}
// TODO(tomstepp): Auto-enable offset deduplication if: redistributed and !allowDuplicates.
if (config.offsetDeduplication != null) {
builder.setOffsetDeduplication(config.offsetDeduplication);
}
}

private static <T> Coder<T> resolveCoder(Class<Deserializer<T>> deserializer) {
@@ -959,6 +969,7 @@ public static class Configuration {
private Integer redistributeNumKeys;
private Boolean redistribute;
private Boolean allowDuplicates;
private Boolean offsetDeduplication;

public void setConsumerConfig(Map<String, String> consumerConfig) {
this.consumerConfig = consumerConfig;
@@ -1015,6 +1026,10 @@ public void setRedistribute(Boolean redistribute) {
public void setAllowDuplicates(Boolean allowDuplicates) {
this.allowDuplicates = allowDuplicates;
}

public void setOffsetDeduplication(Boolean offsetDeduplication) {
this.offsetDeduplication = offsetDeduplication;
}
}
}

@@ -1086,6 +1101,10 @@ public Read<K, V> withRedistributeNumKeys(int redistributeNumKeys) {
return toBuilder().setRedistributeNumKeys(redistributeNumKeys).build();
}

public Read<K, V> withOffsetDeduplication(boolean offsetDeduplication) {
return toBuilder().setOffsetDeduplication(offsetDeduplication).build();
}

/**
* Internally sets a {@link java.util.regex.Pattern} of topics to read from. All the partitions
* from each of the matching topics are read.
@@ -137,7 +137,12 @@ Object getDefaultValue() {
return false;
}
},
;
OFFSET_DEDUPLICATION(LEGACY) {
@Override
Object getDefaultValue() {
return false;
}
};

private final @NonNull ImmutableSet<KafkaIOReadImplementation> supportedImplementations;

@@ -19,6 +19,8 @@

import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
@@ -142,4 +144,17 @@ void update(double quantity) {
return avg;
}
}

static final class OffsetBasedDeduplication {
static byte[] encodeOffset(long offset) {
// Allocate a fresh buffer per call: a shared static buffer would not be thread-safe, and
// returning its backing array would let later calls mutate previously returned encodings.
return ByteBuffer.allocate(Long.BYTES).putLong(offset).array();
}

static byte[] getUniqueId(String topic, int partition, long offset) {
return (topic + "-" + partition + "-" + offset).getBytes(StandardCharsets.UTF_8);
}
}
}
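Illustration only, not part of the diff: a standalone sketch of what the two helpers produce. The 8-byte big-endian encoding preserves the numeric order of (non-negative) Kafka offsets under an unsigned lexicographic byte comparison, and the unique ID is simply the UTF-8 bytes of "topic-partition-offset". The class and sample values below are hypothetical.

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class OffsetEncodingDemo {
  // Mirrors KafkaIOUtils.OffsetBasedDeduplication.encodeOffset (with per-call allocation).
  static byte[] encodeOffset(long offset) {
    return ByteBuffer.allocate(Long.BYTES).putLong(offset).array();
  }

  // Mirrors KafkaIOUtils.OffsetBasedDeduplication.getUniqueId.
  static byte[] getUniqueId(String topic, int partition, long offset) {
    return (topic + "-" + partition + "-" + offset).getBytes(StandardCharsets.UTF_8);
  }

  public static void main(String[] args) {
    byte[] lower = encodeOffset(255L);  // 00 00 00 00 00 00 00 FF
    byte[] higher = encodeOffset(256L); // 00 00 00 00 00 00 01 00
    // Unsigned lexicographic order of the encodings matches the numeric order of the offsets.
    System.out.println(Arrays.compareUnsigned(lower, higher) < 0); // true

    // Unique IDs are stable and human-readable per (topic, partition, offset).
    System.out.println(new String(getUniqueId("events", 3, 256L), StandardCharsets.UTF_8)); // events-3-256
  }
}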
@@ -214,6 +214,12 @@ public boolean advance() throws IOException {
curTimestamp =
pState.timestampPolicy.getTimestampForRecord(pState.mkTimestampPolicyContext(), record);
curRecord = record;
if (this.offsetBasedDeduplicationSupported) {
curOffset = KafkaIOUtils.OffsetBasedDeduplication.encodeOffset(offset);
curId =
KafkaIOUtils.OffsetBasedDeduplication.getUniqueId(
rawRecord.topic(), rawRecord.partition(), rawRecord.offset());
}

int recordSize =
(rawRecord.key() == null ? 0 : rawRecord.key().length)
@@ -299,6 +305,28 @@ public Instant getCurrentTimestamp() throws NoSuchElementException {
return curTimestamp;
}

@Override
public byte[] getCurrentRecordId() throws NoSuchElementException {
if (!this.offsetBasedDeduplicationSupported) {
throw new RuntimeException("UnboundedSource must enable offset-based deduplication.");
}
if (curId == null) {
throw new NoSuchElementException("KafkaUnboundedReader's curId is null.");
}
return curId;
}

@Override
public byte[] getCurrentRecordOffset() throws NoSuchElementException {
if (!this.offsetBasedDeduplicationSupported) {
throw new RuntimeException("UnboundedSource must enable offset-based deduplication.");
}
if (curOffset == null) {
throw new NoSuchElementException("KafkaUnboundedReader's curOffset is null.");
}
return curOffset;
}

@Override
public long getSplitBacklogBytes() {
long backlogBytes = 0;
@@ -314,6 +342,10 @@ public long getSplitBacklogBytes() {
return backlogBytes;
}

public boolean offsetBasedDeduplicationSupported() {
return this.offsetBasedDeduplicationSupported;
}

////////////////////////////////////////////////////////////////////////////////////////////////

private static final Logger LOG = LoggerFactory.getLogger(KafkaUnboundedReader.class);
@@ -336,6 +368,10 @@ public long getSplitBacklogBytes() {
private @Nullable Instant curTimestamp = null;
private Iterator<PartitionState<K, V>> curBatch = Collections.emptyIterator();

private final boolean offsetBasedDeduplicationSupported;
private byte[] curOffset = new byte[0];
private byte[] curId = new byte[0];

private @Nullable Deserializer<K> keyDeserializerInstance = null;
private @Nullable Deserializer<V> valueDeserializerInstance = null;

@@ -507,6 +543,7 @@ Instant updateAndGetWatermark() {
KafkaUnboundedSource<K, V> source, @Nullable KafkaCheckpointMark checkpointMark) {
this.source = source;
this.name = "Reader-" + source.getId();
this.offsetBasedDeduplicationSupported = source.offsetBasedDeduplicationSupported();

List<TopicPartition> partitions =
Preconditions.checkArgumentNotNull(source.getSpec().getTopicPartitions());
@@ -113,10 +113,20 @@ public List<KafkaUnboundedSource<K, V>> split(int desiredNumSplits, PipelineOpti
partitions.size() > 0,
"Could not find any partitions. Please check Kafka configuration and topic names");

int numSplits = Math.min(desiredNumSplits, partitions.size());
// XXX make all splits have the same # of partitions
while (partitions.size() % numSplits > 0) {
++numSplits;
int numSplits;
if (offsetBasedDeduplicationSupported()) {
// Enforce 1:1 split to partition ratio for offset deduplication.
numSplits = partitions.size();
LOG.info(
"Offset-based deduplication is enabled for KafkaUnboundedSource. "
+ "Forcing the number of splits to equal the number of total partitions: {}.",
numSplits);
} else {
numSplits = Math.min(desiredNumSplits, partitions.size());
// Make all splits have the same # of partitions.
while (partitions.size() % numSplits > 0) {
++numSplits;
}
}
List<List<TopicPartition>> assignments = new ArrayList<>(numSplits);
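Illustration only, not part of the diff: how the split count works out under the two branches above, with hypothetical numbers. The offset-deduplication branch ignores desiredNumSplits so that every split owns exactly one partition, which is the invariant KafkaCheckpointMark.getOffsetLimit() relies on.

public class SplitCountDemo {
  // Mirrors the pre-existing branch: grow numSplits until the partitions divide evenly across splits.
  static int legacyNumSplits(int desiredNumSplits, int numPartitions) {
    int numSplits = Math.min(desiredNumSplits, numPartitions);
    while (numPartitions % numSplits > 0) {
      ++numSplits;
    }
    return numSplits;
  }

  public static void main(String[] args) {
    int numPartitions = 6;
    System.out.println(legacyNumSplits(2, numPartitions)); // 2 -> two splits of three partitions each
    System.out.println(legacyNumSplits(4, numPartitions)); // 6 -> grows from 4 to 5 to 6
    // With offset deduplication the source always uses one split per partition:
    System.out.println(numPartitions); // 6
  }
}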

@@ -177,6 +187,11 @@ public boolean requiresDeduping() {
return false;
}

@Override
public boolean offsetBasedDeduplicationSupported() {
return spec.isOffsetDeduplication();
}

@Override
public Coder<KafkaRecord<K, V>> getOutputCoder() {
Coder<K> keyCoder = Preconditions.checkStateNotNull(spec.getKeyCoder());
@@ -111,7 +111,8 @@ public void testConstructKafkaRead() throws Exception {
Field.of("consumer_polling_timeout", FieldType.INT64),
Field.of("redistribute_num_keys", FieldType.INT32),
Field.of("redistribute", FieldType.BOOLEAN),
Field.of("allow_duplicates", FieldType.BOOLEAN)))
Field.of("allow_duplicates", FieldType.BOOLEAN),
Field.of("offset_deduplication", FieldType.BOOLEAN)))
.withFieldValue("topics", topics)
.withFieldValue("consumer_config", consumerConfig)
.withFieldValue("key_deserializer", keyDeserializer)
@@ -123,6 +124,7 @@ public void testConstructKafkaReadWithoutMetadata() throws Exception {
.withFieldValue("redistribute_num_keys", 0)
.withFieldValue("redistribute", false)
.withFieldValue("allow_duplicates", false)
.withFieldValue("offset_deduplication", false)
.build());

RunnerApi.Components defaultInstance = RunnerApi.Components.getDefaultInstance();
@@ -247,7 +249,8 @@ public void testConstructKafkaReadWithoutMetadata() throws Exception {
Field.of("timestamp_policy", FieldType.STRING),
Field.of("redistribute_num_keys", FieldType.INT32),
Field.of("redistribute", FieldType.BOOLEAN),
Field.of("allow_duplicates", FieldType.BOOLEAN)))
Field.of("allow_duplicates", FieldType.BOOLEAN),
Field.of("offset_deduplication", FieldType.BOOLEAN)))
.withFieldValue("topics", topics)
.withFieldValue("consumer_config", consumerConfig)
.withFieldValue("key_deserializer", keyDeserializer)
Expand All @@ -258,6 +261,7 @@ public void testConstructKafkaReadWithoutMetadata() throws Exception {
.withFieldValue("redistribute_num_keys", 0)
.withFieldValue("redistribute", false)
.withFieldValue("allow_duplicates", false)
.withFieldValue("offset_deduplication", false)
.build());

RunnerApi.Components defaultInstance = RunnerApi.Components.getDefaultInstance();
@@ -101,6 +101,7 @@ static class KafkaIOReadWithMetadataTranslator implements TransformPayloadTransl
.addBooleanField("redistribute")
.addBooleanField("allows_duplicates")
.addNullableInt32Field("redistribute_num_keys")
.addBooleanField("offset_deduplication")
.addNullableLogicalTypeField("watch_topic_partition_duration", new NanosDuration())
.addByteArrayField("timestamp_policy_factory")
.addNullableMapField("offset_consumer_config", FieldType.STRING, FieldType.BYTES)
@@ -221,6 +222,7 @@ public Row toConfigRow(Read<?, ?> transform) {
fieldValues.put("redistribute", transform.isRedistributed());
fieldValues.put("redistribute_num_keys", transform.getRedistributeNumKeys());
fieldValues.put("allows_duplicates", transform.isAllowDuplicates());
fieldValues.put("offset_deduplication", transform.isOffsetDeduplication());
return Row.withSchema(schema).withFieldValues(fieldValues).build();
}

@@ -349,6 +351,10 @@ public Row toConfigRow(Read<?, ?> transform) {
}
}
}
Boolean offsetDeduplication = configRow.getValue("offset_deduplication");
if (offsetDeduplication != null) {
transform = transform.withOffsetDeduplication(offsetDeduplication);
}
Duration maxReadTime = configRow.getValue("max_read_time");
if (maxReadTime != null) {
transform =