Skip to content

Commit 4f71c5b

Browse files
committed
[FLINK-34548][API] Implement process function's underlying operators
1 parent e1147ca commit 4f71c5b

26 files changed

+2064
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.datastream.impl.common;
20+
21+
import org.apache.flink.api.java.functions.KeySelector;
22+
import org.apache.flink.util.ExceptionUtils;
23+
24+
import java.util.function.Supplier;
25+
26+
import static org.apache.flink.util.Preconditions.checkNotNull;
27+
28+
/**
29+
* This output checks whether the current key of the output record and the key extracted with a
30+
* specific key selector are exactly the same.
31+
*/
32+
public class KeyCheckedOutputCollector<KEY, OUT> extends TimestampCollector<OUT> {
33+
private final TimestampCollector<OUT> collector;
34+
35+
private final KeySelector<OUT, KEY> outKeySelector;
36+
37+
private final Supplier<KEY> currentKeyGetter;
38+
39+
public KeyCheckedOutputCollector(
40+
TimestampCollector<OUT> collector,
41+
KeySelector<OUT, KEY> outKeySelector,
42+
Supplier<KEY> currentKeyGetter) {
43+
this.collector = collector;
44+
this.outKeySelector = outKeySelector;
45+
this.currentKeyGetter = currentKeyGetter;
46+
}
47+
48+
@Override
49+
public void collect(OUT outputRecord) {
50+
checkOutputKey(outputRecord);
51+
collector.collect(outputRecord);
52+
}
53+
54+
@Override
55+
public void collectAndOverwriteTimestamp(OUT outputRecord, long timestamp) {
56+
checkOutputKey(outputRecord);
57+
collector.collectAndOverwriteTimestamp(outputRecord, timestamp);
58+
}
59+
60+
private void checkOutputKey(OUT outputRecord) {
61+
try {
62+
KEY currentKey = currentKeyGetter.get();
63+
KEY outputKey = checkNotNull(outKeySelector).getKey(outputRecord);
64+
if (!outputKey.equals(currentKey)) {
65+
throw new IllegalStateException(
66+
"Output key must equals to input key if the output key selector is not null.");
67+
}
68+
} catch (Exception e) {
69+
ExceptionUtils.rethrow(e);
70+
}
71+
}
72+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.datastream.impl.common;
20+
21+
import org.apache.flink.datastream.api.common.Collector;
22+
import org.apache.flink.streaming.api.operators.Output;
23+
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
24+
25+
/** The default implementation of {@link Collector}. */
26+
public class OutputCollector<OUT> extends TimestampCollector<OUT> {
27+
private final Output<StreamRecord<OUT>> output;
28+
29+
public OutputCollector(Output<StreamRecord<OUT>> output) {
30+
this.output = output;
31+
}
32+
33+
@Override
34+
public void collect(OUT outputRecord) {
35+
output.collect(reuse.replace(outputRecord));
36+
}
37+
38+
@Override
39+
public void collectAndOverwriteTimestamp(OUT record, long timestamp) {
40+
setTimestamp(timestamp);
41+
output.collect(reuse.replace(record));
42+
}
43+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.datastream.impl.common;
20+
21+
import org.apache.flink.datastream.api.common.Collector;
22+
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
23+
24+
/** The base {@link Collector} which take care of records timestamp. */
25+
public abstract class TimestampCollector<OUT> implements Collector<OUT> {
26+
protected final StreamRecord<OUT> reuse = new StreamRecord<>(null);
27+
28+
public void setTimestampFromStreamRecord(StreamRecord<?> timestampBase) {
29+
if (timestampBase.hasTimestamp()) {
30+
setTimestamp(timestampBase.getTimestamp());
31+
} else {
32+
eraseTimestamp();
33+
}
34+
}
35+
36+
public void setTimestamp(long timestamp) {
37+
reuse.setTimestamp(timestamp);
38+
}
39+
40+
public void eraseTimestamp() {
41+
reuse.eraseTimestamp();
42+
}
43+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.datastream.impl.context;
20+
21+
import org.apache.flink.datastream.api.context.NonPartitionedContext;
22+
import org.apache.flink.datastream.api.function.ApplyPartitionFunction;
23+
24+
/** The default implementation of {@link NonPartitionedContext}. */
25+
public class DefaultNonPartitionedContext<OUT> implements NonPartitionedContext<OUT> {
26+
27+
@Override
28+
public void applyToAllPartitions(ApplyPartitionFunction<OUT> applyPartitionFunction) {
29+
// TODO implements this method.
30+
}
31+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.datastream.impl.context;
20+
21+
import org.apache.flink.datastream.api.context.RuntimeContext;
22+
23+
/** The default implementation of {@link RuntimeContext}. */
24+
public class DefaultRuntimeContext implements RuntimeContext {
25+
// TODO implements this class
26+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.datastream.impl.context;
20+
21+
import org.apache.flink.datastream.api.context.TwoOutputNonPartitionedContext;
22+
import org.apache.flink.datastream.api.function.TwoOutputApplyPartitionFunction;
23+
24+
/** The default implementation of {@link TwoOutputNonPartitionedContext}. */
25+
public class DefaultTwoOutputNonPartitionedContext<OUT1, OUT2>
26+
implements TwoOutputNonPartitionedContext<OUT1, OUT2> {
27+
@Override
28+
public void applyToAllPartitions(
29+
TwoOutputApplyPartitionFunction<OUT1, OUT2> applyPartitionFunction) {
30+
// TODO implements this method.
31+
}
32+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.datastream.impl.operators;
20+
21+
import org.apache.flink.api.java.functions.KeySelector;
22+
import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction;
23+
import org.apache.flink.datastream.api.stream.KeyedPartitionStream;
24+
import org.apache.flink.datastream.impl.common.KeyCheckedOutputCollector;
25+
import org.apache.flink.datastream.impl.common.OutputCollector;
26+
import org.apache.flink.datastream.impl.common.TimestampCollector;
27+
28+
import javax.annotation.Nullable;
29+
30+
/** Operator for {@link OneInputStreamProcessFunction} in {@link KeyedPartitionStream}. */
31+
public class KeyedProcessOperator<KEY, IN, OUT> extends ProcessOperator<IN, OUT> {
32+
33+
@Nullable private final KeySelector<OUT, KEY> outKeySelector;
34+
35+
public KeyedProcessOperator(OneInputStreamProcessFunction<IN, OUT> userFunction) {
36+
this(userFunction, null);
37+
}
38+
39+
public KeyedProcessOperator(
40+
OneInputStreamProcessFunction<IN, OUT> userFunction,
41+
@Nullable KeySelector<OUT, KEY> outKeySelector) {
42+
super(userFunction);
43+
this.outKeySelector = outKeySelector;
44+
}
45+
46+
@Override
47+
protected TimestampCollector<OUT> getOutputCollector() {
48+
return outKeySelector != null
49+
? new KeyCheckedOutputCollector<>(
50+
new OutputCollector<>(output), outKeySelector, () -> (KEY) getCurrentKey())
51+
: new OutputCollector<>(output);
52+
}
53+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.datastream.impl.operators;
20+
21+
import org.apache.flink.api.java.functions.KeySelector;
22+
import org.apache.flink.datastream.api.function.TwoInputBroadcastStreamProcessFunction;
23+
import org.apache.flink.datastream.api.stream.KeyedPartitionStream;
24+
import org.apache.flink.datastream.impl.common.KeyCheckedOutputCollector;
25+
import org.apache.flink.datastream.impl.common.OutputCollector;
26+
import org.apache.flink.datastream.impl.common.TimestampCollector;
27+
28+
import javax.annotation.Nullable;
29+
30+
/** Operator for {@link TwoInputBroadcastStreamProcessFunction} in {@link KeyedPartitionStream}. */
31+
public class KeyedTwoInputBroadcastProcessOperator<KEY, IN1, IN2, OUT>
32+
extends TwoInputBroadcastProcessOperator<IN1, IN2, OUT> {
33+
@Nullable private final KeySelector<OUT, KEY> outKeySelector;
34+
35+
public KeyedTwoInputBroadcastProcessOperator(
36+
TwoInputBroadcastStreamProcessFunction<IN1, IN2, OUT> userFunction) {
37+
this(userFunction, null);
38+
}
39+
40+
public KeyedTwoInputBroadcastProcessOperator(
41+
TwoInputBroadcastStreamProcessFunction<IN1, IN2, OUT> userFunction,
42+
@Nullable KeySelector<OUT, KEY> outKeySelector) {
43+
super(userFunction);
44+
this.outKeySelector = outKeySelector;
45+
}
46+
47+
@Override
48+
protected TimestampCollector<OUT> getOutputCollector() {
49+
return outKeySelector == null
50+
? new OutputCollector<>(output)
51+
: new KeyCheckedOutputCollector<>(
52+
new OutputCollector<>(output), outKeySelector, () -> (KEY) getCurrentKey());
53+
}
54+
}

0 commit comments

Comments
 (0)