Skip to content

Commit ee023b0

Browse files
authored
[core] Fix that sequence group fields are mistakenly aggregated by default aggregator in partial update (apache#4897)
1 parent 4c2ba07 commit ee023b0

File tree

2 files changed

+29
-3
lines changed

2 files changed

+29
-3
lines changed

paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java

+11-3
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,7 @@ private void updateWithSequenceGroup(KeyValue kv) {
184184
row.setField(
185185
fieldIndex, getters[fieldIndex].getFieldOrNull(kv.value()));
186186
}
187+
continue;
187188
}
188189
row.setField(
189190
i, aggregator == null ? field : aggregator.agg(accumulator, field));
@@ -304,6 +305,7 @@ private Factory(Options options, RowType rowType, List<String> primaryKeys) {
304305
List<String> fieldNames = rowType.getFieldNames();
305306
this.fieldSeqComparators = new HashMap<>();
306307
Map<String, Integer> sequenceGroupMap = new HashMap<>();
308+
List<String> allSequenceFields = new ArrayList<>();
307309
for (Map.Entry<String, String> entry : options.toMap().entrySet()) {
308310
String k = entry.getKey();
309311
String v = entry.getValue();
@@ -318,6 +320,7 @@ private Factory(Options options, RowType rowType, List<String> primaryKeys) {
318320
.split(FIELDS_SEPARATOR))
319321
.map(fieldName -> validateFieldName(fieldName, fieldNames))
320322
.collect(Collectors.toList());
323+
allSequenceFields.addAll(sequenceFields);
321324

322325
Supplier<FieldsComparator> userDefinedSeqComparator =
323326
() -> UserDefinedSeqComparator.create(rowType, sequenceFields, true);
@@ -347,7 +350,8 @@ private Factory(Options options, RowType rowType, List<String> primaryKeys) {
347350
}
348351
}
349352
this.fieldAggregators =
350-
createFieldAggregators(rowType, primaryKeys, new CoreOptions(options));
353+
createFieldAggregators(
354+
rowType, primaryKeys, allSequenceFields, new CoreOptions(options));
351355
if (!fieldAggregators.isEmpty() && fieldSeqComparators.isEmpty()) {
352356
throw new IllegalArgumentException(
353357
"Must use sequence group for aggregation functions.");
@@ -514,7 +518,10 @@ private String validateFieldName(String fieldName, List<String> fieldNames) {
514518
* @return The aggregators for each column.
515519
*/
516520
private Map<Integer, Supplier<FieldAggregator>> createFieldAggregators(
517-
RowType rowType, List<String> primaryKeys, CoreOptions options) {
521+
RowType rowType,
522+
List<String> primaryKeys,
523+
List<String> allSequenceFields,
524+
CoreOptions options) {
518525

519526
List<String> fieldNames = rowType.getFieldNames();
520527
List<DataType> fieldTypes = rowType.getFieldTypes();
@@ -539,7 +546,8 @@ private Map<Integer, Supplier<FieldAggregator>> createFieldAggregators(
539546
isPrimaryKey,
540547
options,
541548
fieldName));
542-
} else if (defaultAggFunc != null) {
549+
} else if (defaultAggFunc != null && !allSequenceFields.contains(fieldName)) {
550+
// no agg for sequence fields
543551
fieldAggregators.put(
544552
i,
545553
() ->

paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java

+18
Original file line numberDiff line numberDiff line change
@@ -723,4 +723,22 @@ public void testRemoveRecordOnDeleteLookup() throws Exception {
723723
Row.ofKind(RowKind.UPDATE_AFTER, 1, "A", "apache"));
724724
iterator.close();
725725
}
726+
727+
@Test
728+
public void testSequenceGroupWithDefaultAgg() {
729+
sql(
730+
"CREATE TABLE seq_default_agg ("
731+
+ " pk INT PRIMARY KEY NOT ENFORCED,"
732+
+ " seq INT,"
733+
+ " v INT) WITH ("
734+
+ " 'merge-engine'='partial-update',"
735+
+ " 'fields.seq.sequence-group'='v',"
736+
+ " 'fields.default-aggregate-function'='sum'"
737+
+ ")");
738+
739+
sql("INSERT INTO seq_default_agg VALUES (0, 1, 1)");
740+
sql("INSERT INTO seq_default_agg VALUES (0, 2, 2)");
741+
742+
assertThat(sql("SELECT * FROM seq_default_agg")).containsExactly(Row.of(0, 2, 3));
743+
}
726744
}

0 commit comments

Comments
 (0)