From e9e6e53247778865314fd8c4e421b785c9d84464 Mon Sep 17 00:00:00 2001 From: Yi Wu Date: Thu, 13 Apr 2017 14:46:25 -0700 Subject: [PATCH] Simplify write thread logic Summary: The concept about early exit in write thread implementation is a confusing one. It means that if early exit is allowed, batch group leader will not responsible to exit the batch group, but the last finished writer do. In case we need to mark log synced, or encounter memtable insert error, early exit is disallowed. This patch remove such a concept by: * In all cases, the last finished writer (not necessary leader) is responsible to exit batch group. * In case of parallel memtable write, leader will also mark log synced after memtable insert and before signal finish (call `CompleteParallelWorker()`). The purpose is to allow mark log synced (which require locking mutex) can run in parallel to memtable insert in other writers. * The last finish writer should handle memtable insert error (update bg_error_) before exiting batch group. Closes https://github.com/facebook/rocksdb/pull/2134 Differential Revision: D4869667 Pulled By: yiwu-arbug fbshipit-source-id: aec170847c85b90f4179d6a4608a4fe1361544e3 --- db/db_impl.h | 4 +++ db/db_impl_write.cc | 87 +++++++++++++++++++++------------------------ db/write_thread.cc | 27 +++----------- db/write_thread.h | 7 ++-- 4 files changed, 50 insertions(+), 75 deletions(-) diff --git a/db/db_impl.h b/db/db_impl.h index 1831dee22..0a97fd170 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -704,6 +704,10 @@ class DBImpl : public DB { log::Writer* log_writer, bool need_log_sync, bool need_log_dir_sync, SequenceNumber sequence); + // Used by WriteImpl to update bg_error_ when encountering memtable insert + // error. + void UpdateBackgroundError(const Status& memtable_insert_status); + #ifndef ROCKSDB_LITE Status CompactFilesImpl(const CompactionOptions& compact_options, diff --git a/db/db_impl_write.cc b/db/db_impl_write.cc index 6aa5003e4..f9465e2d1 100644 --- a/db/db_impl_write.cc +++ b/db/db_impl_write.cc @@ -92,10 +92,11 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, } if (write_thread_.CompleteParallelWorker(&w)) { - // we're responsible for early exit + // we're responsible for exit batch group auto last_sequence = w.parallel_group->last_sequence; versions_->SetLastSequence(last_sequence); - write_thread_.EarlyExitParallelGroup(&w); + UpdateBackgroundError(w.status); + write_thread_.ExitAsBatchGroupFollower(&w); } assert(w.state == WriteThread::STATE_COMPLETED); // STATE_COMPLETED conditional below handles exit @@ -119,7 +120,10 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, WriteContext write_context; WriteThread::Writer* last_writer = &w; // Dummy intial value autovector write_group; + WriteThread::ParallelGroup pg; bool logs_getting_synced = false; + bool in_parallel_group = false; + uint64_t last_sequence = versions_->LastSequence(); mutex_.Lock(); @@ -136,7 +140,6 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, // and protects against concurrent loggers and concurrent writes // into memtables - bool exit_completed_early = false; last_batch_group_size_ = write_thread_.EnterAsBatchGroupLeader(&w, &last_writer, &write_group); @@ -168,15 +171,13 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, } } - uint64_t last_sequence = versions_->LastSequence(); const SequenceNumber current_sequence = last_sequence + 1; last_sequence += total_count; // Update stats while we are an exclusive group leader, so we know // that nobody else can be writing to these particular stats. // We're optimistic, updating the stats before we successfully - // commit. That lets us release our leader status early in - // some cases. + // commit. That lets us release our leader status early. auto stats = default_cf_internal_stats_; stats->AddDBStats(InternalStats::NUMBER_KEYS_WRITTEN, total_count); RecordTick(stats_, NUMBER_KEYS_WRITTEN, total_count); @@ -211,26 +212,18 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, PERF_TIMER_GUARD(write_memtable_time); if (!parallel) { - status = WriteBatchInternal::InsertInto( + w.status = WriteBatchInternal::InsertInto( write_group, current_sequence, column_family_memtables_.get(), &flush_scheduler_, write_options.ignore_missing_column_families, 0 /*recovery_log_number*/, this); - - if (status.ok()) { - // There were no write failures. Set leader's status - // in case the write callback returned a non-ok status. - status = w.FinalStatus(); - } - } else { - WriteThread::ParallelGroup pg; pg.leader = &w; pg.last_writer = last_writer; pg.last_sequence = last_sequence; - pg.early_exit_allowed = !need_log_sync; pg.running.store(static_cast(write_group.size()), std::memory_order_relaxed); write_thread_.LaunchParallelFollowers(&pg, current_sequence); + in_parallel_group = true; // Each parallel follower is doing each own writes. The leader should // also do its own. @@ -244,40 +237,14 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, write_options.ignore_missing_column_families, 0 /*log_number*/, this, true /*concurrent_memtable_writes*/); } - - // CompleteParallelWorker returns true if this thread should - // handle exit, false means somebody else did - exit_completed_early = !write_thread_.CompleteParallelWorker(&w); - status = w.FinalStatus(); - } - - if (!exit_completed_early && w.status.ok()) { - versions_->SetLastSequence(last_sequence); - if (!need_log_sync) { - write_thread_.ExitAsBatchGroupLeader(&w, last_writer, w.status); - exit_completed_early = true; - } - } - - // A non-OK status here indicates that the state implied by the - // WAL has diverged from the in-memory state. This could be - // because of a corrupt write_batch (very bad), or because the - // client specified an invalid column family and didn't specify - // ignore_missing_column_families. - // - // Is setting bg_error_ enough here? This will at least stop - // compaction and fail any further writes. - if (!status.ok() && !w.CallbackFailed()) { - mutex_.Lock(); - if (bg_error_.ok()) { - bg_error_ = status; // stop compaction & fail any further writes - } - mutex_.Unlock(); } } } PERF_TIMER_START(write_pre_and_post_process_time); + // + // Is setting bg_error_ enough here? This will at least stop + // compaction and fail any further writes. if (immutable_db_options_.paranoid_checks && !status.ok() && !w.CallbackFailed() && !status.IsBusy() && !status.IsIncomplete()) { mutex_.Lock(); @@ -293,13 +260,39 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, mutex_.Unlock(); } - if (!exit_completed_early) { + bool should_exit_batch_group = true; + if (in_parallel_group) { + // CompleteParallelWorker returns true if this thread should + // handle exit, false means somebody else did + should_exit_batch_group = write_thread_.CompleteParallelWorker(&w); + } + if (should_exit_batch_group) { + versions_->SetLastSequence(last_sequence); + UpdateBackgroundError(w.status); write_thread_.ExitAsBatchGroupLeader(&w, last_writer, w.status); } + if (status.ok()) { + status = w.FinalStatus(); + } + return status; } +void DBImpl::UpdateBackgroundError(const Status& memtable_insert_status) { + // A non-OK status here indicates that the state implied by the + // WAL has diverged from the in-memory state. This could be + // because of a corrupt write_batch (very bad), or because the + // client specified an invalid column family and didn't specify + // ignore_missing_column_families. + if (!memtable_insert_status.ok()) { + mutex_.Lock(); + assert(bg_error_.ok()); + bg_error_ = memtable_insert_status; + mutex_.Unlock(); + } +} + Status DBImpl::PreprocessWrite(const WriteOptions& write_options, bool need_log_sync, bool* logs_getting_synced, WriteContext* write_context) { @@ -324,7 +317,7 @@ Status DBImpl::PreprocessWrite(const WriteOptions& write_options, } if (UNLIKELY(status.ok() && !bg_error_.ok())) { - status = bg_error_; + return bg_error_; } if (UNLIKELY(status.ok() && !flush_scheduler_.Empty())) { diff --git a/db/write_thread.cc b/db/write_thread.cc index 26836ef6d..e4fee9c7c 100644 --- a/db/write_thread.cc +++ b/db/write_thread.cc @@ -349,36 +349,17 @@ bool WriteThread::CompleteParallelWorker(Writer* w) { pg->status = w->status; } - auto leader = pg->leader; - auto early_exit_allowed = pg->early_exit_allowed; - if (pg->running.load(std::memory_order_acquire) > 1 && pg->running-- > 1) { // we're not the last one AwaitState(w, STATE_COMPLETED, &ctx); - - // Caller only needs to perform exit duties if early exit doesn't - // apply and this is the leader. Can't touch pg here. Whoever set - // our state to STATE_COMPLETED copied pg->status to w.status for us. - return w == leader && !(early_exit_allowed && w->status.ok()); - } - // else we're the last parallel worker - - // Errors (if there is any) must be handled by leader before waking up others - if (w == leader || (early_exit_allowed && pg->status.ok())) { - // this thread should perform exit duties - w->status = pg->status; - return true; - } else { - // We're the last parallel follower but early commit is not - // applicable. Wake up the leader and then wait for it to exit. - assert(w->state == STATE_PARALLEL_FOLLOWER); - SetState(leader, STATE_COMPLETED); - AwaitState(w, STATE_COMPLETED, &ctx); return false; } + // else we're the last parallel worker and should perform exit duties. + w->status = pg->status; + return true; } -void WriteThread::EarlyExitParallelGroup(Writer* w) { +void WriteThread::ExitAsBatchGroupFollower(Writer* w) { auto* pg = w->parallel_group; assert(w->state == STATE_PARALLEL_FOLLOWER); diff --git a/db/write_thread.h b/db/write_thread.h index cb7b66807..f09b0d408 100644 --- a/db/write_thread.h +++ b/db/write_thread.h @@ -70,7 +70,6 @@ class WriteThread { Writer* leader; Writer* last_writer; SequenceNumber last_sequence; - bool early_exit_allowed; // before running goes to zero, status needs leader->StateMutex() Status status; std::atomic running; @@ -248,10 +247,8 @@ class WriteThread { // someone else has already taken responsibility for that. bool CompleteParallelWorker(Writer* w); - // This method performs an early completion of a parallel write group, - // where the cleanup work of the leader is performed by a follower who - // happens to be the last parallel worker to complete. - void EarlyExitParallelGroup(Writer* w); + // Exit batch group on behalf of batch group leader. + void ExitAsBatchGroupFollower(Writer* w); // Unlinks the Writer-s in a batch group, wakes up the non-leaders, // and wakes up the next leader (if any).