Support periodic compaction in universal compaction (#5970)

Summary:
Previously, periodic compaction is not supported in universal compaction. Add the support using following approach: if any file is marked as qualified for periodid compaction, trigger a full compaction. If a full compaction is prevented by files being compacted, try to compact the higher levels than files currently being compacted. If in this way we can only compact the last sorted run and none of the file to be compacted qualifies for periodic compaction, skip the compact. This is to prevent the same single level compaction from being executed again and again.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5970

Test Plan: Add several test cases.

Differential Revision: D18147097

fbshipit-source-id: 8ecc308154d9aca96fb192c51fbceba3947550c1
main
sdong 5 years ago committed by Facebook Github Bot
parent 2a9e5caffe
commit aa6f7d0995
  1. 3
      HISTORY.md
  2. 114
      db/compaction/compaction_picker_test.cc
  3. 229
      db/compaction/compaction_picker_universal.cc
  4. 81
      db/db_universal_compaction_test.cc
  5. 4
      db/version_set.h

@ -6,6 +6,9 @@
* Added an API GetCreationTimeOfOldestFile(uint64_t* creation_time) to get the
file_creation_time of the oldest SST file in the DB.
### New Features
* Universal compaction to support options.periodic_compaction_seconds. A full compaction will be triggered if any file is over the threshold.
## 6.5.1 (10/16/2019)
### Bug Fixes
* Revert the feature "Merging iterator to avoid child iterator reseek for some cases (#5286)" since it might cause strange results when reseek happens with a different iterator upper bound.

@ -499,6 +499,120 @@ TEST_F(CompactionPickerTest, AllowsTrivialMoveUniversal) {
ASSERT_TRUE(compaction->is_trivial_move());
}
TEST_F(CompactionPickerTest, UniversalPeriodicCompaction1) {
// The case where universal periodic compaction can be picked
// with some newer files being compacted.
const uint64_t kFileSize = 100000;
mutable_cf_options_.periodic_compaction_seconds = 1000;
UniversalCompactionPicker universal_compaction_picker(ioptions_, &icmp_);
NewVersionStorage(5, kCompactionStyleUniversal);
Add(0, 1U, "150", "200", kFileSize, 0, 500, 550);
Add(0, 2U, "201", "250", kFileSize, 0, 401, 450);
Add(0, 4U, "260", "300", kFileSize, 0, 260, 300);
Add(3, 5U, "010", "080", kFileSize, 0, 200, 251);
Add(4, 3U, "301", "350", kFileSize, 0, 101, 150);
Add(4, 6U, "501", "750", kFileSize, 0, 101, 150);
file_map_[2].first->being_compacted = true;
UpdateVersionStorageInfo();
vstorage_->TEST_AddFileMarkedForPeriodicCompaction(4, file_map_[3].first);
std::unique_ptr<Compaction> compaction(
universal_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
ASSERT_TRUE(compaction);
ASSERT_EQ(4, compaction->output_level());
ASSERT_EQ(0, compaction->start_level());
ASSERT_EQ(1U, compaction->num_input_files(0));
}
TEST_F(CompactionPickerTest, UniversalPeriodicCompaction2) {
// The case where universal periodic compaction does not
// pick up only level to compact if it doesn't cover
// any file marked as periodic compaction.
const uint64_t kFileSize = 100000;
mutable_cf_options_.periodic_compaction_seconds = 1000;
UniversalCompactionPicker universal_compaction_picker(ioptions_, &icmp_);
NewVersionStorage(5, kCompactionStyleUniversal);
Add(0, 1U, "150", "200", kFileSize, 0, 500, 550);
Add(3, 5U, "010", "080", kFileSize, 0, 200, 251);
Add(4, 3U, "301", "350", kFileSize, 0, 101, 150);
Add(4, 6U, "501", "750", kFileSize, 0, 101, 150);
file_map_[5].first->being_compacted = true;
UpdateVersionStorageInfo();
vstorage_->TEST_AddFileMarkedForPeriodicCompaction(0, file_map_[1].first);
std::unique_ptr<Compaction> compaction(
universal_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
ASSERT_FALSE(compaction);
}
TEST_F(CompactionPickerTest, UniversalPeriodicCompaction3) {
// The case where universal periodic compaction does not
// pick up only the last sorted run which is an L0 file if it isn't
// marked as periodic compaction.
const uint64_t kFileSize = 100000;
mutable_cf_options_.periodic_compaction_seconds = 1000;
UniversalCompactionPicker universal_compaction_picker(ioptions_, &icmp_);
NewVersionStorage(5, kCompactionStyleUniversal);
Add(0, 1U, "150", "200", kFileSize, 0, 500, 550);
Add(0, 5U, "010", "080", kFileSize, 0, 200, 251);
Add(0, 6U, "501", "750", kFileSize, 0, 101, 150);
file_map_[5].first->being_compacted = true;
UpdateVersionStorageInfo();
vstorage_->TEST_AddFileMarkedForPeriodicCompaction(0, file_map_[1].first);
std::unique_ptr<Compaction> compaction(
universal_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
ASSERT_FALSE(compaction);
}
TEST_F(CompactionPickerTest, UniversalPeriodicCompaction4) {
// The case where universal periodic compaction couldn't form
// a compaction that inlcudes any file marked for periodic compaction.
// Right now we form the compaction anyway if it is more than one
// sorted run. Just put the case here to validate that it doesn't
// crash.
const uint64_t kFileSize = 100000;
mutable_cf_options_.periodic_compaction_seconds = 1000;
UniversalCompactionPicker universal_compaction_picker(ioptions_, &icmp_);
NewVersionStorage(5, kCompactionStyleUniversal);
Add(0, 1U, "150", "200", kFileSize, 0, 500, 550);
Add(2, 2U, "010", "080", kFileSize, 0, 200, 251);
Add(3, 5U, "010", "080", kFileSize, 0, 200, 251);
Add(4, 3U, "301", "350", kFileSize, 0, 101, 150);
Add(4, 6U, "501", "750", kFileSize, 0, 101, 150);
file_map_[2].first->being_compacted = true;
UpdateVersionStorageInfo();
vstorage_->TEST_AddFileMarkedForPeriodicCompaction(0, file_map_[2].first);
std::unique_ptr<Compaction> compaction(
universal_compaction_picker.PickCompaction(
cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_));
ASSERT_TRUE(!compaction ||
compaction->start_level() != compaction->output_level());
}
TEST_F(CompactionPickerTest, NeedsCompactionFIFO) {
NewVersionStorage(1, kCompactionStyleFIFO);
const int kFileCount =

@ -90,6 +90,19 @@ class UniversalCompactionBuilder {
Compaction* PickDeleteTriggeredCompaction();
// Form a compaction from the sorted run indicated by start_index to the
// oldest sorted run.
// The caller is responsible for making sure that those files are not in
// compaction.
Compaction* PickCompactionToOldest(size_t start_index,
CompactionReason compaction_reason);
// Try to pick periodic compaction. The caller should only call it
// if there is at least one file marked for periodic compaction.
// null will be returned if no such a compaction can be formed
// because some files are being compacted.
Compaction* PickPeriodicCompaction();
// Used in universal compaction when the enabled_trivial_move
// option is set. Checks whether there are any overlapping files
// in the input. Returns true if the input files are non
@ -253,6 +266,9 @@ bool UniversalCompactionPicker::NeedsCompaction(
if (vstorage->CompactionScore(kLevel0) >= 1) {
return true;
}
if (!vstorage->FilesMarkedForPeriodicCompaction().empty()) {
return true;
}
if (!vstorage->FilesMarkedForCompaction().empty()) {
return true;
}
@ -358,7 +374,8 @@ Compaction* UniversalCompactionBuilder::PickCompaction() {
CalculateSortedRuns(*vstorage_, ioptions_, mutable_cf_options_);
if (sorted_runs_.size() == 0 ||
(vstorage_->FilesMarkedForCompaction().empty() &&
(vstorage_->FilesMarkedForPeriodicCompaction().empty() &&
vstorage_->FilesMarkedForCompaction().empty() &&
sorted_runs_.size() < (unsigned int)mutable_cf_options_
.level0_file_num_compaction_trigger)) {
ROCKS_LOG_BUFFER(log_buffer_, "[%s] Universal: nothing to do\n",
@ -373,11 +390,19 @@ Compaction* UniversalCompactionBuilder::PickCompaction() {
"[%s] Universal: sorted runs files(%" ROCKSDB_PRIszt "): %s\n",
cf_name_.c_str(), sorted_runs_.size(), vstorage_->LevelSummary(&tmp));
// Check for size amplification first.
Compaction* c = nullptr;
if (sorted_runs_.size() >=
static_cast<size_t>(
mutable_cf_options_.level0_file_num_compaction_trigger)) {
// Periodic compaction has higher priority than other type of compaction
// because it's a hard requirement.
if (!vstorage_->FilesMarkedForPeriodicCompaction().empty()) {
// Always need to do a full compaction for periodic compaction.
c = PickPeriodicCompaction();
}
// Check for size amplification.
if (c == nullptr &&
sorted_runs_.size() >=
static_cast<size_t>(
mutable_cf_options_.level0_file_num_compaction_trigger)) {
if ((c = PickCompactionToReduceSizeAmp()) != nullptr) {
ROCKS_LOG_BUFFER(log_buffer_, "[%s] Universal: compacting for size amp\n",
cf_name_.c_str());
@ -441,7 +466,8 @@ Compaction* UniversalCompactionBuilder::PickCompaction() {
}
if (mutable_cf_options_.compaction_options_universal.allow_trivial_move ==
true) {
true &&
c->compaction_reason() != CompactionReason::kPeriodicCompaction) {
c->set_is_trivial_move(IsInputFilesNonOverlapping(c));
}
@ -815,59 +841,8 @@ Compaction* UniversalCompactionBuilder::PickCompactionToReduceSizeAmp() {
" earliest-file-size %" PRIu64,
cf_name_.c_str(), candidate_size, earliest_file_size);
}
assert(start_index < sorted_runs_.size() - 1);
// Estimate total file size
uint64_t estimated_total_size = 0;
for (size_t loop = start_index; loop < sorted_runs_.size(); loop++) {
estimated_total_size += sorted_runs_[loop].size;
}
uint32_t path_id =
GetPathId(ioptions_, mutable_cf_options_, estimated_total_size);
int start_level = sorted_runs_[start_index].level;
std::vector<CompactionInputFiles> inputs(vstorage_->num_levels());
for (size_t i = 0; i < inputs.size(); ++i) {
inputs[i].level = start_level + static_cast<int>(i);
}
// We always compact all the files, so always compress.
for (size_t loop = start_index; loop < sorted_runs_.size(); loop++) {
auto& picking_sr = sorted_runs_[loop];
if (picking_sr.level == 0) {
FileMetaData* f = picking_sr.file;
inputs[0].files.push_back(f);
} else {
auto& files = inputs[picking_sr.level - start_level].files;
for (auto* f : vstorage_->LevelFiles(picking_sr.level)) {
files.push_back(f);
}
}
char file_num_buf[256];
picking_sr.DumpSizeInfo(file_num_buf, sizeof(file_num_buf), loop);
ROCKS_LOG_BUFFER(log_buffer_, "[%s] Universal: size amp picking %s",
cf_name_.c_str(), file_num_buf);
}
// output files at the bottom most level, unless it's reserved
int output_level = vstorage_->num_levels() - 1;
// last level is reserved for the files ingested behind
if (ioptions_.allow_ingest_behind) {
assert(output_level > 1);
output_level--;
}
return new Compaction(
vstorage_, ioptions_, mutable_cf_options_, std::move(inputs),
output_level,
MaxFileSizeForLevel(mutable_cf_options_, output_level,
kCompactionStyleUniversal),
/* max_grandparent_overlap_bytes */ LLONG_MAX, path_id,
GetCompressionType(ioptions_, vstorage_, mutable_cf_options_,
output_level, 1),
GetCompressionOptions(ioptions_, vstorage_, output_level),
/* max_subcompactions */ 0, /* grandparents */ {}, /* is manual */ false,
score_, false /* deletion_compaction */,
CompactionReason::kUniversalSizeAmplification);
return PickCompactionToOldest(start_index,
CompactionReason::kUniversalSizeAmplification);
}
// Pick files marked for compaction. Typically, files are marked by
@ -987,6 +962,142 @@ Compaction* UniversalCompactionBuilder::PickDeleteTriggeredCompaction() {
score_, false /* deletion_compaction */,
CompactionReason::kFilesMarkedForCompaction);
}
Compaction* UniversalCompactionBuilder::PickCompactionToOldest(
size_t start_index, CompactionReason compaction_reason) {
assert(start_index < sorted_runs_.size() - 1);
// Estimate total file size
uint64_t estimated_total_size = 0;
for (size_t loop = start_index; loop < sorted_runs_.size(); loop++) {
estimated_total_size += sorted_runs_[loop].size;
}
uint32_t path_id =
GetPathId(ioptions_, mutable_cf_options_, estimated_total_size);
int start_level = sorted_runs_[start_index].level;
std::vector<CompactionInputFiles> inputs(vstorage_->num_levels());
for (size_t i = 0; i < inputs.size(); ++i) {
inputs[i].level = start_level + static_cast<int>(i);
}
for (size_t loop = start_index; loop < sorted_runs_.size(); loop++) {
auto& picking_sr = sorted_runs_[loop];
if (picking_sr.level == 0) {
FileMetaData* f = picking_sr.file;
inputs[0].files.push_back(f);
} else {
auto& files = inputs[picking_sr.level - start_level].files;
for (auto* f : vstorage_->LevelFiles(picking_sr.level)) {
files.push_back(f);
}
}
std::string comp_reason_print_string;
if (compaction_reason == CompactionReason::kPeriodicCompaction) {
comp_reason_print_string = "periodic compaction";
} else if (compaction_reason ==
CompactionReason::kUniversalSizeAmplification) {
comp_reason_print_string = "size amp";
} else {
assert(false);
}
char file_num_buf[256];
picking_sr.DumpSizeInfo(file_num_buf, sizeof(file_num_buf), loop);
ROCKS_LOG_BUFFER(log_buffer_, "[%s] Universal: %s picking %s",
cf_name_.c_str(), comp_reason_print_string.c_str(),
file_num_buf);
}
// output files at the bottom most level, unless it's reserved
int output_level = vstorage_->num_levels() - 1;
// last level is reserved for the files ingested behind
if (ioptions_.allow_ingest_behind) {
assert(output_level > 1);
output_level--;
}
// We never check size for
// compaction_options_universal.compression_size_percent,
// because we always compact all the files, so always compress.
return new Compaction(
vstorage_, ioptions_, mutable_cf_options_, std::move(inputs),
output_level,
MaxFileSizeForLevel(mutable_cf_options_, output_level,
kCompactionStyleUniversal),
LLONG_MAX, path_id,
GetCompressionType(ioptions_, vstorage_, mutable_cf_options_, start_level,
1, true /* enable_compression */),
GetCompressionOptions(ioptions_, vstorage_, start_level,
true /* enable_compression */),
/* max_subcompactions */ 0, /* grandparents */ {}, /* is manual */ false,
score_, false /* deletion_compaction */, compaction_reason);
}
Compaction* UniversalCompactionBuilder::PickPeriodicCompaction() {
ROCKS_LOG_BUFFER(log_buffer_, "[%s] Universal: Periodic Compaction",
cf_name_.c_str());
// In universal compaction, sorted runs contain older data are almost always
// generated earlier too. To simplify the problem, we just try to trigger
// a full compaction. We start from the oldest sorted run and include
// all sorted runs, until we hit a sorted already being compacted.
// Since usually the largest (which is usually the oldest) sorted run is
// included anyway, doing a full compaction won't increase write
// amplification much.
// Get some information from marked files to check whether a file is
// included in the compaction.
size_t start_index = sorted_runs_.size();
while (start_index > 0 && !sorted_runs_[start_index - 1].being_compacted) {
start_index--;
}
if (start_index == sorted_runs_.size()) {
return nullptr;
}
// There is a rare corner case where we can't pick up all the files
// because some files are being compacted and we end up with picking files
// but none of them need periodic compaction. Unless we simply recompact
// the last sorted run (either the last level or last L0 file), we would just
// execute the compaction, in order to simplify the logic.
if (start_index == sorted_runs_.size() - 1) {
bool included_file_marked = false;
int start_level = sorted_runs_[start_index].level;
FileMetaData* start_file = sorted_runs_[start_index].file;
for (const std::pair<int, FileMetaData*>& level_file_pair :
vstorage_->FilesMarkedForPeriodicCompaction()) {
if (start_level != 0) {
// Last sorted run is a level
if (start_level == level_file_pair.first) {
included_file_marked = true;
break;
}
} else {
// Last sorted run is a L0 file.
if (start_file == level_file_pair.second) {
included_file_marked = true;
break;
}
}
}
if (!included_file_marked) {
ROCKS_LOG_BUFFER(log_buffer_,
"[%s] Universal: Cannot form a compaction covering file "
"marked for periodic compaction",
cf_name_.c_str());
return nullptr;
}
}
Compaction* c = PickCompactionToOldest(start_index,
CompactionReason::kPeriodicCompaction);
TEST_SYNC_POINT_CALLBACK(
"UniversalCompactionPicker::PickPeriodicCompaction:Return", c);
return c;
}
} // namespace rocksdb
#endif // !ROCKSDB_LITE

@ -41,10 +41,9 @@ class DBTestUniversalCompaction : public DBTestUniversalCompactionBase {
DBTestUniversalCompactionBase("/db_universal_compaction_test") {}
};
class DBTestUniversalDeleteTrigCompaction : public DBTestBase {
class DBTestUniversalCompaction2 : public DBTestBase {
public:
DBTestUniversalDeleteTrigCompaction()
: DBTestBase("/db_universal_compaction_test") {}
DBTestUniversalCompaction2() : DBTestBase("/db_universal_compaction_test2") {}
};
namespace {
@ -1915,7 +1914,7 @@ INSTANTIATE_TEST_CASE_P(DBTestUniversalManualCompactionOutputPathId,
::testing::Combine(::testing::Values(1, 8),
::testing::Bool()));
TEST_F(DBTestUniversalDeleteTrigCompaction, BasicL0toL1) {
TEST_F(DBTestUniversalCompaction2, BasicL0toL1) {
const int kNumKeys = 3000;
const int kWindowSize = 100;
const int kNumDelsTrigger = 90;
@ -1956,7 +1955,7 @@ TEST_F(DBTestUniversalDeleteTrigCompaction, BasicL0toL1) {
ASSERT_GT(NumTableFilesAtLevel(6), 0);
}
TEST_F(DBTestUniversalDeleteTrigCompaction, SingleLevel) {
TEST_F(DBTestUniversalCompaction2, SingleLevel) {
const int kNumKeys = 3000;
const int kWindowSize = 100;
const int kNumDelsTrigger = 90;
@ -1995,7 +1994,7 @@ TEST_F(DBTestUniversalDeleteTrigCompaction, SingleLevel) {
ASSERT_EQ(1, NumTableFilesAtLevel(0));
}
TEST_F(DBTestUniversalDeleteTrigCompaction, MultipleLevels) {
TEST_F(DBTestUniversalCompaction2, MultipleLevels) {
const int kWindowSize = 100;
const int kNumDelsTrigger = 90;
@ -2067,7 +2066,7 @@ TEST_F(DBTestUniversalDeleteTrigCompaction, MultipleLevels) {
ASSERT_GT(NumTableFilesAtLevel(6), 0);
}
TEST_F(DBTestUniversalDeleteTrigCompaction, OverlappingL0) {
TEST_F(DBTestUniversalCompaction2, OverlappingL0) {
const int kWindowSize = 100;
const int kNumDelsTrigger = 90;
@ -2107,7 +2106,7 @@ TEST_F(DBTestUniversalDeleteTrigCompaction, OverlappingL0) {
ASSERT_GT(NumTableFilesAtLevel(6), 0);
}
TEST_F(DBTestUniversalDeleteTrigCompaction, IngestBehind) {
TEST_F(DBTestUniversalCompaction2, IngestBehind) {
const int kNumKeys = 3000;
const int kWindowSize = 100;
const int kNumDelsTrigger = 90;
@ -2150,6 +2149,72 @@ TEST_F(DBTestUniversalDeleteTrigCompaction, IngestBehind) {
ASSERT_GT(NumTableFilesAtLevel(5), 0);
}
TEST_F(DBTestUniversalCompaction2, PeriodicCompaction) {
Options opts = CurrentOptions();
opts.env = env_;
opts.compaction_style = kCompactionStyleUniversal;
opts.level0_file_num_compaction_trigger = 10;
opts.max_open_files = -1;
opts.compaction_options_universal.size_ratio = 10;
opts.compaction_options_universal.min_merge_width = 2;
opts.compaction_options_universal.max_size_amplification_percent = 200;
opts.periodic_compaction_seconds = 48 * 60 * 60; // 2 days
opts.num_levels = 5;
env_->addon_time_.store(0);
Reopen(opts);
int periodic_compactions = 0;
int start_level = -1;
int output_level = -1;
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"UniversalCompactionPicker::PickPeriodicCompaction:Return",
[&](void* arg) {
Compaction* compaction = reinterpret_cast<Compaction*>(arg);
ASSERT_TRUE(arg != nullptr);
ASSERT_TRUE(compaction->compaction_reason() ==
CompactionReason::kPeriodicCompaction);
start_level = compaction->start_level();
output_level = compaction->output_level();
periodic_compactions++;
});
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
// Case 1: Oldest flushed file excceeds periodic compaction threshold.
ASSERT_OK(Put("foo", "bar"));
Flush();
ASSERT_EQ(0, periodic_compactions);
// Move clock forward so that the flushed file would qualify periodic
// compaction.
env_->addon_time_.store(48 * 60 * 60 + 100);
// Another flush would trigger compaction the oldest file.
ASSERT_OK(Put("foo", "bar2"));
Flush();
dbfull()->TEST_WaitForCompact();
ASSERT_EQ(1, periodic_compactions);
ASSERT_EQ(0, start_level);
ASSERT_EQ(4, output_level);
// Case 2: Oldest compacted file excceeds periodic compaction threshold
periodic_compactions = 0;
// A flush doesn't trigger a periodic compaction when threshold not hit
ASSERT_OK(Put("foo", "bar2"));
Flush();
dbfull()->TEST_WaitForCompact();
ASSERT_EQ(0, periodic_compactions);
// After periodic compaction threshold hits, a flush will trigger
// a compaction
ASSERT_OK(Put("foo", "bar2"));
env_->addon_time_.fetch_add(48 * 60 * 60 + 100);
Flush();
dbfull()->TEST_WaitForCompact();
ASSERT_EQ(1, periodic_compactions);
ASSERT_EQ(0, start_level);
ASSERT_EQ(4, output_level);
}
} // namespace rocksdb
#endif // !defined(ROCKSDB_LITE)

@ -312,6 +312,10 @@ class VersionStorageInfo {
return files_marked_for_periodic_compaction_;
}
void TEST_AddFileMarkedForPeriodicCompaction(int level, FileMetaData* f) {
files_marked_for_periodic_compaction_.emplace_back(level, f);
}
// REQUIRES: This version has been saved (see VersionSet::SaveTo)
// REQUIRES: DB mutex held during access
const autovector<std::pair<int, FileMetaData*>>&

Loading…
Cancel
Save