Prevent double caching in the compressed secondary cache (#9747)

Summary:
###  **Summary:**
When both LRU Cache and CompressedSecondaryCache are configured together, there possibly are some data blocks double cached.

**Changes include:**
1. Update IS_PROMOTED to IS_IN_SECONDARY_CACHE to prevent confusions.
2. This PR updates SecondaryCacheResultHandle and use IsErasedFromSecondaryCache to determine whether the handle is erased in the secondary cache. Then, the caller can determine whether to SetIsInSecondaryCache().
3. Rename LRUSecondaryCache to CompressedSecondaryCache.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/9747

Test Plan:
**Test Scripts:**
1. Populate a DB. The on disk footprint is 482 MB. The data is set to be 50% compressible, so the total decompressed size is expected to be 964 MB.
./db_bench --benchmarks=fillrandom --num=10000000 -db=/db_bench_1

2. overwrite it to a stable state:
./db_bench --benchmarks=overwrite,stats --num=10000000 -use_existing_db -duration=10 --benchmark_write_rate_limit=2000000 -db=/db_bench_1

4. Run read tests with diffeernt cache setting:

T1:
./db_bench --benchmarks=seekrandom,stats --threads=16 --num=10000000 -use_existing_db -duration=120 --benchmark_write_rate_limit=52000000 -use_direct_reads --cache_size=520000000  --statistics -db=/db_bench_1

T2:
./db_bench --benchmarks=seekrandom,stats --threads=16 --num=10000000 -use_existing_db -duration=120 --benchmark_write_rate_limit=52000000 -use_direct_reads --cache_size=320000000 -compressed_secondary_cache_size=400000000 --statistics -use_compressed_secondary_cache -db=/db_bench_1

T3:
./db_bench --benchmarks=seekrandom,stats --threads=16 --num=10000000 -use_existing_db -duration=120 --benchmark_write_rate_limit=52000000 -use_direct_reads --cache_size=520000000 -compressed_secondary_cache_size=400000000 --statistics -use_compressed_secondary_cache -db=/db_bench_1

T4:
./db_bench --benchmarks=seekrandom,stats --threads=16 --num=10000000 -use_existing_db -duration=120 --benchmark_write_rate_limit=52000000 -use_direct_reads --cache_size=20000000 -compressed_secondary_cache_size=500000000 --statistics -use_compressed_secondary_cache -db=/db_bench_1

**Before this PR**
| Cache Size | Compressed Secondary Cache Size | Cache Hit Rate |
|------------|-------------------------------------|----------------|
|520 MB | 0 MB | 85.5% |
|320 MB | 400 MB | 96.2% |
|520 MB | 400 MB | 98.3% |
|20 MB | 500 MB | 98.8% |

**Before this PR**
| Cache Size | Compressed Secondary Cache Size | Cache Hit Rate |
|------------|-------------------------------------|----------------|
|520 MB | 0 MB | 85.5% |
|320 MB | 400 MB | 99.9% |
|520 MB | 400 MB | 99.9% |
|20 MB | 500 MB | 99.2% |

Reviewed By: anand1976

Differential Revision: D35117499

Pulled By: gitbw95

fbshipit-source-id: ea2657749fc13efebe91a8a1b56bc61d6a224a12
main
gitbw95 2 years ago committed by Facebook GitHub Bot
parent f3bcac39a6
commit f241d082b6
  1. 4
      CMakeLists.txt
  2. 2
      Makefile
  3. 16
      TARGETS
  4. 39
      cache/compressed_secondary_cache.cc
  5. 33
      cache/compressed_secondary_cache.h
  6. 256
      cache/compressed_secondary_cache_test.cc
  7. 10
      cache/lru_cache.cc
  8. 19
      cache/lru_cache.h
  9. 100
      cache/lru_cache_test.cc
  10. 12
      include/rocksdb/cache.h
  11. 6
      include/rocksdb/secondary_cache.h
  12. 3
      options/customizable_test.cc
  13. 4
      src.mk
  14. 43
      tools/db_bench_tool.cc
  15. 4
      utilities/fault_injection_secondary_cache.cc
  16. 4
      utilities/fault_injection_secondary_cache.h

@ -628,8 +628,8 @@ set(SOURCES
cache/cache_key.cc
cache/cache_reservation_manager.cc
cache/clock_cache.cc
cache/compressed_secondary_cache.cc
cache/lru_cache.cc
cache/lru_secondary_cache.cc
cache/sharded_cache.cc
db/arena_wrapped_db_iter.cc
db/blob/blob_fetcher.cc
@ -1157,8 +1157,8 @@ if(WITH_TESTS)
list(APPEND TESTS
cache/cache_reservation_manager_test.cc
cache/cache_test.cc
cache/compressed_secondary_cache_test.cc
cache/lru_cache_test.cc
cache/lru_secondary_cache_test.cc
db/blob/blob_counting_iterator_test.cc
db/blob/blob_file_addition_test.cc
db/blob/blob_file_builder_test.cc

@ -1842,7 +1842,7 @@ statistics_test: $(OBJ_DIR)/monitoring/statistics_test.o $(TEST_LIBRARY) $(LIBRA
stats_history_test: $(OBJ_DIR)/monitoring/stats_history_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)
lru_secondary_cache_test: $(OBJ_DIR)/cache/lru_secondary_cache_test.o $(TEST_LIBRARY) $(LIBRARY)
compressed_secondary_cache_test: $(OBJ_DIR)/cache/compressed_secondary_cache_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)
lru_cache_test: $(OBJ_DIR)/cache/lru_cache_test.o $(TEST_LIBRARY) $(LIBRARY)

@ -14,8 +14,8 @@ cpp_library_wrapper(name="rocksdb_lib", srcs=[
"cache/cache_key.cc",
"cache/cache_reservation_manager.cc",
"cache/clock_cache.cc",
"cache/compressed_secondary_cache.cc",
"cache/lru_cache.cc",
"cache/lru_secondary_cache.cc",
"cache/sharded_cache.cc",
"db/arena_wrapped_db_iter.cc",
"db/blob/blob_fetcher.cc",
@ -332,8 +332,8 @@ cpp_library_wrapper(name="rocksdb_whole_archive_lib", srcs=[
"cache/cache_key.cc",
"cache/cache_reservation_manager.cc",
"cache/clock_cache.cc",
"cache/compressed_secondary_cache.cc",
"cache/lru_cache.cc",
"cache/lru_secondary_cache.cc",
"cache/sharded_cache.cc",
"db/arena_wrapped_db_iter.cc",
"db/blob/blob_fetcher.cc",
@ -4984,6 +4984,12 @@ cpp_unittest_wrapper(name="comparator_db_test",
extra_compiler_flags=[])
cpp_unittest_wrapper(name="compressed_secondary_cache_test",
srcs=["cache/compressed_secondary_cache_test.cc"],
deps=[":rocksdb_test_lib"],
extra_compiler_flags=[])
cpp_unittest_wrapper(name="configurable_test",
srcs=["options/configurable_test.cc"],
deps=[":rocksdb_test_lib"],
@ -5478,12 +5484,6 @@ cpp_unittest_wrapper(name="lru_cache_test",
extra_compiler_flags=[])
cpp_unittest_wrapper(name="lru_secondary_cache_test",
srcs=["cache/lru_secondary_cache_test.cc"],
deps=[":rocksdb_test_lib"],
extra_compiler_flags=[])
cpp_unittest_wrapper(name="manual_compaction_test",
srcs=["db/manual_compaction_test.cc"],
deps=[":rocksdb_test_lib"],

@ -3,7 +3,7 @@
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "cache/lru_secondary_cache.h"
#include "cache/compressed_secondary_cache.h"
#include <memory>
@ -22,7 +22,7 @@ void DeletionCallback(const Slice& /*key*/, void* obj) {
} // namespace
LRUSecondaryCache::LRUSecondaryCache(
CompressedSecondaryCache::CompressedSecondaryCache(
size_t capacity, int num_shard_bits, bool strict_capacity_limit,
double high_pri_pool_ratio,
std::shared_ptr<MemoryAllocator> memory_allocator, bool use_adaptive_mutex,
@ -37,11 +37,13 @@ LRUSecondaryCache::LRUSecondaryCache(
use_adaptive_mutex, metadata_charge_policy);
}
LRUSecondaryCache::~LRUSecondaryCache() { cache_.reset(); }
CompressedSecondaryCache::~CompressedSecondaryCache() { cache_.reset(); }
std::unique_ptr<SecondaryCacheResultHandle> LRUSecondaryCache::Lookup(
const Slice& key, const Cache::CreateCallback& create_cb, bool /*wait*/) {
std::unique_ptr<SecondaryCacheResultHandle> CompressedSecondaryCache::Lookup(
const Slice& key, const Cache::CreateCallback& create_cb, bool /*wait*/,
bool& is_in_sec_cache) {
std::unique_ptr<SecondaryCacheResultHandle> handle;
is_in_sec_cache = false;
Cache::Handle* lru_handle = cache_->Lookup(key);
if (lru_handle == nullptr) {
return handle;
@ -69,24 +71,25 @@ std::unique_ptr<SecondaryCacheResultHandle> LRUSecondaryCache::Lookup(
cache_options_.memory_allocator.get());
if (!uncompressed) {
cache_->Release(lru_handle, true);
cache_->Release(lru_handle, /* erase_if_last_ref */ true);
return handle;
}
s = create_cb(uncompressed.get(), uncompressed_size, &value, &charge);
}
if (!s.ok()) {
cache_->Release(lru_handle, true);
cache_->Release(lru_handle, /* erase_if_last_ref */ true);
return handle;
}
handle.reset(new LRUSecondaryCacheResultHandle(value, charge));
cache_->Release(lru_handle);
cache_->Release(lru_handle, /* erase_if_last_ref */ true);
handle.reset(new CompressedSecondaryCacheResultHandle(value, charge));
return handle;
}
Status LRUSecondaryCache::Insert(const Slice& key, void* value,
const Cache::CacheItemHelper* helper) {
Status CompressedSecondaryCache::Insert(const Slice& key, void* value,
const Cache::CacheItemHelper* helper) {
size_t size = (*helper->size_cb)(value);
CacheAllocationPtr ptr =
AllocateBlock(size, cache_options_.memory_allocator.get());
@ -125,9 +128,9 @@ Status LRUSecondaryCache::Insert(const Slice& key, void* value,
return cache_->Insert(key, buf, size, DeletionCallback);
}
void LRUSecondaryCache::Erase(const Slice& key) { cache_->Erase(key); }
void CompressedSecondaryCache::Erase(const Slice& key) { cache_->Erase(key); }
std::string LRUSecondaryCache::GetPrintableOptions() const {
std::string CompressedSecondaryCache::GetPrintableOptions() const {
std::string ret;
ret.reserve(20000);
const int kBufferSize = 200;
@ -142,23 +145,23 @@ std::string LRUSecondaryCache::GetPrintableOptions() const {
return ret;
}
std::shared_ptr<SecondaryCache> NewLRUSecondaryCache(
std::shared_ptr<SecondaryCache> NewCompressedSecondaryCache(
size_t capacity, int num_shard_bits, bool strict_capacity_limit,
double high_pri_pool_ratio,
std::shared_ptr<MemoryAllocator> memory_allocator, bool use_adaptive_mutex,
CacheMetadataChargePolicy metadata_charge_policy,
CompressionType compression_type, uint32_t compress_format_version) {
return std::make_shared<LRUSecondaryCache>(
return std::make_shared<CompressedSecondaryCache>(
capacity, num_shard_bits, strict_capacity_limit, high_pri_pool_ratio,
memory_allocator, use_adaptive_mutex, metadata_charge_policy,
compression_type, compress_format_version);
}
std::shared_ptr<SecondaryCache> NewLRUSecondaryCache(
const LRUSecondaryCacheOptions& opts) {
std::shared_ptr<SecondaryCache> NewCompressedSecondaryCache(
const CompressedSecondaryCacheOptions& opts) {
// The secondary_cache is disabled for this LRUCache instance.
assert(opts.secondary_cache == nullptr);
return NewLRUSecondaryCache(
return NewCompressedSecondaryCache(
opts.capacity, opts.num_shard_bits, opts.strict_capacity_limit,
opts.high_pri_pool_ratio, opts.memory_allocator, opts.use_adaptive_mutex,
opts.metadata_charge_policy, opts.compression_type,

@ -16,15 +16,16 @@
namespace ROCKSDB_NAMESPACE {
class LRUSecondaryCacheResultHandle : public SecondaryCacheResultHandle {
class CompressedSecondaryCacheResultHandle : public SecondaryCacheResultHandle {
public:
LRUSecondaryCacheResultHandle(void* value, size_t size)
CompressedSecondaryCacheResultHandle(void* value, size_t size)
: value_(value), size_(size) {}
virtual ~LRUSecondaryCacheResultHandle() override = default;
virtual ~CompressedSecondaryCacheResultHandle() override = default;
LRUSecondaryCacheResultHandle(const LRUSecondaryCacheResultHandle&) = delete;
LRUSecondaryCacheResultHandle& operator=(
const LRUSecondaryCacheResultHandle&) = delete;
CompressedSecondaryCacheResultHandle(
const CompressedSecondaryCacheResultHandle&) = delete;
CompressedSecondaryCacheResultHandle& operator=(
const CompressedSecondaryCacheResultHandle&) = delete;
bool IsReady() override { return true; }
@ -39,19 +40,19 @@ class LRUSecondaryCacheResultHandle : public SecondaryCacheResultHandle {
size_t size_;
};
// The LRUSecondaryCache is a concrete implementation of
// The CompressedSecondaryCache is a concrete implementation of
// rocksdb::SecondaryCache.
//
// Users can also cast a pointer to it and call methods on
// it directly, especially custom methods that may be added
// in the future. For example -
// std::unique_ptr<rocksdb::SecondaryCache> cache =
// NewLRUSecondaryCache(opts);
// static_cast<LRUSecondaryCache*>(cache.get())->Erase(key);
// NewCompressedSecondaryCache(opts);
// static_cast<CompressedSecondaryCache*>(cache.get())->Erase(key);
class LRUSecondaryCache : public SecondaryCache {
class CompressedSecondaryCache : public SecondaryCache {
public:
LRUSecondaryCache(
CompressedSecondaryCache(
size_t capacity, int num_shard_bits, bool strict_capacity_limit,
double high_pri_pool_ratio,
std::shared_ptr<MemoryAllocator> memory_allocator = nullptr,
@ -60,16 +61,16 @@ class LRUSecondaryCache : public SecondaryCache {
kDontChargeCacheMetadata,
CompressionType compression_type = CompressionType::kLZ4Compression,
uint32_t compress_format_version = 2);
virtual ~LRUSecondaryCache() override;
virtual ~CompressedSecondaryCache() override;
const char* Name() const override { return "LRUSecondaryCache"; }
const char* Name() const override { return "CompressedSecondaryCache"; }
Status Insert(const Slice& key, void* value,
const Cache::CacheItemHelper* helper) override;
std::unique_ptr<SecondaryCacheResultHandle> Lookup(
const Slice& key, const Cache::CreateCallback& create_cb,
bool /*wait*/) override;
const Slice& key, const Cache::CreateCallback& create_cb, bool /*wait*/,
bool& is_in_sec_cache) override;
void Erase(const Slice& key) override;
@ -79,7 +80,7 @@ class LRUSecondaryCache : public SecondaryCache {
private:
std::shared_ptr<Cache> cache_;
LRUSecondaryCacheOptions cache_options_;
CompressedSecondaryCacheOptions cache_options_;
};
} // namespace ROCKSDB_NAMESPACE

@ -3,7 +3,7 @@
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "cache/lru_secondary_cache.h"
#include "cache/compressed_secondary_cache.h"
#include <algorithm>
#include <cstdint>
@ -17,10 +17,10 @@
namespace ROCKSDB_NAMESPACE {
class LRUSecondaryCacheTest : public testing::Test {
class CompressedSecondaryCacheTest : public testing::Test {
public:
LRUSecondaryCacheTest() : fail_create_(false) {}
~LRUSecondaryCacheTest() {}
CompressedSecondaryCacheTest() : fail_create_(false) {}
~CompressedSecondaryCacheTest() {}
protected:
class TestItem {
@ -80,7 +80,7 @@ class LRUSecondaryCacheTest : public testing::Test {
void SetFailCreate(bool fail) { fail_create_ = fail; }
void BasicTest(bool sec_cache_is_compressed, bool use_jemalloc) {
LRUSecondaryCacheOptions opts;
CompressedSecondaryCacheOptions opts;
opts.capacity = 2048;
opts.num_shard_bits = 0;
opts.metadata_charge_policy = kDontChargeCacheMetadata;
@ -107,11 +107,13 @@ class LRUSecondaryCacheTest : public testing::Test {
ROCKSDB_GTEST_BYPASS("JEMALLOC not supported");
}
}
std::shared_ptr<SecondaryCache> cache = NewLRUSecondaryCache(opts);
std::shared_ptr<SecondaryCache> sec_cache =
NewCompressedSecondaryCache(opts);
bool is_in_sec_cache{true};
// Lookup an non-existent key.
std::unique_ptr<SecondaryCacheResultHandle> handle0 =
cache->Lookup("k0", test_item_creator, true);
sec_cache->Lookup("k0", test_item_creator, true, is_in_sec_cache);
ASSERT_EQ(handle0, nullptr);
Random rnd(301);
@ -119,51 +121,47 @@ class LRUSecondaryCacheTest : public testing::Test {
std::string str1;
test::CompressibleString(&rnd, 0.25, 1000, &str1);
TestItem item1(str1.data(), str1.length());
ASSERT_OK(cache->Insert("k1", &item1, &LRUSecondaryCacheTest::helper_));
ASSERT_OK(sec_cache->Insert("k1", &item1,
&CompressedSecondaryCacheTest::helper_));
std::unique_ptr<SecondaryCacheResultHandle> handle1 =
cache->Lookup("k1", test_item_creator, true);
sec_cache->Lookup("k1", test_item_creator, true, is_in_sec_cache);
ASSERT_NE(handle1, nullptr);
// delete reinterpret_cast<TestItem*>(handle1->Value());
ASSERT_FALSE(is_in_sec_cache);
std::unique_ptr<TestItem> val1 =
std::unique_ptr<TestItem>(static_cast<TestItem*>(handle1->Value()));
ASSERT_NE(val1, nullptr);
ASSERT_EQ(memcmp(val1->Buf(), item1.Buf(), item1.Size()), 0);
// Lookup the first item again.
std::unique_ptr<SecondaryCacheResultHandle> handle1_1 =
sec_cache->Lookup("k1", test_item_creator, true, is_in_sec_cache);
ASSERT_EQ(handle1_1, nullptr);
// Insert and Lookup the second item.
std::string str2;
test::CompressibleString(&rnd, 0.5, 1000, &str2);
TestItem item2(str2.data(), str2.length());
ASSERT_OK(cache->Insert("k2", &item2, &LRUSecondaryCacheTest::helper_));
ASSERT_OK(sec_cache->Insert("k2", &item2,
&CompressedSecondaryCacheTest::helper_));
std::unique_ptr<SecondaryCacheResultHandle> handle2 =
cache->Lookup("k2", test_item_creator, true);
sec_cache->Lookup("k2", test_item_creator, true, is_in_sec_cache);
ASSERT_NE(handle2, nullptr);
std::unique_ptr<TestItem> val2 =
std::unique_ptr<TestItem>(static_cast<TestItem*>(handle2->Value()));
ASSERT_NE(val2, nullptr);
ASSERT_EQ(memcmp(val2->Buf(), item2.Buf(), item2.Size()), 0);
// Lookup the first item again to make sure it is still in the cache.
std::unique_ptr<SecondaryCacheResultHandle> handle1_1 =
cache->Lookup("k1", test_item_creator, true);
ASSERT_NE(handle1_1, nullptr);
std::unique_ptr<TestItem> val1_1 =
std::unique_ptr<TestItem>(static_cast<TestItem*>(handle1_1->Value()));
ASSERT_NE(val1_1, nullptr);
ASSERT_EQ(memcmp(val1_1->Buf(), item1.Buf(), item1.Size()), 0);
std::vector<SecondaryCacheResultHandle*> handles = {handle1.get(),
handle2.get()};
cache->WaitAll(handles);
sec_cache->WaitAll(handles);
cache->Erase("k1");
handle1 = cache->Lookup("k1", test_item_creator, true);
ASSERT_EQ(handle1, nullptr);
cache.reset();
sec_cache.reset();
}
void FailsTest(bool sec_cache_is_compressed) {
LRUSecondaryCacheOptions secondary_cache_opts;
CompressedSecondaryCacheOptions secondary_cache_opts;
if (sec_cache_is_compressed) {
if (!LZ4_Supported()) {
ROCKSDB_GTEST_SKIP("This test requires LZ4 support.");
@ -176,32 +174,28 @@ class LRUSecondaryCacheTest : public testing::Test {
secondary_cache_opts.capacity = 1100;
secondary_cache_opts.num_shard_bits = 0;
secondary_cache_opts.metadata_charge_policy = kDontChargeCacheMetadata;
std::shared_ptr<SecondaryCache> cache =
NewLRUSecondaryCache(secondary_cache_opts);
std::shared_ptr<SecondaryCache> sec_cache =
NewCompressedSecondaryCache(secondary_cache_opts);
// Insert and Lookup the first item.
Random rnd(301);
std::string str1(rnd.RandomString(1000));
TestItem item1(str1.data(), str1.length());
ASSERT_OK(cache->Insert("k1", &item1, &LRUSecondaryCacheTest::helper_));
std::unique_ptr<SecondaryCacheResultHandle> handle1 =
cache->Lookup("k1", test_item_creator, true);
ASSERT_NE(handle1, nullptr);
std::unique_ptr<TestItem> val1 =
std::unique_ptr<TestItem>(static_cast<TestItem*>(handle1->Value()));
ASSERT_NE(val1, nullptr);
ASSERT_EQ(memcmp(val1->Buf(), item1.Buf(), item1.Size()), 0);
ASSERT_OK(sec_cache->Insert("k1", &item1,
&CompressedSecondaryCacheTest::helper_));
// Insert and Lookup the second item.
std::string str2(rnd.RandomString(200));
TestItem item2(str2.data(), str2.length());
// k1 is evicted.
ASSERT_OK(cache->Insert("k2", &item2, &LRUSecondaryCacheTest::helper_));
ASSERT_OK(sec_cache->Insert("k2", &item2,
&CompressedSecondaryCacheTest::helper_));
bool is_in_sec_cache{false};
std::unique_ptr<SecondaryCacheResultHandle> handle1_1 =
cache->Lookup("k1", test_item_creator, true);
sec_cache->Lookup("k1", test_item_creator, true, is_in_sec_cache);
ASSERT_EQ(handle1_1, nullptr);
std::unique_ptr<SecondaryCacheResultHandle> handle2 =
cache->Lookup("k2", test_item_creator, true);
sec_cache->Lookup("k2", test_item_creator, true, is_in_sec_cache);
ASSERT_NE(handle2, nullptr);
std::unique_ptr<TestItem> val2 =
std::unique_ptr<TestItem>(static_cast<TestItem*>(handle2->Value()));
@ -211,20 +205,20 @@ class LRUSecondaryCacheTest : public testing::Test {
// Create Fails.
SetFailCreate(true);
std::unique_ptr<SecondaryCacheResultHandle> handle2_1 =
cache->Lookup("k2", test_item_creator, true);
sec_cache->Lookup("k2", test_item_creator, true, is_in_sec_cache);
ASSERT_EQ(handle2_1, nullptr);
// Save Fails.
std::string str3 = rnd.RandomString(10);
TestItem item3(str3.data(), str3.length());
ASSERT_NOK(
cache->Insert("k3", &item3, &LRUSecondaryCacheTest::helper_fail_));
ASSERT_NOK(sec_cache->Insert("k3", &item3,
&CompressedSecondaryCacheTest::helper_fail_));
cache.reset();
sec_cache.reset();
}
void BasicIntegrationTest(bool sec_cache_is_compressed) {
LRUSecondaryCacheOptions secondary_cache_opts;
CompressedSecondaryCacheOptions secondary_cache_opts;
if (sec_cache_is_compressed) {
if (!LZ4_Supported()) {
@ -239,7 +233,7 @@ class LRUSecondaryCacheTest : public testing::Test {
secondary_cache_opts.num_shard_bits = 0;
secondary_cache_opts.metadata_charge_policy = kDontChargeCacheMetadata;
std::shared_ptr<SecondaryCache> secondary_cache =
NewLRUSecondaryCache(secondary_cache_opts);
NewCompressedSecondaryCache(secondary_cache_opts);
LRUCacheOptions lru_cache_opts(1024, 0, false, 0.5, nullptr,
kDefaultToAdaptiveMutex,
kDontChargeCacheMetadata);
@ -252,26 +246,26 @@ class LRUSecondaryCacheTest : public testing::Test {
std::string str1 = rnd.RandomString(1010);
std::string str1_clone{str1};
TestItem* item1 = new TestItem(str1.data(), str1.length());
ASSERT_OK(cache->Insert("k1", item1, &LRUSecondaryCacheTest::helper_,
ASSERT_OK(cache->Insert("k1", item1, &CompressedSecondaryCacheTest::helper_,
str1.length()));
std::string str2 = rnd.RandomString(1020);
TestItem* item2 = new TestItem(str2.data(), str2.length());
// After Insert, lru cache contains k2 and secondary cache contains k1.
ASSERT_OK(cache->Insert("k2", item2, &LRUSecondaryCacheTest::helper_,
ASSERT_OK(cache->Insert("k2", item2, &CompressedSecondaryCacheTest::helper_,
str2.length()));
std::string str3 = rnd.RandomString(1020);
TestItem* item3 = new TestItem(str3.data(), str3.length());
// After Insert, lru cache contains k3 and secondary cache contains k1 and
// k2
ASSERT_OK(cache->Insert("k3", item3, &LRUSecondaryCacheTest::helper_,
ASSERT_OK(cache->Insert("k3", item3, &CompressedSecondaryCacheTest::helper_,
str3.length()));
Cache::Handle* handle;
handle =
cache->Lookup("k3", &LRUSecondaryCacheTest::helper_, test_item_creator,
Cache::Priority::LOW, true, stats.get());
handle = cache->Lookup("k3", &CompressedSecondaryCacheTest::helper_,
test_item_creator, Cache::Priority::LOW, true,
stats.get());
ASSERT_NE(handle, nullptr);
TestItem* val3 = static_cast<TestItem*>(cache->Value(handle));
ASSERT_NE(val3, nullptr);
@ -279,34 +273,35 @@ class LRUSecondaryCacheTest : public testing::Test {
cache->Release(handle);
// Lookup an non-existent key.
handle =
cache->Lookup("k0", &LRUSecondaryCacheTest::helper_, test_item_creator,
Cache::Priority::LOW, true, stats.get());
handle = cache->Lookup("k0", &CompressedSecondaryCacheTest::helper_,
test_item_creator, Cache::Priority::LOW, true,
stats.get());
ASSERT_EQ(handle, nullptr);
// This Lookup should promote k1 and demote k3, so k2 is evicted from the
// secondary cache. The lru cache contains k1 and secondary cache contains
// k3. item1 was Free(), so it cannot be compared against the item1.
handle =
cache->Lookup("k1", &LRUSecondaryCacheTest::helper_, test_item_creator,
Cache::Priority::LOW, true, stats.get());
// This Lookup should promote k1 and erase k1 from the secondary cache,
// then k3 is demoted. So k2 and k3 are in the secondary cache.
handle = cache->Lookup("k1", &CompressedSecondaryCacheTest::helper_,
test_item_creator, Cache::Priority::LOW, true,
stats.get());
ASSERT_NE(handle, nullptr);
TestItem* val1_1 = static_cast<TestItem*>(cache->Value(handle));
ASSERT_NE(val1_1, nullptr);
ASSERT_EQ(memcmp(val1_1->Buf(), str1_clone.data(), str1_clone.size()), 0);
cache->Release(handle);
handle =
cache->Lookup("k2", &LRUSecondaryCacheTest::helper_, test_item_creator,
Cache::Priority::LOW, true, stats.get());
ASSERT_EQ(handle, nullptr);
handle = cache->Lookup("k2", &CompressedSecondaryCacheTest::helper_,
test_item_creator, Cache::Priority::LOW, true,
stats.get());
ASSERT_NE(handle, nullptr);
cache->Release(handle);
cache.reset();
secondary_cache.reset();
}
void BasicIntegrationFailTest(bool sec_cache_is_compressed) {
LRUSecondaryCacheOptions secondary_cache_opts;
CompressedSecondaryCacheOptions secondary_cache_opts;
if (sec_cache_is_compressed) {
if (!LZ4_Supported()) {
@ -321,7 +316,7 @@ class LRUSecondaryCacheTest : public testing::Test {
secondary_cache_opts.num_shard_bits = 0;
secondary_cache_opts.metadata_charge_policy = kDontChargeCacheMetadata;
std::shared_ptr<SecondaryCache> secondary_cache =
NewLRUSecondaryCache(secondary_cache_opts);
NewCompressedSecondaryCache(secondary_cache_opts);
LRUCacheOptions opts(1024, 0, false, 0.5, nullptr, kDefaultToAdaptiveMutex,
kDontChargeCacheMetadata);
@ -333,7 +328,8 @@ class LRUSecondaryCacheTest : public testing::Test {
auto item1 =
std::unique_ptr<TestItem>(new TestItem(str1.data(), str1.length()));
ASSERT_NOK(cache->Insert("k1", item1.get(), nullptr, str1.length()));
ASSERT_OK(cache->Insert("k1", item1.get(), &LRUSecondaryCacheTest::helper_,
ASSERT_OK(cache->Insert("k1", item1.get(),
&CompressedSecondaryCacheTest::helper_,
str1.length()));
item1.release(); // Appease clang-analyze "potential memory leak"
@ -341,7 +337,7 @@ class LRUSecondaryCacheTest : public testing::Test {
handle = cache->Lookup("k2", nullptr, test_item_creator,
Cache::Priority::LOW, true);
ASSERT_EQ(handle, nullptr);
handle = cache->Lookup("k2", &LRUSecondaryCacheTest::helper_,
handle = cache->Lookup("k2", &CompressedSecondaryCacheTest::helper_,
test_item_creator, Cache::Priority::LOW, false);
ASSERT_EQ(handle, nullptr);
@ -350,7 +346,7 @@ class LRUSecondaryCacheTest : public testing::Test {
}
void IntegrationSaveFailTest(bool sec_cache_is_compressed) {
LRUSecondaryCacheOptions secondary_cache_opts;
CompressedSecondaryCacheOptions secondary_cache_opts;
if (sec_cache_is_compressed) {
if (!LZ4_Supported()) {
@ -366,7 +362,7 @@ class LRUSecondaryCacheTest : public testing::Test {
secondary_cache_opts.metadata_charge_policy = kDontChargeCacheMetadata;
std::shared_ptr<SecondaryCache> secondary_cache =
NewLRUSecondaryCache(secondary_cache_opts);
NewCompressedSecondaryCache(secondary_cache_opts);
LRUCacheOptions opts(1024, 0, false, 0.5, nullptr, kDefaultToAdaptiveMutex,
kDontChargeCacheMetadata);
@ -376,25 +372,27 @@ class LRUSecondaryCacheTest : public testing::Test {
Random rnd(301);
std::string str1 = rnd.RandomString(1020);
TestItem* item1 = new TestItem(str1.data(), str1.length());
ASSERT_OK(cache->Insert("k1", item1, &LRUSecondaryCacheTest::helper_fail_,
ASSERT_OK(cache->Insert("k1", item1,
&CompressedSecondaryCacheTest::helper_fail_,
str1.length()));
std::string str2 = rnd.RandomString(1020);
TestItem* item2 = new TestItem(str2.data(), str2.length());
// k1 should be demoted to the secondary cache.
ASSERT_OK(cache->Insert("k2", item2, &LRUSecondaryCacheTest::helper_fail_,
ASSERT_OK(cache->Insert("k2", item2,
&CompressedSecondaryCacheTest::helper_fail_,
str2.length()));
Cache::Handle* handle;
handle = cache->Lookup("k2", &LRUSecondaryCacheTest::helper_fail_,
handle = cache->Lookup("k2", &CompressedSecondaryCacheTest::helper_fail_,
test_item_creator, Cache::Priority::LOW, true);
ASSERT_NE(handle, nullptr);
cache->Release(handle);
// This lookup should fail, since k1 demotion would have failed
handle = cache->Lookup("k1", &LRUSecondaryCacheTest::helper_fail_,
handle = cache->Lookup("k1", &CompressedSecondaryCacheTest::helper_fail_,
test_item_creator, Cache::Priority::LOW, true);
ASSERT_EQ(handle, nullptr);
// Since k1 didn't get promoted, k2 should still be in cache
handle = cache->Lookup("k2", &LRUSecondaryCacheTest::helper_fail_,
handle = cache->Lookup("k2", &CompressedSecondaryCacheTest::helper_fail_,
test_item_creator, Cache::Priority::LOW, true);
ASSERT_NE(handle, nullptr);
cache->Release(handle);
@ -404,7 +402,7 @@ class LRUSecondaryCacheTest : public testing::Test {
}
void IntegrationCreateFailTest(bool sec_cache_is_compressed) {
LRUSecondaryCacheOptions secondary_cache_opts;
CompressedSecondaryCacheOptions secondary_cache_opts;
if (sec_cache_is_compressed) {
if (!LZ4_Supported()) {
@ -420,7 +418,7 @@ class LRUSecondaryCacheTest : public testing::Test {
secondary_cache_opts.metadata_charge_policy = kDontChargeCacheMetadata;
std::shared_ptr<SecondaryCache> secondary_cache =
NewLRUSecondaryCache(secondary_cache_opts);
NewCompressedSecondaryCache(secondary_cache_opts);
LRUCacheOptions opts(1024, 0, false, 0.5, nullptr, kDefaultToAdaptiveMutex,
kDontChargeCacheMetadata);
@ -430,27 +428,27 @@ class LRUSecondaryCacheTest : public testing::Test {
Random rnd(301);
std::string str1 = rnd.RandomString(1020);
TestItem* item1 = new TestItem(str1.data(), str1.length());
ASSERT_OK(cache->Insert("k1", item1, &LRUSecondaryCacheTest::helper_,
ASSERT_OK(cache->Insert("k1", item1, &CompressedSecondaryCacheTest::helper_,
str1.length()));
std::string str2 = rnd.RandomString(1020);
TestItem* item2 = new TestItem(str2.data(), str2.length());
// k1 should be demoted to the secondary cache.
ASSERT_OK(cache->Insert("k2", item2, &LRUSecondaryCacheTest::helper_,
ASSERT_OK(cache->Insert("k2", item2, &CompressedSecondaryCacheTest::helper_,
str2.length()));
Cache::Handle* handle;
SetFailCreate(true);
handle = cache->Lookup("k2", &LRUSecondaryCacheTest::helper_,
handle = cache->Lookup("k2", &CompressedSecondaryCacheTest::helper_,
test_item_creator, Cache::Priority::LOW, true);
ASSERT_NE(handle, nullptr);
cache->Release(handle);
// This lookup should fail, since k1 creation would have failed
handle = cache->Lookup("k1", &LRUSecondaryCacheTest::helper_,
handle = cache->Lookup("k1", &CompressedSecondaryCacheTest::helper_,
test_item_creator, Cache::Priority::LOW, true);
ASSERT_EQ(handle, nullptr);
// Since k1 didn't get promoted, k2 should still be in cache
handle = cache->Lookup("k2", &LRUSecondaryCacheTest::helper_,
handle = cache->Lookup("k2", &CompressedSecondaryCacheTest::helper_,
test_item_creator, Cache::Priority::LOW, true);
ASSERT_NE(handle, nullptr);
cache->Release(handle);
@ -460,7 +458,7 @@ class LRUSecondaryCacheTest : public testing::Test {
}
void IntegrationFullCapacityTest(bool sec_cache_is_compressed) {
LRUSecondaryCacheOptions secondary_cache_opts;
CompressedSecondaryCacheOptions secondary_cache_opts;
if (sec_cache_is_compressed) {
if (!LZ4_Supported()) {
@ -476,7 +474,7 @@ class LRUSecondaryCacheTest : public testing::Test {
secondary_cache_opts.metadata_charge_policy = kDontChargeCacheMetadata;
std::shared_ptr<SecondaryCache> secondary_cache =
NewLRUSecondaryCache(secondary_cache_opts);
NewCompressedSecondaryCache(secondary_cache_opts);
LRUCacheOptions opts(1024, 0, /*_strict_capacity_limit=*/true, 0.5, nullptr,
kDefaultToAdaptiveMutex, kDontChargeCacheMetadata);
@ -486,31 +484,32 @@ class LRUSecondaryCacheTest : public testing::Test {
Random rnd(301);
std::string str1 = rnd.RandomString(1020);
TestItem* item1 = new TestItem(str1.data(), str1.length());
ASSERT_OK(cache->Insert("k1", item1, &LRUSecondaryCacheTest::helper_,
ASSERT_OK(cache->Insert("k1", item1, &CompressedSecondaryCacheTest::helper_,
str1.length()));
std::string str2 = rnd.RandomString(1020);
TestItem* item2 = new TestItem(str2.data(), str2.length());
// k1 should be demoted to the secondary cache.
ASSERT_OK(cache->Insert("k2", item2, &LRUSecondaryCacheTest::helper_,
ASSERT_OK(cache->Insert("k2", item2, &CompressedSecondaryCacheTest::helper_,
str2.length()));
Cache::Handle* handle;
handle = cache->Lookup("k2", &LRUSecondaryCacheTest::helper_,
test_item_creator, Cache::Priority::LOW, true);
ASSERT_NE(handle, nullptr);
// k1 promotion should fail due to the block cache being at capacity,
// but the lookup should still succeed
Cache::Handle* handle2;
handle2 = cache->Lookup("k1", &LRUSecondaryCacheTest::helper_,
handle2 = cache->Lookup("k2", &CompressedSecondaryCacheTest::helper_,
test_item_creator, Cache::Priority::LOW, true);
ASSERT_NE(handle2, nullptr);
cache->Release(handle2);
// k1 promotion should fail due to the block cache being at capacity,
// but the lookup should still succeed
Cache::Handle* handle1;
handle1 = cache->Lookup("k1", &CompressedSecondaryCacheTest::helper_,
test_item_creator, Cache::Priority::LOW, true);
ASSERT_NE(handle1, nullptr);
cache->Release(handle1);
// Since k1 didn't get inserted, k2 should still be in cache
cache->Release(handle);
handle2 = cache->Lookup("k2", &CompressedSecondaryCacheTest::helper_,
test_item_creator, Cache::Priority::LOW, true);
ASSERT_NE(handle2, nullptr);
cache->Release(handle2);
handle = cache->Lookup("k2", &LRUSecondaryCacheTest::helper_,
test_item_creator, Cache::Priority::LOW, true);
ASSERT_NE(handle, nullptr);
cache->Release(handle);
cache.reset();
secondary_cache.reset();
@ -520,72 +519,83 @@ class LRUSecondaryCacheTest : public testing::Test {
bool fail_create_;
};
Cache::CacheItemHelper LRUSecondaryCacheTest::helper_(
LRUSecondaryCacheTest::SizeCallback, LRUSecondaryCacheTest::SaveToCallback,
LRUSecondaryCacheTest::DeletionCallback);
Cache::CacheItemHelper CompressedSecondaryCacheTest::helper_(
CompressedSecondaryCacheTest::SizeCallback,
CompressedSecondaryCacheTest::SaveToCallback,
CompressedSecondaryCacheTest::DeletionCallback);
Cache::CacheItemHelper LRUSecondaryCacheTest::helper_fail_(
LRUSecondaryCacheTest::SizeCallback,
LRUSecondaryCacheTest::SaveToCallbackFail,
LRUSecondaryCacheTest::DeletionCallback);
Cache::CacheItemHelper CompressedSecondaryCacheTest::helper_fail_(
CompressedSecondaryCacheTest::SizeCallback,
CompressedSecondaryCacheTest::SaveToCallbackFail,
CompressedSecondaryCacheTest::DeletionCallback);
TEST_F(LRUSecondaryCacheTest, BasicTestWithNoCompression) {
TEST_F(CompressedSecondaryCacheTest, BasicTestWithNoCompression) {
BasicTest(false, false);
}
TEST_F(LRUSecondaryCacheTest, BasicTestWithMemoryAllocatorAndNoCompression) {
TEST_F(CompressedSecondaryCacheTest,
BasicTestWithMemoryAllocatorAndNoCompression) {
BasicTest(false, true);
}
TEST_F(LRUSecondaryCacheTest, BasicTestWithCompression) {
TEST_F(CompressedSecondaryCacheTest, BasicTestWithCompression) {
BasicTest(true, false);
}
TEST_F(LRUSecondaryCacheTest, BasicTestWithMemoryAllocatorAndCompression) {
TEST_F(CompressedSecondaryCacheTest,
BasicTestWithMemoryAllocatorAndCompression) {
BasicTest(true, true);
}
TEST_F(LRUSecondaryCacheTest, FailsTestWithNoCompression) { FailsTest(false); }
TEST_F(CompressedSecondaryCacheTest, FailsTestWithNoCompression) {
FailsTest(false);
}
TEST_F(LRUSecondaryCacheTest, FailsTestWithCompression) { FailsTest(true); }
TEST_F(CompressedSecondaryCacheTest, FailsTestWithCompression) {
FailsTest(true);
}
TEST_F(LRUSecondaryCacheTest, BasicIntegrationTestWithNoCompression) {
TEST_F(CompressedSecondaryCacheTest, BasicIntegrationTestWithNoCompression) {
BasicIntegrationTest(false);
}
TEST_F(LRUSecondaryCacheTest, BasicIntegrationTestWithCompression) {
TEST_F(CompressedSecondaryCacheTest, BasicIntegrationTestWithCompression) {
BasicIntegrationTest(true);
}
TEST_F(LRUSecondaryCacheTest, BasicIntegrationFailTestWithNoCompression) {
TEST_F(CompressedSecondaryCacheTest,
BasicIntegrationFailTestWithNoCompression) {
BasicIntegrationFailTest(false);
}
TEST_F(LRUSecondaryCacheTest, BasicIntegrationFailTestWithCompression) {
TEST_F(CompressedSecondaryCacheTest, BasicIntegrationFailTestWithCompression) {
BasicIntegrationFailTest(true);
}
TEST_F(LRUSecondaryCacheTest, IntegrationSaveFailTestWithNoCompression) {
TEST_F(CompressedSecondaryCacheTest, IntegrationSaveFailTestWithNoCompression) {
IntegrationSaveFailTest(false);
}
TEST_F(LRUSecondaryCacheTest, IntegrationSaveFailTestWithCompression) {
TEST_F(CompressedSecondaryCacheTest, IntegrationSaveFailTestWithCompression) {
IntegrationSaveFailTest(true);
}
TEST_F(LRUSecondaryCacheTest, IntegrationCreateFailTestWithNoCompression) {
TEST_F(CompressedSecondaryCacheTest,
IntegrationCreateFailTestWithNoCompression) {
IntegrationCreateFailTest(false);
}
TEST_F(LRUSecondaryCacheTest, IntegrationCreateFailTestWithCompression) {
TEST_F(CompressedSecondaryCacheTest, IntegrationCreateFailTestWithCompression) {
IntegrationCreateFailTest(true);
}
TEST_F(LRUSecondaryCacheTest, IntegrationFullCapacityTestWithNoCompression) {
TEST_F(CompressedSecondaryCacheTest,
IntegrationFullCapacityTestWithNoCompression) {
IntegrationFullCapacityTest(false);
}
TEST_F(LRUSecondaryCacheTest, IntegrationFullCapacityTestWithCompression) {
TEST_F(CompressedSecondaryCacheTest,
IntegrationFullCapacityTestWithCompression) {
IntegrationFullCapacityTest(true);
}

10
cache/lru_cache.cc vendored

@ -298,7 +298,7 @@ void LRUCacheShard::SetCapacity(size_t capacity) {
// Free the entries outside of mutex for performance reasons.
for (auto entry : last_reference_list) {
if (secondary_cache_ && entry->IsSecondaryCacheCompatible() &&
!entry->IsPromoted()) {
!entry->IsInSecondaryCache()) {
secondary_cache_->Insert(entry->key(), entry->value, entry->info_.helper)
.PermitUncheckedError();
}
@ -373,7 +373,7 @@ Status LRUCacheShard::InsertItem(LRUHandle* e, Cache::Handle** handle,
// Free the entries here outside of mutex for performance reasons.
for (auto entry : last_reference_list) {
if (secondary_cache_ && entry->IsSecondaryCacheCompatible() &&
!entry->IsPromoted()) {
!entry->IsInSecondaryCache()) {
secondary_cache_->Insert(entry->key(), entry->value, entry->info_.helper)
.PermitUncheckedError();
}
@ -389,7 +389,6 @@ void LRUCacheShard::Promote(LRUHandle* e) {
assert(secondary_handle->IsReady());
e->SetIncomplete(false);
e->SetInCache(true);
e->SetPromoted(true);
e->value = secondary_handle->Value();
e->charge = secondary_handle->Size();
delete secondary_handle;
@ -446,8 +445,9 @@ Cache::Handle* LRUCacheShard::Lookup(
// accounting purposes, which we won't demote to the secondary cache
// anyway.
assert(create_cb && helper->del_cb);
bool is_in_sec_cache{false};
std::unique_ptr<SecondaryCacheResultHandle> secondary_handle =
secondary_cache_->Lookup(key, create_cb, wait);
secondary_cache_->Lookup(key, create_cb, wait, is_in_sec_cache);
if (secondary_handle != nullptr) {
e = reinterpret_cast<LRUHandle*>(
new char[sizeof(LRUHandle) - 1 + key.size()]);
@ -467,6 +467,7 @@ Cache::Handle* LRUCacheShard::Lookup(
if (wait) {
Promote(e);
e->SetIsInSecondaryCache(is_in_sec_cache);
if (!e->value) {
// The secondary cache returned a handle, but the lookup failed.
e->Unref();
@ -480,6 +481,7 @@ Cache::Handle* LRUCacheShard::Lookup(
// If wait is false, we always return a handle and let the caller
// release the handle after checking for success or failure.
e->SetIncomplete(true);
e->SetIsInSecondaryCache(is_in_sec_cache);
// This may be slightly inaccurate, if the lookup eventually fails.
// But the probability is very low.
PERF_COUNTER_ADD(secondary_cache_hit_count, 1);

19
cache/lru_cache.h vendored

@ -85,8 +85,8 @@ struct LRUHandle {
IS_SECONDARY_CACHE_COMPATIBLE = (1 << 4),
// Is the handle still being read from a lower tier.
IS_PENDING = (1 << 5),
// Has the item been promoted from a lower tier.
IS_PROMOTED = (1 << 6),
// Whether this handle is still in a lower tier
IS_IN_SECONDARY_CACHE = (1 << 6),
};
uint8_t flags;
@ -129,7 +129,7 @@ struct LRUHandle {
#endif // __SANITIZE_THREAD__
}
bool IsPending() const { return flags & IS_PENDING; }
bool IsPromoted() const { return flags & IS_PROMOTED; }
bool IsInSecondaryCache() const { return flags & IS_IN_SECONDARY_CACHE; }
void SetInCache(bool in_cache) {
if (in_cache) {
@ -176,11 +176,11 @@ struct LRUHandle {
}
}
void SetPromoted(bool promoted) {
if (promoted) {
flags |= IS_PROMOTED;
void SetIsInSecondaryCache(bool is_in_secondary_cache) {
if (is_in_secondary_cache) {
flags |= IS_IN_SECONDARY_CACHE;
} else {
flags &= ~IS_PROMOTED;
flags &= ~IS_IN_SECONDARY_CACHE;
}
}
@ -371,8 +371,9 @@ class ALIGN_AS(CACHE_LINE_SIZE) LRUCacheShard final : public CacheShard {
Status Insert(const Slice& key, uint32_t hash, void* value, size_t charge,
DeleterFn deleter, const Cache::CacheItemHelper* helper,
Cache::Handle** handle, Cache::Priority priority);
// Promote an item looked up from the secondary cache to the LRU cache. The
// item is only inserted into the hash table and not the LRU list, and only
// Promote an item looked up from the secondary cache to the LRU cache.
// The item may be still in the secondary cache.
// It is only inserted into the hash table and not the LRU list, and only
// if the cache is not at full capacity, as is the case during Insert. The
// caller should hold a reference on the LRUHandle. When the caller releases
// the last reference, the item is added to the LRU list.

@ -266,12 +266,13 @@ class TestSecondaryCache : public SecondaryCache {
}
std::unique_ptr<SecondaryCacheResultHandle> Lookup(
const Slice& key, const Cache::CreateCallback& create_cb,
bool /*wait*/) override {
const Slice& key, const Cache::CreateCallback& create_cb, bool /*wait*/,
bool& is_in_sec_cache) override {
std::string key_str = key.ToString();
TEST_SYNC_POINT_CALLBACK("TestSecondaryCache::Lookup", &key_str);
std::unique_ptr<SecondaryCacheResultHandle> secondary_handle;
is_in_sec_cache = false;
ResultType type = ResultType::SUCCESS;
auto iter = result_map_.find(key.ToString());
if (iter != result_map_.end()) {
@ -296,6 +297,7 @@ class TestSecondaryCache : public SecondaryCache {
if (s.ok()) {
secondary_handle.reset(new TestSecondaryCacheResultHandle(
cache_.get(), handle, value, charge, type));
is_in_sec_cache = true;
} else {
cache_->Release(handle);
}
@ -383,10 +385,10 @@ class DBSecondaryCacheTest : public DBTestBase {
std::unique_ptr<Env> fault_env_;
};
class LRUSecondaryCacheTest : public LRUCacheTest {
class LRUCacheSecondaryCacheTest : public LRUCacheTest {
public:
LRUSecondaryCacheTest() : fail_create_(false) {}
~LRUSecondaryCacheTest() {}
LRUCacheSecondaryCacheTest() : fail_create_(false) {}
~LRUCacheSecondaryCacheTest() {}
protected:
class TestItem {
@ -449,16 +451,17 @@ class LRUSecondaryCacheTest : public LRUCacheTest {
bool fail_create_;
};
Cache::CacheItemHelper LRUSecondaryCacheTest::helper_(
LRUSecondaryCacheTest::SizeCallback, LRUSecondaryCacheTest::SaveToCallback,
LRUSecondaryCacheTest::DeletionCallback);
Cache::CacheItemHelper LRUCacheSecondaryCacheTest::helper_(
LRUCacheSecondaryCacheTest::SizeCallback,
LRUCacheSecondaryCacheTest::SaveToCallback,
LRUCacheSecondaryCacheTest::DeletionCallback);
Cache::CacheItemHelper LRUSecondaryCacheTest::helper_fail_(
LRUSecondaryCacheTest::SizeCallback,
LRUSecondaryCacheTest::SaveToCallbackFail,
LRUSecondaryCacheTest::DeletionCallback);
Cache::CacheItemHelper LRUCacheSecondaryCacheTest::helper_fail_(
LRUCacheSecondaryCacheTest::SizeCallback,
LRUCacheSecondaryCacheTest::SaveToCallbackFail,
LRUCacheSecondaryCacheTest::DeletionCallback);
TEST_F(LRUSecondaryCacheTest, BasicTest) {
TEST_F(LRUCacheSecondaryCacheTest, BasicTest) {
LRUCacheOptions opts(1024, 0, false, 0.5, nullptr, kDefaultToAdaptiveMutex,
kDontChargeCacheMetadata);
std::shared_ptr<TestSecondaryCache> secondary_cache =
@ -470,25 +473,25 @@ TEST_F(LRUSecondaryCacheTest, BasicTest) {
Random rnd(301);
std::string str1 = rnd.RandomString(1020);
TestItem* item1 = new TestItem(str1.data(), str1.length());
ASSERT_OK(cache->Insert("k1", item1, &LRUSecondaryCacheTest::helper_,
ASSERT_OK(cache->Insert("k1", item1, &LRUCacheSecondaryCacheTest::helper_,
str1.length()));
std::string str2 = rnd.RandomString(1020);
TestItem* item2 = new TestItem(str2.data(), str2.length());
// k1 should be demoted to NVM
ASSERT_OK(cache->Insert("k2", item2, &LRUSecondaryCacheTest::helper_,
ASSERT_OK(cache->Insert("k2", item2, &LRUCacheSecondaryCacheTest::helper_,
str2.length()));
get_perf_context()->Reset();
Cache::Handle* handle;
handle =
cache->Lookup("k2", &LRUSecondaryCacheTest::helper_, test_item_creator,
Cache::Priority::LOW, true, stats.get());
cache->Lookup("k2", &LRUCacheSecondaryCacheTest::helper_,
test_item_creator, Cache::Priority::LOW, true, stats.get());
ASSERT_NE(handle, nullptr);
cache->Release(handle);
// This lookup should promote k1 and demote k2
handle =
cache->Lookup("k1", &LRUSecondaryCacheTest::helper_, test_item_creator,
Cache::Priority::LOW, true, stats.get());
cache->Lookup("k1", &LRUCacheSecondaryCacheTest::helper_,
test_item_creator, Cache::Priority::LOW, true, stats.get());
ASSERT_NE(handle, nullptr);
cache->Release(handle);
ASSERT_EQ(secondary_cache->num_inserts(), 2u);
@ -502,7 +505,7 @@ TEST_F(LRUSecondaryCacheTest, BasicTest) {
secondary_cache.reset();
}
TEST_F(LRUSecondaryCacheTest, BasicFailTest) {
TEST_F(LRUCacheSecondaryCacheTest, BasicFailTest) {
LRUCacheOptions opts(1024, 0, false, 0.5, nullptr, kDefaultToAdaptiveMutex,
kDontChargeCacheMetadata);
std::shared_ptr<TestSecondaryCache> secondary_cache =
@ -515,15 +518,15 @@ TEST_F(LRUSecondaryCacheTest, BasicFailTest) {
auto item1 = std::make_unique<TestItem>(str1.data(), str1.length());
ASSERT_TRUE(cache->Insert("k1", item1.get(), nullptr, str1.length())
.IsInvalidArgument());
ASSERT_OK(cache->Insert("k1", item1.get(), &LRUSecondaryCacheTest::helper_,
str1.length()));
ASSERT_OK(cache->Insert("k1", item1.get(),
&LRUCacheSecondaryCacheTest::helper_, str1.length()));
item1.release(); // Appease clang-analyze "potential memory leak"
Cache::Handle* handle;
handle = cache->Lookup("k2", nullptr, test_item_creator, Cache::Priority::LOW,
true);
ASSERT_EQ(handle, nullptr);
handle = cache->Lookup("k2", &LRUSecondaryCacheTest::helper_,
handle = cache->Lookup("k2", &LRUCacheSecondaryCacheTest::helper_,
test_item_creator, Cache::Priority::LOW, false);
ASSERT_EQ(handle, nullptr);
@ -531,7 +534,7 @@ TEST_F(LRUSecondaryCacheTest, BasicFailTest) {
secondary_cache.reset();
}
TEST_F(LRUSecondaryCacheTest, SaveFailTest) {
TEST_F(LRUCacheSecondaryCacheTest, SaveFailTest) {
LRUCacheOptions opts(1024, 0, false, 0.5, nullptr, kDefaultToAdaptiveMutex,
kDontChargeCacheMetadata);
std::shared_ptr<TestSecondaryCache> secondary_cache =
@ -542,25 +545,25 @@ TEST_F(LRUSecondaryCacheTest, SaveFailTest) {
Random rnd(301);
std::string str1 = rnd.RandomString(1020);
TestItem* item1 = new TestItem(str1.data(), str1.length());
ASSERT_OK(cache->Insert("k1", item1, &LRUSecondaryCacheTest::helper_fail_,
str1.length()));
ASSERT_OK(cache->Insert(
"k1", item1, &LRUCacheSecondaryCacheTest::helper_fail_, str1.length()));
std::string str2 = rnd.RandomString(1020);
TestItem* item2 = new TestItem(str2.data(), str2.length());
// k1 should be demoted to NVM
ASSERT_OK(cache->Insert("k2", item2, &LRUSecondaryCacheTest::helper_fail_,
str2.length()));
ASSERT_OK(cache->Insert(
"k2", item2, &LRUCacheSecondaryCacheTest::helper_fail_, str2.length()));
Cache::Handle* handle;
handle = cache->Lookup("k2", &LRUSecondaryCacheTest::helper_fail_,
handle = cache->Lookup("k2", &LRUCacheSecondaryCacheTest::helper_fail_,
test_item_creator, Cache::Priority::LOW, true);
ASSERT_NE(handle, nullptr);
cache->Release(handle);
// This lookup should fail, since k1 demotion would have failed
handle = cache->Lookup("k1", &LRUSecondaryCacheTest::helper_fail_,
handle = cache->Lookup("k1", &LRUCacheSecondaryCacheTest::helper_fail_,
test_item_creator, Cache::Priority::LOW, true);
ASSERT_EQ(handle, nullptr);
// Since k1 didn't get promoted, k2 should still be in cache
handle = cache->Lookup("k2", &LRUSecondaryCacheTest::helper_fail_,
handle = cache->Lookup("k2", &LRUCacheSecondaryCacheTest::helper_fail_,
test_item_creator, Cache::Priority::LOW, true);
ASSERT_NE(handle, nullptr);
cache->Release(handle);
@ -571,7 +574,7 @@ TEST_F(LRUSecondaryCacheTest, SaveFailTest) {
secondary_cache.reset();
}
TEST_F(LRUSecondaryCacheTest, CreateFailTest) {
TEST_F(LRUCacheSecondaryCacheTest, CreateFailTest) {
LRUCacheOptions opts(1024, 0, false, 0.5, nullptr, kDefaultToAdaptiveMutex,
kDontChargeCacheMetadata);
std::shared_ptr<TestSecondaryCache> secondary_cache =
@ -582,26 +585,26 @@ TEST_F(LRUSecondaryCacheTest, CreateFailTest) {
Random rnd(301);
std::string str1 = rnd.RandomString(1020);
TestItem* item1 = new TestItem(str1.data(), str1.length());
ASSERT_OK(cache->Insert("k1", item1, &LRUSecondaryCacheTest::helper_,
ASSERT_OK(cache->Insert("k1", item1, &LRUCacheSecondaryCacheTest::helper_,
str1.length()));
std::string str2 = rnd.RandomString(1020);
TestItem* item2 = new TestItem(str2.data(), str2.length());
// k1 should be demoted to NVM
ASSERT_OK(cache->Insert("k2", item2, &LRUSecondaryCacheTest::helper_,
ASSERT_OK(cache->Insert("k2", item2, &LRUCacheSecondaryCacheTest::helper_,
str2.length()));
Cache::Handle* handle;
SetFailCreate(true);
handle = cache->Lookup("k2", &LRUSecondaryCacheTest::helper_,
handle = cache->Lookup("k2", &LRUCacheSecondaryCacheTest::helper_,
test_item_creator, Cache::Priority::LOW, true);
ASSERT_NE(handle, nullptr);
cache->Release(handle);
// This lookup should fail, since k1 creation would have failed
handle = cache->Lookup("k1", &LRUSecondaryCacheTest::helper_,
handle = cache->Lookup("k1", &LRUCacheSecondaryCacheTest::helper_,
test_item_creator, Cache::Priority::LOW, true);
ASSERT_EQ(handle, nullptr);
// Since k1 didn't get promoted, k2 should still be in cache
handle = cache->Lookup("k2", &LRUSecondaryCacheTest::helper_,
handle = cache->Lookup("k2", &LRUCacheSecondaryCacheTest::helper_,
test_item_creator, Cache::Priority::LOW, true);
ASSERT_NE(handle, nullptr);
cache->Release(handle);
@ -612,7 +615,7 @@ TEST_F(LRUSecondaryCacheTest, CreateFailTest) {
secondary_cache.reset();
}
TEST_F(LRUSecondaryCacheTest, FullCapacityTest) {
TEST_F(LRUCacheSecondaryCacheTest, FullCapacityTest) {
LRUCacheOptions opts(1024, 0, /*_strict_capacity_limit=*/true, 0.5, nullptr,
kDefaultToAdaptiveMutex, kDontChargeCacheMetadata);
std::shared_ptr<TestSecondaryCache> secondary_cache =
@ -623,28 +626,28 @@ TEST_F(LRUSecondaryCacheTest, FullCapacityTest) {
Random rnd(301);
std::string str1 = rnd.RandomString(1020);
TestItem* item1 = new TestItem(str1.data(), str1.length());
ASSERT_OK(cache->Insert("k1", item1, &LRUSecondaryCacheTest::helper_,
ASSERT_OK(cache->Insert("k1", item1, &LRUCacheSecondaryCacheTest::helper_,
str1.length()));
std::string str2 = rnd.RandomString(1020);
TestItem* item2 = new TestItem(str2.data(), str2.length());
// k1 should be demoted to NVM
ASSERT_OK(cache->Insert("k2", item2, &LRUSecondaryCacheTest::helper_,
ASSERT_OK(cache->Insert("k2", item2, &LRUCacheSecondaryCacheTest::helper_,
str2.length()));
Cache::Handle* handle;
handle = cache->Lookup("k2", &LRUSecondaryCacheTest::helper_,
handle = cache->Lookup("k2", &LRUCacheSecondaryCacheTest::helper_,
test_item_creator, Cache::Priority::LOW, true);
ASSERT_NE(handle, nullptr);
// k1 promotion should fail due to the block cache being at capacity,
// but the lookup should still succeed
Cache::Handle* handle2;
handle2 = cache->Lookup("k1", &LRUSecondaryCacheTest::helper_,
handle2 = cache->Lookup("k1", &LRUCacheSecondaryCacheTest::helper_,
test_item_creator, Cache::Priority::LOW, true);
ASSERT_NE(handle2, nullptr);
// Since k1 didn't get inserted, k2 should still be in cache
cache->Release(handle);
cache->Release(handle2);
handle = cache->Lookup("k2", &LRUSecondaryCacheTest::helper_,
handle = cache->Lookup("k2", &LRUCacheSecondaryCacheTest::helper_,
test_item_creator, Cache::Priority::LOW, true);
ASSERT_NE(handle, nullptr);
cache->Release(handle);
@ -1046,7 +1049,7 @@ TEST_F(DBSecondaryCacheTest, SecondaryCacheFailureTest) {
Destroy(options);
}
TEST_F(LRUSecondaryCacheTest, BasicWaitAllTest) {
TEST_F(LRUCacheSecondaryCacheTest, BasicWaitAllTest) {
LRUCacheOptions opts(1024, 2, false, 0.5, nullptr, kDefaultToAdaptiveMutex,
kDontChargeCacheMetadata);
std::shared_ptr<TestSecondaryCache> secondary_cache =
@ -1062,7 +1065,8 @@ TEST_F(LRUSecondaryCacheTest, BasicWaitAllTest) {
values.emplace_back(str);
TestItem* item = new TestItem(str.data(), str.length());
ASSERT_OK(cache->Insert("k" + std::to_string(i), item,
&LRUSecondaryCacheTest::helper_, str.length()));
&LRUCacheSecondaryCacheTest::helper_,
str.length()));
}
// Force all entries to be evicted to the secondary cache
cache->SetCapacity(0);
@ -1075,9 +1079,9 @@ TEST_F(LRUSecondaryCacheTest, BasicWaitAllTest) {
{"k5", TestSecondaryCache::ResultType::FAIL}});
std::vector<Cache::Handle*> results;
for (int i = 0; i < 6; ++i) {
results.emplace_back(
cache->Lookup("k" + std::to_string(i), &LRUSecondaryCacheTest::helper_,
test_item_creator, Cache::Priority::LOW, false));
results.emplace_back(cache->Lookup(
"k" + std::to_string(i), &LRUCacheSecondaryCacheTest::helper_,
test_item_creator, Cache::Priority::LOW, false));
}
cache->WaitAll(results);
for (int i = 0; i < 6; ++i) {

@ -131,7 +131,7 @@ extern std::shared_ptr<Cache> NewLRUCache(const LRUCacheOptions& cache_opts);
// Options structure for configuring a SecondaryCache instance based on
// LRUCache. The LRUCacheOptions.secondary_cache is not used and
// should not be set.
struct LRUSecondaryCacheOptions : LRUCacheOptions {
struct CompressedSecondaryCacheOptions : LRUCacheOptions {
// The compression method (if any) that is used to compress data.
CompressionType compression_type = CompressionType::kLZ4Compression;
@ -142,8 +142,8 @@ struct LRUSecondaryCacheOptions : LRUCacheOptions {
// header in varint32 format.
uint32_t compress_format_version = 2;
LRUSecondaryCacheOptions() {}
LRUSecondaryCacheOptions(
CompressedSecondaryCacheOptions() {}
CompressedSecondaryCacheOptions(
size_t _capacity, int _num_shard_bits, bool _strict_capacity_limit,
double _high_pri_pool_ratio,
std::shared_ptr<MemoryAllocator> _memory_allocator = nullptr,
@ -161,7 +161,7 @@ struct LRUSecondaryCacheOptions : LRUCacheOptions {
// EXPERIMENTAL
// Create a new Secondary Cache that is implemented on top of LRUCache.
extern std::shared_ptr<SecondaryCache> NewLRUSecondaryCache(
extern std::shared_ptr<SecondaryCache> NewCompressedSecondaryCache(
size_t capacity, int num_shard_bits = -1,
bool strict_capacity_limit = false, double high_pri_pool_ratio = 0.5,
std::shared_ptr<MemoryAllocator> memory_allocator = nullptr,
@ -171,8 +171,8 @@ extern std::shared_ptr<SecondaryCache> NewLRUSecondaryCache(
CompressionType compression_type = CompressionType::kLZ4Compression,
uint32_t compress_format_version = 2);
extern std::shared_ptr<SecondaryCache> NewLRUSecondaryCache(
const LRUSecondaryCacheOptions& opts);
extern std::shared_ptr<SecondaryCache> NewCompressedSecondaryCache(
const CompressedSecondaryCacheOptions& opts);
// Similar to NewLRUCache, but create a cache based on CLOCK algorithm with
// better concurrent performance in some cases. See util/clock_cache.cc for

@ -68,9 +68,11 @@ class SecondaryCache : public Customizable {
// Lookup the data for the given key in this cache. The create_cb
// will be used to create the object. The handle returned may not be
// ready yet, unless wait=true, in which case Lookup() will block until
// the handle is ready
// the handle is ready. is_in_sec_cache is to indicate whether the
// handle is possibly erased from the secondary cache after the Lookup.
virtual std::unique_ptr<SecondaryCacheResultHandle> Lookup(
const Slice& key, const Cache::CreateCallback& create_cb, bool wait) = 0;
const Slice& key, const Cache::CreateCallback& create_cb, bool wait,
bool& is_in_sec_cache) = 0;
// At the discretion of the implementation, erase the data associated
// with key

@ -1329,7 +1329,8 @@ class TestSecondaryCache : public SecondaryCache {
}
std::unique_ptr<SecondaryCacheResultHandle> Lookup(
const Slice& /*key*/, const Cache::CreateCallback& /*create_cb*/,
bool /*wait*/) override {
bool /*wait*/, bool& is_in_sec_cache) override {
is_in_sec_cache = true;
return nullptr;
}
void Erase(const Slice& /*key*/) override {}

@ -6,7 +6,7 @@ LIB_SOURCES = \
cache/cache_reservation_manager.cc \
cache/clock_cache.cc \
cache/lru_cache.cc \
cache/lru_secondary_cache.cc \
cache/compressed_secondary_cache.cc \
cache/sharded_cache.cc \
db/arena_wrapped_db_iter.cc \
db/blob/blob_fetcher.cc \
@ -402,7 +402,7 @@ TEST_MAIN_SOURCES = \
cache/cache_test.cc \
cache/cache_reservation_manager_test.cc \
cache/lru_cache_test.cc \
cache/lru_secondary_cache_test.cc \
cache/compressed_secondary_cache_test.cc \
db/blob/blob_counting_iterator_test.cc \
db/blob/blob_file_addition_test.cc \
db/blob/blob_file_builder_test.cc \

@ -558,32 +558,32 @@ DEFINE_double(cache_high_pri_pool_ratio, 0.0,
DEFINE_bool(use_clock_cache, false,
"Replace default LRU block cache with clock cache.");
DEFINE_bool(use_lru_secondary_cache, false,
"Use the LRUSecondaryCache as the secondary cache.");
DEFINE_bool(use_compressed_secondary_cache, false,
"Use the CompressedSecondaryCache as the secondary cache.");
DEFINE_int64(lru_secondary_cache_size, 8 << 20, // 8MB
"Number of bytes to use as a cache of data.");
DEFINE_int64(compressed_secondary_cache_size, 8 << 20, // 8MB
"Number of bytes to use as a cache of data");
DEFINE_int32(lru_secondary_cache_numshardbits, 6,
DEFINE_int32(compressed_secondary_cache_numshardbits, 6,
"Number of shards for the block cache"
" is 2 ** lru_secondary_cache_numshardbits."
" is 2 ** compressed_secondary_cache_numshardbits."
" Negative means use default settings."
" This is applied only if FLAGS_cache_size is non-negative.");
DEFINE_double(lru_secondary_cache_high_pri_pool_ratio, 0.0,
DEFINE_double(compressed_secondary_cache_high_pri_pool_ratio, 0.0,
"Ratio of block cache reserve for high pri blocks. "
"If > 0.0, we also enable "
"cache_index_and_filter_blocks_with_high_priority.");
DEFINE_string(lru_secondary_cache_compression_type, "lz4",
DEFINE_string(compressed_secondary_cache_compression_type, "lz4",
"The compression algorithm to use for large "
"values stored in LRUSecondaryCache.");
"values stored in CompressedSecondaryCache.");
static enum ROCKSDB_NAMESPACE::CompressionType
FLAGS_lru_secondary_cache_compression_type_e =
FLAGS_compressed_secondary_cache_compression_type_e =
ROCKSDB_NAMESPACE::kLZ4Compression;
DEFINE_uint32(
lru_secondary_cache_compress_format_version, 2,
compressed_secondary_cache_compress_format_version, 2,
"compress_format_version can have two values: "
"compress_format_version == 1 -- decompressed size is not included"
" in the block header."
@ -2851,18 +2851,19 @@ class Benchmark {
}
#endif // ROCKSDB_LITE
if (FLAGS_use_lru_secondary_cache) {
LRUSecondaryCacheOptions secondary_cache_opts;
secondary_cache_opts.capacity = FLAGS_lru_secondary_cache_size;
if (FLAGS_use_compressed_secondary_cache) {
CompressedSecondaryCacheOptions secondary_cache_opts;
secondary_cache_opts.capacity = FLAGS_compressed_secondary_cache_size;
secondary_cache_opts.num_shard_bits =
FLAGS_lru_secondary_cache_numshardbits;
FLAGS_compressed_secondary_cache_numshardbits;
secondary_cache_opts.high_pri_pool_ratio =
FLAGS_lru_secondary_cache_high_pri_pool_ratio;
FLAGS_compressed_secondary_cache_high_pri_pool_ratio;
secondary_cache_opts.compression_type =
FLAGS_lru_secondary_cache_compression_type_e;
FLAGS_compressed_secondary_cache_compression_type_e;
secondary_cache_opts.compress_format_version =
FLAGS_lru_secondary_cache_compress_format_version;
opts.secondary_cache = NewLRUSecondaryCache(secondary_cache_opts);
FLAGS_compressed_secondary_cache_compress_format_version;
opts.secondary_cache =
NewCompressedSecondaryCache(secondary_cache_opts);
}
return NewLRUCache(opts);
@ -8109,8 +8110,8 @@ int db_bench_tool(int argc, char** argv) {
FLAGS_wal_compression_e =
StringToCompressionType(FLAGS_wal_compression.c_str());
FLAGS_lru_secondary_cache_compression_type_e = StringToCompressionType(
FLAGS_lru_secondary_cache_compression_type.c_str());
FLAGS_compressed_secondary_cache_compression_type_e = StringToCompressionType(
FLAGS_compressed_secondary_cache_compression_type.c_str());
#ifndef ROCKSDB_LITE
// Stacked BlobDB

@ -87,9 +87,9 @@ Status FaultInjectionSecondaryCache::Insert(
std::unique_ptr<SecondaryCacheResultHandle>
FaultInjectionSecondaryCache::Lookup(const Slice& key,
const Cache::CreateCallback& create_cb,
bool wait) {
bool wait, bool& is_in_sec_cache) {
std::unique_ptr<SecondaryCacheResultHandle> hdl =
base_->Lookup(key, create_cb, wait);
base_->Lookup(key, create_cb, wait, is_in_sec_cache);
ErrorContext* ctx = GetErrorContext();
if (wait && ctx->rand.OneIn(prob_)) {
hdl.reset();

@ -32,8 +32,8 @@ class FaultInjectionSecondaryCache : public SecondaryCache {
const Cache::CacheItemHelper* helper) override;
std::unique_ptr<SecondaryCacheResultHandle> Lookup(
const Slice& key, const Cache::CreateCallback& create_cb,
bool wait) override;
const Slice& key, const Cache::CreateCallback& create_cb, bool wait,
bool& is_in_sec_cache) override;
void Erase(const Slice& /*key*/) override;

Loading…
Cancel
Save