Skip to content

Commit

Permalink
Make the Env class Customizable (facebook#9293)
Browse files Browse the repository at this point in the history
Summary:
Allows the Env to have options (Configurable) and loads like other Customizable classes.

Pull Request resolved: facebook#9293

Reviewed By: pdillinger, zhichao-cao

Differential Revision: D33181591

Pulled By: mrambacher

fbshipit-source-id: 55e823886c654d214eda9eedd45ccdc54dac14d7
  • Loading branch information
mrambacher authored and facebook-github-bot committed Jan 5, 2022
1 parent 677d2b4 commit fe31dc5
Show file tree
Hide file tree
Showing 31 changed files with 763 additions and 243 deletions.
1 change: 1 addition & 0 deletions HISTORY.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
### Public API change
* Added values to `TraceFilterType`: `kTraceFilterIteratorSeek`, `kTraceFilterIteratorSeekForPrev`, and `kTraceFilterMultiGet`. They can be set in `TraceOptions` to filter out the operation types after which they are named.
* Added `TraceOptions::preserve_write_order`. When enabled it guarantees write records are traced in the same order they are logged to WAL and applied to the DB. By default it is disabled (false) to match the legacy behavior and prevent regression.
* Made the Env class extend the Customizable class. Implementations need to be registered with the ObjectRegistry and to implement a Name() method in order to be created via this method.

## 6.28.0 (2021-12-17)
### New Features
Expand Down
30 changes: 27 additions & 3 deletions db/corruption_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,35 @@
namespace ROCKSDB_NAMESPACE {

static constexpr int kValueSize = 1000;

namespace {
// A wrapper that allows injection of errors.
class ErrorEnv : public EnvWrapper {
public:
bool writable_file_error_;
int num_writable_file_errors_;

explicit ErrorEnv(Env* _target)
: EnvWrapper(_target),
writable_file_error_(false),
num_writable_file_errors_(0) {}
const char* Name() const override { return "ErrorEnv"; }

virtual Status NewWritableFile(const std::string& fname,
std::unique_ptr<WritableFile>* result,
const EnvOptions& soptions) override {
result->reset();
if (writable_file_error_) {
++num_writable_file_errors_;
return Status::IOError(fname, "fake error");
}
return target()->NewWritableFile(fname, result, soptions);
}
};
} // namespace
class CorruptionTest : public testing::Test {
public:
std::shared_ptr<Env> env_guard_;
test::ErrorEnv* env_;
ErrorEnv* env_;
std::string dbname_;
std::shared_ptr<Cache> tiny_cache_;
Options options_;
Expand All @@ -58,7 +82,7 @@ class CorruptionTest : public testing::Test {
EXPECT_OK(
test::CreateEnvFromSystem(ConfigOptions(), &base_env, &env_guard_));
EXPECT_NE(base_env, nullptr);
env_ = new test::ErrorEnv(base_env);
env_ = new ErrorEnv(base_env);
options_.wal_recovery_mode = WALRecoveryMode::kTolerateCorruptedTailRecords;
options_.env = env_;
dbname_ = test::PerThreadDBPath(env_, "corruption_test");
Expand Down
4 changes: 4 additions & 0 deletions db/db_basic_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1048,6 +1048,8 @@ TEST_F(DBBasicTest, MmapAndBufferOptions) {
class TestEnv : public EnvWrapper {
public:
explicit TestEnv(Env* base_env) : EnvWrapper(base_env), close_count(0) {}
static const char* kClassName() { return "TestEnv"; }
const char* Name() const override { return kClassName(); }

class TestLogger : public Logger {
public:
Expand Down Expand Up @@ -3064,6 +3066,8 @@ TEST_P(DBBasicTestWithParallelIO, MultiGetDirectIO) {

public:
FakeDirectIOEnv(Env* env) : EnvWrapper(env) {}
static const char* kClassName() { return "FakeDirectIOEnv"; }
const char* Name() const override { return kClassName(); }

Status NewRandomAccessFile(const std::string& fname,
std::unique_ptr<RandomAccessFile>* result,
Expand Down
3 changes: 3 additions & 0 deletions db/db_secondary_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,9 @@ namespace {
class TraceFileEnv : public EnvWrapper {
public:
explicit TraceFileEnv(Env* _target) : EnvWrapper(_target) {}
static const char* kClassName() { return "TraceFileEnv"; }
const char* Name() const override { return kClassName(); }

Status NewRandomAccessFile(const std::string& f,
std::unique_ptr<RandomAccessFile>* r,
const EnvOptions& env_options) override {
Expand Down
4 changes: 2 additions & 2 deletions db/db_sst_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -789,8 +789,8 @@ TEST_P(DBWALTestWithParam, WALTrashCleanupOnOpen) {
class MyEnv : public EnvWrapper {
public:
MyEnv(Env* t) : EnvWrapper(t), fake_log_delete(false) {}

Status DeleteFile(const std::string& fname) {
const char* Name() const override { return "MyEnv"; }
Status DeleteFile(const std::string& fname) override {
if (fname.find(".log.trash") != std::string::npos && fake_log_delete) {
return Status::OK();
}
Expand Down
3 changes: 3 additions & 0 deletions db/db_test_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,9 @@ class SpecialEnv : public EnvWrapper {
public:
explicit SpecialEnv(Env* base, bool time_elapse_only_sleep = false);

static const char* kClassName() { return "SpecialEnv"; }
const char* Name() const override { return kClassName(); }

Status NewWritableFile(const std::string& f, std::unique_ptr<WritableFile>* r,
const EnvOptions& soptions) override {
class SSTableFile : public WritableFile {
Expand Down
2 changes: 2 additions & 0 deletions db/external_sst_file_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ class ExternalSSTTestEnv : public EnvWrapper {
public:
ExternalSSTTestEnv(Env* t, bool fail_link)
: EnvWrapper(t), fail_link_(fail_link) {}
static const char* kClassName() { return "ExternalSSTTestEnv"; }
const char* Name() const override { return kClassName(); }

Status LinkFile(const std::string& s, const std::string& t) override {
if (fail_link_) {
Expand Down
2 changes: 2 additions & 0 deletions db/listener_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -690,6 +690,8 @@ class TableFileCreationListener : public EventListener {
class TestEnv : public EnvWrapper {
public:
explicit TestEnv(Env* t) : EnvWrapper(t) {}
static const char* kClassName() { return "TestEnv"; }
const char* Name() const override { return kClassName(); }

void SetStatus(Status s) { status_ = s; }

Expand Down
2 changes: 2 additions & 0 deletions db/merge_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ class CountMergeOperator : public AssociativeMergeOperator {
class EnvMergeTest : public EnvWrapper {
public:
EnvMergeTest() : EnvWrapper(Env::Default()) {}
static const char* kClassName() { return "MergeEnv"; }
const char* Name() const override { return kClassName(); }
// ~EnvMergeTest() override {}

uint64_t NowNanos() override {
Expand Down
2 changes: 2 additions & 0 deletions db_stress_tool/db_stress_env_wrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ namespace ROCKSDB_NAMESPACE {
class DbStressEnvWrapper : public EnvWrapper {
public:
explicit DbStressEnvWrapper(Env* t) : EnvWrapper(t) {}
static const char* kClassName() { return "DbStressEnv"; }
const char* Name() const override { return kClassName(); }

Status DeleteFile(const std::string& f) override {
// We determine whether it is a manifest file by searching a strong,
Expand Down
2 changes: 2 additions & 0 deletions db_stress_tool/multi_ops_txns_stress.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ namespace ROCKSDB_NAMESPACE {

// TODO: move these to gflags.
static constexpr uint32_t kInitNumC = 1000;
#ifndef ROCKSDB_LITE
static constexpr uint32_t kInitialCARatio = 3;
#endif // ROCKSDB_LITE
static constexpr bool kDoPreload = true;

std::string MultiOpsTxnsStressTest::Record::EncodePrimaryKey(uint32_t a) {
Expand Down
81 changes: 81 additions & 0 deletions env/composite_env.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
// (found in the LICENSE.Apache file in the root directory).
//
#include "env/composite_env_wrapper.h"
#include "rocksdb/utilities/options_type.h"

namespace ROCKSDB_NAMESPACE {
namespace {
Expand Down Expand Up @@ -380,4 +381,84 @@ Status CompositeEnv::NewDirectory(const std::string& name,
return status;
}

namespace {
static std::unordered_map<std::string, OptionTypeInfo>
composite_env_wrapper_type_info = {
#ifndef ROCKSDB_LITE
{"target",
{0, OptionType::kCustomizable, OptionVerificationType::kByName,
OptionTypeFlags::kDontSerialize | OptionTypeFlags::kRawPointer,
[](const ConfigOptions& opts, const std::string& /*name*/,
const std::string& value, void* addr) {
auto target = static_cast<EnvWrapper::Target*>(addr);
return Env::CreateFromString(opts, value, &(target->env),
&(target->guard));
},
nullptr, nullptr}},
#endif // ROCKSDB_LITE
};
static std::unordered_map<std::string, OptionTypeInfo>
composite_fs_wrapper_type_info = {
#ifndef ROCKSDB_LITE
{"file_system",
OptionTypeInfo::AsCustomSharedPtr<FileSystem>(
0, OptionVerificationType::kByName, OptionTypeFlags::kNone)},
#endif // ROCKSDB_LITE
};

static std::unordered_map<std::string, OptionTypeInfo>
composite_clock_wrapper_type_info = {
#ifndef ROCKSDB_LITE
{"clock",
OptionTypeInfo::AsCustomSharedPtr<SystemClock>(
0, OptionVerificationType::kByName, OptionTypeFlags::kNone)},
#endif // ROCKSDB_LITE
};

} // namespace

std::unique_ptr<Env> NewCompositeEnv(const std::shared_ptr<FileSystem>& fs) {
return std::unique_ptr<Env>(new CompositeEnvWrapper(Env::Default(), fs));
}

CompositeEnvWrapper::CompositeEnvWrapper(Env* env,
const std::shared_ptr<FileSystem>& fs,
const std::shared_ptr<SystemClock>& sc)
: CompositeEnv(fs, sc), target_(env) {
RegisterOptions("", &target_, &composite_env_wrapper_type_info);
RegisterOptions("", &file_system_, &composite_fs_wrapper_type_info);
RegisterOptions("", &system_clock_, &composite_clock_wrapper_type_info);
}

CompositeEnvWrapper::CompositeEnvWrapper(const std::shared_ptr<Env>& env,
const std::shared_ptr<FileSystem>& fs,
const std::shared_ptr<SystemClock>& sc)
: CompositeEnv(fs, sc), target_(env) {
RegisterOptions("", &target_, &composite_env_wrapper_type_info);
RegisterOptions("", &file_system_, &composite_fs_wrapper_type_info);
RegisterOptions("", &system_clock_, &composite_clock_wrapper_type_info);
}

Status CompositeEnvWrapper::PrepareOptions(const ConfigOptions& options) {
target_.Prepare();
if (file_system_ == nullptr) {
file_system_ = target_.env->GetFileSystem();
}
if (system_clock_ == nullptr) {
system_clock_ = target_.env->GetSystemClock();
}
return Env::PrepareOptions(options);
}

#ifndef ROCKSDB_LITE
std::string CompositeEnvWrapper::SerializeOptions(
const ConfigOptions& config_options, const std::string& header) const {
auto options = CompositeEnv::SerializeOptions(config_options, header);
if (target_.env != nullptr && target_.env != Env::Default()) {
options.append("target=");
options.append(target_.env->ToString(config_options));
}
return options;
}
#endif // ROCKSDB_LITE
} // namespace ROCKSDB_NAMESPACE
72 changes: 51 additions & 21 deletions env/composite_env_wrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -254,89 +254,119 @@ class CompositeEnvWrapper : public CompositeEnv {
public:
// Initialize a CompositeEnvWrapper that delegates all thread/time related
// calls to env, and all file operations to fs
explicit CompositeEnvWrapper(Env* env)
: CompositeEnvWrapper(env, env->GetFileSystem(), env->GetSystemClock()) {}
explicit CompositeEnvWrapper(Env* env, const std::shared_ptr<FileSystem>& fs)
: CompositeEnvWrapper(env, fs, env->GetSystemClock()) {}

explicit CompositeEnvWrapper(Env* env, const std::shared_ptr<SystemClock>& sc)
: CompositeEnvWrapper(env, env->GetFileSystem(), sc) {}

explicit CompositeEnvWrapper(Env* env, const std::shared_ptr<FileSystem>& fs,
const std::shared_ptr<SystemClock>& sc);

explicit CompositeEnvWrapper(const std::shared_ptr<Env>& env,
const std::shared_ptr<FileSystem>& fs)
: CompositeEnvWrapper(env, fs, env->GetSystemClock()) {}

explicit CompositeEnvWrapper(const std::shared_ptr<Env>& env,
const std::shared_ptr<SystemClock>& sc)
: CompositeEnv(fs, sc), env_target_(env) {}
: CompositeEnvWrapper(env, env->GetFileSystem(), sc) {}

explicit CompositeEnvWrapper(const std::shared_ptr<Env>& env,
const std::shared_ptr<FileSystem>& fs,
const std::shared_ptr<SystemClock>& sc);

static const char* kClassName() { return "CompositeEnv"; }
const char* Name() const override { return kClassName(); }
bool IsInstanceOf(const std::string& name) const override {
if (name == kClassName()) {
return true;
} else {
return CompositeEnv::IsInstanceOf(name);
}
}
const Customizable* Inner() const override { return target_.env; }

Status PrepareOptions(const ConfigOptions& options) override;
#ifndef ROCKSDB_LITE
std::string SerializeOptions(const ConfigOptions& config_options,
const std::string& header) const override;
#endif // ROCKSDB_LITE

// Return the target to which this Env forwards all calls
Env* env_target() const { return env_target_; }
Env* env_target() const { return target_.env; }

#if !defined(OS_WIN) && !defined(ROCKSDB_NO_DYNAMIC_EXTENSION)
Status LoadLibrary(const std::string& lib_name,
const std::string& search_path,
std::shared_ptr<DynamicLibrary>* result) override {
return env_target_->LoadLibrary(lib_name, search_path, result);
return target_.env->LoadLibrary(lib_name, search_path, result);
}
#endif

void Schedule(void (*f)(void* arg), void* a, Priority pri,
void* tag = nullptr, void (*u)(void* arg) = nullptr) override {
return env_target_->Schedule(f, a, pri, tag, u);
return target_.env->Schedule(f, a, pri, tag, u);
}

int UnSchedule(void* tag, Priority pri) override {
return env_target_->UnSchedule(tag, pri);
return target_.env->UnSchedule(tag, pri);
}

void StartThread(void (*f)(void*), void* a) override {
return env_target_->StartThread(f, a);
return target_.env->StartThread(f, a);
}
void WaitForJoin() override { return env_target_->WaitForJoin(); }
void WaitForJoin() override { return target_.env->WaitForJoin(); }
unsigned int GetThreadPoolQueueLen(Priority pri = LOW) const override {
return env_target_->GetThreadPoolQueueLen(pri);
return target_.env->GetThreadPoolQueueLen(pri);
}

Status GetHostName(char* name, uint64_t len) override {
return env_target_->GetHostName(name, len);
return target_.env->GetHostName(name, len);
}
void SetBackgroundThreads(int num, Priority pri) override {
return env_target_->SetBackgroundThreads(num, pri);
return target_.env->SetBackgroundThreads(num, pri);
}
int GetBackgroundThreads(Priority pri) override {
return env_target_->GetBackgroundThreads(pri);
return target_.env->GetBackgroundThreads(pri);
}

Status SetAllowNonOwnerAccess(bool allow_non_owner_access) override {
return env_target_->SetAllowNonOwnerAccess(allow_non_owner_access);
return target_.env->SetAllowNonOwnerAccess(allow_non_owner_access);
}

void IncBackgroundThreadsIfNeeded(int num, Priority pri) override {
return env_target_->IncBackgroundThreadsIfNeeded(num, pri);
return target_.env->IncBackgroundThreadsIfNeeded(num, pri);
}

void LowerThreadPoolIOPriority(Priority pool) override {
env_target_->LowerThreadPoolIOPriority(pool);
target_.env->LowerThreadPoolIOPriority(pool);
}

void LowerThreadPoolCPUPriority(Priority pool) override {
env_target_->LowerThreadPoolCPUPriority(pool);
target_.env->LowerThreadPoolCPUPriority(pool);
}

Status LowerThreadPoolCPUPriority(Priority pool, CpuPriority pri) override {
return env_target_->LowerThreadPoolCPUPriority(pool, pri);
return target_.env->LowerThreadPoolCPUPriority(pool, pri);
}

Status GetThreadList(std::vector<ThreadStatus>* thread_list) override {
return env_target_->GetThreadList(thread_list);
return target_.env->GetThreadList(thread_list);
}

ThreadStatusUpdater* GetThreadStatusUpdater() const override {
return env_target_->GetThreadStatusUpdater();
return target_.env->GetThreadStatusUpdater();
}

uint64_t GetThreadID() const override { return env_target_->GetThreadID(); }
uint64_t GetThreadID() const override { return target_.env->GetThreadID(); }

std::string GenerateUniqueId() override {
return env_target_->GenerateUniqueId();
return target_.env->GenerateUniqueId();
}

private:
Env* env_target_;
EnvWrapper::Target target_;
};
} // namespace ROCKSDB_NAMESPACE
Loading

0 comments on commit fe31dc5

Please sign in to comment.