From a7e80379b0a8547e5d4136b12bef53b38c6f1145 Mon Sep 17 00:00:00 2001 From: Igor Canadi Date: Tue, 15 Sep 2015 11:28:44 -0700 Subject: [PATCH] LogAndApply() should fail if the column family has been dropped Summary: This patch finally fixes the ColumnFamilyTest.ReadDroppedColumnFamily test. The test has been failing very sporadically and it was hard to repro. However, I managed to write a new tests that reproes the failure deterministically. Here's what happens: 1. We start the flush for the column family 2. We check if the column family was dropped here: https://github.com/facebook/rocksdb/blob/a3fc49bfddcdb1ff29409aacd06c04df56c7a1d7/db/flush_job.cc#L149 3. This check goes through, ends up in InstallMemtableFlushResults() and it goes into LogAndApply() 4. At about this time, we start dropping the column family. Dropping the column family process gets to LogAndApply() at about the same time as LogAndApply() from flush process 5. Drop column family goes through LogAndApply() first, marking the column family as dropped. 6. Flush process gets woken up and gets a chance to write to the MANIFEST. However, this is where it gets stuck: https://github.com/facebook/rocksdb/blob/a3fc49bfddcdb1ff29409aacd06c04df56c7a1d7/db/version_set.cc#L1975 7. We see that the column family was dropped, so there is no need to write to the MANIFEST. We return OK. 8. Flush gets OK back from LogAndApply() and it deletes the memtable, thinking that the data is now safely persisted to sst file. The fix is pretty simple. Instead of OK, we return ShutdownInProgress. This is not really true, but we have been using this status code to also mean "this operation was canceled because the column family has been dropped". The fix is only one LOC. All other code is related to tests. I added a new test that reproes the failure. I also moved SleepingBackgroundTask to util/testutil.h (because I needed it in column_family_test for my new test). There's plenty of other places where we reimplement SleepingBackgroundTask, but I'll address that in a separate commit. Test Plan: 1. new test 2. make check 3. Make sure the ColumnFamilyTest.ReadDroppedColumnFamily doesn't fail on Travis: https://travis-ci.org/facebook/rocksdb/jobs/79952386 Reviewers: yhchiang, anthony, IslamAbdelRahman, kradhakrishnan, rven, sdong Reviewed By: sdong Subscribers: dhruba, leveldb Differential Revision: https://reviews.facebook.net/D46773 --- db/column_family_test.cc | 63 ++++++++++++++++++++++++ db/db_test.cc | 100 +++++++++++---------------------------- db/flush_job.cc | 1 + db/version_set.cc | 8 +++- util/testutil.h | 46 ++++++++++++++++++ 5 files changed, 145 insertions(+), 73 deletions(-) diff --git a/db/column_family_test.cc b/db/column_family_test.cc index 9dc6c3c89..938c4121a 100644 --- a/db/column_family_test.cc +++ b/db/column_family_test.cc @@ -10,6 +10,7 @@ #include #include #include +#include #include "db/db_impl.h" #include "rocksdb/db.h" @@ -19,6 +20,7 @@ #include "util/testharness.h" #include "util/testutil.h" #include "util/coding.h" +#include "util/sync_point.h" #include "utilities/merge_operators.h" namespace rocksdb { @@ -1196,6 +1198,67 @@ TEST_F(ColumnFamilyTest, ReadDroppedColumnFamily) { } } +TEST_F(ColumnFamilyTest, FlushAndDropRaceCondition) { + db_options_.create_missing_column_families = true; + Open({"default", "one"}); + ColumnFamilyOptions options; + options.level0_file_num_compaction_trigger = 100; + options.level0_slowdown_writes_trigger = 200; + options.level0_stop_writes_trigger = 200; + options.max_write_buffer_number = 20; + options.write_buffer_size = 100000; // small write buffer size + Reopen({options, options}); + + rocksdb::SyncPoint::GetInstance()->LoadDependency( + {{"VersionSet::LogAndApply::ColumnFamilyDrop:1" + "FlushJob::InstallResults"}, + {"FlushJob::InstallResults", + "VersionSet::LogAndApply::ColumnFamilyDrop:2", }}); + + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + test::SleepingBackgroundTask sleeping_task; + + env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task, + Env::Priority::HIGH); + + // 1MB should create ~10 files for each CF + int kKeysNum = 10000; + PutRandomData(1, kKeysNum, 100); + + std::vector threads; + threads.emplace_back([&] { ASSERT_OK(db_->DropColumnFamily(handles_[1])); }); + + sleeping_task.WakeUp(); + sleeping_task.WaitUntilDone(); + sleeping_task.Reset(); + // now we sleep again. this is just so we're certain that flush job finished + env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task, + Env::Priority::HIGH); + sleeping_task.WakeUp(); + sleeping_task.WaitUntilDone(); + + { + // Since we didn't delete CF handle, RocksDB's contract guarantees that + // we're still able to read dropped CF + std::unique_ptr iterator( + db_->NewIterator(ReadOptions(), handles_[1])); + int count = 0; + for (iterator->SeekToFirst(); iterator->Valid(); iterator->Next()) { + ASSERT_OK(iterator->status()); + ++count; + } + ASSERT_OK(iterator->status()); + ASSERT_EQ(count, kKeysNum); + } + for (auto& t : threads) { + t.join(); + } + + Close(); + Destroy(); + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); +} + } // namespace rocksdb int main(int argc, char** argv) { diff --git a/db/db_test.cc b/db/db_test.cc index 7b45fd8e1..d13f8b145 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -2219,61 +2219,16 @@ TEST_F(DBTest, NumImmutableMemTable) { } while (ChangeCompactOptions()); } -class SleepingBackgroundTask { - public: - SleepingBackgroundTask() - : bg_cv_(&mutex_), should_sleep_(true), done_with_sleep_(false) {} - void DoSleep() { - MutexLock l(&mutex_); - while (should_sleep_) { - bg_cv_.Wait(); - } - done_with_sleep_ = true; - bg_cv_.SignalAll(); - } - void WakeUp() { - MutexLock l(&mutex_); - should_sleep_ = false; - bg_cv_.SignalAll(); - } - void WaitUntilDone() { - MutexLock l(&mutex_); - while (!done_with_sleep_) { - bg_cv_.Wait(); - } - } - bool WokenUp() { - MutexLock l(&mutex_); - return should_sleep_ == false; - } - - void Reset() { - MutexLock l(&mutex_); - should_sleep_ = true; - done_with_sleep_ = false; - } - - static void DoSleepTask(void* arg) { - reinterpret_cast(arg)->DoSleep(); - } - - private: - port::Mutex mutex_; - port::CondVar bg_cv_; // Signalled when background work finishes - bool should_sleep_; - bool done_with_sleep_; -}; - TEST_F(DBTest, FlushEmptyColumnFamily) { // Block flush thread and disable compaction thread env_->SetBackgroundThreads(1, Env::HIGH); env_->SetBackgroundThreads(1, Env::LOW); - SleepingBackgroundTask sleeping_task_low; - env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low, + test::SleepingBackgroundTask sleeping_task_low; + env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low, Env::Priority::LOW); - SleepingBackgroundTask sleeping_task_high; - env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_high, - Env::Priority::HIGH); + test::SleepingBackgroundTask sleeping_task_high; + env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, + &sleeping_task_high, Env::Priority::HIGH); Options options = CurrentOptions(); // disable compaction @@ -2312,12 +2267,12 @@ TEST_F(DBTest, GetProperty) { // Set sizes to both background thread pool to be 1 and block them. env_->SetBackgroundThreads(1, Env::HIGH); env_->SetBackgroundThreads(1, Env::LOW); - SleepingBackgroundTask sleeping_task_low; - env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low, + test::SleepingBackgroundTask sleeping_task_low; + env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low, Env::Priority::LOW); - SleepingBackgroundTask sleeping_task_high; - env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_high, - Env::Priority::HIGH); + test::SleepingBackgroundTask sleeping_task_high; + env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, + &sleeping_task_high, Env::Priority::HIGH); Options options = CurrentOptions(); WriteOptions writeOpt = WriteOptions(); @@ -2566,8 +2521,8 @@ TEST_F(DBTest, EstimatePendingCompBytes) { // Set sizes to both background thread pool to be 1 and block them. env_->SetBackgroundThreads(1, Env::HIGH); env_->SetBackgroundThreads(1, Env::LOW); - SleepingBackgroundTask sleeping_task_low; - env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low, + test::SleepingBackgroundTask sleeping_task_low; + env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low, Env::Priority::LOW); Options options = CurrentOptions(); @@ -6181,7 +6136,7 @@ TEST_F(DBTest, TableOptionsSanitizeTest) { TEST_F(DBTest, SanitizeNumThreads) { for (int attempt = 0; attempt < 2; attempt++) { const size_t kTotalTasks = 8; - SleepingBackgroundTask sleeping_tasks[kTotalTasks]; + test::SleepingBackgroundTask sleeping_tasks[kTotalTasks]; Options options = CurrentOptions(); if (attempt == 0) { @@ -6193,7 +6148,8 @@ TEST_F(DBTest, SanitizeNumThreads) { for (size_t i = 0; i < kTotalTasks; i++) { // Insert 5 tasks to low priority queue and 5 tasks to high priority queue - env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_tasks[i], + env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, + &sleeping_tasks[i], (i < 4) ? Env::Priority::LOW : Env::Priority::HIGH); } @@ -6485,8 +6441,8 @@ TEST_F(DBTest, DynamicMemtableOptions) { // max_background_flushes == 0, so flushes are getting executed by the // compaction thread env_->SetBackgroundThreads(1, Env::LOW); - SleepingBackgroundTask sleeping_task_low; - env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low, + test::SleepingBackgroundTask sleeping_task_low; + env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low, Env::Priority::LOW); // Start from scratch and disable compaction/flush. Flush can only happen // during compaction but trigger is pretty high @@ -6521,7 +6477,7 @@ TEST_F(DBTest, DynamicMemtableOptions) { dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr); sleeping_task_low.Reset(); - env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low, + env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low, Env::Priority::LOW); count = 0; while (!sleeping_task_low.WokenUp() && count < 1024) { @@ -6544,7 +6500,7 @@ TEST_F(DBTest, DynamicMemtableOptions) { dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr); sleeping_task_low.Reset(); - env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low, + env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low, Env::Priority::LOW); count = 0; @@ -7351,8 +7307,8 @@ TEST_F(DBTest, DynamicCompactionOptions) { // since level0_stop_writes_trigger = 8 dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr); // Block compaction - SleepingBackgroundTask sleeping_task_low; - env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low, + test::SleepingBackgroundTask sleeping_task_low; + env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low, Env::Priority::LOW); rocksdb::SyncPoint::GetInstance()->SetCallBack( @@ -7388,7 +7344,7 @@ TEST_F(DBTest, DynamicCompactionOptions) { // Block compaction again sleeping_task_low.Reset(); - env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low, + env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low, Env::Priority::LOW); count = 0; while (count < 64) { @@ -7826,7 +7782,7 @@ TEST_F(DBTest, DeleteObsoleteFilesPendingOutputs) { dbfull()->TEST_WaitForCompact(); ASSERT_EQ("0,0,1", FilesPerLevel(0)); - SleepingBackgroundTask blocking_thread; + test::SleepingBackgroundTask blocking_thread; port::Mutex mutex_; bool already_blocked(false); @@ -7893,12 +7849,12 @@ TEST_F(DBTest, CloseSpeedup) { // Block background threads env_->SetBackgroundThreads(1, Env::LOW); env_->SetBackgroundThreads(1, Env::HIGH); - SleepingBackgroundTask sleeping_task_low; - env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low, + test::SleepingBackgroundTask sleeping_task_low; + env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low, Env::Priority::LOW); - SleepingBackgroundTask sleeping_task_high; - env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_high, - Env::Priority::HIGH); + test::SleepingBackgroundTask sleeping_task_high; + env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, + &sleeping_task_high, Env::Priority::HIGH); std::vector filenames; env_->GetChildren(dbname_, &filenames); diff --git a/db/flush_job.cc b/db/flush_job.cc index 936c375f6..e42e3c0e4 100644 --- a/db/flush_job.cc +++ b/db/flush_job.cc @@ -154,6 +154,7 @@ Status FlushJob::Run(FileMetaData* file_meta) { if (!s.ok()) { cfd_->imm()->RollbackMemtableFlush(mems, meta.fd.GetNumber()); } else { + TEST_SYNC_POINT("FlushJob::InstallResults"); // Replace immutable memtable with the generated Table s = cfd_->imm()->InstallMemtableFlushResults( cfd_, mutable_cf_options_, mems, versions_, db_mutex_, diff --git a/db/version_set.cc b/db/version_set.cc index cedaa3e29..fd3105539 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -1979,7 +1979,8 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data, if (!manifest_writers_.empty()) { manifest_writers_.front()->cv.Signal(); } - return Status::OK(); + // we steal this code to also inform about cf-drop + return Status::ShutdownInProgress(); } std::vector batch_edits; @@ -2141,6 +2142,11 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data, new_manifest_file_size = descriptor_log_->file()->GetFileSize(); } + if (edit->is_column_family_drop_) { + TEST_SYNC_POINT("VersionSet::LogAndApply::ColumnFamilyDrop:1"); + TEST_SYNC_POINT("VersionSet::LogAndApply::ColumnFamilyDrop:2"); + } + LogFlush(db_options_->info_log); mu->Lock(); } diff --git a/util/testutil.h b/util/testutil.h index 990a3ba81..9e452fe8a 100644 --- a/util/testutil.h +++ b/util/testutil.h @@ -16,6 +16,7 @@ #include "rocksdb/env.h" #include "rocksdb/iterator.h" #include "rocksdb/slice.h" +#include "util/mutexlock.h" #include "util/random.h" namespace rocksdb { @@ -276,5 +277,50 @@ class NullLogger : public Logger { // Corrupts key by changing the type extern void CorruptKeyType(InternalKey* ikey); +class SleepingBackgroundTask { + public: + SleepingBackgroundTask() + : bg_cv_(&mutex_), should_sleep_(true), done_with_sleep_(false) {} + void DoSleep() { + MutexLock l(&mutex_); + while (should_sleep_) { + bg_cv_.Wait(); + } + done_with_sleep_ = true; + bg_cv_.SignalAll(); + } + void WakeUp() { + MutexLock l(&mutex_); + should_sleep_ = false; + bg_cv_.SignalAll(); + } + void WaitUntilDone() { + MutexLock l(&mutex_); + while (!done_with_sleep_) { + bg_cv_.Wait(); + } + } + bool WokenUp() { + MutexLock l(&mutex_); + return should_sleep_ == false; + } + + void Reset() { + MutexLock l(&mutex_); + should_sleep_ = true; + done_with_sleep_ = false; + } + + static void DoSleepTask(void* arg) { + reinterpret_cast(arg)->DoSleep(); + } + + private: + port::Mutex mutex_; + port::CondVar bg_cv_; // Signalled when background work finishes + bool should_sleep_; + bool done_with_sleep_; +}; + } // namespace test } // namespace rocksdb