From aa9653228dd6a4ca2bff2bc0f5c8569dabd47841 Mon Sep 17 00:00:00 2001 From: Yukang-Lian Date: Wed, 29 Jan 2025 23:38:03 +0800 Subject: [PATCH 1/4] 1 --- cloud/src/recycler/recycler.cpp | 25 +++++++++++++++---------- cloud/src/recycler/recycler.h | 3 ++- 2 files changed, 17 insertions(+), 11 deletions(-) diff --git a/cloud/src/recycler/recycler.cpp b/cloud/src/recycler/recycler.cpp index c7c9d8e0a02fe4..9e1d0c3108e152 100644 --- a/cloud/src/recycler/recycler.cpp +++ b/cloud/src/recycler/recycler.cpp @@ -1464,7 +1464,8 @@ int InstanceRecycler::delete_rowset_data(const doris::RowsetMetaCloudPB& rs_meta return accessor->delete_files(file_paths); } -int InstanceRecycler::delete_rowset_data(const std::vector& rowsets) { +int InstanceRecycler::delete_rowset_data(const std::vector& rowsets, + bool is_recycle_tmp_rowset) { int ret = 0; // resource_id -> file_paths std::map> resource_file_paths; @@ -1472,10 +1473,14 @@ int InstanceRecycler::delete_rowset_data(const std::vector> rowsets_delete_by_prefix; for (const auto& rs : rowsets) { - { - std::lock_guard lock(recycled_tablets_mtx_); - if (recycled_tablets_.count(rs.tablet_id())) { - continue; // Rowset data has already been deleted + // Tmp rowsets may not be recycled in recycle_tablet correctly, + // here we can not skip recycling them + if (!is_recycle_tmp_rowset) { + { + std::lock_guard lock(recycled_tablets_mtx_); + if (recycled_tablets_.count(rs.tablet_id())) { + continue; // Rowset data has already been deleted + } } } @@ -1499,7 +1504,7 @@ int InstanceRecycler::delete_rowset_data(const std::vector> index_ids; // default format as v1. InvertedIndexStorageFormatPB index_format = InvertedIndexStorageFormatPB::V1; - + int get_ret = 0; if (rs.has_tablet_schema()) { for (const auto& index : rs.tablet_schema().index()) { if (index.has_index_type() && index.index_type() == IndexType::INVERTED) { @@ -1519,8 +1524,7 @@ int InstanceRecycler::delete_rowset_data(const std::vectorget(rs.index_id(), rs.schema_version(), index_info); + get_ret = inverted_index_id_cache_->get(rs.index_id(), rs.schema_version(), index_info); if (get_ret == 0) { index_format = index_info.first; index_ids = index_info.second; @@ -1562,7 +1566,8 @@ int InstanceRecycler::delete_rowset_data(const std::vector> index_ids; // default format as v1. InvertedIndexStorageFormatPB index_format = InvertedIndexStorageFormatPB::V1; - int get_ret = 0; + int inverted_index_get_ret = 0; if (rs.has_tablet_schema()) { for (const auto& index : rs.tablet_schema().index()) { if (index.has_index_type() && index.index_type() == IndexType::INVERTED) { @@ -1524,11 +1522,12 @@ int InstanceRecycler::delete_rowset_data(const std::vectorget(rs.index_id(), rs.schema_version(), index_info); - if (get_ret == 0) { + inverted_index_get_ret = + inverted_index_id_cache_->get(rs.index_id(), rs.schema_version(), index_info); + if (inverted_index_get_ret == 0) { index_format = index_info.first; index_ids = index_info.second; - } else if (get_ret == 1) { + } else if (inverted_index_get_ret == 1) { // 1. Schema kv not found means tablet has been recycled // Maybe some tablet recycle failed by some bugs // We need to delete again to double check @@ -1566,8 +1565,10 @@ int InstanceRecycler::delete_rowset_data(const std::vector& rowsets, - bool is_recycle_tmp_rowset = false); + RowsetRecyclingState type); /** * Get stage storage info from instance and init StorageVaultAccessor From 114b0b95dc17ff1ac5b2c114289b91ea6720990f Mon Sep 17 00:00:00 2001 From: Yukang-Lian Date: Fri, 31 Jan 2025 22:24:47 +0800 Subject: [PATCH 3/4] 3 --- cloud/src/recycler/recycler.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cloud/src/recycler/recycler.cpp b/cloud/src/recycler/recycler.cpp index c1c6fc34c8cde2..d8bbfa15fc01e1 100644 --- a/cloud/src/recycler/recycler.cpp +++ b/cloud/src/recycler/recycler.cpp @@ -2034,7 +2034,7 @@ int InstanceRecycler::recycle_rowsets() { rowsets_to_delete.swap(rowsets); worker_pool->submit([&, rowset_keys_to_delete = std::move(rowset_keys_to_delete), rowsets_to_delete = std::move(rowsets_to_delete)]() { - if (delete_rowset_data(rowsets_to_delete) != 0) { + if (delete_rowset_data(rowsets_to_delete, RowsetRecyclingState::FORMAL_ROWSET) != 0) { LOG(WARNING) << "failed to delete rowset data, instance_id=" << instance_id_; return; } @@ -2231,7 +2231,7 @@ int InstanceRecycler::recycle_tmp_rowsets() { tmp_rowset_keys.clear(); tmp_rowsets.clear(); }); - if (delete_rowset_data(tmp_rowsets, true) != 0) { + if (delete_rowset_data(tmp_rowsets, RowsetRecyclingState::TMP_ROWSET) != 0) { LOG(WARNING) << "failed to delete tmp rowset data, instance_id=" << instance_id_; return -1; } From 49bc7516b642d61956f25dbcbbd4d7398ae46107 Mon Sep 17 00:00:00 2001 From: Yukang-Lian Date: Sat, 1 Feb 2025 11:49:37 +0800 Subject: [PATCH 4/4] 4 --- cloud/src/recycler/hdfs_accessor.cpp | 1 + cloud/src/recycler/recycler.h | 9 +++++---- cloud/test/recycler_test.cpp | 10 ++++++++-- 3 files changed, 14 insertions(+), 6 deletions(-) diff --git a/cloud/src/recycler/hdfs_accessor.cpp b/cloud/src/recycler/hdfs_accessor.cpp index 1999bcfa16543a..024acd0efe7852 100644 --- a/cloud/src/recycler/hdfs_accessor.cpp +++ b/cloud/src/recycler/hdfs_accessor.cpp @@ -356,6 +356,7 @@ std::string extract_parent_path(const std::string& path) { } int HdfsAccessor::init() { + TEST_SYNC_POINT_RETURN_WITH_VALUE("HdfsAccessor::init.hdfs_init_failed", (int)-1); // TODO(plat1ko): Cache hdfsFS fs_ = HDFSBuilder::create_fs(info_.build_conf()); if (!fs_) { diff --git a/cloud/src/recycler/recycler.h b/cloud/src/recycler/recycler.h index 1c5c57179155ee..84e4075e61be8d 100644 --- a/cloud/src/recycler/recycler.h +++ b/cloud/src/recycler/recycler.h @@ -110,6 +110,11 @@ class Recycler { std::shared_ptr txn_lazy_committer_; }; +enum class RowsetRecyclingState { + FORMAL_ROWSET, + TMP_ROWSET, +}; + class InstanceRecycler { public: explicit InstanceRecycler(std::shared_ptr txn_kv, const InstanceInfoPB& instance, @@ -221,10 +226,6 @@ class InstanceRecycler { int delete_rowset_data(const std::string& resource_id, int64_t tablet_id, const std::string& rowset_id); - enum class RowsetRecyclingState { - FORMAL_ROWSET, - TMP_ROWSET, - }; // return 0 for success otherwise error int delete_rowset_data(const std::vector& rowsets, RowsetRecyclingState type); diff --git a/cloud/test/recycler_test.cpp b/cloud/test/recycler_test.cpp index 567d27f5d6f3c4..f47c50023b8219 100644 --- a/cloud/test/recycler_test.cpp +++ b/cloud/test/recycler_test.cpp @@ -1129,6 +1129,7 @@ TEST(RecyclerTest, recycle_indexes) { j & 1); auto tmp_rowset = create_rowset("recycle_tmp_rowsets", tablet_id, index_id, 5, schemas[j % 5], txn_id_base + j); + tmp_rowset.set_resource_id("recycle_indexes"); create_tmp_rowset(txn_kv.get(), accessor.get(), tmp_rowset, j & 1); } for (int j = 0; j < 10; ++j) { @@ -3132,7 +3133,7 @@ TEST(RecyclerTest, delete_rowset_data) { rowset_pbs.emplace_back(std::move(rowset)); } - ASSERT_EQ(0, recycler.delete_rowset_data(rowset_pbs)); + ASSERT_EQ(0, recycler.delete_rowset_data(rowset_pbs, RowsetRecyclingState::FORMAL_ROWSET)); std::unique_ptr list_iter; ASSERT_EQ(0, accessor->list_all(&list_iter)); ASSERT_FALSE(list_iter->has_next()); @@ -3237,7 +3238,7 @@ TEST(RecyclerTest, delete_rowset_data_without_inverted_index_storage_format) { rowset_pbs.emplace_back(std::move(rowset)); } - ASSERT_EQ(0, recycler.delete_rowset_data(rowset_pbs)); + ASSERT_EQ(0, recycler.delete_rowset_data(rowset_pbs, RowsetRecyclingState::FORMAL_ROWSET)); std::unique_ptr list_iter; ASSERT_EQ(0, accessor->list_all(&list_iter)); ASSERT_FALSE(list_iter->has_next()); @@ -3352,6 +3353,11 @@ TEST(RecyclerTest, init_vault_accessor_failed_test) { rs = resp->add_rowset_meta(); rs->set_resource_id("success_vault"); }); + sp->set_call_back("HdfsAccessor::init.hdfs_init_failed", [](auto&& args) { + auto* ret = try_any_cast_ret(args); + ret->first = -1; + ret->second = true; + }); sp->enable_processing(); // succeed to init MockAccessor