diff --git a/table/merging_iterator.cc b/table/merging_iterator.cc index 1a0d4df89..82329ae8a 100644 --- a/table/merging_iterator.cc +++ b/table/merging_iterator.cc @@ -51,12 +51,7 @@ class MergingIterator : public InternalIterator { children_[i].Set(children[i]); } for (auto& child : children_) { - if (child.Valid()) { - assert(child.status().ok()); - minHeap_.push(&child); - } else { - considerStatus(child.status()); - } + AddToMinHeapOrCheckStatus(&child); } current_ = CurrentForward(); } @@ -74,12 +69,9 @@ class MergingIterator : public InternalIterator { iter->SetPinnedItersMgr(pinned_iters_mgr_); } auto new_wrapper = children_.back(); + AddToMinHeapOrCheckStatus(&new_wrapper); if (new_wrapper.Valid()) { - assert(new_wrapper.status().ok()); - minHeap_.push(&new_wrapper); current_ = CurrentForward(); - } else { - considerStatus(new_wrapper.status()); } } @@ -98,12 +90,7 @@ class MergingIterator : public InternalIterator { status_ = Status::OK(); for (auto& child : children_) { child.SeekToFirst(); - if (child.Valid()) { - assert(child.status().ok()); - minHeap_.push(&child); - } else { - considerStatus(child.status()); - } + AddToMinHeapOrCheckStatus(&child); } direction_ = kForward; current_ = CurrentForward(); @@ -115,12 +102,7 @@ class MergingIterator : public InternalIterator { status_ = Status::OK(); for (auto& child : children_) { child.SeekToLast(); - if (child.Valid()) { - assert(child.status().ok()); - maxHeap_->push(&child); - } else { - considerStatus(child.status()); - } + AddToMaxHeapOrCheckStatus(&child); } direction_ = kReverse; current_ = CurrentReverse(); @@ -147,16 +129,16 @@ class MergingIterator : public InternalIterator { comparator_->Compare(target, child.key()) > 0 || child.iter()->is_mutable()) { PERF_TIMER_GUARD(seek_child_seek_time); + child.Seek(target); + PERF_COUNTER_ADD(seek_child_seek_count, 1); } - - if (child.Valid()) { - assert(child.status().ok()); + { + // Strictly, we timed slightly more than min heap operation, + // but these operations are very cheap. PERF_TIMER_GUARD(seek_min_heap_time); - minHeap_.push(&child); - } else { - considerStatus(child.status()); + AddToMinHeapOrCheckStatus(&child); } } direction_ = kForward; @@ -178,12 +160,9 @@ class MergingIterator : public InternalIterator { } PERF_COUNTER_ADD(seek_child_seek_count, 1); - if (child.Valid()) { - assert(child.status().ok()); + { PERF_TIMER_GUARD(seek_max_heap_time); - maxHeap_->push(&child); - } else { - considerStatus(child.status()); + AddToMaxHeapOrCheckStatus(&child); } } direction_ = kReverse; @@ -246,35 +225,7 @@ class MergingIterator : public InternalIterator { if (direction_ != kReverse) { // Otherwise, retreat the non-current children. We retreat current_ // just after the if-block. - ClearHeaps(); - InitMaxHeap(); - Slice target = key(); - for (auto& child : children_) { - if (&child != current_) { - child.SeekForPrev(target); - TEST_SYNC_POINT_CALLBACK("MergeIterator::Prev:BeforePrev", &child); - considerStatus(child.status()); - if (child.Valid() && comparator_->Equal(target, child.key())) { - child.Prev(); - considerStatus(child.status()); - } - } - if (child.Valid()) { - assert(child.status().ok()); - maxHeap_->push(&child); - } - } - direction_ = kReverse; - if (!prefix_seek_mode_) { - // Note that we don't do assert(current_ == CurrentReverse()) here - // because it is possible to have some keys larger than the seek-key - // inserted between Seek() and SeekToLast(), which makes current_ not - // equal to CurrentReverse(). - current_ = CurrentReverse(); - } - // The loop advanced all non-current children to be < key() so current_ - // should still be strictly the smallest key. - assert(current_ == CurrentReverse()); + SwitchToBackward(); } // For the heap modifications below to be correct, current_ must be the @@ -370,8 +321,20 @@ class MergingIterator : public InternalIterator { std::unique_ptr maxHeap_; PinnedIteratorsManager* pinned_iters_mgr_; + // In forward direction, process a child that is not in the min heap. + // If valid, add to the min heap. Otherwise, check status. + void AddToMinHeapOrCheckStatus(IteratorWrapper*); + + // In backward direction, process a child that is not in the max heap. + // If valid, add to the min heap. Otherwise, check status. + void AddToMaxHeapOrCheckStatus(IteratorWrapper*); + void SwitchToForward(); + // Switch the direction from forward to backward without changing the + // position. Iterator should still be valid. + void SwitchToBackward(); + IteratorWrapper* CurrentForward() const { assert(direction_ == kForward); return !minHeap_.empty() ? minHeap_.top() : nullptr; @@ -384,6 +347,24 @@ class MergingIterator : public InternalIterator { } }; +void MergingIterator::AddToMinHeapOrCheckStatus(IteratorWrapper* child) { + if (child->Valid()) { + assert(child->status().ok()); + minHeap_.push(child); + } else { + considerStatus(child->status()); + } +} + +void MergingIterator::AddToMaxHeapOrCheckStatus(IteratorWrapper* child) { + if (child->Valid()) { + assert(child->status().ok()); + maxHeap_->push(child); + } else { + considerStatus(child->status()); + } +} + void MergingIterator::SwitchToForward() { // Otherwise, advance the non-current children. We advance current_ // just after the if-block. @@ -392,19 +373,42 @@ void MergingIterator::SwitchToForward() { for (auto& child : children_) { if (&child != current_) { child.Seek(target); - considerStatus(child.status()); if (child.Valid() && comparator_->Equal(target, child.key())) { + assert(child.status().ok()); child.Next(); - considerStatus(child.status()); } } - if (child.Valid()) { - minHeap_.push(&child); - } + AddToMinHeapOrCheckStatus(&child); } direction_ = kForward; } +void MergingIterator::SwitchToBackward() { + ClearHeaps(); + InitMaxHeap(); + Slice target = key(); + for (auto& child : children_) { + if (&child != current_) { + child.SeekForPrev(target); + TEST_SYNC_POINT_CALLBACK("MergeIterator::Prev:BeforePrev", &child); + if (child.Valid() && comparator_->Equal(target, child.key())) { + assert(child.status().ok()); + child.Prev(); + } + } + AddToMaxHeapOrCheckStatus(&child); + } + direction_ = kReverse; + if (!prefix_seek_mode_) { + // Note that we don't do assert(current_ == CurrentReverse()) here + // because it is possible to have some keys larger than the seek-key + // inserted between Seek() and SeekToLast(), which makes current_ not + // equal to CurrentReverse(). + current_ = CurrentReverse(); + } + assert(current_ == CurrentReverse()); +} + void MergingIterator::ClearHeaps() { minHeap_.clear(); if (maxHeap_) {