diff --git a/CMakeLists.txt b/CMakeLists.txt index d7d0aed4f..35ec24dcb 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -620,6 +620,7 @@ set(SOURCES options/options_sanity_check.cc port/stack_trace.cc table/adaptive/adaptive_table_factory.cc + table/block_based/binary_search_index_reader.cc table/block_based/block.cc table/block_based/block_based_filter_block.cc table/block_based/block_based_table_builder.cc @@ -633,9 +634,13 @@ set(SOURCES table/block_based/filter_policy.cc table/block_based/flush_block_policy.cc table/block_based/full_filter_block.cc + table/block_based/hash_index_reader.cc table/block_based/index_builder.cc + table/block_based/index_reader_common.cc table/block_based/parsed_full_filter_block.cc table/block_based/partitioned_filter_block.cc + table/block_based/partitioned_index_reader.cc + table/block_based/reader_common.cc table/block_based/uncompression_dict_reader.cc table/block_fetcher.cc table/cuckoo/cuckoo_table_builder.cc diff --git a/TARGETS b/TARGETS index 60eec57cd..9e5ea51cb 100644 --- a/TARGETS +++ b/TARGETS @@ -231,6 +231,7 @@ cpp_library( "port/port_posix.cc", "port/stack_trace.cc", "table/adaptive/adaptive_table_factory.cc", + "table/block_based/binary_search_index_reader.cc", "table/block_based/block.cc", "table/block_based/block_based_filter_block.cc", "table/block_based/block_based_table_builder.cc", @@ -244,9 +245,13 @@ cpp_library( "table/block_based/filter_policy.cc", "table/block_based/flush_block_policy.cc", "table/block_based/full_filter_block.cc", + "table/block_based/hash_index_reader.cc", "table/block_based/index_builder.cc", + "table/block_based/index_reader_common.cc", "table/block_based/parsed_full_filter_block.cc", "table/block_based/partitioned_filter_block.cc", + "table/block_based/partitioned_index_reader.cc", + "table/block_based/reader_common.cc", "table/block_based/uncompression_dict_reader.cc", "table/block_fetcher.cc", "table/cuckoo/cuckoo_table_builder.cc", diff --git a/src.mk b/src.mk index 50431e669..08b560942 100644 --- a/src.mk +++ b/src.mk @@ -119,6 +119,7 @@ LIB_SOURCES = \ port/port_posix.cc \ port/stack_trace.cc \ table/adaptive/adaptive_table_factory.cc \ + table/block_based/binary_search_index_reader.cc \ table/block_based/block.cc \ table/block_based/block_based_filter_block.cc \ table/block_based/block_based_table_builder.cc \ @@ -132,9 +133,13 @@ LIB_SOURCES = \ table/block_based/filter_policy.cc \ table/block_based/flush_block_policy.cc \ table/block_based/full_filter_block.cc \ + table/block_based/hash_index_reader.cc \ table/block_based/index_builder.cc \ + table/block_based/index_reader_common.cc \ table/block_based/parsed_full_filter_block.cc \ table/block_based/partitioned_filter_block.cc \ + table/block_based/partitioned_index_reader.cc \ + table/block_based/reader_common.cc \ table/block_based/uncompression_dict_reader.cc \ table/block_fetcher.cc \ table/cuckoo/cuckoo_table_builder.cc \ diff --git a/table/block_based/binary_search_index_reader.cc b/table/block_based/binary_search_index_reader.cc new file mode 100644 index 000000000..8c938c924 --- /dev/null +++ b/table/block_based/binary_search_index_reader.cc @@ -0,0 +1,73 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. +#include "table/block_based/binary_search_index_reader.h" + +namespace ROCKSDB_NAMESPACE { +Status BinarySearchIndexReader::Create( + const BlockBasedTable* table, FilePrefetchBuffer* prefetch_buffer, + bool use_cache, bool prefetch, bool pin, + BlockCacheLookupContext* lookup_context, + std::unique_ptr* index_reader) { + assert(table != nullptr); + assert(table->get_rep()); + assert(!pin || prefetch); + assert(index_reader != nullptr); + + CachableEntry index_block; + if (prefetch || !use_cache) { + const Status s = + ReadIndexBlock(table, prefetch_buffer, ReadOptions(), use_cache, + /*get_context=*/nullptr, lookup_context, &index_block); + if (!s.ok()) { + return s; + } + + if (use_cache && !pin) { + index_block.Reset(); + } + } + + index_reader->reset( + new BinarySearchIndexReader(table, std::move(index_block))); + + return Status::OK(); +} + +InternalIteratorBase* BinarySearchIndexReader::NewIterator( + const ReadOptions& read_options, bool /* disable_prefix_seek */, + IndexBlockIter* iter, GetContext* get_context, + BlockCacheLookupContext* lookup_context) { + const BlockBasedTable::Rep* rep = table()->get_rep(); + const bool no_io = (read_options.read_tier == kBlockCacheTier); + CachableEntry index_block; + const Status s = + GetOrReadIndexBlock(no_io, get_context, lookup_context, &index_block); + if (!s.ok()) { + if (iter != nullptr) { + iter->Invalidate(s); + return iter; + } + + return NewErrorInternalIterator(s); + } + + Statistics* kNullStats = nullptr; + // We don't return pinned data from index blocks, so no need + // to set `block_contents_pinned`. + auto it = index_block.GetValue()->NewIndexIterator( + internal_comparator(), internal_comparator()->user_comparator(), + rep->get_global_seqno(BlockType::kIndex), iter, kNullStats, true, + index_has_first_key(), index_key_includes_seq(), index_value_is_full()); + + assert(it != nullptr); + index_block.TransferTo(it); + + return it; +} +} // namespace ROCKSDB_NAMESPACE diff --git a/table/block_based/binary_search_index_reader.h b/table/block_based/binary_search_index_reader.h new file mode 100644 index 000000000..e8a05d51e --- /dev/null +++ b/table/block_based/binary_search_index_reader.h @@ -0,0 +1,48 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. +#pragma once +#include "table/block_based/index_reader_common.h" + +namespace ROCKSDB_NAMESPACE { +// Index that allows binary search lookup for the first key of each block. +// This class can be viewed as a thin wrapper for `Block` class which already +// supports binary search. +class BinarySearchIndexReader : public BlockBasedTable::IndexReaderCommon { + public: + // Read index from the file and create an intance for + // `BinarySearchIndexReader`. + // On success, index_reader will be populated; otherwise it will remain + // unmodified. + static Status Create(const BlockBasedTable* table, + FilePrefetchBuffer* prefetch_buffer, bool use_cache, + bool prefetch, bool pin, + BlockCacheLookupContext* lookup_context, + std::unique_ptr* index_reader); + + InternalIteratorBase* NewIterator( + const ReadOptions& read_options, bool /* disable_prefix_seek */, + IndexBlockIter* iter, GetContext* get_context, + BlockCacheLookupContext* lookup_context) override; + + size_t ApproximateMemoryUsage() const override { + size_t usage = ApproximateIndexBlockMemoryUsage(); +#ifdef ROCKSDB_MALLOC_USABLE_SIZE + usage += malloc_usable_size(const_cast(this)); +#else + usage += sizeof(*this); +#endif // ROCKSDB_MALLOC_USABLE_SIZE + return usage; + } + + private: + BinarySearchIndexReader(const BlockBasedTable* t, + CachableEntry&& index_block) + : IndexReaderCommon(t, std::move(index_block)) {} +}; +} // namespace ROCKSDB_NAMESPACE diff --git a/table/block_based/block_based_table_iterator.h b/table/block_based/block_based_table_iterator.h new file mode 100644 index 000000000..1eadc5fc8 --- /dev/null +++ b/table/block_based/block_based_table_iterator.h @@ -0,0 +1,657 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. +#pragma once +#include "table/block_based/block_based_table_reader.h" + +#include "table/block_based/block_based_table_reader_impl.h" +#include "table/block_based/reader_common.h" + +namespace ROCKSDB_NAMESPACE { +// Iterates over the contents of BlockBasedTable. +template +class BlockBasedTableIterator : public InternalIteratorBase { + // compaction_readahead_size: its value will only be used if for_compaction = + // true + public: + BlockBasedTableIterator(const BlockBasedTable* table, + const ReadOptions& read_options, + const InternalKeyComparator& icomp, + InternalIteratorBase* index_iter, + bool check_filter, bool need_upper_bound_check, + const SliceTransform* prefix_extractor, + BlockType block_type, TableReaderCaller caller, + size_t compaction_readahead_size = 0) + : table_(table), + read_options_(read_options), + icomp_(icomp), + user_comparator_(icomp.user_comparator()), + index_iter_(index_iter), + pinned_iters_mgr_(nullptr), + block_iter_points_to_real_block_(false), + check_filter_(check_filter), + need_upper_bound_check_(need_upper_bound_check), + prefix_extractor_(prefix_extractor), + block_type_(block_type), + lookup_context_(caller), + compaction_readahead_size_(compaction_readahead_size) {} + + ~BlockBasedTableIterator() { delete index_iter_; } + + void Seek(const Slice& target) override; + void SeekForPrev(const Slice& target) override; + void SeekToFirst() override; + void SeekToLast() override; + void Next() final override; + bool NextAndGetResult(IterateResult* result) override; + void Prev() override; + bool Valid() const override { + return !is_out_of_bound_ && + (is_at_first_key_from_index_ || + (block_iter_points_to_real_block_ && block_iter_.Valid())); + } + Slice key() const override { + assert(Valid()); + if (is_at_first_key_from_index_) { + return index_iter_->value().first_internal_key; + } else { + return block_iter_.key(); + } + } + Slice user_key() const override { + assert(Valid()); + if (is_at_first_key_from_index_) { + return ExtractUserKey(index_iter_->value().first_internal_key); + } else { + return block_iter_.user_key(); + } + } + TValue value() const override { + assert(Valid()); + + // Load current block if not loaded. + if (is_at_first_key_from_index_ && + !const_cast(this) + ->MaterializeCurrentBlock()) { + // Oops, index is not consistent with block contents, but we have + // no good way to report error at this point. Let's return empty value. + return TValue(); + } + + return block_iter_.value(); + } + Status status() const override { + // Prefix index set status to NotFound when the prefix does not exist + if (!index_iter_->status().ok() && !index_iter_->status().IsNotFound()) { + return index_iter_->status(); + } else if (block_iter_points_to_real_block_) { + return block_iter_.status(); + } else { + return Status::OK(); + } + } + + // Whether iterator invalidated for being out of bound. + bool IsOutOfBound() override { return is_out_of_bound_; } + + inline bool MayBeOutOfUpperBound() override { + assert(Valid()); + return !data_block_within_upper_bound_; + } + + void SetPinnedItersMgr(PinnedIteratorsManager* pinned_iters_mgr) override { + pinned_iters_mgr_ = pinned_iters_mgr; + } + bool IsKeyPinned() const override { + // Our key comes either from block_iter_'s current key + // or index_iter_'s current *value*. + return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() && + ((is_at_first_key_from_index_ && index_iter_->IsValuePinned()) || + (block_iter_points_to_real_block_ && block_iter_.IsKeyPinned())); + } + bool IsValuePinned() const override { + // Load current block if not loaded. + if (is_at_first_key_from_index_) { + const_cast(this)->MaterializeCurrentBlock(); + } + // BlockIter::IsValuePinned() is always true. No need to check + return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() && + block_iter_points_to_real_block_; + } + + void ResetDataIter() { + if (block_iter_points_to_real_block_) { + if (pinned_iters_mgr_ != nullptr && pinned_iters_mgr_->PinningEnabled()) { + block_iter_.DelegateCleanupsTo(pinned_iters_mgr_); + } + block_iter_.Invalidate(Status::OK()); + block_iter_points_to_real_block_ = false; + } + } + + void SavePrevIndexValue() { + if (block_iter_points_to_real_block_) { + // Reseek. If they end up with the same data block, we shouldn't re-fetch + // the same data block. + prev_block_offset_ = index_iter_->value().handle.offset(); + } + } + + private: + enum class IterDirection { + kForward, + kBackward, + }; + + const BlockBasedTable* table_; + const ReadOptions read_options_; + const InternalKeyComparator& icomp_; + UserComparatorWrapper user_comparator_; + InternalIteratorBase* index_iter_; + PinnedIteratorsManager* pinned_iters_mgr_; + TBlockIter block_iter_; + + // True if block_iter_ is initialized and points to the same block + // as index iterator. + bool block_iter_points_to_real_block_; + // See InternalIteratorBase::IsOutOfBound(). + bool is_out_of_bound_ = false; + // Whether current data block being fully within iterate upper bound. + bool data_block_within_upper_bound_ = false; + // True if we're standing at the first key of a block, and we haven't loaded + // that block yet. A call to value() will trigger loading the block. + bool is_at_first_key_from_index_ = false; + bool check_filter_; + // TODO(Zhongyi): pick a better name + bool need_upper_bound_check_; + const SliceTransform* prefix_extractor_; + BlockType block_type_; + uint64_t prev_block_offset_ = std::numeric_limits::max(); + BlockCacheLookupContext lookup_context_; + // Readahead size used in compaction, its value is used only if + // lookup_context_.caller = kCompaction. + size_t compaction_readahead_size_; + + size_t readahead_size_ = BlockBasedTable::kInitAutoReadaheadSize; + size_t readahead_limit_ = 0; + int64_t num_file_reads_ = 0; + std::unique_ptr prefetch_buffer_; + + // If `target` is null, seek to first. + void SeekImpl(const Slice* target); + + void InitDataBlock(); + bool MaterializeCurrentBlock(); + void FindKeyForward(); + void FindBlockForward(); + void FindKeyBackward(); + void CheckOutOfBound(); + + // Check if data block is fully within iterate_upper_bound. + // + // Note MyRocks may update iterate bounds between seek. To workaround it, + // we need to check and update data_block_within_upper_bound_ accordingly. + void CheckDataBlockWithinUpperBound(); + + bool CheckPrefixMayMatch(const Slice& ikey, IterDirection direction) { + if (need_upper_bound_check_ && direction == IterDirection::kBackward) { + // Upper bound check isn't sufficnet for backward direction to + // guarantee the same result as total order, so disable prefix + // check. + return true; + } + if (check_filter_ && + !table_->PrefixMayMatch(ikey, read_options_, prefix_extractor_, + need_upper_bound_check_, &lookup_context_)) { + // TODO remember the iterator is invalidated because of prefix + // match. This can avoid the upper level file iterator to falsely + // believe the position is the end of the SST file and move to + // the first key of the next file. + ResetDataIter(); + return false; + } + return true; + } +}; + +// Functions below cannot be moved to .cc file because the class is a template +// The template is in place so that block based table iterator can be served +// partitioned index too. However, the logic is kind of different between the +// two. So we may think of de-template them by having a separate iterator +// for partitioned index. + +template +void BlockBasedTableIterator::Seek(const Slice& target) { + SeekImpl(&target); +} + +template +void BlockBasedTableIterator::SeekToFirst() { + SeekImpl(nullptr); +} + +template +void BlockBasedTableIterator::SeekImpl( + const Slice* target) { + is_out_of_bound_ = false; + is_at_first_key_from_index_ = false; + if (target && !CheckPrefixMayMatch(*target, IterDirection::kForward)) { + ResetDataIter(); + return; + } + + bool need_seek_index = true; + if (block_iter_points_to_real_block_ && block_iter_.Valid()) { + // Reseek. + prev_block_offset_ = index_iter_->value().handle.offset(); + + if (target) { + // We can avoid an index seek if: + // 1. The new seek key is larger than the current key + // 2. The new seek key is within the upper bound of the block + // Since we don't necessarily know the internal key for either + // the current key or the upper bound, we check user keys and + // exclude the equality case. Considering internal keys can + // improve for the boundary cases, but it would complicate the + // code. + if (user_comparator_.Compare(ExtractUserKey(*target), + block_iter_.user_key()) > 0 && + user_comparator_.Compare(ExtractUserKey(*target), + index_iter_->user_key()) < 0) { + need_seek_index = false; + } + } + } + + if (need_seek_index) { + if (target) { + index_iter_->Seek(*target); + } else { + index_iter_->SeekToFirst(); + } + + if (!index_iter_->Valid()) { + ResetDataIter(); + return; + } + } + + IndexValue v = index_iter_->value(); + const bool same_block = block_iter_points_to_real_block_ && + v.handle.offset() == prev_block_offset_; + + // TODO(kolmike): Remove the != kBlockCacheTier condition. + if (!v.first_internal_key.empty() && !same_block && + (!target || icomp_.Compare(*target, v.first_internal_key) <= 0) && + read_options_.read_tier != kBlockCacheTier) { + // Index contains the first key of the block, and it's >= target. + // We can defer reading the block. + is_at_first_key_from_index_ = true; + // ResetDataIter() will invalidate block_iter_. Thus, there is no need to + // call CheckDataBlockWithinUpperBound() to check for iterate_upper_bound + // as that will be done later when the data block is actually read. + ResetDataIter(); + } else { + // Need to use the data block. + if (!same_block) { + InitDataBlock(); + } else { + // When the user does a reseek, the iterate_upper_bound might have + // changed. CheckDataBlockWithinUpperBound() needs to be called + // explicitly if the reseek ends up in the same data block. + // If the reseek ends up in a different block, InitDataBlock() will do + // the iterator upper bound check. + CheckDataBlockWithinUpperBound(); + } + + if (target) { + block_iter_.Seek(*target); + } else { + block_iter_.SeekToFirst(); + } + FindKeyForward(); + } + + CheckOutOfBound(); + + if (target) { + assert(!Valid() || ((block_type_ == BlockType::kIndex && + !table_->get_rep()->index_key_includes_seq) + ? (user_comparator_.Compare(ExtractUserKey(*target), + key()) <= 0) + : (icomp_.Compare(*target, key()) <= 0))); + } +} + +template +void BlockBasedTableIterator::SeekForPrev( + const Slice& target) { + is_out_of_bound_ = false; + is_at_first_key_from_index_ = false; + // For now totally disable prefix seek in auto prefix mode because we don't + // have logic + if (!CheckPrefixMayMatch(target, IterDirection::kBackward)) { + ResetDataIter(); + return; + } + + SavePrevIndexValue(); + + // Call Seek() rather than SeekForPrev() in the index block, because the + // target data block will likely to contain the position for `target`, the + // same as Seek(), rather than than before. + // For example, if we have three data blocks, each containing two keys: + // [2, 4] [6, 8] [10, 12] + // (the keys in the index block would be [4, 8, 12]) + // and the user calls SeekForPrev(7), we need to go to the second block, + // just like if they call Seek(7). + // The only case where the block is difference is when they seek to a position + // in the boundary. For example, if they SeekForPrev(5), we should go to the + // first block, rather than the second. However, we don't have the information + // to distinguish the two unless we read the second block. In this case, we'll + // end up with reading two blocks. + index_iter_->Seek(target); + + if (!index_iter_->Valid()) { + auto seek_status = index_iter_->status(); + // Check for IO error + if (!seek_status.IsNotFound() && !seek_status.ok()) { + ResetDataIter(); + return; + } + + // With prefix index, Seek() returns NotFound if the prefix doesn't exist + if (seek_status.IsNotFound()) { + // Any key less than the target is fine for prefix seek + ResetDataIter(); + return; + } else { + index_iter_->SeekToLast(); + } + // Check for IO error + if (!index_iter_->Valid()) { + ResetDataIter(); + return; + } + } + + InitDataBlock(); + + block_iter_.SeekForPrev(target); + + FindKeyBackward(); + CheckDataBlockWithinUpperBound(); + assert(!block_iter_.Valid() || + icomp_.Compare(target, block_iter_.key()) >= 0); +} + +template +void BlockBasedTableIterator::SeekToLast() { + is_out_of_bound_ = false; + is_at_first_key_from_index_ = false; + SavePrevIndexValue(); + index_iter_->SeekToLast(); + if (!index_iter_->Valid()) { + ResetDataIter(); + return; + } + InitDataBlock(); + block_iter_.SeekToLast(); + FindKeyBackward(); + CheckDataBlockWithinUpperBound(); +} + +template +void BlockBasedTableIterator::Next() { + if (is_at_first_key_from_index_ && !MaterializeCurrentBlock()) { + return; + } + assert(block_iter_points_to_real_block_); + block_iter_.Next(); + FindKeyForward(); + CheckOutOfBound(); +} + +template +bool BlockBasedTableIterator::NextAndGetResult( + IterateResult* result) { + Next(); + bool is_valid = Valid(); + if (is_valid) { + result->key = key(); + result->may_be_out_of_upper_bound = MayBeOutOfUpperBound(); + } + return is_valid; +} + +template +void BlockBasedTableIterator::Prev() { + if (is_at_first_key_from_index_) { + is_at_first_key_from_index_ = false; + + index_iter_->Prev(); + if (!index_iter_->Valid()) { + return; + } + + InitDataBlock(); + block_iter_.SeekToLast(); + } else { + assert(block_iter_points_to_real_block_); + block_iter_.Prev(); + } + + FindKeyBackward(); +} + +template +void BlockBasedTableIterator::InitDataBlock() { + BlockHandle data_block_handle = index_iter_->value().handle; + if (!block_iter_points_to_real_block_ || + data_block_handle.offset() != prev_block_offset_ || + // if previous attempt of reading the block missed cache, try again + block_iter_.status().IsIncomplete()) { + if (block_iter_points_to_real_block_) { + ResetDataIter(); + } + auto* rep = table_->get_rep(); + + // Prefetch additional data for range scans (iterators). Enabled only for + // user reads. + // Implicit auto readahead: + // Enabled after 2 sequential IOs when ReadOptions.readahead_size == 0. + // Explicit user requested readahead: + // Enabled from the very first IO when ReadOptions.readahead_size is set. + if (lookup_context_.caller != TableReaderCaller::kCompaction) { + if (read_options_.readahead_size == 0) { + // Implicit auto readahead + num_file_reads_++; + if (num_file_reads_ > + BlockBasedTable::kMinNumFileReadsToStartAutoReadahead) { + if (!rep->file->use_direct_io() && + (data_block_handle.offset() + + static_cast(block_size(data_block_handle)) > + readahead_limit_)) { + // Buffered I/O + // Discarding the return status of Prefetch calls intentionally, as + // we can fallback to reading from disk if Prefetch fails. + rep->file->Prefetch(data_block_handle.offset(), readahead_size_); + readahead_limit_ = static_cast(data_block_handle.offset() + + readahead_size_); + // Keep exponentially increasing readahead size until + // kMaxAutoReadaheadSize. + readahead_size_ = std::min(BlockBasedTable::kMaxAutoReadaheadSize, + readahead_size_ * 2); + } else if (rep->file->use_direct_io() && !prefetch_buffer_) { + // Direct I/O + // Let FilePrefetchBuffer take care of the readahead. + rep->CreateFilePrefetchBuffer( + BlockBasedTable::kInitAutoReadaheadSize, + BlockBasedTable::kMaxAutoReadaheadSize, &prefetch_buffer_); + } + } + } else if (!prefetch_buffer_) { + // Explicit user requested readahead + // The actual condition is: + // if (read_options_.readahead_size != 0 && !prefetch_buffer_) + rep->CreateFilePrefetchBuffer(read_options_.readahead_size, + read_options_.readahead_size, + &prefetch_buffer_); + } + } else if (!prefetch_buffer_) { + rep->CreateFilePrefetchBuffer(compaction_readahead_size_, + compaction_readahead_size_, + &prefetch_buffer_); + } + + Status s; + table_->NewDataBlockIterator( + read_options_, data_block_handle, &block_iter_, block_type_, + /*get_context=*/nullptr, &lookup_context_, s, prefetch_buffer_.get(), + /*for_compaction=*/lookup_context_.caller == + TableReaderCaller::kCompaction); + block_iter_points_to_real_block_ = true; + CheckDataBlockWithinUpperBound(); + } +} + +template +bool BlockBasedTableIterator::MaterializeCurrentBlock() { + assert(is_at_first_key_from_index_); + assert(!block_iter_points_to_real_block_); + assert(index_iter_->Valid()); + + is_at_first_key_from_index_ = false; + InitDataBlock(); + assert(block_iter_points_to_real_block_); + block_iter_.SeekToFirst(); + + if (!block_iter_.Valid() || + icomp_.Compare(block_iter_.key(), + index_iter_->value().first_internal_key) != 0) { + // Uh oh. + block_iter_.Invalidate(Status::Corruption( + "first key in index doesn't match first key in block")); + return false; + } + + return true; +} + +template +void BlockBasedTableIterator::FindKeyForward() { + // This method's code is kept short to make it likely to be inlined. + + assert(!is_out_of_bound_); + assert(block_iter_points_to_real_block_); + + if (!block_iter_.Valid()) { + // This is the only call site of FindBlockForward(), but it's extracted into + // a separate method to keep FindKeyForward() short and likely to be + // inlined. When transitioning to a different block, we call + // FindBlockForward(), which is much longer and is probably not inlined. + FindBlockForward(); + } else { + // This is the fast path that avoids a function call. + } +} + +template +void BlockBasedTableIterator::FindBlockForward() { + // TODO the while loop inherits from two-level-iterator. We don't know + // whether a block can be empty so it can be replaced by an "if". + do { + if (!block_iter_.status().ok()) { + return; + } + // Whether next data block is out of upper bound, if there is one. + const bool next_block_is_out_of_bound = + read_options_.iterate_upper_bound != nullptr && + block_iter_points_to_real_block_ && !data_block_within_upper_bound_; + assert(!next_block_is_out_of_bound || + user_comparator_.CompareWithoutTimestamp( + *read_options_.iterate_upper_bound, /*a_has_ts=*/false, + index_iter_->user_key(), /*b_has_ts=*/true) <= 0); + ResetDataIter(); + index_iter_->Next(); + if (next_block_is_out_of_bound) { + // The next block is out of bound. No need to read it. + TEST_SYNC_POINT_CALLBACK("BlockBasedTableIterator:out_of_bound", nullptr); + // We need to make sure this is not the last data block before setting + // is_out_of_bound_, since the index key for the last data block can be + // larger than smallest key of the next file on the same level. + if (index_iter_->Valid()) { + is_out_of_bound_ = true; + } + return; + } + + if (!index_iter_->Valid()) { + return; + } + + IndexValue v = index_iter_->value(); + + // TODO(kolmike): Remove the != kBlockCacheTier condition. + if (!v.first_internal_key.empty() && + read_options_.read_tier != kBlockCacheTier) { + // Index contains the first key of the block. Defer reading the block. + is_at_first_key_from_index_ = true; + return; + } + + InitDataBlock(); + block_iter_.SeekToFirst(); + } while (!block_iter_.Valid()); +} + +template +void BlockBasedTableIterator::FindKeyBackward() { + while (!block_iter_.Valid()) { + if (!block_iter_.status().ok()) { + return; + } + + ResetDataIter(); + index_iter_->Prev(); + + if (index_iter_->Valid()) { + InitDataBlock(); + block_iter_.SeekToLast(); + } else { + return; + } + } + + // We could have check lower bound here too, but we opt not to do it for + // code simplicity. +} + +template +void BlockBasedTableIterator::CheckOutOfBound() { + if (read_options_.iterate_upper_bound != nullptr && Valid()) { + is_out_of_bound_ = + user_comparator_.CompareWithoutTimestamp( + *read_options_.iterate_upper_bound, /*a_has_ts=*/false, user_key(), + /*b_has_ts=*/true) <= 0; + } +} + +template +void BlockBasedTableIterator::CheckDataBlockWithinUpperBound() { + if (read_options_.iterate_upper_bound != nullptr && + block_iter_points_to_real_block_) { + data_block_within_upper_bound_ = + (user_comparator_.CompareWithoutTimestamp( + *read_options_.iterate_upper_bound, /*a_has_ts=*/false, + index_iter_->user_key(), + /*b_has_ts=*/true) > 0); + } +} +} // namespace ROCKSDB_NAMESPACE diff --git a/table/block_based/block_based_table_reader.cc b/table/block_based/block_based_table_reader.cc index 6a0d8ef5f..3c619d9ea 100644 --- a/table/block_based/block_based_table_reader.cc +++ b/table/block_based/block_based_table_reader.cc @@ -31,13 +31,17 @@ #include "rocksdb/table.h" #include "rocksdb/table_properties.h" +#include "table/block_based/binary_search_index_reader.h" #include "table/block_based/block.h" #include "table/block_based/block_based_filter_block.h" #include "table/block_based/block_based_table_factory.h" +#include "table/block_based/block_based_table_iterator.h" #include "table/block_based/block_prefix_index.h" #include "table/block_based/filter_block.h" #include "table/block_based/full_filter_block.h" +#include "table/block_based/hash_index_reader.h" #include "table/block_based/partitioned_filter_block.h" +#include "table/block_based/partitioned_index_reader.h" #include "table/block_fetcher.h" #include "table/format.h" #include "table/get_context.h" @@ -175,20 +179,6 @@ Status ReadBlockFromFile( return s; } -inline MemoryAllocator* GetMemoryAllocator( - const BlockBasedTableOptions& table_options) { - return table_options.block_cache.get() - ? table_options.block_cache->memory_allocator() - : nullptr; -} - -inline MemoryAllocator* GetMemoryAllocatorForCompressedBlock( - const BlockBasedTableOptions& table_options) { - return table_options.block_cache_compressed.get() - ? table_options.block_cache_compressed->memory_allocator() - : nullptr; -} - // Delete the entry resided in the cache. template void DeleteCachedEntry(const Slice& /*key*/, void* value) { @@ -196,13 +186,6 @@ void DeleteCachedEntry(const Slice& /*key*/, void* value) { delete entry; } -// Release the cached entry and decrement its ref count. -void ForceReleaseCachedEntry(void* arg, void* h) { - Cache* cache = reinterpret_cast(arg); - Cache::Handle* handle = reinterpret_cast(h); - cache->Release(handle, true /* force_erase */); -} - // Release the cached entry and decrement its ref count. // Do not force erase void ReleaseCachedEntry(void* arg, void* h) { @@ -239,555 +222,8 @@ CacheAllocationPtr CopyBufferToHeap(MemoryAllocator* allocator, Slice& buf) { memcpy(heap_buf.get(), buf.data(), buf.size()); return heap_buf; } - } // namespace -// Encapsulates common functionality for the various index reader -// implementations. Provides access to the index block regardless of whether -// it is owned by the reader or stored in the cache, or whether it is pinned -// in the cache or not. -class BlockBasedTable::IndexReaderCommon : public BlockBasedTable::IndexReader { - public: - IndexReaderCommon(const BlockBasedTable* t, - CachableEntry&& index_block) - : table_(t), index_block_(std::move(index_block)) { - assert(table_ != nullptr); - } - - protected: - static Status ReadIndexBlock(const BlockBasedTable* table, - FilePrefetchBuffer* prefetch_buffer, - const ReadOptions& read_options, bool use_cache, - GetContext* get_context, - BlockCacheLookupContext* lookup_context, - CachableEntry* index_block); - - const BlockBasedTable* table() const { return table_; } - - const InternalKeyComparator* internal_comparator() const { - assert(table_ != nullptr); - assert(table_->get_rep() != nullptr); - - return &table_->get_rep()->internal_comparator; - } - - bool index_has_first_key() const { - assert(table_ != nullptr); - assert(table_->get_rep() != nullptr); - return table_->get_rep()->index_has_first_key; - } - - bool index_key_includes_seq() const { - assert(table_ != nullptr); - assert(table_->get_rep() != nullptr); - return table_->get_rep()->index_key_includes_seq; - } - - bool index_value_is_full() const { - assert(table_ != nullptr); - assert(table_->get_rep() != nullptr); - return table_->get_rep()->index_value_is_full; - } - - bool cache_index_blocks() const { - assert(table_ != nullptr); - assert(table_->get_rep() != nullptr); - return table_->get_rep()->table_options.cache_index_and_filter_blocks; - } - - Status GetOrReadIndexBlock(bool no_io, GetContext* get_context, - BlockCacheLookupContext* lookup_context, - CachableEntry* index_block) const; - - size_t ApproximateIndexBlockMemoryUsage() const { - assert(!index_block_.GetOwnValue() || index_block_.GetValue() != nullptr); - return index_block_.GetOwnValue() - ? index_block_.GetValue()->ApproximateMemoryUsage() - : 0; - } - - private: - const BlockBasedTable* table_; - CachableEntry index_block_; -}; - -Status BlockBasedTable::IndexReaderCommon::ReadIndexBlock( - const BlockBasedTable* table, FilePrefetchBuffer* prefetch_buffer, - const ReadOptions& read_options, bool use_cache, GetContext* get_context, - BlockCacheLookupContext* lookup_context, - CachableEntry* index_block) { - PERF_TIMER_GUARD(read_index_block_nanos); - - assert(table != nullptr); - assert(index_block != nullptr); - assert(index_block->IsEmpty()); - - const Rep* const rep = table->get_rep(); - assert(rep != nullptr); - - const Status s = table->RetrieveBlock( - prefetch_buffer, read_options, rep->footer.index_handle(), - UncompressionDict::GetEmptyDict(), index_block, BlockType::kIndex, - get_context, lookup_context, /* for_compaction */ false, use_cache); - - return s; -} - -Status BlockBasedTable::IndexReaderCommon::GetOrReadIndexBlock( - bool no_io, GetContext* get_context, - BlockCacheLookupContext* lookup_context, - CachableEntry* index_block) const { - assert(index_block != nullptr); - - if (!index_block_.IsEmpty()) { - index_block->SetUnownedValue(index_block_.GetValue()); - return Status::OK(); - } - - ReadOptions read_options; - if (no_io) { - read_options.read_tier = kBlockCacheTier; - } - - return ReadIndexBlock(table_, /*prefetch_buffer=*/nullptr, read_options, - cache_index_blocks(), get_context, lookup_context, - index_block); -} - -// Index that allows binary search lookup in a two-level index structure. -class PartitionIndexReader : public BlockBasedTable::IndexReaderCommon { - public: - // Read the partition index from the file and create an instance for - // `PartitionIndexReader`. - // On success, index_reader will be populated; otherwise it will remain - // unmodified. - static Status Create(const BlockBasedTable* table, - FilePrefetchBuffer* prefetch_buffer, bool use_cache, - bool prefetch, bool pin, - BlockCacheLookupContext* lookup_context, - std::unique_ptr* index_reader) { - assert(table != nullptr); - assert(table->get_rep()); - assert(!pin || prefetch); - assert(index_reader != nullptr); - - CachableEntry index_block; - if (prefetch || !use_cache) { - const Status s = - ReadIndexBlock(table, prefetch_buffer, ReadOptions(), use_cache, - /*get_context=*/nullptr, lookup_context, &index_block); - if (!s.ok()) { - return s; - } - - if (use_cache && !pin) { - index_block.Reset(); - } - } - - index_reader->reset( - new PartitionIndexReader(table, std::move(index_block))); - - return Status::OK(); - } - - // return a two-level iterator: first level is on the partition index - InternalIteratorBase* NewIterator( - const ReadOptions& read_options, bool /* disable_prefix_seek */, - IndexBlockIter* iter, GetContext* get_context, - BlockCacheLookupContext* lookup_context) override { - const bool no_io = (read_options.read_tier == kBlockCacheTier); - CachableEntry index_block; - const Status s = - GetOrReadIndexBlock(no_io, get_context, lookup_context, &index_block); - if (!s.ok()) { - if (iter != nullptr) { - iter->Invalidate(s); - return iter; - } - - return NewErrorInternalIterator(s); - } - - const BlockBasedTable::Rep* rep = table()->rep_; - InternalIteratorBase* it = nullptr; - - Statistics* kNullStats = nullptr; - // Filters are already checked before seeking the index - if (!partition_map_.empty()) { - // We don't return pinned data from index blocks, so no need - // to set `block_contents_pinned`. - it = NewTwoLevelIterator( - new BlockBasedTable::PartitionedIndexIteratorState(table(), - &partition_map_), - index_block.GetValue()->NewIndexIterator( - internal_comparator(), internal_comparator()->user_comparator(), - rep->get_global_seqno(BlockType::kIndex), nullptr, kNullStats, - true, index_has_first_key(), index_key_includes_seq(), - index_value_is_full())); - } else { - ReadOptions ro; - ro.fill_cache = read_options.fill_cache; - // We don't return pinned data from index blocks, so no need - // to set `block_contents_pinned`. - it = new BlockBasedTableIterator( - table(), ro, *internal_comparator(), - index_block.GetValue()->NewIndexIterator( - internal_comparator(), internal_comparator()->user_comparator(), - rep->get_global_seqno(BlockType::kIndex), nullptr, kNullStats, - true, index_has_first_key(), index_key_includes_seq(), - index_value_is_full()), - false, true, /* prefix_extractor */ nullptr, BlockType::kIndex, - lookup_context ? lookup_context->caller - : TableReaderCaller::kUncategorized); - } - - assert(it != nullptr); - index_block.TransferTo(it); - - return it; - - // TODO(myabandeh): Update TwoLevelIterator to be able to make use of - // on-stack BlockIter while the state is on heap. Currentlly it assumes - // the first level iter is always on heap and will attempt to delete it - // in its destructor. - } - - void CacheDependencies(bool pin) override { - // Before read partitions, prefetch them to avoid lots of IOs - BlockCacheLookupContext lookup_context{TableReaderCaller::kPrefetch}; - const BlockBasedTable::Rep* rep = table()->rep_; - IndexBlockIter biter; - BlockHandle handle; - Statistics* kNullStats = nullptr; - - CachableEntry index_block; - Status s = GetOrReadIndexBlock(false /* no_io */, nullptr /* get_context */, - &lookup_context, &index_block); - if (!s.ok()) { - ROCKS_LOG_WARN(rep->ioptions.info_log, - "Error retrieving top-level index block while trying to " - "cache index partitions: %s", - s.ToString().c_str()); - return; - } - - // We don't return pinned data from index blocks, so no need - // to set `block_contents_pinned`. - index_block.GetValue()->NewIndexIterator( - internal_comparator(), internal_comparator()->user_comparator(), - rep->get_global_seqno(BlockType::kIndex), &biter, kNullStats, true, - index_has_first_key(), index_key_includes_seq(), index_value_is_full()); - // Index partitions are assumed to be consecuitive. Prefetch them all. - // Read the first block offset - biter.SeekToFirst(); - if (!biter.Valid()) { - // Empty index. - return; - } - handle = biter.value().handle; - uint64_t prefetch_off = handle.offset(); - - // Read the last block's offset - biter.SeekToLast(); - if (!biter.Valid()) { - // Empty index. - return; - } - handle = biter.value().handle; - uint64_t last_off = handle.offset() + block_size(handle); - uint64_t prefetch_len = last_off - prefetch_off; - std::unique_ptr prefetch_buffer; - rep->CreateFilePrefetchBuffer(0, 0, &prefetch_buffer); - s = prefetch_buffer->Prefetch(rep->file.get(), prefetch_off, - static_cast(prefetch_len)); - - // After prefetch, read the partitions one by one - biter.SeekToFirst(); - auto ro = ReadOptions(); - for (; biter.Valid(); biter.Next()) { - handle = biter.value().handle; - CachableEntry block; - // TODO: Support counter batch update for partitioned index and - // filter blocks - s = table()->MaybeReadBlockAndLoadToCache( - prefetch_buffer.get(), ro, handle, UncompressionDict::GetEmptyDict(), - &block, BlockType::kIndex, /*get_context=*/nullptr, &lookup_context, - /*contents=*/nullptr); - - assert(s.ok() || block.GetValue() == nullptr); - if (s.ok() && block.GetValue() != nullptr) { - if (block.IsCached()) { - if (pin) { - partition_map_[handle.offset()] = std::move(block); - } - } - } - } - } - - size_t ApproximateMemoryUsage() const override { - size_t usage = ApproximateIndexBlockMemoryUsage(); -#ifdef ROCKSDB_MALLOC_USABLE_SIZE - usage += malloc_usable_size(const_cast(this)); -#else - usage += sizeof(*this); -#endif // ROCKSDB_MALLOC_USABLE_SIZE - // TODO(myabandeh): more accurate estimate of partition_map_ mem usage - return usage; - } - - private: - PartitionIndexReader(const BlockBasedTable* t, - CachableEntry&& index_block) - : IndexReaderCommon(t, std::move(index_block)) {} - - std::unordered_map> partition_map_; -}; - -// Index that allows binary search lookup for the first key of each block. -// This class can be viewed as a thin wrapper for `Block` class which already -// supports binary search. -class BinarySearchIndexReader : public BlockBasedTable::IndexReaderCommon { - public: - // Read index from the file and create an intance for - // `BinarySearchIndexReader`. - // On success, index_reader will be populated; otherwise it will remain - // unmodified. - static Status Create(const BlockBasedTable* table, - FilePrefetchBuffer* prefetch_buffer, bool use_cache, - bool prefetch, bool pin, - BlockCacheLookupContext* lookup_context, - std::unique_ptr* index_reader) { - assert(table != nullptr); - assert(table->get_rep()); - assert(!pin || prefetch); - assert(index_reader != nullptr); - - CachableEntry index_block; - if (prefetch || !use_cache) { - const Status s = - ReadIndexBlock(table, prefetch_buffer, ReadOptions(), use_cache, - /*get_context=*/nullptr, lookup_context, &index_block); - if (!s.ok()) { - return s; - } - - if (use_cache && !pin) { - index_block.Reset(); - } - } - - index_reader->reset( - new BinarySearchIndexReader(table, std::move(index_block))); - - return Status::OK(); - } - - InternalIteratorBase* NewIterator( - const ReadOptions& read_options, bool /* disable_prefix_seek */, - IndexBlockIter* iter, GetContext* get_context, - BlockCacheLookupContext* lookup_context) override { - const BlockBasedTable::Rep* rep = table()->get_rep(); - const bool no_io = (read_options.read_tier == kBlockCacheTier); - CachableEntry index_block; - const Status s = - GetOrReadIndexBlock(no_io, get_context, lookup_context, &index_block); - if (!s.ok()) { - if (iter != nullptr) { - iter->Invalidate(s); - return iter; - } - - return NewErrorInternalIterator(s); - } - - Statistics* kNullStats = nullptr; - // We don't return pinned data from index blocks, so no need - // to set `block_contents_pinned`. - auto it = index_block.GetValue()->NewIndexIterator( - internal_comparator(), internal_comparator()->user_comparator(), - rep->get_global_seqno(BlockType::kIndex), iter, kNullStats, true, - index_has_first_key(), index_key_includes_seq(), index_value_is_full()); - - assert(it != nullptr); - index_block.TransferTo(it); - - return it; - } - - size_t ApproximateMemoryUsage() const override { - size_t usage = ApproximateIndexBlockMemoryUsage(); -#ifdef ROCKSDB_MALLOC_USABLE_SIZE - usage += malloc_usable_size(const_cast(this)); -#else - usage += sizeof(*this); -#endif // ROCKSDB_MALLOC_USABLE_SIZE - return usage; - } - - private: - BinarySearchIndexReader(const BlockBasedTable* t, - CachableEntry&& index_block) - : IndexReaderCommon(t, std::move(index_block)) {} -}; - -// Index that leverages an internal hash table to quicken the lookup for a given -// key. -class HashIndexReader : public BlockBasedTable::IndexReaderCommon { - public: - static Status Create(const BlockBasedTable* table, - FilePrefetchBuffer* prefetch_buffer, - InternalIterator* meta_index_iter, bool use_cache, - bool prefetch, bool pin, - BlockCacheLookupContext* lookup_context, - std::unique_ptr* index_reader) { - assert(table != nullptr); - assert(index_reader != nullptr); - assert(!pin || prefetch); - - const BlockBasedTable::Rep* rep = table->get_rep(); - assert(rep != nullptr); - - CachableEntry index_block; - if (prefetch || !use_cache) { - const Status s = - ReadIndexBlock(table, prefetch_buffer, ReadOptions(), use_cache, - /*get_context=*/nullptr, lookup_context, &index_block); - if (!s.ok()) { - return s; - } - - if (use_cache && !pin) { - index_block.Reset(); - } - } - - // Note, failure to create prefix hash index does not need to be a - // hard error. We can still fall back to the original binary search index. - // So, Create will succeed regardless, from this point on. - - index_reader->reset(new HashIndexReader(table, std::move(index_block))); - - // Get prefixes block - BlockHandle prefixes_handle; - Status s = FindMetaBlock(meta_index_iter, kHashIndexPrefixesBlock, - &prefixes_handle); - if (!s.ok()) { - // TODO: log error - return Status::OK(); - } - - // Get index metadata block - BlockHandle prefixes_meta_handle; - s = FindMetaBlock(meta_index_iter, kHashIndexPrefixesMetadataBlock, - &prefixes_meta_handle); - if (!s.ok()) { - // TODO: log error - return Status::OK(); - } - - RandomAccessFileReader* const file = rep->file.get(); - const Footer& footer = rep->footer; - const ImmutableCFOptions& ioptions = rep->ioptions; - const PersistentCacheOptions& cache_options = rep->persistent_cache_options; - MemoryAllocator* const memory_allocator = - GetMemoryAllocator(rep->table_options); - - // Read contents for the blocks - BlockContents prefixes_contents; - BlockFetcher prefixes_block_fetcher( - file, prefetch_buffer, footer, ReadOptions(), prefixes_handle, - &prefixes_contents, ioptions, true /*decompress*/, - true /*maybe_compressed*/, BlockType::kHashIndexPrefixes, - UncompressionDict::GetEmptyDict(), cache_options, memory_allocator); - s = prefixes_block_fetcher.ReadBlockContents(); - if (!s.ok()) { - return s; - } - BlockContents prefixes_meta_contents; - BlockFetcher prefixes_meta_block_fetcher( - file, prefetch_buffer, footer, ReadOptions(), prefixes_meta_handle, - &prefixes_meta_contents, ioptions, true /*decompress*/, - true /*maybe_compressed*/, BlockType::kHashIndexMetadata, - UncompressionDict::GetEmptyDict(), cache_options, memory_allocator); - s = prefixes_meta_block_fetcher.ReadBlockContents(); - if (!s.ok()) { - // TODO: log error - return Status::OK(); - } - - BlockPrefixIndex* prefix_index = nullptr; - assert(rep->internal_prefix_transform.get() != nullptr); - s = BlockPrefixIndex::Create(rep->internal_prefix_transform.get(), - prefixes_contents.data, - prefixes_meta_contents.data, &prefix_index); - // TODO: log error - if (s.ok()) { - HashIndexReader* const hash_index_reader = - static_cast(index_reader->get()); - hash_index_reader->prefix_index_.reset(prefix_index); - } - - return Status::OK(); - } - - InternalIteratorBase* NewIterator( - const ReadOptions& read_options, bool disable_prefix_seek, - IndexBlockIter* iter, GetContext* get_context, - BlockCacheLookupContext* lookup_context) override { - const BlockBasedTable::Rep* rep = table()->get_rep(); - const bool no_io = (read_options.read_tier == kBlockCacheTier); - CachableEntry index_block; - const Status s = - GetOrReadIndexBlock(no_io, get_context, lookup_context, &index_block); - if (!s.ok()) { - if (iter != nullptr) { - iter->Invalidate(s); - return iter; - } - - return NewErrorInternalIterator(s); - } - - Statistics* kNullStats = nullptr; - const bool total_order_seek = - read_options.total_order_seek || disable_prefix_seek; - // We don't return pinned data from index blocks, so no need - // to set `block_contents_pinned`. - auto it = index_block.GetValue()->NewIndexIterator( - internal_comparator(), internal_comparator()->user_comparator(), - rep->get_global_seqno(BlockType::kIndex), iter, kNullStats, - total_order_seek, index_has_first_key(), index_key_includes_seq(), - index_value_is_full(), false /* block_contents_pinned */, - prefix_index_.get()); - - assert(it != nullptr); - index_block.TransferTo(it); - - return it; - } - - size_t ApproximateMemoryUsage() const override { - size_t usage = ApproximateIndexBlockMemoryUsage(); -#ifdef ROCKSDB_MALLOC_USABLE_SIZE - usage += malloc_usable_size(const_cast(this)); -#else - if (prefix_index_) { - usage += prefix_index_->ApproximateMemoryUsage(); - } - usage += sizeof(*this); -#endif // ROCKSDB_MALLOC_USABLE_SIZE - return usage; - } - - private: - HashIndexReader(const BlockBasedTable* t, CachableEntry&& index_block) - : IndexReaderCommon(t, std::move(index_block)) {} - - std::unique_ptr prefix_index_; -}; - void BlockBasedTable::UpdateCacheHitMetrics(BlockType block_type, GetContext* get_context, size_t usage) const { @@ -1295,32 +731,6 @@ Status BlockBasedTable::PrefetchTail( return s; } -Status VerifyChecksum(const ChecksumType type, const char* buf, size_t len, - uint32_t expected) { - Status s; - uint32_t actual = 0; - switch (type) { - case kNoChecksum: - break; - case kCRC32c: - expected = crc32c::Unmask(expected); - actual = crc32c::Value(buf, len); - break; - case kxxHash: - actual = XXH32(buf, static_cast(len), 0); - break; - case kxxHash64: - actual = static_cast(XXH64(buf, static_cast(len), 0) & - uint64_t{0xffffffff}); - break; - default: - s = Status::Corruption("unknown checksum type"); - } - if (s.ok() && actual != expected) { - s = Status::Corruption("properties block checksum mismatched"); - } - return s; -} Status BlockBasedTable::TryReadPropertiesWithGlobalSeqno( FilePrefetchBuffer* prefetch_buffer, const Slice& handle_value, @@ -1931,106 +1341,6 @@ InternalIteratorBase* BlockBasedTable::NewIndexIterator( lookup_context); } -// Convert an index iterator value (i.e., an encoded BlockHandle) -// into an iterator over the contents of the corresponding block. -// If input_iter is null, new a iterator -// If input_iter is not null, update this iter and return it -template -TBlockIter* BlockBasedTable::NewDataBlockIterator( - const ReadOptions& ro, const BlockHandle& handle, TBlockIter* input_iter, - BlockType block_type, GetContext* get_context, - BlockCacheLookupContext* lookup_context, Status s, - FilePrefetchBuffer* prefetch_buffer, bool for_compaction) const { - PERF_TIMER_GUARD(new_table_block_iter_nanos); - - TBlockIter* iter = input_iter != nullptr ? input_iter : new TBlockIter; - if (!s.ok()) { - iter->Invalidate(s); - return iter; - } - - CachableEntry uncompression_dict; - if (rep_->uncompression_dict_reader) { - const bool no_io = (ro.read_tier == kBlockCacheTier); - s = rep_->uncompression_dict_reader->GetOrReadUncompressionDictionary( - prefetch_buffer, no_io, get_context, lookup_context, - &uncompression_dict); - if (!s.ok()) { - iter->Invalidate(s); - return iter; - } - } - - const UncompressionDict& dict = uncompression_dict.GetValue() - ? *uncompression_dict.GetValue() - : UncompressionDict::GetEmptyDict(); - - CachableEntry block; - s = RetrieveBlock(prefetch_buffer, ro, handle, dict, &block, block_type, - get_context, lookup_context, for_compaction, - /* use_cache */ true); - - if (!s.ok()) { - assert(block.IsEmpty()); - iter->Invalidate(s); - return iter; - } - - assert(block.GetValue() != nullptr); - - // Block contents are pinned and it is still pinned after the iterator - // is destroyed as long as cleanup functions are moved to another object, - // when: - // 1. block cache handle is set to be released in cleanup function, or - // 2. it's pointing to immortal source. If own_bytes is true then we are - // not reading data from the original source, whether immortal or not. - // Otherwise, the block is pinned iff the source is immortal. - const bool block_contents_pinned = - block.IsCached() || - (!block.GetValue()->own_bytes() && rep_->immortal_table); - iter = InitBlockIterator(rep_, block.GetValue(), block_type, iter, - block_contents_pinned); - - if (!block.IsCached()) { - if (!ro.fill_cache && rep_->cache_key_prefix_size != 0) { - // insert a dummy record to block cache to track the memory usage - Cache* const block_cache = rep_->table_options.block_cache.get(); - Cache::Handle* cache_handle = nullptr; - // There are two other types of cache keys: 1) SST cache key added in - // `MaybeReadBlockAndLoadToCache` 2) dummy cache key added in - // `write_buffer_manager`. Use longer prefix (41 bytes) to differentiate - // from SST cache key(31 bytes), and use non-zero prefix to - // differentiate from `write_buffer_manager` - const size_t kExtraCacheKeyPrefix = kMaxVarint64Length * 4 + 1; - char cache_key[kExtraCacheKeyPrefix + kMaxVarint64Length]; - // Prefix: use rep_->cache_key_prefix padded by 0s - memset(cache_key, 0, kExtraCacheKeyPrefix + kMaxVarint64Length); - assert(rep_->cache_key_prefix_size != 0); - assert(rep_->cache_key_prefix_size <= kExtraCacheKeyPrefix); - memcpy(cache_key, rep_->cache_key_prefix, rep_->cache_key_prefix_size); - char* end = EncodeVarint64(cache_key + kExtraCacheKeyPrefix, - next_cache_key_id_++); - assert(end - cache_key <= - static_cast(kExtraCacheKeyPrefix + kMaxVarint64Length)); - const Slice unique_key(cache_key, static_cast(end - cache_key)); - s = block_cache->Insert(unique_key, nullptr, - block.GetValue()->ApproximateMemoryUsage(), - nullptr, &cache_handle); - - if (s.ok()) { - assert(cache_handle != nullptr); - iter->RegisterCleanup(&ForceReleaseCachedEntry, block_cache, - cache_handle); - } - } - } else { - iter->SetCacheHandle(block.GetCacheHandle()); - } - - block.TransferTo(iter); - - return iter; -} template <> DataBlockIter* BlockBasedTable::InitBlockIterator( @@ -2054,75 +1364,6 @@ IndexBlockIter* BlockBasedTable::InitBlockIterator( block_contents_pinned); } -// Convert an uncompressed data block (i.e CachableEntry) -// into an iterator over the contents of the corresponding block. -// If input_iter is null, new a iterator -// If input_iter is not null, update this iter and return it -template -TBlockIter* BlockBasedTable::NewDataBlockIterator(const ReadOptions& ro, - CachableEntry& block, - TBlockIter* input_iter, - Status s) const { - PERF_TIMER_GUARD(new_table_block_iter_nanos); - - TBlockIter* iter = input_iter != nullptr ? input_iter : new TBlockIter; - if (!s.ok()) { - iter->Invalidate(s); - return iter; - } - - assert(block.GetValue() != nullptr); - // Block contents are pinned and it is still pinned after the iterator - // is destroyed as long as cleanup functions are moved to another object, - // when: - // 1. block cache handle is set to be released in cleanup function, or - // 2. it's pointing to immortal source. If own_bytes is true then we are - // not reading data from the original source, whether immortal or not. - // Otherwise, the block is pinned iff the source is immortal. - const bool block_contents_pinned = - block.IsCached() || - (!block.GetValue()->own_bytes() && rep_->immortal_table); - iter = InitBlockIterator(rep_, block.GetValue(), BlockType::kData, - iter, block_contents_pinned); - - if (!block.IsCached()) { - if (!ro.fill_cache && rep_->cache_key_prefix_size != 0) { - // insert a dummy record to block cache to track the memory usage - Cache* const block_cache = rep_->table_options.block_cache.get(); - Cache::Handle* cache_handle = nullptr; - // There are two other types of cache keys: 1) SST cache key added in - // `MaybeReadBlockAndLoadToCache` 2) dummy cache key added in - // `write_buffer_manager`. Use longer prefix (41 bytes) to differentiate - // from SST cache key(31 bytes), and use non-zero prefix to - // differentiate from `write_buffer_manager` - const size_t kExtraCacheKeyPrefix = kMaxVarint64Length * 4 + 1; - char cache_key[kExtraCacheKeyPrefix + kMaxVarint64Length]; - // Prefix: use rep_->cache_key_prefix padded by 0s - memset(cache_key, 0, kExtraCacheKeyPrefix + kMaxVarint64Length); - assert(rep_->cache_key_prefix_size != 0); - assert(rep_->cache_key_prefix_size <= kExtraCacheKeyPrefix); - memcpy(cache_key, rep_->cache_key_prefix, rep_->cache_key_prefix_size); - char* end = EncodeVarint64(cache_key + kExtraCacheKeyPrefix, - next_cache_key_id_++); - assert(end - cache_key <= - static_cast(kExtraCacheKeyPrefix + kMaxVarint64Length)); - const Slice unique_key(cache_key, static_cast(end - cache_key)); - s = block_cache->Insert(unique_key, nullptr, - block.GetValue()->ApproximateMemoryUsage(), - nullptr, &cache_handle); - if (s.ok()) { - assert(cache_handle != nullptr); - iter->RegisterCleanup(&ForceReleaseCachedEntry, block_cache, - cache_handle); - } - } - } else { - iter->SetCacheHandle(block.GetCacheHandle()); - } - - block.TransferTo(iter); - return iter; -} // If contents is nullptr, this function looks up the block caches for the // data block referenced by handle, and read the block from disk if necessary. @@ -2775,435 +2016,6 @@ bool BlockBasedTable::PrefixMayMatch( return may_match; } -template -void BlockBasedTableIterator::Seek(const Slice& target) { - SeekImpl(&target); -} - -template -void BlockBasedTableIterator::SeekToFirst() { - SeekImpl(nullptr); -} - -template -void BlockBasedTableIterator::SeekImpl( - const Slice* target) { - is_out_of_bound_ = false; - is_at_first_key_from_index_ = false; - if (target && !CheckPrefixMayMatch(*target, IterDirection::kForward)) { - ResetDataIter(); - return; - } - - bool need_seek_index = true; - if (block_iter_points_to_real_block_ && block_iter_.Valid()) { - // Reseek. - prev_block_offset_ = index_iter_->value().handle.offset(); - - if (target) { - // We can avoid an index seek if: - // 1. The new seek key is larger than the current key - // 2. The new seek key is within the upper bound of the block - // Since we don't necessarily know the internal key for either - // the current key or the upper bound, we check user keys and - // exclude the equality case. Considering internal keys can - // improve for the boundary cases, but it would complicate the - // code. - if (user_comparator_.Compare(ExtractUserKey(*target), - block_iter_.user_key()) > 0 && - user_comparator_.Compare(ExtractUserKey(*target), - index_iter_->user_key()) < 0) { - need_seek_index = false; - } - } - } - - if (need_seek_index) { - if (target) { - index_iter_->Seek(*target); - } else { - index_iter_->SeekToFirst(); - } - - if (!index_iter_->Valid()) { - ResetDataIter(); - return; - } - } - - IndexValue v = index_iter_->value(); - const bool same_block = block_iter_points_to_real_block_ && - v.handle.offset() == prev_block_offset_; - - // TODO(kolmike): Remove the != kBlockCacheTier condition. - if (!v.first_internal_key.empty() && !same_block && - (!target || icomp_.Compare(*target, v.first_internal_key) <= 0) && - read_options_.read_tier != kBlockCacheTier) { - // Index contains the first key of the block, and it's >= target. - // We can defer reading the block. - is_at_first_key_from_index_ = true; - // ResetDataIter() will invalidate block_iter_. Thus, there is no need to - // call CheckDataBlockWithinUpperBound() to check for iterate_upper_bound - // as that will be done later when the data block is actually read. - ResetDataIter(); - } else { - // Need to use the data block. - if (!same_block) { - InitDataBlock(); - } else { - // When the user does a reseek, the iterate_upper_bound might have - // changed. CheckDataBlockWithinUpperBound() needs to be called - // explicitly if the reseek ends up in the same data block. - // If the reseek ends up in a different block, InitDataBlock() will do - // the iterator upper bound check. - CheckDataBlockWithinUpperBound(); - } - - if (target) { - block_iter_.Seek(*target); - } else { - block_iter_.SeekToFirst(); - } - FindKeyForward(); - } - - CheckOutOfBound(); - - if (target) { - assert(!Valid() || ((block_type_ == BlockType::kIndex && - !table_->get_rep()->index_key_includes_seq) - ? (user_comparator_.Compare(ExtractUserKey(*target), - key()) <= 0) - : (icomp_.Compare(*target, key()) <= 0))); - } -} - -template -void BlockBasedTableIterator::SeekForPrev( - const Slice& target) { - is_out_of_bound_ = false; - is_at_first_key_from_index_ = false; - // For now totally disable prefix seek in auto prefix mode because we don't - // have logic - if (!CheckPrefixMayMatch(target, IterDirection::kBackward)) { - ResetDataIter(); - return; - } - - SavePrevIndexValue(); - - // Call Seek() rather than SeekForPrev() in the index block, because the - // target data block will likely to contain the position for `target`, the - // same as Seek(), rather than than before. - // For example, if we have three data blocks, each containing two keys: - // [2, 4] [6, 8] [10, 12] - // (the keys in the index block would be [4, 8, 12]) - // and the user calls SeekForPrev(7), we need to go to the second block, - // just like if they call Seek(7). - // The only case where the block is difference is when they seek to a position - // in the boundary. For example, if they SeekForPrev(5), we should go to the - // first block, rather than the second. However, we don't have the information - // to distinguish the two unless we read the second block. In this case, we'll - // end up with reading two blocks. - index_iter_->Seek(target); - - if (!index_iter_->Valid()) { - auto seek_status = index_iter_->status(); - // Check for IO error - if (!seek_status.IsNotFound() && !seek_status.ok()) { - ResetDataIter(); - return; - } - - // With prefix index, Seek() returns NotFound if the prefix doesn't exist - if (seek_status.IsNotFound()) { - // Any key less than the target is fine for prefix seek - ResetDataIter(); - return; - } else { - index_iter_->SeekToLast(); - } - // Check for IO error - if (!index_iter_->Valid()) { - ResetDataIter(); - return; - } - } - - InitDataBlock(); - - block_iter_.SeekForPrev(target); - - FindKeyBackward(); - CheckDataBlockWithinUpperBound(); - assert(!block_iter_.Valid() || - icomp_.Compare(target, block_iter_.key()) >= 0); -} - -template -void BlockBasedTableIterator::SeekToLast() { - is_out_of_bound_ = false; - is_at_first_key_from_index_ = false; - SavePrevIndexValue(); - index_iter_->SeekToLast(); - if (!index_iter_->Valid()) { - ResetDataIter(); - return; - } - InitDataBlock(); - block_iter_.SeekToLast(); - FindKeyBackward(); - CheckDataBlockWithinUpperBound(); -} - -template -void BlockBasedTableIterator::Next() { - if (is_at_first_key_from_index_ && !MaterializeCurrentBlock()) { - return; - } - assert(block_iter_points_to_real_block_); - block_iter_.Next(); - FindKeyForward(); - CheckOutOfBound(); -} - -template -bool BlockBasedTableIterator::NextAndGetResult( - IterateResult* result) { - Next(); - bool is_valid = Valid(); - if (is_valid) { - result->key = key(); - result->may_be_out_of_upper_bound = MayBeOutOfUpperBound(); - } - return is_valid; -} - -template -void BlockBasedTableIterator::Prev() { - if (is_at_first_key_from_index_) { - is_at_first_key_from_index_ = false; - - index_iter_->Prev(); - if (!index_iter_->Valid()) { - return; - } - - InitDataBlock(); - block_iter_.SeekToLast(); - } else { - assert(block_iter_points_to_real_block_); - block_iter_.Prev(); - } - - FindKeyBackward(); -} - -template -void BlockBasedTableIterator::InitDataBlock() { - BlockHandle data_block_handle = index_iter_->value().handle; - if (!block_iter_points_to_real_block_ || - data_block_handle.offset() != prev_block_offset_ || - // if previous attempt of reading the block missed cache, try again - block_iter_.status().IsIncomplete()) { - if (block_iter_points_to_real_block_) { - ResetDataIter(); - } - auto* rep = table_->get_rep(); - - // Prefetch additional data for range scans (iterators). Enabled only for - // user reads. - // Implicit auto readahead: - // Enabled after 2 sequential IOs when ReadOptions.readahead_size == 0. - // Explicit user requested readahead: - // Enabled from the very first IO when ReadOptions.readahead_size is set. - if (lookup_context_.caller != TableReaderCaller::kCompaction) { - if (read_options_.readahead_size == 0) { - // Implicit auto readahead - num_file_reads_++; - if (num_file_reads_ > - BlockBasedTable::kMinNumFileReadsToStartAutoReadahead) { - if (!rep->file->use_direct_io() && - (data_block_handle.offset() + - static_cast(block_size(data_block_handle)) > - readahead_limit_)) { - // Buffered I/O - // Discarding the return status of Prefetch calls intentionally, as - // we can fallback to reading from disk if Prefetch fails. - rep->file->Prefetch(data_block_handle.offset(), readahead_size_); - readahead_limit_ = static_cast(data_block_handle.offset() + - readahead_size_); - // Keep exponentially increasing readahead size until - // kMaxAutoReadaheadSize. - readahead_size_ = std::min(BlockBasedTable::kMaxAutoReadaheadSize, - readahead_size_ * 2); - } else if (rep->file->use_direct_io() && !prefetch_buffer_) { - // Direct I/O - // Let FilePrefetchBuffer take care of the readahead. - rep->CreateFilePrefetchBuffer( - BlockBasedTable::kInitAutoReadaheadSize, - BlockBasedTable::kMaxAutoReadaheadSize, &prefetch_buffer_); - } - } - } else if (!prefetch_buffer_) { - // Explicit user requested readahead - // The actual condition is: - // if (read_options_.readahead_size != 0 && !prefetch_buffer_) - rep->CreateFilePrefetchBuffer(read_options_.readahead_size, - read_options_.readahead_size, - &prefetch_buffer_); - } - } else if (!prefetch_buffer_) { - rep->CreateFilePrefetchBuffer(compaction_readahead_size_, - compaction_readahead_size_, - &prefetch_buffer_); - } - - Status s; - table_->NewDataBlockIterator( - read_options_, data_block_handle, &block_iter_, block_type_, - /*get_context=*/nullptr, &lookup_context_, s, prefetch_buffer_.get(), - /*for_compaction=*/lookup_context_.caller == - TableReaderCaller::kCompaction); - block_iter_points_to_real_block_ = true; - CheckDataBlockWithinUpperBound(); - } -} - -template -bool BlockBasedTableIterator::MaterializeCurrentBlock() { - assert(is_at_first_key_from_index_); - assert(!block_iter_points_to_real_block_); - assert(index_iter_->Valid()); - - is_at_first_key_from_index_ = false; - InitDataBlock(); - assert(block_iter_points_to_real_block_); - block_iter_.SeekToFirst(); - - if (!block_iter_.Valid() || - icomp_.Compare(block_iter_.key(), - index_iter_->value().first_internal_key) != 0) { - // Uh oh. - block_iter_.Invalidate(Status::Corruption( - "first key in index doesn't match first key in block")); - return false; - } - - return true; -} - -template -void BlockBasedTableIterator::FindKeyForward() { - // This method's code is kept short to make it likely to be inlined. - - assert(!is_out_of_bound_); - assert(block_iter_points_to_real_block_); - - if (!block_iter_.Valid()) { - // This is the only call site of FindBlockForward(), but it's extracted into - // a separate method to keep FindKeyForward() short and likely to be - // inlined. When transitioning to a different block, we call - // FindBlockForward(), which is much longer and is probably not inlined. - FindBlockForward(); - } else { - // This is the fast path that avoids a function call. - } -} - -template -void BlockBasedTableIterator::FindBlockForward() { - // TODO the while loop inherits from two-level-iterator. We don't know - // whether a block can be empty so it can be replaced by an "if". - do { - if (!block_iter_.status().ok()) { - return; - } - // Whether next data block is out of upper bound, if there is one. - const bool next_block_is_out_of_bound = - read_options_.iterate_upper_bound != nullptr && - block_iter_points_to_real_block_ && !data_block_within_upper_bound_; - assert(!next_block_is_out_of_bound || - user_comparator_.CompareWithoutTimestamp( - *read_options_.iterate_upper_bound, /*a_has_ts=*/false, - index_iter_->user_key(), /*b_has_ts=*/true) <= 0); - ResetDataIter(); - index_iter_->Next(); - if (next_block_is_out_of_bound) { - // The next block is out of bound. No need to read it. - TEST_SYNC_POINT_CALLBACK("BlockBasedTableIterator:out_of_bound", nullptr); - // We need to make sure this is not the last data block before setting - // is_out_of_bound_, since the index key for the last data block can be - // larger than smallest key of the next file on the same level. - if (index_iter_->Valid()) { - is_out_of_bound_ = true; - } - return; - } - - if (!index_iter_->Valid()) { - return; - } - - IndexValue v = index_iter_->value(); - - // TODO(kolmike): Remove the != kBlockCacheTier condition. - if (!v.first_internal_key.empty() && - read_options_.read_tier != kBlockCacheTier) { - // Index contains the first key of the block. Defer reading the block. - is_at_first_key_from_index_ = true; - return; - } - - InitDataBlock(); - block_iter_.SeekToFirst(); - } while (!block_iter_.Valid()); -} - -template -void BlockBasedTableIterator::FindKeyBackward() { - while (!block_iter_.Valid()) { - if (!block_iter_.status().ok()) { - return; - } - - ResetDataIter(); - index_iter_->Prev(); - - if (index_iter_->Valid()) { - InitDataBlock(); - block_iter_.SeekToLast(); - } else { - return; - } - } - - // We could have check lower bound here too, but we opt not to do it for - // code simplicity. -} - -template -void BlockBasedTableIterator::CheckOutOfBound() { - if (read_options_.iterate_upper_bound != nullptr && Valid()) { - is_out_of_bound_ = - user_comparator_.CompareWithoutTimestamp( - *read_options_.iterate_upper_bound, /*a_has_ts=*/false, user_key(), - /*b_has_ts=*/true) <= 0; - } -} - -template -void BlockBasedTableIterator::CheckDataBlockWithinUpperBound() { - if (read_options_.iterate_upper_bound != nullptr && - block_iter_points_to_real_block_) { - data_block_within_upper_bound_ = - (user_comparator_.CompareWithoutTimestamp( - *read_options_.iterate_upper_bound, /*a_has_ts=*/false, - index_iter_->user_key(), - /*b_has_ts=*/true) > 0); - } -} InternalIterator* BlockBasedTable::NewIterator( const ReadOptions& read_options, const SliceTransform* prefix_extractor, diff --git a/table/block_based/block_based_table_reader.h b/table/block_based/block_based_table_reader.h index d04325bec..536fad2c0 100644 --- a/table/block_based/block_based_table_reader.h +++ b/table/block_based/block_based_table_reader.h @@ -9,38 +9,18 @@ #pragma once -#include -#include -#include -#include -#include -#include - #include "db/range_tombstone_fragmenter.h" #include "file/filename.h" -#include "file/random_access_file_reader.h" -#include "options/cf_options.h" -#include "rocksdb/options.h" -#include "rocksdb/persistent_cache.h" -#include "rocksdb/statistics.h" -#include "rocksdb/status.h" -#include "rocksdb/table.h" -#include "table/block_based/block.h" +#include "table/block_based/cachable_entry.h" #include "table/block_based/block_based_table_factory.h" #include "table/block_based/block_type.h" -#include "table/block_based/cachable_entry.h" #include "table/block_based/filter_block.h" #include "table/block_based/uncompression_dict_reader.h" -#include "table/format.h" -#include "table/get_context.h" -#include "table/multiget_context.h" -#include "table/persistent_cache_helper.h" #include "table/table_properties_internal.h" #include "table/table_reader.h" #include "table/two_level_iterator.h" + #include "trace_replay/block_cache_tracer.h" -#include "util/coding.h" -#include "util/user_comparator_wrapper.h" namespace ROCKSDB_NAMESPACE { @@ -617,211 +597,4 @@ struct BlockBasedTable::Rep { !ioptions.allow_mmap_reads /* enable */)); } }; - -// Iterates over the contents of BlockBasedTable. -template -class BlockBasedTableIterator : public InternalIteratorBase { - // compaction_readahead_size: its value will only be used if for_compaction = - // true - public: - BlockBasedTableIterator(const BlockBasedTable* table, - const ReadOptions& read_options, - const InternalKeyComparator& icomp, - InternalIteratorBase* index_iter, - bool check_filter, bool need_upper_bound_check, - const SliceTransform* prefix_extractor, - BlockType block_type, TableReaderCaller caller, - size_t compaction_readahead_size = 0) - : table_(table), - read_options_(read_options), - icomp_(icomp), - user_comparator_(icomp.user_comparator()), - index_iter_(index_iter), - pinned_iters_mgr_(nullptr), - block_iter_points_to_real_block_(false), - check_filter_(check_filter), - need_upper_bound_check_(need_upper_bound_check), - prefix_extractor_(prefix_extractor), - block_type_(block_type), - lookup_context_(caller), - compaction_readahead_size_(compaction_readahead_size) {} - - ~BlockBasedTableIterator() { delete index_iter_; } - - void Seek(const Slice& target) override; - void SeekForPrev(const Slice& target) override; - void SeekToFirst() override; - void SeekToLast() override; - void Next() final override; - bool NextAndGetResult(IterateResult* result) override; - void Prev() override; - bool Valid() const override { - return !is_out_of_bound_ && - (is_at_first_key_from_index_ || - (block_iter_points_to_real_block_ && block_iter_.Valid())); - } - Slice key() const override { - assert(Valid()); - if (is_at_first_key_from_index_) { - return index_iter_->value().first_internal_key; - } else { - return block_iter_.key(); - } - } - Slice user_key() const override { - assert(Valid()); - if (is_at_first_key_from_index_) { - return ExtractUserKey(index_iter_->value().first_internal_key); - } else { - return block_iter_.user_key(); - } - } - TValue value() const override { - assert(Valid()); - - // Load current block if not loaded. - if (is_at_first_key_from_index_ && - !const_cast(this) - ->MaterializeCurrentBlock()) { - // Oops, index is not consistent with block contents, but we have - // no good way to report error at this point. Let's return empty value. - return TValue(); - } - - return block_iter_.value(); - } - Status status() const override { - // Prefix index set status to NotFound when the prefix does not exist - if (!index_iter_->status().ok() && !index_iter_->status().IsNotFound()) { - return index_iter_->status(); - } else if (block_iter_points_to_real_block_) { - return block_iter_.status(); - } else { - return Status::OK(); - } - } - - // Whether iterator invalidated for being out of bound. - bool IsOutOfBound() override { return is_out_of_bound_; } - - inline bool MayBeOutOfUpperBound() override { - assert(Valid()); - return !data_block_within_upper_bound_; - } - - void SetPinnedItersMgr(PinnedIteratorsManager* pinned_iters_mgr) override { - pinned_iters_mgr_ = pinned_iters_mgr; - } - bool IsKeyPinned() const override { - // Our key comes either from block_iter_'s current key - // or index_iter_'s current *value*. - return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() && - ((is_at_first_key_from_index_ && index_iter_->IsValuePinned()) || - (block_iter_points_to_real_block_ && block_iter_.IsKeyPinned())); - } - bool IsValuePinned() const override { - // Load current block if not loaded. - if (is_at_first_key_from_index_) { - const_cast(this)->MaterializeCurrentBlock(); - } - // BlockIter::IsValuePinned() is always true. No need to check - return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() && - block_iter_points_to_real_block_; - } - - void ResetDataIter() { - if (block_iter_points_to_real_block_) { - if (pinned_iters_mgr_ != nullptr && pinned_iters_mgr_->PinningEnabled()) { - block_iter_.DelegateCleanupsTo(pinned_iters_mgr_); - } - block_iter_.Invalidate(Status::OK()); - block_iter_points_to_real_block_ = false; - } - } - - void SavePrevIndexValue() { - if (block_iter_points_to_real_block_) { - // Reseek. If they end up with the same data block, we shouldn't re-fetch - // the same data block. - prev_block_offset_ = index_iter_->value().handle.offset(); - } - } - - private: - enum class IterDirection { - kForward, - kBackward, - }; - - const BlockBasedTable* table_; - const ReadOptions read_options_; - const InternalKeyComparator& icomp_; - UserComparatorWrapper user_comparator_; - InternalIteratorBase* index_iter_; - PinnedIteratorsManager* pinned_iters_mgr_; - TBlockIter block_iter_; - - // True if block_iter_ is initialized and points to the same block - // as index iterator. - bool block_iter_points_to_real_block_; - // See InternalIteratorBase::IsOutOfBound(). - bool is_out_of_bound_ = false; - // Whether current data block being fully within iterate upper bound. - bool data_block_within_upper_bound_ = false; - // True if we're standing at the first key of a block, and we haven't loaded - // that block yet. A call to value() will trigger loading the block. - bool is_at_first_key_from_index_ = false; - bool check_filter_; - // TODO(Zhongyi): pick a better name - bool need_upper_bound_check_; - const SliceTransform* prefix_extractor_; - BlockType block_type_; - uint64_t prev_block_offset_ = std::numeric_limits::max(); - BlockCacheLookupContext lookup_context_; - // Readahead size used in compaction, its value is used only if - // lookup_context_.caller = kCompaction. - size_t compaction_readahead_size_; - - size_t readahead_size_ = BlockBasedTable::kInitAutoReadaheadSize; - size_t readahead_limit_ = 0; - int64_t num_file_reads_ = 0; - std::unique_ptr prefetch_buffer_; - - // If `target` is null, seek to first. - void SeekImpl(const Slice* target); - - void InitDataBlock(); - bool MaterializeCurrentBlock(); - void FindKeyForward(); - void FindBlockForward(); - void FindKeyBackward(); - void CheckOutOfBound(); - - // Check if data block is fully within iterate_upper_bound. - // - // Note MyRocks may update iterate bounds between seek. To workaround it, - // we need to check and update data_block_within_upper_bound_ accordingly. - void CheckDataBlockWithinUpperBound(); - - bool CheckPrefixMayMatch(const Slice& ikey, IterDirection direction) { - if (need_upper_bound_check_ && direction == IterDirection::kBackward) { - // Upper bound check isn't sufficnet for backward direction to - // guarantee the same result as total order, so disable prefix - // check. - return true; - } - if (check_filter_ && - !table_->PrefixMayMatch(ikey, read_options_, prefix_extractor_, - need_upper_bound_check_, &lookup_context_)) { - // TODO remember the iterator is invalidated because of prefix - // match. This can avoid the upper level file iterator to falsely - // believe the position is the end of the SST file and move to - // the first key of the next file. - ResetDataIter(); - return false; - } - return true; - } -}; - } // namespace ROCKSDB_NAMESPACE diff --git a/table/block_based/block_based_table_reader_impl.h b/table/block_based/block_based_table_reader_impl.h new file mode 100644 index 000000000..d9cfaa92c --- /dev/null +++ b/table/block_based/block_based_table_reader_impl.h @@ -0,0 +1,190 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. +#pragma once +#include "table/block_based/block_based_table_reader.h" + +#include "table/block_based/reader_common.h" + +// The file contains some member functions of BlockBasedTable that +// cannot be implemented in block_based_table_reader.cc because +// it's called by other files (e.g. block_based_iterator.h) and +// are templates. + +namespace ROCKSDB_NAMESPACE { +// Convert an index iterator value (i.e., an encoded BlockHandle) +// into an iterator over the contents of the corresponding block. +// If input_iter is null, new a iterator +// If input_iter is not null, update this iter and return it +template +TBlockIter* BlockBasedTable::NewDataBlockIterator( + const ReadOptions& ro, const BlockHandle& handle, TBlockIter* input_iter, + BlockType block_type, GetContext* get_context, + BlockCacheLookupContext* lookup_context, Status s, + FilePrefetchBuffer* prefetch_buffer, bool for_compaction) const { + PERF_TIMER_GUARD(new_table_block_iter_nanos); + + TBlockIter* iter = input_iter != nullptr ? input_iter : new TBlockIter; + if (!s.ok()) { + iter->Invalidate(s); + return iter; + } + + CachableEntry uncompression_dict; + if (rep_->uncompression_dict_reader) { + const bool no_io = (ro.read_tier == kBlockCacheTier); + s = rep_->uncompression_dict_reader->GetOrReadUncompressionDictionary( + prefetch_buffer, no_io, get_context, lookup_context, + &uncompression_dict); + if (!s.ok()) { + iter->Invalidate(s); + return iter; + } + } + + const UncompressionDict& dict = uncompression_dict.GetValue() + ? *uncompression_dict.GetValue() + : UncompressionDict::GetEmptyDict(); + + CachableEntry block; + s = RetrieveBlock(prefetch_buffer, ro, handle, dict, &block, block_type, + get_context, lookup_context, for_compaction, + /* use_cache */ true); + + if (!s.ok()) { + assert(block.IsEmpty()); + iter->Invalidate(s); + return iter; + } + + assert(block.GetValue() != nullptr); + + // Block contents are pinned and it is still pinned after the iterator + // is destroyed as long as cleanup functions are moved to another object, + // when: + // 1. block cache handle is set to be released in cleanup function, or + // 2. it's pointing to immortal source. If own_bytes is true then we are + // not reading data from the original source, whether immortal or not. + // Otherwise, the block is pinned iff the source is immortal. + const bool block_contents_pinned = + block.IsCached() || + (!block.GetValue()->own_bytes() && rep_->immortal_table); + iter = InitBlockIterator(rep_, block.GetValue(), block_type, iter, + block_contents_pinned); + + if (!block.IsCached()) { + if (!ro.fill_cache && rep_->cache_key_prefix_size != 0) { + // insert a dummy record to block cache to track the memory usage + Cache* const block_cache = rep_->table_options.block_cache.get(); + Cache::Handle* cache_handle = nullptr; + // There are two other types of cache keys: 1) SST cache key added in + // `MaybeReadBlockAndLoadToCache` 2) dummy cache key added in + // `write_buffer_manager`. Use longer prefix (41 bytes) to differentiate + // from SST cache key(31 bytes), and use non-zero prefix to + // differentiate from `write_buffer_manager` + const size_t kExtraCacheKeyPrefix = kMaxVarint64Length * 4 + 1; + char cache_key[kExtraCacheKeyPrefix + kMaxVarint64Length]; + // Prefix: use rep_->cache_key_prefix padded by 0s + memset(cache_key, 0, kExtraCacheKeyPrefix + kMaxVarint64Length); + assert(rep_->cache_key_prefix_size != 0); + assert(rep_->cache_key_prefix_size <= kExtraCacheKeyPrefix); + memcpy(cache_key, rep_->cache_key_prefix, rep_->cache_key_prefix_size); + char* end = EncodeVarint64(cache_key + kExtraCacheKeyPrefix, + next_cache_key_id_++); + assert(end - cache_key <= + static_cast(kExtraCacheKeyPrefix + kMaxVarint64Length)); + const Slice unique_key(cache_key, static_cast(end - cache_key)); + s = block_cache->Insert(unique_key, nullptr, + block.GetValue()->ApproximateMemoryUsage(), + nullptr, &cache_handle); + + if (s.ok()) { + assert(cache_handle != nullptr); + iter->RegisterCleanup(&ForceReleaseCachedEntry, block_cache, + cache_handle); + } + } + } else { + iter->SetCacheHandle(block.GetCacheHandle()); + } + + block.TransferTo(iter); + + return iter; +} + +// Convert an uncompressed data block (i.e CachableEntry) +// into an iterator over the contents of the corresponding block. +// If input_iter is null, new a iterator +// If input_iter is not null, update this iter and return it +template +TBlockIter* BlockBasedTable::NewDataBlockIterator(const ReadOptions& ro, + CachableEntry& block, + TBlockIter* input_iter, + Status s) const { + PERF_TIMER_GUARD(new_table_block_iter_nanos); + + TBlockIter* iter = input_iter != nullptr ? input_iter : new TBlockIter; + if (!s.ok()) { + iter->Invalidate(s); + return iter; + } + + assert(block.GetValue() != nullptr); + // Block contents are pinned and it is still pinned after the iterator + // is destroyed as long as cleanup functions are moved to another object, + // when: + // 1. block cache handle is set to be released in cleanup function, or + // 2. it's pointing to immortal source. If own_bytes is true then we are + // not reading data from the original source, whether immortal or not. + // Otherwise, the block is pinned iff the source is immortal. + const bool block_contents_pinned = + block.IsCached() || + (!block.GetValue()->own_bytes() && rep_->immortal_table); + iter = InitBlockIterator(rep_, block.GetValue(), BlockType::kData, + iter, block_contents_pinned); + + if (!block.IsCached()) { + if (!ro.fill_cache && rep_->cache_key_prefix_size != 0) { + // insert a dummy record to block cache to track the memory usage + Cache* const block_cache = rep_->table_options.block_cache.get(); + Cache::Handle* cache_handle = nullptr; + // There are two other types of cache keys: 1) SST cache key added in + // `MaybeReadBlockAndLoadToCache` 2) dummy cache key added in + // `write_buffer_manager`. Use longer prefix (41 bytes) to differentiate + // from SST cache key(31 bytes), and use non-zero prefix to + // differentiate from `write_buffer_manager` + const size_t kExtraCacheKeyPrefix = kMaxVarint64Length * 4 + 1; + char cache_key[kExtraCacheKeyPrefix + kMaxVarint64Length]; + // Prefix: use rep_->cache_key_prefix padded by 0s + memset(cache_key, 0, kExtraCacheKeyPrefix + kMaxVarint64Length); + assert(rep_->cache_key_prefix_size != 0); + assert(rep_->cache_key_prefix_size <= kExtraCacheKeyPrefix); + memcpy(cache_key, rep_->cache_key_prefix, rep_->cache_key_prefix_size); + char* end = EncodeVarint64(cache_key + kExtraCacheKeyPrefix, + next_cache_key_id_++); + assert(end - cache_key <= + static_cast(kExtraCacheKeyPrefix + kMaxVarint64Length)); + const Slice unique_key(cache_key, static_cast(end - cache_key)); + s = block_cache->Insert(unique_key, nullptr, + block.GetValue()->ApproximateMemoryUsage(), + nullptr, &cache_handle); + if (s.ok()) { + assert(cache_handle != nullptr); + iter->RegisterCleanup(&ForceReleaseCachedEntry, block_cache, + cache_handle); + } + } + } else { + iter->SetCacheHandle(block.GetCacheHandle()); + } + + block.TransferTo(iter); + return iter; +} +} // namespace ROCKSDB_NAMESPACE diff --git a/table/block_based/hash_index_reader.cc b/table/block_based/hash_index_reader.cc new file mode 100644 index 000000000..c1648bbe1 --- /dev/null +++ b/table/block_based/hash_index_reader.cc @@ -0,0 +1,146 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. +#include "table/block_based/hash_index_reader.h" + +#include "table/block_fetcher.h" +#include "table/meta_blocks.h" + +namespace ROCKSDB_NAMESPACE { +Status HashIndexReader::Create(const BlockBasedTable* table, + FilePrefetchBuffer* prefetch_buffer, + InternalIterator* meta_index_iter, + bool use_cache, bool prefetch, bool pin, + BlockCacheLookupContext* lookup_context, + std::unique_ptr* index_reader) { + assert(table != nullptr); + assert(index_reader != nullptr); + assert(!pin || prefetch); + + const BlockBasedTable::Rep* rep = table->get_rep(); + assert(rep != nullptr); + + CachableEntry index_block; + if (prefetch || !use_cache) { + const Status s = + ReadIndexBlock(table, prefetch_buffer, ReadOptions(), use_cache, + /*get_context=*/nullptr, lookup_context, &index_block); + if (!s.ok()) { + return s; + } + + if (use_cache && !pin) { + index_block.Reset(); + } + } + + // Note, failure to create prefix hash index does not need to be a + // hard error. We can still fall back to the original binary search index. + // So, Create will succeed regardless, from this point on. + + index_reader->reset(new HashIndexReader(table, std::move(index_block))); + + // Get prefixes block + BlockHandle prefixes_handle; + Status s = + FindMetaBlock(meta_index_iter, kHashIndexPrefixesBlock, &prefixes_handle); + if (!s.ok()) { + // TODO: log error + return Status::OK(); + } + + // Get index metadata block + BlockHandle prefixes_meta_handle; + s = FindMetaBlock(meta_index_iter, kHashIndexPrefixesMetadataBlock, + &prefixes_meta_handle); + if (!s.ok()) { + // TODO: log error + return Status::OK(); + } + + RandomAccessFileReader* const file = rep->file.get(); + const Footer& footer = rep->footer; + const ImmutableCFOptions& ioptions = rep->ioptions; + const PersistentCacheOptions& cache_options = rep->persistent_cache_options; + MemoryAllocator* const memory_allocator = + GetMemoryAllocator(rep->table_options); + + // Read contents for the blocks + BlockContents prefixes_contents; + BlockFetcher prefixes_block_fetcher( + file, prefetch_buffer, footer, ReadOptions(), prefixes_handle, + &prefixes_contents, ioptions, true /*decompress*/, + true /*maybe_compressed*/, BlockType::kHashIndexPrefixes, + UncompressionDict::GetEmptyDict(), cache_options, memory_allocator); + s = prefixes_block_fetcher.ReadBlockContents(); + if (!s.ok()) { + return s; + } + BlockContents prefixes_meta_contents; + BlockFetcher prefixes_meta_block_fetcher( + file, prefetch_buffer, footer, ReadOptions(), prefixes_meta_handle, + &prefixes_meta_contents, ioptions, true /*decompress*/, + true /*maybe_compressed*/, BlockType::kHashIndexMetadata, + UncompressionDict::GetEmptyDict(), cache_options, memory_allocator); + s = prefixes_meta_block_fetcher.ReadBlockContents(); + if (!s.ok()) { + // TODO: log error + return Status::OK(); + } + + BlockPrefixIndex* prefix_index = nullptr; + assert(rep->internal_prefix_transform.get() != nullptr); + s = BlockPrefixIndex::Create(rep->internal_prefix_transform.get(), + prefixes_contents.data, + prefixes_meta_contents.data, &prefix_index); + // TODO: log error + if (s.ok()) { + HashIndexReader* const hash_index_reader = + static_cast(index_reader->get()); + hash_index_reader->prefix_index_.reset(prefix_index); + } + + return Status::OK(); +} + +InternalIteratorBase* HashIndexReader::NewIterator( + const ReadOptions& read_options, bool disable_prefix_seek, + IndexBlockIter* iter, GetContext* get_context, + BlockCacheLookupContext* lookup_context) { + const BlockBasedTable::Rep* rep = table()->get_rep(); + const bool no_io = (read_options.read_tier == kBlockCacheTier); + CachableEntry index_block; + const Status s = + GetOrReadIndexBlock(no_io, get_context, lookup_context, &index_block); + if (!s.ok()) { + if (iter != nullptr) { + iter->Invalidate(s); + return iter; + } + + return NewErrorInternalIterator(s); + } + + Statistics* kNullStats = nullptr; + const bool total_order_seek = + read_options.total_order_seek || disable_prefix_seek; + // We don't return pinned data from index blocks, so no need + // to set `block_contents_pinned`. + auto it = index_block.GetValue()->NewIndexIterator( + internal_comparator(), internal_comparator()->user_comparator(), + rep->get_global_seqno(BlockType::kIndex), iter, kNullStats, + total_order_seek, index_has_first_key(), index_key_includes_seq(), + index_value_is_full(), false /* block_contents_pinned */, + prefix_index_.get()); + + assert(it != nullptr); + index_block.TransferTo(it); + + return it; +} +} // namespace ROCKSDB_NAMESPACE diff --git a/table/block_based/hash_index_reader.h b/table/block_based/hash_index_reader.h new file mode 100644 index 000000000..fecd1e5c8 --- /dev/null +++ b/table/block_based/hash_index_reader.h @@ -0,0 +1,49 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. +#pragma once + +#include "table/block_based/index_reader_common.h" + +namespace ROCKSDB_NAMESPACE { +// Index that leverages an internal hash table to quicken the lookup for a given +// key. +class HashIndexReader : public BlockBasedTable::IndexReaderCommon { + public: + static Status Create(const BlockBasedTable* table, + FilePrefetchBuffer* prefetch_buffer, + InternalIterator* meta_index_iter, bool use_cache, + bool prefetch, bool pin, + BlockCacheLookupContext* lookup_context, + std::unique_ptr* index_reader); + + InternalIteratorBase* NewIterator( + const ReadOptions& read_options, bool disable_prefix_seek, + IndexBlockIter* iter, GetContext* get_context, + BlockCacheLookupContext* lookup_context) override; + + size_t ApproximateMemoryUsage() const override { + size_t usage = ApproximateIndexBlockMemoryUsage(); +#ifdef ROCKSDB_MALLOC_USABLE_SIZE + usage += malloc_usable_size(const_cast(this)); +#else + if (prefix_index_) { + usage += prefix_index_->ApproximateMemoryUsage(); + } + usage += sizeof(*this); +#endif // ROCKSDB_MALLOC_USABLE_SIZE + return usage; + } + + private: + HashIndexReader(const BlockBasedTable* t, CachableEntry&& index_block) + : IndexReaderCommon(t, std::move(index_block)) {} + + std::unique_ptr prefix_index_; +}; +} // namespace ROCKSDB_NAMESPACE diff --git a/table/block_based/index_reader_common.cc b/table/block_based/index_reader_common.cc new file mode 100644 index 000000000..76f894d59 --- /dev/null +++ b/table/block_based/index_reader_common.cc @@ -0,0 +1,54 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. +#include "table/block_based/index_reader_common.h" + +namespace ROCKSDB_NAMESPACE { +Status BlockBasedTable::IndexReaderCommon::ReadIndexBlock( + const BlockBasedTable* table, FilePrefetchBuffer* prefetch_buffer, + const ReadOptions& read_options, bool use_cache, GetContext* get_context, + BlockCacheLookupContext* lookup_context, + CachableEntry* index_block) { + PERF_TIMER_GUARD(read_index_block_nanos); + + assert(table != nullptr); + assert(index_block != nullptr); + assert(index_block->IsEmpty()); + + const Rep* const rep = table->get_rep(); + assert(rep != nullptr); + + const Status s = table->RetrieveBlock( + prefetch_buffer, read_options, rep->footer.index_handle(), + UncompressionDict::GetEmptyDict(), index_block, BlockType::kIndex, + get_context, lookup_context, /* for_compaction */ false, use_cache); + + return s; +} + +Status BlockBasedTable::IndexReaderCommon::GetOrReadIndexBlock( + bool no_io, GetContext* get_context, + BlockCacheLookupContext* lookup_context, + CachableEntry* index_block) const { + assert(index_block != nullptr); + + if (!index_block_.IsEmpty()) { + index_block->SetUnownedValue(index_block_.GetValue()); + return Status::OK(); + } + + ReadOptions read_options; + if (no_io) { + read_options.read_tier = kBlockCacheTier; + } + + return ReadIndexBlock(table_, /*prefetch_buffer=*/nullptr, read_options, + cache_index_blocks(), get_context, lookup_context, + index_block); +} +} // namespace ROCKSDB_NAMESPACE diff --git a/table/block_based/index_reader_common.h b/table/block_based/index_reader_common.h new file mode 100644 index 000000000..71174a7d3 --- /dev/null +++ b/table/block_based/index_reader_common.h @@ -0,0 +1,85 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. +#pragma once + +#include "table/block_based/block_based_table_reader.h" + +#include "table/block_based/reader_common.h" + +namespace ROCKSDB_NAMESPACE { +// Encapsulates common functionality for the various index reader +// implementations. Provides access to the index block regardless of whether +// it is owned by the reader or stored in the cache, or whether it is pinned +// in the cache or not. +class BlockBasedTable::IndexReaderCommon : public BlockBasedTable::IndexReader { + public: + IndexReaderCommon(const BlockBasedTable* t, + CachableEntry&& index_block) + : table_(t), index_block_(std::move(index_block)) { + assert(table_ != nullptr); + } + + protected: + static Status ReadIndexBlock(const BlockBasedTable* table, + FilePrefetchBuffer* prefetch_buffer, + const ReadOptions& read_options, bool use_cache, + GetContext* get_context, + BlockCacheLookupContext* lookup_context, + CachableEntry* index_block); + + const BlockBasedTable* table() const { return table_; } + + const InternalKeyComparator* internal_comparator() const { + assert(table_ != nullptr); + assert(table_->get_rep() != nullptr); + + return &table_->get_rep()->internal_comparator; + } + + bool index_has_first_key() const { + assert(table_ != nullptr); + assert(table_->get_rep() != nullptr); + return table_->get_rep()->index_has_first_key; + } + + bool index_key_includes_seq() const { + assert(table_ != nullptr); + assert(table_->get_rep() != nullptr); + return table_->get_rep()->index_key_includes_seq; + } + + bool index_value_is_full() const { + assert(table_ != nullptr); + assert(table_->get_rep() != nullptr); + return table_->get_rep()->index_value_is_full; + } + + bool cache_index_blocks() const { + assert(table_ != nullptr); + assert(table_->get_rep() != nullptr); + return table_->get_rep()->table_options.cache_index_and_filter_blocks; + } + + Status GetOrReadIndexBlock(bool no_io, GetContext* get_context, + BlockCacheLookupContext* lookup_context, + CachableEntry* index_block) const; + + size_t ApproximateIndexBlockMemoryUsage() const { + assert(!index_block_.GetOwnValue() || index_block_.GetValue() != nullptr); + return index_block_.GetOwnValue() + ? index_block_.GetValue()->ApproximateMemoryUsage() + : 0; + } + + private: + const BlockBasedTable* table_; + CachableEntry index_block_; +}; + +} // namespace ROCKSDB_NAMESPACE diff --git a/table/block_based/partitioned_index_reader.cc b/table/block_based/partitioned_index_reader.cc new file mode 100644 index 000000000..aacbb6053 --- /dev/null +++ b/table/block_based/partitioned_index_reader.cc @@ -0,0 +1,175 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. +#include "table/block_based/partitioned_index_reader.h" +#include "table/block_based/block_based_table_iterator.h" + +namespace ROCKSDB_NAMESPACE { +Status PartitionIndexReader::Create( + const BlockBasedTable* table, FilePrefetchBuffer* prefetch_buffer, + bool use_cache, bool prefetch, bool pin, + BlockCacheLookupContext* lookup_context, + std::unique_ptr* index_reader) { + assert(table != nullptr); + assert(table->get_rep()); + assert(!pin || prefetch); + assert(index_reader != nullptr); + + CachableEntry index_block; + if (prefetch || !use_cache) { + const Status s = + ReadIndexBlock(table, prefetch_buffer, ReadOptions(), use_cache, + /*get_context=*/nullptr, lookup_context, &index_block); + if (!s.ok()) { + return s; + } + + if (use_cache && !pin) { + index_block.Reset(); + } + } + + index_reader->reset(new PartitionIndexReader(table, std::move(index_block))); + + return Status::OK(); +} + +InternalIteratorBase* PartitionIndexReader::NewIterator( + const ReadOptions& read_options, bool /* disable_prefix_seek */, + IndexBlockIter* iter, GetContext* get_context, + BlockCacheLookupContext* lookup_context) { + const bool no_io = (read_options.read_tier == kBlockCacheTier); + CachableEntry index_block; + const Status s = + GetOrReadIndexBlock(no_io, get_context, lookup_context, &index_block); + if (!s.ok()) { + if (iter != nullptr) { + iter->Invalidate(s); + return iter; + } + + return NewErrorInternalIterator(s); + } + + const BlockBasedTable::Rep* rep = table()->rep_; + InternalIteratorBase* it = nullptr; + + Statistics* kNullStats = nullptr; + // Filters are already checked before seeking the index + if (!partition_map_.empty()) { + // We don't return pinned data from index blocks, so no need + // to set `block_contents_pinned`. + it = NewTwoLevelIterator( + new BlockBasedTable::PartitionedIndexIteratorState(table(), + &partition_map_), + index_block.GetValue()->NewIndexIterator( + internal_comparator(), internal_comparator()->user_comparator(), + rep->get_global_seqno(BlockType::kIndex), nullptr, kNullStats, true, + index_has_first_key(), index_key_includes_seq(), + index_value_is_full())); + } else { + ReadOptions ro; + ro.fill_cache = read_options.fill_cache; + // We don't return pinned data from index blocks, so no need + // to set `block_contents_pinned`. + it = new BlockBasedTableIterator( + table(), ro, *internal_comparator(), + index_block.GetValue()->NewIndexIterator( + internal_comparator(), internal_comparator()->user_comparator(), + rep->get_global_seqno(BlockType::kIndex), nullptr, kNullStats, true, + index_has_first_key(), index_key_includes_seq(), + index_value_is_full()), + false, true, /* prefix_extractor */ nullptr, BlockType::kIndex, + lookup_context ? lookup_context->caller + : TableReaderCaller::kUncategorized); + } + + assert(it != nullptr); + index_block.TransferTo(it); + + return it; + + // TODO(myabandeh): Update TwoLevelIterator to be able to make use of + // on-stack BlockIter while the state is on heap. Currentlly it assumes + // the first level iter is always on heap and will attempt to delete it + // in its destructor. +} +void PartitionIndexReader::CacheDependencies(bool pin) { + // Before read partitions, prefetch them to avoid lots of IOs + BlockCacheLookupContext lookup_context{TableReaderCaller::kPrefetch}; + const BlockBasedTable::Rep* rep = table()->rep_; + IndexBlockIter biter; + BlockHandle handle; + Statistics* kNullStats = nullptr; + + CachableEntry index_block; + Status s = GetOrReadIndexBlock(false /* no_io */, nullptr /* get_context */, + &lookup_context, &index_block); + if (!s.ok()) { + ROCKS_LOG_WARN(rep->ioptions.info_log, + "Error retrieving top-level index block while trying to " + "cache index partitions: %s", + s.ToString().c_str()); + return; + } + + // We don't return pinned data from index blocks, so no need + // to set `block_contents_pinned`. + index_block.GetValue()->NewIndexIterator( + internal_comparator(), internal_comparator()->user_comparator(), + rep->get_global_seqno(BlockType::kIndex), &biter, kNullStats, true, + index_has_first_key(), index_key_includes_seq(), index_value_is_full()); + // Index partitions are assumed to be consecuitive. Prefetch them all. + // Read the first block offset + biter.SeekToFirst(); + if (!biter.Valid()) { + // Empty index. + return; + } + handle = biter.value().handle; + uint64_t prefetch_off = handle.offset(); + + // Read the last block's offset + biter.SeekToLast(); + if (!biter.Valid()) { + // Empty index. + return; + } + handle = biter.value().handle; + uint64_t last_off = handle.offset() + block_size(handle); + uint64_t prefetch_len = last_off - prefetch_off; + std::unique_ptr prefetch_buffer; + rep->CreateFilePrefetchBuffer(0, 0, &prefetch_buffer); + s = prefetch_buffer->Prefetch(rep->file.get(), prefetch_off, + static_cast(prefetch_len)); + + // After prefetch, read the partitions one by one + biter.SeekToFirst(); + auto ro = ReadOptions(); + for (; biter.Valid(); biter.Next()) { + handle = biter.value().handle; + CachableEntry block; + // TODO: Support counter batch update for partitioned index and + // filter blocks + s = table()->MaybeReadBlockAndLoadToCache( + prefetch_buffer.get(), ro, handle, UncompressionDict::GetEmptyDict(), + &block, BlockType::kIndex, /*get_context=*/nullptr, &lookup_context, + /*contents=*/nullptr); + + assert(s.ok() || block.GetValue() == nullptr); + if (s.ok() && block.GetValue() != nullptr) { + if (block.IsCached()) { + if (pin) { + partition_map_[handle.offset()] = std::move(block); + } + } + } + } +} + +} // namespace ROCKSDB_NAMESPACE diff --git a/table/block_based/partitioned_index_reader.h b/table/block_based/partitioned_index_reader.h new file mode 100644 index 000000000..86397fd58 --- /dev/null +++ b/table/block_based/partitioned_index_reader.h @@ -0,0 +1,51 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. +#pragma once +#include "table/block_based/index_reader_common.h" + +namespace ROCKSDB_NAMESPACE { +// Index that allows binary search lookup in a two-level index structure. +class PartitionIndexReader : public BlockBasedTable::IndexReaderCommon { + public: + // Read the partition index from the file and create an instance for + // `PartitionIndexReader`. + // On success, index_reader will be populated; otherwise it will remain + // unmodified. + static Status Create(const BlockBasedTable* table, + FilePrefetchBuffer* prefetch_buffer, bool use_cache, + bool prefetch, bool pin, + BlockCacheLookupContext* lookup_context, + std::unique_ptr* index_reader); + + // return a two-level iterator: first level is on the partition index + InternalIteratorBase* NewIterator( + const ReadOptions& read_options, bool /* disable_prefix_seek */, + IndexBlockIter* iter, GetContext* get_context, + BlockCacheLookupContext* lookup_context) override; + + void CacheDependencies(bool pin) override; + size_t ApproximateMemoryUsage() const override { + size_t usage = ApproximateIndexBlockMemoryUsage(); +#ifdef ROCKSDB_MALLOC_USABLE_SIZE + usage += malloc_usable_size(const_cast(this)); +#else + usage += sizeof(*this); +#endif // ROCKSDB_MALLOC_USABLE_SIZE + // TODO(myabandeh): more accurate estimate of partition_map_ mem usage + return usage; + } + + private: + PartitionIndexReader(const BlockBasedTable* t, + CachableEntry&& index_block) + : IndexReaderCommon(t, std::move(index_block)) {} + + std::unordered_map> partition_map_; +}; +} // namespace ROCKSDB_NAMESPACE diff --git a/table/block_based/reader_common.cc b/table/block_based/reader_common.cc new file mode 100644 index 000000000..4164e4ce1 --- /dev/null +++ b/table/block_based/reader_common.cc @@ -0,0 +1,47 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. +#include "table/block_based/reader_common.h" + +#include "util/crc32c.h" +#include "util/xxhash.h" + +namespace ROCKSDB_NAMESPACE { +void ForceReleaseCachedEntry(void* arg, void* h) { + Cache* cache = reinterpret_cast(arg); + Cache::Handle* handle = reinterpret_cast(h); + cache->Release(handle, true /* force_erase */); +} + +Status VerifyChecksum(const ChecksumType type, const char* buf, size_t len, + uint32_t expected) { + Status s; + uint32_t actual = 0; + switch (type) { + case kNoChecksum: + break; + case kCRC32c: + expected = crc32c::Unmask(expected); + actual = crc32c::Value(buf, len); + break; + case kxxHash: + actual = XXH32(buf, static_cast(len), 0); + break; + case kxxHash64: + actual = static_cast(XXH64(buf, static_cast(len), 0) & + uint64_t{0xffffffff}); + break; + default: + s = Status::Corruption("unknown checksum type"); + } + if (s.ok() && actual != expected) { + s = Status::Corruption("properties block checksum mismatched"); + } + return s; +} +} // namespace ROCKSDB_NAMESPACE diff --git a/table/block_based/reader_common.h b/table/block_based/reader_common.h new file mode 100644 index 000000000..8fa68d49c --- /dev/null +++ b/table/block_based/reader_common.h @@ -0,0 +1,33 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. +#pragma once + +#include "rocksdb/table.h" + +namespace ROCKSDB_NAMESPACE { +// Release the cached entry and decrement its ref count. +extern void ForceReleaseCachedEntry(void* arg, void* h); + +inline MemoryAllocator* GetMemoryAllocator( + const BlockBasedTableOptions& table_options) { + return table_options.block_cache.get() + ? table_options.block_cache->memory_allocator() + : nullptr; +} + +inline MemoryAllocator* GetMemoryAllocatorForCompressedBlock( + const BlockBasedTableOptions& table_options) { + return table_options.block_cache_compressed.get() + ? table_options.block_cache_compressed->memory_allocator() + : nullptr; +} + +extern Status VerifyChecksum(const ChecksumType type, const char* buf, + size_t len, uint32_t expected); +} // namespace ROCKSDB_NAMESPACE