Skip to content

Commit

Permalink
Only scale when all tasks are running for flink manager fetcher
Browse files Browse the repository at this point in the history
  • Loading branch information
1996fanrui committed Mar 28, 2024
1 parent 2bb19a0 commit f010cd7
Showing 1 changed file with 7 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.autoscaler.JobAutoScalerContext;
import org.apache.flink.autoscaler.standalone.JobListFetcher;
import org.apache.flink.autoscaler.standalone.StandaloneAutoscalerExecutor;
import org.apache.flink.autoscaler.utils.JobStatusUtils;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
Expand All @@ -30,6 +31,7 @@
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.JobMessageParameters;
import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;

import com.shopee.di.fm.common.dto.InstanceDTO;
Expand Down Expand Up @@ -121,7 +123,11 @@ private List<JobAutoScalerContext<Long>> fetchJobContextForCurrentInstance(
try (var restClusterClient = getRestClient(new Configuration(), restServerAddress)) {
final Collection<JobStatusMessage> jobStatusMessages =
restClusterClient
.listJobs()
.sendRequest(
JobsOverviewHeaders.getInstance(),
EmptyMessageParameters.getInstance(),
EmptyRequestBody.getInstance())
.thenApply(JobStatusUtils::toJobStatusMessage)
.get(restClientTimeout.toSeconds(), TimeUnit.SECONDS);
if (jobStatusMessages.size() > 1) {
LOG.warn(
Expand Down

0 comments on commit f010cd7

Please sign in to comment.