From 8f09d53fd11a7debe1e48b73a192de3a458d37bf Mon Sep 17 00:00:00 2001 From: Feng Zhu Date: Wed, 30 Jul 2014 16:34:35 -0700 Subject: [PATCH] remove malloc when create data and index iterator in Get MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Summary: Define Block::Iter to be an independent class to be used by block_based_table_reader When creating data and index iterator, update an existing iterator rather than new one Thus malloc and free could be reduced Benchmark, Base: commit 76286ee67ef4b89579a92134b996a681c36a1331 commands: --db=/dev/shm/rocksdb --num_levels=6 --key_size=20 --prefix_size=20 --keys_per_prefix=0 --value_size=100 --write_buffer_size=134217728 --max_write_buffer_number=2 --target_file_size_base=33554432 --max_bytes_for_level_base=1073741824 --verify_checksum=false --max_background_compactions=4 --use_plain_table=0 --memtablerep=prefix_hash --open_files=-1 --mmap_read=1 --mmap_write=0 --bloom_bits=10 --bloom_locality=1 --memtable_bloom_bits=500000 --compression_type=lz4 --num=2621440 --use_hash_search=1 --block_size=1024 --block_restart_interval=1 --use_existing_db=1 --threads=1 --benchmarks=readrandom --disable_auto_compactions=1 malloc: 3.30% -> 1.42% free: 3.59% -> 1.61% Test Plan: make all check run db_stress valgrind ./db_test ./table_test Reviewers: ljin, yhchiang, dhruba, igor, sdong Reviewed By: sdong Subscribers: leveldb Differential Revision: https://reviews.facebook.net/D20655 --- table/block.cc | 475 ++++++++++++++---------------- table/block.h | 128 +++++++- table/block_based_table_reader.cc | 94 ++++-- table/block_based_table_reader.h | 10 +- 4 files changed, 416 insertions(+), 291 deletions(-) diff --git a/table/block.cc b/table/block.cc index 56d71c47b..43b1ee382 100644 --- a/table/block.cc +++ b/table/block.cc @@ -17,44 +17,14 @@ #include #include "rocksdb/comparator.h" +#include "table/format.h" #include "table/block_hash_index.h" #include "table/block_prefix_index.h" -#include 
"table/format.h" #include "util/coding.h" #include "util/logging.h" -#include "db/dbformat.h" namespace rocksdb { -uint32_t Block::NumRestarts() const { - assert(size_ >= 2*sizeof(uint32_t)); - return DecodeFixed32(data_ + size_ - sizeof(uint32_t)); -} - -Block::Block(const BlockContents& contents) - : data_(contents.data.data()), - size_(contents.data.size()), - owned_(contents.heap_allocated), - cachable_(contents.cachable), - compression_type_(contents.compression_type) { - if (size_ < sizeof(uint32_t)) { - size_ = 0; // Error marker - } else { - restart_offset_ = size_ - (1 + NumRestarts()) * sizeof(uint32_t); - if (restart_offset_ > size_ - sizeof(uint32_t)) { - // The size is too small for NumRestarts() and therefore - // restart_offset_ wrapped around. - size_ = 0; - } - } -} - -Block::~Block() { - if (owned_) { - delete[] data_; - } -} - // Helper routine: decode the next block entry starting at "p", // storing the number of shared key bytes, non_shared key bytes, // and the length of the value in "*shared", "*non_shared", and @@ -85,142 +55,85 @@ static inline const char* DecodeEntry(const char* p, const char* limit, return p; } -class Block::Iter : public Iterator { - private: - const Comparator* const comparator_; - const char* const data_; // underlying block contents - uint32_t const restarts_; // Offset of restart array (list of fixed32) - uint32_t const num_restarts_; // Number of uint32_t entries in restart array - - // current_ is offset in data_ of current entry. >= restarts_ if !Valid - uint32_t current_; - uint32_t restart_index_; // Index of restart block in which current_ falls - IterKey key_; - Slice value_; - Status status_; - BlockHashIndex* hash_index_; - BlockPrefixIndex* prefix_index_; - - inline int Compare(const Slice& a, const Slice& b) const { - return comparator_->Compare(a, b); - } - - // Return the offset in data_ just past the end of the current entry. 
- inline uint32_t NextEntryOffset() const { - return (value_.data() + value_.size()) - data_; - } - - uint32_t GetRestartPoint(uint32_t index) { - assert(index < num_restarts_); - return DecodeFixed32(data_ + restarts_ + index * sizeof(uint32_t)); - } +void BlockIter::Next() { + assert(Valid()); + ParseNextKey(); +} - void SeekToRestartPoint(uint32_t index) { - key_.Clear(); - restart_index_ = index; - // current_ will be fixed by ParseNextKey(); +void BlockIter::Prev() { + assert(Valid()); - // ParseNextKey() starts at the end of value_, so set value_ accordingly - uint32_t offset = GetRestartPoint(index); - value_ = Slice(data_ + offset, 0); + // Scan backwards to a restart point before current_ + const uint32_t original = current_; + while (GetRestartPoint(restart_index_) >= original) { + if (restart_index_ == 0) { + // No more entries + current_ = restarts_; + restart_index_ = num_restarts_; + return; + } + restart_index_--; } - public: - Iter(const Comparator* comparator, const char* data, uint32_t restarts, - uint32_t num_restarts, BlockHashIndex* hash_index, - BlockPrefixIndex* prefix_index) - : comparator_(comparator), - data_(data), - restarts_(restarts), - num_restarts_(num_restarts), - current_(restarts_), - restart_index_(num_restarts_), - hash_index_(hash_index), - prefix_index_(prefix_index) { - assert(num_restarts_ > 0); - } + SeekToRestartPoint(restart_index_); + do { + // Loop until end of current entry hits the start of original entry + } while (ParseNextKey() && NextEntryOffset() < original); +} - virtual bool Valid() const { return current_ < restarts_; } - virtual Status status() const { return status_; } - virtual Slice key() const { - assert(Valid()); - return key_.GetKey(); +void BlockIter::Seek(const Slice& target) { + if (data_ == nullptr) { // Not init yet + return; } - virtual Slice value() const { - assert(Valid()); - return value_; - } - - virtual void Next() { - assert(Valid()); - ParseNextKey(); + uint32_t index = 0; + bool ok = 
false; + if (prefix_index_) { + ok = PrefixSeek(target, &index); + } else { + ok = hash_index_ ? HashSeek(target, &index) + : BinarySeek(target, 0, num_restarts_ - 1, &index); } - virtual void Prev() { - assert(Valid()); - - // Scan backwards to a restart point before current_ - const uint32_t original = current_; - while (GetRestartPoint(restart_index_) >= original) { - if (restart_index_ == 0) { - // No more entries - current_ = restarts_; - restart_index_ = num_restarts_; - return; - } - restart_index_--; - } - - SeekToRestartPoint(restart_index_); - do { - // Loop until end of current entry hits the start of original entry - } while (ParseNextKey() && NextEntryOffset() < original); + if (!ok) { + return; } + SeekToRestartPoint(index); + // Linear search (within restart block) for first key >= target - virtual void Seek(const Slice& target) { - uint32_t index = 0; - bool ok = false; - if (prefix_index_) { - ok = PrefixSeek(target, &index); - } else { - ok = hash_index_ ? HashSeek(target, &index) - : BinarySeek(target, 0, num_restarts_ - 1, &index); - } - - if (!ok) { + while (true) { + if (!ParseNextKey() || Compare(key_.GetKey(), target) >= 0) { return; } - SeekToRestartPoint(index); - // Linear search (within restart block) for first key >= target - - while (true) { - if (!ParseNextKey() || Compare(key_.GetKey(), target) >= 0) { - return; - } - } - } - virtual void SeekToFirst() { - SeekToRestartPoint(0); - ParseNextKey(); } +} - virtual void SeekToLast() { - SeekToRestartPoint(num_restarts_ - 1); - while (ParseNextKey() && NextEntryOffset() < restarts_) { - // Keep skipping - } +void BlockIter::SeekToFirst() { + if (data_ == nullptr) { // Not init yet + return; } + SeekToRestartPoint(0); + ParseNextKey(); +} - private: - void CorruptionError() { - current_ = restarts_; - restart_index_ = num_restarts_; - status_ = Status::Corruption("bad entry in block"); - key_.Clear(); - value_.clear(); +void BlockIter::SeekToLast() { + if (data_ == nullptr) { // Not init 
yet + return; + } + SeekToRestartPoint(num_restarts_ - 1); + while (ParseNextKey() && NextEntryOffset() < restarts_) { + // Keep skipping } +} + +void BlockIter::CorruptionError() { + current_ = restarts_; + restart_index_ = num_restarts_; + status_ = Status::Corruption("bad entry in block"); + key_.Clear(); + value_.clear(); +} - bool ParseNextKey() { +bool BlockIter::ParseNextKey() { current_ = NextEntryOffset(); const char* p = data_ + current_; const char* limit = data_ + restarts_; // Restarts come right after data @@ -248,150 +161,194 @@ class Block::Iter : public Iterator { } } - // Binary search in restart array to find the first restart point - // with a key >= target (TODO: this comment is inaccurate) - bool BinarySeek(const Slice& target, uint32_t left, uint32_t right, +// Binary search in restart array to find the first restart point +// with a key >= target (TODO: this comment is inaccurate) +bool BlockIter::BinarySeek(const Slice& target, uint32_t left, uint32_t right, uint32_t* index) { - assert(left <= right); - - while (left < right) { - uint32_t mid = (left + right + 1) / 2; - uint32_t region_offset = GetRestartPoint(mid); - uint32_t shared, non_shared, value_length; - const char* key_ptr = - DecodeEntry(data_ + region_offset, data_ + restarts_, &shared, - &non_shared, &value_length); - if (key_ptr == nullptr || (shared != 0)) { - CorruptionError(); - return false; - } - Slice mid_key(key_ptr, non_shared); - int cmp = Compare(mid_key, target); - if (cmp < 0) { - // Key at "mid" is smaller than "target". Therefore all - // blocks before "mid" are uninteresting. - left = mid; - } else if (cmp > 0) { - // Key at "mid" is >= "target". Therefore all blocks at or - // after "mid" are uninteresting. - right = mid - 1; - } else { - left = right = mid; - } - } + assert(left <= right); - *index = left; - return true; - } - - // Compare target key and the block key of the block of `block_index`. - // Return -1 if error. 
- int CompareBlockKey(uint32_t block_index, const Slice& target) { - uint32_t region_offset = GetRestartPoint(block_index); + while (left < right) { + uint32_t mid = (left + right + 1) / 2; + uint32_t region_offset = GetRestartPoint(mid); uint32_t shared, non_shared, value_length; - const char* key_ptr = DecodeEntry(data_ + region_offset, data_ + restarts_, - &shared, &non_shared, &value_length); + const char* key_ptr = + DecodeEntry(data_ + region_offset, data_ + restarts_, &shared, + &non_shared, &value_length); if (key_ptr == nullptr || (shared != 0)) { CorruptionError(); - return 1; // Return target is smaller + return false; + } + Slice mid_key(key_ptr, non_shared); + int cmp = Compare(mid_key, target); + if (cmp < 0) { + // Key at "mid" is smaller than "target". Therefore all + // blocks before "mid" are uninteresting. + left = mid; + } else if (cmp > 0) { + // Key at "mid" is >= "target". Therefore all blocks at or + // after "mid" are uninteresting. + right = mid - 1; + } else { + left = right = mid; } - Slice block_key(key_ptr, non_shared); - return Compare(block_key, target); } - // Binary search in block_ids to find the first block - // with a key >= target - bool BinaryBlockIndexSeek(const Slice& target, uint32_t* block_ids, - uint32_t left, uint32_t right, - uint32_t* index) { - assert(left <= right); - uint32_t left_bound = left; + *index = left; + return true; +} - while (left <= right) { - uint32_t mid = (left + right) / 2; +// Compare target key and the block key of the block of `block_index`. +// Return -1 if error. 
+int BlockIter::CompareBlockKey(uint32_t block_index, const Slice& target) { + uint32_t region_offset = GetRestartPoint(block_index); + uint32_t shared, non_shared, value_length; + const char* key_ptr = DecodeEntry(data_ + region_offset, data_ + restarts_, + &shared, &non_shared, &value_length); + if (key_ptr == nullptr || (shared != 0)) { + CorruptionError(); + return 1; // Return target is smaller + } + Slice block_key(key_ptr, non_shared); + return Compare(block_key, target); +} - int cmp = CompareBlockKey(block_ids[mid], target); - if (!status_.ok()) { - return false; - } - if (cmp < 0) { - // Key at "target" is larger than "mid". Therefore all - // blocks before or at "mid" are uninteresting. - left = mid + 1; - } else { - // Key at "target" is <= "mid". Therefore all blocks - // after "mid" are uninteresting. - // If there is only one block left, we found it. - if (left == right) break; - right = mid; - } - } +// Binary search in block_ids to find the first block +// with a key >= target +bool BlockIter::BinaryBlockIndexSeek(const Slice& target, uint32_t* block_ids, + uint32_t left, uint32_t right, + uint32_t* index) { + assert(left <= right); + uint32_t left_bound = left; - if (left == right) { - // In one of the two following cases: - // (1) left is the first one of block_ids - // (2) there is a gap of blocks between block of `left` and `left-1`. - // we can further distinguish the case of key in the block or key not - // existing, by comparing the target key and the key of the previous - // block to the left of the block found. 
- if (block_ids[left] > 0 && - (left == left_bound || block_ids[left - 1] != block_ids[left] - 1) && - CompareBlockKey(block_ids[left] - 1, target) > 0) { - current_ = restarts_; - return false; - } + while (left <= right) { + uint32_t mid = (left + right) / 2; - *index = block_ids[left]; - return true; - } else { - assert(left > right); - // Mark iterator invalid - current_ = restarts_; + int cmp = CompareBlockKey(block_ids[mid], target); + if (!status_.ok()) { return false; } + if (cmp < 0) { + // Key at "target" is larger than "mid". Therefore all + // blocks before or at "mid" are uninteresting. + left = mid + 1; + } else { + // Key at "target" is <= "mid". Therefore all blocks + // after "mid" are uninteresting. + // If there is only one block left, we found it. + if (left == right) break; + right = mid; + } } - bool HashSeek(const Slice& target, uint32_t* index) { - assert(hash_index_); - auto restart_index = hash_index_->GetRestartIndex(target); - if (restart_index == nullptr) { + if (left == right) { + // In one of the two following cases: + // (1) left is the first one of block_ids + // (2) there is a gap of blocks between block of `left` and `left-1`. + // we can further distinguish the case of key in the block or key not + // existing, by comparing the target key and the key of the previous + // block to the left of the block found. + if (block_ids[left] > 0 && + (left == left_bound || block_ids[left - 1] != block_ids[left] - 1) && + CompareBlockKey(block_ids[left] - 1, target) > 0) { current_ = restarts_; return false; } - // the elements in restart_array[index : index + num_blocks] - // are all with same prefix. We'll do binary search in that small range. 
- auto left = restart_index->first_index; - auto right = restart_index->first_index + restart_index->num_blocks - 1; - return BinarySeek(target, left, right, index); + *index = block_ids[left]; + return true; + } else { + assert(left > right); + // Mark iterator invalid + current_ = restarts_; + return false; + } +} + +bool BlockIter::HashSeek(const Slice& target, uint32_t* index) { + assert(hash_index_); + auto restart_index = hash_index_->GetRestartIndex(target); + if (restart_index == nullptr) { + current_ = restarts_; + return false; } - bool PrefixSeek(const Slice& target, uint32_t* index) { - assert(prefix_index_); - uint32_t* block_ids = nullptr; - uint32_t num_blocks = prefix_index_->GetBlocks(target, &block_ids); + // the elements in restart_array[index : index + num_blocks] + // are all with same prefix. We'll do binary search in that small range. + auto left = restart_index->first_index; + auto right = restart_index->first_index + restart_index->num_blocks - 1; + return BinarySeek(target, left, right, index); +} +bool BlockIter::PrefixSeek(const Slice& target, uint32_t* index) { + assert(prefix_index_); + uint32_t* block_ids = nullptr; + uint32_t num_blocks = prefix_index_->GetBlocks(target, &block_ids); - if (num_blocks == 0) { - current_ = restarts_; - return false; - } else { - return BinaryBlockIndexSeek(target, block_ids, 0, num_blocks - 1, index); + if (num_blocks == 0) { + current_ = restarts_; + return false; + } else { + return BinaryBlockIndexSeek(target, block_ids, 0, num_blocks - 1, index); + } +} + +uint32_t Block::NumRestarts() const { + assert(size_ >= 2*sizeof(uint32_t)); + return DecodeFixed32(data_ + size_ - sizeof(uint32_t)); +} + +Block::Block(const BlockContents& contents) + : data_(contents.data.data()), + size_(contents.data.size()), + owned_(contents.heap_allocated), + cachable_(contents.cachable), + compression_type_(contents.compression_type) { + if (size_ < sizeof(uint32_t)) { + size_ = 0; // Error marker + } else { + 
restart_offset_ = size_ - (1 + NumRestarts()) * sizeof(uint32_t); + if (restart_offset_ > size_ - sizeof(uint32_t)) { + // The size is too small for NumRestarts() and therefore + // restart_offset_ wrapped around. + size_ = 0; } } -}; +} + +Block::~Block() { + if (owned_) { + delete[] data_; + } +} -Iterator* Block::NewIterator(const Comparator* cmp) { +Iterator* Block::NewIterator(const Comparator* cmp, BlockIter* iter) { if (size_ < 2*sizeof(uint32_t)) { - return NewErrorIterator(Status::Corruption("bad block contents")); + if (iter != nullptr) { + iter->SetStatus(Status::Corruption("bad block contents")); + return iter; + } else { + return NewErrorIterator(Status::Corruption("bad block contents")); + } } const uint32_t num_restarts = NumRestarts(); if (num_restarts == 0) { - return NewEmptyIterator(); + if (iter != nullptr) { + iter->SetStatus(Status::OK()); + return iter; + } else { + return NewEmptyIterator(); + } } else { - return new Iter(cmp, data_, restart_offset_, num_restarts, + if (iter != nullptr) { + iter->Initialize(cmp, data_, restart_offset_, num_restarts, hash_index_.get(), prefix_index_.get()); + } else { + iter = new BlockIter(cmp, data_, restart_offset_, num_restarts, + hash_index_.get(), prefix_index_.get()); + } } + + return iter; } void Block::SetBlockHashIndex(BlockHashIndex* hash_index) { diff --git a/table/block.h b/table/block.h index 367b525a0..e63c44b6a 100644 --- a/table/block.h +++ b/table/block.h @@ -13,11 +13,13 @@ #include "rocksdb/iterator.h" #include "rocksdb/options.h" +#include "db/dbformat.h" namespace rocksdb { struct BlockContents; class Comparator; +class BlockIter; class BlockHashIndex; class BlockPrefixIndex; @@ -40,7 +42,11 @@ class Block { // NOTE: for the hash based lookup, if a key prefix doesn't match any key, // the iterator will simply be set as "invalid", rather than returning // the key that is just pass the target key. 
- Iterator* NewIterator(const Comparator* comparator); + // + // If iter is null, return new Iterator + // If iter is not null, update this one and return it as Iterator* + Iterator* NewIterator(const Comparator* comparator, + BlockIter* iter = nullptr); void SetBlockHashIndex(BlockHashIndex* hash_index); void SetBlockPrefixIndex(BlockPrefixIndex* prefix_index); @@ -57,8 +63,126 @@ class Block { // No copying allowed Block(const Block&); void operator=(const Block&); +}; + +class BlockIter : public Iterator { + public: + BlockIter() + : comparator_(nullptr), + data_(nullptr), + restarts_(0), + num_restarts_(0), + current_(0), + restart_index_(0), + status_(Status::OK()), + hash_index_(nullptr), + prefix_index_(nullptr) {} + + BlockIter(const Comparator* comparator, const char* data, uint32_t restarts, + uint32_t num_restarts, BlockHashIndex* hash_index, + BlockPrefixIndex* prefix_index) + : BlockIter() { + Initialize(comparator, data, restarts, num_restarts, + hash_index, prefix_index); + } + + void Initialize(const Comparator* comparator, const char* data, + uint32_t restarts, uint32_t num_restarts, BlockHashIndex* hash_index, + BlockPrefixIndex* prefix_index) { + assert(data_ == nullptr); // Ensure it is called only once + assert(num_restarts > 0); // Ensure the param is valid + + comparator_ = comparator; + data_ = data; + restarts_ = restarts; + num_restarts_ = num_restarts; + current_ = restarts_; + restart_index_ = num_restarts_; + hash_index_ = hash_index; + prefix_index_ = prefix_index; + } + + void SetStatus(Status s) { + status_ = s; + } + + virtual bool Valid() const override { return current_ < restarts_; } + virtual Status status() const override { return status_; } + virtual Slice key() const override { + assert(Valid()); + return key_.GetKey(); + } + virtual Slice value() const override { + assert(Valid()); + return value_; + } + + virtual void Next() override; + + virtual void Prev() override; + + virtual void Seek(const Slice& target) override; + + 
virtual void SeekToFirst() override; + + virtual void SeekToLast() override; + + private: + const Comparator* comparator_; + const char* data_; // underlying block contents + uint32_t restarts_; // Offset of restart array (list of fixed32) + uint32_t num_restarts_; // Number of uint32_t entries in restart array + + // current_ is offset in data_ of current entry. >= restarts_ if !Valid + uint32_t current_; + uint32_t restart_index_; // Index of restart block in which current_ falls + IterKey key_; + Slice value_; + Status status_; + BlockHashIndex* hash_index_; + BlockPrefixIndex* prefix_index_; + + inline int Compare(const Slice& a, const Slice& b) const { + return comparator_->Compare(a, b); + } + + // Return the offset in data_ just past the end of the current entry. + inline uint32_t NextEntryOffset() const { + return (value_.data() + value_.size()) - data_; + } + + uint32_t GetRestartPoint(uint32_t index) { + assert(index < num_restarts_); + return DecodeFixed32(data_ + restarts_ + index * sizeof(uint32_t)); + } + + void SeekToRestartPoint(uint32_t index) { + key_.Clear(); + restart_index_ = index; + // current_ will be fixed by ParseNextKey(); + + // ParseNextKey() starts at the end of value_, so set value_ accordingly + uint32_t offset = GetRestartPoint(index); + value_ = Slice(data_ + offset, 0); + } + + void CorruptionError(); + + bool ParseNextKey(); + + bool BinarySeek(const Slice& target, uint32_t left, uint32_t right, + uint32_t* index); + + int CompareBlockKey(uint32_t block_index, const Slice& target); + + bool BinaryBlockIndexSeek(const Slice& target, uint32_t* block_ids, + uint32_t left, uint32_t right, + uint32_t* index); + + bool HashSeek(const Slice& target, uint32_t* index); + + bool PrefixSeek(const Slice& target, uint32_t* index); - class Iter; }; } // namespace rocksdb diff --git a/table/block_based_table_reader.cc b/table/block_based_table_reader.cc index 571fedab2..1344c04a0 100644 --- a/table/block_based_table_reader.cc +++ 
b/table/block_based_table_reader.cc @@ -135,7 +135,9 @@ class BlockBasedTable::IndexReader { virtual ~IndexReader() {} // Create an iterator for index access. - virtual Iterator* NewIterator() = 0; + // An iter is passed in, if it is not null, update this one and return it + // If it is null, create a new Iterator + virtual Iterator* NewIterator(BlockIter* iter = nullptr) = 0; // The size of the index. virtual size_t size() const = 0; @@ -168,8 +170,8 @@ class BinarySearchIndexReader : public IndexReader { return s; } - virtual Iterator* NewIterator() override { - return index_block_->NewIterator(comparator_); + virtual Iterator* NewIterator(BlockIter* iter = nullptr) override { + return index_block_->NewIterator(comparator_, iter); } virtual size_t size() const override { return index_block_->size(); } @@ -284,8 +286,8 @@ class HashIndexReader : public IndexReader { return Status::OK(); } - virtual Iterator* NewIterator() override { - return index_block_->NewIterator(comparator_); + virtual Iterator* NewIterator(BlockIter* iter = nullptr) override { + return index_block_->NewIterator(comparator_, iter); } virtual size_t size() const override { return index_block_->size(); } @@ -779,10 +781,11 @@ BlockBasedTable::CachableEntry BlockBasedTable::GetFilter( return { filter, cache_handle }; } -Iterator* BlockBasedTable::NewIndexIterator(const ReadOptions& read_options) { +Iterator* BlockBasedTable::NewIndexIterator(const ReadOptions& read_options, + BlockIter* input_iter) { // index reader has already been pre-populated. 
if (rep_->index_reader) { - return rep_->index_reader->NewIterator(); + return rep_->index_reader->NewIterator(input_iter); } bool no_io = read_options.read_tier == kBlockCacheTier; @@ -796,7 +799,12 @@ Iterator* BlockBasedTable::NewIndexIterator(const ReadOptions& read_options) { BLOCK_CACHE_INDEX_HIT, statistics); if (cache_handle == nullptr && no_io) { - return NewErrorIterator(Status::Incomplete("no blocking io")); + if (input_iter != nullptr) { + input_iter->SetStatus(Status::Incomplete("no blocking io")); + return input_iter; + } else { + return NewErrorIterator(Status::Incomplete("no blocking io")); + } } IndexReader* index_reader = nullptr; @@ -811,7 +819,12 @@ Iterator* BlockBasedTable::NewIndexIterator(const ReadOptions& read_options) { if (!s.ok()) { // make sure if something goes wrong, index_reader shall remain intact. assert(index_reader == nullptr); - return NewErrorIterator(s); + if (input_iter != nullptr) { + input_iter->SetStatus(s); + return input_iter; + } else { + return NewErrorIterator(s); + } } cache_handle = block_cache->Insert(key, index_reader, index_reader->size(), @@ -820,7 +833,8 @@ Iterator* BlockBasedTable::NewIndexIterator(const ReadOptions& read_options) { } assert(cache_handle); - auto iter = index_reader->NewIterator(); + Iterator* iter; + iter = index_reader->NewIterator(input_iter); iter->RegisterCleanup(&ReleaseCachedEntry, block_cache, cache_handle); return iter; @@ -828,8 +842,11 @@ Iterator* BlockBasedTable::NewIndexIterator(const ReadOptions& read_options) { // Convert an index iterator value (i.e., an encoded BlockHandle) // into an iterator over the contents of the corresponding block. 
+// If input_iter is null, new a iterator +// If input_iter is not null, update this iter and return it Iterator* BlockBasedTable::NewDataBlockIterator(Rep* rep, - const ReadOptions& ro, const Slice& index_value) { + const ReadOptions& ro, const Slice& index_value, + BlockIter* input_iter) { const bool no_io = (ro.read_tier == kBlockCacheTier); Cache* block_cache = rep->options.block_cache.get(); Cache* block_cache_compressed = rep->options. @@ -843,7 +860,12 @@ Iterator* BlockBasedTable::NewDataBlockIterator(Rep* rep, Status s = handle.DecodeFrom(&input); if (!s.ok()) { - return NewErrorIterator(s); + if (input_iter != nullptr) { + input_iter->SetStatus(s); + return input_iter; + } else { + return NewErrorIterator(s); + } } // If either block cache is enabled, we'll try to read from it. @@ -889,7 +911,12 @@ Iterator* BlockBasedTable::NewDataBlockIterator(Rep* rep, if (block.value == nullptr) { if (no_io) { // Could not read from block_cache and can't do IO - return NewErrorIterator(Status::Incomplete("no blocking io")); + if (input_iter != nullptr) { + input_iter->SetStatus(Status::Incomplete("no blocking io")); + return input_iter; + } else { + return NewErrorIterator(Status::Incomplete("no blocking io")); + } } s = ReadBlockFromFile(rep->file.get(), rep->footer, ro, handle, &block.value, rep->options.env); @@ -897,15 +924,20 @@ Iterator* BlockBasedTable::NewDataBlockIterator(Rep* rep, Iterator* iter; if (block.value != nullptr) { - iter = block.value->NewIterator(&rep->internal_comparator); + iter = block.value->NewIterator(&rep->internal_comparator, input_iter); if (block.cache_handle != nullptr) { iter->RegisterCleanup(&ReleaseCachedEntry, block_cache, - block.cache_handle); + block.cache_handle); } else { iter->RegisterCleanup(&DeleteHeldResource, block.value, nullptr); } } else { - iter = NewErrorIterator(s); + if (input_iter != nullptr) { + input_iter->SetStatus(s); + iter = input_iter; + } else { + iter = NewErrorIterator(s); + } } return iter; } @@ 
-1023,12 +1055,14 @@ Status BlockBasedTable::Get( const Slice& v), void (*mark_key_may_exist_handler)(void* handle_context)) { Status s; - Iterator* iiter = NewIndexIterator(read_options); + BlockIter iiter; + NewIndexIterator(read_options, &iiter); + auto filter_entry = GetFilter(read_options.read_tier == kBlockCacheTier); FilterBlockReader* filter = filter_entry.value; bool done = false; - for (iiter->Seek(key); iiter->Valid() && !done; iiter->Next()) { - Slice handle_value = iiter->value(); + for (iiter.Seek(key); iiter.Valid() && !done; iiter.Next()) { + Slice handle_value = iiter.value(); BlockHandle handle; bool may_not_exist_in_filter = @@ -1043,39 +1077,43 @@ Status BlockBasedTable::Get( RecordTick(rep_->options.statistics.get(), BLOOM_FILTER_USEFUL); break; } else { - unique_ptr block_iter( - NewDataBlockIterator(rep_, read_options, iiter->value())); + BlockIter biter; + NewDataBlockIterator(rep_, read_options, iiter.value(), &biter); - if (read_options.read_tier && block_iter->status().IsIncomplete()) { + if (read_options.read_tier && biter.status().IsIncomplete()) { // couldn't get block from block_cache // Update Saver.state to Found because we are only looking for whether // we can guarantee the key is not there when "no_io" is set (*mark_key_may_exist_handler)(handle_context); break; } + if (!biter.status().ok()) { + s = biter.status(); + break; + } // Call the *saver function on each entry/block until it returns false - for (block_iter->Seek(key); block_iter->Valid(); block_iter->Next()) { + for (biter.Seek(key); biter.Valid(); biter.Next()) { ParsedInternalKey parsed_key; - if (!ParseInternalKey(block_iter->key(), &parsed_key)) { + if (!ParseInternalKey(biter.key(), &parsed_key)) { s = Status::Corruption(Slice()); } if (!(*result_handler)(handle_context, parsed_key, - block_iter->value())) { + biter.value())) { done = true; break; } } - s = block_iter->status(); + s = biter.status(); } } filter_entry.Release(rep_->options.block_cache.get()); if 
(s.ok()) { - s = iiter->status(); + s = iiter.status(); } - delete iiter; + return s; } diff --git a/table/block_based_table_reader.h b/table/block_based_table_reader.h index 072bedf3b..da15adbd0 100644 --- a/table/block_based_table_reader.h +++ b/table/block_based_table_reader.h @@ -23,6 +23,7 @@ namespace rocksdb { class Block; +class BlockIter; class BlockHandle; class Cache; class FilterBlockReader; @@ -111,8 +112,10 @@ class BlockBasedTable : public TableReader { bool compaction_optimized_; class BlockEntryIteratorState; + // input_iter: if it is not null, update this one and return it as Iterator static Iterator* NewDataBlockIterator(Rep* rep, const ReadOptions& ro, - const Slice& index_value); + const Slice& index_value, + BlockIter* input_iter = nullptr); // For the following two functions: // if `no_io == true`, we will not try to read filter/index from sst file @@ -120,6 +123,8 @@ class BlockBasedTable : public TableReader { CachableEntry GetFilter(bool no_io = false) const; // Get the iterator from the index reader. + // If input_iter is not set, return new Iterator + // If input_iter is set, update it and return it as Iterator // // Note: ErrorIterator with Status::Incomplete shall be returned if all the // following conditions are met: @@ -127,7 +132,8 @@ class BlockBasedTable : public TableReader { // 2. index is not present in block cache. // 3. We disallowed any io to be performed, that is, read_options == // kBlockCacheTier - Iterator* NewIndexIterator(const ReadOptions& read_options); + Iterator* NewIndexIterator(const ReadOptions& read_options, + BlockIter* input_iter = nullptr); // Read block cache from block caches (if set): block_cache and // block_cache_compressed.