Track whether a column family has finished being set up.

ColumnFamilyData gains an atomic `initialized_` flag that is false on
construction. DBImpl::CreateColumnFamilyImpl sets it once the new column
family is fully set up; VersionSet::Recover and VersionSet::DumpManifest set
it right after CreateColumnFamily. Code that walks the ColumnFamilySet
(flush on shutdown, stats dumping/reset, aggregated int properties, live-file
and file-metadata queries) now skips column families that are not yet
initialized, and Recover/WriteSnapshot assert the flag. Two sync points in
DBImpl::WriteOptionsFile let the new test run GetAggregatedIntProperty while
CreateColumnFamilies is still in progress.

NOTE(review): line breaks and indentation in this patch were reconstructed
from the hunk line counts, and template arguments that had been stripped
(std::atomic<...>, std::vector<...>, static_cast<...>, WithParamInterface<...>)
were restored. Confirm against the target tree, or apply with
whitespace-insensitive matching.

diff --git a/db/column_family.cc b/db/column_family.cc
index 72186b706..5ad77ef0f 100644
--- a/db/column_family.cc
+++ b/db/column_family.cc
@@ -349,6 +349,7 @@ ColumnFamilyData::ColumnFamilyData(
       dummy_versions_(_dummy_versions),
       current_(nullptr),
       refs_(0),
+      initialized_(false),
       dropped_(false),
       internal_comparator_(cf_options.comparator),
       initial_cf_options_(SanitizeOptions(db_options, cf_options)),
diff --git a/db/column_family.h b/db/column_family.h
index a812777a9..9a6f15d3c 100644
--- a/db/column_family.h
+++ b/db/column_family.h
@@ -335,6 +335,10 @@ class ColumnFamilyData {
   void RecalculateWriteStallConditions(
       const MutableCFOptions& mutable_cf_options);
 
+  void set_initialized() { initialized_.store(true); }
+
+  bool initialized() const { return initialized_.load(); }
+
  private:
   friend class ColumnFamilySet;
   ColumnFamilyData(uint32_t id, const std::string& name,
@@ -351,6 +355,7 @@ class ColumnFamilyData {
   Version* current_;  // == dummy_versions->prev_
 
   std::atomic<int> refs_;  // outstanding references to ColumnFamilyData
+  std::atomic<bool> initialized_;
   bool dropped_;  // true if client dropped it
 
   const InternalKeyComparator internal_comparator_;
diff --git a/db/column_family_test.cc b/db/column_family_test.cc
index fbc05e347..dc2f99499 100644
--- a/db/column_family_test.cc
+++ b/db/column_family_test.cc
@@ -521,6 +521,28 @@ TEST_F(ColumnFamilyTest, DontReuseColumnFamilyID) {
   }
 }
 
+TEST_F(ColumnFamilyTest, CreateCFRaceWithGetAggProperty) {
+  Open();
+
+  rocksdb::SyncPoint::GetInstance()->LoadDependency(
+      {{"DBImpl::WriteOptionsFile:1",
+        "ColumnFamilyTest.CreateCFRaceWithGetAggProperty:1"},
+       {"ColumnFamilyTest.CreateCFRaceWithGetAggProperty:2",
+        "DBImpl::WriteOptionsFile:2"}});
+  rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+
+  rocksdb::port::Thread thread([&] { CreateColumnFamilies({"one"}); });
+
+  TEST_SYNC_POINT("ColumnFamilyTest.CreateCFRaceWithGetAggProperty:1");
+  uint64_t pv;
+  db_->GetAggregatedIntProperty(DB::Properties::kEstimateTableReadersMem, &pv);
+  TEST_SYNC_POINT("ColumnFamilyTest.CreateCFRaceWithGetAggProperty:2");
+
+  thread.join();
+
+  rocksdb::SyncPoint::GetInstance()->DisableProcessing();
+}
+
 class FlushEmptyCFTestWithParam : public ColumnFamilyTest,
                                   public testing::WithParamInterface<bool> {
  public:
diff --git a/db/db_impl.cc b/db/db_impl.cc
index 0f412a754..afec15935 100644
--- a/db/db_impl.cc
+++ b/db/db_impl.cc
@@ -224,7 +224,7 @@ void DBImpl::CancelAllBackgroundWork(bool wait) {
       has_unpersisted_data_.load(std::memory_order_relaxed) &&
       !mutable_db_options_.avoid_flush_during_shutdown) {
     for (auto cfd : *versions_->GetColumnFamilySet()) {
-      if (!cfd->IsDropped() && !cfd->mem()->IsEmpty()) {
+      if (!cfd->IsDropped() && cfd->initialized() && !cfd->mem()->IsEmpty()) {
         cfd->Ref();
         mutex_.Unlock();
         FlushMemTable(cfd, FlushOptions());
@@ -406,12 +406,17 @@ void DBImpl::MaybeDumpStats() {
       default_cf_internal_stats_->GetStringProperty(
           *db_property_info, DB::Properties::kDBStats, &stats);
       for (auto cfd : *versions_->GetColumnFamilySet()) {
-        cfd->internal_stats()->GetStringProperty(
-            *cf_property_info, DB::Properties::kCFStatsNoFileHistogram, &stats);
+        if (cfd->initialized()) {
+          cfd->internal_stats()->GetStringProperty(
+              *cf_property_info, DB::Properties::kCFStatsNoFileHistogram,
+              &stats);
+        }
       }
       for (auto cfd : *versions_->GetColumnFamilySet()) {
-        cfd->internal_stats()->GetStringProperty(
-            *cf_property_info, DB::Properties::kCFFileHistogram, &stats);
+        if (cfd->initialized()) {
+          cfd->internal_stats()->GetStringProperty(
+              *cf_property_info, DB::Properties::kCFFileHistogram, &stats);
+        }
       }
     }
     ROCKS_LOG_WARN(immutable_db_options_.info_log,
@@ -1208,6 +1213,8 @@ Status DBImpl::CreateColumnFamilyImpl(const ColumnFamilyOptions& cf_options,
         is_snapshot_supported_ = false;
       }
 
+      cfd->set_initialized();
+
       *handle = new ColumnFamilyHandleImpl(cfd, this, &mutex_);
       ROCKS_LOG_INFO(immutable_db_options_.info_log,
                      "Created column family [%s] (ID %u)",
@@ -1709,7 +1716,9 @@ bool DBImpl::GetIntPropertyInternal(ColumnFamilyData* cfd,
 Status DBImpl::ResetStats() {
   InstrumentedMutexLock l(&mutex_);
   for (auto* cfd : *versions_->GetColumnFamilySet()) {
-    cfd->internal_stats()->Clear();
+    if (cfd->initialized()) {
+      cfd->internal_stats()->Clear();
+    }
   }
   return Status::OK();
 }
@@ -1728,6 +1737,9 @@ bool DBImpl::GetAggregatedIntProperty(const Slice& property,
     InstrumentedMutexLock l(&mutex_);
     uint64_t value;
     for (auto* cfd : *versions_->GetColumnFamilySet()) {
+      if (!cfd->initialized()) {
+        continue;
+      }
       if (GetIntPropertyInternal(cfd, *property_info, true, &value)) {
         sum += value;
       } else {
@@ -2320,6 +2332,9 @@ Status DBImpl::WriteOptionsFile(bool need_mutex_lock,
       BuildDBOptions(immutable_db_options_, mutable_db_options_);
   mutex_.Unlock();
 
+  TEST_SYNC_POINT("DBImpl::WriteOptionsFile:1");
+  TEST_SYNC_POINT("DBImpl::WriteOptionsFile:2");
+
   std::string file_name =
       TempOptionsFileName(GetName(), versions_->NewFileNumber());
   Status s =
diff --git a/db/version_set.cc b/db/version_set.cc
index 584f04386..6c220b5ef 100644
--- a/db/version_set.cc
+++ b/db/version_set.cc
@@ -2720,6 +2720,9 @@ Status VersionSet::Recover(
   default_cf_edit.SetColumnFamily(0);
   ColumnFamilyData* default_cfd =
       CreateColumnFamily(default_cf_iter->second, &default_cf_edit);
+  // In recovery, nobody else can access it, so it's fine to set it to be
+  // initialized earlier.
+  default_cfd->set_initialized();
   builders.insert({0, new BaseReferencedVersionBuilder(default_cfd)});
 
   {
@@ -2766,6 +2769,7 @@ Status VersionSet::Recover(
               {edit.column_family_, edit.column_family_name_});
         } else {
           cfd = CreateColumnFamily(cf_options->second, &edit);
+          cfd->set_initialized();
           builders.insert(
               {edit.column_family_, new BaseReferencedVersionBuilder(cfd)});
         }
@@ -2893,6 +2897,7 @@ Status VersionSet::Recover(
       if (cfd->IsDropped()) {
         continue;
       }
+      assert(cfd->initialized());
       auto builders_iter = builders.find(cfd->GetID());
       assert(builders_iter != builders.end());
       auto* builder = builders_iter->second->version_builder();
@@ -3177,6 +3182,7 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname,
           break;
         }
         cfd = CreateColumnFamily(ColumnFamilyOptions(options), &edit);
+        cfd->set_initialized();
         builders.insert(
             {edit.column_family_, new BaseReferencedVersionBuilder(cfd)});
       } else if (edit.is_column_family_drop_) {
@@ -3320,6 +3326,7 @@ Status VersionSet::WriteSnapshot(log::Writer* log) {
     if (cfd->IsDropped()) {
       continue;
     }
+    assert(cfd->initialized());
     {
       // Store column family info
       VersionEdit edit;
@@ -3486,6 +3493,9 @@ void VersionSet::AddLiveFiles(std::vector<FileDescriptor>* live_list) {
   // pre-calculate space requirement
   int64_t total_files = 0;
   for (auto cfd : *column_family_set_) {
+    if (!cfd->initialized()) {
+      continue;
+    }
     Version* dummy_versions = cfd->dummy_versions();
     for (Version* v = dummy_versions->next_; v != dummy_versions;
          v = v->next_) {
@@ -3500,6 +3510,9 @@ void VersionSet::AddLiveFiles(std::vector<FileDescriptor>* live_list) {
   live_list->reserve(live_list->size() + static_cast<size_t>(total_files));
 
   for (auto cfd : *column_family_set_) {
+    if (!cfd->initialized()) {
+      continue;
+    }
     auto* current = cfd->current();
     bool found_current = false;
     Version* dummy_versions = cfd->dummy_versions();
@@ -3628,6 +3641,9 @@ Status VersionSet::GetMetadataForFile(uint64_t number, int* filelevel,
                                       FileMetaData** meta,
                                       ColumnFamilyData** cfd) {
   for (auto cfd_iter : *column_family_set_) {
+    if (!cfd_iter->initialized()) {
+      continue;
+    }
     Version* version = cfd_iter->current();
     const auto* vstorage = version->storage_info();
     for (int level = 0; level < vstorage->num_levels(); level++) {
@@ -3646,7 +3662,7 @@ Status VersionSet::GetMetadataForFile(uint64_t number, int* filelevel,
 
 void VersionSet::GetLiveFilesMetaData(std::vector<LiveFileMetaData>* metadata) {
   for (auto cfd : *column_family_set_) {
-    if (cfd->IsDropped()) {
+    if (cfd->IsDropped() || !cfd->initialized()) {
       continue;
     }
     for (int level = 0; level < cfd->NumberLevels(); level++) {