Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-17182: Consumer fetch sessions are evicted too quickly with AsyncKafkaConsumer #17700

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
87 commits
Select commit Hold shift + click to select a range
e984638
KAFKA-17439: Make polling for new records an explicit action/event in…
kirktrue Aug 28, 2024
b6af23b
Merge remote-tracking branch 'origin/trunk' into KAFKA-17439-poll-exp…
kirktrue Sep 11, 2024
335c249
Minor tweaks to FetchEvent documentation.
kirktrue Sep 11, 2024
2abd6a4
Update FetchRequestManager.java
kirktrue Sep 11, 2024
45bc8c0
Added unit tests to exercise poll, request-then-poll, and duplicate r…
kirktrue Sep 13, 2024
7265404
Updated FetchEvent to CreateFetchRequestsEvent and catching errors fr…
kirktrue Sep 13, 2024
5449549
Fixed spacing issue that checkstyle wasn't happy with
kirktrue Sep 13, 2024
f7a5940
Merge branch 'trunk' into KAFKA-17439-poll-explicitly
kirktrue Sep 26, 2024
38fbb00
PoC for adding buffered partitions to the fetch request
kirktrue Sep 26, 2024
f6f8c21
Merge branch 'trunk' into KAFKA-17182-reduce-fetch-session-eviction
kirktrue Oct 17, 2024
9864864
Merge branch 'trunk' into KAFKA-17182-reduce-fetch-session-eviction
kirktrue Nov 5, 2024
d5ba79e
Removed superfluous imports
kirktrue Nov 5, 2024
cafcaf2
Updates to ensure that the core logic for the classic consumer remain…
kirktrue Nov 6, 2024
eda5922
Merge branch 'trunk' into KAFKA-17182-reduce-fetch-session-eviction
kirktrue Nov 6, 2024
668e232
Reverting some changes to avoid unnecessary diffs
kirktrue Nov 7, 2024
ceeb8e2
More tweaking to avoid diffs
kirktrue Nov 7, 2024
8c86cb6
More diff reduction
kirktrue Nov 7, 2024
af831a5
More diff
kirktrue Nov 7, 2024
705fad1
More clean up
kirktrue Nov 7, 2024
df30eb3
More tweaks to avoid tweaking existing code
kirktrue Nov 7, 2024
936e27a
More tweaks
kirktrue Nov 7, 2024
f2cdc49
Update AbstractFetch.java
kirktrue Nov 7, 2024
5537945
Updates for clarity
kirktrue Nov 7, 2024
671b1f3
Update FetchRequestManager.java
kirktrue Nov 7, 2024
f4e6fce
Merge branch 'trunk' into KAFKA-17182-reduce-fetch-session-eviction
kirktrue Nov 8, 2024
40b3aef
Fix code style bug
kirktrue Nov 8, 2024
7ab5cbf
Merge branch 'trunk' into KAFKA-17182-reduce-fetch-session-eviction
kirktrue Nov 13, 2024
6e26f5d
Merge branch 'trunk' into KAFKA-17182-reduce-fetch-session-eviction
kirktrue Nov 23, 2024
d48edf2
Update FetchRequestManagerTest.java
kirktrue Nov 23, 2024
9fa2bbf
Updates to reduce some duplication
kirktrue Nov 23, 2024
30e01e7
Merge branch 'trunk' into KAFKA-17182-reduce-fetch-session-eviction
kirktrue Nov 27, 2024
f223dfd
Discard changes to clients/src/main/java/org/apache/kafka/clients/con…
kirktrue Nov 27, 2024
c4c8f45
Temporary updates for testing
kirktrue Nov 27, 2024
dcda4c1
Fixing linting complaints
kirktrue Nov 27, 2024
b25b85f
More linting changes
kirktrue Nov 27, 2024
21ce1f2
Merge branch 'trunk' into KAFKA-17182-reduce-fetch-session-eviction
kirktrue Nov 27, 2024
6a9551e
Merge branch 'trunk' into KAFKA-17182-reduce-fetch-session-eviction
kirktrue Dec 4, 2024
c95aaec
Changes to properly support temp.fetch.mode
kirktrue Dec 4, 2024
2ffde38
Removed unused import
kirktrue Dec 4, 2024
8283502
Merge branch 'trunk' into KAFKA-17182-reduce-fetch-session-eviction
kirktrue Dec 12, 2024
3ca33d0
Updates to facilitate option 4 for ClassicKafkaConsumer
kirktrue Dec 13, 2024
9f64a09
Merge branch 'trunk' into KAFKA-17182-reduce-fetch-session-eviction
kirktrue Dec 16, 2024
0a828d6
Fixed lint failures
kirktrue Jan 6, 2025
45b0af2
Merge branch 'trunk' into KAFKA-17182-reduce-fetch-session-eviction
kirktrue Jan 6, 2025
9a542f5
Merge branch 'trunk' into KAFKA-17182-reduce-fetch-session-eviction
kirktrue Jan 9, 2025
e8d14d8
Removed test scaffolding
kirktrue Jan 9, 2025
6488cec
More clean up
kirktrue Jan 9, 2025
8b01365
Added comments
kirktrue Jan 9, 2025
3f07d50
Updating fetch tests to account for new fetch request logic
kirktrue Jan 10, 2025
acac22f
Merge branch 'trunk' into KAFKA-17182-reduce-fetch-session-eviction
kirktrue Jan 10, 2025
b931b14
Merge branch 'trunk' into KAFKA-17182-reduce-fetch-session-eviction
kirktrue Jan 11, 2025
b9ff2d1
First pass at test cleanup from PR review
kirktrue Jan 11, 2025
11840ab
Second pass at unit test clean up
kirktrue Jan 12, 2025
2b0cc3f
Little more clean up
kirktrue Jan 12, 2025
ec70c13
More updates to test
kirktrue Jan 12, 2025
b6e955d
Merge branch 'trunk' into KAFKA-17182-reduce-fetch-session-eviction
kirktrue Jan 13, 2025
15b8afa
Merge branch 'trunk' into KAFKA-17182-reduce-fetch-session-eviction
kirktrue Jan 14, 2025
cba2a6d
Merge branch 'trunk' into KAFKA-17182-reduce-fetch-session-eviction
kirktrue Jan 14, 2025
2d06672
Merge branch 'trunk' into KAFKA-17182-reduce-fetch-session-eviction
kirktrue Jan 15, 2025
ef0c0fa
Merge branch 'trunk' into KAFKA-17182-reduce-fetch-session-eviction
kirktrue Jan 15, 2025
24f96bf
Merge branch 'KAFKA-17182-reduce-fetch-session-eviction' of github.co…
kirktrue Jan 15, 2025
0318ec4
Removed prepareOffsetsForLeaderEpochResponse method
kirktrue Jan 15, 2025
ce53bd1
Updates to unify the logic in the loops inside prepareFetchRequests
kirktrue Jan 15, 2025
a9ffb18
Added testFetchRequestWithBufferedPartitionNotAssigned unit test
kirktrue Jan 16, 2025
7b63c63
More updates to clean up prepareFetchRequests()
kirktrue Jan 16, 2025
cedd3d1
Update AbstractFetch.java
kirktrue Jan 16, 2025
7137198
Update AbstractFetch.java
kirktrue Jan 16, 2025
4f278ec
Merge branch 'trunk' into KAFKA-17182-reduce-fetch-session-eviction
kirktrue Jan 16, 2025
4f999b6
Update FetchRequestManagerTest.java
kirktrue Jan 16, 2025
cf46610
Update FetchRequestManagerTest.java
kirktrue Jan 16, 2025
d4a273e
Refactoring and updates to comments
kirktrue Jan 16, 2025
7bd2d94
Merge branch 'trunk' into KAFKA-17182-reduce-fetch-session-eviction
kirktrue Jan 16, 2025
6f46c2b
Merge branch 'trunk' into KAFKA-17182-reduce-fetch-session-eviction
kirktrue Jan 17, 2025
9732588
Merge branch 'trunk' into KAFKA-17182-reduce-fetch-session-eviction
kirktrue Jan 21, 2025
cdc2d16
Merge branch 'trunk' into KAFKA-17182-reduce-fetch-session-eviction
kirktrue Jan 21, 2025
4718a70
Merge branch 'trunk' into KAFKA-17182-reduce-fetch-session-eviction
kirktrue Jan 22, 2025
07fa689
Merge branch 'trunk' into KAFKA-17182-reduce-fetch-session-eviction
kirktrue Jan 27, 2025
f81e16a
Update comments in FetchRequestManagerTest for clarity
kirktrue Jan 27, 2025
3b4a7bf
Using hasValidPosition over isAssigned when checking partitions
kirktrue Jan 27, 2025
8492a67
Merge branch 'trunk' into KAFKA-17182-reduce-fetch-session-eviction
kirktrue Jan 28, 2025
930c6fa
Refactoring of logic and comments of tests
kirktrue Jan 28, 2025
1286920
More refactoring structure of test for clarity
kirktrue Jan 28, 2025
290076c
Typo
kirktrue Jan 29, 2025
a387361
Merge branch 'trunk' into KAFKA-17182-reduce-fetch-session-eviction
kirktrue Jan 29, 2025
ba45aff
Merge branch 'trunk' into KAFKA-17182-reduce-fetch-session-eviction
kirktrue Jan 29, 2025
c41ce46
Merge branch 'trunk' into KAFKA-17182-reduce-fetch-session-eviction
kirktrue Jan 30, 2025
d488d02
Merge branch 'trunk' into KAFKA-17182-reduce-fetch-session-eviction
kirktrue Jan 30, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@

import java.io.Closeable;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -315,17 +316,15 @@ protected FetchRequest.Builder createFetchRequest(final Node fetchTarget,
}

/**
* Return the list of <em>fetchable</em> partitions, which are the set of partitions to which we are subscribed,
* Return the set of <em>fetchable</em> partitions, which are the set of partitions to which we are subscribed,
* but <em>excluding</em> any partitions for which we still have buffered data. The idea is that since the user
* has yet to process the data for the partition that has already been fetched, we should not go send for more data
* until the previously-fetched data has been processed.
*
* @param buffered The set of partitions we have in our buffer
* @return {@link Set} of {@link TopicPartition topic partitions} for which we should fetch data
*/
private Set<TopicPartition> fetchablePartitions() {
// This is the set of partitions we have in our buffer
Set<TopicPartition> buffered = fetchBuffer.bufferedPartitions();

private Set<TopicPartition> fetchablePartitions(Set<TopicPartition> buffered) {
// This is the test that returns true if the partition is *not* buffered
Predicate<TopicPartition> isNotBuffered = tp -> !buffered.contains(tp);

Expand Down Expand Up @@ -408,22 +407,44 @@ protected Map<Node, FetchSessionHandler.FetchRequestData> prepareFetchRequests()
long currentTimeMs = time.milliseconds();
Map<String, Uuid> topicIds = metadata.topicIds();

for (TopicPartition partition : fetchablePartitions()) {
SubscriptionState.FetchPosition position = subscriptions.position(partition);
// This is the set of partitions that have buffered data
Set<TopicPartition> buffered = Collections.unmodifiableSet(fetchBuffer.bufferedPartitions());

if (position == null)
throw new IllegalStateException("Missing position for fetchable partition " + partition);
// This is the set of partitions that do not have buffered data
Set<TopicPartition> unbuffered = fetchablePartitions(buffered);

Optional<Node> leaderOpt = position.currentLeader.leader;
if (unbuffered.isEmpty()) {
// If there are no partitions that don't already have data locally buffered, there's no need to issue
// any fetch requests at the present time.
return Collections.emptyMap();
kirktrue marked this conversation as resolved.
Show resolved Hide resolved
}

if (leaderOpt.isEmpty()) {
log.debug("Requesting metadata update for partition {} since the position {} is missing the current leader node", partition, position);
metadata.requestUpdate(false);
Set<Integer> bufferedNodes = new HashSet<>();

for (TopicPartition partition : buffered) {
// It's possible that at the time of the fetcher creating new fetch requests, a partition with buffered
// data from a *previous* request is no longer assigned. So before attempting to retrieve the node
// information, check that the partition is still assigned and fetchable; an unassigned/invalid partition
// will throw an IllegalStateException in positionForPartition.
//
// Note: this check is not needed for the unbuffered partitions as the logic in
// SubscriptionState.fetchablePartitions() only includes partitions currently assigned.
if (!subscriptions.hasValidPosition(partition))
continue;
}

// Use the preferred read replica if set, otherwise the partition's leader
Node node = selectReadReplica(partition, leaderOpt.get(), currentTimeMs);
SubscriptionState.FetchPosition position = positionForPartition(partition);
Optional<Node> nodeOpt = maybeNodeForPosition(partition, position, currentTimeMs);
nodeOpt.ifPresent(node -> bufferedNodes.add(node.id()));
}

for (TopicPartition partition : unbuffered) {
SubscriptionState.FetchPosition position = positionForPartition(partition);
Optional<Node> nodeOpt = maybeNodeForPosition(partition, position, currentTimeMs);

if (nodeOpt.isEmpty())
continue;

Node node = nodeOpt.get();

if (isUnavailable(node)) {
maybeThrowAuthFailure(node);
Expand All @@ -432,7 +453,14 @@ protected Map<Node, FetchSessionHandler.FetchRequestData> prepareFetchRequests()
// going to be failed anyway before being sent, so skip sending the request for now
log.trace("Skipping fetch for partition {} because node {} is awaiting reconnect backoff", partition, node);
} else if (nodesWithPendingFetchRequests.contains(node.id())) {
// If there's already an inflight request for this node, don't issue another request.
log.trace("Skipping fetch for partition {} because previous request to {} has not been processed", partition, node);
} else if (bufferedNodes.contains(node.id())) {
// While a node has buffered data, don't fetch other partition data from it. Because the buffered
// partitions are not included in the fetch request, those partitions will be inadvertently dropped
// from the broker fetch session cache. In some cases, that could lead to the entire fetch session
// being evicted.
log.trace("Skipping fetch for partition {} because its leader node {} hosts buffered partitions", partition, node);
} else {
// if there is a leader and no in-flight requests, issue a new fetch
FetchSessionHandler.Builder builder = fetchable.computeIfAbsent(node, k -> {
Expand All @@ -456,6 +484,44 @@ protected Map<Node, FetchSessionHandler.FetchRequestData> prepareFetchRequests()
return fetchable.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().build()));
}

/**
* Simple utility method that returns a {@link SubscriptionState.FetchPosition position} for the partition. If
* no position exists, an {@link IllegalStateException} is thrown.
*/
private SubscriptionState.FetchPosition positionForPartition(TopicPartition partition) {
SubscriptionState.FetchPosition position = subscriptions.position(partition);

if (position == null)
throw new IllegalStateException("Missing position for fetchable partition " + partition);

return position;
}

/**
* Retrieves the node from which to fetch the partition data. If the given
* {@link SubscriptionState.FetchPosition position} does not have a current
* {@link Metadata.LeaderAndEpoch#leader leader} defined the method will return {@link Optional#empty()}.
*
* @return Three options: 1) {@link Optional#empty()} if the position's leader is empty, 2) the
* {@link #selectReadReplica(TopicPartition, Node, long) read replica, if defined}, or 3) the position's
* {@link Metadata.LeaderAndEpoch#leader leader}
*/
private Optional<Node> maybeNodeForPosition(TopicPartition partition,
SubscriptionState.FetchPosition position,
long currentTimeMs) {
Optional<Node> leaderOpt = position.currentLeader.leader;

if (leaderOpt.isEmpty()) {
log.debug("Requesting metadata update for partition {} since the position {} is missing the current leader node", partition, position);
metadata.requestUpdate(false);
kirktrue marked this conversation as resolved.
Show resolved Hide resolved
return Optional.empty();
}

// Use the preferred read replica if set, otherwise the partition's leader
Node node = selectReadReplica(partition, leaderOpt.get(), currentTimeMs);
return Optional.of(node);
}

// Visible for testing
protected FetchSessionHandler sessionHandler(int node) {
return sessionHandlers.get(node);
Expand Down
Loading