Check PrefixMayMatch on Seek()

Summary:
As a follow-up diff for https://reviews.facebook.net/D17805, add
optimization to check PrefixMayMatch on Seek()

Test Plan: make all check

Reviewers: igor, haobo, sdong, yhchiang, dhruba

Reviewed By: haobo

CC: leveldb

Differential Revision: https://reviews.facebook.net/D17853
main
Lei Jin 11 years ago
parent 3995e801ab
commit d642c60bdc
  1. 2
      db/db_test.cc
  2. 4
      db/table_cache.cc
  3. 105
      db/version_set.cc
  4. 9
      db/version_set.h
  5. 225
      table/block_based_table_reader.cc
  6. 12
      table/block_based_table_reader.h
  7. 156
      table/two_level_iterator.cc
  8. 24
      table/two_level_iterator.h

@ -6481,7 +6481,7 @@ TEST(DBTest, PrefixScan) {
ASSERT_OK(iter->status()); ASSERT_OK(iter->status());
delete iter; delete iter;
ASSERT_EQ(count, 2); ASSERT_EQ(count, 2);
ASSERT_EQ(env_->random_read_counter_.Read(), 11); ASSERT_EQ(env_->random_read_counter_.Read(), 2);
Close(); Close();
delete options.filter_policy; delete options.filter_policy;
} }

@ -193,7 +193,7 @@ Status TableCache::GetTableProperties(
bool TableCache::PrefixMayMatch(const ReadOptions& options, bool TableCache::PrefixMayMatch(const ReadOptions& options,
const InternalKeyComparator& icomparator, const InternalKeyComparator& icomparator,
const FileMetaData& file_meta, const FileMetaData& file_meta,
const Slice& internal_prefix, bool* table_io) { const Slice& internal_key, bool* table_io) {
bool may_match = true; bool may_match = true;
auto table_reader = file_meta.table_reader; auto table_reader = file_meta.table_reader;
Cache::Handle* table_handle = nullptr; Cache::Handle* table_handle = nullptr;
@ -207,7 +207,7 @@ bool TableCache::PrefixMayMatch(const ReadOptions& options,
table_reader = GetTableReaderFromHandle(table_handle); table_reader = GetTableReaderFromHandle(table_handle);
} }
may_match = table_reader->PrefixMayMatch(internal_prefix); may_match = table_reader->PrefixMayMatch(internal_key);
if (table_handle != nullptr) { if (table_handle != nullptr) {
// Need to release handle if it is generated from here. // Need to release handle if it is generated from here.

@ -217,49 +217,43 @@ class Version::LevelFileNumIterator : public Iterator {
mutable EncodedFileMetaData current_value_; mutable EncodedFileMetaData current_value_;
}; };
static Iterator* GetFileIterator(void* arg, const ReadOptions& options, class Version::LevelFileIteratorState : public TwoLevelIteratorState {
const EnvOptions& soptions, public:
const InternalKeyComparator& icomparator, LevelFileIteratorState(TableCache* table_cache,
const Slice& file_value, bool for_compaction) { const ReadOptions& read_options, const EnvOptions& env_options,
TableCache* cache = reinterpret_cast<TableCache*>(arg); const InternalKeyComparator& icomparator, bool for_compaction,
if (file_value.size() != sizeof(EncodedFileMetaData)) { bool prefix_enabled)
: TwoLevelIteratorState(prefix_enabled),
table_cache_(table_cache), read_options_(read_options),
env_options_(env_options), icomparator_(icomparator),
for_compaction_(for_compaction) {}
Iterator* NewSecondaryIterator(const Slice& meta_handle) override {
if (meta_handle.size() != sizeof(EncodedFileMetaData)) {
return NewErrorIterator( return NewErrorIterator(
Status::Corruption("FileReader invoked with unexpected value")); Status::Corruption("FileReader invoked with unexpected value"));
} else { } else {
const EncodedFileMetaData* encoded_meta = const EncodedFileMetaData* encoded_meta =
reinterpret_cast<const EncodedFileMetaData*>(file_value.data()); reinterpret_cast<const EncodedFileMetaData*>(meta_handle.data());
FileMetaData meta(encoded_meta->number, encoded_meta->file_size); FileMetaData meta(encoded_meta->number, encoded_meta->file_size);
meta.table_reader = encoded_meta->table_reader; meta.table_reader = encoded_meta->table_reader;
return cache->NewIterator(options, soptions, icomparator, meta, return table_cache_->NewIterator(read_options_, env_options_,
nullptr /* don't need reference to table*/, for_compaction); icomparator_, meta, nullptr /* don't need reference to table*/,
for_compaction_);
}
} }
}
bool Version::PrefixMayMatch(const ReadOptions& options, bool PrefixMayMatch(const Slice& internal_key) override {
const EnvOptions& soptions, return true;
const Slice& internal_prefix,
Iterator* level_iter) const {
bool may_match = true;
level_iter->Seek(internal_prefix);
if (!level_iter->Valid()) {
// we're past end of level
may_match = false;
} else if (ExtractUserKey(level_iter->key()).starts_with(
ExtractUserKey(internal_prefix))) {
// TODO(tylerharter): do we need this case? Or are we guaranteed
// key() will always be the biggest value for this SST?
may_match = true;
} else {
const EncodedFileMetaData* encoded_meta =
reinterpret_cast<const EncodedFileMetaData*>(
level_iter->value().data());
FileMetaData meta(encoded_meta->number, encoded_meta->file_size);
meta.table_reader = encoded_meta->table_reader;
may_match = cfd_->table_cache()->PrefixMayMatch(
options, cfd_->internal_comparator(), meta, internal_prefix, nullptr);
} }
return may_match;
} private:
TableCache* table_cache_;
const ReadOptions read_options_;
const EnvOptions& env_options_;
const InternalKeyComparator& icomparator_;
bool for_compaction_;
};
Status Version::GetPropertiesOfAllTables(TablePropertiesCollection* props) { Status Version::GetPropertiesOfAllTables(TablePropertiesCollection* props) {
auto table_cache = cfd_->table_cache(); auto table_cache = cfd_->table_cache();
@ -314,15 +308,6 @@ Status Version::GetPropertiesOfAllTables(TablePropertiesCollection* props) {
return Status::OK(); return Status::OK();
} }
Iterator* Version::NewConcatenatingIterator(const ReadOptions& options,
const EnvOptions& soptions,
int level) const {
Iterator* level_iter =
new LevelFileNumIterator(cfd_->internal_comparator(), &files_[level]);
return NewTwoLevelIterator(level_iter, &GetFileIterator, cfd_->table_cache(),
options, soptions, cfd_->internal_comparator());
}
void Version::AddIterators(const ReadOptions& options, void Version::AddIterators(const ReadOptions& options,
const EnvOptions& soptions, const EnvOptions& soptions,
std::vector<Iterator*>* iters) { std::vector<Iterator*>* iters) {
@ -337,7 +322,11 @@ void Version::AddIterators(const ReadOptions& options,
// lazily. // lazily.
for (int level = 1; level < num_levels_; level++) { for (int level = 1; level < num_levels_; level++) {
if (!files_[level].empty()) { if (!files_[level].empty()) {
iters->push_back(NewConcatenatingIterator(options, soptions, level)); iters->push_back(NewTwoLevelIterator(new LevelFileIteratorState(
cfd_->table_cache(), options, soptions,
cfd_->internal_comparator(), false /* for_compaction */,
cfd_->options()->prefix_extractor != nullptr),
new LevelFileNumIterator(cfd_->internal_comparator(), &files_[level])));
} }
} }
} }
@ -2638,10 +2627,11 @@ void VersionSet::AddLiveFiles(std::vector<uint64_t>* live_list) {
} }
Iterator* VersionSet::MakeInputIterator(Compaction* c) { Iterator* VersionSet::MakeInputIterator(Compaction* c) {
ReadOptions options; auto cfd = c->column_family_data();
options.verify_checksums = ReadOptions read_options;
c->column_family_data()->options()->verify_checksums_in_compaction; read_options.verify_checksums =
options.fill_cache = false; cfd->options()->verify_checksums_in_compaction;
read_options.fill_cache = false;
// Level-0 files have to be merged together. For other levels, // Level-0 files have to be merged together. For other levels,
// we will make a concatenating iterator per level. // we will make a concatenating iterator per level.
@ -2653,20 +2643,19 @@ Iterator* VersionSet::MakeInputIterator(Compaction* c) {
if (!c->inputs(which)->empty()) { if (!c->inputs(which)->empty()) {
if (c->level() + which == 0) { if (c->level() + which == 0) {
for (const auto& file : *c->inputs(which)) { for (const auto& file : *c->inputs(which)) {
list[num++] = c->column_family_data()->table_cache()->NewIterator( list[num++] = cfd->table_cache()->NewIterator(
options, storage_options_compactions_, read_options, storage_options_compactions_,
c->column_family_data()->internal_comparator(), *file, nullptr, cfd->internal_comparator(), *file, nullptr,
true /* for compaction */); true /* for compaction */);
} }
} else { } else {
// Create concatenating iterator for the files from this level // Create concatenating iterator for the files from this level
list[num++] = NewTwoLevelIterator( list[num++] = NewTwoLevelIterator(new Version::LevelFileIteratorState(
new Version::LevelFileNumIterator( cfd->table_cache(), read_options, storage_options_,
c->column_family_data()->internal_comparator(), cfd->internal_comparator(), true /* for_compaction */,
c->inputs(which)), false /* prefix enabled */),
&GetFileIterator, c->column_family_data()->table_cache(), options, new Version::LevelFileNumIterator(cfd->internal_comparator(),
storage_options_, c->column_family_data()->internal_comparator(), c->inputs(which)));
true /* for compaction */);
} }
} }
} }

@ -219,11 +219,10 @@ class Version {
friend class UniversalCompactionPicker; friend class UniversalCompactionPicker;
class LevelFileNumIterator; class LevelFileNumIterator;
Iterator* NewConcatenatingIterator(const ReadOptions&, struct LevelFileIteratorState;
const EnvOptions& soptions,
int level) const; bool PrefixMayMatch(const ReadOptions& options, Iterator* level_iter,
bool PrefixMayMatch(const ReadOptions& options, const EnvOptions& soptions, const Slice& internal_prefix) const;
const Slice& internal_prefix, Iterator* level_iter) const;
// Sort all files for this version based on their file size and // Sort all files for this version based on their file size and
// record results in files_by_size_. The largest files are listed first. // record results in files_by_size_. The largest files are listed first.

@ -642,94 +642,6 @@ FilterBlockReader* BlockBasedTable::ReadFilter (
rep->options, block.data, block.heap_allocated); rep->options, block.data, block.heap_allocated);
} }
// Convert an index iterator value (i.e., an encoded BlockHandle)
// into an iterator over the contents of the corresponding block.
Iterator* BlockBasedTable::DataBlockReader(void* arg,
const ReadOptions& options,
const Slice& index_value,
bool* didIO, bool for_compaction) {
const bool no_io = (options.read_tier == kBlockCacheTier);
BlockBasedTable* table = reinterpret_cast<BlockBasedTable*>(arg);
Cache* block_cache = table->rep_->options.block_cache.get();
Cache* block_cache_compressed = table->rep_->options.
block_cache_compressed.get();
CachableEntry<Block> block;
BlockHandle handle;
Slice input = index_value;
// We intentionally allow extra stuff in index_value so that we
// can add more features in the future.
Status s = handle.DecodeFrom(&input);
if (!s.ok()) {
return NewErrorIterator(s);
}
// If either block cache is enabled, we'll try to read from it.
if (block_cache != nullptr || block_cache_compressed != nullptr) {
Statistics* statistics = table->rep_->options.statistics.get();
char cache_key[kMaxCacheKeyPrefixSize + kMaxVarint64Length];
char compressed_cache_key[kMaxCacheKeyPrefixSize + kMaxVarint64Length];
Slice key, /* key to the block cache */
ckey /* key to the compressed block cache */;
// create key for block cache
if (block_cache != nullptr) {
key = GetCacheKey(table->rep_->cache_key_prefix,
table->rep_->cache_key_prefix_size, handle, cache_key);
}
if (block_cache_compressed != nullptr) {
ckey = GetCacheKey(table->rep_->compressed_cache_key_prefix,
table->rep_->compressed_cache_key_prefix_size, handle,
compressed_cache_key);
}
s = GetDataBlockFromCache(key, ckey, block_cache, block_cache_compressed,
statistics, options, &block);
if (block.value == nullptr && !no_io && options.fill_cache) {
Histograms histogram = for_compaction ?
READ_BLOCK_COMPACTION_MICROS : READ_BLOCK_GET_MICROS;
Block* raw_block = nullptr;
{
StopWatch sw(table->rep_->options.env, statistics, histogram);
s = ReadBlockFromFile(table->rep_->file.get(), options, handle,
&raw_block, table->rep_->options.env, didIO,
block_cache_compressed == nullptr);
}
if (s.ok()) {
s = PutDataBlockToCache(key, ckey, block_cache, block_cache_compressed,
options, statistics, &block, raw_block);
}
}
}
// Didn't get any data from block caches.
if (block.value == nullptr) {
if (no_io) {
// Could not read from block_cache and can't do IO
return NewErrorIterator(Status::Incomplete("no blocking io"));
}
s = ReadBlockFromFile(table->rep_->file.get(), options, handle,
&block.value, table->rep_->options.env, didIO);
}
Iterator* iter;
if (block.value != nullptr) {
iter = block.value->NewIterator(&table->rep_->internal_comparator);
if (block.cache_handle != nullptr) {
iter->RegisterCleanup(&ReleaseCachedEntry, block_cache,
block.cache_handle);
} else {
iter->RegisterCleanup(&DeleteHeldResource<Block>, block.value, nullptr);
}
} else {
iter = NewErrorIterator(s);
}
return iter;
}
BlockBasedTable::CachableEntry<FilterBlockReader> BlockBasedTable::GetFilter( BlockBasedTable::CachableEntry<FilterBlockReader> BlockBasedTable::GetFilter(
bool no_io) const { bool no_io) const {
@ -838,13 +750,115 @@ Iterator* BlockBasedTable::NewIndexIterator(const ReadOptions& read_options) {
return iter; return iter;
} }
Iterator* BlockBasedTable::DataBlockReader( // Convert an index iterator value (i.e., an encoded BlockHandle)
void* arg, const ReadOptions& options, const EnvOptions& soptions, // into an iterator over the contents of the corresponding block.
const InternalKeyComparator& icomparator, const Slice& index_value, Iterator* BlockBasedTable::NewDataBlockIterator(Rep* rep,
bool for_compaction) { const ReadOptions& ro, bool* didIO, const Slice& index_value) {
return DataBlockReader(arg, options, index_value, nullptr, for_compaction); const bool no_io = (ro.read_tier == kBlockCacheTier);
Cache* block_cache = rep->options.block_cache.get();
Cache* block_cache_compressed = rep->options.
block_cache_compressed.get();
CachableEntry<Block> block;
BlockHandle handle;
Slice input = index_value;
// We intentionally allow extra stuff in index_value so that we
// can add more features in the future.
Status s = handle.DecodeFrom(&input);
if (!s.ok()) {
return NewErrorIterator(s);
}
// If either block cache is enabled, we'll try to read from it.
if (block_cache != nullptr || block_cache_compressed != nullptr) {
Statistics* statistics = rep->options.statistics.get();
char cache_key[kMaxCacheKeyPrefixSize + kMaxVarint64Length];
char compressed_cache_key[kMaxCacheKeyPrefixSize + kMaxVarint64Length];
Slice key, /* key to the block cache */
ckey /* key to the compressed block cache */;
// create key for block cache
if (block_cache != nullptr) {
key = GetCacheKey(rep->cache_key_prefix,
rep->cache_key_prefix_size, handle, cache_key);
}
if (block_cache_compressed != nullptr) {
ckey = GetCacheKey(rep->compressed_cache_key_prefix,
rep->compressed_cache_key_prefix_size, handle,
compressed_cache_key);
}
s = GetDataBlockFromCache(key, ckey, block_cache, block_cache_compressed,
statistics, ro, &block);
if (block.value == nullptr && !no_io && ro.fill_cache) {
Histograms histogram = READ_BLOCK_GET_MICROS;
Block* raw_block = nullptr;
{
StopWatch sw(rep->options.env, statistics, histogram);
s = ReadBlockFromFile(rep->file.get(), ro, handle,
&raw_block, rep->options.env, didIO,
block_cache_compressed == nullptr);
}
if (s.ok()) {
s = PutDataBlockToCache(key, ckey, block_cache, block_cache_compressed,
ro, statistics, &block, raw_block);
}
}
}
// Didn't get any data from block caches.
if (block.value == nullptr) {
if (no_io) {
// Could not read from block_cache and can't do IO
return NewErrorIterator(Status::Incomplete("no blocking io"));
}
s = ReadBlockFromFile(rep->file.get(), ro, handle,
&block.value, rep->options.env, didIO);
}
Iterator* iter;
if (block.value != nullptr) {
iter = block.value->NewIterator(&rep->internal_comparator);
if (block.cache_handle != nullptr) {
iter->RegisterCleanup(&ReleaseCachedEntry, block_cache,
block.cache_handle);
} else {
iter->RegisterCleanup(&DeleteHeldResource<Block>, block.value, nullptr);
}
} else {
iter = NewErrorIterator(s);
}
return iter;
} }
class BlockBasedTable::BlockEntryIteratorState : public TwoLevelIteratorState {
public:
BlockEntryIteratorState(BlockBasedTable* table,
const ReadOptions& read_options, bool* did_io)
: TwoLevelIteratorState(table->rep_->options.prefix_extractor != nullptr),
table_(table), read_options_(read_options), did_io_(did_io) {}
Iterator* NewSecondaryIterator(const Slice& index_value) override {
return NewDataBlockIterator(table_->rep_, read_options_, did_io_,
index_value);
}
bool PrefixMayMatch(const Slice& internal_key) override {
return table_->PrefixMayMatch(internal_key);
}
private:
// Don't own table_
BlockBasedTable* table_;
const ReadOptions read_options_;
// Don't own did_io_
bool* did_io_;
};
// This will be broken if the user specifies an unusual implementation // This will be broken if the user specifies an unusual implementation
// of Options.comparator, or if the user specifies an unusual // of Options.comparator, or if the user specifies an unusual
// definition of prefixes in Options.filter_policy. In particular, we // definition of prefixes in Options.filter_policy. In particular, we
@ -857,7 +871,13 @@ Iterator* BlockBasedTable::DataBlockReader(
// Otherwise, this method guarantees no I/O will be incurred. // Otherwise, this method guarantees no I/O will be incurred.
// //
// REQUIRES: this method shouldn't be called while the DB lock is held. // REQUIRES: this method shouldn't be called while the DB lock is held.
bool BlockBasedTable::PrefixMayMatch(const Slice& internal_prefix) { bool BlockBasedTable::PrefixMayMatch(const Slice& internal_key) {
assert(rep_->options.prefix_extractor != nullptr);
auto prefix = rep_->options.prefix_extractor->Transform(
ExtractUserKey(internal_key));
InternalKey internal_key_prefix(prefix, 0, kTypeValue);
auto internal_prefix = internal_key_prefix.Encode();
bool may_match = true; bool may_match = true;
Status s; Status s;
@ -918,11 +938,10 @@ bool BlockBasedTable::PrefixMayMatch(const Slice& internal_prefix) {
return may_match; return may_match;
} }
Iterator* BlockBasedTable::NewIterator(const ReadOptions& options) { Iterator* BlockBasedTable::NewIterator(const ReadOptions& read_options) {
return NewTwoLevelIterator(NewIndexIterator(options), return NewTwoLevelIterator(new BlockEntryIteratorState(this, read_options,
&BlockBasedTable::DataBlockReader, nullptr),
const_cast<BlockBasedTable*>(this), options, NewIndexIterator(read_options));
rep_->soptions, rep_->internal_comparator);
} }
Status BlockBasedTable::Get( Status BlockBasedTable::Get(
@ -953,7 +972,7 @@ Status BlockBasedTable::Get(
} else { } else {
bool didIO = false; bool didIO = false;
unique_ptr<Iterator> block_iter( unique_ptr<Iterator> block_iter(
DataBlockReader(this, read_options, iiter->value(), &didIO)); NewDataBlockIterator(rep_, read_options, &didIO, iiter->value()));
if (read_options.read_tier && block_iter->status().IsIncomplete()) { if (read_options.read_tier && block_iter->status().IsIncomplete()) {
// couldn't get block from block_cache // couldn't get block from block_cache
@ -1050,10 +1069,8 @@ Status BlockBasedTable::CreateIndexReader(IndexReader** index_reader) {
return HashIndexReader::Create( return HashIndexReader::Create(
file, index_handle, env, comparator, file, index_handle, env, comparator,
[&](Iterator* index_iter) { [&](Iterator* index_iter) {
return NewTwoLevelIterator( return NewTwoLevelIterator(new BlockEntryIteratorState(this,
index_iter, &BlockBasedTable::DataBlockReader, ReadOptions(), nullptr), index_iter);
const_cast<BlockBasedTable*>(this), ReadOptions(),
rep_->soptions, rep_->internal_comparator);
}, },
rep_->internal_prefix_transform.get(), index_reader); rep_->internal_prefix_transform.get(), index_reader);
} }

@ -63,7 +63,7 @@ class BlockBasedTable : public TableReader {
unique_ptr<RandomAccessFile>&& file, uint64_t file_size, unique_ptr<RandomAccessFile>&& file, uint64_t file_size,
unique_ptr<TableReader>* table_reader); unique_ptr<TableReader>* table_reader);
bool PrefixMayMatch(const Slice& internal_prefix) override; bool PrefixMayMatch(const Slice& internal_key) override;
// Returns a new iterator over the table contents. // Returns a new iterator over the table contents.
// The result of NewIterator() is initially invalid (caller must // The result of NewIterator() is initially invalid (caller must
@ -111,13 +111,9 @@ class BlockBasedTable : public TableReader {
Rep* rep_; Rep* rep_;
bool compaction_optimized_; bool compaction_optimized_;
static Iterator* DataBlockReader(void*, const ReadOptions&, struct BlockEntryIteratorState;
const EnvOptions& soptions, static Iterator* NewDataBlockIterator(Rep* rep, const ReadOptions& ro,
const InternalKeyComparator& icomparator, bool* didIO, const Slice& index_value);
const Slice&, bool for_compaction);
static Iterator* DataBlockReader(void*, const ReadOptions&, const Slice&,
bool* didIO, bool for_compaction = false);
// For the following two functions: // For the following two functions:
// if `no_io == true`, we will not try to read filter/index from sst file // if `no_io == true`, we will not try to read filter/index from sst file

@ -13,26 +13,17 @@
#include "rocksdb/table.h" #include "rocksdb/table.h"
#include "table/block.h" #include "table/block.h"
#include "table/format.h" #include "table/format.h"
#include "table/iterator_wrapper.h"
namespace rocksdb { namespace rocksdb {
namespace { namespace {
typedef Iterator* (*BlockFunction)(void*, const ReadOptions&,
const EnvOptions& soptions,
const InternalKeyComparator& icomparator,
const Slice&, bool for_compaction);
class TwoLevelIterator: public Iterator { class TwoLevelIterator: public Iterator {
public: public:
TwoLevelIterator(Iterator* index_iter, BlockFunction block_function, explicit TwoLevelIterator(TwoLevelIteratorState* state,
void* arg, const ReadOptions& options, Iterator* first_level_iter);
const EnvOptions& soptions,
const InternalKeyComparator& internal_comparator,
bool for_compaction);
virtual ~TwoLevelIterator(); virtual ~TwoLevelIterator() {}
virtual void Seek(const Slice& target); virtual void Seek(const Slice& target);
virtual void SeekToFirst(); virtual void SeekToFirst();
@ -41,22 +32,23 @@ class TwoLevelIterator: public Iterator {
virtual void Prev(); virtual void Prev();
virtual bool Valid() const { virtual bool Valid() const {
return data_iter_.Valid(); return second_level_iter_.Valid();
} }
virtual Slice key() const { virtual Slice key() const {
assert(Valid()); assert(Valid());
return data_iter_.key(); return second_level_iter_.key();
} }
virtual Slice value() const { virtual Slice value() const {
assert(Valid()); assert(Valid());
return data_iter_.value(); return second_level_iter_.value();
} }
virtual Status status() const { virtual Status status() const {
// It'd be nice if status() returned a const Status& instead of a Status // It'd be nice if status() returned a const Status& instead of a Status
if (!index_iter_.status().ok()) { if (!first_level_iter_.status().ok()) {
return index_iter_.status(); return first_level_iter_.status();
} else if (data_iter_.iter() != nullptr && !data_iter_.status().ok()) { } else if (second_level_iter_.iter() != nullptr &&
return data_iter_.status(); !second_level_iter_.status().ok()) {
return second_level_iter_.status();
} else { } else {
return status_; return status_;
} }
@ -68,135 +60,129 @@ class TwoLevelIterator: public Iterator {
} }
void SkipEmptyDataBlocksForward(); void SkipEmptyDataBlocksForward();
void SkipEmptyDataBlocksBackward(); void SkipEmptyDataBlocksBackward();
void SetDataIterator(Iterator* data_iter); void SetSecondLevelIterator(Iterator* iter);
void InitDataBlock(); void InitDataBlock();
BlockFunction block_function_; std::unique_ptr<TwoLevelIteratorState> state_;
void* arg_; IteratorWrapper first_level_iter_;
const ReadOptions options_; IteratorWrapper second_level_iter_; // May be nullptr
const EnvOptions& soptions_;
const InternalKeyComparator& internal_comparator_;
Status status_; Status status_;
IteratorWrapper index_iter_; // If second_level_iter is non-nullptr, then "data_block_handle_" holds the
IteratorWrapper data_iter_; // May be nullptr // "index_value" passed to block_function_ to create the second_level_iter.
// If data_iter_ is non-nullptr, then "data_block_handle_" holds the
// "index_value" passed to block_function_ to create the data_iter_.
std::string data_block_handle_; std::string data_block_handle_;
bool for_compaction_;
}; };
TwoLevelIterator::TwoLevelIterator( TwoLevelIterator::TwoLevelIterator(TwoLevelIteratorState* state,
Iterator* index_iter, BlockFunction block_function, void* arg, Iterator* first_level_iter)
const ReadOptions& options, const EnvOptions& soptions, : state_(state), first_level_iter_(first_level_iter) {}
const InternalKeyComparator& internal_comparator, bool for_compaction)
: block_function_(block_function),
arg_(arg),
options_(options),
soptions_(soptions),
internal_comparator_(internal_comparator),
index_iter_(index_iter),
data_iter_(nullptr),
for_compaction_(for_compaction) {}
TwoLevelIterator::~TwoLevelIterator() {
}
void TwoLevelIterator::Seek(const Slice& target) { void TwoLevelIterator::Seek(const Slice& target) {
index_iter_.Seek(target); if (state_->prefix_enabled && !state_->PrefixMayMatch(target)) {
SetSecondLevelIterator(nullptr);
} else {
first_level_iter_.Seek(target);
InitDataBlock(); InitDataBlock();
if (data_iter_.iter() != nullptr) data_iter_.Seek(target); if (second_level_iter_.iter() != nullptr) {
second_level_iter_.Seek(target);
}
SkipEmptyDataBlocksForward(); SkipEmptyDataBlocksForward();
}
} }
void TwoLevelIterator::SeekToFirst() { void TwoLevelIterator::SeekToFirst() {
index_iter_.SeekToFirst(); first_level_iter_.SeekToFirst();
InitDataBlock(); InitDataBlock();
if (data_iter_.iter() != nullptr) data_iter_.SeekToFirst(); if (second_level_iter_.iter() != nullptr) {
second_level_iter_.SeekToFirst();
}
SkipEmptyDataBlocksForward(); SkipEmptyDataBlocksForward();
} }
void TwoLevelIterator::SeekToLast() { void TwoLevelIterator::SeekToLast() {
index_iter_.SeekToLast(); first_level_iter_.SeekToLast();
InitDataBlock(); InitDataBlock();
if (data_iter_.iter() != nullptr) data_iter_.SeekToLast(); if (second_level_iter_.iter() != nullptr) {
second_level_iter_.SeekToLast();
}
SkipEmptyDataBlocksBackward(); SkipEmptyDataBlocksBackward();
} }
void TwoLevelIterator::Next() { void TwoLevelIterator::Next() {
assert(Valid()); assert(Valid());
data_iter_.Next(); second_level_iter_.Next();
SkipEmptyDataBlocksForward(); SkipEmptyDataBlocksForward();
} }
void TwoLevelIterator::Prev() { void TwoLevelIterator::Prev() {
assert(Valid()); assert(Valid());
data_iter_.Prev(); second_level_iter_.Prev();
SkipEmptyDataBlocksBackward(); SkipEmptyDataBlocksBackward();
} }
void TwoLevelIterator::SkipEmptyDataBlocksForward() { void TwoLevelIterator::SkipEmptyDataBlocksForward() {
while (data_iter_.iter() == nullptr || (!data_iter_.Valid() && while (second_level_iter_.iter() == nullptr ||
!data_iter_.status().IsIncomplete())) { (!second_level_iter_.Valid() &&
!second_level_iter_.status().IsIncomplete())) {
// Move to next block // Move to next block
if (!index_iter_.Valid()) { if (!first_level_iter_.Valid()) {
SetDataIterator(nullptr); SetSecondLevelIterator(nullptr);
return; return;
} }
index_iter_.Next(); first_level_iter_.Next();
InitDataBlock(); InitDataBlock();
if (data_iter_.iter() != nullptr) data_iter_.SeekToFirst(); if (second_level_iter_.iter() != nullptr) {
second_level_iter_.SeekToFirst();
}
} }
} }
void TwoLevelIterator::SkipEmptyDataBlocksBackward() { void TwoLevelIterator::SkipEmptyDataBlocksBackward() {
while (data_iter_.iter() == nullptr || (!data_iter_.Valid() && while (second_level_iter_.iter() == nullptr ||
!data_iter_.status().IsIncomplete())) { (!second_level_iter_.Valid() &&
!second_level_iter_.status().IsIncomplete())) {
// Move to next block // Move to next block
if (!index_iter_.Valid()) { if (!first_level_iter_.Valid()) {
SetDataIterator(nullptr); SetSecondLevelIterator(nullptr);
return; return;
} }
index_iter_.Prev(); first_level_iter_.Prev();
InitDataBlock(); InitDataBlock();
if (data_iter_.iter() != nullptr) data_iter_.SeekToLast(); if (second_level_iter_.iter() != nullptr) {
second_level_iter_.SeekToLast();
}
} }
} }
void TwoLevelIterator::SetDataIterator(Iterator* data_iter) { void TwoLevelIterator::SetSecondLevelIterator(Iterator* iter) {
if (data_iter_.iter() != nullptr) SaveError(data_iter_.status()); if (second_level_iter_.iter() != nullptr) {
data_iter_.Set(data_iter); SaveError(second_level_iter_.status());
}
second_level_iter_.Set(iter);
} }
void TwoLevelIterator::InitDataBlock() { void TwoLevelIterator::InitDataBlock() {
if (!index_iter_.Valid()) { if (!first_level_iter_.Valid()) {
SetDataIterator(nullptr); SetSecondLevelIterator(nullptr);
} else { } else {
Slice handle = index_iter_.value(); Slice handle = first_level_iter_.value();
if (data_iter_.iter() != nullptr if (second_level_iter_.iter() != nullptr
&& handle.compare(data_block_handle_) == 0) { && handle.compare(data_block_handle_) == 0) {
// data_iter_ is already constructed with this iterator, so // second_level_iter is already constructed with this iterator, so
// no need to change anything // no need to change anything
} else { } else {
Iterator* iter = Iterator* iter = state_->NewSecondaryIterator(handle);
(*block_function_)(arg_, options_, soptions_, internal_comparator_,
handle, for_compaction_);
data_block_handle_.assign(handle.data(), handle.size()); data_block_handle_.assign(handle.data(), handle.size());
SetDataIterator(iter); SetSecondLevelIterator(iter);
} }
} }
} }
} // namespace } // namespace
Iterator* NewTwoLevelIterator(Iterator* index_iter, Iterator* NewTwoLevelIterator(TwoLevelIteratorState* state,
BlockFunction block_function, void* arg, Iterator* first_level_iter) {
const ReadOptions& options, return new TwoLevelIterator(state, first_level_iter);
const EnvOptions& soptions,
const InternalKeyComparator& internal_comparator,
bool for_compaction) {
return new TwoLevelIterator(index_iter, block_function, arg, options,
soptions, internal_comparator, for_compaction);
} }
} // namespace rocksdb } // namespace rocksdb

@ -10,12 +10,25 @@
#pragma once #pragma once
#include "rocksdb/iterator.h" #include "rocksdb/iterator.h"
#include "rocksdb/env.h" #include "rocksdb/env.h"
#include "table/iterator_wrapper.h"
namespace rocksdb { namespace rocksdb {
struct ReadOptions; struct ReadOptions;
class InternalKeyComparator; class InternalKeyComparator;
struct TwoLevelIteratorState {
explicit TwoLevelIteratorState(bool prefix_enabled)
: prefix_enabled(prefix_enabled) {}
virtual ~TwoLevelIteratorState() {}
virtual Iterator* NewSecondaryIterator(const Slice& handle) = 0;
virtual bool PrefixMayMatch(const Slice& internal_key) = 0;
bool prefix_enabled;
};
// Return a new two level iterator. A two-level iterator contains an // Return a new two level iterator. A two-level iterator contains an
// index iterator whose values point to a sequence of blocks where // index iterator whose values point to a sequence of blocks where
// each block is itself a sequence of key,value pairs. The returned // each block is itself a sequence of key,value pairs. The returned
@ -25,14 +38,7 @@ class InternalKeyComparator;
// //
// Uses a supplied function to convert an index_iter value into // Uses a supplied function to convert an index_iter value into
// an iterator over the contents of the corresponding block. // an iterator over the contents of the corresponding block.
extern Iterator* NewTwoLevelIterator( extern Iterator* NewTwoLevelIterator(TwoLevelIteratorState* state,
Iterator* index_iter, Iterator* first_level_iter);
Iterator* (*block_function)(
void* arg, const ReadOptions& options, const EnvOptions& soptions,
const InternalKeyComparator& internal_comparator,
const Slice& index_value, bool for_compaction),
void* arg, const ReadOptions& options, const EnvOptions& soptions,
const InternalKeyComparator& internal_comparator,
bool for_compaction = false);
} // namespace rocksdb } // namespace rocksdb

Loading…
Cancel
Save