From da96f2fe00d681d9bd953e30b21eb70991f12fd7 Mon Sep 17 00:00:00 2001 From: Yanqin Jin Date: Thu, 25 Apr 2019 10:04:57 -0700 Subject: [PATCH] Close WAL files before deletion (#5233) Summary: Currently one thread in RocksDB keeps a WAL file open while another thread deletes it. Although the first thread never writes to the WAL again, it still tries to close it in the end. This is fine on POSIX, but can be problematic on other platforms, e.g. HDFS, etc.. It will either cause a lot of warning messages or throw exceptions. The solution is to let the second thread close the WAL before deleting it. RocksDB keeps the writers of the logs to delete in `logs_to_free_`, which is passed to `job_context` during `FindObsoleteFiles` (holding mutex). Then in `PurgeObsoleteFiles` (without mutex), these writers should close the logs. Pull Request resolved: https://github.com/facebook/rocksdb/pull/5233 Differential Revision: D15032670 Pulled By: riversand963 fbshipit-source-id: c55e8a612db8cc2306644001a5e6d53842a8f754 --- HISTORY.md | 1 + db/db_impl.cc | 17 +++++++++-------- db/db_impl_files.cc | 5 +++++ db/log_writer.cc | 15 ++++++++++++++- db/log_writer.h | 2 ++ 5 files changed, 31 insertions(+), 9 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index a27971795..e2ec3265c 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -17,6 +17,7 @@ * Adjust WriteBufferManager's dummy entry size to block cache from 1MB to 256KB. * Fix a race condition between WritePrepared::Get and ::Put with duplicate keys. * Fix crash when memtable prefix bloom is enabled and read/write a key out of domain of prefix extractor. +* Close a WAL file before another thread deletes it. ## 6.1.1 (4/9/2019) ### New Features diff --git a/db/db_impl.cc b/db/db_impl.cc index 27ffac50f..c6268d0cb 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1209,7 +1209,15 @@ void DBImpl::BackgroundCallPurge() { // both queues are empty. This is stricter than what is needed, but can make // it easier for us to reason the correctness. while (!purge_queue_.empty() || !logs_to_free_queue_.empty()) { - if (!purge_queue_.empty()) { + // Check logs_to_free_queue_ first and close log writers. + if (!logs_to_free_queue_.empty()) { + assert(!logs_to_free_queue_.empty()); + log::Writer* log_writer = *(logs_to_free_queue_.begin()); + logs_to_free_queue_.pop_front(); + mutex_.Unlock(); + delete log_writer; + mutex_.Lock(); + } else { auto purge_file = purge_queue_.begin(); auto fname = purge_file->fname; auto dir_to_sync = purge_file->dir_to_sync; @@ -1221,13 +1229,6 @@ void DBImpl::BackgroundCallPurge() { mutex_.Unlock(); DeleteObsoleteFileImpl(job_id, fname, dir_to_sync, type, number); mutex_.Lock(); - } else { - assert(!logs_to_free_queue_.empty()); - log::Writer* log_writer = *(logs_to_free_queue_.begin()); - logs_to_free_queue_.pop_front(); - mutex_.Unlock(); - delete log_writer; - mutex_.Lock(); } } bg_purge_scheduled_--; diff --git a/db/db_impl_files.cc b/db/db_impl_files.cc index 90cc6a14b..b16cf8794 100644 --- a/db/db_impl_files.cc +++ b/db/db_impl_files.cc @@ -481,6 +481,11 @@ void DBImpl::PurgeObsoleteFiles(JobContext& state, bool schedule_only) { } #endif // !ROCKSDB_LITE + for (const auto w : state.logs_to_free) { + // TODO: maybe check the return value of Close. + w->Close(); + } + Status file_deletion_status; if (schedule_only) { InstrumentedMutexLock guard_lock(&mutex_); diff --git a/db/log_writer.cc b/db/log_writer.cc index bc99931b9..6ee391981 100644 --- a/db/log_writer.cc +++ b/db/log_writer.cc @@ -31,10 +31,23 @@ Writer::Writer(std::unique_ptr&& dest, uint64_t log_number, } } -Writer::~Writer() { WriteBuffer(); } +Writer::~Writer() { + if (dest_) { + WriteBuffer(); + } +} Status Writer::WriteBuffer() { return dest_->Flush(); } +Status Writer::Close() { + Status s; + if (dest_) { + s = dest_->Close(); + dest_.reset(); + } + return s; +} + Status Writer::AddRecord(const Slice& slice) { const char* ptr = slice.data(); size_t left = slice.size(); diff --git a/db/log_writer.h b/db/log_writer.h index 3638beb7e..116d03358 100644 --- a/db/log_writer.h +++ b/db/log_writer.h @@ -84,6 +84,8 @@ class Writer { Status WriteBuffer(); + Status Close(); + bool TEST_BufferIsEmpty(); private: