From 47b57a37317fb31219eb9838643ac7576924ba4f Mon Sep 17 00:00:00 2001 From: gitbw95 <95719937+gitbw95@users.noreply.github.com> Date: Thu, 29 Sep 2022 19:15:04 -0700 Subject: [PATCH] add SetCapacity and GetCapacity for secondary cache (#10712) Summary: To support tuning secondary cache dynamically, add `SetCapacity()` and `GetCapacity()` for CompressedSecondaryCache. Pull Request resolved: https://github.com/facebook/rocksdb/pull/10712 Test Plan: Unit Tests Reviewed By: anand1976 Differential Revision: D39685212 Pulled By: gitbw95 fbshipit-source-id: 19573c67237011927320207732b5de083cb87240 --- cache/compressed_secondary_cache.cc | 19 +++- cache/compressed_secondary_cache.h | 12 ++- cache/compressed_secondary_cache_test.cc | 102 ++++++++++++++------ include/rocksdb/secondary_cache.h | 21 +++- utilities/fault_injection_secondary_cache.h | 8 ++ 5 files changed, 122 insertions(+), 40 deletions(-) diff --git a/cache/compressed_secondary_cache.cc b/cache/compressed_secondary_cache.cc index fd33de4c6..7d1bdc789 100644 --- a/cache/compressed_secondary_cache.cc +++ b/cache/compressed_secondary_cache.cc @@ -178,6 +178,19 @@ Status CompressedSecondaryCache::Insert(const Slice& key, void* value, void CompressedSecondaryCache::Erase(const Slice& key) { cache_->Erase(key); } +Status CompressedSecondaryCache::SetCapacity(size_t capacity) { + MutexLock l(&capacity_mutex_); + cache_options_.capacity = capacity; + cache_->SetCapacity(capacity); + return Status::OK(); +} + +Status CompressedSecondaryCache::GetCapacity(size_t& capacity) { + MutexLock l(&capacity_mutex_); + capacity = cache_options_.capacity; + return Status::OK(); +} + std::string CompressedSecondaryCache::GetPrintableOptions() const { std::string ret; ret.reserve(20000); @@ -194,9 +207,9 @@ std::string CompressedSecondaryCache::GetPrintableOptions() const { } CompressedSecondaryCache::CacheValueChunk* -CompressedSecondaryCache::SplitValueIntoChunks( - const Slice& value, const CompressionType compression_type, - size_t& charge) { +CompressedSecondaryCache::SplitValueIntoChunks(const Slice& value, + CompressionType compression_type, + size_t& charge) { assert(!value.empty()); const char* src_ptr = value.data(); size_t src_size{value.size()}; diff --git a/cache/compressed_secondary_cache.h b/cache/compressed_secondary_cache.h index 041e96f91..4dee38802 100644 --- a/cache/compressed_secondary_cache.h +++ b/cache/compressed_secondary_cache.h @@ -15,6 +15,7 @@ #include "rocksdb/slice.h" #include "rocksdb/status.h" #include "util/compression.h" +#include "util/mutexlock.h" namespace ROCKSDB_NAMESPACE { @@ -22,7 +23,7 @@ class CompressedSecondaryCacheResultHandle : public SecondaryCacheResultHandle { public: CompressedSecondaryCacheResultHandle(void* value, size_t size) : value_(value), size_(size) {} - virtual ~CompressedSecondaryCacheResultHandle() override = default; + ~CompressedSecondaryCacheResultHandle() override = default; CompressedSecondaryCacheResultHandle( const CompressedSecondaryCacheResultHandle&) = delete; @@ -78,7 +79,7 @@ class CompressedSecondaryCache : public SecondaryCache { CompressionType compression_type = CompressionType::kLZ4Compression, uint32_t compress_format_version = 2, bool enable_custom_split_merge = false); - virtual ~CompressedSecondaryCache() override; + ~CompressedSecondaryCache() override; const char* Name() const override { return "CompressedSecondaryCache"; } @@ -95,6 +96,10 @@ class CompressedSecondaryCache : public SecondaryCache { void WaitAll(std::vector /*handles*/) override {} + Status SetCapacity(size_t capacity) override; + + Status GetCapacity(size_t& capacity) override; + std::string GetPrintableOptions() const override; private: @@ -116,7 +121,7 @@ class CompressedSecondaryCache : public SecondaryCache { // are stored in CacheValueChunk and extra charge is needed for each chunk, // so the cache charge is recalculated here. CacheValueChunk* SplitValueIntoChunks(const Slice& value, - const CompressionType compression_type, + CompressionType compression_type, size_t& charge); // After merging chunks, the extra charge for each chunk is removed, so @@ -128,6 +133,7 @@ class CompressedSecondaryCache : public SecondaryCache { static Cache::DeleterFn GetDeletionCallback(bool enable_custom_split_merge); std::shared_ptr cache_; CompressedSecondaryCacheOptions cache_options_; + mutable port::Mutex capacity_mutex_; }; } // namespace ROCKSDB_NAMESPACE diff --git a/cache/compressed_secondary_cache_test.cc b/cache/compressed_secondary_cache_test.cc index 3057f84e5..5a5eeebda 100644 --- a/cache/compressed_secondary_cache_test.cc +++ b/cache/compressed_secondary_cache_test.cc @@ -5,28 +5,21 @@ #include "cache/compressed_secondary_cache.h" -#include -#include #include +#include #include -#include "cache/lru_cache.h" #include "memory/jemalloc_nodump_allocator.h" -#include "memory/memory_allocator.h" -#include "rocksdb/compression_type.h" #include "rocksdb/convenience.h" -#include "rocksdb/secondary_cache.h" #include "test_util/testharness.h" #include "test_util/testutil.h" -#include "util/compression.h" -#include "util/random.h" namespace ROCKSDB_NAMESPACE { class CompressedSecondaryCacheTest : public testing::Test { public: CompressedSecondaryCacheTest() : fail_create_(false) {} - ~CompressedSecondaryCacheTest() {} + ~CompressedSecondaryCacheTest() override = default; protected: class TestItem { @@ -34,10 +27,10 @@ class CompressedSecondaryCacheTest : public testing::Test { TestItem(const char* buf, size_t size) : buf_(new char[size]), size_(size) { memcpy(buf_.get(), buf, size); } - ~TestItem() {} + ~TestItem() = default; char* Buf() { return buf_.get(); } - size_t Size() { return size_; } + [[nodiscard]] size_t Size() const { return size_; } private: std::unique_ptr buf_; @@ -50,7 +43,7 @@ class CompressedSecondaryCacheTest : public testing::Test { static Status SaveToCallback(void* from_obj, size_t from_offset, size_t length, void* out) { - TestItem* item = reinterpret_cast(from_obj); + auto item = reinterpret_cast(from_obj); const char* buf = item->Buf(); EXPECT_EQ(length, item->Size()); EXPECT_EQ(from_offset, 0); @@ -315,12 +308,12 @@ class CompressedSecondaryCacheTest : public testing::Test { get_perf_context()->Reset(); Random rnd(301); std::string str1 = rnd.RandomString(1001); - TestItem* item1_1 = new TestItem(str1.data(), str1.length()); + auto item1_1 = new TestItem(str1.data(), str1.length()); ASSERT_OK(cache->Insert( "k1", item1_1, &CompressedSecondaryCacheTest::helper_, str1.length())); std::string str2 = rnd.RandomString(1012); - TestItem* item2_1 = new TestItem(str2.data(), str2.length()); + auto item2_1 = new TestItem(str2.data(), str2.length()); // After this Insert, primary cache contains k2 and secondary cache contains // k1's dummy item. ASSERT_OK(cache->Insert( @@ -330,7 +323,7 @@ class CompressedSecondaryCacheTest : public testing::Test { ASSERT_EQ(get_perf_context()->compressed_sec_cache_compressed_bytes, 0); std::string str3 = rnd.RandomString(1024); - TestItem* item3_1 = new TestItem(str3.data(), str3.length()); + auto item3_1 = new TestItem(str3.data(), str3.length()); // After this Insert, primary cache contains k3 and secondary cache contains // k1's dummy item and k2's dummy item. ASSERT_OK(cache->Insert( @@ -339,14 +332,14 @@ class CompressedSecondaryCacheTest : public testing::Test { // After this Insert, primary cache contains k1 and secondary cache contains // k1's dummy item, k2's dummy item, and k3's dummy item. - TestItem* item1_2 = new TestItem(str1.data(), str1.length()); + auto item1_2 = new TestItem(str1.data(), str1.length()); ASSERT_OK(cache->Insert( "k1", item1_2, &CompressedSecondaryCacheTest::helper_, str1.length())); ASSERT_EQ(get_perf_context()->compressed_sec_cache_insert_dummy_count, 3); // After this Insert, primary cache contains k2 and secondary cache contains // k1's item, k2's dummy item, and k3's dummy item. - TestItem* item2_2 = new TestItem(str2.data(), str2.length()); + auto item2_2 = new TestItem(str2.data(), str2.length()); ASSERT_OK(cache->Insert( "k2", item2_2, &CompressedSecondaryCacheTest::helper_, str2.length())); ASSERT_EQ(get_perf_context()->compressed_sec_cache_insert_real_count, 1); @@ -362,7 +355,7 @@ class CompressedSecondaryCacheTest : public testing::Test { // After this Insert, primary cache contains k3 and secondary cache contains // k1's item and k2's item. - TestItem* item3_2 = new TestItem(str3.data(), str3.length()); + auto item3_2 = new TestItem(str3.data(), str3.length()); ASSERT_OK(cache->Insert( "k3", item3_2, &CompressedSecondaryCacheTest::helper_, str3.length())); ASSERT_EQ(get_perf_context()->compressed_sec_cache_insert_real_count, 2); @@ -381,7 +374,7 @@ class CompressedSecondaryCacheTest : public testing::Test { test_item_creator, Cache::Priority::LOW, true, stats.get()); ASSERT_NE(handle, nullptr); - TestItem* val3 = static_cast(cache->Value(handle)); + auto val3 = static_cast(cache->Value(handle)); ASSERT_NE(val3, nullptr); ASSERT_EQ(memcmp(val3->Buf(), item3_2->Buf(), item3_2->Size()), 0); cache->Release(handle); @@ -399,18 +392,20 @@ class CompressedSecondaryCacheTest : public testing::Test { stats.get()); ASSERT_NE(handle, nullptr); ASSERT_EQ(get_perf_context()->block_cache_standalone_handle_count, 1); - TestItem* val1_1 = static_cast(cache->Value(handle)); + auto val1_1 = static_cast(cache->Value(handle)); ASSERT_NE(val1_1, nullptr); ASSERT_EQ(memcmp(val1_1->Buf(), str1.data(), str1.size()), 0); cache->Release(handle); // This Lookup should erase k1 from the secondary cache and insert // it into primary cache; then k3 is demoted. + // k2 and k3 are in secondary cache. handle = cache->Lookup("k1", &CompressedSecondaryCacheTest::helper_, test_item_creator, Cache::Priority::LOW, true, stats.get()); ASSERT_NE(handle, nullptr); ASSERT_EQ(get_perf_context()->block_cache_standalone_handle_count, 1); + ASSERT_EQ(get_perf_context()->compressed_sec_cache_insert_real_count, 3); cache->Release(handle); // k2 is still in secondary cache. @@ -421,6 +416,54 @@ class CompressedSecondaryCacheTest : public testing::Test { ASSERT_EQ(get_perf_context()->block_cache_standalone_handle_count, 2); cache->Release(handle); + // Testing SetCapacity(). + ASSERT_OK(secondary_cache->SetCapacity(0)); + handle = cache->Lookup("k3", &CompressedSecondaryCacheTest::helper_, + test_item_creator, Cache::Priority::LOW, true, + stats.get()); + ASSERT_EQ(handle, nullptr); + + ASSERT_OK(secondary_cache->SetCapacity(7000)); + size_t capacity; + ASSERT_OK(secondary_cache->GetCapacity(capacity)); + ASSERT_EQ(capacity, 7000); + auto item1_3 = new TestItem(str1.data(), str1.length()); + // After this Insert, primary cache contains k1. + ASSERT_OK(cache->Insert( + "k1", item1_3, &CompressedSecondaryCacheTest::helper_, str2.length())); + ASSERT_EQ(get_perf_context()->compressed_sec_cache_insert_dummy_count, 3); + ASSERT_EQ(get_perf_context()->compressed_sec_cache_insert_real_count, 4); + + auto item2_3 = new TestItem(str2.data(), str2.length()); + // After this Insert, primary cache contains k2 and secondary cache contains + // k1's dummy item. + ASSERT_OK(cache->Insert( + "k2", item2_3, &CompressedSecondaryCacheTest::helper_, str1.length())); + ASSERT_EQ(get_perf_context()->compressed_sec_cache_insert_dummy_count, 4); + + auto item1_4 = new TestItem(str1.data(), str1.length()); + // After this Insert, primary cache contains k1 and secondary cache contains + // k1's dummy item and k2's dummy item. + ASSERT_OK(cache->Insert( + "k1", item1_4, &CompressedSecondaryCacheTest::helper_, str2.length())); + ASSERT_EQ(get_perf_context()->compressed_sec_cache_insert_dummy_count, 5); + + auto item2_4 = new TestItem(str2.data(), str2.length()); + // After this Insert, primary cache contains k2 and secondary cache contains + // k1's real item and k2's dummy item. + ASSERT_OK(cache->Insert( + "k2", item2_4, &CompressedSecondaryCacheTest::helper_, str2.length())); + ASSERT_EQ(get_perf_context()->compressed_sec_cache_insert_real_count, 5); + // This Lookup should just insert a dummy handle in the primary cache + // and the k1 is still in the secondary cache. + handle = cache->Lookup("k1", &CompressedSecondaryCacheTest::helper_, + test_item_creator, Cache::Priority::LOW, true, + stats.get()); + + ASSERT_NE(handle, nullptr); + cache->Release(handle); + ASSERT_EQ(get_perf_context()->block_cache_standalone_handle_count, 3); + cache.reset(); secondary_cache.reset(); } @@ -452,8 +495,7 @@ class CompressedSecondaryCacheTest : public testing::Test { Random rnd(301); std::string str1 = rnd.RandomString(1001); - auto item1 = - std::unique_ptr(new TestItem(str1.data(), str1.length())); + auto item1 = std::make_unique(str1.data(), str1.length()); ASSERT_NOK(cache->Insert("k1", item1.get(), nullptr, str1.length())); ASSERT_OK(cache->Insert("k1", item1.get(), &CompressedSecondaryCacheTest::helper_, @@ -500,13 +542,13 @@ class CompressedSecondaryCacheTest : public testing::Test { Random rnd(301); std::string str1 = rnd.RandomString(1001); - TestItem* item1 = new TestItem(str1.data(), str1.length()); + auto item1 = new TestItem(str1.data(), str1.length()); ASSERT_OK(cache->Insert("k1", item1, &CompressedSecondaryCacheTest::helper_fail_, str1.length())); std::string str2 = rnd.RandomString(1002); - TestItem* item2 = new TestItem(str2.data(), str2.length()); + auto item2 = new TestItem(str2.data(), str2.length()); // k1 should be demoted to the secondary cache. ASSERT_OK(cache->Insert("k2", item2, &CompressedSecondaryCacheTest::helper_fail_, @@ -559,12 +601,12 @@ class CompressedSecondaryCacheTest : public testing::Test { Random rnd(301); std::string str1 = rnd.RandomString(1001); - TestItem* item1 = new TestItem(str1.data(), str1.length()); + auto item1 = new TestItem(str1.data(), str1.length()); ASSERT_OK(cache->Insert("k1", item1, &CompressedSecondaryCacheTest::helper_, str1.length())); std::string str2 = rnd.RandomString(1002); - TestItem* item2 = new TestItem(str2.data(), str2.length()); + auto item2 = new TestItem(str2.data(), str2.length()); // k1 should be demoted to the secondary cache. ASSERT_OK(cache->Insert("k2", item2, &CompressedSecondaryCacheTest::helper_, str2.length())); @@ -617,13 +659,13 @@ class CompressedSecondaryCacheTest : public testing::Test { Random rnd(301); std::string str1 = rnd.RandomString(1001); - TestItem* item1_1 = new TestItem(str1.data(), str1.length()); + auto item1_1 = new TestItem(str1.data(), str1.length()); ASSERT_OK(cache->Insert( "k1", item1_1, &CompressedSecondaryCacheTest::helper_, str1.length())); std::string str2 = rnd.RandomString(1002); std::string str2_clone{str2}; - TestItem* item2 = new TestItem(str2.data(), str2.length()); + auto item2 = new TestItem(str2.data(), str2.length()); // After this Insert, primary cache contains k2 and secondary cache contains // k1's dummy item. ASSERT_OK(cache->Insert("k2", item2, &CompressedSecondaryCacheTest::helper_, @@ -631,11 +673,11 @@ class CompressedSecondaryCacheTest : public testing::Test { // After this Insert, primary cache contains k1 and secondary cache contains // k1's dummy item and k2's dummy item. - TestItem* item1_2 = new TestItem(str1.data(), str1.length()); + auto item1_2 = new TestItem(str1.data(), str1.length()); ASSERT_OK(cache->Insert( "k1", item1_2, &CompressedSecondaryCacheTest::helper_, str1.length())); - TestItem* item2_2 = new TestItem(str2.data(), str2.length()); + auto item2_2 = new TestItem(str2.data(), str2.length()); // After this Insert, primary cache contains k2 and secondary cache contains // k1's item and k2's dummy item. ASSERT_OK(cache->Insert( diff --git a/include/rocksdb/secondary_cache.h b/include/rocksdb/secondary_cache.h index cb22c2f51..3e637efb3 100644 --- a/include/rocksdb/secondary_cache.h +++ b/include/rocksdb/secondary_cache.h @@ -24,7 +24,7 @@ namespace ROCKSDB_NAMESPACE { // handle successfullly read the item. class SecondaryCacheResultHandle { public: - virtual ~SecondaryCacheResultHandle() {} + virtual ~SecondaryCacheResultHandle() = default; // Returns whether the handle is ready or not virtual bool IsReady() = 0; @@ -49,7 +49,7 @@ class SecondaryCacheResultHandle { // including data loss, unreported corruption, deadlocks, and more. class SecondaryCache : public Customizable { public: - virtual ~SecondaryCache() {} + ~SecondaryCache() override = default; static const char* Type() { return "SecondaryCache"; } static Status CreateFromString(const ConfigOptions& config_options, @@ -83,7 +83,7 @@ class SecondaryCache : public Customizable { bool advise_erase, bool& is_in_sec_cache) = 0; // Indicate whether a handle can be erased in this secondary cache. - virtual bool SupportForceErase() const = 0; + [[nodiscard]] virtual bool SupportForceErase() const = 0; // At the discretion of the implementation, erase the data associated // with key. @@ -92,7 +92,20 @@ class SecondaryCache : public Customizable { // Wait for a collection of handles to become ready. virtual void WaitAll(std::vector handles) = 0; - virtual std::string GetPrintableOptions() const = 0; + // Set the maximum configured capacity of the cache. + // When the new capacity is less than the old capacity and the existing usage + // is greater than new capacity, the implementation will do its best job to + // purge the released entries from the cache in order to lower the usage. + // + // The derived class can make this function no-op and return NotSupported(). + virtual Status SetCapacity(size_t /* capacity */) { + return Status::NotSupported(); + } + + // The derived class can make this function no-op and return NotSupported(). + virtual Status GetCapacity(size_t& /* capacity */) { + return Status::NotSupported(); + } }; } // namespace ROCKSDB_NAMESPACE diff --git a/utilities/fault_injection_secondary_cache.h b/utilities/fault_injection_secondary_cache.h index 582ea14ed..5321df626 100644 --- a/utilities/fault_injection_secondary_cache.h +++ b/utilities/fault_injection_secondary_cache.h @@ -44,6 +44,14 @@ class FaultInjectionSecondaryCache : public SecondaryCache { void WaitAll(std::vector handles) override; + Status SetCapacity(size_t capacity) override { + return base_->SetCapacity(capacity); + } + + Status GetCapacity(size_t& capacity) override { + return base_->GetCapacity(capacity); + } + std::string GetPrintableOptions() const override { return base_->GetPrintableOptions(); }