Skip to content

Commit 3f9a3f9

Browse files
Merge pull request #15 from oracle/5-serialized-batch-results
Batch.execute() Serializes Statement Execution
2 parents d24c1bd + 269bb0e commit 3f9a3f9

File tree

3 files changed

+154
-26
lines changed

3 files changed

+154
-26
lines changed

src/main/java/oracle/r2dbc/impl/OracleBatchImpl.java

Lines changed: 131 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,16 @@
2424
import java.sql.Connection;
2525
import java.util.LinkedList;
2626
import java.util.Queue;
27+
import java.util.concurrent.CompletableFuture;
2728
import java.util.concurrent.atomic.AtomicBoolean;
29+
import java.util.concurrent.atomic.AtomicReference;
30+
import java.util.function.BiFunction;
2831

2932
import io.r2dbc.spi.Batch;
3033
import io.r2dbc.spi.R2dbcException;
3134
import io.r2dbc.spi.Result;
35+
import io.r2dbc.spi.Row;
36+
import io.r2dbc.spi.RowMetadata;
3237
import io.r2dbc.spi.Statement;
3338
import org.reactivestreams.Publisher;
3439
import reactor.core.publisher.Flux;
@@ -104,6 +109,18 @@ public Batch add(String sql) {
104109
* are executed in the order they were added. Calling this method clears all
105110
* statements that have been added to the current batch.
106111
* </p><p>
112+
* A {@code Result} emitted by the returned {@code Publisher} must be
113+
* <a href="OracleStatementImpl.html#fully-consumed-result">
114+
* fully-consumed
115+
* </a>
116+
* before the next {@code Result} is emitted. This ensures that a command in
117+
* the batch can not be executed while the {@code Result} of a previous
118+
* command is consumed concurrently. It is a known limitation of the Oracle
119+
* R2DBC Driver that concurrent operations on a single {@code Connection}
120+
* will result in blocked threads. Deferring {@code Statement} execution
121+
* until full consumption of the previous {@code Statement}'s {@code Result}
122+
* is necessary in order to avoid blocked threads.
123+
* </p><p>
107124
* If the execution of any statement in the sequence results in a failure,
108125
* then the returned publisher emits {@code onError} with an
109126
* {@link R2dbcException} that describes the failure, and all subsequent
@@ -126,17 +143,121 @@ public Publisher<? extends Result> execute() {
126143
statements = new LinkedList<>();
127144

128145
AtomicBoolean isSubscribed = new AtomicBoolean(false);
129-
return Flux.defer(() -> {
130-
if (isSubscribed.compareAndSet(false, true)) {
131-
return Flux.fromIterable(currentStatements)
132-
.concatMap(Statement::execute);
133-
}
134-
else {
135-
return Mono.error(new IllegalStateException(
146+
return Flux.defer(() -> isSubscribed.compareAndSet(false, true)
147+
? executeBatch(currentStatements)
148+
: Mono.error(new IllegalStateException(
136149
"Multiple subscribers are not supported by the Oracle R2DBC" +
137-
" Batch.execute() publisher"));
138-
}
139-
});
150+
" Batch.execute() publisher")));
151+
}
152+
153+
/**
154+
* Executes each {@code Statement} in a {@code Queue} of {@code statements}.
155+
* A {@code Statement} is not executed until the {@code Result} of any
156+
* previous {@code Statement} is fully-consumed.
157+
* @param statements {@code Statement}s to execute. Not null.
158+
* @return A {@code Publisher} of each {@code Statement}'s {@code Result}.
159+
* Not null.
160+
*/
161+
private static Publisher<? extends Result> executeBatch(
162+
Queue<Statement> statements) {
163+
164+
// Reference a Publisher that terminates when the previous Statement's
165+
// Result has been consumed.
166+
AtomicReference<Publisher<Void>> previous =
167+
new AtomicReference<>(Mono.empty());
168+
169+
return Flux.fromIterable(statements)
170+
.concatMap(statement -> {
171+
172+
// Complete when this statement's result is consumed
173+
CompletableFuture<Void> next = new CompletableFuture<>();
174+
175+
return Flux.from(statement.execute())
176+
// Delay execution by delaying Publisher.subscribe(Subscriber) until the
177+
// previous statement's result is consumed.
178+
.delaySubscription(
179+
// Update the reference; This statement is now the "previous"
180+
// statement.
181+
previous.getAndSet(Mono.fromCompletionStage(next)))
182+
// Batch result completes the "next" future when fully consumed.
183+
.map(result -> new BatchResult(next, result));
184+
});
185+
}
186+
187+
/**
188+
* <p>
189+
* A {@code Result} that completes a {@link CompletableFuture} when it has
190+
* been fully consumed. Instances of {@code BatchResult} are used by Oracle
191+
* R2DBC to ensure that statement execution and row data processing do
192+
* not occur concurrently; The completion of the future signals that the row
193+
* data of a result has been fully consumed, and that no more database
194+
* calls will be initiated to fetch additional rows.
195+
* </p><p>
196+
* Instances of {@code BatchResult} delegate invocations of
197+
* {@link #getRowsUpdated()} and {@link #map(BiFunction)} to a
198+
* {@code Result} provided on construction; The behavior of {@code Publisher}s
199+
* returned by these methods is identical to those returned by the delegate
200+
* {@code Result}.
201+
* </p>
202+
*/
203+
private static final class BatchResult implements Result {
204+
205+
/** Completed when this {@code BatchResult} is fully consumed */
206+
final CompletableFuture<Void> consumeFuture;
207+
208+
/** Delegate {@code Result} that provides row data or an update count */
209+
final Result delegateResult;
210+
211+
/**
212+
* Constructs a new result that completes a {@code consumeFuture} when the
213+
* row data or update count of a {@code delegateResult} has been fully
214+
* consumed.
215+
* @param consumeFuture Future completed upon consumption
216+
* @param delegateResult Result of row data or an update count
217+
*/
218+
BatchResult(CompletableFuture<Void> consumeFuture, Result delegateResult) {
219+
this.consumeFuture = consumeFuture;
220+
this.delegateResult = delegateResult;
221+
}
222+
223+
/**
224+
* {@inheritDoc}
225+
* <p>
226+
* Immediately completes the {@link #consumeFuture} and then returns the
227+
* update count {@code Publisher} of the {@link #delegateResult}. After
228+
* returning an update count {@code Publisher}, the {@link #delegateResult}
229+
* can not initiate any more database calls (based on the assumption
230+
* noted below).
231+
* </p>
232+
* @implNote It is assumed that the {@link #delegateResult} will throw
233+
* {@link IllegalStateException} upon multiple attempts to consume it, and
234+
* this method does not check for multiple consumptions.
235+
*/
236+
@Override
237+
public Publisher<Integer> getRowsUpdated() {
238+
consumeFuture.complete(null);
239+
return Flux.from(delegateResult.getRowsUpdated());
240+
}
241+
242+
/**
243+
* {@inheritDoc}
244+
* <p>
245+
* Completes the {@link #consumeFuture} after the row data {@code
246+
* Publisher} of the {@link #delegateResult} emits a terminal signal or
247+
* has it's {@code Subscription} cancelled. After emitting a terminal
248+
* signal or having it's {@code Subscription} cancelled, the
249+
* {@link #delegateResult} can not initiate any more database calls.
250+
* </p>
251+
* @implNote It is assumed that the {@link #delegateResult} will throw
252+
* {@link IllegalStateException} upon multiple attempts to consume it, and
253+
* this method does not check for multiple consumptions.
254+
*/
255+
@Override
256+
public <T> Publisher<T> map(
257+
BiFunction<Row, RowMetadata, ? extends T> mappingFunction) {
258+
return Flux.<T>from(delegateResult.map(mappingFunction))
259+
.doFinally(signalType -> consumeFuture.complete(null));
260+
}
140261
}
141262
}
142263

src/main/java/oracle/r2dbc/impl/OracleStatementImpl.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -63,14 +63,14 @@
6363
* Database and an ORA-01000 error will be raised. The Oracle R2DBC Driver
6464
* will close cursors after each {@link Result} emitted by the
6565
* {@link #execute()} publisher has been fully consumed.
66-
* </p><p>
66+
* </p><p id="fully-consumed-result">
6767
* To ensure that cursors are eventually closed, application code MUST
68-
* fully consume the {@link Result} objects emitted by the {@link #execute()}
69-
* publisher. A {@code Result} is fully consumed by subscribing to it's
68+
* fully consume {@link Result} objects emitted by the {@link #execute()}
69+
* {@code Publisher}. A {@code Result} is fully consumed by subscribing to its
7070
* {@linkplain Result#getRowsUpdated() update count} or
7171
* {@linkplain Result#map(BiFunction) row data} {@code Publisher} and then
72-
* requesting items until the publisher emits {@code onComplete/onError} or
73-
* cancelling the subscription.
72+
* requesting items until the {@code Publisher} emits {@code onComplete/onError}
73+
* or its {@code Subscription} is cancelled.
7474
* </p><p>
7575
* To improve performance when the same SQL statement is executed multiple
7676
* times, implementations of {@link ReactiveJdbcAdapter} are expected to
@@ -995,4 +995,4 @@ private interface DeferredBind {
995995
Mono<Void> discard();
996996
}
997997

998-
}
998+
}

src/test/java/oracle/r2dbc/impl/OracleResultImplTest.java

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,10 @@
3030
import reactor.core.publisher.Mono;
3131
import reactor.core.publisher.Signal;
3232

33+
import java.util.Iterator;
3334
import java.util.List;
35+
import java.util.Queue;
36+
import java.util.concurrent.ConcurrentLinkedQueue;
3437
import java.util.function.BiFunction;
3538

3639
import static java.util.Arrays.asList;
@@ -65,12 +68,14 @@ public void testGetRowsUpdated() {
6568
"CREATE TABLE testGetRowsUpdated (x NUMBER, y NUMBER)"));
6669
try {
6770
// Expect update count of 1 from each INSERT.
68-
List<? extends Result> insertResults =
69-
awaitMany(connection.createBatch()
71+
Iterator<? extends Result> insertResults =
72+
Flux.from(connection.createBatch()
7073
.add("INSERT INTO testGetRowsUpdated (x, y) VALUES (0, 0)")
7174
.add("INSERT INTO testGetRowsUpdated (x, y) VALUES (0, 1)")
72-
.execute());
73-
Result insertResult0 = insertResults.get(0);
75+
.execute())
76+
.toIterable()
77+
.iterator();
78+
Result insertResult0 = insertResults.next();
7479
Publisher<Integer> insertCountPublisher0 =
7580
insertResult0.getRowsUpdated();
7681
awaitOne(1, insertCountPublisher0);
@@ -84,7 +89,7 @@ public void testGetRowsUpdated() {
8489
// Expect update count publisher to support multiple subscribers
8590
awaitOne(1, insertCountPublisher0);
8691

87-
Result insertResult1 = insertResults.get(1);
92+
Result insertResult1 = insertResults.next();
8893
Publisher<Integer> insertCountPublisher1 =
8994
insertResult1.getRowsUpdated();
9095
awaitOne(1, insertCountPublisher1);
@@ -185,12 +190,14 @@ public void testMap() {
185190
"CREATE TABLE testMap (x NUMBER, y NUMBER)"));
186191
try {
187192
// Expect no row data from each INSERT.
188-
List<? extends Result> insertResults =
189-
awaitMany(connection.createBatch()
193+
Iterator<? extends Result> insertResults =
194+
Flux.from(connection.createBatch()
190195
.add("INSERT INTO testMap (x, y) VALUES (0, 0)")
191196
.add("INSERT INTO testMap (x, y) VALUES (0, 1)")
192-
.execute());
193-
Result insertResult0 = insertResults.get(0);
197+
.execute())
198+
.toIterable()
199+
.iterator();
200+
Result insertResult0 = insertResults.next();
194201
Publisher<Object> insertRowPublisher0 =
195202
insertResult0.map((row, metadata) -> row.get(0));
196203
awaitNone(insertRowPublisher0);
@@ -204,7 +211,7 @@ public void testMap() {
204211
// Expect row data publisher to reject multiple subscribers
205212
awaitError(IllegalStateException.class, insertRowPublisher0);
206213

207-
Result insertResult1 = insertResults.get(1);
214+
Result insertResult1 = insertResults.next();
208215
Publisher<Object> insertRowPublisher1 =
209216
insertResult1.map((row, metadata) -> row.get(0));
210217
awaitNone(insertRowPublisher1);

0 commit comments

Comments
 (0)