Memtable sampling for mempurge heuristic. (#8628)

Summary:
Changes the API of the MemPurge process: the `bool experimental_allow_mempurge` and `experimental_mempurge_policy` flags have been replaced by a `double experimental_mempurge_threshold` option.
This change of API reflects another major change introduced in this PR: the MemPurgeDecider() function now works by sampling the memtables being flushed to estimate the overall amount of useful payload (payload minus the garbage), and then compare this useful payload estimate with the `double experimental_mempurge_threshold` value.
Therefore, when the value of this flag is `0.0` (default value), mempurge is simply deactivated. On the other hand, a value of `DBL_MAX` would be equivalent to always going through a mempurge regardless of the garbage ratio estimate.
At the moment, a `double experimental_mempurge_threshold` value else than 0.0 or `DBL_MAX` is opnly supported`with the `SkipList` memtable representation.
Regarding the sampling, this PR includes the introduction of a `MemTable::UniqueRandomSample` function that collects (approximately) random entries from the memtable by using the new `SkipList::Iterator::RandomSeek()` under the hood, or by iterating through each memtable entry, depending on the target sample size and the total number of entries.
The unit tests have been readapted to support this new API.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8628

Reviewed By: pdillinger

Differential Revision: D30149315

Pulled By: bjlemaire

fbshipit-source-id: 1feef5390c95db6f4480ab4434716533d3947f27
main
Baptiste Lemaire 4 years ago committed by Facebook GitHub Bot
parent f63331ebaf
commit e3a96c4823
  1. 6
      db/c.cc
  2. 20
      db/db_flush_test.cc
  3. 2
      db/db_impl/db_impl.cc
  4. 4
      db/db_impl/db_impl_compaction_flush.cc
  5. 137
      db/flush_job.cc
  6. 6
      db/flush_job.h
  7. 21
      db/memtable.h
  8. 6
      db/memtable_list.h
  9. 15
      db_stress_tool/db_stress_common.h
  10. 8
      db_stress_tool/db_stress_gflags.cc
  11. 5
      db_stress_tool/db_stress_test_base.cc
  12. 15
      include/rocksdb/memtablerep.h
  13. 29
      include/rocksdb/options.h
  14. 53
      memtable/inlineskiplist.h
  15. 64
      memtable/skiplistrep.cc
  16. 27
      options/db_options.cc
  17. 3
      options/db_options.h
  18. 8
      options/options_test.cc
  19. 26
      tools/db_bench_tool.cc
  20. 3
      tools/db_crashtest.py

@ -3029,9 +3029,9 @@ unsigned char rocksdb_options_get_advise_random_on_open(
return opt->rep.advise_random_on_open;
}
void rocksdb_options_set_experimental_allow_mempurge(rocksdb_options_t* opt,
unsigned char v) {
opt->rep.experimental_allow_mempurge = v;
void rocksdb_options_set_experimental_mempurge_threshold(rocksdb_options_t* opt,
double v) {
opt->rep.experimental_mempurge_threshold = v;
}
void rocksdb_options_set_access_hint_on_compaction_start(

@ -8,6 +8,7 @@
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include <atomic>
#include <limits>
#include "db/db_impl/db_impl.h"
#include "db/db_test_util.h"
@ -694,8 +695,8 @@ TEST_F(DBFlushTest, MemPurgeBasic) {
// Enforce size of a single MemTable to 64MB (64MB = 67108864 bytes).
options.write_buffer_size = 1 << 20;
// Activate the MemPurge prototype.
options.experimental_allow_mempurge = true;
options.experimental_mempurge_policy = MemPurgePolicy::kAlways;
options.experimental_mempurge_threshold =
1.0; // std::numeric_limits<double>::max();
ASSERT_OK(TryReopen(options));
uint32_t mempurge_count = 0;
uint32_t sst_count = 0;
@ -842,8 +843,8 @@ TEST_F(DBFlushTest, MemPurgeDeleteAndDeleteRange) {
// Enforce size of a single MemTable to 64MB (64MB = 67108864 bytes).
options.write_buffer_size = 1 << 20;
// Activate the MemPurge prototype.
options.experimental_allow_mempurge = true;
options.experimental_mempurge_policy = MemPurgePolicy::kAlways;
options.experimental_mempurge_threshold =
1.0; // std::numeric_limits<double>::max();
ASSERT_OK(TryReopen(options));
uint32_t mempurge_count = 0;
@ -1046,8 +1047,8 @@ TEST_F(DBFlushTest, MemPurgeAndCompactionFilter) {
// Enforce size of a single MemTable to 64MB (64MB = 67108864 bytes).
options.write_buffer_size = 1 << 20;
// Activate the MemPurge prototype.
options.experimental_allow_mempurge = true;
options.experimental_mempurge_policy = MemPurgePolicy::kAlways;
options.experimental_mempurge_threshold =
1.0; // std::numeric_limits<double>::max();
ASSERT_OK(TryReopen(options));
uint32_t mempurge_count = 0;
@ -1122,8 +1123,8 @@ TEST_F(DBFlushTest, MemPurgeWALSupport) {
// Enforce size of a single MemTable to 128KB.
options.write_buffer_size = 128 << 10;
// Activate the MemPurge prototype.
options.experimental_allow_mempurge = true;
options.experimental_mempurge_policy = MemPurgePolicy::kAlways;
options.experimental_mempurge_threshold =
1.0; // std::numeric_limits<double>::max();
ASSERT_OK(TryReopen(options));
const size_t KVSIZE = 10;
@ -1239,7 +1240,8 @@ TEST_F(DBFlushTest, MemPurgeWALSupport) {
const uint32_t EXPECTED_SST_COUNT = 0;
EXPECT_GE(mempurge_count, EXPECTED_MIN_MEMPURGE_COUNT);
if (options.experimental_mempurge_policy == MemPurgePolicy::kAlways) {
if (options.experimental_mempurge_threshold ==
std::numeric_limits<double>::max()) {
EXPECT_EQ(sst_count, EXPECTED_SST_COUNT);
}

@ -558,7 +558,7 @@ Status DBImpl::CloseHelper() {
// flushing (but need to implement something
// else than imm()->IsFlushPending() because the output
// memtables added to imm() dont trigger flushes).
if (immutable_db_options_.experimental_allow_mempurge) {
if (immutable_db_options_.experimental_mempurge_threshold > 0.0) {
Status flush_ret;
mutex_.Unlock();
for (ColumnFamilyData* cf : *versions_->GetColumnFamilySet()) {

@ -2410,7 +2410,7 @@ void DBImpl::SchedulePendingFlush(const FlushRequest& flush_req,
// future changes. Therefore, we add the following if
// statement - note that calling it twice (or more)
// doesn't break anything.
if (immutable_db_options_.experimental_allow_mempurge) {
if (immutable_db_options_.experimental_mempurge_threshold > 0.0) {
// If imm() contains silent memtables,
// requesting a flush will mark the imm_needed as true.
cfd->imm()->FlushRequested();
@ -2556,7 +2556,7 @@ Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context,
for (const auto& iter : flush_req) {
ColumnFamilyData* cfd = iter.first;
if (immutable_db_options_.experimental_allow_mempurge) {
if (immutable_db_options_.experimental_mempurge_threshold > 0.0) {
// If imm() contains silent memtables,
// requesting a flush will mark the imm_needed as true.
cfd->imm()->FlushRequested();

@ -195,7 +195,7 @@ void FlushJob::PickMemTable() {
// If mempurge feature is activated, keep track of any potential
// memtables coming from a previous mempurge operation.
// Used for mempurge policy.
if (db_options_.experimental_allow_mempurge) {
if (db_options_.experimental_mempurge_threshold > 0.0) {
contains_mempurge_outcome_ = false;
for (MemTable* mt : mems_) {
if (cfd_->imm()->IsMemPurgeOutput(mt->GetID())) {
@ -241,7 +241,7 @@ Status FlushJob::Run(LogsWithPrepTracker* prep_tracker,
prev_cpu_read_nanos = IOSTATS(cpu_read_nanos);
}
Status mempurge_s = Status::NotFound("No MemPurge.");
if (db_options_.experimental_allow_mempurge &&
if ((db_options_.experimental_mempurge_threshold > 0.0) &&
(cfd_->GetFlushReason() == FlushReason::kWriteBufferFull) &&
(!mems_.empty()) && MemPurgeDecider()) {
mempurge_s = MemPurge();
@ -580,8 +580,6 @@ Status FlushJob::MemPurge() {
// This addition will not trigger another flush, because
// we do not call SchedulePendingFlush().
cfd_->imm()->Add(new_mem, &job_context_->memtables_to_free);
new_mem_capacity = (new_mem->ApproximateMemoryUsage()) * 1.0 /
mutable_cf_options_.write_buffer_size;
new_mem->Ref();
db_mutex_->Unlock();
} else {
@ -622,16 +620,129 @@ Status FlushJob::MemPurge() {
}
bool FlushJob::MemPurgeDecider() {
MemPurgePolicy policy = db_options_.experimental_mempurge_policy;
if (policy == MemPurgePolicy::kAlways) {
double threshold = db_options_.experimental_mempurge_threshold;
// Never trigger mempurge if threshold is not a strictly positive value.
if (!(threshold > 0.0)) {
return false;
}
if (threshold > (1.0 * mems_.size())) {
return true;
} else if (policy == MemPurgePolicy::kAlternate) {
// Note: if at least one of the flushed memtables is
// an output of a previous mempurge process, then flush
// to storage.
return !(contains_mempurge_outcome_);
}
return false;
// Payload and useful_payload (in bytes).
// The useful payload ratio of a given MemTable
// is estimated to be useful_payload/payload.
uint64_t payload = 0, useful_payload = 0;
// If estimated_useful_payload is > threshold,
// then flush to storage, else MemPurge.
double estimated_useful_payload = 0.0;
// Cochran formula for determining sample size.
// 95% confidence interval, 7% precision.
// n0 = (1.96*1.96)*0.25/(0.07*0.07) = 196.0
double n0 = 196.0;
ReadOptions ro;
ro.total_order_seek = true;
// Iterate over each memtable of the set.
for (MemTable* mt : mems_) {
// If the memtable is the output of a previous mempurge,
// its approximate useful payload ratio is already calculated.
if (cfd_->imm()->IsMemPurgeOutput(mt->GetID())) {
// We make the assumption that this memtable is already
// free of garbage (garbage underestimation).
estimated_useful_payload += mt->ApproximateMemoryUsage();
} else {
// Else sample from the table.
uint64_t nentries = mt->num_entries();
// Corrected Cochran formula for small populations
// (converges to n0 for large populations).
uint64_t target_sample_size =
static_cast<uint64_t>(ceil(n0 / (1.0 + (n0 / nentries))));
std::unordered_set<const char*> sentries = {};
// Populate sample entries set.
mt->UniqueRandomSample(target_sample_size, &sentries);
// Estimate the garbage ratio by comparing if
// each sample corresponds to a valid entry.
for (const char* ss : sentries) {
ParsedInternalKey res;
Slice entry_slice = GetLengthPrefixedSlice(ss);
Status parse_s =
ParseInternalKey(entry_slice, &res, true /*log_err_key*/);
if (!parse_s.ok()) {
ROCKS_LOG_WARN(db_options_.info_log,
"Memtable Decider: ParseInternalKey did not parse "
"entry_slice %s"
"successfully.",
entry_slice.data());
}
LookupKey lkey(res.user_key, kMaxSequenceNumber);
std::string vget;
Status s;
MergeContext merge_context;
SequenceNumber max_covering_tombstone_seq = 0, sqno = 0;
// Pick the oldest existing snapshot that is more recent
// than the sequence number of the sampled entry.
SequenceNumber min_seqno_snapshot = kMaxSequenceNumber;
SnapshotImpl min_snapshot;
for (SequenceNumber seq_num : existing_snapshots_) {
if (seq_num > res.sequence && seq_num < min_seqno_snapshot) {
min_seqno_snapshot = seq_num;
}
}
min_snapshot.number_ = min_seqno_snapshot;
ro.snapshot =
min_seqno_snapshot < kMaxSequenceNumber ? &min_snapshot : nullptr;
// Estimate if the sample entry is valid or not.
bool gres = mt->Get(lkey, &vget, nullptr, &s, &merge_context,
&max_covering_tombstone_seq, &sqno, ro);
if (!gres) {
ROCKS_LOG_WARN(
db_options_.info_log,
"Memtable Get returned false when Get(sampled entry). "
"Yet each sample entry should exist somewhere in the memtable, "
"unrelated to whether it has been deleted or not.");
}
payload += entry_slice.size();
// TODO(bjlemaire): evaluate typeMerge.
// This is where the sampled entry is estimated to be
// garbage or not. Note that this is a garbage *estimation*
// because we do not include certain items such as
// CompactionFitlers triggered at flush, or if the same delete
// has been inserted twice or more in the memtable.
if (res.type == kTypeValue && gres && s.ok() && sqno == res.sequence) {
useful_payload += entry_slice.size();
} else if (((res.type == kTypeDeletion) ||
(res.type == kTypeSingleDeletion)) &&
s.IsNotFound() && gres) {
useful_payload += entry_slice.size();
}
}
if (payload > 0) {
// We used the estimated useful payload ratio
// to evaluate how much of the total memtable is useful bytes.
estimated_useful_payload +=
(mt->ApproximateMemoryUsage()) * (useful_payload * 1.0 / payload);
ROCKS_LOG_INFO(
db_options_.info_log,
"Mempurge sampling - found garbage ratio from sampling: %f.\n",
(payload - useful_payload) * 1.0 / payload);
} else {
ROCKS_LOG_WARN(
db_options_.info_log,
"Mempurge kSampling policy: null payload measured, and collected "
"sample size is %zu\n.",
sentries.size());
}
}
}
// We convert the total number of useful paylaod bytes
// into the proportion of memtable necessary to store all these bytes.
// We compare this proportion with the threshold value.
return (estimated_useful_payload / mutable_cf_options_.write_buffer_size) <
threshold;
}
Status FlushJob::WriteLevel0Table() {
@ -843,7 +954,7 @@ Status FlushJob::WriteLevel0Table() {
stats.num_output_files_blob = static_cast<int>(blobs.size());
if (db_options_.experimental_allow_mempurge && s.ok()) {
if ((db_options_.experimental_mempurge_threshold > 0.0) && s.ok()) {
// The db_mutex is held at this point.
for (MemTable* mt : mems_) {
// Note: if m is not a previous mempurge output memtable,

@ -117,9 +117,9 @@ class FlushJob {
// of development. At the moment it is only compatible with the Get, Put,
// Delete operations as well as Iterators and CompactionFilters.
// For this early version, "MemPurge" is called by setting the
// options.experimental_allow_mempurge flag as "true". When this is
// options.experimental_mempurge_threshold value as >0.0. When this is
// the case, ALL automatic flush operations (kWRiteBufferManagerFull) will
// first go through the MemPurge process. herefore, we strongly
// first go through the MemPurge process. Therefore, we strongly
// recommend all users not to set this flag as true given that the MemPurge
// process has not matured yet.
Status MemPurge();
@ -192,7 +192,7 @@ class FlushJob {
const std::string full_history_ts_low_;
BlobFileCompletionCallback* blob_callback_;
// Used when experimental_allow_mempurge set to true.
// Used when experimental_mempurge_threshold > 0.0.
bool contains_mempurge_outcome_;
};

@ -14,6 +14,7 @@
#include <memory>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <vector>
#include "db/dbformat.h"
@ -145,6 +146,26 @@ class MemTable {
return approximate_memory_usage_.load(std::memory_order_relaxed);
}
// Returns a vector of unique random memtable entries of size 'sample_size'.
//
// Note: the entries are stored in the unordered_set as length-prefixed keys,
// hence their representation in the set as "const char*".
// Note2: the size of the output set 'entries' is not enforced to be strictly
// equal to 'target_sample_size'. Its final size might be slightly
// greater or slightly less than 'target_sample_size'
//
// REQUIRES: external synchronization to prevent simultaneous
// operations on the same MemTable (unless this Memtable is immutable).
// REQUIRES: SkipList memtable representation. This function is not
// implemented for any other type of memtable representation (vectorrep,
// hashskiplist,...).
void UniqueRandomSample(const uint64_t& target_sample_size,
std::unordered_set<const char*>* entries) {
// TODO(bjlemaire): at the moment, only supported by skiplistrep.
// Extend it to all other memtable representations.
table_->UniqueRandomSample(num_entries(), target_sample_size, entries);
}
// This method heuristically determines if the memtable should continue to
// host more data.
bool ShouldScheduleFlush() const {

@ -390,11 +390,7 @@ class MemTableList {
// not freed, but put into a vector for future deref and reclamation.
void RemoveOldMemTables(uint64_t log_number,
autovector<MemTable*>* to_delete);
void AddMemPurgeOutputID(uint64_t mid) {
if (mempurged_ids_.find(mid) == mempurged_ids_.end()) {
mempurged_ids_.insert(mid);
}
}
void AddMemPurgeOutputID(uint64_t mid) { mempurged_ids_.insert(mid); }
void RemoveMemPurgeOutputID(uint64_t mid) {
if (mempurged_ids_.find(mid) != mempurged_ids_.end()) {

@ -141,8 +141,7 @@ DECLARE_uint64(subcompactions);
DECLARE_uint64(periodic_compaction_seconds);
DECLARE_uint64(compaction_ttl);
DECLARE_bool(allow_concurrent_memtable_write);
DECLARE_bool(experimental_allow_mempurge);
DECLARE_string(experimental_mempurge_policy);
DECLARE_double(experimental_mempurge_threshold);
DECLARE_bool(enable_write_thread_adaptive_yield);
DECLARE_int32(reopen);
DECLARE_double(bloom_bits);
@ -341,18 +340,6 @@ inline enum ROCKSDB_NAMESPACE::CompressionType StringToCompressionType(
return ret_compression_type;
}
inline enum ROCKSDB_NAMESPACE::MemPurgePolicy StringToMemPurgePolicy(
const char* mpolicy) {
assert(mpolicy);
if (!strcasecmp(mpolicy, "kAlways")) {
return ROCKSDB_NAMESPACE::MemPurgePolicy::kAlways;
} else if (!strcasecmp(mpolicy, "kAlternate")) {
return ROCKSDB_NAMESPACE::MemPurgePolicy::kAlternate;
}
fprintf(stderr, "Cannot parse mempurge policy: '%s'\n", mpolicy);
return ROCKSDB_NAMESPACE::MemPurgePolicy::kAlternate;
}
inline enum ROCKSDB_NAMESPACE::ChecksumType StringToChecksumType(
const char* ctype) {
assert(ctype);

@ -326,11 +326,9 @@ DEFINE_uint64(compaction_ttl, 1000,
DEFINE_bool(allow_concurrent_memtable_write, false,
"Allow multi-writers to update mem tables in parallel.");
DEFINE_bool(experimental_allow_mempurge, false,
"Allow mempurge process to collect memtable garbage bytes.");
DEFINE_string(experimental_mempurge_policy, "kAlternate",
"Set mempurge (MemTable Garbage Collection) policy.");
DEFINE_double(experimental_mempurge_threshold, 0.0,
"Maximum estimated useful payload that triggers a "
"mempurge process to collect memtable garbage bytes.");
DEFINE_bool(enable_write_thread_adaptive_yield, true,
"Use a yielding spin loop for brief writer thread waits.");

@ -2266,9 +2266,8 @@ void StressTest::Open() {
options_.max_subcompactions = static_cast<uint32_t>(FLAGS_subcompactions);
options_.allow_concurrent_memtable_write =
FLAGS_allow_concurrent_memtable_write;
options_.experimental_allow_mempurge = FLAGS_experimental_allow_mempurge;
options_.experimental_mempurge_policy =
StringToMemPurgePolicy(FLAGS_experimental_mempurge_policy.c_str());
options_.experimental_mempurge_threshold =
FLAGS_experimental_mempurge_threshold;
options_.periodic_compaction_seconds = FLAGS_periodic_compaction_seconds;
options_.ttl = FLAGS_compaction_ttl;
options_.enable_pipelined_write = FLAGS_enable_pipelined_write;

@ -38,8 +38,10 @@
#include <rocksdb/slice.h>
#include <stdint.h>
#include <stdlib.h>
#include <memory>
#include <stdexcept>
#include <unordered_set>
namespace ROCKSDB_NAMESPACE {
@ -194,6 +196,17 @@ class MemTableRep {
return 0;
}
// Returns a vector of unique random memtable entries of approximate
// size 'target_sample_size' (this size is not strictly enforced).
virtual void UniqueRandomSample(const uint64_t& num_entries,
const uint64_t& target_sample_size,
std::unordered_set<const char*>* entries) {
(void)num_entries;
(void)target_sample_size;
(void)entries;
assert(false);
}
// Report an approximation of how much memory has been used other than memory
// that was allocated through the allocator. Safe to call from any thread.
virtual size_t ApproximateMemoryUsage() = 0;
@ -230,6 +243,8 @@ class MemTableRep {
virtual void SeekForPrev(const Slice& internal_key,
const char* memtable_key) = 0;
virtual void RandomSeek() {}
// Position at the first entry in collection.
// Final state of iterator is Valid() iff collection is not empty.
virtual void SeekToFirst() = 0;

@ -369,11 +369,6 @@ struct DbPath {
extern const char* kHostnameForDbHostId;
enum class MemPurgePolicy : char {
kAlternate = 0x00,
kAlways = 0x01,
};
enum class CompactionServiceJobStatus : char {
kSuccess,
kFailure,
@ -787,14 +782,22 @@ struct DBOptions {
// Default: true
bool advise_random_on_open = true;
// If true, allows for memtable purge instead of flush to storage.
// (experimental).
bool experimental_allow_mempurge = false;
// If experimental_allow_mempurge is true, will dictate MemPurge
// policy.
// Default: kAlternate
// (experimental).
MemPurgePolicy experimental_mempurge_policy = MemPurgePolicy::kAlternate;
// [experimental]
// Used to activate or deactive the Mempurge feature (memtable garbage
// collection). (deactivated by default). At every flush, the total useful
// payload (total entries minus garbage entries) is estimated as a ratio
// [useful payload bytes]/[size of a memtable (in bytes)]. This ratio is then
// compared to this `threshold` value:
// - if ratio<threshold: the flush is replaced by a mempurge operation
// - else: a regular flush operation takes place.
// Threshold values:
// 0.0: mempurge deactivated (default).
// 1.0: recommended threshold value.
// >1.0 : aggressive mempurge.
// 0 < threshold < 1.0: mempurge triggered only for very low useful payload
// ratios.
// [experimental]
double experimental_mempurge_threshold = 0.0;
// Amount of data to build up in memtables across all column
// families before writing to disk.

@ -177,6 +177,9 @@ class InlineSkipList {
// Retreat to the last entry with a key <= target
void SeekForPrev(const char* target);
// Advance to a random entry in the list.
void RandomSeek();
// Position at the first entry in list.
// Final state of iterator is Valid() iff list is not empty.
void SeekToFirst();
@ -252,6 +255,9 @@ class InlineSkipList {
// Return head_ if list is empty.
Node* FindLast() const;
// Returns a random entry.
Node* FindRandomEntry() const;
// Traverses a single level of the list, setting *out_prev to the last
// node before the key and *out_next to the first node after. Assumes
// that the key is not present in the skip list. On entry, before should
@ -412,6 +418,11 @@ inline void InlineSkipList<Comparator>::Iterator::SeekForPrev(
}
}
template <class Comparator>
inline void InlineSkipList<Comparator>::Iterator::RandomSeek() {
node_ = list_->FindRandomEntry();
}
template <class Comparator>
inline void InlineSkipList<Comparator>::Iterator::SeekToFirst() {
node_ = list_->head_->Next(0);
@ -558,6 +569,48 @@ InlineSkipList<Comparator>::FindLast() const {
}
}
template <class Comparator>
typename InlineSkipList<Comparator>::Node*
InlineSkipList<Comparator>::FindRandomEntry() const {
// TODO(bjlemaire): consider adding PREFETCH calls.
Node *x = head_, *scan_node = nullptr, *limit_node = nullptr;
// We start at the max level.
// FOr each level, we look at all the nodes at the level, and
// we randomly pick one of them. Then decrement the level
// and reiterate the process.
// eg: assume GetMaxHeight()=5, and there are #100 elements (nodes).
// level 4 nodes: lvl_nodes={#1, #15, #67, #84}. Randomly pick #15.
// We will consider all the nodes between #15 (inclusive) and #67
// (exclusive). #67 is called 'limit_node' here.
// level 3 nodes: lvl_nodes={#15, #21, #45, #51}. Randomly choose
// #51. #67 remains 'limit_node'.
// [...]
// level 0 nodes: lvl_nodes={#56,#57,#58,#59}. Randomly pick $57.
// Return Node #57.
std::vector<Node*> lvl_nodes;
Random* rnd = Random::GetTLSInstance();
int level = GetMaxHeight() - 1;
while (level >= 0) {
lvl_nodes.clear();
scan_node = x;
while (scan_node != limit_node) {
lvl_nodes.push_back(scan_node);
scan_node = scan_node->Next(level);
}
uint32_t rnd_idx = rnd->Next() % lvl_nodes.size();
x = lvl_nodes[rnd_idx];
if (rnd_idx + 1 < lvl_nodes.size()) {
limit_node = lvl_nodes[rnd_idx + 1];
}
level--;
}
// There is a special case where x could still be the head_
// (note that the head_ contains no key).
return x == head_ ? head_->Next(0) : x;
}
template <class Comparator>
uint64_t InlineSkipList<Comparator>::EstimateCount(const char* key) const {
uint64_t count = 0;

@ -3,6 +3,8 @@
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
#include <random>
#include "db/memtable.h"
#include "memory/arena.h"
#include "memtable/inlineskiplist.h"
@ -95,6 +97,66 @@ public:
return (end_count >= start_count) ? (end_count - start_count) : 0;
}
void UniqueRandomSample(const uint64_t& num_entries,
const uint64_t& target_sample_size,
std::unordered_set<const char*>* entries) override {
entries->clear();
// Avoid divide-by-0.
assert(target_sample_size > 0);
assert(num_entries > 0);
// NOTE: the size of entries is not enforced to be exactly
// target_sample_size at the end of this function, it might be slightly
// greater or smaller.
SkipListRep::Iterator iter(&skip_list_);
// There are two methods to create the subset of samples (size m)
// from the table containing N elements:
// 1-Iterate linearly through the N memtable entries. For each entry i,
// add it to the sample set with a probability
// (target_sample_size - entries.size() ) / (N-i).
//
// 2-Pick m random elements without repetition.
// We pick Option 2 when m<sqrt(N) and
// Option 1 when m > sqrt(N).
if (target_sample_size >
static_cast<uint64_t>(std::sqrt(1.0 * num_entries))) {
Random* rnd = Random::GetTLSInstance();
iter.SeekToFirst();
uint64_t counter = 0, num_samples_left = target_sample_size;
for (; iter.Valid() && (num_samples_left > 0); iter.Next(), counter++) {
// Add entry to sample set with probability
// num_samples_left/(num_entries - counter).
if (rnd->Next() % (num_entries - counter) < num_samples_left) {
entries->insert(iter.key());
num_samples_left--;
}
}
} else {
// Option 2: pick m random elements with no duplicates.
// If Option 2 is picked, then target_sample_size<sqrt(N)
// Using a set spares the need to check for duplicates.
for (uint64_t i = 0; i < target_sample_size; i++) {
// We give it 5 attempts to find a non-duplicate
// With 5 attempts, the chances of returning `entries` set
// of size target_sample_size is:
// PROD_{i=1}^{target_sample_size-1} [1-(i/N)^5]
// which is monotonically increasing with N in the worse case
// of target_sample_size=sqrt(N), and is always >99.9% for N>4.
// At worst, for the final pick , when m=sqrt(N) there is
// a probability of p= 1/sqrt(N) chances to find a duplicate.
for (uint64_t j = 0; j < 5; j++) {
iter.RandomSeek();
// unordered_set::insert returns pair<iterator, bool>.
// The second element is true if an insert successfully happened.
// If element is already in the set, this bool will be false, and
// true otherwise.
if ((entries->insert(iter.key())).second) {
break;
}
}
}
}
}
~SkipListRep() override {}
// Iteration over the contents of a skip list
@ -143,6 +205,8 @@ public:
}
}
void RandomSeek() override { iter_.RandomSeek(); }
// Position at the first entry in list.
// Final state of iterator is Valid() iff list is not empty.
void SeekToFirst() override { iter_.SeekToFirst(); }

@ -48,11 +48,6 @@ static std::unordered_map<std::string, InfoLogLevel> info_log_level_string_map =
{"FATAL_LEVEL", InfoLogLevel::FATAL_LEVEL},
{"HEADER_LEVEL", InfoLogLevel::HEADER_LEVEL}};
static std::unordered_map<std::string, MemPurgePolicy>
experimental_mempurge_policy_string_map = {
{"kAlternate", MemPurgePolicy::kAlternate},
{"kAlways", MemPurgePolicy::kAlways}};
static std::unordered_map<std::string, OptionTypeInfo>
db_mutable_options_type_info = {
{"allow_os_buffer",
@ -197,14 +192,10 @@ static std::unordered_map<std::string, OptionTypeInfo>
{offsetof(struct ImmutableDBOptions, error_if_exists),
OptionType::kBoolean, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"experimental_allow_mempurge",
{offsetof(struct ImmutableDBOptions, experimental_allow_mempurge),
OptionType::kBoolean, OptionVerificationType::kNormal,
{"experimental_mempurge_threshold",
{offsetof(struct ImmutableDBOptions, experimental_mempurge_threshold),
OptionType::kDouble, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"experimental_mempurge_policy",
OptionTypeInfo::Enum<MemPurgePolicy>(
offsetof(struct ImmutableDBOptions, experimental_mempurge_policy),
&experimental_mempurge_policy_string_map)},
{"is_fd_close_on_exec",
{offsetof(struct ImmutableDBOptions, is_fd_close_on_exec),
OptionType::kBoolean, OptionVerificationType::kNormal,
@ -615,8 +606,7 @@ ImmutableDBOptions::ImmutableDBOptions(const DBOptions& options)
allow_fallocate(options.allow_fallocate),
is_fd_close_on_exec(options.is_fd_close_on_exec),
advise_random_on_open(options.advise_random_on_open),
experimental_allow_mempurge(options.experimental_allow_mempurge),
experimental_mempurge_policy(options.experimental_mempurge_policy),
experimental_mempurge_threshold(options.experimental_mempurge_threshold),
db_write_buffer_size(options.db_write_buffer_size),
write_buffer_manager(options.write_buffer_manager),
access_hint_on_compaction_start(options.access_hint_on_compaction_start),
@ -750,12 +740,9 @@ void ImmutableDBOptions::Dump(Logger* log) const {
is_fd_close_on_exec);
ROCKS_LOG_HEADER(log, " Options.advise_random_on_open: %d",
advise_random_on_open);
ROCKS_LOG_HEADER(log,
" Options.experimental_allow_mempurge: %d",
experimental_allow_mempurge);
ROCKS_LOG_HEADER(log,
" Options.experimental_mempurge_policy: %d",
static_cast<int>(experimental_mempurge_policy));
ROCKS_LOG_HEADER(
log, " Options.experimental_mempurge_threshold: %f",
experimental_mempurge_threshold);
ROCKS_LOG_HEADER(
log, " Options.db_write_buffer_size: %" ROCKSDB_PRIszt,
db_write_buffer_size);

@ -57,8 +57,7 @@ struct ImmutableDBOptions {
bool allow_fallocate;
bool is_fd_close_on_exec;
bool advise_random_on_open;
bool experimental_allow_mempurge;
MemPurgePolicy experimental_mempurge_policy;
double experimental_mempurge_threshold;
size_t db_write_buffer_size;
std::shared_ptr<WriteBufferManager> write_buffer_manager;
DBOptions::AccessHint access_hint_on_compaction_start;

@ -143,7 +143,7 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) {
{"persist_stats_to_disk", "false"},
{"stats_history_buffer_size", "69"},
{"advise_random_on_open", "true"},
{"experimental_allow_mempurge", "false"},
{"experimental_mempurge_threshold", "0.0"},
{"use_adaptive_mutex", "false"},
{"new_table_reader_for_compaction_inputs", "true"},
{"compaction_readahead_size", "100"},
@ -302,7 +302,7 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) {
ASSERT_EQ(new_db_opt.persist_stats_to_disk, false);
ASSERT_EQ(new_db_opt.stats_history_buffer_size, 69U);
ASSERT_EQ(new_db_opt.advise_random_on_open, true);
ASSERT_EQ(new_db_opt.experimental_allow_mempurge, false);
ASSERT_EQ(new_db_opt.experimental_mempurge_threshold, 0.0);
ASSERT_EQ(new_db_opt.use_adaptive_mutex, false);
ASSERT_EQ(new_db_opt.new_table_reader_for_compaction_inputs, true);
ASSERT_EQ(new_db_opt.compaction_readahead_size, 100);
@ -2047,7 +2047,7 @@ TEST_F(OptionsOldApiTest, GetOptionsFromMapTest) {
{"persist_stats_to_disk", "false"},
{"stats_history_buffer_size", "69"},
{"advise_random_on_open", "true"},
{"experimental_allow_mempurge", "false"},
{"experimental_mempurge_threshold", "0.0"},
{"use_adaptive_mutex", "false"},
{"new_table_reader_for_compaction_inputs", "true"},
{"compaction_readahead_size", "100"},
@ -2200,7 +2200,7 @@ TEST_F(OptionsOldApiTest, GetOptionsFromMapTest) {
ASSERT_EQ(new_db_opt.persist_stats_to_disk, false);
ASSERT_EQ(new_db_opt.stats_history_buffer_size, 69U);
ASSERT_EQ(new_db_opt.advise_random_on_open, true);
ASSERT_EQ(new_db_opt.experimental_allow_mempurge, false);
ASSERT_EQ(new_db_opt.experimental_mempurge_threshold, 0.0);
ASSERT_EQ(new_db_opt.use_adaptive_mutex, false);
ASSERT_EQ(new_db_opt.new_table_reader_for_compaction_inputs, true);
ASSERT_EQ(new_db_opt.compaction_readahead_size, 100);

@ -1039,19 +1039,6 @@ static enum ROCKSDB_NAMESPACE::CompressionType StringToCompressionType(
return ROCKSDB_NAMESPACE::kSnappyCompression; // default value
}
static enum ROCKSDB_NAMESPACE::MemPurgePolicy StringToMemPurgePolicy(
const char* mpolicy) {
assert(mpolicy);
if (!strcasecmp(mpolicy, "kAlways")) {
return ROCKSDB_NAMESPACE::MemPurgePolicy::kAlways;
} else if (!strcasecmp(mpolicy, "kAlternate")) {
return ROCKSDB_NAMESPACE::MemPurgePolicy::kAlternate;
}
fprintf(stdout, "Cannot parse mempurge policy '%s'\n", mpolicy);
return ROCKSDB_NAMESPACE::MemPurgePolicy::kAlternate;
}
static std::string ColumnFamilyName(size_t i) {
if (i == 0) {
return ROCKSDB_NAMESPACE::kDefaultColumnFamilyName;
@ -1186,11 +1173,9 @@ DEFINE_bool(
DEFINE_bool(allow_concurrent_memtable_write, true,
"Allow multi-writers to update mem tables in parallel.");
DEFINE_bool(experimental_allow_mempurge, false,
"Allow memtable garbage collection.");
DEFINE_string(experimental_mempurge_policy, "kAlternate",
"Specify memtable garbage collection policy.");
DEFINE_double(experimental_mempurge_threshold, 0.0,
"Maximum useful payload ratio estimate that triggers a mempurge "
"(memtable garbage collection).");
DEFINE_bool(inplace_update_support,
ROCKSDB_NAMESPACE::Options().inplace_update_support,
@ -4275,9 +4260,8 @@ class Benchmark {
options.delayed_write_rate = FLAGS_delayed_write_rate;
options.allow_concurrent_memtable_write =
FLAGS_allow_concurrent_memtable_write;
options.experimental_allow_mempurge = FLAGS_experimental_allow_mempurge;
options.experimental_mempurge_policy =
StringToMemPurgePolicy(FLAGS_experimental_mempurge_policy.c_str());
options.experimental_mempurge_threshold =
FLAGS_experimental_mempurge_threshold;
options.inplace_update_support = FLAGS_inplace_update_support;
options.inplace_update_num_locks = FLAGS_inplace_update_num_locks;
options.enable_write_thread_adaptive_yield =

@ -220,8 +220,7 @@ whitebox_default_params = {
simple_default_params = {
"allow_concurrent_memtable_write": lambda: random.randint(0, 1),
"column_families": 1,
"experimental_allow_mempurge": lambda: random.randint(0, 1),
"experimental_mempurge_policy": lambda: random.choice(["kAlways", "kAlternate"]),
"experimental_mempurge_threshold": lambda: 10.0*random.random(),
"max_background_compactions": 1,
"max_bytes_for_level_base": 67108864,
"memtablerep": "skip_list",

Loading…
Cancel
Save