Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Enhancement](recycler) Add some UT for recycler #47739

Merged
merged 2 commits into from
Feb 12, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions cloud/src/recycler/recycler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -282,8 +282,8 @@ void Recycler::recycle_callback() {
recycling_instance_map_.erase(instance_id);
}
auto elpased_ms =
ctime_ms -
duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count() -
ctime_ms;
LOG_INFO("finish recycle instance")
.tag("instance_id", instance_id)
.tag("cost_ms", elpased_ms);
Expand Down Expand Up @@ -1595,6 +1595,8 @@ int InstanceRecycler::delete_rowset_data(const std::vector<doris::RowsetMetaClou
DCHECK(accessor_map_.count(*rid))
<< "uninitilized accessor, instance_id=" << instance_id_
<< " resource_id=" << resource_id << " path[0]=" << (*paths)[0];
TEST_SYNC_POINT_CALLBACK("InstanceRecycler::delete_rowset_data.no_resource_id",
&accessor_map_);
if (!accessor_map_.contains(*rid)) {
LOG_WARNING("delete rowset data accessor_map_ does not contains resouce id")
.tag("resource_id", resource_id)
Expand Down
2 changes: 2 additions & 0 deletions cloud/src/recycler/s3_accessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <aws/core/client/DefaultRetryStrategy.h>
#include <aws/s3/S3Client.h>
#include <bvar/reducer.h>
#include <cpp/sync_point.h>
#include <gen_cpp/cloud.pb.h>

#include <algorithm>
Expand Down Expand Up @@ -224,6 +225,7 @@ std::string S3Accessor::to_uri(const std::string& relative_path) const {
}

int S3Accessor::create(S3Conf conf, std::shared_ptr<S3Accessor>* accessor) {
TEST_SYNC_POINT_RETURN_WITH_VALUE("S3Accessor::init.s3_init_failed", (int)-1);
switch (conf.provider) {
case S3Conf::GCS:
*accessor = std::make_shared<GcsAccessor>(std::move(conf));
Expand Down
4 changes: 4 additions & 0 deletions cloud/test/hdfs_accessor_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,10 @@ TEST(HdfsAccessorTest, delete_prefix) {
put_and_verify("data/20000/1_0.dat");
put_and_verify("data111/10000/1_0.dat");

ret = accessor.delete_prefix("nonexist");
EXPECT_EQ(ret, -1);
ret = accessor.delete_prefix("/");
EXPECT_EQ(ret, -1);
ret = accessor.delete_prefix("data/10000/1_");
EXPECT_EQ(ret, 0);
ret = accessor.delete_prefix("data/10000/2_");
Expand Down
310 changes: 310 additions & 0 deletions cloud/test/recycler_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3406,6 +3406,216 @@ TEST(RecyclerTest, init_vault_accessor_failed_test) {
EXPECT_EQ(recycler.accessor_map_.at("success_vault")->exists("data/0/test.csv"), 1);
}

// Regression test: recycle_tablet() must fail (-1) when a rowset meta carries
// no resource id, and must not delete data belonging to the healthy vault.
TEST(RecyclerTest, recycle_tablet_without_resource_id) {
    auto* sp = SyncPoint::get_instance();
    // RAII guard: clear all sync-point callbacks/traces on scope exit so
    // later tests start from a clean slate.
    std::unique_ptr<int, std::function<void(int*)>> defer((int*)0x01, [&sp](int*) {
        sp->clear_all_call_backs();
        sp->clear_trace();
        sp->disable_processing();
    });

    auto txn_kv = std::make_shared<MemTxnKv>();
    EXPECT_EQ(txn_kv->init(), 0);
    std::unique_ptr<Transaction> txn;
    ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
    std::string key;
    std::string val;

    InstanceKeyInfo key_info {"test_instance"};
    instance_key(key_info, &key);
    InstanceInfoPB instance;
    instance.set_instance_id("GetObjStoreInfoTestInstance");

    auto accessor = std::make_shared<MockAccessor>();
    EXPECT_EQ(accessor->put_file("data/0/test.csv", ""), 0);
    // Substitute the MockAccessor for the vault named "test_success_hdfs_vault"
    // so InstanceRecycler::init() succeeds without a real HDFS connection.
    sp->set_call_back(
            "InstanceRecycler::init_storage_vault_accessors.mock_vault", [&accessor](auto&& args) {
                auto* map = try_any_cast<
                        std::unordered_map<std::string, std::shared_ptr<StorageVaultAccessor>>*>(
                        args[0]);
                auto* vault = try_any_cast<StorageVaultPB*>(args[1]);
                if (vault->name() == "test_success_hdfs_vault") {
                    map->emplace(vault->id(), accessor);
                }
            });
    // Inject a rowset meta WITHOUT a resource id — the condition under test.
    sp->set_call_back("InstanceRecycler::recycle_tablet.create_rowset_meta", [](auto&& args) {
        auto* resp = try_any_cast<GetRowsetResponse*>(args[0]);
        auto* rs = resp->add_rowset_meta();
        EXPECT_EQ(rs->has_resource_id(), false);
    });
    // Make any attempt to build a real HDFS accessor fail fast.
    sp->set_call_back("HdfsAccessor::init.hdfs_init_failed", [](auto&& args) {
        auto* ret = try_any_cast_ret<int>(args);
        ret->first = -1;
        ret->second = true;
    });
    sp->enable_processing();

    // Register an hdfs vault; its accessor is replaced by the MockAccessor
    // through the mock_vault sync point above.
    {
        HdfsBuildConf hdfs_build_conf;
        StorageVaultPB vault;
        hdfs_build_conf.set_fs_name("fs_name");
        hdfs_build_conf.set_user("root");
        HdfsVaultInfo hdfs_info;
        hdfs_info.set_prefix("root_path");
        hdfs_info.mutable_build_conf()->MergeFrom(hdfs_build_conf);
        vault.mutable_hdfs_info()->MergeFrom(hdfs_info);
        vault.set_name("test_success_hdfs_vault");
        vault.set_id("success_vault");
        instance.add_storage_vault_names(vault.name());
        instance.add_resource_ids(vault.id());
        instance.set_instance_id("GetObjStoreInfoTestInstance");
        txn->put(storage_vault_key({instance.instance_id(), "4"}), vault.SerializeAsString());
    }

    val = instance.SerializeAsString();
    txn->put(key, val);
    EXPECT_EQ(txn->commit(), TxnErrorCode::TXN_OK);

    EXPECT_EQ(accessor->exists("data/0/test.csv"), 0);

    InstanceRecycler recycler(txn_kv, instance, thread_group,
                              std::make_shared<TxnLazyCommitter>(txn_kv));
    EXPECT_EQ(recycler.init(), 0);
    EXPECT_EQ(recycler.accessor_map_.size(), 1);

    // The mock accessor was wired in successfully and still holds the file.
    EXPECT_EQ(recycler.accessor_map_.at("success_vault")->exists("data/0/test.csv"), 0);

    // recycle_tablet() fails because the injected rowset meta has no
    // resource id, so no accessor can be resolved for its data.
    EXPECT_EQ(recycler.recycle_tablet(0), -1);
    // Without a resource id nothing can be recycled — the vault's data survives.
    EXPECT_EQ(recycler.accessor_map_.at("success_vault")->exists("data/0/test.csv"), 0);
}

// Regression test: recycle_tablet() must fail (-1) when a rowset meta carries
// a resource id ("no_id") that matches no accessor in accessor_map_, and the
// healthy vault's data must be left untouched.
TEST(RecyclerTest, recycle_tablet_with_wrong_resource_id) {
    auto* sp = SyncPoint::get_instance();
    // RAII guard: clear all sync-point callbacks/traces on scope exit so
    // later tests start from a clean slate.
    std::unique_ptr<int, std::function<void(int*)>> defer((int*)0x01, [&sp](int*) {
        sp->clear_all_call_backs();
        sp->clear_trace();
        sp->disable_processing();
    });

    auto txn_kv = std::make_shared<MemTxnKv>();
    EXPECT_EQ(txn_kv->init(), 0);
    std::unique_ptr<Transaction> txn;
    ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
    std::string key;
    std::string val;

    InstanceKeyInfo key_info {"test_instance"};
    instance_key(key_info, &key);
    InstanceInfoPB instance;
    instance.set_instance_id("GetObjStoreInfoTestInstance");

    auto accessor = std::make_shared<MockAccessor>();
    EXPECT_EQ(accessor->put_file("data/0/test.csv", ""), 0);
    // Substitute the MockAccessor for the vault named "test_success_hdfs_vault"
    // so InstanceRecycler::init() succeeds without a real HDFS connection.
    sp->set_call_back(
            "InstanceRecycler::init_storage_vault_accessors.mock_vault", [&accessor](auto&& args) {
                auto* map = try_any_cast<
                        std::unordered_map<std::string, std::shared_ptr<StorageVaultAccessor>>*>(
                        args[0]);
                auto* vault = try_any_cast<StorageVaultPB*>(args[1]);
                if (vault->name() == "test_success_hdfs_vault") {
                    map->emplace(vault->id(), accessor);
                }
            });
    // Inject a rowset meta whose resource id "no_id" is NOT in accessor_map_
    // (the only registered id is "success_vault") — the condition under test.
    sp->set_call_back("InstanceRecycler::recycle_tablet.create_rowset_meta", [](auto&& args) {
        auto* resp = try_any_cast<GetRowsetResponse*>(args[0]);
        auto* rs = resp->add_rowset_meta();
        rs->set_resource_id("no_id");
    });
    // Make any attempt to build a real HDFS accessor fail fast.
    sp->set_call_back("HdfsAccessor::init.hdfs_init_failed", [](auto&& args) {
        auto* ret = try_any_cast_ret<int>(args);
        ret->first = -1;
        ret->second = true;
    });
    sp->enable_processing();

    // Register an hdfs vault; its accessor is replaced by the MockAccessor
    // through the mock_vault sync point above.
    {
        HdfsBuildConf hdfs_build_conf;
        StorageVaultPB vault;
        hdfs_build_conf.set_fs_name("fs_name");
        hdfs_build_conf.set_user("root");
        HdfsVaultInfo hdfs_info;
        hdfs_info.set_prefix("root_path");
        hdfs_info.mutable_build_conf()->MergeFrom(hdfs_build_conf);
        vault.mutable_hdfs_info()->MergeFrom(hdfs_info);
        vault.set_name("test_success_hdfs_vault");
        vault.set_id("success_vault");
        instance.add_storage_vault_names(vault.name());
        instance.add_resource_ids(vault.id());
        instance.set_instance_id("GetObjStoreInfoTestInstance");
        txn->put(storage_vault_key({instance.instance_id(), "4"}), vault.SerializeAsString());
    }

    val = instance.SerializeAsString();
    txn->put(key, val);
    EXPECT_EQ(txn->commit(), TxnErrorCode::TXN_OK);

    EXPECT_EQ(accessor->exists("data/0/test.csv"), 0);

    InstanceRecycler recycler(txn_kv, instance, thread_group,
                              std::make_shared<TxnLazyCommitter>(txn_kv));
    EXPECT_EQ(recycler.init(), 0);
    EXPECT_EQ(recycler.accessor_map_.size(), 1);

    // The mock accessor was wired in successfully and still holds the file.
    EXPECT_EQ(recycler.accessor_map_.at("success_vault")->exists("data/0/test.csv"), 0);

    // recycle_tablet() fails because the rowset's resource id "no_id" does not
    // match any accessor in accessor_map_.
    EXPECT_EQ(recycler.recycle_tablet(0), -1);
    // With an unresolvable resource id nothing can be recycled — the vault's
    // data survives.
    EXPECT_EQ(recycler.accessor_map_.at("success_vault")->exists("data/0/test.csv"), 0);
}

// Verifies that InstanceRecycler::init() returns -2 when every storage-vault
// accessor fails to initialize (S3 init is forced to fail via sync point).
TEST(RecyclerTest, init_all_vault_accessors_failed_test) {
    auto* sp = SyncPoint::get_instance();
    // RAII guard: clear all sync-point callbacks/traces on scope exit so
    // later tests start from a clean slate.
    std::unique_ptr<int, std::function<void(int*)>> defer((int*)0x01, [&sp](int*) {
        sp->clear_all_call_backs();
        sp->clear_trace();
        sp->disable_processing();
    });

    auto txn_kv = std::make_shared<MemTxnKv>();
    EXPECT_EQ(txn_kv->init(), 0);
    std::unique_ptr<Transaction> txn;
    ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
    std::string key;
    std::string val;

    InstanceKeyInfo key_info {"test_instance"};
    instance_key(key_info, &key);
    InstanceInfoPB instance;
    instance.set_instance_id("GetObjStoreInfoTestInstance");
    // Register an S3 vault whose accessor creation will fail; the obj_info is
    // also deliberately incomplete (no endpoint/region/bucket).
    // NOTE(review): this txn is never committed, so the storage_vault_key put
    // below never reaches the kv store — confirm whether init() is meant to
    // fail on the sync point or on the missing vault metadata.
    {
        ObjectStoreInfoPB obj_info;
        StorageVaultPB vault;
        obj_info.set_id("id");
        obj_info.set_ak("ak");
        obj_info.set_sk("sk");
        vault.mutable_obj_info()->MergeFrom(obj_info);
        vault.set_name("test_failed_s3_vault");
        vault.set_id("failed_s3");
        instance.add_storage_vault_names(vault.name());
        instance.add_resource_ids(vault.id());
        txn->put(storage_vault_key({instance.instance_id(), "1"}), vault.SerializeAsString());
    }

    // Force S3Accessor::create() to fail for every vault.
    sp->set_call_back("S3Accessor::init.s3_init_failed", [](auto&& args) {
        auto* ret = try_any_cast_ret<int>(args);
        ret->first = -1;
        ret->second = true;
    });
    sp->enable_processing();

    InstanceRecycler recycler(txn_kv, instance, thread_group,
                              std::make_shared<TxnLazyCommitter>(txn_kv));
    EXPECT_EQ(recycler.init(), -2);
}

TEST(RecyclerTest, delete_tmp_rowset_data_with_idx_v1) {
auto* sp = SyncPoint::get_instance();
std::unique_ptr<int, std::function<void(int*)>> defer((int*)0x01, [&sp](int*) {
Expand Down Expand Up @@ -3563,4 +3773,104 @@ TEST(RecyclerTest, delete_tmp_rowset_data_with_idx_v2) {
}
}

// Verifies delete_rowset_data(TMP_ROWSET) when one tmp rowset's resource id
// has no matching accessor: the call reports failure (-1), data of the rowset
// with a valid resource id is removed, and data of the rowset with the
// unknown resource id ("no_resource_id") is left untouched.
TEST(RecyclerTest, delete_tmp_rowset_without_resource_id) {
    auto* sp = SyncPoint::get_instance();
    // RAII guard: clear all sync-point callbacks/traces on scope exit so
    // later tests start from a clean slate.
    std::unique_ptr<int, std::function<void(int*)>> defer((int*)0x01, [&sp](int*) {
        sp->clear_all_call_backs();
        sp->clear_trace();
        sp->disable_processing();
    });
    auto txn_kv = std::make_shared<MemTxnKv>();
    ASSERT_EQ(txn_kv->init(), 0);

    InstanceInfoPB instance;
    instance.set_instance_id(instance_id);
    auto obj_info = instance.add_obj_info();
    obj_info->set_id("delete_tmp_rowset_data_with_idx_v2");
    obj_info->set_ak(config::test_s3_ak);
    obj_info->set_sk(config::test_s3_sk);
    obj_info->set_endpoint(config::test_s3_endpoint);
    obj_info->set_region(config::test_s3_region);
    obj_info->set_bucket(config::test_s3_bucket);
    obj_info->set_prefix("delete_tmp_rowset_data_with_idx_v2");

    // Schema with an inverted index so .idx files are created alongside the
    // .dat segment files.
    doris::TabletSchemaCloudPB schema;
    schema.set_schema_version(1);
    auto index = schema.add_index();
    index->set_index_id(1);
    index->set_index_type(IndexType::INVERTED);

    // Force the tmp-rowset branch inside delete_rowset_data()
    // (presumably flags the rowsets as tmp rowsets — confirm in recycler.cpp).
    sp->set_call_back("InstanceRecycler::delete_rowset_data.tmp_rowset", [](auto&& args) {
        auto* ret = try_any_cast<int*>(args[0]);
        *ret = 1;
    });
    // Simulate a missing accessor: ensure "no_resource_id" is absent from
    // accessor_map_ when delete_rowset_data() resolves resource ids.
    sp->set_call_back("InstanceRecycler::delete_rowset_data.no_resource_id", [](auto&& args) {
        auto* map = try_any_cast<
                std::unordered_map<std::string, std::shared_ptr<StorageVaultAccessor>>*>(args[0]);
        map->erase("no_resource_id");
    });
    sp->enable_processing();

    {
        InstanceRecycler recycler(txn_kv, instance, thread_group,
                                  std::make_shared<TxnLazyCommitter>(txn_kv));
        ASSERT_EQ(recycler.init(), 0);
        auto accessor = recycler.accessor_map_.begin()->second;
        std::vector<doris::RowsetMetaCloudPB> rowset_pbs;

        // Rowset "1": valid resource id — its files should be deleted.
        doris::RowsetMetaCloudPB rowset;
        rowset.set_rowset_id(0); // useless but required
        rowset.set_rowset_id_v2("1");
        rowset.set_num_segments(1);
        rowset.set_tablet_id(10000);
        rowset.set_index_id(10001);
        rowset.set_resource_id("delete_tmp_rowset_data_with_idx_v2");
        rowset.set_schema_version(schema.schema_version());
        rowset.mutable_tablet_schema()->CopyFrom(schema);
        create_tmp_rowset(txn_kv.get(), accessor.get(), rowset, 1, true);
        rowset.clear_tablet_schema();
        rowset_pbs.emplace_back(rowset);

        // Rowset "2": resource id with no accessor — its files should survive.
        rowset.set_rowset_id(0); // useless but required
        rowset.set_rowset_id_v2("2");
        rowset.set_num_segments(1);
        rowset.set_tablet_id(20000);
        rowset.set_index_id(20001);
        rowset.set_resource_id("no_resource_id");
        rowset.set_schema_version(schema.schema_version());
        rowset.mutable_tablet_schema()->CopyFrom(schema);
        create_tmp_rowset(txn_kv.get(), accessor.get(), rowset, 1, true);
        rowset.clear_tablet_schema();
        rowset_pbs.emplace_back(rowset);

        std::unordered_set<std::string> list_files;
        std::unique_ptr<ListIterator> iter;
        EXPECT_EQ(accessor->list_all(&iter), 0);
        EXPECT_TRUE(iter->has_next());
        list_files.clear();
        for (auto file = iter->next(); file.has_value(); file = iter->next()) {
            list_files.insert(file->path);
        }
        EXPECT_EQ(list_files.size(), 4);
        // Before deletion both rowsets' segment and idx-v2 files exist.
        EXPECT_TRUE(list_files.contains("data/10000/1_0.dat"));
        EXPECT_TRUE(list_files.contains("data/10000/1_0.idx"));
        EXPECT_TRUE(list_files.contains("data/20000/2_0.dat"));
        EXPECT_TRUE(list_files.contains("data/20000/2_0.idx"));

        // Overall failure because rowset "2" has no resolvable accessor.
        EXPECT_EQ(-1, recycler.delete_rowset_data(rowset_pbs, RowsetRecyclingState::TMP_ROWSET));
        list_files.clear();
        iter.reset();
        EXPECT_EQ(accessor->list_all(&iter), 0);
        EXPECT_TRUE(iter->has_next());
        for (auto file = iter->next(); file.has_value(); file = iter->next()) {
            list_files.insert(file->path);
        }
        // The unresolvable rowset's files are untouched...
        EXPECT_TRUE(list_files.contains("data/20000/2_0.dat"));
        EXPECT_TRUE(list_files.contains("data/20000/2_0.idx"));
        // ...while the valid-resource-id rowset's segment and idx v2 files
        // have been removed.
        EXPECT_EQ(list_files.size(), 2);
    }
}

} // namespace doris::cloud
Loading