Charge blob cache usage against the global memory limit (#10321)

Summary:
To help service owners to manage their memory budget effectively, we have been working towards counting all major memory users inside RocksDB towards a single global memory limit (see e.g. https://github.com/facebook/rocksdb/wiki/Write-Buffer-Manager#cost-memory-used-in-memtable-to-block-cache). The global limit is specified by the capacity of the block-based table's block cache, and is technically implemented by inserting dummy entries ("reservations") into the block cache. The goal of this task is to support charging the memory usage of the new blob cache against this global memory limit when the backing cache of the blob cache and the block cache are different.

This PR is a part of https://github.com/facebook/rocksdb/issues/10156

Pull Request resolved: https://github.com/facebook/rocksdb/pull/10321

Reviewed By: ltamasi

Differential Revision: D37913590

Pulled By: gangliao

fbshipit-source-id: eaacf23907f82dc7d18964a3f24d7039a2937a72
main
Gang Liao 2 years ago committed by Facebook GitHub Bot
parent 18a61a1734
commit 0b6bc101ba
  1. 1
      CMakeLists.txt
  2. 8
      HISTORY.md
  3. 2
      TARGETS
  4. 2
      cache/cache_entry_roles.cc
  5. 1
      cache/cache_reservation_manager.cc
  6. 117
      cache/charged_cache.cc
  7. 121
      cache/charged_cache.h
  8. 4
      cache/lru_cache.h
  9. 4
      cache/lru_cache_test.cc
  10. 2
      cache/sharded_cache.h
  11. 15
      db/blob/blob_source.cc
  12. 2
      db/blob/blob_source.h
  13. 293
      db/blob/blob_source_test.cc
  14. 1
      db_stress_tool/db_stress_common.h
  15. 13
      db_stress_tool/db_stress_gflags.cc
  16. 5
      db_stress_tool/db_stress_test_base.cc
  17. 8
      include/rocksdb/cache.h
  18. 1
      src.mk
  19. 39
      table/block_based/block_based_table_factory.cc
  20. 96
      tools/db_bench_tool.cc

@ -598,6 +598,7 @@ set(SOURCES
cache/cache_entry_roles.cc
cache/cache_key.cc
cache/cache_reservation_manager.cc
cache/charged_cache.cc
cache/clock_cache.cc
cache/compressed_secondary_cache.cc
cache/fast_lru_cache.cc

@ -1,5 +1,9 @@
# Rocksdb Change Log
## Unreleased
* Added `prepopulate_blob_cache` to ColumnFamilyOptions. If enabled, prepopulate warm/hot blobs which are already in memory into blob cache at the time of flush. On a flush, the blob that is in memory (in memtables) get flushed to the device. If using Direct IO, additional IO is incurred to read this blob back into memory again, which is avoided by enabling this option. This further helps if the workload exhibits high temporal locality, where most of the reads go to recently written data. This also helps in case of the remote file system since it involves network traffic and higher latencies.
* Support using secondary cache with the blob cache. When creating a blob cache, the user can set a secondary blob cache by configuring `secondary_cache` in LRUCacheOptions.
* Charge memory usage of blob cache when the backing cache of the blob cache and the block cache are different. If an operation reserving memory for blob cache exceeds the avaible space left in the block cache at some point (i.e, causing a cache full under `LRUCacheOptions::strict_capacity_limit` = true), creation will fail with `Status::MemoryLimit()`. To opt in this feature, enable charging `CacheEntryRole::kBlobCache` in `BlockBasedTableOptions::cache_usage_options`.
### Public API changes
* Removed Customizable support for RateLimiter and removed its CreateFromString() and Type() functions.
@ -14,9 +18,7 @@
* Added support for blob caching in order to cache frequently used blobs for BlobDB.
* User can configure the new ColumnFamilyOptions `blob_cache` to enable/disable blob caching.
* Either sharing the backend cache with the block cache or using a completely separate cache is supported.
* A new abstraction interface called `BlobSource` for blob read logic gives all users access to blobs, whether they are in the blob cache, secondary cache, or (remote) storage. Blobs can be potentially read both while handling user reads (`Get`, `MultiGet`, or iterator) and during compaction (while dealing with compaction filters, Merges, or garbage collection) but eventually all blob reads go through `Version::GetBlob` or, for MultiGet, `Version::MultiGetBlob` (and then get dispatched to the interface -- `BlobSource`).
* Added `prepopulate_blob_cache` to ColumnFamilyOptions. If enabled, prepopulate warm/hot blobs which are already in memory into blob cache at the time of flush. On a flush, the blob that is in memory (in memtables) get flushed to the device. If using Direct IO, additional IO is incurred to read this blob back into memory again, which is avoided by enabling this option. This further helps if the workload exhibits high temporal locality, where most of the reads go to recently written data. This also helps in case of the remote file system since it involves network traffic and higher latencies.
* Support using secondary cache with the blob cache. When creating a blob cache, the user can set a secondary blob cache by configuring `secondary_cache` in LRUCacheOptions.
* A new abstraction interface called `BlobSource` for blob read logic gives all users access to blobs, whether they are in the blob cache, secondary cache, or (remote) storage. Blobs can be potentially read both while handling user reads (`Get`, `MultiGet`, or iterator) and during compaction (while dealing with compaction filters, Merges, or garbage collection) but eventually all blob reads go through `Version::GetBlob` or, for MultiGet, `Version::MultiGetBlob` (and then get dispatched to the interface -- `BlobSource`).
* Add experimental tiered compaction feature `AdvancedColumnFamilyOptions::preclude_last_level_data_seconds`, which makes sure the new data inserted within preclude_last_level_data_seconds won't be placed on cold tier (the feature is not complete).
### Public API changes

@ -13,6 +13,7 @@ cpp_library_wrapper(name="rocksdb_lib", srcs=[
"cache/cache_entry_roles.cc",
"cache/cache_key.cc",
"cache/cache_reservation_manager.cc",
"cache/charged_cache.cc",
"cache/clock_cache.cc",
"cache/compressed_secondary_cache.cc",
"cache/fast_lru_cache.cc",
@ -348,6 +349,7 @@ cpp_library_wrapper(name="rocksdb_whole_archive_lib", srcs=[
"cache/cache_entry_roles.cc",
"cache/cache_key.cc",
"cache/cache_reservation_manager.cc",
"cache/charged_cache.cc",
"cache/clock_cache.cc",
"cache/compressed_secondary_cache.cc",
"cache/fast_lru_cache.cc",

@ -23,6 +23,7 @@ std::array<std::string, kNumCacheEntryRoles> kCacheEntryRoleToCamelString{{
"FilterConstruction",
"BlockBasedTableReader",
"FileMetadata",
"BlobCache",
"Misc",
}};
@ -38,6 +39,7 @@ std::array<std::string, kNumCacheEntryRoles> kCacheEntryRoleToHyphenString{{
"filter-construction",
"block-based-table-reader",
"file-metadata",
"blob-cache",
"misc",
}};

@ -181,4 +181,5 @@ template class CacheReservationManagerImpl<CacheEntryRole::kFilterConstruction>;
template class CacheReservationManagerImpl<CacheEntryRole::kMisc>;
template class CacheReservationManagerImpl<CacheEntryRole::kWriteBuffer>;
template class CacheReservationManagerImpl<CacheEntryRole::kFileMetadata>;
template class CacheReservationManagerImpl<CacheEntryRole::kBlobCache>;
} // namespace ROCKSDB_NAMESPACE

@ -0,0 +1,117 @@
// Copyright (c) Meta Platforms, Inc. and affiliates.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "cache/charged_cache.h"
#include "cache/cache_reservation_manager.h"
namespace ROCKSDB_NAMESPACE {
ChargedCache::ChargedCache(std::shared_ptr<Cache> cache,
std::shared_ptr<Cache> block_cache)
: cache_(cache),
cache_res_mgr_(std::make_shared<ConcurrentCacheReservationManager>(
std::make_shared<
CacheReservationManagerImpl<CacheEntryRole::kBlobCache>>(
block_cache))) {}
Status ChargedCache::Insert(const Slice& key, void* value, size_t charge,
DeleterFn deleter, Handle** handle,
Priority priority) {
Status s = cache_->Insert(key, value, charge, deleter, handle, priority);
if (s.ok()) {
// Insert may cause the cache entry eviction if the cache is full. So we
// directly call the reservation manager to update the total memory used
// in the cache.
assert(cache_res_mgr_);
cache_res_mgr_->UpdateCacheReservation(cache_->GetUsage())
.PermitUncheckedError();
}
return s;
}
Status ChargedCache::Insert(const Slice& key, void* value,
const CacheItemHelper* helper, size_t charge,
Handle** handle, Priority priority) {
Status s = cache_->Insert(key, value, helper, charge, handle, priority);
if (s.ok()) {
// Insert may cause the cache entry eviction if the cache is full. So we
// directly call the reservation manager to update the total memory used
// in the cache.
assert(cache_res_mgr_);
cache_res_mgr_->UpdateCacheReservation(cache_->GetUsage())
.PermitUncheckedError();
}
return s;
}
Cache::Handle* ChargedCache::Lookup(const Slice& key, Statistics* stats) {
return cache_->Lookup(key, stats);
}
Cache::Handle* ChargedCache::Lookup(const Slice& key,
const CacheItemHelper* helper,
const CreateCallback& create_cb,
Priority priority, bool wait,
Statistics* stats) {
auto handle = cache_->Lookup(key, helper, create_cb, priority, wait, stats);
// Lookup may promote the KV pair from the secondary cache to the primary
// cache. So we directly call the reservation manager to update the total
// memory used in the cache.
assert(cache_res_mgr_);
cache_res_mgr_->UpdateCacheReservation(cache_->GetUsage())
.PermitUncheckedError();
return handle;
}
bool ChargedCache::Release(Cache::Handle* handle, bool useful,
bool erase_if_last_ref) {
size_t memory_used_delta = cache_->GetUsage(handle);
bool erased = cache_->Release(handle, useful, erase_if_last_ref);
if (erased) {
assert(cache_res_mgr_);
cache_res_mgr_
->UpdateCacheReservation(memory_used_delta, /* increase */ false)
.PermitUncheckedError();
}
return erased;
}
bool ChargedCache::Release(Cache::Handle* handle, bool erase_if_last_ref) {
size_t memory_used_delta = cache_->GetUsage(handle);
bool erased = cache_->Release(handle, erase_if_last_ref);
if (erased) {
assert(cache_res_mgr_);
cache_res_mgr_
->UpdateCacheReservation(memory_used_delta, /* increase */ false)
.PermitUncheckedError();
}
return erased;
}
void ChargedCache::Erase(const Slice& key) {
cache_->Erase(key);
assert(cache_res_mgr_);
cache_res_mgr_->UpdateCacheReservation(cache_->GetUsage())
.PermitUncheckedError();
}
void ChargedCache::EraseUnRefEntries() {
cache_->EraseUnRefEntries();
assert(cache_res_mgr_);
cache_res_mgr_->UpdateCacheReservation(cache_->GetUsage())
.PermitUncheckedError();
}
void ChargedCache::SetCapacity(size_t capacity) {
cache_->SetCapacity(capacity);
// SetCapacity can result in evictions when the cache capacity is decreased,
// so we would want to update the cache reservation here as well.
assert(cache_res_mgr_);
cache_res_mgr_->UpdateCacheReservation(cache_->GetUsage())
.PermitUncheckedError();
}
} // namespace ROCKSDB_NAMESPACE

@ -0,0 +1,121 @@
// Copyright (c) Meta Platforms, Inc. and affiliates.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#pragma once
#include <string>
#include "port/port.h"
#include "rocksdb/cache.h"
namespace ROCKSDB_NAMESPACE {
class ConcurrentCacheReservationManager;
// A cache interface which wraps around another cache and takes care of
// reserving space in block cache towards a single global memory limit, and
// forwards all the calls to the underlying cache.
class ChargedCache : public Cache {
public:
ChargedCache(std::shared_ptr<Cache> cache,
std::shared_ptr<Cache> block_cache);
~ChargedCache() override = default;
Status Insert(const Slice& key, void* value, size_t charge, DeleterFn deleter,
Handle** handle, Priority priority) override;
Status Insert(const Slice& key, void* value, const CacheItemHelper* helper,
size_t charge, Handle** handle = nullptr,
Priority priority = Priority::LOW) override;
Cache::Handle* Lookup(const Slice& key, Statistics* stats) override;
Cache::Handle* Lookup(const Slice& key, const CacheItemHelper* helper,
const CreateCallback& create_cb, Priority priority,
bool wait, Statistics* stats = nullptr) override;
bool Release(Cache::Handle* handle, bool useful,
bool erase_if_last_ref = false) override;
bool Release(Cache::Handle* handle, bool erase_if_last_ref = false) override;
void Erase(const Slice& key) override;
void EraseUnRefEntries() override;
static const char* kClassName() { return "ChargedCache"; }
const char* Name() const override { return kClassName(); }
uint64_t NewId() override { return cache_->NewId(); }
void SetCapacity(size_t capacity) override;
void SetStrictCapacityLimit(bool strict_capacity_limit) override {
cache_->SetStrictCapacityLimit(strict_capacity_limit);
}
bool HasStrictCapacityLimit() const override {
return cache_->HasStrictCapacityLimit();
}
void* Value(Cache::Handle* handle) override { return cache_->Value(handle); }
bool IsReady(Cache::Handle* handle) override {
return cache_->IsReady(handle);
}
void Wait(Cache::Handle* handle) override { cache_->Wait(handle); }
void WaitAll(std::vector<Handle*>& handles) override {
cache_->WaitAll(handles);
}
bool Ref(Cache::Handle* handle) override { return cache_->Ref(handle); }
size_t GetCapacity() const override { return cache_->GetCapacity(); }
size_t GetUsage() const override { return cache_->GetUsage(); }
size_t GetUsage(Cache::Handle* handle) const override {
return cache_->GetUsage(handle);
}
size_t GetPinnedUsage() const override { return cache_->GetPinnedUsage(); }
size_t GetCharge(Cache::Handle* handle) const override {
return cache_->GetCharge(handle);
}
Cache::DeleterFn GetDeleter(Cache::Handle* handle) const override {
return cache_->GetDeleter(handle);
}
void ApplyToAllEntries(
const std::function<void(const Slice& key, void* value, size_t charge,
Cache::DeleterFn deleter)>& callback,
const Cache::ApplyToAllEntriesOptions& opts) override {
cache_->ApplyToAllEntries(callback, opts);
}
void ApplyToAllCacheEntries(void (*callback)(void* value, size_t charge),
bool thread_safe) override {
cache_->ApplyToAllCacheEntries(callback, thread_safe);
}
std::string GetPrintableOptions() const override {
return cache_->GetPrintableOptions();
}
void DisownData() override { return cache_->DisownData(); }
inline Cache* GetCache() const { return cache_.get(); }
inline ConcurrentCacheReservationManager* TEST_GetCacheReservationManager()
const {
return cache_res_mgr_.get();
}
private:
std::shared_ptr<Cache> cache_;
std::shared_ptr<ConcurrentCacheReservationManager> cache_res_mgr_;
};
} // namespace ROCKSDB_NAMESPACE

4
cache/lru_cache.h vendored

@ -484,9 +484,9 @@ class LRUCache
virtual void WaitAll(std::vector<Handle*>& handles) override;
std::string GetPrintableOptions() const override;
// Retrieves number of elements in LRU, for unit test purpose only.
// Retrieves number of elements in LRU, for unit test purpose only.
size_t TEST_GetLRUSize();
// Retrieves high pri pool ratio.
// Retrieves high pri pool ratio.
double GetHighPriPoolRatio();
private:

@ -1480,10 +1480,10 @@ class LRUCacheWithStat : public LRUCache {
return LRUCache::Insert(key, value, charge, deleter, handle, priority);
}
Status Insert(const Slice& key, void* value, const CacheItemHelper* helper,
size_t chargge, Handle** handle = nullptr,
size_t charge, Handle** handle = nullptr,
Priority priority = Priority::LOW) override {
insert_count_++;
return LRUCache::Insert(key, value, helper, chargge, handle, priority);
return LRUCache::Insert(key, value, helper, charge, handle, priority);
}
Handle* Lookup(const Slice& key, Statistics* stats) override {
lookup_count_++;

@ -86,7 +86,7 @@ class ShardedCache : public Cache {
DeleterFn deleter, Handle** handle,
Priority priority) override;
virtual Status Insert(const Slice& key, void* value,
const CacheItemHelper* helper, size_t chargge,
const CacheItemHelper* helper, size_t charge,
Handle** handle = nullptr,
Priority priority = Priority::LOW) override;
virtual Handle* Lookup(const Slice& key, Statistics* stats) override;

@ -8,6 +8,8 @@
#include <cassert>
#include <string>
#include "cache/cache_reservation_manager.h"
#include "cache/charged_cache.h"
#include "db/blob/blob_file_reader.h"
#include "db/blob/blob_log_format.h"
#include "monitoring/statistics.h"
@ -26,7 +28,18 @@ BlobSource::BlobSource(const ImmutableOptions* immutable_options,
statistics_(immutable_options->statistics.get()),
blob_file_cache_(blob_file_cache),
blob_cache_(immutable_options->blob_cache),
lowest_used_cache_tier_(immutable_options->lowest_used_cache_tier) {}
lowest_used_cache_tier_(immutable_options->lowest_used_cache_tier) {
#ifndef ROCKSDB_LITE
auto bbto =
immutable_options->table_factory->GetOptions<BlockBasedTableOptions>();
if (bbto &&
bbto->cache_usage_options.options_overrides.at(CacheEntryRole::kBlobCache)
.charged == CacheEntryRoleOptions::Decision::kEnabled) {
blob_cache_ = std::make_shared<ChargedCache>(immutable_options->blob_cache,
bbto->block_cache);
}
#endif // ROCKSDB_LITE
}
BlobSource::~BlobSource() = default;

@ -99,6 +99,8 @@ class BlobSource {
blob_file_reader);
}
inline Cache* GetBlobCache() const { return blob_cache_.get(); }
bool TEST_BlobInCache(uint64_t file_number, uint64_t file_size,
uint64_t offset) const;

@ -11,6 +11,7 @@
#include <memory>
#include <string>
#include "cache/charged_cache.h"
#include "cache/compressed_secondary_cache.h"
#include "db/blob/blob_file_cache.h"
#include "db/blob/blob_file_reader.h"
@ -183,9 +184,10 @@ TEST_F(BlobSourceTest, GetBlobsFromCache) {
FileOptions file_options;
constexpr HistogramImpl* blob_file_read_hist = nullptr;
std::unique_ptr<BlobFileCache> blob_file_cache(new BlobFileCache(
backing_cache.get(), &immutable_options, &file_options, column_family_id,
blob_file_read_hist, nullptr /*IOTracer*/));
std::unique_ptr<BlobFileCache> blob_file_cache =
std::make_unique<BlobFileCache>(
backing_cache.get(), &immutable_options, &file_options,
column_family_id, blob_file_read_hist, nullptr /*IOTracer*/);
BlobSource blob_source(&immutable_options, db_id_, db_session_id_,
blob_file_cache.get());
@ -481,9 +483,10 @@ TEST_F(BlobSourceTest, GetCompressedBlobs) {
auto backing_cache = NewLRUCache(capacity); // Blob file cache
FileOptions file_options;
std::unique_ptr<BlobFileCache> blob_file_cache(new BlobFileCache(
backing_cache.get(), &immutable_options, &file_options, column_family_id,
nullptr /*HistogramImpl*/, nullptr /*IOTracer*/));
std::unique_ptr<BlobFileCache> blob_file_cache =
std::make_unique<BlobFileCache>(
backing_cache.get(), &immutable_options, &file_options,
column_family_id, nullptr /*HistogramImpl*/, nullptr /*IOTracer*/);
BlobSource blob_source(&immutable_options, db_id_, db_session_id_,
blob_file_cache.get());
@ -625,9 +628,10 @@ TEST_F(BlobSourceTest, MultiGetBlobsFromMultiFiles) {
FileOptions file_options;
constexpr HistogramImpl* blob_file_read_hist = nullptr;
std::unique_ptr<BlobFileCache> blob_file_cache(new BlobFileCache(
backing_cache.get(), &immutable_options, &file_options, column_family_id,
blob_file_read_hist, nullptr /*IOTracer*/));
std::unique_ptr<BlobFileCache> blob_file_cache =
std::make_unique<BlobFileCache>(
backing_cache.get(), &immutable_options, &file_options,
column_family_id, blob_file_read_hist, nullptr /*IOTracer*/);
BlobSource blob_source(&immutable_options, db_id_, db_session_id_,
blob_file_cache.get());
@ -807,9 +811,10 @@ TEST_F(BlobSourceTest, MultiGetBlobsFromCache) {
FileOptions file_options;
constexpr HistogramImpl* blob_file_read_hist = nullptr;
std::unique_ptr<BlobFileCache> blob_file_cache(new BlobFileCache(
backing_cache.get(), &immutable_options, &file_options, column_family_id,
blob_file_read_hist, nullptr /*IOTracer*/));
std::unique_ptr<BlobFileCache> blob_file_cache =
std::make_unique<BlobFileCache>(
backing_cache.get(), &immutable_options, &file_options,
column_family_id, blob_file_read_hist, nullptr /*IOTracer*/);
BlobSource blob_source(&immutable_options, db_id_, db_session_id_,
blob_file_cache.get());
@ -1258,6 +1263,270 @@ TEST_F(BlobSecondaryCacheTest, GetBlobsFromSecondaryCache) {
}
}
class BlobSourceCacheReservationTest : public DBTestBase {
public:
explicit BlobSourceCacheReservationTest()
: DBTestBase("blob_source_cache_reservation_test",
/*env_do_fsync=*/true) {
options_.env = env_;
options_.enable_blob_files = true;
options_.create_if_missing = true;
LRUCacheOptions co;
co.capacity = kCacheCapacity;
co.num_shard_bits = kNumShardBits;
co.metadata_charge_policy = kDontChargeCacheMetadata;
std::shared_ptr<Cache> blob_cache = NewLRUCache(co);
std::shared_ptr<Cache> block_cache = NewLRUCache(co);
options_.blob_cache = blob_cache;
options_.lowest_used_cache_tier = CacheTier::kVolatileTier;
BlockBasedTableOptions block_based_options;
block_based_options.no_block_cache = false;
block_based_options.block_cache = block_cache;
block_based_options.cache_usage_options.options_overrides.insert(
{CacheEntryRole::kBlobCache,
{/* charged = */ CacheEntryRoleOptions::Decision::kEnabled}});
options_.table_factory.reset(
NewBlockBasedTableFactory(block_based_options));
assert(db_->GetDbIdentity(db_id_).ok());
assert(db_->GetDbSessionId(db_session_id_).ok());
}
void GenerateKeysAndBlobs() {
for (size_t i = 0; i < kNumBlobs; ++i) {
key_strs_.push_back("key" + std::to_string(i));
blob_strs_.push_back("blob" + std::to_string(i));
}
blob_file_size_ = BlobLogHeader::kSize;
for (size_t i = 0; i < kNumBlobs; ++i) {
keys_.push_back({key_strs_[i]});
blobs_.push_back({blob_strs_[i]});
blob_file_size_ +=
BlobLogRecord::kHeaderSize + keys_[i].size() + blobs_[i].size();
}
blob_file_size_ += BlobLogFooter::kSize;
}
static constexpr std::size_t kSizeDummyEntry = CacheReservationManagerImpl<
CacheEntryRole::kBlobCache>::GetDummyEntrySize();
static constexpr std::size_t kCacheCapacity = 1 * kSizeDummyEntry;
static constexpr int kNumShardBits = 0; // 2^0 shard
static constexpr uint32_t kColumnFamilyId = 1;
static constexpr bool kHasTTL = false;
static constexpr uint64_t kBlobFileNumber = 1;
static constexpr size_t kNumBlobs = 16;
std::vector<Slice> keys_;
std::vector<Slice> blobs_;
std::vector<std::string> key_strs_;
std::vector<std::string> blob_strs_;
uint64_t blob_file_size_;
Options options_;
std::string db_id_;
std::string db_session_id_;
};
#ifndef ROCKSDB_LITE
TEST_F(BlobSourceCacheReservationTest, SimpleCacheReservation) {
options_.cf_paths.emplace_back(
test::PerThreadDBPath(
env_, "BlobSourceCacheReservationTest_SimpleCacheReservation"),
0);
GenerateKeysAndBlobs();
DestroyAndReopen(options_);
ImmutableOptions immutable_options(options_);
constexpr ExpirationRange expiration_range;
std::vector<uint64_t> blob_offsets(keys_.size());
std::vector<uint64_t> blob_sizes(keys_.size());
WriteBlobFile(immutable_options, kColumnFamilyId, kHasTTL, expiration_range,
expiration_range, kBlobFileNumber, keys_, blobs_,
kNoCompression, blob_offsets, blob_sizes);
constexpr size_t capacity = 10;
std::shared_ptr<Cache> backing_cache = NewLRUCache(capacity);
FileOptions file_options;
constexpr HistogramImpl* blob_file_read_hist = nullptr;
std::unique_ptr<BlobFileCache> blob_file_cache =
std::make_unique<BlobFileCache>(
backing_cache.get(), &immutable_options, &file_options,
kColumnFamilyId, blob_file_read_hist, nullptr /*IOTracer*/);
BlobSource blob_source(&immutable_options, db_id_, db_session_id_,
blob_file_cache.get());
ConcurrentCacheReservationManager* cache_res_mgr =
static_cast<ChargedCache*>(blob_source.GetBlobCache())
->TEST_GetCacheReservationManager();
ASSERT_NE(cache_res_mgr, nullptr);
ReadOptions read_options;
read_options.verify_checksums = true;
std::vector<PinnableSlice> values(keys_.size());
{
read_options.fill_cache = false;
for (size_t i = 0; i < kNumBlobs; ++i) {
ASSERT_OK(blob_source.GetBlob(
read_options, keys_[i], kBlobFileNumber, blob_offsets[i],
blob_file_size_, blob_sizes[i], kNoCompression,
nullptr /* prefetch_buffer */, &values[i], nullptr /* bytes_read */));
ASSERT_EQ(cache_res_mgr->GetTotalReservedCacheSize(), 0);
ASSERT_EQ(cache_res_mgr->GetTotalMemoryUsed(), 0);
}
}
{
read_options.fill_cache = true;
// num_blobs is 16, so the total blob cache usage is less than a single
// dummy entry. Therefore, cache reservation manager only reserves one dummy
// entry here.
uint64_t blob_bytes = 0;
for (size_t i = 0; i < kNumBlobs; ++i) {
ASSERT_OK(blob_source.GetBlob(
read_options, keys_[i], kBlobFileNumber, blob_offsets[i],
blob_file_size_, blob_sizes[i], kNoCompression,
nullptr /* prefetch_buffer */, &values[i], nullptr /* bytes_read */));
blob_bytes += blob_sizes[i];
ASSERT_EQ(cache_res_mgr->GetTotalReservedCacheSize(), kSizeDummyEntry);
ASSERT_EQ(cache_res_mgr->GetTotalMemoryUsed(), blob_bytes);
ASSERT_EQ(cache_res_mgr->GetTotalMemoryUsed(),
options_.blob_cache->GetUsage());
}
}
{
OffsetableCacheKey base_cache_key(db_id_, db_session_id_, kBlobFileNumber,
blob_file_size_);
size_t blob_bytes = options_.blob_cache->GetUsage();
for (size_t i = 0; i < kNumBlobs; ++i) {
CacheKey cache_key = base_cache_key.WithOffset(blob_offsets[i]);
// We didn't call options_.blob_cache->Erase() here, this is because
// the cache wrapper's Erase() method must be called to update the
// cache usage after erasing the cache entry.
blob_source.GetBlobCache()->Erase(cache_key.AsSlice());
if (i == kNumBlobs - 1) {
// The last blob is not in the cache. cache_res_mgr should not reserve
// any space for it.
ASSERT_EQ(cache_res_mgr->GetTotalReservedCacheSize(), 0);
} else {
ASSERT_EQ(cache_res_mgr->GetTotalReservedCacheSize(), kSizeDummyEntry);
}
blob_bytes -= blob_sizes[i];
ASSERT_EQ(cache_res_mgr->GetTotalMemoryUsed(), blob_bytes);
ASSERT_EQ(cache_res_mgr->GetTotalMemoryUsed(),
options_.blob_cache->GetUsage());
}
}
}
TEST_F(BlobSourceCacheReservationTest, IncreaseCacheReservationOnFullCache) {
options_.cf_paths.emplace_back(
test::PerThreadDBPath(
env_,
"BlobSourceCacheReservationTest_IncreaseCacheReservationOnFullCache"),
0);
GenerateKeysAndBlobs();
DestroyAndReopen(options_);
ImmutableOptions immutable_options(options_);
constexpr size_t blob_size = kSizeDummyEntry / (kNumBlobs / 2);
for (size_t i = 0; i < kNumBlobs; ++i) {
blob_file_size_ -= blobs_[i].size(); // old blob size
blob_strs_[i].resize(blob_size, '@');
blobs_[i] = Slice(blob_strs_[i]);
blob_file_size_ += blobs_[i].size(); // new blob size
}
std::vector<uint64_t> blob_offsets(keys_.size());
std::vector<uint64_t> blob_sizes(keys_.size());
constexpr ExpirationRange expiration_range;
WriteBlobFile(immutable_options, kColumnFamilyId, kHasTTL, expiration_range,
expiration_range, kBlobFileNumber, keys_, blobs_,
kNoCompression, blob_offsets, blob_sizes);
constexpr size_t capacity = 10;
std::shared_ptr<Cache> backing_cache = NewLRUCache(capacity);
FileOptions file_options;
constexpr HistogramImpl* blob_file_read_hist = nullptr;
std::unique_ptr<BlobFileCache> blob_file_cache =
std::make_unique<BlobFileCache>(
backing_cache.get(), &immutable_options, &file_options,
kColumnFamilyId, blob_file_read_hist, nullptr /*IOTracer*/);
BlobSource blob_source(&immutable_options, db_id_, db_session_id_,
blob_file_cache.get());
ConcurrentCacheReservationManager* cache_res_mgr =
static_cast<ChargedCache*>(blob_source.GetBlobCache())
->TEST_GetCacheReservationManager();
ASSERT_NE(cache_res_mgr, nullptr);
ReadOptions read_options;
read_options.verify_checksums = true;
std::vector<PinnableSlice> values(keys_.size());
{
read_options.fill_cache = false;
for (size_t i = 0; i < kNumBlobs; ++i) {
ASSERT_OK(blob_source.GetBlob(
read_options, keys_[i], kBlobFileNumber, blob_offsets[i],
blob_file_size_, blob_sizes[i], kNoCompression,
nullptr /* prefetch_buffer */, &values[i], nullptr /* bytes_read */));
ASSERT_EQ(cache_res_mgr->GetTotalReservedCacheSize(), 0);
ASSERT_EQ(cache_res_mgr->GetTotalMemoryUsed(), 0);
}
}
{
read_options.fill_cache = true;
// Since we resized each blob to be kSizeDummyEntry / (num_blobs/ 2), we
// should observe cache eviction for the second half blobs.
uint64_t blob_bytes = 0;
for (size_t i = 0; i < kNumBlobs; ++i) {
ASSERT_OK(blob_source.GetBlob(
read_options, keys_[i], kBlobFileNumber, blob_offsets[i],
blob_file_size_, blob_sizes[i], kNoCompression,
nullptr /* prefetch_buffer */, &values[i], nullptr /* bytes_read */));
blob_bytes += blob_sizes[i];
ASSERT_EQ(cache_res_mgr->GetTotalReservedCacheSize(), kSizeDummyEntry);
if (i >= kNumBlobs / 2) {
ASSERT_EQ(cache_res_mgr->GetTotalMemoryUsed(), kSizeDummyEntry);
} else {
ASSERT_EQ(cache_res_mgr->GetTotalMemoryUsed(), blob_bytes);
}
ASSERT_EQ(cache_res_mgr->GetTotalMemoryUsed(),
options_.blob_cache->GetUsage());
}
}
}
#endif // ROCKSDB_LITE
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {

@ -141,6 +141,7 @@ DECLARE_bool(charge_compression_dictionary_building_buffer);
DECLARE_bool(charge_filter_construction);
DECLARE_bool(charge_table_reader);
DECLARE_bool(charge_file_metadata);
DECLARE_bool(charge_blob_cache);
DECLARE_int32(top_level_index_pinning);
DECLARE_int32(partition_pinning);
DECLARE_int32(unpartitioned_pinning);

@ -316,24 +316,29 @@ DEFINE_bool(cache_index_and_filter_blocks, false,
DEFINE_bool(charge_compression_dictionary_building_buffer, false,
"Setting for "
"CacheEntryRoleOptions::charged of"
"CacheEntryRoleOptions::charged of "
"CacheEntryRole::kCompressionDictionaryBuildingBuffer");
DEFINE_bool(charge_filter_construction, false,
"Setting for "
"CacheEntryRoleOptions::charged of"
"CacheEntryRoleOptions::charged of "
"CacheEntryRole::kFilterConstruction");
DEFINE_bool(charge_table_reader, false,
"Setting for "
"CacheEntryRoleOptions::charged of"
"CacheEntryRoleOptions::charged of "
"CacheEntryRole::kBlockBasedTableReader");
DEFINE_bool(charge_file_metadata, false,
"Setting for "
"CacheEntryRoleOptions::charged of"
"CacheEntryRoleOptions::charged of "
"kFileMetadata");
DEFINE_bool(charge_blob_cache, false,
"Setting for "
"CacheEntryRoleOptions::charged of "
"kBlobCache");
DEFINE_int32(
top_level_index_pinning,
static_cast<int32_t>(ROCKSDB_NAMESPACE::PinningTier::kFallback),

@ -2907,6 +2907,11 @@ void InitializeOptionsFromFlags(
{/*.charged = */ FLAGS_charge_file_metadata
? CacheEntryRoleOptions::Decision::kEnabled
: CacheEntryRoleOptions::Decision::kDisabled}});
block_based_options.cache_usage_options.options_overrides.insert(
{CacheEntryRole::kBlobCache,
{/*.charged = */ FLAGS_charge_blob_cache
? CacheEntryRoleOptions::Decision::kEnabled
: CacheEntryRoleOptions::Decision::kDisabled}});
block_based_options.format_version =
static_cast<uint32_t>(FLAGS_format_version);
block_based_options.index_block_restart_interval =

@ -571,12 +571,12 @@ enum class CacheEntryRole {
// Filter's charge to account for
// (new) bloom and ribbon filter construction's memory usage
kFilterConstruction,
// BlockBasedTableReader's charge to account for
// its memory usage
// BlockBasedTableReader's charge to account for its memory usage
kBlockBasedTableReader,
// FileMetadata's charge to account for
// its memory usage
// FileMetadata's charge to account for its memory usage
kFileMetadata,
// Blob cache's charge to account for its memory usage
kBlobCache,
// Default bucket, for miscellaneous cache entries. Do not use for
// entries that could potentially add up to large usage.
kMisc,

@ -4,6 +4,7 @@ LIB_SOURCES = \
cache/cache_entry_roles.cc \
cache/cache_key.cc \
cache/cache_reservation_manager.cc \
cache/charged_cache.cc \
cache/clock_cache.cc \
cache/fast_lru_cache.cc \
cache/lru_cache.cc \

@ -695,12 +695,13 @@ Status BlockBasedTableFactory::ValidateOptions(
static const std::set<CacheEntryRole> kMemoryChargingSupported = {
CacheEntryRole::kCompressionDictionaryBuildingBuffer,
CacheEntryRole::kFilterConstruction,
CacheEntryRole::kBlockBasedTableReader, CacheEntryRole::kFileMetadata};
CacheEntryRole::kBlockBasedTableReader, CacheEntryRole::kFileMetadata,
CacheEntryRole::kBlobCache};
if (options.charged != CacheEntryRoleOptions::Decision::kFallback &&
kMemoryChargingSupported.count(role) == 0) {
return Status::NotSupported(
"Enable/Disable CacheEntryRoleOptions::charged"
"for CacheEntryRole " +
" for CacheEntryRole " +
kCacheEntryRoleToCamelString[static_cast<uint32_t>(role)] +
" is not supported");
}
@ -708,10 +709,42 @@ Status BlockBasedTableFactory::ValidateOptions(
options.charged == CacheEntryRoleOptions::Decision::kEnabled) {
return Status::InvalidArgument(
"Enable CacheEntryRoleOptions::charged"
"for CacheEntryRole " +
" for CacheEntryRole " +
kCacheEntryRoleToCamelString[static_cast<uint32_t>(role)] +
" but block cache is disabled");
}
if (role == CacheEntryRole::kBlobCache &&
options.charged == CacheEntryRoleOptions::Decision::kEnabled) {
if (cf_opts.blob_cache == nullptr) {
return Status::InvalidArgument(
"Enable CacheEntryRoleOptions::charged"
" for CacheEntryRole " +
kCacheEntryRoleToCamelString[static_cast<uint32_t>(role)] +
" but blob cache is not configured");
}
if (table_options_.no_block_cache) {
return Status::InvalidArgument(
"Enable CacheEntryRoleOptions::charged"
" for CacheEntryRole " +
kCacheEntryRoleToCamelString[static_cast<uint32_t>(role)] +
" but block cache is disabled");
}
if (table_options_.block_cache == cf_opts.blob_cache) {
return Status::InvalidArgument(
"Enable CacheEntryRoleOptions::charged"
" for CacheEntryRole " +
kCacheEntryRoleToCamelString[static_cast<uint32_t>(role)] +
" but blob cache is the same as block cache");
}
if (cf_opts.blob_cache->GetCapacity() >
table_options_.block_cache->GetCapacity()) {
return Status::InvalidArgument(
"Enable CacheEntryRoleOptions::charged"
" for CacheEntryRole " +
kCacheEntryRoleToCamelString[static_cast<uint32_t>(role)] +
" but blob cache capacity is larger than block cache capacity");
}
}
}
{
Status s = CheckCacheOptionCompatibility(table_options_);

@ -1170,24 +1170,29 @@ DEFINE_bool(async_io, false,
DEFINE_bool(charge_compression_dictionary_building_buffer, false,
"Setting for "
"CacheEntryRoleOptions::charged of"
"CacheEntryRoleOptions::charged of "
"CacheEntryRole::kCompressionDictionaryBuildingBuffer");
DEFINE_bool(charge_filter_construction, false,
"Setting for "
"CacheEntryRoleOptions::charged of"
"CacheEntryRoleOptions::charged of "
"CacheEntryRole::kFilterConstruction");
DEFINE_bool(charge_table_reader, false,
"Setting for "
"CacheEntryRoleOptions::charged of"
"CacheEntryRoleOptions::charged of "
"CacheEntryRole::kBlockBasedTableReader");
DEFINE_bool(charge_file_metadata, false,
"Setting for "
"CacheEntryRoleOptions::charged of"
"CacheEntryRoleOptions::charged of "
"CacheEntryRole::kFileMetadata");
DEFINE_bool(charge_blob_cache, false,
"Setting for "
"CacheEntryRoleOptions::charged of "
"CacheEntryRole::kBlobCache");
DEFINE_uint64(backup_rate_limit, 0ull,
"If non-zero, db_bench will rate limit reads and writes for DB "
"backup. This "
@ -4294,6 +4299,11 @@ class Benchmark {
{/*.charged = */ FLAGS_charge_file_metadata
? CacheEntryRoleOptions::Decision::kEnabled
: CacheEntryRoleOptions::Decision::kDisabled}});
block_based_options.cache_usage_options.options_overrides.insert(
{CacheEntryRole::kBlobCache,
{/*.charged = */ FLAGS_charge_blob_cache
? CacheEntryRoleOptions::Decision::kEnabled
: CacheEntryRoleOptions::Decision::kDisabled}});
block_based_options.block_cache_compressed = compressed_cache_;
block_based_options.block_size = FLAGS_block_size;
block_based_options.block_restart_interval = FLAGS_block_restart_interval;
@ -4369,6 +4379,46 @@ class Benchmark {
#endif
}
if (FLAGS_use_blob_cache) {
if (FLAGS_use_shared_block_and_blob_cache) {
options.blob_cache = cache_;
} else {
if (FLAGS_blob_cache_size > 0) {
LRUCacheOptions co;
co.capacity = FLAGS_blob_cache_size;
co.num_shard_bits = FLAGS_blob_cache_numshardbits;
options.blob_cache = NewLRUCache(co);
} else {
fprintf(
stderr,
"Unable to create a standalone blob cache if blob_cache_size "
"<= 0.\n");
exit(1);
}
}
switch (FLAGS_prepopulate_blob_cache) {
case 0:
options.prepopulate_blob_cache = PrepopulateBlobCache::kDisable;
break;
case 1:
options.prepopulate_blob_cache = PrepopulateBlobCache::kFlushOnly;
break;
default:
fprintf(stderr, "Unknown prepopulate blob cache mode\n");
exit(1);
}
fprintf(stdout,
"Integrated BlobDB: blob cache enabled, block and blob caches "
"shared: %d, blob cache size %" PRIu64
", blob cache num shard bits: %d, hot/warm blobs prepopulated: "
"%d\n",
FLAGS_use_shared_block_and_blob_cache, FLAGS_blob_cache_size,
FLAGS_blob_cache_numshardbits, FLAGS_prepopulate_blob_cache);
} else {
fprintf(stdout, "Integrated BlobDB: blob cache disabled\n");
}
options.table_factory.reset(
NewBlockBasedTableFactory(block_based_options));
}
@ -4512,44 +4562,6 @@ class Benchmark {
FLAGS_blob_compaction_readahead_size;
options.blob_file_starting_level = FLAGS_blob_file_starting_level;
if (FLAGS_use_blob_cache) {
if (FLAGS_use_shared_block_and_blob_cache) {
options.blob_cache = cache_;
} else {
if (FLAGS_blob_cache_size > 0) {
LRUCacheOptions co;
co.capacity = FLAGS_blob_cache_size;
co.num_shard_bits = FLAGS_blob_cache_numshardbits;
options.blob_cache = NewLRUCache(co);
} else {
fprintf(stderr,
"Unable to create a standalone blob cache if blob_cache_size "
"<= 0.\n");
exit(1);
}
}
switch (FLAGS_prepopulate_blob_cache) {
case 0:
options.prepopulate_blob_cache = PrepopulateBlobCache::kDisable;
break;
case 1:
options.prepopulate_blob_cache = PrepopulateBlobCache::kFlushOnly;
break;
default:
fprintf(stderr, "Unknown prepopulate blob cache mode\n");
exit(1);
}
fprintf(
stdout,
"Integrated BlobDB: blob cache enabled, block and blob caches "
"shared: %d, blob cache size %" PRIu64
", blob cache num shard bits: %d, hot/warm blobs prepopulated: %d\n",
FLAGS_use_shared_block_and_blob_cache, FLAGS_blob_cache_size,
FLAGS_blob_cache_numshardbits, FLAGS_prepopulate_blob_cache);
} else {
fprintf(stdout, "Integrated BlobDB: blob cache disabled\n");
}
#ifndef ROCKSDB_LITE
if (FLAGS_readonly && FLAGS_transaction_db) {
fprintf(stderr, "Cannot use readonly flag with transaction_db\n");

Loading…
Cancel
Save