From c66b4429ff4fb9d0791f515655b79c7551e9a1e2 Mon Sep 17 00:00:00 2001
From: sdong
Date: Wed, 20 Oct 2021 10:03:03 -0700
Subject: [PATCH] Incremental Space Amp Compactions in Universal Style (#8655)

Summary:
This commit introduces incremental compaction in universal style for space
amplification. This follows the first improvement mentioned in
https://rocksdb.org/blog/2021/04/12/universal-improvements.html .
The implementation simply picks files adding up to about
max_compaction_bytes to compact, and executes the compaction if the penalty
is not too big. More optimizations can be done in the future, e.g.
prioritizing between this compaction and other types. But for now, the
feature is supposed to be functional and can often reduce the frequency of
full compactions, although it can introduce a penalty.

In order to cut files more efficiently so that more files from upper levels
can be included, the SST file cutting threshold (for the current file +
overlapping parent level files) is set to 1.5X of the target file size. A
2MB target file size will generate files like this:
https://gist.github.com/siying/29d2676fba417404f3c95e6c013c7de8
The number of files indeed increases, but it is not out of control.

Two sets of write benchmarks were run:
1. For the rate-limited ingestion scenario, full compaction is mostly
eliminated: https://gist.github.com/siying/959bc1186066906831cf4c808d6e0a19 .
The write amp increased from 7.7 to 9.4, as expected. After applying file
cutting, the number improved to 8.9. In another benchmark, the write amp is
even better with the incremental approach:
https://gist.github.com/siying/d1c16c286d7c59c4d7bba718ca198163
2. For the unlimited ingestion rate scenario, incremental compaction turns
out to be too expensive most of the time and is not executed, as expected.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8655

Test Plan: Add unit tests for the functionality.

Reviewed By: ajkr

Differential Revision: D31787034

fbshipit-source-id: ce813e63b15a61d5a56e97bf8902a1b28e011beb
---
 HISTORY.md                                   |   1 +
 db/compaction/compaction_job.cc              |   6 +-
 db/compaction/compaction_picker_test.cc      | 215 ++++++++++++++
 db/compaction/compaction_picker_universal.cc | 287 ++++++++++++++++++-
 db/db_compaction_test.cc                     |   5 +-
 db/db_range_del_test.cc                      |  31 +-
 include/rocksdb/universal_compaction.h       |  10 +-
 options/cf_options.cc                        |   6 +
 tools/db_bench_tool.cc                       |   5 +
 9 files changed, 550 insertions(+), 16 deletions(-)

diff --git a/HISTORY.md b/HISTORY.md
index eba52ad3e..596639036 100644
--- a/HISTORY.md
+++ b/HISTORY.md
@@ -27,6 +27,7 @@
 * Add `file_temperature` to `IngestExternalFileArg` such that when ingesting SST files, we are able to indicate the temperature of the this batch of files.
 * If `DB::Close()` failed with a non aborted status, calling `DB::Close()` again will return the original status instead of Status::OK.
 * Add CacheTier to advanced_options.h to describe the cache tier we used. Add a `lowest_used_cache_tier` option to `DBOptions` (immutable) and pass it to BlockBasedTableReader. By default it is `CacheTier::kNonVolatileBlockTier`, which means, we always use both block cache (kVolatileTier) and secondary cache (kNonVolatileBlockTier). By set it to `CacheTier::kVolatileTier`, the DB will not use the secondary cache.
+* Even when options.max_compaction_bytes is hit, compaction output files are only cut when they align with grandparent files' boundaries. options.max_compaction_bytes could be slightly violated with the change, but the violation is no more than one target SST file size, which is usually much smaller.

 ### Performance Improvements
 * Improved CPU efficiency of building block-based table (SST) files (#9039 and #9040).
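As a usage sketch (not part of the patch itself; the option names below are the ones this change adds or relies on, and the numeric values are purely illustrative, not recommendations):

```cpp
#include "rocksdb/options.h"

// Minimal sketch: enable incremental space-amp compactions in
// universal style. Values are illustrative only.
rocksdb::Options MakeIncrementalUniversalOptions() {
  rocksdb::Options options;
  options.compaction_style = rocksdb::kCompactionStyleUniversal;
  // Trigger a space-amp compaction once estimated size amplification
  // exceeds this percentage.
  options.compaction_options_universal.max_size_amplification_percent = 30;
  // EXPERIMENTAL (added by this patch): pick incremental compactions
  // instead of always compacting everything into the bottommost run.
  options.compaction_options_universal.incremental = true;
  // Each incremental pick targets files totaling about this many bytes.
  options.max_compaction_bytes = 500ull * 1024 * 1024;
  return options;
}
```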
diff --git a/db/compaction/compaction_job.cc b/db/compaction/compaction_job.cc
index d36cf8ab5..5c6fb5d75 100644
--- a/db/compaction/compaction_job.cc
+++ b/db/compaction/compaction_job.cc
@@ -219,6 +219,7 @@ struct CompactionJob::SubcompactionState {
         &compaction->column_family_data()->internal_comparator();
     const std::vector<FileMetaData*>& grandparents = compaction->grandparents();
+    bool grandparent_file_switched = false;
     // Scan to find earliest grandparent file that contains key.
     while (grandparent_index < grandparents.size() &&
            icmp->Compare(internal_key,
@@ -226,6 +227,7 @@
                0) {
       if (seen_key) {
         overlapped_bytes += grandparents[grandparent_index]->fd.GetFileSize();
+        grandparent_file_switched = true;
       }
       assert(grandparent_index + 1 >= grandparents.size() ||
              icmp->Compare(
@@ -235,8 +237,8 @@
     }
     seen_key = true;

-    if (overlapped_bytes + curr_file_size >
-        compaction->max_compaction_bytes()) {
+    if (grandparent_file_switched &&
+        overlapped_bytes + curr_file_size > compaction->max_compaction_bytes()) {
       // Too much overlap for current output; start new output
       overlapped_bytes = 0;
       return true;
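In prose: the cutting rule above now only fires after the key range has actually crossed into a new grandparent file, so cuts always align with grandparent boundaries (matching the HISTORY.md entry). A simplified, self-contained restatement (a hypothetical helper, not the RocksDB function itself):

```cpp
#include <cstdint>

// Hypothetical restatement of the new rule: even if accumulated
// grandparent overlap exceeds max_compaction_bytes, only cut the
// output once at least one grandparent file boundary was crossed.
bool ShouldCutOutput(bool grandparent_file_switched, uint64_t overlapped_bytes,
                     uint64_t curr_file_size, uint64_t max_compaction_bytes) {
  return grandparent_file_switched &&
         overlapped_bytes + curr_file_size > max_compaction_bytes;
}
```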
diff --git a/db/compaction/compaction_picker_test.cc b/db/compaction/compaction_picker_test.cc
index bce3a2076..949a7eb41 100644
--- a/db/compaction/compaction_picker_test.cc
+++ b/db/compaction/compaction_picker_test.cc
@@ -733,6 +733,221 @@ TEST_F(CompactionPickerTest, UniversalPeriodicCompaction6) {
   ASSERT_EQ(4, compaction->output_level());
 }

+TEST_F(CompactionPickerTest, UniversalIncrementalSpace1) {
+  const uint64_t kFileSize = 100000;
+
+  mutable_cf_options_.max_compaction_bytes = 555555;
+  mutable_cf_options_.compaction_options_universal.incremental = true;
+  mutable_cf_options_.compaction_options_universal
+      .max_size_amplification_percent = 30;
+  UniversalCompactionPicker universal_compaction_picker(ioptions_, &icmp_);
+
+  NewVersionStorage(5, kCompactionStyleUniversal);
+
+  Add(0, 1U, "150", "200", kFileSize, 0, 500, 550);
+  Add(2, 2U, "010", "080", kFileSize, 0, 200, 251);
+  Add(3, 5U, "310", "380", kFileSize, 0, 200, 251);
+  Add(3, 6U, "410", "880", kFileSize, 0, 200, 251);
+  Add(3, 7U, "910", "980", 1, 0, 200, 251);
+  Add(4, 10U, "201", "250", kFileSize, 0, 101, 150);
+  Add(4, 11U, "301", "350", kFileSize, 0, 101, 150);
+  Add(4, 12U, "401", "450", kFileSize, 0, 101, 150);
+  Add(4, 13U, "501", "750", kFileSize, 0, 101, 150);
+  Add(4, 14U, "801", "850", kFileSize, 0, 101, 150);
+  Add(4, 15U, "901", "950", kFileSize, 0, 101, 150);
+  // Add(4, 15U, "960", "970", kFileSize, 0, 101, 150);
+
+  UpdateVersionStorageInfo();
+
+  std::unique_ptr<Compaction> compaction(
+      universal_compaction_picker.PickCompaction(
+          cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
+          &log_buffer_));
+  ASSERT_TRUE(compaction);
+  ASSERT_EQ(4, compaction->output_level());
+  ASSERT_EQ(3, compaction->start_level());
+  ASSERT_EQ(2U, compaction->num_input_files(0));
+  ASSERT_EQ(5U, compaction->input(0, 0)->fd.GetNumber());
+  ASSERT_EQ(6U, compaction->input(0, 1)->fd.GetNumber());
+  // ASSERT_EQ(4U, compaction->num_input_files(1));
+  ASSERT_EQ(11U, compaction->input(1, 0)->fd.GetNumber());
+  ASSERT_EQ(12U, compaction->input(1, 1)->fd.GetNumber());
+  ASSERT_EQ(13U, compaction->input(1, 2)->fd.GetNumber());
+  ASSERT_EQ(14U, compaction->input(1, 3)->fd.GetNumber());
+}
+
+TEST_F(CompactionPickerTest, UniversalIncrementalSpace2) {
+  const uint64_t kFileSize = 100000;
+
+  mutable_cf_options_.max_compaction_bytes = 400000;
+  mutable_cf_options_.compaction_options_universal.incremental = true;
+  mutable_cf_options_.compaction_options_universal
+      .max_size_amplification_percent = 30;
+  UniversalCompactionPicker universal_compaction_picker(ioptions_, &icmp_);
+
+  NewVersionStorage(5, kCompactionStyleUniversal);
+
+  Add(0, 1U, "150", "200", kFileSize, 0, 500, 550);
+  Add(1, 2U, "010", "080", kFileSize, 0, 200, 251);
+  Add(2, 5U, "310", "380", kFileSize, 0, 200, 251);
+  Add(2, 6U, "410", "880", kFileSize, 0, 200, 251);
+  Add(2, 7U, "910", "980", kFileSize, 0, 200, 251);
+  Add(4, 10U, "201", "250", kFileSize, 0, 101, 150);
+  Add(4, 11U, "301", "350", kFileSize, 0, 101, 150);
+  Add(4, 12U, "401", "450", kFileSize, 0, 101, 150);
+  Add(4, 13U, "501", "750", kFileSize, 0, 101, 150);
+  Add(4, 14U, "801", "850", kFileSize, 0, 101, 150);
+  Add(4, 15U, "901", "950", kFileSize, 0, 101, 150);
+
+  UpdateVersionStorageInfo();
+
+  std::unique_ptr<Compaction> compaction(
+      universal_compaction_picker.PickCompaction(
+          cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
+          &log_buffer_));
+  ASSERT_TRUE(compaction);
+  ASSERT_EQ(4, compaction->output_level());
+  ASSERT_EQ(2, compaction->start_level());
+  ASSERT_EQ(1U, compaction->num_input_files(0));
+  ASSERT_EQ(7U, compaction->input(0, 0)->fd.GetNumber());
+  ASSERT_EQ(1U, compaction->num_input_files(1));
+  ASSERT_EQ(15U, compaction->input(1, 0)->fd.GetNumber());
+}
+
+TEST_F(CompactionPickerTest, UniversalIncrementalSpace3) {
+  // Test bottom level files falling into gaps between two upper level
+  // files.
+  const uint64_t kFileSize = 100000;
+
+  mutable_cf_options_.max_compaction_bytes = 300000;
+  mutable_cf_options_.compaction_options_universal.incremental = true;
+  mutable_cf_options_.compaction_options_universal
+      .max_size_amplification_percent = 30;
+  UniversalCompactionPicker universal_compaction_picker(ioptions_, &icmp_);
+
+  NewVersionStorage(5, kCompactionStyleUniversal);
+
+  Add(0, 1U, "150", "200", kFileSize, 0, 500, 550);
+  Add(2, 2U, "010", "080", kFileSize, 0, 200, 251);
+  Add(3, 5U, "000", "180", kFileSize, 0, 200, 251);
+  Add(3, 6U, "181", "190", kFileSize, 0, 200, 251);
+  Add(3, 7U, "710", "810", kFileSize, 0, 200, 251);
+  Add(3, 8U, "820", "830", kFileSize, 0, 200, 251);
+  Add(3, 9U, "900", "991", kFileSize, 0, 200, 251);
+  Add(4, 10U, "201", "250", kFileSize, 0, 101, 150);
+  Add(4, 11U, "301", "350", kFileSize, 0, 101, 150);
+  Add(4, 12U, "401", "450", kFileSize, 0, 101, 150);
+  Add(4, 13U, "501", "750", kFileSize, 0, 101, 150);
+  Add(4, 14U, "801", "850", kFileSize, 0, 101, 150);
+  Add(4, 15U, "901", "950", kFileSize, 0, 101, 150);
+
+  UpdateVersionStorageInfo();
+
+  std::unique_ptr<Compaction> compaction(
+      universal_compaction_picker.PickCompaction(
+          cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
+          &log_buffer_));
+  ASSERT_TRUE(compaction);
+  ASSERT_EQ(4, compaction->output_level());
+  ASSERT_EQ(2, compaction->start_level());
+  ASSERT_EQ(1U, compaction->num_input_files(0));
+  ASSERT_EQ(2U, compaction->input(0, 0)->fd.GetNumber());
+  ASSERT_EQ(2U, compaction->num_input_files(1));
+  ASSERT_EQ(5U, compaction->input(1, 0)->fd.GetNumber());
+  ASSERT_EQ(6U, compaction->input(1, 1)->fd.GetNumber());
+  ASSERT_EQ(0, compaction->num_input_files(2));
+}
+
+TEST_F(CompactionPickerTest, UniversalIncrementalSpace4) {
+  // Test that compaction candidates cover many files.
+  const uint64_t kFileSize = 100000;
+
+  mutable_cf_options_.max_compaction_bytes = 3200000;
+  mutable_cf_options_.compaction_options_universal.incremental = true;
+  mutable_cf_options_.compaction_options_universal
+      .max_size_amplification_percent = 30;
+  UniversalCompactionPicker universal_compaction_picker(ioptions_, &icmp_);
+
+  NewVersionStorage(5, kCompactionStyleUniversal);
+
+  Add(0, 1U, "150", "200", kFileSize, 0, 500, 550);
+  Add(2, 2U, "010", "080", kFileSize, 0, 200, 251);
+
+  // Generate files like the following:
+  // L3: (1101, 1180) (1201, 1280) ... (7901, 7908)
+  // L4: (1130, 1150) (1160, 1210) (1230, 1250) (1260, 1310) ... (7960, 8010)
+  for (int i = 11; i < 79; i++) {
+    Add(3, 100 + i * 3, ToString(i * 100).c_str(),
+        ToString(i * 100 + 80).c_str(), kFileSize, 0, 200, 251);
+    // Add a tie breaker
+    if (i == 66) {
+      Add(3, 10000U, "6690", "6699", kFileSize, 0, 200, 251);
+    }
+
+    Add(4, 100 + i * 3 + 1, ToString(i * 100 + 30).c_str(),
+        ToString(i * 100 + 50).c_str(), kFileSize, 0, 200, 251);
+    Add(4, 100 + i * 3 + 2, ToString(i * 100 + 60).c_str(),
+        ToString(i * 100 + 110).c_str(), kFileSize, 0, 200, 251);
+  }
+  UpdateVersionStorageInfo();
+
+  std::unique_ptr<Compaction> compaction(
+      universal_compaction_picker.PickCompaction(
+          cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
+          &log_buffer_));
+  ASSERT_TRUE(compaction);
+  ASSERT_EQ(4, compaction->output_level());
+  ASSERT_EQ(3, compaction->start_level());
+  ASSERT_EQ(6U, compaction->num_input_files(0));
+  ASSERT_EQ(100 + 62U * 3, compaction->input(0, 0)->fd.GetNumber());
+  ASSERT_EQ(10000U, compaction->input(0, 5)->fd.GetNumber());
+  ASSERT_EQ(11, compaction->num_input_files(1));
+}
+
+TEST_F(CompactionPickerTest, UniversalIncrementalSpace5) {
+  // Test that compaction candidates cover many files, with some single
+  // files larger than the size threshold.
+  const uint64_t kFileSize = 100000;
+
+  mutable_cf_options_.max_compaction_bytes = 3200000;
+  mutable_cf_options_.compaction_options_universal.incremental = true;
+  mutable_cf_options_.compaction_options_universal
+      .max_size_amplification_percent = 30;
+  UniversalCompactionPicker universal_compaction_picker(ioptions_, &icmp_);
+
+  NewVersionStorage(5, kCompactionStyleUniversal);
+
+  Add(0, 1U, "150", "200", kFileSize, 0, 500, 550);
+  Add(2, 2U, "010", "080", kFileSize, 0, 200, 251);
+
+  // Generate files like the following:
+  // L3: (1101, 1180) (1201, 1280) ... (7901, 7908)
+  // L4: (1130, 1150) (1160, 1210) (1230, 1250) (1260, 1310) ... (7960, 8010)
+  for (int i = 11; i < 70; i++) {
+    Add(3, 100 + i * 3, ToString(i * 100).c_str(),
+        ToString(i * 100 + 80).c_str(),
+        i % 10 == 9 ?
+            kFileSize * 100 : kFileSize, 0, 200, 251);
+
+    Add(4, 100 + i * 3 + 1, ToString(i * 100 + 30).c_str(),
+        ToString(i * 100 + 50).c_str(), kFileSize, 0, 200, 251);
+    Add(4, 100 + i * 3 + 2, ToString(i * 100 + 60).c_str(),
+        ToString(i * 100 + 110).c_str(), kFileSize, 0, 200, 251);
+  }
+  UpdateVersionStorageInfo();
+
+  std::unique_ptr<Compaction> compaction(
+      universal_compaction_picker.PickCompaction(
+          cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
+          &log_buffer_));
+  ASSERT_TRUE(compaction);
+  ASSERT_EQ(4, compaction->output_level());
+  ASSERT_EQ(3, compaction->start_level());
+  ASSERT_EQ(6U, compaction->num_input_files(0));
+  ASSERT_EQ(100 + 14 * 3, compaction->input(0, 0)->fd.GetNumber());
+  ASSERT_EQ(100 + 19 * 3, compaction->input(0, 5)->fd.GetNumber());
+  ASSERT_EQ(13, compaction->num_input_files(1));
+}
+
 TEST_F(CompactionPickerTest, NeedsCompactionFIFO) {
   NewVersionStorage(1, kCompactionStyleFIFO);
   const int kFileCount =
diff --git a/db/compaction/compaction_picker_universal.cc b/db/compaction/compaction_picker_universal.cc
index fa3e0f639..cac6fcc1c 100644
--- a/db/compaction/compaction_picker_universal.cc
+++ b/db/compaction/compaction_picker_universal.cc
@@ -89,6 +89,14 @@ class UniversalCompactionBuilder {
   // Pick Universal compaction to limit space amplification.
   Compaction* PickCompactionToReduceSizeAmp();

+  // Try to pick an incremental compaction to reduce space amplification.
+  // It will return null if it cannot find a fanout within the threshold.
+  // Fanout is defined as
+  //    total size of files to compact at output level
+  //  --------------------------------------------------
+  //    total size of files to compact at other levels
+  Compaction* PickIncrementalForReduceSizeAmp(double fanout_threshold);
+
   Compaction* PickDeleteTriggeredCompaction();

   // Form a compaction from the sorted run indicated by start_index to the
@@ -110,6 +118,8 @@
   // overlapping.
   bool IsInputFilesNonOverlapping(Compaction* c);

+  uint64_t GetMaxOverlappingBytes() const;
+
   const ImmutableOptions& ioptions_;
   const InternalKeyComparator* icmp_;
   double score_;
@@ -714,6 +724,19 @@ Compaction* UniversalCompactionBuilder::PickCompactionToReduceSortedRuns(
                      cf_name_.c_str(), file_num_buf);
   }

+  std::vector<FileMetaData*> grandparents;
+  // Include grandparents for potential file cutting in incremental
+  // mode. It is for aligning file cutting boundaries across levels,
+  // so that subsequent compactions can pick files with aligned
+  // boundaries.
+  // Single files are only picked up in incremental mode, so there is
+  // no need to do this for the full range.
+  if (mutable_cf_options_.compaction_options_universal.incremental &&
+      first_index_after < sorted_runs_.size() &&
+      sorted_runs_[first_index_after].level > 1) {
+    grandparents = vstorage_->LevelFiles(sorted_runs_[first_index_after].level);
+  }
+
   CompactionReason compaction_reason;
   if (max_number_of_files_to_compact == UINT_MAX) {
     compaction_reason = CompactionReason::kUniversalSizeRatio;
@@ -725,14 +748,14 @@
       std::move(inputs), output_level,
       MaxFileSizeForLevel(mutable_cf_options_, output_level,
                           kCompactionStyleUniversal),
-      LLONG_MAX, path_id,
+      GetMaxOverlappingBytes(), path_id,
       GetCompressionType(ioptions_, vstorage_, mutable_cf_options_,
                          start_level, 1, enable_compression),
       GetCompressionOptions(mutable_cf_options_, vstorage_, start_level,
                             enable_compression),
       Temperature::kUnknown,
-      /* max_subcompactions */ 0, /* grandparents */ {}, /* is manual */ false,
-      score_, false /* deletion_compaction */, compaction_reason);
+      /* max_subcompactions */ 0, grandparents, /* is manual */ false, score_,
+      false /* deletion_compaction */, compaction_reason);
 }

 // Look at overall size amplification. If size amplification
@@ -788,6 +811,8 @@ Compaction* UniversalCompactionBuilder::PickCompactionToReduceSizeAmp() {
   for (size_t loop = start_index; loop + 1 < sorted_runs_.size(); loop++) {
     sr = &sorted_runs_[loop];
     if (sr->being_compacted) {
+      // TODO: now that incremental compaction is supported, we might want
+      // to schedule some incremental compactions in parallel if needed.
       char file_num_buf[kFormatFileNumberBufSize];
       sr->Dump(file_num_buf, sizeof(file_num_buf), true);
       ROCKS_LOG_BUFFER(
@@ -821,16 +846,250 @@
                      " earliest-file-size %" PRIu64,
                      cf_name_.c_str(), candidate_size, earliest_file_size);
   }
+  // Since incremental compaction can't include levels above the second
+  // last one, it can introduce a penalty compared to full compaction.
+  // We hard code the penalty allowance to be 80%: if the incremental
+  // pick would end up with a fanout more than 1.8x (80% higher than)
+  // that of a full level compaction, we fall back to full level
+  // compaction.
+  // The 80% threshold is arbitrary and can be adjusted or made
+  // configurable in the future.
+  // This also prevents the case where compaction falls behind and we
+  // would need to compact more levels for compactions to catch up.
+  if (mutable_cf_options_.compaction_options_universal.incremental) {
+    double fanout_threshold = static_cast<double>(earliest_file_size) /
+                              static_cast<double>(candidate_size) * 1.8;
+    Compaction* picked = PickIncrementalForReduceSizeAmp(fanout_threshold);
+    if (picked != nullptr) {
+      // As the feature is still experimental, picking an incremental
+      // compaction might fail, and we fall back to compacting the full
+      // level.
+      return picked;
+    }
+  }
   return PickCompactionToOldest(start_index,
                                 CompactionReason::kUniversalSizeAmplification);
 }
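A worked example of the threshold above (sizes invented for illustration): with a 400MB bottommost run and 100MB of candidate runs above it, a full compaction has a fanout of 400/100 = 4.0, so fanout_threshold = 4.0 * 1.8 = 7.2, and an incremental pick is accepted only if its bottom-bytes/non-bottom-bytes ratio stays below 7.2. As a sketch:

```cpp
#include <cstdint>

// Illustration only (invented numbers): the threshold computed above.
double FanoutThreshold(uint64_t earliest_file_size, uint64_t candidate_size) {
  // Fanout of a hypothetical full compaction, relaxed by the hard-coded
  // 80% penalty allowance.
  return static_cast<double>(earliest_file_size) /
         static_cast<double>(candidate_size) * 1.8;
}
// e.g. FanoutThreshold(400 << 20, 100 << 20) == 7.2
```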
+Compaction* UniversalCompactionBuilder::PickIncrementalForReduceSizeAmp(
+    double fanout_threshold) {
+  // Try to find all potential compactions with total size just over
+  // options.max_compaction_bytes / 2, and take the one with the lowest
+  // fanout (defined in the declaration of the function).
+  // This is done with a sliding window over the files at the second
+  // last level, which keeps expanding while collecting overlapping files
+  // in the last level. Once the total size exceeds the size threshold,
+  // calculate the fanout value, then shrink the window from its small
+  // side. Keep doing this until the end.
+  // Finally, we try to include upper level files if they fall into
+  // the range.
+  //
+  // Note that this is a similar problem to leveled compaction's
+  // kMinOverlappingRatio priority, but instead of picking single files
+  // we expand to a target compaction size. The reason is that in
+  // leveled compaction, the actual fanout tends to be high, e.g. 10, so
+  // even with a single file in the merging level, the extra size
+  // compacted in boundary files is at a lower ratio. But here users
+  // often size the second last level at 1/4, 1/3 or even 1/2 of the
+  // bottommost level, so picking a single file in the second last
+  // level would cause significant waste, which is not desirable.
+  //
+  // This algorithm has lots of room to improve to pick more efficient
+  // compactions.
+  assert(sorted_runs_.size() >= 2);
+  int second_last_level = sorted_runs_[sorted_runs_.size() - 2].level;
+  if (second_last_level == 0) {
+    // Can't split Level 0.
+    return nullptr;
+  }
+  int output_level = sorted_runs_.back().level;
+  const std::vector<FileMetaData*>& bottom_files =
+      vstorage_->LevelFiles(output_level);
+  const std::vector<FileMetaData*>& files =
+      vstorage_->LevelFiles(second_last_level);
+  assert(!bottom_files.empty());
+  assert(!files.empty());
+
+  int picked_start_idx = 0;
+  int picked_end_idx = 0;
+  double picked_fanout = fanout_threshold;
+
+  // Use half of the target compaction bytes as the anchor to stop growing
+  // second last level files, and reserve growing space for more
+  // overlapping bottom level files, clean-cut expansion, files from other
+  // levels, etc.
+  uint64_t comp_thres_size = mutable_cf_options_.max_compaction_bytes / 2;
+  int start_idx = 0;
+  int bottom_end_idx = 0;
+  int bottom_start_idx = 0;
+  uint64_t non_bottom_size = 0;
+  uint64_t bottom_size = 0;
+  bool end_bottom_size_counted = false;
+  for (int end_idx = 0; end_idx < static_cast<int>(files.size()); end_idx++) {
+    FileMetaData* end_file = files[end_idx];
+
+    // Include bottommost level files smaller than the current second
+    // last level file.
+    int num_skipped = 0;
+    while (bottom_end_idx < static_cast<int>(bottom_files.size()) &&
+           icmp_->Compare(bottom_files[bottom_end_idx]->largest,
+                          end_file->smallest) < 0) {
+      if (!end_bottom_size_counted) {
+        bottom_size += bottom_files[bottom_end_idx]->fd.file_size;
+      }
+      bottom_end_idx++;
+      end_bottom_size_counted = false;
+      num_skipped++;
+    }
+
+    if (num_skipped > 1) {
+      // At least one file in the bottommost level falls into the file
+      // gap. No reason to include that file. We cut the range and start
+      // a new sliding window.
+      start_idx = end_idx;
+    }
+
+    if (start_idx == end_idx) {
+      // New sliding window.
+      non_bottom_size = 0;
+      bottom_size = 0;
+      bottom_start_idx = bottom_end_idx;
+      end_bottom_size_counted = false;
+    }
+
+    non_bottom_size += end_file->fd.file_size;
+
+    // Include all overlapping files in the bottom level.
+    while (bottom_end_idx < static_cast<int>(bottom_files.size()) &&
+           icmp_->Compare(bottom_files[bottom_end_idx]->smallest,
+                          end_file->largest) < 0) {
+      if (!end_bottom_size_counted) {
+        bottom_size += bottom_files[bottom_end_idx]->fd.file_size;
+        end_bottom_size_counted = true;
+      }
+      if (icmp_->Compare(bottom_files[bottom_end_idx]->largest,
+                         end_file->largest) > 0) {
+        // The next-level file crosses the largest key of the current
+        // file.
+        break;
+      }
+      bottom_end_idx++;
+      end_bottom_size_counted = false;
+    }
+
+    if ((non_bottom_size + bottom_size > comp_thres_size ||
+         end_idx == static_cast<int>(files.size()) - 1) &&
+        non_bottom_size > 0) {  // Do we allow zero-size files at all?
+      // If this is a better compaction, remember it in the picked*
+      // variables.
+      double fanout = static_cast<double>(bottom_size) /
+                      static_cast<double>(non_bottom_size);
+      if (fanout < picked_fanout) {
+        picked_start_idx = start_idx;
+        picked_end_idx = end_idx;
+        picked_fanout = fanout;
+      }
+      // Shrink from the start side to get back under comp_thres_size.
+      while (non_bottom_size + bottom_size > comp_thres_size &&
+             start_idx <= end_idx) {
+        non_bottom_size -= files[start_idx]->fd.file_size;
+        start_idx++;
+        if (start_idx < static_cast<int>(files.size())) {
+          while (bottom_start_idx <= bottom_end_idx &&
+                 icmp_->Compare(bottom_files[bottom_start_idx]->largest,
+                                files[start_idx]->smallest) < 0) {
+            bottom_size -= bottom_files[bottom_start_idx]->fd.file_size;
+            bottom_start_idx++;
+          }
+        }
+      }
+    }
+  }
+
+  if (picked_fanout >= fanout_threshold) {
+    assert(picked_fanout == fanout_threshold);
+    return nullptr;
+  }
+
+  std::vector<CompactionInputFiles> inputs;
+  CompactionInputFiles bottom_level_inputs;
+  CompactionInputFiles second_last_level_inputs;
+  second_last_level_inputs.level = second_last_level;
+  bottom_level_inputs.level = output_level;
+  for (int i = picked_start_idx; i <= picked_end_idx; i++) {
+    if (files[i]->being_compacted) {
+      return nullptr;
+    }
+    second_last_level_inputs.files.push_back(files[i]);
+  }
+  assert(!second_last_level_inputs.empty());
+  if (!picker_->ExpandInputsToCleanCut(cf_name_, vstorage_,
+                                       &second_last_level_inputs,
+                                       /*next_smallest=*/nullptr)) {
+    return nullptr;
+  }
+  // We might be able to avoid this binary search if we save and expand
+  // from bottom_start_idx and bottom_end_idx, but for now, we use
+  // SetupOtherInputs() for simplicity.
+  int parent_index = -1;  // Create and use bottom_start_idx?
+  if (!picker_->SetupOtherInputs(cf_name_, mutable_cf_options_, vstorage_,
+                                 &second_last_level_inputs,
+                                 &bottom_level_inputs, &parent_index,
+                                 /*base_index=*/-1)) {
+    return nullptr;
+  }
+
+  // Try to include files in upper levels if they fall into the range.
+  // Since we need to go from the lower level up, which is the reverse
+  // of the level order, we first write to a reversed data structure and
+  // finally copy it to the compaction inputs.
+  InternalKey smallest, largest;
+  picker_->GetRange(second_last_level_inputs, &smallest, &largest);
+  std::vector<CompactionInputFiles> inputs_reverse;
+  for (auto it = ++(++sorted_runs_.rbegin()); it != sorted_runs_.rend();
+       it++) {
+    SortedRun& sr = *it;
+    if (sr.level == 0) {
+      break;
+    }
+    std::vector<FileMetaData*> level_inputs;
+    vstorage_->GetCleanInputsWithinInterval(sr.level, &smallest, &largest,
+                                            &level_inputs);
+    if (!level_inputs.empty()) {
+      inputs_reverse.push_back({});
+      inputs_reverse.back().level = sr.level;
+      inputs_reverse.back().files = level_inputs;
+      picker_->GetRange(inputs_reverse.back(), &smallest, &largest);
+    }
+  }
+  for (auto it = inputs_reverse.rbegin(); it != inputs_reverse.rend(); it++) {
+    inputs.push_back(*it);
+  }
+
+  inputs.push_back(second_last_level_inputs);
+  inputs.push_back(bottom_level_inputs);
+
+  // TODO: support multiple paths?
+  uint32_t path_id = 0;
+  return new Compaction(
+      vstorage_, ioptions_, mutable_cf_options_, mutable_db_options_,
+      std::move(inputs), output_level,
+      MaxFileSizeForLevel(mutable_cf_options_, output_level,
+                          kCompactionStyleUniversal),
+      GetMaxOverlappingBytes(), path_id,
+      GetCompressionType(ioptions_, vstorage_, mutable_cf_options_,
+                         output_level, 1, true /* enable_compression */),
+      GetCompressionOptions(mutable_cf_options_, vstorage_, output_level,
+                            true /* enable_compression */),
+      Temperature::kUnknown,
+      /* max_subcompactions */ 0, /* grandparents */ {}, /* is manual */ false,
+      score_, false /* deletion_compaction */,
+      CompactionReason::kUniversalSizeAmplification);
+}
+
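As a reading aid, the two-pointer window search above can be summarized by the following self-contained sketch, which abstracts keys away (hypothetical code under simplifying assumptions, not the RocksDB implementation): file i at the second last level has size upper[i], its overlapping bottom-level bytes lower[i] are assumed disjoint and precomputed, and all sizes are assumed positive. The real code instead tracks the bottom-level overlap with two extra indexes and handles gaps and boundary-crossing files.

```cpp
#include <cstdint>
#include <utility>
#include <vector>

// Returns {start, end} of the window with the lowest fanout that beats
// fanout_threshold, or {-1, -1} if none does.
std::pair<int, int> PickWindow(const std::vector<uint64_t>& upper,
                               const std::vector<uint64_t>& lower,
                               uint64_t comp_thres_size,
                               double fanout_threshold) {
  std::pair<int, int> picked{-1, -1};
  double picked_fanout = fanout_threshold;
  uint64_t upper_size = 0, lower_size = 0;
  int start = 0;
  for (int end = 0; end < static_cast<int>(upper.size()); end++) {
    upper_size += upper[end];
    lower_size += lower[end];
    if (upper_size + lower_size > comp_thres_size ||
        end + 1 == static_cast<int>(upper.size())) {
      // Candidate window: fanout = bottom bytes / non-bottom bytes.
      double fanout =
          static_cast<double>(lower_size) / static_cast<double>(upper_size);
      if (fanout < picked_fanout) {
        picked_fanout = fanout;
        picked = {start, end};
      }
      // Shrink from the left until back under the size threshold.
      while (start < end && upper_size + lower_size > comp_thres_size) {
        upper_size -= upper[start];
        lower_size -= lower[start];
        start++;
      }
    }
  }
  return picked;
}
```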
 // Pick files marked for compaction. Typically, files are marked by
 // CompactOnDeleteCollector due to the presence of tombstones.
 Compaction* UniversalCompactionBuilder::PickDeleteTriggeredCompaction() {
   CompactionInputFiles start_level_inputs;
   int output_level;
   std::vector<CompactionInputFiles> inputs;
+  std::vector<FileMetaData*> grandparents;

   if (vstorage_->num_levels() == 1) {
     // This is single level universal. Since we're basically trying to reclaim
@@ -937,6 +1196,9 @@
     if (picker_->FilesRangeOverlapWithCompaction(inputs, output_level)) {
       return nullptr;
     }
+
+    picker_->GetGrandparents(vstorage_, start_level_inputs,
+                             output_level_inputs, &grandparents);
   } else {
     inputs.push_back(start_level_inputs);
   }
@@ -954,13 +1216,13 @@
       std::move(inputs), output_level,
       MaxFileSizeForLevel(mutable_cf_options_, output_level,
                           kCompactionStyleUniversal),
-      /* max_grandparent_overlap_bytes */ LLONG_MAX, path_id,
+      /* max_grandparent_overlap_bytes */ GetMaxOverlappingBytes(), path_id,
       GetCompressionType(ioptions_, vstorage_, mutable_cf_options_,
                          output_level, 1),
       GetCompressionOptions(mutable_cf_options_, vstorage_, output_level),
       Temperature::kUnknown,
-      /* max_subcompactions */ 0, /* grandparents */ {}, /* is manual */ false,
-      score_, false /* deletion_compaction */,
+      /* max_subcompactions */ 0, grandparents, /* is manual */ false, score_,
+      false /* deletion_compaction */,
       CompactionReason::kFilesMarkedForCompaction);
 }

@@ -1028,7 +1290,7 @@ Compaction* UniversalCompactionBuilder::PickCompactionToOldest(
       std::move(inputs), output_level,
       MaxFileSizeForLevel(mutable_cf_options_, output_level,
                           kCompactionStyleUniversal),
-      LLONG_MAX, path_id,
+      GetMaxOverlappingBytes(), path_id,
       GetCompressionType(ioptions_, vstorage_, mutable_cf_options_,
                          output_level, 1, true /* enable_compression */),
       GetCompressionOptions(mutable_cf_options_, vstorage_, output_level,
@@ -1103,6 +1365,17 @@ Compaction* UniversalCompactionBuilder::PickPeriodicCompaction() {
   return c;
 }

+uint64_t UniversalCompactionBuilder::GetMaxOverlappingBytes() const {
+  if (!mutable_cf_options_.compaction_options_universal.incremental) {
+    return port::kMaxUint64;
+  } else {
+    // Try to align the file cutting boundary with files at the next
+    // level, using 1.5x the target size, so that a cut file neither ends
+    // up at only 1/2 of the target size nor overlaps two full-size files
+    // at the next level.
+    return mutable_cf_options_.target_file_size_base / 2 * 3;
+  }
+}
 }  // namespace ROCKSDB_NAMESPACE

 #endif  // !ROCKSDB_LITE
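A quick arithmetic illustration of the 1.5x rule above (the 2MB value mirrors the example in the summary; it is not a recommendation):

```cpp
#include <cstdint>

// Illustration of GetMaxOverlappingBytes() above: with a 2MB
// target_file_size_base, the current output plus overlapping
// parent-level bytes may grow to 3MB before the file is cut, so cut
// files tend to land between 0.5x and 1.5x of the target size instead
// of leaving tiny remainders.
uint64_t MaxOverlappingBytesExample() {
  const uint64_t target_file_size_base = 2 * 1024 * 1024;  // 2MB
  return target_file_size_base / 2 * 3;                    // 3MB
}
```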
diff --git a/db/db_compaction_test.cc b/db/db_compaction_test.cc
index 24e9dbf4a..41d12edd4 100644
--- a/db/db_compaction_test.cc
+++ b/db/db_compaction_test.cc
@@ -5255,6 +5255,7 @@ TEST_F(DBCompactionTest, ManualCompactionMax) {
   generate_sst_func();
   uint64_t total_size = (l1_avg_size * 10) + (l2_avg_size * 100);
   opts.max_compaction_bytes = total_size / num_split;
+  opts.target_file_size_base = total_size / num_split;
   Reopen(opts);
   num_compactions.store(0);
   ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
@@ -5262,6 +5263,7 @@

   // very small max_compaction_bytes, it should still move forward
   opts.max_compaction_bytes = l1_avg_size / 2;
+  opts.target_file_size_base = l1_avg_size / 2;
   DestroyAndReopen(opts);
   generate_sst_func();
   num_compactions.store(0);
@@ -5275,7 +5277,8 @@
   generate_sst_func();
   total_size = (l1_avg_size * 10) + (l2_avg_size * 100);
   Status s = db_->SetOptions(
-      {{"max_compaction_bytes", std::to_string(total_size / num_split)}});
+      {{"max_compaction_bytes", std::to_string(total_size / num_split)},
+       {"target_file_size_base", std::to_string(total_size / num_split)}});
   ASSERT_OK(s);
   num_compactions.store(0);
diff --git a/db/db_range_del_test.cc b/db/db_range_del_test.cc
index fc2e05ad4..8967ddef5 100644
--- a/db/db_range_del_test.cc
+++ b/db/db_range_del_test.cc
@@ -169,17 +169,36 @@ TEST_F(DBRangeDelTest, MaxCompactionBytesCutsOutputFiles) {
   opts.memtable_factory.reset(test::NewSpecialSkipListFactory(kNumPerFile));
-  // Want max_compaction_bytes to trigger the end of compaction output file,
-  // not target_file_size_base, so make the latter much bigger
-  opts.target_file_size_base = 100 * opts.max_compaction_bytes;
+  // Start small, then raise target_file_size_base to
+  // 100 * max_compaction_bytes via SetOptions() below, so that
+  // max_compaction_bytes, not target_file_size_base, triggers the end of
+  // each compaction output file.
+  opts.target_file_size_base = 1;
   DestroyAndReopen(opts);
   // snapshot protects range tombstone from dropping due to becoming obsolete.
   const Snapshot* snapshot = db_->GetSnapshot();
+  Random rnd(301);
+
+  ASSERT_OK(Put(GetNumericStr(0), rnd.RandomString(kBytesPerVal)));
+  ASSERT_OK(
+      Put(GetNumericStr(kNumPerFile - 1), rnd.RandomString(kBytesPerVal)));
+  ASSERT_OK(Flush());
+  ASSERT_OK(Put(GetNumericStr(kNumPerFile), rnd.RandomString(kBytesPerVal)));
+  ASSERT_OK(
+      Put(GetNumericStr(kNumPerFile * 2 - 1), rnd.RandomString(kBytesPerVal)));
+  ASSERT_OK(Flush());
+  MoveFilesToLevel(2);
+  ASSERT_EQ(0, NumTableFilesAtLevel(0));
+  ASSERT_EQ(NumTableFilesAtLevel(2), 2);
+
+  ASSERT_OK(db_->SetOptions(
+      db_->DefaultColumnFamily(),
+      {{"target_file_size_base", ToString(100 * opts.max_compaction_bytes)}}));
+
   // It spans the whole key-range, thus will be included in all output files
   ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
                              GetNumericStr(0),
                              GetNumericStr(kNumFiles * kNumPerFile - 1)));
-  Random rnd(301);
+
   for (int i = 0; i < kNumFiles; ++i) {
     std::vector<std::string> values;
     // Write 1MB (256 values, each 4K)
@@ -193,11 +212,11 @@
     ASSERT_EQ(i + 1, NumTableFilesAtLevel(0));
   }

-  ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr, nullptr,
-                                        true /* disallow_trivial_move */));
+  ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr,
+                                        /*column_family=*/nullptr,
+                                        /*disallow_trivial_move=*/true));
   ASSERT_EQ(0, NumTableFilesAtLevel(0));
   ASSERT_GE(NumTableFilesAtLevel(1), 2);
-
   std::vector<std::vector<FileMetaData>> files;
   dbfull()->TEST_GetFilesMetaData(db_->DefaultColumnFamily(), &files);

@@ -1628,6 +1647,7 @@ TEST_F(DBRangeDelTest, OverlappedTombstones) {
   const int kNumPerFile = 4, kNumFiles = 2;
   Options options = CurrentOptions();
   options.disable_auto_compactions = true;
+  options.target_file_size_base = 9 * 1024;
   options.max_compaction_bytes = 9 * 1024;
   DestroyAndReopen(options);
   Random rnd(301);
@@ -1667,6 +1687,7 @@ TEST_F(DBRangeDelTest, OverlappedKeys) {
   const int kNumPerFile = 4, kNumFiles = 2;
   Options options = CurrentOptions();
   options.disable_auto_compactions = true;
+  options.target_file_size_base = 9 * 1024;
   options.max_compaction_bytes = 9 * 1024;
   DestroyAndReopen(options);
   Random rnd(301);
diff --git a/include/rocksdb/universal_compaction.h b/include/rocksdb/universal_compaction.h
index 9a09d8f4e..2ac0ef1ed 100644
--- a/include/rocksdb/universal_compaction.h
+++ b/include/rocksdb/universal_compaction.h
@@ -74,6 +74,13 @@ class CompactionOptionsUniversal {
   // Default: false
   bool allow_trivial_move;

+  // EXPERIMENTAL
+  // If true, try to limit compaction size under max_compaction_bytes.
+  // This might cause higher write amplification, but can prevent some
+  // problems caused by large compactions.
+  // Default: false
+  bool incremental;
+
   // Default set of parameters
   CompactionOptionsUniversal()
       : size_ratio(1),
@@ -82,7 +89,8 @@
         max_size_amplification_percent(200),
         compression_size_percent(-1),
         stop_style(kCompactionStopStyleTotalSize),
-        allow_trivial_move(false) {}
+        allow_trivial_move(false),
+        incremental(false) {}
 };

 }  // namespace ROCKSDB_NAMESPACE
diff --git a/options/cf_options.cc b/options/cf_options.cc
index 5767e759c..57c965cc3 100644
--- a/options/cf_options.cc
+++ b/options/cf_options.cc
@@ -213,6 +213,10 @@ static std::unordered_map<std::string, OptionTypeInfo>
         {offsetof(class CompactionOptionsUniversal, stop_style),
          OptionType::kCompactionStopStyle, OptionVerificationType::kNormal,
          OptionTypeFlags::kMutable}},
+       {"incremental",
+        {offsetof(class CompactionOptionsUniversal, incremental),
+         OptionType::kBoolean, OptionVerificationType::kNormal,
+         OptionTypeFlags::kMutable}},
        {"allow_trivial_move",
         {offsetof(class CompactionOptionsUniversal, allow_trivial_move),
          OptionType::kBoolean, OptionVerificationType::kNormal,
@@ -1027,6 +1031,8 @@ void MutableCFOptions::Dump(Logger* log) const {
   ROCKS_LOG_INFO(
       log, "compaction_options_universal.allow_trivial_move : %d",
       static_cast<int>(compaction_options_universal.allow_trivial_move));
+  ROCKS_LOG_INFO(log, "compaction_options_universal.incremental : %d",
+                 static_cast<int>(compaction_options_universal.incremental));

   // FIFO Compaction Options
   ROCKS_LOG_INFO(log,
                  "compaction_options_fifo.max_table_files_size : %" PRIu64,
diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc
index 790db453f..9aebf7dd7 100644
--- a/tools/db_bench_tool.cc
+++ b/tools/db_bench_tool.cc
@@ -532,6 +532,9 @@ DEFINE_int32(universal_compression_size_percent, -1,
 DEFINE_bool(universal_allow_trivial_move, false,
             "Allow trivial move in universal compaction.");

+DEFINE_bool(universal_incremental, false,
+            "Enable incremental compactions in universal compaction.");
+
 DEFINE_int64(cache_size, 8 << 20,  // 8MB
              "Number of bytes to use as a cache of uncompressed data");

@@ -4313,6 +4316,8 @@ class Benchmark {
     }
     options.compaction_options_universal.allow_trivial_move =
         FLAGS_universal_allow_trivial_move;
+    options.compaction_options_universal.incremental =
+        FLAGS_universal_incremental;
     if (FLAGS_thread_status_per_interval > 0) {
       options.enable_thread_tracking = true;
     }
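Since `incremental` is registered above with `OptionTypeFlags::kMutable`, it should also be settable at runtime through `DB::SetOptions()`; a hedged sketch (the nested option-string syntax is an assumption on my part, not something this patch verifies):

```cpp
#include "rocksdb/db.h"

// Sketch: toggle the option on a live DB. The open DB handle is
// assumed; the "{incremental=true}" struct syntax is an assumption.
rocksdb::Status EnableIncremental(rocksdb::DB* db) {
  return db->SetOptions(
      db->DefaultColumnFamily(),
      {{"compaction_options_universal", "{incremental=true}"}});
}
```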