diff --git a/db/db_impl.h b/db/db_impl.h index 1831dee22..0a97fd170 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -704,6 +704,10 @@ class DBImpl : public DB { log::Writer* log_writer, bool need_log_sync, bool need_log_dir_sync, SequenceNumber sequence); + // Used by WriteImpl to update bg_error_ when encountering memtable insert + // error. + void UpdateBackgroundError(const Status& memtable_insert_status); + #ifndef ROCKSDB_LITE Status CompactFilesImpl(const CompactionOptions& compact_options, diff --git a/db/db_impl_write.cc b/db/db_impl_write.cc index 6aa5003e4..f9465e2d1 100644 --- a/db/db_impl_write.cc +++ b/db/db_impl_write.cc @@ -92,10 +92,11 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, } if (write_thread_.CompleteParallelWorker(&w)) { - // we're responsible for early exit + // we're responsible for exit batch group auto last_sequence = w.parallel_group->last_sequence; versions_->SetLastSequence(last_sequence); - write_thread_.EarlyExitParallelGroup(&w); + UpdateBackgroundError(w.status); + write_thread_.ExitAsBatchGroupFollower(&w); } assert(w.state == WriteThread::STATE_COMPLETED); // STATE_COMPLETED conditional below handles exit @@ -119,7 +120,10 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, WriteContext write_context; WriteThread::Writer* last_writer = &w; // Dummy intial value autovector write_group; + WriteThread::ParallelGroup pg; bool logs_getting_synced = false; + bool in_parallel_group = false; + uint64_t last_sequence = versions_->LastSequence(); mutex_.Lock(); @@ -136,7 +140,6 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, // and protects against concurrent loggers and concurrent writes // into memtables - bool exit_completed_early = false; last_batch_group_size_ = write_thread_.EnterAsBatchGroupLeader(&w, &last_writer, &write_group); @@ -168,15 +171,13 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, } } - uint64_t last_sequence = versions_->LastSequence(); const SequenceNumber current_sequence = last_sequence + 1; last_sequence += total_count; // Update stats while we are an exclusive group leader, so we know // that nobody else can be writing to these particular stats. // We're optimistic, updating the stats before we successfully - // commit. That lets us release our leader status early in - // some cases. + // commit. That lets us release our leader status early. auto stats = default_cf_internal_stats_; stats->AddDBStats(InternalStats::NUMBER_KEYS_WRITTEN, total_count); RecordTick(stats_, NUMBER_KEYS_WRITTEN, total_count); @@ -211,26 +212,18 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, PERF_TIMER_GUARD(write_memtable_time); if (!parallel) { - status = WriteBatchInternal::InsertInto( + w.status = WriteBatchInternal::InsertInto( write_group, current_sequence, column_family_memtables_.get(), &flush_scheduler_, write_options.ignore_missing_column_families, 0 /*recovery_log_number*/, this); - - if (status.ok()) { - // There were no write failures. Set leader's status - // in case the write callback returned a non-ok status. - status = w.FinalStatus(); - } - } else { - WriteThread::ParallelGroup pg; pg.leader = &w; pg.last_writer = last_writer; pg.last_sequence = last_sequence; - pg.early_exit_allowed = !need_log_sync; pg.running.store(static_cast(write_group.size()), std::memory_order_relaxed); write_thread_.LaunchParallelFollowers(&pg, current_sequence); + in_parallel_group = true; // Each parallel follower is doing each own writes. The leader should // also do its own. @@ -244,40 +237,14 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, write_options.ignore_missing_column_families, 0 /*log_number*/, this, true /*concurrent_memtable_writes*/); } - - // CompleteParallelWorker returns true if this thread should - // handle exit, false means somebody else did - exit_completed_early = !write_thread_.CompleteParallelWorker(&w); - status = w.FinalStatus(); - } - - if (!exit_completed_early && w.status.ok()) { - versions_->SetLastSequence(last_sequence); - if (!need_log_sync) { - write_thread_.ExitAsBatchGroupLeader(&w, last_writer, w.status); - exit_completed_early = true; - } - } - - // A non-OK status here indicates that the state implied by the - // WAL has diverged from the in-memory state. This could be - // because of a corrupt write_batch (very bad), or because the - // client specified an invalid column family and didn't specify - // ignore_missing_column_families. - // - // Is setting bg_error_ enough here? This will at least stop - // compaction and fail any further writes. - if (!status.ok() && !w.CallbackFailed()) { - mutex_.Lock(); - if (bg_error_.ok()) { - bg_error_ = status; // stop compaction & fail any further writes - } - mutex_.Unlock(); } } } PERF_TIMER_START(write_pre_and_post_process_time); + // + // Is setting bg_error_ enough here? This will at least stop + // compaction and fail any further writes. if (immutable_db_options_.paranoid_checks && !status.ok() && !w.CallbackFailed() && !status.IsBusy() && !status.IsIncomplete()) { mutex_.Lock(); @@ -293,13 +260,39 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, mutex_.Unlock(); } - if (!exit_completed_early) { + bool should_exit_batch_group = true; + if (in_parallel_group) { + // CompleteParallelWorker returns true if this thread should + // handle exit, false means somebody else did + should_exit_batch_group = write_thread_.CompleteParallelWorker(&w); + } + if (should_exit_batch_group) { + versions_->SetLastSequence(last_sequence); + UpdateBackgroundError(w.status); write_thread_.ExitAsBatchGroupLeader(&w, last_writer, w.status); } + if (status.ok()) { + status = w.FinalStatus(); + } + return status; } +void DBImpl::UpdateBackgroundError(const Status& memtable_insert_status) { + // A non-OK status here indicates that the state implied by the + // WAL has diverged from the in-memory state. This could be + // because of a corrupt write_batch (very bad), or because the + // client specified an invalid column family and didn't specify + // ignore_missing_column_families. + if (!memtable_insert_status.ok()) { + mutex_.Lock(); + assert(bg_error_.ok()); + bg_error_ = memtable_insert_status; + mutex_.Unlock(); + } +} + Status DBImpl::PreprocessWrite(const WriteOptions& write_options, bool need_log_sync, bool* logs_getting_synced, WriteContext* write_context) { @@ -324,7 +317,7 @@ Status DBImpl::PreprocessWrite(const WriteOptions& write_options, } if (UNLIKELY(status.ok() && !bg_error_.ok())) { - status = bg_error_; + return bg_error_; } if (UNLIKELY(status.ok() && !flush_scheduler_.Empty())) { diff --git a/db/write_thread.cc b/db/write_thread.cc index 26836ef6d..e4fee9c7c 100644 --- a/db/write_thread.cc +++ b/db/write_thread.cc @@ -349,36 +349,17 @@ bool WriteThread::CompleteParallelWorker(Writer* w) { pg->status = w->status; } - auto leader = pg->leader; - auto early_exit_allowed = pg->early_exit_allowed; - if (pg->running.load(std::memory_order_acquire) > 1 && pg->running-- > 1) { // we're not the last one AwaitState(w, STATE_COMPLETED, &ctx); - - // Caller only needs to perform exit duties if early exit doesn't - // apply and this is the leader. Can't touch pg here. Whoever set - // our state to STATE_COMPLETED copied pg->status to w.status for us. - return w == leader && !(early_exit_allowed && w->status.ok()); - } - // else we're the last parallel worker - - // Errors (if there is any) must be handled by leader before waking up others - if (w == leader || (early_exit_allowed && pg->status.ok())) { - // this thread should perform exit duties - w->status = pg->status; - return true; - } else { - // We're the last parallel follower but early commit is not - // applicable. Wake up the leader and then wait for it to exit. - assert(w->state == STATE_PARALLEL_FOLLOWER); - SetState(leader, STATE_COMPLETED); - AwaitState(w, STATE_COMPLETED, &ctx); return false; } + // else we're the last parallel worker and should perform exit duties. + w->status = pg->status; + return true; } -void WriteThread::EarlyExitParallelGroup(Writer* w) { +void WriteThread::ExitAsBatchGroupFollower(Writer* w) { auto* pg = w->parallel_group; assert(w->state == STATE_PARALLEL_FOLLOWER); diff --git a/db/write_thread.h b/db/write_thread.h index cb7b66807..f09b0d408 100644 --- a/db/write_thread.h +++ b/db/write_thread.h @@ -70,7 +70,6 @@ class WriteThread { Writer* leader; Writer* last_writer; SequenceNumber last_sequence; - bool early_exit_allowed; // before running goes to zero, status needs leader->StateMutex() Status status; std::atomic running; @@ -248,10 +247,8 @@ class WriteThread { // someone else has already taken responsibility for that. bool CompleteParallelWorker(Writer* w); - // This method performs an early completion of a parallel write group, - // where the cleanup work of the leader is performed by a follower who - // happens to be the last parallel worker to complete. - void EarlyExitParallelGroup(Writer* w); + // Exit batch group on behalf of batch group leader. + void ExitAsBatchGroupFollower(Writer* w); // Unlinks the Writer-s in a batch group, wakes up the non-leaders, // and wakes up the next leader (if any).