Skip to content

Commit

Permalink
KAFKA-18827: Initialize share state, share coordinator impl. [1/N] (#…
Browse files Browse the repository at this point in the history
…18968)

In this PR, we have added the share coordinator and KafkaApis side impl
of the intialize share group state RPC.
ref:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-932%3A+Queues+for+Kafka#KIP932:QueuesforKafka-InitializeShareGroupStateAPI

Reviewers: Andrew Schofield <[email protected]>
  • Loading branch information
smjn authored Feb 22, 2025
1 parent c6335c2 commit 4f28973
Show file tree
Hide file tree
Showing 10 changed files with 1,007 additions and 160 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public static class Builder extends AbstractRequest.Builder<InitializeShareGroup
private final InitializeShareGroupStateRequestData data;

public Builder(InitializeShareGroupStateRequestData data) {
this(data, false);
this(data, true);
}

public Builder(InitializeShareGroupStateRequestData data, boolean enableUnstableLastVersion) {
Expand Down Expand Up @@ -64,15 +64,15 @@ public InitializeShareGroupStateRequest(InitializeShareGroupStateRequestData dat
public InitializeShareGroupStateResponse getErrorResponse(int throttleTimeMs, Throwable e) {
List<InitializeShareGroupStateResponseData.InitializeStateResult> results = new ArrayList<>();
data.topics().forEach(
topicResult -> results.add(new InitializeShareGroupStateResponseData.InitializeStateResult()
.setTopicId(topicResult.topicId())
.setPartitions(topicResult.partitions().stream()
.map(partitionData -> new InitializeShareGroupStateResponseData.PartitionResult()
.setPartition(partitionData.partition())
.setErrorCode(Errors.forException(e).code()))
.collect(Collectors.toList()))));
topicResult -> results.add(new InitializeShareGroupStateResponseData.InitializeStateResult()
.setTopicId(topicResult.topicId())
.setPartitions(topicResult.partitions().stream()
.map(partitionData -> new InitializeShareGroupStateResponseData.PartitionResult()
.setPartition(partitionData.partition())
.setErrorCode(Errors.forException(e).code()))
.collect(Collectors.toList()))));
return new InitializeShareGroupStateResponse(new InitializeShareGroupStateResponseData()
.setResults(results));
.setResults(results));
}

@Override
Expand All @@ -82,8 +82,8 @@ public InitializeShareGroupStateRequestData data() {

public static InitializeShareGroupStateRequest parse(ByteBuffer buffer, short version) {
return new InitializeShareGroupStateRequest(
new InitializeShareGroupStateRequestData(new ByteBufferAccessor(buffer), version),
version
new InitializeShareGroupStateRequestData(new ByteBufferAccessor(buffer), version),
version
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,17 @@

package org.apache.kafka.common.requests;

import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.InitializeShareGroupStateRequestData;
import org.apache.kafka.common.message.InitializeShareGroupStateResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class InitializeShareGroupStateResponse extends AbstractResponse {
Expand All @@ -43,9 +47,9 @@ public InitializeShareGroupStateResponseData data() {
public Map<Errors, Integer> errorCounts() {
Map<Errors, Integer> counts = new HashMap<>();
data.results().forEach(
result -> result.partitions().forEach(
partitionResult -> updateErrorCounts(counts, Errors.forCode(partitionResult.errorCode()))
)
result -> result.partitions().forEach(
partitionResult -> updateErrorCounts(counts, Errors.forCode(partitionResult.errorCode()))
)
);
return counts;
}
Expand All @@ -62,7 +66,65 @@ public void maybeSetThrottleTimeMs(int throttleTimeMs) {

public static InitializeShareGroupStateResponse parse(ByteBuffer buffer, short version) {
return new InitializeShareGroupStateResponse(
new InitializeShareGroupStateResponseData(new ByteBufferAccessor(buffer), version)
new InitializeShareGroupStateResponseData(new ByteBufferAccessor(buffer), version)
);
}

public static InitializeShareGroupStateResponseData toGlobalErrorResponse(InitializeShareGroupStateRequestData request, Errors error) {
List<InitializeShareGroupStateResponseData.InitializeStateResult> initStateResults = new ArrayList<>();
request.topics().forEach(topicData -> {
List<InitializeShareGroupStateResponseData.PartitionResult> partitionResults = new ArrayList<>();
topicData.partitions().forEach(partitionData -> partitionResults.add(
toErrorResponsePartitionResult(partitionData.partition(), error, error.message()))
);
initStateResults.add(toResponseInitializeStateResult(topicData.topicId(), partitionResults));
});
return new InitializeShareGroupStateResponseData().setResults(initStateResults);
}

public static InitializeShareGroupStateResponseData.PartitionResult toErrorResponsePartitionResult(
int partitionId,
Errors error,
String errorMessage
) {
return new InitializeShareGroupStateResponseData.PartitionResult()
.setPartition(partitionId)
.setErrorCode(error.code())
.setErrorMessage(errorMessage);
}

public static InitializeShareGroupStateResponseData.InitializeStateResult toResponseInitializeStateResult(
Uuid topicId,
List<InitializeShareGroupStateResponseData.PartitionResult> partitionResults
) {
return new InitializeShareGroupStateResponseData.InitializeStateResult()
.setTopicId(topicId)
.setPartitions(partitionResults);
}

public static InitializeShareGroupStateResponseData toErrorResponseData(Uuid topicId, int partitionId, Errors error, String errorMessage) {
return new InitializeShareGroupStateResponseData().setResults(List.of(
new InitializeShareGroupStateResponseData.InitializeStateResult()
.setTopicId(topicId)
.setPartitions(List.of(new InitializeShareGroupStateResponseData.PartitionResult()
.setPartition(partitionId)
.setErrorCode(error.code())
.setErrorMessage(errorMessage)))
));
}

public static InitializeShareGroupStateResponseData.PartitionResult toResponsePartitionResult(int partitionId) {
return new InitializeShareGroupStateResponseData.PartitionResult().setPartition(partitionId);
}

public static InitializeShareGroupStateResponseData toResponseData(Uuid topicId, int partitionId) {
return new InitializeShareGroupStateResponseData().setResults(List.of(
new InitializeShareGroupStateResponseData.InitializeStateResult()
.setTopicId(topicId)
.setPartitions(List.of(
new InitializeShareGroupStateResponseData.PartitionResult()
.setPartition(partitionId)
))
));
}
}
30 changes: 26 additions & 4 deletions core/src/main/scala/kafka/server/KafkaApis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3100,11 +3100,33 @@ class KafkaApis(val requestChannel: RequestChannel,
}
}

def handleInitializeShareGroupStateRequest(request: RequestChannel.Request): Unit = {
def handleInitializeShareGroupStateRequest(request: RequestChannel.Request): CompletableFuture[Unit] = {
val initializeShareGroupStateRequest = request.body[InitializeShareGroupStateRequest]
// TODO: Implement the InitializeShareGroupStateRequest handling
requestHelper.sendMaybeThrottle(request, initializeShareGroupStateRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
CompletableFuture.completedFuture[Unit](())

if (!authorizeClusterOperation(request, CLUSTER_ACTION)) {
requestHelper.sendMaybeThrottle(request, new InitializeShareGroupStateResponse(
InitializeShareGroupStateResponse.toGlobalErrorResponse(
initializeShareGroupStateRequest.data(),
Errors.CLUSTER_AUTHORIZATION_FAILED
)))
return CompletableFuture.completedFuture[Unit](())
}

shareCoordinator match {
case None => requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
initializeShareGroupStateRequest.getErrorResponse(requestThrottleMs,
new ApiException("Share coordinator is not enabled.")))
CompletableFuture.completedFuture[Unit](())

case Some(coordinator) => coordinator.initializeState(request.context, initializeShareGroupStateRequest.data)
.handle[Unit] { (response, exception) =>
if (exception != null) {
requestHelper.sendMaybeThrottle(request, initializeShareGroupStateRequest.getErrorResponse(exception))
} else {
requestHelper.sendMaybeThrottle(request, new InitializeShareGroupStateResponse(response))
}
}
}
}

def handleReadShareGroupStateRequest(request: RequestChannel.Request): CompletableFuture[Unit] = {
Expand Down
121 changes: 121 additions & 0 deletions core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10879,6 +10879,98 @@ class KafkaApisTest extends Logging {
})
}

@Test
def testInitializeShareGroupStateSuccess(): Unit = {
val topicId = Uuid.randomUuid();
val initRequestData = new InitializeShareGroupStateRequestData()
.setGroupId("group1")
.setTopics(List(
new InitializeShareGroupStateRequestData.InitializeStateData()
.setTopicId(topicId)
.setPartitions(List(
new InitializeShareGroupStateRequestData.PartitionData()
.setPartition(1)
.setStateEpoch(0)
).asJava)
).asJava)

val initStateResultData: util.List[InitializeShareGroupStateResponseData.InitializeStateResult] = List(
new InitializeShareGroupStateResponseData.InitializeStateResult()
.setTopicId(topicId)
.setPartitions(List(
new InitializeShareGroupStateResponseData.PartitionResult()
.setPartition(1)
.setErrorCode(Errors.NONE.code())
.setErrorMessage(null)
).asJava)
).asJava

val config = Map(
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true",
)

val response = getInitializeShareGroupResponse(
initRequestData,
config ++ ShareCoordinatorTestConfig.testConfigMap().asScala,
verifyNoErr = true,
null,
initStateResultData
)

assertNotNull(response.data)
assertEquals(1, response.data.results.size)
}

@Test
def testInitializeShareGroupStateAuthorizationFailed(): Unit = {
val topicId = Uuid.randomUuid();
val initRequestData = new InitializeShareGroupStateRequestData()
.setGroupId("group1")
.setTopics(List(
new InitializeShareGroupStateRequestData.InitializeStateData()
.setTopicId(topicId)
.setPartitions(List(
new InitializeShareGroupStateRequestData.PartitionData()
.setPartition(1)
.setStateEpoch(0)
).asJava)
).asJava)

val initStateResultData: util.List[InitializeShareGroupStateResponseData.InitializeStateResult] = List(
new InitializeShareGroupStateResponseData.InitializeStateResult()
.setTopicId(topicId)
.setPartitions(List(
new InitializeShareGroupStateResponseData.PartitionResult()
.setPartition(1)
.setErrorCode(Errors.NONE.code())
.setErrorMessage(null)
).asJava)
).asJava

val authorizer: Authorizer = mock(classOf[Authorizer])
when(authorizer.authorize(any[RequestContext], any[util.List[Action]]))
.thenReturn(Seq(AuthorizationResult.DENIED).asJava, Seq(AuthorizationResult.ALLOWED).asJava)

val config = Map(
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true",
)

val response = getInitializeShareGroupResponse(
initRequestData,
config ++ ShareCoordinatorTestConfig.testConfigMap().asScala,
verifyNoErr = false,
authorizer,
initStateResultData
)

assertNotNull(response.data)
assertEquals(1, response.data.results.size)
response.data.results.forEach(deleteResult => {
assertEquals(1, deleteResult.partitions.size)
assertEquals(Errors.CLUSTER_AUTHORIZATION_FAILED.code(), deleteResult.partitions.get(0).errorCode())
})
}

def getShareGroupDescribeResponse(groupIds: util.List[String], configOverrides: Map[String, String] = Map.empty,
verifyNoErr: Boolean = true, authorizer: Authorizer = null,
describedGroups: util.List[ShareGroupDescribeResponseData.DescribedGroup]): ShareGroupDescribeResponse = {
Expand Down Expand Up @@ -11024,4 +11116,33 @@ class KafkaApisTest extends Logging {
}
response
}

def getInitializeShareGroupResponse(requestData: InitializeShareGroupStateRequestData, configOverrides: Map[String, String] = Map.empty,
verifyNoErr: Boolean = true, authorizer: Authorizer = null,
initStateResult: util.List[InitializeShareGroupStateResponseData.InitializeStateResult]): InitializeShareGroupStateResponse = {
val requestChannelRequest = buildRequest(new InitializeShareGroupStateRequest.Builder(requestData, true).build())

val future = new CompletableFuture[InitializeShareGroupStateResponseData]()
when(shareCoordinator.initializeState(
any[RequestContext],
any[InitializeShareGroupStateRequestData]
)).thenReturn(future)
metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)
kafkaApis = createKafkaApis(
overrideProperties = configOverrides,
authorizer = Option(authorizer),
)
kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching())

future.complete(new InitializeShareGroupStateResponseData()
.setResults(initStateResult))

val response = verifyNoThrottling[InitializeShareGroupStateResponse](requestChannelRequest)
if (verifyNoErr) {
val expectedInitShareGroupStateResponseData = new InitializeShareGroupStateResponseData()
.setResults(initStateResult)
assertEquals(expectedInitShareGroupStateResponseData, response.data)
}
response
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

import org.apache.kafka.common.message.DeleteShareGroupStateRequestData;
import org.apache.kafka.common.message.DeleteShareGroupStateResponseData;
import org.apache.kafka.common.message.InitializeShareGroupStateRequestData;
import org.apache.kafka.common.message.InitializeShareGroupStateResponseData;
import org.apache.kafka.common.message.ReadShareGroupStateRequestData;
import org.apache.kafka.common.message.ReadShareGroupStateResponseData;
import org.apache.kafka.common.message.ReadShareGroupStateSummaryRequestData;
Expand Down Expand Up @@ -96,6 +98,14 @@ public interface ShareCoordinator {
*/
CompletableFuture<DeleteShareGroupStateResponseData> deleteState(RequestContext context, DeleteShareGroupStateRequestData request);

/**
* Handle initialize share group state call
* @param context - represents the incoming initialize share group request context
* @param request - actual RPC request object
* @return completable future representing initialize share group RPC response data
*/
CompletableFuture<InitializeShareGroupStateResponseData> initializeState(RequestContext context, InitializeShareGroupStateRequestData request);

/**
* Called when new coordinator is elected
* @param partitionIndex - The partition index (internal topic)
Expand Down
Loading

0 comments on commit 4f28973

Please sign in to comment.