diff --git a/tools/src/main/java/org/apache/kafka/tools/AclCommand.java b/tools/src/main/java/org/apache/kafka/tools/AclCommand.java
index b6c0175331dc0..791397b24052f 100644
--- a/tools/src/main/java/org/apache/kafka/tools/AclCommand.java
+++ b/tools/src/main/java/org/apache/kafka/tools/AclCommand.java
@@ -40,9 +40,7 @@
 import java.io.IOException;
 import java.util.AbstractMap;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -177,7 +175,7 @@ private static void printResourceAcls(Map acls, ResourcePatternFilter filter) throws ExecutionException, InterruptedException {
         if (acls.isEmpty()) {
-            adminClient.deleteAcls(Collections.singletonList(new AclBindingFilter(filter, AccessControlEntryFilter.ANY))).all().get();
+            adminClient.deleteAcls(List.of(new AclBindingFilter(filter, AccessControlEntryFilter.ANY))).all().get();
         } else {
             List aclBindingFilters = acls.stream().map(acl -> new AclBindingFilter(filter, acl.toFilter())).collect(Collectors.toList());
             adminClient.deleteAcls(aclBindingFilters).all().get();
@@ -249,8 +247,8 @@ private static Map> getProducerRe
         Set transactionalIds = filters.stream().filter(f -> f.resourceType() == ResourceType.TRANSACTIONAL_ID).collect(Collectors.toSet());
         boolean enableIdempotence = opts.options.has(opts.idempotentOpt);
-        Set topicAcls = getAcl(opts, new HashSet<>(Arrays.asList(WRITE, DESCRIBE, CREATE)));
-        Set transactionalIdAcls = getAcl(opts, new HashSet<>(Arrays.asList(WRITE, DESCRIBE)));
+        Set topicAcls = getAcl(opts, Set.of(WRITE, DESCRIBE, CREATE));
+        Set transactionalIdAcls = getAcl(opts, Set.of(WRITE, DESCRIBE));
         //Write, Describe, Create permission on topics, Write, Describe on transactionalIds
         Map> result = new HashMap<>();
@@ -261,7 +259,7 @@
             result.put(transactionalId, transactionalIdAcls);
         }
         if (enableIdempotence) {
-            result.put(CLUSTER_RESOURCE_FILTER, getAcl(opts, Collections.singleton(IDEMPOTENT_WRITE)));
+            result.put(CLUSTER_RESOURCE_FILTER, getAcl(opts, Set.of(IDEMPOTENT_WRITE)));
         }
         return result;
     }
@@ -272,8 +270,8 @@ private static Map> getConsumerRe
         Set groups = filters.stream().filter(f -> f.resourceType() == ResourceType.GROUP).collect(Collectors.toSet());
         //Read, Describe on topic, Read on consumerGroup
-        Set topicAcls = getAcl(opts, new HashSet<>(Arrays.asList(READ, DESCRIBE)));
-        Set groupAcls = getAcl(opts, Collections.singleton(READ));
+        Set topicAcls = getAcl(opts, Set.of(READ, DESCRIBE));
+        Set groupAcls = getAcl(opts, Set.of(READ));
         Map> result = new HashMap<>();
         for (ResourcePatternFilter topic : topics) {
@@ -333,9 +331,9 @@ private static Set getHosts(AclCommandOptions opts, OptionSpec h
         if (opts.options.has(hostOptionSpec)) {
             return opts.options.valuesOf(hostOptionSpec).stream().map(String::trim).collect(Collectors.toSet());
         } else if (opts.options.has(principalOptionSpec)) {
-            return Collections.singleton(AclEntry.WILDCARD_HOST);
+            return Set.of(AclEntry.WILDCARD_HOST);
         } else {
-            return Collections.emptySet();
+            return Set.of();
         }
     }
@@ -345,7 +343,7 @@ private static Set getPrincipals(AclCommandOptions opts, OptionS
             .map(s -> SecurityUtils.parseKafkaPrincipal(s.trim()))
             .collect(Collectors.toSet());
         } else {
-            return Collections.emptySet();
+            return Set.of();
         }
     }
@@ -547,7 +545,7 @@ void checkArgs() {
         if (!options.has(bootstrapServerOpt) && !options.has(bootstrapControllerOpt)) {
             CommandLineUtils.printUsageAndExit(parser, "One of --bootstrap-server or --bootstrap-controller must be specified");
         }
-        List> mutuallyExclusiveOptions = Arrays.asList(addOpt, removeOpt, listOpt);
+        List> mutuallyExclusiveOptions = List.of(addOpt, removeOpt, listOpt);
         long mutuallyExclusiveOptionsCount = mutuallyExclusiveOptions.stream()
             .filter(abstractOptionSpec -> options.has(abstractOptionSpec))
             .count();
@@ -592,10 +590,10 @@ public PatternType convert(String value) {
     @Override
     public String valuePattern() {
-        List values = Arrays.asList(PatternType.values());
+        List values = List.of(PatternType.values());
         List filteredValues = values.stream()
             .filter(type -> type != PatternType.UNKNOWN)
-            .collect(Collectors.toList());
+            .toList();
         return filteredValues.stream()
             .map(Object::toString)
             .collect(Collectors.joining("|"));
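A note for reviewers on this file and the rest of the patch: the `List.of`/`Set.of`/`Map.of` factories are not drop-in replacements for the `Collections`/`Arrays` helpers in every situation. They reject `null` elements, `Set.of` rejects duplicates at creation time, and the returned collections are unmodifiable. A minimal standalone sketch of the differences (not part of the patch):

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;

public class FactorySemantics {
    public static void main(String[] args) {
        List<String> fixed = Arrays.asList("a", "b");
        fixed.set(0, "z");                  // allowed: Arrays.asList is a mutable fixed-size view

        List<String> immutable = List.of("a", "b");
        // immutable.set(0, "z");           // would throw UnsupportedOperationException

        // List.of((String) null) throws NullPointerException, while
        // Collections.singletonList(null) was legal.
        // Set.of("a", "a") throws IllegalArgumentException (duplicate element),
        // while new HashSet<>(Arrays.asList("a", "a")) silently deduplicates.
        System.out.println(fixed + " " + immutable + " " + Set.of("a", "b"));
    }
}
```

None of the call sites above pass nulls or duplicates, so the substitutions are behavior-preserving there.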
diff --git a/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java b/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java
index b4f189cfa38bc..73c01b3ea1cf8 100644
--- a/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java
+++ b/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java
@@ -59,7 +59,6 @@
 import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -67,6 +66,7 @@
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Properties;
+import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
@@ -291,13 +291,13 @@ void testAdminClient() throws Throwable {
         tryFeature("createTopics", testConfig.createTopicsSupported,
             () -> {
                 try {
-                    client.createTopics(Collections.singleton(
+                    client.createTopics(Set.of(
                         new NewTopic("newtopic", 1, (short) 1))).all().get();
                 } catch (ExecutionException e) {
                     throw e.getCause();
                 }
             },
-            () -> createTopicsResultTest(client, Collections.singleton("newtopic"))
+            () -> createTopicsResultTest(client, Set.of("newtopic"))
         );
         while (true) {
@@ -337,7 +337,7 @@ private void testDescribeConfigsMethod(final Admin client) throws Throwable {
         );
         Map brokerConfig =
-            client.describeConfigs(Collections.singleton(configResource)).all().get();
+            client.describeConfigs(Set.of(configResource)).all().get();
         if (brokerConfig.get(configResource).entries().isEmpty()) {
             throw new KafkaException("Expected to see config entries, but got zero entries");
diff --git a/tools/src/main/java/org/apache/kafka/tools/ClientMetricsCommand.java b/tools/src/main/java/org/apache/kafka/tools/ClientMetricsCommand.java
index 6bd0f29d33dbe..6aec33b593857 100644
--- a/tools/src/main/java/org/apache/kafka/tools/ClientMetricsCommand.java
+++ b/tools/src/main/java/org/apache/kafka/tools/ClientMetricsCommand.java
@@ -37,10 +37,10 @@
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
@@ -91,11 +91,7 @@ static void execute(String... args) throws Exception {
             }
         } catch (ExecutionException e) {
             Throwable cause = e.getCause();
-            if (cause != null) {
-                printException(cause);
-            } else {
-                printException(e);
-            }
+            printException(Objects.requireNonNullElse(cause, e));
             exitCode = 1;
         } catch (Throwable t) {
             printException(t);
@@ -130,8 +126,8 @@ public void alterClientMetrics(ClientMetricsCommandOptions opts) throws Exceptio
         Collection alterEntries = configsToBeSet.entrySet().stream()
             .map(entry -> new AlterConfigOp(new ConfigEntry(entry.getKey(), entry.getValue()),
                 entry.getValue().isEmpty() ? AlterConfigOp.OpType.DELETE : AlterConfigOp.OpType.SET))
-            .collect(Collectors.toList());
-        adminClient.incrementalAlterConfigs(Collections.singletonMap(configResource, alterEntries), alterOptions).all()
+            .toList();
+        adminClient.incrementalAlterConfigs(Map.of(configResource, alterEntries), alterOptions).all()
             .get(30, TimeUnit.SECONDS);
         System.out.println("Altered client metrics config for " + entityName + ".");
@@ -144,8 +140,8 @@ public void deleteClientMetrics(ClientMetricsCommandOptions opts) throws Exceptio
         ConfigResource configResource = new ConfigResource(ConfigResource.Type.CLIENT_METRICS, entityName);
         AlterConfigsOptions alterOptions = new AlterConfigsOptions().timeoutMs(30000).validateOnly(false);
         Collection alterEntries = oldConfigs.stream()
-            .map(entry -> new AlterConfigOp(entry, AlterConfigOp.OpType.DELETE)).collect(Collectors.toList());
-        adminClient.incrementalAlterConfigs(Collections.singletonMap(configResource, alterEntries), alterOptions)
+            .map(entry -> new AlterConfigOp(entry, AlterConfigOp.OpType.DELETE)).toList();
+        adminClient.incrementalAlterConfigs(Map.of(configResource, alterEntries), alterOptions)
             .all().get(30, TimeUnit.SECONDS);
         System.out.println("Deleted client metrics config for " + entityName + ".");
@@ -162,7 +158,7 @@ public void describeClientMetrics(ClientMetricsCommandOptions opts) throws Excep
                 System.out.println("The client metric resource " + entityNameOpt.get() + " doesn't exist and doesn't have dynamic config.");
                 return;
             }
-            entities = Collections.singletonList(entityNameOpt.get());
+            entities = List.of(entityNameOpt.get());
         } else {
             Collection resources = adminClient
                 .listConfigResources(Set.of(ConfigResource.Type.CLIENT_METRICS), new ListConfigResourcesOptions())
@@ -189,7 +185,7 @@ public void listClientMetrics() throws Exception {
     private Collection getClientMetricsConfig(String entityName) throws Exception {
         ConfigResource configResource = new ConfigResource(ConfigResource.Type.CLIENT_METRICS, entityName);
-        Map result = adminClient.describeConfigs(Collections.singleton(configResource))
+        Map result = adminClient.describeConfigs(Set.of(configResource))
             .all().get(30, TimeUnit.SECONDS);
         return result.get(configResource).entries();
     }
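The `Objects.requireNonNullElse(cause, e)` pattern used here (and in GroupsCommand and TopicCommand below) collapses the old four-line null check for unwrapping an `ExecutionException`. A small standalone sketch of the idiom:

```java
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class UnwrapExample {
    public static void main(String[] args) throws InterruptedException {
        CompletableFuture<Void> future = CompletableFuture.failedFuture(new IllegalStateException("boom"));
        try {
            future.get();
        } catch (ExecutionException e) {
            // requireNonNullElse(a, b) returns a if non-null, otherwise b,
            // so we fall back to the wrapper when getCause() is null.
            Throwable cause = Objects.requireNonNullElse(e.getCause(), e);
            System.err.println(cause.getMessage()); // prints "boom"
        }
    }
}
```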
diff --git a/tools/src/main/java/org/apache/kafka/tools/ClusterTool.java b/tools/src/main/java/org/apache/kafka/tools/ClusterTool.java
index ccffaeae0f1d0..3048179a4bcdb 100644
--- a/tools/src/main/java/org/apache/kafka/tools/ClusterTool.java
+++ b/tools/src/main/java/org/apache/kafka/tools/ClusterTool.java
@@ -32,8 +32,8 @@
 import net.sourceforge.argparse4j.inf.Subparsers;
 import java.io.PrintStream;
-import java.util.Arrays;
 import java.util.Collection;
+import java.util.List;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.concurrent.ExecutionException;
@@ -74,7 +74,7 @@ static void execute(String... args) throws Exception {
             .help("Unregister a broker.");
         Subparser listEndpoints = subparsers.addParser("list-endpoints")
             .help("List endpoints");
-        for (Subparser subpparser : Arrays.asList(clusterIdParser, unregisterParser, listEndpoints)) {
+        for (Subparser subpparser : List.of(clusterIdParser, unregisterParser, listEndpoints)) {
             MutuallyExclusiveGroup connectionOptions = subpparser.addMutuallyExclusiveGroup().required(true);
             connectionOptions.addArgument("--bootstrap-server", "-b")
                 .action(store())
@@ -162,7 +162,7 @@ static void listEndpoints(PrintStream stream, Admin adminClient, boolean listCon
         Collection nodes = adminClient.describeCluster(option).nodes().get();
         String maxHostLength = String.valueOf(nodes.stream().map(node -> node.host().length()).max(Integer::compareTo).orElse(100));
-        String maxRackLength = String.valueOf(nodes.stream().filter(node -> node.hasRack()).map(node -> node.rack().length()).max(Integer::compareTo).orElse(10));
+        String maxRackLength = String.valueOf(nodes.stream().filter(Node::hasRack).map(node -> node.rack().length()).max(Integer::compareTo).orElse(10));
         if (listControllerEndpoints) {
             String format = "%-10s %-" + maxHostLength + "s %-10s %-" + maxRackLength + "s %-15s%n";
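The `listEndpoints` context is a nice illustration of deriving printf column widths from the data and splicing them into the format string. A reduced sketch of the same technique (the host values and fallback width here are illustrative, not from the patch):

```java
import java.util.List;

public class ColumnFormat {
    public static void main(String[] args) {
        List<String> hosts = List.of("broker-1.example", "b2", "a-much-longer-hostname");
        // Derive the column width from the longest value, as ClusterTool does for host and rack.
        int width = hosts.stream().mapToInt(String::length).max().orElse(10);
        String format = "%-10s %-" + width + "s%n";
        System.out.printf(format, "ID", "HOST");
        for (int i = 0; i < hosts.size(); i++) {
            System.out.printf(format, i, hosts.get(i));
        }
    }
}
```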
args) throws Exception { .help("Unregister a broker."); Subparser listEndpoints = subparsers.addParser("list-endpoints") .help("List endpoints"); - for (Subparser subpparser : Arrays.asList(clusterIdParser, unregisterParser, listEndpoints)) { + for (Subparser subpparser : List.of(clusterIdParser, unregisterParser, listEndpoints)) { MutuallyExclusiveGroup connectionOptions = subpparser.addMutuallyExclusiveGroup().required(true); connectionOptions.addArgument("--bootstrap-server", "-b") .action(store()) @@ -162,7 +162,7 @@ static void listEndpoints(PrintStream stream, Admin adminClient, boolean listCon Collection nodes = adminClient.describeCluster(option).nodes().get(); String maxHostLength = String.valueOf(nodes.stream().map(node -> node.host().length()).max(Integer::compareTo).orElse(100)); - String maxRackLength = String.valueOf(nodes.stream().filter(node -> node.hasRack()).map(node -> node.rack().length()).max(Integer::compareTo).orElse(10)); + String maxRackLength = String.valueOf(nodes.stream().filter(Node::hasRack).map(node -> node.rack().length()).max(Integer::compareTo).orElse(10)); if (listControllerEndpoints) { String format = "%-10s %-" + maxHostLength + "s %-10s %-" + maxRackLength + "s %-15s%n"; diff --git a/tools/src/main/java/org/apache/kafka/tools/ConnectPluginPath.java b/tools/src/main/java/org/apache/kafka/tools/ConnectPluginPath.java index 428bdf54a9d31..a3543df76c7dc 100644 --- a/tools/src/main/java/org/apache/kafka/tools/ConnectPluginPath.java +++ b/tools/src/main/java/org/apache/kafka/tools/ConnectPluginPath.java @@ -42,7 +42,6 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; @@ -149,14 +148,12 @@ private static Config parseConfig(ArgumentParser parser, Namespace namespace, Pr if (subcommand == null) { throw new ArgumentParserException("No subcommand specified", parser); } - switch (subcommand) { - case "list": - return new Config(Command.LIST, locations, false, false, out, err); - case "sync-manifests": - return new Config(Command.SYNC_MANIFESTS, locations, namespace.getBoolean("dry_run"), namespace.getBoolean("keep_not_found"), out, err); - default: - throw new ArgumentParserException("Unrecognized subcommand: '" + subcommand + "'", parser); - } + return switch (subcommand) { + case "list" -> new Config(Command.LIST, locations, false, false, out, err); + case "sync-manifests" -> + new Config(Command.SYNC_MANIFESTS, locations, namespace.getBoolean("dry_run"), namespace.getBoolean("keep_not_found"), out, err); + default -> throw new ArgumentParserException("Unrecognized subcommand: '" + subcommand + "'", parser); + }; } private static Set parseLocations(ArgumentParser parser, Namespace namespace) throws ArgumentParserException, TerseException { @@ -197,7 +194,7 @@ private static Set parseLocations(ArgumentParser parser, Namespace namespa } enum Command { - LIST, SYNC_MANIFESTS; + LIST, SYNC_MANIFESTS } private static class Config { @@ -326,11 +323,12 @@ private static Set enumerateRows(ManifestWorkspace.SourceWorkspace works rowAliases.add(PluginUtils.prunedName(pluginDesc)); rows.add(newRow(workspace, pluginDesc.className(), new ArrayList<>(rowAliases), pluginDesc.type(), pluginDesc.version(), true)); // If a corresponding manifest exists, mark it as loadable by removing it from the map. + // TODO: The use of Collections here shall be fixed with KAFKA-19524. 
diff --git a/tools/src/main/java/org/apache/kafka/tools/ConsumerPerformance.java b/tools/src/main/java/org/apache/kafka/tools/ConsumerPerformance.java
index 0a5106d0e9c83..0bbc953293fa8 100644
--- a/tools/src/main/java/org/apache/kafka/tools/ConsumerPerformance.java
+++ b/tools/src/main/java/org/apache/kafka/tools/ConsumerPerformance.java
@@ -37,7 +37,6 @@
 import java.text.SimpleDateFormat;
 import java.time.Duration;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Random;
@@ -221,8 +220,8 @@ private static void printExtendedProgress(long bytesRead,
     }
     public static class ConsumerPerfRebListener implements ConsumerRebalanceListener {
-        private AtomicLong joinTimeMs;
-        private AtomicLong joinTimeMsInSingleRound;
+        private final AtomicLong joinTimeMs;
+        private final AtomicLong joinTimeMsInSingleRound;
         private long joinStartMs;
         public ConsumerPerfRebListener(AtomicLong joinTimeMs, long joinStartMs, AtomicLong joinTimeMsInSingleRound) {
@@ -355,7 +354,7 @@ public Properties props() throws IOException {
     }
     public Set topic() {
-        return Collections.singleton(options.valueOf(topicOpt));
+        return Set.of(options.valueOf(topicOpt));
     }
     public long numMessages() {
diff --git a/tools/src/main/java/org/apache/kafka/tools/DelegationTokenCommand.java b/tools/src/main/java/org/apache/kafka/tools/DelegationTokenCommand.java
index 04162041f8eac..dad388fee6b93 100644
--- a/tools/src/main/java/org/apache/kafka/tools/DelegationTokenCommand.java
+++ b/tools/src/main/java/org/apache/kafka/tools/DelegationTokenCommand.java
@@ -38,7 +38,6 @@
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Base64;
-import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
 import java.util.Set;
@@ -106,7 +105,7 @@ public static DelegationToken createToken(Admin adminClient, DelegationTokenComm
         CreateDelegationTokenResult createResult = adminClient.createDelegationToken(createDelegationTokenOptions);
         DelegationToken token = createResult.delegationToken().get();
         System.out.println("Created delegation token with tokenId : " + token.tokenInfo().tokenId());
-        printToken(Collections.singletonList(token));
+        printToken(List.of(token));
         return token;
     }
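Making `ConsumerPerfRebListener`'s counters `final` is a small correctness hardening: the fields are assigned once in the constructor and then mutated only through the `AtomicLong` API. The listener's job is measuring time spent in rebalances; a compressed sketch of that measurement pattern (one plausible shape — the full logic of the class is not shown in this hunk, and this assumes kafka-clients on the classpath):

```java
import java.util.Collection;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

public class JoinTimeListener implements ConsumerRebalanceListener {
    private final AtomicLong joinTimeMs = new AtomicLong(0); // total time spent rebalancing
    private long joinStartMs;

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        joinStartMs = System.currentTimeMillis();            // rebalance begins
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Accumulate the time between revocation and reassignment.
        joinTimeMs.addAndGet(System.currentTimeMillis() - joinStartMs);
    }
}
```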
diff --git a/tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java b/tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java
index 96aed36f37e9d..c6914b4667b5b 100644
--- a/tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java
+++ b/tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java
@@ -34,11 +34,11 @@
 import java.nio.charset.StandardCharsets;
 import java.time.Duration;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.Random;
+import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
@@ -90,7 +90,7 @@ static void execute(String... args) throws Exception {
         int messageSizeBytes = Integer.parseInt(args[4]);
         Optional propertiesFile = (args.length > 5 && !Utils.isBlank(args[5])) ? Optional.of(args[5]) : Optional.empty();
-        if (!Arrays.asList("1", "all").contains(acks)) {
+        if (!List.of("1", "all").contains(acks)) {
             throw new IllegalArgumentException("Latency testing requires synchronous acknowledgement. Please use 1 or all");
         }
@@ -186,7 +186,7 @@ private static void createTopic(Optional propertiesFile, String brokers,
         Admin adminClient = Admin.create(adminProps);
         NewTopic newTopic = new NewTopic(topic, DEFAULT_NUM_PARTITIONS, DEFAULT_REPLICATION_FACTOR);
         try {
-            adminClient.createTopics(Collections.singleton(newTopic)).all().get();
+            adminClient.createTopics(Set.of(newTopic)).all().get();
         } catch (ExecutionException | InterruptedException e) {
             System.out.printf("Creation of topic %s failed%n", topic);
             throw new RuntimeException(e);
diff --git a/tools/src/main/java/org/apache/kafka/tools/FeatureCommand.java b/tools/src/main/java/org/apache/kafka/tools/FeatureCommand.java
index ca0ce8c4ac022..103821cf21df9 100644
--- a/tools/src/main/java/org/apache/kafka/tools/FeatureCommand.java
+++ b/tools/src/main/java/org/apache/kafka/tools/FeatureCommand.java
@@ -39,7 +39,6 @@
 import net.sourceforge.argparse4j.inf.Subparsers;
 import net.sourceforge.argparse4j.internal.HelpScreenException;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -240,7 +239,7 @@ static void handleDescribe(Admin adminClient) throws ExecutionException, Interru
     }
     static String metadataVersionsToString(MetadataVersion first, MetadataVersion last) {
-        List versions = Arrays.asList(MetadataVersion.VERSIONS).subList(first.ordinal(), last.ordinal() + 1);
+        List versions = List.of(MetadataVersion.VERSIONS).subList(first.ordinal(), last.ordinal() + 1);
         return versions.stream()
             .map(String::valueOf)
             .collect(Collectors.joining(", "));
diff --git a/tools/src/main/java/org/apache/kafka/tools/GetOffsetShell.java b/tools/src/main/java/org/apache/kafka/tools/GetOffsetShell.java
index a2dafcdef7ae6..4b46f33608728 100644
--- a/tools/src/main/java/org/apache/kafka/tools/GetOffsetShell.java
+++ b/tools/src/main/java/org/apache/kafka/tools/GetOffsetShell.java
@@ -42,7 +42,6 @@
 import java.io.IOException;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
@@ -312,7 +311,7 @@ static OffsetSpec parseOffsetSpec(String listOffsetsTimestamp) throws TerseExcep
  * PartitionPattern: NUMBER | NUMBER-(NUMBER)? | -NUMBER
  */
 public TopicPartitionFilter createTopicPartitionFilterWithPatternList(String topicPartitions) {
-    List ruleSpecs = Arrays.asList(topicPartitions.split(","));
+    List ruleSpecs = List.of(topicPartitions.split(","));
     List rules = ruleSpecs.stream().map(ruleSpec -> {
         try {
             return parseRuleSpec(ruleSpec);
@@ -338,7 +337,7 @@ private Set createPartitionSet(String partitionsString) throws TerseExc
     Set partitions;
     if (partitionsString == null || partitionsString.isEmpty()) {
-        partitions = Collections.emptySet();
+        partitions = Set.of();
     } else {
         try {
             partitions = Arrays.stream(partitionsString.split(",")).map(Integer::parseInt).collect(Collectors.toSet());
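One subtlety worth flagging in `metadataVersionsToString`: when the argument is an array, `Arrays.asList(array)` wraps it, so later writes to the array show through the list, whereas `List.of(array)` copies the elements up front (and rejects nulls). Since `MetadataVersion.VERSIONS` is never mutated, the change is safe here. Illustration (array contents are illustrative):

```java
import java.util.Arrays;
import java.util.List;

public class ArrayViewVsCopy {
    public static void main(String[] args) {
        String[] versions = {"3.7", "3.8", "3.9"};

        List<String> view = Arrays.asList(versions); // live view over the array
        List<String> copy = List.of(versions);       // immutable copy of the elements

        versions[0] = "4.0";
        System.out.println(view.get(0)); // 4.0 — the view reflects the write
        System.out.println(copy.get(0)); // 3.7 — the copy does not
    }
}
```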
diff --git a/tools/src/main/java/org/apache/kafka/tools/GroupsCommand.java b/tools/src/main/java/org/apache/kafka/tools/GroupsCommand.java
index c79826e70925f..d4b933c9f6c9a 100644
--- a/tools/src/main/java/org/apache/kafka/tools/GroupsCommand.java
+++ b/tools/src/main/java/org/apache/kafka/tools/GroupsCommand.java
@@ -33,6 +33,7 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.concurrent.ExecutionException;
@@ -75,11 +76,7 @@ static void execute(String... args) throws Exception {
             }
         } catch (ExecutionException e) {
             Throwable cause = e.getCause();
-            if (cause != null) {
-                printException(cause);
-            } else {
-                printException(e);
-            }
+            printException(Objects.requireNonNullElse(cause, e));
             exitCode = 1;
         } catch (Throwable t) {
             printException(t);
diff --git a/tools/src/main/java/org/apache/kafka/tools/JmxTool.java b/tools/src/main/java/org/apache/kafka/tools/JmxTool.java
index 56cf8d85ab899..c87b15aadf8c6 100644
--- a/tools/src/main/java/org/apache/kafka/tools/JmxTool.java
+++ b/tools/src/main/java/org/apache/kafka/tools/JmxTool.java
@@ -96,7 +96,7 @@ public static void main(String[] args) {
         while (keepGoing) {
             long start = System.currentTimeMillis();
             Map attributes = queryAttributes(conn, found, attributesInclude);
-            attributes.put("time", dateFormat.isPresent() ? dateFormat.get().format(new Date()) : String.valueOf(System.currentTimeMillis()));
+            attributes.put("time", dateFormat.map(format -> format.format(new Date())).orElseGet(() -> String.valueOf(System.currentTimeMillis())));
             maybePrintDataRows(reportFormat, numExpectedAttributes, keys, attributes);
             if (options.isOneTime()) {
                 keepGoing = false;
@@ -225,7 +225,7 @@ private static Map findNumExpectedAttributes(MBeanServerCon
         AttributeList attributes = conn.getAttributes(objectName, attributesNames(mBeanInfo));
         List expectedAttributes = new ArrayList<>();
         attributes.asList().forEach(attribute -> {
-            if (Arrays.asList(attributesInclude.get()).contains(attribute.getName())) {
+            if (List.of(attributesInclude.get()).contains(attribute.getName())) {
                 expectedAttributes.add(objectName);
             }
         });
@@ -254,10 +254,10 @@ private static Map queryAttributes(MBeanServerConnection conn,
         for (ObjectName objectName : objectNames) {
             MBeanInfo beanInfo = conn.getMBeanInfo(objectName);
             AttributeList attributes = conn.getAttributes(objectName,
-                Arrays.stream(beanInfo.getAttributes()).map(a -> a.getName()).toArray(String[]::new));
+                Arrays.stream(beanInfo.getAttributes()).map(MBeanFeatureInfo::getName).toArray(String[]::new));
             for (Attribute attribute : attributes.asList()) {
                 if (attributesInclude.isPresent()) {
-                    if (Arrays.asList(attributesInclude.get()).contains(attribute.getName())) {
+                    if (List.of(attributesInclude.get()).contains(attribute.getName())) {
                         result.put(String.format("%s:%s", objectName.toString(), attribute.getName()),
                             attribute.getValue());
                     }
@@ -395,7 +395,7 @@ public boolean hasWait() {
     private String parseFormat() {
         String reportFormat = options.valueOf(reportFormatOpt).toLowerCase(Locale.ROOT);
-        return Arrays.asList("properties", "csv", "tsv").contains(reportFormat) ? reportFormat : "original";
+        return List.of("properties", "csv", "tsv").contains(reportFormat) ? reportFormat : "original";
     }
     public boolean hasJmxAuthPropOpt() {
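The JmxTool "time" attribute is a textbook `Optional` refactor: `map` over the present value, `orElseGet` for a lazily computed fallback. Standalone sketch:

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Optional;

public class OptionalFallback {
    static String timestamp(Optional<SimpleDateFormat> dateFormat) {
        // Equivalent to: dateFormat.isPresent() ? dateFormat.get().format(new Date())
        //                                       : String.valueOf(System.currentTimeMillis());
        // orElseGet takes a Supplier, so the fallback is only computed when needed.
        return dateFormat.map(format -> format.format(new Date()))
                         .orElseGet(() -> String.valueOf(System.currentTimeMillis()));
    }

    public static void main(String[] args) {
        System.out.println(timestamp(Optional.of(new SimpleDateFormat("yyyy-MM-dd"))));
        System.out.println(timestamp(Optional.empty()));
    }
}
```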
diff --git a/tools/src/main/java/org/apache/kafka/tools/LeaderElectionCommand.java b/tools/src/main/java/org/apache/kafka/tools/LeaderElectionCommand.java
index 2814034e167aa..2b0f9c4d7b957 100644
--- a/tools/src/main/java/org/apache/kafka/tools/LeaderElectionCommand.java
+++ b/tools/src/main/java/org/apache/kafka/tools/LeaderElectionCommand.java
@@ -40,7 +40,6 @@
 import java.io.IOException;
 import java.time.Duration;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -93,7 +92,7 @@ static void run(Duration timeoutMs, String... args) throws Exception {
         Optional partitionOption = Optional.ofNullable(commandOptions.getPartition());
         final Optional> singleTopicPartition =
             (topicOption.isPresent() && partitionOption.isPresent()) ?
-                Optional.of(Collections.singleton(new TopicPartition(topicOption.get(), partitionOption.get()))) :
+                Optional.of(Set.of(new TopicPartition(topicOption.get(), partitionOption.get()))) :
                 Optional.empty();
         /* Note: No need to look at --all-topic-partitions as we want this to be null if it is used. */
@@ -346,7 +345,7 @@ public void validate() {
         }
         // One and only one is required: --topic, --all-topic-partitions or --path-to-json-file
-        List> mutuallyExclusiveOptions = Arrays.asList(
+        List> mutuallyExclusiveOptions = List.of(
             topic,
             allTopicPartitions,
             pathToJsonFile
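Both AclCommand and LeaderElectionCommand validate mutually exclusive flags the same way: collect the option specs in a list and count how many the user supplied. The pattern, reduced to plain Java (the flag names and the `supplied` set are illustrative):

```java
import java.util.List;
import java.util.Set;

public class ExclusiveFlags {
    public static void main(String[] args) {
        Set<String> supplied = Set.of("--topic"); // flags the user actually passed
        List<String> mutuallyExclusive = List.of("--topic", "--all-topic-partitions", "--path-to-json-file");

        // Exactly one of the exclusive flags must be present.
        long count = mutuallyExclusive.stream().filter(supplied::contains).count();
        if (count != 1) {
            throw new IllegalArgumentException("Exactly one of " + mutuallyExclusive + " is required");
        }
        System.out.println("ok");
    }
}
```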
diff --git a/tools/src/main/java/org/apache/kafka/tools/ManifestWorkspace.java b/tools/src/main/java/org/apache/kafka/tools/ManifestWorkspace.java
index d600c5df53664..305a91cfdc485 100644
--- a/tools/src/main/java/org/apache/kafka/tools/ManifestWorkspace.java
+++ b/tools/src/main/java/org/apache/kafka/tools/ManifestWorkspace.java
@@ -45,7 +45,6 @@
 import java.nio.file.StandardOpenOption;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.EnumMap;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -83,23 +82,13 @@ public ManifestWorkspace(PrintStream out) {
     }
     public SourceWorkspace forSource(PluginSource source) throws IOException {
-        SourceWorkspace sourceWorkspace;
-        switch (source.type()) {
-            case CLASSPATH:
-                sourceWorkspace = new ClasspathWorkspace(source);
-                break;
-            case MULTI_JAR:
-                sourceWorkspace = new MultiJarWorkspace(source);
-                break;
-            case SINGLE_JAR:
-                sourceWorkspace = new SingleJarWorkspace(source);
-                break;
-            case CLASS_HIERARCHY:
-                sourceWorkspace = new ClassHierarchyWorkspace(source);
-                break;
-            default:
-                throw new IllegalStateException("Unknown source type " + source.type());
-        }
+        SourceWorkspace sourceWorkspace = switch (source.type()) {
+            case CLASSPATH -> new ClasspathWorkspace(source);
+            case MULTI_JAR -> new MultiJarWorkspace(source);
+            case SINGLE_JAR -> new SingleJarWorkspace(source);
+            case CLASS_HIERARCHY -> new ClassHierarchyWorkspace(source);
+            default -> throw new IllegalStateException("Unknown source type " + source.type());
+        };
         workspaces.add(sourceWorkspace);
         return sourceWorkspace;
     }
@@ -390,7 +379,7 @@ private void rewriteJar(boolean dryRun, Path jarPath, Map {
diff --git a/tools/src/main/java/org/apache/kafka/tools/MetadataQuorumCommand.java b/tools/src/main/java/org/apache/kafka/tools/MetadataQuorumCommand.java
+                if (namespace.getBoolean("status") && namespace.getBoolean("replication")) {
+                    throw new TerseException("Only one of --status or --replication should be specified with describe sub-command");
+                } else if (namespace.getBoolean("replication")) {
+                    boolean humanReadable = Optional.of(namespace.getBoolean("human_readable")).orElse(false);
+                    handleDescribeReplication(admin, humanReadable);
+                } else if (namespace.getBoolean("status")) {
+                    if (namespace.getBoolean("human_readable")) {
+                        throw new TerseException("The option --human-readable is only supported along with --replication");
+                    }
+                    handleDescribeStatus(admin);
+                } else {
+                    throw new TerseException("One of --status or --replication must be specified with describe sub-command");
                 }
-                handleDescribeStatus(admin);
-            } else {
-                throw new TerseException("One of --status or --replication must be specified with describe sub-command");
             }
-        } else if (command.equals("add-controller")) {
-            if (optionalCommandConfig == null) {
-                throw new TerseException("You must supply the configuration file of the controller you are " +
-                    "adding when using add-controller.");
+            case "add-controller" -> {
+                if (optionalCommandConfig == null) {
+                    throw new TerseException("You must supply the configuration file of the controller you are " +
+                        "adding when using add-controller.");
+                }
+                handleAddController(admin,
+                    namespace.getBoolean("dry_run"),
+                    props);
             }
-            handleAddController(admin,
-                namespace.getBoolean("dry_run"),
-                props);
-        } else if (command.equals("remove-controller")) {
-            handleRemoveController(admin,
+            case "remove-controller" -> handleRemoveController(admin,
                 namespace.getInt("controller_id"),
                 namespace.getString("controller_directory_id"),
                 namespace.getBoolean("dry_run"));
-        } else {
-            throw new IllegalStateException(format("Unknown command: %s", command));
+            default -> throw new IllegalStateException(format("Unknown command: %s", command));
         }
     } finally {
         if (admin != null)
@@ -231,7 +231,7 @@ private static List> quorumInfoToRows(QuorumInfo.ReplicaState leade
                 lastFetchTimestamp,
                 lastCaughtUpTimestamp,
                 status
-            ).map(r -> r.toString()).collect(Collectors.toList());
+            ).map(Object::toString).collect(Collectors.toList());
         }).collect(Collectors.toList());
     }
@@ -253,7 +253,7 @@ private static void handleDescribeStatus(Admin admin) throws ExecutionException,
         QuorumInfo quorumInfo = admin.describeMetadataQuorum().quorumInfo().get();
         int leaderId = quorumInfo.leaderId();
         QuorumInfo.ReplicaState leader = quorumInfo.voters().stream().filter(voter -> voter.replicaId() == leaderId).findFirst().get();
-        QuorumInfo.ReplicaState maxLagFollower = quorumInfo.voters().stream().min(Comparator.comparingLong(qi -> qi.logEndOffset())).get();
+        QuorumInfo.ReplicaState maxLagFollower = quorumInfo.voters().stream().min(Comparator.comparingLong(QuorumInfo.ReplicaState::logEndOffset)).get();
         long maxFollowerLag = leader.logEndOffset() - maxLagFollower.logEndOffset();
         long maxFollowerLagTimeMs;
@@ -292,7 +292,7 @@ private static String printReplicaState(QuorumInfo quorumInfo,
         List currentVoterList = replicas.stream().map(voter -> new Node(
             voter.replicaId(),
             voter.replicaDirectoryId(),
-            getEndpoints(quorumInfo.nodes().get(voter.replicaId())))).collect(Collectors.toList());
+            getEndpoints(quorumInfo.nodes().get(voter.replicaId())))).toList();
         return currentVoterList.stream().map(Objects::toString).collect(Collectors.joining(", ", "[", "]"));
     }
@@ -378,7 +378,7 @@ static String getMetadataDirectory(Properties props) throws TerseException {
     static Uuid getMetadataDirectoryId(String metadataDirectory) throws Exception {
         MetaPropertiesEnsemble ensemble = new MetaPropertiesEnsemble.Loader().
-            addLogDirs(Collections.singletonList(metadataDirectory)).
+            addLogDirs(List.of(metadataDirectory)).
             addMetadataLogDir(metadataDirectory).
             load();
         MetaProperties metaProperties = ensemble.logDirProps().get(metadataDirectory);
diff --git a/tools/src/main/java/org/apache/kafka/tools/OffsetsUtils.java b/tools/src/main/java/org/apache/kafka/tools/OffsetsUtils.java
index 4cbb3329aa188..c529e4b68201c 100644
--- a/tools/src/main/java/org/apache/kafka/tools/OffsetsUtils.java
+++ b/tools/src/main/java/org/apache/kafka/tools/OffsetsUtils.java
@@ -42,7 +42,6 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -367,7 +366,7 @@ public Map resetFromFile(String groupId) {
         if (resetPlanForGroup == null) {
             printError("No reset plan for group " + groupId + " found", Optional.empty());
-            return Collections.emptyMap();
+            return Map.of();
         }
         Map requestedOffsets = resetPlanForGroup.keySet().stream().collect(Collectors.toMap(
@@ -376,7 +375,7 @@
             return checkOffsetsRange(requestedOffsets).entrySet().stream()
                 .collect(Collectors.toMap(Map.Entry::getKey, e -> new OffsetAndMetadata(e.getValue())));
-        }).orElseGet(Collections::emptyMap);
+        }).orElseGet(Map::of);
     }
     public Map resetToCurrent(Collection partitionsToReset, Map currentCommittedOffsets) {
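In the MetadataQuorumCommand hunk above, `Comparator.comparingLong` plus a method reference replaces the lambda when finding the most-lagged voter: the minimum log-end offset identifies the follower with the most lag, and lag is then computed against the leader. Reduced sketch with a plain record standing in for `QuorumInfo.ReplicaState`:

```java
import java.util.Comparator;
import java.util.List;

public class MaxLag {
    record ReplicaState(int replicaId, long logEndOffset) {}

    public static void main(String[] args) {
        List<ReplicaState> voters = List.of(
            new ReplicaState(1, 100), // leader
            new ReplicaState(2, 97),
            new ReplicaState(3, 92));

        ReplicaState leader = voters.get(0);
        // min by logEndOffset == the follower with the most lag
        ReplicaState maxLagFollower = voters.stream()
            .min(Comparator.comparingLong(ReplicaState::logEndOffset))
            .orElseThrow();
        System.out.println("max follower lag = " + (leader.logEndOffset() - maxLagFollower.logEndOffset()));
    }
}
```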
diff --git a/tools/src/main/java/org/apache/kafka/tools/ReplicaVerificationTool.java b/tools/src/main/java/org/apache/kafka/tools/ReplicaVerificationTool.java
index 67ec1f6c50a9f..c5f1ddcc2d483 100644
--- a/tools/src/main/java/org/apache/kafka/tools/ReplicaVerificationTool.java
+++ b/tools/src/main/java/org/apache/kafka/tools/ReplicaVerificationTool.java
@@ -127,7 +127,7 @@ public static void main(String[] args) {
         List filteredTopicMetadata = topicsMetadata.stream().filter(
             topicMetadata -> options.topicsIncludeFilter().isTopicAllowed(topicMetadata.name(), false)
-        ).collect(Collectors.toList());
+        ).toList();
         if (filteredTopicMetadata.isEmpty()) {
             LOG.error("No topics found. {} if specified, is either filtering out all topics or there is no topic.", options.topicsIncludeOpt);
@@ -196,7 +196,7 @@ public static void main(String[] args) {
                 counter.incrementAndGet()
             );
         })
-        .collect(Collectors.toList());
+        .toList();
         Runtime.getRuntime().addShutdownHook(new Thread(() -> {
             LOG.info("Stopping all fetchers");
diff --git a/tools/src/main/java/org/apache/kafka/tools/ShareConsumerPerformance.java b/tools/src/main/java/org/apache/kafka/tools/ShareConsumerPerformance.java
index b04bf922d041f..9e3f0ff0152f6 100644
--- a/tools/src/main/java/org/apache/kafka/tools/ShareConsumerPerformance.java
+++ b/tools/src/main/java/org/apache/kafka/tools/ShareConsumerPerformance.java
@@ -37,7 +37,6 @@
 import java.text.SimpleDateFormat;
 import java.time.Duration;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -396,7 +395,7 @@ public Properties props() throws IOException {
     }
     public Set topic() {
-        return Collections.singleton(options.valueOf(topicOpt));
+        return Set.of(options.valueOf(topicOpt));
     }
     public long numMessages() {
Skipping seek."); return EXIT_CODE_SUCCESS; } - if (inputTopics.size() != 0) { + if (!inputTopics.isEmpty()) { System.out.println("Reset-offsets for input topics " + inputTopics); } - if (intermediateTopics.size() != 0) { + if (!intermediateTopics.isEmpty()) { System.out.println("Seek-to-end for intermediate topics " + intermediateTopics); } @@ -313,7 +312,7 @@ private int maybeResetInputAndSeekToEndIntermediateTopicOffsets(final Map client, final Set intermediateTopicPartitions) { - if (intermediateTopicPartitions.size() > 0) { + if (!intermediateTopicPartitions.isEmpty()) { System.out.println("Following intermediate topics offsets will be reset to end (for consumer group " + groupId + ")"); for (final TopicPartition topicPartition : intermediateTopicPartitions) { if (allTopics.contains(topicPartition.topic())) { @@ -328,7 +327,7 @@ private void maybeReset(final Consumer client, final Set inputTopicPartitions, final StreamsResetterOptions options) throws IOException, ParseException { - if (inputTopicPartitions.size() > 0) { + if (!inputTopicPartitions.isEmpty()) { System.out.println("Following input topics offsets will be reset to (for consumer group " + options.applicationId() + ")"); if (options.hasToOffset()) { resetOffsetsTo(client, inputTopicPartitions, options.toOffset()); @@ -405,7 +404,7 @@ public void resetToDatetime(final Consumer client, if (partitionOffset.isPresent()) { client.seek(topicPartition, partitionOffset.get()); } else { - client.seekToEnd(Collections.singletonList(topicPartition)); + client.seekToEnd(List.of(topicPartition)); System.out.println("Partition " + topicPartition.partition() + " from topic " + topicPartition.topic() + " is empty, without a committed record. Falling back to latest known offset."); } @@ -508,7 +507,7 @@ private int maybeDeleteInternalTopics(final Admin adminClient, final StreamsRese final List topicsToDelete; if (!specifiedInternalTopics.isEmpty()) { - if (!inferredInternalTopics.containsAll(specifiedInternalTopics)) { + if (!new HashSet<>(inferredInternalTopics).containsAll(specifiedInternalTopics)) { throw new IllegalArgumentException("Invalid topic specified in the " + "--internal-topics option. " + "Ensure that the topics specified are all internal topics. " diff --git a/tools/src/main/java/org/apache/kafka/tools/TopicCommand.java b/tools/src/main/java/org/apache/kafka/tools/TopicCommand.java index 4da7f47e564a2..b44443335008c 100644 --- a/tools/src/main/java/org/apache/kafka/tools/TopicCommand.java +++ b/tools/src/main/java/org/apache/kafka/tools/TopicCommand.java @@ -59,18 +59,19 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; -import java.util.Collections; import java.util.Comparator; import java.util.HashSet; import java.util.LinkedHashMap; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Properties; import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; +import java.util.stream.Stream; import joptsimple.ArgumentAcceptingOptionSpec; import joptsimple.OptionSpec; @@ -112,11 +113,7 @@ static void execute(String... 
diff --git a/tools/src/main/java/org/apache/kafka/tools/TopicCommand.java b/tools/src/main/java/org/apache/kafka/tools/TopicCommand.java
index 4da7f47e564a2..b44443335008c 100644
--- a/tools/src/main/java/org/apache/kafka/tools/TopicCommand.java
+++ b/tools/src/main/java/org/apache/kafka/tools/TopicCommand.java
@@ -59,18 +59,19 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import joptsimple.ArgumentAcceptingOptionSpec;
 import joptsimple.OptionSpec;
@@ -112,11 +113,7 @@ static void execute(String... args) throws Exception {
             }
         } catch (ExecutionException e) {
             Throwable cause = e.getCause();
-            if (cause != null) {
-                printException(cause);
-            } else {
-                printException(e);
-            }
+            printException(Objects.requireNonNullElse(cause, e));
             exitCode = 1;
         } catch (Throwable e) {
             printException(e);
@@ -159,10 +156,10 @@ static Map> parseReplicaAssignment(String replicaAssignme
     @SuppressWarnings("deprecation")
     private static Properties parseTopicConfigsToBeAdded(TopicCommandOptions opts) {
-        List> configsToBeAdded = opts.topicConfig().orElse(Collections.emptyList())
+        List> configsToBeAdded = opts.topicConfig().orElse(List.of())
             .stream()
-            .map(s -> Arrays.asList(s.split("\\s*=\\s*")))
-            .collect(Collectors.toList());
+            .map(s -> List.of(s.split("\\s*=\\s*")))
+            .toList();
         if (!configsToBeAdded.stream().allMatch(config -> config.size() == 2)) {
             throw new IllegalArgumentException("requirement failed: Invalid topic config: all configs to be added must be in the format \"key=val\".");
@@ -256,7 +253,7 @@ public CommandTopicPartition(TopicCommandOptions options) {
             name = options.topic().get();
             partitions = options.partitions();
             replicationFactor = options.replicationFactor();
-            replicaAssignment = options.replicaAssignment().orElse(Collections.emptyMap());
+            replicaAssignment = options.replicaAssignment().orElse(Map.of());
             configsToAdd = parseTopicConfigsToBeAdded(options);
         }
@@ -357,10 +354,10 @@ public void printDescription() {
                 .collect(Collectors.joining(",")));
             if (reassignment != null) {
                 System.out.print("\tAdding Replicas: " + reassignment.addingReplicas().stream()
-                    .map(node -> node.toString())
+                    .map(Object::toString)
                     .collect(Collectors.joining(",")));
                 System.out.print("\tRemoving Replicas: " + reassignment.removingReplicas().stream()
-                    .map(node -> node.toString())
+                    .map(Object::toString)
                     .collect(Collectors.joining(",")));
             }
@@ -443,9 +440,7 @@ public TopicService(Admin admin) {
         }
         private static Admin createAdminClient(Properties commandConfig, Optional bootstrapServer) {
-            if (bootstrapServer.isPresent()) {
-                commandConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer.get());
-            }
+            bootstrapServer.ifPresent(s -> commandConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, s));
             return Admin.create(commandConfig);
         }
@@ -475,10 +470,10 @@ public void createTopic(CommandTopicPartition topic) throws Exception {
             }
             Map configsMap = topic.configsToAdd.stringPropertyNames().stream()
-                .collect(Collectors.toMap(name -> name, name -> topic.configsToAdd.getProperty(name)));
+                .collect(Collectors.toMap(name -> name, topic.configsToAdd::getProperty));
             newTopic.configs(configsMap);
-            CreateTopicsResult createResult = adminClient.createTopics(Collections.singleton(newTopic),
+            CreateTopicsResult createResult = adminClient.createTopics(Set.of(newTopic),
                 new CreateTopicsOptions().retryOnQuotaViolation(false));
             createResult.all().get();
             System.out.println("Created topic " + topic.name + ".");
@@ -493,9 +488,7 @@ public void createTopic(CommandTopicPartition topic) throws Exception {
         }
         public void listTopics(TopicCommandOptions opts) throws ExecutionException, InterruptedException {
-            String results = getTopics(opts.topic(), opts.excludeInternalTopics())
-                .stream()
-                .collect(Collectors.joining("\n"));
+            String results = String.join("\n", getTopics(opts.topic(), opts.excludeInternalTopics()));
             System.out.println(results);
         }
@@ -539,7 +532,7 @@ public Map listAllReassignments(Set
            topics;
            if (useTopicId) {
                topicIds = getTopicIds(inputTopicId.get(), opts.excludeInternalTopics());
-               topics = Collections.emptyList();
+               topics = List.of();
            } else {
-               topicIds = Collections.emptyList();
+               topicIds = List.of();
                topics = getTopics(opts.topic(), opts.excludeInternalTopics());
            }
@@ -588,7 +581,7 @@ public void describeTopic(TopicCommandOptions opts) throws ExecutionException, I
            List topicNames = topicDescriptions.stream()
                .map(org.apache.kafka.clients.admin.TopicDescription::name)
-               .collect(Collectors.toList());
+               .toList();
            Map> allConfigs = adminClient.describeConfigs(
                topicNames.stream()
                    .map(name -> new ConfigResource(ConfigResource.Type.TOPIC, name))
                    .collect(Collectors.toList())
            ).values();
@@ -596,7 +589,7 @@ public void describeTopic(TopicCommandOptions opts) throws ExecutionException, I
            List liveBrokers = adminClient.describeCluster().nodes().get().stream()
                .map(Node::id)
-               .collect(Collectors.toList());
+               .toList();
            DescribeOptions describeOptions = new DescribeOptions(opts, new HashSet<>(liveBrokers));
            Set topicPartitions = topicDescriptions
                .stream()
@@ -647,7 +640,7 @@ numPartitions, getReplicationFactor(firstPartition, reassignment),
        public void deleteTopic(TopicCommandOptions opts) throws ExecutionException, InterruptedException {
            List topics = getTopics(opts.topic(), opts.excludeInternalTopics());
            ensureTopicExists(topics, opts.topic(), !opts.ifExists());
-           adminClient.deleteTopics(Collections.unmodifiableList(topics),
+           adminClient.deleteTopics(List.copyOf(topics),
                new DeleteTopicsOptions().retryOnQuotaViolation(false)
            ).all().get();
        }
@@ -668,10 +661,10 @@ public List getTopicIds(Uuid topicIdIncludeList, boolean excludeInternalTo
            List allTopicIds = allTopics.listings().get().stream()
                .map(TopicListing::topicId)
                .sorted()
-               .collect(Collectors.toList());
+               .toList();
            return allTopicIds.contains(topicIdIncludeList) ?
-               Collections.singletonList(topicIdIncludeList) :
-               Collections.emptyList();
+               List.of(topicIdIncludeList) :
+               List.of();
        }
        @Override
@@ -835,7 +828,7 @@ public Optional valueAsOption(OptionSpec option) {
        }
        public Optional> valuesAsOption(OptionSpec option) {
-           return valuesAsOption(option, Collections.emptyList());
+           return valuesAsOption(option, List.of());
        }
        public Optional valueAsOption(OptionSpec option, Optional defaultValue) {
@@ -953,8 +946,7 @@ public void checkArgs() {
            // should have exactly one action
            long actions =
-               Arrays.asList(createOpt, listOpt, alterOpt, describeOpt, deleteOpt)
-                   .stream().filter(options::has)
+               Stream.of(createOpt, listOpt, alterOpt, describeOpt, deleteOpt).filter(options::has)
                    .count();
            if (actions != 1)
                CommandLineUtils.printUsageAndExit(parser, "Command must include exactly one action: --list, --describe, --create, --alter or --delete");
@@ -989,29 +981,29 @@ private void checkRequiredArgs() {
        private void checkInvalidArgs() {
            // check invalid args
-           CommandLineUtils.checkInvalidArgs(parser, options, configOpt, invalidOptions(Arrays.asList(alterOpt, createOpt)));
-           CommandLineUtils.checkInvalidArgs(parser, options, partitionsOpt, invalidOptions(Arrays.asList(alterOpt, createOpt)));
-           CommandLineUtils.checkInvalidArgs(parser, options, replicationFactorOpt, invalidOptions(Arrays.asList(createOpt)));
-           CommandLineUtils.checkInvalidArgs(parser, options, replicaAssignmentOpt, invalidOptions(Arrays.asList(alterOpt, createOpt)));
+           CommandLineUtils.checkInvalidArgs(parser, options, configOpt, invalidOptions(List.of(alterOpt, createOpt)));
+           CommandLineUtils.checkInvalidArgs(parser, options, partitionsOpt, invalidOptions(List.of(alterOpt, createOpt)));
+           CommandLineUtils.checkInvalidArgs(parser, options, replicationFactorOpt, invalidOptions(List.of(createOpt)));
+           CommandLineUtils.checkInvalidArgs(parser, options, replicaAssignmentOpt, invalidOptions(List.of(alterOpt, createOpt)));
            if (options.has(createOpt)) {
                CommandLineUtils.checkInvalidArgs(parser, options, replicaAssignmentOpt, partitionsOpt, replicationFactorOpt);
            }
            CommandLineUtils.checkInvalidArgs(parser, options, reportUnderReplicatedPartitionsOpt,
-               invalidOptions(Collections.singleton(topicsWithOverridesOpt), Arrays.asList(describeOpt, reportUnderReplicatedPartitionsOpt)));
+               invalidOptions(Set.of(topicsWithOverridesOpt), List.of(describeOpt, reportUnderReplicatedPartitionsOpt)));
            CommandLineUtils.checkInvalidArgs(parser, options, reportUnderMinIsrPartitionsOpt,
-               invalidOptions(Collections.singleton(topicsWithOverridesOpt), Arrays.asList(describeOpt, reportUnderMinIsrPartitionsOpt)));
+               invalidOptions(Set.of(topicsWithOverridesOpt), List.of(describeOpt, reportUnderMinIsrPartitionsOpt)));
            CommandLineUtils.checkInvalidArgs(parser, options, reportAtMinIsrPartitionsOpt,
-               invalidOptions(Collections.singleton(topicsWithOverridesOpt), Arrays.asList(describeOpt, reportAtMinIsrPartitionsOpt)));
+               invalidOptions(Set.of(topicsWithOverridesOpt), List.of(describeOpt, reportAtMinIsrPartitionsOpt)));
            CommandLineUtils.checkInvalidArgs(parser, options, reportUnavailablePartitionsOpt,
-               invalidOptions(Collections.singleton(topicsWithOverridesOpt), Arrays.asList(describeOpt, reportUnavailablePartitionsOpt)));
+               invalidOptions(Set.of(topicsWithOverridesOpt), List.of(describeOpt, reportUnavailablePartitionsOpt)));
            CommandLineUtils.checkInvalidArgs(parser, options, topicsWithOverridesOpt,
-               invalidOptions(new HashSet<>(allReplicationReportOpts), Arrays.asList(describeOpt)));
+               invalidOptions(new HashSet<>(allReplicationReportOpts), List.of(describeOpt)));
            CommandLineUtils.checkInvalidArgs(parser, options, ifExistsOpt,
-               invalidOptions(Arrays.asList(alterOpt, deleteOpt, describeOpt)));
-           CommandLineUtils.checkInvalidArgs(parser, options, ifNotExistsOpt, invalidOptions(Arrays.asList(createOpt)));
-           CommandLineUtils.checkInvalidArgs(parser, options, excludeInternalTopicOpt, invalidOptions(Arrays.asList(listOpt, describeOpt)));
+               invalidOptions(List.of(alterOpt, deleteOpt, describeOpt)));
+           CommandLineUtils.checkInvalidArgs(parser, options, ifNotExistsOpt, invalidOptions(List.of(createOpt)));
+           CommandLineUtils.checkInvalidArgs(parser, options, excludeInternalTopicOpt, invalidOptions(List.of(listOpt, describeOpt)));
        }
        private Set> invalidOptions(List> removeOptions) {
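TopicCommand leans heavily on `Stream.toList()` (Java 16+). Unlike `Collectors.toList()`, which today happens to produce a mutable `ArrayList` (the spec makes no promise), `toList()` returns an unmodifiable list, so any downstream `add`/`remove` on the result needs auditing; `List.copyOf` in `deleteTopic` has the same unmodifiable-snapshot property. Sketch:

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class ToListSemantics {
    public static void main(String[] args) {
        List<String> mutable = Stream.of("a", "b").collect(Collectors.toList());
        mutable.add("c"); // fine: Collectors.toList() currently yields an ArrayList

        List<String> unmodifiable = Stream.of("a", "b").toList();
        // unmodifiable.add("c"); // would throw UnsupportedOperationException

        List<String> copy = List.copyOf(mutable); // unmodifiable snapshot of 'mutable'
        System.out.println(mutable + " " + unmodifiable + " " + copy);
    }
}
```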
diff --git a/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java b/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java
index 7a79f51046c76..2930d490f4f05 100644
--- a/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java
+++ b/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java
@@ -47,17 +47,16 @@
 import java.text.SimpleDateFormat;
 import java.time.Duration;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Random;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
-import static java.util.Collections.singleton;
 import static net.sourceforge.argparse4j.impl.Arguments.store;
 import static net.sourceforge.argparse4j.impl.Arguments.storeTrue;
@@ -240,13 +239,13 @@ private static void resetToLastCommittedPositions(KafkaConsumer
             if (offsetAndMetadata != null)
                 consumer.seek(tp, offsetAndMetadata.offset());
             else
-                consumer.seekToBeginning(singleton(tp));
+                consumer.seekToBeginning(Set.of(tp));
         });
     }
     private static long messagesRemaining(KafkaConsumer consumer, TopicPartition partition) {
         long currentPosition = consumer.position(partition);
-        Map endOffsets = consumer.endOffsets(singleton(partition));
+        Map endOffsets = consumer.endOffsets(Set.of(partition));
         if (endOffsets.containsKey(partition)) {
             return endOffsets.get(partition) - currentPosition;
         }
@@ -319,7 +318,7 @@ public static void runEventLoop(Namespace parsedArgs) {
         final AtomicLong numMessagesProcessedSinceLastRebalance = new AtomicLong(0);
         final AtomicLong totalMessageProcessed = new AtomicLong(0);
         if (groupMode) {
-            consumer.subscribe(Collections.singleton(topicName), new ConsumerRebalanceListener() {
+            consumer.subscribe(Set.of(topicName), new ConsumerRebalanceListener() {
                 @Override
                 public void onPartitionsRevoked(Collection partitions) {
                 }
@@ -341,7 +340,7 @@ public void onPartitionsAssigned(Collection partitions) {
             });
         } else {
             TopicPartition inputPartition = new TopicPartition(topicName, parsedArgs.getInt("inputPartition"));
-            consumer.assign(singleton(inputPartition));
+            consumer.assign(Set.of(inputPartition));
             remainingMessages.set(Math.min(messagesRemaining(consumer, inputPartition), remainingMessages.get()));
         }
diff --git a/tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java b/tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java
index c1dbefcef6045..9c5323104fedd 100644
--- a/tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java
+++ b/tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java
@@ -50,7 +50,6 @@
 import java.io.PrintStream;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -65,8 +64,6 @@
 import java.util.stream.Collectors;
 import static java.util.Arrays.asList;
-import static java.util.Collections.singleton;
-import static java.util.Collections.singletonList;
 import static net.sourceforge.argparse4j.impl.Arguments.store;
 public abstract class TransactionsCommand {
@@ -159,7 +156,7 @@ private AbortTransactionSpec buildAbortSpec(
     ) throws Exception {
         final DescribeProducersResult.PartitionProducerState result;
         try {
-            result = admin.describeProducers(singleton(topicPartition))
+            result = admin.describeProducers(Set.of(topicPartition))
                 .partitionResult(topicPartition)
                 .get();
         } catch (ExecutionException e) {
@@ -345,7 +342,7 @@ public void execute(Admin admin, Namespace ns, PrintStream out) throws Exception
         final DescribeProducersResult.PartitionProducerState result;
         try {
-            result = admin.describeProducers(singleton(topicPartition), options)
+            result = admin.describeProducers(Set.of(topicPartition), options)
                 .partitionResult(topicPartition)
                 .get();
         } catch (ExecutionException e) {
@@ -418,7 +415,7 @@ public void execute(Admin admin, Namespace ns, PrintStream out) throws Exception
         final TransactionDescription result;
         try {
-            result = admin.describeTransactions(singleton(transactionalId))
+            result = admin.describeTransactions(Set.of(transactionalId))
                 .description(transactionalId)
                 .get();
         } catch (ExecutionException e) {
@@ -451,7 +448,7 @@ public void execute(Admin admin, Namespace ns, PrintStream out) throws Exception
             result.topicPartitions().stream().map(TopicPartition::toString).collect(Collectors.joining(","))
         );
-        ToolsUtils.prettyPrintTable(HEADERS, singletonList(row), out);
+        ToolsUtils.prettyPrintTable(HEADERS, List.of(row), out);
     }
 }
@@ -615,7 +612,7 @@ void execute(Admin admin, Namespace ns, PrintStream out) throws Exception {
         );
         if (candidates.isEmpty()) {
-            printHangingTransactions(Collections.emptyList(), out);
+            printHangingTransactions(List.of(), out);
         } else {
             Map> openTransactionsByProducerId = groupByProducerId(candidates);
@@ -649,9 +646,9 @@ private List collectTopicPartitionsToSearch(
         if (topic.isPresent()) {
             if (partition.isPresent()) {
-                return Collections.singletonList(new TopicPartition(topic.get(), partition.get()));
+                return List.of(new TopicPartition(topic.get(), partition.get()));
             } else {
-                topics = Collections.singletonList(topic.get());
+                topics = List.of(topic.get());
             }
         } else {
             topics = listTopics(admin);
         }
@@ -752,7 +749,7 @@ private Map describeTransactions(
         } catch (ExecutionException e) {
             printErrorAndExit("Failed to describe " + transactionalIds.size() + " transactions", e.getCause());
-            return Collections.emptyMap();
+            return Map.of();
         }
     }
@@ -778,7 +775,7 @@ private List listTopics(
             return new ArrayList<>(admin.listTopics(listOptions).names().get());
         } catch (ExecutionException e) {
             printErrorAndExit("Failed to list topics", e.getCause());
-            return Collections.emptyList();
+            return List.of();
         }
     }
@@ -788,14 +785,14 @@ private List findTopicPartitions(
         List topics
     ) throws Exception {
         List topicPartitions = new ArrayList<>();
-        consumeInBatches(topics, MAX_BATCH_SIZE, batch -> {
+        consumeInBatches(topics, MAX_BATCH_SIZE, batch ->
             findTopicPartitions(
                 admin,
                 brokerId,
                 batch,
                 topicPartitions
-            );
-        });
+            )
+        );
         return topicPartitions;
     }
@@ -807,13 +804,13 @@ private void findTopicPartitions(
     ) throws Exception {
         try {
             Map topicDescriptions = admin.describeTopics(topics).allTopicNames().get();
-            topicDescriptions.forEach((topic, description) -> {
+            topicDescriptions.forEach((topic, description) ->
                 description.partitions().forEach(partitionInfo -> {
                     if (brokerId.isEmpty() || hasReplica(brokerId.get(), partitionInfo)) {
                         topicPartitions.add(new TopicPartition(topic, partitionInfo.partition()));
                     }
-                });
-            });
+                })
+            );
         } catch (ExecutionException e) {
             printErrorAndExit("Failed to describe " + topics.size() + " topics", e.getCause());
         }
@@ -838,15 +835,15 @@ private List collectCandidateOpenTransactions(
         List candidateTransactions = new ArrayList<>();
-        consumeInBatches(topicPartitions, MAX_BATCH_SIZE, batch -> {
+        consumeInBatches(topicPartitions, MAX_BATCH_SIZE, batch ->
             collectCandidateOpenTransactions(
                 admin,
                 brokerId,
                 maxTransactionTimeoutMs,
                 batch,
                 candidateTransactions
-            );
-        });
+            )
+        );
         return candidateTransactions;
     }
@@ -880,7 +877,7 @@ private void collectCandidateOpenTransactions(
             long currentTimeMs = time.milliseconds();
-            producersByPartition.forEach((topicPartition, producersStates) -> {
+            producersByPartition.forEach((topicPartition, producersStates) ->
                 producersStates.activeProducers().forEach(activeProducer -> {
                     if (activeProducer.currentTransactionStartOffset().isPresent()) {
                         long transactionDurationMs = currentTimeMs - activeProducer.lastTimestamp();
                         if (transactionDurationMs > maxTransactionTimeoutMs) {
                             candidateTransactions.add(new OpenTransaction(
                                 topicPartition,
                                 activeProducer
                             ));
                         }
                     }
-                });
-            });
+                })
+            );
         } catch (ExecutionException e) {
             printErrorAndExit("Failed to describe producers for " + topicPartitions.size() +
                 " partitions on broker " + brokerId, e.getCause());
         }
@@ -928,7 +925,7 @@ private Map lookupTransactionalIds(
         } catch (ExecutionException e) {
             printErrorAndExit("Failed to list transactions for " + producerIds.size() + " producers", e.getCause());
-            return Collections.emptyMap();
+            return Map.of();
         }
     }
diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
index 0da56f1340b7d..1fa1aed8c4f08 100644
--- a/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
+++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
@@ -58,7 +58,6 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -231,7 +230,7 @@ public void commitSync(Map offsets) {
     public void run() {
         try {
             printJson(new StartupComplete());
-            consumer.subscribe(Collections.singletonList(topic), this);
+            consumer.subscribe(List.of(topic), this);
             while (!isFinished()) {
                 ConsumerRecords records = consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
@@ -623,7 +622,7 @@ public static VerifiableConsumer createFromArgs(ArgumentParser parser, String[]
         boolean useAutoCommit = res.getBoolean("useAutoCommit");
         String configFile = res.getString("consumer.config");
-        String brokerHostandPort = res.getString("bootstrapServer");
+        String brokerHostAndPort = res.getString("bootstrapServer");
         Properties consumerProps = new Properties();
         if (configFile != null) {
@@ -664,7 +663,7 @@ public static VerifiableConsumer createFromArgs(ArgumentParser parser, String[]
             consumerProps.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, groupInstanceId);
         }
-        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerHostandPort);
+        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerHostAndPort);
         consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, useAutoCommit);
         consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, res.getString("resetPolicy"));
diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableShareConsumer.java b/tools/src/main/java/org/apache/kafka/tools/VerifiableShareConsumer.java
index 496c41412ccb7..d83c1e44244a7 100644
--- a/tools/src/main/java/org/apache/kafka/tools/VerifiableShareConsumer.java
+++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableShareConsumer.java
@@ -61,7 +61,6 @@
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Locale;
@@ -421,7 +420,7 @@ public void run() {
             printJson(new OffsetResetStrategySet(offsetResetStrategy.type().toString()));
         }
-        consumer.subscribe(Collections.singleton(this.topic));
+        consumer.subscribe(Set.of(this.topic));
         consumer.setAcknowledgementCommitCallback(this);
         while (!(maxMessages >= 0 && totalAcknowledged >= maxMessages)) {
             ConsumerRecords records = consumer.poll(Duration.ofMillis(5000));
seek(topic.get(), opts.partitionArg().getAsInt(), opts.offsetArg()); } else { - consumer.subscribe(Collections.singletonList(topic.get())); + consumer.subscribe(List.of(topic.get())); } } else { opts.includedTopicsArg().ifPresent(topics -> consumer.subscribe(Pattern.compile(topics))); @@ -169,11 +170,11 @@ ConsumerWrapper(ConsoleConsumerOptions opts, Consumer con private void seek(String topic, int partitionId, long offset) { TopicPartition topicPartition = new TopicPartition(topic, partitionId); - consumer.assign(Collections.singletonList(topicPartition)); + consumer.assign(List.of(topicPartition)); if (offset == ListOffsetsRequest.EARLIEST_TIMESTAMP) { - consumer.seekToBeginning(Collections.singletonList(topicPartition)); + consumer.seekToBeginning(List.of(topicPartition)); } else if (offset == ListOffsetsRequest.LATEST_TIMESTAMP) { - consumer.seekToEnd(Collections.singletonList(topicPartition)); + consumer.seekToEnd(List.of(topicPartition)); } else { consumer.seek(topicPartition, offset); } diff --git a/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptions.java b/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptions.java index 5c6d26f433ef8..cf4daa0c63626 100644 --- a/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptions.java +++ b/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptions.java @@ -26,7 +26,6 @@ import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -186,9 +185,9 @@ public ConsoleConsumerOptions(String[] args) throws IOException { } private void checkRequiredArgs() { - List> topicOrFilterArgs = new ArrayList<>(Arrays.asList(topicArg(), includedTopicsArg())); - topicOrFilterArgs.removeIf(arg -> arg.isEmpty()); - // user need to specify value for either --topic or --include options) + List> topicOrFilterArgs = new ArrayList<>(List.of(topicArg(), includedTopicsArg())); + topicOrFilterArgs.removeIf(Optional::isEmpty); + // the user needs to specify a value for either the --topic or --include option if (topicOrFilterArgs.size() != 1) { CommandLineUtils.printUsageAndExit(parser, "Exactly one of --include/--topic is required. 
"); } diff --git a/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleShareConsumer.java b/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleShareConsumer.java index 904f2333f2fb4..2505266abf741 100644 --- a/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleShareConsumer.java +++ b/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleShareConsumer.java @@ -36,6 +36,7 @@ import java.time.Duration; import java.util.Collections; import java.util.Iterator; +import java.util.List; import java.util.Optional; import java.util.Properties; import java.util.concurrent.CountDownLatch; @@ -166,7 +167,7 @@ public ConsumerWrapper(String topic, this.consumer = consumer; this.timeoutMs = timeoutMs; - consumer.subscribe(Collections.singletonList(topic)); + consumer.subscribe(List.of(topic)); } ConsumerRecord receive() { diff --git a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java index da3ccff926018..9a82020cac8b1 100644 --- a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java +++ b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java @@ -56,7 +56,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; -import java.util.Collections; import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; @@ -119,7 +118,7 @@ static void run(ConsumerGroupCommandOptions opts) { return; } - try (ConsumerGroupService consumerGroupService = new ConsumerGroupService(opts, Collections.emptyMap())) { + try (ConsumerGroupService consumerGroupService = new ConsumerGroupService(opts, Map.of())) { if (opts.options.has(opts.listOpt)) consumerGroupService.listGroups(); else if (opts.options.has(opts.describeOpt)) @@ -240,14 +239,14 @@ void listGroups() throws ExecutionException, InterruptedException { private Set stateValues() { String stateValue = opts.options.valueOf(opts.stateOpt); return (stateValue == null || stateValue.isEmpty()) - ? Collections.emptySet() + ? Set.of() : groupStatesFromString(stateValue); } private Set typeValues() { String typeValue = opts.options.valueOf(opts.typeOpt); return (typeValue == null || typeValue.isEmpty()) - ? Collections.emptySet() + ? 
Set.of() : consumerGroupTypesFromString(typeValue); } @@ -583,7 +582,7 @@ private Collection collectConsumerAssignment( Optional clientIdOpt ) { if (topicPartitions.isEmpty()) { - return Collections.singleton( + return Set.of( new PartitionAssignmentState(group, coordinator, Optional.empty(), Optional.empty(), Optional.empty(), getLag(Optional.empty(), Optional.empty()), consumerIdOpt, hostOpt, clientIdOpt, Optional.empty(), Optional.empty()) ); @@ -682,7 +681,7 @@ Map> resetOffsets() { break; default: printError("Assignments can only be reset if the group '" + groupId + "' is inactive, but the current state is " + state + ".", Optional.empty()); - result.put(groupId, Collections.emptyMap()); + result.put(groupId, Map.of()); } } catch (InterruptedException ie) { throw new RuntimeException(ie); @@ -853,7 +852,7 @@ Map describeConsumerGroups(Collection * Returns the state of the specified consumer group and partition assignment states */ Entry, Optional>> collectGroupOffsets(String groupId) throws Exception { - return collectGroupsOffsets(Collections.singletonList(groupId)).getOrDefault(groupId, new SimpleImmutableEntry<>(Optional.empty(), Optional.empty())); + return collectGroupsOffsets(List.of(groupId)).getOrDefault(groupId, new SimpleImmutableEntry<>(Optional.empty(), Optional.empty())); } /** @@ -897,7 +896,7 @@ TreeMap, Optional, Optional, Optional>> collectGroupMembers(String groupId) throws Exception { - return collectGroupsMembers(Collections.singleton(groupId)).get(groupId); + return collectGroupsMembers(Set.of(groupId)).get(groupId); } TreeMap, Optional>>> collectGroupsMembers(Collection groupIds) throws Exception { @@ -926,7 +925,7 @@ TreeMap, Optional a.topicPartitions().stream().toList()).orElse(Collections.emptyList()), + consumer.targetAssignment().map(a -> a.topicPartitions().stream().toList()).orElse(List.of()), consumer.memberEpoch(), consumerGroup.targetAssignmentEpoch(), consumer.upgraded() @@ -937,7 +936,7 @@ TreeMap, Optional collectGroupsState(Collection groupIds) throws Exception { @@ -984,14 +983,14 @@ private Collection getPartitionsToReset(String groupId) throws E if (!opts.options.has(opts.resetFromFileOpt)) CommandLineUtils.printUsageAndExit(opts.parser, "One of the reset scopes should be defined: --all-topics, --topic."); - return Collections.emptyList(); + return List.of(); } } private Map getCommittedOffsets(String groupId) { try { return adminClient.listConsumerGroupOffsets( - Collections.singletonMap(groupId, new ListConsumerGroupOffsetsSpec()), + Map.of(groupId, new ListConsumerGroupOffsetsSpec()), withTimeoutMs(new ListConsumerGroupOffsetsOptions()) ).partitionsToOffsetAndMetadata(groupId).get(); } catch (InterruptedException | ExecutionException e) { diff --git a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandOptions.java b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandOptions.java index 553ea3acb8a91..30ba137cd8d66 100644 --- a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandOptions.java +++ b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandOptions.java @@ -22,8 +22,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Arrays; -import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.stream.Collectors; @@ -203,11 +201,11 @@ private ConsumerGroupCommandOptions(String[] args) { .describedAs("regex") .ofType(String.class); - allGroupSelectionScopeOpts = new 
HashSet<>(Arrays.asList(groupOpt, allGroupsOpt)); - allConsumerGroupLevelOpts = new HashSet<>(Arrays.asList(listOpt, describeOpt, deleteOpt, resetOffsetsOpt)); - allResetOffsetScenarioOpts = new HashSet<>(Arrays.asList(resetToOffsetOpt, resetShiftByOpt, - resetToDatetimeOpt, resetByDurationOpt, resetToEarliestOpt, resetToLatestOpt, resetToCurrentOpt, resetFromFileOpt)); - allDeleteOffsetsOpts = new HashSet<>(Arrays.asList(groupOpt, topicOpt)); + allGroupSelectionScopeOpts = Set.of(groupOpt, allGroupsOpt); + allConsumerGroupLevelOpts = Set.of(listOpt, describeOpt, deleteOpt, resetOffsetsOpt); + allResetOffsetScenarioOpts = Set.of(resetToOffsetOpt, resetShiftByOpt, + resetToDatetimeOpt, resetByDurationOpt, resetToEarliestOpt, resetToLatestOpt, resetToCurrentOpt, resetFromFileOpt); + allDeleteOffsetsOpts = Set.of(groupOpt, topicOpt); options = parser.parse(args); } @@ -224,7 +222,7 @@ void checkArgs() { if (!options.has(groupOpt) && !options.has(allGroupsOpt)) CommandLineUtils.printUsageAndExit(parser, "Option " + describeOpt + " takes one of these options: " + allGroupSelectionScopeOpts.stream().map(Object::toString).collect(Collectors.joining(", "))); - List> mutuallyExclusiveOpts = Arrays.asList(membersOpt, offsetsOpt, stateOpt); + List> mutuallyExclusiveOpts = List.of(membersOpt, offsetsOpt, stateOpt); if (mutuallyExclusiveOpts.stream().mapToInt(o -> options.has(o) ? 1 : 0).sum() > 1) { CommandLineUtils.printUsageAndExit(parser, "Option " + describeOpt + " takes at most one of these options: " + mutuallyExclusiveOpts.stream().map(Object::toString).collect(Collectors.joining(", "))); diff --git a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java index 0e5fc6167ee3a..8996f92b84efc 100644 --- a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java +++ b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java @@ -387,7 +387,7 @@ Map describeShareGroups(Collection groupI TreeMap collectGroupsDescription(Collection groupIds) throws ExecutionException, InterruptedException { Map shareGroups = describeShareGroups(groupIds); TreeMap res = new TreeMap<>(); - shareGroups.forEach(res::put); + res.putAll(shareGroups); return res; } diff --git a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandOptions.java b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandOptions.java index be99d2946a727..9a96cad00ed2c 100644 --- a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandOptions.java +++ b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandOptions.java @@ -22,8 +22,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Arrays; -import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.stream.Collectors; @@ -144,10 +142,10 @@ public ShareGroupCommandOptions(String[] args) { verboseOpt = parser.accepts("verbose", VERBOSE_DOC) .availableIf(describeOpt); - allGroupSelectionScopeOpts = new HashSet<>(Arrays.asList(groupOpt, allGroupsOpt)); - allShareGroupLevelOpts = new HashSet<>(Arrays.asList(listOpt, describeOpt, deleteOpt, resetOffsetsOpt)); - allResetOffsetScenarioOpts = new HashSet<>(Arrays.asList(resetToDatetimeOpt, resetToEarliestOpt, resetToLatestOpt)); - allDeleteOffsetsOpts = new HashSet<>(Arrays.asList(groupOpt, topicOpt)); + allGroupSelectionScopeOpts = Set.of(groupOpt, allGroupsOpt); + 
allShareGroupLevelOpts = Set.of(listOpt, describeOpt, deleteOpt, resetOffsetsOpt); + allResetOffsetScenarioOpts = Set.of(resetToDatetimeOpt, resetToEarliestOpt, resetToLatestOpt); + allDeleteOffsetsOpts = Set.of(groupOpt, topicOpt); options = parser.parse(args); } @@ -162,7 +160,7 @@ public void checkArgs() { if (!options.has(groupOpt) && !options.has(allGroupsOpt)) CommandLineUtils.printUsageAndExit(parser, "Option " + describeOpt + " takes one of these options: " + allGroupSelectionScopeOpts.stream().map(Object::toString).collect(Collectors.joining(", "))); - List> mutuallyExclusiveOpts = Arrays.asList(membersOpt, offsetsOpt, stateOpt); + List> mutuallyExclusiveOpts = List.of(membersOpt, offsetsOpt, stateOpt); if (mutuallyExclusiveOpts.stream().mapToInt(o -> options.has(o) ? 1 : 0).sum() > 1) { CommandLineUtils.printUsageAndExit(parser, "Option " + describeOpt + " takes at most one of these options: " + mutuallyExclusiveOpts.stream().map(Object::toString).collect(Collectors.joining(", "))); diff --git a/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java b/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java index 329b202b4a8e2..334f0738ca363 100644 --- a/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java +++ b/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java @@ -226,7 +226,7 @@ static VerifyAssignmentResult verifyAssignment(Admin adminClient, if (!partsOngoing && !movesOngoing && !preserveThrottles) { // If the partition assignments and replica assignments are done, clear any throttles // that were set. We have to clear all throttles, because we don't have enough - // information to know all of the source brokers that might have been involved in the + // information to know all the source brokers that might have been involved in the // previous reassignments. clearAllThrottles(adminClient, targetParts); } @@ -849,7 +849,7 @@ private static void executeMoves(Admin adminClient, .map(Object::toString) .collect(Collectors.joining(",")))); } else { - // If a replica has been moved to a new host and we also specified a particular + // If a replica has been moved to a new host, and we also specified a particular // log directory, we will have to keep retrying the alterReplicaLogDirs // call. It can't take effect until the replica is moved to that host. 
time.sleep(100); @@ -1342,21 +1342,18 @@ private static List parseTopicsData(String jsonData) throws JsonMappingE } private static List parseTopicsData(int version, JsonValue js) throws JsonMappingException { - switch (version) { - case 1: - List results = new ArrayList<>(); - Optional partitionsSeq = js.asJsonObject().get("topics"); - if (partitionsSeq.isPresent()) { - Iterator iter = partitionsSeq.get().asJsonArray().iterator(); - while (iter.hasNext()) { - results.add(iter.next().asJsonObject().apply("topic").to(STRING)); - } + if (version == 1) { + List results = new ArrayList<>(); + Optional partitionsSeq = js.asJsonObject().get("topics"); + if (partitionsSeq.isPresent()) { + Iterator iter = partitionsSeq.get().asJsonArray().iterator(); + while (iter.hasNext()) { + results.add(iter.next().asJsonObject().apply("topic").to(STRING)); } - return results; - - default: - throw new AdminOperationException("Not supported version field value " + version); + } + return results; } + throw new AdminOperationException("Not supported version field value " + version); } private static Entry>>, Map> parsePartitionReassignmentData( @@ -1376,46 +1373,43 @@ private static Entry>>, Map>>, Map> parsePartitionReassignmentData( int version, JsonValue jsonData ) throws JsonMappingException { - switch (version) { - case 1: - List>> partitionAssignment = new ArrayList<>(); - Map replicaAssignment = new HashMap<>(); - - Optional partitionsSeq = jsonData.asJsonObject().get("partitions"); - if (partitionsSeq.isPresent()) { - Iterator iter = partitionsSeq.get().asJsonArray().iterator(); - while (iter.hasNext()) { - JsonObject partitionFields = iter.next().asJsonObject(); - String topic = partitionFields.apply("topic").to(STRING); - int partition = partitionFields.apply("partition").to(INT); - List newReplicas = partitionFields.apply("replicas").to(INT_LIST); - Optional logDirsOpts = partitionFields.get("log_dirs"); - List newLogDirs; - if (logDirsOpts.isPresent()) - newLogDirs = logDirsOpts.get().to(STRING_LIST); - else - newLogDirs = newReplicas.stream().map(r -> ANY_LOG_DIR).collect(Collectors.toList()); - if (newReplicas.size() != newLogDirs.size()) - throw new AdminCommandFailedException("Size of replicas list " + newReplicas + " is different from " + - "size of log dirs list " + newLogDirs + " for partition " + new TopicPartition(topic, partition)); - partitionAssignment.add(Map.entry(new TopicPartition(topic, partition), newReplicas)); - for (int i = 0; i < newLogDirs.size(); i++) { - Integer replica = newReplicas.get(i); - String logDir = newLogDirs.get(i); - - if (logDir.equals(ANY_LOG_DIR)) - continue; - - replicaAssignment.put(new TopicPartitionReplica(topic, partition, replica), logDir); - } + if (version == 1) { + List>> partitionAssignment = new ArrayList<>(); + Map replicaAssignment = new HashMap<>(); + + Optional partitionsSeq = jsonData.asJsonObject().get("partitions"); + if (partitionsSeq.isPresent()) { + Iterator iter = partitionsSeq.get().asJsonArray().iterator(); + while (iter.hasNext()) { + JsonObject partitionFields = iter.next().asJsonObject(); + String topic = partitionFields.apply("topic").to(STRING); + int partition = partitionFields.apply("partition").to(INT); + List newReplicas = partitionFields.apply("replicas").to(INT_LIST); + Optional logDirsOpts = partitionFields.get("log_dirs"); + List newLogDirs; + if (logDirsOpts.isPresent()) + newLogDirs = logDirsOpts.get().to(STRING_LIST); + else + newLogDirs = newReplicas.stream().map(r -> ANY_LOG_DIR).collect(Collectors.toList()); + if 
(newReplicas.size() != newLogDirs.size()) + throw new AdminCommandFailedException("Size of replicas list " + newReplicas + " is different from " + + "size of log dirs list " + newLogDirs + " for partition " + new TopicPartition(topic, partition)); + partitionAssignment.add(Map.entry(new TopicPartition(topic, partition), newReplicas)); + for (int i = 0; i < newLogDirs.size(); i++) { + Integer replica = newReplicas.get(i); + String logDir = newLogDirs.get(i); + + if (logDir.equals(ANY_LOG_DIR)) + continue; + + replicaAssignment.put(new TopicPartitionReplica(topic, partition, replica), logDir); } } + } - return Map.entry(partitionAssignment, replicaAssignment); - - default: - throw new AdminOperationException("Not supported version field value " + version); + return Map.entry(partitionAssignment, replicaAssignment); } + throw new AdminOperationException("Not supported version field value " + version); } static ReassignPartitionsCommandOptions validateAndParseArgs(String[] args) { diff --git a/tools/src/main/java/org/apache/kafka/tools/reassign/VerifyAssignmentResult.java b/tools/src/main/java/org/apache/kafka/tools/reassign/VerifyAssignmentResult.java index 4812f1f19e98f..9dbde0fd1af9a 100644 --- a/tools/src/main/java/org/apache/kafka/tools/reassign/VerifyAssignmentResult.java +++ b/tools/src/main/java/org/apache/kafka/tools/reassign/VerifyAssignmentResult.java @@ -20,7 +20,6 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartitionReplica; -import java.util.Collections; import java.util.Map; import java.util.Objects; @@ -34,7 +33,7 @@ public final class VerifyAssignmentResult { public final boolean movesOngoing; public VerifyAssignmentResult(Map partStates) { - this(partStates, false, Collections.emptyMap(), false); + this(partStates, false, Map.of(), false); } /** diff --git a/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java b/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java index 7ac61e9df9e34..0f68bf8290053 100644 --- a/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java +++ b/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java @@ -61,7 +61,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; -import java.util.Collections; import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; @@ -457,7 +456,7 @@ private List filterExistingGroupTopics(String groupId, List> resetOffsets() { break; default: printError("Assignments can only be reset if the group '" + groupId + "' is inactive, but the current state is " + state + ".", Optional.empty()); - result.put(groupId, Collections.emptyMap()); + result.put(groupId, Map.of()); } } catch (InterruptedException ie) { throw new RuntimeException(ie); @@ -889,7 +888,7 @@ private Collection getPartitionsToReset(String groupId) throws E if (!opts.options.has(opts.resetFromFileOpt)) CommandLineUtils.printUsageAndExit(opts.parser, "One of the reset scopes should be defined: --all-topics, --topic."); - return Collections.emptyList(); + return List.of(); } } diff --git a/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommandOptions.java b/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommandOptions.java index e74104d27ffea..6ce387d3dbd7f 100644 --- a/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommandOptions.java +++ b/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommandOptions.java @@ -22,8 +22,6 @@ 
import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Arrays; -import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.stream.Collectors; @@ -200,12 +198,12 @@ public StreamsGroupCommandOptions(String[] args) { exportOpt = parser.accepts("export", EXPORT_DOC); options = parser.parse(args); - allResetOffsetScenarioOpts = new HashSet<>(Arrays.asList(resetToOffsetOpt, resetShiftByOpt, - resetToDatetimeOpt, resetByDurationOpt, resetToEarliestOpt, resetToLatestOpt, resetToCurrentOpt, resetFromFileOpt)); - allGroupSelectionScopeOpts = new HashSet<>(Arrays.asList(groupOpt, allGroupsOpt)); - allStreamsGroupLevelOpts = new HashSet<>(Arrays.asList(listOpt, describeOpt, deleteOpt)); - allDeleteOffsetsOpts = new HashSet<>(Arrays.asList(inputTopicOpt, allInputTopicsOpt)); - allDeleteInternalGroupsOpts = new HashSet<>(Arrays.asList(resetOffsetsOpt, deleteOpt)); + allResetOffsetScenarioOpts = Set.of(resetToOffsetOpt, resetShiftByOpt, + resetToDatetimeOpt, resetByDurationOpt, resetToEarliestOpt, resetToLatestOpt, resetToCurrentOpt, resetFromFileOpt); + allGroupSelectionScopeOpts = Set.of(groupOpt, allGroupsOpt); + allStreamsGroupLevelOpts = Set.of(listOpt, describeOpt, deleteOpt); + allDeleteOffsetsOpts = Set.of(inputTopicOpt, allInputTopicsOpt); + allDeleteInternalGroupsOpts = Set.of(resetOffsetsOpt, deleteOpt); } @SuppressWarnings("NPathComplexity") @@ -256,7 +254,7 @@ private void checkDescribeArgs() { if (!options.has(groupOpt) && !options.has(allGroupsOpt)) CommandLineUtils.printUsageAndExit(parser, "Option " + describeOpt + " takes one of these options: " + allGroupSelectionScopeOpts.stream().map(Object::toString).collect(Collectors.joining(", "))); - List> mutuallyExclusiveOpts = Arrays.asList(membersOpt, offsetsOpt, stateOpt); + List> mutuallyExclusiveOpts = List.of(membersOpt, offsetsOpt, stateOpt); if (mutuallyExclusiveOpts.stream().mapToInt(o -> options.has(o) ? 1 : 0).sum() > 1) { CommandLineUtils.printUsageAndExit(parser, "Option " + describeOpt + " takes at most one of these options: " + mutuallyExclusiveOpts.stream().map(Object::toString).collect(Collectors.joining(", ")));