24
24
import java .sql .Connection ;
25
25
import java .util .LinkedList ;
26
26
import java .util .Queue ;
27
+ import java .util .concurrent .CompletableFuture ;
27
28
import java .util .concurrent .atomic .AtomicBoolean ;
29
+ import java .util .concurrent .atomic .AtomicReference ;
30
+ import java .util .function .BiFunction ;
28
31
29
32
import io .r2dbc .spi .Batch ;
30
33
import io .r2dbc .spi .R2dbcException ;
31
34
import io .r2dbc .spi .Result ;
35
+ import io .r2dbc .spi .Row ;
36
+ import io .r2dbc .spi .RowMetadata ;
32
37
import io .r2dbc .spi .Statement ;
33
38
import org .reactivestreams .Publisher ;
34
39
import reactor .core .publisher .Flux ;
@@ -104,6 +109,18 @@ public Batch add(String sql) {
104
109
* are executed in the order they were added. Calling this method clears all
105
110
* statements that have been added to the current batch.
106
111
* </p><p>
112
+ * A {@code Result} emitted by the returned {@code Publisher} must be
113
+ * <a href="OracleStatementImpl.html#fully-consumed-result">
114
+ * fully-consumed
115
+ * </a>
116
+ * before the next {@code Result} is emitted. This ensures that a command in
117
+ * the batch can not be executed while the {@code Result} of a previous
118
+ * command is consumed concurrently. It is a known limitation of the Oracle
119
+ * R2DBC Driver that concurrent operations on a single {@code Connection}
120
+ * will result in blocked threads. Deferring {@code Statement} execution
121
+ * until full consumption of the previous {@code Statement}'s {@code Result}
122
+ * is necessary in order to avoid blocked threads.
123
+ * </p><p>
107
124
* If the execution of any statement in the sequence results in a failure,
108
125
* then the returned publisher emits {@code onError} with an
109
126
* {@link R2dbcException} that describes the failure, and all subsequent
@@ -126,17 +143,121 @@ public Publisher<? extends Result> execute() {
126
143
statements = new LinkedList <>();
127
144
128
145
AtomicBoolean isSubscribed = new AtomicBoolean (false );
129
- return Flux .defer (() -> {
130
- if (isSubscribed .compareAndSet (false , true )) {
131
- return Flux .fromIterable (currentStatements )
132
- .concatMap (Statement ::execute );
133
- }
134
- else {
135
- return Mono .error (new IllegalStateException (
146
+ return Flux .defer (() -> isSubscribed .compareAndSet (false , true )
147
+ ? executeBatch (currentStatements )
148
+ : Mono .error (new IllegalStateException (
136
149
"Multiple subscribers are not supported by the Oracle R2DBC" +
137
- " Batch.execute() publisher" ));
138
- }
139
- });
150
+ " Batch.execute() publisher" )));
151
+ }
152
+
153
+ /**
154
+ * Executes each {@code Statement} in a {@code Queue} of {@code statements}.
155
+ * A {@code Statement} is not executed until the {@code Result} of any
156
+ * previous {@code Statement} is fully-consumed.
157
+ * @param statements {@code Statement}s to execute. Not null.
158
+ * @return A {@code Publisher} of each {@code Statement}'s {@code Result}.
159
+ * Not null.
160
+ */
161
+ private static Publisher <? extends Result > executeBatch (
162
+ Queue <Statement > statements ) {
163
+
164
+ // Reference a Publisher that terminates when the previous Statement's
165
+ // Result has been consumed.
166
+ AtomicReference <Publisher <Void >> previous =
167
+ new AtomicReference <>(Mono .empty ());
168
+
169
+ return Flux .fromIterable (statements )
170
+ .concatMap (statement -> {
171
+
172
+ // Complete when this statement's result is consumed
173
+ CompletableFuture <Void > next = new CompletableFuture <>();
174
+
175
+ return Flux .from (statement .execute ())
176
+ // Delay execution by delaying Publisher.subscribe(Subscriber) until the
177
+ // previous statement's result is consumed.
178
+ .delaySubscription (
179
+ // Update the reference; This statement is now the "previous"
180
+ // statement.
181
+ previous .getAndSet (Mono .fromCompletionStage (next )))
182
+ // Batch result completes the "next" future when fully consumed.
183
+ .map (result -> new BatchResult (next , result ));
184
+ });
185
+ }
186
+
187
+ /**
188
+ * <p>
189
+ * A {@code Result} that completes a {@link CompletableFuture} when it has
190
+ * been fully consumed. Instances of {@code BatchResult} are used by Oracle
191
+ * R2DBC to ensure that statement execution and row data processing do
192
+ * not occur concurrently; The completion of the future signals that the row
193
+ * data of a result has been fully consumed, and that no more database
194
+ * calls will be initiated to fetch additional rows.
195
+ * </p><p>
196
+ * Instances of {@code BatchResult} delegate invocations of
197
+ * {@link #getRowsUpdated()} and {@link #map(BiFunction)} to a
198
+ * {@code Result} provided on construction; The behavior of {@code Publisher}s
199
+ * returned by these methods is identical to those returned by the delegate
200
+ * {@code Result}.
201
+ * </p>
202
+ */
203
+ private static final class BatchResult implements Result {
204
+
205
+ /** Completed when this {@code BatchResult} is fully consumed */
206
+ final CompletableFuture <Void > consumeFuture ;
207
+
208
+ /** Delegate {@code Result} that provides row data or an update count */
209
+ final Result delegateResult ;
210
+
211
+ /**
212
+ * Constructs a new result that completes a {@code consumeFuture} when the
213
+ * row data or update count of a {@code delegateResult} has been fully
214
+ * consumed.
215
+ * @param consumeFuture Future completed upon consumption
216
+ * @param delegateResult Result of row data or an update count
217
+ */
218
+ BatchResult (CompletableFuture <Void > consumeFuture , Result delegateResult ) {
219
+ this .consumeFuture = consumeFuture ;
220
+ this .delegateResult = delegateResult ;
221
+ }
222
+
223
+ /**
224
+ * {@inheritDoc}
225
+ * <p>
226
+ * Immediately completes the {@link #consumeFuture} and then returns the
227
+ * update count {@code Publisher} of the {@link #delegateResult}. After
228
+ * returning an update count {@code Publisher}, the {@link #delegateResult}
229
+ * can not initiate any more database calls (based on the assumption
230
+ * noted below).
231
+ * </p>
232
+ * @implNote It is assumed that the {@link #delegateResult} will throw
233
+ * {@link IllegalStateException} upon multiple attempts to consume it, and
234
+ * this method does not check for multiple consumptions.
235
+ */
236
+ @ Override
237
+ public Publisher <Integer > getRowsUpdated () {
238
+ consumeFuture .complete (null );
239
+ return Flux .from (delegateResult .getRowsUpdated ());
240
+ }
241
+
242
+ /**
243
+ * {@inheritDoc}
244
+ * <p>
245
+ * Completes the {@link #consumeFuture} after the row data {@code
246
+ * Publisher} of the {@link #delegateResult} emits a terminal signal or
247
+ * has it's {@code Subscription} cancelled. After emitting a terminal
248
+ * signal or having it's {@code Subscription} cancelled, the
249
+ * {@link #delegateResult} can not initiate any more database calls.
250
+ * </p>
251
+ * @implNote It is assumed that the {@link #delegateResult} will throw
252
+ * {@link IllegalStateException} upon multiple attempts to consume it, and
253
+ * this method does not check for multiple consumptions.
254
+ */
255
+ @ Override
256
+ public <T > Publisher <T > map (
257
+ BiFunction <Row , RowMetadata , ? extends T > mappingFunction ) {
258
+ return Flux .<T >from (delegateResult .map (mappingFunction ))
259
+ .doFinally (signalType -> consumeFuture .complete (null ));
260
+ }
140
261
}
141
262
}
142
263
0 commit comments