Skip to content

Commit

Permalink
MINOR: Tuple2 replaced with Map.Entry (#15560)
Browse files Browse the repository at this point in the history
Reviewers: Chia-Ping Tsai <[email protected]>
  • Loading branch information
nizhikov authored Mar 23, 2024
1 parent 6eba905 commit 0f216b6
Show file tree
Hide file tree
Showing 11 changed files with 248 additions and 290 deletions.
48 changes: 0 additions & 48 deletions tools/src/main/java/org/apache/kafka/tools/Tuple2.java

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,14 @@
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.util.CommandLineUtils;
import org.apache.kafka.tools.ToolsUtils;
import org.apache.kafka.tools.Tuple2;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.text.ParseException;
import java.time.Duration;
import java.time.Instant;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
Expand All @@ -71,6 +71,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Properties;
Expand Down Expand Up @@ -315,10 +316,10 @@ private Optional<Integer> size(Optional<? extends Collection<?>> colOpt) {
return colOpt.map(Collection::size);
}

private void printOffsets(Map<String, Tuple2<Optional<String>, Optional<Collection<PartitionAssignmentState>>>> offsets) {
private void printOffsets(Map<String, Entry<Optional<String>, Optional<Collection<PartitionAssignmentState>>>> offsets) {
offsets.forEach((groupId, tuple) -> {
Optional<String> state = tuple.v1;
Optional<Collection<PartitionAssignmentState>> assignments = tuple.v2;
Optional<String> state = tuple.getKey();
Optional<Collection<PartitionAssignmentState>> assignments = tuple.getValue();

if (shouldPrintMemberState(groupId, state, size(assignments))) {
String format = printOffsetFormat(assignments);
Expand Down Expand Up @@ -359,10 +360,10 @@ private static String printOffsetFormat(Optional<Collection<PartitionAssignmentS
return format;
}

private void printMembers(Map<String, Tuple2<Optional<String>, Optional<Collection<MemberAssignmentState>>>> members, boolean verbose) {
private void printMembers(Map<String, Entry<Optional<String>, Optional<Collection<MemberAssignmentState>>>> members, boolean verbose) {
members.forEach((groupId, tuple) -> {
Optional<String> state = tuple.v1;
Optional<Collection<MemberAssignmentState>> assignments = tuple.v2;
Optional<String> state = tuple.getKey();
Optional<Collection<MemberAssignmentState>> assignments = tuple.getValue();
int maxGroupLen = 15, maxConsumerIdLen = 15, maxGroupInstanceIdLen = 17, maxHostLen = 15, maxClientIdLen = 15;
boolean includeGroupInstanceId = false;

Expand Down Expand Up @@ -448,11 +449,11 @@ void describeGroups() throws Exception {
long subActions = Stream.of(membersOptPresent, offsetsOptPresent, stateOptPresent).filter(x -> x).count();

if (subActions == 0 || offsetsOptPresent) {
TreeMap<String, Tuple2<Optional<String>, Optional<Collection<PartitionAssignmentState>>>> offsets
TreeMap<String, Entry<Optional<String>, Optional<Collection<PartitionAssignmentState>>>> offsets
= collectGroupsOffsets(groupIds);
printOffsets(offsets);
} else if (membersOptPresent) {
TreeMap<String, Tuple2<Optional<String>, Optional<Collection<MemberAssignmentState>>>> members
TreeMap<String, Entry<Optional<String>, Optional<Collection<MemberAssignmentState>>>> members
= collectGroupsMembers(groupIds, opts.options.has(opts.verboseOpt));
printMembers(members, opts.options.has(opts.verboseOpt));
} else {
Expand Down Expand Up @@ -560,7 +561,7 @@ Map<String, Map<TopicPartition, OffsetAndMetadata>> resetOffsets() {
return result;
}

Tuple2<Errors, Map<TopicPartition, Throwable>> deleteOffsets(String groupId, List<String> topics) {
Entry<Errors, Map<TopicPartition, Throwable>> deleteOffsets(String groupId, List<String> topics) {
Map<TopicPartition, Throwable> partitionLevelResult = new HashMap<>();
Set<String> topicWithPartitions = new HashSet<>();
Set<String> topicWithoutPartitions = new HashSet<>();
Expand Down Expand Up @@ -617,17 +618,17 @@ Tuple2<Errors, Map<TopicPartition, Throwable>> deleteOffsets(String groupId, Lis
}
});

return new Tuple2<>(topLevelException, partitionLevelResult);
return new SimpleImmutableEntry<>(topLevelException, partitionLevelResult);
}

void deleteOffsets() {
String groupId = opts.options.valueOf(opts.groupOpt);
List<String> topics = opts.options.valuesOf(opts.topicOpt);

Tuple2<Errors, Map<TopicPartition, Throwable>> res = deleteOffsets(groupId, topics);
Entry<Errors, Map<TopicPartition, Throwable>> res = deleteOffsets(groupId, topics);

Errors topLevelResult = res.v1;
Map<TopicPartition, Throwable> partitionLevelResult = res.v2;
Errors topLevelResult = res.getKey();
Map<TopicPartition, Throwable> partitionLevelResult = res.getValue();

switch (topLevelResult) {
case NONE:
Expand Down Expand Up @@ -677,7 +678,7 @@ Map<String, ConsumerGroupDescription> describeConsumerGroups(Collection<String>
withTimeoutMs(new DescribeConsumerGroupsOptions())
).describedGroups();

for (Map.Entry<String, KafkaFuture<ConsumerGroupDescription>> e : stringKafkaFutureMap.entrySet()) {
for (Entry<String, KafkaFuture<ConsumerGroupDescription>> e : stringKafkaFutureMap.entrySet()) {
res.put(e.getKey(), e.getValue().get());
}
return res;
Expand All @@ -686,16 +687,16 @@ Map<String, ConsumerGroupDescription> describeConsumerGroups(Collection<String>
/**
* Returns the state of the specified consumer group and partition assignment states
*/
Tuple2<Optional<String>, Optional<Collection<PartitionAssignmentState>>> collectGroupOffsets(String groupId) throws Exception {
return collectGroupsOffsets(Collections.singletonList(groupId)).getOrDefault(groupId, new Tuple2<>(Optional.empty(), Optional.empty()));
Entry<Optional<String>, Optional<Collection<PartitionAssignmentState>>> collectGroupOffsets(String groupId) throws Exception {
return collectGroupsOffsets(Collections.singletonList(groupId)).getOrDefault(groupId, new SimpleImmutableEntry<>(Optional.empty(), Optional.empty()));
}

/**
* Returns states of the specified consumer groups and partition assignment states
*/
TreeMap<String, Tuple2<Optional<String>, Optional<Collection<PartitionAssignmentState>>>> collectGroupsOffsets(Collection<String> groupIds) throws Exception {
TreeMap<String, Entry<Optional<String>, Optional<Collection<PartitionAssignmentState>>>> collectGroupsOffsets(Collection<String> groupIds) throws Exception {
Map<String, ConsumerGroupDescription> consumerGroups = describeConsumerGroups(groupIds);
TreeMap<String, Tuple2<Optional<String>, Optional<Collection<PartitionAssignmentState>>>> groupOffsets = new TreeMap<>();
TreeMap<String, Entry<Optional<String>, Optional<Collection<PartitionAssignmentState>>>> groupOffsets = new TreeMap<>();

consumerGroups.forEach((groupId, consumerGroup) -> {
ConsumerGroupState state = consumerGroup.state();
Expand Down Expand Up @@ -737,19 +738,19 @@ TreeMap<String, Tuple2<Optional<String>, Optional<Collection<PartitionAssignment

rowsWithConsumer.addAll(rowsWithoutConsumer);

groupOffsets.put(groupId, new Tuple2<>(Optional.of(state.toString()), Optional.of(rowsWithConsumer)));
groupOffsets.put(groupId, new SimpleImmutableEntry<>(Optional.of(state.toString()), Optional.of(rowsWithConsumer)));
});

return groupOffsets;
}

Tuple2<Optional<String>, Optional<Collection<MemberAssignmentState>>> collectGroupMembers(String groupId, boolean verbose) throws Exception {
Entry<Optional<String>, Optional<Collection<MemberAssignmentState>>> collectGroupMembers(String groupId, boolean verbose) throws Exception {
return collectGroupsMembers(Collections.singleton(groupId), verbose).get(groupId);
}

TreeMap<String, Tuple2<Optional<String>, Optional<Collection<MemberAssignmentState>>>> collectGroupsMembers(Collection<String> groupIds, boolean verbose) throws Exception {
TreeMap<String, Entry<Optional<String>, Optional<Collection<MemberAssignmentState>>>> collectGroupsMembers(Collection<String> groupIds, boolean verbose) throws Exception {
Map<String, ConsumerGroupDescription> consumerGroups = describeConsumerGroups(groupIds);
TreeMap<String, Tuple2<Optional<String>, Optional<Collection<MemberAssignmentState>>>> res = new TreeMap<>();
TreeMap<String, Entry<Optional<String>, Optional<Collection<MemberAssignmentState>>>> res = new TreeMap<>();

consumerGroups.forEach((groupId, consumerGroup) -> {
String state = consumerGroup.state().toString();
Expand All @@ -763,7 +764,7 @@ TreeMap<String, Tuple2<Optional<String>, Optional<Collection<MemberAssignmentSta
consumer.assignment().topicPartitions().size(),
new ArrayList<>(verbose ? consumer.assignment().topicPartitions() : Collections.emptySet())
)).collect(Collectors.toList());
res.put(groupId, new Tuple2<>(Optional.of(state), Optional.of(memberAssignmentStates)));
res.put(groupId, new SimpleImmutableEntry<>(Optional.of(state), Optional.of(memberAssignmentStates)));
});
return res;
}
Expand Down Expand Up @@ -836,7 +837,7 @@ private Map<TopicPartition, LogOffsetResult> getLogTimestampOffsets(Collection<T
});

Map<TopicPartition, LogOffsetResult> successfulLogTimestampOffsets = successfulOffsetsForTimes.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> new LogOffset(e.getValue().offset())));
.collect(Collectors.toMap(Entry::getKey, e -> new LogOffset(e.getValue().offset())));

unsuccessfulOffsetsForTimes.forEach((tp, offsetResultInfo) ->
System.out.println("\nWarn: Partition " + tp.partition() + " from topic " + tp.topic() +
Expand Down Expand Up @@ -991,7 +992,7 @@ private Map<TopicPartition, OffsetAndMetadata> prepareOffsetsToReset(String grou
if (opts.options.has(opts.resetToOffsetOpt)) {
long offset = opts.options.valueOf(opts.resetToOffsetOpt);
return checkOffsetsRange(partitionsToReset.stream().collect(Collectors.toMap(Function.identity(), tp -> offset)))
.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> new OffsetAndMetadata(e.getValue())));
.entrySet().stream().collect(Collectors.toMap(Entry::getKey, e -> new OffsetAndMetadata(e.getValue())));
} else if (opts.options.has(opts.resetToEarliestOpt)) {
Map<TopicPartition, LogOffsetResult> logStartOffsets = getLogStartOffsets(partitionsToReset);
return partitionsToReset.stream().collect(Collectors.toMap(Function.identity(), topicPartition -> {
Expand Down Expand Up @@ -1029,7 +1030,7 @@ private Map<TopicPartition, OffsetAndMetadata> prepareOffsetsToReset(String grou
return currentOffset.offset() + shiftBy;
}));
return checkOffsetsRange(requestedOffsets).entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> new OffsetAndMetadata(e.getValue())));
.collect(Collectors.toMap(Entry::getKey, e -> new OffsetAndMetadata(e.getValue())));
} else if (opts.options.has(opts.resetToDatetimeOpt)) {
try {
long timestamp = Utils.getDateTime(opts.options.valueOf(opts.resetToDatetimeOpt));
Expand Down Expand Up @@ -1078,7 +1079,7 @@ private Map<TopicPartition, OffsetAndMetadata> prepareOffsetsToReset(String grou
topicPartition -> resetPlanForGroup.get(topicPartition).offset()));

return checkOffsetsRange(requestedOffsets).entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> new OffsetAndMetadata(e.getValue())));
.collect(Collectors.toMap(Entry::getKey, e -> new OffsetAndMetadata(e.getValue())));
}).orElseGet(Collections::emptyMap);
} else if (opts.options.has(opts.resetToCurrentOpt)) {
Map<TopicPartition, OffsetAndMetadata> currentCommittedOffsets = getCommittedOffsets(groupId);
Expand All @@ -1104,7 +1105,7 @@ private Map<TopicPartition, OffsetAndMetadata> prepareOffsetsToReset(String grou
}));

Map<TopicPartition, OffsetAndMetadata> preparedOffsetsForPartitionsWithoutCommittedOffset = getLogEndOffsets(partitionsToResetWithoutCommittedOffset)
.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> {
.entrySet().stream().collect(Collectors.toMap(Entry::getKey, e -> {
if (!(e.getValue() instanceof LogOffset)) {
ToolsUtils.printUsageAndExit(opts.parser, "Error getting ending offset of topic partition: " + e.getKey());
return null;
Expand Down
Loading

0 comments on commit 0f216b6

Please sign in to comment.