Skip to content

Commit 49369fb

Browse files
committed
[FLINK-38748][runtime] Enhance the switching logic of AdaptiveScheduler from Restarting to CreatingExecutionGraph and accelerate the transition.
1 parent bb2b1db commit 49369fb

File tree

3 files changed

+31
-4
lines changed

3 files changed

+31
-4
lines changed

flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Restarting.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,8 @@ void onGloballyTerminalState(JobStatus globallyTerminalState) {
114114
}
115115

116116
private void goToSubsequentState() {
117-
if (availableParallelismNotChanged(restartWithParallelism)) {
117+
if (availableParallelismNotChanged(restartWithParallelism)
118+
|| context.hasDesiredResources()) {
118119
context.goToCreatingExecutionGraph(getExecutionGraph());
119120
} else {
120121
context.goToWaitingForResources(getExecutionGraph());
@@ -163,6 +164,13 @@ interface Context
163164
* slots.
164165
*/
165166
Optional<VertexParallelism> getAvailableVertexParallelism();
167+
168+
/**
169+
* Checks whether we have the desired resources.
170+
*
171+
* @return {@code true} if we have enough resources; otherwise {@code false}
172+
*/
173+
boolean hasDesiredResources();
166174
}
167175

168176
static class Factory implements StateFactory<Restarting> {

flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/MockRestartingContext.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@ class MockRestartingContext extends MockStateWithExecutionGraphContext
5151

5252
@Nullable private VertexParallelism availableVertexParallelism;
5353

54+
private boolean hasDesiredResources = false;
55+
5456
public void setExpectCancelling(Consumer<ExecutingTest.CancellingArguments> asserter) {
5557
cancellingStateValidator.expectInput(asserter);
5658
}
@@ -68,6 +70,15 @@ public void setAvailableVertexParallelism(
6870
this.availableVertexParallelism = availableVertexParallelism;
6971
}
7072

73+
public void setHasDesiredResources(boolean hasDesiredResources) {
74+
this.hasDesiredResources = hasDesiredResources;
75+
}
76+
77+
@Override
78+
public boolean hasDesiredResources() {
79+
return hasDesiredResources;
80+
}
81+
7182
@Override
7283
public void goToCanceling(
7384
ExecutionGraph executionGraph,

flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/RestartingTest.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.junit.jupiter.params.ParameterizedTest;
3131
import org.junit.jupiter.params.provider.Arguments;
3232
import org.junit.jupiter.params.provider.MethodSource;
33+
import org.junit.jupiter.params.provider.ValueSource;
3334
import org.slf4j.Logger;
3435
import org.slf4j.LoggerFactory;
3536

@@ -76,8 +77,10 @@ public void testTransitionToSubsequentStateWhenCancellationComplete(
7677
}
7778
}
7879

79-
@Test
80-
public void testTransitionToSubsequentStateWhenResourceChanged() throws Exception {
80+
@ParameterizedTest
81+
@ValueSource(booleans = {true, false})
82+
public void testTransitionToSubsequentStateWhenResourceChanged(boolean hasDesiredResources)
83+
throws Exception {
8184
try (MockRestartingContext ctx = new MockRestartingContext()) {
8285
JobVertexID jobVertexId = new JobVertexID();
8386
VertexParallelism availableParallelism =
@@ -86,8 +89,13 @@ public void testTransitionToSubsequentStateWhenResourceChanged() throws Exceptio
8689
new VertexParallelism(singletonMap(jobVertexId, 2));
8790

8891
ctx.setAvailableVertexParallelism(availableParallelism);
92+
ctx.setHasDesiredResources(hasDesiredResources);
8993
Restarting restarting = createRestartingState(ctx, requiredParallelismForForcedRestart);
90-
ctx.setExpectWaitingForResources();
94+
if (hasDesiredResources) {
95+
ctx.setExpectCreatingExecutionGraph();
96+
} else {
97+
ctx.setExpectWaitingForResources();
98+
}
9199
restarting.onGloballyTerminalState(JobStatus.CANCELED);
92100
}
93101
}

0 commit comments

Comments
 (0)