diff --git a/db/column_family.h b/db/column_family.h index d306f4e66..b5a70e76f 100644 --- a/db/column_family.h +++ b/db/column_family.h @@ -180,7 +180,7 @@ class ColumnFamilyData { void SetCurrent(Version* current); void CreateNewMemtable(); - TableCache* table_cache() { return table_cache_.get(); } + TableCache* table_cache() const { return table_cache_.get(); } // See documentation in compaction_picker.h Compaction* PickCompaction(LogBuffer* log_buffer); diff --git a/db/db_bench.cc b/db/db_bench.cc index 44b684e09..b8e4b3213 100644 --- a/db/db_bench.cc +++ b/db/db_bench.cc @@ -1928,7 +1928,7 @@ class Benchmark { } char msg[100]; - snprintf(msg, sizeof(msg), "(%" PRIu64 " of %" PRIu64 " found)", + snprintf(msg, sizeof(msg), "(%" PRIu64 " of %" PRIu64 " found)\n", found, read); thread->stats.AddMessage(msg); @@ -2056,9 +2056,12 @@ class Benchmark { } char msg[100]; - snprintf(msg, sizeof(msg), "(%" PRIu64 " of %" PRIu64 " found)", + snprintf(msg, sizeof(msg), "(%" PRIu64 " of %" PRIu64 " found)\n", found, read); thread->stats.AddMessage(msg); + if (FLAGS_perf_level > 0) { + thread->stats.AddMessage(perf_context.ToString()); + } } void SeekRandomWhileWriting(ThreadState* thread) { diff --git a/db/db_impl.cc b/db/db_impl.cc index f87442767..4e6136281 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -36,6 +36,7 @@ #include "db/table_cache.h" #include "db/table_properties_collector.h" #include "db/tailing_iter.h" +#include "db/forward_iterator.h" #include "db/transaction_log_impl.h" #include "db/version_set.h" #include "db/write_batch_internal.h" @@ -2578,7 +2579,7 @@ Status DBImpl::ProcessKeyValueCompaction( cfd->user_comparator()->Compare(ikey.user_key, current_user_key.GetKey()) != 0) { // First occurrence of this user key - current_user_key.SetUserKey(ikey.user_key); + current_user_key.SetKey(ikey.user_key); has_current_user_key = true; last_sequence_for_key = kMaxSequenceNumber; visible_in_snapshot = kMaxSequenceNumber; @@ -3538,7 +3539,11 @@ Iterator* 
DBImpl::NewIterator(const ReadOptions& options, // not supported in lite version return nullptr; #else - iter = new TailingIterator(env_, this, options, cfd); + // TODO(ljin): remove tailing iterator + iter = new ForwardIterator(env_, this, options, cfd); + iter = NewDBIterator(env_, *cfd->options(), + cfd->user_comparator(), iter, kMaxSequenceNumber); + //iter = new TailingIterator(env_, this, options, cfd); #endif } else { SequenceNumber latest_snapshot = versions_->LastSequence(); diff --git a/db/db_impl.h b/db/db_impl.h index cc59cfd7c..faa5c4c63 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -285,6 +285,7 @@ class DBImpl : public DB { friend class InternalStats; #ifndef ROCKSDB_LITE friend class TailingIterator; + friend class ForwardIterator; #endif friend struct SuperVersion; struct CompactionState; diff --git a/db/db_iter.cc b/db/db_iter.cc index e7ff2ba25..3de620dff 100644 --- a/db/db_iter.cc +++ b/db/db_iter.cc @@ -211,18 +211,18 @@ void DBIter::FindNextUserEntryInternal(bool skipping) { case kTypeDeletion: // Arrange to skip all upcoming entries for this key since // they are hidden by this deletion. - saved_key_.SetUserKey(ikey.user_key); + saved_key_.SetKey(ikey.user_key); skipping = true; num_skipped = 0; PERF_COUNTER_ADD(internal_delete_skipped_count, 1); break; case kTypeValue: valid_ = true; - saved_key_.SetUserKey(ikey.user_key); + saved_key_.SetKey(ikey.user_key); return; case kTypeMerge: // By now, we are sure the current ikey is going to yield a value - saved_key_.SetUserKey(ikey.user_key); + saved_key_.SetKey(ikey.user_key); current_entry_is_merged_ = true; valid_ = true; MergeValuesNewToOld(); // Go to a different state machine @@ -331,7 +331,7 @@ void DBIter::Prev() { // iter_ is pointing at the current entry. Scan backwards until // the key changes so we can use the normal reverse scanning code. 
assert(iter_->Valid()); // Otherwise valid_ would have been false - saved_key_.SetUserKey(ExtractUserKey(iter_->key())); + saved_key_.SetKey(ExtractUserKey(iter_->key())); while (true) { iter_->Prev(); if (!iter_->Valid()) { @@ -377,7 +377,7 @@ void DBIter::FindPrevUserEntry() { std::string empty; swap(empty, saved_value_); } - saved_key_.SetUserKey(ExtractUserKey(iter_->key())); + saved_key_.SetKey(ExtractUserKey(iter_->key())); saved_value_.assign(raw_value.data(), raw_value.size()); } } else { diff --git a/db/dbformat.h b/db/dbformat.h index 4356dd934..9640372d7 100644 --- a/db/dbformat.h +++ b/db/dbformat.h @@ -256,10 +256,10 @@ class IterKey { void Clear() { key_size_ = 0; } - void SetUserKey(const Slice& user_key) { - size_t size = user_key.size(); + void SetKey(const Slice& key) { + size_t size = key.size(); EnlargeBufferIfNeeded(size); - memcpy(key_, user_key.data(), size); + memcpy(key_, key.data(), size); key_size_ = size; } diff --git a/db/forward_iterator.cc b/db/forward_iterator.cc new file mode 100644 index 000000000..b80e300ea --- /dev/null +++ b/db/forward_iterator.cc @@ -0,0 +1,384 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. 
+
+#ifndef ROCKSDB_LITE
+#include "db/forward_iterator.h"
+
+#include <limits>
+#include <memory>
+#include <string>
+#include "db/db_impl.h"
+#include "db/db_iter.h"
+#include "db/column_family.h"
+#include "rocksdb/env.h"
+#include "rocksdb/slice.h"
+#include "rocksdb/slice_transform.h"
+#include "table/merger.h"
+#include "db/dbformat.h"
+
+namespace rocksdb {
+
+// Forward-only iterator over the files of one level, visiting them in index
+// order. Usage: SetFileIndex(i), then Seek(target) or SeekToFirst(), then
+// Next(); Next() moves on to the following file once the current one is
+// exhausted. SeekToLast() and Prev() only record Status::NotSupported and
+// invalidate the iterator. Seek() must not be called before SetFileIndex().
+class LevelIterator : public Iterator {
+ public:
+  LevelIterator(const ColumnFamilyData* const cfd,
+      const ReadOptions& read_options,
+      const std::vector<FileMetaData*>& files)
+    : cfd_(cfd), read_options_(read_options), files_(files), valid_(false),
+      file_index_(std::numeric_limits<uint32_t>::max()) {}
+  // Selects files_[file_index]; reopens the file iterator only on a change.
+  void SetFileIndex(uint32_t file_index) {
+    assert(file_index < files_.size());
+    if (file_index != file_index_) {
+      file_index_ = file_index;
+      file_iter_.reset(cfd_->table_cache()->NewIterator(
+          read_options_, *(cfd_->soptions()), cfd_->internal_comparator(),
+          *(files_[file_index_]), nullptr /* table_reader_ptr */, false));
+    }
+    valid_ = false;  // caller must Seek()/SeekToFirst() before reading
+  }
+  void SeekToLast() override {
+    status_ = Status::NotSupported("LevelIterator::SeekToLast()");
+    valid_ = false;
+  }
+  void Prev() override {
+    status_ = Status::NotSupported("LevelIterator::Prev()");
+    valid_ = false;
+  }
+  bool Valid() const override {
+    return valid_;
+  }
+  void SeekToFirst() override {
+    SetFileIndex(0);
+    file_iter_->SeekToFirst();
+    valid_ = file_iter_->Valid();
+  }
+  void Seek(const Slice& internal_key) override {
+    assert(file_iter_ != nullptr);
+    file_iter_->Seek(internal_key);
+    valid_ = file_iter_->Valid();
+    assert(valid_);  // NOTE(review): presumes the file holds a key >= target
+  }
+  void Next() override {
+    assert(valid_);
+    file_iter_->Next();
+    while (!file_iter_->Valid()) {
+      if (file_index_ + 1 >= files_.size()) {
+        valid_ = false;
+        return;
+      }
+      SetFileIndex(file_index_ + 1);
+      file_iter_->SeekToFirst();
+    }
+    valid_ = file_iter_->Valid();
+  }
+  Slice key() const override {
+    assert(valid_);
+    return file_iter_->key();
+  }
+  Slice value() const override {
+    assert(valid_);
+    return file_iter_->value();
+  }
+  Status status() const override {
+    return status_;
+  }
+
+ private:
+  const ColumnFamilyData* const cfd_;
+  const ReadOptions& read_options_;  // not owned; must outlive this object
+  const std::vector<FileMetaData*>& files_;  // not owned
+
+  bool valid_;
+  uint32_t file_index_;
+  Status status_;
+  std::unique_ptr<Iterator> file_iter_;  // iterator over files_[file_index_]
+};
+
+ForwardIterator::ForwardIterator(Env* const env, DBImpl* db,
+    const ReadOptions& read_options, ColumnFamilyData* cfd)
+    : db_(db),
+      env_(env),
+      read_options_(read_options),
+      cfd_(cfd),
+      prefix_extractor_(cfd->options()->prefix_extractor.get()),
+      user_comparator_(cfd->user_comparator()),
+      immutable_min_heap_(MinIterComparator(&cfd_->internal_comparator())),
+      sv_(nullptr),
+      mutable_iter_(nullptr),
+      current_(nullptr),
+      valid_(false),
+      is_prev_set_(false) {}
+
+ForwardIterator::~ForwardIterator() {
+  Cleanup();
+}
+// Destroys all child iterators and releases the SuperVersion reference.
+void ForwardIterator::Cleanup() {
+  delete mutable_iter_;
+  for (auto* m : imm_iters_) {
+    delete m;
+  }
+  imm_iters_.clear();
+  for (auto* f : l0_iters_) {
+    delete f;
+  }
+  l0_iters_.clear();
+  for (auto* l : level_iters_) {
+    delete l;
+  }
+  level_iters_.clear();
+  // Presumably Unref() is true only for the last reference; then tear down.
+  if (sv_ != nullptr && sv_->Unref()) {
+    DBImpl::DeletionState deletion_state;
+    db_->mutex_.Lock();
+    sv_->Cleanup();
+    db_->FindObsoleteFiles(deletion_state, false, true);
+    db_->mutex_.Unlock();
+    delete sv_;
+    if (deletion_state.HaveSomethingToDelete()) {
+      db_->PurgeObsoleteFiles(deletion_state);
+    }
+  }
+}
+
+bool ForwardIterator::Valid() const {
+  return valid_;
+}
+
+void ForwardIterator::SeekToFirst() {
+  if (sv_ == nullptr ||
+      sv_->version_number != cfd_->GetSuperVersionNumber()) {
+    RebuildIterators();
+  }
+  SeekInternal(Slice(), true);
+}
+
+void ForwardIterator::Seek(const Slice& internal_key) {
+  if (sv_ == nullptr ||
+      sv_->version_number != cfd_->GetSuperVersionNumber()) {
+    RebuildIterators();
+  }
+  SeekInternal(internal_key, false);
+}
+// Positions all sources at internal_key (or at their first entry).
+void ForwardIterator::SeekInternal(const Slice& internal_key,
+    bool seek_to_first) {
+  // The memtable iterator is always re-seeked.
+  seek_to_first ? mutable_iter_->SeekToFirst() :
+                  mutable_iter_->Seek(internal_key);
+
+  // Immutable sources (imm memtables, L0 files, level iterators):
+  // TODO(ljin): NeedToSeekImmutable has negative impact on performance
+  // if it turns out we need to seek immutable often. We probably want to
+  // have an option to turn it off.
+  if (seek_to_first || NeedToSeekImmutable(internal_key)) {
+    {
+      auto tmp = MinIterHeap(MinIterComparator(&cfd_->internal_comparator()));
+      immutable_min_heap_.swap(tmp);
+    }
+    for (auto* m : imm_iters_) {
+      seek_to_first ? m->SeekToFirst() : m->Seek(internal_key);
+      if (m->Valid()) {
+        immutable_min_heap_.push(m);
+      }
+    }
+
+    auto* files = sv_->current->files_;
+    for (uint32_t i = 0; i < files[0].size(); ++i) {
+      if (seek_to_first) {
+        l0_iters_[i]->SeekToFirst();
+      } else {
+        // If the target key is past the file's largest key, we are sure
+        // Next() won't go over this file.
+        if (user_comparator_->Compare(ExtractUserKey(internal_key),
+              files[0][i]->largest.user_key()) > 0) {
+          continue;
+        }
+        l0_iters_[i]->Seek(internal_key);
+      }
+      if (l0_iters_[i]->Valid()) {
+        immutable_min_heap_.push(l0_iters_[i]);
+      }
+    }
+    for (int32_t level = 1; level < sv_->current->NumberLevels(); ++level) {
+      if (files[level].empty()) {
+        continue;
+      }
+      assert(level_iters_[level - 1] != nullptr);
+      uint32_t f_idx = 0;
+      if (!seek_to_first) {
+        f_idx = FindFileInRange(
+            files[level], internal_key, 0, files[level].size());
+      }
+      if (f_idx < files[level].size()) {
+        level_iters_[level - 1]->SetFileIndex(f_idx);
+        seek_to_first ? level_iters_[level - 1]->SeekToFirst() :
+                        level_iters_[level - 1]->Seek(internal_key);
+        if (level_iters_[level - 1]->Valid()) {
+          immutable_min_heap_.push(level_iters_[level - 1]);
+        }
+      }
+    }
+
+    if (seek_to_first || immutable_min_heap_.empty()) {
+      is_prev_set_ = false;
+    } else {
+      prev_key_.SetKey(internal_key);
+      is_prev_set_ = true;
+    }
+  }
+
+  UpdateCurrent();
+}
+// Advances to the next entry, re-seeking first if the SuperVersion changed.
+void ForwardIterator::Next() {
+  assert(valid_);
+
+  if (sv_ == nullptr ||
+      sv_->version_number != cfd_->GetSuperVersionNumber()) {
+    std::string current_key = key().ToString();
+    Slice old_key(current_key.data(), current_key.size());
+
+    RebuildIterators();
+    SeekInternal(old_key, false);
+    if (!valid_ || key().compare(old_key) != 0) {
+      return;  // landed beyond old_key: that entry already is the next one
+    }
+  } else if (current_ != mutable_iter_) {
+    // It is going to advance immutable iterator
+    prev_key_.SetKey(current_->key());
+    is_prev_set_ = true;
+  }
+
+  current_->Next();
+  if (current_->Valid() && current_ != mutable_iter_) {
+    immutable_min_heap_.push(current_);
+  }
+  UpdateCurrent();
+}
+
+Slice ForwardIterator::key() const {
+  assert(valid_);
+  return current_->key();
+}
+
+Slice ForwardIterator::value() const {
+  assert(valid_);
+  return current_->value();
+}
+// Returns status_ if set, else the mutable iterator's status (if any).
+Status ForwardIterator::status() const {
+  if (!status_.ok()) {
+    return status_;
+  } else if (mutable_iter_ != nullptr && !mutable_iter_->status().ok()) {
+    return mutable_iter_->status();
+  }
+  return Status::OK();
+}
+// Rebuilds every child iterator against a freshly referenced SuperVersion.
+void ForwardIterator::RebuildIterators() {
+  // Drop the iterators and SuperVersion reference from the previous build.
+  Cleanup();
+  // Take a new reference and create one iterator per source.
+  sv_ = cfd_->GetReferencedSuperVersion(&(db_->mutex_));
+  mutable_iter_ = sv_->mem->NewIterator(read_options_);
+  sv_->imm->AddIterators(read_options_, &imm_iters_);
+  const auto& l0_files = sv_->current->files_[0];
+  l0_iters_.reserve(l0_files.size());
+  for (const auto* l0 : l0_files) {
+    l0_iters_.push_back(cfd_->table_cache()->NewIterator(
+        read_options_, *cfd_->soptions(), cfd_->internal_comparator(), *l0));
+  }
+  level_iters_.reserve(sv_->current->NumberLevels() - 1);
+  for (int32_t level = 1; level < sv_->current->NumberLevels(); ++level) {
+    if (sv_->current->files_[level].empty()) {
+      level_iters_.push_back(nullptr);  // keeps index == level - 1
+    } else {
+      level_iters_.push_back(new LevelIterator(cfd_, read_options_,
+          sv_->current->files_[level]));
+    }
+  }
+
+  current_ = nullptr;
+  is_prev_set_ = false;
+}
+// Picks current_: the smaller of the mutable iterator and the heap top.
+void ForwardIterator::UpdateCurrent() {
+  if (immutable_min_heap_.empty() && !mutable_iter_->Valid()) {
+    current_ = nullptr;
+  } else if (immutable_min_heap_.empty()) {
+    current_ = mutable_iter_;
+  } else if (!mutable_iter_->Valid()) {
+    current_ = immutable_min_heap_.top();
+    immutable_min_heap_.pop();  // Next() re-pushes it while still valid
+  } else {
+    current_ = immutable_min_heap_.top();
+    assert(current_ != nullptr);
+    assert(current_->Valid());
+    int cmp = cfd_->internal_comparator().InternalKeyComparator::Compare(
+        mutable_iter_->key(), current_->key());
+    assert(cmp != 0);  // keep the three-way result; a bool would trip here
+    if (cmp > 0) {
+      immutable_min_heap_.pop();
+    } else {
+      current_ = mutable_iter_;
+    }
+  }
+  valid_ = (current_ != nullptr);
+  if (!status_.ok()) {
+    status_ = Status::OK();
+  }
+}
+// Returns false only if target lies in (prev_key_, smallest immutable key].
+bool ForwardIterator::NeedToSeekImmutable(const Slice& target) {
+  if (!is_prev_set_) {
+    return true;
+  }
+  Slice prev_key = prev_key_.GetKey();
+  if (prefix_extractor_ && prefix_extractor_->Transform(target).compare(
+    prefix_extractor_->Transform(prev_key)) != 0) {
+    return true;
+  }
+  if (cfd_->internal_comparator().InternalKeyComparator::Compare(
+        prev_key, target) >= 0) {
+    return true;
+  }
+  if (immutable_min_heap_.empty() ||
+      cfd_->internal_comparator().InternalKeyComparator::Compare(
+          target, current_ == mutable_iter_ ? immutable_min_heap_.top()->key()
+                                            : current_->key()) > 0) {
+    return true;
+  }
+  return false;
+}
+// Binary search for the first file in [left, right) with largest >= key.
+uint32_t ForwardIterator::FindFileInRange(
+    const std::vector<FileMetaData*>& files, const Slice& internal_key,
+    uint32_t left, uint32_t right) {
+  while (left < right) {
+    uint32_t mid = (left + right) / 2;
+    const FileMetaData* f = files[mid];
+    if (cfd_->internal_comparator().InternalKeyComparator::Compare(
+          f->largest.Encode(), internal_key) < 0) {
+      // Key at "mid.largest" is < "target". Therefore all
+      // files at or before "mid" are uninteresting.
+      left = mid + 1;
+    } else {
+      // Key at "mid.largest" is >= "target". Therefore all files
+      // after "mid" are uninteresting.
+      right = mid;
+    }
+  }
+  return right;
+}
+
+}  // namespace rocksdb
+
+#endif  // ROCKSDB_LITE
diff --git a/db/forward_iterator.h b/db/forward_iterator.h
new file mode 100644
index 000000000..754a69947
--- /dev/null
+++ b/db/forward_iterator.h
@@ -0,0 +1,106 @@
+// Copyright (c) 2013, Facebook, Inc. All rights reserved.
+// This source code is licensed under the BSD-style license found in the
+// LICENSE file in the root directory of this source tree. An additional grant
+// of patent rights can be found in the PATENTS file in the same directory.
+#pragma once
+
+#ifndef ROCKSDB_LITE
+
+#include <queue>
+#include <string>
+#include <vector>
+
+#include "rocksdb/db.h"
+#include "rocksdb/iterator.h"
+#include "rocksdb/options.h"
+#include "db/dbformat.h"
+
+namespace rocksdb {
+
+class DBImpl;
+class Env;
+struct SuperVersion;
+class ColumnFamilyData;
+class LevelIterator;
+class FileMetaData;  // NOTE(review): confirm class-key matches definition
+// Orders iterators so the priority_queue top has the smallest key().
+class MinIterComparator {
+ public:
+  explicit MinIterComparator(const Comparator* comparator) :
+    comparator_(comparator) {}
+
+  bool operator()(Iterator* a, Iterator* b) const {
+    return comparator_->Compare(a->key(), b->key()) > 0;
+  }
+ private:
+  const Comparator* comparator_;
+};
+
+typedef std::priority_queue<Iterator*,
+          std::vector<Iterator*>,
+          MinIterComparator> MinIterHeap;
+
+/**
+ * ForwardIterator is a special type of iterator that only supports Seek(),
+ * SeekToFirst() and Next(). It is expected to perform better than
+ * TailingIterator by removing the encapsulation and making all information
+ * accessible within the iterator. In the current implementation, a snapshot
+ * is taken when Seek() is called; later Next() calls do not see newer values.
+ */
+class ForwardIterator : public Iterator {
+ public:
+  ForwardIterator(Env* const env, DBImpl* db, const ReadOptions& read_options,
+                  ColumnFamilyData* cfd);
+  virtual ~ForwardIterator();
+
+  void SeekToLast() override {
+    status_ = Status::NotSupported("ForwardIterator::SeekToLast()");
+    valid_ = false;
+  }
+  void Prev() override {
+    status_ = Status::NotSupported("ForwardIterator::Prev");
+    valid_ = false;
+  }
+
+  virtual bool Valid() const override;
+  void SeekToFirst() override;
+  virtual void Seek(const Slice& target) override;
+  virtual void Next() override;
+  virtual Slice key() const override;
+  virtual Slice value() const override;
+  virtual Status status() const override;
+
+ private:
+  void Cleanup();
+  void RebuildIterators();
+  void SeekInternal(const Slice& internal_key, bool seek_to_first);
+  void UpdateCurrent();
+  bool NeedToSeekImmutable(const Slice& internal_key);
+  uint32_t FindFileInRange(
+    const std::vector<FileMetaData*>& files, const Slice& internal_key,
+    uint32_t left, uint32_t right);
+
+  DBImpl* const db_;
+  Env* const env_;
+  const ReadOptions read_options_;
+  ColumnFamilyData* const cfd_;
+  const SliceTransform* const prefix_extractor_;
+  const Comparator* user_comparator_;
+  MinIterHeap immutable_min_heap_;  // valid immutable sources, min key on top
+
+  SuperVersion* sv_;  // referenced; backs all child iterators
+  Iterator* mutable_iter_;  // over sv_->mem
+  std::vector<Iterator*> imm_iters_;  // over sv_->imm
+  std::vector<Iterator*> l0_iters_;  // one per level-0 file
+  std::vector<LevelIterator*> level_iters_;  // index level - 1; may be nullptr
+  Iterator* current_;  // source of key()/value(); nullptr when invalid
+  // internal iterator status
+  Status status_;
+  bool valid_;
+
+  IterKey prev_key_;  // lower bound consulted by NeedToSeekImmutable()
+  bool is_prev_set_;  // whether prev_key_ is meaningful
+};
+
+}  // namespace rocksdb
+#endif  // ROCKSDB_LITE
diff --git a/db/version_set.h b/db/version_set.h
index ffadb5813..852c0323b 100644
--- a/db/version_set.h
+++ b/db/version_set.h
@@ -218,6 +218,7 @@ class Version {
   friend class LevelCompactionPicker;
   friend class UniversalCompactionPicker;
   friend class FIFOCompactionPicker;
+  friend class ForwardIterator;
 
   class LevelFileNumIterator;
   class LevelFileIteratorState;
diff --git a/table/iterator_wrapper.h b/table/iterator_wrapper.h
index 6793bfa0a..117c9fdc4 100644
--- a/table/iterator_wrapper.h
+++ b/table/iterator_wrapper.h
@@ -8,6 +8,9 @@
 // found in the LICENSE file. See the AUTHORS file for names of contributors.
 
 #pragma once
+
+#include "rocksdb/iterator.h"
+
 namespace rocksdb {
 
 // A internal wrapper class with an interface similar to Iterator that