Turn CachableEntry into a proper resource handle (#5252)

Summary:
CachableEntry is used in a variety of contexts: it may refer to a cached
object (i.e. an object in the block cache), an owned object, or an
unowned object; also, in some cases (most notably with iterators), the
responsibility of managing the pointed-to object gets handed off to
another object. Each of the above scenarios have different implications
for the lifecycle of the referenced object. For the most part, the patch
does not change the lifecycle of managed objects; however, it makes
these relationships explicit, and it also enables us to eliminate some
hacks and accident-prone code around releasing cache handles and
deleting/cleaning up objects. (The only places where the patch changes
how an objects are managed are the partitions of partitioned indexes and
filters.)
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5252

Differential Revision: D15101358

Pulled By: ltamasi

fbshipit-source-id: 9eb59e9ae5a7230e3345789762d0ba1f189485be
main
Levi Tamasi 5 years ago committed by Facebook Github Bot
parent 6451673f37
commit f0bf3bf34b
  1. 244
      table/block_based_table_reader.cc
  2. 29
      table/block_based_table_reader.h
  3. 219
      table/cachable_entry.h
  4. 66
      table/partitioned_filter_block.cc
  5. 13
      table/partitioned_filter_block.h
  6. 3
      table/partitioned_filter_block_test.cc

@ -112,12 +112,6 @@ inline MemoryAllocator* GetMemoryAllocatorForCompressedBlock(
: nullptr;
}
// Delete the resource that is held by the iterator.
template <class ResourceType>
void DeleteHeldResource(void* arg, void* /*ignored*/) {
delete reinterpret_cast<ResourceType*>(arg);
}
// Delete the entry resided in the cache.
template <class Entry>
void DeleteCachedEntry(const Slice& /*key*/, void* value) {
@ -224,7 +218,7 @@ bool PrefixExtractorChanged(const TableProperties* table_properties,
} // namespace
// Index that allows binary search lookup in a two-level index structure.
class PartitionIndexReader : public IndexReader, public Cleanable {
class PartitionIndexReader : public IndexReader {
public:
// Read the partition index from the file and create an instance for
// `PartitionIndexReader`.
@ -332,10 +326,9 @@ class PartitionIndexReader : public IndexReader, public Cleanable {
// After prefetch, read the partitions one by one
biter.SeekToFirst();
auto ro = ReadOptions();
Cache* block_cache = rep->table_options.block_cache.get();
for (; biter.Valid(); biter.Next()) {
handle = biter.value();
BlockBasedTable::CachableEntry<Block> block;
CachableEntry<Block> block;
const bool is_index = true;
// TODO: Support counter batch update for partitioned index and
// filter blocks
@ -344,18 +337,12 @@ class PartitionIndexReader : public IndexReader, public Cleanable {
UncompressionDict::GetEmptyDict(), &block, is_index,
nullptr /* get_context */);
assert(s.ok() || block.value == nullptr);
if (s.ok() && block.value != nullptr) {
if (block.cache_handle != nullptr) {
assert(s.ok() || block.GetValue() == nullptr);
if (s.ok() && block.GetValue() != nullptr) {
if (block.IsCached()) {
if (pin) {
partition_map_[handle.offset()] = block;
RegisterCleanup(&ReleaseCachedEntry, block_cache,
block.cache_handle);
} else {
block_cache->Release(block.cache_handle);
partition_map_[handle.offset()] = std::move(block);
}
} else {
delete block.value;
}
}
}
@ -391,8 +378,7 @@ class PartitionIndexReader : public IndexReader, public Cleanable {
}
BlockBasedTable* table_;
std::unique_ptr<Block> index_block_;
std::unordered_map<uint64_t, BlockBasedTable::CachableEntry<Block>>
partition_map_;
std::unordered_map<uint64_t, CachableEntry<Block>> partition_map_;
const bool index_key_includes_seq_;
const bool index_value_is_full_;
};
@ -1221,14 +1207,12 @@ Status BlockBasedTable::PrefetchIndexAndFilterBlocks(
// This is the first call to NewIndexIterator() since we're in Open().
// On success it should give us ownership of the `CachableEntry` by
// populating `index_entry`.
assert(index_entry.value != nullptr);
assert(index_entry.GetValue() != nullptr);
if (prefetch_all) {
index_entry.value->CacheDependencies(pin_all);
index_entry.GetValue()->CacheDependencies(pin_all);
}
if (pin_index) {
rep->index_entry = std::move(index_entry);
} else {
index_entry.Release(table_options.block_cache.get());
}
}
}
@ -1236,17 +1220,15 @@ Status BlockBasedTable::PrefetchIndexAndFilterBlocks(
// Hack: Call GetFilter() to implicitly add filter to the block_cache
auto filter_entry =
new_table->GetFilter(rep->table_prefix_extractor.get());
if (filter_entry.value != nullptr && prefetch_all) {
filter_entry.value->CacheDependencies(
if (filter_entry.GetValue() != nullptr && prefetch_all) {
filter_entry.GetValue()->CacheDependencies(
pin_all, rep->table_prefix_extractor.get());
}
// if pin_filter is true then save it in rep_->filter_entry; it will be
// released in the destructor only, hence it will be pinned in the
// cache while this reader is alive
if (pin_filter) {
rep->filter_entry = filter_entry;
} else {
filter_entry.Release(table_options.block_cache.get());
rep->filter_entry = std::move(filter_entry);
}
}
} else {
@ -1369,10 +1351,13 @@ Status BlockBasedTable::ReadMetaBlock(Rep* rep,
Status BlockBasedTable::GetDataBlockFromCache(
const Slice& block_cache_key, const Slice& compressed_block_cache_key,
Cache* block_cache, Cache* block_cache_compressed, Rep* rep,
const ReadOptions& read_options,
BlockBasedTable::CachableEntry<Block>* block,
const ReadOptions& read_options, CachableEntry<Block>* block,
const UncompressionDict& uncompression_dict, size_t read_amp_bytes_per_bit,
bool is_index, GetContext* get_context) {
assert(block);
assert(block->IsEmpty());
Status s;
BlockContents* compressed_block = nullptr;
Cache::Handle* block_cache_compressed_handle = nullptr;
@ -1380,7 +1365,7 @@ Status BlockBasedTable::GetDataBlockFromCache(
// Lookup uncompressed cache first
if (block_cache != nullptr) {
block->cache_handle = GetEntryFromCache(
auto cache_handle = GetEntryFromCache(
block_cache, block_cache_key, rep->level,
is_index ? BLOCK_CACHE_INDEX_MISS : BLOCK_CACHE_DATA_MISS,
is_index ? BLOCK_CACHE_INDEX_HIT : BLOCK_CACHE_DATA_HIT,
@ -1393,15 +1378,16 @@ Status BlockBasedTable::GetDataBlockFromCache(
: &get_context->get_context_stats_.num_cache_data_hit)
: nullptr,
statistics, get_context);
if (block->cache_handle != nullptr) {
block->value =
reinterpret_cast<Block*>(block_cache->Value(block->cache_handle));
if (cache_handle != nullptr) {
block->SetCachedValue(
reinterpret_cast<Block*>(block_cache->Value(cache_handle)),
block_cache, cache_handle);
return s;
}
}
// If not found, search from the compressed block cache.
assert(block->cache_handle == nullptr && block->value == nullptr);
assert(block->IsEmpty());
if (block_cache_compressed == nullptr) {
return s;
@ -1435,20 +1421,25 @@ Status BlockBasedTable::GetDataBlockFromCache(
// Insert uncompressed block into block cache
if (s.ok()) {
block->value =
new Block(std::move(contents), rep->get_global_seqno(is_index),
read_amp_bytes_per_bit,
statistics); // uncompressed block
if (block_cache != nullptr && block->value->own_bytes() &&
std::unique_ptr<Block> block_holder(
new Block(std::move(contents), rep->get_global_seqno(is_index),
read_amp_bytes_per_bit, statistics)); // uncompressed block
if (block_cache != nullptr && block_holder->own_bytes() &&
read_options.fill_cache) {
size_t charge = block->value->ApproximateMemoryUsage();
s = block_cache->Insert(block_cache_key, block->value, charge,
size_t charge = block_holder->ApproximateMemoryUsage();
Cache::Handle* cache_handle = nullptr;
s = block_cache->Insert(block_cache_key, block_holder.get(), charge,
&DeleteCachedEntry<Block>,
&(block->cache_handle));
&cache_handle);
#ifndef NDEBUG
block_cache->TEST_mark_as_data_block(block_cache_key, charge);
#endif // NDEBUG
if (s.ok()) {
assert(cache_handle != nullptr);
block->SetCachedValue(block_holder.release(), block_cache,
cache_handle);
if (get_context != nullptr) {
get_context->get_context_stats_.num_cache_add++;
get_context->get_context_stats_.num_cache_bytes_write += charge;
@ -1477,9 +1468,9 @@ Status BlockBasedTable::GetDataBlockFromCache(
}
} else {
RecordTick(statistics, BLOCK_CACHE_ADD_FAILURES);
delete block->value;
block->value = nullptr;
}
} else {
block->SetOwnedValue(block_holder.release());
}
}
@ -1497,33 +1488,34 @@ Status BlockBasedTable::PutDataBlockToCache(
const UncompressionDict& uncompression_dict, SequenceNumber seq_no,
size_t read_amp_bytes_per_bit, MemoryAllocator* memory_allocator,
bool is_index, Cache::Priority priority, GetContext* get_context) {
assert(cached_block);
assert(cached_block->IsEmpty());
assert(raw_block_comp_type == kNoCompression ||
block_cache_compressed != nullptr);
Status s;
// Retrieve the uncompressed contents into a new buffer
BlockContents uncompressed_block_contents;
Statistics* statistics = ioptions.statistics;
std::unique_ptr<Block> block_holder;
if (raw_block_comp_type != kNoCompression) {
// Retrieve the uncompressed contents into a new buffer
BlockContents uncompressed_block_contents;
UncompressionContext context(raw_block_comp_type);
UncompressionInfo info(context, uncompression_dict, raw_block_comp_type);
s = UncompressBlockContents(info, raw_block_contents->data.data(),
raw_block_contents->data.size(),
&uncompressed_block_contents, format_version,
ioptions, memory_allocator);
}
if (!s.ok()) {
return s;
}
if (!s.ok()) {
return s;
}
if (raw_block_comp_type != kNoCompression) {
cached_block->value = new Block(std::move(uncompressed_block_contents),
seq_no, read_amp_bytes_per_bit,
statistics); // uncompressed block
block_holder.reset(new Block(std::move(uncompressed_block_contents), seq_no,
read_amp_bytes_per_bit, statistics));
} else {
cached_block->value =
new Block(std::move(*raw_block_contents), seq_no,
read_amp_bytes_per_bit, ioptions.statistics);
block_holder.reset(new Block(std::move(*raw_block_contents), seq_no,
read_amp_bytes_per_bit, statistics));
}
// Insert compressed block into compressed block cache.
@ -1553,16 +1545,20 @@ Status BlockBasedTable::PutDataBlockToCache(
}
// insert into uncompressed block cache
if (block_cache != nullptr && cached_block->value->own_bytes()) {
size_t charge = cached_block->value->ApproximateMemoryUsage();
s = block_cache->Insert(block_cache_key, cached_block->value, charge,
if (block_cache != nullptr && block_holder->own_bytes()) {
size_t charge = block_holder->ApproximateMemoryUsage();
Cache::Handle* cache_handle = nullptr;
s = block_cache->Insert(block_cache_key, block_holder.get(), charge,
&DeleteCachedEntry<Block>,
&(cached_block->cache_handle), priority);
&cache_handle, priority);
#ifndef NDEBUG
block_cache->TEST_mark_as_data_block(block_cache_key, charge);
#endif // NDEBUG
if (s.ok()) {
assert(cached_block->cache_handle != nullptr);
assert(cache_handle != nullptr);
cached_block->SetCachedValue(block_holder.release(), block_cache,
cache_handle);
if (get_context != nullptr) {
get_context->get_context_stats_.num_cache_add++;
get_context->get_context_stats_.num_cache_bytes_write += charge;
@ -1589,12 +1585,12 @@ Status BlockBasedTable::PutDataBlockToCache(
}
}
assert(reinterpret_cast<Block*>(block_cache->Value(
cached_block->cache_handle)) == cached_block->value);
cached_block->GetCacheHandle())) == cached_block->GetValue());
} else {
RecordTick(statistics, BLOCK_CACHE_ADD_FAILURES);
delete cached_block->value;
cached_block->value = nullptr;
}
} else {
cached_block->SetOwnedValue(block_holder.release());
}
return s;
@ -1668,7 +1664,7 @@ FilterBlockReader* BlockBasedTable::ReadFilter(
}
}
BlockBasedTable::CachableEntry<FilterBlockReader> BlockBasedTable::GetFilter(
CachableEntry<FilterBlockReader> BlockBasedTable::GetFilter(
const SliceTransform* prefix_extractor, FilePrefetchBuffer* prefetch_buffer,
bool no_io, GetContext* get_context) const {
const BlockHandle& filter_blk_handle = rep_->filter_handle;
@ -1677,7 +1673,7 @@ BlockBasedTable::CachableEntry<FilterBlockReader> BlockBasedTable::GetFilter(
no_io, get_context, prefix_extractor);
}
BlockBasedTable::CachableEntry<FilterBlockReader> BlockBasedTable::GetFilter(
CachableEntry<FilterBlockReader> BlockBasedTable::GetFilter(
FilePrefetchBuffer* prefetch_buffer, const BlockHandle& filter_blk_handle,
const bool is_a_filter_partition, bool no_io, GetContext* get_context,
const SliceTransform* prefix_extractor) const {
@ -1687,17 +1683,19 @@ BlockBasedTable::CachableEntry<FilterBlockReader> BlockBasedTable::GetFilter(
// most probably fail again.
if (!is_a_filter_partition &&
!rep_->table_options.cache_index_and_filter_blocks) {
return {rep_->filter.get(), nullptr /* cache handle */};
return {rep_->filter.get(), nullptr /* cache */,
nullptr /* cache_handle */, false /* own_value */};
}
Cache* block_cache = rep_->table_options.block_cache.get();
if (rep_->filter_policy == nullptr /* do not use filter */ ||
block_cache == nullptr /* no block cache at all */) {
return {nullptr /* filter */, nullptr /* cache handle */};
return CachableEntry<FilterBlockReader>();
}
if (!is_a_filter_partition && rep_->filter_entry.IsSet()) {
return rep_->filter_entry;
if (!is_a_filter_partition && rep_->filter_entry.IsCached()) {
return {rep_->filter_entry.GetValue(), nullptr /* cache */,
nullptr /* cache_handle */, false /* own_value */};
}
PERF_TIMER_GUARD(read_filter_block_nanos);
@ -1708,7 +1706,7 @@ BlockBasedTable::CachableEntry<FilterBlockReader> BlockBasedTable::GetFilter(
filter_blk_handle, cache_key);
Statistics* statistics = rep_->ioptions.statistics;
auto cache_handle = GetEntryFromCache(
Cache::Handle* cache_handle = GetEntryFromCache(
block_cache, key, rep_->level, BLOCK_CACHE_FILTER_MISS,
BLOCK_CACHE_FILTER_HIT,
get_context ? &get_context->get_context_stats_.num_cache_filter_miss
@ -1757,20 +1755,22 @@ BlockBasedTable::CachableEntry<FilterBlockReader> BlockBasedTable::GetFilter(
}
}
return {filter, cache_handle};
return {filter, cache_handle ? block_cache : nullptr, cache_handle,
false /* own_value */};
}
BlockBasedTable::CachableEntry<UncompressionDict>
CachableEntry<UncompressionDict>
BlockBasedTable::GetUncompressionDict(Rep* rep,
FilePrefetchBuffer* prefetch_buffer,
bool no_io, GetContext* get_context) {
if (!rep->table_options.cache_index_and_filter_blocks) {
// block cache is either disabled or not used for meta-blocks. In either
// case, BlockBasedTableReader is the owner of the uncompression dictionary.
return {rep->uncompression_dict.get(), nullptr /* cache handle */};
return {rep->uncompression_dict.get(), nullptr /* cache */,
nullptr /* cache_handle */, false /* own_value */};
}
if (rep->compression_dict_handle.IsNull()) {
return {nullptr, nullptr};
return CachableEntry<UncompressionDict>();
}
char cache_key_buf[kMaxCacheKeyPrefixSize + kMaxVarint64Length];
auto cache_key =
@ -1835,7 +1835,8 @@ BlockBasedTable::GetUncompressionDict(Rep* rep,
assert(cache_handle == nullptr);
}
}
return {dict, cache_handle};
return {dict, cache_handle ? rep->table_options.block_cache.get() : nullptr,
cache_handle, false /* own_value */};
}
// disable_prefix_seek should be set to true when prefix_extractor found in SST
@ -1853,10 +1854,10 @@ InternalIteratorBase<BlockHandle>* BlockBasedTable::NewIndexIterator(
read_options.fill_cache);
}
// we have a pinned index block
if (rep_->index_entry.IsSet()) {
if (rep_->index_entry.IsCached()) {
// We don't return pinned datat from index blocks, so no need
// to set `block_contents_pinned`.
return rep_->index_entry.value->NewIterator(
return rep_->index_entry.GetValue()->NewIterator(
input_iter, read_options.total_order_seek || disable_prefix_seek,
read_options.fill_cache);
}
@ -1948,7 +1949,8 @@ InternalIteratorBase<BlockHandle>* BlockBasedTable::NewIndexIterator(
// the caller would like to take ownership of the index block
// don't call RegisterCleanup() in this case, the caller will take care of it
if (index_entry != nullptr) {
*index_entry = {index_reader, cache_handle};
*index_entry = {index_reader, block_cache, cache_handle,
false /* own_value */};
} else {
iter->RegisterCleanup(&ReleaseCachedEntry, block_cache, cache_handle);
}
@ -1976,9 +1978,9 @@ TBlockIter* BlockBasedTable::NewDataBlockIterator(
auto uncompression_dict_storage =
GetUncompressionDict(rep, prefetch_buffer, no_io, get_context);
const UncompressionDict& uncompression_dict =
uncompression_dict_storage.value == nullptr
uncompression_dict_storage.GetValue() == nullptr
? UncompressionDict::GetEmptyDict()
: *uncompression_dict_storage.value;
: *uncompression_dict_storage.GetValue();
if (s.ok()) {
s = MaybeReadBlockAndLoadToCache(prefetch_buffer, rep, ro, handle,
uncompression_dict, &block, is_index,
@ -1991,7 +1993,7 @@ TBlockIter* BlockBasedTable::NewDataBlockIterator(
iter = new TBlockIter;
}
// Didn't get any data from block caches.
if (s.ok() && block.value == nullptr) {
if (s.ok() && block.GetValue() == nullptr) {
if (no_io) {
// Could not read from block_cache and can't do IO
iter->Invalidate(Status::Incomplete("no blocking io"));
@ -2012,16 +2014,15 @@ TBlockIter* BlockBasedTable::NewDataBlockIterator(
GetMemoryAllocator(rep->table_options));
}
if (s.ok()) {
block.value = block_value.release();
block.SetOwnedValue(block_value.release());
}
}
// TODO(ajkr): also pin compression dictionary block when
// `pin_l0_filter_and_index_blocks_in_cache == true`.
uncompression_dict_storage.Release(block_cache);
}
if (s.ok()) {
assert(block.value != nullptr);
assert(block.GetValue() != nullptr);
const bool kTotalOrderSeek = true;
// Block contents are pinned and it is still pinned after the iterator
// is destroyed as long as cleanup functions are moved to another object,
@ -2031,16 +2032,13 @@ TBlockIter* BlockBasedTable::NewDataBlockIterator(
// not reading data from the original source, whether immortal or not.
// Otherwise, the block is pinned iff the source is immortal.
bool block_contents_pinned =
(block.cache_handle != nullptr ||
(!block.value->own_bytes() && rep->immortal_table));
iter = block.value->NewIterator<TBlockIter>(
(block.IsCached() ||
(!block.GetValue()->own_bytes() && rep->immortal_table));
iter = block.GetValue()->NewIterator<TBlockIter>(
&rep->internal_comparator, rep->internal_comparator.user_comparator(),
iter, rep->ioptions.statistics, kTotalOrderSeek, key_includes_seq,
index_key_is_full, block_contents_pinned);
if (block.cache_handle != nullptr) {
iter->RegisterCleanup(&ReleaseCachedEntry, block_cache,
block.cache_handle);
} else {
if (!block.IsCached()) {
if (!ro.fill_cache && rep->cache_key_prefix_size != 0) {
// insert a dummy record to block cache to track the memory usage
Cache::Handle* cache_handle;
@ -2063,8 +2061,8 @@ TBlockIter* BlockBasedTable::NewDataBlockIterator(
Slice unique_key =
Slice(cache_key, static_cast<size_t>(end - cache_key));
s = block_cache->Insert(unique_key, nullptr,
block.value->ApproximateMemoryUsage(), nullptr,
&cache_handle);
block.GetValue()->ApproximateMemoryUsage(),
nullptr, &cache_handle);
if (s.ok()) {
if (cache_handle != nullptr) {
iter->RegisterCleanup(&ForceReleaseCachedEntry, block_cache,
@ -2072,10 +2070,11 @@ TBlockIter* BlockBasedTable::NewDataBlockIterator(
}
}
}
iter->RegisterCleanup(&DeleteHeldResource<Block>, block.value, nullptr);
}
block.TransferTo(iter);
} else {
assert(block.value == nullptr);
assert(block.GetValue() == nullptr);
iter->Invalidate(s);
}
return iter;
@ -2122,7 +2121,7 @@ Status BlockBasedTable::MaybeReadBlockAndLoadToCache(
// Can't find the block from the cache. If I/O is allowed, read from the
// file.
if (block_entry->value == nullptr && !no_io && ro.fill_cache) {
if (block_entry->GetValue() == nullptr && !no_io && ro.fill_cache) {
Statistics* statistics = rep->ioptions.statistics;
bool do_decompress =
block_cache_compressed == nullptr && rep->blocks_maybe_compressed;
@ -2159,7 +2158,7 @@ Status BlockBasedTable::MaybeReadBlockAndLoadToCache(
}
}
}
assert(s.ok() || block_entry->value == nullptr);
assert(s.ok() || block_entry->GetValue() == nullptr);
return s;
}
@ -2187,11 +2186,11 @@ BlockBasedTable::PartitionedIndexIteratorState::NewSecondaryIterator(
Cache* block_cache = rep->table_options.block_cache.get();
assert(block_cache);
RecordTick(rep->ioptions.statistics, BLOCK_CACHE_BYTES_READ,
block_cache->GetUsage(block->second.cache_handle));
block_cache->GetUsage(block->second.GetCacheHandle()));
Statistics* kNullStats = nullptr;
// We don't return pinned datat from index blocks, so no need
// to set `block_contents_pinned`.
return block->second.value->NewIterator<IndexBlockIter>(
return block->second.GetValue()->NewIterator<IndexBlockIter>(
&rep->internal_comparator, rep->internal_comparator.user_comparator(),
nullptr, kNullStats, true, index_key_includes_seq_, index_key_is_full_);
}
@ -2239,7 +2238,7 @@ bool BlockBasedTable::PrefixMayMatch(
// First, try check with full filter
auto filter_entry = GetFilter(prefix_extractor);
FilterBlockReader* filter = filter_entry.value;
FilterBlockReader* filter = filter_entry.GetValue();
bool filter_checked = true;
if (filter != nullptr) {
if (!filter->IsBlockBased()) {
@ -2251,9 +2250,6 @@ bool BlockBasedTable::PrefixMayMatch(
} else {
// if prefix_extractor changed for block based filter, skip filter
if (need_upper_bound_check) {
if (!rep_->filter_entry.IsSet()) {
filter_entry.Release(rep_->table_options.block_cache.get());
}
return true;
}
auto prefix = prefix_extractor->Transform(user_key);
@ -2317,12 +2313,6 @@ bool BlockBasedTable::PrefixMayMatch(
}
}
// if rep_->filter_entry is not set, we should call Release(); otherwise
// don't call, in this case we have a local copy in rep_->filter_entry,
// it's pinned to the cache and will be released in the destructor
if (!rep_->filter_entry.IsSet()) {
filter_entry.Release(rep_->table_options.block_cache.get());
}
return may_match;
}
@ -2734,7 +2724,7 @@ Status BlockBasedTable::Get(const ReadOptions& read_options, const Slice& key,
GetFilter(prefix_extractor, /*prefetch_buffer*/ nullptr,
read_options.read_tier == kBlockCacheTier, get_context);
}
filter = filter_entry.value;
filter = filter_entry.GetValue();
// First check the full filter
// If full filter not useful, Then go into each block
@ -2838,12 +2828,6 @@ Status BlockBasedTable::Get(const ReadOptions& read_options, const Slice& key,
}
}
// if rep_->filter_entry is not set, we should call Release(); otherwise
// don't call, in this case we have a local copy in rep_->filter_entry,
// it's pinned to the cache and will be released in the destructor
if (!rep_->filter_entry.IsSet()) {
filter_entry.Release(rep_->table_options.block_cache.get());
}
return s;
}
@ -2864,7 +2848,7 @@ void BlockBasedTable::MultiGet(const ReadOptions& read_options,
read_options.read_tier == kBlockCacheTier,
nullptr /*get_context*/);
}
filter = filter_entry.value;
filter = filter_entry.GetValue();
// First check the full filter
// If full filter not useful, Then go into each block
@ -2954,13 +2938,6 @@ void BlockBasedTable::MultiGet(const ReadOptions& read_options,
*(miter->s) = s;
}
}
// if rep_->filter_entry is not set, we should call Release(); otherwise
// don't call, in this case we have a local copy in rep_->filter_entry,
// it's pinned to the cache and will be released in the destructor
if (!rep_->filter_entry.IsSet()) {
filter_entry.Release(rep_->table_options.block_cache.get());
}
}
Status BlockBasedTable::Prefetch(const Slice* const begin,
@ -3144,11 +3121,7 @@ bool BlockBasedTable::TEST_KeyInCache(const ReadOptions& options,
UncompressionDict::GetEmptyDict(), 0 /* read_amp_bytes_per_bit */);
}
assert(s.ok());
bool in_cache = block.value != nullptr;
if (in_cache) {
ReleaseCachedEntry(block_cache, block.cache_handle);
}
return in_cache;
return block.IsCached();
}
BlockBasedTableOptions::IndexType BlockBasedTable::UpdateIndexType() {
@ -3494,9 +3467,6 @@ void BlockBasedTable::Close() {
Cache* const cache = rep_->table_options.block_cache.get();
rep_->filter_entry.Release(cache);
rep_->index_entry.Release(cache);
// cleanup index, filter, and compression dictionary blocks
// to avoid accessing dangling pointers
if (!rep_->table_options.no_block_cache) {

@ -25,6 +25,7 @@
#include "rocksdb/table.h"
#include "table/block.h"
#include "table/block_based_table_factory.h"
#include "table/cachable_entry.h"
#include "table/filter_block.h"
#include "table/format.h"
#include "table/get_context.h"
@ -220,8 +221,6 @@ class BlockBasedTable : public TableReader {
// The key retrieved are internal keys.
Status GetKVPairsFromDataBlocks(std::vector<KVPairBlock>* kv_pair_blocks);
template <class TValue>
struct CachableEntry;
struct Rep;
Rep* get_rep() { return rep_; }
@ -311,8 +310,7 @@ class BlockBasedTable : public TableReader {
const Slice& block_cache_key, const Slice& compressed_block_cache_key,
Cache* block_cache, Cache* block_cache_compressed, Rep* rep,
const ReadOptions& read_options,
BlockBasedTable::CachableEntry<Block>* block,
const UncompressionDict& uncompression_dict,
CachableEntry<Block>* block, const UncompressionDict& uncompression_dict,
size_t read_amp_bytes_per_bit, bool is_index = false,
GetContext* get_context = nullptr);
@ -446,29 +444,6 @@ class BlockBasedTable::PartitionedIndexIteratorState
bool index_key_is_full_;
};
// CachableEntry represents the entries that *may* be fetched from block cache.
// field `value` is the item we want to get.
// field `cache_handle` is the cache handle to the block cache. If the value
// was not read from cache, `cache_handle` will be nullptr.
template <class TValue>
struct BlockBasedTable::CachableEntry {
CachableEntry(TValue* _value, Cache::Handle* _cache_handle)
: value(_value), cache_handle(_cache_handle) {}
CachableEntry() : CachableEntry(nullptr, nullptr) {}
void Release(Cache* cache, bool force_erase = false) {
if (cache_handle) {
cache->Release(cache_handle, force_erase);
value = nullptr;
cache_handle = nullptr;
}
}
bool IsSet() const { return cache_handle != nullptr; }
TValue* value = nullptr;
// if the entry is from the cache, cache_handle will be populated.
Cache::Handle* cache_handle = nullptr;
};
struct BlockBasedTable::Rep {
Rep(const ImmutableCFOptions& _ioptions, const EnvOptions& _env_options,
const BlockBasedTableOptions& _table_opt,

@ -0,0 +1,219 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2012 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#pragma once
#include <cassert>
#include "rocksdb/cache.h"
#include "rocksdb/cleanable.h"
namespace rocksdb {
// CachableEntry is a handle to an object that may or may not be in the block
// cache. It is used in a variety of ways:
//
// 1) It may refer to an object in the block cache. In this case, cache_ and
// cache_handle_ are not nullptr, and the cache handle has to be released when
// the CachableEntry is destroyed (the lifecycle of the cached object, on the
// other hand, is managed by the cache itself).
// 2) It may uniquely own the (non-cached) object it refers to (examples include
// a block read directly from file, or uncompressed blocks when there is a
// compressed block cache but no uncompressed block cache). In such cases, the
// object has to be destroyed when the CachableEntry is destroyed.
// 3) It may point to an object (cached or not) without owning it. In this case,
// no action is needed when the CachableEntry is destroyed.
// 4) Sometimes, management of a cached or owned object (see #1 and #2 above)
// is transferred to some other object. This is used for instance with iterators
// (where cleanup is performed using a chain of cleanup functions,
// see Cleanable).
//
// Because of #1 and #2 above, copying a CachableEntry is not safe (and thus not
// allowed); hence, this is a move-only type, where a move transfers the
// management responsibilities, and leaves the source object in an empty state.
template <class T>
class CachableEntry {
public:
CachableEntry() = default;
CachableEntry(T* value, Cache* cache, Cache::Handle* cache_handle,
bool own_value)
: value_(value)
, cache_(cache)
, cache_handle_(cache_handle)
, own_value_(own_value)
{
assert(value_ != nullptr ||
(cache_ == nullptr && cache_handle_ == nullptr && !own_value_));
assert(!!cache_ == !!cache_handle_);
assert(!cache_handle_ || !own_value_);
}
CachableEntry(const CachableEntry&) = delete;
CachableEntry& operator=(const CachableEntry&) = delete;
CachableEntry(CachableEntry&& rhs)
: value_(rhs.value_)
, cache_(rhs.cache_)
, cache_handle_(rhs.cache_handle_)
, own_value_(rhs.own_value_)
{
assert(value_ != nullptr ||
(cache_ == nullptr && cache_handle_ == nullptr && !own_value_));
assert(!!cache_ == !!cache_handle_);
assert(!cache_handle_ || !own_value_);
rhs.ResetFields();
}
CachableEntry& operator=(CachableEntry&& rhs) {
if (UNLIKELY(this == &rhs)) {
return *this;
}
ReleaseResource();
value_ = rhs.value_;
cache_ = rhs.cache_;
cache_handle_ = rhs.cache_handle_;
own_value_ = rhs.own_value_;
assert(value_ != nullptr ||
(cache_ == nullptr && cache_handle_ == nullptr && !own_value_));
assert(!!cache_ == !!cache_handle_);
assert(!cache_handle_ || !own_value_);
rhs.ResetFields();
return *this;
}
~CachableEntry() {
ReleaseResource();
}
bool IsEmpty() const {
return value_ == nullptr && cache_ == nullptr && cache_handle_ == nullptr &&
!own_value_;
}
bool IsCached() const {
assert(!!cache_ == !!cache_handle_);
return cache_handle_ != nullptr;
}
T* GetValue() const { return value_; }
Cache* GetCache() const { return cache_; }
Cache::Handle* GetCacheHandle() const { return cache_handle_; }
bool GetOwnValue() const { return own_value_; }
void Reset() {
ReleaseResource();
ResetFields();
}
void TransferTo(Cleanable* cleanable) {
if (cleanable) {
if (cache_handle_ != nullptr) {
assert(cache_ != nullptr);
cleanable->RegisterCleanup(&ReleaseCacheHandle, cache_, cache_handle_);
} else if (own_value_) {
cleanable->RegisterCleanup(&DeleteValue, value_, nullptr);
}
}
ResetFields();
}
void SetOwnedValue(T* value) {
assert(value != nullptr);
if (UNLIKELY(value_ == value && own_value_)) {
assert(cache_ == nullptr && cache_handle_ == nullptr);
return;
}
Reset();
value_ = value;
own_value_ = true;
}
void SetUnownedValue(T* value) {
assert(value != nullptr);
if (UNLIKELY(value_ == value && cache_ == nullptr &&
cache_handle_ == nullptr && !own_value_)) {
return;
}
Reset();
value_ = value;
assert(!own_value_);
}
void SetCachedValue(T* value, Cache* cache, Cache::Handle* cache_handle) {
assert(value != nullptr);
assert(cache != nullptr);
assert(cache_handle != nullptr);
if (UNLIKELY(value_ == value && cache_ == cache &&
cache_handle_ == cache_handle && !own_value_)) {
return;
}
Reset();
value_ = value;
cache_ = cache;
cache_handle_ = cache_handle;
assert(!own_value_);
}
private:
void ReleaseResource() {
if (LIKELY(cache_handle_ != nullptr)) {
assert(cache_ != nullptr);
cache_->Release(cache_handle_);
} else if (own_value_) {
delete value_;
}
}
void ResetFields() {
value_ = nullptr;
cache_ = nullptr;
cache_handle_ = nullptr;
own_value_ = false;
}
static void ReleaseCacheHandle(void* arg1, void* arg2) {
Cache* const cache = static_cast<Cache*>(arg1);
assert(cache);
Cache::Handle* const cache_handle = static_cast<Cache::Handle*>(arg2);
assert(cache_handle);
cache->Release(cache_handle);
}
static void DeleteValue(void* arg1, void* /* arg2 */) {
delete static_cast<T*>(arg1);
}
private:
T* value_ = nullptr;
Cache* cache_ = nullptr;
Cache::Handle* cache_handle_ = nullptr;
bool own_value_ = false;
};
} // namespace rocksdb

@ -176,24 +176,14 @@ bool PartitionedFilterBlockReader::KeyMayMatch(
if (UNLIKELY(filter_handle.size() == 0)) { // key is out of range
return false;
}
bool cached = false;
auto filter_partition =
GetFilterPartition(nullptr /* prefetch_buffer */, filter_handle, no_io,
&cached, prefix_extractor);
if (UNLIKELY(!filter_partition.value)) {
prefix_extractor);
if (UNLIKELY(!filter_partition.GetValue())) {
return true;
}
auto res = filter_partition.value->KeyMayMatch(key, prefix_extractor,
block_offset, no_io);
if (cached) {
return res;
}
if (LIKELY(filter_partition.IsSet())) {
filter_partition.Release(table_->rep_->table_options.block_cache.get());
} else {
delete filter_partition.value;
}
return res;
return filter_partition.GetValue()->KeyMayMatch(key, prefix_extractor,
block_offset, no_io);
}
bool PartitionedFilterBlockReader::PrefixMayMatch(
@ -215,24 +205,14 @@ bool PartitionedFilterBlockReader::PrefixMayMatch(
if (UNLIKELY(filter_handle.size() == 0)) { // prefix is out of range
return false;
}
bool cached = false;
auto filter_partition =
GetFilterPartition(nullptr /* prefetch_buffer */, filter_handle, no_io,
&cached, prefix_extractor);
if (UNLIKELY(!filter_partition.value)) {
prefix_extractor);
if (UNLIKELY(!filter_partition.GetValue())) {
return true;
}
auto res = filter_partition.value->PrefixMayMatch(prefix, prefix_extractor,
kNotValid, no_io);
if (cached) {
return res;
}
if (LIKELY(filter_partition.IsSet())) {
filter_partition.Release(table_->rep_->table_options.block_cache.get());
} else {
delete filter_partition.value;
}
return res;
return filter_partition.GetValue()->PrefixMayMatch(prefix, prefix_extractor,
kNotValid, no_io);
}
BlockHandle PartitionedFilterBlockReader::GetFilterPartitionHandle(
@ -251,10 +231,10 @@ BlockHandle PartitionedFilterBlockReader::GetFilterPartitionHandle(
return fltr_blk_handle;
}
BlockBasedTable::CachableEntry<FilterBlockReader>
CachableEntry<FilterBlockReader>
PartitionedFilterBlockReader::GetFilterPartition(
FilePrefetchBuffer* prefetch_buffer, BlockHandle& fltr_blk_handle,
const bool no_io, bool* cached, const SliceTransform* prefix_extractor) {
const bool no_io, const SliceTransform* prefix_extractor) {
const bool is_a_filter_partition = true;
auto block_cache = table_->rep_->table_options.block_cache.get();
if (LIKELY(block_cache != nullptr)) {
@ -267,9 +247,9 @@ PartitionedFilterBlockReader::GetFilterPartition(
RecordTick(statistics(), BLOCK_CACHE_FILTER_HIT);
RecordTick(statistics(), BLOCK_CACHE_HIT);
RecordTick(statistics(), BLOCK_CACHE_BYTES_READ,
block_cache->GetUsage(iter->second.cache_handle));
*cached = true;
return iter->second;
block_cache->GetUsage(iter->second.GetCacheHandle()));
return {iter->second.GetValue(), nullptr /* cache */,
nullptr /* cache_handle */, false /* own_value */};
}
}
return table_->GetFilter(/*prefetch_buffer*/ nullptr, fltr_blk_handle,
@ -278,7 +258,8 @@ PartitionedFilterBlockReader::GetFilterPartition(
} else {
auto filter = table_->ReadFilter(prefetch_buffer, fltr_blk_handle,
is_a_filter_partition, prefix_extractor);
return {filter, nullptr};
return {filter, nullptr /* cache */, nullptr /* cache_handle */,
true /* own_value */};
}
}
@ -293,18 +274,10 @@ size_t PartitionedFilterBlockReader::ApproximateMemoryUsage() const {
// TODO(myabandeh): better estimation for filter_map_ size
}
// Release the cached entry and decrement its ref count.
void ReleaseFilterCachedEntry(void* arg, void* h) {
Cache* cache = reinterpret_cast<Cache*>(arg);
Cache::Handle* handle = reinterpret_cast<Cache::Handle*>(h);
cache->Release(handle);
}
// TODO(myabandeh): merge this with the same function in IndexReader
void PartitionedFilterBlockReader::CacheDependencies(
bool pin, const SliceTransform* prefix_extractor) {
// Before read partitions, prefetch them to avoid lots of IOs
auto rep = table_->rep_;
IndexBlockIter biter;
Statistics* kNullStats = nullptr;
idx_on_fltr_blk_->NewIterator<IndexBlockIter>(
@ -330,7 +303,6 @@ void PartitionedFilterBlockReader::CacheDependencies(
// After prefetch, read the partitions one by one
biter.SeekToFirst();
Cache* block_cache = rep->table_options.block_cache.get();
for (; biter.Valid(); biter.Next()) {
handle = biter.value();
const bool no_io = true;
@ -338,16 +310,10 @@ void PartitionedFilterBlockReader::CacheDependencies(
auto filter = table_->GetFilter(
prefetch_buffer.get(), handle, is_a_filter_partition, !no_io,
/* get_context */ nullptr, prefix_extractor);
if (LIKELY(filter.IsSet())) {
if (LIKELY(filter.IsCached())) {
if (pin) {
filter_map_[handle.offset()] = std::move(filter);
RegisterCleanup(&ReleaseFilterCachedEntry, block_cache,
filter.cache_handle);
} else {
block_cache->Release(filter.cache_handle);
}
} else {
delete filter.value;
}
}
}

@ -15,6 +15,7 @@
#include "table/block.h"
#include "table/block_based_table_reader.h"
#include "table/cachable_entry.h"
#include "table/full_filter_block.h"
#include "table/index_builder.h"
#include "util/autovector.h"
@ -69,8 +70,7 @@ class PartitionedFilterBlockBuilder : public FullFilterBlockBuilder {
BlockHandle last_encoded_handle_;
};
class PartitionedFilterBlockReader : public FilterBlockReader,
public Cleanable {
class PartitionedFilterBlockReader : public FilterBlockReader {
public:
explicit PartitionedFilterBlockReader(
const SliceTransform* prefix_extractor, bool whole_key_filtering,
@ -93,10 +93,9 @@ class PartitionedFilterBlockReader : public FilterBlockReader,
private:
BlockHandle GetFilterPartitionHandle(const Slice& entry);
BlockBasedTable::CachableEntry<FilterBlockReader> GetFilterPartition(
CachableEntry<FilterBlockReader> GetFilterPartition(
FilePrefetchBuffer* prefetch_buffer, BlockHandle& handle,
const bool no_io, bool* cached,
const SliceTransform* prefix_extractor = nullptr);
const bool no_io, const SliceTransform* prefix_extractor = nullptr);
virtual void CacheDependencies(
bool bin, const SliceTransform* prefix_extractor) override;
@ -106,9 +105,7 @@ class PartitionedFilterBlockReader : public FilterBlockReader,
const BlockBasedTable* table_;
const bool index_key_includes_seq_;
const bool index_value_is_full_;
std::unordered_map<uint64_t,
BlockBasedTable::CachableEntry<FilterBlockReader>>
filter_map_;
std::unordered_map<uint64_t, CachableEntry<FilterBlockReader>> filter_map_;
};
} // namespace rocksdb

@ -35,7 +35,8 @@ class MockedBlockBasedTable : public BlockBasedTable {
auto obj = new FullFilterBlockReader(
prefix_extractor, true, BlockContents(slice),
rep_->table_options.filter_policy->GetFilterBitsReader(slice), nullptr);
return {obj, nullptr};
return {obj, nullptr /* cache */, nullptr /* cache_handle */,
true /* own_value */};
}
FilterBlockReader* ReadFilter(

Loading…
Cancel
Save