Skip to content

Commit ae3c5de

Browse files
authored
KAFKA-18013: Add support for duration based offset reset strategy to Kafka Consumer (#17972)
Update AutoOffsetResetStrategy.java to support duration based reset strategy Update OffsetFetcher related classes and unit tests Reviewers: Andrew Schofield <[email protected]>, Lianet Magrans <[email protected]>
1 parent 6237325 commit ae3c5de

11 files changed

+252
-83
lines changed

clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java

+2
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,8 @@ public class ConsumerConfig extends AbstractConfig {
173173
"(e.g. because that data has been deleted): " +
174174
"<ul><li>earliest: automatically reset the offset to the earliest offset" +
175175
"<li>latest: automatically reset the offset to the latest offset</li>" +
176+
"<li>by_duration:<duration>: automatically reset the offset to a configured <duration> from the current timestamp. <duration> must be specified in ISO8601 format (PnDTnHnMn.nS). " +
177+
"Negative duration is not allowed.</li>" +
176178
"<li>none: throw exception to the consumer if no previous offset is found for the consumer's group</li>" +
177179
"<li>anything else: throw exception to the consumer.</li></ul>" +
178180
"<p>Note that altering partition numbers while setting this config to latest may cause message delivery loss since " +

clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java

+10
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
6262
private final SubscriptionState subscriptions;
6363
private final Map<TopicPartition, Long> beginningOffsets;
6464
private final Map<TopicPartition, Long> endOffsets;
65+
private final Map<TopicPartition, Long> durationResetOffsets;
6566
private final Map<TopicPartition, OffsetAndMetadata> committed;
6667
private final Queue<Runnable> pollTasks;
6768
private final Set<TopicPartition> paused;
@@ -104,6 +105,7 @@ private MockConsumer(AutoOffsetResetStrategy offsetResetStrategy) {
104105
this.closed = false;
105106
this.beginningOffsets = new HashMap<>();
106107
this.endOffsets = new HashMap<>();
108+
this.durationResetOffsets = new HashMap<>();
107109
this.pollTasks = new LinkedList<>();
108110
this.pollException = null;
109111
this.wakeup = new AtomicBoolean(false);
@@ -433,6 +435,10 @@ public synchronized void updateEndOffsets(final Map<TopicPartition, Long> newOff
433435
endOffsets.putAll(newOffsets);
434436
}
435437

438+
public synchronized void updateDurationOffsets(final Map<TopicPartition, Long> newOffsets) {
439+
durationResetOffsets.putAll(newOffsets);
440+
}
441+
436442
public void disableTelemetry() {
437443
telemetryDisabled = true;
438444
}
@@ -610,6 +616,10 @@ private void resetOffsetPosition(TopicPartition tp) {
610616
offset = endOffsets.get(tp);
611617
if (offset == null)
612618
throw new IllegalStateException("MockConsumer didn't have end offset specified, but tried to seek to end");
619+
} else if (strategy.type() == AutoOffsetResetStrategy.StrategyType.BY_DURATION) {
620+
offset = durationResetOffsets.get(tp);
621+
if (offset == null)
622+
throw new IllegalStateException("MockConsumer didn't have duration offset specified, but tried to seek to timestamp");
613623
} else {
614624
throw new NoOffsetForPartitionException(tp);
615625
}

clients/src/main/java/org/apache/kafka/clients/consumer/internals/AutoOffsetResetStrategy.java

+83-23
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,19 @@
1818

1919
import org.apache.kafka.common.config.ConfigDef;
2020
import org.apache.kafka.common.config.ConfigException;
21+
import org.apache.kafka.common.requests.ListOffsetsRequest;
2122
import org.apache.kafka.common.utils.Utils;
2223

24+
import java.time.Duration;
25+
import java.time.Instant;
2326
import java.util.Arrays;
2427
import java.util.Locale;
2528
import java.util.Objects;
29+
import java.util.Optional;
2630

2731
public class AutoOffsetResetStrategy {
28-
private enum StrategyType {
29-
LATEST, EARLIEST, NONE;
32+
public enum StrategyType {
33+
LATEST, EARLIEST, NONE, BY_DURATION;
3034

3135
@Override
3236
public String toString() {
@@ -39,30 +43,65 @@ public String toString() {
3943
public static final AutoOffsetResetStrategy NONE = new AutoOffsetResetStrategy(StrategyType.NONE);
4044

4145
private final StrategyType type;
46+
private final Optional<Duration> duration;
4247

4348
private AutoOffsetResetStrategy(StrategyType type) {
4449
this.type = type;
50+
this.duration = Optional.empty();
4551
}
4652

47-
public static boolean isValid(String offsetStrategy) {
48-
return Arrays.asList(Utils.enumOptions(StrategyType.class)).contains(offsetStrategy);
53+
private AutoOffsetResetStrategy(Duration duration) {
54+
this.type = StrategyType.BY_DURATION;
55+
this.duration = Optional.of(duration);
4956
}
5057

58+
/**
59+
* Returns the AutoOffsetResetStrategy from the given string.
60+
*/
5161
public static AutoOffsetResetStrategy fromString(String offsetStrategy) {
52-
if (offsetStrategy == null || !isValid(offsetStrategy)) {
53-
throw new IllegalArgumentException("Unknown auto offset reset strategy: " + offsetStrategy);
62+
if (offsetStrategy == null) {
63+
throw new IllegalArgumentException("Auto offset reset strategy is null");
5464
}
55-
StrategyType type = StrategyType.valueOf(offsetStrategy.toUpperCase(Locale.ROOT));
56-
switch (type) {
57-
case EARLIEST:
58-
return EARLIEST;
59-
case LATEST:
60-
return LATEST;
61-
case NONE:
62-
return NONE;
63-
default:
64-
throw new IllegalArgumentException("Unknown auto offset reset strategy: " + offsetStrategy);
65+
66+
if (StrategyType.BY_DURATION.toString().equals(offsetStrategy)) {
67+
throw new IllegalArgumentException("<:duration> part is missing in by_duration auto offset reset strategy.");
68+
}
69+
70+
if (Arrays.asList(Utils.enumOptions(StrategyType.class)).contains(offsetStrategy)) {
71+
StrategyType type = StrategyType.valueOf(offsetStrategy.toUpperCase(Locale.ROOT));
72+
switch (type) {
73+
case EARLIEST:
74+
return EARLIEST;
75+
case LATEST:
76+
return LATEST;
77+
case NONE:
78+
return NONE;
79+
default:
80+
throw new IllegalArgumentException("Unknown auto offset reset strategy: " + offsetStrategy);
81+
}
82+
}
83+
84+
if (offsetStrategy.startsWith(StrategyType.BY_DURATION + ":")) {
85+
String isoDuration = offsetStrategy.substring(StrategyType.BY_DURATION.toString().length() + 1);
86+
try {
87+
Duration duration = Duration.parse(isoDuration);
88+
if (duration.isNegative()) {
89+
throw new IllegalArgumentException("Negative duration is not supported in by_duration offset reset strategy.");
90+
}
91+
return new AutoOffsetResetStrategy(duration);
92+
} catch (Exception e) {
93+
throw new IllegalArgumentException("Unable to parse duration string in by_duration offset reset strategy.", e);
94+
}
6595
}
96+
97+
throw new IllegalArgumentException("Unknown auto offset reset strategy: " + offsetStrategy);
98+
}
99+
100+
/**
101+
* Returns the offset reset strategy type.
102+
*/
103+
public StrategyType type() {
104+
return type;
66105
}
67106

68107
/**
@@ -72,33 +111,54 @@ public String name() {
72111
return type.toString();
73112
}
74113

114+
/**
115+
* Return the timestamp to be used for the ListOffsetsRequest.
116+
* @return the timestamp for the OffsetResetStrategy,
117+
* if the strategy is EARLIEST or LATEST or duration is provided
118+
* else return Optional.empty()
119+
*/
120+
public Optional<Long> timestamp() {
121+
if (type == StrategyType.EARLIEST)
122+
return Optional.of(ListOffsetsRequest.EARLIEST_TIMESTAMP);
123+
else if (type == StrategyType.LATEST)
124+
return Optional.of(ListOffsetsRequest.LATEST_TIMESTAMP);
125+
else if (type == StrategyType.BY_DURATION && duration.isPresent()) {
126+
Instant now = Instant.now();
127+
return Optional.of(now.minus(duration.get()).toEpochMilli());
128+
} else
129+
return Optional.empty();
130+
}
131+
75132
@Override
76133
public boolean equals(Object o) {
77134
if (this == o) return true;
78135
if (o == null || getClass() != o.getClass()) return false;
79136
AutoOffsetResetStrategy that = (AutoOffsetResetStrategy) o;
80-
return Objects.equals(type, that.type);
137+
return type == that.type && Objects.equals(duration, that.duration);
81138
}
82139

83140
@Override
84141
public int hashCode() {
85-
return Objects.hashCode(type);
142+
return Objects.hash(type, duration);
86143
}
87144

88145
@Override
89146
public String toString() {
90147
return "AutoOffsetResetStrategy{" +
91-
"type='" + type + '\'' +
148+
"type=" + type +
149+
(duration.map(value -> ", duration=" + value).orElse("")) +
92150
'}';
93151
}
94152

95153
public static class Validator implements ConfigDef.Validator {
96154
@Override
97155
public void ensureValid(String name, Object value) {
98-
String strategy = (String) value;
99-
if (!AutoOffsetResetStrategy.isValid(strategy)) {
100-
throw new ConfigException(name, value, "Invalid value " + strategy + " for configuration " +
101-
name + ": the value must be either 'earliest', 'latest', or 'none'.");
156+
String offsetStrategy = (String) value;
157+
try {
158+
fromString(offsetStrategy);
159+
} catch (Exception e) {
160+
throw new ConfigException(name, value, "Invalid value `" + offsetStrategy + "` for configuration " +
161+
name + ". The value must be either 'earliest', 'latest', 'none' or of the format 'by_duration:<PnDTnHnMn.nS.>'.");
102162
}
103163
}
104164
}

clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java

+8-5
Original file line numberDiff line numberDiff line change
@@ -101,12 +101,13 @@ public OffsetFetcher(LogContext logContext,
101101
* and one or more partitions aren't awaiting a seekToBeginning() or seekToEnd().
102102
*/
103103
public void resetPositionsIfNeeded() {
104-
Map<TopicPartition, Long> offsetResetTimestamps = offsetFetcherUtils.getOffsetResetTimestamp();
104+
Map<TopicPartition, AutoOffsetResetStrategy> partitionAutoOffsetResetStrategyMap =
105+
offsetFetcherUtils.getOffsetResetStrategyForPartitions();
105106

106-
if (offsetResetTimestamps.isEmpty())
107+
if (partitionAutoOffsetResetStrategyMap.isEmpty())
107108
return;
108109

109-
resetPositionsAsync(offsetResetTimestamps);
110+
resetPositionsAsync(partitionAutoOffsetResetStrategyMap);
110111
}
111112

112113
/**
@@ -209,7 +210,9 @@ private Map<TopicPartition, Long> beginningOrEndOffset(Collection<TopicPartition
209210
}
210211
}
211212

212-
private void resetPositionsAsync(Map<TopicPartition, Long> partitionResetTimestamps) {
213+
private void resetPositionsAsync(Map<TopicPartition, AutoOffsetResetStrategy> partitionAutoOffsetResetStrategyMap) {
214+
Map<TopicPartition, Long> partitionResetTimestamps = partitionAutoOffsetResetStrategyMap.entrySet().stream()
215+
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().timestamp().get()));
213216
Map<Node, Map<TopicPartition, ListOffsetsPartition>> timestampsToSearchByNode =
214217
groupListOffsetRequests(partitionResetTimestamps, new HashSet<>());
215218
for (Map.Entry<Node, Map<TopicPartition, ListOffsetsPartition>> entry : timestampsToSearchByNode.entrySet()) {
@@ -221,7 +224,7 @@ private void resetPositionsAsync(Map<TopicPartition, Long> partitionResetTimesta
221224
future.addListener(new RequestFutureListener<>() {
222225
@Override
223226
public void onSuccess(ListOffsetResult result) {
224-
offsetFetcherUtils.onSuccessfulResponseForResettingPositions(resetTimestamps, result);
227+
offsetFetcherUtils.onSuccessfulResponseForResettingPositions(result, partitionAutoOffsetResetStrategyMap);
225228
}
226229

227230
@Override

clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java

+15-24
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@
3232
import org.apache.kafka.common.message.ListOffsetsResponseData;
3333
import org.apache.kafka.common.protocol.ApiKeys;
3434
import org.apache.kafka.common.protocol.Errors;
35-
import org.apache.kafka.common.requests.ListOffsetsRequest;
3635
import org.apache.kafka.common.requests.ListOffsetsResponse;
3736
import org.apache.kafka.common.requests.OffsetsForLeaderEpochRequest;
3837
import org.apache.kafka.common.utils.LogContext;
@@ -224,19 +223,22 @@ void validatePositionsOnMetadataChange() {
224223
}
225224
}
226225

227-
Map<TopicPartition, Long> getOffsetResetTimestamp() {
226+
/**
227+
* get OffsetResetStrategy for all assigned partitions
228+
*/
229+
Map<TopicPartition, AutoOffsetResetStrategy> getOffsetResetStrategyForPartitions() {
228230
// Raise exception from previous offset fetch if there is one
229231
RuntimeException exception = cachedResetPositionsException.getAndSet(null);
230232
if (exception != null)
231233
throw exception;
232234

233235
Set<TopicPartition> partitions = subscriptionState.partitionsNeedingReset(time.milliseconds());
234-
final Map<TopicPartition, Long> offsetResetTimestamps = new HashMap<>();
236+
final Map<TopicPartition, AutoOffsetResetStrategy> partitionAutoOffsetResetStrategyMap = new HashMap<>();
235237
for (final TopicPartition partition : partitions) {
236-
offsetResetTimestamps.put(partition, offsetResetStrategyTimestamp(partition));
238+
partitionAutoOffsetResetStrategyMap.put(partition, offsetResetStrategyWithValidTimestamp(partition));
237239
}
238240

239-
return offsetResetTimestamps;
241+
return partitionAutoOffsetResetStrategyMap;
240242
}
241243

242244
static Map<TopicPartition, OffsetAndTimestamp> buildListOffsetsResult(
@@ -283,14 +285,13 @@ static Map<TopicPartition, OffsetAndTimestampInternal> buildOffsetsForTimeIntern
283285
return offsetsResults;
284286
}
285287

286-
private long offsetResetStrategyTimestamp(final TopicPartition partition) {
288+
private AutoOffsetResetStrategy offsetResetStrategyWithValidTimestamp(final TopicPartition partition) {
287289
AutoOffsetResetStrategy strategy = subscriptionState.resetStrategy(partition);
288-
if (strategy == AutoOffsetResetStrategy.EARLIEST)
289-
return ListOffsetsRequest.EARLIEST_TIMESTAMP;
290-
else if (strategy == AutoOffsetResetStrategy.LATEST)
291-
return ListOffsetsRequest.LATEST_TIMESTAMP;
292-
else
290+
if (strategy.timestamp().isPresent()) {
291+
return strategy;
292+
} else {
293293
throw new NoOffsetForPartitionException(partition);
294+
}
294295
}
295296

296297
static Set<String> topicsForPartitions(Collection<TopicPartition> partitions) {
@@ -319,18 +320,9 @@ void updateSubscriptionState(Map<TopicPartition, OffsetFetcherUtils.ListOffsetDa
319320
}
320321
}
321322

322-
static AutoOffsetResetStrategy timestampToOffsetResetStrategy(long timestamp) {
323-
if (timestamp == ListOffsetsRequest.EARLIEST_TIMESTAMP)
324-
return AutoOffsetResetStrategy.EARLIEST;
325-
else if (timestamp == ListOffsetsRequest.LATEST_TIMESTAMP)
326-
return AutoOffsetResetStrategy.LATEST;
327-
else
328-
return null;
329-
}
330-
331323
void onSuccessfulResponseForResettingPositions(
332-
final Map<TopicPartition, ListOffsetsRequestData.ListOffsetsPartition> resetTimestamps,
333-
final ListOffsetResult result) {
324+
final ListOffsetResult result,
325+
final Map<TopicPartition, AutoOffsetResetStrategy> partitionAutoOffsetResetStrategyMap) {
334326
if (!result.partitionsToRetry.isEmpty()) {
335327
subscriptionState.requestFailed(result.partitionsToRetry, time.milliseconds() + retryBackoffMs);
336328
metadata.requestUpdate(false);
@@ -339,10 +331,9 @@ void onSuccessfulResponseForResettingPositions(
339331
for (Map.Entry<TopicPartition, ListOffsetData> fetchedOffset : result.fetchedOffsets.entrySet()) {
340332
TopicPartition partition = fetchedOffset.getKey();
341333
ListOffsetData offsetData = fetchedOffset.getValue();
342-
ListOffsetsRequestData.ListOffsetsPartition requestedReset = resetTimestamps.get(partition);
343334
resetPositionIfNeeded(
344335
partition,
345-
timestampToOffsetResetStrategy(requestedReset.timestamp()),
336+
partitionAutoOffsetResetStrategyMap.get(partition),
346337
offsetData);
347338
}
348339
}

clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java

+10-8
Original file line numberDiff line numberDiff line change
@@ -472,20 +472,20 @@ private boolean canReusePendingOffsetFetchEvent(Set<TopicPartition> partitions)
472472
* this function (ex. {@link org.apache.kafka.common.errors.TopicAuthorizationException})
473473
*/
474474
CompletableFuture<Void> resetPositionsIfNeeded() {
475-
Map<TopicPartition, Long> offsetResetTimestamps;
475+
Map<TopicPartition, AutoOffsetResetStrategy> partitionAutoOffsetResetStrategyMap;
476476

477477
try {
478-
offsetResetTimestamps = offsetFetcherUtils.getOffsetResetTimestamp();
478+
partitionAutoOffsetResetStrategyMap = offsetFetcherUtils.getOffsetResetStrategyForPartitions();
479479
} catch (Exception e) {
480480
CompletableFuture<Void> result = new CompletableFuture<>();
481481
result.completeExceptionally(e);
482482
return result;
483483
}
484484

485-
if (offsetResetTimestamps.isEmpty())
485+
if (partitionAutoOffsetResetStrategyMap.isEmpty())
486486
return CompletableFuture.completedFuture(null);
487487

488-
return sendListOffsetsRequestsAndResetPositions(offsetResetTimestamps);
488+
return sendListOffsetsRequestsAndResetPositions(partitionAutoOffsetResetStrategyMap);
489489
}
490490

491491
/**
@@ -652,12 +652,14 @@ private CompletableFuture<ListOffsetResult> buildListOffsetRequestToNode(
652652
* partitions. Use the retrieved offsets to reset positions in the subscription state.
653653
* This also adds the request to the list of unsentRequests.
654654
*
655-
* @param timestampsToSearch the mapping between partitions and target time
655+
* @param partitionAutoOffsetResetStrategyMap the mapping between partitions and AutoOffsetResetStrategy
656656
* @return A {@link CompletableFuture} which completes when the requests are
657657
* complete.
658658
*/
659659
private CompletableFuture<Void> sendListOffsetsRequestsAndResetPositions(
660-
final Map<TopicPartition, Long> timestampsToSearch) {
660+
final Map<TopicPartition, AutoOffsetResetStrategy> partitionAutoOffsetResetStrategyMap) {
661+
Map<TopicPartition, Long> timestampsToSearch = partitionAutoOffsetResetStrategyMap.entrySet().stream()
662+
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().timestamp().get()));
661663
Map<Node, Map<TopicPartition, ListOffsetsRequestData.ListOffsetsPartition>> timestampsToSearchByNode =
662664
groupListOffsetRequests(timestampsToSearch, Optional.empty());
663665

@@ -677,8 +679,8 @@ private CompletableFuture<Void> sendListOffsetsRequestsAndResetPositions(
677679

678680
partialResult.whenComplete((result, error) -> {
679681
if (error == null) {
680-
offsetFetcherUtils.onSuccessfulResponseForResettingPositions(resetTimestamps,
681-
result);
682+
offsetFetcherUtils.onSuccessfulResponseForResettingPositions(result,
683+
partitionAutoOffsetResetStrategyMap);
682684
} else {
683685
RuntimeException e;
684686
if (error instanceof RuntimeException) {

clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -441,7 +441,7 @@ synchronized void maybeSeekUnvalidated(TopicPartition tp, FetchPosition position
441441
log.debug("Skipping reset of partition {} since it is no longer assigned", tp);
442442
} else if (!state.awaitingReset()) {
443443
log.debug("Skipping reset of partition {} since reset is no longer needed", tp);
444-
} else if (requestedResetStrategy != state.resetStrategy) {
444+
} else if (requestedResetStrategy != null && !requestedResetStrategy.equals(state.resetStrategy)) {
445445
log.debug("Skipping reset of partition {} since an alternative reset has been requested", tp);
446446
} else {
447447
log.info("Resetting offset for partition {} to position {}.", tp, position);

0 commit comments

Comments
 (0)