Improve MemPurge sampling (#8656)

Summary:
Previously, the `MemPurge` sampling function was assessing whether a random entry from a memtable was garbage or not by simply querying the given memtable (see https://github.com/facebook/rocksdb/issues/8628 for more details).
In this diff, I am updating the sampling function by querying not only the memtable the entry was drawn from, but also all subsequent memtables that have a greater memtable ID.
I also added the size of the value for KV entries in the payload/useful payload estimates (which was also one of the reasons why sampling was not as good as mempurging all the time in terms of L0 SST files reduction).
Once these changes were made, I was able to clean obsolete objects and functions from the `MemtableList` struct, and did a bit of cleanup everywhere.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8656

Reviewed By: pdillinger

Differential Revision: D30288583

Pulled By: bjlemaire

fbshipit-source-id: 7646a545ec56f4715949daa59ab5eee74540feb3
main
Baptiste Lemaire 3 years ago committed by Facebook GitHub Bot
parent 74a652a45f
commit e51be2c5a1
  1. 264
      db/flush_job.cc
  2. 3
      db/flush_job.h
  3. 19
      db/memtable_list.cc
  4. 18
      db/memtable_list.h
  5. 4
      include/rocksdb/memtablerep.h
  6. 4
      memtable/skiplistrep.cc

@ -192,19 +192,6 @@ void FlushJob::PickMemTable() {
// path 0 for level 0 file. // path 0 for level 0 file.
meta_.fd = FileDescriptor(versions_->NewFileNumber(), 0, 0); meta_.fd = FileDescriptor(versions_->NewFileNumber(), 0, 0);
// If mempurge feature is activated, keep track of any potential
// memtables coming from a previous mempurge operation.
// Used for mempurge policy.
if (db_options_.experimental_mempurge_threshold > 0.0) {
contains_mempurge_outcome_ = false;
for (MemTable* mt : mems_) {
if (cfd_->imm()->IsMemPurgeOutput(mt->GetID())) {
contains_mempurge_outcome_ = true;
break;
}
}
}
base_ = cfd_->current(); base_ = cfd_->current();
base_->Ref(); // it is likely that we do not need this reference base_->Ref(); // it is likely that we do not need this reference
} }
@ -246,8 +233,8 @@ Status FlushJob::Run(LogsWithPrepTracker* prep_tracker,
(!mems_.empty()) && MemPurgeDecider()) { (!mems_.empty()) && MemPurgeDecider()) {
mempurge_s = MemPurge(); mempurge_s = MemPurge();
if (!mempurge_s.ok()) { if (!mempurge_s.ok()) {
// Mempurge is typically aborted when the new_mem output memtable // Mempurge is typically aborted when the output
// is filled at more than XX % capacity (currently: 60%). // bytes cannot be contained onto a single output memtable.
if (mempurge_s.IsAborted()) { if (mempurge_s.IsAborted()) {
ROCKS_LOG_INFO(db_options_.info_log, "Mempurge process aborted: %s\n", ROCKS_LOG_INFO(db_options_.info_log, "Mempurge process aborted: %s\n",
mempurge_s.ToString().c_str()); mempurge_s.ToString().c_str());
@ -567,16 +554,9 @@ Status FlushJob::MemPurge() {
!(new_mem->ShouldFlushNow())) { !(new_mem->ShouldFlushNow())) {
db_mutex_->Lock(); db_mutex_->Lock();
uint64_t new_mem_id = mems_[0]->GetID(); uint64_t new_mem_id = mems_[0]->GetID();
// Copy lowest memtable ID
// House keeping work.
for (MemTable* mt : mems_) {
new_mem_id = mt->GetID() < new_mem_id ? mt->GetID() : new_mem_id;
// Note: if m is not a previous mempurge output memtable,
// nothing happens.
cfd_->imm()->RemoveMemPurgeOutputID(mt->GetID());
}
new_mem->SetID(new_mem_id); new_mem->SetID(new_mem_id);
cfd_->imm()->AddMemPurgeOutputID(new_mem_id);
// This addition will not trigger another flush, because // This addition will not trigger another flush, because
// we do not call SchedulePendingFlush(). // we do not call SchedulePendingFlush().
cfd_->imm()->Add(new_mem, &job_context_->memtables_to_free); cfd_->imm()->Add(new_mem, &job_context_->memtables_to_free);
@ -631,7 +611,20 @@ bool FlushJob::MemPurgeDecider() {
// Payload and useful_payload (in bytes). // Payload and useful_payload (in bytes).
// The useful payload ratio of a given MemTable // The useful payload ratio of a given MemTable
// is estimated to be useful_payload/payload. // is estimated to be useful_payload/payload.
uint64_t payload = 0, useful_payload = 0; uint64_t payload = 0, useful_payload = 0, entry_size = 0;
// Local variables used repetitively inside the for-loop
// when iterating over the sampled entries.
Slice key_slice, value_slice;
ParsedInternalKey res;
SnapshotImpl min_snapshot;
std::string vget;
Status mget_s, parse_s;
MergeContext merge_context;
SequenceNumber max_covering_tombstone_seq = 0, sqno = 0,
min_seqno_snapshot = 0;
bool get_res, can_be_useful_payload, not_in_next_mems;
// If estimated_useful_payload is > threshold, // If estimated_useful_payload is > threshold,
// then flush to storage, else MemPurge. // then flush to storage, else MemPurge.
double estimated_useful_payload = 0.0; double estimated_useful_payload = 0.0;
@ -643,106 +636,136 @@ bool FlushJob::MemPurgeDecider() {
ro.total_order_seek = true; ro.total_order_seek = true;
// Iterate over each memtable of the set. // Iterate over each memtable of the set.
for (MemTable* mt : mems_) { for (auto mem_iter = std::begin(mems_); mem_iter != std::end(mems_);
// If the memtable is the output of a previous mempurge, mem_iter++) {
// its approximate useful payload ratio is already calculated. MemTable* mt = *mem_iter;
if (cfd_->imm()->IsMemPurgeOutput(mt->GetID())) {
// We make the assumption that this memtable is already // Else sample from the table.
// free of garbage (garbage underestimation). uint64_t nentries = mt->num_entries();
estimated_useful_payload += mt->ApproximateMemoryUsage(); // Corrected Cochran formula for small populations
} else { // (converges to n0 for large populations).
// Else sample from the table. uint64_t target_sample_size =
uint64_t nentries = mt->num_entries(); static_cast<uint64_t>(ceil(n0 / (1.0 + (n0 / nentries))));
// Corrected Cochran formula for small populations std::unordered_set<const char*> sentries = {};
// (converges to n0 for large populations). // Populate sample entries set.
uint64_t target_sample_size = mt->UniqueRandomSample(target_sample_size, &sentries);
static_cast<uint64_t>(ceil(n0 / (1.0 + (n0 / nentries))));
std::unordered_set<const char*> sentries = {}; // Estimate the garbage ratio by comparing if
// Populate sample entries set. // each sample corresponds to a valid entry.
mt->UniqueRandomSample(target_sample_size, &sentries); for (const char* ss : sentries) {
key_slice = GetLengthPrefixedSlice(ss);
// Estimate the garbage ratio by comparing if parse_s = ParseInternalKey(key_slice, &res, true /*log_err_key*/);
// each sample corresponds to a valid entry. if (!parse_s.ok()) {
for (const char* ss : sentries) { ROCKS_LOG_WARN(db_options_.info_log,
ParsedInternalKey res; "Memtable Decider: ParseInternalKey did not parse "
Slice entry_slice = GetLengthPrefixedSlice(ss); "key_slice %s successfully.",
Status parse_s = key_slice.data());
ParseInternalKey(entry_slice, &res, true /*log_err_key*/); }
if (!parse_s.ok()) {
ROCKS_LOG_WARN(db_options_.info_log, // Size of the entry is "key size (+ value size if KV entry)"
"Memtable Decider: ParseInternalKey did not parse " entry_size = key_slice.size();
"entry_slice %s" if (res.type == kTypeValue) {
"successfully.", value_slice =
entry_slice.data()); GetLengthPrefixedSlice(key_slice.data() + key_slice.size());
} entry_size += value_slice.size();
LookupKey lkey(res.user_key, kMaxSequenceNumber); }
std::string vget;
Status s; // Count entry bytes as payload.
MergeContext merge_context; payload += entry_size;
SequenceNumber max_covering_tombstone_seq = 0, sqno = 0;
LookupKey lkey(res.user_key, kMaxSequenceNumber);
// Pick the oldest existing snapshot that is more recent
// than the sequence number of the sampled entry. // Paranoia: zero out these values just in case.
SequenceNumber min_seqno_snapshot = kMaxSequenceNumber; max_covering_tombstone_seq = 0;
SnapshotImpl min_snapshot; sqno = 0;
for (SequenceNumber seq_num : existing_snapshots_) {
if (seq_num > res.sequence && seq_num < min_seqno_snapshot) { // Pick the oldest existing snapshot that is more recent
min_seqno_snapshot = seq_num; // than the sequence number of the sampled entry.
} min_seqno_snapshot = kMaxSequenceNumber;
} for (SequenceNumber seq_num : existing_snapshots_) {
min_snapshot.number_ = min_seqno_snapshot; if (seq_num > res.sequence && seq_num < min_seqno_snapshot) {
ro.snapshot = min_seqno_snapshot = seq_num;
min_seqno_snapshot < kMaxSequenceNumber ? &min_snapshot : nullptr;
// Estimate if the sample entry is valid or not.
bool gres = mt->Get(lkey, &vget, nullptr, &s, &merge_context,
&max_covering_tombstone_seq, &sqno, ro);
if (!gres) {
ROCKS_LOG_WARN(
db_options_.info_log,
"Memtable Get returned false when Get(sampled entry). "
"Yet each sample entry should exist somewhere in the memtable, "
"unrelated to whether it has been deleted or not.");
}
payload += entry_slice.size();
// TODO(bjlemaire): evaluate typeMerge.
// This is where the sampled entry is estimated to be
// garbage or not. Note that this is a garbage *estimation*
// because we do not include certain items such as
// CompactionFitlers triggered at flush, or if the same delete
// has been inserted twice or more in the memtable.
if (res.type == kTypeValue && gres && s.ok() && sqno == res.sequence) {
useful_payload += entry_slice.size();
} else if (((res.type == kTypeDeletion) ||
(res.type == kTypeSingleDeletion)) &&
s.IsNotFound() && gres) {
useful_payload += entry_slice.size();
} }
} }
if (payload > 0) { min_snapshot.number_ = min_seqno_snapshot;
// We used the estimated useful payload ratio ro.snapshot =
// to evaluate how much of the total memtable is useful bytes. min_seqno_snapshot < kMaxSequenceNumber ? &min_snapshot : nullptr;
estimated_useful_payload +=
(mt->ApproximateMemoryUsage()) * (useful_payload * 1.0 / payload); // Estimate if the sample entry is valid or not.
ROCKS_LOG_INFO( get_res = mt->Get(lkey, &vget, nullptr, &mget_s, &merge_context,
db_options_.info_log, &max_covering_tombstone_seq, &sqno, ro);
"Mempurge sampling - found garbage ratio from sampling: %f.\n", if (!get_res) {
(payload - useful_payload) * 1.0 / payload);
} else {
ROCKS_LOG_WARN( ROCKS_LOG_WARN(
db_options_.info_log, db_options_.info_log,
"Mempurge kSampling policy: null payload measured, and collected " "Memtable Get returned false when Get(sampled entry). "
"sample size is %zu\n.", "Yet each sample entry should exist somewhere in the memtable, "
sentries.size()); "unrelated to whether it has been deleted or not.");
}
// TODO(bjlemaire): evaluate typeMerge.
// This is where the sampled entry is estimated to be
// garbage or not. Note that this is a garbage *estimation*
// because we do not include certain items such as
// CompactionFitlers triggered at flush, or if the same delete
// has been inserted twice or more in the memtable.
// Evaluate if the entry can be useful payload
// Situation #1: entry is a KV entry, was found in the memtable mt
// and the sequence numbers match.
can_be_useful_payload = (res.type == kTypeValue) && get_res &&
mget_s.ok() && (sqno == res.sequence);
// Situation #2: entry is a delete entry, was found in the memtable mt
// (because gres==true) and no valid KV entry is found.
// (note: duplicate delete entries are also taken into
// account here, because the sequence number 'sqno'
// in memtable->Get(&sqno) operation is set to be equal
// to the most recent delete entry as well).
can_be_useful_payload |=
((res.type == kTypeDeletion) || (res.type == kTypeSingleDeletion)) &&
mget_s.IsNotFound() && get_res && (sqno == res.sequence);
// If there is a chance that the entry is useful payload
// Verify that the entry does not appear in the following memtables
// (memtables with greater memtable ID/larger sequence numbers).
if (can_be_useful_payload) {
not_in_next_mems = true;
for (auto next_mem_iter = mem_iter + 1;
next_mem_iter != std::end(mems_); next_mem_iter++) {
if ((*next_mem_iter)
->Get(lkey, &vget, nullptr, &mget_s, &merge_context,
&max_covering_tombstone_seq, &sqno, ro)) {
not_in_next_mems = false;
break;
}
}
if (not_in_next_mems) {
useful_payload += entry_size;
}
} }
} }
if (payload > 0) {
// We use the estimated useful payload ratio to
// evaluate how many of the memtable bytes are useful bytes.
estimated_useful_payload +=
(mt->ApproximateMemoryUsage()) * (useful_payload * 1.0 / payload);
ROCKS_LOG_INFO(
db_options_.info_log,
"Mempurge sampling - found garbage ratio from sampling: %f.\n",
(payload - useful_payload) * 1.0 / payload);
} else {
ROCKS_LOG_WARN(db_options_.info_log,
"Mempurge sampling: null payload measured, and collected "
"sample size is %zu\n.",
sentries.size());
}
} }
// We convert the total number of useful paylaod bytes // We convert the total number of useful payload bytes
// into the proportion of memtable necessary to store all these bytes. // into the proportion of memtable necessary to store all these bytes.
// We compare this proportion with the threshold value. // We compare this proportion with the threshold value.
return (estimated_useful_payload / mutable_cf_options_.write_buffer_size) < return ((estimated_useful_payload / mutable_cf_options_.write_buffer_size) <
threshold; threshold);
} }
Status FlushJob::WriteLevel0Table() { Status FlushJob::WriteLevel0Table() {
@ -954,15 +977,6 @@ Status FlushJob::WriteLevel0Table() {
stats.num_output_files_blob = static_cast<int>(blobs.size()); stats.num_output_files_blob = static_cast<int>(blobs.size());
if ((db_options_.experimental_mempurge_threshold > 0.0) && s.ok()) {
// The db_mutex is held at this point.
for (MemTable* mt : mems_) {
// Note: if m is not a previous mempurge output memtable,
// nothing happens here.
cfd_->imm()->RemoveMemPurgeOutputID(mt->GetID());
}
}
RecordTimeToHistogram(stats_, FLUSH_TIME, stats.micros); RecordTimeToHistogram(stats_, FLUSH_TIME, stats.micros);
cfd_->internal_stats()->AddCompactionStats(0 /* level */, thread_pri_, stats); cfd_->internal_stats()->AddCompactionStats(0 /* level */, thread_pri_, stats);
cfd_->internal_stats()->AddCFStats( cfd_->internal_stats()->AddCFStats(

@ -191,9 +191,6 @@ class FlushJob {
const std::string full_history_ts_low_; const std::string full_history_ts_low_;
BlobFileCompletionCallback* blob_callback_; BlobFileCompletionCallback* blob_callback_;
// Used when experimental_mempurge_threshold > 0.0.
bool contains_mempurge_outcome_;
}; };
} // namespace ROCKSDB_NAMESPACE } // namespace ROCKSDB_NAMESPACE

@ -5,10 +5,12 @@
// //
#include "db/memtable_list.h" #include "db/memtable_list.h"
#include <algorithm>
#include <cinttypes> #include <cinttypes>
#include <limits> #include <limits>
#include <queue> #include <queue>
#include <string> #include <string>
#include "db/db_impl/db_impl.h" #include "db/db_impl/db_impl.h"
#include "db/memtable.h" #include "db/memtable.h"
#include "db/range_tombstone_fragmenter.h" #include "db/range_tombstone_fragmenter.h"
@ -340,6 +342,14 @@ void MemTableList::PickMemtablesToFlush(uint64_t max_memtable_id,
ThreadStatus::STAGE_PICK_MEMTABLES_TO_FLUSH); ThreadStatus::STAGE_PICK_MEMTABLES_TO_FLUSH);
const auto& memlist = current_->memlist_; const auto& memlist = current_->memlist_;
bool atomic_flush = false; bool atomic_flush = false;
// Note: every time MemTableList::Add(mem) is called, it adds the new mem
// at the FRONT of the memlist (memlist.push_front(mem)). Therefore, by
// iterating through the memlist starting at the end, the vector<MemTable*>
// ret is filled with memtables already sorted in increasing MemTable ID.
// However, when the mempurge feature is activated, new memtables with older
// IDs will be added to the memlist. Therefore we std::sort(ret) at the end to
// return a vector of memtables sorted by increasing memtable ID.
for (auto it = memlist.rbegin(); it != memlist.rend(); ++it) { for (auto it = memlist.rbegin(); it != memlist.rend(); ++it) {
MemTable* m = *it; MemTable* m = *it;
if (!atomic_flush && m->atomic_flush_seqno_ != kMaxSequenceNumber) { if (!atomic_flush && m->atomic_flush_seqno_ != kMaxSequenceNumber) {
@ -361,6 +371,15 @@ void MemTableList::PickMemtablesToFlush(uint64_t max_memtable_id,
if (!atomic_flush || num_flush_not_started_ == 0) { if (!atomic_flush || num_flush_not_started_ == 0) {
flush_requested_ = false; // start-flush request is complete flush_requested_ = false; // start-flush request is complete
} }
// Sort the list of memtables by increasing memtable ID.
// This is useful when the mempurge feature is activated
// and the memtables are not guaranteed to be sorted in
// the memlist vector.
std::sort(ret->begin(), ret->end(),
[](const MemTable* m1, const MemTable* m2) -> bool {
return m1->GetID() < m2->GetID();
});
} }
void MemTableList::RollbackMemtableFlush(const autovector<MemTable*>& mems, void MemTableList::RollbackMemtableFlush(const autovector<MemTable*>& mems,

@ -390,20 +390,6 @@ class MemTableList {
// not freed, but put into a vector for future deref and reclamation. // not freed, but put into a vector for future deref and reclamation.
void RemoveOldMemTables(uint64_t log_number, void RemoveOldMemTables(uint64_t log_number,
autovector<MemTable*>* to_delete); autovector<MemTable*>* to_delete);
void AddMemPurgeOutputID(uint64_t mid) { mempurged_ids_.insert(mid); }
void RemoveMemPurgeOutputID(uint64_t mid) {
if (mempurged_ids_.find(mid) != mempurged_ids_.end()) {
mempurged_ids_.erase(mid);
}
}
bool IsMemPurgeOutput(uint64_t mid) {
if (mempurged_ids_.find(mid) == mempurged_ids_.end()) {
return false;
}
return true;
}
private: private:
friend Status InstallMemtableAtomicFlushResults( friend Status InstallMemtableAtomicFlushResults(
@ -450,10 +436,6 @@ class MemTableList {
// Cached value of current_->HasHistory(). // Cached value of current_->HasHistory().
std::atomic<bool> current_has_history_; std::atomic<bool> current_has_history_;
// Store the IDs of the memtables installed in this
// list that result from a mempurge operation.
std::unordered_set<uint64_t> mempurged_ids_;
}; };
// Installs memtable atomic flush results. // Installs memtable atomic flush results.

@ -198,8 +198,8 @@ class MemTableRep {
// Returns a vector of unique random memtable entries of approximate // Returns a vector of unique random memtable entries of approximate
// size 'target_sample_size' (this size is not strictly enforced). // size 'target_sample_size' (this size is not strictly enforced).
virtual void UniqueRandomSample(const uint64_t& num_entries, virtual void UniqueRandomSample(const uint64_t num_entries,
const uint64_t& target_sample_size, const uint64_t target_sample_size,
std::unordered_set<const char*>* entries) { std::unordered_set<const char*>* entries) {
(void)num_entries; (void)num_entries;
(void)target_sample_size; (void)target_sample_size;

@ -97,8 +97,8 @@ public:
return (end_count >= start_count) ? (end_count - start_count) : 0; return (end_count >= start_count) ? (end_count - start_count) : 0;
} }
void UniqueRandomSample(const uint64_t& num_entries, void UniqueRandomSample(const uint64_t num_entries,
const uint64_t& target_sample_size, const uint64_t target_sample_size,
std::unordered_set<const char*>* entries) override { std::unordered_set<const char*>* entries) override {
entries->clear(); entries->clear();
// Avoid divide-by-0. // Avoid divide-by-0.

Loading…
Cancel
Save