Skip to content

Commit

Permalink
Refactor the database test package and finish the JobStateViewTest
Browse files Browse the repository at this point in the history
  • Loading branch information
1996fanrui committed Dec 28, 2023
1 parent 75bb0ef commit 2392b78
Show file tree
Hide file tree
Showing 13 changed files with 161 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.flink.autoscaler.jdbc.state;
package org.apache.flink.autoscaler.jdbc;

import org.apache.flink.annotation.Experimental;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.autoscaler.JobAutoScalerContext;
import org.apache.flink.autoscaler.ScalingSummary;
import org.apache.flink.autoscaler.ScalingTracking;
import org.apache.flink.autoscaler.jdbc.JobKeySerializer;
import org.apache.flink.autoscaler.metrics.CollectedMetrics;
import org.apache.flink.autoscaler.state.AutoScalerStateStore;
import org.apache.flink.autoscaler.utils.AutoScalerSerDeModule;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ protected Optional<String> getSerializedState(String jobKey, StateType stateType
}

/** Drops the cached state entry of the given type for the given job key. */
protected void removeSerializedState(String jobKey, StateType stateType) {
    var stateView = getJobStateView(jobKey);
    stateView.remove(stateType);
}

public void flush(String jobKey) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public class JobStateView {
* <p>Note: {@link #inLocally} and {@link #inDatabase} are only for understanding; we don't
* use them.
*/
@SuppressWarnings("unused")
enum State {

/** State doesn't exist at database, and it's not used so far, so it's not needed. */
Expand Down Expand Up @@ -175,7 +176,7 @@ public void put(StateType stateType, String value) {
updateState(stateType, STATE_TRANSITIONER::putTransition);
}

public void removeKey(StateType stateType) {
public void remove(StateType stateType) {
var oldKey = data.remove(stateType);
if (oldKey == null) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@
import static org.assertj.core.api.Assertions.assertThat;

/** The abstract IT case for {@link JDBCStore}. */
abstract class AbstractJDBCStoreITCase implements DatabaseTest {
public abstract class AbstractJDBCStoreITCase implements DatabaseTest {

private static final String DEFAULT_JOB_KEY = "jobKey";

@Test
void testCreateAndGet() throws Exception {
Expand All @@ -36,33 +38,33 @@ void testCreateAndGet() throws Exception {
var jobKey = "aaa";
var expectedValue1 = "value1";

assertCountableJDBCInteractor(countableJDBCInteractor, 0, 0, 0, 0);
countableJDBCInteractor.assertCountableJDBCInteractor(0, 0, 0, 0);
assertThat(jdbcStore.getSerializedState(jobKey, COLLECTED_METRICS)).isEmpty();
assertCountableJDBCInteractor(countableJDBCInteractor, 1, 0, 0, 0);
countableJDBCInteractor.assertCountableJDBCInteractor(1, 0, 0, 0);

// Get from cache, and it shouldn't exist in database.
jdbcStore.putSerializedState(jobKey, COLLECTED_METRICS, expectedValue1);
assertThat(jdbcStore.getSerializedState(jobKey, COLLECTED_METRICS))
.hasValue(expectedValue1);
assertThat(getValueFromDatabase(jobKey, COLLECTED_METRICS)).isEmpty();
assertCountableJDBCInteractor(countableJDBCInteractor, 1, 0, 0, 0);
countableJDBCInteractor.assertCountableJDBCInteractor(1, 0, 0, 0);

// Get from cache after flushing, and it should exist in database.
jdbcStore.flush(jobKey);
assertStateValueForCacheAndDatabase(jdbcStore, jobKey, COLLECTED_METRICS, expectedValue1);
assertCountableJDBCInteractor(countableJDBCInteractor, 1, 0, 0, 1);
countableJDBCInteractor.assertCountableJDBCInteractor(1, 0, 0, 1);

// Get from database for an old JDBC Store.
jdbcStore.removeInfoFromCache(jobKey);
assertCountableJDBCInteractor(countableJDBCInteractor, 1, 0, 0, 1);
countableJDBCInteractor.assertCountableJDBCInteractor(1, 0, 0, 1);
assertStateValueForCacheAndDatabase(jdbcStore, jobKey, COLLECTED_METRICS, expectedValue1);
assertCountableJDBCInteractor(countableJDBCInteractor, 2, 0, 0, 1);
countableJDBCInteractor.assertCountableJDBCInteractor(2, 0, 0, 1);

// Get from database for a new JDBC Store.
var newJdbcStore = new JDBCStore(countableJDBCInteractor);
assertStateValueForCacheAndDatabase(
newJdbcStore, jobKey, COLLECTED_METRICS, expectedValue1);
assertCountableJDBCInteractor(countableJDBCInteractor, 3, 0, 0, 1);
countableJDBCInteractor.assertCountableJDBCInteractor(3, 0, 0, 1);
}

@Test
Expand Down Expand Up @@ -92,18 +94,6 @@ void testMultipleJobKeys() throws Exception {
// TODO
}

private void assertCountableJDBCInteractor(
CountableJDBCStateInteractor jdbcInteractor,
long expectedQueryCounter,
long expectedDeleteCounter,
long expectedUpdateCounter,
long expectedCreateCounter) {
assertThat(jdbcInteractor.getQueryCounter()).isEqualTo(expectedQueryCounter);
assertThat(jdbcInteractor.getDeleteCounter()).isEqualTo(expectedDeleteCounter);
assertThat(jdbcInteractor.getUpdateCounter()).isEqualTo(expectedUpdateCounter);
assertThat(jdbcInteractor.getCreateCounter()).isEqualTo(expectedCreateCounter);
}

private void assertStateValueForCacheAndDatabase(
JDBCStore jdbcStore, String jobKey, StateType stateType, String expectedValue)
throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

import static org.assertj.core.api.Assertions.assertThat;

/** Countable {@link JDBCStateInteractor}. */
public class CountableJDBCStateInteractor extends JDBCStateInteractor {

Expand Down Expand Up @@ -66,19 +68,15 @@ public void updateData(
super.updateData(jobKey, updatedStateTypes, data);
}

public long getQueryCounter() {
return queryCounter.get();
}

public long getDeleteCounter() {
return deleteCounter.get();
/**
 * Asserts that the recorded number of JDBC interactions matches the expectation.
 *
 * @param expectedQueryCounter expected number of query calls so far
 * @param expectedDeleteCounter expected number of delete calls so far
 * @param expectedUpdateCounter expected number of update calls so far
 * @param expectedCreateCounter expected number of create calls so far
 */
public void assertCountableJDBCInteractor(
        long expectedQueryCounter,
        long expectedDeleteCounter,
        long expectedUpdateCounter,
        long expectedCreateCounter) {
    assertThat(queryCounter.get()).isEqualTo(expectedQueryCounter);
    assertThat(deleteCounter.get()).isEqualTo(expectedDeleteCounter);
    assertThat(updateCounter.get()).isEqualTo(expectedUpdateCounter);
    assertThat(createCounter.get()).isEqualTo(expectedCreateCounter);
}

public long getCreateCounter() {
return createCounter.get();
}

public long getUpdateCounter() {
return updateCounter.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,128 @@

package org.apache.flink.autoscaler.jdbc.state;

import org.apache.flink.autoscaler.jdbc.testutils.databases.derby.DerbyTestBase;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.util.Optional;

import static org.apache.flink.autoscaler.jdbc.state.StateType.COLLECTED_METRICS;
import static org.apache.flink.autoscaler.jdbc.state.StateType.SCALING_HISTORY;
import static org.assertj.core.api.Assertions.assertThat;

/** Test for {@link JobStateView}. */
class JobStateViewTest {
class JobStateViewTest implements DerbyTestBase {

private static final String DEFAULT_JOB_KEY = "jobKey";
private CountableJDBCStateInteractor jdbcStateInteractor;
private JobStateView jobStateView;

@BeforeEach
void beforeEach() throws Exception {
    // A fresh interactor and state view per test so interaction counters are independent
    // between tests.
    jdbcStateInteractor = new CountableJDBCStateInteractor(getConnection());
    jobStateView = new JobStateView(jdbcStateInteractor, DEFAULT_JOB_KEY);
}

@Test
void testAllOperations() throws Exception {
    // Counter order in the assertions below is (queries, deletes, updates, creates).
    // All state types should be fetched together to avoid querying the database frequently:
    // the single query happens at construction time, so gets below don't add queries.
    jdbcStateInteractor.assertCountableJDBCInteractor(1, 0, 0, 0);
    assertThat(jobStateView.get(COLLECTED_METRICS)).isNull();
    jdbcStateInteractor.assertCountableJDBCInteractor(1, 0, 0, 0);
    assertThat(jobStateView.get(SCALING_HISTORY)).isNull();
    jdbcStateInteractor.assertCountableJDBCInteractor(1, 0, 0, 0);

    // Put data to cache, and it shouldn't exist in database until flush is called.
    var value1 = "value1";
    jobStateView.put(COLLECTED_METRICS, value1);
    assertThat(jobStateView.get(COLLECTED_METRICS)).isEqualTo(value1);
    assertThat(getValueFromDatabase(COLLECTED_METRICS)).isEmpty();

    var value2 = "value2";
    jobStateView.put(SCALING_HISTORY, value2);
    assertThat(jobStateView.get(SCALING_HISTORY)).isEqualTo(value2);
    assertThat(getValueFromDatabase(SCALING_HISTORY)).isEmpty();
    jdbcStateInteractor.assertCountableJDBCInteractor(1, 0, 0, 0);

    // Test creating together: one flush persists both state types with a single create.
    jobStateView.flush();
    assertStateValueForCacheAndDatabase(COLLECTED_METRICS, value1);
    assertStateValueForCacheAndDatabase(SCALING_HISTORY, value2);
    jdbcStateInteractor.assertCountableJDBCInteractor(1, 0, 0, 1);

    // Test updating data to cache, and they aren't updated in database yet.
    var value3 = "value3";
    jobStateView.put(COLLECTED_METRICS, value3);
    assertThat(jobStateView.get(COLLECTED_METRICS)).isEqualTo(value3);
    assertThat(getValueFromDatabase(COLLECTED_METRICS)).hasValue(value1);

    var value4 = "value4";
    jobStateView.put(SCALING_HISTORY, value4);
    assertThat(jobStateView.get(SCALING_HISTORY)).isEqualTo(value4);
    assertThat(getValueFromDatabase(SCALING_HISTORY)).hasValue(value2);
    jdbcStateInteractor.assertCountableJDBCInteractor(1, 0, 0, 1);

    // Test updating together: one flush persists both updates with a single update call.
    jobStateView.flush();
    assertStateValueForCacheAndDatabase(COLLECTED_METRICS, value3);
    assertStateValueForCacheAndDatabase(SCALING_HISTORY, value4);
    jdbcStateInteractor.assertCountableJDBCInteractor(1, 0, 1, 1);

    // Test deleting data from cache, and they aren't deleted in database yet.
    jobStateView.remove(COLLECTED_METRICS);
    assertThat(jobStateView.get(COLLECTED_METRICS)).isNull();
    assertThat(getValueFromDatabase(COLLECTED_METRICS)).hasValue(value3);

    jobStateView.remove(SCALING_HISTORY);
    assertThat(jobStateView.get(SCALING_HISTORY)).isNull();
    assertThat(getValueFromDatabase(SCALING_HISTORY)).hasValue(value4);
    jdbcStateInteractor.assertCountableJDBCInteractor(1, 0, 1, 1);

    // Test deleting together: one flush removes both state types with a single delete call.
    jobStateView.flush();
    assertThat(jobStateView.get(COLLECTED_METRICS)).isNull();
    assertThat(getValueFromDatabase(COLLECTED_METRICS)).isEmpty();
    assertThat(jobStateView.get(SCALING_HISTORY)).isNull();
    assertThat(getValueFromDatabase(SCALING_HISTORY)).isEmpty();
    jdbcStateInteractor.assertCountableJDBCInteractor(1, 1, 1, 1);
}

@Test
void test() {}
void testAvoidUnnecessaryFlushes() throws Exception {
    // Counter order in the assertions below is (queries, deletes, updates, creates).
    // The single query happens when the view is constructed in beforeEach.
    jdbcStateInteractor.assertCountableJDBCInteractor(1, 0, 0, 0);

    var value1 = "value1";
    jobStateView.put(COLLECTED_METRICS, value1);
    jobStateView.flush();
    jdbcStateInteractor.assertCountableJDBCInteractor(1, 0, 0, 1);

    // Avoid unnecessary flush for creating: a second flush with no changes is a no-op.
    jobStateView.flush();
    jdbcStateInteractor.assertCountableJDBCInteractor(1, 0, 0, 1);

    // Avoid unnecessary flush for deleting: only the first flush after clear() deletes.
    jobStateView.clear();
    jobStateView.flush();
    jdbcStateInteractor.assertCountableJDBCInteractor(1, 1, 0, 1);
    jobStateView.flush();
    jdbcStateInteractor.assertCountableJDBCInteractor(1, 1, 0, 1);

    // Avoid unnecessary flush even if clear is called again on already-empty state.
    jobStateView.clear();
    jobStateView.flush();
    jdbcStateInteractor.assertCountableJDBCInteractor(1, 1, 0, 1);
}

/** Asserts that both the in-memory view and the database hold the expected value. */
private void assertStateValueForCacheAndDatabase(StateType stateType, String expectedValue)
        throws Exception {
    var cachedValue = jobStateView.get(stateType);
    assertThat(cachedValue).isEqualTo(expectedValue);
    var persistedValue = getValueFromDatabase(stateType);
    assertThat(persistedValue).hasValue(expectedValue);
}

/**
 * Reads the current value of the given state type straight from the database, bypassing
 * the view's cache, so tests can distinguish cached from persisted state.
 */
private Optional<String> getValueFromDatabase(StateType stateType) throws Exception {
    var freshInteractor = new JDBCStateInteractor(getConnection());
    var persistedData = freshInteractor.queryData(DEFAULT_JOB_KEY);
    return Optional.ofNullable(persistedData.get(stateType));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@
* limitations under the License.
*/

package org.apache.flink.autoscaler.jdbc.state;
package org.apache.flink.autoscaler.jdbc.state.database;

import org.apache.flink.autoscaler.jdbc.state.AbstractJDBCStoreITCase;
import org.apache.flink.autoscaler.jdbc.state.JDBCStore;
import org.apache.flink.autoscaler.jdbc.testutils.databases.derby.DerbyTestBase;

/** Test {@link JDBCStore} via Derby database. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@
* limitations under the License.
*/

package org.apache.flink.autoscaler.jdbc.state;
package org.apache.flink.autoscaler.jdbc.state.database;

import org.apache.flink.autoscaler.jdbc.state.AbstractJDBCStoreITCase;
import org.apache.flink.autoscaler.jdbc.state.JDBCStore;
import org.apache.flink.autoscaler.jdbc.testutils.databases.mysql.MySQL56TestBase;

/** Test {@link JDBCStore} via MySQL 5.6. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@
* limitations under the License.
*/

package org.apache.flink.autoscaler.jdbc.state;
package org.apache.flink.autoscaler.jdbc.state.database;

import org.apache.flink.autoscaler.jdbc.state.AbstractJDBCStoreITCase;
import org.apache.flink.autoscaler.jdbc.state.JDBCStore;
import org.apache.flink.autoscaler.jdbc.testutils.databases.mysql.MySQL57TestBase;

/** Test {@link JDBCStore} via MySQL 5.7. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@
* limitations under the License.
*/

package org.apache.flink.autoscaler.jdbc.state;
package org.apache.flink.autoscaler.jdbc.state.database;

import org.apache.flink.autoscaler.jdbc.state.AbstractJDBCStoreITCase;
import org.apache.flink.autoscaler.jdbc.state.JDBCStore;
import org.apache.flink.autoscaler.jdbc.testutils.databases.mysql.MySQL8TestBase;

/** Test {@link JDBCStore} via MySQL 8. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@
* limitations under the License.
*/

package org.apache.flink.autoscaler.jdbc.state;
package org.apache.flink.autoscaler.jdbc.state.database;

import org.apache.flink.autoscaler.jdbc.state.AbstractJDBCStoreITCase;
import org.apache.flink.autoscaler.jdbc.testutils.databases.postgres.PostgreSQLTestBase;

/** Test for PostgreSQL 15.1. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public void beforeAll(ExtensionContext extensionContext) throws Exception {
}

@Override
public void afterAll(ExtensionContext extensionContext) throws Exception {
public void afterAll(ExtensionContext extensionContext) {
try {
DriverManager.getConnection(String.format("%s;shutdown=true", JDBC_URL)).close();
} catch (SQLException ignored) {
Expand All @@ -66,6 +66,7 @@ public void afterAll(ExtensionContext extensionContext) throws Exception {
@Override
public void afterEach(ExtensionContext extensionContext) throws Exception {
Connection conn = getConnection();
// Clean up all data
for (var tableName : TABLES) {
try (var st = conn.createStatement()) {
st.executeUpdate(String.format("DELETE from %s", tableName));
Expand Down

0 comments on commit 2392b78

Please sign in to comment.