Introduce bottom-pri thread pool for large universal compactions
Summary:
When we had a single thread pool for compactions, a thread could be busy for a long time (minutes) executing a compaction involving the bottom level. In multi-instance setups, the entire thread pool could be consumed by such bottom-level compactions. Then, top-level compactions (e.g., over a few L0 files) would be blocked for a long time ("head-of-line blocking"). Such top-level compactions are critical to prevent compaction stalls as they can quickly reduce the number of L0 files / sorted runs.

This diff introduces a bottom-priority thread pool for universal compactions that include the bottom level. This alleviates the head-of-line blocking for fast, top-level compactions.

- Added `Env::Priority::BOTTOM` thread pool. This feature is enabled only if the user explicitly configures it to have a positive number of threads (a usage sketch follows this list).
- Changed `ThreadPoolImpl`'s default thread limit from one to zero. This change is invisible to users as we call `IncBackgroundThreadsIfNeeded` on the low-pri/high-pri pools during `DB::Open` with values of at least one. It is necessary, though, for bottom-pri to start with zero threads so the feature is disabled by default.
- Separated `ManualCompaction` into two parts: `ManualCompactionState`, which tracks the state of a manual compaction request, and `PrepickedCompaction`, which is used for any compaction that's picked outside of its execution thread, either manual or automatic.
- Forward universal compactions involving the last level to the bottom pool (the worker thread's entry point is `BGWorkBottomCompaction`).
- Track `bg_bottom_compaction_scheduled_` so we can wait for bottom-level compactions to finish. We don't count them against the background jobs limits, so users of this feature get an extra compaction for free.
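For illustration, here is a minimal sketch of opting in to the new pool via the `Env::SetBackgroundThreads(N, Env::Priority::BOTTOM)` API noted in HISTORY.md. The thread count of two, the DB path, and the surrounding option values are arbitrary assumptions for this example, not part of the change:

```cpp
#include <cassert>

#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/options.h"

int main() {
  rocksdb::Options options;
  options.create_if_missing = true;
  // The bottom-pri pool only receives work from universal compactions.
  options.compaction_style = rocksdb::kCompactionStyleUniversal;

  // Opt in: give the BOTTOM pool a positive thread count. It defaults to
  // zero threads, which leaves the feature disabled.
  options.env->SetBackgroundThreads(2, rocksdb::Env::Priority::BOTTOM);

  rocksdb::DB* db = nullptr;
  rocksdb::Status s =
      rocksdb::DB::Open(options, "/tmp/bottom_pri_example", &db);
  assert(s.ok());

  // ... normal reads/writes. Compactions that include the bottommost sorted
  // run are handed to the BOTTOM pool, so short L0 compactions in the LOW
  // pool are no longer stuck behind them.

  delete db;
  return 0;
}
```

Since bottom-pri jobs aren't counted against the background jobs limits, this effectively adds compaction parallelism on top of `max_background_compactions`.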
Closes facebook#2580

Differential Revision: D5422916

Pulled By: ajkr

fbshipit-source-id: a74bd11f1ea4933df3739b16808bb21fcd512333
ajkr authored and facebook-github-bot committed Aug 3, 2017
1 parent 0b814ba commit cc01985
Showing 15 changed files with 273 additions and 70 deletions.
1 change: 1 addition & 0 deletions HISTORY.md
@@ -3,6 +3,7 @@
 ### New Features
 * Add Iterator::Refresh(), which allows users to update the iterator state so that they can avoid some initialization costs of recreating iterators.
 * Replace dynamic_cast<> (except unit test) so people can choose to build with RTTI off. With make, release mode is by default built with -fno-rtti and debug mode is built without it. Users can override it by setting USE_RTTI=0 or 1.
+* Universal compactions including the bottom level can be executed in a dedicated thread pool. This alleviates head-of-line blocking in the compaction queue, which can cause write stalling, particularly in multi-instance use cases. Users can enable this feature via `Env::SetBackgroundThreads(N, Env::Priority::BOTTOM)`, where `N > 0`.

 ### Bug Fixes
 * Fix wrong latencies in `rocksdb.db.get.micros`, `rocksdb.db.write.micros`, and `rocksdb.sst.read.micros`.
11 changes: 8 additions & 3 deletions db/db_impl.cc
@@ -168,6 +168,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname)
       last_batch_group_size_(0),
       unscheduled_flushes_(0),
       unscheduled_compactions_(0),
+      bg_bottom_compaction_scheduled_(0),
       bg_compaction_scheduled_(0),
       num_running_compactions_(0),
       bg_flush_scheduled_(0),
@@ -242,7 +243,8 @@ void DBImpl::CancelAllBackgroundWork(bool wait) {
     return;
   }
   // Wait for background work to finish
-  while (bg_compaction_scheduled_ || bg_flush_scheduled_) {
+  while (bg_bottom_compaction_scheduled_ || bg_compaction_scheduled_ ||
+         bg_flush_scheduled_) {
     bg_cv_.Wait();
   }
 }
@@ -252,15 +254,18 @@ DBImpl::~DBImpl() {
   // marker. After this we do a variant of the waiting and unschedule work
   // (to consider: moving all the waiting into CancelAllBackgroundWork(true))
   CancelAllBackgroundWork(false);
+  int bottom_compactions_unscheduled =
+      env_->UnSchedule(this, Env::Priority::BOTTOM);
   int compactions_unscheduled = env_->UnSchedule(this, Env::Priority::LOW);
   int flushes_unscheduled = env_->UnSchedule(this, Env::Priority::HIGH);
   mutex_.Lock();
+  bg_bottom_compaction_scheduled_ -= bottom_compactions_unscheduled;
   bg_compaction_scheduled_ -= compactions_unscheduled;
   bg_flush_scheduled_ -= flushes_unscheduled;

   // Wait for background work to finish
-  while (bg_compaction_scheduled_ || bg_flush_scheduled_ ||
-         bg_purge_scheduled_) {
+  while (bg_bottom_compaction_scheduled_ || bg_compaction_scheduled_ ||
+         bg_flush_scheduled_ || bg_purge_scheduled_) {
     TEST_SYNC_POINT("DBImpl::~DBImpl:WaitJob");
     bg_cv_.Wait();
   }
36 changes: 27 additions & 9 deletions db/db_impl.h
@@ -658,6 +658,7 @@ class DBImpl : public DB {
     }
   };

+  struct PrepickedCompaction;
   struct PurgeFileInfo;

   // Recover the descriptor from persistent storage. May do a significant
@@ -799,14 +800,19 @@ class DBImpl : public DB {
   void SchedulePendingPurge(std::string fname, FileType type, uint64_t number,
                             uint32_t path_id, int job_id);
   static void BGWorkCompaction(void* arg);
+  // Runs a pre-chosen universal compaction involving bottom level in a
+  // separate, bottom-pri thread pool.
+  static void BGWorkBottomCompaction(void* arg);
   static void BGWorkFlush(void* db);
   static void BGWorkPurge(void* arg);
   static void UnscheduleCallback(void* arg);
-  void BackgroundCallCompaction(void* arg);
+  void BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction,
+                                Env::Priority bg_thread_pri);
   void BackgroundCallFlush();
   void BackgroundCallPurge();
   Status BackgroundCompaction(bool* madeProgress, JobContext* job_context,
-                              LogBuffer* log_buffer, void* m = 0);
+                              LogBuffer* log_buffer,
+                              PrepickedCompaction* prepicked_compaction);
   Status BackgroundFlush(bool* madeProgress, JobContext* job_context,
                          LogBuffer* log_buffer);

@@ -1059,6 +1065,10 @@ class DBImpl : public DB {
   int unscheduled_flushes_;
   int unscheduled_compactions_;

+  // count how many background compactions are running or have been scheduled in
+  // the BOTTOM pool
+  int bg_bottom_compaction_scheduled_;
+
   // count how many background compactions are running or have been scheduled
   int bg_compaction_scheduled_;

@@ -1075,7 +1085,7 @@ class DBImpl : public DB {
   int bg_purge_scheduled_;

   // Information for a manual compaction
-  struct ManualCompaction {
+  struct ManualCompactionState {
     ColumnFamilyData* cfd;
     int input_level;
     int output_level;
@@ -1091,13 +1101,21 @@ class DBImpl : public DB {
     InternalKey* manual_end;  // how far we are compacting
     InternalKey tmp_storage;  // Used to keep track of compaction progress
     InternalKey tmp_storage1;  // Used to keep track of compaction progress
   };
+  struct PrepickedCompaction {
+    // background compaction takes ownership of `compaction`.
+    Compaction* compaction;
+    // caller retains ownership of `manual_compaction_state` as it is reused
+    // across background compactions.
+    ManualCompactionState* manual_compaction_state;  // nullptr if non-manual
+  };
-  std::deque<ManualCompaction*> manual_compaction_dequeue_;
+  std::deque<ManualCompactionState*> manual_compaction_dequeue_;

   struct CompactionArg {
+    // caller retains ownership of `db`.
     DBImpl* db;
-    ManualCompaction* m;
+    // background compaction takes ownership of `prepicked_compaction`.
+    PrepickedCompaction* prepicked_compaction;
   };

   // Have we encountered a background error in paranoid mode?
@@ -1231,11 +1249,11 @@ class DBImpl : public DB {

   bool HasPendingManualCompaction();
   bool HasExclusiveManualCompaction();
-  void AddManualCompaction(ManualCompaction* m);
-  void RemoveManualCompaction(ManualCompaction* m);
-  bool ShouldntRunManualCompaction(ManualCompaction* m);
+  void AddManualCompaction(ManualCompactionState* m);
+  void RemoveManualCompaction(ManualCompactionState* m);
+  bool ShouldntRunManualCompaction(ManualCompactionState* m);
   bool HaveManualCompaction(ColumnFamilyData* cfd);
-  bool MCOverlap(ManualCompaction* m, ManualCompaction* m1);
+  bool MCOverlap(ManualCompactionState* m, ManualCompactionState* m1);

   size_t GetWalPreallocateBlockSize(uint64_t write_buffer_size) const;
