Account memory of big memory users in BlockBasedTable in global memory limit (#9748)

Summary:
**Context:**
Through heap profiling, we discovered that `BlockBasedTableReader` objects can accumulate and lead to high memory usage (e.g, `max_open_file = -1`). These memories are currently not saved, not tracked, not constrained and not cache evict-able. As a first step to improve this, similar to https://github.com/facebook/rocksdb/pull/8428,  this PR is to track an estimate of `BlockBasedTableReader` object's memory in block cache and fail future creation if the memory usage exceeds the available space of cache at the time of creation.

**Summary:**
- Approximate big memory users  (`BlockBasedTable::Rep` and `TableProperties` )' memory usage in addition to the existing estimated ones (filter block/index block/un-compression dictionary)
- Charge all of these memory usages to block cache on `BlockBasedTable::Open()` and release them on `~BlockBasedTable()` as there is no memory usage fluctuation of concern in between
- Refactor on CacheReservationManager (and its call-sites) to add concurrent support for BlockBasedTable  used in this PR.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/9748

Test Plan:
- New unit tests
- db bench: `OpenDb` : **-0.52% in ms**
  - Setup `./db_bench -benchmarks=fillseq -db=/dev/shm/testdb -disable_auto_compactions=1 -write_buffer_size=1048576`
  - Repeated run with pre-change w/o feature and post-change with feature, benchmark `OpenDb`:  `./db_bench -benchmarks=readrandom -use_existing_db=1 -db=/dev/shm/testdb -reserve_table_reader_memory=true (remove this when running w/o feature) -file_opening_threads=3 -open_files=-1 -report_open_timing=true| egrep 'OpenDb:'`

#-run | (feature-off) avg milliseconds | std milliseconds | (feature-on) avg milliseconds | std milliseconds | change (%)
-- | -- | -- | -- | -- | --
10 | 11.4018 | 5.95173 | 9.47788 | 1.57538 | -16.87382694
20 | 9.23746 | 0.841053 | 9.32377 | 1.14074 | 0.9343477536
40 | 9.0876 | 0.671129 | 9.35053 | 1.11713 | 2.893283155
80 | 9.72514 | 2.28459 | 9.52013 | 1.0894 | -2.108041632
160 | 9.74677 | 0.991234 | 9.84743 | 1.73396 | 1.032752389
320 | 10.7297 | 5.11555 | 10.547 | 1.97692 | **-1.70275031**
640 | 11.7092 | 2.36565 | 11.7869 | 2.69377 | **0.6635807741**

-  db bench on write with cost to cache in WriteBufferManager (just in case this PR's CRM refactoring accidentally slows down anything in WBM) : `fillseq` : **+0.54% in micros/op**
`./db_bench -benchmarks=fillseq -db=/dev/shm/testdb -disable_auto_compactions=1 -cost_write_buffer_to_cache=true -write_buffer_size=10000000000 | egrep 'fillseq'`

#-run | (pre-PR) avg micros/op | std micros/op | (post-PR)  avg micros/op | std micros/op | change (%)
-- | -- | -- | -- | -- | --
10 | 6.15 | 0.260187 | 6.289 | 0.371192 | 2.260162602
20 | 7.28025 | 0.465402 | 7.37255 | 0.451256 | 1.267813605
40 | 7.06312 | 0.490654 | 7.13803 | 0.478676 | **1.060579461**
80 | 7.14035 | 0.972831 | 7.14196 | 0.92971 | **0.02254791432**

-  filter bench: `bloom filter`: **-0.78% in ms/key**
    - ` ./filter_bench -impl=2 -quick -reserve_table_builder_memory=true | grep 'Build avg'`

#-run | (pre-PR) avg ns/key | std ns/key | (post-PR)  ns/key | std ns/key | change (%)
-- | -- | -- | -- | -- | --
10 | 26.4369 | 0.442182 | 26.3273 | 0.422919 | **-0.4145720565**
20 | 26.4451 | 0.592787 | 26.1419 | 0.62451 | **-1.1465262**

- Crash test `python3 tools/db_crashtest.py blackbox --reserve_table_reader_memory=1 --cache_size=1` killed as normal

Reviewed By: ajkr

Differential Revision: D35136549

Pulled By: hx235

fbshipit-source-id: 146978858d0f900f43f4eb09bfd3e83195e3be28
main
Hui Xiao 3 years ago committed by Facebook GitHub Bot
parent 633b7f15d5
commit 49623f9c8e
  1. 1
      HISTORY.md
  2. 2
      cache/cache_entry_roles.cc
  3. 3
      cache/cache_entry_roles.h
  4. 123
      cache/cache_reservation_manager.cc
  5. 195
      cache/cache_reservation_manager.h
  6. 127
      cache/cache_reservation_manager_test.cc
  7. 29
      db/db_bloom_filter_test.cc
  8. 65
      db/db_sst_test.cc
  9. 2
      db/db_table_properties_test.cc
  10. 1
      db_stress_tool/db_stress_common.h
  11. 5
      db_stress_tool/db_stress_gflags.cc
  12. 2
      db_stress_tool/db_stress_test_base.cc
  13. 4
      include/rocksdb/options.h
  14. 27
      include/rocksdb/table.h
  15. 4
      include/rocksdb/table_properties.h
  16. 2
      include/rocksdb/write_buffer_manager.h
  17. 15
      memtable/write_buffer_manager.cc
  18. 1
      options/options_settable_test.cc
  19. 2
      options/options_test.cc
  20. 16
      table/block_based/block_based_table_builder.cc
  21. 27
      table/block_based/block_based_table_factory.cc
  22. 2
      table/block_based/block_based_table_factory.h
  23. 24
      table/block_based/block_based_table_reader.cc
  24. 18
      table/block_based/block_based_table_reader.h
  25. 402
      table/block_based/block_based_table_reader_test.cc
  26. 50
      table/block_based/filter_policy.cc
  27. 31
      table/table_properties.cc
  28. 7
      tools/db_bench_tool.cc
  29. 1
      tools/db_crashtest.py
  30. 18
      util/bloom_test.cc

@ -12,6 +12,7 @@
### New Features
* For db_bench when --seed=0 or --seed is not set then it uses the current time as the seed value. Previously it used the value 1000.
* For db_bench when --benchmark lists multiple tests and each test uses a seed for a RNG then the seeds across tests will no longer be repeated.
* Added an option to dynamically charge an updating estimated memory usage of block-based table reader to block cache if block cache available. To enable this feature, set `BlockBasedTableOptions::reserve_table_reader_memory = true`.
### Behavior changes
* Disallow usage of commit-time-write-batch for write-prepared/write-unprepared transactions if TransactionOptions::use_only_the_last_commit_time_batch_for_recovery is false to prevent two (or more) uncommitted versions of the same key in the database. Otherwise, bottommost compaction may violate the internal key uniqueness invariant of SSTs if the sequence numbers of both internal keys are zeroed out (#9794).

@ -21,6 +21,7 @@ std::array<const char*, kNumCacheEntryRoles> kCacheEntryRoleToCamelString{{
"WriteBuffer",
"CompressionDictionaryBuildingBuffer",
"FilterConstruction",
"BlockBasedTableReader",
"Misc",
}};
@ -34,6 +35,7 @@ std::array<const char*, kNumCacheEntryRoles> kCacheEntryRoleToHyphenString{{
"write-buffer",
"compression-dictionary-building-buffer",
"filter-construction",
"block-based-table-reader",
"misc",
}};

@ -39,6 +39,9 @@ enum class CacheEntryRole {
// Filter reservations to account for
// (new) bloom and ribbon filter construction's memory usage
kFilterConstruction,
// BlockBasedTableReader reservations to account for
// its memory usage
kBlockBasedTableReader,
// Default bucket, for miscellaneous cache entries. Do not use for
// entries that could potentially add up to large usage.
kMisc,

@ -17,12 +17,30 @@
#include "rocksdb/cache.h"
#include "rocksdb/slice.h"
#include "rocksdb/status.h"
#include "table/block_based/block_based_table_reader.h"
#include "table/block_based/reader_common.h"
#include "util/coding.h"
namespace ROCKSDB_NAMESPACE {
CacheReservationManager::CacheReservationManager(std::shared_ptr<Cache> cache,
bool delayed_decrease)
template <CacheEntryRole R>
CacheReservationManagerImpl<R>::CacheReservationHandle::CacheReservationHandle(
std::size_t incremental_memory_used,
std::shared_ptr<CacheReservationManagerImpl> cache_res_mgr)
: incremental_memory_used_(incremental_memory_used) {
assert(cache_res_mgr);
cache_res_mgr_ = cache_res_mgr;
}
template <CacheEntryRole R>
CacheReservationManagerImpl<
R>::CacheReservationHandle::~CacheReservationHandle() {
Status s = cache_res_mgr_->ReleaseCacheReservation(incremental_memory_used_);
s.PermitUncheckedError();
}
template <CacheEntryRole R>
CacheReservationManagerImpl<R>::CacheReservationManagerImpl(
std::shared_ptr<Cache> cache, bool delayed_decrease)
: delayed_decrease_(delayed_decrease),
cache_allocated_size_(0),
memory_used_(0) {
@ -30,14 +48,15 @@ CacheReservationManager::CacheReservationManager(std::shared_ptr<Cache> cache,
cache_ = cache;
}
CacheReservationManager::~CacheReservationManager() {
template <CacheEntryRole R>
CacheReservationManagerImpl<R>::~CacheReservationManagerImpl() {
for (auto* handle : dummy_handles_) {
cache_->Release(handle, true);
}
}
template <CacheEntryRole R>
Status CacheReservationManager::UpdateCacheReservation(
Status CacheReservationManagerImpl<R>::UpdateCacheReservation(
std::size_t new_mem_used) {
memory_used_ = new_mem_used;
std::size_t cur_cache_allocated_size =
@ -45,7 +64,7 @@ Status CacheReservationManager::UpdateCacheReservation(
if (new_mem_used == cur_cache_allocated_size) {
return Status::OK();
} else if (new_mem_used > cur_cache_allocated_size) {
Status s = IncreaseCacheReservation<R>(new_mem_used);
Status s = IncreaseCacheReservation(new_mem_used);
return s;
} else {
// In delayed decrease mode, we don't decrease cache reservation
@ -66,41 +85,32 @@ Status CacheReservationManager::UpdateCacheReservation(
}
}
// Explicitly instantiate templates for "CacheEntryRole" values we use.
// This makes it possible to keep the template definitions in the .cc file.
template Status CacheReservationManager::UpdateCacheReservation<
CacheEntryRole::kWriteBuffer>(std::size_t new_mem_used);
template Status CacheReservationManager::UpdateCacheReservation<
CacheEntryRole::kCompressionDictionaryBuildingBuffer>(
std::size_t new_mem_used);
// For cache reservation manager unit tests
template Status CacheReservationManager::UpdateCacheReservation<
CacheEntryRole::kMisc>(std::size_t new_mem_used);
template <CacheEntryRole R>
Status CacheReservationManager::MakeCacheReservation(
Status CacheReservationManagerImpl<R>::MakeCacheReservation(
std::size_t incremental_memory_used,
std::unique_ptr<CacheReservationHandle<R>>* handle) {
assert(handle != nullptr);
std::unique_ptr<CacheReservationManager::CacheReservationHandle>* handle) {
assert(handle);
Status s =
UpdateCacheReservation<R>(GetTotalMemoryUsed() + incremental_memory_used);
(*handle).reset(new CacheReservationHandle<R>(incremental_memory_used,
shared_from_this()));
UpdateCacheReservation(GetTotalMemoryUsed() + incremental_memory_used);
(*handle).reset(new CacheReservationManagerImpl::CacheReservationHandle(
incremental_memory_used,
std::enable_shared_from_this<
CacheReservationManagerImpl<R>>::shared_from_this()));
return s;
}
template Status
CacheReservationManager::MakeCacheReservation<CacheEntryRole::kMisc>(
std::size_t incremental_memory_used,
std::unique_ptr<CacheReservationHandle<CacheEntryRole::kMisc>>* handle);
template Status CacheReservationManager::MakeCacheReservation<
CacheEntryRole::kFilterConstruction>(
std::size_t incremental_memory_used,
std::unique_ptr<
CacheReservationHandle<CacheEntryRole::kFilterConstruction>>* handle);
template <CacheEntryRole R>
Status CacheReservationManagerImpl<R>::ReleaseCacheReservation(
std::size_t incremental_memory_used) {
assert(GetTotalMemoryUsed() >= incremental_memory_used);
std::size_t updated_total_mem_used =
GetTotalMemoryUsed() - incremental_memory_used;
Status s = UpdateCacheReservation(updated_total_mem_used);
return s;
}
template <CacheEntryRole R>
Status CacheReservationManager::IncreaseCacheReservation(
Status CacheReservationManagerImpl<R>::IncreaseCacheReservation(
std::size_t new_mem_used) {
Status return_status = Status::OK();
while (new_mem_used > cache_allocated_size_.load(std::memory_order_relaxed)) {
@ -118,7 +128,8 @@ Status CacheReservationManager::IncreaseCacheReservation(
return return_status;
}
Status CacheReservationManager::DecreaseCacheReservation(
template <CacheEntryRole R>
Status CacheReservationManagerImpl<R>::DecreaseCacheReservation(
std::size_t new_mem_used) {
Status return_status = Status::OK();
@ -137,15 +148,18 @@ Status CacheReservationManager::DecreaseCacheReservation(
return return_status;
}
std::size_t CacheReservationManager::GetTotalReservedCacheSize() {
template <CacheEntryRole R>
std::size_t CacheReservationManagerImpl<R>::GetTotalReservedCacheSize() {
return cache_allocated_size_.load(std::memory_order_relaxed);
}
std::size_t CacheReservationManager::GetTotalMemoryUsed() {
template <CacheEntryRole R>
std::size_t CacheReservationManagerImpl<R>::GetTotalMemoryUsed() {
return memory_used_;
}
Slice CacheReservationManager::GetNextCacheKey() {
template <CacheEntryRole R>
Slice CacheReservationManagerImpl<R>::GetNextCacheKey() {
// Calling this function will have the side-effect of changing the
// underlying cache_key_ that is shared among other keys generated from this
// fucntion. Therefore please make sure the previous keys are saved/copied
@ -155,34 +169,15 @@ Slice CacheReservationManager::GetNextCacheKey() {
}
template <CacheEntryRole R>
Cache::DeleterFn CacheReservationManager::TEST_GetNoopDeleterForRole() {
Cache::DeleterFn CacheReservationManagerImpl<R>::TEST_GetNoopDeleterForRole() {
return GetNoopDeleterForRole<R>();
}
template Cache::DeleterFn CacheReservationManager::TEST_GetNoopDeleterForRole<
CacheEntryRole::kFilterConstruction>();
template <CacheEntryRole R>
CacheReservationHandle<R>::CacheReservationHandle(
std::size_t incremental_memory_used,
std::shared_ptr<CacheReservationManager> cache_res_mgr)
: incremental_memory_used_(incremental_memory_used) {
assert(cache_res_mgr != nullptr);
cache_res_mgr_ = cache_res_mgr;
}
template <CacheEntryRole R>
CacheReservationHandle<R>::~CacheReservationHandle() {
assert(cache_res_mgr_ != nullptr);
assert(cache_res_mgr_->GetTotalMemoryUsed() >= incremental_memory_used_);
Status s = cache_res_mgr_->UpdateCacheReservation<R>(
cache_res_mgr_->GetTotalMemoryUsed() - incremental_memory_used_);
s.PermitUncheckedError();
}
// Explicitly instantiate templates for "CacheEntryRole" values we use.
// This makes it possible to keep the template definitions in the .cc file.
template class CacheReservationHandle<CacheEntryRole::kMisc>;
template class CacheReservationHandle<CacheEntryRole::kFilterConstruction>;
template class CacheReservationManagerImpl<
CacheEntryRole::kBlockBasedTableReader>;
template class CacheReservationManagerImpl<
CacheEntryRole::kCompressionDictionaryBuildingBuffer>;
template class CacheReservationManagerImpl<CacheEntryRole::kFilterConstruction>;
template class CacheReservationManagerImpl<CacheEntryRole::kMisc>;
template class CacheReservationManagerImpl<CacheEntryRole::kWriteBuffer>;
} // namespace ROCKSDB_NAMESPACE

@ -13,54 +13,90 @@
#include <cstddef>
#include <cstdint>
#include <memory>
#include <mutex>
#include <vector>
#include "cache/cache_entry_roles.h"
#include "cache/cache_key.h"
#include "rocksdb/cache.h"
#include "rocksdb/slice.h"
#include "rocksdb/status.h"
#include "table/block_based/block_based_table_reader.h"
#include "util/coding.h"
namespace ROCKSDB_NAMESPACE {
// CacheReservationManager is an interface for reserving cache space for the
// memory used
class CacheReservationManager {
public:
// CacheReservationHandle is for managing the lifetime of a cache reservation
// for an incremental amount of memory used (i.e, incremental_memory_used)
class CacheReservationHandle {
public:
virtual ~CacheReservationHandle() {}
};
virtual ~CacheReservationManager() {}
virtual Status UpdateCacheReservation(std::size_t new_memory_used) = 0;
virtual Status MakeCacheReservation(
std::size_t incremental_memory_used,
std::unique_ptr<CacheReservationManager::CacheReservationHandle>
*handle) = 0;
virtual std::size_t GetTotalReservedCacheSize() = 0;
virtual std::size_t GetTotalMemoryUsed() = 0;
};
template <CacheEntryRole R>
class CacheReservationHandle;
// CacheReservationManager is for reserving cache space for the memory used
// through inserting/releasing dummy entries in the cache.
// CacheReservationManagerImpl implements interface CacheReservationManager
// for reserving cache space for the memory used by inserting/releasing dummy
// entries in the cache.
//
// This class is NOT thread-safe, except that GetTotalReservedCacheSize()
// can be called without external synchronization.
class CacheReservationManager
: public std::enable_shared_from_this<CacheReservationManager> {
template <CacheEntryRole R>
class CacheReservationManagerImpl
: public CacheReservationManager,
public std::enable_shared_from_this<CacheReservationManagerImpl<R>> {
public:
// Construct a CacheReservationManager
class CacheReservationHandle
: public CacheReservationManager::CacheReservationHandle {
public:
CacheReservationHandle(
std::size_t incremental_memory_used,
std::shared_ptr<CacheReservationManagerImpl> cache_res_mgr);
~CacheReservationHandle() override;
private:
std::size_t incremental_memory_used_;
std::shared_ptr<CacheReservationManagerImpl> cache_res_mgr_;
};
// Construct a CacheReservationManagerImpl
// @param cache The cache where dummy entries are inserted and released for
// reserving cache space
// @param delayed_decrease If set true, then dummy entries won't be released
// immediately when memory usage decreases.
// immediately when memory usage decreases.
// Instead, it will be released when the memory usage
// decreases to 3/4 of what we have reserved so far.
// This is for saving some future dummy entry
// insertion when memory usage increases are likely to
// happen in the near future.
explicit CacheReservationManager(std::shared_ptr<Cache> cache,
bool delayed_decrease = false);
//
// REQUIRED: cache is not nullptr
explicit CacheReservationManagerImpl(std::shared_ptr<Cache> cache,
bool delayed_decrease = false);
// no copy constructor, copy assignment, move constructor, move assignment
CacheReservationManager(const CacheReservationManager &) = delete;
CacheReservationManager &operator=(const CacheReservationManager &) = delete;
CacheReservationManager(CacheReservationManager &&) = delete;
CacheReservationManager &operator=(CacheReservationManager &&) = delete;
~CacheReservationManager();
CacheReservationManagerImpl(const CacheReservationManagerImpl &) = delete;
CacheReservationManagerImpl &operator=(const CacheReservationManagerImpl &) =
delete;
CacheReservationManagerImpl(CacheReservationManagerImpl &&) = delete;
CacheReservationManagerImpl &operator=(CacheReservationManagerImpl &&) =
delete;
template <CacheEntryRole R>
~CacheReservationManagerImpl() override;
// One of the two ways of reserving/releasing cache,
// see CacheReservationManager::MakeCacheReservation() for the other.
// Use ONLY one of them to prevent unexpected behavior.
// One of the two ways of reserving/releasing cache space,
// see MakeCacheReservation() for the other.
//
// Use ONLY one of these two ways to prevent unexpected behavior.
//
// Insert and release dummy entries in the cache to
// match the size of total dummy entries with the least multiple of
@ -90,11 +126,13 @@ class CacheReservationManager
// Otherwise, it returns the first non-ok status;
// On releasing dummy entries, it always returns Status::OK().
// On keeping dummy entries the same, it always returns Status::OK().
Status UpdateCacheReservation(std::size_t new_memory_used);
Status UpdateCacheReservation(std::size_t new_memory_used) override;
// One of the two ways of reserving/releasing cache,
// see CacheReservationManager::UpdateCacheReservation() for the other.
// Use ONLY one of them to prevent unexpected behavior.
// One of the two ways of reserving cache space and releasing is done through
// destruction of CacheReservationHandle.
// See UpdateCacheReservation() for the other way.
//
// Use ONLY one of these two ways to prevent unexpected behavior.
//
// Insert dummy entries in the cache for the incremental memory usage
// to match the size of total dummy entries with the least multiple of
@ -118,21 +156,19 @@ class CacheReservationManager
// calling MakeCacheReservation() is needed if you want
// GetTotalMemoryUsed() indeed returns the latest memory used.
//
// @param handle An pointer to std::unique_ptr<CacheReservationHandle<R>> that
// manages the lifetime of the handle and its cache reservation.
// @param handle An pointer to std::unique_ptr<CacheReservationHandle> that
// manages the lifetime of the cache reservation represented by the
// handle.
//
// @return It returns Status::OK() if all dummy
// entry insertions succeed.
// Otherwise, it returns the first non-ok status;
//
// REQUIRES: handle != nullptr
// REQUIRES: The CacheReservationManager object is NOT managed by
// std::unique_ptr as CacheReservationHandle needs to
// shares ownership to the CacheReservationManager object.
template <CacheEntryRole R>
Status MakeCacheReservation(
std::size_t incremental_memory_used,
std::unique_ptr<CacheReservationHandle<R>> *handle);
std::unique_ptr<CacheReservationManager::CacheReservationHandle> *handle)
override;
// Return the size of the cache (which is a multiple of kSizeDummyEntry)
// successfully reserved by calling UpdateCacheReservation().
@ -142,25 +178,25 @@ class CacheReservationManager
// smaller number than the actual reserved cache size due to
// the returned number will always be a multiple of kSizeDummyEntry
// and cache full might happen in the middle of inserting a dummy entry.
std::size_t GetTotalReservedCacheSize();
std::size_t GetTotalReservedCacheSize() override;
// Return the latest total memory used indicated by the most recent call of
// UpdateCacheReservation(std::size_t new_memory_used);
std::size_t GetTotalMemoryUsed();
std::size_t GetTotalMemoryUsed() override;
static constexpr std::size_t GetDummyEntrySize() { return kSizeDummyEntry; }
// For testing only - it is to help ensure the NoopDeleterForRole<R>
// accessed from CacheReservationManager and the one accessed from the test
// are from the same translation units
template <CacheEntryRole R>
// accessed from CacheReservationManagerImpl and the one accessed from the
// test are from the same translation units
static Cache::DeleterFn TEST_GetNoopDeleterForRole();
private:
static constexpr std::size_t kSizeDummyEntry = 256 * 1024;
Slice GetNextCacheKey();
template <CacheEntryRole R>
Status ReleaseCacheReservation(std::size_t incremental_memory_used);
Status IncreaseCacheReservation(std::size_t new_mem_used);
Status DecreaseCacheReservation(std::size_t new_mem_used);
@ -172,20 +208,81 @@ class CacheReservationManager
CacheKey cache_key_;
};
// CacheReservationHandle is for managing the lifetime of a cache reservation
// This class is NOT thread-safe
template <CacheEntryRole R>
class CacheReservationHandle {
class ConcurrentCacheReservationManager
: public CacheReservationManager,
public std::enable_shared_from_this<ConcurrentCacheReservationManager> {
public:
// REQUIRES: cache_res_mgr != nullptr
explicit CacheReservationHandle(
std::size_t incremental_memory_used,
std::shared_ptr<CacheReservationManager> cache_res_mgr);
class CacheReservationHandle
: public CacheReservationManager::CacheReservationHandle {
public:
CacheReservationHandle(
std::shared_ptr<ConcurrentCacheReservationManager> cache_res_mgr,
std::unique_ptr<CacheReservationManager::CacheReservationHandle>
cache_res_handle) {
assert(cache_res_mgr && cache_res_handle);
cache_res_mgr_ = cache_res_mgr;
cache_res_handle_ = std::move(cache_res_handle);
}
~CacheReservationHandle();
~CacheReservationHandle() override {
std::lock_guard<std::mutex> lock(cache_res_mgr_->cache_res_mgr_mu_);
cache_res_handle_.reset();
}
private:
std::shared_ptr<ConcurrentCacheReservationManager> cache_res_mgr_;
std::unique_ptr<CacheReservationManager::CacheReservationHandle>
cache_res_handle_;
};
explicit ConcurrentCacheReservationManager(
std::shared_ptr<CacheReservationManager> cache_res_mgr) {
cache_res_mgr_ = std::move(cache_res_mgr);
}
ConcurrentCacheReservationManager(const ConcurrentCacheReservationManager &) =
delete;
ConcurrentCacheReservationManager &operator=(
const ConcurrentCacheReservationManager &) = delete;
ConcurrentCacheReservationManager(ConcurrentCacheReservationManager &&) =
delete;
ConcurrentCacheReservationManager &operator=(
ConcurrentCacheReservationManager &&) = delete;
~ConcurrentCacheReservationManager() override {}
inline Status UpdateCacheReservation(std::size_t new_memory_used) override {
std::lock_guard<std::mutex> lock(cache_res_mgr_mu_);
return cache_res_mgr_->UpdateCacheReservation(new_memory_used);
}
inline Status MakeCacheReservation(
std::size_t incremental_memory_used,
std::unique_ptr<CacheReservationManager::CacheReservationHandle> *handle)
override {
std::unique_ptr<CacheReservationManager::CacheReservationHandle>
wrapped_handle;
Status s;
{
std::lock_guard<std::mutex> lock(cache_res_mgr_mu_);
s = cache_res_mgr_->MakeCacheReservation(incremental_memory_used,
&wrapped_handle);
}
(*handle).reset(
new ConcurrentCacheReservationManager::CacheReservationHandle(
std::enable_shared_from_this<
ConcurrentCacheReservationManager>::shared_from_this(),
std::move(wrapped_handle)));
return s;
}
inline std::size_t GetTotalReservedCacheSize() override {
return cache_res_mgr_->GetTotalReservedCacheSize();
}
inline std::size_t GetTotalMemoryUsed() override {
std::lock_guard<std::mutex> lock(cache_res_mgr_mu_);
return cache_res_mgr_->GetTotalMemoryUsed();
}
private:
std::size_t incremental_memory_used_;
std::mutex cache_res_mgr_mu_;
std::shared_ptr<CacheReservationManager> cache_res_mgr_;
};
} // namespace ROCKSDB_NAMESPACE

@ -15,7 +15,6 @@
#include "cache/cache_entry_roles.h"
#include "rocksdb/cache.h"
#include "rocksdb/slice.h"
#include "table/block_based/block_based_table_reader.h"
#include "test_util/testharness.h"
#include "util/coding.h"
@ -23,25 +22,24 @@ namespace ROCKSDB_NAMESPACE {
class CacheReservationManagerTest : public ::testing::Test {
protected:
static constexpr std::size_t kSizeDummyEntry =
CacheReservationManager::GetDummyEntrySize();
CacheReservationManagerImpl<CacheEntryRole::kMisc>::GetDummyEntrySize();
static constexpr std::size_t kCacheCapacity = 4096 * kSizeDummyEntry;
static constexpr int kNumShardBits = 0; // 2^0 shard
static constexpr std::size_t kMetaDataChargeOverhead = 10000;
std::shared_ptr<Cache> cache = NewLRUCache(kCacheCapacity, kNumShardBits);
std::unique_ptr<CacheReservationManager> test_cache_rev_mng;
std::shared_ptr<CacheReservationManager> test_cache_rev_mng;
CacheReservationManagerTest() {
test_cache_rev_mng.reset(new CacheReservationManager(cache));
test_cache_rev_mng =
std::make_shared<CacheReservationManagerImpl<CacheEntryRole::kMisc>>(
cache);
}
};
TEST_F(CacheReservationManagerTest, GenerateCacheKey) {
std::size_t new_mem_used = 1 * kSizeDummyEntry;
Status s =
test_cache_rev_mng
->UpdateCacheReservation<ROCKSDB_NAMESPACE::CacheEntryRole::kMisc>(
new_mem_used);
Status s = test_cache_rev_mng->UpdateCacheReservation(new_mem_used);
ASSERT_EQ(s, Status::OK());
ASSERT_GE(cache->GetPinnedUsage(), 1 * kSizeDummyEntry);
ASSERT_LT(cache->GetPinnedUsage(),
@ -66,10 +64,7 @@ TEST_F(CacheReservationManagerTest, GenerateCacheKey) {
TEST_F(CacheReservationManagerTest, KeepCacheReservationTheSame) {
std::size_t new_mem_used = 1 * kSizeDummyEntry;
Status s =
test_cache_rev_mng
->UpdateCacheReservation<ROCKSDB_NAMESPACE::CacheEntryRole::kMisc>(
new_mem_used);
Status s = test_cache_rev_mng->UpdateCacheReservation(new_mem_used);
ASSERT_EQ(s, Status::OK());
ASSERT_EQ(test_cache_rev_mng->GetTotalReservedCacheSize(),
1 * kSizeDummyEntry);
@ -79,9 +74,7 @@ TEST_F(CacheReservationManagerTest, KeepCacheReservationTheSame) {
ASSERT_LT(initial_pinned_usage,
1 * kSizeDummyEntry + kMetaDataChargeOverhead);
s = test_cache_rev_mng
->UpdateCacheReservation<ROCKSDB_NAMESPACE::CacheEntryRole::kMisc>(
new_mem_used);
s = test_cache_rev_mng->UpdateCacheReservation(new_mem_used);
EXPECT_EQ(s, Status::OK())
<< "Failed to keep cache reservation the same when new_mem_used equals "
"to current cache reservation";
@ -100,10 +93,7 @@ TEST_F(CacheReservationManagerTest, KeepCacheReservationTheSame) {
TEST_F(CacheReservationManagerTest,
IncreaseCacheReservationByMultiplesOfDummyEntrySize) {
std::size_t new_mem_used = 2 * kSizeDummyEntry;
Status s =
test_cache_rev_mng
->UpdateCacheReservation<ROCKSDB_NAMESPACE::CacheEntryRole::kMisc>(
new_mem_used);
Status s = test_cache_rev_mng->UpdateCacheReservation(new_mem_used);
EXPECT_EQ(s, Status::OK())
<< "Failed to increase cache reservation correctly";
EXPECT_EQ(test_cache_rev_mng->GetTotalReservedCacheSize(),
@ -121,10 +111,7 @@ TEST_F(CacheReservationManagerTest,
TEST_F(CacheReservationManagerTest,
IncreaseCacheReservationNotByMultiplesOfDummyEntrySize) {
std::size_t new_mem_used = 2 * kSizeDummyEntry + kSizeDummyEntry / 2;
Status s =
test_cache_rev_mng
->UpdateCacheReservation<ROCKSDB_NAMESPACE::CacheEntryRole::kMisc>(
new_mem_used);
Status s = test_cache_rev_mng->UpdateCacheReservation(new_mem_used);
EXPECT_EQ(s, Status::OK())
<< "Failed to increase cache reservation correctly";
EXPECT_EQ(test_cache_rev_mng->GetTotalReservedCacheSize(),
@ -143,7 +130,7 @@ TEST(CacheReservationManagerIncreaseReservcationOnFullCacheTest,
IncreaseCacheReservationOnFullCache) {
;
constexpr std::size_t kSizeDummyEntry =
CacheReservationManager::GetDummyEntrySize();
CacheReservationManagerImpl<CacheEntryRole::kMisc>::GetDummyEntrySize();
constexpr std::size_t kSmallCacheCapacity = 4 * kSizeDummyEntry;
constexpr std::size_t kBigCacheCapacity = 4096 * kSizeDummyEntry;
constexpr std::size_t kMetaDataChargeOverhead = 10000;
@ -153,14 +140,12 @@ TEST(CacheReservationManagerIncreaseReservcationOnFullCacheTest,
lo.num_shard_bits = 0; // 2^0 shard
lo.strict_capacity_limit = true;
std::shared_ptr<Cache> cache = NewLRUCache(lo);
std::unique_ptr<CacheReservationManager> test_cache_rev_mng(
new CacheReservationManager(cache));
std::shared_ptr<CacheReservationManager> test_cache_rev_mng =
std::make_shared<CacheReservationManagerImpl<CacheEntryRole::kMisc>>(
cache);
std::size_t new_mem_used = kSmallCacheCapacity + 1;
Status s =
test_cache_rev_mng
->UpdateCacheReservation<ROCKSDB_NAMESPACE::CacheEntryRole::kMisc>(
new_mem_used);
Status s = test_cache_rev_mng->UpdateCacheReservation(new_mem_used);
EXPECT_EQ(s, Status::Incomplete())
<< "Failed to return status to indicate failure of dummy entry insertion "
"during cache reservation on full cache";
@ -183,9 +168,7 @@ TEST(CacheReservationManagerIncreaseReservcationOnFullCacheTest,
"encountering cache resevation failure due to full cache";
new_mem_used = kSmallCacheCapacity / 2; // 2 dummy entries
s = test_cache_rev_mng
->UpdateCacheReservation<ROCKSDB_NAMESPACE::CacheEntryRole::kMisc>(
new_mem_used);
s = test_cache_rev_mng->UpdateCacheReservation(new_mem_used);
EXPECT_EQ(s, Status::OK())
<< "Failed to decrease cache reservation after encountering cache "
"reservation failure due to full cache";
@ -207,9 +190,7 @@ TEST(CacheReservationManagerIncreaseReservcationOnFullCacheTest,
// Create cache full again for subsequent tests
new_mem_used = kSmallCacheCapacity + 1;
s = test_cache_rev_mng
->UpdateCacheReservation<ROCKSDB_NAMESPACE::CacheEntryRole::kMisc>(
new_mem_used);
s = test_cache_rev_mng->UpdateCacheReservation(new_mem_used);
EXPECT_EQ(s, Status::Incomplete())
<< "Failed to return status to indicate failure of dummy entry insertion "
"during cache reservation on full cache";
@ -235,9 +216,7 @@ TEST(CacheReservationManagerIncreaseReservcationOnFullCacheTest,
// succeed
cache->SetCapacity(kBigCacheCapacity);
new_mem_used = kSmallCacheCapacity + 1;
s = test_cache_rev_mng
->UpdateCacheReservation<ROCKSDB_NAMESPACE::CacheEntryRole::kMisc>(
new_mem_used);
s = test_cache_rev_mng->UpdateCacheReservation(new_mem_used);
EXPECT_EQ(s, Status::OK())
<< "Failed to increase cache reservation after increasing cache capacity "
"and mitigating cache full error";
@ -259,10 +238,7 @@ TEST(CacheReservationManagerIncreaseReservcationOnFullCacheTest,
TEST_F(CacheReservationManagerTest,
DecreaseCacheReservationByMultiplesOfDummyEntrySize) {
std::size_t new_mem_used = 2 * kSizeDummyEntry;
Status s =
test_cache_rev_mng
->UpdateCacheReservation<ROCKSDB_NAMESPACE::CacheEntryRole::kMisc>(
new_mem_used);
Status s = test_cache_rev_mng->UpdateCacheReservation(new_mem_used);
ASSERT_EQ(s, Status::OK());
ASSERT_EQ(test_cache_rev_mng->GetTotalReservedCacheSize(),
2 * kSizeDummyEntry);
@ -272,9 +248,7 @@ TEST_F(CacheReservationManagerTest,
2 * kSizeDummyEntry + kMetaDataChargeOverhead);
new_mem_used = 1 * kSizeDummyEntry;
s = test_cache_rev_mng
->UpdateCacheReservation<ROCKSDB_NAMESPACE::CacheEntryRole::kMisc>(
new_mem_used);
s = test_cache_rev_mng->UpdateCacheReservation(new_mem_used);
EXPECT_EQ(s, Status::OK())
<< "Failed to decrease cache reservation correctly";
EXPECT_EQ(test_cache_rev_mng->GetTotalReservedCacheSize(),
@ -292,10 +266,7 @@ TEST_F(CacheReservationManagerTest,
TEST_F(CacheReservationManagerTest,
DecreaseCacheReservationNotByMultiplesOfDummyEntrySize) {
std::size_t new_mem_used = 2 * kSizeDummyEntry;
Status s =
test_cache_rev_mng
->UpdateCacheReservation<ROCKSDB_NAMESPACE::CacheEntryRole::kMisc>(
new_mem_used);
Status s = test_cache_rev_mng->UpdateCacheReservation(new_mem_used);
ASSERT_EQ(s, Status::OK());
ASSERT_EQ(test_cache_rev_mng->GetTotalReservedCacheSize(),
2 * kSizeDummyEntry);
@ -305,9 +276,7 @@ TEST_F(CacheReservationManagerTest,
2 * kSizeDummyEntry + kMetaDataChargeOverhead);
new_mem_used = kSizeDummyEntry / 2;
s = test_cache_rev_mng
->UpdateCacheReservation<ROCKSDB_NAMESPACE::CacheEntryRole::kMisc>(
new_mem_used);
s = test_cache_rev_mng->UpdateCacheReservation(new_mem_used);
EXPECT_EQ(s, Status::OK())
<< "Failed to decrease cache reservation correctly";
EXPECT_EQ(test_cache_rev_mng->GetTotalReservedCacheSize(),
@ -325,7 +294,7 @@ TEST_F(CacheReservationManagerTest,
TEST(CacheReservationManagerWithDelayedDecreaseTest,
DecreaseCacheReservationWithDelayedDecrease) {
constexpr std::size_t kSizeDummyEntry =
CacheReservationManager::GetDummyEntrySize();
CacheReservationManagerImpl<CacheEntryRole::kMisc>::GetDummyEntrySize();
constexpr std::size_t kCacheCapacity = 4096 * kSizeDummyEntry;
constexpr std::size_t kMetaDataChargeOverhead = 10000;
@ -333,14 +302,12 @@ TEST(CacheReservationManagerWithDelayedDecreaseTest,
lo.capacity = kCacheCapacity;
lo.num_shard_bits = 0;
std::shared_ptr<Cache> cache = NewLRUCache(lo);
std::unique_ptr<CacheReservationManager> test_cache_rev_mng(
new CacheReservationManager(cache, true /* delayed_decrease */));
std::shared_ptr<CacheReservationManager> test_cache_rev_mng =
std::make_shared<CacheReservationManagerImpl<CacheEntryRole::kMisc>>(
cache, true /* delayed_decrease */);
std::size_t new_mem_used = 8 * kSizeDummyEntry;
Status s =
test_cache_rev_mng
->UpdateCacheReservation<ROCKSDB_NAMESPACE::CacheEntryRole::kMisc>(
new_mem_used);
Status s = test_cache_rev_mng->UpdateCacheReservation(new_mem_used);
ASSERT_EQ(s, Status::OK());
ASSERT_EQ(test_cache_rev_mng->GetTotalReservedCacheSize(),
8 * kSizeDummyEntry);
@ -351,9 +318,7 @@ TEST(CacheReservationManagerWithDelayedDecreaseTest,
8 * kSizeDummyEntry + kMetaDataChargeOverhead);
new_mem_used = 6 * kSizeDummyEntry;
s = test_cache_rev_mng
->UpdateCacheReservation<ROCKSDB_NAMESPACE::CacheEntryRole::kMisc>(
new_mem_used);
s = test_cache_rev_mng->UpdateCacheReservation(new_mem_used);
EXPECT_EQ(s, Status::OK()) << "Failed to delay decreasing cache reservation";
EXPECT_EQ(test_cache_rev_mng->GetTotalReservedCacheSize(),
8 * kSizeDummyEntry)
@ -365,9 +330,7 @@ TEST(CacheReservationManagerWithDelayedDecreaseTest,
<< "Failed to delay decreasing underlying dummy entries in cache";
new_mem_used = 7 * kSizeDummyEntry;
s = test_cache_rev_mng
->UpdateCacheReservation<ROCKSDB_NAMESPACE::CacheEntryRole::kMisc>(
new_mem_used);
s = test_cache_rev_mng->UpdateCacheReservation(new_mem_used);
EXPECT_EQ(s, Status::OK()) << "Failed to delay decreasing cache reservation";
EXPECT_EQ(test_cache_rev_mng->GetTotalReservedCacheSize(),
8 * kSizeDummyEntry)
@ -379,9 +342,7 @@ TEST(CacheReservationManagerWithDelayedDecreaseTest,
<< "Failed to delay decreasing underlying dummy entries in cache";
new_mem_used = 6 * kSizeDummyEntry - 1;
s = test_cache_rev_mng
->UpdateCacheReservation<ROCKSDB_NAMESPACE::CacheEntryRole::kMisc>(
new_mem_used);
s = test_cache_rev_mng->UpdateCacheReservation(new_mem_used);
EXPECT_EQ(s, Status::OK())
<< "Failed to decrease cache reservation correctly when new_mem_used < "
"GetTotalReservedCacheSize() * 3 / 4 on delayed decrease mode";
@ -405,7 +366,7 @@ TEST(CacheReservationManagerWithDelayedDecreaseTest,
TEST(CacheReservationManagerDestructorTest,
ReleaseRemainingDummyEntriesOnDestruction) {
constexpr std::size_t kSizeDummyEntry =
CacheReservationManager::GetDummyEntrySize();
CacheReservationManagerImpl<CacheEntryRole::kMisc>::GetDummyEntrySize();
constexpr std::size_t kCacheCapacity = 4096 * kSizeDummyEntry;
constexpr std::size_t kMetaDataChargeOverhead = 10000;
@ -414,13 +375,11 @@ TEST(CacheReservationManagerDestructorTest,
lo.num_shard_bits = 0;
std::shared_ptr<Cache> cache = NewLRUCache(lo);
{
std::unique_ptr<CacheReservationManager> test_cache_rev_mng(
new CacheReservationManager(cache));
std::shared_ptr<CacheReservationManager> test_cache_rev_mng =
std::make_shared<CacheReservationManagerImpl<CacheEntryRole::kMisc>>(
cache);
std::size_t new_mem_used = 1 * kSizeDummyEntry;
Status s =
test_cache_rev_mng
->UpdateCacheReservation<ROCKSDB_NAMESPACE::CacheEntryRole::kMisc>(
new_mem_used);
Status s = test_cache_rev_mng->UpdateCacheReservation(new_mem_used);
ASSERT_EQ(s, Status::OK());
ASSERT_GE(cache->GetPinnedUsage(), 1 * kSizeDummyEntry);
ASSERT_LT(cache->GetPinnedUsage(),
@ -442,18 +401,19 @@ TEST(CacheReservationHandleTest, HandleTest) {
std::shared_ptr<Cache> cache = NewLRUCache(lo);
std::shared_ptr<CacheReservationManager> test_cache_rev_mng(
std::make_shared<CacheReservationManager>(cache));
std::make_shared<CacheReservationManagerImpl<CacheEntryRole::kMisc>>(
cache));
std::size_t mem_used = 0;
const std::size_t incremental_mem_used_handle_1 = 1 * kSizeDummyEntry;
const std::size_t incremental_mem_used_handle_2 = 2 * kSizeDummyEntry;
std::unique_ptr<CacheReservationHandle<CacheEntryRole::kMisc>> handle_1,
std::unique_ptr<CacheReservationManager::CacheReservationHandle> handle_1,
handle_2;
// To test consecutive CacheReservationManager::MakeCacheReservation works
// correctly in terms of returning the handle as well as updating cache
// reservation and the latest total memory used
Status s = test_cache_rev_mng->MakeCacheReservation<CacheEntryRole::kMisc>(
Status s = test_cache_rev_mng->MakeCacheReservation(
incremental_mem_used_handle_1, &handle_1);
mem_used = mem_used + incremental_mem_used_handle_1;
ASSERT_EQ(s, Status::OK());
@ -463,8 +423,8 @@ TEST(CacheReservationHandleTest, HandleTest) {
EXPECT_GE(cache->GetPinnedUsage(), mem_used);
EXPECT_LT(cache->GetPinnedUsage(), mem_used + kMetaDataChargeOverhead);
s = test_cache_rev_mng->MakeCacheReservation<CacheEntryRole::kMisc>(
incremental_mem_used_handle_2, &handle_2);
s = test_cache_rev_mng->MakeCacheReservation(incremental_mem_used_handle_2,
&handle_2);
mem_used = mem_used + incremental_mem_used_handle_2;
ASSERT_EQ(s, Status::OK());
EXPECT_TRUE(handle_2 != nullptr);
@ -473,8 +433,9 @@ TEST(CacheReservationHandleTest, HandleTest) {
EXPECT_GE(cache->GetPinnedUsage(), mem_used);
EXPECT_LT(cache->GetPinnedUsage(), mem_used + kMetaDataChargeOverhead);
// To test CacheReservationHandle::~CacheReservationHandle() works correctly
// in releasing the cache reserved for the handle
// To test
// CacheReservationManager::CacheReservationHandle::~CacheReservationHandle()
// works correctly in releasing the cache reserved for the handle
handle_1.reset();
EXPECT_TRUE(handle_1 == nullptr);
mem_used = mem_used - incremental_mem_used_handle_1;

@ -952,8 +952,8 @@ class FilterConstructResPeakTrackingCache : public CacheWrapper {
const Cache::DeleterFn
FilterConstructResPeakTrackingCache::kNoopDeleterForFilterConstruction =
CacheReservationManager::TEST_GetNoopDeleterForRole<
CacheEntryRole::kFilterConstruction>();
CacheReservationManagerImpl<
CacheEntryRole::kFilterConstruction>::TEST_GetNoopDeleterForRole();
// To align with the type of hash entry being reserved in implementation.
using FilterConstructionReserveMemoryHash = uint64_t;
@ -983,7 +983,9 @@ class DBFilterConstructionReserveMemoryTestWithParam
// trigger at least 1 dummy entry reservation each for hash entries and
// final filter, we need a large number of keys to ensure we have at least
// two partitions.
num_key_ = 18 * CacheReservationManager::GetDummyEntrySize() /
num_key_ = 18 *
CacheReservationManagerImpl<
CacheEntryRole::kFilterConstruction>::GetDummyEntrySize() /
sizeof(FilterConstructionReserveMemoryHash);
} else if (policy_ == kFastLocalBloom) {
// For Bloom Filter + FullFilter case, since we design the num_key_ to
@ -993,7 +995,9 @@ class DBFilterConstructionReserveMemoryTestWithParam
// behavior and we don't need a large number of keys to verify we
// indeed charge the final filter for cache reservation, even though final
// filter is a lot smaller than hash entries.
num_key_ = 1 * CacheReservationManager::GetDummyEntrySize() /
num_key_ = 1 *
CacheReservationManagerImpl<
CacheEntryRole::kFilterConstruction>::GetDummyEntrySize() /
sizeof(FilterConstructionReserveMemoryHash);
} else {
// For Ribbon Filter + FullFilter case, we need a large enough number of
@ -1001,7 +1005,9 @@ class DBFilterConstructionReserveMemoryTestWithParam
// reservation will trigger at least another dummy entry (or equivalently
// to saying, causing another peak in cache reservation) as banding
// reservation might not be a multiple of dummy entry.
num_key_ = 12 * CacheReservationManager::GetDummyEntrySize() /
num_key_ = 12 *
CacheReservationManagerImpl<
CacheEntryRole::kFilterConstruction>::GetDummyEntrySize() /
sizeof(FilterConstructionReserveMemoryHash);
}
}
@ -1156,8 +1162,8 @@ TEST_P(DBFilterConstructionReserveMemoryTestWithParam, ReserveMemory) {
return;
}
const std::size_t kDummyEntrySize =
CacheReservationManager::GetDummyEntrySize();
const std::size_t kDummyEntrySize = CacheReservationManagerImpl<
CacheEntryRole::kFilterConstruction>::GetDummyEntrySize();
const std::size_t predicted_hash_entries_cache_res =
num_key * sizeof(FilterConstructionReserveMemoryHash);
@ -1345,9 +1351,12 @@ TEST_P(DBFilterConstructionReserveMemoryTestWithParam, ReserveMemory) {
*
*/
if (!partition_filters) {
ASSERT_GE(std::floor(1.0 * predicted_final_filter_cache_res /
CacheReservationManager::GetDummyEntrySize()),
1)
ASSERT_GE(
std::floor(
1.0 * predicted_final_filter_cache_res /
CacheReservationManagerImpl<
CacheEntryRole::kFilterConstruction>::GetDummyEntrySize()),
1)
<< "Final filter cache reservation too small for this test - please "
"increase the number of keys";
if (!detect_filter_construct_corruption) {

@ -11,7 +11,9 @@
#include "file/sst_file_manager_impl.h"
#include "port/port.h"
#include "port/stack_trace.h"
#include "rocksdb/cache.h"
#include "rocksdb/sst_file_manager.h"
#include "rocksdb/table.h"
#include "util/random.h"
namespace ROCKSDB_NAMESPACE {
@ -1393,6 +1395,69 @@ TEST_F(DBSSTTest, OpenDBWithInfiniteMaxOpenFiles) {
}
}
TEST_F(DBSSTTest, OpenDBWithInfiniteMaxOpenFilesSubjectToMemoryLimit) {
for (bool reserve_table_builder_memory : {true, false}) {
// Open DB with infinite max open files
// - First iteration use 1 thread to open files
// - Second iteration use 5 threads to open files
for (int iter = 0; iter < 2; iter++) {
Options options;
options.create_if_missing = true;
options.write_buffer_size = 100000;
options.disable_auto_compactions = true;
options.max_open_files = -1;
BlockBasedTableOptions table_options;
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
if (iter == 0) {
options.max_file_opening_threads = 1;
} else {
options.max_file_opening_threads = 5;
}
DestroyAndReopen(options);
// Create 5 Files in L0 (then move then to L2)
for (int i = 0; i < 5; i++) {
std::string k = "L2_" + Key(i);
ASSERT_OK(Put(k, k + std::string(1000, 'a')));
ASSERT_OK(Flush()) << i;
}
CompactRangeOptions compact_options;
compact_options.change_level = true;
compact_options.target_level = 2;
ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr));
// Create 5 Files in L0
for (int i = 0; i < 5; i++) {
std::string k = "L0_" + Key(i);
ASSERT_OK(Put(k, k + std::string(1000, 'a')));
ASSERT_OK(Flush());
}
Close();
table_options.reserve_table_reader_memory = reserve_table_builder_memory;
table_options.block_cache =
NewLRUCache(1024 /* capacity */, 0 /* num_shard_bits */,
true /* strict_capacity_limit */);
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
// Reopening the DB will try to load all existing files, conditionally
// subject to memory limit
Status s = TryReopen(options);
if (table_options.reserve_table_reader_memory) {
EXPECT_TRUE(s.IsMemoryLimit());
EXPECT_TRUE(s.ToString().find("memory limit based on cache capacity") !=
std::string::npos);
} else {
EXPECT_TRUE(s.ok());
ASSERT_EQ("5,0,5", FilesPerLevel(0));
}
}
}
}
TEST_F(DBSSTTest, GetTotalSstFilesSize) {
// We don't propagate oldest-key-time table property on compaction and
// just write 0 as default value. This affect the exact table size, since

@ -7,6 +7,7 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include <memory>
#include <unordered_set>
#include <vector>
@ -18,6 +19,7 @@
#include "rocksdb/utilities/table_properties_collectors.h"
#include "table/format.h"
#include "table/meta_blocks.h"
#include "table/table_properties_internal.h"
#include "test_util/testharness.h"
#include "test_util/testutil.h"
#include "util/random.h"

@ -134,6 +134,7 @@ DECLARE_int32(set_in_place_one_in);
DECLARE_int64(cache_size);
DECLARE_int32(cache_numshardbits);
DECLARE_bool(cache_index_and_filter_blocks);
DECLARE_bool(reserve_table_reader_memory);
DECLARE_int32(top_level_index_pinning);
DECLARE_int32(partition_pinning);
DECLARE_int32(unpartitioned_pinning);

@ -304,6 +304,11 @@ DEFINE_int32(cache_numshardbits, 6,
DEFINE_bool(cache_index_and_filter_blocks, false,
"True if indexes/filters should be cached in block cache.");
DEFINE_bool(reserve_table_reader_memory, false,
"A dynamically updating charge to block cache, loosely based on "
"the actual memory usage of table reader, will occur to account "
"the memory, if block cache available.");
DEFINE_int32(
top_level_index_pinning,
static_cast<int32_t>(ROCKSDB_NAMESPACE::PinningTier::kFallback),

@ -2319,6 +2319,8 @@ void StressTest::Open() {
block_based_options.block_cache_compressed = compressed_cache_;
block_based_options.checksum = checksum_type_e;
block_based_options.block_size = FLAGS_block_size;
block_based_options.reserve_table_reader_memory =
FLAGS_reserve_table_reader_memory;
block_based_options.format_version =
static_cast<uint32_t>(FLAGS_format_version);
block_based_options.index_block_restart_interval =

@ -551,6 +551,10 @@ struct DBOptions {
// on target_file_size_base and target_file_size_multiplier for level-based
// compaction. For universal-style compaction, you can usually set it to -1.
//
// A high value or -1 for this option can cause high memory usage.
// See BlockBasedTableOptions::reserve_table_reader_memory to constrain
// memory usage in case of block based table format.
//
// Default: -1
//
// Dynamically changeable through SetDBOptions() API.

@ -300,12 +300,35 @@ struct BlockBasedTableOptions {
//
// If additional temporary memory of Ribbon Filter uses up too much memory
// relative to the avaible space left in the block cache
// at some point (i.e, causing a cache full when strict_capacity_limit =
// true), construction will fall back to Bloom Filter.
// at some point (i.e, causing a cache full under
// LRUCacheOptions::strict_capacity_limit = true), construction will fall back
// to Bloom Filter.
//
// Default: false
bool reserve_table_builder_memory = false;
// If true, a dynamically updating charge to block cache, loosely based
// on the actual memory usage of table reader, will occur to account
// the memory, if block cache available.
//
// Charged memory usage includes:
// 1. Table properties
// 2. Index block/Filter block/Uncompression dictionary if stored in table
// reader (i.e, BlockBasedTableOptions::cache_index_and_filter_blocks ==
// false)
// 3. Some internal data structures
// 4. More to come...
//
// Note:
// If creation of a table reader uses up too much memory
// relative to the avaible space left in the block cache
// at some point (i.e, causing a cache full under
// LRUCacheOptions::strict_capacity_limit = true), such creation will fail
// with Status::MemoryLimit().
//
// Default: false
bool reserve_table_reader_memory = false;
// Note: currently this option requires kTwoLevelIndexSearch to be set as
// well.
// TODO(myabandeh): remove the note above once the limitation is lifted

@ -301,6 +301,10 @@ struct TableProperties {
// between tables. Keys match field names in this class instead
// of using full property names.
std::map<std::string, uint64_t> GetAggregatablePropertiesAsMap() const;
// Return the approximated memory usage of this TableProperties object,
// including memory used by the string properties and UserCollectedProperties
std::size_t ApproximateMemoryUsage() const;
};
// Extra properties

@ -158,7 +158,7 @@ class WriteBufferManager final {
std::atomic<size_t> memory_used_;
// Memory that hasn't been scheduled to free.
std::atomic<size_t> memory_active_;
std::unique_ptr<CacheReservationManager> cache_res_mgr_;
std::shared_ptr<CacheReservationManager> cache_res_mgr_;
// Protects cache_res_mgr_
std::mutex cache_res_mgr_mu_;

@ -9,6 +9,8 @@
#include "rocksdb/write_buffer_manager.h"
#include <memory>
#include "cache/cache_entry_roles.h"
#include "cache/cache_reservation_manager.h"
#include "db/db_impl/db_impl.h"
@ -31,8 +33,9 @@ WriteBufferManager::WriteBufferManager(size_t _buffer_size,
// Memtable's memory usage tends to fluctuate frequently
// therefore we set delayed_decrease = true to save some dummy entry
// insertion on memory increase right after memory decrease
cache_res_mgr_.reset(
new CacheReservationManager(cache, true /* delayed_decrease */));
cache_res_mgr_ = std::make_shared<
CacheReservationManagerImpl<CacheEntryRole::kWriteBuffer>>(
cache, true /* delayed_decrease */);
}
#else
(void)cache;
@ -75,9 +78,7 @@ void WriteBufferManager::ReserveMemWithCache(size_t mem) {
size_t new_mem_used = memory_used_.load(std::memory_order_relaxed) + mem;
memory_used_.store(new_mem_used, std::memory_order_relaxed);
Status s =
cache_res_mgr_->UpdateCacheReservation<CacheEntryRole::kWriteBuffer>(
new_mem_used);
Status s = cache_res_mgr_->UpdateCacheReservation(new_mem_used);
// We absorb the error since WriteBufferManager is not able to handle
// this failure properly. Ideallly we should prevent this allocation
@ -114,9 +115,7 @@ void WriteBufferManager::FreeMemWithCache(size_t mem) {
std::lock_guard<std::mutex> lock(cache_res_mgr_mu_);
size_t new_mem_used = memory_used_.load(std::memory_order_relaxed) - mem;
memory_used_.store(new_mem_used, std::memory_order_relaxed);
Status s =
cache_res_mgr_->UpdateCacheReservation<CacheEntryRole::kWriteBuffer>(
new_mem_used);
Status s = cache_res_mgr_->UpdateCacheReservation(new_mem_used);
// We absorb the error since WriteBufferManager is not able to handle
// this failure properly.

@ -189,6 +189,7 @@ TEST_F(OptionsSettableTest, BlockBasedTableOptionsAllFieldsSettable) {
"filter_policy=bloomfilter:4:true;whole_key_filtering=1;detect_filter_"
"construct_corruption=false;"
"reserve_table_builder_memory=false;"
"reserve_table_reader_memory=false;"
"format_version=1;"
"verify_compression=true;read_amp_bytes_per_bit=0;"
"enable_index_compression=false;"

@ -855,6 +855,7 @@ TEST_F(OptionsTest, GetBlockBasedTableOptionsFromString) {
"block_size_deviation=8;block_restart_interval=4;"
"format_version=5;whole_key_filtering=1;"
"reserve_table_builder_memory=true;"
"reserve_table_reader_memory=true;"
"filter_policy=bloomfilter:4.567:false;detect_filter_construct_"
"corruption=true;"
// A bug caused read_amp_bytes_per_bit to be a large integer in OPTIONS
@ -877,6 +878,7 @@ TEST_F(OptionsTest, GetBlockBasedTableOptionsFromString) {
ASSERT_EQ(new_opt.whole_key_filtering, true);
ASSERT_EQ(new_opt.detect_filter_construct_corruption, true);
ASSERT_EQ(new_opt.reserve_table_builder_memory, true);
ASSERT_EQ(new_opt.reserve_table_reader_memory, true);
ASSERT_TRUE(new_opt.filter_policy != nullptr);
auto bfp = new_opt.filter_policy->CheckedCast<BloomFilterPolicy>();
ASSERT_NE(bfp, nullptr);

@ -328,7 +328,7 @@ struct BlockBasedTableBuilder::Rep {
// `kBuffered` state is allowed only as long as the buffering of uncompressed
// data blocks (see `data_block_buffers`) does not exceed `buffer_limit`.
uint64_t buffer_limit;
std::unique_ptr<CacheReservationManager>
std::shared_ptr<CacheReservationManager>
compression_dict_buffer_cache_res_mgr;
const bool use_delta_encoding_for_index_values;
std::unique_ptr<FilterBlockBuilder> filter_builder;
@ -462,10 +462,12 @@ struct BlockBasedTableBuilder::Rep {
compression_opts.max_dict_buffer_bytes);
}
if (table_options.no_block_cache || table_options.block_cache == nullptr) {
compression_dict_buffer_cache_res_mgr.reset(nullptr);
compression_dict_buffer_cache_res_mgr = nullptr;
} else {
compression_dict_buffer_cache_res_mgr.reset(
new CacheReservationManager(table_options.block_cache));
compression_dict_buffer_cache_res_mgr =
std::make_shared<CacheReservationManagerImpl<
CacheEntryRole::kCompressionDictionaryBuildingBuffer>>(
table_options.block_cache);
}
for (uint32_t i = 0; i < compression_opts.parallel_threads; i++) {
compression_ctxs[i].reset(new CompressionContext(compression_type));
@ -946,8 +948,7 @@ void BlockBasedTableBuilder::Add(const Slice& key, const Slice& value) {
if (!exceeds_buffer_limit &&
r->compression_dict_buffer_cache_res_mgr != nullptr) {
Status s =
r->compression_dict_buffer_cache_res_mgr->UpdateCacheReservation<
CacheEntryRole::kCompressionDictionaryBuildingBuffer>(
r->compression_dict_buffer_cache_res_mgr->UpdateCacheReservation(
r->data_begin_offset);
exceeds_global_block_cache_limit = s.IsIncomplete();
}
@ -1975,8 +1976,7 @@ void BlockBasedTableBuilder::EnterUnbuffered() {
r->data_begin_offset = 0;
// Release all reserved cache for data block buffers
if (r->compression_dict_buffer_cache_res_mgr != nullptr) {
Status s = r->compression_dict_buffer_cache_res_mgr->UpdateCacheReservation<
CacheEntryRole::kCompressionDictionaryBuildingBuffer>(
Status s = r->compression_dict_buffer_cache_res_mgr->UpdateCacheReservation(
r->data_begin_offset);
s.PermitUncheckedError();
}

@ -16,6 +16,7 @@
#include <string>
#include "cache/cache_entry_roles.h"
#include "cache/cache_reservation_manager.h"
#include "logging/logging.h"
#include "options/options_helper.h"
#include "port/port.h"
@ -330,6 +331,10 @@ static std::unordered_map<std::string, OptionTypeInfo>
{offsetof(struct BlockBasedTableOptions, reserve_table_builder_memory),
OptionType::kBoolean, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"reserve_table_reader_memory",
{offsetof(struct BlockBasedTableOptions, reserve_table_reader_memory),
OptionType::kBoolean, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"skip_table_builder_flush",
{0, OptionType::kBoolean, OptionVerificationType::kDeprecated,
OptionTypeFlags::kNone}},
@ -419,6 +424,14 @@ BlockBasedTableFactory::BlockBasedTableFactory(
: table_options_(_table_options) {
InitializeOptions();
RegisterOptions(&table_options_, &block_based_table_type_info);
if (table_options_.reserve_table_reader_memory &&
table_options_.no_block_cache == false) {
table_reader_cache_res_mgr_.reset(new ConcurrentCacheReservationManager(
std::make_shared<CacheReservationManagerImpl<
CacheEntryRole::kBlockBasedTableReader>>(
table_options_.block_cache)));
}
}
void BlockBasedTableFactory::InitializeOptions() {
@ -582,10 +595,10 @@ Status BlockBasedTableFactory::NewTableReader(
return BlockBasedTable::Open(
ro, table_reader_options.ioptions, table_reader_options.env_options,
table_options_, table_reader_options.internal_comparator, std::move(file),
file_size, table_reader, table_reader_options.prefix_extractor,
prefetch_index_and_filter_in_cache, table_reader_options.skip_filters,
table_reader_options.level, table_reader_options.immortal,
table_reader_options.largest_seqno,
file_size, table_reader, table_reader_cache_res_mgr_,
table_reader_options.prefix_extractor, prefetch_index_and_filter_in_cache,
table_reader_options.skip_filters, table_reader_options.level,
table_reader_options.immortal, table_reader_options.largest_seqno,
table_reader_options.force_direct_prefetch, &tail_prefetch_stats_,
table_reader_options.block_cache_tracer,
table_reader_options.max_file_size_for_l0_meta_pin,
@ -620,6 +633,12 @@ Status BlockBasedTableFactory::ValidateOptions(
"Enable pin_l0_filter_and_index_blocks_in_cache, "
", but block cache is disabled");
}
if (table_options_.reserve_table_reader_memory &&
table_options_.no_block_cache) {
return Status::InvalidArgument(
"Enable reserve_table_reader_memory, "
", but block cache is disabled");
}
if (!IsSupportedFormatVersion(table_options_.format_version)) {
return Status::InvalidArgument(
"Unsupported BlockBasedTable format_version. Please check "

@ -13,6 +13,7 @@
#include <memory>
#include <string>
#include "cache/cache_reservation_manager.h"
#include "port/port.h"
#include "rocksdb/flush_block_policy.h"
#include "rocksdb/table.h"
@ -89,6 +90,7 @@ class BlockBasedTableFactory : public TableFactory {
private:
BlockBasedTableOptions table_options_;
std::shared_ptr<CacheReservationManager> table_reader_cache_res_mgr_;
mutable TailPrefetchStats tail_prefetch_stats_;
};

@ -11,6 +11,7 @@
#include <algorithm>
#include <array>
#include <limits>
#include <memory>
#include <string>
#include <unordered_set>
#include <utility>
@ -552,6 +553,7 @@ Status BlockBasedTable::Open(
const InternalKeyComparator& internal_comparator,
std::unique_ptr<RandomAccessFileReader>&& file, uint64_t file_size,
std::unique_ptr<TableReader>* table_reader,
std::shared_ptr<CacheReservationManager> table_reader_cache_res_mgr,
const std::shared_ptr<const SliceTransform>& prefix_extractor,
const bool prefetch_index_and_filter_in_cache, const bool skip_filters,
const int level, const bool immortal_table,
@ -715,10 +717,22 @@ Status BlockBasedTable::Open(
tail_prefetch_stats->RecordEffectiveSize(
static_cast<size_t>(file_size) - prefetch_buffer->min_offset_read());
}
}
*table_reader = std::move(new_table);
if (s.ok() && table_reader_cache_res_mgr) {
std::size_t mem_usage = new_table->ApproximateMemoryUsage();
s = table_reader_cache_res_mgr->MakeCacheReservation(
mem_usage, &(rep->table_reader_cache_res_handle));
if (s.IsIncomplete()) {
s = Status::MemoryLimit(
"Can't allocate BlockBasedTableReader due to memory limit based on "
"cache capacity for memory allocation");
}
}
if (s.ok()) {
*table_reader = std::move(new_table);
}
return s;
}
@ -1108,6 +1122,11 @@ std::shared_ptr<const TableProperties> BlockBasedTable::GetTableProperties()
size_t BlockBasedTable::ApproximateMemoryUsage() const {
size_t usage = 0;
if (rep_) {
usage += rep_->ApproximateMemoryUsage();
} else {
return usage;
}
if (rep_->filter) {
usage += rep_->filter->ApproximateMemoryUsage();
}
@ -1117,6 +1136,9 @@ size_t BlockBasedTable::ApproximateMemoryUsage() const {
if (rep_->uncompression_dict_reader) {
usage += rep_->uncompression_dict_reader->ApproximateMemoryUsage();
}
if (rep_->table_properties) {
usage += rep_->table_properties->ApproximateMemoryUsage();
}
return usage;
}

@ -10,8 +10,11 @@
#pragma once
#include <cstdint>
#include <memory>
#include "cache/cache_entry_roles.h"
#include "cache/cache_key.h"
#include "cache/cache_reservation_manager.h"
#include "db/range_tombstone_fragmenter.h"
#include "file/filename.h"
#include "rocksdb/slice_transform.h"
@ -98,6 +101,8 @@ class BlockBasedTable : public TableReader {
const InternalKeyComparator& internal_key_comparator,
std::unique_ptr<RandomAccessFileReader>&& file, uint64_t file_size,
std::unique_ptr<TableReader>* table_reader,
std::shared_ptr<CacheReservationManager> table_reader_cache_res_mgr =
nullptr,
const std::shared_ptr<const SliceTransform>& prefix_extractor = nullptr,
bool prefetch_index_and_filter_in_cache = true, bool skip_filters = false,
int level = -1, const bool immortal_table = false,
@ -626,6 +631,9 @@ struct BlockBasedTable::Rep {
const bool immortal_table;
std::unique_ptr<CacheReservationManager::CacheReservationHandle>
table_reader_cache_res_handle = nullptr;
SequenceNumber get_global_seqno(BlockType block_type) const {
return (block_type == BlockType::kFilter ||
block_type == BlockType::kCompressionDictionary)
@ -670,6 +678,16 @@ struct BlockBasedTable::Rep {
implicit_auto_readahead, async_io);
}
}
std::size_t ApproximateMemoryUsage() const {
std::size_t usage = 0;
#ifdef ROCKSDB_MALLOC_USABLE_SIZE
usage += malloc_usable_size(const_cast<BlockBasedTable::Rep*>(this));
#else
usage += sizeof(*this);
#endif // ROCKSDB_MALLOC_USABLE_SIZE
return usage;
}
};
// This is an adapter class for `WritableFile` to be used for `std::ostream`.

@ -5,11 +5,17 @@
#include "table/block_based/block_based_table_reader.h"
#include <memory>
#include <string>
#include "cache/cache_reservation_manager.h"
#include "db/db_test_util.h"
#include "db/table_properties_collector.h"
#include "file/file_util.h"
#include "options/options_helper.h"
#include "port/port.h"
#include "port/stack_trace.h"
#include "rocksdb/compression_type.h"
#include "rocksdb/db.h"
#include "rocksdb/file_system.h"
#include "table/block_based/block_based_table_builder.h"
@ -22,33 +28,53 @@
namespace ROCKSDB_NAMESPACE {
class BlockBasedTableReaderTest
: public testing::Test,
public testing::WithParamInterface<std::tuple<
CompressionType, bool, BlockBasedTableOptions::IndexType, bool>> {
class BlockBasedTableReaderBaseTest : public testing::Test {
protected:
CompressionType compression_type_;
bool use_direct_reads_;
// Prepare key-value pairs to occupy multiple blocks.
// Each value is 256B, every 16 pairs constitute 1 block.
// If mixed_with_human_readable_string_value == true,
// then adjacent blocks contain values with different compression
// complexity: human readable strings are easier to compress than random
// strings.
static std::map<std::string, std::string> GenerateKVMap(
int num_block = 100,
bool mixed_with_human_readable_string_value = false) {
std::map<std::string, std::string> kv;
void SetUp() override {
BlockBasedTableOptions::IndexType index_type;
bool no_block_cache;
std::tie(compression_type_, use_direct_reads_, index_type, no_block_cache) =
GetParam();
Random rnd(101);
uint32_t key = 0;
for (int block = 0; block < num_block; block++) {
for (int i = 0; i < 16; i++) {
char k[9] = {0};
// Internal key is constructed directly from this key,
// and internal key size is required to be >= 8 bytes,
// so use %08u as the format string.
sprintf(k, "%08u", key);
std::string v;
if (mixed_with_human_readable_string_value) {
v = (block % 2) ? rnd.HumanReadableString(256)
: rnd.RandomString(256);
} else {
v = rnd.RandomString(256);
}
kv[std::string(k)] = v;
key++;
}
}
return kv;
}
void SetUp() override {
SetupSyncPointsToMockDirectIO();
test_dir_ = test::PerThreadDBPath("block_based_table_reader_test");
env_ = Env::Default();
fs_ = FileSystem::Default();
ASSERT_OK(fs_->CreateDir(test_dir_, IOOptions(), nullptr));
BlockBasedTableOptions opts;
opts.index_type = index_type;
opts.no_block_cache = no_block_cache;
table_factory_.reset(
static_cast<BlockBasedTableFactory*>(NewBlockBasedTableFactory(opts)));
ConfigureTableFactory();
}
virtual void ConfigureTableFactory() = 0;
void TearDown() override { EXPECT_OK(DestroyDir(env_, test_dir_)); }
// Creates a table with the specificied key value pairs (kv).
@ -59,18 +85,18 @@ class BlockBasedTableReaderTest
NewFileWriter(table_name, &writer);
// Create table builder.
Options options;
ImmutableOptions ioptions(options);
InternalKeyComparator comparator(options.comparator);
ImmutableOptions ioptions(options_);
InternalKeyComparator comparator(options_.comparator);
ColumnFamilyOptions cf_options;
MutableCFOptions moptions(cf_options);
IntTblPropCollectorFactories factories;
std::unique_ptr<TableBuilder> table_builder(table_factory_->NewTableBuilder(
TableBuilderOptions(ioptions, moptions, comparator, &factories,
compression_type, CompressionOptions(),
0 /* column_family_id */, kDefaultColumnFamilyName,
-1 /* level */),
writer.get()));
std::unique_ptr<TableBuilder> table_builder(
options_.table_factory->NewTableBuilder(
TableBuilderOptions(ioptions, moptions, comparator, &factories,
compression_type, CompressionOptions(),
0 /* column_family_id */,
kDefaultColumnFamilyName, -1 /* level */),
writer.get()));
// Build table.
for (auto it = kv.begin(); it != kv.end(); it++) {
@ -85,35 +111,41 @@ class BlockBasedTableReaderTest
const ImmutableOptions& ioptions,
const InternalKeyComparator& comparator,
const std::string& table_name,
std::unique_ptr<BlockBasedTable>* table) {
std::unique_ptr<BlockBasedTable>* table,
bool prefetch_index_and_filter_in_cache = true,
Status* status = nullptr) {
const MutableCFOptions moptions(options_);
TableReaderOptions table_reader_options = TableReaderOptions(
ioptions, moptions.prefix_extractor, EnvOptions(), comparator);
std::unique_ptr<RandomAccessFileReader> file;
NewFileReader(table_name, foptions, &file);
uint64_t file_size = 0;
ASSERT_OK(env_->GetFileSize(Path(table_name), &file_size));
std::unique_ptr<TableReader> table_reader;
ReadOptions ro;
const auto* table_options =
table_factory_->GetOptions<BlockBasedTableOptions>();
ASSERT_NE(table_options, nullptr);
ASSERT_OK(BlockBasedTable::Open(ro, ioptions, EnvOptions(), *table_options,
comparator, std::move(file), file_size,
&table_reader));
std::unique_ptr<TableReader> general_table;
Status s = options_.table_factory->NewTableReader(
ReadOptions(), table_reader_options, std::move(file), file_size,
&general_table, prefetch_index_and_filter_in_cache);
if (s.ok()) {
table->reset(reinterpret_cast<BlockBasedTable*>(general_table.release()));
}
table->reset(reinterpret_cast<BlockBasedTable*>(table_reader.release()));
if (status) {
*status = s;
}
}
std::string Path(const std::string& fname) { return test_dir_ + "/" + fname; }
const std::shared_ptr<FileSystem>& fs() const { return fs_; }
private:
std::string test_dir_;
Env* env_;
std::shared_ptr<FileSystem> fs_;
std::unique_ptr<BlockBasedTableFactory> table_factory_;
Options options_;
private:
void WriteToFile(const std::string& content, const std::string& filename) {
std::unique_ptr<FSWritableFile> f;
ASSERT_OK(fs_->NewWritableFile(Path(filename), FileOptions(), &f, nullptr));
@ -146,35 +178,36 @@ class BlockBasedTableReaderTest
}
};
class BlockBasedTableReaderTest
: public BlockBasedTableReaderBaseTest,
public testing::WithParamInterface<std::tuple<
CompressionType, bool, BlockBasedTableOptions::IndexType, bool>> {
protected:
void SetUp() override {
compression_type_ = std::get<0>(GetParam());
use_direct_reads_ = std::get<1>(GetParam());
BlockBasedTableReaderBaseTest::SetUp();
}
void ConfigureTableFactory() override {
BlockBasedTableOptions opts;
opts.index_type = std::get<2>(GetParam());
opts.no_block_cache = std::get<3>(GetParam());
options_.table_factory.reset(
static_cast<BlockBasedTableFactory*>(NewBlockBasedTableFactory(opts)));
}
CompressionType compression_type_;
bool use_direct_reads_;
};
// Tests MultiGet in both direct IO and non-direct IO mode.
// The keys should be in cache after MultiGet.
TEST_P(BlockBasedTableReaderTest, MultiGet) {
// Prepare key-value pairs to occupy multiple blocks.
// Each value is 256B, every 16 pairs constitute 1 block.
// Adjacent blocks contain values with different compression complexity:
// human readable strings are easier to compress than random strings.
std::map<std::string, std::string> kv;
{
Random rnd(101);
uint32_t key = 0;
for (int block = 0; block < 100; block++) {
for (int i = 0; i < 16; i++) {
char k[9] = {0};
// Internal key is constructed directly from this key,
// and internal key size is required to be >= 8 bytes,
// so use %08u as the format string.
sprintf(k, "%08u", key);
std::string v;
if (block % 2) {
v = rnd.HumanReadableString(256);
} else {
v = rnd.RandomString(256);
}
kv[std::string(k)] = v;
key++;
}
}
}
std::map<std::string, std::string> kv =
BlockBasedTableReaderBaseTest::GenerateKVMap(
100 /* num_block */,
true /* mixed_with_human_readable_string_value */);
// Prepare keys, values, and statuses for MultiGet.
autovector<Slice, MultiGetContext::MAX_BATCH_SIZE> keys;
@ -249,6 +282,220 @@ TEST_P(BlockBasedTableReaderTest, MultiGet) {
}
}
class BlockBasedTableReaderResOnlyCache : public CacheWrapper {
public:
explicit BlockBasedTableReaderResOnlyCache(std::shared_ptr<Cache> target)
: CacheWrapper(std::move(target)) {}
using Cache::Insert;
Status Insert(const Slice& key, void* value, size_t charge,
void (*deleter)(const Slice& key, void* value),
Handle** handle = nullptr,
Priority priority = Priority::LOW) override {
if (deleter == kNoopDeleterForBlockBasedTableReader) {
return target_->Insert(key, value, charge, deleter, handle, priority);
} else {
return Status::OK();
}
}
using Cache::Release;
bool Release(Handle* handle, bool force_erase = false) override {
auto deleter = GetDeleter(handle);
if (deleter == kNoopDeleterForBlockBasedTableReader) {
return target_->Release(handle, force_erase);
} else {
return true;
}
}
private:
static const Cache::DeleterFn kNoopDeleterForBlockBasedTableReader;
};
const Cache::DeleterFn
BlockBasedTableReaderResOnlyCache::kNoopDeleterForBlockBasedTableReader =
CacheReservationManagerImpl<CacheEntryRole::kBlockBasedTableReader>::
TEST_GetNoopDeleterForRole();
class BlockBasedTableReaderCapMemoryTest
: public BlockBasedTableReaderBaseTest,
public testing::WithParamInterface<
bool /* reserve_table_builder_memory */> {
protected:
static std::size_t CalculateMaxTableReaderNumBeforeCacheFull(
std::size_t cache_capacity, std::size_t approx_table_reader_mem) {
// To make calculation easier for testing
assert(cache_capacity % CacheReservationManagerImpl<
CacheEntryRole::kBlockBasedTableReader>::
GetDummyEntrySize() ==
0 &&
cache_capacity > 2 * CacheReservationManagerImpl<
CacheEntryRole::kBlockBasedTableReader>::
GetDummyEntrySize());
// We need to subtract 1 for max_num_dummy_entry to account for dummy
// entries' overhead, assumed the overhead is no greater than 1 dummy entry
// size
std::size_t max_num_dummy_entry =
(size_t)std::floor((
1.0 * cache_capacity /
CacheReservationManagerImpl<
CacheEntryRole::kBlockBasedTableReader>::GetDummyEntrySize())) -
1;
std::size_t cache_capacity_rounded_to_dummy_entry_multiples =
max_num_dummy_entry *
CacheReservationManagerImpl<
CacheEntryRole::kBlockBasedTableReader>::GetDummyEntrySize();
std::size_t max_table_reader_num = static_cast<std::size_t>(
std::floor(1.0 * cache_capacity_rounded_to_dummy_entry_multiples /
approx_table_reader_mem));
return max_table_reader_num;
}
void SetUp() override {
// To cache and re-use the same kv map and compression type in the test
// suite for elimiating variance caused by these two factors
kv_ = BlockBasedTableReaderBaseTest::GenerateKVMap();
compression_type_ = CompressionType::kNoCompression;
table_reader_res_only_cache_.reset(new BlockBasedTableReaderResOnlyCache(
NewLRUCache(6 * CacheReservationManagerImpl<
CacheEntryRole::kBlockBasedTableReader>::
GetDummyEntrySize(),
0 /* num_shard_bits */, true /* strict_capacity_limit */)));
// To ApproximateTableReaderMem() without encountering any potential errors
// caused by BlocBasedTableReader::reserve_table_reader_memory == true, we
// first turn off the feature to test
reserve_table_reader_memory_ = false;
BlockBasedTableReaderBaseTest::SetUp();
approx_table_reader_mem_ = ApproximateTableReaderMem();
// Now we condtionally turn on the feature to test
reserve_table_reader_memory_ = GetParam();
ConfigureTableFactory();
}
void ConfigureTableFactory() override {
BlockBasedTableOptions table_options;
table_options.reserve_table_reader_memory = reserve_table_reader_memory_;
table_options.block_cache = table_reader_res_only_cache_;
table_options.cache_index_and_filter_blocks = false;
table_options.filter_policy.reset(NewBloomFilterPolicy(10, false));
table_options.partition_filters = true;
table_options.index_type = BlockBasedTableOptions::kTwoLevelIndexSearch;
options_.table_factory.reset(NewBlockBasedTableFactory(table_options));
}
bool reserve_table_reader_memory_;
std::shared_ptr<BlockBasedTableReaderResOnlyCache>
table_reader_res_only_cache_;
std::size_t approx_table_reader_mem_;
std::map<std::string, std::string> kv_;
CompressionType compression_type_;
private:
std::size_t ApproximateTableReaderMem() {
std::size_t approx_table_reader_mem = 0;
std::string table_name = "table_for_approx_table_reader_mem";
CreateTable(table_name, compression_type_, kv_);
std::unique_ptr<BlockBasedTable> table;
Status s;
NewBlockBasedTableReader(
FileOptions(), ImmutableOptions(options_),
InternalKeyComparator(options_.comparator), table_name, &table,
false /* prefetch_index_and_filter_in_cache */, &s);
assert(s.ok());
approx_table_reader_mem = table->ApproximateMemoryUsage();
assert(approx_table_reader_mem > 0);
return approx_table_reader_mem;
}
};
INSTANTIATE_TEST_CASE_P(CapMemoryUsageUnderCacheCapacity,
BlockBasedTableReaderCapMemoryTest, ::testing::Bool());
TEST_P(BlockBasedTableReaderCapMemoryTest, CapMemoryUsageUnderCacheCapacity) {
const std::size_t max_table_reader_num = BlockBasedTableReaderCapMemoryTest::
CalculateMaxTableReaderNumBeforeCacheFull(
table_reader_res_only_cache_->GetCapacity(),
approx_table_reader_mem_);
Status s = Status::OK();
std::size_t opened_table_reader_num = 0;
std::string table_name;
std::vector<std::unique_ptr<BlockBasedTable>> tables;
// Keep creating BlockBasedTableReader till hiting the memory limit based on
// cache capacity and creation fails or reaching a big number of table readers
while (s.ok() && opened_table_reader_num < 2 * max_table_reader_num) {
table_name = "table_" + std::to_string(opened_table_reader_num);
CreateTable(table_name, compression_type_, kv_);
tables.push_back(std::unique_ptr<BlockBasedTable>());
NewBlockBasedTableReader(
FileOptions(), ImmutableOptions(options_),
InternalKeyComparator(options_.comparator), table_name, &tables.back(),
false /* prefetch_index_and_filter_in_cache */, &s);
if (s.ok()) {
++opened_table_reader_num;
}
}
if (reserve_table_reader_memory_) {
EXPECT_TRUE(s.IsMemoryLimit() &&
opened_table_reader_num < 2 * max_table_reader_num)
<< "s: " << s.ToString() << " opened_table_reader_num: "
<< std::to_string(opened_table_reader_num);
EXPECT_TRUE(s.ToString().find("memory limit based on cache capacity") !=
std::string::npos);
// Acceptable estimtation errors coming from
// 1. overstimate max_table_reader_num due to # dummy entries is high and
// results in metadata charge overhead greater than 1 dummy entry size
// (violating our assumption in calculating max_table_reader_nums)
// 2. overestimate/underestimate max_table_reader_num due to the gap between
// ApproximateTableReaderMem() and actual table reader mem
EXPECT_GE(opened_table_reader_num, max_table_reader_num * 0.99);
EXPECT_LE(opened_table_reader_num, max_table_reader_num * 1.01);
std::size_t updated_max_table_reader_num =
BlockBasedTableReaderCapMemoryTest::
CalculateMaxTableReaderNumBeforeCacheFull(
table_reader_res_only_cache_->GetCapacity() / 2,
approx_table_reader_mem_);
// Keep deleting BlockBasedTableReader to lower down memory usage from the
// memory limit to make the next creation succeeds
while (opened_table_reader_num >= updated_max_table_reader_num) {
tables.pop_back();
--opened_table_reader_num;
}
table_name = "table_for_successful_table_reader_open";
CreateTable(table_name, compression_type_, kv_);
tables.push_back(std::unique_ptr<BlockBasedTable>());
NewBlockBasedTableReader(
FileOptions(), ImmutableOptions(options_),
InternalKeyComparator(options_.comparator), table_name, &tables.back(),
false /* prefetch_index_and_filter_in_cache */, &s);
EXPECT_TRUE(s.ok()) << s.ToString();
tables.clear();
EXPECT_EQ(table_reader_res_only_cache_->GetPinnedUsage(), 0);
} else {
EXPECT_TRUE(s.ok() && opened_table_reader_num == 2 * max_table_reader_num)
<< "s: " << s.ToString() << " opened_table_reader_num: "
<< std::to_string(opened_table_reader_num);
EXPECT_EQ(table_reader_res_only_cache_->GetPinnedUsage(), 0);
}
}
class BlockBasedTableReaderTestVerifyChecksum
: public BlockBasedTableReaderTest {
public:
@ -256,27 +503,8 @@ class BlockBasedTableReaderTestVerifyChecksum
};
TEST_P(BlockBasedTableReaderTestVerifyChecksum, ChecksumMismatch) {
// Prepare key-value pairs to occupy multiple blocks.
// Each value is 256B, every 16 pairs constitute 1 block.
// Adjacent blocks contain values with different compression complexity:
// human readable strings are easier to compress than random strings.
Random rnd(101);
std::map<std::string, std::string> kv;
{
uint32_t key = 0;
for (int block = 0; block < 800; block++) {
for (int i = 0; i < 16; i++) {
char k[9] = {0};
// Internal key is constructed directly from this key,
// and internal key size is required to be >= 8 bytes,
// so use %08u as the format string.
sprintf(k, "%08u", key);
std::string v = rnd.RandomString(256);
kv[std::string(k)] = v;
key++;
}
}
}
std::map<std::string, std::string> kv =
BlockBasedTableReaderBaseTest::GenerateKVMap(800 /* num_block */);
std::string table_name =
"BlockBasedTableReaderTest" + CompressionTypeToString(compression_type_);

@ -91,11 +91,9 @@ class XXPH3FilterBitsBuilder : public BuiltinFilterBitsBuilder {
kUint64tHashEntryCacheResBucketSize) ==
kUint64tHashEntryCacheResBucketSize / 2)) {
hash_entries_info_.cache_res_bucket_handles.emplace_back(nullptr);
Status s =
cache_res_mgr_
->MakeCacheReservation<CacheEntryRole::kFilterConstruction>(
kUint64tHashEntryCacheResBucketSize * sizeof(hash),
&hash_entries_info_.cache_res_bucket_handles.back());
Status s = cache_res_mgr_->MakeCacheReservation(
kUint64tHashEntryCacheResBucketSize * sizeof(hash),
&hash_entries_info_.cache_res_bucket_handles.back());
s.PermitUncheckedError();
}
}
@ -113,7 +111,9 @@ class XXPH3FilterBitsBuilder : public BuiltinFilterBitsBuilder {
// Number of hash entries to accumulate before charging their memory usage to
// the cache when cache reservation is available
static const std::size_t kUint64tHashEntryCacheResBucketSize =
CacheReservationManager::GetDummyEntrySize() / sizeof(uint64_t);
CacheReservationManagerImpl<
CacheEntryRole::kFilterConstruction>::GetDummyEntrySize() /
sizeof(uint64_t);
// For delegating between XXPH3FilterBitsBuilders
void SwapEntriesWith(XXPH3FilterBitsBuilder* other) {
@ -259,8 +259,7 @@ class XXPH3FilterBitsBuilder : public BuiltinFilterBitsBuilder {
// For managing cache reservation for final filter in (new) Bloom and Ribbon
// Filter construction
std::deque<std::unique_ptr<
CacheReservationHandle<CacheEntryRole::kFilterConstruction>>>
std::deque<std::unique_ptr<CacheReservationManager::CacheReservationHandle>>
final_filter_cache_res_handles_;
bool detect_filter_construct_corruption_;
@ -274,8 +273,7 @@ class XXPH3FilterBitsBuilder : public BuiltinFilterBitsBuilder {
// it manages cache reservation for buckets of hash entries in (new) Bloom
// or Ribbon Filter construction.
// Otherwise, it is empty.
std::deque<std::unique_ptr<
CacheReservationHandle<CacheEntryRole::kFilterConstruction>>>
std::deque<std::unique_ptr<CacheReservationManager::CacheReservationHandle>>
cache_res_bucket_handles;
// If detect_filter_construct_corruption_ == true,
@ -336,17 +334,14 @@ class FastLocalBloomBitsBuilder : public XXPH3FilterBitsBuilder {
size_t len_with_metadata = CalculateSpace(num_entries);
std::unique_ptr<char[]> mutable_buf;
std::unique_ptr<CacheReservationHandle<CacheEntryRole::kFilterConstruction>>
std::unique_ptr<CacheReservationManager::CacheReservationHandle>
final_filter_cache_res_handle;
len_with_metadata =
AllocateMaybeRounding(len_with_metadata, num_entries, &mutable_buf);
// Cache reservation for mutable_buf
if (cache_res_mgr_) {
Status s =
cache_res_mgr_
->MakeCacheReservation<CacheEntryRole::kFilterConstruction>(
len_with_metadata * sizeof(char),
&final_filter_cache_res_handle);
Status s = cache_res_mgr_->MakeCacheReservation(
len_with_metadata * sizeof(char), &final_filter_cache_res_handle);
s.PermitUncheckedError();
}
@ -661,13 +656,11 @@ class Standard128RibbonBitsBuilder : public XXPH3FilterBitsBuilder {
Status status_banding_cache_res = Status::OK();
// Cache reservation for banding
std::unique_ptr<CacheReservationHandle<CacheEntryRole::kFilterConstruction>>
std::unique_ptr<CacheReservationManager::CacheReservationHandle>
banding_res_handle;
if (cache_res_mgr_) {
status_banding_cache_res =
cache_res_mgr_
->MakeCacheReservation<CacheEntryRole::kFilterConstruction>(
bytes_banding, &banding_res_handle);
status_banding_cache_res = cache_res_mgr_->MakeCacheReservation(
bytes_banding, &banding_res_handle);
}
if (status_banding_cache_res.IsIncomplete()) {
@ -720,17 +713,14 @@ class Standard128RibbonBitsBuilder : public XXPH3FilterBitsBuilder {
assert(seed < 256);
std::unique_ptr<char[]> mutable_buf;
std::unique_ptr<CacheReservationHandle<CacheEntryRole::kFilterConstruction>>
std::unique_ptr<CacheReservationManager::CacheReservationHandle>
final_filter_cache_res_handle;
len_with_metadata =
AllocateMaybeRounding(len_with_metadata, num_entries, &mutable_buf);
// Cache reservation for mutable_buf
if (cache_res_mgr_) {
Status s =
cache_res_mgr_
->MakeCacheReservation<CacheEntryRole::kFilterConstruction>(
len_with_metadata * sizeof(char),
&final_filter_cache_res_handle);
Status s = cache_res_mgr_->MakeCacheReservation(
len_with_metadata * sizeof(char), &final_filter_cache_res_handle);
s.PermitUncheckedError();
}
@ -1498,7 +1488,8 @@ FilterBitsBuilder* BloomLikeFilterPolicy::GetFastLocalBloomBuilderWithContext(
context.table_options.block_cache);
std::shared_ptr<CacheReservationManager> cache_res_mgr;
if (reserve_filter_construction_mem) {
cache_res_mgr = std::make_shared<CacheReservationManager>(
cache_res_mgr = std::make_shared<
CacheReservationManagerImpl<CacheEntryRole::kFilterConstruction>>(
context.table_options.block_cache);
}
return new FastLocalBloomBitsBuilder(
@ -1539,7 +1530,8 @@ BloomLikeFilterPolicy::GetStandard128RibbonBuilderWithContext(
context.table_options.block_cache);
std::shared_ptr<CacheReservationManager> cache_res_mgr;
if (reserve_filter_construction_mem) {
cache_res_mgr = std::make_shared<CacheReservationManager>(
cache_res_mgr = std::make_shared<
CacheReservationManagerImpl<CacheEntryRole::kFilterConstruction>>(
context.table_options.block_cache);
}
return new Standard128RibbonBitsBuilder(

@ -5,6 +5,7 @@
#include "rocksdb/table_properties.h"
#include "port/malloc.h"
#include "port/port.h"
#include "rocksdb/env.h"
#include "rocksdb/unique_id.h"
@ -213,6 +214,36 @@ TableProperties::GetAggregatablePropertiesAsMap() const {
return rv;
}
// WARNING: manual update to this function is needed
// whenever a new string property is added to TableProperties
// to reduce approximation error.
//
// TODO: eliminate the need of manually updating this function
// for new string properties
std::size_t TableProperties::ApproximateMemoryUsage() const {
std::size_t usage = 0;
#ifdef ROCKSDB_MALLOC_USABLE_SIZE
usage += malloc_usable_size((void*)this);
#else
usage += sizeof(*this);
#endif // ROCKSDB_MALLOC_USABLE_SIZE
std::size_t string_props_mem_usage =
db_id.size() + db_session_id.size() + db_host_id.size() +
column_family_name.size() + filter_policy_name.size() +
comparator_name.size() + merge_operator_name.size() +
prefix_extractor_name.size() + property_collectors_names.size() +
compression_name.size() + compression_options.size();
usage += string_props_mem_usage;
for (auto iter = user_collected_properties.begin();
iter != user_collected_properties.end(); ++iter) {
usage += (iter->first.size() + iter->second.size());
}
return usage;
}
const std::string TablePropertiesNames::kDbId = "rocksdb.creating.db.identity";
const std::string TablePropertiesNames::kDbSessionId =
"rocksdb.creating.session.identity";

@ -1125,6 +1125,11 @@ DEFINE_bool(async_io, false,
"When set true, RocksDB does asynchronous reads for internal auto "
"readahead prefetching.");
DEFINE_bool(reserve_table_reader_memory, false,
"A dynamically updating charge to block cache, loosely based on "
"the actual memory usage of table reader, will occur to account "
"the memory, if block cache available.");
static enum ROCKSDB_NAMESPACE::CompressionType StringToCompressionType(
const char* ctype) {
assert(ctype);
@ -4049,6 +4054,8 @@ class Benchmark {
true;
}
block_based_options.block_cache = cache_;
block_based_options.reserve_table_reader_memory =
FLAGS_reserve_table_reader_memory;
block_based_options.block_cache_compressed = compressed_cache_;
block_based_options.block_size = FLAGS_block_size;
block_based_options.block_restart_interval = FLAGS_block_restart_interval;

@ -41,6 +41,7 @@ default_params = {
random.lognormvariate(2.3, 1.3)]),
"cache_index_and_filter_blocks": lambda: random.randint(0, 1),
"cache_size": 8388608,
"reserve_table_reader_memory": lambda: random.choice([0, 1]),
"checkpoint_one_in": 1000000,
"compression_type": lambda: random.choice(
["none", "snappy", "zlib", "lz4", "lz4hc", "xpress", "zstd"]),

@ -621,7 +621,8 @@ TEST_P(FullBloomTest, OptimizeForMemory) {
TEST(FullBloomFilterConstructionReserveMemTest,
RibbonFilterFallBackOnLargeBanding) {
constexpr std::size_t kCacheCapacity =
8 * CacheReservationManager::GetDummyEntrySize();
8 * CacheReservationManagerImpl<
CacheEntryRole::kFilterConstruction>::GetDummyEntrySize();
constexpr std::size_t num_entries_for_cache_full = kCacheCapacity / 8;
for (bool reserve_builder_mem : {true, false}) {
@ -662,12 +663,19 @@ TEST(FullBloomFilterConstructionReserveMemTest,
if (reserve_builder_mem) {
const size_t dummy_entry_num = static_cast<std::size_t>(std::ceil(
filter.size() * 1.0 / CacheReservationManager::GetDummyEntrySize()));
EXPECT_GE(cache->GetPinnedUsage(),
dummy_entry_num * CacheReservationManager::GetDummyEntrySize());
filter.size() * 1.0 /
CacheReservationManagerImpl<
CacheEntryRole::kFilterConstruction>::GetDummyEntrySize()));
EXPECT_GE(
cache->GetPinnedUsage(),
dummy_entry_num *
CacheReservationManagerImpl<
CacheEntryRole::kFilterConstruction>::GetDummyEntrySize());
EXPECT_LT(
cache->GetPinnedUsage(),
(dummy_entry_num + 1) * CacheReservationManager::GetDummyEntrySize());
(dummy_entry_num + 1) *
CacheReservationManagerImpl<
CacheEntryRole::kFilterConstruction>::GetDummyEntrySize());
} else {
EXPECT_EQ(cache->GetPinnedUsage(), 0);
}

Loading…
Cancel
Save