diff --git a/HISTORY.md b/HISTORY.md index eba52ad3e..596639036 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -27,6 +27,7 @@ * Add `file_temperature` to `IngestExternalFileArg` such that when ingesting SST files, we are able to indicate the temperature of this batch of files. * If `DB::Close()` failed with a non-aborted status, calling `DB::Close()` again will return the original status instead of Status::OK. * Add CacheTier to advanced_options.h to describe the cache tier being used. Add a `lowest_used_cache_tier` option to `DBOptions` (immutable) and pass it to BlockBasedTableReader. By default it is `CacheTier::kNonVolatileBlockTier`, which means we always use both block cache (kVolatileTier) and secondary cache (kNonVolatileBlockTier). By setting it to `CacheTier::kVolatileTier`, the DB will not use the secondary cache. +* Even when options.max_compaction_bytes is hit, compaction output files are only cut when they align with grandparent files' boundaries. options.max_compaction_bytes could be slightly violated by the change, but the violation is no more than one target SST file size, which is usually much smaller. ### Performance Improvements * Improved CPU efficiency of building block-based table (SST) files (#9039 and #9040). diff --git a/db/compaction/compaction_job.cc b/db/compaction/compaction_job.cc index d36cf8ab5..5c6fb5d75 100644 --- a/db/compaction/compaction_job.cc +++ b/db/compaction/compaction_job.cc @@ -219,6 +219,7 @@ struct CompactionJob::SubcompactionState { &compaction->column_family_data()->internal_comparator(); const std::vector<FileMetaData*>& grandparents = compaction->grandparents(); + bool grandparent_file_switched = false; // Scan to find earliest grandparent file that contains key. while (grandparent_index < grandparents.size() && icmp->Compare(internal_key, @@ -226,6 +227,7 @@ struct CompactionJob::SubcompactionState { 0) { if (seen_key) { overlapped_bytes += grandparents[grandparent_index]->fd.GetFileSize(); + grandparent_file_switched = true; } assert(grandparent_index + 1 >= grandparents.size() || icmp->Compare( @@ -235,8 +237,8 @@ struct CompactionJob::SubcompactionState { } seen_key = true; - if (overlapped_bytes + curr_file_size > - compaction->max_compaction_bytes()) { + if (grandparent_file_switched && overlapped_bytes + curr_file_size > + compaction->max_compaction_bytes()) { // Too much overlap for current output; start new output overlapped_bytes = 0; return true; diff --git a/db/compaction/compaction_picker_test.cc b/db/compaction/compaction_picker_test.cc index bce3a2076..949a7eb41 100644 --- a/db/compaction/compaction_picker_test.cc +++ b/db/compaction/compaction_picker_test.cc @@ -733,6 +733,221 @@ TEST_F(CompactionPickerTest, UniversalPeriodicCompaction6) { ASSERT_EQ(4, compaction->output_level()); } +TEST_F(CompactionPickerTest, UniversalIncrementalSpace1) { + const uint64_t kFileSize = 100000; + + mutable_cf_options_.max_compaction_bytes = 555555; + mutable_cf_options_.compaction_options_universal.incremental = true; + mutable_cf_options_.compaction_options_universal + .max_size_amplification_percent = 30; + UniversalCompactionPicker universal_compaction_picker(ioptions_, &icmp_); + + NewVersionStorage(5, kCompactionStyleUniversal); + + Add(0, 1U, "150", "200", kFileSize, 0, 500, 550); + Add(2, 2U, "010", "080", kFileSize, 0, 200, 251); + Add(3, 5U, "310", "380", kFileSize, 0, 200, 251); + Add(3, 6U, "410", "880", kFileSize, 0, 200, 251); + Add(3, 7U, "910", "980", 1, 0, 200, 251); + Add(4, 10U, "201", "250", kFileSize, 0, 101, 150); + Add(4, 11U, "301",
"350", kFileSize, 0, 101, 150); + Add(4, 12U, "401", "450", kFileSize, 0, 101, 150); + Add(4, 13U, "501", "750", kFileSize, 0, 101, 150); + Add(4, 14U, "801", "850", kFileSize, 0, 101, 150); + Add(4, 15U, "901", "950", kFileSize, 0, 101, 150); + // Add(4, 15U, "960", "970", kFileSize, 0, 101, 150); + + UpdateVersionStorageInfo(); + + std::unique_ptr compaction( + universal_compaction_picker.PickCompaction( + cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(), + &log_buffer_)); + ASSERT_TRUE(compaction); + ASSERT_EQ(4, compaction->output_level()); + ASSERT_EQ(3, compaction->start_level()); + ASSERT_EQ(2U, compaction->num_input_files(0)); + ASSERT_EQ(5U, compaction->input(0, 0)->fd.GetNumber()); + ASSERT_EQ(6U, compaction->input(0, 1)->fd.GetNumber()); + // ASSERT_EQ(4U, compaction->num_input_files(1)); + ASSERT_EQ(11U, compaction->input(1, 0)->fd.GetNumber()); + ASSERT_EQ(12U, compaction->input(1, 1)->fd.GetNumber()); + ASSERT_EQ(13U, compaction->input(1, 2)->fd.GetNumber()); + ASSERT_EQ(14U, compaction->input(1, 3)->fd.GetNumber()); +} + +TEST_F(CompactionPickerTest, UniversalIncrementalSpace2) { + const uint64_t kFileSize = 100000; + + mutable_cf_options_.max_compaction_bytes = 400000; + mutable_cf_options_.compaction_options_universal.incremental = true; + mutable_cf_options_.compaction_options_universal + .max_size_amplification_percent = 30; + UniversalCompactionPicker universal_compaction_picker(ioptions_, &icmp_); + + NewVersionStorage(5, kCompactionStyleUniversal); + + Add(0, 1U, "150", "200", kFileSize, 0, 500, 550); + Add(1, 2U, "010", "080", kFileSize, 0, 200, 251); + Add(2, 5U, "310", "380", kFileSize, 0, 200, 251); + Add(2, 6U, "410", "880", kFileSize, 0, 200, 251); + Add(2, 7U, "910", "980", kFileSize, 0, 200, 251); + Add(4, 10U, "201", "250", kFileSize, 0, 101, 150); + Add(4, 11U, "301", "350", kFileSize, 0, 101, 150); + Add(4, 12U, "401", "450", kFileSize, 0, 101, 150); + Add(4, 13U, "501", "750", kFileSize, 0, 101, 150); + Add(4, 14U, "801", "850", kFileSize, 0, 101, 150); + Add(4, 15U, "901", "950", kFileSize, 0, 101, 150); + + UpdateVersionStorageInfo(); + + std::unique_ptr compaction( + universal_compaction_picker.PickCompaction( + cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(), + &log_buffer_)); + ASSERT_TRUE(compaction); + ASSERT_EQ(4, compaction->output_level()); + ASSERT_EQ(2, compaction->start_level()); + ASSERT_EQ(1U, compaction->num_input_files(0)); + ASSERT_EQ(7U, compaction->input(0, 0)->fd.GetNumber()); + ASSERT_EQ(1U, compaction->num_input_files(1)); + ASSERT_EQ(15U, compaction->input(1, 0)->fd.GetNumber()); +} + +TEST_F(CompactionPickerTest, UniversalIncrementalSpace3) { + // Test bottom level files falling between gaps between two upper level + // files + const uint64_t kFileSize = 100000; + + mutable_cf_options_.max_compaction_bytes = 300000; + mutable_cf_options_.compaction_options_universal.incremental = true; + mutable_cf_options_.compaction_options_universal + .max_size_amplification_percent = 30; + UniversalCompactionPicker universal_compaction_picker(ioptions_, &icmp_); + + NewVersionStorage(5, kCompactionStyleUniversal); + + Add(0, 1U, "150", "200", kFileSize, 0, 500, 550); + Add(2, 2U, "010", "080", kFileSize, 0, 200, 251); + Add(3, 5U, "000", "180", kFileSize, 0, 200, 251); + Add(3, 6U, "181", "190", kFileSize, 0, 200, 251); + Add(3, 7U, "710", "810", kFileSize, 0, 200, 251); + Add(3, 8U, "820", "830", kFileSize, 0, 200, 251); + Add(3, 9U, "900", "991", kFileSize, 0, 200, 251); + Add(4, 10U, "201", "250", 
kFileSize, 0, 101, 150); + Add(4, 11U, "301", "350", kFileSize, 0, 101, 150); + Add(4, 12U, "401", "450", kFileSize, 0, 101, 150); + Add(4, 13U, "501", "750", kFileSize, 0, 101, 150); + Add(4, 14U, "801", "850", kFileSize, 0, 101, 150); + Add(4, 15U, "901", "950", kFileSize, 0, 101, 150); + + UpdateVersionStorageInfo(); + + std::unique_ptr<Compaction> compaction( + universal_compaction_picker.PickCompaction( + cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(), + &log_buffer_)); + ASSERT_TRUE(compaction); + ASSERT_EQ(4, compaction->output_level()); + ASSERT_EQ(2, compaction->start_level()); + ASSERT_EQ(1U, compaction->num_input_files(0)); + ASSERT_EQ(2U, compaction->input(0, 0)->fd.GetNumber()); + ASSERT_EQ(2U, compaction->num_input_files(1)); + ASSERT_EQ(5U, compaction->input(1, 0)->fd.GetNumber()); + ASSERT_EQ(6U, compaction->input(1, 1)->fd.GetNumber()); + ASSERT_EQ(0, compaction->num_input_files(2)); +} + +TEST_F(CompactionPickerTest, UniversalIncrementalSpace4) { + // Test compaction candidates always cover many files. + const uint64_t kFileSize = 100000; + + mutable_cf_options_.max_compaction_bytes = 3200000; + mutable_cf_options_.compaction_options_universal.incremental = true; + mutable_cf_options_.compaction_options_universal + .max_size_amplification_percent = 30; + UniversalCompactionPicker universal_compaction_picker(ioptions_, &icmp_); + + NewVersionStorage(5, kCompactionStyleUniversal); + + Add(0, 1U, "150", "200", kFileSize, 0, 500, 550); + Add(2, 2U, "010", "080", kFileSize, 0, 200, 251); + + // Generate files like the following: + // L3: (1101, 1180) (1201, 1280) ... (7901, 7980) + // L4: (1130, 1150) (1160, 1210) (1230, 1250) (1260, 1310) ... (7960, 8010) + for (int i = 11; i < 79; i++) { + Add(3, 100 + i * 3, ToString(i * 100).c_str(), + ToString(i * 100 + 80).c_str(), kFileSize, 0, 200, 251); + // Add a tie breaker + if (i == 66) { + Add(3, 10000U, "6690", "6699", kFileSize, 0, 200, 251); + } + + Add(4, 100 + i * 3 + 1, ToString(i * 100 + 30).c_str(), + ToString(i * 100 + 50).c_str(), kFileSize, 0, 200, 251); + Add(4, 100 + i * 3 + 2, ToString(i * 100 + 60).c_str(), + ToString(i * 100 + 110).c_str(), kFileSize, 0, 200, 251); + } + UpdateVersionStorageInfo(); + + std::unique_ptr<Compaction> compaction( + universal_compaction_picker.PickCompaction( + cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(), + &log_buffer_)); + ASSERT_TRUE(compaction); + ASSERT_EQ(4, compaction->output_level()); + ASSERT_EQ(3, compaction->start_level()); + ASSERT_EQ(6U, compaction->num_input_files(0)); + ASSERT_EQ(100 + 62U * 3, compaction->input(0, 0)->fd.GetNumber()); + ASSERT_EQ(10000U, compaction->input(0, 5)->fd.GetNumber()); + ASSERT_EQ(11, compaction->num_input_files(1)); +} + +TEST_F(CompactionPickerTest, UniversalIncrementalSpace5) { + // Test compaction candidates always cover many files with some single + // files larger than size threshold. + const uint64_t kFileSize = 100000; + + mutable_cf_options_.max_compaction_bytes = 3200000; + mutable_cf_options_.compaction_options_universal.incremental = true; + mutable_cf_options_.compaction_options_universal + .max_size_amplification_percent = 30; + UniversalCompactionPicker universal_compaction_picker(ioptions_, &icmp_); + + NewVersionStorage(5, kCompactionStyleUniversal); + + Add(0, 1U, "150", "200", kFileSize, 0, 500, 550); + Add(2, 2U, "010", "080", kFileSize, 0, 200, 251); + + // Generate files like the following: + // L3: (1101, 1180) (1201, 1280) ... (7901, 7980) + // L4: (1130, 1150) (1160, 1210) (1230, 1250) (1260, 1310) ...
(7960, 8010) + for (int i = 11; i < 70; i++) { + Add(3, 100 + i * 3, ToString(i * 100).c_str(), + ToString(i * 100 + 80).c_str(), + i % 10 == 9 ? kFileSize * 100 : kFileSize, 0, 200, 251); + + Add(4, 100 + i * 3 + 1, ToString(i * 100 + 30).c_str(), + ToString(i * 100 + 50).c_str(), kFileSize, 0, 200, 251); + Add(4, 100 + i * 3 + 2, ToString(i * 100 + 60).c_str(), + ToString(i * 100 + 110).c_str(), kFileSize, 0, 200, 251); + } + UpdateVersionStorageInfo(); + + std::unique_ptr<Compaction> compaction( + universal_compaction_picker.PickCompaction( + cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(), + &log_buffer_)); + ASSERT_TRUE(compaction); + ASSERT_EQ(4, compaction->output_level()); + ASSERT_EQ(3, compaction->start_level()); + ASSERT_EQ(6U, compaction->num_input_files(0)); + ASSERT_EQ(100 + 14 * 3, compaction->input(0, 0)->fd.GetNumber()); + ASSERT_EQ(100 + 19 * 3, compaction->input(0, 5)->fd.GetNumber()); + ASSERT_EQ(13, compaction->num_input_files(1)); +} + TEST_F(CompactionPickerTest, NeedsCompactionFIFO) { NewVersionStorage(1, kCompactionStyleFIFO); const int kFileCount = diff --git a/db/compaction/compaction_picker_universal.cc b/db/compaction/compaction_picker_universal.cc index fa3e0f639..cac6fcc1c 100644 --- a/db/compaction/compaction_picker_universal.cc +++ b/db/compaction/compaction_picker_universal.cc @@ -89,6 +89,14 @@ class UniversalCompactionBuilder { // Pick Universal compaction to limit space amplification. Compaction* PickCompactionToReduceSizeAmp(); + // Try to pick incremental compaction to reduce space amplification. + // It will return null if it cannot find a fanout within the threshold. + // Fanout is defined as + // total size of files to compact at output level + // -------------------------------------------------- + // total size of files to compact at other levels + Compaction* PickIncrementalForReduceSizeAmp(double fanout_threshold); + Compaction* PickDeleteTriggeredCompaction(); // Form a compaction from the sorted run indicated by start_index to the @@ -110,6 +118,8 @@ class UniversalCompactionBuilder { // overlapping. bool IsInputFilesNonOverlapping(Compaction* c); + uint64_t GetMaxOverlappingBytes() const; + const ImmutableOptions& ioptions_; const InternalKeyComparator* icmp_; double score_; @@ -714,6 +724,19 @@ Compaction* UniversalCompactionBuilder::PickCompactionToReduceSortedRuns( cf_name_.c_str(), file_num_buf); } + std::vector<FileMetaData*> grandparents; + // Include grandparents for potential file cutting in incremental + // mode. It is for aligning file cutting boundaries across levels, + // so that subsequent compactions can pick files with aligned + // boundaries. + // Single files are only picked up in incremental mode, so there + // is no need for the full range.
+ if (mutable_cf_options_.compaction_options_universal.incremental && + first_index_after < sorted_runs_.size() && + sorted_runs_[first_index_after].level > 1) { + grandparents = vstorage_->LevelFiles(sorted_runs_[first_index_after].level); + } + CompactionReason compaction_reason; if (max_number_of_files_to_compact == UINT_MAX) { compaction_reason = CompactionReason::kUniversalSizeRatio; @@ -725,14 +748,14 @@ Compaction* UniversalCompactionBuilder::PickCompactionToReduceSortedRuns( std::move(inputs), output_level, MaxFileSizeForLevel(mutable_cf_options_, output_level, kCompactionStyleUniversal), - LLONG_MAX, path_id, + GetMaxOverlappingBytes(), path_id, GetCompressionType(ioptions_, vstorage_, mutable_cf_options_, start_level, 1, enable_compression), GetCompressionOptions(mutable_cf_options_, vstorage_, start_level, enable_compression), Temperature::kUnknown, - /* max_subcompactions */ 0, /* grandparents */ {}, /* is manual */ false, - score_, false /* deletion_compaction */, compaction_reason); + /* max_subcompactions */ 0, grandparents, /* is manual */ false, score_, + false /* deletion_compaction */, compaction_reason); } // Look at overall size amplification. If size amplification @@ -788,6 +811,8 @@ Compaction* UniversalCompactionBuilder::PickCompactionToReduceSizeAmp() { for (size_t loop = start_index; loop + 1 < sorted_runs_.size(); loop++) { sr = &sorted_runs_[loop]; if (sr->being_compacted) { + // TODO: when incremental compaction is supported, we might want to + // schedule some incremental compactions in parallel if needed. char file_num_buf[kFormatFileNumberBufSize]; sr->Dump(file_num_buf, sizeof(file_num_buf), true); ROCKS_LOG_BUFFER( @@ -821,16 +846,250 @@ " earliest-file-size %" PRIu64, cf_name_.c_str(), candidate_size, earliest_file_size); } + // Since incremental compaction can't include more than the second + // last level, it can introduce a penalty compared to full compaction. + // We hard-code the penalty allowance to be 80%: if we end up with a + // compaction fanout more than 80% higher than that of a full level + // compaction, we fall back to full level compaction. + // The 80% threshold is arbitrary and can be adjusted or made + // configurable in the future. + // This also prevents the case where compaction falls behind and we + // need to compact more levels for compactions to catch up. + if (mutable_cf_options_.compaction_options_universal.incremental) { + double fanout_threshold = static_cast<double>(earliest_file_size) / + static_cast<double>(candidate_size) * 1.8; + Compaction* picked = PickIncrementalForReduceSizeAmp(fanout_threshold); + if (picked != nullptr) { + // As the feature is still experimental, picking incremental compaction + // might fail and we will fall back to compacting the full level. + return picked; + } + } return PickCompactionToOldest(start_index, CompactionReason::kUniversalSizeAmplification); } +Compaction* UniversalCompactionBuilder::PickIncrementalForReduceSizeAmp( + double fanout_threshold) { + // Try to find all potential compactions with total size just over + // options.max_compaction_bytes / 2, and take the one with the lowest + // fanout (as defined in the declaration of this function). + // This is done by keeping a sliding window of the files at the second + // last level, and expanding it while collecting overlapping files in + // the last level. Once the total size exceeds the size threshold, + // calculate the fanout value, and then shrink the window from the + // start side. Keep doing this until the end.
+ // Finally, we try to include upper level files if they fall into + // the range. + // + // Note that this is a similar problem to leveled compaction's + // kMinOverlappingRatio priority, but instead of picking single files + // we expand to a target compaction size. The reason is that in + // leveled compaction, the actual fanout value tends to be high, + // e.g. 10, so even with a single file in the level being merged down, + // the extra size compacted in boundary files is at a lower ratio. But + // here users often size the second last level at 1/4, 1/3 or even 1/2 + // of the bottommost level, so picking a single file in the second + // last level would cause significant waste, which is not desirable. + // + // This algorithm has lots of room for improvement to pick more + // efficient compactions. + assert(sorted_runs_.size() >= 2); + int second_last_level = sorted_runs_[sorted_runs_.size() - 2].level; + if (second_last_level == 0) { + // Can't split Level 0. + return nullptr; + } + int output_level = sorted_runs_.back().level; + const std::vector<FileMetaData*>& bottom_files = + vstorage_->LevelFiles(output_level); + const std::vector<FileMetaData*>& files = + vstorage_->LevelFiles(second_last_level); + assert(!bottom_files.empty()); + assert(!files.empty()); + + int picked_start_idx = 0; + int picked_end_idx = 0; + double picked_fanout = fanout_threshold; + + // Use half of the target compaction bytes as the anchor to stop growing + // second last level files, reserving room for more overlapping bottom + // level files, clean cuts, files from other levels, etc. + uint64_t comp_thres_size = mutable_cf_options_.max_compaction_bytes / 2; + int start_idx = 0; + int bottom_end_idx = 0; + int bottom_start_idx = 0; + uint64_t non_bottom_size = 0; + uint64_t bottom_size = 0; + bool end_bottom_size_counted = false; + for (int end_idx = 0; end_idx < static_cast<int>(files.size()); end_idx++) { + FileMetaData* end_file = files[end_idx]; + + // Include bottommost level files smaller than the current second + // last level file. + int num_skipped = 0; + while (bottom_end_idx < static_cast<int>(bottom_files.size()) && + icmp_->Compare(bottom_files[bottom_end_idx]->largest, + end_file->smallest) < 0) { + if (!end_bottom_size_counted) { + bottom_size += bottom_files[bottom_end_idx]->fd.file_size; + } + bottom_end_idx++; + end_bottom_size_counted = false; + num_skipped++; + } + + if (num_skipped > 1) { + // At least one file in the bottommost level falls into the gap between + // files. There is no reason to include it, so we cut the range and + // start a new sliding window. + start_idx = end_idx; + } + + if (start_idx == end_idx) { + // new sliding window. + non_bottom_size = 0; + bottom_size = 0; + bottom_start_idx = bottom_end_idx; + end_bottom_size_counted = false; + } + + non_bottom_size += end_file->fd.file_size; + + // Include all overlapping files in the bottom level. + while (bottom_end_idx < static_cast<int>(bottom_files.size()) && + icmp_->Compare(bottom_files[bottom_end_idx]->smallest, + end_file->largest) < 0) { + if (!end_bottom_size_counted) { + bottom_size += bottom_files[bottom_end_idx]->fd.file_size; + end_bottom_size_counted = true; + } + if (icmp_->Compare(bottom_files[bottom_end_idx]->largest, + end_file->largest) > 0) { + // The next level file crosses the upper boundary of the current file. + break; + } + bottom_end_idx++; + end_bottom_size_counted = false; + } + + if ((non_bottom_size + bottom_size > comp_thres_size || + end_idx == static_cast<int>(files.size()) - 1) && + non_bottom_size > 0) { // Do we allow 0-size files at all?
+ // If it is a better compaction, remember it in the picked* variables. + double fanout = static_cast<double>(bottom_size) / + static_cast<double>(non_bottom_size); + if (fanout < picked_fanout) { + picked_start_idx = start_idx; + picked_end_idx = end_idx; + picked_fanout = fanout; + } + // Shrink from the start of the window until the size is under + // comp_thres_size + while (non_bottom_size + bottom_size > comp_thres_size && + start_idx <= end_idx) { + non_bottom_size -= files[start_idx]->fd.file_size; + start_idx++; + if (start_idx < static_cast<int>(files.size())) { + while (bottom_start_idx <= bottom_end_idx && + icmp_->Compare(bottom_files[bottom_start_idx]->largest, + files[start_idx]->smallest) < 0) { + bottom_size -= bottom_files[bottom_start_idx]->fd.file_size; + bottom_start_idx++; + } + } + } + } + } + + if (picked_fanout >= fanout_threshold) { + assert(picked_fanout == fanout_threshold); + return nullptr; + } + + std::vector<CompactionInputFiles> inputs; + CompactionInputFiles bottom_level_inputs; + CompactionInputFiles second_last_level_inputs; + second_last_level_inputs.level = second_last_level; + bottom_level_inputs.level = output_level; + for (int i = picked_start_idx; i <= picked_end_idx; i++) { + if (files[i]->being_compacted) { + return nullptr; + } + second_last_level_inputs.files.push_back(files[i]); + } + assert(!second_last_level_inputs.empty()); + if (!picker_->ExpandInputsToCleanCut(cf_name_, vstorage_, + &second_last_level_inputs, + /*next_smallest=*/nullptr)) { + return nullptr; + } + // We might be able to avoid this binary search if we save and expand + // from bottom_start_idx and bottom_end_idx, but for now, we use + // SetupOtherInputs() for simplicity. + int parent_index = -1; // Create and use bottom_start_idx? + if (!picker_->SetupOtherInputs(cf_name_, mutable_cf_options_, vstorage_, + &second_last_level_inputs, + &bottom_level_inputs, &parent_index, + /*base_index=*/-1)) { + return nullptr; + } + + // Try to include files in upper levels if they fall into the range. + // Since we need to go from the lower levels up, which is the reverse + // of the level order, we first write to a reversed data structure and + // finally copy it to the compaction inputs. + InternalKey smallest, largest; + picker_->GetRange(second_last_level_inputs, &smallest, &largest); + std::vector<CompactionInputFiles> inputs_reverse; + for (auto it = ++(++sorted_runs_.rbegin()); it != sorted_runs_.rend(); it++) { + SortedRun& sr = *it; + if (sr.level == 0) { + break; + } + std::vector<FileMetaData*> level_inputs; + vstorage_->GetCleanInputsWithinInterval(sr.level, &smallest, &largest, + &level_inputs); + if (!level_inputs.empty()) { + inputs_reverse.push_back({}); + inputs_reverse.back().level = sr.level; + inputs_reverse.back().files = level_inputs; + picker_->GetRange(inputs_reverse.back(), &smallest, &largest); + } + } + for (auto it = inputs_reverse.rbegin(); it != inputs_reverse.rend(); it++) { + inputs.push_back(*it); + } + + inputs.push_back(second_last_level_inputs); + inputs.push_back(bottom_level_inputs); + + // TODO: support multiple paths?
+ uint32_t path_id = 0; + return new Compaction( + vstorage_, ioptions_, mutable_cf_options_, mutable_db_options_, + std::move(inputs), output_level, + MaxFileSizeForLevel(mutable_cf_options_, output_level, + kCompactionStyleUniversal), + GetMaxOverlappingBytes(), path_id, + GetCompressionType(ioptions_, vstorage_, mutable_cf_options_, + output_level, 1, true /* enable_compression */), + GetCompressionOptions(mutable_cf_options_, vstorage_, output_level, + true /* enable_compression */), + Temperature::kUnknown, + /* max_subcompactions */ 0, /* grandparents */ {}, /* is manual */ false, + score_, false /* deletion_compaction */, + CompactionReason::kUniversalSizeAmplification); +} + // Pick files marked for compaction. Typically, files are marked by // CompactOnDeleteCollector due to the presence of tombstones. Compaction* UniversalCompactionBuilder::PickDeleteTriggeredCompaction() { CompactionInputFiles start_level_inputs; int output_level; std::vector<CompactionInputFiles> inputs; + std::vector<FileMetaData*> grandparents; if (vstorage_->num_levels() == 1) { // This is single level universal. Since we're basically trying to reclaim @@ -937,6 +1196,9 @@ Compaction* UniversalCompactionBuilder::PickDeleteTriggeredCompaction() { if (picker_->FilesRangeOverlapWithCompaction(inputs, output_level)) { return nullptr; } + + picker_->GetGrandparents(vstorage_, start_level_inputs, + output_level_inputs, &grandparents); } else { inputs.push_back(start_level_inputs); } @@ -954,13 +1216,13 @@ std::move(inputs), output_level, MaxFileSizeForLevel(mutable_cf_options_, output_level, kCompactionStyleUniversal), - /* max_grandparent_overlap_bytes */ LLONG_MAX, path_id, + /* max_grandparent_overlap_bytes */ GetMaxOverlappingBytes(), path_id, GetCompressionType(ioptions_, vstorage_, mutable_cf_options_, output_level, 1), GetCompressionOptions(mutable_cf_options_, vstorage_, output_level), Temperature::kUnknown, - /* max_subcompactions */ 0, /* grandparents */ {}, /* is manual */ false, - score_, false /* deletion_compaction */, + /* max_subcompactions */ 0, grandparents, /* is manual */ false, score_, + false /* deletion_compaction */, CompactionReason::kFilesMarkedForCompaction); } @@ -1028,7 +1290,7 @@ Compaction* UniversalCompactionBuilder::PickCompactionToOldest( std::move(inputs), output_level, MaxFileSizeForLevel(mutable_cf_options_, output_level, kCompactionStyleUniversal), - LLONG_MAX, path_id, + GetMaxOverlappingBytes(), path_id, GetCompressionType(ioptions_, vstorage_, mutable_cf_options_, output_level, 1, true /* enable_compression */), GetCompressionOptions(mutable_cf_options_, vstorage_, output_level, @@ -1103,6 +1365,17 @@ Compaction* UniversalCompactionBuilder::PickPeriodicCompaction() { return c; } + +uint64_t UniversalCompactionBuilder::GetMaxOverlappingBytes() const { + if (!mutable_cf_options_.compaction_options_universal.incremental) { + return port::kMaxUint64; + } else { + // Try to align the cutting boundary with files at the next level, + // unless the file would end up smaller than 1/2 of the target size, + // in which case it would overlap with two full-size files at the + // next level.
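+ // For example (hypothetical numbers): with a target_file_size_base of 64MB, the value computed below caps the overlap with next-level files at 96MB, i.e. one and a half target-size files.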
+ return mutable_cf_options_.target_file_size_base / 2 * 3; + } +} } // namespace ROCKSDB_NAMESPACE #endif // !ROCKSDB_LITE diff --git a/db/db_compaction_test.cc b/db/db_compaction_test.cc index 24e9dbf4a..41d12edd4 100644 --- a/db/db_compaction_test.cc +++ b/db/db_compaction_test.cc @@ -5255,6 +5255,7 @@ TEST_F(DBCompactionTest, ManualCompactionMax) { generate_sst_func(); uint64_t total_size = (l1_avg_size * 10) + (l2_avg_size * 100); opts.max_compaction_bytes = total_size / num_split; + opts.target_file_size_base = total_size / num_split; Reopen(opts); num_compactions.store(0); ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); @@ -5262,6 +5263,7 @@ // very small max_compaction_bytes, it should still move forward opts.max_compaction_bytes = l1_avg_size / 2; + opts.target_file_size_base = l1_avg_size / 2; DestroyAndReopen(opts); generate_sst_func(); num_compactions.store(0); @@ -5275,7 +5277,8 @@ generate_sst_func(); total_size = (l1_avg_size * 10) + (l2_avg_size * 100); Status s = db_->SetOptions( - {{"max_compaction_bytes", std::to_string(total_size / num_split)}}); + {{"max_compaction_bytes", std::to_string(total_size / num_split)}, + {"target_file_size_base", std::to_string(total_size / num_split)}}); ASSERT_OK(s); num_compactions.store(0); diff --git a/db/db_range_del_test.cc b/db/db_range_del_test.cc index fc2e05ad4..8967ddef5 100644 --- a/db/db_range_del_test.cc +++ b/db/db_range_del_test.cc @@ -169,17 +169,36 @@ TEST_F(DBRangeDelTest, MaxCompactionBytesCutsOutputFiles) { opts.memtable_factory.reset(test::NewSpecialSkipListFactory(kNumPerFile)); - // Want max_compaction_bytes to trigger the end of compaction output file, not - // target_file_size_base, so make the latter much bigger - opts.target_file_size_base = 100 * opts.max_compaction_bytes; + // Want max_compaction_bytes to trigger the end of compaction output files, + // not target_file_size_base. Start with a tiny target_file_size_base to + // generate small input files, then raise it via SetOptions() below so that + // max_compaction_bytes is what cuts the output files. + opts.target_file_size_base = 1; DestroyAndReopen(opts); // snapshot protects range tombstone from dropping due to becoming obsolete.
const Snapshot* snapshot = db_->GetSnapshot(); + Random rnd(301); + + ASSERT_OK(Put(GetNumericStr(0), rnd.RandomString(kBytesPerVal))); + ASSERT_OK( + Put(GetNumericStr(kNumPerFile - 1), rnd.RandomString(kBytesPerVal))); + ASSERT_OK(Flush()); + ASSERT_OK(Put(GetNumericStr(kNumPerFile), rnd.RandomString(kBytesPerVal))); + ASSERT_OK( + Put(GetNumericStr(kNumPerFile * 2 - 1), rnd.RandomString(kBytesPerVal))); + ASSERT_OK(Flush()); + MoveFilesToLevel(2); + ASSERT_EQ(0, NumTableFilesAtLevel(0)); + ASSERT_EQ(NumTableFilesAtLevel(2), 2); + + ASSERT_OK(db_->SetOptions( + db_->DefaultColumnFamily(), + {{"target_file_size_base", ToString(100 * opts.max_compaction_bytes)}})); + // It spans the whole key-range, thus will be included in all output files ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), GetNumericStr(0), GetNumericStr(kNumFiles * kNumPerFile - 1))); - Random rnd(301); + for (int i = 0; i < kNumFiles; ++i) { std::vector<std::string> values; // Write 1MB (256 values, each 4K) @@ -193,11 +212,11 @@ ASSERT_EQ(i + 1, NumTableFilesAtLevel(0)); } - ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr, nullptr, - true /* disallow_trivial_move */)); + ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr, + /*column_family=*/nullptr, + /*disallow_trivial_move=*/true)); ASSERT_EQ(0, NumTableFilesAtLevel(0)); ASSERT_GE(NumTableFilesAtLevel(1), 2); - std::vector<std::vector<FileMetaData>> files; dbfull()->TEST_GetFilesMetaData(db_->DefaultColumnFamily(), &files); @@ -1628,6 +1647,7 @@ TEST_F(DBRangeDelTest, OverlappedTombstones) { const int kNumPerFile = 4, kNumFiles = 2; Options options = CurrentOptions(); options.disable_auto_compactions = true; + options.target_file_size_base = 9 * 1024; options.max_compaction_bytes = 9 * 1024; DestroyAndReopen(options); Random rnd(301); @@ -1667,6 +1687,7 @@ TEST_F(DBRangeDelTest, OverlappedKeys) { const int kNumPerFile = 4, kNumFiles = 2; Options options = CurrentOptions(); options.disable_auto_compactions = true; + options.target_file_size_base = 9 * 1024; options.max_compaction_bytes = 9 * 1024; DestroyAndReopen(options); Random rnd(301); diff --git a/include/rocksdb/universal_compaction.h b/include/rocksdb/universal_compaction.h index 9a09d8f4e..2ac0ef1ed 100644 --- a/include/rocksdb/universal_compaction.h +++ b/include/rocksdb/universal_compaction.h @@ -74,6 +74,13 @@ class CompactionOptionsUniversal { // Default: false bool allow_trivial_move; + // EXPERIMENTAL + // If true, try to limit compaction size under max_compaction_bytes. + // This might cause higher write amplification, but can prevent some + // problems caused by large compactions.
+ // Default: false + bool incremental; + // Default set of parameters CompactionOptionsUniversal() : size_ratio(1), @@ -82,7 +89,8 @@ class CompactionOptionsUniversal { max_size_amplification_percent(200), compression_size_percent(-1), stop_style(kCompactionStopStyleTotalSize), - allow_trivial_move(false) {} + allow_trivial_move(false), + incremental(false) {} }; } // namespace ROCKSDB_NAMESPACE diff --git a/options/cf_options.cc b/options/cf_options.cc index 5767e759c..57c965cc3 100644 --- a/options/cf_options.cc +++ b/options/cf_options.cc @@ -213,6 +213,10 @@ static std::unordered_map<std::string, OptionTypeInfo> {offsetof(class CompactionOptionsUniversal, stop_style), OptionType::kCompactionStopStyle, OptionVerificationType::kNormal, OptionTypeFlags::kMutable}}, + {"incremental", + {offsetof(class CompactionOptionsUniversal, incremental), + OptionType::kBoolean, OptionVerificationType::kNormal, + OptionTypeFlags::kMutable}}, {"allow_trivial_move", {offsetof(class CompactionOptionsUniversal, allow_trivial_move), OptionType::kBoolean, OptionVerificationType::kNormal, @@ -1027,6 +1031,8 @@ void MutableCFOptions::Dump(Logger* log) const { ROCKS_LOG_INFO( log, "compaction_options_universal.allow_trivial_move : %d", static_cast<int>(compaction_options_universal.allow_trivial_move)); + ROCKS_LOG_INFO(log, "compaction_options_universal.incremental : %d", + static_cast<int>(compaction_options_universal.incremental)); // FIFO Compaction Options ROCKS_LOG_INFO(log, "compaction_options_fifo.max_table_files_size : %" PRIu64, diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc index 790db453f..9aebf7dd7 100644 --- a/tools/db_bench_tool.cc +++ b/tools/db_bench_tool.cc @@ -532,6 +532,9 @@ DEFINE_int32(universal_compression_size_percent, -1, DEFINE_bool(universal_allow_trivial_move, false, "Allow trivial move in universal compaction."); +DEFINE_bool(universal_incremental, false, + "Enable incremental compactions in universal compaction."); + DEFINE_int64(cache_size, 8 << 20, // 8MB "Number of bytes to use as a cache of uncompressed data"); @@ -4313,6 +4316,8 @@ class Benchmark { } options.compaction_options_universal.allow_trivial_move = FLAGS_universal_allow_trivial_move; + options.compaction_options_universal.incremental = + FLAGS_universal_incremental; if (FLAGS_thread_status_per_interval > 0) { options.enable_thread_tracking = true; }
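Usage note (not part of the patch): a minimal sketch of how the new experimental option is expected to be enabled from application code, mirroring the db_bench wiring above. The DB path and size values are hypothetical; all option and API names come from the headers touched by this change.

#include <cassert>
#include "rocksdb/db.h"
#include "rocksdb/options.h"

int main() {
  rocksdb::Options options;
  options.create_if_missing = true;
  options.compaction_style = rocksdb::kCompactionStyleUniversal;
  // Opt into the EXPERIMENTAL incremental mode added by this patch.
  options.compaction_options_universal.incremental = true;
  options.compaction_options_universal.max_size_amplification_percent = 200;
  // Incremental picking aims for compactions of roughly
  // max_compaction_bytes / 2, and output files are cut at about 1.5x
  // target_file_size_base of overlap with the next level (see
  // GetMaxOverlappingBytes() above). Values below are hypothetical.
  options.max_compaction_bytes = 512ull << 20;
  options.target_file_size_base = 64ull << 20;

  rocksdb::DB* db = nullptr;
  rocksdb::Status s =
      rocksdb::DB::Open(options, "/tmp/rocksdb_incremental_demo", &db);
  assert(s.ok());
  delete db;
  return 0;
}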