From e1c99e10c133ce0135b8dd0fdc04d2746e7c1f42 Mon Sep 17 00:00:00 2001 From: lovro Date: Mon, 6 Jul 2015 04:24:09 -0700 Subject: [PATCH] Replace std::priority_queue in MergingIterator with custom heap, take 2 Summary: Repeat of b6655a679d11f42ce9a4915f54d7995f85b7556a (reverted in b7a2369fb2ac8bb762553d8492c401fb80826498) with a proper fix for the issue that 57d216ea6518c7f34eaea6538690bc52e6c605d1 was trying to fix. Test Plan: make check for i in $(seq 100); do ./db_stress --test_batches_snapshots=1 --threads=32 --write_buffer_size=4194304 --destroy_db_initially=0 --reopen=20 --readpercent=45 --prefixpercent=5 --writepercent=35 --delpercent=5 --iterpercent=10 --db=/tmp/rocksdb_crashtest_KdCI5F --max_key=100000000 --mmap_read=0 --block_size=16384 --cache_size=1048576 --open_files=500000 --verify_checksum=1 --sync=0 --progress_reports=0 --disable_wal=0 --disable_data_sync=1 --target_file_size_base=2097152 --target_file_size_multiplier=2 --max_write_buffer_number=3 --max_background_compactions=20 --max_bytes_for_level_base=10485760 --filter_deletes=0 --memtablerep=prefix_hash --prefix_size=7 --ops_per_thread=200 || break; done Reviewers: anthony, sdong, igor, yhchiang Reviewed By: igor, yhchiang Subscribers: dhruba Differential Revision: https://reviews.facebook.net/D41391 --- Makefile | 6 +- table/iter_heap.h | 16 ++-- table/merger.cc | 196 +++++++++++++++++++++------------------------- util/heap.h | 140 +++++++++++++++++++++++++++++++++ util/heap_test.cc | 138 ++++++++++++++++++++++++++++++++ 5 files changed, 379 insertions(+), 117 deletions(-) create mode 100644 util/heap.h create mode 100644 util/heap_test.cc diff --git a/Makefile b/Makefile index d27d0bbb0..d533bd2f2 100644 --- a/Makefile +++ b/Makefile @@ -294,7 +294,8 @@ TESTS = \ perf_context_test \ optimistic_transaction_test \ write_callback_test \ - compaction_job_stats_test + compaction_job_stats_test \ + heap_test SUBSET := $(shell echo $(TESTS) |sed s/^.*$(ROCKSDBTESTS_START)/$(ROCKSDBTESTS_START)/) @@ 
-873,6 +874,9 @@ memtable_list_test: db/memtable_list_test.o $(LIBOBJECTS) $(TESTHARNESS) write_callback_test: db/write_callback_test.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) +heap_test: util/heap_test.o $(GTEST) + $(AM_LINK) + sst_dump: tools/sst_dump.o $(LIBOBJECTS) $(AM_LINK) diff --git a/table/iter_heap.h b/table/iter_heap.h index 9569d3638..5343175c3 100644 --- a/table/iter_heap.h +++ b/table/iter_heap.h @@ -5,36 +5,34 @@ // #pragma once -#include #include "rocksdb/comparator.h" #include "table/iterator_wrapper.h" namespace rocksdb { -// Return the max of two keys. +// When used with std::priority_queue, this comparison functor puts the +// iterator with the max/largest key on top. class MaxIteratorComparator { public: MaxIteratorComparator(const Comparator* comparator) : comparator_(comparator) {} - bool operator()(IteratorWrapper* a, IteratorWrapper* b) { - return comparator_->Compare(a->key(), b->key()) <= 0; + bool operator()(IteratorWrapper* a, IteratorWrapper* b) const { + return comparator_->Compare(a->key(), b->key()) < 0; } private: const Comparator* comparator_; }; -// Return the max of two keys. +// When used with std::priority_queue, this comparison functor puts the +// iterator with the min/smallest key on top. class MinIteratorComparator { public: - // if maxHeap is set comparator returns the max value. - // else returns the min Value. - // Can use to create a minHeap or a maxHeap. 
MinIteratorComparator(const Comparator* comparator) : comparator_(comparator) {} - bool operator()(IteratorWrapper* a, IteratorWrapper* b) { + bool operator()(IteratorWrapper* a, IteratorWrapper* b) const { return comparator_->Compare(a->key(), b->key()) > 0; } private: diff --git a/table/merger.cc b/table/merger.cc index 32220571c..30931caaa 100644 --- a/table/merger.cc +++ b/table/merger.cc @@ -9,7 +9,6 @@ #include "table/merger.h" -#include #include #include "rocksdb/comparator.h" @@ -18,6 +17,7 @@ #include "table/iter_heap.h" #include "table/iterator_wrapper.h" #include "util/arena.h" +#include "util/heap.h" #include "util/stop_watch.h" #include "util/perf_context_imp.h" #include "util/autovector.h" @@ -25,21 +25,8 @@ namespace rocksdb { // Without anonymous namespace here, we fail the warning -Wmissing-prototypes namespace { -typedef std::priority_queue, - MaxIteratorComparator> MergerMaxIterHeap; - -typedef std::priority_queue, - MinIteratorComparator> MergerMinIterHeap; - -// Return's a new MaxHeap of IteratorWrapper's using the provided Comparator. -MergerMaxIterHeap NewMergerMaxIterHeap(const Comparator* comparator) { - return MergerMaxIterHeap(MaxIteratorComparator(comparator)); -} - -// Return's a new MinHeap of IteratorWrapper's using the provided Comparator. 
-MergerMinIterHeap NewMergerMinIterHeap(const Comparator* comparator) { - return MergerMinIterHeap(MinIteratorComparator(comparator)); -} +typedef BinaryHeap MergerMaxIterHeap; +typedef BinaryHeap MergerMinIterHeap; } // namespace const size_t kNumIterReserve = 4; @@ -51,10 +38,8 @@ class MergingIterator : public Iterator { : is_arena_mode_(is_arena_mode), comparator_(comparator), current_(nullptr), - use_heap_(true), direction_(kForward), - maxHeap_(NewMergerMaxIterHeap(comparator_)), - minHeap_(NewMergerMinIterHeap(comparator_)) { + minHeap_(comparator_) { children_.resize(n); for (int i = 0; i < n; i++) { children_[i].Set(children[i]); @@ -64,6 +49,7 @@ class MergingIterator : public Iterator { minHeap_.push(&child); } } + current_ = CurrentForward(); } virtual void AddIterator(Iterator* iter) { @@ -72,6 +58,7 @@ class MergingIterator : public Iterator { auto new_wrapper = children_.back(); if (new_wrapper.Valid()) { minHeap_.push(&new_wrapper); + current_ = CurrentForward(); } } @@ -91,27 +78,25 @@ class MergingIterator : public Iterator { minHeap_.push(&child); } } - FindSmallest(); direction_ = kForward; + current_ = CurrentForward(); } virtual void SeekToLast() override { ClearHeaps(); + InitMaxHeap(); for (auto& child : children_) { child.SeekToLast(); if (child.Valid()) { - maxHeap_.push(&child); + maxHeap_->push(&child); } } - FindLargest(); direction_ = kReverse; + current_ = CurrentReverse(); } virtual void Seek(const Slice& target) override { - // Invalidate the heap. - use_heap_ = false; - IteratorWrapper* first_child = nullptr; - + ClearHeaps(); for (auto& child : children_) { { PERF_TIMER_GUARD(seek_child_seek_time); @@ -120,36 +105,15 @@ class MergingIterator : public Iterator { PERF_COUNTER_ADD(seek_child_seek_count, 1); if (child.Valid()) { - // This child has valid key - if (!use_heap_) { - if (first_child == nullptr) { - // It's the first child has valid key. Only put it int - // current_. Now the values in the heap should be invalid. 
- first_child = &child; - } else { - // We have more than one children with valid keys. Initialize - // the heap and put the first child into the heap. - PERF_TIMER_GUARD(seek_min_heap_time); - ClearHeaps(); - minHeap_.push(first_child); - } - } - if (use_heap_) { - PERF_TIMER_GUARD(seek_min_heap_time); - minHeap_.push(&child); - } + PERF_TIMER_GUARD(seek_min_heap_time); + minHeap_.push(&child); } } - if (use_heap_) { - // If heap is valid, need to put the smallest key to curent_. + direction_ = kForward; + { PERF_TIMER_GUARD(seek_min_heap_time); - FindSmallest(); - } else { - // The heap is not valid, then the current_ iterator is the first - // one, or null if there is no first child. - current_ = first_child; + current_ = CurrentForward(); } - direction_ = kForward; } virtual void Next() override { @@ -157,10 +121,11 @@ class MergingIterator : public Iterator { // Ensure that all children are positioned after key(). // If we are moving in the forward direction, it is already - // true for all of the non-current_ children since current_ is - // the smallest child and key() == current_->key(). Otherwise, - // we explicitly position the non-current_ children. + // true for all of the non-current children since current_ is + // the smallest child and key() == current_->key(). if (direction_ != kForward) { + // Otherwise, advance the non-current children. We advance current_ + // just after the if-block. ClearHeaps(); for (auto& child : children_) { if (&child != current_) { @@ -169,36 +134,46 @@ class MergingIterator : public Iterator { comparator_->Compare(key(), child.key()) == 0) { child.Next(); } - if (child.Valid()) { - minHeap_.push(&child); - } + } + if (child.Valid()) { + minHeap_.push(&child); } } direction_ = kForward; + // The loop advanced all non-current children to be > key() so current_ + // should still be strictly the smallest key. 
+ assert(current_ == CurrentForward()); } + // For the heap modifications below to be correct, current_ must be the + // current top of the heap. + assert(current_ == CurrentForward()); + // as the current points to the current record. move the iterator forward. - // and if it is valid add it to the heap. current_->Next(); - if (use_heap_) { - if (current_->Valid()) { - minHeap_.push(current_); - } - FindSmallest(); - } else if (!current_->Valid()) { - current_ = nullptr; + if (current_->Valid()) { + // current is still valid after the Next() call above. Call + // replace_top() to restore the heap property. When the same child + // iterator yields a sequence of keys, this is cheap. + minHeap_.replace_top(current_); + } else { + // current stopped being valid, remove it from the heap. + minHeap_.pop(); } + current_ = CurrentForward(); } virtual void Prev() override { assert(Valid()); // Ensure that all children are positioned before key(). // If we are moving in the reverse direction, it is already - // true for all of the non-current_ children since current_ is - // the largest child and key() == current_->key(). Otherwise, - // we explicitly position the non-current_ children. + // true for all of the non-current children since current_ is + // the largest child and key() == current_->key(). if (direction_ != kReverse) { + // Otherwise, retreat the non-current children. We retreat current_ + // just after the if-block. ClearHeaps(); + InitMaxHeap(); for (auto& child : children_) { if (&child != current_) { child.Seek(key()); @@ -209,9 +184,9 @@ class MergingIterator : public Iterator { // Child has no entries >= key(). Position at last entry. 
child.SeekToLast(); } - if (child.Valid()) { - maxHeap_.push(&child); - } + } + if (child.Valid()) { + maxHeap_->push(&child); } } direction_ = kReverse; @@ -219,15 +194,24 @@ class MergingIterator : public Iterator { // because it is possible to have some keys larger than the seek-key // inserted between Seek() and SeekToLast(), which makes current_ not // equal to CurrentReverse(). - // - // assert(current_ == CurrentReverse()); + current_ = CurrentReverse(); } + // For the heap modifications below to be correct, current_ must be the + // current top of the heap. + assert(current_ == CurrentReverse()); + current_->Prev(); if (current_->Valid()) { - maxHeap_.push(current_); + // current is still valid after the Prev() call above. Call + // replace_top() to restore the heap property. When the same child + // iterator yields a sequence of keys, this is cheap. + maxHeap_->replace_top(current_); + } else { + // current stopped being valid, remove it from the heap. + maxHeap_->pop(); } - FindLargest(); + current_ = CurrentReverse(); } virtual Slice key() const override { @@ -252,56 +236,54 @@ class MergingIterator : public Iterator { } private: - void FindSmallest(); - void FindLargest(); + // Clears heaps for both directions, used when changing direction or seeking void ClearHeaps(); + // Ensures that maxHeap_ is initialized when starting to go in the reverse + // direction + void InitMaxHeap(); bool is_arena_mode_; const Comparator* comparator_; autovector children_; + + // Cached pointer to child iterator with the current key, or nullptr if no + // child iterators are valid. This is the top of minHeap_ or maxHeap_ + // depending on the direction. IteratorWrapper* current_; - // If the value is true, both of iterators in the heap and current_ - // contain valid rows. If it is false, only current_ can possibly contain - // valid rows. - // This flag is always true for reverse direction, as we always use heap for - // the reverse iterating case. 
- bool use_heap_; // Which direction is the iterator moving? enum Direction { kForward, kReverse }; Direction direction_; - MergerMaxIterHeap maxHeap_; MergerMinIterHeap minHeap_; -}; + // Max heap is used for reverse iteration, which is way less common than + // forward. Lazily initialize it to save memory. + std::unique_ptr maxHeap_; -void MergingIterator::FindSmallest() { - assert(use_heap_); - if (minHeap_.empty()) { - current_ = nullptr; - } else { - current_ = minHeap_.top(); - assert(current_->Valid()); - minHeap_.pop(); + IteratorWrapper* CurrentForward() const { + assert(direction_ == kForward); + return !minHeap_.empty() ? minHeap_.top() : nullptr; } -} -void MergingIterator::FindLargest() { - assert(use_heap_); - if (maxHeap_.empty()) { - current_ = nullptr; - } else { - current_ = maxHeap_.top(); - assert(current_->Valid()); - maxHeap_.pop(); + IteratorWrapper* CurrentReverse() const { + assert(direction_ == kReverse); + assert(maxHeap_); + return !maxHeap_->empty() ? maxHeap_->top() : nullptr; } -} +}; void MergingIterator::ClearHeaps() { - use_heap_ = true; - maxHeap_ = NewMergerMaxIterHeap(comparator_); - minHeap_ = NewMergerMinIterHeap(comparator_); + minHeap_.clear(); + if (maxHeap_) { + maxHeap_->clear(); + } +} + +void MergingIterator::InitMaxHeap() { + if (!maxHeap_) { + maxHeap_.reset(new MergerMaxIterHeap(comparator_)); + } } Iterator* NewMergingIterator(const Comparator* cmp, Iterator** list, int n, diff --git a/util/heap.h b/util/heap.h new file mode 100644 index 000000000..7d9e11113 --- /dev/null +++ b/util/heap.h @@ -0,0 +1,140 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. 
+ +#pragma once + +#include +#include +#include +#include "util/autovector.h" + +namespace rocksdb { + +// Binary heap implementation optimized for use in multi-way merge sort. +// Comparison to std::priority_queue: +// - In libstdc++, std::priority_queue::pop() usually performs just over logN +// comparisons but never fewer. +// - std::priority_queue does not have a replace-top operation, requiring a +// pop+push. If the replacement element is the new top, this requires +// around 2logN comparisons. +// - This heap's pop() uses a "schoolbook" downheap which requires up to ~2logN +// comparisons. +// - This heap provides a replace_top() operation which requires [1, 2logN] +// comparisons. When the replacement element is also the new top, this +// takes just 1 or 2 comparisons. +// +// The last property can yield an order-of-magnitude performance improvement +// when merge-sorting real-world non-random data. If the merge operation is +// likely to take chunks of elements from the same input stream, only 1 +// comparison per element is needed. In RocksDB-land, this happens when +// compacting a database where keys are not randomly distributed across L0 +// files but nearby keys are likely to be in the same L0 file. +// +// The container uses the same counterintuitive ordering as +// std::priority_queue: the comparison operator is expected to provide the +// less-than relation, but top() will return the maximum. 
+ +template> +class BinaryHeap { + public: + BinaryHeap() { } + explicit BinaryHeap(Compare cmp) : cmp_(std::move(cmp)) { } + + void push(const T& value) { + data_.push_back(value); + upheap(data_.size() - 1); + } + + void push(T&& value) { + data_.push_back(std::move(value)); + upheap(data_.size() - 1); + } + + const T& top() const { + assert(!empty()); + return data_.front(); + } + + void replace_top(const T& value) { + assert(!empty()); + data_.front() = value; + downheap(get_root()); + } + + void replace_top(T&& value) { + assert(!empty()); + data_.front() = std::move(value); + downheap(get_root()); + } + + void pop() { + assert(!empty()); + data_.front() = std::move(data_.back()); + data_.pop_back(); + if (!empty()) { + downheap(get_root()); + } + } + + void swap(BinaryHeap &other) { + std::swap(cmp_, other.cmp_); + data_.swap(other.data_); + } + + void clear() { + data_.clear(); + } + + bool empty() const { + return data_.empty(); + } + + private: + static inline size_t get_root() { return 0; } + static inline size_t get_parent(size_t index) { return (index - 1) / 2; } + static inline size_t get_left(size_t index) { return 2 * index + 1; } + static inline size_t get_right(size_t index) { return 2 * index + 2; } + + void upheap(size_t index) { + T v = std::move(data_[index]); + while (index > get_root()) { + const size_t parent = get_parent(index); + if (!cmp_(data_[parent], v)) { + break; + } + data_[index] = std::move(data_[parent]); + index = parent; + } + data_[index] = std::move(v); + } + + void downheap(size_t index) { + T v = std::move(data_[index]); + while (1) { + const size_t left_child = get_left(index); + if (get_left(index) >= data_.size()) { + break; + } + const size_t right_child = left_child + 1; + assert(right_child == get_right(index)); + size_t picked_child = left_child; + if (right_child < data_.size() && + cmp_(data_[left_child], data_[right_child])) { + picked_child = right_child; + } + if (!cmp_(v, data_[picked_child])) { + break; + } + 
data_[index] = std::move(data_[picked_child]); + index = picked_child; + } + data_[index] = std::move(v); + } + + Compare cmp_; + autovector data_; +}; + +} // namespace rocksdb diff --git a/util/heap_test.cc b/util/heap_test.cc new file mode 100644 index 000000000..e810a7186 --- /dev/null +++ b/util/heap_test.cc @@ -0,0 +1,138 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#include + +#include + +#include +#include + +#include "util/heap.h" + +#ifndef GFLAGS +const int64_t FLAGS_iters = 100000; +#else +#include +DEFINE_int64(iters, 100000, "number of pseudo-random operations in each test"); +#endif // GFLAGS + +/* + * Compares the custom heap implementation in util/heap.h against + * std::priority_queue on a pseudo-random sequence of operations. + */ + +namespace rocksdb { + +using HeapTestValue = uint64_t; +using Params = std::tuple; + +class HeapTest : public ::testing::TestWithParam { +}; + +TEST_P(HeapTest, Test) { + // This test performs the same pseudorandom sequence of operations on a + // BinaryHeap and an std::priority_queue, comparing output. The three + // possible operations are insert, replace top and pop. + // + // Insert is chosen slightly more often than the others so that the size of + // the heap slowly grows. Once the size hits the MAX_HEAP_SIZE limit, we + // disallow inserting until the heap becomes empty, testing the "draining" + // scenario. 
+ + const auto MAX_HEAP_SIZE = std::get<0>(GetParam()); + const auto MAX_VALUE = std::get<1>(GetParam()); + const auto RNG_SEED = std::get<2>(GetParam()); + + BinaryHeap heap; + std::priority_queue ref; + + std::mt19937 rng(RNG_SEED); + std::uniform_int_distribution value_dist(0, MAX_VALUE); + int ndrains = 0; + bool draining = false; // hit max size, draining until we empty the heap + size_t size = 0; + for (int64_t i = 0; i < FLAGS_iters; ++i) { + if (size == 0) { + draining = false; + } + + if (!draining && + (size == 0 || std::bernoulli_distribution(0.4)(rng))) { + // insert + HeapTestValue val = value_dist(rng); + heap.push(val); + ref.push(val); + ++size; + if (size == MAX_HEAP_SIZE) { + draining = true; + ++ndrains; + } + } else if (std::bernoulli_distribution(0.5)(rng)) { + // replace top + HeapTestValue val = value_dist(rng); + heap.replace_top(val); + ref.pop(); + ref.push(val); + } else { + // pop + assert(size > 0); + heap.pop(); + ref.pop(); + --size; + } + + // After every operation, check that the public methods give the same + // results + assert((size == 0) == ref.empty()); + ASSERT_EQ(size == 0, heap.empty()); + if (size > 0) { + ASSERT_EQ(ref.top(), heap.top()); + } + } + + // Probabilities should be set up to occasionally hit the max heap size and + // drain it + assert(ndrains > 0); + + heap.clear(); + ASSERT_TRUE(heap.empty()); +} + +// Basic test, MAX_VALUE = 3*MAX_HEAP_SIZE (occasional duplicates) +INSTANTIATE_TEST_CASE_P( + Basic, HeapTest, + ::testing::Values(Params(1000, 3000, 0x1b575cf05b708945)) +); +// Mid-size heap with small values (many duplicates) +INSTANTIATE_TEST_CASE_P( + SmallValues, HeapTest, + ::testing::Values(Params(100, 10, 0x5ae213f7bd5dccd0)) +); +// Small heap, large value range (no duplicates) +INSTANTIATE_TEST_CASE_P( + SmallHeap, HeapTest, + ::testing::Values(Params(10, ULLONG_MAX, 0x3e1fa8f4d01707cf)) +); +// Two-element heap +INSTANTIATE_TEST_CASE_P( + TwoElementHeap, HeapTest, + ::testing::Values(Params(2, 5, 
0x4b5e13ea988c6abc)) +); +// One-element heap +INSTANTIATE_TEST_CASE_P( + OneElementHeap, HeapTest, + ::testing::Values(Params(1, 3, 0x176a1019ab0b612e)) +); + +} // namespace rocksdb + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); +#ifdef GFLAGS + GFLAGS::ParseCommandLineFlags(&argc, &argv, true); +#endif // GFLAGS + return RUN_ALL_TESTS(); +}