diff --git a/HISTORY.md b/HISTORY.md index 40d11096d..55366b006 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -2,6 +2,8 @@ ## Unreleased ### Public API Change * Now DB::Close() will return Aborted() error when there is unreleased snapshot. Users can retry after all snapshots are released. +* Partitions of partitioned indexes no longer affect the read amplification statistics. +* Due to a refactoring, block cache eviction statistics for indexes are temporarily broken. We plan to reintroduce them in a later phase. ### New Features * Add an option `snap_refresh_nanos` (default to 0.1s) to periodically refresh the snapshot list in compaction jobs. Assign to 0 to disable the feature. diff --git a/db/db_block_cache_test.cc b/db/db_block_cache_test.cc index f6e1aad32..8eb73a23d 100644 --- a/db/db_block_cache_test.cc +++ b/db/db_block_cache_test.cc @@ -365,7 +365,10 @@ TEST_F(DBBlockCacheTest, IndexAndFilterBlocksStats) { ASSERT_EQ(cache->GetUsage(), index_bytes_insert + filter_bytes_insert); // set the cache capacity to the current usage cache->SetCapacity(index_bytes_insert + filter_bytes_insert); - ASSERT_EQ(TestGetTickerCount(options, BLOCK_CACHE_INDEX_BYTES_EVICT), 0); + // The index eviction statistics were broken by the refactoring that moved + // the index readers out of the block cache. Disabling these until we can + // bring the stats back. + // ASSERT_EQ(TestGetTickerCount(options, BLOCK_CACHE_INDEX_BYTES_EVICT), 0); ASSERT_EQ(TestGetTickerCount(options, BLOCK_CACHE_FILTER_BYTES_EVICT), 0); // Note that the second key needs to be no longer than the first one. // Otherwise the second index block may not fit in cache. @@ -377,8 +380,11 @@ TEST_F(DBBlockCacheTest, IndexAndFilterBlocksStats) { index_bytes_insert); ASSERT_GT(TestGetTickerCount(options, BLOCK_CACHE_FILTER_BYTES_INSERT), filter_bytes_insert); - ASSERT_EQ(TestGetTickerCount(options, BLOCK_CACHE_INDEX_BYTES_EVICT), - index_bytes_insert); + // The index eviction statistics were broken by the refactoring that moved + // the index readers out of the block cache. Disabling these until we can + // bring the stats back. + // ASSERT_EQ(TestGetTickerCount(options, BLOCK_CACHE_INDEX_BYTES_EVICT), + // index_bytes_insert); ASSERT_EQ(TestGetTickerCount(options, BLOCK_CACHE_FILTER_BYTES_EVICT), filter_bytes_insert); } diff --git a/table/block_based_table_reader.cc b/table/block_based_table_reader.cc index a45fc0a5b..82f964926 100644 --- a/table/block_based_table_reader.cc +++ b/table/block_based_table_reader.cc @@ -120,16 +120,8 @@ void DeleteCachedEntry(const Slice& /*key*/, void* value) { } void DeleteCachedFilterEntry(const Slice& key, void* value); -void DeleteCachedIndexEntry(const Slice& key, void* value); void DeleteCachedUncompressionDictEntry(const Slice& key, void* value); -// Release the cached entry and decrement its ref count. -void ReleaseCachedEntry(void* arg, void* h) { - Cache* cache = reinterpret_cast(arg); - Cache::Handle* handle = reinterpret_cast(h); - cache->Release(handle); -} - // Release the cached entry and decrement its ref count. 
void ForceReleaseCachedEntry(void* arg, void* h) { Cache* cache = reinterpret_cast<Cache*>(arg); @@ -137,17 +129,6 @@ void ForceReleaseCachedEntry(void* arg, void* h) { cache->Release(handle, true /* force_erase */); } -Slice GetCacheKeyFromOffset(const char* cache_key_prefix, - size_t cache_key_prefix_size, uint64_t offset, - char* cache_key) { - assert(cache_key != nullptr); - assert(cache_key_prefix_size != 0); - assert(cache_key_prefix_size <= BlockBasedTable::kMaxCacheKeyPrefixSize); - memcpy(cache_key, cache_key_prefix, cache_key_prefix_size); - char* end = EncodeVarint64(cache_key + cache_key_prefix_size, offset); - return Slice(cache_key, static_cast<size_t>(end - cache_key)); -} - Cache::Handle* GetEntryFromCache(Cache* block_cache, const Slice& key, int level, Tickers block_cache_miss_ticker, Tickers block_cache_hit_ticker, @@ -217,70 +198,193 @@ bool PrefixExtractorChanged(const TableProperties* table_properties, } // namespace +// Encapsulates common functionality for the various index reader +// implementations. Provides access to the index block regardless of whether +// it is owned by the reader or stored in the cache, or whether it is pinned +// in the cache or not. +class BlockBasedTable::IndexReaderCommon : public BlockBasedTable::IndexReader { +public: + IndexReaderCommon(BlockBasedTable* t, + CachableEntry<Block>&& index_block) + : table_(t) + , index_block_(std::move(index_block)) + { + assert(table_ != nullptr); + } + +protected: + static Status ReadIndexBlock(BlockBasedTable* table, + FilePrefetchBuffer* prefetch_buffer, const ReadOptions& read_options, + GetContext* get_context, CachableEntry<Block>* index_block); + + BlockBasedTable* table() const { return table_; } + + const InternalKeyComparator* internal_comparator() const { + assert(table_ != nullptr); + assert(table_->get_rep() != nullptr); + + return &table_->get_rep()->internal_comparator; + } + + bool index_key_includes_seq() const { + assert(table_ != nullptr); + assert(table_->get_rep() != nullptr); + + const TableProperties* const properties = + table_->get_rep()->table_properties.get(); + + return properties == nullptr || !properties->index_key_is_user_key; + } + + bool index_value_is_full() const { + assert(table_ != nullptr); + assert(table_->get_rep() != nullptr); + + const TableProperties* const properties = + table_->get_rep()->table_properties.get(); + + return properties == nullptr || !properties->index_value_is_delta_encoded; + } + + Status GetOrReadIndexBlock(const ReadOptions& read_options, + GetContext* get_context, + CachableEntry<Block>* index_block) const; + + size_t ApproximateIndexBlockMemoryUsage() const { + assert(!index_block_.GetOwnValue() || index_block_.GetValue() != nullptr); + return index_block_.GetOwnValue() ?
+ index_block_.GetValue()->ApproximateMemoryUsage() : 0; + } + +private: + BlockBasedTable* table_; + CachableEntry<Block> index_block_; +}; + +Status BlockBasedTable::IndexReaderCommon::ReadIndexBlock( + BlockBasedTable* table, FilePrefetchBuffer* prefetch_buffer, + const ReadOptions& read_options, GetContext* get_context, + CachableEntry<Block>* index_block) { + + PERF_TIMER_GUARD(read_index_block_nanos); + + assert(table != nullptr); + assert(index_block != nullptr); + assert(index_block->IsEmpty()); + + const Rep* const rep = table->get_rep(); + assert(rep != nullptr); + + constexpr bool is_index = true; + const Status s = BlockBasedTable::RetrieveBlock(prefetch_buffer, + rep, read_options, rep->footer.index_handle(), + UncompressionDict::GetEmptyDict(), index_block, is_index, get_context); + + return s; +} + +Status BlockBasedTable::IndexReaderCommon::GetOrReadIndexBlock( + const ReadOptions& read_options, GetContext* get_context, + CachableEntry<Block>* index_block) const { + + assert(index_block != nullptr); + + if (!index_block_.IsEmpty()) { + *index_block = CachableEntry<Block>(index_block_.GetValue(), + nullptr /* cache */, nullptr /* cache_handle */, false /* own_value */); + return Status::OK(); + } + + return ReadIndexBlock(table_, nullptr /* prefetch_buffer */, + read_options, get_context, index_block); +} + // Index that allows binary search lookup in a two-level index structure. -class PartitionIndexReader : public IndexReader { +class PartitionIndexReader : public BlockBasedTable::IndexReaderCommon { public: // Read the partition index from the file and create an instance for // `PartitionIndexReader`. // On success, index_reader will be populated; otherwise it will remain // unmodified. - static Status Create(BlockBasedTable* table, RandomAccessFileReader* file, - FilePrefetchBuffer* prefetch_buffer, - const Footer& footer, const BlockHandle& index_handle, - const ImmutableCFOptions& ioptions, - const InternalKeyComparator* icomparator, - IndexReader** index_reader, - const PersistentCacheOptions& cache_options, - const int level, const bool index_key_includes_seq, - const bool index_value_is_full, - MemoryAllocator* memory_allocator) { - std::unique_ptr<Block> index_block; - auto s = ReadBlockFromFile( - file, prefetch_buffer, footer, ReadOptions(), index_handle, - &index_block, ioptions, true /* decompress */, - true /*maybe_compressed*/, UncompressionDict::GetEmptyDict(), - cache_options, kDisableGlobalSequenceNumber, - 0 /* read_amp_bytes_per_bit */, memory_allocator); + static Status Create(BlockBasedTable* table, + FilePrefetchBuffer* prefetch_buffer, bool use_cache, + bool prefetch, bool pin, IndexReader** index_reader) { + assert(table != nullptr); + assert(table->get_rep()); + assert(!pin || prefetch); + assert(index_reader != nullptr); + + CachableEntry<Block> index_block; + if (prefetch || !use_cache) { + const Status s = ReadIndexBlock(table, prefetch_buffer, ReadOptions(), + nullptr /* get_context */, &index_block); + if (!s.ok()) { + return s; + } - if (s.ok()) { - *index_reader = new PartitionIndexReader( - table, icomparator, std::move(index_block), ioptions.statistics, - level, index_key_includes_seq, index_value_is_full); + if (use_cache && !pin) { + index_block.Reset(); + } } - return s; + *index_reader = new PartitionIndexReader(table, std::move(index_block)); + + return Status::OK(); } // return a two-level iterator: first level is on the partition index InternalIteratorBase<BlockHandle>* NewIterator( - IndexBlockIter* /*iter*/ = nullptr, bool /*dont_care*/ = true, - bool fill_cache = true) override { + 
const ReadOptions& read_options, bool /* disable_prefix_seek */, + IndexBlockIter* iter, GetContext* get_context) override { + + CachableEntry index_block; + const Status s = GetOrReadIndexBlock(read_options, get_context, + &index_block); + if (!s.ok()) { + if (iter != nullptr) { + iter->Invalidate(s); + return iter; + } + + return NewErrorInternalIterator(s); + } + + InternalIteratorBase* it = nullptr; + Statistics* kNullStats = nullptr; // Filters are already checked before seeking the index if (!partition_map_.empty()) { - // We don't return pinned datat from index blocks, so no need + // We don't return pinned data from index blocks, so no need // to set `block_contents_pinned`. - return NewTwoLevelIterator( + it = NewTwoLevelIterator( new BlockBasedTable::PartitionedIndexIteratorState( - table_, &partition_map_, index_key_includes_seq_, - index_value_is_full_), - index_block_->NewIterator( - icomparator_, icomparator_->user_comparator(), nullptr, - kNullStats, true, index_key_includes_seq_, index_value_is_full_)); + table(), &partition_map_, index_key_includes_seq(), + index_value_is_full()), + index_block.GetValue()->NewIterator( + internal_comparator(), internal_comparator()->user_comparator(), + nullptr, kNullStats, true, index_key_includes_seq(), + index_value_is_full())); } else { - auto ro = ReadOptions(); - ro.fill_cache = fill_cache; - bool kIsIndex = true; - // We don't return pinned datat from index blocks, so no need + ReadOptions ro; + ro.fill_cache = read_options.fill_cache; + constexpr bool is_index = true; + // We don't return pinned data from index blocks, so no need // to set `block_contents_pinned`. - return new BlockBasedTableIterator( - table_, ro, *icomparator_, - index_block_->NewIterator( - icomparator_, icomparator_->user_comparator(), nullptr, - kNullStats, true, index_key_includes_seq_, index_value_is_full_), - false, true, /* prefix_extractor */ nullptr, kIsIndex, - index_key_includes_seq_, index_value_is_full_); + it = new BlockBasedTableIterator( + table(), ro, *internal_comparator(), + index_block.GetValue()->NewIterator( + internal_comparator(), internal_comparator()->user_comparator(), + nullptr, kNullStats, true, index_key_includes_seq(), + index_value_is_full()), + false, true, /* prefix_extractor */ nullptr, is_index, + index_key_includes_seq(), index_value_is_full()); } + + assert(it != nullptr); + index_block.TransferTo(it); + + return it; + // TODO(myabandeh): Update TwoLevelIterator to be able to make use of // on-stack BlockIter while the state is on heap. Currentlly it assumes // the first level iter is always on heap and will attempt to delete it @@ -289,15 +393,26 @@ class PartitionIndexReader : public IndexReader { void CacheDependencies(bool pin) override { // Before read partitions, prefetch them to avoid lots of IOs - auto rep = table_->rep_; + auto rep = table()->rep_; IndexBlockIter biter; BlockHandle handle; Statistics* kNullStats = nullptr; - // We don't return pinned datat from index blocks, so no need + + CachableEntry index_block; + Status s = GetOrReadIndexBlock(ReadOptions(), nullptr /* get_context */, + &index_block); + if (!s.ok()) { + ROCKS_LOG_WARN(rep->ioptions.info_log, + "Error retrieving top-level index block while trying to " + "cache index partitions: %s", s.ToString().c_str()); + return; + } + + // We don't return pinned data from index blocks, so no need // to set `block_contents_pinned`. 
- index_block_->NewIterator( - icomparator_, icomparator_->user_comparator(), &biter, kNullStats, true, - index_key_includes_seq_, index_value_is_full_); + index_block.GetValue()->NewIterator( + internal_comparator(), internal_comparator()->user_comparator(), &biter, + kNullStats, true, index_key_includes_seq(), index_value_is_full()); // Index partitions are assumed to be consecuitive. Prefetch them all. // Read the first block offset biter.SeekToFirst(); @@ -318,10 +433,10 @@ class PartitionIndexReader : public IndexReader { uint64_t last_off = handle.offset() + handle.size() + kBlockTrailerSize; uint64_t prefetch_len = last_off - prefetch_off; std::unique_ptr prefetch_buffer; - auto& file = table_->rep_->file; + auto& file = rep->file; prefetch_buffer.reset(new FilePrefetchBuffer()); - Status s = prefetch_buffer->Prefetch(file.get(), prefetch_off, - static_cast(prefetch_len)); + s = prefetch_buffer->Prefetch(file.get(), prefetch_off, + static_cast(prefetch_len)); // After prefetch, read the partitions one by one biter.SeekToFirst(); @@ -332,7 +447,7 @@ class PartitionIndexReader : public IndexReader { const bool is_index = true; // TODO: Support counter batch update for partitioned index and // filter blocks - s = table_->MaybeReadBlockAndLoadToCache( + s = BlockBasedTable::MaybeReadBlockAndLoadToCache( prefetch_buffer.get(), rep, ro, handle, UncompressionDict::GetEmptyDict(), &block, is_index, nullptr /* get_context */); @@ -348,12 +463,8 @@ class PartitionIndexReader : public IndexReader { } } - size_t size() const override { return index_block_->size(); } - size_t usable_size() const override { return index_block_->usable_size(); } - size_t ApproximateMemoryUsage() const override { - assert(index_block_); - size_t usage = index_block_->ApproximateMemoryUsage(); + size_t usage = ApproximateIndexBlockMemoryUsage(); #ifdef ROCKSDB_MALLOC_USABLE_SIZE usage += malloc_usable_size((void*)this); #else @@ -364,78 +475,79 @@ class PartitionIndexReader : public IndexReader { } private: - PartitionIndexReader(BlockBasedTable* table, - const InternalKeyComparator* icomparator, - std::unique_ptr&& index_block, Statistics* stats, - const int /*level*/, const bool index_key_includes_seq, - const bool index_value_is_full) - : IndexReader(icomparator, stats), - table_(table), - index_block_(std::move(index_block)), - index_key_includes_seq_(index_key_includes_seq), - index_value_is_full_(index_value_is_full) { - assert(index_block_ != nullptr); - } - BlockBasedTable* table_; - std::unique_ptr index_block_; + PartitionIndexReader(BlockBasedTable* t, + CachableEntry&& index_block) + : IndexReaderCommon(t, std::move(index_block)) + {} + std::unordered_map> partition_map_; - const bool index_key_includes_seq_; - const bool index_value_is_full_; }; // Index that allows binary search lookup for the first key of each block. // This class can be viewed as a thin wrapper for `Block` class which already // supports binary search. -class BinarySearchIndexReader : public IndexReader { +class BinarySearchIndexReader : public BlockBasedTable::IndexReaderCommon { public: // Read index from the file and create an intance for // `BinarySearchIndexReader`. // On success, index_reader will be populated; otherwise it will remain // unmodified. 
- static Status Create(RandomAccessFileReader* file, - FilePrefetchBuffer* prefetch_buffer, - const Footer& footer, const BlockHandle& index_handle, - const ImmutableCFOptions& ioptions, - const InternalKeyComparator* icomparator, - IndexReader** index_reader, - const PersistentCacheOptions& cache_options, - const bool index_key_includes_seq, - const bool index_value_is_full, - MemoryAllocator* memory_allocator) { - std::unique_ptr index_block; - auto s = ReadBlockFromFile( - file, prefetch_buffer, footer, ReadOptions(), index_handle, - &index_block, ioptions, true /* decompress */, - true /*maybe_compressed*/, UncompressionDict::GetEmptyDict(), - cache_options, kDisableGlobalSequenceNumber, - 0 /* read_amp_bytes_per_bit */, memory_allocator); + static Status Create(BlockBasedTable* table, + FilePrefetchBuffer* prefetch_buffer, bool use_cache, + bool prefetch, bool pin, IndexReader** index_reader) { + assert(table != nullptr); + assert(table->get_rep()); + assert(!pin || prefetch); + assert(index_reader != nullptr); + + CachableEntry index_block; + if (prefetch || !use_cache) { + const Status s = ReadIndexBlock(table, prefetch_buffer, ReadOptions(), + nullptr /* get_context */, &index_block); + if (!s.ok()) { + return s; + } - if (s.ok()) { - *index_reader = new BinarySearchIndexReader( - icomparator, std::move(index_block), ioptions.statistics, - index_key_includes_seq, index_value_is_full); + if (use_cache && !pin) { + index_block.Reset(); + } } - return s; + *index_reader = new BinarySearchIndexReader(table, std::move(index_block)); + + return Status::OK(); } InternalIteratorBase* NewIterator( - IndexBlockIter* iter = nullptr, bool /*dont_care*/ = true, - bool /*dont_care*/ = true) override { + const ReadOptions& read_options, bool /* disable_prefix_seek */, + IndexBlockIter* iter, GetContext* get_context) override { + CachableEntry index_block; + const Status s = GetOrReadIndexBlock(read_options, get_context, + &index_block); + if (!s.ok()) { + if (iter != nullptr) { + iter->Invalidate(s); + return iter; + } + + return NewErrorInternalIterator(s); + } + Statistics* kNullStats = nullptr; - // We don't return pinned datat from index blocks, so no need + // We don't return pinned data from index blocks, so no need // to set `block_contents_pinned`. 
- return index_block_->NewIterator( - icomparator_, icomparator_->user_comparator(), iter, kNullStats, true, - index_key_includes_seq_, index_value_is_full_); - } + auto it = index_block.GetValue()->NewIterator( + internal_comparator(), internal_comparator()->user_comparator(), iter, + kNullStats, true, index_key_includes_seq(), index_value_is_full()); - size_t size() const override { return index_block_->size(); } - size_t usable_size() const override { return index_block_->usable_size(); } + assert(it != nullptr); + index_block.TransferTo(it); + + return it; + } size_t ApproximateMemoryUsage() const override { - assert(index_block_); - size_t usage = index_block_->ApproximateMemoryUsage(); + size_t usage = ApproximateIndexBlockMemoryUsage(); #ifdef ROCKSDB_MALLOC_USABLE_SIZE usage += malloc_usable_size((void*)this); #else @@ -445,60 +557,51 @@ class BinarySearchIndexReader : public IndexReader { } private: - BinarySearchIndexReader(const InternalKeyComparator* icomparator, - std::unique_ptr&& index_block, - Statistics* stats, const bool index_key_includes_seq, - const bool index_value_is_full) - : IndexReader(icomparator, stats), - index_block_(std::move(index_block)), - index_key_includes_seq_(index_key_includes_seq), - index_value_is_full_(index_value_is_full) { - assert(index_block_ != nullptr); - } - std::unique_ptr index_block_; - const bool index_key_includes_seq_; - const bool index_value_is_full_; + BinarySearchIndexReader(BlockBasedTable* t, + CachableEntry&& index_block) + : IndexReaderCommon(t, std::move(index_block)) + {} }; // Index that leverages an internal hash table to quicken the lookup for a given // key. -class HashIndexReader : public IndexReader { +class HashIndexReader : public BlockBasedTable::IndexReaderCommon { public: - static Status Create( - const SliceTransform* hash_key_extractor, const Footer& footer, - RandomAccessFileReader* file, FilePrefetchBuffer* prefetch_buffer, - const ImmutableCFOptions& ioptions, - const InternalKeyComparator* icomparator, const BlockHandle& index_handle, - InternalIterator* meta_index_iter, IndexReader** index_reader, - bool /*hash_index_allow_collision*/, - const PersistentCacheOptions& cache_options, - const bool index_key_includes_seq, const bool index_value_is_full, - MemoryAllocator* memory_allocator) { - std::unique_ptr index_block; - auto s = ReadBlockFromFile( - file, prefetch_buffer, footer, ReadOptions(), index_handle, - &index_block, ioptions, true /* decompress */, - true /*maybe_compressed*/, UncompressionDict::GetEmptyDict(), - cache_options, kDisableGlobalSequenceNumber, - 0 /* read_amp_bytes_per_bit */, memory_allocator); + static Status Create(BlockBasedTable* table, + FilePrefetchBuffer* prefetch_buffer, + InternalIterator* meta_index_iter, bool use_cache, + bool prefetch, bool pin, IndexReader** index_reader) { + assert(table != nullptr); + assert(index_reader != nullptr); + assert(!pin || prefetch); + + auto rep = table->get_rep(); + assert(rep != nullptr); + + CachableEntry index_block; + if (prefetch || !use_cache) { + const Status s = ReadIndexBlock(table, prefetch_buffer, ReadOptions(), + nullptr /* get_context */, &index_block); + if (!s.ok()) { + return s; + } - if (!s.ok()) { - return s; + if (use_cache && !pin) { + index_block.Reset(); + } } // Note, failure to create prefix hash index does not need to be a // hard error. We can still fall back to the original binary search index. // So, Create will succeed regardless, from this point on. 
- auto new_index_reader = new HashIndexReader( - icomparator, std::move(index_block), ioptions.statistics, - index_key_includes_seq, index_value_is_full); + auto new_index_reader = new HashIndexReader(table, std::move(index_block)); *index_reader = new_index_reader; // Get prefixes block BlockHandle prefixes_handle; - s = FindMetaBlock(meta_index_iter, kHashIndexPrefixesBlock, - &prefixes_handle); + Status s = FindMetaBlock(meta_index_iter, kHashIndexPrefixesBlock, + &prefixes_handle); if (!s.ok()) { // TODO: log error return Status::OK(); @@ -513,6 +616,13 @@ class HashIndexReader : public IndexReader { return Status::OK(); } + RandomAccessFileReader* const file = rep->file.get(); + const Footer& footer = rep->footer; + const ImmutableCFOptions& ioptions = rep->ioptions; + const PersistentCacheOptions& cache_options = rep->persistent_cache_options; + MemoryAllocator* const memory_allocator = + GetMemoryAllocator(rep->table_options); + // Read contents for the blocks BlockContents prefixes_contents; BlockFetcher prefixes_block_fetcher( @@ -537,7 +647,8 @@ class HashIndexReader : public IndexReader { } BlockPrefixIndex* prefix_index = nullptr; - s = BlockPrefixIndex::Create(hash_key_extractor, prefixes_contents.data, + s = BlockPrefixIndex::Create(rep->internal_prefix_transform.get(), + prefixes_contents.data, prefixes_meta_contents.data, &prefix_index); // TODO: log error if (s.ok()) { @@ -548,24 +659,39 @@ class HashIndexReader : public IndexReader { } InternalIteratorBase* NewIterator( - IndexBlockIter* iter = nullptr, bool total_order_seek = true, - bool /*dont_care*/ = true) override { + const ReadOptions& read_options, bool disable_prefix_seek, + IndexBlockIter* iter, GetContext* get_context) override { + CachableEntry index_block; + const Status s = GetOrReadIndexBlock(read_options, get_context, + &index_block); + if (!s.ok()) { + if (iter != nullptr) { + iter->Invalidate(s); + return iter; + } + + return NewErrorInternalIterator(s); + } + Statistics* kNullStats = nullptr; - // We don't return pinned datat from index blocks, so no need + const bool total_order_seek = read_options.total_order_seek || + disable_prefix_seek; + // We don't return pinned data from index blocks, so no need // to set `block_contents_pinned`. 
- return index_block_->NewIterator( - icomparator_, icomparator_->user_comparator(), iter, kNullStats, - total_order_seek, index_key_includes_seq_, index_value_is_full_, - false /* block_contents_pinned */, prefix_index_.get()); - } + auto it = index_block.GetValue()->NewIterator( + internal_comparator(), internal_comparator()->user_comparator(), iter, + kNullStats, total_order_seek, index_key_includes_seq(), + index_value_is_full(), false /* block_contents_pinned */, + prefix_index_.get()); - size_t size() const override { return index_block_->size(); } - size_t usable_size() const override { return index_block_->usable_size(); } + assert(it != nullptr); + index_block.TransferTo(it); + + return it; + } size_t ApproximateMemoryUsage() const override { - assert(index_block_); - size_t usage = index_block_->ApproximateMemoryUsage(); - usage += prefixes_contents_.usable_size(); + size_t usage = ApproximateIndexBlockMemoryUsage(); #ifdef ROCKSDB_MALLOC_USABLE_SIZE usage += malloc_usable_size((void*)this); #else @@ -578,37 +704,22 @@ class HashIndexReader : public IndexReader { } private: - HashIndexReader(const InternalKeyComparator* icomparator, - std::unique_ptr&& index_block, Statistics* stats, - const bool index_key_includes_seq, - const bool index_value_is_full) - : IndexReader(icomparator, stats), - index_block_(std::move(index_block)), - index_key_includes_seq_(index_key_includes_seq), - index_value_is_full_(index_value_is_full) { - assert(index_block_ != nullptr); - } + HashIndexReader(BlockBasedTable* t, + CachableEntry&& index_block) + : IndexReaderCommon(t, std::move(index_block)) + {} - ~HashIndexReader() override {} - - std::unique_ptr index_block_; std::unique_ptr prefix_index_; - BlockContents prefixes_contents_; - const bool index_key_includes_seq_; - const bool index_value_is_full_; }; // Helper function to setup the cache key's prefix for the Table. -void BlockBasedTable::SetupCacheKeyPrefix(Rep* rep, uint64_t file_size) { +void BlockBasedTable::SetupCacheKeyPrefix(Rep* rep) { assert(kMaxCacheKeyPrefixSize >= 10); rep->cache_key_prefix_size = 0; rep->compressed_cache_key_prefix_size = 0; if (rep->table_options.block_cache != nullptr) { GenerateCachePrefix(rep->table_options.block_cache.get(), rep->file->file(), &rep->cache_key_prefix[0], &rep->cache_key_prefix_size); - // Create dummy offset of index reader which is beyond the file size. - rep->dummy_index_reader_offset = - file_size + rep->table_options.block_cache->NewId(); } if (rep->table_options.persistent_cache != nullptr) { GenerateCachePrefix(/*cache=*/nullptr, rep->file->file(), @@ -814,7 +925,7 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions, // handle prefix correctly. 
rep->internal_prefix_transform.reset( new InternalKeySliceTransform(prefix_extractor)); - SetupCacheKeyPrefix(rep, file_size); + SetupCacheKeyPrefix(rep); std::unique_ptr<BlockBasedTable> new_table(new BlockBasedTable(rep)); // page cache options @@ -848,9 +959,8 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions, return s; } s = PrefetchIndexAndFilterBlocks(rep, prefetch_buffer.get(), meta_iter.get(), - new_table.get(), prefix_extractor, - prefetch_all, table_options, level, - prefetch_index_and_filter_in_cache); + new_table.get(), prefetch_all, table_options, + level); if (s.ok()) { // Update tail prefetch stats @@ -1116,9 +1226,8 @@ Status BlockBasedTable::ReadCompressionDictBlock( Status BlockBasedTable::PrefetchIndexAndFilterBlocks( Rep* rep, FilePrefetchBuffer* prefetch_buffer, InternalIterator* meta_iter, - BlockBasedTable* new_table, const SliceTransform* prefix_extractor, - bool prefetch_all, const BlockBasedTableOptions& table_options, - const int level, const bool prefetch_index_and_filter_in_cache) { + BlockBasedTable* new_table, bool prefetch_all, + const BlockBasedTableOptions& table_options, const int level) { Status s; // Find filter handle and filter type @@ -1157,10 +1266,10 @@ Status BlockBasedTable::PrefetchIndexAndFilterBlocks( &rep->compression_dict_handle); } - bool need_upper_bound_check = - PrefixExtractorChanged(rep->table_properties.get(), prefix_extractor); - BlockBasedTableOptions::IndexType index_type = new_table->UpdateIndexType(); + + const bool use_cache = table_options.cache_index_and_filter_blocks; + // prefetch the first level of index const bool prefetch_index = prefetch_all || @@ -1183,39 +1292,34 @@ const bool pin_filter = pin_all || (table_options.pin_top_level_index_and_filter && rep->filter_type == Rep::FilterType::kPartitionedFilter); + + IndexReader* index_reader = nullptr; + if (s.ok()) { + s = new_table->CreateIndexReader(prefetch_buffer, meta_iter, use_cache, + prefetch_index, pin_index, &index_reader); + if (s.ok()) { + assert(index_reader != nullptr); + rep->index_reader.reset(index_reader); + // The partitions of partitioned index are always stored in cache. They + // hence follow the configuration for pin and prefetch regardless of + // the value of cache_index_and_filter_blocks + if (prefetch_all) { + rep->index_reader->CacheDependencies(pin_all); + } + } else { + delete index_reader; + index_reader = nullptr; + } + } + // pre-fetching of blocks is turned on // Will use block cache for meta-blocks access // Always prefetch index and filter for level 0 // TODO(ajkr): also prefetch compression dictionary block + // TODO(ajkr): also pin compression dictionary block when + // `pin_l0_filter_and_index_blocks_in_cache == true`. if (table_options.cache_index_and_filter_blocks) { assert(table_options.block_cache != nullptr); - if (prefetch_index) { - // Hack: Call NewIndexIterator() to implicitly add index to the - // block_cache - CachableEntry<IndexReader> index_entry; - // check prefix_extractor match only if hash based index is used - bool disable_prefix_seek = - rep->index_type == BlockBasedTableOptions::kHashSearch && - need_upper_bound_check; - if (s.ok()) { - std::unique_ptr<InternalIteratorBase<BlockHandle>> iter( - new_table->NewIndexIterator(ReadOptions(), disable_prefix_seek, - nullptr, &index_entry)); - s = iter->status(); - } - if (s.ok()) { - // This is the first call to NewIndexIterator() since we're in Open(). - // On success it should give us ownership of the `CachableEntry` by - // populating `index_entry`.
- assert(index_entry.GetValue() != nullptr); - if (prefetch_all) { - index_entry.GetValue()->CacheDependencies(pin_all); - } - if (pin_index) { - rep->index_entry = std::move(index_entry); - } - } - } if (s.ok() && prefetch_filter) { // Hack: Call GetFilter() to implicitly add filter to the block_cache auto filter_entry = @@ -1232,24 +1336,8 @@ Status BlockBasedTable::PrefetchIndexAndFilterBlocks( } } } else { - // If we don't use block cache for meta-block access, we'll pre-load these - // blocks, which will kept in member variables in Rep and with a same life- - // time as this table object. - IndexReader* index_reader = nullptr; - if (s.ok()) { - s = new_table->CreateIndexReader(prefetch_buffer, &index_reader, - meta_iter, level); - } std::unique_ptr compression_dict_block; if (s.ok()) { - rep->index_reader.reset(index_reader); - // The partitions of partitioned index are always stored in cache. They - // are hence follow the configuration for pin and prefetch regardless of - // the value of cache_index_and_filter_blocks - if (prefetch_index_and_filter_in_cache || level == 0) { - rep->index_reader->CacheDependencies(pin_all); - } - // Set filter block if (rep->filter_policy) { const bool is_a_filter_partition = true; @@ -1259,14 +1347,12 @@ Status BlockBasedTable::PrefetchIndexAndFilterBlocks( rep->filter.reset(filter); // Refer to the comment above about paritioned indexes always being // cached - if (filter && (prefetch_index_and_filter_in_cache || level == 0)) { + if (filter && prefetch_all) { filter->CacheDependencies(pin_all, rep->table_prefix_extractor.get()); } } s = ReadCompressionDictBlock(rep, prefetch_buffer, &compression_dict_block); - } else { - delete index_reader; } if (s.ok() && !rep->compression_dict_handle.IsNull()) { assert(compression_dict_block != nullptr); @@ -1350,7 +1436,7 @@ Status BlockBasedTable::ReadMetaBlock(Rep* rep, Status BlockBasedTable::GetDataBlockFromCache( const Slice& block_cache_key, const Slice& compressed_block_cache_key, - Cache* block_cache, Cache* block_cache_compressed, Rep* rep, + Cache* block_cache, Cache* block_cache_compressed, const Rep* rep, const ReadOptions& read_options, CachableEntry* block, const UncompressionDict& uncompression_dict, size_t read_amp_bytes_per_bit, bool is_index, GetContext* get_context) { @@ -1379,6 +1465,10 @@ Status BlockBasedTable::GetDataBlockFromCache( : nullptr, statistics, get_context); if (cache_handle != nullptr) { + if (is_index) { + PERF_COUNTER_ADD(block_cache_index_hit_count, 1); + } + block->SetCachedValue( reinterpret_cast(block_cache->Value(cache_handle)), block_cache, cache_handle); @@ -1843,119 +1933,15 @@ BlockBasedTable::GetUncompressionDict(Rep* rep, // differs from the one in mutable_cf_options and index type is HashBasedIndex InternalIteratorBase* BlockBasedTable::NewIndexIterator( const ReadOptions& read_options, bool disable_prefix_seek, - IndexBlockIter* input_iter, CachableEntry* index_entry, - GetContext* get_context) { - // index reader has already been pre-populated. - if (rep_->index_reader) { - // We don't return pinned datat from index blocks, so no need - // to set `block_contents_pinned`. - return rep_->index_reader->NewIterator( - input_iter, read_options.total_order_seek || disable_prefix_seek, - read_options.fill_cache); - } - // we have a pinned index block - if (rep_->index_entry.IsCached()) { - // We don't return pinned datat from index blocks, so no need - // to set `block_contents_pinned`. 
- return rep_->index_entry.GetValue()->NewIterator( - input_iter, read_options.total_order_seek || disable_prefix_seek, - read_options.fill_cache); - } - - PERF_TIMER_GUARD(read_index_block_nanos); - - const bool no_io = read_options.read_tier == kBlockCacheTier; - Cache* block_cache = rep_->table_options.block_cache.get(); - char cache_key[kMaxCacheKeyPrefixSize + kMaxVarint64Length]; - auto key = - GetCacheKeyFromOffset(rep_->cache_key_prefix, rep_->cache_key_prefix_size, - rep_->dummy_index_reader_offset, cache_key); - Statistics* statistics = rep_->ioptions.statistics; - auto cache_handle = GetEntryFromCache( - block_cache, key, rep_->level, BLOCK_CACHE_INDEX_MISS, - BLOCK_CACHE_INDEX_HIT, - get_context ? &get_context->get_context_stats_.num_cache_index_miss - : nullptr, - get_context ? &get_context->get_context_stats_.num_cache_index_hit - : nullptr, - statistics, get_context); - - if (cache_handle == nullptr && no_io) { - if (input_iter != nullptr) { - input_iter->Invalidate(Status::Incomplete("no blocking io")); - return input_iter; - } else { - return NewErrorInternalIterator( - Status::Incomplete("no blocking io")); - } - } + IndexBlockIter* input_iter, GetContext* get_context) { - IndexReader* index_reader = nullptr; - if (cache_handle != nullptr) { - PERF_COUNTER_ADD(block_cache_index_hit_count, 1); - index_reader = - reinterpret_cast(block_cache->Value(cache_handle)); - } else { - // Create index reader and put it in the cache. - Status s; - TEST_SYNC_POINT("BlockBasedTable::NewIndexIterator::thread2:2"); - s = CreateIndexReader(nullptr /* prefetch_buffer */, &index_reader); - TEST_SYNC_POINT("BlockBasedTable::NewIndexIterator::thread1:1"); - TEST_SYNC_POINT("BlockBasedTable::NewIndexIterator::thread2:3"); - TEST_SYNC_POINT("BlockBasedTable::NewIndexIterator::thread1:4"); - size_t charge = 0; - if (s.ok()) { - assert(index_reader != nullptr); - charge = index_reader->ApproximateMemoryUsage(); - s = block_cache->Insert( - key, index_reader, charge, &DeleteCachedIndexEntry, &cache_handle, - rep_->table_options.cache_index_and_filter_blocks_with_high_priority - ? Cache::Priority::HIGH - : Cache::Priority::LOW); - } - - if (s.ok()) { - if (get_context != nullptr) { - get_context->get_context_stats_.num_cache_add++; - get_context->get_context_stats_.num_cache_bytes_write += charge; - } else { - RecordTick(statistics, BLOCK_CACHE_ADD); - RecordTick(statistics, BLOCK_CACHE_BYTES_WRITE, charge); - } - PERF_COUNTER_ADD(index_block_read_count, 1); - RecordTick(statistics, BLOCK_CACHE_INDEX_ADD); - RecordTick(statistics, BLOCK_CACHE_INDEX_BYTES_INSERT, charge); - } else { - if (index_reader != nullptr) { - delete index_reader; - } - RecordTick(statistics, BLOCK_CACHE_ADD_FAILURES); - // make sure if something goes wrong, index_reader shall remain intact. - if (input_iter != nullptr) { - input_iter->Invalidate(s); - return input_iter; - } else { - return NewErrorInternalIterator(s); - } - } - } + assert(rep_ != nullptr); + assert(rep_->index_reader != nullptr); - assert(cache_handle); - // We don't return pinned datat from index blocks, so no need + // We don't return pinned data from index blocks, so no need // to set `block_contents_pinned`. 
- auto* iter = index_reader->NewIterator( - input_iter, read_options.total_order_seek || disable_prefix_seek); - - // the caller would like to take ownership of the index block - // don't call RegisterCleanup() in this case, the caller will take care of it - if (index_entry != nullptr) { - *index_entry = {index_reader, block_cache, cache_handle, - false /* own_value */}; - } else { - iter->RegisterCleanup(&ReleaseCachedEntry, block_cache, cache_handle); - } - - return iter; + return rep_->index_reader->NewIterator(read_options, disable_prefix_seek, + input_iter, get_context); } // Convert an index iterator value (i.e., an encoded BlockHandle) @@ -1970,118 +1956,85 @@ TBlockIter* BlockBasedTable::NewDataBlockIterator( FilePrefetchBuffer* prefetch_buffer) { PERF_TIMER_GUARD(new_table_block_iter_nanos); - Cache* block_cache = rep->table_options.block_cache.get(); + TBlockIter* iter = input_iter != nullptr ? input_iter : new TBlockIter; + if (!s.ok()) { + iter->Invalidate(s); + return iter; + } + + const bool no_io = (ro.read_tier == kBlockCacheTier); + auto uncompression_dict_storage = + GetUncompressionDict(rep, prefetch_buffer, no_io, get_context); + const UncompressionDict& uncompression_dict = + uncompression_dict_storage.GetValue() == nullptr + ? UncompressionDict::GetEmptyDict() + : *uncompression_dict_storage.GetValue(); + CachableEntry block; - TBlockIter* iter; - { - const bool no_io = (ro.read_tier == kBlockCacheTier); - auto uncompression_dict_storage = - GetUncompressionDict(rep, prefetch_buffer, no_io, get_context); - const UncompressionDict& uncompression_dict = - uncompression_dict_storage.GetValue() == nullptr - ? UncompressionDict::GetEmptyDict() - : *uncompression_dict_storage.GetValue(); - if (s.ok()) { - s = MaybeReadBlockAndLoadToCache(prefetch_buffer, rep, ro, handle, - uncompression_dict, &block, is_index, - get_context); - } + s = RetrieveBlock(prefetch_buffer, rep, ro, handle, uncompression_dict, + &block, is_index, get_context); - if (input_iter != nullptr) { - iter = input_iter; - } else { - iter = new TBlockIter; - } - // Didn't get any data from block caches. - if (s.ok() && block.GetValue() == nullptr) { - if (no_io) { - // Could not read from block_cache and can't do IO - iter->Invalidate(Status::Incomplete("no blocking io")); - return iter; - } - std::unique_ptr block_value; - { - StopWatch sw(rep->ioptions.env, rep->ioptions.statistics, - READ_BLOCK_GET_MICROS); - s = ReadBlockFromFile( - rep->file.get(), prefetch_buffer, rep->footer, ro, handle, - &block_value, rep->ioptions, - rep->blocks_maybe_compressed /*do_decompress*/, - rep->blocks_maybe_compressed, uncompression_dict, - rep->persistent_cache_options, - is_index ? kDisableGlobalSequenceNumber : rep->global_seqno, - rep->table_options.read_amp_bytes_per_bit, - GetMemoryAllocator(rep->table_options)); - } + if (!s.ok()) { + assert(block.IsEmpty()); + iter->Invalidate(s); + return iter; + } + + assert(block.GetValue() != nullptr); + constexpr bool kTotalOrderSeek = true; + // Block contents are pinned and it is still pinned after the iterator + // is destroyed as long as cleanup functions are moved to another object, + // when: + // 1. block cache handle is set to be released in cleanup function, or + // 2. it's pointing to immortal source. If own_bytes is true then we are + // not reading data from the original source, whether immortal or not. + // Otherwise, the block is pinned iff the source is immortal. 
+ const bool block_contents_pinned = block.IsCached() || + (!block.GetValue()->own_bytes() && rep->immortal_table); + iter = block.GetValue()->NewIterator( + &rep->internal_comparator, rep->internal_comparator.user_comparator(), + iter, rep->ioptions.statistics, kTotalOrderSeek, key_includes_seq, + index_key_is_full, block_contents_pinned); + + if (!block.IsCached()) { + if (!ro.fill_cache && rep->cache_key_prefix_size != 0) { + // insert a dummy record to block cache to track the memory usage + Cache* const block_cache = rep->table_options.block_cache.get(); + Cache::Handle* cache_handle = nullptr; + // There are two other types of cache keys: 1) SST cache key added in + // `MaybeReadBlockAndLoadToCache` 2) dummy cache key added in + // `write_buffer_manager`. Use longer prefix (41 bytes) to differentiate + // from SST cache key(31 bytes), and use non-zero prefix to + // differentiate from `write_buffer_manager` + const size_t kExtraCacheKeyPrefix = kMaxVarint64Length * 4 + 1; + char cache_key[kExtraCacheKeyPrefix + kMaxVarint64Length]; + // Prefix: use rep->cache_key_prefix padded by 0s + memset(cache_key, 0, kExtraCacheKeyPrefix + kMaxVarint64Length); + assert(rep->cache_key_prefix_size != 0); + assert(rep->cache_key_prefix_size <= kExtraCacheKeyPrefix); + memcpy(cache_key, rep->cache_key_prefix, rep->cache_key_prefix_size); + char* end = EncodeVarint64(cache_key + kExtraCacheKeyPrefix, + next_cache_key_id_++); + assert(end - cache_key <= + static_cast(kExtraCacheKeyPrefix + kMaxVarint64Length)); + const Slice unique_key(cache_key, static_cast(end - cache_key)); + s = block_cache->Insert(unique_key, nullptr, + block.GetValue()->ApproximateMemoryUsage(), + nullptr, &cache_handle); if (s.ok()) { - block.SetOwnedValue(block_value.release()); + assert(cache_handle != nullptr); + iter->RegisterCleanup(&ForceReleaseCachedEntry, block_cache, + cache_handle); } } - // TODO(ajkr): also pin compression dictionary block when - // `pin_l0_filter_and_index_blocks_in_cache == true`. } - if (s.ok()) { - assert(block.GetValue() != nullptr); - const bool kTotalOrderSeek = true; - // Block contents are pinned and it is still pinned after the iterator - // is destroyed as long as cleanup functions are moved to another object, - // when: - // 1. block cache handle is set to be released in cleanup function, or - // 2. it's pointing to immortal source. If own_bytes is true then we are - // not reading data from the original source, whether immortal or not. - // Otherwise, the block is pinned iff the source is immortal. - bool block_contents_pinned = - (block.IsCached() || - (!block.GetValue()->own_bytes() && rep->immortal_table)); - iter = block.GetValue()->NewIterator( - &rep->internal_comparator, rep->internal_comparator.user_comparator(), - iter, rep->ioptions.statistics, kTotalOrderSeek, key_includes_seq, - index_key_is_full, block_contents_pinned); - if (!block.IsCached()) { - if (!ro.fill_cache && rep->cache_key_prefix_size != 0) { - // insert a dummy record to block cache to track the memory usage - Cache::Handle* cache_handle; - // There are two other types of cache keys: 1) SST cache key added in - // `MaybeReadBlockAndLoadToCache` 2) dummy cache key added in - // `write_buffer_manager`. 
Use longer prefix (41 bytes) to differentiate - // from SST cache key(31 bytes), and use non-zero prefix to - // differentiate from `write_buffer_manager` - const size_t kExtraCacheKeyPrefix = kMaxVarint64Length * 4 + 1; - char cache_key[kExtraCacheKeyPrefix + kMaxVarint64Length]; - // Prefix: use rep->cache_key_prefix padded by 0s - memset(cache_key, 0, kExtraCacheKeyPrefix + kMaxVarint64Length); - assert(rep->cache_key_prefix_size != 0); - assert(rep->cache_key_prefix_size <= kExtraCacheKeyPrefix); - memcpy(cache_key, rep->cache_key_prefix, rep->cache_key_prefix_size); - char* end = EncodeVarint64(cache_key + kExtraCacheKeyPrefix, - next_cache_key_id_++); - assert(end - cache_key <= - static_cast(kExtraCacheKeyPrefix + kMaxVarint64Length)); - Slice unique_key = - Slice(cache_key, static_cast(end - cache_key)); - s = block_cache->Insert(unique_key, nullptr, - block.GetValue()->ApproximateMemoryUsage(), - nullptr, &cache_handle); - if (s.ok()) { - if (cache_handle != nullptr) { - iter->RegisterCleanup(&ForceReleaseCachedEntry, block_cache, - cache_handle); - } - } - } - } - - block.TransferTo(iter); - } else { - assert(block.GetValue() == nullptr); - iter->Invalidate(s); - } + block.TransferTo(iter); return iter; } Status BlockBasedTable::MaybeReadBlockAndLoadToCache( - FilePrefetchBuffer* prefetch_buffer, Rep* rep, const ReadOptions& ro, + FilePrefetchBuffer* prefetch_buffer, const Rep* rep, const ReadOptions& ro, const BlockHandle& handle, const UncompressionDict& uncompression_dict, CachableEntry* block_entry, bool is_index, GetContext* get_context) { assert(block_entry != nullptr); @@ -2116,7 +2069,8 @@ Status BlockBasedTable::MaybeReadBlockAndLoadToCache( s = GetDataBlockFromCache(key, ckey, block_cache, block_cache_compressed, rep, ro, block_entry, uncompression_dict, - rep->table_options.read_amp_bytes_per_bit, + !is_index ? + rep->table_options.read_amp_bytes_per_bit : 0, is_index, get_context); // Can't find the block from the cache. If I/O is allowed, read from the @@ -2148,7 +2102,7 @@ Status BlockBasedTable::MaybeReadBlockAndLoadToCache( key, ckey, block_cache, block_cache_compressed, ro, rep->ioptions, block_entry, &raw_block_contents, raw_block_comp_type, rep->table_options.format_version, uncompression_dict, seq_no, - rep->table_options.read_amp_bytes_per_bit, + !is_index ? 
rep->table_options.read_amp_bytes_per_bit : 0, GetMemoryAllocator(rep->table_options), is_index, is_index && rep->table_options .cache_index_and_filter_blocks_with_high_priority @@ -2162,6 +2116,64 @@ Status BlockBasedTable::MaybeReadBlockAndLoadToCache( return s; } +Status BlockBasedTable::RetrieveBlock( + FilePrefetchBuffer* prefetch_buffer, const Rep* rep, const ReadOptions& ro, + const BlockHandle& handle, const UncompressionDict& uncompression_dict, + CachableEntry* block_entry, bool is_index, GetContext* get_context) { + + assert(rep); + assert(block_entry); + assert(block_entry->IsEmpty()); + + Status s; + if (!is_index || rep->table_options.cache_index_and_filter_blocks) { + s = MaybeReadBlockAndLoadToCache(prefetch_buffer, rep, ro, handle, + uncompression_dict, block_entry, + is_index, get_context); + + if (!s.ok()) { + return s; + } + + if (block_entry->GetValue() != nullptr) { + assert (s.ok()); + return s; + } + } + + assert(block_entry->IsEmpty()); + + const bool no_io = ro.read_tier == kBlockCacheTier; + if (no_io) { + return Status::Incomplete("no blocking io"); + } + + std::unique_ptr block; + + { + StopWatch sw(rep->ioptions.env, rep->ioptions.statistics, + READ_BLOCK_GET_MICROS); + s = ReadBlockFromFile(rep->file.get(), prefetch_buffer, rep->footer, ro, + handle, &block, rep->ioptions, + rep->blocks_maybe_compressed, + rep->blocks_maybe_compressed, uncompression_dict, + rep->persistent_cache_options, + rep->get_global_seqno(is_index), + !is_index ? + rep->table_options.read_amp_bytes_per_bit : 0, + GetMemoryAllocator(rep->table_options)); + } + + if (!s.ok()) { + return s; + } + + block_entry->SetOwnedValue(block.release()); + + assert(s.ok()); + return s; +} + BlockBasedTable::PartitionedIndexIteratorState::PartitionedIndexIteratorState( BlockBasedTable* table, std::unordered_map>* block_map, @@ -2188,7 +2200,7 @@ BlockBasedTable::PartitionedIndexIteratorState::NewSecondaryIterator( RecordTick(rep->ioptions.statistics, BLOCK_CACHE_BYTES_READ, block_cache->GetUsage(block->second.GetCacheHandle())); Statistics* kNullStats = nullptr; - // We don't return pinned datat from index blocks, so no need + // We don't return pinned data from index blocks, so no need // to set `block_contents_pinned`. 
return block->second.GetValue()->NewIterator( &rep->internal_comparator, rep->internal_comparator.user_comparator(), @@ -2747,7 +2759,7 @@ Status BlockBasedTable::Get(const ReadOptions& read_options, const Slice& key, } auto iiter = NewIndexIterator(read_options, need_upper_bound_check, &iiter_on_stack, - /* index_entry */ nullptr, get_context); + get_context); std::unique_ptr> iiter_unique_ptr; if (iiter != &iiter_on_stack) { iiter_unique_ptr.reset(iiter); @@ -2868,7 +2880,7 @@ void BlockBasedTable::MultiGet(const ReadOptions& read_options, } auto iiter = NewIndexIterator( read_options, need_upper_bound_check, &iiter_on_stack, - /* index_entry */ nullptr, sst_file_range.begin()->get_context); + sst_file_range.begin()->get_context); std::unique_ptr> iiter_unique_ptr; if (iiter != &iiter_on_stack) { iiter_unique_ptr.reset(iiter); @@ -3085,45 +3097,37 @@ Status BlockBasedTable::VerifyChecksumInMetaBlocks( return s; } +bool BlockBasedTable::TEST_BlockInCache(const BlockHandle& handle) const { + assert(rep_ != nullptr); + + Cache* const cache = rep_->table_options.block_cache.get(); + if (cache == nullptr) { + return false; + } + + char cache_key_storage[kMaxCacheKeyPrefixSize + kMaxVarint64Length]; + Slice cache_key = GetCacheKey(rep_->cache_key_prefix, + rep_->cache_key_prefix_size, handle, + cache_key_storage); + + Cache::Handle* const cache_handle = cache->Lookup(cache_key); + if (cache_handle == nullptr) { + return false; + } + + cache->Release(cache_handle); + + return true; +} + bool BlockBasedTable::TEST_KeyInCache(const ReadOptions& options, const Slice& key) { std::unique_ptr> iiter( NewIndexIterator(options)); iiter->Seek(key); assert(iiter->Valid()); - CachableEntry block; - - BlockHandle handle = iiter->value(); - Cache* block_cache = rep_->table_options.block_cache.get(); - assert(block_cache != nullptr); - - char cache_key_storage[kMaxCacheKeyPrefixSize + kMaxVarint64Length]; - Slice cache_key = - GetCacheKey(rep_->cache_key_prefix, rep_->cache_key_prefix_size, handle, - cache_key_storage); - Slice ckey; - Status s; - if (!rep_->compression_dict_handle.IsNull()) { - std::unique_ptr compression_dict_block; - s = ReadCompressionDictBlock(rep_, nullptr /* prefetch_buffer */, - &compression_dict_block); - if (s.ok()) { - assert(compression_dict_block != nullptr); - UncompressionDict uncompression_dict( - compression_dict_block->data.ToString(), - rep_->blocks_definitely_zstd_compressed); - s = GetDataBlockFromCache(cache_key, ckey, block_cache, nullptr, rep_, - options, &block, uncompression_dict, - 0 /* read_amp_bytes_per_bit */); - } - } else { - s = GetDataBlockFromCache( - cache_key, ckey, block_cache, nullptr, rep_, options, &block, - UncompressionDict::GetEmptyDict(), 0 /* read_amp_bytes_per_bit */); - } - assert(s.ok()); - return block.IsCached(); + return TEST_BlockInCache(iiter->value()); } BlockBasedTableOptions::IndexType BlockBasedTable::UpdateIndexType() { @@ -3151,14 +3155,11 @@ BlockBasedTableOptions::IndexType BlockBasedTable::UpdateIndexType() { // 4. internal_comparator // 5. 
index_type Status BlockBasedTable::CreateIndexReader( - FilePrefetchBuffer* prefetch_buffer, IndexReader** index_reader, - InternalIterator* preloaded_meta_index_iter, int level) { + FilePrefetchBuffer* prefetch_buffer, + InternalIterator* preloaded_meta_index_iter, bool use_cache, bool prefetch, + bool pin, IndexReader** index_reader) { auto index_type_on_file = rep_->index_type; - auto file = rep_->file.get(); - const InternalKeyComparator* icomparator = &rep_->internal_comparator; - const Footer& footer = rep_->footer; - // kHashSearch requires non-empty prefix_extractor but bypass checking // prefix_extractor here since we have no access to MutableCFOptions. // Add need_upper_bound_check flag in BlockBasedTable::NewIndexIterator. @@ -3167,25 +3168,12 @@ Status BlockBasedTable::CreateIndexReader( switch (index_type_on_file) { case BlockBasedTableOptions::kTwoLevelIndexSearch: { - return PartitionIndexReader::Create( - this, file, prefetch_buffer, footer, footer.index_handle(), - rep_->ioptions, icomparator, index_reader, - rep_->persistent_cache_options, level, - rep_->table_properties == nullptr || - rep_->table_properties->index_key_is_user_key == 0, - rep_->table_properties == nullptr || - rep_->table_properties->index_value_is_delta_encoded == 0, - GetMemoryAllocator(rep_->table_options)); + return PartitionIndexReader::Create(this, prefetch_buffer, use_cache, + prefetch, pin, index_reader); } case BlockBasedTableOptions::kBinarySearch: { - return BinarySearchIndexReader::Create( - file, prefetch_buffer, footer, footer.index_handle(), rep_->ioptions, - icomparator, index_reader, rep_->persistent_cache_options, - rep_->table_properties == nullptr || - rep_->table_properties->index_key_is_user_key == 0, - rep_->table_properties == nullptr || - rep_->table_properties->index_value_is_delta_encoded == 0, - GetMemoryAllocator(rep_->table_options)); + return BinarySearchIndexReader::Create(this, prefetch_buffer, use_cache, + prefetch, pin, index_reader); } case BlockBasedTableOptions::kHashSearch: { std::unique_ptr meta_guard; @@ -3200,29 +3188,15 @@ Status BlockBasedTable::CreateIndexReader( ROCKS_LOG_WARN(rep_->ioptions.info_log, "Unable to read the metaindex block." 
" Fall back to binary search index."); - return BinarySearchIndexReader::Create( - file, prefetch_buffer, footer, footer.index_handle(), - rep_->ioptions, icomparator, index_reader, - rep_->persistent_cache_options, - rep_->table_properties == nullptr || - rep_->table_properties->index_key_is_user_key == 0, - rep_->table_properties == nullptr || - rep_->table_properties->index_value_is_delta_encoded == 0, - GetMemoryAllocator(rep_->table_options)); + return BinarySearchIndexReader::Create(this, prefetch_buffer, + use_cache, prefetch, pin, + index_reader); } meta_index_iter = meta_iter_guard.get(); } - return HashIndexReader::Create( - rep_->internal_prefix_transform.get(), footer, file, prefetch_buffer, - rep_->ioptions, icomparator, footer.index_handle(), meta_index_iter, - index_reader, rep_->hash_index_allow_collision, - rep_->persistent_cache_options, - rep_->table_properties == nullptr || - rep_->table_properties->index_key_is_user_key == 0, - rep_->table_properties == nullptr || - rep_->table_properties->index_value_is_delta_encoded == 0, - GetMemoryAllocator(rep_->table_options)); + return HashIndexReader::Create(this, prefetch_buffer, meta_index_iter, + use_cache, prefetch, pin, index_reader); } default: { std::string error_message = @@ -3261,8 +3235,10 @@ bool BlockBasedTable::TEST_filter_block_preloaded() const { return rep_->filter != nullptr; } -bool BlockBasedTable::TEST_index_reader_preloaded() const { - return rep_->index_reader != nullptr; +bool BlockBasedTable::TEST_IndexBlockInCache() const { + assert(rep_ != nullptr); + + return TEST_BlockInCache(rep_->footer.index_handle()); } Status BlockBasedTable::GetKVPairsFromDataBlocks( @@ -3479,12 +3455,6 @@ void BlockBasedTable::Close() { rep_->filter_handle, cache_key); cache->Erase(key); - // Get the index block key - key = GetCacheKeyFromOffset(rep_->cache_key_prefix, - rep_->cache_key_prefix_size, - rep_->dummy_index_reader_offset, cache_key); - cache->Erase(key); - if (!rep_->compression_dict_handle.IsNull()) { // Get the compression dictionary block key key = GetCacheKey(rep_->cache_key_prefix, rep_->cache_key_prefix_size, @@ -3674,15 +3644,6 @@ void DeleteCachedFilterEntry(const Slice& /*key*/, void* value) { delete filter; } -void DeleteCachedIndexEntry(const Slice& /*key*/, void* value) { - IndexReader* index_reader = reinterpret_cast(value); - if (index_reader->statistics() != nullptr) { - RecordTick(index_reader->statistics(), BLOCK_CACHE_INDEX_BYTES_EVICT, - index_reader->ApproximateMemoryUsage()); - } - delete index_reader; -} - void DeleteCachedUncompressionDictEntry(const Slice& /*key*/, void* value) { UncompressionDict* dict = reinterpret_cast(value); RecordTick(dict->statistics(), BLOCK_CACHE_COMPRESSION_DICT_BYTES_EVICT, diff --git a/table/block_based_table_reader.h b/table/block_based_table_reader.h index 270409b3a..54ce34d61 100644 --- a/table/block_based_table_reader.h +++ b/table/block_based_table_reader.h @@ -150,6 +150,8 @@ class BlockBasedTable : public TableReader { // be close to the file length. uint64_t ApproximateOffsetOf(const Slice& key) override; + bool TEST_BlockInCache(const BlockHandle& handle) const; + // Returns true if the block for the specified key is in cache. 
// REQUIRES: key is in this table && block cache enabled bool TEST_KeyInCache(const ReadOptions& options, const Slice& key); @@ -173,54 +175,35 @@ ~BlockBasedTable(); bool TEST_filter_block_preloaded() const; - bool TEST_index_reader_preloaded() const; + bool TEST_IndexBlockInCache() const; - // IndexReader is the interface that provide the functionality for index + // IndexReader is the interface that provides the functionality for index // access. class IndexReader { public: - explicit IndexReader(const InternalKeyComparator* icomparator, - Statistics* stats) - : icomparator_(icomparator), statistics_(stats) {} - - virtual ~IndexReader() {} - - // Create an iterator for index access. - // If iter is null then a new object is created on heap and the callee will - // have the ownership. If a non-null iter is passed in it will be used, and - // the returned value is either the same as iter or a new on-heap object - // that - // wrapps the passed iter. In the latter case the return value would point - // to - // a different object then iter and the callee has the ownership of the + virtual ~IndexReader() = default; + + // Create an iterator for index access. If iter is null, then a new object + // is created on the heap, and the callee will have ownership. + // If a non-null iter is passed in, it will be used, and the returned value + // is either the same as iter or a new on-heap object that + // wraps the passed iter. In the latter case the return value points + // to a different object than iter, and the callee has ownership of the // returned object. virtual InternalIteratorBase<BlockHandle>* NewIterator( - IndexBlockIter* iter = nullptr, bool total_order_seek = true, - bool fill_cache = true) = 0; - - // The size of the index. - virtual size_t size() const = 0; - // Memory usage of the index block - virtual size_t usable_size() const = 0; - // return the statistics pointer - virtual Statistics* statistics() const { return statistics_; } + const ReadOptions& read_options, bool disable_prefix_seek, + IndexBlockIter* iter, GetContext* get_context) = 0; + // Report an approximation of how much memory has been used other than - // memory - // that was allocated in block cache. + // memory that was allocated in block cache. virtual size_t ApproximateMemoryUsage() const = 0; - - virtual void CacheDependencies(bool /* unused */) {} - - // Prefetch all the blocks referenced by this index to the buffer - void PrefetchBlocks(FilePrefetchBuffer* buf); - - protected: - const InternalKeyComparator* icomparator_; - - private: - Statistics* statistics_; + // Cache the dependencies of the index reader (e.g. the partitions + // of a partitioned index). + virtual void CacheDependencies(bool /* pin */) {} }; + class IndexReaderCommon; + static Slice GetCacheKey(const char* cache_key_prefix, size_t cache_key_prefix_size, const BlockHandle& handle, char* cache_key); @@ -271,11 +254,22 @@ // in uncompressed block cache, also sets cache_handle to reference that // block. 
@@ -271,11 +254,22 @@ class BlockBasedTable : public TableReader {
   // in uncompressed block cache, also sets cache_handle to reference that
   // block.
   static Status MaybeReadBlockAndLoadToCache(
-      FilePrefetchBuffer* prefetch_buffer, Rep* rep, const ReadOptions& ro,
-      const BlockHandle& handle, const UncompressionDict& uncompression_dict,
+      FilePrefetchBuffer* prefetch_buffer, const Rep* rep,
+      const ReadOptions& ro, const BlockHandle& handle,
+      const UncompressionDict& uncompression_dict,
       CachableEntry<Block>* block_entry, bool is_index = false,
       GetContext* get_context = nullptr);
 
+  // Similar to the above, with one crucial difference: it will retrieve the
+  // block from the file even if there are no caches configured (assuming the
+  // read options allow I/O).
+  static Status RetrieveBlock(
+      FilePrefetchBuffer* prefetch_buffer, const Rep* rep,
+      const ReadOptions& ro, const BlockHandle& handle,
+      const UncompressionDict& uncompression_dict,
+      CachableEntry<Block>* block_entry, bool is_index,
+      GetContext* get_context);
+
   // For the following two functions:
   // if `no_io == true`, we will not try to read filter/index from sst file
   // were they not present in cache yet.
@@ -305,7 +299,6 @@ class BlockBasedTable : public TableReader {
   InternalIteratorBase<BlockHandle>* NewIndexIterator(
       const ReadOptions& read_options, bool need_upper_bound_check = false,
       IndexBlockIter* input_iter = nullptr,
-      CachableEntry<Block>* index_entry = nullptr,
       GetContext* get_context = nullptr);
 
   // Read block cache from block caches (if set): block_cache and
@@ -316,7 +309,7 @@ class BlockBasedTable : public TableReader {
   // dictionary.
   static Status GetDataBlockFromCache(
       const Slice& block_cache_key, const Slice& compressed_block_cache_key,
-      Cache* block_cache, Cache* block_cache_compressed, Rep* rep,
+      Cache* block_cache, Cache* block_cache_compressed, const Rep* rep,
       const ReadOptions& read_options, CachableEntry<Block>* block,
       const UncompressionDict& uncompression_dict,
       size_t read_amp_bytes_per_bit, bool is_index = false,
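[Editor's note] To make the "crucial difference" called out in the RetrieveBlock() comment concrete, its behavior amounts to roughly the fall-through logic below. This is a simplified editorial sketch written as if it were a member of BlockBasedTable, not the patch's actual implementation; the file-read step is elided.

// Sketch: try the block cache first (when one is configured), then fall
// back to reading the block from the file if the read options permit I/O.
Status BlockBasedTable::RetrieveBlockSketch(
    FilePrefetchBuffer* prefetch_buffer, const Rep* rep, const ReadOptions& ro,
    const BlockHandle& handle, const UncompressionDict& uncompression_dict,
    CachableEntry<Block>* block_entry, bool is_index,
    GetContext* get_context) {
  if (rep->table_options.block_cache != nullptr ||
      rep->table_options.block_cache_compressed != nullptr) {
    Status s = MaybeReadBlockAndLoadToCache(prefetch_buffer, rep, ro, handle,
                                            uncompression_dict, block_entry,
                                            is_index, get_context);
    if (!s.ok() || block_entry->GetValue() != nullptr) {
      return s;  // found in the cache, or the lookup itself failed
    }
  }
  if (ro.read_tier == kBlockCacheTier) {
    return Status::Incomplete("no blocking io");  // I/O is not allowed
  }
  // No cache hit and no cache configured: read the block from the file and
  // let block_entry own the resulting object.
  // ... read, parse, and block_entry->SetOwnedValue(...) here ...
  return Status::OK();
}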
@@ -359,9 +352,9 @@ class BlockBasedTable : public TableReader {
   // need to access extra meta blocks for index construction. This parameter
   // helps avoid re-reading the meta index block if the caller already
   // created one.
   Status CreateIndexReader(
-      FilePrefetchBuffer* prefetch_buffer, IndexReader** index_reader,
-      InternalIterator* preloaded_meta_index_iter = nullptr,
-      const int level = -1);
+      FilePrefetchBuffer* prefetch_buffer,
+      InternalIterator* preloaded_meta_index_iter, bool use_cache,
+      bool prefetch, bool pin, IndexReader** index_reader);
 
   bool FullFilterKeyMayMatch(
       const ReadOptions& read_options, FilterBlockReader* filter,
@@ -398,9 +391,8 @@ class BlockBasedTable : public TableReader {
   static Status PrefetchIndexAndFilterBlocks(
       Rep* rep, FilePrefetchBuffer* prefetch_buffer,
       InternalIterator* meta_iter, BlockBasedTable* new_table,
-      const SliceTransform* prefix_extractor, bool prefetch_all,
-      const BlockBasedTableOptions& table_options, const int level,
-      const bool prefetch_index_and_filter_in_cache);
+      bool prefetch_all, const BlockBasedTableOptions& table_options,
+      const int level);
 
   Status VerifyChecksumInMetaBlocks(InternalIteratorBase<Slice>* index_iter);
   Status VerifyChecksumInBlocks(InternalIteratorBase<BlockHandle>* index_iter);
@@ -411,7 +403,7 @@ class BlockBasedTable : public TableReader {
       const bool is_a_filter_partition,
       const SliceTransform* prefix_extractor = nullptr) const;
 
-  static void SetupCacheKeyPrefix(Rep* rep, uint64_t file_size);
+  static void SetupCacheKeyPrefix(Rep* rep);
 
   // Generate a cache key prefix from the file
   static void GenerateCachePrefix(Cache* cc, RandomAccessFile* file,
@@ -486,18 +478,21 @@ struct BlockBasedTable::Rep {
   size_t persistent_cache_key_prefix_size = 0;
   char compressed_cache_key_prefix[kMaxCacheKeyPrefixSize];
   size_t compressed_cache_key_prefix_size = 0;
-  uint64_t dummy_index_reader_offset =
-      0;  // ID that is unique for the block cache.
   PersistentCacheOptions persistent_cache_options;
 
   // Footer contains the fixed table information
   Footer footer;
-  // `index_reader`, `filter`, and `uncompression_dict` will be populated (i.e.,
-  // non-nullptr) and used only when options.block_cache is nullptr or when
-  // `cache_index_and_filter_blocks == false`. Otherwise, we will get the index,
-  // filter, and compression dictionary blocks via the block cache. In that case
-  // `dummy_index_reader_offset`, `filter_handle`, and `compression_dict_handle`
-  // are used to lookup these meta-blocks in block cache.
+  // `filter` and `uncompression_dict` will be populated (i.e., non-nullptr)
+  // and used only when options.block_cache is nullptr or when
+  // `cache_index_and_filter_blocks == false`. Otherwise, we will get the
+  // filter and compression dictionary blocks via the block cache. In that
+  // case, `filter_handle` and `compression_dict_handle` are used to look up
+  // these meta-blocks in the block cache.
+  //
+  // Note: the IndexReader object is always stored in this member variable;
+  // the index block itself, however, may or may not be in the block cache
+  // based on the settings above. We plan to change the handling of the
+  // filter and compression dictionary similarly.
   std::unique_ptr<IndexReader> index_reader;
   std::unique_ptr<FilterBlockReader> filter;
   std::unique_ptr<UncompressionDict> uncompression_dict;
@@ -526,12 +521,11 @@ struct BlockBasedTable::Rep {
   // only used in level 0 files when pin_l0_filter_and_index_blocks_in_cache is
   // true or in all levels when pin_top_level_index_and_filter is set in
-  // combination with partitioned index/filters: then we do use the LRU cache,
-  // but we always keep the filter & index block's handle checked out here (=we
+  // combination with partitioned filters: then we do use the LRU cache,
+  // but we always keep the filter block's handle checked out here (=we
   // don't call Release()), plus the LRU cache will never push out the parsed
   // objects, hence they're pinned
   CachableEntry<FilterBlockReader> filter_entry;
-  CachableEntry<Block> index_entry;
   std::shared_ptr<const FragmentedRangeTombstoneList> fragmented_range_dels;
 
   // If global_seqno is used, all Keys in this file will have the same
diff --git a/table/table_test.cc b/table/table_test.cc
index dccc49194..aeb66f8d3 100644
--- a/table/table_test.cc
+++ b/table/table_test.cc
@@ -1993,7 +1993,7 @@ TEST_P(BlockBasedTableTest, BlockCacheDisabledTest) {
   // preloading filter/index blocks is enabled.
   auto reader = dynamic_cast<BlockBasedTable*>(c.GetTableReader());
   ASSERT_TRUE(reader->TEST_filter_block_preloaded());
-  ASSERT_TRUE(reader->TEST_index_reader_preloaded());
+  ASSERT_FALSE(reader->TEST_IndexBlockInCache());
 
   {
     // nothing happens in the beginning
@@ -2040,7 +2040,7 @@ TEST_P(BlockBasedTableTest, FilterBlockInBlockCache) {
   // preloading filter/index blocks is prohibited.
   auto* reader = dynamic_cast<BlockBasedTable*>(c.GetTableReader());
   ASSERT_TRUE(!reader->TEST_filter_block_preloaded());
-  ASSERT_TRUE(!reader->TEST_index_reader_preloaded());
+  ASSERT_TRUE(reader->TEST_IndexBlockInCache());
 
   // -- PART 1: Open with regular block cache.
   // Since block_cache is disabled, no cache activities will be involved.
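[Editor's note] The flipped assertions above rely on the new TEST_BlockInCache() helper, whose definition is not shown in the excerpted hunks. A plausible implementation is reconstructed below for illustration only: build the block's cache key from its handle, probe the cache, and release the handle immediately.

// Reconstruction for illustration; not one of the excerpted hunks.
// Returns true iff the block at `handle` is currently resident in the
// configured block cache.
bool BlockBasedTable::TEST_BlockInCache(const BlockHandle& handle) const {
  assert(rep_ != nullptr);

  Cache* const cache = rep_->table_options.block_cache.get();
  if (cache == nullptr) {
    return false;  // no block cache configured, so nothing can be cached
  }

  char cache_key_storage[kMaxCacheKeyPrefixSize + kMaxVarint64Length];
  Slice cache_key =
      GetCacheKey(rep_->cache_key_prefix, rep_->cache_key_prefix_size, handle,
                  cache_key_storage);

  Cache::Handle* const cache_handle = cache->Lookup(cache_key);
  if (cache_handle == nullptr) {
    return false;
  }

  cache->Release(cache_handle);
  return true;
}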
@@ -2612,69 +2612,6 @@ TEST_P(BlockBasedTableTest, MemoryAllocator) {
   EXPECT_GT(custom_memory_allocator->numAllocations.load(), 0);
 }
 
-TEST_P(BlockBasedTableTest, NewIndexIteratorLeak) {
-  // A regression test to avoid data race described in
-  // https://github.com/facebook/rocksdb/issues/1267
-  TableConstructor c(BytewiseComparator(), true /* convert_to_internal_key_ */);
-  std::vector<std::string> keys;
-  stl_wrappers::KVMap kvmap;
-  c.Add("a1", "val1");
-  Options options;
-  options.prefix_extractor.reset(NewFixedPrefixTransform(1));
-  BlockBasedTableOptions table_options = GetBlockBasedTableOptions();
-  table_options.index_type = BlockBasedTableOptions::kHashSearch;
-  table_options.cache_index_and_filter_blocks = true;
-  table_options.block_cache = NewLRUCache(0);
-  options.table_factory.reset(NewBlockBasedTableFactory(table_options));
-  const ImmutableCFOptions ioptions(options);
-  const MutableCFOptions moptions(options);
-  c.Finish(options, ioptions, moptions, table_options,
-           GetPlainInternalComparator(options.comparator), &keys, &kvmap);
-
-  rocksdb::SyncPoint::GetInstance()->LoadDependencyAndMarkers(
-      {
-          {"BlockBasedTable::NewIndexIterator::thread1:1",
-           "BlockBasedTable::NewIndexIterator::thread2:2"},
-          {"BlockBasedTable::NewIndexIterator::thread2:3",
-           "BlockBasedTable::NewIndexIterator::thread1:4"},
-      },
-      {
-          {"BlockBasedTableTest::NewIndexIteratorLeak:Thread1Marker",
-           "BlockBasedTable::NewIndexIterator::thread1:1"},
-          {"BlockBasedTableTest::NewIndexIteratorLeak:Thread1Marker",
-           "BlockBasedTable::NewIndexIterator::thread1:4"},
-          {"BlockBasedTableTest::NewIndexIteratorLeak:Thread2Marker",
-           "BlockBasedTable::NewIndexIterator::thread2:2"},
-          {"BlockBasedTableTest::NewIndexIteratorLeak:Thread2Marker",
-           "BlockBasedTable::NewIndexIterator::thread2:3"},
-      });
-
-  rocksdb::SyncPoint::GetInstance()->EnableProcessing();
-  ReadOptions ro;
-  auto* reader = c.GetTableReader();
-
-  std::function<void()> func1 = [&]() {
-    TEST_SYNC_POINT("BlockBasedTableTest::NewIndexIteratorLeak:Thread1Marker");
-    // TODO(Zhongyi): update test to use MutableCFOptions
-    std::unique_ptr<InternalIterator> iter(
-        reader->NewIterator(ro, moptions.prefix_extractor.get()));
-    iter->Seek(InternalKey("a1", 0, kTypeValue).Encode());
-  };
-
-  std::function<void()> func2 = [&]() {
-    TEST_SYNC_POINT("BlockBasedTableTest::NewIndexIteratorLeak:Thread2Marker");
-    std::unique_ptr<InternalIterator> iter(
-        reader->NewIterator(ro, moptions.prefix_extractor.get()));
-  };
-
-  auto thread1 = port::Thread(func1);
-  auto thread2 = port::Thread(func2);
-  thread1.join();
-  thread2.join();
-  rocksdb::SyncPoint::GetInstance()->DisableProcessing();
-  c.ResetTableReader();
-}
-
 // Plain table is not supported in ROCKSDB_LITE
 #ifndef ROCKSDB_LITE
 TEST_F(PlainTableTest, BasicPlainTableProperties) {
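[Editor's note] For readers following the new CreateIndexReader() flow above, the factory pattern shared by the rewritten index readers boils down to the condensed sketch below. It is simplified from the patch: assertions and error details are trimmed, and ReadIndexBlock() stands for the common helper the readers inherit.

// Condensed sketch of the shared factory pattern: use_cache/prefetch/pin
// decide whether the index block is read eagerly and whether its cache
// handle stays checked out (pinned).
Status BinarySearchIndexReader::Create(BlockBasedTable* table,
                                       FilePrefetchBuffer* prefetch_buffer,
                                       bool use_cache, bool prefetch, bool pin,
                                       IndexReader** index_reader) {
  assert(table != nullptr);
  assert(index_reader != nullptr);

  CachableEntry<Block> index_block;
  if (prefetch || !use_cache) {
    // Read the index block eagerly, inserting it into the block cache if
    // use_cache is set.
    const Status s = ReadIndexBlock(table, prefetch_buffer, ReadOptions(),
                                    /* get_context */ nullptr, &index_block);
    if (!s.ok()) {
      return s;
    }

    if (use_cache && !pin) {
      // Cached but not pinned: drop the handle now and look the block up
      // again lazily on first use.
      index_block.Reset();
    }
  }

  *index_reader = new BinarySearchIndexReader(table, std::move(index_block));

  return Status::OK();
}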