diff --git a/db/db_impl.cc b/db/db_impl.cc index 35517c22b..ba8e85b8a 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -17,6 +17,7 @@ #include #include #include +#include #include #include "db/builder.h" @@ -32,6 +33,7 @@ #include "db/prefix_filter_iterator.h" #include "db/table_cache.h" #include "db/table_properties_collector.h" +#include "db/tailing_iter.h" #include "db/transaction_log_impl.h" #include "db/version_set.h" #include "db/write_batch_internal.h" @@ -264,6 +266,7 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname) mem_(new MemTable(internal_comparator_, options_)), logfile_number_(0), super_version_(nullptr), + super_version_number_(0), tmp_batch_(), bg_compaction_scheduled_(0), bg_manual_only_(0), @@ -1413,6 +1416,10 @@ int DBImpl::Level0StopWriteTrigger() { return options_.level0_stop_writes_trigger; } +uint64_t DBImpl::CurrentVersionNumber() const { + return super_version_number_.load(); +} + Status DBImpl::Flush(const FlushOptions& options) { Status status = FlushMemTable(options); return status; @@ -2652,11 +2659,14 @@ static void CleanupIteratorState(void* arg1, void* arg2) { deletion_state.memtables_to_free.push_back(m); } } - state->version->Unref(); + if (state->version) { // not set for memtable-only iterator + state->version->Unref(); + } // fast path FindObsoleteFiles state->db->FindObsoleteFiles(deletion_state, false, true); state->mu->Unlock(); state->db->PurgeObsoleteFiles(deletion_state); + delete state; } } // namespace @@ -2678,7 +2688,6 @@ Iterator* DBImpl::NewInternalIterator(const ReadOptions& options, for (unsigned int i = 0; i < immutables.size(); i++) { immutables[i]->Ref(); } - // Collect iterators for files in L0 - Ln versions_->current()->Ref(); version = versions_->current(); mutex_.Unlock(); @@ -2686,10 +2695,13 @@ Iterator* DBImpl::NewInternalIterator(const ReadOptions& options, std::vector list; list.push_back(mutable_mem->NewIterator(options)); cleanup->mem.push_back(mutable_mem); + + // Collect all needed child iterators for immutable memtables for (MemTable* m : immutables) { list.push_back(m->NewIterator(options)); cleanup->mem.push_back(m); } + // Collect iterators for files in L0 - Ln version->AddIterators(options, storage_options_, &list); Iterator* internal_iter = NewMergingIterator(&internal_comparator_, &list[0], list.size()); @@ -2706,6 +2718,66 @@ Iterator* DBImpl::TEST_NewInternalIterator() { return NewInternalIterator(ReadOptions(), &ignored); } +std::pair DBImpl::GetTailingIteratorPair( + const ReadOptions& options, + uint64_t* superversion_number) { + + MemTable* mutable_mem; + std::vector immutables; + Version* version; + + immutables.reserve(options_.max_write_buffer_number); + + // get all child iterators and bump their refcounts under lock + mutex_.Lock(); + mutable_mem = mem_; + mutable_mem->Ref(); + imm_.GetMemTables(&immutables); + for (size_t i = 0; i < immutables.size(); ++i) { + immutables[i]->Ref(); + } + version = versions_->current(); + version->Ref(); + if (superversion_number != nullptr) { + *superversion_number = CurrentVersionNumber(); + } + mutex_.Unlock(); + + Iterator* mutable_iter = mutable_mem->NewIterator(options); + IterState* mutable_cleanup = new IterState(); + mutable_cleanup->mem.push_back(mutable_mem); + mutable_cleanup->db = this; + mutable_cleanup->mu = &mutex_; + mutable_iter->RegisterCleanup(CleanupIteratorState, mutable_cleanup, nullptr); + + // create a DBIter that only uses memtable content; see NewIterator() + mutable_iter = NewDBIterator(&dbname_, env_, options_, user_comparator(), + mutable_iter, kMaxSequenceNumber); + + Iterator* immutable_iter; + IterState* immutable_cleanup = new IterState(); + std::vector list; + for (MemTable* m : immutables) { + list.push_back(m->NewIterator(options)); + immutable_cleanup->mem.push_back(m); + } + version->AddIterators(options, storage_options_, &list); + immutable_cleanup->version = version; + immutable_cleanup->db = this; + immutable_cleanup->mu = &mutex_; + + immutable_iter = + NewMergingIterator(&internal_comparator_, &list[0], list.size()); + immutable_iter->RegisterCleanup(CleanupIteratorState, immutable_cleanup, + nullptr); + + // create a DBIter that only uses memtable content; see NewIterator() + immutable_iter = NewDBIterator(&dbname_, env_, options_, user_comparator(), + immutable_iter, kMaxSequenceNumber); + + return std::make_pair(mutable_iter, immutable_iter); +} + int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes() { MutexLock l(&mutex_); return versions_->current()->MaxNextLevelOverlappingBytes(); @@ -2748,6 +2820,7 @@ DBImpl::SuperVersion* DBImpl::InstallSuperVersion( new_superversion->Init(mem_, imm_, versions_->current()); SuperVersion* old_superversion = super_version_; super_version_ = new_superversion; + ++super_version_number_; if (old_superversion != nullptr && old_superversion->Unref()) { old_superversion->Cleanup(); return old_superversion; // will let caller delete outside of mutex @@ -2930,13 +3003,21 @@ bool DBImpl::KeyMayExist(const ReadOptions& options, } Iterator* DBImpl::NewIterator(const ReadOptions& options) { - SequenceNumber latest_snapshot; - Iterator* iter = NewInternalIterator(options, &latest_snapshot); - iter = NewDBIterator( - &dbname_, env_, options_, user_comparator(), iter, - (options.snapshot != nullptr - ? reinterpret_cast(options.snapshot)->number_ - : latest_snapshot)); + Iterator* iter; + + if (options.tailing) { + iter = new TailingIterator(this, options, user_comparator()); + } else { + SequenceNumber latest_snapshot; + iter = NewInternalIterator(options, &latest_snapshot); + + iter = NewDBIterator( + &dbname_, env_, options_, user_comparator(), iter, + (options.snapshot != nullptr + ? reinterpret_cast(options.snapshot)->number_ + : latest_snapshot)); + } + if (options.prefix) { // use extra wrapper to exclude any keys from the results which // don't begin with the prefix diff --git a/db/db_impl.h b/db/db_impl.h index da888e4b4..f1a25aa4b 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -10,6 +10,7 @@ #include #include #include +#include #include #include "db/dbformat.h" #include "db/log_writer.h" @@ -256,6 +257,7 @@ class DBImpl : public DB { private: friend class DB; + friend class TailingIterator; struct CompactionState; struct Writer; @@ -359,6 +361,17 @@ class DBImpl : public DB { // hold the data set. void ReFitLevel(int level, int target_level = -1); + // Returns the current SuperVersion number. + uint64_t CurrentVersionNumber() const; + + // Returns a pair of iterators (mutable-only and immutable-only) used + // internally by TailingIterator and stores CurrentVersionNumber() in + // *superversion_number. These iterators are always up-to-date, i.e. can + // be used to read new data. + std::pair GetTailingIteratorPair( + const ReadOptions& options, + uint64_t* superversion_number); + // Constant after construction const InternalFilterPolicy internal_filter_policy_; bool owns_info_log_; @@ -381,6 +394,11 @@ class DBImpl : public DB { SuperVersion* super_version_; + // An ordinal representing the current SuperVersion. Updated by + // InstallSuperVersion(), i.e. incremented every time super_version_ + // changes. + std::atomic super_version_number_; + std::string host_name_; // Queue of writers. diff --git a/db/db_test.cc b/db/db_test.cc index 2445d585a..0ecd8cf40 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -5034,6 +5034,118 @@ void BM_LogAndApply(int iters, int num_base_files) { buf, iters, us, ((float)us) / iters); } +TEST(DBTest, TailingIteratorSingle) { + ReadOptions read_options; + read_options.tailing = true; + + std::unique_ptr iter(db_->NewIterator(read_options)); + iter->SeekToFirst(); + ASSERT_TRUE(!iter->Valid()); + + // add a record and check that iter can see it + ASSERT_OK(db_->Put(WriteOptions(), "mirko", "fodor")); + iter->SeekToFirst(); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(iter->key().ToString(), "mirko"); + + iter->Next(); + ASSERT_TRUE(!iter->Valid()); +} + +TEST(DBTest, TailingIteratorKeepAdding) { + ReadOptions read_options; + read_options.tailing = true; + + std::unique_ptr iter(db_->NewIterator(read_options)); + std::string value(1024, 'a'); + + const int num_records = 10000; + for (int i = 0; i < num_records; ++i) { + char buf[32]; + snprintf(buf, sizeof(buf), "%016d", i); + + Slice key(buf, 16); + ASSERT_OK(db_->Put(WriteOptions(), key, value)); + + iter->Seek(key); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(iter->key().compare(key), 0); + } +} + +TEST(DBTest, TailingIteratorDeletes) { + ReadOptions read_options; + read_options.tailing = true; + + std::unique_ptr iter(db_->NewIterator(read_options)); + + // write a single record, read it using the iterator, then delete it + ASSERT_OK(db_->Put(WriteOptions(), "0test", "test")); + iter->SeekToFirst(); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(iter->key().ToString(), "0test"); + ASSERT_OK(db_->Delete(WriteOptions(), "0test")); + + // write many more records + const int num_records = 10000; + std::string value(1024, 'A'); + + for (int i = 0; i < num_records; ++i) { + char buf[32]; + snprintf(buf, sizeof(buf), "1%015d", i); + + Slice key(buf, 16); + ASSERT_OK(db_->Put(WriteOptions(), key, value)); + } + + // force a flush to make sure that no records are read from memtable + dbfull()->TEST_FlushMemTable(); + + // skip "0test" + iter->Next(); + + // make sure we can read all new records using the existing iterator + int count = 0; + for (; iter->Valid(); iter->Next(), ++count) ; + + ASSERT_EQ(count, num_records); +} + +TEST(DBTest, TailingIteratorPrefixSeek) { + ReadOptions read_options; + read_options.tailing = true; + read_options.prefix_seek = true; + + auto prefix_extractor = NewFixedPrefixTransform(2); + + Options options = CurrentOptions(); + options.env = env_; + options.create_if_missing = true; + options.disable_auto_compactions = true; + options.prefix_extractor = prefix_extractor; + options.memtable_factory.reset(NewHashSkipListRepFactory(prefix_extractor)); + DestroyAndReopen(&options); + + std::unique_ptr iter(db_->NewIterator(read_options)); + ASSERT_OK(db_->Put(WriteOptions(), "0101", "test")); + + dbfull()->TEST_FlushMemTable(); + + ASSERT_OK(db_->Put(WriteOptions(), "0202", "test")); + + // Seek(0102) shouldn't find any records since 0202 has a different prefix + iter->Seek("0102"); + ASSERT_TRUE(!iter->Valid()); + + iter->Seek("0202"); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(iter->key().ToString(), "0202"); + + iter->Next(); + ASSERT_TRUE(!iter->Valid()); +} + + } // namespace rocksdb int main(int argc, char** argv) { diff --git a/db/tailing_iter.cc b/db/tailing_iter.cc new file mode 100644 index 000000000..f448ac15d --- /dev/null +++ b/db/tailing_iter.cc @@ -0,0 +1,173 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#include "db/tailing_iter.h" + +#include +#include +#include "db/db_impl.h" + +namespace rocksdb { + +TailingIterator::TailingIterator(DBImpl* db, const ReadOptions& options, + const Comparator* comparator) + : db_(db), options_(options), comparator_(comparator), + version_number_(0), current_(nullptr), + status_(Status::InvalidArgument("Seek() not called on this iterator")) {} + +bool TailingIterator::Valid() const { + return current_ != nullptr; +} + +void TailingIterator::SeekToFirst() { + if (!IsCurrentVersion()) { + CreateIterators(); + } + + mutable_->SeekToFirst(); + immutable_->SeekToFirst(); + UpdateCurrent(); +} + +void TailingIterator::Seek(const Slice& target) { + if (!IsCurrentVersion()) { + CreateIterators(); + } + + mutable_->Seek(target); + + // We maintain the interval (prev_key_, immutable_->key()] such that there + // are no records with keys within that range in immutable_ other than + // immutable_->key(). Since immutable_ can't change in this version, we don't + // need to do a seek if 'target' belongs to that interval (i.e. immutable_ is + // already at the correct position)! + // + // If options.prefix_seek is used and immutable_ is not valid, seek if target + // has a different prefix than prev_key. + // + // prev_key_ is updated by Next(). SeekImmutable() sets prev_key_ to + // 'target' -- in this case, prev_key_ is included in the interval, so + // prev_inclusive_ has to be set. + + if (!is_prev_set_ || + comparator_->Compare(prev_key_, target) >= !is_prev_inclusive_ || + (immutable_->Valid() && + comparator_->Compare(target, immutable_->key()) > 0) || + (options_.prefix_seek && !IsSamePrefix(target))) { + SeekImmutable(target); + } + + UpdateCurrent(); +} + +void TailingIterator::Next() { + assert(Valid()); + + if (!IsCurrentVersion()) { + // save the current key, create new iterators and then seek + std::string current_key = key().ToString(); + Slice key_slice(current_key.data(), current_key.size()); + + CreateIterators(); + Seek(key_slice); + + if (!Valid() || key().compare(key_slice) != 0) { + // record with current_key no longer exists + return; + } + + } else if (current_ == immutable_.get()) { + // immutable iterator is advanced -- update prev_key_ + prev_key_ = key().ToString(); + is_prev_inclusive_ = false; + is_prev_set_ = true; + } + + current_->Next(); + UpdateCurrent(); +} + +Slice TailingIterator::key() const { + assert(Valid()); + return current_->key(); +} + +Slice TailingIterator::value() const { + assert(Valid()); + return current_->value(); +} + +Status TailingIterator::status() const { + if (!status_.ok()) { + return status_; + } else if (!mutable_->status().ok()) { + return mutable_->status(); + } else { + return immutable_->status(); + } +} + +void TailingIterator::Prev() { + status_ = Status::NotSupported("This iterator doesn't support Prev()"); +} + +void TailingIterator::SeekToLast() { + status_ = Status::NotSupported("This iterator doesn't support SeekToLast()"); +} + +void TailingIterator::CreateIterators() { + std::pair iters = + db_->GetTailingIteratorPair(options_, &version_number_); + + assert(iters.first && iters.second); + + mutable_.reset(iters.first); + immutable_.reset(iters.second); + current_ = nullptr; + is_prev_set_ = false; +} + +void TailingIterator::UpdateCurrent() { + current_ = nullptr; + + if (mutable_->Valid()) { + current_ = mutable_.get(); + } + if (immutable_->Valid() && + (current_ == nullptr || + comparator_->Compare(immutable_->key(), current_->key()) < 0)) { + current_ = immutable_.get(); + } + + if (!status_.ok()) { + // reset status that was set by Prev() or SeekToLast() + status_ = Status::OK(); + } +} + +bool TailingIterator::IsCurrentVersion() const { + return mutable_ != nullptr && immutable_ != nullptr && + version_number_ == db_->CurrentVersionNumber(); +} + +bool TailingIterator::IsSamePrefix(const Slice& target) const { + const SliceTransform* extractor = db_->options_.prefix_extractor; + + assert(extractor); + assert(is_prev_set_); + + return extractor->Transform(target) + .compare(extractor->Transform(prev_key_)) == 0; +} + +void TailingIterator::SeekImmutable(const Slice& target) { + prev_key_ = target.ToString(); + is_prev_inclusive_ = true; + is_prev_set_ = true; + + immutable_->Seek(target); +} + +} // namespace rocksdb diff --git a/db/tailing_iter.h b/db/tailing_iter.h new file mode 100644 index 000000000..3b8343a28 --- /dev/null +++ b/db/tailing_iter.h @@ -0,0 +1,88 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +#pragma once + +#include + +#include "rocksdb/db.h" +#include "rocksdb/iterator.h" +#include "rocksdb/options.h" + +namespace rocksdb { + +class DBImpl; + +/** + * TailingIterator is a special type of iterator that doesn't use an (implicit) + * snapshot. In other words, it can be used to read data that was added to the + * db after the iterator had been created. + * + * TailingIterator is optimized for sequential reading. It doesn't support + * Prev() and SeekToLast() operations. + */ +class TailingIterator : public Iterator { + public: + TailingIterator(DBImpl* db, const ReadOptions& options, + const Comparator* comparator); + virtual ~TailingIterator() {} + + virtual bool Valid() const override; + virtual void SeekToFirst() override; + virtual void SeekToLast() override; + virtual void Seek(const Slice& target) override; + virtual void Next() override; + virtual void Prev() override; + virtual Slice key() const override; + virtual Slice value() const override; + virtual Status status() const override; + + private: + DBImpl* const db_; + const ReadOptions options_; + const Comparator* const comparator_; + uint64_t version_number_; + + // TailingIterator merges the contents of the two iterators below (one using + // mutable memtable contents only, other over SSTs and immutable memtables). + // See DBIter::GetTailingIteratorPair(). + std::unique_ptr mutable_; + std::unique_ptr immutable_; + + // points to either mutable_ or immutable_ + Iterator* current_; + + // key that precedes immutable iterator's current key + std::string prev_key_; + + // unless prev_set is true, prev_key/prev_head is not valid and shouldn't be + // used; reset by createIterators() + bool is_prev_set_; + + // prev_key_ was set by SeekImmutable(), which means that the interval of + // keys covered by immutable_ is [prev_key_, current], i.e. it includes the + // left endpoint + bool is_prev_inclusive_; + + // internal iterator status + Status status_; + + // check if this iterator's version matches DB's version + bool IsCurrentVersion() const; + + // check if SeekImmutable() is needed due to target having a different prefix + // than prev_key_ (used when options.prefix_seek is set) + bool IsSamePrefix(const Slice& target) const; + + // creates mutable_ and immutable_ iterators and updates version_number_ + void CreateIterators(); + + // set current_ to be one of the iterators with the smallest key + void UpdateCurrent(); + + // seek on immutable_ and update prev_key + void SeekImmutable(const Slice& target); +}; + +} // namespace rocksdb diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index b84bdcf38..96eadcd5a 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -711,20 +711,27 @@ struct ReadOptions { // Default: kReadAllTier ReadTier read_tier; + // Specify to create a tailing iterator -- a special iterator that has a + // view of the complete database (i.e. it can also be used to read newly + // added data) and is optimized for sequential reads. + bool tailing; + ReadOptions() : verify_checksums(false), fill_cache(true), prefix_seek(false), snapshot(nullptr), prefix(nullptr), - read_tier(kReadAllTier) {} + read_tier(kReadAllTier), + tailing(false) {} ReadOptions(bool cksum, bool cache) : verify_checksums(cksum), fill_cache(cache), prefix_seek(false), snapshot(nullptr), prefix(nullptr), - read_tier(kReadAllTier) {} + read_tier(kReadAllTier), + tailing(false) {} }; // Options that control write operations