Skip to content

Commit

Permalink
Don't release the connection until LOB streams are read (mulesoft#372)
Browse files Browse the repository at this point in the history
* When Select() page results are processed async, then close connection on flow end to be able to read LOB streams

* Added mUnit test for LOB streams kept open

* Ignore mUnits without a proper started DB instance
  • Loading branch information
stefangretcan authored Mar 18, 2021
1 parent 13c2fe7 commit 0e0f82c
Show file tree
Hide file tree
Showing 9 changed files with 237 additions and 4 deletions.
59 changes: 59 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@
<maven-replacer-plugin.version>1.5.3</maven-replacer-plugin.version>

<formatterConfigPath>formatter.xml</formatterConfigPath>

<munit.extensions.maven.plugin.version>1.0.0</munit.extensions.maven.plugin.version>
<munit.version>2.2.4</munit.version>
<mtf-tools.version>1.0.0</mtf-tools.version>
<skipTests>false</skipTests>
</properties>

<dependencies>
Expand Down Expand Up @@ -161,13 +166,67 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<skipTests>${skipTests}</skipTests>
<systemPropertyVariables>
<oracle.db.port>${oracle.db.port}</oracle.db.port>
<mysql.db.port>${mysql.db.port}</mysql.db.port>
<mssql.db.port>${mssql.db.port}</mssql.db.port>
</systemPropertyVariables>
</configuration>
</plugin>
<plugin>
<groupId>com.mulesoft.munit</groupId>
<artifactId>munit-extensions-maven-plugin</artifactId>
<version>${munit.extensions.maven.plugin.version}</version>
<configuration>
<munitFailIfNoTests>false</munitFailIfNoTests>
<runtimeProduct>MULE_EE</runtimeProduct>
<systemPropertyVariables>
<oracle.db.port>${oracle.db.port}</oracle.db.port>
</systemPropertyVariables>
<munitTest>.*TestCase.xml</munitTest>
<runtimeConfiguration>
<discoverRuntimes>
<product>EE</product>
</discoverRuntimes>
</runtimeConfiguration>
<sharedLibraries>
<sharedLibrary>
<groupId>com.oracle.database.jdbc</groupId>
<artifactId>ojdbc8</artifactId>
</sharedLibrary>
</sharedLibraries>
</configuration>
<executions>
<execution>
<phase>integration-test</phase>
<goals>
<goal>test</goal>
</goals>
</execution>
</executions>
<dependencies>
<!-- MUnit Dependencies -->
<dependency>
<groupId>com.mulesoft.munit</groupId>
<artifactId>munit-runner</artifactId>
<version>${munit.version}</version>
<classifier>mule-plugin</classifier>
</dependency>
<dependency>
<groupId>com.mulesoft.munit</groupId>
<artifactId>munit-tools</artifactId>
<version>${munit.version}</version>
<classifier>mule-plugin</classifier>
</dependency>
<dependency>
<groupId>com.mulesoft.munit</groupId>
<artifactId>mtf-tools</artifactId>
<version>${mtf-tools.version}</version>
<classifier>mule-plugin</classifier>
</dependency>
</dependencies>
</plugin>
</plugins>
</build>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,18 @@ public interface DbConnection extends TransactionalConnection {
*/
boolean supportsContentStreaming();

/**
* Specifies that in the returned results there are streams that still need to be consumed.
* @param value A value indicating if there are open LOB streams in the returned results.
*/
void setActiveLobStreams(boolean value);

/**
* Finds out if there are open LOB streams in the results.
* @return A boolean indicating if there are open LOB streams that still need to be read.
*/
boolean hasActiveLobStreams();

/**
* Creates an {@link Array} of the given {@code typeName} with the given {@code values}
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.slf4j.Logger;
Expand All @@ -50,6 +51,7 @@ public class DefaultDbConnection implements DbConnection {
private final Connection jdbcConnection;
private final List<DbType> customDataTypes;
private AtomicInteger streamsCount = new AtomicInteger(0);
private AtomicBoolean hasLobStreams = new AtomicBoolean(false);
private boolean isTransactionActive = false;

private static final int DATA_TYPE_INDEX = 5;
Expand Down Expand Up @@ -147,7 +149,7 @@ public void rollback() throws TransactionException {
*/
@Override
public void release() {
if (isStreaming()) {
if (isStreaming() || hasActiveLobStreams()) {
return;
}
try {
Expand Down Expand Up @@ -177,6 +179,7 @@ private void abortStreaming() {
}

/**
*
* {@inheritDoc}
*/
@Override
Expand All @@ -192,6 +195,16 @@ public boolean supportsContentStreaming() {
return true;
}

@Override
public void setActiveLobStreams(boolean value) {
this.hasLobStreams.set(value);
}

@Override
public boolean hasActiveLobStreams() {
return this.hasLobStreams.get();
}

public PreparedStatement prepareStatement(String sql) throws SQLException {
return jdbcConnection.prepareStatement(sql);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,16 @@ public void endStreaming() {
connection.endStreaming();
}

@Override
public void setActiveLobStreams(boolean value) {
this.connection.setActiveLobStreams(value);
}

@Override
public boolean hasActiveLobStreams() {
return this.connection.hasActiveLobStreams();
}

@Override
public boolean isTransactionActive() {
Transaction transaction = TransactionCoordination.getInstance().getTransaction();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,13 @@ public void close(DbConnection connection) throws MuleException {
private ResultSetIterator getIterator(DbConnection connection, DbConnector connector) {
if (initialised.compareAndSet(false, true)) {
resultSetCloser = new StatementStreamingResultSetCloser(connection);
flowListener.onError(new ResultSetCloserExceptionConsumer(resultSetCloser, query.getSql()));
flowListener.onError(new ResultSetCloserExceptionConsumer(resultSetCloser, query.getSql(), connection));
flowListener.onComplete(() -> {
if (connection.hasActiveLobStreams()) {
connection.setActiveLobStreams(false);
connection.release();
}
});
final Query resolvedQuery = resolveQuery(query, connector, connection, streamingHelper, SELECT, STORE_PROCEDURE_CALL);

QueryStatementFactory statementFactory = getStatementFactory(query);
Expand Down Expand Up @@ -380,16 +386,19 @@ private static class ResultSetCloserExceptionConsumer implements Consumer<Except

private final ResultSetCloserRunnable resultSetCloserRunnable;
private final String sql;
private final DbConnection connection;

private ResultSetCloserExceptionConsumer(StatementStreamingResultSetCloser resultSetCloser, String sql) {
private ResultSetCloserExceptionConsumer(StatementStreamingResultSetCloser resultSetCloser, String sql,
DbConnection connection) {
this.resultSetCloserRunnable = new ResultSetCloserRunnable(resultSetCloser);
this.sql = sql;
this.connection = connection;
}


@Override
public void accept(Exception e) {
try {
connection.setActiveLobStreams(false);
resultSetCloserRunnable.run();
} catch (Exception t) {
if (LOGGER.isWarnEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public Map<String, Object> process(ResultSet resultSet) throws SQLException {
ResultSetMetaData metaData = resultSet.getMetaData();
int cols = metaData.getColumnCount();


for (int i = 1; i <= cols; i++) {
String column = metaData.getColumnLabel(i);
Object value = resultSet.getObject(i);
Expand Down Expand Up @@ -111,11 +112,13 @@ private Object[] handleStructType(Struct value) throws SQLException {
}

protected TypedValue<Object> handleSqlXmlType(SQLXML value) throws SQLException {
dbConnection.setActiveLobStreams(true);
return new TypedValue<>(value.getBinaryStream(), DataType.builder().type(InputStream.class).mediaType(XML).build());
}

protected TypedValue<Object> handleBlobType(Blob value) throws SQLException {
if (dbConnection != null && dbConnection.supportsContentStreaming()) {
dbConnection.setActiveLobStreams(true);
return new TypedValue<>(value.getBinaryStream(), DataType.builder().type(InputStream.class).mediaType(BINARY).build());
} else {
return new TypedValue<>(new ByteArrayInputStream(IOUtils.toByteArray(value.getBinaryStream())),
Expand All @@ -126,6 +129,7 @@ protected TypedValue<Object> handleBlobType(Blob value) throws SQLException {
protected TypedValue<Object> handleClobType(Clob value) throws SQLException {
ReaderInputStream inputStream = new ReaderInputStream(value.getCharacterStream(), charset);
if (dbConnection != null && dbConnection.supportsContentStreaming()) {
dbConnection.setActiveLobStreams(true);
return new TypedValue<>(inputStream, DataType.builder().type(InputStream.class)
.mediaType(TEXT)
.charset(charset)
Expand Down
28 changes: 28 additions & 0 deletions src/test/java/org/mule/extension/db/DbMunitUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright (c) MuleSoft, Inc. All rights reserved. http://www.mulesoft.com
* The software in this package is published under the terms of the CPAL v1.0
* license, a copy of which has been included with this distribution in the
* LICENSE.txt file.
*/

package org.mule.extension.db;

import java.util.Properties;

public class DbMunitUtils {

private static final String DEFAULT_PORT = "100";

public static boolean isTestIgnored(String dbName) {
Properties availableProperties = System.getProperties();

return !availableProperties.containsKey(dbName);
}

public static String getDbPort(String dbName) {
Properties availableProperties = System.getProperties();
String port = String.format("%s.db.port", dbName);

return availableProperties.containsKey(port) ? availableProperties.getProperty(port) : DEFAULT_PORT;
}
}
18 changes: 18 additions & 0 deletions src/test/munit/Config.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns:ee="http://www.mulesoft.org/schema/mule/ee/core"
xmlns:db="http://www.mulesoft.org/schema/mule/db"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:batch="http://www.mulesoft.org/schema/mule/batch"
xmlns:munit="http://www.mulesoft.org/schema/mule/munit" xmlns:munit-tools="http://www.mulesoft.org/schema/mule/munit-tools"
xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation" xsi:schemaLocation="
http://www.mulesoft.org/schema/mule/batch http://www.mulesoft.org/schema/mule/batch/current/mule-batch.xsd
http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/munit http://www.mulesoft.org/schema/mule/munit/current/mule-munit.xsd
http://www.mulesoft.org/schema/mule/munit-tools http://www.mulesoft.org/schema/mule/munit-tools/current/mule-munit-tools.xsd
http://www.mulesoft.org/schema/mule/db http://www.mulesoft.org/schema/mule/db/current/mule-db.xsd
http://www.mulesoft.org/schema/mule/ee/core http://www.mulesoft.org/schema/mule/ee/core/current/mule-ee.xsd">
<db:config name="oracleDbConfigWithoutPoolingProfile">
<db:oracle-connection host="0.0.0.0" port="#[java!org::mule::extension::db::DbMunitUtils::getDbPort('oracle')]" instance="ORCLCDB" user="SYSTEM" password="Oradoc_db1">
</db:oracle-connection>
</db:config>
</mule>
80 changes: 80 additions & 0 deletions src/test/munit/SelectTestCase.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns:ee="http://www.mulesoft.org/schema/mule/ee/core"
xmlns:db="http://www.mulesoft.org/schema/mule/db"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:batch="http://www.mulesoft.org/schema/mule/batch"
xmlns:munit="http://www.mulesoft.org/schema/mule/munit" xmlns:munit-tools="http://www.mulesoft.org/schema/mule/munit-tools"
xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation" xsi:schemaLocation="
http://www.mulesoft.org/schema/mule/batch http://www.mulesoft.org/schema/mule/batch/current/mule-batch.xsd
http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/munit http://www.mulesoft.org/schema/mule/munit/current/mule-munit.xsd
http://www.mulesoft.org/schema/mule/munit-tools http://www.mulesoft.org/schema/mule/munit-tools/current/mule-munit-tools.xsd
http://www.mulesoft.org/schema/mule/db http://www.mulesoft.org/schema/mule/db/current/mule-db.xsd
http://www.mulesoft.org/schema/mule/ee/core http://www.mulesoft.org/schema/mule/ee/core/current/mule-ee.xsd">
<munit:config name="SelectTestCase.xml" />

<munit:before-test name="createATableWithLOBs">
<try>
<db:execute-script config-ref="oracleDbConfigWithoutPoolingProfile">
<db:sql><![CDATA[DROP TABLE LOBTEST]]></db:sql>
</db:execute-script>
<error-handler>
<on-error-continue enableNotifications="true" logException="false">
</on-error-continue>
</error-handler>
</try>
<db:execute-ddl config-ref="oracleDbConfigWithoutPoolingProfile">
<db:sql><![CDATA[CREATE TABLE LOBTEST(COL1 number, COL2 BLOB, COL3 CLOB, COL4 NCLOB)]]></db:sql>
</db:execute-ddl>
<foreach collection="#[1 to 3]">
<db:execute-script config-ref="oracleDbConfigWithoutPoolingProfile">
<db:sql ><![CDATA[INSERT INTO LOBTEST VALUES (1, hextoraw('453d7a34'), 'CLOB value is here', TO_NCLOB('NCLOB value is here'))]]></db:sql>
</db:execute-script>
</foreach>
</munit:before-test>

<!-- This test uses a batch job to process the results of the select operation. The select operation returns results
from a table that has LOB columns. On Oracle, these columns are returned as input streams. This test assures
that if there are open streams not yet consumed, the connection will stay open until the flow ends. In this way,
there won't be any "Connection Closed" exception when trying to read the input streams. -->
<munit:test name="whenStreamsAreConsumedAfterPagingProviderIsClosed_ThenNoExceptionIsThrown" ignore="#[java!org::mule::extension::db::DbMunitUtils::isTestIgnored('oracle')]">
<munit:execution>
<db:select config-ref="oracleDbConfigWithoutPoolingProfile">
<ee:repeatable-file-store-iterable inMemoryObjects="10" />
<db:sql>SELECT * FROM LOBTEST</db:sql>
</db:select>
</munit:execution>

<munit:validation>
<batch:job blockSize="1000" jobName="testJob" maxFailedRecords="0">
<batch:process-records>
<batch:step name="stepName">
<batch:aggregator size="100">
<ee:transform>
<ee:message >
<ee:set-payload ><![CDATA[%dw 2.0
output application/json
---
payload]]></ee:set-payload>
</ee:message>
</ee:transform>
<logger level="INFO" message="Aggregator data: #[payload]"/>
</batch:aggregator>
</batch:step>
</batch:process-records>
</batch:job>
</munit:validation>
</munit:test>

<munit:after-test name="dropTable">
<try>
<db:execute-script config-ref="oracleDbConfigWithoutPoolingProfile">
<db:sql><![CDATA[DROP TABLE LOBTEST]]></db:sql>
</db:execute-script>
<error-handler>
<on-error-continue enableNotifications="true" logException="false">
</on-error-continue>
</error-handler>
</try>
</munit:after-test>
</mule>

0 comments on commit 0e0f82c

Please sign in to comment.