Incremental Space Amp Compactions in Universal Style (#8655)

Summary:
This commit introduces incremental compaction in universal style for space amplification. This follows the first improvement mentioned in https://rocksdb.org/blog/2021/04/12/universal-improvements.html . The implementation simply picks up files of about max_compaction_bytes total size to compact and executes the compaction if the penalty is not too big. More optimizations can be done in the future, e.g. prioritizing between this compaction and other types, but for now the feature is functional and can often reduce the frequency of full compactions, although it can introduce a write amplification penalty.
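
A minimal sketch of enabling the feature (the `incremental` flag under `compaction_options_universal` comes from the diff below; all other option names are existing RocksDB options and the numeric values are purely illustrative assumptions, not recommendations):

  #include "rocksdb/db.h"
  #include "rocksdb/options.h"

  int main() {
    rocksdb::Options options;
    options.create_if_missing = true;
    options.compaction_style = rocksdb::kCompactionStyleUniversal;
    // New in this PR: allow incremental space-amp compactions.
    options.compaction_options_universal.incremental = true;
    options.compaction_options_universal.max_size_amplification_percent = 200;
    // Incremental picks aim at roughly this much input per compaction.
    options.max_compaction_bytes = 64ull << 20;  // 64MB, illustrative value
    rocksdb::DB* db = nullptr;
    rocksdb::Status s =
        rocksdb::DB::Open(options, "/tmp/universal_incremental_demo", &db);
    if (s.ok()) {
      delete db;
    }
    return 0;
  }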

In order to cut output files more efficiently so that more files from upper levels can be included, the SST file cutting threshold (for the current file + overlapping parent level files) is set to 1.5X of the target file size. A 2MB target file size will generate files like this: https://gist.github.com/siying/29d2676fba417404f3c95e6c013c7de8 The number of files indeed increases, but it is not out of control.
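
For concreteness, the 1.5X threshold is computed the same way as the GetMaxOverlappingBytes() helper added later in this diff; a tiny arithmetic sketch (the 2MB value just mirrors the example above):

  #include <cstdint>
  #include <iostream>

  int main() {
    // Illustrative value only; matches the 2MB example above.
    uint64_t target_file_size_base = 2ull << 20;  // 2MB
    // 1.5X of the target file size, written as /2*3 to stay in integers,
    // mirroring GetMaxOverlappingBytes() in the diff below.
    uint64_t cutting_threshold = target_file_size_base / 2 * 3;
    std::cout << cutting_threshold << " bytes\n";  // 3145728, i.e. 3MB
    return 0;
  }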

Two sets of write benchmarks were run:
1. For the rate-limited ingestion scenario, full compaction is mostly eliminated: https://gist.github.com/siying/959bc1186066906831cf4c808d6e0a19 . The write amp increased from 7.7 to 9.4, as expected. After applying file cutting, the number improves to 8.9. In another benchmark, the write amp is even better with the incremental approach: https://gist.github.com/siying/d1c16c286d7c59c4d7bba718ca198163
2. For the unlimited ingestion rate scenario, incremental compaction turns out to be too expensive most of the time and is not executed, as expected.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8655

Test Plan: Added unit tests for the functionality.

Reviewed By: ajkr

Differential Revision: D31787034

fbshipit-source-id: ce813e63b15a61d5a56e97bf8902a1b28e011beb
Branch: main
Author: sdong, committed by Facebook GitHub Bot
Parent: 6d93b87588
Commit: c66b4429ff
Changed files (lines changed):
1. HISTORY.md (1)
2. db/compaction/compaction_job.cc (4)
3. db/compaction/compaction_picker_test.cc (215)
4. db/compaction/compaction_picker_universal.cc (287)
5. db/db_compaction_test.cc (5)
6. db/db_range_del_test.cc (31)
7. include/rocksdb/universal_compaction.h (10)
8. options/cf_options.cc (6)
9. tools/db_bench_tool.cc (5)

HISTORY.md
@@ -27,6 +27,7 @@
* Add `file_temperature` to `IngestExternalFileArg` such that when ingesting SST files, we are able to indicate the temperature of this batch of files.
* If `DB::Close()` failed with a non aborted status, calling `DB::Close()` again will return the original status instead of Status::OK.
* Add CacheTier to advanced_options.h to describe the cache tier we used. Add a `lowest_used_cache_tier` option to `DBOptions` (immutable) and pass it to BlockBasedTableReader. By default it is `CacheTier::kNonVolatileBlockTier`, which means we always use both block cache (kVolatileTier) and secondary cache (kNonVolatileBlockTier). By setting it to `CacheTier::kVolatileTier`, the DB will not use the secondary cache.
* Even when options.max_compaction_bytes is hit, compaction output files are only cut when doing so aligns with grandparent files' boundaries. options.max_compaction_bytes could be slightly violated with the change, but the violation is no more than one target SST file size, which is usually much smaller.
### Performance Improvements
* Improved CPU efficiency of building block-based table (SST) files (#9039 and #9040).
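
As a worked illustration of the bound stated in the new HISTORY entry above (the numbers are made up, not from the benchmarks in this PR):

  #include <cstdint>
  #include <iostream>

  int main() {
    // Illustrative values only.
    uint64_t max_compaction_bytes = 100ull << 20;  // 100MB
    uint64_t target_file_size = 8ull << 20;        // 8MB
    // Worst case per the entry above: the limit may be exceeded by at most
    // one target SST file size before the output is cut at a grandparent
    // boundary.
    uint64_t worst_case = max_compaction_bytes + target_file_size;
    std::cout << worst_case << " bytes\n";  // 113246208, i.e. ~108MB
    return 0;
  }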

db/compaction/compaction_job.cc
@@ -219,6 +219,7 @@ struct CompactionJob::SubcompactionState {
&compaction->column_family_data()->internal_comparator();
const std::vector<FileMetaData*>& grandparents = compaction->grandparents();
bool grandparant_file_switched = false;
// Scan to find earliest grandparent file that contains key.
while (grandparent_index < grandparents.size() &&
icmp->Compare(internal_key,
@@ -226,6 +227,7 @@ struct CompactionJob::SubcompactionState {
0) {
if (seen_key) {
overlapped_bytes += grandparents[grandparent_index]->fd.GetFileSize();
grandparant_file_switched = true;
}
assert(grandparent_index + 1 >= grandparents.size() ||
icmp->Compare(
@@ -235,7 +237,7 @@ struct CompactionJob::SubcompactionState {
}
seen_key = true;
if (grandparant_file_switched && overlapped_bytes + curr_file_size >
compaction->max_compaction_bytes()) {
// Too much overlap for current output; start new output
overlapped_bytes = 0;
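
Read on its own, the hunk above changes the cut condition so that hitting max_compaction_bytes only forces a new output file once the output has actually crossed at least one grandparent file boundary. A standalone sketch of that decision, with a hypothetical helper name (ShouldCutOutput is not part of RocksDB):

  #include <cstdint>

  // Hypothetical helper mirroring the condition in the hunk above: cut only
  // when a grandparent file boundary has been crossed since the last cut and
  // the accumulated grandparent overlap plus the current output size exceeds
  // the per-compaction limit.
  bool ShouldCutOutput(bool grandparent_file_switched, uint64_t overlapped_bytes,
                       uint64_t curr_file_size, uint64_t max_compaction_bytes) {
    return grandparent_file_switched &&
           overlapped_bytes + curr_file_size > max_compaction_bytes;
  }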

db/compaction/compaction_picker_test.cc
@@ -733,6 +733,221 @@ TEST_F(CompactionPickerTest, UniversalPeriodicCompaction6) {
ASSERT_EQ(4, compaction->output_level());
}
TEST_F(CompactionPickerTest, UniversalIncrementalSpace1) {
const uint64_t kFileSize = 100000;
mutable_cf_options_.max_compaction_bytes = 555555;
mutable_cf_options_.compaction_options_universal.incremental = true;
mutable_cf_options_.compaction_options_universal
.max_size_amplification_percent = 30;
UniversalCompactionPicker universal_compaction_picker(ioptions_, &icmp_);
NewVersionStorage(5, kCompactionStyleUniversal);
Add(0, 1U, "150", "200", kFileSize, 0, 500, 550);
Add(2, 2U, "010", "080", kFileSize, 0, 200, 251);
Add(3, 5U, "310", "380", kFileSize, 0, 200, 251);
Add(3, 6U, "410", "880", kFileSize, 0, 200, 251);
Add(3, 7U, "910", "980", 1, 0, 200, 251);
Add(4, 10U, "201", "250", kFileSize, 0, 101, 150);
Add(4, 11U, "301", "350", kFileSize, 0, 101, 150);
Add(4, 12U, "401", "450", kFileSize, 0, 101, 150);
Add(4, 13U, "501", "750", kFileSize, 0, 101, 150);
Add(4, 14U, "801", "850", kFileSize, 0, 101, 150);
Add(4, 15U, "901", "950", kFileSize, 0, 101, 150);
// Add(4, 15U, "960", "970", kFileSize, 0, 101, 150);
UpdateVersionStorageInfo();
std::unique_ptr<Compaction> compaction(
universal_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
&log_buffer_));
ASSERT_TRUE(compaction);
ASSERT_EQ(4, compaction->output_level());
ASSERT_EQ(3, compaction->start_level());
ASSERT_EQ(2U, compaction->num_input_files(0));
ASSERT_EQ(5U, compaction->input(0, 0)->fd.GetNumber());
ASSERT_EQ(6U, compaction->input(0, 1)->fd.GetNumber());
// ASSERT_EQ(4U, compaction->num_input_files(1));
ASSERT_EQ(11U, compaction->input(1, 0)->fd.GetNumber());
ASSERT_EQ(12U, compaction->input(1, 1)->fd.GetNumber());
ASSERT_EQ(13U, compaction->input(1, 2)->fd.GetNumber());
ASSERT_EQ(14U, compaction->input(1, 3)->fd.GetNumber());
}
TEST_F(CompactionPickerTest, UniversalIncrementalSpace2) {
const uint64_t kFileSize = 100000;
mutable_cf_options_.max_compaction_bytes = 400000;
mutable_cf_options_.compaction_options_universal.incremental = true;
mutable_cf_options_.compaction_options_universal
.max_size_amplification_percent = 30;
UniversalCompactionPicker universal_compaction_picker(ioptions_, &icmp_);
NewVersionStorage(5, kCompactionStyleUniversal);
Add(0, 1U, "150", "200", kFileSize, 0, 500, 550);
Add(1, 2U, "010", "080", kFileSize, 0, 200, 251);
Add(2, 5U, "310", "380", kFileSize, 0, 200, 251);
Add(2, 6U, "410", "880", kFileSize, 0, 200, 251);
Add(2, 7U, "910", "980", kFileSize, 0, 200, 251);
Add(4, 10U, "201", "250", kFileSize, 0, 101, 150);
Add(4, 11U, "301", "350", kFileSize, 0, 101, 150);
Add(4, 12U, "401", "450", kFileSize, 0, 101, 150);
Add(4, 13U, "501", "750", kFileSize, 0, 101, 150);
Add(4, 14U, "801", "850", kFileSize, 0, 101, 150);
Add(4, 15U, "901", "950", kFileSize, 0, 101, 150);
UpdateVersionStorageInfo();
std::unique_ptr<Compaction> compaction(
universal_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
&log_buffer_));
ASSERT_TRUE(compaction);
ASSERT_EQ(4, compaction->output_level());
ASSERT_EQ(2, compaction->start_level());
ASSERT_EQ(1U, compaction->num_input_files(0));
ASSERT_EQ(7U, compaction->input(0, 0)->fd.GetNumber());
ASSERT_EQ(1U, compaction->num_input_files(1));
ASSERT_EQ(15U, compaction->input(1, 0)->fd.GetNumber());
}
TEST_F(CompactionPickerTest, UniversalIncrementalSpace3) {
// Test bottom level files falling into the gap between two upper level
// files
const uint64_t kFileSize = 100000;
mutable_cf_options_.max_compaction_bytes = 300000;
mutable_cf_options_.compaction_options_universal.incremental = true;
mutable_cf_options_.compaction_options_universal
.max_size_amplification_percent = 30;
UniversalCompactionPicker universal_compaction_picker(ioptions_, &icmp_);
NewVersionStorage(5, kCompactionStyleUniversal);
Add(0, 1U, "150", "200", kFileSize, 0, 500, 550);
Add(2, 2U, "010", "080", kFileSize, 0, 200, 251);
Add(3, 5U, "000", "180", kFileSize, 0, 200, 251);
Add(3, 6U, "181", "190", kFileSize, 0, 200, 251);
Add(3, 7U, "710", "810", kFileSize, 0, 200, 251);
Add(3, 8U, "820", "830", kFileSize, 0, 200, 251);
Add(3, 9U, "900", "991", kFileSize, 0, 200, 251);
Add(4, 10U, "201", "250", kFileSize, 0, 101, 150);
Add(4, 11U, "301", "350", kFileSize, 0, 101, 150);
Add(4, 12U, "401", "450", kFileSize, 0, 101, 150);
Add(4, 13U, "501", "750", kFileSize, 0, 101, 150);
Add(4, 14U, "801", "850", kFileSize, 0, 101, 150);
Add(4, 15U, "901", "950", kFileSize, 0, 101, 150);
UpdateVersionStorageInfo();
std::unique_ptr<Compaction> compaction(
universal_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
&log_buffer_));
ASSERT_TRUE(compaction);
ASSERT_EQ(4, compaction->output_level());
ASSERT_EQ(2, compaction->start_level());
ASSERT_EQ(1U, compaction->num_input_files(0));
ASSERT_EQ(2U, compaction->input(0, 0)->fd.GetNumber());
ASSERT_EQ(2U, compaction->num_input_files(1));
ASSERT_EQ(5U, compaction->input(1, 0)->fd.GetNumber());
ASSERT_EQ(6U, compaction->input(1, 1)->fd.GetNumber());
ASSERT_EQ(0, compaction->num_input_files(2));
}
TEST_F(CompactionPickerTest, UniversalIncrementalSpace4) {
// Test compaction candidates always cover many files.
const uint64_t kFileSize = 100000;
mutable_cf_options_.max_compaction_bytes = 3200000;
mutable_cf_options_.compaction_options_universal.incremental = true;
mutable_cf_options_.compaction_options_universal
.max_size_amplification_percent = 30;
UniversalCompactionPicker universal_compaction_picker(ioptions_, &icmp_);
NewVersionStorage(5, kCompactionStyleUniversal);
Add(0, 1U, "150", "200", kFileSize, 0, 500, 550);
Add(2, 2U, "010", "080", kFileSize, 0, 200, 251);
// Generate files like following:
// L3: (1101, 1180) (1201, 1280) ... (7901, 7908)
// L4: (1130, 1150) (1160, 1210) (1230, 1250) (1260 1310) ... (7960, 8010)
for (int i = 11; i < 79; i++) {
Add(3, 100 + i * 3, ToString(i * 100).c_str(),
ToString(i * 100 + 80).c_str(), kFileSize, 0, 200, 251);
// Add a tie breaker
if (i == 66) {
Add(3, 10000U, "6690", "6699", kFileSize, 0, 200, 251);
}
Add(4, 100 + i * 3 + 1, ToString(i * 100 + 30).c_str(),
ToString(i * 100 + 50).c_str(), kFileSize, 0, 200, 251);
Add(4, 100 + i * 3 + 2, ToString(i * 100 + 60).c_str(),
ToString(i * 100 + 110).c_str(), kFileSize, 0, 200, 251);
}
UpdateVersionStorageInfo();
std::unique_ptr<Compaction> compaction(
universal_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
&log_buffer_));
ASSERT_TRUE(compaction);
ASSERT_EQ(4, compaction->output_level());
ASSERT_EQ(3, compaction->start_level());
ASSERT_EQ(6U, compaction->num_input_files(0));
ASSERT_EQ(100 + 62U * 3, compaction->input(0, 0)->fd.GetNumber());
ASSERT_EQ(10000U, compaction->input(0, 5)->fd.GetNumber());
ASSERT_EQ(11, compaction->num_input_files(1));
}
TEST_F(CompactionPickerTest, UniversalIncrementalSpace5) {
// Test compaction candidates always cover many files with some single
// files larger than size threshold.
const uint64_t kFileSize = 100000;
mutable_cf_options_.max_compaction_bytes = 3200000;
mutable_cf_options_.compaction_options_universal.incremental = true;
mutable_cf_options_.compaction_options_universal
.max_size_amplification_percent = 30;
UniversalCompactionPicker universal_compaction_picker(ioptions_, &icmp_);
NewVersionStorage(5, kCompactionStyleUniversal);
Add(0, 1U, "150", "200", kFileSize, 0, 500, 550);
Add(2, 2U, "010", "080", kFileSize, 0, 200, 251);
// Generate files like following:
// L3: (1101, 1180) (1201, 1280) ... (7901, 7908)
// L4: (1130, 1150) (1160, 1210) (1230, 1250) (1260 1310) ... (7960, 8010)
for (int i = 11; i < 70; i++) {
Add(3, 100 + i * 3, ToString(i * 100).c_str(),
ToString(i * 100 + 80).c_str(),
i % 10 == 9 ? kFileSize * 100 : kFileSize, 0, 200, 251);
Add(4, 100 + i * 3 + 1, ToString(i * 100 + 30).c_str(),
ToString(i * 100 + 50).c_str(), kFileSize, 0, 200, 251);
Add(4, 100 + i * 3 + 2, ToString(i * 100 + 60).c_str(),
ToString(i * 100 + 110).c_str(), kFileSize, 0, 200, 251);
}
UpdateVersionStorageInfo();
std::unique_ptr<Compaction> compaction(
universal_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, mutable_db_options_, vstorage_.get(),
&log_buffer_));
ASSERT_TRUE(compaction);
ASSERT_EQ(4, compaction->output_level());
ASSERT_EQ(3, compaction->start_level());
ASSERT_EQ(6U, compaction->num_input_files(0));
ASSERT_EQ(100 + 14 * 3, compaction->input(0, 0)->fd.GetNumber());
ASSERT_EQ(100 + 19 * 3, compaction->input(0, 5)->fd.GetNumber());
ASSERT_EQ(13, compaction->num_input_files(1));
}
TEST_F(CompactionPickerTest, NeedsCompactionFIFO) {
NewVersionStorage(1, kCompactionStyleFIFO);
const int kFileCount =

db/compaction/compaction_picker_universal.cc
@@ -89,6 +89,14 @@ class UniversalCompactionBuilder {
// Pick Universal compaction to limit space amplification.
Compaction* PickCompactionToReduceSizeAmp();
// Try to pick incremental compaction to reduce space amplification.
// It will return null if it cannot find a fanout within the threshold.
// Fanout is defined as
// total size of files to compact at output level
// --------------------------------------------------
// total size of files to compact at other levels
Compaction* PickIncrementalForReduceSizeAmp(double fanout_threshold);
Compaction* PickDeleteTriggeredCompaction();
// Form a compaction from the sorted run indicated by start_index to the
@@ -110,6 +118,8 @@
// overlapping.
bool IsInputFilesNonOverlapping(Compaction* c);
uint64_t GetMaxOverlappingBytes() const;
const ImmutableOptions& ioptions_;
const InternalKeyComparator* icmp_;
double score_;
@@ -714,6 +724,19 @@ Compaction* UniversalCompactionBuilder::PickCompactionToReduceSortedRuns(
cf_name_.c_str(), file_num_buf);
}
std::vector<FileMetaData*> grandparents;
// Include grandparents for potential file cutting in incremental
// mode. It is for aligning file cutting boundaries across levels,
// so that subsequent compactions can pick files with aligned
// buffer.
// Single files are only picked up in incremental mode, so that
// there is no need for full range.
if (mutable_cf_options_.compaction_options_universal.incremental &&
first_index_after < sorted_runs_.size() &&
sorted_runs_[first_index_after].level > 1) {
grandparents = vstorage_->LevelFiles(sorted_runs_[first_index_after].level);
}
CompactionReason compaction_reason;
if (max_number_of_files_to_compact == UINT_MAX) {
compaction_reason = CompactionReason::kUniversalSizeRatio;
@@ -725,14 +748,14 @@ Compaction* UniversalCompactionBuilder::PickCompactionToReduceSortedRuns(
std::move(inputs), output_level,
MaxFileSizeForLevel(mutable_cf_options_, output_level,
kCompactionStyleUniversal),
GetMaxOverlappingBytes(), path_id,
GetCompressionType(ioptions_, vstorage_, mutable_cf_options_, start_level,
1, enable_compression),
GetCompressionOptions(mutable_cf_options_, vstorage_, start_level,
enable_compression),
Temperature::kUnknown,
/* max_subcompactions */ 0, grandparents, /* is manual */ false, score_,
false /* deletion_compaction */, compaction_reason);
}
// Look at overall size amplification. If size amplification
@@ -788,6 +811,8 @@ Compaction* UniversalCompactionBuilder::PickCompactionToReduceSizeAmp() {
for (size_t loop = start_index; loop + 1 < sorted_runs_.size(); loop++) {
sr = &sorted_runs_[loop];
if (sr->being_compacted) {
// TODO: with incremental compaction supported, we might want to
// schedule some incremental compactions in parallel if needed.
char file_num_buf[kFormatFileNumberBufSize];
sr->Dump(file_num_buf, sizeof(file_num_buf), true);
ROCKS_LOG_BUFFER(
@@ -821,16 +846,250 @@ Compaction* UniversalCompactionBuilder::PickCompactionToReduceSizeAmp() {
" earliest-file-size %" PRIu64,
cf_name_.c_str(), candidate_size, earliest_file_size);
}
// Since incremental compaction can't include more than the second last
// level, it can introduce a penalty compared to full compaction. We
// hard code the penalty to be 80%: if the projected compaction fanout
// is more than 80% higher than that of a full level compaction, we
// fall back to full level compaction.
// The 80% threshold is arbitrary and can be adjusted or made
// configurable in the future.
// This also prevents the case when compaction falls behind and we
// need to compact more levels for compactions to catch up.
if (mutable_cf_options_.compaction_options_universal.incremental) {
double fanout_threshold = static_cast<double>(earliest_file_size) /
static_cast<double>(candidate_size) * 1.8;
Compaction* picked = PickIncrementalForReduceSizeAmp(fanout_threshold);
if (picked != nullptr) {
// Picking an incremental compaction might fail, in which case we
// fall back to compacting the full level below.
return picked;
}
}
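// (Worked example, not part of the original change: if the bottommost sorted
// run holds 900MB and the other candidate runs total 300MB, a full
// compaction's fanout is 900/300 = 3.0, so fanout_threshold = 3.0 * 1.8 = 5.4;
// an incremental pick is used only if its own fanout stays below 5.4.)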
return PickCompactionToOldest(start_index,
CompactionReason::kUniversalSizeAmplification);
}
Compaction* UniversalCompactionBuilder::PickIncrementalForReduceSizeAmp(
double fanout_threshold) {
// Try to find all potential compactions with total size just over
// options.max_compaction_bytes / 2, and take the one with the lowest
// fanout (as defined in the declaration of this function).
// This is done by keeping a sliding window of the files at the second
// last level, and expanding it while finding overlapping files in the
// last level. Once the total size exceeds the size threshold, calculate
// the fanout value, and then shrink the window from its start
// (smaller-key) side. Keep doing this until the end is reached.
// Finally, we try to include upper level files if they fall into
// the range.
//
// Note that this is a similar problem to leveled compaction's
// kMinOverlappingRatio priority, but instead of picking single files
// we expand to a target compaction size. The reason is that in
// leveled compaction, the actual fanout tends to be high, e.g. 10, so
// even with a single file in the level being merged down, the extra
// size compacted in boundary files is at a lower ratio. But here users
// often size the second last level at 1/4, 1/3 or even 1/2 of the
// bottommost level, so picking a single file in the second last level
// would cause significant waste, which is not desirable.
//
// This algorithm has lots of room to improve to pick more efficient
// compactions.
assert(sorted_runs_.size() >= 2);
int second_last_level = sorted_runs_[sorted_runs_.size() - 2].level;
if (second_last_level == 0) {
// Can't split Level 0.
return nullptr;
}
int output_level = sorted_runs_.back().level;
const std::vector<FileMetaData*>& bottom_files =
vstorage_->LevelFiles(output_level);
const std::vector<FileMetaData*>& files =
vstorage_->LevelFiles(second_last_level);
assert(!bottom_files.empty());
assert(!files.empty());
// std::unordered_map<uint64_t, uint64_t> file_to_order;
int picked_start_idx = 0;
int picked_end_idx = 0;
double picked_fanout = fanout_threshold;
// Use half of max_compaction_bytes as the anchor to stop growing the set
// of second last level files, and reserve the remaining space for more
// overlapping bottom level files, clean cuts, files from other levels, etc.
uint64_t comp_thres_size = mutable_cf_options_.max_compaction_bytes / 2;
int start_idx = 0;
int bottom_end_idx = 0;
int bottom_start_idx = 0;
uint64_t non_bottom_size = 0;
uint64_t bottom_size = 0;
bool end_bottom_size_counted = false;
for (int end_idx = 0; end_idx < static_cast<int>(files.size()); end_idx++) {
FileMetaData* end_file = files[end_idx];
// Include bottom most level files smaller than the current second
// last level file.
int num_skipped = 0;
while (bottom_end_idx < static_cast<int>(bottom_files.size()) &&
icmp_->Compare(bottom_files[bottom_end_idx]->largest,
end_file->smallest) < 0) {
if (!end_bottom_size_counted) {
bottom_size += bottom_files[bottom_end_idx]->fd.file_size;
}
bottom_end_idx++;
end_bottom_size_counted = false;
num_skipped++;
}
if (num_skipped > 1) {
// At least a file in the bottom most level falls into the file gap. No
// reason to include the file. We cut the range and start a new sliding
// window.
start_idx = end_idx;
}
if (start_idx == end_idx) {
// new sliding window.
non_bottom_size = 0;
bottom_size = 0;
bottom_start_idx = bottom_end_idx;
end_bottom_size_counted = false;
}
non_bottom_size += end_file->fd.file_size;
// Include all overlapping files in bottom level.
while (bottom_end_idx < static_cast<int>(bottom_files.size()) &&
icmp_->Compare(bottom_files[bottom_end_idx]->smallest,
end_file->largest) < 0) {
if (!end_bottom_size_counted) {
bottom_size += bottom_files[bottom_end_idx]->fd.file_size;
end_bottom_size_counted = true;
}
if (icmp_->Compare(bottom_files[bottom_end_idx]->largest,
end_file->largest) > 0) {
// Next level file crosses the largest-key boundary of the current file.
break;
}
bottom_end_idx++;
end_bottom_size_counted = false;
}
if ((non_bottom_size + bottom_size > comp_thres_size ||
end_idx == static_cast<int>(files.size()) - 1) &&
non_bottom_size > 0) { // Do we allow 0 size files at all?
// If it is a better compaction, remember it in picked* variables.
double fanout = static_cast<double>(bottom_size) /
static_cast<double>(non_bottom_size);
if (fanout < picked_fanout) {
picked_start_idx = start_idx;
picked_end_idx = end_idx;
picked_fanout = fanout;
}
// Shrink from the start end to under comp_thres_size
while (non_bottom_size + bottom_size > comp_thres_size &&
start_idx <= end_idx) {
non_bottom_size -= files[start_idx]->fd.file_size;
start_idx++;
if (start_idx < static_cast<int>(files.size())) {
while (bottom_start_idx <= bottom_end_idx &&
icmp_->Compare(bottom_files[bottom_start_idx]->largest,
files[start_idx]->smallest) < 0) {
bottom_size -= bottom_files[bottom_start_idx]->fd.file_size;
bottom_start_idx++;
}
}
}
}
}
if (picked_fanout >= fanout_threshold) {
assert(picked_fanout == fanout_threshold);
return nullptr;
}
std::vector<CompactionInputFiles> inputs;
CompactionInputFiles bottom_level_inputs;
CompactionInputFiles second_last_level_inputs;
second_last_level_inputs.level = second_last_level;
bottom_level_inputs.level = output_level;
for (int i = picked_start_idx; i <= picked_end_idx; i++) {
if (files[i]->being_compacted) {
return nullptr;
}
second_last_level_inputs.files.push_back(files[i]);
}
assert(!second_last_level_inputs.empty());
if (!picker_->ExpandInputsToCleanCut(cf_name_, vstorage_,
&second_last_level_inputs,
/*next_smallest=*/nullptr)) {
return nullptr;
}
// We might be able to avoid this binary search if we save and expand
// from bottom_start_idx and bottom_end_idx, but for now, we use
// SetupOtherInputs() for simplicity.
int parent_index = -1; // Create and use bottom_start_idx?
if (!picker_->SetupOtherInputs(cf_name_, mutable_cf_options_, vstorage_,
&second_last_level_inputs,
&bottom_level_inputs, &parent_index,
/*base_index=*/-1)) {
return nullptr;
}
// Try to include files in upper levels if they fall into the range.
// Since we need to go from lower level up and this is in the reverse
// order, compared to level order, we first write to a reversed
// data structure and finally copy them to compaction inputs.
InternalKey smallest, largest;
picker_->GetRange(second_last_level_inputs, &smallest, &largest);
std::vector<CompactionInputFiles> inputs_reverse;
for (auto it = ++(++sorted_runs_.rbegin()); it != sorted_runs_.rend(); it++) {
SortedRun& sr = *it;
if (sr.level == 0) {
break;
}
std::vector<FileMetaData*> level_inputs;
vstorage_->GetCleanInputsWithinInterval(sr.level, &smallest, &largest,
&level_inputs);
if (!level_inputs.empty()) {
inputs_reverse.push_back({});
inputs_reverse.back().level = sr.level;
inputs_reverse.back().files = level_inputs;
picker_->GetRange(inputs_reverse.back(), &smallest, &largest);
}
}
for (auto it = inputs_reverse.rbegin(); it != inputs_reverse.rend(); it++) {
inputs.push_back(*it);
}
inputs.push_back(second_last_level_inputs);
inputs.push_back(bottom_level_inputs);
// TODO support multi paths?
uint32_t path_id = 0;
return new Compaction(
vstorage_, ioptions_, mutable_cf_options_, mutable_db_options_,
std::move(inputs), output_level,
MaxFileSizeForLevel(mutable_cf_options_, output_level,
kCompactionStyleUniversal),
GetMaxOverlappingBytes(), path_id,
GetCompressionType(ioptions_, vstorage_, mutable_cf_options_,
output_level, 1, true /* enable_compression */),
GetCompressionOptions(mutable_cf_options_, vstorage_, output_level,
true /* enable_compression */),
Temperature::kUnknown,
/* max_subcompactions */ 0, /* grandparents */ {}, /* is manual */ false,
score_, false /* deletion_compaction */,
CompactionReason::kUniversalSizeAmplification);
}
// Pick files marked for compaction. Typically, files are marked by
// CompactOnDeleteCollector due to the presence of tombstones.
Compaction* UniversalCompactionBuilder::PickDeleteTriggeredCompaction() {
CompactionInputFiles start_level_inputs;
int output_level;
std::vector<CompactionInputFiles> inputs;
std::vector<FileMetaData*> grandparents;
if (vstorage_->num_levels() == 1) {
// This is single level universal. Since we're basically trying to reclaim
@@ -937,6 +1196,9 @@ Compaction* UniversalCompactionBuilder::PickDeleteTriggeredCompaction() {
if (picker_->FilesRangeOverlapWithCompaction(inputs, output_level)) {
return nullptr;
}
picker_->GetGrandparents(vstorage_, start_level_inputs,
output_level_inputs, &grandparents);
} else {
inputs.push_back(start_level_inputs);
}
@@ -954,13 +1216,13 @@ Compaction* UniversalCompactionBuilder::PickDeleteTriggeredCompaction() {
std::move(inputs), output_level,
MaxFileSizeForLevel(mutable_cf_options_, output_level,
kCompactionStyleUniversal),
/* max_grandparent_overlap_bytes */ GetMaxOverlappingBytes(), path_id,
GetCompressionType(ioptions_, vstorage_, mutable_cf_options_,
output_level, 1),
GetCompressionOptions(mutable_cf_options_, vstorage_, output_level),
Temperature::kUnknown,
/* max_subcompactions */ 0, grandparents, /* is manual */ false, score_,
false /* deletion_compaction */,
CompactionReason::kFilesMarkedForCompaction);
}
@@ -1028,7 +1290,7 @@ Compaction* UniversalCompactionBuilder::PickCompactionToOldest(
std::move(inputs), output_level,
MaxFileSizeForLevel(mutable_cf_options_, output_level,
kCompactionStyleUniversal),
GetMaxOverlappingBytes(), path_id,
GetCompressionType(ioptions_, vstorage_, mutable_cf_options_,
output_level, 1, true /* enable_compression */),
GetCompressionOptions(mutable_cf_options_, vstorage_, output_level,
@@ -1103,6 +1365,17 @@ Compaction* UniversalCompactionBuilder::PickPeriodicCompaction() {
return c;
}
uint64_t UniversalCompactionBuilder::GetMaxOverlappingBytes() const {
if (!mutable_cf_options_.compaction_options_universal.incremental) {
return port::kMaxUint64;
} else {
// Try to align the cutting boundary with files at the next level, so
// that a file doesn't end up at less than 1/2 of the target size or
// overlap with two full-size files at the next level.
return mutable_cf_options_.target_file_size_base / 2 * 3;
}
}
} // namespace ROCKSDB_NAMESPACE
#endif // !ROCKSDB_LITE

db/db_compaction_test.cc
@@ -5255,6 +5255,7 @@ TEST_F(DBCompactionTest, ManualCompactionMax) {
generate_sst_func();
uint64_t total_size = (l1_avg_size * 10) + (l2_avg_size * 100);
opts.max_compaction_bytes = total_size / num_split;
opts.target_file_size_base = total_size / num_split;
Reopen(opts);
num_compactions.store(0);
ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
@@ -5262,6 +5263,7 @@ TEST_F(DBCompactionTest, ManualCompactionMax) {
// very small max_compaction_bytes, it should still move forward
opts.max_compaction_bytes = l1_avg_size / 2;
opts.target_file_size_base = l1_avg_size / 2;
DestroyAndReopen(opts);
generate_sst_func();
num_compactions.store(0);
@@ -5275,7 +5277,8 @@ TEST_F(DBCompactionTest, ManualCompactionMax) {
generate_sst_func();
total_size = (l1_avg_size * 10) + (l2_avg_size * 100);
Status s = db_->SetOptions(
{{"max_compaction_bytes", std::to_string(total_size / num_split)},
{"target_file_size_base", std::to_string(total_size / num_split)}});
ASSERT_OK(s);
num_compactions.store(0);

db/db_range_del_test.cc
@@ -169,17 +169,36 @@ TEST_F(DBRangeDelTest, MaxCompactionBytesCutsOutputFiles) {
opts.memtable_factory.reset(test::NewSpecialSkipListFactory(kNumPerFile));
// Want max_compaction_bytes to trigger the end of compaction output file, not
// target_file_size_base, so make the latter much bigger
// opts.target_file_size_base = 100 * opts.max_compaction_bytes;
opts.target_file_size_base = 1;
DestroyAndReopen(opts);
// snapshot protects range tombstone from dropping due to becoming obsolete.
const Snapshot* snapshot = db_->GetSnapshot();
Random rnd(301);
ASSERT_OK(Put(GetNumericStr(0), rnd.RandomString(kBytesPerVal)));
ASSERT_OK(
Put(GetNumericStr(kNumPerFile - 1), rnd.RandomString(kBytesPerVal)));
ASSERT_OK(Flush());
ASSERT_OK(Put(GetNumericStr(kNumPerFile), rnd.RandomString(kBytesPerVal)));
ASSERT_OK(
Put(GetNumericStr(kNumPerFile * 2 - 1), rnd.RandomString(kBytesPerVal)));
ASSERT_OK(Flush());
MoveFilesToLevel(2);
ASSERT_EQ(0, NumTableFilesAtLevel(0));
ASSERT_EQ(NumTableFilesAtLevel(2), 2);
ASSERT_OK(db_->SetOptions(
db_->DefaultColumnFamily(),
{{"target_file_size_base", ToString(100 * opts.max_compaction_bytes)}}));
// It spans the whole key-range, thus will be included in all output files
ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
GetNumericStr(0),
GetNumericStr(kNumFiles * kNumPerFile - 1)));
for (int i = 0; i < kNumFiles; ++i) {
std::vector<std::string> values;
// Write 1MB (256 values, each 4K)
@@ -193,11 +212,11 @@ TEST_F(DBRangeDelTest, MaxCompactionBytesCutsOutputFiles) {
ASSERT_EQ(i + 1, NumTableFilesAtLevel(0));
}
ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr,
/*column_family=*/nullptr,
/*disallow_trivial_move=*/true));
ASSERT_EQ(0, NumTableFilesAtLevel(0));
ASSERT_GE(NumTableFilesAtLevel(1), 2);
std::vector<std::vector<FileMetaData>> files;
dbfull()->TEST_GetFilesMetaData(db_->DefaultColumnFamily(), &files);
@@ -1628,6 +1647,7 @@ TEST_F(DBRangeDelTest, OverlappedTombstones) {
const int kNumPerFile = 4, kNumFiles = 2;
Options options = CurrentOptions();
options.disable_auto_compactions = true;
options.target_file_size_base = 9 * 1024;
options.max_compaction_bytes = 9 * 1024;
DestroyAndReopen(options);
Random rnd(301);
@@ -1667,6 +1687,7 @@ TEST_F(DBRangeDelTest, OverlappedKeys) {
const int kNumPerFile = 4, kNumFiles = 2;
Options options = CurrentOptions();
options.disable_auto_compactions = true;
options.target_file_size_base = 9 * 1024;
options.max_compaction_bytes = 9 * 1024;
DestroyAndReopen(options);
Random rnd(301);

include/rocksdb/universal_compaction.h
@@ -74,6 +74,13 @@ class CompactionOptionsUniversal {
// Default: false
bool allow_trivial_move;
// EXPERIMENTAL
// If true, try to limit compaction size under max_compaction_bytes.
// This might cause higher write amplification, but can prevent some
// problems caused by large compactions.
// Default: false
bool incremental;
// Default set of parameters
CompactionOptionsUniversal()
: size_ratio(1),
@@ -82,7 +89,8 @@ class CompactionOptionsUniversal {
max_size_amplification_percent(200),
compression_size_percent(-1),
stop_style(kCompactionStopStyleTotalSize),
allow_trivial_move(false),
incremental(false) {}
};
} // namespace ROCKSDB_NAMESPACE
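
Since the new option is registered under OptionTypeFlags::kMutable in options/cf_options.cc below, it should also be changeable on a live DB. A hedged sketch, assuming the usual nested options-string syntax for compaction_options_universal (verify the exact string form against your RocksDB version):

  #include "rocksdb/db.h"
  #include "rocksdb/options.h"

  int main() {
    rocksdb::Options options;
    options.create_if_missing = true;
    options.compaction_style = rocksdb::kCompactionStyleUniversal;
    rocksdb::DB* db = nullptr;
    rocksdb::Status s = rocksdb::DB::Open(
        options, "/tmp/universal_incremental_mutable_demo", &db);
    if (s.ok()) {
      // Flip the new flag on the live DB via the nested options string.
      s = db->SetOptions(
          db->DefaultColumnFamily(),
          {{"compaction_options_universal", "{incremental=true;}"}});
      delete db;
    }
    return s.ok() ? 0 : 1;
  }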

options/cf_options.cc
@@ -213,6 +213,10 @@ static std::unordered_map<std::string, OptionTypeInfo>
{offsetof(class CompactionOptionsUniversal, stop_style),
OptionType::kCompactionStopStyle, OptionVerificationType::kNormal,
OptionTypeFlags::kMutable}},
{"incremental",
{offsetof(class CompactionOptionsUniversal, incremental),
OptionType::kBoolean, OptionVerificationType::kNormal,
OptionTypeFlags::kMutable}},
{"allow_trivial_move", {"allow_trivial_move",
{offsetof(class CompactionOptionsUniversal, allow_trivial_move), {offsetof(class CompactionOptionsUniversal, allow_trivial_move),
OptionType::kBoolean, OptionVerificationType::kNormal, OptionType::kBoolean, OptionVerificationType::kNormal,
@@ -1027,6 +1031,8 @@ void MutableCFOptions::Dump(Logger* log) const {
ROCKS_LOG_INFO(
log, "compaction_options_universal.allow_trivial_move : %d",
static_cast<int>(compaction_options_universal.allow_trivial_move));
ROCKS_LOG_INFO(log, "compaction_options_universal.incremental : %d",
static_cast<int>(compaction_options_universal.incremental));
// FIFO Compaction Options
ROCKS_LOG_INFO(log, "compaction_options_fifo.max_table_files_size : %" PRIu64,

tools/db_bench_tool.cc
@@ -532,6 +532,9 @@ DEFINE_int32(universal_compression_size_percent, -1,
DEFINE_bool(universal_allow_trivial_move, false,
"Allow trivial move in universal compaction.");
DEFINE_bool(universal_incremental, false,
"Enable incremental compactions in universal compaction.");
DEFINE_int64(cache_size, 8 << 20, // 8MB
"Number of bytes to use as a cache of uncompressed data");
@@ -4313,6 +4316,8 @@ class Benchmark {
}
options.compaction_options_universal.allow_trivial_move =
FLAGS_universal_allow_trivial_move;
options.compaction_options_universal.incremental =
FLAGS_universal_incremental;
if (FLAGS_thread_status_per_interval > 0) {
options.enable_thread_tracking = true;
}
