diff --git a/db/db_impl.cc b/db/db_impl.cc index da304e505..023946196 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -35,6 +35,7 @@ #include "db/job_context.h" #include "db/log_reader.h" #include "db/log_writer.h" +#include "db/managed_iterator.h" #include "db/memtable.h" #include "db/memtable_list.h" #include "db/merge_context.h" @@ -80,6 +81,7 @@ #include "util/string_util.h" #include "util/thread_status_updater.h" #include "util/thread_status_util.h" +#include "util/xfunc.h" namespace rocksdb { @@ -2788,7 +2790,24 @@ Iterator* DBImpl::NewIterator(const ReadOptions& read_options, auto cfh = reinterpret_cast(column_family); auto cfd = cfh->cfd(); - if (read_options.tailing) { + XFUNC_TEST("", "managed_new", managed_new1, xf_manage_new, + reinterpret_cast(this), + const_cast(&read_options), is_snapshot_supported_); + if (read_options.managed) { +#ifdef ROCKSDB_LITE + // not supported in lite version + return NewErrorIterator(Status::InvalidArgument( + "Managed Iterators not supported in RocksDBLite.")); +#else + if ((read_options.tailing) || (read_options.snapshot != nullptr) || + (is_snapshot_supported_)) { + return new ManagedIterator(this, read_options, cfd); + } + // Managed iter not supported + return NewErrorIterator(Status::InvalidArgument( + "Managed Iterators not supported without snapshots.")); +#endif + } else if (read_options.tailing) { #ifdef ROCKSDB_LITE // not supported in lite version return nullptr; @@ -2873,8 +2892,26 @@ Status DBImpl::NewIterators( std::vector* iterators) { iterators->clear(); iterators->reserve(column_families.size()); - - if (read_options.tailing) { + XFUNC_TEST("", "managed_new", managed_new1, xf_manage_new, + reinterpret_cast(this), + const_cast(&read_options), is_snapshot_supported_); + if (read_options.managed) { +#ifdef ROCKSDB_LITE + return Status::InvalidArgument( + "Managed interator not supported in RocksDB lite"); +#else + if ((!read_options.tailing) && (read_options.snapshot == nullptr) && + (!is_snapshot_supported_)) { + return Status::InvalidArgument( + "Managed interator not supported without snapshots"); + } + for (auto cfh : column_families) { + auto cfd = reinterpret_cast(cfh)->cfd(); + auto iter = new ManagedIterator(this, read_options, cfd); + iterators->push_back(iter); + } +#endif + } else if (read_options.tailing) { #ifdef ROCKSDB_LITE return Status::InvalidArgument( "Tailing interator not supported in RocksDB lite"); diff --git a/db/db_test.cc b/db/db_test.cc index dd120d51d..3d352e25d 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -1943,6 +1943,71 @@ TEST(DBTest, NonBlockingIteration) { kSkipMmapReads)); } +TEST(DBTest, ManagedNonBlockingIteration) { + do { + ReadOptions non_blocking_opts, regular_opts; + Options options = CurrentOptions(); + options.statistics = rocksdb::CreateDBStatistics(); + non_blocking_opts.read_tier = kBlockCacheTier; + non_blocking_opts.managed = true; + CreateAndReopenWithCF({"pikachu"}, options); + // write one kv to the database. + ASSERT_OK(Put(1, "a", "b")); + + // scan using non-blocking iterator. We should find it because + // it is in memtable. + Iterator* iter = db_->NewIterator(non_blocking_opts, handles_[1]); + int count = 0; + for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { + ASSERT_OK(iter->status()); + count++; + } + ASSERT_EQ(count, 1); + delete iter; + + // flush memtable to storage. Now, the key should not be in the + // memtable neither in the block cache. + ASSERT_OK(Flush(1)); + + // verify that a non-blocking iterator does not find any + // kvs. Neither does it do any IOs to storage. + int64_t numopen = TestGetTickerCount(options, NO_FILE_OPENS); + int64_t cache_added = TestGetTickerCount(options, BLOCK_CACHE_ADD); + iter = db_->NewIterator(non_blocking_opts, handles_[1]); + count = 0; + for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { + count++; + } + ASSERT_EQ(count, 0); + ASSERT_TRUE(iter->status().IsIncomplete()); + ASSERT_EQ(numopen, TestGetTickerCount(options, NO_FILE_OPENS)); + ASSERT_EQ(cache_added, TestGetTickerCount(options, BLOCK_CACHE_ADD)); + delete iter; + + // read in the specified block via a regular get + ASSERT_EQ(Get(1, "a"), "b"); + + // verify that we can find it via a non-blocking scan + numopen = TestGetTickerCount(options, NO_FILE_OPENS); + cache_added = TestGetTickerCount(options, BLOCK_CACHE_ADD); + iter = db_->NewIterator(non_blocking_opts, handles_[1]); + count = 0; + for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { + ASSERT_OK(iter->status()); + count++; + } + ASSERT_EQ(count, 1); + ASSERT_EQ(numopen, TestGetTickerCount(options, NO_FILE_OPENS)); + ASSERT_EQ(cache_added, TestGetTickerCount(options, BLOCK_CACHE_ADD)); + delete iter; + + // This test verifies block cache behaviors, which is not used by plain + // table format. + // Exclude kHashCuckoo as it does not support iteration currently + } while (ChangeOptions(kSkipPlainTable | kSkipNoSeekToLast | kSkipHashCuckoo | + kSkipMmapReads)); +} + // A delete is skipped for key if KeyMayExist(key) returns False // Tests Writebatch consistency and proper delete behaviour TEST(DBTest, FilterDeletes) { @@ -8503,6 +8568,8 @@ void PrefixScanInit(DBTest *dbtest) { } // namespace TEST(DBTest, PrefixScan) { + XFUNC_TEST("", "dbtest_prefix", prefix_skip1, XFuncPoint::SetSkip, + kSkipNoPrefix); while (ChangeFilterOptions()) { int count; Slice prefix; @@ -8546,6 +8613,7 @@ TEST(DBTest, PrefixScan) { ASSERT_EQ(env_->random_read_counter_.Read(), 2); Close(); } // end of while + XFUNC_TEST("", "dbtest_prefix", prefix_skip1, XFuncPoint::SetSkip, 0); } TEST(DBTest, TailingIteratorSingle) { @@ -8675,6 +8743,8 @@ TEST(DBTest, TailingIteratorDeletes) { } TEST(DBTest, TailingIteratorPrefixSeek) { + XFUNC_TEST("", "dbtest_prefix", prefix_skip1, XFuncPoint::SetSkip, + kSkipNoPrefix); ReadOptions read_options; read_options.tailing = true; @@ -8704,6 +8774,7 @@ TEST(DBTest, TailingIteratorPrefixSeek) { iter->Next(); ASSERT_TRUE(!iter->Valid()); + XFUNC_TEST("", "dbtest_prefix", prefix_skip1, XFuncPoint::SetSkip, 0); } TEST(DBTest, TailingIteratorIncomplete) { @@ -8763,6 +8834,232 @@ TEST(DBTest, TailingIteratorSeekToSame) { ASSERT_EQ(found, iter->key().ToString()); } +TEST(DBTest, ManagedTailingIteratorSingle) { + ReadOptions read_options; + read_options.tailing = true; + read_options.managed = true; + + std::unique_ptr iter(db_->NewIterator(read_options)); + iter->SeekToFirst(); + ASSERT_TRUE(!iter->Valid()); + + // add a record and check that iter can see it + ASSERT_OK(db_->Put(WriteOptions(), "mirko", "fodor")); + iter->SeekToFirst(); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(iter->key().ToString(), "mirko"); + + iter->Next(); + ASSERT_TRUE(!iter->Valid()); +} + +TEST(DBTest, ManagedTailingIteratorKeepAdding) { + CreateAndReopenWithCF({"pikachu"}, CurrentOptions()); + ReadOptions read_options; + read_options.tailing = true; + read_options.managed = true; + + std::unique_ptr iter(db_->NewIterator(read_options, handles_[1])); + std::string value(1024, 'a'); + + const int num_records = 10000; + for (int i = 0; i < num_records; ++i) { + char buf[32]; + snprintf(buf, sizeof(buf), "%016d", i); + + Slice key(buf, 16); + ASSERT_OK(Put(1, key, value)); + + iter->Seek(key); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(iter->key().compare(key), 0); + } +} + +TEST(DBTest, ManagedTailingIteratorSeekToNext) { + CreateAndReopenWithCF({"pikachu"}, CurrentOptions()); + ReadOptions read_options; + read_options.tailing = true; + read_options.managed = true; + + std::unique_ptr iter(db_->NewIterator(read_options, handles_[1])); + std::string value(1024, 'a'); + + const int num_records = 1000; + for (int i = 1; i < num_records; ++i) { + char buf1[32]; + char buf2[32]; + snprintf(buf1, sizeof(buf1), "00a0%016d", i * 5); + + Slice key(buf1, 20); + ASSERT_OK(Put(1, key, value)); + + if (i % 100 == 99) { + ASSERT_OK(Flush(1)); + } + + snprintf(buf2, sizeof(buf2), "00a0%016d", i * 5 - 2); + Slice target(buf2, 20); + iter->Seek(target); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(iter->key().compare(key), 0); + } + for (int i = 2 * num_records; i > 0; --i) { + char buf1[32]; + char buf2[32]; + snprintf(buf1, sizeof(buf1), "00a0%016d", i * 5); + + Slice key(buf1, 20); + ASSERT_OK(Put(1, key, value)); + + if (i % 100 == 99) { + ASSERT_OK(Flush(1)); + } + + snprintf(buf2, sizeof(buf2), "00a0%016d", i * 5 - 2); + Slice target(buf2, 20); + iter->Seek(target); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(iter->key().compare(key), 0); + } +} + +TEST(DBTest, ManagedTailingIteratorDeletes) { + CreateAndReopenWithCF({"pikachu"}, CurrentOptions()); + ReadOptions read_options; + read_options.tailing = true; + read_options.managed = true; + + std::unique_ptr iter(db_->NewIterator(read_options, handles_[1])); + + // write a single record, read it using the iterator, then delete it + ASSERT_OK(Put(1, "0test", "test")); + iter->SeekToFirst(); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(iter->key().ToString(), "0test"); + ASSERT_OK(Delete(1, "0test")); + + // write many more records + const int num_records = 10000; + std::string value(1024, 'A'); + + for (int i = 0; i < num_records; ++i) { + char buf[32]; + snprintf(buf, sizeof(buf), "1%015d", i); + + Slice key(buf, 16); + ASSERT_OK(Put(1, key, value)); + } + + // force a flush to make sure that no records are read from memtable + ASSERT_OK(Flush(1)); + + // skip "0test" + iter->Next(); + + // make sure we can read all new records using the existing iterator + int count = 0; + for (; iter->Valid(); iter->Next(), ++count) { + } + + ASSERT_EQ(count, num_records); +} + +TEST(DBTest, ManagedTailingIteratorPrefixSeek) { + XFUNC_TEST("", "dbtest_prefix", prefix_skip1, XFuncPoint::SetSkip, + kSkipNoPrefix); + ReadOptions read_options; + read_options.tailing = true; + read_options.managed = true; + + Options options = CurrentOptions(); + options.env = env_; + options.create_if_missing = true; + options.disable_auto_compactions = true; + options.prefix_extractor.reset(NewFixedPrefixTransform(2)); + options.memtable_factory.reset(NewHashSkipListRepFactory(16)); + DestroyAndReopen(options); + CreateAndReopenWithCF({"pikachu"}, options); + + std::unique_ptr iter(db_->NewIterator(read_options, handles_[1])); + ASSERT_OK(Put(1, "0101", "test")); + + ASSERT_OK(Flush(1)); + + ASSERT_OK(Put(1, "0202", "test")); + + // Seek(0102) shouldn't find any records since 0202 has a different prefix + iter->Seek("0102"); + ASSERT_TRUE(!iter->Valid()); + + iter->Seek("0202"); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(iter->key().ToString(), "0202"); + + iter->Next(); + ASSERT_TRUE(!iter->Valid()); + XFUNC_TEST("", "dbtest_prefix", prefix_skip1, XFuncPoint::SetSkip, 0); +} + +TEST(DBTest, ManagedTailingIteratorIncomplete) { + CreateAndReopenWithCF({"pikachu"}, CurrentOptions()); + ReadOptions read_options; + read_options.tailing = true; + read_options.managed = true; + read_options.read_tier = kBlockCacheTier; + + std::string key = "key"; + std::string value = "value"; + + ASSERT_OK(db_->Put(WriteOptions(), key, value)); + + std::unique_ptr iter(db_->NewIterator(read_options)); + iter->SeekToFirst(); + // we either see the entry or it's not in cache + ASSERT_TRUE(iter->Valid() || iter->status().IsIncomplete()); + + ASSERT_OK(db_->CompactRange(nullptr, nullptr)); + iter->SeekToFirst(); + // should still be true after compaction + ASSERT_TRUE(iter->Valid() || iter->status().IsIncomplete()); +} + +TEST(DBTest, ManagedTailingIteratorSeekToSame) { + Options options = CurrentOptions(); + options.compaction_style = kCompactionStyleUniversal; + options.write_buffer_size = 1000; + CreateAndReopenWithCF({"pikachu"}, options); + + ReadOptions read_options; + read_options.tailing = true; + read_options.managed = true; + + const int NROWS = 10000; + // Write rows with keys 00000, 00002, 00004 etc. + for (int i = 0; i < NROWS; ++i) { + char buf[100]; + snprintf(buf, sizeof(buf), "%05d", 2 * i); + std::string key(buf); + std::string value("value"); + ASSERT_OK(db_->Put(WriteOptions(), key, value)); + } + + std::unique_ptr iter(db_->NewIterator(read_options)); + // Seek to 00001. We expect to find 00002. + std::string start_key = "00001"; + iter->Seek(start_key); + ASSERT_TRUE(iter->Valid()); + + std::string found = iter->key().ToString(); + ASSERT_EQ("00002", found); + + // Now seek to the same key. The iterator should remain in the same + // position. + iter->Seek(found); + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(found, iter->key().ToString()); +} + TEST(DBTest, BlockBasedTablePrefixIndexTest) { // create a DB with block prefix index BlockBasedTableOptions table_options; diff --git a/db/managed_iterator.cc b/db/managed_iterator.cc new file mode 100644 index 000000000..2a783a0e2 --- /dev/null +++ b/db/managed_iterator.cc @@ -0,0 +1,255 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#ifndef ROCKSDB_LITE + +#include +#include +#include + +#include "db/column_family.h" +#include "db/db_impl.h" +#include "db/db_iter.h" +#include "db/dbformat.h" +#include "db/managed_iterator.h" +#include "rocksdb/env.h" +#include "rocksdb/slice.h" +#include "rocksdb/slice_transform.h" +#include "table/merger.h" +#include "util/xfunc.h" + +namespace rocksdb { + +namespace { +// Helper class that locks a mutex on construction and unlocks the mutex when +// the destructor of the MutexLock object is invoked. +// +// Typical usage: +// +// void MyClass::MyMethod() { +// MILock l(&mu_); // mu_ is an instance variable +// ... some complex code, possibly with multiple return paths ... +// } + +class MILock { + public: + explicit MILock(std::mutex* mu, ManagedIterator* mi) : mu_(mu), mi_(mi) { + this->mu_->lock(); + } + ~MILock() { + this->mu_->unlock(); + XFUNC_TEST("managed_xftest_release", "managed_unlock", managed_unlock1, + xf_manage_release, mi_); + } + + private: + std::mutex* const mu_; + ManagedIterator* mi_; + // No copying allowed + MILock(const MILock&) = delete; + void operator=(const MILock&) = delete; +}; +} // anonymous namespace + +// +// Synchronization between modifiers, releasers, creators +// If iterator operation, wait till (!in_use), set in_use, do op, reset in_use +// if modifying mutable_iter, atomically exchange in_use: +// return if in_use set / otherwise set in use, +// atomically replace new iter with old , reset in use +// The releaser is the new operation and it holds a lock for a very short time +// The existing non-const iterator operations are supposed to be single +// threaded and hold the lock for the duration of the operation +// The existing const iterator operations use the cached key/values +// and don't do any locking. +ManagedIterator::ManagedIterator(DBImpl* db, const ReadOptions& read_options, + ColumnFamilyData* cfd) + : db_(db), + read_options_(read_options), + cfd_(cfd), + svnum_(cfd->GetSuperVersionNumber()), + mutable_iter_(nullptr), + valid_(false), + snapshot_created_(false), + release_supported_(true) { + read_options_.managed = false; + if ((!read_options_.tailing) && (read_options_.snapshot == nullptr)) { + assert(read_options_.snapshot = db_->GetSnapshot()); + snapshot_created_ = true; + } + cfh_.SetCFD(cfd); + mutable_iter_ = unique_ptr(db->NewIterator(read_options_, &cfh_)); + XFUNC_TEST("managed_xftest_dropold", "managed_create", xf_managed_create1, + xf_manage_create, this); +} + +ManagedIterator::~ManagedIterator() { + Lock(); + if (snapshot_created_) { + db_->ReleaseSnapshot(read_options_.snapshot); + snapshot_created_ = false; + read_options_.snapshot = nullptr; + } +} + +bool ManagedIterator::Valid() const { return valid_; } + +void ManagedIterator::SeekToLast() { + MILock l(&in_use_, this); + if (NeedToRebuild()) { + RebuildIterator(); + } + assert(mutable_iter_ != nullptr); + mutable_iter_->SeekToLast(); + if (mutable_iter_->status().ok()) { + UpdateCurrent(); + } +} + +void ManagedIterator::SeekToFirst() { + MILock l(&in_use_, this); + SeekInternal(Slice(), true); +} + +void ManagedIterator::Seek(const Slice& user_key) { + MILock l(&in_use_, this); + SeekInternal(user_key, false); +} + +void ManagedIterator::SeekInternal(const Slice& user_key, bool seek_to_first) { + if (NeedToRebuild()) { + RebuildIterator(); + } + assert(mutable_iter_ != nullptr); + if (seek_to_first) { + mutable_iter_->SeekToFirst(); + } else { + mutable_iter_->Seek(user_key); + } + UpdateCurrent(); +} + +void ManagedIterator::Prev() { + if (!valid_) { + status_ = Status::InvalidArgument("Iterator value invalid"); + return; + } + MILock l(&in_use_, this); + if (NeedToRebuild()) { + std::string current_key = key().ToString(); + Slice old_key(current_key); + RebuildIterator(); + SeekInternal(old_key, false); + UpdateCurrent(); + if (!valid_) { + return; + } + if (key().compare(old_key) != 0) { + valid_ = false; + status_ = Status::Incomplete("Cannot do Prev now"); + return; + } + } + mutable_iter_->Prev(); + if (mutable_iter_->status().ok()) { + UpdateCurrent(); + status_ = Status::OK(); + } else { + status_ = mutable_iter_->status(); + } +} + +void ManagedIterator::Next() { + if (!valid_) { + status_ = Status::InvalidArgument("Iterator value invalid"); + return; + } + MILock l(&in_use_, this); + if (NeedToRebuild()) { + std::string current_key = key().ToString(); + Slice old_key(current_key.data(), cached_key_.Size()); + RebuildIterator(); + SeekInternal(old_key, false); + UpdateCurrent(); + if (!valid_) { + return; + } + if (key().compare(old_key) != 0) { + valid_ = false; + status_ = Status::Incomplete("Cannot do Next now"); + return; + } + } + mutable_iter_->Next(); + UpdateCurrent(); +} + +Slice ManagedIterator::key() const { + assert(valid_); + return cached_key_.GetKey(); +} + +Slice ManagedIterator::value() const { + assert(valid_); + return cached_value_.GetKey(); +} + +Status ManagedIterator::status() const { return status_; } + +void ManagedIterator::RebuildIterator() { + svnum_ = cfd_->GetSuperVersionNumber(); + mutable_iter_ = unique_ptr(db_->NewIterator(read_options_, &cfh_)); +} + +void ManagedIterator::UpdateCurrent() { + assert(mutable_iter_ != nullptr); + + if (!(valid_ = mutable_iter_->Valid())) { + status_ = mutable_iter_->status(); + return; + } + + status_ = Status::OK(); + cached_key_.SetKey(mutable_iter_->key()); + cached_value_.SetKey(mutable_iter_->value()); +} + +void ManagedIterator::ReleaseIter(bool only_old) { + if ((mutable_iter_ == nullptr) || (!release_supported_)) { + return; + } + if (svnum_ != cfd_->GetSuperVersionNumber() || !only_old) { + if (!TryLock()) { // Don't release iter if in use + return; + } + mutable_iter_ = nullptr; // in_use for a very short time + UnLock(); + } +} + +bool ManagedIterator::NeedToRebuild() { + if ((mutable_iter_ == nullptr) || (status_.IsIncomplete()) || + (!only_drop_old_ && (svnum_ != cfd_->GetSuperVersionNumber()))) { + return true; + } + return false; +} + +void ManagedIterator::Lock() { + in_use_.lock(); + return; +} + +bool ManagedIterator::TryLock() { return in_use_.try_lock(); } + +void ManagedIterator::UnLock() { + in_use_.unlock(); + XFUNC_TEST("managed_xftest_release", "managed_unlock", managed_unlock1, + xf_manage_release, this); +} + +} // namespace rocksdb + +#endif // ROCKSDB_LITE diff --git a/db/managed_iterator.h b/db/managed_iterator.h new file mode 100644 index 000000000..3a551dff9 --- /dev/null +++ b/db/managed_iterator.h @@ -0,0 +1,84 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +#pragma once + +#ifndef ROCKSDB_LITE + +#include +#include +#include +#include + +#include "db/column_family.h" +#include "rocksdb/db.h" +#include "rocksdb/iterator.h" +#include "rocksdb/options.h" +#include "util/arena.h" + +namespace rocksdb { + +class DBImpl; +struct SuperVersion; +class ColumnFamilyData; + +/** + * ManagedIterator is a special type of iterator that supports freeing the + * underlying iterator and still being able to access the current key/value + * pair. This is done by copying the key/value pair so that clients can + * continue to access the data without getting a SIGSEGV. + * The underlying iterator can be freed manually through the call to + * ReleaseIter or automatically (as needed on space pressure or age.) + * The iterator is recreated using the saved original arguments. + */ +class ManagedIterator : public Iterator { + public: + ManagedIterator(DBImpl* db, const ReadOptions& read_options, + ColumnFamilyData* cfd); + virtual ~ManagedIterator(); + + virtual void SeekToLast() override; + virtual void Prev() override; + virtual bool Valid() const override; + void SeekToFirst() override; + virtual void Seek(const Slice& target) override; + virtual void Next() override; + virtual Slice key() const override; + virtual Slice value() const override; + virtual Status status() const override; + void ReleaseIter(bool only_old); + void SetDropOld(bool only_old) { + only_drop_old_ = read_options_.tailing || only_old; + } + + private: + void RebuildIterator(); + void UpdateCurrent(); + void SeekInternal(const Slice& user_key, bool seek_to_first); + bool NeedToRebuild(); + void Lock(); + bool TryLock(); + void UnLock(); + DBImpl* const db_; + ReadOptions read_options_; + ColumnFamilyData* const cfd_; + ColumnFamilyHandleInternal cfh_; + + uint64_t svnum_; + std::unique_ptr mutable_iter_; + // internal iterator status + Status status_; + bool valid_; + + IterKey cached_key_; + IterKey cached_value_; + + bool only_drop_old_ = true; + bool snapshot_created_; + bool release_supported_; + std::mutex in_use_; // is managed iterator in use +}; + +} // namespace rocksdb +#endif // ROCKSDB_LITE diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 0541a7b34..bfe88fe0c 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -1021,27 +1021,20 @@ struct ReadOptions { // Not supported in ROCKSDB_LITE mode! bool tailing; + // Specify to create a managed iterator -- a special iterator that + // uses less resources by having the ability to free its underlying + // resources on request. + // Default: false + // Not supported in ROCKSDB_LITE mode! + bool managed; + // Enable a total order seek regardless of index format (e.g. hash index) // used in the table. Some table format (e.g. plain table) may not support // this option. bool total_order_seek; - ReadOptions() - : verify_checksums(true), - fill_cache(true), - snapshot(nullptr), - iterate_upper_bound(nullptr), - read_tier(kReadAllTier), - tailing(false), - total_order_seek(false) {} - ReadOptions(bool cksum, bool cache) - : verify_checksums(cksum), - fill_cache(cache), - snapshot(nullptr), - iterate_upper_bound(nullptr), - read_tier(kReadAllTier), - tailing(false), - total_order_seek(false) {} + ReadOptions(); + ReadOptions(bool cksum, bool cache); }; // Options that control write operations diff --git a/util/options.cc b/util/options.cc index fbfa74ccc..c16ba1928 100644 --- a/util/options.cc +++ b/util/options.cc @@ -30,6 +30,7 @@ #include "rocksdb/table_properties.h" #include "table/block_based_table_factory.h" #include "util/statistics.h" +#include "util/xfunc.h" namespace rocksdb { @@ -596,6 +597,33 @@ DBOptions* DBOptions::IncreaseParallelism(int total_threads) { env->SetBackgroundThreads(1, Env::HIGH); return this; } + +ReadOptions::ReadOptions() + : verify_checksums(true), + fill_cache(true), + snapshot(nullptr), + iterate_upper_bound(nullptr), + read_tier(kReadAllTier), + tailing(false), + managed(false), + total_order_seek(false) { + XFUNC_TEST("", "managed_options", managed_options, xf_manage_options, + reinterpret_cast(this)); +} + +ReadOptions::ReadOptions(bool cksum, bool cache) + : verify_checksums(cksum), + fill_cache(cache), + snapshot(nullptr), + iterate_upper_bound(nullptr), + read_tier(kReadAllTier), + tailing(false), + managed(false), + total_order_seek(false) { + XFUNC_TEST("", "managed_options", managed_options, xf_manage_options, + reinterpret_cast(this)); +} + #endif // ROCKSDB_LITE } // namespace rocksdb diff --git a/util/xfunc.cc b/util/xfunc.cc index 9a2482272..aadc8974e 100644 --- a/util/xfunc.cc +++ b/util/xfunc.cc @@ -4,6 +4,8 @@ // of patent rights can be found in the PATENTS file in the same directory. #include +#include "db/db_impl.h" +#include "db/managed_iterator.h" #include "rocksdb/options.h" #include "util/xfunc.h" @@ -14,6 +16,7 @@ namespace rocksdb { std::string XFuncPoint::xfunc_test_; bool XFuncPoint::initialized_ = false; bool XFuncPoint::enabled_ = false; +int XFuncPoint::skip_policy_ = 0; void GetXFTestOptions(Options* options, int skip_policy) { if (XFuncPoint::Check("inplace_lock_test") && @@ -22,6 +25,45 @@ void GetXFTestOptions(Options* options, int skip_policy) { } } +void xf_manage_release(ManagedIterator* iter) { + if (!(XFuncPoint::GetSkip() & kSkipNoPrefix)) { + iter->ReleaseIter(false); + } +} + +void xf_manage_options(ReadOptions* read_options) { + if (!XFuncPoint::Check("managed_xftest_dropold") && + (!XFuncPoint::Check("managed_xftest_release"))) { + return; + } + read_options->managed = true; +} + +void xf_manage_new(DBImpl* db, ReadOptions* read_options, + bool is_snapshot_supported) { + if ((!XFuncPoint::Check("managed_xftest_dropold") && + (!XFuncPoint::Check("managed_xftest_release"))) || + (!read_options->managed)) { + return; + } + if ((!read_options->tailing) && (read_options->snapshot == nullptr) && + (!is_snapshot_supported)) { + read_options->managed = false; + return; + } + if (db->GetOptions().prefix_extractor != nullptr) { + if (strcmp(db->GetOptions().table_factory.get()->Name(), "PlainTable")) { + if (!(XFuncPoint::GetSkip() & kSkipNoPrefix)) { + read_options->total_order_seek = true; + } + } else { + read_options->managed = false; + } + } +} + +void xf_manage_create(ManagedIterator* iter) { iter->SetDropOld(false); } + } // namespace rocksdb #endif // XFUNC diff --git a/util/xfunc.h b/util/xfunc.h index 51122b7aa..c37149a30 100644 --- a/util/xfunc.h +++ b/util/xfunc.h @@ -29,7 +29,14 @@ namespace rocksdb { #else struct Options; +class ManagedIterator; +class DBImpl; void GetXFTestOptions(Options* options, int skip_policy); +void xf_manage_release(ManagedIterator* iter); +void xf_manage_new(DBImpl* db, ReadOptions* readoptions, + bool is_snapshot_supported); +void xf_manage_create(ManagedIterator* iter); +void xf_manage_options(ReadOptions* read_options); // This class provides the facility to run custom code to test a specific // feature typically with all existing unit tests. @@ -66,10 +73,14 @@ class XFuncPoint { ((test.compare("") == 0) || (test.compare(xfunc_test_) == 0))); } + static void SetSkip(int skip) { skip_policy_ = skip; } + static int GetSkip(void) { return skip_policy_; } + private: static std::string xfunc_test_; static bool initialized_; static bool enabled_; + static int skip_policy_; }; // Use XFUNC_TEST to specify cross functional test points inside the code base.