Skip to content

Commit

Permalink
refactor(bulk_load): refactor get_app for bulk_load_service (#962)
Browse files Browse the repository at this point in the history
  • Loading branch information
hycdong authored Nov 12, 2021
1 parent 1be6905 commit 9546c46
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 100 deletions.
163 changes: 63 additions & 100 deletions src/meta/meta_bulk_load_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,25 +66,19 @@ void bulk_load_service::on_start_bulk_load(start_bulk_load_rpc rpc)
auto &response = rpc.response();
response.err = ERR_OK;

std::shared_ptr<app_state> app;
{
zauto_read_lock l(app_lock());

app = _state->get_app(request.app_name);
if (app == nullptr || app->status != app_status::AS_AVAILABLE) {
derror_f("app({}) is not existed or not available", request.app_name);
response.err = app == nullptr ? ERR_APP_NOT_EXIST : ERR_APP_DROPPED;
response.hint_msg = fmt::format(
"app {}", response.err == ERR_APP_NOT_EXIST ? "not existed" : "dropped");
return;
}

if (app->is_bulk_loading) {
derror_f("app({}) is already executing bulk load, please wait", app->app_name);
response.err = ERR_BUSY;
response.hint_msg = "app is already executing bulk load";
return;
}
std::shared_ptr<app_state> app = get_app(request.app_name);
if (app == nullptr || app->status != app_status::AS_AVAILABLE) {
response.err = app == nullptr ? ERR_APP_NOT_EXIST : ERR_APP_DROPPED;
response.hint_msg = fmt::format(
"app {} is ", response.err == ERR_APP_NOT_EXIST ? "not existed" : "not available");
derror_f("{}", response.hint_msg);
return;
}
if (app->is_bulk_loading) {
response.err = ERR_BUSY;
response.hint_msg = fmt::format("app({}) is already executing bulk load", app->app_name);
derror_f("{}", response.hint_msg);
return;
}

std::string hint_msg;
Expand Down Expand Up @@ -293,22 +287,15 @@ void bulk_load_service::partition_bulk_load(const std::string &app_name, const g
{
FAIL_POINT_INJECT_F("meta_bulk_load_partition_bulk_load", [](dsn::string_view) {});

rpc_address primary_addr;
ballot b;
{
zauto_read_lock l(app_lock());
std::shared_ptr<app_state> app = _state->get_app(pid.get_app_id());
if (app == nullptr || app->status != app_status::AS_AVAILABLE) {
dwarn_f("app(name={}, id={}) is not existed, set bulk load failed",
app_name,
pid.get_app_id());
handle_app_unavailable(pid.get_app_id(), app_name);
return;
}
primary_addr = app->partitions[pid.get_partition_index()].primary;
b = app->partitions[pid.get_partition_index()].ballot;
std::shared_ptr<app_state> app = get_app(pid.get_app_id());
if (app == nullptr || app->status != app_status::AS_AVAILABLE) {
dwarn_f(
"app(name={}, id={}) is not existed, set bulk load failed", app_name, pid.get_app_id());
handle_app_unavailable(pid.get_app_id(), app_name);
return;
}

rpc_address primary_addr = app->partitions[pid.get_partition_index()].primary;
if (primary_addr.is_invalid()) {
dwarn_f("app({}) partition({}) primary is invalid, try it later", app_name, pid);
tasking::enqueue(LPC_META_STATE_NORMAL,
Expand All @@ -328,7 +315,7 @@ void bulk_load_service::partition_bulk_load(const std::string &app_name, const g
req->remote_provider_name = ainfo.file_provider_type;
req->cluster_name = ainfo.cluster_name;
req->meta_bulk_load_status = get_partition_bulk_load_status_unlocked(pid);
req->ballot = b;
req->ballot = app->partitions[pid.get_partition_index()].ballot;
req->query_bulk_load_metadata = is_partition_metadata_not_updated_unlocked(pid);
req->remote_root_path = ainfo.remote_root_path;

Expand Down Expand Up @@ -405,19 +392,14 @@ void bulk_load_service::on_partition_bulk_load_reply(error_code err,
}

// response.err = ERR_OK
ballot current_ballot;
{
zauto_read_lock l(app_lock());
std::shared_ptr<app_state> app = _state->get_app(pid.get_app_id());
if (app == nullptr || app->status != app_status::AS_AVAILABLE) {
dwarn_f("app(name={}, id={}) is not existed, set bulk load failed",
app_name,
pid.get_app_id());
handle_app_unavailable(pid.get_app_id(), app_name);
return;
}
current_ballot = app->partitions[pid.get_partition_index()].ballot;
std::shared_ptr<app_state> app = get_app(pid.get_app_id());
if (app == nullptr || app->status != app_status::AS_AVAILABLE) {
dwarn_f(
"app(name={}, id={}) is not existed, set bulk load failed", app_name, pid.get_app_id());
handle_app_unavailable(pid.get_app_id(), app_name);
return;
}
ballot current_ballot = app->partitions[pid.get_partition_index()].ballot;
if (request.ballot < current_ballot) {
dwarn_f("receive out-date response from node({}), app({}), partition({}), request ballot = "
"{}, current ballot= {}",
Expand Down Expand Up @@ -675,18 +657,14 @@ void bulk_load_service::handle_bulk_load_finish(const bulk_load_response &respon
count = --_apps_in_progress_count[pid.get_app_id()];
}
if (count == 0) {
std::shared_ptr<app_state> app;
{
zauto_read_lock l(app_lock());
app = _state->get_app(pid.get_app_id());
if (app == nullptr || app->status != app_status::AS_AVAILABLE) {
dwarn_f("app(name={}, id={}) is not existed, remove bulk load dir on remote "
"storage",
app_name,
pid.get_app_id());
remove_bulk_load_dir_on_remote_storage(pid.get_app_id(), app_name);
return;
}
std::shared_ptr<app_state> app = get_app(pid.get_app_id());
if (app == nullptr || app->status != app_status::AS_AVAILABLE) {
dwarn_f("app(name={}, id={}) is not existed, remove bulk load dir on remote "
"storage",
app_name,
pid.get_app_id());
remove_bulk_load_dir_on_remote_storage(pid.get_app_id(), app_name);
return;
}
ddebug_f("app({}) all partitions cleanup bulk load context", app_name);
remove_bulk_load_dir_on_remote_storage(std::move(app), true);
Expand Down Expand Up @@ -1068,20 +1046,15 @@ void bulk_load_service::partition_ingestion(const std::string &app_name, const g
return;
}

rpc_address primary_addr;
{
zauto_read_lock l(app_lock());
std::shared_ptr<app_state> app = _state->get_app(pid.get_app_id());
if (app == nullptr || app->status != app_status::AS_AVAILABLE) {
dwarn_f("app(name={}, id={}) is not existed, set bulk load failed",
app_name,
pid.get_app_id());
handle_app_unavailable(pid.get_app_id(), app_name);
return;
}
primary_addr = app->partitions[pid.get_partition_index()].primary;
std::shared_ptr<app_state> app = get_app(pid.get_app_id());
if (app == nullptr || app->status != app_status::AS_AVAILABLE) {
dwarn_f(
"app(name={}, id={}) is not existed, set bulk load failed", app_name, pid.get_app_id());
handle_app_unavailable(pid.get_app_id(), app_name);
return;
}

rpc_address primary_addr = app->partitions[pid.get_partition_index()].primary;
if (primary_addr.is_invalid()) {
dwarn_f("app({}) partition({}) primary is invalid, try it later", app_name, pid);
tasking::enqueue(LPC_META_STATE_NORMAL,
Expand Down Expand Up @@ -1300,7 +1273,7 @@ void bulk_load_service::on_control_bulk_load(control_bulk_load_rpc rpc)
auto &response = rpc.response();
response.err = ERR_OK;

std::shared_ptr<app_state> app = _state->get_app(app_name);
std::shared_ptr<app_state> app = get_app(app_name);
if (app == nullptr || app->status != app_status::AS_AVAILABLE) {
derror_f("app({}) is not existed or not available", app_name);
response.err = app == nullptr ? ERR_APP_NOT_EXIST : ERR_APP_DROPPED;
Expand Down Expand Up @@ -1380,34 +1353,28 @@ void bulk_load_service::on_query_bulk_load_status(query_bulk_load_rpc rpc)
response.err = ERR_OK;
response.app_name = app_name;

int32_t app_id, partition_count, max_replica_count;
{
zauto_read_lock l(app_lock());
std::shared_ptr<app_state> app = _state->get_app(app_name);

if (app == nullptr || app->status != app_status::AS_AVAILABLE) {
auto hint_msg = fmt::format("app({}) is not existed or not available", app_name);
derror_f("{}", hint_msg);
response.err = app == nullptr ? ERR_APP_NOT_EXIST : ERR_APP_DROPPED;
response.__set_hint_msg(hint_msg);
return;
}

if (!app->is_bulk_loading) {
auto hint_msg = fmt::format("app({}) is not during bulk load", app_name);
derror_f("{}", hint_msg);
response.err = ERR_INVALID_STATE;
response.__set_hint_msg(hint_msg);
return;
}
std::shared_ptr<app_state> app = get_app(app_name);
if (app == nullptr || app->status != app_status::AS_AVAILABLE) {
auto hint_msg = fmt::format("app({}) is not existed or not available", app_name);
derror_f("{}", hint_msg);
response.err = app == nullptr ? ERR_APP_NOT_EXIST : ERR_APP_DROPPED;
response.__set_hint_msg(hint_msg);
return;
}

app_id = app->app_id;
partition_count = app->partition_count;
max_replica_count = app->max_replica_count;
if (!app->is_bulk_loading) {
auto hint_msg = fmt::format("app({}) is not during bulk load", app_name);
derror_f("{}", hint_msg);
response.err = ERR_INVALID_STATE;
response.__set_hint_msg(hint_msg);
return;
}

int32_t app_id = app->app_id;
int32_t partition_count = app->partition_count;

zauto_read_lock l(_lock);
response.max_replica_count = max_replica_count;
response.max_replica_count = app->max_replica_count;
response.app_status = get_app_bulk_load_status_unlocked(app_id);

response.partitions_status.resize(partition_count);
Expand Down Expand Up @@ -1528,11 +1495,7 @@ void bulk_load_service::try_to_continue_app_bulk_load(
const app_bulk_load_info &ainfo,
const std::unordered_map<int32_t, partition_bulk_load_info> &pinfo_map)
{
std::shared_ptr<app_state> app;
{
zauto_read_lock l(app_lock());
app = _state->get_app(ainfo.app_name);
}
std::shared_ptr<app_state> app = get_app(ainfo.app_name);
// if app is not available, remove bulk load dir
if (app == nullptr || app->status != app_status::AS_AVAILABLE) {
derror_f(
Expand Down
12 changes: 12 additions & 0 deletions src/meta/meta_bulk_load_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,18 @@ class bulk_load_service
///
/// helper functions
///
inline std::shared_ptr<app_state> get_app(const std::string &name)
{
zauto_read_lock l(app_lock());
return _state->get_app(name);
}

inline std::shared_ptr<app_state> get_app(int32_t app_id)
{
zauto_read_lock l(app_lock());
return _state->get_app(app_id);
}

// get bulk_load_info path on file provider
// <remote_root_path>/<cluster_name>/<app_name>/bulk_load_info
inline std::string get_bulk_load_info_path(const std::string &app_name,
Expand Down

0 comments on commit 9546c46

Please sign in to comment.