Skip to content

Commit 7d390a9

Browse files
authored
[FLINK-34941][checkpoint] Add CollectIteratorAssert#matchesRecordsFromSource and SinkTestSuiteBase#checkResultWithSemantic with former signature
1 parent 05b27be commit 7d390a9

File tree

3 files changed

+42
-1
lines changed

3 files changed

+42
-1
lines changed

flink-streaming-java/src/main/java/org/apache/flink/streaming/api/CheckpointingMode.java

+13-1
Original file line numberDiff line numberDiff line change
@@ -77,5 +77,17 @@ public enum CheckpointingMode {
7777
* scenarios, where a sustained very-low latency (such as few milliseconds) is needed, and where
7878
* occasional duplicate messages (on recovery) do not matter.
7979
*/
80-
AT_LEAST_ONCE
80+
AT_LEAST_ONCE;
81+
82+
public static org.apache.flink.core.execution.CheckpointingMode convertToCheckpointingMode(
83+
org.apache.flink.streaming.api.CheckpointingMode semantic) {
84+
switch (semantic) {
85+
case EXACTLY_ONCE:
86+
return org.apache.flink.core.execution.CheckpointingMode.EXACTLY_ONCE;
87+
case AT_LEAST_ONCE:
88+
return org.apache.flink.core.execution.CheckpointingMode.AT_LEAST_ONCE;
89+
default:
90+
throw new IllegalArgumentException("Unsupported semantic: " + semantic);
91+
}
92+
}
8193
}

flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SinkTestSuiteBase.java

+15
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@
8787
import static org.apache.flink.runtime.testutils.CommonTestUtils.waitForAllTaskRunning;
8888
import static org.apache.flink.runtime.testutils.CommonTestUtils.waitForJobStatus;
8989
import static org.apache.flink.runtime.testutils.CommonTestUtils.waitUntilCondition;
90+
import static org.apache.flink.streaming.api.CheckpointingMode.convertToCheckpointingMode;
9091
import static org.apache.flink.util.Preconditions.checkNotNull;
9192

9293
/**
@@ -513,6 +514,20 @@ protected void checkResultWithSemantic(
513514
});
514515
}
515516

517+
/**
518+
* This method is required for downstream projects e.g. Flink connectors extending this test for
519+
* the case when there should be supported Flink versions below 1.20. Could be removed together
520+
* with dropping support for Flink 1.19.
521+
*/
522+
@Deprecated
523+
protected void checkResultWithSemantic(
524+
ExternalSystemDataReader<T> reader,
525+
List<T> testData,
526+
org.apache.flink.streaming.api.CheckpointingMode semantic)
527+
throws Exception {
528+
checkResultWithSemantic(reader, testData, convertToCheckpointingMode(semantic));
529+
}
530+
516531
/** Compare the metrics. */
517532
private boolean compareSinkMetrics(
518533
MetricQuerier metricQuerier,

flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/utils/CollectIteratorAssert.java

+14
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727
import java.util.LinkedList;
2828
import java.util.List;
2929

30+
import static org.apache.flink.streaming.api.CheckpointingMode.convertToCheckpointingMode;
31+
3032
/**
3133
* This assertion used to compare records in the collect iterator to the target test data with
3234
* different semantic(AT_LEAST_ONCE, EXACTLY_ONCE).
@@ -51,6 +53,18 @@ public CollectIteratorAssert<T> withNumRecordsLimit(int limit) {
5153
return this;
5254
}
5355

56+
/**
57+
* This method is required for downstream projects e.g. Flink connectors extending this test for
58+
* the case when there should be supported Flink versions below 1.20. Could be removed together
59+
* with dropping support for Flink 1.19.
60+
*/
61+
@Deprecated
62+
public void matchesRecordsFromSource(
63+
List<List<T>> recordsBySplitsFromSource,
64+
org.apache.flink.streaming.api.CheckpointingMode semantic) {
65+
matchesRecordsFromSource(recordsBySplitsFromSource, convertToCheckpointingMode(semantic));
66+
}
67+
5468
public void matchesRecordsFromSource(
5569
List<List<T>> recordsBySplitsFromSource, CheckpointingMode semantic) {
5670
for (List<T> recordsFromSplit : recordsBySplitsFromSource) {

0 commit comments

Comments
 (0)