From 9d50afc3b99eb04e7231c757e4aca8d79759b65a Mon Sep 17 00:00:00 2001 From: Venkatesh Radhakrishnan Date: Thu, 5 Nov 2015 13:24:05 -0800 Subject: [PATCH] Prefix-based iterating only shows keys in prefix Summary: MyRocks testing found an issue that while iterating over keys that are outside the prefix, sometimes wrong results were seen for keys outside the prefix. We now tighten the range of keys seen with a new read option called prefix_seen_at_start. This remembers the starting prefix and then compares it on a Next for equality of prefix. If they are from a different prefix, it sets valid to false. Test Plan: PrefixTest.PrefixValid Reviewers: IslamAbdelRahman, sdong, yhchiang, anthony Reviewed By: anthony Subscribers: spetrunia, hermanlee4, yoshinorim, dhruba, leveldb Differential Revision: https://reviews.facebook.net/D50211 --- db/db_impl.cc | 11 ++++---- db/db_iter.cc | 44 +++++++++++++++++++++++------- db/db_iter.h | 10 ++++--- db/prefix_test.cc | 57 +++++++++++++++++++++++++++++++++++++++ include/rocksdb/options.h | 8 ++++++ util/options.cc | 6 +++-- 6 files changed, 115 insertions(+), 21 deletions(-) diff --git a/db/db_impl.cc b/db/db_impl.cc index c9af825df..40b7ae817 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -3573,10 +3573,11 @@ Iterator* DBImpl::NewIterator(const ReadOptions& read_options, #else SuperVersion* sv = cfd->GetReferencedSuperVersion(&mutex_); auto iter = new ForwardIterator(this, read_options, cfd, sv); - return NewDBIterator(env_, *cfd->ioptions(), cfd->user_comparator(), iter, + return NewDBIterator( + env_, *cfd->ioptions(), cfd->user_comparator(), iter, kMaxSequenceNumber, sv->mutable_cf_options.max_sequential_skip_in_iterations, - read_options.iterate_upper_bound); + read_options.iterate_upper_bound, read_options.prefix_same_as_start); #endif } else { SequenceNumber latest_snapshot = versions_->LastSequence(); @@ -3631,9 +3632,9 @@ Iterator* DBImpl::NewIterator(const ReadOptions& read_options, // likely that any iterator pointer is close to the iterator it points to so // that they are likely to be in the same cache line and/or page. ArenaWrappedDBIter* db_iter = NewArenaWrappedDbIterator( - env_, *cfd->ioptions(), cfd->user_comparator(), - snapshot, sv->mutable_cf_options.max_sequential_skip_in_iterations, - read_options.iterate_upper_bound); + env_, *cfd->ioptions(), cfd->user_comparator(), snapshot, + sv->mutable_cf_options.max_sequential_skip_in_iterations, + read_options.iterate_upper_bound, read_options.prefix_same_as_start); InternalIterator* internal_iter = NewInternalIterator(read_options, cfd, sv, db_iter->GetArena()); diff --git a/db/db_iter.cc b/db/db_iter.cc index c34341da9..7c928f020 100644 --- a/db/db_iter.cc +++ b/db/db_iter.cc @@ -62,7 +62,8 @@ class DBIter: public Iterator { DBIter(Env* env, const ImmutableCFOptions& ioptions, const Comparator* cmp, InternalIterator* iter, SequenceNumber s, bool arena_mode, uint64_t max_sequential_skip_in_iterations, - const Slice* iterate_upper_bound = nullptr) + const Slice* iterate_upper_bound = nullptr, + bool prefix_same_as_start = false) : arena_mode_(arena_mode), env_(env), logger_(ioptions.info_log), @@ -74,7 +75,8 @@ class DBIter: public Iterator { valid_(false), current_entry_is_merged_(false), statistics_(ioptions.statistics), - iterate_upper_bound_(iterate_upper_bound) { + iterate_upper_bound_(iterate_upper_bound), + prefix_same_as_start_(prefix_same_as_start) { RecordTick(statistics_, NO_ITERATORS); prefix_extractor_ = ioptions.prefix_extractor; max_skip_ = max_sequential_skip_in_iterations; @@ -155,6 +157,8 @@ class DBIter: public Iterator { Statistics* statistics_; uint64_t max_skip_; const Slice* iterate_upper_bound_; + Slice prefix_start_; + bool prefix_same_as_start_; // No copying allowed DBIter(const DBIter&); @@ -197,6 +201,11 @@ void DBIter::Next() { RecordTick(statistics_, ITER_BYTES_READ, key().size() + value().size()); } } + if (valid_ && prefix_extractor_ && prefix_same_as_start_ && + prefix_extractor_->Transform(saved_key_.GetKey()) + .compare(prefix_start_) != 0) { + valid_ = false; + } } // PRE: saved_key_ has the current user key if skipping @@ -367,6 +376,11 @@ void DBIter::Prev() { RecordTick(statistics_, ITER_BYTES_READ, key().size() + value().size()); } } + if (valid_ && prefix_extractor_ && prefix_same_as_start_ && + prefix_extractor_->Transform(saved_key_.GetKey()) + .compare(prefix_start_) != 0) { + valid_ = false; + } } void DBIter::ReverseToBackward() { @@ -668,6 +682,9 @@ void DBIter::Seek(const Slice& target) { } else { valid_ = false; } + if (valid_ && prefix_extractor_ && prefix_same_as_start_) { + prefix_start_ = prefix_extractor_->Transform(target); + } } void DBIter::SeekToFirst() { @@ -696,6 +713,9 @@ void DBIter::SeekToFirst() { } else { valid_ = false; } + if (valid_ && prefix_extractor_ && prefix_same_as_start_) { + prefix_start_ = prefix_extractor_->Transform(saved_key_.GetKey()); + } } void DBIter::SeekToLast() { @@ -741,6 +761,9 @@ void DBIter::SeekToLast() { RecordTick(statistics_, ITER_BYTES_READ, key().size() + value().size()); } } + if (valid_ && prefix_extractor_ && prefix_same_as_start_) { + prefix_start_ = prefix_extractor_->Transform(saved_key_.GetKey()); + } } Iterator* NewDBIterator(Env* env, const ImmutableCFOptions& ioptions, @@ -748,10 +771,11 @@ Iterator* NewDBIterator(Env* env, const ImmutableCFOptions& ioptions, InternalIterator* internal_iter, const SequenceNumber& sequence, uint64_t max_sequential_skip_in_iterations, - const Slice* iterate_upper_bound) { + const Slice* iterate_upper_bound, + bool prefix_same_as_start) { return new DBIter(env, ioptions, user_key_comparator, internal_iter, sequence, false, max_sequential_skip_in_iterations, - iterate_upper_bound); + iterate_upper_bound, prefix_same_as_start); } ArenaWrappedDBIter::~ArenaWrappedDBIter() { db_iter_->~DBIter(); } @@ -780,16 +804,16 @@ void ArenaWrappedDBIter::RegisterCleanup(CleanupFunction function, void* arg1, ArenaWrappedDBIter* NewArenaWrappedDbIterator( Env* env, const ImmutableCFOptions& ioptions, - const Comparator* user_key_comparator, - const SequenceNumber& sequence, + const Comparator* user_key_comparator, const SequenceNumber& sequence, uint64_t max_sequential_skip_in_iterations, - const Slice* iterate_upper_bound) { + const Slice* iterate_upper_bound, bool prefix_same_as_start) { ArenaWrappedDBIter* iter = new ArenaWrappedDBIter(); Arena* arena = iter->GetArena(); auto mem = arena->AllocateAligned(sizeof(DBIter)); - DBIter* db_iter = new (mem) DBIter(env, ioptions, user_key_comparator, - nullptr, sequence, true, max_sequential_skip_in_iterations, - iterate_upper_bound); + DBIter* db_iter = + new (mem) DBIter(env, ioptions, user_key_comparator, nullptr, sequence, + true, max_sequential_skip_in_iterations, + iterate_upper_bound, prefix_same_as_start); iter->SetDBIter(db_iter); diff --git a/db/db_iter.h b/db/db_iter.h index 97a0b6ff7..740d8c51b 100644 --- a/db/db_iter.h +++ b/db/db_iter.h @@ -29,7 +29,8 @@ extern Iterator* NewDBIterator(Env* env, const ImmutableCFOptions& options, InternalIterator* internal_iter, const SequenceNumber& sequence, uint64_t max_sequential_skip_in_iterations, - const Slice* iterate_upper_bound = nullptr); + const Slice* iterate_upper_bound = nullptr, + bool prefix_same_as_start = false); // A wrapper iterator which wraps DB Iterator and the arena, with which the DB // iterator is supposed be allocated. This class is used as an entry point of @@ -71,8 +72,9 @@ class ArenaWrappedDBIter : public Iterator { // Generate the arena wrapped iterator class. extern ArenaWrappedDBIter* NewArenaWrappedDbIterator( Env* env, const ImmutableCFOptions& options, - const Comparator* user_key_comparator, - const SequenceNumber& sequence, uint64_t max_sequential_skip_in_iterations, - const Slice* iterate_upper_bound = nullptr); + const Comparator* user_key_comparator, const SequenceNumber& sequence, + uint64_t max_sequential_skip_in_iterations, + const Slice* iterate_upper_bound = nullptr, + bool prefix_same_as_start = false); } // namespace rocksdb diff --git a/db/prefix_test.cc b/db/prefix_test.cc index d095d444f..a210e4d65 100644 --- a/db/prefix_test.cc +++ b/db/prefix_test.cc @@ -20,9 +20,11 @@ int main() { #include #include "rocksdb/comparator.h" #include "rocksdb/db.h" +#include "rocksdb/filter_policy.h" #include "rocksdb/perf_context.h" #include "rocksdb/slice_transform.h" #include "rocksdb/memtablerep.h" +#include "rocksdb/table.h" #include "util/histogram.h" #include "util/stop_watch.h" #include "util/string_util.h" @@ -163,6 +165,12 @@ class PrefixTest : public testing::Test { options.memtable_prefix_bloom_huge_page_tlb_size = FLAGS_memtable_prefix_bloom_huge_page_tlb_size; + options.prefix_extractor.reset(NewFixedPrefixTransform(8)); + BlockBasedTableOptions bbto; + bbto.filter_policy.reset(NewBloomFilterPolicy(10, false)); + bbto.whole_key_filtering = false; + options.table_factory.reset(NewBlockBasedTableFactory(bbto)); + Status s = DB::Open(options, kDbName, &db); EXPECT_OK(s); return std::shared_ptr(db); @@ -393,6 +401,55 @@ TEST_F(PrefixTest, TestResult) { } } +// Show results in prefix +TEST_F(PrefixTest, PrefixValid) { + for (int num_buckets = 1; num_buckets <= 2; num_buckets++) { + FirstOption(); + while (NextOptions(num_buckets)) { + std::cout << "*** Mem table: " << options.memtable_factory->Name() + << " number of buckets: " << num_buckets << std::endl; + DestroyDB(kDbName, Options()); + auto db = OpenDb(); + WriteOptions write_options; + ReadOptions read_options; + + // Insert keys with common prefix and one key with different + Slice v16("v16"); + Slice v17("v17"); + Slice v18("v18"); + Slice v19("v19"); + PutKey(db.get(), write_options, 12345, 6, v16); + PutKey(db.get(), write_options, 12345, 7, v17); + PutKey(db.get(), write_options, 12345, 8, v18); + PutKey(db.get(), write_options, 12345, 9, v19); + PutKey(db.get(), write_options, 12346, 8, v16); + db->Flush(FlushOptions()); + db->Delete(write_options, TestKeyToSlice(TestKey(12346, 8))); + db->Flush(FlushOptions()); + read_options.prefix_same_as_start = true; + std::unique_ptr iter(db->NewIterator(read_options)); + SeekIterator(iter.get(), 12345, 6); + ASSERT_TRUE(iter->Valid()); + ASSERT_TRUE(v16 == iter->value()); + + iter->Next(); + ASSERT_TRUE(iter->Valid()); + ASSERT_TRUE(v17 == iter->value()); + + iter->Next(); + ASSERT_TRUE(iter->Valid()); + ASSERT_TRUE(v18 == iter->value()); + + iter->Next(); + ASSERT_TRUE(iter->Valid()); + ASSERT_TRUE(v19 == iter->value()); + iter->Next(); + ASSERT_FALSE(iter->Valid()); + ASSERT_EQ(kNotFoundResult, Get(db.get(), read_options, 12346, 8)); + } + } +} + TEST_F(PrefixTest, DynamicPrefixIterator) { while (NextOptions(FLAGS_bucket_count)) { std::cout << "*** Mem table: " << options.memtable_factory->Name() diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 567c3a867..07678b667 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -1290,6 +1290,14 @@ struct ReadOptions { // this option. bool total_order_seek; + // Enforce that the iterator only iterates over the same prefix as the seek. + // This option is effective only for prefix seeks, i.e. prefix_extractor is + // non-null for the column family and total_order_seek is false. Unlike + // iterate_upper_bound, prefix_same_as_start only works within a prefix + // but in both directions. + // Default: false + bool prefix_same_as_start; + ReadOptions(); ReadOptions(bool cksum, bool cache); }; diff --git a/util/options.cc b/util/options.cc index 027504ac6..a19c95900 100644 --- a/util/options.cc +++ b/util/options.cc @@ -717,7 +717,8 @@ ReadOptions::ReadOptions() read_tier(kReadAllTier), tailing(false), managed(false), - total_order_seek(false) { + total_order_seek(false), + prefix_same_as_start(false) { XFUNC_TEST("", "managed_options", managed_options, xf_manage_options, reinterpret_cast(this)); } @@ -730,7 +731,8 @@ ReadOptions::ReadOptions(bool cksum, bool cache) read_tier(kReadAllTier), tailing(false), managed(false), - total_order_seek(false) { + total_order_seek(false), + prefix_same_as_start(false) { XFUNC_TEST("", "managed_options", managed_options, xf_manage_options, reinterpret_cast(this)); }