Add a secondary cache implementation based on LRUCache 1 (#9518)

Summary:
**Summary:**
RocksDB uses a block cache to reduce IO and make queries more efficient. The block cache is based on the LRU algorithm (LRUCache) and keeps objects containing uncompressed data, such as Block, ParsedFullFilterBlock etc. It allows the user to configure a second level cache (rocksdb::SecondaryCache) to extend the primary block cache by holding items evicted from it. Some of the major RocksDB users, like MyRocks, use direct IO and would like to use a primary block cache for uncompressed data and a secondary cache for compressed data. The latter allows us to mitigate the loss of the Linux page cache due to direct IO.

This PR includes a concrete implementation of rocksdb::SecondaryCache that integrates with compression libraries such as LZ4 and implements an LRU cache to hold compressed blocks.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/9518

Test Plan:
In this PR, the lru_secondary_cache_test.cc includes the following tests:
1. The unit tests for the secondary cache with either compression or no compression, such as basic tests, fails tests.
2. The integration tests with both primary cache and this secondary cache .

**Follow Up:**

1. Statistics (e.g. compression ratio) will be added in another PR.
2. Once this implementation is ready, I will do some shadow testing and benchmarking with UDB to measure the impact.

Reviewed By: anand1976

Differential Revision: D34430930

Pulled By: gitbw95

fbshipit-source-id: 218d78b672a2f914856d8a90ff32f2f5b5043ded
main
Bo Wang 2 years ago committed by Facebook GitHub Bot
parent 6f12599863
commit f706a9c199
  1. 2
      CMakeLists.txt
  2. 3
      Makefile
  3. 8
      TARGETS
  4. 168
      cache/lru_secondary_cache.cc
  5. 85
      cache/lru_secondary_cache.h
  6. 597
      cache/lru_secondary_cache_test.cc
  7. 48
      include/rocksdb/cache.h
  8. 4
      src.mk
  9. 50
      tools/db_bench_tool.cc

@ -627,6 +627,7 @@ set(SOURCES
cache/cache_reservation_manager.cc
cache/clock_cache.cc
cache/lru_cache.cc
cache/lru_secondary_cache.cc
cache/sharded_cache.cc
db/arena_wrapped_db_iter.cc
db/blob/blob_fetcher.cc
@ -1154,6 +1155,7 @@ if(WITH_TESTS)
cache/cache_reservation_manager_test.cc
cache/cache_test.cc
cache/lru_cache_test.cc
cache/lru_secondary_cache_test.cc
db/blob/blob_counting_iterator_test.cc
db/blob/blob_file_addition_test.cc
db/blob/blob_file_builder_test.cc

@ -1897,6 +1897,9 @@ statistics_test: $(OBJ_DIR)/monitoring/statistics_test.o $(TEST_LIBRARY) $(LIBRA
stats_history_test: $(OBJ_DIR)/monitoring/stats_history_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)
lru_secondary_cache_test: $(OBJ_DIR)/cache/lru_secondary_cache_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)
lru_cache_test: $(OBJ_DIR)/cache/lru_cache_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)

@ -15,6 +15,7 @@ cpp_library_wrapper(name="rocksdb_lib", srcs=[
"cache/cache_reservation_manager.cc",
"cache/clock_cache.cc",
"cache/lru_cache.cc",
"cache/lru_secondary_cache.cc",
"cache/sharded_cache.cc",
"db/arena_wrapped_db_iter.cc",
"db/blob/blob_fetcher.cc",
@ -331,6 +332,7 @@ cpp_library_wrapper(name="rocksdb_whole_archive_lib", srcs=[
"cache/cache_reservation_manager.cc",
"cache/clock_cache.cc",
"cache/lru_cache.cc",
"cache/lru_secondary_cache.cc",
"cache/sharded_cache.cc",
"db/arena_wrapped_db_iter.cc",
"db/blob/blob_fetcher.cc",
@ -9096,6 +9098,12 @@ cpp_unittest_wrapper(name="lru_cache_test",
extra_compiler_flags=[])
cpp_unittest_wrapper(name="lru_secondary_cache_test",
srcs=["cache/lru_secondary_cache_test.cc"],
deps=[":rocksdb_test_lib"],
extra_compiler_flags=[])
cpp_unittest_wrapper(name="manual_compaction_test",
srcs=["db/manual_compaction_test.cc"],
deps=[":rocksdb_test_lib"],

@ -0,0 +1,168 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "cache/lru_secondary_cache.h"
#include <memory>
#include "memory/memory_allocator.h"
#include "util/compression.h"
#include "util/string_util.h"
namespace ROCKSDB_NAMESPACE {
namespace {
void DeletionCallback(const Slice& /*key*/, void* obj) {
delete reinterpret_cast<CacheAllocationPtr*>(obj);
obj = nullptr;
}
} // namespace
LRUSecondaryCache::LRUSecondaryCache(
size_t capacity, int num_shard_bits, bool strict_capacity_limit,
double high_pri_pool_ratio,
std::shared_ptr<MemoryAllocator> memory_allocator, bool use_adaptive_mutex,
CacheMetadataChargePolicy metadata_charge_policy,
CompressionType compression_type, uint32_t compress_format_version)
: cache_options_(capacity, num_shard_bits, strict_capacity_limit,
high_pri_pool_ratio, memory_allocator, use_adaptive_mutex,
metadata_charge_policy, compression_type,
compress_format_version) {
cache_ = NewLRUCache(capacity, num_shard_bits, strict_capacity_limit,
high_pri_pool_ratio, memory_allocator,
use_adaptive_mutex, metadata_charge_policy);
}
LRUSecondaryCache::~LRUSecondaryCache() { cache_.reset(); }
std::unique_ptr<SecondaryCacheResultHandle> LRUSecondaryCache::Lookup(
const Slice& key, const Cache::CreateCallback& create_cb, bool /*wait*/) {
std::unique_ptr<SecondaryCacheResultHandle> handle;
Cache::Handle* lru_handle = cache_->Lookup(key);
if (lru_handle == nullptr) {
return handle;
}
CacheAllocationPtr* ptr =
reinterpret_cast<CacheAllocationPtr*>(cache_->Value(lru_handle));
void* value = nullptr;
size_t charge = 0;
Status s;
if (cache_options_.compression_type == kNoCompression) {
s = create_cb(ptr->get(), cache_->GetCharge(lru_handle), &value, &charge);
} else {
UncompressionContext uncompression_context(cache_options_.compression_type);
UncompressionInfo uncompression_info(uncompression_context,
UncompressionDict::GetEmptyDict(),
cache_options_.compression_type);
size_t uncompressed_size = 0;
CacheAllocationPtr uncompressed;
uncompressed = UncompressData(
uncompression_info, (char*)ptr->get(), cache_->GetCharge(lru_handle),
&uncompressed_size, cache_options_.compress_format_version,
cache_options_.memory_allocator.get());
if (!uncompressed) {
cache_->Release(lru_handle, true);
return handle;
}
s = create_cb(uncompressed.get(), uncompressed_size, &value, &charge);
}
if (!s.ok()) {
cache_->Release(lru_handle, true);
return handle;
}
handle.reset(new LRUSecondaryCacheResultHandle(value, charge));
cache_->Release(lru_handle);
return handle;
}
Status LRUSecondaryCache::Insert(const Slice& key, void* value,
const Cache::CacheItemHelper* helper) {
size_t size = (*helper->size_cb)(value);
CacheAllocationPtr ptr =
AllocateBlock(size, cache_options_.memory_allocator.get());
Status s = (*helper->saveto_cb)(value, 0, size, ptr.get());
if (!s.ok()) {
return s;
}
Slice val(ptr.get(), size);
std::string compressed_val;
if (cache_options_.compression_type != kNoCompression) {
CompressionOptions compression_opts;
CompressionContext compression_context(cache_options_.compression_type);
uint64_t sample_for_compression = 0;
CompressionInfo compression_info(
compression_opts, compression_context, CompressionDict::GetEmptyDict(),
cache_options_.compression_type, sample_for_compression);
bool success =
CompressData(val, compression_info,
cache_options_.compress_format_version, &compressed_val);
if (!success) {
return Status::Corruption("Error compressing value.");
}
val = Slice(compressed_val);
size = compressed_val.size();
ptr = AllocateBlock(size, cache_options_.memory_allocator.get());
memcpy(ptr.get(), compressed_val.data(), size);
}
CacheAllocationPtr* buf = new CacheAllocationPtr(std::move(ptr));
return cache_->Insert(key, buf, size, DeletionCallback);
}
void LRUSecondaryCache::Erase(const Slice& key) { cache_->Erase(key); }
std::string LRUSecondaryCache::GetPrintableOptions() const {
std::string ret;
ret.reserve(20000);
const int kBufferSize = 200;
char buffer[kBufferSize];
ret.append(cache_->GetPrintableOptions());
snprintf(buffer, kBufferSize, " compression_type : %s\n",
CompressionTypeToString(cache_options_.compression_type).c_str());
ret.append(buffer);
snprintf(buffer, kBufferSize, " compression_type : %d\n",
cache_options_.compress_format_version);
ret.append(buffer);
return ret;
}
std::shared_ptr<SecondaryCache> NewLRUSecondaryCache(
size_t capacity, int num_shard_bits, bool strict_capacity_limit,
double high_pri_pool_ratio,
std::shared_ptr<MemoryAllocator> memory_allocator, bool use_adaptive_mutex,
CacheMetadataChargePolicy metadata_charge_policy,
CompressionType compression_type, uint32_t compress_format_version) {
return std::make_shared<LRUSecondaryCache>(
capacity, num_shard_bits, strict_capacity_limit, high_pri_pool_ratio,
memory_allocator, use_adaptive_mutex, metadata_charge_policy,
compression_type, compress_format_version);
}
std::shared_ptr<SecondaryCache> NewLRUSecondaryCache(
const LRUSecondaryCacheOptions& opts) {
// The secondary_cache is disabled for this LRUCache instance.
assert(opts.secondary_cache == nullptr);
return NewLRUSecondaryCache(
opts.capacity, opts.num_shard_bits, opts.strict_capacity_limit,
opts.high_pri_pool_ratio, opts.memory_allocator, opts.use_adaptive_mutex,
opts.metadata_charge_policy, opts.compression_type,
opts.compress_format_version);
}
} // namespace ROCKSDB_NAMESPACE

@ -0,0 +1,85 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#pragma once
#include <memory>
#include "cache/lru_cache.h"
#include "memory/memory_allocator.h"
#include "rocksdb/secondary_cache.h"
#include "rocksdb/slice.h"
#include "rocksdb/status.h"
#include "util/compression.h"
namespace ROCKSDB_NAMESPACE {
class LRUSecondaryCacheResultHandle : public SecondaryCacheResultHandle {
public:
LRUSecondaryCacheResultHandle(void* value, size_t size)
: value_(value), size_(size) {}
virtual ~LRUSecondaryCacheResultHandle() override = default;
LRUSecondaryCacheResultHandle(const LRUSecondaryCacheResultHandle&) = delete;
LRUSecondaryCacheResultHandle& operator=(
const LRUSecondaryCacheResultHandle&) = delete;
bool IsReady() override { return true; }
void Wait() override {}
void* Value() override { return value_; }
size_t Size() override { return size_; }
private:
void* value_;
size_t size_;
};
// The LRUSecondaryCache is a concrete implementation of
// rocksdb::SecondaryCache.
//
// Users can also cast a pointer to it and call methods on
// it directly, especially custom methods that may be added
// in the future. For example -
// std::unique_ptr<rocksdb::SecondaryCache> cache =
// NewLRUSecondaryCache(opts);
// static_cast<LRUSecondaryCache*>(cache.get())->Erase(key);
class LRUSecondaryCache : public SecondaryCache {
public:
LRUSecondaryCache(
size_t capacity, int num_shard_bits, bool strict_capacity_limit,
double high_pri_pool_ratio,
std::shared_ptr<MemoryAllocator> memory_allocator = nullptr,
bool use_adaptive_mutex = kDefaultToAdaptiveMutex,
CacheMetadataChargePolicy metadata_charge_policy =
kDontChargeCacheMetadata,
CompressionType compression_type = CompressionType::kLZ4Compression,
uint32_t compress_format_version = 2);
virtual ~LRUSecondaryCache() override;
const char* Name() const override { return "LRUSecondaryCache"; }
Status Insert(const Slice& key, void* value,
const Cache::CacheItemHelper* helper) override;
std::unique_ptr<SecondaryCacheResultHandle> Lookup(
const Slice& key, const Cache::CreateCallback& create_cb,
bool /*wait*/) override;
void Erase(const Slice& key) override;
void WaitAll(std::vector<SecondaryCacheResultHandle*> /*handles*/) override {}
std::string GetPrintableOptions() const override;
private:
std::shared_ptr<Cache> cache_;
LRUSecondaryCacheOptions cache_options_;
};
} // namespace ROCKSDB_NAMESPACE

@ -0,0 +1,597 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "cache/lru_secondary_cache.h"
#include <algorithm>
#include <cstdint>
#include "memory/jemalloc_nodump_allocator.h"
#include "memory/memory_allocator.h"
#include "test_util/testharness.h"
#include "test_util/testutil.h"
#include "util/compression.h"
#include "util/random.h"
namespace ROCKSDB_NAMESPACE {
class LRUSecondaryCacheTest : public testing::Test {
public:
LRUSecondaryCacheTest() : fail_create_(false) {}
~LRUSecondaryCacheTest() {}
protected:
class TestItem {
public:
TestItem(const char* buf, size_t size) : buf_(new char[size]), size_(size) {
memcpy(buf_.get(), buf, size);
}
~TestItem() {}
char* Buf() { return buf_.get(); }
size_t Size() { return size_; }
private:
std::unique_ptr<char[]> buf_;
size_t size_;
};
static size_t SizeCallback(void* obj) {
return reinterpret_cast<TestItem*>(obj)->Size();
}
static Status SaveToCallback(void* from_obj, size_t from_offset,
size_t length, void* out) {
TestItem* item = reinterpret_cast<TestItem*>(from_obj);
const char* buf = item->Buf();
EXPECT_EQ(length, item->Size());
EXPECT_EQ(from_offset, 0);
memcpy(out, buf, length);
return Status::OK();
}
static void DeletionCallback(const Slice& /*key*/, void* obj) {
delete reinterpret_cast<TestItem*>(obj);
obj = nullptr;
}
static Cache::CacheItemHelper helper_;
static Status SaveToCallbackFail(void* /*obj*/, size_t /*offset*/,
size_t /*size*/, void* /*out*/) {
return Status::NotSupported();
}
static Cache::CacheItemHelper helper_fail_;
Cache::CreateCallback test_item_creator = [&](const void* buf, size_t size,
void** out_obj,
size_t* charge) -> Status {
if (fail_create_) {
return Status::NotSupported();
}
*out_obj = reinterpret_cast<void*>(new TestItem((char*)buf, size));
*charge = size;
return Status::OK();
};
void SetFailCreate(bool fail) { fail_create_ = fail; }
void BasicTest(bool sec_cache_is_compressed, bool use_jemalloc) {
LRUSecondaryCacheOptions opts;
opts.capacity = 2048;
opts.num_shard_bits = 0;
opts.metadata_charge_policy = kDontChargeCacheMetadata;
if (sec_cache_is_compressed) {
if (!LZ4_Supported()) {
ROCKSDB_GTEST_SKIP("This test requires LZ4 support.");
opts.compression_type = CompressionType::kNoCompression;
}
} else {
opts.compression_type = CompressionType::kNoCompression;
}
if (use_jemalloc) {
JemallocAllocatorOptions jopts;
std::shared_ptr<MemoryAllocator> allocator;
std::string msg;
if (JemallocNodumpAllocator::IsSupported(&msg)) {
Status s = NewJemallocNodumpAllocator(jopts, &allocator);
if (s.ok()) {
opts.memory_allocator = allocator;
}
} else {
ROCKSDB_GTEST_BYPASS("JEMALLOC not supported");
}
}
std::shared_ptr<SecondaryCache> cache = NewLRUSecondaryCache(opts);
// Lookup an non-existent key.
std::unique_ptr<SecondaryCacheResultHandle> handle0 =
cache->Lookup("k0", test_item_creator, true);
ASSERT_EQ(handle0, nullptr);
Random rnd(301);
// Insert and Lookup the first item.
std::string str1;
test::CompressibleString(&rnd, 0.25, 1000, &str1);
TestItem item1(str1.data(), str1.length());
ASSERT_OK(cache->Insert("k1", &item1, &LRUSecondaryCacheTest::helper_));
std::unique_ptr<SecondaryCacheResultHandle> handle1 =
cache->Lookup("k1", test_item_creator, true);
ASSERT_NE(handle1, nullptr);
// delete reinterpret_cast<TestItem*>(handle1->Value());
std::unique_ptr<TestItem> val1 =
std::unique_ptr<TestItem>(static_cast<TestItem*>(handle1->Value()));
ASSERT_NE(val1, nullptr);
ASSERT_EQ(memcmp(val1->Buf(), item1.Buf(), item1.Size()), 0);
// Insert and Lookup the second item.
std::string str2;
test::CompressibleString(&rnd, 0.5, 1000, &str2);
TestItem item2(str2.data(), str2.length());
ASSERT_OK(cache->Insert("k2", &item2, &LRUSecondaryCacheTest::helper_));
std::unique_ptr<SecondaryCacheResultHandle> handle2 =
cache->Lookup("k2", test_item_creator, true);
ASSERT_NE(handle2, nullptr);
std::unique_ptr<TestItem> val2 =
std::unique_ptr<TestItem>(static_cast<TestItem*>(handle2->Value()));
ASSERT_NE(val2, nullptr);
ASSERT_EQ(memcmp(val2->Buf(), item2.Buf(), item2.Size()), 0);
// Lookup the first item again to make sure it is still in the cache.
std::unique_ptr<SecondaryCacheResultHandle> handle1_1 =
cache->Lookup("k1", test_item_creator, true);
ASSERT_NE(handle1_1, nullptr);
std::unique_ptr<TestItem> val1_1 =
std::unique_ptr<TestItem>(static_cast<TestItem*>(handle1_1->Value()));
ASSERT_NE(val1_1, nullptr);
ASSERT_EQ(memcmp(val1_1->Buf(), item1.Buf(), item1.Size()), 0);
std::vector<SecondaryCacheResultHandle*> handles = {handle1.get(),
handle2.get()};
cache->WaitAll(handles);
cache->Erase("k1");
handle1 = cache->Lookup("k1", test_item_creator, true);
ASSERT_EQ(handle1, nullptr);
cache.reset();
}
void FailsTest(bool sec_cache_is_compressed) {
LRUSecondaryCacheOptions secondary_cache_opts;
if (sec_cache_is_compressed) {
if (!LZ4_Supported()) {
ROCKSDB_GTEST_SKIP("This test requires LZ4 support.");
secondary_cache_opts.compression_type = CompressionType::kNoCompression;
}
} else {
secondary_cache_opts.compression_type = CompressionType::kNoCompression;
}
secondary_cache_opts.capacity = 1100;
secondary_cache_opts.num_shard_bits = 0;
secondary_cache_opts.metadata_charge_policy = kDontChargeCacheMetadata;
std::shared_ptr<SecondaryCache> cache =
NewLRUSecondaryCache(secondary_cache_opts);
// Insert and Lookup the first item.
Random rnd(301);
std::string str1(rnd.RandomString(1000));
TestItem item1(str1.data(), str1.length());
ASSERT_OK(cache->Insert("k1", &item1, &LRUSecondaryCacheTest::helper_));
std::unique_ptr<SecondaryCacheResultHandle> handle1 =
cache->Lookup("k1", test_item_creator, true);
ASSERT_NE(handle1, nullptr);
std::unique_ptr<TestItem> val1 =
std::unique_ptr<TestItem>(static_cast<TestItem*>(handle1->Value()));
ASSERT_NE(val1, nullptr);
ASSERT_EQ(memcmp(val1->Buf(), item1.Buf(), item1.Size()), 0);
// Insert and Lookup the second item.
std::string str2(rnd.RandomString(200));
TestItem item2(str2.data(), str2.length());
// k1 is evicted.
ASSERT_OK(cache->Insert("k2", &item2, &LRUSecondaryCacheTest::helper_));
std::unique_ptr<SecondaryCacheResultHandle> handle1_1 =
cache->Lookup("k1", test_item_creator, true);
ASSERT_EQ(handle1_1, nullptr);
std::unique_ptr<SecondaryCacheResultHandle> handle2 =
cache->Lookup("k2", test_item_creator, true);
ASSERT_NE(handle2, nullptr);
std::unique_ptr<TestItem> val2 =
std::unique_ptr<TestItem>(static_cast<TestItem*>(handle2->Value()));
ASSERT_NE(val2, nullptr);
ASSERT_EQ(memcmp(val2->Buf(), item2.Buf(), item2.Size()), 0);
// Create Fails.
SetFailCreate(true);
std::unique_ptr<SecondaryCacheResultHandle> handle2_1 =
cache->Lookup("k2", test_item_creator, true);
ASSERT_EQ(handle2_1, nullptr);
// Save Fails.
std::string str3 = rnd.RandomString(10);
TestItem item3(str3.data(), str3.length());
ASSERT_NOK(
cache->Insert("k3", &item3, &LRUSecondaryCacheTest::helper_fail_));
cache.reset();
}
void BasicIntegrationTest(bool sec_cache_is_compressed) {
LRUSecondaryCacheOptions secondary_cache_opts;
if (sec_cache_is_compressed) {
if (!LZ4_Supported()) {
ROCKSDB_GTEST_SKIP("This test requires LZ4 support.");
secondary_cache_opts.compression_type = CompressionType::kNoCompression;
}
} else {
secondary_cache_opts.compression_type = CompressionType::kNoCompression;
}
secondary_cache_opts.capacity = 2300;
secondary_cache_opts.num_shard_bits = 0;
secondary_cache_opts.metadata_charge_policy = kDontChargeCacheMetadata;
std::shared_ptr<SecondaryCache> secondary_cache =
NewLRUSecondaryCache(secondary_cache_opts);
LRUCacheOptions lru_cache_opts(1024, 0, false, 0.5, nullptr,
kDefaultToAdaptiveMutex,
kDontChargeCacheMetadata);
lru_cache_opts.secondary_cache = secondary_cache;
std::shared_ptr<Cache> cache = NewLRUCache(lru_cache_opts);
std::shared_ptr<Statistics> stats = CreateDBStatistics();
Random rnd(301);
std::string str1 = rnd.RandomString(1010);
std::string str1_clone{str1};
TestItem* item1 = new TestItem(str1.data(), str1.length());
ASSERT_OK(cache->Insert("k1", item1, &LRUSecondaryCacheTest::helper_,
str1.length()));
std::string str2 = rnd.RandomString(1020);
TestItem* item2 = new TestItem(str2.data(), str2.length());
// After Insert, lru cache contains k2 and secondary cache contains k1.
ASSERT_OK(cache->Insert("k2", item2, &LRUSecondaryCacheTest::helper_,
str2.length()));
std::string str3 = rnd.RandomString(1020);
TestItem* item3 = new TestItem(str3.data(), str3.length());
// After Insert, lru cache contains k3 and secondary cache contains k1 and
// k2
ASSERT_OK(cache->Insert("k3", item3, &LRUSecondaryCacheTest::helper_,
str3.length()));
Cache::Handle* handle;
handle =
cache->Lookup("k3", &LRUSecondaryCacheTest::helper_, test_item_creator,
Cache::Priority::LOW, true, stats.get());
ASSERT_NE(handle, nullptr);
TestItem* val3 = static_cast<TestItem*>(cache->Value(handle));
ASSERT_NE(val3, nullptr);
ASSERT_EQ(memcmp(val3->Buf(), item3->Buf(), item3->Size()), 0);
cache->Release(handle);
// Lookup an non-existent key.
handle =
cache->Lookup("k0", &LRUSecondaryCacheTest::helper_, test_item_creator,
Cache::Priority::LOW, true, stats.get());
ASSERT_EQ(handle, nullptr);
// This Lookup should promote k1 and demote k3, so k2 is evicted from the
// secondary cache. The lru cache contains k1 and secondary cache contains
// k3. item1 was Free(), so it cannot be compared against the item1.
handle =
cache->Lookup("k1", &LRUSecondaryCacheTest::helper_, test_item_creator,
Cache::Priority::LOW, true, stats.get());
ASSERT_NE(handle, nullptr);
TestItem* val1_1 = static_cast<TestItem*>(cache->Value(handle));
ASSERT_NE(val1_1, nullptr);
ASSERT_EQ(memcmp(val1_1->Buf(), str1_clone.data(), str1_clone.size()), 0);
cache->Release(handle);
handle =
cache->Lookup("k2", &LRUSecondaryCacheTest::helper_, test_item_creator,
Cache::Priority::LOW, true, stats.get());
ASSERT_EQ(handle, nullptr);
cache.reset();
secondary_cache.reset();
}
void BasicIntegrationFailTest(bool sec_cache_is_compressed) {
LRUSecondaryCacheOptions secondary_cache_opts;
if (sec_cache_is_compressed) {
if (!LZ4_Supported()) {
ROCKSDB_GTEST_SKIP("This test requires LZ4 support.");
secondary_cache_opts.compression_type = CompressionType::kNoCompression;
}
} else {
secondary_cache_opts.compression_type = CompressionType::kNoCompression;
}
secondary_cache_opts.capacity = 2048;
secondary_cache_opts.num_shard_bits = 0;
secondary_cache_opts.metadata_charge_policy = kDontChargeCacheMetadata;
std::shared_ptr<SecondaryCache> secondary_cache =
NewLRUSecondaryCache(secondary_cache_opts);
LRUCacheOptions opts(1024, 0, false, 0.5, nullptr, kDefaultToAdaptiveMutex,
kDontChargeCacheMetadata);
opts.secondary_cache = secondary_cache;
std::shared_ptr<Cache> cache = NewLRUCache(opts);
Random rnd(301);
std::string str1 = rnd.RandomString(1020);
auto item1 =
std::unique_ptr<TestItem>(new TestItem(str1.data(), str1.length()));
ASSERT_NOK(cache->Insert("k1", item1.get(), nullptr, str1.length()));
ASSERT_OK(cache->Insert("k1", item1.get(), &LRUSecondaryCacheTest::helper_,
str1.length()));
item1.release(); // Appease clang-analyze "potential memory leak"
Cache::Handle* handle;
handle = cache->Lookup("k2", nullptr, test_item_creator,
Cache::Priority::LOW, true);
ASSERT_EQ(handle, nullptr);
handle = cache->Lookup("k2", &LRUSecondaryCacheTest::helper_,
test_item_creator, Cache::Priority::LOW, false);
ASSERT_EQ(handle, nullptr);
cache.reset();
secondary_cache.reset();
}
void IntegrationSaveFailTest(bool sec_cache_is_compressed) {
LRUSecondaryCacheOptions secondary_cache_opts;
if (sec_cache_is_compressed) {
if (!LZ4_Supported()) {
ROCKSDB_GTEST_SKIP("This test requires LZ4 support.");
secondary_cache_opts.compression_type = CompressionType::kNoCompression;
}
} else {
secondary_cache_opts.compression_type = CompressionType::kNoCompression;
}
secondary_cache_opts.capacity = 2048;
secondary_cache_opts.num_shard_bits = 0;
secondary_cache_opts.metadata_charge_policy = kDontChargeCacheMetadata;
std::shared_ptr<SecondaryCache> secondary_cache =
NewLRUSecondaryCache(secondary_cache_opts);
LRUCacheOptions opts(1024, 0, false, 0.5, nullptr, kDefaultToAdaptiveMutex,
kDontChargeCacheMetadata);
opts.secondary_cache = secondary_cache;
std::shared_ptr<Cache> cache = NewLRUCache(opts);
Random rnd(301);
std::string str1 = rnd.RandomString(1020);
TestItem* item1 = new TestItem(str1.data(), str1.length());
ASSERT_OK(cache->Insert("k1", item1, &LRUSecondaryCacheTest::helper_fail_,
str1.length()));
std::string str2 = rnd.RandomString(1020);
TestItem* item2 = new TestItem(str2.data(), str2.length());
// k1 should be demoted to the secondary cache.
ASSERT_OK(cache->Insert("k2", item2, &LRUSecondaryCacheTest::helper_fail_,
str2.length()));
Cache::Handle* handle;
handle = cache->Lookup("k2", &LRUSecondaryCacheTest::helper_fail_,
test_item_creator, Cache::Priority::LOW, true);
ASSERT_NE(handle, nullptr);
cache->Release(handle);
// This lookup should fail, since k1 demotion would have failed
handle = cache->Lookup("k1", &LRUSecondaryCacheTest::helper_fail_,
test_item_creator, Cache::Priority::LOW, true);
ASSERT_EQ(handle, nullptr);
// Since k1 didn't get promoted, k2 should still be in cache
handle = cache->Lookup("k2", &LRUSecondaryCacheTest::helper_fail_,
test_item_creator, Cache::Priority::LOW, true);
ASSERT_NE(handle, nullptr);
cache->Release(handle);
cache.reset();
secondary_cache.reset();
}
void IntegrationCreateFailTest(bool sec_cache_is_compressed) {
LRUSecondaryCacheOptions secondary_cache_opts;
if (sec_cache_is_compressed) {
if (!LZ4_Supported()) {
ROCKSDB_GTEST_SKIP("This test requires LZ4 support.");
secondary_cache_opts.compression_type = CompressionType::kNoCompression;
}
} else {
secondary_cache_opts.compression_type = CompressionType::kNoCompression;
}
secondary_cache_opts.capacity = 2048;
secondary_cache_opts.num_shard_bits = 0;
secondary_cache_opts.metadata_charge_policy = kDontChargeCacheMetadata;
std::shared_ptr<SecondaryCache> secondary_cache =
NewLRUSecondaryCache(secondary_cache_opts);
LRUCacheOptions opts(1024, 0, false, 0.5, nullptr, kDefaultToAdaptiveMutex,
kDontChargeCacheMetadata);
opts.secondary_cache = secondary_cache;
std::shared_ptr<Cache> cache = NewLRUCache(opts);
Random rnd(301);
std::string str1 = rnd.RandomString(1020);
TestItem* item1 = new TestItem(str1.data(), str1.length());
ASSERT_OK(cache->Insert("k1", item1, &LRUSecondaryCacheTest::helper_,
str1.length()));
std::string str2 = rnd.RandomString(1020);
TestItem* item2 = new TestItem(str2.data(), str2.length());
// k1 should be demoted to the secondary cache.
ASSERT_OK(cache->Insert("k2", item2, &LRUSecondaryCacheTest::helper_,
str2.length()));
Cache::Handle* handle;
SetFailCreate(true);
handle = cache->Lookup("k2", &LRUSecondaryCacheTest::helper_,
test_item_creator, Cache::Priority::LOW, true);
ASSERT_NE(handle, nullptr);
cache->Release(handle);
// This lookup should fail, since k1 creation would have failed
handle = cache->Lookup("k1", &LRUSecondaryCacheTest::helper_,
test_item_creator, Cache::Priority::LOW, true);
ASSERT_EQ(handle, nullptr);
// Since k1 didn't get promoted, k2 should still be in cache
handle = cache->Lookup("k2", &LRUSecondaryCacheTest::helper_,
test_item_creator, Cache::Priority::LOW, true);
ASSERT_NE(handle, nullptr);
cache->Release(handle);
cache.reset();
secondary_cache.reset();
}
void IntegrationFullCapacityTest(bool sec_cache_is_compressed) {
LRUSecondaryCacheOptions secondary_cache_opts;
if (sec_cache_is_compressed) {
if (!LZ4_Supported()) {
ROCKSDB_GTEST_SKIP("This test requires LZ4 support.");
secondary_cache_opts.compression_type = CompressionType::kNoCompression;
}
} else {
secondary_cache_opts.compression_type = CompressionType::kNoCompression;
}
secondary_cache_opts.capacity = 2048;
secondary_cache_opts.num_shard_bits = 0;
secondary_cache_opts.metadata_charge_policy = kDontChargeCacheMetadata;
std::shared_ptr<SecondaryCache> secondary_cache =
NewLRUSecondaryCache(secondary_cache_opts);
LRUCacheOptions opts(1024, 0, /*_strict_capacity_limit=*/true, 0.5, nullptr,
kDefaultToAdaptiveMutex, kDontChargeCacheMetadata);
opts.secondary_cache = secondary_cache;
std::shared_ptr<Cache> cache = NewLRUCache(opts);
Random rnd(301);
std::string str1 = rnd.RandomString(1020);
TestItem* item1 = new TestItem(str1.data(), str1.length());
ASSERT_OK(cache->Insert("k1", item1, &LRUSecondaryCacheTest::helper_,
str1.length()));
std::string str2 = rnd.RandomString(1020);
TestItem* item2 = new TestItem(str2.data(), str2.length());
// k1 should be demoted to the secondary cache.
ASSERT_OK(cache->Insert("k2", item2, &LRUSecondaryCacheTest::helper_,
str2.length()));
Cache::Handle* handle;
handle = cache->Lookup("k2", &LRUSecondaryCacheTest::helper_,
test_item_creator, Cache::Priority::LOW, true);
ASSERT_NE(handle, nullptr);
// k1 promotion should fail due to the block cache being at capacity,
// but the lookup should still succeed
Cache::Handle* handle2;
handle2 = cache->Lookup("k1", &LRUSecondaryCacheTest::helper_,
test_item_creator, Cache::Priority::LOW, true);
ASSERT_NE(handle2, nullptr);
// Since k1 didn't get inserted, k2 should still be in cache
cache->Release(handle);
cache->Release(handle2);
handle = cache->Lookup("k2", &LRUSecondaryCacheTest::helper_,
test_item_creator, Cache::Priority::LOW, true);
ASSERT_NE(handle, nullptr);
cache->Release(handle);
cache.reset();
secondary_cache.reset();
}
private:
bool fail_create_;
};
Cache::CacheItemHelper LRUSecondaryCacheTest::helper_(
LRUSecondaryCacheTest::SizeCallback, LRUSecondaryCacheTest::SaveToCallback,
LRUSecondaryCacheTest::DeletionCallback);
Cache::CacheItemHelper LRUSecondaryCacheTest::helper_fail_(
LRUSecondaryCacheTest::SizeCallback,
LRUSecondaryCacheTest::SaveToCallbackFail,
LRUSecondaryCacheTest::DeletionCallback);
TEST_F(LRUSecondaryCacheTest, BasicTestWithNoCompression) {
BasicTest(false, false);
}
TEST_F(LRUSecondaryCacheTest, BasicTestWithMemoryAllocatorAndNoCompression) {
BasicTest(false, true);
}
TEST_F(LRUSecondaryCacheTest, BasicTestWithCompression) {
BasicTest(true, false);
}
TEST_F(LRUSecondaryCacheTest, BasicTestWithMemoryAllocatorAndCompression) {
BasicTest(true, true);
}
TEST_F(LRUSecondaryCacheTest, FailsTestWithNoCompression) { FailsTest(false); }
TEST_F(LRUSecondaryCacheTest, FailsTestWithCompression) { FailsTest(true); }
TEST_F(LRUSecondaryCacheTest, BasicIntegrationTestWithNoCompression) {
BasicIntegrationTest(false);
}
TEST_F(LRUSecondaryCacheTest, BasicIntegrationTestWithCompression) {
BasicIntegrationTest(true);
}
TEST_F(LRUSecondaryCacheTest, BasicIntegrationFailTestWithNoCompression) {
BasicIntegrationFailTest(false);
}
TEST_F(LRUSecondaryCacheTest, BasicIntegrationFailTestWithCompression) {
BasicIntegrationFailTest(true);
}
TEST_F(LRUSecondaryCacheTest, IntegrationSaveFailTestWithNoCompression) {
IntegrationSaveFailTest(false);
}
TEST_F(LRUSecondaryCacheTest, IntegrationSaveFailTestWithCompression) {
IntegrationSaveFailTest(true);
}
TEST_F(LRUSecondaryCacheTest, IntegrationCreateFailTestWithNoCompression) {
IntegrationCreateFailTest(false);
}
TEST_F(LRUSecondaryCacheTest, IntegrationCreateFailTestWithCompression) {
IntegrationCreateFailTest(true);
}
TEST_F(LRUSecondaryCacheTest, IntegrationFullCapacityTestWithNoCompression) {
IntegrationFullCapacityTest(false);
}
TEST_F(LRUSecondaryCacheTest, IntegrationFullCapacityTestWithCompression) {
IntegrationFullCapacityTest(true);
}
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

@ -27,6 +27,7 @@
#include <memory>
#include <string>
#include "rocksdb/compression_type.h"
#include "rocksdb/memory_allocator.h"
#include "rocksdb/slice.h"
#include "rocksdb/statistics.h"
@ -127,6 +128,53 @@ extern std::shared_ptr<Cache> NewLRUCache(
extern std::shared_ptr<Cache> NewLRUCache(const LRUCacheOptions& cache_opts);
// EXPERIMENTAL
// Options structure for configuring a SecondaryCache instance based on
// LRUCache. The LRUCacheOptions.secondary_cache is not used and
// should not be set.
struct LRUSecondaryCacheOptions : LRUCacheOptions {
// The compression method (if any) that is used to compress data.
CompressionType compression_type = CompressionType::kLZ4Compression;
// compress_format_version can have two values:
// compress_format_version == 1 -- decompressed size is not included in the
// block header.
// compress_format_version == 2 -- decompressed size is included in the block
// header in varint32 format.
uint32_t compress_format_version = 2;
LRUSecondaryCacheOptions() {}
LRUSecondaryCacheOptions(
size_t _capacity, int _num_shard_bits, bool _strict_capacity_limit,
double _high_pri_pool_ratio,
std::shared_ptr<MemoryAllocator> _memory_allocator = nullptr,
bool _use_adaptive_mutex = kDefaultToAdaptiveMutex,
CacheMetadataChargePolicy _metadata_charge_policy =
kDefaultCacheMetadataChargePolicy,
CompressionType _compression_type = CompressionType::kLZ4Compression,
uint32_t _compress_format_version = 2)
: LRUCacheOptions(_capacity, _num_shard_bits, _strict_capacity_limit,
_high_pri_pool_ratio, std::move(_memory_allocator),
_use_adaptive_mutex, _metadata_charge_policy),
compression_type(_compression_type),
compress_format_version(_compress_format_version) {}
};
// EXPERIMENTAL
// Create a new Secondary Cache that is implemented on top of LRUCache.
extern std::shared_ptr<SecondaryCache> NewLRUSecondaryCache(
size_t capacity, int num_shard_bits = -1,
bool strict_capacity_limit = false, double high_pri_pool_ratio = 0.5,
std::shared_ptr<MemoryAllocator> memory_allocator = nullptr,
bool use_adaptive_mutex = kDefaultToAdaptiveMutex,
CacheMetadataChargePolicy metadata_charge_policy =
kDefaultCacheMetadataChargePolicy,
CompressionType compression_type = CompressionType::kLZ4Compression,
uint32_t compress_format_version = 2);
extern std::shared_ptr<SecondaryCache> NewLRUSecondaryCache(
const LRUSecondaryCacheOptions& opts);
// Similar to NewLRUCache, but create a cache based on CLOCK algorithm with
// better concurrent performance in some cases. See util/clock_cache.cc for
// more detail.

@ -6,6 +6,7 @@ LIB_SOURCES = \
cache/cache_reservation_manager.cc \
cache/clock_cache.cc \
cache/lru_cache.cc \
cache/lru_secondary_cache.cc \
cache/sharded_cache.cc \
db/arena_wrapped_db_iter.cc \
db/blob/blob_fetcher.cc \
@ -398,8 +399,9 @@ BENCH_MAIN_SOURCES = \
TEST_MAIN_SOURCES = \
cache/cache_test.cc \
cache/cache_reservation_manager_test.cc \
cache/cache_reservation_manager_test.cc \
cache/lru_cache_test.cc \
cache/lru_secondary_cache_test.cc \
db/blob/blob_counting_iterator_test.cc \
db/blob/blob_file_addition_test.cc \
db/blob/blob_file_builder_test.cc \

@ -556,6 +556,38 @@ DEFINE_double(cache_high_pri_pool_ratio, 0.0,
DEFINE_bool(use_clock_cache, false,
"Replace default LRU block cache with clock cache.");
DEFINE_bool(use_lru_secondary_cache, false,
"Use the LRUSecondaryCache as the secondary cache.");
DEFINE_int64(lru_secondary_cache_size, 8 << 20, // 8MB
"Number of bytes to use as a cache of data");
DEFINE_int32(lru_secondary_cache_numshardbits, 6,
"Number of shards for the block cache"
" is 2 ** lru_secondary_cache_numshardbits."
" Negative means use default settings."
" This is applied only if FLAGS_cache_size is non-negative.");
DEFINE_double(lru_secondary_cache_high_pri_pool_ratio, 0.0,
"Ratio of block cache reserve for high pri blocks. "
"If > 0.0, we also enable "
"cache_index_and_filter_blocks_with_high_priority.");
DEFINE_string(lru_secondary_cache_compression_type, "lz4",
"The compression algorithm to use for large "
"values stored in LRUSecondaryCache.");
static enum ROCKSDB_NAMESPACE::CompressionType
FLAGS_lru_secondary_cache_compression_type_e =
ROCKSDB_NAMESPACE::kLZ4Compression;
DEFINE_uint32(
lru_secondary_cache_compress_format_version, 2,
"compress_format_version can have two values: "
"compress_format_version == 1 -- decompressed size is not included"
" in the block header."
"compress_format_version == 2 -- decompressed size is included"
" in the block header in varint32 format.");
DEFINE_int64(simcache_size, -1,
"Number of bytes to use as a simcache of "
"uncompressed data. Nagative value disables simcache.");
@ -2791,6 +2823,21 @@ class Benchmark {
opts.secondary_cache = secondary_cache;
}
#endif // ROCKSDB_LITE
if (FLAGS_use_lru_secondary_cache) {
LRUSecondaryCacheOptions secondary_cache_opts;
secondary_cache_opts.capacity = FLAGS_lru_secondary_cache_size;
secondary_cache_opts.num_shard_bits =
FLAGS_lru_secondary_cache_numshardbits;
secondary_cache_opts.high_pri_pool_ratio =
FLAGS_lru_secondary_cache_high_pri_pool_ratio;
secondary_cache_opts.compression_type =
FLAGS_lru_secondary_cache_compression_type_e;
secondary_cache_opts.compress_format_version =
FLAGS_lru_secondary_cache_compress_format_version;
opts.secondary_cache = NewLRUSecondaryCache(secondary_cache_opts);
}
return NewLRUCache(opts);
}
}
@ -7961,6 +8008,9 @@ int db_bench_tool(int argc, char** argv) {
FLAGS_wal_compression_e =
StringToCompressionType(FLAGS_wal_compression.c_str());
FLAGS_lru_secondary_cache_compression_type_e = StringToCompressionType(
FLAGS_lru_secondary_cache_compression_type.c_str());
#ifndef ROCKSDB_LITE
// Stacked BlobDB
FLAGS_blob_db_compression_type_e =

Loading…
Cancel
Save