Skip to content

Commit

Permalink
MINOR: Various cleanups in tools (#15709)
Browse files Browse the repository at this point in the history
Reviewers: Chia-Ping Tsai <[email protected]>
  • Loading branch information
mimaison authored Apr 15, 2024
1 parent 3617dda commit 395fdae
Show file tree
Hide file tree
Showing 16 changed files with 46 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,7 @@ public void testConsume(final long prodTimeMs) throws Throwable {
new ClientCompatibilityTestDeserializer(testConfig.expectClusterId);
try (final KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps, deserializer, deserializer)) {
final List<PartitionInfo> partitionInfos = consumer.partitionsFor(testConfig.topic);
if (partitionInfos.size() < 1)
if (partitionInfos.isEmpty())
throw new RuntimeException("Expected at least one partition for topic " + testConfig.topic);
final Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
final LinkedList<TopicPartition> topicPartitions = new LinkedList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class ClientMetricsCommand {
private static final Logger LOG = LoggerFactory.getLogger(ClientMetricsCommand.class);
Expand All @@ -74,17 +75,16 @@ static void execute(String... args) throws Exception {
Properties config = opts.commandConfig();
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, opts.bootstrapServer());

ClientMetricsService service = new ClientMetricsService(config);
int exitCode = 0;
try {
try (ClientMetricsService service = new ClientMetricsService(config)) {
if (opts.hasAlterOption()) {
service.alterClientMetrics(opts);
} else if (opts.hasDescribeOption()) {
service.describeClientMetrics(opts);
} else if (opts.hasDeleteOption()) {
service.deleteClientMetrics(opts);
} else if (opts.hasListOption()) {
service.listClientMetrics(opts);
service.listClientMetrics();
}
} catch (ExecutionException e) {
Throwable cause = e.getCause();
Expand All @@ -98,7 +98,6 @@ static void execute(String... args) throws Exception {
printException(t);
exitCode = 1;
} finally {
service.close();
Exit.exit(exitCode);
}
}
Expand All @@ -120,8 +119,8 @@ public void alterClientMetrics(ClientMetricsCommandOptions opts) throws Exceptio

Map<String, String> configsToBeSet = new HashMap<>();
opts.interval().map(intervalVal -> configsToBeSet.put("interval.ms", intervalVal.toString()));
opts.metrics().map(metricslist -> configsToBeSet.put("metrics", metricslist.stream().collect(Collectors.joining(","))));
opts.match().map(matchlist -> configsToBeSet.put("match", matchlist.stream().collect(Collectors.joining(","))));
opts.metrics().map(metricslist -> configsToBeSet.put("metrics", String.join(",", metricslist)));
opts.match().map(matchlist -> configsToBeSet.put("match", String.join(",", matchlist)));

ConfigResource configResource = new ConfigResource(ConfigResource.Type.CLIENT_METRICS, entityName);
AlterConfigsOptions alterOptions = new AlterConfigsOptions().timeoutMs(30000).validateOnly(false);
Expand Down Expand Up @@ -168,7 +167,7 @@ public void describeClientMetrics(ClientMetricsCommandOptions opts) throws Excep
}
}

public void listClientMetrics(ClientMetricsCommandOptions opts) throws Exception {
public void listClientMetrics() throws Exception {
Collection<ClientMetricsResourceListing> resources = adminClient.listClientMetricsResources()
.all().get(30, TimeUnit.SECONDS);
String results = resources.stream().map(ClientMetricsResourceListing::name).collect(Collectors.joining("\n"));
Expand Down Expand Up @@ -212,8 +211,6 @@ public static final class ClientMetricsCommandOptions extends CommandDefaultOpti

private final ArgumentAcceptingOptionSpec<Integer> intervalOpt;

private final String nl;

private final ArgumentAcceptingOptionSpec<String> matchOpt;

private final ArgumentAcceptingOptionSpec<String> metricsOpt;
Expand Down Expand Up @@ -244,15 +241,14 @@ public ClientMetricsCommandOptions(String[] args) {
.describedAs("push interval")
.ofType(java.lang.Integer.class);

nl = System.getProperty("line.separator");
String nl = System.lineSeparator();

String[] matchSelectors = new String[] {
"client_id", "client_instance_id", "client_software_name",
"client_software_version", "client_source_address", "client_source_port"
};
String matchSelectorNames = Arrays.stream(matchSelectors).map(config -> "\t" + config).collect(Collectors.joining(nl));
matchOpt = parser.accepts("match", "Matching selector 'k1=v1,k2=v2'. The following is a list of valid selector names: " +
nl + matchSelectorNames)
matchOpt = parser.accepts("match", "Matching selector 'k1=v1,k2=v2'. The following is a list of valid selector names: " + nl + matchSelectorNames)
.withRequiredArg()
.describedAs("k1=v1,k2=v2")
.ofType(String.class)
Expand Down Expand Up @@ -347,10 +343,7 @@ public void checkArgs() {
CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps to manipulate and describe client metrics configurations.");

// should have exactly one action
long actions =
Arrays.asList(alterOpt, deleteOpt, describeOpt, listOpt)
.stream().filter(options::has)
.count();
long actions = Stream.of(alterOpt, deleteOpt, describeOpt, listOpt).filter(options::has).count();
if (actions != 1)
CommandLineUtils.printUsageAndExit(parser, "Command must include exactly one action: --alter, --delete, --describe or --list.");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,6 @@ static void execute(String... args) throws IOException, ExecutionException, Inte
}

private static class GetOffsetShellOptions extends CommandDefaultOptions {
private final OptionSpec<String> brokerListOpt;
private final OptionSpec<String> bootstrapServerOpt;
private final OptionSpec<String> topicPartitionsOpt;
private final OptionSpec<String> topicOpt;
private final OptionSpec<String> partitionsOpt;
Expand All @@ -108,11 +106,11 @@ private static class GetOffsetShellOptions extends CommandDefaultOptions {
public GetOffsetShellOptions(String[] args) throws TerseException {
super(args);

brokerListOpt = parser.accepts("broker-list", "DEPRECATED, use --bootstrap-server instead; ignored if --bootstrap-server is specified. The server(s) to connect to in the form HOST1:PORT1,HOST2:PORT2.")
OptionSpec<String> brokerListOpt = parser.accepts("broker-list", "DEPRECATED, use --bootstrap-server instead; ignored if --bootstrap-server is specified. The server(s) to connect to in the form HOST1:PORT1,HOST2:PORT2.")
.withRequiredArg()
.describedAs("HOST1:PORT1,...,HOST3:PORT3")
.ofType(String.class);
bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED. The server(s) to connect to in the form HOST1:PORT1,HOST2:PORT2.")
OptionSpec<String> bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED. The server(s) to connect to in the form HOST1:PORT1,HOST2:PORT2.")
.requiredUnless("broker-list")
.withRequiredArg()
.describedAs("HOST1:PORT1,...,HOST3:PORT3")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -354,8 +354,7 @@ private static String printOffsetFormat(Optional<Collection<PartitionAssignmentS
}
}

String format = "\n%" + (-maxGroupLen) + "s %" + (-maxTopicLen) + "s %-10s %-15s %-15s %-15s %" + (-maxConsumerIdLen) + "s %" + (-maxHostLen) + "s %s";
return format;
return "\n%" + (-maxGroupLen) + "s %" + (-maxTopicLen) + "s %-10s %-15s %-15s %-15s %" + (-maxConsumerIdLen) + "s %" + (-maxHostLen) + "s %s";
}

private void printMembers(Map<String, Entry<Optional<String>, Optional<Collection<MemberAssignmentState>>>> members, boolean verbose) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,8 @@
import org.apache.kafka.common.protocol.Errors;
import org.junit.jupiter.api.Test;

import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
Expand All @@ -42,8 +40,8 @@
import static org.mockito.Mockito.when;

public class ClientMetricsCommandTest {
private String bootstrapServer = "localhost:9092";
private String clientMetricsName = "cm";
private final String bootstrapServer = "localhost:9092";
private final String clientMetricsName = "cm";

@Test
public void testOptionsNoActionFails() {
Expand Down Expand Up @@ -237,13 +235,12 @@ public void testList() {

String capturedOutput = ToolsTestUtils.captureStandardOut(() -> {
try {
service.listClientMetrics(new ClientMetricsCommand.ClientMetricsCommandOptions(
new String[]{"--bootstrap-server", bootstrapServer, "--list"}));
service.listClientMetrics();
} catch (Throwable t) {
fail(t);
}
});
assertEquals("one,two", Arrays.stream(capturedOutput.split("\n")).collect(Collectors.joining(",")));
assertEquals("one,two", String.join(",", capturedOutput.split("\n")));
}

@Test
Expand All @@ -254,9 +251,7 @@ public void testListFailsWithUnsupportedVersionException() {
ListClientMetricsResourcesResult result = AdminClientTestUtils.listClientMetricsResourcesResult(Errors.UNSUPPORTED_VERSION.exception());
when(adminClient.listClientMetricsResources()).thenReturn(result);

assertThrows(ExecutionException.class,
() -> service.listClientMetrics(new ClientMetricsCommand.ClientMetricsCommandOptions(
new String[] {"--bootstrap-server", bootstrapServer, "--list"})));
assertThrows(ExecutionException.class, () -> service.listClientMetrics());
}

private void assertInitializeInvalidOptionsExitCode(int expected, String[] options) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ private DelegationTokenCommand.DelegationTokenCommandOptions getDescribeOpts(Str
args.add("--command-config");
args.add("testfile");
args.add("--describe");
if (!owner.equals("")) {
if (!owner.isEmpty()) {
args.add("--owner-principal");
args.add(owner);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,10 +124,9 @@ public void testOffsetFileNotExists() {
"--bootstrap-server", "localhost:9092",
"--offset-json-file", "/not/existing/file"
}, System.out));
assertEquals(1, DeleteRecordsCommand.mainNoExit(new String[]{
assertEquals(1, DeleteRecordsCommand.mainNoExit(
"--bootstrap-server", "localhost:9092",
"--offset-json-file", "/not/existing/file"
}));
"--offset-json-file", "/not/existing/file"));
}

@Test
Expand All @@ -137,11 +136,10 @@ public void testCommandConfigNotExists() {
"--offset-json-file", "/not/existing/file",
"--command-config", "/another/not/existing/file"
}, System.out));
assertEquals(1, DeleteRecordsCommand.mainNoExit(new String[] {
assertEquals(1, DeleteRecordsCommand.mainNoExit(
"--bootstrap-server", "localhost:9092",
"--offset-json-file", "/not/existing/file",
"--command-config", "/another/not/existing/file"
}));
"--command-config", "/another/not/existing/file"));
}

@Test
Expand Down
3 changes: 2 additions & 1 deletion tools/src/test/java/org/apache/kafka/tools/JmxToolTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@

import static java.lang.String.format;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class JmxToolTest {
Expand Down Expand Up @@ -167,7 +168,7 @@ public void allMetrics() {
assertNormalExit();

Map<String, String> csv = parseCsv(out);
assertTrue(csv.size() > 0);
assertFalse(csv.isEmpty());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ public void testDefaultClientId() throws Exception {
}

@Test
public void testStatsInitializationWithLargeNumRecords() throws Exception {
public void testStatsInitializationWithLargeNumRecords() {
long numRecords = Long.MAX_VALUE;
assertDoesNotThrow(() -> new ProducerPerformance.Stats(numRecords, 5000));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.util.Comparator;
import java.util.List;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.config.ConfigException;
Expand Down Expand Up @@ -235,7 +236,7 @@ public void testMetricValues() throws Exception {
assertEquals(3, metrics.size());
List<JsonNode> metricsList = Arrays.asList(metrics.get(0), metrics.get(1), metrics.get(2));
// Sort metrics based on name so that we can verify the value for each metric below
metricsList.sort((m1, m2) -> m1.get("name").textValue().compareTo(m2.get("name").textValue()));
metricsList.sort(Comparator.comparing(m -> m.get("name").textValue()));

JsonNode m1 = metricsList.get(0);
assertEquals("name1", m1.get("name").textValue());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.junit.jupiter.api.BeforeEach;
Expand All @@ -39,12 +38,11 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

@Timeout(value = 600, unit = TimeUnit.SECONDS)
@Timeout(value = 600)
public class StreamsResetterTest {
private static final String TOPIC = "topic1";
private final StreamsResetter streamsResetter = new StreamsResetter();
Expand Down Expand Up @@ -256,7 +254,7 @@ public void shouldSeekToEndOffset() {
public void shouldDeleteTopic() throws InterruptedException, ExecutionException {
final Cluster cluster = createCluster(1);
try (final MockAdminClient adminClient = new MockAdminClient(cluster.nodes(), cluster.nodeById(0))) {
final TopicPartitionInfo topicPartitionInfo = new TopicPartitionInfo(0, cluster.nodeById(0), cluster.nodes(), Collections.<Node>emptyList());
final TopicPartitionInfo topicPartitionInfo = new TopicPartitionInfo(0, cluster.nodeById(0), cluster.nodes(), Collections.emptyList());
adminClient.addTopic(false, TOPIC, Collections.singletonList(topicPartitionInfo), null);
streamsResetter.doDelete(Collections.singletonList(TOPIC), adminClient);
assertEquals(Collections.emptySet(), adminClient.listTopics().names().get());
Expand Down Expand Up @@ -300,8 +298,8 @@ private Cluster createCluster(final int numNodes) {
nodes.put(i, new Node(i, "localhost", 8121 + i));
}
return new Cluster("mockClusterId", nodes.values(),
Collections.<PartitionInfo>emptySet(), Collections.<String>emptySet(),
Collections.<String>emptySet(), nodes.get(0));
Collections.emptySet(), Collections.emptySet(),
Collections.emptySet(), nodes.get(0));
}

private static class EmptyPartitionConsumer<K, V> extends MockConsumer<K, V> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public class ToolsTestUtils {
/** @see TestInfoUtils#TestWithParameterizedQuorumAndGroupProtocolNames() */
public static final String TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES = "{displayName}.quorum={0}.groupProtocol={1}";

private static int randomPort = 0;
private static final int RANDOM_PORT = 0;

public static String captureStandardOut(Runnable runnable) {
return captureStandardStream(false, runnable);
Expand All @@ -68,7 +68,7 @@ private static String captureStandardStream(boolean isErr, Runnable runnable) {
System.setOut(tempStream);
try {
runnable.run();
return new String(outputStream.toByteArray()).trim();
return outputStream.toString().trim();
} finally {
if (isErr)
System.setErr(currentStream);
Expand Down Expand Up @@ -100,11 +100,11 @@ public static List<Properties> createBrokerProperties(int numConfigs, String zkC
List<Properties> result = new ArrayList<>();
int endingIdNumber = startingIdNumber + numConfigs - 1;
for (int node = startingIdNumber; node <= endingIdNumber; node++) {
result.add(TestUtils.createBrokerConfig(node, zkConnect, true, true, randomPort,
result.add(TestUtils.createBrokerConfig(node, zkConnect, true, true, RANDOM_PORT,
scala.Option.empty(),
scala.Option.empty(),
scala.Option.empty(),
true, false, randomPort, false, randomPort, false, randomPort,
true, false, RANDOM_PORT, false, RANDOM_PORT, false, RANDOM_PORT,
scala.Option.apply(rackInfo.get(node)),
logDirCount, enableToken, numPartitions, defaultReplicationFactor, false));
}
Expand Down
Loading

0 comments on commit 395fdae

Please sign in to comment.