Account for dictionary-building buffer in global memory limit (#8428)

Summary:
Context:
Some data blocks are temporarily buffered in memory in BlockBasedTableBuilder for building compression dictionary used in data block compression. Currently this memory usage is not counted toward our global memory usage utilizing block cache capacity. To improve that, this PR charges that memory usage into the block cache to achieve better memory tracking and limiting.

- Reserve memory in block cache for buffered data blocks that are used to build a compression dictionary
- Release all the memory associated with buffering the data blocks mentioned above in EnterUnbuffered(), which is called when (a) buffer limit is exceeded after buffering OR (b) the block cache becomes full after reservation OR (c) BlockBasedTableBuilder calls Finish()

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8428

Test Plan:
- Passing existing unit tests
- Passing new unit tests

Reviewed By: ajkr

Differential Revision: D30755305

Pulled By: hx235

fbshipit-source-id: 6e66665020b775154a94c4c5e0f2adaeaff13981
main
Hui Xiao 3 years ago committed by Facebook GitHub Bot
parent 0cb0fc6fd3
commit 91b95cadee
  1. 2
      cache/cache_entry_roles.cc
  2. 5
      cache/cache_entry_roles.h
  3. 3
      cache/cache_reservation_manager.cc
  4. 36
      table/block_based/block_based_table_builder.cc
  5. 234
      table/table_test.cc

@ -19,6 +19,7 @@ std::array<const char*, kNumCacheEntryRoles> kCacheEntryRoleToCamelString{{
"IndexBlock",
"OtherBlock",
"WriteBuffer",
"CompressionDictionaryBuildingBuffer",
"Misc",
}};
@ -30,6 +31,7 @@ std::array<const char*, kNumCacheEntryRoles> kCacheEntryRoleToHyphenString{{
"index-block",
"other-block",
"write-buffer",
"compression-dictionary-building-buffer",
"misc",
}};

@ -14,6 +14,8 @@
namespace ROCKSDB_NAMESPACE {
// Classifications of block cache entries, for reporting statistics
// Adding new enum to this class requires corresponding updates to
// kCacheEntryRoleToCamelString and kCacheEntryRoleToHyphenString
enum class CacheEntryRole {
// Block-based table data block
kDataBlock,
@ -29,6 +31,9 @@ enum class CacheEntryRole {
kOtherBlock,
// WriteBufferManager reservations to account for memtable usage
kWriteBuffer,
// BlockBasedTableBuilder reservations to account for
// compression dictionary building buffer's memory usage
kCompressionDictionaryBuildingBuffer,
// Default bucket, for miscellaneous cache entries. Do not use for
// entries that could potentially add up to large usage.
kMisc,

@ -69,6 +69,9 @@ Status CacheReservationManager::UpdateCacheReservation(
// This makes it possible to keep the template definitions in the .cc file.
template Status CacheReservationManager::UpdateCacheReservation<
CacheEntryRole::kWriteBuffer>(std::size_t new_mem_used);
template Status CacheReservationManager::UpdateCacheReservation<
CacheEntryRole::kCompressionDictionaryBuildingBuffer>(
std::size_t new_mem_used);
// For cache reservation manager unit tests
template Status CacheReservationManager::UpdateCacheReservation<
CacheEntryRole::kMisc>(std::size_t new_mem_used);

@ -21,6 +21,8 @@
#include <unordered_map>
#include <utility>
#include "cache/cache_entry_roles.h"
#include "cache/cache_reservation_manager.h"
#include "db/dbformat.h"
#include "index_builder.h"
#include "memory/memory_allocator.h"
@ -312,7 +314,7 @@ struct BlockBasedTableBuilder::Rep {
// `kBuffered` state is allowed only as long as the buffering of uncompressed
// data blocks (see `data_block_buffers`) does not exceed `buffer_limit`.
uint64_t buffer_limit;
std::unique_ptr<CacheReservationManager> cache_rev_mng;
const bool use_delta_encoding_for_index_values;
std::unique_ptr<FilterBlockBuilder> filter_builder;
char cache_key_prefix[BlockBasedTable::kMaxCacheKeyPrefixSize];
@ -444,6 +446,12 @@ struct BlockBasedTableBuilder::Rep {
buffer_limit = std::min(tbo.target_file_size,
compression_opts.max_dict_buffer_bytes);
}
if (table_options.no_block_cache) {
cache_rev_mng.reset(nullptr);
} else {
cache_rev_mng.reset(
new CacheReservationManager(table_options.block_cache));
}
for (uint32_t i = 0; i < compression_opts.parallel_threads; i++) {
compression_ctxs[i].reset(new CompressionContext(compression_type));
}
@ -896,11 +904,25 @@ void BlockBasedTableBuilder::Add(const Slice& key, const Slice& value) {
assert(!r->data_block.empty());
r->first_key_in_next_block = &key;
Flush();
if (r->state == Rep::State::kBuffered) {
bool exceeds_buffer_limit =
(r->buffer_limit != 0 && r->data_begin_offset > r->buffer_limit);
bool is_cache_full = false;
if (r->state == Rep::State::kBuffered && r->buffer_limit != 0 &&
r->data_begin_offset > r->buffer_limit) {
// Increase cache reservation for the last buffered data block
// only if the block is not going to be unbuffered immediately
// and there exists a cache reservation manager
if (!exceeds_buffer_limit && r->cache_rev_mng != nullptr) {
Status s = r->cache_rev_mng->UpdateCacheReservation<
CacheEntryRole::kCompressionDictionaryBuildingBuffer>(
r->data_begin_offset);
is_cache_full = s.IsIncomplete();
}
if (exceeds_buffer_limit || is_cache_full) {
EnterUnbuffered();
}
}
// Add item to index block.
// We do not emit the index entry for a block until we have seen the
@ -1910,10 +1932,16 @@ void BlockBasedTableBuilder::EnterUnbuffered() {
r->pending_handle);
}
}
std::swap(iter, next_block_iter);
}
r->data_block_buffers.clear();
r->data_begin_offset = 0;
if (r->cache_rev_mng != nullptr) {
Status s = r->cache_rev_mng->UpdateCacheReservation<
CacheEntryRole::kCompressionDictionaryBuildingBuffer>(
r->data_begin_offset);
s.PermitUncheckedError();
}
}
Status BlockBasedTableBuilder::Finish() {

@ -7,6 +7,7 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include <stddef.h>
#include <stdio.h>
#include <algorithm>
@ -4746,6 +4747,239 @@ TEST_P(BlockBasedTableTest, OutOfBoundOnNext) {
ASSERT_FALSE(iter->UpperBoundCheckResult() == IterBoundCheck::kOutOfBound);
}
TEST_P(
BlockBasedTableTest,
IncreaseCacheReservationForCompressDictBuildingBufferOnBuilderAddAndDecreaseOnBuilderFinish) {
constexpr std::size_t kSizeDummyEntry = 256 * 1024;
constexpr std::size_t kMetaDataChargeOverhead = 10000;
constexpr std::size_t kCacheCapacity = 8 * 1024 * 1024;
constexpr std::size_t kMaxDictBytes = 1024;
constexpr std::size_t kMaxDictBufferBytes = 1024;
BlockBasedTableOptions table_options = GetBlockBasedTableOptions();
LRUCacheOptions lo;
lo.capacity = kCacheCapacity;
lo.num_shard_bits = 0; // 2^0 shard
lo.strict_capacity_limit = true;
std::shared_ptr<Cache> cache(NewLRUCache(lo));
table_options.block_cache = cache;
table_options.flush_block_policy_factory =
std::make_shared<FlushBlockEveryKeyPolicyFactory>();
Options options;
options.compression = kSnappyCompression;
options.compression_opts.max_dict_bytes = kMaxDictBytes;
options.compression_opts.max_dict_buffer_bytes = kMaxDictBufferBytes;
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
test::StringSink* sink = new test::StringSink();
std::unique_ptr<FSWritableFile> holder(sink);
std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
std::move(holder), "test_file_name", FileOptions()));
ImmutableOptions ioptions(options);
MutableCFOptions moptions(options);
InternalKeyComparator ikc(options.comparator);
IntTblPropCollectorFactories int_tbl_prop_collector_factories;
std::unique_ptr<TableBuilder> builder(options.table_factory->NewTableBuilder(
TableBuilderOptions(ioptions, moptions, ikc,
&int_tbl_prop_collector_factories, kSnappyCompression,
options.compression_opts, kUnknownColumnFamily,
"test_cf", -1 /* level */),
file_writer.get()));
std::string key1 = "key1";
std::string value1 = "val1";
InternalKey ik1(key1, 0 /* sequnce number */, kTypeValue);
// Adding the first key won't trigger a flush by FlushBlockEveryKeyPolicy
// therefore won't trigger any data block's buffering
builder->Add(ik1.Encode(), value1);
ASSERT_EQ(cache->GetPinnedUsage(), 0 * kSizeDummyEntry);
std::string key2 = "key2";
std::string value2 = "val2";
InternalKey ik2(key2, 1 /* sequnce number */, kTypeValue);
// Adding the second key will trigger a flush of the last data block (the one
// containing key1 and value1) by FlushBlockEveryKeyPolicy and hence trigger
// buffering of that data block.
builder->Add(ik2.Encode(), value2);
// Cache reservation will increase for last buffered data block (the one
// containing key1 and value1) since the buffer limit is not exceeded after
// that buffering and the cache will not be full after this reservation
EXPECT_GE(cache->GetPinnedUsage(), 1 * kSizeDummyEntry);
EXPECT_LT(cache->GetPinnedUsage(),
1 * kSizeDummyEntry + kMetaDataChargeOverhead);
ASSERT_OK(builder->Finish());
EXPECT_EQ(cache->GetPinnedUsage(), 0 * kSizeDummyEntry);
}
TEST_P(
BlockBasedTableTest,
IncreaseCacheReservationForCompressDictBuildingBufferOnBuilderAddAndDecreaseOnBufferLimitExceed) {
constexpr std::size_t kSizeDummyEntry = 256 * 1024;
constexpr std::size_t kMetaDataChargeOverhead = 10000;
constexpr std::size_t kCacheCapacity = 8 * 1024 * 1024;
constexpr std::size_t kMaxDictBytes = 1024;
constexpr std::size_t kMaxDictBufferBytes = 2 * kSizeDummyEntry;
BlockBasedTableOptions table_options = GetBlockBasedTableOptions();
LRUCacheOptions lo;
lo.capacity = kCacheCapacity;
lo.num_shard_bits = 0; // 2^0 shard
lo.strict_capacity_limit = true;
std::shared_ptr<Cache> cache(NewLRUCache(lo));
table_options.block_cache = cache;
table_options.flush_block_policy_factory =
std::make_shared<FlushBlockEveryKeyPolicyFactory>();
Options options;
options.compression = kSnappyCompression;
options.compression_opts.max_dict_bytes = kMaxDictBytes;
options.compression_opts.max_dict_buffer_bytes = kMaxDictBufferBytes;
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
test::StringSink* sink = new test::StringSink();
std::unique_ptr<FSWritableFile> holder(sink);
std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
std::move(holder), "test_file_name", FileOptions()));
ImmutableOptions ioptions(options);
MutableCFOptions moptions(options);
InternalKeyComparator ikc(options.comparator);
IntTblPropCollectorFactories int_tbl_prop_collector_factories;
std::unique_ptr<TableBuilder> builder(options.table_factory->NewTableBuilder(
TableBuilderOptions(ioptions, moptions, ikc,
&int_tbl_prop_collector_factories, kSnappyCompression,
options.compression_opts, kUnknownColumnFamily,
"test_cf", -1 /* level */),
file_writer.get()));
std::string key1 = "key1";
std::string value1(kSizeDummyEntry, '0');
InternalKey ik1(key1, 0 /* sequnce number */, kTypeValue);
// Adding the first key won't trigger a flush by FlushBlockEveryKeyPolicy
// therefore won't trigger any data block's buffering
builder->Add(ik1.Encode(), value1);
ASSERT_EQ(cache->GetPinnedUsage(), 0 * kSizeDummyEntry);
std::string key2 = "key2";
std::string value2(kSizeDummyEntry, '0');
InternalKey ik2(key2, 1 /* sequnce number */, kTypeValue);
// Adding the second key will trigger a flush of the last data block (the one
// containing key1 and value1) by FlushBlockEveryKeyPolicy and hence trigger
// buffering of the last data block.
builder->Add(ik2.Encode(), value2);
// Cache reservation will increase for last buffered data block (the one
// containing key1 and value1) since the buffer limit is not exceeded after
// the buffering and the cache will not be full after this reservation
EXPECT_GE(cache->GetPinnedUsage(), 2 * kSizeDummyEntry);
EXPECT_LT(cache->GetPinnedUsage(),
2 * kSizeDummyEntry + kMetaDataChargeOverhead);
std::string key3 = "key3";
std::string value3 = "val3";
InternalKey ik3(key3, 2 /* sequnce number */, kTypeValue);
// Adding the third key will trigger a flush of the last data block (the one
// containing key2 and value2) by FlushBlockEveryKeyPolicy and hence trigger
// buffering of the last data block.
builder->Add(ik3.Encode(), value3);
// Cache reservation will decrease since the buffer limit is now exceeded
// after the last buffering and EnterUnbuffered() is triggered
EXPECT_EQ(cache->GetPinnedUsage(), 0 * kSizeDummyEntry);
ASSERT_OK(builder->Finish());
EXPECT_EQ(cache->GetPinnedUsage(), 0 * kSizeDummyEntry);
}
TEST_P(
BlockBasedTableTest,
IncreaseCacheReservationForCompressDictBuildingBufferOnBuilderAddAndDecreaseOnCacheFull) {
constexpr std::size_t kSizeDummyEntry = 256 * 1024;
constexpr std::size_t kMetaDataChargeOverhead = 10000;
// A small kCacheCapacity is chosen so that increase cache reservation for
// buffering two data blocks, each containing key1/value1, key2/a big
// value2, will cause cache full
constexpr std::size_t kCacheCapacity =
1 * kSizeDummyEntry + kSizeDummyEntry / 2;
constexpr std::size_t kMaxDictBytes = 1024;
// A big kMaxDictBufferBytes is chosen so that adding a big key value pair
// (key2, value2) won't exceed the buffer limit
constexpr std::size_t kMaxDictBufferBytes = 1024 * 1024 * 1024;
BlockBasedTableOptions table_options = GetBlockBasedTableOptions();
LRUCacheOptions lo;
lo.capacity = kCacheCapacity;
lo.num_shard_bits = 0; // 2^0 shard
lo.strict_capacity_limit = true;
std::shared_ptr<Cache> cache(NewLRUCache(lo));
table_options.block_cache = cache;
table_options.flush_block_policy_factory =
std::make_shared<FlushBlockEveryKeyPolicyFactory>();
Options options;
options.compression = kSnappyCompression;
options.compression_opts.max_dict_bytes = kMaxDictBytes;
options.compression_opts.max_dict_buffer_bytes = kMaxDictBufferBytes;
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
test::StringSink* sink = new test::StringSink();
std::unique_ptr<FSWritableFile> holder(sink);
std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
std::move(holder), "test_file_name", FileOptions()));
ImmutableOptions ioptions(options);
MutableCFOptions moptions(options);
InternalKeyComparator ikc(options.comparator);
IntTblPropCollectorFactories int_tbl_prop_collector_factories;
std::unique_ptr<TableBuilder> builder(options.table_factory->NewTableBuilder(
TableBuilderOptions(ioptions, moptions, ikc,
&int_tbl_prop_collector_factories, kSnappyCompression,
options.compression_opts, kUnknownColumnFamily,
"test_cf", -1 /* level */),
file_writer.get()));
std::string key1 = "key1";
std::string value1 = "val1";
InternalKey ik1(key1, 0 /* sequnce number */, kTypeValue);
// Adding the first key won't trigger a flush by FlushBlockEveryKeyPolicy
// therefore won't trigger any data block's buffering
builder->Add(ik1.Encode(), value1);
ASSERT_EQ(cache->GetPinnedUsage(), 0 * kSizeDummyEntry);
std::string key2 = "key2";
std::string value2(kSizeDummyEntry, '0');
InternalKey ik2(key2, 1 /* sequnce number */, kTypeValue);
// Adding the second key will trigger a flush of the last data block (the one
// containing key1 and value1) by FlushBlockEveryKeyPolicy and hence trigger
// buffering of the last data block.
builder->Add(ik2.Encode(), value2);
// Cache reservation will increase for the last buffered data block (the one
// containing key1 and value1) since the buffer limit is not exceeded after
// the buffering and the cache will not be full after this reservation
EXPECT_GE(cache->GetPinnedUsage(), 1 * kSizeDummyEntry);
EXPECT_LT(cache->GetPinnedUsage(),
1 * kSizeDummyEntry + kMetaDataChargeOverhead);
std::string key3 = "key3";
std::string value3 = "value3";
InternalKey ik3(key3, 2 /* sequnce number */, kTypeValue);
// Adding the third key will trigger a flush of the last data block (the one
// containing key2 and value2) by FlushBlockEveryKeyPolicy and hence trigger
// buffering of the last data block.
builder->Add(ik3.Encode(), value3);
// Cache reservation will decrease since the cache is now full after
// increasing reservation for the last buffered block and EnterUnbuffered() is
// triggered
EXPECT_EQ(cache->GetPinnedUsage(), 0 * kSizeDummyEntry);
ASSERT_OK(builder->Finish());
EXPECT_EQ(cache->GetPinnedUsage(), 0 * kSizeDummyEntry);
}
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {

Loading…
Cancel
Save