Refactor the handling of cache related counters and statistics (#5408)

Summary:
The patch cleans up the handling of cache hit/miss/insertion related
performance counters, get context counters, and statistics by
eliminating some code duplication and factoring out the affected logic
into separate methods. In addition, it makes the semantics of cache hit
metrics more consistent by changing the code so that accessing a
partition of partitioned indexes/filters through a pinned reference no
longer counts as a cache hit.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5408

Differential Revision: D15610883

Pulled By: ltamasi

fbshipit-source-id: ee749c18965077aca971d8f8bee8b24ed8fa76f1
Branch: main
Author: Levi Tamasi (committed by Facebook Github Bot)
Parent: aa71718ac3
Commit: bee2f48a66

Files changed:
  1. HISTORY.md (1)
  2. table/block_based/block_based_table_reader.cc (460)
  3. table/block_based/block_based_table_reader.h (58)
  4. table/block_based/block_type.h (24)
  5. table/block_based/partitioned_filter_block.cc (5)
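As an orientation for the diff below, the following sketch contrasts a typical call site before and after the refactoring. It is adapted from the ReadIndexBlock/RetrieveBlock change in this patch and is illustrative only, not an additional change: the boolean is_index flag is replaced by an explicit BlockType value, and the selection of the per-type tickers, perf counters, and get-context stats moves into the new UpdateCacheHitMetrics/UpdateCacheMissMetrics/UpdateCacheInsertionMetrics helpers.

  // Before: block-type-specific counters were selected via a bool flag
  // (with similar logic duplicated for filter and compression dict blocks).
  constexpr bool is_index = true;
  const Status s = table->RetrieveBlock(
      prefetch_buffer, read_options, rep->footer.index_handle(),
      UncompressionDict::GetEmptyDict(), index_block, is_index, get_context);

  // After: the caller names the block type explicitly; the shared helpers
  // update the matching BLOCK_CACHE_* tickers and get-context counters.
  const Status s = table->RetrieveBlock(
      prefetch_buffer, read_options, rep->footer.index_handle(),
      UncompressionDict::GetEmptyDict(), index_block, BlockType::kIndex,
      get_context);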

@@ -6,6 +6,7 @@
* Due to a refactoring, block cache eviction statistics for indexes are temporarily broken. We plan to reintroduce them in a later phase.
* options.keep_log_file_num will be enforced strictly all the time. File names of all log files will be tracked, which may take significantly amount of memory if options.keep_log_file_num is large and either of options.max_log_file_size or options.log_file_time_to_roll is set.
* Add initial support for Get/Put with user timestamps. Users can specify timestamps via ReadOptions and WriteOptions when calling DB::Get and DB::Put.
* Accessing a partition of a partitioned filter or index through a pinned reference is no longer considered a cache hit.
### New Features
* Add an option `snap_refresh_nanos` (default to 0.1s) to periodically refresh the snapshot list in compaction jobs. Assign to 0 to disable the feature.

@@ -230,10 +230,10 @@ Status BlockBasedTable::IndexReaderCommon::ReadIndexBlock(
const Rep* const rep = table->get_rep();
assert(rep != nullptr);
constexpr bool is_index = true;
const Status s = table->RetrieveBlock(
prefetch_buffer, read_options, rep->footer.index_handle(),
UncompressionDict::GetEmptyDict(), index_block, is_index, get_context);
UncompressionDict::GetEmptyDict(), index_block, BlockType::kIndex,
get_context);
return s;
}
@@ -244,9 +244,7 @@ Status BlockBasedTable::IndexReaderCommon::GetOrReadIndexBlock(
assert(index_block != nullptr);
if (!index_block_.IsEmpty()) {
*index_block =
CachableEntry<Block>(index_block_.GetValue(), nullptr /* cache */,
nullptr /* cache_handle */, false /* own_value */);
index_block->SetUnownedValue(index_block_.GetValue());
return Status::OK();
}
@@ -321,7 +319,6 @@ class PartitionIndexReader : public BlockBasedTable::IndexReaderCommon {
} else {
ReadOptions ro;
ro.fill_cache = read_options.fill_cache;
constexpr bool is_index = true;
// We don't return pinned data from index blocks, so no need
// to set `block_contents_pinned`.
it = new BlockBasedTableIterator<IndexBlockIter, BlockHandle>(
@@ -330,7 +327,7 @@ class PartitionIndexReader : public BlockBasedTable::IndexReaderCommon {
internal_comparator(), internal_comparator()->user_comparator(),
nullptr, kNullStats, true, index_key_includes_seq(),
index_value_is_full()),
false, true, /* prefix_extractor */ nullptr, is_index,
false, true, /* prefix_extractor */ nullptr, BlockType::kIndex,
index_key_includes_seq(), index_value_is_full());
}
@@ -399,12 +396,11 @@ class PartitionIndexReader : public BlockBasedTable::IndexReaderCommon {
for (; biter.Valid(); biter.Next()) {
handle = biter.value();
CachableEntry<Block> block;
const bool is_index = true;
// TODO: Support counter batch update for partitioned index and
// filter blocks
s = table()->MaybeReadBlockAndLoadToCache(
prefetch_buffer.get(), ro, handle, UncompressionDict::GetEmptyDict(),
&block, is_index, nullptr /* get_context */);
&block, BlockType::kIndex, nullptr /* get_context */);
assert(s.ok() || block.GetValue() == nullptr);
if (s.ok() && block.GetValue() != nullptr) {
@@ -662,44 +658,188 @@ class HashIndexReader : public BlockBasedTable::IndexReaderCommon {
std::unique_ptr<BlockPrefixIndex> prefix_index_;
};
void BlockBasedTable::UpdateCacheHitMetrics(BlockType block_type,
GetContext* get_context,
size_t usage) const {
Statistics* const statistics = rep_->ioptions.statistics;
PERF_COUNTER_ADD(block_cache_hit_count, 1);
PERF_COUNTER_BY_LEVEL_ADD(block_cache_hit_count, 1,
static_cast<uint32_t>(rep_->level));
if (get_context) {
++get_context->get_context_stats_.num_cache_hit;
get_context->get_context_stats_.num_cache_bytes_read += usage;
} else {
RecordTick(statistics, BLOCK_CACHE_HIT);
RecordTick(statistics, BLOCK_CACHE_BYTES_READ, usage);
}
switch (block_type) {
case BlockType::kFilter:
PERF_COUNTER_ADD(block_cache_filter_hit_count, 1);
if (get_context) {
++get_context->get_context_stats_.num_cache_filter_hit;
} else {
RecordTick(statistics, BLOCK_CACHE_FILTER_HIT);
}
break;
case BlockType::kCompressionDictionary:
// TODO: introduce perf counter for compression dictionary hit count
if (get_context) {
++get_context->get_context_stats_.num_cache_compression_dict_hit;
} else {
RecordTick(statistics, BLOCK_CACHE_COMPRESSION_DICT_HIT);
}
break;
case BlockType::kIndex:
PERF_COUNTER_ADD(block_cache_index_hit_count, 1);
if (get_context) {
++get_context->get_context_stats_.num_cache_index_hit;
} else {
RecordTick(statistics, BLOCK_CACHE_INDEX_HIT);
}
break;
default:
// TODO: introduce dedicated tickers/statistics/counters
// for range tombstones
if (get_context) {
++get_context->get_context_stats_.num_cache_data_hit;
} else {
RecordTick(statistics, BLOCK_CACHE_DATA_HIT);
}
break;
}
}
void BlockBasedTable::UpdateCacheMissMetrics(BlockType block_type,
GetContext* get_context) const {
Statistics* const statistics = rep_->ioptions.statistics;
// TODO: introduce aggregate (not per-level) block cache miss count
PERF_COUNTER_BY_LEVEL_ADD(block_cache_miss_count, 1,
static_cast<uint32_t>(rep_->level));
if (get_context) {
++get_context->get_context_stats_.num_cache_miss;
} else {
RecordTick(statistics, BLOCK_CACHE_MISS);
}
// TODO: introduce perf counters for misses per block type
switch (block_type) {
case BlockType::kFilter:
if (get_context) {
++get_context->get_context_stats_.num_cache_filter_miss;
} else {
RecordTick(statistics, BLOCK_CACHE_FILTER_MISS);
}
break;
case BlockType::kCompressionDictionary:
if (get_context) {
++get_context->get_context_stats_.num_cache_compression_dict_miss;
} else {
RecordTick(statistics, BLOCK_CACHE_COMPRESSION_DICT_MISS);
}
break;
case BlockType::kIndex:
if (get_context) {
++get_context->get_context_stats_.num_cache_index_miss;
} else {
RecordTick(statistics, BLOCK_CACHE_INDEX_MISS);
}
break;
default:
// TODO: introduce dedicated tickers/statistics/counters
// for range tombstones
if (get_context) {
++get_context->get_context_stats_.num_cache_data_miss;
} else {
RecordTick(statistics, BLOCK_CACHE_DATA_MISS);
}
break;
}
}
void BlockBasedTable::UpdateCacheInsertionMetrics(BlockType block_type,
GetContext* get_context,
size_t usage) const {
Statistics* const statistics = rep_->ioptions.statistics;
// TODO: introduce perf counters for block cache insertions
if (get_context) {
++get_context->get_context_stats_.num_cache_add;
get_context->get_context_stats_.num_cache_bytes_write += usage;
} else {
RecordTick(statistics, BLOCK_CACHE_ADD);
RecordTick(statistics, BLOCK_CACHE_BYTES_WRITE, usage);
}
switch (block_type) {
case BlockType::kFilter:
if (get_context) {
++get_context->get_context_stats_.num_cache_filter_add;
get_context->get_context_stats_.num_cache_filter_bytes_insert += usage;
} else {
RecordTick(statistics, BLOCK_CACHE_FILTER_ADD);
RecordTick(statistics, BLOCK_CACHE_FILTER_BYTES_INSERT, usage);
}
break;
case BlockType::kCompressionDictionary:
if (get_context) {
++get_context->get_context_stats_.num_cache_compression_dict_add;
get_context->get_context_stats_
.num_cache_compression_dict_bytes_insert += usage;
} else {
RecordTick(statistics, BLOCK_CACHE_COMPRESSION_DICT_ADD);
RecordTick(statistics, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT,
usage);
}
break;
case BlockType::kIndex:
if (get_context) {
++get_context->get_context_stats_.num_cache_index_add;
get_context->get_context_stats_.num_cache_index_bytes_insert += usage;
} else {
RecordTick(statistics, BLOCK_CACHE_INDEX_ADD);
RecordTick(statistics, BLOCK_CACHE_INDEX_BYTES_INSERT, usage);
}
break;
default:
// TODO: introduce dedicated tickers/statistics/counters
// for range tombstones
if (get_context) {
++get_context->get_context_stats_.num_cache_data_add;
get_context->get_context_stats_.num_cache_data_bytes_insert += usage;
} else {
RecordTick(statistics, BLOCK_CACHE_DATA_ADD);
RecordTick(statistics, BLOCK_CACHE_DATA_BYTES_INSERT, usage);
}
break;
}
}
Cache::Handle* BlockBasedTable::GetEntryFromCache(
Cache* block_cache, const Slice& key, Tickers block_cache_miss_ticker,
Tickers block_cache_hit_ticker, uint64_t* block_cache_miss_stats,
uint64_t* block_cache_hit_stats, Statistics* statistics,
Cache* block_cache, const Slice& key, BlockType block_type,
GetContext* get_context) const {
auto cache_handle = block_cache->Lookup(key, statistics);
auto cache_handle = block_cache->Lookup(key, rep_->ioptions.statistics);
if (cache_handle != nullptr) {
PERF_COUNTER_ADD(block_cache_hit_count, 1);
PERF_COUNTER_BY_LEVEL_ADD(block_cache_hit_count, 1,
static_cast<uint32_t>(rep_->level));
if (get_context != nullptr) {
// overall cache hit
get_context->get_context_stats_.num_cache_hit++;
// total bytes read from cache
get_context->get_context_stats_.num_cache_bytes_read +=
block_cache->GetUsage(cache_handle);
// block-type specific cache hit
(*block_cache_hit_stats)++;
} else {
// overall cache hit
RecordTick(statistics, BLOCK_CACHE_HIT);
// total bytes read from cache
RecordTick(statistics, BLOCK_CACHE_BYTES_READ,
block_cache->GetUsage(cache_handle));
RecordTick(statistics, block_cache_hit_ticker);
}
UpdateCacheHitMetrics(block_type, get_context,
block_cache->GetUsage(cache_handle));
} else {
PERF_COUNTER_BY_LEVEL_ADD(block_cache_miss_count, 1,
static_cast<uint32_t>(rep_->level));
if (get_context != nullptr) {
// overall cache miss
get_context->get_context_stats_.num_cache_miss++;
// block-type specific cache miss
(*block_cache_miss_stats)++;
} else {
RecordTick(statistics, BLOCK_CACHE_MISS);
RecordTick(statistics, block_cache_miss_ticker);
}
UpdateCacheMissMetrics(block_type, get_context);
}
return cache_handle;
@@ -1170,7 +1310,7 @@ Status BlockBasedTable::ReadRangeDelBlock(
ReadOptions read_options;
std::unique_ptr<InternalIterator> iter(NewDataBlockIterator<DataBlockIter>(
read_options, range_del_handle, nullptr /* input_iter */,
false /* is_index */, true /* key_includes_seq */,
BlockType::kRangeDeletion, true /* key_includes_seq */,
true /* index_key_is_full */, nullptr /* get_context */, Status(),
prefetch_buffer));
assert(iter != nullptr);
@@ -1433,38 +1573,24 @@ Status BlockBasedTable::GetDataBlockFromCache(
const Slice& block_cache_key, const Slice& compressed_block_cache_key,
Cache* block_cache, Cache* block_cache_compressed,
const ReadOptions& read_options, CachableEntry<Block>* block,
const UncompressionDict& uncompression_dict, bool is_index,
const UncompressionDict& uncompression_dict, BlockType block_type,
GetContext* get_context) const {
const size_t read_amp_bytes_per_bit =
!is_index ? rep_->table_options.read_amp_bytes_per_bit : 0;
block_type == BlockType::kData
? rep_->table_options.read_amp_bytes_per_bit
: 0;
assert(block);
assert(block->IsEmpty());
Status s;
BlockContents* compressed_block = nullptr;
Cache::Handle* block_cache_compressed_handle = nullptr;
Statistics* statistics = rep_->ioptions.statistics;
// Lookup uncompressed cache first
if (block_cache != nullptr) {
auto cache_handle = GetEntryFromCache(
block_cache, block_cache_key,
is_index ? BLOCK_CACHE_INDEX_MISS : BLOCK_CACHE_DATA_MISS,
is_index ? BLOCK_CACHE_INDEX_HIT : BLOCK_CACHE_DATA_HIT,
get_context
? (is_index ? &get_context->get_context_stats_.num_cache_index_miss
: &get_context->get_context_stats_.num_cache_data_miss)
: nullptr,
get_context
? (is_index ? &get_context->get_context_stats_.num_cache_index_hit
: &get_context->get_context_stats_.num_cache_data_hit)
: nullptr,
statistics, get_context);
auto cache_handle = GetEntryFromCache(block_cache, block_cache_key,
block_type, get_context);
if (cache_handle != nullptr) {
if (is_index) {
PERF_COUNTER_ADD(block_cache_index_hit_count, 1);
}
block->SetCachedValue(
reinterpret_cast<Block*>(block_cache->Value(cache_handle)),
block_cache, cache_handle);
@@ -1482,6 +1608,9 @@ Status BlockBasedTable::GetDataBlockFromCache(
assert(!compressed_block_cache_key.empty());
block_cache_compressed_handle =
block_cache_compressed->Lookup(compressed_block_cache_key);
Statistics* statistics = rep_->ioptions.statistics;
// if we found in the compressed cache, then uncompress and insert into
// uncompressed cache
if (block_cache_compressed_handle == nullptr) {
@@ -1508,7 +1637,7 @@ Status BlockBasedTable::GetDataBlockFromCache(
// Insert uncompressed block into block cache
if (s.ok()) {
std::unique_ptr<Block> block_holder(
new Block(std::move(contents), rep_->get_global_seqno(is_index),
new Block(std::move(contents), rep_->get_global_seqno(block_type),
read_amp_bytes_per_bit, statistics)); // uncompressed block
if (block_cache != nullptr && block_holder->own_bytes() &&
@@ -1526,32 +1655,7 @@ Status BlockBasedTable::GetDataBlockFromCache(
block->SetCachedValue(block_holder.release(), block_cache,
cache_handle);
if (get_context != nullptr) {
get_context->get_context_stats_.num_cache_add++;
get_context->get_context_stats_.num_cache_bytes_write += charge;
} else {
RecordTick(statistics, BLOCK_CACHE_ADD);
RecordTick(statistics, BLOCK_CACHE_BYTES_WRITE, charge);
}
if (is_index) {
if (get_context != nullptr) {
get_context->get_context_stats_.num_cache_index_add++;
get_context->get_context_stats_.num_cache_index_bytes_insert +=
charge;
} else {
RecordTick(statistics, BLOCK_CACHE_INDEX_ADD);
RecordTick(statistics, BLOCK_CACHE_INDEX_BYTES_INSERT, charge);
}
} else {
if (get_context != nullptr) {
get_context->get_context_stats_.num_cache_data_add++;
get_context->get_context_stats_.num_cache_data_bytes_insert +=
charge;
} else {
RecordTick(statistics, BLOCK_CACHE_DATA_ADD);
RecordTick(statistics, BLOCK_CACHE_DATA_BYTES_INSERT, charge);
}
}
UpdateCacheInsertionMetrics(block_type, get_context, charge);
} else {
RecordTick(statistics, BLOCK_CACHE_ADD_FAILURES);
}
@@ -1571,15 +1675,19 @@ Status BlockBasedTable::PutDataBlockToCache(
CachableEntry<Block>* cached_block, BlockContents* raw_block_contents,
CompressionType raw_block_comp_type,
const UncompressionDict& uncompression_dict, SequenceNumber seq_no,
MemoryAllocator* memory_allocator, bool is_index,
MemoryAllocator* memory_allocator, BlockType block_type,
GetContext* get_context) const {
const ImmutableCFOptions& ioptions = rep_->ioptions;
const uint32_t format_version = rep_->table_options.format_version;
const size_t read_amp_bytes_per_bit =
!is_index ? rep_->table_options.read_amp_bytes_per_bit : 0;
block_type == BlockType::kData
? rep_->table_options.read_amp_bytes_per_bit
: 0;
const Cache::Priority priority =
is_index && rep_->table_options
.cache_index_and_filter_blocks_with_high_priority
rep_->table_options.cache_index_and_filter_blocks_with_high_priority &&
(block_type == BlockType::kFilter ||
block_type == BlockType::kCompressionDictionary ||
block_type == BlockType::kIndex)
? Cache::Priority::HIGH
: Cache::Priority::LOW;
assert(cached_block);
@@ -1652,33 +1760,7 @@ Status BlockBasedTable::PutDataBlockToCache(
cached_block->SetCachedValue(block_holder.release(), block_cache,
cache_handle);
if (get_context != nullptr) {
get_context->get_context_stats_.num_cache_add++;
get_context->get_context_stats_.num_cache_bytes_write += charge;
} else {
RecordTick(statistics, BLOCK_CACHE_ADD);
RecordTick(statistics, BLOCK_CACHE_BYTES_WRITE, charge);
}
if (is_index) {
if (get_context != nullptr) {
get_context->get_context_stats_.num_cache_index_add++;
get_context->get_context_stats_.num_cache_index_bytes_insert +=
charge;
} else {
RecordTick(statistics, BLOCK_CACHE_INDEX_ADD);
RecordTick(statistics, BLOCK_CACHE_INDEX_BYTES_INSERT, charge);
}
} else {
if (get_context != nullptr) {
get_context->get_context_stats_.num_cache_data_add++;
get_context->get_context_stats_.num_cache_data_bytes_insert += charge;
} else {
RecordTick(statistics, BLOCK_CACHE_DATA_ADD);
RecordTick(statistics, BLOCK_CACHE_DATA_BYTES_INSERT, charge);
}
}
assert(reinterpret_cast<Block*>(block_cache->Value(
cached_block->GetCacheHandle())) == cached_block->GetValue());
UpdateCacheInsertionMetrics(block_type, get_context, charge);
} else {
RecordTick(statistics, BLOCK_CACHE_ADD_FAILURES);
}
@@ -1798,18 +1880,11 @@ CachableEntry<FilterBlockReader> BlockBasedTable::GetFilter(
auto key = GetCacheKey(rep_->cache_key_prefix, rep_->cache_key_prefix_size,
filter_blk_handle, cache_key);
Statistics* statistics = rep_->ioptions.statistics;
Cache::Handle* cache_handle = GetEntryFromCache(
block_cache, key, BLOCK_CACHE_FILTER_MISS, BLOCK_CACHE_FILTER_HIT,
get_context ? &get_context->get_context_stats_.num_cache_filter_miss
: nullptr,
get_context ? &get_context->get_context_stats_.num_cache_filter_hit
: nullptr,
statistics, get_context);
Cache::Handle* cache_handle =
GetEntryFromCache(block_cache, key, BlockType::kFilter, get_context);
FilterBlockReader* filter = nullptr;
if (cache_handle != nullptr) {
PERF_COUNTER_ADD(block_cache_filter_hit_count, 1);
filter =
reinterpret_cast<FilterBlockReader*>(block_cache->Value(cache_handle));
} else if (no_io) {
@@ -1827,20 +1902,9 @@ CachableEntry<FilterBlockReader> BlockBasedTable::GetFilter(
: Cache::Priority::LOW);
if (s.ok()) {
PERF_COUNTER_ADD(filter_block_read_count, 1);
if (get_context != nullptr) {
get_context->get_context_stats_.num_cache_add++;
get_context->get_context_stats_.num_cache_bytes_write += usage;
get_context->get_context_stats_.num_cache_filter_add++;
get_context->get_context_stats_.num_cache_filter_bytes_insert +=
usage;
} else {
RecordTick(statistics, BLOCK_CACHE_ADD);
RecordTick(statistics, BLOCK_CACHE_BYTES_WRITE, usage);
RecordTick(statistics, BLOCK_CACHE_FILTER_ADD);
RecordTick(statistics, BLOCK_CACHE_FILTER_BYTES_INSERT, usage);
}
UpdateCacheInsertionMetrics(BlockType::kFilter, get_context, usage);
} else {
RecordTick(statistics, BLOCK_CACHE_ADD_FAILURES);
RecordTick(rep_->ioptions.statistics, BLOCK_CACHE_ADD_FAILURES);
delete filter;
return CachableEntry<FilterBlockReader>();
}
@@ -1867,16 +1931,9 @@ CachableEntry<UncompressionDict> BlockBasedTable::GetUncompressionDict(
auto cache_key =
GetCacheKey(rep_->cache_key_prefix, rep_->cache_key_prefix_size,
rep_->compression_dict_handle, cache_key_buf);
auto cache_handle = GetEntryFromCache(
rep_->table_options.block_cache.get(), cache_key,
BLOCK_CACHE_COMPRESSION_DICT_MISS, BLOCK_CACHE_COMPRESSION_DICT_HIT,
get_context
? &get_context->get_context_stats_.num_cache_compression_dict_miss
: nullptr,
get_context
? &get_context->get_context_stats_.num_cache_compression_dict_hit
: nullptr,
rep_->ioptions.statistics, get_context);
auto cache_handle =
GetEntryFromCache(rep_->table_options.block_cache.get(), cache_key,
BlockType::kCompressionDictionary, get_context);
UncompressionDict* dict = nullptr;
if (cache_handle != nullptr) {
dict = reinterpret_cast<UncompressionDict*>(
@@ -1887,43 +1944,31 @@ CachableEntry<UncompressionDict> BlockBasedTable::GetUncompressionDict(
std::unique_ptr<const BlockContents> compression_dict_block;
Status s =
ReadCompressionDictBlock(prefetch_buffer, &compression_dict_block);
size_t usage = 0;
if (s.ok()) {
assert(compression_dict_block != nullptr);
// TODO(ajkr): find a way to avoid the `compression_dict_block` data copy
dict = new UncompressionDict(compression_dict_block->data.ToString(),
rep_->blocks_definitely_zstd_compressed,
rep_->ioptions.statistics);
usage = dict->ApproximateMemoryUsage();
std::unique_ptr<UncompressionDict> uncompression_dict(
new UncompressionDict(compression_dict_block->data.ToString(),
rep_->blocks_definitely_zstd_compressed,
rep_->ioptions.statistics));
const size_t usage = uncompression_dict->ApproximateMemoryUsage();
s = rep_->table_options.block_cache->Insert(
cache_key, dict, usage, &DeleteCachedUncompressionDictEntry,
&cache_handle,
cache_key, uncompression_dict.get(), usage,
&DeleteCachedUncompressionDictEntry, &cache_handle,
rep_->table_options.cache_index_and_filter_blocks_with_high_priority
? Cache::Priority::HIGH
: Cache::Priority::LOW);
}
if (s.ok()) {
PERF_COUNTER_ADD(compression_dict_block_read_count, 1);
if (get_context != nullptr) {
get_context->get_context_stats_.num_cache_add++;
get_context->get_context_stats_.num_cache_bytes_write += usage;
get_context->get_context_stats_.num_cache_compression_dict_add++;
get_context->get_context_stats_
.num_cache_compression_dict_bytes_insert += usage;
if (s.ok()) {
PERF_COUNTER_ADD(compression_dict_block_read_count, 1);
UpdateCacheInsertionMetrics(BlockType::kCompressionDictionary,
get_context, usage);
dict = uncompression_dict.release();
} else {
RecordTick(rep_->ioptions.statistics, BLOCK_CACHE_ADD);
RecordTick(rep_->ioptions.statistics, BLOCK_CACHE_BYTES_WRITE, usage);
RecordTick(rep_->ioptions.statistics, BLOCK_CACHE_COMPRESSION_DICT_ADD);
RecordTick(rep_->ioptions.statistics,
BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT, usage);
RecordTick(rep_->ioptions.statistics, BLOCK_CACHE_ADD_FAILURES);
assert(dict == nullptr);
assert(cache_handle == nullptr);
}
} else {
// There should be no way to get here if block cache insertion succeeded.
// Though it is still possible something failed earlier.
RecordTick(rep_->ioptions.statistics, BLOCK_CACHE_ADD_FAILURES);
delete dict;
dict = nullptr;
assert(cache_handle == nullptr);
}
}
return {dict, cache_handle ? rep_->table_options.block_cache.get() : nullptr,
@@ -1951,7 +1996,7 @@ InternalIteratorBase<BlockHandle>* BlockBasedTable::NewIndexIterator(
template <typename TBlockIter>
TBlockIter* BlockBasedTable::NewDataBlockIterator(
const ReadOptions& ro, const BlockHandle& handle, TBlockIter* input_iter,
bool is_index, bool key_includes_seq, bool index_key_is_full,
BlockType block_type, bool key_includes_seq, bool index_key_is_full,
GetContext* get_context, Status s,
FilePrefetchBuffer* prefetch_buffer) const {
PERF_TIMER_GUARD(new_table_block_iter_nanos);
@@ -1972,7 +2017,7 @@ TBlockIter* BlockBasedTable::NewDataBlockIterator(
CachableEntry<Block> block;
s = RetrieveBlock(prefetch_buffer, ro, handle, uncompression_dict, &block,
is_index, get_context);
block_type, get_context);
if (!s.ok()) {
assert(block.IsEmpty());
@@ -2037,7 +2082,7 @@ TBlockIter* BlockBasedTable::NewDataBlockIterator(
Status BlockBasedTable::MaybeReadBlockAndLoadToCache(
FilePrefetchBuffer* prefetch_buffer, const ReadOptions& ro,
const BlockHandle& handle, const UncompressionDict& uncompression_dict,
CachableEntry<Block>* block_entry, bool is_index,
CachableEntry<Block>* block_entry, BlockType block_type,
GetContext* get_context) const {
assert(block_entry != nullptr);
const bool no_io = (ro.read_tier == kBlockCacheTier);
@@ -2070,7 +2115,7 @@ Status BlockBasedTable::MaybeReadBlockAndLoadToCache(
}
s = GetDataBlockFromCache(key, ckey, block_cache, block_cache_compressed,
ro, block_entry, uncompression_dict, is_index,
ro, block_entry, uncompression_dict, block_type,
get_context);
// Can't find the block from the cache. If I/O is allowed, read from the
@@ -2095,14 +2140,14 @@ Status BlockBasedTable::MaybeReadBlockAndLoadToCache(
}
if (s.ok()) {
SequenceNumber seq_no = rep_->get_global_seqno(is_index);
SequenceNumber seq_no = rep_->get_global_seqno(block_type);
// If filling cache is allowed and a cache is configured, try to put the
// block to the cache.
s = PutDataBlockToCache(key, ckey, block_cache, block_cache_compressed,
block_entry, &raw_block_contents,
raw_block_comp_type, uncompression_dict, seq_no,
GetMemoryAllocator(rep_->table_options),
is_index, get_context);
block_type, get_context);
}
}
}
@@ -2113,16 +2158,19 @@ Status BlockBasedTable::MaybeReadBlockAndLoadToCache(
Status BlockBasedTable::RetrieveBlock(
FilePrefetchBuffer* prefetch_buffer, const ReadOptions& ro,
const BlockHandle& handle, const UncompressionDict& uncompression_dict,
CachableEntry<Block>* block_entry, bool is_index,
CachableEntry<Block>* block_entry, BlockType block_type,
GetContext* get_context) const {
assert(block_entry);
assert(block_entry->IsEmpty());
Status s;
if (!is_index || rep_->table_options.cache_index_and_filter_blocks) {
if (rep_->table_options.cache_index_and_filter_blocks ||
(block_type != BlockType::kFilter &&
block_type != BlockType::kCompressionDictionary &&
block_type != BlockType::kIndex)) {
s = MaybeReadBlockAndLoadToCache(prefetch_buffer, ro, handle,
uncompression_dict, block_entry, is_index,
get_context);
uncompression_dict, block_entry,
block_type, get_context);
if (!s.ok()) {
return s;
@@ -2150,8 +2198,10 @@ Status BlockBasedTable::RetrieveBlock(
rep_->file.get(), prefetch_buffer, rep_->footer, ro, handle, &block,
rep_->ioptions, rep_->blocks_maybe_compressed,
rep_->blocks_maybe_compressed, uncompression_dict,
rep_->persistent_cache_options, rep_->get_global_seqno(is_index),
!is_index ? rep_->table_options.read_amp_bytes_per_bit : 0,
rep_->persistent_cache_options, rep_->get_global_seqno(block_type),
block_type == BlockType::kData
? rep_->table_options.read_amp_bytes_per_bit
: 0,
GetMemoryAllocator(rep_->table_options));
}
@@ -2178,18 +2228,13 @@ InternalIteratorBase<BlockHandle>*
BlockBasedTable::PartitionedIndexIteratorState::NewSecondaryIterator(
const BlockHandle& handle) {
// Return a block iterator on the index partition
auto rep = table_->get_rep();
auto block = block_map_->find(handle.offset());
// This is a possible scenario since block cache might not have had space
// for the partition
if (block != block_map_->end()) {
PERF_COUNTER_ADD(block_cache_hit_count, 1);
RecordTick(rep->ioptions.statistics, BLOCK_CACHE_INDEX_HIT);
RecordTick(rep->ioptions.statistics, BLOCK_CACHE_HIT);
Cache* block_cache = rep->table_options.block_cache.get();
assert(block_cache);
RecordTick(rep->ioptions.statistics, BLOCK_CACHE_BYTES_READ,
block_cache->GetUsage(block->second.GetCacheHandle()));
auto rep = table_->get_rep();
assert(rep);
Statistics* kNullStats = nullptr;
// We don't return pinned data from index blocks, so no need
// to set `block_contents_pinned`.
@@ -2531,7 +2576,7 @@ void BlockBasedTableIterator<TBlockIter, TValue>::InitDataBlock() {
Status s;
table_->NewDataBlockIterator<TBlockIter>(
read_options_, data_block_handle, &block_iter_, is_index_,
read_options_, data_block_handle, &block_iter_, block_type_,
key_includes_seq_, index_key_is_full_,
/* get_context */ nullptr, s, prefetch_buffer_.get());
block_iter_points_to_real_block_ = true;
@@ -2623,7 +2668,6 @@ InternalIterator* BlockBasedTable::NewIterator(
Arena* arena, bool skip_filters, bool for_compaction) {
bool need_upper_bound_check =
PrefixExtractorChanged(rep_->table_properties.get(), prefix_extractor);
const bool kIsNotIndex = false;
if (arena == nullptr) {
return new BlockBasedTableIterator<DataBlockIter>(
this, read_options, rep_->internal_comparator,
@@ -2633,7 +2677,7 @@ InternalIterator* BlockBasedTable::NewIterator(
rep_->index_type == BlockBasedTableOptions::kHashSearch),
!skip_filters && !read_options.total_order_seek &&
prefix_extractor != nullptr,
need_upper_bound_check, prefix_extractor, kIsNotIndex,
need_upper_bound_check, prefix_extractor, BlockType::kData,
true /*key_includes_seq*/, true /*index_key_is_full*/, for_compaction);
} else {
auto* mem =
@@ -2643,7 +2687,7 @@ InternalIterator* BlockBasedTable::NewIterator(
NewIndexIterator(read_options, need_upper_bound_check),
!skip_filters && !read_options.total_order_seek &&
prefix_extractor != nullptr,
need_upper_bound_check, prefix_extractor, kIsNotIndex,
need_upper_bound_check, prefix_extractor, BlockType::kData,
true /*key_includes_seq*/, true /*index_key_is_full*/, for_compaction);
}
}
@@ -2780,7 +2824,7 @@ Status BlockBasedTable::Get(const ReadOptions& read_options, const Slice& key,
} else {
DataBlockIter biter;
NewDataBlockIterator<DataBlockIter>(
read_options, iiter->value(), &biter, false,
read_options, iiter->value(), &biter, BlockType::kData,
true /* key_includes_seq */, true /* index_key_is_full */,
get_context);
@@ -2893,7 +2937,7 @@ void BlockBasedTable::MultiGet(const ReadOptions& read_options,
for (iiter->Seek(key); iiter->Valid() && !done; iiter->Next()) {
DataBlockIter biter;
NewDataBlockIterator<DataBlockIter>(
read_options, iiter->value(), &biter, false,
read_options, iiter->value(), &biter, BlockType::kData,
true /* key_includes_seq */, get_context);
if (read_options.read_tier == kBlockCacheTier &&

@@ -25,6 +25,7 @@
#include "rocksdb/table.h"
#include "table/block_based/block.h"
#include "table/block_based/block_based_table_factory.h"
#include "table/block_based/block_type.h"
#include "table/block_based/cachable_entry.h"
#include "table/block_based/filter_block.h"
#include "table/format.h"
@@ -220,8 +221,8 @@ class BlockBasedTable : public TableReader {
// input_iter: if it is not null, update this one and return it as Iterator
template <typename TBlockIter>
TBlockIter* NewDataBlockIterator(
const ReadOptions& ro, const BlockHandle& block_hanlde,
TBlockIter* input_iter = nullptr, bool is_index = false,
const ReadOptions& ro, const BlockHandle& block_handle,
TBlockIter* input_iter = nullptr, BlockType block_type = BlockType::kData,
bool key_includes_seq = true, bool index_key_is_full = true,
GetContext* get_context = nullptr, Status s = Status(),
FilePrefetchBuffer* prefetch_buffer = nullptr) const;
@@ -238,12 +239,14 @@ class BlockBasedTable : public TableReader {
friend class MockedBlockBasedTable;
static std::atomic<uint64_t> next_cache_key_id_;
void UpdateCacheHitMetrics(BlockType block_type, GetContext* get_context,
size_t usage) const;
void UpdateCacheMissMetrics(BlockType block_type,
GetContext* get_context) const;
void UpdateCacheInsertionMetrics(BlockType block_type,
GetContext* get_context, size_t usage) const;
Cache::Handle* GetEntryFromCache(Cache* block_cache, const Slice& key,
Tickers block_cache_miss_ticker,
Tickers block_cache_hit_ticker,
uint64_t* block_cache_miss_stats,
uint64_t* block_cache_hit_stats,
Statistics* statistics,
BlockType block_type,
GetContext* get_context) const;
// If block cache enabled (compressed or uncompressed), looks for the block
@@ -258,7 +261,7 @@ class BlockBasedTable : public TableReader {
Status MaybeReadBlockAndLoadToCache(
FilePrefetchBuffer* prefetch_buffer, const ReadOptions& ro,
const BlockHandle& handle, const UncompressionDict& uncompression_dict,
CachableEntry<Block>* block_entry, bool is_index = false,
CachableEntry<Block>* block_entry, BlockType block_type,
GetContext* get_context = nullptr) const;
// Similar to the above, with one crucial difference: it will retrieve the
@@ -267,7 +270,7 @@ class BlockBasedTable : public TableReader {
Status RetrieveBlock(FilePrefetchBuffer* prefetch_buffer,
const ReadOptions& ro, const BlockHandle& handle,
const UncompressionDict& uncompression_dict,
CachableEntry<Block>* block_entry, bool is_index,
CachableEntry<Block>* block_entry, BlockType block_type,
GetContext* get_context) const;
// For the following two functions:
@@ -311,7 +314,7 @@ class BlockBasedTable : public TableReader {
const Slice& block_cache_key, const Slice& compressed_block_cache_key,
Cache* block_cache, Cache* block_cache_compressed,
const ReadOptions& read_options, CachableEntry<Block>* block,
const UncompressionDict& uncompression_dict, bool is_index = false,
const UncompressionDict& uncompression_dict, BlockType block_type,
GetContext* get_context = nullptr) const;
// Put a raw block (maybe compressed) to the corresponding block caches.
@@ -324,16 +327,14 @@ class BlockBasedTable : public TableReader {
// PutDataBlockToCache(). After the call, the object will be invalid.
// @param uncompression_dict Data for presetting the compression library's
// dictionary.
Status PutDataBlockToCache(const Slice& block_cache_key,
const Slice& compressed_block_cache_key,
Cache* block_cache, Cache* block_cache_compressed,
CachableEntry<Block>* cached_block,
BlockContents* raw_block_contents,
CompressionType raw_block_comp_type,
const UncompressionDict& uncompression_dict,
SequenceNumber seq_no,
MemoryAllocator* memory_allocator, bool is_index,
GetContext* get_context) const;
Status PutDataBlockToCache(
const Slice& block_cache_key, const Slice& compressed_block_cache_key,
Cache* block_cache, Cache* block_cache_compressed,
CachableEntry<Block>* cached_block, BlockContents* raw_block_contents,
CompressionType raw_block_comp_type,
const UncompressionDict& uncompression_dict, SequenceNumber seq_no,
MemoryAllocator* memory_allocator, BlockType block_type,
GetContext* get_context) const;
// Calls (*handle_result)(arg, ...) repeatedly, starting with the entry found
// after a call to Seek(key), until handle_result returns false.
@@ -546,8 +547,12 @@ struct BlockBasedTable::Rep {
bool closed = false;
const bool immortal_table;
SequenceNumber get_global_seqno(bool is_index) const {
return is_index ? kDisableGlobalSequenceNumber : global_seqno;
SequenceNumber get_global_seqno(BlockType block_type) const {
return (block_type == BlockType::kFilter ||
block_type == BlockType::kIndex ||
block_type == BlockType::kCompressionDictionary)
? kDisableGlobalSequenceNumber
: global_seqno;
}
};
@@ -560,8 +565,8 @@ class BlockBasedTableIterator : public InternalIteratorBase<TValue> {
const InternalKeyComparator& icomp,
InternalIteratorBase<BlockHandle>* index_iter,
bool check_filter, bool need_upper_bound_check,
const SliceTransform* prefix_extractor, bool is_index,
bool key_includes_seq = true,
const SliceTransform* prefix_extractor,
BlockType block_type, bool key_includes_seq = true,
bool index_key_is_full = true,
bool for_compaction = false)
: InternalIteratorBase<TValue>(false),
@@ -575,7 +580,7 @@ class BlockBasedTableIterator : public InternalIteratorBase<TValue> {
check_filter_(check_filter),
need_upper_bound_check_(need_upper_bound_check),
prefix_extractor_(prefix_extractor),
is_index_(is_index),
block_type_(block_type),
key_includes_seq_(key_includes_seq),
index_key_is_full_(index_key_is_full),
for_compaction_(for_compaction) {}
@@ -690,8 +695,7 @@ class BlockBasedTableIterator : public InternalIteratorBase<TValue> {
// TODO(Zhongyi): pick a better name
bool need_upper_bound_check_;
const SliceTransform* prefix_extractor_;
// If the blocks over which we iterate are index blocks
bool is_index_;
BlockType block_type_;
// If the keys in the blocks over which we iterate include 8 byte sequence
bool key_includes_seq_;
bool index_key_is_full_;

@@ -0,0 +1,24 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#pragma once
namespace rocksdb {
// Represents the types of blocks used in the block based table format.
// See https://github.com/facebook/rocksdb/wiki/Rocksdb-BlockBasedTable-Format
// for details.
enum class BlockType : uint8_t {
kData,
kFilter,
kProperties,
kCompressionDictionary,
kRangeDeletion,
kMetaIndex,
kIndex,
};
} // namespace rocksdb
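A minimal usage sketch (the IsMetaBlockType helper below is hypothetical and not part of this patch): callers now tag each block access with a BlockType, so shared code such as Rep::get_global_seqno() and the cache-priority logic in PutDataBlockToCache() can branch on the type instead of on an is_index boolean.

  // Hypothetical convenience helper, shown only to illustrate the enum:
  // filter, index, and compression dictionary blocks are the metadata blocks
  // that this patch treats uniformly (global seqno disabled; cached with high
  // priority when cache_index_and_filter_blocks_with_high_priority is set).
  inline bool IsMetaBlockType(BlockType block_type) {
    return block_type == BlockType::kFilter ||
           block_type == BlockType::kIndex ||
           block_type == BlockType::kCompressionDictionary;
  }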

@@ -243,11 +243,6 @@ PartitionedFilterBlockReader::GetFilterPartition(
// This is a possible scenario since block cache might not have had space
// for the partition
if (iter != filter_map_.end()) {
PERF_COUNTER_ADD(block_cache_hit_count, 1);
RecordTick(statistics(), BLOCK_CACHE_FILTER_HIT);
RecordTick(statistics(), BLOCK_CACHE_HIT);
RecordTick(statistics(), BLOCK_CACHE_BYTES_READ,
block_cache->GetUsage(iter->second.GetCacheHandle()));
return {iter->second.GetValue(), nullptr /* cache */,
nullptr /* cache_handle */, false /* own_value */};
}
