KAFKA-19441: encapsulate MetadataImage in GroupCoordinator/ShareCoordinator #20061

Open
wants to merge 14 commits into
base: trunk

zzbennett

The MetadataImage has a lot of stuff in it and it gets passed around in many places in the new GroupCoordinator. This makes it difficult to understand what metadata the group coordinator actually relies on and makes it too easy to use metadata in ways it wasn't meant to be used. 

This change encapsulates the MetadataImage in an interface (CoordinatorMetadataImage) that indicates and controls what metadata the group coordinator actually uses. Now it is much easier to see at a glance what dependencies the GroupCoordinator has on the metadata. It also adds a level of indirection that allows more flexibility in how the GroupCoordinator is provided the metadata it needs.
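As a rough illustration of the encapsulation idea described above, here is a minimal sketch. All names in it (`FullImage`, `NarrowImage`, `WrappedImage`) are hypothetical stand-ins and are not the classes in this PR or in Kafka; the point is only that callers holding the narrow interface cannot reach unrelated state.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for a large metadata object (NOT Kafka's MetadataImage).
final class FullImage {
    final Map<String, Integer> partitionsByTopic;
    final Map<String, String> unrelatedState; // data the coordinator should not depend on

    FullImage(Map<String, Integer> partitionsByTopic, Map<String, String> unrelatedState) {
        this.partitionsByTopic = partitionsByTopic;
        this.unrelatedState = unrelatedState;
    }
}

// Hypothetical narrow view: the only metadata the consumer is allowed to use.
interface NarrowImage {
    Optional<Integer> partitionCount(String topicName);
}

// Adapter that hides the full object behind the narrow view.
final class WrappedImage implements NarrowImage {
    private final FullImage image;

    WrappedImage(FullImage image) {
        this.image = image;
    }

    @Override
    public Optional<Integer> partitionCount(String topicName) {
        return Optional.ofNullable(image.partitionsByTopic.get(topicName));
    }
}
```

Code that accepts a `NarrowImage` documents its metadata dependencies in its signature, and a different backing implementation can be swapped in without touching the callers.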

@github-actions github-actions bot added the triage (PRs from the community), core (Kafka Broker), tools, performance, KIP-932 (Queues for Kafka), and group-coordinator labels Jun 28, 2025
Member

@dajac left a comment

@zzbennett Thanks for the patch. I just made a pass and I left some high level comments. I am not entirely happy with the interface yet.

Comment on lines 50 to 53
default Optional<TopicMetadata> topicMetadata(String topicName) {
    var topicId = topicId(topicName);
    return topicId.isEmpty() ? Optional.empty() : topicMetadata(topicId.get());
}
Member

nit: Should we provide the default implementation for the topic id version in order to be consistent with partitionCount?


CoordinatorMetadataDelta emptyDelta();

Long version();
Member

nit: Could we use long here?
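The review does not say why the primitive is preferred. One common motivation, assumed here, is that a boxed `Long` can be null and must be compared with `equals()`, while a primitive `long` has neither concern. A minimal sketch with a hypothetical `VersionHolder` class:

```java
// Hypothetical helper, not part of the PR.
final class VersionHolder {
    // Boxed: callers must guard against null and compare with equals(),
    // because == on Long objects compares references.
    static boolean sameVersionBoxed(Long a, Long b) {
        return a != null && a.equals(b);
    }

    // Primitive: there is no null state and == compares values.
    static boolean sameVersion(long a, long b) {
        return a == b;
    }
}
```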

Comment on lines 75 to 77
default Set<Integer> partitionSet() {
    return IntStream.range(0, partitionCount()).boxed().collect(Collectors.toSet());
}
Member

This is annoying. Let me check where we still use this.

@@ -1921,14 +1921,14 @@ private CompletableFuture<DeleteShareGroupOffsetsResponseData> handleDeleteShare
     errorTopicResponses.add(
         new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
             .setTopicId(topicData.topicId())
-            .setTopicName(metadataImage.topics().topicIdToNameView().get(topicData.topicId()))
+            .setTopicName(metadataImage.topicName(topicData.topicId()).orElse("<UNKNOWN>"))
Member

We cannot return <UNKNOWN>. What was returned with the previous implementation? null?

Author

@zzbennett Jun 30, 2025

Yeah, before we were returning null. I had seen some other places where we set the topic name to <UNKNOWN>, e.g. https://github.com/apache/kafka/blob/trunk/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java#L4924, but to maintain the existing behavior we can just keep it as null.

             .setErrorMessage(Errors.forCode(errItem.get().errorCode()).message())
             .setErrorCode(errItem.get().errorCode())
     );
 } else {
     successTopics.put(
         topicData.topicId(),
-        metadataImage.topics().topicIdToNameView().get(topicData.topicId())
+        metadataImage.topicName(topicData.topicId()).orElse("<UNKNOWN>")
Member

ditto. In this case, we can return ZERO_UUID.

Author

This is setting the topic name here, so I'm not sure ZERO_UUID makes sense.

Comment on lines 416 to 418
List<String> partitionRacks = new ArrayList<>(racks.get(i));
// topicMetadata returns an unmodifiable list
Collections.copy(partitionRacks, racks.get(i));
Member

This is a tad annoying too because we already allocate a new ArrayList when partitionRacks is called. For performance reasons, I wonder if we should not return an unmodifiable list so we can reuse it here.

This part is performance sensitive. We have a jmh benchmark that we may want to run to ensure that we don't regress here.

Author

Sure I can make it a modifiable list. Making it unmodifiable is probably overkill since, like you said, we already make a copy in the partitionRacks method. If we avoid the extra copy, then the new implementation should be equivalent to the old but I'll double check with the jmh benchmarks

Author

I ran the group coordinator jmh benchmarks locally a couple times and didn't see any statistically significant variations in performance
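To make the copy discussion in this thread concrete, here is a small sketch using a hypothetical `RackCopy` helper (not code from the PR). It shows that the `ArrayList` copy constructor already yields an independent, mutable list from an unmodifiable source, so a second copy step adds work without changing the result.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the PR.
final class RackCopy {
    // The copy constructor copies every element of the source, so a follow-up
    // Collections.copy(dest, source) over the same data would be redundant.
    static List<String> mutableCopy(List<String> source) {
        return new ArrayList<>(source);
    }
}
```

If the source list were itself mutable and owned by the caller, it could be reused directly and even this single copy would be avoidable, which is the trade-off the review raises.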

}

CoordinatorMetadataImage.TopicMetadata topicMetadata = topicMetadataOp.get();
List<String> racks = topicMetadata.partitionRacks().get(partition);
Member

This call is pretty nasty because it generates the entire map just to return the racks of a single partition, and it does so every time racksForPartition is called. We should definitely consider my suggestion for the interface.

Author

yea agreed...
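The interface suggestion referred to above is not shown in this excerpt, so the following is only a sketch of the general cost being described, under assumed names (`TopicRacks` and both of its methods are hypothetical). One accessor rebuilds a map of every partition on each call; the other looks up a single partition directly.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical holder of per-partition rack lists; not the PR's TopicMetadata.
final class TopicRacks {
    private final List<List<String>> racksByPartition;

    TopicRacks(List<List<String>> racksByPartition) {
        this.racksByPartition = racksByPartition;
    }

    // Builds a fresh map covering every partition on each call, even when the
    // caller only needs one entry.
    Map<Integer, List<String>> partitionRacks() {
        Map<Integer, List<String>> result = new HashMap<>();
        for (int i = 0; i < racksByPartition.size(); i++) {
            result.put(i, racksByPartition.get(i));
        }
        return result;
    }

    // Direct lookup with no intermediate map; unknown partitions yield an empty list.
    List<String> racksForPartition(int partition) {
        if (partition < 0 || partition >= racksByPartition.size()) {
            return List.of();
        }
        return racksByPartition.get(partition);
    }
}
```

Both accessors return the same racks for a valid partition; the difference is only in how much work and allocation each call performs.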

CoordinatorMetadataImage.TopicMetadata topicMetadata = topicMetadataOp.get();
List<String> racks = topicMetadata.partitionRacks().get(partition);
if (racks == null) {
    return Collections.emptySet();
Member

nit: Set.of().

@@ -477,7 +482,7 @@ public void testMemberJoinsEmptyConsumerGroup() {
     MockPartitionAssignor assignor = new MockPartitionAssignor("range");
     GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
         .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor))
-        .withMetadataImage(metadataImage)
+        .withMetadataImage(new KRaftCoordinatorMetadataImage(metadataImage))
Member

I am debating whether we should update MetadataImageBuilder to return a CoordinatorMetadataImage instead of wrapping it everywhere. Have you considered it?

Author

I actually did at some point add a method to the MetadataImageBuilder to return a CoordinatorMetadataImage but I must have given up updating the test to use the new method in all the places where it could. This test is really massive 😅


Optional<TopicMetadata> topicMetadata(Uuid topicId);

boolean shareGroupsEnabled();
Member

This feels weird in the interface. I wonder whether we could expose the features instead. It seems useful for other needs too.

Author

Yea it is weird. The share group coordinator relies on it in its onNewMetadataImage method https://github.com/apache/kafka/blob/trunk/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java#L1102

I just realized, though, that this particular instance of onNewMetadataImage is not part of any of the other coordinator-related interfaces, so I can easily add the FeatureImage as an extra arg to the ShareCoordinator interface instead of handling it through the CoordinatorMetadataImage.

@github-actions github-actions bot added the kraft label Jun 30, 2025
@zzbennett
Author

@dajac thanks for your review. I think I've addressed all your comments in e66548e


int partitionCount();

default Set<Integer> partitionSet() {
Member

Could we remove it?

Author

sure, I can move it out of the interface to keep the interface more clean. There are only 4 places where we use it so I can move the logic inline in those areas
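For reference, the logic being moved out of the interface is just an integer range collected into a set. A minimal sketch of what an inlined or helper version could look like, with `PartitionSets.allPartitions` as a hypothetical name that does not appear in the PR:

```java
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical helper, not part of the PR.
final class PartitionSets {
    // Returns the partition ids 0..partitionCount-1 as a set.
    static Set<Integer> allPartitions(int partitionCount) {
        return IntStream.range(0, partitionCount).boxed().collect(Collectors.toSet());
    }
}
```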

@github-actions github-actions bot removed the triage PRs from the community label Jul 1, 2025
@@ -1921,14 +1921,14 @@ private CompletableFuture<DeleteShareGroupOffsetsResponseData> handleDeleteShare
     errorTopicResponses.add(
         new DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
             .setTopicId(topicData.topicId())
-            .setTopicName(metadataImage.topics().topicIdToNameView().get(topicData.topicId()))
+            .setTopicName(metadataImage.topicName(topicData.topicId()).orElse(null))
Contributor

Should you ever get null here? I do see that it matches the existing behavior.

Author

It probably wouldn't ever happen but returning null at least matches the current behavior and avoids throwing an exception if the Optional is empty
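The behavior described here can be sketched with plain `java.util.Optional`. The `TopicNames` class below is a hypothetical stand-in for the lookup, not the PR's interface: `orElse(null)` reproduces the null a `Map.get` would have returned, while `get()` on an empty `Optional` throws `NoSuchElementException`.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical lookup wrapper, not part of the PR.
final class TopicNames {
    private final Map<String, String> namesById;

    TopicNames(Map<String, String> namesById) {
        this.namesById = namesById;
    }

    // Optional-returning lookup, similar in shape to an accessor that may miss.
    Optional<String> topicName(String topicId) {
        return Optional.ofNullable(namesById.get(topicId));
    }

    // Mirrors the old Map.get behavior: null when the id is unknown, no exception.
    String nameOrNull(String topicId) {
        return topicName(topicId).orElse(null);
    }
}
```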


 /**
  * The cache for topic hash value by topic name.
  * A topic hash is calculated when there is a group subscribes to it.
- * A topic hash is removed when it's updated in MetadataImage or there is no group subscribes to it.
+ * A topic hash is removed when it's updated in GroupCoordinatorMetadataImage or there is no group subscribes to it.
Contributor

Should this be CoordinatorMetadataImage?

Author

Ah yeah, good catch

Author

Found one other place where it was GroupCoordinatorMetadataImage. At first that was what the class was called, but I renamed it to CoordinatorMetadataImage when I realized it was being shared by the Share Group Coordinator too

.collect(Collectors.toSet()),
currentTimeMs
));
metadataImage.topicMetadata(topic.topicName()).ifPresentOrElse(
Contributor

This isn't really the ifPresentOrElse's fault but I do find these if blocks a little unwieldy. I don't really have a great suggestion right now -- it's probably better that we leave the current structure so that the refactoring is still fairly mechanical.

Author

By current structure, you mean the if/else block instead of the ifPresentOrElse? I'm fine with either one. I agree the ifPresentOrElse is kind of hard to read when it's a big block of logic. It's probably better for one- or two-line pieces of logic.

Author

Updated it to be a regular if/else block
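The two styles discussed in this thread are behaviorally equivalent; the choice is about readability. A small side-by-side sketch, using a hypothetical `LookupStyles` class whose branch bodies are placeholders rather than the PR's logic:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical comparison, not part of the PR.
final class LookupStyles {
    // Lambda style: compact for short branches, harder to scan for long ones.
    static List<String> withLambda(Optional<Integer> partitionCount) {
        List<String> log = new ArrayList<>();
        partitionCount.ifPresentOrElse(
            count -> log.add("found " + count),
            () -> log.add("missing")
        );
        return log;
    }

    // Plain if/else: same behavior, and larger branch bodies stay ordinary statements.
    static List<String> withIfElse(Optional<Integer> partitionCount) {
        List<String> log = new ArrayList<>();
        if (partitionCount.isPresent()) {
            log.add("found " + partitionCount.get());
        } else {
            log.add("missing");
        }
        return log;
    }
}
```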

@zzbennett zzbennett requested a review from lbradstreet July 3, 2025 00:33

CoordinatorMetadataDelta EMPTY = emptyDelta();

interface TopicsDelta {
Author

now I'm thinking of removing TopicsDelta and moving createdTopicsIds/changedTopicIds/deletedTopicsIds to the top level interface because it's kind of redundant to have this sub-class here.

4 participants