Cache warming data blocks during flush (#8242)

Summary:
This PR prepopulates warm/hot data blocks that are already in memory
into the block cache at the time of flush. On a flush, the data blocks that
are in memory (in memtables) get flushed to the device. If using Direct IO,
additional IO is incurred to read this data back into memory again, which
is avoided by enabling the newly added option.

Right now, this is enabled only for data blocks during flush. We plan to
expand this option to cover compactions and other types of blocks in the
future.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8242

Test Plan: Add new unit test

Reviewed By: anand1976

Differential Revision: D28521703

Pulled By: akankshamahajan15

fbshipit-source-id: 7219d6958821cedce689a219c3963a6f1a9d5f05
main
Akanksha Mahajan 4 years ago committed by Facebook GitHub Bot
parent d53f7ff69a
commit 5ba1b6e549
  1. 3
      HISTORY.md
  2. 90
      db/db_block_cache_test.cc
  3. 3
      db/db_test_util.h
  4. 22
      include/rocksdb/table.h
  5. 3
      options/options_settable_test.cc
  6. 123
      table/block_based/block_based_table_builder.cc
  7. 8
      table/block_based/block_based_table_builder.h
  8. 19
      table/block_based/block_based_table_factory.cc
  9. 13
      table/block_based/block_based_table_reader.cc
  10. 9
      table/block_based/block_based_table_reader.h
  11. 19
      tools/db_bench_tool.cc

@ -9,9 +9,8 @@
### New Features
* Marked the Ribbon filter and optimize_filters_for_memory features as production-ready, each enabling memory savings for Bloom-like filters. Use `NewRibbonFilterPolicy` in place of `NewBloomFilterPolicy` to use Ribbon filters instead of Bloom, or `ribbonfilter` in place of `bloomfilter` in configuration string.
### New Features
* Allow `DBWithTTL` to use `DeleteRange` api just like other DBs. `DeleteRangeCF()` which executes `WriteBatchInternal::DeleteRange()` has been added to the handler in `DBWithTTLImpl::Write()` to implement it.
* Add BlockBasedTableOptions.prepopulate_block_cache. If enabled, it prepopulates warm/hot data blocks that are already in memory into the block cache at the time of flush. On a flush, the data blocks that are in memory (in memtables) get flushed to the device. If using Direct IO, additional IO is incurred to read this data back into memory again, which is avoided by enabling this option. It also helps with Distributed FileSystem. More details in include/rocksdb/table.h.
## 6.21.0 (2021-05-21)
### Bug Fixes

@ -242,34 +242,48 @@ TEST_F(DBBlockCacheTest, TestWithoutCompressedBlockCache) {
#ifdef SNAPPY
TEST_F(DBBlockCacheTest, TestWithCompressedBlockCache) {
ReadOptions read_options;
auto table_options = GetTableOptions();
auto options = GetOptions(table_options);
Options options = CurrentOptions();
options.create_if_missing = true;
options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
BlockBasedTableOptions table_options;
table_options.no_block_cache = true;
table_options.block_cache_compressed = nullptr;
table_options.block_size = 1;
table_options.filter_policy.reset(NewBloomFilterPolicy(20));
table_options.cache_index_and_filter_blocks = false;
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
options.compression = CompressionType::kSnappyCompression;
InitTable(options);
std::shared_ptr<Cache> cache = NewLRUCache(0, 0, false);
DestroyAndReopen(options);
std::string value(kValueSize, 'a');
for (size_t i = 0; i < kNumBlocks; i++) {
ASSERT_OK(Put(ToString(i), value));
ASSERT_OK(Flush());
}
ReadOptions read_options;
std::shared_ptr<Cache> compressed_cache = NewLRUCache(1 << 25, 0, false);
std::shared_ptr<Cache> cache = NewLRUCache(0, 0, false);
table_options.block_cache = cache;
table_options.no_block_cache = false;
table_options.block_cache_compressed = compressed_cache;
table_options.max_auto_readahead_size = 0;
table_options.cache_index_and_filter_blocks = false;
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
Reopen(options);
RecordCacheCounters(options);
std::vector<std::unique_ptr<Iterator>> iterators(kNumBlocks - 1);
Iterator* iter = nullptr;
// Load blocks into cache.
for (size_t i = 0; i + 1 < kNumBlocks; i++) {
iter = db_->NewIterator(read_options);
iter->Seek(ToString(i));
ASSERT_OK(iter->status());
for (size_t i = 0; i < kNumBlocks - 1; i++) {
ASSERT_EQ(value, Get(ToString(i)));
CheckCacheCounters(options, 1, 0, 1, 0);
CheckCompressedCacheCounters(options, 1, 0, 1, 0);
iterators[i].reset(iter);
}
size_t usage = cache->GetUsage();
ASSERT_LT(0, usage);
ASSERT_EQ(0, usage);
ASSERT_EQ(usage, cache->GetPinnedUsage());
size_t compressed_usage = compressed_cache->GetUsage();
ASSERT_LT(0, compressed_usage);
@ -281,24 +295,21 @@ TEST_F(DBBlockCacheTest, TestWithCompressedBlockCache) {
cache->SetCapacity(usage);
cache->SetStrictCapacityLimit(true);
ASSERT_EQ(usage, cache->GetPinnedUsage());
iter = db_->NewIterator(read_options);
iter->Seek(ToString(kNumBlocks - 1));
ASSERT_TRUE(iter->status().IsIncomplete());
CheckCacheCounters(options, 1, 0, 0, 1);
// Load last key block.
ASSERT_EQ("Result incomplete: Insert failed due to LRU cache being full.",
Get(ToString(kNumBlocks - 1)));
// Failure won't record the miss counter.
CheckCacheCounters(options, 0, 0, 0, 1);
CheckCompressedCacheCounters(options, 1, 0, 1, 0);
delete iter;
iter = nullptr;
// Clear strict capacity limit flag. This time we shall hit compressed block
// cache.
// cache and load into block cache.
cache->SetStrictCapacityLimit(false);
iter = db_->NewIterator(read_options);
iter->Seek(ToString(kNumBlocks - 1));
ASSERT_OK(iter->status());
// Load last key block.
ASSERT_EQ(value, Get(ToString(kNumBlocks - 1)));
CheckCacheCounters(options, 1, 0, 1, 0);
CheckCompressedCacheCounters(options, 0, 1, 0, 0);
delete iter;
iter = nullptr;
}
#endif // SNAPPY
@ -446,6 +457,33 @@ TEST_F(DBBlockCacheTest, IndexAndFilterBlocksStats) {
// filter_bytes_insert);
}
#if (defined OS_LINUX || defined OS_WIN)
// With prepopulate_block_cache set to kFlushOnly, every flush is expected to
// add one data block to the block cache, so the Get() that follows each flush
// is served as a cache hit and the data-block miss counter stays at zero.
TEST_F(DBBlockCacheTest, WarmCacheWithDataBlocksDuringFlush) {
  Options options = CurrentOptions();
  options.create_if_missing = true;
  options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();

  BlockBasedTableOptions bbto;
  bbto.block_cache = NewLRUCache(1 << 25, 0, false);
  bbto.cache_index_and_filter_blocks = false;
  bbto.prepopulate_block_cache =
      BlockBasedTableOptions::PrepopulateBlockCache::kFlushOnly;
  options.table_factory.reset(NewBlockBasedTableFactory(bbto));
  DestroyAndReopen(options);

  const auto& stats = options.statistics;
  const std::string payload(kValueSize, 'a');
  for (size_t num_flushed = 1; num_flushed <= kNumBlocks; ++num_flushed) {
    const std::string key = ToString(num_flushed);

    // Write one key and flush it; the flush itself must populate the cache.
    ASSERT_OK(Put(key, payload));
    ASSERT_OK(Flush());
    ASSERT_EQ(num_flushed, stats->getTickerCount(BLOCK_CACHE_DATA_ADD));

    // Reading the key back must hit the warmed cache, never miss.
    ASSERT_EQ(payload, Get(key));
    ASSERT_EQ(0, stats->getTickerCount(BLOCK_CACHE_DATA_MISS));
    ASSERT_EQ(num_flushed, stats->getTickerCount(BLOCK_CACHE_DATA_HIT));
  }
}
#endif
namespace {
// A mock cache wraps LRUCache, and record how many entries have been

@ -303,6 +303,9 @@ class SpecialEnv : public EnvWrapper {
Status Allocate(uint64_t offset, uint64_t len) override {
return base_->Allocate(offset, len);
}
// Forwards the unique-id request to the wrapped file (base_) and returns its
// result unchanged.
size_t GetUniqueId(char* id, size_t max_size) const override {
return base_->GetUniqueId(id, max_size);
}
};
class ManifestFile : public WritableFile {
public:

@ -463,6 +463,28 @@ struct BlockBasedTableOptions {
//
// Default: 256 KB (256 * 1024).
size_t max_auto_readahead_size = 256 * 1024;
// If enabled, prepopulates warm/hot data blocks that are already in memory
// into the block cache at the time of flush. On a flush, the data blocks that
// are in memory (in memtables) get flushed to the device. If using Direct IO,
// additional IO is incurred to read this data back into memory again, which
// is avoided by enabling this option. This further helps if the workload
// exhibits high temporal locality, where most of the reads go to recently
// written data. This also helps in case of Distributed FileSystem.
//
// Right now, this is enabled only for data blocks during flush. We plan to
// expand this option to cover compactions and other types of blocks in the
// future.
//
// Default: kDisable.
enum class PrepopulateBlockCache : char {
// Do not prepopulate the block cache.
kDisable,
// Prepopulate data blocks during flush only. The plan is to extend this to
// all block types.
kFlushOnly,
};
PrepopulateBlockCache prepopulate_block_cache =
PrepopulateBlockCache::kDisable;
};
// Table Properties that are specific to block-based table properties.

@ -180,7 +180,8 @@ TEST_F(OptionsSettableTest, BlockBasedTableOptionsAllFieldsSettable) {
"verify_compression=true;read_amp_bytes_per_bit=0;"
"enable_index_compression=false;"
"block_align=true;"
"max_auto_readahead_size=0",
"max_auto_readahead_size=0;"
"prepopulate_block_cache=kDisable",
new_bbto));
ASSERT_EQ(unset_bytes_base,

@ -314,6 +314,8 @@ struct BlockBasedTableBuilder::Rep {
const bool use_delta_encoding_for_index_values;
std::unique_ptr<FilterBlockBuilder> filter_builder;
char cache_key_prefix[BlockBasedTable::kMaxCacheKeyPrefixSize];
size_t cache_key_prefix_size;
char compressed_cache_key_prefix[BlockBasedTable::kMaxCacheKeyPrefixSize];
size_t compressed_cache_key_prefix_size;
@ -436,6 +438,7 @@ struct BlockBasedTableBuilder::Rep {
: State::kUnbuffered),
use_delta_encoding_for_index_values(table_opt.format_version >= 4 &&
!table_opt.block_align),
cache_key_prefix_size(0),
compressed_cache_key_prefix_size(0),
flush_block_policy(
table_options.flush_block_policy_factory->NewFlushBlockPolicy(
@ -869,13 +872,8 @@ BlockBasedTableBuilder::BlockBasedTableBuilder(
if (rep_->filter_builder != nullptr) {
rep_->filter_builder->StartBlock(0);
}
if (table_options.block_cache_compressed.get() != nullptr) {
BlockBasedTable::GenerateCachePrefix<Cache, FSWritableFile>(
table_options.block_cache_compressed.get(), file->writable_file(),
&rep_->compressed_cache_key_prefix[0],
&rep_->compressed_cache_key_prefix_size, tbo.db_session_id,
tbo.cur_file_num);
}
SetupCacheKeyPrefix(tbo);
if (rep_->IsParallelCompressionEnabled()) {
StartParallelCompression();
@ -1031,7 +1029,9 @@ void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents,
if (!ok()) {
return;
}
WriteRawBlock(block_contents, type, handle, is_data_block);
WriteRawBlock(block_contents, type, handle, is_data_block,
&raw_block_contents);
r->compressed_output.clear();
if (is_data_block) {
if (r->filter_builder != nullptr) {
@ -1189,7 +1189,8 @@ void BlockBasedTableBuilder::CompressAndVerifyBlock(
void BlockBasedTableBuilder::WriteRawBlock(const Slice& block_contents,
CompressionType type,
BlockHandle* handle,
bool is_data_block) {
bool is_data_block,
const Slice* raw_block_contents) {
Rep* r = rep_;
Status s = Status::OK();
IOStatus io_s = IOStatus::OK();
@ -1246,7 +1247,21 @@ void BlockBasedTableBuilder::WriteRawBlock(const Slice& block_contents,
io_s = r->file->Append(Slice(trailer, kBlockTrailerSize));
if (io_s.ok()) {
assert(s.ok());
s = InsertBlockInCache(block_contents, type, handle);
if (is_data_block &&
r->table_options.prepopulate_block_cache ==
BlockBasedTableOptions::PrepopulateBlockCache::kFlushOnly) {
if (type == kNoCompression) {
s = InsertBlockInCache(block_contents, handle);
} else if (raw_block_contents != nullptr) {
s = InsertBlockInCache(*raw_block_contents, handle);
}
if (!s.ok()) {
r->SetStatus(s);
}
}
// TODO:: Should InsertBlockInCompressedCache take into account error from
// InsertBlockInCache or ignore and overwrite it.
s = InsertBlockInCompressedCache(block_contents, type, handle);
if (!s.ok()) {
r->SetStatus(s);
}
@ -1313,8 +1328,10 @@ void BlockBasedTableBuilder::BGWorkWriteRawBlock() {
}
r->pc_rep->file_size_estimator.SetCurrBlockRawSize(block_rep->data->size());
WriteRawBlock(block_rep->compressed_contents, block_rep->compression_type,
&r->pending_handle, true /* is_data_block*/);
&r->pending_handle, true /* is_data_block*/,
&block_rep->contents);
if (!ok()) {
break;
}
@ -1370,20 +1387,42 @@ IOStatus BlockBasedTableBuilder::io_status() const {
return rep_->GetIOStatus();
}
static void DeleteCachedBlockContents(const Slice& /*key*/, void* value) {
BlockContents* bc = reinterpret_cast<BlockContents*>(value);
delete bc;
namespace {
// Cache deleter callback: destroys the value stored in a cache entry.
// `value` is treated as a pointer to `Entry` and released with `delete`; the
// key is not used.
template <class Entry>
void DeleteEntryCached(const Slice& /*key*/, void* value) {
  delete static_cast<Entry*>(value);
}
} // namespace
// Helper function to setup the cache key's prefix for the Table.
void BlockBasedTableBuilder::SetupCacheKeyPrefix(
const TableBuilderOptions& tbo) {
if (rep_->table_options.block_cache.get() != nullptr) {
BlockBasedTable::GenerateCachePrefix<Cache, FSWritableFile>(
rep_->table_options.block_cache.get(), rep_->file->writable_file(),
&rep_->cache_key_prefix[0], &rep_->cache_key_prefix_size,
tbo.db_session_id, tbo.cur_file_num);
}
if (rep_->table_options.block_cache_compressed.get() != nullptr) {
BlockBasedTable::GenerateCachePrefix<Cache, FSWritableFile>(
rep_->table_options.block_cache_compressed.get(),
rep_->file->writable_file(), &rep_->compressed_cache_key_prefix[0],
&rep_->compressed_cache_key_prefix_size, tbo.db_session_id,
tbo.cur_file_num);
}
}
//
// Make a copy of the block contents and insert into compressed block cache
//
Status BlockBasedTableBuilder::InsertBlockInCache(const Slice& block_contents,
const CompressionType type,
Status BlockBasedTableBuilder::InsertBlockInCompressedCache(
const Slice& block_contents, const CompressionType type,
const BlockHandle* handle) {
Rep* r = rep_;
Cache* block_cache_compressed = r->table_options.block_cache_compressed.get();
Status s;
if (type != kNoCompression && block_cache_compressed != nullptr) {
size_t size = block_contents.size();
@ -1405,19 +1444,53 @@ Status BlockBasedTableBuilder::InsertBlockInCache(const Slice& block_contents,
Slice key(r->compressed_cache_key_prefix,
static_cast<size_t>(end - r->compressed_cache_key_prefix));
// Insert into compressed block cache.
// How should we deal with compressed cache full?
block_cache_compressed
->Insert(key, block_contents_to_cache,
s = block_cache_compressed->Insert(
key, block_contents_to_cache,
block_contents_to_cache->ApproximateMemoryUsage(),
&DeleteCachedBlockContents)
.PermitUncheckedError();
&DeleteEntryCached<BlockContents>);
if (s.ok()) {
RecordTick(rep_->ioptions.stats, BLOCK_CACHE_COMPRESSED_ADD);
} else {
RecordTick(rep_->ioptions.stats, BLOCK_CACHE_COMPRESSED_ADD_FAILURES);
}
// Invalidate OS cache.
r->file->InvalidateCache(static_cast<size_t>(r->get_offset()), size)
.PermitUncheckedError();
}
return Status::OK();
return s;
}
// Copies `block_contents` into a newly allocated Block and inserts it into the
// uncompressed block cache, keyed by this table's cache key prefix combined
// with `*handle`.
//
// On a successful insert the cache insertion metrics for a data block are
// updated; on failure BLOCK_CACHE_ADD_FAILURES is ticked. Returns the status
// of the cache insert, or a default-constructed Status when no block cache is
// configured.
Status BlockBasedTableBuilder::InsertBlockInCache(const Slice& block_contents,
                                                  const BlockHandle* handle) {
  Rep* r = rep_;
  Status s;

  // Uncompressed regular block cache; nothing to do if it is not configured.
  Cache* const uncompressed_cache = r->table_options.block_cache.get();
  if (uncompressed_cache == nullptr) {
    return s;
  }

  // The cached block needs its own buffer, so copy the contents into memory
  // obtained through the cache's memory allocator.
  const size_t num_bytes = block_contents.size();
  auto owned_copy =
      AllocateBlock(num_bytes, uncompressed_cache->memory_allocator());
  memcpy(owned_copy.get(), block_contents.data(), num_bytes);
  BlockContents copied_contents(std::move(owned_copy), num_bytes);

  char key_buf[BlockBasedTable::kMaxCacheKeyPrefixSize + kMaxVarint64Length];
  const Slice cache_key = BlockBasedTable::GetCacheKey(
      r->cache_key_prefix, r->cache_key_prefix_size, *handle, key_buf);

  Block* cached_block = new Block(std::move(copied_contents),
                                  r->table_options.read_amp_bytes_per_bit);
  const size_t charge = cached_block->ApproximateMemoryUsage();

  // DeleteEntryCached<Block> is registered as the deleter for the entry.
  s = uncompressed_cache->Insert(cache_key, cached_block, charge,
                                 &DeleteEntryCached<Block>);
  if (!s.ok()) {
    RecordTick(r->ioptions.stats, BLOCK_CACHE_ADD_FAILURES);
    return s;
  }

  BlockBasedTable::UpdateCacheInsertionMetrics(
      BlockType::kData, nullptr /*get_context*/, charge, s.IsOkOverwritten(),
      r->ioptions.stats);
  return s;
}
void BlockBasedTableBuilder::WriteFilterBlock(

@ -115,8 +115,14 @@ class BlockBasedTableBuilder : public TableBuilder {
bool is_data_block);
// Directly write data to the file.
void WriteRawBlock(const Slice& data, CompressionType, BlockHandle* handle,
bool is_data_block = false);
bool is_data_block = false,
const Slice* raw_data = nullptr);
void SetupCacheKeyPrefix(const TableBuilderOptions& tbo);
Status InsertBlockInCache(const Slice& block_contents,
const BlockHandle* handle);
Status InsertBlockInCompressedCache(const Slice& block_contents,
const CompressionType type,
const BlockHandle* handle);

@ -213,6 +213,13 @@ static std::unordered_map<std::string, OptionTypeInfo>
offsetof(struct MetadataCacheOptions, unpartitioned_pinning),
&pinning_tier_type_string_map)}};
// Maps the string names accepted for the "prepopulate_block_cache" option to
// the corresponding BlockBasedTableOptions::PrepopulateBlockCache values.
static std::unordered_map<std::string,
BlockBasedTableOptions::PrepopulateBlockCache>
block_base_table_prepopulate_block_cache_string_map = {
{"kDisable", BlockBasedTableOptions::PrepopulateBlockCache::kDisable},
{"kFlushOnly",
BlockBasedTableOptions::PrepopulateBlockCache::kFlushOnly}};
#endif // ROCKSDB_LITE
static std::unordered_map<std::string, OptionTypeInfo>
@ -416,6 +423,11 @@ static std::unordered_map<std::string, OptionTypeInfo>
{offsetof(struct BlockBasedTableOptions, max_auto_readahead_size),
OptionType::kSizeT, OptionVerificationType::kNormal,
OptionTypeFlags::kMutable}},
{"prepopulate_block_cache",
OptionTypeInfo::Enum<BlockBasedTableOptions::PrepopulateBlockCache>(
offsetof(struct BlockBasedTableOptions, prepopulate_block_cache),
&block_base_table_prepopulate_block_cache_string_map)},
#endif // ROCKSDB_LITE
};
@ -486,7 +498,8 @@ Status BlockBasedTableFactory::NewTableReader(
table_reader_options.force_direct_prefetch, &tail_prefetch_stats_,
table_reader_options.block_cache_tracer,
table_reader_options.max_file_size_for_l0_meta_pin,
table_reader_options.cur_db_session_id);
table_reader_options.cur_db_session_id,
table_reader_options.cur_file_num);
}
TableBuilder* BlockBasedTableFactory::NewTableBuilder(
@ -677,6 +690,10 @@ std::string BlockBasedTableFactory::GetPrintableOptions() const {
snprintf(buffer, kBufferSize,
" max_auto_readahead_size: %" ROCKSDB_PRIszt "\n",
table_options_.max_auto_readahead_size);
ret.append(buffer);
snprintf(buffer, kBufferSize, " prepopulate_block_cache: %d\n",
static_cast<int>(table_options_.prepopulate_block_cache));
ret.append(buffer);
return ret;
}

@ -259,12 +259,9 @@ void BlockBasedTable::UpdateCacheMissMetrics(BlockType block_type,
}
}
void BlockBasedTable::UpdateCacheInsertionMetrics(BlockType block_type,
GetContext* get_context,
size_t usage,
bool redundant) const {
Statistics* const statistics = rep_->ioptions.stats;
void BlockBasedTable::UpdateCacheInsertionMetrics(
BlockType block_type, GetContext* get_context, size_t usage, bool redundant,
Statistics* const statistics) {
// TODO: introduce perf counters for block cache insertions
if (get_context) {
++get_context->get_context_stats_.num_cache_add;
@ -1206,7 +1203,7 @@ Status BlockBasedTable::GetDataBlockFromCache(
cache_handle);
UpdateCacheInsertionMetrics(block_type, get_context, charge,
s.IsOkOverwritten());
s.IsOkOverwritten(), rep_->ioptions.stats);
} else {
RecordTick(statistics, BLOCK_CACHE_ADD_FAILURES);
}
@ -1313,7 +1310,7 @@ Status BlockBasedTable::PutDataBlockToCache(
cache_handle);
UpdateCacheInsertionMetrics(block_type, get_context, charge,
s.IsOkOverwritten());
s.IsOkOverwritten(), rep_->ioptions.stats);
} else {
RecordTick(statistics, BLOCK_CACHE_ADD_FAILURES);
}

@ -217,6 +217,11 @@ class BlockBasedTable : public TableReader {
size_t cache_key_prefix_size,
const BlockHandle& handle, char* cache_key);
static void UpdateCacheInsertionMetrics(BlockType block_type,
GetContext* get_context, size_t usage,
bool redundant,
Statistics* const statistics);
// Retrieve all key value pairs from data blocks in the table.
// The key retrieved are internal keys.
Status GetKVPairsFromDataBlocks(std::vector<KVPairBlock>* kv_pair_blocks);
@ -267,9 +272,7 @@ class BlockBasedTable : public TableReader {
size_t usage) const;
void UpdateCacheMissMetrics(BlockType block_type,
GetContext* get_context) const;
void UpdateCacheInsertionMetrics(BlockType block_type,
GetContext* get_context, size_t usage,
bool redundant) const;
Cache::Handle* GetEntryFromCache(Cache* block_cache, const Slice& key,
BlockType block_type,
GetContext* get_context,

@ -557,6 +557,10 @@ DEFINE_bool(block_align,
ROCKSDB_NAMESPACE::BlockBasedTableOptions().block_align,
"Align data blocks on page size");
DEFINE_int64(prepopulate_block_cache, 0,
"Pre-populate hot/warm blocks in block cache. 0 to disable and 1 "
"to insert during flush");
DEFINE_bool(use_data_block_hash_index, false,
"if use kDataBlockBinaryAndHash "
"instead of kDataBlockBinarySearch. "
@ -3998,6 +4002,21 @@ class Benchmark {
block_based_options.enable_index_compression =
FLAGS_enable_index_compression;
block_based_options.block_align = FLAGS_block_align;
BlockBasedTableOptions::PrepopulateBlockCache prepopulate_block_cache =
block_based_options.prepopulate_block_cache;
switch (FLAGS_prepopulate_block_cache) {
case 0:
prepopulate_block_cache =
BlockBasedTableOptions::PrepopulateBlockCache::kDisable;
break;
case 1:
prepopulate_block_cache =
BlockBasedTableOptions::PrepopulateBlockCache::kFlushOnly;
break;
default:
fprintf(stderr, "Unknown prepopulate block cache mode\n");
}
block_based_options.prepopulate_block_cache = prepopulate_block_cache;
if (FLAGS_use_data_block_hash_index) {
block_based_options.data_block_index_type =
ROCKSDB_NAMESPACE::BlockBasedTableOptions::kDataBlockBinaryAndHash;

Loading…
Cancel
Save