KAFKA-17002: Integrated partition leader epoch for Persister APIs (KIP-932) (#16842)

The PR passes the partition leader epoch to the Persister APIs when they are invoked. The write RPC is retried once on a leader epoch failure.
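
For illustration only, a minimal sketch of the retry-once behaviour described above, written against plain java.util.concurrent.CompletableFuture rather than the actual Persister classes touched by this PR; the FencedLeaderEpochException stand-in, the writeWithSingleRetry helper, and the Supplier-based write RPC are assumptions made for this example, not code from the commit.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Supplier;

public class WriteRetrySketch {

    // Stand-in for org.apache.kafka.common.errors.FencedLeaderEpochException so the
    // sketch compiles without the Kafka dependency (assumption for illustration).
    static class FencedLeaderEpochException extends RuntimeException {
        FencedLeaderEpochException(String message) {
            super(message);
        }
    }

    // Runs the supplied write RPC and retries it exactly once if the first attempt
    // fails because of a fenced leader epoch; any other error, or a second fencing
    // failure, is surfaced to the caller unchanged.
    static CompletableFuture<Void> writeWithSingleRetry(Supplier<CompletableFuture<Void>> writeRpc) {
        return writeRpc.get()
            .handle((result, error) -> {
                if (error == null) {
                    return CompletableFuture.<Void>completedFuture(result);
                }
                Throwable cause = error instanceof CompletionException && error.getCause() != null
                    ? error.getCause()
                    : error;
                if (cause instanceof FencedLeaderEpochException) {
                    // Retry once; the outcome of the retried RPC becomes the final result.
                    return writeRpc.get();
                }
                CompletableFuture<Void> failed = new CompletableFuture<>();
                failed.completeExceptionally(cause);
                return failed;
            })
            .thenCompose(future -> future);
    }

    public static void main(String[] args) {
        // First attempt fails with a fenced leader epoch, the single retry succeeds.
        int[] attempts = {0};
        Supplier<CompletableFuture<Void>> writeRpc = () -> {
            attempts[0]++;
            if (attempts[0] == 1) {
                CompletableFuture<Void> failed = new CompletableFuture<>();
                failed.completeExceptionally(new FencedLeaderEpochException("stale leader epoch"));
                return failed;
            }
            return CompletableFuture.completedFuture(null);
        };
        writeWithSingleRetry(writeRpc).join();
        System.out.println("Write succeeded after " + attempts[0] + " attempts");
    }
}

The design point is that only a fenced leader epoch triggers the second attempt; every other failure is propagated to the caller unchanged.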

Reviewers: Abhinav Dixit <[email protected]>, Andrew Schofield <[email protected]>, Jun Rao <[email protected]>, David Arthur <[email protected]>
apoorvmittal10 authored Oct 30, 2024
1 parent fb65dfe commit ff116df
Showing 10 changed files with 819 additions and 343 deletions.
@@ -127,7 +127,7 @@ public void onComplete() {
shareFetchData.future().complete(result);
} catch (Exception e) {
log.error("Error processing delayed share fetch request", e);
shareFetchData.future().completeExceptionally(e);
sharePartitionManager.handleFetchException(shareFetchData.groupId(), topicPartitionData.keySet(), shareFetchData.future(), e);
} finally {
// Releasing the lock to move ahead with the next request in queue.
releasePartitionLocks(shareFetchData.groupId(), topicPartitionData.keySet());
12 changes: 12 additions & 0 deletions core/src/main/java/kafka/server/share/ShareFetchUtils.java
@@ -16,9 +16,12 @@
*/
package kafka.server.share;

import kafka.cluster.Partition;
import kafka.server.ReplicaManager;

import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
import org.apache.kafka.common.message.ShareFetchResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.FileRecords;
@@ -128,4 +131,13 @@ static long offsetForEarliestTimestamp(TopicIdPartition topicIdPartition, Replic
Optional.empty(), true).timestampAndOffsetOpt();
return timestampAndOffset.isEmpty() ? (long) 0 : timestampAndOffset.get().offset;
}

static int leaderEpoch(ReplicaManager replicaManager, TopicPartition tp) {
Partition partition = replicaManager.getPartitionOrException(tp);
if (!partition.isLeader()) {
log.debug("The broker is not the leader for topic partition: {}-{}", tp.topic(), tp.partition());
throw new NotLeaderOrFollowerException();
}
return partition.getLeaderEpoch();
}
}
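
For context, a hedged fragment showing how a helper like this could feed the new leaderEpoch constructor argument of SharePartition that appears further down in this diff; the surrounding variables (replicaManager, topicIdPartition, groupId and the remaining constructor arguments) are assumed to already be in scope, so this is an illustrative sketch rather than a line from the commit.

int leaderEpoch = ShareFetchUtils.leaderEpoch(replicaManager, topicIdPartition.topicPartition());
SharePartition sharePartition = new SharePartition(groupId, topicIdPartition, leaderEpoch,
    maxInFlightMessages, maxDeliveryCount, defaultRecordLockDurationMs,
    timer, time, persister, replicaManager, groupConfigManager);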
66 changes: 59 additions & 7 deletions core/src/main/java/kafka/server/share/SharePartition.java
@@ -23,11 +23,13 @@
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
import org.apache.kafka.common.errors.FencedStateEpochException;
import org.apache.kafka.common.errors.GroupIdNotFoundException;
import org.apache.kafka.common.errors.InvalidRecordStateException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.LeaderNotAvailableException;
import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.message.ShareFetchResponseData.AcquiredRecords;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.RecordBatch;
@@ -103,7 +105,11 @@ enum SharePartitionState {
/**
* The share partition failed to initialize with persisted state.
*/
FAILED
FAILED,
/**
* The share partition is fenced and cannot be used.
*/
FENCED
}

/**
@@ -181,6 +187,11 @@ public static RecordState forId(byte id) {
*/
private final TopicIdPartition topicIdPartition;

/**
* The leader epoch is used to track the partition epoch.
*/
private final int leaderEpoch;

/**
* The in-flight record is used to track the state of a record that has been fetched from the
* leader. The state of the record is used to determine if the record should be re-fetched or if it
@@ -280,6 +291,7 @@ public static RecordState forId(byte id) {
SharePartition(
String groupId,
TopicIdPartition topicIdPartition,
int leaderEpoch,
int maxInFlightMessages,
int maxDeliveryCount,
int defaultRecordLockDurationMs,
@@ -288,9 +300,28 @@ public static RecordState forId(byte id) {
Persister persister,
ReplicaManager replicaManager,
GroupConfigManager groupConfigManager
) {
this(groupId, topicIdPartition, leaderEpoch, maxInFlightMessages, maxDeliveryCount, defaultRecordLockDurationMs,
timer, time, persister, replicaManager, groupConfigManager, SharePartitionState.EMPTY);
}

SharePartition(
String groupId,
TopicIdPartition topicIdPartition,
int leaderEpoch,
int maxInFlightMessages,
int maxDeliveryCount,
int defaultRecordLockDurationMs,
Timer timer,
Time time,
Persister persister,
ReplicaManager replicaManager,
GroupConfigManager groupConfigManager,
SharePartitionState sharePartitionState
) {
this.groupId = groupId;
this.topicIdPartition = topicIdPartition;
this.leaderEpoch = leaderEpoch;
this.maxInFlightMessages = maxInFlightMessages;
this.maxDeliveryCount = maxDeliveryCount;
this.cachedState = new ConcurrentSkipListMap<>();
@@ -301,7 +332,7 @@ public static RecordState forId(byte id) {
this.timer = timer;
this.time = time;
this.persister = persister;
this.partitionState = SharePartitionState.EMPTY;
this.partitionState = sharePartitionState;
this.replicaManager = replicaManager;
this.groupConfigManager = groupConfigManager;
}
@@ -341,7 +372,7 @@ public CompletableFuture<Void> maybeInitialize() {
.setGroupTopicPartitionData(new GroupTopicPartitionData.Builder<PartitionIdLeaderEpochData>()
.setGroupId(this.groupId)
.setTopicsData(Collections.singletonList(new TopicData<>(topicIdPartition.topicId(),
Collections.singletonList(PartitionFactory.newPartitionIdLeaderEpochData(topicIdPartition.partition(), 0)))))
Collections.singletonList(PartitionFactory.newPartitionIdLeaderEpochData(topicIdPartition.partition(), leaderEpoch)))))
.build())
.build()
).whenComplete((result, exception) -> {
@@ -520,13 +551,14 @@ public long nextFetchOffset() {
* @param fetchPartitionData The fetched records for the share partition.
* @return The acquired records for the share partition.
*/
@SuppressWarnings("cyclomaticcomplexity") // Consider refactoring to avoid suppression
public ShareAcquiredRecords acquire(
String memberId,
int maxFetchRecords,
FetchPartitionData fetchPartitionData
) {
log.trace("Received acquire request for share partition: {}-{} memberId: {}", groupId, topicIdPartition, memberId);
if (maxFetchRecords <= 0) {
if (stateNotActive() || maxFetchRecords <= 0) {
// Nothing to acquire.
return ShareAcquiredRecords.empty();
}
@@ -1040,7 +1072,7 @@ boolean canAcquireRecords() {
* @return A boolean which indicates whether the fetch lock is acquired.
*/
boolean maybeAcquireFetchLock() {
if (partitionState() != SharePartitionState.ACTIVE) {
if (stateNotActive()) {
return false;
}
return fetchLock.compareAndSet(false, true);
@@ -1053,6 +1085,22 @@ void releaseFetchLock() {
fetchLock.set(false);
}

/**
* Marks the share partition as fenced.
*/
void markFenced() {
lock.writeLock().lock();
try {
partitionState = SharePartitionState.FENCED;
} finally {
lock.writeLock().unlock();
}
}

private boolean stateNotActive() {
return partitionState() != SharePartitionState.ACTIVE;
}

private void completeInitializationWithException(CompletableFuture<Void> future, Throwable exception) {
lock.writeLock().lock();
try {
@@ -1075,6 +1123,9 @@ private void maybeCompleteInitialization(CompletableFuture<Void> future) {
case INITIALIZING:
future.completeExceptionally(new LeaderNotAvailableException(String.format("Share partition is already initializing %s-%s", groupId, topicIdPartition)));
return;
case FENCED:
future.completeExceptionally(new FencedStateEpochException(String.format("Share partition is fenced %s-%s", groupId, topicIdPartition)));
return;
case EMPTY:
// Do not complete the future as the share partition is not yet initialized.
break;
@@ -1743,7 +1794,7 @@ CompletableFuture<Void> writeShareGroupState(List<PersisterStateBatch> stateBatc
.setGroupId(this.groupId)
.setTopicsData(Collections.singletonList(new TopicData<>(topicIdPartition.topicId(),
Collections.singletonList(PartitionFactory.newPartitionStateBatchData(
topicIdPartition.partition(), stateEpoch, startOffset, 0, stateBatches))))
topicIdPartition.partition(), stateEpoch, startOffset, leaderEpoch, stateBatches))))
).build()).build())
.whenComplete((result, exception) -> {
if (exception != null) {
@@ -1792,8 +1843,9 @@ private KafkaException fetchPersisterError(short errorCode, String errorMessage)
case COORDINATOR_LOAD_IN_PROGRESS:
return new CoordinatorNotAvailableException(errorMessage);
case GROUP_ID_NOT_FOUND:
return new GroupIdNotFoundException(errorMessage);
case UNKNOWN_TOPIC_OR_PARTITION:
return new InvalidRequestException(errorMessage);
return new UnknownTopicOrPartitionException(errorMessage);
case FENCED_STATE_EPOCH:
return new FencedStateEpochException(errorMessage);
case FENCED_LEADER_EPOCH: