Skip to content

Commit

Permalink
Fix the connection leak
Browse files Browse the repository at this point in the history
  • Loading branch information
1996fanrui committed Dec 29, 2023
1 parent 55f327e commit d10505a
Show file tree
Hide file tree
Showing 6 changed files with 69 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,30 +37,32 @@ void testAllOperations() throws Exception {
var value1 = "value1";
var value2 = "value2";
var value3 = "value3";
var jdbcStateInteractor = new JDBCStateInteractor(getConnection());
assertThat(jdbcStateInteractor.queryData(jobKey)).isEmpty();
try (var conn = getConnection()) {
var jdbcStateInteractor = new JDBCStateInteractor(conn);
assertThat(jdbcStateInteractor.queryData(jobKey)).isEmpty();

// Test for creating data.
jdbcStateInteractor.createData(
jobKey,
List.of(COLLECTED_METRICS, SCALING_HISTORY),
Map.of(COLLECTED_METRICS, value1, SCALING_HISTORY, value2));
assertThat(jdbcStateInteractor.queryData(jobKey))
.isEqualTo(Map.of(COLLECTED_METRICS, value1, SCALING_HISTORY, value2));
// Test for creating data.
jdbcStateInteractor.createData(
jobKey,
List.of(COLLECTED_METRICS, SCALING_HISTORY),
Map.of(COLLECTED_METRICS, value1, SCALING_HISTORY, value2));
assertThat(jdbcStateInteractor.queryData(jobKey))
.isEqualTo(Map.of(COLLECTED_METRICS, value1, SCALING_HISTORY, value2));

// Test for updating data.
jdbcStateInteractor.updateData(
jobKey,
List.of(COLLECTED_METRICS),
Map.of(COLLECTED_METRICS, value3, SCALING_HISTORY, value2));
assertThat(jdbcStateInteractor.queryData(jobKey))
.isEqualTo(Map.of(COLLECTED_METRICS, value3, SCALING_HISTORY, value2));
// Test for updating data.
jdbcStateInteractor.updateData(
jobKey,
List.of(COLLECTED_METRICS),
Map.of(COLLECTED_METRICS, value3, SCALING_HISTORY, value2));
assertThat(jdbcStateInteractor.queryData(jobKey))
.isEqualTo(Map.of(COLLECTED_METRICS, value3, SCALING_HISTORY, value2));

// Test for deleting data.
jdbcStateInteractor.deleteData(jobKey, List.of(COLLECTED_METRICS));
assertThat(jdbcStateInteractor.queryData(jobKey))
.isEqualTo(Map.of(SCALING_HISTORY, value2));
jdbcStateInteractor.deleteData(jobKey, List.of(SCALING_HISTORY));
assertThat(jdbcStateInteractor.queryData(jobKey)).isEmpty();
// Test for deleting data.
jdbcStateInteractor.deleteData(jobKey, List.of(COLLECTED_METRICS));
assertThat(jdbcStateInteractor.queryData(jobKey))
.isEqualTo(Map.of(SCALING_HISTORY, value2));
jdbcStateInteractor.deleteData(jobKey, List.of(SCALING_HISTORY));
assertThat(jdbcStateInteractor.queryData(jobKey)).isEmpty();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@

import org.apache.flink.autoscaler.jdbc.testutils.databases.DatabaseTest;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.List;
import java.util.Map;
Expand All @@ -38,15 +40,24 @@
public abstract class AbstractJDBCStoreITCase implements DatabaseTest {

private static final String DEFAULT_JOB_KEY = "jobKey";
private Connection conn;
private CountableJDBCStateInteractor jdbcStateInteractor;
private JDBCStore jdbcStore;

@BeforeEach
void beforeEach() throws Exception {
this.jdbcStateInteractor = new CountableJDBCStateInteractor(getConnection());
this.conn = getConnection();
this.jdbcStateInteractor = new CountableJDBCStateInteractor(conn);
this.jdbcStore = new JDBCStore(jdbcStateInteractor);
}

@AfterEach
void afterEach() throws SQLException {
if (conn != null) {
conn.close();
}
}

@Test
void testCaching() throws Exception {
var value1 = "value1";
Expand Down Expand Up @@ -162,7 +173,7 @@ void testErrorHandlingDuringFlush() throws Exception {
assertThat(jdbcStore.getSerializedState(DEFAULT_JOB_KEY, COLLECTED_METRICS)).isEmpty();

// Modify the database directly.
var tmpJdbcInteractor = new JDBCStateInteractor(getConnection());
var tmpJdbcInteractor = new JDBCStateInteractor(conn);
tmpJdbcInteractor.createData(
DEFAULT_JOB_KEY, List.of(COLLECTED_METRICS), Map.of(COLLECTED_METRICS, value1));
assertThat(getValueFromDatabase(DEFAULT_JOB_KEY, COLLECTED_METRICS)).hasValue(value1);
Expand All @@ -189,7 +200,7 @@ void testErrorHandlingDuringQuery() throws Exception {
final var expectedException = new RuntimeException("Database isn't stable.");

var exceptionableJdbcStateInteractor =
new CountableJDBCStateInteractor(getConnection()) {
new CountableJDBCStateInteractor(conn) {
private final AtomicBoolean isFirst = new AtomicBoolean(true);

@Override
Expand Down Expand Up @@ -266,7 +277,7 @@ private void assertStateValueForCacheAndDatabase(StateType stateType, String exp

private Optional<String> getValueFromDatabase(String jobKey, StateType stateType)
throws Exception {
var jdbcInteractor = new JDBCStateInteractor(getConnection());
var jdbcInteractor = new JDBCStateInteractor(conn);
return Optional.ofNullable(jdbcInteractor.queryData(jobKey).get(stateType));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,12 @@

import org.apache.flink.autoscaler.jdbc.testutils.databases.derby.DerbyTestBase;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.Optional;

import static org.apache.flink.autoscaler.jdbc.state.StateType.COLLECTED_METRICS;
Expand All @@ -33,16 +36,25 @@
class JobStateViewTest implements DerbyTestBase {

private static final String DEFAULT_JOB_KEY = "jobKey";
private Connection conn;
private CountableJDBCStateInteractor jdbcStateInteractor;
private JobStateView jobStateView;

@BeforeEach
void beforeEach() throws Exception {
this.jdbcStateInteractor = new CountableJDBCStateInteractor(getConnection());
this.conn = getConnection();
this.jdbcStateInteractor = new CountableJDBCStateInteractor(conn);
this.jobStateView = new JobStateView(jdbcStateInteractor, DEFAULT_JOB_KEY);
jdbcStateInteractor.assertCountableJDBCInteractor(1, 0, 0, 0);
}

@AfterEach
void afterEach() throws SQLException {
if (conn != null) {
conn.close();
}
}

@Test
void testAllOperations() throws Exception {
// All state types should be get together to avoid query database frequently.
Expand Down Expand Up @@ -183,7 +195,7 @@ private void assertStateValueForCacheAndDatabase(StateType stateType, String exp
}

private Optional<String> getValueFromDatabase(StateType stateType) throws Exception {
var jdbcInteractor = new JDBCStateInteractor(getConnection());
var jdbcInteractor = new JDBCStateInteractor(conn);
return Optional.ofNullable(jdbcInteractor.queryData(DEFAULT_JOB_KEY).get(stateType));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,17 +54,18 @@ public void beforeAll(ExtensionContext extensionContext) throws Exception {

var createIndex =
"CREATE UNIQUE INDEX un_job_state_type_inx ON t_flink_autoscaler_state_store (job_key, state_type_id)";
try (var statement = getConnection().createStatement()) {
statement.execute(stateStoreDDL);
statement.execute(createIndex);
try (var conn = getConnection();
var st = conn.createStatement()) {
st.execute(stateStoreDDL);
st.execute(createIndex);
}
}

@Override
public void afterAll(ExtensionContext extensionContext) throws Exception {
Connection connection = getConnection();
for (var tableName : TABLES) {
try (var st = connection.createStatement()) {
try (var conn = getConnection();
var st = conn.createStatement()) {
for (var tableName : TABLES) {
st.executeUpdate(String.format("DROP TABLE %s", tableName));
}
}
Expand All @@ -76,10 +77,10 @@ public void afterAll(ExtensionContext extensionContext) throws Exception {

@Override
public void afterEach(ExtensionContext extensionContext) throws Exception {
Connection conn = getConnection();
// Clean up all data
for (var tableName : TABLES) {
try (var st = conn.createStatement()) {
try (var conn = getConnection();
var st = conn.createStatement()) {
for (var tableName : TABLES) {
st.executeUpdate(String.format("DELETE from %s", tableName));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,9 @@ public void afterAll(ExtensionContext extensionContext) {

@Override
public void afterEach(ExtensionContext extensionContext) throws Exception {
Connection conn = getConnection();
for (var tableName : TABLES) {
try (var st = conn.createStatement()) {
try (var conn = getConnection();
var st = conn.createStatement()) {
for (var tableName : TABLES) {
st.executeUpdate(String.format("DELETE from %s", tableName));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,9 @@ public void afterAll(ExtensionContext extensionContext) {

@Override
public void afterEach(ExtensionContext extensionContext) throws Exception {
Connection conn = getConnection();
for (var tableName : TABLES) {
try (var st = conn.createStatement()) {
try (var conn = getConnection();
var st = conn.createStatement()) {
for (var tableName : TABLES) {
st.executeUpdate(String.format("DELETE from %s", tableName));
}
}
Expand Down

0 comments on commit d10505a

Please sign in to comment.