diff --git a/db/db_impl.cc b/db/db_impl.cc index 17c4466a3..0587774fb 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -2273,26 +2273,20 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { StopWatch sw(env_, options_.statistics, DB_WRITE); MutexLock l(&mutex_); - - // If WAL is disabled, we avoid any queueing. - if (!options.disableWAL) { - writers_.push_back(&w); - while (!w.done && &w != writers_.front()) { - w.cv.Wait(); - } - if (w.done) { - return w.status; - } + writers_.push_back(&w); + while (!w.done && &w != writers_.front()) { + w.cv.Wait(); + } + if (w.done) { + return w.status; } // May temporarily unlock and wait. Status status = MakeRoomForWrite(my_batch == nullptr); uint64_t last_sequence = versions_->LastSequence(); Writer* last_writer = &w; - if (status.ok() && my_batch != nullptr) { // nullptr batch is for compactions - WriteBatch* updates = options.disableWAL ? my_batch : - BuildBatchGroup(&last_writer); + WriteBatch* updates = BuildBatchGroup(&last_writer); const SequenceNumber current_sequence = last_sequence + 1; WriteBatchInternal::SetSequence(updates, current_sequence); int my_batch_count = WriteBatchInternal::Count(updates); @@ -2307,12 +2301,12 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { // and protects against concurrent loggers and concurrent writes // into mem_. { + mutex_.Unlock(); if (options.disableWAL) { - // If WAL is disabled, then we do not drop the mutex. We keep the - // mutex to protect concurrent insertions into the memtable. flush_on_destroy_ = true; - } else { - mutex_.Unlock(); + } + + if (!options.disableWAL) { status = log_->AddRecord(WriteBatchInternal::Contents(updates)); if (status.ok() && options.sync) { if (options_.use_fsync) { @@ -2337,29 +2331,25 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { versions_->SetLastSequence(last_sequence); last_flushed_sequence_ = current_sequence; } - if (!options.disableWAL) { - mutex_.Lock(); - } + mutex_.Lock(); } if (updates == &tmp_batch_) tmp_batch_.Clear(); } - if (!options.disableWAL) { - while (true) { - Writer* ready = writers_.front(); - writers_.pop_front(); - if (ready != &w) { - ready->status = status; - ready->done = true; - ready->cv.Signal(); - } - if (ready == last_writer) break; + while (true) { + Writer* ready = writers_.front(); + writers_.pop_front(); + if (ready != &w) { + ready->status = status; + ready->done = true; + ready->cv.Signal(); } + if (ready == last_writer) break; + } - // Notify new head of write queue - if (!writers_.empty()) { - writers_.front()->cv.Signal(); - } + // Notify new head of write queue + if (!writers_.empty()) { + writers_.front()->cv.Signal(); } return status; } @@ -2423,6 +2413,7 @@ WriteBatch* DBImpl::BuildBatchGroup(Writer** last_writer) { // REQUIRES: this thread is currently at the front of the writer queue Status DBImpl::MakeRoomForWrite(bool force) { mutex_.AssertHeld(); + assert(!writers_.empty()); bool allow_delay = !force; bool allow_rate_limit_delay = !force; uint64_t rate_limit_delay_millis = 0;