From 0f9c43ea36f1b18baabb704562d2a89d2e4d78a1 Mon Sep 17 00:00:00 2001 From: Tomislav Novak Date: Fri, 29 Aug 2014 14:32:37 -0700 Subject: [PATCH] ForwardIterator: reset incomplete iterators on Seek() Summary: When reading from kBlockCacheTier, ForwardIterator's internal child iterators may end up in the incomplete state (read was unable to complete without doing disk I/O). `ForwardIterator::status()` will correctly report that; however, the iterator may be stuck in that state until all sub-iterators are rebuilt: * `NeedToSeekImmutable()` may return false even if some sub-iterators are incomplete * one of the child iterators may be an empty iterator without any state other than the kIncomplete status (created using `NewErrorIterator()`); seeking on any such iterator has no effect -- we need to construct it again Akin to rebuilding iterators after a superversion bump, this diff makes forward iterator reset all incomplete child iterators when `Seek()` or `Next()` are called. Test Plan: TEST_TMPDIR=/dev/shm/rocksdbtest ROCKSDB_TESTS=TailingIterator ./db_test Reviewers: igor, sdong, ljin Reviewed By: ljin Subscribers: lovro, march, leveldb Differential Revision: https://reviews.facebook.net/D22575 --- db/forward_iterator.cc | 83 +++++++++++++++++++++++++++++++------ db/forward_iterator.h | 1 + table/two_level_iterator.cc | 5 ++- 3 files changed, 74 insertions(+), 15 deletions(-) diff --git a/db/forward_iterator.cc b/db/forward_iterator.cc index 79cc953cf..74e6dd249 100644 --- a/db/forward_iterator.cc +++ b/db/forward_iterator.cc @@ -6,9 +6,10 @@ #ifndef ROCKSDB_LITE #include "db/forward_iterator.h" +#include #include #include -#include + #include "db/db_impl.h" #include "db/db_iter.h" #include "db/column_family.h" @@ -37,12 +38,16 @@ class LevelIterator : public Iterator { assert(file_index < files_.size()); if (file_index != file_index_) { file_index_ = file_index; - file_iter_.reset(cfd_->table_cache()->NewIterator( - read_options_, *(cfd_->soptions()), 
cfd_->internal_comparator(), - files_[file_index_]->fd, nullptr /* table_reader_ptr */, false)); + Reset(); } valid_ = false; } + void Reset() { + assert(file_index_ < files_.size()); + file_iter_.reset(cfd_->table_cache()->NewIterator( + read_options_, *(cfd_->soptions()), cfd_->internal_comparator(), + files_[file_index_]->fd, nullptr /* table_reader_ptr */, false)); + } void SeekToLast() override { status_ = Status::NotSupported("LevelIterator::SeekToLast()"); valid_ = false; @@ -63,12 +68,15 @@ class LevelIterator : public Iterator { assert(file_iter_ != nullptr); file_iter_->Seek(internal_key); valid_ = file_iter_->Valid(); - assert(valid_); } void Next() override { assert(valid_); file_iter_->Next(); - while (!file_iter_->Valid()) { + for (;;) { + if (file_iter_->status().IsIncomplete() || file_iter_->Valid()) { + valid_ = !file_iter_->status().IsIncomplete(); + return; + } if (file_index_ + 1 >= files_.size()) { valid_ = false; return; @@ -76,7 +84,6 @@ class LevelIterator : public Iterator { SetFileIndex(file_index_ + 1); file_iter_->SeekToFirst(); } - valid_ = file_iter_->Valid(); } Slice key() const override { assert(valid_); @@ -160,6 +167,8 @@ void ForwardIterator::SeekToFirst() { if (sv_ == nullptr || sv_ ->version_number != cfd_->GetSuperVersionNumber()) { RebuildIterators(); + } else if (status_.IsIncomplete()) { + ResetIncompleteIterators(); } SeekInternal(Slice(), true); } @@ -168,6 +177,8 @@ void ForwardIterator::Seek(const Slice& internal_key) { if (sv_ == nullptr || sv_ ->version_number != cfd_->GetSuperVersionNumber()) { RebuildIterators(); + } else if (status_.IsIncomplete()) { + ResetIncompleteIterators(); } SeekInternal(internal_key, false); } @@ -211,7 +222,15 @@ void ForwardIterator::SeekInternal(const Slice& internal_key, } l0_iters_[i]->Seek(internal_key); } - if (l0_iters_[i]->Valid()) { + + if (l0_iters_[i]->status().IsIncomplete()) { + // if any of the immutable iterators is incomplete (no-io option was + // used), we are unable to 
reliably find the smallest key + assert(read_options_.read_tier == kBlockCacheTier); + status_ = l0_iters_[i]->status(); + valid_ = false; + return; + } else if (l0_iters_[i]->Valid()) { immutable_min_heap_.push(l0_iters_[i]); } } @@ -280,7 +299,14 @@ void ForwardIterator::SeekInternal(const Slice& internal_key, level_iters_[level - 1]->SetFileIndex(f_idx); seek_to_first ? level_iters_[level - 1]->SeekToFirst() : level_iters_[level - 1]->Seek(internal_key); - if (level_iters_[level - 1]->Valid()) { + + if (level_iters_[level - 1]->status().IsIncomplete()) { + // see above + assert(read_options_.read_tier == kBlockCacheTier); + status_ = level_iters_[level - 1]->status(); + valid_ = false; + return; + } else if (level_iters_[level - 1]->Valid()) { immutable_min_heap_.push(level_iters_[level - 1]); } } @@ -304,7 +330,7 @@ void ForwardIterator::Next() { assert(valid_); if (sv_ == nullptr || - sv_ ->version_number != cfd_->GetSuperVersionNumber()) { + sv_->version_number != cfd_->GetSuperVersionNumber()) { std::string current_key = key().ToString(); Slice old_key(current_key.data(), current_key.size()); @@ -320,9 +346,17 @@ void ForwardIterator::Next() { } current_->Next(); - if (current_->Valid() && current_ != mutable_iter_) { - immutable_min_heap_.push(current_); + if (current_ != mutable_iter_) { + if (current_->status().IsIncomplete()) { + assert(read_options_.read_tier == kBlockCacheTier); + status_ = current_->status(); + valid_ = false; + return; + } else if (current_->Valid()) { + immutable_min_heap_.push(current_); + } } + UpdateCurrent(); } @@ -389,6 +423,29 @@ void ForwardIterator::RebuildIterators() { is_prev_set_ = false; } +void ForwardIterator::ResetIncompleteIterators() { + const auto& l0_files = sv_->current->files_[0]; + for (uint32_t i = 0; i < l0_iters_.size(); ++i) { + assert(i < l0_files.size()); + if (!l0_iters_[i]->status().IsIncomplete()) { + continue; + } + delete l0_iters_[i]; + l0_iters_[i] = cfd_->table_cache()->NewIterator( + 
read_options_, *cfd_->soptions(), cfd_->internal_comparator(), + l0_files[i]->fd); + } + + for (auto* level_iter : level_iters_) { + if (level_iter && level_iter->status().IsIncomplete()) { + level_iter->Reset(); + } + } + + current_ = nullptr; + is_prev_set_ = false; +} + void ForwardIterator::UpdateCurrent() { if (immutable_min_heap_.empty() && !mutable_iter_->Valid()) { current_ = nullptr; @@ -417,7 +474,7 @@ void ForwardIterator::UpdateCurrent() { } bool ForwardIterator::NeedToSeekImmutable(const Slice& target) { - if (!is_prev_set_) { + if (!valid_ || !is_prev_set_) { return true; } Slice prev_key = prev_key_.GetKey(); diff --git a/db/forward_iterator.h b/db/forward_iterator.h index d539ae3c7..bbf423a50 100644 --- a/db/forward_iterator.h +++ b/db/forward_iterator.h @@ -73,6 +73,7 @@ class ForwardIterator : public Iterator { private: void Cleanup(); void RebuildIterators(); + void ResetIncompleteIterators(); void SeekInternal(const Slice& internal_key, bool seek_to_first); void UpdateCurrent(); bool NeedToSeekImmutable(const Slice& internal_key); diff --git a/table/two_level_iterator.cc b/table/two_level_iterator.cc index 6af48f58c..ae4e46239 100644 --- a/table/two_level_iterator.cc +++ b/table/two_level_iterator.cc @@ -172,8 +172,9 @@ void TwoLevelIterator::InitDataBlock() { SetSecondLevelIterator(nullptr); } else { Slice handle = first_level_iter_.value(); - if (second_level_iter_.iter() != nullptr - && handle.compare(data_block_handle_) == 0) { + if (second_level_iter_.iter() != nullptr && + !second_level_iter_.status().IsIncomplete() && + handle.compare(data_block_handle_) == 0) { // second_level_iter is already constructed with this iterator, so // no need to change anything } else {