Align compaction output file boundaries to the next level ones (#10655)

Summary:
Try to align the compaction output file boundaries to the next level ones
(grandparent level), to reduce the level compaction write-amplification.

In level compaction, there is "wasted" data at the beginning and end of each
output-level file. Aligning the file boundaries can avoid such "wasted"
compaction. With this PR, compaction tries to align the non-bottommost level
file boundaries to the next level's boundaries. It may cut a file early once
it is large enough (at least 50% of target_file_size), but never lets it grow
too large (at most 2x target_file_size).
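
As a rough sketch of the rule (hypothetical names, not the actual RocksDB
internals; the real logic lives in `CompactionOutputs::ShouldStopBefore()` in
the diff below):
```
// Illustrative sketch only; all names here are made up.
bool ShouldCutOutputFile(uint64_t current_size, uint64_t target_file_size,
                         bool at_grandparent_boundary) {
  // Hard cap: an output file may never exceed 2x the target size.
  if (current_size >= 2 * target_file_size) {
    return true;
  }
  // Opportunistic cut: once the file is large enough (at least 50% of the
  // target), cut at a grandparent (next level) file boundary so the output
  // aligns with the next level.
  return at_grandparent_boundary && current_size >= target_file_size / 2;
}
```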

db_bench shows about a 12.56% reduction in compaction writes:
```
TEST_TMPDIR=/data/dbbench2 ./db_bench --benchmarks=fillrandom,readrandom -max_background_jobs=12 -num=400000000 -target_file_size_base=33554432

# baseline:
Flush(GB): cumulative 25.882, interval 7.216
Cumulative compaction: 285.90 GB write, 162.36 MB/s write, 269.68 GB read, 153.15 MB/s read, 2926.7 seconds

# with this change:
Flush(GB): cumulative 25.882, interval 7.753
Cumulative compaction: 249.97 GB write, 141.96 MB/s write, 233.74 GB read, 132.74 MB/s read, 2534.9 seconds
```

The compaction simulator shows a similar result (14% with 100G random data).
As a side effect, with this PR the SST file size can exceed target_file_size
(capped at 2x target_file_size), and there will also be some smaller files.
Here are file size statistics when loading 100GB with a target file size of
32MB:
```
          baseline      this_PR
count  1.656000e+03  1.705000e+03
mean   3.116062e+07  3.028076e+07
std    7.145242e+06  8.046139e+06
```

The feature is enabled by default; to revert to the old behavior, disable it
with `AdvancedColumnFamilyOptions.level_compaction_dynamic_file_size = false`.
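
For example, a minimal snippet that disables the feature through the C++ API
(the option is on `AdvancedColumnFamilyOptions`, which `Options` inherits; the
DB path is arbitrary):
```
#include <cassert>

#include "rocksdb/db.h"
#include "rocksdb/options.h"

int main() {
  rocksdb::Options options;
  options.create_if_missing = true;
  // Revert to the old behavior: cut output files at target_file_size
  // instead of aligning them to grandparent file boundaries.
  options.level_compaction_dynamic_file_size = false;

  rocksdb::DB* db = nullptr;
  rocksdb::Status s = rocksdb::DB::Open(options, "/tmp/testdb", &db);
  assert(s.ok());
  delete db;
  return 0;
}
```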

Also includes https://github.com/facebook/rocksdb/issues/1963 to cut a file
before a skippable grandparent file. This targets use cases like a user adding
2 or more non-overlapping data ranges at the same time; it can reduce the
overlap of the 2 datasets in the lower levels.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/10655

Reviewed By: cbi42

Differential Revision: D39552321

Pulled By: jay-zhuang

fbshipit-source-id: 640d15f159ab0cd973f2426cfc3af266fc8bdde2
Files changed (number of changed lines in parentheses):
  1. HISTORY.md (3)
  2. db/compaction/compaction.cc (10)
  3. db/compaction/compaction.h (4)
  4. db/compaction/compaction_job_test.cc (267)
  5. db/compaction/compaction_outputs.cc (172)
  6. db/compaction/compaction_outputs.h (19)
  7. db/db_range_del_test.cc (22)
  8. db/db_test2.cc (12)
  9. include/rocksdb/advanced_options.h (9)
  10. options/cf_options.cc (7)
  11. options/cf_options.h (2)
  12. options/options_helper.cc (2)
  13. options/options_settable_test.cc (1)
  14. options/options_test.cc (2)
  15. table/mock_table.cc (42)
  16. table/mock_table.h (6)

@@ -6,6 +6,9 @@
* Fixed an optimistic transaction validation bug caused by DBImpl::GetLatestSequenceForKey() returning non-latest seq for merge (#10724).
* Fixed a bug in iterator refresh which could segfault for DeleteRange users (#10739).

### Performance Improvements
* Try to align the compaction output file boundaries to the next level ones, which can reduce more than 10% compaction load for the default level compaction. The feature is enabled by default; to disable it, set `AdvancedColumnFamilyOptions.level_compaction_dynamic_file_size` to false. As a side effect, it can create SSTs larger than target_file_size (capped at 2x target_file_size) or smaller files.

## 7.7.0 (09/18/2022)
### Bug Fixes
* Fixed a hang when an operation such as `GetLiveFiles` or `CreateNewBackup` is asked to trigger and wait for memtable flush on a read-only DB. Such indirect requests for memtable flush are now ignored on a read-only DB.

@@ -221,7 +221,7 @@ Compaction::Compaction(
    : input_vstorage_(vstorage),
      start_level_(_inputs[0].level),
      output_level_(_output_level),
      target_output_file_size_(_target_file_size),
      max_compaction_bytes_(_max_compaction_bytes),
      max_subcompactions_(_max_subcompactions),
      immutable_options_(_immutable_options),
@@ -268,6 +268,14 @@ Compaction::Compaction(
    max_subcompactions_ = _mutable_db_options.max_subcompactions;
  }

  // For the non-bottommost levels, it tries to build files that match the
  // target file size, but this is not guaranteed; a file could be up to 2x
  // the target size.
  max_output_file_size_ =
      bottommost_level_ || grandparents_.empty() ||
              !_immutable_options.level_compaction_dynamic_file_size
          ? target_output_file_size_
          : 2 * target_output_file_size_;

#ifndef NDEBUG
  for (size_t i = 1; i < inputs_.size(); ++i) {
    assert(inputs_[i].level > inputs_[i - 1].level);

@@ -163,6 +163,9 @@ class Compaction {
  // Maximum size of files to build during this compaction.
  uint64_t max_output_file_size() const { return max_output_file_size_; }

  // Target output file size for this compaction
  uint64_t target_output_file_size() const { return target_output_file_size_; }

  // What compression for output
  CompressionType output_compression() const { return output_compression_; }
@@ -412,6 +415,7 @@ class Compaction {
  const int start_level_;   // the lowest level to be compacted
  const int output_level_;  // levels to which output files are stored
  uint64_t target_output_file_size_;
  uint64_t max_output_file_size_;
  uint64_t max_compaction_bytes_;
  uint32_t max_subcompactions_;

@@ -228,6 +228,9 @@ class CompactionJobTestBase : public testing::Test {
        test::CreateEnvFromSystem(ConfigOptions(), &base_env, &env_guard_));
    env_ = base_env;
    fs_ = env_->GetFileSystem();
    // set defaults for the tests
    mutable_cf_options_.target_file_size_base = 1024 * 1024;
    mutable_cf_options_.max_compaction_bytes = 10 * 1024 * 1024;
  }

  void SetUp() override {
@@ -415,8 +418,9 @@ class CompactionJobTestBase : public testing::Test {
    auto cfd = versions_->GetColumnFamilySet()->GetDefault();
    if (table_type_ == TableTypeForTest::kMockTable) {
      ASSERT_EQ(compaction_job_stats_.num_output_files,
                expected_results.size());
      mock_table_factory_->AssertLatestFiles(expected_results);
    } else {
      assert(table_type_ == TableTypeForTest::kBlockBasedTable);
    }
@@ -426,7 +430,8 @@ class CompactionJobTestBase : public testing::Test {
    ASSERT_EQ(expected_output_file_num, output_files.size());
    if (table_type_ == TableTypeForTest::kMockTable) {
      assert(output_files.size() ==
             static_cast<size_t>(expected_output_file_num));
      const FileMetaData* const output_file = output_files[0];
      ASSERT_EQ(output_file->oldest_blob_file_number,
                expected_oldest_blob_file_numbers[0]);
@@ -620,12 +625,22 @@ class CompactionJobTestBase : public testing::Test {
      num_input_files += level_files.size();
    }

    std::vector<FileMetaData*> grandparents;
    // it should actually be the next non-empty level
    const int kGrandparentsLevel = output_level + 1;
    if (kGrandparentsLevel < cf_options_.num_levels) {
      grandparents =
          cfd_->current()->storage_info()->LevelFiles(kGrandparentsLevel);
    }

    Compaction compaction(
        cfd->current()->storage_info(), *cfd->ioptions(),
        *cfd->GetLatestMutableCFOptions(), mutable_db_options_,
        compaction_input_files, output_level,
        mutable_cf_options_.target_file_size_base,
        mutable_cf_options_.max_compaction_bytes, 0, kNoCompression,
        cfd->GetLatestMutableCFOptions()->compression_opts,
        Temperature::kUnknown, max_subcompactions, grandparents, true);
    compaction.SetInputVersion(cfd->current());

    assert(db_options_.info_log);
@@ -1721,6 +1736,246 @@ TEST_F(CompactionJobTest, ResultSerialization) {
  }
}
class CompactionJobDynamicFileSizeTest
: public CompactionJobTestBase,
public ::testing::WithParamInterface<bool> {
public:
CompactionJobDynamicFileSizeTest()
: CompactionJobTestBase(
test::PerThreadDBPath("compaction_job_dynamic_file_size_test"),
BytewiseComparator(), [](uint64_t /*ts*/) { return ""; },
/*test_io_priority=*/false, TableTypeForTest::kMockTable) {}
};
TEST_P(CompactionJobDynamicFileSizeTest, CutForMaxCompactionBytes) {
// The dynamic_file_size option should have no impact on cutting for max
// compaction bytes.
bool enable_dynamic_file_size = GetParam();
cf_options_.level_compaction_dynamic_file_size = enable_dynamic_file_size;
NewDB();
mutable_cf_options_.target_file_size_base = 80;
mutable_cf_options_.max_compaction_bytes = 21;
auto file1 = mock::MakeMockFile({
{KeyStr("c", 5U, kTypeValue), "val2"},
{KeyStr("n", 6U, kTypeValue), "val3"},
});
AddMockFile(file1);
auto file2 = mock::MakeMockFile({{KeyStr("h", 3U, kTypeValue), "val"},
{KeyStr("j", 4U, kTypeValue), "val"}});
AddMockFile(file2, 1);
// Create three L2 files, each size 10.
// max_compaction_bytes 21 means the compaction output in L1 will
// be cut to at least two files.
auto file3 = mock::MakeMockFile({{KeyStr("b", 1U, kTypeValue), "val"},
{KeyStr("c", 1U, kTypeValue), "val"},
{KeyStr("c1", 1U, kTypeValue), "val"},
{KeyStr("c2", 1U, kTypeValue), "val"},
{KeyStr("c3", 1U, kTypeValue), "val"},
{KeyStr("c4", 1U, kTypeValue), "val"},
{KeyStr("d", 1U, kTypeValue), "val"},
{KeyStr("e", 2U, kTypeValue), "val"}});
AddMockFile(file3, 2);
auto file4 = mock::MakeMockFile({{KeyStr("h", 1U, kTypeValue), "val"},
{KeyStr("i", 1U, kTypeValue), "val"},
{KeyStr("i1", 1U, kTypeValue), "val"},
{KeyStr("i2", 1U, kTypeValue), "val"},
{KeyStr("i3", 1U, kTypeValue), "val"},
{KeyStr("i4", 1U, kTypeValue), "val"},
{KeyStr("j", 1U, kTypeValue), "val"},
{KeyStr("k", 2U, kTypeValue), "val"}});
AddMockFile(file4, 2);
auto file5 = mock::MakeMockFile({{KeyStr("l", 1U, kTypeValue), "val"},
{KeyStr("m", 1U, kTypeValue), "val"},
{KeyStr("m1", 1U, kTypeValue), "val"},
{KeyStr("m2", 1U, kTypeValue), "val"},
{KeyStr("m3", 1U, kTypeValue), "val"},
{KeyStr("m4", 1U, kTypeValue), "val"},
{KeyStr("n", 1U, kTypeValue), "val"},
{KeyStr("o", 2U, kTypeValue), "val"}});
AddMockFile(file5, 2);
auto expected_file1 =
mock::MakeMockFile({{KeyStr("c", 5U, kTypeValue), "val2"},
{KeyStr("h", 3U, kTypeValue), "val"}});
auto expected_file2 =
mock::MakeMockFile({{KeyStr("j", 4U, kTypeValue), "val"},
{KeyStr("n", 6U, kTypeValue), "val3"}});
SetLastSequence(6U);
const std::vector<int> input_levels = {0, 1};
auto lvl0_files = cfd_->current()->storage_info()->LevelFiles(0);
auto lvl1_files = cfd_->current()->storage_info()->LevelFiles(1);
RunCompaction({lvl0_files, lvl1_files}, input_levels,
{expected_file1, expected_file2});
}
TEST_P(CompactionJobDynamicFileSizeTest, CutToSkipGrandparentFile) {
bool enable_dynamic_file_size = GetParam();
cf_options_.level_compaction_dynamic_file_size = enable_dynamic_file_size;
NewDB();
// Make sure the grandparent level file size (10) qualifies for skipping.
// Currently, it has to be > 1/8 of the target file size.
mutable_cf_options_.target_file_size_base = 70;
auto file1 = mock::MakeMockFile({
{KeyStr("a", 5U, kTypeValue), "val2"},
{KeyStr("z", 6U, kTypeValue), "val3"},
});
AddMockFile(file1);
auto file2 = mock::MakeMockFile({{KeyStr("c", 3U, kTypeValue), "val"},
{KeyStr("x", 4U, kTypeValue), "val"}});
AddMockFile(file2, 1);
auto file3 = mock::MakeMockFile({{KeyStr("b", 1U, kTypeValue), "val"},
{KeyStr("d", 2U, kTypeValue), "val"}});
AddMockFile(file3, 2);
auto file4 = mock::MakeMockFile({{KeyStr("h", 1U, kTypeValue), "val"},
{KeyStr("i", 2U, kTypeValue), "val"}});
AddMockFile(file4, 2);
auto file5 = mock::MakeMockFile({{KeyStr("v", 1U, kTypeValue), "val"},
{KeyStr("y", 2U, kTypeValue), "val"}});
AddMockFile(file5, 2);
auto expected_file1 =
mock::MakeMockFile({{KeyStr("a", 5U, kTypeValue), "val2"},
{KeyStr("c", 3U, kTypeValue), "val"}});
auto expected_file2 =
mock::MakeMockFile({{KeyStr("x", 4U, kTypeValue), "val"},
{KeyStr("z", 6U, kTypeValue), "val3"}});
auto expected_file_disable_dynamic_file_size =
mock::MakeMockFile({{KeyStr("a", 5U, kTypeValue), "val2"},
{KeyStr("c", 3U, kTypeValue), "val"},
{KeyStr("x", 4U, kTypeValue), "val"},
{KeyStr("z", 6U, kTypeValue), "val3"}});
SetLastSequence(6U);
const std::vector<int> input_levels = {0, 1};
auto lvl0_files = cfd_->current()->storage_info()->LevelFiles(0);
auto lvl1_files = cfd_->current()->storage_info()->LevelFiles(1);
if (enable_dynamic_file_size) {
RunCompaction({lvl0_files, lvl1_files}, input_levels,
{expected_file1, expected_file2});
} else {
RunCompaction({lvl0_files, lvl1_files}, input_levels,
{expected_file_disable_dynamic_file_size});
}
}
TEST_P(CompactionJobDynamicFileSizeTest, CutToAlignGrandparentBoundary) {
bool enable_dynamic_file_size = GetParam();
cf_options_.level_compaction_dynamic_file_size = enable_dynamic_file_size;
NewDB();
// MockTable has 1 byte per entry by default and each file is 10 bytes.
// When the file size is smaller than 100, it won't cut the file early to
// align with its grandparent boundary.
const size_t kKeyValueSize = 10000;
mock_table_factory_->SetKeyValueSize(kKeyValueSize);
mutable_cf_options_.target_file_size_base = 10 * kKeyValueSize;
mock::KVVector file1;
char ch = 'd';
// Add value from d -> o
for (char i = 0; i < 12; i++) {
file1.emplace_back(KeyStr(std::string(1, ch + i), i + 10, kTypeValue),
"val" + std::to_string(i));
}
AddMockFile(file1);
auto file2 = mock::MakeMockFile({{KeyStr("e", 3U, kTypeValue), "val"},
{KeyStr("s", 4U, kTypeValue), "val"}});
AddMockFile(file2, 1);
// the 1st grandparent file should be skipped
auto file3 = mock::MakeMockFile({{KeyStr("a", 1U, kTypeValue), "val"},
{KeyStr("b", 2U, kTypeValue), "val"}});
AddMockFile(file3, 2);
auto file4 = mock::MakeMockFile({{KeyStr("c", 1U, kTypeValue), "val"},
{KeyStr("e", 2U, kTypeValue), "val"}});
AddMockFile(file4, 2);
auto file5 = mock::MakeMockFile({{KeyStr("h", 1U, kTypeValue), "val"},
{KeyStr("j", 2U, kTypeValue), "val"}});
AddMockFile(file5, 2);
auto file6 = mock::MakeMockFile({{KeyStr("k", 1U, kTypeValue), "val"},
{KeyStr("n", 2U, kTypeValue), "val"}});
AddMockFile(file6, 2);
auto file7 = mock::MakeMockFile({{KeyStr("q", 1U, kTypeValue), "val"},
{KeyStr("t", 2U, kTypeValue), "val"}});
AddMockFile(file7, 2);
// The expected outputs are:
// L1: [d,e,f,g,h,i,j] [k,l,m,n,o,s]
// L2: [a, b] [c, e] [h, j] [k, n] [q, t]
// The first output is cut early at "j", so it can be aligned with the L2
// files. If dynamic_file_size is not enabled, the output is cut based on
// target_file_size only.
mock::KVVector expected_file1;
for (char i = 0; i < 7; i++) {
expected_file1.emplace_back(
KeyStr(std::string(1, ch + i), i + 10, kTypeValue),
"val" + std::to_string(i));
}
mock::KVVector expected_file2;
for (char i = 7; i < 12; i++) {
expected_file2.emplace_back(
KeyStr(std::string(1, ch + i), i + 10, kTypeValue),
"val" + std::to_string(i));
}
expected_file2.emplace_back(KeyStr("s", 4U, kTypeValue), "val");
mock::KVVector expected_file_disable_dynamic_file_size1;
for (char i = 0; i < 10; i++) {
expected_file_disable_dynamic_file_size1.emplace_back(
KeyStr(std::string(1, ch + i), i + 10, kTypeValue),
"val" + std::to_string(i));
}
mock::KVVector expected_file_disable_dynamic_file_size2;
for (char i = 10; i < 12; i++) {
expected_file_disable_dynamic_file_size2.emplace_back(
KeyStr(std::string(1, ch + i), i + 10, kTypeValue),
"val" + std::to_string(i));
}
expected_file_disable_dynamic_file_size2.emplace_back(
KeyStr("s", 4U, kTypeValue), "val");
SetLastSequence(22U);
const std::vector<int> input_levels = {0, 1};
auto lvl0_files = cfd_->current()->storage_info()->LevelFiles(0);
auto lvl1_files = cfd_->current()->storage_info()->LevelFiles(1);
if (enable_dynamic_file_size) {
RunCompaction({lvl0_files, lvl1_files}, input_levels,
{expected_file1, expected_file2});
} else {
RunCompaction({lvl0_files, lvl1_files}, input_levels,
{expected_file_disable_dynamic_file_size1,
expected_file_disable_dynamic_file_size2});
}
}
INSTANTIATE_TEST_CASE_P(CompactionJobDynamicFileSizeTest,
CompactionJobDynamicFileSizeTest, testing::Bool());
class CompactionJobTimestampTest : public CompactionJobTestBase {
 public:

@@ -76,13 +76,86 @@ IOStatus CompactionOutputs::WriterSyncClose(const Status& input_status,
  return io_s;
}
size_t CompactionOutputs::UpdateGrandparentBoundaryInfo(
    const Slice& internal_key) {
  size_t curr_key_boundary_switched_num = 0;

  const std::vector<FileMetaData*>& grandparents = compaction_->grandparents();

  if (grandparents.empty()) {
    return curr_key_boundary_switched_num;
  }

  // TODO: here it uses the internal comparator, but the compaction picker
  // uses the user comparator to get a clean cut with
  // `GetOverlappingInputs()`, which means this function may cut files that
  // still overlap in compaction. For example, it can generate L1 files like:
  //   L1: [2-21]  [22-30]
  //   L2: [1-10]  [21-30]
  // Because `21` on L1 has a higher seq_number (so its internal key is
  // smaller than `21` on L2), the cut happens in the first file. But for the
  // compaction picker, the L1 file [2-21] overlaps with both files on L2.
  // Ideally it should cut to:
  //   L1: [2-20]  [21-30]
  const InternalKeyComparator* icmp =
      &compaction_->column_family_data()->internal_comparator();
  while (grandparent_index_ < grandparents.size()) {
    if (being_grandparent_gap_) {
      if (icmp->Compare(internal_key,
                        grandparents[grandparent_index_]->smallest.Encode()) <
          0) {
        break;
      }
      if (seen_key_) {
        curr_key_boundary_switched_num++;
        grandparent_overlapped_bytes_ +=
            grandparents[grandparent_index_]->fd.GetFileSize();
        grandparent_boundary_switched_num_++;
      }
      being_grandparent_gap_ = false;
    } else {
      if (icmp->Compare(internal_key,
                        grandparents[grandparent_index_]->largest.Encode()) <=
          0) {
        break;
      }
      if (seen_key_) {
        curr_key_boundary_switched_num++;
        grandparent_boundary_switched_num_++;
      }
      being_grandparent_gap_ = true;
      grandparent_index_++;
    }
  }

  // If the first key is in the middle of a grandparent file, add it to the
  // overlapped bytes.
  if (!seen_key_ && !being_grandparent_gap_) {
    assert(grandparent_overlapped_bytes_ == 0);
    grandparent_overlapped_bytes_ =
        grandparents[grandparent_index_]->fd.GetFileSize();
  }

  seen_key_ = true;
  return curr_key_boundary_switched_num;
}
bool CompactionOutputs::ShouldStopBefore(const CompactionIterator& c_iter) {
  assert(c_iter.Valid());

  // always update grandparent information like overlapped file number, size
  // etc.
  const Slice& internal_key = c_iter.key();
  const uint64_t previous_overlapped_bytes = grandparent_overlapped_bytes_;
  size_t num_grandparent_boundaries_crossed =
      UpdateGrandparentBoundaryInfo(internal_key);

  if (!HasBuilder()) {
    return false;
  }

  // If there's a user-defined partitioner, check that first
  if (partitioner_ && partitioner_->ShouldPartition(PartitionerRequest(
                          last_key_for_partitioner_, c_iter.user_key(),
                          current_output_file_size_)) == kRequired) {
    return true;
  }
@@ -92,12 +165,11 @@ bool CompactionOutputs::ShouldStopBefore(const CompactionIterator& c_iter) {
    return false;
  }

  // reach the max file size
  if (current_output_file_size_ >= compaction_->max_output_file_size()) {
    return true;
  }

  const InternalKeyComparator* icmp =
      &compaction_->column_family_data()->internal_comparator();
@@ -111,33 +183,68 @@ bool CompactionOutputs::ShouldStopBefore(const CompactionIterator& c_iter) {
    }
  }

  // only check if the current key is going to cross the grandparents file
  // boundary (either the file beginning or ending).
  if (num_grandparent_boundaries_crossed > 0) {
    // Cut the file before the current key if the size of the current output
    // file + its overlapped grandparent files is bigger than
    // max_compaction_bytes. This prevents a future compaction from the
    // current output level from growing bigger than max_compaction_bytes.
    if (grandparent_overlapped_bytes_ + current_output_file_size_ >
        compaction_->max_compaction_bytes()) {
      return true;
    }

    // Cut the file if including the key is going to add a skippable file on
    // the grandparent level AND its size is reasonably big (1/8 of target
    // file size). For example, if it's compacting the files L0 + L1:
    //   L0: [1, 21]
    //   L1: [3, 23]
    //   L2: [2, 4] [11, 15] [22, 24]
    // Without this break, it will output as:
    //   L1: [1,3, 21,23]
    // With this break, it will output as (assuming [11, 15] at L2 is bigger
    // than 1/8 of target size):
    //   L1: [1,3] [21,23]
    // Then for the future compactions, [11,15] won't be included.
    // For random datasets (either evenly distributed or skewed), it rarely
    // triggers this condition, but if the user is adding 2 different datasets
    // without any overlap, it is likely to happen.
    // For more details, check PR #1963.
    const size_t num_skippable_boundaries_crossed =
        being_grandparent_gap_ ? 2 : 3;
    if (compaction_->immutable_options()->compaction_style ==
            kCompactionStyleLevel &&
        compaction_->immutable_options()->level_compaction_dynamic_file_size &&
        num_grandparent_boundaries_crossed >=
            num_skippable_boundaries_crossed &&
        grandparent_overlapped_bytes_ - previous_overlapped_bytes >
            compaction_->target_output_file_size() / 8) {
      return true;
    }

    // Pre-cut the output file if it's reaching a certain size AND it's at the
    // boundary of a grandparent file. It can reduce future compaction size;
    // the cost is having smaller files.
    // The pre-cut size threshold is based on how many grandparent boundaries
    // it has seen before: if it has seen no boundary at all, it will pre-cut
    // at 50% of the target file size. Every boundary it has seen increases
    // the threshold by 5%, capped at 90%, at which point it always cuts.
    // The idea is that if it has seen more boundaries before, it is more
    // likely to see another boundary (file cutting opportunity) before
    // reaching the target file size. Tests show this generates larger files
    // than a static threshold like 75%, with a similar write amplification
    // improvement.
    if (compaction_->immutable_options()->compaction_style ==
            kCompactionStyleLevel &&
        compaction_->immutable_options()->level_compaction_dynamic_file_size &&
        current_output_file_size_ >
            ((compaction_->target_output_file_size() + 99) / 100) *
                (50 + std::min(grandparent_boundary_switched_num_ * 5,
                               size_t{40}))) {
      return true;
    }
  }
  // check ttl file boundaries if there's any
  if (!files_to_cut_for_ttl_.empty()) {
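
To make the pre-cut threshold above concrete, here is a small standalone
sketch (illustrative only, not part of the patch) that evaluates the same
50%-to-90% formula for a 32MB target file size:
```
#include <algorithm>
#include <cstdint>
#include <cstdio>

int main() {
  const uint64_t target_file_size = 32 << 20;  // 32MB, as in the db_bench run
  for (size_t boundaries_switched = 0; boundaries_switched <= 10;
       boundaries_switched += 2) {
    // Same formula as above: 50% base, +5% per boundary crossed, capped at 90%.
    const size_t percent = 50 + std::min(boundaries_switched * 5, size_t{40});
    const uint64_t pre_cut_size = ((target_file_size + 99) / 100) * percent;
    std::printf("boundaries=%zu -> pre-cut at %llu bytes (%zu%% of target)\n",
                boundaries_switched,
                static_cast<unsigned long long>(pre_cut_size), percent);
  }
  return 0;
}
```
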
@@ -189,6 +296,14 @@ Status CompactionOutputs::AddToOutput(
    if (!s.ok()) {
      return s;
    }

    // reset grandparent information
    const std::vector<FileMetaData*>& grandparents =
        compaction_->grandparents();
    grandparent_overlapped_bytes_ =
        being_grandparent_gap_
            ? 0
            : grandparents[grandparent_index_]->fd.GetFileSize();
    grandparent_boundary_switched_num_ = 0;
  }

  // Open output file if necessary
@@ -199,10 +314,9 @@
    }
  }

  assert(builder_ != nullptr);
  const Slice& value = c_iter.value();
  s = current_output().validator.Add(key, value);
  if (!s.ok()) {
    return s;
  }

@@ -216,9 +216,11 @@ class CompactionOutputs {
    }
  }

  // Update tracked grandparent information, such as the grandparent index,
  // whether the key is in the gap between 2 grandparent files, the
  // accumulated grandparent file size, etc.
  // It returns how many boundaries it crosses by including the current key.
  size_t UpdateGrandparentBoundaryInfo(const Slice& internal_key);

  // Add current key from compaction_iterator to the output file. If needed
  // close and open new compaction output with the functions provided.
@@ -311,12 +313,21 @@ class CompactionOutputs {
  // An index used to speed up ShouldStopBefore().
  size_t grandparent_index_ = 0;

  // Whether the current output key is in the gap between grandparent files,
  // i.e.:
  //   key > grandparents[grandparent_index_ - 1].largest &&
  //   key < grandparents[grandparent_index_].smallest
  bool being_grandparent_gap_ = true;

  // The number of bytes overlapping between the current output and
  // grandparent files, used in ShouldStopBefore().
  uint64_t grandparent_overlapped_bytes_ = 0;

  // A flag that determines whether a key has been seen in ShouldStopBefore()
  bool seen_key_ = false;

  // For the current output file, how many file boundaries it has crossed,
  // basically the number of overlapped files * 2
  size_t grandparent_boundary_switched_num_ = 0;
};
// helper struct to concatenate the last level and penultimate level outputs // helper struct to concatenate the last level and penultimate level outputs

@@ -1029,7 +1029,11 @@ TEST_F(DBRangeDelTest, CompactionTreatsSplitInputLevelDeletionAtomically) {
  options.level0_file_num_compaction_trigger = kNumFilesPerLevel;
  options.memtable_factory.reset(
      test::NewSpecialSkipListFactory(2 /* num_entries_flush */));
  // the max file size could be 2x of the target file size, so set it to half
  // of that
  options.target_file_size_base = kValueBytes / 2;
  // disable dynamic_file_size, as it will cut L1 files into more files (than
  // kNumFilesPerLevel).
  options.level_compaction_dynamic_file_size = false;
  options.max_compaction_bytes = 1500;
  // i == 0: CompactFiles
  // i == 1: CompactRange
@@ -1100,6 +1104,12 @@ TEST_F(DBRangeDelTest, RangeTombstoneEndKeyAsSstableUpperBound) {
      test::NewSpecialSkipListFactory(2 /* num_entries_flush */));
  options.target_file_size_base = kValueBytes;
  options.disable_auto_compactions = true;
  // disable it for now, otherwise the L1 files are going to be cut before
  // data 1:
  //   L1: [0]  [1,4]
  //   L2: [0,0]
  // because the grandparent file is between [0] -> [1] and its size is more
  // than 1/8 of the target size (4k).
  options.level_compaction_dynamic_file_size = false;

  DestroyAndReopen(options);
@@ -1715,17 +1725,17 @@ TEST_F(DBRangeDelTest, OverlappedKeys) {
  ASSERT_OK(db_->Flush(FlushOptions()));
  ASSERT_EQ(1, NumTableFilesAtLevel(0));

  // The key range is broken up into 4 SSTs to avoid a future big compaction
  // with the grandparent
  ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr, nullptr,
                                        true /* disallow_trivial_move */));
  ASSERT_EQ(4, NumTableFilesAtLevel(1));

  ASSERT_OK(dbfull()->TEST_CompactRange(1, nullptr, nullptr, nullptr,
                                        true /* disallow_trivial_move */));
  // L1->L2 compaction outputs to 2 files because there are 2 separated
  // compactions: [0-4] and [5-9]
  ASSERT_EQ(2, NumTableFilesAtLevel(2));
  ASSERT_EQ(0, NumTableFilesAtLevel(1));
}

@@ -2379,9 +2379,15 @@ TEST_F(DBTest2, MaxCompactionBytesTest) {
  }
  ASSERT_OK(dbfull()->TEST_WaitForCompact());

  // Output files to L1 are cut to 4 pieces, according to
  // options.max_compaction_bytes (300K).
  // There are 8 files on L2 (the grandparent level), each one 100K. The first
  // L1 file overlaps with a and b, which is less than the 300K
  // max_compaction_bytes; the second one overlaps with d and e, which is also
  // less than 300K. Including any extra grandparent file would make a future
  // compaction larger than 300K.
  //   L1: [ 1 ]  [ 2 ]  [ 3 ]  [ 4 ]
  //   L2: [a] [b] [c] [d] [e] [f] [g] [h]
  ASSERT_EQ("0,4,8", FilesPerLevel(0));
}
static void UniqueIdCallback(void* arg) {

@@ -648,6 +648,15 @@ struct AdvancedColumnFamilyOptions {
  // Default: false
  bool level_compaction_dynamic_level_bytes = false;

  // Allows RocksDB to generate files that are not exactly the target_file_size
  // (only for non-bottommost files), which can reduce the write-amplification
  // from compaction. The file size can range from 0 to 2x target_file_size.
  // Once enabled, non-bottommost compaction will try to cut the files so that
  // they align with the next level's (grandparent level's) file boundaries.
  //
  // Default: true
  bool level_compaction_dynamic_file_size = true;

  // Default: 10.
  //
  // Dynamically changeable through SetOptions() API

@@ -550,6 +550,11 @@ static std::unordered_map<std::string, OptionTypeInfo>
          level_compaction_dynamic_level_bytes),
      OptionType::kBoolean, OptionVerificationType::kNormal,
      OptionTypeFlags::kNone}},
    {"level_compaction_dynamic_file_size",
     {offsetof(struct ImmutableCFOptions,
               level_compaction_dynamic_file_size),
      OptionType::kBoolean, OptionVerificationType::kNormal,
      OptionTypeFlags::kNone}},
    {"optimize_filters_for_hits",
     {offsetof(struct ImmutableCFOptions, optimize_filters_for_hits),
      OptionType::kBoolean, OptionVerificationType::kNormal,
@@ -892,6 +897,8 @@ ImmutableCFOptions::ImmutableCFOptions(const ColumnFamilyOptions& cf_options)
      bloom_locality(cf_options.bloom_locality),
      level_compaction_dynamic_level_bytes(
          cf_options.level_compaction_dynamic_level_bytes),
      level_compaction_dynamic_file_size(
          cf_options.level_compaction_dynamic_file_size),
      num_levels(cf_options.num_levels),
      optimize_filters_for_hits(cf_options.optimize_filters_for_hits),
      force_consistency_checks(cf_options.force_consistency_checks),

@@ -64,6 +64,8 @@ struct ImmutableCFOptions {
  bool level_compaction_dynamic_level_bytes;

  bool level_compaction_dynamic_file_size;

  int num_levels;
  bool optimize_filters_for_hits;

@@ -301,6 +301,8 @@ void UpdateColumnFamilyOptions(const ImmutableCFOptions& ioptions,
  cf_opts->bloom_locality = ioptions.bloom_locality;
  cf_opts->level_compaction_dynamic_level_bytes =
      ioptions.level_compaction_dynamic_level_bytes;
  cf_opts->level_compaction_dynamic_file_size =
      ioptions.level_compaction_dynamic_file_size;
  cf_opts->num_levels = ioptions.num_levels;
  cf_opts->optimize_filters_for_hits = ioptions.optimize_filters_for_hits;
  cf_opts->force_consistency_checks = ioptions.force_consistency_checks;

@@ -509,6 +509,7 @@ TEST_F(OptionsSettableTest, ColumnFamilyOptionsAllFieldsSettable) {
      "experimental_mempurge_threshold=0.0001;"
      "optimize_filters_for_hits=false;"
      "level_compaction_dynamic_level_bytes=false;"
      "level_compaction_dynamic_file_size=true;"
      "inplace_update_support=false;"
      "compaction_style=kCompactionStyleFIFO;"
      "compaction_pri=kMinOverlappingRatio;"

@@ -2322,6 +2322,7 @@ TEST_F(OptionsOldApiTest, GetOptionsFromMapTest) {
      {"target_file_size_multiplier", "13"},
      {"max_bytes_for_level_base", "14"},
      {"level_compaction_dynamic_level_bytes", "true"},
      {"level_compaction_dynamic_file_size", "true"},
      {"max_bytes_for_level_multiplier", "15.0"},
      {"max_bytes_for_level_multiplier_additional", "16:17:18"},
      {"max_compaction_bytes", "21"},
@@ -2458,6 +2459,7 @@ TEST_F(OptionsOldApiTest, GetOptionsFromMapTest) {
  ASSERT_EQ(new_cf_opt.target_file_size_multiplier, 13);
  ASSERT_EQ(new_cf_opt.max_bytes_for_level_base, 14U);
  ASSERT_EQ(new_cf_opt.level_compaction_dynamic_level_bytes, true);
  ASSERT_EQ(new_cf_opt.level_compaction_dynamic_file_size, true);
  ASSERT_EQ(new_cf_opt.max_bytes_for_level_multiplier, 15.0);
  ASSERT_EQ(new_cf_opt.max_bytes_for_level_multiplier_additional.size(), 3U);
  ASSERT_EQ(new_cf_opt.max_bytes_for_level_multiplier_additional[0], 16);

@@ -122,8 +122,12 @@ class MockTableBuilder : public TableBuilder {
 public:
  MockTableBuilder(uint32_t id, MockTableFileSystem* file_system,
                   MockTableFactory::MockCorruptionMode corrupt_mode =
                       MockTableFactory::kCorruptNone,
                   size_t key_value_size = 1)
      : id_(id),
        file_system_(file_system),
        corrupt_mode_(corrupt_mode),
        key_value_size_(key_value_size) {
    table_ = MakeMockFile({});
  }
@@ -171,7 +175,7 @@ class MockTableBuilder : public TableBuilder {
  uint64_t NumEntries() const override { return table_.size(); }

  uint64_t FileSize() const override { return table_.size() * key_value_size_; }

  TableProperties GetTableProperties() const override {
    return TableProperties();
@@ -191,6 +195,7 @@ class MockTableBuilder : public TableBuilder {
  MockTableFileSystem* file_system_;
  int corrupt_mode_;
  KVVector table_;
  size_t key_value_size_;
};
InternalIterator* MockTableReader::NewIterator( InternalIterator* MockTableReader::NewIterator(
@@ -260,7 +265,8 @@ TableBuilder* MockTableFactory::NewTableBuilder(
  Status s = GetAndWriteNextID(file, &id);
  assert(s.ok());
  return new MockTableBuilder(id, &file_system_, corrupt_mode_,
                              key_value_size_);
}
Status MockTableFactory::CreateMockTable(Env* env, const std::string& fname, Status MockTableFactory::CreateMockTable(Env* env, const std::string& fname,
@@ -303,14 +309,25 @@ void MockTableFactory::AssertSingleFile(const KVVector& file_contents) {
  ASSERT_EQ(file_contents, file_system_.files.begin()->second);
}

void MockTableFactory::AssertLatestFiles(
    const std::vector<KVVector>& files_contents) {
  ASSERT_GE(file_system_.files.size(), files_contents.size());
  auto it = file_system_.files.rbegin();
  for (auto expect = files_contents.rbegin(); expect != files_contents.rend();
       expect++, it++) {
    ASSERT_TRUE(it != file_system_.files.rend());
    if (*expect != it->second) {
      std::cout << "Wrong content! Content of file, expect:" << std::endl;
      for (const auto& kv : *expect) {
        ParsedInternalKey ikey;
        std::string key, value;
        std::tie(key, value) = kv;
        ASSERT_OK(ParseInternalKey(Slice(key), &ikey, true /* log_err_key */));
        std::cout << ikey.DebugString(true, false) << " -> " << value
                  << std::endl;
      }
      std::cout << "actual:" << std::endl;
      for (const auto& kv : it->second) {
        ParsedInternalKey ikey;
        std::string key, value;
        std::tie(key, value) = kv;
@@ -320,6 +337,7 @@ void MockTableFactory::AssertLatestFile(const KVVector& file_contents) {
      }
      FAIL();
    }
  }
}

} // namespace mock

@@ -72,10 +72,12 @@ class MockTableFactory : public TableFactory {
  }

  void SetCorruptionMode(MockCorruptionMode mode) { corrupt_mode_ = mode; }

  void SetKeyValueSize(size_t size) { key_value_size_ = size; }

  // This function will assert that only a single file exists and that the
  // contents are equal to file_contents
  void AssertSingleFile(const KVVector& file_contents);
  void AssertLatestFiles(const std::vector<KVVector>& files_contents);

 private:
  Status GetAndWriteNextID(WritableFileWriter* file, uint32_t* id) const;
@@ -84,6 +86,8 @@ class MockTableFactory : public TableFactory {
  mutable MockTableFileSystem file_system_;
  mutable std::atomic<uint32_t> next_id_;
  MockCorruptionMode corrupt_mode_;

  size_t key_value_size_ = 1;
};
} // namespace mock } // namespace mock
