Commit e2a2755

raminqaf authored and dawidwys committed
[FLINK-38286][table] Fix: MAP function with duplicate keys produces non-deterministic results
1 parent f7a159b commit e2a2755

File tree

3 files changed: +32, -15 lines

flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/GenerateUtils.scala

Lines changed: 15 additions & 0 deletions
@@ -802,4 +802,19 @@ object GenerateUtils {
     compares.mkString
   }
 
+  /**
+   * Groups the input sequence by the key function, and keeps the order of the first appearance of
+   * each key.
+   */
+  def groupByOrdered[A, K](xs: collection.Seq[A])(
+      f: A => K): collection.Seq[(K, collection.Seq[A])] = {
+    val m = collection.mutable.LinkedHashMap.empty[K, collection.mutable.ArrayBuffer[A]]
+    xs.foreach {
+      x =>
+        val k = f(x)
+        m.getOrElseUpdate(k, collection.mutable.ArrayBuffer.empty[A]) += x
+    }
+    m.toSeq.map { case (k, buffer) => (k, buffer) }
+  }
+
 }
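The point of groupByOrdered is that a LinkedHashMap remembers insertion order, so grouping is deterministic and follows the first appearance of each key, whereas a plain groupBy returns an unordered map. A minimal standalone sketch of that property (the object name, sample data, and main wrapper are illustrative, not part of the commit):

import scala.collection.mutable

object GroupByOrderedSketch {

  // Same shape as the helper added above: group by a key while preserving
  // the order in which each key is first seen.
  def groupByOrdered[A, K](xs: Seq[A])(f: A => K): Seq[(K, Seq[A])] = {
    val m = mutable.LinkedHashMap.empty[K, mutable.ArrayBuffer[A]]
    xs.foreach(x => m.getOrElseUpdate(f(x), mutable.ArrayBuffer.empty[A]) += x)
    m.toSeq.map { case (k, buf) => (k, buf.toSeq) }
  }

  def main(args: Array[String]): Unit = {
    val pairs = Seq(("b", 1), ("a", 2), ("b", 3))
    // Keys come out in first-appearance order: "b" before "a".
    println(groupByOrdered(pairs)(_._1))
    // groupBy returns an unordered Map, so key iteration order is not guaranteed in general.
    println(pairs.groupBy(_._1))
  }
}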

flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala

Lines changed: 9 additions & 14 deletions
@@ -1510,28 +1510,23 @@ object ScalarOperatorGens {
     val mapType = resultType.asInstanceOf[MapType]
     val baseMap = newName(ctx, "map")
 
-    // prepare map key array
-    val keyElements = elements
+    val keyValues = elements
       .grouped(2)
       .map { case Seq(key, value) => (key, value) }
       .toSeq
-      .groupBy(_._1)
-      .map(_._2.last)
-      .keys
-      .toSeq
+
+    val deduplicatedKeyValues = groupByOrdered(keyValues)(_._1).map {
+      case (_, pairs) =>
+        pairs.last // Take the last occurrence
+    }
+    // prepare map key array
+    val keyElements = deduplicatedKeyValues.map(_._1)
     val keyType = mapType.getKeyType
     val keyExpr = generateArray(ctx, new ArrayType(keyType), keyElements)
     val isKeyFixLength = isPrimitive(keyType)
 
     // prepare map value array
-    val valueElements = elements
-      .grouped(2)
-      .map { case Seq(key, value) => (key, value) }
-      .toSeq
-      .groupBy(_._1)
-      .map(_._2.last)
-      .values
-      .toSeq
+    val valueElements = deduplicatedKeyValues.map(_._2)
     val valueType = mapType.getValueType
     val valueExpr = generateArray(ctx, new ArrayType(valueType), valueElements)
     val isValueFixLength = isPrimitive(valueType)
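With this change the deduplication happens once, up front, and both the key array and the value array are derived from the same ordered sequence, so they stay aligned. A rough standalone sketch of that pipeline, assuming last-occurrence-wins semantics (object and variable names are illustrative):

import scala.collection.mutable

object MapDedupSketch {

  // Order-preserving grouping, same idea as GenerateUtils.groupByOrdered above.
  def groupByOrdered[A, K](xs: Seq[A])(f: A => K): Seq[(K, Seq[A])] = {
    val m = mutable.LinkedHashMap.empty[K, mutable.ArrayBuffer[A]]
    xs.foreach(x => m.getOrElseUpdate(f(x), mutable.ArrayBuffer.empty[A]) += x)
    m.toSeq.map { case (k, buf) => (k, buf.toSeq) }
  }

  def main(args: Array[String]): Unit = {
    // Flat MAP arguments, e.g. MAP[1, 1, 1, 2, 2, 24, 1, 8, 2, 20]
    val elements = Seq(1, 1, 1, 2, 2, 24, 1, 8, 2, 20)
    val keyValues = elements.grouped(2).map { case Seq(k, v) => (k, v) }.toSeq

    // Keep only the last (key, value) pair per key; key order follows first appearance.
    val deduplicated = groupByOrdered(keyValues)(_._1).map { case (_, pairs) => pairs.last }

    println(deduplicated.map(_._1)) // keys:   1, 2
    println(deduplicated.map(_._2)) // values: 8, 20
  }
}

The two printed sequences play the role of keyElements and valueElements in the generated code; deriving both from the same deduplicated sequence is what keeps keys and values paired deterministically.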

flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/MapFunctionITCase.java

Lines changed: 8 additions & 1 deletion
@@ -97,9 +97,16 @@ private Stream<TestSetSpec> mapTestCases() {
                         DECIMAL(10, 4).notNull(),
                         BOOLEAN().notNull())
                 .testResult(
+                        resultSpec(
+                                map(
+                                        1, 1, 1, 2, 1, 9, 1, 3, 2, 24, 2, 29, 1, 0, 2, 22,
+                                        1, 8, 2, 25, 2, 20),
+                                "MAP[1, 1, 1, 2, 1, 9, 1, 3, 2, 24, 2, 29, 1, 0, 2, 22, 1, 8, 2, 25, 2, 20]",
+                                Map.ofEntries(Map.entry(1, 8), Map.entry(2, 20)),
+                                DataTypes.MAP(INT().notNull(), INT().notNull()).notNull()),
                         resultSpec(
                                 map($("f0"), $("f0"), $("f0"), $("f1")),
-                                "MAP[f0, f1]",
+                                "MAP[f0, f0, f0, f1]",
                                 Collections.singletonMap(1, 2),
                                 DataTypes.MAP(INT().notNull(), INT().notNull()).notNull()),
                         resultSpec(
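The new IT case feeds eleven key/value pairs over just two keys, so the expected map only depends on which occurrence wins per key. A quick cross-check of that expectation under last-wins semantics (a throwaway sketch, not part of the test framework):

object MapTestExpectationSketch {

  def main(args: Array[String]): Unit = {
    val flatArgs = Seq(1, 1, 1, 2, 1, 9, 1, 3, 2, 24, 2, 29, 1, 0, 2, 22, 1, 8, 2, 25, 2, 20)

    // Pair up the flat argument list and let later duplicates overwrite earlier ones;
    // a LinkedHashMap keeps the keys in first-appearance order.
    val result = scala.collection.mutable.LinkedHashMap.empty[Int, Int]
    flatArgs.grouped(2).foreach { case Seq(k, v) => result(k) = v }

    // 1 -> 8 and 2 -> 20, matching Map.ofEntries(Map.entry(1, 8), Map.entry(2, 20)).
    println(result)
  }
}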
