Fix Data Race Between CreateColumnFamily() and GetAggregatedIntProperty()

Summary:
CreateColumnFamily() releases DB mutex after adding column family to the set and install super version (to write option file), so if users call GetAggregatedIntProperty() in the middle, then super version will be null and the process will crash. Fix it by skipping those column families without super version installed.

Maybe we should also fix the problem of releasing the lock when reading option file, but it is more risky. so I'm doing a quick and safer fix and we can investigate it later.
Closes https://github.com/facebook/rocksdb/pull/2475

Differential Revision: D5298053

Pulled By: siying

fbshipit-source-id: 4b3c8f91c60400b163fcc6cda8a0c77723be0ef6
main
Siying Dong 7 years ago committed by Facebook Github Bot
parent af1746751f
commit 6837a17621
  1. 1
      db/column_family.cc
  2. 5
      db/column_family.h
  3. 22
      db/column_family_test.cc
  4. 27
      db/db_impl.cc
  5. 18
      db/version_set.cc

@ -349,6 +349,7 @@ ColumnFamilyData::ColumnFamilyData(
dummy_versions_(_dummy_versions),
current_(nullptr),
refs_(0),
initialized_(false),
dropped_(false),
internal_comparator_(cf_options.comparator),
initial_cf_options_(SanitizeOptions(db_options, cf_options)),

@ -335,6 +335,10 @@ class ColumnFamilyData {
void RecalculateWriteStallConditions(
const MutableCFOptions& mutable_cf_options);
void set_initialized() { initialized_.store(true); }
bool initialized() const { return initialized_.load(); }
private:
friend class ColumnFamilySet;
ColumnFamilyData(uint32_t id, const std::string& name,
@ -351,6 +355,7 @@ class ColumnFamilyData {
Version* current_; // == dummy_versions->prev_
std::atomic<int> refs_; // outstanding references to ColumnFamilyData
std::atomic<bool> initialized_;
bool dropped_; // true if client dropped it
const InternalKeyComparator internal_comparator_;

@ -521,6 +521,28 @@ TEST_F(ColumnFamilyTest, DontReuseColumnFamilyID) {
}
}
TEST_F(ColumnFamilyTest, CreateCFRaceWithGetAggProperty) {
Open();
rocksdb::SyncPoint::GetInstance()->LoadDependency(
{{"DBImpl::WriteOptionsFile:1",
"ColumnFamilyTest.CreateCFRaceWithGetAggProperty:1"},
{"ColumnFamilyTest.CreateCFRaceWithGetAggProperty:2",
"DBImpl::WriteOptionsFile:2"}});
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
rocksdb::port::Thread thread([&] { CreateColumnFamilies({"one"}); });
TEST_SYNC_POINT("ColumnFamilyTest.CreateCFRaceWithGetAggProperty:1");
uint64_t pv;
db_->GetAggregatedIntProperty(DB::Properties::kEstimateTableReadersMem, &pv);
TEST_SYNC_POINT("ColumnFamilyTest.CreateCFRaceWithGetAggProperty:2");
thread.join();
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
}
class FlushEmptyCFTestWithParam : public ColumnFamilyTest,
public testing::WithParamInterface<bool> {
public:

@ -224,7 +224,7 @@ void DBImpl::CancelAllBackgroundWork(bool wait) {
has_unpersisted_data_.load(std::memory_order_relaxed) &&
!mutable_db_options_.avoid_flush_during_shutdown) {
for (auto cfd : *versions_->GetColumnFamilySet()) {
if (!cfd->IsDropped() && !cfd->mem()->IsEmpty()) {
if (!cfd->IsDropped() && cfd->initialized() && !cfd->mem()->IsEmpty()) {
cfd->Ref();
mutex_.Unlock();
FlushMemTable(cfd, FlushOptions());
@ -406,12 +406,17 @@ void DBImpl::MaybeDumpStats() {
default_cf_internal_stats_->GetStringProperty(
*db_property_info, DB::Properties::kDBStats, &stats);
for (auto cfd : *versions_->GetColumnFamilySet()) {
cfd->internal_stats()->GetStringProperty(
*cf_property_info, DB::Properties::kCFStatsNoFileHistogram, &stats);
if (cfd->initialized()) {
cfd->internal_stats()->GetStringProperty(
*cf_property_info, DB::Properties::kCFStatsNoFileHistogram,
&stats);
}
}
for (auto cfd : *versions_->GetColumnFamilySet()) {
cfd->internal_stats()->GetStringProperty(
*cf_property_info, DB::Properties::kCFFileHistogram, &stats);
if (cfd->initialized()) {
cfd->internal_stats()->GetStringProperty(
*cf_property_info, DB::Properties::kCFFileHistogram, &stats);
}
}
}
ROCKS_LOG_WARN(immutable_db_options_.info_log,
@ -1208,6 +1213,8 @@ Status DBImpl::CreateColumnFamilyImpl(const ColumnFamilyOptions& cf_options,
is_snapshot_supported_ = false;
}
cfd->set_initialized();
*handle = new ColumnFamilyHandleImpl(cfd, this, &mutex_);
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"Created column family [%s] (ID %u)",
@ -1709,7 +1716,9 @@ bool DBImpl::GetIntPropertyInternal(ColumnFamilyData* cfd,
Status DBImpl::ResetStats() {
InstrumentedMutexLock l(&mutex_);
for (auto* cfd : *versions_->GetColumnFamilySet()) {
cfd->internal_stats()->Clear();
if (cfd->initialized()) {
cfd->internal_stats()->Clear();
}
}
return Status::OK();
}
@ -1728,6 +1737,9 @@ bool DBImpl::GetAggregatedIntProperty(const Slice& property,
InstrumentedMutexLock l(&mutex_);
uint64_t value;
for (auto* cfd : *versions_->GetColumnFamilySet()) {
if (!cfd->initialized()) {
continue;
}
if (GetIntPropertyInternal(cfd, *property_info, true, &value)) {
sum += value;
} else {
@ -2320,6 +2332,9 @@ Status DBImpl::WriteOptionsFile(bool need_mutex_lock,
BuildDBOptions(immutable_db_options_, mutable_db_options_);
mutex_.Unlock();
TEST_SYNC_POINT("DBImpl::WriteOptionsFile:1");
TEST_SYNC_POINT("DBImpl::WriteOptionsFile:2");
std::string file_name =
TempOptionsFileName(GetName(), versions_->NewFileNumber());
Status s =

@ -2720,6 +2720,9 @@ Status VersionSet::Recover(
default_cf_edit.SetColumnFamily(0);
ColumnFamilyData* default_cfd =
CreateColumnFamily(default_cf_iter->second, &default_cf_edit);
// In recovery, nobody else can access it, so it's fine to set it to be
// initialized earlier.
default_cfd->set_initialized();
builders.insert({0, new BaseReferencedVersionBuilder(default_cfd)});
{
@ -2766,6 +2769,7 @@ Status VersionSet::Recover(
{edit.column_family_, edit.column_family_name_});
} else {
cfd = CreateColumnFamily(cf_options->second, &edit);
cfd->set_initialized();
builders.insert(
{edit.column_family_, new BaseReferencedVersionBuilder(cfd)});
}
@ -2893,6 +2897,7 @@ Status VersionSet::Recover(
if (cfd->IsDropped()) {
continue;
}
assert(cfd->initialized());
auto builders_iter = builders.find(cfd->GetID());
assert(builders_iter != builders.end());
auto* builder = builders_iter->second->version_builder();
@ -3177,6 +3182,7 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname,
break;
}
cfd = CreateColumnFamily(ColumnFamilyOptions(options), &edit);
cfd->set_initialized();
builders.insert(
{edit.column_family_, new BaseReferencedVersionBuilder(cfd)});
} else if (edit.is_column_family_drop_) {
@ -3320,6 +3326,7 @@ Status VersionSet::WriteSnapshot(log::Writer* log) {
if (cfd->IsDropped()) {
continue;
}
assert(cfd->initialized());
{
// Store column family info
VersionEdit edit;
@ -3486,6 +3493,9 @@ void VersionSet::AddLiveFiles(std::vector<FileDescriptor>* live_list) {
// pre-calculate space requirement
int64_t total_files = 0;
for (auto cfd : *column_family_set_) {
if (!cfd->initialized()) {
continue;
}
Version* dummy_versions = cfd->dummy_versions();
for (Version* v = dummy_versions->next_; v != dummy_versions;
v = v->next_) {
@ -3500,6 +3510,9 @@ void VersionSet::AddLiveFiles(std::vector<FileDescriptor>* live_list) {
live_list->reserve(live_list->size() + static_cast<size_t>(total_files));
for (auto cfd : *column_family_set_) {
if (!cfd->initialized()) {
continue;
}
auto* current = cfd->current();
bool found_current = false;
Version* dummy_versions = cfd->dummy_versions();
@ -3628,6 +3641,9 @@ Status VersionSet::GetMetadataForFile(uint64_t number, int* filelevel,
FileMetaData** meta,
ColumnFamilyData** cfd) {
for (auto cfd_iter : *column_family_set_) {
if (!cfd_iter->initialized()) {
continue;
}
Version* version = cfd_iter->current();
const auto* vstorage = version->storage_info();
for (int level = 0; level < vstorage->num_levels(); level++) {
@ -3646,7 +3662,7 @@ Status VersionSet::GetMetadataForFile(uint64_t number, int* filelevel,
void VersionSet::GetLiveFilesMetaData(std::vector<LiveFileMetaData>* metadata) {
for (auto cfd : *column_family_set_) {
if (cfd->IsDropped()) {
if (cfd->IsDropped() || !cfd->initialized()) {
continue;
}
for (int level = 0; level < cfd->NumberLevels(); level++) {

Loading…
Cancel
Save