Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-18827: Initialize share state, share coordinator impl. [1/N] #18968

Merged
merged 4 commits into from
Feb 22, 2025

Conversation

smjn
Copy link
Contributor

@smjn smjn commented Feb 19, 2025

In this PR, we have added the share coordinator and KafkaApis side impl of the intialize share group state RPC.
ref: https://cwiki.apache.org/confluence/display/KAFKA/KIP-932%3A+Queues+for+Kafka#KIP932:QueuesforKafka-InitializeShareGroupStateAPI

@github-actions github-actions bot added triage PRs from the community core Kafka Broker KIP-932 Queues for Kafka clients labels Feb 19, 2025
@AndrewJSchofield AndrewJSchofield added ci-approved and removed triage PRs from the community labels Feb 19, 2025
@smjn smjn changed the title KAFKA-18827: Initialize share state RPC, share coordinator impl. [1/N] KAFKA-18827: Initialize share state, share coordinator impl. [1/N] Feb 20, 2025
@smjn smjn marked this pull request as ready for review February 20, 2025 07:45
Copy link
Member

@AndrewJSchofield AndrewJSchofield left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR. Just a few small comments to address.

topicPartitionFor(coordinatorKey),
Duration.ofMillis(config.shareCoordinatorWriteTimeoutMs()),
coordinator -> coordinator.initializeState(requestForCurrentPartition)
).exceptionally(deleteException ->
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: initializeException?

List<InitializeShareGroupStateResponseData.PartitionResult> partitionResults = new ArrayList<>(topicEntry.size());
topicEntry.forEach(
(partitionId, responseFuture) -> {
// ResponseFut would already be completed by now since we have used
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The variable name was different in the code you copied here :)

))
))
);
WriteShareGroupStateResponseData responseData = new WriteShareGroupStateResponseData().setResults(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the previous style of

new WriteShareGroupStateResponse()
.setResults(

was more in line with the rest of the code you've written.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will need to be changed to as older style causes bad alignment. I have a JIRA for the same.

@AndrewJSchofield AndrewJSchofield merged commit 4f28973 into apache:trunk Feb 22, 2025
9 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
ci-approved clients core Kafka Broker KIP-932 Queues for Kafka
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants