add SetCapacity and GetCapacity for secondary cache (#10712)

Summary:
To support tuning secondary cache dynamically, add `SetCapacity()` and `GetCapacity()` for CompressedSecondaryCache.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/10712

Test Plan: Unit Tests

Reviewed By: anand1976

Differential Revision: D39685212

Pulled By: gitbw95

fbshipit-source-id: 19573c67237011927320207732b5de083cb87240
main
gitbw95 2 years ago committed by Facebook GitHub Bot
parent aa71464410
commit 47b57a3731
  1. 17
      cache/compressed_secondary_cache.cc
  2. 12
      cache/compressed_secondary_cache.h
  3. 102
      cache/compressed_secondary_cache_test.cc
  4. 21
      include/rocksdb/secondary_cache.h
  5. 8
      utilities/fault_injection_secondary_cache.h

@ -178,6 +178,19 @@ Status CompressedSecondaryCache::Insert(const Slice& key, void* value,
void CompressedSecondaryCache::Erase(const Slice& key) { cache_->Erase(key); }
Status CompressedSecondaryCache::SetCapacity(size_t capacity) {
MutexLock l(&capacity_mutex_);
cache_options_.capacity = capacity;
cache_->SetCapacity(capacity);
return Status::OK();
}
Status CompressedSecondaryCache::GetCapacity(size_t& capacity) {
MutexLock l(&capacity_mutex_);
capacity = cache_options_.capacity;
return Status::OK();
}
std::string CompressedSecondaryCache::GetPrintableOptions() const {
std::string ret;
ret.reserve(20000);
@ -194,8 +207,8 @@ std::string CompressedSecondaryCache::GetPrintableOptions() const {
}
CompressedSecondaryCache::CacheValueChunk*
CompressedSecondaryCache::SplitValueIntoChunks(
const Slice& value, const CompressionType compression_type,
CompressedSecondaryCache::SplitValueIntoChunks(const Slice& value,
CompressionType compression_type,
size_t& charge) {
assert(!value.empty());
const char* src_ptr = value.data();

@ -15,6 +15,7 @@
#include "rocksdb/slice.h"
#include "rocksdb/status.h"
#include "util/compression.h"
#include "util/mutexlock.h"
namespace ROCKSDB_NAMESPACE {
@ -22,7 +23,7 @@ class CompressedSecondaryCacheResultHandle : public SecondaryCacheResultHandle {
public:
CompressedSecondaryCacheResultHandle(void* value, size_t size)
: value_(value), size_(size) {}
virtual ~CompressedSecondaryCacheResultHandle() override = default;
~CompressedSecondaryCacheResultHandle() override = default;
CompressedSecondaryCacheResultHandle(
const CompressedSecondaryCacheResultHandle&) = delete;
@ -78,7 +79,7 @@ class CompressedSecondaryCache : public SecondaryCache {
CompressionType compression_type = CompressionType::kLZ4Compression,
uint32_t compress_format_version = 2,
bool enable_custom_split_merge = false);
virtual ~CompressedSecondaryCache() override;
~CompressedSecondaryCache() override;
const char* Name() const override { return "CompressedSecondaryCache"; }
@ -95,6 +96,10 @@ class CompressedSecondaryCache : public SecondaryCache {
void WaitAll(std::vector<SecondaryCacheResultHandle*> /*handles*/) override {}
Status SetCapacity(size_t capacity) override;
Status GetCapacity(size_t& capacity) override;
std::string GetPrintableOptions() const override;
private:
@ -116,7 +121,7 @@ class CompressedSecondaryCache : public SecondaryCache {
// are stored in CacheValueChunk and extra charge is needed for each chunk,
// so the cache charge is recalculated here.
CacheValueChunk* SplitValueIntoChunks(const Slice& value,
const CompressionType compression_type,
CompressionType compression_type,
size_t& charge);
// After merging chunks, the extra charge for each chunk is removed, so
@ -128,6 +133,7 @@ class CompressedSecondaryCache : public SecondaryCache {
static Cache::DeleterFn GetDeletionCallback(bool enable_custom_split_merge);
std::shared_ptr<Cache> cache_;
CompressedSecondaryCacheOptions cache_options_;
mutable port::Mutex capacity_mutex_;
};
} // namespace ROCKSDB_NAMESPACE

@ -5,28 +5,21 @@
#include "cache/compressed_secondary_cache.h"
#include <algorithm>
#include <cstdint>
#include <iterator>
#include <memory>
#include <tuple>
#include "cache/lru_cache.h"
#include "memory/jemalloc_nodump_allocator.h"
#include "memory/memory_allocator.h"
#include "rocksdb/compression_type.h"
#include "rocksdb/convenience.h"
#include "rocksdb/secondary_cache.h"
#include "test_util/testharness.h"
#include "test_util/testutil.h"
#include "util/compression.h"
#include "util/random.h"
namespace ROCKSDB_NAMESPACE {
class CompressedSecondaryCacheTest : public testing::Test {
public:
CompressedSecondaryCacheTest() : fail_create_(false) {}
~CompressedSecondaryCacheTest() {}
~CompressedSecondaryCacheTest() override = default;
protected:
class TestItem {
@ -34,10 +27,10 @@ class CompressedSecondaryCacheTest : public testing::Test {
TestItem(const char* buf, size_t size) : buf_(new char[size]), size_(size) {
memcpy(buf_.get(), buf, size);
}
~TestItem() {}
~TestItem() = default;
char* Buf() { return buf_.get(); }
size_t Size() { return size_; }
[[nodiscard]] size_t Size() const { return size_; }
private:
std::unique_ptr<char[]> buf_;
@ -50,7 +43,7 @@ class CompressedSecondaryCacheTest : public testing::Test {
static Status SaveToCallback(void* from_obj, size_t from_offset,
size_t length, void* out) {
TestItem* item = reinterpret_cast<TestItem*>(from_obj);
auto item = reinterpret_cast<TestItem*>(from_obj);
const char* buf = item->Buf();
EXPECT_EQ(length, item->Size());
EXPECT_EQ(from_offset, 0);
@ -315,12 +308,12 @@ class CompressedSecondaryCacheTest : public testing::Test {
get_perf_context()->Reset();
Random rnd(301);
std::string str1 = rnd.RandomString(1001);
TestItem* item1_1 = new TestItem(str1.data(), str1.length());
auto item1_1 = new TestItem(str1.data(), str1.length());
ASSERT_OK(cache->Insert(
"k1", item1_1, &CompressedSecondaryCacheTest::helper_, str1.length()));
std::string str2 = rnd.RandomString(1012);
TestItem* item2_1 = new TestItem(str2.data(), str2.length());
auto item2_1 = new TestItem(str2.data(), str2.length());
// After this Insert, primary cache contains k2 and secondary cache contains
// k1's dummy item.
ASSERT_OK(cache->Insert(
@ -330,7 +323,7 @@ class CompressedSecondaryCacheTest : public testing::Test {
ASSERT_EQ(get_perf_context()->compressed_sec_cache_compressed_bytes, 0);
std::string str3 = rnd.RandomString(1024);
TestItem* item3_1 = new TestItem(str3.data(), str3.length());
auto item3_1 = new TestItem(str3.data(), str3.length());
// After this Insert, primary cache contains k3 and secondary cache contains
// k1's dummy item and k2's dummy item.
ASSERT_OK(cache->Insert(
@ -339,14 +332,14 @@ class CompressedSecondaryCacheTest : public testing::Test {
// After this Insert, primary cache contains k1 and secondary cache contains
// k1's dummy item, k2's dummy item, and k3's dummy item.
TestItem* item1_2 = new TestItem(str1.data(), str1.length());
auto item1_2 = new TestItem(str1.data(), str1.length());
ASSERT_OK(cache->Insert(
"k1", item1_2, &CompressedSecondaryCacheTest::helper_, str1.length()));
ASSERT_EQ(get_perf_context()->compressed_sec_cache_insert_dummy_count, 3);
// After this Insert, primary cache contains k2 and secondary cache contains
// k1's item, k2's dummy item, and k3's dummy item.
TestItem* item2_2 = new TestItem(str2.data(), str2.length());
auto item2_2 = new TestItem(str2.data(), str2.length());
ASSERT_OK(cache->Insert(
"k2", item2_2, &CompressedSecondaryCacheTest::helper_, str2.length()));
ASSERT_EQ(get_perf_context()->compressed_sec_cache_insert_real_count, 1);
@ -362,7 +355,7 @@ class CompressedSecondaryCacheTest : public testing::Test {
// After this Insert, primary cache contains k3 and secondary cache contains
// k1's item and k2's item.
TestItem* item3_2 = new TestItem(str3.data(), str3.length());
auto item3_2 = new TestItem(str3.data(), str3.length());
ASSERT_OK(cache->Insert(
"k3", item3_2, &CompressedSecondaryCacheTest::helper_, str3.length()));
ASSERT_EQ(get_perf_context()->compressed_sec_cache_insert_real_count, 2);
@ -381,7 +374,7 @@ class CompressedSecondaryCacheTest : public testing::Test {
test_item_creator, Cache::Priority::LOW, true,
stats.get());
ASSERT_NE(handle, nullptr);
TestItem* val3 = static_cast<TestItem*>(cache->Value(handle));
auto val3 = static_cast<TestItem*>(cache->Value(handle));
ASSERT_NE(val3, nullptr);
ASSERT_EQ(memcmp(val3->Buf(), item3_2->Buf(), item3_2->Size()), 0);
cache->Release(handle);
@ -399,18 +392,20 @@ class CompressedSecondaryCacheTest : public testing::Test {
stats.get());
ASSERT_NE(handle, nullptr);
ASSERT_EQ(get_perf_context()->block_cache_standalone_handle_count, 1);
TestItem* val1_1 = static_cast<TestItem*>(cache->Value(handle));
auto val1_1 = static_cast<TestItem*>(cache->Value(handle));
ASSERT_NE(val1_1, nullptr);
ASSERT_EQ(memcmp(val1_1->Buf(), str1.data(), str1.size()), 0);
cache->Release(handle);
// This Lookup should erase k1 from the secondary cache and insert
// it into primary cache; then k3 is demoted.
// k2 and k3 are in secondary cache.
handle = cache->Lookup("k1", &CompressedSecondaryCacheTest::helper_,
test_item_creator, Cache::Priority::LOW, true,
stats.get());
ASSERT_NE(handle, nullptr);
ASSERT_EQ(get_perf_context()->block_cache_standalone_handle_count, 1);
ASSERT_EQ(get_perf_context()->compressed_sec_cache_insert_real_count, 3);
cache->Release(handle);
// k2 is still in secondary cache.
@ -421,6 +416,54 @@ class CompressedSecondaryCacheTest : public testing::Test {
ASSERT_EQ(get_perf_context()->block_cache_standalone_handle_count, 2);
cache->Release(handle);
// Testing SetCapacity().
ASSERT_OK(secondary_cache->SetCapacity(0));
handle = cache->Lookup("k3", &CompressedSecondaryCacheTest::helper_,
test_item_creator, Cache::Priority::LOW, true,
stats.get());
ASSERT_EQ(handle, nullptr);
ASSERT_OK(secondary_cache->SetCapacity(7000));
size_t capacity;
ASSERT_OK(secondary_cache->GetCapacity(capacity));
ASSERT_EQ(capacity, 7000);
auto item1_3 = new TestItem(str1.data(), str1.length());
// After this Insert, primary cache contains k1.
ASSERT_OK(cache->Insert(
"k1", item1_3, &CompressedSecondaryCacheTest::helper_, str2.length()));
ASSERT_EQ(get_perf_context()->compressed_sec_cache_insert_dummy_count, 3);
ASSERT_EQ(get_perf_context()->compressed_sec_cache_insert_real_count, 4);
auto item2_3 = new TestItem(str2.data(), str2.length());
// After this Insert, primary cache contains k2 and secondary cache contains
// k1's dummy item.
ASSERT_OK(cache->Insert(
"k2", item2_3, &CompressedSecondaryCacheTest::helper_, str1.length()));
ASSERT_EQ(get_perf_context()->compressed_sec_cache_insert_dummy_count, 4);
auto item1_4 = new TestItem(str1.data(), str1.length());
// After this Insert, primary cache contains k1 and secondary cache contains
// k1's dummy item and k2's dummy item.
ASSERT_OK(cache->Insert(
"k1", item1_4, &CompressedSecondaryCacheTest::helper_, str2.length()));
ASSERT_EQ(get_perf_context()->compressed_sec_cache_insert_dummy_count, 5);
auto item2_4 = new TestItem(str2.data(), str2.length());
// After this Insert, primary cache contains k2 and secondary cache contains
// k1's real item and k2's dummy item.
ASSERT_OK(cache->Insert(
"k2", item2_4, &CompressedSecondaryCacheTest::helper_, str2.length()));
ASSERT_EQ(get_perf_context()->compressed_sec_cache_insert_real_count, 5);
// This Lookup should just insert a dummy handle in the primary cache
// and the k1 is still in the secondary cache.
handle = cache->Lookup("k1", &CompressedSecondaryCacheTest::helper_,
test_item_creator, Cache::Priority::LOW, true,
stats.get());
ASSERT_NE(handle, nullptr);
cache->Release(handle);
ASSERT_EQ(get_perf_context()->block_cache_standalone_handle_count, 3);
cache.reset();
secondary_cache.reset();
}
@ -452,8 +495,7 @@ class CompressedSecondaryCacheTest : public testing::Test {
Random rnd(301);
std::string str1 = rnd.RandomString(1001);
auto item1 =
std::unique_ptr<TestItem>(new TestItem(str1.data(), str1.length()));
auto item1 = std::make_unique<TestItem>(str1.data(), str1.length());
ASSERT_NOK(cache->Insert("k1", item1.get(), nullptr, str1.length()));
ASSERT_OK(cache->Insert("k1", item1.get(),
&CompressedSecondaryCacheTest::helper_,
@ -500,13 +542,13 @@ class CompressedSecondaryCacheTest : public testing::Test {
Random rnd(301);
std::string str1 = rnd.RandomString(1001);
TestItem* item1 = new TestItem(str1.data(), str1.length());
auto item1 = new TestItem(str1.data(), str1.length());
ASSERT_OK(cache->Insert("k1", item1,
&CompressedSecondaryCacheTest::helper_fail_,
str1.length()));
std::string str2 = rnd.RandomString(1002);
TestItem* item2 = new TestItem(str2.data(), str2.length());
auto item2 = new TestItem(str2.data(), str2.length());
// k1 should be demoted to the secondary cache.
ASSERT_OK(cache->Insert("k2", item2,
&CompressedSecondaryCacheTest::helper_fail_,
@ -559,12 +601,12 @@ class CompressedSecondaryCacheTest : public testing::Test {
Random rnd(301);
std::string str1 = rnd.RandomString(1001);
TestItem* item1 = new TestItem(str1.data(), str1.length());
auto item1 = new TestItem(str1.data(), str1.length());
ASSERT_OK(cache->Insert("k1", item1, &CompressedSecondaryCacheTest::helper_,
str1.length()));
std::string str2 = rnd.RandomString(1002);
TestItem* item2 = new TestItem(str2.data(), str2.length());
auto item2 = new TestItem(str2.data(), str2.length());
// k1 should be demoted to the secondary cache.
ASSERT_OK(cache->Insert("k2", item2, &CompressedSecondaryCacheTest::helper_,
str2.length()));
@ -617,13 +659,13 @@ class CompressedSecondaryCacheTest : public testing::Test {
Random rnd(301);
std::string str1 = rnd.RandomString(1001);
TestItem* item1_1 = new TestItem(str1.data(), str1.length());
auto item1_1 = new TestItem(str1.data(), str1.length());
ASSERT_OK(cache->Insert(
"k1", item1_1, &CompressedSecondaryCacheTest::helper_, str1.length()));
std::string str2 = rnd.RandomString(1002);
std::string str2_clone{str2};
TestItem* item2 = new TestItem(str2.data(), str2.length());
auto item2 = new TestItem(str2.data(), str2.length());
// After this Insert, primary cache contains k2 and secondary cache contains
// k1's dummy item.
ASSERT_OK(cache->Insert("k2", item2, &CompressedSecondaryCacheTest::helper_,
@ -631,11 +673,11 @@ class CompressedSecondaryCacheTest : public testing::Test {
// After this Insert, primary cache contains k1 and secondary cache contains
// k1's dummy item and k2's dummy item.
TestItem* item1_2 = new TestItem(str1.data(), str1.length());
auto item1_2 = new TestItem(str1.data(), str1.length());
ASSERT_OK(cache->Insert(
"k1", item1_2, &CompressedSecondaryCacheTest::helper_, str1.length()));
TestItem* item2_2 = new TestItem(str2.data(), str2.length());
auto item2_2 = new TestItem(str2.data(), str2.length());
// After this Insert, primary cache contains k2 and secondary cache contains
// k1's item and k2's dummy item.
ASSERT_OK(cache->Insert(

@ -24,7 +24,7 @@ namespace ROCKSDB_NAMESPACE {
// handle successfullly read the item.
class SecondaryCacheResultHandle {
public:
virtual ~SecondaryCacheResultHandle() {}
virtual ~SecondaryCacheResultHandle() = default;
// Returns whether the handle is ready or not
virtual bool IsReady() = 0;
@ -49,7 +49,7 @@ class SecondaryCacheResultHandle {
// including data loss, unreported corruption, deadlocks, and more.
class SecondaryCache : public Customizable {
public:
virtual ~SecondaryCache() {}
~SecondaryCache() override = default;
static const char* Type() { return "SecondaryCache"; }
static Status CreateFromString(const ConfigOptions& config_options,
@ -83,7 +83,7 @@ class SecondaryCache : public Customizable {
bool advise_erase, bool& is_in_sec_cache) = 0;
// Indicate whether a handle can be erased in this secondary cache.
virtual bool SupportForceErase() const = 0;
[[nodiscard]] virtual bool SupportForceErase() const = 0;
// At the discretion of the implementation, erase the data associated
// with key.
@ -92,7 +92,20 @@ class SecondaryCache : public Customizable {
// Wait for a collection of handles to become ready.
virtual void WaitAll(std::vector<SecondaryCacheResultHandle*> handles) = 0;
virtual std::string GetPrintableOptions() const = 0;
// Set the maximum configured capacity of the cache.
// When the new capacity is less than the old capacity and the existing usage
// is greater than new capacity, the implementation will do its best job to
// purge the released entries from the cache in order to lower the usage.
//
// The derived class can make this function no-op and return NotSupported().
virtual Status SetCapacity(size_t /* capacity */) {
return Status::NotSupported();
}
// The derived class can make this function no-op and return NotSupported().
virtual Status GetCapacity(size_t& /* capacity */) {
return Status::NotSupported();
}
};
} // namespace ROCKSDB_NAMESPACE

@ -44,6 +44,14 @@ class FaultInjectionSecondaryCache : public SecondaryCache {
void WaitAll(std::vector<SecondaryCacheResultHandle*> handles) override;
Status SetCapacity(size_t capacity) override {
return base_->SetCapacity(capacity);
}
Status GetCapacity(size_t& capacity) override {
return base_->GetCapacity(capacity);
}
std::string GetPrintableOptions() const override {
return base_->GetPrintableOptions();
}

Loading…
Cancel
Save