Do not include last level in compaction when `allow_ingest_behind=true` (#11489)

Summary:
When a DB is configured with `allow_ingest_behind = true`, the last level is reserved for ingested files, and these files should not be included in any compaction. Currently, a major compaction can compact these files into smaller levels, which can cause future ingestions to be rejected (see `ExternalSstFileIngestionJob::CheckLevelForIngestedBehindFile()`). This PR fixes the issue so that files in the last level are not included in any compaction.
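
For context, here is a minimal sketch (not part of this change; the DB path, SST path, keys, and values are made up for illustration) of the ingest-behind workflow that this fix protects: open the DB with `allow_ingest_behind=true` under universal compaction, ingest a file behind all existing data into the reserved last level, and expect a later full `CompactRange()` to leave that level untouched.

```cpp
#include <cassert>

#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/options.h"
#include "rocksdb/sst_file_writer.h"

int main() {
  rocksdb::Options options;
  options.create_if_missing = true;
  // Ingest-behind requires universal compaction and num_levels >= 3;
  // the last level is reserved for ingested files.
  options.compaction_style = rocksdb::kCompactionStyleUniversal;
  options.allow_ingest_behind = true;
  options.num_levels = 3;

  rocksdb::DB* db = nullptr;
  rocksdb::Status s =
      rocksdb::DB::Open(options, "/tmp/ingest_behind_demo", &db);
  assert(s.ok());

  // Build an SST file with keys that do not conflict with live writes.
  rocksdb::SstFileWriter writer(rocksdb::EnvOptions(), options);
  s = writer.Open("/tmp/ingest_behind_demo.sst");
  assert(s.ok());
  s = writer.Put("aaa_old_key", "old_value");
  assert(s.ok());
  s = writer.Finish();
  assert(s.ok());

  // Ingest the file behind all existing data: it is placed in the
  // reserved last level with sequence number zero.
  rocksdb::IngestExternalFileOptions ifo;
  ifo.ingest_behind = true;
  s = db->IngestExternalFile({"/tmp/ingest_behind_demo.sst"}, ifo);
  assert(s.ok());

  // Before this fix, a full manual compaction could pull the ingested file
  // out of the last level, making later ingest-behind calls fail the level
  // check; with the fix, the last level is left untouched.
  s = db->CompactRange(rocksdb::CompactRangeOptions(), nullptr, nullptr);
  assert(s.ok());

  delete db;
  return 0;
}
```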

Pull Request resolved: https://github.com/facebook/rocksdb/pull/11489

Test Plan: * Updated unit test `ExternalSSTFileTest.IngestBehind` to test that the last level is not included in manual or automatic compaction.

Reviewed By: ajkr

Differential Revision: D46455711

Pulled By: cbi42

fbshipit-source-id: 5e2142c2a709ef932ad797897795021c06c4ac8c
Author: Changyu Bi (committed by Facebook GitHub Bot)
Parent: cac3240cbf
Commit: 15e8a843d9
Files changed (per-file changed-line counts):
  db/compaction/compaction_picker.cc (16)
  db/compaction/compaction_picker_test.cc (10)
  db/compaction/compaction_picker_universal.cc (55)
  db/db_impl/db_impl_compaction_flush.cc (8)
  db/external_sst_file_test.cc (40)
  db/version_set.cc (19)
  db/version_set.h (8)
  include/rocksdb/options.h (10)
  unreleased_history/behavior_changes/ingest_behind_universal.md (1)

@@ -611,23 +611,21 @@ Compaction* CompactionPicker::CompactRange(
// Universal compaction with more than one level always compacts all the
// files together to the last level.
assert(vstorage->num_levels() > 1);
+ int max_output_level =
+ vstorage->MaxOutputLevel(ioptions_.allow_ingest_behind);
// DBImpl::CompactRange() set output level to be the last level
- if (ioptions_.allow_ingest_behind) {
- assert(output_level == vstorage->num_levels() - 2);
- } else {
- assert(output_level == vstorage->num_levels() - 1);
- }
+ assert(output_level == max_output_level);
// DBImpl::RunManualCompaction will make full range for universal compaction
assert(begin == nullptr);
assert(end == nullptr);
*compaction_end = nullptr;
int start_level = 0;
- for (; start_level < vstorage->num_levels() &&
+ for (; start_level <= max_output_level &&
vstorage->NumLevelFiles(start_level) == 0;
start_level++) {
}
- if (start_level == vstorage->num_levels()) {
+ if (start_level > max_output_level) {
return nullptr;
}
@@ -637,9 +635,9 @@ Compaction* CompactionPicker::CompactRange(
return nullptr;
}
- std::vector<CompactionInputFiles> inputs(vstorage->num_levels() -
+ std::vector<CompactionInputFiles> inputs(max_output_level + 1 -
start_level);
- for (int level = start_level; level < vstorage->num_levels(); level++) {
+ for (int level = start_level; level <= max_output_level; level++) {
inputs[level - start_level].level = level;
auto& files = inputs[level - start_level].files;
for (FileMetaData* f : vstorage->LevelFiles(level)) {

@@ -505,7 +505,7 @@ TEST_F(CompactionPickerTest, NeedsCompactionUniversal) {
TEST_F(CompactionPickerTest, CompactionUniversalIngestBehindReservedLevel) {
const uint64_t kFileSize = 100000;
- NewVersionStorage(1, kCompactionStyleUniversal);
+ NewVersionStorage(3 /* num_levels */, kCompactionStyleUniversal);
ioptions_.allow_ingest_behind = true;
ioptions_.num_levels = 3;
UniversalCompactionPicker universal_compaction_picker(ioptions_, &icmp_);
@@ -532,6 +532,14 @@ TEST_F(CompactionPickerTest, CompactionUniversalIngestBehindReservedLevel) {
// output level should be the one above the bottom-most
ASSERT_EQ(1, compaction->output_level());
+ // input should not include the reserved level
+ const std::vector<CompactionInputFiles>* inputs = compaction->inputs();
+ for (const auto& compaction_input : *inputs) {
+ if (!compaction_input.empty()) {
+ ASSERT_LT(compaction_input.level, 2);
+ }
+ }
}
// Tests if the files can be trivially moved in multi level
// universal compaction when allow_trivial_move option is set

@@ -133,8 +133,8 @@ class UniversalCompactionBuilder {
UniversalCompactionPicker* picker_;
LogBuffer* log_buffer_;
- static std::vector<SortedRun> CalculateSortedRuns(
- const VersionStorageInfo& vstorage);
+ static std::vector<UniversalCompactionBuilder::SortedRun> CalculateSortedRuns(
+ const VersionStorageInfo& vstorage, int last_level);
// Pick a path ID to place a newly generated file, with its estimated file
// size.
@@ -339,13 +339,13 @@ void UniversalCompactionBuilder::SortedRun::DumpSizeInfo(
std::vector<UniversalCompactionBuilder::SortedRun>
UniversalCompactionBuilder::CalculateSortedRuns(
- const VersionStorageInfo& vstorage) {
+ const VersionStorageInfo& vstorage, int last_level) {
std::vector<UniversalCompactionBuilder::SortedRun> ret;
for (FileMetaData* f : vstorage.LevelFiles(0)) {
ret.emplace_back(0, f, f->fd.GetFileSize(), f->compensated_file_size,
f->being_compacted);
}
- for (int level = 1; level < vstorage.num_levels(); level++) {
+ for (int level = 1; level <= last_level; level++) {
uint64_t total_compensated_size = 0U;
uint64_t total_size = 0U;
bool being_compacted = false;
@@ -374,7 +374,9 @@ UniversalCompactionBuilder::CalculateSortedRuns(
Compaction* UniversalCompactionBuilder::PickCompaction() {
const int kLevel0 = 0;
score_ = vstorage_->CompactionScore(kLevel0);
- sorted_runs_ = CalculateSortedRuns(*vstorage_);
+ int max_output_level =
+ vstorage_->MaxOutputLevel(ioptions_.allow_ingest_behind);
+ sorted_runs_ = CalculateSortedRuns(*vstorage_, max_output_level);
if (sorted_runs_.size() == 0 ||
(vstorage_->FilesMarkedForPeriodicCompaction().empty() &&
@@ -471,6 +473,8 @@ Compaction* UniversalCompactionBuilder::PickCompaction() {
"UniversalCompactionBuilder::PickCompaction:Return", nullptr);
return nullptr;
}
+ assert(c->output_level() <=
+ vstorage_->MaxOutputLevel(ioptions_.allow_ingest_behind));
if (mutable_cf_options_.compaction_options_universal.allow_trivial_move ==
true &&
@@ -698,22 +702,18 @@ Compaction* UniversalCompactionBuilder::PickCompactionToReduceSortedRuns(
GetPathId(ioptions_, mutable_cf_options_, estimated_total_size);
int start_level = sorted_runs_[start_index].level;
int output_level;
+ // last level is reserved for the files ingested behind
+ int max_output_level =
+ vstorage_->MaxOutputLevel(ioptions_.allow_ingest_behind);
if (first_index_after == sorted_runs_.size()) {
- output_level = vstorage_->num_levels() - 1;
+ output_level = max_output_level;
} else if (sorted_runs_[first_index_after].level == 0) {
output_level = 0;
} else {
output_level = sorted_runs_[first_index_after].level - 1;
}
- // last level is reserved for the files ingested behind
- if (ioptions_.allow_ingest_behind &&
- (output_level == vstorage_->num_levels() - 1)) {
- assert(output_level > 1);
- output_level--;
- }
- std::vector<CompactionInputFiles> inputs(vstorage_->num_levels());
+ std::vector<CompactionInputFiles> inputs(max_output_level + 1);
for (size_t i = 0; i < inputs.size(); ++i) {
inputs[i].level = start_level + static_cast<int>(i);
}
@@ -1192,8 +1192,10 @@ Compaction* UniversalCompactionBuilder::PickDeleteTriggeredCompaction() {
return nullptr;
}
+ int max_output_level =
+ vstorage_->MaxOutputLevel(ioptions_.allow_ingest_behind);
// Pick the first non-empty level after the start_level
- for (output_level = start_level + 1; output_level < vstorage_->num_levels();
+ for (output_level = start_level + 1; output_level <= max_output_level;
output_level++) {
if (vstorage_->NumLevelFiles(output_level) != 0) {
break;
@@ -1201,9 +1203,9 @@ Compaction* UniversalCompactionBuilder::PickDeleteTriggeredCompaction() {
}
}
// If all higher levels are empty, pick the highest level as output level
- if (output_level == vstorage_->num_levels()) {
+ if (output_level > max_output_level) {
if (start_level == 0) {
- output_level = vstorage_->num_levels() - 1;
+ output_level = max_output_level;
} else {
// If start level is non-zero and all higher levels are empty, this
// compaction will translate into a trivial move. Since the idea is
@@ -1212,11 +1214,7 @@ Compaction* UniversalCompactionBuilder::PickDeleteTriggeredCompaction() {
return nullptr;
}
}
- if (ioptions_.allow_ingest_behind &&
- output_level == vstorage_->num_levels() - 1) {
- assert(output_level > 1);
- output_level--;
- }
+ assert(output_level <= max_output_level);
if (output_level != 0) {
if (start_level == 0) {
@@ -1293,8 +1291,9 @@ Compaction* UniversalCompactionBuilder::PickCompactionWithSortedRunRange(
uint32_t path_id =
GetPathId(ioptions_, mutable_cf_options_, estimated_total_size);
int start_level = sorted_runs_[start_index].level;
- std::vector<CompactionInputFiles> inputs(vstorage_->num_levels());
+ int max_output_level =
+ vstorage_->MaxOutputLevel(ioptions_.allow_ingest_behind);
+ std::vector<CompactionInputFiles> inputs(max_output_level + 1);
for (size_t i = 0; i < inputs.size(); ++i) {
inputs[i].level = start_level + static_cast<int>(i);
}
@@ -1331,13 +1330,7 @@ Compaction* UniversalCompactionBuilder::PickCompactionWithSortedRunRange(
int output_level;
if (end_index == sorted_runs_.size() - 1) {
- // output files at the last level, unless it's reserved
- output_level = vstorage_->num_levels() - 1;
- // last level is reserved for the files ingested behind
- if (ioptions_.allow_ingest_behind) {
- assert(output_level > 1);
- output_level--;
- }
+ output_level = max_output_level;
} else {
// if it's not including all sorted_runs, it can only output to the level
// above the `end_index + 1` sorted_run.

@@ -1386,6 +1386,14 @@ Status DBImpl::CompactFilesImpl(
}
}
+ if (cfd->ioptions()->allow_ingest_behind &&
+ output_level >= cfd->ioptions()->num_levels - 1) {
+ return Status::InvalidArgument(
+ "Exceed the maximum output level defined by "
+ "the current compaction algorithm with ingest_behind --- " +
+ std::to_string(cfd->ioptions()->num_levels - 1));
+ }
Status s = cfd->compaction_picker()->SanitizeCompactionInputFiles(
&input_set, cf_meta, output_level);
TEST_SYNC_POINT("DBImpl::CompactFilesImpl::PostSanitizeCompactionInputFiles");

@@ -2160,13 +2160,13 @@ TEST_P(ExternalSSTFileTest, IngestBehind) {
// Insert 100 -> 200 into the memtable
for (int i = 100; i <= 200; i++) {
ASSERT_OK(Put(Key(i), "memtable"));
true_data[Key(i)] = "memtable";
}
// Insert 100 -> 200 using IngestExternalFile
file_data.clear();
for (int i = 0; i <= 20; i++) {
file_data.emplace_back(Key(i), "ingest_behind");
true_data[Key(i)] = "ingest_behind";
}
bool allow_global_seqno = true;
@@ -2188,6 +2188,7 @@ TEST_P(ExternalSSTFileTest, IngestBehind) {
options.num_levels = 3;
DestroyAndReopen(options);
+ true_data.clear();
// Insert 100 -> 200 into the memtable
for (int i = 100; i <= 200; i++) {
ASSERT_OK(Put(Key(i), "memtable"));
@@ -2207,12 +2208,43 @@ TEST_P(ExternalSSTFileTest, IngestBehind) {
verify_checksums_before_ingest, true /*ingest_behind*/,
false /*sort_data*/, &true_data));
ASSERT_EQ("0,1,1", FilesPerLevel());
+ std::vector<std::vector<FileMetaData>> level_to_files;
+ dbfull()->TEST_GetFilesMetaData(db_->DefaultColumnFamily(), &level_to_files);
+ uint64_t ingested_file_number = level_to_files[2][0].fd.GetNumber();
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
- // bottom level should be empty
- ASSERT_EQ("0,1", FilesPerLevel());
+ // Last level should not be compacted
+ ASSERT_EQ("0,1,1", FilesPerLevel());
+ dbfull()->TEST_GetFilesMetaData(db_->DefaultColumnFamily(), &level_to_files);
+ ASSERT_EQ(ingested_file_number, level_to_files[2][0].fd.GetNumber());
size_t kcnt = 0;
VerifyDBFromMap(true_data, &kcnt, false);
+ // Auto-compaction should not include the last level.
+ // Trigger compaction if size amplification exceeds 110%.
+ options.compaction_options_universal.max_size_amplification_percent = 110;
+ options.level0_file_num_compaction_trigger = 4;
+ TryReopen(options);
+ Random rnd(301);
+ for (int i = 0; i < 4; ++i) {
+ for (int j = 0; j < 10; j++) {
+ true_data[Key(j)] = rnd.RandomString(1000);
+ ASSERT_OK(Put(Key(j), true_data[Key(j)]));
+ }
+ ASSERT_OK(Flush());
+ }
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
+ dbfull()->TEST_GetFilesMetaData(db_->DefaultColumnFamily(), &level_to_files);
+ ASSERT_EQ(1, level_to_files[2].size());
+ ASSERT_EQ(ingested_file_number, level_to_files[2][0].fd.GetNumber());
+ // Turning off the option allows DB to compact ingested files.
+ options.allow_ingest_behind = false;
+ TryReopen(options);
+ ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
+ dbfull()->TEST_GetFilesMetaData(db_->DefaultColumnFamily(), &level_to_files);
+ ASSERT_EQ(1, level_to_files[2].size());
+ ASSERT_NE(ingested_file_number, level_to_files[2][0].fd.GetNumber());
+ VerifyDBFromMap(true_data, &kcnt, false);
}
TEST_F(ExternalSSTFileTest, SkipBloomFilter) {

@@ -3390,6 +3390,7 @@ void VersionStorageInfo::ComputeCompactionScore(
// maintaining it to be over 1.0, we scale the original score by 10x
// if it is larger than 1.0.
const double kScoreScale = 10.0;
+ int max_output_level = MaxOutputLevel(immutable_options.allow_ingest_behind);
for (int level = 0; level <= MaxInputLevel(); level++) {
double score;
if (level == 0) {
@@ -3417,7 +3418,7 @@ void VersionStorageInfo::ComputeCompactionScore(
// For universal compaction, we use level0 score to indicate
// compaction score for the whole DB. Adding other levels as if
// they are L0 files.
- for (int i = 1; i < num_levels(); i++) {
+ for (int i = 1; i <= max_output_level; i++) {
// It's possible that a subset of the files in a level may be in a
// compaction, due to delete triggered compaction or trivial move.
// In that case, the below check may not catch a level being
@@ -3561,16 +3562,18 @@ void VersionStorageInfo::ComputeCompactionScore(
}
}
}
- ComputeFilesMarkedForCompaction();
+ ComputeFilesMarkedForCompaction(max_output_level);
if (!immutable_options.allow_ingest_behind) {
ComputeBottommostFilesMarkedForCompaction();
}
- if (mutable_cf_options.ttl > 0) {
+ if (mutable_cf_options.ttl > 0 &&
+ compaction_style_ == kCompactionStyleLevel) {
ComputeExpiredTtlFiles(immutable_options, mutable_cf_options.ttl);
}
if (mutable_cf_options.periodic_compaction_seconds > 0) {
ComputeFilesMarkedForPeriodicCompaction(
- immutable_options, mutable_cf_options.periodic_compaction_seconds);
+ immutable_options, mutable_cf_options.periodic_compaction_seconds,
+ max_output_level);
}
if (mutable_cf_options.enable_blob_garbage_collection &&
@@ -3584,14 +3587,14 @@ void VersionStorageInfo::ComputeCompactionScore(
EstimateCompactionBytesNeeded(mutable_cf_options);
}
- void VersionStorageInfo::ComputeFilesMarkedForCompaction() {
+ void VersionStorageInfo::ComputeFilesMarkedForCompaction(int last_level) {
files_marked_for_compaction_.clear();
int last_qualify_level = 0;
// Do not include files from the last level with data
// If table properties collector suggests a file on the last level,
// we should not move it to a new level.
- for (int level = num_levels() - 1; level >= 1; level--) {
+ for (int level = last_level; level >= 1; level--) {
if (!files_[level].empty()) {
last_qualify_level = level - 1;
break;
@@ -3635,7 +3638,7 @@ void VersionStorageInfo::ComputeExpiredTtlFiles(
void VersionStorageInfo::ComputeFilesMarkedForPeriodicCompaction(
const ImmutableOptions& ioptions,
- const uint64_t periodic_compaction_seconds) {
+ const uint64_t periodic_compaction_seconds, int last_level) {
assert(periodic_compaction_seconds > 0);
files_marked_for_periodic_compaction_.clear();
@@ -3656,7 +3659,7 @@ void VersionStorageInfo::ComputeFilesMarkedForPeriodicCompaction(
const uint64_t allowed_time_limit =
current_time - periodic_compaction_seconds;
- for (int level = 0; level < num_levels(); level++) {
+ for (int level = 0; level <= last_level; level++) {
for (auto f : files_[level]) {
if (!f->being_compacted) {
// Compute a file's modification time in the following order:

@@ -204,7 +204,7 @@ class VersionStorageInfo {
// This computes files_marked_for_compaction_ and is called by
// ComputeCompactionScore()
- void ComputeFilesMarkedForCompaction();
+ void ComputeFilesMarkedForCompaction(int last_level);
// This computes ttl_expired_files_ and is called by
// ComputeCompactionScore()
@@ -215,7 +215,7 @@ class VersionStorageInfo {
// ComputeCompactionScore()
void ComputeFilesMarkedForPeriodicCompaction(
const ImmutableOptions& ioptions,
- const uint64_t periodic_compaction_seconds);
+ const uint64_t periodic_compaction_seconds, int last_level);
// This computes bottommost_files_marked_for_compaction_ and is called by
// ComputeCompactionScore() or UpdateOldestSnapshot().
@@ -465,6 +465,7 @@ class VersionStorageInfo {
// REQUIRES: ComputeCompactionScore has been called
// REQUIRES: DB mutex held during access
+ // Used by Leveled Compaction only.
const autovector<std::pair<int, FileMetaData*>>& ExpiredTtlFiles() const {
assert(finalized_);
return expired_ttl_files_;
@@ -472,6 +473,7 @@ class VersionStorageInfo {
// REQUIRES: ComputeCompactionScore has been called
// REQUIRES: DB mutex held during access
+ // Used by Leveled and Universal Compaction.
const autovector<std::pair<int, FileMetaData*>>&
FilesMarkedForPeriodicCompaction() const {
assert(finalized_);
@@ -680,7 +682,7 @@ class VersionStorageInfo {
// This vector contains list of files marked for compaction and also not
// currently being compacted. It is protected by DB mutex. It is calculated in
- // ComputeCompactionScore()
+ // ComputeCompactionScore(). Used by Leveled and Universal Compaction.
autovector<std::pair<int, FileMetaData*>> files_marked_for_compaction_;
autovector<std::pair<int, FileMetaData*>> expired_ttl_files_;

@@ -1198,11 +1198,11 @@ struct DBOptions {
// Set this option to true during creation of database if you want
// to be able to ingest behind (call IngestExternalFile() skipping keys
// that already exist, rather than overwriting matching keys).
- // Setting this option to true will affect 2 things:
- // 1) Disable some internal optimizations around SST file compression
- // 2) Reserve bottom-most level for ingested files only.
- // Note that only universal compaction supports reserving last level
- // for file ingestion only.
+ // Setting this option to true has the following effects:
+ // 1) Disable some internal optimizations around SST file compression.
+ // 2) Reserve the last level for ingested files only.
+ // 3) Compaction will not include any file from the last level.
+ // Note that only Universal Compaction supports allow_ingest_behind.
+ // `num_levels` should be >= 3 if this option is turned on.
//
//

@@ -0,0 +1 @@
+ When a DB is opened with `allow_ingest_behind=true` (currently only Universal compaction is supported), files in the last level, i.e. the ingested files, will not be included in any compaction. (#11489)