From 9bd5fce6e89fcb294a1d193f32f3e4bb2e41d994 Mon Sep 17 00:00:00 2001 From: sdong Date: Mon, 16 Sep 2019 10:49:20 -0700 Subject: [PATCH] Refactor UniversalCompactionPicker code a little bit (#5639) Summary: Several functions of UniversalCompactionPicker share most of the parameters. Move these functions to a class with those shared arguments as class members. Hopefully this will make code slightly easier to maintain. Pull Request resolved: https://github.com/facebook/rocksdb/pull/5639 Test Plan: Run all existing test. Differential Revision: D16996403 fbshipit-source-id: fffafd1897ab132b420b1dec073542cffb5c44de --- db/compaction/compaction_picker_universal.cc | 435 +++++++++++-------- db/compaction/compaction_picker_universal.h | 66 --- db/db_universal_compaction_test.cc | 8 +- 3 files changed, 266 insertions(+), 243 deletions(-) diff --git a/db/compaction/compaction_picker_universal.cc b/db/compaction/compaction_picker_universal.cc index 5909ab576..7eddfa2b8 100644 --- a/db/compaction/compaction_picker_universal.cc +++ b/db/compaction/compaction_picker_universal.cc @@ -25,6 +25,98 @@ namespace rocksdb { namespace { +// A helper class that form universal compactions. The class is used by +// UniversalCompactionPicker::PickCompaction(). +// The usage is to create the class, and get the compaction object by calling +// PickCompaction(). +class UniversalCompactionBuilder { + public: + UniversalCompactionBuilder(const ImmutableCFOptions& ioptions, + const InternalKeyComparator* icmp, + const std::string& cf_name, + const MutableCFOptions& mutable_cf_options, + VersionStorageInfo* vstorage, + UniversalCompactionPicker* picker, + LogBuffer* log_buffer) + : ioptions_(ioptions), + icmp_(icmp), + cf_name_(cf_name), + mutable_cf_options_(mutable_cf_options), + vstorage_(vstorage), + picker_(picker), + log_buffer_(log_buffer) {} + + // Form and return the compaction object. The caller owns return object. + Compaction* PickCompaction(); + + private: + struct SortedRun { + SortedRun(int _level, FileMetaData* _file, uint64_t _size, + uint64_t _compensated_file_size, bool _being_compacted) + : level(_level), + file(_file), + size(_size), + compensated_file_size(_compensated_file_size), + being_compacted(_being_compacted) { + assert(compensated_file_size > 0); + assert(level != 0 || file != nullptr); + } + + void Dump(char* out_buf, size_t out_buf_size, + bool print_path = false) const; + + // sorted_run_count is added into the string to print + void DumpSizeInfo(char* out_buf, size_t out_buf_size, + size_t sorted_run_count) const; + + int level; + // `file` Will be null for level > 0. For level = 0, the sorted run is + // for this file. + FileMetaData* file; + // For level > 0, `size` and `compensated_file_size` are sum of sizes all + // files in the level. `being_compacted` should be the same for all files + // in a non-zero level. Use the value here. + uint64_t size; + uint64_t compensated_file_size; + bool being_compacted; + }; + + // Pick Universal compaction to limit read amplification + Compaction* PickCompactionToReduceSortedRuns( + unsigned int ratio, unsigned int max_number_of_files_to_compact); + + // Pick Universal compaction to limit space amplification. + Compaction* PickCompactionToReduceSizeAmp(); + + Compaction* PickDeleteTriggeredCompaction(); + + // Used in universal compaction when the enabled_trivial_move + // option is set. Checks whether there are any overlapping files + // in the input. Returns true if the input files are non + // overlapping. + bool IsInputFilesNonOverlapping(Compaction* c); + + const ImmutableCFOptions& ioptions_; + const InternalKeyComparator* icmp_; + double score_; + std::vector sorted_runs_; + const std::string& cf_name_; + const MutableCFOptions& mutable_cf_options_; + VersionStorageInfo* vstorage_; + UniversalCompactionPicker* picker_; + LogBuffer* log_buffer_; + + static std::vector CalculateSortedRuns( + const VersionStorageInfo& vstorage, const ImmutableCFOptions& ioptions, + const MutableCFOptions& mutable_cf_options); + + // Pick a path ID to place a newly generated file, with its estimated file + // size. + static uint32_t GetPathId(const ImmutableCFOptions& ioptions, + const MutableCFOptions& mutable_cf_options, + uint64_t file_size); +}; + // Used in universal compaction when trivial move is enabled. // This structure is used for the construction of min heap // that contains the file meta data, the level of the file @@ -113,7 +205,7 @@ void GetSmallestLargestSeqno(const std::vector& files, // Algorithm that checks to see if there are any overlapping // files in the input -bool UniversalCompactionPicker::IsInputFilesNonOverlapping(Compaction* c) { +bool UniversalCompactionBuilder::IsInputFilesNonOverlapping(Compaction* c) { auto comparator = icmp_->user_comparator(); int first_iter = 1; @@ -167,9 +259,18 @@ bool UniversalCompactionPicker::NeedsCompaction( return false; } -void UniversalCompactionPicker::SortedRun::Dump(char* out_buf, - size_t out_buf_size, - bool print_path) const { +Compaction* UniversalCompactionPicker::PickCompaction( + const std::string& cf_name, const MutableCFOptions& mutable_cf_options, + VersionStorageInfo* vstorage, LogBuffer* log_buffer) { + UniversalCompactionBuilder builder(ioptions_, icmp_, cf_name, + mutable_cf_options, vstorage, this, + log_buffer); + return builder.PickCompaction(); +} + +void UniversalCompactionBuilder::SortedRun::Dump(char* out_buf, + size_t out_buf_size, + bool print_path) const { if (level == 0) { assert(file != nullptr); if (file->fd.GetPathId() == 0 || !print_path) { @@ -185,7 +286,7 @@ void UniversalCompactionPicker::SortedRun::Dump(char* out_buf, } } -void UniversalCompactionPicker::SortedRun::DumpSizeInfo( +void UniversalCompactionBuilder::SortedRun::DumpSizeInfo( char* out_buf, size_t out_buf_size, size_t sorted_run_count) const { if (level == 0) { assert(file != nullptr); @@ -204,11 +305,11 @@ void UniversalCompactionPicker::SortedRun::DumpSizeInfo( } } -std::vector -UniversalCompactionPicker::CalculateSortedRuns( +std::vector +UniversalCompactionBuilder::CalculateSortedRuns( const VersionStorageInfo& vstorage, const ImmutableCFOptions& /*ioptions*/, const MutableCFOptions& mutable_cf_options) { - std::vector ret; + std::vector ret; for (FileMetaData* f : vstorage.LevelFiles(0)) { ret.emplace_back(0, f, f->fd.GetFileSize(), f->compensated_file_size, f->being_compacted); @@ -231,8 +332,8 @@ UniversalCompactionPicker::CalculateSortedRuns( // non-zero level, all the files should share the same being_compacted // value. // This assumption is only valid when - // mutable_cf_options.compaction_options_universal.allow_trivial_move is - // false + // mutable_cf_options.compaction_options_universal.allow_trivial_move + // is false assert(is_first || f->being_compacted == being_compacted); } if (is_first) { @@ -250,65 +351,59 @@ UniversalCompactionPicker::CalculateSortedRuns( // Universal style of compaction. Pick files that are contiguous in // time-range to compact. -Compaction* UniversalCompactionPicker::PickCompaction( - const std::string& cf_name, const MutableCFOptions& mutable_cf_options, - VersionStorageInfo* vstorage, LogBuffer* log_buffer) { +Compaction* UniversalCompactionBuilder::PickCompaction() { const int kLevel0 = 0; - double score = vstorage->CompactionScore(kLevel0); - std::vector sorted_runs = - CalculateSortedRuns(*vstorage, ioptions_, mutable_cf_options); - - if (sorted_runs.size() == 0 || - (vstorage->FilesMarkedForCompaction().empty() && - sorted_runs.size() < (unsigned int)mutable_cf_options - .level0_file_num_compaction_trigger)) { - ROCKS_LOG_BUFFER(log_buffer, "[%s] Universal: nothing to do\n", - cf_name.c_str()); - TEST_SYNC_POINT_CALLBACK("UniversalCompactionPicker::PickCompaction:Return", - nullptr); + score_ = vstorage_->CompactionScore(kLevel0); + sorted_runs_ = + CalculateSortedRuns(*vstorage_, ioptions_, mutable_cf_options_); + + if (sorted_runs_.size() == 0 || + (vstorage_->FilesMarkedForCompaction().empty() && + sorted_runs_.size() < (unsigned int)mutable_cf_options_ + .level0_file_num_compaction_trigger)) { + ROCKS_LOG_BUFFER(log_buffer_, "[%s] Universal: nothing to do\n", + cf_name_.c_str()); + TEST_SYNC_POINT_CALLBACK( + "UniversalCompactionBuilder::PickCompaction:Return", nullptr); return nullptr; } VersionStorageInfo::LevelSummaryStorage tmp; ROCKS_LOG_BUFFER_MAX_SZ( - log_buffer, 3072, + log_buffer_, 3072, "[%s] Universal: sorted runs files(%" ROCKSDB_PRIszt "): %s\n", - cf_name.c_str(), sorted_runs.size(), vstorage->LevelSummary(&tmp)); + cf_name_.c_str(), sorted_runs_.size(), vstorage_->LevelSummary(&tmp)); // Check for size amplification first. Compaction* c = nullptr; - if (sorted_runs.size() >= + if (sorted_runs_.size() >= static_cast( - mutable_cf_options.level0_file_num_compaction_trigger)) { - if ((c = PickCompactionToReduceSizeAmp(cf_name, mutable_cf_options, - vstorage, score, sorted_runs, - log_buffer)) != nullptr) { - ROCKS_LOG_BUFFER(log_buffer, "[%s] Universal: compacting for size amp\n", - cf_name.c_str()); + mutable_cf_options_.level0_file_num_compaction_trigger)) { + if ((c = PickCompactionToReduceSizeAmp()) != nullptr) { + ROCKS_LOG_BUFFER(log_buffer_, "[%s] Universal: compacting for size amp\n", + cf_name_.c_str()); } else { // Size amplification is within limits. Try reducing read // amplification while maintaining file size ratios. unsigned int ratio = - mutable_cf_options.compaction_options_universal.size_ratio; + mutable_cf_options_.compaction_options_universal.size_ratio; - if ((c = PickCompactionToReduceSortedRuns( - cf_name, mutable_cf_options, vstorage, score, ratio, UINT_MAX, - sorted_runs, log_buffer)) != nullptr) { - ROCKS_LOG_BUFFER(log_buffer, + if ((c = PickCompactionToReduceSortedRuns(ratio, UINT_MAX)) != nullptr) { + ROCKS_LOG_BUFFER(log_buffer_, "[%s] Universal: compacting for size ratio\n", - cf_name.c_str()); + cf_name_.c_str()); } else { // Size amplification and file size ratios are within configured limits. // If max read amplification is exceeding configured limits, then force // compaction without looking at filesize ratios and try to reduce // the number of files to fewer than level0_file_num_compaction_trigger. // This is guaranteed by NeedsCompaction() - assert(sorted_runs.size() >= + assert(sorted_runs_.size() >= static_cast( - mutable_cf_options.level0_file_num_compaction_trigger)); + mutable_cf_options_.level0_file_num_compaction_trigger)); // Get the total number of sorted runs that are not being compacted int num_sr_not_compacted = 0; - for (size_t i = 0; i < sorted_runs.size(); i++) { - if (sorted_runs[i].being_compacted == false) { + for (size_t i = 0; i < sorted_runs_.size(); i++) { + if (sorted_runs_[i].being_compacted == false) { num_sr_not_compacted++; } } @@ -316,16 +411,15 @@ Compaction* UniversalCompactionPicker::PickCompaction( // The number of sorted runs that are not being compacted is greater // than the maximum allowed number of sorted runs if (num_sr_not_compacted > - mutable_cf_options.level0_file_num_compaction_trigger) { + mutable_cf_options_.level0_file_num_compaction_trigger) { unsigned int num_files = num_sr_not_compacted - - mutable_cf_options.level0_file_num_compaction_trigger + 1; - if ((c = PickCompactionToReduceSortedRuns( - cf_name, mutable_cf_options, vstorage, score, UINT_MAX, - num_files, sorted_runs, log_buffer)) != nullptr) { - ROCKS_LOG_BUFFER(log_buffer, + mutable_cf_options_.level0_file_num_compaction_trigger + 1; + if ((c = PickCompactionToReduceSortedRuns(UINT_MAX, num_files)) != + nullptr) { + ROCKS_LOG_BUFFER(log_buffer_, "[%s] Universal: compacting for file num -- %u\n", - cf_name.c_str(), num_files); + cf_name_.c_str(), num_files); } } } @@ -333,22 +427,20 @@ Compaction* UniversalCompactionPicker::PickCompaction( } if (c == nullptr) { - if ((c = PickDeleteTriggeredCompaction(cf_name, mutable_cf_options, - vstorage, score, sorted_runs, - log_buffer)) != nullptr) { - ROCKS_LOG_BUFFER(log_buffer, + if ((c = PickDeleteTriggeredCompaction()) != nullptr) { + ROCKS_LOG_BUFFER(log_buffer_, "[%s] Universal: delete triggered compaction\n", - cf_name.c_str()); + cf_name_.c_str()); } } if (c == nullptr) { - TEST_SYNC_POINT_CALLBACK("UniversalCompactionPicker::PickCompaction:Return", - nullptr); + TEST_SYNC_POINT_CALLBACK( + "UniversalCompactionBuilder::PickCompaction:Return", nullptr); return nullptr; } - if (mutable_cf_options.compaction_options_universal.allow_trivial_move == + if (mutable_cf_options_.compaction_options_universal.allow_trivial_move == true) { c->set_is_trivial_move(IsInputFilesNonOverlapping(c)); } @@ -394,15 +486,15 @@ Compaction* UniversalCompactionPicker::PickCompaction( RecordInHistogram(ioptions_.statistics, NUM_FILES_IN_SINGLE_COMPACTION, c->inputs(0)->size()); - RegisterCompaction(c); - vstorage->ComputeCompactionScore(ioptions_, mutable_cf_options); + picker_->RegisterCompaction(c); + vstorage_->ComputeCompactionScore(ioptions_, mutable_cf_options_); - TEST_SYNC_POINT_CALLBACK("UniversalCompactionPicker::PickCompaction:Return", + TEST_SYNC_POINT_CALLBACK("UniversalCompactionBuilder::PickCompaction:Return", c); return c; } -uint32_t UniversalCompactionPicker::GetPathId( +uint32_t UniversalCompactionBuilder::GetPathId( const ImmutableCFOptions& ioptions, const MutableCFOptions& mutable_cf_options, uint64_t file_size) { // Two conditions need to be satisfied: @@ -440,15 +532,12 @@ uint32_t UniversalCompactionPicker::GetPathId( // Consider compaction files based on their size differences with // the next file in time order. // -Compaction* UniversalCompactionPicker::PickCompactionToReduceSortedRuns( - const std::string& cf_name, const MutableCFOptions& mutable_cf_options, - VersionStorageInfo* vstorage, double score, unsigned int ratio, - unsigned int max_number_of_files_to_compact, - const std::vector& sorted_runs, LogBuffer* log_buffer) { +Compaction* UniversalCompactionBuilder::PickCompactionToReduceSortedRuns( + unsigned int ratio, unsigned int max_number_of_files_to_compact) { unsigned int min_merge_width = - mutable_cf_options.compaction_options_universal.min_merge_width; + mutable_cf_options_.compaction_options_universal.min_merge_width; unsigned int max_merge_width = - mutable_cf_options.compaction_options_universal.max_merge_width; + mutable_cf_options_.compaction_options_universal.max_merge_width; const SortedRun* sr = nullptr; bool done = false; @@ -462,16 +551,16 @@ Compaction* UniversalCompactionPicker::PickCompactionToReduceSortedRuns( // Caller checks the size before executing this function. This invariant is // important because otherwise we may have a possible integer underflow when // dealing with unsigned types. - assert(sorted_runs.size() > 0); + assert(sorted_runs_.size() > 0); // Considers a candidate file only if it is smaller than the // total size accumulated so far. - for (size_t loop = 0; loop < sorted_runs.size(); loop++) { + for (size_t loop = 0; loop < sorted_runs_.size(); loop++) { candidate_count = 0; // Skip files that are already being compacted - for (sr = nullptr; loop < sorted_runs.size(); loop++) { - sr = &sorted_runs[loop]; + for (sr = nullptr; loop < sorted_runs_.size(); loop++) { + sr = &sorted_runs_[loop]; if (!sr->being_compacted) { candidate_count = 1; @@ -479,10 +568,10 @@ Compaction* UniversalCompactionPicker::PickCompactionToReduceSortedRuns( } char file_num_buf[kFormatFileNumberBufSize]; sr->Dump(file_num_buf, sizeof(file_num_buf)); - ROCKS_LOG_BUFFER(log_buffer, + ROCKS_LOG_BUFFER(log_buffer_, "[%s] Universal: %s" "[%d] being compacted, skipping", - cf_name.c_str(), file_num_buf, loop); + cf_name_.c_str(), file_num_buf, loop); sr = nullptr; } @@ -493,15 +582,16 @@ Compaction* UniversalCompactionPicker::PickCompactionToReduceSortedRuns( if (sr != nullptr) { char file_num_buf[kFormatFileNumberBufSize]; sr->Dump(file_num_buf, sizeof(file_num_buf), true); - ROCKS_LOG_BUFFER(log_buffer, "[%s] Universal: Possible candidate %s[%d].", - cf_name.c_str(), file_num_buf, loop); + ROCKS_LOG_BUFFER(log_buffer_, + "[%s] Universal: Possible candidate %s[%d].", + cf_name_.c_str(), file_num_buf, loop); } // Check if the succeeding files need compaction. for (size_t i = loop + 1; - candidate_count < max_files_to_compact && i < sorted_runs.size(); + candidate_count < max_files_to_compact && i < sorted_runs_.size(); i++) { - const SortedRun* succeeding_sr = &sorted_runs[i]; + const SortedRun* succeeding_sr = &sorted_runs_[i]; if (succeeding_sr->being_compacted) { break; } @@ -515,7 +605,7 @@ Compaction* UniversalCompactionPicker::PickCompactionToReduceSortedRuns( if (sz < static_cast(succeeding_sr->size)) { break; } - if (mutable_cf_options.compaction_options_universal.stop_style == + if (mutable_cf_options_.compaction_options_universal.stop_style == kCompactionStopStyleSimilarSize) { // Similar-size stopping rule: also check the last picked file isn't // far larger than the next candidate file. @@ -541,12 +631,12 @@ Compaction* UniversalCompactionPicker::PickCompactionToReduceSortedRuns( break; } else { for (size_t i = loop; - i < loop + candidate_count && i < sorted_runs.size(); i++) { - const SortedRun* skipping_sr = &sorted_runs[i]; + i < loop + candidate_count && i < sorted_runs_.size(); i++) { + const SortedRun* skipping_sr = &sorted_runs_[i]; char file_num_buf[256]; skipping_sr->DumpSizeInfo(file_num_buf, sizeof(file_num_buf), loop); - ROCKS_LOG_BUFFER(log_buffer, "[%s] Universal: Skipping %s", - cf_name.c_str(), file_num_buf); + ROCKS_LOG_BUFFER(log_buffer_, "[%s] Universal: Skipping %s", + cf_name_.c_str(), file_num_buf); } } } @@ -558,16 +648,16 @@ Compaction* UniversalCompactionPicker::PickCompactionToReduceSortedRuns( // size ratio of compression. bool enable_compression = true; int ratio_to_compress = - mutable_cf_options.compaction_options_universal.compression_size_percent; + mutable_cf_options_.compaction_options_universal.compression_size_percent; if (ratio_to_compress >= 0) { uint64_t total_size = 0; - for (auto& sorted_run : sorted_runs) { + for (auto& sorted_run : sorted_runs_) { total_size += sorted_run.compensated_file_size; } uint64_t older_file_size = 0; - for (size_t i = sorted_runs.size() - 1; i >= first_index_after; i--) { - older_file_size += sorted_runs[i].size; + for (size_t i = sorted_runs_.size() - 1; i >= first_index_after; i--) { + older_file_size += sorted_runs_[i].size; if (older_file_size * 100L >= total_size * (long)ratio_to_compress) { enable_compression = false; break; @@ -577,46 +667,46 @@ Compaction* UniversalCompactionPicker::PickCompactionToReduceSortedRuns( uint64_t estimated_total_size = 0; for (unsigned int i = 0; i < first_index_after; i++) { - estimated_total_size += sorted_runs[i].size; + estimated_total_size += sorted_runs_[i].size; } uint32_t path_id = - GetPathId(ioptions_, mutable_cf_options, estimated_total_size); - int start_level = sorted_runs[start_index].level; + GetPathId(ioptions_, mutable_cf_options_, estimated_total_size); + int start_level = sorted_runs_[start_index].level; int output_level; - if (first_index_after == sorted_runs.size()) { - output_level = vstorage->num_levels() - 1; - } else if (sorted_runs[first_index_after].level == 0) { + if (first_index_after == sorted_runs_.size()) { + output_level = vstorage_->num_levels() - 1; + } else if (sorted_runs_[first_index_after].level == 0) { output_level = 0; } else { - output_level = sorted_runs[first_index_after].level - 1; + output_level = sorted_runs_[first_index_after].level - 1; } // last level is reserved for the files ingested behind if (ioptions_.allow_ingest_behind && - (output_level == vstorage->num_levels() - 1)) { + (output_level == vstorage_->num_levels() - 1)) { assert(output_level > 1); output_level--; } - std::vector inputs(vstorage->num_levels()); + std::vector inputs(vstorage_->num_levels()); for (size_t i = 0; i < inputs.size(); ++i) { inputs[i].level = start_level + static_cast(i); } for (size_t i = start_index; i < first_index_after; i++) { - auto& picking_sr = sorted_runs[i]; + auto& picking_sr = sorted_runs_[i]; if (picking_sr.level == 0) { FileMetaData* picking_file = picking_sr.file; inputs[0].files.push_back(picking_file); } else { auto& files = inputs[picking_sr.level - start_level].files; - for (auto* f : vstorage->LevelFiles(picking_sr.level)) { + for (auto* f : vstorage_->LevelFiles(picking_sr.level)) { files.push_back(f); } } char file_num_buf[256]; picking_sr.DumpSizeInfo(file_num_buf, sizeof(file_num_buf), i); - ROCKS_LOG_BUFFER(log_buffer, "[%s] Universal: Picking %s", cf_name.c_str(), - file_num_buf); + ROCKS_LOG_BUFFER(log_buffer_, "[%s] Universal: Picking %s", + cf_name_.c_str(), file_num_buf); } CompactionReason compaction_reason; @@ -626,16 +716,17 @@ Compaction* UniversalCompactionPicker::PickCompactionToReduceSortedRuns( compaction_reason = CompactionReason::kUniversalSortedRunNum; } return new Compaction( - vstorage, ioptions_, mutable_cf_options, std::move(inputs), output_level, - MaxFileSizeForLevel(mutable_cf_options, output_level, + vstorage_, ioptions_, mutable_cf_options_, std::move(inputs), + output_level, + MaxFileSizeForLevel(mutable_cf_options_, output_level, kCompactionStyleUniversal), LLONG_MAX, path_id, - GetCompressionType(ioptions_, vstorage, mutable_cf_options, start_level, + GetCompressionType(ioptions_, vstorage_, mutable_cf_options_, start_level, 1, enable_compression), - GetCompressionOptions(ioptions_, vstorage, start_level, + GetCompressionOptions(ioptions_, vstorage_, start_level, enable_compression), /* max_subcompactions */ 0, /* grandparents */ {}, /* is manual */ false, - score, false /* deletion_compaction */, compaction_reason); + score_, false /* deletion_compaction */, compaction_reason); } // Look at overall size amplification. If size amplification @@ -644,12 +735,9 @@ Compaction* UniversalCompactionPicker::PickCompactionToReduceSortedRuns( // base file (overrides configured values of file-size ratios, // min_merge_width and max_merge_width). // -Compaction* UniversalCompactionPicker::PickCompactionToReduceSizeAmp( - const std::string& cf_name, const MutableCFOptions& mutable_cf_options, - VersionStorageInfo* vstorage, double score, - const std::vector& sorted_runs, LogBuffer* log_buffer) { +Compaction* UniversalCompactionBuilder::PickCompactionToReduceSizeAmp() { // percentage flexibility while reducing size amplification - uint64_t ratio = mutable_cf_options.compaction_options_universal + uint64_t ratio = mutable_cf_options_.compaction_options_universal .max_size_amplification_percent; unsigned int candidate_count = 0; @@ -657,21 +745,23 @@ Compaction* UniversalCompactionPicker::PickCompactionToReduceSizeAmp( size_t start_index = 0; const SortedRun* sr = nullptr; - if (sorted_runs.back().being_compacted) { + assert(!sorted_runs_.empty()); + if (sorted_runs_.back().being_compacted) { return nullptr; } // Skip files that are already being compacted - for (size_t loop = 0; loop < sorted_runs.size() - 1; loop++) { - sr = &sorted_runs[loop]; + for (size_t loop = 0; loop < sorted_runs_.size() - 1; loop++) { + sr = &sorted_runs_[loop]; if (!sr->being_compacted) { start_index = loop; // Consider this as the first candidate. break; } char file_num_buf[kFormatFileNumberBufSize]; sr->Dump(file_num_buf, sizeof(file_num_buf), true); - ROCKS_LOG_BUFFER(log_buffer, "[%s] Universal: skipping %s[%d] compacted %s", - cf_name.c_str(), file_num_buf, loop, + ROCKS_LOG_BUFFER(log_buffer_, + "[%s] Universal: skipping %s[%d] compacted %s", + cf_name_.c_str(), file_num_buf, loop, " cannot be a candidate to reduce size amp.\n"); sr = nullptr; } @@ -683,20 +773,20 @@ Compaction* UniversalCompactionPicker::PickCompactionToReduceSizeAmp( char file_num_buf[kFormatFileNumberBufSize]; sr->Dump(file_num_buf, sizeof(file_num_buf), true); ROCKS_LOG_BUFFER( - log_buffer, + log_buffer_, "[%s] Universal: First candidate %s[%" ROCKSDB_PRIszt "] %s", - cf_name.c_str(), file_num_buf, start_index, " to reduce size amp.\n"); + cf_name_.c_str(), file_num_buf, start_index, " to reduce size amp.\n"); } // keep adding up all the remaining files - for (size_t loop = start_index; loop < sorted_runs.size() - 1; loop++) { - sr = &sorted_runs[loop]; + for (size_t loop = start_index; loop < sorted_runs_.size() - 1; loop++) { + sr = &sorted_runs_[loop]; if (sr->being_compacted) { char file_num_buf[kFormatFileNumberBufSize]; sr->Dump(file_num_buf, sizeof(file_num_buf), true); ROCKS_LOG_BUFFER( - log_buffer, "[%s] Universal: Possible candidate %s[%d] %s", - cf_name.c_str(), file_num_buf, start_index, + log_buffer_, "[%s] Universal: Possible candidate %s[%d] %s", + cf_name_.c_str(), file_num_buf, start_index, " is already being compacted. No size amp reduction possible.\n"); return nullptr; } @@ -708,58 +798,58 @@ Compaction* UniversalCompactionPicker::PickCompactionToReduceSizeAmp( } // size of earliest file - uint64_t earliest_file_size = sorted_runs.back().size; + uint64_t earliest_file_size = sorted_runs_.back().size; // size amplification = percentage of additional size if (candidate_size * 100 < ratio * earliest_file_size) { ROCKS_LOG_BUFFER( - log_buffer, + log_buffer_, "[%s] Universal: size amp not needed. newer-files-total-size %" PRIu64 " earliest-file-size %" PRIu64, - cf_name.c_str(), candidate_size, earliest_file_size); + cf_name_.c_str(), candidate_size, earliest_file_size); return nullptr; } else { ROCKS_LOG_BUFFER( - log_buffer, + log_buffer_, "[%s] Universal: size amp needed. newer-files-total-size %" PRIu64 " earliest-file-size %" PRIu64, - cf_name.c_str(), candidate_size, earliest_file_size); + cf_name_.c_str(), candidate_size, earliest_file_size); } - assert(start_index < sorted_runs.size() - 1); + assert(start_index < sorted_runs_.size() - 1); // Estimate total file size uint64_t estimated_total_size = 0; - for (size_t loop = start_index; loop < sorted_runs.size(); loop++) { - estimated_total_size += sorted_runs[loop].size; + for (size_t loop = start_index; loop < sorted_runs_.size(); loop++) { + estimated_total_size += sorted_runs_[loop].size; } uint32_t path_id = - GetPathId(ioptions_, mutable_cf_options, estimated_total_size); - int start_level = sorted_runs[start_index].level; + GetPathId(ioptions_, mutable_cf_options_, estimated_total_size); + int start_level = sorted_runs_[start_index].level; - std::vector inputs(vstorage->num_levels()); + std::vector inputs(vstorage_->num_levels()); for (size_t i = 0; i < inputs.size(); ++i) { inputs[i].level = start_level + static_cast(i); } // We always compact all the files, so always compress. - for (size_t loop = start_index; loop < sorted_runs.size(); loop++) { - auto& picking_sr = sorted_runs[loop]; + for (size_t loop = start_index; loop < sorted_runs_.size(); loop++) { + auto& picking_sr = sorted_runs_[loop]; if (picking_sr.level == 0) { FileMetaData* f = picking_sr.file; inputs[0].files.push_back(f); } else { auto& files = inputs[picking_sr.level - start_level].files; - for (auto* f : vstorage->LevelFiles(picking_sr.level)) { + for (auto* f : vstorage_->LevelFiles(picking_sr.level)) { files.push_back(f); } } char file_num_buf[256]; picking_sr.DumpSizeInfo(file_num_buf, sizeof(file_num_buf), loop); - ROCKS_LOG_BUFFER(log_buffer, "[%s] Universal: size amp picking %s", - cf_name.c_str(), file_num_buf); + ROCKS_LOG_BUFFER(log_buffer_, "[%s] Universal: size amp picking %s", + cf_name_.c_str(), file_num_buf); } // output files at the bottom most level, unless it's reserved - int output_level = vstorage->num_levels() - 1; + int output_level = vstorage_->num_levels() - 1; // last level is reserved for the files ingested behind if (ioptions_.allow_ingest_behind) { assert(output_level > 1); @@ -767,29 +857,27 @@ Compaction* UniversalCompactionPicker::PickCompactionToReduceSizeAmp( } return new Compaction( - vstorage, ioptions_, mutable_cf_options, std::move(inputs), output_level, - MaxFileSizeForLevel(mutable_cf_options, output_level, + vstorage_, ioptions_, mutable_cf_options_, std::move(inputs), + output_level, + MaxFileSizeForLevel(mutable_cf_options_, output_level, kCompactionStyleUniversal), /* max_grandparent_overlap_bytes */ LLONG_MAX, path_id, - GetCompressionType(ioptions_, vstorage, mutable_cf_options, output_level, - 1), - GetCompressionOptions(ioptions_, vstorage, output_level), + GetCompressionType(ioptions_, vstorage_, mutable_cf_options_, + output_level, 1), + GetCompressionOptions(ioptions_, vstorage_, output_level), /* max_subcompactions */ 0, /* grandparents */ {}, /* is manual */ false, - score, false /* deletion_compaction */, + score_, false /* deletion_compaction */, CompactionReason::kUniversalSizeAmplification); } // Pick files marked for compaction. Typically, files are marked by // CompactOnDeleteCollector due to the presence of tombstones. -Compaction* UniversalCompactionPicker::PickDeleteTriggeredCompaction( - const std::string& cf_name, const MutableCFOptions& mutable_cf_options, - VersionStorageInfo* vstorage, double score, - const std::vector& /*sorted_runs*/, LogBuffer* /*log_buffer*/) { +Compaction* UniversalCompactionBuilder::PickDeleteTriggeredCompaction() { CompactionInputFiles start_level_inputs; int output_level; std::vector inputs; - if (vstorage->num_levels() == 1) { + if (vstorage_->num_levels() == 1) { // This is single level universal. Since we're basically trying to reclaim // space by processing files marked for compaction due to high tombstone // density, let's do the same thing as compaction to reduce size amp which @@ -799,7 +887,7 @@ Compaction* UniversalCompactionPicker::PickDeleteTriggeredCompaction( start_level_inputs.level = 0; start_level_inputs.files.clear(); output_level = 0; - for (FileMetaData* f : vstorage->LevelFiles(0)) { + for (FileMetaData* f : vstorage_->LevelFiles(0)) { if (f->marked_for_compaction) { compact = true; } @@ -818,24 +906,24 @@ Compaction* UniversalCompactionPicker::PickDeleteTriggeredCompaction( // For multi-level universal, the strategy is to make this look more like // leveled. We pick one of the files marked for compaction and compact with // overlapping files in the adjacent level. - PickFilesMarkedForCompaction(cf_name, vstorage, &start_level, &output_level, - &start_level_inputs); + picker_->PickFilesMarkedForCompaction(cf_name_, vstorage_, &start_level, + &output_level, &start_level_inputs); if (start_level_inputs.empty()) { return nullptr; } // Pick the first non-empty level after the start_level - for (output_level = start_level + 1; output_level < vstorage->num_levels(); + for (output_level = start_level + 1; output_level < vstorage_->num_levels(); output_level++) { - if (vstorage->NumLevelFiles(output_level) != 0) { + if (vstorage_->NumLevelFiles(output_level) != 0) { break; } } // If all higher levels are empty, pick the highest level as output level - if (output_level == vstorage->num_levels()) { + if (output_level == vstorage_->num_levels()) { if (start_level == 0) { - output_level = vstorage->num_levels() - 1; + output_level = vstorage_->num_levels() - 1; } else { // If start level is non-zero and all higher levels are empty, this // compaction will translate into a trivial move. Since the idea is @@ -845,15 +933,15 @@ Compaction* UniversalCompactionPicker::PickDeleteTriggeredCompaction( } } if (ioptions_.allow_ingest_behind && - output_level == vstorage->num_levels() - 1) { + output_level == vstorage_->num_levels() - 1) { assert(output_level > 1); output_level--; } if (output_level != 0) { if (start_level == 0) { - if (!GetOverlappingL0Files(vstorage, &start_level_inputs, output_level, - nullptr)) { + if (!picker_->GetOverlappingL0Files(vstorage_, &start_level_inputs, + output_level, nullptr)) { return nullptr; } } @@ -862,16 +950,16 @@ Compaction* UniversalCompactionPicker::PickDeleteTriggeredCompaction( int parent_index = -1; output_level_inputs.level = output_level; - if (!SetupOtherInputs(cf_name, mutable_cf_options, vstorage, - &start_level_inputs, &output_level_inputs, - &parent_index, -1)) { + if (!picker_->SetupOtherInputs(cf_name_, mutable_cf_options_, vstorage_, + &start_level_inputs, &output_level_inputs, + &parent_index, -1)) { return nullptr; } inputs.push_back(start_level_inputs); if (!output_level_inputs.empty()) { inputs.push_back(output_level_inputs); } - if (FilesRangeOverlapWithCompaction(inputs, output_level)) { + if (picker_->FilesRangeOverlapWithCompaction(inputs, output_level)) { return nullptr; } } else { @@ -881,21 +969,22 @@ Compaction* UniversalCompactionPicker::PickDeleteTriggeredCompaction( uint64_t estimated_total_size = 0; // Use size of the output level as estimated file size - for (FileMetaData* f : vstorage->LevelFiles(output_level)) { + for (FileMetaData* f : vstorage_->LevelFiles(output_level)) { estimated_total_size += f->fd.GetFileSize(); } uint32_t path_id = - GetPathId(ioptions_, mutable_cf_options, estimated_total_size); + GetPathId(ioptions_, mutable_cf_options_, estimated_total_size); return new Compaction( - vstorage, ioptions_, mutable_cf_options, std::move(inputs), output_level, - MaxFileSizeForLevel(mutable_cf_options, output_level, + vstorage_, ioptions_, mutable_cf_options_, std::move(inputs), + output_level, + MaxFileSizeForLevel(mutable_cf_options_, output_level, kCompactionStyleUniversal), /* max_grandparent_overlap_bytes */ LLONG_MAX, path_id, - GetCompressionType(ioptions_, vstorage, mutable_cf_options, output_level, - 1), - GetCompressionOptions(ioptions_, vstorage, output_level), + GetCompressionType(ioptions_, vstorage_, mutable_cf_options_, + output_level, 1), + GetCompressionOptions(ioptions_, vstorage_, output_level), /* max_subcompactions */ 0, /* grandparents */ {}, /* is manual */ true, - score, false /* deletion_compaction */, + score_, false /* deletion_compaction */, CompactionReason::kFilesMarkedForCompaction); } } // namespace rocksdb diff --git a/db/compaction/compaction_picker_universal.h b/db/compaction/compaction_picker_universal.h index 2c44735d9..28f3a63cd 100644 --- a/db/compaction/compaction_picker_universal.h +++ b/db/compaction/compaction_picker_universal.h @@ -27,72 +27,6 @@ class UniversalCompactionPicker : public CompactionPicker { virtual bool NeedsCompaction( const VersionStorageInfo* vstorage) const override; - - private: - struct SortedRun { - SortedRun(int _level, FileMetaData* _file, uint64_t _size, - uint64_t _compensated_file_size, bool _being_compacted) - : level(_level), - file(_file), - size(_size), - compensated_file_size(_compensated_file_size), - being_compacted(_being_compacted) { - assert(compensated_file_size > 0); - assert(level != 0 || file != nullptr); - } - - void Dump(char* out_buf, size_t out_buf_size, - bool print_path = false) const; - - // sorted_run_count is added into the string to print - void DumpSizeInfo(char* out_buf, size_t out_buf_size, - size_t sorted_run_count) const; - - int level; - // `file` Will be null for level > 0. For level = 0, the sorted run is - // for this file. - FileMetaData* file; - // For level > 0, `size` and `compensated_file_size` are sum of sizes all - // files in the level. `being_compacted` should be the same for all files - // in a non-zero level. Use the value here. - uint64_t size; - uint64_t compensated_file_size; - bool being_compacted; - }; - - // Pick Universal compaction to limit read amplification - Compaction* PickCompactionToReduceSortedRuns( - const std::string& cf_name, const MutableCFOptions& mutable_cf_options, - VersionStorageInfo* vstorage, double score, unsigned int ratio, - unsigned int num_files, const std::vector& sorted_runs, - LogBuffer* log_buffer); - - // Pick Universal compaction to limit space amplification. - Compaction* PickCompactionToReduceSizeAmp( - const std::string& cf_name, const MutableCFOptions& mutable_cf_options, - VersionStorageInfo* vstorage, double score, - const std::vector& sorted_runs, LogBuffer* log_buffer); - - Compaction* PickDeleteTriggeredCompaction( - const std::string& cf_name, const MutableCFOptions& mutable_cf_options, - VersionStorageInfo* vstorage, double score, - const std::vector& sorted_runs, LogBuffer* log_buffer); - - // Used in universal compaction when the enabled_trivial_move - // option is set. Checks whether there are any overlapping files - // in the input. Returns true if the input files are non - // overlapping. - bool IsInputFilesNonOverlapping(Compaction* c); - - static std::vector CalculateSortedRuns( - const VersionStorageInfo& vstorage, const ImmutableCFOptions& ioptions, - const MutableCFOptions& mutable_cf_options); - - // Pick a path ID to place a newly generated file, with its estimated file - // size. - static uint32_t GetPathId(const ImmutableCFOptions& ioptions, - const MutableCFOptions& mutable_cf_options, - uint64_t file_size); }; } // namespace rocksdb #endif // !ROCKSDB_LITE diff --git a/db/db_universal_compaction_test.cc b/db/db_universal_compaction_test.cc index 524892e9e..4f3ddef10 100644 --- a/db/db_universal_compaction_test.cc +++ b/db/db_universal_compaction_test.cc @@ -397,7 +397,7 @@ TEST_P(DBTestUniversalCompaction, DynamicUniversalCompactionSizeAmplification) { int total_picked_compactions = 0; int total_size_amp_compactions = 0; rocksdb::SyncPoint::GetInstance()->SetCallBack( - "UniversalCompactionPicker::PickCompaction:Return", [&](void* arg) { + "UniversalCompactionBuilder::PickCompaction:Return", [&](void* arg) { if (arg) { total_picked_compactions++; Compaction* c = static_cast(arg); @@ -478,7 +478,7 @@ TEST_P(DBTestUniversalCompaction, DynamicUniversalCompactionReadAmplification) { int total_picked_compactions = 0; int total_size_ratio_compactions = 0; rocksdb::SyncPoint::GetInstance()->SetCallBack( - "UniversalCompactionPicker::PickCompaction:Return", [&](void* arg) { + "UniversalCompactionBuilder::PickCompaction:Return", [&](void* arg) { if (arg) { total_picked_compactions++; Compaction* c = static_cast(arg); @@ -837,14 +837,14 @@ TEST_P(DBTestUniversalCompactionParallel, PickByFileNumberBug) { rocksdb::SyncPoint::GetInstance()->LoadDependency( {{"DBTestUniversalCompactionParallel::PickByFileNumberBug:0", "BackgroundCallCompaction:0"}, - {"UniversalCompactionPicker::PickCompaction:Return", + {"UniversalCompactionBuilder::PickCompaction:Return", "DBTestUniversalCompactionParallel::PickByFileNumberBug:1"}, {"DBTestUniversalCompactionParallel::PickByFileNumberBug:2", "CompactionJob::Run():Start"}}); int total_picked_compactions = 0; rocksdb::SyncPoint::GetInstance()->SetCallBack( - "UniversalCompactionPicker::PickCompaction:Return", [&](void* arg) { + "UniversalCompactionBuilder::PickCompaction:Return", [&](void* arg) { if (arg) { total_picked_compactions++; }