Revert "Replace std::priority_queue in MergingIterator with custom heap"

Summary:
This patch reverts "Replace std::priority_queue in MergingIterator
with custom heap" (commit commit b6655a679d)
as it causes db_stress failure.

Test Plan: ./db_stress --test_batches_snapshots=1 --threads=32 --write_buffer_size=4194304 --destroy_db_initially=0 --reopen=20 --readpercent=45 --prefixpercent=5 --writepercent=35 --delpercent=5 --iterpercent=10 --db=/tmp/rocksdb_crashtest_KdCI5F --max_key=100000000 --mmap_read=0 --block_size=16384 --cache_size=1048576 --open_files=500000 --verify_checksum=1 --sync=0 --progress_reports=0 --disable_wal=0 --disable_data_sync=1 --target_file_size_base=2097152 --target_file_size_multiplier=2 --max_write_buffer_number=3 --max_background_compactions=20 --max_bytes_for_level_base=10485760 --filter_deletes=0 --memtablerep=prefix_hash --prefix_size=7 --ops_per_thread=200 --kill_random_test=97

Reviewers: igor, anthony, lovro, sdong

Reviewed By: sdong

Subscribers: dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D41343
main
Yueh-Hsuan Chiang 10 years ago
parent c0b23dd5b0
commit b7a2369fb2
  1. 16
      table/iter_heap.h
  2. 185
      table/merger.cc
  3. 140
      util/heap.h

@ -5,34 +5,36 @@
//
#pragma once
#include <queue>
#include "rocksdb/comparator.h"
#include "table/iterator_wrapper.h"
namespace rocksdb {
// When used with std::priority_queue, this comparison functor puts the
// iterator with the max/largest key on top.
// Return the max of two keys.
class MaxIteratorComparator {
public:
MaxIteratorComparator(const Comparator* comparator) :
comparator_(comparator) {}
bool operator()(IteratorWrapper* a, IteratorWrapper* b) const {
return comparator_->Compare(a->key(), b->key()) < 0;
bool operator()(IteratorWrapper* a, IteratorWrapper* b) {
return comparator_->Compare(a->key(), b->key()) <= 0;
}
private:
const Comparator* comparator_;
};
// When used with std::priority_queue, this comparison functor puts the
// iterator with the min/smallest key on top.
// Return the max of two keys.
class MinIteratorComparator {
public:
// if maxHeap is set comparator returns the max value.
// else returns the min Value.
// Can use to create a minHeap or a maxHeap.
MinIteratorComparator(const Comparator* comparator) :
comparator_(comparator) {}
bool operator()(IteratorWrapper* a, IteratorWrapper* b) const {
bool operator()(IteratorWrapper* a, IteratorWrapper* b) {
return comparator_->Compare(a->key(), b->key()) > 0;
}
private:

@ -9,6 +9,7 @@
#include "table/merger.h"
#include <queue>
#include <vector>
#include "rocksdb/comparator.h"
@ -17,7 +18,6 @@
#include "table/iter_heap.h"
#include "table/iterator_wrapper.h"
#include "util/arena.h"
#include "util/heap.h"
#include "util/stop_watch.h"
#include "util/perf_context_imp.h"
#include "util/autovector.h"
@ -25,8 +25,21 @@
namespace rocksdb {
// Without anonymous namespace here, we fail the warning -Wmissing-prototypes
namespace {
typedef BinaryHeap<IteratorWrapper*, MaxIteratorComparator> MergerMaxIterHeap;
typedef BinaryHeap<IteratorWrapper*, MinIteratorComparator> MergerMinIterHeap;
typedef std::priority_queue<IteratorWrapper*, std::vector<IteratorWrapper*>,
MaxIteratorComparator> MergerMaxIterHeap;
typedef std::priority_queue<IteratorWrapper*, std::vector<IteratorWrapper*>,
MinIteratorComparator> MergerMinIterHeap;
// Return's a new MaxHeap of IteratorWrapper's using the provided Comparator.
MergerMaxIterHeap NewMergerMaxIterHeap(const Comparator* comparator) {
return MergerMaxIterHeap(MaxIteratorComparator(comparator));
}
// Return's a new MinHeap of IteratorWrapper's using the provided Comparator.
MergerMinIterHeap NewMergerMinIterHeap(const Comparator* comparator) {
return MergerMinIterHeap(MinIteratorComparator(comparator));
}
} // namespace
const size_t kNumIterReserve = 4;
@ -38,8 +51,10 @@ class MergingIterator : public Iterator {
: is_arena_mode_(is_arena_mode),
comparator_(comparator),
current_(nullptr),
use_heap_(true),
direction_(kForward),
minHeap_(comparator_) {
maxHeap_(NewMergerMaxIterHeap(comparator_)),
minHeap_(NewMergerMinIterHeap(comparator_)) {
children_.resize(n);
for (int i = 0; i < n; i++) {
children_[i].Set(children[i]);
@ -49,7 +64,6 @@ class MergingIterator : public Iterator {
minHeap_.push(&child);
}
}
current_ = CurrentForward();
}
virtual void AddIterator(Iterator* iter) {
@ -58,7 +72,6 @@ class MergingIterator : public Iterator {
auto new_wrapper = children_.back();
if (new_wrapper.Valid()) {
minHeap_.push(&new_wrapper);
current_ = CurrentForward();
}
}
@ -78,25 +91,27 @@ class MergingIterator : public Iterator {
minHeap_.push(&child);
}
}
FindSmallest();
direction_ = kForward;
current_ = CurrentForward();
}
virtual void SeekToLast() override {
ClearHeaps();
InitMaxHeap();
for (auto& child : children_) {
child.SeekToLast();
if (child.Valid()) {
maxHeap_->push(&child);
maxHeap_.push(&child);
}
}
FindLargest();
direction_ = kReverse;
current_ = CurrentReverse();
}
virtual void Seek(const Slice& target) override {
ClearHeaps();
// Invalidate the heap.
use_heap_ = false;
IteratorWrapper* first_child = nullptr;
for (auto& child : children_) {
{
PERF_TIMER_GUARD(seek_child_seek_time);
@ -105,15 +120,36 @@ class MergingIterator : public Iterator {
PERF_COUNTER_ADD(seek_child_seek_count, 1);
if (child.Valid()) {
PERF_TIMER_GUARD(seek_min_heap_time);
minHeap_.push(&child);
// This child has valid key
if (!use_heap_) {
if (first_child == nullptr) {
// It's the first child has valid key. Only put it int
// current_. Now the values in the heap should be invalid.
first_child = &child;
} else {
// We have more than one children with valid keys. Initialize
// the heap and put the first child into the heap.
PERF_TIMER_GUARD(seek_min_heap_time);
ClearHeaps();
minHeap_.push(first_child);
}
}
if (use_heap_) {
PERF_TIMER_GUARD(seek_min_heap_time);
minHeap_.push(&child);
}
}
}
direction_ = kForward;
{
if (use_heap_) {
// If heap is valid, need to put the smallest key to curent_.
PERF_TIMER_GUARD(seek_min_heap_time);
current_ = CurrentForward();
FindSmallest();
} else {
// The heap is not valid, then the current_ iterator is the first
// one, or null if there is no first child.
current_ = first_child;
}
direction_ = kForward;
}
virtual void Next() override {
@ -121,11 +157,10 @@ class MergingIterator : public Iterator {
// Ensure that all children are positioned after key().
// If we are moving in the forward direction, it is already
// true for all of the non-current children since current_ is
// the smallest child and key() == current_->key().
// true for all of the non-current_ children since current_ is
// the smallest child and key() == current_->key(). Otherwise,
// we explicitly position the non-current_ children.
if (direction_ != kForward) {
// Otherwise, advance the non-current children. We advance current_
// just after the if-block.
ClearHeaps();
for (auto& child : children_) {
if (&child != current_) {
@ -134,42 +169,36 @@ class MergingIterator : public Iterator {
comparator_->Compare(key(), child.key()) == 0) {
child.Next();
}
}
if (child.Valid()) {
minHeap_.push(&child);
if (child.Valid()) {
minHeap_.push(&child);
}
}
}
direction_ = kForward;
// The loop advanced all non-current children to be > key() so current_
// should still be strictly the smallest key.
assert(current_ == CurrentForward());
}
// as the current points to the current record. move the iterator forward.
// and if it is valid add it to the heap.
current_->Next();
if (current_->Valid()) {
// current is still valid after the Next() call above. Call
// replace_top() to restore the heap property. When the same child
// iterator yields a sequence of keys, this is cheap.
minHeap_.replace_top(current_);
} else {
// current stopped being valid, remove it from the heap.
minHeap_.pop();
if (use_heap_) {
if (current_->Valid()) {
minHeap_.push(current_);
}
FindSmallest();
} else if (!current_->Valid()) {
current_ = nullptr;
}
current_ = CurrentForward();
}
virtual void Prev() override {
assert(Valid());
// Ensure that all children are positioned before key().
// If we are moving in the reverse direction, it is already
// true for all of the non-current children since current_ is
// the largest child and key() == current_->key().
// true for all of the non-current_ children since current_ is
// the largest child and key() == current_->key(). Otherwise,
// we explicitly position the non-current_ children.
if (direction_ != kReverse) {
// Otherwise, retreat the non-current children. We retreat current_
// just after the if-block.
ClearHeaps();
InitMaxHeap();
for (auto& child : children_) {
if (&child != current_) {
child.Seek(key());
@ -180,9 +209,9 @@ class MergingIterator : public Iterator {
// Child has no entries >= key(). Position at last entry.
child.SeekToLast();
}
}
if (child.Valid()) {
maxHeap_->push(&child);
if (child.Valid()) {
maxHeap_.push(&child);
}
}
}
direction_ = kReverse;
@ -196,15 +225,9 @@ class MergingIterator : public Iterator {
current_->Prev();
if (current_->Valid()) {
// current is still valid after the Prev() call above. Call
// replace_top() to restore the heap property. When the same child
// iterator yields a sequence of keys, this is cheap.
maxHeap_->replace_top(current_);
} else {
// current stopped being valid, remove it from the heap.
maxHeap_->pop();
maxHeap_.push(current_);
}
current_ = CurrentReverse();
FindLargest();
}
virtual Slice key() const override {
@ -229,56 +252,58 @@ class MergingIterator : public Iterator {
}
private:
// Clears heaps for both directions, used when changing direction or seeking
void FindSmallest();
void FindLargest();
void ClearHeaps();
// Ensures that maxHeap_ is initialized when starting to go in the reverse
// direction
void InitMaxHeap();
bool is_arena_mode_;
const Comparator* comparator_;
autovector<IteratorWrapper, kNumIterReserve> children_;
// Cached pointer to child iterator with the current key, or nullptr if no
// child iterators are valid. This is the top of minHeap_ or maxHeap_
// depending on the direction.
IteratorWrapper* current_;
// If the value is true, both of iterators in the heap and current_
// contain valid rows. If it is false, only current_ can possibly contain
// valid rows.
// This flag is always true for reverse direction, as we always use heap for
// the reverse iterating case.
bool use_heap_;
// Which direction is the iterator moving?
enum Direction {
kForward,
kReverse
};
Direction direction_;
MergerMaxIterHeap maxHeap_;
MergerMinIterHeap minHeap_;
// Max heap is used for reverse iteration, which is way less common than
// forward. Lazily initialize it to save memory.
std::unique_ptr<MergerMaxIterHeap> maxHeap_;
IteratorWrapper* CurrentForward() const {
assert(direction_ == kForward);
return !minHeap_.empty() ? minHeap_.top() : nullptr;
}
IteratorWrapper* CurrentReverse() const {
assert(direction_ == kReverse);
assert(maxHeap_);
return !maxHeap_->empty() ? maxHeap_->top() : nullptr;
}
};
void MergingIterator::ClearHeaps() {
minHeap_.clear();
if (maxHeap_) {
maxHeap_->clear();
void MergingIterator::FindSmallest() {
assert(use_heap_);
if (minHeap_.empty()) {
current_ = nullptr;
} else {
current_ = minHeap_.top();
assert(current_->Valid());
minHeap_.pop();
}
}
void MergingIterator::InitMaxHeap() {
if (!maxHeap_) {
maxHeap_.reset(new MergerMaxIterHeap(comparator_));
void MergingIterator::FindLargest() {
assert(use_heap_);
if (maxHeap_.empty()) {
current_ = nullptr;
} else {
current_ = maxHeap_.top();
assert(current_->Valid());
maxHeap_.pop();
}
}
void MergingIterator::ClearHeaps() {
use_heap_ = true;
maxHeap_ = NewMergerMaxIterHeap(comparator_);
minHeap_ = NewMergerMinIterHeap(comparator_);
}
Iterator* NewMergingIterator(const Comparator* cmp, Iterator** list, int n,
Arena* arena) {
assert(n >= 0);

@ -1,140 +0,0 @@
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#pragma once
#include <algorithm>
#include <cstdint>
#include <functional>
#include "util/autovector.h"
namespace rocksdb {
// Binary heap implementation optimized for use in multi-way merge sort.
// Comparison to std::priority_queue:
// - In libstdc++, std::priority_queue::pop() usually performs just over logN
// comparisons but never fewer.
// - std::priority_queue does not have a replace-top operation, requiring a
// pop+push. If the replacement element is the new top, this requires
// around 2logN comparisons.
// - This heap's pop() uses a "schoolbook" downheap which requires up to ~2logN
// comparisons.
// - This heap provides a replace_top() operation which requires [1, 2logN]
// comparisons. When the replacement element is also the new top, this
// takes just 1 or 2 comparisons.
//
// The last property can yield an order-of-magnitude performance improvement
// when merge-sorting real-world non-random data. If the merge operation is
// likely to take chunks of elements from the same input stream, only 1
// comparison per element is needed. In RocksDB-land, this happens when
// compacting a database where keys are not randomly distributed across L0
// files but nearby keys are likely to be in the same L0 file.
//
// The container uses the same counterintuitive ordering as
// std::priority_queue: the comparison operator is expected to provide the
// less-than relation, but top() will return the maximum.
template<typename T, typename Compare = std::less<T>>
class BinaryHeap {
public:
BinaryHeap() { }
explicit BinaryHeap(Compare cmp) : cmp_(std::move(cmp)) { }
void push(const T& value) {
data_.push_back(value);
upheap(data_.size() - 1);
}
void push(T&& value) {
data_.push_back(std::move(value));
upheap(data_.size() - 1);
}
const T& top() const {
assert(!empty());
return data_.front();
}
void replace_top(const T& value) {
assert(!empty());
data_.front() = value;
downheap(get_root());
}
void replace_top(T&& value) {
assert(!empty());
data_.front() = std::move(value);
downheap(get_root());
}
void pop() {
assert(!empty());
data_.front() = std::move(data_.back());
data_.pop_back();
if (!empty()) {
downheap(get_root());
}
}
void swap(BinaryHeap &other) {
std::swap(cmp_, other.cmp_);
data_.swap(other.data_);
}
void clear() {
data_.clear();
}
bool empty() const {
return data_.empty();
}
private:
static inline size_t get_root() { return 0; }
static inline size_t get_parent(size_t index) { return (index - 1) / 2; }
static inline size_t get_left(size_t index) { return 2 * index + 1; }
static inline size_t get_right(size_t index) { return 2 * index + 2; }
void upheap(size_t index) {
T v = std::move(data_[index]);
while (index > get_root()) {
const size_t parent = get_parent(index);
if (!cmp_(data_[parent], v)) {
break;
}
data_[index] = std::move(data_[parent]);
index = parent;
}
data_[index] = std::move(v);
}
void downheap(size_t index) {
T v = std::move(data_[index]);
while (1) {
const size_t left_child = get_left(index);
if (get_left(index) >= data_.size()) {
break;
}
const size_t right_child = left_child + 1;
assert(right_child == get_right(index));
size_t picked_child = left_child;
if (right_child < data_.size() &&
cmp_(data_[left_child], data_[right_child])) {
picked_child = right_child;
}
if (!cmp_(v, data_[picked_child])) {
break;
}
data_[index] = std::move(data_[picked_child]);
index = picked_child;
}
data_[index] = std::move(v);
}
Compare cmp_;
autovector<T> data_;
};
} // namespace rocksdb
Loading…
Cancel
Save