From 3c876125e53d0c14e66cb7e7e5e91ada10e7adcb Mon Sep 17 00:00:00 2001 From: Michael McMahon Date: Tue, 3 Dec 2024 16:01:05 -0800 Subject: [PATCH 1/3] Set default LOB prefetch to 1GB --- .../r2dbc/impl/OracleReactiveJdbcAdapter.java | 4 +- .../r2dbc/impl/OracleLargeObjectsTest.java | 136 +++++++++++++++++- .../impl/OracleReactiveJdbcAdapterTest.java | 2 +- 3 files changed, 137 insertions(+), 5 deletions(-) diff --git a/src/main/java/oracle/r2dbc/impl/OracleReactiveJdbcAdapter.java b/src/main/java/oracle/r2dbc/impl/OracleReactiveJdbcAdapter.java index 40a7b0f..78f62f4 100755 --- a/src/main/java/oracle/r2dbc/impl/OracleReactiveJdbcAdapter.java +++ b/src/main/java/oracle/r2dbc/impl/OracleReactiveJdbcAdapter.java @@ -46,8 +46,6 @@ import java.sql.Blob; import java.sql.Clob; import java.sql.Connection; -import java.sql.Driver; -import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; @@ -658,7 +656,7 @@ private static void configureJdbcDefaults(OracleDataSource oracleDataSource) { // value requested as a ByteBuffer or String. setPropertyIfAbsent(oracleDataSource, OracleConnection.CONNECTION_PROPERTY_DEFAULT_LOB_PREFETCH_SIZE, - "1048576"); + "1073741824"); // TODO: Disable the result set cache? This is needed to support the // SERIALIZABLE isolation level, which requires result set caching to be diff --git a/src/test/java/oracle/r2dbc/impl/OracleLargeObjectsTest.java b/src/test/java/oracle/r2dbc/impl/OracleLargeObjectsTest.java index 5645586..39626b7 100644 --- a/src/test/java/oracle/r2dbc/impl/OracleLargeObjectsTest.java +++ b/src/test/java/oracle/r2dbc/impl/OracleLargeObjectsTest.java @@ -24,25 +24,33 @@ import io.r2dbc.spi.Blob; import io.r2dbc.spi.Clob; import io.r2dbc.spi.Connection; +import io.r2dbc.spi.ConnectionFactories; import io.r2dbc.spi.Parameters; import io.r2dbc.spi.Row; import io.r2dbc.spi.Statement; import oracle.r2dbc.OracleR2dbcObject; import oracle.r2dbc.OracleR2dbcTypes; import oracle.r2dbc.test.DatabaseConfig; +import org.junit.jupiter.api.Assumptions; import org.junit.jupiter.api.Test; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.CharBuffer; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; +import static io.r2dbc.spi.ConnectionFactoryOptions.HOST; +import static io.r2dbc.spi.ConnectionFactoryOptions.PORT; +import static java.nio.charset.StandardCharsets.US_ASCII; import static java.util.Arrays.asList; import static oracle.r2dbc.test.DatabaseConfig.connectTimeout; import static oracle.r2dbc.test.DatabaseConfig.sharedConnection; @@ -540,7 +548,7 @@ public Publisher discard() { class TestClob implements Clob { final CharBuffer clobData = - CharBuffer.wrap(new String(data, StandardCharsets.US_ASCII)); + CharBuffer.wrap(new String(data, US_ASCII)); boolean isDiscarded = false; @@ -640,4 +648,130 @@ public void testNullLob() { } } + /** + * Verifies that the default LOB prefetch size is at least large enough to + * fully prefetch 1MB of data. + */ + @Test + public void testDefaultLobPrefetch() throws Exception { + Assumptions.assumeTrue( + null == DatabaseConfig.protocol(), "Test requires TCP protocol"); + + // A local server will monitor network I/O + try (ServerSocketChannel localServer = ServerSocketChannel.open()) { + localServer.configureBlocking(true); + localServer.bind(null); + + class TestThread extends Thread { + + /** Count of bytes exchanged between JDBC and the database */ + int ioCount = 0; + + @Override + public void run() { + InetSocketAddress databaseAddress = + new InetSocketAddress(DatabaseConfig.host(), DatabaseConfig.port()); + + try ( + SocketChannel jdbcChannel = localServer.accept(); + SocketChannel databaseChannel = + SocketChannel.open(databaseAddress)){ + + jdbcChannel.configureBlocking(false); + databaseChannel.configureBlocking(false); + + ByteBuffer byteBuffer = ByteBuffer.allocateDirect(8192); + while (true) { + + byteBuffer.clear(); + if (-1 == jdbcChannel.read(byteBuffer)) + break; + byteBuffer.flip(); + ioCount += byteBuffer.remaining(); + + while (byteBuffer.hasRemaining()) + databaseChannel.write(byteBuffer); + + byteBuffer.clear(); + databaseChannel.read(byteBuffer); + byteBuffer.flip(); + ioCount += byteBuffer.remaining(); + + while (byteBuffer.hasRemaining()) + jdbcChannel.write(byteBuffer); + } + } + catch (Exception exception) { + exception.printStackTrace(); + } + } + } + + TestThread testThread = new TestThread(); + testThread.start(); + + + int lobSize = 99 + (1024 * 1024); // <-- 99 + 1MB + Connection connection = awaitOne(ConnectionFactories.get( + DatabaseConfig.connectionFactoryOptions() + .mutate() + .option(HOST, "localhost") + .option(PORT, + ((InetSocketAddress)localServer.getLocalAddress()).getPort()) + .build()) + .create()); + try { + awaitExecution(connection.createStatement( + "CREATE TABLE testLobPrefetch (" + + " id NUMBER GENERATED ALWAYS AS IDENTITY," + + " blobValue BLOB," + + " clobValue CLOB," + + " PRIMARY KEY(id))")); + + // Insert two rows of LOBs larger than 1MB + byte[] bytes = new byte[lobSize]; + ByteBuffer blobValue = ByteBuffer.wrap(bytes); + Arrays.fill(bytes, (byte)'a'); + String clobValue = new String(bytes, US_ASCII); + awaitUpdate(List.of(1,1), connection.createStatement( + "INSERT INTO testLobPrefetch (blobValue, clobValue)" + + " VALUES (:blobValue, :clobValue)") + .bind("blobValue", blobValue) + .bind("clobValue", clobValue) + .add() + .bind("blobValue", blobValue) + .bind("clobValue", clobValue)); + + // Query two rows of LOBs larger than 1MB + awaitQuery( + List.of( + List.of(blobValue, clobValue), + List.of(blobValue, clobValue)), + row -> { + try { + // Expect no I/O to result from mapping a fully prefetched BLOB or + // CLOB: + int ioCount = testThread.ioCount; + var result = List.of(row.get("blobValue"), row.get("clobValue")); + assertEquals(ioCount, testThread.ioCount); + return result; + } + catch (Exception exception) { + throw new RuntimeException(exception); + } + }, + connection.createStatement( + "SELECT blobValue, clobValue FROM testLobPrefetch ORDER BY id") + .fetchSize(1)); + } + finally { + tryAwaitExecution(connection.createStatement( + "DROP TABLE testLobPrefetch")); + tryAwaitNone(connection.close()); + testThread.join(10_000); + testThread.interrupt(); + } + } + } + } diff --git a/src/test/java/oracle/r2dbc/impl/OracleReactiveJdbcAdapterTest.java b/src/test/java/oracle/r2dbc/impl/OracleReactiveJdbcAdapterTest.java index 04ccc76..2696680 100644 --- a/src/test/java/oracle/r2dbc/impl/OracleReactiveJdbcAdapterTest.java +++ b/src/test/java/oracle/r2dbc/impl/OracleReactiveJdbcAdapterTest.java @@ -784,7 +784,7 @@ private static Properties getJdbcDefaultProperties() throws SQLException { OracleConnection.CONNECTION_PROPERTY_IMPLICIT_STATEMENT_CACHE_SIZE, "25"); defaultProperties.setProperty( OracleConnection.CONNECTION_PROPERTY_DEFAULT_LOB_PREFETCH_SIZE, - "1048576"); + "1073741824"); defaultProperties.setProperty( OracleConnection.CONNECTION_PROPERTY_THIN_NET_USE_ZERO_COPY_IO, "false"); From ca14f5c74148beb9ad5e061fa506110d0ef54912 Mon Sep 17 00:00:00 2001 From: Michael McMahon Date: Tue, 3 Dec 2024 16:53:17 -0800 Subject: [PATCH 2/3] Set decimal 1GB default LOB prefetch --- .../oracle/r2dbc/impl/OracleReactiveJdbcAdapter.java | 12 +++++------- .../oracle/r2dbc/impl/OracleLargeObjectsTest.java | 3 +-- 2 files changed, 6 insertions(+), 9 deletions(-) diff --git a/src/main/java/oracle/r2dbc/impl/OracleReactiveJdbcAdapter.java b/src/main/java/oracle/r2dbc/impl/OracleReactiveJdbcAdapter.java index 78f62f4..d517712 100755 --- a/src/main/java/oracle/r2dbc/impl/OracleReactiveJdbcAdapter.java +++ b/src/main/java/oracle/r2dbc/impl/OracleReactiveJdbcAdapter.java @@ -648,15 +648,13 @@ private static void configureJdbcDefaults(OracleDataSource oracleDataSource) { setPropertyIfAbsent(oracleDataSource, OracleConnection.CONNECTION_PROPERTY_IMPLICIT_STATEMENT_CACHE_SIZE, "25"); - // Prefetch LOB values by default. The database's maximum supported - // prefetch size, 1GB, is configured by default. This is done so that - // Row.get(...) can map LOB values into ByteBuffer/String without a - // blocking database call. If the entire value is prefetched, then JDBC - // won't need to fetch the remainder from the database when the entire is - // value requested as a ByteBuffer or String. + // Prefetch LOB values by default. This allows Row.get(...) to map most LOB + // values into ByteBuffer/String without a blocking database call to fetch + // the remaining bytes. 1GB is configured by default, as this is close to + // the maximum allowed by the Autonomous Database service. setPropertyIfAbsent(oracleDataSource, OracleConnection.CONNECTION_PROPERTY_DEFAULT_LOB_PREFETCH_SIZE, - "1073741824"); + "1000000000"); // TODO: Disable the result set cache? This is needed to support the // SERIALIZABLE isolation level, which requires result set caching to be diff --git a/src/test/java/oracle/r2dbc/impl/OracleLargeObjectsTest.java b/src/test/java/oracle/r2dbc/impl/OracleLargeObjectsTest.java index 39626b7..0ee3598 100644 --- a/src/test/java/oracle/r2dbc/impl/OracleLargeObjectsTest.java +++ b/src/test/java/oracle/r2dbc/impl/OracleLargeObjectsTest.java @@ -729,9 +729,8 @@ public void run() { + " PRIMARY KEY(id))")); // Insert two rows of LOBs larger than 1MB - byte[] bytes = new byte[lobSize]; + byte[] bytes = getBytes(lobSize); ByteBuffer blobValue = ByteBuffer.wrap(bytes); - Arrays.fill(bytes, (byte)'a'); String clobValue = new String(bytes, US_ASCII); awaitUpdate(List.of(1,1), connection.createStatement( "INSERT INTO testLobPrefetch (blobValue, clobValue)" From ce7f5d9336a1eea655bf803f96873caddc0fdf43 Mon Sep 17 00:00:00 2001 From: Michael McMahon Date: Tue, 3 Dec 2024 17:05:56 -0800 Subject: [PATCH 3/3] Set decimal 1GB default LOB prefetch --- .../java/oracle/r2dbc/impl/OracleReactiveJdbcAdapterTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/java/oracle/r2dbc/impl/OracleReactiveJdbcAdapterTest.java b/src/test/java/oracle/r2dbc/impl/OracleReactiveJdbcAdapterTest.java index 2696680..af2627f 100644 --- a/src/test/java/oracle/r2dbc/impl/OracleReactiveJdbcAdapterTest.java +++ b/src/test/java/oracle/r2dbc/impl/OracleReactiveJdbcAdapterTest.java @@ -784,7 +784,7 @@ private static Properties getJdbcDefaultProperties() throws SQLException { OracleConnection.CONNECTION_PROPERTY_IMPLICIT_STATEMENT_CACHE_SIZE, "25"); defaultProperties.setProperty( OracleConnection.CONNECTION_PROPERTY_DEFAULT_LOB_PREFETCH_SIZE, - "1073741824"); + "1000000000"); defaultProperties.setProperty( OracleConnection.CONNECTION_PROPERTY_THIN_NET_USE_ZERO_COPY_IO, "false");