Skip to content

Commit 00502be

Browse files
author
Dave Maughan
authored
New max sustainable rate implementation (#329)
## Motivation Currently the WorkloadGenerator.findMaximumSustainableRate algorithm introduces publish delay. It has been observed that the algorithm settles on a publish rate that neither producers nor consumers can keep up with. For example, on a Kafka workload with 100 topics and 1 KB messages, the algorithm settled on a publish rate of 2.5m msg/s when the producers could only actually achieve 2m msg/s. ## Changes Implement a new algorithm that checks for both receive backlog and publish backlog, adjusts the publish rate accordingly, and progressively ramps up the rate when there is no observed backlog.
1 parent 5967b3f commit 00502be

File tree

9 files changed

+325
-100
lines changed

9 files changed

+325
-100
lines changed

benchmark-framework/pom.xml

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -118,10 +118,6 @@
118118
<groupId>org.apache.logging.log4j</groupId>
119119
<artifactId>log4j-slf4j-impl</artifactId>
120120
</dependency>
121-
<dependency>
122-
<groupId>org.assertj</groupId>
123-
<artifactId>assertj-core</artifactId>
124-
</dependency>
125121
<dependency>
126122
<groupId>org.asynchttpclient</groupId>
127123
<artifactId>async-http-client</artifactId>
@@ -143,13 +139,31 @@
143139
<artifactId>HdrHistogram</artifactId>
144140
<version>2.1.12</version>
145141
</dependency>
142+
<dependency>
143+
<groupId>org.projectlombok</groupId>
144+
<artifactId>lombok</artifactId>
145+
<scope>provided</scope>
146+
</dependency>
147+
<dependency>
148+
<groupId>com.github.stefanbirkner</groupId>
149+
<artifactId>system-lambda</artifactId>
150+
<version>1.2.1</version>
151+
<scope>test</scope>
152+
</dependency>
153+
<dependency>
154+
<groupId>org.assertj</groupId>
155+
<artifactId>assertj-core</artifactId>
156+
<scope>test</scope>
157+
</dependency>
146158
<dependency>
147159
<groupId>org.junit.jupiter</groupId>
148160
<artifactId>junit-jupiter</artifactId>
161+
<scope>test</scope>
149162
</dependency>
150163
<dependency>
151164
<groupId>org.mockito</groupId>
152165
<artifactId>mockito-junit-jupiter</artifactId>
166+
<scope>test</scope>
153167
</dependency>
154168
</dependencies>
155169

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package io.openmessaging.benchmark;
15+
16+
import static java.util.concurrent.TimeUnit.SECONDS;
17+
import static lombok.AccessLevel.PACKAGE;
18+
19+
import io.openmessaging.benchmark.utils.Env;
20+
import lombok.Getter;
21+
import lombok.extern.slf4j.Slf4j;
22+
23+
@Slf4j
24+
class RateController {
25+
private static final long ONE_SECOND_IN_NANOS = SECONDS.toNanos(1);
26+
private final long publishBacklogLimit;
27+
private final long receiveBacklogLimit;
28+
private final double minRampingFactor;
29+
private final double maxRampingFactor;
30+
31+
@Getter(PACKAGE)
32+
private double rampingFactor;
33+
34+
private long previousTotalPublished = 0;
35+
private long previousTotalReceived = 0;
36+
37+
RateController() {
38+
publishBacklogLimit = Env.getLong("PUBLISH_BACKLOG_LIMIT", 1_000);
39+
receiveBacklogLimit = Env.getLong("RECEIVE_BACKLOG_LIMIT", 1_000);
40+
minRampingFactor = Env.getDouble("MIN_RAMPING_FACTOR", 0.01);
41+
maxRampingFactor = Env.getDouble("MAX_RAMPING_FACTOR", 1);
42+
rampingFactor = maxRampingFactor;
43+
}
44+
45+
double nextRate(double rate, long periodNanos, long totalPublished, long totalReceived) {
46+
long expected = (long) ((rate / ONE_SECOND_IN_NANOS) * periodNanos);
47+
long published = totalPublished - previousTotalPublished;
48+
long received = totalReceived - previousTotalReceived;
49+
50+
previousTotalPublished = totalPublished;
51+
previousTotalReceived = totalReceived;
52+
53+
if (log.isDebugEnabled()) {
54+
log.debug(
55+
"Current rate: {} -- Publish rate {} -- Receive Rate: {}",
56+
rate,
57+
rate(published, periodNanos),
58+
rate(received, periodNanos));
59+
}
60+
61+
long receiveBacklog = totalPublished - totalReceived;
62+
if (receiveBacklog > receiveBacklogLimit) {
63+
return nextRate(periodNanos, received, expected, receiveBacklog, "Receive");
64+
}
65+
66+
long publishBacklog = expected - published;
67+
if (publishBacklog > publishBacklogLimit) {
68+
return nextRate(periodNanos, published, expected, publishBacklog, "Publish");
69+
}
70+
71+
rampUp();
72+
73+
return rate + (rate * rampingFactor);
74+
}
75+
76+
private double nextRate(long periodNanos, long actual, long expected, long backlog, String type) {
77+
log.debug("{} backlog: {}", type, backlog);
78+
rampDown();
79+
long nextExpected = Math.max(0, expected - backlog);
80+
double nextExpectedRate = rate(nextExpected, periodNanos);
81+
double actualRate = rate(actual, periodNanos);
82+
return Math.min(actualRate, nextExpectedRate);
83+
}
84+
85+
private double rate(long count, long periodNanos) {
86+
return (count / (double) periodNanos) * ONE_SECOND_IN_NANOS;
87+
}
88+
89+
private void rampUp() {
90+
rampingFactor = Math.min(maxRampingFactor, rampingFactor * 2);
91+
}
92+
93+
private void rampDown() {
94+
rampingFactor = Math.max(minRampingFactor, rampingFactor / 2);
95+
}
96+
}

benchmark-framework/src/main/java/io/openmessaging/benchmark/WorkloadGenerator.java

Lines changed: 5 additions & 96 deletions
Original file line numberDiff line numberDiff line change
@@ -191,20 +191,13 @@ private void ensureTopicsAreReady() throws IOException {
191191
*
192192
* @param currentRate
193193
*/
194-
@SuppressWarnings("checkstyle:LineLength")
195194
private void findMaximumSustainableRate(double currentRate) throws IOException {
196-
double maxRate = Double.MAX_VALUE; // Discovered max sustainable rate
197-
double minRate = 0.1;
198-
199195
CountersStats stats = worker.getCountersStats();
200196

201-
long localTotalMessagesSentCounter = stats.messagesSent;
202-
long localTotalMessagesReceivedCounter = stats.messagesReceived;
203-
204197
int controlPeriodMillis = 3000;
205198
long lastControlTimestamp = System.nanoTime();
206199

207-
int successfulPeriods = 0;
200+
RateController rateController = new RateController();
208201

209202
while (!runCompleted) {
210203
// Check every few seconds and adjust the rate
@@ -217,97 +210,13 @@ private void findMaximumSustainableRate(double currentRate) throws IOException {
217210
// Consider multiple copies when using multiple subscriptions
218211
stats = worker.getCountersStats();
219212
long currentTime = System.nanoTime();
220-
long totalMessagesSent = stats.messagesSent;
221-
long totalMessagesReceived = stats.messagesReceived;
222-
long messagesPublishedInPeriod = totalMessagesSent - localTotalMessagesSentCounter;
223-
long messagesReceivedInPeriod = totalMessagesReceived - localTotalMessagesReceivedCounter;
224-
double publishRateInLastPeriod =
225-
messagesPublishedInPeriod
226-
/ (double) (currentTime - lastControlTimestamp)
227-
* TimeUnit.SECONDS.toNanos(1);
228-
double receiveRateInLastPeriod =
229-
messagesReceivedInPeriod
230-
/ (double) (currentTime - lastControlTimestamp)
231-
* TimeUnit.SECONDS.toNanos(1);
232-
233-
if (log.isDebugEnabled()) {
234-
log.debug(
235-
"total-send: {} -- total-received: {} -- int-sent: {} -- int-received: {} -- sent-rate: {} -- received-rate: {}",
236-
totalMessagesSent,
237-
totalMessagesReceived,
238-
messagesPublishedInPeriod,
239-
messagesReceivedInPeriod,
240-
publishRateInLastPeriod,
241-
receiveRateInLastPeriod);
242-
}
213+
long periodNanos = currentTime - lastControlTimestamp;
243214

244-
localTotalMessagesSentCounter = totalMessagesSent;
245-
localTotalMessagesReceivedCounter = totalMessagesReceived;
246215
lastControlTimestamp = currentTime;
247216

248-
if (log.isDebugEnabled()) {
249-
log.debug(
250-
"Current rate: {} -- Publish rate {} -- Consume Rate: {} -- min-rate: {} -- max-rate: {}",
251-
dec.format(currentRate),
252-
dec.format(publishRateInLastPeriod),
253-
dec.format(receiveRateInLastPeriod),
254-
dec.format(minRate),
255-
dec.format(maxRate));
256-
}
257-
258-
if (publishRateInLastPeriod < currentRate * 0.95) {
259-
// Producer is not able to publish as fast as requested
260-
maxRate = currentRate * 1.1;
261-
currentRate = minRate + (currentRate - minRate) / 2;
262-
263-
log.debug("Publishers are not meeting requested rate. reducing to {}", currentRate);
264-
} else if (receiveRateInLastPeriod < publishRateInLastPeriod * 0.98) {
265-
// If the consumers are building backlog, we should slow down publish rate
266-
maxRate = currentRate;
267-
currentRate = minRate + (currentRate - minRate) / 2;
268-
log.debug("Consumers are not meeting requested rate. reducing to {}", currentRate);
269-
270-
// Slows the publishes to let the consumer time to absorb the backlog
271-
worker.adjustPublishRate(minRate / 10);
272-
while (true) {
273-
stats = worker.getCountersStats();
274-
long backlog =
275-
workload.subscriptionsPerTopic * stats.messagesSent - stats.messagesReceived;
276-
if (backlog < 1000) {
277-
break;
278-
}
279-
280-
try {
281-
Thread.sleep(100);
282-
} catch (InterruptedException e) {
283-
return;
284-
}
285-
}
286-
287-
log.debug("Resuming load at reduced rate");
288-
worker.adjustPublishRate(currentRate);
289-
290-
try {
291-
// Wait some more time for the publish rate to catch up
292-
Thread.sleep(500);
293-
} catch (InterruptedException e) {
294-
return;
295-
}
296-
297-
stats = worker.getCountersStats();
298-
localTotalMessagesSentCounter = stats.messagesSent;
299-
localTotalMessagesReceivedCounter = stats.messagesReceived;
300-
301-
} else if (currentRate < maxRate) {
302-
minRate = currentRate;
303-
currentRate = Math.min(currentRate * 2, maxRate);
304-
log.debug("No bottleneck found, increasing the rate to {}", currentRate);
305-
} else if (++successfulPeriods > 3) {
306-
minRate = currentRate * 0.95;
307-
maxRate = currentRate * 1.05;
308-
successfulPeriods = 0;
309-
}
310-
217+
currentRate =
218+
rateController.nextRate(
219+
currentRate, periodNanos, stats.messagesSent, stats.messagesReceived);
311220
worker.adjustPublishRate(currentRate);
312221
}
313222
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package io.openmessaging.benchmark.utils;
15+
16+
17+
import java.util.Optional;
18+
import java.util.function.Function;
19+
20+
/**
 * Static helpers for reading typed configuration values from environment variables, falling back
 * to a caller-supplied default when the variable is not set.
 */
public final class Env {
    private Env() {}

    /**
     * Reads an environment variable as a {@code long}.
     *
     * <p>Surrounding whitespace is ignored, matching what {@link #getDouble(String, double)}
     * already accepts ({@link Double#parseDouble(String)} trims its input, {@link
     * Long#parseLong(String)} does not).
     *
     * @param key name of the environment variable
     * @param defaultValue value returned when the variable is not set
     * @return the parsed value, or {@code defaultValue} when the variable is not set
     * @throws NumberFormatException if the variable is set but is not a valid {@code long}; the
     *     message names the variable
     */
    public static long getLong(String key, long defaultValue) {
        return get(key, value -> Long.parseLong(value.trim()), defaultValue);
    }

    /**
     * Reads an environment variable as a {@code double}.
     *
     * @param key name of the environment variable
     * @param defaultValue value returned when the variable is not set
     * @return the parsed value, or {@code defaultValue} when the variable is not set
     * @throws NumberFormatException if the variable is set but is not a valid {@code double}; the
     *     message names the variable
     */
    public static double getDouble(String key, double defaultValue) {
        return get(key, Double::parseDouble, defaultValue);
    }

    /**
     * Reads an environment variable and converts it with {@code function}.
     *
     * @param key name of the environment variable
     * @param function converter applied to the raw value when the variable is set
     * @param defaultValue value returned when the variable is not set, or when {@code function}
     *     returns {@code null}
     * @param <T> type of the converted value
     * @return the converted value or {@code defaultValue}
     * @throws NullPointerException if {@code function} is {@code null}
     * @throws NumberFormatException if {@code function} throws one; it is rethrown with a message
     *     that names the variable and with the original exception as its cause
     */
    public static <T> T get(String key, Function<String, T> function, T defaultValue) {
        if (function == null) {
            throw new NullPointerException("function");
        }
        return Optional.ofNullable(System.getenv(key))
                .map(value -> convert(key, value, function))
                .orElse(defaultValue);
    }

    /** Applies {@code function} to {@code value}, adding the variable name to parse failures. */
    private static <T> T convert(String key, String value, Function<String, T> function) {
        try {
            return function.apply(value);
        } catch (NumberFormatException e) {
            // Keep the exception type callers may already catch, but say which variable is bad.
            NumberFormatException withKey =
                    new NumberFormatException("Invalid value for environment variable " + key);
            withKey.initCause(e);
            throw withKey;
        }
    }
}
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package io.openmessaging.benchmark;
15+
16+
import static java.util.concurrent.TimeUnit.SECONDS;
17+
import static org.assertj.core.api.Assertions.assertThat;
18+
19+
import org.junit.jupiter.api.Test;
20+
21+
class RateControllerTest {
22+
private final RateController rateController = new RateController();
23+
private double rate = 10_000;
24+
private long periodNanos = SECONDS.toNanos(1);
25+
26+
@Test
27+
void receiveBacklog() {
28+
assertThat(rateController.getRampingFactor()).isEqualTo(1);
29+
30+
// no backlog
31+
rate = rateController.nextRate(rate, periodNanos, 10_000, 10_000);
32+
assertThat(rate).isEqualTo(20_000);
33+
assertThat(rateController.getRampingFactor()).isEqualTo(1);
34+
35+
// receive backlog
36+
rate = rateController.nextRate(rate, periodNanos, 20_000, 15_000);
37+
assertThat(rate).isEqualTo(5_000);
38+
assertThat(rateController.getRampingFactor()).isEqualTo(0.5);
39+
}
40+
41+
@Test
42+
void publishBacklog() {
43+
assertThat(rateController.getRampingFactor()).isEqualTo(1);
44+
45+
// no backlog
46+
rate = rateController.nextRate(rate, periodNanos, 10_000, 10_000);
47+
assertThat(rate).isEqualTo(20_000);
48+
assertThat(rateController.getRampingFactor()).isEqualTo(1);
49+
50+
// publish backlog
51+
rate = rateController.nextRate(rate, periodNanos, 15_000, 20_000);
52+
assertThat(rate).isEqualTo(5_000);
53+
assertThat(rateController.getRampingFactor()).isEqualTo(0.5);
54+
}
55+
56+
@Test
57+
void rampUp() {
58+
assertThat(rateController.getRampingFactor()).isEqualTo(1);
59+
60+
// receive backlog
61+
rate = rateController.nextRate(rate, periodNanos, 10_000, 5_000);
62+
assertThat(rate).isEqualTo(5_000);
63+
assertThat(rateController.getRampingFactor()).isEqualTo(0.5);
64+
65+
// no backlog
66+
rate = rateController.nextRate(rate, periodNanos, 20_000, 20_000);
67+
assertThat(rate).isEqualTo(10_000);
68+
assertThat(rateController.getRampingFactor()).isEqualTo(1);
69+
}
70+
}

0 commit comments

Comments
 (0)