RoundRobin TTL compaction (#10725)

Summary:
For RoundRobin compaction, the data should be mostly sorted per level and within level. Use normal compaction picker for RR until all expired data is compacted.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/10725

Reviewed By: ajkr

Differential Revision: D39771069

Pulled By: jay-zhuang

fbshipit-source-id: 7ccf88d7c093fad5673bda73a7b08cc4757780cd
main
Jay Zhuang 2 years ago committed by Facebook GitHub Bot
parent 626eaa4189
commit f007ad8b4f
  1. 1
      HISTORY.md
  2. 6
      db/compaction/compaction_job.cc
  3. 19
      db/compaction/compaction_picker_level.cc
  4. 236
      db/db_compaction_test.cc
  5. 4
      db/version_set.cc
  6. 3
      include/rocksdb/listener.h

@ -11,6 +11,7 @@
### Performance Improvements ### Performance Improvements
* Try to align the compaction output file boundaries to the next level ones, which can reduce more than 10% compaction load for the default level compaction. The feature is enabled by default, to disable, set `AdvancedColumnFamilyOptions.level_compaction_dynamic_file_size` to false. As a side effect, it can create SSTs larger than the target_file_size (capped at 2x target_file_size) or smaller files. * Try to align the compaction output file boundaries to the next level ones, which can reduce more than 10% compaction load for the default level compaction. The feature is enabled by default, to disable, set `AdvancedColumnFamilyOptions.level_compaction_dynamic_file_size` to false. As a side effect, it can create SSTs larger than the target_file_size (capped at 2x target_file_size) or smaller files.
* Improve RoundRobin TTL compaction, which is going to be the same as normal RoundRobin compaction to move the compaction cursor.
### New Features ### New Features
* Add a new option IOOptions.do_not_recurse that can be used by underlying file systems to skip recursing through sub directories and list only files in GetChildren API. * Add a new option IOOptions.do_not_recurse that can be used by underlying file systems to skip recursing through sub directories and list only files in GetChildren API.

@ -97,6 +97,8 @@ const char* GetCompactionReasonString(CompactionReason compaction_reason) {
return "ChangeTemperature"; return "ChangeTemperature";
case CompactionReason::kForcedBlobGC: case CompactionReason::kForcedBlobGC:
return "ForcedBlobGC"; return "ForcedBlobGC";
case CompactionReason::kRoundRobinTtl:
return "RoundRobinTtl";
case CompactionReason::kNumOfReasons: case CompactionReason::kNumOfReasons:
// fall through // fall through
default: default:
@ -1662,7 +1664,9 @@ Status CompactionJob::InstallCompactionResults(
stats.GetBytes()); stats.GetBytes());
} }
if (compaction->compaction_reason() == CompactionReason::kLevelMaxLevelSize && if ((compaction->compaction_reason() ==
CompactionReason::kLevelMaxLevelSize ||
compaction->compaction_reason() == CompactionReason::kRoundRobinTtl) &&
compaction->immutable_options()->compaction_pri == kRoundRobin) { compaction->immutable_options()->compaction_pri == kRoundRobin) {
int start_level = compaction->start_level(); int start_level = compaction->start_level();
if (start_level > 0) { if (start_level > 0) {

@ -254,6 +254,25 @@ void LevelCompactionBuilder::SetupInitialFiles() {
} }
// TTL Compaction // TTL Compaction
if (ioptions_.compaction_pri == kRoundRobin &&
!vstorage_->ExpiredTtlFiles().empty()) {
auto expired_files = vstorage_->ExpiredTtlFiles();
// the expired files list should already be sorted by level
start_level_ = expired_files.front().first;
#ifndef NDEBUG
for (const auto& file : expired_files) {
assert(start_level_ <= file.first);
}
#endif
if (start_level_ > 0) {
output_level_ = start_level_ + 1;
if (PickFileToCompact()) {
compaction_reason_ = CompactionReason::kRoundRobinTtl;
return;
}
}
}
PickFileToCompact(vstorage_->ExpiredTtlFiles(), true); PickFileToCompact(vstorage_->ExpiredTtlFiles(), true);
if (!start_level_inputs_.empty()) { if (!start_level_inputs_.empty()) {
compaction_reason_ = CompactionReason::kTtl; compaction_reason_ = CompactionReason::kTtl;

@ -3824,6 +3824,242 @@ TEST_F(DBCompactionTest, NoCompactBottomLevelFilesWithDeletions) {
} }
} }
TEST_F(DBCompactionTest, RoundRobinTtlCompactionNormal) {
Options options = CurrentOptions();
options.compression = kNoCompression;
options.level0_file_num_compaction_trigger = 20;
options.ttl = 24 * 60 * 60; // 24 hours
options.compaction_pri = kRoundRobin;
env_->now_cpu_count_.store(0);
env_->SetMockSleep();
options.env = env_;
// add a small second for each wait time, to make sure the file is expired
int small_seconds = 1;
std::atomic_int ttl_compactions{0};
std::atomic_int round_robin_ttl_compactions{0};
std::atomic_int other_compactions{0};
SyncPoint::GetInstance()->SetCallBack(
"LevelCompactionPicker::PickCompaction:Return", [&](void* arg) {
Compaction* compaction = reinterpret_cast<Compaction*>(arg);
auto compaction_reason = compaction->compaction_reason();
if (compaction_reason == CompactionReason::kTtl) {
ttl_compactions++;
} else if (compaction_reason == CompactionReason::kRoundRobinTtl) {
round_robin_ttl_compactions++;
} else {
other_compactions++;
}
});
SyncPoint::GetInstance()->EnableProcessing();
DestroyAndReopen(options);
// Setup the files from lower level to up level, each file is 1 hour's older
// than the next one.
// create 10 files on the last level (L6)
for (int i = 0; i < 10; i++) {
for (int j = 0; j < 100; j++) {
ASSERT_OK(Put(Key(i * 100 + j), "value" + std::to_string(i * 100 + j)));
}
ASSERT_OK(Flush());
env_->MockSleepForSeconds(60 * 60); // generate 1 file per hour
}
MoveFilesToLevel(6);
// create 5 files on L5
for (int i = 0; i < 5; i++) {
for (int j = 0; j < 200; j++) {
ASSERT_OK(Put(Key(i * 200 + j), "value" + std::to_string(i * 200 + j)));
}
ASSERT_OK(Flush());
env_->MockSleepForSeconds(60 * 60);
}
MoveFilesToLevel(5);
// create 3 files on L4
for (int i = 0; i < 3; i++) {
for (int j = 0; j < 300; j++) {
ASSERT_OK(Put(Key(i * 300 + j), "value" + std::to_string(i * 300 + j)));
}
ASSERT_OK(Flush());
env_->MockSleepForSeconds(60 * 60);
}
MoveFilesToLevel(4);
// The LSM tree should be like:
// L4: [0, 299], [300, 599], [600, 899]
// L5: [0, 199] [200, 399]...............[800, 999]
// L6: [0,99][100,199][200,299][300,399]...............[800,899][900,999]
ASSERT_EQ("0,0,0,0,3,5,10", FilesPerLevel());
// make sure the first L5 file is expired
env_->MockSleepForSeconds(16 * 60 * 60 + small_seconds++);
// trigger TTL compaction
ASSERT_OK(Put(Key(4), "value" + std::to_string(1)));
ASSERT_OK(Put(Key(5), "value" + std::to_string(1)));
ASSERT_OK(Flush());
ASSERT_OK(dbfull()->TEST_WaitForCompact());
// verify there's a RoundRobin TTL compaction
ASSERT_EQ(1, round_robin_ttl_compactions);
round_robin_ttl_compactions = 0;
// expire 2 more files
env_->MockSleepForSeconds(2 * 60 * 60 + small_seconds++);
// trigger TTL compaction
ASSERT_OK(Put(Key(4), "value" + std::to_string(2)));
ASSERT_OK(Put(Key(5), "value" + std::to_string(2)));
ASSERT_OK(Flush());
ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_EQ(2, round_robin_ttl_compactions);
round_robin_ttl_compactions = 0;
// expire 4 more files, 2 out of 3 files on L4 are expired
env_->MockSleepForSeconds(4 * 60 * 60 + small_seconds++);
// trigger TTL compaction
ASSERT_OK(Put(Key(6), "value" + std::to_string(3)));
ASSERT_OK(Put(Key(7), "value" + std::to_string(3)));
ASSERT_OK(Flush());
ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_EQ(1, NumTableFilesAtLevel(4));
ASSERT_EQ(0, NumTableFilesAtLevel(5));
ASSERT_GT(round_robin_ttl_compactions, 0);
round_robin_ttl_compactions = 0;
// make the first L0 file expired, which triggers a normal TTL compaction
// instead of roundrobin TTL compaction, it will also include an extra file
// from L0 because of overlap
ASSERT_EQ(0, ttl_compactions);
env_->MockSleepForSeconds(19 * 60 * 60 + small_seconds++);
// trigger TTL compaction
ASSERT_OK(Put(Key(6), "value" + std::to_string(4)));
ASSERT_OK(Put(Key(7), "value" + std::to_string(4)));
ASSERT_OK(Flush());
ASSERT_OK(dbfull()->TEST_WaitForCompact());
// L0 -> L1 compaction is normal TTL compaction, L1 -> next levels compactions
// are RoundRobin TTL compaction.
ASSERT_GT(ttl_compactions, 0);
ttl_compactions = 0;
ASSERT_GT(round_robin_ttl_compactions, 0);
round_robin_ttl_compactions = 0;
// All files are expired, so only the last level has data
env_->MockSleepForSeconds(24 * 60 * 60);
// trigger TTL compaction
ASSERT_OK(Put(Key(6), "value" + std::to_string(4)));
ASSERT_OK(Put(Key(7), "value" + std::to_string(4)));
ASSERT_OK(Flush());
ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_EQ("0,0,0,0,0,0,2", FilesPerLevel());
ASSERT_GT(ttl_compactions, 0);
ttl_compactions = 0;
ASSERT_GT(round_robin_ttl_compactions, 0);
round_robin_ttl_compactions = 0;
ASSERT_EQ(0, other_compactions);
}
TEST_F(DBCompactionTest, RoundRobinTtlCompactionUnsortedTime) {
// This is to test the case that the RoundRobin compaction cursor not pointing
// to the oldest file, RoundRobin compaction should still compact the file
// after cursor until all expired files are compacted.
Options options = CurrentOptions();
options.compression = kNoCompression;
options.level0_file_num_compaction_trigger = 20;
options.ttl = 24 * 60 * 60; // 24 hours
options.compaction_pri = kRoundRobin;
env_->now_cpu_count_.store(0);
env_->SetMockSleep();
options.env = env_;
std::atomic_int ttl_compactions{0};
std::atomic_int round_robin_ttl_compactions{0};
std::atomic_int other_compactions{0};
SyncPoint::GetInstance()->SetCallBack(
"LevelCompactionPicker::PickCompaction:Return", [&](void* arg) {
Compaction* compaction = reinterpret_cast<Compaction*>(arg);
auto compaction_reason = compaction->compaction_reason();
if (compaction_reason == CompactionReason::kTtl) {
ttl_compactions++;
} else if (compaction_reason == CompactionReason::kRoundRobinTtl) {
round_robin_ttl_compactions++;
} else {
other_compactions++;
}
});
SyncPoint::GetInstance()->EnableProcessing();
DestroyAndReopen(options);
// create 10 files on the last level (L6)
for (int i = 0; i < 10; i++) {
for (int j = 0; j < 100; j++) {
ASSERT_OK(Put(Key(i * 100 + j), "value" + std::to_string(i * 100 + j)));
}
ASSERT_OK(Flush());
env_->MockSleepForSeconds(60 * 60); // generate 1 file per hour
}
MoveFilesToLevel(6);
// create 5 files on L5
for (int i = 0; i < 5; i++) {
for (int j = 0; j < 200; j++) {
ASSERT_OK(Put(Key(i * 200 + j), "value" + std::to_string(i * 200 + j)));
}
ASSERT_OK(Flush());
env_->MockSleepForSeconds(60 * 60); // 1 hour
}
MoveFilesToLevel(5);
// The LSM tree should be like:
// L5: [0, 199] [200, 399] [400,599] [600,799] [800, 999]
// L6: [0,99][100,199][200,299][300,399]....................[800,899][900,999]
ASSERT_EQ("0,0,0,0,0,5,10", FilesPerLevel());
// point the compaction cursor to the 4th file on L5
VersionSet* const versions = dbfull()->GetVersionSet();
assert(versions);
ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault();
ASSERT_NE(cfd, nullptr);
Version* const current = cfd->current();
ASSERT_NE(current, nullptr);
VersionStorageInfo* storage_info = current->storage_info();
ASSERT_NE(storage_info, nullptr);
const InternalKey split_cursor = InternalKey(Key(600), 100000, kTypeValue);
storage_info->AddCursorForOneLevel(5, split_cursor);
// make the first file on L5 expired, there should be 3 TTL compactions:
// 4th one, 5th one, then 1st one.
env_->MockSleepForSeconds(19 * 60 * 60 + 1);
// trigger TTL compaction
ASSERT_OK(Put(Key(6), "value" + std::to_string(4)));
ASSERT_OK(Put(Key(7), "value" + std::to_string(4)));
ASSERT_OK(Flush());
ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_EQ(2, NumTableFilesAtLevel(5));
ASSERT_EQ(3, round_robin_ttl_compactions);
ASSERT_EQ(0, ttl_compactions);
ASSERT_EQ(0, other_compactions);
}
TEST_F(DBCompactionTest, LevelCompactExpiredTtlFiles) { TEST_F(DBCompactionTest, LevelCompactExpiredTtlFiles) {
const int kNumKeysPerFile = 32; const int kNumKeysPerFile = 32;
const int kNumLevelFiles = 2; const int kNumLevelFiles = 2;

@ -4561,6 +4561,10 @@ std::string Version::DebugString(bool hex, bool print_stats) const {
AppendNumberTo(&r, level); AppendNumberTo(&r, level);
r.append(" --- version# "); r.append(" --- version# ");
AppendNumberTo(&r, version_number_); AppendNumberTo(&r, version_number_);
if (storage_info_.compact_cursor_[level].Valid()) {
r.append(" --- compact_cursor: ");
r.append(storage_info_.compact_cursor_[level].DebugString(hex));
}
r.append(" ---\n"); r.append(" ---\n");
const std::vector<FileMetaData*>& files = storage_info_.files_[level]; const std::vector<FileMetaData*>& files = storage_info_.files_[level];
for (size_t i = 0; i < files.size(); i++) { for (size_t i = 0; i < files.size(); i++) {

@ -148,6 +148,9 @@ enum class CompactionReason : int {
kChangeTemperature, kChangeTemperature,
// Compaction scheduled to force garbage collection of blob files // Compaction scheduled to force garbage collection of blob files
kForcedBlobGC, kForcedBlobGC,
// A special TTL compaction for RoundRobin policy, which basically the same as
// kLevelMaxLevelSize, but the goal is to compact TTLed files.
kRoundRobinTtl,
// total number of compaction reasons, new reasons must be added above this. // total number of compaction reasons, new reasons must be added above this.
kNumOfReasons, kNumOfReasons,
}; };

Loading…
Cancel
Save