Skip to content

Commit

Permalink
[FLINK-34906][autoscaler] Only scale when all tasks are running
Browse files Browse the repository at this point in the history
  • Loading branch information
1996fanrui committed Mar 22, 2024
1 parent b584b08 commit 6a94c25
Show file tree
Hide file tree
Showing 4 changed files with 223 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,15 @@
import org.apache.flink.api.common.JobID;
import org.apache.flink.autoscaler.JobAutoScalerContext;
import org.apache.flink.autoscaler.standalone.JobListFetcher;
import org.apache.flink.autoscaler.standalone.utils.JobStatusUtils;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.JobMessageParameters;
import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
import org.apache.flink.runtime.rest.messages.job.JobManagerJobConfigurationHeaders;
import org.apache.flink.util.function.FunctionWithException;

Expand Down Expand Up @@ -54,7 +57,11 @@ public FlinkClusterJobListFetcher(
public Collection<JobAutoScalerContext<JobID>> fetch() throws Exception {
try (var restClusterClient = restClientGetter.apply(new Configuration())) {
return restClusterClient
.listJobs()
.sendRequest(
JobsOverviewHeaders.getInstance(),
EmptyMessageParameters.getInstance(),
EmptyRequestBody.getInstance())
.thenApply(JobStatusUtils::toJobStatusMessage)
.get(restClientTimeout.toSeconds(), TimeUnit.SECONDS)
.stream()
.map(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.autoscaler.standalone.utils;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.stream.Collectors;

/** Utilities for translating job overview details into job status information. */
public class JobStatusUtils {

    private static final Logger LOG = LoggerFactory.getLogger(JobStatusUtils.class);

    /**
     * Converts every job of the given overview into a {@link JobStatusMessage}.
     *
     * <p>The job id, job name and start time are copied from the job details, while the status
     * is the one computed by {@link #getEffectiveStatus(JobDetails)}.
     *
     * @param multipleJobsDetails the jobs overview to convert
     * @return one message per job contained in the overview
     */
    public static List<JobStatusMessage> toJobStatusMessage(
            MultipleJobsDetails multipleJobsDetails) {
        return multipleJobsDetails.getJobs().stream()
                .map(JobStatusUtils::toMessage)
                .collect(Collectors.toList());
    }

    /** Builds the status message of a single job, using its effective status. */
    private static JobStatusMessage toMessage(JobDetails job) {
        return new JobStatusMessage(
                job.getJobId(), job.getJobName(), getEffectiveStatus(job), job.getStartTime());
    }

    /**
     * Computes the status that should be reported for the given job.
     *
     * <p>A job whose status is {@link JobStatus#RUNNING} is reported as {@link
     * JobStatus#CREATED} unless the number of running plus finished tasks equals the total
     * number of tasks. Any other status is returned unchanged.
     *
     * @param details the details of the job
     * @return the effective job status
     */
    @VisibleForTesting
    static JobStatus getEffectiveStatus(JobDetails details) {
        // The per-state counters are indexed by the ordinal of the execution state.
        final int[] tasksPerState = details.getTasksPerState();
        final int runningOrFinished =
                tasksPerState[ExecutionState.RUNNING.ordinal()]
                        + tasksPerState[ExecutionState.FINISHED.ordinal()];
        final boolean everyTaskRunningOrFinished = runningOrFinished == details.getNumTasks();

        final JobStatus reportedStatus = details.getStatus();
        if (!JobStatus.RUNNING.equals(reportedStatus) || everyTaskRunningOrFinished) {
            return reportedStatus;
        }

        // The job is RUNNING, but at least one task is neither running nor finished.
        LOG.debug("Adjusting job state from {} to {}", JobStatus.RUNNING, JobStatus.CREATED);
        return JobStatus.CREATED;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,13 @@
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
import org.apache.flink.runtime.rest.messages.ConfigurationInfo;
import org.apache.flink.runtime.rest.messages.JobMessageParameters;
import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.MessageParameters;
import org.apache.flink.runtime.rest.messages.RequestBody;
Expand All @@ -39,12 +43,12 @@

import java.time.Duration;
import java.time.Instant;
import java.util.Collection;
import java.util.List;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
Expand All @@ -56,12 +60,9 @@ class FlinkClusterJobListFetcherTest {
/** Test whether the job list and confs are expected. */
@Test
void testFetchJobListAndConfigurationInfo() throws Exception {
var job1 =
new JobStatusMessage(
new JobID(), "", JobStatus.RUNNING, Instant.now().toEpochMilli());
var job2 =
new JobStatusMessage(
new JobID(), "", JobStatus.CANCELLING, Instant.now().toEpochMilli());
var job1 = new JobID();
var job2 = new JobID();
var jobs = Map.of(job1, JobStatus.RUNNING, job2, JobStatus.CANCELLING);

Configuration expectedConf1 = new Configuration();
expectedConf1.setString("option_key1", "option_value1");
Expand All @@ -70,18 +71,17 @@ void testFetchJobListAndConfigurationInfo() throws Exception {
expectedConf2.setString("option_key2", "option_value2");
expectedConf2.setString("option_key3", "option_value3");

var jobs = Map.of(job1.getJobId(), job1, job2.getJobId(), job2);
var configurations = Map.of(job1.getJobId(), expectedConf1, job2.getJobId(), expectedConf2);
var configurations = Map.of(job1, expectedConf1, job2, expectedConf2);
var closeCounter = new AtomicLong();
FlinkClusterJobListFetcher jobListFetcher =
new FlinkClusterJobListFetcher(
getRestClusterClient(
Either.Left(List.of(job1, job2)),
Either.Left(jobs),
Either.Left(
Map.of(
job1.getJobId(),
job1,
ConfigurationInfo.from(expectedConf1),
job2.getJobId(),
job2,
ConfigurationInfo.from(expectedConf2))),
closeCounter),
Duration.ofSeconds(10));
Expand All @@ -94,11 +94,9 @@ void testFetchJobListAndConfigurationInfo() throws Exception {

assertThat(fetchedJobList).hasSize(2);
for (var jobContext : fetchedJobList) {
JobStatusMessage expectedJobStatusMessage = jobs.get(jobContext.getJobID());
var expectedJobState = jobs.get(jobContext.getJobID());
Configuration expectedConf = configurations.get(jobContext.getJobID());
assertThat(expectedJobStatusMessage).isNotNull();
assertThat(jobContext.getJobStatus())
.isEqualTo(expectedJobStatusMessage.getJobState());
assertThat(jobContext.getJobStatus()).isEqualTo(expectedJobState);
assertThat(jobContext.getConfiguration()).isNotNull().isEqualTo(expectedConf);
}
}
Expand Down Expand Up @@ -130,16 +128,13 @@ void testFetchJobListException() {
*/
@Test
void testFetchConfigurationException() {
var job1 =
new JobStatusMessage(
new JobID(), "", JobStatus.RUNNING, Instant.now().toEpochMilli());
var expectedException = new RuntimeException("Expected exception.");
var closeCounter = new AtomicLong();

FlinkClusterJobListFetcher jobListFetcher =
new FlinkClusterJobListFetcher(
getRestClusterClient(
Either.Left(List.of(job1)),
Either.Left(Map.of(new JobID(), JobStatus.RUNNING)),
Either.Right(expectedException),
closeCounter),
Duration.ofSeconds(10));
Expand Down Expand Up @@ -171,14 +166,12 @@ void testFetchJobListTimeout() {
*/
@Test
void testFetchConfigurationTimeout() {
var job1 =
new JobStatusMessage(
new JobID(), "", JobStatus.RUNNING, Instant.now().toEpochMilli());
CompletableFuture<Void> closeFuture = new CompletableFuture<>();

FlinkClusterJobListFetcher jobListFetcher =
new FlinkClusterJobListFetcher(
getTimeoutableRestClusterClient(List.of(job1), null, closeFuture),
getTimeoutableRestClusterClient(
Map.of(new JobID(), JobStatus.RUNNING), null, closeFuture),
Duration.ofSeconds(2));

assertThat(closeFuture).isNotDone();
Expand All @@ -189,16 +182,16 @@ void testFetchConfigurationTimeout() {
}

/**
* @param jobListOrException When listJobs is called, return jobList if Either is left, return
* failedFuture if Either is right.
* @param jobsOrException When the jobs overview is requested, return the job details built
* from the given jobs if Either is left, return failedFuture if Either is right.
* @param configurationsOrException When fetch job conf, return configuration if Either is left,
* return failedFuture if Either is right.
* @param closeCounter Increment the count each time the {@link RestClusterClient#close} is
* called
*/
private static FunctionWithException<Configuration, RestClusterClient<String>, Exception>
getRestClusterClient(
Either<Collection<JobStatusMessage>, Throwable> jobListOrException,
Either<Map<JobID, JobStatus>, Throwable> jobsOrException,
Either<Map<JobID, ConfigurationInfo>, Throwable> configurationsOrException,
AtomicLong closeCounter) {
return conf ->
Expand All @@ -207,14 +200,6 @@ void testFetchConfigurationTimeout() {
"test-cluster",
(c, e) -> new StandaloneClientHAServices("localhost")) {

@Override
public CompletableFuture<Collection<JobStatusMessage>> listJobs() {
if (jobListOrException.isLeft()) {
return CompletableFuture.completedFuture(jobListOrException.left());
}
return CompletableFuture.failedFuture(jobListOrException.right());
}

@Override
public <
M extends MessageHeaders<R, P, U>,
Expand All @@ -231,6 +216,22 @@ CompletableFuture<P> sendRequest(M h, U p, R r) {
return (CompletableFuture<P>)
CompletableFuture.completedFuture(
configurationsOrException.left().get(jobID));
} else if (h instanceof JobsOverviewHeaders) {
if (jobsOrException.isLeft()) {
return (CompletableFuture<P>)
CompletableFuture.completedFuture(
new MultipleJobsDetails(
jobsOrException.left().entrySet().stream()
.map(
entry ->
generateJobDetails(
entry
.getKey(),
entry
.getValue()))
.collect(Collectors.toList())));
}
return CompletableFuture.failedFuture(jobsOrException.right());
}
fail("Unknown request");
return null;
Expand All @@ -245,15 +246,15 @@ public void close() {
}

/**
* @param jobList When listJobs is called, return jobList if it's not null, don't complete
* @param jobs When the jobs overview is requested, return the job details built from jobs if
* it's not null, don't complete the future if it's null.
* @param configuration When fetch job conf, return configuration if it's not null, don't
* complete future if it's null.
* @param closeFuture Complete this closeFuture when {@link RestClusterClient#close} is called.
*/
private static FunctionWithException<Configuration, RestClusterClient<String>, Exception>
getTimeoutableRestClusterClient(
@Nullable Collection<JobStatusMessage> jobList,
@Nullable Map<JobID, JobStatus> jobs,
@Nullable ConfigurationInfo configuration,
CompletableFuture<Void> closeFuture) {
return conf ->
Expand All @@ -262,14 +263,6 @@ public void close() {
"test-cluster",
(c, e) -> new StandaloneClientHAServices("localhost")) {

@Override
public CompletableFuture<Collection<JobStatusMessage>> listJobs() {
if (jobList == null) {
return new CompletableFuture<>();
}
return CompletableFuture.completedFuture(jobList);
}

@Override
public <
M extends MessageHeaders<R, P, U>,
Expand All @@ -283,6 +276,21 @@ CompletableFuture<P> sendRequest(M h, U p, R r) {
}
return (CompletableFuture<P>)
CompletableFuture.completedFuture(configuration);
} else if (h instanceof JobsOverviewHeaders) {
if (jobs == null) {
return new CompletableFuture<>();
}
return (CompletableFuture<P>)
CompletableFuture.completedFuture(
new MultipleJobsDetails(
jobs.entrySet().stream()
.map(
entry ->
generateJobDetails(
entry.getKey(),
entry
.getValue()))
.collect(Collectors.toList())));
}
fail("Unknown request");
return null;
Expand All @@ -295,4 +303,25 @@ public void close() {
}
};
}

/**
 * Creates the {@link JobDetails} of a test job with the given id and status.
 *
 * <p>A {@link JobStatus#RUNNING} job gets 5 running and 2 finished tasks, a {@link
 * JobStatus#CANCELLING} job gets 7 canceling tasks, and every other status gets no tasks at
 * all. The total number of tasks is the sum over all execution states.
 */
private static JobDetails generateJobDetails(JobID jobID, JobStatus jobStatus) {
    final int[] tasksPerState = new int[ExecutionState.values().length];
    if (JobStatus.RUNNING == jobStatus) {
        tasksPerState[ExecutionState.RUNNING.ordinal()] = 5;
        tasksPerState[ExecutionState.FINISHED.ordinal()] = 2;
    }
    if (JobStatus.CANCELLING == jobStatus) {
        tasksPerState[ExecutionState.CANCELING.ordinal()] = 7;
    }

    int totalTasks = 0;
    for (int tasksInState : tasksPerState) {
        totalTasks += tasksInState;
    }

    final long startTime = System.currentTimeMillis();
    final long lastUpdateTime = System.currentTimeMillis();
    return new JobDetails(
            jobID, "test-job", startTime, -1, 0, jobStatus, lastUpdateTime, tasksPerState,
            totalTasks);
}
}
Loading

0 comments on commit 6a94c25

Please sign in to comment.