Skip to content

Commit 6bc3d6b

Browse files
Zakellymasteryhx
authored andcommitted
[FLINK-34516][checkpoint] Use new CheckpointingMode in flink-core in scala
1 parent 54ac32f commit 6bc3d6b

File tree

8 files changed

+43
-14
lines changed

8 files changed

+43
-14
lines changed

flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala

+36-7
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,10 @@ import org.apache.flink.api.java.typeutils.ResultTypeQueryable
3030
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
3131
import org.apache.flink.api.scala.ClosureCleaner
3232
import org.apache.flink.configuration.{Configuration, ReadableConfig}
33-
import org.apache.flink.core.execution.{JobClient, JobListener}
33+
import org.apache.flink.core.execution.{CheckpointingMode, JobClient, JobListener}
3434
import org.apache.flink.core.fs.Path
3535
import org.apache.flink.runtime.state.StateBackend
36-
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
36+
import org.apache.flink.streaming.api.TimeCharacteristic
3737
import org.apache.flink.streaming.api.environment.{CheckpointConfig, StreamExecutionEnvironment => JavaEnv}
3838
import org.apache.flink.streaming.api.functions.source._
3939
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
@@ -202,12 +202,39 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) extends AutoCloseable {
202202
@PublicEvolving
203203
def enableCheckpointing(
204204
interval: Long,
205-
mode: CheckpointingMode,
205+
mode: org.apache.flink.streaming.api.CheckpointingMode,
206206
force: Boolean): StreamExecutionEnvironment = {
207207
javaEnv.enableCheckpointing(interval, mode, force)
208208
this
209209
}
210210

211+
/**
212+
* Enables checkpointing for the streaming job. The distributed state of the streaming dataflow
213+
* will be periodically snapshotted. In case of a failure, the streaming dataflow will be
214+
* restarted from the latest completed checkpoint.
215+
*
216+
* The job draws checkpoints periodically, in the given interval. The system uses the given
217+
* [[org.apache.flink.streaming.api.CheckpointingMode]] for the checkpointing ("exactly once" vs
218+
* "at least once"). The state will be stored in the configured state backend.
219+
*
220+
* NOTE: Checkpointing iterative streaming dataflows in not properly supported at the moment. For
221+
* that reason, iterative jobs will not be started if used with enabled checkpointing.
222+
*
223+
* @param interval
224+
* Time interval between state checkpoints in milliseconds.
225+
* @param mode
226+
* The checkpointing mode, selecting between "exactly once" and "at least once" guarantees.
227+
* @deprecated
228+
* Use [[enableCheckpointing(Long, CheckpointingMode)]] instead.
229+
*/
230+
@deprecated
231+
def enableCheckpointing(
232+
interval: Long,
233+
mode: org.apache.flink.streaming.api.CheckpointingMode): StreamExecutionEnvironment = {
234+
javaEnv.enableCheckpointing(interval, mode)
235+
this
236+
}
237+
211238
/**
212239
* Enables checkpointing for the streaming job. The distributed state of the streaming dataflow
213240
* will be periodically snapshotted. In case of a failure, the streaming dataflow will be
@@ -218,8 +245,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) extends AutoCloseable {
218245
* be stored in the configured state backend.
219246
*
220247
* NOTE: Checkpointing iterative streaming dataflows in not properly supported at the moment. For
221-
* that reason, iterative jobs will not be started if used with enabled checkpointing. To override
222-
* this mechanism, use the [[enableCheckpointing(long, CheckpointingMode, boolean)]] method.
248+
* that reason, iterative jobs will not be started if used with enabled checkpointing.
223249
*
224250
* @param interval
225251
* Time interval between state checkpoints in milliseconds.
@@ -241,8 +267,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) extends AutoCloseable {
241267
* backend.
242268
*
243269
* NOTE: Checkpointing iterative streaming dataflows in not properly supported at the moment. For
244-
* that reason, iterative jobs will not be started if used with enabled checkpointing. To override
245-
* this mechanism, use the [[enableCheckpointing(long, CheckpointingMode, boolean)]] method.
270+
* that reason, iterative jobs will not be started if used with enabled checkpointing.
246271
*
247272
* @param interval
248273
* Time interval between state checkpoints in milliseconds.
@@ -266,8 +291,12 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) extends AutoCloseable {
266291
this
267292
}
268293

294+
/** @deprecated Use [[getCheckpointingConsistencyMode()]] instead. */
295+
@deprecated
269296
def getCheckpointingMode = javaEnv.getCheckpointingMode()
270297

298+
def getCheckpointingConsistencyMode = javaEnv.getCheckpointingConsistencyMode()
299+
271300
/**
272301
* Sets the state backend that describes how to store operator. It defines the data structures
273302
* that hold state during execution (for example hash tables, RocksDB, or other data stores).

flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowAggregateITCase.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ package org.apache.flink.table.planner.runtime.stream.sql
1919

2020
import org.apache.flink.api.common.restartstrategy.RestartStrategies
2121
import org.apache.flink.api.scala._
22-
import org.apache.flink.streaming.api.CheckpointingMode
22+
import org.apache.flink.core.execution.CheckpointingMode
2323
import org.apache.flink.table.api._
2424
import org.apache.flink.table.api.bridge.scala._
2525
import org.apache.flink.table.api.config.OptimizerConfigOptions

flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowDeduplicateITCase.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ package org.apache.flink.table.planner.runtime.stream.sql
1919

2020
import org.apache.flink.api.common.restartstrategy.RestartStrategies
2121
import org.apache.flink.api.scala._
22-
import org.apache.flink.streaming.api.CheckpointingMode
22+
import org.apache.flink.core.execution.CheckpointingMode
2323
import org.apache.flink.table.api.bridge.scala._
2424
import org.apache.flink.table.planner.factories.TestValuesTableFactory
2525
import org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions.ConcatDistinctAggFunction

flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowDistinctAggregateITCase.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ package org.apache.flink.table.planner.runtime.stream.sql
1919

2020
import org.apache.flink.api.common.restartstrategy.RestartStrategies
2121
import org.apache.flink.api.scala._
22-
import org.apache.flink.streaming.api.CheckpointingMode
22+
import org.apache.flink.core.execution.CheckpointingMode
2323
import org.apache.flink.table.api.bridge.scala.tableConversions
2424
import org.apache.flink.table.api.config.OptimizerConfigOptions
2525
import org.apache.flink.table.planner.factories.TestValuesTableFactory

flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowJoinITCase.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ package org.apache.flink.table.planner.runtime.stream.sql
1919

2020
import org.apache.flink.api.common.restartstrategy.RestartStrategies
2121
import org.apache.flink.api.scala._
22-
import org.apache.flink.streaming.api.CheckpointingMode
22+
import org.apache.flink.core.execution.CheckpointingMode
2323
import org.apache.flink.table.api.bridge.scala._
2424
import org.apache.flink.table.planner.factories.TestValuesTableFactory
2525
import org.apache.flink.table.planner.runtime.utils.{FailingCollectionSource, StreamingWithStateTestBase, TestData, TestingAppendSink}

flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowRankITCase.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ package org.apache.flink.table.planner.runtime.stream.sql
1919

2020
import org.apache.flink.api.common.restartstrategy.RestartStrategies
2121
import org.apache.flink.api.scala._
22-
import org.apache.flink.streaming.api.CheckpointingMode
22+
import org.apache.flink.core.execution.CheckpointingMode
2323
import org.apache.flink.table.api.bridge.scala._
2424
import org.apache.flink.table.planner.factories.TestValuesTableFactory
2525
import org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions.ConcatDistinctAggFunction

flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowTableFunctionITCase.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
package org.apache.flink.table.planner.runtime.stream.sql
1919

2020
import org.apache.flink.api.common.restartstrategy.RestartStrategies
21-
import org.apache.flink.streaming.api.CheckpointingMode
21+
import org.apache.flink.core.execution.CheckpointingMode
2222
import org.apache.flink.table.api.bridge.scala._
2323
import org.apache.flink.table.planner.factories.TestValuesTableFactory
2424
import org.apache.flink.table.planner.runtime.utils.{FailingCollectionSource, StreamingWithStateTestBase, TestData, TestingAppendSink}

flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamingWithStateTestBase.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,8 @@ import org.apache.flink.api.common.typeinfo.{TypeInformation, Types}
2222
import org.apache.flink.api.common.typeutils.CompositeType
2323
import org.apache.flink.configuration.{CheckpointingOptions, Configuration}
2424
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
25+
import org.apache.flink.core.execution.CheckpointingMode
2526
import org.apache.flink.runtime.state.memory.MemoryStateBackend
26-
import org.apache.flink.streaming.api.CheckpointingMode
2727
import org.apache.flink.streaming.api.functions.source.FromElementsFunction
2828
import org.apache.flink.streaming.api.scala.DataStream
2929
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

0 commit comments

Comments
 (0)