From 7d817268b989af6ccd0b273e47765be9c4619e98 Mon Sep 17 00:00:00 2001 From: Venkatesh Radhakrishnan Date: Wed, 18 Feb 2015 11:49:31 -0800 Subject: [PATCH] Managed iterator Summary: This is a diff for managed iterator. A managed iterator is a wrapper around an iterator which saves the options for that iterator as well as the current key/value so that the underlying iterator and its associated memory can be released when it is aged out automatically or on the request of the user. Will provide the automatic release as a follow-up diff. Test Plan: Managed* tests in db_test and XF tests for managed iterator Reviewers: igor, yhchiang, anthony, sdong Reviewed By: sdong Subscribers: dhruba, leveldb Differential Revision: https://reviews.facebook.net/D31401 --- db/db_impl.cc | 43 +++++- db/db_test.cc | 297 ++++++++++++++++++++++++++++++++++++++ db/managed_iterator.cc | 255 ++++++++++++++++++++++++++++++++ db/managed_iterator.h | 84 +++++++++++ include/rocksdb/options.h | 25 ++-- util/options.cc | 28 ++++ util/xfunc.cc | 42 ++++++ util/xfunc.h | 11 ++ 8 files changed, 766 insertions(+), 19 deletions(-) create mode 100644 db/managed_iterator.cc create mode 100644 db/managed_iterator.h diff --git a/db/db_impl.cc b/db/db_impl.cc index da304e505..023946196 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -35,6 +35,7 @@ #include "db/job_context.h" #include "db/log_reader.h" #include "db/log_writer.h" +#include "db/managed_iterator.h" #include "db/memtable.h" #include "db/memtable_list.h" #include "db/merge_context.h" @@ -80,6 +81,7 @@ #include "util/string_util.h" #include "util/thread_status_updater.h" #include "util/thread_status_util.h" +#include "util/xfunc.h" namespace rocksdb { @@ -2788,7 +2790,24 @@ Iterator* DBImpl::NewIterator(const ReadOptions& read_options, auto cfh = reinterpret_cast(column_family); auto cfd = cfh->cfd(); - if (read_options.tailing) { + XFUNC_TEST("", "managed_new", managed_new1, xf_manage_new, + reinterpret_cast(this), + const_cast(&read_options), is_snapshot_supported_); + if (read_options.managed) { +#ifdef ROCKSDB_LITE + // not supported in lite version + return NewErrorIterator(Status::InvalidArgument( + "Managed Iterators not supported in RocksDBLite.")); +#else + if ((read_options.tailing) || (read_options.snapshot != nullptr) || + (is_snapshot_supported_)) { + return new ManagedIterator(this, read_options, cfd); + } + // Managed iter not supported + return NewErrorIterator(Status::InvalidArgument( + "Managed Iterators not supported without snapshots.")); +#endif + } else if (read_options.tailing) { #ifdef ROCKSDB_LITE // not supported in lite version return nullptr; @@ -2873,8 +2892,26 @@ Status DBImpl::NewIterators( std::vector* iterators) { iterators->clear(); iterators->reserve(column_families.size()); - - if (read_options.tailing) { + XFUNC_TEST("", "managed_new", managed_new1, xf_manage_new, + reinterpret_cast(this), + const_cast(&read_options), is_snapshot_supported_); + if (read_options.managed) { +#ifdef ROCKSDB_LITE + return Status::InvalidArgument( + "Managed interator not supported in RocksDB lite"); +#else + if ((!read_options.tailing) && (read_options.snapshot == nullptr) && + (!is_snapshot_supported_)) { + return Status::InvalidArgument( + "Managed interator not supported without snapshots"); + } + for (auto cfh : column_families) { + auto cfd = reinterpret_cast(cfh)->cfd(); + auto iter = new ManagedIterator(this, read_options, cfd); + iterators->push_back(iter); + } +#endif + } else if (read_options.tailing) { #ifdef ROCKSDB_LITE return Status::InvalidArgument( "Tailing interator not supported in RocksDB lite"); diff --git a/db/db_test.cc b/db/db_test.cc index dd120d51d..3d352e25d 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -1943,6 +1943,71 @@ TEST(DBTest, NonBlockingIteration) { kSkipMmapReads)); } +TEST(DBTest, ManagedNonBlockingIteration) { + do { + ReadOptions non_blocking_opts, regular_opts; + Options options = CurrentOptions(); + options.statistics = rocksdb::CreateDBStatistics(); + non_blocking_opts.read_tier = kBlockCacheTier; + non_blocking_opts.managed = true; + CreateAndReopenWithCF({"pikachu"}, options); + // write one kv to the database. + ASSERT_OK(Put(1, "a", "b")); + + // scan using non-blocking iterator. We should find it because + // it is in memtable. + Iterator* iter = db_->NewIterator(non_blocking_opts, handles_[1]); + int count = 0; + for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { + ASSERT_OK(iter->status()); + count++; + } + ASSERT_EQ(count, 1); + delete iter; + + // flush memtable to storage. Now, the key should not be in the + // memtable neither in the block cache. + ASSERT_OK(Flush(1)); + + // verify that a non-blocking iterator does not find any + // kvs. Neither does it do any IOs to storage. + int64_t numopen = TestGetTickerCount(options, NO_FILE_OPENS); + int64_t cache_added = TestGetTickerCount(options, BLOCK_CACHE_ADD); + iter = db_->NewIterator(non_blocking_opts, handles_[1]); + count = 0; + for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { + count++; + } + ASSERT_EQ(count, 0); + ASSERT_TRUE(iter->status().IsIncomplete()); + ASSERT_EQ(numopen, TestGetTickerCount(options, NO_FILE_OPENS)); + ASSERT_EQ(cache_added, TestGetTickerCount(options, BLOCK_CACHE_ADD)); + delete iter; + + // read in the specified block via a regular get + ASSERT_EQ(Get(1, "a"), "b"); + + // verify that we can find it via a non-blocking scan + numopen = TestGetTickerCount(options, NO_FILE_OPENS); + cache_added = TestGetTickerCount(options, BLOCK_CACHE_ADD); + iter = db_->NewIterator(non_blocking_opts, handles_[1]); + count = 0; + for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { + ASSERT_OK(iter->status()); + count++; + } + ASSERT_EQ(count, 1); + ASSERT_EQ(numopen, TestGetTickerCount(options, NO_FILE_OPENS)); + ASSERT_EQ(cache_added, TestGetTickerCount(options, BLOCK_CACHE_ADD)); + delete iter; + + // This test verifies block cache behaviors, which is not used by plain + // table format. + // Exclude kHashCuckoo as it does not support iteration currently + } while (ChangeOptions(kSkipPlainTable | kSkipNoSeekToLast | kSkipHashCuckoo | + kSkipMmapReads)); +} + // A delete is skipped for key if KeyMayExist(key) returns False // Tests Writebatch consistency and proper delete behaviour TEST(DBTest, FilterDeletes) { @@ -8503,6 +8568,8 @@ void PrefixScanInit(DBTest *dbtest) { } // namespace TEST(DBTest, PrefixScan) { + XFUNC_TEST("", "dbtest_prefix", prefix_skip1, XFuncPoint::SetSkip, + kSkipNoPrefix); while (ChangeFilterOptions()) { int count; Slice prefix; @@ -8546,6 +8613,7 @@ TEST(DBTest, PrefixScan) { ASSERT_EQ(env_->random_read_counter_.Read(), 2); Close(); } // end of while + XFUNC_TEST("", "dbtest_prefix", prefix_skip1, XFuncPoint::SetSkip, 0); } TEST(DBTest, TailingIteratorSingle) { @@ -8675,6 +8743,8 @@ TEST(DBTest, TailingIteratorDeletes) { } TEST(DBTest, TailingIteratorPrefixSeek) { + XFUNC_TEST("", "dbtest_prefix", prefix_skip1, XFuncPoint::SetSkip, + kSkipNoPrefix); ReadOptions read_options; read_options.tailing = true; @@ -8704,6 +8774,7 @@ TEST(DBTest, TailingIteratorPrefixSeek) { iter->Next(); ASSERT_TRUE(!iter->Valid()); + XFUNC_TEST("", "dbtest_prefix", prefix_skip1, XFuncPoint::SetSkip, 0); } TEST(DBTest, TailingIteratorIncomplete) { @@ -8763,6 +8834,232 @@ TEST(DBTest, TailingIteratorSeekToSame) { ASSERT_EQ(found, iter->key().ToString()); } +TEST(DBTest, ManagedTailingIteratorSingle) { + ReadOptions read_options; + read_options.tailing = true; + read_options.managed = true; + + std::unique_ptr iter(db_->NewIterator(read_options)); + iter->SeekToFirst(); + ASSERT_TRUE(!iter->Valid()); + + // add a record and check that iter can see it + ASSERT_OK(db_->Put(WriteOptions(), "mirko", "fodor")); + iter->SeekToFirst(); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(iter->key().ToString(), "mirko"); + + iter->Next(); + ASSERT_TRUE(!iter->Valid()); +} + +TEST(DBTest, ManagedTailingIteratorKeepAdding) { + CreateAndReopenWithCF({"pikachu"}, CurrentOptions()); + ReadOptions read_options; + read_options.tailing = true; + read_options.managed = true; + + std::unique_ptr iter(db_->NewIterator(read_options, handles_[1])); + std::string value(1024, 'a'); + + const int num_records = 10000; + for (int i = 0; i < num_records; ++i) { + char buf[32]; + snprintf(buf, sizeof(buf), "%016d", i); + + Slice key(buf, 16); + ASSERT_OK(Put(1, key, value)); + + iter->Seek(key); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(iter->key().compare(key), 0); + } +} + +TEST(DBTest, ManagedTailingIteratorSeekToNext) { + CreateAndReopenWithCF({"pikachu"}, CurrentOptions()); + ReadOptions read_options; + read_options.tailing = true; + read_options.managed = true; + + std::unique_ptr iter(db_->NewIterator(read_options, handles_[1])); + std::string value(1024, 'a'); + + const int num_records = 1000; + for (int i = 1; i < num_records; ++i) { + char buf1[32]; + char buf2[32]; + snprintf(buf1, sizeof(buf1), "00a0%016d", i * 5); + + Slice key(buf1, 20); + ASSERT_OK(Put(1, key, value)); + + if (i % 100 == 99) { + ASSERT_OK(Flush(1)); + } + + snprintf(buf2, sizeof(buf2), "00a0%016d", i * 5 - 2); + Slice target(buf2, 20); + iter->Seek(target); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(iter->key().compare(key), 0); + } + for (int i = 2 * num_records; i > 0; --i) { + char buf1[32]; + char buf2[32]; + snprintf(buf1, sizeof(buf1), "00a0%016d", i * 5); + + Slice key(buf1, 20); + ASSERT_OK(Put(1, key, value)); + + if (i % 100 == 99) { + ASSERT_OK(Flush(1)); + } + + snprintf(buf2, sizeof(buf2), "00a0%016d", i * 5 - 2); + Slice target(buf2, 20); + iter->Seek(target); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(iter->key().compare(key), 0); + } +} + +TEST(DBTest, ManagedTailingIteratorDeletes) { + CreateAndReopenWithCF({"pikachu"}, CurrentOptions()); + ReadOptions read_options; + read_options.tailing = true; + read_options.managed = true; + + std::unique_ptr iter(db_->NewIterator(read_options, handles_[1])); + + // write a single record, read it using the iterator, then delete it + ASSERT_OK(Put(1, "0test", "test")); + iter->SeekToFirst(); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(iter->key().ToString(), "0test"); + ASSERT_OK(Delete(1, "0test")); + + // write many more records + const int num_records = 10000; + std::string value(1024, 'A'); + + for (int i = 0; i < num_records; ++i) { + char buf[32]; + snprintf(buf, sizeof(buf), "1%015d", i); + + Slice key(buf, 16); + ASSERT_OK(Put(1, key, value)); + } + + // force a flush to make sure that no records are read from memtable + ASSERT_OK(Flush(1)); + + // skip "0test" + iter->Next(); + + // make sure we can read all new records using the existing iterator + int count = 0; + for (; iter->Valid(); iter->Next(), ++count) { + } + + ASSERT_EQ(count, num_records); +} + +TEST(DBTest, ManagedTailingIteratorPrefixSeek) { + XFUNC_TEST("", "dbtest_prefix", prefix_skip1, XFuncPoint::SetSkip, + kSkipNoPrefix); + ReadOptions read_options; + read_options.tailing = true; + read_options.managed = true; + + Options options = CurrentOptions(); + options.env = env_; + options.create_if_missing = true; + options.disable_auto_compactions = true; + options.prefix_extractor.reset(NewFixedPrefixTransform(2)); + options.memtable_factory.reset(NewHashSkipListRepFactory(16)); + DestroyAndReopen(options); + CreateAndReopenWithCF({"pikachu"}, options); + + std::unique_ptr iter(db_->NewIterator(read_options, handles_[1])); + ASSERT_OK(Put(1, "0101", "test")); + + ASSERT_OK(Flush(1)); + + ASSERT_OK(Put(1, "0202", "test")); + + // Seek(0102) shouldn't find any records since 0202 has a different prefix + iter->Seek("0102"); + ASSERT_TRUE(!iter->Valid()); + + iter->Seek("0202"); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(iter->key().ToString(), "0202"); + + iter->Next(); + ASSERT_TRUE(!iter->Valid()); + XFUNC_TEST("", "dbtest_prefix", prefix_skip1, XFuncPoint::SetSkip, 0); +} + +TEST(DBTest, ManagedTailingIteratorIncomplete) { + CreateAndReopenWithCF({"pikachu"}, CurrentOptions()); + ReadOptions read_options; + read_options.tailing = true; + read_options.managed = true; + read_options.read_tier = kBlockCacheTier; + + std::string key = "key"; + std::string value = "value"; + + ASSERT_OK(db_->Put(WriteOptions(), key, value)); + + std::unique_ptr iter(db_->NewIterator(read_options)); + iter->SeekToFirst(); + // we either see the entry or it's not in cache + ASSERT_TRUE(iter->Valid() || iter->status().IsIncomplete()); + + ASSERT_OK(db_->CompactRange(nullptr, nullptr)); + iter->SeekToFirst(); + // should still be true after compaction + ASSERT_TRUE(iter->Valid() || iter->status().IsIncomplete()); +} + +TEST(DBTest, ManagedTailingIteratorSeekToSame) { + Options options = CurrentOptions(); + options.compaction_style = kCompactionStyleUniversal; + options.write_buffer_size = 1000; + CreateAndReopenWithCF({"pikachu"}, options); + + ReadOptions read_options; + read_options.tailing = true; + read_options.managed = true; + + const int NROWS = 10000; + // Write rows with keys 00000, 00002, 00004 etc. + for (int i = 0; i < NROWS; ++i) { + char buf[100]; + snprintf(buf, sizeof(buf), "%05d", 2 * i); + std::string key(buf); + std::string value("value"); + ASSERT_OK(db_->Put(WriteOptions(), key, value)); + } + + std::unique_ptr iter(db_->NewIterator(read_options)); + // Seek to 00001. We expect to find 00002. + std::string start_key = "00001"; + iter->Seek(start_key); + ASSERT_TRUE(iter->Valid()); + + std::string found = iter->key().ToString(); + ASSERT_EQ("00002", found); + + // Now seek to the same key. The iterator should remain in the same + // position. + iter->Seek(found); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(found, iter->key().ToString()); +} + TEST(DBTest, BlockBasedTablePrefixIndexTest) { // create a DB with block prefix index BlockBasedTableOptions table_options; diff --git a/db/managed_iterator.cc b/db/managed_iterator.cc new file mode 100644 index 000000000..2a783a0e2 --- /dev/null +++ b/db/managed_iterator.cc @@ -0,0 +1,255 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#ifndef ROCKSDB_LITE + +#include +#include +#include + +#include "db/column_family.h" +#include "db/db_impl.h" +#include "db/db_iter.h" +#include "db/dbformat.h" +#include "db/managed_iterator.h" +#include "rocksdb/env.h" +#include "rocksdb/slice.h" +#include "rocksdb/slice_transform.h" +#include "table/merger.h" +#include "util/xfunc.h" + +namespace rocksdb { + +namespace { +// Helper class that locks a mutex on construction and unlocks the mutex when +// the destructor of the MutexLock object is invoked. +// +// Typical usage: +// +// void MyClass::MyMethod() { +// MILock l(&mu_); // mu_ is an instance variable +// ... some complex code, possibly with multiple return paths ... +// } + +class MILock { + public: + explicit MILock(std::mutex* mu, ManagedIterator* mi) : mu_(mu), mi_(mi) { + this->mu_->lock(); + } + ~MILock() { + this->mu_->unlock(); + XFUNC_TEST("managed_xftest_release", "managed_unlock", managed_unlock1, + xf_manage_release, mi_); + } + + private: + std::mutex* const mu_; + ManagedIterator* mi_; + // No copying allowed + MILock(const MILock&) = delete; + void operator=(const MILock&) = delete; +}; +} // anonymous namespace + +// +// Synchronization between modifiers, releasers, creators +// If iterator operation, wait till (!in_use), set in_use, do op, reset in_use +// if modifying mutable_iter, atomically exchange in_use: +// return if in_use set / otherwise set in use, +// atomically replace new iter with old , reset in use +// The releaser is the new operation and it holds a lock for a very short time +// The existing non-const iterator operations are supposed to be single +// threaded and hold the lock for the duration of the operation +// The existing const iterator operations use the cached key/values +// and don't do any locking. +ManagedIterator::ManagedIterator(DBImpl* db, const ReadOptions& read_options, + ColumnFamilyData* cfd) + : db_(db), + read_options_(read_options), + cfd_(cfd), + svnum_(cfd->GetSuperVersionNumber()), + mutable_iter_(nullptr), + valid_(false), + snapshot_created_(false), + release_supported_(true) { + read_options_.managed = false; + if ((!read_options_.tailing) && (read_options_.snapshot == nullptr)) { + assert(read_options_.snapshot = db_->GetSnapshot()); + snapshot_created_ = true; + } + cfh_.SetCFD(cfd); + mutable_iter_ = unique_ptr(db->NewIterator(read_options_, &cfh_)); + XFUNC_TEST("managed_xftest_dropold", "managed_create", xf_managed_create1, + xf_manage_create, this); +} + +ManagedIterator::~ManagedIterator() { + Lock(); + if (snapshot_created_) { + db_->ReleaseSnapshot(read_options_.snapshot); + snapshot_created_ = false; + read_options_.snapshot = nullptr; + } +} + +bool ManagedIterator::Valid() const { return valid_; } + +void ManagedIterator::SeekToLast() { + MILock l(&in_use_, this); + if (NeedToRebuild()) { + RebuildIterator(); + } + assert(mutable_iter_ != nullptr); + mutable_iter_->SeekToLast(); + if (mutable_iter_->status().ok()) { + UpdateCurrent(); + } +} + +void ManagedIterator::SeekToFirst() { + MILock l(&in_use_, this); + SeekInternal(Slice(), true); +} + +void ManagedIterator::Seek(const Slice& user_key) { + MILock l(&in_use_, this); + SeekInternal(user_key, false); +} + +void ManagedIterator::SeekInternal(const Slice& user_key, bool seek_to_first) { + if (NeedToRebuild()) { + RebuildIterator(); + } + assert(mutable_iter_ != nullptr); + if (seek_to_first) { + mutable_iter_->SeekToFirst(); + } else { + mutable_iter_->Seek(user_key); + } + UpdateCurrent(); +} + +void ManagedIterator::Prev() { + if (!valid_) { + status_ = Status::InvalidArgument("Iterator value invalid"); + return; + } + MILock l(&in_use_, this); + if (NeedToRebuild()) { + std::string current_key = key().ToString(); + Slice old_key(current_key); + RebuildIterator(); + SeekInternal(old_key, false); + UpdateCurrent(); + if (!valid_) { + return; + } + if (key().compare(old_key) != 0) { + valid_ = false; + status_ = Status::Incomplete("Cannot do Prev now"); + return; + } + } + mutable_iter_->Prev(); + if (mutable_iter_->status().ok()) { + UpdateCurrent(); + status_ = Status::OK(); + } else { + status_ = mutable_iter_->status(); + } +} + +void ManagedIterator::Next() { + if (!valid_) { + status_ = Status::InvalidArgument("Iterator value invalid"); + return; + } + MILock l(&in_use_, this); + if (NeedToRebuild()) { + std::string current_key = key().ToString(); + Slice old_key(current_key.data(), cached_key_.Size()); + RebuildIterator(); + SeekInternal(old_key, false); + UpdateCurrent(); + if (!valid_) { + return; + } + if (key().compare(old_key) != 0) { + valid_ = false; + status_ = Status::Incomplete("Cannot do Next now"); + return; + } + } + mutable_iter_->Next(); + UpdateCurrent(); +} + +Slice ManagedIterator::key() const { + assert(valid_); + return cached_key_.GetKey(); +} + +Slice ManagedIterator::value() const { + assert(valid_); + return cached_value_.GetKey(); +} + +Status ManagedIterator::status() const { return status_; } + +void ManagedIterator::RebuildIterator() { + svnum_ = cfd_->GetSuperVersionNumber(); + mutable_iter_ = unique_ptr(db_->NewIterator(read_options_, &cfh_)); +} + +void ManagedIterator::UpdateCurrent() { + assert(mutable_iter_ != nullptr); + + if (!(valid_ = mutable_iter_->Valid())) { + status_ = mutable_iter_->status(); + return; + } + + status_ = Status::OK(); + cached_key_.SetKey(mutable_iter_->key()); + cached_value_.SetKey(mutable_iter_->value()); +} + +void ManagedIterator::ReleaseIter(bool only_old) { + if ((mutable_iter_ == nullptr) || (!release_supported_)) { + return; + } + if (svnum_ != cfd_->GetSuperVersionNumber() || !only_old) { + if (!TryLock()) { // Don't release iter if in use + return; + } + mutable_iter_ = nullptr; // in_use for a very short time + UnLock(); + } +} + +bool ManagedIterator::NeedToRebuild() { + if ((mutable_iter_ == nullptr) || (status_.IsIncomplete()) || + (!only_drop_old_ && (svnum_ != cfd_->GetSuperVersionNumber()))) { + return true; + } + return false; +} + +void ManagedIterator::Lock() { + in_use_.lock(); + return; +} + +bool ManagedIterator::TryLock() { return in_use_.try_lock(); } + +void ManagedIterator::UnLock() { + in_use_.unlock(); + XFUNC_TEST("managed_xftest_release", "managed_unlock", managed_unlock1, + xf_manage_release, this); +} + +} // namespace rocksdb + +#endif // ROCKSDB_LITE diff --git a/db/managed_iterator.h b/db/managed_iterator.h new file mode 100644 index 000000000..3a551dff9 --- /dev/null +++ b/db/managed_iterator.h @@ -0,0 +1,84 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +#pragma once + +#ifndef ROCKSDB_LITE + +#include +#include +#include +#include + +#include "db/column_family.h" +#include "rocksdb/db.h" +#include "rocksdb/iterator.h" +#include "rocksdb/options.h" +#include "util/arena.h" + +namespace rocksdb { + +class DBImpl; +struct SuperVersion; +class ColumnFamilyData; + +/** + * ManagedIterator is a special type of iterator that supports freeing the + * underlying iterator and still being able to access the current key/value + * pair. This is done by copying the key/value pair so that clients can + * continue to access the data without getting a SIGSEGV. + * The underlying iterator can be freed manually through the call to + * ReleaseIter or automatically (as needed on space pressure or age.) + * The iterator is recreated using the saved original arguments. + */ +class ManagedIterator : public Iterator { + public: + ManagedIterator(DBImpl* db, const ReadOptions& read_options, + ColumnFamilyData* cfd); + virtual ~ManagedIterator(); + + virtual void SeekToLast() override; + virtual void Prev() override; + virtual bool Valid() const override; + void SeekToFirst() override; + virtual void Seek(const Slice& target) override; + virtual void Next() override; + virtual Slice key() const override; + virtual Slice value() const override; + virtual Status status() const override; + void ReleaseIter(bool only_old); + void SetDropOld(bool only_old) { + only_drop_old_ = read_options_.tailing || only_old; + } + + private: + void RebuildIterator(); + void UpdateCurrent(); + void SeekInternal(const Slice& user_key, bool seek_to_first); + bool NeedToRebuild(); + void Lock(); + bool TryLock(); + void UnLock(); + DBImpl* const db_; + ReadOptions read_options_; + ColumnFamilyData* const cfd_; + ColumnFamilyHandleInternal cfh_; + + uint64_t svnum_; + std::unique_ptr mutable_iter_; + // internal iterator status + Status status_; + bool valid_; + + IterKey cached_key_; + IterKey cached_value_; + + bool only_drop_old_ = true; + bool snapshot_created_; + bool release_supported_; + std::mutex in_use_; // is managed iterator in use +}; + +} // namespace rocksdb +#endif // ROCKSDB_LITE diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 0541a7b34..bfe88fe0c 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -1021,27 +1021,20 @@ struct ReadOptions { // Not supported in ROCKSDB_LITE mode! bool tailing; + // Specify to create a managed iterator -- a special iterator that + // uses less resources by having the ability to free its underlying + // resources on request. + // Default: false + // Not supported in ROCKSDB_LITE mode! + bool managed; + // Enable a total order seek regardless of index format (e.g. hash index) // used in the table. Some table format (e.g. plain table) may not support // this option. bool total_order_seek; - ReadOptions() - : verify_checksums(true), - fill_cache(true), - snapshot(nullptr), - iterate_upper_bound(nullptr), - read_tier(kReadAllTier), - tailing(false), - total_order_seek(false) {} - ReadOptions(bool cksum, bool cache) - : verify_checksums(cksum), - fill_cache(cache), - snapshot(nullptr), - iterate_upper_bound(nullptr), - read_tier(kReadAllTier), - tailing(false), - total_order_seek(false) {} + ReadOptions(); + ReadOptions(bool cksum, bool cache); }; // Options that control write operations diff --git a/util/options.cc b/util/options.cc index fbfa74ccc..c16ba1928 100644 --- a/util/options.cc +++ b/util/options.cc @@ -30,6 +30,7 @@ #include "rocksdb/table_properties.h" #include "table/block_based_table_factory.h" #include "util/statistics.h" +#include "util/xfunc.h" namespace rocksdb { @@ -596,6 +597,33 @@ DBOptions* DBOptions::IncreaseParallelism(int total_threads) { env->SetBackgroundThreads(1, Env::HIGH); return this; } + +ReadOptions::ReadOptions() + : verify_checksums(true), + fill_cache(true), + snapshot(nullptr), + iterate_upper_bound(nullptr), + read_tier(kReadAllTier), + tailing(false), + managed(false), + total_order_seek(false) { + XFUNC_TEST("", "managed_options", managed_options, xf_manage_options, + reinterpret_cast(this)); +} + +ReadOptions::ReadOptions(bool cksum, bool cache) + : verify_checksums(cksum), + fill_cache(cache), + snapshot(nullptr), + iterate_upper_bound(nullptr), + read_tier(kReadAllTier), + tailing(false), + managed(false), + total_order_seek(false) { + XFUNC_TEST("", "managed_options", managed_options, xf_manage_options, + reinterpret_cast(this)); +} + #endif // ROCKSDB_LITE } // namespace rocksdb diff --git a/util/xfunc.cc b/util/xfunc.cc index 9a2482272..aadc8974e 100644 --- a/util/xfunc.cc +++ b/util/xfunc.cc @@ -4,6 +4,8 @@ // of patent rights can be found in the PATENTS file in the same directory. #include +#include "db/db_impl.h" +#include "db/managed_iterator.h" #include "rocksdb/options.h" #include "util/xfunc.h" @@ -14,6 +16,7 @@ namespace rocksdb { std::string XFuncPoint::xfunc_test_; bool XFuncPoint::initialized_ = false; bool XFuncPoint::enabled_ = false; +int XFuncPoint::skip_policy_ = 0; void GetXFTestOptions(Options* options, int skip_policy) { if (XFuncPoint::Check("inplace_lock_test") && @@ -22,6 +25,45 @@ void GetXFTestOptions(Options* options, int skip_policy) { } } +void xf_manage_release(ManagedIterator* iter) { + if (!(XFuncPoint::GetSkip() & kSkipNoPrefix)) { + iter->ReleaseIter(false); + } +} + +void xf_manage_options(ReadOptions* read_options) { + if (!XFuncPoint::Check("managed_xftest_dropold") && + (!XFuncPoint::Check("managed_xftest_release"))) { + return; + } + read_options->managed = true; +} + +void xf_manage_new(DBImpl* db, ReadOptions* read_options, + bool is_snapshot_supported) { + if ((!XFuncPoint::Check("managed_xftest_dropold") && + (!XFuncPoint::Check("managed_xftest_release"))) || + (!read_options->managed)) { + return; + } + if ((!read_options->tailing) && (read_options->snapshot == nullptr) && + (!is_snapshot_supported)) { + read_options->managed = false; + return; + } + if (db->GetOptions().prefix_extractor != nullptr) { + if (strcmp(db->GetOptions().table_factory.get()->Name(), "PlainTable")) { + if (!(XFuncPoint::GetSkip() & kSkipNoPrefix)) { + read_options->total_order_seek = true; + } + } else { + read_options->managed = false; + } + } +} + +void xf_manage_create(ManagedIterator* iter) { iter->SetDropOld(false); } + } // namespace rocksdb #endif // XFUNC diff --git a/util/xfunc.h b/util/xfunc.h index 51122b7aa..c37149a30 100644 --- a/util/xfunc.h +++ b/util/xfunc.h @@ -29,7 +29,14 @@ namespace rocksdb { #else struct Options; +class ManagedIterator; +class DBImpl; void GetXFTestOptions(Options* options, int skip_policy); +void xf_manage_release(ManagedIterator* iter); +void xf_manage_new(DBImpl* db, ReadOptions* readoptions, + bool is_snapshot_supported); +void xf_manage_create(ManagedIterator* iter); +void xf_manage_options(ReadOptions* read_options); // This class provides the facility to run custom code to test a specific // feature typically with all existing unit tests. @@ -66,10 +73,14 @@ class XFuncPoint { ((test.compare("") == 0) || (test.compare(xfunc_test_) == 0))); } + static void SetSkip(int skip) { skip_policy_ = skip; } + static int GetSkip(void) { return skip_policy_; } + private: static std::string xfunc_test_; static bool initialized_; static bool enabled_; + static int skip_policy_; }; // Use XFUNC_TEST to specify cross functional test points inside the code base.