diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java index cc56eb3e69..e651811d30 100644 --- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java @@ -177,6 +177,9 @@ private void runScalingLogic(Context ctx, AutoscalerFlinkMetrics autoscalerMetri var collectedMetrics = metricsCollector.updateMetrics(ctx, stateStore); var jobTopology = collectedMetrics.getJobTopology(); + if (!jobTopology.isRunning()) { + return; + } var now = clock.instant(); var scalingTracking = getTrimmedScalingTracking(stateStore, ctx, now); diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java index 0a3fef3327..114ff92c95 100644 --- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java @@ -117,6 +117,10 @@ public CollectedMetricHistory updateMetrics( metricHistory.clear(); } var topology = getJobTopology(ctx, stateStore, jobDetailsInfo); + if (!topology.isRunning()) { + return new CollectedMetricHistory(topology, Collections.emptySortedMap(), jobRunningTs); + } + var stableTime = jobRunningTs.plus(conf.get(AutoScalerOptions.STABILIZATION_INTERVAL)); final boolean isStabilizing = now.isBefore(stableTime); @@ -223,19 +227,23 @@ protected JobTopology getJobTopology(JobDetailsInfo jobDetailsInfo) { var json = rawPlan.substring("RawJson{json='".length(), rawPlan.length() - "'}".length()); var metrics = new HashMap(); - var finished = new HashSet(); + var finishedTasks = new HashMap(); + var runningTasks = new HashMap(); jobDetailsInfo .getJobVertexInfos() .forEach( d -> { - if (d.getExecutionState() 
/**
 * Whether all tasks are running. Note that this is different from {@code JobStatus.RUNNING}:
 * the JobStatus becomes RUNNING as soon as the job starts scheduling, so it does not imply
 * that all tasks are running yet — especially when resources are insufficient or the job is
 * recovering from large state.
 */
@Getter private final boolean isRunning;
vertexInfo) { } public JobTopology(Set vertexInfo) { + this.isRunning = jobIsRunning(vertexInfo); Map> vertexOutputs = new HashMap<>(); vertexInfos = @@ -101,6 +109,13 @@ public void updateMaxParallelism(JobVertexID vertexID, int maxParallelism) { get(vertexID).updateMaxParallelism(maxParallelism); } + private boolean jobIsRunning(Set vertexInfos) { + // All vertices are running or finished, and at least one vertex is running. + return vertexInfos.stream() + .allMatch(vertexInfo -> vertexInfo.isRunning() || vertexInfo.isFinished()) + && vertexInfos.stream().anyMatch(VertexInfo::isRunning); + } + private List returnVerticesInTopologicalOrder() { List sorted = new ArrayList<>(vertexInfos.size()); @@ -135,8 +150,9 @@ private List returnVerticesInTopologicalOrder() { public static JobTopology fromJsonPlan( String jsonPlan, Map maxParallelismMap, - Map metrics, - Set finishedVertices) + Map finishedTasks, + Map runningTasks, + Map metrics) throws JsonProcessingException { ObjectNode plan = objectMapper.readValue(jsonPlan, ObjectNode.class); ArrayNode nodes = (ArrayNode) plan.get("nodes"); @@ -147,15 +163,18 @@ public static JobTopology fromJsonPlan( var vertexId = JobVertexID.fromHexString(node.get("id").asText()); var inputs = new HashMap(); var ioMetrics = metrics.get(vertexId); - var finished = finishedVertices.contains(vertexId); + var parallelism = node.get("parallelism").asInt(); vertexInfo.add( new VertexInfo( vertexId, inputs, - node.get("parallelism").asInt(), + parallelism, maxParallelismMap.get(vertexId), - finished, - finished ? IOMetrics.FINISHED_METRICS : ioMetrics)); + finishedTasks.get(vertexId), + runningTasks.get(vertexId), + parallelism == finishedTasks.get(vertexId) + ? 
// Number of subtasks of this vertex currently in FINISHED state.
private final int finishedTasks;

// Number of subtasks of this vertex currently in RUNNING state.
private final int runningTasks;

/**
 * Creates a VertexInfo with explicit per-state subtask counts.
 *
 * @param id the job vertex id
 * @param inputs inputs of this vertex, keyed by upstream vertex id
 * @param parallelism current parallelism of the vertex
 * @param maxParallelism max parallelism (also recorded as the original max parallelism)
 * @param finishedTasks number of subtasks in FINISHED state
 * @param runningTasks number of subtasks in RUNNING state
 * @param ioMetrics IO metrics of the vertex
 */
public VertexInfo(
        JobVertexID id,
        Map inputs,
        int parallelism,
        int maxParallelism,
        int finishedTasks,
        int runningTasks,
        IOMetrics ioMetrics) {
    this.id = id;
    this.inputs = inputs;
    this.parallelism = parallelism;
    this.maxParallelism = maxParallelism;
    // Remember the initial max parallelism so later updates can never raise it above the
    // original value (see updateMaxParallelism).
    this.originalMaxParallelism = maxParallelism;
    this.finishedTasks = finishedTasks;
    this.runningTasks = runningTasks;
    this.ioMetrics = ioMetrics;
}

/**
 * Backward-compatible constructor keeping the old boolean "finished" flag: a finished vertex
 * maps to all subtasks finished, otherwise all subtasks are assumed running.
 */
@VisibleForTesting
public VertexInfo(
        JobVertexID id,
        Map inputs,
        int parallelism,
        int maxParallelism,
        boolean finished,
        IOMetrics ioMetrics) {
    this(
            id,
            inputs,
            parallelism,
            maxParallelism,
            finished ? parallelism : 0,
            finished ? 0 : parallelism,
            ioMetrics);
}

/** Convenience constructor for a vertex whose subtasks are all running. */
@VisibleForTesting
public VertexInfo(
        JobVertexID id,
        Map inputs,
        int parallelism,
        int maxParallelism,
        IOMetrics ioMetrics) {
    this(id, inputs, parallelism, maxParallelism, 0, parallelism, ioMetrics);
}

/** Lowers the max parallelism, but never above the originally configured value. */
public void updateMaxParallelism(int maxParallelism) {
    setMaxParallelism(Math.min(originalMaxParallelism, maxParallelism));
}

/** @return true iff every subtask of this vertex is FINISHED. */
public boolean isFinished() {
    return finishedTasks == parallelism;
}

/**
 * @return true iff at least one subtask is RUNNING and every subtask is either RUNNING or
 *     FINISHED (i.e. no subtask is in any other state).
 */
public boolean isRunning() {
    return runningTasks > 0 && runningTasks + finishedTasks == parallelism;
}
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/topology/JobTopologyTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/topology/JobTopologyTest.java index f857361a8f..807aa3caaf 100644 --- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/topology/JobTopologyTest.java +++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/topology/JobTopologyTest.java @@ -27,7 +27,6 @@ import org.junit.jupiter.api.Test; -import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -68,6 +67,8 @@ public void testTopologyFromJson() throws JsonProcessingException { var vertices = new HashMap(); var maxParallelism = new HashMap(); + var finishedTasks = new HashMap(); + var runningTasks = new HashMap(); for (JobVertex vertex : jobGraph.getVertices()) { vertices.put(vertex.getName(), vertex.getID()); maxParallelism.put( @@ -75,11 +76,13 @@ public void testTopologyFromJson() throws JsonProcessingException { vertex.getMaxParallelism() != -1 ? vertex.getMaxParallelism() : SchedulerBase.getDefaultMaxParallelism(vertex)); + finishedTasks.put(vertex.getID(), 0); + runningTasks.put(vertex.getID(), vertex.getParallelism()); } JobTopology jobTopology = JobTopology.fromJsonPlan( - jsonPlan, maxParallelism, Map.of(), Collections.emptySet()); + jsonPlan, maxParallelism, finishedTasks, runningTasks, Map.of()); assertTrue(jobTopology.get(vertices.get("Sink: sink1")).getOutputs().isEmpty()); assertTrue(jobTopology.get(vertices.get("Sink: sink2")).getOutputs().isEmpty());