
Commit 0eba83b

kirktrue authored and manoj-mathivanan committed
KAFKA-17182: Consumer fetch sessions are evicted too quickly with AsyncKafkaConsumer (apache#17700)
This change reduces fetch session cache evictions on the broker for AsyncKafkaConsumer by altering the logic that determines which partitions it includes in fetch requests.

Background

Consumer implementations fetch data from the cluster and temporarily buffer it in memory until the user next calls Consumer.poll(). When a fetch request is being generated, partitions that already have buffered data are not included in the fetch request.

The ClassicKafkaConsumer performs much of its fetch logic and network I/O in the application thread. On poll(), if there is any locally buffered data, the ClassicKafkaConsumer does not fetch any new data; it simply returns the buffered data to the user from poll().

The AsyncKafkaConsumer, on the other hand, splits its logic and network I/O between two threads, which results in a potential race condition during fetch. The AsyncKafkaConsumer also checks for buffered data on its application thread. If it finds there is none, it signals the background thread to create a fetch request. However, it's possible for the background thread to receive data from a previous fetch and buffer it before the fetch request logic starts. When that occurs, the background thread creates a new fetch request that skips the buffered partitions, which has the unintended result that those partitions are added to the fetch request's "to remove" set. This signals to the broker to remove those partitions from its internal cache.

This issue is technically possible in the ClassicKafkaConsumer too, since the heartbeat thread performs network I/O in addition to the application thread. However, because of the frequency at which the AsyncKafkaConsumer's background thread runs, it is ~100x more likely to happen there.

Options

The core decision is: what should the background thread do if it is asked to create a fetch request and it discovers there is buffered data? There were multiple proposals to address this issue in the AsyncKafkaConsumer. Among them:

1. The background thread omits buffered partitions from the fetch request, as before (the existing behavior).
2. The background thread skips fetch request generation entirely if there are any buffered partitions.
3. The background thread includes buffered partitions in the fetch request, but with a small "max bytes" value.
4. The background thread skips fetching from the nodes that have buffered partitions.

Option 4 won out. The change is localized to AbstractFetch, where the basic idea is to skip fetch requests to a given node if that node is the leader for buffered data. By preventing a fetch request from being sent to that node, the request won't have any "holes" where the buffered partitions should be. (A simplified sketch of this idea follows; the actual change is in the diff below.)

Reviewers: Lianet Magrans <[email protected]>, Jeff Kim <[email protected]>, Jun Rao <[email protected]>
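To make the chosen approach concrete, here is a minimal, self-contained sketch of option 4's selection logic. It is not the real AbstractFetch code (that appears in the diff below): TopicPartition here is a simplified record, and leaderOf is a hypothetical partition-to-node-ID lookup standing in for the client's metadata and read-replica selection.

import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-ins for illustration only; not the real Kafka client types.
public class BufferedNodeSkipSketch {

    record TopicPartition(String topic, int partition) { }

    /**
     * Groups the fetchable (unbuffered) partitions by node ID, skipping any node
     * that leads at least one buffered partition. Sending a fetch request to such
     * a node without the buffered partitions would tell the broker to drop them
     * from its fetch session cache.
     */
    static Map<Integer, List<TopicPartition>> partitionsToFetchByNode(Set<TopicPartition> buffered,
                                                                      Set<TopicPartition> unbuffered,
                                                                      Map<TopicPartition, Integer> leaderOf) {
        // Collect the IDs of all nodes that currently host buffered data.
        Set<Integer> bufferedNodes = new HashSet<>();

        for (TopicPartition tp : buffered) {
            Integer node = leaderOf.get(tp);   // null if the leader is unknown

            if (node != null)
                bufferedNodes.add(node);
        }

        Map<Integer, List<TopicPartition>> fetchable = new LinkedHashMap<>();

        for (TopicPartition tp : unbuffered) {
            Integer node = leaderOf.get(tp);

            if (node == null || bufferedNodes.contains(node))
                continue;   // unknown leader, or the node still holds buffered partitions

            fetchable.computeIfAbsent(node, k -> new ArrayList<>()).add(tp);
        }

        return fetchable;
    }
}

For example, if a buffered partition's leader is node 1, every unbuffered partition also led by node 1 is skipped this round, so no fetch request with "holes" is sent to node 1; partitions led by other nodes are fetched as usual.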
1 parent 4f2da98 commit 0eba83b

File tree

3 files changed (+447, -73 lines)


clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java

Lines changed: 82 additions & 16 deletions

@@ -44,6 +44,7 @@
 
 import java.io.Closeable;
 import java.time.Duration;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -315,17 +316,15 @@ protected FetchRequest.Builder createFetchRequest(final Node fetchTarget,
     }
 
     /**
-     * Return the list of <em>fetchable</em> partitions, which are the set of partitions to which we are subscribed,
+     * Return the set of <em>fetchable</em> partitions, which are the set of partitions to which we are subscribed,
      * but <em>excluding</em> any partitions for which we still have buffered data. The idea is that since the user
      * has yet to process the data for the partition that has already been fetched, we should not go send for more data
      * until the previously-fetched data has been processed.
      *
+     * @param buffered The set of partitions we have in our buffer
      * @return {@link Set} of {@link TopicPartition topic partitions} for which we should fetch data
      */
-    private Set<TopicPartition> fetchablePartitions() {
-        // This is the set of partitions we have in our buffer
-        Set<TopicPartition> buffered = fetchBuffer.bufferedPartitions();
-
+    private Set<TopicPartition> fetchablePartitions(Set<TopicPartition> buffered) {
         // This is the test that returns true if the partition is *not* buffered
         Predicate<TopicPartition> isNotBuffered = tp -> !buffered.contains(tp);
 
@@ -408,22 +407,44 @@ protected Map<Node, FetchSessionHandler.FetchRequestData> prepareFetchRequests()
         long currentTimeMs = time.milliseconds();
         Map<String, Uuid> topicIds = metadata.topicIds();
 
-        for (TopicPartition partition : fetchablePartitions()) {
-            SubscriptionState.FetchPosition position = subscriptions.position(partition);
+        // This is the set of partitions that have buffered data
+        Set<TopicPartition> buffered = Collections.unmodifiableSet(fetchBuffer.bufferedPartitions());
 
-            if (position == null)
-                throw new IllegalStateException("Missing position for fetchable partition " + partition);
+        // This is the set of partitions that do not have buffered data
+        Set<TopicPartition> unbuffered = fetchablePartitions(buffered);
 
-            Optional<Node> leaderOpt = position.currentLeader.leader;
+        if (unbuffered.isEmpty()) {
+            // If there are no partitions that don't already have data locally buffered, there's no need to issue
+            // any fetch requests at the present time.
+            return Collections.emptyMap();
+        }
 
-            if (leaderOpt.isEmpty()) {
-                log.debug("Requesting metadata update for partition {} since the position {} is missing the current leader node", partition, position);
-                metadata.requestUpdate(false);
+        Set<Integer> bufferedNodes = new HashSet<>();
+
+        for (TopicPartition partition : buffered) {
+            // It's possible that at the time of the fetcher creating new fetch requests, a partition with buffered
+            // data from a *previous* request is no longer assigned. So before attempting to retrieve the node
+            // information, check that the partition is still assigned and fetchable; an unassigned/invalid partition
+            // will throw an IllegalStateException in positionForPartition.
+            //
+            // Note: this check is not needed for the unbuffered partitions as the logic in
+            // SubscriptionState.fetchablePartitions() only includes partitions currently assigned.
+            if (!subscriptions.hasValidPosition(partition))
                 continue;
-            }
 
-            // Use the preferred read replica if set, otherwise the partition's leader
-            Node node = selectReadReplica(partition, leaderOpt.get(), currentTimeMs);
+            SubscriptionState.FetchPosition position = positionForPartition(partition);
+            Optional<Node> nodeOpt = maybeNodeForPosition(partition, position, currentTimeMs);
+            nodeOpt.ifPresent(node -> bufferedNodes.add(node.id()));
+        }
+
+        for (TopicPartition partition : unbuffered) {
+            SubscriptionState.FetchPosition position = positionForPartition(partition);
+            Optional<Node> nodeOpt = maybeNodeForPosition(partition, position, currentTimeMs);
+
+            if (nodeOpt.isEmpty())
+                continue;
+
+            Node node = nodeOpt.get();
 
             if (isUnavailable(node)) {
                 maybeThrowAuthFailure(node);
@@ -432,7 +453,14 @@ protected Map<Node, FetchSessionHandler.FetchRequestData> prepareFetchRequests()
                 // going to be failed anyway before being sent, so skip sending the request for now
                 log.trace("Skipping fetch for partition {} because node {} is awaiting reconnect backoff", partition, node);
             } else if (nodesWithPendingFetchRequests.contains(node.id())) {
+                // If there's already an inflight request for this node, don't issue another request.
                 log.trace("Skipping fetch for partition {} because previous request to {} has not been processed", partition, node);
+            } else if (bufferedNodes.contains(node.id())) {
+                // While a node has buffered data, don't fetch other partition data from it. Because the buffered
+                // partitions are not included in the fetch request, those partitions will be inadvertently dropped
+                // from the broker fetch session cache. In some cases, that could lead to the entire fetch session
+                // being evicted.
+                log.trace("Skipping fetch for partition {} because its leader node {} hosts buffered partitions", partition, node);
             } else {
                 // if there is a leader and no in-flight requests, issue a new fetch
                 FetchSessionHandler.Builder builder = fetchable.computeIfAbsent(node, k -> {
@@ -456,6 +484,44 @@ protected Map<Node, FetchSessionHandler.FetchRequestData> prepareFetchRequests()
         return fetchable.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().build()));
     }
 
+    /**
+     * Simple utility method that returns a {@link SubscriptionState.FetchPosition position} for the partition. If
+     * no position exists, an {@link IllegalStateException} is thrown.
+     */
+    private SubscriptionState.FetchPosition positionForPartition(TopicPartition partition) {
+        SubscriptionState.FetchPosition position = subscriptions.position(partition);
+
+        if (position == null)
+            throw new IllegalStateException("Missing position for fetchable partition " + partition);
+
+        return position;
+    }
+
+    /**
+     * Retrieves the node from which to fetch the partition data. If the given
+     * {@link SubscriptionState.FetchPosition position} does not have a current
+     * {@link Metadata.LeaderAndEpoch#leader leader} defined the method will return {@link Optional#empty()}.
+     *
+     * @return Three options: 1) {@link Optional#empty()} if the position's leader is empty, 2) the
+     *         {@link #selectReadReplica(TopicPartition, Node, long) read replica, if defined}, or 3) the position's
+     *         {@link Metadata.LeaderAndEpoch#leader leader}
+     */
+    private Optional<Node> maybeNodeForPosition(TopicPartition partition,
+                                                SubscriptionState.FetchPosition position,
+                                                long currentTimeMs) {
+        Optional<Node> leaderOpt = position.currentLeader.leader;
+
+        if (leaderOpt.isEmpty()) {
+            log.debug("Requesting metadata update for partition {} since the position {} is missing the current leader node", partition, position);
+            metadata.requestUpdate(false);
+            return Optional.empty();
+        }
+
+        // Use the preferred read replica if set, otherwise the partition's leader
+        Node node = selectReadReplica(partition, leaderOpt.get(), currentTimeMs);
+        return Optional.of(node);
+    }
+
     // Visible for testing
     protected FetchSessionHandler sessionHandler(int node) {
         return sessionHandlers.get(node);