Skip to content

Commit

Permalink
[test](mtmv) Fix sync mv not partition in rewrite test and some other…
Browse files Browse the repository at this point in the history
… test problems (apache#46546)

### What problem does this PR solve?

Fix sync mv not partition in rewrite because thought mv is already built
wrongly

In the `createMV()` method of `Suite.groovy`, it checks whether the most
recent materialized view of the current database's last table has been
built successfully. If a database has two tables, the synchronization
materialized view of the second table may not be completed, which could
incorrectly lead to the assumption that the build is complete.

To address this issue, modify the test case to ensure that there is at
most one table per database.
  • Loading branch information
seawinde authored Jan 15, 2025
1 parent 9c3b634 commit 4bd106b
Show file tree
Hide file tree
Showing 19 changed files with 158 additions and 106 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -501,8 +501,8 @@ private Statistics computeOlapScan(OlapScan olapScan) {
// mv is selected, return its estimated stats
Optional<Statistics> optStats = cascadesContext.getStatementContext()
.getStatistics(((Relation) olapScan).getRelationId());
LOG.info("computeOlapScan optStats isPresent {}, tableRowCount is {}",
optStats.isPresent(), tableRowCount);
LOG.info("computeOlapScan optStats isPresent {}, tableRowCount is {}, table name is {}",
optStats.isPresent(), tableRowCount, olapTable.getQualifiedName());
if (optStats.isPresent()) {
double selectedPartitionsRowCount = getSelectedPartitionRowCount(olapScan, tableRowCount);
LOG.info("computeOlapScan optStats is {}, selectedPartitionsRowCount is {}", optStats.get(),
Expand Down
14 changes: 11 additions & 3 deletions regression-test/data/nereids_syntax_p0/mv/newMv/multi_slot4.out
Original file line number Diff line number Diff line change
@@ -1,14 +1,22 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !select_star --
-4 -4 -4 d
-4 -4 -4 d
-4 -4 -4 d
1 1 1 a
1 1 1 a
1 1 1 a
2 2 2 b
2 2 2 b
2 2 2 b
3 -3 \N c
3 -3 \N c
3 -3 \N c
3 -3 \N c

-- !select_mv --
-3 1
2 7
3 9
-3 3
2 21
3 27
4 \N

Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !cmd --
table_for_mv_test mv_show_create_materialized_view CREATE MATERIALIZED VIEW mv_show_create_materialized_view AS\n SELECT id, name, SUM(value) AS total_value\n FROM table_for_mv_test\n GROUP BY id, name;\n
table_for_mv_test mv_show_create_materialized_view \n CREATE MATERIALIZED VIEW mv_show_create_materialized_view \n AS \n SELECT id, name, SUM(value) AS total_value\n FROM table_for_mv_test\n GROUP BY id, name\n

Original file line number Diff line number Diff line change
Expand Up @@ -742,10 +742,16 @@ class Suite implements GroovyInterceptable {
return result
}

// Should use create_sync_mv, this method only check the sync mv in current db
// If has multi sync mv in db, may make mistake
@Deprecated
void createMV(String sql) {
(new CreateMVAction(context, sql)).run()
}

// Should use create_sync_mv, this method only check the sync mv in current db
// If has multi sync mv in db, may make mistake
@Deprecated
void createMV(String sql, String expection) {
(new CreateMVAction(context, sql, expection)).run()
}
Expand Down Expand Up @@ -1475,80 +1481,101 @@ class Suite implements GroovyInterceptable {
return debugPoint
}

void waitingMTMVTaskFinishedByMvName(String mvName) {
def waitingMTMVTaskFinishedByMvName = { mvName, dbName = context.dbName ->
Thread.sleep(2000);
String showTasks = "select TaskId,JobId,JobName,MvId,Status,MvName,MvDatabaseName,ErrorMsg from tasks('type'='mv') where MvName = '${mvName}' order by CreateTime ASC"
String showTasks = "select TaskId,JobId,JobName,MvId,Status,MvName,MvDatabaseName,ErrorMsg from tasks('type'='mv') where MvDatabaseName = '${dbName}' and MvName = '${mvName}' order by CreateTime DESC LIMIT 1"
String status = "NULL"
List<List<Object>> result
long startTime = System.currentTimeMillis()
long timeoutTimestamp = startTime + 5 * 60 * 1000 // 5 min
do {
List<String> toCheckTaskRow = new ArrayList<>();
while (timeoutTimestamp > System.currentTimeMillis() && (status != "SUCCESS")) {
result = sql(showTasks)
logger.info("result: " + result.toString())
if (!result.isEmpty()) {
status = result.last().get(4)
}
logger.info("current db is " + dbName + ", showTasks is " + showTasks)
if (result.isEmpty()) {
logger.info("waitingMTMVTaskFinishedByMvName toCheckTaskRow is empty")
Thread.sleep(1000);
continue;
}
toCheckTaskRow = result.get(0);
status = toCheckTaskRow.get(4)
logger.info("The state of ${showTasks} is ${status}")
Thread.sleep(1000);
} while (timeoutTimestamp > System.currentTimeMillis() && (status == 'PENDING' || status == 'RUNNING' || status == 'NULL'))
}
if (status != "SUCCESS") {
logger.info("status is not success")
}
Assert.assertEquals("SUCCESS", status)
def show_tables = sql """
show tables from ${result.last().get(6)};
show tables from ${toCheckTaskRow.get(6)};
"""
def db_id = getDbId(result.last().get(6))
def table_id = getTableId(result.last().get(6), mvName)
def db_id = getDbId(toCheckTaskRow.get(6))
def table_id = getTableId(toCheckTaskRow.get(6), mvName)
logger.info("waitingMTMVTaskFinished analyze mv name is " + mvName
+ ", db name is " + result.last().get(6)
+ ", db name is " + toCheckTaskRow.get(6)
+ ", show_tables are " + show_tables
+ ", db_id is " + db_id
+ ", table_id " + table_id)
sql "analyze table ${result.last().get(6)}.${mvName} with sync;"
sql "analyze table ${toCheckTaskRow.get(6)}.${mvName} with sync;"
}

void waitingMTMVTaskFinishedByMvNameAllowCancel(String mvName) {
def waitingMTMVTaskFinishedByMvNameAllowCancel = {mvName, dbName = context.dbName ->
Thread.sleep(2000);
String showTasks = "select TaskId,JobId,JobName,MvId,Status,MvName,MvDatabaseName,ErrorMsg from tasks('type'='mv') where MvName = '${mvName}' order by CreateTime ASC"
String showTasks = "select TaskId,JobId,JobName,MvId,Status,MvName,MvDatabaseName,ErrorMsg from tasks('type'='mv') where MvDatabaseName = '${dbName}' and MvName = '${mvName}' order by CreateTime DESC LIMIT 1"

String status = "NULL"
List<List<Object>> result
long startTime = System.currentTimeMillis()
long timeoutTimestamp = startTime + 5 * 60 * 1000 // 5 min
do {
List<String> toCheckTaskRow = new ArrayList<>();
while (timeoutTimestamp > System.currentTimeMillis() && (status != "SUCCESS")) {
result = sql(showTasks)
logger.info("result: " + result.toString())
if (!result.isEmpty()) {
status = result.last().get(4)
}
logger.info("current db is " + dbName + ", showTasks result: " + result.toString())
if (result.isEmpty()) {
logger.info("waitingMTMVTaskFinishedByMvName toCheckTaskRow is empty")
Thread.sleep(1000);
continue;
}
toCheckTaskRow = result.get(0)
status = toCheckTaskRow.get(4)
logger.info("The state of ${showTasks} is ${status}")
Thread.sleep(1000);
} while (timeoutTimestamp > System.currentTimeMillis() && (status == 'PENDING' || status == 'RUNNING' || status == 'NULL' || status == 'CANCELED'))
}
if (status != "SUCCESS") {
logger.info("status is not success")
assertTrue(result.toString().contains("same table"))
}
// Need to analyze materialized view for cbo to choose the materialized view accurately
logger.info("waitingMTMVTaskFinished analyze mv name is " + result.last().get(5))
sql "analyze table ${result.last().get(6)}.${mvName} with sync;"
logger.info("waitingMTMVTaskFinished analyze mv name is " + toCheckTaskRow.get(5))
sql "analyze table ${toCheckTaskRow.get(6)}.${mvName} with sync;"
}

void waitingMVTaskFinishedByMvName(String dbName, String tableName) {
void waitingMVTaskFinishedByMvName(String dbName, String tableName, String indexName) {
Thread.sleep(2000)
String showTasks = "SHOW ALTER TABLE MATERIALIZED VIEW from ${dbName} where TableName='${tableName}' ORDER BY CreateTime ASC"
String showTasks = "SHOW ALTER TABLE MATERIALIZED VIEW from ${dbName} where TableName='${tableName}' ORDER BY CreateTime DESC"
String status = "NULL"
List<List<Object>> result
long startTime = System.currentTimeMillis()
long timeoutTimestamp = startTime + 5 * 60 * 1000 // 5 min
do {
List<String> toCheckTaskRow = new ArrayList<>();
while (timeoutTimestamp > System.currentTimeMillis() && (status != 'FINISHED')) {
result = sql(showTasks)
logger.info("result: " + result.toString())
if (!result.isEmpty()) {
status = result.last().get(8)
logger.info("crrent db is " + dbName + ", showTasks result: " + result.toString())
// just consider current db
for (List<String> taskRow : result) {
if (taskRow.get(5).equals(indexName)) {
toCheckTaskRow = taskRow;
}
}
if (toCheckTaskRow.isEmpty()) {
logger.info("waitingMVTaskFinishedByMvName toCheckTaskRow is empty")
Thread.sleep(1000);
continue;
}
status = toCheckTaskRow.get(8)
logger.info("The state of ${showTasks} is ${status}")
Thread.sleep(1000);
} while (timeoutTimestamp > System.currentTimeMillis() && (status != 'FINISHED'))
}
if (status != "FINISHED") {
logger.info("status is not success")
}
Expand Down Expand Up @@ -1925,6 +1952,15 @@ class Suite implements GroovyInterceptable {
return isReady
}

def create_sync_mv = { db, table_name, mv_name, mv_sql ->
sql """DROP MATERIALIZED VIEW IF EXISTS ${mv_name} ON ${table_name};"""
sql"""
CREATE MATERIALIZED VIEW ${mv_name}
AS ${mv_sql}
"""
waitingMVTaskFinishedByMvName(db, table_name, mv_name)
}

def create_async_mv = { db, mv_name, mv_sql ->

sql """DROP MATERIALIZED VIEW IF EXISTS ${db}.${mv_name}"""
Expand Down
4 changes: 2 additions & 2 deletions regression-test/suites/auth_call/test_ddl_mv_auth.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,9 @@ suite("test_ddl_mv_auth","p0,auth_call") {
connect(user, "${pwd}", context.config.jdbcUrl) {
sql """use ${dbName}"""
sql """create materialized view ${mvName} as select username from ${dbName}.${tableName};"""
waitingMVTaskFinishedByMvName(dbName, tableName)
waitingMVTaskFinishedByMvName(dbName, tableName, mvName)
sql """alter table ${dbName}.${tableName} add rollup ${rollupName}(username)"""
waitingMVTaskFinishedByMvName(dbName, tableName)
waitingMVTaskFinishedByMvName(dbName, tableName, rollupName)

def mv_res = sql """desc ${dbName}.${tableName} all;"""
logger.info("mv_res: " + mv_res)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ suite("test_select_column_auth","p0,auth") {
(3, "333");
"""
sql """refresh MATERIALIZED VIEW ${dbName}.${mtmv_name} auto"""
waitingMTMVTaskFinishedByMvName(mtmv_name)
waitingMTMVTaskFinishedByMvName(mtmv_name, dbName)

sql """grant select_priv on regression_test to ${user}"""

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// under the License.

suite("test_mv_case") {

sql """drop table if exists test_table_aaa2;"""
sql """CREATE TABLE `test_table_aaa2` (
`ordernum` varchar(65533) NOT NULL ,
Expand All @@ -29,7 +30,7 @@ suite("test_mv_case") {
"replication_allocation" = "tag.location.default: 1"
);"""
sql """DROP MATERIALIZED VIEW IF EXISTS ods_zn_dnt_max1 ON test_table_aaa2;"""
createMV("""create materialized view ods_zn_dnt_max1 as
create_sync_mv(context.dbName, "test_table_aaa2", "ods_zn_dnt_max1", """
select ordernum,max(dnt) as dnt from test_table_aaa2
group by ordernum
ORDER BY ordernum;""")
Expand Down Expand Up @@ -92,7 +93,7 @@ suite("test_mv_case") {
)
"""
sql """insert into tb1 select id,map_agg(a, b) from(select 123 id,3 a,'5' b union all select 123 id, 6 a, '8' b) aa group by id"""
createMV ("""CREATE MATERIALIZED VIEW mv1 BUILD IMMEDIATE REFRESH COMPLETE ON SCHEDULE EVERY 10 MINUTE DUPLICATE KEY(info_id) DISTRIBUTED BY HASH(`info_id`) BUCKETS 2 PROPERTIES (
sql"""CREATE MATERIALIZED VIEW mv1 BUILD IMMEDIATE REFRESH COMPLETE ON SCHEDULE EVERY 10 MINUTE DUPLICATE KEY(info_id) DISTRIBUTED BY HASH(`info_id`) BUCKETS 2 PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"min_load_replica_num" = "-1",
"is_being_synced" = "false",
Expand All @@ -112,8 +113,9 @@ suite("test_mv_case") {
cast(a.id as bigint) info_id,
map_infos
from
tb1 a;""")
createMV ("""CREATE MATERIALIZED VIEW mv2 BUILD IMMEDIATE REFRESH COMPLETE ON SCHEDULE EVERY 10 MINUTE DUPLICATE KEY(info_id) DISTRIBUTED BY HASH(`info_id`) BUCKETS 2 PROPERTIES (
tb1 a;"""
waitingMTMVTaskFinishedByMvName("mv1")
sql """CREATE MATERIALIZED VIEW mv2 BUILD IMMEDIATE REFRESH COMPLETE ON SCHEDULE EVERY 10 MINUTE DUPLICATE KEY(info_id) DISTRIBUTED BY HASH(`info_id`) BUCKETS 2 PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"min_load_replica_num" = "-1",
"is_being_synced" = "false",
Expand All @@ -132,6 +134,7 @@ suite("test_mv_case") {
info_id,
map_infos
from
mv1 a;""")
mv1 a;"""
waitingMTMVTaskFinishedByMvName("mv2")
qt_select_mv """ select * from mv2 """
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ suite("create_view_use_mv") {
(3, 1, 1, 2, 7.5, 8.5, 9.5, 10.5, 'k', 'o', '2023-10-19', null, 'c', 'd', 'xxxxxxxxx', '2023-10-19'),
(1, 3, 2, 2, 5.5, 6.5, 7.5, 8.5, 'o', 'k', '2023-10-17', '2023-10-17', 'a', 'b', 'yyyyyyyyy', '2023-10-17');"""

createMV("""
CREATE MATERIALIZED VIEW t_mv_mv AS select
create_sync_mv(context.dbName, "orders", "t_mv_mv", """
select
o_orderkey,
sum(o_totalprice) as sum_total,
max(o_totalprice) as max_total,
Expand Down
28 changes: 14 additions & 14 deletions regression-test/suites/mtmv_p0/test_iceberg_mtmv.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -107,35 +107,35 @@ suite("test_iceberg_mtmv", "p0,external,iceberg,external_docker,external_docker_
sql """insert into ${catalog_name}.${icebergDb}.${icebergTable1} values ('2024-10-26 01:02:03', 1), ('2024-10-27 01:02:03', 2), ('2024-10-27 21:02:03', 3)"""
sql """CREATE MATERIALIZED VIEW ${mvName1} BUILD DEFERRED REFRESH AUTO ON MANUAL partition by(`ts`) DISTRIBUTED BY RANDOM BUCKETS 2 PROPERTIES ('replication_num' = '1') as SELECT * FROM ${catalog_name}.${icebergDb}.${icebergTable1}"""
sql """REFRESH MATERIALIZED VIEW ${mvName1} complete"""
waitingMTMVTaskFinishedByMvName(mvName1)
waitingMTMVTaskFinishedByMvName(mvName1, dbName)
qt_test_ts_refresh1 "select * from ${mvName1} order by value"

sql """insert into ${catalog_name}.${icebergDb}.${icebergTable1} values ('2024-10-26 21:02:03', 4)"""
sql """REFRESH MATERIALIZED VIEW ${mvName1} auto"""
waitingMTMVTaskFinishedByMvName(mvName1)
waitingMTMVTaskFinishedByMvName(mvName1, dbName)
qt_test_ts_refresh2 """select * from ${mvName1} order by value"""

sql """insert into ${catalog_name}.${icebergDb}.${icebergTable1} values ('2024-10-26 01:22:03', 5), ('2024-10-27 01:12:03', 6);"""
sql """REFRESH MATERIALIZED VIEW ${mvName1} partitions(p_20241026000000_20241027000000);"""
waitingMTMVTaskFinishedByMvName(mvName1)
waitingMTMVTaskFinishedByMvName(mvName1, dbName)
qt_test_ts_refresh3 """select * from ${mvName1} order by value"""

sql """REFRESH MATERIALIZED VIEW ${mvName1} auto"""
waitingMTMVTaskFinishedByMvName(mvName1)
waitingMTMVTaskFinishedByMvName(mvName1, dbName)
qt_test_ts_refresh4 """select * from ${mvName1} order by value"""

sql """insert into ${catalog_name}.${icebergDb}.${icebergTable1} values ('2024-10-28 01:22:03', 7);"""
sql """REFRESH MATERIALIZED VIEW ${mvName1} partitions(p_20241026000000_20241027000000);"""
waitingMTMVTaskFinishedByMvName(mvName1)
waitingMTMVTaskFinishedByMvName(mvName1, dbName)
qt_test_ts_refresh5 """select * from ${mvName1} order by value"""

sql """REFRESH MATERIALIZED VIEW ${mvName1} auto"""
waitingMTMVTaskFinishedByMvName(mvName1)
waitingMTMVTaskFinishedByMvName(mvName1, dbName)
qt_test_ts_refresh6 """select * from ${mvName1} order by value"""

sql """insert into ${catalog_name}.${icebergDb}.${icebergTable1} values (null, 8);"""
sql """REFRESH MATERIALIZED VIEW ${mvName1} auto"""
waitingMTMVTaskFinishedByMvName(mvName1)
waitingMTMVTaskFinishedByMvName(mvName1, dbName)
qt_test_ts_refresh_null """select * from ${mvName1} order by value"""

def showPartitionsResult = sql """show partitions from ${mvName1}"""
Expand Down Expand Up @@ -176,25 +176,25 @@ suite("test_iceberg_mtmv", "p0,external,iceberg,external_docker,external_docker_
sql """insert into ${catalog_name}.${icebergDb}.${icebergTable2} values ('2024-08-26', 1), ('2024-09-17', 2), ('2024-09-27', 3);"""
sql """CREATE MATERIALIZED VIEW ${mvName2} BUILD DEFERRED REFRESH AUTO ON MANUAL partition by(`d`) DISTRIBUTED BY RANDOM BUCKETS 2 PROPERTIES ('replication_num' = '1') as SELECT * FROM ${catalog_name}.${icebergDb}.${icebergTable2}"""
sql """REFRESH MATERIALIZED VIEW ${mvName2} complete"""
waitingMTMVTaskFinishedByMvName(mvName2)
waitingMTMVTaskFinishedByMvName(mvName2, dbName)
qt_test_d_refresh1 "select * from ${mvName2} order by value"

sql """insert into ${catalog_name}.${icebergDb}.${icebergTable2} values ('2024-09-01', 4);"""
sql """REFRESH MATERIALIZED VIEW ${mvName2} auto"""
waitingMTMVTaskFinishedByMvName(mvName2)
waitingMTMVTaskFinishedByMvName(mvName2, dbName)
qt_test_d_refresh2 "select * from ${mvName2} order by value"

sql """insert into ${catalog_name}.${icebergDb}.${icebergTable2} values ('2024-08-22', 5), ('2024-09-30', 6);"""
sql """REFRESH MATERIALIZED VIEW ${mvName2} partitions(p_20240801_20240901);"""
waitingMTMVTaskFinishedByMvName(mvName2)
waitingMTMVTaskFinishedByMvName(mvName2, dbName)
qt_test_d_refresh3 "select * from ${mvName2} order by value"
sql """REFRESH MATERIALIZED VIEW ${mvName2} partitions(p_20240901_20241001);"""
waitingMTMVTaskFinishedByMvName(mvName2)
waitingMTMVTaskFinishedByMvName(mvName2, dbName)
qt_test_d_refresh4 "select * from ${mvName2} order by value"

sql """insert into ${catalog_name}.${icebergDb}.${icebergTable2} values ('2024-10-28', 7);"""
sql """REFRESH MATERIALIZED VIEW ${mvName2} auto"""
waitingMTMVTaskFinishedByMvName(mvName2)
waitingMTMVTaskFinishedByMvName(mvName2, dbName)
qt_test_d_refresh5 "select * from ${mvName2} order by value"

showPartitionsResult = sql """show partitions from ${mvName2}"""
Expand Down Expand Up @@ -240,7 +240,7 @@ suite("test_iceberg_mtmv", "p0,external,iceberg,external_docker,external_docker_

// refresh one partiton
sql """REFRESH MATERIALIZED VIEW ${mvName} partitions(p_20240101000000_20240102000000);"""
waitingMTMVTaskFinishedByMvName(mvName)
waitingMTMVTaskFinishedByMvName(mvName, dbName)
order_qt_refresh_one_partition "SELECT * FROM ${mvName} "
def explainOnePartition = sql """ explain ${mvSql} """
logger.info("explainOnePartition: " + explainOnePartition.toString())
Expand All @@ -250,7 +250,7 @@ suite("test_iceberg_mtmv", "p0,external,iceberg,external_docker,external_docker_

//refresh auto
sql """REFRESH MATERIALIZED VIEW ${mvName} auto"""
waitingMTMVTaskFinishedByMvName(mvName)
waitingMTMVTaskFinishedByMvName(mvName, dbName)
order_qt_refresh_auto "SELECT * FROM ${mvName} "
def explainAllPartition = sql """ explain ${mvSql}; """
logger.info("explainAllPartition: " + explainAllPartition.toString())
Expand Down
Loading

0 comments on commit 4bd106b

Please sign in to comment.