KAFKA-17182: Consumer fetch sessions are evicted too quickly with AsyncKafkaConsumer (#17700)

This change reduces fetch session cache evictions on the broker for the AsyncKafkaConsumer by altering the logic that determines which partitions are included in fetch requests.

Background
Consumer implementations fetch data from the cluster and temporarily buffer it in memory until the user next calls Consumer.poll(). When a fetch request is being generated, partitions that already have buffered data are not included in the fetch request.

The ClassicKafkaConsumer performs much of its fetch logic and network I/O in the application thread. On poll(), if there is any locally buffered data, it does not fetch any new data and simply returns the buffered data to the user.

On the other hand, the AsyncKafkaConsumer splits its logic and network I/O between two threads, which introduces a potential race condition during fetch. The AsyncKafkaConsumer also checks for buffered data on its application thread; if it finds none, it signals the background thread to create a fetch request. However, the background thread may receive and buffer data from a previous fetch before the fetch request logic starts. When that happens, the background thread builds the new fetch request but skips the now-buffered partitions, with the unintended result that those partitions land in the fetch request's "to remove" set. This signals to the broker to remove those partitions from its internal fetch session cache.
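To make the fetch session mechanics concrete, here is a minimal, self-contained sketch under simplified assumptions (hypothetical class and method names, not Kafka's actual FetchSessionHandler): an incremental fetch session only names what changed relative to the previous request, so a partition omitted because it happens to be buffered reads as a removal.

```java
import java.util.HashSet;
import java.util.Set;

// Toy model of incremental fetch session bookkeeping (hypothetical, not Kafka's
// FetchSessionHandler): the next request only carries additions and removals
// relative to the previous request, so an omitted partition reads as "remove".
public class FetchSessionSketch {

    record Delta(Set<String> added, Set<String> removed) { }

    private final Set<String> sessionPartitions = new HashSet<>();

    Delta nextRequest(Set<String> wanted) {
        Set<String> added = new HashSet<>(wanted);
        added.removeAll(sessionPartitions);          // newly requested partitions

        Set<String> removed = new HashSet<>(sessionPartitions);
        removed.removeAll(wanted);                   // partitions silently dropped

        sessionPartitions.clear();
        sessionPartitions.addAll(wanted);
        return new Delta(added, removed);
    }

    public static void main(String[] args) {
        FetchSessionSketch session = new FetchSessionSketch();
        session.nextRequest(Set.of("topic-0", "topic-1"));   // initial request

        // Race: data for topic-1 arrives and is buffered just before the background
        // thread builds the next request, so topic-1 is omitted from "wanted"...
        Delta delta = session.nextRequest(Set.of("topic-0"));

        // ...and the broker is told to remove topic-1 from its fetch session cache,
        // even though the consumer is still subscribed to it.
        System.out.println("removed = " + delta.removed());   // [topic-1]
    }
}
```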

This issue is technically possible with the ClassicKafkaConsumer too, since its heartbeat thread also performs network I/O in addition to the application thread. However, given the frequency at which the AsyncKafkaConsumer's background thread runs, the issue is roughly 100x more likely to occur there.

Options
The core decision is: what should the background thread do if it is asked to create a fetch request but discovers there is already buffered data? There were multiple proposals to address this issue in the AsyncKafkaConsumer. Among them are:

1. The background thread should omit buffered partitions from the fetch request as before (the existing behavior)
2. The background thread should skip the fetch request generation entirely if there are any buffered partitions
3. The background thread should include buffered partitions in the fetch request, but use a small "max bytes" value
4. The background thread should skip fetching from the nodes that have buffered partitions
Option 4 won out. The change is localized to AbstractFetch, where the basic idea is to skip fetch requests to a given node if that node is the leader for buffered data. By preventing a fetch request from being sent to that node, the fetch session won't have any "holes" where the buffered partitions should be.
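A condensed, self-contained sketch of option 4 under simplified assumptions (hypothetical types and names; the real logic is in AbstractFetch.prepareFetchRequests(), shown in the diff below):

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Condensed sketch of option 4 (hypothetical names, not the actual AbstractFetch code).
public class SkipBufferedNodesSketch {

    // assignment: partition -> id of the node that hosts it (leader or preferred read replica)
    static Set<String> partitionsToFetch(Map<String, Integer> assignment, Set<String> buffered) {
        // Nodes that host at least one buffered partition are off-limits this round.
        Set<Integer> bufferedNodes = new HashSet<>();
        for (String tp : buffered)
            bufferedNodes.add(assignment.get(tp));

        // Fetch only unbuffered partitions, and only from nodes with no buffered data,
        // so no fetch session ever sees a request with "holes" in it.
        Set<String> result = new HashSet<>();
        for (Map.Entry<String, Integer> e : assignment.entrySet()) {
            if (!buffered.contains(e.getKey()) && !bufferedNodes.contains(e.getValue()))
                result.add(e.getKey());
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Integer> assignment = Map.of("t-0", 1, "t-1", 1, "t-2", 2);
        Set<String> buffered = Set.of("t-1");            // t-1 still has unread data

        // t-0 is skipped because it shares node 1 with the buffered t-1;
        // only t-2 (on node 2) is fetched this round.
        System.out.println(partitionsToFetch(assignment, buffered)); // [t-2]
    }
}
```

The effect is that unbuffered partitions sharing a node with buffered ones are simply deferred to a later fetch round, once that node's buffered data has been drained.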

Reviewers: Lianet Magrans <[email protected]>, Jeff Kim <[email protected]>, Jun Rao <[email protected]>
kirktrue authored Jan 30, 2025
1 parent 9980e12 commit 6cf54c4
Showing 3 changed files with 447 additions and 73 deletions.
@@ -44,6 +44,7 @@

import java.io.Closeable;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -315,17 +316,15 @@ protected FetchRequest.Builder createFetchRequest(final Node fetchTarget,
}

/**
* Return the list of <em>fetchable</em> partitions, which are the set of partitions to which we are subscribed,
* Return the set of <em>fetchable</em> partitions, which are the set of partitions to which we are subscribed,
* but <em>excluding</em> any partitions for which we still have buffered data. The idea is that since the user
* has yet to process the data for the partition that has already been fetched, we should not go send for more data
* until the previously-fetched data has been processed.
*
* @param buffered The set of partitions we have in our buffer
* @return {@link Set} of {@link TopicPartition topic partitions} for which we should fetch data
*/
private Set<TopicPartition> fetchablePartitions() {
// This is the set of partitions we have in our buffer
Set<TopicPartition> buffered = fetchBuffer.bufferedPartitions();

private Set<TopicPartition> fetchablePartitions(Set<TopicPartition> buffered) {
// This is the test that returns true if the partition is *not* buffered
Predicate<TopicPartition> isNotBuffered = tp -> !buffered.contains(tp);

@@ -408,22 +407,44 @@ protected Map<Node, FetchSessionHandler.FetchRequestData> prepareFetchRequests()
long currentTimeMs = time.milliseconds();
Map<String, Uuid> topicIds = metadata.topicIds();

for (TopicPartition partition : fetchablePartitions()) {
SubscriptionState.FetchPosition position = subscriptions.position(partition);
// This is the set of partitions that have buffered data
Set<TopicPartition> buffered = Collections.unmodifiableSet(fetchBuffer.bufferedPartitions());

if (position == null)
throw new IllegalStateException("Missing position for fetchable partition " + partition);
// This is the set of partitions that do not have buffered data
Set<TopicPartition> unbuffered = fetchablePartitions(buffered);

Optional<Node> leaderOpt = position.currentLeader.leader;
if (unbuffered.isEmpty()) {
// If there are no partitions that don't already have data locally buffered, there's no need to issue
// any fetch requests at the present time.
return Collections.emptyMap();
}

if (leaderOpt.isEmpty()) {
log.debug("Requesting metadata update for partition {} since the position {} is missing the current leader node", partition, position);
metadata.requestUpdate(false);
Set<Integer> bufferedNodes = new HashSet<>();

for (TopicPartition partition : buffered) {
// It's possible that at the time of the fetcher creating new fetch requests, a partition with buffered
// data from a *previous* request is no longer assigned. So before attempting to retrieve the node
// information, check that the partition is still assigned and fetchable; an unassigned/invalid partition
// will throw an IllegalStateException in positionForPartition.
//
// Note: this check is not needed for the unbuffered partitions as the logic in
// SubscriptionState.fetchablePartitions() only includes partitions currently assigned.
if (!subscriptions.hasValidPosition(partition))
continue;
}

// Use the preferred read replica if set, otherwise the partition's leader
Node node = selectReadReplica(partition, leaderOpt.get(), currentTimeMs);
SubscriptionState.FetchPosition position = positionForPartition(partition);
Optional<Node> nodeOpt = maybeNodeForPosition(partition, position, currentTimeMs);
nodeOpt.ifPresent(node -> bufferedNodes.add(node.id()));
}

for (TopicPartition partition : unbuffered) {
SubscriptionState.FetchPosition position = positionForPartition(partition);
Optional<Node> nodeOpt = maybeNodeForPosition(partition, position, currentTimeMs);

if (nodeOpt.isEmpty())
continue;

Node node = nodeOpt.get();

if (isUnavailable(node)) {
maybeThrowAuthFailure(node);
@@ -432,7 +453,14 @@ protected Map<Node, FetchSessionHandler.FetchRequestData> prepareFetchRequests()
// going to be failed anyway before being sent, so skip sending the request for now
log.trace("Skipping fetch for partition {} because node {} is awaiting reconnect backoff", partition, node);
} else if (nodesWithPendingFetchRequests.contains(node.id())) {
// If there's already an inflight request for this node, don't issue another request.
log.trace("Skipping fetch for partition {} because previous request to {} has not been processed", partition, node);
} else if (bufferedNodes.contains(node.id())) {
// While a node has buffered data, don't fetch other partition data from it. Because the buffered
// partitions are not included in the fetch request, those partitions will be inadvertently dropped
// from the broker fetch session cache. In some cases, that could lead to the entire fetch session
// being evicted.
log.trace("Skipping fetch for partition {} because its leader node {} hosts buffered partitions", partition, node);
} else {
// if there is a leader and no in-flight requests, issue a new fetch
FetchSessionHandler.Builder builder = fetchable.computeIfAbsent(node, k -> {
@@ -456,6 +484,44 @@ protected Map<Node, FetchSessionHandler.FetchRequestData> prepareFetchRequests()
return fetchable.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().build()));
}

/**
* Simple utility method that returns a {@link SubscriptionState.FetchPosition position} for the partition. If
* no position exists, an {@link IllegalStateException} is thrown.
*/
private SubscriptionState.FetchPosition positionForPartition(TopicPartition partition) {
SubscriptionState.FetchPosition position = subscriptions.position(partition);

if (position == null)
throw new IllegalStateException("Missing position for fetchable partition " + partition);

return position;
}

/**
* Retrieves the node from which to fetch the partition data. If the given
* {@link SubscriptionState.FetchPosition position} does not have a current
* {@link Metadata.LeaderAndEpoch#leader leader} defined, the method will return {@link Optional#empty()}.
*
* @return Three options: 1) {@link Optional#empty()} if the position's leader is empty, 2) the
* {@link #selectReadReplica(TopicPartition, Node, long) read replica, if defined}, or 3) the position's
* {@link Metadata.LeaderAndEpoch#leader leader}
*/
private Optional<Node> maybeNodeForPosition(TopicPartition partition,
SubscriptionState.FetchPosition position,
long currentTimeMs) {
Optional<Node> leaderOpt = position.currentLeader.leader;

if (leaderOpt.isEmpty()) {
log.debug("Requesting metadata update for partition {} since the position {} is missing the current leader node", partition, position);
metadata.requestUpdate(false);
return Optional.empty();
}

// Use the preferred read replica if set, otherwise the partition's leader
Node node = selectReadReplica(partition, leaderOpt.get(), currentTimeMs);
return Optional.of(node);
}

// Visible for testing
protected FetchSessionHandler sessionHandler(int node) {
return sessionHandlers.get(node);
