diff --git a/CMakeLists.txt b/CMakeLists.txt
index 1d78743d9..d7ab5b488 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -624,6 +624,7 @@ find_package(Threads REQUIRED)
 set(SOURCES
   cache/cache.cc
   cache/cache_entry_roles.cc
+  cache/cache_reservation_manager.cc
   cache/clock_cache.cc
   cache/lru_cache.cc
   cache/sharded_cache.cc
@@ -1123,6 +1124,7 @@ if(WITH_TESTS)
     )
   if(WITH_ALL_TESTS)
     list(APPEND TESTS
+      cache/cache_reservation_manager_test.cc
       cache/cache_test.cc
       cache/lru_cache_test.cc
       db/blob/blob_counting_iterator_test.cc
diff --git a/Makefile b/Makefile
index 77ce2dd83..e2790a2cc 100644
--- a/Makefile
+++ b/Makefile
@@ -1897,7 +1897,9 @@ clipping_iterator_test: $(OBJ_DIR)/db/compaction/clipping_iterator_test.o $(TEST_LIBRARY) $(LIBRARY)
 ribbon_bench: $(OBJ_DIR)/microbench/ribbon_bench.o $(LIBRARY)
 	$(AM_LINK)
-
+
+cache_reservation_manager_test: $(OBJ_DIR)/cache/cache_reservation_manager_test.o $(TEST_LIBRARY) $(LIBRARY)
+	$(AM_LINK)
 #-------------------------------------------------
 # make install related stuff
 PREFIX ?= /usr/local
diff --git a/TARGETS b/TARGETS
index b5b992b6b..ac10bcacb 100644
--- a/TARGETS
+++ b/TARGETS
@@ -133,6 +133,7 @@ cpp_library(
     srcs = [
         "cache/cache.cc",
         "cache/cache_entry_roles.cc",
+        "cache/cache_reservation_manager.cc",
        "cache/clock_cache.cc",
         "cache/lru_cache.cc",
         "cache/sharded_cache.cc",
@@ -452,6 +453,7 @@ cpp_library(
     srcs = [
         "cache/cache.cc",
         "cache/cache_entry_roles.cc",
+        "cache/cache_reservation_manager.cc",
         "cache/clock_cache.cc",
         "cache/lru_cache.cc",
         "cache/sharded_cache.cc",
@@ -1025,6 +1027,13 @@ ROCKS_TESTS = [
         [],
         [],
     ],
+    [
+        "cache_reservation_manager_test",
+        "cache/cache_reservation_manager_test.cc",
+        "parallel",
+        [],
+        [],
+    ],
     [
         "cache_simulator_test",
         "utilities/simulator_cache/cache_simulator_test.cc",
diff --git a/cache/cache_reservation_manager.cc b/cache/cache_reservation_manager.cc
new file mode 100644
index 000000000..d6f62d647
--- /dev/null
+++ b/cache/cache_reservation_manager.cc
@@ -0,0 +1,128 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+#include "cache/cache_reservation_manager.h" + +#include +#include +#include +#include + +#include "cache/cache_entry_roles.h" +#include "rocksdb/cache.h" +#include "rocksdb/slice.h" +#include "rocksdb/status.h" +#include "table/block_based/block_based_table_reader.h" +#include "util/coding.h" + +namespace ROCKSDB_NAMESPACE { +CacheReservationManager::CacheReservationManager(std::shared_ptr cache, + bool delayed_decrease) + : delayed_decrease_(delayed_decrease), cache_allocated_size_(0) { + assert(cache != nullptr); + cache_ = cache; + std::memset(cache_key_, 0, kCacheKeyPrefixSize + kMaxVarint64Length); + EncodeVarint64(cache_key_, cache_->NewId()); +} + +CacheReservationManager::~CacheReservationManager() { + for (auto* handle : dummy_handles_) { + cache_->Release(handle, true); + } +} + +template +Status CacheReservationManager::UpdateCacheReservation( + std::size_t new_mem_used) { + std::size_t cur_cache_allocated_size = + cache_allocated_size_.load(std::memory_order_relaxed); + if (new_mem_used == cur_cache_allocated_size) { + return Status::OK(); + } else if (new_mem_used > cur_cache_allocated_size) { + Status s = IncreaseCacheReservation(new_mem_used); + return s; + } else { + // In delayed decrease mode, we don't decrease cache reservation + // untill the memory usage is less than 3/4 of what we reserve + // in the cache. + // We do this because + // (1) Dummy entry insertion is expensive in block cache + // (2) Delayed releasing previously inserted dummy entries can save such + // expensive dummy entry insertion on memory increase in the near future, + // which is likely to happen when the memory usage is greater than or equal + // to 3/4 of what we reserve + if (delayed_decrease_ && new_mem_used >= cur_cache_allocated_size / 4 * 3) { + return Status::OK(); + } else { + Status s = DecreaseCacheReservation(new_mem_used); + return s; + } + } +} + +// Explicitly instantiate templates for "CacheEntryRole" values we use. +// This makes it possible to keep the template definitions in the .cc file. 
+template Status CacheReservationManager::UpdateCacheReservation<
+    CacheEntryRole::kWriteBuffer>(std::size_t new_mem_used);
+// For cache reservation manager unit tests
+template Status CacheReservationManager::UpdateCacheReservation<
+    CacheEntryRole::kMisc>(std::size_t new_mem_used);
+
+template <CacheEntryRole R>
+Status CacheReservationManager::IncreaseCacheReservation(
+    std::size_t new_mem_used) {
+  Status return_status = Status::OK();
+  while (new_mem_used > cache_allocated_size_.load(std::memory_order_relaxed)) {
+    Cache::Handle* handle = nullptr;
+    return_status = cache_->Insert(GetNextCacheKey(), nullptr, kSizeDummyEntry,
+                                   GetNoopDeleterForRole<R>(), &handle);
+
+    if (return_status != Status::OK()) {
+      return return_status;
+    }
+
+    dummy_handles_.push_back(handle);
+    cache_allocated_size_ += kSizeDummyEntry;
+  }
+  return return_status;
+}
+
+Status CacheReservationManager::DecreaseCacheReservation(
+    std::size_t new_mem_used) {
+  Status return_status = Status::OK();
+
+  // Decrease to the smallest multiple of kSizeDummyEntry that is greater than
+  // or equal to new_mem_used. We test new_mem_used + kSizeDummyEntry instead
+  // of new_mem_used <=
+  // cache_allocated_size_.load(std::memory_order_relaxed) - kSizeDummyEntry
+  // to avoid underflow of size_t when cache_allocated_size_ = 0.
+  while (new_mem_used + kSizeDummyEntry <=
+         cache_allocated_size_.load(std::memory_order_relaxed)) {
+    assert(!dummy_handles_.empty());
+    auto* handle = dummy_handles_.back();
+    cache_->Release(handle, true);
+    dummy_handles_.pop_back();
+    cache_allocated_size_ -= kSizeDummyEntry;
+  }
+  return return_status;
+}
+
+std::size_t CacheReservationManager::GetTotalReservedCacheSize() {
+  return cache_allocated_size_.load(std::memory_order_relaxed);
+}
+
+Slice CacheReservationManager::GetNextCacheKey() {
+  // Calling this function will have the side effect of changing the
+  // underlying cache_key_ that is shared among other keys generated from this
+  // function. Therefore please make sure the previous keys are saved/copied
+  // before calling this function.
+  std::memset(cache_key_ + kCacheKeyPrefixSize, 0, kMaxVarint64Length);
+  char* end =
+      EncodeVarint64(cache_key_ + kCacheKeyPrefixSize, next_cache_key_id_++);
+  return Slice(cache_key_, static_cast<std::size_t>(end - cache_key_));
+}
+}  // namespace ROCKSDB_NAMESPACE
\ No newline at end of file
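To make the bookkeeping above concrete, here is a small standalone sketch (not part of the patch) that mirrors the three `UpdateCacheReservation` cases; `NextReservation` is a hypothetical helper, and the 256 KiB dummy-entry size matches `kSizeDummyEntry`:

```cpp
#include <cstddef>
#include <iostream>

constexpr std::size_t kSizeDummyEntry = 256 * 1024;

// Reservation (in bytes) the manager would hold after an update, given the
// current reservation and the new memory usage.
std::size_t NextReservation(std::size_t reserved, std::size_t new_mem_used,
                            bool delayed_decrease) {
  if (new_mem_used == reserved) {
    return reserved;  // keep the reservation unchanged
  }
  if (new_mem_used < reserved && delayed_decrease &&
      new_mem_used >= reserved / 4 * 3) {
    return reserved;  // delayed decrease: hold until usage drops below 3/4
  }
  // Increase or decrease to the smallest multiple of kSizeDummyEntry that is
  // greater than or equal to new_mem_used.
  return (new_mem_used + kSizeDummyEntry - 1) / kSizeDummyEntry *
         kSizeDummyEntry;
}

int main() {
  // 333 KiB of usage costs two dummy entries (512 KiB) in the cache.
  std::cout << NextReservation(0, 333 * 1024, true) / kSizeDummyEntry << "\n";
  // Dropping from 8 to 7 entries' worth of usage is held (7 >= 8 * 3/4 = 6).
  std::cout << NextReservation(8 * kSizeDummyEntry, 7 * kSizeDummyEntry, true) /
                   kSizeDummyEntry
            << "\n";
  // Dropping just below 6 entries' worth releases down to 6 (ceil rounding).
  std::cout << NextReservation(8 * kSizeDummyEntry, 6 * kSizeDummyEntry - 1,
                               true) /
                   kSizeDummyEntry
            << "\n";  // prints 2, 8, 6
}
```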
diff --git a/cache/cache_reservation_manager.h b/cache/cache_reservation_manager.h
new file mode 100644
index 000000000..7c5ccf14b
--- /dev/null
+++ b/cache/cache_reservation_manager.h
@@ -0,0 +1,97 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#pragma once
+
+#include <atomic>
+#include <cstddef>
+#include <cstdint>
+#include <memory>
+#include <vector>
+
+#include "cache/cache_entry_roles.h"
+#include "rocksdb/cache.h"
+#include "rocksdb/slice.h"
+#include "rocksdb/status.h"
+#include "table/block_based/block_based_table_reader.h"
+#include "util/coding.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+// CacheReservationManager reserves cache space for memory used elsewhere by
+// inserting/releasing dummy entries in the cache.
+// This class is not thread-safe.
+class CacheReservationManager {
+ public:
+  // Construct a CacheReservationManager
+  // @param cache The cache where dummy entries are inserted and released for
+  //              reserving cache space
+  // @param delayed_decrease If set true, then dummy entries won't be released
+  //                         immediately when memory usage decreases.
+  //                         Instead, they will be released when the memory
+  //                         usage decreases to 3/4 of what we have reserved
+  //                         so far. This saves some future dummy entry
+  //                         insertion when memory usage increases are likely
+  //                         to happen in the near future.
+  explicit CacheReservationManager(std::shared_ptr<Cache> cache,
+                                   bool delayed_decrease = false);
+
+  // no copy constructor, copy assignment, move constructor, move assignment
+  CacheReservationManager(const CacheReservationManager &) = delete;
+  CacheReservationManager &operator=(const CacheReservationManager &) = delete;
+  CacheReservationManager(CacheReservationManager &&) = delete;
+  CacheReservationManager &operator=(CacheReservationManager &&) = delete;
+
+  ~CacheReservationManager();
+
+  // Insert and release dummy entries in the cache to
+  // match the size of total dummy entries with the smallest multiple of
+  // kSizeDummyEntry that is greater than or equal to new_mem_used
+  //
+  // Insert dummy entries if new_memory_used > cache_allocated_size_;
+  //
+  // Release dummy entries if new_memory_used < cache_allocated_size_
+  // (and new_memory_used < cache_allocated_size_ * 3/4
+  // when delayed_decrease is set true);
+  //
+  // Keep dummy entries the same if (1) new_memory_used ==
+  // cache_allocated_size_ or (2) new_memory_used is in the interval
+  // [cache_allocated_size_ * 3/4, cache_allocated_size_) when
+  // delayed_decrease is set true.
+  //
+  // On inserting dummy entries, it returns Status::OK() if all dummy entry
+  // insertions succeed. Otherwise, it returns the first non-ok status;
+  // On releasing dummy entries, it always returns Status::OK().
+  // On keeping dummy entries the same, it always returns Status::OK().
+  template <CacheEntryRole R>
+  Status UpdateCacheReservation(std::size_t new_memory_used);
+  std::size_t GetTotalReservedCacheSize();
+
+ private:
+  static constexpr std::size_t kSizeDummyEntry = 256 * 1024;
+  // The key will be longer than keys for blocks in SST files so they won't
+  // conflict.
+  static const std::size_t kCacheKeyPrefixSize =
+      BlockBasedTable::kMaxCacheKeyPrefixSize + kMaxVarint64Length;
+
+  Slice GetNextCacheKey();
+  template <CacheEntryRole R>
+  Status IncreaseCacheReservation(std::size_t new_mem_used);
+  Status DecreaseCacheReservation(std::size_t new_mem_used);
+
+  std::shared_ptr<Cache> cache_;
+  bool delayed_decrease_;
+  std::atomic<std::size_t> cache_allocated_size_;
+  std::vector<Cache::Handle *> dummy_handles_;
+  std::uint64_t next_cache_key_id_ = 0;
+  // The non-prefix part will be updated according to the ID to use.
+  char cache_key_[kCacheKeyPrefixSize + kMaxVarint64Length];
+};
+}  // namespace ROCKSDB_NAMESPACE
\ No newline at end of file
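A minimal usage sketch of the public interface above (assuming a built RocksDB tree; the sizes mirror the unit tests that follow):

```cpp
#include <cassert>
#include <memory>

#include "cache/cache_entry_roles.h"
#include "cache/cache_reservation_manager.h"
#include "rocksdb/cache.h"

using namespace ROCKSDB_NAMESPACE;

int main() {
  std::shared_ptr<Cache> cache = NewLRUCache(1024 * 1024 * 1024 /* 1GB */);
  CacheReservationManager mgr(cache, true /* delayed_decrease */);

  // Reserve cache space for 333KB of memory used elsewhere: the manager pins
  // two 256KB dummy entries in the cache.
  Status s = mgr.UpdateCacheReservation<CacheEntryRole::kMisc>(333 * 1024);
  assert(s.ok());
  assert(mgr.GetTotalReservedCacheSize() == 2 * 256 * 1024);

  // Usage drops to zero: 0 is below 3/4 of the reservation, so even in
  // delayed decrease mode all dummy entries are released.
  s = mgr.UpdateCacheReservation<CacheEntryRole::kMisc>(0);
  assert(s.ok());
  assert(mgr.GetTotalReservedCacheSize() == 0);
  return 0;
}
```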
diff --git a/cache/cache_reservation_manager_test.cc b/cache/cache_reservation_manager_test.cc
new file mode 100644
index 000000000..be548d3af
--- /dev/null
+++ b/cache/cache_reservation_manager_test.cc
@@ -0,0 +1,411 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+#include "cache/cache_reservation_manager.h"
+
+#include <cstddef>
+#include <cstring>
+#include <memory>
+
+#include "cache/cache_entry_roles.h"
+#include "rocksdb/cache.h"
+#include "rocksdb/slice.h"
+#include "table/block_based/block_based_table_reader.h"
+#include "test_util/testharness.h"
+#include "util/coding.h"
+
+namespace ROCKSDB_NAMESPACE {
+class CacheReservationManagerTest : public ::testing::Test {
+ protected:
+  static constexpr std::size_t kOneGigabyte = 1024 * 1024 * 1024;
+  static constexpr int kNumShardBits = 0;  // 2^0 shard
+
+  static constexpr std::size_t kSizeDummyEntry = 256 * 1024;
+  static const std::size_t kCacheKeyPrefixSize =
+      BlockBasedTable::kMaxCacheKeyPrefixSize + kMaxVarint64Length;
+  static constexpr std::size_t kMetaDataChargeOverhead = 10000;
+
+  std::shared_ptr<Cache> cache = NewLRUCache(kOneGigabyte, kNumShardBits);
+  std::unique_ptr<CacheReservationManager> test_cache_rev_mng;
+
+  CacheReservationManagerTest() {
+    test_cache_rev_mng.reset(new CacheReservationManager(cache));
+  }
+};
+
+TEST_F(CacheReservationManagerTest, GenerateCacheKey) {
+  // The first cache reservation manager owning the cache will have
+  // cache->NewId() = 1
+  constexpr std::size_t kCacheNewId = 1;
+  // The first key generated inside of cache reservation manager will have
+  // next_cache_key_id = 0
+  constexpr std::size_t kCacheKeyId = 0;
+
+  char expected_cache_key[kCacheKeyPrefixSize + kMaxVarint64Length];
+  std::memset(expected_cache_key, 0, kCacheKeyPrefixSize + kMaxVarint64Length);
+
+  EncodeVarint64(expected_cache_key, kCacheNewId);
+  char* end =
+      EncodeVarint64(expected_cache_key + kCacheKeyPrefixSize, kCacheKeyId);
+  Slice expected_cache_key_slice(
+      expected_cache_key, static_cast<std::size_t>(end - expected_cache_key));
+
+  std::size_t new_mem_used = 1 * kSizeDummyEntry;
+  Status s = test_cache_rev_mng
+                 ->UpdateCacheReservation<CacheEntryRole::kMisc>(new_mem_used);
+  ASSERT_EQ(s, Status::OK());
+  ASSERT_GE(cache->GetPinnedUsage(), 1 * kSizeDummyEntry);
+  ASSERT_LT(cache->GetPinnedUsage(),
+            1 * kSizeDummyEntry + kMetaDataChargeOverhead);
+
+  Cache::Handle* handle = cache->Lookup(expected_cache_key_slice);
+  EXPECT_NE(handle, nullptr)
+      << "Failed to generate the cache key for the dummy entry correctly";
+  // Clean up the returned handle from Lookup() to prevent memory leak
+  cache->Release(handle);
+}
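The key layout that GenerateCacheKey verifies is: a varint64 of `cache->NewId()` at offset 0, zero padding up to `kCacheKeyPrefixSize`, then a varint64 of the per-key counter. A self-contained sketch of that layout (the varint encoder below is a local stand-in for RocksDB's `EncodeVarint64`, and the 10-byte sizes are illustrative):

```cpp
#include <cstdint>
#include <cstring>
#include <iostream>

// Local stand-in for RocksDB's EncodeVarint64: 7 bits per byte, high bit set
// on all but the last byte.
char* PutVarint64(char* dst, uint64_t v) {
  unsigned char* p = reinterpret_cast<unsigned char*>(dst);
  while (v >= 128) {
    *p++ = static_cast<unsigned char>(v) | 128;
    v >>= 7;
  }
  *p++ = static_cast<unsigned char>(v);
  return reinterpret_cast<char*>(p);
}

int main() {
  constexpr size_t kPrefixSize = 10;         // stand-in for kCacheKeyPrefixSize
  constexpr size_t kMaxVarint64Length = 10;  // worst-case varint64 size
  char key[kPrefixSize + kMaxVarint64Length];
  std::memset(key, 0, sizeof(key));

  PutVarint64(key, 1);  // cache->NewId() result, written once per manager
  char* end = PutVarint64(key + kPrefixSize, 0);  // next_cache_key_id_++

  // The Slice handed to Cache::Insert spans [key, end).
  std::cout << "key length: " << (end - key) << " bytes\n";  // 11 bytes here
}
```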
+
+TEST_F(CacheReservationManagerTest, KeepCacheReservationTheSame) {
+  std::size_t new_mem_used = 1 * kSizeDummyEntry;
+  Status s = test_cache_rev_mng
+                 ->UpdateCacheReservation<CacheEntryRole::kMisc>(new_mem_used);
+  ASSERT_EQ(s, Status::OK());
+  ASSERT_EQ(test_cache_rev_mng->GetTotalReservedCacheSize(),
+            1 * kSizeDummyEntry);
+  std::size_t initial_pinned_usage = cache->GetPinnedUsage();
+  ASSERT_GE(initial_pinned_usage, 1 * kSizeDummyEntry);
+  ASSERT_LT(initial_pinned_usage,
+            1 * kSizeDummyEntry + kMetaDataChargeOverhead);
+
+  s = test_cache_rev_mng->UpdateCacheReservation<CacheEntryRole::kMisc>(
+      new_mem_used);
+  EXPECT_EQ(s, Status::OK())
+      << "Failed to keep cache reservation the same when new_mem_used equals "
+         "the current cache reservation";
+  EXPECT_EQ(test_cache_rev_mng->GetTotalReservedCacheSize(),
+            1 * kSizeDummyEntry)
+      << "Failed to bookkeep correctly when new_mem_used equals the current "
+         "cache reservation";
+  EXPECT_EQ(cache->GetPinnedUsage(), initial_pinned_usage)
+      << "Failed to keep underlying dummy entries the same when new_mem_used "
+         "equals the current cache reservation";
+}
+
+TEST_F(CacheReservationManagerTest,
+       IncreaseCacheReservationByMultiplesOfDummyEntrySize) {
+  std::size_t new_mem_used = 2 * kSizeDummyEntry;
+  Status s = test_cache_rev_mng
+                 ->UpdateCacheReservation<CacheEntryRole::kMisc>(new_mem_used);
+  EXPECT_EQ(s, Status::OK())
+      << "Failed to increase cache reservation correctly";
+  EXPECT_EQ(test_cache_rev_mng->GetTotalReservedCacheSize(),
+            2 * kSizeDummyEntry)
+      << "Failed to bookkeep cache reservation increase correctly";
+  EXPECT_GE(cache->GetPinnedUsage(), 2 * kSizeDummyEntry)
+      << "Failed to increase underlying dummy entries in cache correctly";
+  EXPECT_LT(cache->GetPinnedUsage(),
+            2 * kSizeDummyEntry + kMetaDataChargeOverhead)
+      << "Failed to increase underlying dummy entries in cache correctly";
+}
+
+TEST_F(CacheReservationManagerTest,
+       IncreaseCacheReservationNotByMultiplesOfDummyEntrySize) {
+  std::size_t new_mem_used = 2 * kSizeDummyEntry + kSizeDummyEntry / 2;
+  Status s = test_cache_rev_mng
+                 ->UpdateCacheReservation<CacheEntryRole::kMisc>(new_mem_used);
+  EXPECT_EQ(s, Status::OK())
+      << "Failed to increase cache reservation correctly";
+  EXPECT_EQ(test_cache_rev_mng->GetTotalReservedCacheSize(),
+            3 * kSizeDummyEntry)
+      << "Failed to bookkeep cache reservation increase correctly";
+  EXPECT_GE(cache->GetPinnedUsage(), 3 * kSizeDummyEntry)
+      << "Failed to increase underlying dummy entries in cache correctly";
+  EXPECT_LT(cache->GetPinnedUsage(),
+            3 * kSizeDummyEntry + kMetaDataChargeOverhead)
+      << "Failed to increase underlying dummy entries in cache correctly";
+}
+
+TEST(CacheReservationManagerIncreaseReservationOnFullCacheTest,
+     IncreaseCacheReservationOnFullCache) {
+  constexpr std::size_t kOneMegabyte = 1024 * 1024;
+  constexpr std::size_t kOneGigabyte = 1024 * 1024 * 1024;
+  constexpr std::size_t kSizeDummyEntry = 256 * 1024;
+  constexpr std::size_t kMetaDataChargeOverhead = 10000;
+
+  LRUCacheOptions lo;
+  lo.capacity = kOneMegabyte;
+  lo.num_shard_bits = 0;  // 2^0 shard
+  lo.strict_capacity_limit = true;
+  std::shared_ptr<Cache> cache = NewLRUCache(lo);
+  std::unique_ptr<CacheReservationManager> test_cache_rev_mng(
+      new CacheReservationManager(cache));
+
+  std::size_t new_mem_used = kOneMegabyte + 1;
+  Status s = test_cache_rev_mng
+                 ->UpdateCacheReservation<CacheEntryRole::kMisc>(new_mem_used);
+  EXPECT_EQ(s, Status::Incomplete())
+      << "Failed to return a status indicating the failure of dummy entry "
+         "insertion during cache reservation on a full cache";
+  EXPECT_GE(test_cache_rev_mng->GetTotalReservedCacheSize(),
+            1 * kSizeDummyEntry)
+      << "Failed to bookkeep correctly before cache reservation failure "
+         "happens due to full cache";
+  EXPECT_LE(test_cache_rev_mng->GetTotalReservedCacheSize(), kOneMegabyte)
+      << "Failed to bookkeep correctly (i.e., bookkeep only successful dummy "
+         "entry insertions) when encountering cache reservation failure due "
+         "to full cache";
+  EXPECT_GE(cache->GetPinnedUsage(), 1 * kSizeDummyEntry)
+      << "Failed to insert underlying dummy entries correctly when "
+         "encountering cache reservation failure due to full cache";
+  EXPECT_LE(cache->GetPinnedUsage(), kOneMegabyte)
+      << "Failed to insert underlying dummy entries correctly when "
+         "encountering cache reservation failure due to full cache";
+
+  new_mem_used = kOneMegabyte / 2;  // 2 dummy entries
+  s = test_cache_rev_mng->UpdateCacheReservation<CacheEntryRole::kMisc>(
+      new_mem_used);
+  EXPECT_EQ(s, Status::OK())
+      << "Failed to decrease cache reservation after encountering cache "
+         "reservation failure due to full cache";
+  EXPECT_EQ(test_cache_rev_mng->GetTotalReservedCacheSize(),
+            2 * kSizeDummyEntry)
+      << "Failed to bookkeep cache reservation decrease correctly after "
"encountering cache reservation due to full cache"; + EXPECT_GE(cache->GetPinnedUsage(), 2 * kSizeDummyEntry) + << "Failed to release underlying dummy entries correctly on cache " + "reservation decrease after encountering cache resevation failure due " + "to full cache"; + EXPECT_LT(cache->GetPinnedUsage(), + 2 * kSizeDummyEntry + kMetaDataChargeOverhead) + << "Failed to release underlying dummy entries correctly on cache " + "reservation decrease after encountering cache resevation failure due " + "to full cache"; + + // Create cache full again for subsequent tests + new_mem_used = kOneMegabyte + 1; + s = test_cache_rev_mng + ->UpdateCacheReservation( + new_mem_used); + EXPECT_EQ(s, Status::Incomplete()) + << "Failed to return status to indicate failure of dummy entry insertion " + "during cache reservation on full cache"; + EXPECT_GE(test_cache_rev_mng->GetTotalReservedCacheSize(), + 1 * kSizeDummyEntry) + << "Failed to bookkeep correctly before cache resevation failure happens " + "due to full cache"; + EXPECT_LE(test_cache_rev_mng->GetTotalReservedCacheSize(), kOneMegabyte) + << "Failed to bookkeep correctly (i.e, bookkeep only successful dummy " + "entry insertions) when encountering cache resevation failure due to " + "full cache"; + EXPECT_GE(cache->GetPinnedUsage(), 1 * kSizeDummyEntry) + << "Failed to insert underlying dummy entries correctly when " + "encountering cache resevation failure due to full cache"; + EXPECT_LE(cache->GetPinnedUsage(), kOneMegabyte) + << "Failed to insert underlying dummy entries correctly when " + "encountering cache resevation failure due to full cache"; + + // Increase cache capacity so the previously failed insertion can fully + // succeed + cache->SetCapacity(kOneGigabyte); + new_mem_used = kOneMegabyte + 1; + s = test_cache_rev_mng + ->UpdateCacheReservation( + new_mem_used); + EXPECT_EQ(s, Status::OK()) + << "Failed to increase cache reservation after increasing cache capacity " + "and mitigating cache full error"; + EXPECT_EQ(test_cache_rev_mng->GetTotalReservedCacheSize(), + 5 * kSizeDummyEntry) + << "Failed to bookkeep cache reservation increase correctly after " + "increasing cache capacity and mitigating cache full error"; + EXPECT_GE(cache->GetPinnedUsage(), 5 * kSizeDummyEntry) + << "Failed to insert underlying dummy entries correctly after increasing " + "cache capacity and mitigating cache full error"; + EXPECT_LT(cache->GetPinnedUsage(), + 5 * kSizeDummyEntry + kMetaDataChargeOverhead) + << "Failed to insert underlying dummy entries correctly after increasing " + "cache capacity and mitigating cache full error"; +} + +TEST_F(CacheReservationManagerTest, + DecreaseCacheReservationByMultiplesOfDummyEntrySize) { + std::size_t new_mem_used = 2 * kSizeDummyEntry; + Status s = + test_cache_rev_mng + ->UpdateCacheReservation( + new_mem_used); + ASSERT_EQ(s, Status::OK()); + ASSERT_EQ(test_cache_rev_mng->GetTotalReservedCacheSize(), + 2 * kSizeDummyEntry); + ASSERT_GE(cache->GetPinnedUsage(), 2 * kSizeDummyEntry); + ASSERT_LT(cache->GetPinnedUsage(), + 2 * kSizeDummyEntry + kMetaDataChargeOverhead); + + new_mem_used = 1 * kSizeDummyEntry; + s = test_cache_rev_mng + ->UpdateCacheReservation( + new_mem_used); + EXPECT_EQ(s, Status::OK()) + << "Failed to decrease cache reservation correctly"; + EXPECT_EQ(test_cache_rev_mng->GetTotalReservedCacheSize(), + 1 * kSizeDummyEntry) + << "Failed to bookkeep cache reservation decrease correctly"; + EXPECT_GE(cache->GetPinnedUsage(), 1 * kSizeDummyEntry) + << "Failed to decrease underlying dummy entries in 
cache correctly"; + EXPECT_LT(cache->GetPinnedUsage(), + 1 * kSizeDummyEntry + kMetaDataChargeOverhead) + << "Failed to decrease underlying dummy entries in cache correctly"; +} + +TEST_F(CacheReservationManagerTest, + DecreaseCacheReservationNotByMultiplesOfDummyEntrySize) { + std::size_t new_mem_used = 2 * kSizeDummyEntry; + Status s = + test_cache_rev_mng + ->UpdateCacheReservation( + new_mem_used); + ASSERT_EQ(s, Status::OK()); + ASSERT_EQ(test_cache_rev_mng->GetTotalReservedCacheSize(), + 2 * kSizeDummyEntry); + ASSERT_GE(cache->GetPinnedUsage(), 2 * kSizeDummyEntry); + ASSERT_LT(cache->GetPinnedUsage(), + 2 * kSizeDummyEntry + kMetaDataChargeOverhead); + + new_mem_used = kSizeDummyEntry / 2; + s = test_cache_rev_mng + ->UpdateCacheReservation( + new_mem_used); + EXPECT_EQ(s, Status::OK()) + << "Failed to decrease cache reservation correctly"; + EXPECT_EQ(test_cache_rev_mng->GetTotalReservedCacheSize(), + 1 * kSizeDummyEntry) + << "Failed to bookkeep cache reservation decrease correctly"; + EXPECT_GE(cache->GetPinnedUsage(), 1 * kSizeDummyEntry) + << "Failed to decrease underlying dummy entries in cache correctly"; + EXPECT_LT(cache->GetPinnedUsage(), + 1 * kSizeDummyEntry + kMetaDataChargeOverhead) + << "Failed to decrease underlying dummy entries in cache correctly"; +} + +TEST(CacheReservationManagerWithDelayedDecreaseTest, + DecreaseCacheReservationWithDelayedDecrease) { + constexpr std::size_t kOneGigabyte = 1024 * 1024 * 1024; + constexpr std::size_t kSizeDummyEntry = 256 * 1024; + constexpr std::size_t kMetaDataChargeOverhead = 10000; + + LRUCacheOptions lo; + lo.capacity = kOneGigabyte; + lo.num_shard_bits = 0; + std::shared_ptr cache = NewLRUCache(lo); + std::unique_ptr test_cache_rev_mng( + new CacheReservationManager(cache, true /* delayed_decrease */)); + + std::size_t new_mem_used = 8 * kSizeDummyEntry; + Status s = + test_cache_rev_mng + ->UpdateCacheReservation( + new_mem_used); + ASSERT_EQ(s, Status::OK()); + ASSERT_EQ(test_cache_rev_mng->GetTotalReservedCacheSize(), + 8 * kSizeDummyEntry); + std::size_t initial_pinned_usage = cache->GetPinnedUsage(); + ASSERT_GE(initial_pinned_usage, 8 * kSizeDummyEntry); + ASSERT_LT(initial_pinned_usage, + 8 * kSizeDummyEntry + kMetaDataChargeOverhead); + + new_mem_used = 6 * kSizeDummyEntry; + s = test_cache_rev_mng + ->UpdateCacheReservation( + new_mem_used); + EXPECT_EQ(s, Status::OK()) << "Failed to delay decreasing cache reservation"; + EXPECT_EQ(test_cache_rev_mng->GetTotalReservedCacheSize(), + 8 * kSizeDummyEntry) + << "Failed to bookkeep correctly when delaying cache reservation " + "decrease"; + EXPECT_EQ(cache->GetPinnedUsage(), initial_pinned_usage) + << "Failed to delay decreasing underlying dummy entries in cache"; + + new_mem_used = 7 * kSizeDummyEntry; + s = test_cache_rev_mng + ->UpdateCacheReservation( + new_mem_used); + EXPECT_EQ(s, Status::OK()) << "Failed to delay decreasing cache reservation"; + EXPECT_EQ(test_cache_rev_mng->GetTotalReservedCacheSize(), + 8 * kSizeDummyEntry) + << "Failed to bookkeep correctly when delaying cache reservation " + "decrease"; + EXPECT_EQ(cache->GetPinnedUsage(), initial_pinned_usage) + << "Failed to delay decreasing underlying dummy entries in cache"; + + new_mem_used = 6 * kSizeDummyEntry - 1; + s = test_cache_rev_mng + ->UpdateCacheReservation( + new_mem_used); + EXPECT_EQ(s, Status::OK()) + << "Failed to decrease cache reservation correctly when new_mem_used < " + "GetTotalReservedCacheSize() * 3 / 4 on delayed decrease mode"; + 
+  EXPECT_EQ(test_cache_rev_mng->GetTotalReservedCacheSize(),
+            6 * kSizeDummyEntry)
+      << "Failed to bookkeep correctly when new_mem_used < "
+         "GetTotalReservedCacheSize() * 3 / 4 in delayed decrease mode";
+  EXPECT_GE(cache->GetPinnedUsage(), 6 * kSizeDummyEntry)
+      << "Failed to decrease underlying dummy entries in cache when "
+         "new_mem_used < GetTotalReservedCacheSize() * 3 / 4 in delayed "
+         "decrease mode";
+  EXPECT_LT(cache->GetPinnedUsage(),
+            6 * kSizeDummyEntry + kMetaDataChargeOverhead)
+      << "Failed to decrease underlying dummy entries in cache when "
+         "new_mem_used < GetTotalReservedCacheSize() * 3 / 4 in delayed "
+         "decrease mode";
+}
+
+TEST(CacheReservationManagerDestructorTest,
+     ReleaseRemainingDummyEntriesOnDestruction) {
+  constexpr std::size_t kOneGigabyte = 1024 * 1024 * 1024;
+  constexpr std::size_t kSizeDummyEntry = 256 * 1024;
+  constexpr std::size_t kMetaDataChargeOverhead = 10000;
+
+  LRUCacheOptions lo;
+  lo.capacity = kOneGigabyte;
+  lo.num_shard_bits = 0;
+  std::shared_ptr<Cache> cache = NewLRUCache(lo);
+  {
+    std::unique_ptr<CacheReservationManager> test_cache_rev_mng(
+        new CacheReservationManager(cache));
+    std::size_t new_mem_used = 1 * kSizeDummyEntry;
+    Status s = test_cache_rev_mng
+                   ->UpdateCacheReservation<CacheEntryRole::kMisc>(
+                       new_mem_used);
+    ASSERT_EQ(s, Status::OK());
+    ASSERT_GE(cache->GetPinnedUsage(), 1 * kSizeDummyEntry);
+    ASSERT_LT(cache->GetPinnedUsage(),
+              1 * kSizeDummyEntry + kMetaDataChargeOverhead);
+  }
+  EXPECT_EQ(cache->GetPinnedUsage(), 0 * kSizeDummyEntry)
+      << "Failed to release remaining underlying dummy entries in cache in "
+         "CacheReservationManager's destructor";
+}
+}  // namespace ROCKSDB_NAMESPACE
+
+int main(int argc, char** argv) {
+  ::testing::InitGoogleTest(&argc, argv);
+  return RUN_ALL_TESTS();
+}
\ No newline at end of file
diff --git a/include/rocksdb/write_buffer_manager.h b/include/rocksdb/write_buffer_manager.h
index 67aef7f8f..add957d84 100644
--- a/include/rocksdb/write_buffer_manager.h
+++ b/include/rocksdb/write_buffer_manager.h
@@ -21,6 +21,7 @@
 #include "rocksdb/cache.h"
 
 namespace ROCKSDB_NAMESPACE {
+class CacheReservationManager;
 
 // Interface to block and signal DB instances.
 // Each DB instance contains ptr to StallInterface.
@@ -60,7 +61,7 @@ class WriteBufferManager {
   bool enabled() const { return buffer_size() > 0; }
 
   // Returns true if pointer to cache is passed.
-  bool cost_to_cache() const { return cache_rep_ != nullptr; }
+  bool cost_to_cache() const { return cache_rev_mng_ != nullptr; }
 
   // Returns the total memory used by memtables.
   // Only valid if enabled()
@@ -73,9 +74,7 @@ class WriteBufferManager {
     return memory_active_.load(std::memory_order_relaxed);
   }
 
-  size_t dummy_entries_in_cache_usage() const {
-    return dummy_size_.load(std::memory_order_relaxed);
-  }
+  size_t dummy_entries_in_cache_usage() const;
 
   // Returns the buffer_size.
   size_t buffer_size() const {
@@ -163,9 +162,10 @@ class WriteBufferManager {
   std::atomic<size_t> memory_used_;
   // Memory that hasn't been scheduled to free.
   std::atomic<size_t> memory_active_;
-  std::atomic<size_t> dummy_size_;
-  struct CacheRep;
-  std::unique_ptr<CacheRep> cache_rep_;
+  std::unique_ptr<CacheReservationManager> cache_rev_mng_;
+  // Protects cache_rev_mng_
+  std::mutex cache_rev_mng_mu_;
+
   std::list<StallInterface*> queue_;
   // Protects the queue_
   std::mutex mu_;
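One detail worth noting in this header change: `cache_rev_mng_` is a `std::unique_ptr` to a type that is only forward-declared here, which is why the .cc file below defines `WriteBufferManager::~WriteBufferManager() = default;` out of line, in a translation unit where `CacheReservationManager` is complete. A minimal, self-contained sketch of that pattern, using hypothetical `Widget`/`Impl` names:

```cpp
#include <memory>

class Impl;  // forward declaration only, like CacheReservationManager above

class Widget {
 public:
  Widget();
  ~Widget();  // declared here, defined where Impl is a complete type
 private:
  std::unique_ptr<Impl> impl_;
};

// --- conceptually "the .cc file": Impl is complete from here on ---
class Impl {};
Widget::Widget() : impl_(new Impl) {}
Widget::~Widget() = default;  // same trick as WriteBufferManager's destructor

int main() { Widget w; }
```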
diff --git a/memtable/write_buffer_manager.cc b/memtable/write_buffer_manager.cc
index ecbccb82b..c599b658c 100644
--- a/memtable/write_buffer_manager.cc
+++ b/memtable/write_buffer_manager.cc
@@ -10,46 +10,12 @@
 #include "rocksdb/write_buffer_manager.h"
 
 #include "cache/cache_entry_roles.h"
+#include "cache/cache_reservation_manager.h"
 #include "db/db_impl/db_impl.h"
+#include "rocksdb/status.h"
 #include "util/coding.h"
 
 namespace ROCKSDB_NAMESPACE {
-#ifndef ROCKSDB_LITE
-namespace {
-const size_t kSizeDummyEntry = 256 * 1024;
-// The key will be longer than keys for blocks in SST files so they won't
-// conflict.
-const size_t kCacheKeyPrefix = kMaxVarint64Length * 4 + 1;
-}  // namespace
-
-struct WriteBufferManager::CacheRep {
-  std::shared_ptr<Cache> cache_;
-  std::mutex cache_mutex_;
-  std::atomic<size_t> cache_allocated_size_;
-  // The non-prefix part will be updated according to the ID to use.
-  char cache_key_[kCacheKeyPrefix + kMaxVarint64Length];
-  uint64_t next_cache_key_id_ = 0;
-  std::vector<Cache::Handle*> dummy_handles_;
-
-  explicit CacheRep(std::shared_ptr<Cache> cache)
-      : cache_(cache), cache_allocated_size_(0) {
-    memset(cache_key_, 0, kCacheKeyPrefix);
-    size_t pointer_size = sizeof(const void*);
-    assert(pointer_size <= kCacheKeyPrefix);
-    memcpy(cache_key_, static_cast<const void*>(this), pointer_size);
-  }
-
-  Slice GetNextCacheKey() {
-    memset(cache_key_ + kCacheKeyPrefix, 0, kMaxVarint64Length);
-    char* end =
-        EncodeVarint64(cache_key_ + kCacheKeyPrefix, next_cache_key_id_++);
-    return Slice(cache_key_, static_cast<size_t>(end - cache_key_));
-  }
-};
-#else
-struct WriteBufferManager::CacheRep {};
-#endif  // ROCKSDB_LITE
-
 WriteBufferManager::WriteBufferManager(size_t _buffer_size,
                                        std::shared_ptr<Cache> cache,
                                        bool allow_stall)
@@ -57,34 +23,34 @@ WriteBufferManager::WriteBufferManager(size_t _buffer_size,
       mutable_limit_(buffer_size_ * 7 / 8),
       memory_used_(0),
       memory_active_(0),
-      dummy_size_(0),
-      cache_rep_(nullptr),
+      cache_rev_mng_(nullptr),
       allow_stall_(allow_stall),
       stall_active_(false) {
 #ifndef ROCKSDB_LITE
   if (cache) {
-    // Construct the cache key using the pointer to this.
-    cache_rep_.reset(new CacheRep(cache));
+    // Memtable's memory usage tends to fluctuate frequently,
+    // therefore we set delayed_decrease = true to save some dummy entry
+    // insertion on memory increase right after memory decrease
+    cache_rev_mng_.reset(
+        new CacheReservationManager(cache, true /* delayed_decrease */));
   }
 #else
   (void)cache;
 #endif  // ROCKSDB_LITE
 }
 
-WriteBufferManager::~WriteBufferManager() {
-#ifndef ROCKSDB_LITE
-  if (cache_rep_) {
-    for (auto* handle : cache_rep_->dummy_handles_) {
-      if (handle != nullptr) {
-        cache_rep_->cache_->Release(handle, true);
-      }
-    }
+WriteBufferManager::~WriteBufferManager() = default;
+
+std::size_t WriteBufferManager::dummy_entries_in_cache_usage() const {
+  if (cache_rev_mng_ != nullptr) {
+    return cache_rev_mng_->GetTotalReservedCacheSize();
+  } else {
+    return 0;
   }
-#endif  // ROCKSDB_LITE
 }
 
 void WriteBufferManager::ReserveMem(size_t mem) {
-  if (cache_rep_ != nullptr) {
+  if (cache_rev_mng_ != nullptr) {
     ReserveMemWithCache(mem);
   } else if (enabled()) {
     memory_used_.fetch_add(mem, std::memory_order_relaxed);
@@ -97,32 +63,23 @@ void WriteBufferManager::ReserveMem(size_t mem) {
 // Should only be called from write thread
 void WriteBufferManager::ReserveMemWithCache(size_t mem) {
 #ifndef ROCKSDB_LITE
-  assert(cache_rep_ != nullptr);
+  assert(cache_rev_mng_ != nullptr);
   // Use a mutex to protect various data structures. Can be optimized to a
   // lock-free solution if it ends up with a performance bottleneck.
-  std::lock_guard<std::mutex> lock(cache_rep_->cache_mutex_);
+  std::lock_guard<std::mutex> lock(cache_rev_mng_mu_);
   size_t new_mem_used = memory_used_.load(std::memory_order_relaxed) + mem;
   memory_used_.store(new_mem_used, std::memory_order_relaxed);
-  while (new_mem_used > cache_rep_->cache_allocated_size_) {
-    // Expand size by at least 256KB.
-    // Add a dummy record to the cache
-    Cache::Handle* handle = nullptr;
-    Status s = cache_rep_->cache_->Insert(
-        cache_rep_->GetNextCacheKey(), nullptr, kSizeDummyEntry,
-        GetNoopDeleterForRole<CacheEntryRole::kWriteBuffer>(), &handle);
-    s.PermitUncheckedError();  // TODO: What to do on error?
-    // We keep the handle even if insertion fails and a null handle is
-    // returned, so that when memory shrinks, we don't release extra
-    // entries from cache.
-    // Ideallly we should prevent this allocation from happening if
-    // this insertion fails. However, the callers to this code path
-    // are not able to handle failures properly. We'll need to improve
-    // it in the future.
-    cache_rep_->dummy_handles_.push_back(handle);
-    cache_rep_->cache_allocated_size_ += kSizeDummyEntry;
-    dummy_size_.fetch_add(kSizeDummyEntry, std::memory_order_relaxed);
-  }
+  Status s =
+      cache_rev_mng_->UpdateCacheReservation<CacheEntryRole::kWriteBuffer>(
+          new_mem_used);
+
+  // We absorb the error since WriteBufferManager is not able to handle
+  // this failure properly. Ideally we should prevent this allocation
+  // from happening if this cache reservation fails.
+  // [TODO] We'll need to improve it in the future and figure out what to do
+  // on error
+  s.PermitUncheckedError();
 #else
   (void)mem;
 #endif  // ROCKSDB_LITE
@@ -135,7 +92,7 @@ void WriteBufferManager::ScheduleFreeMem(size_t mem) {
 }
 
 void WriteBufferManager::FreeMem(size_t mem) {
-  if (cache_rep_ != nullptr) {
+  if (cache_rev_mng_ != nullptr) {
     FreeMemWithCache(mem);
   } else if (enabled()) {
     memory_used_.fetch_sub(mem, std::memory_order_relaxed);
@@ -148,33 +105,21 @@ void WriteBufferManager::FreeMem(size_t mem) {
 
 void WriteBufferManager::FreeMemWithCache(size_t mem) {
 #ifndef ROCKSDB_LITE
-  assert(cache_rep_ != nullptr);
+  assert(cache_rev_mng_ != nullptr);
   // Use a mutex to protect various data structures. Can be optimized to a
   // lock-free solution if it ends up with a performance bottleneck.
-  std::lock_guard<std::mutex> lock(cache_rep_->cache_mutex_);
+  std::lock_guard<std::mutex> lock(cache_rev_mng_mu_);
   size_t new_mem_used = memory_used_.load(std::memory_order_relaxed) - mem;
   memory_used_.store(new_mem_used, std::memory_order_relaxed);
-  // Gradually shrink memory costed in the block cache if the actual
-  // usage is less than 3/4 of what we reserve from the block cache.
-  // We do this because:
-  // 1. we don't pay the cost of the block cache immediately a memtable is
-  // freed, as block cache insert is expensive;
-  // 2. eventually, if we walk away from a temporary memtable size increase,
-  // we make sure shrink the memory costed in block cache over time.
-  // In this way, we only shrink costed memory showly even there is enough
-  // margin.
-  if (new_mem_used < cache_rep_->cache_allocated_size_ / 4 * 3 &&
-      cache_rep_->cache_allocated_size_ - kSizeDummyEntry > new_mem_used) {
-    assert(!cache_rep_->dummy_handles_.empty());
-    auto* handle = cache_rep_->dummy_handles_.back();
-    // If insert failed, handle is null so we should not release.
-    if (handle != nullptr) {
-      cache_rep_->cache_->Release(handle, true);
-    }
-    cache_rep_->dummy_handles_.pop_back();
-    cache_rep_->cache_allocated_size_ -= kSizeDummyEntry;
-    dummy_size_.fetch_sub(kSizeDummyEntry, std::memory_order_relaxed);
-  }
+  Status s =
+      cache_rev_mng_->UpdateCacheReservation<CacheEntryRole::kWriteBuffer>(
+          new_mem_used);
+
+  // We absorb the error since WriteBufferManager is not able to handle
+  // this failure properly.
+  // [TODO] We'll need to improve it in the future and figure out what to do
+  // on error
+  s.PermitUncheckedError();
 #else
   (void)mem;
 #endif  // ROCKSDB_LITE
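As a quick cross-check of the `memory_used_` annotations added to the tests below: on an increase, the manager ends up holding ceil(memory_used_ / 256KB) dummy entries. A standalone sketch (not RocksDB code; `ExpectedEntries` is a hypothetical helper) that verifies the counts asserted in the `CacheCost` test:

```cpp
#include <cassert>
#include <cstddef>

constexpr std::size_t kKB = 1024;
constexpr std::size_t kEntry = 256 * kKB;  // kSizeDummyEntry

std::size_t ExpectedEntries(std::size_t mem) {
  return (mem + kEntry - 1) / kEntry;  // round up to whole dummy entries
}

int main() {
  assert(ExpectedEntries(333 * kKB) == 2);      // first ReserveMem in the test
  assert(ExpectedEntries(845 * kKB) == 4);      // after another 512KB
  assert(ExpectedEntries(11085 * kKB) == 44);   // after another 10MB
  assert(ExpectedEntries(52045 * kKB) == 204);  // after 1MB freed + 41MB more
  return 0;
}
```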
diff --git a/memtable/write_buffer_manager_test.cc b/memtable/write_buffer_manager_test.cc
index 7e3de41d1..709a723e3 100644
--- a/memtable/write_buffer_manager_test.cc
+++ b/memtable/write_buffer_manager_test.cc
@@ -78,6 +78,8 @@ TEST_F(WriteBufferManagerTest, ShouldFlush) {
 }
 
 TEST_F(WriteBufferManagerTest, CacheCost) {
+  constexpr std::size_t kMetaDataChargeOverhead = 10000;
+
   LRUCacheOptions co;
   // 1GB cache
   co.capacity = 1024 * 1024 * 1024;
@@ -88,137 +90,206 @@ TEST_F(WriteBufferManagerTest, CacheCost) {
   std::unique_ptr<WriteBufferManager> wbf(
       new WriteBufferManager(50 * 1024 * 1024, cache));
 
-  // Allocate 333KB will allocate 512KB
+  // Allocate 333KB will allocate 512KB, memory_used_ = 333KB
   wbf->ReserveMem(333 * 1024);
-  ASSERT_GE(cache->GetPinnedUsage(), 2 * 256 * 1024);
-  ASSERT_LT(cache->GetPinnedUsage(), 2 * 256 * 1024 + 10000);
-  // 2 dummy entries are added for size 333 kb.
+  // 2 dummy entries are added for size 333 KB
   ASSERT_EQ(wbf->dummy_entries_in_cache_usage(), 2 * kSizeDummyEntry);
+  ASSERT_GE(cache->GetPinnedUsage(), 2 * 256 * 1024);
+  ASSERT_LT(cache->GetPinnedUsage(), 2 * 256 * 1024 + kMetaDataChargeOverhead);
 
-  // Allocate another 512KB
+  // Allocate another 512KB, memory_used_ = 845KB
   wbf->ReserveMem(512 * 1024);
-  ASSERT_GE(cache->GetPinnedUsage(), 4 * 256 * 1024);
-  ASSERT_LT(cache->GetPinnedUsage(), 4 * 256 * 1024 + 10000);
-  // 2 more dummy entries are added for size 512.
+  // 2 more dummy entries are added for size 512 KB since
+  // ceil((memory_used_ - dummy_entries_in_cache_usage) / kSizeDummyEntry)
+  // = 2
   ASSERT_EQ(wbf->dummy_entries_in_cache_usage(), 4 * kSizeDummyEntry);
+  ASSERT_GE(cache->GetPinnedUsage(), 4 * 256 * 1024);
+  ASSERT_LT(cache->GetPinnedUsage(), 4 * 256 * 1024 + kMetaDataChargeOverhead);
 
-  // Allocate another 10MB
+  // Allocate another 10MB, memory_used_ = 11085KB
   wbf->ReserveMem(10 * 1024 * 1024);
-  ASSERT_GE(cache->GetPinnedUsage(), 11 * 1024 * 1024);
-  ASSERT_LT(cache->GetPinnedUsage(), 11 * 1024 * 1024 + 10000);
-  // 40 more entries are added for size 10 * 1024 * 1024.
+  // 40 more dummy entries are added for size 10MB
   ASSERT_EQ(wbf->dummy_entries_in_cache_usage(), 44 * kSizeDummyEntry);
+  ASSERT_GE(cache->GetPinnedUsage(), 44 * 256 * 1024);
+  ASSERT_LT(cache->GetPinnedUsage(), 44 * 256 * 1024 + kMetaDataChargeOverhead);
 
-  // Free 1MB will not cause any change in cache cost
-  wbf->FreeMem(1024 * 1024);
-  ASSERT_GE(cache->GetPinnedUsage(), 11 * 1024 * 1024);
-  ASSERT_LT(cache->GetPinnedUsage(), 11 * 1024 * 1024 + 10000);
+  // Free 1MB, memory_used_ = 10061KB
+  // It will not cause any change in cache cost
+  // since memory_used_ > dummy_entries_in_cache_usage * (3/4)
+  wbf->FreeMem(1 * 1024 * 1024);
   ASSERT_EQ(wbf->dummy_entries_in_cache_usage(), 44 * kSizeDummyEntry);
+  ASSERT_GE(cache->GetPinnedUsage(), 44 * 256 * 1024);
+  ASSERT_LT(cache->GetPinnedUsage(), 44 * 256 * 1024 + kMetaDataChargeOverhead);
   ASSERT_FALSE(wbf->ShouldFlush());
 
-  // Allocate another 41MB
+  // Allocate another 41MB, memory_used_ = 52045KB
   wbf->ReserveMem(41 * 1024 * 1024);
-  ASSERT_GE(cache->GetPinnedUsage(), 51 * 1024 * 1024);
-  ASSERT_LT(cache->GetPinnedUsage(), 51 * 1024 * 1024 + 10000);
   ASSERT_EQ(wbf->dummy_entries_in_cache_usage(), 204 * kSizeDummyEntry);
+  ASSERT_GE(cache->GetPinnedUsage(), 204 * 256 * 1024);
+  ASSERT_LT(cache->GetPinnedUsage(),
+            204 * 256 * 1024 + kMetaDataChargeOverhead);
   ASSERT_TRUE(wbf->ShouldFlush());
 
   ASSERT_TRUE(wbf->ShouldFlush());
 
+  // Schedule free of 20MB, memory_used_ = 52045KB
+  // It will not cause any change in memory_used_ or the cache cost
   wbf->ScheduleFreeMem(20 * 1024 * 1024);
-  ASSERT_GE(cache->GetPinnedUsage(), 51 * 1024 * 1024);
-  ASSERT_LT(cache->GetPinnedUsage(), 51 * 1024 * 1024 + 10000);
   ASSERT_EQ(wbf->dummy_entries_in_cache_usage(), 204 * kSizeDummyEntry);
+  ASSERT_GE(cache->GetPinnedUsage(), 204 * 256 * 1024);
+  ASSERT_LT(cache->GetPinnedUsage(),
+            204 * 256 * 1024 + kMetaDataChargeOverhead);
 
   // Still need flush as the hard limit hits
   ASSERT_TRUE(wbf->ShouldFlush());
 
-  // Free 20MB will releae 256KB from cache
+  // Free 20MB, memory_used_ = 31565KB
+  // It will release 80 dummy entries from the cache
+  // since memory_used_ < dummy_entries_in_cache_usage * (3/4)
+  // and floor((dummy_entries_in_cache_usage - memory_used_) / kSizeDummyEntry)
+  // = 80
   wbf->FreeMem(20 * 1024 * 1024);
-  ASSERT_GE(cache->GetPinnedUsage(), 51 * 1024 * 1024 - 256 * 1024);
-  ASSERT_LT(cache->GetPinnedUsage(), 51 * 1024 * 1024 - 256 * 1024 + 10000);
-  ASSERT_EQ(wbf->dummy_entries_in_cache_usage(), 203 * kSizeDummyEntry);
+  ASSERT_EQ(wbf->dummy_entries_in_cache_usage(), 124 * kSizeDummyEntry);
+  ASSERT_GE(cache->GetPinnedUsage(), 124 * 256 * 1024);
+  ASSERT_LT(cache->GetPinnedUsage(),
+            124 * 256 * 1024 + kMetaDataChargeOverhead);
 
   ASSERT_FALSE(wbf->ShouldFlush());
 
-  // Every free will release 256KB if still not hit 3/4
-  wbf->FreeMem(16 * 1024);
-  ASSERT_GE(cache->GetPinnedUsage(), 51 * 1024 * 1024 - 2 * 256 * 1024);
-  ASSERT_LT(cache->GetPinnedUsage(), 51 * 1024 * 1024 - 2 * 256 * 1024 + 10000);
-  ASSERT_EQ(wbf->dummy_entries_in_cache_usage(), 202 * kSizeDummyEntry);
-
+  // Free 16KB, memory_used_ = 31549KB
+  // It will not release any dummy entry since memory_used_ >=
+  // dummy_entries_in_cache_usage * (3/4)
   wbf->FreeMem(16 * 1024);
-  ASSERT_GE(cache->GetPinnedUsage(), 51 * 1024 * 1024 - 3 * 256 * 1024);
-  ASSERT_LT(cache->GetPinnedUsage(), 51 * 1024 * 1024 - 3 * 256 * 1024 + 10000);
-  ASSERT_EQ(wbf->dummy_entries_in_cache_usage(), 201 * kSizeDummyEntry);
+  ASSERT_EQ(wbf->dummy_entries_in_cache_usage(), 124 * kSizeDummyEntry);
+  ASSERT_GE(cache->GetPinnedUsage(), 124 * 256 * 1024);
+  ASSERT_LT(cache->GetPinnedUsage(),
+            124 * 256 * 1024 + kMetaDataChargeOverhead);
+
+  // Free 20MB, memory_used_ = 11069KB
+  // It will release 80 dummy entries from the cache
+  // since memory_used_ < dummy_entries_in_cache_usage * (3/4)
+  // and floor((dummy_entries_in_cache_usage - memory_used_) / kSizeDummyEntry)
+  // = 80
+  wbf->FreeMem(20 * 1024 * 1024);
+  ASSERT_EQ(wbf->dummy_entries_in_cache_usage(), 44 * kSizeDummyEntry);
+  ASSERT_GE(cache->GetPinnedUsage(), 44 * 256 * 1024);
+  ASSERT_LT(cache->GetPinnedUsage(), 44 * 256 * 1024 + kMetaDataChargeOverhead);
 
-  // Reserve 512KB will not cause any change in cache cost
+  // Free 1MB, memory_used_ = 10045KB
+  // It will not cause any change in cache cost
+  // since memory_used_ > dummy_entries_in_cache_usage * (3/4)
+  wbf->FreeMem(1 * 1024 * 1024);
+  ASSERT_EQ(wbf->dummy_entries_in_cache_usage(), 44 * kSizeDummyEntry);
+  ASSERT_GE(cache->GetPinnedUsage(), 44 * 256 * 1024);
+  ASSERT_LT(cache->GetPinnedUsage(), 44 * 256 * 1024 + kMetaDataChargeOverhead);
+
+  // Reserve 512KB, memory_used_ = 10557KB
+  // It will not cause any change in cache cost
+  // since memory_used_ > dummy_entries_in_cache_usage * (3/4),
+  // which reflects the benefit of saving dummy entry insertion on memory
+  // reservation after delayed decrease
   wbf->ReserveMem(512 * 1024);
-  ASSERT_GE(cache->GetPinnedUsage(), 51 * 1024 * 1024 - 3 * 256 * 1024);
-  ASSERT_LT(cache->GetPinnedUsage(), 51 * 1024 * 1024 - 3 * 256 * 1024 + 10000);
-  ASSERT_EQ(wbf->dummy_entries_in_cache_usage(), 201 * kSizeDummyEntry);
-
-  wbf->FreeMem(16 * 1024);
-  ASSERT_GE(cache->GetPinnedUsage(), 51 * 1024 * 1024 - 4 * 256 * 1024);
-  ASSERT_LT(cache->GetPinnedUsage(), 51 * 1024 * 1024 - 4 * 256 * 1024 + 10000);
-  ASSERT_EQ(wbf->dummy_entries_in_cache_usage(), 200 * kSizeDummyEntry);
+  ASSERT_EQ(wbf->dummy_entries_in_cache_usage(), 44 * kSizeDummyEntry);
+  ASSERT_GE(cache->GetPinnedUsage(), 44 * 256 * 1024);
+  ASSERT_LT(cache->GetPinnedUsage(), 44 * 256 * 1024 + kMetaDataChargeOverhead);
 
   // Destory write buffer manger should free everything
   wbf.reset();
-  ASSERT_LT(cache->GetPinnedUsage(), 1024 * 1024);
+  ASSERT_EQ(cache->GetPinnedUsage(), 0);
 }
 
 TEST_F(WriteBufferManagerTest, NoCapCacheCost) {
+  constexpr std::size_t kMetaDataChargeOverhead = 10000;
   // 1GB cache
   std::shared_ptr<Cache> cache = NewLRUCache(1024 * 1024 * 1024, 4);
   // A write buffer manager of size 256MB
   std::unique_ptr<WriteBufferManager> wbf(new WriteBufferManager(0, cache));
-  // Allocate 1.5MB will allocate 2MB
+
+  // Allocate 10MB, memory_used_ = 10240KB
+  // It will allocate 40 dummy entries
   wbf->ReserveMem(10 * 1024 * 1024);
-  ASSERT_GE(cache->GetPinnedUsage(), 10 * 1024 * 1024);
-  ASSERT_LT(cache->GetPinnedUsage(), 10 * 1024 * 1024 + 10000);
   ASSERT_EQ(wbf->dummy_entries_in_cache_usage(), 40 * kSizeDummyEntry);
+  ASSERT_GE(cache->GetPinnedUsage(), 40 * 256 * 1024);
+  ASSERT_LT(cache->GetPinnedUsage(), 40 * 256 * 1024 + kMetaDataChargeOverhead);
+
+  ASSERT_FALSE(wbf->ShouldFlush());
 
+  // Free 9MB, memory_used_ = 1024KB
+  // It will free 36 dummy entries
   wbf->FreeMem(9 * 1024 * 1024);
+  ASSERT_EQ(wbf->dummy_entries_in_cache_usage(), 4 * kSizeDummyEntry);
+  ASSERT_GE(cache->GetPinnedUsage(), 4 * 256 * 1024);
+  ASSERT_LT(cache->GetPinnedUsage(), 4 * 256 * 1024 + kMetaDataChargeOverhead);
+
+  // Free 160KB gradually, memory_used_ = 864KB
+  // It will not cause any change
+  // since memory_used_ > dummy_entries_in_cache_usage * 3/4
   for (int i = 0; i < 40; i++) {
     wbf->FreeMem(4 * 1024);
   }
-  ASSERT_GE(cache->GetPinnedUsage(), 1024 * 1024);
-  ASSERT_LT(cache->GetPinnedUsage(), 1024 * 1024 + 10000);
   ASSERT_EQ(wbf->dummy_entries_in_cache_usage(), 4 * kSizeDummyEntry);
+  ASSERT_GE(cache->GetPinnedUsage(), 4 * 256 * 1024);
+  ASSERT_LT(cache->GetPinnedUsage(), 4 * 256 * 1024 + kMetaDataChargeOverhead);
 }
 
 TEST_F(WriteBufferManagerTest, CacheFull) {
-  // 15MB cache size with strict capacity
+  constexpr std::size_t kMetaDataChargeOverhead = 20000;
+
+  // 12MB cache size with strict capacity
   LRUCacheOptions lo;
   lo.capacity = 12 * 1024 * 1024;
   lo.num_shard_bits = 0;
   lo.strict_capacity_limit = true;
   std::shared_ptr<Cache> cache = NewLRUCache(lo);
   std::unique_ptr<WriteBufferManager> wbf(new WriteBufferManager(0, cache));
+
+  // Allocate 10MB, memory_used_ = 10240KB
   wbf->ReserveMem(10 * 1024 * 1024);
   ASSERT_EQ(wbf->dummy_entries_in_cache_usage(), 40 * kSizeDummyEntry);
-  size_t prev_pinned = cache->GetPinnedUsage();
-  ASSERT_GE(prev_pinned, 10 * 1024 * 1024);
+  ASSERT_GE(cache->GetPinnedUsage(), 40 * kSizeDummyEntry);
+  ASSERT_LT(cache->GetPinnedUsage(),
+            40 * kSizeDummyEntry + kMetaDataChargeOverhead);
 
-  // Some insert will fail
+  // Allocate 10MB, memory_used_ = 20480KB
+  // Some dummy entry insertions will fail due to the full cache
   wbf->ReserveMem(10 * 1024 * 1024);
+  ASSERT_GE(cache->GetPinnedUsage(), 40 * kSizeDummyEntry);
   ASSERT_LE(cache->GetPinnedUsage(), 12 * 1024 * 1024);
-  ASSERT_EQ(wbf->dummy_entries_in_cache_usage(), 80 * kSizeDummyEntry);
+  ASSERT_LT(wbf->dummy_entries_in_cache_usage(), 80 * kSizeDummyEntry);
 
-  // Increase capacity so next insert will succeed
-  cache->SetCapacity(30 * 1024 * 1024);
+  // Free 15MB after encountering cache full, memory_used_ = 5120KB
+  wbf->FreeMem(15 * 1024 * 1024);
+  ASSERT_EQ(wbf->dummy_entries_in_cache_usage(), 20 * kSizeDummyEntry);
+  ASSERT_GE(cache->GetPinnedUsage(), 20 * kSizeDummyEntry);
+  ASSERT_LT(cache->GetPinnedUsage(),
+            20 * kSizeDummyEntry + kMetaDataChargeOverhead);
+
+  // Reserve 15MB, making the cache full again, memory_used_ = 20480KB
+  wbf->ReserveMem(15 * 1024 * 1024);
+  ASSERT_LE(cache->GetPinnedUsage(), 12 * 1024 * 1024);
+  ASSERT_LT(wbf->dummy_entries_in_cache_usage(), 80 * kSizeDummyEntry);
+
+  // Increase capacity so the next insert will fully succeed
+  cache->SetCapacity(40 * 1024 * 1024);
+
+  // Allocate 10MB, memory_used_ = 30720KB
   wbf->ReserveMem(10 * 1024 * 1024);
-  ASSERT_GT(cache->GetPinnedUsage(), 20 * 1024 * 1024);
   ASSERT_EQ(wbf->dummy_entries_in_cache_usage(), 120 * kSizeDummyEntry);
+  ASSERT_GE(cache->GetPinnedUsage(), 120 * kSizeDummyEntry);
+  ASSERT_LT(cache->GetPinnedUsage(),
+            120 * kSizeDummyEntry + kMetaDataChargeOverhead);
 
   // Gradually release 20 MB
+  // It ends up sequentially releasing 32, 24, and 18 dummy entries as
+  // memory_used_ decreases to 22528KB, 16384KB, and 11776KB respectively.
+  // In total, it releases 74 dummy entries
   for (int i = 0; i < 40; i++) {
     wbf->FreeMem(512 * 1024);
   }
-  ASSERT_GE(cache->GetPinnedUsage(), 10 * 1024 * 1024);
-  ASSERT_LT(cache->GetPinnedUsage(), 20 * 1024 * 1024);
-  ASSERT_EQ(wbf->dummy_entries_in_cache_usage(), 95 * kSizeDummyEntry);
+
+  ASSERT_EQ(wbf->dummy_entries_in_cache_usage(), 46 * kSizeDummyEntry);
+  ASSERT_GE(cache->GetPinnedUsage(), 46 * kSizeDummyEntry);
+  ASSERT_LT(cache->GetPinnedUsage(),
+            46 * kSizeDummyEntry + kMetaDataChargeOverhead);
 }
 
 #endif  // ROCKSDB_LITE
diff --git a/src.mk b/src.mk
index 28d8380ad..2da058495 100644
--- a/src.mk
+++ b/src.mk
@@ -2,6 +2,7 @@ LIB_SOURCES = \
   cache/cache.cc \
   cache/cache_entry_roles.cc \
+  cache/cache_reservation_manager.cc \
   cache/clock_cache.cc \
   cache/lru_cache.cc \
   cache/sharded_cache.cc \
@@ -384,6 +385,7 @@ BENCH_MAIN_SOURCES = \
 
 TEST_MAIN_SOURCES = \
   cache/cache_test.cc \
+  cache/cache_reservation_manager_test.cc \
  cache/lru_cache_test.cc \
   db/blob/blob_counting_iterator_test.cc \
   db/blob/blob_file_addition_test.cc \
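The "releasing 32, 24, and 18 dummy entries" annotation in the CacheFull test above can be reproduced by simulating the delayed-decrease rule directly. A standalone sketch (not RocksDB code) that replays the 40 frees of 512KB starting from 120 reserved entries:

```cpp
#include <cstddef>
#include <iostream>

constexpr std::size_t kEntry = 256 * 1024;  // kSizeDummyEntry

int main() {
  std::size_t mem = 30720 * 1024;       // memory_used_ after the last reserve
  std::size_t reserved = 120 * kEntry;  // 120 dummy entries in the cache
  for (int i = 0; i < 40; ++i) {
    mem -= 512 * 1024;
    if (mem >= reserved / 4 * 3) continue;  // delayed decrease: hold
    // Release down to the smallest multiple of kEntry >= mem.
    std::size_t target = (mem + kEntry - 1) / kEntry * kEntry;
    std::cout << "free #" << i + 1 << ": release "
              << (reserved - target) / kEntry << " entries at mem="
              << mem / 1024 << "KB\n";
    reserved = target;
  }
  // Prints releases of 32, 24, and 18 entries at 22528KB, 16384KB, and
  // 11776KB, leaving 46 entries reserved, matching the test's assertions.
  std::cout << "final reservation: " << reserved / kEntry << " entries\n";
}
```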