Skip to content

Commit 1e7525f

Browse files
Poorvankbhatiafapaul
authored andcommitted
[FLINK-32695] [Tests] Migrated ChangelogStateBackendLoadingTest to Source V2.
1 parent e2a2755 commit 1e7525f

File tree

2 files changed

+15
-12
lines changed

2 files changed

+15
-12
lines changed

flink-state-backends/flink-statebackend-changelog/pom.xml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,13 @@ under the License.
112112
<scope>test</scope>
113113
</dependency>
114114

115+
<dependency>
116+
<groupId>org.apache.flink</groupId>
117+
<artifactId>flink-test-utils-connector</artifactId>
118+
<version>${project.version}</version>
119+
<scope>test</scope>
120+
</dependency>
121+
115122
</dependencies>
116123

117124
</project>

flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogStateBackendLoadingTest.java

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.flink.state.changelog;
2020

2121
import org.apache.flink.api.common.JobID;
22+
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
2223
import org.apache.flink.api.common.functions.FlatMapFunction;
2324
import org.apache.flink.configuration.Configuration;
2425
import org.apache.flink.configuration.IllegalConfigurationException;
@@ -43,8 +44,8 @@
4344
import org.apache.flink.state.rocksdb.EmbeddedRocksDBStateBackend;
4445
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
4546
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
46-
import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction;
4747
import org.apache.flink.streaming.api.graph.StreamGraph;
48+
import org.apache.flink.test.util.source.AbstractTestSource;
4849
import org.apache.flink.util.Collector;
4950
import org.apache.flink.util.TernaryBoolean;
5051

@@ -262,19 +263,12 @@ private void testLoadingStateBackend(
262263

263264
private StreamExecutionEnvironment getEnvironment() {
264265
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
265-
SourceFunction<Integer> srcFun =
266-
new SourceFunction<Integer>() {
267-
private static final long serialVersionUID = 1L;
268-
269-
@Override
270-
public void run(SourceContext<Integer> ctx) throws Exception {}
271-
272-
@Override
273-
public void cancel() {}
274-
};
275266

276267
SingleOutputStreamOperator<Object> operator =
277-
env.addSource(srcFun)
268+
env.fromSource(
269+
new EmptyTestSource(),
270+
WatermarkStrategy.noWatermarks(),
271+
"EmptyTestSource")
278272
.flatMap(
279273
new FlatMapFunction<Integer, Object>() {
280274

@@ -288,6 +282,8 @@ public void flatMap(Integer value, Collector<Object> out)
288282
return env;
289283
}
290284

285+
private static class EmptyTestSource extends AbstractTestSource<Integer> {}
286+
291287
private void assertStateBackendAndChangelogInStreamGraphAndJobGraph(
292288
StreamExecutionEnvironment env, TernaryBoolean isChangelogEnabled) throws Exception {
293289
assertEquals(isChangelogEnabled, env.isChangelogStateBackendEnabled());

0 commit comments

Comments
 (0)