diff --git a/HISTORY.md b/HISTORY.md index 2cb3b6201..93f9f1d7f 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -11,6 +11,7 @@ ### Performance Improvements * Try to align the compaction output file boundaries to the next level ones, which can reduce more than 10% compaction load for the default level compaction. The feature is enabled by default, to disable, set `AdvancedColumnFamilyOptions.level_compaction_dynamic_file_size` to false. As a side effect, it can create SSTs larger than the target_file_size (capped at 2x target_file_size) or smaller files. +* Improve RoundRobin TTL compaction, which will now move the compaction cursor in the same way as normal RoundRobin compaction. ### New Features * Add a new option IOOptions.do_not_recurse that can be used by underlying file systems to skip recursing through sub directories and list only files in GetChildren API. diff --git a/db/compaction/compaction_job.cc b/db/compaction/compaction_job.cc index 9ec21f03c..442e302cb 100644 --- a/db/compaction/compaction_job.cc +++ b/db/compaction/compaction_job.cc @@ -97,6 +97,8 @@ const char* GetCompactionReasonString(CompactionReason compaction_reason) { return "ChangeTemperature"; case CompactionReason::kForcedBlobGC: return "ForcedBlobGC"; + case CompactionReason::kRoundRobinTtl: + return "RoundRobinTtl"; case CompactionReason::kNumOfReasons: // fall through default: @@ -1662,7 +1664,9 @@ Status CompactionJob::InstallCompactionResults( stats.GetBytes()); } - if (compaction->compaction_reason() == CompactionReason::kLevelMaxLevelSize && + if ((compaction->compaction_reason() == + CompactionReason::kLevelMaxLevelSize || + compaction->compaction_reason() == CompactionReason::kRoundRobinTtl) && compaction->immutable_options()->compaction_pri == kRoundRobin) { int start_level = compaction->start_level(); if (start_level > 0) { diff --git a/db/compaction/compaction_picker_level.cc b/db/compaction/compaction_picker_level.cc index 271b59bff..2280a5cec 100644 ---
a/db/compaction/compaction_picker_level.cc +++ b/db/compaction/compaction_picker_level.cc @@ -254,6 +254,25 @@ void LevelCompactionBuilder::SetupInitialFiles() { } // TTL Compaction + if (ioptions_.compaction_pri == kRoundRobin && + !vstorage_->ExpiredTtlFiles().empty()) { + auto expired_files = vstorage_->ExpiredTtlFiles(); + // the expired files list should already be sorted by level + start_level_ = expired_files.front().first; +#ifndef NDEBUG + for (const auto& file : expired_files) { + assert(start_level_ <= file.first); + } +#endif + if (start_level_ > 0) { + output_level_ = start_level_ + 1; + if (PickFileToCompact()) { + compaction_reason_ = CompactionReason::kRoundRobinTtl; + return; + } + } + } + PickFileToCompact(vstorage_->ExpiredTtlFiles(), true); if (!start_level_inputs_.empty()) { compaction_reason_ = CompactionReason::kTtl; diff --git a/db/db_compaction_test.cc b/db/db_compaction_test.cc index d4b96c757..15a7ce9a6 100644 --- a/db/db_compaction_test.cc +++ b/db/db_compaction_test.cc @@ -3824,6 +3824,242 @@ TEST_F(DBCompactionTest, NoCompactBottomLevelFilesWithDeletions) { } } +TEST_F(DBCompactionTest, RoundRobinTtlCompactionNormal) { + Options options = CurrentOptions(); + options.compression = kNoCompression; + options.level0_file_num_compaction_trigger = 20; + options.ttl = 24 * 60 * 60; // 24 hours + options.compaction_pri = kRoundRobin; + env_->now_cpu_count_.store(0); + env_->SetMockSleep(); + options.env = env_; + + // add a small second for each wait time, to make sure the file is expired + int small_seconds = 1; + + std::atomic_int ttl_compactions{0}; + std::atomic_int round_robin_ttl_compactions{0}; + std::atomic_int other_compactions{0}; + + SyncPoint::GetInstance()->SetCallBack( + "LevelCompactionPicker::PickCompaction:Return", [&](void* arg) { + Compaction* compaction = reinterpret_cast(arg); + auto compaction_reason = compaction->compaction_reason(); + if (compaction_reason == CompactionReason::kTtl) { + ttl_compactions++; + } else if 
(compaction_reason == CompactionReason::kRoundRobinTtl) { + round_robin_ttl_compactions++; + } else { + other_compactions++; + } + }); + SyncPoint::GetInstance()->EnableProcessing(); + + DestroyAndReopen(options); + + // Setup the files from lower level to up level, each file is 1 hour's older + // than the next one. + // create 10 files on the last level (L6) + for (int i = 0; i < 10; i++) { + for (int j = 0; j < 100; j++) { + ASSERT_OK(Put(Key(i * 100 + j), "value" + std::to_string(i * 100 + j))); + } + ASSERT_OK(Flush()); + env_->MockSleepForSeconds(60 * 60); // generate 1 file per hour + } + MoveFilesToLevel(6); + + // create 5 files on L5 + for (int i = 0; i < 5; i++) { + for (int j = 0; j < 200; j++) { + ASSERT_OK(Put(Key(i * 200 + j), "value" + std::to_string(i * 200 + j))); + } + ASSERT_OK(Flush()); + env_->MockSleepForSeconds(60 * 60); + } + MoveFilesToLevel(5); + + // create 3 files on L4 + for (int i = 0; i < 3; i++) { + for (int j = 0; j < 300; j++) { + ASSERT_OK(Put(Key(i * 300 + j), "value" + std::to_string(i * 300 + j))); + } + ASSERT_OK(Flush()); + env_->MockSleepForSeconds(60 * 60); + } + MoveFilesToLevel(4); + + // The LSM tree should be like: + // L4: [0, 299], [300, 599], [600, 899] + // L5: [0, 199] [200, 399]...............[800, 999] + // L6: [0,99][100,199][200,299][300,399]...............[800,899][900,999] + ASSERT_EQ("0,0,0,0,3,5,10", FilesPerLevel()); + + // make sure the first L5 file is expired + env_->MockSleepForSeconds(16 * 60 * 60 + small_seconds++); + + // trigger TTL compaction + ASSERT_OK(Put(Key(4), "value" + std::to_string(1))); + ASSERT_OK(Put(Key(5), "value" + std::to_string(1))); + ASSERT_OK(Flush()); + + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + + // verify there's a RoundRobin TTL compaction + ASSERT_EQ(1, round_robin_ttl_compactions); + round_robin_ttl_compactions = 0; + + // expire 2 more files + env_->MockSleepForSeconds(2 * 60 * 60 + small_seconds++); + // trigger TTL compaction + ASSERT_OK(Put(Key(4), "value" + 
std::to_string(2))); + ASSERT_OK(Put(Key(5), "value" + std::to_string(2))); + ASSERT_OK(Flush()); + + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + + ASSERT_EQ(2, round_robin_ttl_compactions); + round_robin_ttl_compactions = 0; + + // expire 4 more files, 2 out of 3 files on L4 are expired + env_->MockSleepForSeconds(4 * 60 * 60 + small_seconds++); + // trigger TTL compaction + ASSERT_OK(Put(Key(6), "value" + std::to_string(3))); + ASSERT_OK(Put(Key(7), "value" + std::to_string(3))); + ASSERT_OK(Flush()); + + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + + ASSERT_EQ(1, NumTableFilesAtLevel(4)); + ASSERT_EQ(0, NumTableFilesAtLevel(5)); + + ASSERT_GT(round_robin_ttl_compactions, 0); + round_robin_ttl_compactions = 0; + + // make the first L0 file expired, which triggers a normal TTL compaction + // instead of roundrobin TTL compaction, it will also include an extra file + // from L0 because of overlap + ASSERT_EQ(0, ttl_compactions); + env_->MockSleepForSeconds(19 * 60 * 60 + small_seconds++); + + // trigger TTL compaction + ASSERT_OK(Put(Key(6), "value" + std::to_string(4))); + ASSERT_OK(Put(Key(7), "value" + std::to_string(4))); + ASSERT_OK(Flush()); + + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + + // L0 -> L1 compaction is normal TTL compaction, L1 -> next levels compactions + // are RoundRobin TTL compaction. 
+ ASSERT_GT(ttl_compactions, 0); + ttl_compactions = 0; + ASSERT_GT(round_robin_ttl_compactions, 0); + round_robin_ttl_compactions = 0; + + // All files are expired, so only the last level has data + env_->MockSleepForSeconds(24 * 60 * 60); + // trigger TTL compaction + ASSERT_OK(Put(Key(6), "value" + std::to_string(4))); + ASSERT_OK(Put(Key(7), "value" + std::to_string(4))); + ASSERT_OK(Flush()); + + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + ASSERT_EQ("0,0,0,0,0,0,2", FilesPerLevel()); + + ASSERT_GT(ttl_compactions, 0); + ttl_compactions = 0; + ASSERT_GT(round_robin_ttl_compactions, 0); + round_robin_ttl_compactions = 0; + + ASSERT_EQ(0, other_compactions); +} + +TEST_F(DBCompactionTest, RoundRobinTtlCompactionUnsortedTime) { + // This is to test the case that the RoundRobin compaction cursor not pointing + // to the oldest file, RoundRobin compaction should still compact the file + // after cursor until all expired files are compacted. + Options options = CurrentOptions(); + options.compression = kNoCompression; + options.level0_file_num_compaction_trigger = 20; + options.ttl = 24 * 60 * 60; // 24 hours + options.compaction_pri = kRoundRobin; + env_->now_cpu_count_.store(0); + env_->SetMockSleep(); + options.env = env_; + + std::atomic_int ttl_compactions{0}; + std::atomic_int round_robin_ttl_compactions{0}; + std::atomic_int other_compactions{0}; + + SyncPoint::GetInstance()->SetCallBack( + "LevelCompactionPicker::PickCompaction:Return", [&](void* arg) { + Compaction* compaction = reinterpret_cast(arg); + auto compaction_reason = compaction->compaction_reason(); + if (compaction_reason == CompactionReason::kTtl) { + ttl_compactions++; + } else if (compaction_reason == CompactionReason::kRoundRobinTtl) { + round_robin_ttl_compactions++; + } else { + other_compactions++; + } + }); + SyncPoint::GetInstance()->EnableProcessing(); + + DestroyAndReopen(options); + + // create 10 files on the last level (L6) + for (int i = 0; i < 10; i++) { + for (int j = 0; j < 
100; j++) { + ASSERT_OK(Put(Key(i * 100 + j), "value" + std::to_string(i * 100 + j))); + } + ASSERT_OK(Flush()); + env_->MockSleepForSeconds(60 * 60); // generate 1 file per hour + } + MoveFilesToLevel(6); + + // create 5 files on L5 + for (int i = 0; i < 5; i++) { + for (int j = 0; j < 200; j++) { + ASSERT_OK(Put(Key(i * 200 + j), "value" + std::to_string(i * 200 + j))); + } + ASSERT_OK(Flush()); + env_->MockSleepForSeconds(60 * 60); // 1 hour + } + MoveFilesToLevel(5); + + // The LSM tree should be like: + // L5: [0, 199] [200, 399] [400,599] [600,799] [800, 999] + // L6: [0,99][100,199][200,299][300,399]....................[800,899][900,999] + ASSERT_EQ("0,0,0,0,0,5,10", FilesPerLevel()); + + // point the compaction cursor to the 4th file on L5 + VersionSet* const versions = dbfull()->GetVersionSet(); + assert(versions); + ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault(); + ASSERT_NE(cfd, nullptr); + Version* const current = cfd->current(); + ASSERT_NE(current, nullptr); + VersionStorageInfo* storage_info = current->storage_info(); + ASSERT_NE(storage_info, nullptr); + const InternalKey split_cursor = InternalKey(Key(600), 100000, kTypeValue); + storage_info->AddCursorForOneLevel(5, split_cursor); + + // make the first file on L5 expired, there should be 3 TTL compactions: + // 4th one, 5th one, then 1st one. 
+ env_->MockSleepForSeconds(19 * 60 * 60 + 1); + // trigger TTL compaction + ASSERT_OK(Put(Key(6), "value" + std::to_string(4))); + ASSERT_OK(Put(Key(7), "value" + std::to_string(4))); + ASSERT_OK(Flush()); + + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + ASSERT_EQ(2, NumTableFilesAtLevel(5)); + + ASSERT_EQ(3, round_robin_ttl_compactions); + ASSERT_EQ(0, ttl_compactions); + ASSERT_EQ(0, other_compactions); +} + TEST_F(DBCompactionTest, LevelCompactExpiredTtlFiles) { const int kNumKeysPerFile = 32; const int kNumLevelFiles = 2; diff --git a/db/version_set.cc b/db/version_set.cc index a3e8ac944..ae8a30a28 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -4561,6 +4561,10 @@ std::string Version::DebugString(bool hex, bool print_stats) const { AppendNumberTo(&r, level); r.append(" --- version# "); AppendNumberTo(&r, version_number_); + if (storage_info_.compact_cursor_[level].Valid()) { + r.append(" --- compact_cursor: "); + r.append(storage_info_.compact_cursor_[level].DebugString(hex)); + } r.append(" ---\n"); const std::vector& files = storage_info_.files_[level]; for (size_t i = 0; i < files.size(); i++) { diff --git a/include/rocksdb/listener.h b/include/rocksdb/listener.h index 5f6f89059..8644fcf3f 100644 --- a/include/rocksdb/listener.h +++ b/include/rocksdb/listener.h @@ -148,6 +148,9 @@ enum class CompactionReason : int { kChangeTemperature, // Compaction scheduled to force garbage collection of blob files kForcedBlobGC, + // A special TTL compaction for RoundRobin policy, which is basically the same + // as kLevelMaxLevelSize, but its goal is to compact TTL-expired files. + kRoundRobinTtl, // total number of compaction reasons, new reasons must be added above this. kNumOfReasons, };