// Copyright (c) 2013, Facebook, Inc. All rights reserved. // This source code is licensed under the BSD-style license found in the // LICENSE file in the root directory of this source tree. An additional grant // of patent rights can be found in the PATENTS file in the same directory. #ifndef ROCKSDB_LITE #include "db/forward_iterator.h" #include #include #include #include "db/db_impl.h" #include "db/db_iter.h" #include "db/column_family.h" #include "rocksdb/env.h" #include "rocksdb/slice.h" #include "rocksdb/slice_transform.h" #include "table/merger.h" #include "db/dbformat.h" namespace rocksdb { // Usage: // LevelIterator iter; // iter.SetFileIndex(file_index); // iter.Seek(target); // iter.Next() class LevelIterator : public Iterator { public: LevelIterator(const ColumnFamilyData* const cfd, const ReadOptions& read_options, const std::vector& files) : cfd_(cfd), read_options_(read_options), files_(files), valid_(false), file_index_(std::numeric_limits::max()) {} void SetFileIndex(uint32_t file_index) { assert(file_index < files_.size()); if (file_index != file_index_) { file_index_ = file_index; file_iter_.reset(cfd_->table_cache()->NewIterator( read_options_, *(cfd_->soptions()), cfd_->internal_comparator(), *(files_[file_index_]), nullptr /* table_reader_ptr */, false)); } valid_ = false; } void SeekToLast() override { status_ = Status::NotSupported("LevelIterator::SeekToLast()"); valid_ = false; } void Prev() { status_ = Status::NotSupported("LevelIterator::Prev()"); valid_ = false; } bool Valid() const override { return valid_; } void SeekToFirst() override { SetFileIndex(0); file_iter_->SeekToFirst(); valid_ = file_iter_->Valid(); } void Seek(const Slice& internal_key) override { assert(file_iter_ != nullptr); file_iter_->Seek(internal_key); valid_ = file_iter_->Valid(); assert(valid_); } void Next() override { assert(valid_); file_iter_->Next(); while (!file_iter_->Valid()) { if (file_index_ + 1 >= files_.size()) { valid_ = false; return; } SetFileIndex(file_index_ + 1); file_iter_->SeekToFirst(); } valid_ = file_iter_->Valid(); } Slice key() const override { assert(valid_); return file_iter_->key(); } Slice value() const override { assert(valid_); return file_iter_->value(); } Status status() const override { return status_; } private: const ColumnFamilyData* const cfd_; const ReadOptions& read_options_; const std::vector& files_; bool valid_; uint32_t file_index_; Status status_; std::unique_ptr file_iter_; }; ForwardIterator::ForwardIterator(DBImpl* db, const ReadOptions& read_options, ColumnFamilyData* cfd) : db_(db), read_options_(read_options), cfd_(cfd), prefix_extractor_(cfd->options()->prefix_extractor.get()), user_comparator_(cfd->user_comparator()), immutable_min_heap_(MinIterComparator(&cfd_->internal_comparator())), sv_(nullptr), mutable_iter_(nullptr), current_(nullptr), valid_(false), is_prev_set_(false) {} ForwardIterator::~ForwardIterator() { Cleanup(); } void ForwardIterator::Cleanup() { delete mutable_iter_; for (auto* m : imm_iters_) { delete m; } imm_iters_.clear(); for (auto* f : l0_iters_) { delete f; } l0_iters_.clear(); for (auto* l : level_iters_) { delete l; } level_iters_.clear(); if (sv_ != nullptr && sv_->Unref()) { DBImpl::DeletionState deletion_state; db_->mutex_.Lock(); sv_->Cleanup(); db_->FindObsoleteFiles(deletion_state, false, true); db_->mutex_.Unlock(); delete sv_; if (deletion_state.HaveSomethingToDelete()) { db_->PurgeObsoleteFiles(deletion_state); } } } bool ForwardIterator::Valid() const { return valid_; } void ForwardIterator::SeekToFirst() { if (sv_ == nullptr || sv_ ->version_number != cfd_->GetSuperVersionNumber()) { RebuildIterators(); } SeekInternal(Slice(), true); } void ForwardIterator::Seek(const Slice& internal_key) { if (sv_ == nullptr || sv_ ->version_number != cfd_->GetSuperVersionNumber()) { RebuildIterators(); } SeekInternal(internal_key, false); } void ForwardIterator::SeekInternal(const Slice& internal_key, bool seek_to_first) { // mutable seek_to_first ? mutable_iter_->SeekToFirst() : mutable_iter_->Seek(internal_key); // immutable // TODO(ljin): NeedToSeekImmutable has negative impact on performance // if it turns to need to seek immutable often. We probably want to have // an option to turn it off. if (seek_to_first || NeedToSeekImmutable(internal_key)) { { auto tmp = MinIterHeap(MinIterComparator(&cfd_->internal_comparator())); immutable_min_heap_.swap(tmp); } for (auto* m : imm_iters_) { seek_to_first ? m->SeekToFirst() : m->Seek(internal_key); if (m->Valid()) { immutable_min_heap_.push(m); } } auto* files = sv_->current->files_; for (uint32_t i = 0; i < files[0].size(); ++i) { if (seek_to_first) { l0_iters_[i]->SeekToFirst(); } else { // If the target key passes over the larget key, we are sure Next() // won't go over this file. if (user_comparator_->Compare(ExtractUserKey(internal_key), files[0][i]->largest.user_key()) > 0) { continue; } l0_iters_[i]->Seek(internal_key); } if (l0_iters_[i]->Valid()) { immutable_min_heap_.push(l0_iters_[i]); } } for (int32_t level = 1; level < sv_->current->NumberLevels(); ++level) { if (files[level].empty()) { continue; } assert(level_iters_[level - 1] != nullptr); uint32_t f_idx = 0; if (!seek_to_first) { f_idx = FindFileInRange( files[level], internal_key, 0, files[level].size()); } if (f_idx < files[level].size()) { level_iters_[level - 1]->SetFileIndex(f_idx); seek_to_first ? level_iters_[level - 1]->SeekToFirst() : level_iters_[level - 1]->Seek(internal_key); if (level_iters_[level - 1]->Valid()) { immutable_min_heap_.push(level_iters_[level - 1]); } } } if (seek_to_first || immutable_min_heap_.empty()) { is_prev_set_ = false; } else { prev_key_.SetKey(internal_key); is_prev_set_ = true; } } UpdateCurrent(); } void ForwardIterator::Next() { assert(valid_); if (sv_ == nullptr || sv_ ->version_number != cfd_->GetSuperVersionNumber()) { std::string current_key = key().ToString(); Slice old_key(current_key.data(), current_key.size()); RebuildIterators(); SeekInternal(old_key, false); if (!valid_ || key().compare(old_key) != 0) { return; } } else if (current_ != mutable_iter_) { // It is going to advance immutable iterator prev_key_.SetKey(current_->key()); is_prev_set_ = true; } current_->Next(); if (current_->Valid() && current_ != mutable_iter_) { immutable_min_heap_.push(current_); } UpdateCurrent(); } Slice ForwardIterator::key() const { assert(valid_); return current_->key(); } Slice ForwardIterator::value() const { assert(valid_); return current_->value(); } Status ForwardIterator::status() const { if (!status_.ok()) { return status_; } else if (!mutable_iter_->status().ok()) { return mutable_iter_->status(); } return Status::OK(); } void ForwardIterator::RebuildIterators() { // Clean up Cleanup(); // New sv_ = cfd_->GetReferencedSuperVersion(&(db_->mutex_)); mutable_iter_ = sv_->mem->NewIterator(read_options_); sv_->imm->AddIterators(read_options_, &imm_iters_); const auto& l0_files = sv_->current->files_[0]; l0_iters_.reserve(l0_files.size()); for (const auto* l0 : l0_files) { l0_iters_.push_back(cfd_->table_cache()->NewIterator( read_options_, *cfd_->soptions(), cfd_->internal_comparator(), *l0)); } level_iters_.reserve(sv_->current->NumberLevels() - 1); for (int32_t level = 1; level < sv_->current->NumberLevels(); ++level) { if (sv_->current->files_[level].empty()) { level_iters_.push_back(nullptr); } else { level_iters_.push_back(new LevelIterator(cfd_, read_options_, sv_->current->files_[level])); } } current_ = nullptr; is_prev_set_ = false; } void ForwardIterator::UpdateCurrent() { if (immutable_min_heap_.empty() && !mutable_iter_->Valid()) { current_ = nullptr; } else if (immutable_min_heap_.empty()) { current_ = mutable_iter_; } else if (!mutable_iter_->Valid()) { current_ = immutable_min_heap_.top(); immutable_min_heap_.pop(); } else { current_ = immutable_min_heap_.top(); assert(current_ != nullptr); assert(current_->Valid()); int cmp = cfd_->internal_comparator().InternalKeyComparator::Compare( mutable_iter_->key(), current_->key()) > 0; assert(cmp != 0); if (cmp > 0) { immutable_min_heap_.pop(); } else { current_ = mutable_iter_; } } valid_ = (current_ != nullptr); if (!status_.ok()) { status_ = Status::OK(); } } bool ForwardIterator::NeedToSeekImmutable(const Slice& target) { if (!is_prev_set_) { return true; } Slice prev_key = prev_key_.GetKey(); if (prefix_extractor_ && prefix_extractor_->Transform(target).compare( prefix_extractor_->Transform(prev_key)) != 0) { return true; } if (cfd_->internal_comparator().InternalKeyComparator::Compare( prev_key, target) >= 0) { return true; } if (immutable_min_heap_.empty() || cfd_->internal_comparator().InternalKeyComparator::Compare( target, current_ == mutable_iter_ ? immutable_min_heap_.top()->key() : current_->key()) > 0) { return true; } return false; } uint32_t ForwardIterator::FindFileInRange( const std::vector& files, const Slice& internal_key, uint32_t left, uint32_t right) { while (left < right) { uint32_t mid = (left + right) / 2; const FileMetaData* f = files[mid]; if (cfd_->internal_comparator().InternalKeyComparator::Compare( f->largest.Encode(), internal_key) < 0) { // Key at "mid.largest" is < "target". Therefore all // files at or before "mid" are uninteresting. left = mid + 1; } else { // Key at "mid.largest" is >= "target". Therefore all files // after "mid" are uninteresting. right = mid; } } return right; } } // namespace rocksdb #endif // ROCKSDB_LITE