Replace std::priority_queue in MergingIterator with custom heap

Summary:
While profiling compaction in our service I noticed a lot of CPU (~15% of compaction) being spent in MergingIterator and key comparison.  Looking at the code I found MergingIterator was (understandably) using std::priority_queue for the multiway merge.

Keys in our dataset include sequence numbers that increase with time.  Adjacent keys in an L0 file are very likely to be adjacent in the full database.  Consequently, compaction will often pick a chunk of rows from the same L0 file before switching to another one.  It would be great to avoid paying the O(log K) heap operation (K being the number of child iterators being merged) for every row while compacting.

This diff replaces std::priority_queue with a custom binary heap implementation.  It has a "replace top" operation that is cheap when the new top is the same as the old one (i.e. the priority of the top entry is decreased but it still stays on top).

Test Plan:
make check

To test the effect on performance, I generated databases with data patterns that mimic what I describe in the summary (rows have a mostly increasing sequence number).  I see a 10-15% CPU decrease for compaction (and a matching throughput improvement on tmpfs).  The exact improvement depends on the number of L0 files and the amount of locality.  Performance on randomly distributed keys seems on par with the old code.

Reviewers: kailiu, sdong, igor

Reviewed By: igor

Subscribers: yoshinorim, dhruba, tnovak

Differential Revision: https://reviews.facebook.net/D29133
main
lovro 10 years ago
parent 35cd75c379
commit b6655a679d
  1. 16
      table/iter_heap.h
  2. 176
      table/merger.cc
  3. 140
      util/heap.h

@ -5,36 +5,34 @@
// //
#pragma once #pragma once
#include <queue>
#include "rocksdb/comparator.h" #include "rocksdb/comparator.h"
#include "table/iterator_wrapper.h" #include "table/iterator_wrapper.h"
namespace rocksdb { namespace rocksdb {
// Return the max of two keys. // When used with std::priority_queue, this comparison functor puts the
// iterator with the max/largest key on top.
class MaxIteratorComparator { class MaxIteratorComparator {
public: public:
MaxIteratorComparator(const Comparator* comparator) : MaxIteratorComparator(const Comparator* comparator) :
comparator_(comparator) {} comparator_(comparator) {}
bool operator()(IteratorWrapper* a, IteratorWrapper* b) { bool operator()(IteratorWrapper* a, IteratorWrapper* b) const {
return comparator_->Compare(a->key(), b->key()) <= 0; return comparator_->Compare(a->key(), b->key()) < 0;
} }
private: private:
const Comparator* comparator_; const Comparator* comparator_;
}; };
// Return the max of two keys. // When used with std::priority_queue, this comparison functor puts the
// iterator with the min/smallest key on top.
class MinIteratorComparator { class MinIteratorComparator {
public: public:
// if maxHeap is set comparator returns the max value.
// else returns the min Value.
// Can use to create a minHeap or a maxHeap.
MinIteratorComparator(const Comparator* comparator) : MinIteratorComparator(const Comparator* comparator) :
comparator_(comparator) {} comparator_(comparator) {}
bool operator()(IteratorWrapper* a, IteratorWrapper* b) { bool operator()(IteratorWrapper* a, IteratorWrapper* b) const {
return comparator_->Compare(a->key(), b->key()) > 0; return comparator_->Compare(a->key(), b->key()) > 0;
} }
private: private:

@ -10,7 +10,6 @@
#include "table/merger.h" #include "table/merger.h"
#include <vector> #include <vector>
#include <queue>
#include "rocksdb/comparator.h" #include "rocksdb/comparator.h"
#include "rocksdb/iterator.h" #include "rocksdb/iterator.h"
@ -18,6 +17,7 @@
#include "table/iter_heap.h" #include "table/iter_heap.h"
#include "table/iterator_wrapper.h" #include "table/iterator_wrapper.h"
#include "util/arena.h" #include "util/arena.h"
#include "util/heap.h"
#include "util/stop_watch.h" #include "util/stop_watch.h"
#include "util/perf_context_imp.h" #include "util/perf_context_imp.h"
#include "util/autovector.h" #include "util/autovector.h"
@ -25,21 +25,8 @@
namespace rocksdb { namespace rocksdb {
// Without anonymous namespace here, we fail the warning -Wmissing-prototypes // Without anonymous namespace here, we fail the warning -Wmissing-prototypes
namespace { namespace {
typedef std::priority_queue<IteratorWrapper*, std::vector<IteratorWrapper*>, typedef BinaryHeap<IteratorWrapper*, MaxIteratorComparator> MergerMaxIterHeap;
MaxIteratorComparator> MergerMaxIterHeap; typedef BinaryHeap<IteratorWrapper*, MinIteratorComparator> MergerMinIterHeap;
typedef std::priority_queue<IteratorWrapper*, std::vector<IteratorWrapper*>,
MinIteratorComparator> MergerMinIterHeap;
// Return's a new MaxHeap of IteratorWrapper's using the provided Comparator.
MergerMaxIterHeap NewMergerMaxIterHeap(const Comparator* comparator) {
return MergerMaxIterHeap(MaxIteratorComparator(comparator));
}
// Return's a new MinHeap of IteratorWrapper's using the provided Comparator.
MergerMinIterHeap NewMergerMinIterHeap(const Comparator* comparator) {
return MergerMinIterHeap(MinIteratorComparator(comparator));
}
} // namespace } // namespace
const size_t kNumIterReserve = 4; const size_t kNumIterReserve = 4;
@ -51,10 +38,8 @@ class MergingIterator : public Iterator {
: is_arena_mode_(is_arena_mode), : is_arena_mode_(is_arena_mode),
comparator_(comparator), comparator_(comparator),
current_(nullptr), current_(nullptr),
use_heap_(true),
direction_(kForward), direction_(kForward),
maxHeap_(NewMergerMaxIterHeap(comparator_)), minHeap_(comparator_) {
minHeap_(NewMergerMinIterHeap(comparator_)) {
children_.resize(n); children_.resize(n);
for (int i = 0; i < n; i++) { for (int i = 0; i < n; i++) {
children_[i].Set(children[i]); children_[i].Set(children[i]);
@ -64,6 +49,7 @@ class MergingIterator : public Iterator {
minHeap_.push(&child); minHeap_.push(&child);
} }
} }
current_ = CurrentForward();
} }
virtual void AddIterator(Iterator* iter) { virtual void AddIterator(Iterator* iter) {
@ -72,6 +58,7 @@ class MergingIterator : public Iterator {
auto new_wrapper = children_.back(); auto new_wrapper = children_.back();
if (new_wrapper.Valid()) { if (new_wrapper.Valid()) {
minHeap_.push(&new_wrapper); minHeap_.push(&new_wrapper);
current_ = CurrentForward();
} }
} }
@ -91,27 +78,25 @@ class MergingIterator : public Iterator {
minHeap_.push(&child); minHeap_.push(&child);
} }
} }
FindSmallest();
direction_ = kForward; direction_ = kForward;
current_ = CurrentForward();
} }
virtual void SeekToLast() override { virtual void SeekToLast() override {
ClearHeaps(); ClearHeaps();
InitMaxHeap();
for (auto& child : children_) { for (auto& child : children_) {
child.SeekToLast(); child.SeekToLast();
if (child.Valid()) { if (child.Valid()) {
maxHeap_.push(&child); maxHeap_->push(&child);
} }
} }
FindLargest();
direction_ = kReverse; direction_ = kReverse;
current_ = CurrentReverse();
} }
virtual void Seek(const Slice& target) override { virtual void Seek(const Slice& target) override {
// Invalidate the heap. ClearHeaps();
use_heap_ = false;
IteratorWrapper* first_child = nullptr;
for (auto& child : children_) { for (auto& child : children_) {
{ {
PERF_TIMER_GUARD(seek_child_seek_time); PERF_TIMER_GUARD(seek_child_seek_time);
@ -120,36 +105,15 @@ class MergingIterator : public Iterator {
PERF_COUNTER_ADD(seek_child_seek_count, 1); PERF_COUNTER_ADD(seek_child_seek_count, 1);
if (child.Valid()) { if (child.Valid()) {
// This child has valid key
if (!use_heap_) {
if (first_child == nullptr) {
// It's the first child has valid key. Only put it int
// current_. Now the values in the heap should be invalid.
first_child = &child;
} else {
// We have more than one children with valid keys. Initialize
// the heap and put the first child into the heap.
PERF_TIMER_GUARD(seek_min_heap_time);
ClearHeaps();
minHeap_.push(first_child);
}
}
if (use_heap_) {
PERF_TIMER_GUARD(seek_min_heap_time); PERF_TIMER_GUARD(seek_min_heap_time);
minHeap_.push(&child); minHeap_.push(&child);
} }
} }
} direction_ = kForward;
if (use_heap_) { {
// If heap is valid, need to put the smallest key to curent_.
PERF_TIMER_GUARD(seek_min_heap_time); PERF_TIMER_GUARD(seek_min_heap_time);
FindSmallest(); current_ = CurrentForward();
} else {
// The heap is not valid, then the current_ iterator is the first
// one, or null if there is no first child.
current_ = first_child;
} }
direction_ = kForward;
} }
virtual void Next() override { virtual void Next() override {
@ -157,10 +121,11 @@ class MergingIterator : public Iterator {
// Ensure that all children are positioned after key(). // Ensure that all children are positioned after key().
// If we are moving in the forward direction, it is already // If we are moving in the forward direction, it is already
// true for all of the non-current_ children since current_ is // true for all of the non-current children since current_ is
// the smallest child and key() == current_->key(). Otherwise, // the smallest child and key() == current_->key().
// we explicitly position the non-current_ children.
if (direction_ != kForward) { if (direction_ != kForward) {
// Otherwise, advance the non-current children. We advance current_
// just after the if-block.
ClearHeaps(); ClearHeaps();
for (auto& child : children_) { for (auto& child : children_) {
if (&child != current_) { if (&child != current_) {
@ -169,36 +134,42 @@ class MergingIterator : public Iterator {
comparator_->Compare(key(), child.key()) == 0) { comparator_->Compare(key(), child.key()) == 0) {
child.Next(); child.Next();
} }
}
if (child.Valid()) { if (child.Valid()) {
minHeap_.push(&child); minHeap_.push(&child);
} }
} }
}
direction_ = kForward; direction_ = kForward;
// The loop advanced all non-current children to be > key() so current_
// should still be strictly the smallest key.
assert(current_ == CurrentForward());
} }
// as the current points to the current record. move the iterator forward. // as the current points to the current record. move the iterator forward.
// and if it is valid add it to the heap.
current_->Next(); current_->Next();
if (use_heap_) {
if (current_->Valid()) { if (current_->Valid()) {
minHeap_.push(current_); // current is still valid after the Next() call above. Call
} // replace_top() to restore the heap property. When the same child
FindSmallest(); // iterator yields a sequence of keys, this is cheap.
} else if (!current_->Valid()) { minHeap_.replace_top(current_);
current_ = nullptr; } else {
// current stopped being valid, remove it from the heap.
minHeap_.pop();
} }
current_ = CurrentForward();
} }
virtual void Prev() override { virtual void Prev() override {
assert(Valid()); assert(Valid());
// Ensure that all children are positioned before key(). // Ensure that all children are positioned before key().
// If we are moving in the reverse direction, it is already // If we are moving in the reverse direction, it is already
// true for all of the non-current_ children since current_ is // true for all of the non-current children since current_ is
// the largest child and key() == current_->key(). Otherwise, // the largest child and key() == current_->key().
// we explicitly position the non-current_ children.
if (direction_ != kReverse) { if (direction_ != kReverse) {
// Otherwise, retreat the non-current children. We retreat current_
// just after the if-block.
ClearHeaps(); ClearHeaps();
InitMaxHeap();
for (auto& child : children_) { for (auto& child : children_) {
if (&child != current_) { if (&child != current_) {
child.Seek(key()); child.Seek(key());
@ -222,19 +193,28 @@ class MergingIterator : public Iterator {
continue; continue;
} }
} }
if (child.Valid()) {
maxHeap_.push(&child);
} }
if (child.Valid()) {
maxHeap_->push(&child);
} }
} }
direction_ = kReverse; direction_ = kReverse;
// The loop retreated all non-current children to be < key() so current_
// should still be strictly the largest key.
assert(current_ == CurrentReverse());
} }
current_->Prev(); current_->Prev();
if (current_->Valid()) { if (current_->Valid()) {
maxHeap_.push(current_); // current is still valid after the Prev() call above. Call
// replace_top() to restore the heap property. When the same child
// iterator yields a sequence of keys, this is cheap.
maxHeap_->replace_top(current_);
} else {
// current stopped being valid, remove it from the heap.
maxHeap_->pop();
} }
FindLargest(); current_ = CurrentReverse();
} }
virtual Slice key() const override { virtual Slice key() const override {
@ -259,56 +239,54 @@ class MergingIterator : public Iterator {
} }
private: private:
void FindSmallest(); // Clears heaps for both directions, used when changing direction or seeking
void FindLargest();
void ClearHeaps(); void ClearHeaps();
// Ensures that maxHeap_ is initialized when starting to go in the reverse
// direction
void InitMaxHeap();
bool is_arena_mode_; bool is_arena_mode_;
const Comparator* comparator_; const Comparator* comparator_;
autovector<IteratorWrapper, kNumIterReserve> children_; autovector<IteratorWrapper, kNumIterReserve> children_;
// Cached pointer to child iterator with the current key, or nullptr if no
// child iterators are valid. This is the top of minHeap_ or maxHeap_
// depending on the direction.
IteratorWrapper* current_; IteratorWrapper* current_;
// If the value is true, both of iterators in the heap and current_
// contain valid rows. If it is false, only current_ can possibly contain
// valid rows.
// This flag is always true for reverse direction, as we always use heap for
// the reverse iterating case.
bool use_heap_;
// Which direction is the iterator moving? // Which direction is the iterator moving?
enum Direction { enum Direction {
kForward, kForward,
kReverse kReverse
}; };
Direction direction_; Direction direction_;
MergerMaxIterHeap maxHeap_;
MergerMinIterHeap minHeap_; MergerMinIterHeap minHeap_;
}; // Max heap is used for reverse iteration, which is way less common than
// forward. Lazily initialize it to save memory.
std::unique_ptr<MergerMaxIterHeap> maxHeap_;
void MergingIterator::FindSmallest() { IteratorWrapper* CurrentForward() const {
assert(use_heap_); assert(direction_ == kForward);
if (minHeap_.empty()) { return !minHeap_.empty() ? minHeap_.top() : nullptr;
current_ = nullptr;
} else {
current_ = minHeap_.top();
assert(current_->Valid());
minHeap_.pop();
} }
}
void MergingIterator::FindLargest() { IteratorWrapper* CurrentReverse() const {
assert(use_heap_); assert(direction_ == kReverse);
if (maxHeap_.empty()) { assert(maxHeap_);
current_ = nullptr; return !maxHeap_->empty() ? maxHeap_->top() : nullptr;
} else {
current_ = maxHeap_.top();
assert(current_->Valid());
maxHeap_.pop();
} }
} };
void MergingIterator::ClearHeaps() { void MergingIterator::ClearHeaps() {
use_heap_ = true; minHeap_.clear();
maxHeap_ = NewMergerMaxIterHeap(comparator_); if (maxHeap_) {
minHeap_ = NewMergerMinIterHeap(comparator_); maxHeap_->clear();
}
}
void MergingIterator::InitMaxHeap() {
if (!maxHeap_) {
maxHeap_.reset(new MergerMaxIterHeap(comparator_));
}
} }
Iterator* NewMergingIterator(const Comparator* cmp, Iterator** list, int n, Iterator* NewMergingIterator(const Comparator* cmp, Iterator** list, int n,

@ -0,0 +1,140 @@
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#pragma once
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <utility>
#include "util/autovector.h"
namespace rocksdb {
// Binary heap implementation optimized for use in multi-way merge sort.
// Comparison to std::priority_queue:
// - In libstdc++, std::priority_queue::pop() usually performs just over logN
// comparisons but never fewer.
// - std::priority_queue does not have a replace-top operation, requiring a
// pop+push. If the replacement element is the new top, this requires
// around 2logN comparisons.
// - This heap's pop() uses a "schoolbook" downheap which requires up to ~2logN
// comparisons.
// - This heap provides a replace_top() operation which requires [1, 2logN]
// comparisons. When the replacement element is also the new top, this
// takes just 1 or 2 comparisons.
//
// The last property can yield an order-of-magnitude performance improvement
// when merge-sorting real-world non-random data. If the merge operation is
// likely to take chunks of elements from the same input stream, only 1
// comparison per element is needed. In RocksDB-land, this happens when
// compacting a database where keys are not randomly distributed across L0
// files but nearby keys are likely to be in the same L0 file.
//
// The container uses the same counterintuitive ordering as
// std::priority_queue: the comparison operator is expected to provide the
// less-than relation, but top() will return the maximum.
template<typename T, typename Compare = std::less<T>>
class BinaryHeap {
 public:
  // Creates an empty heap that orders elements with a default-constructed
  // Compare.
  BinaryHeap() { }

  // Creates an empty heap that orders elements with cmp.
  explicit BinaryHeap(Compare cmp) : cmp_(std::move(cmp)) { }

  // Appends a copy of value and sifts it up until its parent no longer
  // compares less than it.
  void push(const T& value) {
    data_.push_back(value);
    upheap(data_.size() - 1);
  }

  // Same as above, but moves value into the heap.
  void push(T&& value) {
    data_.push_back(std::move(value));
    upheap(data_.size() - 1);
  }

  // Returns the element stored at the root. The heap must not be empty.
  const T& top() const {
    assert(!empty());
    return data_.front();
  }

  // Overwrites the root with value and sifts it down. The heap must not be
  // empty. If value still belongs at the root, the sift-down stops after
  // examining only the root's children.
  void replace_top(const T& value) {
    assert(!empty());
    data_.front() = value;
    downheap(get_root());
  }

  // Same as above, but moves value into the root slot.
  void replace_top(T&& value) {
    assert(!empty());
    data_.front() = std::move(value);
    downheap(get_root());
  }

  // Removes the root. The heap must not be empty. The last element is moved
  // into the root slot and sifted down.
  void pop() {
    assert(!empty());
    if (data_.size() > 1) {
      // With a single element front() and back() are the same object; skip
      // the move so that we never self-move-assign. Self-move leaves many
      // types in an unspecified state and is rejected by checked standard
      // library builds.
      data_.front() = std::move(data_.back());
    }
    data_.pop_back();
    if (!empty()) {
      downheap(get_root());
    }
  }

  // Exchanges both the comparator and the stored elements with other.
  void swap(BinaryHeap &other) {
    std::swap(cmp_, other.cmp_);
    data_.swap(other.data_);
  }

  // Removes all elements. The comparator is left untouched.
  void clear() {
    data_.clear();
  }

  // Returns true if the heap holds no elements.
  bool empty() const {
    return data_.empty();
  }

 private:
  // Index arithmetic for the implicit binary tree stored in data_.
  // get_parent() must not be called with the root index: the subtraction
  // would wrap around.
  static inline size_t get_root() { return 0; }
  static inline size_t get_parent(size_t index) { return (index - 1) / 2; }
  static inline size_t get_left(size_t index) { return 2 * index + 1; }
  static inline size_t get_right(size_t index) { return 2 * index + 2; }

  // Moves the element at index towards the root while its parent compares
  // less than it. The element is held in a local and parents are shifted
  // down into the hole, so each level costs one move rather than a swap.
  void upheap(size_t index) {
    T v = std::move(data_[index]);
    while (index > get_root()) {
      const size_t parent = get_parent(index);
      if (!cmp_(data_[parent], v)) {
        break;
      }
      data_[index] = std::move(data_[parent]);
      index = parent;
    }
    data_[index] = std::move(v);
  }

  // Moves the element at index towards the leaves while it compares less
  // than the larger of its children. Uses the same hole technique as
  // upheap().
  void downheap(size_t index) {
    T v = std::move(data_[index]);
    while (true) {
      const size_t left_child = get_left(index);
      if (left_child >= data_.size()) {
        // No children: the hole is at a leaf.
        break;
      }
      const size_t right_child = left_child + 1;
      assert(right_child == get_right(index));
      // Pick the larger child (per cmp_) so the heap property holds for the
      // sibling after the picked child is shifted up.
      size_t picked_child = left_child;
      if (right_child < data_.size() &&
          cmp_(data_[left_child], data_[right_child])) {
        picked_child = right_child;
      }
      if (!cmp_(v, data_[picked_child])) {
        break;
      }
      data_[index] = std::move(data_[picked_child]);
      index = picked_child;
    }
    data_[index] = std::move(v);
  }

  Compare cmp_;
  autovector<T> data_;
};
} // namespace rocksdb
Loading…
Cancel
Save