diff --git a/db/column_family_test.cc b/db/column_family_test.cc index 9dc6c3c89..938c4121a 100644 --- a/db/column_family_test.cc +++ b/db/column_family_test.cc @@ -10,6 +10,7 @@ #include #include #include +#include #include "db/db_impl.h" #include "rocksdb/db.h" @@ -19,6 +20,7 @@ #include "util/testharness.h" #include "util/testutil.h" #include "util/coding.h" +#include "util/sync_point.h" #include "utilities/merge_operators.h" namespace rocksdb { @@ -1196,6 +1198,67 @@ TEST_F(ColumnFamilyTest, ReadDroppedColumnFamily) { } } +TEST_F(ColumnFamilyTest, FlushAndDropRaceCondition) { + db_options_.create_missing_column_families = true; + Open({"default", "one"}); + ColumnFamilyOptions options; + options.level0_file_num_compaction_trigger = 100; + options.level0_slowdown_writes_trigger = 200; + options.level0_stop_writes_trigger = 200; + options.max_write_buffer_number = 20; + options.write_buffer_size = 100000; // small write buffer size + Reopen({options, options}); + + rocksdb::SyncPoint::GetInstance()->LoadDependency( + {{"VersionSet::LogAndApply::ColumnFamilyDrop:1" + "FlushJob::InstallResults"}, + {"FlushJob::InstallResults", + "VersionSet::LogAndApply::ColumnFamilyDrop:2", }}); + + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + test::SleepingBackgroundTask sleeping_task; + + env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task, + Env::Priority::HIGH); + + // 1MB should create ~10 files for each CF + int kKeysNum = 10000; + PutRandomData(1, kKeysNum, 100); + + std::vector threads; + threads.emplace_back([&] { ASSERT_OK(db_->DropColumnFamily(handles_[1])); }); + + sleeping_task.WakeUp(); + sleeping_task.WaitUntilDone(); + sleeping_task.Reset(); + // now we sleep again. this is just so we're certain that flush job finished + env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task, + Env::Priority::HIGH); + sleeping_task.WakeUp(); + sleeping_task.WaitUntilDone(); + + { + // Since we didn't delete CF handle, RocksDB's contract guarantees that + // we're still able to read dropped CF + std::unique_ptr iterator( + db_->NewIterator(ReadOptions(), handles_[1])); + int count = 0; + for (iterator->SeekToFirst(); iterator->Valid(); iterator->Next()) { + ASSERT_OK(iterator->status()); + ++count; + } + ASSERT_OK(iterator->status()); + ASSERT_EQ(count, kKeysNum); + } + for (auto& t : threads) { + t.join(); + } + + Close(); + Destroy(); + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); +} + } // namespace rocksdb int main(int argc, char** argv) { diff --git a/db/db_test.cc b/db/db_test.cc index 7b45fd8e1..d13f8b145 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -2219,61 +2219,16 @@ TEST_F(DBTest, NumImmutableMemTable) { } while (ChangeCompactOptions()); } -class SleepingBackgroundTask { - public: - SleepingBackgroundTask() - : bg_cv_(&mutex_), should_sleep_(true), done_with_sleep_(false) {} - void DoSleep() { - MutexLock l(&mutex_); - while (should_sleep_) { - bg_cv_.Wait(); - } - done_with_sleep_ = true; - bg_cv_.SignalAll(); - } - void WakeUp() { - MutexLock l(&mutex_); - should_sleep_ = false; - bg_cv_.SignalAll(); - } - void WaitUntilDone() { - MutexLock l(&mutex_); - while (!done_with_sleep_) { - bg_cv_.Wait(); - } - } - bool WokenUp() { - MutexLock l(&mutex_); - return should_sleep_ == false; - } - - void Reset() { - MutexLock l(&mutex_); - should_sleep_ = true; - done_with_sleep_ = false; - } - - static void DoSleepTask(void* arg) { - reinterpret_cast(arg)->DoSleep(); - } - - private: - port::Mutex mutex_; - port::CondVar bg_cv_; // Signalled when background work finishes - bool should_sleep_; - bool done_with_sleep_; -}; - TEST_F(DBTest, FlushEmptyColumnFamily) { // Block flush thread and disable compaction thread env_->SetBackgroundThreads(1, Env::HIGH); env_->SetBackgroundThreads(1, Env::LOW); - SleepingBackgroundTask sleeping_task_low; - env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low, + test::SleepingBackgroundTask sleeping_task_low; + env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low, Env::Priority::LOW); - SleepingBackgroundTask sleeping_task_high; - env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_high, - Env::Priority::HIGH); + test::SleepingBackgroundTask sleeping_task_high; + env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, + &sleeping_task_high, Env::Priority::HIGH); Options options = CurrentOptions(); // disable compaction @@ -2312,12 +2267,12 @@ TEST_F(DBTest, GetProperty) { // Set sizes to both background thread pool to be 1 and block them. env_->SetBackgroundThreads(1, Env::HIGH); env_->SetBackgroundThreads(1, Env::LOW); - SleepingBackgroundTask sleeping_task_low; - env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low, + test::SleepingBackgroundTask sleeping_task_low; + env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low, Env::Priority::LOW); - SleepingBackgroundTask sleeping_task_high; - env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_high, - Env::Priority::HIGH); + test::SleepingBackgroundTask sleeping_task_high; + env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, + &sleeping_task_high, Env::Priority::HIGH); Options options = CurrentOptions(); WriteOptions writeOpt = WriteOptions(); @@ -2566,8 +2521,8 @@ TEST_F(DBTest, EstimatePendingCompBytes) { // Set sizes to both background thread pool to be 1 and block them. env_->SetBackgroundThreads(1, Env::HIGH); env_->SetBackgroundThreads(1, Env::LOW); - SleepingBackgroundTask sleeping_task_low; - env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low, + test::SleepingBackgroundTask sleeping_task_low; + env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low, Env::Priority::LOW); Options options = CurrentOptions(); @@ -6181,7 +6136,7 @@ TEST_F(DBTest, TableOptionsSanitizeTest) { TEST_F(DBTest, SanitizeNumThreads) { for (int attempt = 0; attempt < 2; attempt++) { const size_t kTotalTasks = 8; - SleepingBackgroundTask sleeping_tasks[kTotalTasks]; + test::SleepingBackgroundTask sleeping_tasks[kTotalTasks]; Options options = CurrentOptions(); if (attempt == 0) { @@ -6193,7 +6148,8 @@ TEST_F(DBTest, SanitizeNumThreads) { for (size_t i = 0; i < kTotalTasks; i++) { // Insert 5 tasks to low priority queue and 5 tasks to high priority queue - env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_tasks[i], + env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, + &sleeping_tasks[i], (i < 4) ? Env::Priority::LOW : Env::Priority::HIGH); } @@ -6485,8 +6441,8 @@ TEST_F(DBTest, DynamicMemtableOptions) { // max_background_flushes == 0, so flushes are getting executed by the // compaction thread env_->SetBackgroundThreads(1, Env::LOW); - SleepingBackgroundTask sleeping_task_low; - env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low, + test::SleepingBackgroundTask sleeping_task_low; + env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low, Env::Priority::LOW); // Start from scratch and disable compaction/flush. Flush can only happen // during compaction but trigger is pretty high @@ -6521,7 +6477,7 @@ TEST_F(DBTest, DynamicMemtableOptions) { dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr); sleeping_task_low.Reset(); - env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low, + env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low, Env::Priority::LOW); count = 0; while (!sleeping_task_low.WokenUp() && count < 1024) { @@ -6544,7 +6500,7 @@ TEST_F(DBTest, DynamicMemtableOptions) { dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr); sleeping_task_low.Reset(); - env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low, + env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low, Env::Priority::LOW); count = 0; @@ -7351,8 +7307,8 @@ TEST_F(DBTest, DynamicCompactionOptions) { // since level0_stop_writes_trigger = 8 dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr); // Block compaction - SleepingBackgroundTask sleeping_task_low; - env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low, + test::SleepingBackgroundTask sleeping_task_low; + env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low, Env::Priority::LOW); rocksdb::SyncPoint::GetInstance()->SetCallBack( @@ -7388,7 +7344,7 @@ TEST_F(DBTest, DynamicCompactionOptions) { // Block compaction again sleeping_task_low.Reset(); - env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low, + env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low, Env::Priority::LOW); count = 0; while (count < 64) { @@ -7826,7 +7782,7 @@ TEST_F(DBTest, DeleteObsoleteFilesPendingOutputs) { dbfull()->TEST_WaitForCompact(); ASSERT_EQ("0,0,1", FilesPerLevel(0)); - SleepingBackgroundTask blocking_thread; + test::SleepingBackgroundTask blocking_thread; port::Mutex mutex_; bool already_blocked(false); @@ -7893,12 +7849,12 @@ TEST_F(DBTest, CloseSpeedup) { // Block background threads env_->SetBackgroundThreads(1, Env::LOW); env_->SetBackgroundThreads(1, Env::HIGH); - SleepingBackgroundTask sleeping_task_low; - env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low, + test::SleepingBackgroundTask sleeping_task_low; + env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low, Env::Priority::LOW); - SleepingBackgroundTask sleeping_task_high; - env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_high, - Env::Priority::HIGH); + test::SleepingBackgroundTask sleeping_task_high; + env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, + &sleeping_task_high, Env::Priority::HIGH); std::vector filenames; env_->GetChildren(dbname_, &filenames); diff --git a/db/flush_job.cc b/db/flush_job.cc index 936c375f6..e42e3c0e4 100644 --- a/db/flush_job.cc +++ b/db/flush_job.cc @@ -154,6 +154,7 @@ Status FlushJob::Run(FileMetaData* file_meta) { if (!s.ok()) { cfd_->imm()->RollbackMemtableFlush(mems, meta.fd.GetNumber()); } else { + TEST_SYNC_POINT("FlushJob::InstallResults"); // Replace immutable memtable with the generated Table s = cfd_->imm()->InstallMemtableFlushResults( cfd_, mutable_cf_options_, mems, versions_, db_mutex_, diff --git a/db/version_set.cc b/db/version_set.cc index cedaa3e29..fd3105539 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -1979,7 +1979,8 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data, if (!manifest_writers_.empty()) { manifest_writers_.front()->cv.Signal(); } - return Status::OK(); + // we steal this code to also inform about cf-drop + return Status::ShutdownInProgress(); } std::vector batch_edits; @@ -2141,6 +2142,11 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data, new_manifest_file_size = descriptor_log_->file()->GetFileSize(); } + if (edit->is_column_family_drop_) { + TEST_SYNC_POINT("VersionSet::LogAndApply::ColumnFamilyDrop:1"); + TEST_SYNC_POINT("VersionSet::LogAndApply::ColumnFamilyDrop:2"); + } + LogFlush(db_options_->info_log); mu->Lock(); } diff --git a/util/testutil.h b/util/testutil.h index 990a3ba81..9e452fe8a 100644 --- a/util/testutil.h +++ b/util/testutil.h @@ -16,6 +16,7 @@ #include "rocksdb/env.h" #include "rocksdb/iterator.h" #include "rocksdb/slice.h" +#include "util/mutexlock.h" #include "util/random.h" namespace rocksdb { @@ -276,5 +277,50 @@ class NullLogger : public Logger { // Corrupts key by changing the type extern void CorruptKeyType(InternalKey* ikey); +class SleepingBackgroundTask { + public: + SleepingBackgroundTask() + : bg_cv_(&mutex_), should_sleep_(true), done_with_sleep_(false) {} + void DoSleep() { + MutexLock l(&mutex_); + while (should_sleep_) { + bg_cv_.Wait(); + } + done_with_sleep_ = true; + bg_cv_.SignalAll(); + } + void WakeUp() { + MutexLock l(&mutex_); + should_sleep_ = false; + bg_cv_.SignalAll(); + } + void WaitUntilDone() { + MutexLock l(&mutex_); + while (!done_with_sleep_) { + bg_cv_.Wait(); + } + } + bool WokenUp() { + MutexLock l(&mutex_); + return should_sleep_ == false; + } + + void Reset() { + MutexLock l(&mutex_); + should_sleep_ = true; + done_with_sleep_ = false; + } + + static void DoSleepTask(void* arg) { + reinterpret_cast(arg)->DoSleep(); + } + + private: + port::Mutex mutex_; + port::CondVar bg_cv_; // Signalled when background work finishes + bool should_sleep_; + bool done_with_sleep_; +}; + } // namespace test } // namespace rocksdb