Skip to content

Commit

Permalink
[FLINK-35105][autoscaler] Support setting default Autoscaler options …
Browse files Browse the repository at this point in the history
…at autoscaler standalone level
  • Loading branch information
1996fanrui committed Apr 15, 2024
1 parent 8125456 commit ea6915d
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,19 @@

import org.apache.flink.annotation.Experimental;
import org.apache.flink.autoscaler.JobAutoScalerContext;
import org.apache.flink.configuration.Configuration;

import java.util.Collection;

/**
 * The JobListFetcher will fetch the jobContext of all jobs.
 *
 * @param <KEY> The key type used to parameterize {@link JobAutoScalerContext}.
 * @param <Context> The concrete job context type; it must extend {@code
 *     JobAutoScalerContext<KEY>}.
 */
@Experimental
public interface JobListFetcher<KEY, Context extends JobAutoScalerContext<KEY>> {

// NOTE(review): in this diff rendering the zero-argument signature below appears to be the
// line that the commit replaces with fetch(Configuration) - confirm against the final file.
Collection<Context> fetch() throws Exception;
/**
* Fetch the job context.
*
* @param baseConf The basic configuration for standalone autoscaler. The basic configuration
* can be overridden by the configuration at job-level.
* @return The collection of fetched job contexts.
* @throws Exception If fetching the job contexts fails.
*/
Collection<Context> fetch(Configuration baseConf) throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.autoscaler.JobAutoScalerContext;
import org.apache.flink.autoscaler.event.AutoScalerEventHandler;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.UnmodifiableConfiguration;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;

import org.apache.flink.shaded.guava31.com.google.common.util.concurrent.ThreadFactoryBuilder;
Expand Down Expand Up @@ -60,6 +61,7 @@ public class StandaloneAutoscalerExecutor<KEY, Context extends JobAutoScalerCont
private final JobAutoScaler<KEY, Context> autoScaler;
private final ScheduledExecutorService scheduledExecutorService;
private final ExecutorService scalingThreadPool;
private final UnmodifiableConfiguration baseConf;

/**
* Maintain a set of job keys that during scaling, it should be updated at {@link
Expand Down Expand Up @@ -88,6 +90,7 @@ public StandaloneAutoscalerExecutor(
Executors.newFixedThreadPool(
parallelism, new ExecutorThreadFactory("autoscaler-standalone-scaling"));
this.scalingJobKeys = new HashSet<>();
this.baseConf = new UnmodifiableConfiguration(conf);
}

public void start() {
Expand All @@ -107,7 +110,7 @@ protected void scaling() {
LOG.info("Standalone autoscaler starts scaling.");
Collection<Context> jobList;
try {
jobList = jobListFetcher.fetch();
jobList = jobListFetcher.fetch(baseConf);
} catch (Throwable e) {
LOG.error("Error while fetch job list.", e);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public FlinkClusterJobListFetcher(
}

@Override
public Collection<JobAutoScalerContext<JobID>> fetch() throws Exception {
public Collection<JobAutoScalerContext<JobID>> fetch(Configuration baseConf) throws Exception {
try (var restClusterClient = restClientGetter.apply(new Configuration())) {
return restClusterClient
.listJobs()
Expand All @@ -60,7 +60,8 @@ public Collection<JobAutoScalerContext<JobID>> fetch() throws Exception {
.map(
jobStatusMessage -> {
try {
return generateJobContext(restClusterClient, jobStatusMessage);
return generateJobContext(
baseConf, restClusterClient, jobStatusMessage);
} catch (Throwable e) {
throw new RuntimeException(
"generateJobContext throw exception", e);
Expand All @@ -71,10 +72,12 @@ public Collection<JobAutoScalerContext<JobID>> fetch() throws Exception {
}

private JobAutoScalerContext<JobID> generateJobContext(
RestClusterClient<String> restClusterClient, JobStatusMessage jobStatusMessage)
Configuration baseConf,
RestClusterClient<String> restClusterClient,
JobStatusMessage jobStatusMessage)
throws Exception {
var jobId = jobStatusMessage.getJobId();
var conf = getConfiguration(restClusterClient, jobId);
var conf = getConfiguration(baseConf, restClusterClient, jobId);

return new JobAutoScalerContext<>(
jobId,
Expand All @@ -85,7 +88,8 @@ private JobAutoScalerContext<JobID> generateJobContext(
() -> restClientGetter.apply(conf));
}

private Configuration getConfiguration(RestClusterClient<String> restClusterClient, JobID jobId)
private Configuration getConfiguration(
Configuration baseConf, RestClusterClient<String> restClusterClient, JobID jobId)
throws Exception {
var jobParameters = new JobMessageParameters();
jobParameters.jobPathParameter.resolve(jobId);
Expand All @@ -98,7 +102,7 @@ private Configuration getConfiguration(RestClusterClient<String> restClusterClie
EmptyRequestBody.getInstance())
.get(restClientTimeout.toSeconds(), TimeUnit.SECONDS);

var conf = new Configuration();
var conf = new Configuration(baseConf);
configurationInfo.forEach(entry -> conf.setString(entry.getKey(), entry.getValue()));
return conf;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public void cleanup(JobID jobKey) {

try (var autoscalerExecutor =
new StandaloneAutoscalerExecutor<>(
conf, () -> jobList, eventCollector, jobAutoScaler) {
conf, baseConf -> jobList, eventCollector, jobAutoScaler) {
@Override
protected void scalingSingleJob(JobAutoScalerContext<JobID> jobContext) {
super.scalingSingleJob(jobContext);
Expand Down Expand Up @@ -112,7 +112,7 @@ void testFetchException() {
try (var autoscalerExecutor =
new StandaloneAutoscalerExecutor<>(
new Configuration(),
() -> {
baseConf -> {
throw new RuntimeException("Excepted exception.");
},
eventCollector,
Expand Down Expand Up @@ -149,7 +149,7 @@ void testScalingParallelism() {
try (var autoscalerExecutor =
new StandaloneAutoscalerExecutor<>(
conf,
() -> jobList,
baseConf -> jobList,
new TestingEventCollector<>(),
new JobAutoScaler<>() {
@Override
Expand Down Expand Up @@ -197,7 +197,7 @@ void testOneJobScalingSlow() throws Exception {
try (var autoscalerExecutor =
new StandaloneAutoscalerExecutor<>(
conf,
jobContextWithIndex::keySet,
baseConf -> jobContextWithIndex.keySet(),
new TestingEventCollector<>(),
new JobAutoScaler<>() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,16 +86,41 @@ void testFetchJobListAndConfigurationInfo() throws Exception {
closeCounter),
Duration.ofSeconds(10));

// Test for empty base conf
assertFetcherResult(
new Configuration(), jobs, configurations, closeCounter, jobListFetcher);

// Add the new option key, and old option key with different value, so that this round
// exercises both "base conf contributes an extra option" and "job-level conf overrides
// the base conf".
var baseConf = new Configuration();
baseConf.setString("option_key4", "option_value4");
baseConf.setString("option_key3", "option_value5");

// The helper asserts the close counter starting from 1 again, so reset it between rounds.
closeCounter.set(0);
// Test for mixed base conf: pass the populated baseConf (not a fresh empty Configuration),
// otherwise this round merely repeats the empty-base-conf case above.
assertFetcherResult(baseConf, jobs, configurations, closeCounter, jobListFetcher);
}

private void assertFetcherResult(
Configuration baseConf,
Map<JobID, JobStatusMessage> jobs,
Map<JobID, Configuration> configurations,
AtomicLong closeCounter,
FlinkClusterJobListFetcher jobListFetcher)
throws Exception {
// Fetch multiple times and check whether the results are as expected each time
for (int i = 1; i <= 3; i++) {
var fetchedJobList = jobListFetcher.fetch();
var fetchedJobList = jobListFetcher.fetch(baseConf);
// Check whether rest client is closed.
assertThat(closeCounter).hasValue(i);

assertThat(fetchedJobList).hasSize(2);
for (var jobContext : fetchedJobList) {
JobStatusMessage expectedJobStatusMessage = jobs.get(jobContext.getJobID());
Configuration expectedConf = configurations.get(jobContext.getJobID());

var expectedConf = new Configuration(baseConf);
expectedConf.addAll(configurations.get(jobContext.getJobID()));

assertThat(expectedJobStatusMessage).isNotNull();
assertThat(jobContext.getJobStatus())
.isEqualTo(expectedJobStatusMessage.getJobState());
Expand All @@ -120,7 +145,9 @@ void testFetchJobListException() {
Either.Left(Map.of()),
closeCounter),
Duration.ofSeconds(10));
assertThatThrownBy(jobListFetcher::fetch).getCause().isEqualTo(expectedException);
assertThatThrownBy(() -> jobListFetcher.fetch(new Configuration()))
.getCause()
.isEqualTo(expectedException);
assertThat(closeCounter).hasValue(1);
}

Expand All @@ -144,7 +171,9 @@ void testFetchConfigurationException() {
closeCounter),
Duration.ofSeconds(10));

assertThatThrownBy(jobListFetcher::fetch).getRootCause().isEqualTo(expectedException);
assertThatThrownBy(() -> jobListFetcher.fetch(new Configuration()))
.getRootCause()
.isEqualTo(expectedException);
assertThat(closeCounter).hasValue(1);
}

Expand All @@ -161,7 +190,8 @@ void testFetchJobListTimeout() {
Duration.ofSeconds(2));

assertThat(closeFuture).isNotDone();
assertThatThrownBy(jobListFetcher::fetch).isInstanceOf(TimeoutException.class);
assertThatThrownBy(() -> jobListFetcher.fetch(new Configuration()))
.isInstanceOf(TimeoutException.class);
assertThat(closeFuture).isDone();
}

Expand All @@ -182,7 +212,7 @@ void testFetchConfigurationTimeout() {
Duration.ofSeconds(2));

assertThat(closeFuture).isNotDone();
assertThatThrownBy(jobListFetcher::fetch)
assertThatThrownBy(() -> jobListFetcher.fetch(new Configuration()))
.getRootCause()
.isInstanceOf(TimeoutException.class);
assertThat(closeFuture).isDone();
Expand Down

0 comments on commit ea6915d

Please sign in to comment.