Skip to content

Commit 94e3bee

Browse files
pdillingerfacebook-github-bot
authored andcommitted
Cleanup, improve, stress test LockWAL() (facebook#11143)
Summary: The previous API comments for LockWAL didn't provide much about why you might want to use it, and didn't really meet what one would infer its contract was. Also, LockWAL was not in db_stress / crash test. In this change: * Implement a counting semantics for LockWAL()+UnlockWAL(), so that they can safely be used concurrently across threads or recursively within a thread. This should make the API much less bug-prone and easier to use. * Make sure no UnlockWAL() is needed after non-OK LockWAL() (to match RocksDB conventions) * Make UnlockWAL() reliably return non-OK when there's no matching LockWAL() (for debug-ability) * Clarify API comments on LockWAL(), UnlockWAL(), FlushWAL(), and SyncWAL(). Their exact meanings are not obvious, and I don't think it's appropriate to talk about implementation mutexes in the API comments, but about what operations might block each other. * Add LockWAL()/UnlockWAL() to db_stress and crash test, mostly to check for assertion failures, but also checks that latest seqno doesn't change while WAL is locked. This is simpler to add when LockWAL() is allowed in multiple threads. * Remove unnecessary use of sync points in test DBWALTest::LockWal. There was a bug during development of above changes that caused this test to fail sporadically, with and without this sync point change. Pull Request resolved: facebook#11143 Test Plan: unit tests added / updated, added to stress/crash test Reviewed By: ajkr Differential Revision: D42848627 Pulled By: pdillinger fbshipit-source-id: 6d976c51791941a31fd8fbf28b0f82e888d9f4b4
1 parent 36174d8 commit 94e3bee

File tree

12 files changed

+257
-66
lines changed

12 files changed

+257
-66
lines changed

HISTORY.md

+1
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
### Bug Fixes
88
* Fixed a data race on `ColumnFamilyData::flush_reason` caused by concurrent flushes.
99
* Fixed an issue in `Get` and `MultiGet` when user-defined timestamps is enabled in combination with BlobDB.
10+
* Fixed some atypical behaviors for `LockWAL()` such as allowing concurrent/recursive use and not expecting `UnlockWAL()` after non-OK result. See API comments.
1011

1112
### Feature Removal
1213
* Remove RocksDB Lite.

buckifier/buckify_rocksdb.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
# $python3 buckifier/buckify_rocksdb.py \
2727
# '{"fake": {
2828
# "extra_deps": [":test_dep", "//fakes/module:mock1"],
29-
# "extra_compiler_flags": ["-DROCKSDB_LITE", "-Os"]
29+
# "extra_compiler_flags": ["-DFOO_BAR", "-Os"]
3030
# }
3131
# }'
3232
# (Generated TARGETS file has test_dep and mock1 as dependencies for RocksDB

db/db_impl/db_impl.cc

+45-21
Original file line numberDiff line numberDiff line change
@@ -243,7 +243,8 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname,
243243
atomic_flush_install_cv_(&mutex_),
244244
blob_callback_(immutable_db_options_.sst_file_manager.get(), &mutex_,
245245
&error_handler_, &event_logger_,
246-
immutable_db_options_.listeners, dbname_) {
246+
immutable_db_options_.listeners, dbname_),
247+
lock_wal_count_(0) {
247248
// !batch_per_trx_ implies seq_per_batch_ because it is only unset for
248249
// WriteUnprepared, which should use seq_per_batch_.
249250
assert(batch_per_txn_ || seq_per_batch_);
@@ -1429,15 +1430,10 @@ Status DBImpl::FlushWAL(bool sync) {
14291430
return SyncWAL();
14301431
}
14311432

1432-
bool DBImpl::WALBufferIsEmpty(bool lock) {
1433-
if (lock) {
1434-
log_write_mutex_.Lock();
1435-
}
1433+
bool DBImpl::WALBufferIsEmpty() {
1434+
InstrumentedMutexLock l(&log_write_mutex_);
14361435
log::Writer* cur_log_writer = logs_.back().writer;
14371436
auto res = cur_log_writer->BufferIsEmpty();
1438-
if (lock) {
1439-
log_write_mutex_.Unlock();
1440-
}
14411437
return res;
14421438
}
14431439

@@ -1539,29 +1535,57 @@ Status DBImpl::ApplyWALToManifest(VersionEdit* synced_wals) {
15391535
Status DBImpl::LockWAL() {
15401536
{
15411537
InstrumentedMutexLock lock(&mutex_);
1542-
WriteThread::Writer w;
1543-
write_thread_.EnterUnbatched(&w, &mutex_);
1544-
WriteThread::Writer nonmem_w;
1545-
if (two_write_queues_) {
1546-
nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_);
1547-
}
1538+
if (lock_wal_count_ > 0) {
1539+
assert(lock_wal_write_token_);
1540+
++lock_wal_count_;
1541+
} else {
1542+
WriteThread::Writer w;
1543+
write_thread_.EnterUnbatched(&w, &mutex_);
1544+
WriteThread::Writer nonmem_w;
1545+
if (two_write_queues_) {
1546+
nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_);
1547+
}
15481548

1549-
lock_wal_write_token_ = write_controller_.GetStopToken();
1549+
// NOTE: releasing mutex in EnterUnbatched might mean we are actually
1550+
// now lock_wal_count > 0
1551+
if (lock_wal_count_ == 0) {
1552+
assert(!lock_wal_write_token_);
1553+
lock_wal_write_token_ = write_controller_.GetStopToken();
1554+
}
1555+
++lock_wal_count_;
15501556

1551-
if (two_write_queues_) {
1552-
nonmem_write_thread_.ExitUnbatched(&nonmem_w);
1557+
if (two_write_queues_) {
1558+
nonmem_write_thread_.ExitUnbatched(&nonmem_w);
1559+
}
1560+
write_thread_.ExitUnbatched(&w);
15531561
}
1554-
write_thread_.ExitUnbatched(&w);
15551562
}
1556-
return FlushWAL(/*sync=*/false);
1563+
// NOTE: avoid I/O holding DB mutex
1564+
Status s = FlushWAL(/*sync=*/false);
1565+
if (!s.ok()) {
1566+
// Non-OK return should not be in locked state
1567+
UnlockWAL().PermitUncheckedError();
1568+
}
1569+
return s;
15571570
}
15581571

15591572
Status DBImpl::UnlockWAL() {
1573+
bool signal = false;
15601574
{
15611575
InstrumentedMutexLock lock(&mutex_);
1562-
lock_wal_write_token_.reset();
1576+
if (lock_wal_count_ == 0) {
1577+
return Status::Aborted("No LockWAL() in effect");
1578+
}
1579+
--lock_wal_count_;
1580+
if (lock_wal_count_ == 0) {
1581+
lock_wal_write_token_.reset();
1582+
signal = true;
1583+
}
1584+
}
1585+
if (signal) {
1586+
// SignalAll outside of mutex for efficiency
1587+
bg_cv_.SignalAll();
15631588
}
1564-
bg_cv_.SignalAll();
15651589
return Status::OK();
15661590
}
15671591

db/db_impl/db_impl.h

+8-3
Original file line numberDiff line numberDiff line change
@@ -426,7 +426,7 @@ class DBImpl : public DB {
426426
const FlushOptions& options,
427427
const std::vector<ColumnFamilyHandle*>& column_families) override;
428428
virtual Status FlushWAL(bool sync) override;
429-
bool WALBufferIsEmpty(bool lock = true);
429+
bool WALBufferIsEmpty();
430430
virtual Status SyncWAL() override;
431431
virtual Status LockWAL() override;
432432
virtual Status UnlockWAL() override;
@@ -2663,9 +2663,14 @@ class DBImpl : public DB {
26632663
// thread safe, both read and write need db mutex hold.
26642664
SeqnoToTimeMapping seqno_time_mapping_;
26652665

2666-
// stop write token that is acquired when LockWal() is called. Destructed
2667-
// when UnlockWal() is called.
2666+
// Stop write token that is acquired when first LockWAL() is called.
2667+
// Destroyed when last UnlockWAL() is called. Controlled by DB mutex.
2668+
// See lock_wal_count_
26682669
std::unique_ptr<WriteControllerToken> lock_wal_write_token_;
2670+
2671+
// The number of LockWAL called without matching UnlockWAL call.
2672+
// See also lock_wal_write_token_
2673+
uint32_t lock_wal_count_;
26692674
};
26702675

26712676
class GetWithTimestampReadCallback : public ReadCallback {

db/db_wal_test.cc

+4-13
Original file line numberDiff line numberDiff line change
@@ -613,19 +613,9 @@ TEST_F(DBWALTest, LockWal) {
613613
Options options = CurrentOptions();
614614
options.create_if_missing = true;
615615
DestroyAndReopen(options);
616-
SyncPoint::GetInstance()->DisableProcessing();
617-
SyncPoint::GetInstance()->LoadDependency(
618-
{{"DBWALTest::LockWal:AfterGetSortedWal",
619-
"DBWALTest::LockWal:BeforeFlush:1"}});
620-
SyncPoint::GetInstance()->EnableProcessing();
621616

622617
ASSERT_OK(Put("foo", "v"));
623618
ASSERT_OK(Put("bar", "v"));
624-
port::Thread worker([&]() {
625-
TEST_SYNC_POINT("DBWALTest::LockWal:BeforeFlush:1");
626-
Status tmp_s = db_->Flush(FlushOptions());
627-
ASSERT_OK(tmp_s);
628-
});
629619

630620
ASSERT_OK(db_->LockWAL());
631621
// Verify writes are stopped
@@ -638,7 +628,10 @@ TEST_F(DBWALTest, LockWal) {
638628
ASSERT_OK(db_->GetSortedWalFiles(wals));
639629
ASSERT_FALSE(wals.empty());
640630
}
641-
TEST_SYNC_POINT("DBWALTest::LockWal:AfterGetSortedWal");
631+
port::Thread worker([&]() {
632+
Status tmp_s = db_->Flush(FlushOptions());
633+
ASSERT_OK(tmp_s);
634+
});
642635
FlushOptions flush_opts;
643636
flush_opts.wait = false;
644637
s = db_->Flush(flush_opts);
@@ -647,8 +640,6 @@ TEST_F(DBWALTest, LockWal) {
647640
ASSERT_OK(db_->Put(WriteOptions(), "foo", "dontcare"));
648641

649642
worker.join();
650-
651-
SyncPoint::GetInstance()->DisableProcessing();
652643
} while (ChangeWalOptions());
653644
}
654645

db/db_write_test.cc

+114-12
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
// (found in the LICENSE.Apache file in the root directory).
55

66
#include <atomic>
7+
#include <cstdint>
78
#include <fstream>
89
#include <memory>
910
#include <thread>
@@ -605,23 +606,124 @@ TEST_P(DBWriteTest, IOErrorOnSwitchMemtable) {
605606
Close();
606607
}
607608

608-
// Test that db->LockWAL() flushes the WAL after locking.
609-
TEST_P(DBWriteTest, LockWalInEffect) {
609+
// Test that db->LockWAL() flushes the WAL after locking, which can fail
610+
TEST_P(DBWriteTest, LockWALInEffect) {
610611
Options options = GetOptions();
612+
std::unique_ptr<FaultInjectionTestEnv> mock_env(
613+
new FaultInjectionTestEnv(env_));
614+
options.env = mock_env.get();
615+
options.paranoid_checks = false;
611616
Reopen(options);
612617
// try the 1st WAL created during open
613-
ASSERT_OK(Put("key" + std::to_string(0), "value"));
614-
ASSERT_TRUE(options.manual_wal_flush != dbfull()->WALBufferIsEmpty());
615-
ASSERT_OK(dbfull()->LockWAL());
616-
ASSERT_TRUE(dbfull()->WALBufferIsEmpty(false));
617-
ASSERT_OK(dbfull()->UnlockWAL());
618+
ASSERT_OK(Put("key0", "value"));
619+
ASSERT_NE(options.manual_wal_flush, dbfull()->WALBufferIsEmpty());
620+
ASSERT_OK(db_->LockWAL());
621+
ASSERT_TRUE(dbfull()->WALBufferIsEmpty());
622+
ASSERT_OK(db_->UnlockWAL());
618623
// try the 2nd wal created during SwitchWAL
619624
ASSERT_OK(dbfull()->TEST_SwitchWAL());
620-
ASSERT_OK(Put("key" + std::to_string(0), "value"));
621-
ASSERT_TRUE(options.manual_wal_flush != dbfull()->WALBufferIsEmpty());
622-
ASSERT_OK(dbfull()->LockWAL());
623-
ASSERT_TRUE(dbfull()->WALBufferIsEmpty(false));
624-
ASSERT_OK(dbfull()->UnlockWAL());
625+
ASSERT_OK(Put("key1", "value"));
626+
ASSERT_NE(options.manual_wal_flush, dbfull()->WALBufferIsEmpty());
627+
ASSERT_OK(db_->LockWAL());
628+
ASSERT_TRUE(dbfull()->WALBufferIsEmpty());
629+
ASSERT_OK(db_->UnlockWAL());
630+
631+
// Fail the WAL flush if applicable
632+
mock_env->SetFilesystemActive(false);
633+
Status s = Put("key2", "value");
634+
if (options.manual_wal_flush) {
635+
ASSERT_OK(s);
636+
// I/O failure
637+
ASSERT_NOK(db_->LockWAL());
638+
// Should not need UnlockWAL after LockWAL fails
639+
} else {
640+
ASSERT_NOK(s);
641+
ASSERT_OK(db_->LockWAL());
642+
ASSERT_OK(db_->UnlockWAL());
643+
}
644+
mock_env->SetFilesystemActive(true);
645+
// Writes should work again
646+
ASSERT_OK(Put("key3", "value"));
647+
ASSERT_EQ(Get("key3"), "value");
648+
649+
// Should be extraneous, but allowed
650+
ASSERT_NOK(db_->UnlockWAL());
651+
652+
// Close before mock_env destruct.
653+
Close();
654+
}
655+
656+
TEST_P(DBWriteTest, LockWALConcurrentRecursive) {
657+
Options options = GetOptions();
658+
Reopen(options);
659+
ASSERT_OK(Put("k1", "val"));
660+
ASSERT_OK(db_->LockWAL()); // 0 -> 1
661+
auto frozen_seqno = db_->GetLatestSequenceNumber();
662+
std::atomic<bool> t1_completed{false};
663+
port::Thread t1{[&]() {
664+
// Won't finish until WAL unlocked
665+
ASSERT_OK(Put("k1", "val2"));
666+
t1_completed = true;
667+
}};
668+
669+
ASSERT_OK(db_->LockWAL()); // 1 -> 2
670+
// Read-only ops are OK
671+
ASSERT_EQ(Get("k1"), "val");
672+
{
673+
std::vector<LiveFileStorageInfo> files;
674+
LiveFilesStorageInfoOptions lf_opts;
675+
// A DB flush could deadlock
676+
lf_opts.wal_size_for_flush = UINT64_MAX;
677+
ASSERT_OK(db_->GetLiveFilesStorageInfo({lf_opts}, &files));
678+
}
679+
680+
port::Thread t2{[&]() {
681+
ASSERT_OK(db_->LockWAL()); // 2 -> 3 or 1 -> 2
682+
}};
683+
684+
ASSERT_OK(db_->UnlockWAL()); // 2 -> 1 or 3 -> 2
685+
// Give t1 an extra chance to jump in case of bug
686+
std::this_thread::yield();
687+
t2.join();
688+
ASSERT_FALSE(t1_completed.load());
689+
690+
// Should now have 2 outstanding LockWAL
691+
ASSERT_EQ(Get("k1"), "val");
692+
693+
ASSERT_OK(db_->UnlockWAL()); // 2 -> 1
694+
695+
ASSERT_FALSE(t1_completed.load());
696+
ASSERT_EQ(Get("k1"), "val");
697+
ASSERT_EQ(frozen_seqno, db_->GetLatestSequenceNumber());
698+
699+
// Ensure final Unlock is concurrency safe and extra Unlock is safe but
700+
// non-OK
701+
std::atomic<int> unlock_ok{0};
702+
port::Thread t3{[&]() {
703+
if (db_->UnlockWAL().ok()) {
704+
unlock_ok++;
705+
}
706+
ASSERT_OK(db_->LockWAL());
707+
if (db_->UnlockWAL().ok()) {
708+
unlock_ok++;
709+
}
710+
}};
711+
712+
if (db_->UnlockWAL().ok()) {
713+
unlock_ok++;
714+
}
715+
t3.join();
716+
717+
// There was one extra unlock, so just one non-ok
718+
ASSERT_EQ(unlock_ok.load(), 2);
719+
720+
// Write can proceed
721+
t1.join();
722+
ASSERT_TRUE(t1_completed.load());
723+
ASSERT_EQ(Get("k1"), "val2");
724+
// And new writes
725+
ASSERT_OK(Put("k2", "val"));
726+
ASSERT_EQ(Get("k2"), "val");
625727
}
626728

627729
TEST_P(DBWriteTest, ConcurrentlyDisabledWAL) {

db_stress_tool/db_stress_common.h

+1
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ DECLARE_int64(active_width);
8787
DECLARE_bool(test_batches_snapshots);
8888
DECLARE_bool(atomic_flush);
8989
DECLARE_int32(manual_wal_flush_one_in);
90+
DECLARE_int32(lock_wal_one_in);
9091
DECLARE_bool(test_cf_consistency);
9192
DECLARE_bool(test_multi_ops_txns);
9293
DECLARE_int32(threads);

db_stress_tool/db_stress_gflags.cc

+4
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,10 @@ DEFINE_int32(
9292
"on average. Setting `manual_wal_flush_one_in` to be greater than 0 "
9393
"implies `Options::manual_wal_flush = true` is set.");
9494

95+
DEFINE_int32(lock_wal_one_in, 1000000,
96+
"If non-zero, then `LockWAL()` + `UnlockWAL()` will be called in "
97+
"db_stress once for every N ops on average.");
98+
9599
DEFINE_bool(test_cf_consistency, false,
96100
"If set, runs the stress test dedicated to verifying writes to "
97101
"multiple column families are consistent. Setting this implies "

db_stress_tool/db_stress_test_base.cc

+26
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
//
1010

1111
#include <ios>
12+
#include <thread>
1213

1314
#include "util/compression.h"
1415
#ifdef GFLAGS
@@ -828,6 +829,31 @@ void StressTest::OperateDb(ThreadState* thread) {
828829
}
829830
}
830831

832+
if (thread->rand.OneInOpt(FLAGS_lock_wal_one_in)) {
833+
Status s = db_->LockWAL();
834+
if (!s.ok()) {
835+
fprintf(stderr, "LockWAL() failed: %s\n", s.ToString().c_str());
836+
} else {
837+
auto old_seqno = db_->GetLatestSequenceNumber();
838+
// Yield for a while
839+
do {
840+
std::this_thread::yield();
841+
} while (thread->rand.OneIn(2));
842+
// Latest seqno should not have changed
843+
auto new_seqno = db_->GetLatestSequenceNumber();
844+
if (old_seqno != new_seqno) {
845+
fprintf(
846+
stderr,
847+
"Failure: latest seqno changed from %u to %u with WAL locked\n",
848+
(unsigned)old_seqno, (unsigned)new_seqno);
849+
}
850+
s = db_->UnlockWAL();
851+
if (!s.ok()) {
852+
fprintf(stderr, "UnlockWAL() failed: %s\n", s.ToString().c_str());
853+
}
854+
}
855+
}
856+
831857
if (thread->rand.OneInOpt(FLAGS_sync_wal_one_in)) {
832858
Status s = db_->SyncWAL();
833859
if (!s.ok() && !s.IsNotSupported()) {

0 commit comments

Comments
 (0)