diff --git a/db/db_impl.cc b/db/db_impl.cc index 5508a2056..03510ccad 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -3718,7 +3718,8 @@ InternalIterator* DBImpl::NewInternalIterator(const ReadOptions& read_options, InternalIterator* internal_iter; assert(arena != nullptr); // Need to create internal iterator from the arena. - MergeIteratorBuilder merge_iter_builder(&cfd->internal_comparator(), arena); + MergeIteratorBuilder merge_iter_builder(&cfd->internal_comparator(), arena, + cfd->ioptions()->prefix_extractor); // Collect iterator for mutable mem merge_iter_builder.AddIterator( super_version->mem->NewIterator(read_options, arena)); diff --git a/db/db_impl.h b/db/db_impl.h index 7986c1da0..b937118ab 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -21,6 +21,7 @@ #include "db/column_family.h" #include "db/compaction_job.h" +#include "db/db_iter.h" #include "db/dbformat.h" #include "db/flush_job.h" #include "db/flush_scheduler.h" diff --git a/db/db_iter.cc b/db/db_iter.cc index 2a12534f2..c004fb99c 100644 --- a/db/db_iter.cc +++ b/db/db_iter.cc @@ -116,6 +116,7 @@ class DBIter: public Iterator { direction_(kForward), valid_(false), current_entry_is_merged_(false), + prefix_is_saved_(false), statistics_(ioptions.statistics), version_number_(version_number), iterate_upper_bound_(iterate_upper_bound), @@ -203,6 +204,7 @@ class DBIter: public Iterator { virtual void SeekToLast() override; private: + void ReverseToForward(); void ReverseToBackward(); void PrevInternal(); void FindParseableKey(ParsedInternalKey* ikey, Direction direction); @@ -255,6 +257,8 @@ class DBIter: public Iterator { Direction direction_; bool valid_; bool current_entry_is_merged_; + // for prefix seek mode to support prev() + bool prefix_is_saved_; Statistics* statistics_; uint64_t max_skip_; uint64_t version_number_; @@ -293,11 +297,7 @@ void DBIter::Next() { // Release temporarily pinned blocks from last operation ReleaseTempPinnedData(); if (direction_ == kReverse) { - FindNextUserKey(); - direction_ = kForward; - if (!iter_->Valid()) { - iter_->SeekToFirst(); - } + ReverseToForward(); } else if (iter_->Valid() && !current_entry_is_merged_) { // If the current value is not a merge, the iter position is the // current key, which is already returned. We can safely issue a @@ -510,7 +510,20 @@ void DBIter::Prev() { } } +void DBIter::ReverseToForward() { + FindNextUserKey(); + direction_ = kForward; + if (!iter_->Valid()) { + iter_->SeekToFirst(); + } +} + void DBIter::ReverseToBackward() { + if (prefix_extractor_ != nullptr) { + Slice prefix = prefix_extractor_->Transform(key()); + iter_->ResetPrefix(&prefix); + prefix_is_saved_ = true; + } if (current_entry_is_merged_) { // Not placed in the same key. Need to call Prev() until finding the // previous key. @@ -729,6 +742,11 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() { // Don't use Seek(), // because next user key will be very close void DBIter::FindNextUserKey() { + if (prefix_extractor_ != nullptr) { + Slice prefix = prefix_extractor_->Transform(key()); + iter_->ResetPrefix(&prefix); + prefix_is_saved_ = true; + } if (!iter_->Valid()) { return; } @@ -792,7 +810,6 @@ void DBIter::Seek(const Slice& target) { PERF_TIMER_GUARD(seek_internal_seek_time); iter_->Seek(saved_key_.GetKey()); } - RecordTick(statistics_, NUMBER_DB_SEEK); if (iter_->Valid()) { if (prefix_extractor_ && prefix_same_as_start_) { @@ -813,6 +830,11 @@ void DBIter::Seek(const Slice& target) { } else { valid_ = false; } + // Need to reset prefix if change direction + if (prefix_is_saved_) { + iter_->ResetPrefix(); + prefix_is_saved_ = false; + } if (valid_ && prefix_extractor_ && prefix_same_as_start_) { prefix_start_buf_.SetKey(prefix_start_key_); prefix_start_key_ = prefix_start_buf_.GetKey(); diff --git a/db/prefix_test.cc b/db/prefix_test.cc index 1fb22994e..f8dc51911 100644 --- a/db/prefix_test.cc +++ b/db/prefix_test.cc @@ -21,14 +21,16 @@ int main() { #include "rocksdb/comparator.h" #include "rocksdb/db.h" #include "rocksdb/filter_policy.h" +#include "rocksdb/memtablerep.h" #include "rocksdb/perf_context.h" #include "rocksdb/slice_transform.h" -#include "rocksdb/memtablerep.h" #include "rocksdb/table.h" #include "util/histogram.h" +#include "util/random.h" #include "util/stop_watch.h" #include "util/string_util.h" #include "util/testharness.h" +#include "utilities/merge_operators.h" using GFLAGS::ParseCommandLineFlags; @@ -46,6 +48,7 @@ DEFINE_int32(skiplist_height, 4, ""); DEFINE_double(memtable_prefix_bloom_size_ratio, 0.1, ""); DEFINE_int32(memtable_huge_page_size, 2 * 1024 * 1024, ""); DEFINE_int32(value_size, 40, ""); +DEFINE_bool(enable_print, false, "Print options generated to console."); // Path to the database on file system const std::string kDbName = rocksdb::test::TmpDir() + "/prefix_test"; @@ -106,6 +109,10 @@ class TestKeyComparator : public Comparator { return 0; } + bool operator()(const TestKey& a, const TestKey& b) const { + return Compare(TestKeyToSlice(a), TestKeyToSlice(b)) < 0; + } + virtual const char* Name() const override { return "TestKeyComparator"; } @@ -124,6 +131,23 @@ void PutKey(DB* db, WriteOptions write_options, uint64_t prefix, ASSERT_OK(db->Put(write_options, key, value)); } +void PutKey(DB* db, WriteOptions write_options, const TestKey& test_key, + const Slice& value) { + Slice key = TestKeyToSlice(test_key); + ASSERT_OK(db->Put(write_options, key, value)); +} + +void MergeKey(DB* db, WriteOptions write_options, const TestKey& test_key, + const Slice& value) { + Slice key = TestKeyToSlice(test_key); + ASSERT_OK(db->Merge(write_options, key, value)); +} + +void DeleteKey(DB* db, WriteOptions write_options, const TestKey& test_key) { + Slice key = TestKeyToSlice(test_key); + ASSERT_OK(db->Delete(write_options, key)); +} + void SeekIterator(Iterator* iter, uint64_t prefix, uint64_t suffix) { TestKey test_key(prefix, suffix); Slice key = TestKeyToSlice(test_key); @@ -629,8 +653,115 @@ TEST_F(PrefixTest, DynamicPrefixIterator) { } } +TEST_F(PrefixTest, PrefixSeekModePrev) { + // Only for SkipListFactory + options.memtable_factory.reset(new SkipListFactory); + options.merge_operator = MergeOperators::CreatePutOperator(); + options.write_buffer_size = 1024 * 1024; + Random rnd(1); + for (size_t m = 1; m < 100; m++) { + std::cout << "[" + std::to_string(m) + "]" + "*** Mem table: " + << options.memtable_factory->Name() << std::endl; + DestroyDB(kDbName, Options()); + auto db = OpenDb(); + WriteOptions write_options; + ReadOptions read_options; + std::map entry_maps[3], whole_map; + for (uint64_t i = 0; i < 10; i++) { + int div = i % 3 + 1; + for (uint64_t j = 0; j < 10; j++) { + whole_map[TestKey(i, j)] = entry_maps[rnd.Uniform(div)][TestKey(i, j)] = + 'v' + std::to_string(i) + std::to_string(j); + } + } + + std::map type_map; + for (size_t i = 0; i < 3; i++) { + for (auto& kv : entry_maps[i]) { + if (rnd.OneIn(3)) { + PutKey(db.get(), write_options, kv.first, kv.second); + type_map[kv.first] = "value"; + } else { + MergeKey(db.get(), write_options, kv.first, kv.second); + type_map[kv.first] = "merge"; + } + } + if (i < 2) { + db->Flush(FlushOptions()); + } + } + + for (size_t i = 0; i < 2; i++) { + for (auto& kv : entry_maps[i]) { + if (rnd.OneIn(10)) { + whole_map.erase(kv.first); + DeleteKey(db.get(), write_options, kv.first); + entry_maps[2][kv.first] = "delete"; + } + } + } + + if (FLAGS_enable_print) { + for (size_t i = 0; i < 3; i++) { + for (auto& kv : entry_maps[i]) { + std::cout << "[" << i << "]" << kv.first.prefix << kv.first.sorted + << " " << kv.second + " " + type_map[kv.first] << std::endl; + } + } + } + + std::unique_ptr iter(db->NewIterator(read_options)); + for (uint64_t prefix = 0; prefix < 10; prefix++) { + uint64_t start_suffix = rnd.Uniform(9); + SeekIterator(iter.get(), prefix, start_suffix); + auto it = whole_map.find(TestKey(prefix, start_suffix)); + if (it == whole_map.end()) { + continue; + } + ASSERT_NE(it, whole_map.end()); + ASSERT_TRUE(iter->Valid()); + if (FLAGS_enable_print) { + std::cout << "round " << prefix + << " iter: " << SliceToTestKey(iter->key())->prefix + << SliceToTestKey(iter->key())->sorted + << " | map: " << it->first.prefix << it->first.sorted << " | " + << iter->value().ToString() << " " << it->second << std::endl; + } + ASSERT_EQ(iter->value(), it->second); + for (size_t k = 0; k < 9; k++) { + if (rnd.OneIn(2) || it == whole_map.begin()) { + iter->Next(); + it++; + if (FLAGS_enable_print) { + std::cout << "Next >> "; + } + } else { + iter->Prev(); + it--; + if (FLAGS_enable_print) { + std::cout << "Prev >> "; + } + } + if (!iter->Valid() || SliceToTestKey(iter->key())->prefix != prefix) { + break; + } + ASSERT_TRUE(iter->Valid()); + ASSERT_NE(it, whole_map.end()); + ASSERT_EQ(iter->value(), it->second); + if (FLAGS_enable_print) { + std::cout << "iter: " << SliceToTestKey(iter->key())->prefix + << SliceToTestKey(iter->key())->sorted + << " | map: " << it->first.prefix << it->first.sorted + << " | " << iter->value().ToString() << " " << it->second + << std::endl; + } + } + } + } } +} // end namespace rocksdb + int main(int argc, char** argv) { ::testing::InitGoogleTest(&argc, argv); ParseCommandLineFlags(&argc, &argv, true); diff --git a/table/internal_iterator.h b/table/internal_iterator.h index 2850a6773..0f7f0233b 100644 --- a/table/internal_iterator.h +++ b/table/internal_iterator.h @@ -89,6 +89,8 @@ class InternalIterator : public Cleanable { return Status::NotSupported(""); } + virtual void ResetPrefix(const Slice* prefix = nullptr) {} + private: // No copying allowed InternalIterator(const InternalIterator&) = delete; diff --git a/table/merger.cc b/table/merger.cc index 637959d9a..5733d279f 100644 --- a/table/merger.cc +++ b/table/merger.cc @@ -6,11 +6,12 @@ // Copyright (c) 2011 The LevelDB Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. - #include "table/merger.h" +#include #include +#include "db/dbformat.h" #include "db/pinned_iterators_manager.h" #include "rocksdb/comparator.h" #include "rocksdb/iterator.h" @@ -20,6 +21,7 @@ #include "table/iterator_wrapper.h" #include "util/arena.h" #include "util/autovector.h" +#include "util/coding.h" #include "util/heap.h" #include "util/perf_context_imp.h" #include "util/stop_watch.h" @@ -37,12 +39,14 @@ const size_t kNumIterReserve = 4; class MergingIterator : public InternalIterator { public: MergingIterator(const Comparator* comparator, InternalIterator** children, - int n, bool is_arena_mode) + int n, bool is_arena_mode, + const SliceTransform* const prefix_extractor) : is_arena_mode_(is_arena_mode), comparator_(comparator), current_(nullptr), direction_(kForward), minHeap_(comparator_), + prefix_extractor_(prefix_extractor), pinned_iters_mgr_(nullptr) { children_.resize(n); for (int i = 0; i < n; i++) { @@ -109,8 +113,8 @@ class MergingIterator : public InternalIterator { PERF_TIMER_GUARD(seek_child_seek_time); child.Seek(target); } - PERF_COUNTER_ADD(seek_child_seek_count, 1); + PERF_COUNTER_ADD(seek_child_seek_count, 1); if (child.Valid()) { PERF_TIMER_GUARD(seek_min_heap_time); minHeap_.push(&child); @@ -125,7 +129,6 @@ class MergingIterator : public InternalIterator { virtual void Next() override { assert(Valid()); - // Ensure that all children are positioned after key(). // If we are moving in the forward direction, it is already // true for all of the non-current children since current_ is @@ -136,13 +139,30 @@ class MergingIterator : public InternalIterator { ClearHeaps(); for (auto& child : children_) { if (&child != current_) { - child.Seek(key()); + if (prefix_extractor_ == nullptr) { + child.Seek(key()); + } else { + // only for prefix_seek_mode + // we should not call Seek() here + if (child.Valid()) { + child.Next(); + } else { + child.SeekToFirst(); + } + } if (child.Valid() && comparator_->Equal(key(), child.key())) { child.Next(); } } if (child.Valid()) { - minHeap_.push(&child); + bool skip_iter = + prefix_extractor_ != nullptr && + prefix_extractor_->InDomain(ExtractUserKey(child.key())) && + prefix_extractor_->Transform(ExtractUserKey(child.key())) != + Slice(*prefix_); + if (&child == current_ || !skip_iter) { + minHeap_.push(&child); + } } } direction_ = kForward; @@ -182,7 +202,12 @@ class MergingIterator : public InternalIterator { InitMaxHeap(); for (auto& child : children_) { if (&child != current_) { - child.Seek(key()); + if (prefix_extractor_ == nullptr) { + child.Seek(key()); + } else { + // only for prefix_seek_mode + // we should not call Seek() here + } if (child.Valid()) { // Child is at first entry >= key(). Step back one to be < key() TEST_SYNC_POINT_CALLBACK("MergeIterator::Prev:BeforePrev", &child); @@ -193,8 +218,16 @@ class MergingIterator : public InternalIterator { child.SeekToLast(); } } + if (child.Valid()) { - maxHeap_->push(&child); + bool skip_iter = + prefix_extractor_ != nullptr && + prefix_extractor_->InDomain(ExtractUserKey(child.key())) && + prefix_extractor_->Transform(ExtractUserKey(child.key())) != + Slice(*prefix_); + if (&child == current_ || !skip_iter) { + maxHeap_->push(&child); + } } } direction_ = kReverse; @@ -263,6 +296,17 @@ class MergingIterator : public InternalIterator { current_->IsValuePinned(); } + virtual void ResetPrefix(const Slice* prefix) override { + if (prefix == nullptr) { + prefix_.reset(); + return; + } + if (!prefix_) { + prefix_.reset(new std::string); + } + *prefix_ = prefix->ToString(); + } + private: // Clears heaps for both directions, used when changing direction or seeking void ClearHeaps(); @@ -288,7 +332,9 @@ class MergingIterator : public InternalIterator { // Max heap is used for reverse iteration, which is way less common than // forward. Lazily initialize it to save memory. std::unique_ptr maxHeap_; + const SliceTransform* const prefix_extractor_; PinnedIteratorsManager* pinned_iters_mgr_; + std::unique_ptr prefix_; IteratorWrapper* CurrentForward() const { assert(direction_ == kForward); @@ -315,9 +361,9 @@ void MergingIterator::InitMaxHeap() { } } -InternalIterator* NewMergingIterator(const Comparator* cmp, - InternalIterator** list, int n, - Arena* arena) { +InternalIterator* NewMergingIterator( + const Comparator* cmp, InternalIterator** list, int n, Arena* arena, + const SliceTransform* const prefix_extractor) { assert(n >= 0); if (n == 0) { return NewEmptyInternalIterator(arena); @@ -325,20 +371,21 @@ InternalIterator* NewMergingIterator(const Comparator* cmp, return list[0]; } else { if (arena == nullptr) { - return new MergingIterator(cmp, list, n, false); + return new MergingIterator(cmp, list, n, false, prefix_extractor); } else { auto mem = arena->AllocateAligned(sizeof(MergingIterator)); - return new (mem) MergingIterator(cmp, list, n, true); + return new (mem) MergingIterator(cmp, list, n, true, prefix_extractor); } } } -MergeIteratorBuilder::MergeIteratorBuilder(const Comparator* comparator, - Arena* a) +MergeIteratorBuilder::MergeIteratorBuilder( + const Comparator* comparator, Arena* a, + const SliceTransform* const prefix_extractor) : first_iter(nullptr), use_merging_iter(false), arena(a) { - auto mem = arena->AllocateAligned(sizeof(MergingIterator)); - merge_iter = new (mem) MergingIterator(comparator, nullptr, 0, true); + merge_iter = + new (mem) MergingIterator(comparator, nullptr, 0, true, prefix_extractor); } void MergeIteratorBuilder::AddIterator(InternalIterator* iter) { diff --git a/table/merger.h b/table/merger.h index 7291a0378..65e594434 100644 --- a/table/merger.h +++ b/table/merger.h @@ -9,6 +9,7 @@ #pragma once +#include "rocksdb/slice_transform.h" #include "rocksdb/types.h" namespace rocksdb { @@ -26,9 +27,10 @@ class Arena; // key is present in K child iterators, it will be yielded K times. // // REQUIRES: n >= 0 -extern InternalIterator* NewMergingIterator(const Comparator* comparator, - InternalIterator** children, int n, - Arena* arena = nullptr); +extern InternalIterator* NewMergingIterator( + const Comparator* comparator, InternalIterator** children, int n, + Arena* arena = nullptr, + const SliceTransform* const prefix_extractor = nullptr); class MergingIterator; @@ -37,7 +39,9 @@ class MergeIteratorBuilder { public: // comparator: the comparator used in merging comparator // arena: where the merging iterator needs to be allocated from. - explicit MergeIteratorBuilder(const Comparator* comparator, Arena* arena); + explicit MergeIteratorBuilder( + const Comparator* comparator, Arena* arena, + const SliceTransform* const prefix_extractor = nullptr); ~MergeIteratorBuilder() {} // Add iter to the merging iterator.