getSerializedState(String jobKey, StateType stateType) {
+ return Optional.ofNullable(getJobStateView(jobKey).get(stateType));
+ }
+
+ protected void removeSerializedState(String jobKey, StateType stateType) {
+ getJobStateView(jobKey).remove(stateType);
+ }
+
+ public void flush(String jobKey) throws Exception {
+ JobStateView jobStateView = cache.get(jobKey);
+ if (jobStateView == null) {
+ LOG.debug("The JobStateView doesn't exist, so skip the flush.");
+ return;
+ }
+ try {
+ jobStateView.flush();
+ } catch (Exception e) {
+ LOG.error(
+ "Error while flush autoscaler info to database, invalidating to clear the cache",
+ e);
+ removeInfoFromCache(jobKey);
+ throw e;
+ }
+ }
+
+ public void removeInfoFromCache(String jobKey) {
+ cache.remove(jobKey);
+ }
+
+ public void clearAll(String jobKey) {
+ getJobStateView(jobKey).clear();
+ }
+
+ private JobStateView getJobStateView(String jobKey) {
+ return cache.computeIfAbsent(
+ jobKey,
+ (id) -> {
+ try {
+ return createJobStateView(jobKey);
+ } catch (Exception exception) {
+ throw new RuntimeException(
+ "Meet exception during create job state view.", exception);
+ }
+ });
+ }
+
+ private JobStateView createJobStateView(String jobKey) throws Exception {
+ return new JobStateView(jdbcStateInteractor, jobKey);
+ }
+}
diff --git a/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JobStateView.java b/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JobStateView.java
new file mode 100644
index 0000000000..2ea5b709ea
--- /dev/null
+++ b/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/JobStateView.java
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.jdbc.state;
+
+import org.apache.flink.annotation.VisibleForTesting;
+
+import javax.annotation.Nonnull;
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+
+import static org.apache.flink.autoscaler.jdbc.state.JobStateView.State.NEEDS_CREATE;
+import static org.apache.flink.autoscaler.jdbc.state.JobStateView.State.NEEDS_DELETE;
+import static org.apache.flink.autoscaler.jdbc.state.JobStateView.State.NEEDS_UPDATE;
+import static org.apache.flink.autoscaler.jdbc.state.JobStateView.State.NOT_NEEDED;
+import static org.apache.flink.autoscaler.jdbc.state.JobStateView.State.UP_TO_DATE;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** The view of job state. */
+@NotThreadSafe
+public class JobStateView {
+
+ /**
+ * The state of state type about the cache and database.
+ *
+ * Note: {@link #inLocally} and {@link #inDatabase} are only for understand, we don't use
+ * them.
+ */
+ @SuppressWarnings("unused")
+ enum State {
+
+ /** State doesn't exist at database, and it's not used so far, so it's not needed. */
+ NOT_NEEDED(false, false, false),
+ /** State is only stored locally, not created in JDBC database yet. */
+ NEEDS_CREATE(true, false, true),
+ /** State exists in JDBC database but there are newer local changes. */
+ NEEDS_UPDATE(true, true, true),
+ /** State is stored locally and in database, and they are same. */
+ UP_TO_DATE(true, true, false),
+ /** State is stored in database, but it's deleted in local. */
+ NEEDS_DELETE(false, true, true);
+
+ /** The data of this state type is stored in locally when it is true. */
+ private final boolean inLocally;
+
+ /** The data of this state type is stored in database when it is true. */
+ private final boolean inDatabase;
+
+ /** The data of this state type is stored in database when it is true. */
+ private final boolean needFlush;
+
+ State(boolean inLocally, boolean inDatabase, boolean needFlush) {
+ this.inLocally = inLocally;
+ this.inDatabase = inDatabase;
+ this.needFlush = needFlush;
+ }
+
+ public boolean isNeedFlush() {
+ return needFlush;
+ }
+ }
+
+ /** Transition old state to the new state when some operations happen to cache or database. */
+ private static class StateTransitioner {
+
+ /** The transition when put data to cache. */
+ @Nonnull
+ public State putTransition(@Nonnull State oldState) {
+ switch (oldState) {
+ case NOT_NEEDED:
+ case NEEDS_CREATE:
+ return NEEDS_CREATE;
+ case NEEDS_UPDATE:
+ case UP_TO_DATE:
+ case NEEDS_DELETE:
+ return NEEDS_UPDATE;
+ default:
+ throw new IllegalArgumentException(
+ String.format("Unknown state : %s.", oldState));
+ }
+ }
+
+ /** The transition when delete data from cache. */
+ @Nonnull
+ public State deleteTransition(@Nonnull State oldState) {
+ switch (oldState) {
+ case NOT_NEEDED:
+ case NEEDS_CREATE:
+ return NOT_NEEDED;
+ case NEEDS_UPDATE:
+ case UP_TO_DATE:
+ case NEEDS_DELETE:
+ return NEEDS_DELETE;
+ default:
+ throw new IllegalArgumentException(
+ String.format("Unknown state : %s.", oldState));
+ }
+ }
+
+ /** The transition when flush data from cache to database. */
+ @Nonnull
+ public State flushTransition(@Nonnull State oldState) {
+ switch (oldState) {
+ case NOT_NEEDED:
+ case NEEDS_DELETE:
+ return NOT_NEEDED;
+ case NEEDS_CREATE:
+ case NEEDS_UPDATE:
+ case UP_TO_DATE:
+ return UP_TO_DATE;
+ default:
+ throw new IllegalArgumentException(
+ String.format("Unknown state : %s.", oldState));
+ }
+ }
+ }
+
+ private static final StateTransitioner STATE_TRANSITIONER = new StateTransitioner();
+
+ private final JDBCStateInteractor jdbcStateInteractor;
+ private final String jobKey;
+ private final Map data;
+
+ /**
+ * The state is maintained for each state type, which means that part of state types of current
+ * job are stored in the database, but the rest of the state types may have been created in the
+ * database.
+ */
+ private final Map states;
+
+ public JobStateView(JDBCStateInteractor jdbcStateInteractor, String jobKey) throws Exception {
+ this.jdbcStateInteractor = jdbcStateInteractor;
+ this.jobKey = jobKey;
+ this.data = jdbcStateInteractor.queryData(jobKey);
+ this.states = generateStates(this.data);
+ }
+
+ private Map generateStates(Map data) {
+ final var states = new HashMap();
+ for (StateType stateType : StateType.values()) {
+ if (data.containsKey(stateType)) {
+ states.put(stateType, UP_TO_DATE);
+ } else {
+ states.put(stateType, NOT_NEEDED);
+ }
+ }
+ return states;
+ }
+
+ public String get(StateType stateType) {
+ return data.get(stateType);
+ }
+
+ public void put(StateType stateType, String value) {
+ data.put(stateType, value);
+ updateState(stateType, STATE_TRANSITIONER::putTransition);
+ }
+
+ public void remove(StateType stateType) {
+ var oldKey = data.remove(stateType);
+ if (oldKey == null) {
+ return;
+ }
+ updateState(stateType, STATE_TRANSITIONER::deleteTransition);
+ }
+
+ public void clear() {
+ if (data.isEmpty()) {
+ return;
+ }
+ var iterator = data.keySet().iterator();
+ while (iterator.hasNext()) {
+ var stateType = iterator.next();
+ iterator.remove();
+ updateState(stateType, STATE_TRANSITIONER::deleteTransition);
+ }
+ }
+
+ public void flush() throws Exception {
+ if (states.values().stream().noneMatch(State::isNeedFlush)) {
+ // No any state needs to be flushed.
+ return;
+ }
+
+ // Build the data that need to be flushed.
+ var flushData = new HashMap>(3);
+ for (Map.Entry stateEntry : states.entrySet()) {
+ State state = stateEntry.getValue();
+ if (!state.isNeedFlush()) {
+ continue;
+ }
+ StateType stateType = stateEntry.getKey();
+ flushData.compute(
+ state,
+ (st, list) -> {
+ if (list == null) {
+ list = new LinkedList<>();
+ }
+ list.add(stateType);
+ return list;
+ });
+ }
+
+ for (var entry : flushData.entrySet()) {
+ State state = entry.getKey();
+ List stateTypes = entry.getValue();
+ switch (state) {
+ case NEEDS_CREATE:
+ jdbcStateInteractor.createData(jobKey, stateTypes, data);
+ break;
+ case NEEDS_DELETE:
+ jdbcStateInteractor.deleteData(jobKey, stateTypes);
+ break;
+ case NEEDS_UPDATE:
+ jdbcStateInteractor.updateData(jobKey, stateTypes, data);
+ break;
+ default:
+ throw new IllegalStateException(String.format("Unknown state : %s", state));
+ }
+ for (var stateType : stateTypes) {
+ updateState(stateType, STATE_TRANSITIONER::flushTransition);
+ }
+ }
+ }
+
+ private void updateState(StateType stateType, Function stateTransitioner) {
+ states.compute(
+ stateType,
+ (type, oldState) -> {
+ checkState(
+ oldState != null,
+ "The state of each state type should be maintained in states. "
+ + "It may be a bug, please raise a JIRA to Flink Community.");
+ return stateTransitioner.apply(oldState);
+ });
+ }
+
+ @VisibleForTesting
+ public Map getDataReadOnly() {
+ return Collections.unmodifiableMap(data);
+ }
+}
diff --git a/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/StateType.java b/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/StateType.java
new file mode 100644
index 0000000000..ed28a0fc58
--- /dev/null
+++ b/flink-autoscaler-plugin-jdbc/src/main/java/org/apache/flink/autoscaler/jdbc/state/StateType.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.jdbc.state;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/** The state type. */
+public enum StateType {
+ SCALING_HISTORY,
+ SCALING_TRACKING,
+ COLLECTED_METRICS,
+ PARALLELISM_OVERRIDES;
+
+ public static StateType valueOf(int ordinal) {
+ checkArgument(
+ ordinal >= 0 && ordinal < values().length, "It's a out-of-bounded ordinal index.");
+ return values()[ordinal];
+ }
+}
diff --git a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/AbstractJDBCStateInteractorITCase.java b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/AbstractJDBCStateInteractorITCase.java
new file mode 100644
index 0000000000..f986944948
--- /dev/null
+++ b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/AbstractJDBCStateInteractorITCase.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.jdbc.state;
+
+import org.apache.flink.autoscaler.jdbc.testutils.databases.DatabaseTest;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.autoscaler.jdbc.state.StateType.COLLECTED_METRICS;
+import static org.apache.flink.autoscaler.jdbc.state.StateType.SCALING_HISTORY;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** The abstract IT case for {@link JDBCStateInteractor}. */
+public abstract class AbstractJDBCStateInteractorITCase implements DatabaseTest {
+
+ @Test
+ void testAllOperations() throws Exception {
+ var jobKey = "jobKey";
+ var value1 = "value1";
+ var value2 = "value2";
+ var value3 = "value3";
+ try (var conn = getConnection()) {
+ var jdbcStateInteractor = new JDBCStateInteractor(conn);
+ assertThat(jdbcStateInteractor.queryData(jobKey)).isEmpty();
+
+ // Test for creating data.
+ jdbcStateInteractor.createData(
+ jobKey,
+ List.of(COLLECTED_METRICS, SCALING_HISTORY),
+ Map.of(COLLECTED_METRICS, value1, SCALING_HISTORY, value2));
+ assertThat(jdbcStateInteractor.queryData(jobKey))
+ .isEqualTo(Map.of(COLLECTED_METRICS, value1, SCALING_HISTORY, value2));
+
+ // Test for updating data.
+ jdbcStateInteractor.updateData(
+ jobKey,
+ List.of(COLLECTED_METRICS),
+ Map.of(COLLECTED_METRICS, value3, SCALING_HISTORY, value2));
+ assertThat(jdbcStateInteractor.queryData(jobKey))
+ .isEqualTo(Map.of(COLLECTED_METRICS, value3, SCALING_HISTORY, value2));
+
+ // Test for deleting data.
+ jdbcStateInteractor.deleteData(jobKey, List.of(COLLECTED_METRICS));
+ assertThat(jdbcStateInteractor.queryData(jobKey))
+ .isEqualTo(Map.of(SCALING_HISTORY, value2));
+ jdbcStateInteractor.deleteData(jobKey, List.of(SCALING_HISTORY));
+ assertThat(jdbcStateInteractor.queryData(jobKey)).isEmpty();
+ }
+ }
+}
diff --git a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/AbstractJDBCStoreITCase.java b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/AbstractJDBCStoreITCase.java
new file mode 100644
index 0000000000..0ef521e86c
--- /dev/null
+++ b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/AbstractJDBCStoreITCase.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.jdbc.state;
+
+import org.apache.flink.autoscaler.jdbc.testutils.databases.DatabaseTest;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.flink.autoscaler.jdbc.state.StateType.COLLECTED_METRICS;
+import static org.apache.flink.autoscaler.jdbc.state.StateType.SCALING_HISTORY;
+import static org.apache.flink.autoscaler.jdbc.state.StateType.SCALING_TRACKING;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** The abstract IT case for {@link JDBCStore}. */
+public abstract class AbstractJDBCStoreITCase implements DatabaseTest {
+
+ private static final String DEFAULT_JOB_KEY = "jobKey";
+ private Connection conn;
+ private CountableJDBCStateInteractor jdbcStateInteractor;
+ private JDBCStore jdbcStore;
+
+ @BeforeEach
+ void beforeEach() throws Exception {
+ this.conn = getConnection();
+ this.jdbcStateInteractor = new CountableJDBCStateInteractor(conn);
+ this.jdbcStore = new JDBCStore(jdbcStateInteractor);
+ }
+
+ @AfterEach
+ void afterEach() throws SQLException {
+ if (conn != null) {
+ conn.close();
+ }
+ }
+
+ @Test
+ void testCaching() throws Exception {
+ var value1 = "value1";
+ var value2 = "value2";
+ var value3 = "value3";
+
+ jdbcStateInteractor.assertCountableJDBCInteractor(0, 0, 0, 0);
+
+ // Query from database.
+ jdbcStore.getSerializedState(DEFAULT_JOB_KEY, COLLECTED_METRICS);
+ jdbcStateInteractor.assertCountableJDBCInteractor(1, 0, 0, 0);
+
+ // The rest of state types of same job key shouldn't query database.
+ assertThat(jdbcStore.getSerializedState(DEFAULT_JOB_KEY, SCALING_HISTORY)).isEmpty();
+ assertThat(jdbcStore.getSerializedState(DEFAULT_JOB_KEY, SCALING_TRACKING)).isEmpty();
+ jdbcStateInteractor.assertCountableJDBCInteractor(1, 0, 0, 0);
+
+ // Putting does not go to database, unless flushing.
+ jdbcStore.putSerializedState(DEFAULT_JOB_KEY, COLLECTED_METRICS, value1);
+ jdbcStore.putSerializedState(DEFAULT_JOB_KEY, SCALING_HISTORY, value2);
+ jdbcStateInteractor.assertCountableJDBCInteractor(1, 0, 0, 0);
+
+ // Flush together! Create counter is one.
+ jdbcStore.flush(DEFAULT_JOB_KEY);
+ jdbcStateInteractor.assertCountableJDBCInteractor(1, 0, 0, 1);
+
+ // Get
+ assertStateValueForCacheAndDatabase(COLLECTED_METRICS, value1);
+ assertStateValueForCacheAndDatabase(SCALING_HISTORY, value2);
+ var job2 = "job2";
+ assertThat(jdbcStore.getSerializedState(job2, COLLECTED_METRICS)).isEmpty();
+ jdbcStateInteractor.assertCountableJDBCInteractor(2, 0, 0, 1);
+ assertThat(jdbcStore.getSerializedState(job2, SCALING_HISTORY)).isEmpty();
+
+ // Put and flush state for job2
+ jdbcStore.putSerializedState(job2, SCALING_TRACKING, value3);
+ jdbcStore.flush(job2);
+ jdbcStateInteractor.assertCountableJDBCInteractor(2, 0, 0, 2);
+
+ // Build the new JDBCStore
+ var newJdbcStore = new JDBCStore(jdbcStateInteractor);
+ assertThat(newJdbcStore.getSerializedState(DEFAULT_JOB_KEY, COLLECTED_METRICS))
+ .hasValue(value1);
+ assertThat(newJdbcStore.getSerializedState(DEFAULT_JOB_KEY, SCALING_HISTORY))
+ .hasValue(value2);
+ jdbcStateInteractor.assertCountableJDBCInteractor(3, 0, 0, 2);
+
+ assertThat(newJdbcStore.getSerializedState(job2, SCALING_TRACKING)).hasValue(value3);
+ jdbcStateInteractor.assertCountableJDBCInteractor(4, 0, 0, 2);
+
+ // Removing the data from cache and query from database again.
+ newJdbcStore.removeInfoFromCache(job2);
+ assertThat(newJdbcStore.getSerializedState(job2, SCALING_TRACKING)).hasValue(value3);
+ jdbcStateInteractor.assertCountableJDBCInteractor(5, 0, 0, 2);
+ }
+
+ @Test
+ void testDeleting() throws Exception {
+ var value1 = "value1";
+
+ jdbcStateInteractor.assertCountableJDBCInteractor(0, 0, 0, 0);
+ assertThat(jdbcStore.getSerializedState(DEFAULT_JOB_KEY, COLLECTED_METRICS)).isEmpty();
+ jdbcStateInteractor.assertCountableJDBCInteractor(1, 0, 0, 0);
+
+ // Get from cache, and it shouldn't exist in database.
+ jdbcStore.putSerializedState(DEFAULT_JOB_KEY, COLLECTED_METRICS, value1);
+ assertThat(jdbcStore.getSerializedState(DEFAULT_JOB_KEY, COLLECTED_METRICS))
+ .hasValue(value1);
+ assertThat(getValueFromDatabase(DEFAULT_JOB_KEY, COLLECTED_METRICS)).isEmpty();
+ jdbcStateInteractor.assertCountableJDBCInteractor(1, 0, 0, 0);
+
+ // Deleting before flushing
+ jdbcStore.removeSerializedState(DEFAULT_JOB_KEY, COLLECTED_METRICS);
+ assertThat(jdbcStore.getSerializedState(DEFAULT_JOB_KEY, COLLECTED_METRICS)).isEmpty();
+ assertThat(getValueFromDatabase(DEFAULT_JOB_KEY, COLLECTED_METRICS)).isEmpty();
+ jdbcStateInteractor.assertCountableJDBCInteractor(1, 0, 0, 0);
+
+ // Flush method shouldn't flush any data.
+ jdbcStore.flush(DEFAULT_JOB_KEY);
+ assertThat(jdbcStore.getSerializedState(DEFAULT_JOB_KEY, COLLECTED_METRICS)).isEmpty();
+ assertThat(getValueFromDatabase(DEFAULT_JOB_KEY, COLLECTED_METRICS)).isEmpty();
+ jdbcStateInteractor.assertCountableJDBCInteractor(1, 0, 0, 0);
+
+ // Put and flush data to database.
+ var value2 = "value2";
+ jdbcStore.putSerializedState(DEFAULT_JOB_KEY, COLLECTED_METRICS, value2);
+ jdbcStore.flush(DEFAULT_JOB_KEY);
+ assertStateValueForCacheAndDatabase(COLLECTED_METRICS, value2);
+ jdbcStateInteractor.assertCountableJDBCInteractor(1, 0, 0, 1);
+
+ // Deleting after flushing, data is deleted in cache, but it still exists in database.
+ jdbcStore.removeSerializedState(DEFAULT_JOB_KEY, COLLECTED_METRICS);
+ assertThat(jdbcStore.getSerializedState(DEFAULT_JOB_KEY, COLLECTED_METRICS)).isEmpty();
+ assertThat(getValueFromDatabase(DEFAULT_JOB_KEY, COLLECTED_METRICS)).hasValue(value2);
+
+ // Flushing
+ jdbcStore.flush(DEFAULT_JOB_KEY);
+ assertThat(jdbcStore.getSerializedState(DEFAULT_JOB_KEY, COLLECTED_METRICS)).isEmpty();
+ assertThat(getValueFromDatabase(DEFAULT_JOB_KEY, COLLECTED_METRICS)).isEmpty();
+ jdbcStateInteractor.assertCountableJDBCInteractor(1, 1, 0, 1);
+
+ // Get from database for a new JDBC Store.
+ var newJdbcStore = new JDBCStore(jdbcStateInteractor);
+ assertThat(newJdbcStore.getSerializedState(DEFAULT_JOB_KEY, COLLECTED_METRICS)).isEmpty();
+ assertThat(getValueFromDatabase(DEFAULT_JOB_KEY, COLLECTED_METRICS)).isEmpty();
+ }
+
+ @Test
+ void testErrorHandlingDuringFlush() throws Exception {
+ var value1 = "value1";
+ var value2 = "value2";
+ jdbcStateInteractor.assertCountableJDBCInteractor(0, 0, 0, 0);
+ assertThat(jdbcStore.getSerializedState(DEFAULT_JOB_KEY, COLLECTED_METRICS)).isEmpty();
+
+ // Modify the database directly.
+ var tmpJdbcInteractor = new JDBCStateInteractor(conn);
+ tmpJdbcInteractor.createData(
+ DEFAULT_JOB_KEY, List.of(COLLECTED_METRICS), Map.of(COLLECTED_METRICS, value1));
+ assertThat(getValueFromDatabase(DEFAULT_JOB_KEY, COLLECTED_METRICS)).hasValue(value1);
+
+ // Cache cannot read data of database.
+ assertThat(jdbcStore.getSerializedState(DEFAULT_JOB_KEY, COLLECTED_METRICS)).isEmpty();
+ jdbcStateInteractor.assertCountableJDBCInteractor(1, 0, 0, 0);
+
+ // Create it with SQLException due to the data has already existed in database.
+ jdbcStore.putSerializedState(DEFAULT_JOB_KEY, COLLECTED_METRICS, value2);
+ assertThatThrownBy(() -> jdbcStore.flush(DEFAULT_JOB_KEY))
+ .hasCauseInstanceOf(SQLException.class);
+ jdbcStateInteractor.assertCountableJDBCInteractor(1, 0, 0, 1);
+
+ // Get normally.
+ assertThat(jdbcStore.getSerializedState(DEFAULT_JOB_KEY, COLLECTED_METRICS))
+ .hasValue(value1);
+ jdbcStateInteractor.assertCountableJDBCInteractor(2, 0, 0, 1);
+ }
+
+ @Test
+ void testErrorHandlingDuringQuery() throws Exception {
+ var value1 = "value1";
+ final var expectedException = new RuntimeException("Database isn't stable.");
+
+ var exceptionableJdbcStateInteractor =
+ new CountableJDBCStateInteractor(conn) {
+ private final AtomicBoolean isFirst = new AtomicBoolean(true);
+
+ @Override
+ public Map queryData(String jobKey) throws Exception {
+ if (isFirst.get()) {
+ isFirst.set(false);
+ throw expectedException;
+ }
+ return super.queryData(jobKey);
+ }
+ };
+
+ var exceptionableJdbcStore = new JDBCStore(exceptionableJdbcStateInteractor);
+
+ // First get will fail.
+ jdbcStateInteractor.assertCountableJDBCInteractor(0, 0, 0, 0);
+ assertThatThrownBy(
+ () ->
+ exceptionableJdbcStore.getSerializedState(
+ DEFAULT_JOB_KEY, COLLECTED_METRICS))
+ .rootCause()
+ .isSameAs(expectedException);
+
+ // It's recovered.
+ assertThat(exceptionableJdbcStore.getSerializedState(DEFAULT_JOB_KEY, COLLECTED_METRICS))
+ .isEmpty();
+ exceptionableJdbcStore.putSerializedState(DEFAULT_JOB_KEY, COLLECTED_METRICS, value1);
+ exceptionableJdbcStore.flush(DEFAULT_JOB_KEY);
+ assertStateValueForCacheAndDatabase(COLLECTED_METRICS, value1);
+ }
+
+ @Test
+ void testDiscardAllState() throws Exception {
+ var value1 = "value1";
+ var value2 = "value2";
+ var value3 = "value3";
+
+ // Put and flush all state types first.
+ jdbcStore.putSerializedState(DEFAULT_JOB_KEY, COLLECTED_METRICS, value1);
+ jdbcStore.putSerializedState(DEFAULT_JOB_KEY, SCALING_HISTORY, value2);
+ jdbcStore.putSerializedState(DEFAULT_JOB_KEY, SCALING_TRACKING, value3);
+ jdbcStore.flush(DEFAULT_JOB_KEY);
+ jdbcStateInteractor.assertCountableJDBCInteractor(1, 0, 0, 1);
+
+ assertStateValueForCacheAndDatabase(COLLECTED_METRICS, value1);
+ assertStateValueForCacheAndDatabase(SCALING_HISTORY, value2);
+ assertStateValueForCacheAndDatabase(SCALING_TRACKING, value3);
+
+ // Clear all in cache.
+ jdbcStore.clearAll(DEFAULT_JOB_KEY);
+ assertThat(jdbcStore.getSerializedState(DEFAULT_JOB_KEY, COLLECTED_METRICS)).isEmpty();
+ assertThat(jdbcStore.getSerializedState(DEFAULT_JOB_KEY, SCALING_HISTORY)).isEmpty();
+ assertThat(jdbcStore.getSerializedState(DEFAULT_JOB_KEY, SCALING_TRACKING)).isEmpty();
+ assertThat(getValueFromDatabase(DEFAULT_JOB_KEY, COLLECTED_METRICS)).hasValue(value1);
+ assertThat(getValueFromDatabase(DEFAULT_JOB_KEY, SCALING_HISTORY)).hasValue(value2);
+ assertThat(getValueFromDatabase(DEFAULT_JOB_KEY, SCALING_TRACKING)).hasValue(value3);
+
+ // Flush!
+ jdbcStore.flush(DEFAULT_JOB_KEY);
+ assertThat(jdbcStore.getSerializedState(DEFAULT_JOB_KEY, COLLECTED_METRICS)).isEmpty();
+ assertThat(jdbcStore.getSerializedState(DEFAULT_JOB_KEY, SCALING_HISTORY)).isEmpty();
+ assertThat(jdbcStore.getSerializedState(DEFAULT_JOB_KEY, SCALING_TRACKING)).isEmpty();
+ assertThat(getValueFromDatabase(DEFAULT_JOB_KEY, COLLECTED_METRICS)).isEmpty();
+ assertThat(getValueFromDatabase(DEFAULT_JOB_KEY, SCALING_HISTORY)).isEmpty();
+ assertThat(getValueFromDatabase(DEFAULT_JOB_KEY, SCALING_TRACKING)).isEmpty();
+ }
+
+ private void assertStateValueForCacheAndDatabase(StateType stateType, String expectedValue)
+ throws Exception {
+ assertThat(jdbcStore.getSerializedState(DEFAULT_JOB_KEY, stateType))
+ .hasValue(expectedValue);
+ assertThat(getValueFromDatabase(DEFAULT_JOB_KEY, stateType)).hasValue(expectedValue);
+ }
+
+ private Optional getValueFromDatabase(String jobKey, StateType stateType)
+ throws Exception {
+ var jdbcInteractor = new JDBCStateInteractor(conn);
+ return Optional.ofNullable(jdbcInteractor.queryData(jobKey).get(stateType));
+ }
+}
diff --git a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/CountableJDBCStateInteractor.java b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/CountableJDBCStateInteractor.java
new file mode 100644
index 0000000000..4701a8e83e
--- /dev/null
+++ b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/CountableJDBCStateInteractor.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.jdbc.state;
+
+import java.sql.Connection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Countable {@link JDBCStateInteractor}. */
+public class CountableJDBCStateInteractor extends JDBCStateInteractor {
+
+ private final AtomicLong queryCounter;
+ private final AtomicLong deleteCounter;
+ private final AtomicLong createCounter;
+ private final AtomicLong updateCounter;
+
+ public CountableJDBCStateInteractor(Connection conn) {
+ super(conn);
+ queryCounter = new AtomicLong();
+ deleteCounter = new AtomicLong();
+ createCounter = new AtomicLong();
+ updateCounter = new AtomicLong();
+ }
+
+ @Override
+ public Map queryData(String jobKey) throws Exception {
+ queryCounter.incrementAndGet();
+ return super.queryData(jobKey);
+ }
+
+ @Override
+ public void deleteData(String jobKey, List deletedStateTypes) throws Exception {
+ deleteCounter.incrementAndGet();
+ super.deleteData(jobKey, deletedStateTypes);
+ }
+
+ @Override
+ public void createData(
+ String jobKey, List createdStateTypes, Map data)
+ throws Exception {
+ createCounter.incrementAndGet();
+ super.createData(jobKey, createdStateTypes, data);
+ }
+
+ @Override
+ public void updateData(
+ String jobKey, List updatedStateTypes, Map data)
+ throws Exception {
+ updateCounter.incrementAndGet();
+ super.updateData(jobKey, updatedStateTypes, data);
+ }
+
+ public void assertCountableJDBCInteractor(
+ long expectedQueryCounter,
+ long expectedDeleteCounter,
+ long expectedUpdateCounter,
+ long expectedCreateCounter) {
+ assertThat(queryCounter).hasValue(expectedQueryCounter);
+ assertThat(deleteCounter).hasValue(expectedDeleteCounter);
+ assertThat(updateCounter).hasValue(expectedUpdateCounter);
+ assertThat(createCounter).hasValue(expectedCreateCounter);
+ }
+}
diff --git a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/JobStateViewTest.java b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/JobStateViewTest.java
new file mode 100644
index 0000000000..dea99cedcf
--- /dev/null
+++ b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/JobStateViewTest.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.jdbc.state;
+
+import org.apache.flink.autoscaler.jdbc.testutils.databases.derby.DerbyTestBase;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.Optional;
+
+import static org.apache.flink.autoscaler.jdbc.state.StateType.COLLECTED_METRICS;
+import static org.apache.flink.autoscaler.jdbc.state.StateType.SCALING_HISTORY;
+import static org.apache.flink.autoscaler.jdbc.state.StateType.SCALING_TRACKING;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link JobStateView}. */
+class JobStateViewTest implements DerbyTestBase {
+
+ private static final String DEFAULT_JOB_KEY = "jobKey";
+ private Connection conn;
+ private CountableJDBCStateInteractor jdbcStateInteractor;
+ private JobStateView jobStateView;
+
+ @BeforeEach
+ void beforeEach() throws Exception {
+ this.conn = getConnection();
+ this.jdbcStateInteractor = new CountableJDBCStateInteractor(conn);
+ this.jobStateView = new JobStateView(jdbcStateInteractor, DEFAULT_JOB_KEY);
+ jdbcStateInteractor.assertCountableJDBCInteractor(1, 0, 0, 0);
+ }
+
+ @AfterEach
+ void afterEach() throws SQLException {
+ if (conn != null) {
+ conn.close();
+ }
+ }
+
+ @Test
+ void testAllOperations() throws Exception {
+ // All state types should be get together to avoid query database frequently.
+ assertThat(jobStateView.get(COLLECTED_METRICS)).isNull();
+ jdbcStateInteractor.assertCountableJDBCInteractor(1, 0, 0, 0);
+ assertThat(jobStateView.get(SCALING_HISTORY)).isNull();
+ jdbcStateInteractor.assertCountableJDBCInteractor(1, 0, 0, 0);
+
+ // Put data to cache, and it shouldn't exist in database.
+ var value1 = "value1";
+ jobStateView.put(COLLECTED_METRICS, value1);
+ assertThat(jobStateView.get(COLLECTED_METRICS)).isEqualTo(value1);
+ assertThat(getValueFromDatabase(COLLECTED_METRICS)).isEmpty();
+
+ var value2 = "value2";
+ jobStateView.put(SCALING_HISTORY, value2);
+ assertThat(jobStateView.get(SCALING_HISTORY)).isEqualTo(value2);
+ assertThat(getValueFromDatabase(SCALING_HISTORY)).isEmpty();
+ jdbcStateInteractor.assertCountableJDBCInteractor(1, 0, 0, 0);
+
+ // Test creating together.
+ jobStateView.flush();
+ assertStateValueForCacheAndDatabase(COLLECTED_METRICS, value1);
+ assertStateValueForCacheAndDatabase(SCALING_HISTORY, value2);
+ jdbcStateInteractor.assertCountableJDBCInteractor(1, 0, 0, 1);
+
+ // Test updating data to cache, and they aren't updated in database.
+ var value3 = "value3";
+ jobStateView.put(COLLECTED_METRICS, value3);
+ assertThat(jobStateView.get(COLLECTED_METRICS)).isEqualTo(value3);
+ assertThat(getValueFromDatabase(COLLECTED_METRICS)).hasValue(value1);
+
+ var value4 = "value4";
+ jobStateView.put(SCALING_HISTORY, value4);
+ assertThat(jobStateView.get(SCALING_HISTORY)).isEqualTo(value4);
+ assertThat(getValueFromDatabase(SCALING_HISTORY)).hasValue(value2);
+ jdbcStateInteractor.assertCountableJDBCInteractor(1, 0, 0, 1);
+
+ // Test updating together.
+ jobStateView.flush();
+ assertStateValueForCacheAndDatabase(COLLECTED_METRICS, value3);
+ assertStateValueForCacheAndDatabase(SCALING_HISTORY, value4);
+ jdbcStateInteractor.assertCountableJDBCInteractor(1, 0, 1, 1);
+
+ // Test deleting data from cache, and they aren't deleted in database.
+ jobStateView.remove(COLLECTED_METRICS);
+ assertThat(jobStateView.get(COLLECTED_METRICS)).isNull();
+ assertThat(getValueFromDatabase(COLLECTED_METRICS)).hasValue(value3);
+
+ jobStateView.remove(SCALING_HISTORY);
+ assertThat(jobStateView.get(SCALING_HISTORY)).isNull();
+ assertThat(getValueFromDatabase(SCALING_HISTORY)).hasValue(value4);
+ jdbcStateInteractor.assertCountableJDBCInteractor(1, 0, 1, 1);
+
+ // Test updating together.
+ jobStateView.flush();
+ assertThat(jobStateView.get(COLLECTED_METRICS)).isNull();
+ assertThat(getValueFromDatabase(COLLECTED_METRICS)).isEmpty();
+ assertThat(jobStateView.get(SCALING_HISTORY)).isNull();
+ assertThat(getValueFromDatabase(SCALING_HISTORY)).isEmpty();
+ jdbcStateInteractor.assertCountableJDBCInteractor(1, 1, 1, 1);
+ }
+
+ @Test
+ void testAvoidUnnecessaryFlushes() throws Exception {
+ var value1 = "value1";
+ jobStateView.put(COLLECTED_METRICS, value1);
+ jobStateView.flush();
+ jdbcStateInteractor.assertCountableJDBCInteractor(1, 0, 0, 1);
+
+ // Avoid unnecessary flush for creating.
+ jobStateView.flush();
+ jdbcStateInteractor.assertCountableJDBCInteractor(1, 0, 0, 1);
+
+ // Avoid unnecessary flush for deleting.
+ jobStateView.clear();
+ jobStateView.flush();
+ jdbcStateInteractor.assertCountableJDBCInteractor(1, 1, 0, 1);
+ jobStateView.flush();
+ jdbcStateInteractor.assertCountableJDBCInteractor(1, 1, 0, 1);
+
+ // Avoid unnecessary flush even if clear is called..
+ jobStateView.clear();
+ jobStateView.flush();
+ jdbcStateInteractor.assertCountableJDBCInteractor(1, 1, 0, 1);
+ }
+
+ @Test
+ void testCreateDeleteAndUpdateWorkTogether() throws Exception {
+ var value1 = "value1";
+ var value2 = "value2";
+ var value3 = "value3";
+ var value4 = "value4";
+ // Create 2 state types first.
+ jobStateView.put(COLLECTED_METRICS, value1);
+ jobStateView.put(SCALING_HISTORY, value2);
+ jobStateView.flush();
+ assertStateValueForCacheAndDatabase(COLLECTED_METRICS, value1);
+ assertStateValueForCacheAndDatabase(SCALING_HISTORY, value2);
+ jdbcStateInteractor.assertCountableJDBCInteractor(1, 0, 0, 1);
+
+ // Delete one, update one and create one.
+ jobStateView.remove(COLLECTED_METRICS);
+ jobStateView.put(SCALING_HISTORY, value3);
+ jobStateView.put(SCALING_TRACKING, value4);
+
+ assertThat(jobStateView.get(COLLECTED_METRICS)).isNull();
+ assertThat(getValueFromDatabase(COLLECTED_METRICS)).hasValue(value1);
+ assertThat(jobStateView.get(SCALING_HISTORY)).isEqualTo(value3);
+ assertThat(getValueFromDatabase(SCALING_HISTORY)).hasValue(value2);
+ assertThat(jobStateView.get(SCALING_TRACKING)).isEqualTo(value4);
+ assertThat(getValueFromDatabase(SCALING_TRACKING)).isEmpty();
+ jdbcStateInteractor.assertCountableJDBCInteractor(1, 0, 0, 1);
+
+ // Flush!
+ jobStateView.flush();
+ assertThat(jobStateView.get(COLLECTED_METRICS)).isNull();
+ assertThat(getValueFromDatabase(COLLECTED_METRICS)).isEmpty();
+ assertStateValueForCacheAndDatabase(SCALING_HISTORY, value3);
+ assertStateValueForCacheAndDatabase(SCALING_TRACKING, value4);
+ jdbcStateInteractor.assertCountableJDBCInteractor(1, 1, 1, 2);
+
+ // Build the new JobStateView
+ var newJobStateView = new JobStateView(jdbcStateInteractor, DEFAULT_JOB_KEY);
+ assertThat(newJobStateView.get(COLLECTED_METRICS)).isNull();
+ assertThat(getValueFromDatabase(COLLECTED_METRICS)).isEmpty();
+ assertThat(newJobStateView.get(SCALING_HISTORY)).isEqualTo(value3);
+ assertThat(getValueFromDatabase(SCALING_HISTORY)).hasValue(value3);
+ assertThat(newJobStateView.get(SCALING_TRACKING)).isEqualTo(value4);
+ assertThat(getValueFromDatabase(SCALING_TRACKING)).hasValue(value4);
+ jdbcStateInteractor.assertCountableJDBCInteractor(2, 1, 1, 2);
+ }
+
+ private void assertStateValueForCacheAndDatabase(StateType stateType, String expectedValue)
+ throws Exception {
+ assertThat(jobStateView.get(stateType)).isEqualTo(expectedValue);
+ assertThat(getValueFromDatabase(stateType)).hasValue(expectedValue);
+ }
+
+ private Optional getValueFromDatabase(StateType stateType) throws Exception {
+ var jdbcInteractor = new JDBCStateInteractor(conn);
+ return Optional.ofNullable(jdbcInteractor.queryData(DEFAULT_JOB_KEY).get(stateType));
+ }
+}
diff --git a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/database/DerbyJDBCStateInteractorITCase.java b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/database/DerbyJDBCStateInteractorITCase.java
new file mode 100644
index 0000000000..1a434c2b68
--- /dev/null
+++ b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/database/DerbyJDBCStateInteractorITCase.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.jdbc.state.database;
+
+import org.apache.flink.autoscaler.jdbc.state.AbstractJDBCStateInteractorITCase;
+import org.apache.flink.autoscaler.jdbc.state.JDBCStateInteractor;
+import org.apache.flink.autoscaler.jdbc.testutils.databases.derby.DerbyTestBase;
+
+/** Test {@link JDBCStateInteractor} via Derby database. */
+public class DerbyJDBCStateInteractorITCase extends AbstractJDBCStateInteractorITCase
+ implements DerbyTestBase {}
diff --git a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/database/DerbyJDBCStoreITCase.java b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/database/DerbyJDBCStoreITCase.java
new file mode 100644
index 0000000000..c9343b954b
--- /dev/null
+++ b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/database/DerbyJDBCStoreITCase.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.jdbc.state.database;
+
+import org.apache.flink.autoscaler.jdbc.state.AbstractJDBCStoreITCase;
+import org.apache.flink.autoscaler.jdbc.state.JDBCStore;
+import org.apache.flink.autoscaler.jdbc.testutils.databases.derby.DerbyTestBase;
+
+/** Test {@link JDBCStore} via Derby database. */
+public class DerbyJDBCStoreITCase extends AbstractJDBCStoreITCase implements DerbyTestBase {}
diff --git a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/database/MySQL56JDBCStateInteractorITCase.java b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/database/MySQL56JDBCStateInteractorITCase.java
new file mode 100644
index 0000000000..3dfdd0e72b
--- /dev/null
+++ b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/database/MySQL56JDBCStateInteractorITCase.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.jdbc.state.database;
+
+import org.apache.flink.autoscaler.jdbc.state.AbstractJDBCStateInteractorITCase;
+import org.apache.flink.autoscaler.jdbc.state.JDBCStateInteractor;
+import org.apache.flink.autoscaler.jdbc.testutils.databases.mysql.MySQL56TestBase;
+
+/** Test {@link JDBCStateInteractor} via MySQL 5.6.x. */
+public class MySQL56JDBCStateInteractorITCase extends AbstractJDBCStateInteractorITCase
+ implements MySQL56TestBase {}
diff --git a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/database/MySQL56JDBCStoreITCase.java b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/database/MySQL56JDBCStoreITCase.java
new file mode 100644
index 0000000000..51326384c8
--- /dev/null
+++ b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/database/MySQL56JDBCStoreITCase.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.jdbc.state.database;
+
+import org.apache.flink.autoscaler.jdbc.state.AbstractJDBCStoreITCase;
+import org.apache.flink.autoscaler.jdbc.state.JDBCStore;
+import org.apache.flink.autoscaler.jdbc.testutils.databases.mysql.MySQL56TestBase;
+
+/** Test {@link JDBCStore} via MySQL 5.6.x. */
+public class MySQL56JDBCStoreITCase extends AbstractJDBCStoreITCase implements MySQL56TestBase {}
diff --git a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/database/MySQL57JDBCStateInteractorITCase.java b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/database/MySQL57JDBCStateInteractorITCase.java
new file mode 100644
index 0000000000..4d80999100
--- /dev/null
+++ b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/database/MySQL57JDBCStateInteractorITCase.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.jdbc.state.database;
+
+import org.apache.flink.autoscaler.jdbc.state.AbstractJDBCStateInteractorITCase;
+import org.apache.flink.autoscaler.jdbc.state.JDBCStateInteractor;
+import org.apache.flink.autoscaler.jdbc.testutils.databases.mysql.MySQL57TestBase;
+
+/** Test {@link JDBCStateInteractor} via MySQL 5.7.x. */
+public class MySQL57JDBCStateInteractorITCase extends AbstractJDBCStateInteractorITCase
+ implements MySQL57TestBase {}
diff --git a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/database/MySQL57JDBCStoreITCase.java b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/database/MySQL57JDBCStoreITCase.java
new file mode 100644
index 0000000000..07517b3fe7
--- /dev/null
+++ b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/database/MySQL57JDBCStoreITCase.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.jdbc.state.database;
+
+import org.apache.flink.autoscaler.jdbc.state.AbstractJDBCStoreITCase;
+import org.apache.flink.autoscaler.jdbc.state.JDBCStore;
+import org.apache.flink.autoscaler.jdbc.testutils.databases.mysql.MySQL57TestBase;
+
+/** Test {@link JDBCStore} via MySQL 5.7.x. */
+public class MySQL57JDBCStoreITCase extends AbstractJDBCStoreITCase implements MySQL57TestBase {}
diff --git a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/database/MySQL8JDBCStateInteractorITCase.java b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/database/MySQL8JDBCStateInteractorITCase.java
new file mode 100644
index 0000000000..86cc8877ad
--- /dev/null
+++ b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/database/MySQL8JDBCStateInteractorITCase.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.jdbc.state.database;
+
+import org.apache.flink.autoscaler.jdbc.state.AbstractJDBCStateInteractorITCase;
+import org.apache.flink.autoscaler.jdbc.state.JDBCStateInteractor;
+import org.apache.flink.autoscaler.jdbc.testutils.databases.mysql.MySQL8TestBase;
+
+/** Test {@link JDBCStateInteractor} via MySQL 8.x. */
+public class MySQL8JDBCStateInteractorITCase extends AbstractJDBCStateInteractorITCase
+ implements MySQL8TestBase {}
diff --git a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/database/MySQL8JDBCStoreITCase.java b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/database/MySQL8JDBCStoreITCase.java
new file mode 100644
index 0000000000..b0929e3f4e
--- /dev/null
+++ b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/database/MySQL8JDBCStoreITCase.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.jdbc.state.database;
+
+import org.apache.flink.autoscaler.jdbc.state.AbstractJDBCStoreITCase;
+import org.apache.flink.autoscaler.jdbc.state.JDBCStore;
+import org.apache.flink.autoscaler.jdbc.testutils.databases.mysql.MySQL8TestBase;
+
+/** Test {@link JDBCStore} via MySQL 8. */
+public class MySQL8JDBCStoreITCase extends AbstractJDBCStoreITCase implements MySQL8TestBase {}
diff --git a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/database/PostgreSQLJDBCStateInteractorITCase.java b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/database/PostgreSQLJDBCStateInteractorITCase.java
new file mode 100644
index 0000000000..874156e997
--- /dev/null
+++ b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/database/PostgreSQLJDBCStateInteractorITCase.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.jdbc.state.database;
+
+import org.apache.flink.autoscaler.jdbc.state.AbstractJDBCStateInteractorITCase;
+import org.apache.flink.autoscaler.jdbc.state.JDBCStateInteractor;
+import org.apache.flink.autoscaler.jdbc.testutils.databases.postgres.PostgreSQLTestBase;
+
+/** Test {@link JDBCStateInteractor} via Postgre SQL. */
+public class PostgreSQLJDBCStateInteractorITCase extends AbstractJDBCStateInteractorITCase
+ implements PostgreSQLTestBase {}
diff --git a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/database/PostgreSQLJDBCStoreITCase.java b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/database/PostgreSQLJDBCStoreITCase.java
new file mode 100644
index 0000000000..cd3b2669db
--- /dev/null
+++ b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/state/database/PostgreSQLJDBCStoreITCase.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.jdbc.state.database;
+
+import org.apache.flink.autoscaler.jdbc.state.AbstractJDBCStoreITCase;
+import org.apache.flink.autoscaler.jdbc.state.JDBCStore;
+import org.apache.flink.autoscaler.jdbc.testutils.databases.postgres.PostgreSQLTestBase;
+
+/** Test {@link JDBCStore} via Postgre SQL. */
+public class PostgreSQLJDBCStoreITCase extends AbstractJDBCStoreITCase
+ implements PostgreSQLTestBase {}
diff --git a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/DatabaseTest.java b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/DatabaseTest.java
new file mode 100644
index 0000000000..261578b32b
--- /dev/null
+++ b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/DatabaseTest.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.jdbc.testutils.databases;
+
+import java.sql.Connection;
+
+/** Database testing. */
+public interface DatabaseTest {
+
+ Connection getConnection() throws Exception;
+}
diff --git a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/derby/DerbyExtension.java b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/derby/DerbyExtension.java
new file mode 100644
index 0000000000..4ac232b87a
--- /dev/null
+++ b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/derby/DerbyExtension.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.jdbc.testutils.databases.derby;
+
+import org.junit.jupiter.api.extension.AfterAllCallback;
+import org.junit.jupiter.api.extension.AfterEachCallback;
+import org.junit.jupiter.api.extension.BeforeAllCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.List;
+
+/** The extension of Derby. */
+public class DerbyExtension implements BeforeAllCallback, AfterAllCallback, AfterEachCallback {
+
+ private static final List TABLES = List.of("t_flink_autoscaler_state_store");
+ private static final String JDBC_URL = "jdbc:derby:memory:test";
+
+ public Connection getConnection() throws Exception {
+ return DriverManager.getConnection(JDBC_URL);
+ }
+
+ @Override
+ public void beforeAll(ExtensionContext extensionContext) throws Exception {
+ DriverManager.getConnection(String.format("%s;create=true", JDBC_URL)).close();
+
+ var stateStoreDDL =
+ "CREATE TABLE t_flink_autoscaler_state_store\n"
+ + "(\n"
+ + " id BIGINT NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1),\n"
+ + " update_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,\n"
+ + " job_key VARCHAR(191) NOT NULL,\n"
+ + " state_type_id SMALLINT NOT NULL,\n"
+ + " state_value CLOB NOT NULL,\n"
+ + " PRIMARY KEY (id)\n"
+ + ")\n";
+
+ var createIndex =
+ "CREATE UNIQUE INDEX un_job_state_type_inx ON t_flink_autoscaler_state_store (job_key, state_type_id)";
+ try (var conn = getConnection();
+ var st = conn.createStatement()) {
+ st.execute(stateStoreDDL);
+ st.execute(createIndex);
+ }
+ }
+
+ @Override
+ public void afterAll(ExtensionContext extensionContext) throws Exception {
+ try (var conn = getConnection();
+ var st = conn.createStatement()) {
+ for (var tableName : TABLES) {
+ st.executeUpdate(String.format("DROP TABLE %s", tableName));
+ }
+ }
+ try {
+ DriverManager.getConnection(String.format("%s;shutdown=true", JDBC_URL)).close();
+ } catch (SQLException ignored) {
+ }
+ }
+
+ @Override
+ public void afterEach(ExtensionContext extensionContext) throws Exception {
+ // Clean up all data
+ try (var conn = getConnection();
+ var st = conn.createStatement()) {
+ for (var tableName : TABLES) {
+ st.executeUpdate(String.format("DELETE from %s", tableName));
+ }
+ }
+ }
+}
diff --git a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/derby/DerbyTestBase.java b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/derby/DerbyTestBase.java
new file mode 100644
index 0000000000..e0ac3f76f8
--- /dev/null
+++ b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/derby/DerbyTestBase.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.jdbc.testutils.databases.derby;
+
+import org.apache.flink.autoscaler.jdbc.testutils.databases.DatabaseTest;
+
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.sql.Connection;
+
+/** Derby database for testing. */
+public interface DerbyTestBase extends DatabaseTest {
+
+ @RegisterExtension DerbyExtension DERBY_EXTENSION = new DerbyExtension();
+
+ @Override
+ default Connection getConnection() throws Exception {
+ return DERBY_EXTENSION.getConnection();
+ }
+}
diff --git a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/mysql/MySQL56TestBase.java b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/mysql/MySQL56TestBase.java
new file mode 100644
index 0000000000..f2b1858cef
--- /dev/null
+++ b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/mysql/MySQL56TestBase.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.jdbc.testutils.databases.mysql;
+
+import org.apache.flink.autoscaler.jdbc.testutils.databases.DatabaseTest;
+
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.sql.Connection;
+
+/** MySQL 5.6.x database for testing. */
+public interface MySQL56TestBase extends DatabaseTest {
+
+ @RegisterExtension MySQLExtension MYSQL_EXTENSION = new MySQLExtension("5.6.51");
+
+ default Connection getConnection() throws Exception {
+ return MYSQL_EXTENSION.getConnection();
+ }
+}
diff --git a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/mysql/MySQL57TestBase.java b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/mysql/MySQL57TestBase.java
new file mode 100644
index 0000000000..0b8a69685b
--- /dev/null
+++ b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/mysql/MySQL57TestBase.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.jdbc.testutils.databases.mysql;
+
+import org.apache.flink.autoscaler.jdbc.testutils.databases.DatabaseTest;
+
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.sql.Connection;
+
+/** MySQL 5.7.x database for testing. */
+public interface MySQL57TestBase extends DatabaseTest {
+
+ @RegisterExtension MySQLExtension MYSQL_EXTENSION = new MySQLExtension("5.7.41");
+
+ default Connection getConnection() throws Exception {
+ return MYSQL_EXTENSION.getConnection();
+ }
+}
diff --git a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/mysql/MySQL8TestBase.java b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/mysql/MySQL8TestBase.java
new file mode 100644
index 0000000000..daf7788576
--- /dev/null
+++ b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/mysql/MySQL8TestBase.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.jdbc.testutils.databases.mysql;
+
+import org.apache.flink.autoscaler.jdbc.testutils.databases.DatabaseTest;
+
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.sql.Connection;
+
+/** MySQL 8.x database for testing. */
+public interface MySQL8TestBase extends DatabaseTest {
+
+ @RegisterExtension MySQLExtension MYSQL_EXTENSION = new MySQLExtension("8.0.32");
+
+ default Connection getConnection() throws Exception {
+ return MYSQL_EXTENSION.getConnection();
+ }
+}
diff --git a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/mysql/MySQLExtension.java b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/mysql/MySQLExtension.java
new file mode 100644
index 0000000000..46fe1584f1
--- /dev/null
+++ b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/mysql/MySQLExtension.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.jdbc.testutils.databases.mysql;
+
+import org.junit.jupiter.api.extension.AfterAllCallback;
+import org.junit.jupiter.api.extension.AfterEachCallback;
+import org.junit.jupiter.api.extension.BeforeAllCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.testcontainers.containers.MySQLContainer;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.util.List;
+
+/** The extension of MySQL. */
+public class MySQLExtension implements BeforeAllCallback, AfterAllCallback, AfterEachCallback {
+
+ private static final String MYSQL_INIT_SCRIPT = "schema/mysql/mysql_schema.sql";
+ private static final String DATABASE_NAME = "flink_autoscaler";
+ private static final String USER_NAME = "root";
+ private static final String PASSWORD = "123456";
+ private static final List TABLES = List.of("t_flink_autoscaler_state_store");
+
+ private final MySQLContainer> container;
+
+ public MySQLExtension(String mysqlVersion) {
+ this.container =
+ new MySQLContainer<>(String.format("mysql:%s", mysqlVersion))
+ .withCommand("--character-set-server=utf8")
+ .withDatabaseName(DATABASE_NAME)
+ .withUsername(USER_NAME)
+ .withPassword(PASSWORD)
+ .withInitScript(MYSQL_INIT_SCRIPT)
+ .withEnv("MYSQL_ROOT_HOST", "%");
+ }
+
+ public Connection getConnection() throws Exception {
+ return DriverManager.getConnection(
+ container.getJdbcUrl(), container.getUsername(), container.getPassword());
+ }
+
+ @Override
+ public void beforeAll(ExtensionContext extensionContext) {
+ container.start();
+ }
+
+ @Override
+ public void afterAll(ExtensionContext extensionContext) {
+ container.stop();
+ }
+
+ @Override
+ public void afterEach(ExtensionContext extensionContext) throws Exception {
+ try (var conn = getConnection();
+ var st = conn.createStatement()) {
+ for (var tableName : TABLES) {
+ st.executeUpdate(String.format("DELETE from %s", tableName));
+ }
+ }
+ }
+}
diff --git a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/postgres/PostgreSQLExtension.java b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/postgres/PostgreSQLExtension.java
new file mode 100644
index 0000000000..72abf48da0
--- /dev/null
+++ b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/postgres/PostgreSQLExtension.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.jdbc.testutils.databases.postgres;
+
+import org.junit.jupiter.api.extension.AfterAllCallback;
+import org.junit.jupiter.api.extension.AfterEachCallback;
+import org.junit.jupiter.api.extension.BeforeAllCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.testcontainers.containers.PostgreSQLContainer;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.util.List;
+
+/** The extension of PostgreSQL. */
+public class PostgreSQLExtension implements BeforeAllCallback, AfterAllCallback, AfterEachCallback {
+
+ private static final String INIT_SCRIPT = "schema/postgres/postgres_schema.sql";
+ private static final String DATABASE_NAME = "flink_autoscaler";
+ private static final String USER_NAME = "root";
+ private static final String PASSWORD = "123456";
+ private static final List TABLES = List.of("t_flink_autoscaler_state_store");
+
+ private final PostgreSQLContainer> container;
+
+ public PostgreSQLExtension(String postgresqlVersion) {
+ this.container =
+ new PostgreSQLContainer<>(String.format("postgres:%s", postgresqlVersion))
+ .withDatabaseName(DATABASE_NAME)
+ .withUsername(USER_NAME)
+ .withPassword(PASSWORD)
+ .withInitScript(INIT_SCRIPT)
+ .withEnv("POSTGRES_MAX_CONNECTIONS", "10");
+ }
+
+ public Connection getConnection() throws Exception {
+ return DriverManager.getConnection(
+ container.getJdbcUrl(), container.getUsername(), container.getPassword());
+ }
+
+ @Override
+ public void beforeAll(ExtensionContext extensionContext) {
+ container.start();
+ }
+
+ @Override
+ public void afterAll(ExtensionContext extensionContext) {
+ container.stop();
+ }
+
+ @Override
+ public void afterEach(ExtensionContext extensionContext) throws Exception {
+ try (var conn = getConnection();
+ var st = conn.createStatement()) {
+ for (var tableName : TABLES) {
+ st.executeUpdate(String.format("DELETE from %s", tableName));
+ }
+ }
+ }
+}
diff --git a/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/postgres/PostgreSQLTestBase.java b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/postgres/PostgreSQLTestBase.java
new file mode 100644
index 0000000000..1e27a6e129
--- /dev/null
+++ b/flink-autoscaler-plugin-jdbc/src/test/java/org/apache/flink/autoscaler/jdbc/testutils/databases/postgres/PostgreSQLTestBase.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.jdbc.testutils.databases.postgres;
+
+import org.apache.flink.autoscaler.jdbc.testutils.databases.DatabaseTest;
+
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.sql.Connection;
+
+/** PostgreSQL database for testing. */
+public interface PostgreSQLTestBase extends DatabaseTest {
+
+ @RegisterExtension PostgreSQLExtension POSTGRE_SQL_EXTENSION = new PostgreSQLExtension("15.1");
+
+ default Connection getConnection() throws Exception {
+ return POSTGRE_SQL_EXTENSION.getConnection();
+ }
+}
diff --git a/flink-autoscaler-plugin-jdbc/src/test/resources/log4j2.properties b/flink-autoscaler-plugin-jdbc/src/test/resources/log4j2.properties
new file mode 100644
index 0000000000..4a6ab83224
--- /dev/null
+++ b/flink-autoscaler-plugin-jdbc/src/test/resources/log4j2.properties
@@ -0,0 +1,27 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+rootLogger.level = INFO
+rootLogger.appenderRef.console.ref = ConsoleAppender
+
+# Log all infos to the console
+appender.console.name = ConsoleAppender
+appender.console.type = CONSOLE
+appender.console.layout.type = PatternLayout
+appender.console.layout.pattern = %style{%d}{yellow} %style{%-30c{1.}}{cyan} %highlight{[%-5level] [%X{resource.namespace}.%X{resource.name}] %msg%n%throwable}
+
diff --git a/flink-autoscaler-plugin-jdbc/src/test/resources/schema/mysql/mysql_schema.sql b/flink-autoscaler-plugin-jdbc/src/test/resources/schema/mysql/mysql_schema.sql
new file mode 100644
index 0000000000..01e074bb61
--- /dev/null
+++ b/flink-autoscaler-plugin-jdbc/src/test/resources/schema/mysql/mysql_schema.sql
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+create database if not exists `flink_autoscaler` character set utf8mb4 collate utf8mb4_general_ci;
+
+use`flink_autoscaler`;
+
+create table `t_flink_autoscaler_state_store`
+(
+ `id` bigint not null auto_increment,
+ `update_time` datetime not null default current_timestamp on update current_timestamp comment 'update time',
+ `job_key` varchar(191) not null comment 'The job key',
+ `state_type_id` tinyint not null comment 'The id of state type',
+ `state_value` longtext not null comment 'The real state',
+ primary key (`id`) using btree,
+ unique key `un_job_state_type_inx` (`job_key`,`state_type_id`) using btree
+) engine=innodb default charset=utf8mb4 collate=utf8mb4_general_ci;
+
diff --git a/flink-autoscaler-plugin-jdbc/src/test/resources/schema/postgres/postgres_schema.sql b/flink-autoscaler-plugin-jdbc/src/test/resources/schema/postgres/postgres_schema.sql
new file mode 100644
index 0000000000..1abe0b1af7
--- /dev/null
+++ b/flink-autoscaler-plugin-jdbc/src/test/resources/schema/postgres/postgres_schema.sql
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- CREATE DATABASE flink_autoscaler;
+-- \c flink_autoscaler;
+
+CREATE TABLE t_flink_autoscaler_state_store
+(
+ id BIGSERIAL NOT NULL,
+ update_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
+ job_key TEXT NOT NULL,
+ state_type_id SMALLINT NOT NULL,
+ state_value TEXT NOT NULL,
+ PRIMARY KEY (id),
+ UNIQUE (job_key, state_type_id)
+);
+
+-- CREATE OR REPLACE FUNCTION update_update_time_column()
+-- RETURNS TRIGGER AS $$
+-- BEGIN
+-- NEW.update_time = CURRENT_TIMESTAMP;
+-- RETURN NEW;
+-- END;
+-- $$ language 'plpgsql';
+--
+-- CREATE TRIGGER update_t_flink_autoscaler_state_store_modtime
+-- BEFORE UPDATE ON t_flink_autoscaler_state_store
+-- FOR EACH ROW
+-- EXECUTE FUNCTION update_update_time_column();
+
diff --git a/pom.xml b/pom.xml
index e255c56a41..e55a1016d7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -58,6 +58,7 @@ under the License.
flink-kubernetes-docs
flink-autoscaler
flink-autoscaler-standalone
+ flink-autoscaler-plugin-jdbc
examples/flink-sql-runner-example
examples/flink-beam-example
examples/kubernetes-client-examples