Integrate CacheReservationManager with compressed secondary cache (#11449)

Summary:
This draft PR implements charging of reserved memory, for write buffers, table readers, and other purposes, proportionally to the block cache and the compressed secondary cache. The basic flow of memory reservation is maintained - clients use ```CacheReservationManager``` to request reservations, and ```CacheReservationManager``` inserts placeholder entries, i.e null value and non-zero charge, into the block cache. The ```CacheWithSecondaryAdapter``` wrapper uses its own instance of ```CacheReservationManager``` to keep track of reservations charged to the secondary cache, while the placeholder entries are inserted into the primary block cache. The design is as follows.

When ```CacheWithSecondaryAdapter``` is constructed with the ```distribute_cache_res``` parameter set to true, it manages the entire memory budget across the primary and secondary cache. The secondary cache is assumed to be in memory, such as the ```CompressedSecondaryCache```. When a placeholder entry is inserted by a ```CacheReservationManager``` instance to reserve memory, the ```CacheWithSecondaryAdapter``` ensures that the reservation is distributed proportionally across the primary/secondary caches.

The primary block cache is initially sized to the sum of the primary cache budget + the secondary cache budget, as follows -
  |---------    Primary Cache Configured Capacity  -----------|
  |---Secondary Cache Budget----|----Primary Cache Budget-----|

A ```ConcurrentCacheReservationManager``` member in the ```CacheWithSecondaryAdapter```, ```pri_cache_res_```, is used to help with tracking the distribution of memory reservations. Initially, it accounts for the entire secondary cache budget as a reservation against the primary cache. This shrinks the usable capacity of the primary cache to the budget that the user originally desired.

  |--Reservation for Sec Cache--|-Pri Cache Usable Capacity---|

When a reservation placeholder is inserted into the adapter, it is inserted directly into the primary cache. This means the entire charge of the placeholder is counted against the primary cache. To compensate and count a portion of it against the secondary cache, the secondary cache ```Deflate()``` method is called to shrink it. Since ```Deflate()``` causes the secondary cache's actual usage to shrink, this is reflected by releasing an equal amount from the ```pri_cache_res_``` reservation.

For example, if the pri/sec ratio is 50/50, this would be the state after placeholder insertion -

  |-Reservation for Sec Cache-|-Pri Cache Usable Capacity-|-R-|

Likewise, when the user-inserted placeholder is released, the secondary cache ```Inflate()``` method is called to grow it, and the ```pri_cache_res_``` reservation is increased by an equal amount.

Other alternatives -
1. Another way of implementing this would have been to simply split the user reservation in ```CacheWithSecondaryAdapter``` into primary and secondary components. However, this would require allocating a structure to track the associated secondary cache reservation, which adds some complexity and overhead.
2. Yet another option is to implement the splitting directly in ```CacheReservationManager```. However, there are multiple instances of ```CacheReservationManager``` in a DB instance, making it complicated to keep track of them.

The PR contains the following changes -
1. A new cache allocator, ```NewTieredVolatileCache()```, is defined for allocating a tiered primary block cache and compressed secondary cache. This internally allocates an instance of ```CacheWithSecondaryAdapter```.
2. New interfaces, ```Deflate()``` and ```Inflate()```, are added to the ```SecondaryCache``` interface. The default implementation returns ```NotSupported```, with overrides in ```CompressedSecondaryCache```.
3. The ```CompressedSecondaryCache``` uses a ```ConcurrentCacheReservationManager``` instance to manage reservations made via ```Inflate()/Deflate()```.
4. The ```CacheWithSecondaryAdapter``` optionally distributes memory reservations across the primary and secondary caches. The primary cache is sized to the total memory budget (primary + secondary), and the capacity allocated to the secondary cache is "reserved" against the primary cache. For any subsequent reservations, the primary cache's pre-reserved capacity is adjusted.
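
Based on the new test fixture in this PR, allocating a tiered cache via the new API might look like the following sketch (sizes are illustrative; it requires the RocksDB headers, so it is not a standalone program):

```cpp
#include <memory>

#include "rocksdb/cache.h"  // NewTieredVolatileCache, TieredVolatileCacheOptions

using namespace ROCKSDB_NAMESPACE;

std::shared_ptr<Cache> MakeTieredCache() {
  // 70MB uncompressed primary budget, 30MB compressed secondary budget.
  LRUCacheOptions lru_opts;
  lru_opts.capacity = 70 << 20;

  TieredVolatileCacheOptions opts;
  opts.cache_opts = &lru_opts;
  opts.cache_type = PrimaryCacheType::kCacheTypeLRU;
  opts.comp_cache_opts.capacity = 30 << 20;

  // Internally allocates a CacheWithSecondaryAdapter whose primary cache is
  // sized to 100MB, with 30MB reserved on behalf of the secondary cache.
  return NewTieredVolatileCache(opts);
}
```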

Benchmarks -
Baseline
```
time ~/rocksdb_anand76/db_bench --db=/dev/shm/comp_cache_res/base --use_existing_db=true --benchmarks="readseq,readwhilewriting" --key_size=32 --value_size=1024 --num=20000000 --threads=32 --bloom_bits=10 --cache_size=30000000000 --use_compressed_secondary_cache=true --compressed_secondary_cache_size=5000000000 --duration=300 --cost_write_buffer_to_cache=true
```
```
readseq      :       3.301 micros/op 9694317 ops/sec 66.018 seconds 640000000 operations; 9763.0 MB/s
readwhilewriting :      22.921 micros/op 1396058 ops/sec 300.021 seconds 418846968 operations; 1405.9 MB/s (13068999 of 13068999 found)

real    6m31.052s
user    152m5.660s
sys     26m18.738s
```
With TieredVolatileCache
```
time ~/rocksdb_anand76/db_bench --db=/dev/shm/comp_cache_res/base --use_existing_db=true --benchmarks="readseq,readwhilewriting" --key_size=32 --value_size=1024 --num=20000000 --threads=32 --bloom_bits=10 --cache_size=30000000000 --use_compressed_secondary_cache=true --compressed_secondary_cache_size=5000000000 --duration=300 --cost_write_buffer_to_cache=true --use_tiered_volatile_cache=true
```
```
readseq      :       4.064 micros/op 7873915 ops/sec 81.281 seconds 640000000 operations; 7929.7 MB/s
readwhilewriting :      20.944 micros/op 1527827 ops/sec 300.020 seconds 458378968 operations; 1538.6 MB/s (14296999 of 14296999 found)

real    6m42.743s
user    157m58.972s
sys     33m16.671s
```
```
readseq      :       3.484 micros/op 9184967 ops/sec 69.679 seconds 640000000 operations; 9250.0 MB/s
readwhilewriting :      21.261 micros/op 1505035 ops/sec 300.024 seconds 451545968 operations; 1515.7 MB/s (14101999 of 14101999 found)

real    6m31.469s
user    155m16.570s
sys     27m47.834s
```

ToDo -
1. Add to db_stress

Pull Request resolved: https://github.com/facebook/rocksdb/pull/11449

Reviewed By: pdillinger

Differential Revision: D46197388

Pulled By: anand1976

fbshipit-source-id: 42d16f0254df683db4929db20d06ff26030e90df
anand76 2 years ago committed by Facebook GitHub Bot
parent 3e7fc88167
commit fcc358baf2
Files changed:
- HISTORY.md (1)
- cache/compressed_secondary_cache.cc (20)
- cache/compressed_secondary_cache.h (8)
- cache/compressed_secondary_cache_test.cc (93)
- cache/secondary_cache_adapter.cc (142)
- cache/secondary_cache_adapter.h (26)
- include/rocksdb/cache.h (22)
- include/rocksdb/secondary_cache.h (11)
- tools/db_bench_tool.cc (62)
HISTORY.md:
```diff
@@ -3,6 +3,7 @@
 ### New Features
 * Add a new option OptimisticTransactionDBOptions::shared_lock_buckets that enables sharing mutexes for validating transactions between DB instances, for better balancing memory efficiency and validation contention across DB instances. Different column families and DBs also now use different hash seeds in this validation, so that the same set of key names will not contend across DBs or column families.
 * Add `WriteBatch::Release()` that releases the batch's serialized data to the caller.
+* Add an API NewTieredVolatileCache() in include/rocksdb/cache.h to allocate an instance of a block cache with a primary block cache tier and a compressed secondary cache tier. A cache of this type distributes memory reservations against the block cache, such as WriteBufferManager, table reader memory etc., proportionally across both the primary and compressed secondary cache.
 ### Public API Changes
 * Add `WaitForCompact()` to wait for all flush and compactions jobs to finish. Jobs to wait include the unscheduled (queued, but not scheduled yet).
```

cache/compressed_secondary_cache.cc:
```diff
@@ -18,9 +18,15 @@ namespace ROCKSDB_NAMESPACE {
 CompressedSecondaryCache::CompressedSecondaryCache(
     const CompressedSecondaryCacheOptions& opts)
-    : cache_(opts.LRUCacheOptions::MakeSharedCache()), cache_options_(opts) {}
+    : cache_(opts.LRUCacheOptions::MakeSharedCache()),
+      cache_options_(opts),
+      cache_res_mgr_(std::make_shared<ConcurrentCacheReservationManager>(
+          std::make_shared<CacheReservationManagerImpl<CacheEntryRole::kMisc>>(
+              cache_))) {}
 
-CompressedSecondaryCache::~CompressedSecondaryCache() { cache_.reset(); }
+CompressedSecondaryCache::~CompressedSecondaryCache() {
+  assert(cache_res_mgr_->GetTotalReservedCacheSize() == 0);
+}
 
 std::unique_ptr<SecondaryCacheResultHandle> CompressedSecondaryCache::Lookup(
     const Slice& key, const Cache::CacheItemHelper* helper,
@@ -301,4 +307,12 @@ CompressedSecondaryCacheOptions::MakeSharedSecondaryCache() const {
   return std::make_shared<CompressedSecondaryCache>(*this);
 }
 
+Status CompressedSecondaryCache::Deflate(size_t decrease) {
+  return cache_res_mgr_->UpdateCacheReservation(decrease, /*increase=*/true);
+}
+
+Status CompressedSecondaryCache::Inflate(size_t increase) {
+  return cache_res_mgr_->UpdateCacheReservation(increase, /*increase=*/false);
+}
+
 }  // namespace ROCKSDB_NAMESPACE
```

cache/compressed_secondary_cache.h:
```diff
@@ -9,6 +9,7 @@
 #include <cstddef>
 #include <memory>
 
+#include "cache/cache_reservation_manager.h"
 #include "cache/lru_cache.h"
 #include "memory/memory_allocator_impl.h"
 #include "rocksdb/secondary_cache.h"
@@ -93,8 +94,14 @@ class CompressedSecondaryCache : public SecondaryCache {
 
   Status GetCapacity(size_t& capacity) override;
 
+  Status Deflate(size_t decrease) override;
+
+  Status Inflate(size_t increase) override;
+
   std::string GetPrintableOptions() const override;
 
+  size_t TEST_GetUsage() { return cache_->GetUsage(); }
+
  private:
   friend class CompressedSecondaryCacheTestBase;
   static constexpr std::array<uint16_t, 8> malloc_bin_sizes_{
@@ -127,6 +134,7 @@ class CompressedSecondaryCache : public SecondaryCache {
   std::shared_ptr<Cache> cache_;
   CompressedSecondaryCacheOptions cache_options_;
   mutable port::Mutex capacity_mutex_;
+  std::shared_ptr<ConcurrentCacheReservationManager> cache_res_mgr_;
 };
 
 }  // namespace ROCKSDB_NAMESPACE
```

cache/compressed_secondary_cache_test.cc:
```diff
@@ -10,11 +10,13 @@
 #include <memory>
 #include <tuple>
 
+#include "cache/secondary_cache_adapter.h"
 #include "memory/jemalloc_nodump_allocator.h"
 #include "rocksdb/convenience.h"
 #include "test_util/secondary_cache_test_util.h"
 #include "test_util/testharness.h"
 #include "test_util/testutil.h"
+#include "util/cast_util.h"
 
 namespace ROCKSDB_NAMESPACE {
@@ -974,6 +976,97 @@ TEST_P(CompressedSecondaryCacheTest, SplictValueAndMergeChunksTest) {
   SplictValueAndMergeChunksTest();
 }
 
+class CompressedSecCacheTestWithTiered : public ::testing::Test {
+ public:
+  CompressedSecCacheTestWithTiered() {
+    LRUCacheOptions lru_opts;
+    TieredVolatileCacheOptions opts;
+    lru_opts.capacity = 70 << 20;
+    opts.cache_opts = &lru_opts;
+    opts.cache_type = PrimaryCacheType::kCacheTypeLRU;
+    opts.comp_cache_opts.capacity = 30 << 20;
+    cache_ = NewTieredVolatileCache(opts);
+    cache_res_mgr_ =
+        std::make_shared<CacheReservationManagerImpl<CacheEntryRole::kMisc>>(
+            cache_);
+  }
+
+ protected:
+  CacheReservationManager* cache_res_mgr() { return cache_res_mgr_.get(); }
+
+  Cache* GetCache() {
+    return static_cast_with_check<CacheWithSecondaryAdapter, Cache>(
+               cache_.get())
+        ->TEST_GetCache();
+  }
+
+  SecondaryCache* GetSecondaryCache() {
+    return static_cast_with_check<CacheWithSecondaryAdapter, Cache>(
+               cache_.get())
+        ->TEST_GetSecondaryCache();
+  }
+
+  size_t GetPercent(size_t val, unsigned int percent) {
+    return static_cast<size_t>(val * percent / 100);
+  }
+
+ private:
+  std::shared_ptr<Cache> cache_;
+  std::shared_ptr<CacheReservationManager> cache_res_mgr_;
+};
+
+bool CacheUsageWithinBounds(size_t val1, size_t val2, size_t error) {
+  return ((val1 < (val2 + error)) && (val1 > (val2 - error)));
+}
+
+TEST_F(CompressedSecCacheTestWithTiered, CacheReservationManager) {
+  CompressedSecondaryCache* sec_cache =
+      reinterpret_cast<CompressedSecondaryCache*>(GetSecondaryCache());
+
+  // Use EXPECT_PRED3 instead of EXPECT_NEAR to avoid too many size_t to
+  // double explicit casts
+  EXPECT_PRED3(CacheUsageWithinBounds, GetCache()->GetUsage(), (30 << 20),
+               GetPercent(30 << 20, 1));
+  EXPECT_EQ(sec_cache->TEST_GetUsage(), 0);
+
+  ASSERT_OK(cache_res_mgr()->UpdateCacheReservation(10 << 20));
+  EXPECT_PRED3(CacheUsageWithinBounds, GetCache()->GetUsage(), (37 << 20),
+               GetPercent(37 << 20, 1));
+  EXPECT_PRED3(CacheUsageWithinBounds, sec_cache->TEST_GetUsage(), (3 << 20),
+               GetPercent(3 << 20, 1));
+
+  ASSERT_OK(cache_res_mgr()->UpdateCacheReservation(0));
+  EXPECT_PRED3(CacheUsageWithinBounds, GetCache()->GetUsage(), (30 << 20),
+               GetPercent(30 << 20, 1));
+  EXPECT_EQ(sec_cache->TEST_GetUsage(), 0);
+}
+
+TEST_F(CompressedSecCacheTestWithTiered,
+       CacheReservationManagerMultipleUpdate) {
+  CompressedSecondaryCache* sec_cache =
+      reinterpret_cast<CompressedSecondaryCache*>(GetSecondaryCache());
+
+  EXPECT_PRED3(CacheUsageWithinBounds, GetCache()->GetUsage(), (30 << 20),
+               GetPercent(30 << 20, 1));
+  EXPECT_EQ(sec_cache->TEST_GetUsage(), 0);
+
+  int i;
+  for (i = 0; i < 10; ++i) {
+    ASSERT_OK(cache_res_mgr()->UpdateCacheReservation((1 + i) << 20));
+  }
+  EXPECT_PRED3(CacheUsageWithinBounds, GetCache()->GetUsage(), (37 << 20),
+               GetPercent(37 << 20, 1));
+  EXPECT_PRED3(CacheUsageWithinBounds, sec_cache->TEST_GetUsage(), (3 << 20),
+               GetPercent(3 << 20, 1));
+
+  for (i = 10; i > 0; --i) {
+    ASSERT_OK(cache_res_mgr()->UpdateCacheReservation(((i - 1) << 20)));
+  }
+  EXPECT_PRED3(CacheUsageWithinBounds, GetCache()->GetUsage(), (30 << 20),
+               GetPercent(30 << 20, 1));
+  EXPECT_EQ(sec_cache->TEST_GetUsage(), 0);
+}
+
 }  // namespace ROCKSDB_NAMESPACE
 
 int main(int argc, char** argv) {
```

cache/secondary_cache_adapter.cc:
```diff
@@ -6,6 +6,7 @@
 #include "cache/secondary_cache_adapter.h"
 
 #include "monitoring/perf_context_imp.h"
+#include "util/cast_util.h"
 
 namespace ROCKSDB_NAMESPACE {
@@ -18,20 +19,98 @@ const Dummy kDummy{};
 Cache::ObjectPtr const kDummyObj = const_cast<Dummy*>(&kDummy);
 }  // namespace
 
+// When CacheWithSecondaryAdapter is constructed with the distribute_cache_res
+// parameter set to true, it manages the entire memory budget across the
+// primary and secondary cache. The secondary cache is assumed to be in
+// memory, such as the CompressedSecondaryCache. When a placeholder entry
+// is inserted by a CacheReservationManager instance to reserve memory,
+// the CacheWithSecondaryAdapter ensures that the reservation is distributed
+// proportionally across the primary/secondary caches.
+//
+// The primary block cache is initially sized to the sum of the primary cache
+// budget + the secondary cache budget, as follows -
+//   |---------    Primary Cache Configured Capacity  -----------|
+//   |---Secondary Cache Budget----|----Primary Cache Budget-----|
+//
+// A ConcurrentCacheReservationManager member in the CacheWithSecondaryAdapter,
+// pri_cache_res_, is used to help with tracking the distribution of memory
+// reservations. Initially, it accounts for the entire secondary cache budget
+// as a reservation against the primary cache. This shrinks the usable
+// capacity of the primary cache to the budget that the user originally
+// desired.
+//
+//   |--Reservation for Sec Cache--|-Pri Cache Usable Capacity---|
+//
+// When a reservation placeholder is inserted into the adapter, it is inserted
+// directly into the primary cache. This means the entire charge of the
+// placeholder is counted against the primary cache. To compensate and count
+// a portion of it against the secondary cache, the secondary cache Deflate()
+// method is called to shrink it. Since the Deflate() causes the secondary
+// actual usage to shrink, it is reflected here by releasing an equal amount
+// from the pri_cache_res_ reservation. The Deflate() in the secondary cache
+// can be, but is not required to be, implemented using its own cache
+// reservation manager.
+//
+// For example, if the pri/sec ratio is 70/30, and the combined capacity is
+// 100MB, the intermediate and final state after inserting a reservation
+// placeholder for 10MB would be as follows -
+//
+//   |-Reservation for Sec Cache-|-Pri Cache Usable Capacity-|---R---|
+// 1. After inserting the placeholder in primary
+//   |------- 30MB -------------|------- 60MB -------------|-10MB--|
+// 2. After deflating the secondary and adjusting the reservation for
+//    secondary against the primary
+//   |------- 27MB -------------|------- 63MB -------------|-10MB--|
+//
+// Likewise, when the user inserted placeholder is released, the secondary
+// cache Inflate() method is called to grow it, and the pri_cache_res_
+// reservation is increased by an equal amount.
+//
+// Another way of implementing this would have been to simply split the user
+// reservation into primary and secondary components. However, this would
+// require allocating a structure to track the associated secondary cache
+// reservation, which adds some complexity and overhead.
+//
 CacheWithSecondaryAdapter::CacheWithSecondaryAdapter(
     std::shared_ptr<Cache> target,
-    std::shared_ptr<SecondaryCache> secondary_cache)
+    std::shared_ptr<SecondaryCache> secondary_cache, bool distribute_cache_res)
     : CacheWrapper(std::move(target)),
-      secondary_cache_(std::move(secondary_cache)) {
+      secondary_cache_(std::move(secondary_cache)),
+      distribute_cache_res_(distribute_cache_res) {
   target_->SetEvictionCallback([this](const Slice& key, Handle* handle) {
     return EvictionHandler(key, handle);
   });
+  if (distribute_cache_res_) {
+    size_t sec_capacity = 0;
+    pri_cache_res_ = std::make_shared<ConcurrentCacheReservationManager>(
+        std::make_shared<CacheReservationManagerImpl<CacheEntryRole::kMisc>>(
+            target_));
+    Status s = secondary_cache_->GetCapacity(sec_capacity);
+    assert(s.ok());
+    // Initially, the primary cache is sized to the uncompressed cache budget
+    // plus the compressed secondary cache budget. The secondary cache budget
+    // is then taken away from the primary cache through cache reservations.
+    // Later, when a placeholder entry is inserted by the caller, it is
+    // inserted into the primary cache and the portion that should be assigned
+    // to the secondary cache is freed from the reservation.
+    s = pri_cache_res_->UpdateCacheReservation(sec_capacity);
+    assert(s.ok());
+    sec_cache_res_ratio_ = (double)sec_capacity / target_->GetCapacity();
+  }
 }
 
 CacheWithSecondaryAdapter::~CacheWithSecondaryAdapter() {
   // `*this` will be destroyed before `*target_`, so we have to prevent
   // use after free
   target_->SetEvictionCallback({});
+#ifndef NDEBUG
+  if (distribute_cache_res_) {
+    size_t sec_capacity = 0;
+    Status s = secondary_cache_->GetCapacity(sec_capacity);
+    assert(s.ok());
+    assert(pri_cache_res_->GetTotalReservedCacheSize() == sec_capacity);
+  }
+#endif  // NDEBUG
 }
 
 bool CacheWithSecondaryAdapter::EvictionHandler(const Slice& key,
@@ -136,6 +215,22 @@ Cache::Handle* CacheWithSecondaryAdapter::Promote(
   return result;
 }
 
+Status CacheWithSecondaryAdapter::Insert(const Slice& key, ObjectPtr value,
+                                         const CacheItemHelper* helper,
+                                         size_t charge, Handle** handle,
+                                         Priority priority) {
+  Status s = target_->Insert(key, value, helper, charge, handle, priority);
+  if (s.ok() && value == nullptr && distribute_cache_res_) {
+    size_t sec_charge = static_cast<size_t>(charge * (sec_cache_res_ratio_));
+    s = secondary_cache_->Deflate(sec_charge);
+    assert(s.ok());
+    s = pri_cache_res_->UpdateCacheReservation(sec_charge, /*increase=*/false);
+    assert(s.ok());
+  }
+
+  return s;
+}
+
 Cache::Handle* CacheWithSecondaryAdapter::Lookup(const Slice& key,
                                                  const CacheItemHelper* helper,
                                                  CreateContext* create_context,
@@ -162,6 +257,22 @@ Cache::Handle* CacheWithSecondaryAdapter::Lookup(const Slice& key,
   return result;
 }
 
+bool CacheWithSecondaryAdapter::Release(Handle* handle,
+                                        bool erase_if_last_ref) {
+  if (erase_if_last_ref) {
+    ObjectPtr v = target_->Value(handle);
+    if (v == nullptr && distribute_cache_res_) {
+      size_t charge = target_->GetCharge(handle);
+      size_t sec_charge = static_cast<size_t>(charge * (sec_cache_res_ratio_));
+      Status s = secondary_cache_->Inflate(sec_charge);
+      assert(s.ok());
+      s = pri_cache_res_->UpdateCacheReservation(sec_charge, /*increase=*/true);
+      assert(s.ok());
+    }
+  }
+
+  return target_->Release(handle, erase_if_last_ref);
+}
+
 Cache::ObjectPtr CacheWithSecondaryAdapter::Value(Handle* handle) {
   ObjectPtr v = target_->Value(handle);
   // TODO with stacked secondaries: might fail in EvictionHandler
@@ -292,4 +403,31 @@ const char* CacheWithSecondaryAdapter::Name() const {
   // a secondary cache. So we pretend to be that cache
   return target_->Name();
 }
 
+std::shared_ptr<Cache> NewTieredVolatileCache(
+    TieredVolatileCacheOptions& opts) {
+  if (!opts.cache_opts) {
+    return nullptr;
+  }
+
+  std::shared_ptr<Cache> cache;
+  if (opts.cache_type == PrimaryCacheType::kCacheTypeLRU) {
+    LRUCacheOptions cache_opts =
+        *(static_cast_with_check<LRUCacheOptions, ShardedCacheOptions>(
+            opts.cache_opts));
+    cache_opts.capacity += opts.comp_cache_opts.capacity;
+    cache = cache_opts.MakeSharedCache();
+  } else if (opts.cache_type == PrimaryCacheType::kCacheTypeHCC) {
+    HyperClockCacheOptions cache_opts =
+        *(static_cast_with_check<HyperClockCacheOptions, ShardedCacheOptions>(
+            opts.cache_opts));
+    cache = cache_opts.MakeSharedCache();
+  } else {
+    return nullptr;
+  }
+  std::shared_ptr<SecondaryCache> sec_cache;
+  sec_cache = NewCompressedSecondaryCache(opts.comp_cache_opts);
+
+  return std::make_shared<CacheWithSecondaryAdapter>(cache, sec_cache, true);
+}
+
 }  // namespace ROCKSDB_NAMESPACE
```

cache/secondary_cache_adapter.h:
```diff
@@ -5,6 +5,7 @@
 
 #pragma once
 
+#include "cache/cache_reservation_manager.h"
 #include "rocksdb/secondary_cache.h"
 
 namespace ROCKSDB_NAMESPACE {
@@ -13,15 +14,24 @@ class CacheWithSecondaryAdapter : public CacheWrapper {
  public:
   explicit CacheWithSecondaryAdapter(
       std::shared_ptr<Cache> target,
-      std::shared_ptr<SecondaryCache> secondary_cache);
+      std::shared_ptr<SecondaryCache> secondary_cache,
+      bool distribute_cache_res = false);
 
   ~CacheWithSecondaryAdapter() override;
 
+  Status Insert(const Slice& key, ObjectPtr value,
+                const CacheItemHelper* helper, size_t charge,
+                Handle** handle = nullptr,
+                Priority priority = Priority::LOW) override;
+
   Handle* Lookup(const Slice& key, const CacheItemHelper* helper,
                  CreateContext* create_context,
                  Priority priority = Priority::LOW,
                  Statistics* stats = nullptr) override;
 
+  using Cache::Release;
+  bool Release(Handle* handle, bool erase_if_last_ref = false) override;
+
   ObjectPtr Value(Handle* handle) override;
 
   void StartAsyncLookup(AsyncLookupHandle& async_handle) override;
@@ -32,6 +42,10 @@ class CacheWithSecondaryAdapter : public CacheWrapper {
 
   const char* Name() const override;
 
+  Cache* TEST_GetCache() { return target_.get(); }
+
+  SecondaryCache* TEST_GetSecondaryCache() { return secondary_cache_.get(); }
+
  private:
   bool EvictionHandler(const Slice& key, Handle* handle);
@@ -47,6 +61,16 @@ class CacheWithSecondaryAdapter : public CacheWrapper {
   void CleanupCacheObject(ObjectPtr obj, const CacheItemHelper* helper);
 
   std::shared_ptr<SecondaryCache> secondary_cache_;
+  // Whether to proportionally distribute cache memory reservations, i.e.
+  // placeholder entries with null value and a non-zero charge, across
+  // the primary and secondary caches.
+  bool distribute_cache_res_;
+  // A cache reservation manager to keep track of secondary cache memory
+  // usage by reserving equivalent capacity against the primary cache
+  std::shared_ptr<ConcurrentCacheReservationManager> pri_cache_res_;
+  // Fraction of a cache memory reservation to be assigned to the secondary
+  // cache
+  double sec_cache_res_ratio_;
 };
 
 }  // namespace ROCKSDB_NAMESPACE
```

include/rocksdb/cache.h:
```diff
@@ -199,6 +199,8 @@ struct ShardedCacheOptions {
         strict_capacity_limit(_strict_capacity_limit),
         memory_allocator(std::move(_memory_allocator)),
         metadata_charge_policy(_metadata_charge_policy) {}
+  // Make ShardedCacheOptions polymorphic
+  virtual ~ShardedCacheOptions() = default;
 };
 
 // LRUCache - A cache using LRU eviction to stay at or below a set capacity.
@@ -440,4 +442,24 @@ extern std::shared_ptr<Cache> NewClockCache(
     CacheMetadataChargePolicy metadata_charge_policy =
         kDefaultCacheMetadataChargePolicy);
 
+enum PrimaryCacheType {
+  kCacheTypeLRU,  // LRU cache type
+  kCacheTypeHCC,  // Hyper Clock Cache type
+  kCacheTypeMax,
+};
+
+// A 2-tier cache with a primary block cache, and a compressed secondary
+// cache. The returned cache instance will internally allocate a primary
+// uncompressed cache of the specified type, and a compressed secondary
+// cache. Any cache memory reservations, such as WriteBufferManager
+// allocations costed to the block cache, will be distributed
+// proportionally across both the primary and secondary.
+struct TieredVolatileCacheOptions {
+  ShardedCacheOptions* cache_opts;
+  PrimaryCacheType cache_type;
+  CompressedSecondaryCacheOptions comp_cache_opts;
+};
+
+extern std::shared_ptr<Cache> NewTieredVolatileCache(
+    TieredVolatileCacheOptions& cache_opts);
+
 }  // namespace ROCKSDB_NAMESPACE
```

include/rocksdb/secondary_cache.h:
```diff
@@ -134,6 +134,17 @@ class SecondaryCache : public Customizable {
   virtual Status GetCapacity(size_t& /* capacity */) {
     return Status::NotSupported();
   }
+
+  // Temporarily decrease the cache capacity in RAM by the specified amount.
+  // The caller should call Inflate() to restore the cache capacity. This is
+  // intended to be lighter weight than SetCapacity(). The latter evenly
+  // distributes the new capacity across all shards and is meant for large
+  // changes in capacity, whereas the former is meant for relatively small
+  // changes and may be uneven by lowering capacity in a single shard.
+  virtual Status Deflate(size_t /*decrease*/) { return Status::NotSupported(); }
+
+  // Restore the capacity reduced by a prior call to Deflate().
+  virtual Status Inflate(size_t /*increase*/) { return Status::NotSupported(); }
 };
 
 }  // namespace ROCKSDB_NAMESPACE
```

tools/db_bench_tool.cc:
```diff
@@ -602,6 +602,12 @@ DEFINE_uint32(
     "compress_format_version == 2 -- decompressed size is included"
     " in the block header in varint32 format.");
 
+DEFINE_bool(use_tiered_volatile_cache, false,
+            "If use_compressed_secondary_cache is true and "
+            "use_tiered_volatile_cache is true, then allocate a tiered cache "
+            "that distributes cache reservations proportionally over both "
+            "the caches.");
+
 DEFINE_int64(simcache_size, -1,
              "Number of bytes to use as a simcache of "
              "uncompressed data. Nagative value disables simcache.");
@@ -3009,9 +3015,27 @@ class Benchmark {
   }
 
   static std::shared_ptr<Cache> NewCache(int64_t capacity) {
+    CompressedSecondaryCacheOptions secondary_cache_opts;
+    bool use_tiered_cache = false;
     if (capacity <= 0) {
       return nullptr;
     }
+    if (FLAGS_use_compressed_secondary_cache) {
+      secondary_cache_opts.capacity = FLAGS_compressed_secondary_cache_size;
+      secondary_cache_opts.num_shard_bits =
+          FLAGS_compressed_secondary_cache_numshardbits;
+      secondary_cache_opts.high_pri_pool_ratio =
+          FLAGS_compressed_secondary_cache_high_pri_pool_ratio;
+      secondary_cache_opts.low_pri_pool_ratio =
+          FLAGS_compressed_secondary_cache_low_pri_pool_ratio;
+      secondary_cache_opts.compression_type =
+          FLAGS_compressed_secondary_cache_compression_type_e;
+      secondary_cache_opts.compress_format_version =
+          FLAGS_compressed_secondary_cache_compress_format_version;
+      if (FLAGS_use_tiered_volatile_cache) {
+        use_tiered_cache = true;
+      }
+    }
     if (FLAGS_cache_type == "clock_cache") {
       fprintf(stderr, "Old clock cache implementation has been removed.\n");
       exit(1);
@@ -3021,7 +3045,16 @@ class Benchmark {
           static_cast<size_t>(FLAGS_block_size) /*estimated_entry_charge*/,
           FLAGS_cache_numshardbits};
       hcco.hash_seed = GetCacheHashSeed();
-      return hcco.MakeSharedCache();
+      if (use_tiered_cache) {
+        TieredVolatileCacheOptions opts;
+        hcco.capacity += secondary_cache_opts.capacity;
+        opts.cache_type = PrimaryCacheType::kCacheTypeHCC;
+        opts.cache_opts = &hcco;
+        opts.comp_cache_opts = secondary_cache_opts;
+        return NewTieredVolatileCache(opts);
+      } else {
+        return hcco.MakeSharedCache();
+      }
     } else if (FLAGS_cache_type == "lru_cache") {
       LRUCacheOptions opts(
           static_cast<size_t>(capacity), FLAGS_cache_numshardbits,
@@ -3040,26 +3073,21 @@ class Benchmark {
           exit(1);
         }
         opts.secondary_cache = secondary_cache;
-      }
-      if (FLAGS_use_compressed_secondary_cache) {
-        CompressedSecondaryCacheOptions secondary_cache_opts;
-        secondary_cache_opts.capacity = FLAGS_compressed_secondary_cache_size;
-        secondary_cache_opts.num_shard_bits =
-            FLAGS_compressed_secondary_cache_numshardbits;
-        secondary_cache_opts.high_pri_pool_ratio =
-            FLAGS_compressed_secondary_cache_high_pri_pool_ratio;
-        secondary_cache_opts.low_pri_pool_ratio =
-            FLAGS_compressed_secondary_cache_low_pri_pool_ratio;
-        secondary_cache_opts.compression_type =
-            FLAGS_compressed_secondary_cache_compression_type_e;
-        secondary_cache_opts.compress_format_version =
-            FLAGS_compressed_secondary_cache_compress_format_version;
+      } else if (FLAGS_use_compressed_secondary_cache && !use_tiered_cache) {
         opts.secondary_cache =
             NewCompressedSecondaryCache(secondary_cache_opts);
       }
-      return opts.MakeSharedCache();
+      if (use_tiered_cache) {
+        TieredVolatileCacheOptions tiered_opts;
+        opts.capacity += secondary_cache_opts.capacity;
+        tiered_opts.cache_type = PrimaryCacheType::kCacheTypeLRU;
+        tiered_opts.cache_opts = &opts;
+        tiered_opts.comp_cache_opts = secondary_cache_opts;
+        return NewTieredVolatileCache(tiered_opts);
+      } else {
+        return opts.MakeSharedCache();
+      }
     } else {
       fprintf(stderr, "Cache type not supported.");
       exit(1);
```
