Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import org.apache.flink.table.planner.plan.nodes.exec.StateMetadata;
import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
import org.apache.flink.table.planner.plan.utils.AggregateInfoList;
import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
Expand All @@ -65,6 +66,7 @@
import org.apache.flink.table.types.logical.RowType;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;

import org.apache.calcite.rel.core.AggregateCall;
Expand All @@ -73,6 +75,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.time.Duration;
import java.time.ZoneId;
import java.util.Arrays;
Expand Down Expand Up @@ -135,6 +139,13 @@ public class StreamExecGroupWindowAggregate extends StreamExecAggregateBase {
@JsonProperty(FIELD_NAME_NEED_RETRACTION)
private final boolean needRetraction;

public static final String STATE_NAME = "groupWindowAggregateState";

@Nullable
@JsonProperty(FIELD_NAME_STATE)
@JsonInclude(JsonInclude.Include.NON_NULL)
private final List<StateMetadata> stateMetadataList;

public StreamExecGroupWindowAggregate(
ReadableConfig tableConfig,
int[] grouping,
Expand All @@ -155,6 +166,7 @@ public StreamExecGroupWindowAggregate(
window,
namedWindowProperties,
needRetraction,
StateMetadata.getOneInputOperatorDefaultMeta(tableConfig, STATE_NAME),
Collections.singletonList(inputProperty),
outputType,
description);
Expand All @@ -171,6 +183,7 @@ public StreamExecGroupWindowAggregate(
@JsonProperty(FIELD_NAME_NAMED_WINDOW_PROPERTIES)
NamedWindowProperty[] namedWindowProperties,
@JsonProperty(FIELD_NAME_NEED_RETRACTION) boolean needRetraction,
@Nullable @JsonProperty(FIELD_NAME_STATE) List<StateMetadata> stateMetadataList,
@JsonProperty(FIELD_NAME_INPUT_PROPERTIES) List<InputProperty> inputProperties,
@JsonProperty(FIELD_NAME_OUTPUT_TYPE) RowType outputType,
@JsonProperty(FIELD_NAME_DESCRIPTION) String description) {
Expand All @@ -181,6 +194,7 @@ public StreamExecGroupWindowAggregate(
this.window = checkNotNull(window);
this.namedWindowProperties = checkNotNull(namedWindowProperties);
this.needRetraction = needRetraction;
this.stateMetadataList = stateMetadataList;
}

@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -263,10 +277,13 @@ protected Transformation<RowData> translateToPlanInternal(
final LogicalType[] aggValueTypes = extractLogicalTypes(aggInfoList.getActualValueTypes());
final LogicalType[] accTypes = extractLogicalTypes(aggInfoList.getAccTypes());
final int inputCountIndex = aggInfoList.getIndexOfCountStar();
final long stateRetentionTime =
StateMetadata.getStateTtlForOneInputOperator(config, stateMetadataList);

final WindowOperator<?, ?> operator =
createWindowOperator(
config,
stateRetentionTime,
aggCodeGenerator,
equaliser,
accTypes,
Expand Down Expand Up @@ -368,6 +385,7 @@ private GeneratedClass<?> createAggsHandler(

private WindowOperator<?, ?> createWindowOperator(
ReadableConfig config,
long stateRetentionTime,
GeneratedClass<?> aggsHandler,
GeneratedRecordEqualiser recordEqualiser,
LogicalType[] accTypes,
Expand All @@ -381,7 +399,8 @@ private GeneratedClass<?> createAggsHandler(
WindowOperatorBuilder.builder()
.withInputFields(inputFields)
.withShiftTimezone(shiftTimeZone)
.withInputCountIndex(inputCountIndex);
.withInputCountIndex(inputCountIndex)
.withStateRetentionTime(stateRetentionTime);

if (window instanceof TumblingGroupWindow) {
TumblingGroupWindow tumblingWindow = (TumblingGroupWindow) window;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ public class AggregateWindowOperator<K, W extends Window> extends WindowOperator
boolean produceUpdates,
long allowedLateness,
ZoneId shiftTimeZone,
int inputCountIndex) {
int inputCountIndex,
long stateRetentionTime) {
super(
windowAggregator,
windowAssigner,
Expand All @@ -98,7 +99,8 @@ public class AggregateWindowOperator<K, W extends Window> extends WindowOperator
produceUpdates,
allowedLateness,
shiftTimeZone,
inputCountIndex);
inputCountIndex,
stateRetentionTime);
this.aggWindowAggregator = windowAggregator;
this.equaliser = checkNotNull(equaliser);
}
Expand All @@ -117,7 +119,8 @@ public class AggregateWindowOperator<K, W extends Window> extends WindowOperator
boolean sendRetraction,
long allowedLateness,
ZoneId shiftTimeZone,
int inputCountIndex) {
int inputCountIndex,
long stateRetentionTime) {
super(
windowAssigner,
trigger,
Expand All @@ -130,7 +133,8 @@ public class AggregateWindowOperator<K, W extends Window> extends WindowOperator
sendRetraction,
allowedLateness,
shiftTimeZone,
inputCountIndex);
inputCountIndex,
stateRetentionTime);
this.generatedAggWindowAggregator = generatedAggWindowAggregator;
this.generatedEqualiser = checkNotNull(generatedEqualiser);
}
Expand Down Expand Up @@ -171,7 +175,7 @@ protected void emitWindowResult(W window) throws Exception {
// has emitted result for the window
if (previousAggResult != null) {
// current agg is not equal to the previous emitted, should emit retract
if (!equaliser.equals(aggResult, previousAggResult)) {
if (stateRetentionTime > 0 || !equaliser.equals(aggResult, previousAggResult)) {
// send UPDATE_BEFORE
collect(
RowKind.UPDATE_BEFORE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ public class TableAggregateWindowOperator<K, W extends Window> extends WindowOpe
boolean produceUpdates,
long allowedLateness,
ZoneId shiftTimeZone,
int inputCountIndex) {
int inputCountIndex,
long stateRetentionTime) {
super(
windowTableAggregator,
windowAssigner,
Expand All @@ -82,7 +83,8 @@ public class TableAggregateWindowOperator<K, W extends Window> extends WindowOpe
produceUpdates,
allowedLateness,
shiftTimeZone,
inputCountIndex);
inputCountIndex,
stateRetentionTime);
this.tableAggWindowAggregator = windowTableAggregator;
}

Expand All @@ -99,7 +101,8 @@ public class TableAggregateWindowOperator<K, W extends Window> extends WindowOpe
boolean sendRetraction,
long allowedLateness,
ZoneId shiftTimeZone,
int inputCountIndex) {
int inputCountIndex,
long stateRetentionTime) {
super(
windowAssigner,
trigger,
Expand All @@ -112,7 +115,8 @@ public class TableAggregateWindowOperator<K, W extends Window> extends WindowOpe
sendRetraction,
allowedLateness,
shiftTimeZone,
inputCountIndex);
inputCountIndex,
stateRetentionTime);
this.generatedTableAggWindowAggregator = generatedTableAggWindowAggregator;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,9 @@ public abstract class WindowOperator<K, W extends Window> extends AbstractStream
/** Used to count the number of added and retracted input records. */
protected final RecordCounter recordCounter;

/** State idle retention time which unit is MILLISECONDS. */
protected final long stateRetentionTime;

// --------------------------------------------------------------------------------

protected NamespaceAggsHandleFunctionBase<W> windowAggregator;
Expand Down Expand Up @@ -188,7 +191,8 @@ public abstract class WindowOperator<K, W extends Window> extends AbstractStream
boolean produceUpdates,
long allowedLateness,
ZoneId shiftTimeZone,
int inputCountIndex) {
int inputCountIndex,
long stateRetentionTime) {
checkArgument(allowedLateness >= 0);
this.windowAggregator = checkNotNull(windowAggregator);
this.windowAssigner = checkNotNull(windowAssigner);
Expand All @@ -206,6 +210,7 @@ public abstract class WindowOperator<K, W extends Window> extends AbstractStream
this.rowtimeIndex = rowtimeIndex;
this.shiftTimeZone = shiftTimeZone;
this.recordCounter = RecordCounter.of(inputCountIndex);
this.stateRetentionTime = stateRetentionTime;
}

@Override
Expand All @@ -225,7 +230,8 @@ public boolean useInterruptibleTimers() {
boolean produceUpdates,
long allowedLateness,
ZoneId shiftTimeZone,
int inputCountIndex) {
int inputCountIndex,
long stateRetentionTime) {
checkArgument(allowedLateness >= 0);
this.windowAssigner = checkNotNull(windowAssigner);
this.trigger = checkNotNull(trigger);
Expand All @@ -242,6 +248,7 @@ public boolean useInterruptibleTimers() {
this.rowtimeIndex = rowtimeIndex;
this.shiftTimeZone = shiftTimeZone;
this.recordCounter = RecordCounter.of(inputCountIndex);
this.stateRetentionTime = stateRetentionTime;
}

protected abstract void compileGeneratedCode();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ public class WindowOperatorBuilder {
protected int rowtimeIndex = -1;
protected ZoneId shiftTimeZone = ZoneId.of("UTC");
protected int inputCountIndex = -1;
protected long stateRetentionTime;

public static WindowOperatorBuilder builder() {
return new WindowOperatorBuilder();
Expand All @@ -95,6 +96,11 @@ public WindowOperatorBuilder withShiftTimezone(ZoneId shiftTimeZone) {
return this;
}

public WindowOperatorBuilder withStateRetentionTime(long stateRetentionTime) {
this.stateRetentionTime = stateRetentionTime;
return this;
}

public WindowOperatorBuilder tumble(Duration size) {
checkArgument(windowAssigner == null);
this.windowAssigner = TumblingWindowAssigner.of(size);
Expand Down Expand Up @@ -303,7 +309,8 @@ public WindowOperator build() {
windowOperatorBuilder.produceUpdates,
windowOperatorBuilder.allowedLateness,
windowOperatorBuilder.shiftTimeZone,
windowOperatorBuilder.inputCountIndex);
windowOperatorBuilder.inputCountIndex,
windowOperatorBuilder.stateRetentionTime);
} else {
//noinspection unchecked
return new TableAggregateWindowOperator(
Expand All @@ -320,7 +327,8 @@ public WindowOperator build() {
windowOperatorBuilder.produceUpdates,
windowOperatorBuilder.allowedLateness,
windowOperatorBuilder.shiftTimeZone,
windowOperatorBuilder.inputCountIndex);
windowOperatorBuilder.inputCountIndex,
windowOperatorBuilder.stateRetentionTime);
}
}
}
Expand Down Expand Up @@ -370,7 +378,8 @@ public AggregateWindowOperator build() {
windowOperatorBuilder.produceUpdates,
windowOperatorBuilder.allowedLateness,
windowOperatorBuilder.shiftTimeZone,
windowOperatorBuilder.inputCountIndex);
windowOperatorBuilder.inputCountIndex,
windowOperatorBuilder.stateRetentionTime);
} else {
//noinspection unchecked
return new AggregateWindowOperator(
Expand All @@ -388,7 +397,8 @@ public AggregateWindowOperator build() {
windowOperatorBuilder.produceUpdates,
windowOperatorBuilder.allowedLateness,
windowOperatorBuilder.shiftTimeZone,
windowOperatorBuilder.inputCountIndex);
windowOperatorBuilder.inputCountIndex,
windowOperatorBuilder.stateRetentionTime);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,8 @@ KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> createWindowOp
sendRetraction,
allowedLateness,
UTC_ZONE_ID,
-1);
-1,
0);
return new KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData>(
operator, keySelector, keyType);
} else {
Expand All @@ -243,7 +244,8 @@ KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> createWindowOp
sendRetraction,
allowedLateness,
UTC_ZONE_ID,
-1);
-1,
0);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could we add tests to check that a non zero stateRetentionTime works as well please.


return new KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData>(
operator, keySelector, keyType);
Expand Down