diff --git a/tools/src/main/java/org/apache/kafka/tools/Tuple2.java b/tools/src/main/java/org/apache/kafka/tools/Tuple2.java deleted file mode 100644 index 02dd4bf3981a8..0000000000000 --- a/tools/src/main/java/org/apache/kafka/tools/Tuple2.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.tools; - -import java.util.Objects; - -public final class Tuple2 { - public final V1 v1; - - public final V2 v2; - - public Tuple2(V1 v1, V2 v2) { - this.v1 = v1; - this.v2 = v2; - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - Tuple2 tuple = (Tuple2) o; - return Objects.equals(v1, tuple.v1) && Objects.equals(v2, tuple.v2); - } - - @Override - public int hashCode() { - return Objects.hash(v1, v2); - } - - @Override - public String toString() { - return "Tuple2{v1=" + v1 + ", v2=" + v2 + '}'; - } -} diff --git a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java index 2b055de616191..35371ce75aa65 100644 --- a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java +++ b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java @@ -53,7 +53,6 @@ import org.apache.kafka.common.utils.Utils; import org.apache.kafka.server.util.CommandLineUtils; import org.apache.kafka.tools.ToolsUtils; -import org.apache.kafka.tools.Tuple2; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,6 +60,7 @@ import java.text.ParseException; import java.time.Duration; import java.time.Instant; +import java.util.AbstractMap.SimpleImmutableEntry; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -71,6 +71,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Optional; import java.util.OptionalInt; import java.util.Properties; @@ -315,10 +316,10 @@ private Optional size(Optional> colOpt) { return colOpt.map(Collection::size); } - private void printOffsets(Map, Optional>>> offsets) { + private void printOffsets(Map, Optional>>> offsets) { offsets.forEach((groupId, tuple) -> { - Optional state = tuple.v1; - Optional> assignments = tuple.v2; + Optional state = tuple.getKey(); + Optional> assignments = tuple.getValue(); if (shouldPrintMemberState(groupId, state, size(assignments))) { String format = printOffsetFormat(assignments); @@ -359,10 +360,10 @@ private static String printOffsetFormat(Optional, Optional>>> members, boolean verbose) { + private void printMembers(Map, Optional>>> members, boolean verbose) { members.forEach((groupId, tuple) -> { - Optional state = tuple.v1; - Optional> assignments = tuple.v2; + Optional state = tuple.getKey(); + Optional> assignments = tuple.getValue(); int maxGroupLen = 15, maxConsumerIdLen = 15, maxGroupInstanceIdLen = 17, maxHostLen = 15, maxClientIdLen = 15; boolean includeGroupInstanceId = false; @@ -448,11 +449,11 @@ void describeGroups() throws Exception { long subActions = Stream.of(membersOptPresent, offsetsOptPresent, stateOptPresent).filter(x -> x).count(); if (subActions == 0 || offsetsOptPresent) { - TreeMap, Optional>>> offsets + TreeMap, Optional>>> offsets = collectGroupsOffsets(groupIds); printOffsets(offsets); } else if (membersOptPresent) { - TreeMap, Optional>>> members + TreeMap, Optional>>> members = collectGroupsMembers(groupIds, opts.options.has(opts.verboseOpt)); printMembers(members, opts.options.has(opts.verboseOpt)); } else { @@ -560,7 +561,7 @@ Map> resetOffsets() { return result; } - Tuple2> deleteOffsets(String groupId, List topics) { + Entry> deleteOffsets(String groupId, List topics) { Map partitionLevelResult = new HashMap<>(); Set topicWithPartitions = new HashSet<>(); Set topicWithoutPartitions = new HashSet<>(); @@ -617,17 +618,17 @@ Tuple2> deleteOffsets(String groupId, Lis } }); - return new Tuple2<>(topLevelException, partitionLevelResult); + return new SimpleImmutableEntry<>(topLevelException, partitionLevelResult); } void deleteOffsets() { String groupId = opts.options.valueOf(opts.groupOpt); List topics = opts.options.valuesOf(opts.topicOpt); - Tuple2> res = deleteOffsets(groupId, topics); + Entry> res = deleteOffsets(groupId, topics); - Errors topLevelResult = res.v1; - Map partitionLevelResult = res.v2; + Errors topLevelResult = res.getKey(); + Map partitionLevelResult = res.getValue(); switch (topLevelResult) { case NONE: @@ -677,7 +678,7 @@ Map describeConsumerGroups(Collection withTimeoutMs(new DescribeConsumerGroupsOptions()) ).describedGroups(); - for (Map.Entry> e : stringKafkaFutureMap.entrySet()) { + for (Entry> e : stringKafkaFutureMap.entrySet()) { res.put(e.getKey(), e.getValue().get()); } return res; @@ -686,16 +687,16 @@ Map describeConsumerGroups(Collection /** * Returns the state of the specified consumer group and partition assignment states */ - Tuple2, Optional>> collectGroupOffsets(String groupId) throws Exception { - return collectGroupsOffsets(Collections.singletonList(groupId)).getOrDefault(groupId, new Tuple2<>(Optional.empty(), Optional.empty())); + Entry, Optional>> collectGroupOffsets(String groupId) throws Exception { + return collectGroupsOffsets(Collections.singletonList(groupId)).getOrDefault(groupId, new SimpleImmutableEntry<>(Optional.empty(), Optional.empty())); } /** * Returns states of the specified consumer groups and partition assignment states */ - TreeMap, Optional>>> collectGroupsOffsets(Collection groupIds) throws Exception { + TreeMap, Optional>>> collectGroupsOffsets(Collection groupIds) throws Exception { Map consumerGroups = describeConsumerGroups(groupIds); - TreeMap, Optional>>> groupOffsets = new TreeMap<>(); + TreeMap, Optional>>> groupOffsets = new TreeMap<>(); consumerGroups.forEach((groupId, consumerGroup) -> { ConsumerGroupState state = consumerGroup.state(); @@ -737,19 +738,19 @@ TreeMap, Optional(Optional.of(state.toString()), Optional.of(rowsWithConsumer))); + groupOffsets.put(groupId, new SimpleImmutableEntry<>(Optional.of(state.toString()), Optional.of(rowsWithConsumer))); }); return groupOffsets; } - Tuple2, Optional>> collectGroupMembers(String groupId, boolean verbose) throws Exception { + Entry, Optional>> collectGroupMembers(String groupId, boolean verbose) throws Exception { return collectGroupsMembers(Collections.singleton(groupId), verbose).get(groupId); } - TreeMap, Optional>>> collectGroupsMembers(Collection groupIds, boolean verbose) throws Exception { + TreeMap, Optional>>> collectGroupsMembers(Collection groupIds, boolean verbose) throws Exception { Map consumerGroups = describeConsumerGroups(groupIds); - TreeMap, Optional>>> res = new TreeMap<>(); + TreeMap, Optional>>> res = new TreeMap<>(); consumerGroups.forEach((groupId, consumerGroup) -> { String state = consumerGroup.state().toString(); @@ -763,7 +764,7 @@ TreeMap, Optional(verbose ? consumer.assignment().topicPartitions() : Collections.emptySet()) )).collect(Collectors.toList()); - res.put(groupId, new Tuple2<>(Optional.of(state), Optional.of(memberAssignmentStates))); + res.put(groupId, new SimpleImmutableEntry<>(Optional.of(state), Optional.of(memberAssignmentStates))); }); return res; } @@ -836,7 +837,7 @@ private Map getLogTimestampOffsets(Collection successfulLogTimestampOffsets = successfulOffsetsForTimes.entrySet().stream() - .collect(Collectors.toMap(Map.Entry::getKey, e -> new LogOffset(e.getValue().offset()))); + .collect(Collectors.toMap(Entry::getKey, e -> new LogOffset(e.getValue().offset()))); unsuccessfulOffsetsForTimes.forEach((tp, offsetResultInfo) -> System.out.println("\nWarn: Partition " + tp.partition() + " from topic " + tp.topic() + @@ -991,7 +992,7 @@ private Map prepareOffsetsToReset(String grou if (opts.options.has(opts.resetToOffsetOpt)) { long offset = opts.options.valueOf(opts.resetToOffsetOpt); return checkOffsetsRange(partitionsToReset.stream().collect(Collectors.toMap(Function.identity(), tp -> offset))) - .entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> new OffsetAndMetadata(e.getValue()))); + .entrySet().stream().collect(Collectors.toMap(Entry::getKey, e -> new OffsetAndMetadata(e.getValue()))); } else if (opts.options.has(opts.resetToEarliestOpt)) { Map logStartOffsets = getLogStartOffsets(partitionsToReset); return partitionsToReset.stream().collect(Collectors.toMap(Function.identity(), topicPartition -> { @@ -1029,7 +1030,7 @@ private Map prepareOffsetsToReset(String grou return currentOffset.offset() + shiftBy; })); return checkOffsetsRange(requestedOffsets).entrySet().stream() - .collect(Collectors.toMap(Map.Entry::getKey, e -> new OffsetAndMetadata(e.getValue()))); + .collect(Collectors.toMap(Entry::getKey, e -> new OffsetAndMetadata(e.getValue()))); } else if (opts.options.has(opts.resetToDatetimeOpt)) { try { long timestamp = Utils.getDateTime(opts.options.valueOf(opts.resetToDatetimeOpt)); @@ -1078,7 +1079,7 @@ private Map prepareOffsetsToReset(String grou topicPartition -> resetPlanForGroup.get(topicPartition).offset())); return checkOffsetsRange(requestedOffsets).entrySet().stream() - .collect(Collectors.toMap(Map.Entry::getKey, e -> new OffsetAndMetadata(e.getValue()))); + .collect(Collectors.toMap(Entry::getKey, e -> new OffsetAndMetadata(e.getValue()))); }).orElseGet(Collections::emptyMap); } else if (opts.options.has(opts.resetToCurrentOpt)) { Map currentCommittedOffsets = getCommittedOffsets(groupId); @@ -1104,7 +1105,7 @@ private Map prepareOffsetsToReset(String grou })); Map preparedOffsetsForPartitionsWithoutCommittedOffset = getLogEndOffsets(partitionsToResetWithoutCommittedOffset) - .entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> { + .entrySet().stream().collect(Collectors.toMap(Entry::getKey, e -> { if (!(e.getValue() instanceof LogOffset)) { ToolsUtils.printUsageAndExit(opts.parser, "Error getting ending offset of topic partition: " + e.getKey()); return null; diff --git a/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java b/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java index 3623346980743..47c4a8234a177 100644 --- a/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java +++ b/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java @@ -49,9 +49,9 @@ import org.apache.kafka.server.util.json.JsonValue; import org.apache.kafka.tools.TerseException; import org.apache.kafka.tools.ToolsUtils; -import org.apache.kafka.tools.Tuple2; import java.io.IOException; +import java.util.AbstractMap.SimpleImmutableEntry; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -62,6 +62,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Objects; import java.util.Optional; import java.util.Properties; @@ -203,20 +204,20 @@ static VerifyAssignmentResult verifyAssignment(Admin adminClient, String jsonString, Boolean preserveThrottles ) throws ExecutionException, InterruptedException, JsonProcessingException { - Tuple2>>, Map> t0 = parsePartitionReassignmentData(jsonString); + Entry>>, Map> t0 = parsePartitionReassignmentData(jsonString); - List>> targetParts = t0.v1; - Map targetLogDirs = t0.v2; + List>> targetParts = t0.getKey(); + Map targetLogDirs = t0.getValue(); - Tuple2, Boolean> t1 = verifyPartitionAssignments(adminClient, targetParts); + Entry, Boolean> t1 = verifyPartitionAssignments(adminClient, targetParts); - Map partStates = t1.v1; - Boolean partsOngoing = t1.v2; + Map partStates = t1.getKey(); + Boolean partsOngoing = t1.getValue(); - Tuple2, Boolean> t2 = verifyReplicaMoves(adminClient, targetLogDirs); + Entry, Boolean> t2 = verifyReplicaMoves(adminClient, targetLogDirs); - Map moveStates = t2.v1; - Boolean movesOngoing = t2.v2; + Map moveStates = t2.getKey(); + Boolean movesOngoing = t2.getValue(); if (!partsOngoing && !movesOngoing && !preserveThrottles) { // If the partition assignments and replica assignments are done, clear any throttles @@ -240,11 +241,11 @@ static VerifyAssignmentResult verifyAssignment(Admin adminClient, * reassignments (including reassignments not described * in the JSON file.) */ - private static Tuple2, Boolean> verifyPartitionAssignments(Admin adminClient, - List>> targets + private static Entry, Boolean> verifyPartitionAssignments(Admin adminClient, + List>> targets ) throws ExecutionException, InterruptedException { - Tuple2, Boolean> t0 = findPartitionReassignmentStates(adminClient, targets); - System.out.println(partitionReassignmentStatesToString(t0.v1)); + Entry, Boolean> t0 = findPartitionReassignmentStates(adminClient, targets); + System.out.println(partitionReassignmentStatesToString(t0.getKey())); return t0; } @@ -301,27 +302,27 @@ static String partitionReassignmentStatesToString(Map, Boolean> findPartitionReassignmentStates(Admin adminClient, - List, Boolean> findPartitionReassignmentStates(Admin adminClient, + List>> targetReassignments ) throws ExecutionException, InterruptedException { Map currentReassignments = adminClient. listPartitionReassignments().reassignments().get(); - List>> foundReassignments = new ArrayList<>(); - List>> notFoundReassignments = new ArrayList<>(); + List>> foundReassignments = new ArrayList<>(); + List>> notFoundReassignments = new ArrayList<>(); targetReassignments.forEach(reassignment -> { - if (currentReassignments.containsKey(reassignment.v1)) + if (currentReassignments.containsKey(reassignment.getKey())) foundReassignments.add(reassignment); else notFoundReassignments.add(reassignment); }); - List> foundResults = foundReassignments.stream().map(e -> { - TopicPartition part = e.v1; - List targetReplicas = e.v2; - return new Tuple2<>(part, + List> foundResults = foundReassignments.stream().map(e -> { + TopicPartition part = e.getKey(); + List targetReplicas = e.getValue(); + return new SimpleImmutableEntry<>(part, new PartitionReassignmentState( currentReassignments.get(part).replicas(), targetReplicas, @@ -329,7 +330,7 @@ static Tuple2, Boolean> findPart }).collect(Collectors.toList()); Set topicNamesToLookUp = notFoundReassignments.stream() - .map(e -> e.v1) + .map(e -> e.getKey()) .filter(part -> !currentReassignments.containsKey(part)) .map(TopicPartition::topic) .collect(Collectors.toSet()); @@ -337,28 +338,28 @@ static Tuple2, Boolean> findPart Map> topicDescriptions = adminClient. describeTopics(topicNamesToLookUp).topicNameValues(); - List> notFoundResults = new ArrayList<>(); - for (Tuple2> e : notFoundReassignments) { - TopicPartition part = e.v1; - List targetReplicas = e.v2; + List> notFoundResults = new ArrayList<>(); + for (Entry> e : notFoundReassignments) { + TopicPartition part = e.getKey(); + List targetReplicas = e.getValue(); if (currentReassignments.containsKey(part)) { PartitionReassignment reassignment = currentReassignments.get(part); - notFoundResults.add(new Tuple2<>(part, new PartitionReassignmentState( + notFoundResults.add(new SimpleImmutableEntry<>(part, new PartitionReassignmentState( reassignment.replicas(), targetReplicas, false))); } else { - notFoundResults.add(new Tuple2<>(part, topicDescriptionFutureToState(part.partition(), + notFoundResults.add(new SimpleImmutableEntry<>(part, topicDescriptionFutureToState(part.partition(), topicDescriptions.get(part.topic()), targetReplicas))); } } Map allResults = new HashMap<>(); - foundResults.forEach(e -> allResults.put(e.v1, e.v2)); - notFoundResults.forEach(e -> allResults.put(e.v1, e.v2)); + foundResults.forEach(e -> allResults.put(e.getKey(), e.getValue())); + notFoundResults.forEach(e -> allResults.put(e.getKey(), e.getValue())); - return new Tuple2<>(allResults, !currentReassignments.isEmpty()); + return new SimpleImmutableEntry<>(allResults, !currentReassignments.isEmpty()); } private static PartitionReassignmentState topicDescriptionFutureToState(int partition, @@ -396,12 +397,12 @@ private static PartitionReassignmentState topicDescriptionFutureToState(int part * reassignments. (We don't have an efficient API that * returns all ongoing replica reassignments.) */ - private static Tuple2, Boolean> verifyReplicaMoves(Admin adminClient, + private static Entry, Boolean> verifyReplicaMoves(Admin adminClient, Map targetReassignments ) throws ExecutionException, InterruptedException { Map moveStates = findLogDirMoveStates(adminClient, targetReassignments); System.out.println(replicaMoveStatesToString(moveStates)); - return new Tuple2<>(moveStates, !moveStates.values().stream().allMatch(LogDirMoveState::done)); + return new SimpleImmutableEntry<>(moveStates, !moveStates.values().stream().allMatch(LogDirMoveState::done)); } /** @@ -420,7 +421,7 @@ static Map findLogDirMoveStates(Admin ad Map replicaLogDirInfos = adminClient .describeReplicaLogDirs(targetMoves.keySet()).all().get(); - return targetMoves.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> { + return targetMoves.entrySet().stream().collect(Collectors.toMap(Entry::getKey, e -> { TopicPartitionReplica replica = e.getKey(); String targetLogDir = e.getValue(); @@ -492,16 +493,16 @@ static String replicaMoveStatesToString(Map>> targetParts + List>> targetParts ) throws ExecutionException, InterruptedException { Set brokers = adminClient.describeCluster().nodes().get().stream().map(Node::id).collect(Collectors.toSet()); - targetParts.forEach(t -> brokers.addAll(t.v2)); + targetParts.forEach(t -> brokers.addAll(t.getValue())); System.out.printf("Clearing broker-level throttles on broker%s %s%n", brokers.size() == 1 ? "" : "s", Utils.join(brokers, ",")); clearBrokerLevelThrottles(adminClient, brokers); - Set topics = targetParts.stream().map(t -> t.v1.topic()).collect(Collectors.toSet()); + Set topics = targetParts.stream().map(t -> t.getKey().topic()).collect(Collectors.toSet()); System.out.printf("Clearing topic-level throttles on topic%s %s%n", topics.size() == 1 ? "" : "s", Utils.join(topics, ",")); clearTopicLevelThrottles(adminClient, topics); @@ -550,15 +551,15 @@ private static void clearTopicLevelThrottles(Admin adminClient, Set topi * @return A tuple containing the proposed assignment and the * current assignment. */ - public static Tuple2>, Map>> generateAssignment(Admin adminClient, + public static Entry>, Map>> generateAssignment(Admin adminClient, String reassignmentJson, String brokerListString, Boolean enableRackAwareness ) throws ExecutionException, InterruptedException, JsonProcessingException { - Tuple2, List> t0 = parseGenerateAssignmentArgs(reassignmentJson, brokerListString); + Entry, List> t0 = parseGenerateAssignmentArgs(reassignmentJson, brokerListString); - List brokersToReassign = t0.v1; - List topicsToReassign = t0.v2; + List brokersToReassign = t0.getKey(); + List topicsToReassign = t0.getValue(); Map> currentAssignments = getReplicaAssignmentForTopics(adminClient, topicsToReassign); List brokerMetadatas = getBrokerMetadata(adminClient, brokersToReassign, enableRackAwareness); @@ -567,7 +568,7 @@ public static Tuple2>, Map(proposedAssignments, currentAssignments); + return new SimpleImmutableEntry<>(proposedAssignments, currentAssignments); } /** @@ -580,8 +581,8 @@ public static Tuple2>, Map> calculateAssignment(Map> currentAssignment, List brokerMetadatas) { - Map>>> groupedByTopic = new HashMap<>(); - for (Map.Entry> e : currentAssignment.entrySet()) + Map>>> groupedByTopic = new HashMap<>(); + for (Entry> e : currentAssignment.entrySet()) groupedByTopic.computeIfAbsent(e.getKey().topic(), k -> new ArrayList<>()).add(e); Map> proposedAssignments = new HashMap<>(); groupedByTopic.forEach((topic, assignment) -> { @@ -598,7 +599,7 @@ private static Map describeTopics(Admin adminClient, Set topics) throws ExecutionException, InterruptedException { Map> futures = adminClient.describeTopics(topics).topicNameValues(); Map res = new HashMap<>(); - for (Map.Entry> e : futures.entrySet()) { + for (Entry> e : futures.entrySet()) { String topicName = e.getKey(); KafkaFuture topicDescriptionFuture = e.getValue(); try { @@ -695,7 +696,7 @@ static List getBrokerMetadata(Admin adminClient, List b * * @return A tuple of brokers to reassign, topics to reassign */ - static Tuple2, List> parseGenerateAssignmentArgs(String reassignmentJson, + static Entry, List> parseGenerateAssignmentArgs(String reassignmentJson, String brokerList) throws JsonMappingException { List brokerListToReassign = Arrays.stream(brokerList.split(",")).map(Integer::parseInt).collect(Collectors.toList()); Set duplicateReassignments = ToolsUtils.duplicates(brokerListToReassign); @@ -706,7 +707,7 @@ static Tuple2, List> parseGenerateAssignmentArgs(String re if (!duplicateTopicsToReassign.isEmpty()) throw new AdminCommandFailedException(String.format("List of topics to reassign contains duplicate entries: %s", duplicateTopicsToReassign)); - return new Tuple2<>(brokerListToReassign, topicsToReassign); + return new SimpleImmutableEntry<>(brokerListToReassign, topicsToReassign); } /** @@ -731,10 +732,10 @@ public static void executeAssignment(Admin adminClient, Long timeoutMs, Time time ) throws ExecutionException, InterruptedException, JsonProcessingException, TerseException { - Tuple2>, Map> t0 = parseExecuteAssignmentArgs(reassignmentJson); + Entry>, Map> t0 = parseExecuteAssignmentArgs(reassignmentJson); - Map> proposedParts = t0.v1; - Map proposedReplicas = t0.v2; + Map> proposedParts = t0.getKey(); + Map proposedReplicas = t0.getValue(); Map currentReassignments = adminClient. listPartitionReassignments().reassignments().get(); // If there is an existing assignment, check for --additional before proceeding. @@ -901,7 +902,7 @@ static String currentPartitionReplicaAssignmentToString(Map> currentParts) throws JsonProcessingException { Map> partitionsToBeReassigned = currentParts.entrySet().stream() .filter(e -> proposedParts.containsKey(e.getKey())) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + .collect(Collectors.toMap(Entry::getKey, Entry::getValue)); return String.format("Current partition replica assignment%n%n%s%n%nSave this to use as the %s", formatAsReassignmentJson(partitionsToBeReassigned, Collections.emptyMap()), @@ -921,7 +922,7 @@ static Map alterPartitionReassignments(Admin adminCli reassignments.forEach((part, replicas) -> args.put(part, Optional.of(new NewPartitionReassignment(replicas)))); Map> results = adminClient.alterPartitionReassignments(args).values(); Map errors = new HashMap<>(); - for (Map.Entry> e : results.entrySet()) { + for (Entry> e : results.entrySet()) { try { e.getValue().get(); } catch (ExecutionException t) { @@ -945,7 +946,7 @@ static Map cancelPartitionReassignments(Admin adminCl Map> results = adminClient.alterPartitionReassignments(args).values(); Map errors = new HashMap<>(); - for (Map.Entry> e : results.entrySet()) { + for (Entry> e : results.entrySet()) { try { e.getValue().get(); } catch (ExecutionException t) { @@ -993,7 +994,7 @@ static Map> calculateProposedMoveMap(Map> proposedParts, Map> currentParts) { Map> moveMap = calculateCurrentMoveMap(currentReassignments); - for (Map.Entry> e : proposedParts.entrySet()) { + for (Entry> e : proposedParts.entrySet()) { TopicPartition part = e.getKey(); List replicas = e.getValue(); Map partMoves = moveMap.computeIfAbsent(part.topic(), k -> new HashMap<>()); @@ -1178,39 +1179,39 @@ static void modifyLogDirThrottle(Admin admin, * @return A tuple of the partitions to be reassigned and the replicas * to be reassigned. */ - static Tuple2>, Map> parseExecuteAssignmentArgs( + static Entry>, Map> parseExecuteAssignmentArgs( String reassignmentJson ) throws JsonProcessingException { - Tuple2>>, Map> t0 = parsePartitionReassignmentData(reassignmentJson); + Entry>>, Map> t0 = parsePartitionReassignmentData(reassignmentJson); - List>> partitionsToBeReassigned = t0.v1; - Map replicaAssignment = t0.v2; + List>> partitionsToBeReassigned = t0.getKey(); + Map replicaAssignment = t0.getValue(); if (partitionsToBeReassigned.isEmpty()) throw new AdminCommandFailedException("Partition reassignment list cannot be empty"); - if (partitionsToBeReassigned.stream().anyMatch(t -> t.v2.isEmpty())) { + if (partitionsToBeReassigned.stream().anyMatch(t -> t.getValue().isEmpty())) { throw new AdminCommandFailedException("Partition replica list cannot be empty"); } - Set duplicateReassignedPartitions = ToolsUtils.duplicates(partitionsToBeReassigned.stream().map(t -> t.v1).collect(Collectors.toList())); + Set duplicateReassignedPartitions = ToolsUtils.duplicates(partitionsToBeReassigned.stream().map(t -> t.getKey()).collect(Collectors.toList())); if (!duplicateReassignedPartitions.isEmpty()) { throw new AdminCommandFailedException(String.format( "Partition reassignment contains duplicate topic partitions: %s", duplicateReassignedPartitions.stream().map(Object::toString).collect(Collectors.joining(","))) ); } - List>> duplicateEntries = partitionsToBeReassigned.stream() - .map(t -> new Tuple2<>(t.v1, ToolsUtils.duplicates(t.v2))) - .filter(t -> !t.v2.isEmpty()) + List>> duplicateEntries = partitionsToBeReassigned.stream() + .map(t -> new SimpleImmutableEntry<>(t.getKey(), ToolsUtils.duplicates(t.getValue()))) + .filter(t -> !t.getValue().isEmpty()) .collect(Collectors.toList()); if (!duplicateEntries.isEmpty()) { String duplicatesMsg = duplicateEntries.stream().map(t -> String.format("%s contains multiple entries for %s", - t.v1, - t.v2.stream().map(Object::toString).collect(Collectors.joining(","))) + t.getKey(), + t.getValue().stream().map(Object::toString).collect(Collectors.joining(","))) ).collect(Collectors.joining(". ")); throw new AdminCommandFailedException(String.format("Partition replica lists may not contain duplicate entries: %s", duplicatesMsg)); } - return new Tuple2<>(partitionsToBeReassigned.stream().collect(Collectors.toMap(t -> t.v1, t -> t.v2)), replicaAssignment); + return new SimpleImmutableEntry<>(partitionsToBeReassigned.stream().collect(Collectors.toMap(t -> t.getKey(), t -> t.getValue())), replicaAssignment); } /** @@ -1226,17 +1227,17 @@ static Tuple2>, Map, Set> cancelAssignment(Admin adminClient, + static Entry, Set> cancelAssignment(Admin adminClient, String jsonString, Boolean preserveThrottles, Long timeoutMs, Time time ) throws ExecutionException, InterruptedException, JsonProcessingException, TerseException { - Tuple2>>, Map> t0 = parsePartitionReassignmentData(jsonString); + Entry>>, Map> t0 = parsePartitionReassignmentData(jsonString); - List>> targetParts = t0.v1; - Map targetReplicas = t0.v2; - Set targetPartsSet = targetParts.stream().map(t -> t.v1).collect(Collectors.toSet()); + List>> targetParts = t0.getKey(); + Map targetReplicas = t0.getValue(); + Set targetPartsSet = targetParts.stream().map(t -> t.getKey()).collect(Collectors.toSet()); Set curReassigningParts = new HashSet<>(); adminClient.listPartitionReassignments(targetPartsSet).reassignments().get().forEach((part, reassignment) -> { if (reassignment.addingReplicas().isEmpty() || !reassignment.removingReplicas().isEmpty()) @@ -1273,7 +1274,7 @@ static Tuple2, Set> cancelAssignment( if (!preserveThrottles) { clearAllThrottles(adminClient, targetParts); } - return new Tuple2<>(curReassigningParts, curMovingParts.keySet()); + return new SimpleImmutableEntry<>(curReassigningParts, curMovingParts.keySet()); } public static String formatAsReassignmentJson(Map> partitionsToBeReassigned, @@ -1330,7 +1331,7 @@ private static List parseTopicsData(int version, JsonValue js) throws Js } } - private static Tuple2>>, Map> parsePartitionReassignmentData( + private static Entry>>, Map> parsePartitionReassignmentData( String jsonData ) throws JsonProcessingException { JsonValue js; @@ -1344,12 +1345,12 @@ private static Tuple2>>, Map>>, Map> parsePartitionReassignmentData( + private static Entry>>, Map> parsePartitionReassignmentData( int version, JsonValue jsonData ) throws JsonMappingException { switch (version) { case 1: - List>> partitionAssignment = new ArrayList<>(); + List>> partitionAssignment = new ArrayList<>(); Map replicaAssignment = new HashMap<>(); Optional partitionsSeq = jsonData.asJsonObject().get("partitions"); @@ -1369,7 +1370,7 @@ private static Tuple2>>, Map(new TopicPartition(topic, partition), newReplicas)); + partitionAssignment.add(new SimpleImmutableEntry<>(new TopicPartition(topic, partition), newReplicas)); for (int i = 0; i < newLogDirs.size(); i++) { Integer replica = newReplicas.get(i); String logDir = newLogDirs.get(i); @@ -1382,7 +1383,7 @@ private static Tuple2>>, Map(partitionAssignment, replicaAssignment); + return new SimpleImmutableEntry<>(partitionAssignment, replicaAssignment); default: throw new AdminOperationException("Not supported version field value " + version); @@ -1481,7 +1482,7 @@ static Set alterReplicaLogDirs(Admin adminClient, Set results = new HashSet<>(); Map> values = adminClient.alterReplicaLogDirs(assignment).values(); - for (Map.Entry> e : values.entrySet()) { + for (Entry> e : values.entrySet()) { TopicPartitionReplica replica = e.getKey(); KafkaFuture future = e.getValue(); try { diff --git a/tools/src/test/java/org/apache/kafka/tools/ToolsTestUtils.java b/tools/src/test/java/org/apache/kafka/tools/ToolsTestUtils.java index 0bb2c9c9d1b71..d427828f2c7f1 100644 --- a/tools/src/test/java/org/apache/kafka/tools/ToolsTestUtils.java +++ b/tools/src/test/java/org/apache/kafka/tools/ToolsTestUtils.java @@ -31,12 +31,14 @@ import java.io.File; import java.io.IOException; import java.io.PrintStream; +import java.util.AbstractMap.SimpleImmutableEntry; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Properties; import java.util.Set; import java.util.concurrent.ExecutionException; @@ -155,7 +157,7 @@ public static void removeReplicationThrottleForPartitions(Admin admin, List> allReplicasByPartition) throws InterruptedException, ExecutionException { - Map>>> configResourceToPartitionReplicas = + Map>>> configResourceToPartitionReplicas = allReplicasByPartition.entrySet().stream() .collect(Collectors.groupingBy( topicPartitionListEntry -> new ConfigResource(ConfigResource.Type.TOPIC, topicPartitionListEntry.getKey().topic())) @@ -163,10 +165,10 @@ public static void assignThrottledPartitionReplicas(Admin adminClient, Map> throttles = configResourceToPartitionReplicas.entrySet().stream() .collect( - Collectors.toMap(Map.Entry::getKey, entry -> { + Collectors.toMap(Entry::getKey, entry -> { List alterConfigOps = new ArrayList<>(); Map> replicaThrottle = - entry.getValue().stream().collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + entry.getValue().stream().collect(Collectors.toMap(Entry::getKey, Entry::getValue)); alterConfigOps.add(new AlterConfigOp( new ConfigEntry(LogConfig.LEADER_REPLICATION_THROTTLED_REPLICAS_CONFIG, formatReplicaThrottles(replicaThrottle)), AlterConfigOp.OpType.SET)); @@ -204,7 +206,7 @@ public static String formatReplicaThrottles(Map> m public static File tempPropertiesFile(Map properties) throws IOException { StringBuilder sb = new StringBuilder(); - for (Map.Entry entry : properties.entrySet()) { + for (Entry entry : properties.entrySet()) { sb.append(entry.getKey() + "=" + entry.getValue() + System.lineSeparator()); } return org.apache.kafka.test.TestUtils.tempFile(sb.toString()); @@ -249,7 +251,7 @@ public static String grabConsoleError(Runnable f) { /** * Capture both the console output and console error during the execution of the provided function. */ - public static Tuple2 grabConsoleOutputAndError(Runnable f) { + public static Entry grabConsoleOutputAndError(Runnable f) { ByteArrayOutputStream outBuf = new ByteArrayOutputStream(); ByteArrayOutputStream errBuf = new ByteArrayOutputStream(); PrintStream out = new PrintStream(outBuf); @@ -267,7 +269,7 @@ public static Tuple2 grabConsoleOutputAndError(Runnable f) { } out.flush(); err.flush(); - return new Tuple2<>(outBuf.toString(), errBuf.toString()); + return new SimpleImmutableEntry<>(outBuf.toString(), errBuf.toString()); } public static class MockExitProcedure implements Exit.Procedure { diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupServiceTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupServiceTest.java index b7583c1c44f55..4fd7e3b91970e 100644 --- a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupServiceTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupServiceTest.java @@ -38,7 +38,6 @@ import org.apache.kafka.common.TopicPartitionInfo; import org.apache.kafka.common.internals.KafkaFutureImpl; import org.apache.kafka.common.utils.Utils; -import org.apache.kafka.tools.Tuple2; import org.junit.jupiter.api.Test; import org.mockito.ArgumentMatcher; import org.mockito.ArgumentMatchers; @@ -49,11 +48,12 @@ import java.util.Collections; import java.util.HashMap; import java.util.HashSet; -import java.util.Objects; -import java.util.Set; import java.util.List; import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; import java.util.Optional; +import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -91,10 +91,10 @@ public void testAdminRequestsForDescribeOffsets() throws Exception { when(admin.listOffsets(offsetsArgMatcher(), any())) .thenReturn(listOffsetsResult()); - Tuple2, Optional>> statesAndAssignments = groupService.collectGroupOffsets(GROUP); - assertEquals(Optional.of("Stable"), statesAndAssignments.v1); - assertTrue(statesAndAssignments.v2.isPresent()); - assertEquals(TOPIC_PARTITIONS.size(), statesAndAssignments.v2.get().size()); + Entry, Optional>> statesAndAssignments = groupService.collectGroupOffsets(GROUP); + assertEquals(Optional.of("Stable"), statesAndAssignments.getKey()); + assertTrue(statesAndAssignments.getValue().isPresent()); + assertEquals(TOPIC_PARTITIONS.size(), statesAndAssignments.getValue().get().size()); verify(admin, times(1)).describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(GROUP)), any()); verify(admin, times(1)).listConsumerGroupOffsets(ArgumentMatchers.eq(listConsumerGroupOffsetsSpec()), any()); @@ -157,16 +157,16 @@ public void testAdminRequestsForDescribeNegativeOffsets() throws Exception { ArgumentMatchers.argThat(offsetsArgMatcher.apply(assignedTopicPartitions)), any() )).thenReturn(new ListOffsetsResult(endOffsets.entrySet().stream().filter(e -> assignedTopicPartitions.contains(e.getKey())) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)))); + .collect(Collectors.toMap(Entry::getKey, Entry::getValue)))); when(admin.listOffsets( ArgumentMatchers.argThat(offsetsArgMatcher.apply(unassignedTopicPartitions)), any() )).thenReturn(new ListOffsetsResult(endOffsets.entrySet().stream().filter(e -> unassignedTopicPartitions.contains(e.getKey())) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)))); + .collect(Collectors.toMap(Entry::getKey, Entry::getValue)))); - Tuple2, Optional>> statesAndAssignments = groupService.collectGroupOffsets(GROUP); - Optional state = statesAndAssignments.v1; - Optional> assignments = statesAndAssignments.v2; + Entry, Optional>> statesAndAssignments = groupService.collectGroupOffsets(GROUP); + Optional state = statesAndAssignments.getKey(); + Optional> assignments = statesAndAssignments.getValue(); Map> returnedOffsets = assignments.map(results -> results.stream().collect(Collectors.toMap( diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteConsumerGroupsTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteConsumerGroupsTest.java index 95fe212c53133..b8133433d6489 100644 --- a/tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteConsumerGroupsTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteConsumerGroupsTest.java @@ -92,7 +92,7 @@ public void testDeleteCmdNonEmptyGroup(String quorum) throws Exception { ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs); TestUtils.waitForCondition( - () -> service.collectGroupMembers(GROUP, false).v2.get().size() == 1, + () -> service.collectGroupMembers(GROUP, false).getValue().get().size() == 1, "The group did not initialize as expected." ); @@ -112,7 +112,7 @@ public void testDeleteNonEmptyGroup(String quorum) throws Exception { ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs); TestUtils.waitForCondition( - () -> service.collectGroupMembers(GROUP, false).v2.get().size() == 1, + () -> service.collectGroupMembers(GROUP, false).getValue().get().size() == 1, "The group did not initialize as expected." ); diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java index 05fd00071e15e..292180b8af90b 100644 --- a/tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java @@ -29,12 +29,12 @@ import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.server.config.Defaults; -import org.apache.kafka.tools.Tuple2; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; import java.util.Collections; import java.util.Map; +import java.util.Map.Entry; import java.util.Properties; import java.util.concurrent.ExecutionException; @@ -60,8 +60,8 @@ public void testDeleteOffsetsNonExistingGroup() { String topic = "foo:1"; ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(getArgs(group, topic)); - Tuple2> res = service.deleteOffsets(group, Collections.singletonList(topic)); - assertEquals(Errors.GROUP_ID_NOT_FOUND, res.v1); + Entry> res = service.deleteOffsets(group, Collections.singletonList(topic)); + assertEquals(Errors.GROUP_ID_NOT_FOUND, res.getKey()); } @ParameterizedTest(name = TEST_WITH_PARAMETERIZED_QUORUM_NAME) @@ -145,9 +145,9 @@ private void testWithConsumerGroup(java.util.function.Consumer withCon withConsumerGroup.accept(() -> { String topic = inputPartition >= 0 ? inputTopic + ":" + inputPartition : inputTopic; ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(getArgs(GROUP, topic)); - Tuple2> res = service.deleteOffsets(GROUP, Collections.singletonList(topic)); - Errors topLevelError = res.v1; - Map partitions = res.v2; + Entry> res = service.deleteOffsets(GROUP, Collections.singletonList(topic)); + Errors topLevelError = res.getKey(); + Map partitions = res.getValue(); TopicPartition tp = new TopicPartition(inputTopic, expectedPartition); // Partition level error should propagate to top level, unless this is due to a missed partition attempt. if (inputPartition >= 0) { diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/group/DescribeConsumerGroupTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/group/DescribeConsumerGroupTest.java index 85f9f5b836a16..f1b6bba9ac1af 100644 --- a/tools/src/test/java/org/apache/kafka/tools/consumer/group/DescribeConsumerGroupTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/consumer/group/DescribeConsumerGroupTest.java @@ -24,7 +24,6 @@ import org.apache.kafka.common.utils.Exit; import org.apache.kafka.test.TestUtils; import org.apache.kafka.tools.ToolsTestUtils; -import org.apache.kafka.tools.Tuple2; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; import org.junit.jupiter.params.provider.ValueSource; @@ -34,6 +33,7 @@ import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Map.Entry; import java.util.Objects; import java.util.Optional; import java.util.Properties; @@ -138,8 +138,8 @@ public void testDescribeOffsetsOfNonExistingGroup(String quorum, String groupPro String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--describe", "--group", group}; ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs); - Tuple2, Optional>> res = service.collectGroupOffsets(group); - assertTrue(res.v1.map(s -> s.contains("Dead")).orElse(false) && res.v2.map(Collection::isEmpty).orElse(false), + Entry, Optional>> res = service.collectGroupOffsets(group); + assertTrue(res.getKey().map(s -> s.contains("Dead")).orElse(false) && res.getValue().map(Collection::isEmpty).orElse(false), "Expected the state to be 'Dead', with no members in the group '" + group + "'."); } @@ -155,12 +155,12 @@ public void testDescribeMembersOfNonExistingGroup(String quorum, String groupPro String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--describe", "--group", group}; ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs); - Tuple2, Optional>> res = service.collectGroupMembers(group, false); - assertTrue(res.v1.map(s -> s.contains("Dead")).orElse(false) && res.v2.map(Collection::isEmpty).orElse(false), + Entry, Optional>> res = service.collectGroupMembers(group, false); + assertTrue(res.getKey().map(s -> s.contains("Dead")).orElse(false) && res.getValue().map(Collection::isEmpty).orElse(false), "Expected the state to be 'Dead', with no members in the group '" + group + "'."); - Tuple2, Optional>> res2 = service.collectGroupMembers(group, true); - assertTrue(res2.v1.map(s -> s.contains("Dead")).orElse(false) && res2.v2.map(Collection::isEmpty).orElse(false), + Entry, Optional>> res2 = service.collectGroupMembers(group, true); + assertTrue(res2.getKey().map(s -> s.contains("Dead")).orElse(false) && res2.getValue().map(Collection::isEmpty).orElse(false), "Expected the state to be 'Dead', with no members in the group '" + group + "' (verbose option)."); } @@ -197,8 +197,8 @@ public void testDescribeExistingGroup(String quorum, String groupProtocol) throw ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs.toArray(new String[0])); TestUtils.waitForCondition(() -> { - Tuple2 res = ToolsTestUtils.grabConsoleOutputAndError(describeGroups(service)); - return res.v1.trim().split("\n").length == 2 && res.v2.isEmpty(); + Entry res = ToolsTestUtils.grabConsoleOutputAndError(describeGroups(service)); + return res.getKey().trim().split("\n").length == 2 && res.getValue().isEmpty(); }, "Expected a data row and no error in describe results with describe type " + String.join(" ", describeType) + "."); } } @@ -226,9 +226,9 @@ public void testDescribeExistingGroups(String quorum, String groupProtocol) thro ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs.toArray(new String[0])); TestUtils.waitForCondition(() -> { - Tuple2 res = ToolsTestUtils.grabConsoleOutputAndError(describeGroups(service)); - long numLines = Arrays.stream(res.v1.trim().split("\n")).filter(line -> !line.isEmpty()).count(); - return (numLines == expectedNumLines) && res.v2.isEmpty(); + Entry res = ToolsTestUtils.grabConsoleOutputAndError(describeGroups(service)); + long numLines = Arrays.stream(res.getKey().trim().split("\n")).filter(line -> !line.isEmpty()).count(); + return (numLines == expectedNumLines) && res.getValue().isEmpty(); }, "Expected a data row and no error in describe results with describe type " + String.join(" ", describeType) + "."); } } @@ -252,9 +252,9 @@ public void testDescribeAllExistingGroups(String quorum, String groupProtocol) t ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs.toArray(new String[0])); TestUtils.waitForCondition(() -> { - Tuple2 res = ToolsTestUtils.grabConsoleOutputAndError(describeGroups(service)); - long numLines = Arrays.stream(res.v1.trim().split("\n")).filter(s -> !s.isEmpty()).count(); - return (numLines == expectedNumLines) && res.v2.isEmpty(); + Entry res = ToolsTestUtils.grabConsoleOutputAndError(describeGroups(service)); + long numLines = Arrays.stream(res.getKey().trim().split("\n")).filter(s -> !s.isEmpty()).count(); + return (numLines == expectedNumLines) && res.getValue().isEmpty(); }, "Expected a data row and no error in describe results with describe type " + String.join(" ", describeType) + "."); } } @@ -271,9 +271,9 @@ public void testDescribeOffsetsOfExistingGroup(String quorum, String groupProtoc ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs); TestUtils.waitForCondition(() -> { - Tuple2, Optional>> groupOffsets = service.collectGroupOffsets(GROUP); - Optional state = groupOffsets.v1; - Optional> assignments = groupOffsets.v2; + Entry, Optional>> groupOffsets = service.collectGroupOffsets(GROUP); + Optional state = groupOffsets.getKey(); + Optional> assignments = groupOffsets.getValue(); Predicate isGrp = s -> Objects.equals(s.group, GROUP); @@ -307,9 +307,9 @@ public void testDescribeMembersOfExistingGroup(String quorum, String groupProtoc ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs); TestUtils.waitForCondition(() -> { - Tuple2, Optional>> groupMembers = service.collectGroupMembers(GROUP, false); - Optional state = groupMembers.v1; - Optional> assignments = groupMembers.v2; + Entry, Optional>> groupMembers = service.collectGroupMembers(GROUP, false); + Optional state = groupMembers.getKey(); + Optional> assignments = groupMembers.getValue(); Predicate isGrp = s -> Objects.equals(s.group, GROUP); @@ -331,10 +331,10 @@ public void testDescribeMembersOfExistingGroup(String quorum, String groupProtoc !Objects.equals(assignmentState.host, ConsumerGroupCommand.MISSING_COLUMN_VALUE); }, "Expected a 'Stable' group status, rows and valid member information for group " + GROUP + "."); - Tuple2, Optional>> res = service.collectGroupMembers(GROUP, true); + Entry, Optional>> res = service.collectGroupMembers(GROUP, true); - if (res.v2.isPresent()) { - assertTrue(res.v2.get().size() == 1 && res.v2.get().iterator().next().assignment.size() == 1, + if (res.getValue().isPresent()) { + assertTrue(res.getValue().get().size() == 1 && res.getValue().get().iterator().next().assignment.size() == 1, "Expected a topic partition assigned to the single group member for group " + GROUP); } else { fail("Expected partition assignments for members of group " + GROUP); @@ -407,8 +407,8 @@ public void testDescribeExistingGroupWithNoMembers(String quorum, String groupPr ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs.toArray(new String[0])); TestUtils.waitForCondition(() -> { - Tuple2 res = ToolsTestUtils.grabConsoleOutputAndError(describeGroups(service)); - return res.v1.trim().split("\n").length == 2 && res.v2.isEmpty(); + Entry res = ToolsTestUtils.grabConsoleOutputAndError(describeGroups(service)); + return res.getKey().trim().split("\n").length == 2 && res.getValue().isEmpty(); }, "Expected describe group results with one data row for describe type '" + String.join(" ", describeType) + "'"); // stop the consumer so the group has no active member anymore @@ -431,18 +431,18 @@ public void testDescribeOffsetsOfExistingGroupWithNoMembers(String quorum, Strin ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs); TestUtils.waitForCondition(() -> { - Tuple2, Optional>> res = service.collectGroupOffsets(GROUP); - return res.v1.map(s -> s.contains("Stable")).orElse(false) - && res.v2.map(c -> c.stream().anyMatch(assignment -> Objects.equals(assignment.group, GROUP) && assignment.offset.isPresent())).orElse(false); + Entry, Optional>> res = service.collectGroupOffsets(GROUP); + return res.getKey().map(s -> s.contains("Stable")).orElse(false) + && res.getValue().map(c -> c.stream().anyMatch(assignment -> Objects.equals(assignment.group, GROUP) && assignment.offset.isPresent())).orElse(false); }, "Expected the group to initially become stable, and to find group in assignments after initial offset commit."); // stop the consumer so the group has no active member anymore executor.shutdown(); TestUtils.waitForCondition(() -> { - Tuple2, Optional>> offsets = service.collectGroupOffsets(GROUP); - Optional state = offsets.v1; - Optional> assignments = offsets.v2; + Entry, Optional>> offsets = service.collectGroupOffsets(GROUP); + Optional state = offsets.getKey(); + Optional> assignments = offsets.getValue(); List testGroupAssignments = assignments.get().stream().filter(a -> Objects.equals(a.group, GROUP)).collect(Collectors.toList()); PartitionAssignmentState assignment = testGroupAssignments.get(0); return state.map(s -> s.contains("Empty")).orElse(false) && @@ -465,17 +465,17 @@ public void testDescribeMembersOfExistingGroupWithNoMembers(String quorum, Strin ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs); TestUtils.waitForCondition(() -> { - Tuple2, Optional>> res = service.collectGroupMembers(GROUP, false); - return res.v1.map(s -> s.contains("Stable")).orElse(false) - && res.v2.map(c -> c.stream().anyMatch(m -> Objects.equals(m.group, GROUP))).orElse(false); + Entry, Optional>> res = service.collectGroupMembers(GROUP, false); + return res.getKey().map(s -> s.contains("Stable")).orElse(false) + && res.getValue().map(c -> c.stream().anyMatch(m -> Objects.equals(m.group, GROUP))).orElse(false); }, "Expected the group to initially become stable, and to find group in assignments after initial offset commit."); // stop the consumer so the group has no active member anymore executor.shutdown(); TestUtils.waitForCondition(() -> { - Tuple2, Optional>> res = service.collectGroupMembers(GROUP, false); - return res.v1.map(s -> s.contains("Empty")).orElse(false) && res.v2.isPresent() && res.v2.get().isEmpty(); + Entry, Optional>> res = service.collectGroupMembers(GROUP, false); + return res.getKey().map(s -> s.contains("Empty")).orElse(false) && res.getValue().isPresent() && res.getValue().get().isEmpty(); }, "Expected no member in describe group members results for group '" + GROUP + "'"); } @@ -521,9 +521,9 @@ public void testDescribeWithConsumersWithoutAssignedPartitions(String quorum, St ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs.toArray(new String[0])); TestUtils.waitForCondition(() -> { - Tuple2 res = ToolsTestUtils.grabConsoleOutputAndError(describeGroups(service)); + Entry res = ToolsTestUtils.grabConsoleOutputAndError(describeGroups(service)); int expectedNumRows = DESCRIBE_TYPE_MEMBERS.contains(describeType) ? 3 : 2; - return res.v2.isEmpty() && res.v1.trim().split("\n").length == expectedNumRows; + return res.getValue().isEmpty() && res.getKey().trim().split("\n").length == expectedNumRows; }, "Expected a single data row in describe group result with describe type '" + String.join(" ", describeType) + "'"); } } @@ -540,11 +540,11 @@ public void testDescribeOffsetsWithConsumersWithoutAssignedPartitions(String quo ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs); TestUtils.waitForCondition(() -> { - Tuple2, Optional>> res = service.collectGroupOffsets(GROUP); - return res.v1.map(s -> s.contains("Stable")).isPresent() && - res.v2.isPresent() && - res.v2.get().stream().filter(s -> Objects.equals(s.group, GROUP)).count() == 1 && - res.v2.get().stream().filter(x -> Objects.equals(x.group, GROUP) && x.partition.isPresent()).count() == 1; + Entry, Optional>> res = service.collectGroupOffsets(GROUP); + return res.getKey().map(s -> s.contains("Stable")).isPresent() && + res.getValue().isPresent() && + res.getValue().get().stream().filter(s -> Objects.equals(s.group, GROUP)).count() == 1 && + res.getValue().get().stream().filter(x -> Objects.equals(x.group, GROUP) && x.partition.isPresent()).count() == 1; }, "Expected rows for consumers with no assigned partitions in describe group results"); } @@ -560,18 +560,18 @@ public void testDescribeMembersWithConsumersWithoutAssignedPartitions(String quo ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs); TestUtils.waitForCondition(() -> { - Tuple2, Optional>> res = service.collectGroupMembers(GROUP, false); - return res.v1.map(s -> s.contains("Stable")).orElse(false) && - res.v2.isPresent() && - res.v2.get().stream().filter(s -> Objects.equals(s.group, GROUP)).count() == 2 && - res.v2.get().stream().filter(x -> Objects.equals(x.group, GROUP) && x.numPartitions == 1).count() == 1 && - res.v2.get().stream().filter(x -> Objects.equals(x.group, GROUP) && x.numPartitions == 0).count() == 1 && - res.v2.get().stream().allMatch(s -> s.assignment.isEmpty()); + Entry, Optional>> res = service.collectGroupMembers(GROUP, false); + return res.getKey().map(s -> s.contains("Stable")).orElse(false) && + res.getValue().isPresent() && + res.getValue().get().stream().filter(s -> Objects.equals(s.group, GROUP)).count() == 2 && + res.getValue().get().stream().filter(x -> Objects.equals(x.group, GROUP) && x.numPartitions == 1).count() == 1 && + res.getValue().get().stream().filter(x -> Objects.equals(x.group, GROUP) && x.numPartitions == 0).count() == 1 && + res.getValue().get().stream().allMatch(s -> s.assignment.isEmpty()); }, "Expected rows for consumers with no assigned partitions in describe group results"); - Tuple2, Optional>> res = service.collectGroupMembers(GROUP, true); - assertTrue(res.v1.map(s -> s.contains("Stable")).orElse(false) - && res.v2.map(c -> c.stream().anyMatch(s -> !s.assignment.isEmpty())).orElse(false), + Entry, Optional>> res = service.collectGroupMembers(GROUP, true); + assertTrue(res.getKey().map(s -> s.contains("Stable")).orElse(false) + && res.getValue().map(c -> c.stream().anyMatch(s -> !s.assignment.isEmpty())).orElse(false), "Expected additional columns in verbose version of describe members"); } @@ -608,9 +608,9 @@ public void testDescribeWithMultiPartitionTopicAndMultipleConsumers(String quoru ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs.toArray(new String[0])); TestUtils.waitForCondition(() -> { - Tuple2 res = ToolsTestUtils.grabConsoleOutputAndError(describeGroups(service)); + Entry res = ToolsTestUtils.grabConsoleOutputAndError(describeGroups(service)); int expectedNumRows = DESCRIBE_TYPE_STATE.contains(describeType) ? 2 : 3; - return res.v2.isEmpty() && res.v1.trim().split("\n").length == expectedNumRows; + return res.getValue().isEmpty() && res.getKey().trim().split("\n").length == expectedNumRows; }, "Expected a single data row in describe group result with describe type '" + String.join(" ", describeType) + "'"); } } @@ -629,12 +629,12 @@ public void testDescribeOffsetsWithMultiPartitionTopicAndMultipleConsumers(Strin ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs); TestUtils.waitForCondition(() -> { - Tuple2, Optional>> res = service.collectGroupOffsets(GROUP); - return res.v1.map(s -> s.contains("Stable")).orElse(false) && - res.v2.isPresent() && - res.v2.get().stream().filter(s -> Objects.equals(s.group, GROUP)).count() == 2 && - res.v2.get().stream().filter(x -> Objects.equals(x.group, GROUP) && x.partition.isPresent()).count() == 2 && - res.v2.get().stream().noneMatch(x -> Objects.equals(x.group, GROUP) && !x.partition.isPresent()); + Entry, Optional>> res = service.collectGroupOffsets(GROUP); + return res.getKey().map(s -> s.contains("Stable")).orElse(false) && + res.getValue().isPresent() && + res.getValue().get().stream().filter(s -> Objects.equals(s.group, GROUP)).count() == 2 && + res.getValue().get().stream().filter(x -> Objects.equals(x.group, GROUP) && x.partition.isPresent()).count() == 2 && + res.getValue().get().stream().noneMatch(x -> Objects.equals(x.group, GROUP) && !x.partition.isPresent()); }, "Expected two rows (one row per consumer) in describe group results."); } @@ -652,16 +652,16 @@ public void testDescribeMembersWithMultiPartitionTopicAndMultipleConsumers(Strin ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs); TestUtils.waitForCondition(() -> { - Tuple2, Optional>> res = service.collectGroupMembers(GROUP, false); - return res.v1.map(s -> s.contains("Stable")).orElse(false) && - res.v2.isPresent() && - res.v2.get().stream().filter(s -> Objects.equals(s.group, GROUP)).count() == 2 && - res.v2.get().stream().filter(x -> Objects.equals(x.group, GROUP) && x.numPartitions == 1).count() == 2 && - res.v2.get().stream().noneMatch(x -> Objects.equals(x.group, GROUP) && x.numPartitions == 0); + Entry, Optional>> res = service.collectGroupMembers(GROUP, false); + return res.getKey().map(s -> s.contains("Stable")).orElse(false) && + res.getValue().isPresent() && + res.getValue().get().stream().filter(s -> Objects.equals(s.group, GROUP)).count() == 2 && + res.getValue().get().stream().filter(x -> Objects.equals(x.group, GROUP) && x.numPartitions == 1).count() == 2 && + res.getValue().get().stream().noneMatch(x -> Objects.equals(x.group, GROUP) && x.numPartitions == 0); }, "Expected two rows (one row per consumer) in describe group members results."); - Tuple2, Optional>> res = service.collectGroupMembers(GROUP, true); - assertTrue(res.v1.map(s -> s.contains("Stable")).orElse(false) && res.v2.map(s -> s.stream().filter(x -> x.assignment.isEmpty()).count()).orElse(0L) == 0, + Entry, Optional>> res = service.collectGroupMembers(GROUP, true); + assertTrue(res.getKey().map(s -> s.contains("Stable")).orElse(false) && res.getValue().map(s -> s.stream().filter(x -> x.assignment.isEmpty()).count()).orElse(0L) == 0, "Expected additional columns in verbose version of describe members"); } @@ -698,9 +698,9 @@ public void testDescribeSimpleConsumerGroup(String quorum) throws Exception { ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs); TestUtils.waitForCondition(() -> { - Tuple2, Optional>> res = service.collectGroupOffsets(GROUP); - return res.v1.map(s -> s.contains("Empty")).orElse(false) - && res.v2.isPresent() && res.v2.get().stream().filter(s -> Objects.equals(s.group, GROUP)).count() == 2; + Entry, Optional>> res = service.collectGroupOffsets(GROUP); + return res.getKey().map(s -> s.contains("Empty")).orElse(false) + && res.getValue().isPresent() && res.getValue().get().stream().filter(s -> Objects.equals(s.group, GROUP)).count() == 2; }, "Expected a stable group with two members in describe group state result."); } @@ -798,18 +798,18 @@ public void testDescribeNonOffsetCommitGroup(String quorum, String groupProtocol ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs); TestUtils.waitForCondition(() -> { - Tuple2, Optional>> groupOffsets = service.collectGroupOffsets(GROUP); + Entry, Optional>> groupOffsets = service.collectGroupOffsets(GROUP); Predicate isGrp = s -> Objects.equals(s.group, GROUP); - boolean res = groupOffsets.v1.map(s -> s.contains("Stable")).orElse(false) && - groupOffsets.v2.isPresent() && - groupOffsets.v2.get().stream().filter(isGrp).count() == 1; + boolean res = groupOffsets.getKey().map(s -> s.contains("Stable")).orElse(false) && + groupOffsets.getValue().isPresent() && + groupOffsets.getValue().get().stream().filter(isGrp).count() == 1; if (!res) return false; - Optional maybeAssignmentState = groupOffsets.v2.get().stream().filter(isGrp).findFirst(); + Optional maybeAssignmentState = groupOffsets.getValue().get().stream().filter(isGrp).findFirst(); if (!maybeAssignmentState.isPresent()) return false; diff --git a/tools/src/test/java/org/apache/kafka/tools/other/ReplicationQuotasTestRig.java b/tools/src/test/java/org/apache/kafka/tools/other/ReplicationQuotasTestRig.java index 618cf972995b7..c7c4a4f35e65e 100644 --- a/tools/src/test/java/org/apache/kafka/tools/other/ReplicationQuotasTestRig.java +++ b/tools/src/test/java/org/apache/kafka/tools/other/ReplicationQuotasTestRig.java @@ -217,7 +217,7 @@ public void run(ExperimentDef config, Journal journal, boolean displayChartsOnSc System.out.println("Generating Reassignment"); Map> newAssignment = ReassignPartitionsCommand.generateAssignment(adminClient, - json(TOPIC_NAME), brokers.stream().map(Object::toString).collect(Collectors.joining(",")), true).v1; + json(TOPIC_NAME), brokers.stream().map(Object::toString).collect(Collectors.joining(",")), true).getKey(); System.out.println("Starting Reassignment"); long start = System.currentTimeMillis(); diff --git a/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsIntegrationTest.java b/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsIntegrationTest.java index 83fc665e3e487..0348aa24ffe9f 100644 --- a/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsIntegrationTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsIntegrationTest.java @@ -43,7 +43,6 @@ import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.tools.TerseException; -import org.apache.kafka.tools.Tuple2; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Timeout; import org.junit.jupiter.params.ParameterizedTest; @@ -54,6 +53,7 @@ import scala.collection.Seq; import java.io.Closeable; +import java.util.AbstractMap.SimpleImmutableEntry; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -62,6 +62,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Objects; import java.util.Optional; import java.util.Properties; @@ -395,11 +396,11 @@ public void testCancellation(String quorum) throws Exception { waitForVerifyAssignment(cluster.adminClient, assignment, true, new VerifyAssignmentResult(partStates, true, Collections.emptyMap(), false)); // Cancel the reassignment. - assertEquals(new Tuple2<>(new HashSet<>(asList(foo0, baz1)), Collections.emptySet()), runCancelAssignment(cluster.adminClient, assignment, true)); + assertEquals(new SimpleImmutableEntry<>(new HashSet<>(asList(foo0, baz1)), Collections.emptySet()), runCancelAssignment(cluster.adminClient, assignment, true)); // Broker throttles are still active because we passed --preserve-throttles waitForInterBrokerThrottle(asList(0, 1, 2, 3), interBrokerThrottle); // Cancelling the reassignment again should reveal nothing to cancel. - assertEquals(new Tuple2<>(Collections.emptySet(), Collections.emptySet()), runCancelAssignment(cluster.adminClient, assignment, false)); + assertEquals(new SimpleImmutableEntry<>(Collections.emptySet(), Collections.emptySet()), runCancelAssignment(cluster.adminClient, assignment, false)); // This time, the broker throttles were removed. waitForBrokerLevelThrottles(unthrottledBrokerConfigs); // Verify that there are no ongoing reassignments. @@ -446,7 +447,7 @@ public void testCancellationWithAddingReplicaInIsr(String quorum) throws Excepti ); // Now cancel the assignment and verify that the partition is removed from cancelled replicas - assertEquals(new Tuple2<>(Collections.singleton(foo0), Collections.emptySet()), runCancelAssignment(cluster.adminClient, assignment, true)); + assertEquals(new SimpleImmutableEntry<>(Collections.singleton(foo0), Collections.emptySet()), runCancelAssignment(cluster.adminClient, assignment, true)); verifyReplicaDeleted(foo0, 3); verifyReplicaDeleted(foo0, 4); } @@ -708,7 +709,7 @@ private void runExecuteAssignment(Admin adminClient, } } - private Tuple2, Set> runCancelAssignment( + private Entry, Set> runCancelAssignment( Admin adminClient, String jsonString, Boolean preserveThrottles diff --git a/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsUnitTest.java b/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsUnitTest.java index 699e048fb206e..c6f145d9a5598 100644 --- a/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsUnitTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsUnitTest.java @@ -31,18 +31,19 @@ import org.apache.kafka.common.utils.Time; import org.apache.kafka.server.common.AdminCommandFailedException; import org.apache.kafka.server.common.AdminOperationException; -import org.apache.kafka.tools.Tuple2; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; +import java.util.AbstractMap.SimpleImmutableEntry; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Optional; import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; @@ -174,14 +175,14 @@ public void testFindPartitionReassignmentStates() throws Exception { expStates.put(new TopicPartition("foo", 1), new PartitionReassignmentState(asList(1, 2, 3), asList(1, 2, 3), true)); - Tuple2, Boolean> actual = + Entry, Boolean> actual = findPartitionReassignmentStates(adminClient, asList( - new Tuple2<>(new TopicPartition("foo", 0), asList(0, 1, 3)), - new Tuple2<>(new TopicPartition("foo", 1), asList(1, 2, 3)) + new SimpleImmutableEntry<>(new TopicPartition("foo", 0), asList(0, 1, 3)), + new SimpleImmutableEntry<>(new TopicPartition("foo", 1), asList(1, 2, 3)) )); - assertEquals(expStates, actual.v1); - assertTrue(actual.v2); + assertEquals(expStates, actual.getKey()); + assertTrue(actual.getValue()); // Cancel the reassignment and test findPartitionReassignmentStates again. Map cancelResult = cancelPartitionReassignments(adminClient, @@ -198,12 +199,12 @@ public void testFindPartitionReassignmentStates() throws Exception { new PartitionReassignmentState(asList(1, 2, 3), asList(1, 2, 3), true)); actual = findPartitionReassignmentStates(adminClient, asList( - new Tuple2<>(new TopicPartition("foo", 0), asList(0, 1, 3)), - new Tuple2<>(new TopicPartition("foo", 1), asList(1, 2, 3)) + new SimpleImmutableEntry<>(new TopicPartition("foo", 0), asList(0, 1, 3)), + new SimpleImmutableEntry<>(new TopicPartition("foo", 1), asList(1, 2, 3)) )); - assertEquals(expStates, actual.v1); - assertFalse(actual.v2); + assertEquals(expStates, actual.getKey()); + assertFalse(actual.getValue()); } } @@ -338,13 +339,13 @@ public void testParseGenerateAssignmentArgs() throws Exception { assertThrows(AdminCommandFailedException.class, () -> parseGenerateAssignmentArgs( "{\"topics\": [{\"topic\": \"foo\"}], \"version\":1}", "5,2,3,4,5"), "Expected to detect duplicate broker list entries").getMessage()); - assertEquals(new Tuple2<>(asList(5, 2, 3, 4), asList("foo")), + assertEquals(new SimpleImmutableEntry<>(asList(5, 2, 3, 4), asList("foo")), parseGenerateAssignmentArgs("{\"topics\": [{\"topic\": \"foo\"}], \"version\":1}", "5,2,3,4")); assertStartsWith("List of topics to reassign contains duplicate entries", assertThrows(AdminCommandFailedException.class, () -> parseGenerateAssignmentArgs( "{\"topics\": [{\"topic\": \"foo\"},{\"topic\": \"foo\"}], \"version\":1}", "5,2,3,4"), "Expected to detect duplicate topic entries").getMessage()); - assertEquals(new Tuple2<>(asList(5, 3, 4), asList("foo", "bar")), + assertEquals(new SimpleImmutableEntry<>(asList(5, 3, 4), asList("foo", "bar")), parseGenerateAssignmentArgs( "{\"topics\": [{\"topic\": \"foo\"},{\"topic\": \"bar\"}], \"version\":1}", "5,3,4")); } @@ -389,7 +390,7 @@ public void testGenerateAssignmentWithInconsistentRacks() throws Exception { () -> generateAssignment(adminClient, "{\"topics\":[{\"topic\":\"foo\"}]}", "0,1,2,3", true), "Expected generateAssignment to fail").getMessage()); // It should succeed when --disable-rack-aware is used. - Tuple2>, Map>> + Entry>, Map>> proposedCurrent = generateAssignment(adminClient, "{\"topics\":[{\"topic\":\"foo\"}]}", "0,1,2,3", false); Map> expCurrent = new HashMap<>(); @@ -397,7 +398,7 @@ public void testGenerateAssignmentWithInconsistentRacks() throws Exception { expCurrent.put(new TopicPartition("foo", 0), asList(0, 1, 2)); expCurrent.put(new TopicPartition("foo", 1), asList(1, 2, 3)); - assertEquals(expCurrent, proposedCurrent.v2); + assertEquals(expCurrent, proposedCurrent.getValue()); } } @@ -407,7 +408,7 @@ public void testGenerateAssignmentWithFewerBrokers() throws Exception { addTopics(adminClient); List goalBrokers = asList(0, 1, 3); - Tuple2>, Map>> + Entry>, Map>> proposedCurrent = generateAssignment(adminClient, "{\"topics\":[{\"topic\":\"foo\"},{\"topic\":\"bar\"}]}", goalBrokers.stream().map(Object::toString).collect(Collectors.joining(",")), false); @@ -418,12 +419,12 @@ public void testGenerateAssignmentWithFewerBrokers() throws Exception { expCurrent.put(new TopicPartition("foo", 1), asList(1, 2, 3)); expCurrent.put(new TopicPartition("bar", 0), asList(2, 3, 0)); - assertEquals(expCurrent, proposedCurrent.v2); + assertEquals(expCurrent, proposedCurrent.getValue()); // The proposed assignment should only span the provided brokers - proposedCurrent.v1.values().forEach(replicas -> + proposedCurrent.getKey().values().forEach(replicas -> assertTrue(goalBrokers.containsAll(replicas), - "Proposed assignment " + proposedCurrent.v1 + " puts replicas on brokers other than " + goalBrokers) + "Proposed assignment " + proposedCurrent.getKey() + " puts replicas on brokers other than " + goalBrokers) ); } } @@ -567,14 +568,14 @@ public void testParseExecuteAssignmentArgs() throws Exception { partitionsToBeReassigned.put(new TopicPartition("foo", 0), asList(1, 2, 3)); partitionsToBeReassigned.put(new TopicPartition("foo", 1), asList(3, 4, 5)); - Tuple2>, Map> actual = parseExecuteAssignmentArgs( + Entry>, Map> actual = parseExecuteAssignmentArgs( "{\"version\":1,\"partitions\":" + "[{\"topic\":\"foo\",\"partition\":0,\"replicas\":[1,2,3],\"log_dirs\":[\"any\",\"any\",\"any\"]}," + "{\"topic\":\"foo\",\"partition\":1,\"replicas\":[3,4,5],\"log_dirs\":[\"any\",\"any\",\"any\"]}" + "]}"); - assertEquals(partitionsToBeReassigned, actual.v1); - assertTrue(actual.v2.isEmpty()); + assertEquals(partitionsToBeReassigned, actual.getKey()); + assertTrue(actual.getValue().isEmpty()); Map replicaAssignment = new HashMap<>(); @@ -587,8 +588,8 @@ public void testParseExecuteAssignmentArgs() throws Exception { "[{\"topic\":\"foo\",\"partition\":0,\"replicas\":[1,2,3],\"log_dirs\":[\"/tmp/a\",\"/tmp/b\",\"/tmp/c\"]}" + "]}"); - assertEquals(Collections.singletonMap(new TopicPartition("foo", 0), asList(1, 2, 3)), actual.v1); - assertEquals(replicaAssignment, actual.v2); + assertEquals(Collections.singletonMap(new TopicPartition("foo", 0), asList(1, 2, 3)), actual.getKey()); + assertEquals(replicaAssignment, actual.getValue()); } @Test