diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGroupWindowAggregate.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGroupWindowAggregate.java index 1265fa9315065..60c6b692f9430 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGroupWindowAggregate.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGroupWindowAggregate.java @@ -40,6 +40,7 @@ import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext; import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata; import org.apache.flink.table.planner.plan.nodes.exec.InputProperty; +import org.apache.flink.table.planner.plan.nodes.exec.StateMetadata; import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil; import org.apache.flink.table.planner.plan.utils.AggregateInfoList; import org.apache.flink.table.planner.plan.utils.KeySelectorUtil; @@ -65,6 +66,7 @@ import org.apache.flink.table.types.logical.RowType; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; import org.apache.calcite.rel.core.AggregateCall; @@ -73,6 +75,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; + import java.time.Duration; import java.time.ZoneId; import java.util.Arrays; @@ -135,6 +139,13 @@ public class StreamExecGroupWindowAggregate extends StreamExecAggregateBase { @JsonProperty(FIELD_NAME_NEED_RETRACTION) private final boolean needRetraction; + public static final String STATE_NAME = "groupWindowAggregateState"; + + @Nullable + @JsonProperty(FIELD_NAME_STATE) + @JsonInclude(JsonInclude.Include.NON_NULL) + private final List stateMetadataList; + public StreamExecGroupWindowAggregate( ReadableConfig tableConfig, int[] grouping, @@ -155,6 +166,7 @@ public StreamExecGroupWindowAggregate( window, namedWindowProperties, needRetraction, + StateMetadata.getOneInputOperatorDefaultMeta(tableConfig, STATE_NAME), Collections.singletonList(inputProperty), outputType, description); @@ -171,6 +183,7 @@ public StreamExecGroupWindowAggregate( @JsonProperty(FIELD_NAME_NAMED_WINDOW_PROPERTIES) NamedWindowProperty[] namedWindowProperties, @JsonProperty(FIELD_NAME_NEED_RETRACTION) boolean needRetraction, + @Nullable @JsonProperty(FIELD_NAME_STATE) List stateMetadataList, @JsonProperty(FIELD_NAME_INPUT_PROPERTIES) List inputProperties, @JsonProperty(FIELD_NAME_OUTPUT_TYPE) RowType outputType, @JsonProperty(FIELD_NAME_DESCRIPTION) String description) { @@ -181,6 +194,7 @@ public StreamExecGroupWindowAggregate( this.window = checkNotNull(window); this.namedWindowProperties = checkNotNull(namedWindowProperties); this.needRetraction = needRetraction; + this.stateMetadataList = stateMetadataList; } @SuppressWarnings("unchecked") @@ -263,10 +277,13 @@ protected Transformation translateToPlanInternal( final LogicalType[] aggValueTypes = extractLogicalTypes(aggInfoList.getActualValueTypes()); final LogicalType[] accTypes = extractLogicalTypes(aggInfoList.getAccTypes()); final int inputCountIndex = aggInfoList.getIndexOfCountStar(); + final long stateRetentionTime = + StateMetadata.getStateTtlForOneInputOperator(config, stateMetadataList); final WindowOperator operator = createWindowOperator( config, + stateRetentionTime, aggCodeGenerator, equaliser, accTypes, @@ -368,6 +385,7 @@ private GeneratedClass createAggsHandler( private WindowOperator createWindowOperator( ReadableConfig config, + long stateRetentionTime, GeneratedClass aggsHandler, GeneratedRecordEqualiser recordEqualiser, LogicalType[] accTypes, @@ -381,7 +399,8 @@ private GeneratedClass createAggsHandler( WindowOperatorBuilder.builder() .withInputFields(inputFields) .withShiftTimezone(shiftTimeZone) - .withInputCountIndex(inputCountIndex); + .withInputCountIndex(inputCountIndex) + .withStateRetentionTime(stateRetentionTime); if (window instanceof TumblingGroupWindow) { TumblingGroupWindow tumblingWindow = (TumblingGroupWindow) window; diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/groupwindow/operator/AggregateWindowOperator.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/groupwindow/operator/AggregateWindowOperator.java index 05b117e51b2d7..2a46860e3ecf4 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/groupwindow/operator/AggregateWindowOperator.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/groupwindow/operator/AggregateWindowOperator.java @@ -84,7 +84,8 @@ public class AggregateWindowOperator extends WindowOperator boolean produceUpdates, long allowedLateness, ZoneId shiftTimeZone, - int inputCountIndex) { + int inputCountIndex, + long stateRetentionTime) { super( windowAggregator, windowAssigner, @@ -98,7 +99,8 @@ public class AggregateWindowOperator extends WindowOperator produceUpdates, allowedLateness, shiftTimeZone, - inputCountIndex); + inputCountIndex, + stateRetentionTime); this.aggWindowAggregator = windowAggregator; this.equaliser = checkNotNull(equaliser); } @@ -117,7 +119,8 @@ public class AggregateWindowOperator extends WindowOperator boolean sendRetraction, long allowedLateness, ZoneId shiftTimeZone, - int inputCountIndex) { + int inputCountIndex, + long stateRetentionTime) { super( windowAssigner, trigger, @@ -130,7 +133,8 @@ public class AggregateWindowOperator extends WindowOperator sendRetraction, allowedLateness, shiftTimeZone, - inputCountIndex); + inputCountIndex, + stateRetentionTime); this.generatedAggWindowAggregator = generatedAggWindowAggregator; this.generatedEqualiser = checkNotNull(generatedEqualiser); } @@ -171,7 +175,7 @@ protected void emitWindowResult(W window) throws Exception { // has emitted result for the window if (previousAggResult != null) { // current agg is not equal to the previous emitted, should emit retract - if (!equaliser.equals(aggResult, previousAggResult)) { + if (stateRetentionTime > 0 || !equaliser.equals(aggResult, previousAggResult)) { // send UPDATE_BEFORE collect( RowKind.UPDATE_BEFORE, diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/groupwindow/operator/TableAggregateWindowOperator.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/groupwindow/operator/TableAggregateWindowOperator.java index 118163d895070..d2f3511b10e77 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/groupwindow/operator/TableAggregateWindowOperator.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/groupwindow/operator/TableAggregateWindowOperator.java @@ -68,7 +68,8 @@ public class TableAggregateWindowOperator extends WindowOpe boolean produceUpdates, long allowedLateness, ZoneId shiftTimeZone, - int inputCountIndex) { + int inputCountIndex, + long stateRetentionTime) { super( windowTableAggregator, windowAssigner, @@ -82,7 +83,8 @@ public class TableAggregateWindowOperator extends WindowOpe produceUpdates, allowedLateness, shiftTimeZone, - inputCountIndex); + inputCountIndex, + stateRetentionTime); this.tableAggWindowAggregator = windowTableAggregator; } @@ -99,7 +101,8 @@ public class TableAggregateWindowOperator extends WindowOpe boolean sendRetraction, long allowedLateness, ZoneId shiftTimeZone, - int inputCountIndex) { + int inputCountIndex, + long stateRetentionTime) { super( windowAssigner, trigger, @@ -112,7 +115,8 @@ public class TableAggregateWindowOperator extends WindowOpe sendRetraction, allowedLateness, shiftTimeZone, - inputCountIndex); + inputCountIndex, + stateRetentionTime); this.generatedTableAggWindowAggregator = generatedTableAggWindowAggregator; } diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/groupwindow/operator/WindowOperator.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/groupwindow/operator/WindowOperator.java index ccc0265c10064..ab0852b44579e 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/groupwindow/operator/WindowOperator.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/groupwindow/operator/WindowOperator.java @@ -148,6 +148,9 @@ public abstract class WindowOperator extends AbstractStream /** Used to count the number of added and retracted input records. */ protected final RecordCounter recordCounter; + /** State idle retention time which unit is MILLISECONDS. */ + protected final long stateRetentionTime; + // -------------------------------------------------------------------------------- protected NamespaceAggsHandleFunctionBase windowAggregator; @@ -188,7 +191,8 @@ public abstract class WindowOperator extends AbstractStream boolean produceUpdates, long allowedLateness, ZoneId shiftTimeZone, - int inputCountIndex) { + int inputCountIndex, + long stateRetentionTime) { checkArgument(allowedLateness >= 0); this.windowAggregator = checkNotNull(windowAggregator); this.windowAssigner = checkNotNull(windowAssigner); @@ -206,6 +210,7 @@ public abstract class WindowOperator extends AbstractStream this.rowtimeIndex = rowtimeIndex; this.shiftTimeZone = shiftTimeZone; this.recordCounter = RecordCounter.of(inputCountIndex); + this.stateRetentionTime = stateRetentionTime; } @Override @@ -225,7 +230,8 @@ public boolean useInterruptibleTimers() { boolean produceUpdates, long allowedLateness, ZoneId shiftTimeZone, - int inputCountIndex) { + int inputCountIndex, + long stateRetentionTime) { checkArgument(allowedLateness >= 0); this.windowAssigner = checkNotNull(windowAssigner); this.trigger = checkNotNull(trigger); @@ -242,6 +248,7 @@ public boolean useInterruptibleTimers() { this.rowtimeIndex = rowtimeIndex; this.shiftTimeZone = shiftTimeZone; this.recordCounter = RecordCounter.of(inputCountIndex); + this.stateRetentionTime = stateRetentionTime; } protected abstract void compileGeneratedCode(); diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/groupwindow/operator/WindowOperatorBuilder.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/groupwindow/operator/WindowOperatorBuilder.java index 6bd6b28be9ed0..58133525792d5 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/groupwindow/operator/WindowOperatorBuilder.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/groupwindow/operator/WindowOperatorBuilder.java @@ -75,6 +75,7 @@ public class WindowOperatorBuilder { protected int rowtimeIndex = -1; protected ZoneId shiftTimeZone = ZoneId.of("UTC"); protected int inputCountIndex = -1; + protected long stateRetentionTime; public static WindowOperatorBuilder builder() { return new WindowOperatorBuilder(); @@ -95,6 +96,11 @@ public WindowOperatorBuilder withShiftTimezone(ZoneId shiftTimeZone) { return this; } + public WindowOperatorBuilder withStateRetentionTime(long stateRetentionTime) { + this.stateRetentionTime = stateRetentionTime; + return this; + } + public WindowOperatorBuilder tumble(Duration size) { checkArgument(windowAssigner == null); this.windowAssigner = TumblingWindowAssigner.of(size); @@ -303,7 +309,8 @@ public WindowOperator build() { windowOperatorBuilder.produceUpdates, windowOperatorBuilder.allowedLateness, windowOperatorBuilder.shiftTimeZone, - windowOperatorBuilder.inputCountIndex); + windowOperatorBuilder.inputCountIndex, + windowOperatorBuilder.stateRetentionTime); } else { //noinspection unchecked return new TableAggregateWindowOperator( @@ -320,7 +327,8 @@ public WindowOperator build() { windowOperatorBuilder.produceUpdates, windowOperatorBuilder.allowedLateness, windowOperatorBuilder.shiftTimeZone, - windowOperatorBuilder.inputCountIndex); + windowOperatorBuilder.inputCountIndex, + windowOperatorBuilder.stateRetentionTime); } } } @@ -370,7 +378,8 @@ public AggregateWindowOperator build() { windowOperatorBuilder.produceUpdates, windowOperatorBuilder.allowedLateness, windowOperatorBuilder.shiftTimeZone, - windowOperatorBuilder.inputCountIndex); + windowOperatorBuilder.inputCountIndex, + windowOperatorBuilder.stateRetentionTime); } else { //noinspection unchecked return new AggregateWindowOperator( @@ -388,7 +397,8 @@ public AggregateWindowOperator build() { windowOperatorBuilder.produceUpdates, windowOperatorBuilder.allowedLateness, windowOperatorBuilder.shiftTimeZone, - windowOperatorBuilder.inputCountIndex); + windowOperatorBuilder.inputCountIndex, + windowOperatorBuilder.stateRetentionTime); } } } diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/groupwindow/operator/WindowOperatorContractTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/groupwindow/operator/WindowOperatorContractTest.java index 4d50d1addffab..f452e5cca9196 100644 --- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/groupwindow/operator/WindowOperatorContractTest.java +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/groupwindow/operator/WindowOperatorContractTest.java @@ -225,7 +225,8 @@ KeyedOneInputStreamOperatorTestHarness createWindowOp sendRetraction, allowedLateness, UTC_ZONE_ID, - -1); + -1, + 0); return new KeyedOneInputStreamOperatorTestHarness( operator, keySelector, keyType); } else { @@ -243,7 +244,8 @@ KeyedOneInputStreamOperatorTestHarness createWindowOp sendRetraction, allowedLateness, UTC_ZONE_ID, - -1); + -1, + 0); return new KeyedOneInputStreamOperatorTestHarness( operator, keySelector, keyType);