diff --git a/db/db_impl.cc b/db/db_impl.cc index 0909c5694..7cfd00b58 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -906,7 +906,7 @@ Status DBImpl::Recover( } auto cf_data_iter = versions_->column_family_data_.find(cf_iter->second); assert(cf_data_iter != versions_->column_family_data_.end()); - cf_data_iter->second.options = cf.options; + cf_data_iter->second->options = cf.options; } SequenceNumber max_sequence(0); @@ -2888,15 +2888,19 @@ Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& options, Status s = versions_->LogAndApply(&edit, &mutex_); if (s.ok()) { // add to internal data structures - versions_->column_families_[column_family_name] = handle->id; - versions_->column_family_data_.insert( - {handle->id, - VersionSet::ColumnFamilyData(column_family_name, options)}); + versions_->CreateColumnFamily(options, &edit); } return s; } Status DBImpl::DropColumnFamily(const ColumnFamilyHandle& column_family) { + // TODO this is not good. implement some sort of refcounting + // column family data and only delete when refcount goes to 0 + // We don't want to delete column family if there is a compaction going on, + // or if there are some outstanding iterators + if (column_family.id == 0) { + return Status::InvalidArgument("Can't drop default column family"); + } VersionEdit edit(0); edit.DropColumnFamily(); edit.SetColumnFamily(column_family.id); @@ -2908,10 +2912,7 @@ Status DBImpl::DropColumnFamily(const ColumnFamilyHandle& column_family) { Status s = versions_->LogAndApply(&edit, &mutex_); if (s.ok()) { // remove from internal data structures - auto cf_iter = versions_->column_families_.find(data_iter->second.name); - assert(cf_iter != versions_->column_families_.end()); - versions_->column_families_.erase(cf_iter); - versions_->column_family_data_.erase(data_iter); + versions_->DropColumnFamily(&edit); } return s; } @@ -3931,7 +3932,7 @@ Status DB::OpenWithColumnFamilies( } impl->mutex_.Unlock(); - if (options.compaction_style == kCompactionStyleUniversal) { + if (s.ok() && options.compaction_style == kCompactionStyleUniversal) { int num_files; for (int i = 1; i < impl->NumberLevels(); i++) { num_files = impl->versions_->NumLevelFiles(i); diff --git a/db/version_set.cc b/db/version_set.cc index f65851329..e1352ebe5 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -415,7 +415,6 @@ Version::Version(VersionSet* vset, uint64_t version_number) file_to_compact_level_(-1), compaction_score_(vset->NumberLevels()), compaction_level_(vset->NumberLevels()), - offset_manifest_file_(0), version_number_(version_number) { } @@ -595,7 +594,9 @@ void Version::Ref() { } void Version::Unref() { - assert(this != &vset_->dummy_versions_); + for (auto cfd : vset_->column_family_data_) { + assert(this != &cfd.second->dummy_versions); + } assert(refs_ >= 1); --refs_; if (refs_ == 0) { @@ -645,13 +646,12 @@ int Version::PickLevelForMemTableOutput( // If hint_index is specified, then it points to a file in the // overlapping range. // The file_index returns a pointer to any file in an overlapping range. -void Version::GetOverlappingInputs( - int level, - const InternalKey* begin, - const InternalKey* end, - std::vector* inputs, - int hint_index, - int* file_index) { +void Version::GetOverlappingInputs(int level, + const InternalKey* begin, + const InternalKey* end, + std::vector* inputs, + int hint_index, + int* file_index) { inputs->clear(); Slice user_begin, user_end; if (begin != nullptr) { @@ -1149,21 +1149,22 @@ VersionSet::VersionSet(const std::string& dbname, log_number_(0), prev_log_number_(0), num_levels_(options_->num_levels), - dummy_versions_(this), - current_(nullptr), compactions_in_progress_(options_->num_levels), current_version_number_(0), - last_observed_manifest_size_(0), + manifest_file_size_(0), storage_options_(storage_options), storage_options_compactions_(storage_options_) { compact_pointer_ = new std::string[options_->num_levels]; Init(options_->num_levels); - AppendVersion(new Version(this, current_version_number_++)); } VersionSet::~VersionSet() { - current_->Unref(); - assert(dummy_versions_.next_ == &dummy_versions_); // List must be empty + for (auto cfd : column_family_data_) { + cfd.second->current->Unref(); + // List must be empty + assert(cfd.second->dummy_versions.next_ == &cfd.second->dummy_versions); + delete cfd.second; + } for (auto file : obsolete_files_) { delete file; } @@ -1193,26 +1194,29 @@ void VersionSet::Init(int num_levels) { } } -void VersionSet::AppendVersion(Version* v) { +void VersionSet::AppendVersion(ColumnFamilyData* column_family_data, + Version* v) { // Make "v" current assert(v->refs_ == 0); - assert(v != current_); - if (current_ != nullptr) { - assert(current_->refs_ > 0); - current_->Unref(); + assert(v != column_family_data->current); + if (column_family_data->current != nullptr) { + assert(column_family_data->current->refs_ > 0); + column_family_data->current->Unref(); } - current_ = v; + column_family_data->current = v; v->Ref(); // Append to linked list - v->prev_ = dummy_versions_.prev_; - v->next_ = &dummy_versions_; + v->prev_ = column_family_data->dummy_versions.prev_; + v->next_ = &column_family_data->dummy_versions; v->prev_->next_ = v; v->next_->prev_ = v; } -Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu, - bool new_descriptor_log) { +Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data, + VersionEdit* edit, + port::Mutex* mu, + bool new_descriptor_log) { mu->AssertHeld(); // queue our request @@ -1227,7 +1231,7 @@ Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu, std::vector batch_edits; Version* v = new Version(this, current_version_number_++); - Builder builder(this, current_); + Builder builder(this, column_family_data->current); // process all requests in the queue ManifestWriter* last_writer = &w; @@ -1251,7 +1255,7 @@ Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu, // No need to perform this check if a new Manifest is being created anyways. if (!descriptor_log_ || - last_observed_manifest_size_ > options_->max_manifest_file_size) { + manifest_file_size_ > options_->max_manifest_file_size) { new_descriptor_log = true; manifest_file_number_ = NewFileNumber(); // Change manifest file no. } @@ -1341,15 +1345,12 @@ Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu, LogFlush(options_->info_log); mu->Lock(); - // cache the manifest_file_size so that it can be used to rollover in the - // next call to LogAndApply - last_observed_manifest_size_ = new_manifest_file_size; } // Install the new version if (s.ok()) { - v->offset_manifest_file_ = new_manifest_file_size; - AppendVersion(v); + manifest_file_size_ = new_manifest_file_size; + AppendVersion(column_family_data, v); log_number_ = edit->log_number_; prev_log_number_ = edit->prev_log_number_; @@ -1381,8 +1382,10 @@ Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu, return s; } -void VersionSet::LogAndApplyHelper(Builder* builder, Version* v, - VersionEdit* edit, port::Mutex* mu) { +void VersionSet::LogAndApplyHelper(Builder* builder, + Version* v, + VersionEdit* edit, + port::Mutex* mu) { mu->AssertHeld(); if (edit->has_log_number_) { @@ -1444,12 +1447,15 @@ Status VersionSet::Recover() { uint64_t last_sequence = 0; uint64_t log_number = 0; uint64_t prev_log_number = 0; - Builder builder(this, current_); + std::unordered_map builders; // add default column family - column_families_.insert({default_column_family_name, 0}); - column_family_data_.insert( - {0, ColumnFamilyData(default_column_family_name)}); + VersionEdit default_cf_edit(0); + default_cf_edit.AddColumnFamily(default_column_family_name); + default_cf_edit.SetColumnFamily(0); + ColumnFamilyData* default_cfd = + CreateColumnFamily(ColumnFamilyOptions(*options_), &default_cf_edit); + builders.insert({0, new Builder(this, default_cfd->current)}); { LogReporter reporter; @@ -1470,25 +1476,28 @@ Status VersionSet::Recover() { } } - if (s.ok()) { - builder.Apply(&edit); + if (!s.ok()) { + break; } if (edit.is_column_family_add_) { - assert(column_families_.find(edit.column_family_name_) == - column_families_.end()); - column_families_.insert( - {edit.column_family_name_, edit.column_family_}); - column_family_data_.insert( - {edit.column_family_, ColumnFamilyData(edit.column_family_name_)}); - max_column_family_ = std::max(max_column_family_, edit.column_family_); - } - - if (edit.is_column_family_drop_) { - auto cf = column_family_data_.find(edit.column_family_); - assert(cf != column_family_data_.end()); - column_families_.erase(cf->second.name); - column_family_data_.erase(cf); + ColumnFamilyData* new_cfd = + CreateColumnFamily(ColumnFamilyOptions(), &edit); + builders.insert( + {edit.column_family_, new Builder(this, new_cfd->current)}); + } else if (edit.is_column_family_drop_) { + auto builder = builders.find(edit.column_family_); + assert(builder != builders.end()); + delete builder->second; + builders.erase(builder); + DropColumnFamily(&edit); + } else { + // if it isn't column family add or column family drop, + // then it's a file add/delete, which should be forwarded + // to builder + auto builder = builders.find(edit.column_family_); + assert(builder != builders.end()); + builder->second->Apply(&edit); } if (edit.has_log_number_) { @@ -1532,16 +1541,18 @@ Status VersionSet::Recover() { } if (s.ok()) { - Version* v = new Version(this, current_version_number_++); - builder.SaveTo(v); + for (auto cfd : column_family_data_) { + Version* v = new Version(this, current_version_number_++); + builders[cfd.first]->SaveTo(v); - // Install recovered version - std::vector size_being_compacted(NumberLevels()-1); - SizeBeingCompacted(size_being_compacted); - Finalize(v, size_being_compacted); + // Install recovered version + std::vector size_being_compacted(NumberLevels()-1); + SizeBeingCompacted(size_being_compacted); + Finalize(v, size_being_compacted); + AppendVersion(cfd.second, v); + } - v->offset_manifest_file_ = manifest_file_size; - AppendVersion(v); + manifest_file_size_ = manifest_file_size; manifest_file_number_ = next_file; next_file_number_ = next_file + 1; last_sequence_ = last_sequence; @@ -1560,11 +1571,17 @@ Status VersionSet::Recover() { (unsigned long)prev_log_number_); } + for (auto builder : builders) { + delete builder.second; + } + return s; } -Status VersionSet::DumpManifest(Options& options, std::string& dscname, - bool verbose, bool hex) { +Status VersionSet::DumpManifest(Options& options, + std::string& dscname, + bool verbose, + bool hex) { struct LogReporter : public log::Reader::Reporter { Status* status; virtual void Corruption(size_t bytes, const Status& s) { @@ -1588,7 +1605,9 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname, uint64_t log_number = 0; uint64_t prev_log_number = 0; int count = 0; - VersionSet::Builder builder(this, current_); + // TODO works only for default column family currently + VersionSet::Builder builder(this, + column_family_data_.find(0)->second->current); { LogReporter reporter; @@ -1616,7 +1635,7 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname, } count++; - if (s.ok()) { + if (s.ok() && edit.column_family_ == 0) { builder.Apply(&edit); } @@ -1667,12 +1686,6 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname, Version* v = new Version(this, 0); builder.SaveTo(v); - // Install recovered version - std::vector size_being_compacted(NumberLevels()-1); - SizeBeingCompacted(size_being_compacted); - Finalize(v, size_being_compacted); - - AppendVersion(v); manifest_file_number_ = next_file; next_file_number_ = next_file + 1; last_sequence_ = last_sequence; @@ -1852,21 +1865,49 @@ void VersionSet::UpdateFilesBySize(Version* v) { Status VersionSet::WriteSnapshot(log::Writer* log) { // TODO: Break up into multiple records to reduce memory usage on recovery? - // Save column families - for (auto cf : column_families_) { - VersionEdit edit(0); - if (cf.second == 0) { - // default column family is always there, - // no need to explicitly write it - continue; + for (auto cfd : column_family_data_) { + { + // Store column family info + VersionEdit edit(0); + if (cfd.first != 0) { + // default column family is always there, + // no need to explicitly write it + edit.AddColumnFamily(cfd.second->name); + edit.SetColumnFamily(cfd.first); + std::string record; + edit.EncodeTo(&record); + Status s = log->AddRecord(record); + if (!s.ok()) { + return s; + } + } } - edit.AddColumnFamily(cf.first); - edit.SetColumnFamily(cf.second); - std::string record; - edit.EncodeTo(&record); - Status s = log->AddRecord(record); - if (!s.ok()) { - return s; + + { + // Save files + VersionEdit edit(NumberLevels()); + edit.SetColumnFamily(cfd.first); + + for (int level = 0; level < NumberLevels(); level++) { + const std::vector& files = + cfd.second->current->files_[level]; + for (size_t i = 0; i < files.size(); i++) { + const FileMetaData* f = files[i]; + edit.AddFile(level, + f->number, + f->file_size, + f->smallest, + f->largest, + f->smallest_seqno, + f->largest_seqno); + } + } + std::string record; + edit.EncodeTo(&record); + Status s = log->AddRecord(record); + if (!s.ok()) { + return s; + } } } @@ -1883,16 +1924,6 @@ Status VersionSet::WriteSnapshot(log::Writer* log) { } } - // Save files - for (int level = 0; level < NumberLevels(); level++) { - const std::vector& files = current_->files_[level]; - for (size_t i = 0; i < files.size(); i++) { - const FileMetaData* f = files[i]; - edit.AddFile(level, f->number, f->file_size, f->smallest, f->largest, - f->smallest_seqno, f->largest_seqno); - } - } - std::string record; edit.EncodeTo(&record); return log->AddRecord(record); @@ -1901,15 +1932,20 @@ Status VersionSet::WriteSnapshot(log::Writer* log) { int VersionSet::NumLevelFiles(int level) const { assert(level >= 0); assert(level < NumberLevels()); - return current_->files_[level].size(); + // TODO this only works for default column family now + assert(column_family_data_.find(0) != column_family_data_.end()); + Version* version = column_family_data_.find(0)->second->current; + return version->files_[level].size(); } const char* VersionSet::LevelSummary(LevelSummaryStorage* scratch) const { + // TODO this only works for default column family now + Version* version = column_family_data_.find(0)->second->current; int len = snprintf(scratch->buffer, sizeof(scratch->buffer), "files["); for (int i = 0; i < NumberLevels(); i++) { int sz = sizeof(scratch->buffer) - len; int ret = snprintf(scratch->buffer + len, sz, "%d ", - int(current_->files_[i].size())); + int(version->files_[i].size())); if (ret < 0 || ret >= sz) break; len += ret; @@ -1933,11 +1969,12 @@ const char* VersionSet::LevelDataSizeSummary( return scratch->buffer; } -const char* VersionSet::LevelFileSummary( - FileSummaryStorage* scratch, int level) const { +const char* VersionSet::LevelFileSummary(Version* v, + FileSummaryStorage* scratch, + int level) const { int len = snprintf(scratch->buffer, sizeof(scratch->buffer), "files_size["); - for (unsigned int i = 0; i < current_->files_[level].size(); i++) { - FileMetaData* f = current_->files_[level][i]; + for (unsigned int i = 0; i < v->files_[level].size(); i++) { + FileMetaData* f = v->files_[level][i]; int sz = sizeof(scratch->buffer) - len; int ret = snprintf(scratch->buffer + len, sz, "#%lu(seq=%lu,sz=%lu,%lu) ", @@ -2018,30 +2055,35 @@ uint64_t VersionSet::ApproximateOffsetOf(Version* v, const InternalKey& ikey) { void VersionSet::AddLiveFiles(std::vector* live_list) { // pre-calculate space requirement int64_t total_files = 0; - for (Version* v = dummy_versions_.next_; - v != &dummy_versions_; - v = v->next_) { - for (int level = 0; level < NumberLevels(); level++) { - total_files += v->files_[level].size(); + for (auto cfd : column_family_data_) { + for (Version* v = cfd.second->dummy_versions.next_; + v != &cfd.second->dummy_versions; + v = v->next_) { + for (int level = 0; level < NumberLevels(); level++) { + total_files += v->files_[level].size(); + } } } // just one time extension to the right size live_list->reserve(live_list->size() + total_files); - for (Version* v = dummy_versions_.next_; - v != &dummy_versions_; - v = v->next_) { - for (int level = 0; level < NumberLevels(); level++) { - for (const auto& f : v->files_[level]) { - live_list->push_back(f->number); + for (auto cfd : column_family_data_) { + for (Version* v = cfd.second->dummy_versions.next_; + v != &cfd.second->dummy_versions; + v = v->next_) { + for (int level = 0; level < NumberLevels(); level++) { + for (const auto& f : v->files_[level]) { + live_list->push_back(f->number); + } } } } } void VersionSet::AddLiveFilesCurrentVersion(std::set* live) { - Version* v = current_; + // TODO this only works for default column family now + Version* v = column_family_data_.find(0)->second->current; for (int level = 0; level < NumberLevels(); level++) { const std::vector& files = v->files_[level]; for (size_t i = 0; i < files.size(); i++) { @@ -2051,20 +2093,24 @@ void VersionSet::AddLiveFilesCurrentVersion(std::set* live) { } int64_t VersionSet::NumLevelBytes(int level) const { + // TODO this only works for default column family now + Version* version = column_family_data_.find(0)->second->current; assert(level >= 0); assert(level < NumberLevels()); - assert(current_); - return TotalFileSize(current_->files_[level]); + assert(version); + return TotalFileSize(version->files_[level]); } int64_t VersionSet::MaxNextLevelOverlappingBytes() { + // TODO this only works for default column family now + Version* version = column_family_data_.find(0)->second->current; uint64_t result = 0; std::vector overlaps; for (int level = 1; level < NumberLevels() - 1; level++) { - for (size_t i = 0; i < current_->files_[level].size(); i++) { - const FileMetaData* f = current_->files_[level][i]; - current_->GetOverlappingInputs(level+1, &f->smallest, &f->largest, - &overlaps); + for (size_t i = 0; i < version->files_[level].size(); i++) { + const FileMetaData* f = version->files_[level][i]; + version->GetOverlappingInputs( + level + 1, &f->smallest, &f->largest, &overlaps); const uint64_t sum = TotalFileSize(overlaps); if (sum > result) { result = sum; @@ -2177,7 +2223,9 @@ uint64_t VersionSet::MaxGrandParentOverlapBytes(int level) { // in the current version bool VersionSet::VerifyCompactionFileConsistency(Compaction* c) { #ifndef NDEBUG - if (c->input_version_ != current_) { + // TODO this only works for default column family now + Version* version = column_family_data_.find(0)->second->current; + if (c->input_version_ != version) { Log(options_->info_log, "VerifyCompactionFileConsistency version mismatch"); } @@ -2188,8 +2236,8 @@ bool VersionSet::VerifyCompactionFileConsistency(Compaction* c) { // look for this file in the current version bool found = false; - for (unsigned int j = 0; j < current_->files_[level].size(); j++) { - FileMetaData* f = current_->files_[level][j]; + for (unsigned int j = 0; j < version->files_[level].size(); j++) { + FileMetaData* f = version->files_[level][j]; if (f->number == number) { found = true; break; @@ -2206,8 +2254,8 @@ bool VersionSet::VerifyCompactionFileConsistency(Compaction* c) { // look for this file in the current version bool found = false; - for (unsigned int j = 0; j < current_->files_[level].size(); j++) { - FileMetaData* f = current_->files_[level][j]; + for (unsigned int j = 0; j < version->files_[level].size(); j++) { + FileMetaData* f = version->files_[level][j]; if (f->number == number) { found = true; break; @@ -2257,17 +2305,19 @@ void VersionSet::SizeBeingCompacted(std::vector& sizes) { // base file (overrides configured values of file-size ratios, // min_merge_width and max_merge_width). // -Compaction* VersionSet::PickCompactionUniversalSizeAmp( - int level, double score) { +Compaction* VersionSet::PickCompactionUniversalSizeAmp(int level, + double score) { assert (level == 0); + // TODO this only works for default column family now + Version* version = column_family_data_.find(0)->second->current; // percentage flexibilty while reducing size amplification uint64_t ratio = options_->compaction_options_universal. max_size_amplification_percent; // The files are sorted from newest first to oldest last. - std::vector& file_by_time = current_->files_by_size_[level]; - assert(file_by_time.size() == current_->files_[level].size()); + std::vector& file_by_time = version->files_by_size_[level]; + assert(file_by_time.size() == version->files_[level].size()); unsigned int candidate_count = 0; uint64_t candidate_size = 0; @@ -2277,7 +2327,7 @@ Compaction* VersionSet::PickCompactionUniversalSizeAmp( // Skip files that are already being compacted for (unsigned int loop = 0; loop < file_by_time.size() - 1; loop++) { int index = file_by_time[loop]; - f = current_->files_[level][index]; + f = version->files_[level][index]; if (!f->being_compacted) { start_index = loop; // Consider this as the first candidate. break; @@ -2301,7 +2351,7 @@ Compaction* VersionSet::PickCompactionUniversalSizeAmp( for (unsigned int loop = start_index; loop < file_by_time.size() - 1; loop++) { int index = file_by_time[loop]; - f = current_->files_[level][index]; + f = version->files_[level][index]; if (f->being_compacted) { Log(options_->info_log, "Universal: Possible candidate file %lu[%d] %s.", @@ -2319,7 +2369,7 @@ Compaction* VersionSet::PickCompactionUniversalSizeAmp( // size of earliest file int index = file_by_time[file_by_time.size() - 1]; - uint64_t earliest_file_size = current_->files_[level][index]->file_size; + uint64_t earliest_file_size = version->files_[level][index]->file_size; // size amplification = percentage of additional size if (candidate_size * 100 < ratio * earliest_file_size) { @@ -2340,13 +2390,18 @@ Compaction* VersionSet::PickCompactionUniversalSizeAmp( // create a compaction request // We always compact all the files, so always compress. - Compaction* c = new Compaction(level, level, MaxFileSizeForLevel(level), - LLONG_MAX, NumberLevels(), false, + Compaction* c = new Compaction(level, + level, + MaxFileSizeForLevel(level), + LLONG_MAX, + NumberLevels(), + version, + false, true); c->score_ = score; for (unsigned int loop = start_index; loop < file_by_time.size(); loop++) { int index = file_by_time[loop]; - f = current_->files_[level][index]; + f = version->files_[level][index]; c->inputs_[0].push_back(f); Log(options_->info_log, "Universal: size amp picking file %lu[%d] with size %lu", @@ -2365,18 +2420,21 @@ Compaction* VersionSet::PickCompactionUniversalReadAmp( int level, double score, unsigned int ratio, unsigned int max_number_of_files_to_compact) { + // TODO this only works for default column family now + Version* version = column_family_data_.find(0)->second->current; + unsigned int min_merge_width = options_->compaction_options_universal.min_merge_width; unsigned int max_merge_width = options_->compaction_options_universal.max_merge_width; // The files are sorted from newest first to oldest last. - std::vector& file_by_time = current_->files_by_size_[level]; + std::vector& file_by_time = version->files_by_size_[level]; FileMetaData* f = nullptr; bool done = false; int start_index = 0; unsigned int candidate_count; - assert(file_by_time.size() == current_->files_[level].size()); + assert(file_by_time.size() == version->files_[level].size()); unsigned int max_files_to_compact = std::min(max_merge_width, max_number_of_files_to_compact); @@ -2391,7 +2449,7 @@ Compaction* VersionSet::PickCompactionUniversalReadAmp( // Skip files that are already being compacted for (f = nullptr; loop < file_by_time.size(); loop++) { int index = file_by_time[loop]; - f = current_->files_[level][index]; + f = version->files_[level][index]; if (!f->being_compacted) { candidate_count = 1; @@ -2416,7 +2474,7 @@ Compaction* VersionSet::PickCompactionUniversalReadAmp( candidate_count < max_files_to_compact && i < file_by_time.size(); i++) { int index = file_by_time[i]; - FileMetaData* f = current_->files_[level][index]; + FileMetaData* f = version->files_[level][index]; if (f->being_compacted) { break; } @@ -2439,7 +2497,7 @@ Compaction* VersionSet::PickCompactionUniversalReadAmp( for (unsigned int i = loop; i < loop + candidate_count && i < file_by_time.size(); i++) { int index = file_by_time[i]; - FileMetaData* f = current_->files_[level][index]; + FileMetaData* f = version->files_[level][index]; Log(options_->info_log, "Universal: Skipping file %lu[%d] with size %lu %d\n", (unsigned long)f->number, @@ -2459,25 +2517,30 @@ Compaction* VersionSet::PickCompactionUniversalReadAmp( int ratio_to_compress = options_->compaction_options_universal.compression_size_percent; if (ratio_to_compress >= 0) { - uint64_t total_size = TotalFileSize(current_->files_[level]); + uint64_t total_size = TotalFileSize(version->files_[level]); uint64_t older_file_size = 0; for (unsigned int i = file_by_time.size() - 1; i >= first_index_after; i--) { - older_file_size += current_->files_[level][file_by_time[i]]->file_size; + older_file_size += version->files_[level][file_by_time[i]]->file_size; if (older_file_size * 100L >= total_size * (long) ratio_to_compress) { enable_compression = false; break; } } } - Compaction* c = new Compaction(level, level, MaxFileSizeForLevel(level), - LLONG_MAX, NumberLevels(), false, + Compaction* c = new Compaction(level, + level, + MaxFileSizeForLevel(level), + LLONG_MAX, + NumberLevels(), + version, + false, enable_compression); c->score_ = score; for (unsigned int i = start_index; i < first_index_after; i++) { int index = file_by_time[i]; - FileMetaData* f = current_->files_[level][index]; + FileMetaData* f = version->files_[level][index]; c->inputs_[0].push_back(f); Log(options_->info_log, "Universal: Picking file %lu[%d] with size %lu\n", (unsigned long)f->number, @@ -2493,16 +2556,19 @@ Compaction* VersionSet::PickCompactionUniversalReadAmp( // Compaction* VersionSet::PickCompactionUniversal(int level, double score) { assert (level == 0); + // TODO this only works for default column family now + Version* version = column_family_data_.find(0)->second->current; - if ((current_->files_[level].size() < + if ((version->files_[level].size() < (unsigned int)options_->level0_file_num_compaction_trigger)) { Log(options_->info_log, "Universal: nothing to do\n"); return nullptr; } VersionSet::FileSummaryStorage tmp; - Log(options_->info_log, "Universal: candidate files(%lu): %s\n", - current_->files_[level].size(), - LevelFileSummary(&tmp, 0)); + Log(options_->info_log, + "Universal: candidate files(%lu): %s\n", + version->files_[level].size(), + LevelFileSummary(version, &tmp, 0)); // Check for size amplification first. Compaction* c = PickCompactionUniversalSizeAmp(level, score); @@ -2518,7 +2584,7 @@ Compaction* VersionSet::PickCompactionUniversal(int level, double score) { // compaction without looking at filesize ratios and try to reduce // the number of files to fewer than level0_file_num_compaction_trigger. if (c == nullptr) { - unsigned int num_files = current_->files_[level].size() - + unsigned int num_files = version->files_[level].size() - options_->level0_file_num_compaction_trigger; c = PickCompactionUniversalReadAmp(level, score, UINT_MAX, num_files); } @@ -2539,11 +2605,11 @@ Compaction* VersionSet::PickCompactionUniversal(int level, double score) { } // The files are sorted from newest first to oldest last. - std::vector& file_by_time = current_->files_by_size_[level]; + std::vector& file_by_time = version->files_by_size_[level]; // Is the earliest file part of this compaction? int last_index = file_by_time[file_by_time.size()-1]; - FileMetaData* last_file = current_->files_[level][last_index]; + FileMetaData* last_file = version->files_[level][last_index]; if (c->inputs_[0][c->inputs_[0].size()-1] == last_file) { c->bottommost_level_ = true; } @@ -2554,9 +2620,6 @@ Compaction* VersionSet::PickCompactionUniversal(int level, double score) { c->inputs_[0].size()); } - c->input_version_ = current_; - c->input_version_->Ref(); - // mark all the files that are being compacted c->MarkFilesBeingCompacted(true); @@ -2565,13 +2628,15 @@ Compaction* VersionSet::PickCompactionUniversal(int level, double score) { // Record whether this compaction includes all sst files. // For now, it is only relevant in universal compaction mode. - c->is_full_compaction_ = (c->inputs_[0].size() == current_->files_[0].size()); + c->is_full_compaction_ = (c->inputs_[0].size() == version->files_[0].size()); return c; } Compaction* VersionSet::PickCompactionBySize(int level, double score) { Compaction* c = nullptr; + // TODO this only works for default column family now + Version* version = column_family_data_.find(0)->second->current; // level 0 files are overlapping. So we cannot pick more // than one concurrent compactions at this level. This @@ -2583,26 +2648,30 @@ Compaction* VersionSet::PickCompactionBySize(int level, double score) { assert(level >= 0); assert(level+1 < NumberLevels()); - c = new Compaction(level, level+1, MaxFileSizeForLevel(level+1), - MaxGrandParentOverlapBytes(level), NumberLevels()); + c = new Compaction(level, + level + 1, + MaxFileSizeForLevel(level + 1), + MaxGrandParentOverlapBytes(level), + NumberLevels(), + version); c->score_ = score; // Pick the largest file in this level that is not already // being compacted - std::vector& file_size = current_->files_by_size_[level]; + std::vector& file_size = version->files_by_size_[level]; // record the first file that is not yet compacted int nextIndex = -1; - for (unsigned int i = current_->next_file_to_compact_by_size_[level]; + for (unsigned int i = version->next_file_to_compact_by_size_[level]; i < file_size.size(); i++) { int index = file_size[i]; - FileMetaData* f = current_->files_[level][index]; + FileMetaData* f = version->files_[level][index]; // check to verify files are arranged in descending size assert((i == file_size.size() - 1) || (i >= Version::number_of_files_to_sort_-1) || - (f->file_size >= current_->files_[level][file_size[i+1]]->file_size)); + (f->file_size >= version->files_[level][file_size[i+1]]->file_size)); // do not pick a file to compact if it is being compacted // from n-1 level. @@ -2638,25 +2707,27 @@ Compaction* VersionSet::PickCompactionBySize(int level, double score) { } // store where to start the iteration in the next call to PickCompaction - current_->next_file_to_compact_by_size_[level] = nextIndex; + version->next_file_to_compact_by_size_[level] = nextIndex; return c; } Compaction* VersionSet::PickCompaction() { Compaction* c = nullptr; + // TODO this only works for default column family now + Version* version = column_family_data_.find(0)->second->current; int level = -1; // Compute the compactions needed. It is better to do it here // and also in LogAndApply(), otherwise the values could be stale. std::vector size_being_compacted(NumberLevels()-1); - current_->vset_->SizeBeingCompacted(size_being_compacted); - Finalize(current_, size_being_compacted); + version->vset_->SizeBeingCompacted(size_being_compacted); + Finalize(version, size_being_compacted); // In universal style of compaction, compact L0 files back into L0. if (options_->compaction_style == kCompactionStyleUniversal) { int level = 0; - c = PickCompactionUniversal(level, current_->compaction_score_[level]); + c = PickCompactionUniversal(level, version->compaction_score_[level]); return c; } @@ -2665,11 +2736,11 @@ Compaction* VersionSet::PickCompaction() { // // Find the compactions by size on all levels. for (int i = 0; i < NumberLevels()-1; i++) { - assert(i == 0 || current_->compaction_score_[i] <= - current_->compaction_score_[i-1]); - level = current_->compaction_level_[i]; - if ((current_->compaction_score_[i] >= 1)) { - c = PickCompactionBySize(level, current_->compaction_score_[i]); + assert(i == 0 || version->compaction_score_[i] <= + version->compaction_score_[i-1]); + level = version->compaction_level_[i]; + if ((version->compaction_score_[i] >= 1)) { + c = PickCompactionBySize(level, version->compaction_score_[i]); ExpandWhileOverlapping(c); if (c != nullptr) { break; @@ -2678,10 +2749,10 @@ Compaction* VersionSet::PickCompaction() { } // Find compactions needed by seeks - FileMetaData* f = current_->file_to_compact_; + FileMetaData* f = version->file_to_compact_; if (c == nullptr && f != nullptr && !f->being_compacted) { - level = current_->file_to_compact_level_; + level = version->file_to_compact_level_; int parent_index = -1; // Only allow one level 0 compaction at a time. @@ -2689,11 +2760,16 @@ Compaction* VersionSet::PickCompaction() { if (level != 0 || compactions_in_progress_[0].empty()) { if(!ParentRangeInCompaction(&f->smallest, &f->largest, level, &parent_index)) { - c = new Compaction(level, level+1, MaxFileSizeForLevel(level+1), - MaxGrandParentOverlapBytes(level), NumberLevels(), true); + c = new Compaction(level, + level + 1, + MaxFileSizeForLevel(level + 1), + MaxGrandParentOverlapBytes(level), + NumberLevels(), + version, + true); c->inputs_[0].push_back(f); c->parent_index_ = parent_index; - current_->file_to_compact_ = nullptr; + version->file_to_compact_ = nullptr; ExpandWhileOverlapping(c); } } @@ -2703,9 +2779,6 @@ Compaction* VersionSet::PickCompaction() { return nullptr; } - c->input_version_ = current_; - c->input_version_->Ref(); - // Two level 0 compaction won't run at the same time, so don't need to worry // about files on level 0 being compacted. if (level == 0) { @@ -2716,7 +2789,7 @@ Compaction* VersionSet::PickCompaction() { // c->inputs_[0] earlier and replace it with an overlapping set // which will include the picked file. c->inputs_[0].clear(); - current_->GetOverlappingInputs(0, &smallest, &largest, &c->inputs_[0]); + version->GetOverlappingInputs(0, &smallest, &largest, &c->inputs_[0]); // If we include more L0 files in the same compaction run it can // cause the 'smallest' and 'largest' key to get extended to a @@ -2749,9 +2822,11 @@ Compaction* VersionSet::PickCompaction() { bool VersionSet::ParentRangeInCompaction(const InternalKey* smallest, const InternalKey* largest, int level, int* parent_index) { std::vector inputs; + // TODO this only works for default column family now + Version* version = column_family_data_.find(0)->second->current; - current_->GetOverlappingInputs(level+1, smallest, largest, - &inputs, *parent_index, parent_index); + version->GetOverlappingInputs( + level + 1, smallest, largest, &inputs, *parent_index, parent_index); return FilesInCompaction(inputs); } @@ -2799,8 +2874,8 @@ void VersionSet::ExpandWhileOverlapping(Compaction* c) { old_size = c->inputs_[0].size(); GetRange(c->inputs_[0], &smallest, &largest); c->inputs_[0].clear(); - current_->GetOverlappingInputs(level, &smallest, &largest, &c->inputs_[0], - hint_index, &hint_index); + c->input_version_->GetOverlappingInputs( + level, &smallest, &largest, &c->inputs_[0], hint_index, &hint_index); } while(c->inputs_[0].size() > old_size); // Get the new range @@ -2835,8 +2910,12 @@ void VersionSet::SetupOtherInputs(Compaction* c) { GetRange(c->inputs_[0], &smallest, &largest); // Populate the set of next-level files (inputs_[1]) to include in compaction - current_->GetOverlappingInputs(level+1, &smallest, &largest, &c->inputs_[1], - c->parent_index_, &c->parent_index_); + c->input_version_->GetOverlappingInputs(level + 1, + &smallest, + &largest, + &c->inputs_[1], + c->parent_index_, + &c->parent_index_); // Get entire range covered by compaction InternalKey all_start, all_limit; @@ -2849,8 +2928,8 @@ void VersionSet::SetupOtherInputs(Compaction* c) { // can happen when one user key spans multiple files. if (!c->inputs_[1].empty()) { std::vector expanded0; - current_->GetOverlappingInputs(level, &all_start, &all_limit, &expanded0, - c->base_index_, nullptr); + c->input_version_->GetOverlappingInputs( + level, &all_start, &all_limit, &expanded0, c->base_index_, nullptr); const uint64_t inputs0_size = TotalFileSize(c->inputs_[0]); const uint64_t inputs1_size = TotalFileSize(c->inputs_[1]); const uint64_t expanded0_size = TotalFileSize(expanded0); @@ -2858,13 +2937,16 @@ void VersionSet::SetupOtherInputs(Compaction* c) { if (expanded0.size() > c->inputs_[0].size() && inputs1_size + expanded0_size < limit && !FilesInCompaction(expanded0) && - !current_->HasOverlappingUserKey(&expanded0, level)) { + !c->input_version_->HasOverlappingUserKey(&expanded0, level)) { InternalKey new_start, new_limit; GetRange(expanded0, &new_start, &new_limit); std::vector expanded1; - current_->GetOverlappingInputs(level+1, &new_start, &new_limit, - &expanded1, c->parent_index_, - &c->parent_index_); + c->input_version_->GetOverlappingInputs(level + 1, + &new_start, + &new_limit, + &expanded1, + c->parent_index_, + &c->parent_index_); if (expanded1.size() == c->inputs_[1].size() && !FilesInCompaction(expanded1)) { Log(options_->info_log, @@ -2891,8 +2973,8 @@ void VersionSet::SetupOtherInputs(Compaction* c) { // Compute the set of grandparent files that overlap this compaction // (parent == level+1; grandparent == level+2) if (level + 2 < NumberLevels()) { - current_->GetOverlappingInputs(level + 2, &all_start, &all_limit, - &c->grandparents_); + c->input_version_->GetOverlappingInputs( + level + 2, &all_start, &all_limit, &c->grandparents_); } if (false) { @@ -2914,49 +2996,79 @@ Status VersionSet::GetMetadataForFile( uint64_t number, int *filelevel, FileMetaData *meta) { - for (int level = 0; level < NumberLevels(); level++) { - const std::vector& files = current_->files_[level]; - for (size_t i = 0; i < files.size(); i++) { - if (files[i]->number == number) { - *meta = *files[i]; - *filelevel = level; - return Status::OK(); + for (auto cfd : column_family_data_) { + for (int level = 0; level < NumberLevels(); level++) { + const std::vector& files = + cfd.second->current->files_[level]; + for (size_t i = 0; i < files.size(); i++) { + if (files[i]->number == number) { + *meta = *files[i]; + *filelevel = level; + return Status::OK(); + } } } } return Status::NotFound("File not present in any level"); } -void VersionSet::GetLiveFilesMetaData( - std::vector * metadata) { - for (int level = 0; level < NumberLevels(); level++) { - const std::vector& files = current_->files_[level]; - for (size_t i = 0; i < files.size(); i++) { - LiveFileMetaData filemetadata; - filemetadata.name = TableFileName("", files[i]->number); - filemetadata.level = level; - filemetadata.size = files[i]->file_size; - filemetadata.smallestkey = files[i]->smallest.user_key().ToString(); - filemetadata.largestkey = files[i]->largest.user_key().ToString(); - filemetadata.smallest_seqno = files[i]->smallest_seqno; - filemetadata.largest_seqno = files[i]->largest_seqno; - metadata->push_back(filemetadata); +void VersionSet::GetLiveFilesMetaData(std::vector* metadata) { + for (auto cfd : column_family_data_) { + for (int level = 0; level < NumberLevels(); level++) { + const std::vector& files = + cfd.second->current->files_[level]; + for (size_t i = 0; i < files.size(); i++) { + LiveFileMetaData filemetadata; + filemetadata.name = TableFileName("", files[i]->number); + filemetadata.level = level; + filemetadata.size = files[i]->file_size; + filemetadata.smallestkey = files[i]->smallest.user_key().ToString(); + filemetadata.largestkey = files[i]->largest.user_key().ToString(); + filemetadata.smallest_seqno = files[i]->smallest_seqno; + filemetadata.largest_seqno = files[i]->largest_seqno; + metadata->push_back(filemetadata); + } } } } void VersionSet::GetObsoleteFiles(std::vector* files) { - files->insert(files->end(), - obsolete_files_.begin(), - obsolete_files_.end()); + files->insert(files->end(), obsolete_files_.begin(), obsolete_files_.end()); obsolete_files_.clear(); } -Compaction* VersionSet::CompactRange( - int level, - const InternalKey* begin, - const InternalKey* end) { +ColumnFamilyData* VersionSet::CreateColumnFamily( + const ColumnFamilyOptions& options, VersionEdit* edit) { + assert(column_families_.find(edit->column_family_name_) == + column_families_.end()); + assert(edit->is_column_family_add_); + + column_families_.insert({edit->column_family_name_, edit->column_family_}); + ColumnFamilyData* new_cfd = + new ColumnFamilyData(edit->column_family_name_, this, options); + column_family_data_.insert({edit->column_family_, new_cfd}); + max_column_family_ = std::max(max_column_family_, edit->column_family_); + AppendVersion(new_cfd, new Version(this, current_version_number_++)); + return new_cfd; +} + +void VersionSet::DropColumnFamily(VersionEdit* edit) { + auto cfd = column_family_data_.find(edit->column_family_); + assert(cfd != column_family_data_.end()); + column_families_.erase(cfd->second->name); + cfd->second->current->Unref(); + // List must be empty + assert(cfd->second->dummy_versions.next_ == &cfd->second->dummy_versions); + delete cfd->second; + column_family_data_.erase(cfd); +} + +Compaction* VersionSet::CompactRange(int level, + const InternalKey* begin, + const InternalKey* end) { std::vector inputs; + // TODO this only works for default column family now + Version* version = column_family_data_.find(0)->second->current; // All files are 'overlapping' in universal style compaction. // We have to compact the entire range in one shot. @@ -2964,7 +3076,7 @@ Compaction* VersionSet::CompactRange( begin = nullptr; end = nullptr; } - current_->GetOverlappingInputs(level, begin, end, &inputs); + version->GetOverlappingInputs(level, begin, end, &inputs); if (inputs.empty()) { return nullptr; } @@ -2989,8 +3101,12 @@ Compaction* VersionSet::CompactRange( int out_level = (options_->compaction_style == kCompactionStyleUniversal) ? level : level+1; - Compaction* c = new Compaction(level, out_level, MaxFileSizeForLevel(out_level), - MaxGrandParentOverlapBytes(level), NumberLevels()); + Compaction* c = new Compaction(level, + out_level, + MaxFileSizeForLevel(out_level), + MaxGrandParentOverlapBytes(level), + NumberLevels(), + version); c->inputs_[0] = inputs; ExpandWhileOverlapping(c); @@ -2999,8 +3115,6 @@ Compaction* VersionSet::CompactRange( return nullptr; } - c->input_version_ = current_; - c->input_version_->Ref(); SetupOtherInputs(c); // These files that are to be manaully compacted do not trample @@ -3013,14 +3127,19 @@ Compaction* VersionSet::CompactRange( return c; } -Compaction::Compaction(int level, int out_level, uint64_t target_file_size, - uint64_t max_grandparent_overlap_bytes, int number_levels, - bool seek_compaction, bool enable_compression) +Compaction::Compaction(int level, + int out_level, + uint64_t target_file_size, + uint64_t max_grandparent_overlap_bytes, + int number_levels, + Version* input_version, + bool seek_compaction, + bool enable_compression) : level_(level), out_level_(out_level), max_output_file_size_(target_file_size), maxGrandParentOverlapBytes_(max_grandparent_overlap_bytes), - input_version_(nullptr), + input_version_(input_version), number_levels_(number_levels), seek_compaction_(seek_compaction), enable_compression_(enable_compression), @@ -3033,6 +3152,7 @@ Compaction::Compaction(int level, int out_level, uint64_t target_file_size, bottommost_level_(false), is_full_compaction_(false), level_ptrs_(std::vector(number_levels)) { + input_version_->Ref(); edit_ = new VersionEdit(number_levels_); for (int i = 0; i < number_levels_; i++) { level_ptrs_[i] = 0; diff --git a/db/version_set.h b/db/version_set.h index f47df9824..bd9b1095a 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -149,6 +149,7 @@ class Version { friend class Compaction; friend class VersionSet; friend class DBImpl; + friend struct ColumnFamilyData; class LevelFileNumIterator; Iterator* NewConcatenatingIterator(const ReadOptions&, @@ -197,9 +198,6 @@ class Version { double max_compaction_score_; // max score in l1 to ln-1 int max_compaction_score_level_; // level on which max score occurs - // The offset in the manifest file where this version is stored. - uint64_t offset_manifest_file_; - // A version number that uniquely represents this version. This is // used for debugging and logging purposes only. uint64_t version_number_; @@ -219,6 +217,20 @@ class Version { void operator=(const Version&); }; +// column family metadata +struct ColumnFamilyData { + std::string name; + Version dummy_versions; // Head of circular doubly-linked list of versions. + Version* current; // == dummy_versions.prev_ + ColumnFamilyOptions options; + + ColumnFamilyData(const std::string& name, + VersionSet* vset, + const ColumnFamilyOptions& options) + : name(name), dummy_versions(vset), current(nullptr), options(options) {} + ~ColumnFamilyData() {} +}; + class VersionSet { public: VersionSet(const std::string& dbname, @@ -233,8 +245,17 @@ class VersionSet { // current version. Will release *mu while actually writing to the file. // REQUIRES: *mu is held on entry. // REQUIRES: no other thread concurrently calls LogAndApply() - Status LogAndApply(VersionEdit* edit, port::Mutex* mu, - bool new_descriptor_log = false); + Status LogAndApply(ColumnFamilyData* column_family_data, + VersionEdit* edit, + port::Mutex* mu, + bool new_descriptor_log = false); + + Status LogAndApply(VersionEdit* edit, + port::Mutex* mu, + bool new_descriptor_log = false) { + return LogAndApply( + column_family_data_.find(0)->second, edit, mu, new_descriptor_log); + } // Recover the last saved descriptor from persistent storage. Status Recover(); @@ -248,7 +269,10 @@ class VersionSet { Status ReduceNumberOfLevels(int new_levels, port::Mutex* mu); // Return the current version. - Version* current() const { return current_; } + Version* current() const { + // TODO this only works for default column family now + return column_family_data_.find(0)->second->current; + } // Return the current manifest file number uint64_t ManifestFileNumber() const { return manifest_file_number_; } @@ -327,11 +351,13 @@ class VersionSet { // ending up with nothing to do. We can improve it later. // TODO: improve this function to be accurate for universal // compactions. + // TODO this only works for default column family now + Version* version = column_family_data_.find(0)->second->current; int num_levels_to_check = (options_->compaction_style != kCompactionStyleUniversal) ? NumberLevels() - 1 : 1; for (int i = 0; i < num_levels_to_check; i++) { - if (current_->compaction_score_[i] >= 1) { + if (version->compaction_score_[i] >= 1) { return true; } } @@ -339,18 +365,23 @@ class VersionSet { } // Returns true iff some level needs a compaction. bool NeedsCompaction() const { - return ((current_->file_to_compact_ != nullptr) || - NeedsSizeCompaction()); + // TODO this only works for default column family now + Version* version = column_family_data_.find(0)->second->current; + return ((version->file_to_compact_ != nullptr) || NeedsSizeCompaction()); } // Returns the maxmimum compaction score for levels 1 to max double MaxCompactionScore() const { - return current_->max_compaction_score_; + // TODO this only works for default column family now + Version* version = column_family_data_.find(0)->second->current; + return version->max_compaction_score_; } // See field declaration int MaxCompactionScoreLevel() const { - return current_->max_compaction_score_level_; + // TODO this only works for default column family now + Version* version = column_family_data_.find(0)->second->current; + return version->max_compaction_score_level_; } // Add all files listed in any live version to *live. @@ -383,10 +414,12 @@ class VersionSet { // Return a human-readable short (single-line) summary of files // in a specified level. Uses *scratch as backing store. - const char* LevelFileSummary(FileSummaryStorage* scratch, int level) const; + const char* LevelFileSummary(Version* version, + FileSummaryStorage* scratch, + int level) const; // Return the size of the current manifest file - const uint64_t ManifestFileSize() { return current_->offset_manifest_file_; } + const uint64_t ManifestFileSize() { return manifest_file_size_; } // For the specfied level, pick a compaction. // Returns nullptr if there is no compaction to be done. @@ -436,17 +469,13 @@ class VersionSet { void GetObsoleteFiles(std::vector* files); - // column family metadata - struct ColumnFamilyData { - std::string name; - ColumnFamilyOptions options; - explicit ColumnFamilyData(const std::string& name) : name(name) {} - ColumnFamilyData(const std::string& name, - const ColumnFamilyOptions& options) - : name(name), options(options) {} - }; + ColumnFamilyData* CreateColumnFamily(const ColumnFamilyOptions& options, + VersionEdit* edit); + + void DropColumnFamily(VersionEdit* edit); + std::unordered_map column_families_; - std::unordered_map column_family_data_; + std::unordered_map column_family_data_; uint32_t max_column_family_; private: @@ -476,7 +505,7 @@ class VersionSet { // Save current contents to *log Status WriteSnapshot(log::Writer* log); - void AppendVersion(Version* v); + void AppendVersion(ColumnFamilyData* column_family_data, Version* v); bool ManifestContains(const std::string& record) const; @@ -499,8 +528,6 @@ class VersionSet { // Opened lazily unique_ptr descriptor_log_; - Version dummy_versions_; // Head of circular doubly-linked list of versions. - Version* current_; // == dummy_versions_.prev_ // Per-level key at which the next compaction at that level should start. // Either an empty string, or a valid InternalKey. @@ -521,9 +548,8 @@ class VersionSet { // Queue of writers to the manifest file std::deque manifest_writers_; - // Store the manifest file size when it is checked. - // Save us the cost of checking file size twice in LogAndApply - uint64_t last_observed_manifest_size_; + // size of manifest file + uint64_t manifest_file_size_; std::vector obsolete_files_; @@ -616,9 +642,14 @@ class Compaction { friend class Version; friend class VersionSet; - explicit Compaction(int level, int out_level, uint64_t target_file_size, - uint64_t max_grandparent_overlap_bytes, int number_levels, - bool seek_compaction = false, bool enable_compression = true); + Compaction(int level, + int out_level, + uint64_t target_file_size, + uint64_t max_grandparent_overlap_bytes, + int number_levels, + Version* input_version, + bool seek_compaction = false, + bool enable_compression = true); int level_; int out_level_; // levels to which output files are stored diff --git a/db/version_set_reduce_num_levels.cc b/db/version_set_reduce_num_levels.cc index d13a4aed9..89653ea7c 100644 --- a/db/version_set_reduce_num_levels.cc +++ b/db/version_set_reduce_num_levels.cc @@ -24,7 +24,8 @@ Status VersionSet::ReduceNumberOfLevels(int new_levels, port::Mutex* mu) { "Number of levels needs to be bigger than 1"); } - Version* current_version = current_; + // TODO this only works for default column family now + Version* current_version = column_family_data_.find(0)->second->current; int current_levels = NumberLevels(); if (current_levels <= new_levels) {