From 6837a17621a045a468c5606a01da6ad5c483f763 Mon Sep 17 00:00:00 2001 From: Siying Dong Date: Thu, 22 Jun 2017 15:45:42 -0700 Subject: [PATCH] Fix Data Race Between CreateColumnFamily() and GetAggregatedIntProperty() Summary: CreateColumnFamily() releases DB mutex after adding column family to the set and install super version (to write option file), so if users call GetAggregatedIntProperty() in the middle, then super version will be null and the process will crash. Fix it by skipping those column families without super version installed. Maybe we should also fix the problem of releasing the lock when reading option file, but it is more risky. so I'm doing a quick and safer fix and we can investigate it later. Closes https://github.com/facebook/rocksdb/pull/2475 Differential Revision: D5298053 Pulled By: siying fbshipit-source-id: 4b3c8f91c60400b163fcc6cda8a0c77723be0ef6 --- db/column_family.cc | 1 + db/column_family.h | 5 +++++ db/column_family_test.cc | 22 ++++++++++++++++++++++ db/db_impl.cc | 27 +++++++++++++++++++++------ db/version_set.cc | 18 +++++++++++++++++- 5 files changed, 66 insertions(+), 7 deletions(-) diff --git a/db/column_family.cc b/db/column_family.cc index 72186b706..5ad77ef0f 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -349,6 +349,7 @@ ColumnFamilyData::ColumnFamilyData( dummy_versions_(_dummy_versions), current_(nullptr), refs_(0), + initialized_(false), dropped_(false), internal_comparator_(cf_options.comparator), initial_cf_options_(SanitizeOptions(db_options, cf_options)), diff --git a/db/column_family.h b/db/column_family.h index a812777a9..9a6f15d3c 100644 --- a/db/column_family.h +++ b/db/column_family.h @@ -335,6 +335,10 @@ class ColumnFamilyData { void RecalculateWriteStallConditions( const MutableCFOptions& mutable_cf_options); + void set_initialized() { initialized_.store(true); } + + bool initialized() const { return initialized_.load(); } + private: friend class ColumnFamilySet; ColumnFamilyData(uint32_t id, const std::string& name, @@ -351,6 +355,7 @@ class ColumnFamilyData { Version* current_; // == dummy_versions->prev_ std::atomic refs_; // outstanding references to ColumnFamilyData + std::atomic initialized_; bool dropped_; // true if client dropped it const InternalKeyComparator internal_comparator_; diff --git a/db/column_family_test.cc b/db/column_family_test.cc index fbc05e347..dc2f99499 100644 --- a/db/column_family_test.cc +++ b/db/column_family_test.cc @@ -521,6 +521,28 @@ TEST_F(ColumnFamilyTest, DontReuseColumnFamilyID) { } } +TEST_F(ColumnFamilyTest, CreateCFRaceWithGetAggProperty) { + Open(); + + rocksdb::SyncPoint::GetInstance()->LoadDependency( + {{"DBImpl::WriteOptionsFile:1", + "ColumnFamilyTest.CreateCFRaceWithGetAggProperty:1"}, + {"ColumnFamilyTest.CreateCFRaceWithGetAggProperty:2", + "DBImpl::WriteOptionsFile:2"}}); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + + rocksdb::port::Thread thread([&] { CreateColumnFamilies({"one"}); }); + + TEST_SYNC_POINT("ColumnFamilyTest.CreateCFRaceWithGetAggProperty:1"); + uint64_t pv; + db_->GetAggregatedIntProperty(DB::Properties::kEstimateTableReadersMem, &pv); + TEST_SYNC_POINT("ColumnFamilyTest.CreateCFRaceWithGetAggProperty:2"); + + thread.join(); + + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); +} + class FlushEmptyCFTestWithParam : public ColumnFamilyTest, public testing::WithParamInterface { public: diff --git a/db/db_impl.cc b/db/db_impl.cc index 0f412a754..afec15935 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -224,7 +224,7 @@ void DBImpl::CancelAllBackgroundWork(bool wait) { has_unpersisted_data_.load(std::memory_order_relaxed) && !mutable_db_options_.avoid_flush_during_shutdown) { for (auto cfd : *versions_->GetColumnFamilySet()) { - if (!cfd->IsDropped() && !cfd->mem()->IsEmpty()) { + if (!cfd->IsDropped() && cfd->initialized() && !cfd->mem()->IsEmpty()) { cfd->Ref(); mutex_.Unlock(); FlushMemTable(cfd, FlushOptions()); @@ -406,12 +406,17 @@ void DBImpl::MaybeDumpStats() { default_cf_internal_stats_->GetStringProperty( *db_property_info, DB::Properties::kDBStats, &stats); for (auto cfd : *versions_->GetColumnFamilySet()) { - cfd->internal_stats()->GetStringProperty( - *cf_property_info, DB::Properties::kCFStatsNoFileHistogram, &stats); + if (cfd->initialized()) { + cfd->internal_stats()->GetStringProperty( + *cf_property_info, DB::Properties::kCFStatsNoFileHistogram, + &stats); + } } for (auto cfd : *versions_->GetColumnFamilySet()) { - cfd->internal_stats()->GetStringProperty( - *cf_property_info, DB::Properties::kCFFileHistogram, &stats); + if (cfd->initialized()) { + cfd->internal_stats()->GetStringProperty( + *cf_property_info, DB::Properties::kCFFileHistogram, &stats); + } } } ROCKS_LOG_WARN(immutable_db_options_.info_log, @@ -1208,6 +1213,8 @@ Status DBImpl::CreateColumnFamilyImpl(const ColumnFamilyOptions& cf_options, is_snapshot_supported_ = false; } + cfd->set_initialized(); + *handle = new ColumnFamilyHandleImpl(cfd, this, &mutex_); ROCKS_LOG_INFO(immutable_db_options_.info_log, "Created column family [%s] (ID %u)", @@ -1709,7 +1716,9 @@ bool DBImpl::GetIntPropertyInternal(ColumnFamilyData* cfd, Status DBImpl::ResetStats() { InstrumentedMutexLock l(&mutex_); for (auto* cfd : *versions_->GetColumnFamilySet()) { - cfd->internal_stats()->Clear(); + if (cfd->initialized()) { + cfd->internal_stats()->Clear(); + } } return Status::OK(); } @@ -1728,6 +1737,9 @@ bool DBImpl::GetAggregatedIntProperty(const Slice& property, InstrumentedMutexLock l(&mutex_); uint64_t value; for (auto* cfd : *versions_->GetColumnFamilySet()) { + if (!cfd->initialized()) { + continue; + } if (GetIntPropertyInternal(cfd, *property_info, true, &value)) { sum += value; } else { @@ -2320,6 +2332,9 @@ Status DBImpl::WriteOptionsFile(bool need_mutex_lock, BuildDBOptions(immutable_db_options_, mutable_db_options_); mutex_.Unlock(); + TEST_SYNC_POINT("DBImpl::WriteOptionsFile:1"); + TEST_SYNC_POINT("DBImpl::WriteOptionsFile:2"); + std::string file_name = TempOptionsFileName(GetName(), versions_->NewFileNumber()); Status s = diff --git a/db/version_set.cc b/db/version_set.cc index 584f04386..6c220b5ef 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -2720,6 +2720,9 @@ Status VersionSet::Recover( default_cf_edit.SetColumnFamily(0); ColumnFamilyData* default_cfd = CreateColumnFamily(default_cf_iter->second, &default_cf_edit); + // In recovery, nobody else can access it, so it's fine to set it to be + // initialized earlier. + default_cfd->set_initialized(); builders.insert({0, new BaseReferencedVersionBuilder(default_cfd)}); { @@ -2766,6 +2769,7 @@ Status VersionSet::Recover( {edit.column_family_, edit.column_family_name_}); } else { cfd = CreateColumnFamily(cf_options->second, &edit); + cfd->set_initialized(); builders.insert( {edit.column_family_, new BaseReferencedVersionBuilder(cfd)}); } @@ -2893,6 +2897,7 @@ Status VersionSet::Recover( if (cfd->IsDropped()) { continue; } + assert(cfd->initialized()); auto builders_iter = builders.find(cfd->GetID()); assert(builders_iter != builders.end()); auto* builder = builders_iter->second->version_builder(); @@ -3177,6 +3182,7 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname, break; } cfd = CreateColumnFamily(ColumnFamilyOptions(options), &edit); + cfd->set_initialized(); builders.insert( {edit.column_family_, new BaseReferencedVersionBuilder(cfd)}); } else if (edit.is_column_family_drop_) { @@ -3320,6 +3326,7 @@ Status VersionSet::WriteSnapshot(log::Writer* log) { if (cfd->IsDropped()) { continue; } + assert(cfd->initialized()); { // Store column family info VersionEdit edit; @@ -3486,6 +3493,9 @@ void VersionSet::AddLiveFiles(std::vector* live_list) { // pre-calculate space requirement int64_t total_files = 0; for (auto cfd : *column_family_set_) { + if (!cfd->initialized()) { + continue; + } Version* dummy_versions = cfd->dummy_versions(); for (Version* v = dummy_versions->next_; v != dummy_versions; v = v->next_) { @@ -3500,6 +3510,9 @@ void VersionSet::AddLiveFiles(std::vector* live_list) { live_list->reserve(live_list->size() + static_cast(total_files)); for (auto cfd : *column_family_set_) { + if (!cfd->initialized()) { + continue; + } auto* current = cfd->current(); bool found_current = false; Version* dummy_versions = cfd->dummy_versions(); @@ -3628,6 +3641,9 @@ Status VersionSet::GetMetadataForFile(uint64_t number, int* filelevel, FileMetaData** meta, ColumnFamilyData** cfd) { for (auto cfd_iter : *column_family_set_) { + if (!cfd_iter->initialized()) { + continue; + } Version* version = cfd_iter->current(); const auto* vstorage = version->storage_info(); for (int level = 0; level < vstorage->num_levels(); level++) { @@ -3646,7 +3662,7 @@ Status VersionSet::GetMetadataForFile(uint64_t number, int* filelevel, void VersionSet::GetLiveFilesMetaData(std::vector* metadata) { for (auto cfd : *column_family_set_) { - if (cfd->IsDropped()) { + if (cfd->IsDropped() || !cfd->initialized()) { continue; } for (int level = 0; level < cfd->NumberLevels(); level++) {