Skip to content

Commit 2876249

Browse files
committed
[FLINK-34548][API] Supports sink-v2 Sink
1 parent 056660e commit 2876249

File tree

16 files changed

+711
-6
lines changed

16 files changed

+711
-6
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.api.connector.dsv2;
20+
21+
import org.apache.flink.annotation.Experimental;
22+
23+
/**
24+
* Sink interface for DataStream API V2.
25+
*
26+
* <p>Note that this interface is just a placeholder because we haven't decided whether to use
27+
* sink-v2 based sink or design a new sink connector API for DataStream V2.
28+
*/
29+
@Experimental
30+
public interface Sink<T> {}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.api.connector.dsv2;
20+
21+
import org.apache.flink.annotation.Experimental;
22+
23+
/** Utils to create the DataStream V2 supported {@link Sink}. */
24+
@Experimental
25+
public class DataStreamV2SinkUtils {
26+
/**
27+
* Wrap a sink-v2 based sink to a DataStream V2 supported sink.
28+
*
29+
* @param sink The sink-v2 based sink to wrap.
30+
* @return The DataStream V2 supported sink.
31+
*/
32+
public static <T> Sink<T> wrapSink(org.apache.flink.api.connector.sink2.Sink<T> sink) {
33+
return new WrappedSink<>(sink);
34+
}
35+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.api.connector.dsv2;
20+
21+
import org.apache.flink.annotation.Internal;
22+
23+
/** A simple {@link Sink} implementation that wraps a sink-v2 based sink. */
24+
@Internal
25+
public class WrappedSink<T> implements Sink<T> {
26+
org.apache.flink.api.connector.sink2.Sink<T> wrappedSink;
27+
28+
public WrappedSink(org.apache.flink.api.connector.sink2.Sink<T> wrappedSink) {
29+
this.wrappedSink = wrappedSink;
30+
}
31+
32+
public org.apache.flink.api.connector.sink2.Sink<T> getWrappedSink() {
33+
return wrappedSink;
34+
}
35+
}

flink-datastream-api/src/main/java/org/apache/flink/datastream/api/stream/GlobalStream.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.flink.datastream.api.stream;
2020

2121
import org.apache.flink.annotation.Experimental;
22+
import org.apache.flink.api.connector.dsv2.Sink;
2223
import org.apache.flink.api.java.functions.KeySelector;
2324
import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction;
2425
import org.apache.flink.datastream.api.function.TwoInputNonBroadcastStreamProcessFunction;
@@ -78,7 +79,7 @@ <T_OTHER, OUT> GlobalStream<OUT> connectAndProcess(
7879
*/
7980
BroadcastStream<T> broadcast();
8081

81-
// TODO add toSink method.
82+
/**
 * Attaches the given sink to this stream.
 *
 * @param sink The {@link Sink} to attach to this stream.
 */
void toSink(Sink<T> sink);
8283

8384
/**
8485
* This class represents a combination of two {@link GlobalStream}. It will be used as the

flink-datastream-api/src/main/java/org/apache/flink/datastream/api/stream/KeyedPartitionStream.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.flink.datastream.api.stream;
2020

2121
import org.apache.flink.annotation.Experimental;
22+
import org.apache.flink.api.connector.dsv2.Sink;
2223
import org.apache.flink.api.java.functions.KeySelector;
2324
import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction;
2425
import org.apache.flink.datastream.api.function.TwoInputBroadcastStreamProcessFunction;
@@ -194,7 +195,7 @@ <T_OTHER, OUT> KeyedPartitionStream<K, OUT> connectAndProcess(
194195
*/
195196
BroadcastStream<T> broadcast();
196197

197-
// TODO add toSink method.
198+
/**
 * Attaches the given sink to this stream.
 *
 * @param sink The {@link Sink} to attach to this stream.
 */
void toSink(Sink<T> sink);
198199

199200
/**
200201
* This class represents a combination of two {@link KeyedPartitionStream}. It will be used as

flink-datastream-api/src/main/java/org/apache/flink/datastream/api/stream/NonKeyedPartitionStream.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.flink.datastream.api.stream;
2020

2121
import org.apache.flink.annotation.Experimental;
22+
import org.apache.flink.api.connector.dsv2.Sink;
2223
import org.apache.flink.api.java.functions.KeySelector;
2324
import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction;
2425
import org.apache.flink.datastream.api.function.TwoInputBroadcastStreamProcessFunction;
@@ -100,7 +101,7 @@ <T_OTHER, OUT> NonKeyedPartitionStream<OUT> connectAndProcess(
100101
*/
101102
BroadcastStream<T> broadcast();
102103

103-
// TODO add toSink method.
104+
/**
 * Attaches the given sink to this stream.
 *
 * @param sink The {@link Sink} to attach to this stream.
 */
void toSink(Sink<T> sink);
104105

105106
/**
106107
* This interface represents a combination of two {@link NonKeyedPartitionStream}. It will be

flink-datastream/src/main/java/org/apache/flink/datastream/impl/ExecutionEnvironmentImpl.java

+17
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import org.apache.flink.streaming.api.graph.StreamGraph;
4949
import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
5050
import org.apache.flink.streaming.api.transformations.SourceTransformation;
51+
import org.apache.flink.streaming.runtime.translators.DataStreamV2SinkTransformationTranslator;
5152
import org.apache.flink.util.ExceptionUtils;
5253
import org.apache.flink.util.FlinkException;
5354
import org.apache.flink.util.Preconditions;
@@ -87,6 +88,18 @@ public class ExecutionEnvironmentImpl implements ExecutionEnvironment {
8788
*/
8889
private static ExecutionEnvironmentFactory contextEnvironmentFactory = null;
8990

91+
static {
    try {
        // All transformation translators must be put into a map in StreamGraphGenerator, but
        // streaming-java does not depend on the process-function module, so the sink
        // translator is registered through this helper (which, per the original note, relies
        // on reflection).
        DataStreamV2SinkTransformationTranslator.registerSinkTransformationTranslator();
    } catch (Exception e) {
        // Keep the original exception as the cause so that the root failure stays visible in
        // the stack trace of the class-initialization error.
        throw new RuntimeException("Can not register sink transformation translator.", e);
    }
}
102+
90103
/**
91104
* Create and return an instance of {@link ExecutionEnvironment}.
92105
*
@@ -202,6 +215,10 @@ public void setParallelism(int parallelism) {
202215
executionConfig.setParallelism(parallelism);
203216
}
204217

218+
public CheckpointConfig getCheckpointCfg() {
219+
return checkpointCfg;
220+
}
221+
205222
// -----------------------------------------------
206223
// Internal Methods
207224
// -----------------------------------------------

flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/GlobalStreamImpl.java

+10-1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.flink.datastream.impl.stream;
2020

2121
import org.apache.flink.api.common.typeinfo.TypeInformation;
22+
import org.apache.flink.api.connector.dsv2.Sink;
2223
import org.apache.flink.api.dag.Transformation;
2324
import org.apache.flink.api.java.functions.KeySelector;
2425
import org.apache.flink.api.java.tuple.Tuple2;
@@ -36,6 +37,7 @@
3637
import org.apache.flink.datastream.impl.utils.StreamUtils;
3738
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
3839
import org.apache.flink.streaming.api.operators.SimpleUdfStreamOperatorFactory;
40+
import org.apache.flink.streaming.api.transformations.DataStreamV2SinkTransformation;
3941
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
4042
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
4143
import org.apache.flink.streaming.runtime.partitioner.ShufflePartitioner;
@@ -98,7 +100,14 @@ public <T_OTHER, OUT> GlobalStream<OUT> connectAndProcess(
98100
return new GlobalStreamImpl<>(environment, outTransformation);
99101
}
100102

101-
// TODO add toSink method.
103+
@Override
104+
public void toSink(Sink<T> sink) {
105+
DataStreamV2SinkTransformation<T, T> sinkTransformation =
106+
StreamUtils.addSinkOperator(this, sink, getType());
107+
// Operator parallelism should always be 1 for global stream.
108+
// parallelismConfigured should be true to avoid overwritten by AdaptiveBatchScheduler.
109+
sinkTransformation.setParallelism(1, true);
110+
}
102111

103112
// ---------------------
104113
// Partitioning

flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/KeyedPartitionStreamImpl.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.flink.datastream.impl.stream;
2020

2121
import org.apache.flink.api.common.typeinfo.TypeInformation;
22+
import org.apache.flink.api.connector.dsv2.Sink;
2223
import org.apache.flink.api.dag.Transformation;
2324
import org.apache.flink.api.java.functions.KeySelector;
2425
import org.apache.flink.api.java.tuple.Tuple2;
@@ -325,7 +326,10 @@ public KeySelector<V, K> getKeySelector() {
325326
return keySelector;
326327
}
327328

328-
// TODO add toSink method.
329+
@Override
330+
public void toSink(Sink<V> sink) {
331+
StreamUtils.addSinkOperator(this, sink, getType());
332+
}
329333

330334
// ---------------------
331335
// Partitioning

flink-datastream/src/main/java/org/apache/flink/datastream/impl/stream/NonKeyedPartitionStreamImpl.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.flink.datastream.impl.stream;
2020

2121
import org.apache.flink.api.common.typeinfo.TypeInformation;
22+
import org.apache.flink.api.connector.dsv2.Sink;
2223
import org.apache.flink.api.dag.Transformation;
2324
import org.apache.flink.api.java.functions.KeySelector;
2425
import org.apache.flink.api.java.tuple.Tuple2;
@@ -132,7 +133,10 @@ public <T_OTHER, OUT> NonKeyedPartitionStream<OUT> connectAndProcess(
132133
return new NonKeyedPartitionStreamImpl<>(environment, outTransformation);
133134
}
134135

135-
// TODO add toSink method.
136+
@Override
137+
public void toSink(Sink<T> sink) {
138+
StreamUtils.addSinkOperator(this, sink, getType());
139+
}
136140

137141
// ---------------------
138142
// Partitioning

flink-datastream/src/main/java/org/apache/flink/datastream/impl/utils/StreamUtils.java

+57
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
package org.apache.flink.datastream.impl.utils;
2020

2121
import org.apache.flink.api.common.typeinfo.TypeInformation;
22+
import org.apache.flink.api.connector.dsv2.Sink;
23+
import org.apache.flink.api.connector.dsv2.WrappedSink;
2224
import org.apache.flink.api.java.Utils;
2325
import org.apache.flink.api.java.functions.KeySelector;
2426
import org.apache.flink.api.java.tuple.Tuple2;
@@ -29,10 +31,13 @@
2931
import org.apache.flink.datastream.api.function.TwoOutputStreamProcessFunction;
3032
import org.apache.flink.datastream.impl.stream.AbstractDataStream;
3133
import org.apache.flink.datastream.impl.stream.KeyedPartitionStreamImpl;
34+
import org.apache.flink.datastream.impl.stream.NonKeyedPartitionStreamImpl;
3235
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
3336
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
3437
import org.apache.flink.streaming.api.operators.SimpleUdfStreamOperatorFactory;
38+
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
3539
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
40+
import org.apache.flink.streaming.api.transformations.DataStreamV2SinkTransformation;
3641
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
3742
import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
3843

@@ -228,4 +233,56 @@ public static <IN1, IN2, OUT> TwoInputTransformation<IN1, IN2, OUT> getTwoInputT
228233

229234
return transform;
230235
}
236+
237+
/** Construct and return a new DataStream with one input operator. */
238+
public static <T, R> AbstractDataStream<R> transformOneInputOperator(
239+
String operatorName,
240+
AbstractDataStream<T> inputStream,
241+
TypeInformation<R> outTypeInfo,
242+
StreamOperatorFactory<R> operatorFactory) {
243+
// read the output type of the input Transform to coax out errors about MissingTypeInfo
244+
inputStream.getTransformation().getOutputType();
245+
246+
OneInputTransformation<T, R> resultTransform =
247+
new OneInputTransformation<>(
248+
inputStream.getTransformation(),
249+
operatorName,
250+
operatorFactory,
251+
outTypeInfo,
252+
inputStream.getEnvironment().getParallelism(),
253+
false);
254+
255+
NonKeyedPartitionStreamImpl<R> returnStream =
256+
new NonKeyedPartitionStreamImpl<>(inputStream.getEnvironment(), resultTransform);
257+
258+
inputStream.getEnvironment().addOperator(resultTransform);
259+
260+
return returnStream;
261+
}
262+
263+
/** Add sink operator to the input stream. */
264+
public static <T> DataStreamV2SinkTransformation<T, T> addSinkOperator(
265+
AbstractDataStream<T> inputStream, Sink<T> sink, TypeInformation<T> typeInformation) {
266+
// read the output type of the input Transform to coax out errors about MissingTypeInfo
267+
inputStream.getTransformation().getOutputType();
268+
269+
if (!(sink instanceof WrappedSink)) {
270+
throw new UnsupportedOperationException(
271+
"Unsupported type of sink, please use DataStreamV2SinkUtils to wrap a sink-v2 sink first.");
272+
}
273+
274+
org.apache.flink.api.connector.sink2.Sink<T> innerSink =
275+
((WrappedSink<T>) sink).getWrappedSink();
276+
277+
DataStreamV2SinkTransformation<T, T> sinkTransformation =
278+
new DataStreamV2SinkTransformation<>(
279+
inputStream,
280+
innerSink,
281+
typeInformation,
282+
"Sink",
283+
inputStream.getEnvironment().getParallelism(),
284+
false);
285+
inputStream.getEnvironment().addOperator(sinkTransformation);
286+
return sinkTransformation;
287+
}
231288
}

0 commit comments

Comments
 (0)