diff --git a/db/db_impl.cc b/db/db_impl.cc index 28cf48547..a63a00f22 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -3703,7 +3703,10 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { // May temporarily unlock and wait. SuperVersion* superversion_to_free = nullptr; - Status status = MakeRoomForWrite(my_batch == nullptr, &superversion_to_free); + log::Writer* old_log = nullptr; + Status status = MakeRoomForWrite(my_batch == nullptr, + &superversion_to_free, + &old_log); uint64_t last_sequence = versions_->LastSequence(); Writer* last_writer = &w; if (status.ok() && my_batch != nullptr) { // nullptr batch is for compactions @@ -3804,6 +3807,7 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { writers_.front()->cv.Signal(); } mutex_.Unlock(); + delete old_log; delete superversion_to_free; BumpPerfTime(&perf_context.write_pre_and_post_process_time, &pre_post_process_timer); @@ -3893,7 +3897,8 @@ uint64_t DBImpl::SlowdownAmount(int n, double bottom, double top) { // REQUIRES: mutex_ is held // REQUIRES: this thread is currently at the front of the writer queue Status DBImpl::MakeRoomForWrite(bool force, - SuperVersion** superversion_to_free) { + SuperVersion** superversion_to_free, + log::Writer** old_log) { mutex_.AssertHeld(); assert(!writers_.empty()); bool allow_delay = !force; @@ -4015,6 +4020,7 @@ Status DBImpl::MakeRoomForWrite(bool force, } else { unique_ptr lfile; + log::Writer* new_log = nullptr; MemTable* new_mem = nullptr; // Attempt to switch to a new memtable and trigger flush of old. @@ -4032,6 +4038,7 @@ Status DBImpl::MakeRoomForWrite(bool force, // Our final size should be less than write_buffer_size // (compression, etc) but err on the side of caution. lfile->SetPreallocationBlockSize(1.1 * options_.write_buffer_size); + new_log = new log::Writer(std::move(lfile)); new_mem = new MemTable(internal_comparator_, options_); new_superversion = new SuperVersion(); } @@ -4044,10 +4051,13 @@ Status DBImpl::MakeRoomForWrite(bool force, // Avoid chewing through file number space in a tight loop. versions_->ReuseFileNumber(new_log_number); assert (!new_mem); + assert(new_log == nullptr); break; } logfile_number_ = new_log_number; - log_.reset(new log::Writer(std::move(lfile))); + assert(new_log != nullptr); + *old_log = log_.release(); + log_.reset(new_log); mem_->SetNextLogNumber(logfile_number_); imm_.Add(mem_); if (force) { diff --git a/db/db_impl.h b/db/db_impl.h index d8ac98cee..bedd23f89 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -338,9 +338,12 @@ class DBImpl : public DB { uint64_t SlowdownAmount(int n, double bottom, double top); // MakeRoomForWrite will return superversion_to_free through an arugment, // which the caller needs to delete. We do it because caller can delete - // the superversion outside of mutex + // the superversion outside of mutex. + // old_log if not nullptr is the old log writer that should be safely + // closed whenever DB mutex is released. Status MakeRoomForWrite(bool force /* compact even if there is room? */, - SuperVersion** superversion_to_free); + SuperVersion** superversion_to_free, + log::Writer** old_log); void BuildBatchGroup(Writer** last_writer, autovector* write_batch_group);