Skip to content

Commit ec1311c

Browse files
JunRuiLeezhuzhurk
authored andcommitted
[FLINK-33982][core] Introduce new config options for Batch Job Recovery
This closes apache#24026.
1 parent f38d8ca commit ec1311c

File tree

4 files changed

+130
-0
lines changed

4 files changed

+130
-0
lines changed

docs/layouts/shortcodes/generated/batch_execution_configuration.html

+18
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,24 @@
3838
<td>Integer</td>
3939
<td>The lower bound of allowed parallelism to set adaptively if <code class="highlighter-rouge">jobmanager.scheduler</code> has been set to <code class="highlighter-rouge">AdaptiveBatch</code></td>
4040
</tr>
41+
<tr>
42+
<td><h5>execution.batch.job-recovery.enabled</h5></td>
43+
<td style="word-wrap: break-word;">false</td>
44+
<td>Boolean</td>
45+
<td>A flag to enable or disable the job recovery. If enabled, batch jobs can resume with previously generated intermediate results after job master restarts due to failures, thereby preserving the progress.</td>
46+
</tr>
47+
<tr>
48+
<td><h5>execution.batch.job-recovery.previous-worker.recovery.timeout</h5></td>
49+
<td style="word-wrap: break-word;">30 s</td>
50+
<td>Duration</td>
51+
<td>The timeout for a new job master to wait for the previous worker to reconnect.A reconnected worker will transmit the details of its produced intermediate results to the new job master, enabling the job master to reuse these results.</td>
52+
</tr>
53+
<tr>
54+
<td><h5>execution.batch.job-recovery.snapshot.min-pause</h5></td>
55+
<td style="word-wrap: break-word;">3 min</td>
56+
<td>Duration</td>
57+
<td>The minimal pause between snapshots taken by operator coordinator or other components. It is used to avoid performance degradation due to excessive snapshot frequency.</td>
58+
</tr>
4159
<tr>
4260
<td><h5>execution.batch.speculative.block-slow-node-duration</h5></td>
4361
<td style="word-wrap: break-word;">1 min</td>

docs/layouts/shortcodes/generated/expert_scheduling_section.html

+30
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,24 @@
3838
<td>Integer</td>
3939
<td>The lower bound of allowed parallelism to set adaptively if <code class="highlighter-rouge">jobmanager.scheduler</code> has been set to <code class="highlighter-rouge">AdaptiveBatch</code></td>
4040
</tr>
41+
<tr>
42+
<td><h5>execution.batch.job-recovery.enabled</h5></td>
43+
<td style="word-wrap: break-word;">false</td>
44+
<td>Boolean</td>
45+
<td>A flag to enable or disable the job recovery. If enabled, batch jobs can resume with previously generated intermediate results after job master restarts due to failures, thereby preserving the progress.</td>
46+
</tr>
47+
<tr>
48+
<td><h5>execution.batch.job-recovery.previous-worker.recovery.timeout</h5></td>
49+
<td style="word-wrap: break-word;">30 s</td>
50+
<td>Duration</td>
51+
<td>The timeout for a new job master to wait for the previous worker to reconnect.A reconnected worker will transmit the details of its produced intermediate results to the new job master, enabling the job master to reuse these results.</td>
52+
</tr>
53+
<tr>
54+
<td><h5>execution.batch.job-recovery.snapshot.min-pause</h5></td>
55+
<td style="word-wrap: break-word;">3 min</td>
56+
<td>Duration</td>
57+
<td>The minimal pause between snapshots taken by operator coordinator or other components. It is used to avoid performance degradation due to excessive snapshot frequency.</td>
58+
</tr>
4159
<tr>
4260
<td><h5>execution.batch.speculative.block-slow-node-duration</h5></td>
4361
<td style="word-wrap: break-word;">1 min</td>
@@ -62,6 +80,18 @@
6280
<td>Boolean</td>
6381
<td>Whether to convert all PIPELINE edges to BLOCKING when apply fine-grained resource management in batch jobs.</td>
6482
</tr>
83+
<tr>
84+
<td><h5>job-event.store.write-buffer.flush-interval</h5></td>
85+
<td style="word-wrap: break-word;">1 s</td>
86+
<td>Duration</td>
87+
<td>The flush interval of JobEventStore write buffers. Buffer contents will be flushed to external file system regularly with regard to this value.</td>
88+
</tr>
89+
<tr>
90+
<td><h5>job-event.store.write-buffer.size</h5></td>
91+
<td style="word-wrap: break-word;">1 mb</td>
92+
<td>MemorySize</td>
93+
<td>The size of the write buffer of JobEventStore. The content will be flushed to external file system once the buffer is full</td>
94+
</tr>
6595
<tr>
6696
<td><h5>jobmanager.adaptive-scheduler.min-parallelism-increase</h5></td>
6797
<td style="word-wrap: break-word;">1</td>

flink-core/src/main/java/org/apache/flink/configuration/BatchExecutionOptions.java

+29
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,35 @@ public class BatchExecutionOptions {
148148
.withDescription(
149149
"Controls how long an detected slow node should be blocked for.");
150150

151+
@Documentation.Section({Documentation.Sections.EXPERT_SCHEDULING})
152+
public static final ConfigOption<Boolean> JOB_RECOVERY_ENABLED =
153+
key("execution.batch.job-recovery.enabled")
154+
.booleanType()
155+
.defaultValue(false)
156+
.withDescription(
157+
"A flag to enable or disable the job recovery. If enabled, batch jobs "
158+
+ "can resume with previously generated intermediate results "
159+
+ "after job master restarts due to failures, thereby preserving the progress.");
160+
161+
@Documentation.Section({Documentation.Sections.EXPERT_SCHEDULING})
162+
public static final ConfigOption<Duration> JOB_RECOVERY_PREVIOUS_WORKER_RECOVERY_TIMEOUT =
163+
key("execution.batch.job-recovery.previous-worker.recovery.timeout")
164+
.durationType()
165+
.defaultValue(Duration.ofSeconds(30))
166+
.withDescription(
167+
"The timeout for a new job master to wait for the previous worker to reconnect."
168+
+ "A reconnected worker will transmit the details of its produced intermediate "
169+
+ "results to the new job master, enabling the job master to reuse these results.");
170+
171+
@Documentation.Section({Documentation.Sections.EXPERT_SCHEDULING})
172+
public static final ConfigOption<Duration> JOB_RECOVERY_SNAPSHOT_MIN_PAUSE =
173+
key("execution.batch.job-recovery.snapshot.min-pause")
174+
.durationType()
175+
.defaultValue(Duration.ofMinutes(3))
176+
.withDescription(
177+
"The minimal pause between snapshots taken by operator coordinator or other components. "
178+
+ "It is used to avoid performance degradation due to excessive snapshot frequency.");
179+
151180
private BatchExecutionOptions() {
152181
throw new UnsupportedOperationException("This class should never be instantiated.");
153182
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.configuration;
20+
21+
import org.apache.flink.annotation.PublicEvolving;
22+
import org.apache.flink.annotation.docs.Documentation;
23+
24+
import java.time.Duration;
25+
26+
import static org.apache.flink.configuration.ConfigOptions.key;
27+
28+
/** Configuration options for the job events. */
29+
@PublicEvolving
30+
public class JobEventStoreOptions {
31+
32+
@Documentation.Section({Documentation.Sections.EXPERT_SCHEDULING})
33+
public static final ConfigOption<MemorySize> WRITE_BUFFER_SIZE =
34+
key("job-event.store.write-buffer.size")
35+
.memoryType()
36+
.defaultValue(MemorySize.parse("1MB"))
37+
.withDescription(
38+
"The size of the write buffer of JobEventStore. "
39+
+ "The content will be flushed to external file system once the buffer is full");
40+
41+
@Documentation.Section({Documentation.Sections.EXPERT_SCHEDULING})
42+
public static final ConfigOption<Duration> FLUSH_INTERVAL =
43+
key("job-event.store.write-buffer.flush-interval")
44+
.durationType()
45+
.defaultValue(Duration.ofSeconds(1))
46+
.withDescription(
47+
"The flush interval of JobEventStore write buffers. Buffer contents will "
48+
+ "be flushed to external file system regularly with regard to this value.");
49+
50+
private JobEventStoreOptions() {
51+
throw new UnsupportedOperationException("This class should never be instantiated.");
52+
}
53+
}

0 commit comments

Comments
 (0)