fix live WALs purged while file deletions disabled

Summary:
When calling `DisableFileDeletions` followed by `GetSortedWalFiles`, we guarantee the files returned by the latter call won't be deleted until after file deletions are re-enabled. However, `GetSortedWalFiles` didn't omit files already planned for deletion via `PurgeObsoleteFiles`, so the guarantee could be broken.

We fix it by making `GetSortedWalFiles` wait for the number of pending purges to hit zero if file deletions are disabled. This condition is eventually met since `PurgeObsoleteFiles` is guaranteed to be called for the existing pending purges, and new purges cannot be scheduled while file deletions are disabled. Once the condition is met, `GetSortedWalFiles` simply returns the content of DB and archive directories, which nobody can delete (except for deletion scheduler, for which I plan to fix this bug later) until deletions are re-enabled.
Closes https://github.com/facebook/rocksdb/pull/3341

Differential Revision: D6681131

Pulled By: ajkr

fbshipit-source-id: 90b1e2f2362ea9ef715623841c0826611a817634
main
Andrew Kryczka 7 years ago committed by Facebook Github Bot
parent 266d85fbec
commit 46e599fc6b
  1. 2
      HISTORY.md
  2. 13
      db/db_filesnapshot.cc
  3. 4
      db/db_impl.cc
  4. 8
      db/db_impl.h
  5. 1
      db/db_impl_compaction_flush.cc
  6. 19
      db/db_impl_files.cc
  7. 7
      db/db_options_test.cc
  8. 54
      db/db_test2.cc

@ -1,5 +1,7 @@
# Rocksdb Change Log # Rocksdb Change Log
## Unreleased ## Unreleased
### Bug Fixes
* Fix `DisableFileDeletions()` followed by `GetSortedWalFiles()` to not return obsolete WAL files that `PurgeObsoleteFiles()` is going to delete.
## 5.10.0 (12/11/2017) ## 5.10.0 (12/11/2017)
### Public API Change ### Public API Change

@ -57,6 +57,7 @@ Status DBImpl::EnableFileDeletions(bool force) {
ROCKS_LOG_INFO(immutable_db_options_.info_log, "File Deletions Enabled"); ROCKS_LOG_INFO(immutable_db_options_.info_log, "File Deletions Enabled");
should_purge_files = true; should_purge_files = true;
FindObsoleteFiles(&job_context, true); FindObsoleteFiles(&job_context, true);
bg_cv_.SignalAll();
} else { } else {
ROCKS_LOG_WARN( ROCKS_LOG_WARN(
immutable_db_options_.info_log, immutable_db_options_.info_log,
@ -141,6 +142,18 @@ Status DBImpl::GetLiveFiles(std::vector<std::string>& ret,
} }
Status DBImpl::GetSortedWalFiles(VectorLogPtr& files) { Status DBImpl::GetSortedWalFiles(VectorLogPtr& files) {
{
// If caller disabled deletions, this function should return files that are
// guaranteed not to be deleted until deletions are re-enabled. We need to
// wait for pending purges to finish since WalManager doesn't know which
// files are going to be purged. Additional purges won't be scheduled as
// long as deletions are disabled (so the below loop must terminate).
InstrumentedMutexLock l(&mutex_);
while (disable_delete_obsolete_files_ > 0 &&
pending_purge_obsolete_files_ > 0) {
bg_cv_.Wait();
}
}
return wal_manager_.GetSortedWalFiles(files); return wal_manager_.GetSortedWalFiles(files);
} }

@ -178,6 +178,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname,
num_running_flushes_(0), num_running_flushes_(0),
bg_purge_scheduled_(0), bg_purge_scheduled_(0),
disable_delete_obsolete_files_(0), disable_delete_obsolete_files_(0),
pending_purge_obsolete_files_(0),
delete_obsolete_files_last_run_(env_->NowMicros()), delete_obsolete_files_last_run_(env_->NowMicros()),
last_stats_dump_time_microsec_(0), last_stats_dump_time_microsec_(0),
next_job_id_(1), next_job_id_(1),
@ -293,7 +294,8 @@ Status DBImpl::CloseImpl() {
// Wait for background work to finish // Wait for background work to finish
while (bg_bottom_compaction_scheduled_ || bg_compaction_scheduled_ || while (bg_bottom_compaction_scheduled_ || bg_compaction_scheduled_ ||
bg_flush_scheduled_ || bg_purge_scheduled_) { bg_flush_scheduled_ || bg_purge_scheduled_ ||
pending_purge_obsolete_files_) {
TEST_SYNC_POINT("DBImpl::~DBImpl:WaitJob"); TEST_SYNC_POINT("DBImpl::~DBImpl:WaitJob");
bg_cv_.Wait(); bg_cv_.Wait();
} }

@ -468,6 +468,8 @@ class DBImpl : public DB {
// belong to live files are posibly removed. Also, removes all the // belong to live files are posibly removed. Also, removes all the
// files in sst_delete_files and log_delete_files. // files in sst_delete_files and log_delete_files.
// It is not necessary to hold the mutex when invoking this method. // It is not necessary to hold the mutex when invoking this method.
// If FindObsoleteFiles() was run, we need to also run
// PurgeObsoleteFiles(), even if disable_delete_obsolete_files_ is true
void PurgeObsoleteFiles(const JobContext& background_contet, void PurgeObsoleteFiles(const JobContext& background_contet,
bool schedule_only = false); bool schedule_only = false);
@ -947,6 +949,8 @@ class DBImpl : public DB {
// (i.e. whenever a flush is done, even if it didn't make any progress) // (i.e. whenever a flush is done, even if it didn't make any progress)
// * whenever there is an error in background purge, flush or compaction // * whenever there is an error in background purge, flush or compaction
// * whenever num_running_ingest_file_ goes to 0. // * whenever num_running_ingest_file_ goes to 0.
// * whenever pending_purge_obsolete_files_ goes to 0.
// * whenever disable_delete_obsolete_files_ goes to 0.
InstrumentedCondVar bg_cv_; InstrumentedCondVar bg_cv_;
// Writes are protected by locking both mutex_ and log_write_mutex_, and reads // Writes are protected by locking both mutex_ and log_write_mutex_, and reads
// must be under either mutex_ or log_write_mutex_. Since after ::Open, // must be under either mutex_ or log_write_mutex_. Since after ::Open,
@ -1213,6 +1217,10 @@ class DBImpl : public DB {
// without any synchronization // without any synchronization
int disable_delete_obsolete_files_; int disable_delete_obsolete_files_;
// Number of times FindObsoleteFiles has found deletable files and the
// corresponding call to PurgeObsoleteFiles has not yet finished.
int pending_purge_obsolete_files_;
// last time when DeleteObsoleteFiles with full scan was executed. Originaly // last time when DeleteObsoleteFiles with full scan was executed. Originaly
// initialized with startup time. // initialized with startup time.
uint64_t delete_obsolete_files_last_run_; uint64_t delete_obsolete_files_last_run_;

@ -1308,6 +1308,7 @@ void DBImpl::BackgroundCallFlush() {
if (job_context.HaveSomethingToClean() || if (job_context.HaveSomethingToClean() ||
job_context.HaveSomethingToDelete() || !log_buffer.IsEmpty()) { job_context.HaveSomethingToDelete() || !log_buffer.IsEmpty()) {
mutex_.Unlock(); mutex_.Unlock();
TEST_SYNC_POINT("DBImpl::BackgroundCallFlush:FilesFound");
// Have to flush the info logs before bg_flush_scheduled_-- // Have to flush the info logs before bg_flush_scheduled_--
// because if bg_flush_scheduled_ becomes 0 and the lock is // because if bg_flush_scheduled_ becomes 0 and the lock is
// released, the deconstructor of DB can kick in and destroy all the // released, the deconstructor of DB can kick in and destroy all the

@ -285,6 +285,9 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
job_context->logs_to_free = logs_to_free_; job_context->logs_to_free = logs_to_free_;
job_context->log_recycle_files.assign(log_recycle_files_.begin(), job_context->log_recycle_files.assign(log_recycle_files_.begin(),
log_recycle_files_.end()); log_recycle_files_.end());
if (job_context->HaveSomethingToDelete()) {
++pending_purge_obsolete_files_;
}
logs_to_free_.clear(); logs_to_free_.clear();
} }
@ -342,15 +345,12 @@ void DBImpl::DeleteObsoleteFileImpl(int job_id, const std::string& fname,
// files in sst_delete_files and log_delete_files. // files in sst_delete_files and log_delete_files.
// It is not necessary to hold the mutex when invoking this method. // It is not necessary to hold the mutex when invoking this method.
void DBImpl::PurgeObsoleteFiles(const JobContext& state, bool schedule_only) { void DBImpl::PurgeObsoleteFiles(const JobContext& state, bool schedule_only) {
TEST_SYNC_POINT("DBImpl::PurgeObsoleteFiles:Begin");
// we'd better have sth to delete // we'd better have sth to delete
assert(state.HaveSomethingToDelete()); assert(state.HaveSomethingToDelete());
// this checks if FindObsoleteFiles() was run before. If not, don't do // FindObsoleteFiles() should've populated this so nonzero
// PurgeObsoleteFiles(). If FindObsoleteFiles() was run, we need to also assert(state.manifest_file_number != 0);
// run PurgeObsoleteFiles(), even if disable_delete_obsolete_files_ is true
if (state.manifest_file_number == 0) {
return;
}
// Now, convert live list to an unordered map, WITHOUT mutex held; // Now, convert live list to an unordered map, WITHOUT mutex held;
// set is slow. // set is slow.
@ -533,6 +533,13 @@ void DBImpl::PurgeObsoleteFiles(const JobContext& state, bool schedule_only) {
wal_manager_.PurgeObsoleteWALFiles(); wal_manager_.PurgeObsoleteWALFiles();
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE
LogFlush(immutable_db_options_.info_log); LogFlush(immutable_db_options_.info_log);
InstrumentedMutexLock l(&mutex_);
--pending_purge_obsolete_files_;
assert(pending_purge_obsolete_files_ >= 0);
if (pending_purge_obsolete_files_ == 0) {
bg_cv_.SignalAll();
}
TEST_SYNC_POINT("DBImpl::PurgeObsoleteFiles:End");
} }
void DBImpl::DeleteObsoleteFiles() { void DBImpl::DeleteObsoleteFiles() {

@ -518,8 +518,13 @@ static void assert_candidate_files_empty(DBImpl* dbfull, const bool empty) {
JobContext job_context(0); JobContext job_context(0);
dbfull->FindObsoleteFiles(&job_context, false); dbfull->FindObsoleteFiles(&job_context, false);
ASSERT_EQ(empty, job_context.full_scan_candidate_files.empty()); ASSERT_EQ(empty, job_context.full_scan_candidate_files.empty());
job_context.Clean();
dbfull->TEST_UnlockMutex(); dbfull->TEST_UnlockMutex();
if (job_context.HaveSomethingToDelete()) {
// fulfill the contract of FindObsoleteFiles by calling PurgeObsoleteFiles
// afterwards; otherwise the test may hang on shutdown
dbfull->PurgeObsoleteFiles(job_context);
}
job_context.Clean();
} }
TEST_F(DBOptionsTest, DeleteObsoleteFilesPeriodChange) { TEST_F(DBOptionsTest, DeleteObsoleteFilesPeriodChange) {

@ -2430,6 +2430,60 @@ TEST_F(DBTest2, ReadCallbackTest) {
} }
} }
#ifndef ROCKSDB_LITE
TEST_F(DBTest2, LiveFilesOmitObsoleteFiles) {
// Regression test for race condition where an obsolete file is returned to
// user as a "live file" but then deleted, all while file deletions are
// disabled.
//
// It happened like this:
//
// 1. [flush thread] Log file "x.log" found by FindObsoleteFiles
// 2. [user thread] DisableFileDeletions, GetSortedWalFiles are called and the
// latter returned "x.log"
// 3. [flush thread] PurgeObsoleteFiles deleted "x.log"
// 4. [user thread] Reading "x.log" failed
//
// Unfortunately the only regression test I can come up with involves sleep.
// We cannot set SyncPoints to repro since, once the fix is applied, the
// SyncPoints would cause a deadlock as the repro's sequence of events is now
// prohibited.
//
// Instead, if we sleep for a second between Find and Purge, and ensure the
// read attempt happens after purge, then the sequence of events will almost
// certainly happen on the old code.
rocksdb::SyncPoint::GetInstance()->LoadDependency({
{"DBImpl::BackgroundCallFlush:FilesFound",
"DBTest2::LiveFilesOmitObsoleteFiles:FlushTriggered"},
{"DBImpl::PurgeObsoleteFiles:End",
"DBTest2::LiveFilesOmitObsoleteFiles:LiveFilesCaptured"},
});
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::PurgeObsoleteFiles:Begin",
[&](void* arg) { env_->SleepForMicroseconds(1000000); });
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
Put("key", "val");
FlushOptions flush_opts;
flush_opts.wait = false;
db_->Flush(flush_opts);
TEST_SYNC_POINT("DBTest2::LiveFilesOmitObsoleteFiles:FlushTriggered");
db_->DisableFileDeletions();
VectorLogPtr log_files;
db_->GetSortedWalFiles(log_files);
TEST_SYNC_POINT("DBTest2::LiveFilesOmitObsoleteFiles:LiveFilesCaptured");
for (const auto& log_file : log_files) {
ASSERT_OK(env_->FileExists(LogFileName(dbname_, log_file->LogNumber())));
}
db_->EnableFileDeletions();
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
}
#endif // ROCKSDB_LITE
} // namespace rocksdb } // namespace rocksdb
int main(int argc, char** argv) { int main(int argc, char** argv) {

Loading…
Cancel
Save