From c401f285c3215043346e150e6a7f319946d2f325 Mon Sep 17 00:00:00 2001 From: Jay Zhuang Date: Fri, 7 Oct 2022 18:49:40 -0700 Subject: [PATCH] Add option `preserve_internal_time_seconds` to preserve the time info (#10747) Summary: Add option `preserve_internal_time_seconds` to preserve the internal time information. It's mostly for the migration of the existing data to tiered storage ( `preclude_last_level_data_seconds`). When the tiering feature is just enabled, the existing data won't have the time information to decide if it's hot or cold. Enabling this feature will start collect and preserve the time information for the new data. Pull Request resolved: https://github.com/facebook/rocksdb/pull/10747 Reviewed By: siying Differential Revision: D39910141 Pulled By: siying fbshipit-source-id: 25c21638e37b1a7c44006f636b7d714fe7242138 --- HISTORY.md | 1 + db/compaction/compaction_iterator.cc | 17 ++- db/compaction/compaction_iterator.h | 15 ++- db/compaction/compaction_job.cc | 39 ++++-- db/compaction/compaction_job.h | 15 ++- db/db_impl/db_impl.cc | 18 +-- db/seqno_time_test.cc | 181 +++++++++++++++++++++++---- db/seqno_to_time_mapping.cc | 25 +++- db/seqno_to_time_mapping.h | 5 +- include/rocksdb/advanced_options.h | 25 ++++ options/cf_options.cc | 5 + options/cf_options.h | 2 + options/options.cc | 3 + options/options_helper.cc | 2 + options/options_settable_test.cc | 3 + 15 files changed, 291 insertions(+), 65 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index 4122a82c3..a01f12fa3 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -19,6 +19,7 @@ ### New Features * Add a new option IOOptions.do_not_recurse that can be used by underlying file systems to skip recursing through sub directories and list only files in GetChildren API. +* Add option `preserve_internal_time_seconds` to preserve the time information for the latest data. Which can be used to determine the age of data when `preclude_last_level_data_seconds` is enabled. The time information is attached with SST in table property `rocksdb.seqno.time.map` which can be parsed by tool ldb or sst_dump. ### Behavior Changes * Sanitize min_write_buffer_number_to_merge to 1 if atomic flush is enabled to prevent unexpected data loss when WAL is disabled in a multi-column-family setting (#10773). diff --git a/db/compaction/compaction_iterator.cc b/db/compaction/compaction_iterator.cc index a5a0d99df..7f2bed465 100644 --- a/db/compaction/compaction_iterator.cc +++ b/db/compaction/compaction_iterator.cc @@ -34,7 +34,8 @@ CompactionIterator::CompactionIterator( const std::atomic* shutting_down, const std::shared_ptr info_log, const std::string* full_history_ts_low, - const SequenceNumber penultimate_level_cutoff_seqno) + const SequenceNumber preserve_time_min_seqno, + const SequenceNumber preclude_last_level_min_seqno) : CompactionIterator( input, cmp, merge_helper, last_sequence, snapshots, earliest_write_conflict_snapshot, job_snapshot, snapshot_checker, env, @@ -44,7 +45,7 @@ CompactionIterator::CompactionIterator( std::unique_ptr( compaction ? new RealCompaction(compaction) : nullptr), compaction_filter, shutting_down, info_log, full_history_ts_low, - penultimate_level_cutoff_seqno) {} + preserve_time_min_seqno, preclude_last_level_min_seqno) {} CompactionIterator::CompactionIterator( InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper, @@ -61,7 +62,8 @@ CompactionIterator::CompactionIterator( const std::atomic* shutting_down, const std::shared_ptr info_log, const std::string* full_history_ts_low, - const SequenceNumber penultimate_level_cutoff_seqno) + const SequenceNumber preserve_time_min_seqno, + const SequenceNumber preclude_last_level_min_seqno) : input_(input, cmp, !compaction || compaction->DoesInputReferenceBlobFiles()), cmp_(cmp), @@ -105,8 +107,10 @@ CompactionIterator::CompactionIterator( current_key_committed_(false), cmp_with_history_ts_low_(0), level_(compaction_ == nullptr ? 0 : compaction_->level()), - penultimate_level_cutoff_seqno_(penultimate_level_cutoff_seqno) { + preserve_time_min_seqno_(preserve_time_min_seqno), + preclude_last_level_min_seqno_(preclude_last_level_min_seqno) { assert(snapshots_ != nullptr); + assert(preserve_time_min_seqno_ <= preclude_last_level_min_seqno_); if (compaction_ != nullptr) { level_ptrs_ = std::vector(compaction_->number_levels(), 0); @@ -1088,6 +1092,7 @@ void CompactionIterator::GarbageCollectBlobIfNeeded() { } void CompactionIterator::DecideOutputLevel() { + assert(compaction_->SupportsPerKeyPlacement()); #ifndef NDEBUG // Could be overridden by unittest PerKeyPlacementContext context(level_, ikey_.user_key, value_, @@ -1099,7 +1104,7 @@ void CompactionIterator::DecideOutputLevel() { // if the key is newer than the cutoff sequence or within the earliest // snapshot, it should output to the penultimate level. - if (ikey_.sequence > penultimate_level_cutoff_seqno_ || + if (ikey_.sequence >= preclude_last_level_min_seqno_ || ikey_.sequence > earliest_snapshot_) { output_to_penultimate_level_ = true; } @@ -1161,7 +1166,7 @@ void CompactionIterator::PrepareOutput() { DefinitelyInSnapshot(ikey_.sequence, earliest_snapshot_) && ikey_.type != kTypeMerge && current_key_committed_ && !output_to_penultimate_level_ && - ikey_.sequence < penultimate_level_cutoff_seqno_) { + ikey_.sequence < preserve_time_min_seqno_) { if (ikey_.type == kTypeDeletion || (ikey_.type == kTypeSingleDeletion && timestamp_size_ == 0)) { ROCKS_LOG_FATAL( diff --git a/db/compaction/compaction_iterator.h b/db/compaction/compaction_iterator.h index c5b7b67bd..c215d2bbb 100644 --- a/db/compaction/compaction_iterator.h +++ b/db/compaction/compaction_iterator.h @@ -200,7 +200,8 @@ class CompactionIterator { const std::atomic* shutting_down = nullptr, const std::shared_ptr info_log = nullptr, const std::string* full_history_ts_low = nullptr, - const SequenceNumber penultimate_level_cutoff_seqno = kMaxSequenceNumber); + const SequenceNumber preserve_time_min_seqno = kMaxSequenceNumber, + const SequenceNumber preclude_last_level_min_seqno = kMaxSequenceNumber); // Constructor with custom CompactionProxy, used for tests. CompactionIterator( @@ -218,7 +219,8 @@ class CompactionIterator { const std::atomic* shutting_down = nullptr, const std::shared_ptr info_log = nullptr, const std::string* full_history_ts_low = nullptr, - const SequenceNumber penultimate_level_cutoff_seqno = kMaxSequenceNumber); + const SequenceNumber preserve_time_min_seqno = kMaxSequenceNumber, + const SequenceNumber preclude_last_level_min_seqno = kMaxSequenceNumber); ~CompactionIterator(); @@ -470,9 +472,12 @@ class CompactionIterator { // output to. bool output_to_penultimate_level_{false}; - // any key later than this sequence number should have - // output_to_penultimate_level_ set to true - const SequenceNumber penultimate_level_cutoff_seqno_ = kMaxSequenceNumber; + // min seqno for preserving the time information. + const SequenceNumber preserve_time_min_seqno_ = kMaxSequenceNumber; + + // min seqno to preclude the data from the last level, if the key seqno larger + // than this, it will be output to penultimate level + const SequenceNumber preclude_last_level_min_seqno_ = kMaxSequenceNumber; void AdvanceInputIter() { input_.Next(); } diff --git a/db/compaction/compaction_job.cc b/db/compaction/compaction_job.cc index 442e302cb..4222187af 100644 --- a/db/compaction/compaction_job.cc +++ b/db/compaction/compaction_job.cc @@ -265,11 +265,15 @@ void CompactionJob::Prepare() { /*sub_job_id*/ 0); } - if (c->immutable_options()->preclude_last_level_data_seconds > 0) { - // TODO(zjay): move to a function - seqno_time_mapping_.SetMaxTimeDuration( - c->immutable_options()->preclude_last_level_data_seconds); + // collect all seqno->time information from the input files which will be used + // to encode seqno->time to the output files. + uint64_t preserve_time_duration = + std::max(c->immutable_options()->preserve_internal_time_seconds, + c->immutable_options()->preclude_last_level_data_seconds); + + if (preserve_time_duration > 0) { // setup seqno_time_mapping_ + seqno_time_mapping_.SetMaxTimeDuration(preserve_time_duration); for (const auto& each_level : *c->inputs()) { for (const auto& fmd : each_level.files) { std::shared_ptr tp; @@ -295,10 +299,27 @@ void CompactionJob::Prepare() { ROCKS_LOG_WARN(db_options_.info_log, "Failed to get current time in compaction: Status: %s", status.ToString().c_str()); - penultimate_level_cutoff_seqno_ = 0; + // preserve all time information + preserve_time_min_seqno_ = 0; + preclude_last_level_min_seqno_ = 0; } else { - penultimate_level_cutoff_seqno_ = - seqno_time_mapping_.TruncateOldEntries(_current_time); + seqno_time_mapping_.TruncateOldEntries(_current_time); + uint64_t preserve_time = + static_cast(_current_time) > preserve_time_duration + ? _current_time - preserve_time_duration + : 0; + preserve_time_min_seqno_ = + seqno_time_mapping_.GetOldestSequenceNum(preserve_time); + if (c->immutable_options()->preclude_last_level_data_seconds > 0) { + uint64_t preclude_last_level_time = + static_cast(_current_time) > + c->immutable_options()->preclude_last_level_data_seconds + ? _current_time - + c->immutable_options()->preclude_last_level_data_seconds + : 0; + preclude_last_level_min_seqno_ = + seqno_time_mapping_.GetOldestSequenceNum(preclude_last_level_time); + } } } } @@ -1216,8 +1237,8 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { blob_file_builder.get(), db_options_.allow_data_in_errors, db_options_.enforce_single_del_contracts, manual_compaction_canceled_, sub_compact->compaction, compaction_filter, shutting_down_, - db_options_.info_log, full_history_ts_low, - penultimate_level_cutoff_seqno_); + db_options_.info_log, full_history_ts_low, preserve_time_min_seqno_, + preclude_last_level_min_seqno_); c_iter->SeekToFirst(); // Assign range delete aggregator to the target output level, which makes sure diff --git a/db/compaction/compaction_job.h b/db/compaction/compaction_job.h index d281b4c79..dac7de56d 100644 --- a/db/compaction/compaction_job.h +++ b/db/compaction/compaction_job.h @@ -335,12 +335,15 @@ class CompactionJob { // it also collects the smallest_seqno -> oldest_ancester_time from the SST. SeqnoToTimeMapping seqno_time_mapping_; - // cutoff sequence number for penultimate level, only set when - // per_key_placement feature is enabled. - // If a key with sequence number larger than penultimate_level_cutoff_seqno_, - // it will be placed on the penultimate_level and seqnuence number won't be - // zeroed out. - SequenceNumber penultimate_level_cutoff_seqno_ = kMaxSequenceNumber; + // Minimal sequence number for preserving the time information. The time info + // older than this sequence number won't be preserved after the compaction and + // if it's bottommost compaction, the seq num will be zeroed out. + SequenceNumber preserve_time_min_seqno_ = kMaxSequenceNumber; + + // Minimal sequence number to preclude the data from the last level. If the + // key has bigger (newer) sequence number than this, it will be precluded from + // the last level (output to penultimate level). + SequenceNumber preclude_last_level_min_seqno_ = kMaxSequenceNumber; // Get table file name in where it's outputting to, which should also be in // `output_directory_`. diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index 5aa4b2a3d..f34cdfc15 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -847,11 +847,13 @@ Status DBImpl::RegisterRecordSeqnoTimeWorker() { InstrumentedMutexLock l(&mutex_); for (auto cfd : *versions_->GetColumnFamilySet()) { - uint64_t preclude_last_option = - cfd->ioptions()->preclude_last_level_data_seconds; - if (!cfd->IsDropped() && preclude_last_option > 0) { - min_time_duration = std::min(preclude_last_option, min_time_duration); - max_time_duration = std::max(preclude_last_option, max_time_duration); + // preserve time is the max of 2 options. + uint64_t preserve_time_duration = + std::max(cfd->ioptions()->preserve_internal_time_seconds, + cfd->ioptions()->preclude_last_level_data_seconds); + if (!cfd->IsDropped() && preserve_time_duration > 0) { + min_time_duration = std::min(preserve_time_duration, min_time_duration); + max_time_duration = std::max(preserve_time_duration, max_time_duration); } } if (min_time_duration == std::numeric_limits::max()) { @@ -3103,7 +3105,8 @@ Status DBImpl::CreateColumnFamilyImpl(const ColumnFamilyOptions& cf_options, } } // InstrumentedMutexLock l(&mutex_) - if (cf_options.preclude_last_level_data_seconds > 0) { + if (cf_options.preserve_internal_time_seconds > 0 || + cf_options.preclude_last_level_data_seconds > 0) { s = RegisterRecordSeqnoTimeWorker(); } sv_context.Clean(); @@ -3194,7 +3197,8 @@ Status DBImpl::DropColumnFamilyImpl(ColumnFamilyHandle* column_family) { bg_cv_.SignalAll(); } - if (cfd->ioptions()->preclude_last_level_data_seconds > 0) { + if (cfd->ioptions()->preserve_internal_time_seconds > 0 || + cfd->ioptions()->preclude_last_level_data_seconds > 0) { s = RegisterRecordSeqnoTimeWorker(); } diff --git a/db/seqno_time_test.cc b/db/seqno_time_test.cc index ad0ac5f8a..12394a368 100644 --- a/db/seqno_time_test.cc +++ b/db/seqno_time_test.cc @@ -9,6 +9,7 @@ #include "db/seqno_to_time_mapping.h" #include "port/stack_trace.h" #include "rocksdb/iostats_context.h" +#include "rocksdb/utilities/debug.h" #include "test_util/mock_time_env.h" #ifndef ROCKSDB_LITE @@ -37,7 +38,7 @@ class SeqnoTimeTest : public DBTestBase { } // make sure the file is not in cache, otherwise it won't have IO info - void AssertKetTemperature(int key_id, Temperature expected_temperature) { + void AssertKeyTemperature(int key_id, Temperature expected_temperature) { get_iostats_context()->Reset(); IOStatsContext* iostats = get_iostats_context(); std::string result = Get(Key(key_id)); @@ -101,7 +102,7 @@ TEST_F(SeqnoTimeTest, TemperatureBasicUniversal) { ASSERT_EQ(GetSstSizeHelper(Temperature::kCold), 0); // read a random key, which should be hot (kUnknown) - AssertKetTemperature(20, Temperature::kUnknown); + AssertKeyTemperature(20, Temperature::kUnknown); // Write more data, but still all hot until the 10th SST, as: // write a key every 10 seconds, 100 keys per SST, each SST takes 1000 seconds @@ -139,7 +140,7 @@ TEST_F(SeqnoTimeTest, TemperatureBasicUniversal) { ASSERT_GT(hot_data_size, 0); ASSERT_GT(cold_data_size, 0); // the first a few key should be cold - AssertKetTemperature(20, Temperature::kCold); + AssertKeyTemperature(20, Temperature::kCold); for (int i = 0; i < 30; i++) { dbfull()->TEST_WaitForPeridicTaskRun([&] { @@ -148,8 +149,8 @@ TEST_F(SeqnoTimeTest, TemperatureBasicUniversal) { ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); // the hot/cold data cut off range should be between i * 20 + 200 -> 250 - AssertKetTemperature(i * 20 + 250, Temperature::kUnknown); - AssertKetTemperature(i * 20 + 200, Temperature::kCold); + AssertKeyTemperature(i * 20 + 250, Temperature::kUnknown); + AssertKeyTemperature(i * 20 + 200, Temperature::kCold); } ASSERT_LT(GetSstSizeHelper(Temperature::kUnknown), hot_data_size); @@ -166,7 +167,7 @@ TEST_F(SeqnoTimeTest, TemperatureBasicUniversal) { } // any random data close to the end should be cold - AssertKetTemperature(1000, Temperature::kCold); + AssertKeyTemperature(1000, Temperature::kCold); // close explicitly, because the env is local variable which will be released // first. @@ -215,7 +216,7 @@ TEST_F(SeqnoTimeTest, TemperatureBasicLevel) { ASSERT_EQ(GetSstSizeHelper(Temperature::kCold), 0); // read a random key, which should be hot (kUnknown) - AssertKetTemperature(20, Temperature::kUnknown); + AssertKeyTemperature(20, Temperature::kUnknown); // Adding more data to have mixed hot and cold data for (; sst_num < 14; sst_num++) { @@ -237,7 +238,7 @@ TEST_F(SeqnoTimeTest, TemperatureBasicLevel) { ASSERT_GT(hot_data_size, 0); ASSERT_GT(cold_data_size, 0); // the first a few key should be cold - AssertKetTemperature(20, Temperature::kCold); + AssertKeyTemperature(20, Temperature::kCold); // Wait some time, with each wait, the cold data is increasing and hot data is // decreasing @@ -253,8 +254,8 @@ TEST_F(SeqnoTimeTest, TemperatureBasicLevel) { ASSERT_GT(cold_data_size, pre_cold); // the hot/cold cut_off key should be around i * 20 + 400 -> 450 - AssertKetTemperature(i * 20 + 450, Temperature::kUnknown); - AssertKetTemperature(i * 20 + 400, Temperature::kCold); + AssertKeyTemperature(i * 20 + 450, Temperature::kUnknown); + AssertKeyTemperature(i * 20 + 400, Temperature::kCold); } // Wait again, the most of the data should be cold after that @@ -267,14 +268,53 @@ TEST_F(SeqnoTimeTest, TemperatureBasicLevel) { } // any random data close to the end should be cold - AssertKetTemperature(1000, Temperature::kCold); + AssertKeyTemperature(1000, Temperature::kCold); Close(); } -TEST_F(SeqnoTimeTest, BasicSeqnoToTimeMapping) { +enum class SeqnoTimeTestType : char { + kTrackInternalTimeSeconds = 0, + kPrecludeLastLevel = 1, + kBothSetTrackSmaller = 2, +}; + +class SeqnoTimeTablePropTest + : public SeqnoTimeTest, + public ::testing::WithParamInterface { + public: + SeqnoTimeTablePropTest() : SeqnoTimeTest() {} + + void SetTrackTimeDurationOptions(uint64_t track_time_duration, + Options& options) const { + // either option set will enable the time tracking feature + switch (GetParam()) { + case SeqnoTimeTestType::kTrackInternalTimeSeconds: + options.preclude_last_level_data_seconds = 0; + options.preserve_internal_time_seconds = track_time_duration; + break; + case SeqnoTimeTestType::kPrecludeLastLevel: + options.preclude_last_level_data_seconds = track_time_duration; + options.preserve_internal_time_seconds = 0; + break; + case SeqnoTimeTestType::kBothSetTrackSmaller: + options.preclude_last_level_data_seconds = track_time_duration; + options.preserve_internal_time_seconds = track_time_duration / 10; + break; + } + } +}; + +INSTANTIATE_TEST_CASE_P( + SeqnoTimeTablePropTest, SeqnoTimeTablePropTest, + ::testing::Values(SeqnoTimeTestType::kTrackInternalTimeSeconds, + SeqnoTimeTestType::kPrecludeLastLevel, + SeqnoTimeTestType::kBothSetTrackSmaller)); + +TEST_P(SeqnoTimeTablePropTest, BasicSeqnoToTimeMapping) { Options options = CurrentOptions(); - options.preclude_last_level_data_seconds = 10000; + SetTrackTimeDurationOptions(10000, options); + options.env = mock_env_.get(); options.disable_auto_compactions = true; DestroyAndReopen(options); @@ -297,6 +337,8 @@ TEST_F(SeqnoTimeTest, BasicSeqnoToTimeMapping) { ASSERT_OK(tp_mapping.Sort()); ASSERT_FALSE(tp_mapping.Empty()); auto seqs = tp_mapping.TEST_GetInternalMapping(); + // about ~20 seqs->time entries, because the sample rate is 10000/100, and it + // passes 2k time. ASSERT_GE(seqs.size(), 19); ASSERT_LE(seqs.size(), 21); SequenceNumber seq_end = dbfull()->GetLatestSequenceNumber(); @@ -444,7 +486,8 @@ TEST_F(SeqnoTimeTest, BasicSeqnoToTimeMapping) { ASSERT_LE(seqs.size(), 101); for (auto i = start_seq; i < seq_end - 99; i++) { // likely the first 100 entries reports 0 - ASSERT_LE(tp_mapping.GetOldestApproximateTime(i), (i - start_seq) + 3000); + ASSERT_LE(tp_mapping.GetOldestApproximateTime(i), + (i - start_seq) * 100 + 50000); } start_seq += 101; @@ -457,9 +500,10 @@ TEST_F(SeqnoTimeTest, BasicSeqnoToTimeMapping) { ASSERT_OK(db_->Close()); } -TEST_F(SeqnoTimeTest, MultiCFs) { +TEST_P(SeqnoTimeTablePropTest, MultiCFs) { Options options = CurrentOptions(); options.preclude_last_level_data_seconds = 0; + options.preserve_internal_time_seconds = 0; options.env = mock_env_.get(); options.stats_dump_period_sec = 0; options.stats_persist_period_sec = 0; @@ -485,7 +529,7 @@ TEST_F(SeqnoTimeTest, MultiCFs) { ASSERT_TRUE(dbfull()->TEST_GetSeqnoToTimeMapping().Empty()); Options options_1 = options; - options_1.preclude_last_level_data_seconds = 10000; // 10k + SetTrackTimeDurationOptions(10000, options_1); CreateColumnFamilies({"one"}, options_1); ASSERT_TRUE(scheduler.TEST_HasTask(PeriodicTaskType::kRecordSeqnoTime)); @@ -514,11 +558,11 @@ TEST_F(SeqnoTimeTest, MultiCFs) { ASSERT_FALSE(tp_mapping.Empty()); auto seqs = tp_mapping.TEST_GetInternalMapping(); ASSERT_GE(seqs.size(), 1); - ASSERT_LE(seqs.size(), 3); + ASSERT_LE(seqs.size(), 4); // Create one more CF with larger preclude_last_level time Options options_2 = options; - options_2.preclude_last_level_data_seconds = 1000000; // 1m + SetTrackTimeDurationOptions(1000000, options_2); // 1m CreateColumnFamilies({"two"}, options_2); // Add more data to CF "two" to fill the in memory mapping @@ -618,11 +662,11 @@ TEST_F(SeqnoTimeTest, MultiCFs) { Close(); } -TEST_F(SeqnoTimeTest, MultiInstancesBasic) { +TEST_P(SeqnoTimeTablePropTest, MultiInstancesBasic) { const int kInstanceNum = 2; Options options = CurrentOptions(); - options.preclude_last_level_data_seconds = 10000; + SetTrackTimeDurationOptions(10000, options); options.env = mock_env_.get(); options.stats_dump_period_sec = 0; options.stats_persist_period_sec = 0; @@ -650,17 +694,32 @@ TEST_F(SeqnoTimeTest, MultiInstancesBasic) { } } -TEST_F(SeqnoTimeTest, SeqnoToTimeMappingUniversal) { +TEST_P(SeqnoTimeTablePropTest, SeqnoToTimeMappingUniversal) { + const int kNumTrigger = 4; + const int kNumLevels = 7; + const int kNumKeys = 100; + Options options = CurrentOptions(); + SetTrackTimeDurationOptions(10000, options); options.compaction_style = kCompactionStyleUniversal; - options.preclude_last_level_data_seconds = 10000; + options.num_levels = kNumLevels; options.env = mock_env_.get(); DestroyAndReopen(options); - for (int j = 0; j < 3; j++) { - for (int i = 0; i < 100; i++) { - ASSERT_OK(Put(Key(i), "value")); + std::atomic_uint64_t num_seqno_zeroing{0}; + + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + SyncPoint::GetInstance()->SetCallBack( + "CompactionIterator::PrepareOutput:ZeroingSeq", + [&](void* /*arg*/) { num_seqno_zeroing++; }); + SyncPoint::GetInstance()->EnableProcessing(); + + int sst_num = 0; + for (; sst_num < kNumTrigger - 1; sst_num++) { + for (int i = 0; i < kNumKeys; i++) { + ASSERT_OK(Put(Key(sst_num * (kNumKeys - 1) + i), "value")); dbfull()->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(static_cast(10)); }); } @@ -681,11 +740,12 @@ TEST_F(SeqnoTimeTest, SeqnoToTimeMappingUniversal) { } // Trigger a compaction - for (int i = 0; i < 100; i++) { - ASSERT_OK(Put(Key(i), "value")); + for (int i = 0; i < kNumKeys; i++) { + ASSERT_OK(Put(Key(sst_num * (kNumKeys - 1) + i), "value")); dbfull()->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(static_cast(10)); }); } + sst_num++; ASSERT_OK(Flush()); ASSERT_OK(dbfull()->TEST_WaitForCompact()); tables_props.clear(); @@ -696,6 +756,73 @@ TEST_F(SeqnoTimeTest, SeqnoToTimeMappingUniversal) { SeqnoToTimeMapping tp_mapping; ASSERT_FALSE(it->second->seqno_to_time_mapping.empty()); ASSERT_OK(tp_mapping.Add(it->second->seqno_to_time_mapping)); + + // compact to the last level + CompactRangeOptions cro; + cro.bottommost_level_compaction = BottommostLevelCompaction::kForce; + ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); + // make sure the data is all compacted to penultimate level if the feature is + // on, otherwise, compacted to the last level. + if (options.preclude_last_level_data_seconds > 0) { + ASSERT_GT(NumTableFilesAtLevel(5), 0); + ASSERT_EQ(NumTableFilesAtLevel(6), 0); + } else { + ASSERT_EQ(NumTableFilesAtLevel(5), 0); + ASSERT_GT(NumTableFilesAtLevel(6), 0); + } + + // regardless the file is on the last level or not, it should keep the time + // information and sequence number are not set + tables_props.clear(); + tp_mapping.Clear(); + ASSERT_OK(dbfull()->GetPropertiesOfAllTables(&tables_props)); + + ASSERT_EQ(tables_props.size(), 1); + ASSERT_EQ(num_seqno_zeroing, 0); + + it = tables_props.begin(); + ASSERT_FALSE(it->second->seqno_to_time_mapping.empty()); + ASSERT_OK(tp_mapping.Add(it->second->seqno_to_time_mapping)); + + // make half of the data expired + mock_clock_->MockSleepForSeconds(static_cast(8000)); + ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); + + tables_props.clear(); + tp_mapping.Clear(); + ASSERT_OK(dbfull()->GetPropertiesOfAllTables(&tables_props)); + + if (options.preclude_last_level_data_seconds > 0) { + ASSERT_EQ(tables_props.size(), 2); + } else { + ASSERT_EQ(tables_props.size(), 1); + } + ASSERT_GT(num_seqno_zeroing, 0); + std::vector key_versions; + ASSERT_OK(GetAllKeyVersions(db_, Slice(), Slice(), + std::numeric_limits::max(), + &key_versions)); + // make sure there're more than 300 keys and first 100 keys are having seqno + // zeroed out, the last 100 key seqno not zeroed out + ASSERT_GT(key_versions.size(), 300); + for (int i = 0; i < 100; i++) { + ASSERT_EQ(key_versions[i].sequence, 0); + } + auto rit = key_versions.rbegin(); + for (int i = 0; i < 100; i++) { + ASSERT_GT(rit->sequence, 0); + rit++; + } + + // make all data expired and compact again to push it to the last level + // regardless if the tiering feature is enabled or not + mock_clock_->MockSleepForSeconds(static_cast(20000)); + + ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); + + ASSERT_GT(num_seqno_zeroing, 0); + ASSERT_GT(NumTableFilesAtLevel(6), 0); + Close(); } diff --git a/db/seqno_to_time_mapping.cc b/db/seqno_to_time_mapping.cc index a9d3dc36c..c69209929 100644 --- a/db/seqno_to_time_mapping.cc +++ b/db/seqno_to_time_mapping.cc @@ -31,11 +31,11 @@ void SeqnoToTimeMapping::Add(SequenceNumber seqno, uint64_t time) { seqno_time_mapping_.emplace_back(seqno, time); } -SequenceNumber SeqnoToTimeMapping::TruncateOldEntries(const uint64_t now) { +void SeqnoToTimeMapping::TruncateOldEntries(const uint64_t now) { assert(is_sorted_); if (max_time_duration_ == 0) { - return 0; + return; } const uint64_t cut_off_time = @@ -48,12 +48,25 @@ SequenceNumber SeqnoToTimeMapping::TruncateOldEntries(const uint64_t now) { return target < other.time; }); if (it == seqno_time_mapping_.begin()) { - return 0; + return; } it--; seqno_time_mapping_.erase(seqno_time_mapping_.begin(), it); +} - return seqno_time_mapping_.front().seqno; +SequenceNumber SeqnoToTimeMapping::GetOldestSequenceNum(uint64_t time) { + assert(is_sorted_); + + auto it = std::upper_bound( + seqno_time_mapping_.begin(), seqno_time_mapping_.end(), time, + [](uint64_t target, const SeqnoTimePair& other) -> bool { + return target < other.time; + }); + if (it == seqno_time_mapping_.begin()) { + return 0; + } + it--; + return it->seqno; } // The encoded format is: @@ -94,6 +107,10 @@ void SeqnoToTimeMapping::Encode(std::string& dest, const SequenceNumber start, start_it++; } } + // to include the first element + if (start_it != seqno_time_mapping_.begin()) { + start_it--; + } // If there are more data than needed, pick the entries for encoding. // It's not the most optimized algorithm for selecting the best representative diff --git a/db/seqno_to_time_mapping.h b/db/seqno_to_time_mapping.h index ad32a6517..4ffc9c199 100644 --- a/db/seqno_to_time_mapping.h +++ b/db/seqno_to_time_mapping.h @@ -107,7 +107,10 @@ class SeqnoToTimeMapping { uint64_t GetOldestApproximateTime(SequenceNumber seqno) const; // Truncate the old entries based on the current time and max_time_duration_ - SequenceNumber TruncateOldEntries(uint64_t now); + void TruncateOldEntries(uint64_t now); + + // Given a time, return it's oldest possible sequence number + SequenceNumber GetOldestSequenceNum(uint64_t time); // Encode to a binary string void Encode(std::string& des, SequenceNumber start, SequenceNumber end, diff --git a/include/rocksdb/advanced_options.h b/include/rocksdb/advanced_options.h index 144b5c61e..a768f5dd4 100644 --- a/include/rocksdb/advanced_options.h +++ b/include/rocksdb/advanced_options.h @@ -911,8 +911,33 @@ struct AdvancedColumnFamilyOptions { // size constrained, the size amp is going to be only for non-last levels. // // Default: 0 (disable the feature) + // + // Not dynamically changeable, change it requires db restart. uint64_t preclude_last_level_data_seconds = 0; + // EXPERIMENTAL + // If this option is set, it will preserve the internal time information about + // the data until it's older than the specified time here. + // Internally the time information is a map between sequence number and time, + // which is the same as `preclude_last_level_data_seconds`. But it won't + // preclude the data from the last level and the data in the last level won't + // have the sequence number zeroed out. + // Internally, rocksdb would sample the sequence number to time pair and store + // that in SST property "rocksdb.seqno.time.map". The information is currently + // only used for tiered storage compaction (option + // `preclude_last_level_data_seconds`). + // + // Note: if both `preclude_last_level_data_seconds` and this option is set, it + // will preserve the max time of the 2 options and compaction still preclude + // the data based on `preclude_last_level_data_seconds`. + // The higher the preserve_time is, the less the sampling frequency will be ( + // which means less accuracy of the time estimation). + // + // Default: 0 (disable the feature) + // + // Not dynamically changeable, change it requires db restart. + uint64_t preserve_internal_time_seconds = 0; + // When set, large values (blobs) are written to separate blob files, and // only pointers to them are stored in SST files. This can reduce write // amplification for large-value use cases at the cost of introducing a level diff --git a/options/cf_options.cc b/options/cf_options.cc index 5ae6a86e4..f34e8849e 100644 --- a/options/cf_options.cc +++ b/options/cf_options.cc @@ -567,6 +567,10 @@ static std::unordered_map {offsetof(struct ImmutableCFOptions, preclude_last_level_data_seconds), OptionType::kUInt64T, OptionVerificationType::kNormal, OptionTypeFlags::kNone}}, + {"preserve_internal_time_seconds", + {offsetof(struct ImmutableCFOptions, preserve_internal_time_seconds), + OptionType::kUInt64T, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, // Need to keep this around to be able to read old OPTIONS files. {"max_mem_compaction_level", {0, OptionType::kInt, OptionVerificationType::kDeprecated, @@ -904,6 +908,7 @@ ImmutableCFOptions::ImmutableCFOptions(const ColumnFamilyOptions& cf_options) force_consistency_checks(cf_options.force_consistency_checks), preclude_last_level_data_seconds( cf_options.preclude_last_level_data_seconds), + preserve_internal_time_seconds(cf_options.preserve_internal_time_seconds), memtable_insert_with_hint_prefix_extractor( cf_options.memtable_insert_with_hint_prefix_extractor), cf_paths(cf_options.cf_paths), diff --git a/options/cf_options.h b/options/cf_options.h index da6b7252a..5ac4c0c8e 100644 --- a/options/cf_options.h +++ b/options/cf_options.h @@ -74,6 +74,8 @@ struct ImmutableCFOptions { uint64_t preclude_last_level_data_seconds; + uint64_t preserve_internal_time_seconds; + std::shared_ptr memtable_insert_with_hint_prefix_extractor; diff --git a/options/options.cc b/options/options.cc index ea1364416..153c5f8ef 100644 --- a/options/options.cc +++ b/options/options.cc @@ -94,6 +94,7 @@ AdvancedColumnFamilyOptions::AdvancedColumnFamilyOptions(const Options& options) sample_for_compression(options.sample_for_compression), preclude_last_level_data_seconds( options.preclude_last_level_data_seconds), + preserve_internal_time_seconds(options.preserve_internal_time_seconds), enable_blob_files(options.enable_blob_files), min_blob_size(options.min_blob_size), blob_file_size(options.blob_file_size), @@ -403,6 +404,8 @@ void ColumnFamilyOptions::Dump(Logger* log) const { periodic_compaction_seconds); ROCKS_LOG_HEADER(log, " Options.preclude_last_level_data_seconds: %" PRIu64, preclude_last_level_data_seconds); + ROCKS_LOG_HEADER(log, " Options.preserve_internal_time_seconds: %" PRIu64, + preserve_internal_time_seconds); ROCKS_LOG_HEADER(log, " Options.enable_blob_files: %s", enable_blob_files ? "true" : "false"); ROCKS_LOG_HEADER( diff --git a/options/options_helper.cc b/options/options_helper.cc index 4118f3961..0d63edb89 100644 --- a/options/options_helper.cc +++ b/options/options_helper.cc @@ -314,6 +314,8 @@ void UpdateColumnFamilyOptions(const ImmutableCFOptions& ioptions, cf_opts->blob_cache = ioptions.blob_cache; cf_opts->preclude_last_level_data_seconds = ioptions.preclude_last_level_data_seconds; + cf_opts->preserve_internal_time_seconds = + ioptions.preserve_internal_time_seconds; // TODO(yhchiang): find some way to handle the following derived options // * max_file_size diff --git a/options/options_settable_test.cc b/options/options_settable_test.cc index fe430718c..ff4e2acc5 100644 --- a/options/options_settable_test.cc +++ b/options/options_settable_test.cc @@ -403,6 +403,8 @@ TEST_F(OptionsSettableTest, ColumnFamilyOptionsAllFieldsSettable) { sizeof(ColumnFamilyOptions::TablePropertiesCollectorFactories)}, {offsetof(struct ColumnFamilyOptions, preclude_last_level_data_seconds), sizeof(uint64_t)}, + {offsetof(struct ColumnFamilyOptions, preserve_internal_time_seconds), + sizeof(uint64_t)}, {offsetof(struct ColumnFamilyOptions, blob_cache), sizeof(std::shared_ptr)}, {offsetof(struct ColumnFamilyOptions, comparator), sizeof(Comparator*)}, @@ -532,6 +534,7 @@ TEST_F(OptionsSettableTest, ColumnFamilyOptionsAllFieldsSettable) { "bottommost_temperature=kWarm;" "last_level_temperature=kWarm;" "preclude_last_level_data_seconds=86400;" + "preserve_internal_time_seconds=86400;" "compaction_options_fifo={max_table_files_size=3;allow_" "compaction=false;age_for_warm=1;};" "blob_cache=1M;"