forward iterator

Summary:
Forward iterator puts everything together in a flat structure instead of
a hierarchy of nested iterators. this should simplify the code and
provide better performance. It also enables more optimization since all
information are accessiable in one place.
Init evaluation shows about 6% improvement

Test Plan: db_test and db_bench

Reviewers: dhruba, igor, tnovak, sdong, haobo

Reviewed By: haobo

Subscribers: sdong, leveldb

Differential Revision: https://reviews.facebook.net/D18795
main
Lei Jin 11 years ago
parent f29c62fc6f
commit 388d2054c7
  1. 2
      db/column_family.h
  2. 7
      db/db_bench.cc
  3. 9
      db/db_impl.cc
  4. 1
      db/db_impl.h
  5. 10
      db/db_iter.cc
  6. 6
      db/dbformat.h
  7. 384
      db/forward_iterator.cc
  8. 106
      db/forward_iterator.h
  9. 1
      db/version_set.h
  10. 3
      table/iterator_wrapper.h

@ -180,7 +180,7 @@ class ColumnFamilyData {
void SetCurrent(Version* current); void SetCurrent(Version* current);
void CreateNewMemtable(); void CreateNewMemtable();
TableCache* table_cache() { return table_cache_.get(); } TableCache* table_cache() const { return table_cache_.get(); }
// See documentation in compaction_picker.h // See documentation in compaction_picker.h
Compaction* PickCompaction(LogBuffer* log_buffer); Compaction* PickCompaction(LogBuffer* log_buffer);

@ -1928,7 +1928,7 @@ class Benchmark {
} }
char msg[100]; char msg[100];
snprintf(msg, sizeof(msg), "(%" PRIu64 " of %" PRIu64 " found)", snprintf(msg, sizeof(msg), "(%" PRIu64 " of %" PRIu64 " found)\n",
found, read); found, read);
thread->stats.AddMessage(msg); thread->stats.AddMessage(msg);
@ -2056,9 +2056,12 @@ class Benchmark {
} }
char msg[100]; char msg[100];
snprintf(msg, sizeof(msg), "(%" PRIu64 " of %" PRIu64 " found)", snprintf(msg, sizeof(msg), "(%" PRIu64 " of %" PRIu64 " found)\n",
found, read); found, read);
thread->stats.AddMessage(msg); thread->stats.AddMessage(msg);
if (FLAGS_perf_level > 0) {
thread->stats.AddMessage(perf_context.ToString());
}
} }
void SeekRandomWhileWriting(ThreadState* thread) { void SeekRandomWhileWriting(ThreadState* thread) {

@ -36,6 +36,7 @@
#include "db/table_cache.h" #include "db/table_cache.h"
#include "db/table_properties_collector.h" #include "db/table_properties_collector.h"
#include "db/tailing_iter.h" #include "db/tailing_iter.h"
#include "db/forward_iterator.h"
#include "db/transaction_log_impl.h" #include "db/transaction_log_impl.h"
#include "db/version_set.h" #include "db/version_set.h"
#include "db/write_batch_internal.h" #include "db/write_batch_internal.h"
@ -2578,7 +2579,7 @@ Status DBImpl::ProcessKeyValueCompaction(
cfd->user_comparator()->Compare(ikey.user_key, cfd->user_comparator()->Compare(ikey.user_key,
current_user_key.GetKey()) != 0) { current_user_key.GetKey()) != 0) {
// First occurrence of this user key // First occurrence of this user key
current_user_key.SetUserKey(ikey.user_key); current_user_key.SetKey(ikey.user_key);
has_current_user_key = true; has_current_user_key = true;
last_sequence_for_key = kMaxSequenceNumber; last_sequence_for_key = kMaxSequenceNumber;
visible_in_snapshot = kMaxSequenceNumber; visible_in_snapshot = kMaxSequenceNumber;
@ -3538,7 +3539,11 @@ Iterator* DBImpl::NewIterator(const ReadOptions& options,
// not supported in lite version // not supported in lite version
return nullptr; return nullptr;
#else #else
iter = new TailingIterator(env_, this, options, cfd); // TODO(ljin): remove tailing iterator
iter = new ForwardIterator(env_, this, options, cfd);
iter = NewDBIterator(env_, *cfd->options(),
cfd->user_comparator(), iter, kMaxSequenceNumber);
//iter = new TailingIterator(env_, this, options, cfd);
#endif #endif
} else { } else {
SequenceNumber latest_snapshot = versions_->LastSequence(); SequenceNumber latest_snapshot = versions_->LastSequence();

@ -285,6 +285,7 @@ class DBImpl : public DB {
friend class InternalStats; friend class InternalStats;
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
friend class TailingIterator; friend class TailingIterator;
friend class ForwardIterator;
#endif #endif
friend struct SuperVersion; friend struct SuperVersion;
struct CompactionState; struct CompactionState;

@ -211,18 +211,18 @@ void DBIter::FindNextUserEntryInternal(bool skipping) {
case kTypeDeletion: case kTypeDeletion:
// Arrange to skip all upcoming entries for this key since // Arrange to skip all upcoming entries for this key since
// they are hidden by this deletion. // they are hidden by this deletion.
saved_key_.SetUserKey(ikey.user_key); saved_key_.SetKey(ikey.user_key);
skipping = true; skipping = true;
num_skipped = 0; num_skipped = 0;
PERF_COUNTER_ADD(internal_delete_skipped_count, 1); PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
break; break;
case kTypeValue: case kTypeValue:
valid_ = true; valid_ = true;
saved_key_.SetUserKey(ikey.user_key); saved_key_.SetKey(ikey.user_key);
return; return;
case kTypeMerge: case kTypeMerge:
// By now, we are sure the current ikey is going to yield a value // By now, we are sure the current ikey is going to yield a value
saved_key_.SetUserKey(ikey.user_key); saved_key_.SetKey(ikey.user_key);
current_entry_is_merged_ = true; current_entry_is_merged_ = true;
valid_ = true; valid_ = true;
MergeValuesNewToOld(); // Go to a different state machine MergeValuesNewToOld(); // Go to a different state machine
@ -331,7 +331,7 @@ void DBIter::Prev() {
// iter_ is pointing at the current entry. Scan backwards until // iter_ is pointing at the current entry. Scan backwards until
// the key changes so we can use the normal reverse scanning code. // the key changes so we can use the normal reverse scanning code.
assert(iter_->Valid()); // Otherwise valid_ would have been false assert(iter_->Valid()); // Otherwise valid_ would have been false
saved_key_.SetUserKey(ExtractUserKey(iter_->key())); saved_key_.SetKey(ExtractUserKey(iter_->key()));
while (true) { while (true) {
iter_->Prev(); iter_->Prev();
if (!iter_->Valid()) { if (!iter_->Valid()) {
@ -377,7 +377,7 @@ void DBIter::FindPrevUserEntry() {
std::string empty; std::string empty;
swap(empty, saved_value_); swap(empty, saved_value_);
} }
saved_key_.SetUserKey(ExtractUserKey(iter_->key())); saved_key_.SetKey(ExtractUserKey(iter_->key()));
saved_value_.assign(raw_value.data(), raw_value.size()); saved_value_.assign(raw_value.data(), raw_value.size());
} }
} else { } else {

@ -256,10 +256,10 @@ class IterKey {
void Clear() { key_size_ = 0; } void Clear() { key_size_ = 0; }
void SetUserKey(const Slice& user_key) { void SetKey(const Slice& key) {
size_t size = user_key.size(); size_t size = key.size();
EnlargeBufferIfNeeded(size); EnlargeBufferIfNeeded(size);
memcpy(key_, user_key.data(), size); memcpy(key_, key.data(), size);
key_size_ = size; key_size_ = size;
} }

@ -0,0 +1,384 @@
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#ifndef ROCKSDB_LITE
#include "db/forward_iterator.h"
#include <string>
#include <utility>
#include <limits>
#include "db/db_impl.h"
#include "db/db_iter.h"
#include "db/column_family.h"
#include "rocksdb/env.h"
#include "rocksdb/slice.h"
#include "rocksdb/slice_transform.h"
#include "table/merger.h"
#include "db/dbformat.h"
namespace rocksdb {
// Usage:
// LevelIterator iter;
// iter.SetFileIndex(file_index);
// iter.Seek(target);
// iter.Next()
class LevelIterator : public Iterator {
public:
LevelIterator(const ColumnFamilyData* const cfd,
const ReadOptions& read_options,
const std::vector<FileMetaData*>& files)
: cfd_(cfd), read_options_(read_options), files_(files), valid_(false),
file_index_(std::numeric_limits<uint32_t>::max()) {}
void SetFileIndex(uint32_t file_index) {
assert(file_index < files_.size());
if (file_index != file_index_) {
file_index_ = file_index;
file_iter_.reset(cfd_->table_cache()->NewIterator(
read_options_, *(cfd_->soptions()), cfd_->internal_comparator(),
*(files_[file_index_]), nullptr /* table_reader_ptr */, false));
}
valid_ = false;
}
void SeekToLast() override {
status_ = Status::NotSupported("LevelIterator::SeekToLast()");
valid_ = false;
}
void Prev() {
status_ = Status::NotSupported("LevelIterator::Prev()");
valid_ = false;
}
bool Valid() const override {
return valid_;
}
void SeekToFirst() override {
SetFileIndex(0);
file_iter_->SeekToFirst();
valid_ = file_iter_->Valid();
}
void Seek(const Slice& internal_key) override {
assert(file_iter_ != nullptr);
file_iter_->Seek(internal_key);
valid_ = file_iter_->Valid();
assert(valid_);
}
void Next() override {
assert(valid_);
file_iter_->Next();
while (!file_iter_->Valid()) {
if (file_index_ + 1 >= files_.size()) {
valid_ = false;
return;
}
SetFileIndex(file_index_ + 1);
file_iter_->SeekToFirst();
}
valid_ = file_iter_->Valid();
}
Slice key() const override {
assert(valid_);
return file_iter_->key();
}
Slice value() const override {
assert(valid_);
return file_iter_->value();
}
Status status() const override {
return status_;
}
private:
const ColumnFamilyData* const cfd_;
const ReadOptions& read_options_;
const std::vector<FileMetaData*>& files_;
bool valid_;
uint32_t file_index_;
Status status_;
std::unique_ptr<Iterator> file_iter_;
};
ForwardIterator::ForwardIterator(Env* const env, DBImpl* db,
const ReadOptions& read_options, ColumnFamilyData* cfd)
: db_(db),
env_(env),
read_options_(read_options),
cfd_(cfd),
prefix_extractor_(cfd->options()->prefix_extractor.get()),
user_comparator_(cfd->user_comparator()),
immutable_min_heap_(MinIterComparator(&cfd_->internal_comparator())),
sv_(nullptr),
mutable_iter_(nullptr),
current_(nullptr),
valid_(false),
is_prev_set_(false) {}
ForwardIterator::~ForwardIterator() {
Cleanup();
}
void ForwardIterator::Cleanup() {
delete mutable_iter_;
for (auto* m : imm_iters_) {
delete m;
}
imm_iters_.clear();
for (auto* f : l0_iters_) {
delete f;
}
l0_iters_.clear();
for (auto* l : level_iters_) {
delete l;
}
level_iters_.clear();
if (sv_ != nullptr && sv_->Unref()) {
DBImpl::DeletionState deletion_state;
db_->mutex_.Lock();
sv_->Cleanup();
db_->FindObsoleteFiles(deletion_state, false, true);
db_->mutex_.Unlock();
delete sv_;
if (deletion_state.HaveSomethingToDelete()) {
db_->PurgeObsoleteFiles(deletion_state);
}
}
}
bool ForwardIterator::Valid() const {
return valid_;
}
void ForwardIterator::SeekToFirst() {
if (sv_ == nullptr ||
sv_ ->version_number != cfd_->GetSuperVersionNumber()) {
RebuildIterators();
}
SeekInternal(Slice(), true);
}
void ForwardIterator::Seek(const Slice& internal_key) {
if (sv_ == nullptr ||
sv_ ->version_number != cfd_->GetSuperVersionNumber()) {
RebuildIterators();
}
SeekInternal(internal_key, false);
}
void ForwardIterator::SeekInternal(const Slice& internal_key,
bool seek_to_first) {
// mutable
seek_to_first ? mutable_iter_->SeekToFirst() :
mutable_iter_->Seek(internal_key);
// immutable
// TODO(ljin): NeedToSeekImmutable has negative impact on performance
// if it turns to need to seek immutable often. We probably want to have
// an option to turn it off.
if (seek_to_first || NeedToSeekImmutable(internal_key)) {
{
auto tmp = MinIterHeap(MinIterComparator(&cfd_->internal_comparator()));
immutable_min_heap_.swap(tmp);
}
for (auto* m : imm_iters_) {
seek_to_first ? m->SeekToFirst() : m->Seek(internal_key);
if (m->Valid()) {
immutable_min_heap_.push(m);
}
}
auto* files = sv_->current->files_;
for (uint32_t i = 0; i < files[0].size(); ++i) {
if (seek_to_first) {
l0_iters_[i]->SeekToFirst();
} else {
// If the target key passes over the larget key, we are sure Next()
// won't go over this file.
if (user_comparator_->Compare(ExtractUserKey(internal_key),
files[0][i]->largest.user_key()) > 0) {
continue;
}
l0_iters_[i]->Seek(internal_key);
}
if (l0_iters_[i]->Valid()) {
immutable_min_heap_.push(l0_iters_[i]);
}
}
for (int32_t level = 1; level < sv_->current->NumberLevels(); ++level) {
if (files[level].empty()) {
continue;
}
assert(level_iters_[level - 1] != nullptr);
uint32_t f_idx = 0;
if (!seek_to_first) {
f_idx = FindFileInRange(
files[level], internal_key, 0, files[level].size());
}
if (f_idx < files[level].size()) {
level_iters_[level - 1]->SetFileIndex(f_idx);
seek_to_first ? level_iters_[level - 1]->SeekToFirst() :
level_iters_[level - 1]->Seek(internal_key);
if (level_iters_[level - 1]->Valid()) {
immutable_min_heap_.push(level_iters_[level - 1]);
}
}
}
if (seek_to_first || immutable_min_heap_.empty()) {
is_prev_set_ = false;
} else {
prev_key_.SetKey(internal_key);
is_prev_set_ = true;
}
}
UpdateCurrent();
}
void ForwardIterator::Next() {
assert(valid_);
if (sv_ == nullptr ||
sv_ ->version_number != cfd_->GetSuperVersionNumber()) {
std::string current_key = key().ToString();
Slice old_key(current_key.data(), current_key.size());
RebuildIterators();
SeekInternal(old_key, false);
if (!valid_ || key().compare(old_key) != 0) {
return;
}
} else if (current_ != mutable_iter_) {
// It is going to advance immutable iterator
prev_key_.SetKey(current_->key());
is_prev_set_ = true;
}
current_->Next();
if (current_->Valid() && current_ != mutable_iter_) {
immutable_min_heap_.push(current_);
}
UpdateCurrent();
}
Slice ForwardIterator::key() const {
assert(valid_);
return current_->key();
}
Slice ForwardIterator::value() const {
assert(valid_);
return current_->value();
}
Status ForwardIterator::status() const {
if (!status_.ok()) {
return status_;
} else if (!mutable_iter_->status().ok()) {
return mutable_iter_->status();
}
return Status::OK();
}
void ForwardIterator::RebuildIterators() {
// Clean up
Cleanup();
// New
sv_ = cfd_->GetReferencedSuperVersion(&(db_->mutex_));
mutable_iter_ = sv_->mem->NewIterator(read_options_);
sv_->imm->AddIterators(read_options_, &imm_iters_);
const auto& l0_files = sv_->current->files_[0];
l0_iters_.reserve(l0_files.size());
for (const auto* l0 : l0_files) {
l0_iters_.push_back(cfd_->table_cache()->NewIterator(
read_options_, *cfd_->soptions(), cfd_->internal_comparator(), *l0));
}
level_iters_.reserve(sv_->current->NumberLevels() - 1);
for (int32_t level = 1; level < sv_->current->NumberLevels(); ++level) {
if (sv_->current->files_[level].empty()) {
level_iters_.push_back(nullptr);
} else {
level_iters_.push_back(new LevelIterator(cfd_, read_options_,
sv_->current->files_[level]));
}
}
current_ = nullptr;
is_prev_set_ = false;
}
void ForwardIterator::UpdateCurrent() {
if (immutable_min_heap_.empty() && !mutable_iter_->Valid()) {
current_ = nullptr;
} else if (immutable_min_heap_.empty()) {
current_ = mutable_iter_;
} else if (!mutable_iter_->Valid()) {
current_ = immutable_min_heap_.top();
immutable_min_heap_.pop();
} else {
current_ = immutable_min_heap_.top();
assert(current_ != nullptr);
assert(current_->Valid());
int cmp = cfd_->internal_comparator().InternalKeyComparator::Compare(
mutable_iter_->key(), current_->key()) > 0;
assert(cmp != 0);
if (cmp > 0) {
immutable_min_heap_.pop();
} else {
current_ = mutable_iter_;
}
}
valid_ = (current_ != nullptr);
if (!status_.ok()) {
status_ = Status::OK();
}
}
bool ForwardIterator::NeedToSeekImmutable(const Slice& target) {
if (!is_prev_set_) {
return true;
}
Slice prev_key = prev_key_.GetKey();
if (prefix_extractor_ && prefix_extractor_->Transform(target).compare(
prefix_extractor_->Transform(prev_key)) != 0) {
return true;
}
if (cfd_->internal_comparator().InternalKeyComparator::Compare(
prev_key, target) >= 0) {
return true;
}
if (immutable_min_heap_.empty() ||
cfd_->internal_comparator().InternalKeyComparator::Compare(
target, current_ == mutable_iter_ ? immutable_min_heap_.top()->key()
: current_->key()) > 0) {
return true;
}
return false;
}
uint32_t ForwardIterator::FindFileInRange(
const std::vector<FileMetaData*>& files, const Slice& internal_key,
uint32_t left, uint32_t right) {
while (left < right) {
uint32_t mid = (left + right) / 2;
const FileMetaData* f = files[mid];
if (cfd_->internal_comparator().InternalKeyComparator::Compare(
f->largest.Encode(), internal_key) < 0) {
// Key at "mid.largest" is < "target". Therefore all
// files at or before "mid" are uninteresting.
left = mid + 1;
} else {
// Key at "mid.largest" is >= "target". Therefore all files
// after "mid" are uninteresting.
right = mid;
}
}
return right;
}
} // namespace rocksdb
#endif // ROCKSDB_LITE

@ -0,0 +1,106 @@
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#pragma once
#ifndef ROCKSDB_LITE
#include <string>
#include <vector>
#include <queue>
#include "rocksdb/db.h"
#include "rocksdb/iterator.h"
#include "rocksdb/options.h"
#include "db/dbformat.h"
namespace rocksdb {
class DBImpl;
class Env;
struct SuperVersion;
class ColumnFamilyData;
class LevelIterator;
class FileMetaData;
class MinIterComparator {
public:
explicit MinIterComparator(const Comparator* comparator) :
comparator_(comparator) {}
bool operator()(Iterator* a, Iterator* b) {
return comparator_->Compare(a->key(), b->key()) > 0;
}
private:
const Comparator* comparator_;
};
typedef std::priority_queue<Iterator*,
std::vector<Iterator*>,
MinIterComparator> MinIterHeap;
/**
* ForwardIterator is a special type of iterator that only supports Seek()
* and Next(). It is expected to perform better than TailingIterator by
* removing the encapsulation and making all information accessible within
* the iterator. At the current implementation, snapshot is taken at the
* time Seek() is called. The Next() followed do not see new values after.
*/
class ForwardIterator : public Iterator {
public:
ForwardIterator(Env* const env, DBImpl* db, const ReadOptions& read_options,
ColumnFamilyData* cfd);
virtual ~ForwardIterator();
void SeekToLast() override {
status_ = Status::NotSupported("ForwardIterator::SeekToLast()");
valid_ = false;
}
void Prev() {
status_ = Status::NotSupported("ForwardIterator::Prev");
valid_ = false;
}
virtual bool Valid() const override;
void SeekToFirst() override;
virtual void Seek(const Slice& target) override;
virtual void Next() override;
virtual Slice key() const override;
virtual Slice value() const override;
virtual Status status() const override;
private:
void Cleanup();
void RebuildIterators();
void SeekInternal(const Slice& internal_key, bool seek_to_first);
void UpdateCurrent();
bool NeedToSeekImmutable(const Slice& internal_key);
uint32_t FindFileInRange(
const std::vector<FileMetaData*>& files, const Slice& internal_key,
uint32_t left, uint32_t right);
DBImpl* const db_;
Env* const env_;
const ReadOptions read_options_;
ColumnFamilyData* const cfd_;
const SliceTransform* const prefix_extractor_;
const Comparator* user_comparator_;
MinIterHeap immutable_min_heap_;
SuperVersion* sv_;
Iterator* mutable_iter_;
std::vector<Iterator*> imm_iters_;
std::vector<Iterator*> l0_iters_;
std::vector<LevelIterator*> level_iters_;
Iterator* current_;
// internal iterator status
Status status_;
bool valid_;
IterKey prev_key_;
bool is_prev_set_;
};
} // namespace rocksdb
#endif // ROCKSDB_LITE

@ -218,6 +218,7 @@ class Version {
friend class LevelCompactionPicker; friend class LevelCompactionPicker;
friend class UniversalCompactionPicker; friend class UniversalCompactionPicker;
friend class FIFOCompactionPicker; friend class FIFOCompactionPicker;
friend class ForwardIterator;
class LevelFileNumIterator; class LevelFileNumIterator;
class LevelFileIteratorState; class LevelFileIteratorState;

@ -8,6 +8,9 @@
// found in the LICENSE file. See the AUTHORS file for names of contributors. // found in the LICENSE file. See the AUTHORS file for names of contributors.
#pragma once #pragma once
#include "rocksdb/iterator.h"
namespace rocksdb { namespace rocksdb {
// A internal wrapper class with an interface similar to Iterator that // A internal wrapper class with an interface similar to Iterator that

Loading…
Cancel
Save