Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-18629: ShareGroupDeleteState admin client impl. #18928

Merged
merged 12 commits into from
Feb 22, 2025
17 changes: 17 additions & 0 deletions clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
Original file line number Diff line number Diff line change
Expand Up @@ -953,6 +953,14 @@ default ListConsumerGroupOffsetsResult listConsumerGroupOffsets(Map<String, List
*/
DeleteConsumerGroupsResult deleteConsumerGroups(Collection<String> groupIds, DeleteConsumerGroupsOptions options);

/**
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Personally, I'd prefer to see these two methods declared with the other share group methods, not mixed in with the consumer group methods.

* Delete share groups from the cluster.
*
* @param options The options to use when deleting a share group.
* @return The DeleteShareGroupsResult.
*/
DeleteShareGroupsResult deleteShareGroups(Collection<String> groupIds, DeleteShareGroupsOptions options);

/**
* Delete consumer groups from the cluster with the default options.
*
Expand All @@ -962,6 +970,15 @@ default DeleteConsumerGroupsResult deleteConsumerGroups(Collection<String> group
return deleteConsumerGroups(groupIds, new DeleteConsumerGroupsOptions());
}

/**
* Delete share groups from the cluster with the default options.
*
* @return The DeleteShareGroupsResult.
*/
default DeleteShareGroupsResult deleteShareGroups(Collection<String> groupIds) {
return deleteShareGroups(groupIds, new DeleteShareGroupsOptions());
}

/**
* Delete committed offsets for a set of partitions in a consumer group. This will
* succeed at the partition level only if the group is not actively subscribed
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.clients.admin;

import org.apache.kafka.common.annotation.InterfaceStability;

import java.util.Collection;

/**
* Options for the {@link Admin#deleteShareGroups(Collection <String>, DeleteShareGroupsOptions)} call.
* <p>
* The API of this class is evolving, see {@link Admin} for details.
*/
@InterfaceStability.Evolving
public class DeleteShareGroupsOptions extends AbstractOptions<DeleteShareGroupsOptions> {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.clients.admin;

import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.annotation.InterfaceStability;

import java.util.Collection;
import java.util.Map;

/**
* The result of the {@link Admin#deleteShareGroups(Collection <String>, DeleteShareGroupsOptions)} call.
* <p>
* The API of this class is evolving, see {@link Admin} for details.
*/
@InterfaceStability.Evolving
public class DeleteShareGroupsResult extends DeleteConsumerGroupsResult {
DeleteShareGroupsResult(final Map<String, KafkaFuture<Void>> futures) {
super(futures);
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The implementation for the public methods here could be added.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As of now, it is exactly the same as the parent class.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, OK. I suggest making a common superclass DeleteGroupsResult and having both DeleteConsumerGroupsResult and DeleteShareGroupsResult inheriting from that.

Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,11 @@ public ListShareGroupOffsetsResult listShareGroupOffsets(Map<String, ListShareGr
return delegate.listShareGroupOffsets(groupSpecs, options);
}

@Override
public DeleteShareGroupsResult deleteShareGroups(Collection<String> groupIds, DeleteShareGroupsOptions options) {
return delegate.deleteShareGroups(groupIds, options);
}

@Override
public ListGroupsResult listGroups(ListGroupsOptions options) {
return delegate.listGroups(options);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.kafka.clients.admin.internals.DeleteConsumerGroupOffsetsHandler;
import org.apache.kafka.clients.admin.internals.DeleteConsumerGroupsHandler;
import org.apache.kafka.clients.admin.internals.DeleteRecordsHandler;
import org.apache.kafka.clients.admin.internals.DeleteShareGroupsHandler;
import org.apache.kafka.clients.admin.internals.DescribeClassicGroupsHandler;
import org.apache.kafka.clients.admin.internals.DescribeConsumerGroupsHandler;
import org.apache.kafka.clients.admin.internals.DescribeProducersHandler;
Expand Down Expand Up @@ -3829,6 +3830,16 @@ public DescribeClassicGroupsResult describeClassicGroups(final Collection<String
.collect(Collectors.toMap(entry -> entry.getKey().idValue, Map.Entry::getValue)));
}

@Override
public DeleteShareGroupsResult deleteShareGroups(Collection<String> groupIds, DeleteShareGroupsOptions options) {
SimpleAdminApiFuture<CoordinatorKey, Void> future =
DeleteShareGroupsHandler.newFuture(groupIds);
DeleteShareGroupsHandler handler = new DeleteShareGroupsHandler(logContext);
invokeDriver(handler, future, options.timeoutMs);
return new DeleteShareGroupsResult(future.all().entrySet().stream()
.collect(Collectors.toMap(entry -> entry.getKey().idValue, Map.Entry::getValue)));
}

@Override
public Map<MetricName, ? extends Metric> metrics() {
return Collections.unmodifiableMap(this.metrics.metrics());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,14 @@ public class DeleteConsumerGroupsHandler extends AdminApiHandler.Batched<Coordin
public DeleteConsumerGroupsHandler(
LogContext logContext
) {
this.log = logContext.logger(DeleteConsumerGroupsHandler.class);
this(logContext, DeleteConsumerGroupsHandler.class);
}

public DeleteConsumerGroupsHandler(
LogContext logContext,
Class<?> loggerClass
) {
this.log = logContext.logger(loggerClass);
this.lookupStrategy = new CoordinatorStrategy(CoordinatorType.GROUP, logContext);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.clients.admin.internals;

import org.apache.kafka.common.utils.LogContext;

public class DeleteShareGroupsHandler extends DeleteConsumerGroupsHandler {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again, in a similar vein, maybe having a common DeleteGroupsHandler and having 2 subclasses would be sensible here. Having the share group handler inherit from the consumer group one is strange. I'm pretty sure that there will be an Admin.deleteGroups method before long too.

public DeleteShareGroupsHandler(
LogContext logContext
) {
super(logContext, DeleteShareGroupsHandler.class);
}

@Override
public String apiName() {
return "deleteShareGroups";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1399,6 +1399,11 @@ public synchronized ListShareGroupOffsetsResult listShareGroupOffsets(Map<String
throw new UnsupportedOperationException("Not implemented yet");
}

@Override
public synchronized DeleteShareGroupsResult deleteShareGroups(Collection<String> groupIds, DeleteShareGroupsOptions options) {
throw new UnsupportedOperationException("Not implemented yet");
}

@Override
public synchronized DescribeClassicGroupsResult describeClassicGroups(Collection<String> groupIds, DescribeClassicGroupsOptions options) {
throw new UnsupportedOperationException("Not implemented yet");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,16 @@ public class DeleteConsumerGroupsHandlerTest {

@Test
public void testBuildRequest() {
DeleteConsumerGroupsHandler handler = new DeleteConsumerGroupsHandler(logContext);
DeleteConsumerGroupsHandler handler = getHandler();
DeleteGroupsRequest request = handler.buildBatchedRequest(1, singleton(CoordinatorKey.byGroupId(groupId1))).build();
assertEquals(1, request.data().groupsNames().size());
assertEquals(groupId1, request.data().groupsNames().get(0));
}

protected DeleteConsumerGroupsHandler getHandler() {
return new DeleteConsumerGroupsHandler(logContext);
}

@Test
public void testSuccessfulHandleResponse() {
assertCompleted(handleWithError(Errors.NONE));
Expand Down Expand Up @@ -87,7 +91,7 @@ private DeleteGroupsResponse buildResponse(Errors error) {
private AdminApiHandler.ApiResult<CoordinatorKey, Void> handleWithError(
Errors error
) {
DeleteConsumerGroupsHandler handler = new DeleteConsumerGroupsHandler(logContext);
DeleteConsumerGroupsHandler handler = getHandler();
DeleteGroupsResponse response = buildResponse(error);
return handler.handleResponse(new Node(1, "host", 1234), singleton(CoordinatorKey.byGroupId(groupId1)), response);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.clients.admin.internals;

import org.apache.kafka.common.utils.LogContext;

public class DeleteShareGroupsHandlerTest extends DeleteConsumerGroupsHandlerTest {
private final LogContext logContext = new LogContext();

@Override
protected DeleteConsumerGroupsHandler getHandler() {
return new DeleteShareGroupsHandler(logContext);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@
import org.apache.kafka.clients.admin.DeleteConsumerGroupsResult;
import org.apache.kafka.clients.admin.DeleteRecordsOptions;
import org.apache.kafka.clients.admin.DeleteRecordsResult;
import org.apache.kafka.clients.admin.DeleteShareGroupsOptions;
import org.apache.kafka.clients.admin.DeleteShareGroupsResult;
import org.apache.kafka.clients.admin.DeleteTopicsOptions;
import org.apache.kafka.clients.admin.DeleteTopicsResult;
import org.apache.kafka.clients.admin.DescribeAclsOptions;
Expand Down Expand Up @@ -428,6 +430,11 @@ public ListShareGroupOffsetsResult listShareGroupOffsets(final Map<String, ListS
return adminDelegate.listShareGroupOffsets(groupSpecs, options);
}

@Override
public DeleteShareGroupsResult deleteShareGroups(final Collection<String> groupIds, final DeleteShareGroupsOptions options) {
return adminDelegate.deleteShareGroups(groupIds, options);
}

@Override
public DescribeClassicGroupsResult describeClassicGroups(final Collection<String> groupIds, final DescribeClassicGroupsOptions options) {
return adminDelegate.describeClassicGroups(groupIds, options);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
package org.apache.kafka.tools.consumer.group;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.AbstractOptions;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.DeleteShareGroupsOptions;
import org.apache.kafka.clients.admin.DescribeShareGroupsOptions;
import org.apache.kafka.clients.admin.GroupListing;
import org.apache.kafka.clients.admin.ListGroupsOptions;
Expand Down Expand Up @@ -81,7 +83,7 @@ public static void run(ShareGroupCommandOptions opts) {
} else if (opts.options.has(opts.describeOpt)) {
shareGroupService.describeGroups();
} else if (opts.options.has(opts.deleteOpt)) {
throw new UnsupportedOperationException("--delete option is not yet implemented");
shareGroupService.deleteShareGroups();
} else if (opts.options.has(opts.resetOffsetsOpt)) {
throw new UnsupportedOperationException("--reset-offsets option is not yet implemented");
} else if (opts.options.has(opts.deleteOffsetsOpt)) {
Expand Down Expand Up @@ -154,6 +156,18 @@ List<String> listShareGroups() {
}
}

List<GroupListing> listDetailedShareGroups() {
try {
ListGroupsResult result = adminClient.listGroups(new ListGroupsOptions()
.timeoutMs(opts.options.valueOf(opts.timeoutMsOpt).intValue())
.withTypes(Set.of(GroupType.SHARE)));
Collection<GroupListing> listings = result.all().get();
return listings.stream().toList();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
}

List<GroupListing> listShareGroupsInStates(Set<GroupState> states) throws ExecutionException, InterruptedException {
ListGroupsResult result = adminClient.listGroups(new ListGroupsOptions()
.timeoutMs(opts.options.valueOf(opts.timeoutMsOpt).intValue())
Expand Down Expand Up @@ -208,6 +222,67 @@ public void describeGroups() throws ExecutionException, InterruptedException {
}
}

Map<String, Throwable> deleteShareGroups() {
List<GroupListing> shareGroupIds = listDetailedShareGroups();
List<String> groupIds = opts.options.has(opts.allGroupsOpt)
? shareGroupIds.stream().map(GroupListing::groupId).toList()
: opts.options.valuesOf(opts.groupOpt);

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because deleteShareGroups actually deletes all types of group, I think it would be prudent for this method to check that the groups being deleted are share groups.

In the --all-groups case, you're listing the share groups. If you listed the share groups in the other case too, you'd easily be able to see whether the groups are share groups and then avoid deleting consumer groups unawares. Do you think this would be a sensible precaution? It's quite a limited code change.

if (groupIds.isEmpty()) {
throw new IllegalArgumentException("--groups or --all-groups argument is mandatory");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would have said that the checking that there is either --groups or --all-groups should be done in the command line validation, not here.

As it stands, if the user specified --all-groups and there are no share groups, the error message is --groups or --all-groups is mandatory. Really, I would have said that this should be treated as successful deletion of no groups. That's what kafka-consumer-groups.sh does. It says "Deletion of requested share groups ('') was successful." Slight inelegant, but effective.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@AndrewJSchofield
It is already happening correctly. The util method getShareGroupService was not performing the validation check and hence I missed it and added it at the wrong place.
I will rectify.

}

for (String groupId : groupIds) {
Optional<GroupListing> listing = shareGroupIds.stream().filter(item -> item.groupId().equals(groupId)).findAny();
if (listing.isEmpty()) {
throw new IllegalArgumentException("Group '" + groupId + "' is not a share group.");
} else {
Optional<GroupState> state = listing.get().groupState();
if (state.isPresent() && !state.get().equals(GroupState.EMPTY)) {
throw new IllegalStateException("Share group '" + groupId + "' is not EMPTY.");
}
}
}

Map<String, KafkaFuture<Void>> groupsToDelete = adminClient.deleteShareGroups(
groupIds,
withTimeoutMs(new DeleteShareGroupsOptions())
).deletedGroups();

Map<String, Throwable> success = new HashMap<>();
Map<String, Throwable> failed = new HashMap<>();

groupsToDelete.forEach((g, f) -> {
try {
f.get();
success.put(g, null);
} catch (InterruptedException ie) {
failed.put(g, ie);
} catch (ExecutionException e) {
failed.put(g, e.getCause());
}
});

if (failed.isEmpty())
System.out.println("Deletion of requested share groups (" + "'" + success.keySet().stream().map(Object::toString).collect(Collectors.joining(", ")) + "'" + ") was successful.");
else {
printError("Deletion of some share groups failed:", Optional.empty());
failed.forEach((group, error) -> System.out.println("* Share Group '" + group + "' could not be deleted due to: " + error));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Share group not Share Group please.


if (!success.isEmpty())
System.out.println("\nThese share groups were deleted successfully: " + "'" + success.keySet().stream().map(Object::toString).collect(Collectors.joining("'")) + "', '");
}

failed.putAll(success);

return failed;
}

private <T extends AbstractOptions<T>> T withTimeoutMs(T options) {
int t = opts.options.valueOf(opts.timeoutMsOpt).intValue();
return options.timeoutMs(t);
}

Map<String, ShareGroupDescription> describeShareGroups(Collection<String> groupIds) throws ExecutionException, InterruptedException {
Map<String, ShareGroupDescription> res = new HashMap<>();
Map<String, KafkaFuture<ShareGroupDescription>> stringKafkaFutureMap = adminClient.describeShareGroups(
Expand Down
Loading