From 7c5e583a27a82894bdfbcd05ddfd38d88b270417 Mon Sep 17 00:00:00 2001 From: Igor Canadi Date: Wed, 22 Jan 2014 11:44:53 -0800 Subject: [PATCH] ColumnFamilySet Summary: I created a separate class ColumnFamilySet to keep track of column families. Before we did this in VersionSet and I believe this approach is cleaner. Let me know if you have any comments. I will commit tomorrow. Test Plan: make check Reviewers: dhruba, haobo, kailiu, sdong CC: leveldb Differential Revision: https://reviews.facebook.net/D15357 --- db/column_family.cc | 86 +++++++++ db/column_family.h | 87 +++++++++ db/column_family_test.cc | 2 +- db/db_impl.cc | 64 ++----- db/db_test.cc | 4 +- db/version_set.cc | 276 ++++++++++++++++++---------- db/version_set.h | 70 +++---- db/version_set_reduce_num_levels.cc | 2 +- include/rocksdb/db.h | 2 + util/ldb_cmd.cc | 9 +- 10 files changed, 410 insertions(+), 192 deletions(-) create mode 100644 db/column_family.cc create mode 100644 db/column_family.h diff --git a/db/column_family.cc b/db/column_family.cc new file mode 100644 index 000000000..218958dba --- /dev/null +++ b/db/column_family.cc @@ -0,0 +1,86 @@ +#include "db/column_family.h" +#include "db/version_set.h" + +namespace rocksdb { + +ColumnFamilyData::ColumnFamilyData(uint32_t id, const std::string& name, + Version* dummy_versions, + const ColumnFamilyOptions& options) + : id(id), + name(name), + dummy_versions(dummy_versions), + current(nullptr), + options(options) {} + +ColumnFamilyData::~ColumnFamilyData() { + // List must be empty + assert(dummy_versions->next_ == dummy_versions); + delete dummy_versions; +} + +ColumnFamilySet::ColumnFamilySet() : max_column_family_(0) {} + +ColumnFamilySet::~ColumnFamilySet() { + for (auto& cfd : column_family_data_) { + delete cfd.second; + } + for (auto& cfd : droppped_column_families_) { + delete cfd; + } +} + +ColumnFamilyData* ColumnFamilySet::GetDefault() const { + auto ret = GetColumnFamily(0); + assert(ret != nullptr); // default column family should always exist + return ret; +} + +ColumnFamilyData* ColumnFamilySet::GetColumnFamily(uint32_t id) const { + auto cfd_iter = column_family_data_.find(id); + if (cfd_iter != column_family_data_.end()) { + return cfd_iter->second; + } else { + return nullptr; + } +} + +bool ColumnFamilySet::Exists(uint32_t id) { + return column_family_data_.find(id) != column_family_data_.end(); +} + +bool ColumnFamilySet::Exists(const std::string& name) { + return column_families_.find(name) != column_families_.end(); +} + +uint32_t ColumnFamilySet::GetID(const std::string& name) { + auto cfd_iter = column_families_.find(name); + assert(cfd_iter != column_families_.end()); + return cfd_iter->second; +} + +uint32_t ColumnFamilySet::GetNextColumnFamilyID() { + return ++max_column_family_; +} + +ColumnFamilyData* ColumnFamilySet::CreateColumnFamily( + const std::string& name, uint32_t id, Version* dummy_versions, + const ColumnFamilyOptions& options) { + assert(column_families_.find(name) == column_families_.end()); + column_families_.insert({name, id}); + ColumnFamilyData* new_cfd = + new ColumnFamilyData(id, name, dummy_versions, options); + column_family_data_.insert({id, new_cfd}); + max_column_family_ = std::max(max_column_family_, id); + return new_cfd; +} + +void ColumnFamilySet::DropColumnFamily(uint32_t id) { + auto cfd = column_family_data_.find(id); + assert(cfd != column_family_data_.end()); + column_families_.erase(cfd->second->name); + cfd->second->current->Unref(); + droppped_column_families_.push_back(cfd->second); + column_family_data_.erase(cfd); +} + +} // namespace rocksdb diff --git a/db/column_family.h b/db/column_family.h new file mode 100644 index 000000000..eaf57826b --- /dev/null +++ b/db/column_family.h @@ -0,0 +1,87 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#pragma once + +#include "rocksdb/options.h" + +#include +#include +#include + +namespace rocksdb { + +class Version; +class VersionSet; + +// column family metadata +struct ColumnFamilyData { + uint32_t id; + std::string name; + Version* dummy_versions; // Head of circular doubly-linked list of versions. + Version* current; // == dummy_versions->prev_ + ColumnFamilyOptions options; + + ColumnFamilyData(uint32_t id, const std::string& name, + Version* dummy_versions, const ColumnFamilyOptions& options); + ~ColumnFamilyData(); +}; + +class ColumnFamilySet { + public: + class iterator { + public: + explicit iterator( + std::unordered_map::iterator itr) + : itr_(itr) {} + iterator& operator++() { + ++itr_; + return *this; + } + bool operator!=(const iterator& other) { return this->itr_ != other.itr_; } + ColumnFamilyData* operator*() { return itr_->second; } + private: + std::unordered_map::iterator itr_; + }; + + ColumnFamilySet(); + ~ColumnFamilySet(); + + ColumnFamilyData* GetDefault() const; + // GetColumnFamily() calls return nullptr if column family is not found + ColumnFamilyData* GetColumnFamily(uint32_t id) const; + bool Exists(uint32_t id); + bool Exists(const std::string& name); + uint32_t GetID(const std::string& name); + // this call will return the next available column family ID. it guarantees + // that there is no column family with id greater than or equal to the + // returned value in the current running instance. It does not, however, + // guarantee that the returned ID is unique accross RocksDB restarts. + // For example, if a client adds a column family 6 and then drops it, + // after a restart, we might reuse column family 6 ID. + uint32_t GetNextColumnFamilyID(); + + ColumnFamilyData* CreateColumnFamily(const std::string& name, uint32_t id, + Version* dummy_version, + const ColumnFamilyOptions& options); + void DropColumnFamily(uint32_t id); + + iterator begin() { return iterator(column_family_data_.begin()); } + iterator end() { return iterator(column_family_data_.end()); } + + private: + std::unordered_map column_families_; + std::unordered_map column_family_data_; + // we need to keep them alive because we still can't control the lifetime of + // all of column family data members (options for example) + std::vector droppped_column_families_; + uint32_t max_column_family_; +}; + +} // namespace rocksdb diff --git a/db/column_family_test.cc b/db/column_family_test.cc index 0e2608442..fc278ecf3 100644 --- a/db/column_family_test.cc +++ b/db/column_family_test.cc @@ -69,7 +69,7 @@ TEST(ColumnFamilyTest, AddDrop) { Close(); vector families; - DB::ListColumnFamilies(db_options_, dbname_, &families); + ASSERT_OK(DB::ListColumnFamilies(db_options_, dbname_, &families)); sort(families.begin(), families.end()); ASSERT_TRUE(families == vector({"default", "four", "one", "three"})); } diff --git a/db/db_impl.cc b/db/db_impl.cc index d57adaae4..c7992ec1c 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -913,21 +913,8 @@ Status DBImpl::Recover( } } - Status s = versions_->Recover(); + Status s = versions_->Recover(column_families); if (s.ok()) { - if (column_families.size() != versions_->column_families_.size()) { - return Status::InvalidArgument("Column family specifications mismatch"); - } - for (auto cf : column_families) { - auto cf_iter = versions_->column_families_.find(cf.name); - if (cf_iter == versions_->column_families_.end()) { - return Status::InvalidArgument("Column family specifications mismatch"); - } - auto cf_data_iter = versions_->column_family_data_.find(cf_iter->second); - assert(cf_data_iter != versions_->column_family_data_.end()); - cf_data_iter->second->options = cf.options; - } - SequenceNumber max_sequence(0); // Recover from all newer log files than the ones named in the @@ -2933,11 +2920,13 @@ std::vector DBImpl::MultiGet( Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& options, const std::string& column_family_name, ColumnFamilyHandle* handle) { + if (!versions_->GetColumnFamilySet()->Exists(column_family_name)) { + return Status::InvalidArgument("Column family already exists"); + } VersionEdit edit; edit.AddColumnFamily(column_family_name); MutexLock l(&mutex_); - ++versions_->max_column_family_; - handle->id = versions_->max_column_family_; + handle->id = versions_->GetColumnFamilySet()->GetNextColumnFamilyID(); edit.SetColumnFamily(handle->id); Status s = versions_->LogAndApply(&edit, &mutex_); if (s.ok()) { @@ -2948,21 +2937,16 @@ Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& options, } Status DBImpl::DropColumnFamily(const ColumnFamilyHandle& column_family) { - // TODO this is not good. implement some sort of refcounting - // column family data and only delete when refcount goes to 0 - // We don't want to delete column family if there is a compaction going on, - // or if there are some outstanding iterators if (column_family.id == 0) { return Status::InvalidArgument("Can't drop default column family"); } - VersionEdit edit; - edit.DropColumnFamily(); - edit.SetColumnFamily(column_family.id); MutexLock l(&mutex_); - auto data_iter = versions_->column_family_data_.find(column_family.id); - if (data_iter == versions_->column_family_data_.end()) { + if (!versions_->GetColumnFamilySet()->Exists(column_family.id)) { return Status::NotFound("Column family not found"); } + VersionEdit edit; + edit.DropColumnFamily(); + edit.SetColumnFamily(column_family.id); Status s = versions_->LogAndApply(&edit, &mutex_); if (s.ok()) { // remove from internal data structures @@ -3968,10 +3952,16 @@ Status DB::OpenWithColumnFamilies( // set column family handles handles->clear(); for (auto cf : column_families) { - auto cf_iter = impl->versions_->column_families_.find(cf.name); - assert(cf_iter != impl->versions_->column_families_.end()); - handles->push_back(ColumnFamilyHandle(cf_iter->second)); + if (!impl->versions_->GetColumnFamilySet()->Exists(cf.name)) { + s = Status::InvalidArgument("Column family not found: ", cf.name); + handles->clear(); + break; + } + uint32_t id = impl->versions_->GetColumnFamilySet()->GetID(cf.name); + handles->push_back(ColumnFamilyHandle(id)); } + } + if (s.ok()) { delete impl->InstallSuperVersion(new DBImpl::SuperVersion()); impl->mem_->SetLogNumber(impl->logfile_number_); impl->DeleteObsoleteFiles(); @@ -4006,23 +3996,7 @@ Status DB::OpenWithColumnFamilies( Status DB::ListColumnFamilies(const DBOptions& db_options, const std::string& name, std::vector* column_families) { - Options options(db_options, ColumnFamilyOptions()); - InternalKeyComparator* icmp = new InternalKeyComparator(options.comparator); - TableCache* table_cache = new TableCache(name, &options, EnvOptions(options), - db_options.max_open_files - 10); - VersionSet* version_set = - new VersionSet(name, &options, EnvOptions(options), table_cache, icmp); - - version_set->Recover(); - column_families->clear(); - for (auto cf : version_set->column_families_) { - column_families->push_back(cf.first); - } - - delete version_set; - delete table_cache; - delete icmp; - return Status::NotSupported("Working on it"); + return VersionSet::ListColumnFamilies(column_families, name, db_options.env); } Snapshot::~Snapshot() { diff --git a/db/db_test.cc b/db/db_test.cc index 30ec6c26a..882421541 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -5035,7 +5035,9 @@ void BM_LogAndApply(int iters, int num_base_files) { Options options; EnvOptions sopt; VersionSet vset(dbname, &options, sopt, nullptr, &cmp); - ASSERT_OK(vset.Recover()); + std::vector dummy; + dummy.push_back(ColumnFamilyDescriptor()); + ASSERT_OK(vset.Recover(dummy)); VersionEdit vbase; uint64_t fnum = 1; for (int i = 0; i < num_base_files; i++) { diff --git a/db/version_set.cc b/db/version_set.cc index e2c41ee63..39533c8f3 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -10,6 +10,7 @@ #include "db/version_set.h" #include +#include #include #include #include "db/filename.h" @@ -751,9 +752,6 @@ void Version::Ref() { } void Version::Unref() { - for (auto cfd : vset_->column_family_data_) { - assert(this != &cfd.second->dummy_versions); - } assert(refs_ >= 1); --refs_; if (refs_ == 0) { @@ -1344,7 +1342,8 @@ VersionSet::VersionSet(const std::string& dbname, const Options* options, const EnvOptions& storage_options, TableCache* table_cache, const InternalKeyComparator* cmp) - : env_(options->env), + : column_family_set_(new ColumnFamilySet()), + env_(options->env), dbname_(dbname), options_(options), table_cache_(table_cache), @@ -1368,11 +1367,8 @@ VersionSet::VersionSet(const std::string& dbname, const Options* options, } VersionSet::~VersionSet() { - for (auto cfd : column_family_data_) { - cfd.second->current->Unref(); - // List must be empty - assert(cfd.second->dummy_versions.next_ == &cfd.second->dummy_versions); - cfd.second->Unref(); + for (auto cfd : *column_family_set_) { + cfd->current->Unref(); } for (auto file : obsolete_files_) { delete file; @@ -1396,8 +1392,8 @@ void VersionSet::AppendVersion(ColumnFamilyData* column_family_data, v->Ref(); // Append to linked list - v->prev_ = column_family_data->dummy_versions.prev_; - v->next_ = &column_family_data->dummy_versions; + v->prev_ = column_family_data->dummy_versions->prev_; + v->next_ = column_family_data->dummy_versions; v->prev_->next_ = v; v->next_->prev_ = v; } @@ -1592,13 +1588,16 @@ void VersionSet::LogAndApplyHelper(Builder* builder, Version* v, builder->Apply(edit); } -Status VersionSet::Recover() { - struct LogReporter : public log::Reader::Reporter { - Status* status; - virtual void Corruption(size_t bytes, const Status& s) { - if (this->status->ok()) *this->status = s; - } - }; +Status VersionSet::Recover( + const std::vector& column_families) { + std::unordered_map cf_name_to_options; + for (auto cf : column_families) { + cf_name_to_options.insert({cf.name, cf.options}); + } + // keeps track of column families in manifest that were not found in + // column families parameters. if those column families are not dropped + // by subsequent manifest records, Recover() will return failure status + std::set column_families_not_found; // Read "CURRENT" file, which contains a pointer to the current manifest file std::string current; @@ -1640,12 +1639,17 @@ Status VersionSet::Recover() { VersionEdit default_cf_edit; default_cf_edit.AddColumnFamily(default_column_family_name); default_cf_edit.SetColumnFamily(0); - ColumnFamilyData* default_cfd = - CreateColumnFamily(ColumnFamilyOptions(*options_), &default_cf_edit); - builders.insert({0, new Builder(this, default_cfd->current)}); + auto default_cf_iter = cf_name_to_options.find(default_column_family_name); + if (default_cf_iter == cf_name_to_options.end()) { + column_families_not_found.insert(0); + } else { + ColumnFamilyData* default_cfd = + CreateColumnFamily(default_cf_iter->second, &default_cf_edit); + builders.insert({0, new Builder(this, default_cfd->current)}); + } { - LogReporter reporter; + VersionSet::LogReporter reporter; reporter.status = &s; log::Reader reader(std::move(file), &reporter, true/*checksum*/, 0/*initial_offset*/); @@ -1665,27 +1669,61 @@ Status VersionSet::Recover() { break; } + bool cf_in_not_found = + column_families_not_found.find(edit.column_family_) != + column_families_not_found.end(); + bool cf_in_builders = + builders.find(edit.column_family_) != builders.end(); + + // they can't both be true + assert(!(cf_in_not_found && cf_in_builders)); + if (edit.is_column_family_add_) { - ColumnFamilyData* new_cfd = - CreateColumnFamily(ColumnFamilyOptions(), &edit); - builders.insert( - {edit.column_family_, new Builder(this, new_cfd->current)}); + if (cf_in_builders || cf_in_not_found) { + s = Status::Corruption( + "Manifest adding the same column family twice"); + break; + } + auto cf_options = cf_name_to_options.find(edit.column_family_name_); + if (cf_options == cf_name_to_options.end()) { + column_families_not_found.insert(edit.column_family_); + } else { + ColumnFamilyData* new_cfd = + CreateColumnFamily(cf_options->second, &edit); + builders.insert( + {edit.column_family_, new Builder(this, new_cfd->current)}); + } } else if (edit.is_column_family_drop_) { - auto builder = builders.find(edit.column_family_); - assert(builder != builders.end()); - delete builder->second; - builders.erase(builder); - DropColumnFamily(&edit); - } else { - auto cfd = column_family_data_.find(edit.column_family_); - assert(cfd != column_family_data_.end()); - if (edit.max_level_ >= cfd->second->current->NumberLevels()) { + if (cf_in_builders) { + auto builder = builders.find(edit.column_family_); + assert(builder != builders.end()); + delete builder->second; + builders.erase(builder); + DropColumnFamily(&edit); + } else if (cf_in_not_found) { + column_families_not_found.erase(edit.column_family_); + } else { + s = Status::Corruption( + "Manifest - dropping non-existing column family"); + break; + } + } else if (!cf_in_not_found) { + if (!cf_in_builders) { + s = Status::Corruption( + "Manifest record referencing unknown column family"); + break; + } + + auto cfd = column_family_set_->GetColumnFamily(edit.column_family_); + // this should never happen since cf_in_builders is true + assert(cfd != nullptr); + if (edit.max_level_ >= cfd->current->NumberLevels()) { s = Status::InvalidArgument( "db has more levels than options.num_levels"); break; } - // if it isn't column family add or column family drop, + // if it is not column family add or column family drop, // then it's a file add/delete, which should be forwarded // to builder auto builder = builders.find(edit.column_family_); @@ -1733,16 +1771,24 @@ Status VersionSet::Recover() { MarkFileNumberUsed(log_number); } + // there were some column families in the MANIFEST that weren't specified + // in the argument + if (column_families_not_found.size() > 0) { + s = Status::InvalidArgument( + "Found unexpected column families. You have to specify all column " + "families when opening the DB"); + } + if (s.ok()) { - for (auto cfd : column_family_data_) { + for (auto cfd : *column_family_set_) { Version* v = new Version(this, current_version_number_++); - builders[cfd.first]->SaveTo(v); + builders[cfd->id]->SaveTo(v); // Install recovered version std::vector size_being_compacted(v->NumberLevels() - 1); compaction_picker_->SizeBeingCompacted(size_being_compacted); v->Finalize(size_being_compacted); - AppendVersion(cfd.second, v); + AppendVersion(cfd, v); } manifest_file_size_ = manifest_file_size; @@ -1771,15 +1817,65 @@ Status VersionSet::Recover() { return s; } -Status VersionSet::DumpManifest(Options& options, std::string& dscname, - bool verbose, bool hex) { - struct LogReporter : public log::Reader::Reporter { - Status* status; - virtual void Corruption(size_t bytes, const Status& s) { - if (this->status->ok()) *this->status = s; +Status VersionSet::ListColumnFamilies(std::vector* column_families, + const std::string& dbname, Env* env) { + + // these are just for performance reasons, not correcntes, + // so we're fine using the defaults + EnvOptions soptions; + // Read "CURRENT" file, which contains a pointer to the current manifest file + std::string current; + Status s = ReadFileToString(env, CurrentFileName(dbname), ¤t); + if (!s.ok()) { + return s; + } + if (current.empty() || current[current.size()-1] != '\n') { + return Status::Corruption("CURRENT file does not end with newline"); + } + current.resize(current.size() - 1); + + std::string dscname = dbname + "/" + current; + unique_ptr file; + s = env->NewSequentialFile(dscname, &file, soptions); + if (!s.ok()) { + return s; + } + + std::map column_family_names; + // default column family is always implicitly there + column_family_names.insert({0, default_column_family_name}); + VersionSet::LogReporter reporter; + reporter.status = &s; + log::Reader reader(std::move(file), &reporter, true /*checksum*/, + 0 /*initial_offset*/); + Slice record; + std::string scratch; + while (reader.ReadRecord(&record, &scratch) && s.ok()) { + VersionEdit edit; + s = edit.DecodeFrom(record); + if (!s.ok()) { + break; + } + if (edit.is_column_family_add_) { + column_family_names.insert( + {edit.column_family_, edit.column_family_name_}); + } else if (edit.is_column_family_drop_) { + column_family_names.erase(edit.column_family_); + } + } + + column_families->clear(); + if (s.ok()) { + for (const auto& iter : column_family_names) { + column_families->push_back(iter.second); } - }; + } + + return s; +} +Status VersionSet::DumpManifest(Options& options, std::string& dscname, + bool verbose, bool hex) { // Open the specified manifest file. unique_ptr file; Status s = options.env->NewSequentialFile(dscname, &file, storage_options_); @@ -1797,11 +1893,10 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname, uint64_t prev_log_number = 0; int count = 0; // TODO works only for default column family currently - VersionSet::Builder builder(this, - column_family_data_.find(0)->second->current); + VersionSet::Builder builder(this, column_family_set_->GetDefault()->current); { - LogReporter reporter; + VersionSet::LogReporter reporter; reporter.status = &s; log::Reader reader(std::move(file), &reporter, true/*checksum*/, 0/*initial_offset*/); @@ -1905,15 +2000,15 @@ void VersionSet::MarkFileNumberUsed(uint64_t number) { Status VersionSet::WriteSnapshot(log::Writer* log) { // TODO: Break up into multiple records to reduce memory usage on recovery? - for (auto cfd : column_family_data_) { + for (auto cfd : *column_family_set_) { { // Store column family info VersionEdit edit; - if (cfd.first != 0) { + if (cfd->id != 0) { // default column family is always there, // no need to explicitly write it - edit.AddColumnFamily(cfd.second->name); - edit.SetColumnFamily(cfd.first); + edit.AddColumnFamily(cfd->name); + edit.SetColumnFamily(cfd->id); std::string record; edit.EncodeTo(&record); Status s = log->AddRecord(record); @@ -1926,13 +2021,10 @@ Status VersionSet::WriteSnapshot(log::Writer* log) { { // Save files VersionEdit edit; - edit.SetColumnFamily(cfd.first); + edit.SetColumnFamily(cfd->id); for (int level = 0; level < NumberLevels(); level++) { - const std::vector& files = - cfd.second->current->files_[level]; - for (size_t i = 0; i < files.size(); i++) { - const FileMetaData* f = files[i]; + for (const auto& f : cfd->current->files_[level]) { edit.AddFile(level, f->number, f->file_size, @@ -2025,9 +2117,9 @@ uint64_t VersionSet::ApproximateOffsetOf(Version* v, const InternalKey& ikey) { void VersionSet::AddLiveFiles(std::vector* live_list) { // pre-calculate space requirement int64_t total_files = 0; - for (auto cfd : column_family_data_) { - for (Version* v = cfd.second->dummy_versions.next_; - v != &cfd.second->dummy_versions; v = v->next_) { + for (auto cfd : *column_family_set_) { + for (Version* v = cfd->dummy_versions->next_; v != cfd->dummy_versions; + v = v->next_) { for (int level = 0; level < v->NumberLevels(); level++) { total_files += v->files_[level].size(); } @@ -2037,9 +2129,9 @@ void VersionSet::AddLiveFiles(std::vector* live_list) { // just one time extension to the right size live_list->reserve(live_list->size() + total_files); - for (auto cfd : column_family_data_) { - for (Version* v = cfd.second->dummy_versions.next_; - v != &cfd.second->dummy_versions; v = v->next_) { + for (auto cfd : *column_family_set_) { + for (Version* v = cfd->dummy_versions->next_; v != cfd->dummy_versions; + v = v->next_) { for (int level = 0; level < v->NumberLevels(); level++) { for (const auto& f : v->files_[level]) { live_list->push_back(f->number); @@ -2051,7 +2143,7 @@ void VersionSet::AddLiveFiles(std::vector* live_list) { Compaction* VersionSet::PickCompaction() { // TODO this only works for default column family now - Version* version = column_family_data_.find(0)->second->current; + Version* version = column_family_set_->GetDefault()->current; return compaction_picker_->PickCompaction(version); } @@ -2060,7 +2152,7 @@ Compaction* VersionSet::CompactRange(int input_level, int output_level, const InternalKey* end, InternalKey** compaction_end) { // TODO this only works for default column family now - Version* version = column_family_data_.find(0)->second->current; + Version* version = column_family_set_->GetDefault()->current; return compaction_picker_->CompactRange(version, input_level, output_level, begin, end, compaction_end); } @@ -2112,7 +2204,7 @@ uint64_t VersionSet::MaxFileSizeForLevel(int level) { bool VersionSet::VerifyCompactionFileConsistency(Compaction* c) { #ifndef NDEBUG // TODO this only works for default column family now - Version* version = column_family_data_.find(0)->second->current; + Version* version = column_family_set_->GetDefault()->current; if (c->input_version() != version) { Log(options_->info_log, "VerifyCompactionFileConsistency version mismatch"); } @@ -2163,13 +2255,12 @@ void VersionSet::ReleaseCompactionFiles(Compaction* c, Status status) { Status VersionSet::GetMetadataForFile(uint64_t number, int* filelevel, FileMetaData* meta) { - for (auto cfd : column_family_data_) { - Version* version = cfd.second->current; + for (auto cfd : *column_family_set_) { + Version* version = cfd->current; for (int level = 0; level < version->NumberLevels(); level++) { - const std::vector& files = version->files_[level]; - for (size_t i = 0; i < files.size(); i++) { - if (files[i]->number == number) { - *meta = *files[i]; + for (const auto& file : version->files_[level]) { + if (file->number == number) { + *meta = *file; *filelevel = level; return Status::OK(); } @@ -2180,19 +2271,17 @@ Status VersionSet::GetMetadataForFile(uint64_t number, int* filelevel, } void VersionSet::GetLiveFilesMetaData(std::vector* metadata) { - for (auto cfd : column_family_data_) { + for (auto cfd : *column_family_set_) { for (int level = 0; level < NumberLevels(); level++) { - const std::vector& files = - cfd.second->current->files_[level]; - for (size_t i = 0; i < files.size(); i++) { + for (const auto& file : cfd->current->files_[level]) { LiveFileMetaData filemetadata; - filemetadata.name = TableFileName("", files[i]->number); + filemetadata.name = TableFileName("", file->number); filemetadata.level = level; - filemetadata.size = files[i]->file_size; - filemetadata.smallestkey = files[i]->smallest.user_key().ToString(); - filemetadata.largestkey = files[i]->largest.user_key().ToString(); - filemetadata.smallest_seqno = files[i]->smallest_seqno; - filemetadata.largest_seqno = files[i]->largest_seqno; + filemetadata.size = file->file_size; + filemetadata.smallestkey = file->smallest.user_key().ToString(); + filemetadata.largestkey = file->largest.user_key().ToString(); + filemetadata.smallest_seqno = file->smallest_seqno; + filemetadata.largest_seqno = file->largest_seqno; metadata->push_back(filemetadata); } } @@ -2206,29 +2295,18 @@ void VersionSet::GetObsoleteFiles(std::vector* files) { ColumnFamilyData* VersionSet::CreateColumnFamily( const ColumnFamilyOptions& options, VersionEdit* edit) { - assert(column_families_.find(edit->column_family_name_) == - column_families_.end()); assert(edit->is_column_family_add_); - column_families_.insert({edit->column_family_name_, edit->column_family_}); - ColumnFamilyData* new_cfd = - new ColumnFamilyData(edit->column_family_name_, this, options); - column_family_data_.insert({edit->column_family_, new_cfd}); - max_column_family_ = std::max(max_column_family_, edit->column_family_); + Version* dummy_versions = new Version(this); + auto new_cfd = column_family_set_->CreateColumnFamily( + edit->column_family_name_, edit->column_family_, dummy_versions, options); + AppendVersion(new_cfd, new Version(this, current_version_number_++)); return new_cfd; } void VersionSet::DropColumnFamily(VersionEdit* edit) { - auto cfd = column_family_data_.find(edit->column_family_); - assert(cfd != column_family_data_.end()); - column_families_.erase(cfd->second->name); - cfd->second->current->Unref(); - // List must be empty - assert(cfd->second->dummy_versions.next_ == &cfd->second->dummy_versions); - // might delete itself - cfd->second->Unref(); - column_family_data_.erase(cfd); + column_family_set_->DropColumnFamily(edit->column_family_); } } // namespace rocksdb diff --git a/db/version_set.h b/db/version_set.h index 65a1406aa..5ea1320cc 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -29,6 +29,8 @@ #include "db/table_cache.h" #include "db/compaction.h" #include "db/compaction_picker.h" +#include "db/column_family.h" +#include "db/log_reader.h" namespace rocksdb { @@ -42,6 +44,8 @@ class TableCache; class Version; class VersionSet; class MergeContext; +struct ColumnFamilyData; +class ColumnFamilySet; // Return the smallest index i such that files[i]->largest >= key. // Return files.size() if there is no such file. @@ -263,38 +267,6 @@ class Version { void operator=(const Version&); }; -// column family metadata -struct ColumnFamilyData { - std::string name; - Version dummy_versions; // Head of circular doubly-linked list of versions. - Version* current; // == dummy_versions.prev_ - ColumnFamilyOptions options; - int refs; - - void Ref() { - ++refs; - } - - void Unref() { - assert(refs > 0); - if (refs == 1) { - delete this; - } else { - --refs; - } - } - - ColumnFamilyData(const std::string& name, - VersionSet* vset, - const ColumnFamilyOptions& options) - : name(name), - dummy_versions(vset), - current(nullptr), - options(options), - refs(1) {} - ~ColumnFamilyData() {} -}; - class VersionSet { public: VersionSet(const std::string& dbname, const Options* options, @@ -315,12 +287,17 @@ class VersionSet { Status LogAndApply(VersionEdit* edit, port::Mutex* mu, bool new_descriptor_log = false) { - return LogAndApply( - column_family_data_.find(0)->second, edit, mu, new_descriptor_log); + return LogAndApply(column_family_set_->GetDefault(), edit, mu, + new_descriptor_log); } // Recover the last saved descriptor from persistent storage. - Status Recover(); + Status Recover(const std::vector& column_families); + + // Reads a manifest file and returns a list of column families in + // column_families. + static Status ListColumnFamilies(std::vector* column_families, + const std::string& dbname, Env* env); // Try to reduce the number of levels. This call is valid when // only one level from the new max level to the old @@ -333,7 +310,7 @@ class VersionSet { // Return the current version. Version* current() const { // TODO this only works for default column family now - return column_family_data_.find(0)->second->current; + return column_family_set_->GetDefault()->current; } // A Flag indicating whether write needs to slowdown because of there are @@ -418,7 +395,7 @@ class VersionSet { // TODO: improve this function to be accurate for universal // compactions. // TODO this only works for default column family now - Version* version = column_family_data_.find(0)->second->current; + Version* version = column_family_set_->GetDefault()->current; int num_levels_to_check = (options_->compaction_style != kCompactionStyleUniversal) ? NumberLevels() - 1 : 1; @@ -432,21 +409,21 @@ class VersionSet { // Returns true iff some level needs a compaction. bool NeedsCompaction() const { // TODO this only works for default column family now - Version* version = column_family_data_.find(0)->second->current; + Version* version = column_family_set_->GetDefault()->current; return ((version->file_to_compact_ != nullptr) || NeedsSizeCompaction()); } // Returns the maxmimum compaction score for levels 1 to max double MaxCompactionScore() const { // TODO this only works for default column family now - Version* version = column_family_data_.find(0)->second->current; + Version* version = column_family_set_->GetDefault()->current; return version->max_compaction_score_; } // See field declaration int MaxCompactionScoreLevel() const { // TODO this only works for default column family now - Version* version = column_family_data_.find(0)->second->current; + Version* version = column_family_set_->GetDefault()->current; return version->max_compaction_score_level_; } @@ -490,9 +467,7 @@ class VersionSet { void DropColumnFamily(VersionEdit* edit); - std::unordered_map column_families_; - std::unordered_map column_family_data_; - uint32_t max_column_family_; + ColumnFamilySet* GetColumnFamilySet() { return column_family_set_.get(); } private: class Builder; @@ -501,6 +476,13 @@ class VersionSet { friend class Compaction; friend class Version; + struct LogReporter : public log::Reader::Reporter { + Status* status; + virtual void Corruption(size_t bytes, const Status& s) { + if (this->status->ok()) *this->status = s; + } + }; + // Save current contents to *log Status WriteSnapshot(log::Writer* log); @@ -508,6 +490,8 @@ class VersionSet { bool ManifestContains(const std::string& record) const; + std::unique_ptr column_family_set_; + Env* const env_; const std::string dbname_; const Options* const options_; diff --git a/db/version_set_reduce_num_levels.cc b/db/version_set_reduce_num_levels.cc index e22b82a5a..707719f90 100644 --- a/db/version_set_reduce_num_levels.cc +++ b/db/version_set_reduce_num_levels.cc @@ -25,7 +25,7 @@ Status VersionSet::ReduceNumberOfLevels(int new_levels, port::Mutex* mu) { } // TODO this only works for default column family now - Version* current_version = column_family_data_.find(0)->second->current; + Version* current_version = column_family_set_->GetDefault()->current; int current_levels = current_version->NumberLevels(); if (current_levels <= new_levels) { diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h index b60c96cbe..a65dcfb9d 100644 --- a/include/rocksdb/db.h +++ b/include/rocksdb/db.h @@ -29,6 +29,8 @@ extern const ColumnFamilyHandle default_column_family; struct ColumnFamilyDescriptor { std::string name; ColumnFamilyOptions options; + ColumnFamilyDescriptor() + : name(default_column_family_name), options(ColumnFamilyOptions()) {} ColumnFamilyDescriptor(const std::string& name, const ColumnFamilyOptions& options) : name(name), options(options) {} diff --git a/util/ldb_cmd.cc b/util/ldb_cmd.cc index 65ecd61a2..341d154e9 100644 --- a/util/ldb_cmd.cc +++ b/util/ldb_cmd.cc @@ -11,6 +11,7 @@ #include "db/filename.h" #include "db/write_batch_internal.h" #include "rocksdb/write_batch.h" +#include "rocksdb/column_family.h" #include "util/coding.h" #include @@ -1015,10 +1016,12 @@ Status ReduceDBLevelsCommand::GetOldNumOfLevels(Options& opt, TableCache tc(db_path_, &opt, soptions, 10); const InternalKeyComparator cmp(opt.comparator); VersionSet versions(db_path_, &opt, soptions, &tc, &cmp); + std::vector dummy; + dummy.push_back(ColumnFamilyDescriptor()); // We rely the VersionSet::Recover to tell us the internal data structures // in the db. And the Recover() should never do any change // (like LogAndApply) to the manifest file. - Status st = versions.Recover(); + Status st = versions.Recover(dummy); if (!st.ok()) { return st; } @@ -1072,10 +1075,12 @@ void ReduceDBLevelsCommand::DoCommand() { TableCache tc(db_path_, &opt, soptions, 10); const InternalKeyComparator cmp(opt.comparator); VersionSet versions(db_path_, &opt, soptions, &tc, &cmp); + std::vector dummy; + dummy.push_back(ColumnFamilyDescriptor()); // We rely the VersionSet::Recover to tell us the internal data structures // in the db. And the Recover() should never do any change (like LogAndApply) // to the manifest file. - st = versions.Recover(); + st = versions.Recover(dummy); if (!st.ok()) { exec_state_ = LDBCommandExecuteResult::FAILED(st.ToString()); return;