Skip to content

Commit

Permalink
Resilient resource creation (#362)
Browse files Browse the repository at this point in the history
* Resilient resource creation

# Conflicts:
#	benchmark-framework/src/main/java/io/openmessaging/benchmark/worker/LocalWorker.java
#	driver-api/src/main/java/io/openmessaging/benchmark/driver/BenchmarkDriver.java

Resilient resource creation

# Conflicts:
#	benchmark-framework/src/main/java/io/openmessaging/benchmark/worker/LocalWorker.java
#	driver-api/src/main/java/io/openmessaging/benchmark/driver/BenchmarkDriver.java
#	driver-rabbitmq/deploy/templates/rabbitmq-quorum.yaml

License header

Missing import

spotless

* findbugs exclude
  • Loading branch information
teabot authored Jan 25, 2023
1 parent d04724a commit c3e271d
Show file tree
Hide file tree
Showing 8 changed files with 252 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import io.openmessaging.benchmark.DriverConfiguration;
import io.openmessaging.benchmark.driver.BenchmarkConsumer;
import io.openmessaging.benchmark.driver.BenchmarkDriver;
import io.openmessaging.benchmark.driver.BenchmarkDriver.ConsumerInfo;
import io.openmessaging.benchmark.driver.BenchmarkDriver.ProducerInfo;
import io.openmessaging.benchmark.driver.BenchmarkDriver.TopicInfo;
import io.openmessaging.benchmark.driver.BenchmarkProducer;
import io.openmessaging.benchmark.driver.ConsumerCallback;
Expand All @@ -45,11 +47,11 @@
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
Expand Down Expand Up @@ -125,24 +127,35 @@ private String generateTopicName(int i) {
@Override
public void createProducers(List<String> topics) {
Timer timer = new Timer();
AtomicInteger index = new AtomicInteger();

List<CompletableFuture<BenchmarkProducer>> futures =
topics.stream().map(topic -> benchmarkDriver.createProducer(topic)).collect(toList());
producers.addAll(
benchmarkDriver
.createProducers(
topics.stream()
.map(t -> new ProducerInfo(index.getAndIncrement(), t))
.collect(toList()))
.join());

futures.forEach(f -> producers.add(f.join()));
log.info("Created {} producers in {} ms", producers.size(), timer.elapsedMillis());
}

@Override
public void createConsumers(ConsumerAssignment consumerAssignment) {
Timer timer = new Timer();
AtomicInteger index = new AtomicInteger();

consumers.addAll(
benchmarkDriver
.createConsumers(
consumerAssignment.topicsSubscriptions.stream()
.map(
c ->
new ConsumerInfo(
index.getAndIncrement(), c.topic, c.subscription, this))
.collect(toList()))
.join());

List<CompletableFuture<BenchmarkConsumer>> futures =
consumerAssignment.topicsSubscriptions.stream()
.map(ts -> benchmarkDriver.createConsumer(ts.topic, ts.subscription, this))
.collect(toList());

futures.forEach(f -> consumers.add(f.join()));
log.info("Created {} consumers in {} ms", consumers.size(), timer.elapsedMillis());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package io.openmessaging.benchmark.driver;

import static java.util.stream.Collectors.toList;

import java.io.File;
import java.io.IOException;
Expand Down Expand Up @@ -75,6 +76,19 @@ default CompletableFuture<Void> createTopics(List<TopicInfo> topicInfos) {
*/
CompletableFuture<BenchmarkProducer> createProducer(String topic);

/**
* Create a producers for a given topic.
*
* @param producers
* @return a producers future
*/
default CompletableFuture<List<BenchmarkProducer>> createProducers(List<ProducerInfo> producers) {
List<CompletableFuture<BenchmarkProducer>> futures =
producers.stream().map(ci -> createProducer(ci.getTopic())).collect(toList());
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
.thenApply(v -> futures.stream().map(CompletableFuture::join).collect(toList()));
}

/**
* Create a benchmark consumer relative to one particular topic and subscription.
*
Expand All @@ -89,9 +103,41 @@ default CompletableFuture<Void> createTopics(List<TopicInfo> topicInfos) {
CompletableFuture<BenchmarkConsumer> createConsumer(
String topic, String subscriptionName, ConsumerCallback consumerCallback);

/**
* Create a consumers for a given topic.
*
* @param consumers
* @return a consumers future
*/
default CompletableFuture<List<BenchmarkConsumer>> createConsumers(List<ConsumerInfo> consumers) {
List<CompletableFuture<BenchmarkConsumer>> futures =
consumers.stream()
.map(
ci ->
createConsumer(
ci.getTopic(), ci.getSubscriptionName(), ci.getConsumerCallback()))
.collect(toList());
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
.thenApply(v -> futures.stream().map(CompletableFuture::join).collect(toList()));
}

@Value
class TopicInfo {
String topic;
int partitions;
}

@Value
class ProducerInfo {
int id;
String topic;
}

@Value
class ConsumerInfo {
int id;
String topic;
String subscriptionName;
ConsumerCallback consumerCallback;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.openmessaging.benchmark.driver;

import static java.util.concurrent.TimeUnit.SECONDS;
import static java.util.stream.Collectors.toMap;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.Value;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@RequiredArgsConstructor
public class ResourceCreator<R, C> {
private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
private final String name;
private final int maxBatchSize;
private final long interBatchDelayMs;
private final Function<List<R>, Map<R, CompletableFuture<C>>> invokeBatchFn;
private final Function<CompletableFuture<C>, CreationResult<C>> complete;

public CompletableFuture<List<C>> create(List<R> resources) {
return CompletableFuture.completedFuture(createBlocking(resources));
}

private List<C> createBlocking(List<R> resources) {
BlockingQueue<R> queue = new ArrayBlockingQueue<>(resources.size(), true, resources);
List<R> batch = new ArrayList<>();
List<C> created = new ArrayList<>();
AtomicInteger succeeded = new AtomicInteger();

ScheduledFuture<?> loggingFuture =
executor.scheduleAtFixedRate(
() -> log.info("Created {}s {}/{}", name, succeeded.get(), resources.size()),
10,
10,
SECONDS);

try {
while (succeeded.get() < resources.size()) {
int batchSize = queue.drainTo(batch, maxBatchSize);
if (batchSize > 0) {
executeBatch(batch)
.forEach(
(resource, result) -> {
if (result.success) {
created.add(result.created);
succeeded.incrementAndGet();
} else {
//noinspection ResultOfMethodCallIgnored
queue.offer(resource);
}
});
batch.clear();
}
}
} finally {
loggingFuture.cancel(true);
}
return created;
}

@SneakyThrows
private Map<R, CreationResult<C>> executeBatch(List<R> batch) {
log.debug("Executing batch, size: {}", batch.size());
Thread.sleep(interBatchDelayMs);
return invokeBatchFn.apply(batch).entrySet().stream()
.collect(toMap(Map.Entry::getKey, e -> complete.apply(e.getValue())));
}

@Value
public static class CreationResult<C> {
C created;
boolean success;
}
}
9 changes: 8 additions & 1 deletion driver-rabbitmq/deploy/templates/rabbitmq-quorum.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,16 @@ driverClass: io.openmessaging.benchmark.driver.rabbitmq.RabbitMqBenchmarkDriver

# RabbitMq client specific configurations

# RMQ struggles to create more than 10 quroum queues at a time, so we batch with a delay
producerCreationDelay: 100
producerCreationBatchSize: 5
consumerCreationDelay: 100
consumerCreationBatchSize: 5

amqpUris:
{% for pulsar in groups['rabbitmq'] %}
- amqp://admin:admin@{{ hostvars[pulsar].private_ip }}:5672
{% endfor %}
messagePersistence: false

# messagePersistence setting is ignored in the quorum implementation
queueType: QUORUM
7 changes: 6 additions & 1 deletion driver-rabbitmq/rabbitmq.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,12 @@ driverClass: io.openmessaging.benchmark.driver.rabbitmq.RabbitMqBenchmarkDriver

# RabbitMq client specific configurations

producerCreationDelay: 100
producerCreationBatchSize: 5
consumerCreationDelay: 100
consumerCreationBatchSize: 5

amqpUris:
- amqp://localhost
messagePersistence: false
queueType: CLASSIC
queueType: QUORUM
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package io.openmessaging.benchmark.driver.rabbitmq;

import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toMap;

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
Expand All @@ -29,6 +30,8 @@
import io.openmessaging.benchmark.driver.BenchmarkDriver;
import io.openmessaging.benchmark.driver.BenchmarkProducer;
import io.openmessaging.benchmark.driver.ConsumerCallback;
import io.openmessaging.benchmark.driver.ResourceCreator;
import io.openmessaging.benchmark.driver.ResourceCreator.CreationResult;
import java.io.File;
import java.io.IOException;
import java.net.URI;
Expand All @@ -38,6 +41,7 @@
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
Expand Down Expand Up @@ -120,6 +124,57 @@ public CompletableFuture<BenchmarkProducer> createProducer(String topic) {
return future;
}

@Override
public CompletableFuture<List<BenchmarkProducer>> createProducers(List<ProducerInfo> producers) {
return new ResourceCreator<ProducerInfo, BenchmarkProducer>(
"producer",
config.producerCreationBatchSize,
config.producerCreationDelay,
ps -> ps.stream().collect(toMap(p -> p, p -> createProducer(p.getTopic()))),
fc -> {
try {
return new CreationResult<>(fc.get(), true);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
} catch (ExecutionException e) {
log.debug(e.getMessage());
return new CreationResult<>(null, false);
}
})
.create(producers);
}

@Override
public CompletableFuture<List<BenchmarkConsumer>> createConsumers(List<ConsumerInfo> consumers) {
return new ResourceCreator<ConsumerInfo, BenchmarkConsumer>(
"consumer",
config.consumerCreationBatchSize,
config.consumerCreationDelay,
cs ->
cs.stream()
.collect(
toMap(
c -> c,
c ->
createConsumer(
c.getTopic(),
c.getSubscriptionName(),
c.getConsumerCallback()))),
fc -> {
try {
return new CreationResult<>(fc.get(), true);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
} catch (ExecutionException e) {
log.debug(e.getMessage());
return new CreationResult<>(null, false);
}
})
.create(consumers);
}

@Override
public CompletableFuture<BenchmarkConsumer> createConsumer(
String topic, String subscriptionName, ConsumerCallback consumerCallback) {
Expand Down Expand Up @@ -163,9 +218,6 @@ private Connection getOrCreateConnection(String primaryBrokerUri) {
return connections.computeIfAbsent(
primaryBrokerUri,
p -> {
String[] userInfo = newURI(primaryBrokerUri).getUserInfo().split(":");
String user = userInfo[0];
String password = userInfo[1];
// RabbitMQ will pick the first available address from the list. Future reconnection
// attempts will pick a random accessible address from the provided list.
List<Address> addresses =
Expand All @@ -176,11 +228,15 @@ private Connection getOrCreateConnection(String primaryBrokerUri) {
try {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setAutomaticRecoveryEnabled(true);
connectionFactory.setUsername(user);
connectionFactory.setPassword(password);
String userInfo = newURI(primaryBrokerUri).getUserInfo();
if (userInfo != null) {
String[] userInfoElems = userInfo.split(":");
connectionFactory.setUsername(userInfoElems[0]);
connectionFactory.setPassword(userInfoElems[1]);
}
return connectionFactory.newConnection(addresses);
} catch (Exception e) {
throw new RuntimeException("Couldn't establish connection", e);
throw new RuntimeException("Couldn't establish connection to: " + primaryBrokerUri, e);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ public class RabbitMqConfig {
public List<String> amqpUris = new ArrayList<>();
public boolean messagePersistence = false;
public QueueType queueType = CLASSIC;
public long producerCreationDelay = 100;
public int producerCreationBatchSize = 5;
public long consumerCreationDelay = 100;
public int consumerCreationBatchSize = 5;

public enum QueueType {
CLASSIC {
Expand Down
Loading

0 comments on commit c3e271d

Please sign in to comment.