diff --git a/db/db_impl.cc b/db/db_impl.cc index e86cb077d9..f5b905d037 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -100,8 +100,6 @@ #if !defined(_MSC_VER) && !defined(__APPLE__) #include - - #endif #include "utilities/util/valvec.hpp" @@ -112,8 +110,9 @@ #ifdef WITH_TERARK_ZIP #include -#include + #include +#include #endif #ifdef BOOSTLIB @@ -121,7 +120,6 @@ #endif //#include - #ifdef __GNUC__ #pragma GCC diagnostic pop #endif @@ -275,6 +273,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname, use_custom_gc_(seq_per_batch), shutdown_initiated_(false), own_sfm_(options.sst_file_manager == nullptr), + write_wal_while_sync_(options.write_wal_while_sync), preserve_deletes_(options.preserve_deletes), closed_(false), error_handler_(this, immutable_db_options_, &mutex_), @@ -962,6 +961,7 @@ Status DBImpl::SetDBOptions( s = GetMutableDBOptionsFromStrings(mutable_db_options_, options_map, &new_options); if (s.ok()) { + write_wal_while_sync_ = new_options.write_wal_while_sync; auto bg_job_limits = DBImpl::GetBGJobLimits( immutable_db_options_.max_background_flushes, new_options.max_background_compactions, diff --git a/db/db_impl.h b/db/db_impl.h index fd5105d1d8..f9db08e113 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -832,6 +832,13 @@ class DBImpl : public DB { uint64_t* seq_used = nullptr, size_t batch_cnt = 0, PreReleaseCallback* pre_release_callback = nullptr); + Status WriteWhileSyncWriteImpl( + const WriteOptions& options, WriteBatch* updates, + WriteCallback* callback = nullptr, uint64_t* log_used = nullptr, + uint64_t log_ref = 0, bool disable_memtable = false, + uint64_t* seq_used = nullptr, size_t batch_cnt = 0, + PreReleaseCallback* pre_release_callback = nullptr); + // write cached_recoverable_state_ to memtable if it is not empty // The writer must be the leader in write_thread_ and holding mutex_ Status WriteRecoverableState(); @@ -1667,6 +1674,8 @@ class DBImpl : public DB { // DB::Open() or passed to us bool own_sfm_; + bool write_wal_while_sync_; + // Clients must periodically call SetPreserveDeletesSequenceNumber() // to advance this seqnum. Default value is 0 which means ALL deletes are // preserved. 
Note that this has no effect if DBOptions.preserve_deletes diff --git a/db/db_impl_write.cc b/db/db_impl_write.cc index 85ef466ce7..f7c4d60cf0 100644 --- a/db/db_impl_write.cc +++ b/db/db_impl_write.cc @@ -122,6 +122,13 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, log_ref, disable_memtable, seq_used); } + if (write_options.sync && !write_options.disableWAL && !two_write_queues_ && + write_wal_while_sync_) { + return WriteWhileSyncWriteImpl(write_options, my_batch, callback, log_used, + log_ref, disable_memtable, seq_used, + batch_cnt, pre_release_callback); + } + PERF_TIMER_GUARD(write_pre_and_post_process_time); WriteThread::Writer w(write_options, my_batch, callback, log_ref, disable_memtable, batch_cnt, pre_release_callback); @@ -682,6 +689,310 @@ Status DBImpl::WriteImplWALOnly(const WriteOptions& write_options, return status; } +Status DBImpl::WriteWhileSyncWriteImpl( + const WriteOptions& write_options, WriteBatch* my_batch, + WriteCallback* callback, uint64_t* log_used, uint64_t log_ref, + bool disable_memtable, uint64_t* seq_used, size_t batch_cnt, + PreReleaseCallback* pre_release_callback) { + Status status; + + PERF_TIMER_GUARD(write_pre_and_post_process_time); + WriteThread::Writer w(write_options, my_batch, callback, log_ref, + disable_memtable, batch_cnt, pre_release_callback); + + RecordTick(stats_, WRITE_WITH_WAL); + + StopWatch write_sw(env_, immutable_db_options_.statistics.get(), DB_WRITE); + + std::vector writers; + std::vector manual_wake_followers; + std::vector> defer_sync_funcs; + + write_thread_.JoinBatchGroup(&w); + if (w.state == WriteThread::STATE_PARALLEL_MEMTABLE_WRITER) { + // we are a non-leader in a parallel group + + if (w.ShouldWriteToMemtable()) { + PERF_TIMER_STOP(write_pre_and_post_process_time); + PERF_TIMER_GUARD(write_memtable_time); + + ColumnFamilyMemTablesImpl column_family_memtables( + versions_->GetColumnFamilySet()); + w.status = WriteBatchInternal::InsertInto( + &w, w.sequence, &column_family_memtables, &flush_scheduler_, + write_options.ignore_missing_column_families, 0 /*log_number*/, this, + true /*concurrent_memtable_writes*/, seq_per_batch_, w.batch_cnt); + + PERF_TIMER_START(write_pre_and_post_process_time); + } + + if (write_thread_.CompleteParallelMemTableWriter(&w)) { + // we're responsible for exit batch group + for (auto* writer : *(w.write_group)) { + if (!writer->CallbackFailed() && writer->pre_release_callback) { + assert(writer->sequence != kMaxSequenceNumber); + Status ws = writer->pre_release_callback->Callback(writer->sequence, + disable_memtable); + if (!ws.ok()) { + status = ws; + break; + } + } + } + // TODO(myabandeh): propagate status to write_group + auto last_sequence = w.write_group->last_sequence; + MemTableInsertStatusCheck(w.status); + // TODO(linyuanjin): may violate consistency + versions_->SetLastSequence(last_sequence); + write_thread_.ExitAsBatchGroupFollower(&w, &manual_wake_followers); + status = w.write_group->exit_callback(); + + for (auto* follower : manual_wake_followers) { + follower->status = status; + WriteThread::SetStateCompleted(follower); + } + } + assert(w.state == WriteThread::STATE_COMPLETED); + // STATE_COMPLETED conditional below handles exit + + status = w.FinalStatus(); + } + if (w.state == WriteThread::STATE_COMPLETED) { + if (log_used != nullptr) { + *log_used = w.log_used; + } + if (seq_used != nullptr) { + *seq_used = w.sequence; + } + // write is complete and leader has updated sequence + return w.FinalStatus(); + } + // else we are the leader of the write batch 
group + assert(w.state == WriteThread::STATE_GROUP_LEADER); + + // Once reaches this point, the current writer "w" will try to do its write + // job. It may also pick up some of the remaining writers in the "writers_" + // when it finds suitable, and finish them in the same write batch. + // This is how a write job could be done by the other writer. + WriteContext write_context; + WriteThread::WriteGroup write_group; + bool in_parallel_group = false; + uint64_t last_sequence = kMaxSequenceNumber; + if (!two_write_queues_) { + last_sequence = versions_->LastSequence(); + } + + mutex_.Lock(); + + bool need_log_sync = false /* write_options.sync */; + bool need_log_dir_sync = need_log_sync && !log_dir_synced_; + + if (!two_write_queues_ || !disable_memtable) { + // With concurrent writes we do preprocess only in the write thread that + // also does write to memtable to avoid sync issue on shared data structure + // with the other thread + + // PreprocessWrite does its own perf timing. + PERF_TIMER_STOP(write_pre_and_post_process_time); + + status = PreprocessWrite(write_options, &need_log_sync, &write_context); + + PERF_TIMER_START(write_pre_and_post_process_time); + } + log::Writer* log_writer = logs_.back().writer; + + for (auto& log : logs_) { + writers.emplace_back(log.writer); + } + + mutex_.Unlock(); + + // Add to log and apply to memtable. We can release the lock + // during this phase since &w is currently responsible for logging + // and protects against concurrent loggers and concurrent writes + // into memtables + + TEST_SYNC_POINT("DBImpl::WriteImpl:BeforeLeaderEnters"); + last_batch_group_size_ = + write_thread_.EnterAsBatchGroupLeader(&w, &write_group); + + if (status.ok()) { + // Rules for when we can update the memtable concurrently + // 1. supported by memtable + // 2. Puts are not okay if inplace_update_support + // 3. Merges are not okay + // + // Rules 1..2 are enforced by checking the options + // during startup (CheckConcurrentWritesSupported), so if + // options.allow_concurrent_memtable_write is true then they can be + // assumed to be true. Rule 3 is checked for each batch. We could + // relax rules 2 if we could prevent write batches from referring + // more than once to a particular key. + bool parallel = immutable_db_options_.allow_concurrent_memtable_write && + write_group.size > 1; + size_t total_count = 0; + size_t valid_batches = 0; + size_t total_byte_size = 0; + for (auto* writer : write_group) { + if (writer->CheckCallback(this)) { + valid_batches += writer->batch_cnt; + if (writer->ShouldWriteToMemtable()) { + total_count += WriteBatchInternal::Count(writer->batch); + parallel = parallel && !writer->batch->HasMerge(); + } + + total_byte_size = WriteBatchInternal::AppendedByteSize( + total_byte_size, WriteBatchInternal::ByteSize(writer->batch)); + } + } + // Note about seq_per_batch_: either disableWAL is set for the entire write + // group or not. In either case we inc seq for each write batch with no + // failed callback. This means that there could be a batch with + // disalbe_memtable in between; although we do not write this batch to + // memtable it still consumes a seq. Otherwise, if !seq_per_batch_, we inc + // the seq per valid written key to mem. + size_t seq_inc = seq_per_batch_ ? valid_batches : total_count; + + const bool concurrent_update = two_write_queues_; + // Update stats while we are an exclusive group leader, so we know + // that nobody else can be writing to these particular stats. 
+ // We're optimistic, updating the stats before we successfully + // commit. That lets us release our leader status early. + auto stats = default_cf_internal_stats_; + stats->AddDBStats(InternalStats::NUMBER_KEYS_WRITTEN, total_count, + concurrent_update); + RecordTick(stats_, NUMBER_KEYS_WRITTEN, total_count); + stats->AddDBStats(InternalStats::BYTES_WRITTEN, total_byte_size, + concurrent_update); + RecordTick(stats_, BYTES_WRITTEN, total_byte_size); + stats->AddDBStats(InternalStats::WRITE_DONE_BY_SELF, 1, concurrent_update); + RecordTick(stats_, WRITE_DONE_BY_SELF); + auto write_done_by_other = write_group.size - 1; + if (write_done_by_other > 0) { + stats->AddDBStats(InternalStats::WRITE_DONE_BY_OTHER, write_done_by_other, + concurrent_update); + RecordTick(stats_, WRITE_DONE_BY_OTHER, write_done_by_other); + } + MeasureTime(stats_, BYTES_PER_WRITE, total_byte_size); + + PERF_TIMER_STOP(write_pre_and_post_process_time); + + if (status.ok() /* && !write_options.disableWAL */) { + PERF_TIMER_GUARD(write_wal_time); + status = WriteToWAL(write_group, log_writer, log_used, need_log_sync, + need_log_dir_sync, last_sequence + 1); + for (auto& w : writers) { + defer_sync_funcs.emplace_back(w->get_defer_sync_func()); + } + w.write_group->exit_callback = [func_vec{std::move(defer_sync_funcs)}]() { + for (auto& func : func_vec) { + auto s = func(); + if (!s.ok()) { + return s; + } + } + return Status::OK(); + }; + } + assert(last_sequence != kMaxSequenceNumber); + const SequenceNumber current_sequence = last_sequence + 1; + last_sequence += seq_inc; + + if (status.ok()) { + PERF_TIMER_GUARD(write_memtable_time); + + if (!parallel) { + // w.sequence will be set inside InsertInto + w.status = WriteBatchInternal::InsertInto( + write_group, current_sequence, column_family_memtables_.get(), + &flush_scheduler_, write_options.ignore_missing_column_families, + 0 /*recovery_log_number*/, this, parallel, seq_per_batch_, + batch_per_txn_); + } else { + SequenceNumber next_sequence = current_sequence; + // Note: the logic for advancing seq here must be consistent with the + // logic in WriteBatchInternal::InsertInto(write_group...) as well as + // with WriteBatchInternal::InsertInto(write_batch...) that is called on + // the merged batch during recovery from the WAL. + for (auto* writer : write_group) { + if (writer->CallbackFailed()) { + continue; + } + writer->sequence = next_sequence; + if (seq_per_batch_) { + assert(writer->batch_cnt); + next_sequence += writer->batch_cnt; + } else if (writer->ShouldWriteToMemtable()) { + next_sequence += WriteBatchInternal::Count(writer->batch); + } + } + write_group.last_sequence = last_sequence; + write_thread_.LaunchParallelMemTableWriters(&write_group); + in_parallel_group = true; + + // Each parallel follower is doing each own writes. The leader should + // also do its own. 
+ if (w.ShouldWriteToMemtable()) { + ColumnFamilyMemTablesImpl column_family_memtables( + versions_->GetColumnFamilySet()); + assert(w.sequence == current_sequence); + w.status = WriteBatchInternal::InsertInto( + &w, w.sequence, &column_family_memtables, &flush_scheduler_, + write_options.ignore_missing_column_families, 0 /*log_number*/, + this, true /*concurrent_memtable_writes*/, seq_per_batch_, + w.batch_cnt, batch_per_txn_); + } + } + if (seq_used != nullptr) { + *seq_used = w.sequence; + } + } + } + PERF_TIMER_START(write_pre_and_post_process_time); + + if (!w.CallbackFailed()) { + WriteStatusCheck(status); + } + + bool should_exit_batch_group = true; + if (in_parallel_group) { + // CompleteParallelWorker returns true if this thread should + // handle exit, false means somebody else did + should_exit_batch_group = write_thread_.CompleteParallelMemTableWriter(&w); + } + if (should_exit_batch_group) { + if (status.ok()) { + for (auto* writer : write_group) { + if (!writer->CallbackFailed() && writer->pre_release_callback) { + assert(writer->sequence != kMaxSequenceNumber); + Status ws = writer->pre_release_callback->Callback(writer->sequence, + disable_memtable); + if (!ws.ok()) { + status = ws; + break; + } + } + } + } + MemTableInsertStatusCheck(w.status); + // TODO(linyuanjin): may violate consistency + versions_->SetLastSequence(last_sequence); + write_thread_.ExitAsBatchGroupLeader(write_group, status, + &manual_wake_followers); + status = write_group.exit_callback(); + + for (auto* follower : manual_wake_followers) { + follower->status = status; + WriteThread::SetStateCompleted(follower); + } + } + + if (status.ok()) { + status = w.FinalStatus(); + } + return status; +} + void DBImpl::WriteStatusCheck(const Status& status) { // Is setting bg_error_ enough here? This will at least stop // compaction and fail any further writes. diff --git a/db/db_options_test.cc b/db/db_options_test.cc index e563b2777f..7a14f98288 100644 --- a/db/db_options_test.cc +++ b/db/db_options_test.cc @@ -136,9 +136,7 @@ TEST_F(DBOptionsTest, SetBytesPerSync) { const std::string kValue(kValueSize, 'v'); ASSERT_EQ(options.bytes_per_sync, dbfull()->GetDBOptions().bytes_per_sync); rocksdb::SyncPoint::GetInstance()->SetCallBack( - "WritableFileWriter::RangeSync:0", [&](void* /*arg*/) { - counter++; - }); + "WritableFileWriter::RangeSync:0", [&](void* /*arg*/) { counter++; }); WriteOptions write_opts; // should sync approximately 40MB/1MB ~= 40 times. @@ -188,9 +186,7 @@ TEST_F(DBOptionsTest, SetWalBytesPerSync) { int counter = 0; int low_bytes_per_sync = 0; rocksdb::SyncPoint::GetInstance()->SetCallBack( - "WritableFileWriter::RangeSync:0", [&](void* /*arg*/) { - counter++; - }); + "WritableFileWriter::RangeSync:0", [&](void* /*arg*/) { counter++; }); rocksdb::SyncPoint::GetInstance()->EnableProcessing(); const std::string kValue(kValueSize, 'v'); int i = 0; @@ -200,7 +196,7 @@ TEST_F(DBOptionsTest, SetWalBytesPerSync) { // Do not flush. If we flush here, SwitchWAL will reuse old WAL file since its // empty and will not get the new wal_bytes_per_sync value. 
low_bytes_per_sync = counter; - //5242880 = 1024 * 1024 * 5 + // 5242880 = 1024 * 1024 * 5 ASSERT_OK(dbfull()->SetDBOptions({{"wal_bytes_per_sync", "5242880"}})); ASSERT_EQ(5242880, dbfull()->GetDBOptions().wal_bytes_per_sync); counter = 0; @@ -408,7 +404,7 @@ TEST_F(DBOptionsTest, SetOptionsMayTriggerCompaction) { TEST_F(DBOptionsTest, SetBackgroundCompactionThreads) { Options options; options.create_if_missing = true; - options.max_background_compactions = 1; // default value + options.max_background_compactions = 1; // default value options.env = env_; options.enable_lazy_compaction = false; options.blob_size = -1; @@ -477,13 +473,41 @@ TEST_F(DBOptionsTest, AvoidFlushDuringShutdown) { ASSERT_EQ("", FilesPerLevel()); } +TEST_F(DBOptionsTest, WriteWalWhileSync) { + Options options; + options.create_if_missing = true; + options.disable_auto_compactions = true; + options.env = env_; + options.enable_lazy_compaction = false; + options.blob_size = -1; + + WriteOptions wopt; + wopt.sync = true; + + ASSERT_FALSE(options.write_wal_while_sync); + DestroyAndReopen(options); + ASSERT_OK(Put("foo", "v1", wopt)); + Reopen(options); + ASSERT_EQ("v1", Get("foo")); + + DestroyAndReopen(options); + ASSERT_OK(Put("foo", "v2", wopt)); + ASSERT_EQ("v2", Get("foo")); + ASSERT_OK(dbfull()->SetDBOptions({{"write_wal_while_sync", "true"}})); + Reopen(options); + + ASSERT_OK(Put("foo", "v3", wopt)); + ASSERT_EQ("v3", Get("foo")); +} + TEST_F(DBOptionsTest, SetDelayedWriteRateOption) { Options options; options.create_if_missing = true; options.delayed_write_rate = 2 * 1024U * 1024U; options.env = env_; Reopen(options); - ASSERT_EQ(2 * 1024U * 1024U, dbfull()->TEST_write_controler().max_delayed_write_rate()); + ASSERT_EQ(2 * 1024U * 1024U, + dbfull()->TEST_write_controler().max_delayed_write_rate()); ASSERT_OK(dbfull()->SetDBOptions({{"delayed_write_rate", "20000"}})); ASSERT_EQ(20000, dbfull()->TEST_write_controler().max_delayed_write_rate()); @@ -543,13 +567,11 @@ TEST_F(DBOptionsTest, RunStatsDumpPeriodSec) { options.blob_size = -1; std::unique_ptr mock_env; mock_env.reset(new rocksdb::MockTimeEnv(env_)); - mock_env->set_current_time(0); // in seconds + mock_env->set_current_time(0); // in seconds options.env = mock_env.get(); int counter = 0; rocksdb::SyncPoint::GetInstance()->SetCallBack( - "DBImpl::DumpStats:1", [&](void* /*arg*/) { - counter++; - }); + "DBImpl::DumpStats:1", [&](void* /*arg*/) { counter++; }); rocksdb::SyncPoint::GetInstance()->EnableProcessing(); Reopen(options); ASSERT_EQ(5, dbfull()->GetDBOptions().stats_dump_period_sec); diff --git a/db/log_writer.cc b/db/log_writer.cc index bc99931b9a..55e64da02f 100644 --- a/db/log_writer.cc +++ b/db/log_writer.cc @@ -10,6 +10,7 @@ #include "db/log_writer.h" #include + #include "rocksdb/env.h" #include "util/coding.h" #include "util/crc32c.h" @@ -20,7 +21,7 @@ namespace log { Writer::Writer(std::unique_ptr&& dest, uint64_t log_number, bool recycle_log_files, bool manual_flush) - : dest_(std::move(dest)), + : dest_(dest.release()), block_offset_(0), log_number_(log_number), recycle_log_files_(recycle_log_files), @@ -33,6 +34,11 @@ Writer::Writer(std::unique_ptr&& dest, uint64_t log_number, Writer::~Writer() { WriteBuffer(); } +std::function Writer::get_defer_sync_func() { + dest_->Flush(); + return [f{dest_}]() { return f->SyncWithoutFlush(true); }; +} + Status Writer::WriteBuffer() { return dest_->Flush(); } Status Writer::AddRecord(const Slice& slice) { diff --git a/db/log_writer.h b/db/log_writer.h index 5aac045107..c05c49e788 100644 --- 
a/db/log_writer.h +++ b/db/log_writer.h @@ -83,12 +83,14 @@ class Writer { uint64_t get_log_number() const { return log_number_; } + std::function get_defer_sync_func(); + Status WriteBuffer(); bool TEST_BufferIsEmpty(); private: - std::unique_ptr dest_; + std::shared_ptr dest_; size_t block_offset_; // Current offset in block uint64_t log_number_; bool recycle_log_files_; diff --git a/db/version_set.h b/db/version_set.h index 7a22c3ad6a..f7871dcfd9 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -976,6 +976,17 @@ class VersionSet { last_sequence_.store(s, std::memory_order_release); } + // same as `SetLastSequence` but thread-safe via CAS + // Only seq that is larger than current `last_sequence_` is stored + void SetLastSequenceConcurrently(uint64_t s) { + uint64_t curr = last_sequence_.load(std::memory_order_acquire); + while (curr < s) { + if (last_sequence_.compare_exchange_weak(curr, s)) { + break; + } + } + } + // Note: memory_order_release must be sufficient void SetLastPublishedSequence(uint64_t s) { assert(s >= last_published_sequence_); diff --git a/db/write_thread.cc b/db/write_thread.cc index 835992c8fc..f83ab85af7 100644 --- a/db/write_thread.cc +++ b/db/write_thread.cc @@ -4,8 +4,10 @@ // (found in the LICENSE.Apache file in the root directory). #include "db/write_thread.h" + #include #include + #include "db/column_family.h" #include "monitoring/perf_context_imp.h" #include "port/port.h" @@ -218,6 +220,8 @@ void WriteThread::SetState(Writer* w, uint8_t new_state) { } } +void WriteThread::SetStateCompleted(Writer* w) { SetState(w, STATE_COMPLETED); } + bool WriteThread::LinkOne(Writer* w, std::atomic* newest_writer) { assert(newest_writer != nullptr); assert(w->state == STATE_INIT); @@ -388,8 +392,9 @@ void WriteThread::JoinBatchGroup(Writer* w) { * writes in parallel. 
*/ TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:BeganWaiting", w); - AwaitState(w, STATE_GROUP_LEADER | STATE_MEMTABLE_WRITER_LEADER | - STATE_PARALLEL_MEMTABLE_WRITER | STATE_COMPLETED, + AwaitState(w, + STATE_GROUP_LEADER | STATE_MEMTABLE_WRITER_LEADER | + STATE_PARALLEL_MEMTABLE_WRITER | STATE_COMPLETED, &jbg_ctx); TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:DoneWaiting", w); } @@ -571,10 +576,10 @@ void WriteThread::LaunchParallelMemTableWriters(WriteGroup* write_group) { } } -static WriteThread::AdaptationContext cpmtw_ctx("CompleteParallelMemTableWriter"); +static WriteThread::AdaptationContext cpmtw_ctx( + "CompleteParallelMemTableWriter"); // This method is called by both the leader and parallel followers bool WriteThread::CompleteParallelMemTableWriter(Writer* w) { - auto* write_group = w->write_group; if (!w->status.ok()) { std::lock_guard guard(write_group->leader->StateMutex()); @@ -591,20 +596,25 @@ bool WriteThread::CompleteParallelMemTableWriter(Writer* w) { return true; } -void WriteThread::ExitAsBatchGroupFollower(Writer* w) { +void WriteThread::ExitAsBatchGroupFollower( + Writer* w, std::vector* manual_wake_followers) { auto* write_group = w->write_group; assert(w->state == STATE_PARALLEL_MEMTABLE_WRITER); assert(write_group->status.ok()); - ExitAsBatchGroupLeader(*write_group, write_group->status); + ExitAsBatchGroupLeader(*write_group, write_group->status, + manual_wake_followers); assert(w->status.ok()); - assert(w->state == STATE_COMPLETED); + if (manual_wake_followers == nullptr) { + assert(w->state == STATE_COMPLETED); + } SetState(write_group->leader, STATE_COMPLETED); } static WriteThread::AdaptationContext eabgl_ctx("ExitAsBatchGroupLeader"); -void WriteThread::ExitAsBatchGroupLeader(WriteGroup& write_group, - Status status) { +void WriteThread::ExitAsBatchGroupLeader( + WriteGroup& write_group, Status status, + std::vector* manual_wake_followers) { Writer* leader = write_group.leader; Writer* last_writer = write_group.last_writer; assert(leader->link_older == nullptr); @@ -673,8 +683,9 @@ void WriteThread::ExitAsBatchGroupLeader(WriteGroup& write_group, next_leader->link_older = nullptr; SetState(next_leader, STATE_GROUP_LEADER); } - AwaitState(leader, STATE_MEMTABLE_WRITER_LEADER | - STATE_PARALLEL_MEMTABLE_WRITER | STATE_COMPLETED, + AwaitState(leader, + STATE_MEMTABLE_WRITER_LEADER | STATE_PARALLEL_MEMTABLE_WRITER | + STATE_COMPLETED, &eabgl_ctx); } else { Writer* head = newest_writer_.load(std::memory_order_acquire); @@ -709,6 +720,16 @@ void WriteThread::ExitAsBatchGroupLeader(WriteGroup& write_group, // else nobody else was waiting, although there might already be a new // leader now + if (manual_wake_followers != nullptr) { + manual_wake_followers->clear(); + while (last_writer != leader) { + auto next = last_writer->link_older; + manual_wake_followers->emplace_back(last_writer); + last_writer = next; + } + return; + } + while (last_writer != leader) { last_writer->status = status; // we need to read link_older before calling SetState, because as soon diff --git a/db/write_thread.h b/db/write_thread.h index a3802c996b..51ee6d1b4b 100644 --- a/db/write_thread.h +++ b/db/write_thread.h @@ -7,9 +7,11 @@ #include #include + #include #include #include +#include #include #include #include @@ -82,6 +84,7 @@ class WriteThread { Status status; std::atomic running; size_t size = 0; + std::function exit_callback; struct Iterator { Writer* writer; @@ -241,7 +244,7 @@ class WriteThread { std::condition_variable& StateCV() { assert(made_waitable); 
return *static_cast( - static_cast(&state_cv_bytes)); + static_cast(&state_cv_bytes)); } }; @@ -289,10 +292,13 @@ class WriteThread { // // WriteGroup* write_group: the write group // Status status: Status of write operation - void ExitAsBatchGroupLeader(WriteGroup& write_group, Status status); + void ExitAsBatchGroupLeader( + WriteGroup& write_group, Status status, + std::vector* manual_wake_followers = nullptr); // Exit batch group on behalf of batch group leader. - void ExitAsBatchGroupFollower(Writer* w); + void ExitAsBatchGroupFollower( + Writer* w, std::vector* manual_wake_followers = nullptr); // Constructs a write batch group led by leader from newest_memtable_writers_ // list. The leader should either write memtable for the whole group and @@ -349,6 +355,8 @@ class WriteThread { // Remove the dummy writer and wake up waiting writers void EndWriteStall(); + static void SetStateCompleted(Writer* w); + private: // See AwaitState. const uint64_t max_yield_usec_; @@ -394,7 +402,7 @@ class WriteThread { uint8_t AwaitState(Writer* w, uint8_t goal_mask, AdaptationContext* ctx); // Set writer state and wake the writer up if it is waiting. - void SetState(Writer* w, uint8_t new_state); + static void SetState(Writer* w, uint8_t new_state); // Links w into the newest_writer list. Return true if w was linked directly // into the leader position. Safe to call from multiple threads without diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 0566fc71f7..ed0a610044 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -689,6 +689,13 @@ struct DBOptions { // Not supported in ROCKSDB_LITE mode! bool use_direct_reads = false; + // Write WAL while WriteOptions.sync = true + // + // Default: false + // + // Dynamically changeable through SetDBOptions() API. + bool write_wal_while_sync = false; + // Use O_DIRECT for writes in background flush and compactions. // Default: false // Not supported in ROCKSDB_LITE mode!
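For reference, the new option is consumed like any other mutable DBOptions field: WriteImpl only takes the new branch when write_options.sync is set, the WAL is enabled, and two_write_queues is off. A minimal usage sketch follows (assuming a build of this patched tree; the path, key, and value below are placeholders):

#include <cassert>
#include <string>
#include <unordered_map>
#include "rocksdb/db.h"
#include "rocksdb/options.h"

int main() {
  rocksdb::Options options;
  options.create_if_missing = true;
  options.write_wal_while_sync = true;  // option added by this patch

  rocksdb::DB* db = nullptr;
  rocksdb::Status s =
      rocksdb::DB::Open(options, "/tmp/write_wal_while_sync_demo", &db);
  assert(s.ok());

  rocksdb::WriteOptions wopt;
  wopt.sync = true;  // the new code path is only taken for synced WAL writes
  s = db->Put(wopt, "key", "value");
  assert(s.ok());

  // The option is dynamically changeable through SetDBOptions().
  s = db->SetDBOptions({{"write_wal_while_sync", "false"}});
  assert(s.ok());

  delete db;
  return 0;
}
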
diff --git a/options/db_options.cc b/options/db_options.cc index 99092b4125..e51f6656dc 100644 --- a/options/db_options.cc +++ b/options/db_options.cc @@ -254,6 +254,7 @@ MutableDBOptions::MutableDBOptions() max_background_compactions(-1), max_background_garbage_collections(-1), avoid_flush_during_shutdown(false), + write_wal_while_sync(false), writable_file_max_buffer_size(1024 * 1024), delayed_write_rate(2 * 1024U * 1024U), max_wal_size(0), @@ -272,6 +273,7 @@ MutableDBOptions::MutableDBOptions(const DBOptions& options) max_background_garbage_collections( options.max_background_garbage_collections), avoid_flush_during_shutdown(options.avoid_flush_during_shutdown), + write_wal_while_sync(options.write_wal_while_sync), writable_file_max_buffer_size(options.writable_file_max_buffer_size), delayed_write_rate(options.delayed_write_rate), max_wal_size(options.max_wal_size), @@ -293,6 +295,8 @@ void MutableDBOptions::Dump(Logger* log) const { max_background_garbage_collections); ROCKS_LOG_HEADER(log, " Options.avoid_flush_during_shutdown: %d", avoid_flush_during_shutdown); + ROCKS_LOG_HEADER(log, " Options.write_wal_while_sync: %d", + write_wal_while_sync); ROCKS_LOG_HEADER( log, " Options.writable_file_max_buffer_size: %" ROCKSDB_PRIszt, writable_file_max_buffer_size); diff --git a/options/db_options.h b/options/db_options.h index 4cf106e4f5..3dec4ec082 100644 --- a/options/db_options.h +++ b/options/db_options.h @@ -100,6 +100,7 @@ struct MutableDBOptions { int max_background_compactions; int max_background_garbage_collections; bool avoid_flush_during_shutdown; + bool write_wal_while_sync; size_t writable_file_max_buffer_size; uint64_t delayed_write_rate; uint64_t max_wal_size; diff --git a/options/options_helper.cc b/options/options_helper.cc index 9218f95502..0fc41cecba 100644 --- a/options/options_helper.cc +++ b/options/options_helper.cc @@ -130,6 +130,7 @@ DBOptions BuildDBOptions(const ImmutableDBOptions& immutable_db_options, immutable_db_options.avoid_flush_during_recovery; options.avoid_flush_during_shutdown = mutable_db_options.avoid_flush_during_shutdown; + options.write_wal_while_sync = mutable_db_options.write_wal_while_sync; options.allow_ingest_behind = immutable_db_options.allow_ingest_behind; options.preserve_deletes = immutable_db_options.preserve_deletes; options.two_write_queues = immutable_db_options.two_write_queues; @@ -1635,6 +1636,10 @@ std::unordered_map {offsetof(struct DBOptions, avoid_flush_during_shutdown), OptionType::kBoolean, OptionVerificationType::kNormal, true, offsetof(struct MutableDBOptions, avoid_flush_during_shutdown)}}, + {"write_wal_while_sync", + {offsetof(struct DBOptions, write_wal_while_sync), + OptionType::kBoolean, OptionVerificationType::kNormal, true, + offsetof(struct MutableDBOptions, write_wal_while_sync)}}, {"writable_file_max_buffer_size", {offsetof(struct DBOptions, writable_file_max_buffer_size), OptionType::kSizeT, OptionVerificationType::kNormal, true, diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc index fd20c4f2a6..8fd8b39b0e 100644 --- a/tools/db_bench_tool.cc +++ b/tools/db_bench_tool.cc @@ -970,6 +970,10 @@ DEFINE_bool(mmap_write, rocksdb::Options().allow_mmap_writes, DEFINE_bool(use_direct_reads, rocksdb::Options().use_direct_reads, "Use O_DIRECT for reading data"); +DEFINE_bool(write_wal_while_sync, + rocksdb::Options().write_wal_while_sync, + "Write Wal while WriteOptions.sync = true"); + DEFINE_bool(use_direct_io_for_flush_and_compaction, rocksdb::Options().use_direct_io_for_flush_and_compaction, "Use O_DIRECT 
for background flush and compaction writes"); @@ -3194,6 +3198,7 @@ class Benchmark { options.allow_mmap_reads = FLAGS_mmap_read; options.allow_mmap_writes = FLAGS_mmap_write; options.use_direct_reads = FLAGS_use_direct_reads; + options.write_wal_while_sync = FLAGS_write_wal_while_sync; options.use_direct_io_for_flush_and_compaction = FLAGS_use_direct_io_for_flush_and_compaction; options.use_aio_reads = FLAGS_use_aio_reads; diff --git a/util/file_reader_writer.h b/util/file_reader_writer.h index 2543b51988..c0a02f17fe 100644 --- a/util/file_reader_writer.h +++ b/util/file_reader_writer.h @@ -10,6 +10,7 @@ #include #include #include + #include "port/port.h" #include "rocksdb/env.h" #include "rocksdb/listener.h" @@ -85,13 +86,13 @@ class RandomAccessFileReader { bool ShouldNotifyListeners() const { return !listeners_.empty(); } std::unique_ptr file_; - std::string file_name_; - Env* env_; - Statistics* stats_; - uint32_t hist_type_; - bool for_compaction_; - bool use_fsread_; - HistogramImpl* file_read_hist_; + std::string file_name_; + Env* env_; + Statistics* stats_; + uint32_t hist_type_; + bool for_compaction_; + bool use_fsread_; + HistogramImpl* file_read_hist_; RateLimiter* rate_limiter_; std::vector> listeners_; @@ -147,21 +148,21 @@ class WritableFileWriter { std::unique_ptr writable_file_; std::string file_name_; - AlignedBuffer buf_; - size_t max_buffer_size_; + AlignedBuffer buf_; + size_t max_buffer_size_; // Actually written data size can be used for truncate // not counting padding data - uint64_t filesize_; + uint64_t filesize_; #ifndef ROCKSDB_LITE // This is necessary when we use unbuffered access // and writes must happen on aligned offsets // so we need to go back and write that page again - uint64_t next_write_offset_; + uint64_t next_write_offset_; #endif // ROCKSDB_LITE - bool pending_sync_; - uint64_t last_sync_size_; - uint64_t bytes_per_sync_; - RateLimiter* rate_limiter_; + bool pending_sync_; + uint64_t last_sync_size_; + uint64_t bytes_per_sync_; + RateLimiter* rate_limiter_; Statistics* stats_; std::vector> listeners_;
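
Taken together, the new write path flushes each live WAL writer, captures a per-log deferred-sync closure via log::Writer::get_defer_sync_func(), dissolves the batch group while holding follower wake-ups back through manual_wake_followers, runs the combined closure stored in write_group.exit_callback (the actual SyncWithoutFlush), and only then marks the followers STATE_COMPLETED with the sync status. Below is a self-contained sketch of that control flow; all types are hypothetical stand-ins, not the RocksDB internals touched above.

#include <functional>
#include <iostream>
#include <string>
#include <utility>
#include <vector>

// Minimal stand-in for rocksdb::Status; not the real class.
struct Status {
  bool ok_ = true;
  std::string msg_;
  static Status OK() { return Status(); }
  bool ok() const { return ok_; }
};

// Stand-in for log::Writer: flush buffered bytes now, return a closure that
// performs the durable sync later. The real patch keeps the underlying file
// in a shared_ptr so such a closure can safely outlive the batch group.
struct FakeLogWriter {
  Status Flush() { return Status::OK(); }
  Status SyncWithoutFlush() { return Status::OK(); }
  std::function<Status()> get_defer_sync_func() {
    Flush();
    return [this]() { return SyncWithoutFlush(); };
  }
};

// Stand-in for a parallel follower that must not be woken before the sync
// status is known.
struct Follower {
  Status status;
  bool completed = false;
};

int main() {
  std::vector<FakeLogWriter> logs(2);  // snapshot of logs_ taken under mutex_
  std::vector<Follower> followers(3);  // manual_wake_followers

  // Leader phase: append the group's records to the WAL (omitted), then
  // collect one deferred-sync closure per live log writer.
  std::vector<std::function<Status()>> defer_sync_funcs;
  for (auto& log : logs) {
    defer_sync_funcs.emplace_back(log.get_defer_sync_func());
  }
  auto exit_callback = [funcs = std::move(defer_sync_funcs)]() {
    for (auto& f : funcs) {
      Status s = f();
      if (!s.ok()) return s;
    }
    return Status::OK();
  };

  // ExitAsBatchGroupLeader would run here, handing leadership to the next
  // group but *not* waking the followers yet.

  // Perform the deferred fsync outside the group, so the next leader can
  // already be appending to the WAL.
  Status sync_status = exit_callback();

  // Only now complete the followers with the real sync status
  // (WriteThread::SetStateCompleted in the patch).
  for (auto& f : followers) {
    f.status = sync_status;
    f.completed = true;
  }
  std::cout << "deferred sync ok: " << sync_status.ok() << std::endl;
  return 0;
}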