From f7489123e292514b24af852ebb453b1f56e38490 Mon Sep 17 00:00:00 2001 From: Igor Canadi Date: Fri, 31 Jan 2014 15:30:27 -0800 Subject: [PATCH] Move compaction picker and internal key comparator to ColumnFamilyData Summary: Compaction picker and internal key comparator are different for each column family (not global), so they should live in ColumnFamilyData Test Plan: make check Reviewers: dhruba, haobo CC: leveldb Differential Revision: https://reviews.facebook.net/D15801 --- db/column_family.cc | 22 ++++++- db/column_family.h | 22 +++++++ db/compaction_picker.cc | 2 +- db/compaction_picker.h | 9 +-- db/db_impl.cc | 22 ++++--- db/internal_stats.cc | 22 +++---- db/internal_stats.h | 4 +- db/version_set.cc | 132 ++++++++++++++++---------------------- db/version_set.h | 37 +---------- include/rocksdb/options.h | 22 +++---- util/options.cc | 7 +- 11 files changed, 146 insertions(+), 155 deletions(-) diff --git a/db/column_family.cc b/db/column_family.cc index 47c660944..9b7a5284d 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -14,6 +14,7 @@ #include #include "db/version_set.h" +#include "db/compaction_picker.h" namespace rocksdb { @@ -65,6 +66,7 @@ ColumnFamilyData::ColumnFamilyData(uint32_t id, const std::string& name, dummy_versions_(dummy_versions), current_(nullptr), options_(options), + icmp_(options_.comparator), mem_(nullptr), imm_(options.min_write_buffer_number_to_merge), super_version_(nullptr), @@ -72,7 +74,13 @@ ColumnFamilyData::ColumnFamilyData(uint32_t id, const std::string& name, next_(nullptr), prev_(nullptr), log_number_(0), - need_slowdown_for_num_level0_files_(false) {} + need_slowdown_for_num_level0_files_(false) { + if (options_.compaction_style == kCompactionStyleUniversal) { + compaction_picker_.reset(new UniversalCompactionPicker(&options_, &icmp_)); + } else { + compaction_picker_.reset(new LevelCompactionPicker(&options_, &icmp_)); + } +} ColumnFamilyData::~ColumnFamilyData() { if (super_version_ != nullptr) { @@ -114,6 +122,18 @@ void ColumnFamilyData::CreateNewMemtable() { mem_->Ref(); } +Compaction* ColumnFamilyData::PickCompaction() { + return compaction_picker_->PickCompaction(current_); +} + +Compaction* ColumnFamilyData::CompactRange(int input_level, int output_level, + const InternalKey* begin, + const InternalKey* end, + InternalKey** compaction_end) { + return compaction_picker_->CompactRange(current_, input_level, output_level, + begin, end, compaction_end); +} + SuperVersion* ColumnFamilyData::InstallSuperVersion( SuperVersion* new_superversion) { new_superversion->Init(mem_, imm_.current(), current_); diff --git a/db/column_family.h b/db/column_family.h index 286a9ff1a..513eadd3e 100644 --- a/db/column_family.h +++ b/db/column_family.h @@ -23,6 +23,9 @@ class Version; class VersionSet; class MemTable; class MemTableListVersion; +class CompactionPicker; +class Compaction; +class InternalKey; // holds references to memtable, all immutable memtables and version struct SuperVersion { @@ -62,6 +65,8 @@ class ColumnFamilyData { uint32_t GetID() const { return id_; } const std::string& GetName() { return name_; } + int NumberLevels() const { return options_.num_levels; } + void SetLogNumber(uint64_t log_number) { log_number_ = log_number; } uint64_t GetLogNumber() const { return log_number_; } @@ -75,6 +80,17 @@ class ColumnFamilyData { void SetCurrent(Version* current); void CreateNewMemtable(); + // See documentation in compaction_picker.h + Compaction* PickCompaction(); + Compaction* CompactRange(int input_level, int output_level, + const InternalKey* begin, const InternalKey* end, + InternalKey** compaction_end); + + CompactionPicker* compaction_picker() const { + return compaction_picker_.get(); + } + const InternalKeyComparator& internal_comparator() const { return icmp_; } + SuperVersion* GetSuperVersion() const { return super_version_; } uint64_t GetSuperVersionNumber() const { return super_version_number_.load(); @@ -102,6 +118,8 @@ class ColumnFamilyData { Version* current_; // == dummy_versions->prev_ ColumnFamilyOptions options_; + const InternalKeyComparator icmp_; + MemTable* mem_; MemTableList imm_; SuperVersion* super_version_; @@ -124,6 +142,10 @@ class ColumnFamilyData { // A flag indicating whether we should delay writes because // we have too many level 0 files bool need_slowdown_for_num_level0_files_; + + // An object that keeps all the compaction stats + // and picks the next compaction + std::unique_ptr compaction_picker_; }; // Thread safe only for reading without a writer. All access should be diff --git a/db/compaction_picker.cc b/db/compaction_picker.cc index 9582b6a29..c4e5719c3 100644 --- a/db/compaction_picker.cc +++ b/db/compaction_picker.cc @@ -41,7 +41,7 @@ uint64_t MultiplyCheckOverflow(uint64_t op1, int op2) { } // anonymous namespace -CompactionPicker::CompactionPicker(const Options* options, +CompactionPicker::CompactionPicker(const ColumnFamilyOptions* options, const InternalKeyComparator* icmp) : compactions_in_progress_(options->num_levels), options_(options), diff --git a/db/compaction_picker.h b/db/compaction_picker.h index ee77cc4c7..1b6897546 100644 --- a/db/compaction_picker.h +++ b/db/compaction_picker.h @@ -24,7 +24,8 @@ class Version; class CompactionPicker { public: - CompactionPicker(const Options* options, const InternalKeyComparator* icmp); + CompactionPicker(const ColumnFamilyOptions* options, + const InternalKeyComparator* icmp); virtual ~CompactionPicker(); // Pick level and inputs for a new compaction. @@ -115,7 +116,7 @@ class CompactionPicker { // Per-level max bytes std::unique_ptr level_max_bytes_; - const Options* const options_; + const ColumnFamilyOptions* const options_; private: int num_levels_; @@ -124,7 +125,7 @@ class CompactionPicker { class UniversalCompactionPicker : public CompactionPicker { public: - UniversalCompactionPicker(const Options* options, + UniversalCompactionPicker(const ColumnFamilyOptions* options, const InternalKeyComparator* icmp) : CompactionPicker(options, icmp) {} virtual Compaction* PickCompaction(Version* version) override; @@ -141,7 +142,7 @@ class UniversalCompactionPicker : public CompactionPicker { class LevelCompactionPicker : public CompactionPicker { public: - LevelCompactionPicker(const Options* options, + LevelCompactionPicker(const ColumnFamilyOptions* options, const InternalKeyComparator* icmp) : CompactionPicker(options, icmp) {} virtual Compaction* PickCompaction(Version* version) override; diff --git a/db/db_impl.cc b/db/db_impl.cc index a32448f92..d4148d34e 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1312,7 +1312,10 @@ int DBImpl::FindMinimumEmptyLevelFitting(int level) { // stop if level i is not empty if (current->NumLevelFiles(i) > 0) break; // stop if level i is too small (cannot fit the level files) - if (versions_->MaxBytesForLevel(i) < current->NumLevelBytes(level)) break; + if (default_cfd_->compaction_picker()->MaxBytesForLevel(i) < + current->NumLevelBytes(level)) { + break; + } minimum_level = i; } @@ -1943,8 +1946,8 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, if (is_manual) { ManualCompaction* m = manual_compaction_; assert(m->in_progress); - c.reset(versions_->CompactRange( - m->input_level, m->output_level, m->begin, m->end, &manual_end)); + c.reset(default_cfd_->CompactRange(m->input_level, m->output_level, + m->begin, m->end, &manual_end)); if (!c) { m->done = true; } @@ -1959,7 +1962,7 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, ? "(end)" : manual_end->DebugString().c_str())); } else if (!options_.disable_auto_compactions) { - c.reset(versions_->PickCompaction()); + c.reset(default_cfd_->PickCompaction()); } Status status; @@ -1983,14 +1986,14 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, static_cast(f->number), c->level() + 1, static_cast(f->file_size), status.ToString().c_str(), default_cfd_->current()->LevelSummary(&tmp)); - versions_->ReleaseCompactionFiles(c.get(), status); + default_cfd_->compaction_picker()->ReleaseCompactionFiles(c.get(), status); *madeProgress = true; } else { MaybeScheduleFlushOrCompaction(); // do more compaction work in parallel. CompactionState* compact = new CompactionState(c.get()); status = DoCompactionWork(compact, deletion_state); CleanupCompaction(compact, status); - versions_->ReleaseCompactionFiles(c.get(), status); + default_cfd_->compaction_picker()->ReleaseCompactionFiles(c.get(), status); c->ReleaseInputs(); *madeProgress = true; } @@ -2121,7 +2124,8 @@ Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) { // Over-estimate slightly so we don't end up just barely crossing // the threshold. compact->outfile->SetPreallocationBlockSize( - 1.1 * versions_->MaxFileSizeForLevel(compact->compaction->output_level())); + 1.1 * default_cfd_->compaction_picker()->MaxFileSizeForLevel( + compact->compaction->output_level())); CompressionType compression_type = GetCompressionType( options_, compact->compaction->output_level(), @@ -3534,9 +3538,7 @@ bool DBImpl::GetProperty(const ColumnFamilyHandle& column_family, const Slice& property, std::string* value) { value->clear(); MutexLock l(&mutex_); - return internal_stats_.GetProperty(property, value, versions_.get(), - default_cfd_->current(), - default_cfd_->imm()->size()); + return internal_stats_.GetProperty(property, value, default_cfd_); } void DBImpl::GetApproximateSizes(const ColumnFamilyHandle& column_family, diff --git a/db/internal_stats.cc b/db/internal_stats.cc index d946e6f27..29842eff3 100644 --- a/db/internal_stats.cc +++ b/db/internal_stats.cc @@ -8,14 +8,14 @@ // found in the LICENSE file. See the AUTHORS file for names of contributors. #include "db/internal_stats.h" +#include "db/column_family.h" #include namespace rocksdb { bool InternalStats::GetProperty(const Slice& property, std::string* value, - VersionSet* version_set, Version* current, - int immsize) { + ColumnFamilyData* cfd) { Slice in = property; Slice prefix("rocksdb."); if (!in.starts_with(prefix)) return false; @@ -30,7 +30,7 @@ bool InternalStats::GetProperty(const Slice& property, std::string* value, } else { char buf[100]; snprintf(buf, sizeof(buf), "%d", - current->NumLevelFiles(static_cast(level))); + cfd->current()->NumLevelFiles(static_cast(level))); *value = buf; return true; } @@ -43,8 +43,8 @@ bool InternalStats::GetProperty(const Slice& property, std::string* value, for (int level = 0; level < number_levels_; level++) { snprintf(buf, sizeof(buf), "%3d %8d %8.0f\n", level, - current->NumLevelFiles(level), - current->NumLevelBytes(level) / 1048576.0); + cfd->current()->NumLevelFiles(level), + cfd->current()->NumLevelBytes(level) / 1048576.0); value->append(buf); } return true; @@ -87,7 +87,7 @@ bool InternalStats::GetProperty(const Slice& property, std::string* value, ); value->append(buf); for (int level = 0; level < number_levels_; level++) { - int files = current->NumLevelFiles(level); + int files = cfd->current()->NumLevelFiles(level); if (compaction_stats_[level].micros > 0 || files > 0) { int64_t bytes_read = compaction_stats_[level].bytes_readn + compaction_stats_[level].bytes_readnp1; @@ -117,9 +117,9 @@ bool InternalStats::GetProperty(const Slice& property, std::string* value, "%3d %8d %8.0f %5.1f %9.0f %9.0f %9.0f %9.0f %9.0f %9.0f " "%10.1f %9.1f %11.1f %8d %8d %8d %8d %8d %8d %9.1f %9.1f " "%9lu\n", - level, files, current->NumLevelBytes(level) / 1048576.0, - current->NumLevelBytes(level) / - version_set->MaxBytesForLevel(level), + level, files, cfd->current()->NumLevelBytes(level) / 1048576.0, + cfd->current()->NumLevelBytes(level) / + cfd->compaction_picker()->MaxBytesForLevel(level), compaction_stats_[level].micros / 1e6, bytes_read / 1048576.0, compaction_stats_[level].bytes_written / 1048576.0, compaction_stats_[level].bytes_readn / 1048576.0, @@ -285,10 +285,10 @@ bool InternalStats::GetProperty(const Slice& property, std::string* value, return true; } else if (in == "sstables") { - *value = current->DebugString(); + *value = cfd->current()->DebugString(); return true; } else if (in == "num-immutable-mem-table") { - *value = std::to_string(immsize); + *value = std::to_string(cfd->imm()->size()); return true; } diff --git a/db/internal_stats.h b/db/internal_stats.h index ca49294e4..bc590f992 100644 --- a/db/internal_stats.h +++ b/db/internal_stats.h @@ -16,6 +16,8 @@ #include #include +class ColumnFamilyData; + namespace rocksdb { class InternalStats { public: @@ -100,7 +102,7 @@ class InternalStats { } bool GetProperty(const Slice& property, std::string* value, - VersionSet* version_set, Version* current, int immsize); + ColumnFamilyData* cfd); private: std::vector compaction_stats_; diff --git a/db/version_set.cc b/db/version_set.cc index 9ea9a84ca..f78c1deb5 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -409,8 +409,10 @@ static bool NewestFirstBySeqNo(FileMetaData* a, FileMetaData* b) { return false; } -Version::Version(VersionSet* vset, uint64_t version_number) - : vset_(vset), +Version::Version(ColumnFamilyData* cfd, VersionSet* vset, + uint64_t version_number) + : cfd_(cfd), + vset_(vset), next_(this), prev_(this), refs_(0), @@ -434,7 +436,7 @@ void Version::Get(const ReadOptions& options, bool* value_found) { Slice ikey = k.internal_key(); Slice user_key = k.user_key(); - const Comparator* ucmp = vset_->icmp_.user_comparator(); + const Comparator* ucmp = cfd_->internal_comparator().user_comparator(); auto merge_operator = db_options.merge_operator.get(); auto logger = db_options.info_log; @@ -481,7 +483,7 @@ void Version::Get(const ReadOptions& options, // On Level-n (n>=1), files are sorted. // Binary search to find earliest index whose largest key >= ikey. // We will also stop when the file no longer overlaps ikey - start_index = FindFile(vset_->icmp_, files_[level], ikey); + start_index = FindFile(cfd_->internal_comparator(), files_[level], ikey); } // Traverse each relevant file to find the desired key @@ -507,11 +509,12 @@ void Version::Get(const ReadOptions& options, // Sanity check to make sure that the files are correctly sorted if (prev_file) { if (level != 0) { - int comp_sign = vset_->icmp_.Compare(prev_file->largest, f->smallest); + int comp_sign = cfd_->internal_comparator().Compare( + prev_file->largest, f->smallest); assert(comp_sign < 0); } else { // level == 0, the current file cannot be newer than the previous one. - if (vset_->options_->compaction_style == kCompactionStyleUniversal) { + if (cfd_->options()->compaction_style == kCompactionStyleUniversal) { assert(!NewestFirstBySeqNo(f, prev_file)); } else { assert(!NewestFirst(f, prev_file)); @@ -597,7 +600,7 @@ bool Version::UpdateStats(const GetStats& stats) { void Version::Finalize(std::vector& size_being_compacted) { // Pre-sort level0 for Get() - if (vset_->options_->compaction_style == kCompactionStyleUniversal) { + if (cfd_->options()->compaction_style == kCompactionStyleUniversal) { std::sort(files_[0].begin(), files_[0].end(), NewestFirstBySeqNo); } else { std::sort(files_[0].begin(), files_[0].end(), NewestFirst); @@ -607,7 +610,7 @@ void Version::Finalize(std::vector& size_being_compacted) { int max_score_level = 0; int num_levels_to_check = - (vset_->options_->compaction_style != kCompactionStyleUniversal) + (cfd_->options()->compaction_style != kCompactionStyleUniversal) ? NumberLevels() - 1 : 1; @@ -633,15 +636,15 @@ void Version::Finalize(std::vector& size_being_compacted) { } // If we are slowing down writes, then we better compact that first - if (numfiles >= vset_->options_->level0_stop_writes_trigger) { + if (numfiles >= cfd_->options()->level0_stop_writes_trigger) { score = 1000000; // Log(options_->info_log, "XXX score l0 = 1000000000 max"); - } else if (numfiles >= vset_->options_->level0_slowdown_writes_trigger) { + } else if (numfiles >= cfd_->options()->level0_slowdown_writes_trigger) { score = 10000; // Log(options_->info_log, "XXX score l0 = 1000000 medium"); } else { score = static_cast(numfiles) / - vset_->options_->level0_file_num_compaction_trigger; + cfd_->options()->level0_file_num_compaction_trigger; if (score >= 1) { // Log(options_->info_log, "XXX score l0 = %d least", (int)score); } @@ -650,7 +653,8 @@ void Version::Finalize(std::vector& size_being_compacted) { // Compute the ratio of current size to size limit. const uint64_t level_bytes = TotalFileSize(files_[level]) - size_being_compacted[level]; - score = static_cast(level_bytes) / vset_->MaxBytesForLevel(level); + score = static_cast(level_bytes) / + cfd_->compaction_picker()->MaxBytesForLevel(level); if (score > 1) { // Log(options_->info_log, "XXX score l%d = %d ", level, (int)score); } @@ -708,7 +712,7 @@ bool CompareSeqnoDescending(const Version::Fsize& first, void Version::UpdateFilesBySize() { // No need to sort the highest level because it is never compacted. int max_level = - (vset_->options_->compaction_style == kCompactionStyleUniversal) + (cfd_->options()->compaction_style == kCompactionStyleUniversal) ? NumberLevels() : NumberLevels() - 1; @@ -725,7 +729,7 @@ void Version::UpdateFilesBySize() { } // sort the top number_of_files_to_sort_ based on file size - if (vset_->options_->compaction_style == kCompactionStyleUniversal) { + if (cfd_->options()->compaction_style == kCompactionStyleUniversal) { int num = temp.size(); std::partial_sort(temp.begin(), temp.begin() + num, temp.end(), CompareSeqnoDescending); @@ -772,8 +776,9 @@ bool Version::NeedsCompaction() const { // TODO(sdong): improve this function to be accurate for universal // compactions. int num_levels_to_check = - (vset_->options_->compaction_style != kCompactionStyleUniversal) ? - NumberLevels() - 1 : 1; + (cfd_->options()->compaction_style != kCompactionStyleUniversal) + ? NumberLevels() - 1 + : 1; for (int i = 0; i < num_levels_to_check; i++) { if (compaction_score_[i] >= 1) { return true; @@ -785,8 +790,9 @@ bool Version::NeedsCompaction() const { bool Version::OverlapInLevel(int level, const Slice* smallest_user_key, const Slice* largest_user_key) { - return SomeFileOverlapsRange(vset_->icmp_, (level > 0), files_[level], - smallest_user_key, largest_user_key); + return SomeFileOverlapsRange(cfd_->internal_comparator(), (level > 0), + files_[level], smallest_user_key, + largest_user_key); } int Version::PickLevelForMemTableOutput( @@ -799,7 +805,7 @@ int Version::PickLevelForMemTableOutput( InternalKey start(smallest_user_key, kMaxSequenceNumber, kValueTypeForSeek); InternalKey limit(largest_user_key, 0, static_cast(0)); std::vector overlaps; - int max_mem_compact_level = vset_->options_->max_mem_compaction_level; + int max_mem_compact_level = cfd_->options()->max_mem_compaction_level; while (max_mem_compact_level > 0 && level < max_mem_compact_level) { if (OverlapInLevel(level + 1, &smallest_user_key, &largest_user_key)) { break; @@ -810,7 +816,7 @@ int Version::PickLevelForMemTableOutput( } GetOverlappingInputs(level + 2, &start, &limit, &overlaps); const uint64_t sum = TotalFileSize(overlaps); - if (sum > vset_->compaction_picker_->MaxGrandParentOverlapBytes(level)) { + if (sum > cfd_->compaction_picker()->MaxGrandParentOverlapBytes(level)) { break; } level++; @@ -841,7 +847,7 @@ void Version::GetOverlappingInputs(int level, if (file_index) { *file_index = -1; } - const Comparator* user_cmp = vset_->icmp_.user_comparator(); + const Comparator* user_cmp = cfd_->internal_comparator().user_comparator(); if (begin != nullptr && end != nullptr && level > 0) { GetOverlappingInputsBinarySearch(level, user_begin, user_end, inputs, hint_index, file_index); @@ -893,7 +899,7 @@ void Version::GetOverlappingInputsBinarySearch( int mid = 0; int max = files_[level].size() -1; bool foundOverlap = false; - const Comparator* user_cmp = vset_->icmp_.user_comparator(); + const Comparator* user_cmp = cfd_->internal_comparator().user_comparator(); // if the caller already knows the index of a file that has overlap, // then we can skip the binary search. @@ -939,7 +945,7 @@ void Version::ExtendOverlappingInputs( std::vector* inputs, unsigned int midIndex) { - const Comparator* user_cmp = vset_->icmp_.user_comparator(); + const Comparator* user_cmp = cfd_->internal_comparator().user_comparator(); #ifndef NDEBUG { // assert that the file at midIndex overlaps with the range @@ -1003,12 +1009,12 @@ bool Version::HasOverlappingUserKey( return false; } - const Comparator* user_cmp = vset_->icmp_.user_comparator(); + const Comparator* user_cmp = cfd_->internal_comparator().user_comparator(); const std::vector& files = files_[level]; const size_t kNumFiles = files.size(); // Check the last file in inputs against the file after it - size_t last_file = FindFile(vset_->icmp_, files, + size_t last_file = FindFile(cfd_->internal_comparator(), files, inputs->back()->largest.Encode()); assert(0 <= last_file && last_file < kNumFiles); // File should exist! if (last_file < kNumFiles-1) { // If not the last file @@ -1021,7 +1027,7 @@ bool Version::HasOverlappingUserKey( } // Check the first file in inputs against the file just before it - size_t first_file = FindFile(vset_->icmp_, files, + size_t first_file = FindFile(cfd_->internal_comparator(), files, inputs->front()->smallest.Encode()); assert(0 <= first_file && first_file <= last_file); // File should exist! if (first_file > 0) { // If not first file @@ -1164,17 +1170,17 @@ class VersionSet::Builder { FileSet* added_files; }; - VersionSet* vset_; + ColumnFamilyData* cfd_; Version* base_; LevelState* levels_; public: // Initialize a builder with the files from *base and other info from *vset - Builder(VersionSet* vset, Version* base) : vset_(vset), base_(base) { + Builder(ColumnFamilyData* cfd, Version* base) : cfd_(cfd), base_(base) { base_->Ref(); levels_ = new LevelState[base->NumberLevels()]; BySmallestKey cmp; - cmp.internal_comparator = &vset_->icmp_; + cmp.internal_comparator = &cfd_->internal_comparator(); for (int level = 0; level < base->NumberLevels(); level++) { levels_[level].added_files = new FileSet(cmp); } @@ -1210,7 +1216,7 @@ class VersionSet::Builder { for (uint32_t i = 1; i < v->files_[level].size(); i++) { const InternalKey& prev_end = v->files_[level][i-1]->largest; const InternalKey& this_begin = v->files_[level][i]->smallest; - if (vset_->icmp_.Compare(prev_end, this_begin) >= 0) { + if (cfd_->internal_comparator().Compare(prev_end, this_begin) >= 0) { fprintf(stderr, "overlapping ranges in same level %s vs. %s\n", prev_end.DebugString().c_str(), this_begin.DebugString().c_str()); @@ -1315,7 +1321,7 @@ class VersionSet::Builder { CheckConsistency(base_); CheckConsistency(v); BySmallestKey cmp; - cmp.internal_comparator = &vset_->icmp_; + cmp.internal_comparator = &cfd_->internal_comparator(); for (int level = 0; level < base_->NumberLevels(); level++) { // Merge the set of added files with the set of pre-existing files. // Drop any deleted files. Store the result in *v. @@ -1354,8 +1360,8 @@ class VersionSet::Builder { std::vector* files = &v->files_[level]; if (level > 0 && !files->empty()) { // Must not overlap - assert(vset_->icmp_.Compare((*files)[files->size()-1]->largest, - f->smallest) < 0); + assert(cfd_->internal_comparator().Compare( + (*files)[files->size() - 1]->largest, f->smallest) < 0); } f->refs++; files->push_back(f); @@ -1382,11 +1388,6 @@ VersionSet::VersionSet(const std::string& dbname, const Options* options, manifest_file_size_(0), storage_options_(storage_options), storage_options_compactions_(storage_options_) { - if (options_->compaction_style == kCompactionStyleUniversal) { - compaction_picker_.reset(new UniversalCompactionPicker(options_, &icmp_)); - } else { - compaction_picker_.reset(new LevelCompactionPicker(options_, &icmp_)); - } } VersionSet::~VersionSet() { @@ -1439,8 +1440,8 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data, } std::vector batch_edits; - Version* v = new Version(this, current_version_number_++); - Builder builder(this, column_family_data->current()); + Version* v = new Version(column_family_data, this, current_version_number_++); + Builder builder(column_family_data, column_family_data->current()); // process all requests in the queue ManifestWriter* last_writer = &w; @@ -1483,7 +1484,8 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data, { // calculate the amount of data being compacted at every level std::vector size_being_compacted(v->NumberLevels() - 1); - compaction_picker_->SizeBeingCompacted(size_being_compacted); + column_family_data->compaction_picker()->SizeBeingCompacted( + size_being_compacted); mu->Unlock(); @@ -1679,7 +1681,7 @@ Status VersionSet::Recover( } else { ColumnFamilyData* default_cfd = CreateColumnFamily(default_cf_iter->second, &default_cf_edit); - builders.insert({0, new Builder(this, default_cfd->current())}); + builders.insert({0, new Builder(default_cfd, default_cfd->current())}); } { @@ -1725,7 +1727,7 @@ Status VersionSet::Recover( ColumnFamilyData* new_cfd = CreateColumnFamily(cf_options->second, &edit); builders.insert( - {edit.column_family_, new Builder(this, new_cfd->current())}); + {edit.column_family_, new Builder(new_cfd, new_cfd->current())}); } } else if (edit.is_column_family_drop_) { if (cf_in_builders) { @@ -1815,12 +1817,12 @@ Status VersionSet::Recover( if (s.ok()) { for (auto cfd : *column_family_set_) { - Version* v = new Version(this, current_version_number_++); + Version* v = new Version(cfd, this, current_version_number_++); builders[cfd->GetID()]->SaveTo(v); // Install recovered version std::vector size_being_compacted(v->NumberLevels() - 1); - compaction_picker_->SizeBeingCompacted(size_being_compacted); + cfd->compaction_picker()->SizeBeingCompacted(size_being_compacted); v->Finalize(size_being_compacted); AppendVersion(cfd, v); } @@ -2007,7 +2009,7 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname, uint64_t prev_log_number = 0; int count = 0; // TODO works only for default column family currently - VersionSet::Builder builder(this, + VersionSet::Builder builder(column_family_set_->GetDefault(), column_family_set_->GetDefault()->current()); { @@ -2084,7 +2086,7 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname, } if (s.ok()) { - Version* v = new Version(this, 0); + Version* v = new Version(column_family_set_->GetDefault(), this, 0); builder.SaveTo(v); manifest_file_number_ = next_file; @@ -2258,22 +2260,6 @@ void VersionSet::AddLiveFiles(std::vector* live_list) { } } -Compaction* VersionSet::PickCompaction() { - // TODO this only works for default column family now - Version* version = column_family_set_->GetDefault()->current(); - return compaction_picker_->PickCompaction(version); -} - -Compaction* VersionSet::CompactRange(int input_level, int output_level, - const InternalKey* begin, - const InternalKey* end, - InternalKey** compaction_end) { - // TODO this only works for default column family now - Version* version = column_family_set_->GetDefault()->current(); - return compaction_picker_->CompactRange(version, input_level, output_level, - begin, end, compaction_end); -} - Iterator* VersionSet::MakeInputIterator(Compaction* c) { ReadOptions options; options.fill_cache = false; @@ -2295,7 +2281,9 @@ Iterator* VersionSet::MakeInputIterator(Compaction* c) { } else { // Create concatenating iterator for the files from this level list[num++] = NewTwoLevelIterator( - new Version::LevelFileNumIterator(icmp_, c->inputs(which)), + new Version::LevelFileNumIterator( + c->input_version()->cfd_->internal_comparator(), + c->inputs(which)), &GetFileIterator, table_cache_, options, storage_options_, true /* for compaction */); } @@ -2307,14 +2295,6 @@ Iterator* VersionSet::MakeInputIterator(Compaction* c) { return result; } -double VersionSet::MaxBytesForLevel(int level) { - return compaction_picker_->MaxBytesForLevel(level); -} - -uint64_t VersionSet::MaxFileSizeForLevel(int level) { - return compaction_picker_->MaxFileSizeForLevel(level); -} - // verify that the files listed in this compaction are present // in the current version bool VersionSet::VerifyCompactionFileConsistency(Compaction* c) { @@ -2365,10 +2345,6 @@ bool VersionSet::VerifyCompactionFileConsistency(Compaction* c) { return true; // everything good } -void VersionSet::ReleaseCompactionFiles(Compaction* c, Status status) { - compaction_picker_->ReleaseCompactionFiles(c, status); -} - Status VersionSet::GetMetadataForFile(uint64_t number, int* filelevel, FileMetaData* meta, ColumnFamilyData** cfd) { @@ -2415,11 +2391,11 @@ ColumnFamilyData* VersionSet::CreateColumnFamily( const ColumnFamilyOptions& options, VersionEdit* edit) { assert(edit->is_column_family_add_); - Version* dummy_versions = new Version(this); + Version* dummy_versions = new Version(nullptr, this); auto new_cfd = column_family_set_->CreateColumnFamily( edit->column_family_name_, edit->column_family_, dummy_versions, options); - AppendVersion(new_cfd, new Version(this, current_version_number_++)); + AppendVersion(new_cfd, new Version(new_cfd, this, current_version_number_++)); new_cfd->CreateNewMemtable(); return new_cfd; } diff --git a/db/version_set.h b/db/version_set.h index 3376668cb..178787dcb 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -217,6 +217,7 @@ class Version { // record results in files_by_size_. The largest files are listed first. void UpdateFilesBySize(); + ColumnFamilyData* cfd_; // ColumnFamilyData to which this Version belongs VersionSet* vset_; // VersionSet to which this Version belongs Version* next_; // Next version in linked list Version* prev_; // Previous version in linked list @@ -262,7 +263,7 @@ class Version { // used for debugging and logging purposes only. uint64_t version_number_; - explicit Version(VersionSet* vset, uint64_t version_number = 0); + Version(ColumnFamilyData* cfd, VersionSet* vset, uint64_t version_number = 0); ~Version(); @@ -362,29 +363,6 @@ class VersionSet { int NumberLevels() const { return num_levels_; } - // Pick level and inputs for a new compaction. - // Returns nullptr if there is no compaction to be done. - // Otherwise returns a pointer to a heap-allocated object that - // describes the compaction. Caller should delete the result. - Compaction* PickCompaction(); - - // Return a compaction object for compacting the range [begin,end] in - // the specified level. Returns nullptr if there is nothing in that - // level that overlaps the specified range. Caller should delete - // the result. - // - // The returned Compaction might not include the whole requested range. - // In that case, compaction_end will be set to the next key that needs - // compacting. In case the compaction will compact the whole range, - // compaction_end will be set to nullptr. - // Client is responsible for compaction_end storage -- when called, - // *compaction_end should point to valid InternalKey! - Compaction* CompactRange(int input_level, - int output_level, - const InternalKey* begin, - const InternalKey* end, - InternalKey** compaction_end); - // Create an iterator that reads over the compaction inputs for "*c". // The caller should delete the iterator when no longer needed. Iterator* MakeInputIterator(Compaction* c); @@ -409,13 +387,6 @@ class VersionSet { // pick the same files to compact. bool VerifyCompactionFileConsistency(Compaction* c); - double MaxBytesForLevel(int level); - - // Get the max file size in a given level. - uint64_t MaxFileSizeForLevel(int level); - - void ReleaseCompactionFiles(Compaction* c, Status status); - Status GetMetadataForFile(uint64_t number, int* filelevel, FileMetaData* metadata, ColumnFamilyData** cfd); @@ -471,10 +442,6 @@ class VersionSet { // Opened lazily unique_ptr descriptor_log_; - // An object that keeps all the compaction stats - // and picks the next compaction - std::unique_ptr compaction_picker_; - // generates a increasing version number for every new version uint64_t current_version_number_; diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 453b5cdbb..0ca7055bb 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -118,6 +118,12 @@ struct ColumnFamilyOptions { // Default: a factory that doesn't provide any object std::shared_ptr compaction_filter_factory; + // Any internal progress/error information generated by the db will + // be written to info_log if it is non-nullptr, or to a file stored + // in the same directory as the DB contents if info_log is nullptr. + // Default: nullptr + shared_ptr info_log; + // ------------------- // Parameters that affect performance @@ -316,6 +322,11 @@ struct ColumnFamilyOptions { // stop building a single file in a level->level+1 compaction. int max_grandparent_overlap_factor; + // If non-null, then we should collect metrics about database operations + // Statistics objects should not be shared between DB instances as + // it does not use any locks to prevent concurrent updates. + shared_ptr statistics; + // Disable compaction triggered by seek. // With bloomfilter and fast storage, a miss on one level // is very cheap if the file handle is cached in table cache @@ -478,12 +489,6 @@ struct DBOptions { // Default: Env::Default() Env* env; - // Any internal progress/error information generated by the db will - // be written to info_log if it is non-nullptr, or to a file stored - // in the same directory as the DB contents if info_log is nullptr. - // Default: nullptr - shared_ptr info_log; - // Number of open files that can be used by the DB. You may need to // increase this if your database has a large working set (budget // one open file per 2MB of working set). @@ -491,11 +496,6 @@ struct DBOptions { // Default: 1000 int max_open_files; - // If non-null, then we should collect metrics about database operations - // Statistics objects should not be shared between DB instances as - // it does not use any locks to prevent concurrent updates. - shared_ptr statistics; - // If true, then the contents of data files are not synced // to stable storage. Their contents remain in the OS buffers till the // OS decides to flush them. This option is good for bulk-loading diff --git a/util/options.cc b/util/options.cc index f2e11c0d1..ca0efd65a 100644 --- a/util/options.cc +++ b/util/options.cc @@ -32,6 +32,7 @@ ColumnFamilyOptions::ColumnFamilyOptions() compaction_filter_factory( std::shared_ptr( new DefaultCompactionFilterFactory())), + info_log(nullptr), write_buffer_size(4<<20), max_write_buffer_number(2), min_write_buffer_number_to_merge(1), @@ -56,6 +57,7 @@ ColumnFamilyOptions::ColumnFamilyOptions() expanded_compaction_factor(25), source_compaction_factor(1), max_grandparent_overlap_factor(10), + statistics(nullptr), disable_seek_compaction(false), soft_rate_limit(0.0), hard_rate_limit(0.0), @@ -84,6 +86,7 @@ ColumnFamilyOptions::ColumnFamilyOptions(const Options& options) merge_operator(options.merge_operator), compaction_filter(options.compaction_filter), compaction_filter_factory(options.compaction_filter_factory), + info_log(options.info_log), write_buffer_size(options.write_buffer_size), max_write_buffer_number(options.max_write_buffer_number), min_write_buffer_number_to_merge( @@ -113,6 +116,7 @@ ColumnFamilyOptions::ColumnFamilyOptions(const Options& options) expanded_compaction_factor(options.expanded_compaction_factor), source_compaction_factor(options.source_compaction_factor), max_grandparent_overlap_factor(options.max_grandparent_overlap_factor), + statistics(options.statistics), disable_seek_compaction(options.disable_seek_compaction), soft_rate_limit(options.soft_rate_limit), hard_rate_limit(options.hard_rate_limit), @@ -145,7 +149,6 @@ DBOptions::DBOptions() error_if_exists(false), paranoid_checks(false), env(Env::Default()), - info_log(nullptr), max_open_files(1000), disableDataSync(false), use_fsync(false), @@ -178,9 +181,7 @@ DBOptions::DBOptions(const Options& options) error_if_exists(options.error_if_exists), paranoid_checks(options.paranoid_checks), env(options.env), - info_log(options.info_log), max_open_files(options.max_open_files), - statistics(options.statistics), disableDataSync(options.disableDataSync), use_fsync(options.use_fsync), db_stats_log_interval(options.db_stats_log_interval),