diff --git a/regression-test/suites/fault_injection_p0/test_disable_move_memtable.groovy b/regression-test/suites/fault_injection_p0/test_disable_move_memtable.groovy
index 83b04fb514972e..5bca47cb71c1b1 100644
--- a/regression-test/suites/fault_injection_p0/test_disable_move_memtable.groovy
+++ b/regression-test/suites/fault_injection_p0/test_disable_move_memtable.groovy
@@ -19,288 +19,290 @@ import org.codehaus.groovy.runtime.IOGroovyMethods
 import org.apache.doris.regression.util.Http

 suite("test_disable_move_memtable", "nonConcurrent") {
-    sql """ set enable_memtable_on_sink_node=true """
-    sql """ DROP TABLE IF EXISTS `baseall` """
-    sql """ DROP TABLE IF EXISTS `test` """
-    sql """ DROP TABLE IF EXISTS `baseall1` """
-    sql """ DROP TABLE IF EXISTS `test1` """
-    sql """ DROP TABLE IF EXISTS `brokerload` """
-    sql """ DROP TABLE IF EXISTS `brokerload1` """
-    sql """ sync """
-    sql """
-        CREATE TABLE IF NOT EXISTS `baseall` (
-            `k0` boolean null comment "",
-            `k1` tinyint(4) null comment "",
-            `k2` smallint(6) null comment "",
-            `k3` int(11) null comment "",
-            `k4` bigint(20) null comment "",
-            `k5` decimal(9, 3) null comment "",
-            `k6` char(5) null comment "",
-            `k10` date null comment "",
-            `k11` datetime null comment "",
-            `k7` varchar(20) null comment "",
-            `k8` double max null comment "",
-            `k9` float sum null comment "",
-            `k12` string replace null comment "",
-            `k13` largeint(40) replace null comment ""
-        ) engine=olap
-        DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties(
-            "light_schema_change" = "true",
-            "replication_num" = "1"
-        )
+    if (!isCloudMode()) {
+        sql """ set enable_memtable_on_sink_node=true """
+        sql """ DROP TABLE IF EXISTS `baseall` """
+        sql """ DROP TABLE IF EXISTS `test` """
+        sql """ DROP TABLE IF EXISTS `baseall1` """
+        sql """ DROP TABLE IF EXISTS `test1` """
+        sql """ DROP TABLE IF EXISTS `brokerload` """
+        sql """ DROP TABLE IF EXISTS `brokerload1` """
+        sql """ sync """
+        sql """
+            CREATE TABLE IF NOT EXISTS `baseall` (
+                `k0` boolean null comment "",
+                `k1` tinyint(4) null comment "",
+                `k2` smallint(6) null comment "",
+                `k3` int(11) null comment "",
+                `k4` bigint(20) null comment "",
+                `k5` decimal(9, 3) null comment "",
+                `k6` char(5) null comment "",
+                `k10` date null comment "",
+                `k11` datetime null comment "",
+                `k7` varchar(20) null comment "",
+                `k8` double max null comment "",
+                `k9` float sum null comment "",
+                `k12` string replace null comment "",
+                `k13` largeint(40) replace null comment ""
+            ) engine=olap
+            DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties(
+                "light_schema_change" = "true",
+                "replication_num" = "1"
+            )
+        """
+        sql """
+            CREATE TABLE IF NOT EXISTS `test` (
+                `k0` boolean null comment "",
+                `k1` tinyint(4) null comment "",
+                `k2` smallint(6) null comment "",
+                `k3` int(11) null comment "",
+                `k4` bigint(20) null comment "",
+                `k5` decimal(9, 3) null comment "",
+                `k6` char(5) null comment "",
+                `k10` date null comment "",
+                `k11` datetime null comment "",
+                `k7` varchar(20) null comment "",
+                `k8` double max null comment "",
+                `k9` float sum null comment "",
+                `k12` string replace_if_not_null null comment "",
+                `k13` largeint(40) replace null comment ""
+            ) engine=olap
+            DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties(
+                "light_schema_change" = "true",
+                "replication_num" = "1"
+            )
+        """
+        sql """
+            CREATE TABLE IF NOT EXISTS `baseall1` (
+                `k0` boolean null comment "",
+                `k1` tinyint(4) null comment "",
+                `k2` smallint(6) null comment "",
+                `k3` int(11) null comment "",
+                `k4` bigint(20) null comment "",
+                `k5` decimal(9, 3) null comment "",
+                `k6` char(5) null comment "",
+                `k10` date null comment "",
+                `k11` datetime null comment "",
+                `k7` varchar(20) null comment "",
+                `k8` double max null comment "",
+                `k9` float sum null comment "",
+                `k12` string replace null comment "",
+                `k13` largeint(40) replace null comment ""
+            ) engine=olap
+            DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties(
+                "light_schema_change" = "false",
+                "replication_num" = "1"
+            )
+        """
+        sql """
+            CREATE TABLE IF NOT EXISTS `test1` (
+                `k0` boolean null comment "",
+                `k1` tinyint(4) null comment "",
+                `k2` smallint(6) null comment "",
+                `k3` int(11) null comment "",
+                `k4` bigint(20) null comment "",
+                `k5` decimal(9, 3) null comment "",
+                `k6` char(5) null comment "",
+                `k10` date null comment "",
+                `k11` datetime null comment "",
+                `k7` varchar(20) null comment "",
+                `k8` double max null comment "",
+                `k9` float sum null comment "",
+                `k12` string replace_if_not_null null comment "",
+                `k13` largeint(40) replace null comment ""
+            ) engine=olap
+            DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties(
+                "light_schema_change" = "false",
+                "replication_num" = "1"
+            )
     """
-    sql """
-        CREATE TABLE IF NOT EXISTS `test` (
-            `k0` boolean null comment "",
-            `k1` tinyint(4) null comment "",
-            `k2` smallint(6) null comment "",
-            `k3` int(11) null comment "",
-            `k4` bigint(20) null comment "",
-            `k5` decimal(9, 3) null comment "",
-            `k6` char(5) null comment "",
-            `k10` date null comment "",
-            `k11` datetime null comment "",
-            `k7` varchar(20) null comment "",
-            `k8` double max null comment "",
-            `k9` float sum null comment "",
-            `k12` string replace_if_not_null null comment "",
-            `k13` largeint(40) replace null comment ""
-        ) engine=olap
-        DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties(
-            "light_schema_change" = "true",
-            "replication_num" = "1"
-        )
+        sql """
+            CREATE TABLE IF NOT EXISTS brokerload (
+                user_id bigint,
+                date date,
+                group_id bigint,
+                modify_date date,
+                keyword VARCHAR(128)
+            ) ENGINE=OLAP
+            UNIQUE KEY(user_id, date, group_id)
+            COMMENT 'OLAP'
+            DISTRIBUTED BY HASH (user_id) BUCKETS 32
+            PROPERTIES (
+                "replication_num" = "1",
+                "light_schema_change" = "true"
+            );
     """
-    sql """
-        CREATE TABLE IF NOT EXISTS `baseall1` (
-            `k0` boolean null comment "",
-            `k1` tinyint(4) null comment "",
-            `k2` smallint(6) null comment "",
-            `k3` int(11) null comment "",
-            `k4` bigint(20) null comment "",
-            `k5` decimal(9, 3) null comment "",
-            `k6` char(5) null comment "",
-            `k10` date null comment "",
-            `k11` datetime null comment "",
-            `k7` varchar(20) null comment "",
-            `k8` double max null comment "",
-            `k9` float sum null comment "",
-            `k12` string replace null comment "",
-            `k13` largeint(40) replace null comment ""
-        ) engine=olap
-        DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties(
-            "light_schema_change" = "false",
-            "replication_num" = "1"
-        )
+        sql """
+            CREATE TABLE IF NOT EXISTS brokerload1 (
+                user_id bigint,
+                date date,
+                group_id bigint,
+                modify_date date,
+                keyword VARCHAR(128)
+            ) ENGINE=OLAP
+            UNIQUE KEY(user_id, date, group_id)
+            COMMENT 'OLAP'
+            DISTRIBUTED BY HASH (user_id) BUCKETS 32
+            PROPERTIES (
+                "replication_num" = "1",
+                "light_schema_change" = "false"
+            );
     """
-    sql """
-        CREATE TABLE IF NOT EXISTS `test1` (
-            `k0` boolean null comment "",
-            `k1` tinyint(4) null comment "",
-            `k2` smallint(6) null comment "",
-            `k3` int(11) null comment "",
-            `k4` bigint(20) null comment "",
-            `k5` decimal(9, 3) null comment "",
-            `k6` char(5) null comment "",
-            `k10` date null comment "",
-            `k11` datetime null comment "",
-            `k7` varchar(20) null comment "",
-            `k8` double max null comment "",
-            `k9` float sum null comment "",
-            `k12` string replace_if_not_null null comment "",
-            `k13` largeint(40) replace null comment ""
-        ) engine=olap
-        DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties(
-            "light_schema_change" = "false",
-            "replication_num" = "1"
-        )
-    """
-    sql """
-        CREATE TABLE IF NOT EXISTS brokerload (
-            user_id bigint,
-            date date,
-            group_id bigint,
-            modify_date date,
-            keyword VARCHAR(128)
-        ) ENGINE=OLAP
-        UNIQUE KEY(user_id, date, group_id)
-        COMMENT 'OLAP'
-        DISTRIBUTED BY HASH (user_id) BUCKETS 32
-        PROPERTIES (
-            "replication_num" = "1",
-            "light_schema_change" = "true"
-        );
-    """
-    sql """
-        CREATE TABLE IF NOT EXISTS brokerload1 (
-            user_id bigint,
-            date date,
-            group_id bigint,
-            modify_date date,
-            keyword VARCHAR(128)
-        ) ENGINE=OLAP
-        UNIQUE KEY(user_id, date, group_id)
-        COMMENT 'OLAP'
-        DISTRIBUTED BY HASH (user_id) BUCKETS 32
-        PROPERTIES (
-            "replication_num" = "1",
-            "light_schema_change" = "false"
-        );
-    """
-    GetDebugPoint().clearDebugPointsForAllBEs()
-    streamLoad {
-        table "baseall"
-        db "regression_test_fault_injection_p0"
-        set 'column_separator', ','
-        file "baseall.txt"
-    }
-    sql """ sync """
+        GetDebugPoint().clearDebugPointsForAllBEs()
+        streamLoad {
+            table "baseall"
+            db "regression_test_fault_injection_p0"
+            set 'column_separator', ','
+            file "baseall.txt"
+        }
+        sql """ sync """

-    def insert_into_value_with_injection = { injection, tableName, error_msg->
-        try {
-            GetDebugPoint().enableDebugPointForAllBEs(injection)
-            sql """ insert into ${tableName} values(true, 10, 1000, 1, 1, 1, 'a', 2024-01-01, 2024-01-01, 'a', 1, 1, "hello", 1) """
-        } catch(Exception e) {
-            logger.info(e.getMessage())
-            assertTrue(e.getMessage().contains(error_msg))
-        } finally {
-            GetDebugPoint().disableDebugPointForAllBEs(injection)
+        def insert_into_value_with_injection = { injection, tableName, error_msg->
+            try {
+                GetDebugPoint().enableDebugPointForAllBEs(injection)
+                sql """ insert into ${tableName} values(true, 10, 1000, 1, 1, 1, 'a', 2024-01-01, 2024-01-01, 'a', 1, 1, "hello", 1) """
+            } catch(Exception e) {
+                logger.info(e.getMessage())
+                assertTrue(e.getMessage().contains(error_msg))
+            } finally {
+                GetDebugPoint().disableDebugPointForAllBEs(injection)
+            }
         }
-    }

-    def insert_into_select_with_injection = { injection, tableName, error_msg->
-        try {
-            GetDebugPoint().enableDebugPointForAllBEs(injection)
-            sql "insert into ${tableName} select * from baseall where k1 <= 3"
-        } catch(Exception e) {
-            logger.info(e.getMessage())
-            assertTrue(e.getMessage().contains(error_msg))
-        } finally {
-            GetDebugPoint().disableDebugPointForAllBEs(injection)
+        def insert_into_select_with_injection = { injection, tableName, error_msg->
+            try {
+                GetDebugPoint().enableDebugPointForAllBEs(injection)
+                sql "insert into ${tableName} select * from baseall where k1 <= 3"
+            } catch(Exception e) {
+                logger.info(e.getMessage())
+                assertTrue(e.getMessage().contains(error_msg))
+            } finally {
+                GetDebugPoint().disableDebugPointForAllBEs(injection)
+            }
         }
-    }

-    def stream_load_with_injection = { injection, tableName, res->
-        try {
-            GetDebugPoint().enableDebugPointForAllBEs(injection)
-            streamLoad {
-                table tableName
-                db "regression_test_fault_injection_p0"
-                set 'column_separator', ','
-                set 'memtable_on_sink_node', 'true'
-                file "baseall.txt"
+        def stream_load_with_injection = { injection, tableName, res->
+            try {
+                GetDebugPoint().enableDebugPointForAllBEs(injection)
+                streamLoad {
+                    table tableName
+                    db "regression_test_fault_injection_p0"
+                    set 'column_separator', ','
+                    set 'memtable_on_sink_node', 'true'
+                    file "baseall.txt"

-                check { result, exception, startTime, endTime ->
-                    if (exception != null) {
-                        throw exception
+                    check { result, exception, startTime, endTime ->
+                        if (exception != null) {
+                            throw exception
+                        }
+                        log.info("res: ${result}".toString())
+                        def json = parseJson(result)
+                        assertEquals("${res}".toString(), json.Status.toLowerCase().toString())
                     }
-                    log.info("res: ${result}".toString())
-                    def json = parseJson(result)
-                    assertEquals("${res}".toString(), json.Status.toLowerCase().toString())
                 }
+            } finally {
+                GetDebugPoint().disableDebugPointForAllBEs(injection)
             }
-        } finally {
-            GetDebugPoint().disableDebugPointForAllBEs(injection)
-        }
-    }

-    def load_from_hdfs_norm = {tableName, label, hdfsFilePath, format, brokerName, hdfsUser, hdfsPasswd ->
-        def result1= sql """
-            LOAD LABEL ${label} (
-                DATA INFILE("${hdfsFilePath}")
-                INTO TABLE ${tableName}
-                FORMAT as "${format}"
-                PROPERTIES ("num_as_string"="true")
-            )
-            with BROKER "${brokerName}" (
-                "username"="${hdfsUser}",
-                "password"="${hdfsPasswd}")
-            PROPERTIES (
-                "timeout"="1200",
-                "max_filter_ratio"="0.1");
-            """
-        log.info("result1: ${result1}")
-        assertTrue(result1.size() == 1)
-        assertTrue(result1[0].size() == 1)
-        assertTrue(result1[0][0] == 0, "Query OK, 0 rows affected")
-    }
+        def load_from_hdfs_norm = {tableName, label, hdfsFilePath, format, brokerName, hdfsUser, hdfsPasswd ->
+            def result1= sql """
+                LOAD LABEL ${label} (
+                    DATA INFILE("${hdfsFilePath}")
+                    INTO TABLE ${tableName}
+                    FORMAT as "${format}"
+                    PROPERTIES ("num_as_string"="true")
+                )
+                with BROKER "${brokerName}" (
+                    "username"="${hdfsUser}",
+                    "password"="${hdfsPasswd}")
+                PROPERTIES (
+                    "timeout"="1200",
+                    "max_filter_ratio"="0.1");
+                """
+            log.info("result1: ${result1}")
+            assertTrue(result1.size() == 1)
+            assertTrue(result1[0].size() == 1)
+            assertTrue(result1[0][0] == 0, "Query OK, 0 rows affected")
+        }

-    def check_load_result = {checklabel, testTablex, res ->
-        max_try_milli_secs = 10000
-        while(max_try_milli_secs) {
-            result = sql "show load where label = '${checklabel}'"
-            log.info("result: ${result}")
-            if(result[0][2].toString() == "${res}".toString()) {
-                break
-            } else {
-                sleep(1000) // wait 1 second every time
-                max_try_milli_secs -= 1000
-                if(max_try_milli_secs <= 0) {
-                    assertEquals(1, 2)
+        def check_load_result = {checklabel, testTablex, res ->
+            max_try_milli_secs = 10000
+            while(max_try_milli_secs) {
+                result = sql "show load where label = '${checklabel}'"
+                log.info("result: ${result}")
+                if(result[0][2].toString() == "${res}".toString()) {
+                    break
+                } else {
+                    sleep(1000) // wait 1 second every time
+                    max_try_milli_secs -= 1000
+                    if(max_try_milli_secs <= 0) {
+                        assertEquals(1, 2)
+                    }
                 }
             }
         }
-    }

-    def broker_load_with_injection = { injection, tableName, res->
-        try {
-            GetDebugPoint().enableDebugPointForAllBEs(injection)
-            if (enableHdfs()) {
-                brokerName = getBrokerName()
-                hdfsUser = getHdfsUser()
-                hdfsPasswd = getHdfsPasswd()
-                def hdfs_csv_file_path = uploadToHdfs "load_p0/broker_load/broker_load_with_properties.json"
-                def test_load_label = UUID.randomUUID().toString().replaceAll("-", "")
-                load_from_hdfs_norm.call(tableName, test_load_label, hdfs_csv_file_path, "json",
-                        brokerName, hdfsUser, hdfsPasswd)
-                check_load_result.call(test_load_label, tableName, res)
+        def broker_load_with_injection = { injection, tableName, res->
+            try {
+                GetDebugPoint().enableDebugPointForAllBEs(injection)
+                if (enableHdfs()) {
+                    brokerName = getBrokerName()
+                    hdfsUser = getHdfsUser()
+                    hdfsPasswd = getHdfsPasswd()
+                    def hdfs_csv_file_path = uploadToHdfs "load_p0/broker_load/broker_load_with_properties.json"
+                    def test_load_label = UUID.randomUUID().toString().replaceAll("-", "")
+                    load_from_hdfs_norm.call(tableName, test_load_label, hdfs_csv_file_path, "json",
+                            brokerName, hdfsUser, hdfsPasswd)
+                    check_load_result.call(test_load_label, tableName, res)
+                }
+            } finally {
+                GetDebugPoint().disableDebugPointForAllBEs(injection)
             }
-        } finally {
-            GetDebugPoint().disableDebugPointForAllBEs(injection)
-        }
-    }

-    insert_into_value_with_injection("VTabletWriterV2._init._output_tuple_desc_null", "test", "unknown destination tuple descriptor")
-    insert_into_value_with_injection("VTabletWriterV2._init._output_tuple_desc_null", "test1", "success")
-    sql """ set enable_insert_strict = false """
-    sql """ set group_commit = sync_mode """
-    insert_into_value_with_injection("VTabletWriterV2._init._output_tuple_desc_null", "test", "unknown destination tuple descriptor")
-    insert_into_value_with_injection("VTabletWriterV2._init._output_tuple_desc_null", "test1", "success")
-    sql """ set enable_insert_strict = true """
-    sql """ set group_commit = sync_mode """
-    insert_into_value_with_injection("VTabletWriterV2._init._output_tuple_desc_null", "test", "unknown destination tuple descriptor")
-    insert_into_value_with_injection("VTabletWriterV2._init._output_tuple_desc_null", "test1", "success")
-    sql """ set group_commit = off_mode """
-    insert_into_select_with_injection("VTabletWriterV2._init._output_tuple_desc_null", "test", "unknown destination tuple descriptor")
-    insert_into_select_with_injection("VTabletWriterV2._init._output_tuple_desc_null", "test1", "success")
+        insert_into_value_with_injection("VTabletWriterV2._init._output_tuple_desc_null", "test", "unknown destination tuple descriptor")
+        insert_into_value_with_injection("VTabletWriterV2._init._output_tuple_desc_null", "test1", "success")
+        sql """ set enable_insert_strict = false """
+        sql """ set group_commit = sync_mode """
+        insert_into_value_with_injection("VTabletWriterV2._init._output_tuple_desc_null", "test", "unknown destination tuple descriptor")
+        insert_into_value_with_injection("VTabletWriterV2._init._output_tuple_desc_null", "test1", "success")
+        sql """ set enable_insert_strict = true """
+        sql """ set group_commit = sync_mode """
+        insert_into_value_with_injection("VTabletWriterV2._init._output_tuple_desc_null", "test", "unknown destination tuple descriptor")
+        insert_into_value_with_injection("VTabletWriterV2._init._output_tuple_desc_null", "test1", "success")
+        sql """ set group_commit = off_mode """
+        insert_into_select_with_injection("VTabletWriterV2._init._output_tuple_desc_null", "test", "unknown destination tuple descriptor")
+        insert_into_select_with_injection("VTabletWriterV2._init._output_tuple_desc_null", "test1", "success")

-    if (isGroupCommitMode()) {
-        def ret = sql "SHOW FRONTEND CONFIG like '%stream_load_default_memtable_on_sink_node%';"
-        logger.info("${ret}")
-        try {
-            sql "ADMIN SET FRONTEND CONFIG ('stream_load_default_memtable_on_sink_node' = 'true')"
-            sql """ set enable_nereids_planner=true """
-            stream_load_with_injection("VTabletWriterV2._init._output_tuple_desc_null", "baseall", "fail")
-            stream_load_with_injection("VTabletWriterV2._init._output_tuple_desc_null", "baseall1", "fail")
-        } finally {
-            sql "ADMIN SET FRONTEND CONFIG ('stream_load_default_memtable_on_sink_node' = '${ret[0][1]}')"
+        if (isGroupCommitMode()) {
+            def ret = sql "SHOW FRONTEND CONFIG like '%stream_load_default_memtable_on_sink_node%';"
+            logger.info("${ret}")
+            try {
+                sql "ADMIN SET FRONTEND CONFIG ('stream_load_default_memtable_on_sink_node' = 'true')"
+                sql """ set enable_nereids_planner=true """
+                stream_load_with_injection("VTabletWriterV2._init._output_tuple_desc_null", "baseall", "fail")
+                stream_load_with_injection("VTabletWriterV2._init._output_tuple_desc_null", "baseall1", "fail")
+            } finally {
+                sql "ADMIN SET FRONTEND CONFIG ('stream_load_default_memtable_on_sink_node' = '${ret[0][1]}')"
+            }
+            return
         }
-        return
-    }

-    stream_load_with_injection("VTabletWriterV2._init._output_tuple_desc_null", "baseall", "fail")
-    stream_load_with_injection("VTabletWriterV2._init._output_tuple_desc_null", "baseall1", "success")
+        stream_load_with_injection("VTabletWriterV2._init._output_tuple_desc_null", "baseall", "fail")
+        stream_load_with_injection("VTabletWriterV2._init._output_tuple_desc_null", "baseall1", "success")

-    broker_load_with_injection("VTabletWriterV2._init._output_tuple_desc_null", "baseall", "CANCELLED")
-    broker_load_with_injection("VTabletWriterV2._init._output_tuple_desc_null", "baseall1", "FINISHED")
+        broker_load_with_injection("VTabletWriterV2._init._output_tuple_desc_null", "baseall", "CANCELLED")
+        broker_load_with_injection("VTabletWriterV2._init._output_tuple_desc_null", "baseall1", "FINISHED")

-    sql """ set enable_memtable_on_sink_node=false """
-    sql """ DROP TABLE IF EXISTS `baseall` """
-    sql """ DROP TABLE IF EXISTS `test` """
-    sql """ DROP TABLE IF EXISTS `baseall1` """
-    sql """ DROP TABLE IF EXISTS `test1` """
-    sql """ DROP TABLE IF EXISTS `brokerload` """
-    sql """ DROP TABLE IF EXISTS `brokerload1` """
-    sql """ sync """
-}
\ No newline at end of file
+        sql """ set enable_memtable_on_sink_node=false """
+        sql """ DROP TABLE IF EXISTS `baseall` """
+        sql """ DROP TABLE IF EXISTS `test` """
+        sql """ DROP TABLE IF EXISTS `baseall1` """
+        sql """ DROP TABLE IF EXISTS `test1` """
+        sql """ DROP TABLE IF EXISTS `brokerload` """
+        sql """ DROP TABLE IF EXISTS `brokerload1` """
+        sql """ sync """
+    }
+}
diff --git a/regression-test/suites/fault_injection_p0/test_load_stream_back_pressure_fault_injection.groovy b/regression-test/suites/fault_injection_p0/test_load_stream_back_pressure_fault_injection.groovy
index bccaa8aa62f84d..677402098e7a3e 100644
--- a/regression-test/suites/fault_injection_p0/test_load_stream_back_pressure_fault_injection.groovy
+++ b/regression-test/suites/fault_injection_p0/test_load_stream_back_pressure_fault_injection.groovy
@@ -19,84 +19,86 @@ import org.codehaus.groovy.runtime.IOGroovyMethods
 import org.apache.doris.regression.util.Http

 suite("test_load_stream_back_pressure_fault_injection", "nonConcurrent") {
-    sql """ set enable_memtable_on_sink_node=true """
-    sql """ DROP TABLE IF EXISTS `baseall` """
-    sql """ DROP TABLE IF EXISTS `test` """
-    sql """
-        CREATE TABLE IF NOT EXISTS `baseall` (
-            `k0` boolean null comment "",
-            `k1` tinyint(4) null comment "",
-            `k2` smallint(6) null comment "",
-            `k3` int(11) null comment "",
-            `k4` bigint(20) null comment "",
-            `k5` decimal(9, 3) null comment "",
-            `k6` char(5) null comment "",
-            `k10` date null comment "",
-            `k11` datetime null comment "",
-            `k7` varchar(20) null comment "",
-            `k8` double max null comment "",
-            `k9` float sum null comment "",
-            `k12` string replace null comment "",
-            `k13` largeint(40) replace null comment ""
-        ) engine=olap
-        DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties("replication_num" = "1")
-    """
-    sql """
-        CREATE TABLE IF NOT EXISTS `test` (
-            `k0` boolean null comment "",
-            `k1` tinyint(4) null comment "",
-            `k2` smallint(6) null comment "",
-            `k3` int(11) null comment "",
-            `k4` bigint(20) null comment "",
-            `k5` decimal(9, 3) null comment "",
-            `k6` char(5) null comment "",
-            `k10` date null comment "",
-            `k11` datetime null comment "",
-            `k7` varchar(20) null comment "",
-            `k8` double max null comment "",
-            `k9` float sum null comment "",
-            `k12` string replace_if_not_null null comment "",
-            `k13` largeint(40) replace null comment ""
-        ) engine=olap
-        DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties("replication_num" = "1")
-    """
+    if (!isCloudMode()) {
+        sql """ set enable_memtable_on_sink_node=true """
+        sql """ DROP TABLE IF EXISTS `baseall` """
+        sql """ DROP TABLE IF EXISTS `test` """
+        sql """
+            CREATE TABLE IF NOT EXISTS `baseall` (
+                `k0` boolean null comment "",
+                `k1` tinyint(4) null comment "",
+                `k2` smallint(6) null comment "",
+                `k3` int(11) null comment "",
+                `k4` bigint(20) null comment "",
+                `k5` decimal(9, 3) null comment "",
+                `k6` char(5) null comment "",
+                `k10` date null comment "",
+                `k11` datetime null comment "",
+                `k7` varchar(20) null comment "",
+                `k8` double max null comment "",
+                `k9` float sum null comment "",
+                `k12` string replace null comment "",
+                `k13` largeint(40) replace null comment ""
+            ) engine=olap
+            DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties("replication_num" = "1")
+        """
+        sql """
+            CREATE TABLE IF NOT EXISTS `test` (
+                `k0` boolean null comment "",
+                `k1` tinyint(4) null comment "",
+                `k2` smallint(6) null comment "",
+                `k3` int(11) null comment "",
+                `k4` bigint(20) null comment "",
+                `k5` decimal(9, 3) null comment "",
+                `k6` char(5) null comment "",
+                `k10` date null comment "",
+                `k11` datetime null comment "",
+                `k7` varchar(20) null comment "",
+                `k8` double max null comment "",
+                `k9` float sum null comment "",
+                `k12` string replace_if_not_null null comment "",
+                `k13` largeint(40) replace null comment ""
+            ) engine=olap
+            DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties("replication_num" = "1")
+        """

-    GetDebugPoint().clearDebugPointsForAllBEs()
-    streamLoad {
-        table "baseall"
-        db "regression_test_fault_injection_p0"
-        set 'column_separator', ','
-        file "baseall.txt"
-    }
+        GetDebugPoint().clearDebugPointsForAllBEs()
+        streamLoad {
+            table "baseall"
+            db "regression_test_fault_injection_p0"
+            set 'column_separator', ','
+            file "baseall.txt"
+        }

-    try {
-        GetDebugPoint().enableDebugPointForAllBEs("TabletStream.append_data.long_wait")
-        // the kill thread only means to end the test faster when the code does not behave as expected
-        def kill_thread = new Thread({
-            sleep(5000)
-            def processList = sql "show processlist"
-            logger.info(processList.toString())
-            processList.each { item ->
-                logger.info(item[1].toString())
-                logger.info(item[11].toString())
-                if (item[11].toString() == "insert into test select * from baseall where k1 <= 3".toString()){
-                    def res = sql "kill ${item[1]}"
-                    logger.info(res.toString())
+        try {
+            GetDebugPoint().enableDebugPointForAllBEs("TabletStream.append_data.long_wait")
+            // the kill thread is only meant to end the test faster when the code does not behave as expected
+            def kill_thread = new Thread({
+                sleep(5000)
+                def processList = sql "show processlist"
+                logger.info(processList.toString())
+                processList.each { item ->
+                    logger.info(item[1].toString())
+                    logger.info(item[11].toString())
+                    if (item[11].toString() == "insert into test select * from baseall where k1 <= 3".toString()){
+                        def res = sql "kill ${item[1]}"
+                        logger.info(res.toString())
+                    }
                 }
-            }
-        })
-        kill_thread.start()
-        def res = sql "insert into test select * from baseall where k1 <= 3"
-        logger.info(res.toString())
-        assertTrue(false, "Expected exception to be thrown")
-    } catch(Exception e) {
-        logger.info(e.getMessage())
-        assertTrue(e.getMessage().contains("wait flush token back pressure time is more than load_stream_max_wait_flush_token_time"))
-    } finally {
-        GetDebugPoint().disableDebugPointForAllBEs("TabletStream.append_data.long_wait")
-    }
+            })
+            kill_thread.start()
+            def res = sql "insert into test select * from baseall where k1 <= 3"
+            logger.info(res.toString())
+            assertTrue(false, "Expected exception to be thrown")
+        } catch(Exception e) {
+            logger.info(e.getMessage())
+            assertTrue(e.getMessage().contains("wait flush token back pressure time is more than load_stream_max_wait_flush_token_time"))
+        } finally {
+            GetDebugPoint().disableDebugPointForAllBEs("TabletStream.append_data.long_wait")
+        }

-    sql """ DROP TABLE IF EXISTS `baseall` """
-    sql """ DROP TABLE IF EXISTS `test` """
-    sql """ set enable_memtable_on_sink_node=false """
+        sql """ DROP TABLE IF EXISTS `baseall` """
+        sql """ DROP TABLE IF EXISTS `test` """
+        sql """ set enable_memtable_on_sink_node=false """
+    }
 }
diff --git a/regression-test/suites/fault_injection_p0/test_load_stream_stub_close_wait_fault_injection.groovy b/regression-test/suites/fault_injection_p0/test_load_stream_stub_close_wait_fault_injection.groovy
index 58b6ba4a075e33..72a892dcb1b7d8 100644
--- a/regression-test/suites/fault_injection_p0/test_load_stream_stub_close_wait_fault_injection.groovy
+++ b/regression-test/suites/fault_injection_p0/test_load_stream_stub_close_wait_fault_injection.groovy
@@ -19,71 +19,73 @@ import org.codehaus.groovy.runtime.IOGroovyMethods
 import org.apache.doris.regression.util.Http

 suite("test_load_stream_stub_close_wait_fault_injection", "nonConcurrent") {
-    sql """ set enable_memtable_on_sink_node=true """
-    sql """ DROP TABLE IF EXISTS `baseall` """
-    sql """ DROP TABLE IF EXISTS `test` """
-    sql """
-        CREATE TABLE IF NOT EXISTS `baseall` (
-            `k0` boolean null comment "",
-            `k1` tinyint(4) null comment "",
-            `k2` smallint(6) null comment "",
-            `k3` int(11) null comment "",
-            `k4` bigint(20) null comment "",
-            `k5` decimal(9, 3) null comment "",
-            `k6` char(5) null comment "",
-            `k10` date null comment "",
-            `k11` datetime null comment "",
-            `k7` varchar(20) null comment "",
-            `k8` double max null comment "",
-            `k9` float sum null comment "",
-            `k12` string replace null comment "",
-            `k13` largeint(40) replace null comment ""
-        ) engine=olap
-        DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties("replication_num" = "1")
-    """
-    sql """
-        CREATE TABLE IF NOT EXISTS `test` (
-            `k0` boolean null comment "",
-            `k1` tinyint(4) null comment "",
-            `k2` smallint(6) null comment "",
-            `k3` int(11) null comment "",
-            `k4` bigint(20) null comment "",
-            `k5` decimal(9, 3) null comment "",
-            `k6` char(5) null comment "",
-            `k10` date null comment "",
-            `k11` datetime null comment "",
-            `k7` varchar(20) null comment "",
-            `k8` double max null comment "",
-            `k9` float sum null comment "",
-            `k12` string replace_if_not_null null comment "",
-            `k13` largeint(40) replace null comment ""
-        ) engine=olap
-        DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties("replication_num" = "1")
-    """
+    if (!isCloudMode()) {
+        sql """ set enable_memtable_on_sink_node=true """
+        sql """ DROP TABLE IF EXISTS `baseall` """
+        sql """ DROP TABLE IF EXISTS `test` """
+        sql """
+            CREATE TABLE IF NOT EXISTS `baseall` (
+                `k0` boolean null comment "",
+                `k1` tinyint(4) null comment "",
+                `k2` smallint(6) null comment "",
+                `k3` int(11) null comment "",
+                `k4` bigint(20) null comment "",
+                `k5` decimal(9, 3) null comment "",
+                `k6` char(5) null comment "",
+                `k10` date null comment "",
+                `k11` datetime null comment "",
+                `k7` varchar(20) null comment "",
+                `k8` double max null comment "",
+                `k9` float sum null comment "",
+                `k12` string replace null comment "",
+                `k13` largeint(40) replace null comment ""
+            ) engine=olap
+            DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties("replication_num" = "1")
+        """
+        sql """
+            CREATE TABLE IF NOT EXISTS `test` (
+                `k0` boolean null comment "",
+                `k1` tinyint(4) null comment "",
+                `k2` smallint(6) null comment "",
+                `k3` int(11) null comment "",
+                `k4` bigint(20) null comment "",
+                `k5` decimal(9, 3) null comment "",
+                `k6` char(5) null comment "",
+                `k10` date null comment "",
+                `k11` datetime null comment "",
+                `k7` varchar(20) null comment "",
+                `k8` double max null comment "",
+                `k9` float sum null comment "",
+                `k12` string replace_if_not_null null comment "",
+                `k13` largeint(40) replace null comment ""
+            ) engine=olap
+            DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties("replication_num" = "1")
+        """

-    GetDebugPoint().clearDebugPointsForAllBEs()
-    streamLoad {
-        table "baseall"
-        db "regression_test_fault_injection_p0"
-        set 'column_separator', ','
-        file "baseall.txt"
-    }
+        GetDebugPoint().clearDebugPointsForAllBEs()
+        streamLoad {
+            table "baseall"
+            db "regression_test_fault_injection_p0"
+            set 'column_separator', ','
+            file "baseall.txt"
+        }

-    try {
-        GetDebugPoint().enableDebugPointForAllBEs("VTabletWriterV2.close.cancel")
-        GetDebugPoint().enableDebugPointForAllBEs("LoadStreamStub::close_wait.long_wait")
-        def res = sql "insert into test select * from baseall where k1 <= 3"
-        logger.info(res.toString())
-        assertTrue(false, "Expected Exception to be thrown")
-    } catch(Exception e) {
-        logger.info(e.getMessage())
-        assertTrue(e.getMessage().contains("cancel"))
-    } finally {
-        GetDebugPoint().disableDebugPointForAllBEs("VTabletWriterV2.close.cancel")
-        GetDebugPoint().disableDebugPointForAllBEs("LoadStreamStub::close_wait.long_wait")
-    }
+        try {
+            GetDebugPoint().enableDebugPointForAllBEs("VTabletWriterV2.close.cancel")
+            GetDebugPoint().enableDebugPointForAllBEs("LoadStreamStub::close_wait.long_wait")
+            def res = sql "insert into test select * from baseall where k1 <= 3"
+            logger.info(res.toString())
+            assertTrue(false, "Expected Exception to be thrown")
+        } catch(Exception e) {
+            logger.info(e.getMessage())
+            assertTrue(e.getMessage().contains("cancel"))
+        } finally {
+            GetDebugPoint().disableDebugPointForAllBEs("VTabletWriterV2.close.cancel")
+            GetDebugPoint().disableDebugPointForAllBEs("LoadStreamStub::close_wait.long_wait")
+        }

-    sql """ DROP TABLE IF EXISTS `baseall` """
-    sql """ DROP TABLE IF EXISTS `test` """
-    sql """ set enable_memtable_on_sink_node=false """
+        sql """ DROP TABLE IF EXISTS `baseall` """
+        sql """ DROP TABLE IF EXISTS `test` """
+        sql """ set enable_memtable_on_sink_node=false """
+    }
 }
diff --git a/regression-test/suites/fault_injection_p0/test_load_stream_stub_failure_injection.groovy b/regression-test/suites/fault_injection_p0/test_load_stream_stub_failure_injection.groovy
index 48c32883302e40..7f6172de64c06c 100644
--- a/regression-test/suites/fault_injection_p0/test_load_stream_stub_failure_injection.groovy
+++ b/regression-test/suites/fault_injection_p0/test_load_stream_stub_failure_injection.groovy
@@ -19,77 +19,79 @@ import org.codehaus.groovy.runtime.IOGroovyMethods
 import org.apache.doris.regression.util.Http

 suite("test_stream_stub_fault_injection", "nonConcurrent") {
-    sql """ set enable_memtable_on_sink_node=true """
-    sql """ DROP TABLE IF EXISTS `baseall` """
-    sql """ DROP TABLE IF EXISTS `test` """
-    sql """
-        CREATE TABLE IF NOT EXISTS `baseall` (
-            `k0` boolean null comment "",
-            `k1` tinyint(4) null comment "",
-            `k2` smallint(6) null comment "",
-            `k3` int(11) null comment "",
-            `k4` bigint(20) null comment "",
-            `k5` decimal(9, 3) null comment "",
-            `k6` char(5) null comment "",
-            `k10` date null comment "",
-            `k11` datetime null comment "",
-            `k7` varchar(20) null comment "",
-            `k8` double max null comment "",
-            `k9` float sum null comment "",
-            `k12` string replace null comment "",
-            `k13` largeint(40) replace null comment ""
-        ) engine=olap
-        DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties("replication_num" = "1")
-    """
-    sql """
-        CREATE TABLE IF NOT EXISTS `test` (
-            `k0` boolean null comment "",
-            `k1` tinyint(4) null comment "",
-            `k2` smallint(6) null comment "",
-            `k3` int(11) null comment "",
-            `k4` bigint(20) null comment "",
-            `k5` decimal(9, 3) null comment "",
-            `k6` char(5) null comment "",
-            `k10` date null comment "",
-            `k11` datetime null comment "",
-            `k7` varchar(20) null comment "",
-            `k8` double max null comment "",
-            `k9` float sum null comment "",
-            `k12` string replace_if_not_null null comment "",
-            `k13` largeint(40) replace null comment ""
-        ) engine=olap
-        DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties("replication_num" = "1")
-    """
+    if (!isCloudMode()) {
+        sql """ set enable_memtable_on_sink_node=true """
+        sql """ DROP TABLE IF EXISTS `baseall` """
+        sql """ DROP TABLE IF EXISTS `test` """
+        sql """
+            CREATE TABLE IF NOT EXISTS `baseall` (
+                `k0` boolean null comment "",
+                `k1` tinyint(4) null comment "",
+                `k2` smallint(6) null comment "",
+                `k3` int(11) null comment "",
+                `k4` bigint(20) null comment "",
+                `k5` decimal(9, 3) null comment "",
+                `k6` char(5) null comment "",
+                `k10` date null comment "",
+                `k11` datetime null comment "",
+                `k7` varchar(20) null comment "",
+                `k8` double max null comment "",
+                `k9` float sum null comment "",
+                `k12` string replace null comment "",
+                `k13` largeint(40) replace null comment ""
+            ) engine=olap
+            DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties("replication_num" = "1")
+        """
+        sql """
+            CREATE TABLE IF NOT EXISTS `test` (
+                `k0` boolean null comment "",
+                `k1` tinyint(4) null comment "",
+                `k2` smallint(6) null comment "",
+                `k3` int(11) null comment "",
+                `k4` bigint(20) null comment "",
+                `k5` decimal(9, 3) null comment "",
+                `k6` char(5) null comment "",
+                `k10` date null comment "",
+                `k11` datetime null comment "",
+                `k7` varchar(20) null comment "",
+                `k8` double max null comment "",
+                `k9` float sum null comment "",
+                `k12` string replace_if_not_null null comment "",
+                `k13` largeint(40) replace null comment ""
+            ) engine=olap
+            DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties("replication_num" = "1")
+        """

-    GetDebugPoint().clearDebugPointsForAllBEs()
-    streamLoad {
-        table "baseall"
-        db "regression_test_fault_injection_p0"
-        set 'column_separator', ','
-        file "baseall.txt"
-    }
+        GetDebugPoint().clearDebugPointsForAllBEs()
+        streamLoad {
+            table "baseall"
+            db "regression_test_fault_injection_p0"
+            set 'column_separator', ','
+            file "baseall.txt"
+        }

-    def load_with_injection = { injection, error_msg, success=false->
-        try {
-            GetDebugPoint().enableDebugPointForAllBEs(injection)
-            sql "insert into test select * from baseall where k1 <= 3"
-            assertTrue(success, String.format("Expected Exception '%s', actual success", expect_errmsg))
-        } catch(Exception e) {
-            logger.info(e.getMessage())
-            assertTrue(e.getMessage().contains(error_msg))
-        } finally {
-            GetDebugPoint().disableDebugPointForAllBEs(injection)
+        def load_with_injection = { injection, error_msg, success=false->
+            try {
+                GetDebugPoint().enableDebugPointForAllBEs(injection)
+                sql "insert into test select * from baseall where k1 <= 3"
+                assertTrue(success, String.format("Expected Exception '%s', actual success", error_msg))
+            } catch(Exception e) {
+                logger.info(e.getMessage())
+                assertTrue(e.getMessage().contains(error_msg))
+            } finally {
+                GetDebugPoint().disableDebugPointForAllBEs(injection)
+            }
         }
-    }

-    // StreamSinkFileWriter appendv write segment failed all replica
-    load_with_injection("StreamSinkFileWriter.appendv.write_segment_failed_all_replica", "failed to send segment data to any replicas")
-    // StreamSinkFileWriter finalize failed
-    load_with_injection("StreamSinkFileWriter.finalize.finalize_failed", "failed to send segment eos to any replicas")
-    // LoadStreams stream wait failed
-    load_with_injection("LoadStreamStub._send_with_retry.stream_write_failed", "StreamWrite failed, err=32")
+        // StreamSinkFileWriter appendv write segment failed all replica
+        load_with_injection("StreamSinkFileWriter.appendv.write_segment_failed_all_replica", "failed to send segment data to any replicas")
+        // StreamSinkFileWriter finalize failed
+        load_with_injection("StreamSinkFileWriter.finalize.finalize_failed", "failed to send segment eos to any replicas")
+        // LoadStreams stream wait failed
+        load_with_injection("LoadStreamStub._send_with_retry.stream_write_failed", "StreamWrite failed, err=32")

-    sql """ DROP TABLE IF EXISTS `baseall` """
-    sql """ DROP TABLE IF EXISTS `test` """
-    sql """ set enable_memtable_on_sink_node=false """
+        sql """ DROP TABLE IF EXISTS `baseall` """
+        sql """ DROP TABLE IF EXISTS `test` """
+        sql """ set enable_memtable_on_sink_node=false """
+    }
 }
diff --git a/regression-test/suites/fault_injection_p0/test_multi_replica_fault_injection.groovy b/regression-test/suites/fault_injection_p0/test_multi_replica_fault_injection.groovy
index 4e235daf97c20c..4c01513c612d45 100644
--- a/regression-test/suites/fault_injection_p0/test_multi_replica_fault_injection.groovy
+++ b/regression-test/suites/fault_injection_p0/test_multi_replica_fault_injection.groovy
@@ -26,7 +26,7 @@ suite("test_multi_replica_fault_injection", "nonConcurrent") {
             beNums++;
             logger.info(item.toString())
         }
-        if (beNums >= 3){
+        if (!isCloudMode() && beNums >= 3){
             sql """ set enable_memtable_on_sink_node=true """
             sql """
                 CREATE TABLE IF NOT EXISTS `baseall` (
diff --git a/regression-test/suites/fault_injection_p0/test_writer_v2_fault_injection.groovy b/regression-test/suites/fault_injection_p0/test_writer_v2_fault_injection.groovy
index 7e71de173e96f9..30854cfb50b67f 100644
--- a/regression-test/suites/fault_injection_p0/test_writer_v2_fault_injection.groovy
+++ b/regression-test/suites/fault_injection_p0/test_writer_v2_fault_injection.groovy
@@ -19,89 +19,91 @@ import org.codehaus.groovy.runtime.IOGroovyMethods
 import org.apache.doris.regression.util.Http

 suite("test_writer_v2_fault_injection", "nonConcurrent") {
-    sql """ set enable_memtable_on_sink_node=true """
-    sql """
-        CREATE TABLE IF NOT EXISTS `baseall` (
-            `k0` boolean null comment "",
-            `k1` tinyint(4) null comment "",
-            `k2` smallint(6) null comment "",
-            `k3` int(11) null comment "",
-            `k4` bigint(20) null comment "",
-            `k5` decimal(9, 3) null comment "",
-            `k6` char(5) null comment "",
-            `k10` date null comment "",
-            `k11` datetime null comment "",
-            `k7` varchar(20) null comment "",
-            `k8` double max null comment "",
-            `k9` float sum null comment "",
-            `k12` string replace null comment "",
-            `k13` largeint(40) replace null comment ""
-        ) engine=olap
-        DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties("replication_num" = "1")
-    """
-    sql """
-        CREATE TABLE IF NOT EXISTS `test` (
-            `k0` boolean null comment "",
-            `k1` tinyint(4) null comment "",
-            `k2` smallint(6) null comment "",
-            `k3` int(11) null comment "",
-            `k4` bigint(20) null comment "",
-            `k5` decimal(9, 3) null comment "",
-            `k6` char(5) null comment "",
-            `k10` date null comment "",
-            `k11` datetime null comment "",
-            `k7` varchar(20) null comment "",
-            `k8` double max null comment "",
-            `k9` float sum null comment "",
-            `k12` string replace_if_not_null null comment "",
-            `k13` largeint(40) replace null comment ""
-        ) engine=olap
-        DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties("replication_num" = "1")
-    """
+    if (!isCloudMode()) {
+        sql """ set enable_memtable_on_sink_node=true """
+        sql """
+            CREATE TABLE IF NOT EXISTS `baseall` (
+                `k0` boolean null comment "",
+                `k1` tinyint(4) null comment "",
+                `k2` smallint(6) null comment "",
+                `k3` int(11) null comment "",
+                `k4` bigint(20) null comment "",
+                `k5` decimal(9, 3) null comment "",
+                `k6` char(5) null comment "",
+                `k10` date null comment "",
+                `k11` datetime null comment "",
+                `k7` varchar(20) null comment "",
+                `k8` double max null comment "",
+                `k9` float sum null comment "",
+                `k12` string replace null comment "",
+                `k13` largeint(40) replace null comment ""
+            ) engine=olap
+            DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties("replication_num" = "1")
+        """
+        sql """
+            CREATE TABLE IF NOT EXISTS `test` (
+                `k0` boolean null comment "",
+                `k1` tinyint(4) null comment "",
+                `k2` smallint(6) null comment "",
+                `k3` int(11) null comment "",
+                `k4` bigint(20) null comment "",
+                `k5` decimal(9, 3) null comment "",
+                `k6` char(5) null comment "",
+                `k10` date null comment "",
+                `k11` datetime null comment "",
+                `k7` varchar(20) null comment "",
+                `k8` double max null comment "",
+                `k9` float sum null comment "",
+                `k12` string replace_if_not_null null comment "",
+                `k13` largeint(40) replace null comment ""
+            ) engine=olap
+            DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties("replication_num" = "1")
+        """

-    GetDebugPoint().clearDebugPointsForAllBEs()
-    streamLoad {
-        table "baseall"
-        db "regression_test_fault_injection_p0"
-        set 'column_separator', ','
-        file "baseall.txt"
-    }
+        GetDebugPoint().clearDebugPointsForAllBEs()
+        streamLoad {
+            table "baseall"
+            db "regression_test_fault_injection_p0"
+            set 'column_separator', ','
+            file "baseall.txt"
+        }

-    def load_with_injection = { injection, error_msg, success=false->
-        try {
-            GetDebugPoint().enableDebugPointForAllBEs(injection)
-            sql "insert into test select * from baseall where k1 <= 3"
-            assertTrue(success, String.format("expected Exception '%s', actual success", error_msg))
-        } catch(Exception e) {
-            logger.info(e.getMessage())
-            assertTrue(e.getMessage().contains(error_msg),
-                    String.format("expected '%s', actual '%s'", error_msg, e.getMessage()))
-        } finally {
-            sleep 1000 // wait some time for instance finish before disable injection
-            GetDebugPoint().disableDebugPointForAllBEs(injection)
+        def load_with_injection = { injection, error_msg, success=false->
+            try {
+                GetDebugPoint().enableDebugPointForAllBEs(injection)
+                sql "insert into test select * from baseall where k1 <= 3"
+                assertTrue(success, String.format("expected Exception '%s', actual success", error_msg))
+            } catch(Exception e) {
+                logger.info(e.getMessage())
+                assertTrue(e.getMessage().contains(error_msg),
+                        String.format("expected '%s', actual '%s'", error_msg, e.getMessage()))
+            } finally {
+                sleep 1000 // wait some time for instance finish before disable injection
+                GetDebugPoint().disableDebugPointForAllBEs(injection)
+            }
        }
-    }

-    // VTabletWriterV2 _output_tuple_desc is null
-    load_with_injection("VTabletWriterV2._init._output_tuple_desc_null", "unknown destination tuple descriptor")
-    // VTabletWriterV2 _vec_output_expr_ctxs not equal _output_tuple_slot
-    load_with_injection("VTabletWriterV2._init._vec_output_expr_ctxs_not_equal_output_tuple_slot", "should be equal to output_expr_num")
-    // VTabletWriterV2 node_info is null
-    load_with_injection("VTabletWriterV2._open_streams_to_backend.node_info_null", "failed to open streams to any BE")
-    // VTabletWriterV2 do not get tablet schema on open_streams
-    load_with_injection("VTabletWriterV2._open_streams_to_backend.no_schema_when_open_streams", "success", true)
-    // VTabletWriterV2 tablet_location is null
-    load_with_injection("VTabletWriterV2._build_tablet_node_mapping.tablet_location_null", "unknown tablet location")
-    // VTabletWriterV2 location is null
-    load_with_injection("VTabletWriterV2._select_streams.location_null", "failed to open DeltaWriter")
-    // VTabletWriterV2 index not found
-    load_with_injection("VTabletWriterV2._write_memtable.index_not_found", "failed to open DeltaWriter")
-    // VTabletWriterV2 cancel
-    load_with_injection("VTabletWriterV2.close.cancel", "load cancel")
-    // VTabletWriterV2 load timeout before close_wait
-    load_with_injection("VTabletWriterV2._close_wait.load_timeout", "load timed out before close waiting")
-    // DeltaWriterV2 stream_size is 0
-    load_with_injection("DeltaWriterV2.init.stream_size", "failed to find tablet schema")
+        // VTabletWriterV2 _output_tuple_desc is null
+        load_with_injection("VTabletWriterV2._init._output_tuple_desc_null", "unknown destination tuple descriptor")
+        // VTabletWriterV2 _vec_output_expr_ctxs not equal _output_tuple_slot
+        load_with_injection("VTabletWriterV2._init._vec_output_expr_ctxs_not_equal_output_tuple_slot", "should be equal to output_expr_num")
+        // VTabletWriterV2 node_info is null
+        load_with_injection("VTabletWriterV2._open_streams_to_backend.node_info_null", "failed to open streams to any BE")
+        // VTabletWriterV2 do not get tablet schema on open_streams
+        load_with_injection("VTabletWriterV2._open_streams_to_backend.no_schema_when_open_streams", "success", true)
+        // VTabletWriterV2 tablet_location is null
+        load_with_injection("VTabletWriterV2._build_tablet_node_mapping.tablet_location_null", "unknown tablet location")
+        // VTabletWriterV2 location is null
+        load_with_injection("VTabletWriterV2._select_streams.location_null", "failed to open DeltaWriter")
+        // VTabletWriterV2 index not found
+        load_with_injection("VTabletWriterV2._write_memtable.index_not_found", "failed to open DeltaWriter")
+        // VTabletWriterV2 cancel
+        load_with_injection("VTabletWriterV2.close.cancel", "load cancel")
+        // VTabletWriterV2 load timeout before close_wait
+        load_with_injection("VTabletWriterV2._close_wait.load_timeout", "load timed out before close waiting")
+        // DeltaWriterV2 stream_size is 0
+        load_with_injection("DeltaWriterV2.init.stream_size", "failed to find tablet schema")

-    sql """ set enable_memtable_on_sink_node=false """
+        sql """ set enable_memtable_on_sink_node=false """
+    }
 }