Managed iterator

Summary:
This is a diff for managed iterator. A managed iterator
is a wrapper around an iterator which saves the options for that
iterator as well as the current key/value so that the underlying iterator
and its associated memory can be released when it is aged out
automatically or on the request of the user. Will provide the automatic release as a follow-up diff.

Test Plan: Managed* tests in db_test and XF tests for managed iterator

Reviewers: igor, yhchiang, anthony, sdong

Reviewed By: sdong

Subscribers: dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D31401
main
Venkatesh Radhakrishnan 10 years ago
parent b4b8c25a5a
commit 7d817268b9
  1. 43
      db/db_impl.cc
  2. 297
      db/db_test.cc
  3. 255
      db/managed_iterator.cc
  4. 84
      db/managed_iterator.h
  5. 25
      include/rocksdb/options.h
  6. 28
      util/options.cc
  7. 42
      util/xfunc.cc
  8. 11
      util/xfunc.h

@ -35,6 +35,7 @@
#include "db/job_context.h" #include "db/job_context.h"
#include "db/log_reader.h" #include "db/log_reader.h"
#include "db/log_writer.h" #include "db/log_writer.h"
#include "db/managed_iterator.h"
#include "db/memtable.h" #include "db/memtable.h"
#include "db/memtable_list.h" #include "db/memtable_list.h"
#include "db/merge_context.h" #include "db/merge_context.h"
@ -80,6 +81,7 @@
#include "util/string_util.h" #include "util/string_util.h"
#include "util/thread_status_updater.h" #include "util/thread_status_updater.h"
#include "util/thread_status_util.h" #include "util/thread_status_util.h"
#include "util/xfunc.h"
namespace rocksdb { namespace rocksdb {
@ -2788,7 +2790,24 @@ Iterator* DBImpl::NewIterator(const ReadOptions& read_options,
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family); auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
auto cfd = cfh->cfd(); auto cfd = cfh->cfd();
if (read_options.tailing) { XFUNC_TEST("", "managed_new", managed_new1, xf_manage_new,
reinterpret_cast<DBImpl*>(this),
const_cast<ReadOptions*>(&read_options), is_snapshot_supported_);
if (read_options.managed) {
#ifdef ROCKSDB_LITE
// not supported in lite version
return NewErrorIterator(Status::InvalidArgument(
"Managed Iterators not supported in RocksDBLite."));
#else
if ((read_options.tailing) || (read_options.snapshot != nullptr) ||
(is_snapshot_supported_)) {
return new ManagedIterator(this, read_options, cfd);
}
// Managed iter not supported
return NewErrorIterator(Status::InvalidArgument(
"Managed Iterators not supported without snapshots."));
#endif
} else if (read_options.tailing) {
#ifdef ROCKSDB_LITE #ifdef ROCKSDB_LITE
// not supported in lite version // not supported in lite version
return nullptr; return nullptr;
@ -2873,8 +2892,26 @@ Status DBImpl::NewIterators(
std::vector<Iterator*>* iterators) { std::vector<Iterator*>* iterators) {
iterators->clear(); iterators->clear();
iterators->reserve(column_families.size()); iterators->reserve(column_families.size());
XFUNC_TEST("", "managed_new", managed_new1, xf_manage_new,
if (read_options.tailing) { reinterpret_cast<DBImpl*>(this),
const_cast<ReadOptions*>(&read_options), is_snapshot_supported_);
if (read_options.managed) {
#ifdef ROCKSDB_LITE
return Status::InvalidArgument(
"Managed interator not supported in RocksDB lite");
#else
if ((!read_options.tailing) && (read_options.snapshot == nullptr) &&
(!is_snapshot_supported_)) {
return Status::InvalidArgument(
"Managed interator not supported without snapshots");
}
for (auto cfh : column_families) {
auto cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(cfh)->cfd();
auto iter = new ManagedIterator(this, read_options, cfd);
iterators->push_back(iter);
}
#endif
} else if (read_options.tailing) {
#ifdef ROCKSDB_LITE #ifdef ROCKSDB_LITE
return Status::InvalidArgument( return Status::InvalidArgument(
"Tailing interator not supported in RocksDB lite"); "Tailing interator not supported in RocksDB lite");

@ -1943,6 +1943,71 @@ TEST(DBTest, NonBlockingIteration) {
kSkipMmapReads)); kSkipMmapReads));
} }
TEST(DBTest, ManagedNonBlockingIteration) {
do {
ReadOptions non_blocking_opts, regular_opts;
Options options = CurrentOptions();
options.statistics = rocksdb::CreateDBStatistics();
non_blocking_opts.read_tier = kBlockCacheTier;
non_blocking_opts.managed = true;
CreateAndReopenWithCF({"pikachu"}, options);
// write one kv to the database.
ASSERT_OK(Put(1, "a", "b"));
// scan using non-blocking iterator. We should find it because
// it is in memtable.
Iterator* iter = db_->NewIterator(non_blocking_opts, handles_[1]);
int count = 0;
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
ASSERT_OK(iter->status());
count++;
}
ASSERT_EQ(count, 1);
delete iter;
// flush memtable to storage. Now, the key should not be in the
// memtable neither in the block cache.
ASSERT_OK(Flush(1));
// verify that a non-blocking iterator does not find any
// kvs. Neither does it do any IOs to storage.
int64_t numopen = TestGetTickerCount(options, NO_FILE_OPENS);
int64_t cache_added = TestGetTickerCount(options, BLOCK_CACHE_ADD);
iter = db_->NewIterator(non_blocking_opts, handles_[1]);
count = 0;
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
count++;
}
ASSERT_EQ(count, 0);
ASSERT_TRUE(iter->status().IsIncomplete());
ASSERT_EQ(numopen, TestGetTickerCount(options, NO_FILE_OPENS));
ASSERT_EQ(cache_added, TestGetTickerCount(options, BLOCK_CACHE_ADD));
delete iter;
// read in the specified block via a regular get
ASSERT_EQ(Get(1, "a"), "b");
// verify that we can find it via a non-blocking scan
numopen = TestGetTickerCount(options, NO_FILE_OPENS);
cache_added = TestGetTickerCount(options, BLOCK_CACHE_ADD);
iter = db_->NewIterator(non_blocking_opts, handles_[1]);
count = 0;
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
ASSERT_OK(iter->status());
count++;
}
ASSERT_EQ(count, 1);
ASSERT_EQ(numopen, TestGetTickerCount(options, NO_FILE_OPENS));
ASSERT_EQ(cache_added, TestGetTickerCount(options, BLOCK_CACHE_ADD));
delete iter;
// This test verifies block cache behaviors, which is not used by plain
// table format.
// Exclude kHashCuckoo as it does not support iteration currently
} while (ChangeOptions(kSkipPlainTable | kSkipNoSeekToLast | kSkipHashCuckoo |
kSkipMmapReads));
}
// A delete is skipped for key if KeyMayExist(key) returns False // A delete is skipped for key if KeyMayExist(key) returns False
// Tests Writebatch consistency and proper delete behaviour // Tests Writebatch consistency and proper delete behaviour
TEST(DBTest, FilterDeletes) { TEST(DBTest, FilterDeletes) {
@ -8503,6 +8568,8 @@ void PrefixScanInit(DBTest *dbtest) {
} // namespace } // namespace
TEST(DBTest, PrefixScan) { TEST(DBTest, PrefixScan) {
XFUNC_TEST("", "dbtest_prefix", prefix_skip1, XFuncPoint::SetSkip,
kSkipNoPrefix);
while (ChangeFilterOptions()) { while (ChangeFilterOptions()) {
int count; int count;
Slice prefix; Slice prefix;
@ -8546,6 +8613,7 @@ TEST(DBTest, PrefixScan) {
ASSERT_EQ(env_->random_read_counter_.Read(), 2); ASSERT_EQ(env_->random_read_counter_.Read(), 2);
Close(); Close();
} // end of while } // end of while
XFUNC_TEST("", "dbtest_prefix", prefix_skip1, XFuncPoint::SetSkip, 0);
} }
TEST(DBTest, TailingIteratorSingle) { TEST(DBTest, TailingIteratorSingle) {
@ -8675,6 +8743,8 @@ TEST(DBTest, TailingIteratorDeletes) {
} }
TEST(DBTest, TailingIteratorPrefixSeek) { TEST(DBTest, TailingIteratorPrefixSeek) {
XFUNC_TEST("", "dbtest_prefix", prefix_skip1, XFuncPoint::SetSkip,
kSkipNoPrefix);
ReadOptions read_options; ReadOptions read_options;
read_options.tailing = true; read_options.tailing = true;
@ -8704,6 +8774,7 @@ TEST(DBTest, TailingIteratorPrefixSeek) {
iter->Next(); iter->Next();
ASSERT_TRUE(!iter->Valid()); ASSERT_TRUE(!iter->Valid());
XFUNC_TEST("", "dbtest_prefix", prefix_skip1, XFuncPoint::SetSkip, 0);
} }
TEST(DBTest, TailingIteratorIncomplete) { TEST(DBTest, TailingIteratorIncomplete) {
@ -8763,6 +8834,232 @@ TEST(DBTest, TailingIteratorSeekToSame) {
ASSERT_EQ(found, iter->key().ToString()); ASSERT_EQ(found, iter->key().ToString());
} }
TEST(DBTest, ManagedTailingIteratorSingle) {
ReadOptions read_options;
read_options.tailing = true;
read_options.managed = true;
std::unique_ptr<Iterator> iter(db_->NewIterator(read_options));
iter->SeekToFirst();
ASSERT_TRUE(!iter->Valid());
// add a record and check that iter can see it
ASSERT_OK(db_->Put(WriteOptions(), "mirko", "fodor"));
iter->SeekToFirst();
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(iter->key().ToString(), "mirko");
iter->Next();
ASSERT_TRUE(!iter->Valid());
}
TEST(DBTest, ManagedTailingIteratorKeepAdding) {
CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
ReadOptions read_options;
read_options.tailing = true;
read_options.managed = true;
std::unique_ptr<Iterator> iter(db_->NewIterator(read_options, handles_[1]));
std::string value(1024, 'a');
const int num_records = 10000;
for (int i = 0; i < num_records; ++i) {
char buf[32];
snprintf(buf, sizeof(buf), "%016d", i);
Slice key(buf, 16);
ASSERT_OK(Put(1, key, value));
iter->Seek(key);
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(iter->key().compare(key), 0);
}
}
TEST(DBTest, ManagedTailingIteratorSeekToNext) {
CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
ReadOptions read_options;
read_options.tailing = true;
read_options.managed = true;
std::unique_ptr<Iterator> iter(db_->NewIterator(read_options, handles_[1]));
std::string value(1024, 'a');
const int num_records = 1000;
for (int i = 1; i < num_records; ++i) {
char buf1[32];
char buf2[32];
snprintf(buf1, sizeof(buf1), "00a0%016d", i * 5);
Slice key(buf1, 20);
ASSERT_OK(Put(1, key, value));
if (i % 100 == 99) {
ASSERT_OK(Flush(1));
}
snprintf(buf2, sizeof(buf2), "00a0%016d", i * 5 - 2);
Slice target(buf2, 20);
iter->Seek(target);
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(iter->key().compare(key), 0);
}
for (int i = 2 * num_records; i > 0; --i) {
char buf1[32];
char buf2[32];
snprintf(buf1, sizeof(buf1), "00a0%016d", i * 5);
Slice key(buf1, 20);
ASSERT_OK(Put(1, key, value));
if (i % 100 == 99) {
ASSERT_OK(Flush(1));
}
snprintf(buf2, sizeof(buf2), "00a0%016d", i * 5 - 2);
Slice target(buf2, 20);
iter->Seek(target);
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(iter->key().compare(key), 0);
}
}
TEST(DBTest, ManagedTailingIteratorDeletes) {
CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
ReadOptions read_options;
read_options.tailing = true;
read_options.managed = true;
std::unique_ptr<Iterator> iter(db_->NewIterator(read_options, handles_[1]));
// write a single record, read it using the iterator, then delete it
ASSERT_OK(Put(1, "0test", "test"));
iter->SeekToFirst();
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(iter->key().ToString(), "0test");
ASSERT_OK(Delete(1, "0test"));
// write many more records
const int num_records = 10000;
std::string value(1024, 'A');
for (int i = 0; i < num_records; ++i) {
char buf[32];
snprintf(buf, sizeof(buf), "1%015d", i);
Slice key(buf, 16);
ASSERT_OK(Put(1, key, value));
}
// force a flush to make sure that no records are read from memtable
ASSERT_OK(Flush(1));
// skip "0test"
iter->Next();
// make sure we can read all new records using the existing iterator
int count = 0;
for (; iter->Valid(); iter->Next(), ++count) {
}
ASSERT_EQ(count, num_records);
}
TEST(DBTest, ManagedTailingIteratorPrefixSeek) {
XFUNC_TEST("", "dbtest_prefix", prefix_skip1, XFuncPoint::SetSkip,
kSkipNoPrefix);
ReadOptions read_options;
read_options.tailing = true;
read_options.managed = true;
Options options = CurrentOptions();
options.env = env_;
options.create_if_missing = true;
options.disable_auto_compactions = true;
options.prefix_extractor.reset(NewFixedPrefixTransform(2));
options.memtable_factory.reset(NewHashSkipListRepFactory(16));
DestroyAndReopen(options);
CreateAndReopenWithCF({"pikachu"}, options);
std::unique_ptr<Iterator> iter(db_->NewIterator(read_options, handles_[1]));
ASSERT_OK(Put(1, "0101", "test"));
ASSERT_OK(Flush(1));
ASSERT_OK(Put(1, "0202", "test"));
// Seek(0102) shouldn't find any records since 0202 has a different prefix
iter->Seek("0102");
ASSERT_TRUE(!iter->Valid());
iter->Seek("0202");
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(iter->key().ToString(), "0202");
iter->Next();
ASSERT_TRUE(!iter->Valid());
XFUNC_TEST("", "dbtest_prefix", prefix_skip1, XFuncPoint::SetSkip, 0);
}
TEST(DBTest, ManagedTailingIteratorIncomplete) {
CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
ReadOptions read_options;
read_options.tailing = true;
read_options.managed = true;
read_options.read_tier = kBlockCacheTier;
std::string key = "key";
std::string value = "value";
ASSERT_OK(db_->Put(WriteOptions(), key, value));
std::unique_ptr<Iterator> iter(db_->NewIterator(read_options));
iter->SeekToFirst();
// we either see the entry or it's not in cache
ASSERT_TRUE(iter->Valid() || iter->status().IsIncomplete());
ASSERT_OK(db_->CompactRange(nullptr, nullptr));
iter->SeekToFirst();
// should still be true after compaction
ASSERT_TRUE(iter->Valid() || iter->status().IsIncomplete());
}
TEST(DBTest, ManagedTailingIteratorSeekToSame) {
Options options = CurrentOptions();
options.compaction_style = kCompactionStyleUniversal;
options.write_buffer_size = 1000;
CreateAndReopenWithCF({"pikachu"}, options);
ReadOptions read_options;
read_options.tailing = true;
read_options.managed = true;
const int NROWS = 10000;
// Write rows with keys 00000, 00002, 00004 etc.
for (int i = 0; i < NROWS; ++i) {
char buf[100];
snprintf(buf, sizeof(buf), "%05d", 2 * i);
std::string key(buf);
std::string value("value");
ASSERT_OK(db_->Put(WriteOptions(), key, value));
}
std::unique_ptr<Iterator> iter(db_->NewIterator(read_options));
// Seek to 00001. We expect to find 00002.
std::string start_key = "00001";
iter->Seek(start_key);
ASSERT_TRUE(iter->Valid());
std::string found = iter->key().ToString();
ASSERT_EQ("00002", found);
// Now seek to the same key. The iterator should remain in the same
// position.
iter->Seek(found);
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(found, iter->key().ToString());
}
TEST(DBTest, BlockBasedTablePrefixIndexTest) { TEST(DBTest, BlockBasedTablePrefixIndexTest) {
// create a DB with block prefix index // create a DB with block prefix index
BlockBasedTableOptions table_options; BlockBasedTableOptions table_options;

@ -0,0 +1,255 @@
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#ifndef ROCKSDB_LITE
#include <limits>
#include <string>
#include <utility>
#include "db/column_family.h"
#include "db/db_impl.h"
#include "db/db_iter.h"
#include "db/dbformat.h"
#include "db/managed_iterator.h"
#include "rocksdb/env.h"
#include "rocksdb/slice.h"
#include "rocksdb/slice_transform.h"
#include "table/merger.h"
#include "util/xfunc.h"
namespace rocksdb {
namespace {
// Helper class that locks a mutex on construction and unlocks the mutex when
// the destructor of the MutexLock object is invoked.
//
// Typical usage:
//
// void MyClass::MyMethod() {
// MILock l(&mu_); // mu_ is an instance variable
// ... some complex code, possibly with multiple return paths ...
// }
class MILock {
public:
explicit MILock(std::mutex* mu, ManagedIterator* mi) : mu_(mu), mi_(mi) {
this->mu_->lock();
}
~MILock() {
this->mu_->unlock();
XFUNC_TEST("managed_xftest_release", "managed_unlock", managed_unlock1,
xf_manage_release, mi_);
}
private:
std::mutex* const mu_;
ManagedIterator* mi_;
// No copying allowed
MILock(const MILock&) = delete;
void operator=(const MILock&) = delete;
};
} // anonymous namespace
//
// Synchronization between modifiers, releasers, creators
// If iterator operation, wait till (!in_use), set in_use, do op, reset in_use
// if modifying mutable_iter, atomically exchange in_use:
// return if in_use set / otherwise set in use,
// atomically replace new iter with old , reset in use
// The releaser is the new operation and it holds a lock for a very short time
// The existing non-const iterator operations are supposed to be single
// threaded and hold the lock for the duration of the operation
// The existing const iterator operations use the cached key/values
// and don't do any locking.
ManagedIterator::ManagedIterator(DBImpl* db, const ReadOptions& read_options,
ColumnFamilyData* cfd)
: db_(db),
read_options_(read_options),
cfd_(cfd),
svnum_(cfd->GetSuperVersionNumber()),
mutable_iter_(nullptr),
valid_(false),
snapshot_created_(false),
release_supported_(true) {
read_options_.managed = false;
if ((!read_options_.tailing) && (read_options_.snapshot == nullptr)) {
assert(read_options_.snapshot = db_->GetSnapshot());
snapshot_created_ = true;
}
cfh_.SetCFD(cfd);
mutable_iter_ = unique_ptr<Iterator>(db->NewIterator(read_options_, &cfh_));
XFUNC_TEST("managed_xftest_dropold", "managed_create", xf_managed_create1,
xf_manage_create, this);
}
ManagedIterator::~ManagedIterator() {
Lock();
if (snapshot_created_) {
db_->ReleaseSnapshot(read_options_.snapshot);
snapshot_created_ = false;
read_options_.snapshot = nullptr;
}
}
bool ManagedIterator::Valid() const { return valid_; }
void ManagedIterator::SeekToLast() {
MILock l(&in_use_, this);
if (NeedToRebuild()) {
RebuildIterator();
}
assert(mutable_iter_ != nullptr);
mutable_iter_->SeekToLast();
if (mutable_iter_->status().ok()) {
UpdateCurrent();
}
}
void ManagedIterator::SeekToFirst() {
MILock l(&in_use_, this);
SeekInternal(Slice(), true);
}
void ManagedIterator::Seek(const Slice& user_key) {
MILock l(&in_use_, this);
SeekInternal(user_key, false);
}
void ManagedIterator::SeekInternal(const Slice& user_key, bool seek_to_first) {
if (NeedToRebuild()) {
RebuildIterator();
}
assert(mutable_iter_ != nullptr);
if (seek_to_first) {
mutable_iter_->SeekToFirst();
} else {
mutable_iter_->Seek(user_key);
}
UpdateCurrent();
}
void ManagedIterator::Prev() {
if (!valid_) {
status_ = Status::InvalidArgument("Iterator value invalid");
return;
}
MILock l(&in_use_, this);
if (NeedToRebuild()) {
std::string current_key = key().ToString();
Slice old_key(current_key);
RebuildIterator();
SeekInternal(old_key, false);
UpdateCurrent();
if (!valid_) {
return;
}
if (key().compare(old_key) != 0) {
valid_ = false;
status_ = Status::Incomplete("Cannot do Prev now");
return;
}
}
mutable_iter_->Prev();
if (mutable_iter_->status().ok()) {
UpdateCurrent();
status_ = Status::OK();
} else {
status_ = mutable_iter_->status();
}
}
void ManagedIterator::Next() {
if (!valid_) {
status_ = Status::InvalidArgument("Iterator value invalid");
return;
}
MILock l(&in_use_, this);
if (NeedToRebuild()) {
std::string current_key = key().ToString();
Slice old_key(current_key.data(), cached_key_.Size());
RebuildIterator();
SeekInternal(old_key, false);
UpdateCurrent();
if (!valid_) {
return;
}
if (key().compare(old_key) != 0) {
valid_ = false;
status_ = Status::Incomplete("Cannot do Next now");
return;
}
}
mutable_iter_->Next();
UpdateCurrent();
}
Slice ManagedIterator::key() const {
assert(valid_);
return cached_key_.GetKey();
}
Slice ManagedIterator::value() const {
assert(valid_);
return cached_value_.GetKey();
}
Status ManagedIterator::status() const { return status_; }
void ManagedIterator::RebuildIterator() {
svnum_ = cfd_->GetSuperVersionNumber();
mutable_iter_ = unique_ptr<Iterator>(db_->NewIterator(read_options_, &cfh_));
}
void ManagedIterator::UpdateCurrent() {
assert(mutable_iter_ != nullptr);
if (!(valid_ = mutable_iter_->Valid())) {
status_ = mutable_iter_->status();
return;
}
status_ = Status::OK();
cached_key_.SetKey(mutable_iter_->key());
cached_value_.SetKey(mutable_iter_->value());
}
void ManagedIterator::ReleaseIter(bool only_old) {
if ((mutable_iter_ == nullptr) || (!release_supported_)) {
return;
}
if (svnum_ != cfd_->GetSuperVersionNumber() || !only_old) {
if (!TryLock()) { // Don't release iter if in use
return;
}
mutable_iter_ = nullptr; // in_use for a very short time
UnLock();
}
}
bool ManagedIterator::NeedToRebuild() {
if ((mutable_iter_ == nullptr) || (status_.IsIncomplete()) ||
(!only_drop_old_ && (svnum_ != cfd_->GetSuperVersionNumber()))) {
return true;
}
return false;
}
void ManagedIterator::Lock() {
in_use_.lock();
return;
}
bool ManagedIterator::TryLock() { return in_use_.try_lock(); }
void ManagedIterator::UnLock() {
in_use_.unlock();
XFUNC_TEST("managed_xftest_release", "managed_unlock", managed_unlock1,
xf_manage_release, this);
}
} // namespace rocksdb
#endif // ROCKSDB_LITE

@ -0,0 +1,84 @@
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#pragma once
#ifndef ROCKSDB_LITE
#include <mutex>
#include <queue>
#include <string>
#include <vector>
#include "db/column_family.h"
#include "rocksdb/db.h"
#include "rocksdb/iterator.h"
#include "rocksdb/options.h"
#include "util/arena.h"
namespace rocksdb {
class DBImpl;
struct SuperVersion;
class ColumnFamilyData;
/**
* ManagedIterator is a special type of iterator that supports freeing the
* underlying iterator and still being able to access the current key/value
* pair. This is done by copying the key/value pair so that clients can
* continue to access the data without getting a SIGSEGV.
* The underlying iterator can be freed manually through the call to
* ReleaseIter or automatically (as needed on space pressure or age.)
* The iterator is recreated using the saved original arguments.
*/
class ManagedIterator : public Iterator {
public:
ManagedIterator(DBImpl* db, const ReadOptions& read_options,
ColumnFamilyData* cfd);
virtual ~ManagedIterator();
virtual void SeekToLast() override;
virtual void Prev() override;
virtual bool Valid() const override;
void SeekToFirst() override;
virtual void Seek(const Slice& target) override;
virtual void Next() override;
virtual Slice key() const override;
virtual Slice value() const override;
virtual Status status() const override;
void ReleaseIter(bool only_old);
void SetDropOld(bool only_old) {
only_drop_old_ = read_options_.tailing || only_old;
}
private:
void RebuildIterator();
void UpdateCurrent();
void SeekInternal(const Slice& user_key, bool seek_to_first);
bool NeedToRebuild();
void Lock();
bool TryLock();
void UnLock();
DBImpl* const db_;
ReadOptions read_options_;
ColumnFamilyData* const cfd_;
ColumnFamilyHandleInternal cfh_;
uint64_t svnum_;
std::unique_ptr<Iterator> mutable_iter_;
// internal iterator status
Status status_;
bool valid_;
IterKey cached_key_;
IterKey cached_value_;
bool only_drop_old_ = true;
bool snapshot_created_;
bool release_supported_;
std::mutex in_use_; // is managed iterator in use
};
} // namespace rocksdb
#endif // ROCKSDB_LITE

@ -1021,27 +1021,20 @@ struct ReadOptions {
// Not supported in ROCKSDB_LITE mode! // Not supported in ROCKSDB_LITE mode!
bool tailing; bool tailing;
// Specify to create a managed iterator -- a special iterator that
// uses less resources by having the ability to free its underlying
// resources on request.
// Default: false
// Not supported in ROCKSDB_LITE mode!
bool managed;
// Enable a total order seek regardless of index format (e.g. hash index) // Enable a total order seek regardless of index format (e.g. hash index)
// used in the table. Some table format (e.g. plain table) may not support // used in the table. Some table format (e.g. plain table) may not support
// this option. // this option.
bool total_order_seek; bool total_order_seek;
ReadOptions() ReadOptions();
: verify_checksums(true), ReadOptions(bool cksum, bool cache);
fill_cache(true),
snapshot(nullptr),
iterate_upper_bound(nullptr),
read_tier(kReadAllTier),
tailing(false),
total_order_seek(false) {}
ReadOptions(bool cksum, bool cache)
: verify_checksums(cksum),
fill_cache(cache),
snapshot(nullptr),
iterate_upper_bound(nullptr),
read_tier(kReadAllTier),
tailing(false),
total_order_seek(false) {}
}; };
// Options that control write operations // Options that control write operations

@ -30,6 +30,7 @@
#include "rocksdb/table_properties.h" #include "rocksdb/table_properties.h"
#include "table/block_based_table_factory.h" #include "table/block_based_table_factory.h"
#include "util/statistics.h" #include "util/statistics.h"
#include "util/xfunc.h"
namespace rocksdb { namespace rocksdb {
@ -596,6 +597,33 @@ DBOptions* DBOptions::IncreaseParallelism(int total_threads) {
env->SetBackgroundThreads(1, Env::HIGH); env->SetBackgroundThreads(1, Env::HIGH);
return this; return this;
} }
ReadOptions::ReadOptions()
: verify_checksums(true),
fill_cache(true),
snapshot(nullptr),
iterate_upper_bound(nullptr),
read_tier(kReadAllTier),
tailing(false),
managed(false),
total_order_seek(false) {
XFUNC_TEST("", "managed_options", managed_options, xf_manage_options,
reinterpret_cast<ReadOptions*>(this));
}
ReadOptions::ReadOptions(bool cksum, bool cache)
: verify_checksums(cksum),
fill_cache(cache),
snapshot(nullptr),
iterate_upper_bound(nullptr),
read_tier(kReadAllTier),
tailing(false),
managed(false),
total_order_seek(false) {
XFUNC_TEST("", "managed_options", managed_options, xf_manage_options,
reinterpret_cast<ReadOptions*>(this));
}
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE
} // namespace rocksdb } // namespace rocksdb

@ -4,6 +4,8 @@
// of patent rights can be found in the PATENTS file in the same directory. // of patent rights can be found in the PATENTS file in the same directory.
#include <string> #include <string>
#include "db/db_impl.h"
#include "db/managed_iterator.h"
#include "rocksdb/options.h" #include "rocksdb/options.h"
#include "util/xfunc.h" #include "util/xfunc.h"
@ -14,6 +16,7 @@ namespace rocksdb {
std::string XFuncPoint::xfunc_test_; std::string XFuncPoint::xfunc_test_;
bool XFuncPoint::initialized_ = false; bool XFuncPoint::initialized_ = false;
bool XFuncPoint::enabled_ = false; bool XFuncPoint::enabled_ = false;
int XFuncPoint::skip_policy_ = 0;
void GetXFTestOptions(Options* options, int skip_policy) { void GetXFTestOptions(Options* options, int skip_policy) {
if (XFuncPoint::Check("inplace_lock_test") && if (XFuncPoint::Check("inplace_lock_test") &&
@ -22,6 +25,45 @@ void GetXFTestOptions(Options* options, int skip_policy) {
} }
} }
void xf_manage_release(ManagedIterator* iter) {
if (!(XFuncPoint::GetSkip() & kSkipNoPrefix)) {
iter->ReleaseIter(false);
}
}
void xf_manage_options(ReadOptions* read_options) {
if (!XFuncPoint::Check("managed_xftest_dropold") &&
(!XFuncPoint::Check("managed_xftest_release"))) {
return;
}
read_options->managed = true;
}
void xf_manage_new(DBImpl* db, ReadOptions* read_options,
bool is_snapshot_supported) {
if ((!XFuncPoint::Check("managed_xftest_dropold") &&
(!XFuncPoint::Check("managed_xftest_release"))) ||
(!read_options->managed)) {
return;
}
if ((!read_options->tailing) && (read_options->snapshot == nullptr) &&
(!is_snapshot_supported)) {
read_options->managed = false;
return;
}
if (db->GetOptions().prefix_extractor != nullptr) {
if (strcmp(db->GetOptions().table_factory.get()->Name(), "PlainTable")) {
if (!(XFuncPoint::GetSkip() & kSkipNoPrefix)) {
read_options->total_order_seek = true;
}
} else {
read_options->managed = false;
}
}
}
void xf_manage_create(ManagedIterator* iter) { iter->SetDropOld(false); }
} // namespace rocksdb } // namespace rocksdb
#endif // XFUNC #endif // XFUNC

@ -29,7 +29,14 @@ namespace rocksdb {
#else #else
struct Options; struct Options;
class ManagedIterator;
class DBImpl;
void GetXFTestOptions(Options* options, int skip_policy); void GetXFTestOptions(Options* options, int skip_policy);
void xf_manage_release(ManagedIterator* iter);
void xf_manage_new(DBImpl* db, ReadOptions* readoptions,
bool is_snapshot_supported);
void xf_manage_create(ManagedIterator* iter);
void xf_manage_options(ReadOptions* read_options);
// This class provides the facility to run custom code to test a specific // This class provides the facility to run custom code to test a specific
// feature typically with all existing unit tests. // feature typically with all existing unit tests.
@ -66,10 +73,14 @@ class XFuncPoint {
((test.compare("") == 0) || (test.compare(xfunc_test_) == 0))); ((test.compare("") == 0) || (test.compare(xfunc_test_) == 0)));
} }
static void SetSkip(int skip) { skip_policy_ = skip; }
static int GetSkip(void) { return skip_policy_; }
private: private:
static std::string xfunc_test_; static std::string xfunc_test_;
static bool initialized_; static bool initialized_;
static bool enabled_; static bool enabled_;
static int skip_policy_;
}; };
// Use XFUNC_TEST to specify cross functional test points inside the code base. // Use XFUNC_TEST to specify cross functional test points inside the code base.

Loading…
Cancel
Save