diff --git a/table/block_based_table_reader.cc b/table/block_based_table_reader.cc
index 514587d0b..1dc220dde 100644
--- a/table/block_based_table_reader.cc
+++ b/table/block_based_table_reader.cc
@@ -112,12 +112,6 @@ inline MemoryAllocator* GetMemoryAllocatorForCompressedBlock(
       : nullptr;
 }
 
-// Delete the resource that is held by the iterator.
-template <class ResourceType>
-void DeleteHeldResource(void* arg, void* /*ignored*/) {
-  delete reinterpret_cast<ResourceType*>(arg);
-}
-
 // Delete the entry resided in the cache.
 template <class Entry>
 void DeleteCachedEntry(const Slice& /*key*/, void* value) {
@@ -224,7 +218,7 @@ bool PrefixExtractorChanged(const TableProperties* table_properties,
 }  // namespace
 
 // Index that allows binary search lookup in a two-level index structure.
-class PartitionIndexReader : public IndexReader, public Cleanable {
+class PartitionIndexReader : public IndexReader {
  public:
   // Read the partition index from the file and create an instance for
   // `PartitionIndexReader`.
@@ -332,10 +326,9 @@ class PartitionIndexReader : public IndexReader, public Cleanable {
     // After prefetch, read the partitions one by one
     biter.SeekToFirst();
     auto ro = ReadOptions();
-    Cache* block_cache = rep->table_options.block_cache.get();
     for (; biter.Valid(); biter.Next()) {
       handle = biter.value();
-      BlockBasedTable::CachableEntry<Block> block;
+      CachableEntry<Block> block;
       const bool is_index = true;
       // TODO: Support counter batch update for partitioned index and
       // filter blocks
@@ -344,18 +337,12 @@ class PartitionIndexReader : public IndexReader, public Cleanable {
           UncompressionDict::GetEmptyDict(), &block, is_index,
           nullptr /* get_context */);
 
-      assert(s.ok() || block.value == nullptr);
-      if (s.ok() && block.value != nullptr) {
-        if (block.cache_handle != nullptr) {
+      assert(s.ok() || block.GetValue() == nullptr);
+      if (s.ok() && block.GetValue() != nullptr) {
+        if (block.IsCached()) {
           if (pin) {
-            partition_map_[handle.offset()] = block;
-            RegisterCleanup(&ReleaseCachedEntry, block_cache,
-                            block.cache_handle);
-          } else {
-            block_cache->Release(block.cache_handle);
+            partition_map_[handle.offset()] = std::move(block);
           }
-        } else {
-          delete block.value;
         }
       }
     }
@@ -391,8 +378,7 @@ class PartitionIndexReader : public IndexReader, public Cleanable {
   }
   BlockBasedTable* table_;
   std::unique_ptr<Block> index_block_;
-  std::unordered_map<uint64_t, BlockBasedTable::CachableEntry<Block>>
-      partition_map_;
+  std::unordered_map<uint64_t, CachableEntry<Block>> partition_map_;
   const bool index_key_includes_seq_;
   const bool index_value_is_full_;
 };
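
Note (editorial, not part of the patch): the pinning loop above relies on CachableEntry being movable but not copyable — storing an entry in `partition_map_` hands the cache-handle release duty to the map element, and the moved-from entry is left empty. A minimal sketch of that pattern; the map alias and helper name are illustrative only:

```cpp
#include <cstdint>
#include <unordered_map>
#include <utility>

#include "table/block.h"
#include "table/cachable_entry.h"

namespace rocksdb {

// Illustrative helper: pin a block for as long as the map lives.
using PinnedBlocks = std::unordered_map<uint64_t, CachableEntry<Block>>;

void PinIfCached(uint64_t offset, CachableEntry<Block>&& block,
                 PinnedBlocks* pinned) {
  if (block.IsCached()) {
    // The map element now holds the cache handle; it is released when the
    // element (or the whole map) is destroyed.
    (*pinned)[offset] = std::move(block);
  }
  // Otherwise `block` cleans up after itself when it goes out of scope.
}

}  // namespace rocksdb
```
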
@@ -1221,14 +1207,12 @@ Status BlockBasedTable::PrefetchIndexAndFilterBlocks(
       // This is the first call to NewIndexIterator() since we're in Open().
       // On success it should give us ownership of the `CachableEntry` by
       // populating `index_entry`.
-      assert(index_entry.value != nullptr);
+      assert(index_entry.GetValue() != nullptr);
       if (prefetch_all) {
-        index_entry.value->CacheDependencies(pin_all);
+        index_entry.GetValue()->CacheDependencies(pin_all);
       }
       if (pin_index) {
         rep->index_entry = std::move(index_entry);
-      } else {
-        index_entry.Release(table_options.block_cache.get());
       }
     }
   }
@@ -1236,17 +1220,15 @@ Status BlockBasedTable::PrefetchIndexAndFilterBlocks(
       // Hack: Call GetFilter() to implicitly add filter to the block_cache
       auto filter_entry =
           new_table->GetFilter(rep->table_prefix_extractor.get());
-      if (filter_entry.value != nullptr && prefetch_all) {
-        filter_entry.value->CacheDependencies(
+      if (filter_entry.GetValue() != nullptr && prefetch_all) {
+        filter_entry.GetValue()->CacheDependencies(
             pin_all, rep->table_prefix_extractor.get());
       }
       // if pin_filter is true then save it in rep_->filter_entry; it will be
       // released in the destructor only, hence it will be pinned in the
       // cache while this reader is alive
       if (pin_filter) {
-        rep->filter_entry = filter_entry;
-      } else {
-        filter_entry.Release(table_options.block_cache.get());
+        rep->filter_entry = std::move(filter_entry);
       }
     }
   } else {
@@ -1369,10 +1351,13 @@ Status BlockBasedTable::ReadMetaBlock(Rep* rep,
 Status BlockBasedTable::GetDataBlockFromCache(
     const Slice& block_cache_key, const Slice& compressed_block_cache_key,
     Cache* block_cache, Cache* block_cache_compressed, Rep* rep,
-    const ReadOptions& read_options,
-    BlockBasedTable::CachableEntry<Block>* block,
+    const ReadOptions& read_options, CachableEntry<Block>* block,
     const UncompressionDict& uncompression_dict, size_t read_amp_bytes_per_bit,
     bool is_index, GetContext* get_context) {
+
+  assert(block);
+  assert(block->IsEmpty());
+
   Status s;
   BlockContents* compressed_block = nullptr;
   Cache::Handle* block_cache_compressed_handle = nullptr;
@@ -1380,7 +1365,7 @@ Status BlockBasedTable::GetDataBlockFromCache(
 
   // Lookup uncompressed cache first
   if (block_cache != nullptr) {
-    block->cache_handle = GetEntryFromCache(
+    auto cache_handle = GetEntryFromCache(
         block_cache, block_cache_key, rep->level,
         is_index ? BLOCK_CACHE_INDEX_MISS : BLOCK_CACHE_DATA_MISS,
         is_index ? BLOCK_CACHE_INDEX_HIT : BLOCK_CACHE_DATA_HIT,
@@ -1393,15 +1378,16 @@ Status BlockBasedTable::GetDataBlockFromCache(
                         : &get_context->get_context_stats_.num_cache_data_hit)
             : nullptr,
         statistics, get_context);
-    if (block->cache_handle != nullptr) {
-      block->value =
-          reinterpret_cast<Block*>(block_cache->Value(block->cache_handle));
+    if (cache_handle != nullptr) {
+      block->SetCachedValue(
+          reinterpret_cast<Block*>(block_cache->Value(cache_handle)),
+          block_cache, cache_handle);
       return s;
     }
   }
 
   // If not found, search from the compressed block cache.
-  assert(block->cache_handle == nullptr && block->value == nullptr);
+  assert(block->IsEmpty());
   if (block_cache_compressed == nullptr) {
     return s;
   }
@@ -1435,20 +1421,25 @@ Status BlockBasedTable::GetDataBlockFromCache(
 
   // Insert uncompressed block into block cache
   if (s.ok()) {
-    block->value =
-        new Block(std::move(contents), rep->get_global_seqno(is_index),
-                  read_amp_bytes_per_bit,
-                  statistics);  // uncompressed block
-    if (block_cache != nullptr && block->value->own_bytes() &&
+    std::unique_ptr<Block> block_holder(
+        new Block(std::move(contents), rep->get_global_seqno(is_index),
+                  read_amp_bytes_per_bit, statistics));  // uncompressed block
+
+    if (block_cache != nullptr && block_holder->own_bytes() &&
         read_options.fill_cache) {
-      size_t charge = block->value->ApproximateMemoryUsage();
-      s = block_cache->Insert(block_cache_key, block->value, charge,
+      size_t charge = block_holder->ApproximateMemoryUsage();
+      Cache::Handle* cache_handle = nullptr;
+      s = block_cache->Insert(block_cache_key, block_holder.get(), charge,
                               &DeleteCachedEntry<Block>,
-                              &(block->cache_handle));
+                              &cache_handle);
 #ifndef NDEBUG
       block_cache->TEST_mark_as_data_block(block_cache_key, charge);
 #endif  // NDEBUG
       if (s.ok()) {
+        assert(cache_handle != nullptr);
+        block->SetCachedValue(block_holder.release(), block_cache,
+                              cache_handle);
+
         if (get_context != nullptr) {
           get_context->get_context_stats_.num_cache_add++;
           get_context->get_context_stats_.num_cache_bytes_write += charge;
@@ -1477,9 +1468,9 @@ Status BlockBasedTable::GetDataBlockFromCache(
         }
       } else {
         RecordTick(statistics, BLOCK_CACHE_ADD_FAILURES);
-        delete block->value;
-        block->value = nullptr;
       }
+    } else {
+      block->SetOwnedValue(block_holder.release());
     }
   }
 
@@ -1497,33 +1488,34 @@ Status BlockBasedTable::PutDataBlockToCache(
     const UncompressionDict& uncompression_dict, SequenceNumber seq_no,
     size_t read_amp_bytes_per_bit, MemoryAllocator* memory_allocator,
     bool is_index, Cache::Priority priority, GetContext* get_context) {
+
+  assert(cached_block);
+  assert(cached_block->IsEmpty());
   assert(raw_block_comp_type == kNoCompression ||
          block_cache_compressed != nullptr);
 
   Status s;
-  // Retrieve the uncompressed contents into a new buffer
-  BlockContents uncompressed_block_contents;
   Statistics* statistics = ioptions.statistics;
+
+  std::unique_ptr<Block> block_holder;
   if (raw_block_comp_type != kNoCompression) {
+    // Retrieve the uncompressed contents into a new buffer
+    BlockContents uncompressed_block_contents;
     UncompressionContext context(raw_block_comp_type);
     UncompressionInfo info(context, uncompression_dict, raw_block_comp_type);
     s = UncompressBlockContents(info, raw_block_contents->data.data(),
                                 raw_block_contents->data.size(),
                                 &uncompressed_block_contents, format_version,
                                 ioptions, memory_allocator);
-  }
-  if (!s.ok()) {
-    return s;
-  }
+    if (!s.ok()) {
+      return s;
+    }
 
-  if (raw_block_comp_type != kNoCompression) {
-    cached_block->value = new Block(std::move(uncompressed_block_contents),
-                                    seq_no, read_amp_bytes_per_bit,
-                                    statistics);  // uncompressed block
+    block_holder.reset(new Block(std::move(uncompressed_block_contents), seq_no,
+                                 read_amp_bytes_per_bit, statistics));
   } else {
-    cached_block->value =
-        new Block(std::move(*raw_block_contents), seq_no,
-                  read_amp_bytes_per_bit, ioptions.statistics);
+    block_holder.reset(new Block(std::move(*raw_block_contents), seq_no,
+                                 read_amp_bytes_per_bit, statistics));
   }
 
   // Insert compressed block into compressed block cache.
@@ -1553,16 +1545,20 @@ Status BlockBasedTable::PutDataBlockToCache(
   }
 
   // insert into uncompressed block cache
-  if (block_cache != nullptr && cached_block->value->own_bytes()) {
-    size_t charge = cached_block->value->ApproximateMemoryUsage();
-    s = block_cache->Insert(block_cache_key, cached_block->value, charge,
+  if (block_cache != nullptr && block_holder->own_bytes()) {
+    size_t charge = block_holder->ApproximateMemoryUsage();
+    Cache::Handle* cache_handle = nullptr;
+    s = block_cache->Insert(block_cache_key, block_holder.get(), charge,
                             &DeleteCachedEntry<Block>,
-                            &(cached_block->cache_handle), priority);
+                            &cache_handle, priority);
 #ifndef NDEBUG
     block_cache->TEST_mark_as_data_block(block_cache_key, charge);
 #endif  // NDEBUG
     if (s.ok()) {
-      assert(cached_block->cache_handle != nullptr);
+      assert(cache_handle != nullptr);
+      cached_block->SetCachedValue(block_holder.release(), block_cache,
+                                   cache_handle);
+
       if (get_context != nullptr) {
         get_context->get_context_stats_.num_cache_add++;
         get_context->get_context_stats_.num_cache_bytes_write += charge;
@@ -1589,12 +1585,12 @@ Status BlockBasedTable::PutDataBlockToCache(
         }
       }
       assert(reinterpret_cast<Block*>(block_cache->Value(
-                 cached_block->cache_handle)) == cached_block->value);
+                 cached_block->GetCacheHandle())) == cached_block->GetValue());
     } else {
       RecordTick(statistics, BLOCK_CACHE_ADD_FAILURES);
-      delete cached_block->value;
-      cached_block->value = nullptr;
     }
+  } else {
+    cached_block->SetOwnedValue(block_holder.release());
   }
 
   return s;
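
Note (editorial, not part of the patch): both cache functions above now follow the same hand-off — build the block behind a `std::unique_ptr`, try to insert it into the block cache, and only then decide who owns it. A hedged sketch of that flow; the helper names, key handling, and error paths are simplified assumptions, not the real read path:

```cpp
#include <cassert>
#include <memory>

#include "rocksdb/cache.h"
#include "rocksdb/slice.h"
#include "rocksdb/status.h"
#include "table/block.h"
#include "table/cachable_entry.h"

namespace rocksdb {

// Illustrative deleter matching the Cache::Insert callback signature.
inline void DeleteBlockValue(const Slice& /* key */, void* value) {
  delete static_cast<Block*>(value);
}

// Illustrative only: try to hand `block_holder` to the block cache and record
// the outcome in `entry`. On success the cache owns the object and `entry`
// keeps the handle; otherwise `entry` owns the object directly.
inline Status CacheOrOwn(Cache* block_cache, const Slice& key,
                         std::unique_ptr<Block> block_holder,
                         CachableEntry<Block>* entry) {
  assert(entry != nullptr && entry->IsEmpty());

  if (block_cache != nullptr) {
    const size_t charge = block_holder->ApproximateMemoryUsage();
    Cache::Handle* cache_handle = nullptr;
    const Status s = block_cache->Insert(key, block_holder.get(), charge,
                                         &DeleteBlockValue, &cache_handle);
    if (s.ok()) {
      assert(cache_handle != nullptr);
      entry->SetCachedValue(block_holder.release(), block_cache, cache_handle);
      return s;
    }
    // Insert failed: fall through and keep ownership locally.
  }

  entry->SetOwnedValue(block_holder.release());
  return Status::OK();
}

}  // namespace rocksdb
```
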
@@ -1668,7 +1664,7 @@ FilterBlockReader* BlockBasedTable::ReadFilter(
   }
 }
 
-BlockBasedTable::CachableEntry<FilterBlockReader> BlockBasedTable::GetFilter(
+CachableEntry<FilterBlockReader> BlockBasedTable::GetFilter(
     const SliceTransform* prefix_extractor, FilePrefetchBuffer* prefetch_buffer,
     bool no_io, GetContext* get_context) const {
   const BlockHandle& filter_blk_handle = rep_->filter_handle;
@@ -1677,7 +1673,7 @@ BlockBasedTable::CachableEntry<FilterBlockReader> BlockBasedTable::GetFilter(
                    no_io, get_context, prefix_extractor);
 }
 
-BlockBasedTable::CachableEntry<FilterBlockReader> BlockBasedTable::GetFilter(
+CachableEntry<FilterBlockReader> BlockBasedTable::GetFilter(
     FilePrefetchBuffer* prefetch_buffer, const BlockHandle& filter_blk_handle,
     const bool is_a_filter_partition, bool no_io, GetContext* get_context,
     const SliceTransform* prefix_extractor) const {
@@ -1687,17 +1683,19 @@ BlockBasedTable::CachableEntry<FilterBlockReader> BlockBasedTable::GetFilter(
   // most probably fail again.
   if (!is_a_filter_partition &&
       !rep_->table_options.cache_index_and_filter_blocks) {
-    return {rep_->filter.get(), nullptr /* cache handle */};
+    return {rep_->filter.get(), nullptr /* cache */,
+            nullptr /* cache_handle */, false /* own_value */};
   }
 
   Cache* block_cache = rep_->table_options.block_cache.get();
   if (rep_->filter_policy == nullptr /* do not use filter */ ||
       block_cache == nullptr /* no block cache at all */) {
-    return {nullptr /* filter */, nullptr /* cache handle */};
+    return CachableEntry<FilterBlockReader>();
   }
 
-  if (!is_a_filter_partition && rep_->filter_entry.IsSet()) {
-    return rep_->filter_entry;
+  if (!is_a_filter_partition && rep_->filter_entry.IsCached()) {
+    return {rep_->filter_entry.GetValue(), nullptr /* cache */,
+            nullptr /* cache_handle */, false /* own_value */};
   }
 
   PERF_TIMER_GUARD(read_filter_block_nanos);
@@ -1708,7 +1706,7 @@ BlockBasedTable::CachableEntry<FilterBlockReader> BlockBasedTable::GetFilter(
                          filter_blk_handle, cache_key);
 
   Statistics* statistics = rep_->ioptions.statistics;
-  auto cache_handle = GetEntryFromCache(
+  Cache::Handle* cache_handle = GetEntryFromCache(
       block_cache, key, rep_->level, BLOCK_CACHE_FILTER_MISS,
       BLOCK_CACHE_FILTER_HIT,
       get_context ? &get_context->get_context_stats_.num_cache_filter_miss
@@ -1757,20 +1755,22 @@ BlockBasedTable::CachableEntry<FilterBlockReader> BlockBasedTable::GetFilter(
     }
   }
 
-  return {filter, cache_handle};
+  return {filter, cache_handle ? block_cache : nullptr, cache_handle,
+          false /* own_value */};
 }
 
-BlockBasedTable::CachableEntry<UncompressionDict>
+CachableEntry<UncompressionDict>
 BlockBasedTable::GetUncompressionDict(Rep* rep,
                                       FilePrefetchBuffer* prefetch_buffer,
                                       bool no_io, GetContext* get_context) {
   if (!rep->table_options.cache_index_and_filter_blocks) {
     // block cache is either disabled or not used for meta-blocks. In either
     // case, BlockBasedTableReader is the owner of the uncompression dictionary.
-    return {rep->uncompression_dict.get(), nullptr /* cache handle */};
+    return {rep->uncompression_dict.get(), nullptr /* cache */,
+            nullptr /* cache_handle */, false /* own_value */};
   }
   if (rep->compression_dict_handle.IsNull()) {
-    return {nullptr, nullptr};
+    return CachableEntry<UncompressionDict>();
   }
   char cache_key_buf[kMaxCacheKeyPrefixSize + kMaxVarint64Length];
   auto cache_key =
@@ -1835,7 +1835,8 @@ BlockBasedTable::GetUncompressionDict(Rep* rep,
       assert(cache_handle == nullptr);
     }
   }
-  return {dict, cache_handle};
+  return {dict, cache_handle ? rep->table_options.block_cache.get() : nullptr,
+          cache_handle, false /* own_value */};
 }
 
 // disable_prefix_seek should be set to true when prefix_extractor found in SST
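
Note (editorial, not part of the patch): the returns above use three shapes of entry — a plain non-owning pointer (e.g. a table-owned or already-pinned filter), a cache-backed entry that must release its handle, and an empty entry when there is nothing to hand back. A small sketch of those constructor forms, assuming the caller already has the filter, cache, and handle at hand:

```cpp
#include "rocksdb/cache.h"
#include "table/cachable_entry.h"
#include "table/filter_block.h"

namespace rocksdb {

// 1) Non-owning reference: nothing is released or deleted on destruction.
inline CachableEntry<FilterBlockReader> MakeUnownedEntry(
    FilterBlockReader* filter) {
  return {filter, nullptr /* cache */, nullptr /* cache_handle */,
          false /* own_value */};
}

// 2) Cache-backed entry: destroying it releases the handle back to `cache`.
inline CachableEntry<FilterBlockReader> MakeCachedEntry(
    FilterBlockReader* filter, Cache* cache, Cache::Handle* handle) {
  return {filter, cache, handle, false /* own_value */};
}

// 3) Empty entry: used when there is no filter to return.
inline CachableEntry<FilterBlockReader> MakeEmptyEntry() {
  return CachableEntry<FilterBlockReader>();
}

}  // namespace rocksdb
```
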
@@ -1853,10 +1854,10 @@ InternalIteratorBase<BlockHandle>* BlockBasedTable::NewIndexIterator(
         read_options.fill_cache);
   }
   // we have a pinned index block
-  if (rep_->index_entry.IsSet()) {
+  if (rep_->index_entry.IsCached()) {
     // We don't return pinned datat from index blocks, so no need
     // to set `block_contents_pinned`.
-    return rep_->index_entry.value->NewIterator(
+    return rep_->index_entry.GetValue()->NewIterator(
         input_iter, read_options.total_order_seek || disable_prefix_seek,
         read_options.fill_cache);
   }
@@ -1948,7 +1949,8 @@ InternalIteratorBase<BlockHandle>* BlockBasedTable::NewIndexIterator(
       // the caller would like to take ownership of the index block
       // don't call RegisterCleanup() in this case, the caller will take care of it
       if (index_entry != nullptr) {
-        *index_entry = {index_reader, cache_handle};
+        *index_entry = {index_reader, block_cache, cache_handle,
+                        false /* own_value */};
       } else {
         iter->RegisterCleanup(&ReleaseCachedEntry, block_cache, cache_handle);
       }
@@ -1976,9 +1978,9 @@ TBlockIter* BlockBasedTable::NewDataBlockIterator(
     auto uncompression_dict_storage =
         GetUncompressionDict(rep, prefetch_buffer, no_io, get_context);
     const UncompressionDict& uncompression_dict =
-        uncompression_dict_storage.value == nullptr
+        uncompression_dict_storage.GetValue() == nullptr
            ? UncompressionDict::GetEmptyDict()
-            : *uncompression_dict_storage.value;
+            : *uncompression_dict_storage.GetValue();
     if (s.ok()) {
       s = MaybeReadBlockAndLoadToCache(prefetch_buffer, rep, ro, handle,
                                        uncompression_dict, &block, is_index,
@@ -1991,7 +1993,7 @@ TBlockIter* BlockBasedTable::NewDataBlockIterator(
     iter = new TBlockIter;
   }
   // Didn't get any data from block caches.
-  if (s.ok() && block.value == nullptr) {
+  if (s.ok() && block.GetValue() == nullptr) {
     if (no_io) {
       // Could not read from block_cache and can't do IO
       iter->Invalidate(Status::Incomplete("no blocking io"));
@@ -2012,16 +2014,15 @@ TBlockIter* BlockBasedTable::NewDataBlockIterator(
                                  GetMemoryAllocator(rep->table_options));
       }
       if (s.ok()) {
-        block.value = block_value.release();
+        block.SetOwnedValue(block_value.release());
       }
     }
     // TODO(ajkr): also pin compression dictionary block when
     // `pin_l0_filter_and_index_blocks_in_cache == true`.
-    uncompression_dict_storage.Release(block_cache);
   }
 
   if (s.ok()) {
-    assert(block.value != nullptr);
+    assert(block.GetValue() != nullptr);
     const bool kTotalOrderSeek = true;
     // Block contents are pinned and it is still pinned after the iterator
     // is destroyed as long as cleanup functions are moved to another object,
@@ -2031,16 +2032,13 @@ TBlockIter* BlockBasedTable::NewDataBlockIterator(
     // not reading data from the original source, whether immortal or not.
     // Otherwise, the block is pinned iff the source is immortal.
     bool block_contents_pinned =
-        (block.cache_handle != nullptr ||
-         (!block.value->own_bytes() && rep->immortal_table));
-    iter = block.value->NewIterator<TBlockIter>(
+        (block.IsCached() ||
+         (!block.GetValue()->own_bytes() && rep->immortal_table));
+    iter = block.GetValue()->NewIterator<TBlockIter>(
         &rep->internal_comparator, rep->internal_comparator.user_comparator(),
         iter, rep->ioptions.statistics, kTotalOrderSeek, key_includes_seq,
         index_key_is_full, block_contents_pinned);
-    if (block.cache_handle != nullptr) {
-      iter->RegisterCleanup(&ReleaseCachedEntry, block_cache,
-                            block.cache_handle);
-    } else {
+    if (!block.IsCached()) {
       if (!ro.fill_cache && rep->cache_key_prefix_size != 0) {
         // insert a dummy record to block cache to track the memory usage
         Cache::Handle* cache_handle;
@@ -2063,8 +2061,8 @@ TBlockIter* BlockBasedTable::NewDataBlockIterator(
         Slice unique_key =
            Slice(cache_key, static_cast<size_t>(end - cache_key));
         s = block_cache->Insert(unique_key, nullptr,
-                                block.value->ApproximateMemoryUsage(), nullptr,
-                                &cache_handle);
+                                block.GetValue()->ApproximateMemoryUsage(),
+                                nullptr, &cache_handle);
         if (s.ok()) {
           if (cache_handle != nullptr) {
             iter->RegisterCleanup(&ForceReleaseCachedEntry, block_cache,
@@ -2072,10 +2070,11 @@ TBlockIter* BlockBasedTable::NewDataBlockIterator(
           }
         }
       }
-      iter->RegisterCleanup(&DeleteHeldResource<Block>, block.value, nullptr);
     }
+
+    block.TransferTo(iter);
   } else {
-    assert(block.value == nullptr);
+    assert(block.GetValue() == nullptr);
     iter->Invalidate(s);
   }
   return iter;
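
Note (editorial, not part of the patch): `block.TransferTo(iter)` above replaces the manual `RegisterCleanup()` calls — the entry registers its own cleanup (release the cache handle, or delete an owned block) on the iterator and becomes empty. A short sketch of that hand-off; the function name is illustrative:

```cpp
#include <cassert>
#include <utility>

#include "rocksdb/cleanable.h"
#include "table/block.h"
#include "table/cachable_entry.h"

namespace rocksdb {

// Illustrative only: after TransferTo(), the cleanable (an iterator in the
// real code) performs the release/delete when its cleanup functions run.
inline void HandOffCleanup(CachableEntry<Block>&& block, Cleanable* cleanable) {
  block.TransferTo(cleanable);
  assert(block.IsEmpty());  // the entry no longer manages anything
}

}  // namespace rocksdb
```
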
@@ -2122,7 +2121,7 @@ Status BlockBasedTable::MaybeReadBlockAndLoadToCache(
 
     // Can't find the block from the cache. If I/O is allowed, read from the
     // file.
-    if (block_entry->value == nullptr && !no_io && ro.fill_cache) {
+    if (block_entry->GetValue() == nullptr && !no_io && ro.fill_cache) {
       Statistics* statistics = rep->ioptions.statistics;
       bool do_decompress =
           block_cache_compressed == nullptr && rep->blocks_maybe_compressed;
@@ -2159,7 +2158,7 @@ Status BlockBasedTable::MaybeReadBlockAndLoadToCache(
       }
     }
   }
-  assert(s.ok() || block_entry->value == nullptr);
+  assert(s.ok() || block_entry->GetValue() == nullptr);
   return s;
 }
 
@@ -2187,11 +2186,11 @@ BlockBasedTable::PartitionedIndexIteratorState::NewSecondaryIterator(
     Cache* block_cache = rep->table_options.block_cache.get();
     assert(block_cache);
     RecordTick(rep->ioptions.statistics, BLOCK_CACHE_BYTES_READ,
-               block_cache->GetUsage(block->second.cache_handle));
+               block_cache->GetUsage(block->second.GetCacheHandle()));
     Statistics* kNullStats = nullptr;
     // We don't return pinned datat from index blocks, so no need
     // to set `block_contents_pinned`.
-    return block->second.value->NewIterator<IndexBlockIter>(
+    return block->second.GetValue()->NewIterator<IndexBlockIter>(
        &rep->internal_comparator, rep->internal_comparator.user_comparator(),
        nullptr, kNullStats, true, index_key_includes_seq_, index_key_is_full_);
   }
@@ -2239,7 +2238,7 @@ bool BlockBasedTable::PrefixMayMatch(
 
     // First, try check with full filter
     auto filter_entry = GetFilter(prefix_extractor);
-    FilterBlockReader* filter = filter_entry.value;
+    FilterBlockReader* filter = filter_entry.GetValue();
     bool filter_checked = true;
     if (filter != nullptr) {
       if (!filter->IsBlockBased()) {
@@ -2251,9 +2250,6 @@ bool BlockBasedTable::PrefixMayMatch(
       } else {
         // if prefix_extractor changed for block based filter, skip filter
         if (need_upper_bound_check) {
-          if (!rep_->filter_entry.IsSet()) {
-            filter_entry.Release(rep_->table_options.block_cache.get());
-          }
           return true;
         }
         auto prefix = prefix_extractor->Transform(user_key);
@@ -2317,12 +2313,6 @@ bool BlockBasedTable::PrefixMayMatch(
     }
   }
 
-  // if rep_->filter_entry is not set, we should call Release(); otherwise
-  // don't call, in this case we have a local copy in rep_->filter_entry,
-  // it's pinned to the cache and will be released in the destructor
-  if (!rep_->filter_entry.IsSet()) {
-    filter_entry.Release(rep_->table_options.block_cache.get());
-  }
   return may_match;
 }
 
@@ -2734,7 +2724,7 @@ Status BlockBasedTable::Get(const ReadOptions& read_options, const Slice& key,
           GetFilter(prefix_extractor, /*prefetch_buffer*/ nullptr,
                     read_options.read_tier == kBlockCacheTier, get_context);
     }
-    filter = filter_entry.value;
+    filter = filter_entry.GetValue();
 
     // First check the full filter
     // If full filter not useful, Then go into each block
@@ -2838,12 +2828,6 @@ Status BlockBasedTable::Get(const ReadOptions& read_options, const Slice& key,
     }
   }
 
-  // if rep_->filter_entry is not set, we should call Release(); otherwise
-  // don't call, in this case we have a local copy in rep_->filter_entry,
-  // it's pinned to the cache and will be released in the destructor
-  if (!rep_->filter_entry.IsSet()) {
-    filter_entry.Release(rep_->table_options.block_cache.get());
-  }
   return s;
 }
 
@@ -2864,7 +2848,7 @@ void BlockBasedTable::MultiGet(const ReadOptions& read_options,
                     read_options.read_tier == kBlockCacheTier,
                     nullptr /*get_context*/);
     }
-    filter = filter_entry.value;
+    filter = filter_entry.GetValue();
 
     // First check the full filter
     // If full filter not useful, Then go into each block
@@ -2954,13 +2938,6 @@ void BlockBasedTable::MultiGet(const ReadOptions& read_options,
       *(miter->s) = s;
     }
   }
-
-  // if rep_->filter_entry is not set, we should call Release(); otherwise
-  // don't call, in this case we have a local copy in rep_->filter_entry,
-  // it's pinned to the cache and will be released in the destructor
-  if (!rep_->filter_entry.IsSet()) {
-    filter_entry.Release(rep_->table_options.block_cache.get());
-  }
 }
 
 Status BlockBasedTable::Prefetch(const Slice* const begin,
@@ -3144,11 +3121,7 @@ bool BlockBasedTable::TEST_KeyInCache(const ReadOptions& options,
         UncompressionDict::GetEmptyDict(), 0 /* read_amp_bytes_per_bit */);
   }
   assert(s.ok());
-  bool in_cache = block.value != nullptr;
-  if (in_cache) {
-    ReleaseCachedEntry(block_cache, block.cache_handle);
-  }
-  return in_cache;
+  return block.IsCached();
 }
 
 BlockBasedTableOptions::IndexType BlockBasedTable::UpdateIndexType() {
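
Note (editorial, not part of the patch): the deleted blocks above all had the shape "release unless the entry is also pinned in rep_". With the new type, a scope-local entry that still holds a cache handle simply releases it in its destructor, so these call sites need no explicit cleanup. A hedged sketch — `FilterMemoryUsage` is a hypothetical helper, not something added by this change:

```cpp
#include <cstddef>

#include "table/cachable_entry.h"
#include "table/filter_block.h"

namespace rocksdb {

inline size_t FilterMemoryUsage(CachableEntry<FilterBlockReader> filter_entry) {
  const FilterBlockReader* const filter = filter_entry.GetValue();
  return filter != nullptr ? filter->ApproximateMemoryUsage() : 0;
  // No explicit Release(): ~CachableEntry drops the cache handle (or deletes
  // an owned filter) unless ownership was moved elsewhere beforehand.
}

}  // namespace rocksdb
```
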
@@ -3494,9 +3467,6 @@ void BlockBasedTable::Close() {
 
   Cache* const cache = rep_->table_options.block_cache.get();
 
-  rep_->filter_entry.Release(cache);
-  rep_->index_entry.Release(cache);
-
   // cleanup index, filter, and compression dictionary blocks
   // to avoid accessing dangling pointers
   if (!rep_->table_options.no_block_cache) {
diff --git a/table/block_based_table_reader.h b/table/block_based_table_reader.h
index 74d2caeb2..385e50ab7 100644
--- a/table/block_based_table_reader.h
+++ b/table/block_based_table_reader.h
@@ -25,6 +25,7 @@
 #include "rocksdb/table.h"
 #include "table/block.h"
 #include "table/block_based_table_factory.h"
+#include "table/cachable_entry.h"
 #include "table/filter_block.h"
 #include "table/format.h"
 #include "table/get_context.h"
@@ -220,8 +221,6 @@ class BlockBasedTable : public TableReader {
   // The key retrieved are internal keys.
   Status GetKVPairsFromDataBlocks(std::vector<KVPairBlock>* kv_pair_blocks);
 
-  template <class TValue>
-  struct CachableEntry;
   struct Rep;
 
   Rep* get_rep() { return rep_; }
@@ -311,8 +310,7 @@ class BlockBasedTable : public TableReader {
       const Slice& block_cache_key, const Slice& compressed_block_cache_key,
       Cache* block_cache, Cache* block_cache_compressed, Rep* rep,
       const ReadOptions& read_options,
-      BlockBasedTable::CachableEntry<Block>* block,
-      const UncompressionDict& uncompression_dict,
+      CachableEntry<Block>* block, const UncompressionDict& uncompression_dict,
       size_t read_amp_bytes_per_bit, bool is_index = false,
       GetContext* get_context = nullptr);
 
@@ -446,29 +444,6 @@ class BlockBasedTable::PartitionedIndexIteratorState
   bool index_key_is_full_;
 };
 
-// CachableEntry represents the entries that *may* be fetched from block cache.
-// field `value` is the item we want to get.
-// field `cache_handle` is the cache handle to the block cache. If the value
-// was not read from cache, `cache_handle` will be nullptr.
-template <class TValue>
-struct BlockBasedTable::CachableEntry {
-  CachableEntry(TValue* _value, Cache::Handle* _cache_handle)
-      : value(_value), cache_handle(_cache_handle) {}
-  CachableEntry() : CachableEntry(nullptr, nullptr) {}
-  void Release(Cache* cache, bool force_erase = false) {
-    if (cache_handle) {
-      cache->Release(cache_handle, force_erase);
-      value = nullptr;
-      cache_handle = nullptr;
-    }
-  }
-  bool IsSet() const { return cache_handle != nullptr; }
-
-  TValue* value = nullptr;
-  // if the entry is from the cache, cache_handle will be populated.
-  Cache::Handle* cache_handle = nullptr;
-};
-
 struct BlockBasedTable::Rep {
   Rep(const ImmutableCFOptions& _ioptions, const EnvOptions& _env_options,
       const BlockBasedTableOptions& _table_opt,
diff --git a/table/cachable_entry.h b/table/cachable_entry.h
new file mode 100644
index 000000000..5b5d16ef3
--- /dev/null
+++ b/table/cachable_entry.h
@@ -0,0 +1,219 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2012 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#pragma once
+
+#include <cassert>
+#include "rocksdb/cache.h"
+#include "rocksdb/cleanable.h"
+
+namespace rocksdb {
+
+// CachableEntry is a handle to an object that may or may not be in the block
+// cache. It is used in a variety of ways:
+//
+// 1) It may refer to an object in the block cache. In this case, cache_ and
+// cache_handle_ are not nullptr, and the cache handle has to be released when
+// the CachableEntry is destroyed (the lifecycle of the cached object, on the
+// other hand, is managed by the cache itself).
+// 2) It may uniquely own the (non-cached) object it refers to (examples include
+// a block read directly from file, or uncompressed blocks when there is a
+// compressed block cache but no uncompressed block cache). In such cases, the
+// object has to be destroyed when the CachableEntry is destroyed.
+// 3) It may point to an object (cached or not) without owning it. In this case,
+// no action is needed when the CachableEntry is destroyed.
+// 4) Sometimes, management of a cached or owned object (see #1 and #2 above)
+// is transferred to some other object. This is used for instance with iterators
+// (where cleanup is performed using a chain of cleanup functions,
+// see Cleanable).
+//
+// Because of #1 and #2 above, copying a CachableEntry is not safe (and thus not
+// allowed); hence, this is a move-only type, where a move transfers the
+// management responsibilities, and leaves the source object in an empty state.
+
+template <class T>
+class CachableEntry {
+public:
+  CachableEntry() = default;
+
+  CachableEntry(T* value, Cache* cache, Cache::Handle* cache_handle,
+    bool own_value)
+    : value_(value)
+    , cache_(cache)
+    , cache_handle_(cache_handle)
+    , own_value_(own_value)
+  {
+    assert(value_ != nullptr ||
+      (cache_ == nullptr && cache_handle_ == nullptr && !own_value_));
+    assert(!!cache_ == !!cache_handle_);
+    assert(!cache_handle_ || !own_value_);
+  }
+
+  CachableEntry(const CachableEntry&) = delete;
+  CachableEntry& operator=(const CachableEntry&) = delete;
+
+  CachableEntry(CachableEntry&& rhs)
+    : value_(rhs.value_)
+    , cache_(rhs.cache_)
+    , cache_handle_(rhs.cache_handle_)
+    , own_value_(rhs.own_value_)
+  {
+    assert(value_ != nullptr ||
+      (cache_ == nullptr && cache_handle_ == nullptr && !own_value_));
+    assert(!!cache_ == !!cache_handle_);
+    assert(!cache_handle_ || !own_value_);
+
+    rhs.ResetFields();
+  }
+
+  CachableEntry& operator=(CachableEntry&& rhs) {
+    if (UNLIKELY(this == &rhs)) {
+      return *this;
+    }
+
+    ReleaseResource();
+
+    value_ = rhs.value_;
+    cache_ = rhs.cache_;
+    cache_handle_ = rhs.cache_handle_;
+    own_value_ = rhs.own_value_;
+
+    assert(value_ != nullptr ||
+      (cache_ == nullptr && cache_handle_ == nullptr && !own_value_));
+    assert(!!cache_ == !!cache_handle_);
+    assert(!cache_handle_ || !own_value_);
+
+    rhs.ResetFields();
+
+    return *this;
+  }
+
+  ~CachableEntry() {
+    ReleaseResource();
+  }
+
+  bool IsEmpty() const {
+    return value_ == nullptr && cache_ == nullptr && cache_handle_ == nullptr &&
+      !own_value_;
+  }
+
+  bool IsCached() const {
+    assert(!!cache_ == !!cache_handle_);
+
+    return cache_handle_ != nullptr;
+  }
+
+  T* GetValue() const { return value_; }
+  Cache* GetCache() const { return cache_; }
+  Cache::Handle* GetCacheHandle() const { return cache_handle_; }
+  bool GetOwnValue() const { return own_value_; }
+
+  void Reset() {
+    ReleaseResource();
+    ResetFields();
+  }
+
+  void TransferTo(Cleanable* cleanable) {
+    if (cleanable) {
+      if (cache_handle_ != nullptr) {
+        assert(cache_ != nullptr);
+        cleanable->RegisterCleanup(&ReleaseCacheHandle, cache_, cache_handle_);
+      } else if (own_value_) {
+        cleanable->RegisterCleanup(&DeleteValue, value_, nullptr);
+      }
+    }
+
+    ResetFields();
+  }
+
+  void SetOwnedValue(T* value) {
+    assert(value != nullptr);
+
+    if (UNLIKELY(value_ == value && own_value_)) {
+      assert(cache_ == nullptr && cache_handle_ == nullptr);
+      return;
+    }
+
+    Reset();
+
+    value_ = value;
+    own_value_ = true;
+  }
+
+  void SetUnownedValue(T* value) {
+    assert(value != nullptr);
+
+    if (UNLIKELY(value_ == value && cache_ == nullptr &&
+      cache_handle_ == nullptr && !own_value_)) {
+      return;
+    }
+
+    Reset();
+
+    value_ = value;
+    assert(!own_value_);
+  }
+
+  void SetCachedValue(T* value, Cache* cache, Cache::Handle* cache_handle) {
+    assert(value != nullptr);
+    assert(cache != nullptr);
+    assert(cache_handle != nullptr);
+
+    if (UNLIKELY(value_ == value && cache_ == cache &&
+      cache_handle_ == cache_handle && !own_value_)) {
+      return;
+    }
+
+    Reset();
+
+    value_ = value;
+    cache_ = cache;
+    cache_handle_ = cache_handle;
+    assert(!own_value_);
+  }
+
+private:
+  void ReleaseResource() {
+    if (LIKELY(cache_handle_ != nullptr)) {
+      assert(cache_ != nullptr);
+      cache_->Release(cache_handle_);
+    } else if (own_value_) {
+      delete value_;
+    }
+  }
+
+  void ResetFields() {
+    value_ = nullptr;
+    cache_ = nullptr;
+    cache_handle_ = nullptr;
+    own_value_ = false;
+  }
+
+  static void ReleaseCacheHandle(void* arg1, void* arg2) {
+    Cache* const cache = static_cast<Cache*>(arg1);
+    assert(cache);
+
+    Cache::Handle* const cache_handle = static_cast<Cache::Handle*>(arg2);
+    assert(cache_handle);
+
+    cache->Release(cache_handle);
+  }
+
+  static void DeleteValue(void* arg1, void* /* arg2 */) {
+    delete static_cast<T*>(arg1);
+  }
+
+private:
+  T* value_ = nullptr;
+  Cache* cache_ = nullptr;
+  Cache::Handle* cache_handle_ = nullptr;
+  bool own_value_ = false;
+};
+
+}  // namespace rocksdb
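
Note (editorial, not part of the patch): a usage sketch of the four modes described in the header comment above. The cache setup and the `Widget` payload type are illustrative assumptions; only the CachableEntry calls mirror the new API:

```cpp
#include <cassert>
#include <memory>
#include <utility>

#include "rocksdb/cache.h"
#include "table/cachable_entry.h"

namespace rocksdb {
namespace {

struct Widget {
  int payload = 0;
};

void DeleteWidget(const Slice& /* key */, void* value) {
  delete static_cast<Widget*>(value);
}

void CachableEntryUsageSketch() {
  std::shared_ptr<Cache> cache = NewLRUCache(1024 * 1024);

  // 1) Cache-backed: the entry releases the handle when it is destroyed.
  Cache::Handle* handle = nullptr;
  cache->Insert("w1", new Widget(), sizeof(Widget), &DeleteWidget, &handle);
  CachableEntry<Widget> cached(static_cast<Widget*>(cache->Value(handle)),
                               cache.get(), handle, false /* own_value */);
  assert(cached.IsCached());

  // 2) Owned: the entry deletes the object when it is destroyed.
  CachableEntry<Widget> owned(new Widget(), nullptr /* cache */,
                              nullptr /* cache_handle */,
                              true /* own_value */);

  // 3) Unowned: a plain pointer, nothing to clean up.
  Widget local;
  CachableEntry<Widget> unowned(&local, nullptr /* cache */,
                                nullptr /* cache_handle */,
                                false /* own_value */);

  // 4) Transfer: moving pins the resource in the destination and leaves the
  // source empty, so there is no double release.
  CachableEntry<Widget> pinned = std::move(cached);
  assert(cached.IsEmpty() && pinned.IsCached());
}

}  // namespace
}  // namespace rocksdb
```
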
diff --git a/table/partitioned_filter_block.cc b/table/partitioned_filter_block.cc
index aab0f5509..3ccc79463 100644
--- a/table/partitioned_filter_block.cc
+++ b/table/partitioned_filter_block.cc
@@ -176,24 +176,14 @@ bool PartitionedFilterBlockReader::KeyMayMatch(
   if (UNLIKELY(filter_handle.size() == 0)) {  // key is out of range
     return false;
   }
-  bool cached = false;
   auto filter_partition =
       GetFilterPartition(nullptr /* prefetch_buffer */, filter_handle, no_io,
-                         &cached, prefix_extractor);
-  if (UNLIKELY(!filter_partition.value)) {
+                         prefix_extractor);
+  if (UNLIKELY(!filter_partition.GetValue())) {
     return true;
   }
-  auto res = filter_partition.value->KeyMayMatch(key, prefix_extractor,
-                                                 block_offset, no_io);
-  if (cached) {
-    return res;
-  }
-  if (LIKELY(filter_partition.IsSet())) {
-    filter_partition.Release(table_->rep_->table_options.block_cache.get());
-  } else {
-    delete filter_partition.value;
-  }
-  return res;
+  return filter_partition.GetValue()->KeyMayMatch(key, prefix_extractor,
+                                                  block_offset, no_io);
 }
 
 bool PartitionedFilterBlockReader::PrefixMayMatch(
@@ -215,24 +205,14 @@
   if (UNLIKELY(filter_handle.size() == 0)) {  // prefix is out of range
     return false;
   }
-  bool cached = false;
   auto filter_partition =
       GetFilterPartition(nullptr /* prefetch_buffer */, filter_handle, no_io,
-                         &cached, prefix_extractor);
-  if (UNLIKELY(!filter_partition.value)) {
+                         prefix_extractor);
+  if (UNLIKELY(!filter_partition.GetValue())) {
    return true;
   }
-  auto res = filter_partition.value->PrefixMayMatch(prefix, prefix_extractor,
-                                                    kNotValid, no_io);
-  if (cached) {
-    return res;
-  }
-  if (LIKELY(filter_partition.IsSet())) {
-    filter_partition.Release(table_->rep_->table_options.block_cache.get());
-  } else {
-    delete filter_partition.value;
-  }
-  return res;
+  return filter_partition.GetValue()->PrefixMayMatch(prefix, prefix_extractor,
+                                                     kNotValid, no_io);
 }
 
 BlockHandle PartitionedFilterBlockReader::GetFilterPartitionHandle(
@@ -251,10 +231,10 @@ BlockHandle PartitionedFilterBlockReader::GetFilterPartitionHandle(
   return fltr_blk_handle;
 }
 
-BlockBasedTable::CachableEntry<FilterBlockReader>
+CachableEntry<FilterBlockReader>
 PartitionedFilterBlockReader::GetFilterPartition(
     FilePrefetchBuffer* prefetch_buffer, BlockHandle& fltr_blk_handle,
-    const bool no_io, bool* cached, const SliceTransform* prefix_extractor) {
+    const bool no_io, const SliceTransform* prefix_extractor) {
   const bool is_a_filter_partition = true;
   auto block_cache = table_->rep_->table_options.block_cache.get();
   if (LIKELY(block_cache != nullptr)) {
@@ -267,9 +247,9 @@ PartitionedFilterBlockReader::GetFilterPartition(
       RecordTick(statistics(), BLOCK_CACHE_FILTER_HIT);
       RecordTick(statistics(), BLOCK_CACHE_HIT);
       RecordTick(statistics(), BLOCK_CACHE_BYTES_READ,
-                 block_cache->GetUsage(iter->second.cache_handle));
-      *cached = true;
-      return iter->second;
+                 block_cache->GetUsage(iter->second.GetCacheHandle()));
+      return {iter->second.GetValue(), nullptr /* cache */,
+              nullptr /* cache_handle */, false /* own_value */};
     }
   }
   return table_->GetFilter(/*prefetch_buffer*/ nullptr, fltr_blk_handle,
@@ -278,7 +258,8 @@ PartitionedFilterBlockReader::GetFilterPartition(
   } else {
     auto filter = table_->ReadFilter(prefetch_buffer, fltr_blk_handle,
                                      is_a_filter_partition, prefix_extractor);
-    return {filter, nullptr};
+    return {filter, nullptr /* cache */, nullptr /* cache_handle */,
+            true /* own_value */};
   }
 }
 
@@ -293,18 +274,10 @@ size_t PartitionedFilterBlockReader::ApproximateMemoryUsage() const {
   // TODO(myabandeh): better estimation for filter_map_ size
 }
 
-// Release the cached entry and decrement its ref count.
-void ReleaseFilterCachedEntry(void* arg, void* h) {
-  Cache* cache = reinterpret_cast<Cache*>(arg);
-  Cache::Handle* handle = reinterpret_cast<Cache::Handle*>(h);
-  cache->Release(handle);
-}
-
 // TODO(myabandeh): merge this with the same function in IndexReader
 void PartitionedFilterBlockReader::CacheDependencies(
     bool pin, const SliceTransform* prefix_extractor) {
   // Before read partitions, prefetch them to avoid lots of IOs
-  auto rep = table_->rep_;
   IndexBlockIter biter;
   Statistics* kNullStats = nullptr;
   idx_on_fltr_blk_->NewIterator<IndexBlockIter>(
@@ -330,7 +303,6 @@ void PartitionedFilterBlockReader::CacheDependencies(
 
   // After prefetch, read the partitions one by one
   biter.SeekToFirst();
-  Cache* block_cache = rep->table_options.block_cache.get();
   for (; biter.Valid(); biter.Next()) {
     handle = biter.value();
     const bool no_io = true;
@@ -338,16 +310,10 @@ void PartitionedFilterBlockReader::CacheDependencies(
     auto filter = table_->GetFilter(
         prefetch_buffer.get(), handle, is_a_filter_partition, !no_io,
         /* get_context */ nullptr, prefix_extractor);
-    if (LIKELY(filter.IsSet())) {
+    if (LIKELY(filter.IsCached())) {
       if (pin) {
         filter_map_[handle.offset()] = std::move(filter);
-        RegisterCleanup(&ReleaseFilterCachedEntry, block_cache,
-                        filter.cache_handle);
-      } else {
-        block_cache->Release(filter.cache_handle);
       }
-    } else {
-      delete filter.value;
     }
   }
 }
diff --git a/table/partitioned_filter_block.h b/table/partitioned_filter_block.h
index 5d55da544..2563dd2bf 100644
--- a/table/partitioned_filter_block.h
+++ b/table/partitioned_filter_block.h
@@ -15,6 +15,7 @@
 
 #include "table/block.h"
 #include "table/block_based_table_reader.h"
+#include "table/cachable_entry.h"
 #include "table/full_filter_block.h"
 #include "table/index_builder.h"
 #include "util/autovector.h"
@@ -69,8 +70,7 @@ class PartitionedFilterBlockBuilder : public FullFilterBlockBuilder {
   BlockHandle last_encoded_handle_;
 };
 
-class PartitionedFilterBlockReader : public FilterBlockReader,
-                                     public Cleanable {
+class PartitionedFilterBlockReader : public FilterBlockReader {
  public:
   explicit PartitionedFilterBlockReader(
       const SliceTransform* prefix_extractor, bool whole_key_filtering,
@@ -93,10 +93,9 @@ class PartitionedFilterBlockReader : public FilterBlockReader,
 
  private:
   BlockHandle GetFilterPartitionHandle(const Slice& entry);
-  BlockBasedTable::CachableEntry<FilterBlockReader> GetFilterPartition(
+  CachableEntry<FilterBlockReader> GetFilterPartition(
       FilePrefetchBuffer* prefetch_buffer, BlockHandle& handle,
-      const bool no_io, bool* cached,
-      const SliceTransform* prefix_extractor = nullptr);
+      const bool no_io, const SliceTransform* prefix_extractor = nullptr);
   virtual void CacheDependencies(
       bool bin, const SliceTransform* prefix_extractor) override;
 
@@ -106,9 +105,7 @@ class PartitionedFilterBlockReader : public FilterBlockReader,
   const BlockBasedTable* table_;
   const bool index_key_includes_seq_;
   const bool index_value_is_full_;
-  std::unordered_map<uint64_t,
-                     BlockBasedTable::CachableEntry<FilterBlockReader>>
-      filter_map_;
+  std::unordered_map<uint64_t, CachableEntry<FilterBlockReader>> filter_map_;
 };
 
 }  // namespace rocksdb
diff --git a/table/partitioned_filter_block_test.cc b/table/partitioned_filter_block_test.cc
index 8068f14d8..8afa530d7 100644
--- a/table/partitioned_filter_block_test.cc
+++ b/table/partitioned_filter_block_test.cc
@@ -35,7 +35,8 @@ class MockedBlockBasedTable : public BlockBasedTable {
     auto obj = new FullFilterBlockReader(
         prefix_extractor, true, BlockContents(slice),
         rep_->table_options.filter_policy->GetFilterBitsReader(slice), nullptr);
-    return {obj, nullptr};
+    return {obj, nullptr /* cache */, nullptr /* cache_handle */,
+            true /* own_value */};
   }
 
   FilterBlockReader* ReadFilter(