Skip to content

Commit 87ea276

Browse files
committed
[FLINK-37955] Don't load all records into main memory to avoid OOM
1 parent c80a1f3 commit 87ea276

File tree

3 files changed

+338
-92
lines changed

3 files changed

+338
-92
lines changed

flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/stream/AbstractStreamingJoinOperator.java

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,16 @@
2626
import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
2727
import org.apache.flink.table.runtime.generated.JoinCondition;
2828
import org.apache.flink.table.runtime.operators.join.JoinConditionWithNullFilters;
29+
import org.apache.flink.table.runtime.operators.join.stream.state.JoinRecordStateView;
30+
import org.apache.flink.table.runtime.operators.join.stream.state.OuterJoinRecordStateView;
2931
import org.apache.flink.table.runtime.operators.join.stream.utils.JoinInputSideSpec;
3032
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
3133

34+
import java.util.Iterator;
35+
import java.util.function.Function;
36+
37+
import static org.apache.flink.util.Preconditions.checkState;
38+
3239
/**
3340
* Abstract implementation for streaming unbounded Join operator which defines some member fields
3441
* can be shared between different implementations.
@@ -94,4 +101,84 @@ public void close() throws Exception {
94101
joinCondition.close();
95102
}
96103
}
104+
105+
/** Wrap the passed iterator with filter and mapper. */
106+
private static <I, O> Iterator<O> collect(
107+
final Iterator<I> delegate, Function<I, Boolean> matcher, Function<I, O> mapper) {
108+
return new Iterator<O>() {
109+
private I next;
110+
111+
@Override
112+
public boolean hasNext() {
113+
advance();
114+
return next != null;
115+
}
116+
117+
@Override
118+
public O next() {
119+
checkState(hasNext());
120+
I tmp = next;
121+
next = null;
122+
return mapper.apply(tmp);
123+
}
124+
125+
private void advance() {
126+
while (next == null && delegate.hasNext()) {
127+
I record = delegate.next();
128+
if (matcher.apply(record)) {
129+
next = record;
130+
}
131+
}
132+
}
133+
};
134+
}
135+
136+
/** Creates an {@link Iterator} over the records associated to the input row. */
137+
protected static Iterator<OuterRecord> iterator(
138+
RowData input,
139+
boolean inputIsLeft,
140+
JoinRecordStateView otherSideStateView,
141+
JoinCondition condition)
142+
throws Exception {
143+
if (otherSideStateView instanceof OuterJoinRecordStateView) {
144+
return collect(
145+
((OuterJoinRecordStateView) otherSideStateView)
146+
.getRecordsAndNumOfAssociations()
147+
.iterator(),
148+
record ->
149+
inputIsLeft
150+
? condition.apply(input, record.f0)
151+
: condition.apply(record.f0, input),
152+
record -> new OuterRecord(record.f0, record.f1));
153+
} else {
154+
return collect(
155+
otherSideStateView.getRecords().iterator(),
156+
record ->
157+
inputIsLeft
158+
? condition.apply(input, record)
159+
: condition.apply(record, input),
160+
// use -1 as the default number of associations
161+
record -> new OuterRecord(record, -1));
162+
}
163+
}
164+
165+
/**
166+
* An {@link OuterRecord} is a composite of record and {@code numOfAssociations}. The {@code
167+
* numOfAssociations} represents the number of associated records in the other side. It is used
168+
* when the record is from outer side (e.g. left side in LEFT OUTER JOIN). When the {@code
169+
* numOfAssociations} is ZERO, we need to send a null padding row. This is useful to avoid
170+
* recompute the associated numbers every time.
171+
*
172+
* <p>When the record is from inner side (e.g. right side in LEFT OUTER JOIN), the {@code
173+
* numOfAssociations} will always be {@code -1}.
174+
*/
175+
protected static final class OuterRecord {
176+
public final RowData record;
177+
public final int numOfAssociations;
178+
179+
private OuterRecord(RowData record, int numOfAssociations) {
180+
this.record = record;
181+
this.numOfAssociations = numOfAssociations;
182+
}
183+
}
97184
}

0 commit comments

Comments
 (0)