Skip to content

Commit

Permalink
Synchronize VertX start (linkedin#3)
Browse files Browse the repository at this point in the history
This PR synchronizes the vertx start method so the start method will return only when the verticle has been deployed and is running.
  • Loading branch information
viktorsomogyi authored and KB913 committed Nov 3, 2021
1 parent b7ce2a6 commit ed55388
Showing 1 changed file with 27 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,24 @@
import com.codahale.metrics.MetricRegistry;
import com.linkedin.kafka.cruisecontrol.async.AsyncKafkaCruiseControl;
import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
import com.linkedin.kafka.cruisecontrol.monitor.task.LoadMonitorTaskRunner;
import com.linkedin.kafka.cruisecontrol.vertx.MainVerticle;
import io.vertx.core.Vertx;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;


/**
* This is the main entry point for the Vertx based API.
*/
public class KafkaCruiseControlVertxApp extends KafkaCruiseControlApp {

protected static MainVerticle verticle;

KafkaCruiseControlVertxApp(KafkaCruiseControlConfig config, Integer port, String hostname) {
super(config, port, hostname);
}

//only for tests
//visible for testing
KafkaCruiseControlVertxApp(KafkaCruiseControlConfig config, Integer port,
String hostname, AsyncKafkaCruiseControl asyncKafkaCruiseControl, MetricRegistry metricRegistry) {
super(config, port, hostname, asyncKafkaCruiseControl, metricRegistry);
Expand All @@ -32,18 +39,28 @@ public String serverUrl() {

@Override
void start() {
_kafkaCruiseControl.startUp();
CountDownLatch latch = new CountDownLatch(1);
if (LoadMonitorTaskRunner.LoadMonitorTaskRunnerState.NOT_STARTED
.equals(_kafkaCruiseControl.getLoadMonitorTaskRunnerState())) {
_kafkaCruiseControl.startUp();
}
Vertx vertx = Vertx.vertx();
verticle = new MainVerticle(_kafkaCruiseControl, _metricRegistry, _port, _hostname);
vertx.deployVerticle(verticle);
vertx.deployVerticle(verticle, event -> {
if (event.failed()) {
throw new RuntimeException(event.cause());
}
latch.countDown();
});
try {
latch.await(1, TimeUnit.MINUTES);
} catch (InterruptedException e) {
throw new RuntimeException("Startup interrupted", e);
}
}

//only for tests
/**
* Used only for tests
* @return the verticle
*/
public static MainVerticle getVerticle() throws Exception {
//visible for testing
static MainVerticle getVerticle() throws Exception {
if (verticle == null) {
throw new Exception();
}
Expand Down

0 comments on commit ed55388

Please sign in to comment.