KAFKA-19820: remove the unnecessary copy from AbstractFetch#fetchablePartitions #20745
base: trunk
Thanks for this patch. I left one comment.
 * Return the set of <em>fetchable</em> partitions, which are the set of partitions to which we are subscribed,
 * but <em>excluding</em> any partitions for which we still have buffered data. The idea is that since the user
 * has yet to process the data for the partition that has already been fetched, we should not go send for more data
 * until the previously-fetched data has been processed.
 *
 * @param buffered The set of partitions we have in our buffer
 * @return {@link Set} of {@link TopicPartition topic partitions} for which we should fetch data
Please also update the Javadoc accordingly
Fixed it.
  // Return all partitions that are in an otherwise fetchable state *and* for which we don't already have some
  // messages sitting in our buffer.
- return new HashSet<>(subscriptions.fetchablePartitions(isNotBuffered));
+ return subscriptions.fetchablePartitions(isNotBuffered);
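For readers following the diff, here is a minimal sketch of why the extra `HashSet` adds nothing. It is not the Kafka code: plain strings stand in for `TopicPartition`, and `RedundantCopySketch.fetchable` is a hypothetical helper that mimics filtering the keys of an assignment map. Because map keys are already distinct, the filtered list cannot contain duplicates, so wrapping it in a new set only costs an allocation and a rehash.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;

class RedundantCopySketch {
    // Hypothetical stand-in for the filtering step: walks the assignment map once
    // and keeps partitions that are fetchable and pass the caller's predicate.
    static List<String> fetchable(Map<String, Boolean> assignment, Predicate<String> isAvailable) {
        List<String> result = new ArrayList<>();
        assignment.forEach((partition, isFetchable) -> {
            // Cheap boolean check first, predicate second
            if (isFetchable && isAvailable.test(partition)) {
                result.add(partition);
            }
        });
        return result;
    }

    public static void main(String[] args) {
        Map<String, Boolean> assignment = new LinkedHashMap<>();
        assignment.put("topic-0", true);
        assignment.put("topic-1", true);
        assignment.put("topic-2", false);

        // Partitions that still have buffered data are excluded
        Set<String> buffered = Set.of("topic-1");
        Predicate<String> isNotBuffered = p -> !buffered.contains(p);

        List<String> direct = fetchable(assignment, isNotBuffered);
        Set<String> copied = new HashSet<>(direct); // the copy this PR removes

        // Same elements either way; the copy cannot drop anything
        System.out.println(direct.size() == copied.size());
    }
}
```

The trade-off is that callers now receive whatever collection type the underlying method returns, which is why the Javadoc needs to match.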
@kirktrue do you have free cycles to take a look at this change? Thanks!
Thanks for the PR.
Is there any reason not to make this change at SubscriptionState.fetchablePartitions()? That is...
@@ -482,9 +482,9 @@ public class SubscriptionState {
}
// Visible for testing
- public synchronized List<TopicPartition> fetchablePartitions(Predicate<TopicPartition> isAvailable) {
+ public synchronized Set<TopicPartition> fetchablePartitions(Predicate<TopicPartition> isAvailable) {
// Since this is in the hot-path for fetching, we do this instead of using java.util.stream API
- List<TopicPartition> result = new ArrayList<>();
+ Set<TopicPartition> result = new HashSet<>();
assignment.forEach((topicPartition, topicPartitionState) -> {
// Cheap check is first to avoid evaluating the predicate if possible
if ((subscriptionType.equals(SubscriptionType.AUTO_TOPICS_SHARE) || isFetchableAndSubscribed(topicPartition, topicPartitionState))

As @chia7712 mentioned here, it's already a distinct set, so we could reduce ambiguity by addressing the problem at "the source."
That would still require the change to remove the duplicate set in AbstractFetch as well as a minor change to ShareConsumeRequestManager and SubscriptionStateTest.
Thoughts?
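To make the suggestion concrete, a rough sketch of the "fix it at the source" shape, under the same simplifying assumptions (strings instead of `TopicPartition`, a hypothetical `SetAtSourceSketch` class rather than the real `SubscriptionState`): the filtering method builds and returns a `Set` itself, so no caller needs to wrap the result.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;

class SetAtSourceSketch {
    // Hypothetical variant that returns a Set directly, so the distinctness
    // of the result is visible in the signature.
    static Set<String> fetchablePartitions(Map<String, Boolean> assignment, Predicate<String> isAvailable) {
        Set<String> result = new HashSet<>();
        assignment.forEach((partition, isFetchable) -> {
            // Cheap check is first to avoid evaluating the predicate if possible
            if (isFetchable && isAvailable.test(partition)) {
                result.add(partition);
            }
        });
        return result;
    }

    public static void main(String[] args) {
        Map<String, Boolean> assignment = new HashMap<>();
        assignment.put("topic-0", true);
        assignment.put("topic-1", false);

        // The caller uses the result as-is; there is no second collection to build
        Set<String> fetchable = fetchablePartitions(assignment, p -> true);
        System.out.println(fetchable.contains("topic-0"));
    }
}
```

Whether `Set` or `List` is the better return type depends on what callers do with the result: a `Set` documents uniqueness, while a `List` is cheaper to build on a hot path.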
Remove the redundant copy and change the return type to List. This change also aligns ShareConsumer