Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -341,15 +341,15 @@ protected FetchRequest.Builder createFetchRequest(final Node fetchTarget,
* until the previously-fetched data has been processed.
*
* @param buffered The set of partitions we have in our buffer
* @return {@link Set} of {@link TopicPartition topic partitions} for which we should fetch data
* @return {@link List} of {@link TopicPartition topic partitions} for which we should fetch data
*/
private Set<TopicPartition> fetchablePartitions(Set<TopicPartition> buffered) {
private List<TopicPartition> fetchablePartitions(Set<TopicPartition> buffered) {
// This is the test that returns true if the partition is *not* buffered
Predicate<TopicPartition> isNotBuffered = tp -> !buffered.contains(tp);

// Return all partitions that are in an otherwise fetchable state *and* for which we don't already have some
// messages sitting in our buffer.
return new HashSet<>(subscriptions.fetchablePartitions(isNotBuffered));
return subscriptions.fetchablePartitions(isNotBuffered);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kirktrue do you have free cycle to take a look at this change? thanks!

}

/**
Expand Down Expand Up @@ -430,7 +430,7 @@ protected Map<Node, FetchSessionHandler.FetchRequestData> prepareFetchRequests()
Set<TopicPartition> buffered = Collections.unmodifiableSet(fetchBuffer.bufferedPartitions());

// This is the set of partitions that do not have buffered data
Set<TopicPartition> unbuffered = fetchablePartitions(buffered);
List<TopicPartition> unbuffered = fetchablePartitions(buffered);

if (unbuffered.isEmpty()) {
// If there are no partitions that don't already have data locally buffered, there's no need to issue
Expand Down