diff --git a/db/forward_iterator.cc b/db/forward_iterator.cc index a2db7772a..c9e931951 100644 --- a/db/forward_iterator.cc +++ b/db/forward_iterator.cc @@ -383,14 +383,13 @@ void ForwardIterator::SeekInternal(const Slice& internal_key, if (!l0_iters_[i]->status().ok()) { immutable_status_ = l0_iters_[i]->status(); - } else if (l0_iters_[i]->Valid()) { - if (!IsOverUpperBound(l0_iters_[i]->key())) { - immutable_min_heap_.push(l0_iters_[i]); - } else { - has_iter_trimmed_for_upper_bound_ = true; - DeleteIterator(l0_iters_[i]); - l0_iters_[i] = nullptr; - } + } else if (l0_iters_[i]->Valid() && + !IsOverUpperBound(l0_iters_[i]->key())) { + immutable_min_heap_.push(l0_iters_[i]); + } else { + has_iter_trimmed_for_upper_bound_ = true; + DeleteIterator(l0_iters_[i]); + l0_iters_[i] = nullptr; } } @@ -417,15 +416,14 @@ void ForwardIterator::SeekInternal(const Slice& internal_key, if (!level_iters_[level - 1]->status().ok()) { immutable_status_ = level_iters_[level - 1]->status(); - } else if (level_iters_[level - 1]->Valid()) { - if (!IsOverUpperBound(level_iters_[level - 1]->key())) { - immutable_min_heap_.push(level_iters_[level - 1]); - } else { - // Nothing in this level is interesting. Remove. - has_iter_trimmed_for_upper_bound_ = true; - DeleteIterator(level_iters_[level - 1]); - level_iters_[level - 1] = nullptr; - } + } else if (level_iters_[level - 1]->Valid() && + !IsOverUpperBound(level_iters_[level - 1]->key())) { + immutable_min_heap_.push(level_iters_[level - 1]); + } else { + // Nothing in this level is interesting. Remove. + has_iter_trimmed_for_upper_bound_ = true; + DeleteIterator(level_iters_[level - 1]); + level_iters_[level - 1] = nullptr; } } } diff --git a/db/version_set.cc b/db/version_set.cc index 4af335590..0de142740 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -424,129 +424,261 @@ bool SomeFileOverlapsRange( } namespace { - -// An internal iterator. For a given version/level pair, yields -// information about the files in the level. For a given entry, key() -// is the largest key that occurs in the file, and value() is an -// 16-byte value containing the file number and file size, both -// encoded using EncodeFixed64. 
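A side note on the ForwardIterator::SeekInternal hunks above: with the collapsed condition, a child iterator that is merely exhausted (not only one whose key is past iterate_upper_bound) now also falls into the else branch and gets deleted. A standalone sketch of the resulting keep-or-trim decision, using simplified stand-in types (ChildIter and TrimResult are not RocksDB classes, and a plain vector stands in for immutable_min_heap_):

```cpp
#include <memory>
#include <string>
#include <vector>

// Simplified stand-ins; not the RocksDB types.
struct ChildIter {
  std::vector<std::string> keys;
  size_t pos = 0;
  bool Valid() const { return pos < keys.size(); }
  const std::string& key() const { return keys[pos]; }
};

struct TrimResult {
  std::vector<ChildIter*> heap;  // stands in for immutable_min_heap_
  bool trimmed = false;          // stands in for has_iter_trimmed_for_upper_bound_
};

// Keep a child only if it is valid and still below the upper bound; otherwise
// delete it and remember that trimming happened, mirroring the new
// `Valid() && !IsOverUpperBound(key)` / else structure above.
TrimResult TrimForUpperBound(std::vector<std::unique_ptr<ChildIter>>& iters,
                             const std::string& upper_bound) {
  TrimResult r;
  for (auto& it : iters) {
    if (!it) continue;  // slot was trimmed earlier
    if (it->Valid() && it->key() < upper_bound) {
      r.heap.push_back(it.get());
    } else {
      r.trimmed = true;  // has_iter_trimmed_for_upper_bound_ = true
      it.reset();        // DeleteIterator(...); slot becomes nullptr
    }
  }
  return r;
}
```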
-class LevelFileNumIterator : public InternalIterator { +class LevelIterator final : public InternalIterator { public: - LevelFileNumIterator(const InternalKeyComparator& icmp, - const LevelFilesBrief* flevel, bool should_sample) - : icmp_(icmp), + LevelIterator(TableCache* table_cache, const ReadOptions& read_options, + const EnvOptions& env_options, + const InternalKeyComparator& icomparator, + const LevelFilesBrief* flevel, bool should_sample, + HistogramImpl* file_read_hist, bool for_compaction, + bool skip_filters, int level, RangeDelAggregator* range_del_agg) + : table_cache_(table_cache), + read_options_(read_options), + env_options_(env_options), + icomparator_(icomparator), flevel_(flevel), - index_(static_cast(flevel->num_files)), - current_value_(0, 0, 0), // Marks as invalid - should_sample_(should_sample) {} - virtual bool Valid() const override { return index_ < flevel_->num_files; } - virtual void Seek(const Slice& target) override { - index_ = FindFile(icmp_, *flevel_, target); - } - virtual void SeekForPrev(const Slice& target) override { - SeekForPrevImpl(target, &icmp_); + file_read_hist_(file_read_hist), + should_sample_(should_sample), + for_compaction_(for_compaction), + skip_filters_(skip_filters), + file_index_(flevel_->num_files), + level_(level), + range_del_agg_(range_del_agg), + pinned_iters_mgr_(nullptr) { + // Empty level is not supported. + assert(flevel_ != nullptr && flevel_->num_files > 0); } - virtual void SeekToFirst() override { index_ = 0; } - virtual void SeekToLast() override { - index_ = (flevel_->num_files == 0) - ? 0 - : static_cast(flevel_->num_files) - 1; - } - virtual void Next() override { + virtual ~LevelIterator() { delete file_iter_.Set(nullptr); } + + virtual void Seek(const Slice& target) override; + virtual void SeekForPrev(const Slice& target) override; + virtual void SeekToFirst() override; + virtual void SeekToLast() override; + virtual void Next() override; + virtual void Prev() override; + + virtual bool Valid() const override { return file_iter_.Valid(); } + virtual Slice key() const override { assert(Valid()); - index_++; + return file_iter_.key(); } - virtual void Prev() override { + virtual Slice value() const override { assert(Valid()); - if (index_ == 0) { - index_ = static_cast(flevel_->num_files); // Marks as invalid - } else { - index_--; - } + return file_iter_.value(); } - Slice key() const override { - assert(Valid()); - return flevel_->files[index_].largest_key; + virtual Status status() const override { + // It'd be nice if status() returned a const Status& instead of a Status + if (!status_.ok()) { + return status_; + } else if (file_iter_.iter() != nullptr) { + return file_iter_.status(); + } + return Status::OK(); } - Slice value() const override { - assert(Valid()); - - auto file_meta = flevel_->files[index_]; - if (should_sample_) { - sample_file_read_inc(file_meta.file_metadata); + virtual void SetPinnedItersMgr( + PinnedIteratorsManager* pinned_iters_mgr) override { + pinned_iters_mgr_ = pinned_iters_mgr; + if (file_iter_.iter()) { + file_iter_.SetPinnedItersMgr(pinned_iters_mgr); } - current_value_ = file_meta.fd; - return Slice(reinterpret_cast(¤t_value_), - sizeof(FileDescriptor)); } - virtual Status status() const override { return Status::OK(); } + virtual bool IsKeyPinned() const override { + return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() && + file_iter_.iter() && file_iter_.IsKeyPinned(); + } + virtual bool IsValuePinned() const override { + return pinned_iters_mgr_ && 
pinned_iters_mgr_->PinningEnabled() && + file_iter_.iter() && file_iter_.IsValuePinned(); + } private: - const InternalKeyComparator icmp_; - const LevelFilesBrief* flevel_; - uint32_t index_; - mutable FileDescriptor current_value_; - bool should_sample_; -}; + void SkipEmptyFileForward(); + void SkipEmptyFileBackward(); + void SetFileIterator(InternalIterator* iter); + void InitFileIterator(size_t new_file_index); -class LevelFileIteratorState : public TwoLevelIteratorState { - public: - // @param skip_filters Disables loading/accessing the filter block - LevelFileIteratorState(TableCache* table_cache, - const ReadOptions& read_options, - const EnvOptions& env_options, - const InternalKeyComparator& icomparator, - HistogramImpl* file_read_hist, bool for_compaction, - bool prefix_enabled, bool skip_filters, int level, - RangeDelAggregator* range_del_agg) - : TwoLevelIteratorState(prefix_enabled), - table_cache_(table_cache), - read_options_(read_options), - env_options_(env_options), - icomparator_(icomparator), - file_read_hist_(file_read_hist), - for_compaction_(for_compaction), - skip_filters_(skip_filters), - level_(level), - range_del_agg_(range_del_agg) {} - - InternalIterator* NewSecondaryIterator(const Slice& meta_handle) override { - if (meta_handle.size() != sizeof(FileDescriptor)) { - return NewErrorInternalIterator( - Status::Corruption("FileReader invoked with unexpected value")); - } - const FileDescriptor* fd = - reinterpret_cast(meta_handle.data()); - return table_cache_->NewIterator( - read_options_, env_options_, icomparator_, *fd, range_del_agg_, - nullptr /* don't need reference to table */, file_read_hist_, - for_compaction_, nullptr /* arena */, skip_filters_, level_); + const Slice& file_smallest_key(size_t file_index) { + assert(file_index < flevel_->num_files); + return flevel_->files[file_index].smallest_key; } - bool PrefixMayMatch(const Slice& internal_key) override { - return true; - } - - bool KeyReachedUpperBound(const Slice& internal_key) override { + bool KeyReachedUpperBound(const Slice& internal_key) { return read_options_.iterate_upper_bound != nullptr && icomparator_.user_comparator()->Compare( ExtractUserKey(internal_key), *read_options_.iterate_upper_bound) >= 0; } - private: + InternalIterator* NewFileIterator() { + assert(file_index_ < flevel_->num_files); + auto file_meta = flevel_->files[file_index_]; + if (should_sample_) { + sample_file_read_inc(file_meta.file_metadata); + } + + return table_cache_->NewIterator( + read_options_, env_options_, icomparator_, file_meta.fd, range_del_agg_, + nullptr /* don't need reference to table */, file_read_hist_, + for_compaction_, nullptr /* arena */, skip_filters_, level_); + } + TableCache* table_cache_; const ReadOptions read_options_; const EnvOptions& env_options_; const InternalKeyComparator& icomparator_; + const LevelFilesBrief* flevel_; + mutable FileDescriptor current_value_; + HistogramImpl* file_read_hist_; + bool should_sample_; bool for_compaction_; bool skip_filters_; + size_t file_index_; int level_; RangeDelAggregator* range_del_agg_; + IteratorWrapper file_iter_; // May be nullptr + PinnedIteratorsManager* pinned_iters_mgr_; + Status status_; }; +void LevelIterator::Seek(const Slice& target) { + size_t new_file_index = FindFile(icomparator_, *flevel_, target); + + InitFileIterator(new_file_index); + if (file_iter_.iter() != nullptr) { + file_iter_.Seek(target); + } + SkipEmptyFileForward(); +} + +void LevelIterator::SeekForPrev(const Slice& target) { + size_t new_file_index = 
FindFile(icomparator_, *flevel_, target); + if (new_file_index >= flevel_->num_files) { + new_file_index = flevel_->num_files - 1; + } + + InitFileIterator(new_file_index); + if (file_iter_.iter() != nullptr) { + file_iter_.SeekForPrev(target); + SkipEmptyFileBackward(); + } +} + +void LevelIterator::SeekToFirst() { + InitFileIterator(0); + if (file_iter_.iter() != nullptr) { + file_iter_.SeekToFirst(); + } + SkipEmptyFileForward(); +} + +void LevelIterator::SeekToLast() { + InitFileIterator(flevel_->num_files - 1); + if (file_iter_.iter() != nullptr) { + file_iter_.SeekToLast(); + } + SkipEmptyFileBackward(); +} + +void LevelIterator::Next() { + assert(Valid()); + file_iter_.Next(); + SkipEmptyFileForward(); +} + +void LevelIterator::Prev() { + assert(Valid()); + file_iter_.Prev(); + SkipEmptyFileBackward(); +} + +void LevelIterator::SkipEmptyFileForward() { + // For an error (IO error, checksum mismatch, etc), we skip the file + // and move to the next one and continue reading data. + // TODO this behavior is from LevelDB. We should revisit it. + while (file_iter_.iter() == nullptr || + (!file_iter_.Valid() && !file_iter_.status().IsIncomplete())) { + if (file_iter_.iter() != nullptr && !file_iter_.Valid() && + file_iter_.iter()->IsOutOfBound()) { + return; + } + + // Move to next file + if (file_index_ >= flevel_->num_files - 1) { + // Already at the last file + SetFileIterator(nullptr); + return; + } + if (KeyReachedUpperBound(file_smallest_key(file_index_ + 1))) { + SetFileIterator(nullptr); + return; + } + InitFileIterator(file_index_ + 1); + if (file_iter_.iter() != nullptr) { + file_iter_.SeekToFirst(); + } + } +} + +void LevelIterator::SkipEmptyFileBackward() { + while (file_iter_.iter() == nullptr || + (!file_iter_.Valid() && !file_iter_.status().IsIncomplete())) { + // Move to previous file + if (file_index_ == 0) { + // Already the first file + SetFileIterator(nullptr); + return; + } + InitFileIterator(file_index_ - 1); + if (file_iter_.iter() != nullptr) { + file_iter_.SeekToLast(); + } + } +} + +void LevelIterator::SetFileIterator(InternalIterator* iter) { + if (file_iter_.iter() != nullptr && status_.ok()) { + // TODO right now we don't invalidate the iterator even if the status is + // not OK. We should consider to do that so that it is harder for users to + // skip errors. + status_ = file_iter_.status(); + } + + if (pinned_iters_mgr_ && iter) { + iter->SetPinnedItersMgr(pinned_iters_mgr_); + } + + InternalIterator* old_iter = file_iter_.Set(iter); + if (pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled()) { + pinned_iters_mgr_->PinIterator(old_iter); + } else { + delete old_iter; + } +} + +void LevelIterator::InitFileIterator(size_t new_file_index) { + if (new_file_index >= flevel_->num_files) { + file_index_ = new_file_index; + SetFileIterator(nullptr); + return; + } else { + // If the file iterator shows incomplete, we try it again if users seek + // to the same file, as this time we may go to a different data block + // which is cached in block cache. + // + if (file_iter_.iter() != nullptr && !file_iter_.status().IsIncomplete() && + new_file_index == file_index_) { + // file_iter_ is already constructed with this iterator, so + // no need to change anything + } else { + file_index_ = new_file_index; + InternalIterator* iter = NewFileIterator(); + SetFileIterator(iter); + } + } +} + // A wrapper of version builder which references the current version in // constructor and unref it in the destructor. 
// Both of the constructor and destructor need to be called inside DB Mutex. @@ -854,24 +986,18 @@ void Version::AddIteratorsForLevel(const ReadOptions& read_options, sample_file_read_inc(meta); } } - } else { + } else if (storage_info_.LevelFilesBrief(level).num_files > 0) { // For levels > 0, we can use a concatenating iterator that sequentially // walks through the non-overlapping files in the level, opening them // lazily. - auto* mem = arena->AllocateAligned(sizeof(LevelFileIteratorState)); - auto* state = new (mem) - LevelFileIteratorState(cfd_->table_cache(), read_options, soptions, - cfd_->internal_comparator(), - cfd_->internal_stats()->GetFileReadHist(level), - false /* for_compaction */, - cfd_->ioptions()->prefix_extractor != nullptr, - IsFilterSkipped(level), level, range_del_agg); - mem = arena->AllocateAligned(sizeof(LevelFileNumIterator)); - auto* first_level_iter = new (mem) LevelFileNumIterator( + auto* mem = arena->AllocateAligned(sizeof(LevelIterator)); + merge_iter_builder->AddIterator(new (mem) LevelIterator( + cfd_->table_cache(), read_options, soptions, cfd_->internal_comparator(), &storage_info_.LevelFilesBrief(level), - should_sample_file_read()); - merge_iter_builder->AddIterator( - NewTwoLevelIterator(state, first_level_iter, arena, false)); + should_sample_file_read(), + cfd_->internal_stats()->GetFileReadHist(level), + false /* for_compaction */, IsFilterSkipped(level), level, + range_del_agg)); } } @@ -3732,17 +3858,13 @@ InternalIterator* VersionSet::MakeInputIterator( } } else { // Create concatenating iterator for the files from this level - list[num++] = NewTwoLevelIterator( - new LevelFileIteratorState( - cfd->table_cache(), read_options, env_options_compactions, - cfd->internal_comparator(), - nullptr /* no per level latency histogram */, - true /* for_compaction */, false /* prefix enabled */, - false /* skip_filters */, (int)which /* level */, - range_del_agg), - new LevelFileNumIterator(cfd->internal_comparator(), - c->input_levels(which), - false /* don't sample compaction */)); + list[num++] = new LevelIterator( + cfd->table_cache(), read_options, env_options_compactions, + cfd->internal_comparator(), c->input_levels(which), + false /* should_sample */, + nullptr /* no per level latency histogram */, + true /* for_compaction */, false /* skip_filters */, + (int)which /* level */, range_del_agg); } } } diff --git a/table/block_based_table_reader.cc b/table/block_based_table_reader.cc index 7c0ea6a1f..da9b856e4 100644 --- a/table/block_based_table_reader.cc +++ b/table/block_based_table_reader.cc @@ -210,13 +210,16 @@ class PartitionIndexReader : public IndexReader, public Cleanable { virtual InternalIterator* NewIterator(BlockIter* iter = nullptr, bool dont_care = true) override { // Filters are already checked before seeking the index - const bool skip_filters = true; - const bool is_index = true; - return NewTwoLevelIterator( - new BlockBasedTable::BlockEntryIteratorState( - table_, ReadOptions(), icomparator_, skip_filters, is_index, - partition_map_.size() ? &partition_map_ : nullptr), - index_block_->NewIterator(icomparator_, nullptr, true)); + if (!partition_map_.empty()) { + return NewTwoLevelIterator( + new BlockBasedTable::PartitionedIndexIteratorState( + table_, partition_map_.size() ? 
&partition_map_ : nullptr), + index_block_->NewIterator(icomparator_, nullptr, true)); + } else { + return new BlockBasedTableIterator( + table_, ReadOptions(), *icomparator_, + index_block_->NewIterator(icomparator_, nullptr, true), false); + } // TODO(myabandeh): Update TwoLevelIterator to be able to make use of // on-stack BlockIter while the state is on heap. Currentlly it assumes // the first level iter is always on heap and will attempt to delete it @@ -1632,91 +1635,37 @@ Status BlockBasedTable::MaybeLoadDataBlockToCache( return s; } -BlockBasedTable::BlockEntryIteratorState::BlockEntryIteratorState( - BlockBasedTable* table, const ReadOptions& read_options, - const InternalKeyComparator* icomparator, bool skip_filters, bool is_index, +BlockBasedTable::PartitionedIndexIteratorState::PartitionedIndexIteratorState( + BlockBasedTable* table, std::unordered_map>* block_map) - : TwoLevelIteratorState(table->rep_->ioptions.prefix_extractor != nullptr), - table_(table), - read_options_(read_options), - icomparator_(icomparator), - skip_filters_(skip_filters), - is_index_(is_index), - block_map_(block_map) {} + : table_(table), block_map_(block_map) {} -const size_t BlockBasedTable::BlockEntryIteratorState::kMaxReadaheadSize = - 256 * 1024; +const size_t BlockBasedTableIterator::kMaxReadaheadSize = 256 * 1024; InternalIterator* -BlockBasedTable::BlockEntryIteratorState::NewSecondaryIterator( +BlockBasedTable::PartitionedIndexIteratorState::NewSecondaryIterator( const Slice& index_value) { // Return a block iterator on the index partition BlockHandle handle; Slice input = index_value; Status s = handle.DecodeFrom(&input); - auto rep = table_->rep_; - if (block_map_) { - auto block = block_map_->find(handle.offset()); - // This is a possible scenario since block cache might not have had space - // for the partition - if (block != block_map_->end()) { - PERF_COUNTER_ADD(block_cache_hit_count, 1); - RecordTick(rep->ioptions.statistics, BLOCK_CACHE_INDEX_HIT); - RecordTick(rep->ioptions.statistics, BLOCK_CACHE_HIT); - Cache* block_cache = rep->table_options.block_cache.get(); - assert(block_cache); - RecordTick(rep->ioptions.statistics, BLOCK_CACHE_BYTES_READ, - block_cache->GetUsage(block->second.cache_handle)); - return block->second.value->NewIterator( - &rep->internal_comparator, nullptr, true, rep->ioptions.statistics); - } - } - - // Automatically prefetch additional data when a range scan (iterator) does - // more than 2 sequential IOs. This is enabled only when - // ReadOptions.readahead_size is 0. - if (read_options_.readahead_size == 0) { - if (num_file_reads_ < 2) { - num_file_reads_++; - } else if (handle.offset() + static_cast(handle.size()) + - kBlockTrailerSize > - readahead_limit_) { - num_file_reads_++; - // Do not readahead more than kMaxReadaheadSize. - readahead_size_ = - std::min(BlockBasedTable::BlockEntryIteratorState::kMaxReadaheadSize, - readahead_size_); - table_->rep_->file->Prefetch(handle.offset(), readahead_size_); - readahead_limit_ = handle.offset() + readahead_size_; - // Keep exponentially increasing readahead size until kMaxReadaheadSize. 
- readahead_size_ *= 2; - } - } - - return NewDataBlockIterator(rep, read_options_, handle, - /* input_iter */ nullptr, is_index_, - /* get_context */ nullptr, s); -} - -bool BlockBasedTable::BlockEntryIteratorState::PrefixMayMatch( - const Slice& internal_key) { - if (read_options_.total_order_seek || skip_filters_) { - return true; - } - return table_->PrefixMayMatch(internal_key); -} - -bool BlockBasedTable::BlockEntryIteratorState::KeyReachedUpperBound( - const Slice& internal_key) { - bool reached_upper_bound = read_options_.iterate_upper_bound != nullptr && - icomparator_ != nullptr && - icomparator_->user_comparator()->Compare( - ExtractUserKey(internal_key), - *read_options_.iterate_upper_bound) >= 0; - TEST_SYNC_POINT_CALLBACK( - "BlockBasedTable::BlockEntryIteratorState::KeyReachedUpperBound", - &reached_upper_bound); - return reached_upper_bound; + auto rep = table_->get_rep(); + auto block = block_map_->find(handle.offset()); + // This is a possible scenario since block cache might not have had space + // for the partition + if (block != block_map_->end()) { + PERF_COUNTER_ADD(block_cache_hit_count, 1); + RecordTick(rep->ioptions.statistics, BLOCK_CACHE_INDEX_HIT); + RecordTick(rep->ioptions.statistics, BLOCK_CACHE_HIT); + Cache* block_cache = rep->table_options.block_cache.get(); + assert(block_cache); + RecordTick(rep->ioptions.statistics, BLOCK_CACHE_BYTES_READ, + block_cache->GetUsage(block->second.cache_handle)); + return block->second.value->NewIterator(&rep->internal_comparator, nullptr, + true, rep->ioptions.statistics); + } + // Create an empty iterator + return new BlockIter(); } // This will be broken if the user specifies an unusual implementation @@ -1820,13 +1769,224 @@ bool BlockBasedTable::PrefixMayMatch(const Slice& internal_key) { return may_match; } +void BlockBasedTableIterator::Seek(const Slice& target) { + if (!CheckPrefixMayMatch(target)) { + ResetDataIter(); + return; + } + + SavePrevIndexValue(); + + index_iter_->Seek(target); + + if (!index_iter_->Valid()) { + ResetDataIter(); + return; + } + + InitDataBlock(); + + data_block_iter_.Seek(target); + + FindKeyForward(); + assert(!data_block_iter_.Valid() || + icomp_.Compare(target, data_block_iter_.key()) <= 0); +} + +void BlockBasedTableIterator::SeekForPrev(const Slice& target) { + if (!CheckPrefixMayMatch(target)) { + ResetDataIter(); + return; + } + + SavePrevIndexValue(); + + // Call Seek() rather than SeekForPrev() in the index block, because the + // target data block will likely to contain the position for `target`, the + // same as Seek(), rather than than before. + // For example, if we have three data blocks, each containing two keys: + // [2, 4] [6, 8] [10, 12] + // (the keys in the index block would be [4, 8, 12]) + // and the user calls SeekForPrev(7), we need to go to the second block, + // just like if they call Seek(7). + // The only case where the block is difference is when they seek to a position + // in the boundary. For example, if they SeekForPrev(5), we should go to the + // first block, rather than the second. However, we don't have the information + // to distinguish the two unless we read the second block. In this case, we'll + // end up with reading two blocks. 
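To make the SeekForPrev() positioning described in the comment above concrete, here is a standalone sketch with plain ints in place of internal keys and std::lower_bound/upper_bound in place of the index and data block iterators (none of this is the RocksDB API; it assumes at least one non-empty block). It reproduces the worked example: blocks [2, 4] [6, 8] [10, 12] with index keys [4, 8, 12].

```cpp
#include <algorithm>
#include <vector>

struct Block { std::vector<int> keys; };

// Returns the largest key <= target, or -1 if no such key exists.
int SeekForPrevSketch(const std::vector<Block>& blocks,
                      const std::vector<int>& index,  // largest key per block
                      int target) {
  // Same positioning as Seek(): first block whose largest key >= target.
  size_t b = static_cast<size_t>(
      std::lower_bound(index.begin(), index.end(), target) - index.begin());
  if (b == blocks.size()) b = blocks.size() - 1;  // past the end: use last block
  // Walk backward until some block holds a key <= target (FindKeyBackward()).
  while (true) {
    const auto& keys = blocks[b].keys;
    auto it = std::upper_bound(keys.begin(), keys.end(), target);
    if (it != keys.begin()) return *(it - 1);  // largest key <= target
    if (b == 0) return -1;                     // nothing <= target at all
    --b;
  }
}

// With blocks [2,4] [6,8] [10,12]: SeekForPrevSketch(..., 7) lands in the
// second block and returns 6; SeekForPrevSketch(..., 5) reads the second
// block first, then falls back to the first block and returns 4, which is
// the two-block-read case the comment mentions.
```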
+ index_iter_->Seek(target); + + if (!index_iter_->Valid()) { + index_iter_->SeekToLast(); + if (!index_iter_->Valid()) { + ResetDataIter(); + block_iter_points_to_real_block_ = false; + return; + } + } + + InitDataBlock(); + + data_block_iter_.SeekForPrev(target); + + FindKeyBackward(); + assert(!data_block_iter_.Valid() || + icomp_.Compare(target, data_block_iter_.key()) >= 0); +} + +void BlockBasedTableIterator::SeekToFirst() { + SavePrevIndexValue(); + index_iter_->SeekToFirst(); + if (!index_iter_->Valid()) { + ResetDataIter(); + return; + } + InitDataBlock(); + data_block_iter_.SeekToFirst(); + FindKeyForward(); +} + +void BlockBasedTableIterator::SeekToLast() { + SavePrevIndexValue(); + index_iter_->SeekToLast(); + if (!index_iter_->Valid()) { + ResetDataIter(); + return; + } + InitDataBlock(); + data_block_iter_.SeekToLast(); + FindKeyBackward(); +} + +void BlockBasedTableIterator::Next() { + assert(block_iter_points_to_real_block_); + data_block_iter_.Next(); + FindKeyForward(); +} + +void BlockBasedTableIterator::Prev() { + assert(block_iter_points_to_real_block_); + data_block_iter_.Prev(); + FindKeyBackward(); +} + +void BlockBasedTableIterator::InitDataBlock() { + BlockHandle data_block_handle; + Slice handle_slice = index_iter_->value(); + if (!block_iter_points_to_real_block_ || + handle_slice.compare(prev_index_value_) != 0) { + if (block_iter_points_to_real_block_) { + ResetDataIter(); + } + Status s = data_block_handle.DecodeFrom(&handle_slice); + auto* rep = table_->get_rep(); + + // Automatically prefetch additional data when a range scan (iterator) does + // more than 2 sequential IOs. This is enabled only when + // ReadOptions.readahead_size is 0. + if (read_options_.readahead_size == 0) { + if (num_file_reads_ < 2) { + num_file_reads_++; + } else if (data_block_handle.offset() + + static_cast(data_block_handle.size()) + + kBlockTrailerSize > + readahead_limit_) { + num_file_reads_++; + // Do not readahead more than kMaxReadaheadSize. + readahead_size_ = std::min(kMaxReadaheadSize, readahead_size_); + table_->get_rep()->file->Prefetch(data_block_handle.offset(), + readahead_size_); + readahead_limit_ = data_block_handle.offset() + readahead_size_; + // Keep exponentially increasing readahead size until kMaxReadaheadSize. + readahead_size_ *= 2; + } + } + + BlockBasedTable::NewDataBlockIterator(rep, read_options_, data_block_handle, + &data_block_iter_, false, + /* get_context */ nullptr, s); + block_iter_points_to_real_block_ = true; + } +} + +void BlockBasedTableIterator::FindKeyForward() { + is_out_of_bound_ = false; + // TODO the while loop inherits from two-level-iterator. We don't know + // whether a block can be empty so it can be replaced by an "if". + while (!data_block_iter_.Valid()) { + if (!data_block_iter_.status().ok()) { + return; + } + ResetDataIter(); + // We used to check the current index key for upperbound. + // It will only save a data reading for a small percentage of use cases, + // so for code simplicity, we removed it. We can add it back if there is a + // significnat performance regression. 
+ index_iter_->Next(); + + if (index_iter_->Valid()) { + InitDataBlock(); + data_block_iter_.SeekToFirst(); + } else { + return; + } + } + + // Check upper bound on the current key + bool reached_upper_bound = + (read_options_.iterate_upper_bound != nullptr && + block_iter_points_to_real_block_ && data_block_iter_.Valid() && + icomp_.user_comparator()->Compare(ExtractUserKey(data_block_iter_.key()), + *read_options_.iterate_upper_bound) >= + 0); + TEST_SYNC_POINT_CALLBACK( + "BlockBasedTable::BlockEntryIteratorState::KeyReachedUpperBound", + &reached_upper_bound); + if (reached_upper_bound) { + is_out_of_bound_ = true; + ResetDataIter(); + return; + } +} + +void BlockBasedTableIterator::FindKeyBackward() { + while (!data_block_iter_.Valid()) { + if (!data_block_iter_.status().ok()) { + return; + } + + ResetDataIter(); + index_iter_->Prev(); + + if (index_iter_->Valid()) { + InitDataBlock(); + data_block_iter_.SeekToLast(); + } else { + return; + } + } + + // We could have check lower bound here too, but we opt not to do it for + // code simplicity. +} + InternalIterator* BlockBasedTable::NewIterator(const ReadOptions& read_options, Arena* arena, bool skip_filters) { - return NewTwoLevelIterator( - new BlockEntryIteratorState(this, read_options, - &rep_->internal_comparator, skip_filters), - NewIndexIterator(read_options), arena); + if (arena == nullptr) { + return new BlockBasedTableIterator( + this, read_options, rep_->internal_comparator, + NewIndexIterator(read_options), + !skip_filters && !read_options.total_order_seek && + rep_->ioptions.prefix_extractor != nullptr); + } else { + auto* mem = arena->AllocateAligned(sizeof(BlockBasedTableIterator)); + return new (mem) BlockBasedTableIterator( + this, read_options, rep_->internal_comparator, + NewIndexIterator(read_options), + !skip_filters && !read_options.total_order_seek && + rep_->ioptions.prefix_extractor != nullptr); + } } InternalIterator* BlockBasedTable::NewRangeTombstoneIterator( @@ -1854,8 +2014,8 @@ InternalIterator* BlockBasedTable::NewRangeTombstoneIterator( } std::string str; rep_->range_del_handle.EncodeTo(&str); - // The meta-block exists but isn't in uncompressed block cache (maybe because - // it is disabled), so go through the full lookup process. + // The meta-block exists but isn't in uncompressed block cache (maybe + // because it is disabled), so go through the full lookup process. 
return NewDataBlockIterator(rep_, read_options, Slice(str)); } @@ -1932,8 +2092,8 @@ Status BlockBasedTable::Get(const ReadOptions& read_options, const Slice& key, if (read_options.read_tier == kBlockCacheTier && biter.status().IsIncomplete()) { // couldn't get block from block_cache - // Update Saver.state to Found because we are only looking for whether - // we can guarantee the key is not there when "no_io" is set + // Update Saver.state to Found because we are only looking for + // whether we can guarantee the key is not there when "no_io" is set get_context->MarkKeyMayExist(); break; } @@ -2098,8 +2258,8 @@ bool BlockBasedTable::TEST_KeyInCache(const ReadOptions& options, char cache_key_storage[kMaxCacheKeyPrefixSize + kMaxVarint64Length]; Slice cache_key = - GetCacheKey(rep_->cache_key_prefix, rep_->cache_key_prefix_size, - handle, cache_key_storage); + GetCacheKey(rep_->cache_key_prefix, rep_->cache_key_prefix_size, handle, + cache_key_storage); Slice ckey; s = GetDataBlockFromCache( diff --git a/table/block_based_table_reader.h b/table/block_based_table_reader.h index 9e5d658f2..08b0bec7e 100644 --- a/table/block_based_table_reader.h +++ b/table/block_based_table_reader.h @@ -22,6 +22,7 @@ #include "rocksdb/statistics.h" #include "rocksdb/status.h" #include "rocksdb/table.h" +#include "table/block.h" #include "table/filter_block.h" #include "table/format.h" #include "table/persistent_cache_helper.h" @@ -33,8 +34,6 @@ namespace rocksdb { -class Block; -class BlockIter; class BlockHandle; class Cache; class FilterBlockReader; @@ -201,29 +200,36 @@ class BlockBasedTable : public TableReader { // The key retrieved are internal keys. Status GetKVPairsFromDataBlocks(std::vector* kv_pair_blocks); - class BlockEntryIteratorState; + template + struct CachableEntry; + struct Rep; + + Rep* get_rep() { return rep_; } + + // input_iter: if it is not null, update this one and return it as Iterator + static BlockIter* NewDataBlockIterator(Rep* rep, const ReadOptions& ro, + const Slice& index_value, + BlockIter* input_iter = nullptr, + bool is_index = false, + GetContext* get_context = nullptr); + static BlockIter* NewDataBlockIterator(Rep* rep, const ReadOptions& ro, + const BlockHandle& block_hanlde, + BlockIter* input_iter = nullptr, + bool is_index = false, + GetContext* get_context = nullptr, + Status s = Status()); + + class PartitionedIndexIteratorState; friend class PartitionIndexReader; protected: - template - struct CachableEntry; - struct Rep; Rep* rep_; explicit BlockBasedTable(Rep* rep) : rep_(rep) {} private: friend class MockedBlockBasedTable; static std::atomic next_cache_key_id_; - // input_iter: if it is not null, update this one and return it as Iterator - static BlockIter* NewDataBlockIterator( - Rep* rep, const ReadOptions& ro, const Slice& index_value, - BlockIter* input_iter = nullptr, bool is_index = false, - GetContext* get_context = nullptr); - static BlockIter* NewDataBlockIterator( - Rep* rep, const ReadOptions& ro, const BlockHandle& block_hanlde, - BlockIter* input_iter = nullptr, bool is_index = false, - GetContext* get_context = nullptr, Status s = Status()); // If block cache enabled (compressed or uncompressed), looks for the block // identified by handle in (1) uncompressed cache, (2) compressed cache, and @@ -357,35 +363,18 @@ class BlockBasedTable : public TableReader { }; // Maitaning state of a two-level iteration on a partitioned index structure -class BlockBasedTable::BlockEntryIteratorState : public TwoLevelIteratorState { +class 
BlockBasedTable::PartitionedIndexIteratorState + : public TwoLevelIteratorState { public: - BlockEntryIteratorState( - BlockBasedTable* table, const ReadOptions& read_options, - const InternalKeyComparator* icomparator, bool skip_filters, - bool is_index = false, + PartitionedIndexIteratorState( + BlockBasedTable* table, std::unordered_map>* block_map = nullptr); InternalIterator* NewSecondaryIterator(const Slice& index_value) override; - bool PrefixMayMatch(const Slice& internal_key) override; - bool KeyReachedUpperBound(const Slice& internal_key) override; private: // Don't own table_ BlockBasedTable* table_; - const ReadOptions read_options_; - const InternalKeyComparator* icomparator_; - bool skip_filters_; - // true if the 2nd level iterator is on indexes instead of on user data. - bool is_index_; std::unordered_map>* block_map_; - port::RWMutex cleaner_mu; - - static const size_t kInitReadaheadSize = 8 * 1024; - // Found that 256 KB readahead size provides the best performance, based on - // experiments. - static const size_t kMaxReadaheadSize; - size_t readahead_size_ = kInitReadaheadSize; - size_t readahead_limit_ = 0; - int num_file_reads_ = 0; }; // CachableEntry represents the entries that *may* be fetched from block cache. @@ -504,4 +493,121 @@ struct BlockBasedTable::Rep { bool closed = false; }; +class BlockBasedTableIterator : public InternalIterator { + public: + BlockBasedTableIterator(BlockBasedTable* table, + const ReadOptions& read_options, + const InternalKeyComparator& icomp, + InternalIterator* index_iter, bool check_filter) + : table_(table), + read_options_(read_options), + icomp_(icomp), + index_iter_(index_iter), + pinned_iters_mgr_(nullptr), + block_iter_points_to_real_block_(false), + check_filter_(check_filter) {} + + ~BlockBasedTableIterator() { delete index_iter_; } + + void Seek(const Slice& target) override; + void SeekForPrev(const Slice& target) override; + void SeekToFirst() override; + void SeekToLast() override; + void Next() override; + void Prev() override; + bool Valid() const override { + return block_iter_points_to_real_block_ && data_block_iter_.Valid(); + } + Slice key() const override { + assert(Valid()); + return data_block_iter_.key(); + } + Slice value() const override { + assert(Valid()); + return data_block_iter_.value(); + } + Status status() const override { + // It'd be nice if status() returned a const Status& instead of a Status + if (!index_iter_->status().ok()) { + return index_iter_->status(); + } else if (block_iter_points_to_real_block_ && + !data_block_iter_.status().ok()) { + return data_block_iter_.status(); + } else { + return Status::OK(); + } + } + + bool IsOutOfBound() override { return is_out_of_bound_; } + + void SetPinnedItersMgr(PinnedIteratorsManager* pinned_iters_mgr) override { + pinned_iters_mgr_ = pinned_iters_mgr; + } + bool IsKeyPinned() const override { + return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() && + block_iter_points_to_real_block_; + } + bool IsValuePinned() const override { + return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() && + block_iter_points_to_real_block_; + } + + bool CheckPrefixMayMatch(const Slice& ikey) { + if (check_filter_ && !table_->PrefixMayMatch(ikey)) { + // TODO remember the iterator is invalidated because of prefix + // match. This can avoid the upper level file iterator to falsely + // believe the position is the end of the SST file and move to + // the first key of the next file. 
+ ResetDataIter(); + return false; + } + return true; + } + + void ResetDataIter() { + if (block_iter_points_to_real_block_) { + if (pinned_iters_mgr_ != nullptr) { + data_block_iter_.DelegateCleanupsTo(pinned_iters_mgr_); + } + data_block_iter_.~BlockIter(); + new (&data_block_iter_) BlockIter(); + block_iter_points_to_real_block_ = false; + } + } + + void SavePrevIndexValue() { + if (block_iter_points_to_real_block_) { + // Reseek. If they end up with the same data block, we shouldn't re-fetch + // the same data block. + Slice v = index_iter_->value(); + prev_index_value_.assign(v.data(), v.size()); + } + } + + void InitDataBlock(); + void FindKeyForward(); + void FindKeyBackward(); + + private: + BlockBasedTable* table_; + const ReadOptions read_options_; + const InternalKeyComparator& icomp_; + InternalIterator* index_iter_; + PinnedIteratorsManager* pinned_iters_mgr_; + BlockIter data_block_iter_; + bool block_iter_points_to_real_block_; + bool is_out_of_bound_ = false; + bool check_filter_; + // TODO use block offset instead + std::string prev_index_value_; + + static const size_t kInitReadaheadSize = 8 * 1024; + // Found that 256 KB readahead size provides the best performance, based on + // experiments. + static const size_t kMaxReadaheadSize; + size_t readahead_size_ = kInitReadaheadSize; + size_t readahead_limit_ = 0; + int num_file_reads_ = 0; +}; + } // namespace rocksdb diff --git a/table/internal_iterator.h b/table/internal_iterator.h index 2bfdb7d95..705044a3a 100644 --- a/table/internal_iterator.h +++ b/table/internal_iterator.h @@ -69,6 +69,10 @@ class InternalIterator : public Cleanable { // satisfied without doing some IO, then this returns Status::Incomplete(). virtual Status status() const = 0; + // True if the iterator is invalidated because it is out of the iterator + // upper bound + virtual bool IsOutOfBound() { return false; } + // Pass the PinnedIteratorsManager to the Iterator, most Iterators dont // communicate with PinnedIteratorsManager so default implementation is no-op // but for Iterators that need to communicate with PinnedIteratorsManager diff --git a/table/two_level_iterator.cc b/table/two_level_iterator.cc index 2236a2a72..dbdf4a9fd 100644 --- a/table/two_level_iterator.cc +++ b/table/two_level_iterator.cc @@ -22,22 +22,9 @@ namespace { class TwoLevelIterator : public InternalIterator { public: explicit TwoLevelIterator(TwoLevelIteratorState* state, - InternalIterator* first_level_iter, - bool need_free_iter_and_state); + InternalIterator* first_level_iter); - virtual ~TwoLevelIterator() { - // Assert that the TwoLevelIterator is never deleted while Pinning is - // Enabled. 
- assert(!pinned_iters_mgr_ || - (pinned_iters_mgr_ && !pinned_iters_mgr_->PinningEnabled())); - first_level_iter_.DeleteIter(!need_free_iter_and_state_); - second_level_iter_.DeleteIter(false); - if (need_free_iter_and_state_) { - delete state_; - } else { - state_->~TwoLevelIteratorState(); - } - } + virtual ~TwoLevelIterator() { delete state_; } virtual void Seek(const Slice& target) override; virtual void SeekForPrev(const Slice& target) override; @@ -68,20 +55,9 @@ class TwoLevelIterator : public InternalIterator { } virtual void SetPinnedItersMgr( PinnedIteratorsManager* pinned_iters_mgr) override { - pinned_iters_mgr_ = pinned_iters_mgr; - first_level_iter_.SetPinnedItersMgr(pinned_iters_mgr); - if (second_level_iter_.iter()) { - second_level_iter_.SetPinnedItersMgr(pinned_iters_mgr); - } - } - virtual bool IsKeyPinned() const override { - return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() && - second_level_iter_.iter() && second_level_iter_.IsKeyPinned(); - } - virtual bool IsValuePinned() const override { - return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() && - second_level_iter_.iter() && second_level_iter_.IsValuePinned(); } + virtual bool IsKeyPinned() const override { return false; } + virtual bool IsValuePinned() const override { return false; } private: void SaveError(const Status& s) { @@ -95,8 +71,6 @@ class TwoLevelIterator : public InternalIterator { TwoLevelIteratorState* state_; IteratorWrapper first_level_iter_; IteratorWrapper second_level_iter_; // May be nullptr - bool need_free_iter_and_state_; - PinnedIteratorsManager* pinned_iters_mgr_; Status status_; // If second_level_iter is non-nullptr, then "data_block_handle_" holds the // "index_value" passed to block_function_ to create the second_level_iter. 
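The pinning logic removed from TwoLevelIterator here does not disappear: the hand-off now lives in LevelIterator::SetFileIterator() (and, for the on-stack data block iterator, BlockBasedTableIterator::ResetDataIter() delegates cleanups to the manager instead). A standalone sketch of that retire-the-old-child pattern, with MiniIter and MiniPinMgr as stand-ins for InternalIterator and PinnedIteratorsManager:

```cpp
#include <vector>

struct MiniIter {
  virtual ~MiniIter() {}
};

class MiniPinMgr {
 public:
  bool PinningEnabled() const { return enabled_; }
  void StartPinning() { enabled_ = true; }
  void PinIterator(MiniIter* it) { pinned_.push_back(it); }
  void ReleaseAll() {
    for (MiniIter* it : pinned_) delete it;
    pinned_.clear();
    enabled_ = false;
  }
 private:
  bool enabled_ = false;
  std::vector<MiniIter*> pinned_;
};

// When a pinning manager is active, the replaced child iterator must stay
// alive until the manager is released (its keys may still be referenced);
// otherwise it can be deleted immediately.
void RetireOldIterator(MiniIter* old_iter, MiniPinMgr* mgr) {
  if (old_iter == nullptr) return;
  if (mgr != nullptr && mgr->PinningEnabled()) {
    mgr->PinIterator(old_iter);  // defer deletion, as in SetFileIterator()
  } else {
    delete old_iter;
  }
}
```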
@@ -104,19 +78,10 @@ class TwoLevelIterator : public InternalIterator { }; TwoLevelIterator::TwoLevelIterator(TwoLevelIteratorState* state, - InternalIterator* first_level_iter, - bool need_free_iter_and_state) - : state_(state), - first_level_iter_(first_level_iter), - need_free_iter_and_state_(need_free_iter_and_state), - pinned_iters_mgr_(nullptr) {} + InternalIterator* first_level_iter) + : state_(state), first_level_iter_(first_level_iter) {} void TwoLevelIterator::Seek(const Slice& target) { - if (state_->check_prefix_may_match && - !state_->PrefixMayMatch(target)) { - SetSecondLevelIterator(nullptr); - return; - } first_level_iter_.Seek(target); InitDataBlock(); @@ -127,10 +92,6 @@ void TwoLevelIterator::Seek(const Slice& target) { } void TwoLevelIterator::SeekForPrev(const Slice& target) { - if (state_->check_prefix_may_match && !state_->PrefixMayMatch(target)) { - SetSecondLevelIterator(nullptr); - return; - } first_level_iter_.Seek(target); InitDataBlock(); if (second_level_iter_.iter() != nullptr) { @@ -183,8 +144,7 @@ void TwoLevelIterator::SkipEmptyDataBlocksForward() { (!second_level_iter_.Valid() && !second_level_iter_.status().IsIncomplete())) { // Move to next block - if (!first_level_iter_.Valid() || - state_->KeyReachedUpperBound(first_level_iter_.key())) { + if (!first_level_iter_.Valid()) { SetSecondLevelIterator(nullptr); return; } @@ -217,17 +177,8 @@ void TwoLevelIterator::SetSecondLevelIterator(InternalIterator* iter) { if (second_level_iter_.iter() != nullptr) { SaveError(second_level_iter_.status()); } - - if (pinned_iters_mgr_ && iter) { - iter->SetPinnedItersMgr(pinned_iters_mgr_); - } - InternalIterator* old_iter = second_level_iter_.Set(iter); - if (pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled()) { - pinned_iters_mgr_->PinIterator(old_iter); - } else { - delete old_iter; - } + delete old_iter; } void TwoLevelIterator::InitDataBlock() { @@ -251,17 +202,7 @@ void TwoLevelIterator::InitDataBlock() { } // namespace InternalIterator* NewTwoLevelIterator(TwoLevelIteratorState* state, - InternalIterator* first_level_iter, - Arena* arena, - bool need_free_iter_and_state) { - if (arena == nullptr) { - return new TwoLevelIterator(state, first_level_iter, - need_free_iter_and_state); - } else { - auto mem = arena->AllocateAligned(sizeof(TwoLevelIterator)); - return new (mem) - TwoLevelIterator(state, first_level_iter, need_free_iter_and_state); - } + InternalIterator* first_level_iter) { + return new TwoLevelIterator(state, first_level_iter); } - } // namespace rocksdb diff --git a/table/two_level_iterator.h b/table/two_level_iterator.h index 34b33c83f..e81c76d38 100644 --- a/table/two_level_iterator.h +++ b/table/two_level_iterator.h @@ -19,16 +19,10 @@ class InternalKeyComparator; class Arena; struct TwoLevelIteratorState { - explicit TwoLevelIteratorState(bool _check_prefix_may_match) - : check_prefix_may_match(_check_prefix_may_match) {} + TwoLevelIteratorState() {} virtual ~TwoLevelIteratorState() {} virtual InternalIterator* NewSecondaryIterator(const Slice& handle) = 0; - virtual bool PrefixMayMatch(const Slice& internal_key) = 0; - virtual bool KeyReachedUpperBound(const Slice& internal_key) = 0; - - // If call PrefixMayMatch() - bool check_prefix_may_match; }; @@ -47,7 +41,6 @@ struct TwoLevelIteratorState { // need_free_iter_and_state: free `state` and `first_level_iter` if // true. Otherwise, just call destructor. 
 extern InternalIterator* NewTwoLevelIterator(
-    TwoLevelIteratorState* state, InternalIterator* first_level_iter,
-    Arena* arena = nullptr, bool need_free_iter_and_state = true);
+    TwoLevelIteratorState* state, InternalIterator* first_level_iter);

 }  // namespace rocksdb
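Taken together, the per-level read path now makes its "what next?" decision in LevelIterator::SkipEmptyFileForward() from three signals: whether the file iterator hit the upper bound inside its block (the new IsOutOfBound()), whether the level has more files, and whether the next file's smallest key already reaches iterate_upper_bound. A closing standalone sketch of that decision (stand-in types and lexicographic string comparison in place of the user comparator):

```cpp
#include <string>

// Stand-in for probing the current file iterator; not a RocksDB class.
struct FileIterProbe {
  bool valid = false;
  bool out_of_bound = false;
  bool Valid() const { return valid; }
  bool IsOutOfBound() const { return out_of_bound; }
};

enum class ParentAction { kKeepCurrent, kStopScan, kOpenNextFile };

// Mirrors the decision at the top of LevelIterator::SkipEmptyFileForward():
// stop on an upper-bound signal or at the end of the level, prune the next
// file when even its smallest key is past the bound, otherwise open it.
ParentAction OnFileIterExhausted(const FileIterProbe& file_iter,
                                 bool has_next_file,
                                 const std::string& next_file_smallest_key,
                                 const std::string* upper_bound) {
  if (file_iter.Valid()) {
    return ParentAction::kKeepCurrent;
  }
  if (file_iter.IsOutOfBound()) {
    return ParentAction::kStopScan;  // bound was hit inside the current block
  }
  if (!has_next_file) {
    return ParentAction::kStopScan;  // already at the last file of the level
  }
  if (upper_bound != nullptr && next_file_smallest_key >= *upper_bound) {
    return ParentAction::kStopScan;  // KeyReachedUpperBound(next file's smallest key)
  }
  return ParentAction::kOpenNextFile;
}
```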