diff --git a/HISTORY.md b/HISTORY.md
index a36e03ed5..9b96e67ba 100644
--- a/HISTORY.md
+++ b/HISTORY.md
@@ -3,6 +3,7 @@
 ### New Features
 * `DeleteRange()` now supports user-defined timestamp.
 * Provide support for async_io with tailing iterators when ReadOptions.tailing is enabled during scans.
+* Tiered Storage: allow data moving up from the last level to the penultimate level if the input level is penultimate level or above.
 
 ### Bug Fixes
 * Fix a bug in io_uring_prep_cancel in AbortIO API for posix which expects sqe->addr to match with read request submitted and wrong paramter was being passed.
diff --git a/db/compaction/compaction.cc b/db/compaction/compaction.cc
index 81bdcbe7a..31108ab85 100644
--- a/db/compaction/compaction.cc
+++ b/db/compaction/compaction.cc
@@ -322,9 +322,13 @@ void Compaction::PopulatePenultimateLevelOutputRange() {
     return;
   }
 
+  int exclude_level =
+      immutable_options_.compaction_style == kCompactionStyleUniversal
+          ? kInvalidLevel
+          : number_levels_ - 1;
   GetBoundaryKeys(input_vstorage_, inputs_,
                   &penultimate_level_smallest_user_key_,
-                  &penultimate_level_largest_user_key_, number_levels_ - 1);
+                  &penultimate_level_largest_user_key_, exclude_level);
 }
 
 Compaction::~Compaction() {
diff --git a/db/compaction/compaction_picker.cc b/db/compaction/compaction_picker.cc
index f0f4d964b..fde3dfcee 100644
--- a/db/compaction/compaction_picker.cc
+++ b/db/compaction/compaction_picker.cc
@@ -314,12 +314,19 @@ bool CompactionPicker::FilesRangeOverlapWithCompaction(
   int penultimate_level =
       Compaction::EvaluatePenultimateLevel(ioptions_, start_level, level);
   if (penultimate_level != Compaction::kInvalidLevel) {
-    InternalKey penultimate_smallest, penultimate_largest;
-    GetRange(inputs, &penultimate_smallest, &penultimate_largest, level);
-    if (RangeOverlapWithCompaction(penultimate_smallest.user_key(),
-                                   penultimate_largest.user_key(),
-                                   penultimate_level)) {
-      return true;
+    if (ioptions_.compaction_style == kCompactionStyleUniversal) {
+      if (RangeOverlapWithCompaction(smallest.user_key(), largest.user_key(),
+                                     penultimate_level)) {
+        return true;
+      }
+    } else {
+      InternalKey penultimate_smallest, penultimate_largest;
+      GetRange(inputs, &penultimate_smallest, &penultimate_largest, level);
+      if (RangeOverlapWithCompaction(penultimate_smallest.user_key(),
+                                     penultimate_largest.user_key(),
+                                     penultimate_level)) {
+        return true;
+      }
     }
   }
 
diff --git a/db/compaction/compaction_picker_test.cc b/db/compaction/compaction_picker_test.cc
index d39eb63eb..dea7ea37b 100644
--- a/db/compaction/compaction_picker_test.cc
+++ b/db/compaction/compaction_picker_test.cc
@@ -3656,6 +3656,63 @@ TEST_P(PerKeyPlacementCompactionPickerTest, NormalCompactionOverlapUniversal) {
       input_files, 5));
 }
 
+TEST_P(PerKeyPlacementCompactionPickerTest, PenultimateOverlapUniversal) {
+  // This test makes sure the tiered compaction locks the whole range of
+  // both the output level and the penultimate level
+  if (enable_per_key_placement_) {
+    ioptions_.preclude_last_level_data_seconds = 10000;
+  }
+
+  int num_levels = ioptions_.num_levels;
+  ioptions_.compaction_style = kCompactionStyleUniversal;
+  UniversalCompactionPicker universal_compaction_picker(ioptions_, &icmp_);
+  NewVersionStorage(num_levels, kCompactionStyleUniversal);
+
+  // L4: [200, 220] [230, 250] [360, 380]
+  // L5:
+  // L6: [101, 351]
+  Add(4, 40U, "200", "220", 60000000U);
+  Add(4, 41U, "230", "250", 60000000U);
+  Add(4, 42U, "360", "380", 60000000U);
+  Add(6, 50U, "101", "351", 60000000U);
+  UpdateVersionStorageInfo();
+
+  // the existing compaction is the 1st L4 file + L6 file
+  // then compaction of the 2nd L4 file to L5 (penultimate level) is overlapped
+  // when the tiered compaction feature is on.
+  CompactionOptions comp_options;
+  std::unordered_set<uint64_t> input_set;
+  input_set.insert(40);
+  input_set.insert(50);
+  std::vector<CompactionInputFiles> input_files;
+  ASSERT_OK(universal_compaction_picker.GetCompactionInputsFromFileNumbers(
+      &input_files, &input_set, vstorage_.get(), comp_options));
+
+  std::unique_ptr<Compaction> comp1(universal_compaction_picker.CompactFiles(
+      comp_options, input_files, 6, vstorage_.get(), mutable_cf_options_,
+      mutable_db_options_, 0));
+
+  input_set.clear();
+  input_files.clear();
+  input_set.insert(41);
+  ASSERT_OK(universal_compaction_picker.GetCompactionInputsFromFileNumbers(
+      &input_files, &input_set, vstorage_.get(), comp_options));
+
+  ASSERT_EQ(enable_per_key_placement_,
+            universal_compaction_picker.FilesRangeOverlapWithCompaction(
+                input_files, 5));
+
+  // compacting the 3rd L4 file is always safe:
+  input_set.clear();
+  input_files.clear();
+  input_set.insert(42);
+  ASSERT_OK(universal_compaction_picker.GetCompactionInputsFromFileNumbers(
+      &input_files, &input_set, vstorage_.get(), comp_options));
+
+  ASSERT_FALSE(universal_compaction_picker.FilesRangeOverlapWithCompaction(
+      input_files, 5));
+}
+
 INSTANTIATE_TEST_CASE_P(PerKeyPlacementCompactionPickerTest,
                         PerKeyPlacementCompactionPickerTest,
                         ::testing::Bool());
diff --git a/db/compaction/tiered_compaction_test.cc b/db/compaction/tiered_compaction_test.cc
index 53e5a2639..864d281cd 100644
--- a/db/compaction/tiered_compaction_test.cc
+++ b/db/compaction/tiered_compaction_test.cc
@@ -11,6 +11,8 @@
 #include "db/db_test_util.h"
 #include "port/stack_trace.h"
 #include "rocksdb/listener.h"
+#include "rocksdb/utilities/debug.h"
+#include "test_util/mock_time_env.h"
 
 namespace ROCKSDB_NAMESPACE {
 
@@ -416,36 +418,18 @@ TEST_P(TieredCompactionTest, RangeBasedTieredStorageUniversal) {
   ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
   ASSERT_EQ("0,0,0,0,0,0,1", FilesPerLevel());
 
-  // Add 2 keys in higher level, but in separated files, the keys within that
-  // range should be moved up to the penultimate level
+  // Add 2 keys in higher level, but in separated files, all keys can be moved
+  // up if they're hot
   ASSERT_OK(Put(Key(0), "value" + std::to_string(0)));
   ASSERT_OK(Flush());
   ASSERT_OK(Put(Key(50), "value" + std::to_string(0)));
   ASSERT_OK(Flush());
 
-  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
-  ASSERT_EQ("0,0,0,0,0,1,1", FilesPerLevel());
-
-  // Add an SST with a key range cover all the data we want to move from the
-  // last level to the penultimate level
-  ASSERT_OK(Put(Key(0), "value" + std::to_string(0)));
-  ASSERT_OK(Put(Key(99), "value" + std::to_string(0)));
-  ASSERT_OK(Flush());
-
-  ResetAllStats(expect_stats, expect_pl_stats);
-
   ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
   ASSERT_EQ("0,0,0,0,0,1", FilesPerLevel());
+  ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0);
   ASSERT_EQ(GetSstSizeHelper(Temperature::kCold), 0);
 
-  last_stats.Add(kBasicCompStats);
-  last_stats.ResetCompactionReason(CompactionReason::kManualCompaction);
-  last_stats.bytes_read_output_level = kHasValue;
-  last_stats.num_input_files_in_output_level = kHasValue;
-  expect_pl_stats.Add(kBasicPerKeyPlacementCompStats);
-  expect_pl_stats.ResetCompactionReason(CompactionReason::kManualCompaction);
-  VerifyCompactionStats(expect_stats, expect_pl_stats);
-
   // change to only 1 key cold, to test compaction could stop even it matches
   // size amp compaction threshold
   {
@@ -1215,6 +1199,241 @@ TEST_P(TieredCompactionTest, RangeBasedTieredStorageLevel) {
 INSTANTIATE_TEST_CASE_P(TieredCompactionTest, TieredCompactionTest,
                         testing::Bool());
 
+class PrecludeLastLevelTest : public DBTestBase {
+ public:
+  PrecludeLastLevelTest()
+      : DBTestBase("preclude_last_level_test", /*env_do_fsync=*/false) {
+    mock_clock_ = std::make_shared<MockSystemClock>(env_->GetSystemClock());
+    mock_env_ = std::make_unique<CompositeEnvWrapper>(env_, mock_clock_);
+  }
+
+ protected:
+  std::unique_ptr<Env> mock_env_;
+  std::shared_ptr<MockSystemClock> mock_clock_;
+
+  void SetUp() override {
+    mock_clock_->InstallTimedWaitFixCallback();
+    SyncPoint::GetInstance()->SetCallBack(
+        "DBImpl::StartPeriodicTaskScheduler:Init", [&](void* arg) {
+          auto periodic_task_scheduler_ptr =
+              reinterpret_cast<PeriodicTaskScheduler*>(arg);
+          periodic_task_scheduler_ptr->TEST_OverrideTimer(mock_clock_.get());
+        });
+    mock_clock_->SetCurrentTime(0);
+  }
+};
+
+TEST_F(PrecludeLastLevelTest, MigrationFromPreserveTimeManualCompaction) {
+  const int kNumTrigger = 4;
+  const int kNumLevels = 7;
+  const int kNumKeys = 100;
+  const int kKeyPerSec = 10;
+
+  Options options = CurrentOptions();
+  options.compaction_style = kCompactionStyleUniversal;
+  options.preserve_internal_time_seconds = 10000;
+  options.env = mock_env_.get();
+  options.level0_file_num_compaction_trigger = kNumTrigger;
+  options.num_levels = kNumLevels;
+  DestroyAndReopen(options);
+
+  // pass some time first, otherwise the write time of the first few keys would
+  // be zero, and internally zero has special meaning: kUnknownSeqnoTime
+  dbfull()->TEST_WaitForPeridicTaskRun(
+      [&] { mock_clock_->MockSleepForSeconds(static_cast<int>(kKeyPerSec)); });
+
+  int sst_num = 0;
+  // Write files that overlap and are enough to trigger compaction
+  for (; sst_num < kNumTrigger; sst_num++) {
+    for (int i = 0; i < kNumKeys; i++) {
+      ASSERT_OK(Put(Key(sst_num * (kNumKeys - 1) + i), "value"));
+      dbfull()->TEST_WaitForPeridicTaskRun([&] {
+        mock_clock_->MockSleepForSeconds(static_cast<int>(kKeyPerSec));
+      });
+    }
+    ASSERT_OK(Flush());
+  }
+  ASSERT_OK(dbfull()->WaitForCompact(true));
+
+  // all data is pushed to the last level
+  ASSERT_EQ("0,0,0,0,0,0,1", FilesPerLevel());
+
+  // enable preclude feature
+  options.preclude_last_level_data_seconds = 10000;
+  options.last_level_temperature = Temperature::kCold;
+  Reopen(options);
+
+  // all data is hot, even though it's in the last level
+  ASSERT_EQ(GetSstSizeHelper(Temperature::kCold), 0);
+  ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0);
+
+  // Generate a sstable and trigger manual compaction
+  ASSERT_OK(Put(Key(10), "value"));
+  ASSERT_OK(Flush());
+
+  CompactRangeOptions cro;
+  cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
+  ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
+
+  // all data is moved up to the penultimate level
+  ASSERT_EQ("0,0,0,0,0,1", FilesPerLevel());
+  ASSERT_EQ(GetSstSizeHelper(Temperature::kCold), 0);
+  ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0);
+
+  // close explicitly, because the env is local variable which will be released
+  // first.
+  Close();
+}
+
+TEST_F(PrecludeLastLevelTest, MigrationFromPreserveTimeAutoCompaction) {
+  const int kNumTrigger = 4;
+  const int kNumLevels = 7;
+  const int kNumKeys = 100;
+  const int kKeyPerSec = 10;
+
+  Options options = CurrentOptions();
+  options.compaction_style = kCompactionStyleUniversal;
+  options.preserve_internal_time_seconds = 10000;
+  options.env = mock_env_.get();
+  options.level0_file_num_compaction_trigger = kNumTrigger;
+  options.num_levels = kNumLevels;
+  DestroyAndReopen(options);
+
+  // pass some time first, otherwise the write time of the first few keys would
+  // be zero, and internally zero has special meaning: kUnknownSeqnoTime
+  dbfull()->TEST_WaitForPeridicTaskRun(
+      [&] { mock_clock_->MockSleepForSeconds(static_cast<int>(kKeyPerSec)); });
+
+  int sst_num = 0;
+  // Write files that overlap and are enough to trigger compaction
+  for (; sst_num < kNumTrigger; sst_num++) {
+    for (int i = 0; i < kNumKeys; i++) {
+      ASSERT_OK(Put(Key(sst_num * (kNumKeys - 1) + i), "value"));
+      dbfull()->TEST_WaitForPeridicTaskRun([&] {
+        mock_clock_->MockSleepForSeconds(static_cast<int>(kKeyPerSec));
+      });
+    }
+    ASSERT_OK(Flush());
+  }
+  ASSERT_OK(dbfull()->WaitForCompact(true));
+
+  // all data is pushed to the last level
+  ASSERT_EQ("0,0,0,0,0,0,1", FilesPerLevel());
+
+  // enable preclude feature
+  options.preclude_last_level_data_seconds = 10000;
+  options.last_level_temperature = Temperature::kCold;
+  // make sure it won't trigger Size Amp compaction, unlike normal Size Amp
+  // compaction which is typically a last level compaction, when tiered Storage
+  // ("preclude_last_level") is enabled, size amp won't include the last level.
+  // As the last level would be in cold tier and the size would not be a
+  // problem, which also avoids frequent hot to cold storage compaction.
+  options.compaction_options_universal.max_size_amplification_percent = 400;
+  Reopen(options);
+
+  // all data is hot, even though it's in the last level
+  ASSERT_EQ(GetSstSizeHelper(Temperature::kCold), 0);
+  ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0);
+
+  // Write more data, but still all hot until the 10th SST, as:
+  // write a key every 10 seconds, 100 keys per SST, each SST takes 1000 seconds
+  // The preclude_last_level_data_seconds is 10k
+  Random rnd(301);
+  for (; sst_num < kNumTrigger * 2 - 1; sst_num++) {
+    for (int i = 0; i < kNumKeys; i++) {
+      // the value needs to be big enough to trigger full compaction
+      ASSERT_OK(Put(Key(sst_num * (kNumKeys - 1) + i), rnd.RandomString(100)));
+      dbfull()->TEST_WaitForPeridicTaskRun([&] {
+        mock_clock_->MockSleepForSeconds(static_cast<int>(kKeyPerSec));
+      });
+    }
+    ASSERT_OK(Flush());
+    ASSERT_OK(dbfull()->WaitForCompact(true));
+  }
+
+  // all data is moved up to the penultimate level
+  ASSERT_EQ("0,0,0,0,0,1", FilesPerLevel());
+  ASSERT_EQ(GetSstSizeHelper(Temperature::kCold), 0);
+  ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0);
+
+  // close explicitly, because the env is local variable which will be released
+  // first.
+  Close();
+}
+
+TEST_F(PrecludeLastLevelTest, MigrationFromPreserveTimePartial) {
+  const int kNumTrigger = 4;
+  const int kNumLevels = 7;
+  const int kNumKeys = 100;
+  const int kKeyPerSec = 10;
+
+  Options options = CurrentOptions();
+  options.compaction_style = kCompactionStyleUniversal;
+  options.preserve_internal_time_seconds = 2000;
+  options.env = mock_env_.get();
+  options.level0_file_num_compaction_trigger = kNumTrigger;
+  options.num_levels = kNumLevels;
+  DestroyAndReopen(options);
+
+  // pass some time first, otherwise the write time of the first few keys would
+  // be zero, and internally zero has special meaning: kUnknownSeqnoTime
+  dbfull()->TEST_WaitForPeridicTaskRun(
+      [&] { mock_clock_->MockSleepForSeconds(static_cast<int>(kKeyPerSec)); });
+
+  int sst_num = 0;
+  // Write files that overlap and are enough to trigger compaction
+  for (; sst_num < kNumTrigger; sst_num++) {
+    for (int i = 0; i < kNumKeys; i++) {
+      ASSERT_OK(Put(Key(sst_num * (kNumKeys - 1) + i), "value"));
+      dbfull()->TEST_WaitForPeridicTaskRun([&] {
+        mock_clock_->MockSleepForSeconds(static_cast<int>(kKeyPerSec));
+      });
+    }
+    ASSERT_OK(Flush());
+  }
+  ASSERT_OK(dbfull()->WaitForCompact(true));
+
+  // all data is pushed to the last level
+  ASSERT_EQ("0,0,0,0,0,0,1", FilesPerLevel());
+
+  std::vector<KeyVersion> key_versions;
+  ASSERT_OK(GetAllKeyVersions(db_, Slice(), Slice(),
+                              std::numeric_limits<size_t>::max(),
+                              &key_versions));
+
+  // make sure there are more than 300 keys, the first 100 keys have seqno
+  // zeroed out, the last 100 key seqno not zeroed out
+  ASSERT_GT(key_versions.size(), 300);
+  for (int i = 0; i < 100; i++) {
+    ASSERT_EQ(key_versions[i].sequence, 0);
+  }
+  auto rit = key_versions.rbegin();
+  for (int i = 0; i < 100; i++) {
+    ASSERT_GT(rit->sequence, 0);
+    rit++;
+  }
+
+  // enable preclude feature
+  options.preclude_last_level_data_seconds = 2000;
+  options.last_level_temperature = Temperature::kCold;
+  Reopen(options);
+
+  // Generate a sstable and trigger manual compaction
+  ASSERT_OK(Put(Key(10), "value"));
+  ASSERT_OK(Flush());
+
+  CompactRangeOptions cro;
+  cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
+  ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
+
+  // some data are moved up, some are not
+  ASSERT_EQ("0,0,0,0,0,1,1", FilesPerLevel());
+  ASSERT_GT(GetSstSizeHelper(Temperature::kCold), 0);
+  ASSERT_GT(GetSstSizeHelper(Temperature::kUnknown), 0);
+
+  Close();
+}
+
 #endif  // !defined(ROCKSDB_LITE)
 
 }  // namespace ROCKSDB_NAMESPACE