From 4c510983a91627db8b5801ddd96d190287248830 Mon Sep 17 00:00:00 2001 From: Sergey Nuyanzin Date: Mon, 18 Aug 2025 17:52:00 +0200 Subject: [PATCH 1/2] [FLINK-37827][table] Add support for `SHOW CREATE MATERIZLIED TABLE` --- .../src/main/codegen/data/Parser.tdd | 1 + .../src/main/codegen/includes/parserImpls.ftl | 7 ++ .../dql/SqlShowCreateMaterializedTable.java | 64 ++++++++++ .../table/api/internal/ShowCreateUtil.java | 118 +++++++++++++++--- .../ShowCreateMaterializedTableOperation.java | 62 +++++++++ .../api/internal/ShowCreateUtilTest.java | 79 ++++++++++++ .../SqlNodeToOperationConversion.java | 16 +++ 7 files changed, 331 insertions(+), 16 deletions(-) create mode 100644 flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowCreateMaterializedTable.java create mode 100644 flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCreateMaterializedTableOperation.java diff --git a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd index 4108e5b1495d0..05619b382e7fe 100644 --- a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd +++ b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd @@ -133,6 +133,7 @@ "org.apache.flink.sql.parser.dql.SqlShowTables" "org.apache.flink.sql.parser.dql.SqlShowColumns" "org.apache.flink.sql.parser.dql.SqlShowCreate" + "org.apache.flink.sql.parser.dql.SqlShowCreateMaterializedTable" "org.apache.flink.sql.parser.dql.SqlShowCreateModel" "org.apache.flink.sql.parser.dql.SqlShowCreateTable" "org.apache.flink.sql.parser.dql.SqlShowCreateView" diff --git a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl index fb2b688b42cbd..45b21f51af61c 100644 --- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl +++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl @@ -806,6 +806,13 @@ SqlShowCreate SqlShowCreate() : { return new SqlShowCreateModel(pos, sqlIdentifier); } + | + + { pos = getPos(); } + sqlIdentifier = CompoundIdentifier() + { + return new SqlShowCreateMaterializedTable(pos, sqlIdentifier); + } ) } diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowCreateMaterializedTable.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowCreateMaterializedTable.java new file mode 100644 index 0000000000000..eb38d2f431b83 --- /dev/null +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowCreateMaterializedTable.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.sql.parser.dql; + +import org.apache.calcite.sql.SqlIdentifier; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlOperator; +import org.apache.calcite.sql.SqlSpecialOperator; +import org.apache.calcite.sql.SqlWriter; +import org.apache.calcite.sql.parser.SqlParserPos; + +import java.util.Collections; +import java.util.List; + +/** SHOW CREATE MATERIALIZED TABLE sql call. */ +public class SqlShowCreateMaterializedTable extends SqlShowCreate { + public static final SqlSpecialOperator OPERATOR = + new SqlSpecialOperator("SHOW CREATE MATERIALIZED TABLE", SqlKind.OTHER_DDL); + + public SqlShowCreateMaterializedTable(SqlParserPos pos, SqlIdentifier sqlIdentifier) { + super(pos, sqlIdentifier); + } + + public SqlIdentifier getMaterializedTableName() { + return sqlIdentifier; + } + + public String[] getFullMaterializedTableName() { + return sqlIdentifier.names.toArray(new String[0]); + } + + @Override + public SqlOperator getOperator() { + return OPERATOR; + } + + @Override + public List getOperandList() { + return Collections.singletonList(sqlIdentifier); + } + + @Override + public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { + writer.keyword(OPERATOR.getName()); + sqlIdentifier.unparse(writer, leftPrec, rightPrec); + } +} diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/ShowCreateUtil.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/ShowCreateUtil.java index 40f77169f7f5a..aa9b8f2e0e98d 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/ShowCreateUtil.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/ShowCreateUtil.java @@ -21,12 +21,15 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.table.api.TableException; import org.apache.flink.table.catalog.CatalogBaseTable; +import org.apache.flink.table.catalog.CatalogBaseTable.TableKind; import org.apache.flink.table.catalog.CatalogDescriptor; import org.apache.flink.table.catalog.CatalogView; import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.IntervalFreshness; import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.catalog.QueryOperationCatalogView; import org.apache.flink.table.catalog.ResolvedCatalogBaseTable; +import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable; import org.apache.flink.table.catalog.ResolvedCatalogModel; import org.apache.flink.table.catalog.ResolvedCatalogTable; import org.apache.flink.table.catalog.ResolvedSchema; @@ -38,6 +41,7 @@ import org.apache.commons.lang3.StringUtils; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Objects; import java.util.Optional; @@ -82,12 +86,7 @@ public static String buildShowCreateTableRow( ObjectIdentifier tableIdentifier, boolean isTemporary, SqlFactory sqlFactory) { - if (table.getTableKind() == CatalogBaseTable.TableKind.VIEW) { - throw new TableException( - String.format( - "SHOW CREATE TABLE is only supported for tables, but %s is a view. Please use SHOW CREATE VIEW instead.", - tableIdentifier.asSerializableString())); - } + validateTableKind(table, tableIdentifier, TableKind.TABLE); StringBuilder sb = new StringBuilder() .append(buildCreateFormattedPrefix("TABLE", isTemporary, tableIdentifier)); @@ -110,17 +109,40 @@ public static String buildShowCreateTableRow( return sb.toString(); } + /** Show create materialized table statement only for materialized tables. */ + public static String buildShowCreateMaterializedTableRow( + ResolvedCatalogMaterializedTable table, + ObjectIdentifier tableIdentifier, + boolean isTemporary) { + validateTableKind(table, tableIdentifier, TableKind.MATERIALIZED_TABLE); + StringBuilder sb = + new StringBuilder() + .append( + buildCreateFormattedPrefix( + "MATERIALIZED TABLE", isTemporary, tableIdentifier)); + sb.append(extractFormattedColumns(table, PRINT_INDENT)); + extractFormattedPrimaryKey(table, PRINT_INDENT) + .ifPresent(pk -> sb.append(",\n").append(pk)); + sb.append("\n)\n"); + extractComment(table).ifPresent(c -> sb.append(formatComment(c)).append("\n")); + extractFormattedPartitionedInfo(table) + .ifPresent(partitionedBy -> sb.append(formatPartitionedBy(partitionedBy))); + extractFormattedOptions(table.getOptions(), PRINT_INDENT) + .ifPresent(v -> sb.append("WITH (\n").append(v).append("\n)\n")); + sb.append(extractFreshness(table)) + .append("\n") + .append(extractRefreshMode(table)) + .append("\n"); + sb.append("AS ").append(table.getDefinitionQuery()).append('\n'); + return sb.toString(); + } + /** Show create view statement only for views. */ public static String buildShowCreateViewRow( ResolvedCatalogBaseTable view, ObjectIdentifier viewIdentifier, boolean isTemporary) { - if (view.getTableKind() != CatalogBaseTable.TableKind.VIEW) { - throw new TableException( - String.format( - "SHOW CREATE VIEW is only supported for views, but %s is a table. Please use SHOW CREATE TABLE instead.", - viewIdentifier.asSerializableString())); - } + validateTableKind(view, viewIdentifier, TableKind.VIEW); final CatalogBaseTable origin = view.getOrigin(); if (origin instanceof QueryOperationCatalogView && !((QueryOperationCatalogView) origin).supportsShowCreateView()) { @@ -243,6 +265,10 @@ private static String formatComment(String comment) { return String.format("COMMENT '%s'", EncodingUtils.escapeSingleQuotes(comment)); } + private static String formatPartitionedBy(String partitionedByColumns) { + return String.format("PARTITION BY (%s)\n", partitionedByColumns); + } + static Optional extractComment(ResolvedCatalogBaseTable table) { return StringUtils.isEmpty(table.getComment()) ? Optional.empty() @@ -263,10 +289,32 @@ static Optional extractFormattedPartitionedInfo(ResolvedCatalogTable cat if (!catalogTable.isPartitioned()) { return Optional.empty(); } - return Optional.of( - catalogTable.getPartitionKeys().stream() - .map(EncodingUtils::escapeIdentifier) - .collect(Collectors.joining(", "))); + return Optional.of(extractPartitionKeys(catalogTable.getPartitionKeys())); + } + + static Optional extractFormattedPartitionedInfo( + ResolvedCatalogMaterializedTable catalogMaterializedTable) { + if (!catalogMaterializedTable.isPartitioned()) { + return Optional.empty(); + } + return Optional.of(extractPartitionKeys(catalogMaterializedTable.getPartitionKeys())); + } + + private static String extractPartitionKeys(List partitionKeys) { + return partitionKeys.stream() + .map(EncodingUtils::escapeIdentifier) + .collect(Collectors.joining(", ")); + } + + static String extractFreshness(ResolvedCatalogMaterializedTable materializedTable) { + final IntervalFreshness definitionFreshness = materializedTable.getDefinitionFreshness(); + return String.format( + "FRESHNESS = INTERVAL '%s' %s", + definitionFreshness.getInterval(), definitionFreshness.getTimeUnit()); + } + + static String extractRefreshMode(ResolvedCatalogMaterializedTable materializedTable) { + return String.format("REFRESH_MODE = %s", materializedTable.getRefreshMode()); } static Optional extractFormattedOptions(Map conf, String printIndent) { @@ -297,4 +345,42 @@ static String extractFormattedColumnNames( EncodingUtils.escapeIdentifier(column.getName()))) .collect(Collectors.joining(",\n")); } + + private static void validateTableKind( + ResolvedCatalogBaseTable table, + ObjectIdentifier tableIdentifier, + TableKind expectedTableKind) { + if (table.getTableKind() == expectedTableKind) { + return; + } + + final String tableKindName = table.getTableKind().name().replace('_', ' '); + final String commandToUse = showCreateCommandToUse(table.getTableKind()); + final String expectedTableKindName = expectedTableKind.name().replace('_', ' '); + final String currentCommand = "SHOW CREATE " + expectedTableKindName; + + throw new TableException( + String.format( + "%s is only supported for %ss, but %s is a %s. Please use %s instead.", + currentCommand, + expectedTableKindName.toLowerCase(Locale.ROOT), + tableIdentifier.asSerializableString(), + tableKindName.toLowerCase(Locale.ROOT), + commandToUse)); + } + + private static String showCreateCommandToUse(TableKind tableKind) { + switch (tableKind) { + case MATERIALIZED_TABLE: + return "SHOW CREATE MATERIALIZED TABLE"; + case TABLE: + return "SHOW CREATE TABLE"; + case VIEW: + return "SHOW CREATE VIEW"; + default: + throw new TableException( + String.format( + "SHOW CREATE is not implemented for %s yet.", tableKind.name())); + } + } } diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCreateMaterializedTableOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCreateMaterializedTableOperation.java new file mode 100644 index 0000000000000..e410aa25f6c15 --- /dev/null +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowCreateMaterializedTableOperation.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.operations; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.api.internal.ShowCreateUtil; +import org.apache.flink.table.api.internal.TableResultInternal; +import org.apache.flink.table.catalog.ContextResolvedTable; +import org.apache.flink.table.catalog.ObjectIdentifier; + +import static org.apache.flink.table.api.internal.TableResultUtils.buildStringArrayResult; + +/** Operation to describe a SHOW CREATE MATERIALIZED TABLE statement. */ +@Internal +public class ShowCreateMaterializedTableOperation implements ShowOperation { + private final ObjectIdentifier tableIdentifier; + + public ShowCreateMaterializedTableOperation(ObjectIdentifier sqlIdentifier) { + this.tableIdentifier = sqlIdentifier; + } + + @Override + public String asSummaryString() { + return String.format( + "SHOW CREATE MATERIALIZED TABLE %s", tableIdentifier.asSummaryString()); + } + + @Override + public TableResultInternal execute(Context ctx) { + ContextResolvedTable table = + ctx.getCatalogManager() + .getTable(tableIdentifier) + .orElseThrow( + () -> + new ValidationException( + String.format( + "Could not execute SHOW CREATE MATERIALIZED TABLE. Materialized table with identifier %s does not exist.", + tableIdentifier.asSerializableString()))); + String resultRow = + ShowCreateUtil.buildShowCreateMaterializedTableRow( + table.getResolvedTable(), tableIdentifier, table.isTemporary()); + + return buildStringArrayResult("result", new String[] {resultRow}); + } +} diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/internal/ShowCreateUtilTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/internal/ShowCreateUtilTest.java index 768b49d49f5e4..6c8a6ec071b9d 100644 --- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/internal/ShowCreateUtilTest.java +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/internal/ShowCreateUtilTest.java @@ -22,10 +22,16 @@ import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.Schema; import org.apache.flink.table.catalog.CatalogDescriptor; +import org.apache.flink.table.catalog.CatalogMaterializedTable; +import org.apache.flink.table.catalog.CatalogMaterializedTable.LogicalRefreshMode; +import org.apache.flink.table.catalog.CatalogMaterializedTable.RefreshMode; +import org.apache.flink.table.catalog.CatalogMaterializedTable.RefreshStatus; import org.apache.flink.table.catalog.CatalogTable; import org.apache.flink.table.catalog.CatalogView; import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.IntervalFreshness; import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable; import org.apache.flink.table.catalog.ResolvedCatalogTable; import org.apache.flink.table.catalog.ResolvedCatalogView; import org.apache.flink.table.catalog.ResolvedSchema; @@ -52,6 +58,8 @@ public class ShowCreateUtilTest { ObjectIdentifier.of("catalogName", "dbName", "tableName"); private static final ObjectIdentifier VIEW_IDENTIFIER = ObjectIdentifier.of("catalogName", "dbName", "viewName"); + private static final ObjectIdentifier MATERIALIZED_TABLE_IDENTIFIER = + ObjectIdentifier.of("catalogName", "dbName", "materializedTableName"); private static final ResolvedSchema ONE_COLUMN_SCHEMA = ResolvedSchema.of(Column.physical("id", DataTypes.INT())); @@ -78,6 +86,16 @@ void showCreateView(ResolvedCatalogView resolvedCatalogView, String expected) { assertThat(createViewString).isEqualTo(expected); } + @ParameterizedTest(name = "{index}: {1}") + @MethodSource("argsForShowCreateMaterializedTable") + void showCreateMaterializedTable( + ResolvedCatalogMaterializedTable materializedTable, String expected) { + final String createMaterializedTableString = + ShowCreateUtil.buildShowCreateMaterializedTableRow( + materializedTable, MATERIALIZED_TABLE_IDENTIFIER, false); + assertThat(createMaterializedTableString).isEqualTo(expected); + } + @ParameterizedTest(name = "{index}: {1}") @MethodSource("argsForShowCreateCatalog") void showCreateCatalog(CatalogDescriptor catalogDescriptor, String expected) { @@ -241,6 +259,46 @@ private static Collection argsForShowCreateTable() { return argList; } + private static Collection argsForShowCreateMaterializedTable() { + Collection argList = new ArrayList<>(); + argList.add( + Arguments.of( + createResolvedMaterialized( + ONE_COLUMN_SCHEMA, + null, + List.of(), + IntervalFreshness.ofMinute("1"), + RefreshMode.CONTINUOUS, + "SELECT 1"), + "CREATE MATERIALIZED TABLE `catalogName`.`dbName`.`materializedTableName` (\n" + + " `id` INT\n" + + ")\n" + + "FRESHNESS = INTERVAL '1' MINUTE\n" + + "REFRESH_MODE = CONTINUOUS\n" + + "AS SELECT 1\n")); + + argList.add( + Arguments.of( + createResolvedMaterialized( + TWO_COLUMNS_SCHEMA, + "Materialized table comment", + List.of("id"), + IntervalFreshness.ofMinute("3"), + RefreshMode.FULL, + "SELECT id, name FROM tbl_a"), + "CREATE MATERIALIZED TABLE `catalogName`.`dbName`.`materializedTableName` (\n" + + " `id` INT,\n" + + " `name` VARCHAR(2147483647)\n" + + ")\n" + + "COMMENT 'Materialized table comment'\n" + + "PARTITION BY (`id`)\n" + + "FRESHNESS = INTERVAL '3' MINUTE\n" + + "REFRESH_MODE = FULL\n" + + "AS SELECT id, name FROM tbl_a\n")); + + return argList; + } + private static ResolvedCatalogTable createResolvedTable( ResolvedSchema resolvedSchema, Map options, @@ -273,4 +331,25 @@ private static ResolvedCatalogView createResolvedView( Collections.emptyMap()), resolvedSchema); } + + private static ResolvedCatalogMaterializedTable createResolvedMaterialized( + ResolvedSchema resolvedSchema, + String comment, + List partitionBy, + IntervalFreshness freshness, + RefreshMode refreshMode, + String definitionQuery) { + return new ResolvedCatalogMaterializedTable( + CatalogMaterializedTable.newBuilder() + .comment(comment) + .partitionKeys(partitionBy) + .schema(Schema.newBuilder().fromResolvedSchema(resolvedSchema).build()) + .freshness(freshness) + .refreshMode(refreshMode) + .definitionQuery(definitionQuery) + .logicalRefreshMode(LogicalRefreshMode.AUTOMATIC) + .refreshStatus(RefreshStatus.ACTIVATED) + .build(), + resolvedSchema); + } } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversion.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversion.java index 269af4f60d025..f4cab24a48341 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversion.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversion.java @@ -65,6 +65,7 @@ import org.apache.flink.sql.parser.dql.SqlRichDescribeTable; import org.apache.flink.sql.parser.dql.SqlRichExplain; import org.apache.flink.sql.parser.dql.SqlShowColumns; +import org.apache.flink.sql.parser.dql.SqlShowCreateMaterializedTable; import org.apache.flink.sql.parser.dql.SqlShowCreateTable; import org.apache.flink.sql.parser.dql.SqlShowCreateView; import org.apache.flink.sql.parser.dql.SqlShowCurrentCatalog; @@ -122,6 +123,7 @@ import org.apache.flink.table.operations.NopOperation; import org.apache.flink.table.operations.Operation; import org.apache.flink.table.operations.ShowColumnsOperation; +import org.apache.flink.table.operations.ShowCreateMaterializedTableOperation; import org.apache.flink.table.operations.ShowCreateTableOperation; import org.apache.flink.table.operations.ShowCreateViewOperation; import org.apache.flink.table.operations.ShowCurrentCatalogOperation; @@ -320,6 +322,10 @@ private static Optional convertValidatedSqlNode( return Optional.of(converter.convertAlterFunction((SqlAlterFunction) validated)); } else if (validated instanceof SqlShowCreateTable) { return Optional.of(converter.convertShowCreateTable((SqlShowCreateTable) validated)); + } else if (validated instanceof SqlShowCreateMaterializedTable) { + return Optional.of( + converter.convertShowCreateMaterializedTable( + (SqlShowCreateMaterializedTable) validated)); } else if (validated instanceof SqlShowCreateView) { return Optional.of(converter.convertShowCreateView((SqlShowCreateView) validated)); } else if (validated instanceof SqlRichExplain) { @@ -884,6 +890,16 @@ private Operation convertShowCreateTable(SqlShowCreateTable sqlShowCreateTable) return new ShowCreateTableOperation(identifier); } + /** Convert SHOW CREATE MATERIALIZED TABLE statement. */ + private Operation convertShowCreateMaterializedTable( + SqlShowCreateMaterializedTable sqlShowCreateMaterializedTable) { + UnresolvedIdentifier unresolvedIdentifier = + UnresolvedIdentifier.of( + sqlShowCreateMaterializedTable.getFullMaterializedTableName()); + ObjectIdentifier identifier = catalogManager.qualifyIdentifier(unresolvedIdentifier); + return new ShowCreateMaterializedTableOperation(identifier); + } + /** Convert SHOW CREATE VIEW statement. */ private Operation convertShowCreateView(SqlShowCreateView sqlShowCreateView) { UnresolvedIdentifier unresolvedIdentifier = From 24343278850e21dbcbffc6369e8b177536098d61 Mon Sep 17 00:00:00 2001 From: Sergey Nuyanzin Date: Wed, 27 Aug 2025 09:07:35 +0200 Subject: [PATCH 2/2] Add temporary to tests --- .../api/internal/ShowCreateUtilTest.java | 325 ++++++++++-------- 1 file changed, 176 insertions(+), 149 deletions(-) diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/internal/ShowCreateUtilTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/internal/ShowCreateUtilTest.java index 6c8a6ec071b9d..c8c92a8208e89 100644 --- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/internal/ShowCreateUtilTest.java +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/internal/ShowCreateUtilTest.java @@ -21,6 +21,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.Schema; +import org.apache.flink.table.catalog.CatalogBaseTable; import org.apache.flink.table.catalog.CatalogDescriptor; import org.apache.flink.table.catalog.CatalogMaterializedTable; import org.apache.flink.table.catalog.CatalogMaterializedTable.LogicalRefreshMode; @@ -53,7 +54,7 @@ import static org.assertj.core.api.Assertions.assertThat; /** Test {@link ShowCreateUtil}. */ -public class ShowCreateUtilTest { +class ShowCreateUtilTest { private static final ObjectIdentifier TABLE_IDENTIFIER = ObjectIdentifier.of("catalogName", "dbName", "tableName"); private static final ObjectIdentifier VIEW_IDENTIFIER = @@ -69,30 +70,38 @@ public class ShowCreateUtilTest { Column.physical("id", DataTypes.INT()), Column.physical("name", DataTypes.STRING())); - @ParameterizedTest(name = "{index}: {1}") + @ParameterizedTest(name = "{index}: {2}") @MethodSource("argsForShowCreateTable") - void showCreateTable(ResolvedCatalogTable resolvedCatalogTable, String expected) { + void showCreateTable( + ResolvedCatalogTable resolvedCatalogTable, boolean isTemporary, String expected) { final String createTableString = ShowCreateUtil.buildShowCreateTableRow( - resolvedCatalogTable, TABLE_IDENTIFIER, false, DefaultSqlFactory.INSTANCE); + resolvedCatalogTable, + TABLE_IDENTIFIER, + isTemporary, + DefaultSqlFactory.INSTANCE); assertThat(createTableString).isEqualTo(expected); } - @ParameterizedTest(name = "{index}: {1}") + @ParameterizedTest(name = "{index}: {2}") @MethodSource("argsForShowCreateView") - void showCreateView(ResolvedCatalogView resolvedCatalogView, String expected) { + void showCreateView( + ResolvedCatalogView resolvedCatalogView, boolean isTemporary, String expected) { final String createViewString = - ShowCreateUtil.buildShowCreateViewRow(resolvedCatalogView, VIEW_IDENTIFIER, false); + ShowCreateUtil.buildShowCreateViewRow( + resolvedCatalogView, VIEW_IDENTIFIER, isTemporary); assertThat(createViewString).isEqualTo(expected); } - @ParameterizedTest(name = "{index}: {1}") + @ParameterizedTest(name = "{index}: {2}") @MethodSource("argsForShowCreateMaterializedTable") void showCreateMaterializedTable( - ResolvedCatalogMaterializedTable materializedTable, String expected) { + ResolvedCatalogMaterializedTable materializedTable, + boolean isTemporary, + String expected) { final String createMaterializedTableString = ShowCreateUtil.buildShowCreateMaterializedTableRow( - materializedTable, MATERIALIZED_TABLE_IDENTIFIER, false); + materializedTable, MATERIALIZED_TABLE_IDENTIFIER, isTemporary); assertThat(createMaterializedTableString).isEqualTo(expected); } @@ -137,168 +146,186 @@ private static Collection argsForShowCreateCatalog() { } private static Collection argsForShowCreateView() { - Collection argList = new ArrayList<>(); - argList.add( - Arguments.of( - createResolvedView(ONE_COLUMN_SCHEMA, "SELECT 1", "SELECT 1", null), - "CREATE VIEW `catalogName`.`dbName`.`viewName` (\n" - + " `id`\n" - + ")\n" - + "AS SELECT 1\n")); + final Collection argList = new ArrayList<>(); + addTemporaryAndPermanent( + argList, + createResolvedView(ONE_COLUMN_SCHEMA, "SELECT 1", "SELECT 1", null), + "CREATE %sVIEW `catalogName`.`dbName`.`viewName` (\n" + + " `id`\n" + + ")\n" + + "AS SELECT 1\n"); - argList.add( - Arguments.of( - createResolvedView( - TWO_COLUMNS_SCHEMA, - "SELECT id, name FROM tbl_a", - "SELECT id, name FROM `catalogName`.`dbName`.`tbl_a`", - "View comment"), - "CREATE VIEW `catalogName`.`dbName`.`viewName` (\n" - + " `id`,\n" - + " `name`\n" - + ")\n" - + "COMMENT 'View comment'\n" - + "AS SELECT id, name FROM `catalogName`.`dbName`.`tbl_a`\n")); + addTemporaryAndPermanent( + argList, + createResolvedView( + TWO_COLUMNS_SCHEMA, + "SELECT id, name FROM tbl_a", + "SELECT id, name FROM `catalogName`.`dbName`.`tbl_a`", + "View comment"), + "CREATE %sVIEW `catalogName`.`dbName`.`viewName` (\n" + + " `id`,\n" + + " `name`\n" + + ")\n" + + "COMMENT 'View comment'\n" + + "AS SELECT id, name FROM `catalogName`.`dbName`.`tbl_a`\n"); return argList; } private static Collection argsForShowCreateTable() { - Collection argList = new ArrayList<>(); - argList.add( - Arguments.of( - createResolvedTable( - ONE_COLUMN_SCHEMA, - Collections.emptyMap(), - Collections.emptyList(), - TableDistribution.of( - TableDistribution.Kind.HASH, - 2, - Arrays.asList("key1", "key2")), - null), - "CREATE TABLE `catalogName`.`dbName`.`tableName` (\n" - + " `id` INT\n" - + ")\n" - + "DISTRIBUTED BY HASH(`key1`, `key2`) INTO 2 BUCKETS\n")); + final Collection argList = new ArrayList<>(); + addTemporaryAndPermanent( + argList, + createResolvedTable( + ONE_COLUMN_SCHEMA, + Collections.emptyMap(), + Collections.emptyList(), + TableDistribution.of( + TableDistribution.Kind.HASH, 2, Arrays.asList("key1", "key2")), + null), + "CREATE %sTABLE `catalogName`.`dbName`.`tableName` (\n" + + " `id` INT\n" + + ")\n" + + "DISTRIBUTED BY HASH(`key1`, `key2`) INTO 2 BUCKETS\n"); - argList.add( - Arguments.of( - createResolvedTable( - ONE_COLUMN_SCHEMA, - Collections.emptyMap(), - Collections.emptyList(), - TableDistribution.of( - TableDistribution.Kind.RANGE, 2, Arrays.asList("1", "10")), - "Table comment"), - "CREATE TABLE `catalogName`.`dbName`.`tableName` (\n" - + " `id` INT\n" - + ")\n" - + "COMMENT 'Table comment'\n" - + "DISTRIBUTED BY RANGE(`1`, `10`) INTO 2 BUCKETS\n")); + addTemporaryAndPermanent( + argList, + createResolvedTable( + ONE_COLUMN_SCHEMA, + Collections.emptyMap(), + Collections.emptyList(), + TableDistribution.of( + TableDistribution.Kind.RANGE, 2, Arrays.asList("1", "10")), + "Table comment"), + "CREATE %sTABLE `catalogName`.`dbName`.`tableName` (\n" + + " `id` INT\n" + + ")\n" + + "COMMENT 'Table comment'\n" + + "DISTRIBUTED BY RANGE(`1`, `10`) INTO 2 BUCKETS\n"); final Map options = new HashMap<>(); options.put("option_key_a", "option_value_a"); options.put("option_key_b", "option_value_b"); options.put("option_key_c", "option_value_c"); - argList.add( - Arguments.of( - createResolvedTable( - TWO_COLUMNS_SCHEMA, - options, - Collections.emptyList(), - null, - "Another table comment"), - "CREATE TABLE `catalogName`.`dbName`.`tableName` (\n" - + " `id` INT,\n" - + " `name` VARCHAR(2147483647)\n" - + ")\n" - + "COMMENT 'Another table comment'\n" - + "WITH (\n" - + " 'option_key_a' = 'option_value_a',\n" - + " 'option_key_b' = 'option_value_b',\n" - + " 'option_key_c' = 'option_value_c'\n" - + ")\n")); + addTemporaryAndPermanent( + argList, + createResolvedTable( + TWO_COLUMNS_SCHEMA, + options, + Collections.emptyList(), + null, + "Another table comment"), + "CREATE %sTABLE `catalogName`.`dbName`.`tableName` (\n" + + " `id` INT,\n" + + " `name` VARCHAR(2147483647)\n" + + ")\n" + + "COMMENT 'Another table comment'\n" + + "WITH (\n" + + " 'option_key_a' = 'option_value_a',\n" + + " 'option_key_b' = 'option_value_b',\n" + + " 'option_key_c' = 'option_value_c'\n" + + ")\n"); - argList.add( - Arguments.of( - createResolvedTable( - ONE_COLUMN_SCHEMA, - Collections.emptyMap(), - Arrays.asList("key1", "key2"), - null, - "comment"), - "CREATE TABLE `catalogName`.`dbName`.`tableName` (\n" - + " `id` INT\n" - + ")\n" - + "COMMENT 'comment'\n" - + "PARTITIONED BY (`key1`, `key2`)\n")); + addTemporaryAndPermanent( + argList, + createResolvedTable( + ONE_COLUMN_SCHEMA, + Collections.emptyMap(), + Arrays.asList("key1", "key2"), + null, + "comment"), + "CREATE %sTABLE `catalogName`.`dbName`.`tableName` (\n" + + " `id` INT\n" + + ")\n" + + "COMMENT 'comment'\n" + + "PARTITIONED BY (`key1`, `key2`)\n"); - argList.add( - Arguments.of( - createResolvedTable( - TWO_COLUMNS_SCHEMA, - options, - Arrays.asList("key1", "key2"), - TableDistribution.of( - TableDistribution.Kind.UNKNOWN, - 3, - Arrays.asList("1", "2", "3")), - "table comment"), - "CREATE TABLE `catalogName`.`dbName`.`tableName` (\n" - + " `id` INT,\n" - + " `name` VARCHAR(2147483647)\n" - + ")\n" - + "COMMENT 'table comment'\n" - + "DISTRIBUTED BY (`1`, `2`, `3`) INTO 3 BUCKETS\n" - + "PARTITIONED BY (`key1`, `key2`)\n" - + "WITH (\n" - + " 'option_key_a' = 'option_value_a',\n" - + " 'option_key_b' = 'option_value_b',\n" - + " 'option_key_c' = 'option_value_c'\n" - + ")\n")); + addTemporaryAndPermanent( + argList, + createResolvedTable( + TWO_COLUMNS_SCHEMA, + options, + Arrays.asList("key1", "key2"), + TableDistribution.of( + TableDistribution.Kind.UNKNOWN, 3, Arrays.asList("1", "2", "3")), + "table comment"), + "CREATE %sTABLE `catalogName`.`dbName`.`tableName` (\n" + + " `id` INT,\n" + + " `name` VARCHAR(2147483647)\n" + + ")\n" + + "COMMENT 'table comment'\n" + + "DISTRIBUTED BY (`1`, `2`, `3`) INTO 3 BUCKETS\n" + + "PARTITIONED BY (`key1`, `key2`)\n" + + "WITH (\n" + + " 'option_key_a' = 'option_value_a',\n" + + " 'option_key_b' = 'option_value_b',\n" + + " 'option_key_c' = 'option_value_c'\n" + + ")\n"); return argList; } private static Collection argsForShowCreateMaterializedTable() { - Collection argList = new ArrayList<>(); - argList.add( - Arguments.of( - createResolvedMaterialized( - ONE_COLUMN_SCHEMA, - null, - List.of(), - IntervalFreshness.ofMinute("1"), - RefreshMode.CONTINUOUS, - "SELECT 1"), - "CREATE MATERIALIZED TABLE `catalogName`.`dbName`.`materializedTableName` (\n" - + " `id` INT\n" - + ")\n" - + "FRESHNESS = INTERVAL '1' MINUTE\n" - + "REFRESH_MODE = CONTINUOUS\n" - + "AS SELECT 1\n")); + final Collection argList = new ArrayList<>(); + addTemporaryAndPermanent( + argList, + createResolvedMaterialized( + ONE_COLUMN_SCHEMA, + null, + List.of(), + IntervalFreshness.ofMinute("1"), + RefreshMode.CONTINUOUS, + "SELECT 1"), + "CREATE %sMATERIALIZED TABLE `catalogName`.`dbName`.`materializedTableName` (\n" + + " `id` INT\n" + + ")\n" + + "FRESHNESS = INTERVAL '1' MINUTE\n" + + "REFRESH_MODE = CONTINUOUS\n" + + "AS SELECT 1\n"); - argList.add( - Arguments.of( - createResolvedMaterialized( - TWO_COLUMNS_SCHEMA, - "Materialized table comment", - List.of("id"), - IntervalFreshness.ofMinute("3"), - RefreshMode.FULL, - "SELECT id, name FROM tbl_a"), - "CREATE MATERIALIZED TABLE `catalogName`.`dbName`.`materializedTableName` (\n" - + " `id` INT,\n" - + " `name` VARCHAR(2147483647)\n" - + ")\n" - + "COMMENT 'Materialized table comment'\n" - + "PARTITION BY (`id`)\n" - + "FRESHNESS = INTERVAL '3' MINUTE\n" - + "REFRESH_MODE = FULL\n" - + "AS SELECT id, name FROM tbl_a\n")); + addTemporaryAndPermanent( + argList, + createResolvedMaterialized( + ONE_COLUMN_SCHEMA, + null, + List.of(), + IntervalFreshness.ofMinute("1"), + RefreshMode.CONTINUOUS, + "SELECT 1"), + "CREATE %sMATERIALIZED TABLE `catalogName`.`dbName`.`materializedTableName` (\n" + + " `id` INT\n" + + ")\n" + + "FRESHNESS = INTERVAL '1' MINUTE\n" + + "REFRESH_MODE = CONTINUOUS\n" + + "AS SELECT 1\n"); + + addTemporaryAndPermanent( + argList, + createResolvedMaterialized( + TWO_COLUMNS_SCHEMA, + "Materialized table comment", + List.of("id"), + IntervalFreshness.ofMinute("3"), + RefreshMode.FULL, + "SELECT id, name FROM tbl_a"), + "CREATE %sMATERIALIZED TABLE `catalogName`.`dbName`.`materializedTableName` (\n" + + " `id` INT,\n" + + " `name` VARCHAR(2147483647)\n" + + ")\n" + + "COMMENT 'Materialized table comment'\n" + + "PARTITION BY (`id`)\n" + + "FRESHNESS = INTERVAL '3' MINUTE\n" + + "REFRESH_MODE = FULL\n" + + "AS SELECT id, name FROM tbl_a\n"); return argList; } + private static void addTemporaryAndPermanent( + Collection argList, CatalogBaseTable catalogBaseTable, String sql) { + argList.add(Arguments.of(catalogBaseTable, false, String.format(sql, ""))); + argList.add(Arguments.of(catalogBaseTable, true, String.format(sql, "TEMPORARY "))); + } + private static ResolvedCatalogTable createResolvedTable( ResolvedSchema resolvedSchema, Map options,