Skip to content

Commit 6f2be0a

Browse files
authored
[FLINK-37827][table] Add support for SHOW CREATE MATERIALIZED TABLE
1 parent 1e7525f commit 6f2be0a

File tree

7 files changed

+470
-128
lines changed

7 files changed

+470
-128
lines changed

flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,7 @@
133133
"org.apache.flink.sql.parser.dql.SqlShowTables"
134134
"org.apache.flink.sql.parser.dql.SqlShowColumns"
135135
"org.apache.flink.sql.parser.dql.SqlShowCreate"
136+
"org.apache.flink.sql.parser.dql.SqlShowCreateMaterializedTable"
136137
"org.apache.flink.sql.parser.dql.SqlShowCreateModel"
137138
"org.apache.flink.sql.parser.dql.SqlShowCreateTable"
138139
"org.apache.flink.sql.parser.dql.SqlShowCreateView"

flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -806,6 +806,13 @@ SqlShowCreate SqlShowCreate() :
806806
{
807807
return new SqlShowCreateModel(pos, sqlIdentifier);
808808
}
809+
|
810+
<MATERIALIZED> <TABLE>
811+
{ pos = getPos(); }
812+
sqlIdentifier = CompoundIdentifier()
813+
{
814+
return new SqlShowCreateMaterializedTable(pos, sqlIdentifier);
815+
}
809816
)
810817
}
811818

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.sql.parser.dql;
20+
21+
import org.apache.calcite.sql.SqlIdentifier;
22+
import org.apache.calcite.sql.SqlKind;
23+
import org.apache.calcite.sql.SqlNode;
24+
import org.apache.calcite.sql.SqlOperator;
25+
import org.apache.calcite.sql.SqlSpecialOperator;
26+
import org.apache.calcite.sql.SqlWriter;
27+
import org.apache.calcite.sql.parser.SqlParserPos;
28+
29+
import java.util.Collections;
30+
import java.util.List;
31+
32+
/** SHOW CREATE MATERIALIZED TABLE sql call. */
33+
public class SqlShowCreateMaterializedTable extends SqlShowCreate {
34+
public static final SqlSpecialOperator OPERATOR =
35+
new SqlSpecialOperator("SHOW CREATE MATERIALIZED TABLE", SqlKind.OTHER_DDL);
36+
37+
public SqlShowCreateMaterializedTable(SqlParserPos pos, SqlIdentifier sqlIdentifier) {
38+
super(pos, sqlIdentifier);
39+
}
40+
41+
public SqlIdentifier getMaterializedTableName() {
42+
return sqlIdentifier;
43+
}
44+
45+
public String[] getFullMaterializedTableName() {
46+
return sqlIdentifier.names.toArray(new String[0]);
47+
}
48+
49+
@Override
50+
public SqlOperator getOperator() {
51+
return OPERATOR;
52+
}
53+
54+
@Override
55+
public List<SqlNode> getOperandList() {
56+
return Collections.singletonList(sqlIdentifier);
57+
}
58+
59+
@Override
60+
public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
61+
writer.keyword(OPERATOR.getName());
62+
sqlIdentifier.unparse(writer, leftPrec, rightPrec);
63+
}
64+
}

flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/ShowCreateUtil.java

Lines changed: 102 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,15 @@
2121
import org.apache.flink.annotation.Internal;
2222
import org.apache.flink.table.api.TableException;
2323
import org.apache.flink.table.catalog.CatalogBaseTable;
24+
import org.apache.flink.table.catalog.CatalogBaseTable.TableKind;
2425
import org.apache.flink.table.catalog.CatalogDescriptor;
2526
import org.apache.flink.table.catalog.CatalogView;
2627
import org.apache.flink.table.catalog.Column;
28+
import org.apache.flink.table.catalog.IntervalFreshness;
2729
import org.apache.flink.table.catalog.ObjectIdentifier;
2830
import org.apache.flink.table.catalog.QueryOperationCatalogView;
2931
import org.apache.flink.table.catalog.ResolvedCatalogBaseTable;
32+
import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
3033
import org.apache.flink.table.catalog.ResolvedCatalogModel;
3134
import org.apache.flink.table.catalog.ResolvedCatalogTable;
3235
import org.apache.flink.table.catalog.ResolvedSchema;
@@ -38,6 +41,7 @@
3841
import org.apache.commons.lang3.StringUtils;
3942

4043
import java.util.List;
44+
import java.util.Locale;
4145
import java.util.Map;
4246
import java.util.Objects;
4347
import java.util.Optional;
@@ -82,12 +86,7 @@ public static String buildShowCreateTableRow(
8286
ObjectIdentifier tableIdentifier,
8387
boolean isTemporary,
8488
SqlFactory sqlFactory) {
85-
if (table.getTableKind() == CatalogBaseTable.TableKind.VIEW) {
86-
throw new TableException(
87-
String.format(
88-
"SHOW CREATE TABLE is only supported for tables, but %s is a view. Please use SHOW CREATE VIEW instead.",
89-
tableIdentifier.asSerializableString()));
90-
}
89+
validateTableKind(table, tableIdentifier, TableKind.TABLE);
9190
StringBuilder sb =
9291
new StringBuilder()
9392
.append(buildCreateFormattedPrefix("TABLE", isTemporary, tableIdentifier));
@@ -110,17 +109,40 @@ public static String buildShowCreateTableRow(
110109
return sb.toString();
111110
}
112111

112+
/** Show create materialized table statement only for materialized tables. */
113+
public static String buildShowCreateMaterializedTableRow(
114+
ResolvedCatalogMaterializedTable table,
115+
ObjectIdentifier tableIdentifier,
116+
boolean isTemporary) {
117+
validateTableKind(table, tableIdentifier, TableKind.MATERIALIZED_TABLE);
118+
StringBuilder sb =
119+
new StringBuilder()
120+
.append(
121+
buildCreateFormattedPrefix(
122+
"MATERIALIZED TABLE", isTemporary, tableIdentifier));
123+
sb.append(extractFormattedColumns(table, PRINT_INDENT));
124+
extractFormattedPrimaryKey(table, PRINT_INDENT)
125+
.ifPresent(pk -> sb.append(",\n").append(pk));
126+
sb.append("\n)\n");
127+
extractComment(table).ifPresent(c -> sb.append(formatComment(c)).append("\n"));
128+
extractFormattedPartitionedInfo(table)
129+
.ifPresent(partitionedBy -> sb.append(formatPartitionedBy(partitionedBy)));
130+
extractFormattedOptions(table.getOptions(), PRINT_INDENT)
131+
.ifPresent(v -> sb.append("WITH (\n").append(v).append("\n)\n"));
132+
sb.append(extractFreshness(table))
133+
.append("\n")
134+
.append(extractRefreshMode(table))
135+
.append("\n");
136+
sb.append("AS ").append(table.getDefinitionQuery()).append('\n');
137+
return sb.toString();
138+
}
139+
113140
/** Show create view statement only for views. */
114141
public static String buildShowCreateViewRow(
115142
ResolvedCatalogBaseTable<?> view,
116143
ObjectIdentifier viewIdentifier,
117144
boolean isTemporary) {
118-
if (view.getTableKind() != CatalogBaseTable.TableKind.VIEW) {
119-
throw new TableException(
120-
String.format(
121-
"SHOW CREATE VIEW is only supported for views, but %s is a table. Please use SHOW CREATE TABLE instead.",
122-
viewIdentifier.asSerializableString()));
123-
}
145+
validateTableKind(view, viewIdentifier, TableKind.VIEW);
124146
final CatalogBaseTable origin = view.getOrigin();
125147
if (origin instanceof QueryOperationCatalogView
126148
&& !((QueryOperationCatalogView) origin).supportsShowCreateView()) {
@@ -243,6 +265,10 @@ private static String formatComment(String comment) {
243265
return String.format("COMMENT '%s'", EncodingUtils.escapeSingleQuotes(comment));
244266
}
245267

268+
private static String formatPartitionedBy(String partitionedByColumns) {
269+
return String.format("PARTITION BY (%s)\n", partitionedByColumns);
270+
}
271+
246272
static Optional<String> extractComment(ResolvedCatalogBaseTable<?> table) {
247273
return StringUtils.isEmpty(table.getComment())
248274
? Optional.empty()
@@ -263,10 +289,32 @@ static Optional<String> extractFormattedPartitionedInfo(ResolvedCatalogTable cat
263289
if (!catalogTable.isPartitioned()) {
264290
return Optional.empty();
265291
}
266-
return Optional.of(
267-
catalogTable.getPartitionKeys().stream()
268-
.map(EncodingUtils::escapeIdentifier)
269-
.collect(Collectors.joining(", ")));
292+
return Optional.of(extractPartitionKeys(catalogTable.getPartitionKeys()));
293+
}
294+
295+
static Optional<String> extractFormattedPartitionedInfo(
296+
ResolvedCatalogMaterializedTable catalogMaterializedTable) {
297+
if (!catalogMaterializedTable.isPartitioned()) {
298+
return Optional.empty();
299+
}
300+
return Optional.of(extractPartitionKeys(catalogMaterializedTable.getPartitionKeys()));
301+
}
302+
303+
private static String extractPartitionKeys(List<String> partitionKeys) {
304+
return partitionKeys.stream()
305+
.map(EncodingUtils::escapeIdentifier)
306+
.collect(Collectors.joining(", "));
307+
}
308+
309+
static String extractFreshness(ResolvedCatalogMaterializedTable materializedTable) {
310+
final IntervalFreshness definitionFreshness = materializedTable.getDefinitionFreshness();
311+
return String.format(
312+
"FRESHNESS = INTERVAL '%s' %s",
313+
definitionFreshness.getInterval(), definitionFreshness.getTimeUnit());
314+
}
315+
316+
static String extractRefreshMode(ResolvedCatalogMaterializedTable materializedTable) {
317+
return String.format("REFRESH_MODE = %s", materializedTable.getRefreshMode());
270318
}
271319

272320
static Optional<String> extractFormattedOptions(Map<String, String> conf, String printIndent) {
@@ -297,4 +345,42 @@ static String extractFormattedColumnNames(
297345
EncodingUtils.escapeIdentifier(column.getName())))
298346
.collect(Collectors.joining(",\n"));
299347
}
348+
349+
private static void validateTableKind(
350+
ResolvedCatalogBaseTable<?> table,
351+
ObjectIdentifier tableIdentifier,
352+
TableKind expectedTableKind) {
353+
if (table.getTableKind() == expectedTableKind) {
354+
return;
355+
}
356+
357+
final String tableKindName = table.getTableKind().name().replace('_', ' ');
358+
final String commandToUse = showCreateCommandToUse(table.getTableKind());
359+
final String expectedTableKindName = expectedTableKind.name().replace('_', ' ');
360+
final String currentCommand = "SHOW CREATE " + expectedTableKindName;
361+
362+
throw new TableException(
363+
String.format(
364+
"%s is only supported for %ss, but %s is a %s. Please use %s instead.",
365+
currentCommand,
366+
expectedTableKindName.toLowerCase(Locale.ROOT),
367+
tableIdentifier.asSerializableString(),
368+
tableKindName.toLowerCase(Locale.ROOT),
369+
commandToUse));
370+
}
371+
372+
private static String showCreateCommandToUse(TableKind tableKind) {
373+
switch (tableKind) {
374+
case MATERIALIZED_TABLE:
375+
return "SHOW CREATE MATERIALIZED TABLE";
376+
case TABLE:
377+
return "SHOW CREATE TABLE";
378+
case VIEW:
379+
return "SHOW CREATE VIEW";
380+
default:
381+
throw new TableException(
382+
String.format(
383+
"SHOW CREATE is not implemented for %s yet.", tableKind.name()));
384+
}
385+
}
300386
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.table.operations;
20+
21+
import org.apache.flink.annotation.Internal;
22+
import org.apache.flink.table.api.ValidationException;
23+
import org.apache.flink.table.api.internal.ShowCreateUtil;
24+
import org.apache.flink.table.api.internal.TableResultInternal;
25+
import org.apache.flink.table.catalog.ContextResolvedTable;
26+
import org.apache.flink.table.catalog.ObjectIdentifier;
27+
28+
import static org.apache.flink.table.api.internal.TableResultUtils.buildStringArrayResult;
29+
30+
/** Operation to describe a SHOW CREATE MATERIALIZED TABLE statement. */
31+
@Internal
32+
public class ShowCreateMaterializedTableOperation implements ShowOperation {
33+
private final ObjectIdentifier tableIdentifier;
34+
35+
public ShowCreateMaterializedTableOperation(ObjectIdentifier sqlIdentifier) {
36+
this.tableIdentifier = sqlIdentifier;
37+
}
38+
39+
@Override
40+
public String asSummaryString() {
41+
return String.format(
42+
"SHOW CREATE MATERIALIZED TABLE %s", tableIdentifier.asSummaryString());
43+
}
44+
45+
@Override
46+
public TableResultInternal execute(Context ctx) {
47+
ContextResolvedTable table =
48+
ctx.getCatalogManager()
49+
.getTable(tableIdentifier)
50+
.orElseThrow(
51+
() ->
52+
new ValidationException(
53+
String.format(
54+
"Could not execute SHOW CREATE MATERIALIZED TABLE. Materialized table with identifier %s does not exist.",
55+
tableIdentifier.asSerializableString())));
56+
String resultRow =
57+
ShowCreateUtil.buildShowCreateMaterializedTableRow(
58+
table.getResolvedTable(), tableIdentifier, table.isTemporary());
59+
60+
return buildStringArrayResult("result", new String[] {resultRow});
61+
}
62+
}

0 commit comments

Comments
 (0)