Skip to content

Commit d8f38fd

Browse files
set withIdleness as parameter
1 parent 5ebeb26 commit d8f38fd

File tree

6 files changed

+14
-10
lines changed

6 files changed

+14
-10
lines changed

.github/workflows/build.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ jobs:
3030
GPG_PASSWORD: ${{ secrets.GPG_KEY_PASSWORD }}
3131
GPG_SECRET: ${{ secrets.GPG_SECRET_KEY }}
3232
- name: Push Maven Artifacts to SonaType (only on Master)
33-
if: ${{ github.ref == 'refs/heads/master' }}
33+
# if: ${{ github.ref == 'refs/heads/master' }}
3434
uses: gradle/gradle-build-action@0d13054264b0bb894ded474f08ebb30921341cee
3535
with:
3636
arguments: publishToSonatype closeAndReleaseSonatypeStagingRepository --info

core/src/main/java/com/airwallex/airskiff/flink/AbstractFlinkCompiler.java

+8-4
Original file line numberDiff line numberDiff line change
@@ -48,11 +48,14 @@ public abstract class AbstractFlinkCompiler implements Compiler<DataStream<?>> {
4848
protected final StreamExecutionEnvironment env;
4949
protected final StreamTableEnvironment tableEnv;
5050
protected final Duration allowedLatency;
51+
protected final Duration withIdleness;
5152

52-
public AbstractFlinkCompiler(StreamExecutionEnvironment env, StreamTableEnvironment tableEnv, Duration allowedLatency) {
53+
public AbstractFlinkCompiler(StreamExecutionEnvironment env, StreamTableEnvironment tableEnv, Duration allowedLatency,
54+
Duration withIdleness) {
5355
this.env = env;
5456
this.tableEnv = tableEnv;
5557
this.allowedLatency = allowedLatency;
58+
this.withIdleness = withIdleness;
5659
Utils.registerFunctions(this.tableEnv);
5760
}
5861

@@ -95,7 +98,7 @@ protected <K, T, U> KeyedStream<Tuple2<Long, Pair<K, Pair<T, U>>>, K> compileLef
9598

9699
final LeftJoinPairMonoid<T, U> m = new LeftJoinPairMonoid<>();
97100
DataStream<Tuple2<Long, Pair<K, Pair<T, U>>>> ss = ks2.map(x -> new Tuple2<>(x.f0, new Pair<>(x.f1.l, new Pair<T, U>(null, x.f1.r))), outputType).union(ks1.map(x -> new Tuple2<>(x.f0, new Pair<>(x.f1.l, new Pair<T, U>(x.f1.r, null))), outputType));
98-
return ss.assignTimestampsAndWatermarks(Utils.watermark(isBatch(), Duration.ZERO)).keyBy(t -> t.f1.l, kType)
101+
return ss.assignTimestampsAndWatermarks(Utils.watermark(isBatch(), Duration.ZERO, Duration.ofMillis(300))).keyBy(t -> t.f1.l, kType)
99102
// a window to make sure if we have multiple events happening
100103
// at the same time, U is always put before T in batch mode
101104
.window(TumblingEventTimeWindows.of(Time.days(1))).allowedLateness(Time.days(1)).process(new ProcessWindowFunction<Tuple2<Long, Pair<K, Pair<T, U>>>, Tuple2<Long, Pair<K, Pair<T, U>>>, K, TimeWindow>() {
@@ -179,7 +182,8 @@ protected <K, T> KeyedStream<Tuple2<Long, Pair<K, T>>, K> compileOrderedSum(Orde
179182
}
180183

181184
protected <T> DataStream<Tuple2<Long, T>> compileConcat(ConcatStream<T> stream) {
182-
return compile(stream.a).union(compile(stream.b)).assignTimestampsAndWatermarks(Utils.watermark(isBatch(), this.allowedLatency));
185+
return compile(stream.a).union(compile(stream.b))
186+
.assignTimestampsAndWatermarks(Utils.watermark(isBatch(), this.allowedLatency, this.withIdleness));
183187
}
184188

185189
protected <T> DataStream<Tuple2<Long, T>> compileFilter(FilterStream<T> stream) {
@@ -192,7 +196,7 @@ protected <T, U> DataStream<Tuple2<Long, U>> compileSql(final SqlStream<T, U> st
192196
final var typeInfo = stream.typeInfo;
193197
return tableEnv.toAppendStream(table, Row.class).map(r -> new Tuple2<>((Long) r.getField(0), mapper.map(r)), typeInfo)
194198
// watermark and timestamp is lost after table to data stream conversion?
195-
.assignTimestampsAndWatermarks(Utils.watermark(isBatch(), this.allowedLatency));
199+
.assignTimestampsAndWatermarks(Utils.watermark(isBatch(), this.allowedLatency, this.withIdleness));
196200
}
197201

198202
protected <K, T> KeyedStream<Tuple2<Long, Pair<K, T>>, K> compileKS(KStream<K, T> ks) {

core/src/main/java/com/airwallex/airskiff/flink/FlinkLocalTextConfig.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,6 @@ public DataStream<Tuple2<Long, String>> source(
3131
stream = env.socketTextStream("localhost", 10000);
3232
}
3333
return stream.map(t -> new Tuple2<>(System.currentTimeMillis(), t), tuple2TypeInfo(String.class))
34-
.assignTimestampsAndWatermarks(Utils.watermark(isBatch, Duration.ZERO));
34+
.assignTimestampsAndWatermarks(Utils.watermark(isBatch, Duration.ZERO, Duration.ofMillis(300)));
3535
}
3636
}

core/src/main/java/com/airwallex/airskiff/flink/Utils.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,12 @@
2828

2929
public class Utils {
3030

31-
public static <T> WatermarkStrategy<Tuple2<Long, T>> watermark(boolean isBatch, Duration allowedLatency) {
31+
public static <T> WatermarkStrategy<Tuple2<Long, T>> watermark(boolean isBatch, Duration allowedLatency, Duration withIdleness) {
3232
if (isBatch) {
3333
return WatermarkStrategy.<Tuple2<Long, T>>forBoundedOutOfOrderness(Duration.ofDays(365 * 10000))
3434
.withTimestampAssigner((t, l) -> t.f0);
3535
} else {
36-
return new RealtimeWatermarkStrategy<Tuple2<Long, T>>(Duration.ofDays(5), allowedLatency).withTimestampAssigner((t, l) -> t.f0).withIdleness(Duration.ofMillis(300));
36+
return new RealtimeWatermarkStrategy<Tuple2<Long, T>>(Duration.ofDays(5), allowedLatency).withTimestampAssigner((t, l) -> t.f0).withIdleness(withIdleness);
3737
}
3838
}
3939

core/src/test/java/com/airwallex/airskiff/testhelpers/TestFlinkConfig.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,6 @@ public Class<T> clz() {
2525

2626
@Override
2727
public DataStream<Tuple2<Long, T>> source(StreamExecutionEnvironment env, boolean isBatch) {
28-
return env.fromCollection(_data).assignTimestampsAndWatermarks(Utils.watermark(isBatch, Duration.ZERO));
28+
return env.fromCollection(_data).assignTimestampsAndWatermarks(Utils.watermark(isBatch, Duration.ZERO, Duration.ofMillis(300)));
2929
}
3030
}

gradle.properties

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
group=com.airwallex.airskiff
22
name=core
3-
version=2.0.60
3+
version=2.0.61
44
org.gradle.daemon=true
55
org.gradle.parallel=true
66
file.encoding=utf-8

0 commit comments

Comments
 (0)