Skip to content

Commit 0acf92f

Browse files
authored
[FLINK-28693][table] Janino compile failed because of the generated code refers a class in table-planner
1 parent 163b9cc commit 0acf92f

File tree

14 files changed

+355
-15
lines changed

14 files changed

+355
-15
lines changed
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
* limitations under the License.
1717
*/
1818

19-
package org.apache.flink.table.sql.codegen;
19+
package org.apache.flink.table.sql;
2020

2121
import org.apache.hadoop.fs.LocatedFileStatus;
2222
import org.apache.hadoop.fs.RemoteIterator;
+1-1
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
* limitations under the License.
1717
*/
1818

19-
package org.apache.flink.table.sql.codegen;
19+
package org.apache.flink.table.sql;
2020

2121
import org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema;
2222
import org.apache.flink.table.api.DataTypes;
+1-1
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
* limitations under the License.
1717
*/
1818

19-
package org.apache.flink.table.sql.codegen;
19+
package org.apache.flink.table.sql;
2020

2121
import org.apache.flink.test.resources.ResourceTestUtils;
2222
import org.apache.flink.test.util.SQLJobSubmission;
+10-1
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
* limitations under the License.
1717
*/
1818

19-
package org.apache.flink.table.sql.codegen;
19+
package org.apache.flink.table.sql;
2020

2121
import org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema;
2222
import org.apache.flink.table.api.DataTypes;
@@ -33,6 +33,8 @@
3333
import java.util.Collections;
3434
import java.util.List;
3535

36+
import static org.junit.Assume.assumeTrue;
37+
3638
/**
3739
* End-to-End tests for table planner scala-free since 1.15. Due to scala-free of table planner
3840
* introduced, the class in table planner is not visible in distribution runtime, if we use these
@@ -61,6 +63,13 @@ public void testImperativeUdaf() throws Exception {
6163
runAndCheckSQL("scala_free_e2e.sql", Arrays.asList("+I[Bob, 2]", "+I[Alice, 1]"));
6264
}
6365

66+
/** The test data is from {@link org.apache.flink.table.toolbox.TestSourceFunction#DATA}. */
67+
@Test
68+
public void testWatermarkPushDown() throws Exception {
69+
assumeTrue(executionMode.equalsIgnoreCase("streaming"));
70+
runAndCheckSQL("watermark_push_down_e2e.sql", Arrays.asList("+I[Bob, 1]", "+I[Alice, 2]"));
71+
}
72+
6473
@Override
6574
protected List<String> formatRawResult(List<String> rawResult) {
6675
return convertToMaterializedResult(rawResult, SINK_TABLE_SCHEMA, DESERIALIZATION_SCHEMA);
+1-1
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
* limitations under the License.
1717
*/
1818

19-
package org.apache.flink.table.sql.codegen;
19+
package org.apache.flink.table.sql;
2020

2121
import org.apache.flink.api.common.serialization.DeserializationSchema;
2222
import org.apache.flink.api.common.time.Deadline;
+1-1
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
* limitations under the License.
1717
*/
1818

19-
package org.apache.flink.table.sql.codegen;
19+
package org.apache.flink.table.sql;
2020

2121
import org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema;
2222
import org.apache.flink.table.api.DataTypes;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
CREATE TABLE SourceTable (
20+
user_name STRING,
21+
order_cnt BIGINT,
22+
order_time TIMESTAMP(3),
23+
ts AS COALESCE(`order_time` ,CURRENT_TIMESTAMP),
24+
watermark FOR `ts` AS `ts` - INTERVAL '5' SECOND
25+
) WITH (
26+
'connector' = 'test-scan-table-source-with-watermark-push-down'
27+
);
28+
29+
CREATE TABLE SinkTable (
30+
user_name STRING,
31+
order_cnt BIGINT
32+
) WITH (
33+
'connector' = 'filesystem',
34+
'path' = '$RESULT',
35+
'sink.rolling-policy.rollover-interval' = '2s',
36+
'sink.rolling-policy.check-interval' = '2s',
37+
'format' = 'debezium-json'
38+
);
39+
40+
SET execution.runtime-mode = $MODE;
41+
42+
INSERT INTO SinkTable
43+
SELECT user_name, order_cnt
44+
FROM SourceTable;

flink-end-to-end-tests/flink-sql-client-test/pom.xml

+7
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,13 @@ under the License.
3838
<scope>provided</scope>
3939
</dependency>
4040

41+
<dependency>
42+
<groupId>org.apache.flink</groupId>
43+
<artifactId>flink-table-api-java-bridge</artifactId>
44+
<version>${project.version}</version>
45+
<scope>provided</scope>
46+
</dependency>
47+
4148
<dependency>
4249
<groupId>org.apache.flink</groupId>
4350
<artifactId>flink-end-to-end-tests-common</artifactId>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.table.toolbox;
20+
21+
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
22+
import org.apache.flink.table.connector.ChangelogMode;
23+
import org.apache.flink.table.connector.source.DynamicTableSource;
24+
import org.apache.flink.table.connector.source.ScanTableSource;
25+
import org.apache.flink.table.connector.source.SourceFunctionProvider;
26+
import org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown;
27+
import org.apache.flink.table.data.RowData;
28+
29+
/**
30+
* A source used to test {@link SupportsWatermarkPushDown}.
31+
*
32+
* <p>For simplicity, the deprecated source function method is used to create the source.
33+
*/
34+
public class TestScanTableSourceWithWatermarkPushDown
35+
implements ScanTableSource, SupportsWatermarkPushDown {
36+
37+
private WatermarkStrategy<RowData> watermarkStrategy;
38+
39+
@Override
40+
public void applyWatermark(WatermarkStrategy<RowData> watermarkStrategy) {
41+
this.watermarkStrategy = watermarkStrategy;
42+
}
43+
44+
@Override
45+
public DynamicTableSource copy() {
46+
final TestScanTableSourceWithWatermarkPushDown newSource =
47+
new TestScanTableSourceWithWatermarkPushDown();
48+
newSource.watermarkStrategy = this.watermarkStrategy;
49+
return newSource;
50+
}
51+
52+
@Override
53+
public String asSummaryString() {
54+
return "TestScanTableSourceWithWatermarkPushDown";
55+
}
56+
57+
@Override
58+
public ChangelogMode getChangelogMode() {
59+
return ChangelogMode.insertOnly();
60+
}
61+
62+
@Override
63+
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
64+
return SourceFunctionProvider.of(new TestSourceFunction(watermarkStrategy), false);
65+
}
66+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.table.toolbox;
20+
21+
import org.apache.flink.configuration.ConfigOption;
22+
import org.apache.flink.table.connector.source.DynamicTableSource;
23+
import org.apache.flink.table.factories.DynamicTableSourceFactory;
24+
25+
import java.util.Collections;
26+
import java.util.Set;
27+
28+
/** A factory to create {@link TestScanTableSourceWithWatermarkPushDown}. */
29+
public class TestScanTableSourceWithWatermarkPushDownFactory implements DynamicTableSourceFactory {
30+
31+
public static final String IDENTIFIER = "test-scan-table-source-with-watermark-push-down";
32+
33+
@Override
34+
public DynamicTableSource createDynamicTableSource(Context context) {
35+
return new TestScanTableSourceWithWatermarkPushDown();
36+
}
37+
38+
@Override
39+
public String factoryIdentifier() {
40+
return IDENTIFIER;
41+
}
42+
43+
@Override
44+
public Set<ConfigOption<?>> requiredOptions() {
45+
return Collections.emptySet();
46+
}
47+
48+
@Override
49+
public Set<ConfigOption<?>> optionalOptions() {
50+
return Collections.emptySet();
51+
}
52+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.table.toolbox;
20+
21+
import org.apache.flink.api.common.eventtime.Watermark;
22+
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
23+
import org.apache.flink.api.common.eventtime.WatermarkOutput;
24+
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
25+
import org.apache.flink.streaming.api.functions.source.SourceFunction;
26+
import org.apache.flink.table.data.GenericRowData;
27+
import org.apache.flink.table.data.RowData;
28+
import org.apache.flink.table.data.StringData;
29+
import org.apache.flink.table.data.TimestampData;
30+
31+
import java.util.ArrayList;
32+
import java.util.Arrays;
33+
import java.util.List;
34+
35+
/**
36+
* A source function used to test.
37+
*
38+
* <p>For simplicity, the deprecated source function method is used to create the source.
39+
*/
40+
@SuppressWarnings("deprecation")
41+
public class TestSourceFunction implements SourceFunction<RowData> {
42+
43+
public static final List<List<Object>> DATA = new ArrayList<>();
44+
45+
static {
46+
DATA.add(Arrays.asList(StringData.fromString("Bob"), 1L, TimestampData.fromEpochMillis(1)));
47+
DATA.add(
48+
Arrays.asList(
49+
StringData.fromString("Alice"), 2L, TimestampData.fromEpochMillis(2)));
50+
}
51+
52+
private final WatermarkStrategy<RowData> watermarkStrategy;
53+
54+
public TestSourceFunction(WatermarkStrategy<RowData> watermarkStrategy) {
55+
this.watermarkStrategy = watermarkStrategy;
56+
}
57+
58+
private volatile boolean isRunning = true;
59+
60+
@Override
61+
public void run(SourceContext<RowData> ctx) {
62+
WatermarkGenerator<RowData> generator =
63+
watermarkStrategy.createWatermarkGenerator(() -> null);
64+
WatermarkOutput output = new TestWatermarkOutput(ctx);
65+
66+
int rowDataSize = DATA.get(0).size();
67+
int index = 0;
68+
while (index < DATA.size() && isRunning) {
69+
List<Object> list = DATA.get(index);
70+
GenericRowData row = new GenericRowData(rowDataSize);
71+
for (int i = 0; i < rowDataSize; i++) {
72+
row.setField(i, list.get(i));
73+
}
74+
generator.onEvent(row, Long.MIN_VALUE, output);
75+
generator.onPeriodicEmit(output);
76+
77+
ctx.collect(row);
78+
79+
index++;
80+
}
81+
ctx.close();
82+
}
83+
84+
@Override
85+
public void cancel() {
86+
isRunning = false;
87+
}
88+
89+
private static class TestWatermarkOutput implements WatermarkOutput {
90+
91+
private final SourceFunction.SourceContext<RowData> ctx;
92+
93+
public TestWatermarkOutput(SourceFunction.SourceContext<RowData> ctx) {
94+
this.ctx = ctx;
95+
}
96+
97+
@Override
98+
public void emitWatermark(Watermark watermark) {
99+
ctx.emitWatermark(
100+
new org.apache.flink.streaming.api.watermark.Watermark(
101+
watermark.getTimestamp()));
102+
}
103+
104+
@Override
105+
public void markIdle() {}
106+
107+
@Override
108+
public void markActive() {}
109+
}
110+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one or more
2+
# contributor license agreements. See the NOTICE file distributed with
3+
# this work for additional information regarding copyright ownership.
4+
# The ASF licenses this file to You under the Apache License, Version 2.0
5+
# (the "License"); you may not use this file except in compliance with
6+
# the License. You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
16+
org.apache.flink.table.toolbox.TestScanTableSourceWithWatermarkPushDownFactory

0 commit comments

Comments
 (0)