From f0e1e3ebf1ad78a9145a3ef522b91a22de0c93cf Mon Sep 17 00:00:00 2001 From: Igor Canadi Date: Wed, 12 Mar 2014 18:09:03 -0700 Subject: [PATCH] CF cleanup part 2 --- db/version_set.cc | 78 +++++++++++++++++++++++++---------------------- db/version_set.h | 1 + db/write_batch.cc | 1 - 3 files changed, 42 insertions(+), 38 deletions(-) diff --git a/db/version_set.cc b/db/version_set.cc index fcc6ab851..9c82526c8 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -1489,12 +1489,11 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data, const ColumnFamilyOptions* options) { mu->AssertHeld(); - assert(column_family_data != nullptr || edit->is_column_family_add_); - - if (edit->is_column_family_drop_) { - // if we drop column family, we have to make sure to save max column family, - // so that we don't reuse existing ID - edit->SetMaxColumnFamily(column_family_set_->GetMaxColumnFamily()); + // column_family_data can be nullptr only if this is column_family_add. + // in that case, we also need to specify ColumnFamilyOptions + if (column_family_data == nullptr) { + assert(edit->is_column_family_add_); + assert(options != nullptr); } // queue our request @@ -1527,9 +1526,7 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data, assert(manifest_writers_.front() == &w); if (edit->IsColumnFamilyManipulation()) { // no group commits for column family add or drop - last_writer = &w; - edit->SetNextFile(next_file_number_); - edit->SetLastSequence(last_sequence_); + LogAndApplyCFHelper(edit); batch_edits.push_back(edit); } else { v = new Version(column_family_data, this, current_version_number_++); @@ -1567,6 +1564,11 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data, if (new_descriptor_log) { new_manifest_filename = DescriptorFileName(dbname_, manifest_file_number_); edit->SetNextFile(next_file_number_); + // if we're writing out new snapshot make sure to persist max column + // family + if (column_family_set_->GetMaxColumnFamily() > 0) { + edit->SetMaxColumnFamily(column_family_set_->GetMaxColumnFamily()); + } } // Unlock during expensive operations. New writes cannot get here @@ -1590,7 +1592,7 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data, // This is fine because everything inside of this block is serialized -- // only one thread can be here at the same time - if (!new_manifest_filename.empty()) { + if (new_descriptor_log) { unique_ptr descriptor_file; s = env_->NewWritableFile(new_manifest_filename, &descriptor_file, storage_options_.AdaptForLogWrite()); @@ -1725,16 +1727,22 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data, return s; } +void VersionSet::LogAndApplyCFHelper(VersionEdit* edit) { + assert(edit->IsColumnFamilyManipulation()); + edit->SetNextFile(next_file_number_); + edit->SetLastSequence(last_sequence_); + if (edit->is_column_family_drop_) { + // if we drop column family, we have to make sure to save max column family, + // so that we don't reuse existing ID + edit->SetMaxColumnFamily(column_family_set_->GetMaxColumnFamily()); + } +} + void VersionSet::LogAndApplyHelper(ColumnFamilyData* cfd, Builder* builder, Version* v, VersionEdit* edit, port::Mutex* mu) { mu->AssertHeld(); - - if (!edit->has_prev_log_number_) { - edit->SetPrevLogNumber(prev_log_number_); - } - edit->SetNextFile(next_file_number_); - edit->SetLastSequence(last_sequence_); + assert(!edit->IsColumnFamilyManipulation()); if (edit->has_log_number_) { assert(edit->log_number_ >= cfd->GetLogNumber()); @@ -1743,6 +1751,12 @@ void VersionSet::LogAndApplyHelper(ColumnFamilyData* cfd, Builder* builder, } assert(edit->log_number_ < next_file_number_); + if (!edit->has_prev_log_number_) { + edit->SetPrevLogNumber(prev_log_number_); + } + edit->SetNextFile(next_file_number_); + edit->SetLastSequence(last_sequence_); + builder->Apply(edit); } @@ -1806,17 +1820,16 @@ Status VersionSet::Recover( std::unordered_map builders; // add default column family - VersionEdit default_cf_edit; - default_cf_edit.AddColumnFamily(default_column_family_name); - default_cf_edit.SetColumnFamily(0); auto default_cf_iter = cf_name_to_options.find(default_column_family_name); if (default_cf_iter == cf_name_to_options.end()) { - column_families_not_found.insert(0); - } else { - ColumnFamilyData* default_cfd = - CreateColumnFamily(default_cf_iter->second, &default_cf_edit); - builders.insert({0, new Builder(default_cfd)}); + return Status::InvalidArgument("Default column family not specified"); } + VersionEdit default_cf_edit; + default_cf_edit.AddColumnFamily(default_column_family_name); + default_cf_edit.SetColumnFamily(0); + ColumnFamilyData* default_cfd = + CreateColumnFamily(default_cf_iter->second, &default_cf_edit); + builders.insert({0, new Builder(default_cfd)}); { VersionSet::LogReporter reporter; @@ -2357,9 +2370,11 @@ void VersionSet::MarkFileNumberUsed(uint64_t number) { Status VersionSet::WriteSnapshot(log::Writer* log) { // TODO: Break up into multiple records to reduce memory usage on recovery? + // WARNING: This method doesn't hold a mutex!! + // This is done without DB mutex lock held, but only within single-threaded // LogAndApply. Column family manipulations can only happen within LogAndApply - // (the same single thread), so we're safe + // (the same single thread), so we're safe to iterate. for (auto cfd : *column_family_set_) { { // Store column family info @@ -2406,18 +2421,7 @@ Status VersionSet::WriteSnapshot(log::Writer* log) { } } - { - // persist max column family, last sequence and next file - VersionEdit edit; - if (column_family_set_->GetMaxColumnFamily() > 0) { - edit.SetMaxColumnFamily(column_family_set_->GetMaxColumnFamily()); - } - edit.SetLastSequence(last_sequence_); - edit.SetNextFile(next_file_number_); - std::string record; - edit.EncodeTo(&record); - return log->AddRecord(record); - } + return Status::OK(); } // Opens the mainfest file and reads all records diff --git a/db/version_set.h b/db/version_set.h index c2b0a8f82..d668dbfc3 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -466,6 +466,7 @@ class VersionSet { VersionSet(const VersionSet&); void operator=(const VersionSet&); + void LogAndApplyCFHelper(VersionEdit* edit); void LogAndApplyHelper(ColumnFamilyData* cfd, Builder* b, Version* v, VersionEdit* edit, port::Mutex* mu); }; diff --git a/db/write_batch.cc b/db/write_batch.cc index da8541224..39dc29d39 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -177,7 +177,6 @@ void WriteBatch::Put(uint32_t column_family_id, const Slice& key, const Slice& value) { WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1); if (column_family_id == 0) { - // save some data on disk by not writing default column family rep_.push_back(static_cast(kTypeValue)); } else { rep_.push_back(static_cast(kTypeColumnFamilyValue));