Reduce heavy hitters for the Get operation

Summary:
This PR addresses the following heavy hitters in the `Get` operation by moving calls to `StatisticsImpl::recordTick` from `BlockBasedTable` to `Version::Get`:

- rocksdb.block.cache.bytes.write
- rocksdb.block.cache.add
- rocksdb.block.cache.data.miss
- rocksdb.block.cache.data.bytes.insert
- rocksdb.block.cache.data.add
- rocksdb.block.cache.hit
- rocksdb.block.cache.data.hit
- rocksdb.block.cache.bytes.read

The `db_bench` statistics before and after the change are:

|1GB block read|Children      |Self  |Command          |Shared Object        |Symbol|
|---|---|---|---|---|---|
|master:     |4.22%     |1.31%  |db_bench  |db_bench  |[.] rocksdb::StatisticsImpl::recordTick|
|updated:    |0.51%     |0.21%  |db_bench  |db_bench  |[.] rocksdb::StatisticsImpl::recordTick|
| |0.14% |0.14% |db_bench |db_bench |[.] rocksdb::GetContext::record_counters|

|1MB block read|Children      |Self  |Command          |Shared Object        |Symbol|
|---|---|---|---|---|---|
|master:    |3.48%     |1.08%  |db_bench  |db_bench  |[.] rocksdb::StatisticsImpl::recordTick|
|updated:    |0.80%     |0.31%  |db_bench  |db_bench  |[.] rocksdb::StatisticsImpl::recordTick|
| |0.35% |0.35% |db_bench |db_bench |[.] rocksdb::GetContext::record_counters|
Closes https://github.com/facebook/rocksdb/pull/3172

Differential Revision: D6330532

Pulled By: miasantreble

fbshipit-source-id: 2b492959e00a3db29e9437ecdcc5e48ca4ec5741
main
Zhongyi Xie 7 years ago committed by Facebook Github Bot
parent 9089373a01
commit 51c2ea0feb
  1. 20
      db/version_set.cc
  2. 209
      table/block_based_table_reader.cc
  3. 34
      table/block_based_table_reader.h
  4. 7
      table/get_context.cc
  5. 4
      table/get_context.h
  6. 6
      table/partitioned_filter_block.cc
  7. 3
      table/partitioned_filter_block_test.cc

@ -980,10 +980,12 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k,
storage_info_.num_non_empty_levels_, &storage_info_.file_indexer_, storage_info_.num_non_empty_levels_, &storage_info_.file_indexer_,
user_comparator(), internal_comparator()); user_comparator(), internal_comparator());
FdWithKeyRange* f = fp.GetNextFile(); FdWithKeyRange* f = fp.GetNextFile();
while (f != nullptr) { while (f != nullptr) {
if (get_context.sample()) { if (get_context.sample()) {
sample_file_read_inc(f->file_metadata); sample_file_read_inc(f->file_metadata);
} }
*status = table_cache_->Get( *status = table_cache_->Get(
read_options, *internal_comparator(), f->fd, ikey, &get_context, read_options, *internal_comparator(), f->fd, ikey, &get_context,
cfd_->internal_stats()->GetFileReadHist(fp.GetHitFileLevel()), cfd_->internal_stats()->GetFileReadHist(fp.GetHitFileLevel()),
@ -995,10 +997,21 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k,
return; return;
} }
// report the counters before returning
if (get_context.State() != GetContext::kNotFound &&
get_context.State() != GetContext::kMerge) {
for (uint32_t t = 0; t < Tickers::TICKER_ENUM_MAX; t++) {
if (get_context.tickers_value[t] > 0) {
RecordTick(db_statistics_, t, get_context.tickers_value[t]);
}
}
}
switch (get_context.State()) { switch (get_context.State()) {
case GetContext::kNotFound: case GetContext::kNotFound:
// Keep searching in other files // Keep searching in other files
break; break;
case GetContext::kMerge:
break;
case GetContext::kFound: case GetContext::kFound:
if (fp.GetHitFileLevel() == 0) { if (fp.GetHitFileLevel() == 0) {
RecordTick(db_statistics_, GET_HIT_L0); RecordTick(db_statistics_, GET_HIT_L0);
@ -1015,8 +1028,6 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k,
case GetContext::kCorrupt: case GetContext::kCorrupt:
*status = Status::Corruption("corrupted key for ", user_key); *status = Status::Corruption("corrupted key for ", user_key);
return; return;
case GetContext::kMerge:
break;
case GetContext::kBlobIndex: case GetContext::kBlobIndex:
ROCKS_LOG_ERROR(info_log_, "Encounter unexpected blob index."); ROCKS_LOG_ERROR(info_log_, "Encounter unexpected blob index.");
*status = Status::NotSupported( *status = Status::NotSupported(
@ -1027,6 +1038,11 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k,
f = fp.GetNextFile(); f = fp.GetNextFile();
} }
for (uint32_t t = 0; t < Tickers::TICKER_ENUM_MAX; t++) {
if (get_context.tickers_value[t] > 0) {
RecordTick(db_statistics_, t, get_context.tickers_value[t]);
}
}
if (GetContext::kMerge == get_context.State()) { if (GetContext::kMerge == get_context.State()) {
if (!merge_operator_) { if (!merge_operator_) {
*status = Status::InvalidArgument( *status = Status::InvalidArgument(

@ -128,22 +128,37 @@ Slice GetCacheKeyFromOffset(const char* cache_key_prefix,
Cache::Handle* GetEntryFromCache(Cache* block_cache, const Slice& key, Cache::Handle* GetEntryFromCache(Cache* block_cache, const Slice& key,
Tickers block_cache_miss_ticker, Tickers block_cache_miss_ticker,
Tickers block_cache_hit_ticker, Tickers block_cache_hit_ticker,
Statistics* statistics) { Statistics* statistics,
GetContext* get_context) {
auto cache_handle = block_cache->Lookup(key, statistics); auto cache_handle = block_cache->Lookup(key, statistics);
if (cache_handle != nullptr) { if (cache_handle != nullptr) {
PERF_COUNTER_ADD(block_cache_hit_count, 1); PERF_COUNTER_ADD(block_cache_hit_count, 1);
// overall cache hit if (get_context != nullptr) {
RecordTick(statistics, BLOCK_CACHE_HIT); // overall cache hit
// total bytes read from cache get_context->RecordCounters(BLOCK_CACHE_HIT, 1);
RecordTick(statistics, BLOCK_CACHE_BYTES_READ, // total bytes read from cache
block_cache->GetUsage(cache_handle)); get_context->RecordCounters(BLOCK_CACHE_BYTES_READ,
// block-type specific cache hit block_cache->GetUsage(cache_handle));
RecordTick(statistics, block_cache_hit_ticker); // block-type specific cache hit
get_context->RecordCounters(block_cache_hit_ticker, 1);
} else {
// overall cache hit
RecordTick(statistics, BLOCK_CACHE_HIT);
// total bytes read from cache
RecordTick(statistics, BLOCK_CACHE_BYTES_READ,
block_cache->GetUsage(cache_handle));
RecordTick(statistics, block_cache_hit_ticker);
}
} else { } else {
// overall cache miss if (get_context != nullptr) {
RecordTick(statistics, BLOCK_CACHE_MISS); // overall cache miss
// block-type specific cache miss get_context->RecordCounters(BLOCK_CACHE_MISS, 1);
RecordTick(statistics, block_cache_miss_ticker); // block-type specific cache miss
get_context->RecordCounters(block_cache_miss_ticker, 1);
} else {
RecordTick(statistics, BLOCK_CACHE_MISS);
RecordTick(statistics, block_cache_miss_ticker);
}
} }
return cache_handle; return cache_handle;
@ -255,9 +270,11 @@ class PartitionIndexReader : public IndexReader, public Cleanable {
compression_dict = rep->compression_dict_block->data; compression_dict = rep->compression_dict_block->data;
} }
const bool is_index = true; const bool is_index = true;
s = table_->MaybeLoadDataBlockToCache(prefetch_buffer.get(), rep, ro, // TODO: Support counter batch update for partitioned index and
handle, compression_dict, &block, // filter blocks
is_index); s = table_->MaybeLoadDataBlockToCache(
prefetch_buffer.get(), rep, ro, handle, compression_dict, &block,
is_index, nullptr /* get_context */);
assert(s.ok() || block.value == nullptr); assert(s.ok() || block.value == nullptr);
if (s.ok() && block.value != nullptr) { if (s.ok() && block.value != nullptr) {
@ -784,7 +801,8 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions,
ReadOptions read_options; ReadOptions read_options;
s = MaybeLoadDataBlockToCache( s = MaybeLoadDataBlockToCache(
prefetch_buffer.get(), rep, read_options, rep->range_del_handle, prefetch_buffer.get(), rep, read_options, rep->range_del_handle,
Slice() /* compression_dict */, &rep->range_del_entry); Slice() /* compression_dict */, &rep->range_del_entry,
false /* is_index */, nullptr /* get_context */);
if (!s.ok()) { if (!s.ok()) {
ROCKS_LOG_WARN( ROCKS_LOG_WARN(
rep->ioptions.info_log, rep->ioptions.info_log,
@ -960,8 +978,8 @@ Status BlockBasedTable::GetDataBlockFromCache(
Cache* block_cache, Cache* block_cache_compressed, Cache* block_cache, Cache* block_cache_compressed,
const ImmutableCFOptions& ioptions, const ReadOptions& read_options, const ImmutableCFOptions& ioptions, const ReadOptions& read_options,
BlockBasedTable::CachableEntry<Block>* block, uint32_t format_version, BlockBasedTable::CachableEntry<Block>* block, uint32_t format_version,
const Slice& compression_dict, size_t read_amp_bytes_per_bit, const Slice& compression_dict, size_t read_amp_bytes_per_bit, bool is_index,
bool is_index) { GetContext* get_context) {
Status s; Status s;
Block* compressed_block = nullptr; Block* compressed_block = nullptr;
Cache::Handle* block_cache_compressed_handle = nullptr; Cache::Handle* block_cache_compressed_handle = nullptr;
@ -972,7 +990,8 @@ Status BlockBasedTable::GetDataBlockFromCache(
block->cache_handle = GetEntryFromCache( block->cache_handle = GetEntryFromCache(
block_cache, block_cache_key, block_cache, block_cache_key,
is_index ? BLOCK_CACHE_INDEX_MISS : BLOCK_CACHE_DATA_MISS, is_index ? BLOCK_CACHE_INDEX_MISS : BLOCK_CACHE_DATA_MISS,
is_index ? BLOCK_CACHE_INDEX_HIT : BLOCK_CACHE_DATA_HIT, statistics); is_index ? BLOCK_CACHE_INDEX_HIT : BLOCK_CACHE_DATA_HIT, statistics,
get_context);
if (block->cache_handle != nullptr) { if (block->cache_handle != nullptr) {
block->value = block->value =
reinterpret_cast<Block*>(block_cache->Value(block->cache_handle)); reinterpret_cast<Block*>(block_cache->Value(block->cache_handle));
@ -1025,18 +1044,36 @@ Status BlockBasedTable::GetDataBlockFromCache(
block_cache->TEST_mark_as_data_block(block_cache_key, block_cache->TEST_mark_as_data_block(block_cache_key,
block->value->usable_size()); block->value->usable_size());
if (s.ok()) { if (s.ok()) {
RecordTick(statistics, BLOCK_CACHE_ADD); if (get_context != nullptr) {
if (is_index) { get_context->RecordCounters(BLOCK_CACHE_ADD, 1);
RecordTick(statistics, BLOCK_CACHE_INDEX_ADD); get_context->RecordCounters(BLOCK_CACHE_BYTES_WRITE,
RecordTick(statistics, BLOCK_CACHE_INDEX_BYTES_INSERT, block->value->usable_size());
block->value->usable_size());
} else { } else {
RecordTick(statistics, BLOCK_CACHE_DATA_ADD); RecordTick(statistics, BLOCK_CACHE_ADD);
RecordTick(statistics, BLOCK_CACHE_DATA_BYTES_INSERT, RecordTick(statistics, BLOCK_CACHE_BYTES_WRITE,
block->value->usable_size()); block->value->usable_size());
} }
RecordTick(statistics, BLOCK_CACHE_BYTES_WRITE, if (is_index) {
block->value->usable_size()); if (get_context != nullptr) {
get_context->RecordCounters(BLOCK_CACHE_INDEX_ADD, 1);
get_context->RecordCounters(BLOCK_CACHE_INDEX_BYTES_INSERT,
block->value->usable_size());
} else {
RecordTick(statistics, BLOCK_CACHE_INDEX_ADD);
RecordTick(statistics, BLOCK_CACHE_INDEX_BYTES_INSERT,
block->value->usable_size());
}
} else {
if (get_context != nullptr) {
get_context->RecordCounters(BLOCK_CACHE_DATA_ADD, 1);
get_context->RecordCounters(BLOCK_CACHE_DATA_BYTES_INSERT,
block->value->usable_size());
} else {
RecordTick(statistics, BLOCK_CACHE_DATA_ADD);
RecordTick(statistics, BLOCK_CACHE_DATA_BYTES_INSERT,
block->value->usable_size());
}
}
} else { } else {
RecordTick(statistics, BLOCK_CACHE_ADD_FAILURES); RecordTick(statistics, BLOCK_CACHE_ADD_FAILURES);
delete block->value; delete block->value;
@ -1056,7 +1093,7 @@ Status BlockBasedTable::PutDataBlockToCache(
const ReadOptions& read_options, const ImmutableCFOptions& ioptions, const ReadOptions& read_options, const ImmutableCFOptions& ioptions,
CachableEntry<Block>* block, Block* raw_block, uint32_t format_version, CachableEntry<Block>* block, Block* raw_block, uint32_t format_version,
const Slice& compression_dict, size_t read_amp_bytes_per_bit, bool is_index, const Slice& compression_dict, size_t read_amp_bytes_per_bit, bool is_index,
Cache::Priority priority) { Cache::Priority priority, GetContext* get_context) {
assert(raw_block->compression_type() == kNoCompression || assert(raw_block->compression_type() == kNoCompression ||
block_cache_compressed != nullptr); block_cache_compressed != nullptr);
@ -1109,18 +1146,36 @@ Status BlockBasedTable::PutDataBlockToCache(
block->value->usable_size()); block->value->usable_size());
if (s.ok()) { if (s.ok()) {
assert(block->cache_handle != nullptr); assert(block->cache_handle != nullptr);
RecordTick(statistics, BLOCK_CACHE_ADD); if (get_context != nullptr) {
if (is_index) { get_context->RecordCounters(BLOCK_CACHE_ADD, 1);
RecordTick(statistics, BLOCK_CACHE_INDEX_ADD); get_context->RecordCounters(BLOCK_CACHE_BYTES_WRITE,
RecordTick(statistics, BLOCK_CACHE_INDEX_BYTES_INSERT, block->value->usable_size());
block->value->usable_size());
} else { } else {
RecordTick(statistics, BLOCK_CACHE_DATA_ADD); RecordTick(statistics, BLOCK_CACHE_ADD);
RecordTick(statistics, BLOCK_CACHE_DATA_BYTES_INSERT, RecordTick(statistics, BLOCK_CACHE_BYTES_WRITE,
block->value->usable_size()); block->value->usable_size());
} }
RecordTick(statistics, BLOCK_CACHE_BYTES_WRITE, if (is_index) {
block->value->usable_size()); if (get_context != nullptr) {
get_context->RecordCounters(BLOCK_CACHE_INDEX_ADD, 1);
get_context->RecordCounters(BLOCK_CACHE_INDEX_BYTES_INSERT,
block->value->usable_size());
} else {
RecordTick(statistics, BLOCK_CACHE_INDEX_ADD);
RecordTick(statistics, BLOCK_CACHE_INDEX_BYTES_INSERT,
block->value->usable_size());
}
} else {
if (get_context != nullptr) {
get_context->RecordCounters(BLOCK_CACHE_DATA_ADD, 1);
get_context->RecordCounters(BLOCK_CACHE_DATA_BYTES_INSERT,
block->value->usable_size());
} else {
RecordTick(statistics, BLOCK_CACHE_DATA_ADD);
RecordTick(statistics, BLOCK_CACHE_DATA_BYTES_INSERT,
block->value->usable_size());
}
}
assert(reinterpret_cast<Block*>( assert(reinterpret_cast<Block*>(
block_cache->Value(block->cache_handle)) == block->value); block_cache->Value(block->cache_handle)) == block->value);
} else { } else {
@ -1198,16 +1253,18 @@ FilterBlockReader* BlockBasedTable::ReadFilter(
} }
BlockBasedTable::CachableEntry<FilterBlockReader> BlockBasedTable::GetFilter( BlockBasedTable::CachableEntry<FilterBlockReader> BlockBasedTable::GetFilter(
FilePrefetchBuffer* prefetch_buffer, bool no_io) const { FilePrefetchBuffer* prefetch_buffer, bool no_io,
GetContext* get_context) const {
const BlockHandle& filter_blk_handle = rep_->filter_handle; const BlockHandle& filter_blk_handle = rep_->filter_handle;
const bool is_a_filter_partition = true; const bool is_a_filter_partition = true;
return GetFilter(prefetch_buffer, filter_blk_handle, !is_a_filter_partition, return GetFilter(prefetch_buffer, filter_blk_handle, !is_a_filter_partition,
no_io); no_io, get_context);
} }
BlockBasedTable::CachableEntry<FilterBlockReader> BlockBasedTable::GetFilter( BlockBasedTable::CachableEntry<FilterBlockReader> BlockBasedTable::GetFilter(
FilePrefetchBuffer* prefetch_buffer, const BlockHandle& filter_blk_handle, FilePrefetchBuffer* prefetch_buffer, const BlockHandle& filter_blk_handle,
const bool is_a_filter_partition, bool no_io) const { const bool is_a_filter_partition, bool no_io,
GetContext* get_context) const {
// If cache_index_and_filter_blocks is false, filter should be pre-populated. // If cache_index_and_filter_blocks is false, filter should be pre-populated.
// We will return rep_->filter anyway. rep_->filter can be nullptr if filter // We will return rep_->filter anyway. rep_->filter can be nullptr if filter
// read fails at Open() time. We don't want to reload again since it will // read fails at Open() time. We don't want to reload again since it will
@ -1237,7 +1294,7 @@ BlockBasedTable::CachableEntry<FilterBlockReader> BlockBasedTable::GetFilter(
Statistics* statistics = rep_->ioptions.statistics; Statistics* statistics = rep_->ioptions.statistics;
auto cache_handle = auto cache_handle =
GetEntryFromCache(block_cache, key, BLOCK_CACHE_FILTER_MISS, GetEntryFromCache(block_cache, key, BLOCK_CACHE_FILTER_MISS,
BLOCK_CACHE_FILTER_HIT, statistics); BLOCK_CACHE_FILTER_HIT, statistics, get_context);
FilterBlockReader* filter = nullptr; FilterBlockReader* filter = nullptr;
if (cache_handle != nullptr) { if (cache_handle != nullptr) {
@ -1257,10 +1314,19 @@ BlockBasedTable::CachableEntry<FilterBlockReader> BlockBasedTable::GetFilter(
? Cache::Priority::HIGH ? Cache::Priority::HIGH
: Cache::Priority::LOW); : Cache::Priority::LOW);
if (s.ok()) { if (s.ok()) {
RecordTick(statistics, BLOCK_CACHE_ADD); if (get_context != nullptr) {
RecordTick(statistics, BLOCK_CACHE_FILTER_ADD); get_context->RecordCounters(BLOCK_CACHE_ADD, 1);
RecordTick(statistics, BLOCK_CACHE_FILTER_BYTES_INSERT, filter->size()); get_context->RecordCounters(BLOCK_CACHE_BYTES_WRITE, filter->size());
RecordTick(statistics, BLOCK_CACHE_BYTES_WRITE, filter->size()); get_context->RecordCounters(BLOCK_CACHE_FILTER_ADD, 1);
get_context->RecordCounters(BLOCK_CACHE_FILTER_BYTES_INSERT,
filter->size());
} else {
RecordTick(statistics, BLOCK_CACHE_ADD);
RecordTick(statistics, BLOCK_CACHE_BYTES_WRITE, filter->size());
RecordTick(statistics, BLOCK_CACHE_FILTER_ADD);
RecordTick(statistics, BLOCK_CACHE_FILTER_BYTES_INSERT,
filter->size());
}
} else { } else {
RecordTick(statistics, BLOCK_CACHE_ADD_FAILURES); RecordTick(statistics, BLOCK_CACHE_ADD_FAILURES);
delete filter; delete filter;
@ -1274,7 +1340,7 @@ BlockBasedTable::CachableEntry<FilterBlockReader> BlockBasedTable::GetFilter(
InternalIterator* BlockBasedTable::NewIndexIterator( InternalIterator* BlockBasedTable::NewIndexIterator(
const ReadOptions& read_options, BlockIter* input_iter, const ReadOptions& read_options, BlockIter* input_iter,
CachableEntry<IndexReader>* index_entry) { CachableEntry<IndexReader>* index_entry, GetContext* get_context) {
// index reader has already been pre-populated. // index reader has already been pre-populated.
if (rep_->index_reader) { if (rep_->index_reader) {
return rep_->index_reader->NewIterator( return rep_->index_reader->NewIterator(
@ -1297,7 +1363,7 @@ InternalIterator* BlockBasedTable::NewIndexIterator(
Statistics* statistics = rep_->ioptions.statistics; Statistics* statistics = rep_->ioptions.statistics;
auto cache_handle = auto cache_handle =
GetEntryFromCache(block_cache, key, BLOCK_CACHE_INDEX_MISS, GetEntryFromCache(block_cache, key, BLOCK_CACHE_INDEX_MISS,
BLOCK_CACHE_INDEX_HIT, statistics); BLOCK_CACHE_INDEX_HIT, statistics, get_context);
if (cache_handle == nullptr && no_io) { if (cache_handle == nullptr && no_io) {
if (input_iter != nullptr) { if (input_iter != nullptr) {
@ -1332,10 +1398,15 @@ InternalIterator* BlockBasedTable::NewIndexIterator(
if (s.ok()) { if (s.ok()) {
size_t usable_size = index_reader->usable_size(); size_t usable_size = index_reader->usable_size();
RecordTick(statistics, BLOCK_CACHE_ADD); if (get_context != nullptr) {
get_context->RecordCounters(BLOCK_CACHE_ADD, 1);
get_context->RecordCounters(BLOCK_CACHE_BYTES_WRITE, usable_size);
} else {
RecordTick(statistics, BLOCK_CACHE_ADD);
RecordTick(statistics, BLOCK_CACHE_BYTES_WRITE, usable_size);
}
RecordTick(statistics, BLOCK_CACHE_INDEX_ADD); RecordTick(statistics, BLOCK_CACHE_INDEX_ADD);
RecordTick(statistics, BLOCK_CACHE_INDEX_BYTES_INSERT, usable_size); RecordTick(statistics, BLOCK_CACHE_INDEX_BYTES_INSERT, usable_size);
RecordTick(statistics, BLOCK_CACHE_BYTES_WRITE, usable_size);
} else { } else {
if (index_reader != nullptr) { if (index_reader != nullptr) {
delete index_reader; delete index_reader;
@ -1369,13 +1440,14 @@ InternalIterator* BlockBasedTable::NewIndexIterator(
InternalIterator* BlockBasedTable::NewDataBlockIterator( InternalIterator* BlockBasedTable::NewDataBlockIterator(
Rep* rep, const ReadOptions& ro, const Slice& index_value, Rep* rep, const ReadOptions& ro, const Slice& index_value,
BlockIter* input_iter, bool is_index) { BlockIter* input_iter, bool is_index, GetContext* get_context) {
BlockHandle handle; BlockHandle handle;
Slice input = index_value; Slice input = index_value;
// We intentionally allow extra stuff in index_value so that we // We intentionally allow extra stuff in index_value so that we
// can add more features in the future. // can add more features in the future.
Status s = handle.DecodeFrom(&input); Status s = handle.DecodeFrom(&input);
return NewDataBlockIterator(rep, ro, handle, input_iter, is_index, s); return NewDataBlockIterator(rep, ro, handle, input_iter, is_index,
get_context, s);
} }
// Convert an index iterator value (i.e., an encoded BlockHandle) // Convert an index iterator value (i.e., an encoded BlockHandle)
@ -1384,7 +1456,7 @@ InternalIterator* BlockBasedTable::NewDataBlockIterator(
// If input_iter is not null, update this iter and return it // If input_iter is not null, update this iter and return it
InternalIterator* BlockBasedTable::NewDataBlockIterator( InternalIterator* BlockBasedTable::NewDataBlockIterator(
Rep* rep, const ReadOptions& ro, const BlockHandle& handle, Rep* rep, const ReadOptions& ro, const BlockHandle& handle,
BlockIter* input_iter, bool is_index, Status s) { BlockIter* input_iter, bool is_index, GetContext* get_context, Status s) {
PERF_TIMER_GUARD(new_table_block_iter_nanos); PERF_TIMER_GUARD(new_table_block_iter_nanos);
const bool no_io = (ro.read_tier == kBlockCacheTier); const bool no_io = (ro.read_tier == kBlockCacheTier);
@ -1396,7 +1468,8 @@ InternalIterator* BlockBasedTable::NewDataBlockIterator(
compression_dict = rep->compression_dict_block->data; compression_dict = rep->compression_dict_block->data;
} }
s = MaybeLoadDataBlockToCache(nullptr /*prefetch_buffer*/, rep, ro, handle, s = MaybeLoadDataBlockToCache(nullptr /*prefetch_buffer*/, rep, ro, handle,
compression_dict, &block, is_index); compression_dict, &block, is_index,
get_context);
} }
// Didn't get any data from block caches. // Didn't get any data from block caches.
@ -1447,7 +1520,7 @@ InternalIterator* BlockBasedTable::NewDataBlockIterator(
Status BlockBasedTable::MaybeLoadDataBlockToCache( Status BlockBasedTable::MaybeLoadDataBlockToCache(
FilePrefetchBuffer* prefetch_buffer, Rep* rep, const ReadOptions& ro, FilePrefetchBuffer* prefetch_buffer, Rep* rep, const ReadOptions& ro,
const BlockHandle& handle, Slice compression_dict, const BlockHandle& handle, Slice compression_dict,
CachableEntry<Block>* block_entry, bool is_index) { CachableEntry<Block>* block_entry, bool is_index, GetContext* get_context) {
assert(block_entry != nullptr); assert(block_entry != nullptr);
const bool no_io = (ro.read_tier == kBlockCacheTier); const bool no_io = (ro.read_tier == kBlockCacheTier);
Cache* block_cache = rep->table_options.block_cache.get(); Cache* block_cache = rep->table_options.block_cache.get();
@ -1478,7 +1551,7 @@ Status BlockBasedTable::MaybeLoadDataBlockToCache(
s = GetDataBlockFromCache( s = GetDataBlockFromCache(
key, ckey, block_cache, block_cache_compressed, rep->ioptions, ro, key, ckey, block_cache, block_cache_compressed, rep->ioptions, ro,
block_entry, rep->table_options.format_version, compression_dict, block_entry, rep->table_options.format_version, compression_dict,
rep->table_options.read_amp_bytes_per_bit, is_index); rep->table_options.read_amp_bytes_per_bit, is_index, get_context);
if (block_entry->value == nullptr && !no_io && ro.fill_cache) { if (block_entry->value == nullptr && !no_io && ro.fill_cache) {
std::unique_ptr<Block> raw_block; std::unique_ptr<Block> raw_block;
@ -1497,11 +1570,11 @@ Status BlockBasedTable::MaybeLoadDataBlockToCache(
block_entry, raw_block.release(), rep->table_options.format_version, block_entry, raw_block.release(), rep->table_options.format_version,
compression_dict, rep->table_options.read_amp_bytes_per_bit, compression_dict, rep->table_options.read_amp_bytes_per_bit,
is_index, is_index,
is_index && is_index && rep->table_options
rep->table_options .cache_index_and_filter_blocks_with_high_priority
.cache_index_and_filter_blocks_with_high_priority
? Cache::Priority::HIGH ? Cache::Priority::HIGH
: Cache::Priority::LOW); : Cache::Priority::LOW,
get_context);
} }
} }
} }
@ -1545,8 +1618,9 @@ BlockBasedTable::BlockEntryIteratorState::NewSecondaryIterator(
&rep->internal_comparator, nullptr, true, rep->ioptions.statistics); &rep->internal_comparator, nullptr, true, rep->ioptions.statistics);
} }
} }
return NewDataBlockIterator(rep, read_options_, handle, nullptr, is_index_, return NewDataBlockIterator(rep, read_options_, handle,
s); /* input_iter */ nullptr, is_index_,
/* get_context */ nullptr, s);
} }
bool BlockBasedTable::BlockEntryIteratorState::PrefixMayMatch( bool BlockBasedTable::BlockEntryIteratorState::PrefixMayMatch(
@ -1740,8 +1814,9 @@ Status BlockBasedTable::Get(const ReadOptions& read_options, const Slice& key,
const bool no_io = read_options.read_tier == kBlockCacheTier; const bool no_io = read_options.read_tier == kBlockCacheTier;
CachableEntry<FilterBlockReader> filter_entry; CachableEntry<FilterBlockReader> filter_entry;
if (!skip_filters) { if (!skip_filters) {
filter_entry = GetFilter(/*prefetch_buffer*/ nullptr, filter_entry =
read_options.read_tier == kBlockCacheTier); GetFilter(/*prefetch_buffer*/ nullptr,
read_options.read_tier == kBlockCacheTier, get_context);
} }
FilterBlockReader* filter = filter_entry.value; FilterBlockReader* filter = filter_entry.value;
@ -1751,7 +1826,8 @@ Status BlockBasedTable::Get(const ReadOptions& read_options, const Slice& key,
RecordTick(rep_->ioptions.statistics, BLOOM_FILTER_USEFUL); RecordTick(rep_->ioptions.statistics, BLOOM_FILTER_USEFUL);
} else { } else {
BlockIter iiter_on_stack; BlockIter iiter_on_stack;
auto iiter = NewIndexIterator(read_options, &iiter_on_stack); auto iiter = NewIndexIterator(read_options, &iiter_on_stack,
/* index_entry */ nullptr, get_context);
std::unique_ptr<InternalIterator> iiter_unique_ptr; std::unique_ptr<InternalIterator> iiter_unique_ptr;
if (iiter != &iiter_on_stack) { if (iiter != &iiter_on_stack) {
iiter_unique_ptr.reset(iiter); iiter_unique_ptr.reset(iiter);
@ -1775,7 +1851,8 @@ Status BlockBasedTable::Get(const ReadOptions& read_options, const Slice& key,
break; break;
} else { } else {
BlockIter biter; BlockIter biter;
NewDataBlockIterator(rep_, read_options, iiter->value(), &biter); NewDataBlockIterator(rep_, read_options, iiter->value(), &biter, false,
get_context);
if (read_options.read_tier == kBlockCacheTier && if (read_options.read_tier == kBlockCacheTier &&
biter.status().IsIncomplete()) { biter.status().IsIncomplete()) {

@ -215,15 +215,14 @@ class BlockBasedTable : public TableReader {
private: private:
friend class MockedBlockBasedTable; friend class MockedBlockBasedTable;
// input_iter: if it is not null, update this one and return it as Iterator // input_iter: if it is not null, update this one and return it as Iterator
static InternalIterator* NewDataBlockIterator(Rep* rep, const ReadOptions& ro, static InternalIterator* NewDataBlockIterator(
const Slice& index_value, Rep* rep, const ReadOptions& ro, const Slice& index_value,
BlockIter* input_iter = nullptr, BlockIter* input_iter = nullptr, bool is_index = false,
bool is_index = false); GetContext* get_context = nullptr);
static InternalIterator* NewDataBlockIterator(Rep* rep, const ReadOptions& ro, static InternalIterator* NewDataBlockIterator(
const BlockHandle& block_hanlde, Rep* rep, const ReadOptions& ro, const BlockHandle& block_hanlde,
BlockIter* input_iter = nullptr, BlockIter* input_iter = nullptr, bool is_index = false,
bool is_index = false, GetContext* get_context = nullptr, Status s = Status());
Status s = Status());
// If block cache enabled (compressed or uncompressed), looks for the block // If block cache enabled (compressed or uncompressed), looks for the block
// identified by handle in (1) uncompressed cache, (2) compressed cache, and // identified by handle in (1) uncompressed cache, (2) compressed cache, and
// then (3) file. If found, inserts into the cache(s) that were searched // then (3) file. If found, inserts into the cache(s) that were searched
@ -238,16 +237,19 @@ class BlockBasedTable : public TableReader {
const BlockHandle& handle, const BlockHandle& handle,
Slice compression_dict, Slice compression_dict,
CachableEntry<Block>* block_entry, CachableEntry<Block>* block_entry,
bool is_index = false); bool is_index = false,
GetContext* get_context = nullptr);
// For the following two functions: // For the following two functions:
// if `no_io == true`, we will not try to read filter/index from sst file // if `no_io == true`, we will not try to read filter/index from sst file
// were they not present in cache yet. // were they not present in cache yet.
CachableEntry<FilterBlockReader> GetFilter( CachableEntry<FilterBlockReader> GetFilter(
FilePrefetchBuffer* prefetch_buffer = nullptr, bool no_io = false) const; FilePrefetchBuffer* prefetch_buffer = nullptr, bool no_io = false,
GetContext* get_context = nullptr) const;
virtual CachableEntry<FilterBlockReader> GetFilter( virtual CachableEntry<FilterBlockReader> GetFilter(
FilePrefetchBuffer* prefetch_buffer, const BlockHandle& filter_blk_handle, FilePrefetchBuffer* prefetch_buffer, const BlockHandle& filter_blk_handle,
const bool is_a_filter_partition, bool no_io) const; const bool is_a_filter_partition, bool no_io,
GetContext* get_context) const;
// Get the iterator from the index reader. // Get the iterator from the index reader.
// If input_iter is not set, return new Iterator // If input_iter is not set, return new Iterator
@ -261,7 +263,8 @@ class BlockBasedTable : public TableReader {
// kBlockCacheTier // kBlockCacheTier
InternalIterator* NewIndexIterator( InternalIterator* NewIndexIterator(
const ReadOptions& read_options, BlockIter* input_iter = nullptr, const ReadOptions& read_options, BlockIter* input_iter = nullptr,
CachableEntry<IndexReader>* index_entry = nullptr); CachableEntry<IndexReader>* index_entry = nullptr,
GetContext* get_context = nullptr);
// Read block cache from block caches (if set): block_cache and // Read block cache from block caches (if set): block_cache and
// block_cache_compressed. // block_cache_compressed.
@ -275,7 +278,7 @@ class BlockBasedTable : public TableReader {
const ImmutableCFOptions& ioptions, const ReadOptions& read_options, const ImmutableCFOptions& ioptions, const ReadOptions& read_options,
BlockBasedTable::CachableEntry<Block>* block, uint32_t format_version, BlockBasedTable::CachableEntry<Block>* block, uint32_t format_version,
const Slice& compression_dict, size_t read_amp_bytes_per_bit, const Slice& compression_dict, size_t read_amp_bytes_per_bit,
bool is_index = false); bool is_index = false, GetContext* get_context = nullptr);
// Put a raw block (maybe compressed) to the corresponding block caches. // Put a raw block (maybe compressed) to the corresponding block caches.
// This method will perform decompression against raw_block if needed and then // This method will perform decompression against raw_block if needed and then
@ -293,7 +296,8 @@ class BlockBasedTable : public TableReader {
const ReadOptions& read_options, const ImmutableCFOptions& ioptions, const ReadOptions& read_options, const ImmutableCFOptions& ioptions,
CachableEntry<Block>* block, Block* raw_block, uint32_t format_version, CachableEntry<Block>* block, Block* raw_block, uint32_t format_version,
const Slice& compression_dict, size_t read_amp_bytes_per_bit, const Slice& compression_dict, size_t read_amp_bytes_per_bit,
bool is_index = false, Cache::Priority pri = Cache::Priority::LOW); bool is_index = false, Cache::Priority pri = Cache::Priority::LOW,
GetContext* get_context = nullptr);
// Calls (*handle_result)(arg, ...) repeatedly, starting with the entry found // Calls (*handle_result)(arg, ...) repeatedly, starting with the entry found
// after a call to Seek(key), until handle_result returns false. // after a call to Seek(key), until handle_result returns false.

@ -87,6 +87,13 @@ void GetContext::SaveValue(const Slice& value, SequenceNumber seq) {
} }
} }
void GetContext::RecordCounters(Tickers ticker, size_t val) {
if (ticker == Tickers::TICKER_ENUM_MAX) {
return;
}
tickers_value[ticker] += static_cast<uint64_t>(val);
}
bool GetContext::SaveValue(const ParsedInternalKey& parsed_key, bool GetContext::SaveValue(const ParsedInternalKey& parsed_key,
const Slice& value, Cleanable* value_pinner) { const Slice& value, Cleanable* value_pinner) {
assert((state_ != kMerge && parsed_key.type != kTypeMerge) || assert((state_ != kMerge && parsed_key.type != kTypeMerge) ||

@ -9,6 +9,7 @@
#include "db/range_del_aggregator.h" #include "db/range_del_aggregator.h"
#include "db/read_callback.h" #include "db/read_callback.h"
#include "rocksdb/env.h" #include "rocksdb/env.h"
#include "rocksdb/statistics.h"
#include "rocksdb/types.h" #include "rocksdb/types.h"
#include "table/block.h" #include "table/block.h"
@ -26,6 +27,7 @@ class GetContext {
kMerge, // saver contains the current merge result (the operands) kMerge, // saver contains the current merge result (the operands)
kBlobIndex, kBlobIndex,
}; };
uint64_t tickers_value[Tickers::TICKER_ENUM_MAX] = {0};
GetContext(const Comparator* ucmp, const MergeOperator* merge_operator, GetContext(const Comparator* ucmp, const MergeOperator* merge_operator,
Logger* logger, Statistics* statistics, GetState init_state, Logger* logger, Statistics* statistics, GetState init_state,
@ -72,6 +74,8 @@ class GetContext {
return true; return true;
} }
void RecordCounters(Tickers ticker, size_t val);
private: private:
const Comparator* ucmp_; const Comparator* ucmp_;
const MergeOperator* merge_operator_; const MergeOperator* merge_operator_;

@ -231,7 +231,8 @@ PartitionedFilterBlockReader::GetFilterPartition(
} }
} }
return table_->GetFilter(/*prefetch_buffer*/ nullptr, fltr_blk_handle, return table_->GetFilter(/*prefetch_buffer*/ nullptr, fltr_blk_handle,
is_a_filter_partition, no_io); is_a_filter_partition, no_io,
/* get_context */ nullptr);
} else { } else {
auto filter = table_->ReadFilter(prefetch_buffer, fltr_blk_handle, auto filter = table_->ReadFilter(prefetch_buffer, fltr_blk_handle,
is_a_filter_partition); is_a_filter_partition);
@ -295,7 +296,8 @@ void PartitionedFilterBlockReader::CacheDependencies(bool pin) {
const bool no_io = true; const bool no_io = true;
const bool is_a_filter_partition = true; const bool is_a_filter_partition = true;
auto filter = table_->GetFilter(prefetch_buffer.get(), handle, auto filter = table_->GetFilter(prefetch_buffer.get(), handle,
is_a_filter_partition, !no_io); is_a_filter_partition, !no_io,
/* get_context */ nullptr);
if (LIKELY(filter.IsSet())) { if (LIKELY(filter.IsSet())) {
if (pin) { if (pin) {
filter_map_[handle.offset()] = std::move(filter); filter_map_[handle.offset()] = std::move(filter);

@ -29,7 +29,8 @@ class MockedBlockBasedTable : public BlockBasedTable {
virtual CachableEntry<FilterBlockReader> GetFilter( virtual CachableEntry<FilterBlockReader> GetFilter(
FilePrefetchBuffer*, const BlockHandle& filter_blk_handle, FilePrefetchBuffer*, const BlockHandle& filter_blk_handle,
const bool /* unused */, bool /* unused */) const override { const bool /* unused */, bool /* unused */,
GetContext* /* unused */) const override {
Slice slice = slices[filter_blk_handle.offset()]; Slice slice = slices[filter_blk_handle.offset()];
auto obj = new FullFilterBlockReader( auto obj = new FullFilterBlockReader(
nullptr, true, BlockContents(slice, false, kNoCompression), nullptr, true, BlockContents(slice, false, kNoCompression),

Loading…
Cancel
Save