created a new ReadOptions parameter 'iterate_upper_bound'

main
Raghav Pisolkar 10 years ago
parent 51ea889002
commit e0b99d4f5d
  1. 1
      HISTORY.md
  2. 7
      db/c.cc
  3. 6
      db/db_impl.cc
  4. 113
      db/db_iter.cc
  5. 5
      db/db_iter.h
  6. 138
      db/db_test.cc
  7. 4
      include/rocksdb/c.h
  8. 14
      include/rocksdb/options.h

@ -20,6 +20,7 @@
* Support Multiple DB paths in universal style compactions * Support Multiple DB paths in universal style compactions
* Add feature of storing plain table index and bloom filter in SST file. * Add feature of storing plain table index and bloom filter in SST file.
* CompactRange() will never output compacted files to level 0. This used to be the case when all the compaction input files were at level 0. * CompactRange() will never output compacted files to level 0. This used to be the case when all the compaction input files were at level 0.
* Added iterate_upper_bound to define the extent up to which the forward iterator will return entries. This prevents iterating over delete markers and overwritten entries in edge cases where you want to break out of the iterator anyway. This may improve performance when there are a large number of delete markers or overwritten entries.
### Public API changes ### Public API changes
* DBOptions.db_paths now is a vector of a DBPath structure which indicates both of path and target size * DBOptions.db_paths now is a vector of a DBPath structure which indicates both of path and target size

@ -1844,6 +1844,13 @@ void rocksdb_readoptions_set_snapshot(
opt->rep.snapshot = (snap ? snap->rep : nullptr); opt->rep.snapshot = (snap ? snap->rep : nullptr);
} }
// Sets the iterate_upper_bound read option of `opt` from the `keylen` bytes
// starting at `key`.
//
// NOTE(review): `prefix` is a local Slice, yet its address is what gets
// stored in opt->rep.iterate_upper_bound. The stored pointer therefore refers
// to a variable whose lifetime ends when this function returns. The Slice
// probably needs to live somewhere that lasts as long as `opt` (for example a
// member of rocksdb_readoptions_t) -- confirm and fix.
void rocksdb_readoptions_set_iterate_upper_bound(
rocksdb_readoptions_t* opt,
const char* key, size_t keylen) {
// Wraps the caller's bytes in a Slice; `prefix` itself is a stack object.
Slice prefix = Slice(key, keylen);
opt->rep.iterate_upper_bound = &prefix;
}
void rocksdb_readoptions_set_read_tier( void rocksdb_readoptions_set_read_tier(
rocksdb_readoptions_t* opt, int v) { rocksdb_readoptions_t* opt, int v) {
opt->rep.read_tier = static_cast<rocksdb::ReadTier>(v); opt->rep.read_tier = static_cast<rocksdb::ReadTier>(v);

@ -3677,7 +3677,7 @@ Iterator* DBImpl::NewIterator(const ReadOptions& options,
// TODO(ljin): remove tailing iterator // TODO(ljin): remove tailing iterator
auto iter = new ForwardIterator(this, options, cfd); auto iter = new ForwardIterator(this, options, cfd);
return NewDBIterator(env_, *cfd->options(), cfd->user_comparator(), iter, return NewDBIterator(env_, *cfd->options(), cfd->user_comparator(), iter,
kMaxSequenceNumber); kMaxSequenceNumber, options.iterate_upper_bound);
// return new TailingIterator(env_, this, options, cfd); // return new TailingIterator(env_, this, options, cfd);
#endif #endif
} else { } else {
@ -3733,7 +3733,9 @@ Iterator* DBImpl::NewIterator(const ReadOptions& options,
// likely that any iterator pointer is close to the iterator it points to so // likely that any iterator pointer is close to the iterator it points to so
// that they are likely to be in the same cache line and/or page. // that they are likely to be in the same cache line and/or page.
ArenaWrappedDBIter* db_iter = NewArenaWrappedDbIterator( ArenaWrappedDBIter* db_iter = NewArenaWrappedDbIterator(
env_, *cfd->options(), cfd->user_comparator(), snapshot); env_, *cfd->options(), cfd->user_comparator(),
snapshot, options.iterate_upper_bound);
Iterator* internal_iter = Iterator* internal_iter =
NewInternalIterator(options, cfd, sv, db_iter->GetArena()); NewInternalIterator(options, cfd, sv, db_iter->GetArena());
db_iter->SetIterUnderDBIter(internal_iter); db_iter->SetIterUnderDBIter(internal_iter);

@ -59,7 +59,8 @@ class DBIter: public Iterator {
}; };
DBIter(Env* env, const Options& options, const Comparator* cmp, DBIter(Env* env, const Options& options, const Comparator* cmp,
Iterator* iter, SequenceNumber s, bool arena_mode) Iterator* iter, SequenceNumber s, bool arena_mode,
const Slice* iterate_upper_bound = nullptr)
: arena_mode_(arena_mode), : arena_mode_(arena_mode),
env_(env), env_(env),
logger_(options.info_log.get()), logger_(options.info_log.get()),
@ -70,9 +71,10 @@ class DBIter: public Iterator {
direction_(kForward), direction_(kForward),
valid_(false), valid_(false),
current_entry_is_merged_(false), current_entry_is_merged_(false),
statistics_(options.statistics.get()) { statistics_(options.statistics.get()),
iterate_upper_bound_(iterate_upper_bound) {
RecordTick(statistics_, NO_ITERATORS); RecordTick(statistics_, NO_ITERATORS);
has_prefix_extractor_ = (options.prefix_extractor.get() != nullptr); prefix_extractor_ = options.prefix_extractor.get();
max_skip_ = options.max_sequential_skip_in_iterations; max_skip_ = options.max_sequential_skip_in_iterations;
} }
virtual ~DBIter() { virtual ~DBIter() {
@ -132,7 +134,7 @@ class DBIter: public Iterator {
} }
} }
bool has_prefix_extractor_; const SliceTransform* prefix_extractor_;
bool arena_mode_; bool arena_mode_;
Env* const env_; Env* const env_;
Logger* logger_; Logger* logger_;
@ -149,6 +151,7 @@ class DBIter: public Iterator {
bool current_entry_is_merged_; bool current_entry_is_merged_;
Statistics* statistics_; Statistics* statistics_;
uint64_t max_skip_; uint64_t max_skip_;
const Slice* iterate_upper_bound_;
// No copying allowed // No copying allowed
DBIter(const DBIter&); DBIter(const DBIter&);
@ -207,36 +210,44 @@ void DBIter::FindNextUserEntryInternal(bool skipping) {
uint64_t num_skipped = 0; uint64_t num_skipped = 0;
do { do {
ParsedInternalKey ikey; ParsedInternalKey ikey;
if (ParseKey(&ikey) && ikey.sequence <= sequence_) {
if (skipping && if (ParseKey(&ikey)) {
user_comparator_->Compare(ikey.user_key, saved_key_.GetKey()) <= 0) { if (iterate_upper_bound_ != nullptr &&
num_skipped++; // skip this entry ikey.user_key.compare(*iterate_upper_bound_) >= 0) {
PERF_COUNTER_ADD(internal_key_skipped_count, 1); break;
} else { }
skipping = false;
switch (ikey.type) { if (ikey.sequence <= sequence_) {
case kTypeDeletion: if (skipping &&
// Arrange to skip all upcoming entries for this key since user_comparator_->Compare(ikey.user_key, saved_key_.GetKey()) <= 0) {
// they are hidden by this deletion. num_skipped++; // skip this entry
saved_key_.SetKey(ikey.user_key); PERF_COUNTER_ADD(internal_key_skipped_count, 1);
skipping = true; } else {
num_skipped = 0; skipping = false;
PERF_COUNTER_ADD(internal_delete_skipped_count, 1); switch (ikey.type) {
break; case kTypeDeletion:
case kTypeValue: // Arrange to skip all upcoming entries for this key since
valid_ = true; // they are hidden by this deletion.
saved_key_.SetKey(ikey.user_key); saved_key_.SetKey(ikey.user_key);
return; skipping = true;
case kTypeMerge: num_skipped = 0;
// By now, we are sure the current ikey is going to yield a value PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
saved_key_.SetKey(ikey.user_key); break;
current_entry_is_merged_ = true; case kTypeValue:
valid_ = true; valid_ = true;
MergeValuesNewToOld(); // Go to a different state machine saved_key_.SetKey(ikey.user_key);
return; return;
default: case kTypeMerge:
assert(false); // By now, we are sure the current ikey is going to yield a value
break; saved_key_.SetKey(ikey.user_key);
current_entry_is_merged_ = true;
valid_ = true;
MergeValuesNewToOld(); // Go to a different state machine
return;
default:
assert(false);
break;
}
} }
} }
} }
@ -398,6 +409,7 @@ bool DBIter::FindValueForCurrentKey() {
case kTypeDeletion: case kTypeDeletion:
operands.clear(); operands.clear();
last_not_merge_type = kTypeDeletion; last_not_merge_type = kTypeDeletion;
PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
break; break;
case kTypeMerge: case kTypeMerge:
assert(user_merge_operator_ != nullptr); assert(user_merge_operator_ != nullptr);
@ -407,6 +419,7 @@ bool DBIter::FindValueForCurrentKey() {
assert(false); assert(false);
} }
PERF_COUNTER_ADD(internal_key_skipped_count, 1);
assert(user_comparator_->Compare(ikey.user_key, saved_key_.GetKey()) == 0); assert(user_comparator_->Compare(ikey.user_key, saved_key_.GetKey()) == 0);
iter_->Prev(); iter_->Prev();
++num_skipped; ++num_skipped;
@ -553,6 +566,20 @@ void DBIter::FindParseableKey(ParsedInternalKey* ikey, Direction direction) {
void DBIter::Seek(const Slice& target) { void DBIter::Seek(const Slice& target) {
StopWatch sw(env_, statistics_, DB_SEEK); StopWatch sw(env_, statistics_, DB_SEEK);
// total ordering is not guaranteed if prefix_extractor is set
// hence prefix based seeks will not give correct results
if (iterate_upper_bound_ != nullptr && prefix_extractor_ != nullptr) {
if (!prefix_extractor_->InDomain(*iterate_upper_bound_) ||
!prefix_extractor_->InDomain(target) ||
prefix_extractor_->Transform(*iterate_upper_bound_).compare(
prefix_extractor_->Transform(target)) != 0) {
status_ = Status::InvalidArgument("read_options.iterate_*_bound "
" and seek target need to have the same prefix.");
valid_ = false;
return;
}
}
saved_key_.Clear(); saved_key_.Clear();
// now savved_key is used to store internal key. // now savved_key is used to store internal key.
saved_key_.SetInternalKey(target, sequence_); saved_key_.SetInternalKey(target, sequence_);
@ -574,7 +601,7 @@ void DBIter::Seek(const Slice& target) {
void DBIter::SeekToFirst() { void DBIter::SeekToFirst() {
// Don't use iter_::Seek() if we set a prefix extractor // Don't use iter_::Seek() if we set a prefix extractor
// because prefix seek wiil be used. // because prefix seek wiil be used.
if (has_prefix_extractor_) { if (prefix_extractor_ != nullptr) {
max_skip_ = std::numeric_limits<uint64_t>::max(); max_skip_ = std::numeric_limits<uint64_t>::max();
} }
direction_ = kForward; direction_ = kForward;
@ -595,7 +622,7 @@ void DBIter::SeekToFirst() {
void DBIter::SeekToLast() { void DBIter::SeekToLast() {
// Don't use iter_::Seek() if we set a prefix extractor // Don't use iter_::Seek() if we set a prefix extractor
// because prefix seek wiil be used. // because prefix seek wiil be used.
if (has_prefix_extractor_) { if (prefix_extractor_ != nullptr) {
max_skip_ = std::numeric_limits<uint64_t>::max(); max_skip_ = std::numeric_limits<uint64_t>::max();
} }
direction_ = kReverse; direction_ = kReverse;
@ -612,9 +639,10 @@ void DBIter::SeekToLast() {
Iterator* NewDBIterator(Env* env, const Options& options, Iterator* NewDBIterator(Env* env, const Options& options,
const Comparator* user_key_comparator, const Comparator* user_key_comparator,
Iterator* internal_iter, Iterator* internal_iter,
const SequenceNumber& sequence) { const SequenceNumber& sequence,
const Slice* iterate_upper_bound) {
return new DBIter(env, options, user_key_comparator, internal_iter, sequence, return new DBIter(env, options, user_key_comparator, internal_iter, sequence,
false); false, iterate_upper_bound);
} }
ArenaWrappedDBIter::~ArenaWrappedDBIter() { db_iter_->~DBIter(); } ArenaWrappedDBIter::~ArenaWrappedDBIter() { db_iter_->~DBIter(); }
@ -643,13 +671,16 @@ void ArenaWrappedDBIter::RegisterCleanup(CleanupFunction function, void* arg1,
ArenaWrappedDBIter* NewArenaWrappedDbIterator( ArenaWrappedDBIter* NewArenaWrappedDbIterator(
Env* env, const Options& options, const Comparator* user_key_comparator, Env* env, const Options& options, const Comparator* user_key_comparator,
const SequenceNumber& sequence) { const SequenceNumber& sequence,
const Slice* iterate_upper_bound) {
ArenaWrappedDBIter* iter = new ArenaWrappedDBIter(); ArenaWrappedDBIter* iter = new ArenaWrappedDBIter();
Arena* arena = iter->GetArena(); Arena* arena = iter->GetArena();
auto mem = arena->AllocateAligned(sizeof(DBIter)); auto mem = arena->AllocateAligned(sizeof(DBIter));
DBIter* db_iter = new (mem) DBIter* db_iter = new (mem) DBIter(env, options, user_key_comparator,
DBIter(env, options, user_key_comparator, nullptr, sequence, true); nullptr, sequence, true, iterate_upper_bound);
iter->SetDBIter(db_iter); iter->SetDBIter(db_iter);
return iter; return iter;
} }

@ -27,7 +27,8 @@ extern Iterator* NewDBIterator(
const Options& options, const Options& options,
const Comparator *user_key_comparator, const Comparator *user_key_comparator,
Iterator* internal_iter, Iterator* internal_iter,
const SequenceNumber& sequence); const SequenceNumber& sequence,
const Slice* iterate_upper_bound = nullptr);
// A wrapper iterator which wraps DB Iterator and the arena, with which the DB // A wrapper iterator which wraps DB Iterator and the arena, with which the DB
// iterator is supposed be allocated. This class is used as an entry point of // iterator is supposed be allocated. This class is used as an entry point of
@ -68,6 +69,6 @@ class ArenaWrappedDBIter : public Iterator {
// Generate the arena wrapped iterator class. // Generate the arena wrapped iterator class.
extern ArenaWrappedDBIter* NewArenaWrappedDbIterator( extern ArenaWrappedDBIter* NewArenaWrappedDbIterator(
Env* env, const Options& options, const Comparator* user_key_comparator, Env* env, const Options& options, const Comparator* user_key_comparator,
const SequenceNumber& sequence); const SequenceNumber& sequence, const Slice* iterate_upper_bound = nullptr);
} // namespace rocksdb } // namespace rocksdb

@ -7743,6 +7743,144 @@ TEST(DBTest, TableOptionsSanitizeTest) {
ASSERT_TRUE(TryReopen(&options).IsNotSupported()); ASSERT_TRUE(TryReopen(&options).IsNotSupported());
} }
// Exercises ReadOptions::iterate_upper_bound on forward iteration:
//   1. no bound, no prefix extractor: iteration runs past "foo1" to "g1";
//   2. a bound of "foo2": iteration becomes invalid after "foo1";
//   3. a prefix extractor with a bound in a different prefix than the seek
//      target: Seek() reports InvalidArgument;
//   4. a bound of "c": deleted keys at or past the bound are not visited, so
//      internal_delete_skipped_count stays 0.
//
// ReadOptions only stores a pointer to the bound Slice, so every bound below
// is a stack object declared before the iterator that uses it (and therefore
// destroyed after it) instead of a heap allocation that is never freed.
TEST(DBTest, DBIteratorBoundTest) {
  Options options;
  options.env = env_;
  options.create_if_missing = true;

  options.prefix_extractor = nullptr;
  DestroyAndReopen(&options);
  ASSERT_OK(Put("a", "0"));
  ASSERT_OK(Put("foo", "bar"));
  ASSERT_OK(Put("foo1", "bar1"));
  ASSERT_OK(Put("g1", "0"));

  // testing basic case with no iterate_upper_bound and no prefix_extractor
  {
    ReadOptions ro;
    ro.iterate_upper_bound = nullptr;

    std::unique_ptr<Iterator> iter(db_->NewIterator(ro));

    iter->Seek("foo");

    ASSERT_TRUE(iter->Valid());
    ASSERT_EQ(iter->key().compare(Slice("foo")), 0);

    iter->Next();
    ASSERT_TRUE(iter->Valid());
    ASSERT_EQ(iter->key().compare(Slice("foo1")), 0);

    iter->Next();
    ASSERT_TRUE(iter->Valid());
    ASSERT_EQ(iter->key().compare(Slice("g1")), 0);
  }

  // testing iterate_upper_bound and forward iterator
  // to make sure it stops at bound
  {
    // iterate_upper_bound points beyond the last expected entry
    Slice upper_bound("foo2");
    ReadOptions ro;
    ro.iterate_upper_bound = &upper_bound;

    std::unique_ptr<Iterator> iter(db_->NewIterator(ro));

    iter->Seek("foo");

    ASSERT_TRUE(iter->Valid());
    ASSERT_EQ(iter->key().compare(Slice("foo")), 0);

    iter->Next();
    ASSERT_TRUE(iter->Valid());
    ASSERT_EQ(iter->key().compare(("foo1")), 0);

    iter->Next();
    // should stop here...
    ASSERT_TRUE(!iter->Valid());
  }

  // prefix is the first letter of the key
  options.prefix_extractor.reset(NewFixedPrefixTransform(1));

  DestroyAndReopen(&options);
  ASSERT_OK(Put("a", "0"));
  ASSERT_OK(Put("foo", "bar"));
  ASSERT_OK(Put("foo1", "bar1"));
  ASSERT_OK(Put("g1", "0"));

  // testing with iterate_upper_bound and prefix_extractor
  // Seek target and iterate_upper_bound are not in the same prefix
  // This should be an error
  {
    Slice upper_bound("g1");
    ReadOptions ro;
    ro.iterate_upper_bound = &upper_bound;

    std::unique_ptr<Iterator> iter(db_->NewIterator(ro));

    iter->Seek("foo");

    ASSERT_TRUE(!iter->Valid());
    ASSERT_TRUE(iter->status().IsInvalidArgument());
  }

  // testing that iterate_upper_bound prevents iterating over deleted items
  // if the bound has already been reached
  {
    options.prefix_extractor = nullptr;
    DestroyAndReopen(&options);
    ASSERT_OK(Put("a", "0"));
    ASSERT_OK(Put("b", "0"));
    ASSERT_OK(Put("b1", "0"));
    ASSERT_OK(Put("c", "0"));
    ASSERT_OK(Put("d", "0"));
    ASSERT_OK(Put("e", "0"));
    ASSERT_OK(Delete("c"));
    ASSERT_OK(Delete("d"));

    // Declared ahead of `iter` so the bound outlives both iterators that are
    // held by it in this scope.
    Slice upper_bound("c");

    // base case with no bound
    ReadOptions ro;
    ro.iterate_upper_bound = nullptr;

    std::unique_ptr<Iterator> iter(db_->NewIterator(ro));

    iter->Seek("b");
    ASSERT_TRUE(iter->Valid());
    ASSERT_EQ(iter->key().compare(Slice("b")), 0);

    iter->Next();
    ASSERT_TRUE(iter->Valid());
    ASSERT_EQ(iter->key().compare(("b1")), 0);

    perf_context.Reset();
    iter->Next();

    ASSERT_TRUE(iter->Valid());
    ASSERT_EQ(static_cast<int>(perf_context.internal_delete_skipped_count), 2);

    // now testing with iterate_bound
    ro.iterate_upper_bound = &upper_bound;

    iter.reset(db_->NewIterator(ro));

    perf_context.Reset();

    iter->Seek("b");
    ASSERT_TRUE(iter->Valid());
    ASSERT_EQ(iter->key().compare(Slice("b")), 0);

    iter->Next();
    ASSERT_TRUE(iter->Valid());
    ASSERT_EQ(iter->key().compare(("b1")), 0);

    iter->Next();
    // the iteration should stop as soon as the bound key is reached
    // even though the key is deleted
    // hence internal_delete_skipped_count should be 0
    ASSERT_TRUE(!iter->Valid());
    ASSERT_EQ(static_cast<int>(perf_context.internal_delete_skipped_count), 0);
  }
}
} // namespace rocksdb } // namespace rocksdb
int main(int argc, char** argv) { int main(int argc, char** argv) {

@ -698,6 +698,10 @@ extern void rocksdb_readoptions_set_fill_cache(
extern void rocksdb_readoptions_set_snapshot( extern void rocksdb_readoptions_set_snapshot(
rocksdb_readoptions_t*, rocksdb_readoptions_t*,
const rocksdb_snapshot_t*); const rocksdb_snapshot_t*);
/* Sets the iterate_upper_bound read option from the `keylen` bytes starting
   at `key`.
   NOTE(review): how long the `key` bytes must remain valid after this call
   is not stated here -- confirm against the implementation. */
extern void rocksdb_readoptions_set_iterate_upper_bound(
rocksdb_readoptions_t*,
const char* key,
size_t keylen);
extern void rocksdb_readoptions_set_read_tier( extern void rocksdb_readoptions_set_read_tier(
rocksdb_readoptions_t*, int); rocksdb_readoptions_t*, int);
extern void rocksdb_readoptions_set_tailing( extern void rocksdb_readoptions_set_tailing(

@ -903,6 +903,18 @@ struct ReadOptions {
// ! DEPRECATED // ! DEPRECATED
// const Slice* prefix; // const Slice* prefix;
// "iterate_upper_bound" defines the extent up to which the forward iterator
// can return entries. Once the bound is reached, Valid() will be false.
// "iterate_upper_bound" is exclusive, i.e. the bound value is
// not a valid entry. If prefix_extractor is not null, the Seek target
// and iterate_upper_bound need to have the same prefix.
// This is because ordering is not guaranteed outside of prefix domain.
// There is no lower bound on the iterator. If needed, that can be easily
// implemented.
//
// Default: nullptr
const Slice* iterate_upper_bound;
// Specify if this read request should process data that ALREADY // Specify if this read request should process data that ALREADY
// resides on a particular cache. If the required data is not // resides on a particular cache. If the required data is not
// found at the specified cache, then Status::Incomplete is returned. // found at the specified cache, then Status::Incomplete is returned.
@ -926,6 +938,7 @@ struct ReadOptions {
: verify_checksums(true), : verify_checksums(true),
fill_cache(true), fill_cache(true),
snapshot(nullptr), snapshot(nullptr),
iterate_upper_bound(nullptr),
read_tier(kReadAllTier), read_tier(kReadAllTier),
tailing(false), tailing(false),
total_order_seek(false) {} total_order_seek(false) {}
@ -933,6 +946,7 @@ struct ReadOptions {
: verify_checksums(cksum), : verify_checksums(cksum),
fill_cache(cache), fill_cache(cache),
snapshot(nullptr), snapshot(nullptr),
iterate_upper_bound(nullptr),
read_tier(kReadAllTier), read_tier(kReadAllTier),
tailing(false), tailing(false),
total_order_seek(false) {} total_order_seek(false) {}

Loading…
Cancel
Save