do not read next datablock if upperbound is reached

Summary:
Now if we have iterate_upper_bound set, we continue read until get a key >= upper_bound. For a lot of cases that neighboring data blocks have a user key gap between them, our index key will be a user key in the middle to get a shorter size. For example, if we have blocks:
[a b c d][f g h]
Then the index key for the first block will be 'e'.
then if upper bound is any key between 'd' and 'e', for example, d1, d2, ..., d99999999999, we don't have to read the second block and also know that we have done our iteration by reaching the last key that smaller the upper bound already.

This diff can reduce RA in most cases.
Closes https://github.com/facebook/rocksdb/pull/2239

Differential Revision: D4990693

Pulled By: lightmark

fbshipit-source-id: ab30ea2e3c6edf3fddd5efed3c34fcf7739827ff
main
Aaron Gao 7 years ago committed by Facebook Github Bot
parent 2d42cf5ea9
commit a30a696034
  1. 72
      db/db_iterator_test.cc
  2. 4
      db/dbformat.cc
  3. 3
      db/forward_iterator.cc
  4. 3
      db/table_cache.cc
  5. 7
      db/version_set.cc
  6. 77
      table/block_based_table_reader.cc
  7. 20
      table/block_based_table_reader.h
  8. 3
      table/cuckoo_table_reader.cc
  9. 6
      table/cuckoo_table_reader.h
  10. 4
      table/mock_table.cc
  11. 4
      table/mock_table.h
  12. 1
      table/plain_table_reader.cc
  13. 6
      table/plain_table_reader.h
  14. 1
      table/table_reader.h
  15. 8
      table/two_level_iterator.cc
  16. 1
      table/two_level_iterator.h
  17. 2
      util/comparator.cc

@ -24,6 +24,34 @@ class DBIteratorTest : public DBTestBase {
DBIteratorTest() : DBTestBase("/db_iterator_test") {}
};
class FlushBlockEveryKeyPolicy : public FlushBlockPolicy {
public:
virtual bool Update(const Slice& key, const Slice& value) override {
if (!start_) {
start_ = true;
return false;
}
return true;
}
private:
bool start_ = false;
};
class FlushBlockEveryKeyPolicyFactory : public FlushBlockPolicyFactory {
public:
explicit FlushBlockEveryKeyPolicyFactory() {}
const char* Name() const override {
return "FlushBlockEveryKeyPolicyFactory";
}
FlushBlockPolicy* NewFlushBlockPolicy(
const BlockBasedTableOptions& table_options,
const BlockBuilder& data_block_builder) const override {
return new FlushBlockEveryKeyPolicy;
}
};
TEST_F(DBIteratorTest, IteratorProperty) {
// The test needs to be changed if kPersistedTier is supported in iterator.
Options options = CurrentOptions();
@ -977,6 +1005,50 @@ TEST_F(DBIteratorTest, DBIteratorBoundTest) {
}
}
TEST_F(DBIteratorTest, DBIteratorBoundOptimizationTest) {
int upper_bound_hits = 0;
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"BlockBasedTable::BlockEntryIteratorState::KeyReachedUpperBound",
[&upper_bound_hits](void* arg) {
assert(arg != nullptr);
upper_bound_hits += (*static_cast<bool*>(arg) ? 1 : 0);
});
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
Options options = CurrentOptions();
options.env = env_;
options.create_if_missing = true;
options.prefix_extractor = nullptr;
BlockBasedTableOptions table_options;
table_options.flush_block_policy_factory =
std::make_shared<FlushBlockEveryKeyPolicyFactory>();
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
DestroyAndReopen(options);
ASSERT_OK(Put("foo1", "bar1"));
ASSERT_OK(Put("foo2", "bar2"));
ASSERT_OK(Put("foo4", "bar4"));
ASSERT_OK(Flush());
Slice ub("foo3");
ReadOptions ro;
ro.iterate_upper_bound = &ub;
std::unique_ptr<Iterator> iter(db_->NewIterator(ro));
iter->Seek("foo");
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(iter->key().compare(Slice("foo1")), 0);
ASSERT_EQ(upper_bound_hits, 0);
iter->Next();
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(iter->key().compare(Slice("foo2")), 0);
ASSERT_EQ(upper_bound_hits, 0);
iter->Next();
ASSERT_FALSE(iter->Valid());
ASSERT_EQ(upper_bound_hits, 1);
}
// TODO(3.13): fix the issue of Seek() + Prev() which might not necessary
// return the biggest key which is smaller than the seek key.
TEST_F(DBIteratorTest, PrevAfterAndNextAfterMerge) {

@ -131,7 +131,7 @@ void InternalKeyComparator::FindShortestSeparator(
Slice user_limit = ExtractUserKey(limit);
std::string tmp(user_start.data(), user_start.size());
user_comparator_->FindShortestSeparator(&tmp, user_limit);
if (tmp.size() < user_start.size() &&
if (tmp.size() <= user_start.size() &&
user_comparator_->Compare(user_start, tmp) < 0) {
// User key has become shorter physically, but larger logically.
// Tack on the earliest possible number to the shortened user key.
@ -146,7 +146,7 @@ void InternalKeyComparator::FindShortSuccessor(std::string* key) const {
Slice user_key = ExtractUserKey(*key);
std::string tmp(user_key.data(), user_key.size());
user_comparator_->FindShortSuccessor(&tmp);
if (tmp.size() < user_key.size() &&
if (tmp.size() <= user_key.size() &&
user_comparator_->Compare(user_key, tmp) < 0) {
// User key has become shorter physically, but larger logically.
// Tack on the earliest possible number to the shortened user key.

@ -75,7 +75,8 @@ class LevelIterator : public InternalIterator {
cfd_->internal_comparator(), {} /* snapshots */);
file_iter_ = cfd_->table_cache()->NewIterator(
read_options_, *(cfd_->soptions()), cfd_->internal_comparator(),
files_[file_index_]->fd, read_options_.ignore_range_deletions ? nullptr : &range_del_agg,
files_[file_index_]->fd,
read_options_.ignore_range_deletions ? nullptr : &range_del_agg,
nullptr /* table_reader_ptr */, nullptr, false);
file_iter_->SetPinnedItersMgr(pinned_iters_mgr_);
if (!range_del_agg.IsEmpty()) {

@ -224,7 +224,8 @@ InternalIterator* TableCache::NewIterator(
}
InternalIterator* result = nullptr;
if (s.ok()) {
result = table_reader->NewIterator(options, arena, skip_filters);
result =
table_reader->NewIterator(options, arena, &icomparator, skip_filters);
if (create_new_table_reader) {
assert(handle == nullptr);
result->RegisterCleanup(&DeleteTableReader, table_reader, nullptr);

@ -528,6 +528,13 @@ class LevelFileIteratorState : public TwoLevelIteratorState {
return true;
}
bool KeyReachedUpperBound(const Slice& internal_key) override {
return read_options_.iterate_upper_bound != nullptr &&
icomparator_.user_comparator()->Compare(
ExtractUserKey(internal_key),
*read_options_.iterate_upper_bound) >= 0;
}
private:
TableCache* table_cache_;
const ReadOptions read_options_;

@ -161,7 +161,8 @@ class PartitionIndexReader : public IndexReader, public Cleanable {
static Status Create(BlockBasedTable* table, RandomAccessFileReader* file,
const Footer& footer, const BlockHandle& index_handle,
const ImmutableCFOptions& ioptions,
const Comparator* comparator, IndexReader** index_reader,
const InternalKeyComparator* icomparator,
IndexReader** index_reader,
const PersistentCacheOptions& cache_options,
const int level) {
std::unique_ptr<Block> index_block;
@ -172,7 +173,7 @@ class PartitionIndexReader : public IndexReader, public Cleanable {
if (s.ok()) {
*index_reader =
new PartitionIndexReader(table, comparator, std::move(index_block),
new PartitionIndexReader(table, icomparator, std::move(index_block),
ioptions.statistics, level);
}
@ -196,8 +197,9 @@ class PartitionIndexReader : public IndexReader, public Cleanable {
}
return NewTwoLevelIterator(
new BlockBasedTable::BlockEntryIteratorState(
table_, ReadOptions(), skip_filters, is_index, block_cache_cleaner),
index_block_->NewIterator(comparator_, nullptr, true));
table_, ReadOptions(), icomparator_, skip_filters, is_index,
block_cache_cleaner),
index_block_->NewIterator(icomparator_, nullptr, true));
// TODO(myabandeh): Update TwoLevelIterator to be able to make use of
// on-stack
// BlockIter while the state is on heap
@ -214,10 +216,11 @@ class PartitionIndexReader : public IndexReader, public Cleanable {
}
private:
PartitionIndexReader(BlockBasedTable* table, const Comparator* comparator,
PartitionIndexReader(BlockBasedTable* table,
const InternalKeyComparator* icomparator,
std::unique_ptr<Block>&& index_block, Statistics* stats,
const int level)
: IndexReader(comparator, stats),
: IndexReader(icomparator, stats),
table_(table),
index_block_(std::move(index_block)),
level_(level) {
@ -239,8 +242,9 @@ class BinarySearchIndexReader : public IndexReader {
// unmodified.
static Status Create(RandomAccessFileReader* file, const Footer& footer,
const BlockHandle& index_handle,
const ImmutableCFOptions &ioptions,
const Comparator* comparator, IndexReader** index_reader,
const ImmutableCFOptions& ioptions,
const InternalKeyComparator* icomparator,
IndexReader** index_reader,
const PersistentCacheOptions& cache_options) {
std::unique_ptr<Block> index_block;
auto s = ReadBlockFromFile(
@ -250,7 +254,7 @@ class BinarySearchIndexReader : public IndexReader {
if (s.ok()) {
*index_reader = new BinarySearchIndexReader(
comparator, std::move(index_block), ioptions.statistics);
icomparator, std::move(index_block), ioptions.statistics);
}
return s;
@ -258,7 +262,7 @@ class BinarySearchIndexReader : public IndexReader {
virtual InternalIterator* NewIterator(BlockIter* iter = nullptr,
bool dont_care = true) override {
return index_block_->NewIterator(comparator_, iter, true);
return index_block_->NewIterator(icomparator_, iter, true);
}
virtual size_t size() const override { return index_block_->size(); }
@ -272,10 +276,10 @@ class BinarySearchIndexReader : public IndexReader {
}
private:
BinarySearchIndexReader(const Comparator* comparator,
BinarySearchIndexReader(const InternalKeyComparator* icomparator,
std::unique_ptr<Block>&& index_block,
Statistics* stats)
: IndexReader(comparator, stats), index_block_(std::move(index_block)) {
: IndexReader(icomparator, stats), index_block_(std::move(index_block)) {
assert(index_block_ != nullptr);
}
std::unique_ptr<Block> index_block_;
@ -288,7 +292,7 @@ class HashIndexReader : public IndexReader {
static Status Create(const SliceTransform* hash_key_extractor,
const Footer& footer, RandomAccessFileReader* file,
const ImmutableCFOptions& ioptions,
const Comparator* comparator,
const InternalKeyComparator* icomparator,
const BlockHandle& index_handle,
InternalIterator* meta_index_iter,
IndexReader** index_reader,
@ -309,7 +313,7 @@ class HashIndexReader : public IndexReader {
// So, Create will succeed regardless, from this point on.
auto new_index_reader =
new HashIndexReader(comparator, std::move(index_block),
new HashIndexReader(icomparator, std::move(index_block),
ioptions.statistics);
*index_reader = new_index_reader;
@ -361,7 +365,7 @@ class HashIndexReader : public IndexReader {
virtual InternalIterator* NewIterator(BlockIter* iter = nullptr,
bool total_order_seek = true) override {
return index_block_->NewIterator(comparator_, iter, total_order_seek);
return index_block_->NewIterator(icomparator_, iter, total_order_seek);
}
virtual size_t size() const override { return index_block_->size(); }
@ -376,9 +380,9 @@ class HashIndexReader : public IndexReader {
}
private:
HashIndexReader(const Comparator* comparator,
HashIndexReader(const InternalKeyComparator* icomparator,
std::unique_ptr<Block>&& index_block, Statistics* stats)
: IndexReader(comparator, stats), index_block_(std::move(index_block)) {
: IndexReader(icomparator, stats), index_block_(std::move(index_block)) {
assert(index_block_ != nullptr);
}
@ -1381,11 +1385,13 @@ Status BlockBasedTable::MaybeLoadDataBlockToCache(
}
BlockBasedTable::BlockEntryIteratorState::BlockEntryIteratorState(
BlockBasedTable* table, const ReadOptions& read_options, bool skip_filters,
bool is_index, Cleanable* block_cache_cleaner)
BlockBasedTable* table, const ReadOptions& read_options,
const InternalKeyComparator* icomparator, bool skip_filters, bool is_index,
Cleanable* block_cache_cleaner)
: TwoLevelIteratorState(table->rep_->ioptions.prefix_extractor != nullptr),
table_(table),
read_options_(read_options),
icomparator_(icomparator),
skip_filters_(skip_filters),
is_index_(is_index),
block_cache_cleaner_(block_cache_cleaner) {}
@ -1424,6 +1430,19 @@ bool BlockBasedTable::BlockEntryIteratorState::PrefixMayMatch(
return table_->PrefixMayMatch(internal_key);
}
bool BlockBasedTable::BlockEntryIteratorState::KeyReachedUpperBound(
const Slice& internal_key) {
bool reached_upper_bound = read_options_.iterate_upper_bound != nullptr &&
icomparator_ != nullptr &&
icomparator_->user_comparator()->Compare(
ExtractUserKey(internal_key),
*read_options_.iterate_upper_bound) >= 0;
TEST_SYNC_POINT_CALLBACK(
"BlockBasedTable::BlockEntryIteratorState::KeyReachedUpperBound",
&reached_upper_bound);
return reached_upper_bound;
}
// This will be broken if the user specifies an unusual implementation
// of Options.comparator, or if the user specifies an unusual
// definition of prefixes in BlockBasedTableOptions.filter_policy.
@ -1526,11 +1545,11 @@ bool BlockBasedTable::PrefixMayMatch(const Slice& internal_key) {
return may_match;
}
InternalIterator* BlockBasedTable::NewIterator(const ReadOptions& read_options,
Arena* arena,
bool skip_filters) {
InternalIterator* BlockBasedTable::NewIterator(
const ReadOptions& read_options, Arena* arena,
const InternalKeyComparator* icomp, bool skip_filters) {
return NewTwoLevelIterator(
new BlockEntryIteratorState(this, read_options, skip_filters),
new BlockEntryIteratorState(this, read_options, icomp, skip_filters),
NewIndexIterator(read_options), arena);
}
@ -1782,7 +1801,7 @@ Status BlockBasedTable::CreateIndexReader(
}
auto file = rep_->file.get();
auto comparator = &rep_->internal_comparator;
const InternalKeyComparator* icomparator = &rep_->internal_comparator;
const Footer& footer = rep_->footer;
if (index_type_on_file == BlockBasedTableOptions::kHashSearch &&
rep_->ioptions.prefix_extractor == nullptr) {
@ -1796,12 +1815,12 @@ Status BlockBasedTable::CreateIndexReader(
switch (index_type_on_file) {
case BlockBasedTableOptions::kTwoLevelIndexSearch: {
return PartitionIndexReader::Create(
this, file, footer, footer.index_handle(), rep_->ioptions, comparator,
index_reader, rep_->persistent_cache_options, level);
this, file, footer, footer.index_handle(), rep_->ioptions,
icomparator, index_reader, rep_->persistent_cache_options, level);
}
case BlockBasedTableOptions::kBinarySearch: {
return BinarySearchIndexReader::Create(
file, footer, footer.index_handle(), rep_->ioptions, comparator,
file, footer, footer.index_handle(), rep_->ioptions, icomparator,
index_reader, rep_->persistent_cache_options);
}
case BlockBasedTableOptions::kHashSearch: {
@ -1817,7 +1836,7 @@ Status BlockBasedTable::CreateIndexReader(
"Unable to read the metaindex block."
" Fall back to binary search index.");
return BinarySearchIndexReader::Create(
file, footer, footer.index_handle(), rep_->ioptions, comparator,
file, footer, footer.index_handle(), rep_->ioptions, icomparator,
index_reader, rep_->persistent_cache_options);
}
meta_index_iter = meta_iter_guard.get();
@ -1825,7 +1844,7 @@ Status BlockBasedTable::CreateIndexReader(
return HashIndexReader::Create(
rep_->internal_prefix_transform.get(), footer, file, rep_->ioptions,
comparator, footer.index_handle(), meta_index_iter, index_reader,
icomparator, footer.index_handle(), meta_index_iter, index_reader,
rep_->hash_index_allow_collision, rep_->persistent_cache_options);
}
default: {

@ -102,8 +102,10 @@ class BlockBasedTable : public TableReader {
// The result of NewIterator() is initially invalid (caller must
// call one of the Seek methods on the iterator before using it).
// @param skip_filters Disables loading/accessing the filter block
InternalIterator* NewIterator(const ReadOptions&, Arena* arena = nullptr,
bool skip_filters = false) override;
InternalIterator* NewIterator(
const ReadOptions&, Arena* arena = nullptr,
const InternalKeyComparator* icomparator = nullptr,
bool skip_filters = false) override;
InternalIterator* NewRangeTombstoneIterator(
const ReadOptions& read_options) override;
@ -151,8 +153,9 @@ class BlockBasedTable : public TableReader {
// access.
class IndexReader {
public:
explicit IndexReader(const Comparator* comparator, Statistics* stats)
: comparator_(comparator), statistics_(stats) {}
explicit IndexReader(const InternalKeyComparator* icomparator,
Statistics* stats)
: icomparator_(icomparator), statistics_(stats) {}
virtual ~IndexReader() {}
@ -180,7 +183,7 @@ class BlockBasedTable : public TableReader {
virtual size_t ApproximateMemoryUsage() const = 0;
protected:
const Comparator* comparator_;
const InternalKeyComparator* icomparator_;
private:
Statistics* statistics_;
@ -343,16 +346,19 @@ class BlockBasedTable : public TableReader {
class BlockBasedTable::BlockEntryIteratorState : public TwoLevelIteratorState {
public:
BlockEntryIteratorState(BlockBasedTable* table,
const ReadOptions& read_options, bool skip_filters,
bool is_index = false,
const ReadOptions& read_options,
const InternalKeyComparator* icomparator,
bool skip_filters, bool is_index = false,
Cleanable* block_cache_cleaner = nullptr);
InternalIterator* NewSecondaryIterator(const Slice& index_value) override;
bool PrefixMayMatch(const Slice& internal_key) override;
bool KeyReachedUpperBound(const Slice& internal_key) override;
private:
// Don't own table_
BlockBasedTable* table_;
const ReadOptions read_options_;
const InternalKeyComparator* icomparator_;
bool skip_filters_;
// true if the 2nd level iterator is on indexes instead of on user data.
bool is_index_;

@ -366,7 +366,8 @@ extern InternalIterator* NewErrorInternalIterator(const Status& status,
Arena* arena);
InternalIterator* CuckooTableReader::NewIterator(
const ReadOptions& read_options, Arena* arena, bool skip_filters) {
const ReadOptions& read_options, Arena* arena,
const InternalKeyComparator* icomp, bool skip_filters) {
if (!status().ok()) {
return NewErrorInternalIterator(
Status::Corruption("CuckooTableReader status is not okay."), arena);

@ -47,8 +47,10 @@ class CuckooTableReader: public TableReader {
Status Get(const ReadOptions& read_options, const Slice& key,
GetContext* get_context, bool skip_filters = false) override;
InternalIterator* NewIterator(const ReadOptions&, Arena* arena = nullptr,
bool skip_filters = false) override;
InternalIterator* NewIterator(
const ReadOptions&, Arena* arena = nullptr,
const InternalKeyComparator* icomparator = nullptr,
bool skip_filters = false) override;
void Prepare(const Slice& target) override;
// Report an approximation of how much memory has been used.

@ -30,7 +30,9 @@ stl_wrappers::KVMap MakeMockFile(
return stl_wrappers::KVMap(l, stl_wrappers::LessOfComparator(&icmp_));
}
InternalIterator* MockTableReader::NewIterator(const ReadOptions&, Arena* arena,
InternalIterator* MockTableReader::NewIterator(const ReadOptions&,
Arena* arena,
const InternalKeyComparator*,
bool skip_filters) {
return new MockTableIterator(table_);
}

@ -42,7 +42,9 @@ class MockTableReader : public TableReader {
public:
explicit MockTableReader(const stl_wrappers::KVMap& table) : table_(table) {}
InternalIterator* NewIterator(const ReadOptions&, Arena* arena,
InternalIterator* NewIterator(const ReadOptions&,
Arena* arena,
const InternalKeyComparator* = nullptr,
bool skip_filters = false) override;
Status Get(const ReadOptions&, const Slice& key, GetContext* get_context,

@ -191,6 +191,7 @@ void PlainTableReader::SetupForCompaction() {
InternalIterator* PlainTableReader::NewIterator(const ReadOptions& options,
Arena* arena,
const InternalKeyComparator*,
bool skip_filters) {
if (options.total_order_seek && !IsTotalOrderMode()) {
return NewErrorInternalIterator(

@ -71,14 +71,16 @@ class PlainTableReader: public TableReader {
public:
static Status Open(const ImmutableCFOptions& ioptions,
const EnvOptions& env_options,
const InternalKeyComparator& internal_comparator,
const InternalKeyComparator& icomparator,
unique_ptr<RandomAccessFileReader>&& file,
uint64_t file_size, unique_ptr<TableReader>* table,
const int bloom_bits_per_key, double hash_table_ratio,
size_t index_sparseness, size_t huge_page_tlb_size,
bool full_scan_mode);
InternalIterator* NewIterator(const ReadOptions&, Arena* arena = nullptr,
InternalIterator* NewIterator(const ReadOptions&,
Arena* arena = nullptr,
const InternalKeyComparator* = nullptr,
bool skip_filters = false) override;
void Prepare(const Slice& target) override;

@ -42,6 +42,7 @@ class TableReader {
// option is effective only for block-based table format.
virtual InternalIterator* NewIterator(const ReadOptions&,
Arena* arena = nullptr,
const InternalKeyComparator* = nullptr,
bool skip_filters = false) = 0;
virtual InternalIterator* NewRangeTombstoneIterator(

@ -180,13 +180,13 @@ void TwoLevelIterator::Prev() {
SkipEmptyDataBlocksBackward();
}
void TwoLevelIterator::SkipEmptyDataBlocksForward() {
while (second_level_iter_.iter() == nullptr ||
(!second_level_iter_.Valid() &&
!second_level_iter_.status().IsIncomplete())) {
!second_level_iter_.status().IsIncomplete())) {
// Move to next block
if (!first_level_iter_.Valid()) {
if (!first_level_iter_.Valid() ||
state_->KeyReachedUpperBound(first_level_iter_.key())) {
SetSecondLevelIterator(nullptr);
return;
}
@ -201,7 +201,7 @@ void TwoLevelIterator::SkipEmptyDataBlocksForward() {
void TwoLevelIterator::SkipEmptyDataBlocksBackward() {
while (second_level_iter_.iter() == nullptr ||
(!second_level_iter_.Valid() &&
!second_level_iter_.status().IsIncomplete())) {
!second_level_iter_.status().IsIncomplete())) {
// Move to next block
if (!first_level_iter_.Valid()) {
SetSecondLevelIterator(nullptr);

@ -27,6 +27,7 @@ struct TwoLevelIteratorState {
virtual ~TwoLevelIteratorState() {}
virtual InternalIterator* NewSecondaryIterator(const Slice& handle) = 0;
virtual bool PrefixMayMatch(const Slice& internal_key) = 0;
virtual bool KeyReachedUpperBound(const Slice& internal_key) = 0;
// If call PrefixMayMatch()
bool check_prefix_may_match;

@ -53,7 +53,7 @@ class BytewiseComparatorImpl : public Comparator {
} else {
uint8_t start_byte = static_cast<uint8_t>((*start)[diff_index]);
uint8_t limit_byte = static_cast<uint8_t>(limit[diff_index]);
if (start_byte >= limit_byte || (diff_index == start->size() - 1)) {
if (start_byte >= limit_byte) {
// Cannot shorten since limit is smaller than start or start is
// already the shortest possible.
return;

Loading…
Cancel
Save