Skip to content

Commit

Permalink
feat: add rate limiter for backup request (#855)
Browse files Browse the repository at this point in the history
  • Loading branch information
levy5307 authored Jul 14, 2021
1 parent 41b5d3d commit 1275b1b
Show file tree
Hide file tree
Showing 6 changed files with 50 additions and 5 deletions.
1 change: 1 addition & 0 deletions include/dsn/dist/replication/replica_envs.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ class replica_envs
static const std::string BUSINESS_INFO;
static const std::string REPLICA_ACCESS_CONTROLLER_ALLOWED_USERS;
static const std::string READ_QPS_THROTTLING;
static const std::string BACKUP_REQUEST_QPS_THROTTLING;
static const std::string SPLIT_VALIDATE_PARTITION_HASH;
static const std::string USER_SPECIFIED_COMPACTION;
};
Expand Down
1 change: 1 addition & 0 deletions src/common/replication_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -649,6 +649,7 @@ const std::string replica_envs::READ_QPS_THROTTLING("replica.read_throttling");
const std::string
replica_envs::SPLIT_VALIDATE_PARTITION_HASH("replica.split.validate_partition_hash");
const std::string replica_envs::USER_SPECIFIED_COMPACTION("user_specified_compaction");
const std::string replica_envs::BACKUP_REQUEST_QPS_THROTTLING("replica.backup_request_throttling");

const std::string bulk_load_constant::BULK_LOAD_INFO("bulk_load_info");
const int32_t bulk_load_constant::BULK_LOAD_REQUEST_INTERVAL = 10;
Expand Down
4 changes: 3 additions & 1 deletion src/meta/app_env_validator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,9 @@ void app_env_validator::register_all_validators()
std::bind(&check_throttling, std::placeholders::_1, std::placeholders::_2)},
{replica_envs::SPLIT_VALIDATE_PARTITION_HASH,
std::bind(&check_split_validation, std::placeholders::_1, std::placeholders::_2)},
{replica_envs::USER_SPECIFIED_COMPACTION, nullptr}};
{replica_envs::USER_SPECIFIED_COMPACTION, nullptr},
{replica_envs::BACKUP_REQUEST_QPS_THROTTLING,
std::bind(&check_throttling, std::placeholders::_1, std::placeholders::_2)}};
}

} // namespace replication
Expand Down
21 changes: 17 additions & 4 deletions src/replica/replica.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,16 @@ replica::replica(
_counter_recent_read_throttling_reject_count.init_app_counter(
"eon.replica", counter_str.c_str(), COUNTER_TYPE_VOLATILE_NUMBER, counter_str.c_str());

counter_str =
fmt::format("recent.backup.request.throttling.delay.count@{}", _app_info.app_name);
_counter_recent_backup_request_throttling_delay_count.init_app_counter(
"eon.replica", counter_str.c_str(), COUNTER_TYPE_VOLATILE_NUMBER, counter_str.c_str());

counter_str =
fmt::format("recent.backup.request.throttling.reject.count@{}", _app_info.app_name);
_counter_recent_backup_request_throttling_reject_count.init_app_counter(
"eon.replica", counter_str.c_str(), COUNTER_TYPE_VOLATILE_NUMBER, counter_str.c_str());

counter_str = fmt::format("dup.disabled_non_idempotent_write_count@{}", _app_info.app_name);
_counter_dup_disabled_non_idempotent_write_count.init_app_counter(
"eon.replica", counter_str.c_str(), COUNTER_TYPE_VOLATILE_NUMBER, counter_str.c_str());
Expand Down Expand Up @@ -185,13 +195,13 @@ void replica::on_client_read(dsn::message_ex *request, bool ignore_throttling)
return;
}

if (!ignore_throttling && throttle_read_request(request)) {
return;
}

if (!request->is_backup_request()) {
// only backup request is allowed to read from a stale replica

if (!ignore_throttling && throttle_read_request(request)) {
return;
}

if (status() != partition_status::PS_PRIMARY) {
response_client_read(request, ERR_INVALID_STATE);
return;
Expand All @@ -207,6 +217,9 @@ void replica::on_client_read(dsn::message_ex *request, bool ignore_throttling)
return;
}
} else {
if (!ignore_throttling && throttle_backup_request(request)) {
return;
}
_counter_backup_request_qps->increment();
}

Expand Down
4 changes: 4 additions & 0 deletions src/replica/replica.h
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,7 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba
/// return true if request is throttled.
bool throttle_write_request(message_ex *request);
bool throttle_read_request(message_ex *request);
bool throttle_backup_request(message_ex *request);
/// update throttling controllers
/// \see replica::update_app_envs
void update_throttle_envs(const std::map<std::string, std::string> &envs);
Expand Down Expand Up @@ -536,6 +537,7 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba
throttling_controller _write_qps_throttling_controller; // throttling by requests-per-second
throttling_controller _write_size_throttling_controller; // throttling by bytes-per-second
throttling_controller _read_qps_throttling_controller;
throttling_controller _backup_request_qps_throttling_controller;

// duplication
std::unique_ptr<replica_duplicator_manager> _duplication_mgr;
Expand Down Expand Up @@ -563,6 +565,8 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba
perf_counter_wrapper _counter_recent_write_throttling_reject_count;
perf_counter_wrapper _counter_recent_read_throttling_delay_count;
perf_counter_wrapper _counter_recent_read_throttling_reject_count;
perf_counter_wrapper _counter_recent_backup_request_throttling_delay_count;
perf_counter_wrapper _counter_recent_backup_request_throttling_reject_count;
std::vector<perf_counter *> _counters_table_level_latency;
perf_counter_wrapper _counter_dup_disabled_non_idempotent_write_count;
perf_counter_wrapper _counter_backup_request_qps;
Expand Down
24 changes: 24 additions & 0 deletions src/replica/replica_throttle.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,27 @@ bool replica::throttle_read_request(message_ex *request)
return false;
}

bool replica::throttle_backup_request(message_ex *request)
{
int64_t delay_ms = 0;
auto type = _backup_request_qps_throttling_controller.control(
request->header->client.timeout_ms, 1, delay_ms);
if (type != throttling_controller::PASS) {
if (type == throttling_controller::DELAY) {
tasking::enqueue(LPC_read_THROTTLING_DELAY,
&_tracker,
[ this, req = message_ptr(request) ]() { on_client_read(req, true); },
get_gpid().thread_hash(),
std::chrono::milliseconds(delay_ms));
_counter_recent_backup_request_throttling_delay_count->increment();
} else { /** type == throttling_controller::REJECT **/
_counter_recent_backup_request_throttling_reject_count->increment();
}
return true;
}
return false;
}

void replica::update_throttle_envs(const std::map<std::string, std::string> &envs)
{
update_throttle_env_internal(
Expand All @@ -80,6 +101,9 @@ void replica::update_throttle_envs(const std::map<std::string, std::string> &env
envs, replica_envs::WRITE_SIZE_THROTTLING, _write_size_throttling_controller);
update_throttle_env_internal(
envs, replica_envs::READ_QPS_THROTTLING, _read_qps_throttling_controller);
update_throttle_env_internal(envs,
replica_envs::BACKUP_REQUEST_QPS_THROTTLING,
_backup_request_qps_throttling_controller);
}

void replica::update_throttle_env_internal(const std::map<std::string, std::string> &envs,
Expand Down

0 comments on commit 1275b1b

Please sign in to comment.