Skip to content

Commit 54ac32f

Browse files
Zakellymasteryhx
authored andcommitted
[FLINK-34516][checkpoint] Use new CheckpointingMode in flink-core everywhere
1 parent 8fac804 commit 54ac32f

File tree

49 files changed

+170
-133
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

49 files changed

+170
-133
lines changed

flink-clients/src/test/java/org/apache/flink/client/program/StreamContextEnvironmentTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,10 @@
2525
import org.apache.flink.configuration.ExecutionOptions;
2626
import org.apache.flink.configuration.PipelineOptions;
2727
import org.apache.flink.configuration.StateRecoveryOptions;
28+
import org.apache.flink.core.execution.CheckpointingMode;
2829
import org.apache.flink.core.execution.PipelineExecutorFactory;
2930
import org.apache.flink.core.execution.PipelineExecutorServiceLoader;
3031
import org.apache.flink.runtime.state.storage.JobManagerCheckpointStorage;
31-
import org.apache.flink.streaming.api.CheckpointingMode;
3232
import org.apache.flink.streaming.api.environment.CheckpointConfig;
3333
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
3434
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/FileSinkCompactionSwitchITCase.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.apache.flink.connector.file.sink.utils.IntegerFileSinkTestDataUtils.IntEncoder;
3535
import org.apache.flink.connector.file.sink.utils.IntegerFileSinkTestDataUtils.ModuloBucketAssigner;
3636
import org.apache.flink.connector.file.sink.utils.PartSizeAndCheckpointRollingPolicy;
37+
import org.apache.flink.core.execution.CheckpointingMode;
3738
import org.apache.flink.core.execution.SavepointFormatType;
3839
import org.apache.flink.core.fs.Path;
3940
import org.apache.flink.runtime.client.JobExecutionException;
@@ -48,7 +49,6 @@
4849
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
4950
import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage;
5051
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
51-
import org.apache.flink.streaming.api.CheckpointingMode;
5252
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
5353
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
5454
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/StreamingExecutionFileSinkITCase.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,11 @@
2525
import org.apache.flink.api.common.time.Time;
2626
import org.apache.flink.configuration.Configuration;
2727
import org.apache.flink.configuration.ExecutionOptions;
28+
import org.apache.flink.core.execution.CheckpointingMode;
2829
import org.apache.flink.runtime.jobgraph.JobGraph;
2930
import org.apache.flink.runtime.state.CheckpointListener;
3031
import org.apache.flink.runtime.state.FunctionInitializationContext;
3132
import org.apache.flink.runtime.state.FunctionSnapshotContext;
32-
import org.apache.flink.streaming.api.CheckpointingMode;
3333
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
3434
import org.apache.flink.streaming.api.datastream.DataStreamSink;
3535
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileSinkMigrationITCase.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.flink.api.common.state.ListStateDescriptor;
2626
import org.apache.flink.connector.file.sink.FileSink;
2727
import org.apache.flink.connector.file.sink.utils.IntegerFileSinkTestDataUtils;
28+
import org.apache.flink.core.execution.CheckpointingMode;
2829
import org.apache.flink.core.execution.SavepointFormatType;
2930
import org.apache.flink.core.fs.Path;
3031
import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -33,7 +34,6 @@
3334
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
3435
import org.apache.flink.runtime.state.FunctionInitializationContext;
3536
import org.apache.flink.runtime.state.FunctionSnapshotContext;
36-
import org.apache.flink.streaming.api.CheckpointingMode;
3737
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
3838
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
3939
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestJobFactory.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,8 @@
3333
import org.apache.flink.configuration.ConfigOption;
3434
import org.apache.flink.configuration.ConfigOptions;
3535
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
36+
import org.apache.flink.core.execution.CheckpointingMode;
3637
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
37-
import org.apache.flink.streaming.api.CheckpointingMode;
3838
import org.apache.flink.streaming.api.datastream.DataStream;
3939
import org.apache.flink.streaming.api.datastream.KeyedStream;
4040
import org.apache.flink.streaming.api.datastream.WindowedStream;

flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroBulkFormatITCase.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,8 @@
3232
import org.apache.flink.connector.testframe.junit.annotations.TestEnv;
3333
import org.apache.flink.connector.testframe.junit.annotations.TestSemantics;
3434
import org.apache.flink.connector.testframe.testsuites.SourceTestSuiteBase;
35+
import org.apache.flink.core.execution.CheckpointingMode;
3536
import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
36-
import org.apache.flink.streaming.api.CheckpointingMode;
3737
import org.apache.flink.table.data.GenericRowData;
3838
import org.apache.flink.table.data.RowData;
3939
import org.apache.flink.table.data.StringData;

flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/BootstrapTransformation.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.flink.api.java.functions.KeySelector;
2828
import org.apache.flink.api.java.operators.MapPartitionOperator;
2929
import org.apache.flink.configuration.Configuration;
30+
import org.apache.flink.core.execution.CheckpointingMode;
3031
import org.apache.flink.core.fs.Path;
3132
import org.apache.flink.core.memory.ManagedMemoryUseCase;
3233
import org.apache.flink.runtime.checkpoint.OperatorState;
@@ -39,7 +40,6 @@
3940
import org.apache.flink.state.api.output.operators.BroadcastStateBootstrapOperator;
4041
import org.apache.flink.state.api.output.partitioner.HashSelector;
4142
import org.apache.flink.state.api.output.partitioner.KeyGroupRangePartitioner;
42-
import org.apache.flink.streaming.api.CheckpointingMode;
4343
import org.apache.flink.streaming.api.graph.StreamConfig;
4444
import org.apache.flink.streaming.api.operators.StreamOperator;
4545

flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/StateBootstrapTransformation.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.flink.api.common.typeutils.TypeSerializer;
2626
import org.apache.flink.api.java.functions.KeySelector;
2727
import org.apache.flink.configuration.Configuration;
28+
import org.apache.flink.core.execution.CheckpointingMode;
2829
import org.apache.flink.core.fs.Path;
2930
import org.apache.flink.core.memory.ManagedMemoryUseCase;
3031
import org.apache.flink.runtime.checkpoint.OperatorState;
@@ -36,7 +37,6 @@
3637
import org.apache.flink.state.api.output.operators.BroadcastStateBootstrapOperator;
3738
import org.apache.flink.state.api.output.operators.GroupReduceOperator;
3839
import org.apache.flink.state.api.runtime.MutableConfig;
39-
import org.apache.flink.streaming.api.CheckpointingMode;
4040
import org.apache.flink.streaming.api.datastream.DataStream;
4141
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
4242
import org.apache.flink.streaming.api.graph.StreamConfig;

flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java

+22-19
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,12 @@
2929
import org.apache.flink.configuration.ReadableConfig;
3030
import org.apache.flink.configuration.StateRecoveryOptions;
3131
import org.apache.flink.configuration.description.InlineElement;
32+
import org.apache.flink.core.execution.CheckpointingMode;
3233
import org.apache.flink.core.fs.Path;
3334
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
3435
import org.apache.flink.runtime.state.CheckpointStorage;
3536
import org.apache.flink.runtime.state.StateBackend;
3637
import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage;
37-
import org.apache.flink.streaming.api.CheckpointingMode;
3838
import org.apache.flink.util.Preconditions;
3939

4040
import org.slf4j.Logger;
@@ -72,9 +72,9 @@ public class CheckpointConfig implements java.io.Serializable {
7272
* The default checkpoint mode: exactly once.
7373
*
7474
* @deprecated This field is no longer used. Please use {@link
75-
* ExecutionCheckpointingOptions.CHECKPOINTING_MODE} instead.
75+
* ExecutionCheckpointingOptions.CHECKPOINTING_CONSISTENCY_MODE} instead.
7676
*/
77-
public static final CheckpointingMode DEFAULT_MODE =
77+
public static final org.apache.flink.streaming.api.CheckpointingMode DEFAULT_MODE =
7878
ExecutionCheckpointingOptions.CHECKPOINTING_MODE.defaultValue();
7979

8080
/**
@@ -177,41 +177,42 @@ public boolean isCheckpointingEnabled() {
177177
* Gets the checkpointing mode (exactly-once vs. at-least-once).
178178
*
179179
* @return The checkpointing mode.
180-
* @deprecated Use {@link #getCheckpointMode} instead.
180+
* @deprecated Use {@link #getConsistencyMode} instead.
181181
*/
182182
@Deprecated
183-
public CheckpointingMode getCheckpointingMode() {
183+
public org.apache.flink.streaming.api.CheckpointingMode getCheckpointingMode() {
184184
return configuration.get(ExecutionCheckpointingOptions.CHECKPOINTING_MODE);
185185
}
186186

187187
/**
188188
* Sets the checkpointing mode (exactly-once vs. at-least-once).
189189
*
190190
* @param checkpointingMode The checkpointing mode.
191-
* @deprecated Use {@link #setCheckpointMode} instead.
191+
* @deprecated Use {@link #setConsistencyMode} instead.
192192
*/
193193
@Deprecated
194-
public void setCheckpointingMode(CheckpointingMode checkpointingMode) {
194+
public void setCheckpointingMode(
195+
org.apache.flink.streaming.api.CheckpointingMode checkpointingMode) {
195196
configuration.set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, checkpointingMode);
196197
}
197198

198199
/**
199-
* Gets the checkpointing mode (exactly-once vs. at-least-once).
200+
* Gets the checkpointing consistency mode (exactly-once vs. at-least-once).
200201
*
201202
* @return The checkpointing mode.
202203
*/
203-
public org.apache.flink.core.execution.CheckpointingMode getCheckpointMode() {
204-
return configuration.get(ExecutionCheckpointingOptions.CHECKPOINTING_MODE_V2);
204+
public CheckpointingMode getConsistencyMode() {
205+
return configuration.get(ExecutionCheckpointingOptions.CHECKPOINTING_CONSISTENCY_MODE);
205206
}
206207

207208
/**
208-
* Sets the checkpointing mode (exactly-once vs. at-least-once).
209+
* Sets the checkpointing consistency mode (exactly-once vs. at-least-once).
209210
*
210211
* @param checkpointingMode The checkpointing mode.
211212
*/
212-
public void setCheckpointMode(
213-
org.apache.flink.core.execution.CheckpointingMode checkpointingMode) {
214-
configuration.set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE_V2, checkpointingMode);
213+
public void setConsistencyMode(CheckpointingMode checkpointingMode) {
214+
configuration.set(
215+
ExecutionCheckpointingOptions.CHECKPOINTING_CONSISTENCY_MODE, checkpointingMode);
215216
}
216217

217218
/**
@@ -598,7 +599,8 @@ public boolean isExternalizedCheckpointsEnabled() {
598599
* embedded into the stream of data anymore.
599600
*
600601
* <p>Unaligned checkpoints can only be enabled if {@link
601-
* ExecutionCheckpointingOptions#CHECKPOINTING_MODE} is {@link CheckpointingMode#EXACTLY_ONCE}.
602+
* ExecutionCheckpointingOptions#CHECKPOINTING_CONSISTENCY_MODE} is {@link
603+
* CheckpointingMode#EXACTLY_ONCE}.
602604
*
603605
* @param enabled Flag to indicate whether unaligned are enabled.
604606
*/
@@ -616,7 +618,8 @@ public void enableUnalignedCheckpoints(boolean enabled) {
616618
* embedded into the stream of data anymore.
617619
*
618620
* <p>Unaligned checkpoints can only be enabled if {@link
619-
* ExecutionCheckpointingOptions#CHECKPOINTING_MODE} is {@link CheckpointingMode#EXACTLY_ONCE}.
621+
* ExecutionCheckpointingOptions#CHECKPOINTING_CONSISTENCY_MODE} is {@link
622+
* CheckpointingMode#EXACTLY_ONCE}.
620623
*/
621624
@PublicEvolving
622625
public void enableUnalignedCheckpoints() {
@@ -986,7 +989,7 @@ public InlineElement getDescription() {
986989

987990
/**
988991
* Sets all relevant options contained in the {@link ReadableConfig} such as e.g. {@link
989-
* ExecutionCheckpointingOptions#CHECKPOINTING_MODE}.
992+
* ExecutionCheckpointingOptions#CHECKPOINTING_CONSISTENCY_MODE}.
990993
*
991994
* <p>It will change the value of a setting only if a corresponding option was set in the {@code
992995
* configuration}. If a key is not present, the current value of a field will remain untouched.
@@ -995,8 +998,8 @@ public InlineElement getDescription() {
995998
*/
996999
public void configure(ReadableConfig configuration) {
9971000
configuration
998-
.getOptional(ExecutionCheckpointingOptions.CHECKPOINTING_MODE)
999-
.ifPresent(this::setCheckpointingMode);
1001+
.getOptional(ExecutionCheckpointingOptions.CHECKPOINTING_CONSISTENCY_MODE)
1002+
.ifPresent(this::setConsistencyMode);
10001003
configuration
10011004
.getOptional(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL)
10021005
.ifPresent(i -> this.setCheckpointInterval(i.toMillis()));

flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java

+13-11
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
import org.apache.flink.configuration.StateRecoveryOptions;
2828
import org.apache.flink.configuration.description.Description;
2929
import org.apache.flink.configuration.description.TextElement;
30-
import org.apache.flink.streaming.api.CheckpointingMode;
30+
import org.apache.flink.core.execution.CheckpointingMode;
3131

3232
import java.time.Duration;
3333

@@ -44,18 +44,20 @@ public class ExecutionCheckpointingOptions {
4444

4545
@Deprecated
4646
@Documentation.ExcludeFromDocumentation("Hidden for deprecatd.")
47-
public static final ConfigOption<CheckpointingMode> CHECKPOINTING_MODE =
48-
ConfigOptions.key("execution.checkpointing.mode")
49-
.enumType(CheckpointingMode.class)
50-
.defaultValue(CheckpointingMode.EXACTLY_ONCE)
51-
.withDescription("The checkpointing mode (exactly-once vs. at-least-once).");
47+
public static final ConfigOption<org.apache.flink.streaming.api.CheckpointingMode>
48+
CHECKPOINTING_MODE =
49+
ConfigOptions.key("execution.checkpointing.mode")
50+
.enumType(org.apache.flink.streaming.api.CheckpointingMode.class)
51+
.defaultValue(
52+
org.apache.flink.streaming.api.CheckpointingMode.EXACTLY_ONCE)
53+
.withDescription(
54+
"The checkpointing mode (exactly-once vs. at-least-once).");
5255

5356
public static final ConfigOption<org.apache.flink.core.execution.CheckpointingMode>
54-
CHECKPOINTING_MODE_V2 =
57+
CHECKPOINTING_CONSISTENCY_MODE =
5558
ConfigOptions.key("execution.checkpointing.mode")
56-
.enumType(org.apache.flink.core.execution.CheckpointingMode.class)
57-
.defaultValue(
58-
org.apache.flink.core.execution.CheckpointingMode.EXACTLY_ONCE)
59+
.enumType(CheckpointingMode.class)
60+
.defaultValue(CheckpointingMode.EXACTLY_ONCE)
5961
.withDescription(
6062
"The checkpointing mode (exactly-once vs. at-least-once).");
6163

@@ -201,7 +203,7 @@ public class ExecutionCheckpointingOptions {
201203
.linebreak()
202204
.text(
203205
"Unaligned checkpoints can only be enabled if %s is %s and if %s is 1",
204-
TextElement.code(CHECKPOINTING_MODE.key()),
206+
TextElement.code(CHECKPOINTING_CONSISTENCY_MODE.key()),
205207
TextElement.code(
206208
CheckpointingMode.EXACTLY_ONCE.toString()),
207209
TextElement.code(MAX_CONCURRENT_CHECKPOINTS.key()))

0 commit comments

Comments
 (0)