diff --git a/table/iter_heap.h b/table/iter_heap.h new file mode 100644 index 000000000..52518ba90 --- /dev/null +++ b/table/iter_heap.h @@ -0,0 +1,63 @@ +// Copyright 2008-present Facebook. All Rights Reserved. +#ifndef STORAGE_LEVELDB_ITER_HEAP_H_ +#define STORAGE_LEVELDB_ITER_HEAP_H_ + +#include + +#include "leveldb/comparator.h" +#include "table/iterator_wrapper.h" + +namespace leveldb { + +// Return the max of two keys. +class MaxIteratorComparator { + public: + MaxIteratorComparator(const Comparator* comparator) : + comparator_(comparator) {} + + bool operator()(IteratorWrapper* a, IteratorWrapper* b) { + return comparator_->Compare(a->key(), b->key()) <= 0; + } + private: + const Comparator* comparator_; +}; + +// Return the max of two keys. +class MinIteratorComparator { + public: + // if maxHeap is set comparator returns the max value. + // else returns the min Value. + // Can use to create a minHeap or a maxHeap. + MinIteratorComparator(const Comparator* comparator) : + comparator_(comparator) {} + + bool operator()(IteratorWrapper* a, IteratorWrapper* b) { + return comparator_->Compare(a->key(), b->key()) > 0; + } + private: + const Comparator* comparator_; +}; + +typedef std::priority_queue< + IteratorWrapper*, + std::vector, + MaxIteratorComparator> MaxIterHeap; + +typedef std::priority_queue< + IteratorWrapper*, + std::vector, + MinIteratorComparator> MinIterHeap; + +// Return's a new MaxHeap of IteratorWrapper's using the provided Comparator. +MaxIterHeap NewMaxIterHeap(const Comparator* comparator) { + return MaxIterHeap(MaxIteratorComparator(comparator)); +} + +// Return's a new MinHeap of IteratorWrapper's using the provided Comparator. +MinIterHeap NewMinIterHeap(const Comparator* comparator) { + return MinIterHeap(MinIteratorComparator(comparator)); +} + +} // namespace leveldb + +#endif // STORAGE_LEVELDB_ITER_HEAP_H_ diff --git a/table/merger.cc b/table/merger.cc index 2dde4dc21..5f7b24bd8 100644 --- a/table/merger.cc +++ b/table/merger.cc @@ -6,11 +6,13 @@ #include "leveldb/comparator.h" #include "leveldb/iterator.h" +#include "table/iter_heap.h" #include "table/iterator_wrapper.h" namespace leveldb { namespace { + class MergingIterator : public Iterator { public: MergingIterator(const Comparator* comparator, Iterator** children, int n) @@ -18,10 +20,17 @@ class MergingIterator : public Iterator { children_(new IteratorWrapper[n]), n_(n), current_(NULL), - direction_(kForward) { + direction_(kForward), + maxHeap_(NewMaxIterHeap(comparator_)), + minHeap_ (NewMinIterHeap(comparator_)) { for (int i = 0; i < n; i++) { children_[i].Set(children[i]); } + for (int i = 0; i < n; ++i) { + if (children_[i].Valid()) { + minHeap_.push(&children_[i]); + } + } } virtual ~MergingIterator() { @@ -33,24 +42,36 @@ class MergingIterator : public Iterator { } virtual void SeekToFirst() { + ClearHeaps(); for (int i = 0; i < n_; i++) { children_[i].SeekToFirst(); + if (children_[i].Valid()) { + minHeap_.push(&children_[i]); + } } FindSmallest(); direction_ = kForward; } virtual void SeekToLast() { + ClearHeaps(); for (int i = 0; i < n_; i++) { children_[i].SeekToLast(); + if (children_[i].Valid()) { + maxHeap_.push(&children_[i]); + } } FindLargest(); direction_ = kReverse; } virtual void Seek(const Slice& target) { + ClearHeaps(); for (int i = 0; i < n_; i++) { children_[i].Seek(target); + if (children_[i].Valid()) { + minHeap_.push(&children_[i]); + } } FindSmallest(); direction_ = kForward; @@ -65,6 +86,7 @@ class MergingIterator : public Iterator { // the smallest child and key() == current_->key(). Otherwise, // we explicitly position the non-current_ children. if (direction_ != kForward) { + ClearHeaps(); for (int i = 0; i < n_; i++) { IteratorWrapper* child = &children_[i]; if (child != current_) { @@ -73,24 +95,32 @@ class MergingIterator : public Iterator { comparator_->Compare(key(), child->key()) == 0) { child->Next(); } + if (child->Valid()) { + minHeap_.push(child); + } } } direction_ = kForward; } + // as the current points to the current record. move the iterator forward. + // and if it is valid add it to the heap. current_->Next(); + if (current_->Valid()){ + minHeap_.push(current_); + } FindSmallest(); } virtual void Prev() { assert(Valid()); - // Ensure that all children are positioned before key(). // If we are moving in the reverse direction, it is already // true for all of the non-current_ children since current_ is // the largest child and key() == current_->key(). Otherwise, // we explicitly position the non-current_ children. if (direction_ != kReverse) { + ClearHeaps(); for (int i = 0; i < n_; i++) { IteratorWrapper* child = &children_[i]; if (child != current_) { @@ -102,12 +132,18 @@ class MergingIterator : public Iterator { // Child has no entries >= key(). Position at last entry. child->SeekToLast(); } + if (child->Valid()) { + maxHeap_.push(child); + } } } direction_ = kReverse; } current_->Prev(); + if (current_->Valid()) { + maxHeap_.push(current_); + } FindLargest(); } @@ -135,51 +171,47 @@ class MergingIterator : public Iterator { private: void FindSmallest(); void FindLargest(); + void ClearHeaps(); - // We might want to use a heap in case there are lots of children. - // For now we use a simple array since we expect a very small number - // of children in leveldb. const Comparator* comparator_; IteratorWrapper* children_; int n_; IteratorWrapper* current_; - // Which direction is the iterator moving? enum Direction { kForward, kReverse }; Direction direction_; + MaxIterHeap maxHeap_; + MinIterHeap minHeap_; }; void MergingIterator::FindSmallest() { - IteratorWrapper* smallest = NULL; - for (int i = 0; i < n_; i++) { - IteratorWrapper* child = &children_[i]; - if (child->Valid()) { - if (smallest == NULL) { - smallest = child; - } else if (comparator_->Compare(child->key(), smallest->key()) < 0) { - smallest = child; - } - } + assert (direction_ == kForward); + if (minHeap_.empty()) { + current_ = NULL; + } else { + current_ = minHeap_.top(); + assert(current_->Valid()); + minHeap_.pop(); } - current_ = smallest; } void MergingIterator::FindLargest() { - IteratorWrapper* largest = NULL; - for (int i = n_-1; i >= 0; i--) { - IteratorWrapper* child = &children_[i]; - if (child->Valid()) { - if (largest == NULL) { - largest = child; - } else if (comparator_->Compare(child->key(), largest->key()) > 0) { - largest = child; - } - } + assert(direction_ == kReverse); + if (maxHeap_.empty()) { + current_ = NULL; + } else { + current_ = maxHeap_.top(); + assert(current_->Valid()); + maxHeap_.pop(); } - current_ = largest; +} + +void MergingIterator::ClearHeaps() { + maxHeap_ = NewMaxIterHeap(comparator_); + minHeap_ = NewMinIterHeap(comparator_); } } // namespace