diff --git a/HISTORY.md b/HISTORY.md index 4122a82c3..a01f12fa3 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -19,6 +19,7 @@ ### New Features * Add a new option IOOptions.do_not_recurse that can be used by underlying file systems to skip recursing through sub directories and list only files in GetChildren API. +* Add option `preserve_internal_time_seconds` to preserve the time information for the latest data, which can be used to determine the age of data when `preclude_last_level_data_seconds` is enabled. The time information is attached to the SST in table property `rocksdb.seqno.time.map`, which can be parsed by the `ldb` or `sst_dump` tools. ### Behavior Changes * Sanitize min_write_buffer_number_to_merge to 1 if atomic flush is enabled to prevent unexpected data loss when WAL is disabled in a multi-column-family setting (#10773). diff --git a/db/compaction/compaction_iterator.cc b/db/compaction/compaction_iterator.cc index a5a0d99df..7f2bed465 100644 --- a/db/compaction/compaction_iterator.cc +++ b/db/compaction/compaction_iterator.cc @@ -34,7 +34,8 @@ CompactionIterator::CompactionIterator( const std::atomic* shutting_down, const std::shared_ptr info_log, const std::string* full_history_ts_low, - const SequenceNumber penultimate_level_cutoff_seqno) + const SequenceNumber preserve_time_min_seqno, + const SequenceNumber preclude_last_level_min_seqno) : CompactionIterator( input, cmp, merge_helper, last_sequence, snapshots, earliest_write_conflict_snapshot, job_snapshot, snapshot_checker, env, @@ -44,7 +45,7 @@ CompactionIterator::CompactionIterator( std::unique_ptr( compaction ? 
new RealCompaction(compaction) : nullptr), compaction_filter, shutting_down, info_log, full_history_ts_low, - penultimate_level_cutoff_seqno) {} + preserve_time_min_seqno, preclude_last_level_min_seqno) {} CompactionIterator::CompactionIterator( InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper, @@ -61,7 +62,8 @@ CompactionIterator::CompactionIterator( const std::atomic* shutting_down, const std::shared_ptr info_log, const std::string* full_history_ts_low, - const SequenceNumber penultimate_level_cutoff_seqno) + const SequenceNumber preserve_time_min_seqno, + const SequenceNumber preclude_last_level_min_seqno) : input_(input, cmp, !compaction || compaction->DoesInputReferenceBlobFiles()), cmp_(cmp), @@ -105,8 +107,10 @@ CompactionIterator::CompactionIterator( current_key_committed_(false), cmp_with_history_ts_low_(0), level_(compaction_ == nullptr ? 0 : compaction_->level()), - penultimate_level_cutoff_seqno_(penultimate_level_cutoff_seqno) { + preserve_time_min_seqno_(preserve_time_min_seqno), + preclude_last_level_min_seqno_(preclude_last_level_min_seqno) { assert(snapshots_ != nullptr); + assert(preserve_time_min_seqno_ <= preclude_last_level_min_seqno_); if (compaction_ != nullptr) { level_ptrs_ = std::vector(compaction_->number_levels(), 0); @@ -1088,6 +1092,7 @@ void CompactionIterator::GarbageCollectBlobIfNeeded() { } void CompactionIterator::DecideOutputLevel() { + assert(compaction_->SupportsPerKeyPlacement()); #ifndef NDEBUG // Could be overridden by unittest PerKeyPlacementContext context(level_, ikey_.user_key, value_, @@ -1099,7 +1104,7 @@ void CompactionIterator::DecideOutputLevel() { // if the key is newer than the cutoff sequence or within the earliest // snapshot, it should output to the penultimate level. 
- if (ikey_.sequence > penultimate_level_cutoff_seqno_ || + if (ikey_.sequence >= preclude_last_level_min_seqno_ || ikey_.sequence > earliest_snapshot_) { output_to_penultimate_level_ = true; } @@ -1161,7 +1166,7 @@ void CompactionIterator::PrepareOutput() { DefinitelyInSnapshot(ikey_.sequence, earliest_snapshot_) && ikey_.type != kTypeMerge && current_key_committed_ && !output_to_penultimate_level_ && - ikey_.sequence < penultimate_level_cutoff_seqno_) { + ikey_.sequence < preserve_time_min_seqno_) { if (ikey_.type == kTypeDeletion || (ikey_.type == kTypeSingleDeletion && timestamp_size_ == 0)) { ROCKS_LOG_FATAL( diff --git a/db/compaction/compaction_iterator.h b/db/compaction/compaction_iterator.h index c5b7b67bd..c215d2bbb 100644 --- a/db/compaction/compaction_iterator.h +++ b/db/compaction/compaction_iterator.h @@ -200,7 +200,8 @@ class CompactionIterator { const std::atomic* shutting_down = nullptr, const std::shared_ptr info_log = nullptr, const std::string* full_history_ts_low = nullptr, - const SequenceNumber penultimate_level_cutoff_seqno = kMaxSequenceNumber); + const SequenceNumber preserve_time_min_seqno = kMaxSequenceNumber, + const SequenceNumber preclude_last_level_min_seqno = kMaxSequenceNumber); // Constructor with custom CompactionProxy, used for tests. CompactionIterator( @@ -218,7 +219,8 @@ class CompactionIterator { const std::atomic* shutting_down = nullptr, const std::shared_ptr info_log = nullptr, const std::string* full_history_ts_low = nullptr, - const SequenceNumber penultimate_level_cutoff_seqno = kMaxSequenceNumber); + const SequenceNumber preserve_time_min_seqno = kMaxSequenceNumber, + const SequenceNumber preclude_last_level_min_seqno = kMaxSequenceNumber); ~CompactionIterator(); @@ -470,9 +472,12 @@ class CompactionIterator { // output to. 
bool output_to_penultimate_level_{false}; - // any key later than this sequence number should have - // output_to_penultimate_level_ set to true - const SequenceNumber penultimate_level_cutoff_seqno_ = kMaxSequenceNumber; + // min seqno for preserving the time information. + const SequenceNumber preserve_time_min_seqno_ = kMaxSequenceNumber; + + // min seqno to preclude the data from the last level: if the key's seqno is + // at or above this, it will be output to the penultimate level + const SequenceNumber preclude_last_level_min_seqno_ = kMaxSequenceNumber; void AdvanceInputIter() { input_.Next(); } diff --git a/db/compaction/compaction_job.cc b/db/compaction/compaction_job.cc index 442e302cb..4222187af 100644 --- a/db/compaction/compaction_job.cc +++ b/db/compaction/compaction_job.cc @@ -265,11 +265,15 @@ void CompactionJob::Prepare() { /*sub_job_id*/ 0); } - if (c->immutable_options()->preclude_last_level_data_seconds > 0) { - // TODO(zjay): move to a function - seqno_time_mapping_.SetMaxTimeDuration( - c->immutable_options()->preclude_last_level_data_seconds); + // collect all seqno->time information from the input files which will be used + // to encode seqno->time to the output files. 
+ uint64_t preserve_time_duration = + std::max(c->immutable_options()->preserve_internal_time_seconds, + c->immutable_options()->preclude_last_level_data_seconds); + + if (preserve_time_duration > 0) { // setup seqno_time_mapping_ + seqno_time_mapping_.SetMaxTimeDuration(preserve_time_duration); for (const auto& each_level : *c->inputs()) { for (const auto& fmd : each_level.files) { std::shared_ptr tp; @@ -295,10 +299,27 @@ void CompactionJob::Prepare() { ROCKS_LOG_WARN(db_options_.info_log, "Failed to get current time in compaction: Status: %s", status.ToString().c_str()); - penultimate_level_cutoff_seqno_ = 0; + // preserve all time information + preserve_time_min_seqno_ = 0; + preclude_last_level_min_seqno_ = 0; } else { - penultimate_level_cutoff_seqno_ = - seqno_time_mapping_.TruncateOldEntries(_current_time); + seqno_time_mapping_.TruncateOldEntries(_current_time); + uint64_t preserve_time = + static_cast(_current_time) > preserve_time_duration + ? _current_time - preserve_time_duration + : 0; + preserve_time_min_seqno_ = + seqno_time_mapping_.GetOldestSequenceNum(preserve_time); + if (c->immutable_options()->preclude_last_level_data_seconds > 0) { + uint64_t preclude_last_level_time = + static_cast(_current_time) > + c->immutable_options()->preclude_last_level_data_seconds + ? 
_current_time - + c->immutable_options()->preclude_last_level_data_seconds + : 0; + preclude_last_level_min_seqno_ = + seqno_time_mapping_.GetOldestSequenceNum(preclude_last_level_time); + } } } } @@ -1216,8 +1237,8 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { blob_file_builder.get(), db_options_.allow_data_in_errors, db_options_.enforce_single_del_contracts, manual_compaction_canceled_, sub_compact->compaction, compaction_filter, shutting_down_, - db_options_.info_log, full_history_ts_low, - penultimate_level_cutoff_seqno_); + db_options_.info_log, full_history_ts_low, preserve_time_min_seqno_, + preclude_last_level_min_seqno_); c_iter->SeekToFirst(); // Assign range delete aggregator to the target output level, which makes sure diff --git a/db/compaction/compaction_job.h b/db/compaction/compaction_job.h index d281b4c79..dac7de56d 100644 --- a/db/compaction/compaction_job.h +++ b/db/compaction/compaction_job.h @@ -335,12 +335,15 @@ class CompactionJob { // it also collects the smallest_seqno -> oldest_ancester_time from the SST. SeqnoToTimeMapping seqno_time_mapping_; - // cutoff sequence number for penultimate level, only set when - // per_key_placement feature is enabled. - // If a key with sequence number larger than penultimate_level_cutoff_seqno_, - // it will be placed on the penultimate_level and seqnuence number won't be - // zeroed out. - SequenceNumber penultimate_level_cutoff_seqno_ = kMaxSequenceNumber; + // Minimal sequence number for preserving the time information. The time info + // older than this sequence number won't be preserved after the compaction and + // if it's bottommost compaction, the seq num will be zeroed out. + SequenceNumber preserve_time_min_seqno_ = kMaxSequenceNumber; + + // Minimal sequence number to preclude the data from the last level. If the + // key's sequence number is at or above this (newer), it will be precluded + // from the last level (output to penultimate level). 
+ SequenceNumber preclude_last_level_min_seqno_ = kMaxSequenceNumber; // Get table file name in where it's outputting to, which should also be in // `output_directory_`. diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index 5aa4b2a3d..f34cdfc15 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -847,11 +847,13 @@ Status DBImpl::RegisterRecordSeqnoTimeWorker() { InstrumentedMutexLock l(&mutex_); for (auto cfd : *versions_->GetColumnFamilySet()) { - uint64_t preclude_last_option = - cfd->ioptions()->preclude_last_level_data_seconds; - if (!cfd->IsDropped() && preclude_last_option > 0) { - min_time_duration = std::min(preclude_last_option, min_time_duration); - max_time_duration = std::max(preclude_last_option, max_time_duration); + // preserve time is the max of 2 options. + uint64_t preserve_time_duration = + std::max(cfd->ioptions()->preserve_internal_time_seconds, + cfd->ioptions()->preclude_last_level_data_seconds); + if (!cfd->IsDropped() && preserve_time_duration > 0) { + min_time_duration = std::min(preserve_time_duration, min_time_duration); + max_time_duration = std::max(preserve_time_duration, max_time_duration); } } if (min_time_duration == std::numeric_limits::max()) { @@ -3103,7 +3105,8 @@ Status DBImpl::CreateColumnFamilyImpl(const ColumnFamilyOptions& cf_options, } } // InstrumentedMutexLock l(&mutex_) - if (cf_options.preclude_last_level_data_seconds > 0) { + if (cf_options.preserve_internal_time_seconds > 0 || + cf_options.preclude_last_level_data_seconds > 0) { s = RegisterRecordSeqnoTimeWorker(); } sv_context.Clean(); @@ -3194,7 +3197,8 @@ Status DBImpl::DropColumnFamilyImpl(ColumnFamilyHandle* column_family) { bg_cv_.SignalAll(); } - if (cfd->ioptions()->preclude_last_level_data_seconds > 0) { + if (cfd->ioptions()->preserve_internal_time_seconds > 0 || + cfd->ioptions()->preclude_last_level_data_seconds > 0) { s = RegisterRecordSeqnoTimeWorker(); } diff --git a/db/seqno_time_test.cc b/db/seqno_time_test.cc index 
ad0ac5f8a..12394a368 100644 --- a/db/seqno_time_test.cc +++ b/db/seqno_time_test.cc @@ -9,6 +9,7 @@ #include "db/seqno_to_time_mapping.h" #include "port/stack_trace.h" #include "rocksdb/iostats_context.h" +#include "rocksdb/utilities/debug.h" #include "test_util/mock_time_env.h" #ifndef ROCKSDB_LITE @@ -37,7 +38,7 @@ class SeqnoTimeTest : public DBTestBase { } // make sure the file is not in cache, otherwise it won't have IO info - void AssertKetTemperature(int key_id, Temperature expected_temperature) { + void AssertKeyTemperature(int key_id, Temperature expected_temperature) { get_iostats_context()->Reset(); IOStatsContext* iostats = get_iostats_context(); std::string result = Get(Key(key_id)); @@ -101,7 +102,7 @@ TEST_F(SeqnoTimeTest, TemperatureBasicUniversal) { ASSERT_EQ(GetSstSizeHelper(Temperature::kCold), 0); // read a random key, which should be hot (kUnknown) - AssertKetTemperature(20, Temperature::kUnknown); + AssertKeyTemperature(20, Temperature::kUnknown); // Write more data, but still all hot until the 10th SST, as: // write a key every 10 seconds, 100 keys per SST, each SST takes 1000 seconds @@ -139,7 +140,7 @@ TEST_F(SeqnoTimeTest, TemperatureBasicUniversal) { ASSERT_GT(hot_data_size, 0); ASSERT_GT(cold_data_size, 0); // the first a few key should be cold - AssertKetTemperature(20, Temperature::kCold); + AssertKeyTemperature(20, Temperature::kCold); for (int i = 0; i < 30; i++) { dbfull()->TEST_WaitForPeridicTaskRun([&] { @@ -148,8 +149,8 @@ TEST_F(SeqnoTimeTest, TemperatureBasicUniversal) { ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); // the hot/cold data cut off range should be between i * 20 + 200 -> 250 - AssertKetTemperature(i * 20 + 250, Temperature::kUnknown); - AssertKetTemperature(i * 20 + 200, Temperature::kCold); + AssertKeyTemperature(i * 20 + 250, Temperature::kUnknown); + AssertKeyTemperature(i * 20 + 200, Temperature::kCold); } ASSERT_LT(GetSstSizeHelper(Temperature::kUnknown), hot_data_size); @@ -166,7 +167,7 @@ 
TEST_F(SeqnoTimeTest, TemperatureBasicUniversal) { } // any random data close to the end should be cold - AssertKetTemperature(1000, Temperature::kCold); + AssertKeyTemperature(1000, Temperature::kCold); // close explicitly, because the env is local variable which will be released // first. @@ -215,7 +216,7 @@ TEST_F(SeqnoTimeTest, TemperatureBasicLevel) { ASSERT_EQ(GetSstSizeHelper(Temperature::kCold), 0); // read a random key, which should be hot (kUnknown) - AssertKetTemperature(20, Temperature::kUnknown); + AssertKeyTemperature(20, Temperature::kUnknown); // Adding more data to have mixed hot and cold data for (; sst_num < 14; sst_num++) { @@ -237,7 +238,7 @@ TEST_F(SeqnoTimeTest, TemperatureBasicLevel) { ASSERT_GT(hot_data_size, 0); ASSERT_GT(cold_data_size, 0); // the first a few key should be cold - AssertKetTemperature(20, Temperature::kCold); + AssertKeyTemperature(20, Temperature::kCold); // Wait some time, with each wait, the cold data is increasing and hot data is // decreasing @@ -253,8 +254,8 @@ TEST_F(SeqnoTimeTest, TemperatureBasicLevel) { ASSERT_GT(cold_data_size, pre_cold); // the hot/cold cut_off key should be around i * 20 + 400 -> 450 - AssertKetTemperature(i * 20 + 450, Temperature::kUnknown); - AssertKetTemperature(i * 20 + 400, Temperature::kCold); + AssertKeyTemperature(i * 20 + 450, Temperature::kUnknown); + AssertKeyTemperature(i * 20 + 400, Temperature::kCold); } // Wait again, the most of the data should be cold after that @@ -267,14 +268,53 @@ TEST_F(SeqnoTimeTest, TemperatureBasicLevel) { } // any random data close to the end should be cold - AssertKetTemperature(1000, Temperature::kCold); + AssertKeyTemperature(1000, Temperature::kCold); Close(); } -TEST_F(SeqnoTimeTest, BasicSeqnoToTimeMapping) { +enum class SeqnoTimeTestType : char { + kTrackInternalTimeSeconds = 0, + kPrecludeLastLevel = 1, + kBothSetTrackSmaller = 2, +}; + +class SeqnoTimeTablePropTest + : public SeqnoTimeTest, + public ::testing::WithParamInterface { + public: + 
SeqnoTimeTablePropTest() : SeqnoTimeTest() {} + + void SetTrackTimeDurationOptions(uint64_t track_time_duration, + Options& options) const { + // either option set will enable the time tracking feature + switch (GetParam()) { + case SeqnoTimeTestType::kTrackInternalTimeSeconds: + options.preclude_last_level_data_seconds = 0; + options.preserve_internal_time_seconds = track_time_duration; + break; + case SeqnoTimeTestType::kPrecludeLastLevel: + options.preclude_last_level_data_seconds = track_time_duration; + options.preserve_internal_time_seconds = 0; + break; + case SeqnoTimeTestType::kBothSetTrackSmaller: + options.preclude_last_level_data_seconds = track_time_duration; + options.preserve_internal_time_seconds = track_time_duration / 10; + break; + } + } +}; + +INSTANTIATE_TEST_CASE_P( + SeqnoTimeTablePropTest, SeqnoTimeTablePropTest, + ::testing::Values(SeqnoTimeTestType::kTrackInternalTimeSeconds, + SeqnoTimeTestType::kPrecludeLastLevel, + SeqnoTimeTestType::kBothSetTrackSmaller)); + +TEST_P(SeqnoTimeTablePropTest, BasicSeqnoToTimeMapping) { Options options = CurrentOptions(); - options.preclude_last_level_data_seconds = 10000; + SetTrackTimeDurationOptions(10000, options); + options.env = mock_env_.get(); options.disable_auto_compactions = true; DestroyAndReopen(options); @@ -297,6 +337,8 @@ TEST_F(SeqnoTimeTest, BasicSeqnoToTimeMapping) { ASSERT_OK(tp_mapping.Sort()); ASSERT_FALSE(tp_mapping.Empty()); auto seqs = tp_mapping.TEST_GetInternalMapping(); + // about ~20 seqs->time entries, because the sample rate is 10000/100, and it + // passes 2k time. 
ASSERT_GE(seqs.size(), 19); ASSERT_LE(seqs.size(), 21); SequenceNumber seq_end = dbfull()->GetLatestSequenceNumber(); @@ -444,7 +486,8 @@ TEST_F(SeqnoTimeTest, BasicSeqnoToTimeMapping) { ASSERT_LE(seqs.size(), 101); for (auto i = start_seq; i < seq_end - 99; i++) { // likely the first 100 entries reports 0 - ASSERT_LE(tp_mapping.GetOldestApproximateTime(i), (i - start_seq) + 3000); + ASSERT_LE(tp_mapping.GetOldestApproximateTime(i), + (i - start_seq) * 100 + 50000); } start_seq += 101; @@ -457,9 +500,10 @@ TEST_F(SeqnoTimeTest, BasicSeqnoToTimeMapping) { ASSERT_OK(db_->Close()); } -TEST_F(SeqnoTimeTest, MultiCFs) { +TEST_P(SeqnoTimeTablePropTest, MultiCFs) { Options options = CurrentOptions(); options.preclude_last_level_data_seconds = 0; + options.preserve_internal_time_seconds = 0; options.env = mock_env_.get(); options.stats_dump_period_sec = 0; options.stats_persist_period_sec = 0; @@ -485,7 +529,7 @@ TEST_F(SeqnoTimeTest, MultiCFs) { ASSERT_TRUE(dbfull()->TEST_GetSeqnoToTimeMapping().Empty()); Options options_1 = options; - options_1.preclude_last_level_data_seconds = 10000; // 10k + SetTrackTimeDurationOptions(10000, options_1); CreateColumnFamilies({"one"}, options_1); ASSERT_TRUE(scheduler.TEST_HasTask(PeriodicTaskType::kRecordSeqnoTime)); @@ -514,11 +558,11 @@ TEST_F(SeqnoTimeTest, MultiCFs) { ASSERT_FALSE(tp_mapping.Empty()); auto seqs = tp_mapping.TEST_GetInternalMapping(); ASSERT_GE(seqs.size(), 1); - ASSERT_LE(seqs.size(), 3); + ASSERT_LE(seqs.size(), 4); // Create one more CF with larger preclude_last_level time Options options_2 = options; - options_2.preclude_last_level_data_seconds = 1000000; // 1m + SetTrackTimeDurationOptions(1000000, options_2); // 1m CreateColumnFamilies({"two"}, options_2); // Add more data to CF "two" to fill the in memory mapping @@ -618,11 +662,11 @@ TEST_F(SeqnoTimeTest, MultiCFs) { Close(); } -TEST_F(SeqnoTimeTest, MultiInstancesBasic) { +TEST_P(SeqnoTimeTablePropTest, MultiInstancesBasic) { const int kInstanceNum = 2; 
Options options = CurrentOptions(); - options.preclude_last_level_data_seconds = 10000; + SetTrackTimeDurationOptions(10000, options); options.env = mock_env_.get(); options.stats_dump_period_sec = 0; options.stats_persist_period_sec = 0; @@ -650,17 +694,32 @@ TEST_F(SeqnoTimeTest, MultiInstancesBasic) { } } -TEST_F(SeqnoTimeTest, SeqnoToTimeMappingUniversal) { +TEST_P(SeqnoTimeTablePropTest, SeqnoToTimeMappingUniversal) { + const int kNumTrigger = 4; + const int kNumLevels = 7; + const int kNumKeys = 100; + Options options = CurrentOptions(); + SetTrackTimeDurationOptions(10000, options); options.compaction_style = kCompactionStyleUniversal; - options.preclude_last_level_data_seconds = 10000; + options.num_levels = kNumLevels; options.env = mock_env_.get(); DestroyAndReopen(options); - for (int j = 0; j < 3; j++) { - for (int i = 0; i < 100; i++) { - ASSERT_OK(Put(Key(i), "value")); + std::atomic_uint64_t num_seqno_zeroing{0}; + + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + SyncPoint::GetInstance()->SetCallBack( + "CompactionIterator::PrepareOutput:ZeroingSeq", + [&](void* /*arg*/) { num_seqno_zeroing++; }); + SyncPoint::GetInstance()->EnableProcessing(); + + int sst_num = 0; + for (; sst_num < kNumTrigger - 1; sst_num++) { + for (int i = 0; i < kNumKeys; i++) { + ASSERT_OK(Put(Key(sst_num * (kNumKeys - 1) + i), "value")); dbfull()->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(static_cast(10)); }); } @@ -681,11 +740,12 @@ TEST_F(SeqnoTimeTest, SeqnoToTimeMappingUniversal) { } // Trigger a compaction - for (int i = 0; i < 100; i++) { - ASSERT_OK(Put(Key(i), "value")); + for (int i = 0; i < kNumKeys; i++) { + ASSERT_OK(Put(Key(sst_num * (kNumKeys - 1) + i), "value")); dbfull()->TEST_WaitForPeridicTaskRun( [&] { mock_clock_->MockSleepForSeconds(static_cast(10)); }); } + sst_num++; ASSERT_OK(Flush()); ASSERT_OK(dbfull()->TEST_WaitForCompact()); tables_props.clear(); @@ -696,6 +756,73 @@ 
TEST_F(SeqnoTimeTest, SeqnoToTimeMappingUniversal) { SeqnoToTimeMapping tp_mapping; ASSERT_FALSE(it->second->seqno_to_time_mapping.empty()); ASSERT_OK(tp_mapping.Add(it->second->seqno_to_time_mapping)); + + // compact to the last level + CompactRangeOptions cro; + cro.bottommost_level_compaction = BottommostLevelCompaction::kForce; + ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); + // make sure the data is all compacted to penultimate level if the feature is + // on, otherwise, compacted to the last level. + if (options.preclude_last_level_data_seconds > 0) { + ASSERT_GT(NumTableFilesAtLevel(5), 0); + ASSERT_EQ(NumTableFilesAtLevel(6), 0); + } else { + ASSERT_EQ(NumTableFilesAtLevel(5), 0); + ASSERT_GT(NumTableFilesAtLevel(6), 0); + } + + // regardless the file is on the last level or not, it should keep the time + // information and sequence number are not set + tables_props.clear(); + tp_mapping.Clear(); + ASSERT_OK(dbfull()->GetPropertiesOfAllTables(&tables_props)); + + ASSERT_EQ(tables_props.size(), 1); + ASSERT_EQ(num_seqno_zeroing, 0); + + it = tables_props.begin(); + ASSERT_FALSE(it->second->seqno_to_time_mapping.empty()); + ASSERT_OK(tp_mapping.Add(it->second->seqno_to_time_mapping)); + + // make half of the data expired + mock_clock_->MockSleepForSeconds(static_cast(8000)); + ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); + + tables_props.clear(); + tp_mapping.Clear(); + ASSERT_OK(dbfull()->GetPropertiesOfAllTables(&tables_props)); + + if (options.preclude_last_level_data_seconds > 0) { + ASSERT_EQ(tables_props.size(), 2); + } else { + ASSERT_EQ(tables_props.size(), 1); + } + ASSERT_GT(num_seqno_zeroing, 0); + std::vector key_versions; + ASSERT_OK(GetAllKeyVersions(db_, Slice(), Slice(), + std::numeric_limits::max(), + &key_versions)); + // make sure there're more than 300 keys and first 100 keys are having seqno + // zeroed out, the last 100 key seqno not zeroed out + ASSERT_GT(key_versions.size(), 300); + for (int i = 0; i < 100; i++) { + 
ASSERT_EQ(key_versions[i].sequence, 0); + } + auto rit = key_versions.rbegin(); + for (int i = 0; i < 100; i++) { + ASSERT_GT(rit->sequence, 0); + rit++; + } + + // make all data expired and compact again to push it to the last level + // regardless if the tiering feature is enabled or not + mock_clock_->MockSleepForSeconds(static_cast(20000)); + + ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); + + ASSERT_GT(num_seqno_zeroing, 0); + ASSERT_GT(NumTableFilesAtLevel(6), 0); + Close(); } diff --git a/db/seqno_to_time_mapping.cc b/db/seqno_to_time_mapping.cc index a9d3dc36c..c69209929 100644 --- a/db/seqno_to_time_mapping.cc +++ b/db/seqno_to_time_mapping.cc @@ -31,11 +31,11 @@ void SeqnoToTimeMapping::Add(SequenceNumber seqno, uint64_t time) { seqno_time_mapping_.emplace_back(seqno, time); } -SequenceNumber SeqnoToTimeMapping::TruncateOldEntries(const uint64_t now) { +void SeqnoToTimeMapping::TruncateOldEntries(const uint64_t now) { assert(is_sorted_); if (max_time_duration_ == 0) { - return 0; + return; } const uint64_t cut_off_time = @@ -48,12 +48,25 @@ SequenceNumber SeqnoToTimeMapping::TruncateOldEntries(const uint64_t now) { return target < other.time; }); if (it == seqno_time_mapping_.begin()) { - return 0; + return; } it--; seqno_time_mapping_.erase(seqno_time_mapping_.begin(), it); +} - return seqno_time_mapping_.front().seqno; +SequenceNumber SeqnoToTimeMapping::GetOldestSequenceNum(uint64_t time) { + assert(is_sorted_); + + auto it = std::upper_bound( + seqno_time_mapping_.begin(), seqno_time_mapping_.end(), time, + [](uint64_t target, const SeqnoTimePair& other) -> bool { + return target < other.time; + }); + if (it == seqno_time_mapping_.begin()) { + return 0; + } + it--; + return it->seqno; } // The encoded format is: @@ -94,6 +107,10 @@ void SeqnoToTimeMapping::Encode(std::string& dest, const SequenceNumber start, start_it++; } } + // to include the first element + if (start_it != seqno_time_mapping_.begin()) { + start_it--; + } // If there are more 
data than needed, pick the entries for encoding. // It's not the most optimized algorithm for selecting the best representative diff --git a/db/seqno_to_time_mapping.h b/db/seqno_to_time_mapping.h index ad32a6517..4ffc9c199 100644 --- a/db/seqno_to_time_mapping.h +++ b/db/seqno_to_time_mapping.h @@ -107,7 +107,10 @@ class SeqnoToTimeMapping { uint64_t GetOldestApproximateTime(SequenceNumber seqno) const; // Truncate the old entries based on the current time and max_time_duration_ - SequenceNumber TruncateOldEntries(uint64_t now); + void TruncateOldEntries(uint64_t now); + + // Given a time, return its oldest possible sequence number + SequenceNumber GetOldestSequenceNum(uint64_t time); // Encode to a binary string void Encode(std::string& des, SequenceNumber start, SequenceNumber end, diff --git a/include/rocksdb/advanced_options.h b/include/rocksdb/advanced_options.h index 144b5c61e..a768f5dd4 100644 --- a/include/rocksdb/advanced_options.h +++ b/include/rocksdb/advanced_options.h @@ -911,8 +911,33 @@ struct AdvancedColumnFamilyOptions { // size constrained, the size amp is going to be only for non-last levels. // // Default: 0 (disable the feature) + // + // Not dynamically changeable; changing it requires a DB restart. uint64_t preclude_last_level_data_seconds = 0; + // EXPERIMENTAL + // If this option is set, it will preserve the internal time information about + // the data until it's older than the specified time here. + // Internally the time information is a map between sequence number and time, + // which is the same as `preclude_last_level_data_seconds`. But it won't + // preclude the data from the last level and the data in the last level won't + // have the sequence number zeroed out. + // Internally, rocksdb would sample the sequence number to time pair and store + // that in SST property "rocksdb.seqno.time.map". The information is currently + // only used for tiered storage compaction (option + // `preclude_last_level_data_seconds`). 
+ // + // Note: if both `preclude_last_level_data_seconds` and this option are set, + // it will preserve the max time of the 2 options and compaction still + // precludes the data based on `preclude_last_level_data_seconds`. + // The higher the preserve_time is, the less the sampling frequency will be ( + // which means less accuracy of the time estimation). + // + // Default: 0 (disable the feature) + // + // Not dynamically changeable; changing it requires a DB restart. + uint64_t preserve_internal_time_seconds = 0; + // When set, large values (blobs) are written to separate blob files, and // only pointers to them are stored in SST files. This can reduce write // amplification for large-value use cases at the cost of introducing a level diff --git a/options/cf_options.cc b/options/cf_options.cc index 5ae6a86e4..f34e8849e 100644 --- a/options/cf_options.cc +++ b/options/cf_options.cc @@ -567,6 +567,10 @@ static std::unordered_map {offsetof(struct ImmutableCFOptions, preclude_last_level_data_seconds), OptionType::kUInt64T, OptionVerificationType::kNormal, OptionTypeFlags::kNone}}, + {"preserve_internal_time_seconds", + {offsetof(struct ImmutableCFOptions, preserve_internal_time_seconds), + OptionType::kUInt64T, OptionVerificationType::kNormal, + OptionTypeFlags::kNone}}, // Need to keep this around to be able to read old OPTIONS files. 
{"max_mem_compaction_level", {0, OptionType::kInt, OptionVerificationType::kDeprecated, @@ -904,6 +908,7 @@ ImmutableCFOptions::ImmutableCFOptions(const ColumnFamilyOptions& cf_options) force_consistency_checks(cf_options.force_consistency_checks), preclude_last_level_data_seconds( cf_options.preclude_last_level_data_seconds), + preserve_internal_time_seconds(cf_options.preserve_internal_time_seconds), memtable_insert_with_hint_prefix_extractor( cf_options.memtable_insert_with_hint_prefix_extractor), cf_paths(cf_options.cf_paths), diff --git a/options/cf_options.h b/options/cf_options.h index da6b7252a..5ac4c0c8e 100644 --- a/options/cf_options.h +++ b/options/cf_options.h @@ -74,6 +74,8 @@ struct ImmutableCFOptions { uint64_t preclude_last_level_data_seconds; + uint64_t preserve_internal_time_seconds; + std::shared_ptr memtable_insert_with_hint_prefix_extractor; diff --git a/options/options.cc b/options/options.cc index ea1364416..153c5f8ef 100644 --- a/options/options.cc +++ b/options/options.cc @@ -94,6 +94,7 @@ AdvancedColumnFamilyOptions::AdvancedColumnFamilyOptions(const Options& options) sample_for_compression(options.sample_for_compression), preclude_last_level_data_seconds( options.preclude_last_level_data_seconds), + preserve_internal_time_seconds(options.preserve_internal_time_seconds), enable_blob_files(options.enable_blob_files), min_blob_size(options.min_blob_size), blob_file_size(options.blob_file_size), @@ -403,6 +404,8 @@ void ColumnFamilyOptions::Dump(Logger* log) const { periodic_compaction_seconds); ROCKS_LOG_HEADER(log, " Options.preclude_last_level_data_seconds: %" PRIu64, preclude_last_level_data_seconds); + ROCKS_LOG_HEADER(log, " Options.preserve_internal_time_seconds: %" PRIu64, + preserve_internal_time_seconds); ROCKS_LOG_HEADER(log, " Options.enable_blob_files: %s", enable_blob_files ? 
"true" : "false"); ROCKS_LOG_HEADER( diff --git a/options/options_helper.cc b/options/options_helper.cc index 4118f3961..0d63edb89 100644 --- a/options/options_helper.cc +++ b/options/options_helper.cc @@ -314,6 +314,8 @@ void UpdateColumnFamilyOptions(const ImmutableCFOptions& ioptions, cf_opts->blob_cache = ioptions.blob_cache; cf_opts->preclude_last_level_data_seconds = ioptions.preclude_last_level_data_seconds; + cf_opts->preserve_internal_time_seconds = + ioptions.preserve_internal_time_seconds; // TODO(yhchiang): find some way to handle the following derived options // * max_file_size diff --git a/options/options_settable_test.cc b/options/options_settable_test.cc index fe430718c..ff4e2acc5 100644 --- a/options/options_settable_test.cc +++ b/options/options_settable_test.cc @@ -403,6 +403,8 @@ TEST_F(OptionsSettableTest, ColumnFamilyOptionsAllFieldsSettable) { sizeof(ColumnFamilyOptions::TablePropertiesCollectorFactories)}, {offsetof(struct ColumnFamilyOptions, preclude_last_level_data_seconds), sizeof(uint64_t)}, + {offsetof(struct ColumnFamilyOptions, preserve_internal_time_seconds), + sizeof(uint64_t)}, {offsetof(struct ColumnFamilyOptions, blob_cache), sizeof(std::shared_ptr)}, {offsetof(struct ColumnFamilyOptions, comparator), sizeof(Comparator*)}, @@ -532,6 +534,7 @@ TEST_F(OptionsSettableTest, ColumnFamilyOptionsAllFieldsSettable) { "bottommost_temperature=kWarm;" "last_level_temperature=kWarm;" "preclude_last_level_data_seconds=86400;" + "preserve_internal_time_seconds=86400;" "compaction_options_fifo={max_table_files_size=3;allow_" "compaction=false;age_for_warm=1;};" "blob_cache=1M;"