From 1e355842519debea764a8e04c5c08918dcc01d91 Mon Sep 17 00:00:00 2001 From: Levi Tamasi Date: Thu, 30 May 2019 11:49:36 -0700 Subject: [PATCH] Move the index readers out of the block cache (#5298) Summary: Currently, when the block cache is used for index blocks as well, it is not really the index block that is stored in the cache but an IndexReader object. Since this object is not pure data (it has, for instance, pointers that might dangle), it's not really sharable. To avoid the issues around this, the current code uses a dummy unique cache key for each TableReader to store the IndexReader, and erases the IndexReader entry when the TableReader is closed. Instead of doing this, the new code moves the IndexReader out of the cache altogether. In particular, instead of the TableReader owning, or caching/pinning the IndexReader based on the customer's settings, the TableReader unconditionally owns the IndexReader, which in turn owns/caches/pins the index block (which is itself sharable and thus can be safely put in the cache without any hacks). Note: the change has two side effects: 1) Partitions of partitioned indexes no longer affect the read amplification statistics. 2) Eviction statistics for index blocks are temporarily broken. We plan to fix this in a separate phase. Pull Request resolved: https://github.com/facebook/rocksdb/pull/5298 Differential Revision: D15303203 Pulled By: ltamasi fbshipit-source-id: 935a69ba59d87d5e44f42e2310619b790c366e47 --- HISTORY.md | 2 + db/db_block_cache_test.cc | 12 +- table/block_based_table_reader.cc | 1085 ++++++++++++++--------------- table/block_based_table_reader.h | 114 ++- table/table_test.cc | 67 +- 5 files changed, 590 insertions(+), 690 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index 40d11096d..55366b006 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -2,6 +2,8 @@ ## Unreleased ### Public API Change * Now DB::Close() will return Aborted() error when there is unreleased snapshot. Users can retry after all snapshots are released. +* Partitions of partitioned indexes no longer affect the read amplification statistics. +* Due to a refactoring, block cache eviction statistics for indexes are temporarily broken. We plan to reintroduce them in a later phase. ### New Features * Add an option `snap_refresh_nanos` (default to 0.1s) to periodically refresh the snapshot list in compaction jobs. Assign to 0 to disable the feature. diff --git a/db/db_block_cache_test.cc b/db/db_block_cache_test.cc index f6e1aad32..8eb73a23d 100644 --- a/db/db_block_cache_test.cc +++ b/db/db_block_cache_test.cc @@ -365,7 +365,10 @@ TEST_F(DBBlockCacheTest, IndexAndFilterBlocksStats) { ASSERT_EQ(cache->GetUsage(), index_bytes_insert + filter_bytes_insert); // set the cache capacity to the current usage cache->SetCapacity(index_bytes_insert + filter_bytes_insert); - ASSERT_EQ(TestGetTickerCount(options, BLOCK_CACHE_INDEX_BYTES_EVICT), 0); + // The index eviction statistics were broken by the refactoring that moved + // the index readers out of the block cache. Disabling these until we can + // bring the stats back. + // ASSERT_EQ(TestGetTickerCount(options, BLOCK_CACHE_INDEX_BYTES_EVICT), 0); ASSERT_EQ(TestGetTickerCount(options, BLOCK_CACHE_FILTER_BYTES_EVICT), 0); // Note that the second key needs to be no longer than the first one. // Otherwise the second index block may not fit in cache. 
@@ -377,8 +380,11 @@ TEST_F(DBBlockCacheTest, IndexAndFilterBlocksStats) { index_bytes_insert); ASSERT_GT(TestGetTickerCount(options, BLOCK_CACHE_FILTER_BYTES_INSERT), filter_bytes_insert); - ASSERT_EQ(TestGetTickerCount(options, BLOCK_CACHE_INDEX_BYTES_EVICT), - index_bytes_insert); + // The index eviction statistics were broken by the refactoring that moved + // the index readers out of the block cache. Disabling these until we can + // bring the stats back. + // ASSERT_EQ(TestGetTickerCount(options, BLOCK_CACHE_INDEX_BYTES_EVICT), + // index_bytes_insert); ASSERT_EQ(TestGetTickerCount(options, BLOCK_CACHE_FILTER_BYTES_EVICT), filter_bytes_insert); } diff --git a/table/block_based_table_reader.cc b/table/block_based_table_reader.cc index a45fc0a5b..82f964926 100644 --- a/table/block_based_table_reader.cc +++ b/table/block_based_table_reader.cc @@ -120,16 +120,8 @@ void DeleteCachedEntry(const Slice& /*key*/, void* value) { } void DeleteCachedFilterEntry(const Slice& key, void* value); -void DeleteCachedIndexEntry(const Slice& key, void* value); void DeleteCachedUncompressionDictEntry(const Slice& key, void* value); -// Release the cached entry and decrement its ref count. -void ReleaseCachedEntry(void* arg, void* h) { - Cache* cache = reinterpret_cast(arg); - Cache::Handle* handle = reinterpret_cast(h); - cache->Release(handle); -} - // Release the cached entry and decrement its ref count. void ForceReleaseCachedEntry(void* arg, void* h) { Cache* cache = reinterpret_cast(arg); @@ -137,17 +129,6 @@ void ForceReleaseCachedEntry(void* arg, void* h) { cache->Release(handle, true /* force_erase */); } -Slice GetCacheKeyFromOffset(const char* cache_key_prefix, - size_t cache_key_prefix_size, uint64_t offset, - char* cache_key) { - assert(cache_key != nullptr); - assert(cache_key_prefix_size != 0); - assert(cache_key_prefix_size <= BlockBasedTable::kMaxCacheKeyPrefixSize); - memcpy(cache_key, cache_key_prefix, cache_key_prefix_size); - char* end = EncodeVarint64(cache_key + cache_key_prefix_size, offset); - return Slice(cache_key, static_cast(end - cache_key)); -} - Cache::Handle* GetEntryFromCache(Cache* block_cache, const Slice& key, int level, Tickers block_cache_miss_ticker, Tickers block_cache_hit_ticker, @@ -217,70 +198,193 @@ bool PrefixExtractorChanged(const TableProperties* table_properties, } // namespace +// Encapsulates common functionality for the various index reader +// implementations. Provides access to the index block regardless of whether +// it is owned by the reader or stored in the cache, or whether it is pinned +// in the cache or not. 
+class BlockBasedTable::IndexReaderCommon : public BlockBasedTable::IndexReader { +public: + IndexReaderCommon(BlockBasedTable* t, + CachableEntry&& index_block) + : table_(t) + , index_block_(std::move(index_block)) + { + assert(table_ != nullptr); + } + +protected: + static Status ReadIndexBlock(BlockBasedTable* table, + FilePrefetchBuffer* prefetch_buffer, const ReadOptions& read_options, + GetContext* get_context, CachableEntry* index_block); + + BlockBasedTable* table() const { return table_; } + + const InternalKeyComparator* internal_comparator() const { + assert(table_ != nullptr); + assert(table_->get_rep() != nullptr); + + return &table_->get_rep()->internal_comparator; + } + + bool index_key_includes_seq() const { + assert(table_ != nullptr); + assert(table_->get_rep() != nullptr); + + const TableProperties* const properties = + table_->get_rep()->table_properties.get(); + + return properties == nullptr || !properties->index_key_is_user_key; + } + + bool index_value_is_full() const { + assert(table_ != nullptr); + assert(table_->get_rep() != nullptr); + + const TableProperties* const properties = + table_->get_rep()->table_properties.get(); + + return properties == nullptr || !properties->index_value_is_delta_encoded; + } + + Status GetOrReadIndexBlock(const ReadOptions& read_options, + GetContext* get_context, + CachableEntry* index_block) const; + + size_t ApproximateIndexBlockMemoryUsage() const { + assert(!index_block_.GetOwnValue() || index_block_.GetValue() != nullptr); + return index_block_.GetOwnValue() ? + index_block_.GetValue()->ApproximateMemoryUsage() : 0; + } + +private: + BlockBasedTable* table_; + CachableEntry index_block_; +}; + +Status BlockBasedTable::IndexReaderCommon::ReadIndexBlock( + BlockBasedTable* table, FilePrefetchBuffer* prefetch_buffer, + const ReadOptions& read_options, GetContext* get_context, + CachableEntry* index_block) { + + PERF_TIMER_GUARD(read_index_block_nanos); + + assert(table != nullptr); + assert(index_block != nullptr); + assert(index_block->IsEmpty()); + + const Rep* const rep = table->get_rep(); + assert(rep != nullptr); + + constexpr bool is_index = true; + const Status s = BlockBasedTable::RetrieveBlock(prefetch_buffer, + rep, read_options, rep->footer.index_handle(), + UncompressionDict::GetEmptyDict(), index_block, is_index, get_context); + + return s; +} + +Status BlockBasedTable::IndexReaderCommon::GetOrReadIndexBlock( + const ReadOptions& read_options, GetContext* get_context, + CachableEntry* index_block) const { + + assert(index_block != nullptr); + + if (!index_block_.IsEmpty()) { + *index_block = CachableEntry(index_block_.GetValue(), + nullptr /* cache */, nullptr /* cache_handle */, false /* own_value */); + return Status::OK(); + } + + return ReadIndexBlock(table_, nullptr /* prefetch_buffer */, + read_options, get_context, index_block); +} + // Index that allows binary search lookup in a two-level index structure. -class PartitionIndexReader : public IndexReader { +class PartitionIndexReader : public BlockBasedTable::IndexReaderCommon { public: // Read the partition index from the file and create an instance for // `PartitionIndexReader`. // On success, index_reader will be populated; otherwise it will remain // unmodified. 
- static Status Create(BlockBasedTable* table, RandomAccessFileReader* file, - FilePrefetchBuffer* prefetch_buffer, - const Footer& footer, const BlockHandle& index_handle, - const ImmutableCFOptions& ioptions, - const InternalKeyComparator* icomparator, - IndexReader** index_reader, - const PersistentCacheOptions& cache_options, - const int level, const bool index_key_includes_seq, - const bool index_value_is_full, - MemoryAllocator* memory_allocator) { - std::unique_ptr index_block; - auto s = ReadBlockFromFile( - file, prefetch_buffer, footer, ReadOptions(), index_handle, - &index_block, ioptions, true /* decompress */, - true /*maybe_compressed*/, UncompressionDict::GetEmptyDict(), - cache_options, kDisableGlobalSequenceNumber, - 0 /* read_amp_bytes_per_bit */, memory_allocator); + static Status Create(BlockBasedTable* table, + FilePrefetchBuffer* prefetch_buffer, bool use_cache, + bool prefetch, bool pin, IndexReader** index_reader) { + assert(table != nullptr); + assert(table->get_rep()); + assert(!pin || prefetch); + assert(index_reader != nullptr); + + CachableEntry index_block; + if (prefetch || !use_cache) { + const Status s = ReadIndexBlock(table, prefetch_buffer, ReadOptions(), + nullptr /* get_context */, &index_block); + if (!s.ok()) { + return s; + } - if (s.ok()) { - *index_reader = new PartitionIndexReader( - table, icomparator, std::move(index_block), ioptions.statistics, - level, index_key_includes_seq, index_value_is_full); + if (use_cache && !pin) { + index_block.Reset(); + } } - return s; + *index_reader = new PartitionIndexReader(table, std::move(index_block)); + + return Status::OK(); } // return a two-level iterator: first level is on the partition index InternalIteratorBase* NewIterator( - IndexBlockIter* /*iter*/ = nullptr, bool /*dont_care*/ = true, - bool fill_cache = true) override { + const ReadOptions& read_options, bool /* disable_prefix_seek */, + IndexBlockIter* iter, GetContext* get_context) override { + + CachableEntry index_block; + const Status s = GetOrReadIndexBlock(read_options, get_context, + &index_block); + if (!s.ok()) { + if (iter != nullptr) { + iter->Invalidate(s); + return iter; + } + + return NewErrorInternalIterator(s); + } + + InternalIteratorBase* it = nullptr; + Statistics* kNullStats = nullptr; // Filters are already checked before seeking the index if (!partition_map_.empty()) { - // We don't return pinned datat from index blocks, so no need + // We don't return pinned data from index blocks, so no need // to set `block_contents_pinned`. - return NewTwoLevelIterator( + it = NewTwoLevelIterator( new BlockBasedTable::PartitionedIndexIteratorState( - table_, &partition_map_, index_key_includes_seq_, - index_value_is_full_), - index_block_->NewIterator( - icomparator_, icomparator_->user_comparator(), nullptr, - kNullStats, true, index_key_includes_seq_, index_value_is_full_)); + table(), &partition_map_, index_key_includes_seq(), + index_value_is_full()), + index_block.GetValue()->NewIterator( + internal_comparator(), internal_comparator()->user_comparator(), + nullptr, kNullStats, true, index_key_includes_seq(), + index_value_is_full())); } else { - auto ro = ReadOptions(); - ro.fill_cache = fill_cache; - bool kIsIndex = true; - // We don't return pinned datat from index blocks, so no need + ReadOptions ro; + ro.fill_cache = read_options.fill_cache; + constexpr bool is_index = true; + // We don't return pinned data from index blocks, so no need // to set `block_contents_pinned`. 
- return new BlockBasedTableIterator( - table_, ro, *icomparator_, - index_block_->NewIterator( - icomparator_, icomparator_->user_comparator(), nullptr, - kNullStats, true, index_key_includes_seq_, index_value_is_full_), - false, true, /* prefix_extractor */ nullptr, kIsIndex, - index_key_includes_seq_, index_value_is_full_); + it = new BlockBasedTableIterator( + table(), ro, *internal_comparator(), + index_block.GetValue()->NewIterator( + internal_comparator(), internal_comparator()->user_comparator(), + nullptr, kNullStats, true, index_key_includes_seq(), + index_value_is_full()), + false, true, /* prefix_extractor */ nullptr, is_index, + index_key_includes_seq(), index_value_is_full()); } + + assert(it != nullptr); + index_block.TransferTo(it); + + return it; + // TODO(myabandeh): Update TwoLevelIterator to be able to make use of // on-stack BlockIter while the state is on heap. Currentlly it assumes // the first level iter is always on heap and will attempt to delete it @@ -289,15 +393,26 @@ class PartitionIndexReader : public IndexReader { void CacheDependencies(bool pin) override { // Before read partitions, prefetch them to avoid lots of IOs - auto rep = table_->rep_; + auto rep = table()->rep_; IndexBlockIter biter; BlockHandle handle; Statistics* kNullStats = nullptr; - // We don't return pinned datat from index blocks, so no need + + CachableEntry index_block; + Status s = GetOrReadIndexBlock(ReadOptions(), nullptr /* get_context */, + &index_block); + if (!s.ok()) { + ROCKS_LOG_WARN(rep->ioptions.info_log, + "Error retrieving top-level index block while trying to " + "cache index partitions: %s", s.ToString().c_str()); + return; + } + + // We don't return pinned data from index blocks, so no need // to set `block_contents_pinned`. - index_block_->NewIterator( - icomparator_, icomparator_->user_comparator(), &biter, kNullStats, true, - index_key_includes_seq_, index_value_is_full_); + index_block.GetValue()->NewIterator( + internal_comparator(), internal_comparator()->user_comparator(), &biter, + kNullStats, true, index_key_includes_seq(), index_value_is_full()); // Index partitions are assumed to be consecuitive. Prefetch them all. 
// Read the first block offset biter.SeekToFirst(); @@ -318,10 +433,10 @@ class PartitionIndexReader : public IndexReader { uint64_t last_off = handle.offset() + handle.size() + kBlockTrailerSize; uint64_t prefetch_len = last_off - prefetch_off; std::unique_ptr prefetch_buffer; - auto& file = table_->rep_->file; + auto& file = rep->file; prefetch_buffer.reset(new FilePrefetchBuffer()); - Status s = prefetch_buffer->Prefetch(file.get(), prefetch_off, - static_cast(prefetch_len)); + s = prefetch_buffer->Prefetch(file.get(), prefetch_off, + static_cast(prefetch_len)); // After prefetch, read the partitions one by one biter.SeekToFirst(); @@ -332,7 +447,7 @@ class PartitionIndexReader : public IndexReader { const bool is_index = true; // TODO: Support counter batch update for partitioned index and // filter blocks - s = table_->MaybeReadBlockAndLoadToCache( + s = BlockBasedTable::MaybeReadBlockAndLoadToCache( prefetch_buffer.get(), rep, ro, handle, UncompressionDict::GetEmptyDict(), &block, is_index, nullptr /* get_context */); @@ -348,12 +463,8 @@ class PartitionIndexReader : public IndexReader { } } - size_t size() const override { return index_block_->size(); } - size_t usable_size() const override { return index_block_->usable_size(); } - size_t ApproximateMemoryUsage() const override { - assert(index_block_); - size_t usage = index_block_->ApproximateMemoryUsage(); + size_t usage = ApproximateIndexBlockMemoryUsage(); #ifdef ROCKSDB_MALLOC_USABLE_SIZE usage += malloc_usable_size((void*)this); #else @@ -364,78 +475,79 @@ class PartitionIndexReader : public IndexReader { } private: - PartitionIndexReader(BlockBasedTable* table, - const InternalKeyComparator* icomparator, - std::unique_ptr&& index_block, Statistics* stats, - const int /*level*/, const bool index_key_includes_seq, - const bool index_value_is_full) - : IndexReader(icomparator, stats), - table_(table), - index_block_(std::move(index_block)), - index_key_includes_seq_(index_key_includes_seq), - index_value_is_full_(index_value_is_full) { - assert(index_block_ != nullptr); - } - BlockBasedTable* table_; - std::unique_ptr index_block_; + PartitionIndexReader(BlockBasedTable* t, + CachableEntry&& index_block) + : IndexReaderCommon(t, std::move(index_block)) + {} + std::unordered_map> partition_map_; - const bool index_key_includes_seq_; - const bool index_value_is_full_; }; // Index that allows binary search lookup for the first key of each block. // This class can be viewed as a thin wrapper for `Block` class which already // supports binary search. -class BinarySearchIndexReader : public IndexReader { +class BinarySearchIndexReader : public BlockBasedTable::IndexReaderCommon { public: // Read index from the file and create an intance for // `BinarySearchIndexReader`. // On success, index_reader will be populated; otherwise it will remain // unmodified. 
- static Status Create(RandomAccessFileReader* file, - FilePrefetchBuffer* prefetch_buffer, - const Footer& footer, const BlockHandle& index_handle, - const ImmutableCFOptions& ioptions, - const InternalKeyComparator* icomparator, - IndexReader** index_reader, - const PersistentCacheOptions& cache_options, - const bool index_key_includes_seq, - const bool index_value_is_full, - MemoryAllocator* memory_allocator) { - std::unique_ptr index_block; - auto s = ReadBlockFromFile( - file, prefetch_buffer, footer, ReadOptions(), index_handle, - &index_block, ioptions, true /* decompress */, - true /*maybe_compressed*/, UncompressionDict::GetEmptyDict(), - cache_options, kDisableGlobalSequenceNumber, - 0 /* read_amp_bytes_per_bit */, memory_allocator); + static Status Create(BlockBasedTable* table, + FilePrefetchBuffer* prefetch_buffer, bool use_cache, + bool prefetch, bool pin, IndexReader** index_reader) { + assert(table != nullptr); + assert(table->get_rep()); + assert(!pin || prefetch); + assert(index_reader != nullptr); + + CachableEntry index_block; + if (prefetch || !use_cache) { + const Status s = ReadIndexBlock(table, prefetch_buffer, ReadOptions(), + nullptr /* get_context */, &index_block); + if (!s.ok()) { + return s; + } - if (s.ok()) { - *index_reader = new BinarySearchIndexReader( - icomparator, std::move(index_block), ioptions.statistics, - index_key_includes_seq, index_value_is_full); + if (use_cache && !pin) { + index_block.Reset(); + } } - return s; + *index_reader = new BinarySearchIndexReader(table, std::move(index_block)); + + return Status::OK(); } InternalIteratorBase* NewIterator( - IndexBlockIter* iter = nullptr, bool /*dont_care*/ = true, - bool /*dont_care*/ = true) override { + const ReadOptions& read_options, bool /* disable_prefix_seek */, + IndexBlockIter* iter, GetContext* get_context) override { + CachableEntry index_block; + const Status s = GetOrReadIndexBlock(read_options, get_context, + &index_block); + if (!s.ok()) { + if (iter != nullptr) { + iter->Invalidate(s); + return iter; + } + + return NewErrorInternalIterator(s); + } + Statistics* kNullStats = nullptr; - // We don't return pinned datat from index blocks, so no need + // We don't return pinned data from index blocks, so no need // to set `block_contents_pinned`. 
- return index_block_->NewIterator( - icomparator_, icomparator_->user_comparator(), iter, kNullStats, true, - index_key_includes_seq_, index_value_is_full_); - } + auto it = index_block.GetValue()->NewIterator( + internal_comparator(), internal_comparator()->user_comparator(), iter, + kNullStats, true, index_key_includes_seq(), index_value_is_full()); - size_t size() const override { return index_block_->size(); } - size_t usable_size() const override { return index_block_->usable_size(); } + assert(it != nullptr); + index_block.TransferTo(it); + + return it; + } size_t ApproximateMemoryUsage() const override { - assert(index_block_); - size_t usage = index_block_->ApproximateMemoryUsage(); + size_t usage = ApproximateIndexBlockMemoryUsage(); #ifdef ROCKSDB_MALLOC_USABLE_SIZE usage += malloc_usable_size((void*)this); #else @@ -445,60 +557,51 @@ class BinarySearchIndexReader : public IndexReader { } private: - BinarySearchIndexReader(const InternalKeyComparator* icomparator, - std::unique_ptr&& index_block, - Statistics* stats, const bool index_key_includes_seq, - const bool index_value_is_full) - : IndexReader(icomparator, stats), - index_block_(std::move(index_block)), - index_key_includes_seq_(index_key_includes_seq), - index_value_is_full_(index_value_is_full) { - assert(index_block_ != nullptr); - } - std::unique_ptr index_block_; - const bool index_key_includes_seq_; - const bool index_value_is_full_; + BinarySearchIndexReader(BlockBasedTable* t, + CachableEntry&& index_block) + : IndexReaderCommon(t, std::move(index_block)) + {} }; // Index that leverages an internal hash table to quicken the lookup for a given // key. -class HashIndexReader : public IndexReader { +class HashIndexReader : public BlockBasedTable::IndexReaderCommon { public: - static Status Create( - const SliceTransform* hash_key_extractor, const Footer& footer, - RandomAccessFileReader* file, FilePrefetchBuffer* prefetch_buffer, - const ImmutableCFOptions& ioptions, - const InternalKeyComparator* icomparator, const BlockHandle& index_handle, - InternalIterator* meta_index_iter, IndexReader** index_reader, - bool /*hash_index_allow_collision*/, - const PersistentCacheOptions& cache_options, - const bool index_key_includes_seq, const bool index_value_is_full, - MemoryAllocator* memory_allocator) { - std::unique_ptr index_block; - auto s = ReadBlockFromFile( - file, prefetch_buffer, footer, ReadOptions(), index_handle, - &index_block, ioptions, true /* decompress */, - true /*maybe_compressed*/, UncompressionDict::GetEmptyDict(), - cache_options, kDisableGlobalSequenceNumber, - 0 /* read_amp_bytes_per_bit */, memory_allocator); + static Status Create(BlockBasedTable* table, + FilePrefetchBuffer* prefetch_buffer, + InternalIterator* meta_index_iter, bool use_cache, + bool prefetch, bool pin, IndexReader** index_reader) { + assert(table != nullptr); + assert(index_reader != nullptr); + assert(!pin || prefetch); + + auto rep = table->get_rep(); + assert(rep != nullptr); + + CachableEntry index_block; + if (prefetch || !use_cache) { + const Status s = ReadIndexBlock(table, prefetch_buffer, ReadOptions(), + nullptr /* get_context */, &index_block); + if (!s.ok()) { + return s; + } - if (!s.ok()) { - return s; + if (use_cache && !pin) { + index_block.Reset(); + } } // Note, failure to create prefix hash index does not need to be a // hard error. We can still fall back to the original binary search index. // So, Create will succeed regardless, from this point on. 
- auto new_index_reader = new HashIndexReader( - icomparator, std::move(index_block), ioptions.statistics, - index_key_includes_seq, index_value_is_full); + auto new_index_reader = new HashIndexReader(table, std::move(index_block)); *index_reader = new_index_reader; // Get prefixes block BlockHandle prefixes_handle; - s = FindMetaBlock(meta_index_iter, kHashIndexPrefixesBlock, - &prefixes_handle); + Status s = FindMetaBlock(meta_index_iter, kHashIndexPrefixesBlock, + &prefixes_handle); if (!s.ok()) { // TODO: log error return Status::OK(); @@ -513,6 +616,13 @@ class HashIndexReader : public IndexReader { return Status::OK(); } + RandomAccessFileReader* const file = rep->file.get(); + const Footer& footer = rep->footer; + const ImmutableCFOptions& ioptions = rep->ioptions; + const PersistentCacheOptions& cache_options = rep->persistent_cache_options; + MemoryAllocator* const memory_allocator = + GetMemoryAllocator(rep->table_options); + // Read contents for the blocks BlockContents prefixes_contents; BlockFetcher prefixes_block_fetcher( @@ -537,7 +647,8 @@ class HashIndexReader : public IndexReader { } BlockPrefixIndex* prefix_index = nullptr; - s = BlockPrefixIndex::Create(hash_key_extractor, prefixes_contents.data, + s = BlockPrefixIndex::Create(rep->internal_prefix_transform.get(), + prefixes_contents.data, prefixes_meta_contents.data, &prefix_index); // TODO: log error if (s.ok()) { @@ -548,24 +659,39 @@ class HashIndexReader : public IndexReader { } InternalIteratorBase* NewIterator( - IndexBlockIter* iter = nullptr, bool total_order_seek = true, - bool /*dont_care*/ = true) override { + const ReadOptions& read_options, bool disable_prefix_seek, + IndexBlockIter* iter, GetContext* get_context) override { + CachableEntry index_block; + const Status s = GetOrReadIndexBlock(read_options, get_context, + &index_block); + if (!s.ok()) { + if (iter != nullptr) { + iter->Invalidate(s); + return iter; + } + + return NewErrorInternalIterator(s); + } + Statistics* kNullStats = nullptr; - // We don't return pinned datat from index blocks, so no need + const bool total_order_seek = read_options.total_order_seek || + disable_prefix_seek; + // We don't return pinned data from index blocks, so no need // to set `block_contents_pinned`. 
- return index_block_->NewIterator( - icomparator_, icomparator_->user_comparator(), iter, kNullStats, - total_order_seek, index_key_includes_seq_, index_value_is_full_, - false /* block_contents_pinned */, prefix_index_.get()); - } + auto it = index_block.GetValue()->NewIterator( + internal_comparator(), internal_comparator()->user_comparator(), iter, + kNullStats, total_order_seek, index_key_includes_seq(), + index_value_is_full(), false /* block_contents_pinned */, + prefix_index_.get()); - size_t size() const override { return index_block_->size(); } - size_t usable_size() const override { return index_block_->usable_size(); } + assert(it != nullptr); + index_block.TransferTo(it); + + return it; + } size_t ApproximateMemoryUsage() const override { - assert(index_block_); - size_t usage = index_block_->ApproximateMemoryUsage(); - usage += prefixes_contents_.usable_size(); + size_t usage = ApproximateIndexBlockMemoryUsage(); #ifdef ROCKSDB_MALLOC_USABLE_SIZE usage += malloc_usable_size((void*)this); #else @@ -578,37 +704,22 @@ class HashIndexReader : public IndexReader { } private: - HashIndexReader(const InternalKeyComparator* icomparator, - std::unique_ptr&& index_block, Statistics* stats, - const bool index_key_includes_seq, - const bool index_value_is_full) - : IndexReader(icomparator, stats), - index_block_(std::move(index_block)), - index_key_includes_seq_(index_key_includes_seq), - index_value_is_full_(index_value_is_full) { - assert(index_block_ != nullptr); - } + HashIndexReader(BlockBasedTable* t, + CachableEntry&& index_block) + : IndexReaderCommon(t, std::move(index_block)) + {} - ~HashIndexReader() override {} - - std::unique_ptr index_block_; std::unique_ptr prefix_index_; - BlockContents prefixes_contents_; - const bool index_key_includes_seq_; - const bool index_value_is_full_; }; // Helper function to setup the cache key's prefix for the Table. -void BlockBasedTable::SetupCacheKeyPrefix(Rep* rep, uint64_t file_size) { +void BlockBasedTable::SetupCacheKeyPrefix(Rep* rep) { assert(kMaxCacheKeyPrefixSize >= 10); rep->cache_key_prefix_size = 0; rep->compressed_cache_key_prefix_size = 0; if (rep->table_options.block_cache != nullptr) { GenerateCachePrefix(rep->table_options.block_cache.get(), rep->file->file(), &rep->cache_key_prefix[0], &rep->cache_key_prefix_size); - // Create dummy offset of index reader which is beyond the file size. - rep->dummy_index_reader_offset = - file_size + rep->table_options.block_cache->NewId(); } if (rep->table_options.persistent_cache != nullptr) { GenerateCachePrefix(/*cache=*/nullptr, rep->file->file(), @@ -814,7 +925,7 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions, // handle prefix correctly. 
rep->internal_prefix_transform.reset( new InternalKeySliceTransform(prefix_extractor)); - SetupCacheKeyPrefix(rep, file_size); + SetupCacheKeyPrefix(rep); std::unique_ptr new_table(new BlockBasedTable(rep)); // page cache options @@ -848,9 +959,8 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions, return s; } s = PrefetchIndexAndFilterBlocks(rep, prefetch_buffer.get(), meta_iter.get(), - new_table.get(), prefix_extractor, - prefetch_all, table_options, level, - prefetch_index_and_filter_in_cache); + new_table.get(), prefetch_all, table_options, + level); if (s.ok()) { // Update tail prefetch stats @@ -1116,9 +1226,8 @@ Status BlockBasedTable::ReadCompressionDictBlock( Status BlockBasedTable::PrefetchIndexAndFilterBlocks( Rep* rep, FilePrefetchBuffer* prefetch_buffer, InternalIterator* meta_iter, - BlockBasedTable* new_table, const SliceTransform* prefix_extractor, - bool prefetch_all, const BlockBasedTableOptions& table_options, - const int level, const bool prefetch_index_and_filter_in_cache) { + BlockBasedTable* new_table, bool prefetch_all, + const BlockBasedTableOptions& table_options, const int level) { Status s; // Find filter handle and filter type @@ -1157,10 +1266,10 @@ Status BlockBasedTable::PrefetchIndexAndFilterBlocks( &rep->compression_dict_handle); } - bool need_upper_bound_check = - PrefixExtractorChanged(rep->table_properties.get(), prefix_extractor); - BlockBasedTableOptions::IndexType index_type = new_table->UpdateIndexType(); + + const bool use_cache = table_options.cache_index_and_filter_blocks; + // prefetch the first level of index const bool prefetch_index = prefetch_all || @@ -1183,39 +1292,34 @@ Status BlockBasedTable::PrefetchIndexAndFilterBlocks( const bool pin_filter = pin_all || (table_options.pin_top_level_index_and_filter && rep->filter_type == Rep::FilterType::kPartitionedFilter); + + IndexReader* index_reader = nullptr; + if (s.ok()) { + s = new_table->CreateIndexReader(prefetch_buffer, meta_iter, use_cache, + prefetch_index, pin_index, &index_reader); + if (s.ok()) { + assert(index_reader != nullptr); + rep->index_reader.reset(index_reader); + // The partitions of a partitioned index are always stored in cache. They + // hence follow the configuration for pin and prefetch regardless of + // the value of cache_index_and_filter_blocks + if (prefetch_all) { + rep->index_reader->CacheDependencies(pin_all); + } + } else { + delete index_reader; + index_reader = nullptr; + } + } + // pre-fetching of blocks is turned on // Will use block cache for meta-blocks access // Always prefetch index and filter for level 0 // TODO(ajkr): also prefetch compression dictionary block + // TODO(ajkr): also pin compression dictionary block when + // `pin_l0_filter_and_index_blocks_in_cache == true`. if (table_options.cache_index_and_filter_blocks) { assert(table_options.block_cache != nullptr); - if (prefetch_index) { - // Hack: Call NewIndexIterator() to implicitly add index to the - // block_cache - CachableEntry index_entry; - // check prefix_extractor match only if hash based index is used - bool disable_prefix_seek = - rep->index_type == BlockBasedTableOptions::kHashSearch && - need_upper_bound_check; - if (s.ok()) { - std::unique_ptr> iter( - new_table->NewIndexIterator(ReadOptions(), disable_prefix_seek, - nullptr, &index_entry)); - s = iter->status(); - } - if (s.ok()) { - // This is the first call to NewIndexIterator() since we're in Open(). - // On success it should give us ownership of the `CachableEntry` by - // populating `index_entry`.
- assert(index_entry.GetValue() != nullptr); - if (prefetch_all) { - index_entry.GetValue()->CacheDependencies(pin_all); - } - if (pin_index) { - rep->index_entry = std::move(index_entry); - } - } - } if (s.ok() && prefetch_filter) { // Hack: Call GetFilter() to implicitly add filter to the block_cache auto filter_entry = @@ -1232,24 +1336,8 @@ Status BlockBasedTable::PrefetchIndexAndFilterBlocks( } } } else { - // If we don't use block cache for meta-block access, we'll pre-load these - // blocks, which will kept in member variables in Rep and with a same life- - // time as this table object. - IndexReader* index_reader = nullptr; - if (s.ok()) { - s = new_table->CreateIndexReader(prefetch_buffer, &index_reader, - meta_iter, level); - } std::unique_ptr compression_dict_block; if (s.ok()) { - rep->index_reader.reset(index_reader); - // The partitions of partitioned index are always stored in cache. They - // are hence follow the configuration for pin and prefetch regardless of - // the value of cache_index_and_filter_blocks - if (prefetch_index_and_filter_in_cache || level == 0) { - rep->index_reader->CacheDependencies(pin_all); - } - // Set filter block if (rep->filter_policy) { const bool is_a_filter_partition = true; @@ -1259,14 +1347,12 @@ Status BlockBasedTable::PrefetchIndexAndFilterBlocks( rep->filter.reset(filter); // Refer to the comment above about paritioned indexes always being // cached - if (filter && (prefetch_index_and_filter_in_cache || level == 0)) { + if (filter && prefetch_all) { filter->CacheDependencies(pin_all, rep->table_prefix_extractor.get()); } } s = ReadCompressionDictBlock(rep, prefetch_buffer, &compression_dict_block); - } else { - delete index_reader; } if (s.ok() && !rep->compression_dict_handle.IsNull()) { assert(compression_dict_block != nullptr); @@ -1350,7 +1436,7 @@ Status BlockBasedTable::ReadMetaBlock(Rep* rep, Status BlockBasedTable::GetDataBlockFromCache( const Slice& block_cache_key, const Slice& compressed_block_cache_key, - Cache* block_cache, Cache* block_cache_compressed, Rep* rep, + Cache* block_cache, Cache* block_cache_compressed, const Rep* rep, const ReadOptions& read_options, CachableEntry* block, const UncompressionDict& uncompression_dict, size_t read_amp_bytes_per_bit, bool is_index, GetContext* get_context) { @@ -1379,6 +1465,10 @@ Status BlockBasedTable::GetDataBlockFromCache( : nullptr, statistics, get_context); if (cache_handle != nullptr) { + if (is_index) { + PERF_COUNTER_ADD(block_cache_index_hit_count, 1); + } + block->SetCachedValue( reinterpret_cast(block_cache->Value(cache_handle)), block_cache, cache_handle); @@ -1843,119 +1933,15 @@ BlockBasedTable::GetUncompressionDict(Rep* rep, // differs from the one in mutable_cf_options and index type is HashBasedIndex InternalIteratorBase* BlockBasedTable::NewIndexIterator( const ReadOptions& read_options, bool disable_prefix_seek, - IndexBlockIter* input_iter, CachableEntry* index_entry, - GetContext* get_context) { - // index reader has already been pre-populated. - if (rep_->index_reader) { - // We don't return pinned datat from index blocks, so no need - // to set `block_contents_pinned`. - return rep_->index_reader->NewIterator( - input_iter, read_options.total_order_seek || disable_prefix_seek, - read_options.fill_cache); - } - // we have a pinned index block - if (rep_->index_entry.IsCached()) { - // We don't return pinned datat from index blocks, so no need - // to set `block_contents_pinned`. 
- return rep_->index_entry.GetValue()->NewIterator( - input_iter, read_options.total_order_seek || disable_prefix_seek, - read_options.fill_cache); - } - - PERF_TIMER_GUARD(read_index_block_nanos); - - const bool no_io = read_options.read_tier == kBlockCacheTier; - Cache* block_cache = rep_->table_options.block_cache.get(); - char cache_key[kMaxCacheKeyPrefixSize + kMaxVarint64Length]; - auto key = - GetCacheKeyFromOffset(rep_->cache_key_prefix, rep_->cache_key_prefix_size, - rep_->dummy_index_reader_offset, cache_key); - Statistics* statistics = rep_->ioptions.statistics; - auto cache_handle = GetEntryFromCache( - block_cache, key, rep_->level, BLOCK_CACHE_INDEX_MISS, - BLOCK_CACHE_INDEX_HIT, - get_context ? &get_context->get_context_stats_.num_cache_index_miss - : nullptr, - get_context ? &get_context->get_context_stats_.num_cache_index_hit - : nullptr, - statistics, get_context); - - if (cache_handle == nullptr && no_io) { - if (input_iter != nullptr) { - input_iter->Invalidate(Status::Incomplete("no blocking io")); - return input_iter; - } else { - return NewErrorInternalIterator( - Status::Incomplete("no blocking io")); - } - } + IndexBlockIter* input_iter, GetContext* get_context) { - IndexReader* index_reader = nullptr; - if (cache_handle != nullptr) { - PERF_COUNTER_ADD(block_cache_index_hit_count, 1); - index_reader = - reinterpret_cast(block_cache->Value(cache_handle)); - } else { - // Create index reader and put it in the cache. - Status s; - TEST_SYNC_POINT("BlockBasedTable::NewIndexIterator::thread2:2"); - s = CreateIndexReader(nullptr /* prefetch_buffer */, &index_reader); - TEST_SYNC_POINT("BlockBasedTable::NewIndexIterator::thread1:1"); - TEST_SYNC_POINT("BlockBasedTable::NewIndexIterator::thread2:3"); - TEST_SYNC_POINT("BlockBasedTable::NewIndexIterator::thread1:4"); - size_t charge = 0; - if (s.ok()) { - assert(index_reader != nullptr); - charge = index_reader->ApproximateMemoryUsage(); - s = block_cache->Insert( - key, index_reader, charge, &DeleteCachedIndexEntry, &cache_handle, - rep_->table_options.cache_index_and_filter_blocks_with_high_priority - ? Cache::Priority::HIGH - : Cache::Priority::LOW); - } - - if (s.ok()) { - if (get_context != nullptr) { - get_context->get_context_stats_.num_cache_add++; - get_context->get_context_stats_.num_cache_bytes_write += charge; - } else { - RecordTick(statistics, BLOCK_CACHE_ADD); - RecordTick(statistics, BLOCK_CACHE_BYTES_WRITE, charge); - } - PERF_COUNTER_ADD(index_block_read_count, 1); - RecordTick(statistics, BLOCK_CACHE_INDEX_ADD); - RecordTick(statistics, BLOCK_CACHE_INDEX_BYTES_INSERT, charge); - } else { - if (index_reader != nullptr) { - delete index_reader; - } - RecordTick(statistics, BLOCK_CACHE_ADD_FAILURES); - // make sure if something goes wrong, index_reader shall remain intact. - if (input_iter != nullptr) { - input_iter->Invalidate(s); - return input_iter; - } else { - return NewErrorInternalIterator(s); - } - } - } + assert(rep_ != nullptr); + assert(rep_->index_reader != nullptr); - assert(cache_handle); - // We don't return pinned datat from index blocks, so no need + // We don't return pinned data from index blocks, so no need // to set `block_contents_pinned`. 
- auto* iter = index_reader->NewIterator( - input_iter, read_options.total_order_seek || disable_prefix_seek); - - // the caller would like to take ownership of the index block - // don't call RegisterCleanup() in this case, the caller will take care of it - if (index_entry != nullptr) { - *index_entry = {index_reader, block_cache, cache_handle, - false /* own_value */}; - } else { - iter->RegisterCleanup(&ReleaseCachedEntry, block_cache, cache_handle); - } - - return iter; + return rep_->index_reader->NewIterator(read_options, disable_prefix_seek, + input_iter, get_context); } // Convert an index iterator value (i.e., an encoded BlockHandle) @@ -1970,118 +1956,85 @@ TBlockIter* BlockBasedTable::NewDataBlockIterator( FilePrefetchBuffer* prefetch_buffer) { PERF_TIMER_GUARD(new_table_block_iter_nanos); - Cache* block_cache = rep->table_options.block_cache.get(); + TBlockIter* iter = input_iter != nullptr ? input_iter : new TBlockIter; + if (!s.ok()) { + iter->Invalidate(s); + return iter; + } + + const bool no_io = (ro.read_tier == kBlockCacheTier); + auto uncompression_dict_storage = + GetUncompressionDict(rep, prefetch_buffer, no_io, get_context); + const UncompressionDict& uncompression_dict = + uncompression_dict_storage.GetValue() == nullptr + ? UncompressionDict::GetEmptyDict() + : *uncompression_dict_storage.GetValue(); + CachableEntry block; - TBlockIter* iter; - { - const bool no_io = (ro.read_tier == kBlockCacheTier); - auto uncompression_dict_storage = - GetUncompressionDict(rep, prefetch_buffer, no_io, get_context); - const UncompressionDict& uncompression_dict = - uncompression_dict_storage.GetValue() == nullptr - ? UncompressionDict::GetEmptyDict() - : *uncompression_dict_storage.GetValue(); - if (s.ok()) { - s = MaybeReadBlockAndLoadToCache(prefetch_buffer, rep, ro, handle, - uncompression_dict, &block, is_index, - get_context); - } + s = RetrieveBlock(prefetch_buffer, rep, ro, handle, uncompression_dict, + &block, is_index, get_context); - if (input_iter != nullptr) { - iter = input_iter; - } else { - iter = new TBlockIter; - } - // Didn't get any data from block caches. - if (s.ok() && block.GetValue() == nullptr) { - if (no_io) { - // Could not read from block_cache and can't do IO - iter->Invalidate(Status::Incomplete("no blocking io")); - return iter; - } - std::unique_ptr block_value; - { - StopWatch sw(rep->ioptions.env, rep->ioptions.statistics, - READ_BLOCK_GET_MICROS); - s = ReadBlockFromFile( - rep->file.get(), prefetch_buffer, rep->footer, ro, handle, - &block_value, rep->ioptions, - rep->blocks_maybe_compressed /*do_decompress*/, - rep->blocks_maybe_compressed, uncompression_dict, - rep->persistent_cache_options, - is_index ? kDisableGlobalSequenceNumber : rep->global_seqno, - rep->table_options.read_amp_bytes_per_bit, - GetMemoryAllocator(rep->table_options)); - } + if (!s.ok()) { + assert(block.IsEmpty()); + iter->Invalidate(s); + return iter; + } + + assert(block.GetValue() != nullptr); + constexpr bool kTotalOrderSeek = true; + // Block contents are pinned and it is still pinned after the iterator + // is destroyed as long as cleanup functions are moved to another object, + // when: + // 1. block cache handle is set to be released in cleanup function, or + // 2. it's pointing to immortal source. If own_bytes is true then we are + // not reading data from the original source, whether immortal or not. + // Otherwise, the block is pinned iff the source is immortal. 
+ const bool block_contents_pinned = block.IsCached() || + (!block.GetValue()->own_bytes() && rep->immortal_table); + iter = block.GetValue()->NewIterator( + &rep->internal_comparator, rep->internal_comparator.user_comparator(), + iter, rep->ioptions.statistics, kTotalOrderSeek, key_includes_seq, + index_key_is_full, block_contents_pinned); + + if (!block.IsCached()) { + if (!ro.fill_cache && rep->cache_key_prefix_size != 0) { + // insert a dummy record to block cache to track the memory usage + Cache* const block_cache = rep->table_options.block_cache.get(); + Cache::Handle* cache_handle = nullptr; + // There are two other types of cache keys: 1) SST cache key added in + // `MaybeReadBlockAndLoadToCache` 2) dummy cache key added in + // `write_buffer_manager`. Use longer prefix (41 bytes) to differentiate + // from SST cache key(31 bytes), and use non-zero prefix to + // differentiate from `write_buffer_manager` + const size_t kExtraCacheKeyPrefix = kMaxVarint64Length * 4 + 1; + char cache_key[kExtraCacheKeyPrefix + kMaxVarint64Length]; + // Prefix: use rep->cache_key_prefix padded by 0s + memset(cache_key, 0, kExtraCacheKeyPrefix + kMaxVarint64Length); + assert(rep->cache_key_prefix_size != 0); + assert(rep->cache_key_prefix_size <= kExtraCacheKeyPrefix); + memcpy(cache_key, rep->cache_key_prefix, rep->cache_key_prefix_size); + char* end = EncodeVarint64(cache_key + kExtraCacheKeyPrefix, + next_cache_key_id_++); + assert(end - cache_key <= + static_cast(kExtraCacheKeyPrefix + kMaxVarint64Length)); + const Slice unique_key(cache_key, static_cast(end - cache_key)); + s = block_cache->Insert(unique_key, nullptr, + block.GetValue()->ApproximateMemoryUsage(), + nullptr, &cache_handle); if (s.ok()) { - block.SetOwnedValue(block_value.release()); + assert(cache_handle != nullptr); + iter->RegisterCleanup(&ForceReleaseCachedEntry, block_cache, + cache_handle); } } - // TODO(ajkr): also pin compression dictionary block when - // `pin_l0_filter_and_index_blocks_in_cache == true`. } - if (s.ok()) { - assert(block.GetValue() != nullptr); - const bool kTotalOrderSeek = true; - // Block contents are pinned and it is still pinned after the iterator - // is destroyed as long as cleanup functions are moved to another object, - // when: - // 1. block cache handle is set to be released in cleanup function, or - // 2. it's pointing to immortal source. If own_bytes is true then we are - // not reading data from the original source, whether immortal or not. - // Otherwise, the block is pinned iff the source is immortal. - bool block_contents_pinned = - (block.IsCached() || - (!block.GetValue()->own_bytes() && rep->immortal_table)); - iter = block.GetValue()->NewIterator( - &rep->internal_comparator, rep->internal_comparator.user_comparator(), - iter, rep->ioptions.statistics, kTotalOrderSeek, key_includes_seq, - index_key_is_full, block_contents_pinned); - if (!block.IsCached()) { - if (!ro.fill_cache && rep->cache_key_prefix_size != 0) { - // insert a dummy record to block cache to track the memory usage - Cache::Handle* cache_handle; - // There are two other types of cache keys: 1) SST cache key added in - // `MaybeReadBlockAndLoadToCache` 2) dummy cache key added in - // `write_buffer_manager`. 
Use longer prefix (41 bytes) to differentiate - // from SST cache key(31 bytes), and use non-zero prefix to - // differentiate from `write_buffer_manager` - const size_t kExtraCacheKeyPrefix = kMaxVarint64Length * 4 + 1; - char cache_key[kExtraCacheKeyPrefix + kMaxVarint64Length]; - // Prefix: use rep->cache_key_prefix padded by 0s - memset(cache_key, 0, kExtraCacheKeyPrefix + kMaxVarint64Length); - assert(rep->cache_key_prefix_size != 0); - assert(rep->cache_key_prefix_size <= kExtraCacheKeyPrefix); - memcpy(cache_key, rep->cache_key_prefix, rep->cache_key_prefix_size); - char* end = EncodeVarint64(cache_key + kExtraCacheKeyPrefix, - next_cache_key_id_++); - assert(end - cache_key <= - static_cast(kExtraCacheKeyPrefix + kMaxVarint64Length)); - Slice unique_key = - Slice(cache_key, static_cast(end - cache_key)); - s = block_cache->Insert(unique_key, nullptr, - block.GetValue()->ApproximateMemoryUsage(), - nullptr, &cache_handle); - if (s.ok()) { - if (cache_handle != nullptr) { - iter->RegisterCleanup(&ForceReleaseCachedEntry, block_cache, - cache_handle); - } - } - } - } - - block.TransferTo(iter); - } else { - assert(block.GetValue() == nullptr); - iter->Invalidate(s); - } + block.TransferTo(iter); return iter; } Status BlockBasedTable::MaybeReadBlockAndLoadToCache( - FilePrefetchBuffer* prefetch_buffer, Rep* rep, const ReadOptions& ro, + FilePrefetchBuffer* prefetch_buffer, const Rep* rep, const ReadOptions& ro, const BlockHandle& handle, const UncompressionDict& uncompression_dict, CachableEntry* block_entry, bool is_index, GetContext* get_context) { assert(block_entry != nullptr); @@ -2116,7 +2069,8 @@ Status BlockBasedTable::MaybeReadBlockAndLoadToCache( s = GetDataBlockFromCache(key, ckey, block_cache, block_cache_compressed, rep, ro, block_entry, uncompression_dict, - rep->table_options.read_amp_bytes_per_bit, + !is_index ? + rep->table_options.read_amp_bytes_per_bit : 0, is_index, get_context); // Can't find the block from the cache. If I/O is allowed, read from the @@ -2148,7 +2102,7 @@ Status BlockBasedTable::MaybeReadBlockAndLoadToCache( key, ckey, block_cache, block_cache_compressed, ro, rep->ioptions, block_entry, &raw_block_contents, raw_block_comp_type, rep->table_options.format_version, uncompression_dict, seq_no, - rep->table_options.read_amp_bytes_per_bit, + !is_index ? 
rep->table_options.read_amp_bytes_per_bit : 0, GetMemoryAllocator(rep->table_options), is_index, is_index && rep->table_options .cache_index_and_filter_blocks_with_high_priority @@ -2162,6 +2116,64 @@ Status BlockBasedTable::MaybeReadBlockAndLoadToCache( return s; } +Status BlockBasedTable::RetrieveBlock( + FilePrefetchBuffer* prefetch_buffer, const Rep* rep, const ReadOptions& ro, + const BlockHandle& handle, const UncompressionDict& uncompression_dict, + CachableEntry* block_entry, bool is_index, GetContext* get_context) { + + assert(rep); + assert(block_entry); + assert(block_entry->IsEmpty()); + + Status s; + if (!is_index || rep->table_options.cache_index_and_filter_blocks) { + s = MaybeReadBlockAndLoadToCache(prefetch_buffer, rep, ro, handle, + uncompression_dict, block_entry, + is_index, get_context); + + if (!s.ok()) { + return s; + } + + if (block_entry->GetValue() != nullptr) { + assert (s.ok()); + return s; + } + } + + assert(block_entry->IsEmpty()); + + const bool no_io = ro.read_tier == kBlockCacheTier; + if (no_io) { + return Status::Incomplete("no blocking io"); + } + + std::unique_ptr block; + + { + StopWatch sw(rep->ioptions.env, rep->ioptions.statistics, + READ_BLOCK_GET_MICROS); + s = ReadBlockFromFile(rep->file.get(), prefetch_buffer, rep->footer, ro, + handle, &block, rep->ioptions, + rep->blocks_maybe_compressed, + rep->blocks_maybe_compressed, uncompression_dict, + rep->persistent_cache_options, + rep->get_global_seqno(is_index), + !is_index ? + rep->table_options.read_amp_bytes_per_bit : 0, + GetMemoryAllocator(rep->table_options)); + } + + if (!s.ok()) { + return s; + } + + block_entry->SetOwnedValue(block.release()); + + assert(s.ok()); + return s; +} + BlockBasedTable::PartitionedIndexIteratorState::PartitionedIndexIteratorState( BlockBasedTable* table, std::unordered_map>* block_map, @@ -2188,7 +2200,7 @@ BlockBasedTable::PartitionedIndexIteratorState::NewSecondaryIterator( RecordTick(rep->ioptions.statistics, BLOCK_CACHE_BYTES_READ, block_cache->GetUsage(block->second.GetCacheHandle())); Statistics* kNullStats = nullptr; - // We don't return pinned datat from index blocks, so no need + // We don't return pinned data from index blocks, so no need // to set `block_contents_pinned`. 
return block->second.GetValue()->NewIterator( &rep->internal_comparator, rep->internal_comparator.user_comparator(), @@ -2747,7 +2759,7 @@ Status BlockBasedTable::Get(const ReadOptions& read_options, const Slice& key, } auto iiter = NewIndexIterator(read_options, need_upper_bound_check, &iiter_on_stack, - /* index_entry */ nullptr, get_context); + get_context); std::unique_ptr> iiter_unique_ptr; if (iiter != &iiter_on_stack) { iiter_unique_ptr.reset(iiter); @@ -2868,7 +2880,7 @@ void BlockBasedTable::MultiGet(const ReadOptions& read_options, } auto iiter = NewIndexIterator( read_options, need_upper_bound_check, &iiter_on_stack, - /* index_entry */ nullptr, sst_file_range.begin()->get_context); + sst_file_range.begin()->get_context); std::unique_ptr> iiter_unique_ptr; if (iiter != &iiter_on_stack) { iiter_unique_ptr.reset(iiter); @@ -3085,45 +3097,37 @@ Status BlockBasedTable::VerifyChecksumInMetaBlocks( return s; } +bool BlockBasedTable::TEST_BlockInCache(const BlockHandle& handle) const { + assert(rep_ != nullptr); + + Cache* const cache = rep_->table_options.block_cache.get(); + if (cache == nullptr) { + return false; + } + + char cache_key_storage[kMaxCacheKeyPrefixSize + kMaxVarint64Length]; + Slice cache_key = GetCacheKey(rep_->cache_key_prefix, + rep_->cache_key_prefix_size, handle, + cache_key_storage); + + Cache::Handle* const cache_handle = cache->Lookup(cache_key); + if (cache_handle == nullptr) { + return false; + } + + cache->Release(cache_handle); + + return true; +} + bool BlockBasedTable::TEST_KeyInCache(const ReadOptions& options, const Slice& key) { std::unique_ptr> iiter( NewIndexIterator(options)); iiter->Seek(key); assert(iiter->Valid()); - CachableEntry block; - - BlockHandle handle = iiter->value(); - Cache* block_cache = rep_->table_options.block_cache.get(); - assert(block_cache != nullptr); - - char cache_key_storage[kMaxCacheKeyPrefixSize + kMaxVarint64Length]; - Slice cache_key = - GetCacheKey(rep_->cache_key_prefix, rep_->cache_key_prefix_size, handle, - cache_key_storage); - Slice ckey; - Status s; - if (!rep_->compression_dict_handle.IsNull()) { - std::unique_ptr compression_dict_block; - s = ReadCompressionDictBlock(rep_, nullptr /* prefetch_buffer */, - &compression_dict_block); - if (s.ok()) { - assert(compression_dict_block != nullptr); - UncompressionDict uncompression_dict( - compression_dict_block->data.ToString(), - rep_->blocks_definitely_zstd_compressed); - s = GetDataBlockFromCache(cache_key, ckey, block_cache, nullptr, rep_, - options, &block, uncompression_dict, - 0 /* read_amp_bytes_per_bit */); - } - } else { - s = GetDataBlockFromCache( - cache_key, ckey, block_cache, nullptr, rep_, options, &block, - UncompressionDict::GetEmptyDict(), 0 /* read_amp_bytes_per_bit */); - } - assert(s.ok()); - return block.IsCached(); + return TEST_BlockInCache(iiter->value()); } BlockBasedTableOptions::IndexType BlockBasedTable::UpdateIndexType() { @@ -3151,14 +3155,11 @@ BlockBasedTableOptions::IndexType BlockBasedTable::UpdateIndexType() { // 4. internal_comparator // 5. 
index_type Status BlockBasedTable::CreateIndexReader( - FilePrefetchBuffer* prefetch_buffer, IndexReader** index_reader, - InternalIterator* preloaded_meta_index_iter, int level) { + FilePrefetchBuffer* prefetch_buffer, + InternalIterator* preloaded_meta_index_iter, bool use_cache, bool prefetch, + bool pin, IndexReader** index_reader) { auto index_type_on_file = rep_->index_type; - auto file = rep_->file.get(); - const InternalKeyComparator* icomparator = &rep_->internal_comparator; - const Footer& footer = rep_->footer; - // kHashSearch requires non-empty prefix_extractor but bypass checking // prefix_extractor here since we have no access to MutableCFOptions. // Add need_upper_bound_check flag in BlockBasedTable::NewIndexIterator. @@ -3167,25 +3168,12 @@ Status BlockBasedTable::CreateIndexReader( switch (index_type_on_file) { case BlockBasedTableOptions::kTwoLevelIndexSearch: { - return PartitionIndexReader::Create( - this, file, prefetch_buffer, footer, footer.index_handle(), - rep_->ioptions, icomparator, index_reader, - rep_->persistent_cache_options, level, - rep_->table_properties == nullptr || - rep_->table_properties->index_key_is_user_key == 0, - rep_->table_properties == nullptr || - rep_->table_properties->index_value_is_delta_encoded == 0, - GetMemoryAllocator(rep_->table_options)); + return PartitionIndexReader::Create(this, prefetch_buffer, use_cache, + prefetch, pin, index_reader); } case BlockBasedTableOptions::kBinarySearch: { - return BinarySearchIndexReader::Create( - file, prefetch_buffer, footer, footer.index_handle(), rep_->ioptions, - icomparator, index_reader, rep_->persistent_cache_options, - rep_->table_properties == nullptr || - rep_->table_properties->index_key_is_user_key == 0, - rep_->table_properties == nullptr || - rep_->table_properties->index_value_is_delta_encoded == 0, - GetMemoryAllocator(rep_->table_options)); + return BinarySearchIndexReader::Create(this, prefetch_buffer, use_cache, + prefetch, pin, index_reader); } case BlockBasedTableOptions::kHashSearch: { std::unique_ptr meta_guard; @@ -3200,29 +3188,15 @@ Status BlockBasedTable::CreateIndexReader( ROCKS_LOG_WARN(rep_->ioptions.info_log, "Unable to read the metaindex block." 
" Fall back to binary search index."); - return BinarySearchIndexReader::Create( - file, prefetch_buffer, footer, footer.index_handle(), - rep_->ioptions, icomparator, index_reader, - rep_->persistent_cache_options, - rep_->table_properties == nullptr || - rep_->table_properties->index_key_is_user_key == 0, - rep_->table_properties == nullptr || - rep_->table_properties->index_value_is_delta_encoded == 0, - GetMemoryAllocator(rep_->table_options)); + return BinarySearchIndexReader::Create(this, prefetch_buffer, + use_cache, prefetch, pin, + index_reader); } meta_index_iter = meta_iter_guard.get(); } - return HashIndexReader::Create( - rep_->internal_prefix_transform.get(), footer, file, prefetch_buffer, - rep_->ioptions, icomparator, footer.index_handle(), meta_index_iter, - index_reader, rep_->hash_index_allow_collision, - rep_->persistent_cache_options, - rep_->table_properties == nullptr || - rep_->table_properties->index_key_is_user_key == 0, - rep_->table_properties == nullptr || - rep_->table_properties->index_value_is_delta_encoded == 0, - GetMemoryAllocator(rep_->table_options)); + return HashIndexReader::Create(this, prefetch_buffer, meta_index_iter, + use_cache, prefetch, pin, index_reader); } default: { std::string error_message = @@ -3261,8 +3235,10 @@ bool BlockBasedTable::TEST_filter_block_preloaded() const { return rep_->filter != nullptr; } -bool BlockBasedTable::TEST_index_reader_preloaded() const { - return rep_->index_reader != nullptr; +bool BlockBasedTable::TEST_IndexBlockInCache() const { + assert(rep_ != nullptr); + + return TEST_BlockInCache(rep_->footer.index_handle()); } Status BlockBasedTable::GetKVPairsFromDataBlocks( @@ -3479,12 +3455,6 @@ void BlockBasedTable::Close() { rep_->filter_handle, cache_key); cache->Erase(key); - // Get the index block key - key = GetCacheKeyFromOffset(rep_->cache_key_prefix, - rep_->cache_key_prefix_size, - rep_->dummy_index_reader_offset, cache_key); - cache->Erase(key); - if (!rep_->compression_dict_handle.IsNull()) { // Get the compression dictionary block key key = GetCacheKey(rep_->cache_key_prefix, rep_->cache_key_prefix_size, @@ -3674,15 +3644,6 @@ void DeleteCachedFilterEntry(const Slice& /*key*/, void* value) { delete filter; } -void DeleteCachedIndexEntry(const Slice& /*key*/, void* value) { - IndexReader* index_reader = reinterpret_cast(value); - if (index_reader->statistics() != nullptr) { - RecordTick(index_reader->statistics(), BLOCK_CACHE_INDEX_BYTES_EVICT, - index_reader->ApproximateMemoryUsage()); - } - delete index_reader; -} - void DeleteCachedUncompressionDictEntry(const Slice& /*key*/, void* value) { UncompressionDict* dict = reinterpret_cast(value); RecordTick(dict->statistics(), BLOCK_CACHE_COMPRESSION_DICT_BYTES_EVICT, diff --git a/table/block_based_table_reader.h b/table/block_based_table_reader.h index 270409b3a..54ce34d61 100644 --- a/table/block_based_table_reader.h +++ b/table/block_based_table_reader.h @@ -150,6 +150,8 @@ class BlockBasedTable : public TableReader { // be close to the file length. uint64_t ApproximateOffsetOf(const Slice& key) override; + bool TEST_BlockInCache(const BlockHandle& handle) const; + // Returns true if the block for the specified key is in cache. 
   // REQUIRES: key is in this table && block cache enabled
   bool TEST_KeyInCache(const ReadOptions& options, const Slice& key);
@@ -173,54 +175,35 @@ class BlockBasedTable : public TableReader {
   ~BlockBasedTable();
 
   bool TEST_filter_block_preloaded() const;
-  bool TEST_index_reader_preloaded() const;
+  bool TEST_IndexBlockInCache() const;
 
-  // IndexReader is the interface that provide the functionality for index
+  // IndexReader is the interface that provides the functionality for index
   // access.
   class IndexReader {
   public:
-    explicit IndexReader(const InternalKeyComparator* icomparator,
-                         Statistics* stats)
-        : icomparator_(icomparator), statistics_(stats) {}
-
-    virtual ~IndexReader() {}
-
-    // Create an iterator for index access.
-    // If iter is null then a new object is created on heap and the callee will
-    // have the ownership. If a non-null iter is passed in it will be used, and
-    // the returned value is either the same as iter or a new on-heap object
-    // that
-    // wrapps the passed iter. In the latter case the return value would point
-    // to
-    // a different object then iter and the callee has the ownership of the
+    virtual ~IndexReader() = default;
+
+    // Create an iterator for index access. If iter is null, then a new object
+    // is created on the heap, and the caller will have the ownership.
+    // If a non-null iter is passed in, it will be used, and the returned value
+    // is either the same as iter or a new on-heap object that
+    // wraps the passed iter. In the latter case the return value points
+    // to a different object than iter, and the caller has the ownership of the
    // returned object.
     virtual InternalIteratorBase<BlockHandle>* NewIterator(
-        IndexBlockIter* iter = nullptr, bool total_order_seek = true,
-        bool fill_cache = true) = 0;
-
-    // The size of the index.
-    virtual size_t size() const = 0;
-    // Memory usage of the index block
-    virtual size_t usable_size() const = 0;
-    // return the statistics pointer
-    virtual Statistics* statistics() const { return statistics_; }
+        const ReadOptions& read_options, bool disable_prefix_seek,
+        IndexBlockIter* iter, GetContext* get_context) = 0;
+
     // Report an approximation of how much memory has been used other than
-    // memory
-    // that was allocated in block cache.
+    // memory that was allocated in block cache.
     virtual size_t ApproximateMemoryUsage() const = 0;
-
-    virtual void CacheDependencies(bool /* unused */) {}
-
-    // Prefetch all the blocks referenced by this index to the buffer
-    void PrefetchBlocks(FilePrefetchBuffer* buf);
-
-   protected:
-    const InternalKeyComparator* icomparator_;
-
-   private:
-    Statistics* statistics_;
+    // Cache the dependencies of the index reader (e.g. the partitions
+    // of a partitioned index).
+    virtual void CacheDependencies(bool /* pin */) {}
   };
 
+  class IndexReaderCommon;
+
   static Slice GetCacheKey(const char* cache_key_prefix,
                            size_t cache_key_prefix_size,
                            const BlockHandle& handle, char* cache_key);
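The NewIterator() comment above spells out an ownership contract that is easy to misread: a caller-supplied iterator is reused, while a null argument makes the implementation allocate an iterator the caller must own. The sketch below is a deliberately simplified illustration of that contract (it omits the case where a supplied iterator gets wrapped by a new object); all names are invented for this note and are not the RocksDB interface.

#include <memory>

struct IterSketch {
  virtual ~IterSketch() = default;
};

// If `iter` is non-null, it is reused and returned; otherwise a new object is
// heap-allocated and ownership of the result passes to the caller.
IterSketch* NewIteratorSketch(IterSketch* iter) {
  if (iter != nullptr) {
    return iter;  // the caller-provided object is (re)used in place
  }
  return new IterSketch();  // the caller takes ownership of this object
}

// Typical call sites under that contract:
//   IterSketch on_stack;
//   IterSketch* a = NewIteratorSketch(&on_stack);               // a == &on_stack
//   std::unique_ptr<IterSketch> b(NewIteratorSketch(nullptr));  // b owns it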
@@ -271,11 +254,22 @@ class BlockBasedTable : public TableReader {
   // in uncompressed block cache, also sets cache_handle to reference that
   // block.
   static Status MaybeReadBlockAndLoadToCache(
-      FilePrefetchBuffer* prefetch_buffer, Rep* rep, const ReadOptions& ro,
-      const BlockHandle& handle, const UncompressionDict& uncompression_dict,
+      FilePrefetchBuffer* prefetch_buffer, const Rep* rep,
+      const ReadOptions& ro, const BlockHandle& handle,
+      const UncompressionDict& uncompression_dict,
       CachableEntry<Block>* block_entry, bool is_index = false,
       GetContext* get_context = nullptr);
 
+  // Similar to the above, with one crucial difference: it will retrieve the
+  // block from the file even if there are no caches configured (assuming the
+  // read options allow I/O).
+  static Status RetrieveBlock(
+      FilePrefetchBuffer* prefetch_buffer, const Rep* rep,
+      const ReadOptions& ro, const BlockHandle& handle,
+      const UncompressionDict& uncompression_dict,
+      CachableEntry<Block>* block_entry, bool is_index,
+      GetContext* get_context);
+
   // For the following two functions:
   // if `no_io == true`, we will not try to read filter/index from sst file
   // were they not present in cache yet.
@@ -305,7 +299,6 @@ class BlockBasedTable : public TableReader {
   InternalIteratorBase<BlockHandle>* NewIndexIterator(
       const ReadOptions& read_options, bool need_upper_bound_check = false,
       IndexBlockIter* input_iter = nullptr,
-      CachableEntry<Block>* index_entry = nullptr,
       GetContext* get_context = nullptr);
 
   // Read block cache from block caches (if set): block_cache and
@@ -316,7 +309,7 @@ class BlockBasedTable : public TableReader {
   // dictionary.
   static Status GetDataBlockFromCache(
       const Slice& block_cache_key, const Slice& compressed_block_cache_key,
-      Cache* block_cache, Cache* block_cache_compressed, Rep* rep,
+      Cache* block_cache, Cache* block_cache_compressed, const Rep* rep,
       const ReadOptions& read_options, CachableEntry<Block>* block,
       const UncompressionDict& uncompression_dict,
       size_t read_amp_bytes_per_bit, bool is_index = false,
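The comment introducing RetrieveBlock above captures the one behavioral difference between the two lookup declarations: the cache-oriented path is a no-op when no block cache is configured, while RetrieveBlock falls back to reading the block from the file as long as the read options permit I/O. The sketch below illustrates that split under simplified assumptions; the map standing in for the block cache and every name here are invented, not RocksDB APIs.

#include <cstdint>
#include <memory>
#include <unordered_map>

struct BlockStub {};

// A tiny stand-in for the block cache, keyed by block offset.
using BlockCacheStub =
    std::unordered_map<uint64_t, std::shared_ptr<BlockStub>>;

// Cache-oriented path: does nothing without a cache; on a miss the block is
// "read" and inserted so later lookups hit.
std::shared_ptr<BlockStub> LookupOrLoad(BlockCacheStub* cache,
                                        uint64_t offset) {
  if (cache == nullptr) {
    return nullptr;  // no cache configured: nothing to look up or insert
  }
  auto it = cache->find(offset);
  if (it != cache->end()) {
    return it->second;  // cache hit
  }
  auto block = std::make_shared<BlockStub>();  // pretend this is a file read
  cache->emplace(offset, block);
  return block;
}

// RetrieveBlock-style path: additionally reads the block straight from the
// file when there is no cache to consult, provided I/O is allowed.
std::shared_ptr<BlockStub> RetrieveStub(BlockCacheStub* cache, uint64_t offset,
                                        bool allow_io) {
  if (auto block = LookupOrLoad(cache, offset)) {
    return block;
  }
  if (!allow_io) {
    return nullptr;  // would need a file read, but the caller forbade I/O
  }
  return std::make_shared<BlockStub>();  // read directly from the file
}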
@@ -359,9 +352,9 @@ class BlockBasedTable : public TableReader {
   // need to access extra meta blocks for index construction. This parameter
   // helps avoid re-reading meta index block if caller already created one.
   Status CreateIndexReader(
-      FilePrefetchBuffer* prefetch_buffer, IndexReader** index_reader,
-      InternalIterator* preloaded_meta_index_iter = nullptr,
-      const int level = -1);
+      FilePrefetchBuffer* prefetch_buffer,
+      InternalIterator* preloaded_meta_index_iter, bool use_cache,
+      bool prefetch, bool pin, IndexReader** index_reader);
 
   bool FullFilterKeyMayMatch(
       const ReadOptions& read_options, FilterBlockReader* filter,
@@ -398,9 +391,8 @@ class BlockBasedTable : public TableReader {
   static Status PrefetchIndexAndFilterBlocks(
       Rep* rep, FilePrefetchBuffer* prefetch_buffer,
       InternalIterator* meta_iter, BlockBasedTable* new_table,
-      const SliceTransform* prefix_extractor, bool prefetch_all,
-      const BlockBasedTableOptions& table_options, const int level,
-      const bool prefetch_index_and_filter_in_cache);
+      bool prefetch_all, const BlockBasedTableOptions& table_options,
+      const int level);
 
   Status VerifyChecksumInMetaBlocks(InternalIteratorBase<Slice>* index_iter);
   Status VerifyChecksumInBlocks(InternalIteratorBase<BlockHandle>* index_iter);
@@ -411,7 +403,7 @@ class BlockBasedTable : public TableReader {
                             const bool is_a_filter_partition,
                             const SliceTransform* prefix_extractor = nullptr) const;
 
-  static void SetupCacheKeyPrefix(Rep* rep, uint64_t file_size);
+  static void SetupCacheKeyPrefix(Rep* rep);
 
   // Generate a cache key prefix from the file
   static void GenerateCachePrefix(Cache* cc, RandomAccessFile* file,
@@ -486,18 +478,21 @@ struct BlockBasedTable::Rep {
   size_t persistent_cache_key_prefix_size = 0;
   char compressed_cache_key_prefix[kMaxCacheKeyPrefixSize];
   size_t compressed_cache_key_prefix_size = 0;
-  uint64_t dummy_index_reader_offset =
-      0;  // ID that is unique for the block cache.
   PersistentCacheOptions persistent_cache_options;
 
   // Footer contains the fixed table information
   Footer footer;
-  // `index_reader`, `filter`, and `uncompression_dict` will be populated (i.e.,
-  // non-nullptr) and used only when options.block_cache is nullptr or when
-  // `cache_index_and_filter_blocks == false`. Otherwise, we will get the index,
-  // filter, and compression dictionary blocks via the block cache. In that case
-  // `dummy_index_reader_offset`, `filter_handle`, and `compression_dict_handle`
-  // are used to lookup these meta-blocks in block cache.
+  // `filter` and `uncompression_dict` will be populated (i.e., non-nullptr)
+  // and used only when options.block_cache is nullptr or when
+  // `cache_index_and_filter_blocks == false`. Otherwise, we will get the
+  // filter and compression dictionary blocks via the block cache. In that
+  // case, `filter_handle` and `compression_dict_handle` are used to look up
+  // these meta-blocks in the block cache.
+  //
+  // Note: the IndexReader object is always stored in this member variable;
+  // the index block itself, however, may or may not be in the block cache
+  // based on the settings above. We plan to change the handling of the
+  // filter and compression dictionary similarly.
   std::unique_ptr<IndexReader> index_reader;
   std::unique_ptr<FilterBlockReader> filter;
   std::unique_ptr<UncompressionDict> uncompression_dict;
@@ -526,12 +521,11 @@ struct BlockBasedTable::Rep {
   // only used in level 0 files when pin_l0_filter_and_index_blocks_in_cache is
   // true or in all levels when pin_top_level_index_and_filter is set in
-  // combination with partitioned index/filters: then we do use the LRU cache,
-  // but we always keep the filter & index block's handle checked out here (=we
+  // combination with partitioned filters: then we do use the LRU cache,
+  // but we always keep the filter block's handle checked out here (=we
   // don't call Release()), plus the parsed out objects the LRU cache will never
   // push flush them out, hence they're pinned
   CachableEntry<FilterBlockReader> filter_entry;
-  CachableEntry<Block> index_entry;
   std::shared_ptr<const FragmentedRangeTombstoneList> fragmented_range_dels;
 
   // If global_seqno is used, all Keys in this file will have the same
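The Rep comments above describe the new ownership split: the table representation always owns the IndexReader object, while the index block behind it is either owned outright (no block cache) or referenced through a block cache handle that may additionally be kept checked out, i.e. pinned. A toy model of that arrangement, using invented names rather than the real CachableEntry/Rep types, might look like this:

#include <memory>

struct BlockModel {};

// Stand-in for CachableEntry<Block>: the entry either owns the block outright
// or references a cache-resident copy (a shared_ptr models the checked-out
// cache handle here).
struct CachableEntryModel {
  std::unique_ptr<BlockModel> owned;   // no block cache: the reader owns it
  std::shared_ptr<BlockModel> cached;  // block cache: possibly pinned handle
  const BlockModel* get() const { return owned ? owned.get() : cached.get(); }
};

struct IndexReaderModel {
  CachableEntryModel index_block;  // the sharable, cacheable piece
};

struct RepModel {
  // Owned by the table unconditionally, regardless of the
  // cache_index_and_filter_blocks / pinning settings.
  std::unique_ptr<IndexReaderModel> index_reader;
};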
diff --git a/table/table_test.cc b/table/table_test.cc
index dccc49194..aeb66f8d3 100644
--- a/table/table_test.cc
+++ b/table/table_test.cc
@@ -1993,7 +1993,7 @@ TEST_P(BlockBasedTableTest, BlockCacheDisabledTest) {
   // preloading filter/index blocks is enabled.
   auto reader = dynamic_cast<BlockBasedTable*>(c.GetTableReader());
   ASSERT_TRUE(reader->TEST_filter_block_preloaded());
-  ASSERT_TRUE(reader->TEST_index_reader_preloaded());
+  ASSERT_FALSE(reader->TEST_IndexBlockInCache());
 
   {
     // nothing happens in the beginning
@@ -2040,7 +2040,7 @@ TEST_P(BlockBasedTableTest, FilterBlockInBlockCache) {
   // preloading filter/index blocks is prohibited.
   auto* reader = dynamic_cast<BlockBasedTable*>(c.GetTableReader());
   ASSERT_TRUE(!reader->TEST_filter_block_preloaded());
-  ASSERT_TRUE(!reader->TEST_index_reader_preloaded());
+  ASSERT_TRUE(reader->TEST_IndexBlockInCache());
 
   // -- PART 1: Open with regular block cache.
   // Since block_cache is disabled, no cache activities will be involved.
@@ -2612,69 +2612,6 @@ TEST_P(BlockBasedTableTest, MemoryAllocator) {
   EXPECT_GT(custom_memory_allocator->numAllocations.load(), 0);
 }
 
-TEST_P(BlockBasedTableTest, NewIndexIteratorLeak) {
-  // A regression test to avoid data race described in
-  // https://github.com/facebook/rocksdb/issues/1267
-  TableConstructor c(BytewiseComparator(), true /* convert_to_internal_key_ */);
-  std::vector<std::string> keys;
-  stl_wrappers::KVMap kvmap;
-  c.Add("a1", "val1");
-  Options options;
-  options.prefix_extractor.reset(NewFixedPrefixTransform(1));
-  BlockBasedTableOptions table_options = GetBlockBasedTableOptions();
-  table_options.index_type = BlockBasedTableOptions::kHashSearch;
-  table_options.cache_index_and_filter_blocks = true;
-  table_options.block_cache = NewLRUCache(0);
-  options.table_factory.reset(NewBlockBasedTableFactory(table_options));
-  const ImmutableCFOptions ioptions(options);
-  const MutableCFOptions moptions(options);
-  c.Finish(options, ioptions, moptions, table_options,
-           GetPlainInternalComparator(options.comparator), &keys, &kvmap);
-
-  rocksdb::SyncPoint::GetInstance()->LoadDependencyAndMarkers(
-      {
-          {"BlockBasedTable::NewIndexIterator::thread1:1",
-           "BlockBasedTable::NewIndexIterator::thread2:2"},
-          {"BlockBasedTable::NewIndexIterator::thread2:3",
-           "BlockBasedTable::NewIndexIterator::thread1:4"},
-      },
-      {
-          {"BlockBasedTableTest::NewIndexIteratorLeak:Thread1Marker",
-           "BlockBasedTable::NewIndexIterator::thread1:1"},
-          {"BlockBasedTableTest::NewIndexIteratorLeak:Thread1Marker",
-           "BlockBasedTable::NewIndexIterator::thread1:4"},
-          {"BlockBasedTableTest::NewIndexIteratorLeak:Thread2Marker",
-           "BlockBasedTable::NewIndexIterator::thread2:2"},
-          {"BlockBasedTableTest::NewIndexIteratorLeak:Thread2Marker",
-           "BlockBasedTable::NewIndexIterator::thread2:3"},
-      });
-
-  rocksdb::SyncPoint::GetInstance()->EnableProcessing();
-  ReadOptions ro;
-  auto* reader = c.GetTableReader();
-
-  std::function<void()> func1 = [&]() {
-    TEST_SYNC_POINT("BlockBasedTableTest::NewIndexIteratorLeak:Thread1Marker");
-    // TODO(Zhongyi): update test to use MutableCFOptions
-    std::unique_ptr<InternalIterator> iter(
-        reader->NewIterator(ro, moptions.prefix_extractor.get()));
-    iter->Seek(InternalKey("a1", 0, kTypeValue).Encode());
-  };
-
-  std::function<void()> func2 = [&]() {
-    TEST_SYNC_POINT("BlockBasedTableTest::NewIndexIteratorLeak:Thread2Marker");
-    std::unique_ptr<InternalIterator> iter(
-        reader->NewIterator(ro, moptions.prefix_extractor.get()));
-  };
-
-  auto thread1 = port::Thread(func1);
-  auto thread2 = port::Thread(func2);
-  thread1.join();
-  thread2.join();
-  rocksdb::SyncPoint::GetInstance()->DisableProcessing();
-  c.ResetTableReader();
-}
-
 // Plain table is not supported in ROCKSDB_LITE
 #ifndef ROCKSDB_LITE
 TEST_F(PlainTableTest, BasicPlainTableProperties) {