From e3a96c48230658270c95812aea9ab9878d9cb61f Mon Sep 17 00:00:00 2001 From: Baptiste Lemaire Date: Tue, 10 Aug 2021 18:07:48 -0700 Subject: [PATCH] Memtable sampling for mempurge heuristic. (#8628) Summary: Changes the API of the MemPurge process: the `bool experimental_allow_mempurge` and `experimental_mempurge_policy` flags have been replaced by a `double experimental_mempurge_threshold` option. This change of API reflects another major change introduced in this PR: the MemPurgeDecider() function now works by sampling the memtables being flushed to estimate the overall amount of useful payload (payload minus the garbage), and then compare this useful payload estimate with the `double experimental_mempurge_threshold` value. Therefore, when the value of this flag is `0.0` (default value), mempurge is simply deactivated. On the other hand, a value of `DBL_MAX` would be equivalent to always going through a mempurge regardless of the garbage ratio estimate. At the moment, a `double experimental_mempurge_threshold` value else than 0.0 or `DBL_MAX` is opnly supported`with the `SkipList` memtable representation. Regarding the sampling, this PR includes the introduction of a `MemTable::UniqueRandomSample` function that collects (approximately) random entries from the memtable by using the new `SkipList::Iterator::RandomSeek()` under the hood, or by iterating through each memtable entry, depending on the target sample size and the total number of entries. The unit tests have been readapted to support this new API. Pull Request resolved: https://github.com/facebook/rocksdb/pull/8628 Reviewed By: pdillinger Differential Revision: D30149315 Pulled By: bjlemaire fbshipit-source-id: 1feef5390c95db6f4480ab4434716533d3947f27 --- db/c.cc | 6 +- db/db_flush_test.cc | 20 ++-- db/db_impl/db_impl.cc | 2 +- db/db_impl/db_impl_compaction_flush.cc | 4 +- db/flush_job.cc | 137 ++++++++++++++++++++++--- db/flush_job.h | 6 +- db/memtable.h | 21 ++++ db/memtable_list.h | 6 +- db_stress_tool/db_stress_common.h | 15 +-- db_stress_tool/db_stress_gflags.cc | 8 +- db_stress_tool/db_stress_test_base.cc | 5 +- include/rocksdb/memtablerep.h | 15 +++ include/rocksdb/options.h | 29 +++--- memtable/inlineskiplist.h | 53 ++++++++++ memtable/skiplistrep.cc | 64 ++++++++++++ options/db_options.cc | 27 ++--- options/db_options.h | 3 +- options/options_test.cc | 8 +- tools/db_bench_tool.cc | 26 +---- tools/db_crashtest.py | 3 +- 20 files changed, 338 insertions(+), 120 deletions(-) diff --git a/db/c.cc b/db/c.cc index 8048cb7c9..a3883c9bf 100644 --- a/db/c.cc +++ b/db/c.cc @@ -3029,9 +3029,9 @@ unsigned char rocksdb_options_get_advise_random_on_open( return opt->rep.advise_random_on_open; } -void rocksdb_options_set_experimental_allow_mempurge(rocksdb_options_t* opt, - unsigned char v) { - opt->rep.experimental_allow_mempurge = v; +void rocksdb_options_set_experimental_mempurge_threshold(rocksdb_options_t* opt, + double v) { + opt->rep.experimental_mempurge_threshold = v; } void rocksdb_options_set_access_hint_on_compaction_start( diff --git a/db/db_flush_test.cc b/db/db_flush_test.cc index b3e435472..824dc9e55 100644 --- a/db/db_flush_test.cc +++ b/db/db_flush_test.cc @@ -8,6 +8,7 @@ // found in the LICENSE file. See the AUTHORS file for names of contributors. #include +#include #include "db/db_impl/db_impl.h" #include "db/db_test_util.h" @@ -694,8 +695,8 @@ TEST_F(DBFlushTest, MemPurgeBasic) { // Enforce size of a single MemTable to 64MB (64MB = 67108864 bytes). options.write_buffer_size = 1 << 20; // Activate the MemPurge prototype. - options.experimental_allow_mempurge = true; - options.experimental_mempurge_policy = MemPurgePolicy::kAlways; + options.experimental_mempurge_threshold = + 1.0; // std::numeric_limits::max(); ASSERT_OK(TryReopen(options)); uint32_t mempurge_count = 0; uint32_t sst_count = 0; @@ -842,8 +843,8 @@ TEST_F(DBFlushTest, MemPurgeDeleteAndDeleteRange) { // Enforce size of a single MemTable to 64MB (64MB = 67108864 bytes). options.write_buffer_size = 1 << 20; // Activate the MemPurge prototype. - options.experimental_allow_mempurge = true; - options.experimental_mempurge_policy = MemPurgePolicy::kAlways; + options.experimental_mempurge_threshold = + 1.0; // std::numeric_limits::max(); ASSERT_OK(TryReopen(options)); uint32_t mempurge_count = 0; @@ -1046,8 +1047,8 @@ TEST_F(DBFlushTest, MemPurgeAndCompactionFilter) { // Enforce size of a single MemTable to 64MB (64MB = 67108864 bytes). options.write_buffer_size = 1 << 20; // Activate the MemPurge prototype. - options.experimental_allow_mempurge = true; - options.experimental_mempurge_policy = MemPurgePolicy::kAlways; + options.experimental_mempurge_threshold = + 1.0; // std::numeric_limits::max(); ASSERT_OK(TryReopen(options)); uint32_t mempurge_count = 0; @@ -1122,8 +1123,8 @@ TEST_F(DBFlushTest, MemPurgeWALSupport) { // Enforce size of a single MemTable to 128KB. options.write_buffer_size = 128 << 10; // Activate the MemPurge prototype. - options.experimental_allow_mempurge = true; - options.experimental_mempurge_policy = MemPurgePolicy::kAlways; + options.experimental_mempurge_threshold = + 1.0; // std::numeric_limits::max(); ASSERT_OK(TryReopen(options)); const size_t KVSIZE = 10; @@ -1239,7 +1240,8 @@ TEST_F(DBFlushTest, MemPurgeWALSupport) { const uint32_t EXPECTED_SST_COUNT = 0; EXPECT_GE(mempurge_count, EXPECTED_MIN_MEMPURGE_COUNT); - if (options.experimental_mempurge_policy == MemPurgePolicy::kAlways) { + if (options.experimental_mempurge_threshold == + std::numeric_limits::max()) { EXPECT_EQ(sst_count, EXPECTED_SST_COUNT); } diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index e3ca3f52d..e46092ba4 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -558,7 +558,7 @@ Status DBImpl::CloseHelper() { // flushing (but need to implement something // else than imm()->IsFlushPending() because the output // memtables added to imm() dont trigger flushes). - if (immutable_db_options_.experimental_allow_mempurge) { + if (immutable_db_options_.experimental_mempurge_threshold > 0.0) { Status flush_ret; mutex_.Unlock(); for (ColumnFamilyData* cf : *versions_->GetColumnFamilySet()) { diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index 6f5e22258..7ec42c1fd 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -2410,7 +2410,7 @@ void DBImpl::SchedulePendingFlush(const FlushRequest& flush_req, // future changes. Therefore, we add the following if // statement - note that calling it twice (or more) // doesn't break anything. - if (immutable_db_options_.experimental_allow_mempurge) { + if (immutable_db_options_.experimental_mempurge_threshold > 0.0) { // If imm() contains silent memtables, // requesting a flush will mark the imm_needed as true. cfd->imm()->FlushRequested(); @@ -2556,7 +2556,7 @@ Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context, for (const auto& iter : flush_req) { ColumnFamilyData* cfd = iter.first; - if (immutable_db_options_.experimental_allow_mempurge) { + if (immutable_db_options_.experimental_mempurge_threshold > 0.0) { // If imm() contains silent memtables, // requesting a flush will mark the imm_needed as true. cfd->imm()->FlushRequested(); diff --git a/db/flush_job.cc b/db/flush_job.cc index 3a258a57e..bc4824af6 100644 --- a/db/flush_job.cc +++ b/db/flush_job.cc @@ -195,7 +195,7 @@ void FlushJob::PickMemTable() { // If mempurge feature is activated, keep track of any potential // memtables coming from a previous mempurge operation. // Used for mempurge policy. - if (db_options_.experimental_allow_mempurge) { + if (db_options_.experimental_mempurge_threshold > 0.0) { contains_mempurge_outcome_ = false; for (MemTable* mt : mems_) { if (cfd_->imm()->IsMemPurgeOutput(mt->GetID())) { @@ -241,7 +241,7 @@ Status FlushJob::Run(LogsWithPrepTracker* prep_tracker, prev_cpu_read_nanos = IOSTATS(cpu_read_nanos); } Status mempurge_s = Status::NotFound("No MemPurge."); - if (db_options_.experimental_allow_mempurge && + if ((db_options_.experimental_mempurge_threshold > 0.0) && (cfd_->GetFlushReason() == FlushReason::kWriteBufferFull) && (!mems_.empty()) && MemPurgeDecider()) { mempurge_s = MemPurge(); @@ -580,8 +580,6 @@ Status FlushJob::MemPurge() { // This addition will not trigger another flush, because // we do not call SchedulePendingFlush(). cfd_->imm()->Add(new_mem, &job_context_->memtables_to_free); - new_mem_capacity = (new_mem->ApproximateMemoryUsage()) * 1.0 / - mutable_cf_options_.write_buffer_size; new_mem->Ref(); db_mutex_->Unlock(); } else { @@ -622,16 +620,129 @@ Status FlushJob::MemPurge() { } bool FlushJob::MemPurgeDecider() { - MemPurgePolicy policy = db_options_.experimental_mempurge_policy; - if (policy == MemPurgePolicy::kAlways) { + double threshold = db_options_.experimental_mempurge_threshold; + // Never trigger mempurge if threshold is not a strictly positive value. + if (!(threshold > 0.0)) { + return false; + } + if (threshold > (1.0 * mems_.size())) { return true; - } else if (policy == MemPurgePolicy::kAlternate) { - // Note: if at least one of the flushed memtables is - // an output of a previous mempurge process, then flush - // to storage. - return !(contains_mempurge_outcome_); } - return false; + // Payload and useful_payload (in bytes). + // The useful payload ratio of a given MemTable + // is estimated to be useful_payload/payload. + uint64_t payload = 0, useful_payload = 0; + // If estimated_useful_payload is > threshold, + // then flush to storage, else MemPurge. + double estimated_useful_payload = 0.0; + // Cochran formula for determining sample size. + // 95% confidence interval, 7% precision. + // n0 = (1.96*1.96)*0.25/(0.07*0.07) = 196.0 + double n0 = 196.0; + ReadOptions ro; + ro.total_order_seek = true; + + // Iterate over each memtable of the set. + for (MemTable* mt : mems_) { + // If the memtable is the output of a previous mempurge, + // its approximate useful payload ratio is already calculated. + if (cfd_->imm()->IsMemPurgeOutput(mt->GetID())) { + // We make the assumption that this memtable is already + // free of garbage (garbage underestimation). + estimated_useful_payload += mt->ApproximateMemoryUsage(); + } else { + // Else sample from the table. + uint64_t nentries = mt->num_entries(); + // Corrected Cochran formula for small populations + // (converges to n0 for large populations). + uint64_t target_sample_size = + static_cast(ceil(n0 / (1.0 + (n0 / nentries)))); + std::unordered_set sentries = {}; + // Populate sample entries set. + mt->UniqueRandomSample(target_sample_size, &sentries); + + // Estimate the garbage ratio by comparing if + // each sample corresponds to a valid entry. + for (const char* ss : sentries) { + ParsedInternalKey res; + Slice entry_slice = GetLengthPrefixedSlice(ss); + Status parse_s = + ParseInternalKey(entry_slice, &res, true /*log_err_key*/); + if (!parse_s.ok()) { + ROCKS_LOG_WARN(db_options_.info_log, + "Memtable Decider: ParseInternalKey did not parse " + "entry_slice %s" + "successfully.", + entry_slice.data()); + } + LookupKey lkey(res.user_key, kMaxSequenceNumber); + std::string vget; + Status s; + MergeContext merge_context; + SequenceNumber max_covering_tombstone_seq = 0, sqno = 0; + + // Pick the oldest existing snapshot that is more recent + // than the sequence number of the sampled entry. + SequenceNumber min_seqno_snapshot = kMaxSequenceNumber; + SnapshotImpl min_snapshot; + for (SequenceNumber seq_num : existing_snapshots_) { + if (seq_num > res.sequence && seq_num < min_seqno_snapshot) { + min_seqno_snapshot = seq_num; + } + } + min_snapshot.number_ = min_seqno_snapshot; + ro.snapshot = + min_seqno_snapshot < kMaxSequenceNumber ? &min_snapshot : nullptr; + + // Estimate if the sample entry is valid or not. + bool gres = mt->Get(lkey, &vget, nullptr, &s, &merge_context, + &max_covering_tombstone_seq, &sqno, ro); + if (!gres) { + ROCKS_LOG_WARN( + db_options_.info_log, + "Memtable Get returned false when Get(sampled entry). " + "Yet each sample entry should exist somewhere in the memtable, " + "unrelated to whether it has been deleted or not."); + } + payload += entry_slice.size(); + + // TODO(bjlemaire): evaluate typeMerge. + // This is where the sampled entry is estimated to be + // garbage or not. Note that this is a garbage *estimation* + // because we do not include certain items such as + // CompactionFitlers triggered at flush, or if the same delete + // has been inserted twice or more in the memtable. + if (res.type == kTypeValue && gres && s.ok() && sqno == res.sequence) { + useful_payload += entry_slice.size(); + } else if (((res.type == kTypeDeletion) || + (res.type == kTypeSingleDeletion)) && + s.IsNotFound() && gres) { + useful_payload += entry_slice.size(); + } + } + if (payload > 0) { + // We used the estimated useful payload ratio + // to evaluate how much of the total memtable is useful bytes. + estimated_useful_payload += + (mt->ApproximateMemoryUsage()) * (useful_payload * 1.0 / payload); + ROCKS_LOG_INFO( + db_options_.info_log, + "Mempurge sampling - found garbage ratio from sampling: %f.\n", + (payload - useful_payload) * 1.0 / payload); + } else { + ROCKS_LOG_WARN( + db_options_.info_log, + "Mempurge kSampling policy: null payload measured, and collected " + "sample size is %zu\n.", + sentries.size()); + } + } + } + // We convert the total number of useful paylaod bytes + // into the proportion of memtable necessary to store all these bytes. + // We compare this proportion with the threshold value. + return (estimated_useful_payload / mutable_cf_options_.write_buffer_size) < + threshold; } Status FlushJob::WriteLevel0Table() { @@ -843,7 +954,7 @@ Status FlushJob::WriteLevel0Table() { stats.num_output_files_blob = static_cast(blobs.size()); - if (db_options_.experimental_allow_mempurge && s.ok()) { + if ((db_options_.experimental_mempurge_threshold > 0.0) && s.ok()) { // The db_mutex is held at this point. for (MemTable* mt : mems_) { // Note: if m is not a previous mempurge output memtable, diff --git a/db/flush_job.h b/db/flush_job.h index 694dd71d2..9050657de 100644 --- a/db/flush_job.h +++ b/db/flush_job.h @@ -117,9 +117,9 @@ class FlushJob { // of development. At the moment it is only compatible with the Get, Put, // Delete operations as well as Iterators and CompactionFilters. // For this early version, "MemPurge" is called by setting the - // options.experimental_allow_mempurge flag as "true". When this is + // options.experimental_mempurge_threshold value as >0.0. When this is // the case, ALL automatic flush operations (kWRiteBufferManagerFull) will - // first go through the MemPurge process. herefore, we strongly + // first go through the MemPurge process. Therefore, we strongly // recommend all users not to set this flag as true given that the MemPurge // process has not matured yet. Status MemPurge(); @@ -192,7 +192,7 @@ class FlushJob { const std::string full_history_ts_low_; BlobFileCompletionCallback* blob_callback_; - // Used when experimental_allow_mempurge set to true. + // Used when experimental_mempurge_threshold > 0.0. bool contains_mempurge_outcome_; }; diff --git a/db/memtable.h b/db/memtable.h index 93060941a..e6580379f 100644 --- a/db/memtable.h +++ b/db/memtable.h @@ -14,6 +14,7 @@ #include #include #include +#include #include #include "db/dbformat.h" @@ -145,6 +146,26 @@ class MemTable { return approximate_memory_usage_.load(std::memory_order_relaxed); } + // Returns a vector of unique random memtable entries of size 'sample_size'. + // + // Note: the entries are stored in the unordered_set as length-prefixed keys, + // hence their representation in the set as "const char*". + // Note2: the size of the output set 'entries' is not enforced to be strictly + // equal to 'target_sample_size'. Its final size might be slightly + // greater or slightly less than 'target_sample_size' + // + // REQUIRES: external synchronization to prevent simultaneous + // operations on the same MemTable (unless this Memtable is immutable). + // REQUIRES: SkipList memtable representation. This function is not + // implemented for any other type of memtable representation (vectorrep, + // hashskiplist,...). + void UniqueRandomSample(const uint64_t& target_sample_size, + std::unordered_set* entries) { + // TODO(bjlemaire): at the moment, only supported by skiplistrep. + // Extend it to all other memtable representations. + table_->UniqueRandomSample(num_entries(), target_sample_size, entries); + } + // This method heuristically determines if the memtable should continue to // host more data. bool ShouldScheduleFlush() const { diff --git a/db/memtable_list.h b/db/memtable_list.h index 17a7aa87f..6dde85016 100644 --- a/db/memtable_list.h +++ b/db/memtable_list.h @@ -390,11 +390,7 @@ class MemTableList { // not freed, but put into a vector for future deref and reclamation. void RemoveOldMemTables(uint64_t log_number, autovector* to_delete); - void AddMemPurgeOutputID(uint64_t mid) { - if (mempurged_ids_.find(mid) == mempurged_ids_.end()) { - mempurged_ids_.insert(mid); - } - } + void AddMemPurgeOutputID(uint64_t mid) { mempurged_ids_.insert(mid); } void RemoveMemPurgeOutputID(uint64_t mid) { if (mempurged_ids_.find(mid) != mempurged_ids_.end()) { diff --git a/db_stress_tool/db_stress_common.h b/db_stress_tool/db_stress_common.h index ec3aa212e..5db089b16 100644 --- a/db_stress_tool/db_stress_common.h +++ b/db_stress_tool/db_stress_common.h @@ -141,8 +141,7 @@ DECLARE_uint64(subcompactions); DECLARE_uint64(periodic_compaction_seconds); DECLARE_uint64(compaction_ttl); DECLARE_bool(allow_concurrent_memtable_write); -DECLARE_bool(experimental_allow_mempurge); -DECLARE_string(experimental_mempurge_policy); +DECLARE_double(experimental_mempurge_threshold); DECLARE_bool(enable_write_thread_adaptive_yield); DECLARE_int32(reopen); DECLARE_double(bloom_bits); @@ -341,18 +340,6 @@ inline enum ROCKSDB_NAMESPACE::CompressionType StringToCompressionType( return ret_compression_type; } -inline enum ROCKSDB_NAMESPACE::MemPurgePolicy StringToMemPurgePolicy( - const char* mpolicy) { - assert(mpolicy); - if (!strcasecmp(mpolicy, "kAlways")) { - return ROCKSDB_NAMESPACE::MemPurgePolicy::kAlways; - } else if (!strcasecmp(mpolicy, "kAlternate")) { - return ROCKSDB_NAMESPACE::MemPurgePolicy::kAlternate; - } - fprintf(stderr, "Cannot parse mempurge policy: '%s'\n", mpolicy); - return ROCKSDB_NAMESPACE::MemPurgePolicy::kAlternate; -} - inline enum ROCKSDB_NAMESPACE::ChecksumType StringToChecksumType( const char* ctype) { assert(ctype); diff --git a/db_stress_tool/db_stress_gflags.cc b/db_stress_tool/db_stress_gflags.cc index f1589d78c..adb44084a 100644 --- a/db_stress_tool/db_stress_gflags.cc +++ b/db_stress_tool/db_stress_gflags.cc @@ -326,11 +326,9 @@ DEFINE_uint64(compaction_ttl, 1000, DEFINE_bool(allow_concurrent_memtable_write, false, "Allow multi-writers to update mem tables in parallel."); -DEFINE_bool(experimental_allow_mempurge, false, - "Allow mempurge process to collect memtable garbage bytes."); - -DEFINE_string(experimental_mempurge_policy, "kAlternate", - "Set mempurge (MemTable Garbage Collection) policy."); +DEFINE_double(experimental_mempurge_threshold, 0.0, + "Maximum estimated useful payload that triggers a " + "mempurge process to collect memtable garbage bytes."); DEFINE_bool(enable_write_thread_adaptive_yield, true, "Use a yielding spin loop for brief writer thread waits."); diff --git a/db_stress_tool/db_stress_test_base.cc b/db_stress_tool/db_stress_test_base.cc index df4252398..9d9320dde 100644 --- a/db_stress_tool/db_stress_test_base.cc +++ b/db_stress_tool/db_stress_test_base.cc @@ -2266,9 +2266,8 @@ void StressTest::Open() { options_.max_subcompactions = static_cast(FLAGS_subcompactions); options_.allow_concurrent_memtable_write = FLAGS_allow_concurrent_memtable_write; - options_.experimental_allow_mempurge = FLAGS_experimental_allow_mempurge; - options_.experimental_mempurge_policy = - StringToMemPurgePolicy(FLAGS_experimental_mempurge_policy.c_str()); + options_.experimental_mempurge_threshold = + FLAGS_experimental_mempurge_threshold; options_.periodic_compaction_seconds = FLAGS_periodic_compaction_seconds; options_.ttl = FLAGS_compaction_ttl; options_.enable_pipelined_write = FLAGS_enable_pipelined_write; diff --git a/include/rocksdb/memtablerep.h b/include/rocksdb/memtablerep.h index b8701135d..6974831fd 100644 --- a/include/rocksdb/memtablerep.h +++ b/include/rocksdb/memtablerep.h @@ -38,8 +38,10 @@ #include #include #include + #include #include +#include namespace ROCKSDB_NAMESPACE { @@ -194,6 +196,17 @@ class MemTableRep { return 0; } + // Returns a vector of unique random memtable entries of approximate + // size 'target_sample_size' (this size is not strictly enforced). + virtual void UniqueRandomSample(const uint64_t& num_entries, + const uint64_t& target_sample_size, + std::unordered_set* entries) { + (void)num_entries; + (void)target_sample_size; + (void)entries; + assert(false); + } + // Report an approximation of how much memory has been used other than memory // that was allocated through the allocator. Safe to call from any thread. virtual size_t ApproximateMemoryUsage() = 0; @@ -230,6 +243,8 @@ class MemTableRep { virtual void SeekForPrev(const Slice& internal_key, const char* memtable_key) = 0; + virtual void RandomSeek() {} + // Position at the first entry in collection. // Final state of iterator is Valid() iff collection is not empty. virtual void SeekToFirst() = 0; diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index d7ecb5b3d..95a9ff46f 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -369,11 +369,6 @@ struct DbPath { extern const char* kHostnameForDbHostId; -enum class MemPurgePolicy : char { - kAlternate = 0x00, - kAlways = 0x01, -}; - enum class CompactionServiceJobStatus : char { kSuccess, kFailure, @@ -787,14 +782,22 @@ struct DBOptions { // Default: true bool advise_random_on_open = true; - // If true, allows for memtable purge instead of flush to storage. - // (experimental). - bool experimental_allow_mempurge = false; - // If experimental_allow_mempurge is true, will dictate MemPurge - // policy. - // Default: kAlternate - // (experimental). - MemPurgePolicy experimental_mempurge_policy = MemPurgePolicy::kAlternate; + // [experimental] + // Used to activate or deactive the Mempurge feature (memtable garbage + // collection). (deactivated by default). At every flush, the total useful + // payload (total entries minus garbage entries) is estimated as a ratio + // [useful payload bytes]/[size of a memtable (in bytes)]. This ratio is then + // compared to this `threshold` value: + // - if ratio1.0 : aggressive mempurge. + // 0 < threshold < 1.0: mempurge triggered only for very low useful payload + // ratios. + // [experimental] + double experimental_mempurge_threshold = 0.0; // Amount of data to build up in memtables across all column // families before writing to disk. diff --git a/memtable/inlineskiplist.h b/memtable/inlineskiplist.h index 1782288f0..028fde3a2 100644 --- a/memtable/inlineskiplist.h +++ b/memtable/inlineskiplist.h @@ -177,6 +177,9 @@ class InlineSkipList { // Retreat to the last entry with a key <= target void SeekForPrev(const char* target); + // Advance to a random entry in the list. + void RandomSeek(); + // Position at the first entry in list. // Final state of iterator is Valid() iff list is not empty. void SeekToFirst(); @@ -252,6 +255,9 @@ class InlineSkipList { // Return head_ if list is empty. Node* FindLast() const; + // Returns a random entry. + Node* FindRandomEntry() const; + // Traverses a single level of the list, setting *out_prev to the last // node before the key and *out_next to the first node after. Assumes // that the key is not present in the skip list. On entry, before should @@ -412,6 +418,11 @@ inline void InlineSkipList::Iterator::SeekForPrev( } } +template +inline void InlineSkipList::Iterator::RandomSeek() { + node_ = list_->FindRandomEntry(); +} + template inline void InlineSkipList::Iterator::SeekToFirst() { node_ = list_->head_->Next(0); @@ -558,6 +569,48 @@ InlineSkipList::FindLast() const { } } +template +typename InlineSkipList::Node* +InlineSkipList::FindRandomEntry() const { + // TODO(bjlemaire): consider adding PREFETCH calls. + Node *x = head_, *scan_node = nullptr, *limit_node = nullptr; + + // We start at the max level. + // FOr each level, we look at all the nodes at the level, and + // we randomly pick one of them. Then decrement the level + // and reiterate the process. + // eg: assume GetMaxHeight()=5, and there are #100 elements (nodes). + // level 4 nodes: lvl_nodes={#1, #15, #67, #84}. Randomly pick #15. + // We will consider all the nodes between #15 (inclusive) and #67 + // (exclusive). #67 is called 'limit_node' here. + // level 3 nodes: lvl_nodes={#15, #21, #45, #51}. Randomly choose + // #51. #67 remains 'limit_node'. + // [...] + // level 0 nodes: lvl_nodes={#56,#57,#58,#59}. Randomly pick $57. + // Return Node #57. + std::vector lvl_nodes; + Random* rnd = Random::GetTLSInstance(); + int level = GetMaxHeight() - 1; + + while (level >= 0) { + lvl_nodes.clear(); + scan_node = x; + while (scan_node != limit_node) { + lvl_nodes.push_back(scan_node); + scan_node = scan_node->Next(level); + } + uint32_t rnd_idx = rnd->Next() % lvl_nodes.size(); + x = lvl_nodes[rnd_idx]; + if (rnd_idx + 1 < lvl_nodes.size()) { + limit_node = lvl_nodes[rnd_idx + 1]; + } + level--; + } + // There is a special case where x could still be the head_ + // (note that the head_ contains no key). + return x == head_ ? head_->Next(0) : x; +} + template uint64_t InlineSkipList::EstimateCount(const char* key) const { uint64_t count = 0; diff --git a/memtable/skiplistrep.cc b/memtable/skiplistrep.cc index eec15626c..abe7144ab 100644 --- a/memtable/skiplistrep.cc +++ b/memtable/skiplistrep.cc @@ -3,6 +3,8 @@ // COPYING file in the root directory) and Apache 2.0 License // (found in the LICENSE.Apache file in the root directory). // +#include + #include "db/memtable.h" #include "memory/arena.h" #include "memtable/inlineskiplist.h" @@ -95,6 +97,66 @@ public: return (end_count >= start_count) ? (end_count - start_count) : 0; } + void UniqueRandomSample(const uint64_t& num_entries, + const uint64_t& target_sample_size, + std::unordered_set* entries) override { + entries->clear(); + // Avoid divide-by-0. + assert(target_sample_size > 0); + assert(num_entries > 0); + // NOTE: the size of entries is not enforced to be exactly + // target_sample_size at the end of this function, it might be slightly + // greater or smaller. + SkipListRep::Iterator iter(&skip_list_); + // There are two methods to create the subset of samples (size m) + // from the table containing N elements: + // 1-Iterate linearly through the N memtable entries. For each entry i, + // add it to the sample set with a probability + // (target_sample_size - entries.size() ) / (N-i). + // + // 2-Pick m random elements without repetition. + // We pick Option 2 when m sqrt(N). + if (target_sample_size > + static_cast(std::sqrt(1.0 * num_entries))) { + Random* rnd = Random::GetTLSInstance(); + iter.SeekToFirst(); + uint64_t counter = 0, num_samples_left = target_sample_size; + for (; iter.Valid() && (num_samples_left > 0); iter.Next(), counter++) { + // Add entry to sample set with probability + // num_samples_left/(num_entries - counter). + if (rnd->Next() % (num_entries - counter) < num_samples_left) { + entries->insert(iter.key()); + num_samples_left--; + } + } + } else { + // Option 2: pick m random elements with no duplicates. + // If Option 2 is picked, then target_sample_size99.9% for N>4. + // At worst, for the final pick , when m=sqrt(N) there is + // a probability of p= 1/sqrt(N) chances to find a duplicate. + for (uint64_t j = 0; j < 5; j++) { + iter.RandomSeek(); + // unordered_set::insert returns pair. + // The second element is true if an insert successfully happened. + // If element is already in the set, this bool will be false, and + // true otherwise. + if ((entries->insert(iter.key())).second) { + break; + } + } + } + } + } + ~SkipListRep() override {} // Iteration over the contents of a skip list @@ -143,6 +205,8 @@ public: } } + void RandomSeek() override { iter_.RandomSeek(); } + // Position at the first entry in list. // Final state of iterator is Valid() iff list is not empty. void SeekToFirst() override { iter_.SeekToFirst(); } diff --git a/options/db_options.cc b/options/db_options.cc index 743a0a9e1..3e2c8bf13 100644 --- a/options/db_options.cc +++ b/options/db_options.cc @@ -48,11 +48,6 @@ static std::unordered_map info_log_level_string_map = {"FATAL_LEVEL", InfoLogLevel::FATAL_LEVEL}, {"HEADER_LEVEL", InfoLogLevel::HEADER_LEVEL}}; -static std::unordered_map - experimental_mempurge_policy_string_map = { - {"kAlternate", MemPurgePolicy::kAlternate}, - {"kAlways", MemPurgePolicy::kAlways}}; - static std::unordered_map db_mutable_options_type_info = { {"allow_os_buffer", @@ -197,14 +192,10 @@ static std::unordered_map {offsetof(struct ImmutableDBOptions, error_if_exists), OptionType::kBoolean, OptionVerificationType::kNormal, OptionTypeFlags::kNone}}, - {"experimental_allow_mempurge", - {offsetof(struct ImmutableDBOptions, experimental_allow_mempurge), - OptionType::kBoolean, OptionVerificationType::kNormal, + {"experimental_mempurge_threshold", + {offsetof(struct ImmutableDBOptions, experimental_mempurge_threshold), + OptionType::kDouble, OptionVerificationType::kNormal, OptionTypeFlags::kNone}}, - {"experimental_mempurge_policy", - OptionTypeInfo::Enum( - offsetof(struct ImmutableDBOptions, experimental_mempurge_policy), - &experimental_mempurge_policy_string_map)}, {"is_fd_close_on_exec", {offsetof(struct ImmutableDBOptions, is_fd_close_on_exec), OptionType::kBoolean, OptionVerificationType::kNormal, @@ -615,8 +606,7 @@ ImmutableDBOptions::ImmutableDBOptions(const DBOptions& options) allow_fallocate(options.allow_fallocate), is_fd_close_on_exec(options.is_fd_close_on_exec), advise_random_on_open(options.advise_random_on_open), - experimental_allow_mempurge(options.experimental_allow_mempurge), - experimental_mempurge_policy(options.experimental_mempurge_policy), + experimental_mempurge_threshold(options.experimental_mempurge_threshold), db_write_buffer_size(options.db_write_buffer_size), write_buffer_manager(options.write_buffer_manager), access_hint_on_compaction_start(options.access_hint_on_compaction_start), @@ -750,12 +740,9 @@ void ImmutableDBOptions::Dump(Logger* log) const { is_fd_close_on_exec); ROCKS_LOG_HEADER(log, " Options.advise_random_on_open: %d", advise_random_on_open); - ROCKS_LOG_HEADER(log, - " Options.experimental_allow_mempurge: %d", - experimental_allow_mempurge); - ROCKS_LOG_HEADER(log, - " Options.experimental_mempurge_policy: %d", - static_cast(experimental_mempurge_policy)); + ROCKS_LOG_HEADER( + log, " Options.experimental_mempurge_threshold: %f", + experimental_mempurge_threshold); ROCKS_LOG_HEADER( log, " Options.db_write_buffer_size: %" ROCKSDB_PRIszt, db_write_buffer_size); diff --git a/options/db_options.h b/options/db_options.h index 50ec521f0..d2b056802 100644 --- a/options/db_options.h +++ b/options/db_options.h @@ -57,8 +57,7 @@ struct ImmutableDBOptions { bool allow_fallocate; bool is_fd_close_on_exec; bool advise_random_on_open; - bool experimental_allow_mempurge; - MemPurgePolicy experimental_mempurge_policy; + double experimental_mempurge_threshold; size_t db_write_buffer_size; std::shared_ptr write_buffer_manager; DBOptions::AccessHint access_hint_on_compaction_start; diff --git a/options/options_test.cc b/options/options_test.cc index 3905a9577..73cbedd6b 100644 --- a/options/options_test.cc +++ b/options/options_test.cc @@ -143,7 +143,7 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) { {"persist_stats_to_disk", "false"}, {"stats_history_buffer_size", "69"}, {"advise_random_on_open", "true"}, - {"experimental_allow_mempurge", "false"}, + {"experimental_mempurge_threshold", "0.0"}, {"use_adaptive_mutex", "false"}, {"new_table_reader_for_compaction_inputs", "true"}, {"compaction_readahead_size", "100"}, @@ -302,7 +302,7 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) { ASSERT_EQ(new_db_opt.persist_stats_to_disk, false); ASSERT_EQ(new_db_opt.stats_history_buffer_size, 69U); ASSERT_EQ(new_db_opt.advise_random_on_open, true); - ASSERT_EQ(new_db_opt.experimental_allow_mempurge, false); + ASSERT_EQ(new_db_opt.experimental_mempurge_threshold, 0.0); ASSERT_EQ(new_db_opt.use_adaptive_mutex, false); ASSERT_EQ(new_db_opt.new_table_reader_for_compaction_inputs, true); ASSERT_EQ(new_db_opt.compaction_readahead_size, 100); @@ -2047,7 +2047,7 @@ TEST_F(OptionsOldApiTest, GetOptionsFromMapTest) { {"persist_stats_to_disk", "false"}, {"stats_history_buffer_size", "69"}, {"advise_random_on_open", "true"}, - {"experimental_allow_mempurge", "false"}, + {"experimental_mempurge_threshold", "0.0"}, {"use_adaptive_mutex", "false"}, {"new_table_reader_for_compaction_inputs", "true"}, {"compaction_readahead_size", "100"}, @@ -2200,7 +2200,7 @@ TEST_F(OptionsOldApiTest, GetOptionsFromMapTest) { ASSERT_EQ(new_db_opt.persist_stats_to_disk, false); ASSERT_EQ(new_db_opt.stats_history_buffer_size, 69U); ASSERT_EQ(new_db_opt.advise_random_on_open, true); - ASSERT_EQ(new_db_opt.experimental_allow_mempurge, false); + ASSERT_EQ(new_db_opt.experimental_mempurge_threshold, 0.0); ASSERT_EQ(new_db_opt.use_adaptive_mutex, false); ASSERT_EQ(new_db_opt.new_table_reader_for_compaction_inputs, true); ASSERT_EQ(new_db_opt.compaction_readahead_size, 100); diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc index 17be134cb..c7c254735 100644 --- a/tools/db_bench_tool.cc +++ b/tools/db_bench_tool.cc @@ -1039,19 +1039,6 @@ static enum ROCKSDB_NAMESPACE::CompressionType StringToCompressionType( return ROCKSDB_NAMESPACE::kSnappyCompression; // default value } -static enum ROCKSDB_NAMESPACE::MemPurgePolicy StringToMemPurgePolicy( - const char* mpolicy) { - assert(mpolicy); - if (!strcasecmp(mpolicy, "kAlways")) { - return ROCKSDB_NAMESPACE::MemPurgePolicy::kAlways; - } else if (!strcasecmp(mpolicy, "kAlternate")) { - return ROCKSDB_NAMESPACE::MemPurgePolicy::kAlternate; - } - - fprintf(stdout, "Cannot parse mempurge policy '%s'\n", mpolicy); - return ROCKSDB_NAMESPACE::MemPurgePolicy::kAlternate; -} - static std::string ColumnFamilyName(size_t i) { if (i == 0) { return ROCKSDB_NAMESPACE::kDefaultColumnFamilyName; @@ -1186,11 +1173,9 @@ DEFINE_bool( DEFINE_bool(allow_concurrent_memtable_write, true, "Allow multi-writers to update mem tables in parallel."); -DEFINE_bool(experimental_allow_mempurge, false, - "Allow memtable garbage collection."); - -DEFINE_string(experimental_mempurge_policy, "kAlternate", - "Specify memtable garbage collection policy."); +DEFINE_double(experimental_mempurge_threshold, 0.0, + "Maximum useful payload ratio estimate that triggers a mempurge " + "(memtable garbage collection)."); DEFINE_bool(inplace_update_support, ROCKSDB_NAMESPACE::Options().inplace_update_support, @@ -4275,9 +4260,8 @@ class Benchmark { options.delayed_write_rate = FLAGS_delayed_write_rate; options.allow_concurrent_memtable_write = FLAGS_allow_concurrent_memtable_write; - options.experimental_allow_mempurge = FLAGS_experimental_allow_mempurge; - options.experimental_mempurge_policy = - StringToMemPurgePolicy(FLAGS_experimental_mempurge_policy.c_str()); + options.experimental_mempurge_threshold = + FLAGS_experimental_mempurge_threshold; options.inplace_update_support = FLAGS_inplace_update_support; options.inplace_update_num_locks = FLAGS_inplace_update_num_locks; options.enable_write_thread_adaptive_yield = diff --git a/tools/db_crashtest.py b/tools/db_crashtest.py index 3215bec9f..a133f3529 100644 --- a/tools/db_crashtest.py +++ b/tools/db_crashtest.py @@ -220,8 +220,7 @@ whitebox_default_params = { simple_default_params = { "allow_concurrent_memtable_write": lambda: random.randint(0, 1), "column_families": 1, - "experimental_allow_mempurge": lambda: random.randint(0, 1), - "experimental_mempurge_policy": lambda: random.choice(["kAlways", "kAlternate"]), + "experimental_mempurge_threshold": lambda: 10.0*random.random(), "max_background_compactions": 1, "max_bytes_for_level_base": 67108864, "memtablerep": "skip_list",