Lazily initialize RangeDelAggregator's map and pinning manager

Summary:
Since a RangeDelAggregator is created for each read request, its heap-allocating member variables were consuming significant CPU (~3% of the total), which reduced request throughput. The stripe map and pinning manager are only needed when range deletions exist, so we can defer their initialization until the first range deletion is encountered. Currently, lazy initialization is done only for reads, since reads pass us a single snapshot, which is easy to keep on the stack until it must be inserted into the map; flush and compaction pass us a vector of snapshots, which is not.
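To make the mechanism concrete, below is a minimal sketch of the lazy-initialization pattern, using hypothetical names (Aggregator, Rep, Add, Covers) rather than the actual RocksDB classes: the heap-allocating state is grouped behind a unique_ptr that stays null until the first range tombstone arrives, so a read that never sees a range deletion pays only for a stack-stored sequence number and a null pointer.

    // Hypothetical illustration of the pattern; not the RocksDB implementation.
    #include <cassert>
    #include <cstdint>
    #include <map>
    #include <memory>
    #include <string>

    class Aggregator {
     public:
      explicit Aggregator(uint64_t snapshot) : upper_bound_(snapshot) {}

      // Called only when a range deletion is actually encountered; this is
      // where the deferred heap allocation happens.
      void Add(const std::string& start_key, const std::string& end_key) {
        if (rep_ == nullptr) {
          rep_.reset(new Rep());
        }
        rep_->tombstones_.emplace(start_key, end_key);
      }

      // Cheap early-out for the common case of no range deletions.
      bool Covers(const std::string& key) const {
        if (rep_ == nullptr) {
          return false;
        }
        for (const auto& kv : rep_->tombstones_) {
          if (key >= kv.first && key < kv.second) {
            return true;
          }
        }
        return false;
      }

     private:
      struct Rep {
        std::map<std::string, std::string> tombstones_;  // start key -> end key
      };
      uint64_t upper_bound_;      // cheap to keep inline; mirrors the snapshot seqnum
      std::unique_ptr<Rep> rep_;  // null until the first tombstone is added
    };

    int main() {
      Aggregator agg(/*snapshot=*/100);
      assert(!agg.Covers("a"));  // no Rep allocated yet
      agg.Add("b", "d");         // first tombstone triggers the deferred setup
      assert(agg.Covers("c"));
      assert(!agg.Covers("d"));  // end key is exclusive in this toy example
      return 0;
    }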

Note that the Arena member variable is still expensive; I will figure out what to do with it in a subsequent diff. It cannot be lazily initialized because we currently use this arena even to allocate empty iterators, which are needed even when no range deletions exist.
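As background for the arena constraint, and for the new cleanup branch in AddTombstones() that calls input->~InternalIterator() when the iterator was arena-allocated, here is a toy sketch assuming a simplified bump-style Arena and Iter type rather than RocksDB's real ones: an object constructed with placement new into arena memory must be torn down with an explicit destructor call, because the arena (not the caller) owns and later frees the underlying bytes, whereas a normally heap-allocated object is released with delete.

    // Hypothetical illustration; real arenas reuse blocks and handle alignment.
    #include <cstddef>
    #include <new>
    #include <string>
    #include <vector>

    class Arena {
     public:
      void* Allocate(size_t bytes) {
        blocks_.push_back(new char[bytes]);
        return blocks_.back();
      }
      ~Arena() {
        for (char* b : blocks_) delete[] b;
      }

     private:
      std::vector<char*> blocks_;
    };

    struct Iter {
      std::string name;  // non-trivial member, so running the destructor matters
      explicit Iter(std::string n) : name(std::move(n)) {}
      virtual ~Iter() {}
    };

    int main() {
      Arena arena;
      // Arena path: construct in arena memory, then run only the destructor;
      // the bytes themselves are reclaimed when the arena is destroyed.
      void* mem = arena.Allocate(sizeof(Iter));
      Iter* arena_iter = new (mem) Iter("empty-range-del-iter");
      arena_iter->~Iter();

      // Heap path: a normally owned object of the same type is released with delete.
      Iter* heap_iter = new Iter("heap-iter");
      delete heap_iter;
      return 0;
    }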
Closes https://github.com/facebook/rocksdb/pull/1539

Differential Revision: D4203488

Pulled By: ajkr

fbshipit-source-id: 3b36279
Author: Andrew Kryczka (committed by Facebook GitHub Bot)
Commit: 3f62215210 (parent: 41e77b8390)
Changed files:
  db/db_impl.cc (7)
  db/db_impl_readonly.cc (2)
  db/db_iter.cc (2)
  db/range_del_aggregator.cc (75)
  db/range_del_aggregator.h (23)
  db/table_cache.cc (2)

--- a/db/db_impl.cc
+++ b/db/db_impl.cc
@@ -3970,7 +3970,7 @@ Status DBImpl::GetImpl(const ReadOptions& read_options,
   SuperVersion* sv = GetAndRefSuperVersion(cfd);
   // Prepare to store a list of merge operations if merge occurs.
   MergeContext merge_context;
-  RangeDelAggregator range_del_agg(cfd->internal_comparator(), {snapshot});
+  RangeDelAggregator range_del_agg(cfd->internal_comparator(), snapshot);
   Status s;
   // First look in the memtable, then in the immutable memtable (if any).
@@ -4079,7 +4079,7 @@ std::vector<Status> DBImpl::MultiGet(
     LookupKey lkey(keys[i], snapshot);
     auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family[i]);
     RangeDelAggregator range_del_agg(cfh->cfd()->internal_comparator(),
-                                     {snapshot});
+                                     snapshot);
     auto mgd_iter = multiget_cf_data.find(cfh->cfd()->GetID());
     assert(mgd_iter != multiget_cf_data.end());
     auto mgd = mgd_iter->second;
@@ -6359,7 +6359,8 @@ Status DBImpl::GetLatestSequenceForKey(SuperVersion* sv, const Slice& key,
                                        bool* found_record_for_key) {
   Status s;
   MergeContext merge_context;
-  RangeDelAggregator range_del_agg(sv->mem->GetInternalKeyComparator(), {});
+  RangeDelAggregator range_del_agg(sv->mem->GetInternalKeyComparator(),
+                                   kMaxSequenceNumber);
   ReadOptions read_options;
   SequenceNumber current_seq = versions_->LastSequence();

--- a/db/db_impl_readonly.cc
+++ b/db/db_impl_readonly.cc
@@ -38,7 +38,7 @@ Status DBImplReadOnly::Get(const ReadOptions& read_options,
   auto cfd = cfh->cfd();
   SuperVersion* super_version = cfd->GetSuperVersion();
   MergeContext merge_context;
-  RangeDelAggregator range_del_agg(cfd->internal_comparator(), {snapshot});
+  RangeDelAggregator range_del_agg(cfd->internal_comparator(), snapshot);
   LookupKey lkey(key, snapshot);
   if (super_version->mem->Get(lkey, value, &s, &merge_context, &range_del_agg,
                               read_options)) {

--- a/db/db_iter.cc
+++ b/db/db_iter.cc
@@ -123,7 +123,7 @@ class DBIter: public Iterator {
         prefix_same_as_start_(prefix_same_as_start),
         pin_thru_lifetime_(pin_data),
         total_order_seek_(total_order_seek),
-        range_del_agg_(InternalKeyComparator(cmp), {s}) {
+        range_del_agg_(InternalKeyComparator(cmp), s) {
     RecordTick(statistics_, NO_ITERATORS);
     prefix_extractor_ = ioptions.prefix_extractor;
     max_skip_ = max_sequential_skip_in_iterations;

--- a/db/range_del_aggregator.cc
+++ b/db/range_del_aggregator.cc
@@ -12,17 +12,31 @@ namespace rocksdb {
 RangeDelAggregator::RangeDelAggregator(
     const InternalKeyComparator& icmp,
     const std::vector<SequenceNumber>& snapshots)
-    : icmp_(icmp) {
-  pinned_iters_mgr_.StartPinning();
+    : upper_bound_(kMaxSequenceNumber), icmp_(icmp) {
+  InitRep(snapshots);
+}
+
+RangeDelAggregator::RangeDelAggregator(const InternalKeyComparator& icmp,
+                                       SequenceNumber snapshot)
+    : upper_bound_(snapshot), icmp_(icmp) {}
+
+void RangeDelAggregator::InitRep(const std::vector<SequenceNumber>& snapshots) {
+  assert(rep_ == nullptr);
+  rep_.reset(new Rep());
   for (auto snapshot : snapshots) {
-    stripe_map_.emplace(snapshot,
-                        TombstoneMap(stl_wrappers::LessOfComparator(&icmp_)));
+    rep_->stripe_map_.emplace(
+        snapshot, TombstoneMap(stl_wrappers::LessOfComparator(&icmp_)));
   }
   // Data newer than any snapshot falls in this catch-all stripe
-  stripe_map_.emplace(kMaxSequenceNumber, TombstoneMap());
+  rep_->stripe_map_.emplace(
+      kMaxSequenceNumber, TombstoneMap(stl_wrappers::LessOfComparator(&icmp_)));
+  rep_->pinned_iters_mgr_.StartPinning();
 }
 
 bool RangeDelAggregator::ShouldDelete(const Slice& internal_key) {
+  if (rep_ == nullptr) {
+    return false;
+  }
   ParsedInternalKey parsed;
   if (!ParseInternalKey(internal_key, &parsed)) {
     assert(false);
@@ -32,7 +46,9 @@ bool RangeDelAggregator::ShouldDelete(const Slice& internal_key) {
 
 bool RangeDelAggregator::ShouldDelete(const ParsedInternalKey& parsed) {
   assert(IsValueType(parsed.type));
+  if (rep_ == nullptr) {
+    return false;
+  }
   const auto& tombstone_map = GetTombstoneMap(parsed.sequence);
   for (const auto& start_key_and_tombstone : tombstone_map) {
     const auto& tombstone = start_key_and_tombstone.second;
@@ -51,14 +67,17 @@ bool RangeDelAggregator::ShouldDelete(const ParsedInternalKey& parsed) {
 bool RangeDelAggregator::ShouldAddTombstones(
     bool bottommost_level /* = false */) {
-  auto stripe_map_iter = stripe_map_.begin();
-  assert(stripe_map_iter != stripe_map_.end());
+  if (rep_ == nullptr) {
+    return false;
+  }
+  auto stripe_map_iter = rep_->stripe_map_.begin();
+  assert(stripe_map_iter != rep_->stripe_map_.end());
   if (bottommost_level) {
     // For the bottommost level, keys covered by tombstones in the first
     // (oldest) stripe have been compacted away, so the tombstones are obsolete.
     ++stripe_map_iter;
   }
-  while (stripe_map_iter != stripe_map_.end()) {
+  while (stripe_map_iter != rep_->stripe_map_.end()) {
     if (!stripe_map_iter->second.empty()) {
       return true;
     }
@@ -77,9 +96,15 @@ Status RangeDelAggregator::AddTombstones(
 }
 
 Status RangeDelAggregator::AddTombstones(InternalIterator* input, bool arena) {
-  pinned_iters_mgr_.PinIterator(input, arena);
   input->SeekToFirst();
+  bool first_iter = true;
   while (input->Valid()) {
+    if (first_iter) {
+      if (rep_ == nullptr) {
+        InitRep({upper_bound_});
+      }
+      first_iter = false;
+    }
     ParsedInternalKey parsed_key;
     if (!ParseInternalKey(input->key(), &parsed_key)) {
       return Status::Corruption("Unable to parse range tombstone InternalKey");
@@ -89,22 +114,30 @@ Status RangeDelAggregator::AddTombstones(InternalIterator* input, bool arena) {
     tombstone_map.emplace(input->key(), std::move(tombstone));
     input->Next();
   }
+  if (!first_iter) {
+    rep_->pinned_iters_mgr_.PinIterator(input, arena);
+  } else if (arena) {
+    input->~InternalIterator();
+  } else {
+    delete input;
+  }
   return Status::OK();
 }
 
 RangeDelAggregator::TombstoneMap& RangeDelAggregator::GetTombstoneMap(
     SequenceNumber seq) {
+  assert(rep_ != nullptr);
   // The stripe includes seqnum for the snapshot above and excludes seqnum for
   // the snapshot below.
   StripeMap::iterator iter;
   if (seq > 0) {
     // upper_bound() checks strict inequality so need to subtract one
-    iter = stripe_map_.upper_bound(seq - 1);
+    iter = rep_->stripe_map_.upper_bound(seq - 1);
   } else {
-    iter = stripe_map_.begin();
+    iter = rep_->stripe_map_.begin();
   }
   // catch-all stripe justifies this assertion in either of above cases
-  assert(iter != stripe_map_.end());
+  assert(iter != rep_->stripe_map_.end());
   return iter->second;
 }
@@ -117,8 +150,11 @@ void RangeDelAggregator::AddToBuilder(
     TableBuilder* builder, const Slice* lower_bound, const Slice* upper_bound,
     FileMetaData* meta,
     bool bottommost_level /* = false */) {
-  auto stripe_map_iter = stripe_map_.begin();
-  assert(stripe_map_iter != stripe_map_.end());
+  if (rep_ == nullptr) {
+    return;
+  }
+  auto stripe_map_iter = rep_->stripe_map_.begin();
+  assert(stripe_map_iter != rep_->stripe_map_.end());
   if (bottommost_level) {
     // For the bottommost level, keys covered by tombstones in the first
     // (oldest) stripe have been compacted away, so the tombstones are obsolete.
@@ -128,7 +164,7 @@ void RangeDelAggregator::AddToBuilder(
   // Note the order in which tombstones are stored is insignificant since we
   // insert them into a std::map on the read path.
   bool first_added = false;
-  while (stripe_map_iter != stripe_map_.end()) {
+  while (stripe_map_iter != rep_->stripe_map_.end()) {
     for (const auto& start_key_and_tombstone : stripe_map_iter->second) {
       const auto& tombstone = start_key_and_tombstone.second;
       if (upper_bound != nullptr &&
@@ -204,8 +240,11 @@ void RangeDelAggregator::AddToBuilder(
 }
 
 bool RangeDelAggregator::IsEmpty() {
-  for (auto stripe_map_iter = stripe_map_.begin();
-       stripe_map_iter != stripe_map_.end(); ++stripe_map_iter) {
+  if (rep_ == nullptr) {
+    return true;
+  }
+  for (auto stripe_map_iter = rep_->stripe_map_.begin();
+       stripe_map_iter != rep_->stripe_map_.end(); ++stripe_map_iter) {
     if (!stripe_map_iter->second.empty()) {
       return false;
     }

--- a/db/range_del_aggregator.h
+++ b/db/range_del_aggregator.h
@@ -37,9 +37,17 @@ class RangeDelAggregator {
   // (get/iterator), only the user snapshot is provided such that the seqnum
   // space is divided into two stripes, where only tombstones in the older
   // stripe are considered by ShouldDelete().
+  // Note this overload does not lazily initialize Rep.
   RangeDelAggregator(const InternalKeyComparator& icmp,
                      const std::vector<SequenceNumber>& snapshots);
+
+  // @param upper_bound Similar to snapshots above, except with a single
+  // snapshot, which allows us to store the snapshot on the stack and defer
+  // initialization of heap-allocating members (in Rep) until the first range
+  // deletion is encountered.
+  RangeDelAggregator(const InternalKeyComparator& icmp,
+                     SequenceNumber upper_bound);
+
   // Returns whether the key should be deleted, which is the case when it is
   // covered by a range tombstone residing in the same snapshot stripe.
   bool ShouldDelete(const ParsedInternalKey& parsed);
@@ -86,13 +94,22 @@ class RangeDelAggregator {
   // their seqnums are greater than the next smaller snapshot's seqnum.
   typedef std::map<SequenceNumber, TombstoneMap> StripeMap;
+  struct Rep {
+    StripeMap stripe_map_;
+    PinnedIteratorsManager pinned_iters_mgr_;
+  };
+  // Initializes rep_ lazily. This aggregator object is constructed for every
+  // read, so expensive members should only be created when necessary, i.e.,
+  // once the first range deletion is encountered.
+  void InitRep(const std::vector<SequenceNumber>& snapshots);
+
   Status AddTombstones(InternalIterator* input, bool arena);
   TombstoneMap& GetTombstoneMap(SequenceNumber seq);
 
-  StripeMap stripe_map_;
+  SequenceNumber upper_bound_;
+  const InternalKeyComparator icmp_;
   Arena arena_;  // must be destroyed after pinned_iters_mgr_ which references
                  // memory in this arena
-  PinnedIteratorsManager pinned_iters_mgr_;
-  const InternalKeyComparator icmp_;
+  std::unique_ptr<Rep> rep_;
 };
 
 }  // namespace rocksdb

--- a/db/table_cache.cc
+++ b/db/table_cache.cc
@@ -245,7 +245,7 @@ InternalIterator* TableCache::NewIterator(
     if (handle != nullptr) {
       ReleaseHandle(handle);
     }
-    return NewErrorInternalIterator(s);
+    return NewErrorInternalIterator(s, arena);
   }
 
 InternalIterator* TableCache::NewRangeDeletionIterator(
