Skip to content

Commit

Permalink
[fix](restore) Add a local snapshot lock to protect snapshot dir (#47279
Browse files Browse the repository at this point in the history
)

To avoid concurrent modification of a snapshot dir.
  • Loading branch information
w41ter committed Feb 7, 2025
1 parent 814e4d7 commit 2168a44
Show file tree
Hide file tree
Showing 3 changed files with 132 additions and 52 deletions.
33 changes: 32 additions & 1 deletion be/src/olap/snapshot_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,35 @@ using std::vector;
namespace doris {
using namespace ErrorCode;

// Blocks the calling thread until it holds the lock for `path`, then returns a
// guard that carries a copy of that path.
//
// All bookkeeping is done while holding `_lock`; the per-path entry is created
// on first use by `operator[]`.
LocalSnapshotLockGuard LocalSnapshotLock::acquire(const std::string& path) {
    std::unique_lock<std::mutex> map_lock(_lock);
    auto& entry = _local_snapshot_contexts[path];

    if (entry._is_locked) {
        // Stay counted as a waiter for the whole wait. The count is only ever
        // read or written under `_lock`, so keeping it raised across spurious
        // wakeups is indistinguishable from dropping and re-raising it.
        ++entry._waiting_count;
        entry._cv.wait(map_lock, [&entry] { return !entry._is_locked; });
        --entry._waiting_count;
    }

    entry._is_locked = true;
    return {path};
}

// Marks the lock for `path` as free. If other threads are waiting on it, one of
// them is notified; otherwise the entry is removed from the map. A path with no
// entry is ignored.
void LocalSnapshotLock::release(const std::string& path) {
    std::lock_guard<std::mutex> map_lock(_lock);

    const auto pos = _local_snapshot_contexts.find(path);
    if (pos == _local_snapshot_contexts.end()) {
        return;
    }

    auto& entry = pos->second;
    entry._is_locked = false;

    if (entry._waiting_count == 0) {
        // Nobody is blocked on this path, so the entry can be dropped.
        _local_snapshot_contexts.erase(pos);
        return;
    }

    // At least one waiter still references the entry: keep it and wake one.
    entry._cv.notify_one();
}

SnapshotManager::SnapshotManager(StorageEngine& engine) : _engine(engine) {
_mem_tracker =
MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::OTHER, "SnapshotManager");
Expand Down Expand Up @@ -118,6 +147,8 @@ Status SnapshotManager::make_snapshot(const TSnapshotRequest& request, string* s
}

Status SnapshotManager::release_snapshot(const string& snapshot_path) {
auto local_snapshot_guard = LocalSnapshotLock::instance().acquire(snapshot_path);

// If the requested snapshot_path is located in the root/snapshot folder, it is considered legal and can be deleted.
// Otherwise, it is considered an illegal request and returns an error result.
SCOPED_ATTACH_TASK(_mem_tracker);
Expand Down Expand Up @@ -448,7 +479,7 @@ Status SnapshotManager::_create_snapshot_files(const TabletSharedPtr& ref_tablet
}
}
// be would definitely set it as true no matter has missed version or not
// but it would take no effets on the following range loop
// but it would take no effects on the following range loop
if (!is_single_rowset_clone && request.__isset.missing_version) {
for (int64_t missed_version : request.missing_version) {
Version version = {missed_version, missed_version};
Expand Down
51 changes: 51 additions & 0 deletions be/src/olap/snapshot_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@

#pragma once

#include <condition_variable>
#include <memory>
#include <mutex>
#include <string>
#include <vector>

Expand All @@ -33,6 +35,55 @@ struct RowsetId;
class StorageEngine;
class MemTrackerLimiter;

class LocalSnapshotLockGuard;

// Registry of per-path locks used to protect local snapshot paths.
//
// Each path maps to a small context (locked flag, waiter count, condition
// variable); the map itself is guarded by a single mutex. Locks are taken with
// acquire() and given back through the returned LocalSnapshotLockGuard, which
// is the only outside class allowed to call release().
class LocalSnapshotLock {
public:
    // Returns the shared instance, a function-local static.
    static LocalSnapshotLock& instance() {
        static LocalSnapshotLock shared;
        return shared;
    }

    LocalSnapshotLock() = default;
    ~LocalSnapshotLock() = default;

    // Not copyable.
    LocalSnapshotLock(const LocalSnapshotLock&) = delete;
    LocalSnapshotLock& operator=(const LocalSnapshotLock&) = delete;

    // Acquire the lock for `path`, blocking while it is already held.
    LocalSnapshotLockGuard acquire(const std::string& path);

private:
    friend class LocalSnapshotLockGuard;

    // Per-path state. Not copyable.
    struct LocalSnapshotContext {
        LocalSnapshotContext() = default;
        LocalSnapshotContext(const LocalSnapshotContext&) = delete;
        LocalSnapshotContext& operator=(const LocalSnapshotContext&) = delete;

        bool _is_locked = false;     // true while the path lock is held
        size_t _waiting_count = 0;   // number of threads waiting on _cv
        std::condition_variable _cv; // signalled when the lock is released
    };

    // Release the lock for `path`; called by LocalSnapshotLockGuard.
    void release(const std::string& path);

    std::mutex _lock;
    std::unordered_map<std::string, LocalSnapshotContext> _local_snapshot_contexts;
};

// Scope-bound handle for a path lock handed out by LocalSnapshotLock::acquire().
// Stores the path it was created with and, on destruction, passes that path to
// LocalSnapshotLock::release() on the shared instance. Not copyable.
class LocalSnapshotLockGuard {
public:
    // Left non-explicit: LocalSnapshotLock::acquire() builds its return value
    // with `return {path};`, which requires an implicit constructor.
    LocalSnapshotLockGuard(std::string path) : _snapshot_path(std::move(path)) {}

    ~LocalSnapshotLockGuard() {
        auto& registry = LocalSnapshotLock::instance();
        registry.release(_snapshot_path);
    }

    LocalSnapshotLockGuard(const LocalSnapshotLockGuard&) = delete;
    LocalSnapshotLockGuard& operator=(const LocalSnapshotLockGuard&) = delete;

private:
    std::string _snapshot_path; // path whose lock this guard will release
};

class SnapshotManager {
public:
SnapshotManager(StorageEngine& engine);
Expand Down
100 changes: 49 additions & 51 deletions be/src/runtime/snapshot_loader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include <gen_cpp/Types_types.h>

#include <algorithm>
#include <condition_variable>
#include <cstring>
#include <filesystem>
#include <istream>
Expand Down Expand Up @@ -146,6 +147,9 @@ Status SnapshotLoader::upload(const std::map<std::string, std::string>& src_to_d
const std::string& src_path = iter.first;
const std::string& dest_path = iter.second;

// Take a lock to protect the local snapshot path.
auto local_snapshot_guard = LocalSnapshotLock::instance().acquire(src_path);

int64_t tablet_id = 0;
int32_t schema_hash = 0;
RETURN_IF_ERROR(
Expand Down Expand Up @@ -242,6 +246,9 @@ Status SnapshotLoader::download(const std::map<std::string, std::string>& src_to
const std::string& remote_path = iter.first;
const std::string& local_path = iter.second;

// Take a lock to protect the local snapshot path.
auto local_snapshot_guard = LocalSnapshotLock::instance().acquire(local_path);

int64_t local_tablet_id = 0;
int32_t schema_hash = 0;
RETURN_IF_ERROR(_get_tablet_id_and_schema_hash_from_file_path(local_path, &local_tablet_id,
Expand Down Expand Up @@ -397,8 +404,6 @@ Status SnapshotLoader::download(const std::map<std::string, std::string>& src_to
Status SnapshotLoader::remote_http_download(
const std::vector<TRemoteTabletSnapshot>& remote_tablet_snapshots,
std::vector<int64_t>* downloaded_tablet_ids) {
LOG(INFO) << fmt::format("begin to download snapshots via http. job: {}, task id: {}", _job_id,
_task_id);
constexpr uint32_t kListRemoteFileTimeout = 15;
constexpr uint32_t kDownloadFileMaxRetry = 3;
constexpr uint32_t kGetLengthTimeout = 10;
Expand All @@ -408,35 +413,39 @@ Status SnapshotLoader::remote_http_download(
RETURN_IF_ERROR(_report_every(0, &tmp_counter, 0, 0, TTaskType::type::DOWNLOAD));
Status status = Status::OK();

// Step before, validate all remote

// Step 1: Validate local tablet snapshot paths
int report_counter = 0;
int finished_num = 0;
int total_num = remote_tablet_snapshots.size();
for (const auto& remote_tablet_snapshot : remote_tablet_snapshots) {
const auto& path = remote_tablet_snapshot.local_snapshot_path;
const auto& local_path = remote_tablet_snapshot.local_snapshot_path;
const auto& remote_path = remote_tablet_snapshot.remote_snapshot_path;
LOG(INFO) << fmt::format(
"download snapshots via http. job: {}, task id: {}, local dir: {}, remote dir: {}",
_job_id, _task_id, local_path, remote_path);

// Take a lock to protect the local snapshot path.
auto local_snapshot_guard = LocalSnapshotLock::instance().acquire(local_path);

// Step 1: Validate local tablet snapshot paths
bool res = true;
RETURN_IF_ERROR(io::global_local_filesystem()->is_directory(path, &res));
RETURN_IF_ERROR(io::global_local_filesystem()->is_directory(local_path, &res));
if (!res) {
std::stringstream ss;
auto err_msg =
fmt::format("snapshot path is not directory or does not exist: {}", path);
fmt::format("snapshot path is not directory or does not exist: {}", local_path);
LOG(WARNING) << err_msg;
return Status::RuntimeError(err_msg);
}
}

// Step 2: get all local files
struct LocalFileStat {
uint64_t size;
std::string md5;
};
std::unordered_map<std::string, std::unordered_map<std::string, LocalFileStat>> local_files_map;
for (const auto& remote_tablet_snapshot : remote_tablet_snapshots) {
const auto& local_path = remote_tablet_snapshot.local_snapshot_path;
std::vector<std::string> local_files;
RETURN_IF_ERROR(_get_existing_files_from_local(local_path, &local_files));

auto& local_filestat = local_files_map[local_path];
for (auto& local_file : local_files) {
// Step 2: get all local files
struct LocalFileStat {
uint64_t size;
std::string md5;
};
std::unordered_map<std::string, LocalFileStat> local_files;
std::vector<std::string> existing_files;
RETURN_IF_ERROR(_get_existing_files_from_local(local_path, &existing_files));
for (auto& local_file : existing_files) {
// add file size
std::string local_file_path = local_path + "/" + local_file;
std::error_code ec;
Expand All @@ -453,27 +462,20 @@ Status SnapshotLoader::remote_http_download(
<< " md5sum: " << status.to_string();
return status;
}
local_filestat[local_file] = {local_file_size, md5};
local_files[local_file] = {local_file_size, md5};
}
}

// Step 3: Validate remote tablet snapshot paths && remote files map
// key is remote snapshot paths, value is filelist
// get all these use http download action
// http://172.16.0.14:6781/api/_tablet/_download?token=e804dd27-86da-4072-af58-70724075d2a4&file=/home/ubuntu/doris_master/output/be/storage/snapshot/20230410102306.9.180//2774718/217609978/2774718.hdr
int report_counter = 0;
int total_num = remote_tablet_snapshots.size();
int finished_num = 0;
struct RemoteFileStat {
std::string url;
std::string md5;
uint64_t size;
};
std::unordered_map<std::string, std::unordered_map<std::string, RemoteFileStat>>
remote_files_map;
for (const auto& remote_tablet_snapshot : remote_tablet_snapshots) {
const auto& remote_path = remote_tablet_snapshot.remote_snapshot_path;
auto& remote_files = remote_files_map[remote_path];
existing_files.clear();

// Step 3: Validate remote tablet snapshot paths && remote files map
// key is remote snapshot paths, value is filelist
// get all these use http download action
// http://172.16.0.14:6781/api/_tablet/_download?token=e804dd27-86da-4072-af58-70724075d2a4&file=/home/ubuntu/doris_master/output/be/storage/snapshot/20230410102306.9.180//2774718/217609978/2774718.hdr
struct RemoteFileStat {
std::string url;
std::string md5;
uint64_t size;
};
std::unordered_map<std::string, RemoteFileStat> remote_files;
const auto& token = remote_tablet_snapshot.remote_token;
const auto& remote_be_addr = remote_tablet_snapshot.remote_be_addr;

Expand Down Expand Up @@ -516,19 +518,11 @@ Status SnapshotLoader::remote_http_download(

remote_files[filename] = RemoteFileStat {remote_file_url, file_md5, file_size};
}
}

// Step 4: Compare local and remote files && get all need download files
for (const auto& remote_tablet_snapshot : remote_tablet_snapshots) {
// Step 4: Compare local and remote files && get all need download files
RETURN_IF_ERROR(_report_every(10, &report_counter, finished_num, total_num,
TTaskType::type::DOWNLOAD));

const auto& remote_path = remote_tablet_snapshot.remote_snapshot_path;
const auto& local_path = remote_tablet_snapshot.local_snapshot_path;
auto& remote_files = remote_files_map[remote_path];
auto& local_files = local_files_map[local_path];
auto remote_tablet_id = remote_tablet_snapshot.remote_tablet_id;

// get all need download files
std::vector<std::string> need_download_files;
for (const auto& [remote_file, remote_filestat] : remote_files) {
Expand Down Expand Up @@ -656,6 +650,7 @@ Status SnapshotLoader::remote_http_download(
if (total_time_ms > 0) {
copy_rate = total_file_size / ((double)total_time_ms) / 1000;
}
auto remote_tablet_id = remote_tablet_snapshot.remote_tablet_id;
LOG(INFO) << fmt::format(
"succeed to copy remote tablet {} to local tablet {}, total file size: {} B, cost: "
"{} ms, rate: {} MB/s",
Expand Down Expand Up @@ -705,6 +700,9 @@ Status SnapshotLoader::remote_http_download(
// MUST hold tablet's header lock, push lock, cumulative lock and base compaction lock
Status SnapshotLoader::move(const std::string& snapshot_path, TabletSharedPtr tablet,
bool overwrite) {
// Take a lock to protect the local snapshot path.
auto local_snapshot_guard = LocalSnapshotLock::instance().acquire(snapshot_path);

auto tablet_path = tablet->tablet_path();
auto store_path = tablet->data_dir()->path();
LOG(INFO) << "begin to move snapshot files. from: " << snapshot_path << ", to: " << tablet_path
Expand Down

0 comments on commit 2168a44

Please sign in to comment.