diff --git a/db/column_family_test.cc b/db/column_family_test.cc index b6d6b3c6d..a4d7ba0b6 100644 --- a/db/column_family_test.cc +++ b/db/column_family_test.cc @@ -68,7 +68,9 @@ class ColumnFamilyTest : public testing::Test { void Close() { for (auto h : handles_) { - delete h; + if (h) { + delete h; + } } handles_.clear(); names_.clear(); @@ -1260,6 +1262,81 @@ TEST_F(ColumnFamilyTest, FlushAndDropRaceCondition) { rocksdb::SyncPoint::GetInstance()->DisableProcessing(); } +namespace { +std::atomic test_stage(0); +const int kMainThreadStartPersistingOptionsFile = 1; +const int kChildThreadFinishDroppingColumnFamily = 2; +const int kChildThreadWaitingMainThreadPersistOptions = 3; +void DropSingleColumnFamily(ColumnFamilyTest* cf_test, int cf_id, + std::vector comparators) { + while (test_stage < kMainThreadStartPersistingOptionsFile) { + Env::Default()->SleepForMicroseconds(100); + } + cf_test->DropColumnFamilies({cf_id}); + delete comparators[cf_id]; + comparators[cf_id] = nullptr; + test_stage = kChildThreadFinishDroppingColumnFamily; +} +} // namespace + +TEST_F(ColumnFamilyTest, CreateAndDropRace) { + const int kCfCount = 5; + std::vector cf_opts; + std::vector comparators; + for (int i = 0; i < kCfCount; ++i) { + cf_opts.emplace_back(); + comparators.push_back(new test::SimpleSuffixReverseComparator()); + cf_opts.back().comparator = comparators.back(); + } + db_options_.create_if_missing = true; + db_options_.create_missing_column_families = true; + + auto main_thread_id = std::this_thread::get_id(); + + rocksdb::SyncPoint::GetInstance()->SetCallBack("PersistRocksDBOptions:start", + [&](void* arg) { + auto current_thread_id = std::this_thread::get_id(); + // If it's the main thread hitting this sync-point, then it + // will be blocked until some other thread update the test_stage. + if (main_thread_id == current_thread_id) { + test_stage = kMainThreadStartPersistingOptionsFile; + while (test_stage < kChildThreadFinishDroppingColumnFamily) { + Env::Default()->SleepForMicroseconds(100); + } + } + }); + + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "WriteThread::EnterUnbatched:Wait", [&](void* arg) { + // This means a thread doing DropColumnFamily() is waiting for + // other thread to finish persisting options. + // In such case, we update the test_stage to unblock the main thread. + test_stage = kChildThreadWaitingMainThreadPersistOptions; + + // Note that based on the test setting, this must not be the + // main thread. + ASSERT_NE(main_thread_id, std::this_thread::get_id()); + }); + + // Create a database with four column families + Open({"default", "one", "two", "three"}, + {cf_opts[0], cf_opts[1], cf_opts[2], cf_opts[3]}); + + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + + // Start a thread that will drop the first column family + // and its comparator + std::thread drop_cf_thread(DropSingleColumnFamily, this, 1, comparators); + + DropColumnFamilies({2}); + + drop_cf_thread.join(); + + Close(); + Destroy(); + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); +} + } // namespace rocksdb int main(int argc, char** argv) { diff --git a/db/db_impl.cc b/db/db_impl.cc index 215e7d941..1e05bc8b3 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1930,24 +1930,21 @@ Status DBImpl::SetOptions(ColumnFamilyHandle* column_family, MutableCFOptions new_options; Status s; + Status persist_options_status; { InstrumentedMutexLock l(&mutex_); s = cfd->SetOptions(options_map); if (s.ok()) { new_options = *cfd->GetLatestMutableCFOptions(); } - } - if (s.ok()) { - Status persist_options_status = WriteOptionsFile(); - if (!persist_options_status.ok()) { - if (db_options_.fail_if_options_file_error) { - s = Status::IOError( - "SetOptions succeeded, but unable to persist options", - persist_options_status.ToString()); - } - Warn(db_options_.info_log, - "Unable to persist options in SetOptions() -- %s", - persist_options_status.ToString().c_str()); + if (s.ok()) { + // Persist RocksDB options under the single write thread + WriteThread::Writer w; + write_thread_.EnterUnbatched(&w, &mutex_); + + persist_options_status = WriteOptionsFile(); + + write_thread_.ExitUnbatched(&w); } } @@ -1963,6 +1960,16 @@ Status DBImpl::SetOptions(ColumnFamilyHandle* column_family, db_options_.info_log, "[%s] SetOptions succeeded", cfd->GetName().c_str()); new_options.Dump(db_options_.info_log.get()); + if (!persist_options_status.ok()) { + if (db_options_.fail_if_options_file_error) { + s = Status::IOError( + "SetOptions succeeded, but unable to persist options", + persist_options_status.ToString()); + } + Warn(db_options_.info_log, + "Unable to persist options in SetOptions() -- %s", + persist_options_status.ToString().c_str()); + } } else { Log(InfoLogLevel::WARN_LEVEL, db_options_.info_log, "[%s] SetOptions failed", cfd->GetName().c_str()); @@ -3446,6 +3453,7 @@ Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& cf_options, const std::string& column_family_name, ColumnFamilyHandle** handle) { Status s; + Status persist_options_status; *handle = nullptr; s = CheckCompressionSupported(cf_options); @@ -3478,6 +3486,12 @@ Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& cf_options, s = versions_->LogAndApply( nullptr, MutableCFOptions(opt, ImmutableCFOptions(opt)), &edit, &mutex_, directories_.GetDbDir(), false, &cf_options); + + if (s.ok()) { + // If the column family was created successfully, we then persist + // the updated RocksDB options under the same single write thread + persist_options_status = WriteOptionsFile(); + } write_thread_.ExitUnbatched(&w); } if (s.ok()) { @@ -3505,7 +3519,8 @@ Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& cf_options, // this is outside the mutex if (s.ok()) { - Status persist_options_status = WriteOptionsFile(); + NewThreadStatusCfInfo( + reinterpret_cast(*handle)->cfd()); if (!persist_options_status.ok()) { if (db_options_.fail_if_options_file_error) { s = Status::IOError( @@ -3517,8 +3532,6 @@ Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& cf_options, "Unable to persist options in CreateColumnFamily() -- %s", persist_options_status.ToString().c_str()); } - NewThreadStatusCfInfo( - reinterpret_cast(*handle)->cfd()); } return s; } @@ -3537,6 +3550,7 @@ Status DBImpl::DropColumnFamily(ColumnFamilyHandle* column_family) { edit.SetColumnFamily(cfd->GetID()); Status s; + Status options_persist_status; { InstrumentedMutexLock l(&mutex_); if (cfd->IsDropped()) { @@ -3548,6 +3562,11 @@ Status DBImpl::DropColumnFamily(ColumnFamilyHandle* column_family) { write_thread_.EnterUnbatched(&w, &mutex_); s = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(), &edit, &mutex_); + if (s.ok()) { + // If the column family was dropped successfully, we then persist + // the updated RocksDB options under the same single write thread + options_persist_status = WriteOptionsFile(); + } write_thread_.ExitUnbatched(&w); } @@ -3574,7 +3593,9 @@ Status DBImpl::DropColumnFamily(ColumnFamilyHandle* column_family) { auto* mutable_cf_options = cfd->GetLatestMutableCFOptions(); max_total_in_memory_state_ -= mutable_cf_options->write_buffer_size * mutable_cf_options->max_write_buffer_number; - auto options_persist_status = WriteOptionsFile(); + Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log, + "Dropped column family with id %u\n", cfd->GetID()); + if (!options_persist_status.ok()) { if (db_options_.fail_if_options_file_error) { s = Status::IOError( @@ -3586,9 +3607,6 @@ Status DBImpl::DropColumnFamily(ColumnFamilyHandle* column_family) { "Unable to persist options in DropColumnFamily() -- %s", options_persist_status.ToString().c_str()); } - Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log, - "Dropped column family with id %u\n", - cfd->GetID()); } else { Log(InfoLogLevel::ERROR_LEVEL, db_options_.info_log, "Dropping column family with id %u FAILED -- %s\n", @@ -5024,7 +5042,12 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname, } } TEST_SYNC_POINT("DBImpl::Open:Opened"); + Status persist_options_status; if (s.ok()) { + // Persist RocksDB Options before scheduling the compaction. + // The WriteOptionsFile() will release and lock the mutex internally. + persist_options_status = impl->WriteOptionsFile(); + *dbptr = impl; impl->opened_successfully_ = true; impl->MaybeScheduleFlushOrCompaction(); @@ -5035,8 +5058,6 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname, Log(InfoLogLevel::INFO_LEVEL, impl->db_options_.info_log, "DB pointer %p", impl); LogFlush(impl->db_options_.info_log); - - auto persist_options_status = impl->WriteOptionsFile(); if (!persist_options_status.ok()) { if (db_options.fail_if_options_file_error) { s = Status::IOError( @@ -5165,38 +5186,34 @@ Status DestroyDB(const std::string& dbname, const Options& options) { Status DBImpl::WriteOptionsFile() { #ifndef ROCKSDB_LITE - std::string file_name; - Status s = WriteOptionsToTempFile(&file_name); - if (!s.ok()) { - return s; - } - s = RenameTempFileToOptionsFile(file_name); - return s; -#else - return Status::OK(); -#endif // !ROCKSDB_LITE -} + mutex_.AssertHeld(); -Status DBImpl::WriteOptionsToTempFile(std::string* file_name) { -#ifndef ROCKSDB_LITE std::vector cf_names; std::vector cf_opts; - { - InstrumentedMutexLock l(&mutex_); - // This part requires mutex to protect the column family options - for (auto cfd : *versions_->GetColumnFamilySet()) { - if (cfd->IsDropped()) { - continue; - } - cf_names.push_back(cfd->GetName()); - cf_opts.push_back(BuildColumnFamilyOptions( - *cfd->options(), *cfd->GetLatestMutableCFOptions())); + + // This part requires mutex to protect the column family options + for (auto cfd : *versions_->GetColumnFamilySet()) { + if (cfd->IsDropped()) { + continue; } + cf_names.push_back(cfd->GetName()); + cf_opts.push_back(BuildColumnFamilyOptions( + *cfd->options(), *cfd->GetLatestMutableCFOptions())); } - *file_name = TempOptionsFileName(GetName(), versions_->NewFileNumber()); - Status s = PersistRocksDBOptions(GetDBOptions(), cf_names, cf_opts, - *file_name, GetEnv()); + // Unlock during expensive operations. New writes cannot get here + // because the single write thread ensures all new writes get queued. + mutex_.Unlock(); + + std::string file_name = + TempOptionsFileName(GetName(), versions_->NewFileNumber()); + Status s = PersistRocksDBOptions(GetDBOptions(), cf_names, cf_opts, file_name, + GetEnv()); + + if (s.ok()) { + s = RenameTempFileToOptionsFile(file_name); + } + mutex_.Lock(); return s; #else return Status::OK(); @@ -5224,8 +5241,6 @@ void DeleteOptionsFilesHelper(const std::map& filenames, Status DBImpl::DeleteObsoleteOptionsFiles() { #ifndef ROCKSDB_LITE - options_files_mutex_.AssertHeld(); - std::vector filenames; // use ordered map to store keep the filenames sorted from the newest // to the oldest. @@ -5257,7 +5272,6 @@ Status DBImpl::DeleteObsoleteOptionsFiles() { Status DBImpl::RenameTempFileToOptionsFile(const std::string& file_name) { #ifndef ROCKSDB_LITE - InstrumentedMutexLock l(&options_files_mutex_); Status s; std::string options_file_name = OptionsFileName(GetName(), versions_->NewFileNumber()); diff --git a/db/db_impl.h b/db/db_impl.h index 1ef96be14..d016af7b4 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -405,10 +405,14 @@ class DBImpl : public DB { SuperVersion* super_version, Arena* arena); - // The following options file related functions should not be - // called while DB mutex is held. + // Except in DB::Open(), WriteOptionsFile can only be called when: + // 1. WriteThread::Writer::EnterUnbatched() is used. + // 2. db_mutex is held Status WriteOptionsFile(); - Status WriteOptionsToTempFile(std::string* file_name); + + // The following two functions can only be called when: + // 1. WriteThread::Writer::EnterUnbatched() is used. + // 2. db_mutex is NOT held Status RenameTempFileToOptionsFile(const std::string& file_name); Status DeleteObsoleteOptionsFiles(); diff --git a/db/write_thread.cc b/db/write_thread.cc index 1e606eaa7..cbfd2646f 100644 --- a/db/write_thread.cc +++ b/db/write_thread.cc @@ -4,6 +4,7 @@ // of patent rights can be found in the PATENTS file in the same directory. #include "db/write_thread.h" +#include "util/sync_point.h" namespace rocksdb { @@ -188,6 +189,7 @@ void WriteThread::EnterUnbatched(Writer* w, InstrumentedMutex* mu) { LinkOne(w, &wait_needed); if (wait_needed) { mu->Unlock(); + TEST_SYNC_POINT("WriteThread::EnterUnbatched:Wait"); Await(w); mu->Lock(); } diff --git a/util/options_parser.cc b/util/options_parser.cc index 3e345a32f..20ae51e8b 100644 --- a/util/options_parser.cc +++ b/util/options_parser.cc @@ -17,6 +17,7 @@ #include "rocksdb/db.h" #include "util/options_helper.h" #include "util/string_util.h" +#include "util/sync_point.h" #include "port/port.h" @@ -34,6 +35,7 @@ Status PersistRocksDBOptions(const DBOptions& db_opt, const std::vector& cf_names, const std::vector& cf_opts, const std::string& file_name, Env* env) { + TEST_SYNC_POINT("PersistRocksDBOptions:start"); if (cf_names.size() != cf_opts.size()) { return Status::InvalidArgument( "cf_names.size() and cf_opts.size() must be the same");