From b7198c3afe089d8a5058ca08fbd2cfdac342b8c6 Mon Sep 17 00:00:00 2001 From: Nathan Bronson Date: Wed, 5 Aug 2015 16:56:28 -0700 Subject: [PATCH] reduce db mutex contention for write batch groups Summary: This diff allows a Writer to join the next write batch group without acquiring any locks. Waiting is performed via a per-Writer mutex, so all of the non-leader writers never need to acquire the db mutex. It is now possible to join a write batch group after the leader has been chosen but before the batch has been constructed. This diff doesn't increase parallelism, but reduces synchronization overheads. For some CPU-bound workloads (no WAL, RAM-sized working set) this can substantially reduce contention on the db mutex in a multi-threaded environment. With T=8 N=500000 in a CPU-bound scenario (see the test plan) this is good for a 33% perf win. Not all scenarios see such a win, but none show a loss. This code is slightly faster even for the single-threaded case (about 2% for the CPU-bound scenario below). Test Plan: 1. unit tests 2. COMPILE_WITH_TSAN=1 make check 3. stress high-contention scenarios with db_bench -benchmarks=fillrandom -threads=$T -batch_size=1 -memtablerep=skip_list -value_size=0 --num=$N -level0_slowdown_writes_trigger=9999 -level0_stop_writes_trigger=9999 -disable_auto_compactions --max_write_buffer_number=8 -max_background_flushes=8 --disable_wal --write_buffer_size=160000000 Reviewers: sdong, igor, rven, ljin, yhchiang Subscribers: dhruba Differential Revision: https://reviews.facebook.net/D43887 --- db/db_impl.cc | 56 ++++++++------ db/db_impl_debug.cc | 7 +- db/write_thread.cc | 180 ++++++++++++++++++++++++++++++++------------ db/write_thread.h | 141 +++++++++++++++++++++++++--------- 4 files changed, 271 insertions(+), 113 deletions(-) diff --git a/db/db_impl.cc b/db/db_impl.cc index a6015ed7a..1f2fb0d75 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -2131,14 +2131,13 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd, return Status::OK(); } - WriteThread::Writer w(&mutex_); - write_thread_.EnterWriteThread(&w); - assert(!w.done); // Nobody should do our job + WriteThread::Writer w; + write_thread_.EnterUnbatched(&w, &mutex_); // SwitchMemtable() will release and reacquire mutex // during execution s = SwitchMemtable(cfd, &context); - write_thread_.ExitWriteThread(&w, &w, s); + write_thread_.ExitUnbatched(&w); cfd->imm()->FlushRequested(); @@ -3073,15 +3072,14 @@ Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& cf_options, // ColumnFamilyData object Options opt(db_options_, cf_options); { // write thread - WriteThread::Writer w(&mutex_); - write_thread_.EnterWriteThread(&w); - assert(!w.done); // Nobody should do our job + WriteThread::Writer w; + write_thread_.EnterUnbatched(&w, &mutex_); // LogAndApply will both write the creation in MANIFEST and create // ColumnFamilyData object s = versions_->LogAndApply( nullptr, MutableCFOptions(opt, ImmutableCFOptions(opt)), &edit, &mutex_, directories_.GetDbDir(), false, &cf_options); - write_thread_.ExitWriteThread(&w, &w, s); + write_thread_.ExitUnbatched(&w); } if (s.ok()) { single_column_family_mode_ = false; @@ -3135,12 +3133,11 @@ Status DBImpl::DropColumnFamily(ColumnFamilyHandle* column_family) { } if (s.ok()) { // we drop column family from a single write thread - WriteThread::Writer w(&mutex_); - write_thread_.EnterWriteThread(&w); - assert(!w.done); // Nobody should do our job + WriteThread::Writer w; + write_thread_.EnterUnbatched(&w, &mutex_); s = versions_->LogAndApply(cfd, 
*cfd->GetLatestMutableCFOptions(), &edit, &mutex_); - write_thread_.ExitWriteThread(&w, &w, s); + write_thread_.ExitUnbatched(&w); } if (!cf_support_snapshot) { @@ -3442,7 +3439,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, } PERF_TIMER_GUARD(write_pre_and_post_process_time); - WriteThread::Writer w(&mutex_); + WriteThread::Writer w; w.batch = my_batch; w.sync = write_options.sync; w.disableWAL = write_options.disableWAL; @@ -3456,6 +3453,14 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, StopWatch write_sw(env_, db_options_.statistics.get(), DB_WRITE); + write_thread_.JoinBatchGroup(&w); + if (w.done) { + // write was done by someone else, no need to grab mutex + RecordTick(stats_, WRITE_DONE_BY_OTHER); + return w.status; + } + // else we are the leader of the write batch group + WriteContext context; mutex_.Lock(); @@ -3463,15 +3468,6 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, default_cf_internal_stats_->AddDBStats(InternalStats::WRITE_WITH_WAL, 1); } - write_thread_.EnterWriteThread(&w); - if (w.done) { // write was done by someone else - default_cf_internal_stats_->AddDBStats(InternalStats::WRITE_DONE_BY_OTHER, - 1); - mutex_.Unlock(); - RecordTick(stats_, WRITE_DONE_BY_OTHER); - return w.status; - } - RecordTick(stats_, WRITE_DONE_BY_SELF); default_cf_internal_stats_->AddDBStats(InternalStats::WRITE_DONE_BY_SELF, 1); @@ -3560,8 +3556,8 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, bool need_log_dir_sync = need_log_sync && !log_dir_synced_; if (status.ok()) { - last_batch_group_size_ = - write_thread_.BuildBatchGroup(&last_writer, &write_batch_group); + last_batch_group_size_ = write_thread_.EnterAsBatchGroupLeader( + &w, &last_writer, &write_batch_group); if (need_log_sync) { while (logs_.front().getting_synced) { @@ -3702,10 +3698,20 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, MarkLogsSynced(logfile_number_, need_log_dir_sync, status); } - write_thread_.ExitWriteThread(&w, last_writer, status); + uint64_t writes_for_other = write_batch_group.size() - 1; + if (writes_for_other > 0) { + default_cf_internal_stats_->AddDBStats(InternalStats::WRITE_DONE_BY_OTHER, + writes_for_other); + if (!write_options.disableWAL) { + default_cf_internal_stats_->AddDBStats(InternalStats::WRITE_WITH_WAL, + writes_for_other); + } + } mutex_.Unlock(); + write_thread_.ExitAsBatchGroupLeader(&w, last_writer, status); + return status; } diff --git a/db/db_impl_debug.cc b/db/db_impl_debug.cc index e99b5983f..dc40fefc6 100644 --- a/db/db_impl_debug.cc +++ b/db/db_impl_debug.cc @@ -131,15 +131,14 @@ void DBImpl::TEST_UnlockMutex() { } void* DBImpl::TEST_BeginWrite() { - auto w = new WriteThread::Writer(&mutex_); - write_thread_.EnterWriteThread(w); - assert(!w->done); // Nobody should do our job + auto w = new WriteThread::Writer(); + write_thread_.EnterUnbatched(w, &mutex_); return reinterpret_cast(w); } void DBImpl::TEST_EndWrite(void* w) { auto writer = reinterpret_cast(w); - write_thread_.ExitWriteThread(writer, writer, Status::OK()); + write_thread_.ExitUnbatched(writer); delete writer; } diff --git a/db/write_thread.cc b/db/write_thread.cc index 7a9d16003..9b66af240 100644 --- a/db/write_thread.cc +++ b/db/write_thread.cc @@ -7,82 +7,102 @@ namespace rocksdb { -void WriteThread::EnterWriteThread(WriteThread::Writer* w) { - // the following code block pushes the current writer "w" into the writer - // queue "writers_" and wait until one of the following conditions met: - // 1. 
the job of "w" has been done by some other writers. - // 2. "w" becomes the first writer in "writers_" - // 3. "w" timed-out. - writers_.push_back(w); - - while (!w->done && w != writers_.front()) { - w->cv.Wait(); +void WriteThread::Await(Writer* w) { + std::unique_lock guard(w->JoinMutex()); + w->JoinCV().wait(guard, [w] { return w->joined; }); +} + +void WriteThread::MarkJoined(Writer* w) { + std::lock_guard guard(w->JoinMutex()); + assert(!w->joined); + w->joined = true; + w->JoinCV().notify_one(); +} + +void WriteThread::LinkOne(Writer* w, bool* wait_needed) { + assert(!w->joined && !w->done); + + Writer* writers = newest_writer_.load(std::memory_order_relaxed); + while (true) { + w->link_older = writers; + if (writers != nullptr) { + w->CreateMutex(); + } + if (newest_writer_.compare_exchange_strong(writers, w)) { + // Success. + *wait_needed = (writers != nullptr); + return; + } } } -void WriteThread::ExitWriteThread(WriteThread::Writer* w, - WriteThread::Writer* last_writer, - Status status) { - // Pop out the current writer and all writers being pushed before the - // current writer from the writer queue. - while (!writers_.empty()) { - Writer* ready = writers_.front(); - writers_.pop_front(); - if (ready != w) { - ready->status = status; - ready->done = true; - ready->cv.Signal(); +void WriteThread::CreateMissingNewerLinks(Writer* head) { + while (true) { + Writer* next = head->link_older; + if (next == nullptr || next->link_newer != nullptr) { + assert(next == nullptr || next->link_newer == head); + break; } - if (ready == last_writer) break; + next->link_newer = head; + head = next; } +} - // Notify new head of write queue - if (!writers_.empty()) { - writers_.front()->cv.Signal(); +void WriteThread::JoinBatchGroup(Writer* w) { + assert(w->batch != nullptr); + bool wait_needed; + LinkOne(w, &wait_needed); + if (wait_needed) { + Await(w); } } -// This function will be called only when the first writer succeeds. -// All writers in the to-be-built batch group will be processed. -// -// REQUIRES: Writer list must be non-empty -// REQUIRES: First writer must have a non-nullptr batch -size_t WriteThread::BuildBatchGroup( - WriteThread::Writer** last_writer, +size_t WriteThread::EnterAsBatchGroupLeader( + Writer* leader, WriteThread::Writer** last_writer, autovector* write_batch_group) { - assert(!writers_.empty()); - Writer* first = writers_.front(); - assert(first->batch != nullptr); + assert(leader->link_older == nullptr); + assert(leader->batch != nullptr); - size_t size = WriteBatchInternal::ByteSize(first->batch); - write_batch_group->push_back(first->batch); + size_t size = WriteBatchInternal::ByteSize(leader->batch); + write_batch_group->push_back(leader->batch); // Allow the group to grow up to a maximum size, but if the // original write is small, limit the growth so we do not slow // down the small write too much. size_t max_size = 1 << 20; - if (size <= (128<<10)) { - max_size = size + (128<<10); + if (size <= (128 << 10)) { + max_size = size + (128 << 10); } - *last_writer = first; + *last_writer = leader; - if (first->has_callback) { + if (leader->has_callback) { // TODO(agiardullo:) Batching not currently supported as this write may // fail if the callback function decides to abort this write. 
     return size;
   }
 
-  std::deque<Writer*>::iterator iter = writers_.begin();
-  ++iter;  // Advance past "first"
-  for (; iter != writers_.end(); ++iter) {
-    Writer* w = *iter;
-    if (w->sync && !first->sync) {
+  Writer* newest_writer = newest_writer_.load(std::memory_order_acquire);
+
+  // This is safe regardless of whether the caller holds the db mutex.
+  // Previous calls to ExitAsBatchGroupLeader either didn't call
+  // CreateMissingNewerLinks (they emptied the list and then we added
+  // ourselves as leader) or had to explicitly wake us up (the list was
+  // non-empty when we added ourselves, so we have already received our
+  // MarkJoined).
+  CreateMissingNewerLinks(newest_writer);
+
+  // Tricky. Iteration start (leader) is exclusive and finish
+  // (newest_writer) is inclusive. Iteration goes from old to new.
+  Writer* w = leader;
+  while (w != newest_writer) {
+    w = w->link_newer;
+
+    if (w->sync && !leader->sync) {
       // Do not include a sync write into a batch handled by a non-sync write.
       break;
     }
 
-    if (!w->disableWAL && first->disableWAL) {
+    if (!w->disableWAL && leader->disableWAL) {
       // Do not include a write that needs WAL into a batch that has
       // WAL disabled.
       break;
@@ -113,4 +133,68 @@ size_t WriteThread::BuildBatchGroup(
   return size;
 }
 
+void WriteThread::ExitAsBatchGroupLeader(Writer* leader, Writer* last_writer,
+                                         Status status) {
+  assert(leader->link_older == nullptr);
+
+  Writer* head = newest_writer_.load(std::memory_order_acquire);
+  if (head != last_writer ||
+      !newest_writer_.compare_exchange_strong(head, nullptr)) {
+    // Either last_writer wasn't the head during the load(), or it was the
+    // head during the load() but somebody else pushed onto the list before
+    // we did the compare_exchange_strong (causing it to fail). In the
+    // latter case compare_exchange_strong has the effect of re-reading
+    // its first param (head). No need to retry a failing CAS, because
+    // only a departing leader (which we are at the moment) can remove
+    // nodes from the list.
+    assert(head != last_writer);
+
+    // After walking link_older starting from head (if not already done)
+    // we will be able to traverse last_writer->link_newer below. This
+    // function can only be called by an active leader, only a leader can
+    // clear newest_writer_, we didn't, and only a cleared newest_writer_
+    // could cause the next leader to start its work without a call to
+    // MarkJoined, so we can conclude that no other leader work is going
+    // on here (with or without the db mutex).
+    CreateMissingNewerLinks(head);
+    assert(last_writer->link_newer->link_older == last_writer);
+    last_writer->link_newer->link_older = nullptr;
+
+    // The next leader didn't self-identify, because newest_writer_ wasn't
+    // nullptr when it enqueued (we were definitely enqueued before it and
+    // are still in the list). That means leader handoff occurs when we
+    // call MarkJoined.
+    MarkJoined(last_writer->link_newer);
+  }
+  // else nobody else was waiting, although there might already be a new
+  // leader now
+
+  while (last_writer != leader) {
+    last_writer->status = status;
+    last_writer->done = true;
+    // We must read link_older before calling MarkJoined, because as soon
+    // as it is marked, the other thread's Await may return and deallocate
+    // the Writer.
+    auto next = last_writer->link_older;
+    MarkJoined(last_writer);
+    last_writer = next;
+  }
+}
+
+void WriteThread::EnterUnbatched(Writer* w, InstrumentedMutex* mu) {
+  assert(w->batch == nullptr);
+  bool wait_needed;
+  LinkOne(w, &wait_needed);
+  if (wait_needed) {
+    mu->Unlock();
+    Await(w);
+    mu->Lock();
+  }
+}
+
+void WriteThread::ExitUnbatched(Writer* w) {
+  Status dummy_status;
+  ExitAsBatchGroupLeader(w, w, dummy_status);
+}
+
 }  // namespace rocksdb
diff --git a/db/write_thread.h b/db/write_thread.h
index a4a392246..3a15ea847 100644
--- a/db/write_thread.h
+++ b/db/write_thread.h
@@ -5,76 +5,145 @@
 #pragma once
 
+#include <assert.h>
 #include <stdint.h>
-#include <deque>
-#include <limits>
+#include <atomic>
+#include <condition_variable>
+#include <mutex>
+#include <type_traits>
 #include "rocksdb/status.h"
 #include "db/write_batch_internal.h"
 #include "util/autovector.h"
-#include "port/port.h"
 #include "util/instrumented_mutex.h"
 
 namespace rocksdb {
 
 class WriteThread {
  public:
-  // Information kept for every waiting writer
+  // Information kept for every waiting writer.
   struct Writer {
-    Status status;
     WriteBatch* batch;
     bool sync;
     bool disableWAL;
     bool in_batch_group;
     bool done;
     bool has_callback;
-    InstrumentedCondVar cv;
+    Status status;
+    bool made_waitable;  // records lazy construction of mutex and cv
+    bool joined;         // read/write only under JoinMutex() (or pre-link)
+    std::aligned_storage<sizeof(std::mutex)>::type join_mutex_bytes;
+    std::aligned_storage<sizeof(std::condition_variable)>::type join_cv_bytes;
+    Writer* link_older;  // read/write only before linking, or as leader
+    Writer* link_newer;  // lazy, read/write only before linking, or as leader
 
-    explicit Writer(InstrumentedMutex* mu)
+    Writer()
         : batch(nullptr),
           sync(false),
           disableWAL(false),
           in_batch_group(false),
           done(false),
           has_callback(false),
-          cv(mu) {}
+          made_waitable(false),
+          joined(false),
+          link_older(nullptr),
+          link_newer(nullptr) {}
+
+    ~Writer() {
+      if (made_waitable) {
+        JoinMutex().~mutex();
+        JoinCV().~condition_variable();
+      }
+    }
+
+    void CreateMutex() {
+      assert(!joined);
+      if (!made_waitable) {
+        made_waitable = true;
+        new (&join_mutex_bytes) std::mutex;
+        new (&join_cv_bytes) std::condition_variable;
+      }
+    }
+
+    // No other mutexes may be acquired while holding JoinMutex(); it is
+    // always last in the lock order.
+    std::mutex& JoinMutex() {
+      assert(made_waitable);
+      return *static_cast<std::mutex*>(static_cast<void*>(&join_mutex_bytes));
+    }
+
+    std::condition_variable& JoinCV() {
+      assert(made_waitable);
+      return *static_cast<std::condition_variable*>(
+          static_cast<void*>(&join_cv_bytes));
+    }
   };
 
-  WriteThread() = default;
-  ~WriteThread() = default;
+  WriteThread() : newest_writer_(nullptr) {}
 
-  // Before applying write operation (such as DBImpl::Write, DBImpl::Flush)
-  // thread should grab the mutex_ and be the first on writers queue.
-  // EnterWriteThread is used for it.
-  // Be aware! Writer's job can be done by other thread (see DBImpl::Write
-  // for examples), so check it via w.done before applying changes.
+  // IMPORTANT: None of the methods in this class rely on the db mutex
+  // for correctness. All of the methods except JoinBatchGroup and
+  // EnterUnbatched may be called either with or without the db mutex held.
+  // Correctness is maintained by ensuring that only a single thread is
+  // a leader at a time.
+
+  // Registers w as ready to become part of a batch group, and blocks
+  // until some other thread has completed the write (in which case
+  // w->done will be set to true) or this write has become the leader
+  // of a batch group (w->done will remain unset). The db mutex SHOULD
+  // NOT be held when calling this function, because it will block.
+  // If !w->done then JoinBatchGroup should be followed by a call to
+  // EnterAsBatchGroupLeader and ExitAsBatchGroupLeader.
   //
-  // Writer* w: writer to be placed in the queue
-  // See also: ExitWriteThread
-  // REQUIRES: db mutex held
-  void EnterWriteThread(Writer* w);
+  // Writer* w: Writer to be executed as part of a batch group
+  void JoinBatchGroup(Writer* w);
 
-  // After doing write job, we need to remove already used writers from
-  // writers_ queue and notify head of the queue about it.
-  // ExitWriteThread is used for this.
+  // Constructs a write batch group led by leader, which should be a
+  // Writer passed to JoinBatchGroup on the current thread.
   //
-  // Writer* w: Writer, that was added by EnterWriteThread function
-  // Writer* last_writer: Since we can join a few Writers (as DBImpl::Write
-  //                      does)
-  //                      we should pass last_writer as a parameter to
-  //                      ExitWriteThread
-  //                      (if you don't touch other writers, just pass w)
-  // Status status: Status of write operation
-  // See also: EnterWriteThread
+  // Writer* leader: Writer passed to JoinBatchGroup, but !done
+  // Writer** last_writer: Out-param for use by ExitAsBatchGroupLeader
+  // autovector<WriteBatch*>* write_batch_group: Out-param of group members
+  // returns: Total batch group size
+  size_t EnterAsBatchGroupLeader(Writer* leader, Writer** last_writer,
+                                 autovector<WriteBatch*>* write_batch_group);
+
+  // Unlinks the Writers in a batch group, wakes up the non-leaders, and
+  // wakes up the next leader (if any).
+  //
+  // Writer* leader: From EnterAsBatchGroupLeader
+  // Writer* last_writer: Value of out-param of EnterAsBatchGroupLeader
+  // Status status: Status of write operation
+  void ExitAsBatchGroupLeader(Writer* leader, Writer* last_writer,
+                              Status status);
+
+  // Waits for all preceding writers (unlocking mu while waiting), then
+  // registers w as the currently proceeding writer.
+  //
+  // Writer* w: A Writer not eligible for batching
+  // InstrumentedMutex* mu: The db mutex, to unlock while waiting
   // REQUIRES: db mutex held
-  void ExitWriteThread(Writer* w, Writer* last_writer, Status status);
+  void EnterUnbatched(Writer* w, InstrumentedMutex* mu);
 
-  // return total batch group size
-  size_t BuildBatchGroup(Writer** last_writer,
-                         autovector<WriteBatch*>* write_batch_group);
+  // Completes a Writer begun with EnterUnbatched, unblocking subsequent
+  // writers.
+  void ExitUnbatched(Writer* w);
 
  private:
-  // Queue of writers.
-  std::deque<Writer*> writers_;
+  // Points to the newest pending Writer. Only the leader can remove
+  // elements; adding can be done lock-free by anybody.
+  std::atomic<Writer*> newest_writer_;
+
+  void Await(Writer* w);
+  void MarkJoined(Writer* w);
+
+  // Links w into the newest_writer_ list. Sets *wait_needed to false
+  // if w was linked directly into the leader position, true otherwise.
+  // Safe to call from multiple threads without external locking.
+  void LinkOne(Writer* w, bool* wait_needed);
+
+  // Computes any missing link_newer links. Should not be called
+  // concurrently with itself.
+  void CreateMissingNewerLinks(Writer* head);
 };
 
 }  // namespace rocksdb
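
For readers who want to try the handoff scheme outside of RocksDB, here is a minimal standalone sketch (not part of the patch, and not RocksDB code) of the lock-free join / leader-handoff idea used above. The names Writer, newest_writer_, and link_older mirror the patch; WriteQueue, Link, LeadGroup, and AwaitDone are hypothetical names for this demo only. Two simplifications: the per-Writer mutex and condition variable are constructed eagerly rather than lazily, and a new group may begin forming while the previous leader is still waking its followers (the patch essentially keeps one leader at a time by clearing or handing off newest_writer_ only in ExitAsBatchGroupLeader).

// demo.cc -- illustrative sketch only, not RocksDB code.
#include <atomic>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

struct Writer {
  std::string batch;               // stand-in for WriteBatch*
  Writer* link_older = nullptr;    // next-older writer in the list
  bool done = false;               // set by the leader for followers
  std::mutex join_mutex;           // per-Writer, so followers never touch
  std::condition_variable join_cv; // any global mutex while waiting
};

class WriteQueue {
 public:
  // Lock-free link onto the newest end of the list. Returns true if the
  // list was empty, i.e. this writer becomes the leader of a new group.
  bool Link(Writer* w) {
    Writer* writers = newest_writer_.load(std::memory_order_relaxed);
    do {
      w->link_older = writers;
    } while (!newest_writer_.compare_exchange_weak(writers, w));
    return writers == nullptr;
  }

  // Leader: claim everything linked so far (the oldest of which is the
  // leader itself), gather the batches oldest-first, then wake followers.
  void LeadGroup(Writer* leader, std::vector<std::string>* group) {
    Writer* newest = newest_writer_.exchange(nullptr, std::memory_order_acquire);
    std::vector<Writer*> members;
    for (Writer* w = newest; w != nullptr; w = w->link_older) {
      members.push_back(w);  // newest first
    }
    for (auto it = members.rbegin(); it != members.rend(); ++it) {
      group->push_back((*it)->batch);  // oldest first, leader included
    }
    // The combined "write" of *group would happen here.
    for (Writer* w : members) {
      if (w == leader) continue;
      std::lock_guard<std::mutex> lock(w->join_mutex);
      w->done = true;
      w->join_cv.notify_one();
    }
  }

  // Follower: block on the per-Writer condition variable until a leader
  // has folded this batch into its group.
  void AwaitDone(Writer* w) {
    std::unique_lock<std::mutex> lock(w->join_mutex);
    w->join_cv.wait(lock, [w] { return w->done; });
  }

 private:
  std::atomic<Writer*> newest_writer_{nullptr};
};

int main() {
  WriteQueue queue;
  std::vector<std::thread> threads;
  for (int i = 0; i < 8; ++i) {
    threads.emplace_back([&queue, i] {
      Writer w;
      w.batch = "batch-" + std::to_string(i);
      if (queue.Link(&w)) {
        std::vector<std::string> group;
        queue.LeadGroup(&w, &group);
        std::cout << "leader " << i << " wrote " << group.size() << " batches\n";
      } else {
        queue.AwaitDone(&w);  // someone else wrote our batch
      }
    });
  }
  for (auto& t : threads) t.join();
  return 0;
}

Build with something like g++ -std=c++11 -pthread demo.cc; each run prints one line per group leader, and any batch that does not appear in the output was folded into another thread's group.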
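
Writer above also defers constructing its std::mutex and std::condition_variable until the writer actually has to block (made_waitable / CreateMutex), so a writer that immediately becomes leader never pays for them. A compilable sketch of just that trick, under the hypothetical name LazyWaiter (simplified; the real Writer carries many more fields):

// lazy_waiter.cc -- illustrative sketch only, not RocksDB code.
#include <condition_variable>
#include <mutex>
#include <new>
#include <type_traits>

struct LazyWaiter {
  bool made_waitable = false;
  std::aligned_storage<sizeof(std::mutex), alignof(std::mutex)>::type
      mutex_bytes;
  std::aligned_storage<sizeof(std::condition_variable),
                       alignof(std::condition_variable)>::type cv_bytes;

  ~LazyWaiter() {
    // Destroy only what was actually constructed.
    if (made_waitable) {
      Mutex().~mutex();
      CV().~condition_variable();
    }
  }

  // Called only on the path where this writer actually has to block.
  void MakeWaitable() {
    if (!made_waitable) {
      made_waitable = true;
      new (&mutex_bytes) std::mutex;
      new (&cv_bytes) std::condition_variable;
    }
  }

  std::mutex& Mutex() {
    return *static_cast<std::mutex*>(static_cast<void*>(&mutex_bytes));
  }
  std::condition_variable& CV() {
    return *static_cast<std::condition_variable*>(
        static_cast<void*>(&cv_bytes));
  }
};

int main() {
  LazyWaiter w;      // cheap: nothing constructed yet
  w.MakeWaitable();  // first time we must wait: construct in place
  std::lock_guard<std::mutex> lock(w.Mutex());
  return 0;
}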