Use new Insert and Lookup APIs in table reader to support secondary cache (#8315)

Summary:
Secondary cache is implemented to achieve the secondary cache tier for block cache. New Insert and Lookup APIs are introduced in https://github.com/facebook/rocksdb/issues/8271  . To support and use the secondary cache in the block based table reader, this PR introduces the corresponding callback functions that will be used in the secondary cache, and updates the Insert and Lookup APIs accordingly.

benchmarking:
./db_bench --benchmarks="fillrandom" -num=1000000 -key_size=32 -value_size=256 -use_direct_io_for_flush_and_compaction=true -db=/tmp/rocks_t/db -partition_index_and_filters=true

./db_bench -db=/tmp/rocks_t/db -use_existing_db=true -benchmarks=readrandom -num=1000000 -key_size=32 -value_size=256 -use_direct_reads=true -cache_size=1073741824 -cache_numshardbits=5 -cache_index_and_filter_blocks=true -read_random_exp_range=17 -statistics -partition_index_and_filters=true -stats_dump_period_sec=30 -reads=50000000

master benchmarking results:
readrandom   :       3.923 micros/op 254881 ops/sec;   33.4 MB/s (23849796 of 50000000 found)
rocksdb.db.get.micros P50 : 2.820992 P95 : 5.636716 P99 : 16.450553 P100 : 8396.000000 COUNT : 50000000 SUM : 179947064

Current PR benchmarking results
readrandom   :       4.083 micros/op 244925 ops/sec;   32.1 MB/s (23849796 of 50000000 found)
rocksdb.db.get.micros P50 : 2.967687 P95 : 5.754916 P99 : 15.665912 P100 : 8213.000000 COUNT : 50000000 SUM : 187250053

About 3.8% throughput reduction.
P50: 5.2% increase, P95: 2.09% increase, P99: 4.77% improvement

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8315

Test Plan: added the testing case

Reviewed By: anand1976

Differential Revision: D28599774

Pulled By: zhichao-cao

fbshipit-source-id: 098c4df0d7327d3a546df7604b2f1602f13044ed
main
Zhichao Cao 4 years ago committed by Facebook GitHub Bot
parent 6c7c3e8cb3
commit 7303d02bdf
  1. 399
      cache/lru_cache_test.cc
  2. 10
      db/db_block_cache_test.cc
  3. 4
      include/rocksdb/cache.h
  4. 59
      table/block_based/block_based_table_reader.cc
  5. 5
      table/block_based/block_based_table_reader.h
  6. 134
      table/block_based/block_like_traits.h
  7. 2
      table/block_based/parsed_full_filter_block.h

@ -8,8 +8,13 @@
#include <string> #include <string>
#include <vector> #include <vector>
#include "db/db_test_util.h"
#include "file/sst_file_manager_impl.h"
#include "port/port.h" #include "port/port.h"
#include "port/stack_trace.h"
#include "rocksdb/cache.h" #include "rocksdb/cache.h"
#include "rocksdb/io_status.h"
#include "rocksdb/sst_file_manager.h"
#include "test_util/testharness.h" #include "test_util/testharness.h"
#include "util/coding.h" #include "util/coding.h"
#include "util/random.h" #include "util/random.h"
@ -199,7 +204,7 @@ TEST_F(LRUCacheTest, EntriesWithPriority) {
class TestSecondaryCache : public SecondaryCache { class TestSecondaryCache : public SecondaryCache {
public: public:
explicit TestSecondaryCache(size_t capacity) explicit TestSecondaryCache(size_t capacity)
: num_inserts_(0), num_lookups_(0) { : num_inserts_(0), num_lookups_(0), inject_failure_(false) {
cache_ = NewLRUCache(capacity, 0, false, 0.5, nullptr, cache_ = NewLRUCache(capacity, 0, false, 0.5, nullptr,
kDefaultToAdaptiveMutex, kDontChargeCacheMetadata); kDefaultToAdaptiveMutex, kDontChargeCacheMetadata);
} }
@ -207,8 +212,15 @@ class TestSecondaryCache : public SecondaryCache {
std::string Name() override { return "TestSecondaryCache"; } std::string Name() override { return "TestSecondaryCache"; }
void InjectFailure() { inject_failure_ = true; }
void ResetInjectFailure() { inject_failure_ = false; }
Status Insert(const Slice& key, void* value, Status Insert(const Slice& key, void* value,
const Cache::CacheItemHelper* helper) override { const Cache::CacheItemHelper* helper) override {
if (inject_failure_) {
return Status::Corruption("Insertion Data Corrupted");
}
size_t size; size_t size;
char* buf; char* buf;
Status s; Status s;
@ -287,6 +299,13 @@ class TestSecondaryCache : public SecondaryCache {
std::shared_ptr<Cache> cache_; std::shared_ptr<Cache> cache_;
uint32_t num_inserts_; uint32_t num_inserts_;
uint32_t num_lookups_; uint32_t num_lookups_;
bool inject_failure_;
};
class DBSecondaryCacheTest : public DBTestBase {
public:
DBSecondaryCacheTest()
: DBTestBase("/db_secondary_cache_test", /*env_do_fsync=*/true) {}
}; };
class LRUSecondaryCacheTest : public LRUCacheTest { class LRUSecondaryCacheTest : public LRUCacheTest {
@ -314,13 +333,13 @@ class LRUSecondaryCacheTest : public LRUCacheTest {
return reinterpret_cast<TestItem*>(obj)->Size(); return reinterpret_cast<TestItem*>(obj)->Size();
} }
static Status SaveToCallback(void* obj, size_t offset, size_t size, static Status SaveToCallback(void* from_obj, size_t from_offset,
void* out) { size_t length, void* out) {
TestItem* item = reinterpret_cast<TestItem*>(obj); TestItem* item = reinterpret_cast<TestItem*>(from_obj);
char* buf = item->Buf(); char* buf = item->Buf();
EXPECT_EQ(size, item->Size()); EXPECT_EQ(length, item->Size());
EXPECT_EQ(offset, 0); EXPECT_EQ(from_offset, 0);
memcpy(out, buf, size); memcpy(out, buf, length);
return Status::OK(); return Status::OK();
} }
@ -547,6 +566,372 @@ TEST_F(LRUSecondaryCacheTest, FullCapacityTest) {
cache.reset(); cache.reset();
secondary_cache.reset(); secondary_cache.reset();
} }
// In this test, the block cache size is set to 4096, after insert 6 KV-pairs
// and flush, there are 5 blocks in this SST file, 2 data blocks and 3 meta
// blocks. block_1 size is 4096 and block_2 size is 2056. The total size
// of the meta blocks are about 900 to 1000. Therefore, in any situation,
// if we try to insert block_1 into the block cache, it will always fail. Only
// block_2 will be successfully inserted into the block cache.
TEST_F(DBSecondaryCacheTest, TestSecondaryCacheCorrectness1) {
LRUCacheOptions opts(4 * 1024, 0, false, 0.5, nullptr,
kDefaultToAdaptiveMutex, kDontChargeCacheMetadata);
std::shared_ptr<TestSecondaryCache> secondary_cache(
new TestSecondaryCache(2048 * 1024));
opts.secondary_cache = secondary_cache;
std::shared_ptr<Cache> cache = NewLRUCache(opts);
BlockBasedTableOptions table_options;
table_options.block_cache = cache;
table_options.block_size = 4 * 1024;
Options options = GetDefaultOptions();
options.create_if_missing = true;
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
// Enable paranoid file checks, so after flush the file will be read back
// and all the blocks will be accessed.
options.paranoid_file_checks = true;
DestroyAndReopen(options);
Random rnd(301);
const int N = 6;
for (int i = 0; i < N; i++) {
std::string p_v = rnd.RandomString(1007);
ASSERT_OK(Put(Key(i), p_v));
}
ASSERT_OK(Flush());
// After Flush is successful, RocksDB does the paranoid check for the new
// SST file. Meta blocks are always cached in the block cache and they
// will not be evicted. When block_2 is cache miss and read out, it is
// inserted to the block cache. Note that, block_1 is never successfully
// inserted to the block cache. Here are 2 lookups in the secondary cache
// for block_1 and block_2
ASSERT_EQ(secondary_cache->num_inserts(), 0u);
ASSERT_EQ(secondary_cache->num_lookups(), 2u);
Compact("a", "z");
// Compaction will create the iterator to scan the whole file. So all the
// blocks are needed. Meta blocks are always cached. When block_1 is read
// out, block_2 is evicted from block cache and inserted to secondary
// cache.
ASSERT_EQ(secondary_cache->num_inserts(), 1u);
ASSERT_EQ(secondary_cache->num_lookups(), 3u);
std::string v = Get(Key(0));
ASSERT_EQ(1007, v.size());
// The first data block is not in the cache, similarly, trigger the block
// cache Lookup and secondary cache lookup for block_1. But block_1 will not
// be inserted successfully due to the size. Currently, cache only has
// the meta blocks.
ASSERT_EQ(secondary_cache->num_inserts(), 1u);
ASSERT_EQ(secondary_cache->num_lookups(), 4u);
v = Get(Key(5));
ASSERT_EQ(1007, v.size());
// The second data block is not in the cache, similarly, trigger the block
// cache Lookup and secondary cache lookup for block_2 and block_2 is found
// in the secondary cache. Now block cache has block_2
ASSERT_EQ(secondary_cache->num_inserts(), 1u);
ASSERT_EQ(secondary_cache->num_lookups(), 5u);
v = Get(Key(5));
ASSERT_EQ(1007, v.size());
// block_2 is in the block cache. There is a block cache hit. No need to
// lookup or insert the secondary cache.
ASSERT_EQ(secondary_cache->num_inserts(), 1u);
ASSERT_EQ(secondary_cache->num_lookups(), 5u);
v = Get(Key(0));
ASSERT_EQ(1007, v.size());
// Lookup the first data block, not in the block cache, so lookup the
// secondary cache. Also not in the secondary cache. After Get, still
// block_1 will not be cached.
ASSERT_EQ(secondary_cache->num_inserts(), 1u);
ASSERT_EQ(secondary_cache->num_lookups(), 6u);
v = Get(Key(0));
ASSERT_EQ(1007, v.size());
// Lookup the first data block, not in the block cache, so lookup the
// secondary cache. Also not in the secondary cache. After Get, still
// block_1 will not be cached.
ASSERT_EQ(secondary_cache->num_inserts(), 1u);
ASSERT_EQ(secondary_cache->num_lookups(), 7u);
Destroy(options);
}
// In this test, the block cache size is set to 5100, after insert 6 KV-pairs
// and flush, there are 5 blocks in this SST file, 2 data blocks and 3 meta
// blocks. block_1 size is 4096 and block_2 size is 2056. The total size
// of the meta blocks are about 900 to 1000. Therefore, we can successfully
// insert and cache block_1 in the block cache (this is the different place
// from TestSecondaryCacheCorrectness1)
TEST_F(DBSecondaryCacheTest, TestSecondaryCacheCorrectness2) {
LRUCacheOptions opts(5100, 0, false, 0.5, nullptr, kDefaultToAdaptiveMutex,
kDontChargeCacheMetadata);
std::shared_ptr<TestSecondaryCache> secondary_cache(
new TestSecondaryCache(2048 * 1024));
opts.secondary_cache = secondary_cache;
std::shared_ptr<Cache> cache = NewLRUCache(opts);
BlockBasedTableOptions table_options;
table_options.block_cache = cache;
table_options.block_size = 4 * 1024;
Options options = GetDefaultOptions();
options.create_if_missing = true;
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
options.paranoid_file_checks = true;
DestroyAndReopen(options);
Random rnd(301);
const int N = 6;
for (int i = 0; i < N; i++) {
std::string p_v = rnd.RandomString(1007);
ASSERT_OK(Put(Key(i), p_v));
}
ASSERT_OK(Flush());
// After Flush is successful, RocksDB does the paranoid check for the new
// SST file. Meta blocks are always cached in the block cache and they
// will not be evicted. When block_2 is cache miss and read out, it is
// inserted to the block cache. Therefore, block_1 is evicted from block
// cache and successfully inserted to the secondary cache. Here are 2
// lookups in the secondary cache for block_1 and block_2.
ASSERT_EQ(secondary_cache->num_inserts(), 1u);
ASSERT_EQ(secondary_cache->num_lookups(), 2u);
Compact("a", "z");
// Compaction will create the iterator to scan the whole file. So all the
// blocks are needed. After Flush, only block_2 is cached in block cache
// and block_1 is in the secondary cache. So when read block_1, it is
// read out from secondary cache and inserted to block cache. At the same
// time, block_2 is inserted to secondary cache. Now, secondary cache has
// both block_1 and block_2. After compaction, block_1 is in the cache.
ASSERT_EQ(secondary_cache->num_inserts(), 2u);
ASSERT_EQ(secondary_cache->num_lookups(), 3u);
std::string v = Get(Key(0));
ASSERT_EQ(1007, v.size());
// This Get needs to access block_1, since block_1 is cached in block cache
// there is no secondary cache lookup.
ASSERT_EQ(secondary_cache->num_inserts(), 2u);
ASSERT_EQ(secondary_cache->num_lookups(), 3u);
v = Get(Key(5));
ASSERT_EQ(1007, v.size());
// This Get needs to access block_2 which is not in the block cache. So
// it will lookup the secondary cache for block_2 and cache it in the
// block_cache.
ASSERT_EQ(secondary_cache->num_inserts(), 2u);
ASSERT_EQ(secondary_cache->num_lookups(), 4u);
v = Get(Key(5));
ASSERT_EQ(1007, v.size());
// This Get needs to access block_2 which is already in the block cache.
// No need to lookup secondary cache.
ASSERT_EQ(secondary_cache->num_inserts(), 2u);
ASSERT_EQ(secondary_cache->num_lookups(), 4u);
v = Get(Key(0));
ASSERT_EQ(1007, v.size());
// This Get needs to access block_1, since block_1 is not in block cache
// there is one secondary cache lookup. Then, block_1 is cached in the
// block cache.
ASSERT_EQ(secondary_cache->num_inserts(), 2u);
ASSERT_EQ(secondary_cache->num_lookups(), 5u);
v = Get(Key(0));
ASSERT_EQ(1007, v.size());
// This Get needs to access block_1, since block_1 is cached in block cache
// there is no secondary cache lookup.
ASSERT_EQ(secondary_cache->num_inserts(), 2u);
ASSERT_EQ(secondary_cache->num_lookups(), 5u);
Destroy(options);
}
// The block cache size is set to 1024*1024, after insert 6 KV-pairs
// and flush, there are 5 blocks in this SST file, 2 data blocks and 3 meta
// blocks. block_1 size is 4096 and block_2 size is 2056. The total size
// of the meta blocks are about 900 to 1000. Therefore, we can successfully
// cache all the blocks in the block cache and there is no secondary cache
// insertion. 2 lookups are needed for the blocks.
TEST_F(DBSecondaryCacheTest, NoSecondaryCacheInsertion) {
LRUCacheOptions opts(1024 * 1024, 0, false, 0.5, nullptr,
kDefaultToAdaptiveMutex, kDontChargeCacheMetadata);
std::shared_ptr<TestSecondaryCache> secondary_cache(
new TestSecondaryCache(2048 * 1024));
opts.secondary_cache = secondary_cache;
std::shared_ptr<Cache> cache = NewLRUCache(opts);
BlockBasedTableOptions table_options;
table_options.block_cache = cache;
table_options.block_size = 4 * 1024;
Options options = GetDefaultOptions();
options.create_if_missing = true;
options.paranoid_file_checks = true;
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
DestroyAndReopen(options);
Random rnd(301);
const int N = 6;
for (int i = 0; i < N; i++) {
std::string p_v = rnd.RandomString(1000);
ASSERT_OK(Put(Key(i), p_v));
}
ASSERT_OK(Flush());
// After Flush is successful, RocksDB does the paranoid check for the new
// SST file. Meta blocks are always cached in the block cache and they
// will not be evicted. Now, the block cache is large enough, it caches
// both block_1 and block_2. When first time read block_1 and block_2
// there are cache misses. So 2 secondary cache lookups are needed for
// the 2 blocks
ASSERT_EQ(secondary_cache->num_inserts(), 0u);
ASSERT_EQ(secondary_cache->num_lookups(), 2u);
Compact("a", "z");
// Compaction will iterate the whole SST file. Since all the data blocks
// are in the block cache. No need to lookup the secondary cache.
ASSERT_EQ(secondary_cache->num_inserts(), 0u);
ASSERT_EQ(secondary_cache->num_lookups(), 2u);
std::string v = Get(Key(0));
ASSERT_EQ(1000, v.size());
// Since the block cache is large enough, all the blocks are cached. We
// do not need to look up the secondary cache.
ASSERT_EQ(secondary_cache->num_inserts(), 0u);
ASSERT_EQ(secondary_cache->num_lookups(), 2u);
Destroy(options);
}
TEST_F(DBSecondaryCacheTest, SecondaryCacheIntensiveTesting) {
LRUCacheOptions opts(8 * 1024, 0, false, 0.5, nullptr,
kDefaultToAdaptiveMutex, kDontChargeCacheMetadata);
std::shared_ptr<TestSecondaryCache> secondary_cache(
new TestSecondaryCache(2048 * 1024));
opts.secondary_cache = secondary_cache;
std::shared_ptr<Cache> cache = NewLRUCache(opts);
BlockBasedTableOptions table_options;
table_options.block_cache = cache;
table_options.block_size = 4 * 1024;
Options options = GetDefaultOptions();
options.create_if_missing = true;
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
DestroyAndReopen(options);
Random rnd(301);
const int N = 256;
for (int i = 0; i < N; i++) {
std::string p_v = rnd.RandomString(1000);
ASSERT_OK(Put(Key(i), p_v));
}
ASSERT_OK(Flush());
Compact("a", "z");
Random r_index(47);
std::string v;
for (int i = 0; i < 1000; i++) {
uint32_t key_i = r_index.Next() % N;
v = Get(Key(key_i));
}
// We have over 200 data blocks, so there will be multiple insertions
// and lookups.
ASSERT_GE(secondary_cache->num_inserts(), 1u);
ASSERT_GE(secondary_cache->num_lookups(), 1u);
Destroy(options);
}
// In this test, the block cache size is set to 4096, after insert 6 KV-pairs
// and flush, there are 5 blocks in this SST file, 2 data blocks and 3 meta
// blocks. block_1 size is 4096 and block_2 size is 2056. The total size
// of the meta blocks are about 900 to 1000. Therefore, in any situation,
// if we try to insert block_1 into the block cache, it will always fail. Only
// block_2 will be successfully inserted into the block cache.
TEST_F(DBSecondaryCacheTest, SecondaryCacheFailureTest) {
LRUCacheOptions opts(4 * 1024, 0, false, 0.5, nullptr,
kDefaultToAdaptiveMutex, kDontChargeCacheMetadata);
std::shared_ptr<TestSecondaryCache> secondary_cache(
new TestSecondaryCache(2048 * 1024));
opts.secondary_cache = secondary_cache;
std::shared_ptr<Cache> cache = NewLRUCache(opts);
BlockBasedTableOptions table_options;
table_options.block_cache = cache;
table_options.block_size = 4 * 1024;
Options options = GetDefaultOptions();
options.create_if_missing = true;
options.paranoid_file_checks = true;
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
DestroyAndReopen(options);
Random rnd(301);
const int N = 6;
for (int i = 0; i < N; i++) {
std::string p_v = rnd.RandomString(1007);
ASSERT_OK(Put(Key(i), p_v));
}
ASSERT_OK(Flush());
// After Flush is successful, RocksDB does the paranoid check for the new
// SST file. Meta blocks are always cached in the block cache and they
// will not be evicted. When block_2 is cache miss and read out, it is
// inserted to the block cache. Note that, block_1 is never successfully
// inserted to the block cache. Here are 2 lookups in the secondary cache
// for block_1 and block_2
ASSERT_EQ(secondary_cache->num_inserts(), 0u);
ASSERT_EQ(secondary_cache->num_lookups(), 2u);
// Fail the insertion, in LRU cache, the secondary insertion returned status
// is not checked, therefore, the DB will not be influenced.
secondary_cache->InjectFailure();
Compact("a", "z");
// Compaction will create the iterator to scan the whole file. So all the
// blocks are needed. Meta blocks are always cached. When block_1 is read
// out, block_2 is evicted from block cache and inserted to secondary
// cache.
ASSERT_EQ(secondary_cache->num_inserts(), 0u);
ASSERT_EQ(secondary_cache->num_lookups(), 3u);
std::string v = Get(Key(0));
ASSERT_EQ(1007, v.size());
// The first data block is not in the cache, similarly, trigger the block
// cache Lookup and secondary cache lookup for block_1. But block_1 will not
// be inserted successfully due to the size. Currently, cache only has
// the meta blocks.
ASSERT_EQ(secondary_cache->num_inserts(), 0u);
ASSERT_EQ(secondary_cache->num_lookups(), 4u);
v = Get(Key(5));
ASSERT_EQ(1007, v.size());
// The second data block is not in the cache, similarly, trigger the block
// cache Lookup and secondary cache lookup for block_2 and block_2 is found
// in the secondary cache. Now block cache has block_2
ASSERT_EQ(secondary_cache->num_inserts(), 0u);
ASSERT_EQ(secondary_cache->num_lookups(), 5u);
v = Get(Key(5));
ASSERT_EQ(1007, v.size());
// block_2 is in the block cache. There is a block cache hit. No need to
// lookup or insert the secondary cache.
ASSERT_EQ(secondary_cache->num_inserts(), 0u);
ASSERT_EQ(secondary_cache->num_lookups(), 5u);
v = Get(Key(0));
ASSERT_EQ(1007, v.size());
// Lookup the first data block, not in the block cache, so lookup the
// secondary cache. Also not in the secondary cache. After Get, still
// block_1 will not be cached.
ASSERT_EQ(secondary_cache->num_inserts(), 0u);
ASSERT_EQ(secondary_cache->num_lookups(), 6u);
v = Get(Key(0));
ASSERT_EQ(1007, v.size());
// Lookup the first data block, not in the block cache, so lookup the
// secondary cache. Also not in the secondary cache. After Get, still
// block_1 will not be cached.
ASSERT_EQ(secondary_cache->num_inserts(), 0u);
ASSERT_EQ(secondary_cache->num_lookups(), 7u);
secondary_cache->ResetInjectFailure();
Destroy(options);
}
} // namespace ROCKSDB_NAMESPACE } // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) { int main(int argc, char** argv) {

@ -460,15 +460,17 @@ class MockCache : public LRUCache {
} }
using ShardedCache::Insert; using ShardedCache::Insert;
Status Insert(const Slice& key, void* value, size_t charge,
void (*deleter)(const Slice& key, void* value), Handle** handle, Status Insert(const Slice& key, void* value,
Priority priority) override { const Cache::CacheItemHelper* helper_cb, size_t charge,
Handle** handle, Priority priority) override {
DeleterFn delete_cb = helper_cb->del_cb;
if (priority == Priority::LOW) { if (priority == Priority::LOW) {
low_pri_insert_count++; low_pri_insert_count++;
} else { } else {
high_pri_insert_count++; high_pri_insert_count++;
} }
return LRUCache::Insert(key, value, charge, deleter, handle, priority); return LRUCache::Insert(key, value, charge, delete_cb, handle, priority);
} }
}; };

@ -176,8 +176,8 @@ class Cache {
// data into a buffer. The secondary cache may decide to not store it in a // data into a buffer. The secondary cache may decide to not store it in a
// contiguous buffer, in which case this callback will be called multiple // contiguous buffer, in which case this callback will be called multiple
// times with increasing offset // times with increasing offset
using SaveToCallback = Status (*)(void* obj, size_t offset, size_t size, using SaveToCallback = Status (*)(void* from_obj, size_t from_offset,
void* out); size_t length, void* out);
// A function pointer type for custom destruction of an entry's // A function pointer type for custom destruction of an entry's
// value. The Cache is responsible for copying and reclaiming space // value. The Cache is responsible for copying and reclaiming space

@ -354,8 +354,11 @@ void BlockBasedTable::UpdateCacheInsertionMetrics(BlockType block_type,
Cache::Handle* BlockBasedTable::GetEntryFromCache( Cache::Handle* BlockBasedTable::GetEntryFromCache(
Cache* block_cache, const Slice& key, BlockType block_type, Cache* block_cache, const Slice& key, BlockType block_type,
GetContext* get_context) const { GetContext* get_context, const Cache::CacheItemHelper* cache_helper,
auto cache_handle = block_cache->Lookup(key, rep_->ioptions.stats); const Cache::CreateCallback& create_cb, Cache::Priority priority) const {
auto cache_handle =
block_cache->Lookup(key, cache_helper, create_cb, priority, true,
rep_->ioptions.statistics.get());
if (cache_handle != nullptr) { if (cache_handle != nullptr) {
UpdateCacheHitMetrics(block_type, get_context, UpdateCacheHitMetrics(block_type, get_context,
@ -1107,15 +1110,30 @@ Status BlockBasedTable::GetDataBlockFromCache(
: 0; : 0;
assert(block); assert(block);
assert(block->IsEmpty()); assert(block->IsEmpty());
const Cache::Priority priority =
rep_->table_options.cache_index_and_filter_blocks_with_high_priority &&
(block_type == BlockType::kFilter ||
block_type == BlockType::kCompressionDictionary ||
block_type == BlockType::kIndex)
? Cache::Priority::HIGH
: Cache::Priority::LOW;
Status s; Status s;
BlockContents* compressed_block = nullptr; BlockContents* compressed_block = nullptr;
Cache::Handle* block_cache_compressed_handle = nullptr; Cache::Handle* block_cache_compressed_handle = nullptr;
Statistics* statistics = rep_->ioptions.statistics.get();
bool using_zstd = rep_->blocks_definitely_zstd_compressed;
const FilterPolicy* filter_policy = rep_->filter_policy;
Cache::CreateCallback create_cb =
GetCreateCallback(read_amp_bytes_per_bit, statistics, using_zstd,
filter_policy, *block->GetValue());
// Lookup uncompressed cache first // Lookup uncompressed cache first
if (block_cache != nullptr) { if (block_cache != nullptr) {
auto cache_handle = GetEntryFromCache(block_cache, block_cache_key, auto cache_handle = GetEntryFromCache(
block_type, get_context); block_cache, block_cache_key, block_type, get_context,
BlocklikeTraits<TBlocklike>::GetCacheItemHelper(block_type), create_cb,
priority);
if (cache_handle != nullptr) { if (cache_handle != nullptr) {
block->SetCachedValue( block->SetCachedValue(
reinterpret_cast<TBlocklike*>(block_cache->Value(cache_handle)), reinterpret_cast<TBlocklike*>(block_cache->Value(cache_handle)),
@ -1132,10 +1150,13 @@ Status BlockBasedTable::GetDataBlockFromCache(
} }
assert(!compressed_block_cache_key.empty()); assert(!compressed_block_cache_key.empty());
block_cache_compressed_handle = BlockContents contents;
block_cache_compressed->Lookup(compressed_block_cache_key); Cache::CreateCallback create_cb_special = GetCreateCallback(
read_amp_bytes_per_bit, statistics, using_zstd, filter_policy, contents);
Statistics* statistics = rep_->ioptions.stats; block_cache_compressed_handle = block_cache_compressed->Lookup(
compressed_block_cache_key,
BlocklikeTraits<BlockContents>::GetCacheItemHelper(block_type),
create_cb_special, priority, true);
// if we found in the compressed cache, then uncompress and insert into // if we found in the compressed cache, then uncompress and insert into
// uncompressed cache // uncompressed cache
@ -1152,7 +1173,6 @@ Status BlockBasedTable::GetDataBlockFromCache(
assert(compression_type != kNoCompression); assert(compression_type != kNoCompression);
// Retrieve the uncompressed contents into a new buffer // Retrieve the uncompressed contents into a new buffer
BlockContents contents;
UncompressionContext context(compression_type); UncompressionContext context(compression_type);
UncompressionInfo info(context, uncompression_dict, compression_type); UncompressionInfo info(context, uncompression_dict, compression_type);
s = UncompressBlockContents( s = UncompressBlockContents(
@ -1160,7 +1180,8 @@ Status BlockBasedTable::GetDataBlockFromCache(
&contents, rep_->table_options.format_version, rep_->ioptions, &contents, rep_->table_options.format_version, rep_->ioptions,
GetMemoryAllocator(rep_->table_options)); GetMemoryAllocator(rep_->table_options));
// Insert uncompressed block into block cache // Insert uncompressed block into block cache, the priority is based on the
// data block type.
if (s.ok()) { if (s.ok()) {
std::unique_ptr<TBlocklike> block_holder( std::unique_ptr<TBlocklike> block_holder(
BlocklikeTraits<TBlocklike>::Create( BlocklikeTraits<TBlocklike>::Create(
@ -1172,9 +1193,10 @@ Status BlockBasedTable::GetDataBlockFromCache(
read_options.fill_cache) { read_options.fill_cache) {
size_t charge = block_holder->ApproximateMemoryUsage(); size_t charge = block_holder->ApproximateMemoryUsage();
Cache::Handle* cache_handle = nullptr; Cache::Handle* cache_handle = nullptr;
auto deleter = BlocklikeTraits<TBlocklike>::GetDeleter(block_type); s = block_cache->Insert(
s = block_cache->Insert(block_cache_key, block_holder.get(), charge, block_cache_key, block_holder.get(),
deleter, &cache_handle); BlocklikeTraits<TBlocklike>::GetCacheItemHelper(block_type), charge,
&cache_handle, priority);
if (s.ok()) { if (s.ok()) {
assert(cache_handle != nullptr); assert(cache_handle != nullptr);
block->SetCachedValue(block_holder.release(), block_cache, block->SetCachedValue(block_holder.release(), block_cache,
@ -1261,10 +1283,10 @@ Status BlockBasedTable::PutDataBlockToCache(
// an object in the stack. // an object in the stack.
BlockContents* block_cont_for_comp_cache = BlockContents* block_cont_for_comp_cache =
new BlockContents(std::move(*raw_block_contents)); new BlockContents(std::move(*raw_block_contents));
auto deleter = BlocklikeTraits<BlockContents>::GetDeleter(block_type);
s = block_cache_compressed->Insert( s = block_cache_compressed->Insert(
compressed_block_cache_key, block_cont_for_comp_cache, compressed_block_cache_key, block_cont_for_comp_cache,
block_cont_for_comp_cache->ApproximateMemoryUsage(), deleter); BlocklikeTraits<BlockContents>::GetCacheItemHelper(block_type),
block_cont_for_comp_cache->ApproximateMemoryUsage());
if (s.ok()) { if (s.ok()) {
// Avoid the following code to delete this cached block. // Avoid the following code to delete this cached block.
RecordTick(statistics, BLOCK_CACHE_COMPRESSED_ADD); RecordTick(statistics, BLOCK_CACHE_COMPRESSED_ADD);
@ -1278,9 +1300,10 @@ Status BlockBasedTable::PutDataBlockToCache(
if (block_cache != nullptr && block_holder->own_bytes()) { if (block_cache != nullptr && block_holder->own_bytes()) {
size_t charge = block_holder->ApproximateMemoryUsage(); size_t charge = block_holder->ApproximateMemoryUsage();
Cache::Handle* cache_handle = nullptr; Cache::Handle* cache_handle = nullptr;
auto deleter = BlocklikeTraits<TBlocklike>::GetDeleter(block_type); s = block_cache->Insert(
s = block_cache->Insert(block_cache_key, block_holder.get(), charge, block_cache_key, block_holder.get(),
deleter, &cache_handle, priority); BlocklikeTraits<TBlocklike>::GetCacheItemHelper(block_type), charge,
&cache_handle, priority);
if (s.ok()) { if (s.ok()) {
assert(cache_handle != nullptr); assert(cache_handle != nullptr);
cached_block->SetCachedValue(block_holder.release(), block_cache, cached_block->SetCachedValue(block_holder.release(), block_cache,

@ -270,7 +270,10 @@ class BlockBasedTable : public TableReader {
bool redundant) const; bool redundant) const;
Cache::Handle* GetEntryFromCache(Cache* block_cache, const Slice& key, Cache::Handle* GetEntryFromCache(Cache* block_cache, const Slice& key,
BlockType block_type, BlockType block_type,
GetContext* get_context) const; GetContext* get_context,
const Cache::CacheItemHelper* cache_helper,
const Cache::CreateCallback& create_cb,
Cache::Priority priority) const;
// Either Block::NewDataIterator() or Block::NewIndexIterator(). // Either Block::NewDataIterator() or Block::NewIndexIterator().
template <typename TBlockIter> template <typename TBlockIter>

@ -17,6 +17,29 @@ namespace ROCKSDB_NAMESPACE {
template <typename TBlocklike> template <typename TBlocklike>
class BlocklikeTraits; class BlocklikeTraits;
template <typename T, CacheEntryRole R>
Cache::CacheItemHelper* GetCacheItemHelperForRole();
template <typename TBlocklike>
Cache::CreateCallback GetCreateCallback(size_t read_amp_bytes_per_bit,
Statistics* statistics, bool using_zstd,
const FilterPolicy* filter_policy,
const TBlocklike& /*block*/) {
return [read_amp_bytes_per_bit, statistics, using_zstd, filter_policy](
void* buf, size_t size, void** out_obj, size_t* charge) -> Status {
assert(buf != nullptr);
std::unique_ptr<char[]> buf_data(new char[size]());
memcpy(buf_data.get(), buf, size);
BlockContents bc = BlockContents(std::move(buf_data), size);
TBlocklike* ucd_ptr = BlocklikeTraits<TBlocklike>::Create(
std::move(bc), read_amp_bytes_per_bit, statistics, using_zstd,
filter_policy);
*out_obj = reinterpret_cast<void*>(ucd_ptr);
*charge = size;
return Status::OK();
};
}
template <> template <>
class BlocklikeTraits<BlockContents> { class BlocklikeTraits<BlockContents> {
public: public:
@ -32,14 +55,31 @@ class BlocklikeTraits<BlockContents> {
return 0; return 0;
} }
static Cache::DeleterFn GetDeleter(BlockType block_type) { static size_t SizeCallback(void* obj) {
assert(obj != nullptr);
BlockContents* ptr = static_cast<BlockContents*>(obj);
return ptr->data.size();
}
static Status SaveToCallback(void* from_obj, size_t from_offset,
size_t length, void* out) {
assert(from_obj != nullptr);
BlockContents* ptr = static_cast<BlockContents*>(from_obj);
const char* buf = ptr->data.data();
assert(length == ptr->data.size());
(void)from_offset;
memcpy(out, buf, length);
return Status::OK();
}
static Cache::CacheItemHelper* GetCacheItemHelper(BlockType block_type) {
if (block_type == BlockType::kFilter) { if (block_type == BlockType::kFilter) {
return GetCacheEntryDeleterForRole< return GetCacheItemHelperForRole<
BlockContents, CacheEntryRole::kDeprecatedFilterBlock>(); BlockContents, CacheEntryRole::kDeprecatedFilterBlock>();
} else { } else {
// E.g. compressed cache // E.g. compressed cache
return GetCacheEntryDeleterForRole<BlockContents, return GetCacheItemHelperForRole<BlockContents,
CacheEntryRole::kOtherBlock>(); CacheEntryRole::kOtherBlock>();
} }
} }
}; };
@ -59,11 +99,28 @@ class BlocklikeTraits<ParsedFullFilterBlock> {
return 0; return 0;
} }
static Cache::DeleterFn GetDeleter(BlockType block_type) { static size_t SizeCallback(void* obj) {
assert(obj != nullptr);
ParsedFullFilterBlock* ptr = static_cast<ParsedFullFilterBlock*>(obj);
return ptr->GetBlockContentsData().size();
}
static Status SaveToCallback(void* from_obj, size_t from_offset,
size_t length, void* out) {
assert(from_obj != nullptr);
ParsedFullFilterBlock* ptr = static_cast<ParsedFullFilterBlock*>(from_obj);
const char* buf = ptr->GetBlockContentsData().data();
assert(length == ptr->GetBlockContentsData().size());
(void)from_offset;
memcpy(out, buf, length);
return Status::OK();
}
static Cache::CacheItemHelper* GetCacheItemHelper(BlockType block_type) {
(void)block_type; (void)block_type;
assert(block_type == BlockType::kFilter); assert(block_type == BlockType::kFilter);
return GetCacheEntryDeleterForRole<ParsedFullFilterBlock, return GetCacheItemHelperForRole<ParsedFullFilterBlock,
CacheEntryRole::kFilterBlock>(); CacheEntryRole::kFilterBlock>();
} }
}; };
@ -80,23 +137,38 @@ class BlocklikeTraits<Block> {
return block.NumRestarts(); return block.NumRestarts();
} }
static Cache::DeleterFn GetDeleter(BlockType block_type) { static size_t SizeCallback(void* obj) {
assert(obj != nullptr);
Block* ptr = static_cast<Block*>(obj);
return ptr->size();
}
static Status SaveToCallback(void* from_obj, size_t from_offset,
size_t length, void* out) {
assert(from_obj != nullptr);
Block* ptr = static_cast<Block*>(from_obj);
const char* buf = ptr->data();
assert(length == ptr->size());
(void)from_offset;
memcpy(out, buf, length);
return Status::OK();
}
static Cache::CacheItemHelper* GetCacheItemHelper(BlockType block_type) {
switch (block_type) { switch (block_type) {
case BlockType::kData: case BlockType::kData:
return GetCacheEntryDeleterForRole<Block, CacheEntryRole::kDataBlock>(); return GetCacheItemHelperForRole<Block, CacheEntryRole::kDataBlock>();
case BlockType::kIndex: case BlockType::kIndex:
return GetCacheEntryDeleterForRole<Block, return GetCacheItemHelperForRole<Block, CacheEntryRole::kIndexBlock>();
CacheEntryRole::kIndexBlock>();
case BlockType::kFilter: case BlockType::kFilter:
return GetCacheEntryDeleterForRole<Block, return GetCacheItemHelperForRole<Block,
CacheEntryRole::kFilterMetaBlock>(); CacheEntryRole::kFilterMetaBlock>();
default: default:
// Not a recognized combination // Not a recognized combination
assert(false); assert(false);
FALLTHROUGH_INTENDED; FALLTHROUGH_INTENDED;
case BlockType::kRangeDeletion: case BlockType::kRangeDeletion:
return GetCacheEntryDeleterForRole<Block, return GetCacheItemHelperForRole<Block, CacheEntryRole::kOtherBlock>();
CacheEntryRole::kOtherBlock>();
} }
} }
}; };
@ -117,12 +189,38 @@ class BlocklikeTraits<UncompressionDict> {
return 0; return 0;
} }
static Cache::DeleterFn GetDeleter(BlockType block_type) { static size_t SizeCallback(void* obj) {
assert(obj != nullptr);
UncompressionDict* ptr = static_cast<UncompressionDict*>(obj);
return ptr->slice_.size();
}
static Status SaveToCallback(void* from_obj, size_t from_offset,
size_t length, void* out) {
assert(from_obj != nullptr);
UncompressionDict* ptr = static_cast<UncompressionDict*>(from_obj);
const char* buf = ptr->slice_.data();
assert(length == ptr->slice_.size());
(void)from_offset;
memcpy(out, buf, length);
return Status::OK();
}
static Cache::CacheItemHelper* GetCacheItemHelper(BlockType block_type) {
(void)block_type; (void)block_type;
assert(block_type == BlockType::kCompressionDictionary); assert(block_type == BlockType::kCompressionDictionary);
return GetCacheEntryDeleterForRole<UncompressionDict, return GetCacheItemHelperForRole<UncompressionDict,
CacheEntryRole::kOtherBlock>(); CacheEntryRole::kOtherBlock>();
} }
}; };
// Get an CacheItemHelper pointer for value type T and role R.
template <typename T, CacheEntryRole R>
Cache::CacheItemHelper* GetCacheItemHelperForRole() {
static Cache::CacheItemHelper cache_helper(
BlocklikeTraits<T>::SizeCallback, BlocklikeTraits<T>::SaveToCallback,
GetCacheEntryDeleterForRole<T, R>());
return &cache_helper;
}
} // namespace ROCKSDB_NAMESPACE } // namespace ROCKSDB_NAMESPACE

@ -32,6 +32,8 @@ class ParsedFullFilterBlock {
bool own_bytes() const { return block_contents_.own_bytes(); } bool own_bytes() const { return block_contents_.own_bytes(); }
const Slice GetBlockContentsData() const { return block_contents_.data; }
private: private:
BlockContents block_contents_; BlockContents block_contents_;
std::unique_ptr<FilterBitsReader> filter_bits_reader_; std::unique_ptr<FilterBitsReader> filter_bits_reader_;

Loading…
Cancel
Save