Temporarily revert Prev() prefix support

Summary:
Temporarily revert the commits that added prefix support for Prev(), in order to unblock MyRocks and the RocksDB release.

These are the commits reverted:

  - 6a14d55bd9
  - b18f9c9eac
  - db74b1a219
  - 2482d5fb45

Test Plan: make check -j64

Reviewers: sdong, lightmark

Reviewed By: lightmark

Subscribers: andrewkr, dhruba, yoshinorim

Differential Revision: https://reviews.facebook.net/D63789
main
Islam AbdelRahman 8 years ago
parent de28a25533
commit 1cca091298
  1. 3
      db/db_impl.cc
  2. 1
      db/db_impl.h
  3. 34
      db/db_iter.cc
  4. 34
      db/db_iter_test.cc
  5. 131
      db/prefix_test.cc
  6. 3
      table/block_based_table_reader.cc
  7. 2
      table/internal_iterator.h
  8. 86
      table/merger.cc
  9. 12
      table/merger.h

@ -3719,8 +3719,7 @@ InternalIterator* DBImpl::NewInternalIterator(const ReadOptions& read_options,
InternalIterator* internal_iter; InternalIterator* internal_iter;
assert(arena != nullptr); assert(arena != nullptr);
// Need to create internal iterator from the arena. // Need to create internal iterator from the arena.
MergeIteratorBuilder merge_iter_builder(&cfd->internal_comparator(), arena, MergeIteratorBuilder merge_iter_builder(&cfd->internal_comparator(), arena);
cfd->ioptions()->prefix_extractor);
// Collect iterator for mutable mem // Collect iterator for mutable mem
merge_iter_builder.AddIterator( merge_iter_builder.AddIterator(
super_version->mem->NewIterator(read_options, arena)); super_version->mem->NewIterator(read_options, arena));

@ -21,7 +21,6 @@
#include "db/column_family.h" #include "db/column_family.h"
#include "db/compaction_job.h" #include "db/compaction_job.h"
#include "db/db_iter.h"
#include "db/dbformat.h" #include "db/dbformat.h"
#include "db/flush_job.h" #include "db/flush_job.h"
#include "db/flush_scheduler.h" #include "db/flush_scheduler.h"

@ -116,7 +116,6 @@ class DBIter: public Iterator {
direction_(kForward), direction_(kForward),
valid_(false), valid_(false),
current_entry_is_merged_(false), current_entry_is_merged_(false),
prefix_is_saved_(false),
statistics_(ioptions.statistics), statistics_(ioptions.statistics),
version_number_(version_number), version_number_(version_number),
iterate_upper_bound_(iterate_upper_bound), iterate_upper_bound_(iterate_upper_bound),
@ -204,7 +203,6 @@ class DBIter: public Iterator {
virtual void SeekToLast() override; virtual void SeekToLast() override;
private: private:
void ReverseToForward();
void ReverseToBackward(); void ReverseToBackward();
void PrevInternal(); void PrevInternal();
void FindParseableKey(ParsedInternalKey* ikey, Direction direction); void FindParseableKey(ParsedInternalKey* ikey, Direction direction);
@ -257,8 +255,6 @@ class DBIter: public Iterator {
Direction direction_; Direction direction_;
bool valid_; bool valid_;
bool current_entry_is_merged_; bool current_entry_is_merged_;
// for prefix seek mode to support prev()
bool prefix_is_saved_;
Statistics* statistics_; Statistics* statistics_;
uint64_t max_skip_; uint64_t max_skip_;
uint64_t version_number_; uint64_t version_number_;
@ -297,7 +293,11 @@ void DBIter::Next() {
// Release temporarily pinned blocks from last operation // Release temporarily pinned blocks from last operation
ReleaseTempPinnedData(); ReleaseTempPinnedData();
if (direction_ == kReverse) { if (direction_ == kReverse) {
ReverseToForward(); FindNextUserKey();
direction_ = kForward;
if (!iter_->Valid()) {
iter_->SeekToFirst();
}
} else if (iter_->Valid() && !current_entry_is_merged_) { } else if (iter_->Valid() && !current_entry_is_merged_) {
// If the current value is not a merge, the iter position is the // If the current value is not a merge, the iter position is the
// current key, which is already returned. We can safely issue a // current key, which is already returned. We can safely issue a
@ -510,20 +510,7 @@ void DBIter::Prev() {
} }
} }
void DBIter::ReverseToForward() {
FindNextUserKey();
direction_ = kForward;
if (!iter_->Valid()) {
iter_->SeekToFirst();
}
}
void DBIter::ReverseToBackward() { void DBIter::ReverseToBackward() {
if (prefix_extractor_ != nullptr) {
Slice prefix = prefix_extractor_->Transform(key());
iter_->ResetPrefix(&prefix);
prefix_is_saved_ = true;
}
if (current_entry_is_merged_) { if (current_entry_is_merged_) {
// Not placed in the same key. Need to call Prev() until finding the // Not placed in the same key. Need to call Prev() until finding the
// previous key. // previous key.
@ -742,11 +729,6 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {
// Don't use Seek(), // Don't use Seek(),
// because next user key will be very close // because next user key will be very close
void DBIter::FindNextUserKey() { void DBIter::FindNextUserKey() {
if (prefix_extractor_ != nullptr) {
Slice prefix = prefix_extractor_->Transform(key());
iter_->ResetPrefix(&prefix);
prefix_is_saved_ = true;
}
if (!iter_->Valid()) { if (!iter_->Valid()) {
return; return;
} }
@ -810,6 +792,7 @@ void DBIter::Seek(const Slice& target) {
PERF_TIMER_GUARD(seek_internal_seek_time); PERF_TIMER_GUARD(seek_internal_seek_time);
iter_->Seek(saved_key_.GetKey()); iter_->Seek(saved_key_.GetKey());
} }
RecordTick(statistics_, NUMBER_DB_SEEK); RecordTick(statistics_, NUMBER_DB_SEEK);
if (iter_->Valid()) { if (iter_->Valid()) {
if (prefix_extractor_ && prefix_same_as_start_) { if (prefix_extractor_ && prefix_same_as_start_) {
@ -830,11 +813,6 @@ void DBIter::Seek(const Slice& target) {
} else { } else {
valid_ = false; valid_ = false;
} }
// Need to reset prefix if change direction
if (prefix_is_saved_) {
iter_->ResetPrefix();
prefix_is_saved_ = false;
}
if (valid_ && prefix_extractor_ && prefix_same_as_start_) { if (valid_ && prefix_extractor_ && prefix_same_as_start_) {
prefix_start_buf_.SetKey(prefix_start_key_); prefix_start_buf_.SetKey(prefix_start_key_);
prefix_start_key_ = prefix_start_buf_.GetKey(); prefix_start_key_ = prefix_start_buf_.GetKey();

@ -1842,7 +1842,7 @@ TEST_F(DBIteratorTest, DBIterator12) {
ASSERT_FALSE(db_iter->Valid()); ASSERT_FALSE(db_iter->Valid());
} }
class DBIterWithMergeIterTest : public ::testing::TestWithParam<bool> { class DBIterWithMergeIterTest : public testing::Test {
public: public:
DBIterWithMergeIterTest() DBIterWithMergeIterTest()
: env_(Env::Default()), icomp_(BytewiseComparator()) { : env_(Env::Default()), icomp_(BytewiseComparator()) {
@ -1865,11 +1865,9 @@ class DBIterWithMergeIterTest : public ::testing::TestWithParam<bool> {
child_iters.push_back(internal_iter1_); child_iters.push_back(internal_iter1_);
child_iters.push_back(internal_iter2_); child_iters.push_back(internal_iter2_);
InternalKeyComparator icomp(BytewiseComparator()); InternalKeyComparator icomp(BytewiseComparator());
if (GetParam() == false) { InternalIterator* merge_iter =
options_.prefix_extractor.reset(NewFixedPrefixTransform(0)); NewMergingIterator(&icomp_, &child_iters[0], 2u);
}
InternalIterator* merge_iter = NewMergingIterator(
&icomp_, &child_iters[0], 2u, nullptr, options_.prefix_extractor.get());
db_iter_.reset(NewDBIterator(env_, ImmutableCFOptions(options_), db_iter_.reset(NewDBIterator(env_, ImmutableCFOptions(options_),
BytewiseComparator(), merge_iter, BytewiseComparator(), merge_iter,
8 /* read data earlier than seqId 8 */, 8 /* read data earlier than seqId 8 */,
@ -1885,7 +1883,7 @@ class DBIterWithMergeIterTest : public ::testing::TestWithParam<bool> {
std::unique_ptr<Iterator> db_iter_; std::unique_ptr<Iterator> db_iter_;
}; };
TEST_P(DBIterWithMergeIterTest, InnerMergeIterator1) { TEST_F(DBIterWithMergeIterTest, InnerMergeIterator1) {
db_iter_->SeekToFirst(); db_iter_->SeekToFirst();
ASSERT_TRUE(db_iter_->Valid()); ASSERT_TRUE(db_iter_->Valid());
ASSERT_EQ(db_iter_->key().ToString(), "a"); ASSERT_EQ(db_iter_->key().ToString(), "a");
@ -1914,7 +1912,7 @@ TEST_P(DBIterWithMergeIterTest, InnerMergeIterator1) {
ASSERT_FALSE(db_iter_->Valid()); ASSERT_FALSE(db_iter_->Valid());
} }
TEST_P(DBIterWithMergeIterTest, InnerMergeIterator2) { TEST_F(DBIterWithMergeIterTest, InnerMergeIterator2) {
// Test Prev() when one child iterator is at its end. // Test Prev() when one child iterator is at its end.
db_iter_->Seek("g"); db_iter_->Seek("g");
ASSERT_TRUE(db_iter_->Valid()); ASSERT_TRUE(db_iter_->Valid());
@ -1942,7 +1940,7 @@ TEST_P(DBIterWithMergeIterTest, InnerMergeIterator2) {
ASSERT_EQ(db_iter_->value().ToString(), "4"); ASSERT_EQ(db_iter_->value().ToString(), "4");
} }
TEST_P(DBIterWithMergeIterTest, InnerMergeIteratorDataRace1) { TEST_F(DBIterWithMergeIterTest, InnerMergeIteratorDataRace1) {
// Test Prev() when one child iterator is at its end but more rows // Test Prev() when one child iterator is at its end but more rows
// are added. // are added.
db_iter_->Seek("f"); db_iter_->Seek("f");
@ -1978,7 +1976,7 @@ TEST_P(DBIterWithMergeIterTest, InnerMergeIteratorDataRace1) {
rocksdb::SyncPoint::GetInstance()->DisableProcessing(); rocksdb::SyncPoint::GetInstance()->DisableProcessing();
} }
TEST_P(DBIterWithMergeIterTest, InnerMergeIteratorDataRace2) { TEST_F(DBIterWithMergeIterTest, InnerMergeIteratorDataRace2) {
// Test Prev() when one child iterator is at its end but more rows // Test Prev() when one child iterator is at its end but more rows
// are added. // are added.
db_iter_->Seek("f"); db_iter_->Seek("f");
@ -2016,7 +2014,7 @@ TEST_P(DBIterWithMergeIterTest, InnerMergeIteratorDataRace2) {
rocksdb::SyncPoint::GetInstance()->DisableProcessing(); rocksdb::SyncPoint::GetInstance()->DisableProcessing();
} }
TEST_P(DBIterWithMergeIterTest, InnerMergeIteratorDataRace3) { TEST_F(DBIterWithMergeIterTest, InnerMergeIteratorDataRace3) {
// Test Prev() when one child iterator is at its end but more rows // Test Prev() when one child iterator is at its end but more rows
// are added and max_skipped is triggered. // are added and max_skipped is triggered.
db_iter_->Seek("f"); db_iter_->Seek("f");
@ -2058,7 +2056,7 @@ TEST_P(DBIterWithMergeIterTest, InnerMergeIteratorDataRace3) {
rocksdb::SyncPoint::GetInstance()->DisableProcessing(); rocksdb::SyncPoint::GetInstance()->DisableProcessing();
} }
TEST_P(DBIterWithMergeIterTest, InnerMergeIteratorDataRace4) { TEST_F(DBIterWithMergeIterTest, InnerMergeIteratorDataRace4) {
// Test Prev() when one child iterator has more rows inserted // Test Prev() when one child iterator has more rows inserted
// between Seek() and Prev() when changing directions. // between Seek() and Prev() when changing directions.
internal_iter2_->Add("z", kTypeValue, "9", 4u); internal_iter2_->Add("z", kTypeValue, "9", 4u);
@ -2109,7 +2107,7 @@ TEST_P(DBIterWithMergeIterTest, InnerMergeIteratorDataRace4) {
rocksdb::SyncPoint::GetInstance()->DisableProcessing(); rocksdb::SyncPoint::GetInstance()->DisableProcessing();
} }
TEST_P(DBIterWithMergeIterTest, InnerMergeIteratorDataRace5) { TEST_F(DBIterWithMergeIterTest, InnerMergeIteratorDataRace5) {
internal_iter2_->Add("z", kTypeValue, "9", 4u); internal_iter2_->Add("z", kTypeValue, "9", 4u);
// Test Prev() when one child iterator has more rows inserted // Test Prev() when one child iterator has more rows inserted
@ -2156,7 +2154,7 @@ TEST_P(DBIterWithMergeIterTest, InnerMergeIteratorDataRace5) {
rocksdb::SyncPoint::GetInstance()->DisableProcessing(); rocksdb::SyncPoint::GetInstance()->DisableProcessing();
} }
TEST_P(DBIterWithMergeIterTest, InnerMergeIteratorDataRace6) { TEST_F(DBIterWithMergeIterTest, InnerMergeIteratorDataRace6) {
internal_iter2_->Add("z", kTypeValue, "9", 4u); internal_iter2_->Add("z", kTypeValue, "9", 4u);
// Test Prev() when one child iterator has more rows inserted // Test Prev() when one child iterator has more rows inserted
@ -2202,7 +2200,7 @@ TEST_P(DBIterWithMergeIterTest, InnerMergeIteratorDataRace6) {
rocksdb::SyncPoint::GetInstance()->DisableProcessing(); rocksdb::SyncPoint::GetInstance()->DisableProcessing();
} }
TEST_P(DBIterWithMergeIterTest, InnerMergeIteratorDataRace7) { TEST_F(DBIterWithMergeIterTest, InnerMergeIteratorDataRace7) {
internal_iter1_->Add("u", kTypeValue, "10", 4u); internal_iter1_->Add("u", kTypeValue, "10", 4u);
internal_iter1_->Add("v", kTypeValue, "11", 4u); internal_iter1_->Add("v", kTypeValue, "11", 4u);
internal_iter1_->Add("w", kTypeValue, "12", 4u); internal_iter1_->Add("w", kTypeValue, "12", 4u);
@ -2256,7 +2254,7 @@ TEST_P(DBIterWithMergeIterTest, InnerMergeIteratorDataRace7) {
rocksdb::SyncPoint::GetInstance()->DisableProcessing(); rocksdb::SyncPoint::GetInstance()->DisableProcessing();
} }
TEST_P(DBIterWithMergeIterTest, InnerMergeIteratorDataRace8) { TEST_F(DBIterWithMergeIterTest, InnerMergeIteratorDataRace8) {
// internal_iter1_: a, f, g // internal_iter1_: a, f, g
// internal_iter2_: a, b, c, d, adding (z) // internal_iter2_: a, b, c, d, adding (z)
internal_iter2_->Add("z", kTypeValue, "9", 4u); internal_iter2_->Add("z", kTypeValue, "9", 4u);
@ -2292,10 +2290,6 @@ TEST_P(DBIterWithMergeIterTest, InnerMergeIteratorDataRace8) {
rocksdb::SyncPoint::GetInstance()->DisableProcessing(); rocksdb::SyncPoint::GetInstance()->DisableProcessing();
} }
INSTANTIATE_TEST_CASE_P(InnerMergeIteratorDataRaceInstance,
DBIterWithMergeIterTest, ::testing::Bool());
} // namespace rocksdb } // namespace rocksdb
int main(int argc, char** argv) { int main(int argc, char** argv) {

@ -26,11 +26,9 @@ int main() {
#include "rocksdb/slice_transform.h" #include "rocksdb/slice_transform.h"
#include "rocksdb/table.h" #include "rocksdb/table.h"
#include "util/histogram.h" #include "util/histogram.h"
#include "util/random.h"
#include "util/stop_watch.h" #include "util/stop_watch.h"
#include "util/string_util.h" #include "util/string_util.h"
#include "util/testharness.h" #include "util/testharness.h"
#include "utilities/merge_operators.h"
using GFLAGS::ParseCommandLineFlags; using GFLAGS::ParseCommandLineFlags;
@ -48,7 +46,6 @@ DEFINE_int32(skiplist_height, 4, "");
DEFINE_double(memtable_prefix_bloom_size_ratio, 0.1, ""); DEFINE_double(memtable_prefix_bloom_size_ratio, 0.1, "");
DEFINE_int32(memtable_huge_page_size, 2 * 1024 * 1024, ""); DEFINE_int32(memtable_huge_page_size, 2 * 1024 * 1024, "");
DEFINE_int32(value_size, 40, ""); DEFINE_int32(value_size, 40, "");
DEFINE_bool(enable_print, false, "Print options generated to console.");
// Path to the database on file system // Path to the database on file system
const std::string kDbName = rocksdb::test::TmpDir() + "/prefix_test"; const std::string kDbName = rocksdb::test::TmpDir() + "/prefix_test";
@ -109,10 +106,6 @@ class TestKeyComparator : public Comparator {
return 0; return 0;
} }
bool operator()(const TestKey& a, const TestKey& b) const {
return Compare(TestKeyToSlice(a), TestKeyToSlice(b)) < 0;
}
virtual const char* Name() const override { virtual const char* Name() const override {
return "TestKeyComparator"; return "TestKeyComparator";
} }
@ -131,23 +124,6 @@ void PutKey(DB* db, WriteOptions write_options, uint64_t prefix,
ASSERT_OK(db->Put(write_options, key, value)); ASSERT_OK(db->Put(write_options, key, value));
} }
void PutKey(DB* db, WriteOptions write_options, const TestKey& test_key,
const Slice& value) {
Slice key = TestKeyToSlice(test_key);
ASSERT_OK(db->Put(write_options, key, value));
}
void MergeKey(DB* db, WriteOptions write_options, const TestKey& test_key,
const Slice& value) {
Slice key = TestKeyToSlice(test_key);
ASSERT_OK(db->Merge(write_options, key, value));
}
void DeleteKey(DB* db, WriteOptions write_options, const TestKey& test_key) {
Slice key = TestKeyToSlice(test_key);
ASSERT_OK(db->Delete(write_options, key));
}
void SeekIterator(Iterator* iter, uint64_t prefix, uint64_t suffix) { void SeekIterator(Iterator* iter, uint64_t prefix, uint64_t suffix) {
TestKey test_key(prefix, suffix); TestKey test_key(prefix, suffix);
Slice key = TestKeyToSlice(test_key); Slice key = TestKeyToSlice(test_key);
@ -653,114 +629,7 @@ TEST_F(PrefixTest, DynamicPrefixIterator) {
} }
} }
TEST_F(PrefixTest, PrefixSeekModePrev) {
// Only for SkipListFactory
options.memtable_factory.reset(new SkipListFactory);
options.merge_operator = MergeOperators::CreatePutOperator();
options.write_buffer_size = 1024 * 1024;
Random rnd(1);
for (size_t m = 1; m < 100; m++) {
std::cout << "[" + std::to_string(m) + "]" + "*** Mem table: "
<< options.memtable_factory->Name() << std::endl;
DestroyDB(kDbName, Options());
auto db = OpenDb();
WriteOptions write_options;
ReadOptions read_options;
std::map<TestKey, std::string, TestKeyComparator> entry_maps[3], whole_map;
for (uint64_t i = 0; i < 10; i++) {
int div = i % 3 + 1;
for (uint64_t j = 0; j < 10; j++) {
whole_map[TestKey(i, j)] = entry_maps[rnd.Uniform(div)][TestKey(i, j)] =
'v' + std::to_string(i) + std::to_string(j);
}
}
std::map<TestKey, std::string, TestKeyComparator> type_map;
for (size_t i = 0; i < 3; i++) {
for (auto& kv : entry_maps[i]) {
if (rnd.OneIn(3)) {
PutKey(db.get(), write_options, kv.first, kv.second);
type_map[kv.first] = "value";
} else {
MergeKey(db.get(), write_options, kv.first, kv.second);
type_map[kv.first] = "merge";
}
}
if (i < 2) {
db->Flush(FlushOptions());
}
}
for (size_t i = 0; i < 2; i++) {
for (auto& kv : entry_maps[i]) {
if (rnd.OneIn(10)) {
whole_map.erase(kv.first);
DeleteKey(db.get(), write_options, kv.first);
entry_maps[2][kv.first] = "delete";
}
}
}
if (FLAGS_enable_print) {
for (size_t i = 0; i < 3; i++) {
for (auto& kv : entry_maps[i]) {
std::cout << "[" << i << "]" << kv.first.prefix << kv.first.sorted
<< " " << kv.second + " " + type_map[kv.first] << std::endl;
}
}
}
std::unique_ptr<Iterator> iter(db->NewIterator(read_options));
for (uint64_t prefix = 0; prefix < 10; prefix++) {
uint64_t start_suffix = rnd.Uniform(9);
SeekIterator(iter.get(), prefix, start_suffix);
auto it = whole_map.find(TestKey(prefix, start_suffix));
if (it == whole_map.end()) {
continue;
}
ASSERT_NE(it, whole_map.end());
ASSERT_TRUE(iter->Valid());
if (FLAGS_enable_print) {
std::cout << "round " << prefix
<< " iter: " << SliceToTestKey(iter->key())->prefix
<< SliceToTestKey(iter->key())->sorted
<< " | map: " << it->first.prefix << it->first.sorted << " | "
<< iter->value().ToString() << " " << it->second << std::endl;
}
ASSERT_EQ(iter->value(), it->second);
for (size_t k = 0; k < 9; k++) {
if (rnd.OneIn(2) || it == whole_map.begin()) {
iter->Next();
it++;
if (FLAGS_enable_print) {
std::cout << "Next >> ";
} }
} else {
iter->Prev();
it--;
if (FLAGS_enable_print) {
std::cout << "Prev >> ";
}
}
if (!iter->Valid() || SliceToTestKey(iter->key())->prefix != prefix) {
break;
}
ASSERT_TRUE(iter->Valid());
ASSERT_NE(it, whole_map.end());
ASSERT_EQ(iter->value(), it->second);
if (FLAGS_enable_print) {
std::cout << "iter: " << SliceToTestKey(iter->key())->prefix
<< SliceToTestKey(iter->key())->sorted
<< " | map: " << it->first.prefix << it->first.sorted
<< " | " << iter->value().ToString() << " " << it->second
<< std::endl;
}
}
}
}
}
} // end namespace rocksdb
int main(int argc, char** argv) { int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv); ::testing::InitGoogleTest(&argc, argv);

@ -556,10 +556,8 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions,
rep->hash_index_allow_collision = table_options.hash_index_allow_collision; rep->hash_index_allow_collision = table_options.hash_index_allow_collision;
// We need to wrap data with internal_prefix_transform to make sure it can // We need to wrap data with internal_prefix_transform to make sure it can
// handle prefix correctly. // handle prefix correctly.
if (rep->ioptions.prefix_extractor != nullptr) {
rep->internal_prefix_transform.reset( rep->internal_prefix_transform.reset(
new InternalKeySliceTransform(rep->ioptions.prefix_extractor)); new InternalKeySliceTransform(rep->ioptions.prefix_extractor));
}
SetupCacheKeyPrefix(rep, file_size); SetupCacheKeyPrefix(rep, file_size);
unique_ptr<BlockBasedTable> new_table(new BlockBasedTable(rep)); unique_ptr<BlockBasedTable> new_table(new BlockBasedTable(rep));
@ -1692,7 +1690,6 @@ Status BlockBasedTable::CreateIndexReader(
meta_index_iter = meta_iter_guard.get(); meta_index_iter = meta_iter_guard.get();
} }
assert(rep_->internal_prefix_transform);
return HashIndexReader::Create( return HashIndexReader::Create(
rep_->internal_prefix_transform.get(), footer, file, rep_->ioptions, rep_->internal_prefix_transform.get(), footer, file, rep_->ioptions,
comparator, footer.index_handle(), meta_index_iter, index_reader, comparator, footer.index_handle(), meta_index_iter, index_reader,

@ -89,8 +89,6 @@ class InternalIterator : public Cleanable {
return Status::NotSupported(""); return Status::NotSupported("");
} }
virtual void ResetPrefix(const Slice* prefix = nullptr) {}
private: private:
// No copying allowed // No copying allowed
InternalIterator(const InternalIterator&) = delete; InternalIterator(const InternalIterator&) = delete;

@ -6,12 +6,11 @@
// Copyright (c) 2011 The LevelDB Authors. All rights reserved. // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be // Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors. // found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "table/merger.h" #include "table/merger.h"
#include <string>
#include <vector> #include <vector>
#include "db/dbformat.h"
#include "db/pinned_iterators_manager.h" #include "db/pinned_iterators_manager.h"
#include "rocksdb/comparator.h" #include "rocksdb/comparator.h"
#include "rocksdb/iterator.h" #include "rocksdb/iterator.h"
@ -21,7 +20,6 @@
#include "table/iterator_wrapper.h" #include "table/iterator_wrapper.h"
#include "util/arena.h" #include "util/arena.h"
#include "util/autovector.h" #include "util/autovector.h"
#include "util/coding.h"
#include "util/heap.h" #include "util/heap.h"
#include "util/perf_context_imp.h" #include "util/perf_context_imp.h"
#include "util/stop_watch.h" #include "util/stop_watch.h"
@ -39,14 +37,12 @@ const size_t kNumIterReserve = 4;
class MergingIterator : public InternalIterator { class MergingIterator : public InternalIterator {
public: public:
MergingIterator(const Comparator* comparator, InternalIterator** children, MergingIterator(const Comparator* comparator, InternalIterator** children,
int n, bool is_arena_mode, int n, bool is_arena_mode)
const SliceTransform* const prefix_extractor)
: is_arena_mode_(is_arena_mode), : is_arena_mode_(is_arena_mode),
comparator_(comparator), comparator_(comparator),
current_(nullptr), current_(nullptr),
direction_(kForward), direction_(kForward),
minHeap_(comparator_), minHeap_(comparator_),
prefix_extractor_(prefix_extractor),
pinned_iters_mgr_(nullptr) { pinned_iters_mgr_(nullptr) {
children_.resize(n); children_.resize(n);
for (int i = 0; i < n; i++) { for (int i = 0; i < n; i++) {
@ -113,8 +109,8 @@ class MergingIterator : public InternalIterator {
PERF_TIMER_GUARD(seek_child_seek_time); PERF_TIMER_GUARD(seek_child_seek_time);
child.Seek(target); child.Seek(target);
} }
PERF_COUNTER_ADD(seek_child_seek_count, 1); PERF_COUNTER_ADD(seek_child_seek_count, 1);
if (child.Valid()) { if (child.Valid()) {
PERF_TIMER_GUARD(seek_min_heap_time); PERF_TIMER_GUARD(seek_min_heap_time);
minHeap_.push(&child); minHeap_.push(&child);
@ -129,6 +125,7 @@ class MergingIterator : public InternalIterator {
virtual void Next() override { virtual void Next() override {
assert(Valid()); assert(Valid());
// Ensure that all children are positioned after key(). // Ensure that all children are positioned after key().
// If we are moving in the forward direction, it is already // If we are moving in the forward direction, it is already
// true for all of the non-current children since current_ is // true for all of the non-current children since current_ is
@ -139,42 +136,15 @@ class MergingIterator : public InternalIterator {
ClearHeaps(); ClearHeaps();
for (auto& child : children_) { for (auto& child : children_) {
if (&child != current_) { if (&child != current_) {
if (prefix_extractor_ == nullptr) {
child.Seek(key()); child.Seek(key());
if (child.Valid() && comparator_->Equal(key(), child.key())) { if (child.Valid() && comparator_->Equal(key(), child.key())) {
child.Next(); child.Next();
} }
} else {
// only for prefix_seek_mode
// we should not call Seek() here
if (child.Valid()) {
child.Next();
} else {
child.SeekToFirst();
}
}
// This condition is needed because it is possible that multiple
// threads read/write memtable simultaneously. After one thread
// calls Prev(), another thread may insert a new key just between
// the current key and the key next, which may cause the
// assert(current_ == CurrentForward()) failure when the first
// thread calls Next() again if in prefix seek mode
while (child.Valid() &&
comparator_->Compare(key(), child.key()) >= 0) {
child.Next();
}
} }
if (child.Valid()) { if (child.Valid()) {
bool skip_iter =
prefix_extractor_ != nullptr &&
prefix_extractor_->InDomain(ExtractUserKey(child.key())) &&
prefix_extractor_->Transform(ExtractUserKey(child.key())) !=
Slice(*prefix_);
if (&child == current_ || !skip_iter) {
minHeap_.push(&child); minHeap_.push(&child);
} }
} }
}
direction_ = kForward; direction_ = kForward;
// The loop advanced all non-current children to be > key() so current_ // The loop advanced all non-current children to be > key() so current_
// should still be strictly the smallest key. // should still be strictly the smallest key.
@ -212,12 +182,7 @@ class MergingIterator : public InternalIterator {
InitMaxHeap(); InitMaxHeap();
for (auto& child : children_) { for (auto& child : children_) {
if (&child != current_) { if (&child != current_) {
if (prefix_extractor_ == nullptr) {
child.Seek(key()); child.Seek(key());
} else {
// only for prefix_seek_mode
// we should not call Seek() here
}
if (child.Valid()) { if (child.Valid()) {
// Child is at first entry >= key(). Step back one to be < key() // Child is at first entry >= key(). Step back one to be < key()
TEST_SYNC_POINT_CALLBACK("MergeIterator::Prev:BeforePrev", &child); TEST_SYNC_POINT_CALLBACK("MergeIterator::Prev:BeforePrev", &child);
@ -227,23 +192,11 @@ class MergingIterator : public InternalIterator {
TEST_SYNC_POINT("MergeIterator::Prev:BeforeSeekToLast"); TEST_SYNC_POINT("MergeIterator::Prev:BeforeSeekToLast");
child.SeekToLast(); child.SeekToLast();
} }
while (child.Valid() &&
comparator_->Compare(key(), child.key()) <= 0) {
child.Prev();
}
} }
if (child.Valid()) { if (child.Valid()) {
bool skip_iter =
prefix_extractor_ != nullptr &&
prefix_extractor_->InDomain(ExtractUserKey(child.key())) &&
prefix_extractor_->Transform(ExtractUserKey(child.key())) !=
Slice(*prefix_);
if (&child == current_ || !skip_iter) {
maxHeap_->push(&child); maxHeap_->push(&child);
} }
} }
}
direction_ = kReverse; direction_ = kReverse;
// Note that we don't do assert(current_ == CurrentReverse()) here // Note that we don't do assert(current_ == CurrentReverse()) here
// because it is possible to have some keys larger than the seek-key // because it is possible to have some keys larger than the seek-key
@ -310,17 +263,6 @@ class MergingIterator : public InternalIterator {
current_->IsValuePinned(); current_->IsValuePinned();
} }
virtual void ResetPrefix(const Slice* prefix) override {
if (prefix == nullptr) {
prefix_.reset();
return;
}
if (!prefix_) {
prefix_.reset(new std::string);
}
*prefix_ = prefix->ToString();
}
private: private:
// Clears heaps for both directions, used when changing direction or seeking // Clears heaps for both directions, used when changing direction or seeking
void ClearHeaps(); void ClearHeaps();
@ -346,9 +288,7 @@ class MergingIterator : public InternalIterator {
// Max heap is used for reverse iteration, which is way less common than // Max heap is used for reverse iteration, which is way less common than
// forward. Lazily initialize it to save memory. // forward. Lazily initialize it to save memory.
std::unique_ptr<MergerMaxIterHeap> maxHeap_; std::unique_ptr<MergerMaxIterHeap> maxHeap_;
const SliceTransform* const prefix_extractor_;
PinnedIteratorsManager* pinned_iters_mgr_; PinnedIteratorsManager* pinned_iters_mgr_;
std::unique_ptr<std::string> prefix_;
IteratorWrapper* CurrentForward() const { IteratorWrapper* CurrentForward() const {
assert(direction_ == kForward); assert(direction_ == kForward);
@ -375,9 +315,9 @@ void MergingIterator::InitMaxHeap() {
} }
} }
InternalIterator* NewMergingIterator( InternalIterator* NewMergingIterator(const Comparator* cmp,
const Comparator* cmp, InternalIterator** list, int n, Arena* arena, InternalIterator** list, int n,
const SliceTransform* const prefix_extractor) { Arena* arena) {
assert(n >= 0); assert(n >= 0);
if (n == 0) { if (n == 0) {
return NewEmptyInternalIterator(arena); return NewEmptyInternalIterator(arena);
@ -385,21 +325,19 @@ InternalIterator* NewMergingIterator(
return list[0]; return list[0];
} else { } else {
if (arena == nullptr) { if (arena == nullptr) {
return new MergingIterator(cmp, list, n, false, prefix_extractor); return new MergingIterator(cmp, list, n, false);
} else { } else {
auto mem = arena->AllocateAligned(sizeof(MergingIterator)); auto mem = arena->AllocateAligned(sizeof(MergingIterator));
return new (mem) MergingIterator(cmp, list, n, true, prefix_extractor); return new (mem) MergingIterator(cmp, list, n, true);
} }
} }
} }
MergeIteratorBuilder::MergeIteratorBuilder( MergeIteratorBuilder::MergeIteratorBuilder(const Comparator* comparator,
const Comparator* comparator, Arena* a, Arena* a)
const SliceTransform* const prefix_extractor)
: first_iter(nullptr), use_merging_iter(false), arena(a) { : first_iter(nullptr), use_merging_iter(false), arena(a) {
auto mem = arena->AllocateAligned(sizeof(MergingIterator)); auto mem = arena->AllocateAligned(sizeof(MergingIterator));
merge_iter = merge_iter = new (mem) MergingIterator(comparator, nullptr, 0, true);
new (mem) MergingIterator(comparator, nullptr, 0, true, prefix_extractor);
} }
void MergeIteratorBuilder::AddIterator(InternalIterator* iter) { void MergeIteratorBuilder::AddIterator(InternalIterator* iter) {

@ -9,7 +9,6 @@
#pragma once #pragma once
#include "rocksdb/slice_transform.h"
#include "rocksdb/types.h" #include "rocksdb/types.h"
namespace rocksdb { namespace rocksdb {
@ -27,10 +26,9 @@ class Arena;
// key is present in K child iterators, it will be yielded K times. // key is present in K child iterators, it will be yielded K times.
// //
// REQUIRES: n >= 0 // REQUIRES: n >= 0
extern InternalIterator* NewMergingIterator( extern InternalIterator* NewMergingIterator(const Comparator* comparator,
const Comparator* comparator, InternalIterator** children, int n, InternalIterator** children, int n,
Arena* arena = nullptr, Arena* arena = nullptr);
const SliceTransform* const prefix_extractor = nullptr);
class MergingIterator; class MergingIterator;
@ -39,9 +37,7 @@ class MergeIteratorBuilder {
public: public:
// comparator: the comparator used in merging comparator // comparator: the comparator used in merging comparator
// arena: where the merging iterator needs to be allocated from. // arena: where the merging iterator needs to be allocated from.
explicit MergeIteratorBuilder( explicit MergeIteratorBuilder(const Comparator* comparator, Arena* arena);
const Comparator* comparator, Arena* arena,
const SliceTransform* const prefix_extractor = nullptr);
~MergeIteratorBuilder() {} ~MergeIteratorBuilder() {}
// Add iter to the merging iterator. // Add iter to the merging iterator.

Loading…
Cancel
Save