diff --git a/Makefile b/Makefile index 62b31b87a..6b11012c2 100644 --- a/Makefile +++ b/Makefile @@ -131,6 +131,7 @@ TESTS = \ spatial_db_test \ version_edit_test \ version_set_test \ + compaction_picker_test \ file_indexer_test \ write_batch_test \ write_controller_test\ @@ -452,6 +453,9 @@ version_edit_test: db/version_edit_test.o $(LIBOBJECTS) $(TESTHARNESS) version_set_test: db/version_set_test.o $(LIBOBJECTS) $(TESTHARNESS) $(CXX) db/version_set_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS) +compaction_picker_test: db/compaction_picker_test.o $(LIBOBJECTS) $(TESTHARNESS) + $(CXX) db/compaction_picker_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS) + file_indexer_test : db/file_indexer_test.o $(LIBOBJECTS) $(TESTHARNESS) $(CXX) db/file_indexer_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS) diff --git a/db/column_family.cc b/db/column_family.cc index b64c24ffe..e6298692a 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -324,8 +324,9 @@ ColumnFamilyData::~ColumnFamilyData() { void ColumnFamilyData::RecalculateWriteStallConditions( const MutableCFOptions& mutable_cf_options) { if (current_ != nullptr) { - const double score = current_->MaxCompactionScore(); - const int max_level = current_->MaxCompactionScoreLevel(); + auto* vstorage = current_->GetStorageInfo(); + const double score = vstorage->MaxCompactionScore(); + const int max_level = vstorage->MaxCompactionScoreLevel(); auto write_controller = column_family_set_->write_controller_; @@ -337,26 +338,26 @@ void ColumnFamilyData::RecalculateWriteStallConditions( "(waiting for flush), max_write_buffer_number is set to %d", name_.c_str(), imm()->size(), mutable_cf_options.max_write_buffer_number); - } else if (current_->NumLevelFiles(0) >= + } else if (vstorage->NumLevelFiles(0) >= mutable_cf_options.level0_stop_writes_trigger) { write_controller_token_ = write_controller->GetStopToken(); internal_stats_->AddCFStats(InternalStats::LEVEL0_NUM_FILES, 1); Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log, "[%s] Stopping writes because we have %d level-0 files", - name_.c_str(), current_->NumLevelFiles(0)); + name_.c_str(), vstorage->NumLevelFiles(0)); } else if (mutable_cf_options.level0_slowdown_writes_trigger >= 0 && - current_->NumLevelFiles(0) >= + vstorage->NumLevelFiles(0) >= mutable_cf_options.level0_slowdown_writes_trigger) { - uint64_t slowdown = SlowdownAmount( - current_->NumLevelFiles(0), - mutable_cf_options.level0_slowdown_writes_trigger, - mutable_cf_options.level0_stop_writes_trigger); + uint64_t slowdown = + SlowdownAmount(vstorage->NumLevelFiles(0), + mutable_cf_options.level0_slowdown_writes_trigger, + mutable_cf_options.level0_stop_writes_trigger); write_controller_token_ = write_controller->GetDelayToken(slowdown); internal_stats_->AddCFStats(InternalStats::LEVEL0_SLOWDOWN, slowdown); Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log, "[%s] Stalling writes because we have %d level-0 files (%" PRIu64 "us)", - name_.c_str(), current_->NumLevelFiles(0), slowdown); + name_.c_str(), vstorage->NumLevelFiles(0), slowdown); } else if (mutable_cf_options.hard_rate_limit > 1.0 && score > mutable_cf_options.hard_rate_limit) { uint64_t kHardLimitSlowdown = 1000; @@ -403,8 +404,11 @@ void ColumnFamilyData::CreateNewMemtable( Compaction* ColumnFamilyData::PickCompaction( const MutableCFOptions& mutable_options, LogBuffer* log_buffer) { - auto result = compaction_picker_->PickCompaction( - mutable_options, 
current_, log_buffer); + auto* result = compaction_picker_->PickCompaction( + GetName(), mutable_options, current_->GetStorageInfo(), log_buffer); + if (result != nullptr) { + result->SetInputVersion(current_); + } return result; } @@ -413,9 +417,13 @@ Compaction* ColumnFamilyData::CompactRange( int input_level, int output_level, uint32_t output_path_id, const InternalKey* begin, const InternalKey* end, InternalKey** compaction_end) { - return compaction_picker_->CompactRange( - mutable_cf_options, current_, input_level, output_level, - output_path_id, begin, end, compaction_end); + auto* result = compaction_picker_->CompactRange( + GetName(), mutable_cf_options, current_->GetStorageInfo(), input_level, + output_level, output_path_id, begin, end, compaction_end); + if (result != nullptr) { + result->SetInputVersion(current_); + } + return result; } SuperVersion* ColumnFamilyData::GetReferencedSuperVersion( diff --git a/db/compaction.cc b/db/compaction.cc index 533fe497e..a739da29e 100644 --- a/db/compaction.cc +++ b/db/compaction.cc @@ -29,7 +29,17 @@ uint64_t TotalFileSize(const std::vector& files) { return sum; } -Compaction::Compaction(Version* input_version, int start_level, int out_level, +void Compaction::SetInputVersion(Version* input_version) { + input_version_ = input_version; + cfd_ = input_version_->cfd(); + + cfd_->Ref(); + input_version_->Ref(); + edit_ = new VersionEdit(); + edit_->SetColumnFamily(cfd_->GetID()); +} + +Compaction::Compaction(int number_levels, int start_level, int out_level, uint64_t target_file_size, uint64_t max_grandparent_overlap_bytes, uint32_t output_path_id, @@ -39,9 +49,10 @@ Compaction::Compaction(Version* input_version, int start_level, int out_level, output_level_(out_level), max_output_file_size_(target_file_size), max_grandparent_overlap_bytes_(max_grandparent_overlap_bytes), - input_version_(input_version), - number_levels_(input_version_->NumberLevels()), - cfd_(input_version_->cfd()), + input_version_(nullptr), + edit_(nullptr), + number_levels_(number_levels), + cfd_(nullptr), output_path_id_(output_path_id), output_compression_(output_compression), seek_compaction_(seek_compaction), @@ -56,10 +67,6 @@ Compaction::Compaction(Version* input_version, int start_level, int out_level, is_full_compaction_(false), is_manual_compaction_(false), level_ptrs_(std::vector(number_levels_)) { - cfd_->Ref(); - input_version_->Ref(); - edit_ = new VersionEdit(); - edit_->SetColumnFamily(cfd_->GetID()); for (int i = 0; i < number_levels_; i++) { level_ptrs_[i] = 0; } @@ -113,6 +120,7 @@ void Compaction::AddInputDeletions(VersionEdit* edit) { } bool Compaction::KeyNotExistsBeyondOutputLevel(const Slice& user_key) { + assert(input_version_ != nullptr); assert(cfd_->ioptions()->compaction_style != kCompactionStyleFIFO); if (cfd_->ioptions()->compaction_style == kCompactionStyleUniversal) { return bottommost_level_; @@ -120,7 +128,8 @@ bool Compaction::KeyNotExistsBeyondOutputLevel(const Slice& user_key) { // Maybe use binary search to find right entry instead of linear search? 
const Comparator* user_cmp = cfd_->user_comparator(); for (int lvl = output_level_ + 1; lvl < number_levels_; lvl++) { - const std::vector& files = input_version_->LevelFiles(lvl); + const std::vector& files = + input_version_->GetStorageInfo()->LevelFiles(lvl); for (; level_ptrs_[lvl] < files.size(); ) { FileMetaData* f = files[level_ptrs_[lvl]]; if (user_cmp->Compare(user_key, f->largest.user_key()) <= 0) { @@ -176,9 +185,9 @@ void Compaction::MarkFilesBeingCompacted(bool mark_as_compacted) { } // Is this compaction producing files at the bottommost level? -void Compaction::SetupBottomMostLevel(bool is_manual) { - assert(cfd_->ioptions()->compaction_style != kCompactionStyleFIFO); - if (cfd_->ioptions()->compaction_style == kCompactionStyleUniversal) { +void Compaction::SetupBottomMostLevel(VersionStorageInfo* vstorage, + bool is_manual, bool level0_only) { + if (level0_only) { // If universal compaction style is used and manual // compaction is occuring, then we are guaranteed that // all files will be picked in a single compaction @@ -193,7 +202,7 @@ void Compaction::SetupBottomMostLevel(bool is_manual) { bottommost_level_ = true; // checks whether there are files living beyond the output_level. for (int i = output_level_ + 1; i < number_levels_; i++) { - if (input_version_->NumLevelFiles(i) > 0) { + if (vstorage->NumLevelFiles(i) > 0) { bottommost_level_ = false; break; } @@ -218,7 +227,8 @@ void Compaction::ReleaseCompactionFiles(Status status) { } void Compaction::ResetNextCompactionIndex() { - input_version_->SetNextCompactionIndex(start_level_, 0); + assert(input_version_ != nullptr); + input_version_->GetStorageInfo()->ResetNextCompactionIndex(start_level_); } namespace { diff --git a/db/compaction.h b/db/compaction.h index 5183822e3..d8014545b 100644 --- a/db/compaction.h +++ b/db/compaction.h @@ -28,6 +28,7 @@ struct CompactionInputFiles { class Version; class ColumnFamilyData; +class VersionStorageInfo; // A Compaction encapsulates information about a compaction. class Compaction { @@ -161,13 +162,15 @@ class Compaction { // is the sum of all input file sizes. uint64_t OutputFilePreallocationSize(const MutableCFOptions& mutable_options); + void SetInputVersion(Version* input_version); + private: friend class CompactionPicker; friend class UniversalCompactionPicker; friend class FIFOCompactionPicker; friend class LevelCompactionPicker; - Compaction(Version* input_version, int start_level, int out_level, + Compaction(int num_levels, int start_level, int out_level, uint64_t target_file_size, uint64_t max_grandparent_overlap_bytes, uint32_t output_path_id, CompressionType output_compression, bool seek_compaction = false, bool deletion_compaction = false); @@ -230,7 +233,8 @@ class Compaction { // bottommost level. 
// // @see BottomMostLevel() - void SetupBottomMostLevel(bool is_manual); + void SetupBottomMostLevel(VersionStorageInfo* vstorage, bool is_manual, + bool level0_only); // In case of compaction error, reset the nextIndex that is used // to pick up the next file to be compacted from files_by_size_ diff --git a/db/compaction_picker.cc b/db/compaction_picker.cc index 974400fd9..676f39b7d 100644 --- a/db/compaction_picker.cc +++ b/db/compaction_picker.cc @@ -15,6 +15,7 @@ #include #include +#include #include "db/filename.h" #include "util/log_buffer.h" #include "util/statistics.h" @@ -121,7 +122,9 @@ void CompactionPicker::GetRange(const std::vector& inputs1, GetRange(all, smallest, largest); } -bool CompactionPicker::ExpandWhileOverlapping(Compaction* c) { +bool CompactionPicker::ExpandWhileOverlapping(const std::string& cf_name, + VersionStorageInfo* vstorage, + Compaction* c) { assert(c != nullptr); // If inputs are empty then there is nothing to expand. if (c->inputs_[0].empty()) { @@ -148,9 +151,9 @@ bool CompactionPicker::ExpandWhileOverlapping(Compaction* c) { old_size = c->inputs_[0].size(); GetRange(c->inputs_[0].files, &smallest, &largest); c->inputs_[0].clear(); - c->input_version_->GetOverlappingInputs( - level, &smallest, &largest, &c->inputs_[0].files, - hint_index, &hint_index); + vstorage->GetOverlappingInputs(level, &smallest, &largest, + &c->inputs_[0].files, hint_index, + &hint_index); } while(c->inputs_[0].size() > old_size); // Get the new range @@ -162,11 +165,11 @@ bool CompactionPicker::ExpandWhileOverlapping(Compaction* c) { if (c->inputs_[0].empty()) { Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log, "[%s] ExpandWhileOverlapping() failure because zero input files", - c->column_family_data()->GetName().c_str()); + cf_name.c_str()); } if (c->inputs_[0].empty() || FilesInCompaction(c->inputs_[0].files) || (c->level() != c->output_level() && - ParentRangeInCompaction(c->input_version_, &smallest, &largest, level, + ParentRangeInCompaction(vstorage, &smallest, &largest, level, &parent_index))) { c->inputs_[0].clear(); c->inputs_[1].clear(); @@ -192,15 +195,15 @@ bool CompactionPicker::FilesInCompaction(std::vector& files) { } // Returns true if any one of the parent files are being compacted -bool CompactionPicker::ParentRangeInCompaction(Version* version, +bool CompactionPicker::ParentRangeInCompaction(VersionStorageInfo* vstorage, const InternalKey* smallest, const InternalKey* largest, int level, int* parent_index) { std::vector inputs; assert(level + 1 < NumberLevels()); - version->GetOverlappingInputs(level + 1, smallest, largest, &inputs, - *parent_index, parent_index); + vstorage->GetOverlappingInputs(level + 1, smallest, largest, &inputs, + *parent_index, parent_index); return FilesInCompaction(inputs); } @@ -209,7 +212,8 @@ bool CompactionPicker::ParentRangeInCompaction(Version* version, // or cause "level" to include a file for compaction that has an overlapping // user-key with another file. void CompactionPicker::SetupOtherInputs( - const MutableCFOptions& mutable_cf_options, Compaction* c) { + const std::string& cf_name, const MutableCFOptions& mutable_cf_options, + VersionStorageInfo* vstorage, Compaction* c) { // If inputs are empty, then there is nothing to expand. 
// If both input and output levels are the same, no need to consider // files at level "level+1" @@ -224,10 +228,9 @@ void CompactionPicker::SetupOtherInputs( GetRange(c->inputs_[0].files, &smallest, &largest); // Populate the set of next-level files (inputs_[1]) to include in compaction - c->input_version_->GetOverlappingInputs( - level + 1, &smallest, &largest, - &c->inputs_[1].files, c->parent_index_, - &c->parent_index_); + vstorage->GetOverlappingInputs(level + 1, &smallest, &largest, + &c->inputs_[1].files, c->parent_index_, + &c->parent_index_); // Get entire range covered by compaction InternalKey all_start, all_limit; @@ -240,8 +243,8 @@ void CompactionPicker::SetupOtherInputs( // can happen when one user key spans multiple files. if (!c->inputs_[1].empty()) { std::vector expanded0; - c->input_version_->GetOverlappingInputs( - level, &all_start, &all_limit, &expanded0, c->base_index_, nullptr); + vstorage->GetOverlappingInputs(level, &all_start, &all_limit, &expanded0, + c->base_index_, nullptr); const uint64_t inputs0_size = TotalCompensatedFileSize(c->inputs_[0].files); const uint64_t inputs1_size = TotalCompensatedFileSize(c->inputs_[1].files); const uint64_t expanded0_size = TotalCompensatedFileSize(expanded0); @@ -249,22 +252,21 @@ void CompactionPicker::SetupOtherInputs( if (expanded0.size() > c->inputs_[0].size() && inputs1_size + expanded0_size < limit && !FilesInCompaction(expanded0) && - !c->input_version_->HasOverlappingUserKey(&expanded0, level)) { + !vstorage->HasOverlappingUserKey(&expanded0, level)) { InternalKey new_start, new_limit; GetRange(expanded0, &new_start, &new_limit); std::vector expanded1; - c->input_version_->GetOverlappingInputs(level + 1, &new_start, &new_limit, - &expanded1, c->parent_index_, - &c->parent_index_); + vstorage->GetOverlappingInputs(level + 1, &new_start, &new_limit, + &expanded1, c->parent_index_, + &c->parent_index_); if (expanded1.size() == c->inputs_[1].size() && !FilesInCompaction(expanded1)) { Log(InfoLogLevel::INFO_LEVEL, ioptions_.info_log, "[%s] Expanding@%d %zu+%zu (%" PRIu64 "+%" PRIu64 " bytes) to %zu+%zu (%" PRIu64 "+%" PRIu64 "bytes)\n", - c->column_family_data()->GetName().c_str(), level, - c->inputs_[0].size(), c->inputs_[1].size(), inputs0_size, - inputs1_size, expanded0.size(), expanded1.size(), expanded0_size, - inputs1_size); + cf_name.c_str(), level, c->inputs_[0].size(), c->inputs_[1].size(), + inputs0_size, inputs1_size, expanded0.size(), expanded1.size(), + expanded0_size, inputs1_size); smallest = new_start; largest = new_limit; c->inputs_[0].files = expanded0; @@ -278,15 +280,15 @@ void CompactionPicker::SetupOtherInputs( // Compute the set of grandparent files that overlap this compaction // (parent == level+1; grandparent == level+2) if (level + 2 < NumberLevels()) { - c->input_version_->GetOverlappingInputs(level + 2, &all_start, &all_limit, - &c->grandparents_); + vstorage->GetOverlappingInputs(level + 2, &all_start, &all_limit, + &c->grandparents_); } } Compaction* CompactionPicker::CompactRange( - const MutableCFOptions& mutable_cf_options, Version* version, - int input_level, int output_level, uint32_t output_path_id, - const InternalKey* begin, const InternalKey* end, + const std::string& cf_name, const MutableCFOptions& mutable_cf_options, + VersionStorageInfo* vstorage, int input_level, int output_level, + uint32_t output_path_id, const InternalKey* begin, const InternalKey* end, InternalKey** compaction_end) { // CompactionPickerFIFO has its own implementation of compact range 
assert(ioptions_.compaction_style != kCompactionStyleFIFO); @@ -300,7 +302,7 @@ Compaction* CompactionPicker::CompactRange( begin = nullptr; end = nullptr; } - version->GetOverlappingInputs(input_level, begin, end, &inputs); + vstorage->GetOverlappingInputs(input_level, begin, end, &inputs); if (inputs.empty()) { return nullptr; } @@ -326,24 +328,20 @@ Compaction* CompactionPicker::CompactRange( } assert(output_path_id < static_cast(ioptions_.db_paths.size())); Compaction* c = new Compaction( - version, input_level, output_level, + vstorage->NumberLevels(), input_level, output_level, mutable_cf_options.MaxFileSizeForLevel(output_level), mutable_cf_options.MaxGrandParentOverlapBytes(input_level), - output_path_id, - GetCompressionType(ioptions_, output_level)); + output_path_id, GetCompressionType(ioptions_, output_level)); c->inputs_[0].files = inputs; - if (ExpandWhileOverlapping(c) == false) { + if (ExpandWhileOverlapping(cf_name, vstorage, c) == false) { delete c; Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log, - "[%s] Unable to perform CompactRange compact due to expansion" - " failure. Possible causes include some of the necessary " - " compaction input files are currently being compacted.\n", - version->cfd()->GetName().c_str()); + "[%s] Could not compact due to expansion failure.\n", cf_name.c_str()); return nullptr; } - SetupOtherInputs(mutable_cf_options, c); + SetupOtherInputs(cf_name, mutable_cf_options, vstorage, c); if (covering_the_whole_range) { *compaction_end = nullptr; @@ -355,7 +353,8 @@ Compaction* CompactionPicker::CompactRange( c->MarkFilesBeingCompacted(true); // Is this compaction creating a file at the bottommost level - c->SetupBottomMostLevel(true); + c->SetupBottomMostLevel( + vstorage, true, ioptions_.compaction_style == kCompactionStyleUniversal); c->is_manual_compaction_ = true; c->mutable_cf_options_ = mutable_cf_options; @@ -364,8 +363,8 @@ Compaction* CompactionPicker::CompactRange( } Compaction* LevelCompactionPicker::PickCompaction( - const MutableCFOptions& mutable_cf_options, - Version* version, LogBuffer* log_buffer) { + const std::string& cf_name, const MutableCFOptions& mutable_cf_options, + VersionStorageInfo* vstorage, LogBuffer* log_buffer) { Compaction* c = nullptr; int level = -1; @@ -373,19 +372,23 @@ Compaction* LevelCompactionPicker::PickCompaction( // and also in LogAndApply(), otherwise the values could be stale. std::vector size_being_compacted(NumberLevels() - 1); SizeBeingCompacted(size_being_compacted); - version->ComputeCompactionScore(mutable_cf_options, size_being_compacted); + + CompactionOptionsFIFO dummy_compaction_options_fifo; + vstorage->ComputeCompactionScore( + mutable_cf_options, dummy_compaction_options_fifo, size_being_compacted); // We prefer compactions triggered by too much data in a level over // the compactions triggered by seeks. // // Find the compactions by size on all levels. 
for (int i = 0; i < NumberLevels() - 1; i++) { - double score = version->CompactionScore(i); - assert(i == 0 || score <= version->CompactionScore(i - 1)); - level = version->CompactionScoreLevel(i); - if (score >= 1) { - c = PickCompactionBySize(mutable_cf_options, version, level, score); - if (c == nullptr || ExpandWhileOverlapping(c) == false) { + double score = vstorage->CompactionScore(i); + level = vstorage->CompactionScoreLevel(i); + assert(i == 0 || score <= vstorage->CompactionScore(i - 1)); + if ((score >= 1)) { + c = PickCompactionBySize(mutable_cf_options, vstorage, level, score); + if (c == nullptr || + ExpandWhileOverlapping(cf_name, vstorage, c) == false) { delete c; c = nullptr; } else { @@ -408,14 +411,14 @@ Compaction* LevelCompactionPicker::PickCompaction( // c->inputs_[0] earlier and replace it with an overlapping set // which will include the picked file. c->inputs_[0].clear(); - c->input_version_->GetOverlappingInputs(0, &smallest, &largest, - &c->inputs_[0].files); + vstorage->GetOverlappingInputs(0, &smallest, &largest, + &c->inputs_[0].files); // If we include more L0 files in the same compaction run it can // cause the 'smallest' and 'largest' key to get extended to a // larger range. So, re-invoke GetRange to get the new key range GetRange(c->inputs_[0].files, &smallest, &largest); - if (ParentRangeInCompaction(c->input_version_, &smallest, &largest, level, + if (ParentRangeInCompaction(vstorage, &smallest, &largest, level, &c->parent_index_)) { delete c; return nullptr; @@ -424,13 +427,13 @@ Compaction* LevelCompactionPicker::PickCompaction( } // Setup "level+1" files (inputs_[1]) - SetupOtherInputs(mutable_cf_options, c); + SetupOtherInputs(cf_name, mutable_cf_options, vstorage, c); // mark all the files that are being compacted c->MarkFilesBeingCompacted(true); // Is this compaction creating a file at the bottommost level - c->SetupBottomMostLevel(false); + c->SetupBottomMostLevel(vstorage, false, false); // remember this currently undergoing compaction compactions_in_progress_[level].insert(c); @@ -440,8 +443,8 @@ Compaction* LevelCompactionPicker::PickCompaction( } Compaction* LevelCompactionPicker::PickCompactionBySize( - const MutableCFOptions& mutable_cf_options, - Version* version, int level, double score) { + const MutableCFOptions& mutable_cf_options, VersionStorageInfo* vstorage, + int level, double score) { Compaction* c = nullptr; // level 0 files are overlapping. 
So we cannot pick more @@ -454,7 +457,7 @@ Compaction* LevelCompactionPicker::PickCompactionBySize( assert(level >= 0); assert(level + 1 < NumberLevels()); - c = new Compaction(version, level, level + 1, + c = new Compaction(vstorage->NumberLevels(), level, level + 1, mutable_cf_options.MaxFileSizeForLevel(level + 1), mutable_cf_options.MaxGrandParentOverlapBytes(level), 0, GetCompressionType(ioptions_, level + 1)); @@ -462,20 +465,19 @@ Compaction* LevelCompactionPicker::PickCompactionBySize( // Pick the largest file in this level that is not already // being compacted - const std::vector& file_size = version->FilesBySize(level); - const std::vector& level_files = version->LevelFiles(level); + const std::vector& file_size = vstorage->FilesBySize(level); + const std::vector& level_files = vstorage->LevelFiles(level); // record the first file that is not yet compacted int nextIndex = -1; - for (unsigned int i = version->NextCompactionIndex(level); + for (unsigned int i = vstorage->NextCompactionIndex(level); i < file_size.size(); i++) { int index = file_size[i]; FileMetaData* f = level_files[index]; - // Check to verify files are arranged in descending compensated size. assert((i == file_size.size() - 1) || - (i >= Version::kNumberFilesToSort - 1) || + (i >= VersionStorageInfo::kNumberFilesToSort - 1) || (f->compensated_file_size >= level_files[file_size[i + 1]]->compensated_file_size)); @@ -493,8 +495,8 @@ Compaction* LevelCompactionPicker::PickCompactionBySize( // Do not pick this file if its parents at level+1 are being compacted. // Maybe we can avoid redoing this work in SetupOtherInputs int parent_index = -1; - if (ParentRangeInCompaction(version, &f->smallest, &f->largest, - level, &parent_index)) { + if (ParentRangeInCompaction(vstorage, &f->smallest, &f->largest, level, + &parent_index)) { continue; } c->inputs_[0].files.push_back(f); @@ -509,7 +511,7 @@ Compaction* LevelCompactionPicker::PickCompactionBySize( } // store where to start the iteration in the next call to PickCompaction - version->SetNextCompactionIndex(level, nextIndex); + vstorage->SetNextCompactionIndex(level, nextIndex); return c; } @@ -518,39 +520,38 @@ Compaction* LevelCompactionPicker::PickCompactionBySize( // time-range to compact. // Compaction* UniversalCompactionPicker::PickCompaction( - const MutableCFOptions& mutable_cf_options, - Version* version, LogBuffer* log_buffer) { + const std::string& cf_name, const MutableCFOptions& mutable_cf_options, + VersionStorageInfo* vstorage, LogBuffer* log_buffer) { const int kLevel0 = 0; - double score = version->CompactionScore(kLevel0); - const std::vector& level_files = version->LevelFiles(kLevel0); + double score = vstorage->CompactionScore(kLevel0); + const std::vector& level_files = vstorage->LevelFiles(kLevel0); if ((level_files.size() < (unsigned int)mutable_cf_options.level0_file_num_compaction_trigger)) { - LogToBuffer(log_buffer, "[%s] Universal: nothing to do\n", - version->cfd()->GetName().c_str()); + LogToBuffer(log_buffer, "[%s] Universal: nothing to do\n", cf_name.c_str()); return nullptr; } - Version::FileSummaryStorage tmp; + VersionStorageInfo::FileSummaryStorage tmp; LogToBuffer(log_buffer, 3072, "[%s] Universal: candidate files(%zu): %s\n", - version->cfd()->GetName().c_str(), level_files.size(), - version->LevelFileSummary(&tmp, kLevel0)); + cf_name.c_str(), level_files.size(), + vstorage->LevelFileSummary(&tmp, kLevel0)); // Check for size amplification first. 
Compaction* c; - if ((c = PickCompactionUniversalSizeAmp( - mutable_cf_options, version, score, log_buffer)) != nullptr) { + if ((c = PickCompactionUniversalSizeAmp(cf_name, mutable_cf_options, vstorage, + score, log_buffer)) != nullptr) { LogToBuffer(log_buffer, "[%s] Universal: compacting for size amp\n", - version->cfd()->GetName().c_str()); + cf_name.c_str()); } else { // Size amplification is within limits. Try reducing read // amplification while maintaining file size ratios. unsigned int ratio = ioptions_.compaction_options_universal.size_ratio; - if ((c = PickCompactionUniversalReadAmp( - mutable_cf_options, version, score, ratio, - UINT_MAX, log_buffer)) != nullptr) { + if ((c = PickCompactionUniversalReadAmp(cf_name, mutable_cf_options, + vstorage, score, ratio, UINT_MAX, + log_buffer)) != nullptr) { LogToBuffer(log_buffer, "[%s] Universal: compacting for size ratio\n", - version->cfd()->GetName().c_str()); + cf_name.c_str()); } else { // Size amplification and file size ratios are within configured limits. // If max read amplification is exceeding configured limits, then force @@ -559,10 +560,11 @@ Compaction* UniversalCompactionPicker::PickCompaction( unsigned int num_files = level_files.size() - mutable_cf_options.level0_file_num_compaction_trigger; if ((c = PickCompactionUniversalReadAmp( - mutable_cf_options, version, score, UINT_MAX, + cf_name, mutable_cf_options, vstorage, score, UINT_MAX, num_files, log_buffer)) != nullptr) { - LogToBuffer(log_buffer, "[%s] Universal: compacting for file num -- %u\n", - version->cfd()->GetName().c_str(), num_files); + LogToBuffer(log_buffer, + "[%s] Universal: compacting for file num -- %u\n", + cf_name.c_str(), num_files); } } } @@ -639,8 +641,8 @@ uint32_t UniversalCompactionPicker::GetPathId( // the next file in time order. // Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp( - const MutableCFOptions& mutable_cf_options, Version* version, - double score, unsigned int ratio, + const std::string& cf_name, const MutableCFOptions& mutable_cf_options, + VersionStorageInfo* vstorage, double score, unsigned int ratio, unsigned int max_number_of_files_to_compact, LogBuffer* log_buffer) { const int kLevel0 = 0; @@ -650,7 +652,7 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp( ioptions_.compaction_options_universal.max_merge_width; // The files are sorted from newest first to oldest last. - const auto& files = version->LevelFiles(kLevel0); + const auto& files = vstorage->LevelFiles(kLevel0); FileMetaData* f = nullptr; bool done = false; @@ -677,7 +679,7 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp( } LogToBuffer(log_buffer, "[%s] Universal: file %" PRIu64 "[%d] being compacted, skipping", - version->cfd()->GetName().c_str(), f->fd.GetNumber(), loop); + cf_name.c_str(), f->fd.GetNumber(), loop); f = nullptr; } @@ -689,7 +691,7 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp( FormatFileNumber(f->fd.GetNumber(), f->fd.GetPathId(), file_num_buf, sizeof(file_num_buf)); LogToBuffer(log_buffer, "[%s] Universal: Possible candidate file %s[%d].", - version->cfd()->GetName().c_str(), file_num_buf, loop); + cf_name.c_str(), file_num_buf, loop); } // Check if the suceeding files need compaction. 
@@ -740,9 +742,8 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp( LogToBuffer(log_buffer, "[%s] Universal: Skipping file %" PRIu64 "[%d] with size %" PRIu64 " (compensated size %" PRIu64 ") %d\n", - version->cfd()->GetName().c_str(), f->fd.GetNumber(), i, - f->fd.GetFileSize(), f->compensated_file_size, - f->being_compacted); + cf_name.c_str(), f->fd.GetNumber(), i, f->fd.GetFileSize(), + f->compensated_file_size, f->being_compacted); } } } @@ -756,7 +757,7 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp( int ratio_to_compress = ioptions_.compaction_options_universal.compression_size_percent; if (ratio_to_compress >= 0) { - uint64_t total_size = version->NumLevelBytes(kLevel0); + uint64_t total_size = vstorage->NumLevelBytes(kLevel0); uint64_t older_file_size = 0; for (unsigned int i = files.size() - 1; i >= first_index_after; i--) { @@ -774,10 +775,10 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp( } uint32_t path_id = GetPathId(ioptions_, estimated_total_size); - Compaction* c = new Compaction(version, kLevel0, kLevel0, - mutable_cf_options.MaxFileSizeForLevel(kLevel0), - LLONG_MAX, path_id, GetCompressionType(ioptions_, kLevel0, - enable_compression)); + Compaction* c = new Compaction( + vstorage->NumberLevels(), kLevel0, kLevel0, + mutable_cf_options.MaxFileSizeForLevel(kLevel0), LLONG_MAX, path_id, + GetCompressionType(ioptions_, kLevel0, enable_compression)); c->score_ = score; for (unsigned int i = start_index; i < first_index_after; i++) { @@ -789,8 +790,8 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp( LogToBuffer(log_buffer, "[%s] Universal: Picking file %s[%d] " "with size %" PRIu64 " (compensated size %" PRIu64 ")\n", - version->cfd()->GetName().c_str(), file_num_buf, i, - f->fd.GetFileSize(), f->compensated_file_size); + cf_name.c_str(), file_num_buf, i, f->fd.GetFileSize(), + f->compensated_file_size); } return c; } @@ -802,8 +803,8 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp( // min_merge_width and max_merge_width). // Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp( - const MutableCFOptions& mutable_cf_options, Version* version, - double score, LogBuffer* log_buffer) { + const std::string& cf_name, const MutableCFOptions& mutable_cf_options, + VersionStorageInfo* vstorage, double score, LogBuffer* log_buffer) { const int kLevel = 0; // percentage flexibilty while reducing size amplification @@ -811,7 +812,7 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp( max_size_amplification_percent; // The files are sorted from newest first to oldest last. 
- const auto& files = version->LevelFiles(kLevel); + const auto& files = vstorage->LevelFiles(kLevel); unsigned int candidate_count = 0; uint64_t candidate_size = 0; @@ -829,7 +830,7 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp( FormatFileNumber(f->fd.GetNumber(), f->fd.GetPathId(), file_num_buf, sizeof(file_num_buf)); LogToBuffer(log_buffer, "[%s] Universal: skipping file %s[%d] compacted %s", - version->cfd()->GetName().c_str(), file_num_buf, loop, + cf_name.c_str(), file_num_buf, loop, " cannot be a candidate to reduce size amp.\n"); f = nullptr; } @@ -842,7 +843,7 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp( FormatFileNumber(f->fd.GetNumber(), f->fd.GetPathId(), file_num_buf, sizeof(file_num_buf)); LogToBuffer(log_buffer, "[%s] Universal: First candidate file %s[%d] %s", - version->cfd()->GetName().c_str(), file_num_buf, start_index, + cf_name.c_str(), file_num_buf, start_index, " to reduce size amp.\n"); // keep adding up all the remaining files @@ -854,7 +855,7 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp( sizeof(file_num_buf)); LogToBuffer( log_buffer, "[%s] Universal: Possible candidate file %s[%d] %s.", - version->cfd()->GetName().c_str(), file_num_buf, loop, + cf_name.c_str(), file_num_buf, loop, " is already being compacted. No size amp reduction possible.\n"); return nullptr; } @@ -874,14 +875,14 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp( log_buffer, "[%s] Universal: size amp not needed. newer-files-total-size %" PRIu64 "earliest-file-size %" PRIu64, - version->cfd()->GetName().c_str(), candidate_size, earliest_file_size); + cf_name.c_str(), candidate_size, earliest_file_size); return nullptr; } else { LogToBuffer( log_buffer, "[%s] Universal: size amp needed. newer-files-total-size %" PRIu64 "earliest-file-size %" PRIu64, - version->cfd()->GetName().c_str(), candidate_size, earliest_file_size); + cf_name.c_str(), candidate_size, earliest_file_size); } assert(start_index < files.size() - 1); @@ -895,29 +896,29 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp( // create a compaction request // We always compact all the files, so always compress. 
Compaction* c = - new Compaction(version, kLevel, kLevel, - mutable_cf_options.MaxFileSizeForLevel(kLevel), - LLONG_MAX, path_id, GetCompressionType(ioptions_, kLevel)); + new Compaction(vstorage->NumberLevels(), kLevel, kLevel, + mutable_cf_options.MaxFileSizeForLevel(kLevel), LLONG_MAX, + path_id, GetCompressionType(ioptions_, kLevel)); c->score_ = score; for (unsigned int loop = start_index; loop < files.size(); loop++) { f = files[loop]; c->inputs_[0].files.push_back(f); LogToBuffer(log_buffer, - "[%s] Universal: size amp picking file %" PRIu64 "[%d] " - "with size %" PRIu64 " (compensated size %" PRIu64 ")", - version->cfd()->GetName().c_str(), - f->fd.GetNumber(), loop, - f->fd.GetFileSize(), f->compensated_file_size); + "[%s] Universal: size amp picking file %" PRIu64 + "[%d] " + "with size %" PRIu64 " (compensated size %" PRIu64 ")", + cf_name.c_str(), f->fd.GetNumber(), loop, f->fd.GetFileSize(), + f->compensated_file_size); } return c; } Compaction* FIFOCompactionPicker::PickCompaction( - const MutableCFOptions& mutable_cf_options, - Version* version, LogBuffer* log_buffer) { - assert(version->NumberLevels() == 1); + const std::string& cf_name, const MutableCFOptions& mutable_cf_options, + VersionStorageInfo* vstorage, LogBuffer* log_buffer) { + assert(vstorage->NumberLevels() == 1); const int kLevel0 = 0; - const std::vector& level_files = version->LevelFiles(kLevel0); + const std::vector& level_files = vstorage->LevelFiles(kLevel0); uint64_t total_size = 0; for (const auto& file : level_files) { total_size += file->compensated_file_size; @@ -929,7 +930,7 @@ Compaction* FIFOCompactionPicker::PickCompaction( LogToBuffer(log_buffer, "[%s] FIFO compaction: nothing to do. Total size %" PRIu64 ", max size %" PRIu64 "\n", - version->cfd()->GetName().c_str(), total_size, + cf_name.c_str(), total_size, ioptions_.compaction_options_fifo.max_table_files_size); return nullptr; } @@ -938,11 +939,11 @@ Compaction* FIFOCompactionPicker::PickCompaction( LogToBuffer(log_buffer, "[%s] FIFO compaction: Already executing compaction. 
No need " "to run parallel compactions since compactions are very fast", - version->cfd()->GetName().c_str()); + cf_name.c_str()); return nullptr; } - Compaction* c = new Compaction(version, 0, 0, 0, 0, 0, kNoCompression, false, + Compaction* c = new Compaction(1, 0, 0, 0, 0, 0, kNoCompression, false, true /* is deletion compaction */); // delete old files (FIFO) for (auto ritr = level_files.rbegin(); ritr != level_files.rend(); ++ritr) { @@ -953,8 +954,7 @@ Compaction* FIFOCompactionPicker::PickCompaction( AppendHumanBytes(f->fd.GetFileSize(), tmp_fsize, sizeof(tmp_fsize)); LogToBuffer(log_buffer, "[%s] FIFO compaction: picking file %" PRIu64 " with size %s for deletion", - version->cfd()->GetName().c_str(), f->fd.GetNumber(), - tmp_fsize); + cf_name.c_str(), f->fd.GetNumber(), tmp_fsize); if (total_size <= ioptions_.compaction_options_fifo.max_table_files_size) { break; } @@ -967,15 +967,16 @@ Compaction* FIFOCompactionPicker::PickCompaction( } Compaction* FIFOCompactionPicker::CompactRange( - const MutableCFOptions& mutable_cf_options, - Version* version, int input_level, int output_level, + const std::string& cf_name, const MutableCFOptions& mutable_cf_options, + VersionStorageInfo* vstorage, int input_level, int output_level, uint32_t output_path_id, const InternalKey* begin, const InternalKey* end, InternalKey** compaction_end) { assert(input_level == 0); assert(output_level == 0); *compaction_end = nullptr; LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, ioptions_.info_log); - Compaction* c = PickCompaction(mutable_cf_options, version, &log_buffer); + Compaction* c = + PickCompaction(cf_name, mutable_cf_options, vstorage, &log_buffer); if (c != nullptr) { assert(output_path_id < static_cast(ioptions_.db_paths.size())); c->output_path_id_ = output_path_id; diff --git a/db/compaction_picker.h b/db/compaction_picker.h index 138b97eb4..d691a765a 100644 --- a/db/compaction_picker.h +++ b/db/compaction_picker.h @@ -18,12 +18,13 @@ #include #include #include +#include namespace rocksdb { class LogBuffer; class Compaction; -class Version; +class VersionStorageInfo; class CompactionPicker { public: @@ -35,9 +36,10 @@ class CompactionPicker { // Returns nullptr if there is no compaction to be done. // Otherwise returns a pointer to a heap-allocated object that // describes the compaction. Caller should delete the result. - virtual Compaction* PickCompaction( - const MutableCFOptions& mutable_cf_options, - Version* version, LogBuffer* log_buffer) = 0; + virtual Compaction* PickCompaction(const std::string& cf_name, + const MutableCFOptions& mutable_cf_options, + VersionStorageInfo* vstorage, + LogBuffer* log_buffer) = 0; // Return a compaction object for compacting the range [begin,end] in // the specified level. Returns nullptr if there is nothing in that @@ -51,9 +53,9 @@ class CompactionPicker { // Client is responsible for compaction_end storage -- when called, // *compaction_end should point to valid InternalKey! 
virtual Compaction* CompactRange( - const MutableCFOptions& mutable_cf_options, Version* version, - int input_level, int output_level, uint32_t output_path_id, - const InternalKey* begin, const InternalKey* end, + const std::string& cf_name, const MutableCFOptions& mutable_cf_options, + VersionStorageInfo* vstorage, int input_level, int output_level, + uint32_t output_path_id, const InternalKey* begin, const InternalKey* end, InternalKey** compaction_end); // Given the current number of levels, returns the lowest allowed level @@ -93,18 +95,21 @@ class CompactionPicker { // populated. // // Will return false if it is impossible to apply this compaction. - bool ExpandWhileOverlapping(Compaction* c); + bool ExpandWhileOverlapping(const std::string& cf_name, + VersionStorageInfo* vstorage, Compaction* c); // Returns true if any one of the specified files are being compacted bool FilesInCompaction(std::vector& files); // Returns true if any one of the parent files are being compacted - bool ParentRangeInCompaction(Version* version, const InternalKey* smallest, + bool ParentRangeInCompaction(VersionStorageInfo* vstorage, + const InternalKey* smallest, const InternalKey* largest, int level, int* index); - void SetupOtherInputs(const MutableCFOptions& mutable_cf_options, - Compaction* c); + void SetupOtherInputs(const std::string& cf_name, + const MutableCFOptions& mutable_cf_options, + VersionStorageInfo* vstorage, Compaction* c); const ImmutableCFOptions& ioptions_; @@ -121,9 +126,10 @@ class UniversalCompactionPicker : public CompactionPicker { UniversalCompactionPicker(const ImmutableCFOptions& ioptions, const InternalKeyComparator* icmp) : CompactionPicker(ioptions, icmp) {} - virtual Compaction* PickCompaction( - const MutableCFOptions& mutable_cf_options, - Version* version, LogBuffer* log_buffer) override; + virtual Compaction* PickCompaction(const std::string& cf_name, + const MutableCFOptions& mutable_cf_options, + VersionStorageInfo* vstorage, + LogBuffer* log_buffer) override; // The maxinum allowed input level. Always return 0. virtual int MaxInputLevel(int current_num_levels) const override { @@ -133,14 +139,14 @@ class UniversalCompactionPicker : public CompactionPicker { private: // Pick Universal compaction to limit read amplification Compaction* PickCompactionUniversalReadAmp( - const MutableCFOptions& mutable_cf_options, - Version* version, double score, unsigned int ratio, + const std::string& cf_name, const MutableCFOptions& mutable_cf_options, + VersionStorageInfo* vstorage, double score, unsigned int ratio, unsigned int num_files, LogBuffer* log_buffer); // Pick Universal compaction to limit space amplification. Compaction* PickCompactionUniversalSizeAmp( - const MutableCFOptions& mutable_cf_options, - Version* version, double score, LogBuffer* log_buffer); + const std::string& cf_name, const MutableCFOptions& mutable_cf_options, + VersionStorageInfo* vstorage, double score, LogBuffer* log_buffer); // Pick a path ID to place a newly generated file, with its estimated file // size. 
@@ -153,9 +159,10 @@ class LevelCompactionPicker : public CompactionPicker { LevelCompactionPicker(const ImmutableCFOptions& ioptions, const InternalKeyComparator* icmp) : CompactionPicker(ioptions, icmp) {} - virtual Compaction* PickCompaction( - const MutableCFOptions& mutable_cf_options, - Version* version, LogBuffer* log_buffer) override; + virtual Compaction* PickCompaction(const std::string& cf_name, + const MutableCFOptions& mutable_cf_options, + VersionStorageInfo* vstorage, + LogBuffer* log_buffer) override; // Returns current_num_levels - 2, meaning the last level cannot be // compaction input level. @@ -169,7 +176,8 @@ class LevelCompactionPicker : public CompactionPicker { // If level is 0 and there is already a compaction on that level, this // function will return nullptr. Compaction* PickCompactionBySize(const MutableCFOptions& mutable_cf_options, - Version* version, int level, double score); + VersionStorageInfo* vstorage, int level, + double score); }; class FIFOCompactionPicker : public CompactionPicker { @@ -178,14 +186,15 @@ class FIFOCompactionPicker : public CompactionPicker { const InternalKeyComparator* icmp) : CompactionPicker(ioptions, icmp) {} - virtual Compaction* PickCompaction( - const MutableCFOptions& mutable_cf_options, - Version* version, LogBuffer* log_buffer) override; + virtual Compaction* PickCompaction(const std::string& cf_name, + const MutableCFOptions& mutable_cf_options, + VersionStorageInfo* version, + LogBuffer* log_buffer) override; virtual Compaction* CompactRange( - const MutableCFOptions& mutable_cf_options, Version* version, - int input_level, int output_level, uint32_t output_path_id, - const InternalKey* begin, const InternalKey* end, + const std::string& cf_name, const MutableCFOptions& mutable_cf_options, + VersionStorageInfo* vstorage, int input_level, int output_level, + uint32_t output_path_id, const InternalKey* begin, const InternalKey* end, InternalKey** compaction_end) override; // The maxinum allowed input level. Always return 0. diff --git a/db/compaction_picker_test.cc b/db/compaction_picker_test.cc new file mode 100644 index 000000000..81bffe0af --- /dev/null +++ b/db/compaction_picker_test.cc @@ -0,0 +1,149 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. 
+
+#include "db/compaction_picker.h"
+#include
+#include "util/logging.h"
+#include "util/testharness.h"
+#include "util/testutil.h"
+
+namespace rocksdb {
+
+class CountingLogger : public Logger {
+ public:
+  virtual void Logv(const char* format, va_list ap) override { log_count++; }
+  size_t log_count;
+};
+
+class CompactionPickerTest {
+ public:
+  const Comparator* ucmp;
+  InternalKeyComparator icmp;
+  Options options;
+  ImmutableCFOptions ioptions;
+  MutableCFOptions mutable_cf_options;
+  LevelCompactionPicker level_compaction_picker;
+  std::string cf_name;
+  CountingLogger logger;
+  LogBuffer log_buffer;
+  VersionStorageInfo vstorage;
+  uint32_t file_num;
+  CompactionOptionsFIFO fifo_options;
+  std::vector<uint64_t> size_being_compacted;
+
+  CompactionPickerTest()
+      : ucmp(BytewiseComparator()),
+        icmp(ucmp),
+        ioptions(options),
+        mutable_cf_options(options, ioptions),
+        level_compaction_picker(ioptions, &icmp),
+        cf_name("dummy"),
+        log_buffer(InfoLogLevel::INFO_LEVEL, &logger),
+        vstorage(&icmp, ucmp, options.num_levels, kCompactionStyleLevel,
+                 nullptr),
+        file_num(1) {
+    fifo_options.max_table_files_size = 1;
+    mutable_cf_options.RefreshDerivedOptions(ioptions);
+    size_being_compacted.resize(options.num_levels);
+  }
+
+  ~CompactionPickerTest() {
+    auto* files = vstorage.GetFiles();
+    for (int i = 0; i < vstorage.NumberLevels(); i++) {
+      for (auto* f : files[i]) {
+        delete f;
+      }
+    }
+  }
+
+  void Add(int level, uint32_t file_number, const char* smallest,
+           const char* largest, uint64_t file_size = 0, uint32_t path_id = 0,
+           SequenceNumber smallest_seq = 100,
+           SequenceNumber largest_seq = 100) {
+    assert(level < vstorage.NumberLevels());
+    auto& files = vstorage.GetFiles()[level];
+    FileMetaData* f = new FileMetaData;
+    f->fd = FileDescriptor(file_number, path_id, file_size);
+    f->smallest = InternalKey(smallest, smallest_seq, kTypeValue);
+    f->largest = InternalKey(largest, largest_seq, kTypeValue);
+    f->compensated_file_size = file_size;
+    files.push_back(f);
+  }
+
+  void UpdateVersionStorageInfo() {
+    vstorage.ComputeCompactionScore(mutable_cf_options, fifo_options,
+                                    size_being_compacted);
+    vstorage.UpdateFilesBySize();
+    vstorage.UpdateNumNonEmptyLevels();
+    vstorage.GenerateFileIndexer();
+    vstorage.GenerateLevelFilesBrief();
+    vstorage.SetFinalized();
+  }
+};
+
+TEST(CompactionPickerTest, Empty) {
+  UpdateVersionStorageInfo();
+  std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
+      cf_name, mutable_cf_options, &vstorage, &log_buffer));
+  ASSERT_TRUE(compaction.get() == nullptr);
+}
+
+TEST(CompactionPickerTest, Single) {
+  mutable_cf_options.level0_file_num_compaction_trigger = 2;
+  Add(0, 1U, "p", "q");
+  UpdateVersionStorageInfo();
+
+  std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
+      cf_name, mutable_cf_options, &vstorage, &log_buffer));
+  ASSERT_TRUE(compaction.get() == nullptr);
+}
+
+TEST(CompactionPickerTest, Level0Trigger) {
+  mutable_cf_options.level0_file_num_compaction_trigger = 2;
+  Add(0, 1U, "150", "200");
+  Add(0, 2U, "200", "250");
+
+  UpdateVersionStorageInfo();
+
+  std::unique_ptr<Compaction> compaction(level_compaction_picker.PickCompaction(
+      cf_name, mutable_cf_options, &vstorage, &log_buffer));
+  ASSERT_TRUE(compaction.get() != nullptr);
+  ASSERT_EQ(2, compaction->num_input_files(0));
+  ASSERT_EQ(1U, compaction->input(0, 0)->fd.GetNumber());
+  ASSERT_EQ(2U, compaction->input(0, 1)->fd.GetNumber());
+}
+
+TEST(CompactionPickerTest, Level1Trigger) {
+  Add(1, 66U, "150", "200", 1000000000U);
+  UpdateVersionStorageInfo();
+
+  std::unique_ptr<Compaction>
compaction(level_compaction_picker.PickCompaction( + cf_name, mutable_cf_options, &vstorage, &log_buffer)); + ASSERT_TRUE(compaction.get() != nullptr); + ASSERT_EQ(1, compaction->num_input_files(0)); + ASSERT_EQ(66U, compaction->input(0, 0)->fd.GetNumber()); +} + +TEST(CompactionPickerTest, Level1Trigger2) { + Add(1, 66U, "150", "200", 1000000000U); + Add(1, 88U, "201", "300", 1000000000U); + Add(2, 6U, "150", "180", 1000000000U); + Add(2, 7U, "180", "220", 1000000000U); + Add(2, 8U, "220", "300", 1000000000U); + UpdateVersionStorageInfo(); + + std::unique_ptr compaction(level_compaction_picker.PickCompaction( + cf_name, mutable_cf_options, &vstorage, &log_buffer)); + ASSERT_TRUE(compaction.get() != nullptr); + ASSERT_EQ(1, compaction->num_input_files(0)); + ASSERT_EQ(2, compaction->num_input_files(1)); + ASSERT_EQ(66U, compaction->input(0, 0)->fd.GetNumber()); + ASSERT_EQ(6U, compaction->input(1, 0)->fd.GetNumber()); + ASSERT_EQ(7U, compaction->input(1, 1)->fd.GetNumber()); +} + +} // namespace rocksdb + +int main(int argc, char** argv) { return rocksdb::test::RunAllTests(); } diff --git a/db/db_impl.cc b/db/db_impl.cc index a47668763..345188703 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1497,9 +1497,9 @@ Status DBImpl::FlushMemTableToOutputFile( if (madeProgress) { *madeProgress = 1; } - Version::LevelSummaryStorage tmp; + VersionStorageInfo::LevelSummaryStorage tmp; LogToBuffer(log_buffer, "[%s] Level summary: %s\n", cfd->GetName().c_str(), - cfd->current()->LevelSummary(&tmp)); + cfd->current()->GetStorageInfo()->LevelSummary(&tmp)); if (disable_delete_obsolete_files_ == 0) { // add to deletion state @@ -1545,7 +1545,7 @@ Status DBImpl::CompactRange(ColumnFamilyHandle* column_family, MutexLock l(&mutex_); Version* base = cfd->current(); for (int level = 1; level < cfd->NumberLevels(); level++) { - if (base->OverlapInLevel(level, begin, end)) { + if (base->GetStorageInfo()->OverlapInLevel(level, begin, end)) { max_level_with_files = level; } } @@ -1623,14 +1623,14 @@ bool DBImpl::SetOptions(ColumnFamilyHandle* column_family, int DBImpl::FindMinimumEmptyLevelFitting(ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options, int level) { mutex_.AssertHeld(); - Version* current = cfd->current(); + auto* vstorage = cfd->current()->GetStorageInfo(); int minimum_level = level; for (int i = level - 1; i > 0; --i) { // stop if level i is not empty - if (current->NumLevelFiles(i) > 0) break; + if (vstorage->NumLevelFiles(i) > 0) break; // stop if level i is too small (cannot fit the level files) if (mutable_cf_options.MaxBytesForLevel(i) < - current->NumLevelBytes(level)) { + vstorage->NumLevelBytes(level)) { break; } @@ -1682,7 +1682,7 @@ Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) { VersionEdit edit; edit.SetColumnFamily(cfd->GetID()); - for (const auto& f : cfd->current()->files_[level]) { + for (const auto& f : cfd->current()->GetStorageInfo()->files_[level]) { edit.DeleteFile(level, f->fd.GetNumber()); edit.AddFile(to_level, f->fd.GetNumber(), f->fd.GetPathId(), f->fd.GetFileSize(), f->smallest, f->largest, @@ -1898,7 +1898,7 @@ void DBImpl::MaybeScheduleFlushOrCompaction() { bool is_compaction_needed = false; // no need to refcount since we're under a mutex for (auto cfd : *versions_->GetColumnFamilySet()) { - if (cfd->current()->NeedsCompaction()) { + if (cfd->current()->GetStorageInfo()->NeedsCompaction()) { is_compaction_needed = true; break; } @@ -2269,14 +2269,12 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* 
job_context, InstallSuperVersionBackground(c->column_family_data(), job_context, *c->mutable_cf_options()); - Version::LevelSummaryStorage tmp; - LogToBuffer( - log_buffer, - "[%s] Moved #%" PRIu64 " to level-%d %" PRIu64 " bytes %s: %s\n", - c->column_family_data()->GetName().c_str(), - f->fd.GetNumber(), c->level() + 1, - f->fd.GetFileSize(), - status.ToString().c_str(), c->input_version()->LevelSummary(&tmp)); + VersionStorageInfo::LevelSummaryStorage tmp; + LogToBuffer(log_buffer, "[%s] Moved #%" PRIu64 " to level-%d %" PRIu64 + " bytes %s: %s\n", + c->column_family_data()->GetName().c_str(), f->fd.GetNumber(), + c->level() + 1, f->fd.GetFileSize(), status.ToString().c_str(), + c->input_version()->GetStorageInfo()->LevelSummary(&tmp)); c->ReleaseCompactionFiles(status); *madeProgress = true; } else { @@ -3008,7 +3006,8 @@ Status DBImpl::DoCompactionWork(CompactionState* compact, LogToBuffer(log_buffer, "[%s] Compaction start summary: %s\n", cfd->GetName().c_str(), scratch); - assert(cfd->current()->NumLevelFiles(compact->compaction->level()) > 0); + assert(cfd->current()->GetStorageInfo()->NumLevelFiles( + compact->compaction->level()) > 0); assert(compact->builder == nullptr); assert(!compact->outfile); @@ -3246,26 +3245,26 @@ Status DBImpl::DoCompactionWork(CompactionState* compact, status = InstallCompactionResults(compact, mutable_cf_options, log_buffer); InstallSuperVersionBackground(cfd, job_context, mutable_cf_options); } - Version::LevelSummaryStorage tmp; - LogToBuffer( - log_buffer, - "[%s] compacted to: %s, MB/sec: %.1f rd, %.1f wr, level %d, " - "files in(%d, %d) out(%d) " - "MB in(%.1f, %.1f) out(%.1f), read-write-amplify(%.1f) " - "write-amplify(%.1f) %s, records in: %d, records dropped: %d\n", - cfd->GetName().c_str(), cfd->current()->LevelSummary(&tmp), - (stats.bytes_readn + stats.bytes_readnp1) / - static_cast(stats.micros), - stats.bytes_written / static_cast(stats.micros), - compact->compaction->output_level(), stats.files_in_leveln, - stats.files_in_levelnp1, stats.files_out_levelnp1, - stats.bytes_readn / 1048576.0, stats.bytes_readnp1 / 1048576.0, - stats.bytes_written / 1048576.0, - (stats.bytes_written + stats.bytes_readnp1 + stats.bytes_readn) / - (double)stats.bytes_readn, - stats.bytes_written / (double)stats.bytes_readn, - status.ToString().c_str(), stats.num_input_records, - stats.num_dropped_records); + VersionStorageInfo::LevelSummaryStorage tmp; + LogToBuffer(log_buffer, + "[%s] compacted to: %s, MB/sec: %.1f rd, %.1f wr, level %d, " + "files in(%d, %d) out(%d) " + "MB in(%.1f, %.1f) out(%.1f), read-write-amplify(%.1f) " + "write-amplify(%.1f) %s, records in: %d, records dropped: %d\n", + cfd->GetName().c_str(), + cfd->current()->GetStorageInfo()->LevelSummary(&tmp), + (stats.bytes_readn + stats.bytes_readnp1) / + static_cast(stats.micros), + stats.bytes_written / static_cast(stats.micros), + compact->compaction->output_level(), stats.files_in_leveln, + stats.files_in_levelnp1, stats.files_out_levelnp1, + stats.bytes_readn / 1048576.0, stats.bytes_readnp1 / 1048576.0, + stats.bytes_written / 1048576.0, + (stats.bytes_written + stats.bytes_readnp1 + stats.bytes_readn) / + (double)stats.bytes_readn, + stats.bytes_written / (double)stats.bytes_readn, + status.ToString().c_str(), stats.num_input_records, + stats.num_dropped_records); return status; } @@ -4375,16 +4374,16 @@ Status DBImpl::DeleteFile(std::string name) { // Only the files in the last level can be deleted externally. // This is to make sure that any deletion tombstones are not // lost. 
Check that the level passed is the last level. + auto* vstoreage = cfd->current()->GetStorageInfo(); for (int i = level + 1; i < cfd->NumberLevels(); i++) { - if (cfd->current()->NumLevelFiles(i) != 0) { + if (vstoreage->NumLevelFiles(i) != 0) { Log(db_options_.info_log, "DeleteFile %s FAILED. File not in last level\n", name.c_str()); return Status::InvalidArgument("File not in last level"); } } // if level == 0, it has to be the oldest file - if (level == 0 && - cfd->current()->files_[0].back()->fd.GetNumber() != number) { + if (level == 0 && vstoreage->files_[0].back()->fd.GetNumber() != number) { return Status::InvalidArgument("File in level 0, but not oldest"); } edit.SetColumnFamily(cfd->GetID()); @@ -4637,9 +4636,9 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname, for (auto cfd : *impl->versions_->GetColumnFamilySet()) { if (cfd->ioptions()->compaction_style == kCompactionStyleUniversal || cfd->ioptions()->compaction_style == kCompactionStyleFIFO) { - Version* current = cfd->current(); - for (int i = 1; i < current->NumberLevels(); ++i) { - int num_files = current->NumLevelFiles(i); + auto* vstorage = cfd->current()->GetStorageInfo(); + for (int i = 1; i < vstorage->NumberLevels(); ++i) { + int num_files = vstorage->NumLevelFiles(i); if (num_files > 0) { s = Status::InvalidArgument( "Not all files are at level 0. Cannot " diff --git a/db/db_impl_debug.cc b/db/db_impl_debug.cc index 6c073d4d5..a7be59313 100644 --- a/db/db_impl_debug.cc +++ b/db/db_impl_debug.cc @@ -17,7 +17,8 @@ void DBImpl::TEST_PurgeObsoleteteWAL() { PurgeObsoleteWALFiles(); } uint64_t DBImpl::TEST_GetLevel0TotalSize() { MutexLock l(&mutex_); - return default_cf_handle_->cfd()->current()->NumLevelBytes(0); + return default_cf_handle_->cfd()->current()->GetStorageInfo()->NumLevelBytes( + 0); } Iterator* DBImpl::TEST_NewInternalIterator(Arena* arena, @@ -47,7 +48,7 @@ int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes( cfd = cfh->cfd(); } MutexLock l(&mutex_); - return cfd->current()->MaxNextLevelOverlappingBytes(); + return cfd->current()->GetStorageInfo()->MaxNextLevelOverlappingBytes(); } void DBImpl::TEST_GetFilesMetaData( @@ -58,7 +59,8 @@ void DBImpl::TEST_GetFilesMetaData( MutexLock l(&mutex_); metadata->resize(NumberLevels()); for (int level = 0; level < NumberLevels(); level++) { - const std::vector& files = cfd->current()->files_[level]; + const std::vector& files = + cfd->current()->GetStorageInfo()->LevelFiles(level); (*metadata)[level].clear(); for (const auto& f : files) { diff --git a/db/flush_job.cc b/db/flush_job.cc index c4eb12d3c..fda80cea8 100644 --- a/db/flush_job.cc +++ b/db/flush_job.cc @@ -202,8 +202,8 @@ Status FlushJob::WriteLevel0Table(const autovector& mems, if (base != nullptr && db_options_.max_background_compactions <= 1 && db_options_.max_background_flushes == 0 && cfd_->ioptions()->compaction_style == kCompactionStyleLevel) { - level = base->PickLevelForMemTableOutput(mutable_cf_options_, - min_user_key, max_user_key); + level = base->GetStorageInfo()->PickLevelForMemTableOutput( + mutable_cf_options_, min_user_key, max_user_key); } edit->AddFile(level, meta.fd.GetNumber(), meta.fd.GetPathId(), meta.fd.GetFileSize(), meta.smallest, meta.largest, diff --git a/db/forward_iterator.cc b/db/forward_iterator.cc index 04b5b3b34..88415e5b8 100644 --- a/db/forward_iterator.cc +++ b/db/forward_iterator.cc @@ -220,7 +220,8 @@ void ForwardIterator::SeekInternal(const Slice& internal_key, if (!seek_to_first) { user_key = ExtractUserKey(internal_key); } - const std::vector& 
l0 = sv_->current->LevelFiles(0); + VersionStorageInfo* vstorage = sv_->current->GetStorageInfo(); + const std::vector& l0 = vstorage->LevelFiles(0); for (uint32_t i = 0; i < l0.size(); ++i) { if (seek_to_first) { l0_iters_[i]->SeekToFirst(); @@ -248,9 +249,9 @@ void ForwardIterator::SeekInternal(const Slice& internal_key, int32_t search_left_bound = 0; int32_t search_right_bound = FileIndexer::kLevelMaxIndex; - for (int32_t level = 1; level < sv_->current->NumberLevels(); ++level) { + for (int32_t level = 1; level < vstorage->NumberLevels(); ++level) { const std::vector& level_files = - sv_->current->LevelFiles(level); + vstorage->LevelFiles(level); if (level_files.empty()) { search_left_bound = 0; search_right_bound = FileIndexer::kLevelMaxIndex; @@ -258,7 +259,7 @@ void ForwardIterator::SeekInternal(const Slice& internal_key, } assert(level_iters_[level - 1] != nullptr); uint32_t f_idx = 0; - const auto& indexer = sv_->current->GetIndexer(); + const auto& indexer = vstorage->GetIndexer(); if (!seek_to_first) { if (search_left_bound == search_right_bound) { f_idx = search_left_bound; @@ -428,15 +429,18 @@ void ForwardIterator::RebuildIterators(bool refresh_sv) { } mutable_iter_ = sv_->mem->NewIterator(read_options_, &arena_); sv_->imm->AddIterators(read_options_, &imm_iters_, &arena_); - const auto& l0_files = sv_->current->LevelFiles(0); + + auto* vstorage = sv_->current->GetStorageInfo(); + const auto& l0_files = vstorage->LevelFiles(0); l0_iters_.reserve(l0_files.size()); for (const auto* l0 : l0_files) { l0_iters_.push_back(cfd_->table_cache()->NewIterator( read_options_, *cfd_->soptions(), cfd_->internal_comparator(), l0->fd)); } - level_iters_.reserve(sv_->current->NumberLevels() - 1); - for (int32_t level = 1; level < sv_->current->NumberLevels(); ++level) { - const auto& level_files = sv_->current->LevelFiles(level); + level_iters_.reserve(vstorage->NumberLevels() - 1); + for (int32_t level = 1; level < vstorage->NumberLevels(); ++level) { + const auto& level_files = vstorage->LevelFiles(level); + if (level_files.empty()) { level_iters_.push_back(nullptr); } else { @@ -450,7 +454,7 @@ void ForwardIterator::RebuildIterators(bool refresh_sv) { } void ForwardIterator::ResetIncompleteIterators() { - const auto& l0_files = sv_->current->LevelFiles(0); + const auto& l0_files = sv_->current->GetStorageInfo()->LevelFiles(0); for (uint32_t i = 0; i < l0_iters_.size(); ++i) { assert(i < l0_files.size()); if (!l0_iters_[i]->status().IsIncomplete()) { diff --git a/db/internal_stats.cc b/db/internal_stats.cc index cfeb9c00d..ca0a8d62c 100644 --- a/db/internal_stats.cc +++ b/db/internal_stats.cc @@ -169,7 +169,8 @@ bool InternalStats::GetStringProperty(DBPropertyType property_type, const Slice& property, std::string* value) { assert(value != nullptr); - Version* current = cfd_->current(); + auto* current = cfd_->current(); + auto* vstorage = current->GetStorageInfo(); Slice in = property; switch (property_type) { @@ -182,7 +183,7 @@ bool InternalStats::GetStringProperty(DBPropertyType property_type, } else { char buf[100]; snprintf(buf, sizeof(buf), "%d", - current->NumLevelFiles(static_cast(level))); + vstorage->NumLevelFiles(static_cast(level))); *value = buf; return true; } @@ -196,8 +197,8 @@ bool InternalStats::GetStringProperty(DBPropertyType property_type, for (int level = 0; level < number_levels_; level++) { snprintf(buf, sizeof(buf), "%3d %8d %8.0f\n", level, - current->NumLevelFiles(level), - current->NumLevelBytes(level) / kMB); + vstorage->NumLevelFiles(level), + 
vstorage->NumLevelBytes(level) / kMB); value->append(buf); } return true; @@ -229,7 +230,7 @@ bool InternalStats::GetStringProperty(DBPropertyType property_type, bool InternalStats::GetIntProperty(DBPropertyType property_type, uint64_t* value, DBImpl* db) const { - Version* current = cfd_->current(); + auto* vstorage = cfd_->current()->GetStorageInfo(); switch (property_type) { case kNumImmutableMemTable: @@ -242,7 +243,7 @@ bool InternalStats::GetIntProperty(DBPropertyType property_type, case kCompactionPending: // 1 if the system already determines at least one compacdtion is needed. // 0 otherwise, - *value = (current->NeedsCompaction() ? 1 : 0); + *value = (vstorage->NeedsCompaction() ? 1 : 0); return true; case kBackgroundErrors: // Accumulated number of errors in background flushes or compactions. @@ -270,7 +271,7 @@ bool InternalStats::GetIntProperty(DBPropertyType property_type, // Use estimated entries in tables + total entries in memtables. *value = cfd_->mem()->GetNumEntries() + cfd_->imm()->current()->GetTotalNumEntries() + - current->GetEstimatedActiveKeys(); + vstorage->GetEstimatedActiveKeys(); return true; #ifndef ROCKSDB_LITE case kIsFileDeletionEnabled: @@ -365,24 +366,25 @@ void InternalStats::DumpDBStats(std::string* value) { } void InternalStats::DumpCFStats(std::string* value) { - Version* current = cfd_->current(); + VersionStorageInfo* vstorage = cfd_->current()->GetStorageInfo(); int num_levels_to_check = (cfd_->options()->compaction_style != kCompactionStyleUniversal && cfd_->options()->compaction_style != kCompactionStyleFIFO) - ? current->NumberLevels() - 1 + ? vstorage->NumberLevels() - 1 : 1; + // Compaction scores are sorted base on its value. Restore them to the // level order std::vector compaction_score(number_levels_, 0); for (int i = 0; i < num_levels_to_check; ++i) { - compaction_score[current->compaction_level_[i]] = - current->compaction_score_[i]; + compaction_score[vstorage->compaction_level_[i]] = + vstorage->compaction_score_[i]; } // Count # of files being compacted for each level std::vector files_being_compacted(number_levels_, 0); for (int level = 0; level < num_levels_to_check; ++level) { - for (auto* f : current->files_[level]) { + for (auto* f : vstorage->files_[level]) { if (f->being_compacted) { ++files_being_compacted[level]; } @@ -405,7 +407,7 @@ void InternalStats::DumpCFStats(std::string* value) { uint64_t total_stall_count = 0; double total_stall_us = 0; for (int level = 0; level < number_levels_; level++) { - int files = current->NumLevelFiles(level); + int files = vstorage->NumLevelFiles(level); total_files += files; total_files_being_compacted += files_being_compacted[level]; if (comp_stats_[level].micros > 0 || files > 0) { @@ -424,7 +426,7 @@ void InternalStats::DumpCFStats(std::string* value) { stall_leveln_slowdown_hard_[level]); stats_sum.Add(comp_stats_[level]); - total_file_size += current->NumLevelBytes(level); + total_file_size += vstorage->NumLevelBytes(level); total_stall_us += stall_us; total_stall_count += stalls; total_slowdown_soft += stall_leveln_slowdown_soft_[level]; @@ -439,10 +441,10 @@ void InternalStats::DumpCFStats(std::string* value) { double w_amp = (comp_stats_[level].bytes_readn == 0) ? 
0.0 : comp_stats_[level].bytes_written / static_cast(comp_stats_[level].bytes_readn); - PrintLevelStats(buf, sizeof(buf), "L" + std::to_string(level), - files, files_being_compacted[level], current->NumLevelBytes(level), - compaction_score[level], rw_amp, w_amp, stall_us, stalls, - comp_stats_[level]); + PrintLevelStats(buf, sizeof(buf), "L" + std::to_string(level), files, + files_being_compacted[level], + vstorage->NumLevelBytes(level), compaction_score[level], + rw_amp, w_amp, stall_us, stalls, comp_stats_[level]); value->append(buf); } } diff --git a/db/version_set.cc b/db/version_set.cc index 6a68c373e..0069ef6b0 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -305,6 +305,8 @@ class FilePicker { }; } // anonymous namespace +VersionStorageInfo::~VersionStorageInfo() { delete[] files_; } + Version::~Version() { assert(refs_ == 0); @@ -313,9 +315,9 @@ Version::~Version() { next_->prev_ = prev_; // Drop references to files - for (int level = 0; level < num_levels_; level++) { - for (size_t i = 0; i < files_[level].size(); i++) { - FileMetaData* f = files_[level][i]; + for (int level = 0; level < vstorage_.num_levels_; level++) { + for (size_t i = 0; i < vstorage_.files_[level].size(); i++) { + FileMetaData* f = vstorage_.files_[level][i]; assert(f->refs > 0); f->refs--; if (f->refs <= 0) { @@ -327,7 +329,6 @@ Version::~Version() { } } } - delete[] files_; } int FindFile(const InternalKeyComparator& icmp, @@ -564,8 +565,8 @@ Status Version::GetTableProperties(std::shared_ptr* tp, } Status Version::GetPropertiesOfAllTables(TablePropertiesCollection* props) { - for (int level = 0; level < num_levels_; level++) { - for (const auto& file_meta : files_[level]) { + for (int level = 0; level < vstorage_.num_levels_; level++) { + for (const auto& file_meta : vstorage_.files_[level]) { auto fname = TableFileName(vset_->db_options_->db_paths, file_meta->fd.GetNumber(), file_meta->fd.GetPathId()); @@ -586,7 +587,7 @@ Status Version::GetPropertiesOfAllTables(TablePropertiesCollection* props) { size_t Version::GetMemoryUsageByTableReaders() { size_t total_usage = 0; - for (auto& file_level : level_files_brief_) { + for (auto& file_level : vstorage_.level_files_brief_) { for (size_t i = 0; i < file_level.num_files; i++) { total_usage += cfd_->table_cache()->GetMemoryUsageByTableReader( vset_->env_options_, cfd_->internal_comparator(), @@ -596,7 +597,7 @@ size_t Version::GetMemoryUsageByTableReaders() { return total_usage; } -uint64_t Version::GetEstimatedActiveKeys() { +uint64_t VersionStorageInfo::GetEstimatedActiveKeys() { // Estimation will be not accurate when: // (1) there is merge keys // (2) keys are directly overwritten @@ -619,11 +620,11 @@ uint64_t Version::GetEstimatedActiveKeys() { void Version::AddIterators(const ReadOptions& read_options, const EnvOptions& soptions, MergeIteratorBuilder* merge_iter_builder) { - assert(finalized_); + assert(vstorage_.finalized_); // Merge all level zero files together since they may overlap - for (size_t i = 0; i < level_files_brief_[0].num_files; i++) { - const auto& file = level_files_brief_[0].files[i]; + for (size_t i = 0; i < vstorage_.level_files_brief_[0].num_files; i++) { + const auto& file = vstorage_.level_files_brief_[0].files[i]; merge_iter_builder->AddIterator(cfd_->table_cache()->NewIterator( read_options, soptions, cfd_->internal_comparator(), file.fd, nullptr, false, merge_iter_builder->GetArena())); @@ -632,50 +633,36 @@ void Version::AddIterators(const ReadOptions& read_options, // For levels > 0, we can use a concatenating 
iterator that sequentially // walks through the non-overlapping files in the level, opening them // lazily. - for (int level = 1; level < num_levels_; level++) { - if (level_files_brief_[level].num_files != 0) { + for (int level = 1; level < vstorage_.num_levels_; level++) { + if (vstorage_.level_files_brief_[level].num_files != 0) { merge_iter_builder->AddIterator(NewTwoLevelIterator( new LevelFileIteratorState( cfd_->table_cache(), read_options, soptions, cfd_->internal_comparator(), false /* for_compaction */, cfd_->ioptions()->prefix_extractor != nullptr), new LevelFileNumIterator(cfd_->internal_comparator(), - &level_files_brief_[level]), merge_iter_builder->GetArena())); + &vstorage_.level_files_brief_[level]), + merge_iter_builder->GetArena())); } } } - - -Version::Version(ColumnFamilyData* cfd, VersionSet* vset, - uint64_t version_number) - : cfd_(cfd), - internal_comparator_((cfd == nullptr) ? nullptr - : &cfd->internal_comparator()), - user_comparator_( - (cfd == nullptr) ? nullptr : internal_comparator_->user_comparator()), - table_cache_((cfd == nullptr) ? nullptr : cfd->table_cache()), - merge_operator_((cfd == nullptr) ? nullptr - : cfd->ioptions()->merge_operator), - info_log_((cfd == nullptr) ? nullptr : cfd->ioptions()->info_log), - db_statistics_((cfd == nullptr) ? nullptr - : cfd->ioptions()->statistics), +VersionStorageInfo::VersionStorageInfo( + const InternalKeyComparator* internal_comparator, + const Comparator* user_comparator, int num_levels, + CompactionStyle compaction_style, VersionStorageInfo* ref_vstorage) + : internal_comparator_(internal_comparator), + user_comparator_(user_comparator), // cfd is nullptr if Version is dummy - num_levels_(cfd == nullptr ? 0 : cfd->NumberLevels()), + num_levels_(num_levels), num_non_empty_levels_(num_levels_), - file_indexer_(cfd == nullptr - ? nullptr - : cfd->internal_comparator().user_comparator()), - vset_(vset), - next_(this), - prev_(this), - refs_(0), + file_indexer_(user_comparator), + compaction_style_(compaction_style), files_(new std::vector[num_levels_]), files_by_size_(num_levels_), next_file_to_compact_by_size_(num_levels_), compaction_score_(num_levels_), compaction_level_(num_levels_), - version_number_(version_number), accumulated_file_size_(0), accumulated_raw_key_size_(0), accumulated_raw_value_size_(0), @@ -683,18 +670,39 @@ Version::Version(ColumnFamilyData* cfd, VersionSet* vset, accumulated_num_deletions_(0), num_samples_(0), finalized_(false) { - if (cfd != nullptr && cfd->current() != nullptr) { - accumulated_file_size_ = cfd->current()->accumulated_file_size_; - accumulated_raw_key_size_ = cfd->current()->accumulated_raw_key_size_; - accumulated_raw_value_size_ = - cfd->current()->accumulated_raw_value_size_; - accumulated_num_non_deletions_ = - cfd->current()->accumulated_num_non_deletions_; - accumulated_num_deletions_ = cfd->current()->accumulated_num_deletions_; - num_samples_ = cfd->current()->num_samples_; + if (ref_vstorage != nullptr) { + accumulated_file_size_ = ref_vstorage->accumulated_file_size_; + accumulated_raw_key_size_ = ref_vstorage->accumulated_raw_key_size_; + accumulated_raw_value_size_ = ref_vstorage->accumulated_raw_value_size_; + accumulated_num_non_deletions_ = + ref_vstorage->accumulated_num_non_deletions_; + accumulated_num_deletions_ = ref_vstorage->accumulated_num_deletions_; + num_samples_ = ref_vstorage->num_samples_; } } +Version::Version(ColumnFamilyData* cfd, VersionSet* vset, + uint64_t version_number) + : cfd_(cfd), + info_log_((cfd == nullptr) ? 
nullptr : cfd->ioptions()->info_log), + db_statistics_((cfd == nullptr) ? nullptr : cfd->ioptions()->statistics), + table_cache_((cfd == nullptr) ? nullptr : cfd->table_cache()), + merge_operator_((cfd == nullptr) ? nullptr + : cfd->ioptions()->merge_operator), + vstorage_((cfd == nullptr) ? nullptr : &cfd->internal_comparator(), + (cfd == nullptr) ? nullptr : cfd->user_comparator(), + cfd == nullptr ? 0 : cfd->NumberLevels(), + cfd == nullptr ? kCompactionStyleLevel + : cfd->ioptions()->compaction_style, + (cfd == nullptr || cfd->current() == nullptr) + ? nullptr + : cfd->current()->GetStorageInfo()), + vset_(vset), + next_(this), + prev_(this), + refs_(0), + version_number_(version_number) {} + void Version::Get(const ReadOptions& read_options, const LookupKey& k, std::string* value, @@ -706,16 +714,17 @@ void Version::Get(const ReadOptions& read_options, assert(status->ok() || status->IsMergeInProgress()); - GetContext get_context(user_comparator_, merge_operator_, info_log_, - db_statistics_, status->ok() ? GetContext::kNotFound : GetContext::kMerge, - user_key, value, value_found, merge_context); + GetContext get_context( + GetUserComparator(), merge_operator_, info_log_, db_statistics_, + status->ok() ? GetContext::kNotFound : GetContext::kMerge, user_key, + value, value_found, merge_context); - FilePicker fp(files_, user_key, ikey, &level_files_brief_, - num_non_empty_levels_, &file_indexer_, user_comparator_, - internal_comparator_); + FilePicker fp(vstorage_.files_, user_key, ikey, &vstorage_.level_files_brief_, + vstorage_.num_non_empty_levels_, &vstorage_.file_indexer_, + GetUserComparator(), GetInternalComparator()); FdWithKeyRange* f = fp.GetNextFile(); while (f != nullptr) { - *status = table_cache_->Get(read_options, *internal_comparator_, f->fd, + *status = table_cache_->Get(read_options, *GetInternalComparator(), f->fd, ikey, &get_context); // TODO: examine the behavior for corrupted key if (!status->ok()) { @@ -763,7 +772,7 @@ void Version::Get(const ReadOptions& read_options, } } -void Version::GenerateLevelFilesBrief() { +void VersionStorageInfo::GenerateLevelFilesBrief() { level_files_brief_.resize(num_non_empty_levels_); for (int level = 0; level < num_non_empty_levels_; level++) { DoGenerateLevelFilesBrief( @@ -774,11 +783,13 @@ void Version::GenerateLevelFilesBrief() { void Version::PrepareApply(const MutableCFOptions& mutable_cf_options, std::vector& size_being_compacted) { UpdateAccumulatedStats(); - ComputeCompactionScore(mutable_cf_options, size_being_compacted); - UpdateFilesBySize(); - UpdateNumNonEmptyLevels(); - file_indexer_.UpdateIndex(&arena_, num_non_empty_levels_, files_); - GenerateLevelFilesBrief(); + vstorage_.ComputeCompactionScore(mutable_cf_options, + cfd_->ioptions()->compaction_options_fifo, + size_being_compacted); + vstorage_.UpdateFilesBySize(); + vstorage_.UpdateNumNonEmptyLevels(); + vstorage_.GenerateFileIndexer(); + vstorage_.GenerateLevelFilesBrief(); } bool Version::MaybeInitializeFileMetaData(FileMetaData* file_meta) { @@ -804,7 +815,7 @@ bool Version::MaybeInitializeFileMetaData(FileMetaData* file_meta) { return true; } -void Version::UpdateAccumulatedStats(FileMetaData* file_meta) { +void VersionStorageInfo::UpdateAccumulatedStats(FileMetaData* file_meta) { assert(file_meta->init_stats_from_file); accumulated_file_size_ += file_meta->fd.GetFileSize(); accumulated_raw_key_size_ += file_meta->raw_key_size; @@ -816,8 +827,6 @@ void Version::UpdateAccumulatedStats(FileMetaData* file_meta) { } void Version::UpdateAccumulatedStats() { - 
static const int kDeletionWeightOnCompaction = 2; - // maximum number of table properties loaded from files. const int kMaxInitCount = 20; int init_count = 0; @@ -832,11 +841,11 @@ void Version::UpdateAccumulatedStats() { // will be triggered, which creates higher-level files whose num_deletions // will be updated here. for (int level = 0; - level < num_levels_ && init_count < kMaxInitCount; ++level) { - for (auto* file_meta : files_[level]) { + level < vstorage_.num_levels_ && init_count < kMaxInitCount; ++level) { + for (auto* file_meta : vstorage_.files_[level]) { if (MaybeInitializeFileMetaData(file_meta)) { // each FileMeta will be initialized only once. - UpdateAccumulatedStats(file_meta); + vstorage_.UpdateAccumulatedStats(file_meta); if (++init_count >= kMaxInitCount) { break; } @@ -846,16 +855,21 @@ void Version::UpdateAccumulatedStats() { // In case all sampled-files contain only deletion entries, then we // load the table-property of a file in higher-level to initialize // that value. - for (int level = num_levels_ - 1; - accumulated_raw_value_size_ == 0 && level >= 0; --level) { - for (int i = static_cast(files_[level].size()) - 1; - accumulated_raw_value_size_ == 0 && i >= 0; --i) { - if (MaybeInitializeFileMetaData(files_[level][i])) { - UpdateAccumulatedStats(files_[level][i]); + for (int level = vstorage_.num_levels_ - 1; + vstorage_.accumulated_raw_value_size_ == 0 && level >= 0; --level) { + for (int i = static_cast(vstorage_.files_[level].size()) - 1; + vstorage_.accumulated_raw_value_size_ == 0 && i >= 0; --i) { + if (MaybeInitializeFileMetaData(vstorage_.files_[level][i])) { + vstorage_.UpdateAccumulatedStats(vstorage_.files_[level][i]); } } } + vstorage_.ComputeCompensatedSizes(); +} + +void VersionStorageInfo::ComputeCompensatedSizes() { + static const int kDeletionWeightOnCompaction = 2; uint64_t average_value_size = GetAverageValueSize(); // compute the compensated size @@ -872,15 +886,21 @@ void Version::UpdateAccumulatedStats() { } } -void Version::ComputeCompactionScore( +int VersionStorageInfo::MaxInputLevel() const { + if (compaction_style_ == kCompactionStyleLevel) { + return NumberLevels() - 2; + } + return 0; +} + +void VersionStorageInfo::ComputeCompactionScore( const MutableCFOptions& mutable_cf_options, + const CompactionOptionsFIFO& compaction_options_fifo, std::vector& size_being_compacted) { double max_score = 0; int max_score_level = 0; - int max_input_level = - cfd_->compaction_picker()->MaxInputLevel(NumberLevels()); - for (int level = 0; level <= max_input_level; level++) { + for (int level = 0; level <= MaxInputLevel(); level++) { double score; if (level == 0) { // We treat level-0 specially by bounding the number of files @@ -902,9 +922,9 @@ void Version::ComputeCompactionScore( numfiles++; } } - if (cfd_->ioptions()->compaction_style == kCompactionStyleFIFO) { + if (compaction_style_ == kCompactionStyleFIFO) { score = static_cast(total_size) / - cfd_->ioptions()->compaction_options_fifo.max_table_files_size; + compaction_options_fifo.max_table_files_size; } else if (numfiles >= mutable_cf_options.level0_stop_writes_trigger) { // If we are slowing down writes, then we better compact that first score = 1000000; @@ -967,7 +987,7 @@ bool CompareCompensatedSizeDescending(const Fsize& first, const Fsize& second) { } // anonymous namespace -void Version::UpdateNumNonEmptyLevels() { +void VersionStorageInfo::UpdateNumNonEmptyLevels() { num_non_empty_levels_ = num_levels_; for (int i = num_levels_ - 1; i >= 0; i--) { if (files_[i].size() != 0) { @@ 
-978,9 +998,9 @@ void Version::UpdateNumNonEmptyLevels() { } } -void Version::UpdateFilesBySize() { - if (cfd_->ioptions()->compaction_style == kCompactionStyleFIFO || - cfd_->ioptions()->compaction_style == kCompactionStyleUniversal) { +void VersionStorageInfo::UpdateFilesBySize() { + if (compaction_style_ == kCompactionStyleFIFO || + compaction_style_ == kCompactionStyleUniversal) { // don't need this return; } @@ -997,8 +1017,8 @@ void Version::UpdateFilesBySize() { temp[i].file = files[i]; } - // sort the top kNumberFilesToSort based on file size - size_t num = Version::kNumberFilesToSort; + // sort the top number_of_files_to_sort_ based on file size + size_t num = VersionStorageInfo::kNumberFilesToSort; if (num > temp.size()) { num = temp.size(); } @@ -1029,7 +1049,7 @@ bool Version::Unref() { return false; } -bool Version::NeedsCompaction() const { +bool VersionStorageInfo::NeedsCompaction() const { // In universal compaction case, this check doesn't really // check the compaction condition, but checks num of files threshold // only. We are not going to miss any compaction opportunity @@ -1037,10 +1057,7 @@ bool Version::NeedsCompaction() const { // ending up with nothing to do. We can improve it later. // TODO(sdong): improve this function to be accurate for universal // compactions. - int max_input_level = - cfd_->compaction_picker()->MaxInputLevel(NumberLevels()); - - for (int i = 0; i <= max_input_level; i++) { + for (int i = 0; i <= MaxInputLevel(); i++) { if (compaction_score_[i] >= 1) { return true; } @@ -1048,17 +1065,16 @@ bool Version::NeedsCompaction() const { return false; } -bool Version::OverlapInLevel(int level, - const Slice* smallest_user_key, - const Slice* largest_user_key) { - return SomeFileOverlapsRange(cfd_->internal_comparator(), (level > 0), +bool VersionStorageInfo::OverlapInLevel(int level, + const Slice* smallest_user_key, + const Slice* largest_user_key) { + return SomeFileOverlapsRange(*internal_comparator_, (level > 0), level_files_brief_[level], smallest_user_key, largest_user_key); } -int Version::PickLevelForMemTableOutput( - const MutableCFOptions& mutable_cf_options, - const Slice& smallest_user_key, +int VersionStorageInfo::PickLevelForMemTableOutput( + const MutableCFOptions& mutable_cf_options, const Slice& smallest_user_key, const Slice& largest_user_key) { int level = 0; if (!OverlapInLevel(0, &smallest_user_key, &largest_user_key)) { @@ -1092,12 +1108,9 @@ int Version::PickLevelForMemTableOutput( // If hint_index is specified, then it points to a file in the // overlapping range. // The file_index returns a pointer to any file in an overlapping range. 
-void Version::GetOverlappingInputs(int level, - const InternalKey* begin, - const InternalKey* end, - std::vector* inputs, - int hint_index, - int* file_index) { +void VersionStorageInfo::GetOverlappingInputs( + int level, const InternalKey* begin, const InternalKey* end, + std::vector* inputs, int hint_index, int* file_index) { inputs->clear(); Slice user_begin, user_end; if (begin != nullptr) { @@ -1109,7 +1122,7 @@ void Version::GetOverlappingInputs(int level, if (file_index) { *file_index = -1; } - const Comparator* user_cmp = cfd_->internal_comparator().user_comparator(); + const Comparator* user_cmp = user_comparator_; if (begin != nullptr && end != nullptr && level > 0) { GetOverlappingInputsBinarySearch(level, user_begin, user_end, inputs, hint_index, file_index); @@ -1149,19 +1162,15 @@ void Version::GetOverlappingInputs(int level, // Employ binary search to find at least one file that overlaps the // specified range. From that file, iterate backwards and // forwards to find all overlapping files. -void Version::GetOverlappingInputsBinarySearch( - int level, - const Slice& user_begin, - const Slice& user_end, - std::vector* inputs, - int hint_index, - int* file_index) { +void VersionStorageInfo::GetOverlappingInputsBinarySearch( + int level, const Slice& user_begin, const Slice& user_end, + std::vector* inputs, int hint_index, int* file_index) { assert(level > 0); int min = 0; int mid = 0; int max = files_[level].size() -1; bool foundOverlap = false; - const Comparator* user_cmp = cfd_->internal_comparator().user_comparator(); + const Comparator* user_cmp = user_comparator_; // if the caller already knows the index of a file that has overlap, // then we can skip the binary search. @@ -1200,15 +1209,12 @@ void Version::GetOverlappingInputsBinarySearch( // The midIndex specifies the index of at least one file that // overlaps the specified range. From that file, iterate backward // and forward to find all overlapping files. -// Use LevelFilesBrief in searching, make it faster -void Version::ExtendOverlappingInputs( - int level, - const Slice& user_begin, - const Slice& user_end, - std::vector* inputs, - unsigned int midIndex) { - - const Comparator* user_cmp = cfd_->internal_comparator().user_comparator(); +// Use LevelFilesBrief in searching, make it faster +void VersionStorageInfo::ExtendOverlappingInputs( + int level, const Slice& user_begin, const Slice& user_end, + std::vector* inputs, unsigned int midIndex) { + + const Comparator* user_cmp = user_comparator_; const FdWithKeyRange* files = level_files_brief_[level].files; #ifndef NDEBUG { @@ -1264,9 +1270,8 @@ void Version::ExtendOverlappingInputs( // an overlapping user key to the file "just outside" of it (i.e. // just after the last file, or just before the first file) // REQUIRES: "*inputs" is a sorted list of non-overlapping files -bool Version::HasOverlappingUserKey( - const std::vector* inputs, - int level) { +bool VersionStorageInfo::HasOverlappingUserKey( + const std::vector* inputs, int level) { // If inputs empty, there is no overlap. // If level == 0, it is assumed that all needed files were already included.
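The overlap-search helpers above keep their algorithm when they move to VersionStorageInfo; only the owning class and the way they reach the user comparator change (user_comparator_ is now a stored member instead of going through cfd_->internal_comparator().user_comparator()). As a rough, self-contained sketch of what GetOverlappingInputs computes for a level whose files are key-sorted and mutually non-overlapping (level > 0), the snippet below uses std::string keys and a FileRange stand-in for FileMetaData; it is not the patch's code, and the nullptr begin/end and level-0 paths are omitted.

// Simplified sketch (assumed names, not RocksDB code): find all files in a
// sorted, non-overlapping level whose key range intersects [begin, end].
#include <algorithm>
#include <string>
#include <vector>

struct FileRange {
  std::string smallest;  // smallest user key in the file
  std::string largest;   // largest user key in the file
};

// Because the files are sorted and disjoint, the overlapping files form one
// contiguous run: binary-search for the first candidate, then extend forward.
std::vector<size_t> OverlappingInputsSketch(
    const std::vector<FileRange>& level_files, const std::string& begin,
    const std::string& end) {
  std::vector<size_t> result;
  // First file whose largest key is >= begin; earlier files end before the range.
  auto first = std::lower_bound(
      level_files.begin(), level_files.end(), begin,
      [](const FileRange& f, const std::string& key) { return f.largest < key; });
  // Keep taking files while they still start at or before the end of the range.
  for (auto it = first; it != level_files.end() && it->smallest <= end; ++it) {
    result.push_back(static_cast<size_t>(it - level_files.begin()));
  }
  return result;
}

The real routines additionally take a hint_index so the binary search can be skipped when the caller already knows one overlapping file.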
@@ -1274,13 +1279,13 @@ bool Version::HasOverlappingUserKey( return false; } - const Comparator* user_cmp = cfd_->internal_comparator().user_comparator(); - const LevelFilesBrief& file_level = level_files_brief_[level]; + const Comparator* user_cmp = user_comparator_; + const rocksdb::LevelFilesBrief& file_level = level_files_brief_[level]; const FdWithKeyRange* files = level_files_brief_[level].files; const size_t kNumFiles = file_level.num_files; // Check the last file in inputs against the file after it - size_t last_file = FindFile(cfd_->internal_comparator(), file_level, + size_t last_file = FindFile(*internal_comparator_, file_level, inputs->back()->largest.Encode()); assert(last_file < kNumFiles); // File should exist! if (last_file < kNumFiles-1) { // If not the last file @@ -1295,7 +1300,7 @@ bool Version::HasOverlappingUserKey( } // Check the first file in inputs against the file just before it - size_t first_file = FindFile(cfd_->internal_comparator(), file_level, + size_t first_file = FindFile(*internal_comparator_, file_level, inputs->front()->smallest.Encode()); assert(first_file <= last_file); // File should exist! if (first_file > 0) { // If not first file @@ -1312,13 +1317,14 @@ bool Version::HasOverlappingUserKey( return false; } -uint64_t Version::NumLevelBytes(int level) const { +uint64_t VersionStorageInfo::NumLevelBytes(int level) const { assert(level >= 0); assert(level < NumberLevels()); return TotalFileSize(files_[level]); } -const char* Version::LevelSummary(LevelSummaryStorage* scratch) const { +const char* VersionStorageInfo::LevelSummary( + LevelSummaryStorage* scratch) const { int len = snprintf(scratch->buffer, sizeof(scratch->buffer), "files["); for (int i = 0; i < NumberLevels(); i++) { int sz = sizeof(scratch->buffer) - len; @@ -1334,8 +1340,8 @@ const char* Version::LevelSummary(LevelSummaryStorage* scratch) const { return scratch->buffer; } -const char* Version::LevelFileSummary(FileSummaryStorage* scratch, - int level) const { +const char* VersionStorageInfo::LevelFileSummary(FileSummaryStorage* scratch, + int level) const { int len = snprintf(scratch->buffer, sizeof(scratch->buffer), "files_size["); for (const auto& f : files_[level]) { int sz = sizeof(scratch->buffer) - len; @@ -1357,7 +1363,7 @@ const char* Version::LevelFileSummary(FileSummaryStorage* scratch, return scratch->buffer; } -int64_t Version::MaxNextLevelOverlappingBytes() { +int64_t VersionStorageInfo::MaxNextLevelOverlappingBytes() { uint64_t result = 0; std::vector overlaps; for (int level = 1; level < NumberLevels() - 1; level++) { @@ -1373,8 +1379,8 @@ int64_t Version::MaxNextLevelOverlappingBytes() { } void Version::AddLiveFiles(std::vector* live) { - for (int level = 0; level < NumberLevels(); level++) { - const std::vector& files = files_[level]; + for (int level = 0; level < vstorage_.NumberLevels(); level++) { + const std::vector& files = vstorage_.files_[level]; for (const auto& file : files) { live->push_back(file->fd); } @@ -1383,7 +1389,7 @@ void Version::AddLiveFiles(std::vector* live) { std::string Version::DebugString(bool hex) const { std::string r; - for (int level = 0; level < num_levels_; level++) { + for (int level = 0; level < vstorage_.num_levels_; level++) { // E.g., // --- level 1 --- // 17:123['a' .. 
'd'] @@ -1393,7 +1399,7 @@ std::string Version::DebugString(bool hex) const { r.append(" --- version# "); AppendNumberTo(&r, version_number_); r.append(" ---\n"); - const std::vector& files = files_[level]; + const std::vector& files = vstorage_.files_[level]; for (size_t i = 0; i < files.size(); i++) { r.push_back(' '); AppendNumberTo(&r, files[i]->fd.GetNumber()); @@ -1427,7 +1433,7 @@ struct VersionSet::ManifestWriter { // Versions that contain full copies of the intermediate state. class VersionSet::Builder { private: - // Helper to sort v->files_ + // Helper to sort files_ in v // kLevel0 -- NewestFirstBySeqNo // kLevelNon0 -- BySmallestKey struct FileComparator { @@ -1464,19 +1470,21 @@ class VersionSet::Builder { public: Builder(ColumnFamilyData* cfd) : cfd_(cfd), base_(cfd->current()) { base_->Ref(); - levels_ = new LevelState[base_->NumberLevels()]; + levels_ = new LevelState[base_->GetStorageInfo()->NumberLevels()]; level_zero_cmp_.sort_method = FileComparator::kLevel0; level_nonzero_cmp_.sort_method = FileComparator::kLevelNon0; level_nonzero_cmp_.internal_comparator = &cfd->internal_comparator(); levels_[0].added_files = new FileSet(level_zero_cmp_); - for (int level = 1; level < base_->NumberLevels(); level++) { + for (int level = 1; level < base_->GetStorageInfo()->NumberLevels(); + level++) { levels_[level].added_files = new FileSet(level_nonzero_cmp_); } } ~Builder() { - for (int level = 0; level < base_->NumberLevels(); level++) { + for (int level = 0; level < base_->GetStorageInfo()->NumberLevels(); + level++) { const FileSet* added = levels_[level].added_files; std::vector to_unref; to_unref.reserve(added->size()); @@ -1505,10 +1513,11 @@ class VersionSet::Builder { void CheckConsistency(Version* v) { #ifndef NDEBUG // make sure the files are sorted correctly - for (int level = 0; level < v->NumberLevels(); level++) { - for (size_t i = 1; i < v->files_[level].size(); i++) { - auto f1 = v->files_[level][i - 1]; - auto f2 = v->files_[level][i]; + auto* files = v->GetFiles(); + for (int level = 0; level < v->GetStorageInfo()->NumberLevels(); level++) { + for (size_t i = 1; i < files[level].size(); i++) { + auto f1 = files[level][i - 1]; + auto f2 = files[level][i]; if (level == 0) { assert(level_zero_cmp_(f1, f2)); assert(f1->largest_seqno > f2->largest_seqno); @@ -1534,8 +1543,10 @@ class VersionSet::Builder { #ifndef NDEBUG // a file to be deleted better exist in the previous version bool found = false; - for (int l = 0; !found && l < base_->NumberLevels(); l++) { - const std::vector& base_files = base_->files_[l]; + auto* files = base_->GetFiles(); + for (int l = 0; !found && l < base_->GetStorageInfo()->NumberLevels(); + l++) { + const std::vector& base_files = files[l]; for (unsigned int i = 0; i < base_files.size(); i++) { FileMetaData* f = base_files[i]; if (f->fd.GetNumber() == number) { @@ -1547,7 +1558,8 @@ class VersionSet::Builder { // if the file did not exist in the previous version, then it // is possibly moved from lower level to higher level in current // version - for (int l = level+1; !found && l < base_->NumberLevels(); l++) { + for (int l = level + 1; + !found && l < base_->GetStorageInfo()->NumberLevels(); l++) { const FileSet* added = levels_[l].added_files; for (FileSet::const_iterator added_iter = added->begin(); added_iter != added->end(); ++added_iter) { @@ -1607,15 +1619,17 @@ class VersionSet::Builder { CheckConsistency(base_); CheckConsistency(v); - for (int level = 0; level < base_->NumberLevels(); level++) { + auto* out_files = v->GetFiles(); 
+ for (int level = 0; level < base_->GetStorageInfo()->NumberLevels(); + level++) { const auto& cmp = (level == 0) ? level_zero_cmp_ : level_nonzero_cmp_; // Merge the set of added files with the set of pre-existing files. // Drop any deleted files. Store the result in *v. - const auto& base_files = base_->files_[level]; + const auto& base_files = base_->GetStorageInfo()->LevelFiles(level); auto base_iter = base_files.begin(); auto base_end = base_files.end(); const auto& added_files = *levels_[level].added_files; - v->files_[level].reserve(base_files.size() + added_files.size()); + out_files[level].reserve(base_files.size() + added_files.size()); for (const auto& added : added_files) { // Add all smaller files listed in base_ @@ -1642,7 +1656,7 @@ class VersionSet::Builder { for (auto& file_meta : *(levels_[level].added_files)) { assert (!file_meta->table_reader_handle); cfd_->table_cache()->FindTable( - base_->vset_->env_options_, cfd_->internal_comparator(), + base_->GetVersionSet()->env_options_, cfd_->internal_comparator(), file_meta->fd, &file_meta->table_reader_handle, false); if (file_meta->table_reader_handle != nullptr) { // Load table_reader @@ -1658,14 +1672,16 @@ class VersionSet::Builder { if (levels_[level].deleted_files.count(f->fd.GetNumber()) > 0) { // File is deleted: do nothing } else { - auto* files = &v->files_[level]; - if (level > 0 && !files->empty()) { + auto* files = v->GetFiles(); + auto* level_files = &files[level]; + if (level > 0 && !level_files->empty()) { // Must not overlap assert(cfd_->internal_comparator().Compare( - (*files)[files->size() - 1]->largest, f->smallest) < 0); + (*level_files)[level_files->size() - 1]->largest, + f->smallest) < 0); } f->refs++; - files->push_back(f); + level_files->push_back(f); } } }; @@ -1701,7 +1717,7 @@ VersionSet::~VersionSet() { void VersionSet::AppendVersion(ColumnFamilyData* column_family_data, Version* v) { // Mark v finalized - v->finalized_ = true; + v->vstorage_.SetFinalized(); // Make "v" current assert(v->refs_ == 0); @@ -1812,7 +1828,7 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data, { std::vector size_being_compacted; if (!edit->IsColumnFamilyManipulation()) { - size_being_compacted.resize(v->NumberLevels() - 1); + size_being_compacted.resize(v->GetStorageInfo()->NumberLevels() - 1); // calculate the amount of data being compacted at every level column_family_data->compaction_picker()->SizeBeingCompacted( size_being_compacted); @@ -2172,7 +2188,8 @@ Status VersionSet::Recover( cfd = column_family_set_->GetColumnFamily(edit.column_family_); // this should never happen since cf_in_builders is true assert(cfd != nullptr); - if (edit.max_level_ >= cfd->current()->NumberLevels()) { + if (edit.max_level_ >= + cfd->current()->GetStorageInfo()->NumberLevels()) { s = Status::InvalidArgument( "db has more levels than options.num_levels"); break; @@ -2275,7 +2292,8 @@ Status VersionSet::Recover( builder->SaveTo(v); // Install recovered version - std::vector size_being_compacted(v->NumberLevels() - 1); + std::vector size_being_compacted( + v->GetStorageInfo()->NumberLevels() - 1); cfd->compaction_picker()->SizeBeingCompacted(size_being_compacted); v->PrepareApply(*cfd->GetLatestMutableCFOptions(), size_being_compacted); AppendVersion(cfd, v); @@ -2407,7 +2425,8 @@ Status VersionSet::ReduceNumberOfLevels(const std::string& dbname, Version* current_version = versions.GetColumnFamilySet()->GetDefault()->current(); - int current_levels = current_version->NumberLevels(); + auto* vstorage = 
current_version->GetStorageInfo(); + int current_levels = vstorage->NumberLevels(); if (current_levels <= new_levels) { return Status::OK(); @@ -2418,7 +2437,7 @@ Status VersionSet::ReduceNumberOfLevels(const std::string& dbname, int first_nonempty_level = -1; int first_nonempty_level_filenum = 0; for (int i = new_levels - 1; i < current_levels; i++) { - int file_num = current_version->NumLevelFiles(i); + int file_num = vstorage->NumLevelFiles(i); if (file_num != 0) { if (first_nonempty_level < 0) { first_nonempty_level = i; @@ -2435,7 +2454,7 @@ Status VersionSet::ReduceNumberOfLevels(const std::string& dbname, } } - std::vector* old_files_list = current_version->files_; + std::vector* old_files_list = vstorage->GetFiles(); // we need to allocate an array with the old number of levels size to // avoid SIGSEGV in WriteSnapshot() // however, all levels bigger or equal to new_levels will be empty @@ -2449,9 +2468,9 @@ Status VersionSet::ReduceNumberOfLevels(const std::string& dbname, new_files_list[new_levels - 1] = old_files_list[first_nonempty_level]; } - delete[] current_version->files_; - current_version->files_ = new_files_list; - current_version->num_levels_ = new_levels; + delete[] vstorage -> files_; + vstorage->files_ = new_files_list; + vstorage->num_levels_ = new_levels; MutableCFOptions mutable_cf_options(*options, ImmutableCFOptions(*options)); VersionEdit ve; @@ -2609,7 +2628,8 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname, Version* v = new Version(cfd, this, current_version_number_++); builder->SaveTo(v); - std::vector size_being_compacted(v->NumberLevels() - 1); + std::vector size_being_compacted( + v->GetStorageInfo()->NumberLevels() - 1); cfd->compaction_picker()->SizeBeingCompacted(size_being_compacted); v->PrepareApply(*cfd->GetLatestMutableCFOptions(), size_being_compacted); delete builder; @@ -2686,7 +2706,8 @@ Status VersionSet::WriteSnapshot(log::Writer* log) { edit.SetColumnFamily(cfd->GetID()); for (int level = 0; level < cfd->NumberLevels(); level++) { - for (const auto& f : cfd->current()->files_[level]) { + auto* files = cfd->current()->GetFiles(); + for (const auto& f : files[level]) { edit.AddFile(level, f->fd.GetNumber(), f->fd.GetPathId(), f->fd.GetFileSize(), f->smallest, f->largest, f->smallest_seqno, f->largest_seqno); @@ -2741,8 +2762,9 @@ bool VersionSet::ManifestContains(uint64_t manifest_file_number, uint64_t VersionSet::ApproximateOffsetOf(Version* v, const InternalKey& ikey) { uint64_t result = 0; - for (int level = 0; level < v->NumberLevels(); level++) { - const std::vector& files = v->files_[level]; + auto* vstorage = v->GetStorageInfo(); + for (int level = 0; level < vstorage->NumberLevels(); level++) { + const std::vector& files = vstorage->LevelFiles(level); for (size_t i = 0; i < files.size(); i++) { if (v->cfd_->internal_comparator().Compare(files[i]->largest, ikey) <= 0) { @@ -2781,8 +2803,9 @@ void VersionSet::AddLiveFiles(std::vector* live_list) { Version* dummy_versions = cfd->dummy_versions(); for (Version* v = dummy_versions->next_; v != dummy_versions; v = v->next_) { - for (int level = 0; level < v->NumberLevels(); level++) { - total_files += v->files_[level].size(); + auto* vstorage = v->GetStorageInfo(); + for (int level = 0; level < vstorage->NumberLevels(); level++) { + total_files += vstorage->LevelFiles(level).size(); } } } @@ -2794,8 +2817,9 @@ void VersionSet::AddLiveFiles(std::vector* live_list) { Version* dummy_versions = cfd->dummy_versions(); for (Version* v = dummy_versions->next_; v != 
dummy_versions; v = v->next_) { - for (int level = 0; level < v->NumberLevels(); level++) { - for (const auto& f : v->files_[level]) { + auto* vstorage = v->GetStorageInfo(); + for (int level = 0; level < vstorage->NumberLevels(); level++) { + for (const auto& f : vstorage->LevelFiles(level)) { live_list->push_back(f->fd); } } @@ -2851,6 +2875,7 @@ Iterator* VersionSet::MakeInputIterator(Compaction* c) { bool VersionSet::VerifyCompactionFileConsistency(Compaction* c) { #ifndef NDEBUG Version* version = c->column_family_data()->current(); + VersionStorageInfo* vstorage = version->GetStorageInfo(); if (c->input_version() != version) { Log(db_options_->info_log, "[%s] VerifyCompactionFileConsistency version mismatch", @@ -2864,8 +2889,8 @@ bool VersionSet::VerifyCompactionFileConsistency(Compaction* c) { // look for this file in the current version bool found = false; - for (unsigned int j = 0; j < version->files_[level].size(); j++) { - FileMetaData* f = version->files_[level][j]; + for (unsigned int j = 0; j < vstorage->files_[level].size(); j++) { + FileMetaData* f = vstorage->files_[level][j]; if (f->fd.GetNumber() == number) { found = true; break; @@ -2882,8 +2907,8 @@ bool VersionSet::VerifyCompactionFileConsistency(Compaction* c) { // look for this file in the current version bool found = false; - for (unsigned int j = 0; j < version->files_[level].size(); j++) { - FileMetaData* f = version->files_[level][j]; + for (unsigned int j = 0; j < vstorage->files_[level].size(); j++) { + FileMetaData* f = vstorage->files_[level][j]; if (f->fd.GetNumber() == number) { found = true; break; @@ -2902,8 +2927,9 @@ Status VersionSet::GetMetadataForFile(uint64_t number, int* filelevel, ColumnFamilyData** cfd) { for (auto cfd_iter : *column_family_set_) { Version* version = cfd_iter->current(); - for (int level = 0; level < version->NumberLevels(); level++) { - for (const auto& file : version->files_[level]) { + auto* vstorage = version->GetStorageInfo(); + for (int level = 0; level < vstorage->NumberLevels(); level++) { + for (const auto& file : vstorage->LevelFiles(level)) { if (file->fd.GetNumber() == number) { *meta = file; *filelevel = level; @@ -2918,8 +2944,9 @@ Status VersionSet::GetMetadataForFile(uint64_t number, int* filelevel, void VersionSet::GetLiveFilesMetaData(std::vector* metadata) { for (auto cfd : *column_family_set_) { + auto* files = cfd->current()->GetFiles(); for (int level = 0; level < cfd->NumberLevels(); level++) { - for (const auto& file : cfd->current()->files_[level]) { + for (const auto& file : files[level]) { LiveFileMetaData filemetadata; filemetadata.column_family_name = cfd->GetName(); uint32_t path_id = file->fd.GetPathId(); diff --git a/db/version_set.h b/db/version_set.h index 5a11a2f1c..98ce172e3 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -38,7 +38,9 @@ namespace rocksdb { -namespace log { class Writer; } +namespace log { +class Writer; +} class Compaction; class Iterator; @@ -81,45 +83,45 @@ extern void DoGenerateLevelFilesBrief(LevelFilesBrief* file_level, const std::vector& files, Arena* arena); -class Version { +class VersionStorageInfo { public: - // Append to *iters a sequence of iterators that will - // yield the contents of this Version when merged together. 
- // REQUIRES: This version has been saved (see VersionSet::SaveTo) - void AddIterators(const ReadOptions&, const EnvOptions& soptions, - MergeIteratorBuilder* merger_iter_builder); + VersionStorageInfo(const InternalKeyComparator* internal_comparator, + const Comparator* user_comparator, int num_levels, + CompactionStyle compaction_style, + VersionStorageInfo* src_vstorage); + ~VersionStorageInfo(); - // Lookup the value for key. If found, store it in *val and - // return OK. Else return a non-OK status. - // Uses *operands to store merge_operator operations to apply later - // REQUIRES: lock is not held - void Get(const ReadOptions&, const LookupKey& key, std::string* val, - Status* status, MergeContext* merge_context, - bool* value_found = nullptr); + void SetFinalized() { finalized_ = true; } + + // Update num_non_empty_levels_. + void UpdateNumNonEmptyLevels(); + + void GenerateFileIndexer() { + file_indexer_.UpdateIndex(&arena_, num_non_empty_levels_, files_); + } + + // Update the accumulated stats from a file-meta. + void UpdateAccumulatedStats(FileMetaData* file_meta); + + void ComputeCompensatedSizes(); // Updates internal structures that keep track of compaction scores // We use compaction scores to figure out which compaction to do next // REQUIRES: If Version is not yet saved to current_, it can be called without // a lock. Once a version is saved to current_, call only with mutex held + // TODO find a better way to pass compaction_options_fifo. void ComputeCompactionScore( const MutableCFOptions& mutable_cf_options, + const CompactionOptionsFIFO& compaction_options_fifo, std::vector& size_being_compacted); // Generate level_files_brief_ from files_ void GenerateLevelFilesBrief(); + // Sort all files for this version based on their file size and + // record results in files_by_size_. The largest files are listed first. + void UpdateFilesBySize(); - // Update scores, pre-calculated variables. It needs to be called before - // applying the version to the version set. - void PrepareApply( - const MutableCFOptions& mutable_cf_options, - std::vector& size_being_compacted); - - // Reference count management (so Versions do not disappear out from - // under live iterators) - void Ref(); - // Decrease reference count. Delete the object if no reference left - // and return true. Otherwise, return false. - bool Unref(); + int MaxInputLevel() const; // Returns true iff some level needs a compaction. 
bool NeedsCompaction() const; @@ -137,34 +139,30 @@ class Version { double CompactionScore(int idx) const { return compaction_score_[idx]; } void GetOverlappingInputs( - int level, - const InternalKey* begin, // nullptr means before all keys - const InternalKey* end, // nullptr means after all keys + int level, const InternalKey* begin, // nullptr means before all keys + const InternalKey* end, // nullptr means after all keys std::vector* inputs, - int hint_index = -1, // index of overlap file - int* file_index = nullptr); // return index of overlap file + int hint_index = -1, // index of overlap file + int* file_index = nullptr); // return index of overlap file void GetOverlappingInputsBinarySearch( - int level, - const Slice& begin, // nullptr means before all keys - const Slice& end, // nullptr means after all keys + int level, const Slice& begin, // nullptr means before all keys + const Slice& end, // nullptr means after all keys std::vector* inputs, - int hint_index, // index of overlap file - int* file_index); // return index of overlap file + int hint_index, // index of overlap file + int* file_index); // return index of overlap file void ExtendOverlappingInputs( - int level, - const Slice& begin, // nullptr means before all keys - const Slice& end, // nullptr means after all keys + int level, const Slice& begin, // nullptr means before all keys + const Slice& end, // nullptr means after all keys std::vector* inputs, - unsigned int index); // start extending from this index + unsigned int index); // start extending from this index // Returns true iff some file in the specified level overlaps // some part of [*smallest_user_key,*largest_user_key]. // smallest_user_key==NULL represents a key smaller than all keys in the DB. // largest_user_key==NULL represents a key largest than all keys in the DB. - bool OverlapInLevel(int level, - const Slice* smallest_user_key, + bool OverlapInLevel(int level, const Slice* smallest_user_key, const Slice* largest_user_key); // Returns true iff the first or last file in inputs contains @@ -174,7 +172,6 @@ class Version { bool HasOverlappingUserKey(const std::vector* inputs, int level); - // Return the level at which we should place a new memtable compaction // result that covers the range [smallest_user_key,largest_user_key]. int PickLevelForMemTableOutput(const MutableCFOptions& mutable_cf_options, @@ -198,6 +195,47 @@ class Version { // Return the combined file size of all files at the specified level. uint64_t NumLevelBytes(int level) const; + // REQUIRES: This version has been saved (see VersionSet::SaveTo) + const std::vector& LevelFiles(int level) const { + assert(finalized_); + return files_[level]; + } + + const rocksdb::LevelFilesBrief& LevelFilesBrief(int level) const { + return level_files_brief_[level]; + } + + // REQUIRES: This version has been saved (see VersionSet::SaveTo) + const std::vector& FilesBySize(int level) const { + assert(finalized_); + return files_by_size_[level]; + } + + // REQUIRES: lock is held + // Set the index that is used to offset into files_by_size_ to find + // the next compaction candidate file. 
+ void SetNextCompactionIndex(int level, int index) { + next_file_to_compact_by_size_[level] = index; + } + + // REQUIRES: lock is held + int NextCompactionIndex(int level) const { + return next_file_to_compact_by_size_[level]; + } + + // REQUIRES: This version has been saved (see VersionSet::SaveTo) + const FileIndexer& GetIndexer() const { + assert(finalized_); + return file_indexer_; + } + + // Only the first few entries of files_by_size_ are sorted. + // There is no need to sort all the files because it is likely + // that on a running system, we need to look at only the first + // few largest files because a new version is created every few + // seconds/minutes (because of concurrent compactions). + static const size_t kNumberFilesToSort = 50; + // Return a human-readable short (single-line) summary of the number // of files per level. Uses *scratch as backing store. struct LevelSummaryStorage { @@ -211,31 +249,146 @@ class Version { // in a specified level. Uses *scratch as backing store. const char* LevelFileSummary(FileSummaryStorage* scratch, int level) const; + std::vector* GetFiles() { return files_; } + // Return the maximum overlapping data (in bytes) at next level for any // file at a level >= 1. int64_t MaxNextLevelOverlappingBytes(); - // Add all files listed in the current version to *live. - void AddLiveFiles(std::vector* live); - // Return a human readable string that describes this version's contents. std::string DebugString(bool hex = false) const; - // Returns the version nuber of this version - uint64_t GetVersionNumber() const { return version_number_; } - uint64_t GetAverageValueSize() const { if (accumulated_num_non_deletions_ == 0) { return 0; } assert(accumulated_raw_key_size_ + accumulated_raw_value_size_ > 0); assert(accumulated_file_size_ > 0); - return accumulated_raw_value_size_ / - accumulated_num_non_deletions_ * + return accumulated_raw_value_size_ / accumulated_num_non_deletions_ * accumulated_file_size_ / (accumulated_raw_key_size_ + accumulated_raw_value_size_); } + uint64_t GetEstimatedActiveKeys(); + + // re-initializes the index that is used to offset into files_by_size_ + // to find the next compaction candidate file. + void ResetNextCompactionIndex(int level) { + next_file_to_compact_by_size_[level] = 0; + } + + private: + const InternalKeyComparator* internal_comparator_; + const Comparator* user_comparator_; + int num_levels_; // Number of levels + int num_non_empty_levels_; // Number of levels. Any level larger than it + // is guaranteed to be empty. + // A short brief metadata of files per level + autovector level_files_brief_; + FileIndexer file_indexer_; + Arena arena_; // Used to allocate space for file_levels_ + + CompactionStyle compaction_style_; + + // List of files per level, files in each level are arranged + // in increasing order of keys + std::vector* files_; + + // A list for the same set of files that are stored in files_, + // but files in each level are now sorted based on file + // size. The file with the largest size is at the front. + // This vector stores the index of the file from files_. + std::vector> files_by_size_; + + // An index into files_by_size_ that specifies the first + // file that is not yet compacted + std::vector next_file_to_compact_by_size_; + + // Only the first few entries of files_by_size_ are sorted. 
+ // There is no need to sort all the files because it is likely + // that on a running system, we need to look at only the first + // few largest files because a new version is created every few + // seconds/minutes (because of concurrent compactions). + static const size_t number_of_files_to_sort_ = 50; + + // Level that should be compacted next and its compaction score. + // Score < 1 means compaction is not strictly needed. These fields + // are initialized by Finalize(). + // The most critical level to be compacted is listed first + // These are used to pick the best compaction level + std::vector compaction_score_; + std::vector compaction_level_; + double max_compaction_score_ = 0.0; // max score in l1 to ln-1 + int max_compaction_score_level_ = 0; // level on which max score occurs + + // the following are the sampled temporary stats. + // the current accumulated size of sampled files. + uint64_t accumulated_file_size_; + // the current accumulated size of all raw keys based on the sampled files. + uint64_t accumulated_raw_key_size_; + // the current accumulated size of all raw keys based on the sampled files. + uint64_t accumulated_raw_value_size_; + // total number of non-deletion entries + uint64_t accumulated_num_non_deletions_; + // total number of deletion entries + uint64_t accumulated_num_deletions_; + // the number of samples + uint64_t num_samples_; + + bool finalized_; + + friend class Version; + friend class VersionSet; + friend class DBImpl; + friend class InternalStats; + // No copying allowed + VersionStorageInfo(const VersionStorageInfo&) = delete; + void operator=(const VersionStorageInfo&) = delete; +}; + +class Version { + public: + // Append to *iters a sequence of iterators that will + // yield the contents of this Version when merged together. + // REQUIRES: This version has been saved (see VersionSet::SaveTo) + void AddIterators(const ReadOptions&, const EnvOptions& soptions, + MergeIteratorBuilder* merger_iter_builder); + + // Lookup the value for key. If found, store it in *val and + // return OK. Else return a non-OK status. + // Uses *operands to store merge_operator operations to apply later + // REQUIRES: lock is not held + void Get(const ReadOptions&, const LookupKey& key, std::string* val, + Status* status, MergeContext* merge_context, + bool* value_found = nullptr); + + // Update scores, pre-calculated variables. It needs to be called before + // applying the version to the version set. + void PrepareApply(const MutableCFOptions& mutable_cf_options, + std::vector& size_being_compacted); + + // Reference count management (so Versions do not disappear out from + // under live iterators) + void Ref(); + // Decrease reference count. Delete the object if no reference left + // and return true. Otherwise, return false. + bool Unref(); + + std::vector* GetFiles() { return vstorage_.GetFiles(); } + + // Add all files listed in the current version to *live. + void AddLiveFiles(std::vector* live); + + // Return a human readable string that describes this version's contents. + std::string DebugString(bool hex = false) const; + + // Returns the version nuber of this version + uint64_t GetVersionNumber() const { return version_number_; } + + uint64_t GetAverageValueSize() const { + return vstorage_.GetAverageValueSize(); + } + // REQUIRES: lock is held // On success, "tp" will contains the table properties of the file // specified in "file_meta". 
If the file name of "file_meta" is @@ -251,77 +404,40 @@ class Version { // tables' propertis, represented as shared_ptr. Status GetPropertiesOfAllTables(TablePropertiesCollection* props); - uint64_t GetEstimatedActiveKeys(); + uint64_t GetEstimatedActiveKeys() { + return vstorage_.GetEstimatedActiveKeys(); + } size_t GetMemoryUsageByTableReaders(); ColumnFamilyData* cfd() const { return cfd_; } - // REQUIRES: This version has been saved (see VersionSet::SaveTo) - const std::vector& LevelFiles(int level) const { - assert(finalized_); - return files_[level]; - } - - // REQUIRES: This version has been saved (see VersionSet::SaveTo) - const std::vector& FilesBySize(int level) const { - assert(finalized_); - return files_by_size_[level]; - } - - const LevelFilesBrief& GetLevelFilesBrief(int level) const { - return level_files_brief_[level]; - } - - // REQUIRES: lock is held - // Set the index that is used to offset into files_by_size_ to find - // the next compaction candidate file. - void SetNextCompactionIndex(int level, int index) { - next_file_to_compact_by_size_[level] = index; - } - - // REQUIRES: lock is held - int NextCompactionIndex(int level) const { - return next_file_to_compact_by_size_[level]; - } - - // Only the first few entries of files_by_size_ are sorted. - // There is no need to sort all the files because it is likely - // that on a running system, we need to look at only the first - // few largest files because a new version is created every few - // seconds/minutes (because of concurrent compactions). - static const size_t kNumberFilesToSort = 50; // Return the next Version in the linked list. Used for debug only Version* TEST_Next() const { return next_; } - // REQUIRES: This version has been saved (see VersionSet::SaveTo) - const FileIndexer& GetIndexer() const { - assert(finalized_); - return file_indexer_; - } + VersionStorageInfo* GetStorageInfo() { return &vstorage_; } private: friend class VersionSet; - friend class DBImpl; - friend class InternalStats; + + const InternalKeyComparator* GetInternalComparator() const { + return vstorage_.internal_comparator_; + } + const Comparator* GetUserComparator() const { + return vstorage_.user_comparator_; + } bool PrefixMayMatch(const ReadOptions& read_options, Iterator* level_iter, const Slice& internal_prefix) const; - // Update num_non_empty_levels_. - void UpdateNumNonEmptyLevels(); - // The helper function of UpdateAccumulatedStats, which may fill the missing // fields of file_mata from its associated TableProperties. // Returns true if it does initialize FileMetaData. bool MaybeInitializeFileMetaData(FileMetaData* file_meta); - // Update the accumulated stats from a file-meta. - void UpdateAccumulatedStats(FileMetaData* file_meta); - // Update the accumulated stats associated with the current version. // This accumulated stats will be used in compaction. void UpdateAccumulatedStats(); @@ -330,74 +446,26 @@ class Version { // record results in files_by_size_. The largest files are listed first. 
   void UpdateFilesBySize();

+  VersionSet* GetVersionSet() { return vset_; }
+
   ColumnFamilyData* cfd_;  // ColumnFamilyData to which this Version belongs
-  const InternalKeyComparator* internal_comparator_;
-  const Comparator* user_comparator_;
+  Logger* info_log_;
+  Statistics* db_statistics_;
   TableCache* table_cache_;
   const MergeOperator* merge_operator_;
-  // A short brief metadata of files per level
-  autovector<LevelFilesBrief> level_files_brief_;
-  Logger* info_log_;
-  Statistics* db_statistics_;
-  int num_levels_;            // Number of levels
-  int num_non_empty_levels_;  // Number of levels. Any level larger than it
-                              // is guaranteed to be empty.
-  FileIndexer file_indexer_;
+
+  VersionStorageInfo vstorage_;
   VersionSet* vset_;  // VersionSet to which this Version belongs
-  Arena arena_;       // Used to allocate space for level_files_brief_
   Version* next_;     // Next version in linked list
   Version* prev_;     // Previous version in linked list
   int refs_;          // Number of live refs to this version
-
-  // List of files per level, files in each level are arranged
-  // in increasing order of keys
-  std::vector<FileMetaData*>* files_;
-
-  // A list for the same set of files that are stored in files_,
-  // but files in each level are now sorted based on file
-  // size. The file with the largest size is at the front.
-  // This vector stores the index of the file from files_.
-  std::vector<std::vector<int>> files_by_size_;
-
-  // An index into files_by_size_ that specifies the first
-  // file that is not yet compacted
-  std::vector<int> next_file_to_compact_by_size_;
-
-  // Level that should be compacted next and its compaction score.
-  // Score < 1 means compaction is not strictly needed. These fields
-  // are initialized by Finalize().
-  // The most critical level to be compacted is listed first
-  // These are used to pick the best compaction level
-  std::vector<double> compaction_score_;
-  std::vector<int> compaction_level_;
-  double max_compaction_score_ = 0.0;   // max score in l1 to ln-1
-  int max_compaction_score_level_ = 0;  // level on which max score occurs
-
   // A version number that uniquely represents this version. This is
   // used for debugging and logging purposes only.
   uint64_t version_number_;

   Version(ColumnFamilyData* cfd, VersionSet* vset, uint64_t version_number = 0);

-  // the following are the sampled temporary stats.
-  // the current accumulated size of sampled files.
-  uint64_t accumulated_file_size_;
-  // the current accumulated size of all raw keys based on the sampled files.
-  uint64_t accumulated_raw_key_size_;
-  // the current accumulated size of all raw keys based on the sampled files.
-  uint64_t accumulated_raw_value_size_;
-  // total number of non-deletion entries
-  uint64_t accumulated_num_non_deletions_;
-  // total number of deletion entries
-  uint64_t accumulated_num_deletions_;
-  // the number of samples
-  uint64_t num_samples_;
-
-  // Used to assert APIs that are only safe to use after the version
-  // is finalized
-  bool finalized_;
-
   ~Version();

   // No copying allowed
@@ -418,13 +486,12 @@ class VersionSet {
   // column_family_options has to be set if edit is column family add
   // REQUIRES: *mu is held on entry.
   // REQUIRES: no other thread concurrently calls LogAndApply()
-  Status LogAndApply(ColumnFamilyData* column_family_data,
-                     const MutableCFOptions& mutable_cf_options,
-                     VersionEdit* edit,
-                     port::Mutex* mu, Directory* db_directory = nullptr,
-                     bool new_descriptor_log = false,
-                     const ColumnFamilyOptions* column_family_options =
-                         nullptr);
+  Status LogAndApply(
+      ColumnFamilyData* column_family_data,
+      const MutableCFOptions& mutable_cf_options, VersionEdit* edit,
+      port::Mutex* mu, Directory* db_directory = nullptr,
+      bool new_descriptor_log = false,
+      const ColumnFamilyOptions* column_family_options = nullptr);

   // Recover the last saved descriptor from persistent storage.
   // If read_only == true, Recover() will not complain if some column families
@@ -530,8 +597,7 @@ class VersionSet {
   Status GetMetadataForFile(uint64_t number, int* filelevel,
                             FileMetaData** metadata, ColumnFamilyData** cfd);

-  void GetLiveFilesMetaData(
-    std::vector<LiveFileMetaData> *metadata);
+  void GetLiveFilesMetaData(std::vector<LiveFileMetaData>* metadata);

   void GetObsoleteFiles(std::vector<FileMetaData*>* files);

diff --git a/util/ldb_cmd.cc b/util/ldb_cmd.cc
index 70f0c6a94..3ff31359b 100644
--- a/util/ldb_cmd.cc
+++ b/util/ldb_cmd.cc
@@ -1125,7 +1125,7 @@ Status ReduceDBLevelsCommand::GetOldNumOfLevels(Options& opt,
   int max = -1;
   auto default_cfd = versions.GetColumnFamilySet()->GetDefault();
   for (int i = 0; i < default_cfd->NumberLevels(); i++) {
-    if (default_cfd->current()->NumLevelFiles(i)) {
+    if (default_cfd->current()->GetStorageInfo()->NumLevelFiles(i)) {
       max = i;
     }
   }
diff --git a/utilities/compacted_db/compacted_db_impl.cc b/utilities/compacted_db/compacted_db_impl.cc
index a253153ae..455b312fa 100644
--- a/utilities/compacted_db/compacted_db_impl.cc
+++ b/utilities/compacted_db/compacted_db_impl.cc
@@ -104,28 +104,29 @@ Status CompactedDBImpl::Init(const Options& options) {
   }
   version_ = cfd_->GetSuperVersion()->current;
   user_comparator_ = cfd_->user_comparator();
-  const LevelFilesBrief& l0 = version_->GetLevelFilesBrief(0);
+  auto* vstorage = version_->GetStorageInfo();
+  const LevelFilesBrief& l0 = vstorage->LevelFilesBrief(0);
   // L0 should not have files
   if (l0.num_files > 1) {
     return Status::NotSupported("L0 contain more than 1 file");
   }
   if (l0.num_files == 1) {
-    if (version_->NumNonEmptyLevels() > 1) {
+    if (vstorage->NumNonEmptyLevels() > 1) {
       return Status::NotSupported("Both L0 and other level contain files");
     }
     files_ = l0;
     return Status::OK();
   }
-  for (int i = 1; i < version_->NumNonEmptyLevels() - 1; ++i) {
-    if (version_->GetLevelFilesBrief(i).num_files > 0) {
+  for (int i = 1; i < vstorage->NumNonEmptyLevels() - 1; ++i) {
+    if (vstorage->LevelFilesBrief(i).num_files > 0) {
       return Status::NotSupported("Other levels also contain files");
     }
   }
-  int level = version_->NumNonEmptyLevels() - 1;
-  if (version_->GetLevelFilesBrief(level).num_files > 0) {
-    files_ = version_->GetLevelFilesBrief(level);
+  int level = vstorage->NumNonEmptyLevels() - 1;
+  if (vstorage->LevelFilesBrief(level).num_files > 0) {
+    files_ = vstorage->LevelFilesBrief(level);
     return Status::OK();
   }
   return Status::NotSupported("no file exists");
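
The call-site migration in this patch follows one pattern throughout: code that used to ask Version directly for per-level state (file counts, files-brief, compaction scores) now fetches the VersionStorageInfo via Version::GetStorageInfo() and calls the matching accessor there. A minimal sketch of that caller shape, assuming only the accessors that appear in this diff; the helper function LogLevel0Pressure is hypothetical and exists purely for illustration:

    #include "db/version_set.h"
    #include "rocksdb/env.h"

    namespace rocksdb {

    // Hypothetical helper: reads per-level state through the new
    // VersionStorageInfo accessors instead of Version itself.
    void LogLevel0Pressure(Version* version, Logger* info_log) {
      auto* vstorage = version->GetStorageInfo();
      int l0_files = vstorage->NumLevelFiles(0);        // was version->NumLevelFiles(0)
      double score = vstorage->MaxCompactionScore();    // was version->MaxCompactionScore()
      int score_level = vstorage->MaxCompactionScoreLevel();
      Log(InfoLogLevel::INFO_LEVEL, info_log,
          "L0 files: %d, max compaction score %.2f at level %d",
          l0_files, score, score_level);
    }

    }  // namespace rocksdb

Compaction objects follow the same split: they are now constructed without a Version and pick one up later through SetInputVersion(), which is why ColumnFamilyData::PickCompaction() and CompactRange() call SetInputVersion(current_) on the returned compaction.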