diff --git a/CMakeLists.txt b/CMakeLists.txt index 428e23843..7cd9c1ca0 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -291,6 +291,7 @@ set(SOURCES db/compaction_iterator.cc db/compaction_job.cc db/compaction_picker.cc + db/compaction_picker_universal.cc db/convenience.cc db/db_filesnapshot.cc db/db_impl.cc diff --git a/db/column_family.cc b/db/column_family.cc index f3f61da23..610111a69 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -20,6 +20,7 @@ #include #include "db/compaction_picker.h" +#include "db/compaction_picker_universal.h" #include "db/db_impl.h" #include "db/internal_stats.h" #include "db/job_context.h" diff --git a/db/compaction_picker.cc b/db/compaction_picker.cc index 7c2e8d123..6ffb00fd3 100644 --- a/db/compaction_picker.cc +++ b/db/compaction_picker.cc @@ -36,71 +36,6 @@ uint64_t TotalCompensatedFileSize(const std::vector& files) { } return sum; } - -// Universal compaction is not supported in ROCKSDB_LITE -#ifndef ROCKSDB_LITE - -// Used in universal compaction when trivial move is enabled. -// This structure is used for the construction of min heap -// that contains the file meta data, the level of the file -// and the index of the file in that level - -struct InputFileInfo { - InputFileInfo() : f(nullptr) {} - - FileMetaData* f; - size_t level; - size_t index; -}; - -// Used in universal compaction when trivial move is enabled. -// This comparator is used for the construction of min heap -// based on the smallest key of the file. -struct UserKeyComparator { - explicit UserKeyComparator(const Comparator* ucmp) { ucmp_ = ucmp; } - - bool operator()(InputFileInfo i1, InputFileInfo i2) const { - return (ucmp_->Compare(i1.f->smallest.user_key(), - i2.f->smallest.user_key()) > 0); - } - - private: - const Comparator* ucmp_; -}; - -typedef std::priority_queue, - UserKeyComparator> - SmallestKeyHeap; - -// This function creates the heap that is used to find if the files are -// overlapping during universal compaction when the allow_trivial_move -// is set. -SmallestKeyHeap create_level_heap(Compaction* c, const Comparator* ucmp) { - SmallestKeyHeap smallest_key_priority_q = - SmallestKeyHeap(UserKeyComparator(ucmp)); - - InputFileInfo input_file; - - for (size_t l = 0; l < c->num_input_levels(); l++) { - if (c->num_input_files(l) != 0) { - if (l == 0 && c->start_level() == 0) { - for (size_t i = 0; i < c->num_input_files(0); i++) { - input_file.f = c->input(0, i); - input_file.level = 0; - input_file.index = i; - smallest_key_priority_q.push(std::move(input_file)); - } - } else { - input_file.f = c->input(l, 0); - input_file.level = l; - input_file.index = 0; - smallest_key_priority_q.push(std::move(input_file)); - } - } - } - return smallest_key_priority_q; -} -#endif // !ROCKSDB_LITE } // anonymous namespace // Determine compression type, based on user options, level of the output @@ -231,7 +166,7 @@ void CompactionPicker::GetRange(const std::vector& inputs, assert(initialized); } -bool CompactionPicker::ExpandWhileOverlapping(const std::string& cf_name, +bool CompactionPicker::ExpandInputsToCleanCut(const std::string& cf_name, VersionStorageInfo* vstorage, CompactionInputFiles* inputs) { // This isn't good compaction @@ -265,7 +200,7 @@ bool CompactionPicker::ExpandWhileOverlapping(const std::string& cf_name, // If, after the expansion, there are files that are already under // compaction, then we must drop/cancel this compaction. 
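The block removed above builds a min heap keyed on each file's smallest user key, then sweeps it in ascending order to prove that the compaction inputs are disjoint. Below is a minimal standalone sketch of that sweep, using plain strings as user keys; the `FileRange` type, the eager push of every file, and the sample data are illustrative simplifications rather than RocksDB code (the real heap lazily pushes only the next file of each non-zero level as its predecessor is popped, which is equivalent for the overlap check).

```cpp
#include <cassert>
#include <queue>
#include <string>
#include <vector>

// Hypothetical stand-in for one input file's user-key range.
struct FileRange {
  std::string smallest;
  std::string largest;
};

// A greater-than comparison turns std::priority_queue into a min heap,
// mirroring the removed SmallestKeyHeap/UserKeyComparator pair.
struct SmallestKeyCmp {
  bool operator()(const FileRange* a, const FileRange* b) const {
    return a->smallest > b->smallest;
  }
};

// Pop files in ascending order of smallest key; the inputs are
// non-overlapping iff each file starts after the previous one ends.
bool IsNonOverlapping(const std::vector<FileRange>& files) {
  std::priority_queue<const FileRange*, std::vector<const FileRange*>,
                      SmallestKeyCmp>
      heap;
  for (const FileRange& f : files) {
    heap.push(&f);
  }
  const FileRange* prev = nullptr;
  while (!heap.empty()) {
    const FileRange* curr = heap.top();
    heap.pop();
    if (prev != nullptr && prev->largest >= curr->smallest) {
      return false;  // found two ranges that touch or overlap
    }
    prev = curr;
  }
  return true;
}

int main() {
  assert(IsNonOverlapping({{"a", "c"}, {"d", "f"}, {"g", "k"}}));
  assert(!IsNonOverlapping({{"a", "c"}, {"b", "f"}}));
}
```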
- if (FilesInCompaction(inputs->files)) { + if (AreFilesInCompaction(inputs->files)) { ROCKS_LOG_WARN( ioptions_.info_log, "[%s] ExpandWhileOverlapping() failure because some of the necessary" @@ -313,7 +248,7 @@ bool CompactionPicker::FilesRangeOverlapWithCompaction( } // Returns true if any one of specified files are being compacted -bool CompactionPicker::FilesInCompaction( +bool CompactionPicker::AreFilesInCompaction( const std::vector<FileMetaData*>& files) { for (size_t i = 0; i < files.size(); i++) { if (files[i]->being_compacted) { @@ -323,7 +258,7 @@ bool CompactionPicker::FilesInCompaction( return false; } -Compaction* CompactionPicker::FormCompaction( +Compaction* CompactionPicker::CompactFiles( const CompactionOptions& compact_options, const std::vector<CompactionInputFiles>& input_files, int output_level, VersionStorageInfo* vstorage, const MutableCFOptions& mutable_cf_options, @@ -402,16 +337,16 @@ Status CompactionPicker::GetCompactionInputsFromFileNumbers( } // Returns true if any one of the parent files are being compacted -bool CompactionPicker::RangeInCompaction(VersionStorageInfo* vstorage, - const InternalKey* smallest, - const InternalKey* largest, int level, - int* level_index) { +bool CompactionPicker::IsRangeInCompaction(VersionStorageInfo* vstorage, + const InternalKey* smallest, + const InternalKey* largest, + int level, int* level_index) { std::vector<FileMetaData*> inputs; assert(level < NumberLevels()); vstorage->GetOverlappingInputs(level, smallest, largest, &inputs, *level_index, level_index); - return FilesInCompaction(inputs); + return AreFilesInCompaction(inputs); } // Populates the set of inputs of all other levels that overlap with the @@ -451,11 +386,11 @@ bool CompactionPicker::SetupOtherInputs( vstorage->GetOverlappingInputs(output_level, &smallest, &largest, &output_level_inputs->files, *parent_index, parent_index); - if (FilesInCompaction(output_level_inputs->files)) { + if (AreFilesInCompaction(output_level_inputs->files)) { return false; } if (!output_level_inputs->empty()) { - if (!ExpandWhileOverlapping(cf_name, vstorage, output_level_inputs)) { + if (!ExpandInputsToCleanCut(cf_name, vstorage, output_level_inputs)) { return false; } } @@ -482,12 +417,12 @@ bool CompactionPicker::SetupOtherInputs( &expanded_inputs.files, base_index, nullptr); uint64_t expanded_inputs_size = TotalCompensatedFileSize(expanded_inputs.files); - if (!ExpandWhileOverlapping(cf_name, vstorage, &expanded_inputs)) { + if (!ExpandInputsToCleanCut(cf_name, vstorage, &expanded_inputs)) { try_overlapping_inputs = false; } if (try_overlapping_inputs && expanded_inputs.size() > inputs->size() && output_level_inputs_size + expanded_inputs_size < limit && - !FilesInCompaction(expanded_inputs.files)) { + !AreFilesInCompaction(expanded_inputs.files)) { InternalKey new_start, new_limit; GetRange(expanded_inputs, &new_start, &new_limit); CompactionInputFiles expanded_output_level_inputs; @@ -496,8 +431,8 @@ bool CompactionPicker::SetupOtherInputs( &expanded_output_level_inputs.files, *parent_index, parent_index); assert(!expanded_output_level_inputs.empty()); - if (!FilesInCompaction(expanded_output_level_inputs.files) && - ExpandWhileOverlapping(cf_name, vstorage, + if (!AreFilesInCompaction(expanded_output_level_inputs.files) && + ExpandInputsToCleanCut(cf_name, vstorage, &expanded_output_level_inputs) && expanded_output_level_inputs.size() == output_level_inputs->size()) { expand_inputs = true; @@ -510,7 +445,7 @@ bool CompactionPicker::SetupOtherInputs( expanded_inputs_size = TotalCompensatedFileSize(expanded_inputs.files); if
(expanded_inputs.size() > inputs->size() && output_level_inputs_size + expanded_inputs_size < limit && - !FilesInCompaction(expanded_inputs.files)) { + !AreFilesInCompaction(expanded_inputs.files)) { expand_inputs = true; } } @@ -588,7 +523,7 @@ Compaction* CompactionPicker::CompactRange( for (FileMetaData* f : vstorage->LevelFiles(level)) { files.push_back(f); } - if (FilesInCompaction(files)) { + if (AreFilesInCompaction(files)) { *manual_conflict = true; return nullptr; } @@ -657,7 +592,7 @@ Compaction* CompactionPicker::CompactRange( } assert(output_path_id < static_cast<uint32_t>(ioptions_.db_paths.size())); - if (ExpandWhileOverlapping(cf_name, vstorage, &inputs) == false) { + if (ExpandInputsToCleanCut(cf_name, vstorage, &inputs) == false) { // manual compaction is now multi-threaded, so it can // happen that ExpandWhileOverlapping fails // we handle it higher in RunManualCompaction @@ -693,7 +628,7 @@ Compaction* CompactionPicker::CompactRange( compaction_inputs.push_back(output_level_inputs); } for (size_t i = 0; i < compaction_inputs.size(); i++) { - if (FilesInCompaction(compaction_inputs[i].files)) { + if (AreFilesInCompaction(compaction_inputs[i].files)) { *manual_conflict = true; return nullptr; } @@ -993,10 +928,89 @@ bool LevelCompactionPicker::NeedsCompaction( return false; } -void LevelCompactionPicker::PickFilesMarkedForCompactionExperimental( - const std::string& cf_name, VersionStorageInfo* vstorage, - CompactionInputFiles* inputs, int* level, int* output_level) { - if (vstorage->FilesMarkedForCompaction().empty()) { +namespace { +// A class to build a leveled compaction step-by-step. +class LevelCompactionBuilder { + public: + LevelCompactionBuilder(const std::string& cf_name, + VersionStorageInfo* vstorage, + CompactionPicker* compaction_picker, + LogBuffer* log_buffer, + const MutableCFOptions& mutable_cf_options, + const ImmutableCFOptions& ioptions) + : cf_name_(cf_name), + vstorage_(vstorage), + compaction_picker_(compaction_picker), + log_buffer_(log_buffer), + mutable_cf_options_(mutable_cf_options), + ioptions_(ioptions) {} + + // Pick and return a compaction. + Compaction* PickCompaction(); + + // Pick the initial files to compact to the next level (or together + // in an intra-L0 compaction). + void SetupInitialFiles(); + + // If the initial files are from L0 level, pick other L0 + // files if needed. + bool SetupOtherL0FilesIfNeeded(); + + // Based on the initial files, set up any other files that need to be + // compacted together in this compaction. + bool SetupOtherInputsIfNeeded(); + + Compaction* GetCompaction(); + + // For the specified level, pick a file that we want to compact. + // Returns false if there is no file to compact. + // If it returns true, inputs->files.size() will be exactly one. + // If level is 0 and there is already a compaction on that level, this + // function will return false. + bool PickFileToCompact(); + + // For L0->L0, picks the longest span of files that aren't currently + // undergoing compaction for which work-per-deleted-file decreases. The span + // always starts from the newest L0 file. + // + // Intra-L0 compaction is independent of all other files, so it can be + // performed even when L0->base_level compactions are blocked. + // + // Returns true if `inputs` is populated with a span of files to be compacted; + // otherwise, returns false. + bool PickIntraL0Compaction(); + + // If there is any file marked for compaction, put it into inputs.
+ void PickFilesMarkedForCompaction(); + + const std::string& cf_name_; + VersionStorageInfo* vstorage_; + CompactionPicker* compaction_picker_; + LogBuffer* log_buffer_; + int start_level_ = -1; + int output_level_ = -1; + int parent_index_ = -1; + int base_index_ = -1; + double start_level_score_ = 0; + bool is_manual_ = false; + CompactionInputFiles start_level_inputs_; + std::vector<CompactionInputFiles> compaction_inputs_; + CompactionInputFiles output_level_inputs_; + std::vector<FileMetaData*> grandparents_; + CompactionReason compaction_reason_ = CompactionReason::kUnknown; + + const MutableCFOptions& mutable_cf_options_; + const ImmutableCFOptions& ioptions_; + // Pick a path ID to place a newly generated file, with its level + static uint32_t GetPathId(const ImmutableCFOptions& ioptions, + const MutableCFOptions& mutable_cf_options, + int level); + + static const int kMinFilesForIntraL0Compaction = 4; +}; + +void LevelCompactionBuilder::PickFilesMarkedForCompaction() { + if (vstorage_->FilesMarkedForCompaction().empty()) { return; } @@ -1005,79 +1019,74 @@ void LevelCompactionPicker::PickFilesMarkedForCompactionExperimental( // If this assert() fails that means that some function marked some // files as being_compacted, but didn't call ComputeCompactionScore() assert(!level_file.second->being_compacted); - *level = level_file.first; - *output_level = (*level == 0) ? vstorage->base_level() : *level + 1; + start_level_ = level_file.first; + output_level_ = + (start_level_ == 0) ? vstorage_->base_level() : start_level_ + 1; - if (*level == 0 && !level0_compactions_in_progress_.empty()) { + if (start_level_ == 0 && + !compaction_picker_->level0_compactions_in_progress()->empty()) { return false; } - inputs->files = {level_file.second}; - inputs->level = *level; - return ExpandWhileOverlapping(cf_name, vstorage, inputs); + start_level_inputs_.files = {level_file.second}; + start_level_inputs_.level = start_level_; + return compaction_picker_->ExpandInputsToCleanCut(cf_name_, vstorage_, + &start_level_inputs_); }; // take a chance on a random file first - Random64 rnd(/* seed */ reinterpret_cast<uint64_t>(vstorage)); + Random64 rnd(/* seed */ reinterpret_cast<uint64_t>(vstorage_)); size_t random_file_index = static_cast<size_t>(rnd.Uniform( - static_cast<uint64_t>(vstorage->FilesMarkedForCompaction().size()))); + static_cast<uint64_t>(vstorage_->FilesMarkedForCompaction().size()))); - if (continuation(vstorage->FilesMarkedForCompaction()[random_file_index])) { + if (continuation(vstorage_->FilesMarkedForCompaction()[random_file_index])) { // found the compaction! return; } - for (auto& level_file : vstorage->FilesMarkedForCompaction()) { + for (auto& level_file : vstorage_->FilesMarkedForCompaction()) { if (continuation(level_file)) { // found the compaction! return; } } - inputs->files.clear(); + start_level_inputs_.files.clear(); } -Compaction* LevelCompactionPicker::PickCompaction( - const std::string& cf_name, const MutableCFOptions& mutable_cf_options, - VersionStorageInfo* vstorage, LogBuffer* log_buffer) { - int level = -1; - int output_level = -1; - int parent_index = -1; - int base_index = -1; - CompactionInputFiles inputs; - double score = 0; - CompactionReason compaction_reason = CompactionReason::kUnknown; - +void LevelCompactionBuilder::SetupInitialFiles() { // Find the compactions by size on all levels.
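`PickFilesMarkedForCompaction()` above probes one randomly chosen marked file before falling back to a full scan, so repeated picks do not always retry the same blocked candidate first. A minimal sketch of that probe-then-scan shape, assuming a caller-supplied `try_file` predicate in the role of the `continuation` lambda (all names here are hypothetical):

```cpp
#include <cstdlib>
#include <functional>
#include <vector>

// Try one random candidate, then scan all of them. Returns the index of
// the first accepted candidate, or -1 if every attempt is rejected.
int ProbeThenScan(const std::vector<int>& candidates,
                  const std::function<bool(int)>& try_file) {
  if (candidates.empty()) {
    return -1;
  }
  size_t probe = static_cast<size_t>(std::rand()) % candidates.size();
  if (try_file(candidates[probe])) {
    return static_cast<int>(probe);  // the random probe paid off
  }
  for (size_t i = 0; i < candidates.size(); ++i) {
    if (try_file(candidates[i])) {
      return static_cast<int>(i);
    }
  }
  return -1;
}
```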
bool skipped_l0_to_base = false; - for (int i = 0; i < NumberLevels() - 1; i++) { - score = vstorage->CompactionScore(i); - level = vstorage->CompactionScoreLevel(i); - assert(i == 0 || score <= vstorage->CompactionScore(i - 1)); - if (score >= 1) { - if (skipped_l0_to_base && level == vstorage->base_level()) { + for (int i = 0; i < compaction_picker_->NumberLevels() - 1; i++) { + start_level_score_ = vstorage_->CompactionScore(i); + start_level_ = vstorage_->CompactionScoreLevel(i); + assert(i == 0 || start_level_score_ <= vstorage_->CompactionScore(i - 1)); + if (start_level_score_ >= 1) { + if (skipped_l0_to_base && start_level_ == vstorage_->base_level()) { // If L0->base_level compaction is pending, don't schedule further // compaction from base level. Otherwise L0->base_level compaction // may starve. continue; } - output_level = (level == 0) ? vstorage->base_level() : level + 1; - if (PickCompactionBySize(vstorage, level, output_level, &inputs, - &parent_index, &base_index) && - ExpandWhileOverlapping(cf_name, vstorage, &inputs) && - !FilesRangeOverlapWithCompaction({inputs}, output_level)) { + output_level_ = + (start_level_ == 0) ? vstorage_->base_level() : start_level_ + 1; + if (PickFileToCompact() && + compaction_picker_->ExpandInputsToCleanCut(cf_name_, vstorage_, + &start_level_inputs_) && + !compaction_picker_->FilesRangeOverlapWithCompaction( + {start_level_inputs_}, output_level_)) { // found the compaction! - if (level == 0) { + if (start_level_ == 0) { // L0 score = `num L0 files` / `level0_file_num_compaction_trigger` - compaction_reason = CompactionReason::kLevelL0FilesNum; + compaction_reason_ = CompactionReason::kLevelL0FilesNum; } else { // L1+ score = `Level files size` / `MaxBytesForLevel` - compaction_reason = CompactionReason::kLevelMaxLevelSize; + compaction_reason_ = CompactionReason::kLevelMaxLevelSize; } break; } else { // didn't find the compaction, clear the inputs - inputs.clear(); - if (level == 0) { + start_level_inputs_.clear(); + if (start_level_ == 0) { skipped_l0_to_base = true; // L0->base_level may be blocked due to ongoing L0->base_level // compactions. It may also be blocked by an ongoing compaction from @@ -1086,9 +1095,9 @@ Compaction* LevelCompactionPicker::PickCompaction( // In these cases, to reduce L0 file count and thus reduce likelihood // of write stalls, we can attempt compacting a span of files within // L0. 
- if (PickIntraL0Compaction(vstorage, mutable_cf_options, &inputs)) { - output_level = 0; - compaction_reason = CompactionReason::kLevelL0FilesNum; + if (PickIntraL0Compaction()) { + output_level_ = 0; + compaction_reason_ = CompactionReason::kLevelL0FilesNum; break; } } @@ -1096,62 +1105,61 @@ Compaction* LevelCompactionPicker::PickCompaction( } } - bool is_manual = false; // if we didn't find a compaction, check if there are any files marked for // compaction - if (inputs.empty()) { - is_manual = true; - parent_index = base_index = -1; - PickFilesMarkedForCompactionExperimental(cf_name, vstorage, &inputs, &level, - &output_level); - if (!inputs.empty()) { - compaction_reason = CompactionReason::kFilesMarkedForCompaction; + if (start_level_inputs_.empty()) { + is_manual_ = true; + parent_index_ = base_index_ = -1; + PickFilesMarkedForCompaction(); + if (!start_level_inputs_.empty()) { + compaction_reason_ = CompactionReason::kFilesMarkedForCompaction; } } - if (inputs.empty()) { - return nullptr; - } - assert(level >= 0 && output_level >= 0); +} - // Two level 0 compaction won't run at the same time, so don't need to worry - // about files on level 0 being compacted. - if (level == 0 && output_level != 0) { - assert(level0_compactions_in_progress_.empty()); +bool LevelCompactionBuilder::SetupOtherL0FilesIfNeeded() { + if (start_level_ == 0 && output_level_ != 0) { + // Two level 0 compaction won't run at the same time, so don't need to worry + // about files on level 0 being compacted. + assert(compaction_picker_->level0_compactions_in_progress()->empty()); InternalKey smallest, largest; - GetRange(inputs, &smallest, &largest); + compaction_picker_->GetRange(start_level_inputs_, &smallest, &largest); // Note that the next call will discard the file we placed in // c->inputs_[0] earlier and replace it with an overlapping set // which will include the picked file. - inputs.files.clear(); - vstorage->GetOverlappingInputs(0, &smallest, &largest, &inputs.files); + start_level_inputs_.files.clear(); + vstorage_->GetOverlappingInputs(0, &smallest, &largest, + &start_level_inputs_.files); // If we include more L0 files in the same compaction run it can // cause the 'smallest' and 'largest' key to get extended to a // larger range. So, re-invoke GetRange to get the new key range - GetRange(inputs, &smallest, &largest); - if (RangeInCompaction(vstorage, &smallest, &largest, output_level, - &parent_index)) { - return nullptr; + compaction_picker_->GetRange(start_level_inputs_, &smallest, &largest); + if (compaction_picker_->IsRangeInCompaction( + vstorage_, &smallest, &largest, output_level_, &parent_index_)) { + return false; } - assert(!inputs.files.empty()); } + assert(!start_level_inputs_.files.empty()); - std::vector compaction_inputs; - CompactionInputFiles output_level_inputs; - std::vector grandparents; + return true; +} + +bool LevelCompactionBuilder::SetupOtherInputsIfNeeded() { // Setup input files from output level. For output to L0, we only compact // spans of files that do not interact with any pending compactions, so don't // need to consider other levels. 
- if (output_level != 0) { - output_level_inputs.level = output_level; - if (!SetupOtherInputs(cf_name, mutable_cf_options, vstorage, &inputs, - &output_level_inputs, &parent_index, base_index)) { + if (output_level_ != 0) { + output_level_inputs_.level = output_level_; + if (!compaction_picker_->SetupOtherInputs( + cf_name_, mutable_cf_options_, vstorage_, &start_level_inputs_, + &output_level_inputs_, &parent_index_, base_index_)) { - return nullptr; + return false; } - compaction_inputs.push_back(inputs); - if (!output_level_inputs.empty()) { - compaction_inputs.push_back(output_level_inputs); + compaction_inputs_.push_back(start_level_inputs_); + if (!output_level_inputs_.empty()) { + compaction_inputs_.push_back(output_level_inputs_); } // In some edge cases we could pick a compaction that will be compacting @@ -1160,38 +1168,69 @@ Compaction* LevelCompactionPicker::PickCompaction( // (1) we are running a non-exclusive manual compaction // (2) AddFile ingest a new file into the LSM tree // We need to disallow this from happening. - if (FilesRangeOverlapWithCompaction(compaction_inputs, output_level)) { + if (compaction_picker_->FilesRangeOverlapWithCompaction(compaction_inputs_, + output_level_)) { // This compaction output could potentially conflict with the output // of a currently running compaction, we cannot run it. - return nullptr; + return false; } - GetGrandparents(vstorage, inputs, output_level_inputs, &grandparents); + compaction_picker_->GetGrandparents(vstorage_, start_level_inputs_, + output_level_inputs_, &grandparents_); } else { - compaction_inputs.push_back(inputs); + compaction_inputs_.push_back(start_level_inputs_); + } + return true; +} + +Compaction* LevelCompactionBuilder::PickCompaction() { + // Pick up the first file to start compaction. It may have been extended + // to a clean cut. + SetupInitialFiles(); + if (start_level_inputs_.empty()) { + return nullptr; } + assert(start_level_ >= 0 && output_level_ >= 0); + + // If it is an L0 -> base level compaction, we need to set up other L0 + // files if needed. + if (!SetupOtherL0FilesIfNeeded()) { + return nullptr; + } + + // Pick files in the output level and expand more files in the start level + // if needed. + if (!SetupOtherInputsIfNeeded()) { + return nullptr; + } + + // Form a compaction object containing the files we picked.
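`PickCompaction()` above now reads as a short-circuiting pipeline over the builder's member state: each stage either refines the picked inputs or vetoes the whole compaction. A schematic sketch of that control flow with the RocksDB types stubbed out (every name below is an illustrative stand-in, not the real API):

```cpp
#include <memory>
#include <vector>

struct Compaction {};  // stand-in for rocksdb::Compaction

// Schematic of the staged picking: state accumulates in members and any
// stage can abort the pick by returning false or leaving the inputs empty.
class BuilderSketch {
 public:
  std::unique_ptr<Compaction> PickCompaction() {
    SetupInitialFiles();                               // stage 1: seed inputs
    if (start_level_inputs_.empty()) return nullptr;
    if (!SetupOtherL0FilesIfNeeded()) return nullptr;  // stage 2: extra L0
    if (!SetupOtherInputsIfNeeded()) return nullptr;   // stage 3: output level
    return GetCompaction();                            // stage 4: materialize
  }

 private:
  void SetupInitialFiles() { start_level_inputs_ = {1}; }
  bool SetupOtherL0FilesIfNeeded() { return true; }
  bool SetupOtherInputsIfNeeded() { return true; }
  std::unique_ptr<Compaction> GetCompaction() {
    return std::unique_ptr<Compaction>(new Compaction());
  }
  std::vector<int> start_level_inputs_;  // stand-in for CompactionInputFiles
};
```

Keeping each stage a small function that owns one failure mode is what lets the diff retire the monolithic `LevelCompactionPicker::PickCompaction()` body without changing its observable behavior.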
+ Compaction* c = GetCompaction(); + + TEST_SYNC_POINT_CALLBACK("LevelCompactionPicker::PickCompaction:Return", c); + + return c; +} +Compaction* LevelCompactionBuilder::GetCompaction() { auto c = new Compaction( - vstorage, ioptions_, mutable_cf_options, std::move(compaction_inputs), - output_level, mutable_cf_options.MaxFileSizeForLevel(output_level), - mutable_cf_options.max_compaction_bytes, - GetPathId(ioptions_, mutable_cf_options, output_level), - GetCompressionType(ioptions_, vstorage, mutable_cf_options, output_level, - vstorage->base_level()), - std::move(grandparents), is_manual, score, - false /* deletion_compaction */, compaction_reason); + vstorage_, ioptions_, mutable_cf_options_, std::move(compaction_inputs_), + output_level_, mutable_cf_options_.MaxFileSizeForLevel(output_level_), + mutable_cf_options_.max_compaction_bytes, + GetPathId(ioptions_, mutable_cf_options_, output_level_), + GetCompressionType(ioptions_, vstorage_, mutable_cf_options_, + output_level_, vstorage_->base_level()), + std::move(grandparents_), is_manual_, start_level_score_, + false /* deletion_compaction */, compaction_reason_); // If it's level 0 compaction, make sure we don't execute any other level 0 // compactions in parallel - RegisterCompaction(c); + compaction_picker_->RegisterCompaction(c); // Creating a compaction influences the compaction score because the score // takes running compactions into account (by skipping files that are already // being compacted). Since we just changed compaction score, we recalculate it // here - vstorage->ComputeCompactionScore(ioptions_, mutable_cf_options); - - TEST_SYNC_POINT_CALLBACK("LevelCompactionPicker::PickCompaction:Return", c); - + vstorage_->ComputeCompactionScore(ioptions_, mutable_cf_options_); return c; } @@ -1200,7 +1239,7 @@ Compaction* LevelCompactionPicker::PickCompaction( * Given a level, finds the path where levels up to it will fit in levels * up to and including this path */ -uint32_t LevelCompactionPicker::GetPathId( +uint32_t LevelCompactionBuilder::GetPathId( const ImmutableCFOptions& ioptions, const MutableCFOptions& mutable_cf_options, int level) { uint32_t p = 0; @@ -1234,33 +1273,32 @@ uint32_t LevelCompactionPicker::GetPathId( return p; } -bool LevelCompactionPicker::PickCompactionBySize(VersionStorageInfo* vstorage, - int level, int output_level, - CompactionInputFiles* inputs, - int* parent_index, - int* base_index) { +bool LevelCompactionBuilder::PickFileToCompact() { // level 0 files are overlapping. So we cannot pick more // than one concurrent compactions at this level. This // could be made better by looking at key-ranges that are // being compacted at level 0. 
- if (level == 0 && !level0_compactions_in_progress_.empty()) { + if (start_level_ == 0 && + !compaction_picker_->level0_compactions_in_progress()->empty()) { TEST_SYNC_POINT("LevelCompactionPicker::PickCompactionBySize:0"); return false; } - inputs->clear(); + start_level_inputs_.clear(); - assert(level >= 0); + assert(start_level_ >= 0); // Pick the largest file in this level that is not already // being compacted - const std::vector<int>& file_size = vstorage->FilesByCompactionPri(level); - const std::vector<FileMetaData*>& level_files = vstorage->LevelFiles(level); + const std::vector<int>& file_size = + vstorage_->FilesByCompactionPri(start_level_); + const std::vector<FileMetaData*>& level_files = + vstorage_->LevelFiles(start_level_); // record the first file that is not yet compacted int nextIndex = -1; - for (unsigned int i = vstorage->NextCompactionIndex(level); + for (unsigned int i = vstorage_->NextCompactionIndex(start_level_); i < file_size.size(); i++) { int index = file_size[i]; auto* f = level_files[index]; @@ -1278,32 +1316,31 @@ bool LevelCompactionPicker::PickCompactionBySize(VersionStorageInfo* vstorage, // Do not pick this file if its parents at level+1 are being compacted. // Maybe we can avoid redoing this work in SetupOtherInputs - *parent_index = -1; - if (RangeInCompaction(vstorage, &f->smallest, &f->largest, output_level, - parent_index)) { + parent_index_ = -1; + if (compaction_picker_->IsRangeInCompaction(vstorage_, &f->smallest, + &f->largest, output_level_, + &parent_index_)) { continue; } - inputs->files.push_back(f); - inputs->level = level; - *base_index = index; + start_level_inputs_.files.push_back(f); + start_level_inputs_.level = start_level_; + base_index_ = index; break; } // store where to start the iteration in the next call to PickCompaction - vstorage->SetNextCompactionIndex(level, nextIndex); + vstorage_->SetNextCompactionIndex(start_level_, nextIndex); - return inputs->size() > 0; + return start_level_inputs_.size() > 0; } -bool LevelCompactionPicker::PickIntraL0Compaction( - VersionStorageInfo* vstorage, const MutableCFOptions& mutable_cf_options, - CompactionInputFiles* inputs) { - inputs->clear(); +bool LevelCompactionBuilder::PickIntraL0Compaction() { + start_level_inputs_.clear(); const std::vector<FileMetaData*>& level_files = - vstorage->LevelFiles(0 /* level */); + vstorage_->LevelFiles(0 /* level */); if (level_files.size() < static_cast<size_t>( - mutable_cf_options.level0_file_num_compaction_trigger + 2) || + mutable_cf_options_.level0_file_num_compaction_trigger + 2) || level_files[0]->being_compacted) { // If L0 isn't accumulating much files beyond the regular trigger, don't // resort to L0->L0 compaction yet.
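Per the comments above, intra-L0 picking grows a span from the newest L0 file and stops as soon as the estimated compaction bytes per deleted file would rise. A self-contained sketch of that stopping rule over plain file sizes; the function name, the exact divisor, and the omission of the being-compacted check are simplifications, not the RocksDB implementation:

```cpp
#include <cstddef>
#include <cstdint>
#include <limits>
#include <vector>

// Given L0 file sizes ordered newest-first, return how many files to
// compact together. Merging span_len files deletes roughly span_len - 1 of
// them, so the accumulated bytes divided by the span length approximates
// the work paid per file removed; grow the span while that ratio shrinks.
size_t PickIntraL0Span(const std::vector<uint64_t>& sizes,
                       size_t min_files_for_intra_l0) {
  if (sizes.empty()) {
    return 0;
  }
  uint64_t bytes = sizes[0];
  uint64_t bytes_per_del_file = std::numeric_limits<uint64_t>::max();
  size_t span_len = 1;
  for (; span_len < sizes.size(); ++span_len) {
    bytes += sizes[span_len];
    uint64_t next = bytes / span_len;
    if (next > bytes_per_del_file) {
      break;  // the marginal file makes each deletion more expensive
    }
    bytes_per_del_file = next;
  }
  return span_len >= min_files_for_intra_l0 ? span_len : 0;
}
```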
@@ -1327,658 +1364,25 @@ bool LevelCompactionPicker::PickIntraL0Compaction( } if (span_len >= kMinFilesForIntraL0Compaction) { - inputs->level = 0; + start_level_inputs_.level = 0; for (size_t i = 0; i < span_len; ++i) { - inputs->files.push_back(level_files[i]); + start_level_inputs_.files.push_back(level_files[i]); } return true; } return false; } - -#ifndef ROCKSDB_LITE -bool UniversalCompactionPicker::NeedsCompaction( - const VersionStorageInfo* vstorage) const { - const int kLevel0 = 0; - return vstorage->CompactionScore(kLevel0) >= 1; -} - -void UniversalCompactionPicker::SortedRun::Dump(char* out_buf, - size_t out_buf_size, - bool print_path) const { - if (level == 0) { - assert(file != nullptr); - if (file->fd.GetPathId() == 0 || !print_path) { - snprintf(out_buf, out_buf_size, "file %" PRIu64, file->fd.GetNumber()); - } else { - snprintf(out_buf, out_buf_size, "file %" PRIu64 - "(path " - "%" PRIu32 ")", - file->fd.GetNumber(), file->fd.GetPathId()); - } - } else { - snprintf(out_buf, out_buf_size, "level %d", level); - } -} - -void UniversalCompactionPicker::SortedRun::DumpSizeInfo( - char* out_buf, size_t out_buf_size, size_t sorted_run_count) const { - if (level == 0) { - assert(file != nullptr); - snprintf(out_buf, out_buf_size, - "file %" PRIu64 "[%" ROCKSDB_PRIszt - "] " - "with size %" PRIu64 " (compensated size %" PRIu64 ")", - file->fd.GetNumber(), sorted_run_count, file->fd.GetFileSize(), - file->compensated_file_size); - } else { - snprintf(out_buf, out_buf_size, - "level %d[%" ROCKSDB_PRIszt - "] " - "with size %" PRIu64 " (compensated size %" PRIu64 ")", - level, sorted_run_count, size, compensated_file_size); - } -} - -std::vector -UniversalCompactionPicker::CalculateSortedRuns( - const VersionStorageInfo& vstorage, const ImmutableCFOptions& ioptions) { - std::vector ret; - for (FileMetaData* f : vstorage.LevelFiles(0)) { - ret.emplace_back(0, f, f->fd.GetFileSize(), f->compensated_file_size, - f->being_compacted); - } - for (int level = 1; level < vstorage.num_levels(); level++) { - uint64_t total_compensated_size = 0U; - uint64_t total_size = 0U; - bool being_compacted = false; - bool is_first = true; - for (FileMetaData* f : vstorage.LevelFiles(level)) { - total_compensated_size += f->compensated_file_size; - total_size += f->fd.GetFileSize(); - if (ioptions.compaction_options_universal.allow_trivial_move == true) { - if (f->being_compacted) { - being_compacted = f->being_compacted; - } - } else { - // Compaction always includes all files for a non-zero level, so for a - // non-zero level, all the files should share the same being_compacted - // value. - // This assumption is only valid when - // ioptions.compaction_options_universal.allow_trivial_move is false - assert(is_first || f->being_compacted == being_compacted); - } - if (is_first) { - being_compacted = f->being_compacted; - is_first = false; - } - } - if (total_compensated_size > 0) { - ret.emplace_back(level, nullptr, total_size, total_compensated_size, - being_compacted); - } - } - return ret; -} - -#ifndef NDEBUG -namespace { -// smallest_seqno and largest_seqno are set iff. `files` is not empty. 
-void GetSmallestLargestSeqno(const std::vector& files, - SequenceNumber* smallest_seqno, - SequenceNumber* largest_seqno) { - bool is_first = true; - for (FileMetaData* f : files) { - assert(f->smallest_seqno <= f->largest_seqno); - if (is_first) { - is_first = false; - *smallest_seqno = f->smallest_seqno; - *largest_seqno = f->largest_seqno; - } else { - if (f->smallest_seqno < *smallest_seqno) { - *smallest_seqno = f->smallest_seqno; - } - if (f->largest_seqno > *largest_seqno) { - *largest_seqno = f->largest_seqno; - } - } - } -} } // namespace -#endif - -// Algorithm that checks to see if there are any overlapping -// files in the input -bool CompactionPicker::IsInputNonOverlapping(Compaction* c) { - auto comparator = icmp_->user_comparator(); - int first_iter = 1; - - InputFileInfo prev, curr, next; - - SmallestKeyHeap smallest_key_priority_q = - create_level_heap(c, icmp_->user_comparator()); - - while (!smallest_key_priority_q.empty()) { - curr = smallest_key_priority_q.top(); - smallest_key_priority_q.pop(); - - if (first_iter) { - prev = curr; - first_iter = 0; - } else { - if (comparator->Compare(prev.f->largest.user_key(), - curr.f->smallest.user_key()) >= 0) { - // found overlapping files, return false - return false; - } - assert(comparator->Compare(curr.f->largest.user_key(), - prev.f->largest.user_key()) > 0); - prev = curr; - } - - next.f = nullptr; - - if (curr.level != 0 && curr.index < c->num_input_files(curr.level) - 1) { - next.f = c->input(curr.level, curr.index + 1); - next.level = curr.level; - next.index = curr.index + 1; - } - if (next.f) { - smallest_key_priority_q.push(std::move(next)); - } - } - return true; -} - -// Universal style of compaction. Pick files that are contiguous in -// time-range to compact. -// -Compaction* UniversalCompactionPicker::PickCompaction( +Compaction* LevelCompactionPicker::PickCompaction( const std::string& cf_name, const MutableCFOptions& mutable_cf_options, VersionStorageInfo* vstorage, LogBuffer* log_buffer) { - const int kLevel0 = 0; - double score = vstorage->CompactionScore(kLevel0); - std::vector sorted_runs = - CalculateSortedRuns(*vstorage, ioptions_); - - if (sorted_runs.size() == 0 || - sorted_runs.size() < - (unsigned int)mutable_cf_options.level0_file_num_compaction_trigger) { - ROCKS_LOG_BUFFER(log_buffer, "[%s] Universal: nothing to do\n", - cf_name.c_str()); - TEST_SYNC_POINT_CALLBACK("UniversalCompactionPicker::PickCompaction:Return", - nullptr); - return nullptr; - } - VersionStorageInfo::LevelSummaryStorage tmp; - ROCKS_LOG_BUFFER_MAX_SZ( - log_buffer, 3072, - "[%s] Universal: sorted runs files(%" ROCKSDB_PRIszt "): %s\n", - cf_name.c_str(), sorted_runs.size(), vstorage->LevelSummary(&tmp)); - - // Check for size amplification first. - Compaction* c; - if ((c = PickCompactionUniversalSizeAmp(cf_name, mutable_cf_options, vstorage, - score, sorted_runs, log_buffer)) != - nullptr) { - ROCKS_LOG_BUFFER(log_buffer, "[%s] Universal: compacting for size amp\n", - cf_name.c_str()); - } else { - // Size amplification is within limits. Try reducing read - // amplification while maintaining file size ratios. - unsigned int ratio = ioptions_.compaction_options_universal.size_ratio; - - if ((c = PickCompactionUniversalReadAmp( - cf_name, mutable_cf_options, vstorage, score, ratio, UINT_MAX, - sorted_runs, log_buffer)) != nullptr) { - ROCKS_LOG_BUFFER(log_buffer, - "[%s] Universal: compacting for size ratio\n", - cf_name.c_str()); - } else { - // Size amplification and file size ratios are within configured limits. 
- // If max read amplification is exceeding configured limits, then force - // compaction without looking at filesize ratios and try to reduce - // the number of files to fewer than level0_file_num_compaction_trigger. - // This is guaranteed by NeedsCompaction() - assert(sorted_runs.size() >= - static_cast( - mutable_cf_options.level0_file_num_compaction_trigger)); - // Get the total number of sorted runs that are not being compacted - int num_sr_not_compacted = 0; - for (size_t i = 0; i < sorted_runs.size(); i++) { - if (sorted_runs[i].being_compacted == false) { - num_sr_not_compacted++; - } - } - - // The number of sorted runs that are not being compacted is greater than - // the maximum allowed number of sorted runs - if (num_sr_not_compacted > - mutable_cf_options.level0_file_num_compaction_trigger) { - unsigned int num_files = - num_sr_not_compacted - - mutable_cf_options.level0_file_num_compaction_trigger + 1; - if ((c = PickCompactionUniversalReadAmp( - cf_name, mutable_cf_options, vstorage, score, UINT_MAX, - num_files, sorted_runs, log_buffer)) != nullptr) { - ROCKS_LOG_BUFFER(log_buffer, - "[%s] Universal: compacting for file num -- %u\n", - cf_name.c_str(), num_files); - } - } - } - } - if (c == nullptr) { - TEST_SYNC_POINT_CALLBACK("UniversalCompactionPicker::PickCompaction:Return", - nullptr); - return nullptr; - } - - if (ioptions_.compaction_options_universal.allow_trivial_move == true) { - c->set_is_trivial_move(IsInputNonOverlapping(c)); - } - -// validate that all the chosen files of L0 are non overlapping in time -#ifndef NDEBUG - SequenceNumber prev_smallest_seqno = 0U; - bool is_first = true; - - size_t level_index = 0U; - if (c->start_level() == 0) { - for (auto f : *c->inputs(0)) { - assert(f->smallest_seqno <= f->largest_seqno); - if (is_first) { - is_first = false; - } else { - assert(prev_smallest_seqno > f->largest_seqno); - } - prev_smallest_seqno = f->smallest_seqno; - } - level_index = 1U; - } - for (; level_index < c->num_input_levels(); level_index++) { - if (c->num_input_files(level_index) != 0) { - SequenceNumber smallest_seqno = 0U; - SequenceNumber largest_seqno = 0U; - GetSmallestLargestSeqno(*(c->inputs(level_index)), &smallest_seqno, - &largest_seqno); - if (is_first) { - is_first = false; - } else if (prev_smallest_seqno > 0) { - // A level is considered as the bottommost level if there are - // no files in higher levels or if files in higher levels do - // not overlap with the files being compacted. Sequence numbers - // of files in bottommost level can be set to 0 to help - // compression. As a result, the following assert may not hold - // if the prev_smallest_seqno is 0. - assert(prev_smallest_seqno > largest_seqno); - } - prev_smallest_seqno = smallest_seqno; - } - } -#endif - // update statistics - MeasureTime(ioptions_.statistics, NUM_FILES_IN_SINGLE_COMPACTION, - c->inputs(0)->size()); - - RegisterCompaction(c); - - TEST_SYNC_POINT_CALLBACK("UniversalCompactionPicker::PickCompaction:Return", - c); - return c; -} - -uint32_t UniversalCompactionPicker::GetPathId( - const ImmutableCFOptions& ioptions, uint64_t file_size) { - // Two conditions need to be satisfied: - // (1) the target path needs to be able to hold the file's size - // (2) Total size left in this and previous paths need to be not - // smaller than expected future file size before this new file is - // compacted, which is estimated based on size_ratio. 
- // For example, if now we are compacting files of size (1, 1, 2, 4, 8), - // we will make sure the target file, probably with size of 16, will be - // placed in a path so that eventually when new files are generated and - // compacted to (1, 1, 2, 4, 8, 16), all those files can be stored in or - // before the path we chose. - // - // TODO(sdong): now the case of multiple column families is not - // considered in this algorithm. So the target size can be violated in - // that case. We need to improve it. - uint64_t accumulated_size = 0; - uint64_t future_size = - file_size * (100 - ioptions.compaction_options_universal.size_ratio) / - 100; - uint32_t p = 0; - assert(!ioptions.db_paths.empty()); - for (; p < ioptions.db_paths.size() - 1; p++) { - uint64_t target_size = ioptions.db_paths[p].target_size; - if (target_size > file_size && - accumulated_size + (target_size - file_size) > future_size) { - return p; - } - accumulated_size += target_size; - } - return p; -} - -// -// Consider compaction files based on their size differences with -// the next file in time order. -// -Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp( - const std::string& cf_name, const MutableCFOptions& mutable_cf_options, - VersionStorageInfo* vstorage, double score, unsigned int ratio, - unsigned int max_number_of_files_to_compact, - const std::vector& sorted_runs, LogBuffer* log_buffer) { - unsigned int min_merge_width = - ioptions_.compaction_options_universal.min_merge_width; - unsigned int max_merge_width = - ioptions_.compaction_options_universal.max_merge_width; - - const SortedRun* sr = nullptr; - bool done = false; - size_t start_index = 0; - unsigned int candidate_count = 0; - - unsigned int max_files_to_compact = - std::min(max_merge_width, max_number_of_files_to_compact); - min_merge_width = std::max(min_merge_width, 2U); - - // Caller checks the size before executing this function. This invariant is - // important because otherwise we may have a possible integer underflow when - // dealing with unsigned types. - assert(sorted_runs.size() > 0); - - // Considers a candidate file only if it is smaller than the - // total size accumulated so far. - for (size_t loop = 0; loop < sorted_runs.size(); loop++) { - candidate_count = 0; - - // Skip files that are already being compacted - for (sr = nullptr; loop < sorted_runs.size(); loop++) { - sr = &sorted_runs[loop]; - - if (!sr->being_compacted) { - candidate_count = 1; - break; - } - char file_num_buf[kFormatFileNumberBufSize]; - sr->Dump(file_num_buf, sizeof(file_num_buf)); - ROCKS_LOG_BUFFER(log_buffer, - "[%s] Universal: %s" - "[%d] being compacted, skipping", - cf_name.c_str(), file_num_buf, loop); - - sr = nullptr; - } - - // This file is not being compacted. Consider it as the - // first candidate to be compacted. - uint64_t candidate_size = sr != nullptr ? sr->compensated_file_size : 0; - if (sr != nullptr) { - char file_num_buf[kFormatFileNumberBufSize]; - sr->Dump(file_num_buf, sizeof(file_num_buf), true); - ROCKS_LOG_BUFFER(log_buffer, "[%s] Universal: Possible candidate %s[%d].", - cf_name.c_str(), file_num_buf, loop); - } - - // Check if the succeeding files need compaction. - for (size_t i = loop + 1; - candidate_count < max_files_to_compact && i < sorted_runs.size(); - i++) { - const SortedRun* succeeding_sr = &sorted_runs[i]; - if (succeeding_sr->being_compacted) { - break; - } - // Pick files if the total/last candidate file size (increased by the - // specified ratio) is still larger than the next candidate file. 
- // candidate_size is the total size of files picked so far with the - // default kCompactionStopStyleTotalSize; with - // kCompactionStopStyleSimilarSize, it's simply the size of the last - // picked file. - double sz = candidate_size * (100.0 + ratio) / 100.0; - if (sz < static_cast(succeeding_sr->size)) { - break; - } - if (ioptions_.compaction_options_universal.stop_style == - kCompactionStopStyleSimilarSize) { - // Similar-size stopping rule: also check the last picked file isn't - // far larger than the next candidate file. - sz = (succeeding_sr->size * (100.0 + ratio)) / 100.0; - if (sz < static_cast(candidate_size)) { - // If the small file we've encountered begins a run of similar-size - // files, we'll pick them up on a future iteration of the outer - // loop. If it's some lonely straggler, it'll eventually get picked - // by the last-resort read amp strategy which disregards size ratios. - break; - } - candidate_size = succeeding_sr->compensated_file_size; - } else { // default kCompactionStopStyleTotalSize - candidate_size += succeeding_sr->compensated_file_size; - } - candidate_count++; - } - - // Found a series of consecutive files that need compaction. - if (candidate_count >= (unsigned int)min_merge_width) { - start_index = loop; - done = true; - break; - } else { - for (size_t i = loop; - i < loop + candidate_count && i < sorted_runs.size(); i++) { - const SortedRun* skipping_sr = &sorted_runs[i]; - char file_num_buf[256]; - skipping_sr->DumpSizeInfo(file_num_buf, sizeof(file_num_buf), loop); - ROCKS_LOG_BUFFER(log_buffer, "[%s] Universal: Skipping %s", - cf_name.c_str(), file_num_buf); - } - } - } - if (!done || candidate_count <= 1) { - return nullptr; - } - size_t first_index_after = start_index + candidate_count; - // Compression is enabled if files compacted earlier already reached - // size ratio of compression. 
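The candidate loop above stops adding sorted runs once the accumulated candidate size, inflated by the configured ratio, can no longer cover the next run. A toy version of the total-size stop style (names and sample sizes are invented; the real loop works on compensated sizes and also supports the similar-size style):

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

// Count how many consecutive sorted runs starting at `start` get merged:
// keep taking the next run while candidate_size * (100 + ratio) / 100
// still covers that run's size.
size_t CountRunsByRatio(const std::vector<uint64_t>& run_sizes, size_t start,
                        unsigned int ratio) {
  if (start >= run_sizes.size()) {
    return 0;
  }
  uint64_t candidate_size = run_sizes[start];
  size_t count = 1;
  for (size_t i = start + 1; i < run_sizes.size(); ++i) {
    double allowed = candidate_size * (100.0 + ratio) / 100.0;
    if (allowed < static_cast<double>(run_sizes[i])) {
      break;  // next run is too large relative to what we have gathered
    }
    candidate_size += run_sizes[i];
    ++count;
  }
  return count;
}
// With sizes {1, 1, 2, 4, 8} and ratio 1, all five runs merge: 1.01 covers
// the next 1, then 2.02 covers 2, 4.04 covers 4, and 8.08 covers 8.
```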
- bool enable_compression = true; - int ratio_to_compress = - ioptions_.compaction_options_universal.compression_size_percent; - if (ratio_to_compress >= 0) { - uint64_t total_size = 0; - for (auto& sorted_run : sorted_runs) { - total_size += sorted_run.compensated_file_size; - } - - uint64_t older_file_size = 0; - for (size_t i = sorted_runs.size() - 1; i >= first_index_after; i--) { - older_file_size += sorted_runs[i].size; - if (older_file_size * 100L >= total_size * (long)ratio_to_compress) { - enable_compression = false; - break; - } - } - } - - uint64_t estimated_total_size = 0; - for (unsigned int i = 0; i < first_index_after; i++) { - estimated_total_size += sorted_runs[i].size; - } - uint32_t path_id = GetPathId(ioptions_, estimated_total_size); - int start_level = sorted_runs[start_index].level; - int output_level; - if (first_index_after == sorted_runs.size()) { - output_level = vstorage->num_levels() - 1; - } else if (sorted_runs[first_index_after].level == 0) { - output_level = 0; - } else { - output_level = sorted_runs[first_index_after].level - 1; - } - - std::vector inputs(vstorage->num_levels()); - for (size_t i = 0; i < inputs.size(); ++i) { - inputs[i].level = start_level + static_cast(i); - } - for (size_t i = start_index; i < first_index_after; i++) { - auto& picking_sr = sorted_runs[i]; - if (picking_sr.level == 0) { - FileMetaData* picking_file = picking_sr.file; - inputs[0].files.push_back(picking_file); - } else { - auto& files = inputs[picking_sr.level - start_level].files; - for (auto* f : vstorage->LevelFiles(picking_sr.level)) { - files.push_back(f); - } - } - char file_num_buf[256]; - picking_sr.DumpSizeInfo(file_num_buf, sizeof(file_num_buf), i); - ROCKS_LOG_BUFFER(log_buffer, "[%s] Universal: Picking %s", cf_name.c_str(), - file_num_buf); - } - - CompactionReason compaction_reason; - if (max_number_of_files_to_compact == UINT_MAX) { - compaction_reason = CompactionReason::kUniversalSortedRunNum; - } else { - compaction_reason = CompactionReason::kUniversalSizeRatio; - } - return new Compaction( - vstorage, ioptions_, mutable_cf_options, std::move(inputs), output_level, - mutable_cf_options.MaxFileSizeForLevel(output_level), LLONG_MAX, path_id, - GetCompressionType(ioptions_, vstorage, mutable_cf_options, start_level, - 1, enable_compression), - /* grandparents */ {}, /* is manual */ false, score, - false /* deletion_compaction */, compaction_reason); -} - -// Look at overall size amplification. If size amplification -// exceeeds the configured value, then do a compaction -// of the candidate files all the way upto the earliest -// base file (overrides configured values of file-size ratios, -// min_merge_width and max_merge_width). -// -Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp( - const std::string& cf_name, const MutableCFOptions& mutable_cf_options, - VersionStorageInfo* vstorage, double score, - const std::vector& sorted_runs, LogBuffer* log_buffer) { - // percentage flexibilty while reducing size amplification - uint64_t ratio = - ioptions_.compaction_options_universal.max_size_amplification_percent; - - unsigned int candidate_count = 0; - uint64_t candidate_size = 0; - size_t start_index = 0; - const SortedRun* sr = nullptr; - - // Skip files that are already being compacted - for (size_t loop = 0; loop < sorted_runs.size() - 1; loop++) { - sr = &sorted_runs[loop]; - if (!sr->being_compacted) { - start_index = loop; // Consider this as the first candidate. 
- break; - } - char file_num_buf[kFormatFileNumberBufSize]; - sr->Dump(file_num_buf, sizeof(file_num_buf), true); - ROCKS_LOG_BUFFER(log_buffer, "[%s] Universal: skipping %s[%d] compacted %s", - cf_name.c_str(), file_num_buf, loop, - " cannot be a candidate to reduce size amp.\n"); - sr = nullptr; - } - - if (sr == nullptr) { - return nullptr; // no candidate files - } - { - char file_num_buf[kFormatFileNumberBufSize]; - sr->Dump(file_num_buf, sizeof(file_num_buf), true); - ROCKS_LOG_BUFFER( - log_buffer, - "[%s] Universal: First candidate %s[%" ROCKSDB_PRIszt "] %s", - cf_name.c_str(), file_num_buf, start_index, " to reduce size amp.\n"); - } - - // keep adding up all the remaining files - for (size_t loop = start_index; loop < sorted_runs.size() - 1; loop++) { - sr = &sorted_runs[loop]; - if (sr->being_compacted) { - char file_num_buf[kFormatFileNumberBufSize]; - sr->Dump(file_num_buf, sizeof(file_num_buf), true); - ROCKS_LOG_BUFFER( - log_buffer, "[%s] Universal: Possible candidate %s[%d] %s", - cf_name.c_str(), file_num_buf, start_index, - " is already being compacted. No size amp reduction possible.\n"); - return nullptr; - } - candidate_size += sr->compensated_file_size; - candidate_count++; - } - if (candidate_count == 0) { - return nullptr; - } - - // size of earliest file - uint64_t earliest_file_size = sorted_runs.back().size; - - // size amplification = percentage of additional size - if (candidate_size * 100 < ratio * earliest_file_size) { - ROCKS_LOG_BUFFER( - log_buffer, - "[%s] Universal: size amp not needed. newer-files-total-size %" PRIu64 - " earliest-file-size %" PRIu64, - cf_name.c_str(), candidate_size, earliest_file_size); - return nullptr; - } else { - ROCKS_LOG_BUFFER( - log_buffer, - "[%s] Universal: size amp needed. newer-files-total-size %" PRIu64 - " earliest-file-size %" PRIu64, - cf_name.c_str(), candidate_size, earliest_file_size); - } - assert(start_index < sorted_runs.size() - 1); - - // Estimate total file size - uint64_t estimated_total_size = 0; - for (size_t loop = start_index; loop < sorted_runs.size(); loop++) { - estimated_total_size += sorted_runs[loop].size; - } - uint32_t path_id = GetPathId(ioptions_, estimated_total_size); - int start_level = sorted_runs[start_index].level; - - std::vector inputs(vstorage->num_levels()); - for (size_t i = 0; i < inputs.size(); ++i) { - inputs[i].level = start_level + static_cast(i); - } - // We always compact all the files, so always compress. 
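The size-amplification trigger in the code above is plain integer arithmetic: compacting everything is forced once the bytes held in all newer sorted runs reach the configured percentage of the oldest run. A small check of that inequality (sizes are invented; the real code accumulates compensated sizes for the newer runs):

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// True when newer-run bytes amount to at least `ratio` percent of the
// oldest run, i.e. candidate_size * 100 >= ratio * earliest_file_size.
bool SizeAmpCompactionNeeded(const std::vector<uint64_t>& run_sizes,
                             uint64_t ratio /* percent */) {
  if (run_sizes.size() < 2) {
    return false;
  }
  uint64_t earliest = run_sizes.back();  // oldest, typically largest, run
  uint64_t newer = 0;
  for (size_t i = 0; i + 1 < run_sizes.size(); ++i) {
    newer += run_sizes[i];
  }
  return newer * 100 >= ratio * earliest;
}

int main() {
  // Newer runs 1 + 2 + 4 = 7 over an 8-byte base: 87.5% amplification.
  assert(SizeAmpCompactionNeeded({1, 2, 4, 8}, 80));    // 700 >= 640
  assert(!SizeAmpCompactionNeeded({1, 2, 4, 8}, 100));  // 700 < 800
}
```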
- for (size_t loop = start_index; loop < sorted_runs.size(); loop++) { - auto& picking_sr = sorted_runs[loop]; - if (picking_sr.level == 0) { - FileMetaData* f = picking_sr.file; - inputs[0].files.push_back(f); - } else { - auto& files = inputs[picking_sr.level - start_level].files; - for (auto* f : vstorage->LevelFiles(picking_sr.level)) { - files.push_back(f); - } - } - char file_num_buf[256]; - picking_sr.DumpSizeInfo(file_num_buf, sizeof(file_num_buf), loop); - ROCKS_LOG_BUFFER(log_buffer, "[%s] Universal: size amp picking %s", - cf_name.c_str(), file_num_buf); - } - - return new Compaction( - vstorage, ioptions_, mutable_cf_options, std::move(inputs), - vstorage->num_levels() - 1, - mutable_cf_options.MaxFileSizeForLevel(vstorage->num_levels() - 1), - /* max_grandparent_overlap_bytes */ LLONG_MAX, path_id, - GetCompressionType(ioptions_, vstorage, mutable_cf_options, - vstorage->num_levels() - 1, 1), - /* grandparents */ {}, /* is manual */ false, score, - false /* deletion_compaction */, - CompactionReason::kUniversalSizeAmplification); + LevelCompactionBuilder builder(cf_name, vstorage, this, log_buffer, + mutable_cf_options, ioptions_); + return builder.PickCompaction(); } +#ifndef ROCKSDB_LITE bool FIFOCompactionPicker::NeedsCompaction( const VersionStorageInfo* vstorage) const { const int kLevel0 = 0; diff --git a/db/compaction_picker.h b/db/compaction_picker.h index b1905f348..e6443a8e7 100644 --- a/db/compaction_picker.h +++ b/db/compaction_picker.h @@ -84,15 +84,15 @@ class CompactionPicker { void ReleaseCompactionFiles(Compaction* c, Status status); // Returns true if any one of the specified files are being compacted - bool FilesInCompaction(const std::vector& files); + bool AreFilesInCompaction(const std::vector& files); // Takes a list of CompactionInputFiles and returns a (manual) Compaction // object. - Compaction* FormCompaction( - const CompactionOptions& compact_options, - const std::vector& input_files, int output_level, - VersionStorageInfo* vstorage, const MutableCFOptions& mutable_cf_options, - uint32_t output_path_id); + Compaction* CompactFiles(const CompactionOptions& compact_options, + const std::vector& input_files, + int output_level, VersionStorageInfo* vstorage, + const MutableCFOptions& mutable_cf_options, + uint32_t output_path_id); // Converts a set of compaction input file numbers into // a list of CompactionInputFiles. @@ -102,12 +102,6 @@ class CompactionPicker { const VersionStorageInfo* vstorage, const CompactionOptions& compact_options) const; - // Used in universal compaction when the enabled_trivial_move - // option is set. Checks whether there are any overlapping files - // in the input. Returns true if the input files are non - // overlapping. - bool IsInputNonOverlapping(Compaction* c); - // Is there currently a compaction involving level 0 taking place bool IsLevel0CompactionInProgress() const { return !level0_compactions_in_progress_.empty(); @@ -138,7 +132,6 @@ class CompactionPicker { void GetRange(const std::vector& inputs, InternalKey* smallest, InternalKey* largest) const; - protected: int NumberLevels() const { return ioptions_.num_levels; } // Add more files to the inputs on "level" to make sure that @@ -151,14 +144,14 @@ class CompactionPicker { // populated. // // Will return false if it is impossible to apply this compaction. 
- bool ExpandWhileOverlapping(const std::string& cf_name, + bool ExpandInputsToCleanCut(const std::string& cf_name, VersionStorageInfo* vstorage, CompactionInputFiles* inputs); // Returns true if any one of the parent files are being compacted - bool RangeInCompaction(VersionStorageInfo* vstorage, - const InternalKey* smallest, - const InternalKey* largest, int level, int* index); + bool IsRangeInCompaction(VersionStorageInfo* vstorage, + const InternalKey* smallest, + const InternalKey* largest, int level, int* index); // Returns true if the key range that `inputs` files cover overlap with the // key range of a currently running compaction. @@ -177,6 +170,20 @@ class CompactionPicker { const CompactionInputFiles& output_level_inputs, std::vector* grandparents); + // Register this compaction in the set of running compactions + void RegisterCompaction(Compaction* c); + + // Remove this compaction from the set of running compactions + void UnregisterCompaction(Compaction* c); + + std::set* level0_compactions_in_progress() { + return &level0_compactions_in_progress_; + } + std::unordered_set* compactions_in_progress() { + return &compactions_in_progress_; + } + + protected: const ImmutableCFOptions& ioptions_; // A helper function to SanitizeCompactionInputFiles() that @@ -187,12 +194,6 @@ class CompactionPicker { const ColumnFamilyMetaData& cf_meta, const int output_level) const; #endif // ROCKSDB_LITE - // Register this compaction in the set of running compactions - void RegisterCompaction(Compaction* c); - - // Remove this compaction from the set of running compactions - void UnregisterCompaction(Compaction* c); - // Keeps track of all compactions that are running on Level0. // Protected by DB mutex std::set level0_compactions_in_progress_; @@ -216,116 +217,9 @@ class LevelCompactionPicker : public CompactionPicker { virtual bool NeedsCompaction( const VersionStorageInfo* vstorage) const override; - - // Pick a path ID to place a newly generated file, with its level - static uint32_t GetPathId(const ImmutableCFOptions& ioptions, - const MutableCFOptions& mutable_cf_options, - int level); - - private: - // For the specfied level, pick a file that we want to compact. - // Returns false if there is no file to compact. - // If it returns true, inputs->files.size() will be exactly one. - // If level is 0 and there is already a compaction on that level, this - // function will return false. - bool PickCompactionBySize(VersionStorageInfo* vstorage, int level, - int output_level, CompactionInputFiles* inputs, - int* parent_index, int* base_index); - - // For L0->L0, picks the longest span of files that aren't currently - // undergoing compaction for which work-per-deleted-file decreases. The span - // always starts from the newest L0 file. - // - // Intra-L0 compaction is independent of all other files, so it can be - // performed even when L0->base_level compactions are blocked. - // - // Returns true if `inputs` is populated with a span of files to be compacted; - // otherwise, returns false. - bool PickIntraL0Compaction(VersionStorageInfo* vstorage, - const MutableCFOptions& mutable_cf_options, - CompactionInputFiles* inputs); - - // If there is any file marked for compaction, put put it into inputs. - // This is still experimental. 
It will return meaningful results only if - // clients call experimental feature SuggestCompactRange() - void PickFilesMarkedForCompactionExperimental(const std::string& cf_name, - VersionStorageInfo* vstorage, - CompactionInputFiles* inputs, - int* level, int* output_level); - - static const int kMinFilesForIntraL0Compaction = 4; }; #ifndef ROCKSDB_LITE -class UniversalCompactionPicker : public CompactionPicker { - public: - UniversalCompactionPicker(const ImmutableCFOptions& ioptions, - const InternalKeyComparator* icmp) - : CompactionPicker(ioptions, icmp) {} - virtual Compaction* PickCompaction(const std::string& cf_name, - const MutableCFOptions& mutable_cf_options, - VersionStorageInfo* vstorage, - LogBuffer* log_buffer) override; - - virtual int MaxOutputLevel() const override { return NumberLevels() - 1; } - - virtual bool NeedsCompaction( - const VersionStorageInfo* vstorage) const override; - - private: - struct SortedRun { - SortedRun(int _level, FileMetaData* _file, uint64_t _size, - uint64_t _compensated_file_size, bool _being_compacted) - : level(_level), - file(_file), - size(_size), - compensated_file_size(_compensated_file_size), - being_compacted(_being_compacted) { - assert(compensated_file_size > 0); - assert(level != 0 || file != nullptr); - } - - void Dump(char* out_buf, size_t out_buf_size, - bool print_path = false) const; - - // sorted_run_count is added into the string to print - void DumpSizeInfo(char* out_buf, size_t out_buf_size, - size_t sorted_run_count) const; - - int level; - // `file` Will be null for level > 0. For level = 0, the sorted run is - // for this file. - FileMetaData* file; - // For level > 0, `size` and `compensated_file_size` are sum of sizes all - // files in the level. `being_compacted` should be the same for all files - // in a non-zero level. Use the value here. - uint64_t size; - uint64_t compensated_file_size; - bool being_compacted; - }; - - // Pick Universal compaction to limit read amplification - Compaction* PickCompactionUniversalReadAmp( - const std::string& cf_name, const MutableCFOptions& mutable_cf_options, - VersionStorageInfo* vstorage, double score, unsigned int ratio, - unsigned int num_files, const std::vector& sorted_runs, - LogBuffer* log_buffer); - - // Pick Universal compaction to limit space amplification. - Compaction* PickCompactionUniversalSizeAmp( - const std::string& cf_name, const MutableCFOptions& mutable_cf_options, - VersionStorageInfo* vstorage, double score, - const std::vector& sorted_runs, LogBuffer* log_buffer); - - static std::vector CalculateSortedRuns( - const VersionStorageInfo& vstorage, const ImmutableCFOptions& ioptions); - - // Pick a path ID to place a newly generated file, with its estimated file - // size. - static uint32_t GetPathId(const ImmutableCFOptions& ioptions, - uint64_t file_size); -}; - class FIFOCompactionPicker : public CompactionPicker { public: FIFOCompactionPicker(const ImmutableCFOptions& ioptions, diff --git a/db/compaction_picker_test.cc b/db/compaction_picker_test.cc index ad1173110..0e18e167d 100644 --- a/db/compaction_picker_test.cc +++ b/db/compaction_picker_test.cc @@ -3,11 +3,12 @@ // LICENSE file in the root directory of this source tree. An additional grant // of patent rights can be found in the PATENTS file in the same directory. 
-#include "db/compaction.h" #include "db/compaction_picker.h" #include #include #include +#include "db/compaction.h" +#include "db/compaction_picker_universal.h" #include "util/logging.h" #include "util/string_util.h" diff --git a/db/compaction_picker_universal.cc b/db/compaction_picker_universal.cc new file mode 100644 index 000000000..0c61efe86 --- /dev/null +++ b/db/compaction_picker_universal.cc @@ -0,0 +1,735 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include "db/compaction_picker_universal.h" +#ifndef ROCKSDB_LITE + +#ifndef __STDC_FORMAT_MACROS +#define __STDC_FORMAT_MACROS +#endif + +#include +#include +#include +#include +#include +#include "db/column_family.h" +#include "monitoring/statistics.h" +#include "util/filename.h" +#include "util/log_buffer.h" +#include "util/random.h" +#include "util/string_util.h" +#include "util/sync_point.h" + +namespace rocksdb { +namespace { +// Used in universal compaction when trivial move is enabled. +// This structure is used for the construction of min heap +// that contains the file meta data, the level of the file +// and the index of the file in that level + +struct InputFileInfo { + InputFileInfo() : f(nullptr) {} + + FileMetaData* f; + size_t level; + size_t index; +}; + +// Used in universal compaction when trivial move is enabled. +// This comparator is used for the construction of min heap +// based on the smallest key of the file. +struct SmallestKeyHeapComparator { + explicit SmallestKeyHeapComparator(const Comparator* ucmp) { ucmp_ = ucmp; } + + bool operator()(InputFileInfo i1, InputFileInfo i2) const { + return (ucmp_->Compare(i1.f->smallest.user_key(), + i2.f->smallest.user_key()) > 0); + } + + private: + const Comparator* ucmp_; +}; + +typedef std::priority_queue, + SmallestKeyHeapComparator> + SmallestKeyHeap; + +// This function creates the heap that is used to find if the files are +// overlapping during universal compaction when the allow_trivial_move +// is set. +SmallestKeyHeap create_level_heap(Compaction* c, const Comparator* ucmp) { + SmallestKeyHeap smallest_key_priority_q = + SmallestKeyHeap(SmallestKeyHeapComparator(ucmp)); + + InputFileInfo input_file; + + for (size_t l = 0; l < c->num_input_levels(); l++) { + if (c->num_input_files(l) != 0) { + if (l == 0 && c->start_level() == 0) { + for (size_t i = 0; i < c->num_input_files(0); i++) { + input_file.f = c->input(0, i); + input_file.level = 0; + input_file.index = i; + smallest_key_priority_q.push(std::move(input_file)); + } + } else { + input_file.f = c->input(l, 0); + input_file.level = l; + input_file.index = 0; + smallest_key_priority_q.push(std::move(input_file)); + } + } + } + return smallest_key_priority_q; +} + +#ifndef NDEBUG +// smallest_seqno and largest_seqno are set iff. `files` is not empty. 
+
+#ifndef NDEBUG
+// smallest_seqno and largest_seqno are set iff `files` is not empty.
+void GetSmallestLargestSeqno(const std::vector<FileMetaData*>& files,
+                             SequenceNumber* smallest_seqno,
+                             SequenceNumber* largest_seqno) {
+  bool is_first = true;
+  for (FileMetaData* f : files) {
+    assert(f->smallest_seqno <= f->largest_seqno);
+    if (is_first) {
+      is_first = false;
+      *smallest_seqno = f->smallest_seqno;
+      *largest_seqno = f->largest_seqno;
+    } else {
+      if (f->smallest_seqno < *smallest_seqno) {
+        *smallest_seqno = f->smallest_seqno;
+      }
+      if (f->largest_seqno > *largest_seqno) {
+        *largest_seqno = f->largest_seqno;
+      }
+    }
+  }
+}
+#endif
+}  // namespace
+
+// Algorithm that checks to see if there are any overlapping
+// files in the input
+bool UniversalCompactionPicker::IsInputFilesNonOverlapping(Compaction* c) {
+  auto comparator = icmp_->user_comparator();
+  int first_iter = 1;
+
+  InputFileInfo prev, curr, next;
+
+  SmallestKeyHeap smallest_key_priority_q =
+      create_level_heap(c, icmp_->user_comparator());
+
+  while (!smallest_key_priority_q.empty()) {
+    curr = smallest_key_priority_q.top();
+    smallest_key_priority_q.pop();
+
+    if (first_iter) {
+      prev = curr;
+      first_iter = 0;
+    } else {
+      if (comparator->Compare(prev.f->largest.user_key(),
+                              curr.f->smallest.user_key()) >= 0) {
+        // found overlapping files, return false
+        return false;
+      }
+      assert(comparator->Compare(curr.f->largest.user_key(),
+                                 prev.f->largest.user_key()) > 0);
+      prev = curr;
+    }
+
+    next.f = nullptr;
+
+    if (curr.level != 0 && curr.index < c->num_input_files(curr.level) - 1) {
+      next.f = c->input(curr.level, curr.index + 1);
+      next.level = curr.level;
+      next.index = curr.index + 1;
+    }
+
+    if (next.f) {
+      smallest_key_priority_q.push(std::move(next));
+    }
+  }
+  return true;
+}
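IsInputFilesNonOverlapping() above pops candidate files in ascending order of smallest key and declares an overlap as soon as a popped file starts at or before the previous file's largest key. A self-contained sketch of the same sweep over plain string key ranges (the per-level cursoring of the real code is omitted, so this is illustrative only):

#include <queue>
#include <string>
#include <utility>
#include <vector>

using KeyRange = std::pair<std::string, std::string>;  // {smallest, largest}

bool RangesNonOverlapping(const std::vector<KeyRange>& ranges) {
  auto cmp = [](const KeyRange& a, const KeyRange& b) {
    return a.first > b.first;  // min-heap ordered by smallest key
  };
  std::priority_queue<KeyRange, std::vector<KeyRange>, decltype(cmp)> heap(cmp);
  for (const auto& r : ranges) heap.push(r);

  bool first = true;
  std::string prev_largest;
  while (!heap.empty()) {
    KeyRange cur = heap.top();
    heap.pop();
    if (!first && cur.first <= prev_largest) {
      return false;  // cur starts inside the previous range: overlap
    }
    first = false;
    prev_largest = cur.second;
  }
  return true;
}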
+
+bool UniversalCompactionPicker::NeedsCompaction(
+    const VersionStorageInfo* vstorage) const {
+  const int kLevel0 = 0;
+  return vstorage->CompactionScore(kLevel0) >= 1;
+}
+
+void UniversalCompactionPicker::SortedRun::Dump(char* out_buf,
+                                                size_t out_buf_size,
+                                                bool print_path) const {
+  if (level == 0) {
+    assert(file != nullptr);
+    if (file->fd.GetPathId() == 0 || !print_path) {
+      snprintf(out_buf, out_buf_size, "file %" PRIu64, file->fd.GetNumber());
+    } else {
+      snprintf(out_buf, out_buf_size, "file %" PRIu64
+                                      "(path "
+                                      "%" PRIu32 ")",
+               file->fd.GetNumber(), file->fd.GetPathId());
+    }
+  } else {
+    snprintf(out_buf, out_buf_size, "level %d", level);
+  }
+}
+
+void UniversalCompactionPicker::SortedRun::DumpSizeInfo(
+    char* out_buf, size_t out_buf_size, size_t sorted_run_count) const {
+  if (level == 0) {
+    assert(file != nullptr);
+    snprintf(out_buf, out_buf_size,
+             "file %" PRIu64 "[%" ROCKSDB_PRIszt
+             "] "
+             "with size %" PRIu64 " (compensated size %" PRIu64 ")",
+             file->fd.GetNumber(), sorted_run_count, file->fd.GetFileSize(),
+             file->compensated_file_size);
+  } else {
+    snprintf(out_buf, out_buf_size,
+             "level %d[%" ROCKSDB_PRIszt
+             "] "
+             "with size %" PRIu64 " (compensated size %" PRIu64 ")",
+             level, sorted_run_count, size, compensated_file_size);
+  }
+}
+
+std::vector<UniversalCompactionPicker::SortedRun>
+UniversalCompactionPicker::CalculateSortedRuns(
+    const VersionStorageInfo& vstorage, const ImmutableCFOptions& ioptions) {
+  std::vector<UniversalCompactionPicker::SortedRun> ret;
+  for (FileMetaData* f : vstorage.LevelFiles(0)) {
+    ret.emplace_back(0, f, f->fd.GetFileSize(), f->compensated_file_size,
+                     f->being_compacted);
+  }
+  for (int level = 1; level < vstorage.num_levels(); level++) {
+    uint64_t total_compensated_size = 0U;
+    uint64_t total_size = 0U;
+    bool being_compacted = false;
+    bool is_first = true;
+    for (FileMetaData* f : vstorage.LevelFiles(level)) {
+      total_compensated_size += f->compensated_file_size;
+      total_size += f->fd.GetFileSize();
+      if (ioptions.compaction_options_universal.allow_trivial_move == true) {
+        if (f->being_compacted) {
+          being_compacted = f->being_compacted;
+        }
+      } else {
+        // Compaction always includes all files for a non-zero level, so for a
+        // non-zero level, all the files should share the same being_compacted
+        // value.
+        // This assumption is only valid when
+        // ioptions.compaction_options_universal.allow_trivial_move is false
+        assert(is_first || f->being_compacted == being_compacted);
+      }
+      if (is_first) {
+        being_compacted = f->being_compacted;
+        is_first = false;
+      }
+    }
+    if (total_compensated_size > 0) {
+      ret.emplace_back(level, nullptr, total_size, total_compensated_size,
+                       being_compacted);
+    }
+  }
+  return ret;
+}
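CalculateSortedRuns() reflects the universal-compaction view of the LSM tree: every L0 file is its own sorted run, while each non-empty level >= 1 counts as exactly one run. A simplified stand-alone version over plain size vectors (Run is an illustrative type, not a RocksDB struct):

#include <cstdint>
#include <vector>

struct Run {
  int level;
  uint64_t size;
};

std::vector<Run> CalculateRuns(const std::vector<uint64_t>& l0_file_sizes,
                               const std::vector<uint64_t>& level_sizes) {
  std::vector<Run> runs;
  for (uint64_t s : l0_file_sizes) {
    runs.push_back({0, s});  // one sorted run per L0 file
  }
  for (size_t lvl = 1; lvl < level_sizes.size(); ++lvl) {
    if (level_sizes[lvl] > 0) {
      runs.push_back({static_cast<int>(lvl), level_sizes[lvl]});  // one run per level
    }
  }
  return runs;  // e.g. two L0 files plus data in L4 -> three sorted runs
}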
+
+// Universal style of compaction. Pick files that are contiguous in
+// time-range to compact.
+//
+Compaction* UniversalCompactionPicker::PickCompaction(
+    const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
+    VersionStorageInfo* vstorage, LogBuffer* log_buffer) {
+  const int kLevel0 = 0;
+  double score = vstorage->CompactionScore(kLevel0);
+  std::vector<SortedRun> sorted_runs =
+      CalculateSortedRuns(*vstorage, ioptions_);
+
+  if (sorted_runs.size() == 0 ||
+      sorted_runs.size() <
+          (unsigned int)mutable_cf_options.level0_file_num_compaction_trigger) {
+    ROCKS_LOG_BUFFER(log_buffer, "[%s] Universal: nothing to do\n",
+                     cf_name.c_str());
+    TEST_SYNC_POINT_CALLBACK(
+        "UniversalCompactionPicker::PickCompaction:Return", nullptr);
+    return nullptr;
+  }
+  VersionStorageInfo::LevelSummaryStorage tmp;
+  ROCKS_LOG_BUFFER_MAX_SZ(
+      log_buffer, 3072,
+      "[%s] Universal: sorted runs files(%" ROCKSDB_PRIszt "): %s\n",
+      cf_name.c_str(), sorted_runs.size(), vstorage->LevelSummary(&tmp));
+
+  // Check for size amplification first.
+  Compaction* c;
+  if ((c = PickCompactionToReduceSizeAmp(cf_name, mutable_cf_options, vstorage,
+                                         score, sorted_runs, log_buffer)) !=
+      nullptr) {
+    ROCKS_LOG_BUFFER(log_buffer, "[%s] Universal: compacting for size amp\n",
+                     cf_name.c_str());
+  } else {
+    // Size amplification is within limits. Try reducing read
+    // amplification while maintaining file size ratios.
+    unsigned int ratio = ioptions_.compaction_options_universal.size_ratio;
+
+    if ((c = PickCompactionToReduceSortedRuns(
+             cf_name, mutable_cf_options, vstorage, score, ratio, UINT_MAX,
+             sorted_runs, log_buffer)) != nullptr) {
+      ROCKS_LOG_BUFFER(log_buffer,
+                       "[%s] Universal: compacting for size ratio\n",
+                       cf_name.c_str());
+    } else {
+      // Size amplification and file size ratios are within configured limits.
+      // If max read amplification is exceeding configured limits, then force
+      // compaction without looking at filesize ratios and try to reduce
+      // the number of files to fewer than level0_file_num_compaction_trigger.
+      // This is guaranteed by NeedsCompaction()
+      assert(sorted_runs.size() >=
+             static_cast<size_t>(
+                 mutable_cf_options.level0_file_num_compaction_trigger));
+      // Get the total number of sorted runs that are not being compacted
+      int num_sr_not_compacted = 0;
+      for (size_t i = 0; i < sorted_runs.size(); i++) {
+        if (sorted_runs[i].being_compacted == false) {
+          num_sr_not_compacted++;
+        }
+      }
+
+      // The number of sorted runs that are not being compacted is greater
+      // than the maximum allowed number of sorted runs
+      if (num_sr_not_compacted >
+          mutable_cf_options.level0_file_num_compaction_trigger) {
+        unsigned int num_files =
+            num_sr_not_compacted -
+            mutable_cf_options.level0_file_num_compaction_trigger + 1;
+        if ((c = PickCompactionToReduceSortedRuns(
+                 cf_name, mutable_cf_options, vstorage, score, UINT_MAX,
+                 num_files, sorted_runs, log_buffer)) != nullptr) {
+          ROCKS_LOG_BUFFER(log_buffer,
+                           "[%s] Universal: compacting for file num -- %u\n",
+                           cf_name.c_str(), num_files);
+        }
+      }
+    }
+  }
+  if (c == nullptr) {
+    TEST_SYNC_POINT_CALLBACK(
+        "UniversalCompactionPicker::PickCompaction:Return", nullptr);
+    return nullptr;
+  }
+
+  if (ioptions_.compaction_options_universal.allow_trivial_move == true) {
+    c->set_is_trivial_move(IsInputFilesNonOverlapping(c));
+  }
+
+// validate that all the chosen files of L0 are non overlapping in time
+#ifndef NDEBUG
+  SequenceNumber prev_smallest_seqno = 0U;
+  bool is_first = true;
+
+  size_t level_index = 0U;
+  if (c->start_level() == 0) {
+    for (auto f : *c->inputs(0)) {
+      assert(f->smallest_seqno <= f->largest_seqno);
+      if (is_first) {
+        is_first = false;
+      } else {
+        assert(prev_smallest_seqno > f->largest_seqno);
+      }
+      prev_smallest_seqno = f->smallest_seqno;
+    }
+    level_index = 1U;
+  }
+  for (; level_index < c->num_input_levels(); level_index++) {
+    if (c->num_input_files(level_index) != 0) {
+      SequenceNumber smallest_seqno = 0U;
+      SequenceNumber largest_seqno = 0U;
+      GetSmallestLargestSeqno(*(c->inputs(level_index)), &smallest_seqno,
+                              &largest_seqno);
+      if (is_first) {
+        is_first = false;
+      } else if (prev_smallest_seqno > 0) {
+        // A level is considered as the bottommost level if there are
+        // no files in higher levels or if files in higher levels do
+        // not overlap with the files being compacted. Sequence numbers
+        // of files in bottommost level can be set to 0 to help
+        // compression. As a result, the following assert may not hold
+        // if the prev_smallest_seqno is 0.
+        assert(prev_smallest_seqno > largest_seqno);
+      }
+      prev_smallest_seqno = smallest_seqno;
+    }
+  }
+#endif
+  // update statistics
+  MeasureTime(ioptions_.statistics, NUM_FILES_IN_SINGLE_COMPACTION,
+              c->inputs(0)->size());
+
+  RegisterCompaction(c);
+
+  TEST_SYNC_POINT_CALLBACK("UniversalCompactionPicker::PickCompaction:Return",
+                           c);
+  return c;
+}
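PickCompaction() above tries its strategies in a fixed order: size amplification, then size ratio, then a forced reduction of the sorted-run count, compacting just enough runs (count - trigger + 1) to drop back under the trigger. A compact sketch of that cascade; the enum and helper names are illustrative only, not RocksDB types:

enum class UniversalPick { kNone, kSizeAmp, kSizeRatio, kSortedRunNum };

UniversalPick ChoosePick(bool size_amp_needed, bool size_ratio_found,
                         int runs_not_compacting, int trigger,
                         unsigned int* num_files_out) {
  if (size_amp_needed) return UniversalPick::kSizeAmp;
  if (size_ratio_found) return UniversalPick::kSizeRatio;
  if (runs_not_compacting > trigger) {
    // Compact just enough runs to drop back below the trigger.
    *num_files_out = runs_not_compacting - trigger + 1;
    return UniversalPick::kSortedRunNum;
  }
  return UniversalPick::kNone;
}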
+
+uint32_t UniversalCompactionPicker::GetPathId(
+    const ImmutableCFOptions& ioptions, uint64_t file_size) {
+  // Two conditions need to be satisfied:
+  // (1) the target path needs to be able to hold the file's size
+  // (2) the total size left in this and previous paths needs to be no
+  //     smaller than the expected future file size before this new file is
+  //     compacted, which is estimated based on size_ratio.
+  // For example, if now we are compacting files of size (1, 1, 2, 4, 8),
+  // we will make sure the target file, probably with size of 16, will be
+  // placed in a path so that eventually when new files are generated and
+  // compacted to (1, 1, 2, 4, 8, 16), all those files can be stored in or
+  // before the path we chose.
+  //
+  // TODO(sdong): now the case of multiple column families is not
+  // considered in this algorithm. So the target size can be violated in
+  // that case. We need to improve it.
+  uint64_t accumulated_size = 0;
+  uint64_t future_size =
+      file_size * (100 - ioptions.compaction_options_universal.size_ratio) /
+      100;
+  uint32_t p = 0;
+  assert(!ioptions.db_paths.empty());
+  for (; p < ioptions.db_paths.size() - 1; p++) {
+    uint64_t target_size = ioptions.db_paths[p].target_size;
+    if (target_size > file_size &&
+        accumulated_size + (target_size - file_size) > future_size) {
+      return p;
+    }
+    accumulated_size += target_size;
+  }
+  return p;
+}
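The comment in GetPathId() already gives the intended example; here is the same selection rule as a stand-alone function with made-up target sizes, to make the arithmetic concrete:

#include <cstdint>
#include <vector>

uint32_t PickPath(const std::vector<uint64_t>& path_target_sizes,
                  uint64_t file_size, uint64_t size_ratio_percent) {
  uint64_t future_size = file_size * (100 - size_ratio_percent) / 100;
  uint64_t accumulated = 0;
  uint32_t p = 0;
  for (; p + 1 < path_target_sizes.size(); ++p) {
    uint64_t target = path_target_sizes[p];
    // Path qualifies once it can hold the file and the space in it plus
    // all earlier paths covers the estimated future file size.
    if (target > file_size &&
        accumulated + (target - file_size) > future_size) {
      return p;
    }
    accumulated += target;
  }
  return p;  // the last path is the fallback
}

// With paths {10, 100, 1000} (in GB), a 16 GB output and size_ratio 0:
// path 0 is too small; path 1 qualifies (100 > 16 and 10 + 84 > 16).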
+
+//
+// Consider compaction files based on their size differences with
+// the next file in time order.
+//
+Compaction* UniversalCompactionPicker::PickCompactionToReduceSortedRuns(
+    const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
+    VersionStorageInfo* vstorage, double score, unsigned int ratio,
+    unsigned int max_number_of_files_to_compact,
+    const std::vector<SortedRun>& sorted_runs, LogBuffer* log_buffer) {
+  unsigned int min_merge_width =
+      ioptions_.compaction_options_universal.min_merge_width;
+  unsigned int max_merge_width =
+      ioptions_.compaction_options_universal.max_merge_width;
+
+  const SortedRun* sr = nullptr;
+  bool done = false;
+  size_t start_index = 0;
+  unsigned int candidate_count = 0;
+
+  unsigned int max_files_to_compact =
+      std::min(max_merge_width, max_number_of_files_to_compact);
+  min_merge_width = std::max(min_merge_width, 2U);
+
+  // Caller checks the size before executing this function. This invariant is
+  // important because otherwise we may have a possible integer underflow when
+  // dealing with unsigned types.
+  assert(sorted_runs.size() > 0);
+
+  // Considers a candidate file only if it is smaller than the
+  // total size accumulated so far.
+  for (size_t loop = 0; loop < sorted_runs.size(); loop++) {
+    candidate_count = 0;
+
+    // Skip files that are already being compacted
+    for (sr = nullptr; loop < sorted_runs.size(); loop++) {
+      sr = &sorted_runs[loop];
+
+      if (!sr->being_compacted) {
+        candidate_count = 1;
+        break;
+      }
+      char file_num_buf[kFormatFileNumberBufSize];
+      sr->Dump(file_num_buf, sizeof(file_num_buf));
+      ROCKS_LOG_BUFFER(log_buffer,
+                       "[%s] Universal: %s"
+                       "[%d] being compacted, skipping",
+                       cf_name.c_str(), file_num_buf, loop);
+
+      sr = nullptr;
+    }
+
+    // This file is not being compacted. Consider it as the
+    // first candidate to be compacted.
+    uint64_t candidate_size = sr != nullptr ? sr->compensated_file_size : 0;
+    if (sr != nullptr) {
+      char file_num_buf[kFormatFileNumberBufSize];
+      sr->Dump(file_num_buf, sizeof(file_num_buf), true);
+      ROCKS_LOG_BUFFER(log_buffer,
+                       "[%s] Universal: Possible candidate %s[%d].",
+                       cf_name.c_str(), file_num_buf, loop);
+    }
+
+    // Check if the succeeding files need compaction.
+    for (size_t i = loop + 1;
+         candidate_count < max_files_to_compact && i < sorted_runs.size();
+         i++) {
+      const SortedRun* succeeding_sr = &sorted_runs[i];
+      if (succeeding_sr->being_compacted) {
+        break;
+      }
+      // Pick files if the total/last candidate file size (increased by the
+      // specified ratio) is still larger than the next candidate file.
+      // candidate_size is the total size of files picked so far with the
+      // default kCompactionStopStyleTotalSize; with
+      // kCompactionStopStyleSimilarSize, it's simply the size of the last
+      // picked file.
+      double sz = candidate_size * (100.0 + ratio) / 100.0;
+      if (sz < static_cast<double>(succeeding_sr->size)) {
+        break;
+      }
+      if (ioptions_.compaction_options_universal.stop_style ==
+          kCompactionStopStyleSimilarSize) {
+        // Similar-size stopping rule: also check the last picked file isn't
+        // far larger than the next candidate file.
+        sz = (succeeding_sr->size * (100.0 + ratio)) / 100.0;
+        if (sz < static_cast<double>(candidate_size)) {
+          // If the small file we've encountered begins a run of similar-size
+          // files, we'll pick them up on a future iteration of the outer
+          // loop. If it's some lonely straggler, it'll eventually get picked
+          // by the last-resort read amp strategy which disregards size ratios.
+          break;
+        }
+        candidate_size = succeeding_sr->compensated_file_size;
+      } else {  // default kCompactionStopStyleTotalSize
+        candidate_size += succeeding_sr->compensated_file_size;
+      }
+      candidate_count++;
+    }
+
+    // Found a series of consecutive files that need compaction.
+    if (candidate_count >= (unsigned int)min_merge_width) {
+      start_index = loop;
+      done = true;
+      break;
+    } else {
+      for (size_t i = loop;
+           i < loop + candidate_count && i < sorted_runs.size(); i++) {
+        const SortedRun* skipping_sr = &sorted_runs[i];
+        char file_num_buf[256];
+        skipping_sr->DumpSizeInfo(file_num_buf, sizeof(file_num_buf), loop);
+        ROCKS_LOG_BUFFER(log_buffer, "[%s] Universal: Skipping %s",
+                         cf_name.c_str(), file_num_buf);
+      }
+    }
+  }
+  if (!done || candidate_count <= 1) {
+    return nullptr;
+  }
+  size_t first_index_after = start_index + candidate_count;
+  // Compression is enabled if files compacted earlier already reached
+  // size ratio of compression.
+  bool enable_compression = true;
+  int ratio_to_compress =
+      ioptions_.compaction_options_universal.compression_size_percent;
+  if (ratio_to_compress >= 0) {
+    uint64_t total_size = 0;
+    for (auto& sorted_run : sorted_runs) {
+      total_size += sorted_run.compensated_file_size;
+    }
+
+    uint64_t older_file_size = 0;
+    for (size_t i = sorted_runs.size() - 1; i >= first_index_after; i--) {
+      older_file_size += sorted_runs[i].size;
+      if (older_file_size * 100L >= total_size * (long)ratio_to_compress) {
+        enable_compression = false;
+        break;
+      }
+    }
+  }
+
+  uint64_t estimated_total_size = 0;
+  for (unsigned int i = 0; i < first_index_after; i++) {
+    estimated_total_size += sorted_runs[i].size;
+  }
+  uint32_t path_id = GetPathId(ioptions_, estimated_total_size);
+  int start_level = sorted_runs[start_index].level;
+  int output_level;
+  if (first_index_after == sorted_runs.size()) {
+    output_level = vstorage->num_levels() - 1;
+  } else if (sorted_runs[first_index_after].level == 0) {
+    output_level = 0;
+  } else {
+    output_level = sorted_runs[first_index_after].level - 1;
+  }
+
+  std::vector<CompactionInputFiles> inputs(vstorage->num_levels());
+  for (size_t i = 0; i < inputs.size(); ++i) {
+    inputs[i].level = start_level + static_cast<int>(i);
+  }
+  for (size_t i = start_index; i < first_index_after; i++) {
+    auto& picking_sr = sorted_runs[i];
+    if (picking_sr.level == 0) {
+      FileMetaData* picking_file = picking_sr.file;
+      inputs[0].files.push_back(picking_file);
+    } else {
+      auto& files = inputs[picking_sr.level - start_level].files;
+      for (auto* f : vstorage->LevelFiles(picking_sr.level)) {
+        files.push_back(f);
+      }
+    }
+    char file_num_buf[256];
+    picking_sr.DumpSizeInfo(file_num_buf, sizeof(file_num_buf), i);
+    ROCKS_LOG_BUFFER(log_buffer, "[%s] Universal: Picking %s",
+                     cf_name.c_str(), file_num_buf);
+  }
+
+  CompactionReason compaction_reason;
+  if (max_number_of_files_to_compact == UINT_MAX) {
+    compaction_reason = CompactionReason::kUniversalSizeRatio;
+  } else {
+    compaction_reason = CompactionReason::kUniversalSortedRunNum;
+  }
+  return new Compaction(
+      vstorage, ioptions_, mutable_cf_options, std::move(inputs), output_level,
+      mutable_cf_options.MaxFileSizeForLevel(output_level), LLONG_MAX, path_id,
+      GetCompressionType(ioptions_, vstorage, mutable_cf_options, start_level,
+                         1, enable_compression),
+      /* grandparents */ {}, /* is manual */ false, score,
+      false /* deletion_compaction */, compaction_reason);
+}
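With the default kCompactionStopStyleTotalSize, PickCompactionToReduceSortedRuns() keeps absorbing the next older run while the total picked so far, inflated by size_ratio percent, still covers it. A runnable sketch of just that accumulation rule, with arbitrary sizes:

#include <cstdint>
#include <cstdio>
#include <vector>

size_t CountPickedRuns(const std::vector<uint64_t>& run_sizes,  // newest first
                       unsigned int ratio_percent) {
  if (run_sizes.empty()) return 0;
  uint64_t total = run_sizes[0];
  size_t picked = 1;
  for (size_t i = 1; i < run_sizes.size(); ++i) {
    double inflated = total * (100.0 + ratio_percent) / 100.0;
    if (inflated < static_cast<double>(run_sizes[i])) break;  // size gap too large
    total += run_sizes[i];
    ++picked;
  }
  return picked;
}

int main() {
  // Runs of sizes 1, 1, 2, 4, 100 (newest first) with ratio 1:
  // 1 absorbs 1 (total 2), then 2 (total 4), then 4 (total 8);
  // 8 * 1.01 < 100, so the oldest run is left alone.
  std::printf("%zu\n", CountPickedRuns({1, 1, 2, 4, 100}, 1));  // prints 4
}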
+
+// Look at overall size amplification. If size amplification
+// exceeds the configured value, then do a compaction
+// of the candidate files all the way up to the earliest
+// base file (overrides configured values of file-size ratios,
+// min_merge_width and max_merge_width).
+//
+Compaction* UniversalCompactionPicker::PickCompactionToReduceSizeAmp(
+    const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
+    VersionStorageInfo* vstorage, double score,
+    const std::vector<SortedRun>& sorted_runs, LogBuffer* log_buffer) {
+  // percentage flexibility while reducing size amplification
+  uint64_t ratio =
+      ioptions_.compaction_options_universal.max_size_amplification_percent;
+
+  unsigned int candidate_count = 0;
+  uint64_t candidate_size = 0;
+  size_t start_index = 0;
+  const SortedRun* sr = nullptr;
+
+  // Skip files that are already being compacted
+  for (size_t loop = 0; loop < sorted_runs.size() - 1; loop++) {
+    sr = &sorted_runs[loop];
+    if (!sr->being_compacted) {
+      start_index = loop;  // Consider this as the first candidate.
+      break;
+    }
+    char file_num_buf[kFormatFileNumberBufSize];
+    sr->Dump(file_num_buf, sizeof(file_num_buf), true);
+    ROCKS_LOG_BUFFER(log_buffer,
+                     "[%s] Universal: skipping %s[%d] compacted %s",
+                     cf_name.c_str(), file_num_buf, loop,
+                     " cannot be a candidate to reduce size amp.\n");
+    sr = nullptr;
+  }
+
+  if (sr == nullptr) {
+    return nullptr;  // no candidate files
+  }
+  {
+    char file_num_buf[kFormatFileNumberBufSize];
+    sr->Dump(file_num_buf, sizeof(file_num_buf), true);
+    ROCKS_LOG_BUFFER(
+        log_buffer,
+        "[%s] Universal: First candidate %s[%" ROCKSDB_PRIszt "] %s",
+        cf_name.c_str(), file_num_buf, start_index, " to reduce size amp.\n");
+  }
+
+  // keep adding up all the remaining files
+  for (size_t loop = start_index; loop < sorted_runs.size() - 1; loop++) {
+    sr = &sorted_runs[loop];
+    if (sr->being_compacted) {
+      char file_num_buf[kFormatFileNumberBufSize];
+      sr->Dump(file_num_buf, sizeof(file_num_buf), true);
+      ROCKS_LOG_BUFFER(
+          log_buffer, "[%s] Universal: Possible candidate %s[%d] %s",
+          cf_name.c_str(), file_num_buf, start_index,
+          " is already being compacted. No size amp reduction possible.\n");
+      return nullptr;
+    }
+    candidate_size += sr->compensated_file_size;
+    candidate_count++;
+  }
+  if (candidate_count == 0) {
+    return nullptr;
+  }
+
+  // size of earliest file
+  uint64_t earliest_file_size = sorted_runs.back().size;
+
+  // size amplification = percentage of additional size
+  if (candidate_size * 100 < ratio * earliest_file_size) {
+    ROCKS_LOG_BUFFER(
+        log_buffer,
+        "[%s] Universal: size amp not needed. newer-files-total-size %" PRIu64
+        " earliest-file-size %" PRIu64,
+        cf_name.c_str(), candidate_size, earliest_file_size);
+    return nullptr;
+  } else {
+    ROCKS_LOG_BUFFER(
+        log_buffer,
+        "[%s] Universal: size amp needed. newer-files-total-size %" PRIu64
+        " earliest-file-size %" PRIu64,
+        cf_name.c_str(), candidate_size, earliest_file_size);
+  }
+  assert(start_index < sorted_runs.size() - 1);
+
+  // Estimate total file size
+  uint64_t estimated_total_size = 0;
+  for (size_t loop = start_index; loop < sorted_runs.size(); loop++) {
+    estimated_total_size += sorted_runs[loop].size;
+  }
+  uint32_t path_id = GetPathId(ioptions_, estimated_total_size);
+  int start_level = sorted_runs[start_index].level;
+
+  std::vector<CompactionInputFiles> inputs(vstorage->num_levels());
+  for (size_t i = 0; i < inputs.size(); ++i) {
+    inputs[i].level = start_level + static_cast<int>(i);
+  }
+  // We always compact all the files, so always compress.
+  for (size_t loop = start_index; loop < sorted_runs.size(); loop++) {
+    auto& picking_sr = sorted_runs[loop];
+    if (picking_sr.level == 0) {
+      FileMetaData* f = picking_sr.file;
+      inputs[0].files.push_back(f);
+    } else {
+      auto& files = inputs[picking_sr.level - start_level].files;
+      for (auto* f : vstorage->LevelFiles(picking_sr.level)) {
+        files.push_back(f);
+      }
+    }
+    char file_num_buf[256];
+    picking_sr.DumpSizeInfo(file_num_buf, sizeof(file_num_buf), loop);
+    ROCKS_LOG_BUFFER(log_buffer, "[%s] Universal: size amp picking %s",
+                     cf_name.c_str(), file_num_buf);
+  }
+
+  return new Compaction(
+      vstorage, ioptions_, mutable_cf_options, std::move(inputs),
+      vstorage->num_levels() - 1,
+      mutable_cf_options.MaxFileSizeForLevel(vstorage->num_levels() - 1),
+      /* max_grandparent_overlap_bytes */ LLONG_MAX, path_id,
+      GetCompressionType(ioptions_, vstorage, mutable_cf_options,
+                         vstorage->num_levels() - 1, 1),
+      /* grandparents */ {}, /* is manual */ false, score,
+      false /* deletion_compaction */,
+      CompactionReason::kUniversalSizeAmplification);
+}
+}  // namespace rocksdb
+
+#endif  // !ROCKSDB_LITE
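The trigger in PickCompactionToReduceSizeAmp() is pure arithmetic: compact everything once the newer runs outweigh max_size_amplification_percent of the earliest run. A one-function sketch with made-up numbers:

#include <cstdint>

bool SizeAmpCompactionNeeded(uint64_t newer_files_total_size,
                             uint64_t earliest_file_size,
                             uint64_t max_size_amp_percent) {
  // Mirrors the comparison above, written as its complement.
  return newer_files_total_size * 100 >=
         max_size_amp_percent * earliest_file_size;
}

// Example: newer runs total 250, earliest run 100, ratio 200 ->
// 25000 >= 20000, so the whole set is compacted into the last level.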
diff --git a/db/compaction_picker_universal.h b/db/compaction_picker_universal.h
new file mode 100644
index 000000000..271bd06fe
--- /dev/null
+++ b/db/compaction_picker_universal.h
@@ -0,0 +1,91 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under the BSD-style license found in the
+// LICENSE file in the root directory of this source tree. An additional grant
+// of patent rights can be found in the PATENTS file in the same directory.
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#pragma once
+#ifndef ROCKSDB_LITE
+
+#include "db/compaction_picker.h"
+
+namespace rocksdb {
+class UniversalCompactionPicker : public CompactionPicker {
+ public:
+  UniversalCompactionPicker(const ImmutableCFOptions& ioptions,
+                            const InternalKeyComparator* icmp)
+      : CompactionPicker(ioptions, icmp) {}
+  virtual Compaction* PickCompaction(const std::string& cf_name,
+                                     const MutableCFOptions& mutable_cf_options,
+                                     VersionStorageInfo* vstorage,
+                                     LogBuffer* log_buffer) override;
+
+  virtual int MaxOutputLevel() const override { return NumberLevels() - 1; }
+
+  virtual bool NeedsCompaction(
+      const VersionStorageInfo* vstorage) const override;
+
+ private:
+  struct SortedRun {
+    SortedRun(int _level, FileMetaData* _file, uint64_t _size,
+              uint64_t _compensated_file_size, bool _being_compacted)
+        : level(_level),
+          file(_file),
+          size(_size),
+          compensated_file_size(_compensated_file_size),
+          being_compacted(_being_compacted) {
+      assert(compensated_file_size > 0);
+      assert(level != 0 || file != nullptr);
+    }
+
+    void Dump(char* out_buf, size_t out_buf_size,
+              bool print_path = false) const;
+
+    // sorted_run_count is added into the string to print
+    void DumpSizeInfo(char* out_buf, size_t out_buf_size,
+                      size_t sorted_run_count) const;
+
+    int level;
+    // `file` will be null for level > 0. For level = 0, the sorted run is
+    // for this file.
+    FileMetaData* file;
+    // For level > 0, `size` and `compensated_file_size` are the sums of sizes
+    // of all files in the level. `being_compacted` should be the same for all
+    // files in a non-zero level. Use the value here.
+    uint64_t size;
+    uint64_t compensated_file_size;
+    bool being_compacted;
+  };
+
+  // Pick Universal compaction to limit read amplification
+  Compaction* PickCompactionToReduceSortedRuns(
+      const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
+      VersionStorageInfo* vstorage, double score, unsigned int ratio,
+      unsigned int num_files, const std::vector<SortedRun>& sorted_runs,
+      LogBuffer* log_buffer);
+
+  // Pick Universal compaction to limit space amplification.
+  Compaction* PickCompactionToReduceSizeAmp(
+      const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
+      VersionStorageInfo* vstorage, double score,
+      const std::vector<SortedRun>& sorted_runs, LogBuffer* log_buffer);
+
+  // Used in universal compaction when the allow_trivial_move
+  // option is set. Checks whether there are any overlapping files
+  // in the input. Returns true if the input files are non
+  // overlapping.
+  bool IsInputFilesNonOverlapping(Compaction* c);
+
+  static std::vector<SortedRun> CalculateSortedRuns(
+      const VersionStorageInfo& vstorage, const ImmutableCFOptions& ioptions);
+
+  // Pick a path ID to place a newly generated file, with its estimated file
+  // size.
+  static uint32_t GetPathId(const ImmutableCFOptions& ioptions,
+                            uint64_t file_size);
+};
+}  // namespace rocksdb
+#endif  // !ROCKSDB_LITE
diff --git a/db/db_impl_compaction_flush.cc b/db/db_impl_compaction_flush.cc
index 70ff1af04..245e5727d 100644
--- a/db/db_impl_compaction_flush.cc
+++ b/db/db_impl_compaction_flush.cc
@@ -16,10 +16,10 @@
 #include "db/builder.h"
 #include "monitoring/iostats_context_imp.h"
 #include "monitoring/perf_context_imp.h"
-#include "util/sst_file_manager_impl.h"
-#include "util/sync_point.h"
 #include "monitoring/thread_status_updater.h"
 #include "monitoring/thread_status_util.h"
+#include "util/sst_file_manager_impl.h"
+#include "util/sync_point.h"
 
 namespace rocksdb {
 
 Status DBImpl::SyncClosedLogs(JobContext* job_context) {
@@ -425,7 +425,7 @@ Status DBImpl::CompactFilesImpl(
   }
 
   for (auto inputs : input_files) {
-    if (cfd->compaction_picker()->FilesInCompaction(inputs.files)) {
+    if (cfd->compaction_picker()->AreFilesInCompaction(inputs.files)) {
       return Status::Aborted(
           "Some of the necessary compaction input "
           "files are already being compacted");
@@ -437,7 +437,7 @@ Status DBImpl::CompactFilesImpl(
   unique_ptr<Compaction> c;
   assert(cfd->compaction_picker());
-  c.reset(cfd->compaction_picker()->FormCompaction(
+  c.reset(cfd->compaction_picker()->CompactFiles(
       compact_options, input_files, output_level, version->storage_info(),
       *cfd->GetLatestMutableCFOptions(), output_path_id));
   if (!c) {
diff --git a/db/db_impl_open.cc b/db/db_impl_open.cc
index 468cb6c88..d043d678f 100644
--- a/db/db_impl_open.cc
+++ b/db/db_impl_open.cc
@@ -14,8 +14,8 @@
 #include <inttypes.h>
 #include "db/builder.h"
-#include "rocksdb/wal_filter.h"
 #include "options/options_helper.h"
+#include "rocksdb/wal_filter.h"
 #include "util/sst_file_manager_impl.h"
 #include "util/sync_point.h"
 
diff --git a/db/db_impl_write.cc b/db/db_impl_write.cc
index 154e4dc81..d0755ea22 100644
--- a/db/db_impl_write.cc
+++ b/db/db_impl_write.cc
@@ -12,8 +12,8 @@
 #define __STDC_FORMAT_MACROS
 #endif
 #include <inttypes.h>
-#include "options/options_helper.h"
 #include "monitoring/perf_context_imp.h"
+#include "options/options_helper.h"
 #include "util/sync_point.h"
 
 namespace rocksdb {
 
diff --git a/src.mk b/src.mk
index 77c95d9e3..6f4303cc2 100644
--- a/src.mk
+++ b/src.mk
@@ -11,6 +11,7 @@ LIB_SOURCES = \
   db/compaction_iterator.cc \
   db/compaction_job.cc \
   db/compaction_picker.cc \
+  db/compaction_picker_universal.cc \
   db/convenience.cc \
   db/db_filesnapshot.cc \
   db/db_impl.cc \
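The CompactFilesImpl() hunk above shows the renamed picker API in use: reject the request if any input file is already being compacted, otherwise hand the file list to the (renamed) CompactFiles(). A sketch of that control flow with stand-in types, not the real DBImpl signatures:

#include <memory>
#include <vector>

// Minimal stand-ins (illustrative only).
struct Compaction {};
struct Picker {
  bool AreFilesInCompaction(const std::vector<int>& files);
  Compaction* CompactFiles(const std::vector<std::vector<int>>& inputs,
                           int output_level);
};

std::unique_ptr<Compaction> BuildManualCompaction(
    Picker* picker, const std::vector<std::vector<int>>& input_files,
    int output_level) {
  for (const auto& files : input_files) {
    if (picker->AreFilesInCompaction(files)) {
      return nullptr;  // DBImpl returns Status::Aborted in this case
    }
  }
  return std::unique_ptr<Compaction>(
      picker->CompactFiles(input_files, output_level));
}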