Skip to content

Commit aa74e02

Browse files
authored
Fix request queue delaying (#119)
Motivation: The request queue could contain cancelled tasks, which caused delays and stalls. Modification: Implemented Filtered out cancelled tasks from the queue. Result: The queue now functions better. Resolves issue #114
1 parent 8fe3715 commit aa74e02

File tree

4 files changed

+55
-22
lines changed

4 files changed

+55
-22
lines changed

src/main/java/io/asyncer/r2dbc/mysql/client/RequestQueue.java

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -65,22 +65,29 @@ final class RequestQueue extends ActiveStatus implements Runnable {
6565
*/
6666
@Override
6767
public void run() {
68-
RequestTask<?> task = queue.poll();
69-
70-
if (task == null) {
71-
// Queue was empty, set it to idle if it is not disposed.
72-
STATUS_UPDATER.compareAndSet(this, ACTIVE, IDLE);
73-
} else {
74-
int status = this.status;
75-
76-
if (status == DISPOSE) {
68+
for (;;) {
69+
RequestTask<?> task = queue.poll();
70+
final int status = this.status;
71+
72+
if (task == null) {
73+
// Queue was empty, set it to idle if it is not disposed.
74+
if (status != ACTIVE || STATUS_UPDATER.compareAndSet(this, ACTIVE, IDLE) && queue.isEmpty()) {
75+
return;
76+
}
77+
} else if (status == DISPOSE) {
7778
// Cancel and no need clear queue because it should be cleared by other one.
7879
task.cancel(requireDisposed());
80+
return;
7981
} else {
8082
task.run();
83+
// The execution of a canceled task would result in a stall of the request queue.
84+
// refer: https://github.com/asyncer-io/r2dbc-mysql/issues/114
85+
if (!task.isCancelled()) {
86+
return;
8187
}
8288
}
8389
}
90+
}
8491

8592
/**
8693
* Submit an exchange task. If the queue is inactive, it will execute directly instead of queuing.
@@ -90,13 +97,6 @@ public void run() {
9097
* @param <T> the type argument of {@link RequestTask}.
9198
*/
9299
<T> void submit(RequestTask<T> task) {
93-
if (STATUS_UPDATER.compareAndSet(this, IDLE, ACTIVE)) {
94-
// Fast path for general way.
95-
task.run();
96-
return;
97-
}
98-
99-
// Check dispose after fast path failed.
100100
int status = this.status;
101101

102102
if (status == DISPOSE) {

src/main/java/io/asyncer/r2dbc/mysql/client/RequestTask.java

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ final class RequestTask<T> {
3838

3939
private final T supplier;
4040

41+
private volatile boolean isCancelled;
42+
4143
private RequestTask(@Nullable Disposable disposable, MonoSink<T> sink, T supplier) {
4244
this.disposable = disposable;
4345
this.sink = sink;
@@ -54,26 +56,43 @@ void run() {
5456
* @param e cancelled by which error
5557
*/
5658
void cancel(Throwable e) {
59+
cancel0();
60+
sink.error(e);
61+
}
62+
63+
boolean isCancelled() {
64+
return isCancelled;
65+
}
66+
67+
private void cancel0() {
5768
if (disposable != null) {
5869
disposable.dispose();
5970
}
60-
sink.error(e);
71+
isCancelled = true;
6172
}
6273

6374
static <T> RequestTask<T> wrap(ClientMessage message, MonoSink<T> sink, T supplier) {
75+
final RequestTask<T> task;
6476
if (message instanceof Disposable) {
65-
return new RequestTask<>((Disposable) message, sink, supplier);
66-
}
77+
task = new RequestTask<>((Disposable) message, sink, supplier);
78+
} else {
79+
task = new RequestTask<>(null, sink, supplier);
6780

68-
return new RequestTask<>(null, sink, supplier);
81+
}
82+
sink.onCancel(() -> task.cancel0());
83+
return task;
6984
}
7085

7186
static <T> RequestTask<T> wrap(Flux<? extends ClientMessage> messages, MonoSink<T> sink, T supplier) {
72-
return new RequestTask<>(new DisposableFlux(messages), sink, supplier);
87+
final RequestTask<T> task = new RequestTask<>(new DisposableFlux(messages), sink, supplier);
88+
sink.onCancel(() -> task.cancel0());
89+
return task;
7390
}
7491

7592
static <T> RequestTask<T> wrap(MonoSink<T> sink, T supplier) {
76-
return new RequestTask<>(null, sink, supplier);
93+
final RequestTask<T> task = new RequestTask<>(null, sink, supplier);
94+
sink.onCancel(() -> task.cancel0());
95+
return task;
7796
}
7897

7998
private static final class DisposableFlux implements Disposable {

src/test/java/io/asyncer/r2dbc/mysql/ConnectionIntegrationTest.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,16 @@ void setTransactionIsolationLevel() {
233233
.doOnNext(a -> a.isEqualTo(connection.getTransactionIsolationLevel()))));
234234
}
235235

236+
@Test
237+
void errorPropagteRequestQueue() {
238+
illegalArgument(connection -> Flux.merge(
239+
connection.createStatement("SELECT 'Result 1', SLEEP(1)").execute(),
240+
connection.createStatement("SELECT 'Result 2'").execute(),
241+
connection.createStatement("SELECT 'Result 3'").execute()
242+
).flatMap(result -> result.map((row, meta) -> row.get(0, Integer.class)))
243+
);
244+
}
245+
236246
@Test
237247
void batchCrud() {
238248
// TODO: spilt it to multiple test cases and move it to BatchIntegrationTest

src/test/java/io/asyncer/r2dbc/mysql/IntegrationTestSupport.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,10 @@ void badGrammar(Function<? super MySqlConnection, Publisher<?>> runner) {
5050
process(runner).verifyError(R2dbcBadGrammarException.class);
5151
}
5252

53+
void illegalArgument(Function<? super MySqlConnection, Publisher<?>> runner) {
54+
process(runner).expectError(IllegalArgumentException.class).verify(Duration.ofSeconds(3));
55+
}
56+
5357
Mono<MySqlConnection> create() {
5458
return connectionFactory.create();
5559
}

0 commit comments

Comments
 (0)