Add option `preserve_internal_time_seconds` to preserve the time info (#10747)

Summary:
Add option `preserve_internal_time_seconds` to preserve the internal
time information.
It's mostly for the migration of the existing data to tiered storage (
`preclude_last_level_data_seconds`). When the tiering feature is just
enabled, the existing data won't have the time information to decide if
it's hot or cold. Enabling this feature will start collect and preserve
the time information for the new data.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/10747

Reviewed By: siying

Differential Revision: D39910141

Pulled By: siying

fbshipit-source-id: 25c21638e37b1a7c44006f636b7d714fe7242138
main
Jay Zhuang 2 years ago committed by Facebook GitHub Bot
parent f366f90bdb
commit c401f285c3
  1. 1
      HISTORY.md
  2. 17
      db/compaction/compaction_iterator.cc
  3. 15
      db/compaction/compaction_iterator.h
  4. 35
      db/compaction/compaction_job.cc
  5. 15
      db/compaction/compaction_job.h
  6. 18
      db/db_impl/db_impl.cc
  7. 181
      db/seqno_time_test.cc
  8. 25
      db/seqno_to_time_mapping.cc
  9. 5
      db/seqno_to_time_mapping.h
  10. 25
      include/rocksdb/advanced_options.h
  11. 5
      options/cf_options.cc
  12. 2
      options/cf_options.h
  13. 3
      options/options.cc
  14. 2
      options/options_helper.cc
  15. 3
      options/options_settable_test.cc

@ -19,6 +19,7 @@
### New Features ### New Features
* Add a new option IOOptions.do_not_recurse that can be used by underlying file systems to skip recursing through sub directories and list only files in GetChildren API. * Add a new option IOOptions.do_not_recurse that can be used by underlying file systems to skip recursing through sub directories and list only files in GetChildren API.
* Add option `preserve_internal_time_seconds` to preserve the time information for the latest data. Which can be used to determine the age of data when `preclude_last_level_data_seconds` is enabled. The time information is attached with SST in table property `rocksdb.seqno.time.map` which can be parsed by tool ldb or sst_dump.
### Behavior Changes ### Behavior Changes
* Sanitize min_write_buffer_number_to_merge to 1 if atomic flush is enabled to prevent unexpected data loss when WAL is disabled in a multi-column-family setting (#10773). * Sanitize min_write_buffer_number_to_merge to 1 if atomic flush is enabled to prevent unexpected data loss when WAL is disabled in a multi-column-family setting (#10773).

@ -34,7 +34,8 @@ CompactionIterator::CompactionIterator(
const std::atomic<bool>* shutting_down, const std::atomic<bool>* shutting_down,
const std::shared_ptr<Logger> info_log, const std::shared_ptr<Logger> info_log,
const std::string* full_history_ts_low, const std::string* full_history_ts_low,
const SequenceNumber penultimate_level_cutoff_seqno) const SequenceNumber preserve_time_min_seqno,
const SequenceNumber preclude_last_level_min_seqno)
: CompactionIterator( : CompactionIterator(
input, cmp, merge_helper, last_sequence, snapshots, input, cmp, merge_helper, last_sequence, snapshots,
earliest_write_conflict_snapshot, job_snapshot, snapshot_checker, env, earliest_write_conflict_snapshot, job_snapshot, snapshot_checker, env,
@ -44,7 +45,7 @@ CompactionIterator::CompactionIterator(
std::unique_ptr<CompactionProxy>( std::unique_ptr<CompactionProxy>(
compaction ? new RealCompaction(compaction) : nullptr), compaction ? new RealCompaction(compaction) : nullptr),
compaction_filter, shutting_down, info_log, full_history_ts_low, compaction_filter, shutting_down, info_log, full_history_ts_low,
penultimate_level_cutoff_seqno) {} preserve_time_min_seqno, preclude_last_level_min_seqno) {}
CompactionIterator::CompactionIterator( CompactionIterator::CompactionIterator(
InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper, InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper,
@ -61,7 +62,8 @@ CompactionIterator::CompactionIterator(
const std::atomic<bool>* shutting_down, const std::atomic<bool>* shutting_down,
const std::shared_ptr<Logger> info_log, const std::shared_ptr<Logger> info_log,
const std::string* full_history_ts_low, const std::string* full_history_ts_low,
const SequenceNumber penultimate_level_cutoff_seqno) const SequenceNumber preserve_time_min_seqno,
const SequenceNumber preclude_last_level_min_seqno)
: input_(input, cmp, : input_(input, cmp,
!compaction || compaction->DoesInputReferenceBlobFiles()), !compaction || compaction->DoesInputReferenceBlobFiles()),
cmp_(cmp), cmp_(cmp),
@ -105,8 +107,10 @@ CompactionIterator::CompactionIterator(
current_key_committed_(false), current_key_committed_(false),
cmp_with_history_ts_low_(0), cmp_with_history_ts_low_(0),
level_(compaction_ == nullptr ? 0 : compaction_->level()), level_(compaction_ == nullptr ? 0 : compaction_->level()),
penultimate_level_cutoff_seqno_(penultimate_level_cutoff_seqno) { preserve_time_min_seqno_(preserve_time_min_seqno),
preclude_last_level_min_seqno_(preclude_last_level_min_seqno) {
assert(snapshots_ != nullptr); assert(snapshots_ != nullptr);
assert(preserve_time_min_seqno_ <= preclude_last_level_min_seqno_);
if (compaction_ != nullptr) { if (compaction_ != nullptr) {
level_ptrs_ = std::vector<size_t>(compaction_->number_levels(), 0); level_ptrs_ = std::vector<size_t>(compaction_->number_levels(), 0);
@ -1088,6 +1092,7 @@ void CompactionIterator::GarbageCollectBlobIfNeeded() {
} }
void CompactionIterator::DecideOutputLevel() { void CompactionIterator::DecideOutputLevel() {
assert(compaction_->SupportsPerKeyPlacement());
#ifndef NDEBUG #ifndef NDEBUG
// Could be overridden by unittest // Could be overridden by unittest
PerKeyPlacementContext context(level_, ikey_.user_key, value_, PerKeyPlacementContext context(level_, ikey_.user_key, value_,
@ -1099,7 +1104,7 @@ void CompactionIterator::DecideOutputLevel() {
// if the key is newer than the cutoff sequence or within the earliest // if the key is newer than the cutoff sequence or within the earliest
// snapshot, it should output to the penultimate level. // snapshot, it should output to the penultimate level.
if (ikey_.sequence > penultimate_level_cutoff_seqno_ || if (ikey_.sequence >= preclude_last_level_min_seqno_ ||
ikey_.sequence > earliest_snapshot_) { ikey_.sequence > earliest_snapshot_) {
output_to_penultimate_level_ = true; output_to_penultimate_level_ = true;
} }
@ -1161,7 +1166,7 @@ void CompactionIterator::PrepareOutput() {
DefinitelyInSnapshot(ikey_.sequence, earliest_snapshot_) && DefinitelyInSnapshot(ikey_.sequence, earliest_snapshot_) &&
ikey_.type != kTypeMerge && current_key_committed_ && ikey_.type != kTypeMerge && current_key_committed_ &&
!output_to_penultimate_level_ && !output_to_penultimate_level_ &&
ikey_.sequence < penultimate_level_cutoff_seqno_) { ikey_.sequence < preserve_time_min_seqno_) {
if (ikey_.type == kTypeDeletion || if (ikey_.type == kTypeDeletion ||
(ikey_.type == kTypeSingleDeletion && timestamp_size_ == 0)) { (ikey_.type == kTypeSingleDeletion && timestamp_size_ == 0)) {
ROCKS_LOG_FATAL( ROCKS_LOG_FATAL(

@ -200,7 +200,8 @@ class CompactionIterator {
const std::atomic<bool>* shutting_down = nullptr, const std::atomic<bool>* shutting_down = nullptr,
const std::shared_ptr<Logger> info_log = nullptr, const std::shared_ptr<Logger> info_log = nullptr,
const std::string* full_history_ts_low = nullptr, const std::string* full_history_ts_low = nullptr,
const SequenceNumber penultimate_level_cutoff_seqno = kMaxSequenceNumber); const SequenceNumber preserve_time_min_seqno = kMaxSequenceNumber,
const SequenceNumber preclude_last_level_min_seqno = kMaxSequenceNumber);
// Constructor with custom CompactionProxy, used for tests. // Constructor with custom CompactionProxy, used for tests.
CompactionIterator( CompactionIterator(
@ -218,7 +219,8 @@ class CompactionIterator {
const std::atomic<bool>* shutting_down = nullptr, const std::atomic<bool>* shutting_down = nullptr,
const std::shared_ptr<Logger> info_log = nullptr, const std::shared_ptr<Logger> info_log = nullptr,
const std::string* full_history_ts_low = nullptr, const std::string* full_history_ts_low = nullptr,
const SequenceNumber penultimate_level_cutoff_seqno = kMaxSequenceNumber); const SequenceNumber preserve_time_min_seqno = kMaxSequenceNumber,
const SequenceNumber preclude_last_level_min_seqno = kMaxSequenceNumber);
~CompactionIterator(); ~CompactionIterator();
@ -470,9 +472,12 @@ class CompactionIterator {
// output to. // output to.
bool output_to_penultimate_level_{false}; bool output_to_penultimate_level_{false};
// any key later than this sequence number should have // min seqno for preserving the time information.
// output_to_penultimate_level_ set to true const SequenceNumber preserve_time_min_seqno_ = kMaxSequenceNumber;
const SequenceNumber penultimate_level_cutoff_seqno_ = kMaxSequenceNumber;
// min seqno to preclude the data from the last level, if the key seqno larger
// than this, it will be output to penultimate level
const SequenceNumber preclude_last_level_min_seqno_ = kMaxSequenceNumber;
void AdvanceInputIter() { input_.Next(); } void AdvanceInputIter() { input_.Next(); }

@ -265,11 +265,15 @@ void CompactionJob::Prepare() {
/*sub_job_id*/ 0); /*sub_job_id*/ 0);
} }
if (c->immutable_options()->preclude_last_level_data_seconds > 0) { // collect all seqno->time information from the input files which will be used
// TODO(zjay): move to a function // to encode seqno->time to the output files.
seqno_time_mapping_.SetMaxTimeDuration( uint64_t preserve_time_duration =
std::max(c->immutable_options()->preserve_internal_time_seconds,
c->immutable_options()->preclude_last_level_data_seconds); c->immutable_options()->preclude_last_level_data_seconds);
if (preserve_time_duration > 0) {
// setup seqno_time_mapping_ // setup seqno_time_mapping_
seqno_time_mapping_.SetMaxTimeDuration(preserve_time_duration);
for (const auto& each_level : *c->inputs()) { for (const auto& each_level : *c->inputs()) {
for (const auto& fmd : each_level.files) { for (const auto& fmd : each_level.files) {
std::shared_ptr<const TableProperties> tp; std::shared_ptr<const TableProperties> tp;
@ -295,10 +299,27 @@ void CompactionJob::Prepare() {
ROCKS_LOG_WARN(db_options_.info_log, ROCKS_LOG_WARN(db_options_.info_log,
"Failed to get current time in compaction: Status: %s", "Failed to get current time in compaction: Status: %s",
status.ToString().c_str()); status.ToString().c_str());
penultimate_level_cutoff_seqno_ = 0; // preserve all time information
preserve_time_min_seqno_ = 0;
preclude_last_level_min_seqno_ = 0;
} else { } else {
penultimate_level_cutoff_seqno_ =
seqno_time_mapping_.TruncateOldEntries(_current_time); seqno_time_mapping_.TruncateOldEntries(_current_time);
uint64_t preserve_time =
static_cast<uint64_t>(_current_time) > preserve_time_duration
? _current_time - preserve_time_duration
: 0;
preserve_time_min_seqno_ =
seqno_time_mapping_.GetOldestSequenceNum(preserve_time);
if (c->immutable_options()->preclude_last_level_data_seconds > 0) {
uint64_t preclude_last_level_time =
static_cast<uint64_t>(_current_time) >
c->immutable_options()->preclude_last_level_data_seconds
? _current_time -
c->immutable_options()->preclude_last_level_data_seconds
: 0;
preclude_last_level_min_seqno_ =
seqno_time_mapping_.GetOldestSequenceNum(preclude_last_level_time);
}
} }
} }
} }
@ -1216,8 +1237,8 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
blob_file_builder.get(), db_options_.allow_data_in_errors, blob_file_builder.get(), db_options_.allow_data_in_errors,
db_options_.enforce_single_del_contracts, manual_compaction_canceled_, db_options_.enforce_single_del_contracts, manual_compaction_canceled_,
sub_compact->compaction, compaction_filter, shutting_down_, sub_compact->compaction, compaction_filter, shutting_down_,
db_options_.info_log, full_history_ts_low, db_options_.info_log, full_history_ts_low, preserve_time_min_seqno_,
penultimate_level_cutoff_seqno_); preclude_last_level_min_seqno_);
c_iter->SeekToFirst(); c_iter->SeekToFirst();
// Assign range delete aggregator to the target output level, which makes sure // Assign range delete aggregator to the target output level, which makes sure

@ -335,12 +335,15 @@ class CompactionJob {
// it also collects the smallest_seqno -> oldest_ancester_time from the SST. // it also collects the smallest_seqno -> oldest_ancester_time from the SST.
SeqnoToTimeMapping seqno_time_mapping_; SeqnoToTimeMapping seqno_time_mapping_;
// cutoff sequence number for penultimate level, only set when // Minimal sequence number for preserving the time information. The time info
// per_key_placement feature is enabled. // older than this sequence number won't be preserved after the compaction and
// If a key with sequence number larger than penultimate_level_cutoff_seqno_, // if it's bottommost compaction, the seq num will be zeroed out.
// it will be placed on the penultimate_level and seqnuence number won't be SequenceNumber preserve_time_min_seqno_ = kMaxSequenceNumber;
// zeroed out.
SequenceNumber penultimate_level_cutoff_seqno_ = kMaxSequenceNumber; // Minimal sequence number to preclude the data from the last level. If the
// key has bigger (newer) sequence number than this, it will be precluded from
// the last level (output to penultimate level).
SequenceNumber preclude_last_level_min_seqno_ = kMaxSequenceNumber;
// Get table file name in where it's outputting to, which should also be in // Get table file name in where it's outputting to, which should also be in
// `output_directory_`. // `output_directory_`.

@ -847,11 +847,13 @@ Status DBImpl::RegisterRecordSeqnoTimeWorker() {
InstrumentedMutexLock l(&mutex_); InstrumentedMutexLock l(&mutex_);
for (auto cfd : *versions_->GetColumnFamilySet()) { for (auto cfd : *versions_->GetColumnFamilySet()) {
uint64_t preclude_last_option = // preserve time is the max of 2 options.
cfd->ioptions()->preclude_last_level_data_seconds; uint64_t preserve_time_duration =
if (!cfd->IsDropped() && preclude_last_option > 0) { std::max(cfd->ioptions()->preserve_internal_time_seconds,
min_time_duration = std::min(preclude_last_option, min_time_duration); cfd->ioptions()->preclude_last_level_data_seconds);
max_time_duration = std::max(preclude_last_option, max_time_duration); if (!cfd->IsDropped() && preserve_time_duration > 0) {
min_time_duration = std::min(preserve_time_duration, min_time_duration);
max_time_duration = std::max(preserve_time_duration, max_time_duration);
} }
} }
if (min_time_duration == std::numeric_limits<uint64_t>::max()) { if (min_time_duration == std::numeric_limits<uint64_t>::max()) {
@ -3103,7 +3105,8 @@ Status DBImpl::CreateColumnFamilyImpl(const ColumnFamilyOptions& cf_options,
} }
} // InstrumentedMutexLock l(&mutex_) } // InstrumentedMutexLock l(&mutex_)
if (cf_options.preclude_last_level_data_seconds > 0) { if (cf_options.preserve_internal_time_seconds > 0 ||
cf_options.preclude_last_level_data_seconds > 0) {
s = RegisterRecordSeqnoTimeWorker(); s = RegisterRecordSeqnoTimeWorker();
} }
sv_context.Clean(); sv_context.Clean();
@ -3194,7 +3197,8 @@ Status DBImpl::DropColumnFamilyImpl(ColumnFamilyHandle* column_family) {
bg_cv_.SignalAll(); bg_cv_.SignalAll();
} }
if (cfd->ioptions()->preclude_last_level_data_seconds > 0) { if (cfd->ioptions()->preserve_internal_time_seconds > 0 ||
cfd->ioptions()->preclude_last_level_data_seconds > 0) {
s = RegisterRecordSeqnoTimeWorker(); s = RegisterRecordSeqnoTimeWorker();
} }

@ -9,6 +9,7 @@
#include "db/seqno_to_time_mapping.h" #include "db/seqno_to_time_mapping.h"
#include "port/stack_trace.h" #include "port/stack_trace.h"
#include "rocksdb/iostats_context.h" #include "rocksdb/iostats_context.h"
#include "rocksdb/utilities/debug.h"
#include "test_util/mock_time_env.h" #include "test_util/mock_time_env.h"
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
@ -37,7 +38,7 @@ class SeqnoTimeTest : public DBTestBase {
} }
// make sure the file is not in cache, otherwise it won't have IO info // make sure the file is not in cache, otherwise it won't have IO info
void AssertKetTemperature(int key_id, Temperature expected_temperature) { void AssertKeyTemperature(int key_id, Temperature expected_temperature) {
get_iostats_context()->Reset(); get_iostats_context()->Reset();
IOStatsContext* iostats = get_iostats_context(); IOStatsContext* iostats = get_iostats_context();
std::string result = Get(Key(key_id)); std::string result = Get(Key(key_id));
@ -101,7 +102,7 @@ TEST_F(SeqnoTimeTest, TemperatureBasicUniversal) {
ASSERT_EQ(GetSstSizeHelper(Temperature::kCold), 0); ASSERT_EQ(GetSstSizeHelper(Temperature::kCold), 0);
// read a random key, which should be hot (kUnknown) // read a random key, which should be hot (kUnknown)
AssertKetTemperature(20, Temperature::kUnknown); AssertKeyTemperature(20, Temperature::kUnknown);
// Write more data, but still all hot until the 10th SST, as: // Write more data, but still all hot until the 10th SST, as:
// write a key every 10 seconds, 100 keys per SST, each SST takes 1000 seconds // write a key every 10 seconds, 100 keys per SST, each SST takes 1000 seconds
@ -139,7 +140,7 @@ TEST_F(SeqnoTimeTest, TemperatureBasicUniversal) {
ASSERT_GT(hot_data_size, 0); ASSERT_GT(hot_data_size, 0);
ASSERT_GT(cold_data_size, 0); ASSERT_GT(cold_data_size, 0);
// the first a few key should be cold // the first a few key should be cold
AssertKetTemperature(20, Temperature::kCold); AssertKeyTemperature(20, Temperature::kCold);
for (int i = 0; i < 30; i++) { for (int i = 0; i < 30; i++) {
dbfull()->TEST_WaitForPeridicTaskRun([&] { dbfull()->TEST_WaitForPeridicTaskRun([&] {
@ -148,8 +149,8 @@ TEST_F(SeqnoTimeTest, TemperatureBasicUniversal) {
ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
// the hot/cold data cut off range should be between i * 20 + 200 -> 250 // the hot/cold data cut off range should be between i * 20 + 200 -> 250
AssertKetTemperature(i * 20 + 250, Temperature::kUnknown); AssertKeyTemperature(i * 20 + 250, Temperature::kUnknown);
AssertKetTemperature(i * 20 + 200, Temperature::kCold); AssertKeyTemperature(i * 20 + 200, Temperature::kCold);
} }
ASSERT_LT(GetSstSizeHelper(Temperature::kUnknown), hot_data_size); ASSERT_LT(GetSstSizeHelper(Temperature::kUnknown), hot_data_size);
@ -166,7 +167,7 @@ TEST_F(SeqnoTimeTest, TemperatureBasicUniversal) {
} }
// any random data close to the end should be cold // any random data close to the end should be cold
AssertKetTemperature(1000, Temperature::kCold); AssertKeyTemperature(1000, Temperature::kCold);
// close explicitly, because the env is local variable which will be released // close explicitly, because the env is local variable which will be released
// first. // first.
@ -215,7 +216,7 @@ TEST_F(SeqnoTimeTest, TemperatureBasicLevel) {
ASSERT_EQ(GetSstSizeHelper(Temperature::kCold), 0); ASSERT_EQ(GetSstSizeHelper(Temperature::kCold), 0);
// read a random key, which should be hot (kUnknown) // read a random key, which should be hot (kUnknown)
AssertKetTemperature(20, Temperature::kUnknown); AssertKeyTemperature(20, Temperature::kUnknown);
// Adding more data to have mixed hot and cold data // Adding more data to have mixed hot and cold data
for (; sst_num < 14; sst_num++) { for (; sst_num < 14; sst_num++) {
@ -237,7 +238,7 @@ TEST_F(SeqnoTimeTest, TemperatureBasicLevel) {
ASSERT_GT(hot_data_size, 0); ASSERT_GT(hot_data_size, 0);
ASSERT_GT(cold_data_size, 0); ASSERT_GT(cold_data_size, 0);
// the first a few key should be cold // the first a few key should be cold
AssertKetTemperature(20, Temperature::kCold); AssertKeyTemperature(20, Temperature::kCold);
// Wait some time, with each wait, the cold data is increasing and hot data is // Wait some time, with each wait, the cold data is increasing and hot data is
// decreasing // decreasing
@ -253,8 +254,8 @@ TEST_F(SeqnoTimeTest, TemperatureBasicLevel) {
ASSERT_GT(cold_data_size, pre_cold); ASSERT_GT(cold_data_size, pre_cold);
// the hot/cold cut_off key should be around i * 20 + 400 -> 450 // the hot/cold cut_off key should be around i * 20 + 400 -> 450
AssertKetTemperature(i * 20 + 450, Temperature::kUnknown); AssertKeyTemperature(i * 20 + 450, Temperature::kUnknown);
AssertKetTemperature(i * 20 + 400, Temperature::kCold); AssertKeyTemperature(i * 20 + 400, Temperature::kCold);
} }
// Wait again, the most of the data should be cold after that // Wait again, the most of the data should be cold after that
@ -267,14 +268,53 @@ TEST_F(SeqnoTimeTest, TemperatureBasicLevel) {
} }
// any random data close to the end should be cold // any random data close to the end should be cold
AssertKetTemperature(1000, Temperature::kCold); AssertKeyTemperature(1000, Temperature::kCold);
Close(); Close();
} }
TEST_F(SeqnoTimeTest, BasicSeqnoToTimeMapping) { enum class SeqnoTimeTestType : char {
kTrackInternalTimeSeconds = 0,
kPrecludeLastLevel = 1,
kBothSetTrackSmaller = 2,
};
class SeqnoTimeTablePropTest
: public SeqnoTimeTest,
public ::testing::WithParamInterface<SeqnoTimeTestType> {
public:
SeqnoTimeTablePropTest() : SeqnoTimeTest() {}
void SetTrackTimeDurationOptions(uint64_t track_time_duration,
Options& options) const {
// either option set will enable the time tracking feature
switch (GetParam()) {
case SeqnoTimeTestType::kTrackInternalTimeSeconds:
options.preclude_last_level_data_seconds = 0;
options.preserve_internal_time_seconds = track_time_duration;
break;
case SeqnoTimeTestType::kPrecludeLastLevel:
options.preclude_last_level_data_seconds = track_time_duration;
options.preserve_internal_time_seconds = 0;
break;
case SeqnoTimeTestType::kBothSetTrackSmaller:
options.preclude_last_level_data_seconds = track_time_duration;
options.preserve_internal_time_seconds = track_time_duration / 10;
break;
}
}
};
INSTANTIATE_TEST_CASE_P(
SeqnoTimeTablePropTest, SeqnoTimeTablePropTest,
::testing::Values(SeqnoTimeTestType::kTrackInternalTimeSeconds,
SeqnoTimeTestType::kPrecludeLastLevel,
SeqnoTimeTestType::kBothSetTrackSmaller));
TEST_P(SeqnoTimeTablePropTest, BasicSeqnoToTimeMapping) {
Options options = CurrentOptions(); Options options = CurrentOptions();
options.preclude_last_level_data_seconds = 10000; SetTrackTimeDurationOptions(10000, options);
options.env = mock_env_.get(); options.env = mock_env_.get();
options.disable_auto_compactions = true; options.disable_auto_compactions = true;
DestroyAndReopen(options); DestroyAndReopen(options);
@ -297,6 +337,8 @@ TEST_F(SeqnoTimeTest, BasicSeqnoToTimeMapping) {
ASSERT_OK(tp_mapping.Sort()); ASSERT_OK(tp_mapping.Sort());
ASSERT_FALSE(tp_mapping.Empty()); ASSERT_FALSE(tp_mapping.Empty());
auto seqs = tp_mapping.TEST_GetInternalMapping(); auto seqs = tp_mapping.TEST_GetInternalMapping();
// about ~20 seqs->time entries, because the sample rate is 10000/100, and it
// passes 2k time.
ASSERT_GE(seqs.size(), 19); ASSERT_GE(seqs.size(), 19);
ASSERT_LE(seqs.size(), 21); ASSERT_LE(seqs.size(), 21);
SequenceNumber seq_end = dbfull()->GetLatestSequenceNumber(); SequenceNumber seq_end = dbfull()->GetLatestSequenceNumber();
@ -444,7 +486,8 @@ TEST_F(SeqnoTimeTest, BasicSeqnoToTimeMapping) {
ASSERT_LE(seqs.size(), 101); ASSERT_LE(seqs.size(), 101);
for (auto i = start_seq; i < seq_end - 99; i++) { for (auto i = start_seq; i < seq_end - 99; i++) {
// likely the first 100 entries reports 0 // likely the first 100 entries reports 0
ASSERT_LE(tp_mapping.GetOldestApproximateTime(i), (i - start_seq) + 3000); ASSERT_LE(tp_mapping.GetOldestApproximateTime(i),
(i - start_seq) * 100 + 50000);
} }
start_seq += 101; start_seq += 101;
@ -457,9 +500,10 @@ TEST_F(SeqnoTimeTest, BasicSeqnoToTimeMapping) {
ASSERT_OK(db_->Close()); ASSERT_OK(db_->Close());
} }
TEST_F(SeqnoTimeTest, MultiCFs) { TEST_P(SeqnoTimeTablePropTest, MultiCFs) {
Options options = CurrentOptions(); Options options = CurrentOptions();
options.preclude_last_level_data_seconds = 0; options.preclude_last_level_data_seconds = 0;
options.preserve_internal_time_seconds = 0;
options.env = mock_env_.get(); options.env = mock_env_.get();
options.stats_dump_period_sec = 0; options.stats_dump_period_sec = 0;
options.stats_persist_period_sec = 0; options.stats_persist_period_sec = 0;
@ -485,7 +529,7 @@ TEST_F(SeqnoTimeTest, MultiCFs) {
ASSERT_TRUE(dbfull()->TEST_GetSeqnoToTimeMapping().Empty()); ASSERT_TRUE(dbfull()->TEST_GetSeqnoToTimeMapping().Empty());
Options options_1 = options; Options options_1 = options;
options_1.preclude_last_level_data_seconds = 10000; // 10k SetTrackTimeDurationOptions(10000, options_1);
CreateColumnFamilies({"one"}, options_1); CreateColumnFamilies({"one"}, options_1);
ASSERT_TRUE(scheduler.TEST_HasTask(PeriodicTaskType::kRecordSeqnoTime)); ASSERT_TRUE(scheduler.TEST_HasTask(PeriodicTaskType::kRecordSeqnoTime));
@ -514,11 +558,11 @@ TEST_F(SeqnoTimeTest, MultiCFs) {
ASSERT_FALSE(tp_mapping.Empty()); ASSERT_FALSE(tp_mapping.Empty());
auto seqs = tp_mapping.TEST_GetInternalMapping(); auto seqs = tp_mapping.TEST_GetInternalMapping();
ASSERT_GE(seqs.size(), 1); ASSERT_GE(seqs.size(), 1);
ASSERT_LE(seqs.size(), 3); ASSERT_LE(seqs.size(), 4);
// Create one more CF with larger preclude_last_level time // Create one more CF with larger preclude_last_level time
Options options_2 = options; Options options_2 = options;
options_2.preclude_last_level_data_seconds = 1000000; // 1m SetTrackTimeDurationOptions(1000000, options_2); // 1m
CreateColumnFamilies({"two"}, options_2); CreateColumnFamilies({"two"}, options_2);
// Add more data to CF "two" to fill the in memory mapping // Add more data to CF "two" to fill the in memory mapping
@ -618,11 +662,11 @@ TEST_F(SeqnoTimeTest, MultiCFs) {
Close(); Close();
} }
TEST_F(SeqnoTimeTest, MultiInstancesBasic) { TEST_P(SeqnoTimeTablePropTest, MultiInstancesBasic) {
const int kInstanceNum = 2; const int kInstanceNum = 2;
Options options = CurrentOptions(); Options options = CurrentOptions();
options.preclude_last_level_data_seconds = 10000; SetTrackTimeDurationOptions(10000, options);
options.env = mock_env_.get(); options.env = mock_env_.get();
options.stats_dump_period_sec = 0; options.stats_dump_period_sec = 0;
options.stats_persist_period_sec = 0; options.stats_persist_period_sec = 0;
@ -650,17 +694,32 @@ TEST_F(SeqnoTimeTest, MultiInstancesBasic) {
} }
} }
TEST_F(SeqnoTimeTest, SeqnoToTimeMappingUniversal) { TEST_P(SeqnoTimeTablePropTest, SeqnoToTimeMappingUniversal) {
const int kNumTrigger = 4;
const int kNumLevels = 7;
const int kNumKeys = 100;
Options options = CurrentOptions(); Options options = CurrentOptions();
SetTrackTimeDurationOptions(10000, options);
options.compaction_style = kCompactionStyleUniversal; options.compaction_style = kCompactionStyleUniversal;
options.preclude_last_level_data_seconds = 10000; options.num_levels = kNumLevels;
options.env = mock_env_.get(); options.env = mock_env_.get();
DestroyAndReopen(options); DestroyAndReopen(options);
for (int j = 0; j < 3; j++) { std::atomic_uint64_t num_seqno_zeroing{0};
for (int i = 0; i < 100; i++) {
ASSERT_OK(Put(Key(i), "value")); SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
SyncPoint::GetInstance()->SetCallBack(
"CompactionIterator::PrepareOutput:ZeroingSeq",
[&](void* /*arg*/) { num_seqno_zeroing++; });
SyncPoint::GetInstance()->EnableProcessing();
int sst_num = 0;
for (; sst_num < kNumTrigger - 1; sst_num++) {
for (int i = 0; i < kNumKeys; i++) {
ASSERT_OK(Put(Key(sst_num * (kNumKeys - 1) + i), "value"));
dbfull()->TEST_WaitForPeridicTaskRun( dbfull()->TEST_WaitForPeridicTaskRun(
[&] { mock_clock_->MockSleepForSeconds(static_cast<int>(10)); }); [&] { mock_clock_->MockSleepForSeconds(static_cast<int>(10)); });
} }
@ -681,11 +740,12 @@ TEST_F(SeqnoTimeTest, SeqnoToTimeMappingUniversal) {
} }
// Trigger a compaction // Trigger a compaction
for (int i = 0; i < 100; i++) { for (int i = 0; i < kNumKeys; i++) {
ASSERT_OK(Put(Key(i), "value")); ASSERT_OK(Put(Key(sst_num * (kNumKeys - 1) + i), "value"));
dbfull()->TEST_WaitForPeridicTaskRun( dbfull()->TEST_WaitForPeridicTaskRun(
[&] { mock_clock_->MockSleepForSeconds(static_cast<int>(10)); }); [&] { mock_clock_->MockSleepForSeconds(static_cast<int>(10)); });
} }
sst_num++;
ASSERT_OK(Flush()); ASSERT_OK(Flush());
ASSERT_OK(dbfull()->TEST_WaitForCompact()); ASSERT_OK(dbfull()->TEST_WaitForCompact());
tables_props.clear(); tables_props.clear();
@ -696,6 +756,73 @@ TEST_F(SeqnoTimeTest, SeqnoToTimeMappingUniversal) {
SeqnoToTimeMapping tp_mapping; SeqnoToTimeMapping tp_mapping;
ASSERT_FALSE(it->second->seqno_to_time_mapping.empty()); ASSERT_FALSE(it->second->seqno_to_time_mapping.empty());
ASSERT_OK(tp_mapping.Add(it->second->seqno_to_time_mapping)); ASSERT_OK(tp_mapping.Add(it->second->seqno_to_time_mapping));
// compact to the last level
CompactRangeOptions cro;
cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
// make sure the data is all compacted to penultimate level if the feature is
// on, otherwise, compacted to the last level.
if (options.preclude_last_level_data_seconds > 0) {
ASSERT_GT(NumTableFilesAtLevel(5), 0);
ASSERT_EQ(NumTableFilesAtLevel(6), 0);
} else {
ASSERT_EQ(NumTableFilesAtLevel(5), 0);
ASSERT_GT(NumTableFilesAtLevel(6), 0);
}
// regardless the file is on the last level or not, it should keep the time
// information and sequence number are not set
tables_props.clear();
tp_mapping.Clear();
ASSERT_OK(dbfull()->GetPropertiesOfAllTables(&tables_props));
ASSERT_EQ(tables_props.size(), 1);
ASSERT_EQ(num_seqno_zeroing, 0);
it = tables_props.begin();
ASSERT_FALSE(it->second->seqno_to_time_mapping.empty());
ASSERT_OK(tp_mapping.Add(it->second->seqno_to_time_mapping));
// make half of the data expired
mock_clock_->MockSleepForSeconds(static_cast<int>(8000));
ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
tables_props.clear();
tp_mapping.Clear();
ASSERT_OK(dbfull()->GetPropertiesOfAllTables(&tables_props));
if (options.preclude_last_level_data_seconds > 0) {
ASSERT_EQ(tables_props.size(), 2);
} else {
ASSERT_EQ(tables_props.size(), 1);
}
ASSERT_GT(num_seqno_zeroing, 0);
std::vector<KeyVersion> key_versions;
ASSERT_OK(GetAllKeyVersions(db_, Slice(), Slice(),
std::numeric_limits<size_t>::max(),
&key_versions));
// make sure there're more than 300 keys and first 100 keys are having seqno
// zeroed out, the last 100 key seqno not zeroed out
ASSERT_GT(key_versions.size(), 300);
for (int i = 0; i < 100; i++) {
ASSERT_EQ(key_versions[i].sequence, 0);
}
auto rit = key_versions.rbegin();
for (int i = 0; i < 100; i++) {
ASSERT_GT(rit->sequence, 0);
rit++;
}
// make all data expired and compact again to push it to the last level
// regardless if the tiering feature is enabled or not
mock_clock_->MockSleepForSeconds(static_cast<int>(20000));
ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
ASSERT_GT(num_seqno_zeroing, 0);
ASSERT_GT(NumTableFilesAtLevel(6), 0);
Close(); Close();
} }

@ -31,11 +31,11 @@ void SeqnoToTimeMapping::Add(SequenceNumber seqno, uint64_t time) {
seqno_time_mapping_.emplace_back(seqno, time); seqno_time_mapping_.emplace_back(seqno, time);
} }
SequenceNumber SeqnoToTimeMapping::TruncateOldEntries(const uint64_t now) { void SeqnoToTimeMapping::TruncateOldEntries(const uint64_t now) {
assert(is_sorted_); assert(is_sorted_);
if (max_time_duration_ == 0) { if (max_time_duration_ == 0) {
return 0; return;
} }
const uint64_t cut_off_time = const uint64_t cut_off_time =
@ -48,12 +48,25 @@ SequenceNumber SeqnoToTimeMapping::TruncateOldEntries(const uint64_t now) {
return target < other.time; return target < other.time;
}); });
if (it == seqno_time_mapping_.begin()) { if (it == seqno_time_mapping_.begin()) {
return 0; return;
} }
it--; it--;
seqno_time_mapping_.erase(seqno_time_mapping_.begin(), it); seqno_time_mapping_.erase(seqno_time_mapping_.begin(), it);
}
return seqno_time_mapping_.front().seqno; SequenceNumber SeqnoToTimeMapping::GetOldestSequenceNum(uint64_t time) {
assert(is_sorted_);
auto it = std::upper_bound(
seqno_time_mapping_.begin(), seqno_time_mapping_.end(), time,
[](uint64_t target, const SeqnoTimePair& other) -> bool {
return target < other.time;
});
if (it == seqno_time_mapping_.begin()) {
return 0;
}
it--;
return it->seqno;
} }
// The encoded format is: // The encoded format is:
@ -94,6 +107,10 @@ void SeqnoToTimeMapping::Encode(std::string& dest, const SequenceNumber start,
start_it++; start_it++;
} }
} }
// to include the first element
if (start_it != seqno_time_mapping_.begin()) {
start_it--;
}
// If there are more data than needed, pick the entries for encoding. // If there are more data than needed, pick the entries for encoding.
// It's not the most optimized algorithm for selecting the best representative // It's not the most optimized algorithm for selecting the best representative

@ -107,7 +107,10 @@ class SeqnoToTimeMapping {
uint64_t GetOldestApproximateTime(SequenceNumber seqno) const; uint64_t GetOldestApproximateTime(SequenceNumber seqno) const;
// Truncate the old entries based on the current time and max_time_duration_ // Truncate the old entries based on the current time and max_time_duration_
SequenceNumber TruncateOldEntries(uint64_t now); void TruncateOldEntries(uint64_t now);
// Given a time, return it's oldest possible sequence number
SequenceNumber GetOldestSequenceNum(uint64_t time);
// Encode to a binary string // Encode to a binary string
void Encode(std::string& des, SequenceNumber start, SequenceNumber end, void Encode(std::string& des, SequenceNumber start, SequenceNumber end,

@ -911,8 +911,33 @@ struct AdvancedColumnFamilyOptions {
// size constrained, the size amp is going to be only for non-last levels. // size constrained, the size amp is going to be only for non-last levels.
// //
// Default: 0 (disable the feature) // Default: 0 (disable the feature)
//
// Not dynamically changeable, change it requires db restart.
uint64_t preclude_last_level_data_seconds = 0; uint64_t preclude_last_level_data_seconds = 0;
// EXPERIMENTAL
// If this option is set, it will preserve the internal time information about
// the data until it's older than the specified time here.
// Internally the time information is a map between sequence number and time,
// which is the same as `preclude_last_level_data_seconds`. But it won't
// preclude the data from the last level and the data in the last level won't
// have the sequence number zeroed out.
// Internally, rocksdb would sample the sequence number to time pair and store
// that in SST property "rocksdb.seqno.time.map". The information is currently
// only used for tiered storage compaction (option
// `preclude_last_level_data_seconds`).
//
// Note: if both `preclude_last_level_data_seconds` and this option is set, it
// will preserve the max time of the 2 options and compaction still preclude
// the data based on `preclude_last_level_data_seconds`.
// The higher the preserve_time is, the less the sampling frequency will be (
// which means less accuracy of the time estimation).
//
// Default: 0 (disable the feature)
//
// Not dynamically changeable, change it requires db restart.
uint64_t preserve_internal_time_seconds = 0;
// When set, large values (blobs) are written to separate blob files, and // When set, large values (blobs) are written to separate blob files, and
// only pointers to them are stored in SST files. This can reduce write // only pointers to them are stored in SST files. This can reduce write
// amplification for large-value use cases at the cost of introducing a level // amplification for large-value use cases at the cost of introducing a level

@ -567,6 +567,10 @@ static std::unordered_map<std::string, OptionTypeInfo>
{offsetof(struct ImmutableCFOptions, preclude_last_level_data_seconds), {offsetof(struct ImmutableCFOptions, preclude_last_level_data_seconds),
OptionType::kUInt64T, OptionVerificationType::kNormal, OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}}, OptionTypeFlags::kNone}},
{"preserve_internal_time_seconds",
{offsetof(struct ImmutableCFOptions, preserve_internal_time_seconds),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
// Need to keep this around to be able to read old OPTIONS files. // Need to keep this around to be able to read old OPTIONS files.
{"max_mem_compaction_level", {"max_mem_compaction_level",
{0, OptionType::kInt, OptionVerificationType::kDeprecated, {0, OptionType::kInt, OptionVerificationType::kDeprecated,
@ -904,6 +908,7 @@ ImmutableCFOptions::ImmutableCFOptions(const ColumnFamilyOptions& cf_options)
force_consistency_checks(cf_options.force_consistency_checks), force_consistency_checks(cf_options.force_consistency_checks),
preclude_last_level_data_seconds( preclude_last_level_data_seconds(
cf_options.preclude_last_level_data_seconds), cf_options.preclude_last_level_data_seconds),
preserve_internal_time_seconds(cf_options.preserve_internal_time_seconds),
memtable_insert_with_hint_prefix_extractor( memtable_insert_with_hint_prefix_extractor(
cf_options.memtable_insert_with_hint_prefix_extractor), cf_options.memtable_insert_with_hint_prefix_extractor),
cf_paths(cf_options.cf_paths), cf_paths(cf_options.cf_paths),

@ -74,6 +74,8 @@ struct ImmutableCFOptions {
uint64_t preclude_last_level_data_seconds; uint64_t preclude_last_level_data_seconds;
uint64_t preserve_internal_time_seconds;
std::shared_ptr<const SliceTransform> std::shared_ptr<const SliceTransform>
memtable_insert_with_hint_prefix_extractor; memtable_insert_with_hint_prefix_extractor;

@ -94,6 +94,7 @@ AdvancedColumnFamilyOptions::AdvancedColumnFamilyOptions(const Options& options)
sample_for_compression(options.sample_for_compression), sample_for_compression(options.sample_for_compression),
preclude_last_level_data_seconds( preclude_last_level_data_seconds(
options.preclude_last_level_data_seconds), options.preclude_last_level_data_seconds),
preserve_internal_time_seconds(options.preserve_internal_time_seconds),
enable_blob_files(options.enable_blob_files), enable_blob_files(options.enable_blob_files),
min_blob_size(options.min_blob_size), min_blob_size(options.min_blob_size),
blob_file_size(options.blob_file_size), blob_file_size(options.blob_file_size),
@ -403,6 +404,8 @@ void ColumnFamilyOptions::Dump(Logger* log) const {
periodic_compaction_seconds); periodic_compaction_seconds);
ROCKS_LOG_HEADER(log, " Options.preclude_last_level_data_seconds: %" PRIu64, ROCKS_LOG_HEADER(log, " Options.preclude_last_level_data_seconds: %" PRIu64,
preclude_last_level_data_seconds); preclude_last_level_data_seconds);
ROCKS_LOG_HEADER(log, " Options.preserve_internal_time_seconds: %" PRIu64,
preserve_internal_time_seconds);
ROCKS_LOG_HEADER(log, " Options.enable_blob_files: %s", ROCKS_LOG_HEADER(log, " Options.enable_blob_files: %s",
enable_blob_files ? "true" : "false"); enable_blob_files ? "true" : "false");
ROCKS_LOG_HEADER( ROCKS_LOG_HEADER(

@ -314,6 +314,8 @@ void UpdateColumnFamilyOptions(const ImmutableCFOptions& ioptions,
cf_opts->blob_cache = ioptions.blob_cache; cf_opts->blob_cache = ioptions.blob_cache;
cf_opts->preclude_last_level_data_seconds = cf_opts->preclude_last_level_data_seconds =
ioptions.preclude_last_level_data_seconds; ioptions.preclude_last_level_data_seconds;
cf_opts->preserve_internal_time_seconds =
ioptions.preserve_internal_time_seconds;
// TODO(yhchiang): find some way to handle the following derived options // TODO(yhchiang): find some way to handle the following derived options
// * max_file_size // * max_file_size

@ -403,6 +403,8 @@ TEST_F(OptionsSettableTest, ColumnFamilyOptionsAllFieldsSettable) {
sizeof(ColumnFamilyOptions::TablePropertiesCollectorFactories)}, sizeof(ColumnFamilyOptions::TablePropertiesCollectorFactories)},
{offsetof(struct ColumnFamilyOptions, preclude_last_level_data_seconds), {offsetof(struct ColumnFamilyOptions, preclude_last_level_data_seconds),
sizeof(uint64_t)}, sizeof(uint64_t)},
{offsetof(struct ColumnFamilyOptions, preserve_internal_time_seconds),
sizeof(uint64_t)},
{offsetof(struct ColumnFamilyOptions, blob_cache), {offsetof(struct ColumnFamilyOptions, blob_cache),
sizeof(std::shared_ptr<Cache>)}, sizeof(std::shared_ptr<Cache>)},
{offsetof(struct ColumnFamilyOptions, comparator), sizeof(Comparator*)}, {offsetof(struct ColumnFamilyOptions, comparator), sizeof(Comparator*)},
@ -532,6 +534,7 @@ TEST_F(OptionsSettableTest, ColumnFamilyOptionsAllFieldsSettable) {
"bottommost_temperature=kWarm;" "bottommost_temperature=kWarm;"
"last_level_temperature=kWarm;" "last_level_temperature=kWarm;"
"preclude_last_level_data_seconds=86400;" "preclude_last_level_data_seconds=86400;"
"preserve_internal_time_seconds=86400;"
"compaction_options_fifo={max_table_files_size=3;allow_" "compaction_options_fifo={max_table_files_size=3;allow_"
"compaction=false;age_for_warm=1;};" "compaction=false;age_for_warm=1;};"
"blob_cache=1M;" "blob_cache=1M;"

Loading…
Cancel
Save