Skip to content

Commit

Permalink
[FLINK-33450][autoscaler] StandaloneAutoscalerEntrypoint supports the…
Browse files Browse the repository at this point in the history
… JDBCAutoScalerStateStore
  • Loading branch information
1996fanrui committed Dec 29, 2023
1 parent cb58829 commit 0a21e05
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 6 deletions.
6 changes: 6 additions & 0 deletions flink-autoscaler-standalone/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@ under the License.
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-autoscaler-plugin-jdbc</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.autoscaler.standalone;

import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.autoscaler.JobAutoScalerContext;
import org.apache.flink.autoscaler.jdbc.state.JDBCAutoScalerStateStore;
import org.apache.flink.autoscaler.state.AutoScalerStateStore;
import org.apache.flink.autoscaler.state.InMemoryAutoScalerStateStore;

import java.sql.DriverManager;

import static org.apache.flink.util.Preconditions.checkNotNull;

public class AutoscalerStateStoreFactory {

private static final String STATE_STORE_TYPE = "state-store.type";
private static final String IN_MEMORY_STATE_STORE = "memory";

private static final String JDBC_STATE_STORE = "jdbc";
private static final String JDBC_STATE_STORE_URL = "state-store.jdbc.url";
private static final String JDBC_STATE_STORE_USER = "state-store.jdbc.username";
private static final String JDBC_STATE_STORE_PASSWORD = "state-store.jdbc.password";

public static <KEY, Context extends JobAutoScalerContext<KEY>>
AutoScalerStateStore<KEY, Context> create(ParameterTool parameters) throws Exception {
var stateStoreType = parameters.get(STATE_STORE_TYPE, IN_MEMORY_STATE_STORE).toLowerCase();
switch (stateStoreType) {
case IN_MEMORY_STATE_STORE:
return new InMemoryAutoScalerStateStore<>();
case JDBC_STATE_STORE:
return createJDBCStateStore(parameters);
default:
throw new IllegalArgumentException(
String.format(
"Unknown state store type : %s. Optional state store types are: %s and %s.",
stateStoreType, IN_MEMORY_STATE_STORE, JDBC_STATE_STORE));
}
}

private static <KEY, Context extends JobAutoScalerContext<KEY>>
AutoScalerStateStore<KEY, Context> createJDBCStateStore(ParameterTool parameters)
throws Exception {
var jdbcUrl = parameters.get(JDBC_STATE_STORE_URL);
checkNotNull(jdbcUrl, "%s is required for jdbc state store.", JDBC_STATE_STORE_URL);
var user = parameters.get(JDBC_STATE_STORE_USER);
var password = parameters.get(JDBC_STATE_STORE_PASSWORD);

var conn = DriverManager.getConnection(jdbcUrl, user, password);
return new JDBCAutoScalerStateStore<>(conn);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.apache.flink.autoscaler.standalone.flinkcluster.FlinkClusterJobListFetcher;
import org.apache.flink.autoscaler.standalone.realizer.RescaleApiScalingRealizer;
import org.apache.flink.autoscaler.state.AutoScalerStateStore;
import org.apache.flink.autoscaler.state.InMemoryAutoScalerStateStore;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices;
import org.apache.flink.util.TimeUtils;
Expand Down Expand Up @@ -63,7 +62,8 @@ public class StandaloneAutoscalerEntrypoint {
public static final String FLINK_CLUSTER_PORT = "flinkClusterPort";
private static final int DEFAULT_FLINK_CLUSTER_PORT = 8081;

public static <KEY, Context extends JobAutoScalerContext<KEY>> void main(String[] args) {
public static <KEY, Context extends JobAutoScalerContext<KEY>> void main(String[] args)
throws Exception {
var parameters = ParameterTool.fromArgs(args);
LOG.info("The standalone autoscaler is started, parameters: {}", parameters.toMap());

Expand All @@ -79,10 +79,12 @@ public static <KEY, Context extends JobAutoScalerContext<KEY>> void main(String[

// Initialize JobListFetcher and JobAutoScaler.
var eventHandler = new LoggingEventHandler<KEY, Context>();
AutoScalerStateStore<KEY, Context> stateStore =
AutoscalerStateStoreFactory.create(parameters);
var autoScaler = createJobAutoscaler(eventHandler, stateStore);

JobListFetcher<KEY, Context> jobListFetcher =
createJobListFetcher(parameters, restClientTimeout);
var autoScaler = createJobAutoscaler(eventHandler);

var autoscalerExecutor =
new StandaloneAutoscalerExecutor<>(
scalingInterval, jobListFetcher, eventHandler, autoScaler);
Expand All @@ -109,8 +111,8 @@ JobListFetcher<KEY, Context> createJobListFetcher(

private static <KEY, Context extends JobAutoScalerContext<KEY>>
JobAutoScaler<KEY, Context> createJobAutoscaler(
AutoScalerEventHandler<KEY, Context> eventHandler) {
AutoScalerStateStore<KEY, Context> stateStore = new InMemoryAutoScalerStateStore<>();
AutoScalerEventHandler<KEY, Context> eventHandler,
AutoScalerStateStore<KEY, Context> stateStore) {
return new JobAutoScalerImpl<>(
new RestApiMetricsCollector<>(),
new ScalingMetricEvaluator(),
Expand Down

0 comments on commit 0a21e05

Please sign in to comment.