Skip to content

Commit

Permalink
[FLINK-34906] Only scale when all tasks are running
Browse files Browse the repository at this point in the history
  • Loading branch information
1996fanrui committed Mar 21, 2024
1 parent b584b08 commit 70d3c98
Show file tree
Hide file tree
Showing 7 changed files with 85 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,9 @@ private void runScalingLogic(Context ctx, AutoscalerFlinkMetrics autoscalerMetri

var collectedMetrics = metricsCollector.updateMetrics(ctx, stateStore);
var jobTopology = collectedMetrics.getJobTopology();
if (!jobTopology.isRunning()) {
return;
}

var now = clock.instant();
var scalingTracking = getTrimmedScalingTracking(stateStore, ctx, now);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,10 @@ public CollectedMetricHistory updateMetrics(
metricHistory.clear();
}
var topology = getJobTopology(ctx, stateStore, jobDetailsInfo);
if (!topology.isRunning()) {
return new CollectedMetricHistory(topology, Collections.emptySortedMap(), jobRunningTs);
}

var stableTime = jobRunningTs.plus(conf.get(AutoScalerOptions.STABILIZATION_INTERVAL));
final boolean isStabilizing = now.isBefore(stableTime);

Expand Down Expand Up @@ -223,19 +227,23 @@ protected JobTopology getJobTopology(JobDetailsInfo jobDetailsInfo) {
var json = rawPlan.substring("RawJson{json='".length(), rawPlan.length() - "'}".length());

var metrics = new HashMap<JobVertexID, IOMetrics>();
var finished = new HashSet<JobVertexID>();
var finishedTasks = new HashMap<JobVertexID, Integer>();
var runningTasks = new HashMap<JobVertexID, Integer>();
jobDetailsInfo
.getJobVertexInfos()
.forEach(
d -> {
if (d.getExecutionState() == ExecutionState.FINISHED) {
finished.add(d.getJobVertexID());
}
var tasksPerState = d.getTasksPerState();
finishedTasks.put(
d.getJobVertexID(), tasksPerState.get(ExecutionState.FINISHED));
runningTasks.put(
d.getJobVertexID(), tasksPerState.get(ExecutionState.RUNNING));
metrics.put(
d.getJobVertexID(), IOMetrics.from(d.getJobVertexMetrics()));
});

return JobTopology.fromJsonPlan(json, maxParallelismMap, metrics, finished);
return JobTopology.fromJsonPlan(
json, maxParallelismMap, finishedTasks, runningTasks, metrics);
}

private void updateKafkaSourceMaxParallelisms(Context ctx, JobID jobId, JobTopology topology)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,13 @@ public class JobTopology {

private static final ObjectMapper objectMapper = new ObjectMapper();

/**
* Whether all tasks are running, it's different with JobStatus.RUNNING. The JobStatus will be
* * RUNNING once job starts schedule, so it doesn't mean all tasks are running. Especially, when
* * the resource isn't enough or job recovers from large state.
*/
@Getter private final boolean isRunning;

@Getter private final Map<JobVertexID, VertexInfo> vertexInfos;
@Getter private final Set<JobVertexID> finishedVertices;
@Getter private final List<JobVertexID> verticesInTopologicalOrder;
Expand All @@ -60,6 +67,7 @@ public JobTopology(VertexInfo... vertexInfo) {
}

public JobTopology(Set<VertexInfo> vertexInfo) {
this.isRunning = jobIsRunning(vertexInfo);

Map<JobVertexID, Map<JobVertexID, ShipStrategy>> vertexOutputs = new HashMap<>();
vertexInfos =
Expand Down Expand Up @@ -101,6 +109,13 @@ public void updateMaxParallelism(JobVertexID vertexID, int maxParallelism) {
get(vertexID).updateMaxParallelism(maxParallelism);
}

private boolean jobIsRunning(Set<VertexInfo> vertexInfos) {
// All vertices are running or finished, and at least one vertex is running.
return vertexInfos.stream()
.allMatch(vertexInfo -> vertexInfo.isRunning() || vertexInfo.isFinished())
&& vertexInfos.stream().anyMatch(VertexInfo::isRunning);
}

private List<JobVertexID> returnVerticesInTopologicalOrder() {
List<JobVertexID> sorted = new ArrayList<>(vertexInfos.size());

Expand Down Expand Up @@ -135,8 +150,9 @@ private List<JobVertexID> returnVerticesInTopologicalOrder() {
public static JobTopology fromJsonPlan(
String jsonPlan,
Map<JobVertexID, Integer> maxParallelismMap,
Map<JobVertexID, IOMetrics> metrics,
Set<JobVertexID> finishedVertices)
Map<JobVertexID, Integer> finishedTasks,
Map<JobVertexID, Integer> runningTasks,
Map<JobVertexID, IOMetrics> metrics)
throws JsonProcessingException {
ObjectNode plan = objectMapper.readValue(jsonPlan, ObjectNode.class);
ArrayNode nodes = (ArrayNode) plan.get("nodes");
Expand All @@ -147,15 +163,18 @@ public static JobTopology fromJsonPlan(
var vertexId = JobVertexID.fromHexString(node.get("id").asText());
var inputs = new HashMap<JobVertexID, ShipStrategy>();
var ioMetrics = metrics.get(vertexId);
var finished = finishedVertices.contains(vertexId);
var parallelism = node.get("parallelism").asInt();
vertexInfo.add(
new VertexInfo(
vertexId,
inputs,
node.get("parallelism").asInt(),
parallelism,
maxParallelismMap.get(vertexId),
finished,
finished ? IOMetrics.FINISHED_METRICS : ioMetrics));
finishedTasks.get(vertexId),
runningTasks.get(vertexId),
parallelism == finishedTasks.get(vertexId)
? IOMetrics.FINISHED_METRICS
: ioMetrics));
if (node.has("inputs")) {
for (JsonNode input : node.get("inputs")) {
inputs.put(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@ public class VertexInfo {

private final int originalMaxParallelism;

private final boolean finished;
private final int finishedTasks;

private final int runningTasks;

private IOMetrics ioMetrics;

Expand All @@ -51,25 +53,45 @@ public VertexInfo(
Map<JobVertexID, ShipStrategy> inputs,
int parallelism,
int maxParallelism,
boolean finished,
int finishedTasks,
int runningTasks,
IOMetrics ioMetrics) {
this.id = id;
this.inputs = inputs;
this.parallelism = parallelism;
this.maxParallelism = maxParallelism;
this.originalMaxParallelism = maxParallelism;
this.finished = finished;
this.finishedTasks = finishedTasks;
this.runningTasks = runningTasks;
this.ioMetrics = ioMetrics;
}

@VisibleForTesting
public VertexInfo(
JobVertexID id,
Map<JobVertexID, ShipStrategy> inputs,
int parallelism,
int maxParallelism,
boolean finished,
IOMetrics ioMetrics) {
this(
id,
inputs,
parallelism,
maxParallelism,
finished ? parallelism : 0,
finished ? 0 : parallelism,
ioMetrics);
}

@VisibleForTesting
public VertexInfo(
JobVertexID id,
Map<JobVertexID, ShipStrategy> inputs,
int parallelism,
int maxParallelism,
IOMetrics ioMetrics) {
this(id, inputs, parallelism, maxParallelism, false, ioMetrics);
this(id, inputs, parallelism, maxParallelism, 0, parallelism, ioMetrics);
}

@VisibleForTesting
Expand All @@ -84,4 +106,12 @@ public VertexInfo(
public void updateMaxParallelism(int maxParallelism) {
setMaxParallelism(Math.min(originalMaxParallelism, maxParallelism));
}

public boolean isFinished() {
return finishedTasks == parallelism;
}

public boolean isRunning() {
return runningTasks > 0 && runningTasks + finishedTasks == parallelism;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -113,10 +113,10 @@ public void testJobTopologyParsingFromJobDetails() throws Exception {
+ " \"CANCELING\": 0,\n"
+ " \"CANCELED\": 0,\n"
+ " \"RECONCILING\": 0,\n"
+ " \"RUNNING\": 2,\n"
+ " \"RUNNING\": 0,\n"
+ " \"FAILED\": 0,\n"
+ " \"CREATED\": 0,\n"
+ " \"FINISHED\": 0\n"
+ " \"FINISHED\": 2\n"
+ " },\n"
+ " \"metrics\": {\n"
+ " \"read-bytes\": 0,\n"
Expand Down Expand Up @@ -305,7 +305,7 @@ protected Map<String, FlinkMetric> getFilteredVertexMetricNames(
// Mark source finished, should not be queried again
t2 =
new JobTopology(
new VertexInfo(source2, Map.of(), 1, 1, true, null),
new VertexInfo(source2, Map.of(), 1, 1, 1, 0, null),
new VertexInfo(sink, Map.of(source2, REBALANCE), 1, 1));
collector.queryFilteredMetricNames(context, t2);
assertEquals(3, metricNameQueryCounter.get(source));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@

import org.junit.jupiter.api.Test;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

Expand Down Expand Up @@ -68,18 +67,22 @@ public void testTopologyFromJson() throws JsonProcessingException {

var vertices = new HashMap<String, JobVertexID>();
var maxParallelism = new HashMap<JobVertexID, Integer>();
var finishedTasks = new HashMap<JobVertexID, Integer>();
var runningTasks = new HashMap<JobVertexID, Integer>();
for (JobVertex vertex : jobGraph.getVertices()) {
vertices.put(vertex.getName(), vertex.getID());
maxParallelism.put(
vertex.getID(),
vertex.getMaxParallelism() != -1
? vertex.getMaxParallelism()
: SchedulerBase.getDefaultMaxParallelism(vertex));
finishedTasks.put(vertex.getID(), 0);
runningTasks.put(vertex.getID(), vertex.getParallelism());
}

JobTopology jobTopology =
JobTopology.fromJsonPlan(
jsonPlan, maxParallelism, Map.of(), Collections.emptySet());
jsonPlan, maxParallelism, finishedTasks, runningTasks, Map.of());

assertTrue(jobTopology.get(vertices.get("Sink: sink1")).getOutputs().isEmpty());
assertTrue(jobTopology.get(vertices.get("Sink: sink2")).getOutputs().isEmpty());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,9 @@ void testMemoryTuning() {

JobTopology jobTopology =
new JobTopology(
new VertexInfo(jobVertex1, Map.of(), 50, 1000, false, null),
new VertexInfo(jobVertex1, Map.of(), 50, 1000, 0, 50, null),
new VertexInfo(
jobVertex2, Map.of(jobVertex1, REBALANCE), 50, 1000, false, null));
jobVertex2, Map.of(jobVertex1, REBALANCE), 50, 1000, 0, 50, null));

Map<JobVertexID, ScalingSummary> scalingSummaries =
Map.of(
Expand Down

0 comments on commit 70d3c98

Please sign in to comment.