Cache warming blocks during flush (#8561)

Summary:
Insert warm blocks  (data, uncompressed dict, index and filter blocks) during flush in Block cache which is enabled under option BlockBasedTableOptions.prepopulate_block_cache.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8561

Test Plan: Added unit test

Reviewed By: anand1976

Differential Revision: D29773411

Pulled By: akankshamahajan15

fbshipit-source-id: 6631123c10134340ef0bd7e90baafaa6deba0e66
main
Akanksha Mahajan 3 years ago committed by Facebook GitHub Bot
parent b278152261
commit 8b2f60b668
  1. 1
      HISTORY.md
  2. 36
      db/db_block_cache_test.cc
  3. 22
      include/rocksdb/table.h
  4. 109
      table/block_based/block_based_table_builder.cc
  5. 18
      table/block_based/block_based_table_builder.h

@ -5,6 +5,7 @@
### New Features
* Made the EventListener extend the Customizable class.
* EventListeners that have a non-empty Name() and that are registered with the ObjectRegistry can now be serialized to/from the OPTIONS file.
* Insert warm blocks (data blocks, uncompressed dict blocks, index and filter blocks) in Block cache during flush under option BlockBasedTableOptions.prepopulate_block_cache. Previously it was enabled for only data blocks.
### Performance Improvements
* Try to avoid updating DBOptions if `SetDBOptions()` does not change any option value.

@ -497,10 +497,46 @@ TEST_F(DBBlockCacheTest, WarmCacheWithDataBlocksDuringFlush) {
ASSERT_OK(Put(ToString(i), value));
ASSERT_OK(Flush());
ASSERT_EQ(i, options.statistics->getTickerCount(BLOCK_CACHE_DATA_ADD));
ASSERT_EQ(value, Get(ToString(i)));
ASSERT_EQ(0, options.statistics->getTickerCount(BLOCK_CACHE_DATA_MISS));
ASSERT_EQ(i, options.statistics->getTickerCount(BLOCK_CACHE_DATA_HIT));
}
}
// This test cache all types of blocks during flush.
TEST_F(DBBlockCacheTest, WarmCacheWithBlocksDuringFlush) {
Options options = CurrentOptions();
options.create_if_missing = true;
options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
BlockBasedTableOptions table_options;
table_options.block_cache = NewLRUCache(1 << 25, 0, false);
table_options.cache_index_and_filter_blocks = true;
table_options.prepopulate_block_cache =
BlockBasedTableOptions::PrepopulateBlockCache::kFlushOnly;
table_options.filter_policy.reset(NewBloomFilterPolicy(10, false));
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
DestroyAndReopen(options);
std::string value(kValueSize, 'a');
for (size_t i = 1; i < 2; i++) {
ASSERT_OK(Put(ToString(i), value));
ASSERT_OK(Flush());
ASSERT_EQ(i, options.statistics->getTickerCount(BLOCK_CACHE_DATA_ADD));
ASSERT_EQ(i, options.statistics->getTickerCount(BLOCK_CACHE_INDEX_ADD));
ASSERT_EQ(i, options.statistics->getTickerCount(BLOCK_CACHE_FILTER_ADD));
ASSERT_EQ(value, Get(ToString(i)));
ASSERT_EQ(0, options.statistics->getTickerCount(BLOCK_CACHE_DATA_MISS));
ASSERT_EQ(i, options.statistics->getTickerCount(BLOCK_CACHE_DATA_HIT));
ASSERT_EQ(0, options.statistics->getTickerCount(BLOCK_CACHE_INDEX_MISS));
ASSERT_EQ(i * 3, options.statistics->getTickerCount(BLOCK_CACHE_INDEX_HIT));
ASSERT_EQ(0, options.statistics->getTickerCount(BLOCK_CACHE_FILTER_MISS));
ASSERT_EQ(i * 2,
options.statistics->getTickerCount(BLOCK_CACHE_FILTER_HIT));
}
}
#endif

@ -464,22 +464,18 @@ struct BlockBasedTableOptions {
// Default: 256 KB (256 * 1024).
size_t max_auto_readahead_size = 256 * 1024;
// If enabled, prepopulate warm/hot data blocks which are already in memory
// into block cache at the time of flush. On a flush, the data block that is
// in memory (in memtables) get flushed to the device. If using Direct IO,
// additional IO is incurred to read this data back into memory again, which
// is avoided by enabling this option. This further helps if the workload
// exhibits high temporal locality, where most of the reads go to recently
// written data. This also helps in case of Distributed FileSystem.
//
// Right now, this is enabled only for flush for data blocks. We plan to
// expand this option to cover compactions in the future and for other types
// of blocks.
// If enabled, prepopulate warm/hot blocks (data, uncompressed dict, index and
// filter blocks) which are already in memory into block cache at the time of
// flush. On a flush, the block that is in memory (in memtables) get flushed
// to the device. If using Direct IO, additional IO is incurred to read this
// data back into memory again, which is avoided by enabling this option. This
// further helps if the workload exhibits high temporal locality, where most
// of the reads go to recently written data. This also helps in case of
// Distributed FileSystem.
enum class PrepopulateBlockCache : char {
// Disable prepopulate block cache.
kDisable,
// Prepopulate data blocks during flush only. Plan to extend it to all block
// types.
// Prepopulate blocks during flush only.
kFlushOnly,
};

@ -36,6 +36,7 @@
#include "table/block_based/block_based_table_factory.h"
#include "table/block_based/block_based_table_reader.h"
#include "table/block_based/block_builder.h"
#include "table/block_based/block_like_traits.h"
#include "table/block_based/filter_block.h"
#include "table/block_based/filter_policy_internal.h"
#include "table/block_based/full_filter_block.h"
@ -994,33 +995,34 @@ void BlockBasedTableBuilder::Flush() {
r->get_offset());
r->pc_rep->EmitBlock(block_rep);
} else {
WriteBlock(&r->data_block, &r->pending_handle, true /* is_data_block */);
WriteBlock(&r->data_block, &r->pending_handle, BlockType::kData);
}
}
void BlockBasedTableBuilder::WriteBlock(BlockBuilder* block,
BlockHandle* handle,
bool is_data_block) {
BlockType block_type) {
block->Finish();
std::string raw_block_contents;
block->SwapAndReset(raw_block_contents);
if (rep_->state == Rep::State::kBuffered) {
assert(is_data_block);
assert(block_type == BlockType::kData);
rep_->data_block_buffers.emplace_back(std::move(raw_block_contents));
rep_->data_begin_offset += rep_->data_block_buffers.back().size();
return;
}
WriteBlock(raw_block_contents, handle, is_data_block);
WriteBlock(raw_block_contents, handle, block_type);
}
void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents,
BlockHandle* handle,
bool is_data_block) {
BlockType block_type) {
Rep* r = rep_;
assert(r->state == Rep::State::kUnbuffered);
Slice block_contents;
CompressionType type;
Status compress_status;
bool is_data_block = block_type == BlockType::kData;
CompressAndVerifyBlock(raw_block_contents, is_data_block,
*(r->compression_ctxs[0]), r->verify_ctxs[0].get(),
&(r->compressed_output), &(block_contents), &type,
@ -1030,8 +1032,7 @@ void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents,
return;
}
WriteRawBlock(block_contents, type, handle, is_data_block,
&raw_block_contents);
WriteRawBlock(block_contents, type, handle, block_type, &raw_block_contents);
r->compressed_output.clear();
if (is_data_block) {
if (r->filter_builder != nullptr) {
@ -1189,9 +1190,10 @@ void BlockBasedTableBuilder::CompressAndVerifyBlock(
void BlockBasedTableBuilder::WriteRawBlock(const Slice& block_contents,
CompressionType type,
BlockHandle* handle,
bool is_data_block,
BlockType block_type,
const Slice* raw_block_contents) {
Rep* r = rep_;
bool is_data_block = block_type == BlockType::kData;
Status s = Status::OK();
IOStatus io_s = IOStatus::OK();
StopWatch sw(r->ioptions.clock, r->ioptions.stats, WRITE_RAW_BLOCK_MICROS);
@ -1247,13 +1249,12 @@ void BlockBasedTableBuilder::WriteRawBlock(const Slice& block_contents,
io_s = r->file->Append(Slice(trailer, kBlockTrailerSize));
if (io_s.ok()) {
assert(s.ok());
if (is_data_block &&
r->table_options.prepopulate_block_cache ==
BlockBasedTableOptions::PrepopulateBlockCache::kFlushOnly) {
if (r->table_options.prepopulate_block_cache ==
BlockBasedTableOptions::PrepopulateBlockCache::kFlushOnly) {
if (type == kNoCompression) {
s = InsertBlockInCache(block_contents, handle);
s = InsertBlockInCacheHelper(block_contents, handle, block_type);
} else if (raw_block_contents != nullptr) {
s = InsertBlockInCache(*raw_block_contents, handle);
s = InsertBlockInCacheHelper(*raw_block_contents, handle, block_type);
}
if (!s.ok()) {
r->SetStatus(s);
@ -1328,10 +1329,8 @@ void BlockBasedTableBuilder::BGWorkWriteRawBlock() {
}
r->pc_rep->file_size_estimator.SetCurrBlockRawSize(block_rep->data->size());
WriteRawBlock(block_rep->compressed_contents, block_rep->compression_type,
&r->pending_handle, true /* is_data_block*/,
&block_rep->contents);
&r->pending_handle, BlockType::kData, &block_rep->contents);
if (!ok()) {
break;
}
@ -1460,8 +1459,30 @@ Status BlockBasedTableBuilder::InsertBlockInCompressedCache(
return s;
}
Status BlockBasedTableBuilder::InsertBlockInCacheHelper(
const Slice& block_contents, const BlockHandle* handle,
BlockType block_type) {
Status s;
if (block_type == BlockType::kData || block_type == BlockType::kIndex) {
s = InsertBlockInCache<Block>(block_contents, handle, block_type);
} else if (block_type == BlockType::kFilter) {
if (rep_->filter_builder->IsBlockBased()) {
s = InsertBlockInCache<Block>(block_contents, handle, block_type);
} else {
s = InsertBlockInCache<ParsedFullFilterBlock>(block_contents, handle,
block_type);
}
} else if (block_type == BlockType::kCompressionDictionary) {
s = InsertBlockInCache<UncompressionDict>(block_contents, handle,
block_type);
}
return s;
}
template <typename TBlocklike>
Status BlockBasedTableBuilder::InsertBlockInCache(const Slice& block_contents,
const BlockHandle* handle) {
const BlockHandle* handle,
BlockType block_type) {
// Uncompressed regular block cache
Cache* block_cache = rep_->table_options.block_cache.get();
Status s;
@ -1479,15 +1500,25 @@ Status BlockBasedTableBuilder::InsertBlockInCache(const Slice& block_contents,
const size_t read_amp_bytes_per_bit =
rep_->table_options.read_amp_bytes_per_bit;
Block* block = new Block(std::move(results), read_amp_bytes_per_bit);
size_t charge = block->ApproximateMemoryUsage();
s = block_cache->Insert(key, block, charge, &DeleteEntryCached<Block>);
if (s.ok()) {
BlockBasedTable::UpdateCacheInsertionMetrics(
BlockType::kData, nullptr /*get_context*/, charge,
s.IsOkOverwritten(), rep_->ioptions.stats);
} else {
RecordTick(rep_->ioptions.stats, BLOCK_CACHE_ADD_FAILURES);
TBlocklike* block_holder = BlocklikeTraits<TBlocklike>::Create(
std::move(results), read_amp_bytes_per_bit,
rep_->ioptions.statistics.get(),
false /*rep_->blocks_definitely_zstd_compressed*/,
rep_->table_options.filter_policy.get());
if (block_holder->own_bytes()) {
size_t charge = block_holder->ApproximateMemoryUsage();
s = block_cache->Insert(key, block_holder, charge,
&DeleteEntryCached<TBlocklike>);
if (s.ok()) {
BlockBasedTable::UpdateCacheInsertionMetrics(
block_type, nullptr /*get_context*/, charge, s.IsOkOverwritten(),
rep_->ioptions.stats);
} else {
RecordTick(rep_->ioptions.stats, BLOCK_CACHE_ADD_FAILURES);
}
}
}
return s;
@ -1507,7 +1538,8 @@ void BlockBasedTableBuilder::WriteFilterBlock(
rep_->filter_builder->Finish(filter_block_handle, &s);
assert(s.ok() || s.IsIncomplete());
rep_->props.filter_size += filter_content.size();
WriteRawBlock(filter_content, kNoCompression, &filter_block_handle);
WriteRawBlock(filter_content, kNoCompression, &filter_block_handle,
BlockType::kFilter);
}
}
if (ok() && !empty_filter_block) {
@ -1541,7 +1573,7 @@ void BlockBasedTableBuilder::WriteIndexBlock(
if (ok()) {
for (const auto& item : index_blocks.meta_blocks) {
BlockHandle block_handle;
WriteBlock(item.second, &block_handle, false /* is_data_block */);
WriteBlock(item.second, &block_handle, BlockType::kIndex);
if (!ok()) {
break;
}
@ -1550,10 +1582,11 @@ void BlockBasedTableBuilder::WriteIndexBlock(
}
if (ok()) {
if (rep_->table_options.enable_index_compression) {
WriteBlock(index_blocks.index_block_contents, index_block_handle, false);
WriteBlock(index_blocks.index_block_contents, index_block_handle,
BlockType::kIndex);
} else {
WriteRawBlock(index_blocks.index_block_contents, kNoCompression,
index_block_handle);
index_block_handle, BlockType::kIndex);
}
}
// If there are more index partitions, finish them and write them out
@ -1567,10 +1600,10 @@ void BlockBasedTableBuilder::WriteIndexBlock(
}
if (rep_->table_options.enable_index_compression) {
WriteBlock(index_blocks.index_block_contents, index_block_handle,
false);
BlockType::kIndex);
} else {
WriteRawBlock(index_blocks.index_block_contents, kNoCompression,
index_block_handle);
index_block_handle, BlockType::kIndex);
}
// The last index_block_handle will be for the partition index block
}
@ -1665,7 +1698,7 @@ void BlockBasedTableBuilder::WritePropertiesBlock(
&property_block_builder);
WriteRawBlock(property_block_builder.Finish(), kNoCompression,
&properties_block_handle);
&properties_block_handle, BlockType::kProperties);
}
if (ok()) {
#ifndef NDEBUG
@ -1691,7 +1724,8 @@ void BlockBasedTableBuilder::WriteCompressionDictBlock(
BlockHandle compression_dict_block_handle;
if (ok()) {
WriteRawBlock(rep_->compression_dict->GetRawDict(), kNoCompression,
&compression_dict_block_handle);
&compression_dict_block_handle,
BlockType::kCompressionDictionary);
#ifndef NDEBUG
Slice compression_dict = rep_->compression_dict->GetRawDict();
TEST_SYNC_POINT_CALLBACK(
@ -1711,7 +1745,7 @@ void BlockBasedTableBuilder::WriteRangeDelBlock(
if (ok() && !rep_->range_del_block.empty()) {
BlockHandle range_del_block_handle;
WriteRawBlock(rep_->range_del_block.Finish(), kNoCompression,
&range_del_block_handle);
&range_del_block_handle, BlockType::kRangeDeletion);
meta_index_builder->Add(kRangeDelBlock, range_del_block_handle);
}
}
@ -1872,8 +1906,7 @@ void BlockBasedTableBuilder::EnterUnbuffered() {
}
r->index_builder->OnKeyAdded(key);
}
WriteBlock(Slice(data_block), &r->pending_handle,
true /* is_data_block */);
WriteBlock(Slice(data_block), &r->pending_handle, BlockType::kData);
if (ok() && i + 1 < r->data_block_buffers.size()) {
assert(next_block_iter != nullptr);
Slice first_key_in_next_block = next_block_iter->key();
@ -1935,7 +1968,7 @@ Status BlockBasedTableBuilder::Finish() {
if (ok()) {
// flush the meta index block
WriteRawBlock(meta_index_builder.Finish(), kNoCompression,
&metaindex_block_handle);
&metaindex_block_handle, BlockType::kMetaIndex);
}
if (ok()) {
WriteFooter(metaindex_block_handle, index_block_handle);

@ -9,6 +9,7 @@
#pragma once
#include <stdint.h>
#include <limits>
#include <string>
#include <utility>
@ -108,20 +109,27 @@ class BlockBasedTableBuilder : public TableBuilder {
// Call block's Finish() method and then
// - in buffered mode, buffer the uncompressed block contents.
// - in unbuffered mode, write the compressed block contents to file.
void WriteBlock(BlockBuilder* block, BlockHandle* handle, bool is_data_block);
void WriteBlock(BlockBuilder* block, BlockHandle* handle,
BlockType blocktype);
// Compress and write block content to the file.
void WriteBlock(const Slice& block_contents, BlockHandle* handle,
bool is_data_block);
BlockType block_type);
// Directly write data to the file.
void WriteRawBlock(const Slice& data, CompressionType, BlockHandle* handle,
bool is_data_block = false,
const Slice* raw_data = nullptr);
BlockType block_type, const Slice* raw_data = nullptr);
void SetupCacheKeyPrefix(const TableBuilderOptions& tbo);
template <typename TBlocklike>
Status InsertBlockInCache(const Slice& block_contents,
const BlockHandle* handle);
const BlockHandle* handle, BlockType block_type);
Status InsertBlockInCacheHelper(const Slice& block_contents,
const BlockHandle* handle,
BlockType block_type);
Status InsertBlockInCompressedCache(const Slice& block_contents,
const CompressionType type,
const BlockHandle* handle);

Loading…
Cancel
Save