Calling SP with XMLTYPE types #165
In the current release of Oracle R2DBC, 1.3.0, it is possible to use SQLXML data, but there are some limitations. I've described them in the comments of this code example:
import io.r2dbc.spi.Batch;
import io.r2dbc.spi.Connection;
import io.r2dbc.spi.ConnectionFactories;
import io.r2dbc.spi.ConnectionFactory;
import io.r2dbc.spi.Parameters;
import io.r2dbc.spi.Result;
import io.r2dbc.spi.Statement;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.sql.SQLXML;
import java.util.List;
public class XMLTest {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory =
ConnectionFactories.get("r2dbc:oracle://test");
Connection connection = Mono.from(connectionFactory.create()).block();
try (
AutoCloseable close = () ->
Mono.from(connection.close()).block();
AutoCloseable drop = () ->
Flux.from(connection.createBatch()
.add("DROP TABLE test")
.add("DROP PROCEDURE test_proc")
.execute())
.blockLast()
){
Batch batch =
connection.createBatch()
.add("CREATE TABLE test (id VARCHAR(1000), value XMLTYPE)")
.add("CREATE OR REPLACE PROCEDURE test_proc("
+ " idIn IN VARCHAR,"
+ " xmlIn IN SYS.XMLTYPE,"
+ " xmlOut OUT SYS.XMLTYPE)"
+ " IS "
+ " BEGIN"
+ " SELECT value INTO xmlOut FROM test WHERE id = idIn;"
+ " UPDATE test SET value = xmlIn WHERE id = idIn;"
+ " END;");
Flux.from(batch.execute()).blockLast();
String id = "A";
// In SQL commands, like INSERT, a String/VARCHAR bind can be implicitly
// converted to SQLXML:
Statement insert = connection.createStatement(
"INSERT INTO test VALUES (?, ?)")
.bind(0, id)
.bind(1, "<name>metalpalo</name>");
Flux.from(insert.execute())
.map(Result::getRowsUpdated)
.flatMap(Flux::from)
.blockLast();
// Oracle R2DBC can convert a SQLXML column into a java.sql.SQLXML object:
Statement query = connection.createStatement(
"SELECT value FROM test");
List<SQLXML> queryResult =
Flux.from(query.execute())
.map(result ->
result.map(row -> row.get(0, SQLXML.class)))
.flatMap(Flux::from)
.collectList()
.block();
System.out.println("Query Result:");
for (SQLXML sqlxml : queryResult)
System.out.println(sqlxml.getString());
// SQLXML OUT Parameters:
// In PL/SQL calls, we need to register the SQL type of OUT parameters,
// but no SQLXML type is declared by io.r2dbc.spi.R2dbcTypes, nor is there
// such a type declared by OracleR2dbcTypes.
// However, it is possible for Oracle R2DBC to infer a SQL type based on
// the Java class it is converted to. We'll call
// "Parameters.out(SQLXML.class)" below, and Oracle R2DBC will infer:
// If the SQL type is one that converts to a SQLXML object, then the SQL type
// must be SQLXML.
// This is effectively the same as calling:
// callableStatement.registerOutParameter(3, JDBCType.SQLXML)
// SQLXML IN Parameters:
// Above, we saw how SQL commands could implicitly convert a
// String/VARCHAR into SQLXML, but this won't happen for PL/SQL calls. If
// an IN parameter is SQLXML then it MUST be sent as that type.
// With JDBC, we'd send a SQLXML type by calling connection.createSQLXML() to
// create a SQLXML object, and then bind that SQLXML object to a
// PreparedStatement. But with R2DBC, there is no equivalent to
// createSQLXML(), so we have no way to tell the bind method that we want
// to send data as SQLXML. To work around this limitation, we'll use the
// "XMLTYPE(?)" expression in our CALL command. This allows us to bind
// VARCHAR data, and then have it converted to SQLXML before being passed
// as an IN parameter.
Statement statement0 = connection.createStatement(
"CALL test_proc(?, XMLTYPE(?), ?)")
.bind(0, id)
.bind(1, "<name>Michael</name>")
.bind(2, Parameters.out(SQLXML.class));
SQLXML xmlOut0 =
Flux.from(statement0.execute())
.map(result ->
result.map(outParameters -> outParameters.get(0, SQLXML.class)))
.flatMap(Mono::from)
.blockLast();
System.out.println("Call Result:");
System.out.println(xmlOut0.getString());
// If we have received a SQLXML object from a previous operation, we can
// use it as an IN parameter, and avoid the "XMLTYPE(?)" workaround seen
// above.
Statement statement1 = connection.createStatement(
"CALL test_proc(?, ?, ?)")
.bind(0, id)
.bind(1, xmlOut0)
.bind(2, Parameters.out(SQLXML.class));
SQLXML xmlOut1 =
Flux.from(statement1.execute())
.map(result ->
result.map(outParameters -> outParameters.get(0, SQLXML.class)))
.flatMap(Mono::from)
.blockLast();
System.out.println("Call Result:");
System.out.println(xmlOut1.getString());
// Binding a SQLXML object works with SQL commands too. This "insert"
// Statement executes: "INSERT INTO test VALUES (?, ?)"
insert.bind(0, "B").bind(1, xmlOut1);
Flux.from(insert.execute())
.map(Result::getRowsUpdated)
.flatMap(Flux::from)
.blockLast();
List<SQLXML> sqlxmlList =
Flux.from(query.execute())
.map(result ->
result.map(row -> row.get(0, SQLXML.class)))
.flatMap(Flux::from)
.collectList()
.block();
System.out.println("Query Result:");
for (SQLXML sqlxml : sqlxmlList)
System.out.println(sqlxml.getString());
}
}
}
@metalpalo: Does this seem like a workable solution for you? For future reference, I'll note down some ideas about how to improve SQLXML support:
@Michael-A-McMahon Thank you very much, I will try it. Maybe another question regarding our problem on JDBC:
Now we want to know whether R2DBC can give us some benefit in such a situation:
Or does some other workable solution exist for this situation? Thanks
You would effectively see the same result executing that with R2DBC. The only thing R2DBC would do is reduce the number of threads. With JDBC you need 10 threads to execute 10 calls concurrently. With R2DBC you can execute those same 10 calls, concurrently, without blocking any thread.

Where it gets interesting is pipelining. With pipelining you can have just one connection (JDBC or R2DBC) and execute many calls in a pipeline: 10 calls, or many more than that. Pipelining provides some degree of concurrency, in that the network transfers happen concurrently. However, pipelining does not mean the database is going to execute calls in parallel. The database still executes calls one-by-one. Still, you are likely to see a significant reduction in latency by pipelining the calls versus executing them one-by-one with java.sql.CallableStatement.execute().

Because the database does not execute pipelined calls in parallel, you can think about the number of CPU cores on your database host, and size a connection pool to match it. This way, each connection acts like a thread of execution on the database, and it doesn't make sense to have more threads than cores for compute-bound tasks. So, if your DB has 4 cores, then you can size the pool at 4 connections. Then, you can split up the workload of calls across those 4 connections, and have each connection pipeline some portion of the calls. Say you have 100 calls to execute, and 4 connections: you could have each connection execute a pipeline of 25 calls. I would expect 4 connections executing 100 pipelined calls to be done in about the same time it takes for 100 connections to complete 100 calls where each connection executes 1 call.

Usually, you would size the connection pool to be greater than the CPU core count of your database. This is because non-pipelined calls leave the database idle while it waits for the next call. So we add more connections because the DB would otherwise be idle.
With pipelining, the DB is immediately receiving the next call, so there is no idle time.

R2DBC is great in that it standardized an interface for asynchronous database access. This lets you write code against the R2DBC interface, and that code can work with any database that has an R2DBC driver. However, in your case, you could consider using the Reactive Extensions already built into Oracle JDBC. This is a proprietary API that won't work with any other database. But it might be sensible to do this because your code already seems somewhat Oracle specific, and because Oracle R2DBC is lacking in its support for SQLXML. Oracle R2DBC is built on top of the Reactive Extensions, so you're ultimately executing the same code either way; it's just a matter of which API suits your needs better.

To pipeline calls with the Reactive Extensions:
import oracle.jdbc.OracleCallableStatement;
import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.JDBCType;
import java.sql.SQLException;
import java.sql.SQLXML;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.Future;
public class XMLPipline {
static {
// Explicitly enable pipelining, in case we're running on Mac OS, or an OS
// where out-of-band data doesn't work in Java and pipelining is disabled by
// default (see the README of oracle-r2dbc on GitHub):
System.setProperty("oracle.jdbc.disablePipeline", "false");
}
public static void main(String[] args) throws Exception {
try (
Connection connection =
DriverManager.getConnection("jdbc:oracle:thin:@test");
Statement statement = connection.createStatement();
AutoCloseable drop = () -> {
statement.addBatch("DROP TABLE test");
statement.addBatch("DROP PROCEDURE test_proc");
statement.executeBatch();
}
){
statement.addBatch(
"CREATE TABLE test (id VARCHAR(1000), value XMLTYPE)");
statement.addBatch(
"CREATE OR REPLACE PROCEDURE test_proc("
+ " idIn IN VARCHAR,"
+ " xmlIn IN SYS.XMLTYPE,"
+ " xmlOut OUT SYS.XMLTYPE)"
+ " IS "
+ " BEGIN"
+ " SELECT value INTO xmlOut FROM test WHERE id = idIn;"
+ " UPDATE test SET value = xmlIn WHERE id = idIn;"
+ " END;");
statement.addBatch(
"INSERT INTO test VALUES ('A', '<name>Michael</name>')");
statement.executeBatch();
List<SQLXML> sqlxmlList = new ArrayList<>();
for (int i = 0; i < 25; i++) {
SQLXML sqlxml = connection.createSQLXML();
sqlxml.setString("<name>" + i + "</name>");
sqlxmlList.add(sqlxml);
}
List<CallableStatement> callableStatements = new ArrayList<>();
try (
AutoCloseable closeStatements = () -> closeAll(callableStatements);
){
// Setup CallableStatements, one for each SQLXML
for (SQLXML sqlxml : sqlxmlList) {
CallableStatement callableStatement =
connection.prepareCall("{call test_proc(?,?,?)}");
callableStatement.setString("idIn", "A");
callableStatement.setObject("xmlIn", sqlxml);
callableStatement.registerOutParameter("xmlOut", JDBCType.SQLXML);
callableStatements.add(callableStatement);
}
List<Future<Boolean>> futures = new ArrayList<>();
for (CallableStatement callableStatement : callableStatements) {
// Unwrap to access the Reactive Extensions API of the
// OracleCallableStatement statement. The unwrap call is usually
// needed, as opposed to a simple cast, because a connection pool will
// wrap the OracleCallableStatement in its own CallableStatement
// implementation.
OracleCallableStatement oracleCallableStatement =
callableStatement.unwrap(OracleCallableStatement.class);
Flow.Publisher<Boolean> publisher =
oracleCallableStatement.executeAsyncOracle();
Future<Boolean> future = toFuture(publisher);
futures.add(future);
}
// Notice what the loop above has done: It has pipelined the execution
// of many CallableStatements by calling executeAsyncOracle() on each
// one. Now the asynchronous result of each statement will be captured by
// its Future object, and the next loop is going to consume those results.
for (int i = 0; i < futures.size(); i++) {
try {
Future<Boolean> future = futures.get(i);
future.get();
}
catch (Exception exception) {
// Use more disciplined error handling if needed. The point this
// example wants to make is that even if one SQL operation fails,
// the remaining operations are still going to be executed, so we
// don't want to throw ourselves out of here just yet. We want to
// continue looping through the futures to get all the results.
System.err.println(exception.getMessage());
continue;
}
// Now that Future.get() has returned, we can consume the result from
// the CallableStatement.
CallableStatement callableStatement = callableStatements.get(i);
SQLXML sqlxml = callableStatement.getSQLXML("xmlOut");
System.out.println("Call returned: " + sqlxml.getString());
}
}
}
}
/**
* Closes all AutoCloseables (like a CallableStatement), collecting any
* exceptions that may occur. The idea is that we don't want this method
* to throw before it has called close() on every AutoCloseable.
*/
static void closeAll(Iterable<? extends AutoCloseable> autoCloseables)
throws Exception {
Exception exception = null;
for (AutoCloseable autoCloseable : autoCloseables) {
try {
autoCloseable.close();
}
catch (Exception closeException) {
if (exception == null)
exception = closeException;
else
exception.addSuppressed(closeException);
}
}
if (exception != null)
throw exception;
}
/**
* Returns a Future that completes with the first signal from a Publisher. The
* Future completes with null if onComplete is emitted first, or
* completes with an exception if onError is emitted first, or completes with
* a non-null value if onNext is emitted first.
* <i>
* This is just a simple way to consume an asynchronous result from a reactive
* streams publisher, without having to learn the complex API of a
* full-fledged reactive library like Project Reactor or RxJava. However, if
* you need to do more than just block for a signal, those libraries are
* highly recommended; The learning curve is well worth it!
* </i>
*/
static <T> Future<T> toFuture(Flow.Publisher<T> publisher) {
CompletableFuture<T> future = new CompletableFuture<>();
publisher.subscribe(new Flow.Subscriber<T>() {
Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
(this.subscription = subscription).request(1);
}
@Override
public void onNext(T item) {
subscription.cancel();
future.complete(item);
}
@Override
public void onError(Throwable throwable) {
future.completeExceptionally(throwable);
}
@Override
public void onComplete() {
future.complete(null);
}
});
return future;
}
}
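The workload split described earlier in this thread (100 calls across 4 connections, 25 calls each) could be sketched roughly as follows. This is only an illustration under assumptions: the `PipelinePartition` class and its `partition` helper are hypothetical names, not part of Oracle JDBC or Oracle R2DBC, and the integers simply stand in for whatever call parameters you would actually pipeline.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of any Oracle API: it only decides which
// calls go to which connection. It does not touch the database.
public class PipelinePartition {

  /**
   * Splits items into exactly `parts` contiguous sublists whose sizes differ
   * by at most one. Some sublists are empty if there are fewer items than
   * parts.
   */
  public static <T> List<List<T>> partition(List<T> items, int parts) {
    if (parts < 1)
      throw new IllegalArgumentException("parts must be at least 1");

    List<List<T>> result = new ArrayList<>();
    int base = items.size() / parts;      // minimum size of each sublist
    int remainder = items.size() % parts; // first `remainder` lists get +1
    int start = 0;

    for (int i = 0; i < parts; i++) {
      int size = base + (i < remainder ? 1 : 0);
      result.add(new ArrayList<>(items.subList(start, start + size)));
      start += size;
    }
    return result;
  }

  public static void main(String[] args) {
    // Assumption: 100 calls and a pool of 4 connections, as in the example
    // numbers used in the discussion above.
    List<Integer> callIds = new ArrayList<>();
    for (int i = 0; i < 100; i++)
      callIds.add(i);

    List<List<Integer>> perConnection = partition(callIds, 4);
    for (int i = 0; i < perConnection.size(); i++) {
      System.out.println("Connection " + i + " pipelines "
        + perConnection.get(i).size() + " calls");
    }
  }
}
```

Each sublist would then be handed to one connection, which prepares a CallableStatement per item and calls executeAsyncOracle() on each of them, in the same way the single-connection example above does for its 25 statements.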
Hi guys, I would like to migrate from a regular JDBC stored procedure call to R2DBC. The problem is that the original code uses XmlType as input and output parameters. Is it possible to rewrite it so that it works? My original code is:
Thanks