Skip to content

Commit 39c2937

Browse files
hx235 authored and facebook-github-bot committed
Add SetAllowStall() (facebook#11335)
Summary: **Context/Summary:** - Allow runtime changes to whether `WriteBufferManager` allows stall or not by calling `SetAllowStall()` - Misc: some clean up - see PR conversation Pull Request resolved: facebook#11335 Test Plan: - New UT Reviewed By: akankshamahajan15 Differential Revision: D44502555 Pulled By: hx235 fbshipit-source-id: 24b5cc57df7734b11d42e4870c06c87b95312b5e
1 parent 0efd7b4 commit 39c2937

File tree

4 files changed

+81
-11
lines changed

4 files changed

+81
-11
lines changed

HISTORY.md

+1
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
### New Features
1111
* Add experimental `PerfContext` counters `iter_{next|prev|seek}_count` for db iterator, each counting the times of corresponding API being called.
12+
* Allow runtime changes to whether `WriteBufferManager` allows stall or not by calling `SetAllowStall()`
1213

1314
## 8.1.0 (03/18/2023)
1415
### Behavior changes

db/db_write_buffer_manager_test.cc

+67
Original file line numberDiff line numberDiff line change
@@ -846,6 +846,73 @@ TEST_P(DBWriteBufferManagerTest, StopSwitchingMemTablesOnceFlushing) {
846846
delete shared_wbm_db;
847847
}
848848

849+
TEST_F(DBWriteBufferManagerTest, RuntimeChangeableAllowStall) {
850+
constexpr int kBigValue = 10000;
851+
852+
Options options = CurrentOptions();
853+
options.write_buffer_manager.reset(
854+
new WriteBufferManager(1, nullptr /* cache */, true /* allow_stall */));
855+
DestroyAndReopen(options);
856+
857+
// Pause flush thread so that
858+
// (a) the only way to exit the write stall below is to change `allow_stall`
859+
// (b) the write stall is "stable" without being interfered by flushes so that
860+
// we can check it without flakiness
861+
std::unique_ptr<test::SleepingBackgroundTask> sleeping_task(
862+
new test::SleepingBackgroundTask());
863+
env_->SetBackgroundThreads(1, Env::HIGH);
864+
env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask,
865+
sleeping_task.get(), Env::Priority::HIGH);
866+
sleeping_task->WaitUntilSleeping();
867+
868+
// Test 1: test setting `allow_stall` from true to false
869+
//
870+
// Assert existence of a write stall
871+
WriteOptions wo_no_slowdown;
872+
wo_no_slowdown.no_slowdown = true;
873+
Status s = Put(Key(0), DummyString(kBigValue), wo_no_slowdown);
874+
ASSERT_TRUE(s.IsIncomplete());
875+
ASSERT_TRUE(s.ToString().find("Write stall") != std::string::npos);
876+
877+
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
878+
{{"WBMStallInterface::BlockDB",
879+
"DBWriteBufferManagerTest::RuntimeChangeableThreadSafeParameters::"
880+
"ChangeParameter"}});
881+
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
882+
883+
// Test `SetAllowStall()`
884+
port::Thread thread1([&] { ASSERT_OK(Put(Key(0), DummyString(kBigValue))); });
885+
port::Thread thread2([&] {
886+
TEST_SYNC_POINT(
887+
"DBWriteBufferManagerTest::RuntimeChangeableThreadSafeParameters::"
888+
"ChangeParameter");
889+
options.write_buffer_manager->SetAllowStall(false);
890+
});
891+
892+
// Verify `allow_stall` is successfully set to false in thread2.
893+
// Otherwise, thread1's write will be stalled and this test will hang
894+
// forever.
895+
thread1.join();
896+
thread2.join();
897+
898+
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
899+
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
900+
901+
// Test 2: test setting `allow_stall` from false to true
902+
//
903+
// Assert no write stall
904+
ASSERT_OK(Put(Key(0), DummyString(kBigValue), wo_no_slowdown));
905+
906+
// Test `SetAllowStall()`
907+
options.write_buffer_manager->SetAllowStall(true);
908+
909+
// Verify `allow_stall` is successfully set to true.
910+
// Otherwise the following write will not be stalled and therefore succeed.
911+
s = Put(Key(0), DummyString(kBigValue), wo_no_slowdown);
912+
ASSERT_TRUE(s.IsIncomplete());
913+
ASSERT_TRUE(s.ToString().find("Write stall") != std::string::npos);
914+
sleeping_task->WakeUp();
915+
}
849916

850917
INSTANTIATE_TEST_CASE_P(DBWriteBufferManagerTest, DBWriteBufferManagerTest,
851918
testing::Bool());

include/rocksdb/write_buffer_manager.h

+9-2
Original file line numberDiff line numberDiff line change
@@ -81,13 +81,20 @@ class WriteBufferManager final {
8181
return buffer_size_.load(std::memory_order_relaxed);
8282
}
8383

84+
// REQUIRED: `new_size` > 0
8485
void SetBufferSize(size_t new_size) {
86+
assert(new_size > 0);
8587
buffer_size_.store(new_size, std::memory_order_relaxed);
8688
mutable_limit_.store(new_size * 7 / 8, std::memory_order_relaxed);
8789
// Check if stall is active and can be ended.
8890
MaybeEndWriteStall();
8991
}
9092

93+
void SetAllowStall(bool new_allow_stall) {
94+
allow_stall_.store(new_allow_stall, std::memory_order_relaxed);
95+
MaybeEndWriteStall();
96+
}
97+
9198
// Below functions should be called by RocksDB internally.
9299

93100
// Should only be called from write thread
@@ -117,7 +124,7 @@ class WriteBufferManager final {
117124
//
118125
// Should only be called by RocksDB internally .
119126
bool ShouldStall() const {
120-
if (!allow_stall_ || !enabled()) {
127+
if (!allow_stall_.load(std::memory_order_relaxed) || !enabled()) {
121128
return false;
122129
}
123130

@@ -165,7 +172,7 @@ class WriteBufferManager final {
165172
std::list<StallInterface*> queue_;
166173
// Protects the queue_ and stall_active_.
167174
std::mutex mu_;
168-
bool allow_stall_;
175+
std::atomic<bool> allow_stall_;
169176
// Value should only be changed by BeginWriteStall() and MaybeEndWriteStall()
170177
// while holding mu_, but it can be read without a lock.
171178
std::atomic<bool> stall_active_;

memtable/write_buffer_manager.cc

+4-9
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,6 @@ void WriteBufferManager::FreeMemWithCache(size_t mem) {
117117

118118
void WriteBufferManager::BeginWriteStall(StallInterface* wbm_stall) {
119119
assert(wbm_stall != nullptr);
120-
assert(allow_stall_);
121120

122121
// Allocate outside of the lock.
123122
std::list<StallInterface*> new_node = {wbm_stall};
@@ -140,16 +139,12 @@ void WriteBufferManager::BeginWriteStall(StallInterface* wbm_stall) {
140139

141140
// Called when memory is freed in FreeMem or the buffer size has changed.
142141
void WriteBufferManager::MaybeEndWriteStall() {
143-
// Cannot early-exit on !enabled() because SetBufferSize(0) needs to unblock
144-
// the writers.
145-
if (!allow_stall_) {
142+
// Stall conditions have not been resolved.
143+
if (allow_stall_.load(std::memory_order_relaxed) &&
144+
IsStallThresholdExceeded()) {
146145
return;
147146
}
148147

149-
if (IsStallThresholdExceeded()) {
150-
return; // Stall conditions have not resolved.
151-
}
152-
153148
// Perform all deallocations outside of the lock.
154149
std::list<StallInterface*> cleanup;
155150

@@ -174,7 +169,7 @@ void WriteBufferManager::RemoveDBFromQueue(StallInterface* wbm_stall) {
174169
// Deallocate the removed nodes outside of the lock.
175170
std::list<StallInterface*> cleanup;
176171

177-
if (enabled() && allow_stall_) {
172+
if (enabled() && allow_stall_.load(std::memory_order_relaxed)) {
178173
std::unique_lock<std::mutex> lock(mu_);
179174
for (auto it = queue_.begin(); it != queue_.end();) {
180175
auto next = std::next(it);

0 commit comments

Comments
 (0)