Skip to content

Commit

Permalink
KAFKA-18275 Restarting broker in testing should use the same port (#1…
Browse files Browse the repository at this point in the history
…8381)

Reviewers: Chia-Ping Tsai <[email protected]>
  • Loading branch information
peterxcli authored Feb 6, 2025
1 parent 780640f commit 0621c0b
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
import org.apache.kafka.connect.util.SinkUtils;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
import org.apache.kafka.connect.util.clusters.WorkerHandle;
import org.apache.kafka.network.SocketServerConfigs;
import org.apache.kafka.test.TestUtils;

import org.junit.jupiter.api.AfterEach;
Expand All @@ -57,8 +56,6 @@

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.ServerSocket;
import java.nio.file.Path;
import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -247,8 +244,6 @@ public void testBrokerCoordinator() throws Exception {
ConnectorHandle connectorHandle = RuntimeHandles.get().connectorHandle(CONNECTOR_NAME);
workerProps.put(DistributedConfig.SCHEDULED_REBALANCE_MAX_DELAY_MS_CONFIG, String.valueOf(5000));

useFixedBrokerPort();

// start the clusters
connect = connectBuilder.build();
connect.start();
Expand Down Expand Up @@ -813,8 +808,6 @@ public void testRequestTimeouts() throws Exception {
workerProps.put(SCHEDULED_REBALANCE_MAX_DELAY_MS_CONFIG, "0");
workerProps.put(METADATA_RECOVERY_STRATEGY_CONFIG, MetadataRecoveryStrategy.NONE.name);

useFixedBrokerPort();

connect = connectBuilder
.numWorkers(1)
.build();
Expand Down Expand Up @@ -1431,23 +1424,6 @@ private Map<String, String> defaultSourceConnectorProps(String topic) {
return props;
}

private void useFixedBrokerPort() throws IOException {
// Find a free port and use it in the Kafka broker's listeners config. We can't use port 0 in the listeners
// config to get a random free port because in this test we want to stop the Kafka broker and then bring it
// back up and listening on the same port in order to verify that the Connect cluster can re-connect to Kafka
// and continue functioning normally. If we were to use port 0 here, the Kafka broker would most likely listen
// on a different random free port the second time it is started. Note that we can only use the static port
// because we have a single broker setup in this test.
int listenerPort;
try (ServerSocket s = new ServerSocket(0)) {
listenerPort = s.getLocalPort();
}
brokerProps.put(SocketServerConfigs.LISTENERS_CONFIG, String.format("EXTERNAL://localhost:%d,CONTROLLER://localhost:0", listenerPort));
connectBuilder
.numBrokers(1)
.brokerProps(brokerProps);
}

public static class EmptyTaskConfigsConnector extends SinkConnector {
@Override
public String version() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,35 +206,42 @@ private void setSecurityProtocolProps(Map<String, Object> props, String security
}
}

public KafkaClusterTestKit build() throws Exception {
Map<Integer, ControllerServer> controllers = new HashMap<>();
Map<Integer, BrokerServer> brokers = new HashMap<>();
Map<Integer, SharedServer> jointServers = new HashMap<>();
File baseDirectory = null;
File jaasFile = null;

private Optional<File> maybeSetupJaasFile() throws Exception {
if (brokerSecurityProtocol.equals(SecurityProtocol.SASL_PLAINTEXT.name)) {
jaasFile = JaasUtils.writeJaasContextsToFile(Set.of(
File file = JaasUtils.writeJaasContextsToFile(Set.of(
new JaasUtils.JaasSection(JaasUtils.KAFKA_SERVER_CONTEXT_NAME,
List.of(
JaasModule.plainLoginModule(
JaasUtils.KAFKA_PLAIN_ADMIN, JaasUtils.KAFKA_PLAIN_ADMIN_PASSWORD,
JaasUtils.KAFKA_PLAIN_ADMIN,
JaasUtils.KAFKA_PLAIN_ADMIN_PASSWORD,
true,
Map.of(
JaasUtils.KAFKA_PLAIN_USER1, JaasUtils.KAFKA_PLAIN_USER1_PASSWORD,
JaasUtils.KAFKA_PLAIN_ADMIN, JaasUtils.KAFKA_PLAIN_ADMIN_PASSWORD)
)
)
)
)
));
JaasUtils.refreshJavaLoginConfigParam(jaasFile);
JaasUtils.refreshJavaLoginConfigParam(file);
return Optional.of(file);
}
return Optional.empty();
}

public KafkaClusterTestKit build() throws Exception {
Map<Integer, ControllerServer> controllers = new HashMap<>();
Map<Integer, BrokerServer> brokers = new HashMap<>();
Map<Integer, SharedServer> jointServers = new HashMap<>();
File baseDirectory = null;
Optional<File> jaasFile = maybeSetupJaasFile();
try {
baseDirectory = new File(nodes.baseDirectory());
for (TestKitNode node : nodes.controllerNodes().values()) {
socketFactoryManager.getOrCreatePortForListener(node.id(), controllerListenerName);
}
for (TestKitNode node : nodes.brokerNodes().values()) {
socketFactoryManager.getOrCreatePortForListener(node.id(), brokerListenerName);
}
for (TestKitNode node : nodes.controllerNodes().values()) {
setupNodeDirectories(baseDirectory, node.metadataDirectory(), Collections.emptyList());
KafkaConfig config = createNodeConfig(node);
Expand Down Expand Up @@ -308,7 +315,7 @@ public KafkaClusterTestKit build() throws Exception {
baseDirectory,
faultHandlerFactory,
socketFactoryManager,
jaasFile == null ? Optional.empty() : Optional.of(jaasFile));
jaasFile);
}

private String listeners(int node) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.GroupProtocol;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.config.ConfigResource;
Expand Down Expand Up @@ -354,6 +356,28 @@ public void testControllerListenerName(ClusterInstance cluster) throws Execution
}
}

@ClusterTest(types = {Type.CO_KRAFT, Type.KRAFT}, brokers = 1)
public void testBrokerRestart(ClusterInstance cluster) throws ExecutionException, InterruptedException {
final String topicName = "topic";
try (Admin admin = cluster.admin();
Producer<String, String> producer = cluster.producer(Map.of(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName(),
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()))) {
admin.createTopics(List.of(new NewTopic(topicName, 1, (short) 1))).all().get();

cluster.waitForTopic(topicName, 1);

cluster.brokers().values().forEach(broker -> {
broker.shutdown();
broker.awaitShutdown();
broker.startup();
});

RecordMetadata recordMetadata0 = producer.send(new ProducerRecord<>(topicName, 0, "key 0", "value 0")).get();
assertEquals(0, recordMetadata0.offset());
}
}

@ClusterTest(types = {Type.KRAFT})
public void testControllerRestart(ClusterInstance cluster) throws ExecutionException, InterruptedException {
try (Admin admin = cluster.admin()) {
Expand Down

0 comments on commit 0621c0b

Please sign in to comment.