Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-18827: Initialize share state, share coordinator impl. [1/N] #18968

Merged
merged 4 commits into from
Feb 22, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public static class Builder extends AbstractRequest.Builder<InitializeShareGroup
private final InitializeShareGroupStateRequestData data;

public Builder(InitializeShareGroupStateRequestData data) {
this(data, false);
this(data, true);
}

public Builder(InitializeShareGroupStateRequestData data, boolean enableUnstableLastVersion) {
Expand Down Expand Up @@ -64,15 +64,15 @@ public InitializeShareGroupStateRequest(InitializeShareGroupStateRequestData dat
public InitializeShareGroupStateResponse getErrorResponse(int throttleTimeMs, Throwable e) {
List<InitializeShareGroupStateResponseData.InitializeStateResult> results = new ArrayList<>();
data.topics().forEach(
topicResult -> results.add(new InitializeShareGroupStateResponseData.InitializeStateResult()
.setTopicId(topicResult.topicId())
.setPartitions(topicResult.partitions().stream()
.map(partitionData -> new InitializeShareGroupStateResponseData.PartitionResult()
.setPartition(partitionData.partition())
.setErrorCode(Errors.forException(e).code()))
.collect(Collectors.toList()))));
topicResult -> results.add(new InitializeShareGroupStateResponseData.InitializeStateResult()
.setTopicId(topicResult.topicId())
.setPartitions(topicResult.partitions().stream()
.map(partitionData -> new InitializeShareGroupStateResponseData.PartitionResult()
.setPartition(partitionData.partition())
.setErrorCode(Errors.forException(e).code()))
.collect(Collectors.toList()))));
return new InitializeShareGroupStateResponse(new InitializeShareGroupStateResponseData()
.setResults(results));
.setResults(results));
}

@Override
Expand All @@ -82,8 +82,8 @@ public InitializeShareGroupStateRequestData data() {

public static InitializeShareGroupStateRequest parse(ByteBuffer buffer, short version) {
return new InitializeShareGroupStateRequest(
new InitializeShareGroupStateRequestData(new ByteBufferAccessor(buffer), version),
version
new InitializeShareGroupStateRequestData(new ByteBufferAccessor(buffer), version),
version
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,17 @@

package org.apache.kafka.common.requests;

import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.InitializeShareGroupStateRequestData;
import org.apache.kafka.common.message.InitializeShareGroupStateResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class InitializeShareGroupStateResponse extends AbstractResponse {
Expand All @@ -43,9 +47,9 @@ public InitializeShareGroupStateResponseData data() {
public Map<Errors, Integer> errorCounts() {
Map<Errors, Integer> counts = new HashMap<>();
data.results().forEach(
result -> result.partitions().forEach(
partitionResult -> updateErrorCounts(counts, Errors.forCode(partitionResult.errorCode()))
)
result -> result.partitions().forEach(
partitionResult -> updateErrorCounts(counts, Errors.forCode(partitionResult.errorCode()))
)
);
return counts;
}
Expand All @@ -62,7 +66,65 @@ public void maybeSetThrottleTimeMs(int throttleTimeMs) {

public static InitializeShareGroupStateResponse parse(ByteBuffer buffer, short version) {
return new InitializeShareGroupStateResponse(
new InitializeShareGroupStateResponseData(new ByteBufferAccessor(buffer), version)
new InitializeShareGroupStateResponseData(new ByteBufferAccessor(buffer), version)
);
}

public static InitializeShareGroupStateResponseData toGlobalErrorResponse(InitializeShareGroupStateRequestData request, Errors error) {
List<InitializeShareGroupStateResponseData.InitializeStateResult> initStateResults = new ArrayList<>();
request.topics().forEach(topicData -> {
List<InitializeShareGroupStateResponseData.PartitionResult> partitionResults = new ArrayList<>();
topicData.partitions().forEach(partitionData -> partitionResults.add(
toErrorResponsePartitionResult(partitionData.partition(), error, error.message()))
);
initStateResults.add(toResponseInitializeStateResult(topicData.topicId(), partitionResults));
});
return new InitializeShareGroupStateResponseData().setResults(initStateResults);
}

public static InitializeShareGroupStateResponseData.PartitionResult toErrorResponsePartitionResult(
int partitionId,
Errors error,
String errorMessage
) {
return new InitializeShareGroupStateResponseData.PartitionResult()
.setPartition(partitionId)
.setErrorCode(error.code())
.setErrorMessage(errorMessage);
}

public static InitializeShareGroupStateResponseData.InitializeStateResult toResponseInitializeStateResult(
Uuid topicId,
List<InitializeShareGroupStateResponseData.PartitionResult> partitionResults
) {
return new InitializeShareGroupStateResponseData.InitializeStateResult()
.setTopicId(topicId)
.setPartitions(partitionResults);
}

public static InitializeShareGroupStateResponseData toErrorResponseData(Uuid topicId, int partitionId, Errors error, String errorMessage) {
return new InitializeShareGroupStateResponseData().setResults(List.of(
new InitializeShareGroupStateResponseData.InitializeStateResult()
.setTopicId(topicId)
.setPartitions(List.of(new InitializeShareGroupStateResponseData.PartitionResult()
.setPartition(partitionId)
.setErrorCode(error.code())
.setErrorMessage(errorMessage)))
));
}

public static InitializeShareGroupStateResponseData.PartitionResult toResponsePartitionResult(int partitionId) {
return new InitializeShareGroupStateResponseData.PartitionResult().setPartition(partitionId);
}

public static InitializeShareGroupStateResponseData toResponseData(Uuid topicId, int partitionId) {
return new InitializeShareGroupStateResponseData().setResults(List.of(
new InitializeShareGroupStateResponseData.InitializeStateResult()
.setTopicId(topicId)
.setPartitions(List.of(
new InitializeShareGroupStateResponseData.PartitionResult()
.setPartition(partitionId)
))
));
}
}
30 changes: 26 additions & 4 deletions core/src/main/scala/kafka/server/KafkaApis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3100,11 +3100,33 @@ class KafkaApis(val requestChannel: RequestChannel,
}
}

def handleInitializeShareGroupStateRequest(request: RequestChannel.Request): Unit = {
def handleInitializeShareGroupStateRequest(request: RequestChannel.Request): CompletableFuture[Unit] = {
val initializeShareGroupStateRequest = request.body[InitializeShareGroupStateRequest]
// TODO: Implement the InitializeShareGroupStateRequest handling
requestHelper.sendMaybeThrottle(request, initializeShareGroupStateRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
CompletableFuture.completedFuture[Unit](())

if (!authorizeClusterOperation(request, CLUSTER_ACTION)) {
requestHelper.sendMaybeThrottle(request, new InitializeShareGroupStateResponse(
InitializeShareGroupStateResponse.toGlobalErrorResponse(
initializeShareGroupStateRequest.data(),
Errors.CLUSTER_AUTHORIZATION_FAILED
)))
return CompletableFuture.completedFuture[Unit](())
}

shareCoordinator match {
case None => requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
initializeShareGroupStateRequest.getErrorResponse(requestThrottleMs,
new ApiException("Share coordinator is not enabled.")))
CompletableFuture.completedFuture[Unit](())

case Some(coordinator) => coordinator.initializeState(request.context, initializeShareGroupStateRequest.data)
.handle[Unit] { (response, exception) =>
if (exception != null) {
requestHelper.sendMaybeThrottle(request, initializeShareGroupStateRequest.getErrorResponse(exception))
} else {
requestHelper.sendMaybeThrottle(request, new InitializeShareGroupStateResponse(response))
}
}
}
}

def handleReadShareGroupStateRequest(request: RequestChannel.Request): CompletableFuture[Unit] = {
Expand Down
121 changes: 121 additions & 0 deletions core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10879,6 +10879,98 @@ class KafkaApisTest extends Logging {
})
}

@Test
def testInitializeShareGroupStateSuccess(): Unit = {
val topicId = Uuid.randomUuid();
val initRequestData = new InitializeShareGroupStateRequestData()
.setGroupId("group1")
.setTopics(List(
new InitializeShareGroupStateRequestData.InitializeStateData()
.setTopicId(topicId)
.setPartitions(List(
new InitializeShareGroupStateRequestData.PartitionData()
.setPartition(1)
.setStateEpoch(0)
).asJava)
).asJava)

val initStateResultData: util.List[InitializeShareGroupStateResponseData.InitializeStateResult] = List(
new InitializeShareGroupStateResponseData.InitializeStateResult()
.setTopicId(topicId)
.setPartitions(List(
new InitializeShareGroupStateResponseData.PartitionResult()
.setPartition(1)
.setErrorCode(Errors.NONE.code())
.setErrorMessage(null)
).asJava)
).asJava

val config = Map(
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true",
)

val response = getInitializeShareGroupResponse(
initRequestData,
config ++ ShareCoordinatorTestConfig.testConfigMap().asScala,
verifyNoErr = true,
null,
initStateResultData
)

assertNotNull(response.data)
assertEquals(1, response.data.results.size)
}

@Test
def testInitializeShareGroupStateAuthorizationFailed(): Unit = {
val topicId = Uuid.randomUuid();
val initRequestData = new InitializeShareGroupStateRequestData()
.setGroupId("group1")
.setTopics(List(
new InitializeShareGroupStateRequestData.InitializeStateData()
.setTopicId(topicId)
.setPartitions(List(
new InitializeShareGroupStateRequestData.PartitionData()
.setPartition(1)
.setStateEpoch(0)
).asJava)
).asJava)

val initStateResultData: util.List[InitializeShareGroupStateResponseData.InitializeStateResult] = List(
new InitializeShareGroupStateResponseData.InitializeStateResult()
.setTopicId(topicId)
.setPartitions(List(
new InitializeShareGroupStateResponseData.PartitionResult()
.setPartition(1)
.setErrorCode(Errors.NONE.code())
.setErrorMessage(null)
).asJava)
).asJava

val authorizer: Authorizer = mock(classOf[Authorizer])
when(authorizer.authorize(any[RequestContext], any[util.List[Action]]))
.thenReturn(Seq(AuthorizationResult.DENIED).asJava, Seq(AuthorizationResult.ALLOWED).asJava)

val config = Map(
ShareGroupConfig.SHARE_GROUP_ENABLE_CONFIG -> "true",
)

val response = getInitializeShareGroupResponse(
initRequestData,
config ++ ShareCoordinatorTestConfig.testConfigMap().asScala,
verifyNoErr = false,
authorizer,
initStateResultData
)

assertNotNull(response.data)
assertEquals(1, response.data.results.size)
response.data.results.forEach(deleteResult => {
assertEquals(1, deleteResult.partitions.size)
assertEquals(Errors.CLUSTER_AUTHORIZATION_FAILED.code(), deleteResult.partitions.get(0).errorCode())
})
}

def getShareGroupDescribeResponse(groupIds: util.List[String], configOverrides: Map[String, String] = Map.empty,
verifyNoErr: Boolean = true, authorizer: Authorizer = null,
describedGroups: util.List[ShareGroupDescribeResponseData.DescribedGroup]): ShareGroupDescribeResponse = {
Expand Down Expand Up @@ -11024,4 +11116,33 @@ class KafkaApisTest extends Logging {
}
response
}

def getInitializeShareGroupResponse(requestData: InitializeShareGroupStateRequestData, configOverrides: Map[String, String] = Map.empty,
verifyNoErr: Boolean = true, authorizer: Authorizer = null,
initStateResult: util.List[InitializeShareGroupStateResponseData.InitializeStateResult]): InitializeShareGroupStateResponse = {
val requestChannelRequest = buildRequest(new InitializeShareGroupStateRequest.Builder(requestData, true).build())

val future = new CompletableFuture[InitializeShareGroupStateResponseData]()
when(shareCoordinator.initializeState(
any[RequestContext],
any[InitializeShareGroupStateRequestData]
)).thenReturn(future)
metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)
kafkaApis = createKafkaApis(
overrideProperties = configOverrides,
authorizer = Option(authorizer),
)
kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching())

future.complete(new InitializeShareGroupStateResponseData()
.setResults(initStateResult))

val response = verifyNoThrottling[InitializeShareGroupStateResponse](requestChannelRequest)
if (verifyNoErr) {
val expectedInitShareGroupStateResponseData = new InitializeShareGroupStateResponseData()
.setResults(initStateResult)
assertEquals(expectedInitShareGroupStateResponseData, response.data)
}
response
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

import org.apache.kafka.common.message.DeleteShareGroupStateRequestData;
import org.apache.kafka.common.message.DeleteShareGroupStateResponseData;
import org.apache.kafka.common.message.InitializeShareGroupStateRequestData;
import org.apache.kafka.common.message.InitializeShareGroupStateResponseData;
import org.apache.kafka.common.message.ReadShareGroupStateRequestData;
import org.apache.kafka.common.message.ReadShareGroupStateResponseData;
import org.apache.kafka.common.message.ReadShareGroupStateSummaryRequestData;
Expand Down Expand Up @@ -96,6 +98,14 @@ public interface ShareCoordinator {
*/
CompletableFuture<DeleteShareGroupStateResponseData> deleteState(RequestContext context, DeleteShareGroupStateRequestData request);

/**
* Handle initialize share group state call
* @param context - represents the incoming initialize share group request context
* @param request - actual RPC request object
* @return completable future representing initialize share group RPC response data
*/
CompletableFuture<InitializeShareGroupStateResponseData> initializeState(RequestContext context, InitializeShareGroupStateRequestData request);

/**
* Called when new coordinator is elected
* @param partitionIndex - The partition index (internal topic)
Expand Down
Loading