Skip to content

Commit

Permalink
feat(make_idempotent): support making incr request idempotent in `p…
Browse files Browse the repository at this point in the history
…egasus_write_service::impl` (#2185)

Implement two APIs making `incr` request idempotent in `pegasus_write_service::impl`:

- translate an `incr` request (non-idempotent) into a single-put request (idempotent),
and handle the errors that may occur during the translation, e.g. a failure to read the base
value for the increment from the RocksDB instance;
- apply the single-put request into the RocksDB instance and make response for `incr`.
  • Loading branch information
empiredan authored Feb 5, 2025
1 parent 2222230 commit 8f75b52
Show file tree
Hide file tree
Showing 7 changed files with 530 additions and 90 deletions.
15 changes: 15 additions & 0 deletions idl/rrdb.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,26 @@ enum mutate_operation
MO_DELETE
}

// The type of a single-put request, carried by the optional `type` field of `update_request`.
enum update_type
{
// A general single-put request.
UT_PUT,
// A single-put request translated from a non-idempotent incr request.
UT_INCR
}

// The single-put request, just writes a key/value pair into storage, which is certainly
// idempotent.
struct update_request
{
// The key to be written.
1:dsn.blob key;
// The value to be written for `key`.
2:dsn.blob value;
// NOTE(review): presumably the expiration timestamp (in seconds) applied to the written
// key -- confirm the meaning of non-positive values against the server-side handling.
3:i32 expire_ts_seconds;

// This field marks the type of a single-put request, mainly used to differentiate a general
// single-put request from the one translated from a non-idempotent atomic write request:
// - a general single-put request, if `type` is UT_PUT or not set by default as it's
// optional, or
// - a put request translated from a non-idempotent incr request, if `type` is UT_INCR.
4:optional update_type type;
}

struct update_response
Expand Down
2 changes: 1 addition & 1 deletion src/server/pegasus_server_write.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ class pegasus_server_write : public dsn::replication::replica_base

friend class pegasus_server_write_test;
friend class pegasus_write_service_test;
friend class pegasus_write_service_impl_test;
friend class PegasusWriteServiceImplTest;
friend class rocksdb_wrapper_test;

std::unique_ptr<pegasus_write_service> _write_svc;
Expand Down
3 changes: 1 addition & 2 deletions src/server/pegasus_write_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -189,9 +189,8 @@ class pegasus_write_service : dsn::replication::replica_base
private:
void clear_up_batch_states();

private:
friend class pegasus_write_service_test;
friend class pegasus_write_service_impl_test;
friend class PegasusWriteServiceImplTest;
friend class pegasus_server_write_test;
friend class rocksdb_wrapper_test;

Expand Down
178 changes: 173 additions & 5 deletions src/server/pegasus_write_service_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,99 @@ class pegasus_write_service::impl : public dsn::replication::replica_base
return resp.error;
}

// Translate an incr request, which is non-idempotent, into a single-put request, which is
// idempotent.
//
// Parameters:
// - req: the incr request to be translated.
// - err_resp: the incr response, filled only if the translation fails.
// - update: the single-put request, filled only if the translation succeeds.
//
// Return current status for RocksDB:
// - kOk, if `update` has been built successfully;
// - the error returned while reading the current value of the key, if that read fails;
// - kInvalidArgument, if the current value is not a valid int64, or if the new value is
//   out of the range of int64 (in which case `err_resp.new_value` is set to the current
//   value).
int make_idempotent(const dsn::apps::incr_request &req,
                    dsn::apps::incr_response &err_resp,
                    dsn::apps::update_request &update)
{
    // Get current raw value for the provided key from the RocksDB instance.
    db_get_context get_ctx;
    const int err = _rocksdb_wrapper->get(req.key.to_string_view(), &get_ctx);
    if (dsn_unlikely(err != rocksdb::Status::kOk)) {
        return make_error_response(err, err_resp);
    }

    if (!get_ctx.found || get_ctx.expired) {
        // Once the provided key is not found or has been expired, we could assume that
        // its value is 0 before incr; thus the final result for incr could be set as
        // the value of the single-put request, i.e. req.increment.
        return make_idempotent_request_for_incr(
            req.key, req.increment, calc_expire_on_non_existent(req), update);
    }

    // Extract user data from raw value as base for increment.
    dsn::blob base_value;
    pegasus_extract_user_data(_pegasus_data_version, std::move(get_ctx.raw_value), base_value);

    int64_t new_int = 0;
    if (base_value.empty()) {
        // Old value is also considered as 0 before incr as above once it's empty, thus
        // set req.increment as the value for single put.
        new_int = req.increment;
    } else {
        int64_t base_int = 0;
        if (dsn_unlikely(!dsn::buf2int64(base_value.to_string_view(), base_int))) {
            // Old value is not valid int64.
            LOG_ERROR_PREFIX("incr failed: error = base value \"{}\" "
                             "is not an integer or out of range",
                             utils::c_escape_sensitive_string(base_value));
            return make_error_response(rocksdb::Status::kInvalidArgument, err_resp);
        }

        // Perform the addition in unsigned arithmetic: an out-of-range sum then wraps
        // around with well-defined behavior, whereas overflowing a signed addition is
        // undefined behavior and would allow the compiler to drop the range check below.
        // Converting the wrapped sum back to int64_t is modular on mainstream compilers
        // (and guaranteed to be so since C++20).
        new_int = static_cast<int64_t>(static_cast<uint64_t>(base_int) +
                                       static_cast<uint64_t>(req.increment));
        if (dsn_unlikely((req.increment > 0 && new_int < base_int) ||
                         (req.increment < 0 && new_int > base_int))) {
            // New value overflows, just respond with the base value.
            LOG_ERROR_PREFIX("incr failed: error = new value is out of range, "
                             "base_value = {}, increment = {}, new_value = {}",
                             base_int,
                             req.increment,
                             new_int);
            return make_error_response(rocksdb::Status::kInvalidArgument, base_int, err_resp);
        }
    }

    return make_idempotent_request_for_incr(
        req.key, new_int, calc_expire_on_existing(req, get_ctx), update);
}

// Write the single-put request that was translated from an incr request into RocksDB, then
// fill the incr response accordingly. Return current status for RocksDB, which is also
// stored in `resp.error`; `resp.new_value` is set only if the write succeeds.
int put(const db_write_context &ctx,
        const dsn::apps::update_request &update,
        dsn::apps::incr_response &resp)
{
    // Fill the fields of the response that do not depend on the result of the write.
    const auto pid = get_gpid();
    resp.server = _primary_host_port;
    resp.decree = ctx.decree;
    resp.partition_index = pid.get_partition_index();
    resp.app_id = pid.get_app_id();

    // The write batch is cleared on every exit path of this function.
    auto batch_guard = dsn::defer([this]() { _rocksdb_wrapper->clear_up_write_batch(); });

    // Stage the key/value pair into the write batch, and commit the batch only if the
    // staging succeeded.
    const auto expire_ts = static_cast<uint32_t>(update.expire_ts_seconds);
    resp.error = _rocksdb_wrapper->write_batch_put_ctx(
        ctx, update.key.to_string_view(), update.value.to_string_view(), expire_ts);
    if (resp.error == rocksdb::Status::kOk) {
        resp.error = _rocksdb_wrapper->write(ctx.decree);
    }

    if (dsn_unlikely(resp.error != rocksdb::Status::kOk)) {
        return resp.error;
    }

    // The value of the request is expected to always be a valid int64, hence parsing it
    // as the new value for the response must not fail.
    CHECK(dsn::buf2int64(update.value.to_string_view(), resp.new_value),
          "invalid int64 value for put idempotent incr: key={}, value={}",
          update.key,
          update.value);

    return resp.error;
}

int incr(int64_t decree, const dsn::apps::incr_request &update, dsn::apps::incr_response &resp)
{
resp.app_id = get_gpid().get_app_id();
Expand Down Expand Up @@ -242,14 +335,15 @@ class pegasus_write_service::impl : public dsn::replication::replica_base
auto cleanup = dsn::defer([this]() { _rocksdb_wrapper->clear_up_write_batch(); });
resp.error = _rocksdb_wrapper->write_batch_put(
decree, update.key.to_string_view(), std::to_string(new_value), new_expire_ts);
if (resp.error) {
if (resp.error != rocksdb::Status::kOk) {
return resp.error;
}

resp.error = _rocksdb_wrapper->write(decree);
if (resp.error == rocksdb::Status::kOk) {
resp.new_value = new_value;
}

return resp.error;
}

Expand Down Expand Up @@ -569,6 +663,83 @@ class pegasus_write_service::impl : public dsn::replication::replica_base
return raw_key;
}

// Decide the expire timestamp (in seconds) for a key that is not present in the storage:
// a positive `req.expire_ts_seconds` is used as is, anything else results in 0.
template <typename TRequest>
static inline int32_t calc_expire_on_non_existent(const TRequest &req)
{
    if (req.expire_ts_seconds > 0) {
        return req.expire_ts_seconds;
    }

    return 0;
}

// Decide the new expire timestamp (in seconds) for a key that is present in the storage,
// based on `req.expire_ts_seconds`:
// - positive: use it as the new expire timestamp;
// - negative: reset the expire timestamp to 0;
// - zero: keep the current expire timestamp of the key, i.e. `get_ctx.expire_ts`.
template <typename TRequest>
static inline int32_t calc_expire_on_existing(const TRequest &req,
                                              const db_get_context &get_ctx)
{
    const auto requested_ts = req.expire_ts_seconds;

    if (requested_ts > 0) {
        return requested_ts;
    }

    if (requested_ts < 0) {
        return 0;
    }

    return static_cast<int32_t>(get_ctx.expire_ts);
}

// Fill `update` as a single-put request of the given `type`, whose value is the blob
// created from the provided int64 `value`.
static inline void make_idempotent_request(const dsn::blob &key,
                                           int64_t value,
                                           int32_t expire_ts_seconds,
                                           dsn::apps::update_type::type type,
                                           dsn::apps::update_request &update)
{
    update.__set_type(type);
    update.expire_ts_seconds = expire_ts_seconds;
    update.value = dsn::blob::create_from_numeric(value);
    update.key = key;
}

// Fill `update` as the single-put request (marked as UT_INCR) that corresponds to an incr
// request. Always return kOk as the current status for RocksDB.
static inline int make_idempotent_request_for_incr(const dsn::blob &key,
                                                   int64_t value,
                                                   int32_t expire_ts_seconds,
                                                   dsn::apps::update_request &update)
{
    const auto incr_type = dsn::apps::update_type::UT_INCR;
    make_idempotent_request(key, value, expire_ts_seconds, incr_type, update);

    return rocksdb::Status::kOk;
}

// Fill `resp` as an incr response that reports the error `err`, which must not be kOk.
// Return `err` as the current error status for RocksDB.
inline int make_error_response(int err, dsn::apps::incr_response &resp)
{
    CHECK(err != rocksdb::Status::kOk, "this incr response is built only for error");

    const auto pid = get_gpid();
    resp.app_id = pid.get_app_id();
    resp.partition_index = pid.get_partition_index();
    resp.server = _primary_host_port;

    // No valid decree has been assigned to the mutation yet, hence use -1.
    resp.decree = -1;

    resp.error = err;
    return err;
}

// Same as the two-argument make_error_response(), except that `resp.new_value` is also
// set to `new_value`.
inline int make_error_response(int err, int64_t new_value, dsn::apps::incr_response &resp)
{
    resp.new_value = new_value;

    const int status = make_error_response(err, resp);
    return status;
}

// return true if the check type is supported
static bool is_check_type_supported(::dsn::apps::cas_check_type::type check_type)
{
Expand Down Expand Up @@ -678,13 +849,10 @@ class pegasus_write_service::impl : public dsn::replication::replica_base
return false;
}

private:
friend class pegasus_write_service_test;
friend class pegasus_server_write_test;
friend class pegasus_write_service_impl_test;
friend class PegasusWriteServiceImplTest;
friend class rocksdb_wrapper_test;
FRIEND_TEST(pegasus_write_service_impl_test, put_verify_timetag);
FRIEND_TEST(pegasus_write_service_impl_test, verify_timetag_compatible_with_version_0);

const std::string _primary_host_port;
const uint32_t _pegasus_data_version;
Expand Down
13 changes: 9 additions & 4 deletions src/server/rocksdb_wrapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,20 +79,25 @@ int rocksdb_wrapper::get(std::string_view raw_key, /*out*/ db_get_context *ctx)
{
FAIL_POINT_INJECT_F("db_get", [](std::string_view) -> int { return FAIL_DB_GET; });

rocksdb::Status s =
const rocksdb::Status s =
_db->Get(_rd_opts, _data_cf, utils::to_rocksdb_slice(raw_key), &ctx->raw_value);
if (dsn_likely(s.ok())) {
// success
// The key is found and its value is read successfully.
ctx->found = true;
ctx->expire_ts = pegasus_extract_expire_ts(_pegasus_data_version, ctx->raw_value);
if (check_if_ts_expired(utils::epoch_now(), ctx->expire_ts)) {
ctx->expired = true;
METRIC_VAR_INCREMENT(read_expired_values);
} else {
ctx->expired = false;
}
return rocksdb::Status::kOk;
} else if (s.IsNotFound()) {
// NotFound is an acceptable error
}

if (s.IsNotFound()) {
// NotFound is considered normal since the key may not be present in DB now.
ctx->found = false;
ctx->expired = false;
return rocksdb::Status::kOk;
}

Expand Down
Loading

0 comments on commit 8f75b52

Please sign in to comment.