Skip to content

Commit d936c73

Browse files
gustavodemorais authored and rkhachatryan committed
[FLINK-37955] Add test for duplicated results from StreamingJoinOperator
1 parent 87ea276 commit d936c73

File tree

2 files changed: +63 -1 lines changed

flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/common/JoinTestPrograms.java

Lines changed: 61 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -18,10 +18,14 @@
1818

1919
package org.apache.flink.table.planner.plan.nodes.exec.common;
2020

21+
import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecJoin;
2122
import org.apache.flink.table.test.program.SinkTestStep;
2223
import org.apache.flink.table.test.program.SourceTestStep;
2324
import org.apache.flink.table.test.program.TableTestProgram;
2425
import org.apache.flink.types.Row;
26+
import org.apache.flink.types.RowKind;
27+
28+
import java.util.stream.IntStream;
2529

2630
/** {@link TableTestProgram} definitions for testing {@link StreamExecJoin}. */
2731
public class JoinTestPrograms {
@@ -41,6 +45,63 @@ public class JoinTestPrograms {
4145
public static final TableTestProgram JOIN_WITH_STATE_TTL_HINT;
4246
public static final TableTestProgram SEMI_ANTI_JOIN_WITH_LITERAL_AGG;
4347

48+
public static final TableTestProgram OUTER_JOIN_CHANGELOG_TEST =
49+
TableTestProgram.of("join-duplicate-emission-bug", "bug with CTE and left join")
50+
.setupTableSource(
51+
SourceTestStep.newBuilder("upsert_table_with_duplicates")
52+
.addSchema(
53+
"`execution_plan_id` VARCHAR(2147483647) NOT NULL",
54+
"`workflow_id` VARCHAR(2147483647) NOT NULL",
55+
"`event_section_id` VARCHAR(2147483647) NOT NULL",
56+
"CONSTRAINT `PRIMARY` PRIMARY KEY (`execution_plan_id`, `event_section_id`) NOT ENFORCED")
57+
.addOption("changelog-mode", "I, UA,D")
58+
.producedValues(
59+
IntStream.range(0, 13)
60+
.mapToObj(
61+
i ->
62+
Row.ofKind(
63+
RowKind.UPDATE_AFTER,
64+
"section_id_1",
65+
"section_id_2",
66+
"section_id_3"))
67+
.toArray(Row[]::new))
68+
.build())
69+
.setupTableSink(
70+
SinkTestStep.newBuilder("sink")
71+
.addSchema("event_element_id STRING", "cnt BIGINT")
72+
.testMaterializedData()
73+
.consumedValues(Row.of("pk-1", 1), Row.of("pk-2", 1))
74+
.build())
75+
.runSql(
76+
"INSERT INTO sink WITH\n"
77+
+ " section_detail as (\n"
78+
+ " SELECT s.event_section_id\n"
79+
+ " \n"
80+
+ " FROM upsert_table_with_duplicates s\n"
81+
+ " ),\n"
82+
+ "\n"
83+
+ " event_element as (\n"
84+
+ " SELECT\n"
85+
+ " ed.id as event_element_id\n"
86+
+ " FROM (\n"
87+
+ " SELECT\n"
88+
+ " 'pk-2' id,\n"
89+
+ " 'section_id_3' section_id\n"
90+
+ " UNION ALL\n"
91+
+ " SELECT\n"
92+
+ " 'pk-1' id,\n"
93+
+ " 'section_id_3' section_id\n"
94+
+ " ) ed \n"
95+
+ " LEFT JOIN\n"
96+
+ " section_detail as s\n"
97+
+ " ON s.event_section_id = ed.section_id\n"
98+
+ " )\n"
99+
+ "\n"
100+
+ "SELECT event_element_id, COUNT(*) cnt\n"
101+
+ "FROM event_element\n"
102+
+ "GROUP BY event_element_id")
103+
.build();
104+
44105
static final SourceTestStep EMPLOYEE =
45106
SourceTestStep.newBuilder("EMPLOYEE")
46107
.addSchema("deptno int", "salary bigint", "name varchar")

flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinRestoreTest.java

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -49,6 +49,7 @@ public List<TableTestProgram> programs() {
4949
JoinTestPrograms.SEMI_JOIN,
5050
JoinTestPrograms.ANTI_JOIN,
5151
JoinTestPrograms.JOIN_WITH_STATE_TTL_HINT,
52-
JoinTestPrograms.SEMI_ANTI_JOIN_WITH_LITERAL_AGG);
52+
JoinTestPrograms.SEMI_ANTI_JOIN_WITH_LITERAL_AGG,
53+
JoinTestPrograms.OUTER_JOIN_CHANGELOG_TEST);
5354
}
5455
}

0 commit comments

Comments
 (0)