Close WAL files before deletion (#5233)

Summary:
Currently one thread in RocksDB keeps a WAL file open while another thread
deletes it. Although the first thread never writes to the WAL again, it still
tries to close it in the end. This is fine on POSIX, but can be problematic on
other platforms (e.g. HDFS), where it will either cause a lot of warning messages or
throw exceptions. The solution is to let the second thread close the WAL before deleting it.

RocksDB keeps the writers of the logs to be deleted in `logs_to_free_`, which is passed to `job_context` during `FindObsoleteFiles` (with the mutex held). Then in `PurgeObsoleteFiles` (without the mutex), these writers close the logs before the files are deleted.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5233

Differential Revision: D15032670

Pulled By: riversand963

fbshipit-source-id: c55e8a612db8cc2306644001a5e6d53842a8f754
main
Yanqin Jin 6 years ago committed by Facebook Github Bot
parent 66d8360beb
commit da96f2fe00
  1. 1
      HISTORY.md
  2. 17
      db/db_impl.cc
  3. 5
      db/db_impl_files.cc
  4. 15
      db/log_writer.cc
  5. 2
      db/log_writer.h

@ -17,6 +17,7 @@
* Adjust WriteBufferManager's dummy entry size to block cache from 1MB to 256KB. * Adjust WriteBufferManager's dummy entry size to block cache from 1MB to 256KB.
* Fix a race condition between WritePrepared::Get and ::Put with duplicate keys. * Fix a race condition between WritePrepared::Get and ::Put with duplicate keys.
* Fix crash when memtable prefix bloom is enabled and read/write a key out of domain of prefix extractor. * Fix crash when memtable prefix bloom is enabled and read/write a key out of domain of prefix extractor.
* Close a WAL file before another thread deletes it.
## 6.1.1 (4/9/2019) ## 6.1.1 (4/9/2019)
### New Features ### New Features

@ -1209,7 +1209,15 @@ void DBImpl::BackgroundCallPurge() {
// both queues are empty. This is stricter than what is needed, but can make // both queues are empty. This is stricter than what is needed, but can make
// it easier for us to reason the correctness. // it easier for us to reason the correctness.
while (!purge_queue_.empty() || !logs_to_free_queue_.empty()) { while (!purge_queue_.empty() || !logs_to_free_queue_.empty()) {
if (!purge_queue_.empty()) { // Check logs_to_free_queue_ first and close log writers.
if (!logs_to_free_queue_.empty()) {
assert(!logs_to_free_queue_.empty());
log::Writer* log_writer = *(logs_to_free_queue_.begin());
logs_to_free_queue_.pop_front();
mutex_.Unlock();
delete log_writer;
mutex_.Lock();
} else {
auto purge_file = purge_queue_.begin(); auto purge_file = purge_queue_.begin();
auto fname = purge_file->fname; auto fname = purge_file->fname;
auto dir_to_sync = purge_file->dir_to_sync; auto dir_to_sync = purge_file->dir_to_sync;
@ -1221,13 +1229,6 @@ void DBImpl::BackgroundCallPurge() {
mutex_.Unlock(); mutex_.Unlock();
DeleteObsoleteFileImpl(job_id, fname, dir_to_sync, type, number); DeleteObsoleteFileImpl(job_id, fname, dir_to_sync, type, number);
mutex_.Lock(); mutex_.Lock();
} else {
assert(!logs_to_free_queue_.empty());
log::Writer* log_writer = *(logs_to_free_queue_.begin());
logs_to_free_queue_.pop_front();
mutex_.Unlock();
delete log_writer;
mutex_.Lock();
} }
} }
bg_purge_scheduled_--; bg_purge_scheduled_--;

@ -481,6 +481,11 @@ void DBImpl::PurgeObsoleteFiles(JobContext& state, bool schedule_only) {
} }
#endif // !ROCKSDB_LITE #endif // !ROCKSDB_LITE
for (const auto w : state.logs_to_free) {
// TODO: maybe check the return value of Close.
w->Close();
}
Status file_deletion_status; Status file_deletion_status;
if (schedule_only) { if (schedule_only) {
InstrumentedMutexLock guard_lock(&mutex_); InstrumentedMutexLock guard_lock(&mutex_);

@ -31,10 +31,23 @@ Writer::Writer(std::unique_ptr<WritableFileWriter>&& dest, uint64_t log_number,
} }
} }
Writer::~Writer() { WriteBuffer(); } Writer::~Writer() {
if (dest_) {
WriteBuffer();
}
}
Status Writer::WriteBuffer() { return dest_->Flush(); } Status Writer::WriteBuffer() { return dest_->Flush(); }
// Closes the destination file writer if one is still held, then releases it.
// Returns the status reported by the destination's Close(), or a
// default-constructed Status when there is nothing left to close. Because
// dest_ is reset afterwards, a repeated call takes the early-return path.
Status Writer::Close() {
  if (!dest_) {
    return Status();
  }
  Status close_status = dest_->Close();
  dest_.reset();
  return close_status;
}
Status Writer::AddRecord(const Slice& slice) { Status Writer::AddRecord(const Slice& slice) {
const char* ptr = slice.data(); const char* ptr = slice.data();
size_t left = slice.size(); size_t left = slice.size();

@ -84,6 +84,8 @@ class Writer {
Status WriteBuffer(); Status WriteBuffer();
Status Close();
bool TEST_BufferIsEmpty(); bool TEST_BufferIsEmpty();
private: private:

Loading…
Cancel
Save