Use a priority queue to merge files.

Summary:
Use a std::priority_queue in merger.cc instead of doing a o(n) search
every time.
Currently only the ForwardIteration uses a Priority Queue.

Test Plan: make all check

Reviewers: dhruba

Reviewed By: dhruba

CC: emayanke, zshao

Differential Revision: https://reviews.facebook.net/D7629
main
Abhishek Kona 12 years ago
parent d7d43ae21a
commit 3f7af03a2d
  1. 63
      table/iter_heap.h
  2. 86
      table/merger.cc

@ -0,0 +1,63 @@
// Copyright 2008-present Facebook. All Rights Reserved.
#ifndef STORAGE_LEVELDB_ITER_HEAP_H_
#define STORAGE_LEVELDB_ITER_HEAP_H_
#include <queue>
#include "leveldb/comparator.h"
#include "table/iterator_wrapper.h"
namespace leveldb {
// Return the max of two keys.
class MaxIteratorComparator {
public:
MaxIteratorComparator(const Comparator* comparator) :
comparator_(comparator) {}
bool operator()(IteratorWrapper* a, IteratorWrapper* b) {
return comparator_->Compare(a->key(), b->key()) <= 0;
}
private:
const Comparator* comparator_;
};
// Return the max of two keys.
class MinIteratorComparator {
public:
// if maxHeap is set comparator returns the max value.
// else returns the min Value.
// Can use to create a minHeap or a maxHeap.
MinIteratorComparator(const Comparator* comparator) :
comparator_(comparator) {}
bool operator()(IteratorWrapper* a, IteratorWrapper* b) {
return comparator_->Compare(a->key(), b->key()) > 0;
}
private:
const Comparator* comparator_;
};
typedef std::priority_queue<
IteratorWrapper*,
std::vector<IteratorWrapper*>,
MaxIteratorComparator> MaxIterHeap;
typedef std::priority_queue<
IteratorWrapper*,
std::vector<IteratorWrapper*>,
MinIteratorComparator> MinIterHeap;
// Return's a new MaxHeap of IteratorWrapper's using the provided Comparator.
MaxIterHeap NewMaxIterHeap(const Comparator* comparator) {
return MaxIterHeap(MaxIteratorComparator(comparator));
}
// Return's a new MinHeap of IteratorWrapper's using the provided Comparator.
MinIterHeap NewMinIterHeap(const Comparator* comparator) {
return MinIterHeap(MinIteratorComparator(comparator));
}
} // namespace leveldb
#endif // STORAGE_LEVELDB_ITER_HEAP_H_

@ -6,11 +6,13 @@
#include "leveldb/comparator.h" #include "leveldb/comparator.h"
#include "leveldb/iterator.h" #include "leveldb/iterator.h"
#include "table/iter_heap.h"
#include "table/iterator_wrapper.h" #include "table/iterator_wrapper.h"
namespace leveldb { namespace leveldb {
namespace { namespace {
class MergingIterator : public Iterator { class MergingIterator : public Iterator {
public: public:
MergingIterator(const Comparator* comparator, Iterator** children, int n) MergingIterator(const Comparator* comparator, Iterator** children, int n)
@ -18,10 +20,17 @@ class MergingIterator : public Iterator {
children_(new IteratorWrapper[n]), children_(new IteratorWrapper[n]),
n_(n), n_(n),
current_(NULL), current_(NULL),
direction_(kForward) { direction_(kForward),
maxHeap_(NewMaxIterHeap(comparator_)),
minHeap_ (NewMinIterHeap(comparator_)) {
for (int i = 0; i < n; i++) { for (int i = 0; i < n; i++) {
children_[i].Set(children[i]); children_[i].Set(children[i]);
} }
for (int i = 0; i < n; ++i) {
if (children_[i].Valid()) {
minHeap_.push(&children_[i]);
}
}
} }
virtual ~MergingIterator() { virtual ~MergingIterator() {
@ -33,24 +42,36 @@ class MergingIterator : public Iterator {
} }
virtual void SeekToFirst() { virtual void SeekToFirst() {
ClearHeaps();
for (int i = 0; i < n_; i++) { for (int i = 0; i < n_; i++) {
children_[i].SeekToFirst(); children_[i].SeekToFirst();
if (children_[i].Valid()) {
minHeap_.push(&children_[i]);
}
} }
FindSmallest(); FindSmallest();
direction_ = kForward; direction_ = kForward;
} }
virtual void SeekToLast() { virtual void SeekToLast() {
ClearHeaps();
for (int i = 0; i < n_; i++) { for (int i = 0; i < n_; i++) {
children_[i].SeekToLast(); children_[i].SeekToLast();
if (children_[i].Valid()) {
maxHeap_.push(&children_[i]);
}
} }
FindLargest(); FindLargest();
direction_ = kReverse; direction_ = kReverse;
} }
virtual void Seek(const Slice& target) { virtual void Seek(const Slice& target) {
ClearHeaps();
for (int i = 0; i < n_; i++) { for (int i = 0; i < n_; i++) {
children_[i].Seek(target); children_[i].Seek(target);
if (children_[i].Valid()) {
minHeap_.push(&children_[i]);
}
} }
FindSmallest(); FindSmallest();
direction_ = kForward; direction_ = kForward;
@ -65,6 +86,7 @@ class MergingIterator : public Iterator {
// the smallest child and key() == current_->key(). Otherwise, // the smallest child and key() == current_->key(). Otherwise,
// we explicitly position the non-current_ children. // we explicitly position the non-current_ children.
if (direction_ != kForward) { if (direction_ != kForward) {
ClearHeaps();
for (int i = 0; i < n_; i++) { for (int i = 0; i < n_; i++) {
IteratorWrapper* child = &children_[i]; IteratorWrapper* child = &children_[i];
if (child != current_) { if (child != current_) {
@ -73,24 +95,32 @@ class MergingIterator : public Iterator {
comparator_->Compare(key(), child->key()) == 0) { comparator_->Compare(key(), child->key()) == 0) {
child->Next(); child->Next();
} }
if (child->Valid()) {
minHeap_.push(child);
}
} }
} }
direction_ = kForward; direction_ = kForward;
} }
// as the current points to the current record. move the iterator forward.
// and if it is valid add it to the heap.
current_->Next(); current_->Next();
if (current_->Valid()){
minHeap_.push(current_);
}
FindSmallest(); FindSmallest();
} }
virtual void Prev() { virtual void Prev() {
assert(Valid()); assert(Valid());
// Ensure that all children are positioned before key(). // Ensure that all children are positioned before key().
// If we are moving in the reverse direction, it is already // If we are moving in the reverse direction, it is already
// true for all of the non-current_ children since current_ is // true for all of the non-current_ children since current_ is
// the largest child and key() == current_->key(). Otherwise, // the largest child and key() == current_->key(). Otherwise,
// we explicitly position the non-current_ children. // we explicitly position the non-current_ children.
if (direction_ != kReverse) { if (direction_ != kReverse) {
ClearHeaps();
for (int i = 0; i < n_; i++) { for (int i = 0; i < n_; i++) {
IteratorWrapper* child = &children_[i]; IteratorWrapper* child = &children_[i];
if (child != current_) { if (child != current_) {
@ -102,12 +132,18 @@ class MergingIterator : public Iterator {
// Child has no entries >= key(). Position at last entry. // Child has no entries >= key(). Position at last entry.
child->SeekToLast(); child->SeekToLast();
} }
if (child->Valid()) {
maxHeap_.push(child);
}
} }
} }
direction_ = kReverse; direction_ = kReverse;
} }
current_->Prev(); current_->Prev();
if (current_->Valid()) {
maxHeap_.push(current_);
}
FindLargest(); FindLargest();
} }
@ -135,51 +171,47 @@ class MergingIterator : public Iterator {
private: private:
void FindSmallest(); void FindSmallest();
void FindLargest(); void FindLargest();
void ClearHeaps();
// We might want to use a heap in case there are lots of children.
// For now we use a simple array since we expect a very small number
// of children in leveldb.
const Comparator* comparator_; const Comparator* comparator_;
IteratorWrapper* children_; IteratorWrapper* children_;
int n_; int n_;
IteratorWrapper* current_; IteratorWrapper* current_;
// Which direction is the iterator moving? // Which direction is the iterator moving?
enum Direction { enum Direction {
kForward, kForward,
kReverse kReverse
}; };
Direction direction_; Direction direction_;
MaxIterHeap maxHeap_;
MinIterHeap minHeap_;
}; };
void MergingIterator::FindSmallest() { void MergingIterator::FindSmallest() {
IteratorWrapper* smallest = NULL; assert (direction_ == kForward);
for (int i = 0; i < n_; i++) { if (minHeap_.empty()) {
IteratorWrapper* child = &children_[i]; current_ = NULL;
if (child->Valid()) { } else {
if (smallest == NULL) { current_ = minHeap_.top();
smallest = child; assert(current_->Valid());
} else if (comparator_->Compare(child->key(), smallest->key()) < 0) { minHeap_.pop();
smallest = child;
}
}
} }
current_ = smallest;
} }
void MergingIterator::FindLargest() { void MergingIterator::FindLargest() {
IteratorWrapper* largest = NULL; assert(direction_ == kReverse);
for (int i = n_-1; i >= 0; i--) { if (maxHeap_.empty()) {
IteratorWrapper* child = &children_[i]; current_ = NULL;
if (child->Valid()) { } else {
if (largest == NULL) { current_ = maxHeap_.top();
largest = child; assert(current_->Valid());
} else if (comparator_->Compare(child->key(), largest->key()) > 0) { maxHeap_.pop();
largest = child;
}
} }
} }
current_ = largest;
void MergingIterator::ClearHeaps() {
maxHeap_ = NewMaxIterHeap(comparator_);
minHeap_ = NewMinIterHeap(comparator_);
} }
} // namespace } // namespace

Loading…
Cancel
Save