Rewrite memory-charging feature's option API (#9926)

Summary:
**Context:**
Previous PRs https://github.com/facebook/rocksdb/pull/9748, https://github.com/facebook/rocksdb/pull/9073, and https://github.com/facebook/rocksdb/pull/8428 each added a separate flag for a charged memory area. This API design does not scale as we charge more and more memory areas. We also foresee an opportunity to consolidate this feature with other cache-usage-related features, such as `cache_index_and_filter_blocks`, using `CacheEntryRole`.

Therefore we decided to consolidate all these flags with `CacheUsageOptions cache_usage_options` and this PR serves as the first step by consolidating memory-charging related flags.

**Summary:**
- Replaced old API references with new ones, including making `kCompressionDictionaryBuildingBuffer` opt-out, and added a unit test for that
- Added missing db bench/stress test for some memory charging features
- Renamed related test suites to indicate they are under the same theme of memory charging
- Refactored a commonly used mocked cache component in memory charging related tests to reduce code duplication
- Replaced the phrases "memory tracking" / "cache reservation" (other than CacheReservationManager-related ones) with "memory charging" to describe this feature consistently.
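
The consolidation described above can be pictured with a minimal, self-contained sketch. The types below are simplified stand-ins written for illustration, not the RocksDB headers. The names `CacheUsageOptions`, `options_overrides`, `CacheEntryRoleOptions::Decision`, and the `insert` pattern come from this commit; the `kFallback` value, the helper functions, and the per-role defaults (only the dictionary-building buffer charged by default, per the "opt-out" wording) are assumptions of the sketch.

```cpp
#include <cassert>
#include <map>

// Simplified stand-ins for illustration only; NOT the RocksDB headers.
enum class CacheEntryRole {
  kCompressionDictionaryBuildingBuffer,
  kFilterConstruction,
  kBlockBasedTableReader,
};

struct CacheEntryRoleOptions {
  // kFallback is an assumption of this sketch: "no explicit choice, use the
  // role's default".
  enum class Decision { kEnabled, kDisabled, kFallback };
  Decision charged = Decision::kFallback;
};

struct CacheUsageOptions {
  CacheEntryRoleOptions options;  // applies to roles without an override
  std::map<CacheEntryRole, CacheEntryRoleOptions> options_overrides;
};

// Assumed defaults: only the dictionary-building buffer is opt-out (charged
// unless disabled); the other roles are treated as opt-in here.
inline bool ChargedByDefault(CacheEntryRole role) {
  return role == CacheEntryRole::kCompressionDictionaryBuildingBuffer;
}

// Hypothetical helper: per-role override first, then the shared option,
// then the role's default.
inline bool IsCharged(const CacheUsageOptions& opts, CacheEntryRole role) {
  using Decision = CacheEntryRoleOptions::Decision;
  Decision d = opts.options.charged;
  auto it = opts.options_overrides.find(role);
  if (it != opts.options_overrides.end() &&
      it->second.charged != Decision::kFallback) {
    d = it->second.charged;
  }
  if (d == Decision::kFallback) {
    return ChargedByDefault(role);
  }
  return d == Decision::kEnabled;
}

// One insert per role, in the style the tests in this commit use.
inline CacheUsageOptions MakeExampleOptions() {
  CacheUsageOptions opts;
  opts.options_overrides.insert(
      {CacheEntryRole::kFilterConstruction,
       {/*.charged = */ CacheEntryRoleOptions::Decision::kEnabled}});
  opts.options_overrides.insert(
      {CacheEntryRole::kCompressionDictionaryBuildingBuffer,
       {/*.charged = */ CacheEntryRoleOptions::Decision::kDisabled}});
  return opts;
}

// Usage: filter construction opted in, dictionary buffer opted out.
inline void CacheUsageOptionsExample() {
  CacheUsageOptions opts = MakeExampleOptions();
  assert(IsCharged(opts, CacheEntryRole::kFilterConstruction));
  assert(!IsCharged(opts, CacheEntryRole::kCompressionDictionaryBuildingBuffer));
}
```

With one map keyed by role, charging a new memory area needs a new `CacheEntryRole` value rather than a new boolean option, which is the scalability argument made in the context section.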

Pull Request resolved: https://github.com/facebook/rocksdb/pull/9926

Test Plan:
- New unit test for opt-out `kCompressionDictionaryBuildingBuffer` `TEST_F(ChargeCompressionDictionaryBuildingBufferTest, Basic)`
- New unit test for option validation/sanitization `TEST_F(CacheUsageOptionsOverridesTest, SanitizeAndValidateOptions)`
- CI
- db bench (in case querying new options introduces regression) **+0.5% micros/op**: `TEST_TMPDIR=/dev/shm/testdb ./db_bench -benchmarks=fillseq -db=$TEST_TMPDIR  -charge_compression_dictionary_building_buffer=1(remove this for comparison)  -compression_max_dict_bytes=10000 -disable_auto_compactions=1 -write_buffer_size=100000 -num=4000000 | egrep 'fillseq'`

#-run | (pre-PR) avg micros/op | std micros/op | (post-PR)  micros/op | std micros/op | change (%)
-- | -- | -- | -- | -- | --
10 | 3.9711 | 0.264408 | 3.9914 | 0.254563 | 0.5111933721
20 | 3.83905 | 0.0664488 | 3.8251 | 0.0695456 | **-0.3633711465**
40 | 3.86625 | 0.136669 | 3.8867 | 0.143765 | **0.5289363078**
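
The change column above is consistent with the relative difference of the two averages, `(post - pre) / pre * 100`. The short check below recomputes it from the table's own numbers; the function names are hypothetical and exist only for this illustration.

```cpp
#include <cassert>
#include <cmath>

// Relative change of the post-PR average against the pre-PR average, in percent.
inline double PercentChange(double pre_avg, double post_avg) {
  return (post_avg - pre_avg) / pre_avg * 100.0;
}

// Recompute the three reported rows (10, 20 and 40 runs).
inline void CheckReportedRows() {
  assert(std::fabs(PercentChange(3.9711, 3.9914) - 0.5111933721) < 1e-4);
  assert(std::fabs(PercentChange(3.83905, 3.8251) - (-0.3633711465)) < 1e-4);
  assert(std::fabs(PercentChange(3.86625, 3.8867) - 0.5289363078) < 1e-4);
}
```

In every row the absolute difference between the averages (about 0.014 to 0.020 micros/op) is smaller than the reported standard deviation of either side.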

- db_stress: `python3 tools/db_crashtest.py blackbox  -charge_compression_dictionary_building_buffer=1 -charge_filter_construction=1 -charge_table_reader=1 -cache_size=1` killed as normal

Reviewed By: ajkr

Differential Revision: D36054712

Pulled By: hx235

fbshipit-source-id: d406e90f5e0c5ea4dbcb585a484ad9302d4302af
main
Hui Xiao 3 years ago committed by Facebook GitHub Bot
parent f6339de0d2
commit 3573558ec5
HISTORY.md | 1
db/db_bloom_filter_test.cc | 212
db/db_sst_test.cc | 15
db/db_test_util.cc | 58
db/db_test_util.h | 45
db_stress_tool/db_stress_common.h | 4
db_stress_tool/db_stress_gflags.cc | 18
db_stress_tool/db_stress_test_base.cc | 17
include/rocksdb/cache.h | 10
include/rocksdb/options.h | 2
include/rocksdb/table.h | 131
memtable/write_buffer_manager.cc | 2
memtable/write_buffer_manager_test.cc | 8
options/options_settable_test.cc | 7
options/options_test.cc | 4
table/block_based/block_based_table_builder.cc | 18
table/block_based/block_based_table_factory.cc | 64
table/block_based/block_based_table_reader.cc | 6
table/block_based/block_based_table_reader_test.cc | 114
table/block_based/filter_block.h | 2
table/block_based/filter_policy.cc | 46
table/table_test.cc | 225
tools/db_bench_tool.cc | 35
tools/db_crashtest.py | 4
util/bloom_test.cc | 20
util/filter_bench.cc | 16

@@ -18,6 +18,7 @@
 * Renamed CompactionFilter::Decision::kRemoveWithSingleDelete to kPurge since the latter sounds more general and hides the implementation details of how compaction iterator handles keys.
 * Added ability to specify functions for Prepare and Validate to OptionsTypeInfo. Added methods to OptionTypeInfo to set the functions via an API. These methods are intended for RocksDB plugin developers for configuration management.
 * Added a new immutable db options, enforce_single_del_contracts. If set to false (default is true), compaction will NOT fail due to a single delete followed by a delete for the same key. The purpose of this temporay option is to help existing use cases migrate.
+* Introduce `BlockBasedTableOptions::cache_usage_options` and use that to replace `BlockBasedTableOptions::reserve_table_builder_memory` and `BlockBasedTableOptions::reserve_table_reader_memory`.
 ### Bug Fixes
 * RocksDB calls FileSystem::Poll API during FilePrefetchBuffer destruction which impacts performance as it waits for read requets completion which is not needed anymore. Calling FileSystem::AbortIO to abort those requests instead fixes that performance issue.

@@ -886,103 +886,27 @@ TEST_F(DBBloomFilterTest, BloomFilterCompatibility) {
   }
 }
-/*
- * A cache wrapper that tracks peaks and increments of filter
- * construction cache reservation.
- *        p0
- *       / \   p1
- *      /   \  /\
- *     /     \/  \
- *  a /       b   \
- * peaks = {p0, p1}
- * increments = {p1-a, p2-b}
- */
-class FilterConstructResPeakTrackingCache : public CacheWrapper {
- public:
-  explicit FilterConstructResPeakTrackingCache(std::shared_ptr<Cache> target)
-      : CacheWrapper(std::move(target)),
-        cur_cache_res_(0),
-        cache_res_peak_(0),
-        cache_res_increment_(0),
-        last_peak_tracked_(false),
-        cache_res_increments_sum_(0) {}
-  using Cache::Insert;
-  Status Insert(const Slice& key, void* value, size_t charge,
-                void (*deleter)(const Slice& key, void* value),
-                Handle** handle = nullptr,
-                Priority priority = Priority::LOW) override {
-    Status s = target_->Insert(key, value, charge, deleter, handle, priority);
-    if (deleter == kNoopDeleterForFilterConstruction) {
-      if (last_peak_tracked_) {
-        cache_res_peak_ = 0;
-        cache_res_increment_ = 0;
-        last_peak_tracked_ = false;
-      }
-      cur_cache_res_ += charge;
-      cache_res_peak_ = std::max(cache_res_peak_, cur_cache_res_);
-      cache_res_increment_ += charge;
-    }
-    return s;
-  }
-  using Cache::Release;
-  bool Release(Handle* handle, bool erase_if_last_ref = false) override {
-    auto deleter = GetDeleter(handle);
-    if (deleter == kNoopDeleterForFilterConstruction) {
-      if (!last_peak_tracked_) {
-        cache_res_peaks_.push_back(cache_res_peak_);
-        cache_res_increments_sum_ += cache_res_increment_;
-        last_peak_tracked_ = true;
-      }
-      cur_cache_res_ -= GetCharge(handle);
-    }
-    bool is_successful = target_->Release(handle, erase_if_last_ref);
-    return is_successful;
-  }
-  std::deque<std::size_t> GetReservedCachePeaks() { return cache_res_peaks_; }
-  std::size_t GetReservedCacheIncrementSum() {
-    return cache_res_increments_sum_;
-  }
- private:
-  static const Cache::DeleterFn kNoopDeleterForFilterConstruction;
-  std::size_t cur_cache_res_;
-  std::size_t cache_res_peak_;
-  std::size_t cache_res_increment_;
-  bool last_peak_tracked_;
-  std::deque<std::size_t> cache_res_peaks_;
-  std::size_t cache_res_increments_sum_;
-};
-const Cache::DeleterFn
-    FilterConstructResPeakTrackingCache::kNoopDeleterForFilterConstruction =
-        CacheReservationManagerImpl<
-            CacheEntryRole::kFilterConstruction>::TEST_GetNoopDeleterForRole();
 // To align with the type of hash entry being reserved in implementation.
 using FilterConstructionReserveMemoryHash = uint64_t;
-class DBFilterConstructionReserveMemoryTestWithParam
+class ChargeFilterConstructionTestWithParam
     : public DBTestBase,
-      public testing::WithParamInterface<
-          std::tuple<bool, std::string, bool, bool>> {
+      public testing::WithParamInterface<std::tuple<
+          CacheEntryRoleOptions::Decision, std::string, bool, bool>> {
  public:
-  DBFilterConstructionReserveMemoryTestWithParam()
+  ChargeFilterConstructionTestWithParam()
       : DBTestBase("db_bloom_filter_tests",
                    /*env_do_fsync=*/true),
         num_key_(0),
-        reserve_table_builder_memory_(std::get<0>(GetParam())),
+        charge_filter_construction_(std::get<0>(GetParam())),
         policy_(std::get<1>(GetParam())),
         partition_filters_(std::get<2>(GetParam())),
         detect_filter_construct_corruption_(std::get<3>(GetParam())) {
-    if (!reserve_table_builder_memory_ || policy_ == kDeprecatedBlock ||
-        policy_ == kLegacyBloom) {
+    if (charge_filter_construction_ ==
+            CacheEntryRoleOptions::Decision::kDisabled ||
+        policy_ == kDeprecatedBlock || policy_ == kLegacyBloom) {
       // For these cases, we only interested in whether filter construction
-      // cache resevation happens instead of its accuracy. Therefore we don't
+      // cache charging happens instead of its accuracy. Therefore we don't
       // need many keys.
       num_key_ = 5;
     } else if (partition_filters_) {
@@ -997,11 +921,11 @@ class DBFilterConstructionReserveMemoryTestWithParam
                  sizeof(FilterConstructionReserveMemoryHash);
     } else if (policy_ == kFastLocalBloom) {
       // For Bloom Filter + FullFilter case, since we design the num_key_ to
-      // make hash entry cache reservation be a multiple of dummy entries, the
+      // make hash entry cache charging be a multiple of dummy entries, the
       // correct behavior of charging final filter on top of it will trigger at
       // least another dummy entry insertion. Therefore we can assert that
       // behavior and we don't need a large number of keys to verify we
-      // indeed charge the final filter for cache reservation, even though final
+      // indeed charge the final filter for in cache, even though final
       // filter is a lot smaller than hash entries.
       num_key_ = 1 *
                  CacheReservationManagerImpl<
@@ -1011,7 +935,7 @@ class DBFilterConstructionReserveMemoryTestWithParam
       // For Ribbon Filter + FullFilter case, we need a large enough number of
       // keys so that charging final filter after releasing the hash entries
       // reservation will trigger at least another dummy entry (or equivalently
-      // to saying, causing another peak in cache reservation) as banding
+      // to saying, causing another peak in cache charging) as banding
       // reservation might not be a multiple of dummy entry.
       num_key_ = 12 *
                  CacheReservationManagerImpl<
@@ -1027,7 +951,9 @@ class DBFilterConstructionReserveMemoryTestWithParam
     // calculation.
     constexpr std::size_t kCacheCapacity = 100 * 1024 * 1024;
-    table_options.reserve_table_builder_memory = reserve_table_builder_memory_;
+    table_options.cache_usage_options.options_overrides.insert(
+        {CacheEntryRole::kFilterConstruction,
+         {/*.charged = */ charge_filter_construction_}});
     table_options.filter_policy = Create(10, policy_);
     table_options.partition_filters = partition_filters_;
     if (table_options.partition_filters) {
@@ -1045,7 +971,8 @@ class DBFilterConstructionReserveMemoryTestWithParam
     lo.capacity = kCacheCapacity;
     lo.num_shard_bits = 0;  // 2^0 shard
     lo.strict_capacity_limit = true;
-    cache_ = std::make_shared<FilterConstructResPeakTrackingCache>(
+    cache_ = std::make_shared<
+        TargetCacheChargeTrackingCache<CacheEntryRole::kFilterConstruction>>(
         (NewLRUCache(lo)));
     table_options.block_cache = cache_;
@@ -1054,56 +981,73 @@ class DBFilterConstructionReserveMemoryTestWithParam
   std::size_t GetNumKey() { return num_key_; }
-  bool ReserveTableBuilderMemory() { return reserve_table_builder_memory_; }
+  CacheEntryRoleOptions::Decision ChargeFilterConstructMemory() {
+    return charge_filter_construction_;
+  }
   std::string GetFilterPolicy() { return policy_; }
   bool PartitionFilters() { return partition_filters_; }
-  std::shared_ptr<FilterConstructResPeakTrackingCache>
-  GetFilterConstructResPeakTrackingCache() {
+  std::shared_ptr<
+      TargetCacheChargeTrackingCache<CacheEntryRole::kFilterConstruction>>
+  GetCache() {
     return cache_;
   }
  private:
   std::size_t num_key_;
-  bool reserve_table_builder_memory_;
+  CacheEntryRoleOptions::Decision charge_filter_construction_;
   std::string policy_;
   bool partition_filters_;
-  std::shared_ptr<FilterConstructResPeakTrackingCache> cache_;
+  std::shared_ptr<
+      TargetCacheChargeTrackingCache<CacheEntryRole::kFilterConstruction>>
+      cache_;
   bool detect_filter_construct_corruption_;
 };
 INSTANTIATE_TEST_CASE_P(
-    DBFilterConstructionReserveMemoryTestWithParam,
-    DBFilterConstructionReserveMemoryTestWithParam,
-    ::testing::Values(std::make_tuple(false, kFastLocalBloom, false, false),
-                      std::make_tuple(true, kFastLocalBloom, false, false),
-                      std::make_tuple(true, kFastLocalBloom, false, true),
-                      std::make_tuple(true, kFastLocalBloom, true, false),
-                      std::make_tuple(true, kFastLocalBloom, true, true),
-                      std::make_tuple(true, kStandard128Ribbon, false, false),
-                      std::make_tuple(true, kStandard128Ribbon, false, true),
-                      std::make_tuple(true, kStandard128Ribbon, true, false),
-                      std::make_tuple(true, kStandard128Ribbon, true, true),
-                      std::make_tuple(true, kDeprecatedBlock, false, false),
-                      std::make_tuple(true, kLegacyBloom, false, false)));
+    ChargeFilterConstructionTestWithParam,
+    ChargeFilterConstructionTestWithParam,
+    ::testing::Values(
+        std::make_tuple(CacheEntryRoleOptions::Decision::kDisabled,
+                        kFastLocalBloom, false, false),
+        std::make_tuple(CacheEntryRoleOptions::Decision::kEnabled,
+                        kFastLocalBloom, false, false),
+        std::make_tuple(CacheEntryRoleOptions::Decision::kEnabled,
+                        kFastLocalBloom, false, true),
+        std::make_tuple(CacheEntryRoleOptions::Decision::kEnabled,
+                        kFastLocalBloom, true, false),
+        std::make_tuple(CacheEntryRoleOptions::Decision::kEnabled,
+                        kFastLocalBloom, true, true),
+        std::make_tuple(CacheEntryRoleOptions::Decision::kEnabled,
+                        kStandard128Ribbon, false, false),
+        std::make_tuple(CacheEntryRoleOptions::Decision::kEnabled,
+                        kStandard128Ribbon, false, true),
+        std::make_tuple(CacheEntryRoleOptions::Decision::kEnabled,
+                        kStandard128Ribbon, true, false),
+        std::make_tuple(CacheEntryRoleOptions::Decision::kEnabled,
+                        kStandard128Ribbon, true, true),
+        std::make_tuple(CacheEntryRoleOptions::Decision::kEnabled,
+                        kDeprecatedBlock, false, false),
+        std::make_tuple(CacheEntryRoleOptions::Decision::kEnabled, kLegacyBloom,
+                        false, false)));
 // TODO: Speed up this test, and reduce disk space usage (~700MB)
 // The current test inserts many keys (on the scale of dummy entry size)
 // in order to make small memory user (e.g, final filter, partitioned hash
 // entries/filter/banding) , which is proportional to the number of
-// keys, big enough so that its cache reservation triggers dummy entry insertion
+// keys, big enough so that its cache charging triggers dummy entry insertion
 // and becomes observable in the test.
 //
 // However, inserting that many keys slows down this test and leaves future
 // developers an opportunity to speed it up.
 //
 // Possible approaches & challenges:
-// 1. Use sync point during cache reservation of filter construction
+// 1. Use sync point during cache charging of filter construction
 //
 // Benefit: It does not rely on triggering dummy entry insertion
 // but the sync point to verify small memory user is charged correctly.
@@ -1112,7 +1056,7 @@ INSTANTIATE_TEST_CASE_P(
 //
 // 2. Make dummy entry size configurable and set it small in the test
 //
-// Benefit: It increases the precision of cache reservation and therefore
+// Benefit: It increases the precision of cache charging and therefore
 // small memory usage can still trigger insertion of dummy entry.
 //
 // Challenge: change CacheReservationManager related APIs and a hack
@@ -1120,16 +1064,17 @@ INSTANTIATE_TEST_CASE_P(
 // CacheReservationManager used in filter construction for testing
 // since CacheReservationManager is not exposed at the high level.
 //
-TEST_P(DBFilterConstructionReserveMemoryTestWithParam, ReserveMemory) {
+TEST_P(ChargeFilterConstructionTestWithParam, Basic) {
   Options options = CurrentOptions();
   // We set write_buffer_size big enough so that in the case where there is
-  // filter construction cache reservation, flush won't be triggered before we
+  // filter construction cache charging, flush won't be triggered before we
   // manually trigger it for clean testing
   options.write_buffer_size = 640 << 20;
   BlockBasedTableOptions table_options = GetBlockBasedTableOptions();
   options.table_factory.reset(NewBlockBasedTableFactory(table_options));
-  std::shared_ptr<FilterConstructResPeakTrackingCache> cache =
-      GetFilterConstructResPeakTrackingCache();
+  std::shared_ptr<
+      TargetCacheChargeTrackingCache<CacheEntryRole::kFilterConstruction>>
+      cache = GetCache();
   options.create_if_missing = true;
   // Disable auto compaction to prevent its unexpected side effect
   // to the number of keys per partition designed by us in the test
@@ -1140,32 +1085,33 @@ TEST_P(DBFilterConstructionReserveMemoryTestWithParam, ReserveMemory) {
     ASSERT_OK(Put(Key(i), Key(i)));
   }
-  ASSERT_EQ(cache->GetReservedCacheIncrementSum(), 0)
+  ASSERT_EQ(cache->GetChargedCacheIncrementSum(), 0)
       << "Flush was triggered too early in the test case with filter "
-         "construction cache reservation - please make sure no flush triggered "
+         "construction cache charging - please make sure no flush triggered "
          "during the key insertions above";
   ASSERT_OK(Flush());
-  bool reserve_table_builder_memory = ReserveTableBuilderMemory();
+  bool charge_filter_construction = (ChargeFilterConstructMemory() ==
+                                     CacheEntryRoleOptions::Decision::kEnabled);
   std::string policy = GetFilterPolicy();
   bool partition_filters = PartitionFilters();
   bool detect_filter_construct_corruption =
       table_options.detect_filter_construct_corruption;
   std::deque<std::size_t> filter_construction_cache_res_peaks =
-      cache->GetReservedCachePeaks();
+      cache->GetChargedCachePeaks();
   std::size_t filter_construction_cache_res_increments_sum =
-      cache->GetReservedCacheIncrementSum();
+      cache->GetChargedCacheIncrementSum();
-  if (!reserve_table_builder_memory) {
+  if (!charge_filter_construction) {
     EXPECT_EQ(filter_construction_cache_res_peaks.size(), 0);
     return;
   }
   if (policy == kDeprecatedBlock || policy == kLegacyBloom) {
     EXPECT_EQ(filter_construction_cache_res_peaks.size(), 0)
-        << "There shouldn't be filter construction cache reservation as this "
+        << "There shouldn't be filter construction cache charging as this "
            "feature does not support kDeprecatedBlock "
            "nor kLegacyBloom";
     return;
@@ -1239,14 +1185,14 @@ TEST_P(DBFilterConstructionReserveMemoryTestWithParam, ReserveMemory) {
      */
     if (!partition_filters) {
       EXPECT_EQ(filter_construction_cache_res_peaks.size(), 1)
-          << "Filter construction cache reservation should have only 1 peak in "
+          << "Filter construction cache charging should have only 1 peak in "
              "case: kFastLocalBloom + FullFilter";
       std::size_t filter_construction_cache_res_peak =
           filter_construction_cache_res_peaks[0];
       EXPECT_GT(filter_construction_cache_res_peak,
                 predicted_hash_entries_cache_res)
           << "The testing number of hash entries is designed to make hash "
-             "entries cache reservation be multiples of dummy entries"
+             "entries cache charging be multiples of dummy entries"
              " so the correct behavior of charging final filter on top of it"
              " should've triggered at least another dummy entry insertion";
@@ -1259,7 +1205,7 @@ TEST_P(DBFilterConstructionReserveMemoryTestWithParam, ReserveMemory) {
       return;
     } else {
       EXPECT_GE(filter_construction_cache_res_peaks.size(), 2)
-          << "Filter construction cache reservation should have multiple peaks "
+          << "Filter construction cache charging should have multiple peaks "
              "in case: kFastLocalBloom + "
              "PartitionedFilter";
       std::size_t predicted_filter_construction_cache_res_increments_sum =
@@ -1366,11 +1312,11 @@ TEST_P(DBFilterConstructionReserveMemoryTestWithParam, ReserveMemory) {
               CacheReservationManagerImpl<
                   CacheEntryRole::kFilterConstruction>::GetDummyEntrySize()),
           1)
-          << "Final filter cache reservation too small for this test - please "
+          << "Final filter cache charging too small for this test - please "
              "increase the number of keys";
       if (!detect_filter_construct_corruption) {
         EXPECT_EQ(filter_construction_cache_res_peaks.size(), 2)
-            << "Filter construction cache reservation should have 2 peaks in "
+            << "Filter construction cache charging should have 2 peaks in "
                "case: kStandard128Ribbon + "
                "FullFilter. "
                "The second peak is resulted from charging the final filter "
@@ -1389,7 +1335,7 @@ TEST_P(DBFilterConstructionReserveMemoryTestWithParam, ReserveMemory) {
                   predicted_filter_construction_cache_res_peak * 1.1);
       } else {
         EXPECT_EQ(filter_construction_cache_res_peaks.size(), 1)
-            << "Filter construction cache reservation should have 1 peaks in "
+            << "Filter construction cache charging should have 1 peaks in "
                "case: kStandard128Ribbon + FullFilter "
                "+ detect_filter_construct_corruption. "
                "The previous second peak now disappears since we don't "
@@ -1410,13 +1356,13 @@ TEST_P(DBFilterConstructionReserveMemoryTestWithParam, ReserveMemory) {
     } else {
       if (!detect_filter_construct_corruption) {
         EXPECT_GE(filter_construction_cache_res_peaks.size(), 3)
-            << "Filter construction cache reservation should have more than 3 "
+            << "Filter construction cache charging should have more than 3 "
                "peaks "
                "in case: kStandard128Ribbon + "
                "PartitionedFilter";
       } else {
         EXPECT_GE(filter_construction_cache_res_peaks.size(), 2)
-            << "Filter construction cache reservation should have more than 2 "
+            << "Filter construction cache charging should have more than 2 "
                "peaks "
                "in case: kStandard128Ribbon + "
                "PartitionedFilter + detect_filter_construct_corruption";

@@ -1448,7 +1448,9 @@ TEST_F(DBSSTTest, OpenDBWithInfiniteMaxOpenFiles) {
 }
 TEST_F(DBSSTTest, OpenDBWithInfiniteMaxOpenFilesSubjectToMemoryLimit) {
-  for (bool reserve_table_builder_memory : {true, false}) {
+  for (CacheEntryRoleOptions::Decision charge_table_reader :
+       {CacheEntryRoleOptions::Decision::kEnabled,
+        CacheEntryRoleOptions::Decision::kDisabled}) {
     // Open DB with infinite max open files
     //  - First iteration use 1 thread to open files
     //  - Second iteration use 5 threads to open files
@@ -1488,7 +1490,9 @@ TEST_F(DBSSTTest, OpenDBWithInfiniteMaxOpenFilesSubjectToMemoryLimit) {
       }
       Close();
-      table_options.reserve_table_reader_memory = reserve_table_builder_memory;
+      table_options.cache_usage_options.options_overrides.insert(
+          {CacheEntryRole::kBlockBasedTableReader,
+           {/*.charged = */ charge_table_reader}});
       table_options.block_cache =
           NewLRUCache(1024 /* capacity */, 0 /* num_shard_bits */,
                       true /* strict_capacity_limit */);
@@ -1497,8 +1501,13 @@ TEST_F(DBSSTTest, OpenDBWithInfiniteMaxOpenFilesSubjectToMemoryLimit) {
       // Reopening the DB will try to load all existing files, conditionally
       // subject to memory limit
       Status s = TryReopen(options);
-      if (table_options.reserve_table_reader_memory) {
+      if (charge_table_reader == CacheEntryRoleOptions::Decision::kEnabled) {
         EXPECT_TRUE(s.IsMemoryLimit());
+        EXPECT_TRUE(s.ToString().find(
+                        kCacheEntryRoleToCamelString[static_cast<std::uint32_t>(
+                            CacheEntryRole::kBlockBasedTableReader)]) !=
+                    std::string::npos);
         EXPECT_TRUE(s.ToString().find("memory limit based on cache capacity") !=
                     std::string::npos);

@@ -9,6 +9,7 @@
 #include "db/db_test_util.h"
+#include "cache/cache_reservation_manager.h"
 #include "db/forward_iterator.h"
 #include "env/mock_env.h"
 #include "port/lang.h"
@@ -1683,4 +1684,61 @@ void VerifySstUniqueIds(const TablePropertiesCollection& props) {
   }
 }
+template <CacheEntryRole R>
+TargetCacheChargeTrackingCache<R>::TargetCacheChargeTrackingCache(
+    std::shared_ptr<Cache> target)
+    : CacheWrapper(std::move(target)),
+      cur_cache_charge_(0),
+      cache_charge_peak_(0),
+      cache_charge_increment_(0),
+      last_peak_tracked_(false),
+      cache_charge_increments_sum_(0) {}
+template <CacheEntryRole R>
+Status TargetCacheChargeTrackingCache<R>::Insert(
+    const Slice& key, void* value, size_t charge,
+    void (*deleter)(const Slice& key, void* value), Handle** handle,
+    Priority priority) {
+  Status s = target_->Insert(key, value, charge, deleter, handle, priority);
+  if (deleter == kNoopDeleter) {
+    if (last_peak_tracked_) {
+      cache_charge_peak_ = 0;
+      cache_charge_increment_ = 0;
+      last_peak_tracked_ = false;
+    }
+    if (s.ok()) {
+      cur_cache_charge_ += charge;
+    }
+    cache_charge_peak_ = std::max(cache_charge_peak_, cur_cache_charge_);
+    cache_charge_increment_ += charge;
+  }
+  return s;
+}
+template <CacheEntryRole R>
+bool TargetCacheChargeTrackingCache<R>::Release(Handle* handle,
+                                                bool erase_if_last_ref) {
+  auto deleter = GetDeleter(handle);
+  if (deleter == kNoopDeleter) {
+    if (!last_peak_tracked_) {
+      cache_charge_peaks_.push_back(cache_charge_peak_);
+      cache_charge_increments_sum_ += cache_charge_increment_;
+      last_peak_tracked_ = true;
+    }
+    cur_cache_charge_ -= GetCharge(handle);
+  }
+  bool is_successful = target_->Release(handle, erase_if_last_ref);
+  return is_successful;
+}
+template <CacheEntryRole R>
+const Cache::DeleterFn TargetCacheChargeTrackingCache<R>::kNoopDeleter =
+    CacheReservationManagerImpl<R>::TEST_GetNoopDeleterForRole();
+template class TargetCacheChargeTrackingCache<
+    CacheEntryRole::kFilterConstruction>;
+template class TargetCacheChargeTrackingCache<
+    CacheEntryRole::kBlockBasedTableReader>;
 }  // namespace ROCKSDB_NAMESPACE
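
The peak and increment bookkeeping in `TargetCacheChargeTrackingCache` can be followed in isolation with the sketch below. It keeps only the counters and drops the cache, handles, deleter check, and the failed-insert path, so `ChargePeakTracker` and its method names are hypothetical and are not part of the commit.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <deque>

// Hypothetical, cache-free model of the wrapper's counters.
class ChargePeakTracker {
 public:
  // Mirrors Insert(): the first charge after a release starts a new run.
  void OnInsert(std::size_t charge) {
    if (last_peak_tracked_) {
      peak_ = 0;
      increment_ = 0;
      last_peak_tracked_ = false;
    }
    cur_ += charge;
    peak_ = std::max(peak_, cur_);
    increment_ += charge;
  }

  // Mirrors Release(): the first release after a run records that run's
  // peak and adds its total increment to the running sum.
  void OnRelease(std::size_t charge) {
    assert(charge <= cur_);
    if (!last_peak_tracked_) {
      peaks_.push_back(peak_);
      increments_sum_ += increment_;
      last_peak_tracked_ = true;
    }
    cur_ -= charge;
  }

  std::size_t current() const { return cur_; }
  const std::deque<std::size_t>& peaks() const { return peaks_; }
  std::size_t increments_sum() const { return increments_sum_; }

 private:
  std::size_t cur_ = 0;
  std::size_t peak_ = 0;
  std::size_t increment_ = 0;
  bool last_peak_tracked_ = false;
  std::deque<std::size_t> peaks_;
  std::size_t increments_sum_ = 0;
};

// Usage: rise from 0 to 30, fall to 10, rise to 15, fall to 0.
inline ChargePeakTracker TrackExample() {
  ChargePeakTracker t;
  t.OnInsert(30);   // first run climbs to 30
  t.OnRelease(20);  // records peak 30 and increment 30, level drops to 10
  t.OnInsert(5);    // second run climbs from 10 to 15
  t.OnRelease(15);  // records peak 15 and increment 5
  return t;
}
```

In this trace the recorded peaks are 30 and 15, and the increment sum is 35 (30 for the first rise plus 5 for the second), which is the quantity the filter-construction test compares against its predictions.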

@ -952,6 +952,51 @@ class CacheWrapper : public Cache {
std::shared_ptr<Cache> target_; std::shared_ptr<Cache> target_;
}; };
/*
* A cache wrapper that tracks certain CacheEntryRole's cache charge, its
* peaks and increments
*
* p0
* / \ p1
* / \ /\
* / \/ \
* a / b \
* peaks = {p0, p1}
* increments = {p0-a, p1-b}
*/
template <CacheEntryRole R>
class TargetCacheChargeTrackingCache : public CacheWrapper {
public:
explicit TargetCacheChargeTrackingCache(std::shared_ptr<Cache> target);
using Cache::Insert;
Status Insert(const Slice& key, void* value, size_t charge,
void (*deleter)(const Slice& key, void* value),
Handle** handle = nullptr,
Priority priority = Priority::LOW) override;
using Cache::Release;
bool Release(Handle* handle, bool erase_if_last_ref = false) override;
std::size_t GetCacheCharge() { return cur_cache_charge_; }
std::deque<std::size_t> GetChargedCachePeaks() { return cache_charge_peaks_; }
std::size_t GetChargedCacheIncrementSum() {
return cache_charge_increments_sum_;
}
private:
static const Cache::DeleterFn kNoopDeleter;
std::size_t cur_cache_charge_;
std::size_t cache_charge_peak_;
std::size_t cache_charge_increment_;
bool last_peak_tracked_;
std::deque<std::size_t> cache_charge_peaks_;
std::size_t cache_charge_increments_sum_;
};
class DBTestBase : public testing::Test { class DBTestBase : public testing::Test {
public: public:
// Sequence of option configurations to try // Sequence of option configurations to try

@ -135,7 +135,9 @@ DECLARE_int32(set_in_place_one_in);
DECLARE_int64(cache_size); DECLARE_int64(cache_size);
DECLARE_int32(cache_numshardbits); DECLARE_int32(cache_numshardbits);
DECLARE_bool(cache_index_and_filter_blocks); DECLARE_bool(cache_index_and_filter_blocks);
DECLARE_bool(reserve_table_reader_memory); DECLARE_bool(charge_compression_dictionary_building_buffer);
DECLARE_bool(charge_filter_construction);
DECLARE_bool(charge_table_reader);
DECLARE_int32(top_level_index_pinning); DECLARE_int32(top_level_index_pinning);
DECLARE_int32(partition_pinning); DECLARE_int32(partition_pinning);
DECLARE_int32(unpartitioned_pinning); DECLARE_int32(unpartitioned_pinning);

@ -304,10 +304,20 @@ DEFINE_int32(cache_numshardbits, 6,
DEFINE_bool(cache_index_and_filter_blocks, false, DEFINE_bool(cache_index_and_filter_blocks, false,
"True if indexes/filters should be cached in block cache."); "True if indexes/filters should be cached in block cache.");
DEFINE_bool(reserve_table_reader_memory, false, DEFINE_bool(charge_compression_dictionary_building_buffer, false,
"A dynamically updating charge to block cache, loosely based on " "Setting for "
"the actual memory usage of table reader, will occur to account " "CacheEntryRoleOptions::charged of "
"the memory, if block cache available."); "CacheEntryRole::kCompressionDictionaryBuildingBuffer");
DEFINE_bool(charge_filter_construction, false,
"Setting for "
"CacheEntryRoleOptions::charged of "
"CacheEntryRole::kFilterConstruction");
DEFINE_bool(charge_table_reader, false,
"Setting for "
"CacheEntryRoleOptions::charged of "
"CacheEntryRole::kBlockBasedTableReader");
DEFINE_int32( DEFINE_int32(
top_level_index_pinning, top_level_index_pinning,

@ -2339,8 +2339,21 @@ void StressTest::Open(SharedState* shared) {
block_based_options.block_cache_compressed = compressed_cache_; block_based_options.block_cache_compressed = compressed_cache_;
block_based_options.checksum = checksum_type_e; block_based_options.checksum = checksum_type_e;
block_based_options.block_size = FLAGS_block_size; block_based_options.block_size = FLAGS_block_size;
block_based_options.reserve_table_reader_memory = block_based_options.cache_usage_options.options_overrides.insert(
FLAGS_reserve_table_reader_memory; {CacheEntryRole::kCompressionDictionaryBuildingBuffer,
{/*.charged = */ FLAGS_charge_compression_dictionary_building_buffer
? CacheEntryRoleOptions::Decision::kEnabled
: CacheEntryRoleOptions::Decision::kDisabled}});
block_based_options.cache_usage_options.options_overrides.insert(
{CacheEntryRole::kFilterConstruction,
{/*.charged = */ FLAGS_charge_filter_construction
? CacheEntryRoleOptions::Decision::kEnabled
: CacheEntryRoleOptions::Decision::kDisabled}});
block_based_options.cache_usage_options.options_overrides.insert(
{CacheEntryRole::kBlockBasedTableReader,
{/*.charged = */ FLAGS_charge_table_reader
? CacheEntryRoleOptions::Decision::kEnabled
: CacheEntryRoleOptions::Decision::kDisabled}});
block_based_options.format_version = block_based_options.format_version =
static_cast<uint32_t>(FLAGS_format_version); static_cast<uint32_t>(FLAGS_format_version);
block_based_options.index_block_restart_interval = block_based_options.index_block_restart_interval =

@ -559,15 +559,15 @@ enum class CacheEntryRole {
kIndexBlock, kIndexBlock,
// Other kinds of block-based table block // Other kinds of block-based table block
kOtherBlock, kOtherBlock,
// WriteBufferManager reservations to account for memtable usage // WriteBufferManager's charge to account for its memtable usage
kWriteBuffer, kWriteBuffer,
// BlockBasedTableBuilder reservations to account for // Compression dictionary building buffer's charge to account for
// compression dictionary building buffer's memory usage // its memory usage
kCompressionDictionaryBuildingBuffer, kCompressionDictionaryBuildingBuffer,
// Filter reservations to account for // Filter's charge to account for
// (new) bloom and ribbon filter construction's memory usage // (new) bloom and ribbon filter construction's memory usage
kFilterConstruction, kFilterConstruction,
// BlockBasedTableReader reservations to account for // BlockBasedTableReader's charge to account for
// its memory usage // its memory usage
kBlockBasedTableReader, kBlockBasedTableReader,
// Default bucket, for miscellaneous cache entries. Do not use for // Default bucket, for miscellaneous cache entries. Do not use for

@ -552,7 +552,7 @@ struct DBOptions {
// compaction. For universal-style compaction, you can usually set it to -1. // compaction. For universal-style compaction, you can usually set it to -1.
// //
// A high value or -1 for this option can cause high memory usage. // A high value or -1 for this option can cause high memory usage.
// See BlockBasedTableOptions::reserve_table_reader_memory to constrain // See BlockBasedTableOptions::cache_usage_options to constrain
// memory usage in case of block based table format. // memory usage in case of block based table format.
// //
// Default: -1 // Default: -1

@ -22,6 +22,7 @@
#include <string> #include <string>
#include <unordered_map> #include <unordered_map>
#include "rocksdb/cache.h"
#include "rocksdb/customizable.h" #include "rocksdb/customizable.h"
#include "rocksdb/env.h" #include "rocksdb/env.h"
#include "rocksdb/options.h" #include "rocksdb/options.h"
@ -104,6 +105,23 @@ struct MetadataCacheOptions {
PinningTier unpartitioned_pinning = PinningTier::kFallback; PinningTier unpartitioned_pinning = PinningTier::kFallback;
}; };
struct CacheEntryRoleOptions {
enum class Decision {
kEnabled,
kDisabled,
kFallback,
};
Decision charged = Decision::kFallback;
bool operator==(const CacheEntryRoleOptions& other) const {
return charged == other.charged;
}
};
struct CacheUsageOptions {
CacheEntryRoleOptions options;
std::map<CacheEntryRole, CacheEntryRoleOptions> options_overrides;
};
// For advanced user only // For advanced user only
struct BlockBasedTableOptions { struct BlockBasedTableOptions {
static const char* kName() { return "BlockTableOptions"; }; static const char* kName() { return "BlockTableOptions"; };
@ -287,47 +305,80 @@ struct BlockBasedTableOptions {
// separately // separately
uint64_t metadata_block_size = 4096; uint64_t metadata_block_size = 4096;
// If true, a dynamically updating charge to block cache, loosely based // `cache_usage_options` allows users to specify the default
// on the actual memory usage of table building, will occur to account // options (`cache_usage_options.options`) and the overriding
// the memory, if block cache available. // options (`cache_usage_options.options_overrides`)
// // for different `CacheEntryRole` under various features related to cache
// Charged memory usage includes: // usage.
// 1. Bloom Filter (format_version >= 5) and Ribbon Filter construction //
// 2. More to come... // For a certain `CacheEntryRole role` and a certain feature `f` of
// // `CacheEntryRoleOptions`:
// Note: // 1. If `options_overrides` has an entry for `role` and
// 1. Bloom Filter (format_version >= 5) and Ribbon Filter construction // `options_overrides[role].f != kFallback`, we use
// // `options_overrides[role].f`
// If additional temporary memory of Ribbon Filter uses up too much memory // 2. Otherwise, if `options.f != kFallback`, we use `options.f`
// relative to the avaible space left in the block cache // 3. Otherwise, we follow the compatible existing behavior for `f` (see
// each feature's comment for more)
//
// `cache_usage_options` currently supports specifying options for the
// following features:
//
// 1. Memory charging to block cache (`CacheEntryRoleOptions::charged`)
// Memory charging accounts the memory usage of a specific area
// (represented by `CacheEntryRole`) toward usage in the block cache (if
// available), by dynamically updating a charge to the block cache loosely
// based on the actual memory usage of that area.
//
// (a) CacheEntryRole::kCompressionDictionaryBuildingBuffer
// (i) If kEnabled:
// Charge memory usage of the buffered data used as training samples for
// dictionary compression.
// If such memory usage exceeds the available space left in the block cache
// at some point (i.e, causing a cache full under // at some point (i.e, causing a cache full under
// LRUCacheOptions::strict_capacity_limit = true), construction will fall back // `LRUCacheOptions::strict_capacity_limit` = true), the data will then be
// to Bloom Filter. // unbuffered.
// // (ii) If kDisabled:
// Default: false // Does not charge the memory usage mentioned above.
bool reserve_table_builder_memory = false; // (iii) Compatible existing behavior:
// Same as kEnabled.
// If true, a dynamically updating charge to block cache, loosely based //
// on the actual memory usage of table reader, will occur to account // (b) CacheEntryRole::kFilterConstruction
// the memory, if block cache available. // (i) If kEnabled:
// // Charge memory usage of Bloom Filter
// Charged memory usage includes: // (format_version >= 5) and Ribbon Filter construction.
// 1. Table properties // If additional temporary memory of Ribbon Filter exceeds the avaible
// 2. Index block/Filter block/Uncompression dictionary if stored in table // space left in the block cache at some point (i.e, causing a cache full
// reader (i.e, BlockBasedTableOptions::cache_index_and_filter_blocks == // under `LRUCacheOptions::strict_capacity_limit` = true),
// false) // construction will fall back to Bloom Filter.
// 3. Some internal data structures // (ii) If kDisabled:
// 4. More to come... // Does not charge the memory usage mentioned above.
// // (iii) Compatible existing behavior:
// Note: // Same as kDisabled.
// If creation of a table reader uses up too much memory //
// relative to the avaible space left in the block cache // (c) CacheEntryRole::kBlockBasedTableReader
// at some point (i.e, causing a cache full under // (i) If kEnabled:
// LRUCacheOptions::strict_capacity_limit = true), such creation will fail // Charge memory usage of table properties +
// with Status::MemoryLimit(). // index block/filter block/uncompression dictionary (when stored in table
// // reader i.e, BlockBasedTableOptions::cache_index_and_filter_blocks ==
// Default: false // false) + some internal data structures during table reader creation.
bool reserve_table_reader_memory = false; // If such a table reader exceeds
// the available space left in the block cache at some point (i.e, causing
// a cache full under `LRUCacheOptions::strict_capacity_limit` = true),
// creation will fail with Status::MemoryLimit().
// (ii) If kDisabled:
// Does not charge the memory usage mentioned above.
// (iii) Compatible existing behavior:
// Same as kDisabled.
//
// (d) Other CacheEntryRole
// Not supported.
// `Status::kNotSupported` will be returned if
// `CacheEntryRoleOptions::charged` is set to {`kEnabled`, `kDisabled`}.
//
//
// 2. More to come ...
//
CacheUsageOptions cache_usage_options;
// Note: currently this option requires kTwoLevelIndexSearch to be set as // Note: currently this option requires kTwoLevelIndexSearch to be set as
// well. // well.
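
The three-step resolution rule described in the `cache_usage_options` comment can be sketched as a small standalone function. This is only an illustration under stated assumptions: `Role`, `Decision`, `UsageOptions`, `CompatibleDefault`, and `IsCharged` are hypothetical stand-ins for `CacheEntryRole`, `CacheEntryRoleOptions::Decision`, and `CacheUsageOptions`, and the sketch resolves only the decision. It does not model the additional requirement that a block cache is available.

```cpp
#include <cassert>
#include <map>

// Hypothetical stand-ins for the RocksDB types; names are illustrative only.
enum class Role {
  kCompressionDictionaryBuildingBuffer,
  kFilterConstruction,
  kBlockBasedTableReader,
};
enum class Decision { kEnabled, kDisabled, kFallback };

struct UsageOptions {
  Decision options = Decision::kFallback;  // default for every role
  std::map<Role, Decision> options_overrides;
};

// Step 3: the compatible existing behavior listed in the comment above.
// Only the compression dictionary building buffer is charged by default.
bool CompatibleDefault(Role role) {
  return role == Role::kCompressionDictionaryBuildingBuffer;
}

bool IsCharged(const UsageOptions& o, Role role) {
  Decision d = Decision::kFallback;
  // Step 1: a non-fallback override for this role wins.
  auto it = o.options_overrides.find(role);
  if (it != o.options_overrides.end()) {
    d = it->second;
  }
  // Step 2: otherwise use the default options.
  if (d == Decision::kFallback) {
    d = o.options;
  }
  // Step 3: otherwise keep the existing behavior for this role.
  if (d == Decision::kFallback) {
    return CompatibleDefault(role);
  }
  return d == Decision::kEnabled;
}
```

With a default-constructed `UsageOptions`, only the compression dictionary building buffer resolves to charged, which is the opt-out behavior this PR introduces for that role.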

@ -82,7 +82,7 @@ void WriteBufferManager::ReserveMemWithCache(size_t mem) {
// We absorb the error since WriteBufferManager is not able to handle // We absorb the error since WriteBufferManager is not able to handle
// this failure properly. Ideally we should prevent this allocation // this failure properly. Ideally we should prevent this allocation
// from happening if this cache reservation fails. // from happening if this cache charging fails.
// [TODO] We'll need to improve it in the future and figure out what to do on // [TODO] We'll need to improve it in the future and figure out what to do on
// error // error
s.PermitUncheckedError(); s.PermitUncheckedError();

@ -77,7 +77,9 @@ TEST_F(WriteBufferManagerTest, ShouldFlush) {
ASSERT_FALSE(wbf->ShouldFlush()); ASSERT_FALSE(wbf->ShouldFlush());
} }
TEST_F(WriteBufferManagerTest, CacheCost) { class ChargeWriteBufferTest : public testing::Test {};
TEST_F(ChargeWriteBufferTest, Basic) {
constexpr std::size_t kMetaDataChargeOverhead = 10000; constexpr std::size_t kMetaDataChargeOverhead = 10000;
LRUCacheOptions co; LRUCacheOptions co;
@ -197,7 +199,7 @@ TEST_F(WriteBufferManagerTest, CacheCost) {
ASSERT_EQ(cache->GetPinnedUsage(), 0); ASSERT_EQ(cache->GetPinnedUsage(), 0);
} }
TEST_F(WriteBufferManagerTest, NoCapCacheCost) { TEST_F(ChargeWriteBufferTest, BasicWithNoBufferSizeLimit) {
constexpr std::size_t kMetaDataChargeOverhead = 10000; constexpr std::size_t kMetaDataChargeOverhead = 10000;
// 1GB cache // 1GB cache
std::shared_ptr<Cache> cache = NewLRUCache(1024 * 1024 * 1024, 4); std::shared_ptr<Cache> cache = NewLRUCache(1024 * 1024 * 1024, 4);
@ -231,7 +233,7 @@ TEST_F(WriteBufferManagerTest, NoCapCacheCost) {
ASSERT_LT(cache->GetPinnedUsage(), 4 * 256 * 1024 + kMetaDataChargeOverhead); ASSERT_LT(cache->GetPinnedUsage(), 4 * 256 * 1024 + kMetaDataChargeOverhead);
} }
TEST_F(WriteBufferManagerTest, CacheFull) { TEST_F(ChargeWriteBufferTest, BasicWithCacheFull) {
constexpr std::size_t kMetaDataChargeOverhead = 20000; constexpr std::size_t kMetaDataChargeOverhead = 20000;
// 12MB cache size with strict capacity // 12MB cache size with strict capacity

@ -117,7 +117,8 @@ bool CompareBytes(char* start_ptr1, char* start_ptr2, size_t total_size,
// kBbtoExcluded, and maybe add customized verification for it. // kBbtoExcluded, and maybe add customized verification for it.
TEST_F(OptionsSettableTest, BlockBasedTableOptionsAllFieldsSettable) { TEST_F(OptionsSettableTest, BlockBasedTableOptionsAllFieldsSettable) {
// Items in the form of <offset, size>. Need to be in ascending order // Items in the form of <offset, size>. Need to be in ascending order
// and not overlapping. Need to updated if new pointer-option is added. // and not overlapping. Need to update if new option to be excluded is added
// (e.g, pointer-type)
const OffsetGap kBbtoExcluded = { const OffsetGap kBbtoExcluded = {
{offsetof(struct BlockBasedTableOptions, flush_block_policy_factory), {offsetof(struct BlockBasedTableOptions, flush_block_policy_factory),
sizeof(std::shared_ptr<FlushBlockPolicyFactory>)}, sizeof(std::shared_ptr<FlushBlockPolicyFactory>)},
@ -127,6 +128,8 @@ TEST_F(OptionsSettableTest, BlockBasedTableOptionsAllFieldsSettable) {
sizeof(std::shared_ptr<PersistentCache>)}, sizeof(std::shared_ptr<PersistentCache>)},
{offsetof(struct BlockBasedTableOptions, block_cache_compressed), {offsetof(struct BlockBasedTableOptions, block_cache_compressed),
sizeof(std::shared_ptr<Cache>)}, sizeof(std::shared_ptr<Cache>)},
{offsetof(struct BlockBasedTableOptions, cache_usage_options),
sizeof(CacheUsageOptions)},
{offsetof(struct BlockBasedTableOptions, filter_policy), {offsetof(struct BlockBasedTableOptions, filter_policy),
sizeof(std::shared_ptr<const FilterPolicy>)}, sizeof(std::shared_ptr<const FilterPolicy>)},
}; };
@ -189,8 +192,6 @@ TEST_F(OptionsSettableTest, BlockBasedTableOptionsAllFieldsSettable) {
"index_block_restart_interval=4;" "index_block_restart_interval=4;"
"filter_policy=bloomfilter:4:true;whole_key_filtering=1;detect_filter_" "filter_policy=bloomfilter:4:true;whole_key_filtering=1;detect_filter_"
"construct_corruption=false;" "construct_corruption=false;"
"reserve_table_builder_memory=false;"
"reserve_table_reader_memory=false;"
"format_version=1;" "format_version=1;"
"verify_compression=true;read_amp_bytes_per_bit=0;" "verify_compression=true;read_amp_bytes_per_bit=0;"
"enable_index_compression=false;" "enable_index_compression=false;"

@ -854,8 +854,6 @@ TEST_F(OptionsTest, GetBlockBasedTableOptionsFromString) {
"block_cache=1M;block_cache_compressed=1k;block_size=1024;" "block_cache=1M;block_cache_compressed=1k;block_size=1024;"
"block_size_deviation=8;block_restart_interval=4;" "block_size_deviation=8;block_restart_interval=4;"
"format_version=5;whole_key_filtering=1;" "format_version=5;whole_key_filtering=1;"
"reserve_table_builder_memory=true;"
"reserve_table_reader_memory=true;"
"filter_policy=bloomfilter:4.567:false;detect_filter_construct_" "filter_policy=bloomfilter:4.567:false;detect_filter_construct_"
"corruption=true;" "corruption=true;"
// A bug caused read_amp_bytes_per_bit to be a large integer in OPTIONS // A bug caused read_amp_bytes_per_bit to be a large integer in OPTIONS
@ -877,8 +875,6 @@ TEST_F(OptionsTest, GetBlockBasedTableOptionsFromString) {
ASSERT_EQ(new_opt.format_version, 5U); ASSERT_EQ(new_opt.format_version, 5U);
ASSERT_EQ(new_opt.whole_key_filtering, true); ASSERT_EQ(new_opt.whole_key_filtering, true);
ASSERT_EQ(new_opt.detect_filter_construct_corruption, true); ASSERT_EQ(new_opt.detect_filter_construct_corruption, true);
ASSERT_EQ(new_opt.reserve_table_builder_memory, true);
ASSERT_EQ(new_opt.reserve_table_reader_memory, true);
ASSERT_TRUE(new_opt.filter_policy != nullptr); ASSERT_TRUE(new_opt.filter_policy != nullptr);
auto bfp = new_opt.filter_policy->CheckedCast<BloomFilterPolicy>(); auto bfp = new_opt.filter_policy->CheckedCast<BloomFilterPolicy>();
ASSERT_NE(bfp, nullptr); ASSERT_NE(bfp, nullptr);

@ -461,14 +461,24 @@ struct BlockBasedTableBuilder::Rep {
buffer_limit = std::min(tbo.target_file_size, buffer_limit = std::min(tbo.target_file_size,
compression_opts.max_dict_buffer_bytes); compression_opts.max_dict_buffer_bytes);
} }
if (table_options.no_block_cache || table_options.block_cache == nullptr) {
compression_dict_buffer_cache_res_mgr = nullptr; const auto compress_dict_build_buffer_charged =
} else { table_options.cache_usage_options.options_overrides
.at(CacheEntryRole::kCompressionDictionaryBuildingBuffer)
.charged;
if (table_options.block_cache &&
(compress_dict_build_buffer_charged ==
CacheEntryRoleOptions::Decision::kEnabled ||
compress_dict_build_buffer_charged ==
CacheEntryRoleOptions::Decision::kFallback)) {
compression_dict_buffer_cache_res_mgr = compression_dict_buffer_cache_res_mgr =
std::make_shared<CacheReservationManagerImpl< std::make_shared<CacheReservationManagerImpl<
CacheEntryRole::kCompressionDictionaryBuildingBuffer>>( CacheEntryRole::kCompressionDictionaryBuildingBuffer>>(
table_options.block_cache); table_options.block_cache);
} else {
compression_dict_buffer_cache_res_mgr = nullptr;
} }
for (uint32_t i = 0; i < compression_opts.parallel_threads; i++) { for (uint32_t i = 0; i < compression_opts.parallel_threads; i++) {
compression_ctxs[i].reset(new CompressionContext(compression_type)); compression_ctxs[i].reset(new CompressionContext(compression_type));
} }
@ -942,7 +952,7 @@ void BlockBasedTableBuilder::Add(const Slice& key, const Slice& value) {
(r->buffer_limit != 0 && r->data_begin_offset > r->buffer_limit); (r->buffer_limit != 0 && r->data_begin_offset > r->buffer_limit);
bool exceeds_global_block_cache_limit = false; bool exceeds_global_block_cache_limit = false;
// Increase cache reservation for the last buffered data block // Increase cache charging for the last buffered data block
// only if the block is not going to be unbuffered immediately // only if the block is not going to be unbuffered immediately
// and there exists a cache reservation manager // and there exists a cache reservation manager
if (!exceeds_buffer_limit && if (!exceeds_buffer_limit &&

@ -234,6 +234,7 @@ static std::unordered_map<std::string, OptionTypeInfo>
/* currently not supported /* currently not supported
std::shared_ptr<Cache> block_cache = nullptr; std::shared_ptr<Cache> block_cache = nullptr;
std::shared_ptr<Cache> block_cache_compressed = nullptr; std::shared_ptr<Cache> block_cache_compressed = nullptr;
CacheUsageOptions cache_usage_options;
*/ */
{"flush_block_policy_factory", {"flush_block_policy_factory",
OptionTypeInfo::AsCustomSharedPtr<FlushBlockPolicyFactory>( OptionTypeInfo::AsCustomSharedPtr<FlushBlockPolicyFactory>(
@ -327,14 +328,6 @@ static std::unordered_map<std::string, OptionTypeInfo>
detect_filter_construct_corruption), detect_filter_construct_corruption),
OptionType::kBoolean, OptionVerificationType::kNormal, OptionType::kBoolean, OptionVerificationType::kNormal,
OptionTypeFlags::kMutable}}, OptionTypeFlags::kMutable}},
{"reserve_table_builder_memory",
{offsetof(struct BlockBasedTableOptions, reserve_table_builder_memory),
OptionType::kBoolean, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"reserve_table_reader_memory",
{offsetof(struct BlockBasedTableOptions, reserve_table_reader_memory),
OptionType::kBoolean, OptionVerificationType::kNormal,
OptionTypeFlags::kNone}},
{"skip_table_builder_flush", {"skip_table_builder_flush",
{0, OptionType::kBoolean, OptionVerificationType::kDeprecated, {0, OptionType::kBoolean, OptionVerificationType::kDeprecated,
OptionTypeFlags::kNone}}, OptionTypeFlags::kNone}},
@ -429,8 +422,12 @@ BlockBasedTableFactory::BlockBasedTableFactory(
InitializeOptions(); InitializeOptions();
RegisterOptions(&table_options_, &block_based_table_type_info); RegisterOptions(&table_options_, &block_based_table_type_info);
if (table_options_.reserve_table_reader_memory && const auto table_reader_charged =
table_options_.no_block_cache == false) { table_options_.cache_usage_options.options_overrides
.at(CacheEntryRole::kBlockBasedTableReader)
.charged;
if (table_options_.block_cache &&
table_reader_charged == CacheEntryRoleOptions::Decision::kEnabled) {
table_reader_cache_res_mgr_.reset(new ConcurrentCacheReservationManager( table_reader_cache_res_mgr_.reset(new ConcurrentCacheReservationManager(
std::make_shared<CacheReservationManagerImpl< std::make_shared<CacheReservationManagerImpl<
CacheEntryRole::kBlockBasedTableReader>>( CacheEntryRole::kBlockBasedTableReader>>(
@ -474,6 +471,19 @@ void BlockBasedTableFactory::InitializeOptions() {
// We do not support partitioned filters without partitioning indexes // We do not support partitioned filters without partitioning indexes
table_options_.partition_filters = false; table_options_.partition_filters = false;
} }
auto& options_overrides =
table_options_.cache_usage_options.options_overrides;
const auto options = table_options_.cache_usage_options.options;
for (std::uint32_t i = 0; i < kNumCacheEntryRoles; ++i) {
CacheEntryRole role = static_cast<CacheEntryRole>(i);
auto options_overrides_iter = options_overrides.find(role);
if (options_overrides_iter == options_overrides.end()) {
options_overrides.insert({role, options});
} else if (options_overrides_iter->second.charged ==
CacheEntryRoleOptions::Decision::kFallback) {
options_overrides_iter->second.charged = options.charged;
}
}
} }
Status BlockBasedTableFactory::PrepareOptions(const ConfigOptions& opts) { Status BlockBasedTableFactory::PrepareOptions(const ConfigOptions& opts) {
@ -637,12 +647,6 @@ Status BlockBasedTableFactory::ValidateOptions(
"Enable pin_l0_filter_and_index_blocks_in_cache, " "Enable pin_l0_filter_and_index_blocks_in_cache, "
", but block cache is disabled"); ", but block cache is disabled");
} }
if (table_options_.reserve_table_reader_memory &&
table_options_.no_block_cache) {
return Status::InvalidArgument(
"Enable reserve_table_reader_memory, "
", but block cache is disabled");
}
if (!IsSupportedFormatVersion(table_options_.format_version)) { if (!IsSupportedFormatVersion(table_options_.format_version)) {
return Status::InvalidArgument( return Status::InvalidArgument(
"Unsupported BlockBasedTable format_version. Please check " "Unsupported BlockBasedTable format_version. Please check "
@ -675,6 +679,34 @@ Status BlockBasedTableFactory::ValidateOptions(
"max_successive_merges larger than 0 is currently inconsistent with " "max_successive_merges larger than 0 is currently inconsistent with "
"unordered_write"); "unordered_write");
} }
const auto& options_overrides =
table_options_.cache_usage_options.options_overrides;
for (auto options_overrides_iter = options_overrides.cbegin();
options_overrides_iter != options_overrides.cend();
++options_overrides_iter) {
const CacheEntryRole role = options_overrides_iter->first;
const CacheEntryRoleOptions options = options_overrides_iter->second;
static const std::set<CacheEntryRole> kMemoryChargingSupported = {
CacheEntryRole::kCompressionDictionaryBuildingBuffer,
CacheEntryRole::kFilterConstruction,
CacheEntryRole::kBlockBasedTableReader};
if (options.charged != CacheEntryRoleOptions::Decision::kFallback &&
kMemoryChargingSupported.count(role) == 0) {
return Status::NotSupported(
"Enable/Disable CacheEntryRoleOptions::charged "
"for CacheEntryRole " +
kCacheEntryRoleToCamelString[static_cast<uint32_t>(role)] +
" is not supported");
}
if (table_options_.no_block_cache &&
options.charged == CacheEntryRoleOptions::Decision::kEnabled) {
return Status::InvalidArgument(
"Enable CacheEntryRoleOptions::charged "
"for CacheEntryRole " +
kCacheEntryRoleToCamelString[static_cast<uint32_t>(role)] +
" but block cache is disabled");
}
}
{ {
Status s = CheckCacheOptionCompatibility(table_options_); Status s = CheckCacheOptionCompatibility(table_options_);
if (!s.ok()) { if (!s.ok()) {
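
The sanitization added to `InitializeOptions()` and the checks added to `ValidateOptions()` can be summarized in a standalone sketch. The names below (`Role`, `Decision`, `Result`, `Sanitize`, `Validate`, `kNumRoles`) are hypothetical and the role list is shortened to four entries for illustration. The real code iterates over `kNumCacheEntryRoles` and returns `Status` objects.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <set>

// Hypothetical, shortened stand-ins for the RocksDB types.
enum class Role : std::uint32_t {
  kDataBlock,
  kFilterConstruction,
  kBlockBasedTableReader,
  kMisc,
};
constexpr std::uint32_t kNumRoles = 4;
enum class Decision { kEnabled, kDisabled, kFallback };
enum class Result { kOk, kNotSupported, kInvalidArgument };

// Ensure every role has an entry, and replace kFallback overrides with
// the default decision.
void Sanitize(Decision default_decision, std::map<Role, Decision>* overrides) {
  for (std::uint32_t i = 0; i < kNumRoles; ++i) {
    const Role role = static_cast<Role>(i);
    auto it = overrides->find(role);
    if (it == overrides->end()) {
      overrides->insert({role, default_decision});
    } else if (it->second == Decision::kFallback) {
      it->second = default_decision;
    }
  }
}

// Reject explicit decisions for unsupported roles, and reject kEnabled
// when the block cache is disabled.
Result Validate(const std::map<Role, Decision>& overrides,
                bool no_block_cache) {
  static const std::set<Role> kSupported = {Role::kFilterConstruction,
                                            Role::kBlockBasedTableReader};
  for (auto it = overrides.cbegin(); it != overrides.cend(); ++it) {
    if (it->second != Decision::kFallback && kSupported.count(it->first) == 0) {
      return Result::kNotSupported;
    }
    if (no_block_cache && it->second == Decision::kEnabled) {
      return Result::kInvalidArgument;
    }
  }
  return Result::kOk;
}
```

Because sanitization leaves an entry for every role, later code can look a role up directly, as the `.at(...)` calls in this diff do. In this sketch, a non-fallback default is copied to every role during sanitization, so it also trips the unsupported-role check for roles outside the supported set.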

@ -10,6 +10,7 @@
#include <algorithm> #include <algorithm>
#include <array> #include <array>
#include <cstdint>
#include <limits> #include <limits>
#include <memory> #include <memory>
#include <string> #include <string>
@ -717,7 +718,10 @@ Status BlockBasedTable::Open(
mem_usage, &(rep->table_reader_cache_res_handle)); mem_usage, &(rep->table_reader_cache_res_handle));
if (s.IsIncomplete()) { if (s.IsIncomplete()) {
s = Status::MemoryLimit( s = Status::MemoryLimit(
"Can't allocate BlockBasedTableReader due to memory limit based on " "Can't allocate " +
kCacheEntryRoleToCamelString[static_cast<std::uint32_t>(
CacheEntryRole::kBlockBasedTableReader)] +
" due to memory limit based on "
"cache capacity for memory allocation"); "cache capacity for memory allocation");
} }
} }

@ -283,46 +283,10 @@ TEST_P(BlockBasedTableReaderTest, MultiGet) {
} }
} }
class BlockBasedTableReaderResOnlyCache : public CacheWrapper { class ChargeTableReaderTest
public:
explicit BlockBasedTableReaderResOnlyCache(std::shared_ptr<Cache> target)
: CacheWrapper(std::move(target)) {}
using Cache::Insert;
Status Insert(const Slice& key, void* value, size_t charge,
void (*deleter)(const Slice& key, void* value),
Handle** handle = nullptr,
Priority priority = Priority::LOW) override {
if (deleter == kNoopDeleterForBlockBasedTableReader) {
return target_->Insert(key, value, charge, deleter, handle, priority);
} else {
return Status::OK();
}
}
using Cache::Release;
bool Release(Handle* handle, bool force_erase = false) override {
auto deleter = GetDeleter(handle);
if (deleter == kNoopDeleterForBlockBasedTableReader) {
return target_->Release(handle, force_erase);
} else {
return true;
}
}
private:
static const Cache::DeleterFn kNoopDeleterForBlockBasedTableReader;
};
const Cache::DeleterFn
BlockBasedTableReaderResOnlyCache::kNoopDeleterForBlockBasedTableReader =
CacheReservationManagerImpl<CacheEntryRole::kBlockBasedTableReader>::
TEST_GetNoopDeleterForRole();
class BlockBasedTableReaderCapMemoryTest
: public BlockBasedTableReaderBaseTest, : public BlockBasedTableReaderBaseTest,
public testing::WithParamInterface< public testing::WithParamInterface<
bool /* reserve_table_builder_memory */> { CacheEntryRoleOptions::Decision /* charge_table_reader_mem */> {
protected: protected:
static std::size_t CalculateMaxTableReaderNumBeforeCacheFull( static std::size_t CalculateMaxTableReaderNumBeforeCacheFull(
std::size_t cache_capacity, std::size_t approx_table_reader_mem) { std::size_t cache_capacity, std::size_t approx_table_reader_mem) {
@ -361,28 +325,30 @@ class BlockBasedTableReaderCapMemoryTest
kv_ = BlockBasedTableReaderBaseTest::GenerateKVMap(); kv_ = BlockBasedTableReaderBaseTest::GenerateKVMap();
compression_type_ = CompressionType::kNoCompression; compression_type_ = CompressionType::kNoCompression;
table_reader_res_only_cache_.reset(new BlockBasedTableReaderResOnlyCache( table_reader_charge_tracking_cache_ = std::make_shared<
NewLRUCache(4 * CacheReservationManagerImpl< TargetCacheChargeTrackingCache<
CacheEntryRole::kBlockBasedTableReader>:: CacheEntryRole::kBlockBasedTableReader>>((NewLRUCache(
GetDummyEntrySize(), 4 * CacheReservationManagerImpl<
0 /* num_shard_bits */, true /* strict_capacity_limit */))); CacheEntryRole::kBlockBasedTableReader>::GetDummyEntrySize(),
0 /* num_shard_bits */, true /* strict_capacity_limit */)));
// To ApproximateTableReaderMem() without encountering any potential errors // To ApproximateTableReaderMem() without being affected by
// caused by BlocBasedTableReader::reserve_table_reader_memory == true, we // the feature of charging its memory, we turn off the feature
// first turn off the feature to test charge_table_reader_ = CacheEntryRoleOptions::Decision::kDisabled;
reserve_table_reader_memory_ = false;
BlockBasedTableReaderBaseTest::SetUp(); BlockBasedTableReaderBaseTest::SetUp();
approx_table_reader_mem_ = ApproximateTableReaderMem(); approx_table_reader_mem_ = ApproximateTableReaderMem();
// Now we conditionally turn on the feature to test // Now we conditionally turn on the feature to test
reserve_table_reader_memory_ = GetParam(); charge_table_reader_ = GetParam();
ConfigureTableFactory(); ConfigureTableFactory();
} }
void ConfigureTableFactory() override { void ConfigureTableFactory() override {
BlockBasedTableOptions table_options; BlockBasedTableOptions table_options;
table_options.reserve_table_reader_memory = reserve_table_reader_memory_; table_options.cache_usage_options.options_overrides.insert(
table_options.block_cache = table_reader_res_only_cache_; {CacheEntryRole::kBlockBasedTableReader,
{/*.charged = */ charge_table_reader_}});
table_options.block_cache = table_reader_charge_tracking_cache_;
table_options.cache_index_and_filter_blocks = false; table_options.cache_index_and_filter_blocks = false;
table_options.filter_policy.reset(NewBloomFilterPolicy(10, false)); table_options.filter_policy.reset(NewBloomFilterPolicy(10, false));
@ -392,9 +358,10 @@ class BlockBasedTableReaderCapMemoryTest
options_.table_factory.reset(NewBlockBasedTableFactory(table_options)); options_.table_factory.reset(NewBlockBasedTableFactory(table_options));
} }
bool reserve_table_reader_memory_; CacheEntryRoleOptions::Decision charge_table_reader_;
std::shared_ptr<BlockBasedTableReaderResOnlyCache> std::shared_ptr<
table_reader_res_only_cache_; TargetCacheChargeTrackingCache<CacheEntryRole::kBlockBasedTableReader>>
table_reader_charge_tracking_cache_;
std::size_t approx_table_reader_mem_; std::size_t approx_table_reader_mem_;
std::map<std::string, std::string> kv_; std::map<std::string, std::string> kv_;
CompressionType compression_type_; CompressionType compression_type_;
@ -420,16 +387,16 @@ class BlockBasedTableReaderCapMemoryTest
} }
}; };
INSTANTIATE_TEST_CASE_P(CapMemoryUsageUnderCacheCapacity, INSTANTIATE_TEST_CASE_P(
BlockBasedTableReaderCapMemoryTest, ChargeTableReaderTest, ChargeTableReaderTest,
::testing::Values(true, false)); ::testing::Values(CacheEntryRoleOptions::Decision::kEnabled,
CacheEntryRoleOptions::Decision::kDisabled));
TEST_P(BlockBasedTableReaderCapMemoryTest, CapMemoryUsageUnderCacheCapacity) { TEST_P(ChargeTableReaderTest, Basic) {
const std::size_t max_table_reader_num_capped = const std::size_t max_table_reader_num_capped =
BlockBasedTableReaderCapMemoryTest:: ChargeTableReaderTest::CalculateMaxTableReaderNumBeforeCacheFull(
CalculateMaxTableReaderNumBeforeCacheFull( table_reader_charge_tracking_cache_->GetCapacity(),
table_reader_res_only_cache_->GetCapacity(), approx_table_reader_mem_);
approx_table_reader_mem_);
// Acceptable estimation errors coming from // Acceptable estimation errors coming from
// 1. overestimate max_table_reader_num_capped due to # dummy entries is high // 1. overestimate max_table_reader_num_capped due to # dummy entries is high
@ -448,16 +415,16 @@ TEST_P(BlockBasedTableReaderCapMemoryTest, CapMemoryUsageUnderCacheCapacity) {
<< "We need `max_table_reader_num_uncapped` > " << "We need `max_table_reader_num_uncapped` > "
"`max_table_reader_num_capped_upper_bound` to differentiate cases " "`max_table_reader_num_capped_upper_bound` to differentiate cases "
"between " "between "
"reserve_table_reader_memory_ == false and == true)"; "charge_table_reader_ == kDisabled and == kEnabled)";
Status s = Status::OK(); Status s = Status::OK();
std::size_t opened_table_reader_num = 0; std::size_t opened_table_reader_num = 0;
std::string table_name; std::string table_name;
std::vector<std::unique_ptr<BlockBasedTable>> tables; std::vector<std::unique_ptr<BlockBasedTable>> tables;
// Keep creating BlockBasedTableReader till hitting the memory limit based on // Keep creating BlockBasedTableReader till hitting the memory limit based on
// cache capacity and creation fails (when reserve_table_reader_memory_ == // cache capacity and creation fails (when charge_table_reader_ ==
// true) or reaching a specified big number of table readers (when // kEnabled) or reaching a specified big number of table readers (when
// reserve_table_reader_memory_ == false) // charge_table_reader_ == kDisabled)
while (s.ok() && opened_table_reader_num < max_table_reader_num_uncapped) { while (s.ok() && opened_table_reader_num < max_table_reader_num_uncapped) {
table_name = "table_" + std::to_string(opened_table_reader_num); table_name = "table_" + std::to_string(opened_table_reader_num);
CreateTable(table_name, compression_type_, kv_); CreateTable(table_name, compression_type_, kv_);
@ -471,8 +438,12 @@ TEST_P(BlockBasedTableReaderCapMemoryTest, CapMemoryUsageUnderCacheCapacity) {
} }
} }
if (reserve_table_reader_memory_) { if (charge_table_reader_ == CacheEntryRoleOptions::Decision::kEnabled) {
EXPECT_TRUE(s.IsMemoryLimit()) << "s: " << s.ToString(); EXPECT_TRUE(s.IsMemoryLimit()) << "s: " << s.ToString();
EXPECT_TRUE(s.ToString().find(
kCacheEntryRoleToCamelString[static_cast<std::uint32_t>(
CacheEntryRole::kBlockBasedTableReader)]) !=
std::string::npos);
EXPECT_TRUE(s.ToString().find("memory limit based on cache capacity") != EXPECT_TRUE(s.ToString().find("memory limit based on cache capacity") !=
std::string::npos); std::string::npos);
@ -480,10 +451,9 @@ TEST_P(BlockBasedTableReaderCapMemoryTest, CapMemoryUsageUnderCacheCapacity) {
EXPECT_LE(opened_table_reader_num, max_table_reader_num_capped_upper_bound); EXPECT_LE(opened_table_reader_num, max_table_reader_num_capped_upper_bound);
std::size_t updated_max_table_reader_num_capped = std::size_t updated_max_table_reader_num_capped =
BlockBasedTableReaderCapMemoryTest:: ChargeTableReaderTest::CalculateMaxTableReaderNumBeforeCacheFull(
CalculateMaxTableReaderNumBeforeCacheFull( table_reader_charge_tracking_cache_->GetCapacity() / 2,
table_reader_res_only_cache_->GetCapacity() / 2, approx_table_reader_mem_);
approx_table_reader_mem_);
// Keep deleting BlockBasedTableReader to lower down memory usage from the // Keep deleting BlockBasedTableReader to lower down memory usage from the
// memory limit to make the next creation succeed // memory limit to make the next creation succeed
@ -501,13 +471,13 @@ TEST_P(BlockBasedTableReaderCapMemoryTest, CapMemoryUsageUnderCacheCapacity) {
EXPECT_TRUE(s.ok()) << s.ToString(); EXPECT_TRUE(s.ok()) << s.ToString();
tables.clear(); tables.clear();
EXPECT_EQ(table_reader_res_only_cache_->GetPinnedUsage(), 0); EXPECT_EQ(table_reader_charge_tracking_cache_->GetCacheCharge(), 0);
} else { } else {
EXPECT_TRUE(s.ok() && EXPECT_TRUE(s.ok() &&
opened_table_reader_num == max_table_reader_num_uncapped) opened_table_reader_num == max_table_reader_num_uncapped)
<< "s: " << s.ToString() << " opened_table_reader_num: " << "s: " << s.ToString() << " opened_table_reader_num: "
<< std::to_string(opened_table_reader_num); << std::to_string(opened_table_reader_num);
EXPECT_EQ(table_reader_res_only_cache_->GetPinnedUsage(), 0); EXPECT_EQ(table_reader_charge_tracking_cache_->GetCacheCharge(), 0);
} }
} }

@ -84,7 +84,7 @@ class FilterBlockBuilder {
Status* status, std::unique_ptr<const char[]>* filter_data = nullptr) = 0; Status* status, std::unique_ptr<const char[]>* filter_data = nullptr) = 0;
// This is called after the caller finishes using the FilterBitsBuilder // This is called after the caller finishes using the FilterBitsBuilder
// in order to release memory usage and cache reservation // in order to release memory usage and cache charge
// associated with it timely // associated with it timely
virtual void ResetFilterBitsBuilder() {} virtual void ResetFilterBitsBuilder() {}

@ -109,7 +109,7 @@ class XXPH3FilterBitsBuilder : public BuiltinFilterBitsBuilder {
static constexpr uint32_t kMetadataLen = 5; static constexpr uint32_t kMetadataLen = 5;
// Number of hash entries to accumulate before charging their memory usage to // Number of hash entries to accumulate before charging their memory usage to
// the cache when cache reservation is available // the cache when cache charging is available
static const std::size_t kUint64tHashEntryCacheResBucketSize = static const std::size_t kUint64tHashEntryCacheResBucketSize =
CacheReservationManagerImpl< CacheReservationManagerImpl<
CacheEntryRole::kFilterConstruction>::GetDummyEntrySize() / CacheEntryRole::kFilterConstruction>::GetDummyEntrySize() /
@ -257,7 +257,7 @@ class XXPH3FilterBitsBuilder : public BuiltinFilterBitsBuilder {
// For reserving memory used in (new) Bloom and Ribbon Filter construction // For reserving memory used in (new) Bloom and Ribbon Filter construction
std::shared_ptr<CacheReservationManager> cache_res_mgr_; std::shared_ptr<CacheReservationManager> cache_res_mgr_;
// For managing cache reservation for final filter in (new) Bloom and Ribbon // For managing cache charge for final filter in (new) Bloom and Ribbon
// Filter construction // Filter construction
std::deque<std::unique_ptr<CacheReservationManager::CacheReservationHandle>> std::deque<std::unique_ptr<CacheReservationManager::CacheReservationHandle>>
final_filter_cache_res_handles_; final_filter_cache_res_handles_;
@ -270,7 +270,7 @@ class XXPH3FilterBitsBuilder : public BuiltinFilterBitsBuilder {
std::deque<uint64_t> entries; std::deque<uint64_t> entries;
// If cache_res_mgr_ != nullptr, // If cache_res_mgr_ != nullptr,
// it manages cache reservation for buckets of hash entries in (new) Bloom // it manages cache charge for buckets of hash entries in (new) Bloom
// or Ribbon Filter construction. // or Ribbon Filter construction.
// Otherwise, it is empty. // Otherwise, it is empty.
std::deque<std::unique_ptr<CacheReservationManager::CacheReservationHandle>> std::deque<std::unique_ptr<CacheReservationManager::CacheReservationHandle>>
@ -338,7 +338,7 @@ class FastLocalBloomBitsBuilder : public XXPH3FilterBitsBuilder {
final_filter_cache_res_handle; final_filter_cache_res_handle;
len_with_metadata = len_with_metadata =
AllocateMaybeRounding(len_with_metadata, num_entries, &mutable_buf); AllocateMaybeRounding(len_with_metadata, num_entries, &mutable_buf);
// Cache reservation for mutable_buf // Cache charging for mutable_buf
if (cache_res_mgr_) { if (cache_res_mgr_) {
Status s = cache_res_mgr_->MakeCacheReservation( Status s = cache_res_mgr_->MakeCacheReservation(
len_with_metadata * sizeof(char), &final_filter_cache_res_handle); len_with_metadata * sizeof(char), &final_filter_cache_res_handle);
@ -655,7 +655,7 @@ class Standard128RibbonBitsBuilder : public XXPH3FilterBitsBuilder {
Standard128RibbonTypesAndSettings>::EstimateMemoryUsage(num_slots); Standard128RibbonTypesAndSettings>::EstimateMemoryUsage(num_slots);
Status status_banding_cache_res = Status::OK(); Status status_banding_cache_res = Status::OK();
// Cache reservation for banding // Cache charging for banding
std::unique_ptr<CacheReservationManager::CacheReservationHandle> std::unique_ptr<CacheReservationManager::CacheReservationHandle>
banding_res_handle; banding_res_handle;
if (cache_res_mgr_) { if (cache_res_mgr_) {
@ -665,7 +665,7 @@ class Standard128RibbonBitsBuilder : public XXPH3FilterBitsBuilder {
if (status_banding_cache_res.IsIncomplete()) { if (status_banding_cache_res.IsIncomplete()) {
ROCKS_LOG_WARN(info_log_, ROCKS_LOG_WARN(info_log_,
"Cache reservation for Ribbon filter banding failed due " "Cache charging for Ribbon filter banding failed due "
"to cache full"); "to cache full");
SwapEntriesWith(&bloom_fallback_); SwapEntriesWith(&bloom_fallback_);
assert(hash_entries_info_.entries.empty()); assert(hash_entries_info_.entries.empty());
@ -717,7 +717,7 @@ class Standard128RibbonBitsBuilder : public XXPH3FilterBitsBuilder {
final_filter_cache_res_handle; final_filter_cache_res_handle;
len_with_metadata = len_with_metadata =
AllocateMaybeRounding(len_with_metadata, num_entries, &mutable_buf); AllocateMaybeRounding(len_with_metadata, num_entries, &mutable_buf);
// Cache reservation for mutable_buf // Cache charging for mutable_buf
if (cache_res_mgr_) { if (cache_res_mgr_) {
Status s = cache_res_mgr_->MakeCacheReservation( Status s = cache_res_mgr_->MakeCacheReservation(
len_with_metadata * sizeof(char), &final_filter_cache_res_handle); len_with_metadata * sizeof(char), &final_filter_cache_res_handle);
@ -1483,11 +1483,19 @@ std::string BloomFilterPolicy::GetId() const {
FilterBitsBuilder* BloomLikeFilterPolicy::GetFastLocalBloomBuilderWithContext( FilterBitsBuilder* BloomLikeFilterPolicy::GetFastLocalBloomBuilderWithContext(
const FilterBuildingContext& context) const { const FilterBuildingContext& context) const {
bool offm = context.table_options.optimize_filters_for_memory; bool offm = context.table_options.optimize_filters_for_memory;
bool reserve_filter_construction_mem = const auto options_overrides_iter =
(context.table_options.reserve_table_builder_memory && context.table_options.cache_usage_options.options_overrides.find(
context.table_options.block_cache); CacheEntryRole::kFilterConstruction);
const auto filter_construction_charged =
options_overrides_iter !=
context.table_options.cache_usage_options.options_overrides.end()
? options_overrides_iter->second.charged
: context.table_options.cache_usage_options.options.charged;
std::shared_ptr<CacheReservationManager> cache_res_mgr; std::shared_ptr<CacheReservationManager> cache_res_mgr;
if (reserve_filter_construction_mem) { if (context.table_options.block_cache &&
filter_construction_charged ==
CacheEntryRoleOptions::Decision::kEnabled) {
cache_res_mgr = std::make_shared< cache_res_mgr = std::make_shared<
CacheReservationManagerImpl<CacheEntryRole::kFilterConstruction>>( CacheReservationManagerImpl<CacheEntryRole::kFilterConstruction>>(
context.table_options.block_cache); context.table_options.block_cache);
@ -1525,11 +1533,19 @@ BloomLikeFilterPolicy::GetStandard128RibbonBuilderWithContext(
const FilterBuildingContext& context) const { const FilterBuildingContext& context) const {
// FIXME: code duplication with GetFastLocalBloomBuilderWithContext // FIXME: code duplication with GetFastLocalBloomBuilderWithContext
bool offm = context.table_options.optimize_filters_for_memory; bool offm = context.table_options.optimize_filters_for_memory;
bool reserve_filter_construction_mem = const auto options_overrides_iter =
(context.table_options.reserve_table_builder_memory && context.table_options.cache_usage_options.options_overrides.find(
context.table_options.block_cache); CacheEntryRole::kFilterConstruction);
const auto filter_construction_charged =
options_overrides_iter !=
context.table_options.cache_usage_options.options_overrides.end()
? options_overrides_iter->second.charged
: context.table_options.cache_usage_options.options.charged;
std::shared_ptr<CacheReservationManager> cache_res_mgr; std::shared_ptr<CacheReservationManager> cache_res_mgr;
if (reserve_filter_construction_mem) { if (context.table_options.block_cache &&
filter_construction_charged ==
CacheEntryRoleOptions::Decision::kEnabled) {
cache_res_mgr = std::make_shared< cache_res_mgr = std::make_shared<
CacheReservationManagerImpl<CacheEntryRole::kFilterConstruction>>( CacheReservationManagerImpl<CacheEntryRole::kFilterConstruction>>(
context.table_options.block_cache); context.table_options.block_cache);
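
Both builder factories above repeat the same lookup: use the per-role entry in `options_overrides` if one exists, otherwise fall back to the shared `cache_usage_options.options`. The sketch below isolates that lookup in a hypothetical helper, `ResolveCharged`, which is not part of this PR. The enum and structs are simplified stand-ins for the RocksDB types of the same names, and the `kFallback` default is an assumption, so treat this as an illustration of the fallback rule rather than the library's code.

```cpp
#include <cstdint>
#include <map>

// Simplified stand-ins for the RocksDB types named in the diff. The real
// definitions live in the RocksDB headers and have more members.
enum class CacheEntryRole : std::uint32_t {
  kFilterConstruction,
  kBlockBasedTableReader,
  kCompressionDictionaryBuildingBuffer,
};

struct CacheEntryRoleOptions {
  enum class Decision { kEnabled, kDisabled, kFallback };
  Decision charged = Decision::kFallback;  // assumed default
};

struct CacheUsageOptions {
  CacheEntryRoleOptions options;  // shared default for every role
  std::map<CacheEntryRole, CacheEntryRoleOptions> options_overrides;
};

// Hypothetical helper: return the per-role override when present,
// otherwise the shared default.
inline CacheEntryRoleOptions::Decision ResolveCharged(
    const CacheUsageOptions& cache_usage_options, CacheEntryRole role) {
  const auto it = cache_usage_options.options_overrides.find(role);
  return it != cache_usage_options.options_overrides.end()
             ? it->second.charged
             : cache_usage_options.options.charged;
}
```

A helper of this shape is one possible way to address the `FIXME` about code duplication between the two builder factories.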

@ -22,6 +22,7 @@
#include <vector> #include <vector>
#include "cache/lru_cache.h" #include "cache/lru_cache.h"
#include "db/db_test_util.h"
#include "db/dbformat.h" #include "db/dbformat.h"
#include "db/memtable.h" #include "db/memtable.h"
#include "db/write_batch_internal.h" #include "db/write_batch_internal.h"
@ -1126,8 +1127,9 @@ class TableTest : public testing::Test {
}; };
class GeneralTableTest : public TableTest {}; class GeneralTableTest : public TableTest {};
class BlockBasedTableTestBase : public TableTest {};
class BlockBasedTableTest class BlockBasedTableTest
: public TableTest, : public BlockBasedTableTestBase,
virtual public ::testing::WithParamInterface<uint32_t> { virtual public ::testing::WithParamInterface<uint32_t> {
public: public:
BlockBasedTableTest() : format_(GetParam()) { BlockBasedTableTest() : format_(GetParam()) {
@ -5190,84 +5192,98 @@ TEST_P(BlockBasedTableTest, OutOfBoundOnNext) {
ASSERT_FALSE(iter->UpperBoundCheckResult() == IterBoundCheck::kOutOfBound); ASSERT_FALSE(iter->UpperBoundCheckResult() == IterBoundCheck::kOutOfBound);
} }
TEST_P( class ChargeCompressionDictionaryBuildingBufferTest
BlockBasedTableTest, : public BlockBasedTableTestBase {};
IncreaseCacheReservationForCompressDictBuildingBufferOnBuilderAddAndDecreaseOnBuilderFinish) { TEST_F(ChargeCompressionDictionaryBuildingBufferTest, Basic) {
constexpr std::size_t kSizeDummyEntry = 256 * 1024; constexpr std::size_t kSizeDummyEntry = 256 * 1024;
constexpr std::size_t kMetaDataChargeOverhead = 10000; constexpr std::size_t kMetaDataChargeOverhead = 10000;
constexpr std::size_t kCacheCapacity = 8 * 1024 * 1024; constexpr std::size_t kCacheCapacity = 8 * 1024 * 1024;
constexpr std::size_t kMaxDictBytes = 1024; constexpr std::size_t kMaxDictBytes = 1024;
constexpr std::size_t kMaxDictBufferBytes = 1024; constexpr std::size_t kMaxDictBufferBytes = 1024;
BlockBasedTableOptions table_options = GetBlockBasedTableOptions(); for (CacheEntryRoleOptions::Decision
LRUCacheOptions lo; charge_compression_dictionary_building_buffer :
lo.capacity = kCacheCapacity; {CacheEntryRoleOptions::Decision::kEnabled,
lo.num_shard_bits = 0; // 2^0 shard CacheEntryRoleOptions::Decision::kDisabled}) {
lo.strict_capacity_limit = true; BlockBasedTableOptions table_options;
std::shared_ptr<Cache> cache(NewLRUCache(lo)); LRUCacheOptions lo;
table_options.block_cache = cache; lo.capacity = kCacheCapacity;
table_options.flush_block_policy_factory = lo.num_shard_bits = 0; // 2^0 shard
std::make_shared<FlushBlockEveryKeyPolicyFactory>(); lo.strict_capacity_limit = true;
std::shared_ptr<Cache> cache(NewLRUCache(lo));
Options options; table_options.block_cache = cache;
options.compression = kSnappyCompression; table_options.flush_block_policy_factory =
options.compression_opts.max_dict_bytes = kMaxDictBytes; std::make_shared<FlushBlockEveryKeyPolicyFactory>();
options.compression_opts.max_dict_buffer_bytes = kMaxDictBufferBytes; table_options.cache_usage_options.options_overrides.insert(
options.table_factory.reset(NewBlockBasedTableFactory(table_options)); {CacheEntryRole::kCompressionDictionaryBuildingBuffer,
{/*.charged = */ charge_compression_dictionary_building_buffer}});
test::StringSink* sink = new test::StringSink(); Options options;
std::unique_ptr<FSWritableFile> holder(sink); options.compression = kSnappyCompression;
std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter( options.compression_opts.max_dict_bytes = kMaxDictBytes;
std::move(holder), "test_file_name", FileOptions())); options.compression_opts.max_dict_buffer_bytes = kMaxDictBufferBytes;
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
ImmutableOptions ioptions(options);
MutableCFOptions moptions(options);
InternalKeyComparator ikc(options.comparator);
IntTblPropCollectorFactories int_tbl_prop_collector_factories;
std::unique_ptr<TableBuilder> builder(options.table_factory->NewTableBuilder( test::StringSink* sink = new test::StringSink();
TableBuilderOptions(ioptions, moptions, ikc, std::unique_ptr<FSWritableFile> holder(sink);
&int_tbl_prop_collector_factories, kSnappyCompression, std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
options.compression_opts, kUnknownColumnFamily, std::move(holder), "test_file_name", FileOptions()));
"test_cf", -1 /* level */),
file_writer.get()));
std::string key1 = "key1"; ImmutableOptions ioptions(options);
std::string value1 = "val1"; MutableCFOptions moptions(options);
InternalKey ik1(key1, 0 /* sequence number */, kTypeValue); InternalKeyComparator ikc(options.comparator);
// Adding the first key won't trigger a flush by FlushBlockEveryKeyPolicy IntTblPropCollectorFactories int_tbl_prop_collector_factories;
// therefore won't trigger any data block's buffering
builder->Add(ik1.Encode(), value1);
ASSERT_EQ(cache->GetPinnedUsage(), 0 * kSizeDummyEntry);
std::string key2 = "key2"; std::unique_ptr<TableBuilder> builder(
std::string value2 = "val2"; options.table_factory->NewTableBuilder(
InternalKey ik2(key2, 1 /* sequence number */, kTypeValue); TableBuilderOptions(
// Adding the second key will trigger a flush of the last data block (the one ioptions, moptions, ikc, &int_tbl_prop_collector_factories,
// containing key1 and value1) by FlushBlockEveryKeyPolicy and hence trigger kSnappyCompression, options.compression_opts,
// buffering of that data block. kUnknownColumnFamily, "test_cf", -1 /* level */),
builder->Add(ik2.Encode(), value2); file_writer.get()));
// Cache reservation will increase for last buffered data block (the one
// containing key1 and value1) since the buffer limit is not exceeded after std::string key1 = "key1";
// that buffering and the cache will not be full after this reservation std::string value1 = "val1";
EXPECT_GE(cache->GetPinnedUsage(), 1 * kSizeDummyEntry); InternalKey ik1(key1, 0 /* sequence number */, kTypeValue);
EXPECT_LT(cache->GetPinnedUsage(), // Adding the first key won't trigger a flush by FlushBlockEveryKeyPolicy
1 * kSizeDummyEntry + kMetaDataChargeOverhead); // therefore won't trigger any data block's buffering
builder->Add(ik1.Encode(), value1);
ASSERT_EQ(cache->GetPinnedUsage(), 0 * kSizeDummyEntry);
std::string key2 = "key2";
std::string value2 = "val2";
InternalKey ik2(key2, 1 /* sequence number */, kTypeValue);
// Adding the second key will trigger a flush of the last data block (the
// one containing key1 and value1) by FlushBlockEveryKeyPolicy and hence
// trigger buffering of that data block.
builder->Add(ik2.Encode(), value2);
// Cache charging will increase for last buffered data block (the one
// containing key1 and value1) since the buffer limit is not exceeded after
// that buffering and the cache will not be full after this reservation
if (charge_compression_dictionary_building_buffer ==
CacheEntryRoleOptions::Decision::kEnabled) {
EXPECT_GE(cache->GetPinnedUsage(), 1 * kSizeDummyEntry);
EXPECT_LT(cache->GetPinnedUsage(),
1 * kSizeDummyEntry + kMetaDataChargeOverhead);
} else {
EXPECT_EQ(cache->GetPinnedUsage(), 0 * kSizeDummyEntry);
}
ASSERT_OK(builder->Finish()); ASSERT_OK(builder->Finish());
EXPECT_EQ(cache->GetPinnedUsage(), 0 * kSizeDummyEntry); EXPECT_EQ(cache->GetPinnedUsage(), 0 * kSizeDummyEntry);
}
} }
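
The `EXPECT_GE` / `EXPECT_LT` bounds in the test above rely on the charge growing in whole dummy entries: buffering even a tiny data block pins one full 256 KiB entry, plus some metadata overhead. A minimal model of that rounding follows. `ChargedBytes` is a hypothetical name, and round-up-to-the-next-entry is an assumption inferred from the test's expectations, not code from this PR.

```cpp
#include <cstddef>

// Dummy entry size used by the tests above (256 KiB).
constexpr std::size_t kSizeDummyEntry = 256 * 1024;

// Hypothetical model: the charged amount is the smallest multiple of the
// dummy-entry size that covers `mem_used`. Metadata overhead is ignored.
constexpr std::size_t ChargedBytes(std::size_t mem_used,
                                   std::size_t dummy = kSizeDummyEntry) {
  return ((mem_used + dummy - 1) / dummy) * dummy;
}
```

Under this model a buffered block of a few bytes is charged as one dummy entry, which matches the lower bound `1 * kSizeDummyEntry`; the upper bound adds `kMetaDataChargeOverhead` for the cache's own bookkeeping.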
TEST_P( TEST_F(ChargeCompressionDictionaryBuildingBufferTest,
BlockBasedTableTest, BasicWithBufferLimitExceed) {
IncreaseCacheReservationForCompressDictBuildingBufferOnBuilderAddAndDecreaseOnBufferLimitExceed) {
constexpr std::size_t kSizeDummyEntry = 256 * 1024; constexpr std::size_t kSizeDummyEntry = 256 * 1024;
constexpr std::size_t kMetaDataChargeOverhead = 10000; constexpr std::size_t kMetaDataChargeOverhead = 10000;
constexpr std::size_t kCacheCapacity = 8 * 1024 * 1024; constexpr std::size_t kCacheCapacity = 8 * 1024 * 1024;
constexpr std::size_t kMaxDictBytes = 1024; constexpr std::size_t kMaxDictBytes = 1024;
constexpr std::size_t kMaxDictBufferBytes = 2 * kSizeDummyEntry; constexpr std::size_t kMaxDictBufferBytes = 2 * kSizeDummyEntry;
BlockBasedTableOptions table_options = GetBlockBasedTableOptions(); // `CacheEntryRoleOptions::charged` is enabled by default for
// CacheEntryRole::kCompressionDictionaryBuildingBuffer
BlockBasedTableOptions table_options;
LRUCacheOptions lo; LRUCacheOptions lo;
lo.capacity = kCacheCapacity; lo.capacity = kCacheCapacity;
lo.num_shard_bits = 0; // 2^0 shard lo.num_shard_bits = 0; // 2^0 shard
@ -5315,7 +5331,7 @@ TEST_P(
// containing key1 and value1) by FlushBlockEveryKeyPolicy and hence trigger // containing key1 and value1) by FlushBlockEveryKeyPolicy and hence trigger
// buffering of the last data block. // buffering of the last data block.
builder->Add(ik2.Encode(), value2); builder->Add(ik2.Encode(), value2);
// Cache reservation will increase for last buffered data block (the one // Cache charging will increase for last buffered data block (the one
// containing key1 and value1) since the buffer limit is not exceeded after // containing key1 and value1) since the buffer limit is not exceeded after
// the buffering and the cache will not be full after this reservation // the buffering and the cache will not be full after this reservation
EXPECT_GE(cache->GetPinnedUsage(), 2 * kSizeDummyEntry); EXPECT_GE(cache->GetPinnedUsage(), 2 * kSizeDummyEntry);
@ -5329,7 +5345,7 @@ TEST_P(
// containing key2 and value2) by FlushBlockEveryKeyPolicy and hence trigger // containing key2 and value2) by FlushBlockEveryKeyPolicy and hence trigger
// buffering of the last data block. // buffering of the last data block.
builder->Add(ik3.Encode(), value3); builder->Add(ik3.Encode(), value3);
// Cache reservation will decrease since the buffer limit is now exceeded // Cache charging will decrease since the buffer limit is now exceeded
// after the last buffering and EnterUnbuffered() is triggered // after the last buffering and EnterUnbuffered() is triggered
EXPECT_EQ(cache->GetPinnedUsage(), 0 * kSizeDummyEntry); EXPECT_EQ(cache->GetPinnedUsage(), 0 * kSizeDummyEntry);
@ -5337,12 +5353,10 @@ TEST_P(
EXPECT_EQ(cache->GetPinnedUsage(), 0 * kSizeDummyEntry); EXPECT_EQ(cache->GetPinnedUsage(), 0 * kSizeDummyEntry);
} }
TEST_P( TEST_F(ChargeCompressionDictionaryBuildingBufferTest, BasicWithCacheFull) {
BlockBasedTableTest,
IncreaseCacheReservationForCompressDictBuildingBufferOnBuilderAddAndDecreaseOnCacheFull) {
constexpr std::size_t kSizeDummyEntry = 256 * 1024; constexpr std::size_t kSizeDummyEntry = 256 * 1024;
constexpr std::size_t kMetaDataChargeOverhead = 10000; constexpr std::size_t kMetaDataChargeOverhead = 10000;
// A small kCacheCapacity is chosen so that increase cache reservation for // A small kCacheCapacity is chosen so that increase cache charging for
// buffering two data blocks, each containing key1/value1, key2/a big // buffering two data blocks, each containing key1/value1, key2/a big
// value2, will cause cache full // value2, will cause cache full
constexpr std::size_t kCacheCapacity = constexpr std::size_t kCacheCapacity =
@ -5352,7 +5366,9 @@ TEST_P(
// (key2, value2) won't exceed the buffer limit // (key2, value2) won't exceed the buffer limit
constexpr std::size_t kMaxDictBufferBytes = 1024 * 1024 * 1024; constexpr std::size_t kMaxDictBufferBytes = 1024 * 1024 * 1024;
BlockBasedTableOptions table_options = GetBlockBasedTableOptions(); // `CacheEntryRoleOptions::charged` is enabled by default for
// CacheEntryRole::kCompressionDictionaryBuildingBuffer
BlockBasedTableOptions table_options;
LRUCacheOptions lo; LRUCacheOptions lo;
lo.capacity = kCacheCapacity; lo.capacity = kCacheCapacity;
lo.num_shard_bits = 0; // 2^0 shard lo.num_shard_bits = 0; // 2^0 shard
@ -5400,7 +5416,7 @@ TEST_P(
// containing key1 and value1) by FlushBlockEveryKeyPolicy and hence trigger // containing key1 and value1) by FlushBlockEveryKeyPolicy and hence trigger
// buffering of the last data block. // buffering of the last data block.
builder->Add(ik2.Encode(), value2); builder->Add(ik2.Encode(), value2);
// Cache reservation will increase for the last buffered data block (the one // Cache charging will increase for the last buffered data block (the one
// containing key1 and value1) since the buffer limit is not exceeded after // containing key1 and value1) since the buffer limit is not exceeded after
// the buffering and the cache will not be full after this reservation // the buffering and the cache will not be full after this reservation
EXPECT_GE(cache->GetPinnedUsage(), 1 * kSizeDummyEntry); EXPECT_GE(cache->GetPinnedUsage(), 1 * kSizeDummyEntry);
@ -5414,7 +5430,7 @@ TEST_P(
// containing key2 and value2) by FlushBlockEveryKeyPolicy and hence trigger // containing key2 and value2) by FlushBlockEveryKeyPolicy and hence trigger
// buffering of the last data block. // buffering of the last data block.
builder->Add(ik3.Encode(), value3); builder->Add(ik3.Encode(), value3);
// Cache reservation will decrease since the cache is now full after // Cache charging will decrease since the cache is now full after
// increasing reservation for the last buffered block and EnterUnbuffered() is // increasing reservation for the last buffered block and EnterUnbuffered() is
// triggered // triggered
EXPECT_EQ(cache->GetPinnedUsage(), 0 * kSizeDummyEntry); EXPECT_EQ(cache->GetPinnedUsage(), 0 * kSizeDummyEntry);
@ -5423,6 +5439,75 @@ TEST_P(
EXPECT_EQ(cache->GetPinnedUsage(), 0 * kSizeDummyEntry); EXPECT_EQ(cache->GetPinnedUsage(), 0 * kSizeDummyEntry);
} }
class CacheUsageOptionsOverridesTest : public DBTestBase {
public:
CacheUsageOptionsOverridesTest()
: DBTestBase("cache_usage_options_overrides_test",
/*env_do_fsync=*/false) {}
};
TEST_F(CacheUsageOptionsOverridesTest, SanitizeAndValidateOptions) {
// To test `cache_usage_options.options_overrides` is sanitized
// where `cache_usage_options.options` is used when there is no entry in
// `cache_usage_options.options_overrides`
Options options;
options.create_if_missing = true;
BlockBasedTableOptions table_options = BlockBasedTableOptions();
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
Destroy(options);
Status s = TryReopen(options);
EXPECT_TRUE(s.ok());
const auto* sanitized_table_options =
options.table_factory->GetOptions<BlockBasedTableOptions>();
const auto sanitized_options_overrides =
sanitized_table_options->cache_usage_options.options_overrides;
EXPECT_EQ(sanitized_options_overrides.size(), kNumCacheEntryRoles);
for (auto options_overrides_iter = sanitized_options_overrides.cbegin();
options_overrides_iter != sanitized_options_overrides.cend();
++options_overrides_iter) {
CacheEntryRoleOptions role_options = options_overrides_iter->second;
CacheEntryRoleOptions default_options =
sanitized_table_options->cache_usage_options.options;
EXPECT_TRUE(role_options == default_options);
}
Destroy(options);
// To test option validation on unsupported CacheEntryRole
table_options = BlockBasedTableOptions();
table_options.cache_usage_options.options_overrides.insert(
{CacheEntryRole::kDataBlock,
{/*.charged = */ CacheEntryRoleOptions::Decision::kDisabled}});
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
Destroy(options);
s = TryReopen(options);
EXPECT_TRUE(s.IsNotSupported());
EXPECT_TRUE(
s.ToString().find("Enable/Disable CacheEntryRoleOptions::charged") !=
std::string::npos);
EXPECT_TRUE(
s.ToString().find(kCacheEntryRoleToCamelString[static_cast<uint32_t>(
CacheEntryRole::kDataBlock)]) != std::string::npos);
Destroy(options);
// To test option validation on existence of block cache
table_options = BlockBasedTableOptions();
table_options.no_block_cache = true;
table_options.cache_usage_options.options_overrides.insert(
{CacheEntryRole::kFilterConstruction,
{/*.charged = */ CacheEntryRoleOptions::Decision::kEnabled}});
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
Destroy(options);
s = TryReopen(options);
EXPECT_TRUE(s.IsInvalidArgument());
EXPECT_TRUE(s.ToString().find("Enable CacheEntryRoleOptions::charged") !=
std::string::npos);
EXPECT_TRUE(
s.ToString().find(kCacheEntryRoleToCamelString[static_cast<std::size_t>(
CacheEntryRole::kFilterConstruction)]) != std::string::npos);
EXPECT_TRUE(s.ToString().find("block cache is disabled") !=
std::string::npos);
Destroy(options);
}
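
The validation half of `SanitizeAndValidateOptions` expects two failures: a `charged` override on a role that does not support charging is rejected as NotSupported, and enabling charging while the block cache is disabled is rejected as InvalidArgument. The sketch below models those two rules with simplified stand-in types. `ValidateOverrides`, `ValidationResult`, and the set of chargeable roles are hypothetical and inferred only from the test expectations above; the actual check in RocksDB may differ.

```cpp
#include <cstdint>
#include <map>
#include <set>

// Simplified stand-ins for the RocksDB types named in the diff.
enum class CacheEntryRole : std::uint32_t {
  kDataBlock,
  kFilterConstruction,
  kBlockBasedTableReader,
  kCompressionDictionaryBuildingBuffer,
};

struct CacheEntryRoleOptions {
  enum class Decision { kEnabled, kDisabled, kFallback };
  Decision charged = Decision::kFallback;  // assumed default
};

enum class ValidationResult { kOk, kNotSupported, kInvalidArgument };

// Hypothetical validator mirroring the two failure cases in the test.
inline ValidationResult ValidateOverrides(
    const std::map<CacheEntryRole, CacheEntryRoleOptions>& overrides,
    bool no_block_cache) {
  using Decision = CacheEntryRoleOptions::Decision;
  // Assumption: only the roles touched by this PR support charging.
  static const std::set<CacheEntryRole> kChargeable = {
      CacheEntryRole::kFilterConstruction,
      CacheEntryRole::kBlockBasedTableReader,
      CacheEntryRole::kCompressionDictionaryBuildingBuffer};
  for (const auto& entry : overrides) {
    const Decision charged = entry.second.charged;
    // An explicit enable/disable on an unsupported role is rejected.
    if (charged != Decision::kFallback && kChargeable.count(entry.first) == 0) {
      return ValidationResult::kNotSupported;
    }
    // Charging needs a block cache to charge against.
    if (no_block_cache && charged == Decision::kEnabled) {
      return ValidationResult::kInvalidArgument;
    }
  }
  return ValidationResult::kOk;
}
```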
} // namespace ROCKSDB_NAMESPACE } // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) { int main(int argc, char** argv) {

@ -1129,10 +1129,20 @@ DEFINE_bool(async_io, false,
"When set true, RocksDB does asynchronous reads for internal auto " "When set true, RocksDB does asynchronous reads for internal auto "
"readahead prefetching."); "readahead prefetching.");
DEFINE_bool(reserve_table_reader_memory, false, DEFINE_bool(charge_compression_dictionary_building_buffer, false,
"A dynamically updating charge to block cache, loosely based on " "Setting for "
"the actual memory usage of table reader, will occur to account " "CacheEntryRoleOptions::charged of "
"the memory, if block cache available."); "CacheEntryRole::kCompressionDictionaryBuildingBuffer");
DEFINE_bool(charge_filter_construction, false,
"Setting for "
"CacheEntryRoleOptions::charged of "
"CacheEntryRole::kFilterConstruction");
DEFINE_bool(charge_table_reader, false,
"Setting for "
"CacheEntryRoleOptions::charged of "
"CacheEntryRole::kBlockBasedTableReader");
static enum ROCKSDB_NAMESPACE::CompressionType StringToCompressionType( static enum ROCKSDB_NAMESPACE::CompressionType StringToCompressionType(
const char* ctype) { const char* ctype) {
@ -4162,8 +4172,21 @@ class Benchmark {
true; true;
} }
block_based_options.block_cache = cache_; block_based_options.block_cache = cache_;
block_based_options.reserve_table_reader_memory = block_based_options.cache_usage_options.options_overrides.insert(
FLAGS_reserve_table_reader_memory; {CacheEntryRole::kCompressionDictionaryBuildingBuffer,
{/*.charged = */ FLAGS_charge_compression_dictionary_building_buffer
? CacheEntryRoleOptions::Decision::kEnabled
: CacheEntryRoleOptions::Decision::kDisabled}});
block_based_options.cache_usage_options.options_overrides.insert(
{CacheEntryRole::kFilterConstruction,
{/*.charged = */ FLAGS_charge_filter_construction
? CacheEntryRoleOptions::Decision::kEnabled
: CacheEntryRoleOptions::Decision::kDisabled}});
block_based_options.cache_usage_options.options_overrides.insert(
{CacheEntryRole::kBlockBasedTableReader,
{/*.charged = */ FLAGS_charge_table_reader
? CacheEntryRoleOptions::Decision::kEnabled
: CacheEntryRoleOptions::Decision::kDisabled}});
block_based_options.block_cache_compressed = compressed_cache_; block_based_options.block_cache_compressed = compressed_cache_;
block_based_options.block_size = FLAGS_block_size; block_based_options.block_size = FLAGS_block_size;
block_based_options.block_restart_interval = FLAGS_block_restart_interval; block_based_options.block_restart_interval = FLAGS_block_restart_interval;
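
The three `options_overrides.insert` calls above each repeat the same bool-to-`Decision` ternary. As an illustration only, that mapping could be factored into a small helper. `ToDecision` is a hypothetical name and is not part of this PR, and the struct below is a simplified stand-in for the RocksDB type.

```cpp
// Simplified stand-in for the RocksDB type named in the diff.
struct CacheEntryRoleOptions {
  enum class Decision { kEnabled, kDisabled, kFallback };
  Decision charged = Decision::kFallback;  // assumed default
};

// Hypothetical helper: map a boolean command-line flag to a Decision.
// A boolean flag can only express enabled or disabled, never kFallback.
constexpr CacheEntryRoleOptions::Decision ToDecision(bool flag) {
  return flag ? CacheEntryRoleOptions::Decision::kEnabled
              : CacheEntryRoleOptions::Decision::kDisabled;
}
```

Because the flags are booleans defaulting to `false`, every role configured this way in db_bench gets an explicit `kDisabled` override unless its flag is set, and so never falls back to the shared default.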

@@ -41,7 +41,9 @@ default_params = {
                       random.lognormvariate(2.3, 1.3)]),
     "cache_index_and_filter_blocks": lambda: random.randint(0, 1),
     "cache_size": 8388608,
-    "reserve_table_reader_memory": lambda: random.choice([0, 1]),
+    "charge_compression_dictionary_building_buffer": lambda: random.choice([0, 1]),
+    "charge_filter_construction": lambda: random.choice([0, 1]),
+    "charge_table_reader": lambda: random.choice([0, 1]),
     "checkpoint_one_in": 1000000,
     "compression_type": lambda: random.choice(
         ["none", "snappy", "zlib", "lz4", "lz4hc", "xpress", "zstd"]),

@@ -618,18 +618,23 @@ TEST_P(FullBloomTest, OptimizeForMemory) {
   }
 }
-TEST(FullBloomFilterConstructionReserveMemTest,
-     RibbonFilterFallBackOnLargeBanding) {
+class ChargeFilterConstructionTest : public testing::Test {};
+TEST_F(ChargeFilterConstructionTest, RibbonFilterFallBackOnLargeBanding) {
   constexpr std::size_t kCacheCapacity =
       8 * CacheReservationManagerImpl<
               CacheEntryRole::kFilterConstruction>::GetDummyEntrySize();
   constexpr std::size_t num_entries_for_cache_full = kCacheCapacity / 8;
-  for (bool reserve_builder_mem : {true, false}) {
-    bool will_fall_back = reserve_builder_mem;
+  for (CacheEntryRoleOptions::Decision charge_filter_construction_mem :
+       {CacheEntryRoleOptions::Decision::kEnabled,
+        CacheEntryRoleOptions::Decision::kDisabled}) {
+    bool will_fall_back = charge_filter_construction_mem ==
+                          CacheEntryRoleOptions::Decision::kEnabled;
     BlockBasedTableOptions table_options;
-    table_options.reserve_table_builder_memory = reserve_builder_mem;
+    table_options.cache_usage_options.options_overrides.insert(
+        {CacheEntryRole::kFilterConstruction,
+         {/*.charged = */ charge_filter_construction_mem}});
     LRUCacheOptions lo;
     lo.capacity = kCacheCapacity;
     lo.num_shard_bits = 0;  // 2^0 shard
@@ -651,7 +656,7 @@ TEST(FullBloomFilterConstructionReserveMemTest,
     Slice filter = filter_bits_builder->Finish(&buf);
     // To verify Ribbon Filter fallbacks to Bloom Filter properly
-    // based on cache reservation result
+    // based on cache charging result
     // See BloomFilterPolicy::GetBloomBitsReader re: metadata
     // -1 = Marker for newer Bloom implementations
     // -2 = Marker for Standard128 Ribbon
@@ -661,7 +666,8 @@ TEST(FullBloomFilterConstructionReserveMemTest,
       EXPECT_EQ(filter.data()[filter.size() - 5], static_cast<char>(-2));
     }
-    if (reserve_builder_mem) {
+    if (charge_filter_construction_mem ==
+        CacheEntryRoleOptions::Decision::kEnabled) {
       const size_t dummy_entry_num = static_cast<std::size_t>(std::ceil(
           filter.size() * 1.0 /
           CacheReservationManagerImpl<
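
The test above derives the expected number of dummy cache entries by dividing the filter size by the dummy entry size and rounding up. A small sketch of that rounding, using integer arithmetic in place of the test's `std::ceil` on a floating-point quotient, might look like the following. `DummyEntriesNeeded` and `kAssumedDummyEntrySize` are hypothetical names, and the 256 KiB value is an assumption for illustration; the real value comes from `GetDummyEntrySize()`.

```cpp
#include <cstddef>

// Assumed dummy entry size for illustration only; the test obtains the real
// value from CacheReservationManagerImpl<...>::GetDummyEntrySize().
constexpr std::size_t kAssumedDummyEntrySize = 256 * 1024;

// Hypothetical helper: number of dummy entries needed to cover `bytes`,
// computed as an integer ceiling division. Requires dummy_size > 0.
constexpr std::size_t DummyEntriesNeeded(
    std::size_t bytes, std::size_t dummy_size = kAssumedDummyEntrySize) {
  return (bytes + dummy_size - 1) / dummy_size;
}
```

The integer form avoids the round trip through `double`, at the cost of a possible overflow when `bytes` is within `dummy_size` of the maximum `std::size_t` value, which is not a concern for realistic filter sizes.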

@@ -21,6 +21,7 @@ int main() {
 #include "port/stack_trace.h"
 #include "rocksdb/cache.h"
 #include "rocksdb/system_clock.h"
+#include "rocksdb/table.h"
 #include "table/block_based/filter_policy_internal.h"
 #include "table/block_based/full_filter_block.h"
 #include "table/block_based/mock_block_based_table.h"
@@ -103,9 +104,10 @@ DEFINE_uint32(block_cache_capacity_MB, 8,
               "Setting for "
               "LRUCacheOptions::capacity");
-DEFINE_bool(reserve_table_builder_memory, false,
+DEFINE_bool(charge_filter_construction, false,
             "Setting for "
-            "BlockBasedTableOptions::reserve_table_builder_memory");
+            "CacheEntryRoleOptions::charged of"
+            "CacheEntryRole::kFilterConstruction");
 DEFINE_bool(strict_capacity_limit, false,
             "Setting for "
@@ -145,6 +147,8 @@ using ROCKSDB_NAMESPACE::BloomLikeFilterPolicy;
 using ROCKSDB_NAMESPACE::BuiltinFilterBitsBuilder;
 using ROCKSDB_NAMESPACE::CachableEntry;
 using ROCKSDB_NAMESPACE::Cache;
+using ROCKSDB_NAMESPACE::CacheEntryRole;
+using ROCKSDB_NAMESPACE::CacheEntryRoleOptions;
 using ROCKSDB_NAMESPACE::EncodeFixed32;
 using ROCKSDB_NAMESPACE::FastRange32;
 using ROCKSDB_NAMESPACE::FilterBitsReader;
@@ -321,8 +325,12 @@ struct FilterBench : public MockBlockBasedTableTester {
         FLAGS_optimize_filters_for_memory;
     table_options_.detect_filter_construct_corruption =
         FLAGS_detect_filter_construct_corruption;
-    if (FLAGS_reserve_table_builder_memory) {
-      table_options_.reserve_table_builder_memory = true;
+    table_options_.cache_usage_options.options_overrides.insert(
+        {CacheEntryRole::kFilterConstruction,
+         {/*.charged = */ FLAGS_charge_filter_construction
+              ? CacheEntryRoleOptions::Decision::kEnabled
+              : CacheEntryRoleOptions::Decision::kDisabled}});
+    if (FLAGS_charge_filter_construction) {
       table_options_.no_block_cache = false;
       LRUCacheOptions lo;
       lo.capacity = FLAGS_block_cache_capacity_MB * 1024 * 1024;
