Skip to content

Commit 104085f

Browse files
liuyongvsyongliu
authored andcommitted
[FLINK-38228][table] Respect state TTL in window operator; prevent downstream premature expiry
1 parent 00241a6 commit 104085f

File tree

6 files changed

+64
-18
lines changed

6 files changed

+64
-18
lines changed

flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGroupWindowAggregate.java

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
4141
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
4242
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
43+
import org.apache.flink.table.planner.plan.nodes.exec.StateMetadata;
4344
import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
4445
import org.apache.flink.table.planner.plan.utils.AggregateInfoList;
4546
import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
@@ -65,6 +66,7 @@
6566
import org.apache.flink.table.types.logical.RowType;
6667

6768
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
69+
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
6870
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
6971

7072
import org.apache.calcite.rel.core.AggregateCall;
@@ -73,6 +75,8 @@
7375
import org.slf4j.Logger;
7476
import org.slf4j.LoggerFactory;
7577

78+
import javax.annotation.Nullable;
79+
7680
import java.time.Duration;
7781
import java.time.ZoneId;
7882
import java.util.Arrays;
@@ -135,6 +139,13 @@ public class StreamExecGroupWindowAggregate extends StreamExecAggregateBase {
135139
@JsonProperty(FIELD_NAME_NEED_RETRACTION)
136140
private final boolean needRetraction;
137141

142+
public static final String STATE_NAME = "groupWindowAggregateState";
143+
144+
@Nullable
145+
@JsonProperty(FIELD_NAME_STATE)
146+
@JsonInclude(JsonInclude.Include.NON_NULL)
147+
private final List<StateMetadata> stateMetadataList;
148+
138149
public StreamExecGroupWindowAggregate(
139150
ReadableConfig tableConfig,
140151
int[] grouping,
@@ -155,6 +166,7 @@ public StreamExecGroupWindowAggregate(
155166
window,
156167
namedWindowProperties,
157168
needRetraction,
169+
StateMetadata.getOneInputOperatorDefaultMeta(tableConfig, STATE_NAME),
158170
Collections.singletonList(inputProperty),
159171
outputType,
160172
description);
@@ -171,6 +183,7 @@ public StreamExecGroupWindowAggregate(
171183
@JsonProperty(FIELD_NAME_NAMED_WINDOW_PROPERTIES)
172184
NamedWindowProperty[] namedWindowProperties,
173185
@JsonProperty(FIELD_NAME_NEED_RETRACTION) boolean needRetraction,
186+
@Nullable @JsonProperty(FIELD_NAME_STATE) List<StateMetadata> stateMetadataList,
174187
@JsonProperty(FIELD_NAME_INPUT_PROPERTIES) List<InputProperty> inputProperties,
175188
@JsonProperty(FIELD_NAME_OUTPUT_TYPE) RowType outputType,
176189
@JsonProperty(FIELD_NAME_DESCRIPTION) String description) {
@@ -181,6 +194,7 @@ public StreamExecGroupWindowAggregate(
181194
this.window = checkNotNull(window);
182195
this.namedWindowProperties = checkNotNull(namedWindowProperties);
183196
this.needRetraction = needRetraction;
197+
this.stateMetadataList = stateMetadataList;
184198
}
185199

186200
@SuppressWarnings("unchecked")
@@ -263,10 +277,13 @@ protected Transformation<RowData> translateToPlanInternal(
263277
final LogicalType[] aggValueTypes = extractLogicalTypes(aggInfoList.getActualValueTypes());
264278
final LogicalType[] accTypes = extractLogicalTypes(aggInfoList.getAccTypes());
265279
final int inputCountIndex = aggInfoList.getIndexOfCountStar();
280+
final long stateRetentionTime =
281+
StateMetadata.getStateTtlForOneInputOperator(config, stateMetadataList);
266282

267283
final WindowOperator<?, ?> operator =
268284
createWindowOperator(
269285
config,
286+
stateRetentionTime,
270287
aggCodeGenerator,
271288
equaliser,
272289
accTypes,
@@ -368,6 +385,7 @@ private GeneratedClass<?> createAggsHandler(
368385

369386
private WindowOperator<?, ?> createWindowOperator(
370387
ReadableConfig config,
388+
long stateRetentionTime,
371389
GeneratedClass<?> aggsHandler,
372390
GeneratedRecordEqualiser recordEqualiser,
373391
LogicalType[] accTypes,
@@ -381,7 +399,8 @@ private GeneratedClass<?> createAggsHandler(
381399
WindowOperatorBuilder.builder()
382400
.withInputFields(inputFields)
383401
.withShiftTimezone(shiftTimeZone)
384-
.withInputCountIndex(inputCountIndex);
402+
.withInputCountIndex(inputCountIndex)
403+
.withStateRetentionTime(stateRetentionTime);
385404

386405
if (window instanceof TumblingGroupWindow) {
387406
TumblingGroupWindow tumblingWindow = (TumblingGroupWindow) window;

flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/groupwindow/operator/AggregateWindowOperator.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,8 @@ public class AggregateWindowOperator<K, W extends Window> extends WindowOperator
8484
boolean produceUpdates,
8585
long allowedLateness,
8686
ZoneId shiftTimeZone,
87-
int inputCountIndex) {
87+
int inputCountIndex,
88+
long stateRetentionTime) {
8889
super(
8990
windowAggregator,
9091
windowAssigner,
@@ -98,7 +99,8 @@ public class AggregateWindowOperator<K, W extends Window> extends WindowOperator
9899
produceUpdates,
99100
allowedLateness,
100101
shiftTimeZone,
101-
inputCountIndex);
102+
inputCountIndex,
103+
stateRetentionTime);
102104
this.aggWindowAggregator = windowAggregator;
103105
this.equaliser = checkNotNull(equaliser);
104106
}
@@ -117,7 +119,8 @@ public class AggregateWindowOperator<K, W extends Window> extends WindowOperator
117119
boolean sendRetraction,
118120
long allowedLateness,
119121
ZoneId shiftTimeZone,
120-
int inputCountIndex) {
122+
int inputCountIndex,
123+
long stateRetentionTime) {
121124
super(
122125
windowAssigner,
123126
trigger,
@@ -130,7 +133,8 @@ public class AggregateWindowOperator<K, W extends Window> extends WindowOperator
130133
sendRetraction,
131134
allowedLateness,
132135
shiftTimeZone,
133-
inputCountIndex);
136+
inputCountIndex,
137+
stateRetentionTime);
134138
this.generatedAggWindowAggregator = generatedAggWindowAggregator;
135139
this.generatedEqualiser = checkNotNull(generatedEqualiser);
136140
}
@@ -171,7 +175,7 @@ protected void emitWindowResult(W window) throws Exception {
171175
// has emitted result for the window
172176
if (previousAggResult != null) {
173177
// current agg is not equal to the previous emitted, should emit retract
174-
if (!equaliser.equals(aggResult, previousAggResult)) {
178+
if (stateRetentionTime > 0 || !equaliser.equals(aggResult, previousAggResult)) {
175179
// send UPDATE_BEFORE
176180
collect(
177181
RowKind.UPDATE_BEFORE,

flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/groupwindow/operator/TableAggregateWindowOperator.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,8 @@ public class TableAggregateWindowOperator<K, W extends Window> extends WindowOpe
6868
boolean produceUpdates,
6969
long allowedLateness,
7070
ZoneId shiftTimeZone,
71-
int inputCountIndex) {
71+
int inputCountIndex,
72+
long stateRetentionTime) {
7273
super(
7374
windowTableAggregator,
7475
windowAssigner,
@@ -82,7 +83,8 @@ public class TableAggregateWindowOperator<K, W extends Window> extends WindowOpe
8283
produceUpdates,
8384
allowedLateness,
8485
shiftTimeZone,
85-
inputCountIndex);
86+
inputCountIndex,
87+
stateRetentionTime);
8688
this.tableAggWindowAggregator = windowTableAggregator;
8789
}
8890

@@ -99,7 +101,8 @@ public class TableAggregateWindowOperator<K, W extends Window> extends WindowOpe
99101
boolean sendRetraction,
100102
long allowedLateness,
101103
ZoneId shiftTimeZone,
102-
int inputCountIndex) {
104+
int inputCountIndex,
105+
long stateRetentionTime) {
103106
super(
104107
windowAssigner,
105108
trigger,
@@ -112,7 +115,8 @@ public class TableAggregateWindowOperator<K, W extends Window> extends WindowOpe
112115
sendRetraction,
113116
allowedLateness,
114117
shiftTimeZone,
115-
inputCountIndex);
118+
inputCountIndex,
119+
stateRetentionTime);
116120
this.generatedTableAggWindowAggregator = generatedTableAggWindowAggregator;
117121
}
118122

flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/groupwindow/operator/WindowOperator.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,9 @@ public abstract class WindowOperator<K, W extends Window> extends AbstractStream
148148
/** Used to count the number of added and retracted input records. */
149149
protected final RecordCounter recordCounter;
150150

151+
/** State idle retention time which unit is MILLISECONDS. */
152+
protected final long stateRetentionTime;
153+
151154
// --------------------------------------------------------------------------------
152155

153156
protected NamespaceAggsHandleFunctionBase<W> windowAggregator;
@@ -188,7 +191,8 @@ public abstract class WindowOperator<K, W extends Window> extends AbstractStream
188191
boolean produceUpdates,
189192
long allowedLateness,
190193
ZoneId shiftTimeZone,
191-
int inputCountIndex) {
194+
int inputCountIndex,
195+
long stateRetentionTime) {
192196
checkArgument(allowedLateness >= 0);
193197
this.windowAggregator = checkNotNull(windowAggregator);
194198
this.windowAssigner = checkNotNull(windowAssigner);
@@ -206,6 +210,7 @@ public abstract class WindowOperator<K, W extends Window> extends AbstractStream
206210
this.rowtimeIndex = rowtimeIndex;
207211
this.shiftTimeZone = shiftTimeZone;
208212
this.recordCounter = RecordCounter.of(inputCountIndex);
213+
this.stateRetentionTime = stateRetentionTime;
209214
}
210215

211216
@Override
@@ -225,7 +230,8 @@ public boolean useInterruptibleTimers() {
225230
boolean produceUpdates,
226231
long allowedLateness,
227232
ZoneId shiftTimeZone,
228-
int inputCountIndex) {
233+
int inputCountIndex,
234+
long stateRetentionTime) {
229235
checkArgument(allowedLateness >= 0);
230236
this.windowAssigner = checkNotNull(windowAssigner);
231237
this.trigger = checkNotNull(trigger);
@@ -242,6 +248,7 @@ public boolean useInterruptibleTimers() {
242248
this.rowtimeIndex = rowtimeIndex;
243249
this.shiftTimeZone = shiftTimeZone;
244250
this.recordCounter = RecordCounter.of(inputCountIndex);
251+
this.stateRetentionTime = stateRetentionTime;
245252
}
246253

247254
protected abstract void compileGeneratedCode();

flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/groupwindow/operator/WindowOperatorBuilder.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ public class WindowOperatorBuilder {
7575
protected int rowtimeIndex = -1;
7676
protected ZoneId shiftTimeZone = ZoneId.of("UTC");
7777
protected int inputCountIndex = -1;
78+
protected long stateRetentionTime;
7879

7980
public static WindowOperatorBuilder builder() {
8081
return new WindowOperatorBuilder();
@@ -95,6 +96,11 @@ public WindowOperatorBuilder withShiftTimezone(ZoneId shiftTimeZone) {
9596
return this;
9697
}
9798

99+
public WindowOperatorBuilder withStateRetentionTime(long stateRetentionTime) {
100+
this.stateRetentionTime = stateRetentionTime;
101+
return this;
102+
}
103+
98104
public WindowOperatorBuilder tumble(Duration size) {
99105
checkArgument(windowAssigner == null);
100106
this.windowAssigner = TumblingWindowAssigner.of(size);
@@ -303,7 +309,8 @@ public WindowOperator build() {
303309
windowOperatorBuilder.produceUpdates,
304310
windowOperatorBuilder.allowedLateness,
305311
windowOperatorBuilder.shiftTimeZone,
306-
windowOperatorBuilder.inputCountIndex);
312+
windowOperatorBuilder.inputCountIndex,
313+
windowOperatorBuilder.stateRetentionTime);
307314
} else {
308315
//noinspection unchecked
309316
return new TableAggregateWindowOperator(
@@ -320,7 +327,8 @@ public WindowOperator build() {
320327
windowOperatorBuilder.produceUpdates,
321328
windowOperatorBuilder.allowedLateness,
322329
windowOperatorBuilder.shiftTimeZone,
323-
windowOperatorBuilder.inputCountIndex);
330+
windowOperatorBuilder.inputCountIndex,
331+
windowOperatorBuilder.stateRetentionTime);
324332
}
325333
}
326334
}
@@ -370,7 +378,8 @@ public AggregateWindowOperator build() {
370378
windowOperatorBuilder.produceUpdates,
371379
windowOperatorBuilder.allowedLateness,
372380
windowOperatorBuilder.shiftTimeZone,
373-
windowOperatorBuilder.inputCountIndex);
381+
windowOperatorBuilder.inputCountIndex,
382+
windowOperatorBuilder.stateRetentionTime);
374383
} else {
375384
//noinspection unchecked
376385
return new AggregateWindowOperator(
@@ -388,7 +397,8 @@ public AggregateWindowOperator build() {
388397
windowOperatorBuilder.produceUpdates,
389398
windowOperatorBuilder.allowedLateness,
390399
windowOperatorBuilder.shiftTimeZone,
391-
windowOperatorBuilder.inputCountIndex);
400+
windowOperatorBuilder.inputCountIndex,
401+
windowOperatorBuilder.stateRetentionTime);
392402
}
393403
}
394404
}

flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/window/groupwindow/operator/WindowOperatorContractTest.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,8 @@ KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> createWindowOp
225225
sendRetraction,
226226
allowedLateness,
227227
UTC_ZONE_ID,
228-
-1);
228+
-1,
229+
0);
229230
return new KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData>(
230231
operator, keySelector, keyType);
231232
} else {
@@ -243,7 +244,8 @@ KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> createWindowOp
243244
sendRetraction,
244245
allowedLateness,
245246
UTC_ZONE_ID,
246-
-1);
247+
-1,
248+
0);
247249

248250
return new KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData>(
249251
operator, keySelector, keyType);

0 commit comments

Comments
 (0)