diff --git a/db/column_family.cc b/db/column_family.cc index 91f222cee..1cf46c5a8 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -333,6 +333,15 @@ ColumnFamilyData* ColumnFamilySet::GetColumnFamily(uint32_t id) const { } } +ColumnFamilyData* ColumnFamilySet::GetColumnFamily(const std::string& name) + const { + auto cfd_iter = column_families_.find(name); + if (cfd_iter == column_families_.end()) { + return nullptr; + } + return GetColumnFamily(cfd_iter->second); +} + bool ColumnFamilySet::Exists(uint32_t id) { return column_family_data_.find(id) != column_family_data_.end(); } diff --git a/db/column_family.h b/db/column_family.h index eff4be4bb..378afaf75 100644 --- a/db/column_family.h +++ b/db/column_family.h @@ -264,6 +264,7 @@ class ColumnFamilySet { ColumnFamilyData* GetDefault() const; // GetColumnFamily() calls return nullptr if column family is not found ColumnFamilyData* GetColumnFamily(uint32_t id) const; + ColumnFamilyData* GetColumnFamily(const std::string& name) const; bool Exists(uint32_t id); bool Exists(const std::string& name); uint32_t GetID(const std::string& name); diff --git a/db/db_impl.cc b/db/db_impl.cc index 38426218c..2aeb45fb8 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -3068,9 +3068,12 @@ Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& options, edit.SetLogNumber(logfile_number_); edit.SetComparatorName(options.comparator->Name()); - Status s = versions_->LogAndApply(default_cf_handle_->cfd(), &edit, &mutex_); + Status s = versions_->LogAndApply(nullptr, &edit, &mutex_, + db_directory_.get(), false, &options); if (s.ok()) { - auto cfd = versions_->CreateColumnFamily(options, &edit); + auto cfd = + versions_->GetColumnFamilySet()->GetColumnFamily(column_family_name); + assert(cfd != nullptr); *handle = new ColumnFamilyHandleImpl(cfd, this, &mutex_); Log(options_.info_log, "Created column family \"%s\" (ID %u)", column_family_name.c_str(), (unsigned)cfd->GetID()); @@ -3098,16 +3101,8 @@ Status DBImpl::DropColumnFamily(ColumnFamilyHandle* column_family) { s = Status::InvalidArgument("Column family already dropped!\n"); } if (s.ok()) { - cfd->SetDropped(); s = versions_->LogAndApply(cfd, &edit, &mutex_); } - if (s.ok()) { - // DB is holding one reference to each column family when it's alive, - // need to drop it now - if (cfd->Unref()) { - delete cfd; - } - } if (s.ok()) { Log(options_.info_log, "Dropped column family with id %u\n", cfd->GetID()); diff --git a/db/version_edit.h b/db/version_edit.h index bd5f0df95..e94dc3d6b 100644 --- a/db/version_edit.h +++ b/db/version_edit.h @@ -101,6 +101,10 @@ class VersionEdit { return new_files_.size() + deleted_files_.size(); } + bool IsColumnFamilyManipulation() { + return is_column_family_add_ || is_column_family_drop_; + } + void SetColumnFamily(uint32_t column_family_id) { column_family_ = column_family_id; } diff --git a/db/version_set.cc b/db/version_set.cc index 9a5be3691..5c1816360 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -1485,15 +1485,20 @@ void VersionSet::AppendVersion(ColumnFamilyData* column_family_data, Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data, VersionEdit* edit, port::Mutex* mu, - Directory* db_directory, - bool new_descriptor_log) { + Directory* db_directory, bool new_descriptor_log, + const ColumnFamilyOptions* options) { mu->AssertHeld(); - if (column_family_data->IsDropped() && !edit->is_column_family_drop_) { + assert(column_family_data != nullptr || edit->is_column_family_add_); + + if (column_family_data != nullptr && column_family_data->IsDropped()) { // if column family is dropped no need to write anything to the manifest // (unless, of course, thit is the drop column family write) return Status::OK(); } + if (edit->is_column_family_drop_) { + column_family_data->SetDropped(); + } // queue our request ManifestWriter w(mu, column_family_data, edit); @@ -1506,23 +1511,36 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data, } std::vector batch_edits; - Version* v = new Version(column_family_data, this, current_version_number_++); - Builder builder(column_family_data); + Version* v = nullptr; + std::unique_ptr builder(nullptr); // process all requests in the queue ManifestWriter* last_writer = &w; assert(!manifest_writers_.empty()); assert(manifest_writers_.front() == &w); - for (const auto& writer : manifest_writers_) { - if (writer->cfd->GetID() != column_family_data->GetID()) { - // group commits across column families are not yet supported - break; + if (edit->IsColumnFamilyManipulation()) { + // no group commits for column family add or drop + last_writer = &w; + edit->SetNextFile(next_file_number_); + edit->SetLastSequence(last_sequence_); + batch_edits.push_back(edit); + } else { + v = new Version(column_family_data, this, current_version_number_++); + builder.reset(new Builder(column_family_data)); + for (const auto& writer : manifest_writers_) { + if (writer->edit->IsColumnFamilyManipulation() || + writer->cfd->GetID() != column_family_data->GetID()) { + // no group commits for column family add or drop + // also, group commits across column families are not supported + break; + } + last_writer = writer; + LogAndApplyHelper(column_family_data, builder.get(), v, last_writer->edit, + mu); + batch_edits.push_back(last_writer->edit); } - last_writer = writer; - LogAndApplyHelper(column_family_data, &builder, v, last_writer->edit, mu); - batch_edits.push_back(last_writer->edit); + builder->SaveTo(v); } - builder.SaveTo(v); // Initialize new descriptor log file if necessary by creating // a temporary file that contains a snapshot of the current version. @@ -1547,17 +1565,20 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data, // Unlock during expensive operations. New writes cannot get here // because &w is ensuring that all new writes get queued. { - // calculate the amount of data being compacted at every level - std::vector size_being_compacted(v->NumberLevels() - 1); - column_family_data->compaction_picker()->SizeBeingCompacted( - size_being_compacted); + std::vector size_being_compacted; + if (!edit->IsColumnFamilyManipulation()) { + size_being_compacted.resize(v->NumberLevels() - 1); + // calculate the amount of data being compacted at every level + column_family_data->compaction_picker()->SizeBeingCompacted( + size_being_compacted); + } mu->Unlock(); - if (options_->max_open_files == -1) { + if (!edit->IsColumnFamilyManipulation() && options_->max_open_files == -1) { // unlimited table cache. Pre-load table handle now. // Need to do it out of the mutex. - builder.LoadTableHandlers(); + builder->LoadTableHandlers(); } // This is fine because everything inside of this block is serialized -- @@ -1573,10 +1594,12 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data, } } - // The calls to Finalize and UpdateFilesBySize are cpu-heavy - // and is best called outside the mutex. - v->Finalize(size_being_compacted); - v->UpdateFilesBySize(); + if (!edit->IsColumnFamilyManipulation()) { + // The calls to Finalize and UpdateFilesBySize are cpu-heavy + // and is best called outside the mutex. + v->Finalize(size_being_compacted); + v->UpdateFilesBySize(); + } // Write new record to MANIFEST log if (s.ok()) { @@ -1650,11 +1673,23 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data, // Install the new version if (s.ok()) { + if (edit->is_column_family_add_) { + // no group commit on column family add + assert(batch_edits.size() == 1); + assert(options != nullptr); + CreateColumnFamily(*options, edit); + } else if (edit->is_column_family_drop_) { + assert(batch_edits.size() == 1); + if (column_family_data->Unref()) { + delete column_family_data; + } + } else { + column_family_data->SetLogNumber(batch_edits.back()->log_number_); + AppendVersion(column_family_data, v); + } + manifest_file_size_ = new_manifest_file_size; - AppendVersion(column_family_data, v); - column_family_data->SetLogNumber(edit->log_number_); prev_log_number_ = edit->prev_log_number_; - } else { Log(options_->info_log, "Error in committing version %lu", (unsigned long)v->GetVersionNumber()); @@ -1694,19 +1729,14 @@ void VersionSet::LogAndApplyHelper(ColumnFamilyData* cfd, Builder* builder, edit->SetNextFile(next_file_number_); edit->SetLastSequence(last_sequence_); - if (edit->is_column_family_add_) { - assert(edit->has_log_number_); + if (edit->has_log_number_) { + assert(edit->log_number_ >= cfd->GetLogNumber()); } else { - if (edit->has_log_number_) { - assert(edit->log_number_ >= cfd->GetLogNumber()); - } else { - edit->SetLogNumber(cfd->GetLogNumber()); - } - - builder->Apply(edit); + edit->SetLogNumber(cfd->GetLogNumber()); } - assert(edit->log_number_ < next_file_number_); + + builder->Apply(edit); } Status VersionSet::Recover( @@ -2013,9 +2043,20 @@ Status VersionSet::ListColumnFamilies(std::vector* column_families, break; } if (edit.is_column_family_add_) { + if (column_family_names.find(edit.column_family_) != + column_family_names.end()) { + s = Status::Corruption("Manifest adding the same column family twice"); + break; + } column_family_names.insert( {edit.column_family_, edit.column_family_name_}); } else if (edit.is_column_family_drop_) { + if (column_family_names.find(edit.column_family_) == + column_family_names.end()) { + s = Status::Corruption( + "Manifest - dropping non-existing column family"); + break; + } column_family_names.erase(edit.column_family_); } } diff --git a/db/version_set.h b/db/version_set.h index 711964128..4546c91c6 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -296,11 +296,14 @@ class VersionSet { // Apply *edit to the current version to form a new descriptor that // is both saved to persistent state and installed as the new // current version. Will release *mu while actually writing to the file. + // column_family_options has to be set if edit is column family add // REQUIRES: *mu is held on entry. // REQUIRES: no other thread concurrently calls LogAndApply() Status LogAndApply(ColumnFamilyData* column_family_data, VersionEdit* edit, port::Mutex* mu, Directory* db_directory = nullptr, - bool new_descriptor_log = false); + bool new_descriptor_log = false, + const ColumnFamilyOptions* column_family_options = + nullptr); // Recover the last saved descriptor from persistent storage. Status Recover(const std::vector& column_families); @@ -401,9 +404,6 @@ class VersionSet { void GetObsoleteFiles(std::vector* files); - ColumnFamilyData* CreateColumnFamily(const ColumnFamilyOptions& options, - VersionEdit* edit); - ColumnFamilySet* GetColumnFamilySet() { return column_family_set_.get(); } private: @@ -426,6 +426,9 @@ class VersionSet { bool ManifestContains(const std::string& record) const; + ColumnFamilyData* CreateColumnFamily(const ColumnFamilyOptions& options, + VersionEdit* edit); + std::unique_ptr column_family_set_; Env* const env_;