diff --git a/HISTORY.md b/HISTORY.md index a27971795..e2ec3265c 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -17,6 +17,7 @@ * Adjust WriteBufferManager's dummy entry size to block cache from 1MB to 256KB. * Fix a race condition between WritePrepared::Get and ::Put with duplicate keys. * Fix crash when memtable prefix bloom is enabled and read/write a key out of domain of prefix extractor. +* Close a WAL file before another thread deletes it. ## 6.1.1 (4/9/2019) ### New Features diff --git a/db/db_impl.cc b/db/db_impl.cc index 27ffac50f..c6268d0cb 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1209,7 +1209,15 @@ void DBImpl::BackgroundCallPurge() { // both queues are empty. This is stricter than what is needed, but can make // it easier for us to reason the correctness. while (!purge_queue_.empty() || !logs_to_free_queue_.empty()) { - if (!purge_queue_.empty()) { + // Check logs_to_free_queue_ first and close log writers. + if (!logs_to_free_queue_.empty()) { + assert(!logs_to_free_queue_.empty()); + log::Writer* log_writer = *(logs_to_free_queue_.begin()); + logs_to_free_queue_.pop_front(); + mutex_.Unlock(); + delete log_writer; + mutex_.Lock(); + } else { auto purge_file = purge_queue_.begin(); auto fname = purge_file->fname; auto dir_to_sync = purge_file->dir_to_sync; @@ -1221,13 +1229,6 @@ void DBImpl::BackgroundCallPurge() { mutex_.Unlock(); DeleteObsoleteFileImpl(job_id, fname, dir_to_sync, type, number); mutex_.Lock(); - } else { - assert(!logs_to_free_queue_.empty()); - log::Writer* log_writer = *(logs_to_free_queue_.begin()); - logs_to_free_queue_.pop_front(); - mutex_.Unlock(); - delete log_writer; - mutex_.Lock(); } } bg_purge_scheduled_--; diff --git a/db/db_impl_files.cc b/db/db_impl_files.cc index 90cc6a14b..b16cf8794 100644 --- a/db/db_impl_files.cc +++ b/db/db_impl_files.cc @@ -481,6 +481,11 @@ void DBImpl::PurgeObsoleteFiles(JobContext& state, bool schedule_only) { } #endif // !ROCKSDB_LITE + for (const auto w : state.logs_to_free) { + // TODO: maybe check the return value of Close. + w->Close(); + } + Status file_deletion_status; if (schedule_only) { InstrumentedMutexLock guard_lock(&mutex_); diff --git a/db/log_writer.cc b/db/log_writer.cc index bc99931b9..6ee391981 100644 --- a/db/log_writer.cc +++ b/db/log_writer.cc @@ -31,10 +31,23 @@ Writer::Writer(std::unique_ptr&& dest, uint64_t log_number, } } -Writer::~Writer() { WriteBuffer(); } +Writer::~Writer() { + if (dest_) { + WriteBuffer(); + } +} Status Writer::WriteBuffer() { return dest_->Flush(); } +Status Writer::Close() { + Status s; + if (dest_) { + s = dest_->Close(); + dest_.reset(); + } + return s; +} + Status Writer::AddRecord(const Slice& slice) { const char* ptr = slice.data(); size_t left = slice.size(); diff --git a/db/log_writer.h b/db/log_writer.h index 3638beb7e..116d03358 100644 --- a/db/log_writer.h +++ b/db/log_writer.h @@ -84,6 +84,8 @@ class Writer { Status WriteBuffer(); + Status Close(); + bool TEST_BufferIsEmpty(); private: