From 510f84b686fb12a1a17cee254828f44dcd07c055 Mon Sep 17 00:00:00 2001 From: Igor Canadi Date: Fri, 28 Feb 2014 10:29:37 -0800 Subject: [PATCH] [CF] CreateColumnFamily fix Summary: This fixes few bugs with CreateColumnFamily * We first have to LogAndApply and then call VersionSet::CreateColumnFamily. Otherwise, WriteSnapshot might be invoked, writing out column family add inside of LogAndApply, even though it's not really committed * Fix LogAndApplyHelper() to not apply log number to column_family_data, which is in case of column family add, just a dummy (default) column family * Create SuperVerion when creating column family Test Plan: column_family_test Reviewers: dhruba, haobo CC: leveldb Differential Revision: https://reviews.facebook.net/D16443 --- db/column_family_test.cc | 8 ++++++++ db/db_impl.cc | 17 +++++------------ db/version_set.cc | 23 ++++++++++++++--------- 3 files changed, 27 insertions(+), 21 deletions(-) diff --git a/db/column_family_test.cc b/db/column_family_test.cc index 0b049a673..6ed5ac234 100644 --- a/db/column_family_test.cc +++ b/db/column_family_test.cc @@ -287,8 +287,16 @@ class ColumnFamilyTest { TEST(ColumnFamilyTest, AddDrop) { Open(); CreateColumnFamilies({"one", "two", "three"}); + ASSERT_EQ("NOT_FOUND", Get(1, "fodor")); + ASSERT_EQ("NOT_FOUND", Get(2, "fodor")); DropColumnFamilies({2}); + ASSERT_EQ("NOT_FOUND", Get(1, "fodor")); CreateColumnFamilies({"four"}); + ASSERT_EQ("NOT_FOUND", Get(3, "fodor")); + ASSERT_OK(Put(1, "fodor", "mirko")); + ASSERT_EQ("mirko", Get(1, "fodor")); + ASSERT_EQ("NOT_FOUND", Get(3, "fodor")); + Close(); ASSERT_TRUE(TryOpen({"default"}).IsInvalidArgument()); Open({"default", "one", "three", "four"}); diff --git a/db/db_impl.cc b/db/db_impl.cc index fd26b8563..e6c5ffed0 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -3055,7 +3055,9 @@ std::vector DBImpl::MultiGet( Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& options, const std::string& column_family_name, ColumnFamilyHandle** handle) { - mutex_.Lock(); + *handle = nullptr; + MutexLock l(&mutex_); + if (versions_->GetColumnFamilySet()->Exists(column_family_name)) { return Status::InvalidArgument("Column family already exists"); } @@ -3064,21 +3066,12 @@ Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& options, uint32_t new_id = versions_->GetColumnFamilySet()->GetNextColumnFamilyID(); edit.SetColumnFamily(new_id); edit.SetLogNumber(logfile_number_); - auto cfd = versions_->CreateColumnFamily(options, &edit); - assert(cfd != nullptr); - edit.SetComparatorName(cfd->internal_comparator().user_comparator()->Name()); + edit.SetComparatorName(options.comparator->Name()); Status s = versions_->LogAndApply(default_cf_handle_->cfd(), &edit, &mutex_); if (s.ok()) { + auto cfd = versions_->CreateColumnFamily(options, &edit); *handle = new ColumnFamilyHandleImpl(cfd, this, &mutex_); - } else { - *handle = nullptr; - cfd->Unref(); - delete cfd; - } - mutex_.Unlock(); - - if (s.ok()) { Log(options_.info_log, "Created column family \"%s\" (ID %u)", column_family_name.c_str(), (unsigned)cfd->GetID()); } else { diff --git a/db/version_set.cc b/db/version_set.cc index f845344b6..2fcae240e 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -1679,21 +1679,25 @@ void VersionSet::LogAndApplyHelper(ColumnFamilyData* cfd, Builder* builder, port::Mutex* mu) { mu->AssertHeld(); - if (edit->has_log_number_) { - assert(edit->log_number_ >= cfd->GetLogNumber()); - assert(edit->log_number_ < next_file_number_); - } else { - edit->SetLogNumber(cfd->GetLogNumber()); - } - if (!edit->has_prev_log_number_) { edit->SetPrevLogNumber(prev_log_number_); } - edit->SetNextFile(next_file_number_); edit->SetLastSequence(last_sequence_); - builder->Apply(edit); + if (edit->is_column_family_add_) { + assert(edit->has_log_number_); + } else { + if (edit->has_log_number_) { + assert(edit->log_number_ >= cfd->GetLogNumber()); + } else { + edit->SetLogNumber(cfd->GetLogNumber()); + } + + builder->Apply(edit); + } + + assert(edit->log_number_ < next_file_number_); } Status VersionSet::Recover( @@ -2563,6 +2567,7 @@ ColumnFamilyData* VersionSet::CreateColumnFamily( AppendVersion(new_cfd, new Version(new_cfd, this, current_version_number_++)); new_cfd->CreateNewMemtable(); + delete new_cfd->InstallSuperVersion(new SuperVersion()); return new_cfd; }