diff --git a/cloud/src/recycler/hdfs_accessor.cpp b/cloud/src/recycler/hdfs_accessor.cpp index 1999bcfa16543a..024acd0efe7852 100644 --- a/cloud/src/recycler/hdfs_accessor.cpp +++ b/cloud/src/recycler/hdfs_accessor.cpp @@ -356,6 +356,7 @@ std::string extract_parent_path(const std::string& path) { } int HdfsAccessor::init() { + TEST_SYNC_POINT_RETURN_WITH_VALUE("HdfsAccessor::init.hdfs_init_failed", (int)-1); // TODO(plat1ko): Cache hdfsFS fs_ = HDFSBuilder::create_fs(info_.build_conf()); if (!fs_) { diff --git a/cloud/src/recycler/recycler.cpp b/cloud/src/recycler/recycler.cpp index c7c9d8e0a02fe4..d8bbfa15fc01e1 100644 --- a/cloud/src/recycler/recycler.cpp +++ b/cloud/src/recycler/recycler.cpp @@ -1464,7 +1464,8 @@ int InstanceRecycler::delete_rowset_data(const doris::RowsetMetaCloudPB& rs_meta return accessor->delete_files(file_paths); } -int InstanceRecycler::delete_rowset_data(const std::vector& rowsets) { +int InstanceRecycler::delete_rowset_data(const std::vector& rowsets, + RowsetRecyclingState type) { int ret = 0; // resource_id -> file_paths std::map> resource_file_paths; @@ -1472,7 +1473,9 @@ int InstanceRecycler::delete_rowset_data(const std::vector> rowsets_delete_by_prefix; for (const auto& rs : rowsets) { - { + // we have to treat tmp rowset as "orphans" that may not related to any existing tablets + // due to aborted schema change. + if (type == RowsetRecyclingState::FORMAL_ROWSET) { std::lock_guard lock(recycled_tablets_mtx_); if (recycled_tablets_.count(rs.tablet_id())) { continue; // Rowset data has already been deleted @@ -1499,7 +1502,7 @@ int InstanceRecycler::delete_rowset_data(const std::vector> index_ids; // default format as v1. InvertedIndexStorageFormatPB index_format = InvertedIndexStorageFormatPB::V1; - + int inverted_index_get_ret = 0; if (rs.has_tablet_schema()) { for (const auto& index : rs.tablet_schema().index()) { if (index.has_index_type() && index.index_type() == IndexType::INVERTED) { @@ -1519,12 +1522,12 @@ int InstanceRecycler::delete_rowset_data(const std::vectorget(rs.index_id(), rs.schema_version(), index_info); - if (get_ret == 0) { + if (inverted_index_get_ret == 0) { index_format = index_info.first; index_ids = index_info.second; - } else if (get_ret == 1) { + } else if (inverted_index_get_ret == 1) { // 1. Schema kv not found means tablet has been recycled // Maybe some tablet recycle failed by some bugs // We need to delete again to double check @@ -1562,7 +1565,10 @@ int InstanceRecycler::delete_rowset_data(const std::vectorsubmit([&, rowset_keys_to_delete = std::move(rowset_keys_to_delete), rowsets_to_delete = std::move(rowsets_to_delete)]() { - if (delete_rowset_data(rowsets_to_delete) != 0) { + if (delete_rowset_data(rowsets_to_delete, RowsetRecyclingState::FORMAL_ROWSET) != 0) { LOG(WARNING) << "failed to delete rowset data, instance_id=" << instance_id_; return; } @@ -2225,7 +2231,7 @@ int InstanceRecycler::recycle_tmp_rowsets() { tmp_rowset_keys.clear(); tmp_rowsets.clear(); }); - if (delete_rowset_data(tmp_rowsets) != 0) { + if (delete_rowset_data(tmp_rowsets, RowsetRecyclingState::TMP_ROWSET) != 0) { LOG(WARNING) << "failed to delete tmp rowset data, instance_id=" << instance_id_; return -1; } diff --git a/cloud/src/recycler/recycler.h b/cloud/src/recycler/recycler.h index cf23dcacd2fdca..84e4075e61be8d 100644 --- a/cloud/src/recycler/recycler.h +++ b/cloud/src/recycler/recycler.h @@ -110,6 +110,11 @@ class Recycler { std::shared_ptr txn_lazy_committer_; }; +enum class RowsetRecyclingState { + FORMAL_ROWSET, + TMP_ROWSET, +}; + class InstanceRecycler { public: explicit InstanceRecycler(std::shared_ptr txn_kv, const InstanceInfoPB& instance, @@ -222,7 +227,8 @@ class InstanceRecycler { const std::string& rowset_id); // return 0 for success otherwise error - int delete_rowset_data(const std::vector& rowsets); + int delete_rowset_data(const std::vector& rowsets, + RowsetRecyclingState type); /** * Get stage storage info from instance and init StorageVaultAccessor diff --git a/cloud/test/recycler_test.cpp b/cloud/test/recycler_test.cpp index 567d27f5d6f3c4..f47c50023b8219 100644 --- a/cloud/test/recycler_test.cpp +++ b/cloud/test/recycler_test.cpp @@ -1129,6 +1129,7 @@ TEST(RecyclerTest, recycle_indexes) { j & 1); auto tmp_rowset = create_rowset("recycle_tmp_rowsets", tablet_id, index_id, 5, schemas[j % 5], txn_id_base + j); + tmp_rowset.set_resource_id("recycle_indexes"); create_tmp_rowset(txn_kv.get(), accessor.get(), tmp_rowset, j & 1); } for (int j = 0; j < 10; ++j) { @@ -3132,7 +3133,7 @@ TEST(RecyclerTest, delete_rowset_data) { rowset_pbs.emplace_back(std::move(rowset)); } - ASSERT_EQ(0, recycler.delete_rowset_data(rowset_pbs)); + ASSERT_EQ(0, recycler.delete_rowset_data(rowset_pbs, RowsetRecyclingState::FORMAL_ROWSET)); std::unique_ptr list_iter; ASSERT_EQ(0, accessor->list_all(&list_iter)); ASSERT_FALSE(list_iter->has_next()); @@ -3237,7 +3238,7 @@ TEST(RecyclerTest, delete_rowset_data_without_inverted_index_storage_format) { rowset_pbs.emplace_back(std::move(rowset)); } - ASSERT_EQ(0, recycler.delete_rowset_data(rowset_pbs)); + ASSERT_EQ(0, recycler.delete_rowset_data(rowset_pbs, RowsetRecyclingState::FORMAL_ROWSET)); std::unique_ptr list_iter; ASSERT_EQ(0, accessor->list_all(&list_iter)); ASSERT_FALSE(list_iter->has_next()); @@ -3352,6 +3353,11 @@ TEST(RecyclerTest, init_vault_accessor_failed_test) { rs = resp->add_rowset_meta(); rs->set_resource_id("success_vault"); }); + sp->set_call_back("HdfsAccessor::init.hdfs_init_failed", [](auto&& args) { + auto* ret = try_any_cast_ret(args); + ret->first = -1; + ret->second = true; + }); sp->enable_processing(); // succeed to init MockAccessor