diff --git a/db/db_impl.cc b/db/db_impl.cc index c9af825df..40b7ae817 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -3573,10 +3573,11 @@ Iterator* DBImpl::NewIterator(const ReadOptions& read_options, #else SuperVersion* sv = cfd->GetReferencedSuperVersion(&mutex_); auto iter = new ForwardIterator(this, read_options, cfd, sv); - return NewDBIterator(env_, *cfd->ioptions(), cfd->user_comparator(), iter, + return NewDBIterator( + env_, *cfd->ioptions(), cfd->user_comparator(), iter, kMaxSequenceNumber, sv->mutable_cf_options.max_sequential_skip_in_iterations, - read_options.iterate_upper_bound); + read_options.iterate_upper_bound, read_options.prefix_same_as_start); #endif } else { SequenceNumber latest_snapshot = versions_->LastSequence(); @@ -3631,9 +3632,9 @@ Iterator* DBImpl::NewIterator(const ReadOptions& read_options, // likely that any iterator pointer is close to the iterator it points to so // that they are likely to be in the same cache line and/or page. ArenaWrappedDBIter* db_iter = NewArenaWrappedDbIterator( - env_, *cfd->ioptions(), cfd->user_comparator(), - snapshot, sv->mutable_cf_options.max_sequential_skip_in_iterations, - read_options.iterate_upper_bound); + env_, *cfd->ioptions(), cfd->user_comparator(), snapshot, + sv->mutable_cf_options.max_sequential_skip_in_iterations, + read_options.iterate_upper_bound, read_options.prefix_same_as_start); InternalIterator* internal_iter = NewInternalIterator(read_options, cfd, sv, db_iter->GetArena()); diff --git a/db/db_iter.cc b/db/db_iter.cc index c34341da9..7c928f020 100644 --- a/db/db_iter.cc +++ b/db/db_iter.cc @@ -62,7 +62,8 @@ class DBIter: public Iterator { DBIter(Env* env, const ImmutableCFOptions& ioptions, const Comparator* cmp, InternalIterator* iter, SequenceNumber s, bool arena_mode, uint64_t max_sequential_skip_in_iterations, - const Slice* iterate_upper_bound = nullptr) + const Slice* iterate_upper_bound = nullptr, + bool prefix_same_as_start = false) : arena_mode_(arena_mode), env_(env), logger_(ioptions.info_log), @@ -74,7 +75,8 @@ class DBIter: public Iterator { valid_(false), current_entry_is_merged_(false), statistics_(ioptions.statistics), - iterate_upper_bound_(iterate_upper_bound) { + iterate_upper_bound_(iterate_upper_bound), + prefix_same_as_start_(prefix_same_as_start) { RecordTick(statistics_, NO_ITERATORS); prefix_extractor_ = ioptions.prefix_extractor; max_skip_ = max_sequential_skip_in_iterations; @@ -155,6 +157,8 @@ class DBIter: public Iterator { Statistics* statistics_; uint64_t max_skip_; const Slice* iterate_upper_bound_; + Slice prefix_start_; + bool prefix_same_as_start_; // No copying allowed DBIter(const DBIter&); @@ -197,6 +201,11 @@ void DBIter::Next() { RecordTick(statistics_, ITER_BYTES_READ, key().size() + value().size()); } } + if (valid_ && prefix_extractor_ && prefix_same_as_start_ && + prefix_extractor_->Transform(saved_key_.GetKey()) + .compare(prefix_start_) != 0) { + valid_ = false; + } } // PRE: saved_key_ has the current user key if skipping @@ -367,6 +376,11 @@ void DBIter::Prev() { RecordTick(statistics_, ITER_BYTES_READ, key().size() + value().size()); } } + if (valid_ && prefix_extractor_ && prefix_same_as_start_ && + prefix_extractor_->Transform(saved_key_.GetKey()) + .compare(prefix_start_) != 0) { + valid_ = false; + } } void DBIter::ReverseToBackward() { @@ -668,6 +682,9 @@ void DBIter::Seek(const Slice& target) { } else { valid_ = false; } + if (valid_ && prefix_extractor_ && prefix_same_as_start_) { + prefix_start_ = prefix_extractor_->Transform(target); + } } void DBIter::SeekToFirst() { @@ -696,6 +713,9 @@ void DBIter::SeekToFirst() { } else { valid_ = false; } + if (valid_ && prefix_extractor_ && prefix_same_as_start_) { + prefix_start_ = prefix_extractor_->Transform(saved_key_.GetKey()); + } } void DBIter::SeekToLast() { @@ -741,6 +761,9 @@ void DBIter::SeekToLast() { RecordTick(statistics_, ITER_BYTES_READ, key().size() + value().size()); } } + if (valid_ && prefix_extractor_ && prefix_same_as_start_) { + prefix_start_ = prefix_extractor_->Transform(saved_key_.GetKey()); + } } Iterator* NewDBIterator(Env* env, const ImmutableCFOptions& ioptions, @@ -748,10 +771,11 @@ Iterator* NewDBIterator(Env* env, const ImmutableCFOptions& ioptions, InternalIterator* internal_iter, const SequenceNumber& sequence, uint64_t max_sequential_skip_in_iterations, - const Slice* iterate_upper_bound) { + const Slice* iterate_upper_bound, + bool prefix_same_as_start) { return new DBIter(env, ioptions, user_key_comparator, internal_iter, sequence, false, max_sequential_skip_in_iterations, - iterate_upper_bound); + iterate_upper_bound, prefix_same_as_start); } ArenaWrappedDBIter::~ArenaWrappedDBIter() { db_iter_->~DBIter(); } @@ -780,16 +804,16 @@ void ArenaWrappedDBIter::RegisterCleanup(CleanupFunction function, void* arg1, ArenaWrappedDBIter* NewArenaWrappedDbIterator( Env* env, const ImmutableCFOptions& ioptions, - const Comparator* user_key_comparator, - const SequenceNumber& sequence, + const Comparator* user_key_comparator, const SequenceNumber& sequence, uint64_t max_sequential_skip_in_iterations, - const Slice* iterate_upper_bound) { + const Slice* iterate_upper_bound, bool prefix_same_as_start) { ArenaWrappedDBIter* iter = new ArenaWrappedDBIter(); Arena* arena = iter->GetArena(); auto mem = arena->AllocateAligned(sizeof(DBIter)); - DBIter* db_iter = new (mem) DBIter(env, ioptions, user_key_comparator, - nullptr, sequence, true, max_sequential_skip_in_iterations, - iterate_upper_bound); + DBIter* db_iter = + new (mem) DBIter(env, ioptions, user_key_comparator, nullptr, sequence, + true, max_sequential_skip_in_iterations, + iterate_upper_bound, prefix_same_as_start); iter->SetDBIter(db_iter); diff --git a/db/db_iter.h b/db/db_iter.h index 97a0b6ff7..740d8c51b 100644 --- a/db/db_iter.h +++ b/db/db_iter.h @@ -29,7 +29,8 @@ extern Iterator* NewDBIterator(Env* env, const ImmutableCFOptions& options, InternalIterator* internal_iter, const SequenceNumber& sequence, uint64_t max_sequential_skip_in_iterations, - const Slice* iterate_upper_bound = nullptr); + const Slice* iterate_upper_bound = nullptr, + bool prefix_same_as_start = false); // A wrapper iterator which wraps DB Iterator and the arena, with which the DB // iterator is supposed be allocated. This class is used as an entry point of @@ -71,8 +72,9 @@ class ArenaWrappedDBIter : public Iterator { // Generate the arena wrapped iterator class. extern ArenaWrappedDBIter* NewArenaWrappedDbIterator( Env* env, const ImmutableCFOptions& options, - const Comparator* user_key_comparator, - const SequenceNumber& sequence, uint64_t max_sequential_skip_in_iterations, - const Slice* iterate_upper_bound = nullptr); + const Comparator* user_key_comparator, const SequenceNumber& sequence, + uint64_t max_sequential_skip_in_iterations, + const Slice* iterate_upper_bound = nullptr, + bool prefix_same_as_start = false); } // namespace rocksdb diff --git a/db/prefix_test.cc b/db/prefix_test.cc index d095d444f..a210e4d65 100644 --- a/db/prefix_test.cc +++ b/db/prefix_test.cc @@ -20,9 +20,11 @@ int main() { #include #include "rocksdb/comparator.h" #include "rocksdb/db.h" +#include "rocksdb/filter_policy.h" #include "rocksdb/perf_context.h" #include "rocksdb/slice_transform.h" #include "rocksdb/memtablerep.h" +#include "rocksdb/table.h" #include "util/histogram.h" #include "util/stop_watch.h" #include "util/string_util.h" @@ -163,6 +165,12 @@ class PrefixTest : public testing::Test { options.memtable_prefix_bloom_huge_page_tlb_size = FLAGS_memtable_prefix_bloom_huge_page_tlb_size; + options.prefix_extractor.reset(NewFixedPrefixTransform(8)); + BlockBasedTableOptions bbto; + bbto.filter_policy.reset(NewBloomFilterPolicy(10, false)); + bbto.whole_key_filtering = false; + options.table_factory.reset(NewBlockBasedTableFactory(bbto)); + Status s = DB::Open(options, kDbName, &db); EXPECT_OK(s); return std::shared_ptr(db); @@ -393,6 +401,55 @@ TEST_F(PrefixTest, TestResult) { } } +// Show results in prefix +TEST_F(PrefixTest, PrefixValid) { + for (int num_buckets = 1; num_buckets <= 2; num_buckets++) { + FirstOption(); + while (NextOptions(num_buckets)) { + std::cout << "*** Mem table: " << options.memtable_factory->Name() + << " number of buckets: " << num_buckets << std::endl; + DestroyDB(kDbName, Options()); + auto db = OpenDb(); + WriteOptions write_options; + ReadOptions read_options; + + // Insert keys with common prefix and one key with different + Slice v16("v16"); + Slice v17("v17"); + Slice v18("v18"); + Slice v19("v19"); + PutKey(db.get(), write_options, 12345, 6, v16); + PutKey(db.get(), write_options, 12345, 7, v17); + PutKey(db.get(), write_options, 12345, 8, v18); + PutKey(db.get(), write_options, 12345, 9, v19); + PutKey(db.get(), write_options, 12346, 8, v16); + db->Flush(FlushOptions()); + db->Delete(write_options, TestKeyToSlice(TestKey(12346, 8))); + db->Flush(FlushOptions()); + read_options.prefix_same_as_start = true; + std::unique_ptr iter(db->NewIterator(read_options)); + SeekIterator(iter.get(), 12345, 6); + ASSERT_TRUE(iter->Valid()); + ASSERT_TRUE(v16 == iter->value()); + + iter->Next(); + ASSERT_TRUE(iter->Valid()); + ASSERT_TRUE(v17 == iter->value()); + + iter->Next(); + ASSERT_TRUE(iter->Valid()); + ASSERT_TRUE(v18 == iter->value()); + + iter->Next(); + ASSERT_TRUE(iter->Valid()); + ASSERT_TRUE(v19 == iter->value()); + iter->Next(); + ASSERT_FALSE(iter->Valid()); + ASSERT_EQ(kNotFoundResult, Get(db.get(), read_options, 12346, 8)); + } + } +} + TEST_F(PrefixTest, DynamicPrefixIterator) { while (NextOptions(FLAGS_bucket_count)) { std::cout << "*** Mem table: " << options.memtable_factory->Name() diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 567c3a867..07678b667 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -1290,6 +1290,14 @@ struct ReadOptions { // this option. bool total_order_seek; + // Enforce that the iterator only iterates over the same prefix as the seek. + // This option is effective only for prefix seeks, i.e. prefix_extractor is + // non-null for the column family and total_order_seek is false. Unlike + // iterate_upper_bound, prefix_same_as_start only works within a prefix + // but in both directions. + // Default: false + bool prefix_same_as_start; + ReadOptions(); ReadOptions(bool cksum, bool cache); }; diff --git a/util/options.cc b/util/options.cc index 027504ac6..a19c95900 100644 --- a/util/options.cc +++ b/util/options.cc @@ -717,7 +717,8 @@ ReadOptions::ReadOptions() read_tier(kReadAllTier), tailing(false), managed(false), - total_order_seek(false) { + total_order_seek(false), + prefix_same_as_start(false) { XFUNC_TEST("", "managed_options", managed_options, xf_manage_options, reinterpret_cast(this)); } @@ -730,7 +731,8 @@ ReadOptions::ReadOptions(bool cksum, bool cache) read_tier(kReadAllTier), tailing(false), managed(false), - total_order_seek(false) { + total_order_seek(false), + prefix_same_as_start(false) { XFUNC_TEST("", "managed_options", managed_options, xf_manage_options, reinterpret_cast(this)); }