Skip to content

Commit e7189e4

Browse files
Merge pull request #24 from oracle/22-poolable-connections
22 poolable connections
2 parents 3f9a3f9 + 7fb7c87 commit e7189e4

File tree

8 files changed

+216
-62
lines changed

8 files changed

+216
-62
lines changed

src/main/java/oracle/r2dbc/impl/OracleConnectionFactoryImpl.java

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -53,13 +53,21 @@
5353
* <li>{@link ConnectionFactoryOptions#HOST}</li>
5454
* </ul>
5555
* <h3 id="supported_options">Supported Options</h3><p>
56-
* This implementation supports the following options for connection creation:
56+
* This implementation supports the following well known options for connection
57+
* creation:
5758
* </p><ul>
5859
* <li>{@link ConnectionFactoryOptions#PORT}</li>
5960
* <li>{@link ConnectionFactoryOptions#DATABASE}</li>
6061
* <li>{@link ConnectionFactoryOptions#USER}</li>
6162
* <li>{@link ConnectionFactoryOptions#PASSWORD}</li>
63+
* <li>{@link ConnectionFactoryOptions#CONNECT_TIMEOUT}</li>
64+
* <li>{@link ConnectionFactoryOptions#SSL}</li>
6265
* </ul>
66+
* <h3 id="extended_options">Supported Options</h3><p>
67+
* This implementation supports extended options having the name of a
68+
* subset of Oracle JDBC connection properties. The list of supported
69+
* connection properties is specified by {@link OracleReactiveJdbcAdapter}.
70+
* </p>
6371
*
6472
* @author harayuanwang, michael-a-mcmahon
6573
* @since 0.1.0
@@ -116,7 +124,6 @@ final class OracleConnectionFactoryImpl implements ConnectionFactory {
116124
*/
117125
OracleConnectionFactoryImpl(ConnectionFactoryOptions options) {
118126
OracleR2dbcExceptions.requireNonNull(options, "options is null.");
119-
120127
adapter = ReactiveJdbcAdapter.getOracleAdapter();
121128
dataSource = adapter.createDataSource(options);
122129
}
@@ -138,14 +145,15 @@ final class OracleConnectionFactoryImpl implements ConnectionFactory {
138145
* the returned publisher, so that the database can reclaim the resources
139146
* allocated for that connection.
140147
* </p><p>
141-
* The returned publisher does not support multiple subscribers. After a
142-
* subscriber has subscribed, the returned publisher emits {@code onError}
143-
* with an {@link IllegalStateException} to all subsequent subscribers.
148+
* The returned publisher supports multiple subscribers. One {@code
149+
* Connection} is emitted to each subscriber that subscribes and signals
150+
* demand.
144151
* </p>
145152
*/
146153
@Override
147154
public Publisher<Connection> create() {
148-
return Mono.fromDirect(adapter.publishConnection(dataSource))
155+
return Mono.defer(() ->
156+
Mono.fromDirect(adapter.publishConnection(dataSource)))
149157
.map(conn -> new OracleConnectionImpl(adapter, conn));
150158
}
151159

src/main/java/oracle/r2dbc/impl/OracleReactiveJdbcAdapter.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -511,7 +511,10 @@ private static void configureExtendedOptions(
511511

512512
// Apply any extended options as connection properties
513513
for (Option<CharSequence> option : SUPPORTED_CONNECTION_PROPERTY_OPTIONS) {
514-
CharSequence value = options.getValue(option);
514+
// Using Object as the value type allows options to be set as types like
515+
// Boolean or Integer. These types make sense for numeric or boolean
516+
// connection property values, such as statement cache size, or enable x.
517+
Object value = options.getValue(option);
515518
if (value != null) {
516519
runOrHandleSQLException(() ->
517520
oracleDataSource.setConnectionProperty(

src/main/java/oracle/r2dbc/impl/OracleResultImpl.java

Lines changed: 34 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535

3636
import static oracle.r2dbc.impl.OracleR2dbcExceptions.getOrHandleSQLException;
3737
import static oracle.r2dbc.impl.OracleR2dbcExceptions.requireNonNull;
38+
import static oracle.r2dbc.impl.OracleR2dbcExceptions.runOrHandleSQLException;
3839

3940
/**
4041
* <p>
@@ -94,9 +95,13 @@ <T> Publisher<T> publishRows(
9495
}
9596

9697
/**
98+
* <p>
9799
* Creates a {@code Result} that either publishes a {@code ResultSet} of
98100
* row data from a query, or publishes an update count as an empty stream.
99-
*
101+
* </p><p>
102+
* The {@link java.sql.Statement} that created the {@code resultSet} is closed
103+
* when the returned result is fully consumed.
104+
* </p>
100105
* @param adapter Adapts {@code ResultSet} API calls into reactive streams.
101106
* Not null.
102107
* @param resultSet Row data to publish
@@ -109,19 +114,29 @@ public static Result createQueryResult(
109114

110115
@Override
111116
Publisher<Integer> publishUpdateCount() {
117+
runOrHandleSQLException(() ->
118+
resultSet.getStatement().close());
112119
return Mono.empty();
113120
}
114121

115122
@Override
116123
<T> Publisher<T> publishRows(
117124
BiFunction<Row, RowMetadata, ? extends T> mappingFunction) {
118125

126+
// Obtain a reference to the statement before the ResultSet is
127+
// logically closed by its row publisher. The statement is closed when
128+
// the publisher terminates.
129+
java.sql.Statement jdbcStatement =
130+
getOrHandleSQLException(resultSet::getStatement);
131+
119132
OracleRowMetadataImpl metadata = new OracleRowMetadataImpl(
120133
getOrHandleSQLException(resultSet::getMetaData));
121134

122-
return Flux.from(adapter.publishRows(resultSet, jdbcRow ->
135+
return Flux.<T>from(adapter.publishRows(resultSet, jdbcRow ->
123136
mappingFunction.apply(
124-
new OracleRowImpl(jdbcRow, metadata, adapter), metadata)));
137+
new OracleRowImpl(jdbcRow, metadata, adapter), metadata)))
138+
.doFinally(signalType ->
139+
runOrHandleSQLException(jdbcStatement::close));
125140
}
126141
};
127142
}
@@ -132,10 +147,15 @@ <T> Publisher<T> publishRows(
132147
* {@link PreparedStatement#getGeneratedKeys()} {@code ResultSet}, or
133148
* publishes an {@code updateCount}.
134149
* </p><p>
150+
* The {@link java.sql.Statement} that created the {@code ResultSet} is closed
151+
* when the {@code Publisher} returned by this method emits a
152+
* {@code Result}.
153+
* </p><p>
135154
* For compliance with R2DBC standards, a {@code Row} of generated column
136155
* values will remain valid after the {@code Connection} that created them
137156
* is closed. This behavior is verified by version 0.8.2 of
138-
* {@code io.r2dbc.spi.test.TestKit#returnGeneratedValues()}.
157+
* {@code io.r2dbc.spi.test.TestKit#returnGeneratedValues()}. The {@code Rows}
158+
* of generated value
139159
* </p>
140160
*
141161
* @implNote
@@ -171,16 +191,25 @@ public static Publisher<Result> createGeneratedValuesResult(
171191

172192
// Avoid invoking ResultSet.getMetaData() on an empty ResultSet, it may
173193
// throw a SQLException
174-
if (! getOrHandleSQLException(values::isBeforeFirst))
194+
if (! getOrHandleSQLException(values::isBeforeFirst)) {
195+
runOrHandleSQLException(() -> values.getStatement().close());
175196
return Mono.just(createUpdateCountResult(updateCount));
197+
}
176198

177199
// Obtain metadata before the ResultSet is closed by publishRows(...)
178200
OracleRowMetadataImpl metadata =
179201
new OracleRowMetadataImpl(getOrHandleSQLException(values::getMetaData));
180202

203+
// Obtain a reference to the statement before the ResultSet is
204+
// logically closed by its row publisher. The statement is closed when
205+
// the publisher terminates.
206+
java.sql.Statement jdbcStatement =
207+
getOrHandleSQLException(values::getStatement);
208+
181209
return Flux.from(adapter.publishRows(
182210
values, ReactiveJdbcAdapter.JdbcRow::copy))
183211
.collectList()
212+
.doFinally(signalType -> runOrHandleSQLException(jdbcStatement::close))
184213
.map(cachedRows -> new OracleResultImpl() {
185214

186215
@Override

src/main/java/oracle/r2dbc/impl/OracleStatementImpl.java

Lines changed: 22 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import java.util.Queue;
4242
import java.util.concurrent.atomic.AtomicBoolean;
4343
import java.util.function.BiFunction;
44+
import java.util.function.Function;
4445

4546
import static oracle.r2dbc.impl.OracleR2dbcExceptions.getOrHandleSQLException;
4647
import static oracle.r2dbc.impl.OracleR2dbcExceptions.requireNonNull;
@@ -546,8 +547,8 @@ private Publisher<Result> createResultPublisher(
546547

547548
/**
548549
* <p>
549-
* Executes a JDBC statement with a single, non-batched, set of parameters.
550-
* If the execution returns a {@link java.sql.ResultSet} then the
550+
* Executes a JDBC statement with a single, non-batched, set of {@code
551+
* bindValues}. If the execution results in a {@link java.sql.ResultSet} then the
551552
* {@code jdbcStatement} is closed after the {@code ResultSet} is fully
552553
* consumed by {@link Result#map(BiFunction)}. Otherwise, if the execution
553554
* only produces an update count, then the {@code jdbcStatement} is closed
@@ -569,7 +570,6 @@ private Publisher<Result> executeSingle(
569570
return Mono.from(adapter.publishSQLExecution(jdbcStatement))
570571
.map(isResultSet -> {
571572
if (isResultSet) {
572-
runOrHandleSQLException(jdbcStatement::closeOnCompletion);
573573
return OracleResultImpl.createQueryResult(
574574
adapter, getOrHandleSQLException(jdbcStatement::getResultSet));
575575
}
@@ -614,22 +614,27 @@ private Publisher<Result> executeBatch(
614614

615615
// Execute the batch. The execution won't return a ResultSet, so the JDBC
616616
// statement is closed immediately after the execution completes.
617-
runOrHandleSQLException(jdbcStatement::closeOnCompletion);
618617
return Flux.from(adapter.publishBatchUpdate(jdbcStatement))
619618
.map(updateCount ->
620-
OracleResultImpl.createUpdateCountResult(Math.toIntExact(updateCount)));
619+
OracleResultImpl.createUpdateCountResult(Math.toIntExact(updateCount)))
620+
.doFinally(signalType -> runOrHandleSQLException(jdbcStatement::close));
621621
}
622622

623623
/**
624624
* <p>
625-
* Executes a key generating {@code jdbcStatement} for each set
626-
* of bind values in a {@code batch}. The {@code jdbcStatement} is closed
627-
* after all executions have completed. If any execution results in an
628-
* error, subsequent executions are skipped.
625+
* Executes a JDBC statement with a single, non-batched, set of {@code
626+
* bindValues}. If the execution results in a {@link java.sql.ResultSet} then
627+
* the {@code jdbcStatement} is closed after the {@code ResultSet} is fully
628+
* consumed by {@link Result#map(BiFunction)}. If the execution results in
629+
* an update count and/or values generated by DML, then the {@code
630+
* jdbcStatement} is closed after the returned {@code Publisher} emits a
631+
* {@code Result} with all generated values cached, such that
632+
* {@link Result#map(BiFunction)} may be called after the database connection
633+
* has been closed.
629634
* </p><p>
630-
* The returned {@code Publisher} emits a {@code Result} with an update
631-
* count and generated values for each execution of the {@code
632-
* jdbcStatement}.
635+
* The returned publisher initiates SQL execution <i>the first time</i> a
636+
* subscriber subscribes, before the subscriber emits a {@code request}
637+
* signal.
633638
* </p>
634639
*
635640
* @implNote The 21c Oracle JDBC Driver does not support batch DML when
@@ -638,25 +643,23 @@ private Publisher<Result> executeBatch(
638643
*
639644
* @param jdbcStatement A JDBC statement
640645
* @param bindValues A set of bind values
641-
* @return A publisher that emits the {@code Results} of executing the
642-
* JDBC statement for each set of bind values in the {@code batch}
646+
* @return A publisher that emits the {@code Result} of executing the
647+
* JDBC statement.
643648
*/
644649
private Publisher<Result> executeGeneratingValues(
645650
PreparedStatement jdbcStatement, Object[] bindValues) {
646651

647652
setJdbcBindValues(bindValues, jdbcStatement);
648653
return Mono.from(adapter.publishSQLExecution(jdbcStatement))
649-
.flatMap(isResultSet -> {
650-
runOrHandleSQLException(jdbcStatement::closeOnCompletion);
651-
return isResultSet
654+
.flatMap(isResultSet ->
655+
isResultSet
652656
? Mono.just(OracleResultImpl.createQueryResult(
653657
adapter,
654658
getOrHandleSQLException(jdbcStatement::getResultSet)))
655659
: Mono.from(OracleResultImpl.createGeneratedValuesResult(
656660
adapter,
657661
getOrHandleSQLException(jdbcStatement::getUpdateCount),
658-
getOrHandleSQLException(jdbcStatement::getGeneratedKeys)));
659-
});
662+
getOrHandleSQLException(jdbcStatement::getGeneratedKeys))));
660663
}
661664

662665
/**

src/test/java/oracle/r2dbc/DatabaseConfig.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
import io.r2dbc.spi.ConnectionFactories;
2626
import io.r2dbc.spi.ConnectionFactory;
2727
import io.r2dbc.spi.ConnectionFactoryOptions;
28+
import io.r2dbc.spi.Option;
29+
import oracle.jdbc.OracleConnection;
2830
import oracle.r2dbc.util.SharedConnectionFactory;
2931
import org.reactivestreams.Publisher;
3032

@@ -216,6 +218,11 @@ public static int databaseVersion() {
216218
.option(ConnectionFactoryOptions.DATABASE, SERVICE_NAME)
217219
.option(ConnectionFactoryOptions.USER, USER)
218220
.option(ConnectionFactoryOptions.PASSWORD, PASSWORD)
221+
// Disable statement caching in order to verify cursor closing;
222+
// Cached statements don't close their cursors
223+
.option(Option.valueOf(
224+
OracleConnection.CONNECTION_PROPERTY_IMPLICIT_STATEMENT_CACHE_SIZE),
225+
0)
219226
.build());
220227

221228
SHARED_CONNECTION_FACTORY = new SharedConnectionFactory(

src/test/java/oracle/r2dbc/impl/OracleConnectionFactoryImplTest.java

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@
3333

3434
import java.sql.SQLException;
3535
import java.time.Duration;
36+
import java.util.HashSet;
37+
import java.util.Set;
3638
import java.util.concurrent.atomic.AtomicInteger;
3739

3840
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -115,21 +117,24 @@ public void testCreate() {
115117
.build())
116118
.create();
117119

118-
// Expect publisher to emit 1 connection
119-
AtomicInteger counter = new AtomicInteger(0);
120+
// Expect publisher to emit one connection to each subscriber
121+
Set<Connection> connections = new HashSet<>();
120122
Flux.from(connectionPublisher)
121-
.doOnNext(connection -> counter.incrementAndGet())
123+
.doOnNext(connections::add)
122124
.doOnNext(connection -> Mono.from(connection.close()).subscribe())
123125
.blockLast(DatabaseConfig.connectTimeout());
124-
assertEquals(1, counter.get());
126+
assertEquals(1, connections.size());
127+
Flux.from(connectionPublisher)
128+
.doOnNext(connections::add)
129+
.doOnNext(connection -> Mono.from(connection.close()).subscribe())
130+
.blockLast(DatabaseConfig.connectTimeout());
131+
assertEquals(2, connections.size());
132+
Flux.from(connectionPublisher)
133+
.doOnNext(connections::add)
134+
.doOnNext(connection -> Mono.from(connection.close()).subscribe())
135+
.blockLast(DatabaseConfig.connectTimeout());
136+
assertEquals(3, connections.size());
125137

126-
// Expect publisher to reject multiple subscribers
127-
try {
128-
Mono.from(connectionPublisher)
129-
.block(Duration.ofSeconds(1));
130-
fail("Connection publisher did not reject multiple subscribers");
131-
}
132-
catch (IllegalStateException expected) { }
133138
}
134139

135140
/**

src/test/java/oracle/r2dbc/impl/OracleConnectionImplTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -559,7 +559,7 @@ public void testSetAutoCommit() {
559559
selectInSessionB);
560560
}
561561
finally {
562-
awaitNone(sessionA.close());
562+
awaitNone(sessionB.close());
563563
}
564564
}
565565
finally {

0 commit comments

Comments
 (0)