diff --git a/cache/clock_cache.cc b/cache/clock_cache.cc
index bca52493a..ead24ce62 100644
--- a/cache/clock_cache.cc
+++ b/cache/clock_cache.cc
@@ -271,7 +271,25 @@ class ClockCacheShard final : public CacheShard {
   Status Insert(const Slice& key, uint32_t hash, void* value, size_t charge,
                 void (*deleter)(const Slice& key, void* value),
                 Cache::Handle** handle, Cache::Priority priority) override;
+  Status Insert(const Slice& key, uint32_t hash, void* value,
+                const Cache::CacheItemHelper* helper, size_t charge,
+                Cache::Handle** handle, Cache::Priority priority) override {
+    return Insert(key, hash, value, charge, helper->del_cb, handle, priority);
+  }
   Cache::Handle* Lookup(const Slice& key, uint32_t hash) override;
+  Cache::Handle* Lookup(const Slice& key, uint32_t hash,
+                        const Cache::CacheItemHelper* /*helper*/,
+                        const Cache::CreateCallback& /*create_cb*/,
+                        Cache::Priority /*priority*/, bool /*wait*/) override {
+    return Lookup(key, hash);
+  }
+  bool Release(Cache::Handle* handle, bool /*useful*/,
+               bool force_erase) override {
+    return Release(handle, force_erase);
+  }
+  bool IsReady(Cache::Handle* /*handle*/) override { return true; }
+  void Wait(Cache::Handle* /*handle*/) override {}
+
   // If the entry is in cache, increase reference count and return true.
   // Return false otherwise.
   //
@@ -797,6 +815,8 @@ class ClockCache final : public ShardedCache {
 #endif  // __clang__
   }
 
+  void WaitAll(std::vector<Handle*>& /*handles*/) override {}
+
  private:
   ClockCacheShard* shards_;
 };
diff --git a/cache/lru_cache.cc b/cache/lru_cache.cc
index 04aa8facf..da5707215 100644
--- a/cache/lru_cache.cc
+++ b/cache/lru_cache.cc
@@ -106,11 +106,11 @@ void LRUHandleTable::Resize() {
   length_bits_ = new_length_bits;
 }
 
-LRUCacheShard::LRUCacheShard(size_t capacity, bool strict_capacity_limit,
-                             double high_pri_pool_ratio,
-                             bool use_adaptive_mutex,
-                             CacheMetadataChargePolicy metadata_charge_policy,
-                             int max_upper_hash_bits)
+LRUCacheShard::LRUCacheShard(
+    size_t capacity, bool strict_capacity_limit, double high_pri_pool_ratio,
+    bool use_adaptive_mutex, CacheMetadataChargePolicy metadata_charge_policy,
+    int max_upper_hash_bits,
+    const std::shared_ptr<SecondaryCache>& secondary_cache)
     : capacity_(0),
       high_pri_pool_usage_(0),
       strict_capacity_limit_(strict_capacity_limit),
@@ -119,7 +119,8 @@ LRUCacheShard::LRUCacheShard(size_t capacity, bool strict_capacity_limit,
       table_(max_upper_hash_bits),
       usage_(0),
       lru_usage_(0),
-      mutex_(use_adaptive_mutex) {
+      mutex_(use_adaptive_mutex),
+      secondary_cache_(secondary_cache) {
   set_metadata_charge_policy(metadata_charge_policy);
   // Make empty circular linked list
   lru_.next = &lru_;
@@ -179,7 +180,10 @@ void LRUCacheShard::ApplyToSomeEntries(
   table_.ApplyToEntriesRange(
       [callback](LRUHandle* h) {
-        callback(h->key(), h->value, h->charge, h->deleter);
+        DeleterFn deleter = h->IsSecondaryCacheCompatible()
+                                ? h->info_.helper->del_cb
+                                : h->info_.deleter;
+        callback(h->key(), h->value, h->charge, deleter);
       },
       index_begin, index_end);
 }
@@ -288,8 +292,14 @@ void LRUCacheShard::SetCapacity(size_t capacity) {
     EvictFromLRU(0, &last_reference_list);
   }
 
+  // Try to insert the evicted entries into the secondary cache
   // Free the entries outside of mutex for performance reasons
   for (auto entry : last_reference_list) {
+    if (secondary_cache_ && entry->IsSecondaryCacheCompatible() &&
+        !entry->IsPromoted()) {
+      secondary_cache_->Insert(entry->key(), entry->value, entry->info_.helper)
+          .PermitUncheckedError();
+    }
     entry->Free();
   }
 }
@@ -299,17 +309,139 @@ void LRUCacheShard::SetStrictCapacityLimit(bool strict_capacity_limit) {
   strict_capacity_limit_ = strict_capacity_limit;
 }
 
-Cache::Handle* LRUCacheShard::Lookup(const Slice& key, uint32_t hash) {
-  MutexLock l(&mutex_);
-  LRUHandle* e = table_.Lookup(key, hash);
-  if (e != nullptr) {
-    assert(e->InCache());
-    if (!e->HasRefs()) {
-      // The entry is in LRU since it's in hash and has no external references
-      LRU_Remove(e);
+Status LRUCacheShard::InsertItem(LRUHandle* e, Cache::Handle** handle) {
+  Status s = Status::OK();
+  autovector<LRUHandle*> last_reference_list;
+  size_t total_charge = e->CalcTotalCharge(metadata_charge_policy_);
+
+  {
+    MutexLock l(&mutex_);
+
+    // Free the space following strict LRU policy until enough space
+    // is freed or the lru list is empty
+    EvictFromLRU(total_charge, &last_reference_list);
+
+    if ((usage_ + total_charge) > capacity_ &&
+        (strict_capacity_limit_ || handle == nullptr)) {
+      if (handle == nullptr) {
+        // Don't insert the entry but still return ok, as if the entry were
+        // inserted into the cache and then evicted immediately.
+        e->SetInCache(false);
+        last_reference_list.push_back(e);
+      } else {
+        delete[] reinterpret_cast<char*>(e);
+        *handle = nullptr;
+        s = Status::Incomplete("Insert failed due to LRU cache being full.");
+      }
+    } else {
+      // Insert into the cache. Note that the cache might get larger than its
+      // capacity if not enough space was freed up.
+      LRUHandle* old = table_.Insert(e);
+      usage_ += total_charge;
+      if (old != nullptr) {
+        s = Status::OkOverwritten();
+        assert(old->InCache());
+        old->SetInCache(false);
+        if (!old->HasRefs()) {
+          // old is on the LRU list because it's in the cache and its
+          // reference count is 0
+          LRU_Remove(old);
+          size_t old_total_charge =
+              old->CalcTotalCharge(metadata_charge_policy_);
+          assert(usage_ >= old_total_charge);
+          usage_ -= old_total_charge;
+          last_reference_list.push_back(old);
+        }
+      }
+      if (handle == nullptr) {
+        LRU_Insert(e);
+      } else {
+        e->Ref();
+        *handle = reinterpret_cast<Cache::Handle*>(e);
+      }
+    }
+  }
+
+  // Try to insert the evicted entries into the secondary cache
+  // Free the entries here outside of mutex for performance reasons
+  for (auto entry : last_reference_list) {
+    if (secondary_cache_ && entry->IsSecondaryCacheCompatible() &&
+        !entry->IsPromoted()) {
+      secondary_cache_->Insert(entry->key(), entry->value, entry->info_.helper)
+          .PermitUncheckedError();
+    }
+    entry->Free();
+  }
+
+  return s;
+}
+
+Cache::Handle* LRUCacheShard::Lookup(
+    const Slice& key, uint32_t hash,
+    const ShardedCache::CacheItemHelper* helper,
+    const ShardedCache::CreateCallback& create_cb, Cache::Priority priority,
+    bool wait) {
+  LRUHandle* e = nullptr;
+  {
+    MutexLock l(&mutex_);
+    e = table_.Lookup(key, hash);
+    if (e != nullptr) {
+      assert(e->InCache());
+      if (!e->HasRefs()) {
+        // The entry is on the LRU list since it's in the hash table and has
+        // no external references
+        LRU_Remove(e);
+      }
+      e->Ref();
+      e->SetHit();
+    }
+  }
+
+  // If the handle table lookup failed, allocate a handle outside the
+  // mutex if we're going to look up in the secondary cache.
+  // Only synchronous lookup is supported for now
+  // TODO: Support asynchronous lookup in the secondary cache
+  if (!e && secondary_cache_ && helper && helper->saveto_cb && wait) {
+    // For objects from the secondary cache, we expect the caller to provide
+    // a way to create/delete the primary cache object. The only case where
+    // a deleter would not be required is for dummy entries inserted for
+    // accounting purposes, which we won't demote to the secondary cache
+    // anyway.
+    assert(create_cb && helper->del_cb);
+    std::unique_ptr<SecondaryCacheHandle> secondary_handle =
+        secondary_cache_->Lookup(key, create_cb, wait);
+    if (secondary_handle != nullptr) {
+      void* value = nullptr;
+      e = reinterpret_cast<LRUHandle*>(
+          new char[sizeof(LRUHandle) - 1 + key.size()]);
+
+      e->flags = 0;
+      e->SetPromoted(true);
+      e->SetSecondaryCacheCompatible(true);
+      e->info_.helper = helper;
+      e->key_length = key.size();
+      e->hash = hash;
+      e->refs = 0;
+      e->next = e->prev = nullptr;
+      e->SetInCache(true);
+      e->SetPriority(priority);
+      memcpy(e->key_data, key.data(), key.size());
+
+      value = secondary_handle->Value();
+      e->value = value;
+      e->charge = secondary_handle->Size();
+
+      // This call could nullify e if the cache is over capacity and
+      // strict_capacity_limit_ is true. In such a case, the caller will try
+      // to insert later, which might fail again, but that's ok as this
+      // should not be common.
+      // Being conservative here since there could be lookups that are
+      // actually ok to fail rather than succeed and bloat up the memory
+      // usage (preloading partitioned index blocks, for example).
+      Status s = InsertItem(e, reinterpret_cast<Cache::Handle**>(&e));
+      if (!s.ok()) {
+        assert(e == nullptr);
+        (*helper->del_cb)(key, value);
+      }
     }
-    e->Ref();
-    e->SetHit();
   }
   return reinterpret_cast<Cache::Handle*>(e);
 }
@@ -370,81 +502,32 @@ bool LRUCacheShard::Release(Cache::Handle* handle, bool force_erase) {
 Status LRUCacheShard::Insert(const Slice& key, uint32_t hash, void* value,
                              size_t charge,
                              void (*deleter)(const Slice& key, void* value),
+                             const Cache::CacheItemHelper* helper,
                              Cache::Handle** handle, Cache::Priority priority) {
   // Allocate the memory here outside of the mutex
   // If the cache is full, we'll have to release it
   // It shouldn't happen very often though.
   LRUHandle* e = reinterpret_cast<LRUHandle*>(
       new char[sizeof(LRUHandle) - 1 + key.size()]);
-  Status s = Status::OK();
-  autovector<LRUHandle*> last_reference_list;
 
   e->value = value;
-  e->deleter = deleter;
+  e->flags = 0;
+  if (helper) {
+    e->SetSecondaryCacheCompatible(true);
+    e->info_.helper = helper;
+  } else {
+    e->info_.deleter = deleter;
+  }
   e->charge = charge;
   e->key_length = key.size();
-  e->flags = 0;
   e->hash = hash;
   e->refs = 0;
   e->next = e->prev = nullptr;
   e->SetInCache(true);
   e->SetPriority(priority);
   memcpy(e->key_data, key.data(), key.size());
-  size_t total_charge = e->CalcTotalCharge(metadata_charge_policy_);
-
-  {
-    MutexLock l(&mutex_);
-    // Free the space following strict LRU policy until enough space
-    // is freed or the lru list is empty
-    EvictFromLRU(total_charge, &last_reference_list);
-
-    if ((usage_ + total_charge) > capacity_ &&
-        (strict_capacity_limit_ || handle == nullptr)) {
-      if (handle == nullptr) {
-        // Don't insert the entry but still return ok, as if the entry inserted
-        // into cache and get evicted immediately.
-        e->SetInCache(false);
-        last_reference_list.push_back(e);
-      } else {
-        delete[] reinterpret_cast<char*>(e);
-        *handle = nullptr;
-        s = Status::Incomplete("Insert failed due to LRU cache being full.");
-      }
-    } else {
-      // Insert into the cache. Note that the cache might get larger than its
-      // capacity if not enough space was freed up.
-      LRUHandle* old = table_.Insert(e);
-      usage_ += total_charge;
-      if (old != nullptr) {
-        s = Status::OkOverwritten();
-        assert(old->InCache());
-        old->SetInCache(false);
-        if (!old->HasRefs()) {
-          // old is on LRU because it's in cache and its reference count is 0
-          LRU_Remove(old);
-          size_t old_total_charge =
-              old->CalcTotalCharge(metadata_charge_policy_);
-          assert(usage_ >= old_total_charge);
-          usage_ -= old_total_charge;
-          last_reference_list.push_back(old);
-        }
-      }
-      if (handle == nullptr) {
-        LRU_Insert(e);
-      } else {
-        e->Ref();
-        *handle = reinterpret_cast<Cache::Handle*>(e);
-      }
-    }
-  }
-
-  // Free the entries here outside of mutex for performance reasons
-  for (auto entry : last_reference_list) {
-    entry->Free();
-  }
-
-  return s;
+  return InsertItem(e, handle);
 }
 
 void LRUCacheShard::Erase(const Slice& key, uint32_t hash) {
@@ -500,7 +583,8 @@ LRUCache::LRUCache(size_t capacity, int num_shard_bits,
                    bool strict_capacity_limit, double high_pri_pool_ratio,
                    std::shared_ptr<MemoryAllocator> allocator,
                    bool use_adaptive_mutex,
-                   CacheMetadataChargePolicy metadata_charge_policy)
+                   CacheMetadataChargePolicy metadata_charge_policy,
+                   const std::shared_ptr<SecondaryCache>& secondary_cache)
     : ShardedCache(capacity, num_shard_bits, strict_capacity_limit,
                    std::move(allocator)) {
   num_shards_ = 1 << num_shard_bits;
@@ -508,10 +592,10 @@ LRUCache::LRUCache(size_t capacity, int num_shard_bits,
       port::cacheline_aligned_alloc(sizeof(LRUCacheShard) * num_shards_));
   size_t per_shard = (capacity + (num_shards_ - 1)) / num_shards_;
   for (int i = 0; i < num_shards_; i++) {
-    new (&shards_[i])
-        LRUCacheShard(per_shard, strict_capacity_limit, high_pri_pool_ratio,
-                      use_adaptive_mutex, metadata_charge_policy,
-                      /* max_upper_hash_bits */ 32 - num_shard_bits);
+    new (&shards_[i]) LRUCacheShard(
+        per_shard, strict_capacity_limit, high_pri_pool_ratio,
+        use_adaptive_mutex, metadata_charge_policy,
+        /* max_upper_hash_bits */ 32 - num_shard_bits, secondary_cache);
   }
 }
 
@@ -576,19 +660,12 @@ double LRUCache::GetHighPriPoolRatio() {
   return result;
 }
 
-std::shared_ptr<Cache> NewLRUCache(const LRUCacheOptions& cache_opts) {
-  return NewLRUCache(cache_opts.capacity, cache_opts.num_shard_bits,
-                     cache_opts.strict_capacity_limit,
-                     cache_opts.high_pri_pool_ratio,
-                     cache_opts.memory_allocator, cache_opts.use_adaptive_mutex,
-                     cache_opts.metadata_charge_policy);
-}
-
 std::shared_ptr<Cache> NewLRUCache(
     size_t capacity, int num_shard_bits, bool strict_capacity_limit,
     double high_pri_pool_ratio,
     std::shared_ptr<MemoryAllocator> memory_allocator, bool use_adaptive_mutex,
-    CacheMetadataChargePolicy metadata_charge_policy) {
+    CacheMetadataChargePolicy metadata_charge_policy,
+    const std::shared_ptr<SecondaryCache>& secondary_cache) {
   if (num_shard_bits >= 20) {
     return nullptr;  // the cache cannot be sharded into too many fine pieces
   }
@@ -601,7 +678,25 @@ std::shared_ptr<Cache> NewLRUCache(
   }
   return std::make_shared<LRUCache>(
       capacity, num_shard_bits, strict_capacity_limit, high_pri_pool_ratio,
-      std::move(memory_allocator), use_adaptive_mutex, metadata_charge_policy);
+      std::move(memory_allocator), use_adaptive_mutex, metadata_charge_policy,
+      secondary_cache);
 }
 
+std::shared_ptr<Cache> NewLRUCache(const LRUCacheOptions& cache_opts) {
+  return NewLRUCache(
+      cache_opts.capacity, cache_opts.num_shard_bits,
+      cache_opts.strict_capacity_limit, cache_opts.high_pri_pool_ratio,
+      cache_opts.memory_allocator, cache_opts.use_adaptive_mutex,
+      cache_opts.metadata_charge_policy, cache_opts.secondary_cache);
+}
+
+std::shared_ptr<Cache> NewLRUCache(
+    size_t capacity, int num_shard_bits, bool strict_capacity_limit,
+    double high_pri_pool_ratio,
+    std::shared_ptr<MemoryAllocator> memory_allocator, bool use_adaptive_mutex,
+    CacheMetadataChargePolicy metadata_charge_policy) {
+  return NewLRUCache(capacity, num_shard_bits, strict_capacity_limit,
+                     high_pri_pool_ratio, memory_allocator, use_adaptive_mutex,
+                     metadata_charge_policy, nullptr);
+}
 }  // namespace ROCKSDB_NAMESPACE
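A minimal usage sketch of the lru_cache.cc changes above (not part of the patch; the option values mirror the tests further down, and MakeTieredCache is a hypothetical helper name):

    std::shared_ptr<Cache> MakeTieredCache(
        const std::shared_ptr<SecondaryCache>& secondary) {
      LRUCacheOptions opts(1024 /*capacity*/, 0 /*num_shard_bits*/,
                           false /*strict_capacity_limit*/,
                           0.5 /*high_pri_pool_ratio*/, nullptr,
                           kDefaultToAdaptiveMutex, kDontChargeCacheMetadata);
      opts.secondary_cache = secondary;  // field added by this change
      return NewLRUCache(opts);          // routes through the new overload above
    }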
diff --git a/cache/lru_cache.h b/cache/lru_cache.h
index 04b54738b..76ca839fd 100644
--- a/cache/lru_cache.h
+++ b/cache/lru_cache.h
@@ -1,4 +1,4 @@
-// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved
 //  This source code is licensed under both the GPLv2 (found in the
 //  COPYING file in the root directory) and Apache 2.0 License
 //  (found in the LICENSE.Apache file in the root directory).
@@ -14,6 +14,7 @@
 #include "cache/sharded_cache.h"
 #include "port/malloc.h"
 #include "port/port.h"
+#include "rocksdb/secondary_cache.h"
 #include "util/autovector.h"
 
 namespace ROCKSDB_NAMESPACE {
@@ -49,7 +50,12 @@ namespace ROCKSDB_NAMESPACE {
 
 struct LRUHandle {
   void* value;
-  void (*deleter)(const Slice&, void* value);
+  union Info {
+    Info() {}
+    ~Info() {}
+    void (*deleter)(const Slice&, void* value);
+    const ShardedCache::CacheItemHelper* helper;
+  } info_;
   LRUHandle* next_hash;
   LRUHandle* next;
   LRUHandle* prev;
@@ -69,6 +75,12 @@ struct LRUHandle {
     IN_HIGH_PRI_POOL = (1 << 2),
     // Whether this entry has had any lookups (hits).
     HAS_HIT = (1 << 3),
+    // Can this be inserted into the tiered (secondary) cache
+    IS_TIERED_CACHE_COMPATIBLE = (1 << 4),
+    // Is the handle still being read from a lower tier
+    IS_PENDING = (1 << 5),
+    // Has the item been promoted from a lower tier
+    IS_PROMOTED = (1 << 6),
   };
 
   uint8_t flags;
@@ -95,6 +107,11 @@ struct LRUHandle {
   bool IsHighPri() const { return flags & IS_HIGH_PRI; }
   bool InHighPriPool() const { return flags & IN_HIGH_PRI_POOL; }
   bool HasHit() const { return flags & HAS_HIT; }
+  bool IsSecondaryCacheCompatible() const {
+    return flags & IS_TIERED_CACHE_COMPATIBLE;
+  }
+  bool IsPending() const { return flags & IS_PENDING; }
+  bool IsPromoted() const { return flags & IS_PROMOTED; }
 
   void SetInCache(bool in_cache) {
     if (in_cache) {
@@ -122,10 +139,36 @@ struct LRUHandle {
 
   void SetHit() { flags |= HAS_HIT; }
 
+  void SetSecondaryCacheCompatible(bool tiered) {
+    if (tiered) {
+      flags |= IS_TIERED_CACHE_COMPATIBLE;
+    } else {
+      flags &= ~IS_TIERED_CACHE_COMPATIBLE;
+    }
+  }
+
+  void SetIncomplete(bool incomp) {
+    if (incomp) {
+      flags |= IS_PENDING;
+    } else {
+      flags &= ~IS_PENDING;
+    }
+  }
+
+  void SetPromoted(bool promoted) {
+    if (promoted) {
+      flags |= IS_PROMOTED;
+    } else {
+      flags &= ~IS_PROMOTED;
+    }
+  }
+
   void Free() {
     assert(refs == 0);
-    if (deleter) {
-      (*deleter)(key(), value);
+    if (!IsSecondaryCacheCompatible() && info_.deleter) {
+      (*info_.deleter)(key(), value);
+    } else if (IsSecondaryCacheCompatible()) {
+      (*info_.helper->del_cb)(key(), value);
     }
     delete[] reinterpret_cast<char*>(this);
   }
@@ -207,7 +250,8 @@ class ALIGN_AS(CACHE_LINE_SIZE) LRUCacheShard final : public CacheShard {
   LRUCacheShard(size_t capacity, bool strict_capacity_limit,
                 double high_pri_pool_ratio, bool use_adaptive_mutex,
                 CacheMetadataChargePolicy metadata_charge_policy,
-                int max_upper_hash_bits);
+                int max_upper_hash_bits,
+                const std::shared_ptr<SecondaryCache>& secondary_cache);
   virtual ~LRUCacheShard() override = default;
 
   // Separate from constructor so caller can easily make an array of LRUCache
@@ -226,8 +270,32 @@ class ALIGN_AS(CACHE_LINE_SIZE) LRUCacheShard final : public CacheShard {
                         size_t charge,
                         void (*deleter)(const Slice& key, void* value),
                         Cache::Handle** handle,
-                        Cache::Priority priority) override;
-  virtual Cache::Handle* Lookup(const Slice& key, uint32_t hash) override;
+                        Cache::Priority priority) override {
+    return Insert(key, hash, value, charge, deleter, nullptr, handle, priority);
+  }
+  virtual Status Insert(const Slice& key, uint32_t hash, void* value,
+                        const Cache::CacheItemHelper* helper, size_t charge,
+                        Cache::Handle** handle,
+                        Cache::Priority priority) override {
+    assert(helper);
+    return Insert(key, hash, value, charge, nullptr, helper, handle, priority);
+  }
+  // If helper is null, the values of the following arguments don't matter
+  virtual Cache::Handle* Lookup(const Slice& key, uint32_t hash,
+                                const ShardedCache::CacheItemHelper* helper,
+                                const ShardedCache::CreateCallback& create_cb,
+                                ShardedCache::Priority priority,
+                                bool wait) override;
+  virtual Cache::Handle* Lookup(const Slice& key, uint32_t hash) override {
+    return Lookup(key, hash, nullptr, nullptr, Cache::Priority::LOW, true);
+  }
+  virtual bool Release(Cache::Handle* handle, bool /*useful*/,
+                       bool force_erase) override {
+    return Release(handle, force_erase);
+  }
+  virtual bool IsReady(Cache::Handle* /*handle*/) override { return true; }
+  virtual void Wait(Cache::Handle* /*handle*/) override {}
   virtual bool Ref(Cache::Handle* handle) override;
   virtual bool Release(Cache::Handle* handle,
                        bool force_erase = false) override;
@@ -259,6 +327,11 @@ class ALIGN_AS(CACHE_LINE_SIZE) LRUCacheShard final : public CacheShard {
   double GetHighPriPoolRatio();
 
  private:
+  Status InsertItem(LRUHandle* item, Cache::Handle** handle);
+  Status Insert(const Slice& key, uint32_t hash, void* value, size_t charge,
+                void (*deleter)(const Slice& key, void* value),
+                const Cache::CacheItemHelper* helper, Cache::Handle** handle,
+                Cache::Priority priority);
   void LRU_Remove(LRUHandle* e);
   void LRU_Insert(LRUHandle* e);
@@ -319,6 +392,8 @@ class ALIGN_AS(CACHE_LINE_SIZE) LRUCacheShard final : public CacheShard {
   // We don't count mutex_ as the cache's internal state so semantically we
   // don't mind mutex_ invoking the non-const actions.
   mutable port::Mutex mutex_;
+
+  std::shared_ptr<SecondaryCache> secondary_cache_;
 };
 
 class LRUCache
@@ -332,7 +407,8 @@ class LRUCache
            std::shared_ptr<MemoryAllocator> memory_allocator = nullptr,
            bool use_adaptive_mutex = kDefaultToAdaptiveMutex,
            CacheMetadataChargePolicy metadata_charge_policy =
-               kDontChargeCacheMetadata);
+               kDontChargeCacheMetadata,
+           const std::shared_ptr<SecondaryCache>& secondary_cache = nullptr);
   virtual ~LRUCache();
   virtual const char* Name() const override { return "LRUCache"; }
   virtual CacheShard* GetShard(uint32_t shard) override;
@@ -341,6 +417,7 @@ class LRUCache
   virtual size_t GetCharge(Handle* handle) const override;
   virtual uint32_t GetHash(Handle* handle) const override;
   virtual void DisownData() override;
+  virtual void WaitAll(std::vector<Handle*>& /*handles*/) override {}
 
   //  Retrieves number of elements in LRU, for unit test purpose only
   size_t TEST_GetLRUSize();
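The union-plus-flag layout above lets LRUHandle hold either a plain deleter or a CacheItemHelper pointer in the same slot, with IS_TIERED_CACHE_COMPATIBLE recording which member is active. A standalone sketch of that discriminated-union pattern (illustrative types, not RocksDB code):

    #include <cstdint>

    struct Entry {
      enum Flags : uint8_t { HAS_HELPER = 1 << 0 };
      uint8_t flags = 0;
      union {
        void (*deleter)(void* value);  // active when HAS_HELPER is clear
        const void* helper;            // active when HAS_HELPER is set
      } info;
      void SetHelper(const void* h) {
        info.helper = h;
        flags |= HAS_HELPER;  // the flag is the union's discriminant
      }
      bool HasHelper() const { return flags & HAS_HELPER; }
    };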
diff --git a/cache/lru_cache_test.cc b/cache/lru_cache_test.cc
index d9c0d064d..5a8f2e1d0 100644
--- a/cache/lru_cache_test.cc
+++ b/cache/lru_cache_test.cc
@@ -7,8 +7,12 @@
 
 #include <string>
 #include <vector>
+
 #include "port/port.h"
+#include "rocksdb/cache.h"
 #include "test_util/testharness.h"
+#include "util/coding.h"
+#include "util/random.h"
 
 namespace ROCKSDB_NAMESPACE {
 
@@ -30,10 +34,10 @@ class LRUCacheTest : public testing::Test {
     DeleteCache();
     cache_ = reinterpret_cast<LRUCacheShard*>(
         port::cacheline_aligned_alloc(sizeof(LRUCacheShard)));
-    new (cache_)
-        LRUCacheShard(capacity, false /*strict_capacity_limit*/,
-                      high_pri_pool_ratio, use_adaptive_mutex,
-                      kDontChargeCacheMetadata, 24 /*max_upper_hash_bits*/);
+    new (cache_) LRUCacheShard(
+        capacity, false /*strict_capacity_limit*/, high_pri_pool_ratio,
+        use_adaptive_mutex, kDontChargeCacheMetadata,
+        24 /*max_upper_hash_bits*/, nullptr /*secondary_cache*/);
   }
 
   void Insert(const std::string& key,
@@ -192,6 +196,357 @@ TEST_F(LRUCacheTest, EntriesWithPriority) {
   ValidateLRUList({"e", "f", "g", "Z", "d"}, 2);
 }
 
+class TestSecondaryCache : public SecondaryCache {
+ public:
+  explicit TestSecondaryCache(size_t capacity)
+      : num_inserts_(0), num_lookups_(0) {
+    cache_ = NewLRUCache(capacity, 0, false, 0.5, nullptr,
+                         kDefaultToAdaptiveMutex, kDontChargeCacheMetadata);
+  }
+  ~TestSecondaryCache() override { cache_.reset(); }
+
+  std::string Name() override { return "TestSecondaryCache"; }
+
+  Status Insert(const Slice& key, void* value,
+                const Cache::CacheItemHelper* helper) override {
+    size_t size;
+    char* buf;
+    Status s;
+
+    num_inserts_++;
+    size = (*helper->size_cb)(value);
+    buf = new char[size + sizeof(uint64_t)];
+    EncodeFixed64(buf, size);
+    s = (*helper->saveto_cb)(value, 0, size, buf + sizeof(uint64_t));
+    if (!s.ok()) {
+      delete[] buf;
+      return s;
+    }
+    return cache_->Insert(key, buf, size,
+                          [](const Slice& /*key*/, void* val) -> void {
+                            delete[] static_cast<char*>(val);
+                          });
+  }
+
+  std::unique_ptr<SecondaryCacheHandle> Lookup(
+      const Slice& key, const Cache::CreateCallback& create_cb,
+      bool /*wait*/) override {
+    std::unique_ptr<SecondaryCacheHandle> secondary_handle;
+    Cache::Handle* handle = cache_->Lookup(key);
+    num_lookups_++;
+    if (handle) {
+      void* value;
+      size_t charge;
+      char* ptr = (char*)cache_->Value(handle);
+      size_t size = DecodeFixed64(ptr);
+      ptr += sizeof(uint64_t);
+      Status s = create_cb(ptr, size, &value, &charge);
+      if (s.ok()) {
+        secondary_handle.reset(
+            new TestSecondaryCacheHandle(cache_.get(), handle, value, charge));
+      } else {
+        cache_->Release(handle);
+      }
+    }
+    return secondary_handle;
+  }
+
+  void Erase(const Slice& /*key*/) override {}
+
+  void WaitAll(std::vector<SecondaryCacheHandle*> /*handles*/) override {}
+
+  std::string GetPrintableOptions() const override { return ""; }
+
+  uint32_t num_inserts() { return num_inserts_; }
+
+  uint32_t num_lookups() { return num_lookups_; }
+
+ private:
+  class TestSecondaryCacheHandle : public SecondaryCacheHandle {
+   public:
+    TestSecondaryCacheHandle(Cache* cache, Cache::Handle* handle, void* value,
+                             size_t size)
+        : cache_(cache), handle_(handle), value_(value), size_(size) {}
+    ~TestSecondaryCacheHandle() override { cache_->Release(handle_); }
+
+    bool IsReady() override { return true; }
+
+    void Wait() override {}
+
+    void* Value() override { return value_; }
+
+    size_t Size() override { return size_; }
+
+   private:
+    Cache* cache_;
+    Cache::Handle* handle_;
+    void* value_;
+    size_t size_;
+  };
+
+  std::shared_ptr<Cache> cache_;
+  uint32_t num_inserts_;
+  uint32_t num_lookups_;
+};
+
+class LRUSecondaryCacheTest : public LRUCacheTest {
+ public:
+  LRUSecondaryCacheTest() : fail_create_(false) {}
+  ~LRUSecondaryCacheTest() {}
+
+ protected:
+  class TestItem {
+   public:
+    TestItem(const char* buf, size_t size) : buf_(new char[size]), size_(size) {
+      memcpy(buf_.get(), buf, size);
+    }
+    ~TestItem() {}
+
+    char* Buf() { return buf_.get(); }
+    size_t Size() { return size_; }
+
+   private:
+    std::unique_ptr<char[]> buf_;
+    size_t size_;
+  };
+
+  static size_t SizeCallback(void* obj) {
+    return reinterpret_cast<TestItem*>(obj)->Size();
+  }
+
+  static Status SaveToCallback(void* obj, size_t offset, size_t size,
+                               void* out) {
+    TestItem* item = reinterpret_cast<TestItem*>(obj);
+    char* buf = item->Buf();
+    EXPECT_EQ(size, item->Size());
+    EXPECT_EQ(offset, 0);
+    memcpy(out, buf, size);
+    return Status::OK();
+  }
+
+  static void DeletionCallback(const Slice& /*key*/, void* obj) {
+    delete reinterpret_cast<TestItem*>(obj);
+  }
+
+  static Cache::CacheItemHelper helper_;
+
+  static Status SaveToCallbackFail(void* /*obj*/, size_t /*offset*/,
+                                   size_t /*size*/, void* /*out*/) {
+    return Status::NotSupported();
+  }
+
+  static Cache::CacheItemHelper helper_fail_;
+
+  Cache::CreateCallback test_item_creator =
+      [&](void* buf, size_t size, void** out_obj, size_t* charge) -> Status {
+    if (fail_create_) {
+      return Status::NotSupported();
+    }
+    *out_obj = reinterpret_cast<void*>(new TestItem((char*)buf, size));
+    *charge = size;
+    return Status::OK();
+  };
+
+  void SetFailCreate(bool fail) { fail_create_ = fail; }
+
+ private:
+  bool fail_create_;
+};
+
+Cache::CacheItemHelper LRUSecondaryCacheTest::helper_(
+    LRUSecondaryCacheTest::SizeCallback, LRUSecondaryCacheTest::SaveToCallback,
+    LRUSecondaryCacheTest::DeletionCallback);
+
+Cache::CacheItemHelper LRUSecondaryCacheTest::helper_fail_(
+    LRUSecondaryCacheTest::SizeCallback,
+    LRUSecondaryCacheTest::SaveToCallbackFail,
+    LRUSecondaryCacheTest::DeletionCallback);
+
+TEST_F(LRUSecondaryCacheTest, BasicTest) {
+  LRUCacheOptions opts(1024, 0, false, 0.5, nullptr, kDefaultToAdaptiveMutex,
+                       kDontChargeCacheMetadata);
+  std::shared_ptr<TestSecondaryCache> secondary_cache =
+      std::make_shared<TestSecondaryCache>(2048);
+  opts.secondary_cache = secondary_cache;
+  std::shared_ptr<Cache> cache = NewLRUCache(opts);
+
+  Random rnd(301);
+  std::string str1 = rnd.RandomString(1020);
+  TestItem* item1 = new TestItem(str1.data(), str1.length());
+  ASSERT_OK(cache->Insert("k1", item1, &LRUSecondaryCacheTest::helper_,
+                          str1.length()));
+  std::string str2 = rnd.RandomString(1020);
+  TestItem* item2 = new TestItem(str2.data(), str2.length());
+  // Inserting k2 evicts k1, so k1 should be demoted to the secondary cache
+  ASSERT_OK(cache->Insert("k2", item2, &LRUSecondaryCacheTest::helper_,
+                          str2.length()));
+
+  Cache::Handle* handle;
+  handle = cache->Lookup("k2", &LRUSecondaryCacheTest::helper_,
+                         test_item_creator, Cache::Priority::LOW, true);
+  ASSERT_NE(handle, nullptr);
+  cache->Release(handle);
+  // This lookup should promote k1 and demote k2
+  handle = cache->Lookup("k1", &LRUSecondaryCacheTest::helper_,
+                         test_item_creator, Cache::Priority::LOW, true);
+  ASSERT_NE(handle, nullptr);
+  cache->Release(handle);
+  ASSERT_EQ(secondary_cache->num_inserts(), 2u);
+  ASSERT_EQ(secondary_cache->num_lookups(), 1u);
+
+  cache.reset();
+  secondary_cache.reset();
+}
+
+TEST_F(LRUSecondaryCacheTest, BasicFailTest) {
+  LRUCacheOptions opts(1024, 0, false, 0.5, nullptr, kDefaultToAdaptiveMutex,
+                       kDontChargeCacheMetadata);
+  std::shared_ptr<TestSecondaryCache> secondary_cache =
+      std::make_shared<TestSecondaryCache>(2048);
+  opts.secondary_cache = secondary_cache;
+  std::shared_ptr<Cache> cache = NewLRUCache(opts);
+
+  Random rnd(301);
+  std::string str1 = rnd.RandomString(1020);
+  TestItem* item1 = new TestItem(str1.data(), str1.length());
+  ASSERT_NOK(cache->Insert("k1", item1, nullptr, str1.length()));
+  ASSERT_OK(cache->Insert("k1", item1, &LRUSecondaryCacheTest::helper_,
+                          str1.length()));
+
+  Cache::Handle* handle;
+  handle = cache->Lookup("k2", nullptr, test_item_creator, Cache::Priority::LOW,
+                         true);
+  ASSERT_EQ(handle, nullptr);
+  handle = cache->Lookup("k2", &LRUSecondaryCacheTest::helper_,
+                         test_item_creator, Cache::Priority::LOW, false);
+  ASSERT_EQ(handle, nullptr);
+
+  cache.reset();
+  secondary_cache.reset();
+}
+
+TEST_F(LRUSecondaryCacheTest, SaveFailTest) {
+  LRUCacheOptions opts(1024, 0, false, 0.5, nullptr, kDefaultToAdaptiveMutex,
+                       kDontChargeCacheMetadata);
+  std::shared_ptr<TestSecondaryCache> secondary_cache =
+      std::make_shared<TestSecondaryCache>(2048);
+  opts.secondary_cache = secondary_cache;
+  std::shared_ptr<Cache> cache = NewLRUCache(opts);
+
+  Random rnd(301);
+  std::string str1 = rnd.RandomString(1020);
+  TestItem* item1 = new TestItem(str1.data(), str1.length());
+  ASSERT_OK(cache->Insert("k1", item1, &LRUSecondaryCacheTest::helper_fail_,
+                          str1.length()));
+  std::string str2 = rnd.RandomString(1020);
+  TestItem* item2 = new TestItem(str2.data(), str2.length());
+  // k1 should be demoted to the secondary cache
+  ASSERT_OK(cache->Insert("k2", item2, &LRUSecondaryCacheTest::helper_fail_,
+                          str2.length()));
+
+  Cache::Handle* handle;
+  handle = cache->Lookup("k2", &LRUSecondaryCacheTest::helper_fail_,
+                         test_item_creator, Cache::Priority::LOW, true);
+  ASSERT_NE(handle, nullptr);
+  cache->Release(handle);
+  // This lookup should fail, since k1 demotion would have failed
+  handle = cache->Lookup("k1", &LRUSecondaryCacheTest::helper_fail_,
+                         test_item_creator, Cache::Priority::LOW, true);
+  ASSERT_EQ(handle, nullptr);
+  // Since k1 didn't get promoted, k2 should still be in cache
+  handle = cache->Lookup("k2", &LRUSecondaryCacheTest::helper_fail_,
+                         test_item_creator, Cache::Priority::LOW, true);
+  ASSERT_NE(handle, nullptr);
+  cache->Release(handle);
+  ASSERT_EQ(secondary_cache->num_inserts(), 1u);
+  ASSERT_EQ(secondary_cache->num_lookups(), 1u);
+
+  cache.reset();
+  secondary_cache.reset();
+}
+
+TEST_F(LRUSecondaryCacheTest, CreateFailTest) {
+  LRUCacheOptions opts(1024, 0, false, 0.5, nullptr, kDefaultToAdaptiveMutex,
+                       kDontChargeCacheMetadata);
+  std::shared_ptr<TestSecondaryCache> secondary_cache =
+      std::make_shared<TestSecondaryCache>(2048);
+  opts.secondary_cache = secondary_cache;
+  std::shared_ptr<Cache> cache = NewLRUCache(opts);
+
+  Random rnd(301);
+  std::string str1 = rnd.RandomString(1020);
+  TestItem* item1 = new TestItem(str1.data(), str1.length());
+  ASSERT_OK(cache->Insert("k1", item1, &LRUSecondaryCacheTest::helper_,
+                          str1.length()));
+  std::string str2 = rnd.RandomString(1020);
+  TestItem* item2 = new TestItem(str2.data(), str2.length());
+  // k1 should be demoted to the secondary cache
+  ASSERT_OK(cache->Insert("k2", item2, &LRUSecondaryCacheTest::helper_,
+                          str2.length()));
+
+  Cache::Handle* handle;
+  SetFailCreate(true);
+  handle = cache->Lookup("k2", &LRUSecondaryCacheTest::helper_,
+                         test_item_creator, Cache::Priority::LOW, true);
+  ASSERT_NE(handle, nullptr);
+  cache->Release(handle);
+  // This lookup should fail, since k1 creation would have failed
+  handle = cache->Lookup("k1", &LRUSecondaryCacheTest::helper_,
+                         test_item_creator, Cache::Priority::LOW, true);
+  ASSERT_EQ(handle, nullptr);
+  // Since k1 didn't get promoted, k2 should still be in cache
+  handle = cache->Lookup("k2", &LRUSecondaryCacheTest::helper_,
+                         test_item_creator, Cache::Priority::LOW, true);
+  ASSERT_NE(handle, nullptr);
+  cache->Release(handle);
+  ASSERT_EQ(secondary_cache->num_inserts(), 1u);
+  ASSERT_EQ(secondary_cache->num_lookups(), 1u);
+
+  cache.reset();
+  secondary_cache.reset();
+}
+
+TEST_F(LRUSecondaryCacheTest, FullCapacityTest) {
+  LRUCacheOptions opts(1024, 0, /*_strict_capacity_limit=*/true, 0.5, nullptr,
+                       kDefaultToAdaptiveMutex, kDontChargeCacheMetadata);
+  std::shared_ptr<TestSecondaryCache> secondary_cache =
+      std::make_shared<TestSecondaryCache>(2048);
+  opts.secondary_cache = secondary_cache;
+  std::shared_ptr<Cache> cache = NewLRUCache(opts);
+
+  Random rnd(301);
+  std::string str1 = rnd.RandomString(1020);
+  TestItem* item1 = new TestItem(str1.data(), str1.length());
+  ASSERT_OK(cache->Insert("k1", item1, &LRUSecondaryCacheTest::helper_,
+                          str1.length()));
+  std::string str2 = rnd.RandomString(1020);
+  TestItem* item2 = new TestItem(str2.data(), str2.length());
+  // k1 should be demoted to the secondary cache
+  ASSERT_OK(cache->Insert("k2", item2, &LRUSecondaryCacheTest::helper_,
+                          str2.length()));
+
+  Cache::Handle* handle;
+  handle = cache->Lookup("k2", &LRUSecondaryCacheTest::helper_,
+                         test_item_creator, Cache::Priority::LOW, true);
+  ASSERT_NE(handle, nullptr);
+  // This lookup should fail, since k1 promotion would have failed due to
+  // the block cache being at capacity
+  Cache::Handle* handle2;
+  handle2 = cache->Lookup("k1", &LRUSecondaryCacheTest::helper_,
+                          test_item_creator, Cache::Priority::LOW, true);
+  ASSERT_EQ(handle2, nullptr);
+  // Since k1 didn't get promoted, k2 should still be in cache
+  cache->Release(handle);
+  handle = cache->Lookup("k2", &LRUSecondaryCacheTest::helper_,
+                         test_item_creator, Cache::Priority::LOW, true);
+  ASSERT_NE(handle, nullptr);
+  cache->Release(handle);
+  ASSERT_EQ(secondary_cache->num_inserts(), 1u);
+  ASSERT_EQ(secondary_cache->num_lookups(), 1u);
+
+  cache.reset();
+  secondary_cache.reset();
+}
 }  // namespace ROCKSDB_NAMESPACE
 
 int main(int argc, char** argv) {
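TestSecondaryCache above persists each value with an 8-byte length prefix: Insert() writes [size | payload] into one buffer and Lookup() decodes it before invoking create_cb. A standalone round-trip of that framing using the same coding helpers (FramingRoundTrip is a hypothetical demo function):

    #include <cstdint>
    #include <cstring>
    #include "util/coding.h"

    void FramingRoundTrip() {
      const char payload[5] = {'h', 'e', 'l', 'l', 'o'};
      char buf[sizeof(uint64_t) + sizeof(payload)];
      ROCKSDB_NAMESPACE::EncodeFixed64(buf, sizeof(payload));   // size header
      memcpy(buf + sizeof(uint64_t), payload, sizeof(payload));  // payload
      uint64_t size = ROCKSDB_NAMESPACE::DecodeFixed64(buf);     // size == 5
      const char* data = buf + sizeof(uint64_t);  // start of the payload
      (void)size;
      (void)data;
    }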
diff --git a/cache/sharded_cache.cc b/cache/sharded_cache.cc
index 8d909ce49..7e9272727 100644
--- a/cache/sharded_cache.cc
+++ b/cache/sharded_cache.cc
@@ -63,11 +63,42 @@ Status ShardedCache::Insert(const Slice& key, void* value, size_t charge,
       ->Insert(key, hash, value, charge, deleter, handle, priority);
 }
 
+Status ShardedCache::Insert(const Slice& key, void* value,
+                            const CacheItemHelper* helper, size_t charge,
+                            Handle** handle, Priority priority) {
+  uint32_t hash = HashSlice(key);
+  if (!helper) {
+    return Status::InvalidArgument();
+  }
+  return GetShard(Shard(hash))
+      ->Insert(key, hash, value, helper, charge, handle, priority);
+}
+
 Cache::Handle* ShardedCache::Lookup(const Slice& key, Statistics* /*stats*/) {
   uint32_t hash = HashSlice(key);
   return GetShard(Shard(hash))->Lookup(key, hash);
 }
 
+Cache::Handle* ShardedCache::Lookup(const Slice& key,
+                                    const CacheItemHelper* helper,
+                                    const CreateCallback& create_cb,
+                                    Priority priority, bool wait,
+                                    Statistics* /*stats*/) {
+  uint32_t hash = HashSlice(key);
+  return GetShard(Shard(hash))
+      ->Lookup(key, hash, helper, create_cb, priority, wait);
+}
+
+bool ShardedCache::IsReady(Handle* handle) {
+  uint32_t hash = GetHash(handle);
+  return GetShard(Shard(hash))->IsReady(handle);
+}
+
+void ShardedCache::Wait(Handle* handle) {
+  uint32_t hash = GetHash(handle);
+  GetShard(Shard(hash))->Wait(handle);
+}
+
 bool ShardedCache::Ref(Handle* handle) {
   uint32_t hash = GetHash(handle);
   return GetShard(Shard(hash))->Ref(handle);
@@ -78,6 +109,11 @@ bool ShardedCache::Release(Handle* handle, bool force_erase) {
   return GetShard(Shard(hash))->Release(handle, force_erase);
 }
 
+bool ShardedCache::Release(Handle* handle, bool useful, bool force_erase) {
+  uint32_t hash = GetHash(handle);
+  return GetShard(Shard(hash))->Release(handle, useful, force_erase);
+}
+
 void ShardedCache::Erase(const Slice& key) {
   uint32_t hash = HashSlice(key);
   GetShard(Shard(hash))->Erase(key, hash);
diff --git a/cache/sharded_cache.h b/cache/sharded_cache.h
index 54c9caf5b..b0d4f3de6 100644
--- a/cache/sharded_cache.h
+++ b/cache/sharded_cache.h
@@ -27,9 +27,20 @@ class CacheShard {
   virtual Status Insert(const Slice& key, uint32_t hash, void* value,
                         size_t charge, DeleterFn deleter,
                         Cache::Handle** handle, Cache::Priority priority) = 0;
+  virtual Status Insert(const Slice& key, uint32_t hash, void* value,
+                        const Cache::CacheItemHelper* helper, size_t charge,
+                        Cache::Handle** handle, Cache::Priority priority) = 0;
   virtual Cache::Handle* Lookup(const Slice& key, uint32_t hash) = 0;
+  virtual Cache::Handle* Lookup(const Slice& key, uint32_t hash,
+                                const Cache::CacheItemHelper* helper,
+                                const Cache::CreateCallback& create_cb,
+                                Cache::Priority priority, bool wait) = 0;
+  virtual bool Release(Cache::Handle* handle, bool useful,
+                       bool force_erase) = 0;
+  virtual bool IsReady(Cache::Handle* handle) = 0;
+  virtual void Wait(Cache::Handle* handle) = 0;
   virtual bool Ref(Cache::Handle* handle) = 0;
-  virtual bool Release(Cache::Handle* handle, bool force_erase = false) = 0;
+  virtual bool Release(Cache::Handle* handle, bool force_erase) = 0;
   virtual void Erase(const Slice& key, uint32_t hash) = 0;
   virtual void SetCapacity(size_t capacity) = 0;
   virtual void SetStrictCapacityLimit(bool strict_capacity_limit) = 0;
@@ -67,6 +78,7 @@ class ShardedCache : public Cache {
   virtual const CacheShard* GetShard(uint32_t shard) const = 0;
   virtual void* Value(Handle* handle) override = 0;
   virtual size_t GetCharge(Handle* handle) const override = 0;
+  virtual void WaitAll(std::vector<Handle*>& handles) override = 0;
   virtual uint32_t GetHash(Handle* handle) const = 0;
   virtual void DisownData() override = 0;
 
@@ -77,7 +89,18 @@ class ShardedCache : public Cache {
   virtual Status Insert(const Slice& key, void* value, size_t charge,
                         DeleterFn deleter, Handle** handle,
                         Priority priority) override;
+  virtual Status Insert(const Slice& key, void* value,
+                        const CacheItemHelper* helper, size_t charge,
+                        Handle** handle = nullptr,
+                        Priority priority = Priority::LOW) override;
   virtual Handle* Lookup(const Slice& key, Statistics* stats) override;
+  virtual Handle* Lookup(const Slice& key, const CacheItemHelper* helper,
+                         const CreateCallback& create_cb, Priority priority,
+                         bool wait, Statistics* stats = nullptr) override;
+  virtual bool Release(Handle* handle, bool useful,
+                       bool force_erase = false) override;
+  virtual bool IsReady(Handle* handle) override;
+  virtual void Wait(Handle* handle) override;
   virtual bool Ref(Handle* handle) override;
   virtual bool Release(Handle* handle, bool force_erase = false) override;
   virtual void Erase(const Slice& key) override;
diff --git a/db/db_basic_test.cc b/db/db_basic_test.cc
index 63a225252..34f27b809 100644
--- a/db/db_basic_test.cc
+++ b/db/db_basic_test.cc
@@ -2820,6 +2820,7 @@ class DBBasicTestMultiGet : public DBTestBase {
 
     const char* Name() const override { return "MyBlockCache"; }
 
+    using Cache::Insert;
     Status Insert(const Slice& key, void* value, size_t charge,
                   void (*deleter)(const Slice& key, void* value),
                   Handle** handle = nullptr,
@@ -2828,6 +2829,7 @@ class DBBasicTestMultiGet : public DBTestBase {
       return target_->Insert(key, value, charge, deleter, handle, priority);
     }
 
+    using Cache::Lookup;
     Handle* Lookup(const Slice& key, Statistics* stats = nullptr) override {
       num_lookups_++;
       Handle* handle = target_->Lookup(key, stats);
diff --git a/db/db_block_cache_test.cc b/db/db_block_cache_test.cc
index cf6f6dfd9..a2a08abc8 100644
--- a/db/db_block_cache_test.cc
+++ b/db/db_block_cache_test.cc
@@ -446,6 +446,7 @@ class MockCache : public LRUCache {
                  false /*strict_capacity_limit*/, 0.0 /*high_pri_pool_ratio*/) {
   }
 
+  using ShardedCache::Insert;
   Status Insert(const Slice& key, void* value, size_t charge,
                 void (*deleter)(const Slice& key, void* value), Handle** handle,
                 Priority priority) override {
@@ -533,6 +534,7 @@ class LookupLiarCache : public CacheWrapper {
   explicit LookupLiarCache(std::shared_ptr<Cache> target)
       : CacheWrapper(std::move(target)) {}
 
+  using Cache::Lookup;
   Handle* Lookup(const Slice& key, Statistics* stats) override {
     if (nth_lookup_not_found_ == 1) {
       nth_lookup_not_found_ = 0;
diff --git a/db/db_test_util.h b/db/db_test_util.h
index a11457d9d..b1bdb8456 100644
--- a/db/db_test_util.h
+++ b/db/db_test_util.h
@@ -826,6 +826,7 @@ class CacheWrapper : public Cache {
 
   const char* Name() const override { return target_->Name(); }
 
+  using Cache::Insert;
   Status Insert(const Slice& key, void* value, size_t charge,
                 void (*deleter)(const Slice& key, void* value),
                 Handle** handle = nullptr,
@@ -833,12 +834,14 @@ class CacheWrapper : public Cache {
     return target_->Insert(key, value, charge, deleter, handle, priority);
   }
 
+  using Cache::Lookup;
   Handle* Lookup(const Slice& key, Statistics* stats = nullptr) override {
     return target_->Lookup(key, stats);
  }
 
  bool Ref(Handle* handle) override { return target_->Ref(handle); }
 
+  using Cache::Release;
  bool Release(Handle* handle, bool force_erase = false) override {
    return target_->Release(handle, force_erase);
  }
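The `using Cache::Insert;` / `using Cache::Lookup;` / `using Cache::Release;` declarations added above (and in sim_cache.cc at the end of this patch) are needed because this change turns those methods into overload sets: a derived class that overrides one overload hides all same-named base-class overloads. A minimal illustration of the rule (hypothetical types, not RocksDB code):

    struct Base {
      virtual ~Base() {}
      virtual int Get(int key) { return key; }
      virtual int Get(int key, int fallback) { return key + fallback; }
    };

    struct Derived : Base {
      using Base::Get;  // re-exposes Get(int, int), which the override
                        // below would otherwise hide
      int Get(int key) override { return key * 2; }
    };

    int Demo() {
      Derived d;
      return d.Get(1, 2);  // compiles only because of the using-declaration
    }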
diff --git a/include/rocksdb/cache.h b/include/rocksdb/cache.h
index 21e9d2a5d..6d8de70cd 100644
--- a/include/rocksdb/cache.h
+++ b/include/rocksdb/cache.h
@@ -36,6 +36,7 @@ namespace ROCKSDB_NAMESPACE {
 
 class Cache;
 struct ConfigOptions;
+class SecondaryCache;
 
 extern const bool kDefaultToAdaptiveMutex;
 
@@ -89,6 +90,9 @@ struct LRUCacheOptions {
   CacheMetadataChargePolicy metadata_charge_policy =
       kDefaultCacheMetadataChargePolicy;
 
+  // A SecondaryCache instance to use as the non-volatile tier
+  std::shared_ptr<SecondaryCache> secondary_cache;
+
   LRUCacheOptions() {}
   LRUCacheOptions(size_t _capacity, int _num_shard_bits,
                   bool _strict_capacity_limit, double _high_pri_pool_ratio,
@@ -143,6 +147,67 @@ class Cache {
   // likely to get evicted than low priority entries.
   enum class Priority { HIGH, LOW };
 
+  // A set of callbacks to allow objects in the primary block cache to be
+  // persisted in a secondary cache. The purpose of the secondary cache
+  // is to support other ways of caching the object, such as persistent or
+  // compressed data, that may require the object to be parsed and transformed
+  // in some way. Since the primary cache holds C++ objects and the secondary
+  // cache may only hold flat data that doesn't need relocation, these
+  // callbacks need to be provided by the user of the block
+  // cache to do the conversion.
+  // The CacheItemHelper is passed to Insert() and Lookup(). It has pointers
+  // to callback functions for size, saving and deletion of the
+  // object. The callbacks are defined in C-style in order to make them
+  // stateless and not add to the cache metadata size.
+  // Saving multiple std::function objects will take up 32 bytes per
+  // function, even if it's not bound to an object and does no capture.
+  //
+  // All the callbacks are C-style function pointers in order to simplify
+  // lifecycle management. Objects in the cache can outlive the parent DB,
+  // so anything required for these operations should be contained in the
+  // object itself.
+  //
+  // The SizeCallback takes a void* pointer to the object and returns the size
+  // of the persistable data. It can be used by the secondary cache to allocate
+  // memory if needed.
+  using SizeCallback = size_t (*)(void* obj);
+
+  // The SaveToCallback takes a void* object pointer and saves the persistable
+  // data into a buffer. The secondary cache may decide to not store it in a
+  // contiguous buffer, in which case this callback will be called multiple
+  // times with increasing offset
+  using SaveToCallback = Status (*)(void* obj, size_t offset, size_t size,
+                                    void* out);
+
+  // A function pointer type for custom destruction of an entry's
+  // value. The Cache is responsible for copying and reclaiming space
+  // for the key, but values are managed by the caller.
+  using DeleterFn = void (*)(const Slice& key, void* value);
+
+  // A struct with pointers to helper functions for spilling items from the
+  // cache into the secondary cache. May be extended in the future. An
+  // instance of this struct is expected to outlive the cache.
+  struct CacheItemHelper {
+    SizeCallback size_cb;
+    SaveToCallback saveto_cb;
+    DeleterFn del_cb;
+
+    CacheItemHelper() : size_cb(nullptr), saveto_cb(nullptr), del_cb(nullptr) {}
+    CacheItemHelper(SizeCallback _size_cb, SaveToCallback _saveto_cb,
+                    DeleterFn _del_cb)
+        : size_cb(_size_cb), saveto_cb(_saveto_cb), del_cb(_del_cb) {}
+  };
+
+  // The CreateCallback is passed by the block cache user to Lookup(). It
+  // takes in a buffer from the secondary cache and constructs an object
+  // using it. The callback doesn't have ownership of the buffer and should
+  // copy the contents into its own buffer.
+  using CreateCallback = std::function<Status(void* buf, size_t size,
+                                              void** out_obj, size_t* charge)>;
+
   Cache(std::shared_ptr<MemoryAllocator> allocator = nullptr)
       : memory_allocator_(std::move(allocator)) {}
   // No copying allowed
@@ -173,16 +238,11 @@ class Cache {
   // Opaque handle to an entry stored in the cache.
   struct Handle {};
 
-  // A function pointer type for custom destruction of an entry's
-  // value. The Cache is responsible for copying and reclaiming space
-  // for the key, but values are managed by the caller.
-  using DeleterFn = void (*)(const Slice& key, void* value);
-
   // The type of the Cache
   virtual const char* Name() const = 0;
 
-  // Insert a mapping from key->value into the cache and assign it
-  // the specified charge against the total cache capacity.
+  // Insert a mapping from key->value into the volatile cache only
+  // and assign it the specified charge against the total cache capacity.
   // If strict_capacity_limit is true and cache reaches its full capacity,
   // return Status::Incomplete.
   //
@@ -321,6 +381,104 @@ class Cache {
   MemoryAllocator* memory_allocator() const { return memory_allocator_.get(); }
 
+  // EXPERIMENTAL
+  // The following APIs are experimental and might change in the future.
+  // The Insert and Lookup APIs below are intended to allow cached objects
+  // to be demoted/promoted between the primary block cache and a secondary
+  // cache. The secondary cache could be a non-volatile cache, and will
+  // likely store the object in a different representation more suitable
+  // for on-disk storage. They rely on a per-object CacheItemHelper to do
+  // the conversions.
+  // The secondary cache may persist across process and system restarts,
+  // and may even be moved between hosts. Therefore, the cache key must
+  // be repeatable across restarts/reboots, and globally unique if
+  // multiple DBs share the same cache and the set of DBs can change
+  // over time.
+
+  // Insert a mapping from key->value into the cache and assign it
+  // the specified charge against the total cache capacity.
+  // If strict_capacity_limit is true and cache reaches its full capacity,
+  // return Status::Incomplete.
+  //
+  // The helper argument is saved by the cache and will be used when the
+  // inserted object is evicted or promoted to the secondary cache. It,
+  // therefore, must outlive the cache.
+  //
+  // If handle is not nullptr, returns a handle that corresponds to the
+  // mapping. The caller must call this->Release(handle) when the returned
+  // mapping is no longer needed. In case of error the caller is responsible
+  // for cleaning up the value (i.e. calling "deleter").
+  //
+  // If handle is nullptr, it is as if Release is called immediately after
+  // insert. In case of error the value will be cleaned up.
+  //
+  // Regardless of whether the item was inserted into the primary cache,
+  // the cache will attempt to insert it into the secondary cache if one is
+  // configured and the helper supports it. If the cache implementation does
+  // not support a secondary cache, the item is only inserted into the
+  // primary cache. The implementation may defer the insertion into the
+  // secondary cache as it sees fit.
+  //
+  // When the inserted entry is no longer needed, the key and
+  // value will be passed to "deleter".
+  virtual Status Insert(const Slice& key, void* value,
+                        const CacheItemHelper* helper, size_t charge,
+                        Handle** handle = nullptr,
+                        Priority priority = Priority::LOW) {
+    if (!helper) {
+      return Status::InvalidArgument();
+    }
+    return Insert(key, value, charge, helper->del_cb, handle, priority);
+  }
+
+  // Lookup the key in the primary and secondary caches (if one is
+  // configured). The create_cb callback function object will be used to
+  // construct the cached object.
+  // If none of the caches have the mapping for the key, returns nullptr.
+  // Else, returns a handle that corresponds to the mapping.
+  //
+  // This call may promote the object from the secondary cache (if one is
+  // configured, and has the given key) to the primary cache.
+  //
+  // The helper argument should be provided if the caller wants the lookup
+  // to include the secondary cache (if one is configured) and the object,
+  // if it exists, to be promoted to the primary cache. The helper may be
+  // saved and used later when the object is evicted. Therefore, it must
+  // outlive the cache.
+  //
+  // The handle returned may not be ready. The caller should call IsReady()
+  // to check if the item value is ready, and call Wait() or WaitAll() if
+  // it's not ready. The caller should then call Value() to check if the
+  // item was successfully retrieved. If unsuccessful (perhaps due to an
+  // IO error), Value() will return nullptr.
+  virtual Handle* Lookup(const Slice& key, const CacheItemHelper* /*helper_cb*/,
+                         const CreateCallback& /*create_cb*/,
+                         Priority /*priority*/, bool /*wait*/,
+                         Statistics* stats = nullptr) {
+    return Lookup(key, stats);
+  }
+
+  // Release a mapping returned by a previous Lookup(). The "useful"
+  // parameter specifies whether the data was actually used or not,
+  // which may be used by the cache implementation to decide whether
+  // to consider it as a hit for retention purposes.
+  virtual bool Release(Handle* handle, bool /*useful*/, bool force_erase) {
+    return Release(handle, force_erase);
+  }
+
+  // Determines if the handle returned by Lookup() has a valid value yet.
+  virtual bool IsReady(Handle* /*handle*/) { return true; }
+
+  // If the handle returned by Lookup() is not ready yet, wait till it
+  // becomes ready.
+  // Note: A ready handle doesn't necessarily mean it has a valid value. The
+  // user should call Value() and check for nullptr.
+  virtual void Wait(Handle* /*handle*/) {}
+
+  // Wait for a vector of handles to become ready. As with Wait(), the user
+  // should check the Value() of each handle for nullptr.
+  virtual void WaitAll(std::vector<Handle*>& /*handles*/) {}
+
  private:
   std::shared_ptr<MemoryAllocator> memory_allocator_;
 };
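A sketch of how a client of the new Cache API above might define the C-style callbacks and a long-lived CacheItemHelper for a simple flat value type. The pattern mirrors the TestItem helpers in lru_cache_test.cc; MyBlob and the function names here are illustrative, not RocksDB APIs:

    struct MyBlob {
      std::string data;
    };

    size_t MyBlobSize(void* obj) {
      return static_cast<MyBlob*>(obj)->data.size();
    }

    Status MyBlobSaveTo(void* obj, size_t offset, size_t size, void* out) {
      const std::string& d = static_cast<MyBlob*>(obj)->data;
      memcpy(out, d.data() + offset, size);  // copy the requested byte range
      return Status::OK();
    }

    void MyBlobDelete(const Slice& /*key*/, void* obj) {
      delete static_cast<MyBlob*>(obj);
    }

    // Static so that it outlives the cache, as the API contract requires
    static Cache::CacheItemHelper kMyBlobHelper(MyBlobSize, MyBlobSaveTo,
                                                MyBlobDelete);

    Status PutBlob(Cache* cache, const Slice& key,
                   std::unique_ptr<MyBlob> blob) {
      size_t charge = blob->data.size();
      // On success the cache owns the object and will call MyBlobDelete
      return cache->Insert(key, blob.release(), &kMyBlobHelper, charge);
    }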
diff --git a/include/rocksdb/secondary_cache.h b/include/rocksdb/secondary_cache.h
new file mode 100644
index 000000000..740a3c435
--- /dev/null
+++ b/include/rocksdb/secondary_cache.h
@@ -0,0 +1,77 @@
+// Copyright (c) 2021, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+#pragma once
+
+#include <stdint.h>
+
+#include <memory>
+#include <string>
+
+#include "rocksdb/cache.h"
+#include "rocksdb/slice.h"
+#include "rocksdb/statistics.h"
+#include "rocksdb/status.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+// A handle for a lookup result. The handle may not be immediately ready or
+// have a valid value. The caller must call IsReady() to determine if it's
+// ready, and call Wait() in order to block until it becomes ready.
+// The caller must call Value() after it becomes ready to determine if the
+// handle successfully read the item.
+class SecondaryCacheHandle {
+ public:
+  virtual ~SecondaryCacheHandle() {}
+
+  // Returns whether the handle is ready or not
+  virtual bool IsReady() = 0;
+
+  // Block until handle becomes ready
+  virtual void Wait() = 0;
+
+  // Return the value. If nullptr, it means the lookup was unsuccessful
+  virtual void* Value() = 0;
+
+  // Return the size of value
+  virtual size_t Size() = 0;
+};
+
+// SecondaryCache
+//
+// Cache interface for caching blocks on a secondary tier (which can include
+// non-volatile media, or alternate forms of caching such as compressed data)
+class SecondaryCache {
+ public:
+  virtual ~SecondaryCache() {}
+
+  virtual std::string Name() = 0;
+
+  // Insert the given value into this cache. The value is not written
+  // directly. Rather, the SaveToCallback provided by helper will be
+  // used to extract the persistable data in value, which will be written
+  // to this tier. The implementation may or may not write it to the cache
+  // depending on the admission control policy, even if the return status
+  // is success.
+  virtual Status Insert(const Slice& key, void* value,
+                        const Cache::CacheItemHelper* helper) = 0;
+
+  // Lookup the data for the given key in this cache. The create_cb
+  // will be used to create the object. The handle returned may not be
+  // ready yet, unless wait=true, in which case Lookup() will block until
+  // the handle is ready
+  virtual std::unique_ptr<SecondaryCacheHandle> Lookup(
+      const Slice& key, const Cache::CreateCallback& create_cb, bool wait) = 0;
+
+  // At the discretion of the implementation, erase the data associated
+  // with key
+  virtual void Erase(const Slice& key) = 0;
+
+  // Wait for a collection of handles to become ready
+  virtual void WaitAll(std::vector<SecondaryCacheHandle*> handles) = 0;
+
+  virtual std::string GetPrintableOptions() const = 0;
+};
+
+}  // namespace ROCKSDB_NAMESPACE
diff --git a/utilities/simulator_cache/sim_cache.cc b/utilities/simulator_cache/sim_cache.cc
index b0d619eb3..c17b0c1b7 100644
--- a/utilities/simulator_cache/sim_cache.cc
+++ b/utilities/simulator_cache/sim_cache.cc
@@ -167,6 +167,7 @@ class SimCacheImpl : public SimCache {
     cache_->SetStrictCapacityLimit(strict_capacity_limit);
   }
 
+  using Cache::Insert;
   Status Insert(const Slice& key, void* value, size_t charge,
                 void (*deleter)(const Slice& key, void* value), Handle** handle,
                 Priority priority) override {
@@ -193,6 +194,7 @@ class SimCacheImpl : public SimCache {
     return cache_->Insert(key, value, charge, deleter, handle, priority);
   }
 
+  using Cache::Lookup;
   Handle* Lookup(const Slice& key, Statistics* stats) override {
     Handle* h = key_only_cache_->Lookup(key);
     if (h != nullptr) {
@@ -213,6 +215,7 @@ class SimCacheImpl : public SimCache {
   bool Ref(Handle* handle) override { return cache_->Ref(handle); }
 
+  using Cache::Release;
   bool Release(Handle* handle, bool force_erase = false) override {
     return cache_->Release(handle, force_erase);
   }
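An end-to-end lookup sketch tying the new interfaces together: the key may be found in either tier, and an object coming from the secondary tier is reconstructed via CreateCallback and promoted. Assumes the hypothetical MyBlob/kMyBlobHelper from the sketch after the cache.h diff above:

    Cache::Handle* GetBlob(Cache* cache, const Slice& key) {
      Cache::CreateCallback create_cb =
          [](void* buf, size_t size, void** out_obj,
             size_t* charge) -> Status {
        // Copy out of the secondary cache's buffer; the callback doesn't
        // own it
        *out_obj = new MyBlob{std::string(static_cast<char*>(buf), size)};
        *charge = size;
        return Status::OK();
      };
      Cache::Handle* h = cache->Lookup(key, &kMyBlobHelper, create_cb,
                                       Cache::Priority::LOW, /*wait=*/true);
      if (h != nullptr && !cache->IsReady(h)) {
        cache->Wait(h);  // no-op today; relevant once async lookup lands
      }
      if (h != nullptr && cache->Value(h) == nullptr) {
        // Ready but unsuccessful, per the documented contract
        cache->Release(h, /*useful=*/false, /*force_erase=*/true);
        h = nullptr;
      }
      return h;
    }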