Skip to content

Commit

Permalink
[Fix](cloud-mow) Compaction should release delete bitmap lock when ab…
Browse files Browse the repository at this point in the history
…ort fail (#47963)

pick pr #47766
  • Loading branch information
hust-hhb authored Feb 22, 2025
1 parent 936d21f commit a731643
Show file tree
Hide file tree
Showing 17 changed files with 359 additions and 86 deletions.
19 changes: 6 additions & 13 deletions be/src/cloud/cloud_base_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,7 @@ bvar::Adder<uint64_t> base_output_size("base_compaction", "output_size");

CloudBaseCompaction::CloudBaseCompaction(CloudStorageEngine& engine, CloudTabletSPtr tablet)
: CloudCompactionMixin(engine, tablet,
"BaseCompaction:" + std::to_string(tablet->tablet_id())) {
auto uuid = UUIDGenerator::instance()->next_uuid();
std::stringstream ss;
ss << uuid;
_uuid = ss.str();
}
"BaseCompaction:" + std::to_string(tablet->tablet_id())) {}

CloudBaseCompaction::~CloudBaseCompaction() = default;

Expand Down Expand Up @@ -330,8 +325,7 @@ Status CloudBaseCompaction::modify_rowsets() {
DeleteBitmapPtr output_rowset_delete_bitmap = nullptr;
if (_tablet->keys_type() == KeysType::UNIQUE_KEYS &&
_tablet->enable_unique_key_merge_on_write()) {
int64_t initiator = HashUtil::hash64(_uuid.data(), _uuid.size(), 0) &
std::numeric_limits<int64_t>::max();
int64_t initiator = this->initiator();
RETURN_IF_ERROR(cloud_tablet()->calc_delete_bitmap_for_compaction(
_input_rowsets, _output_rowset, *_rowid_conversion, compaction_type(),
_stats.merged_rows, initiator, output_rowset_delete_bitmap,
Expand Down Expand Up @@ -403,8 +397,8 @@ Status CloudBaseCompaction::modify_rowsets() {
return Status::OK();
}

void CloudBaseCompaction::garbage_collection() {
CloudCompactionMixin::garbage_collection();
Status CloudBaseCompaction::garbage_collection() {
RETURN_IF_ERROR(CloudCompactionMixin::garbage_collection());
cloud::TabletJobInfoPB job;
auto idx = job.mutable_idx();
idx->set_tablet_id(_tablet->tablet_id());
Expand All @@ -418,9 +412,7 @@ void CloudBaseCompaction::garbage_collection() {
compaction_job->set_type(cloud::TabletCompactionJobPB::BASE);
if (_tablet->keys_type() == KeysType::UNIQUE_KEYS &&
_tablet->enable_unique_key_merge_on_write()) {
int64_t initiator = HashUtil::hash64(_uuid.data(), _uuid.size(), 0) &
std::numeric_limits<int64_t>::max();
compaction_job->set_delete_bitmap_lock_initiator(initiator);
compaction_job->set_delete_bitmap_lock_initiator(this->initiator());
}
auto st = _engine.meta_mgr().abort_tablet_job(job);
if (!st.ok()) {
Expand All @@ -429,6 +421,7 @@ void CloudBaseCompaction::garbage_collection() {
.tag("tablet_id", _tablet->tablet_id())
.error(st);
}
return st;
}

void CloudBaseCompaction::do_lease() {
Expand Down
3 changes: 1 addition & 2 deletions be/src/cloud/cloud_base_compaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,14 @@ class CloudBaseCompaction : public CloudCompactionMixin {

Status modify_rowsets() override;

void garbage_collection() override;
Status garbage_collection() override;

void _filter_input_rowset();

void build_basic_info();

ReaderType compaction_type() const override { return ReaderType::READER_BASE_COMPACTION; }

std::string _uuid;
int64_t _input_segments = 0;
int64_t _base_compaction_cnt = 0;
int64_t _cumulative_compaction_cnt = 0;
Expand Down
33 changes: 20 additions & 13 deletions be/src/cloud/cloud_cumulative_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,7 @@ bvar::Adder<uint64_t> cumu_output_size("cumu_compaction", "output_size");
CloudCumulativeCompaction::CloudCumulativeCompaction(CloudStorageEngine& engine,
CloudTabletSPtr tablet)
: CloudCompactionMixin(engine, tablet,
"BaseCompaction:" + std::to_string(tablet->tablet_id())) {
auto uuid = UUIDGenerator::instance()->next_uuid();
std::stringstream ss;
ss << uuid;
_uuid = ss.str();
}
"BaseCompaction:" + std::to_string(tablet->tablet_id())) {}

CloudCumulativeCompaction::~CloudCumulativeCompaction() = default;

Expand Down Expand Up @@ -284,8 +279,7 @@ Status CloudCumulativeCompaction::modify_rowsets() {
});

DeleteBitmapPtr output_rowset_delete_bitmap = nullptr;
int64_t initiator =
HashUtil::hash64(_uuid.data(), _uuid.size(), 0) & std::numeric_limits<int64_t>::max();
int64_t initiator = this->initiator();
if (_tablet->keys_type() == KeysType::UNIQUE_KEYS &&
_tablet->enable_unique_key_merge_on_write()) {
RETURN_IF_ERROR(cloud_tablet()->calc_delete_bitmap_for_compaction(
Expand All @@ -305,6 +299,13 @@ Status CloudCumulativeCompaction::modify_rowsets() {
compaction_job->set_delete_bitmap_lock_initiator(initiator);
}

DBUG_EXECUTE_IF("CumulativeCompaction.modify_rowsets.trigger_abort_job_failed", {
LOG(INFO) << "CumulativeCompaction.modify_rowsets.trigger_abort_job_failed for tablet_id"
<< cloud_tablet()->tablet_id();
return Status::InternalError(
"CumulativeCompaction.modify_rowsets.trigger_abort_job_failed for tablet_id {}",
cloud_tablet()->tablet_id());
});
cloud::FinishTabletJobResponse resp;
auto st = _engine.meta_mgr().commit_tablet_job(job, &resp);
if (resp.has_alter_version()) {
Expand Down Expand Up @@ -440,8 +441,8 @@ Status CloudCumulativeCompaction::process_old_version_delete_bitmap() {
return Status::OK();
}

void CloudCumulativeCompaction::garbage_collection() {
CloudCompactionMixin::garbage_collection();
Status CloudCumulativeCompaction::garbage_collection() {
RETURN_IF_ERROR(CloudCompactionMixin::garbage_collection());
cloud::TabletJobInfoPB job;
auto idx = job.mutable_idx();
idx->set_tablet_id(_tablet->tablet_id());
Expand All @@ -455,17 +456,23 @@ void CloudCumulativeCompaction::garbage_collection() {
compaction_job->set_type(cloud::TabletCompactionJobPB::CUMULATIVE);
if (_tablet->keys_type() == KeysType::UNIQUE_KEYS &&
_tablet->enable_unique_key_merge_on_write()) {
int64_t initiator = HashUtil::hash64(_uuid.data(), _uuid.size(), 0) &
std::numeric_limits<int64_t>::max();
compaction_job->set_delete_bitmap_lock_initiator(initiator);
compaction_job->set_delete_bitmap_lock_initiator(this->initiator());
}
DBUG_EXECUTE_IF("CumulativeCompaction.modify_rowsets.trigger_abort_job_failed", {
LOG(INFO) << "CumulativeCompaction.modify_rowsets.abort_job_failed for tablet_id"
<< cloud_tablet()->tablet_id();
return Status::InternalError(
"CumulativeCompaction.modify_rowsets.abort_job_failed for tablet_id {}",
cloud_tablet()->tablet_id());
});
auto st = _engine.meta_mgr().abort_tablet_job(job);
if (!st.ok()) {
LOG_WARNING("failed to abort compaction job")
.tag("job_id", _uuid)
.tag("tablet_id", _tablet->tablet_id())
.error(st);
}
return st;
}

Status CloudCumulativeCompaction::pick_rowsets_to_compact() {
Expand Down
3 changes: 1 addition & 2 deletions be/src/cloud/cloud_cumulative_compaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,14 @@ class CloudCumulativeCompaction : public CloudCompactionMixin {

Status modify_rowsets() override;

void garbage_collection() override;
Status garbage_collection() override;

void update_cumulative_point();

Status process_old_version_delete_bitmap();

ReaderType compaction_type() const override { return ReaderType::READER_CUMULATIVE_COMPACTION; }

std::string _uuid;
int64_t _input_segments = 0;
int64_t _max_conflict_version = 0;
// Snapshot values when pick input rowsets
Expand Down
20 changes: 6 additions & 14 deletions be/src/cloud/cloud_full_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,7 @@ bvar::Adder<uint64_t> full_output_size("full_compaction", "output_size");

CloudFullCompaction::CloudFullCompaction(CloudStorageEngine& engine, CloudTabletSPtr tablet)
: CloudCompactionMixin(engine, tablet,
"BaseCompaction:" + std::to_string(tablet->tablet_id())) {
auto uuid = UUIDGenerator::instance()->next_uuid();
std::stringstream ss;
ss << uuid;
_uuid = ss.str();
}
"BaseCompaction:" + std::to_string(tablet->tablet_id())) {}

CloudFullCompaction::~CloudFullCompaction() = default;

Expand Down Expand Up @@ -227,10 +222,8 @@ Status CloudFullCompaction::modify_rowsets() {
DeleteBitmapPtr output_rowset_delete_bitmap = nullptr;
if (_tablet->keys_type() == KeysType::UNIQUE_KEYS &&
_tablet->enable_unique_key_merge_on_write()) {
int64_t initiator =
boost::hash_range(_uuid.begin(), _uuid.end()) & std::numeric_limits<int64_t>::max();
RETURN_IF_ERROR(_cloud_full_compaction_update_delete_bitmap(initiator));
compaction_job->set_delete_bitmap_lock_initiator(initiator);
RETURN_IF_ERROR(_cloud_full_compaction_update_delete_bitmap(this->initiator()));
compaction_job->set_delete_bitmap_lock_initiator(this->initiator());
}

cloud::FinishTabletJobResponse resp;
Expand Down Expand Up @@ -271,7 +264,7 @@ Status CloudFullCompaction::modify_rowsets() {
return Status::OK();
}

void CloudFullCompaction::garbage_collection() {
Status CloudFullCompaction::garbage_collection() {
//file_cache_garbage_collection();
cloud::TabletJobInfoPB job;
auto idx = job.mutable_idx();
Expand All @@ -286,9 +279,7 @@ void CloudFullCompaction::garbage_collection() {
compaction_job->set_type(cloud::TabletCompactionJobPB::FULL);
if (_tablet->keys_type() == KeysType::UNIQUE_KEYS &&
_tablet->enable_unique_key_merge_on_write()) {
int64_t initiator =
boost::hash_range(_uuid.begin(), _uuid.end()) & std::numeric_limits<int64_t>::max();
compaction_job->set_delete_bitmap_lock_initiator(initiator);
compaction_job->set_delete_bitmap_lock_initiator(this->initiator());
}
auto st = _engine.meta_mgr().abort_tablet_job(job);
if (!st.ok()) {
Expand All @@ -297,6 +288,7 @@ void CloudFullCompaction::garbage_collection() {
.tag("tablet_id", _tablet->tablet_id())
.error(st);
}
return st;
}

void CloudFullCompaction::do_lease() {
Expand Down
3 changes: 1 addition & 2 deletions be/src/cloud/cloud_full_compaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class CloudFullCompaction : public CloudCompactionMixin {
std::string_view compaction_name() const override { return "CloudFullCompaction"; }

Status modify_rowsets() override;
void garbage_collection() override;
Status garbage_collection() override;

private:
Status _cloud_full_compaction_update_delete_bitmap(int64_t initiator);
Expand All @@ -52,7 +52,6 @@ class CloudFullCompaction : public CloudCompactionMixin {

ReaderType compaction_type() const override { return ReaderType::READER_FULL_COMPACTION; }

std::string _uuid;
int64_t _input_segments = 0;
// Snapshot values when pick input rowsets
int64_t _base_compaction_cnt = 0;
Expand Down
17 changes: 9 additions & 8 deletions be/src/cloud/cloud_meta_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1219,23 +1219,24 @@ Status CloudMetaMgr::get_delete_bitmap_update_lock(const CloudTablet& tablet, in
return st;
}

Status CloudMetaMgr::remove_delete_bitmap_update_lock(const CloudTablet& tablet, int64_t lock_id,
int64_t initiator) {
VLOG_DEBUG << "remove_delete_bitmap_update_lock , tablet_id: " << tablet.tablet_id()
<< ",lock_id:" << lock_id;
void CloudMetaMgr::remove_delete_bitmap_update_lock(int64_t table_id, int64_t lock_id,
int64_t initiator, int64_t tablet_id) {
LOG(INFO) << "remove_delete_bitmap_update_lock ,table_id: " << table_id
<< ",lock_id:" << lock_id << ",initiator:" << initiator << ",tablet_id:" << tablet_id;
RemoveDeleteBitmapUpdateLockRequest req;
RemoveDeleteBitmapUpdateLockResponse res;
req.set_cloud_unique_id(config::cloud_unique_id);
req.set_tablet_id(tablet.tablet_id());
req.set_table_id(table_id);
req.set_tablet_id(tablet_id);
req.set_lock_id(lock_id);
req.set_initiator(initiator);
auto st = retry_rpc("remove delete bitmap update lock", req, &res,
&MetaService_Stub::remove_delete_bitmap_update_lock);
if (!st.ok()) {
LOG(WARNING) << "remove delete bitmap update lock fail,tablet_id=" << tablet.tablet_id()
<< " lock_id=" << lock_id << " st=" << st.to_string();
LOG(WARNING) << "remove delete bitmap update lock fail,table_id=" << table_id
<< ",tablet_id=" << tablet_id << ",lock_id=" << lock_id
<< ",st=" << st.to_string();
}
return st;
}

Status CloudMetaMgr::remove_old_version_delete_bitmap(
Expand Down
4 changes: 2 additions & 2 deletions be/src/cloud/cloud_meta_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,8 @@ class CloudMetaMgr {
Status get_delete_bitmap_update_lock(const CloudTablet& tablet, int64_t lock_id,
int64_t initiator);

Status remove_delete_bitmap_update_lock(const CloudTablet& tablet, int64_t lock_id,
int64_t initiator);
void remove_delete_bitmap_update_lock(int64_t table_id, int64_t lock_id, int64_t initiator,
int64_t tablet_id);

Status remove_old_version_delete_bitmap(
int64_t tablet_id,
Expand Down
2 changes: 1 addition & 1 deletion be/src/cloud/cloud_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#include "common/logging.h"
#include "io/cache/block_file_cache_downloader.h"
#include "io/cache/block_file_cache_factory.h"
#include "olap/compaction.h"
#include "olap/cumulative_compaction_time_series_policy.h"
#include "olap/olap_define.h"
#include "olap/rowset/beta_rowset.h"
Expand All @@ -54,7 +55,6 @@ namespace doris {
#include "common/compile_check_begin.h"
using namespace ErrorCode;

static constexpr int COMPACTION_DELETE_BITMAP_LOCK_ID = -1;
static constexpr int LOAD_INITIATOR_ID = -1;

CloudTablet::CloudTablet(CloudStorageEngine& engine, TabletMetaSharedPtr tablet_meta)
Expand Down
2 changes: 1 addition & 1 deletion be/src/cloud/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ DEFINE_mInt32(sync_load_for_tablets_thread, "32");

DEFINE_mBool(enable_new_tablet_do_compaction, "false");

DEFINE_Int32(delete_bitmap_lock_expiration_seconds, "10");
DEFINE_mInt32(delete_bitmap_lock_expiration_seconds, "10");

DEFINE_Bool(enable_cloud_txn_lazy_commit, "false");

Expand Down
2 changes: 1 addition & 1 deletion be/src/cloud/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ DECLARE_mBool(save_load_error_log_to_s3);
// the threads which sync the data loaded in other clusters
DECLARE_mInt32(sync_load_for_tablets_thread);

DECLARE_Int32(delete_bitmap_lock_expiration_seconds);
DECLARE_mInt32(delete_bitmap_lock_expiration_seconds);

// enable large txn lazy commit in meta-service `commit_txn`
DECLARE_mBool(enable_cloud_txn_lazy_commit);
Expand Down
31 changes: 26 additions & 5 deletions be/src/olap/compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1266,7 +1266,12 @@ int64_t CloudCompactionMixin::get_compaction_permits() {

CloudCompactionMixin::CloudCompactionMixin(CloudStorageEngine& engine, CloudTabletSPtr tablet,
const std::string& label)
: Compaction(tablet, label), _engine(engine) {}
: Compaction(tablet, label), _engine(engine) {
auto uuid = UUIDGenerator::instance()->next_uuid();
std::stringstream ss;
ss << uuid;
_uuid = ss.str();
}

Status CloudCompactionMixin::execute_compact_impl(int64_t permits) {
OlapStopWatch watch;
Expand Down Expand Up @@ -1298,11 +1303,26 @@ Status CloudCompactionMixin::execute_compact_impl(int64_t permits) {
return Status::OK();
}

// Returns the initiator id this compaction reports for the delete bitmap lock.
// The id is the 64-bit hash of the compaction's UUID string (_uuid) with the
// sign bit masked off, so the returned value is never negative.
int64_t CloudCompactionMixin::initiator() const {
    // The trailing 0 is presumably the hash seed -- TODO confirm against HashUtil.
    const auto uuid_hash = HashUtil::hash64(_uuid.data(), _uuid.size(), 0);
    constexpr auto kNonNegativeMask = std::numeric_limits<int64_t>::max();
    return uuid_hash & kNonNegativeMask;
}

Status CloudCompactionMixin::execute_compact() {
TEST_INJECTION_POINT("Compaction::do_compaction");
int64_t permits = get_compaction_permits();
HANDLE_EXCEPTION_IF_CATCH_EXCEPTION(execute_compact_impl(permits),
[&](const doris::Exception& ex) { garbage_collection(); });
HANDLE_EXCEPTION_IF_CATCH_EXCEPTION(
execute_compact_impl(permits), [&](const doris::Exception& ex) {
auto st = garbage_collection();
if (!st.ok() && initiator() != INVALID_COMPACTION_INITIATOR_ID) {
// If compaction fails, BE tries to abort the compaction job; a successful abort
// also releases the delete bitmap lock. If the abort itself fails, the lock is
// not released, so BE sends this RPC to MS to try to release the delete bitmap
// lock explicitly.
_engine.meta_mgr().remove_delete_bitmap_update_lock(
_tablet->table_id(), COMPACTION_DELETE_BITMAP_LOCK_ID, initiator(),
_tablet->tablet_id());
}
});
_load_segment_to_cache();
return Status::OK();
}
Expand Down Expand Up @@ -1352,9 +1372,9 @@ Status CloudCompactionMixin::construct_output_rowset_writer(RowsetWriterContext&
return Status::OK();
}

void CloudCompactionMixin::garbage_collection() {
Status CloudCompactionMixin::garbage_collection() {
if (!config::enable_file_cache) {
return;
return Status::OK();
}
if (_output_rs_writer) {
auto* beta_rowset_writer = dynamic_cast<BaseBetaRowsetWriter*>(_output_rs_writer.get());
Expand All @@ -1365,6 +1385,7 @@ void CloudCompactionMixin::garbage_collection() {
file_cache->remove_if_cached_async(file_key);
}
}
return Status::OK();
}

} // namespace doris
Loading

0 comments on commit a731643

Please sign in to comment.