From 447f17127cfb996ec4d8e5348bd2b90f42538c8a Mon Sep 17 00:00:00 2001 From: Aaron Gao Date: Tue, 11 Oct 2016 13:54:26 -0700 Subject: [PATCH] new Prev() prefix support using SeekForPrev() Summary: 1) The previous solution for Prev() prefix support is not clean. Since I add api SeekForPrev(), now the Prev() can be symmetric to Next(). and we do not need SeekToLast() to be called in Prev() any more. Also, Next() will Seek(prefix_seek_key_) to solve the problem of possible inconsistency between db_iter and merge_iter when there is merge_operator. And prefix_seek_key is only refreshed when change direction to forward. 2) This diff also solves the bug of Iterator::SeekToLast() with iterate_upper_bound_ with prefix extractor. add test cases for the above two cases. There are some tests for the SeekToLast() in Prev(), I will clean them later. Test Plan: make all check Reviewers: IslamAbdelRahman, andrewkr, yiwu, sdong Reviewed By: sdong Subscribers: andrewkr, dhruba, leveldb Differential Revision: https://reviews.facebook.net/D63933 --- db/db_impl.cc | 3 +- db/db_iter.cc | 97 +++++++++-------- db/db_iter.h | 6 +- db/db_iter_test.cc | 4 +- db/prefix_test.cc | 223 ++++++++++++++++++++++++++++++++++++++ table/internal_iterator.h | 6 + table/merger.cc | 33 +++--- 7 files changed, 310 insertions(+), 62 deletions(-) diff --git a/db/db_impl.cc b/db/db_impl.cc index cec473b24..a8cfc4469 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -4345,7 +4345,8 @@ Iterator* DBImpl::NewIterator(const ReadOptions& read_options, env_, *cfd->ioptions(), cfd->user_comparator(), snapshot, sv->mutable_cf_options.max_sequential_skip_in_iterations, sv->version_number, read_options.iterate_upper_bound, - read_options.prefix_same_as_start, read_options.pin_data); + read_options.prefix_same_as_start, read_options.pin_data, + read_options.total_order_seek); InternalIterator* internal_iter = NewInternalIterator(read_options, cfd, sv, db_iter->GetArena()); diff --git a/db/db_iter.cc b/db/db_iter.cc index 924fb9ac5..afc4596a5 100644 --- a/db/db_iter.cc +++ b/db/db_iter.cc @@ -105,7 +105,8 @@ class DBIter: public Iterator { InternalIterator* iter, SequenceNumber s, bool arena_mode, uint64_t max_sequential_skip_in_iterations, uint64_t version_number, const Slice* iterate_upper_bound = nullptr, - bool prefix_same_as_start = false, bool pin_data = false) + bool prefix_same_as_start = false, bool pin_data = false, + bool total_order_seek = false) : arena_mode_(arena_mode), env_(env), logger_(ioptions.info_log), @@ -120,7 +121,8 @@ class DBIter: public Iterator { version_number_(version_number), iterate_upper_bound_(iterate_upper_bound), prefix_same_as_start_(prefix_same_as_start), - pin_thru_lifetime_(pin_data) { + pin_thru_lifetime_(pin_data), + total_order_seek_(total_order_seek) { RecordTick(statistics_, NO_ITERATORS); prefix_extractor_ = ioptions.prefix_extractor; max_skip_ = max_sequential_skip_in_iterations; @@ -204,6 +206,7 @@ class DBIter: public Iterator { virtual void SeekToLast() override; private: + void ReverseToForward(); void ReverseToBackward(); void PrevInternal(); void FindParseableKey(ParsedInternalKey* ikey, Direction direction); @@ -256,6 +259,7 @@ class DBIter: public Iterator { Direction direction_; bool valid_; bool current_entry_is_merged_; + // for prefix seek mode to support prev() Statistics* statistics_; uint64_t max_skip_; uint64_t version_number_; @@ -266,6 +270,7 @@ class DBIter: public Iterator { // Means that we will pin all data blocks we read as long the Iterator // is not deleted, will be true if ReadOptions::pin_data is true const bool pin_thru_lifetime_; + const bool total_order_seek_; // List of operands for merge operator. MergeContext merge_context_; LocalStatistics local_stats_; @@ -294,11 +299,7 @@ void DBIter::Next() { // Release temporarily pinned blocks from last operation ReleaseTempPinnedData(); if (direction_ == kReverse) { - FindNextUserKey(); - direction_ = kForward; - if (!iter_->Valid()) { - iter_->SeekToFirst(); - } + ReverseToForward(); } else if (iter_->Valid() && !current_entry_is_merged_) { // If the current value is not a merge, the iter position is the // current key, which is already returned. We can safely issue a @@ -506,7 +507,29 @@ void DBIter::Prev() { } } +void DBIter::ReverseToForward() { + if (prefix_extractor_ != nullptr && !total_order_seek_) { + IterKey last_key; + last_key.SetInternalKey(ParsedInternalKey( + saved_key_.GetKey(), kMaxSequenceNumber, kValueTypeForSeek)); + Slice db_iter_key = last_key.GetKey(); + iter_->ResetPrefixSeekKey(&db_iter_key); + } + FindNextUserKey(); + direction_ = kForward; + if (!iter_->Valid()) { + iter_->SeekToFirst(); + } +} + void DBIter::ReverseToBackward() { + if (prefix_extractor_ != nullptr && !total_order_seek_) { + IterKey last_key; + last_key.SetInternalKey( + ParsedInternalKey(saved_key_.GetKey(), 0, kValueTypeForSeekForPrev)); + Slice db_iter_key = last_key.GetKey(); + iter_->ResetPrefixSeekKey(&db_iter_key); + } if (current_entry_is_merged_) { // Not placed in the same key. Need to call Prev() until finding the // previous key. @@ -794,7 +817,6 @@ void DBIter::Seek(const Slice& target) { PERF_TIMER_GUARD(seek_internal_seek_time); iter_->Seek(saved_key_.GetKey()); } - RecordTick(statistics_, NUMBER_DB_SEEK); if (iter_->Valid()) { if (prefix_extractor_ && prefix_same_as_start_) { @@ -815,6 +837,7 @@ void DBIter::Seek(const Slice& target) { } else { valid_ = false; } + if (valid_ && prefix_extractor_ && prefix_same_as_start_) { prefix_start_buf_.SetKey(prefix_start_key_); prefix_start_key_ = prefix_start_buf_.GetKey(); @@ -911,25 +934,15 @@ void DBIter::SeekToLast() { // it will seek to the last key before the // ReadOptions.iterate_upper_bound if (iter_->Valid() && iterate_upper_bound_ != nullptr) { - saved_key_.SetKey(*iterate_upper_bound_, false /* copy */); - std::string last_key; - AppendInternalKey(&last_key, - ParsedInternalKey(saved_key_.GetKey(), kMaxSequenceNumber, - kValueTypeForSeek)); - - iter_->Seek(last_key); - - if (!iter_->Valid()) { - iter_->SeekToLast(); - } else { - iter_->Prev(); - if (!iter_->Valid()) { - valid_ = false; - return; - } + SeekForPrev(*iterate_upper_bound_); + if (!Valid()) { + return; + } else if (user_comparator_->Equal(*iterate_upper_bound_, key())) { + Prev(); } + } else { + PrevInternal(); } - PrevInternal(); if (statistics_ != nullptr) { RecordTick(statistics_, NUMBER_DB_SEEK); if (valid_) { @@ -943,18 +956,16 @@ void DBIter::SeekToLast() { } } -Iterator* NewDBIterator(Env* env, const ImmutableCFOptions& ioptions, - const Comparator* user_key_comparator, - InternalIterator* internal_iter, - const SequenceNumber& sequence, - uint64_t max_sequential_skip_in_iterations, - uint64_t version_number, - const Slice* iterate_upper_bound, - bool prefix_same_as_start, bool pin_data) { - DBIter* db_iter = - new DBIter(env, ioptions, user_key_comparator, internal_iter, sequence, - false, max_sequential_skip_in_iterations, version_number, - iterate_upper_bound, prefix_same_as_start, pin_data); +Iterator* NewDBIterator( + Env* env, const ImmutableCFOptions& ioptions, + const Comparator* user_key_comparator, InternalIterator* internal_iter, + const SequenceNumber& sequence, uint64_t max_sequential_skip_in_iterations, + uint64_t version_number, const Slice* iterate_upper_bound, + bool prefix_same_as_start, bool pin_data, bool total_order_seek) { + DBIter* db_iter = new DBIter( + env, ioptions, user_key_comparator, internal_iter, sequence, false, + max_sequential_skip_in_iterations, version_number, iterate_upper_bound, + prefix_same_as_start, pin_data, total_order_seek); return db_iter; } @@ -993,15 +1004,15 @@ ArenaWrappedDBIter* NewArenaWrappedDbIterator( Env* env, const ImmutableCFOptions& ioptions, const Comparator* user_key_comparator, const SequenceNumber& sequence, uint64_t max_sequential_skip_in_iterations, uint64_t version_number, - const Slice* iterate_upper_bound, bool prefix_same_as_start, - bool pin_data) { + const Slice* iterate_upper_bound, bool prefix_same_as_start, bool pin_data, + bool total_order_seek) { ArenaWrappedDBIter* iter = new ArenaWrappedDBIter(); Arena* arena = iter->GetArena(); auto mem = arena->AllocateAligned(sizeof(DBIter)); - DBIter* db_iter = - new (mem) DBIter(env, ioptions, user_key_comparator, nullptr, sequence, - true, max_sequential_skip_in_iterations, version_number, - iterate_upper_bound, prefix_same_as_start, pin_data); + DBIter* db_iter = new (mem) DBIter( + env, ioptions, user_key_comparator, nullptr, sequence, true, + max_sequential_skip_in_iterations, version_number, iterate_upper_bound, + prefix_same_as_start, pin_data, total_order_seek); iter->SetDBIter(db_iter); diff --git a/db/db_iter.h b/db/db_iter.h index 1431bfac9..989188232 100644 --- a/db/db_iter.h +++ b/db/db_iter.h @@ -31,7 +31,8 @@ extern Iterator* NewDBIterator( const Comparator* user_key_comparator, InternalIterator* internal_iter, const SequenceNumber& sequence, uint64_t max_sequential_skip_in_iterations, uint64_t version_number, const Slice* iterate_upper_bound = nullptr, - bool prefix_same_as_start = false, bool pin_data = false); + bool prefix_same_as_start = false, bool pin_data = false, + bool total_order_seek = false); // A wrapper iterator which wraps DB Iterator and the arena, with which the DB // iterator is supposed be allocated. This class is used as an entry point of @@ -78,6 +79,7 @@ extern ArenaWrappedDBIter* NewArenaWrappedDbIterator( const Comparator* user_key_comparator, const SequenceNumber& sequence, uint64_t max_sequential_skip_in_iterations, uint64_t version_number, const Slice* iterate_upper_bound = nullptr, - bool prefix_same_as_start = false, bool pin_data = false); + bool prefix_same_as_start = false, bool pin_data = false, + bool total_order_seek = false); } // namespace rocksdb diff --git a/db/db_iter_test.cc b/db/db_iter_test.cc index 5305d3617..c500ab5c5 100644 --- a/db/db_iter_test.cc +++ b/db/db_iter_test.cc @@ -357,7 +357,7 @@ TEST_F(DBIteratorTest, DBIteratorPrevNext) { db_iter->SeekToLast(); ASSERT_TRUE(db_iter->Valid()); - ASSERT_EQ(static_cast(perf_context.internal_key_skipped_count), 1); + ASSERT_EQ(static_cast(perf_context.internal_key_skipped_count), 7); ASSERT_EQ(db_iter->key().ToString(), "b"); SetPerfLevel(kDisable); @@ -480,7 +480,7 @@ TEST_F(DBIteratorTest, DBIteratorPrevNext) { db_iter->SeekToLast(); ASSERT_TRUE(db_iter->Valid()); - ASSERT_EQ(static_cast(perf_context.internal_delete_skipped_count), 0); + ASSERT_EQ(static_cast(perf_context.internal_delete_skipped_count), 1); ASSERT_EQ(db_iter->key().ToString(), "b"); SetPerfLevel(kDisable); diff --git a/db/prefix_test.cc b/db/prefix_test.cc index 067977cd1..455b67266 100644 --- a/db/prefix_test.cc +++ b/db/prefix_test.cc @@ -18,6 +18,7 @@ int main() { #include #include +#include "db/db_impl.h" #include "rocksdb/comparator.h" #include "rocksdb/db.h" #include "rocksdb/filter_policy.h" @@ -26,9 +27,11 @@ int main() { #include "rocksdb/slice_transform.h" #include "rocksdb/table.h" #include "util/histogram.h" +#include "util/random.h" #include "util/stop_watch.h" #include "util/string_util.h" #include "util/testharness.h" +#include "utilities/merge_operators.h" using GFLAGS::ParseCommandLineFlags; @@ -46,6 +49,7 @@ DEFINE_int32(skiplist_height, 4, ""); DEFINE_double(memtable_prefix_bloom_size_ratio, 0.1, ""); DEFINE_int32(memtable_huge_page_size, 2 * 1024 * 1024, ""); DEFINE_int32(value_size, 40, ""); +DEFINE_bool(enable_print, false, "Print options generated to console."); // Path to the database on file system const std::string kDbName = rocksdb::test::TmpDir() + "/prefix_test"; @@ -106,6 +110,10 @@ class TestKeyComparator : public Comparator { return 0; } + bool operator()(const TestKey& a, const TestKey& b) const { + return Compare(TestKeyToSlice(a), TestKeyToSlice(b)) < 0; + } + virtual const char* Name() const override { return "TestKeyComparator"; } @@ -124,6 +132,23 @@ void PutKey(DB* db, WriteOptions write_options, uint64_t prefix, ASSERT_OK(db->Put(write_options, key, value)); } +void PutKey(DB* db, WriteOptions write_options, const TestKey& test_key, + const Slice& value) { + Slice key = TestKeyToSlice(test_key); + ASSERT_OK(db->Put(write_options, key, value)); +} + +void MergeKey(DB* db, WriteOptions write_options, const TestKey& test_key, + const Slice& value) { + Slice key = TestKeyToSlice(test_key); + ASSERT_OK(db->Merge(write_options, key, value)); +} + +void DeleteKey(DB* db, WriteOptions write_options, const TestKey& test_key) { + Slice key = TestKeyToSlice(test_key); + ASSERT_OK(db->Delete(write_options, key)); +} + void SeekIterator(Iterator* iter, uint64_t prefix, uint64_t suffix) { TestKey test_key(prefix, suffix); Slice key = TestKeyToSlice(test_key); @@ -629,8 +654,206 @@ TEST_F(PrefixTest, DynamicPrefixIterator) { } } +TEST_F(PrefixTest, PrefixSeekModePrev) { + // Only for SkipListFactory + options.memtable_factory.reset(new SkipListFactory); + options.merge_operator = MergeOperators::CreatePutOperator(); + options.write_buffer_size = 1024 * 1024; + Random rnd(1); + for (size_t m = 1; m < 100; m++) { + std::cout << "[" + std::to_string(m) + "]" + "*** Mem table: " + << options.memtable_factory->Name() << std::endl; + DestroyDB(kDbName, Options()); + auto db = OpenDb(); + WriteOptions write_options; + ReadOptions read_options; + std::map entry_maps[3], whole_map; + for (uint64_t i = 0; i < 10; i++) { + int div = i % 3 + 1; + for (uint64_t j = 0; j < 10; j++) { + whole_map[TestKey(i, j)] = entry_maps[rnd.Uniform(div)][TestKey(i, j)] = + 'v' + std::to_string(i) + std::to_string(j); + } + } + + std::map type_map; + for (size_t i = 0; i < 3; i++) { + for (auto& kv : entry_maps[i]) { + if (rnd.OneIn(3)) { + PutKey(db.get(), write_options, kv.first, kv.second); + type_map[kv.first] = "value"; + } else { + MergeKey(db.get(), write_options, kv.first, kv.second); + type_map[kv.first] = "merge"; + } + } + if (i < 2) { + db->Flush(FlushOptions()); + } + } + + for (size_t i = 0; i < 2; i++) { + for (auto& kv : entry_maps[i]) { + if (rnd.OneIn(10)) { + whole_map.erase(kv.first); + DeleteKey(db.get(), write_options, kv.first); + entry_maps[2][kv.first] = "delete"; + } + } + } + + if (FLAGS_enable_print) { + for (size_t i = 0; i < 3; i++) { + for (auto& kv : entry_maps[i]) { + std::cout << "[" << i << "]" << kv.first.prefix << kv.first.sorted + << " " << kv.second + " " + type_map[kv.first] << std::endl; + } + } + } + + std::unique_ptr iter(db->NewIterator(read_options)); + for (uint64_t prefix = 0; prefix < 10; prefix++) { + uint64_t start_suffix = rnd.Uniform(9); + SeekIterator(iter.get(), prefix, start_suffix); + auto it = whole_map.find(TestKey(prefix, start_suffix)); + if (it == whole_map.end()) { + continue; + } + ASSERT_NE(it, whole_map.end()); + ASSERT_TRUE(iter->Valid()); + if (FLAGS_enable_print) { + std::cout << "round " << prefix + << " iter: " << SliceToTestKey(iter->key())->prefix + << SliceToTestKey(iter->key())->sorted + << " | map: " << it->first.prefix << it->first.sorted << " | " + << iter->value().ToString() << " " << it->second << std::endl; + } + ASSERT_EQ(iter->value(), it->second); + uint64_t stored_prefix = prefix; + for (size_t k = 0; k < 9; k++) { + if (rnd.OneIn(2) || it == whole_map.begin()) { + iter->Next(); + it++; + if (FLAGS_enable_print) { + std::cout << "Next >> "; + } + } else { + iter->Prev(); + it--; + if (FLAGS_enable_print) { + std::cout << "Prev >> "; + } + } + if (!iter->Valid() || + SliceToTestKey(iter->key())->prefix != stored_prefix) { + break; + } + stored_prefix = SliceToTestKey(iter->key())->prefix; + ASSERT_TRUE(iter->Valid()); + ASSERT_NE(it, whole_map.end()); + ASSERT_EQ(iter->value(), it->second); + if (FLAGS_enable_print) { + std::cout << "iter: " << SliceToTestKey(iter->key())->prefix + << SliceToTestKey(iter->key())->sorted + << " | map: " << it->first.prefix << it->first.sorted + << " | " << iter->value().ToString() << " " << it->second + << std::endl; + } + } + } + } } +TEST_F(PrefixTest, PrefixSeekModePrev2) { + // Only for SkipListFactory + // test the case + // iter1 iter2 + // | prefix | suffix | | prefix | suffix | + // | 1 | 1 | | 1 | 2 | + // | 1 | 3 | | 1 | 4 | + // | 2 | 1 | | 3 | 3 | + // | 2 | 2 | | 3 | 4 | + // after seek(15), iter1 will be at 21 and iter2 will be 33. + // Then if call Prev() in prefix mode where SeekForPrev(21) gets called, + // iter2 should turn to invalid state because of bloom filter. + options.memtable_factory.reset(new SkipListFactory); + options.write_buffer_size = 1024 * 1024; + std::string v13("v13"); + DestroyDB(kDbName, Options()); + auto db = OpenDb(); + WriteOptions write_options; + ReadOptions read_options; + PutKey(db.get(), write_options, TestKey(1, 2), "v12"); + PutKey(db.get(), write_options, TestKey(1, 4), "v14"); + PutKey(db.get(), write_options, TestKey(3, 3), "v33"); + PutKey(db.get(), write_options, TestKey(3, 4), "v34"); + db->Flush(FlushOptions()); + reinterpret_cast(db.get())->TEST_WaitForFlushMemTable(); + PutKey(db.get(), write_options, TestKey(1, 1), "v11"); + PutKey(db.get(), write_options, TestKey(1, 3), "v13"); + PutKey(db.get(), write_options, TestKey(2, 1), "v21"); + PutKey(db.get(), write_options, TestKey(2, 2), "v22"); + db->Flush(FlushOptions()); + reinterpret_cast(db.get())->TEST_WaitForFlushMemTable(); + std::unique_ptr iter(db->NewIterator(read_options)); + SeekIterator(iter.get(), 1, 5); + iter->Prev(); + ASSERT_EQ(iter->value(), v13); +} + +TEST_F(PrefixTest, PrefixSeekModePrev3) { + // Only for SkipListFactory + // test SeekToLast() with iterate_upper_bound_ in prefix_seek_mode + options.memtable_factory.reset(new SkipListFactory); + options.write_buffer_size = 1024 * 1024; + std::string v14("v14"); + TestKey upper_bound_key = TestKey(1, 5); + Slice upper_bound = TestKeyToSlice(upper_bound_key); + + { + DestroyDB(kDbName, Options()); + auto db = OpenDb(); + WriteOptions write_options; + ReadOptions read_options; + read_options.iterate_upper_bound = &upper_bound; + PutKey(db.get(), write_options, TestKey(1, 2), "v12"); + PutKey(db.get(), write_options, TestKey(1, 4), "v14"); + db->Flush(FlushOptions()); + reinterpret_cast(db.get())->TEST_WaitForFlushMemTable(); + PutKey(db.get(), write_options, TestKey(1, 1), "v11"); + PutKey(db.get(), write_options, TestKey(1, 3), "v13"); + PutKey(db.get(), write_options, TestKey(2, 1), "v21"); + PutKey(db.get(), write_options, TestKey(2, 2), "v22"); + db->Flush(FlushOptions()); + reinterpret_cast(db.get())->TEST_WaitForFlushMemTable(); + std::unique_ptr iter(db->NewIterator(read_options)); + iter->SeekToLast(); + ASSERT_EQ(iter->value(), v14); + } + { + DestroyDB(kDbName, Options()); + auto db = OpenDb(); + WriteOptions write_options; + ReadOptions read_options; + read_options.iterate_upper_bound = &upper_bound; + PutKey(db.get(), write_options, TestKey(1, 2), "v12"); + PutKey(db.get(), write_options, TestKey(1, 4), "v14"); + PutKey(db.get(), write_options, TestKey(3, 3), "v33"); + PutKey(db.get(), write_options, TestKey(3, 4), "v34"); + db->Flush(FlushOptions()); + reinterpret_cast(db.get())->TEST_WaitForFlushMemTable(); + PutKey(db.get(), write_options, TestKey(1, 1), "v11"); + PutKey(db.get(), write_options, TestKey(1, 3), "v13"); + db->Flush(FlushOptions()); + reinterpret_cast(db.get())->TEST_WaitForFlushMemTable(); + std::unique_ptr iter(db->NewIterator(read_options)); + iter->SeekToLast(); + ASSERT_EQ(iter->value(), v14); + } +} + +} // end namespace rocksdb + int main(int argc, char** argv) { ::testing::InitGoogleTest(&argc, argv); ParseCommandLineFlags(&argc, &argv, true); diff --git a/table/internal_iterator.h b/table/internal_iterator.h index 62248007c..53a4a05ef 100644 --- a/table/internal_iterator.h +++ b/table/internal_iterator.h @@ -94,6 +94,12 @@ class InternalIterator : public Cleanable { virtual Status GetProperty(std::string prop_name, std::string* prop) { return Status::NotSupported(""); } + // Reset the key used for Seek() in merge iterator, especially for prefix + // seek mode + // When in prefix_seek_mode, there is inconsistency between db_iterator and + // merge iterator. This inconsistency can cause problem when do Seek() in + // merge iterator in prefix mode. + virtual void ResetPrefixSeekKey(const Slice* prefix_seek_key = nullptr) {} protected: void SeekForPrevImpl(const Slice& target, const Comparator* cmp) { diff --git a/table/merger.cc b/table/merger.cc index 2b199fb5a..84fae2624 100644 --- a/table/merger.cc +++ b/table/merger.cc @@ -8,6 +8,7 @@ // found in the LICENSE file. See the AUTHORS file for names of contributors. #include "table/merger.h" +#include #include #include "db/pinned_iterators_manager.h" #include "rocksdb/comparator.h" @@ -157,7 +158,7 @@ class MergingIterator : public InternalIterator { ClearHeaps(); for (auto& child : children_) { if (&child != current_) { - child.Seek(key()); + child.Seek(prefix_seek_key_ ? *prefix_seek_key_ : key()); if (child.Valid() && comparator_->Equal(key(), child.key())) { child.Next(); } @@ -203,15 +204,9 @@ class MergingIterator : public InternalIterator { InitMaxHeap(); for (auto& child : children_) { if (&child != current_) { - child.Seek(key()); - if (child.Valid()) { - // Child is at first entry >= key(). Step back one to be < key() - TEST_SYNC_POINT_CALLBACK("MergeIterator::Prev:BeforePrev", &child); + child.SeekForPrev(prefix_seek_key_ ? *prefix_seek_key_ : key()); + if (child.Valid() && comparator_->Equal(key(), child.key())) { child.Prev(); - } else { - // Child has no entries >= key(). Position at last entry. - TEST_SYNC_POINT("MergeIterator::Prev:BeforeSeekToLast"); - child.SeekToLast(); } } if (child.Valid()) { @@ -219,11 +214,9 @@ class MergingIterator : public InternalIterator { } } direction_ = kReverse; - // Note that we don't do assert(current_ == CurrentReverse()) here - // because it is possible to have some keys larger than the seek-key - // inserted between Seek() and SeekToLast(), which makes current_ not - // equal to CurrentReverse(). - current_ = CurrentReverse(); + // The loop advanced all non-current children to be < key() so current_ + // should still be strictly the smallest key. + assert(current_ == CurrentReverse()); } // For the heap modifications below to be correct, current_ must be the @@ -284,6 +277,17 @@ class MergingIterator : public InternalIterator { current_->IsValuePinned(); } + virtual void ResetPrefixSeekKey(const Slice* db_iter_key) override { + if (db_iter_key == nullptr) { + prefix_seek_key_.reset(); + return; + } + if (!prefix_seek_key_) { + prefix_seek_key_.reset(new std::string); + } + *prefix_seek_key_ = db_iter_key->ToString(); + } + private: // Clears heaps for both directions, used when changing direction or seeking void ClearHeaps(); @@ -310,6 +314,7 @@ class MergingIterator : public InternalIterator { // forward. Lazily initialize it to save memory. std::unique_ptr maxHeap_; PinnedIteratorsManager* pinned_iters_mgr_; + std::unique_ptr prefix_seek_key_; IteratorWrapper* CurrentForward() const { assert(direction_ == kForward);