Skip to content

MINOR: Cleanup Tools Module (1/n) #20091

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 12 additions & 14 deletions tools/src/main/java/org/apache/kafka/tools/AclCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,7 @@
import java.io.IOException;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -177,7 +175,7 @@ private static void printResourceAcls(Map<ResourcePattern, Set<AccessControlEntr

private static void removeAcls(Admin adminClient, Set<AccessControlEntry> acls, ResourcePatternFilter filter) throws ExecutionException, InterruptedException {
if (acls.isEmpty()) {
adminClient.deleteAcls(Collections.singletonList(new AclBindingFilter(filter, AccessControlEntryFilter.ANY))).all().get();
adminClient.deleteAcls(List.of(new AclBindingFilter(filter, AccessControlEntryFilter.ANY))).all().get();
} else {
List<AclBindingFilter> aclBindingFilters = acls.stream().map(acl -> new AclBindingFilter(filter, acl.toFilter())).collect(Collectors.toList());
adminClient.deleteAcls(aclBindingFilters).all().get();
Expand Down Expand Up @@ -249,8 +247,8 @@ private static Map<ResourcePatternFilter, Set<AccessControlEntry>> getProducerRe
Set<ResourcePatternFilter> transactionalIds = filters.stream().filter(f -> f.resourceType() == ResourceType.TRANSACTIONAL_ID).collect(Collectors.toSet());
boolean enableIdempotence = opts.options.has(opts.idempotentOpt);

Set<AccessControlEntry> topicAcls = getAcl(opts, new HashSet<>(Arrays.asList(WRITE, DESCRIBE, CREATE)));
Set<AccessControlEntry> transactionalIdAcls = getAcl(opts, new HashSet<>(Arrays.asList(WRITE, DESCRIBE)));
Set<AccessControlEntry> topicAcls = getAcl(opts, Set.of(WRITE, DESCRIBE, CREATE));
Set<AccessControlEntry> transactionalIdAcls = getAcl(opts, Set.of(WRITE, DESCRIBE));

//Write, Describe, Create permission on topics, Write, Describe on transactionalIds
Map<ResourcePatternFilter, Set<AccessControlEntry>> result = new HashMap<>();
Expand All @@ -261,7 +259,7 @@ private static Map<ResourcePatternFilter, Set<AccessControlEntry>> getProducerRe
result.put(transactionalId, transactionalIdAcls);
}
if (enableIdempotence) {
result.put(CLUSTER_RESOURCE_FILTER, getAcl(opts, Collections.singleton(IDEMPOTENT_WRITE)));
result.put(CLUSTER_RESOURCE_FILTER, getAcl(opts, Set.of(IDEMPOTENT_WRITE)));
}
return result;
}
Expand All @@ -272,8 +270,8 @@ private static Map<ResourcePatternFilter, Set<AccessControlEntry>> getConsumerRe
Set<ResourcePatternFilter> groups = filters.stream().filter(f -> f.resourceType() == ResourceType.GROUP).collect(Collectors.toSet());

//Read, Describe on topic, Read on consumerGroup
Set<AccessControlEntry> topicAcls = getAcl(opts, new HashSet<>(Arrays.asList(READ, DESCRIBE)));
Set<AccessControlEntry> groupAcls = getAcl(opts, Collections.singleton(READ));
Set<AccessControlEntry> topicAcls = getAcl(opts, Set.of(READ, DESCRIBE));
Set<AccessControlEntry> groupAcls = getAcl(opts, Set.of(READ));

Map<ResourcePatternFilter, Set<AccessControlEntry>> result = new HashMap<>();
for (ResourcePatternFilter topic : topics) {
Expand Down Expand Up @@ -333,9 +331,9 @@ private static Set<String> getHosts(AclCommandOptions opts, OptionSpec<String> h
if (opts.options.has(hostOptionSpec)) {
return opts.options.valuesOf(hostOptionSpec).stream().map(String::trim).collect(Collectors.toSet());
} else if (opts.options.has(principalOptionSpec)) {
return Collections.singleton(AclEntry.WILDCARD_HOST);
return Set.of(AclEntry.WILDCARD_HOST);
} else {
return Collections.emptySet();
return Set.of();
}
}

Expand All @@ -345,7 +343,7 @@ private static Set<KafkaPrincipal> getPrincipals(AclCommandOptions opts, OptionS
.map(s -> SecurityUtils.parseKafkaPrincipal(s.trim()))
.collect(Collectors.toSet());
} else {
return Collections.emptySet();
return Set.of();
}
}

Expand Down Expand Up @@ -547,7 +545,7 @@ void checkArgs() {
if (!options.has(bootstrapServerOpt) && !options.has(bootstrapControllerOpt)) {
CommandLineUtils.printUsageAndExit(parser, "One of --bootstrap-server or --bootstrap-controller must be specified");
}
List<AbstractOptionSpec<?>> mutuallyExclusiveOptions = Arrays.asList(addOpt, removeOpt, listOpt);
List<AbstractOptionSpec<?>> mutuallyExclusiveOptions = List.of(addOpt, removeOpt, listOpt);
long mutuallyExclusiveOptionsCount = mutuallyExclusiveOptions.stream()
.filter(abstractOptionSpec -> options.has(abstractOptionSpec))
.count();
Expand Down Expand Up @@ -592,10 +590,10 @@ public PatternType convert(String value) {

@Override
public String valuePattern() {
List<PatternType> values = Arrays.asList(PatternType.values());
List<PatternType> values = List.of(PatternType.values());
List<PatternType> filteredValues = values.stream()
.filter(type -> type != PatternType.UNKNOWN)
.collect(Collectors.toList());
.toList();
return filteredValues.stream()
.map(Object::toString)
.collect(Collectors.joining("|"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,14 @@
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

Expand Down Expand Up @@ -291,13 +291,13 @@ void testAdminClient() throws Throwable {
tryFeature("createTopics", testConfig.createTopicsSupported,
() -> {
try {
client.createTopics(Collections.singleton(
client.createTopics(Set.of(
new NewTopic("newtopic", 1, (short) 1))).all().get();
} catch (ExecutionException e) {
throw e.getCause();
}
},
() -> createTopicsResultTest(client, Collections.singleton("newtopic"))
() -> createTopicsResultTest(client, Set.of("newtopic"))
);

while (true) {
Expand Down Expand Up @@ -337,7 +337,7 @@ private void testDescribeConfigsMethod(final Admin client) throws Throwable {
);

Map<ConfigResource, Config> brokerConfig =
client.describeConfigs(Collections.singleton(configResource)).all().get();
client.describeConfigs(Set.of(configResource)).all().get();

if (brokerConfig.get(configResource).entries().isEmpty()) {
throw new KafkaException("Expected to see config entries, but got zero entries");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
Expand Down Expand Up @@ -91,11 +91,7 @@ static void execute(String... args) throws Exception {
}
} catch (ExecutionException e) {
Throwable cause = e.getCause();
if (cause != null) {
printException(cause);
} else {
printException(e);
}
printException(Objects.requireNonNullElse(cause, e));
exitCode = 1;
} catch (Throwable t) {
printException(t);
Expand Down Expand Up @@ -130,8 +126,8 @@ public void alterClientMetrics(ClientMetricsCommandOptions opts) throws Exceptio
Collection<AlterConfigOp> alterEntries = configsToBeSet.entrySet().stream()
.map(entry -> new AlterConfigOp(new ConfigEntry(entry.getKey(), entry.getValue()),
entry.getValue().isEmpty() ? AlterConfigOp.OpType.DELETE : AlterConfigOp.OpType.SET))
.collect(Collectors.toList());
adminClient.incrementalAlterConfigs(Collections.singletonMap(configResource, alterEntries), alterOptions).all()
.toList();
adminClient.incrementalAlterConfigs(Map.of(configResource, alterEntries), alterOptions).all()
.get(30, TimeUnit.SECONDS);

System.out.println("Altered client metrics config for " + entityName + ".");
Expand All @@ -144,8 +140,8 @@ public void deleteClientMetrics(ClientMetricsCommandOptions opts) throws Excepti
ConfigResource configResource = new ConfigResource(ConfigResource.Type.CLIENT_METRICS, entityName);
AlterConfigsOptions alterOptions = new AlterConfigsOptions().timeoutMs(30000).validateOnly(false);
Collection<AlterConfigOp> alterEntries = oldConfigs.stream()
.map(entry -> new AlterConfigOp(entry, AlterConfigOp.OpType.DELETE)).collect(Collectors.toList());
adminClient.incrementalAlterConfigs(Collections.singletonMap(configResource, alterEntries), alterOptions)
.map(entry -> new AlterConfigOp(entry, AlterConfigOp.OpType.DELETE)).toList();
adminClient.incrementalAlterConfigs(Map.of(configResource, alterEntries), alterOptions)
.all().get(30, TimeUnit.SECONDS);

System.out.println("Deleted client metrics config for " + entityName + ".");
Expand All @@ -162,7 +158,7 @@ public void describeClientMetrics(ClientMetricsCommandOptions opts) throws Excep
System.out.println("The client metric resource " + entityNameOpt.get() + " doesn't exist and doesn't have dynamic config.");
return;
}
entities = Collections.singletonList(entityNameOpt.get());
entities = List.of(entityNameOpt.get());
} else {
Collection<ConfigResource> resources = adminClient
.listConfigResources(Set.of(ConfigResource.Type.CLIENT_METRICS), new ListConfigResourcesOptions())
Expand All @@ -189,7 +185,7 @@ public void listClientMetrics() throws Exception {

private Collection<ConfigEntry> getClientMetricsConfig(String entityName) throws Exception {
ConfigResource configResource = new ConfigResource(ConfigResource.Type.CLIENT_METRICS, entityName);
Map<ConfigResource, Config> result = adminClient.describeConfigs(Collections.singleton(configResource))
Map<ConfigResource, Config> result = adminClient.describeConfigs(Set.of(configResource))
.all().get(30, TimeUnit.SECONDS);
return result.get(configResource).entries();
}
Expand Down
6 changes: 3 additions & 3 deletions tools/src/main/java/org/apache/kafka/tools/ClusterTool.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@
import net.sourceforge.argparse4j.inf.Subparsers;

import java.io.PrintStream;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -74,7 +74,7 @@ static void execute(String... args) throws Exception {
.help("Unregister a broker.");
Subparser listEndpoints = subparsers.addParser("list-endpoints")
.help("List endpoints");
for (Subparser subpparser : Arrays.asList(clusterIdParser, unregisterParser, listEndpoints)) {
for (Subparser subpparser : List.of(clusterIdParser, unregisterParser, listEndpoints)) {
MutuallyExclusiveGroup connectionOptions = subpparser.addMutuallyExclusiveGroup().required(true);
connectionOptions.addArgument("--bootstrap-server", "-b")
.action(store())
Expand Down Expand Up @@ -162,7 +162,7 @@ static void listEndpoints(PrintStream stream, Admin adminClient, boolean listCon
Collection<Node> nodes = adminClient.describeCluster(option).nodes().get();

String maxHostLength = String.valueOf(nodes.stream().map(node -> node.host().length()).max(Integer::compareTo).orElse(100));
String maxRackLength = String.valueOf(nodes.stream().filter(node -> node.hasRack()).map(node -> node.rack().length()).max(Integer::compareTo).orElse(10));
String maxRackLength = String.valueOf(nodes.stream().filter(Node::hasRack).map(node -> node.rack().length()).max(Integer::compareTo).orElse(10));

if (listControllerEndpoints) {
String format = "%-10s %-" + maxHostLength + "s %-10s %-" + maxRackLength + "s %-15s%n";
Expand Down
28 changes: 12 additions & 16 deletions tools/src/main/java/org/apache/kafka/tools/ConnectPluginPath.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,6 @@
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -149,14 +147,12 @@ private static Config parseConfig(ArgumentParser parser, Namespace namespace, Pr
if (subcommand == null) {
throw new ArgumentParserException("No subcommand specified", parser);
}
switch (subcommand) {
case "list":
return new Config(Command.LIST, locations, false, false, out, err);
case "sync-manifests":
return new Config(Command.SYNC_MANIFESTS, locations, namespace.getBoolean("dry_run"), namespace.getBoolean("keep_not_found"), out, err);
default:
throw new ArgumentParserException("Unrecognized subcommand: '" + subcommand + "'", parser);
}
return switch (subcommand) {
case "list" -> new Config(Command.LIST, locations, false, false, out, err);
case "sync-manifests" ->
new Config(Command.SYNC_MANIFESTS, locations, namespace.getBoolean("dry_run"), namespace.getBoolean("keep_not_found"), out, err);
default -> throw new ArgumentParserException("Unrecognized subcommand: '" + subcommand + "'", parser);
};
}

private static Set<Path> parseLocations(ArgumentParser parser, Namespace namespace) throws ArgumentParserException, TerseException {
Expand Down Expand Up @@ -197,7 +193,7 @@ private static Set<Path> parseLocations(ArgumentParser parser, Namespace namespa
}

enum Command {
LIST, SYNC_MANIFESTS;
LIST, SYNC_MANIFESTS
}

private static class Config {
Expand Down Expand Up @@ -326,11 +322,11 @@ private static Set<Row> enumerateRows(ManifestWorkspace.SourceWorkspace<?> works
rowAliases.add(PluginUtils.prunedName(pluginDesc));
rows.add(newRow(workspace, pluginDesc.className(), new ArrayList<>(rowAliases), pluginDesc.type(), pluginDesc.version(), true));
// If a corresponding manifest exists, mark it as loadable by removing it from the map.
nonLoadableManifests.getOrDefault(pluginDesc.className(), Collections.emptySet()).remove(pluginDesc.type());
nonLoadableManifests.getOrDefault(pluginDesc.className(), new HashSet<>()).remove(pluginDesc.type());
});
nonLoadableManifests.forEach((className, types) -> types.forEach(type -> {
// All manifests which remain in the map are not loadable
rows.add(newRow(workspace, className, Collections.emptyList(), type, PluginDesc.UNDEFINED_VERSION, false));
rows.add(newRow(workspace, className, List.of(), type, PluginDesc.UNDEFINED_VERSION, false));
}));
return rows;
}
Expand Down Expand Up @@ -436,8 +432,8 @@ private static void listTablePrint(Config config, Object... args) {
}

private static PluginScanResult discoverPlugins(PluginSource source, ReflectionScanner reflectionScanner, ServiceLoaderScanner serviceLoaderScanner) {
PluginScanResult serviceLoadResult = serviceLoaderScanner.discoverPlugins(Collections.singleton(source));
PluginScanResult reflectiveResult = reflectionScanner.discoverPlugins(Collections.singleton(source));
return new PluginScanResult(Arrays.asList(serviceLoadResult, reflectiveResult));
PluginScanResult serviceLoadResult = serviceLoaderScanner.discoverPlugins(Set.of(source));
PluginScanResult reflectiveResult = reflectionScanner.discoverPlugins(Set.of(source));
return new PluginScanResult(List.of(serviceLoadResult, reflectiveResult));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
Expand Down Expand Up @@ -221,8 +220,8 @@ private static void printExtendedProgress(long bytesRead,
}

public static class ConsumerPerfRebListener implements ConsumerRebalanceListener {
private AtomicLong joinTimeMs;
private AtomicLong joinTimeMsInSingleRound;
private final AtomicLong joinTimeMs;
private final AtomicLong joinTimeMsInSingleRound;
private long joinStartMs;

public ConsumerPerfRebListener(AtomicLong joinTimeMs, long joinStartMs, AtomicLong joinTimeMsInSingleRound) {
Expand Down Expand Up @@ -355,7 +354,7 @@ public Properties props() throws IOException {
}

public Set<String> topic() {
return Collections.singleton(options.valueOf(topicOpt));
return Set.of(options.valueOf(topicOpt));
}

public long numMessages() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.Set;
Expand Down Expand Up @@ -106,7 +105,7 @@ public static DelegationToken createToken(Admin adminClient, DelegationTokenComm
CreateDelegationTokenResult createResult = adminClient.createDelegationToken(createDelegationTokenOptions);
DelegationToken token = createResult.delegationToken().get();
System.out.println("Created delegation token with tokenId : " + token.tokenInfo().tokenId());
printToken(Collections.singletonList(token));
printToken(List.of(token));

return token;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -90,7 +90,7 @@ static void execute(String... args) throws Exception {
int messageSizeBytes = Integer.parseInt(args[4]);
Optional<String> propertiesFile = (args.length > 5 && !Utils.isBlank(args[5])) ? Optional.of(args[5]) : Optional.empty();

if (!Arrays.asList("1", "all").contains(acks)) {
if (!List.of("1", "all").contains(acks)) {
throw new IllegalArgumentException("Latency testing requires synchronous acknowledgement. Please use 1 or all");
}

Expand Down Expand Up @@ -186,7 +186,7 @@ private static void createTopic(Optional<String> propertiesFile, String brokers,
Admin adminClient = Admin.create(adminProps);
NewTopic newTopic = new NewTopic(topic, DEFAULT_NUM_PARTITIONS, DEFAULT_REPLICATION_FACTOR);
try {
adminClient.createTopics(Collections.singleton(newTopic)).all().get();
adminClient.createTopics(Set.of(newTopic)).all().get();
} catch (ExecutionException | InterruptedException e) {
System.out.printf("Creation of topic %s failed%n", topic);
throw new RuntimeException(e);
Expand Down
Loading