diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java
index 2d5bd7f8a6e7..f024644504d2 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java
@@ -94,14 +94,13 @@ public boolean requiresDeduping() {
}
/**
- * If isOffsetDeduplication returns true, then the UnboundedSource needs to
- * provide the following:
- *
+ * If isOffsetDeduplication returns true, then the UnboundedSource needs to provide the following:
+ *
*
- * - UnboundedReader which provides offsets that are unique for each
- * element and lexicographically ordered.
- * - CheckpointMark which provides an offset greater than all elements
- * read and less than or equal to the next offset that will be read.
+ * - UnboundedReader which provides offsets that are unique for each element and
+ * lexicographically ordered.
+ * - CheckpointMark which provides an offset greater than all elements read and less than or
+ * equal to the next offset that will be read.
*
*/
public boolean isOffsetDeduplication() {
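
The contract in the javadoc above is easiest to see with concrete byte strings. A minimal sketch (not part of the change, names are illustrative): fixed-width big-endian longs are one encoding that keeps unique offsets in lexicographic order; the Kafka changes further down use OrderedCode instead. Uses Java 9+ Arrays.compareUnsigned.

    import java.nio.ByteBuffer;
    import java.util.Arrays;

    // Illustration of the isOffsetDeduplication() contract: encoded offsets must be
    // unique and sort byte-wise in read order, and a checkpoint's offset limit must be
    // greater than every offset already read and no greater than the next offset.
    class OffsetContractSketch {
      // Fixed-width big-endian encoding; sorts correctly for non-negative offsets.
      static byte[] encode(long offset) {
        return ByteBuffer.allocate(Long.BYTES).putLong(offset).array();
      }

      public static void main(String[] args) {
        byte[] read5 = encode(5);
        byte[] read6 = encode(6); // unique, and sorts after encode(5)
        byte[] limit = encode(7); // valid limit: > all read offsets, <= next offset (7)
        assert Arrays.compareUnsigned(read5, read6) < 0;
        assert Arrays.compareUnsigned(read6, limit) < 0;
      }
    }
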
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/OrderedCode.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/OrderedCode.java
similarity index 99%
rename from runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/OrderedCode.java
rename to sdks/java/core/src/main/java/org/apache/beam/sdk/util/OrderedCode.java
index 89217c72ad19..72d522ac1e46 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/OrderedCode.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/OrderedCode.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.beam.runners.dataflow.worker;
+package org.apache.beam.sdk.util;
import java.math.RoundingMode;
import java.util.ArrayList;
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/OrderedCodeTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/OrderedCodeTest.java
similarity index 99%
rename from runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/OrderedCodeTest.java
rename to sdks/java/core/src/test/java/org/apache/beam/sdk/util/OrderedCodeTest.java
index 58e2413e8875..f55496a563fa 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/OrderedCodeTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/OrderedCodeTest.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.beam.runners.dataflow.worker;
+package org.apache.beam.sdk.util;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
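
OrderedCode moves from the Dataflow worker into org.apache.beam.sdk.util so that KafkaIO can depend on it. Assuming the moved class keeps the API it had in the worker (writeNumIncreasing, getEncodedBytes, readNumIncreasing), a small round-trip sketch of why it is used here: the encoding sorts like the numbers themselves, which a decimal-string encoding would not ("10" sorts before "9").

    import java.util.Arrays;
    import org.apache.beam.sdk.util.OrderedCode;

    class OrderedCodeRoundTrip {
      public static void main(String[] args) {
        OrderedCode nine = new OrderedCode();
        nine.writeNumIncreasing(9L);
        OrderedCode ten = new OrderedCode();
        ten.writeNumIncreasing(10L);

        // Byte-wise order matches numeric order.
        assert Arrays.compareUnsigned(nine.getEncodedBytes(), ten.getEncodedBytes()) < 0;

        // And the encoding round-trips.
        assert new OrderedCode(ten.getEncodedBytes()).readNumIncreasing() == 10L;
      }
    }
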
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java
index 966363e41f62..1dbe311ffeef 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java
@@ -17,6 +17,8 @@
*/
package org.apache.beam.sdk.io.kafka;
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
+
import java.io.Serializable;
import java.util.List;
import java.util.Optional;
@@ -66,6 +68,23 @@ public String toString() {
return "KafkaCheckpointMark{partitions=" + Joiner.on(",").join(partitions) + '}';
}
+ @Override
+ public byte[] getOffsetLimit() {
+ if (!reader.isPresent()) {
+ throw new RuntimeException(
+ "KafkaCheckpointMark reader is not present.");
+ }
+ if (!reader.get().offsetDeduplication()) {
+ throw new RuntimeException(
+ "Unexpected getOffsetLimit() called while KafkaUnboundedReader not configured for offset deduplication.");
+ }
+
+ // KafkaUnboundedSource.split() must produce a 1:1 partition to split ratio.
+ checkState(partitions.size() == 1);
+ PartitionMark partition = partitions.get(0);
+ return KafkaIOUtils.getOrderedCode(partition.getNextOffset());
+ }
+
/**
* A tuple to hold topic, partition, and offset that comprise the checkpoint for a single
* partition.
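
getOffsetLimit() is what a runner can compare per-record offsets against after a retry. A hedged sketch of the intended semantics (illustrative only, not actual runner code): a record whose encoded offset sorts below the restored checkpoint's offset limit was already covered by that checkpoint and can be dropped.

    // Fragment: the dedup decision implied by the contract, using byte-wise comparison.
    static boolean alreadyCoveredByCheckpoint(byte[] recordOffset, byte[] offsetLimit) {
      return java.util.Arrays.compareUnsigned(recordOffset, offsetLimit) < 0;
    }
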
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index cb7b3020c66a..8d48c38ea1a3 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -610,6 +610,7 @@ public static <K, V> Read<K, V> read() {
.setRedistributed(false)
.setAllowDuplicates(false)
.setRedistributeNumKeys(0)
+ .setOffsetDeduplication(false)
.build();
}
@@ -717,6 +718,9 @@ public abstract static class Read<K, V>
@Pure
public abstract int getRedistributeNumKeys();
+ @Pure
+ public abstract boolean isOffsetDeduplication();
+
@Pure
public abstract @Nullable Duration getWatchTopicPartitionDuration();
@@ -782,6 +786,8 @@ abstract Builder<K, V> setConsumerFactoryFn(
abstract Builder<K, V> setRedistributeNumKeys(int redistributeNumKeys);
+ abstract Builder<K, V> setOffsetDeduplication(boolean offsetDeduplication);
+
abstract Builder<K, V> setTimestampPolicyFactory(
TimestampPolicyFactory<K, V> timestampPolicyFactory);
@@ -892,6 +898,10 @@ static void setupExternalBuilder(
builder.setRedistributeNumKeys(0);
builder.setAllowDuplicates(false);
}
+ // TODO(tomstepp): Auto-enable offset deduplication if: redistributed and !allowDuplicates.
+ if (config.offsetDeduplication != null) {
+ builder.setOffsetDeduplication(config.offsetDeduplication);
+ }
}
private static <T> Coder<T> resolveCoder(Class<Deserializer<T>> deserializer) {
@@ -959,6 +969,7 @@ public static class Configuration {
private Integer redistributeNumKeys;
private Boolean redistribute;
private Boolean allowDuplicates;
+ private Boolean offsetDeduplication;
public void setConsumerConfig(Map<String, String> consumerConfig) {
this.consumerConfig = consumerConfig;
@@ -1015,6 +1026,10 @@ public void setRedistribute(Boolean redistribute) {
public void setAllowDuplicates(Boolean allowDuplicates) {
this.allowDuplicates = allowDuplicates;
}
+
+ public void setOffsetDeduplication(Boolean offsetDeduplication) {
+ this.offsetDeduplication = offsetDeduplication;
+ }
}
}
@@ -1086,6 +1101,10 @@ public Read<K, V> withRedistributeNumKeys(int redistributeNumKeys) {
return toBuilder().setRedistributeNumKeys(redistributeNumKeys).build();
}
+ public Read<K, V> withOffsetDeduplication(boolean offsetDeduplication) {
+ return toBuilder().setOffsetDeduplication(offsetDeduplication).build();
+ }
+
/**
* Internally sets a {@link java.util.regex.Pattern} of topics to read from. All the partitions
* from each of the matching topics are read.
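
For pipeline authors, the new surface is just withOffsetDeduplication. A sketch of how it could be wired into a read (broker address, topic, and deserializers are placeholders); per the compatibility entry below, the option appears to apply only to the legacy UnboundedSource read path.

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.kafka.KafkaIO;
    import org.apache.kafka.common.serialization.LongDeserializer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class KafkaOffsetDedupExample {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create();
        p.apply(
            KafkaIO.<Long, String>read()
                .withBootstrapServers("broker-1:9092") // placeholder address
                .withTopic("events") // placeholder topic
                .withKeyDeserializer(LongDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)
                .withOffsetDeduplication(true)); // new knob from this change
        p.run().waitUntilFinish();
      }
    }
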
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOReadImplementationCompatibility.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOReadImplementationCompatibility.java
index 457e0003705e..ad6d382e5ff5 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOReadImplementationCompatibility.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOReadImplementationCompatibility.java
@@ -137,6 +137,12 @@ Object getDefaultValue() {
return false;
}
},
+ OFFSET_DEDUPLICATION(LEGACY) {
+ @Override
+ Object getDefaultValue() {
+ return false;
+ }
+ }
;
private final @NonNull ImmutableSet<KafkaIOReadImplementation> supportedImplementations;
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOUtils.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOUtils.java
index 748418d16664..28f511e95035 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOUtils.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOUtils.java
@@ -19,10 +19,12 @@
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
+import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.util.OrderedCode;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -142,4 +144,14 @@ void update(double quantity) {
return avg;
}
}
+
+ static byte[] getOrderedCode(long offset) {
+ OrderedCode orderedCode = new OrderedCode();
+ orderedCode.writeNumIncreasing(offset);
+ return orderedCode.getEncodedBytes();
+ }
+
+ static byte[] getUniqueId(String topic, int partition, long offset) {
+ return (topic + "-" + partition + "-" + offset).getBytes(StandardCharsets.UTF_8);
+ }
}
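
The two helpers separate identity from ordering: getUniqueId feeds record-level dedup, getOrderedCode feeds the offset comparison. A test-style fragment (assumes same-package access, since both helpers are package-private):

    byte[] id = KafkaIOUtils.getUniqueId("events", 3, 42L);
    // id is the UTF-8 bytes of "events-3-42": unique per (topic, partition, offset).

    byte[] lower = KafkaIOUtils.getOrderedCode(41L);
    byte[] higher = KafkaIOUtils.getOrderedCode(42L);
    // OrderedCode keeps numeric order under byte-wise comparison.
    assert java.util.Arrays.compareUnsigned(lower, higher) < 0;
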
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
index ab9e26b3b740..2b2c86063669 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
@@ -214,6 +214,10 @@ public boolean advance() throws IOException {
curTimestamp =
pState.timestampPolicy.getTimestampForRecord(pState.mkTimestampPolicyContext(), record);
curRecord = record;
+ if (this.offsetDeduplication) {
+ curOffset = KafkaIOUtils.getOrderedCode(offset);
+ curId = KafkaIOUtils.getUniqueId(rawRecord.topic(), rawRecord.partition(), rawRecord.offset());
+ }
int recordSize =
(rawRecord.key() == null ? 0 : rawRecord.key().length)
@@ -299,6 +303,30 @@ public Instant getCurrentTimestamp() throws NoSuchElementException {
return curTimestamp;
}
+ @Override
+ public byte[] getCurrentRecordId() throws NoSuchElementException {
+ if (curId == null) {
+ if (this.offsetDeduplication) {
+ throw new NoSuchElementException();
+ } else {
+ return new byte[0];
+ }
+ }
+ return curId;
+ }
+
+ @Override
+ public byte[] getCurrentRecordOffset() throws NoSuchElementException {
+ if (curOffset == null) {
+ if (this.offsetDeduplication) {
+ throw new NoSuchElementException();
+ } else {
+ return new byte[0];
+ }
+ }
+ return curOffset;
+ }
+
@Override
public long getSplitBacklogBytes() {
long backlogBytes = 0;
@@ -314,6 +342,10 @@ public long getSplitBacklogBytes() {
return backlogBytes;
}
+ public boolean offsetDeduplication() {
+ return offsetDeduplication;
+ }
+
////////////////////////////////////////////////////////////////////////////////////////////////
private static final Logger LOG = LoggerFactory.getLogger(KafkaUnboundedReader.class);
@@ -336,6 +368,10 @@ public long getSplitBacklogBytes() {
private @Nullable Instant curTimestamp = null;
private Iterator<PartitionState<K, V>> curBatch = Collections.emptyIterator();
+ private final boolean offsetDeduplication;
+ private byte @Nullable [] curOffset = null;
+ private byte @Nullable [] curId = null;
+
private @Nullable Deserializer<K> keyDeserializerInstance = null;
private @Nullable Deserializer<V> valueDeserializerInstance = null;
@@ -507,6 +543,7 @@ Instant updateAndGetWatermark() {
KafkaUnboundedSource<K, V> source, @Nullable KafkaCheckpointMark checkpointMark) {
this.source = source;
this.name = "Reader-" + source.getId();
+ this.offsetDeduplication = source.isOffsetDeduplication();
List<TopicPartition> partitions =
Preconditions.checkArgumentNotNull(source.getSpec().getTopicPartitions());
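
How a caller would see the new reader state, assuming offset deduplication was enabled on the source; the loop below is illustrative only (it is the runner, not user code, that normally drives the reader).

    // Fragment: driving a KafkaUnboundedReader<K, V> and pulling per-record dedup metadata.
    boolean more = reader.start();
    while (more) {
      byte[] id = reader.getCurrentRecordId();         // UTF-8 "topic-partition-offset"
      byte[] offset = reader.getCurrentRecordOffset(); // OrderedCode of the Kafka offset
      // ... hand (id, offset, reader.getCurrent()) to the dedup machinery ...
      more = reader.advance();
    }
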
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedSource.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedSource.java
index 9685d859b0a1..98a24dbe1eb9 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedSource.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedSource.java
@@ -113,10 +113,16 @@ public List<KafkaUnboundedSource<K, V>> split(int desiredNumSplits, PipelineOpti
partitions.size() > 0,
"Could not find any partitions. Please check Kafka configuration and topic names");
- int numSplits = Math.min(desiredNumSplits, partitions.size());
- // XXX make all splits have the same # of partitions
- while (partitions.size() % numSplits > 0) {
- ++numSplits;
+ int numSplits;
+ if (isOffsetDeduplication()) {
+ // Enforce 1:1 split to partition ratio for offset deduplication.
+ numSplits = partitions.size();
+ } else {
+ numSplits = Math.min(desiredNumSplits, partitions.size());
+ // Make all splits have the same # of partitions.
+ while (partitions.size() % numSplits > 0) {
+ ++numSplits;
+ }
}
List<List<TopicPartition>> assignments = new ArrayList<>(numSplits);
@@ -177,6 +183,11 @@ public boolean requiresDeduping() {
return false;
}
+ @Override
+ public boolean isOffsetDeduplication() {
+ return spec.isOffsetDeduplication();
+ }
+
@Override
public Coder<KafkaRecord<K, V>> getOutputCoder() {
Coder<K> keyCoder = Preconditions.checkStateNotNull(spec.getKeyCoder());
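
A worked example of the split sizing above, restated standalone: with 10 partitions and desiredNumSplits = 4, the non-dedup path grows numSplits from min(4, 10) = 4 to 5 so every split gets the same number of partitions, while the dedup path pins numSplits to 10 so each split owns exactly one partition, which is the invariant KafkaCheckpointMark.getOffsetLimit() enforces with checkState(partitions.size() == 1).

    static int numSplits(int desiredNumSplits, int numPartitions, boolean offsetDeduplication) {
      if (offsetDeduplication) {
        return numPartitions; // 1:1 split-to-partition ratio
      }
      int numSplits = Math.min(desiredNumSplits, numPartitions);
      while (numPartitions % numSplits > 0) {
        ++numSplits; // e.g. 10 partitions, 4 desired -> 5 splits of 2 partitions each
      }
      return numSplits;
    }
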
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java
index f021789a912c..482476536577 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java
@@ -111,7 +111,8 @@ public void testConstructKafkaRead() throws Exception {
Field.of("consumer_polling_timeout", FieldType.INT64),
Field.of("redistribute_num_keys", FieldType.INT32),
Field.of("redistribute", FieldType.BOOLEAN),
- Field.of("allow_duplicates", FieldType.BOOLEAN)))
+ Field.of("allow_duplicates", FieldType.BOOLEAN),
+ Field.of("offset_deduplication", FieldType.BOOLEAN)))
.withFieldValue("topics", topics)
.withFieldValue("consumer_config", consumerConfig)
.withFieldValue("key_deserializer", keyDeserializer)
@@ -123,6 +124,7 @@ public void testConstructKafkaRead() throws Exception {
.withFieldValue("redistribute_num_keys", 0)
.withFieldValue("redistribute", false)
.withFieldValue("allow_duplicates", false)
+ .withFieldValue("offset_deduplication", false)
.build());
RunnerApi.Components defaultInstance = RunnerApi.Components.getDefaultInstance();
@@ -247,7 +249,8 @@ public void testConstructKafkaReadWithoutMetadata() throws Exception {
Field.of("timestamp_policy", FieldType.STRING),
Field.of("redistribute_num_keys", FieldType.INT32),
Field.of("redistribute", FieldType.BOOLEAN),
- Field.of("allow_duplicates", FieldType.BOOLEAN)))
+ Field.of("allow_duplicates", FieldType.BOOLEAN),
+ Field.of("offset_deduplication", FieldType.BOOLEAN)))
.withFieldValue("topics", topics)
.withFieldValue("consumer_config", consumerConfig)
.withFieldValue("key_deserializer", keyDeserializer)
@@ -258,6 +261,7 @@ public void testConstructKafkaReadWithoutMetadata() throws Exception {
.withFieldValue("redistribute_num_keys", 0)
.withFieldValue("redistribute", false)
.withFieldValue("allow_duplicates", false)
+ .withFieldValue("offset_deduplication", false)
.build());
RunnerApi.Components defaultInstance = RunnerApi.Components.getDefaultInstance();
diff --git a/sdks/java/io/kafka/upgrade/src/main/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java b/sdks/java/io/kafka/upgrade/src/main/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java
index 841236969d25..3d10b96c00b5 100644
--- a/sdks/java/io/kafka/upgrade/src/main/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java
+++ b/sdks/java/io/kafka/upgrade/src/main/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java
@@ -101,6 +101,7 @@ static class KafkaIOReadWithMetadataTranslator implements TransformPayloadTransl
.addBooleanField("redistribute")
.addBooleanField("allows_duplicates")
.addNullableInt32Field("redistribute_num_keys")
+ .addBooleanField("offset_deduplication")
.addNullableLogicalTypeField("watch_topic_partition_duration", new NanosDuration())
.addByteArrayField("timestamp_policy_factory")
.addNullableMapField("offset_consumer_config", FieldType.STRING, FieldType.BYTES)
@@ -221,6 +222,7 @@ public Row toConfigRow(Read<?, ?> transform) {
fieldValues.put("redistribute", transform.isRedistributed());
fieldValues.put("redistribute_num_keys", transform.getRedistributeNumKeys());
fieldValues.put("allows_duplicates", transform.isAllowDuplicates());
+ fieldValues.put("offset_deduplication", transform.isOffsetDeduplication());
return Row.withSchema(schema).withFieldValues(fieldValues).build();
}
@@ -349,6 +351,10 @@ public Row toConfigRow(Read<?, ?> transform) {
}
}
}
+ Boolean offsetDeduplication = configRow.getValue("offset_deduplication");
+ if (offsetDeduplication != null) {
+ transform = transform.withOffsetDeduplication(offsetDeduplication);
+ }
Duration maxReadTime = configRow.getValue("max_read_time");
if (maxReadTime != null) {
transform =