Remove two variables from BlockContents class and don't use class Block for compressed block (#4650)

Summary:
We carry compression type and "cachable" variables for every block in the block cache, while they take well-known values. 8-byte is wasted for each block (2-byte for useful information but it takes 8 bytes because of padding). With this change, these two variables are removed.

The cachable information is only useful in the process of reading the block. We use other information to infer from it. For compressed blocks, the compression type is a part of the block content itself so we can get it from there.

Some code is slightly refactored so that the cachable information can flow better.

Another change is to only use class BlockContents for compressed block, and narrow the class Block to only be used for uncompressed blocks, including blocks in compressed block cache. This can make the Block class less confusing. It also saves tens of bytes for each block in compressed block cache.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/4650

Differential Revision: D12969070

Pulled By: siying

fbshipit-source-id: 548b62724e9eb66993026429fd9c7c3acd1f95ed
main
Siying Dong 6 years ago committed by Facebook Github Bot
parent b76398a82b
commit b82e57d425
  1. 2
      include/rocksdb/table.h
  2. 81
      table/block.cc
  3. 21
      table/block.h
  4. 12
      table/block_based_filter_block_test.cc
  5. 20
      table/block_based_table_builder.cc
  6. 240
      table/block_based_table_reader.cc
  7. 37
      table/block_based_table_reader.h
  8. 22
      table/block_fetcher.cc
  9. 8
      table/block_fetcher.h
  10. 9
      table/block_test.cc
  11. 6
      table/data_block_hash_index_test.cc
  12. 20
      table/format.cc
  13. 53
      table/format.h
  14. 37
      table/meta_blocks.cc
  15. 8
      table/partitioned_filter_block_test.cc
  16. 12
      table/persistent_cache_helper.cc
  17. 1
      table/table_test.cc

@ -139,6 +139,8 @@ struct BlockBasedTableOptions {
// If non-NULL use the specified cache for compressed blocks.
// If NULL, rocksdb will not use a compressed block cache.
// Note: though it looks similar to `block_cache`, RocksDB doesn't put the
// same type of object there.
std::shared_ptr<Cache> block_cache_compressed = nullptr;
// Approximate size of user data packed per block. Note that the

@ -781,47 +781,45 @@ Block::Block(BlockContents&& contents, SequenceNumber _global_seqno,
size_ = 0; // Error marker
} else {
// Should only decode restart points for uncompressed blocks
if (compression_type() == kNoCompression) {
num_restarts_ = NumRestarts();
switch (IndexType()) {
case BlockBasedTableOptions::kDataBlockBinarySearch:
restart_offset_ = static_cast<uint32_t>(size_) -
(1 + num_restarts_) * sizeof(uint32_t);
if (restart_offset_ > size_ - sizeof(uint32_t)) {
// The size is too small for NumRestarts() and therefore
// restart_offset_ wrapped around.
size_ = 0;
}
num_restarts_ = NumRestarts();
switch (IndexType()) {
case BlockBasedTableOptions::kDataBlockBinarySearch:
restart_offset_ = static_cast<uint32_t>(size_) -
(1 + num_restarts_) * sizeof(uint32_t);
if (restart_offset_ > size_ - sizeof(uint32_t)) {
// The size is too small for NumRestarts() and therefore
// restart_offset_ wrapped around.
size_ = 0;
}
break;
case BlockBasedTableOptions::kDataBlockBinaryAndHash:
if (size_ < sizeof(uint32_t) /* block footer */ +
sizeof(uint16_t) /* NUM_BUCK */) {
size_ = 0;
break;
case BlockBasedTableOptions::kDataBlockBinaryAndHash:
if (size_ < sizeof(uint32_t) /* block footer */ +
sizeof(uint16_t) /* NUM_BUCK */) {
size_ = 0;
break;
}
uint16_t map_offset;
data_block_hash_index_.Initialize(
contents.data.data(),
static_cast<uint16_t>(contents.data.size() -
sizeof(uint32_t)), /*chop off
NUM_RESTARTS*/
&map_offset);
restart_offset_ = map_offset - num_restarts_ * sizeof(uint32_t);
if (restart_offset_ > map_offset) {
// map_offset is too small for NumRestarts() and
// therefore restart_offset_ wrapped around.
size_ = 0;
break;
}
}
uint16_t map_offset;
data_block_hash_index_.Initialize(
contents.data.data(),
static_cast<uint16_t>(contents.data.size() -
sizeof(uint32_t)), /*chop off
NUM_RESTARTS*/
&map_offset);
restart_offset_ = map_offset - num_restarts_ * sizeof(uint32_t);
if (restart_offset_ > map_offset) {
// map_offset is too small for NumRestarts() and
// therefore restart_offset_ wrapped around.
size_ = 0;
break;
default:
size_ = 0; // Error marker
}
}
break;
default:
size_ = 0; // Error marker
}
}
}
if (read_amp_bytes_per_bit != 0 && statistics && size_ != 0) {
read_amp_bitmap_.reset(new BlockReadAmpBitmap(
restart_offset_, read_amp_bytes_per_bit, statistics));
@ -834,6 +832,7 @@ DataBlockIter* Block::NewIterator(const Comparator* cmp, const Comparator* ucmp,
bool /*total_order_seek*/,
bool /*key_includes_seq*/,
bool /*value_is_full*/,
bool block_contents_pinned,
BlockPrefixIndex* /*prefix_index*/) {
DataBlockIter* ret_iter;
if (iter != nullptr) {
@ -852,7 +851,7 @@ DataBlockIter* Block::NewIterator(const Comparator* cmp, const Comparator* ucmp,
} else {
ret_iter->Initialize(
cmp, ucmp, data_, restart_offset_, num_restarts_, global_seqno_,
read_amp_bitmap_.get(), cachable(),
read_amp_bitmap_.get(), block_contents_pinned,
data_block_hash_index_.Valid() ? &data_block_hash_index_ : nullptr);
if (read_amp_bitmap_) {
if (read_amp_bitmap_->GetStatistics() != stats) {
@ -870,6 +869,7 @@ IndexBlockIter* Block::NewIterator(const Comparator* cmp,
const Comparator* ucmp, IndexBlockIter* iter,
Statistics* /*stats*/, bool total_order_seek,
bool key_includes_seq, bool value_is_full,
bool block_contents_pinned,
BlockPrefixIndex* prefix_index) {
IndexBlockIter* ret_iter;
if (iter != nullptr) {
@ -890,7 +890,8 @@ IndexBlockIter* Block::NewIterator(const Comparator* cmp,
total_order_seek ? nullptr : prefix_index;
ret_iter->Initialize(cmp, ucmp, data_, restart_offset_, num_restarts_,
prefix_index_ptr, key_includes_seq, value_is_full,
cachable(), nullptr /* data_block_hash_index */);
block_contents_pinned,
nullptr /* data_block_hash_index */);
}
return ret_iter;

@ -153,14 +153,12 @@ class Block {
size_t size() const { return size_; }
const char* data() const { return data_; }
bool cachable() const { return contents_.cachable; }
// The additional memory space taken by the block data.
size_t usable_size() const { return contents_.usable_size(); }
uint32_t NumRestarts() const;
bool own_bytes() const { return contents_.own_bytes(); }
BlockBasedTableOptions::DataBlockIndexType IndexType() const;
CompressionType compression_type() const {
return contents_.compression_type;
}
// If comparator is InternalKeyComparator, user_comparator is its user
// comparator; they are equal otherwise.
@ -180,6 +178,14 @@ class Block {
// If `prefix_index` is not nullptr this block will do hash lookup for the key
// prefix. If total_order_seek is true, prefix_index_ is ignored.
//
// If `block_contents_pinned` is true, the caller will guarantee that when
// the cleanup functions are transferred from the iterator to other
// classes, e.g. PinnableSlice, the pointer to the bytes will still be
// valid. Either the iterator holds cache handle or ownership of some resource
// and release them in a release function, or caller is sure that the data
// will not go away (for example, it's from mmapped file which will not be
// closed).
//
// NOTE: for the hash based lookup, if a key prefix doesn't match any key,
// the iterator will simply be set as "invalid", rather than returning
// the key that is just pass the target key.
@ -188,7 +194,8 @@ class Block {
const Comparator* comparator, const Comparator* user_comparator,
TBlockIter* iter = nullptr, Statistics* stats = nullptr,
bool total_order_seek = true, bool key_includes_seq = true,
bool value_is_full = true, BlockPrefixIndex* prefix_index = nullptr);
bool value_is_full = true, bool block_contents_pinned = false,
BlockPrefixIndex* prefix_index = nullptr);
// Report an approximation of how much memory has been used.
size_t ApproximateMemoryUsage() const;
@ -295,7 +302,9 @@ class BlockIter : public InternalIteratorBase<TValue> {
Slice value_;
Status status_;
bool key_pinned_;
// whether the block data is guaranteed to outlive this iterator
// Whether the block data is guaranteed to outlive this iterator, and
// as long as the cleanup functions are transferred to another class,
// e.g. PinnableSlice, the pointer to the bytes will still be valid.
bool block_contents_pinned_;
SequenceNumber global_seqno_;

@ -55,7 +55,7 @@ class FilterBlockTest : public testing::Test {
TEST_F(FilterBlockTest, EmptyBuilder) {
BlockBasedFilterBlockBuilder builder(nullptr, table_options_);
BlockContents block(builder.Finish(), false, kNoCompression);
BlockContents block(builder.Finish());
ASSERT_EQ("\\x00\\x00\\x00\\x00\\x0b", EscapeString(block.data));
BlockBasedFilterBlockReader reader(nullptr, table_options_, true,
std::move(block), nullptr);
@ -75,7 +75,7 @@ TEST_F(FilterBlockTest, SingleChunk) {
builder.StartBlock(300);
builder.Add("hello");
ASSERT_EQ(5, builder.NumAdded());
BlockContents block(builder.Finish(), false, kNoCompression);
BlockContents block(builder.Finish());
BlockBasedFilterBlockReader reader(nullptr, table_options_, true,
std::move(block), nullptr);
ASSERT_TRUE(reader.KeyMayMatch("foo", nullptr, 100));
@ -107,7 +107,7 @@ TEST_F(FilterBlockTest, MultiChunk) {
builder.Add("box");
builder.Add("hello");
BlockContents block(builder.Finish(), false, kNoCompression);
BlockContents block(builder.Finish());
BlockBasedFilterBlockReader reader(nullptr, table_options_, true,
std::move(block), nullptr);
@ -152,7 +152,7 @@ class BlockBasedFilterBlockTest : public testing::Test {
TEST_F(BlockBasedFilterBlockTest, BlockBasedEmptyBuilder) {
FilterBlockBuilder* builder = new BlockBasedFilterBlockBuilder(
nullptr, table_options_);
BlockContents block(builder->Finish(), false, kNoCompression);
BlockContents block(builder->Finish());
ASSERT_EQ("\\x00\\x00\\x00\\x00\\x0b", EscapeString(block.data));
FilterBlockReader* reader = new BlockBasedFilterBlockReader(
nullptr, table_options_, true, std::move(block), nullptr);
@ -174,7 +174,7 @@ TEST_F(BlockBasedFilterBlockTest, BlockBasedSingleChunk) {
builder->Add("box");
builder->StartBlock(300);
builder->Add("hello");
BlockContents block(builder->Finish(), false, kNoCompression);
BlockContents block(builder->Finish());
FilterBlockReader* reader = new BlockBasedFilterBlockReader(
nullptr, table_options_, true, std::move(block), nullptr);
ASSERT_TRUE(reader->KeyMayMatch("foo", nullptr, 100));
@ -210,7 +210,7 @@ TEST_F(BlockBasedFilterBlockTest, BlockBasedMultiChunk) {
builder->Add("box");
builder->Add("hello");
BlockContents block(builder->Finish(), false, kNoCompression);
BlockContents block(builder->Finish());
FilterBlockReader* reader = new BlockBasedFilterBlockReader(
nullptr, table_options_, true, std::move(block), nullptr);

@ -654,9 +654,9 @@ Status BlockBasedTableBuilder::status() const {
return rep_->status;
}
static void DeleteCachedBlock(const Slice& /*key*/, void* value) {
Block* block = reinterpret_cast<Block*>(value);
delete block;
static void DeleteCachedBlockContents(const Slice& /*key*/, void* value) {
BlockContents* bc = reinterpret_cast<BlockContents*>(value);
delete bc;
}
//
@ -677,9 +677,11 @@ Status BlockBasedTableBuilder::InsertBlockInCache(const Slice& block_contents,
memcpy(ubuf.get(), block_contents.data(), size);
ubuf[size] = type;
BlockContents results(std::move(ubuf), size, true, type);
Block* block = new Block(std::move(results), kDisableGlobalSequenceNumber);
BlockContents* block_contents_to_cache =
new BlockContents(std::move(ubuf), size);
#ifndef NDEBUG
block_contents_to_cache->is_raw_block = true;
#endif // NDEBUG
// make cache key by appending the file offset to the cache prefix id
char* end = EncodeVarint64(
@ -690,8 +692,10 @@ Status BlockBasedTableBuilder::InsertBlockInCache(const Slice& block_contents,
(end - r->compressed_cache_key_prefix));
// Insert into compressed block cache.
block_cache_compressed->Insert(key, block, block->ApproximateMemoryUsage(),
&DeleteCachedBlock);
block_cache_compressed->Insert(
key, block_contents_to_cache,
block_contents_to_cache->ApproximateMemoryUsage(),
&DeleteCachedBlockContents);
// Invalidate OS cache.
r->file->InvalidateCache(static_cast<size_t>(r->offset), size);

@ -80,12 +80,11 @@ Status ReadBlockFromFile(
std::unique_ptr<Block>* result, const ImmutableCFOptions& ioptions,
bool do_uncompress, const Slice& compression_dict,
const PersistentCacheOptions& cache_options, SequenceNumber global_seqno,
size_t read_amp_bytes_per_bit, MemoryAllocator* allocator = nullptr,
const bool immortal_file = false) {
size_t read_amp_bytes_per_bit, MemoryAllocator* allocator = nullptr) {
BlockContents contents;
BlockFetcher block_fetcher(
file, prefetch_buffer, footer, options, handle, &contents, ioptions,
do_uncompress, compression_dict, cache_options, allocator, immortal_file);
BlockFetcher block_fetcher(file, prefetch_buffer, footer, options, handle,
&contents, ioptions, do_uncompress,
compression_dict, cache_options, allocator);
Status s = block_fetcher.ReadBlockContents();
if (s.ok()) {
result->reset(new Block(std::move(contents), global_seqno,
@ -245,6 +244,8 @@ class PartitionIndexReader : public IndexReader, public Cleanable {
Statistics* kNullStats = nullptr;
// Filters are already checked before seeking the index
if (!partition_map_.empty()) {
// We don't return pinned datat from index blocks, so no need
// to set `block_contents_pinned`.
return NewTwoLevelIterator(
new BlockBasedTable::PartitionedIndexIteratorState(
table_, &partition_map_, index_key_includes_seq_,
@ -256,6 +257,8 @@ class PartitionIndexReader : public IndexReader, public Cleanable {
auto ro = ReadOptions();
ro.fill_cache = fill_cache;
bool kIsIndex = true;
// We don't return pinned datat from index blocks, so no need
// to set `block_contents_pinned`.
return new BlockBasedTableIterator<IndexBlockIter, BlockHandle>(
table_, ro, *icomparator_,
index_block_->NewIterator<IndexBlockIter>(
@ -276,6 +279,8 @@ class PartitionIndexReader : public IndexReader, public Cleanable {
IndexBlockIter biter;
BlockHandle handle;
Statistics* kNullStats = nullptr;
// We don't return pinned datat from index blocks, so no need
// to set `block_contents_pinned`.
index_block_->NewIterator<IndexBlockIter>(
icomparator_, icomparator_->user_comparator(), &biter, kNullStats, true,
index_key_includes_seq_, index_value_is_full_);
@ -318,7 +323,7 @@ class PartitionIndexReader : public IndexReader, public Cleanable {
const bool is_index = true;
// TODO: Support counter batch update for partitioned index and
// filter blocks
s = table_->MaybeLoadDataBlockToCache(
s = table_->MaybeReadBlockAndLoadToCache(
prefetch_buffer.get(), rep, ro, handle, compression_dict, &block,
is_index, nullptr /* get_context */);
@ -415,6 +420,8 @@ class BinarySearchIndexReader : public IndexReader {
IndexBlockIter* iter = nullptr, bool /*dont_care*/ = true,
bool /*dont_care*/ = true) override {
Statistics* kNullStats = nullptr;
// We don't return pinned datat from index blocks, so no need
// to set `block_contents_pinned`.
return index_block_->NewIterator<IndexBlockIter>(
icomparator_, icomparator_->user_comparator(), iter, kNullStats, true,
index_key_includes_seq_, index_value_is_full_);
@ -540,10 +547,12 @@ class HashIndexReader : public IndexReader {
IndexBlockIter* iter = nullptr, bool total_order_seek = true,
bool /*dont_care*/ = true) override {
Statistics* kNullStats = nullptr;
// We don't return pinned datat from index blocks, so no need
// to set `block_contents_pinned`.
return index_block_->NewIterator<IndexBlockIter>(
icomparator_, icomparator_->user_comparator(), iter, kNullStats,
total_order_seek, index_key_includes_seq_, index_value_is_full_,
prefix_index_.get());
false /* block_contents_pinned */, prefix_index_.get());
}
virtual size_t size() const override { return index_block_->size(); }
@ -578,8 +587,7 @@ class HashIndexReader : public IndexReader {
assert(index_block_ != nullptr);
}
~HashIndexReader() {
}
~HashIndexReader() {}
std::unique_ptr<Block> index_block_;
std::unique_ptr<BlockPrefixIndex> prefix_index_;
@ -972,7 +980,7 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions,
s.ToString().c_str());
} else if (found_range_del_block && !rep->range_del_handle.IsNull()) {
ReadOptions read_options;
s = MaybeLoadDataBlockToCache(
s = MaybeReadBlockAndLoadToCache(
prefetch_buffer.get(), rep, read_options, rep->range_del_handle,
Slice() /* compression_dict */, &rep->range_del_entry,
false /* is_index */, nullptr /* get_context */);
@ -1177,15 +1185,14 @@ Status BlockBasedTable::ReadMetaBlock(Rep* rep,
Status BlockBasedTable::GetDataBlockFromCache(
const Slice& block_cache_key, const Slice& compressed_block_cache_key,
Cache* block_cache, Cache* block_cache_compressed,
const ImmutableCFOptions& ioptions, const ReadOptions& read_options,
BlockBasedTable::CachableEntry<Block>* block, uint32_t format_version,
const Slice& compression_dict, size_t read_amp_bytes_per_bit, bool is_index,
GetContext* get_context, MemoryAllocator* allocator) {
Cache* block_cache, Cache* block_cache_compressed, Rep* rep,
const ReadOptions& read_options,
BlockBasedTable::CachableEntry<Block>* block, const Slice& compression_dict,
size_t read_amp_bytes_per_bit, bool is_index, GetContext* get_context) {
Status s;
Block* compressed_block = nullptr;
BlockContents* compressed_block = nullptr;
Cache::Handle* block_cache_compressed_handle = nullptr;
Statistics* statistics = ioptions.statistics;
Statistics* statistics = rep->ioptions.statistics;
// Lookup uncompressed cache first
if (block_cache != nullptr) {
@ -1228,32 +1235,34 @@ Status BlockBasedTable::GetDataBlockFromCache(
// found compressed block
RecordTick(statistics, BLOCK_CACHE_COMPRESSED_HIT);
compressed_block = reinterpret_cast<Block*>(
compressed_block = reinterpret_cast<BlockContents*>(
block_cache_compressed->Value(block_cache_compressed_handle));
assert(compressed_block->compression_type() != kNoCompression);
CompressionType compression_type = compressed_block->get_compression_type();
assert(compression_type != kNoCompression);
// Retrieve the uncompressed contents into a new buffer
BlockContents contents;
UncompressionContext uncompresssion_ctx(compressed_block->compression_type(),
compression_dict);
s = UncompressBlockContents(uncompresssion_ctx, compressed_block->data(),
compressed_block->size(), &contents,
format_version, ioptions, allocator);
UncompressionContext uncompresssion_ctx(compression_type, compression_dict);
s = UncompressBlockContents(uncompresssion_ctx, compressed_block->data.data(),
compressed_block->data.size(), &contents,
rep->table_options.format_version, rep->ioptions,
GetMemoryAllocator(rep->table_options));
// Insert uncompressed block into block cache
if (s.ok()) {
block->value =
new Block(std::move(contents), compressed_block->global_seqno(),
new Block(std::move(contents), rep->get_global_seqno(is_index),
read_amp_bytes_per_bit,
statistics); // uncompressed block
assert(block->value->compression_type() == kNoCompression);
if (block_cache != nullptr && block->value->cachable() &&
if (block_cache != nullptr && block->value->own_bytes() &&
read_options.fill_cache) {
size_t charge = block->value->ApproximateMemoryUsage();
s = block_cache->Insert(block_cache_key, block->value, charge,
&DeleteCachedEntry<Block>,
&(block->cache_handle));
#ifndef NDEBUG
block_cache->TEST_mark_as_data_block(block_cache_key, charge);
#endif // NDEBUG
if (s.ok()) {
if (get_context != nullptr) {
get_context->get_context_stats_.num_cache_add++;
@ -1298,65 +1307,77 @@ Status BlockBasedTable::PutDataBlockToCache(
const Slice& block_cache_key, const Slice& compressed_block_cache_key,
Cache* block_cache, Cache* block_cache_compressed,
const ReadOptions& /*read_options*/, const ImmutableCFOptions& ioptions,
CachableEntry<Block>* block, Block* raw_block, uint32_t format_version,
const Slice& compression_dict, size_t read_amp_bytes_per_bit, bool is_index,
Cache::Priority priority, GetContext* get_context,
MemoryAllocator* allocator) {
assert(raw_block->compression_type() == kNoCompression ||
CachableEntry<Block>* cached_block, BlockContents* raw_block_contents,
CompressionType raw_block_comp_type, uint32_t format_version,
const Slice& compression_dict, SequenceNumber seq_no,
size_t read_amp_bytes_per_bit, bool is_index, Cache::Priority priority,
GetContext* get_context, MemoryAllocator* allocator) {
assert(raw_block_comp_type == kNoCompression ||
block_cache_compressed != nullptr);
Status s;
// Retrieve the uncompressed contents into a new buffer
BlockContents contents;
BlockContents uncompressed_block_contents;
Statistics* statistics = ioptions.statistics;
if (raw_block->compression_type() != kNoCompression) {
UncompressionContext uncompression_ctx(raw_block->compression_type(),
if (raw_block_comp_type != kNoCompression) {
UncompressionContext uncompression_ctx(raw_block_comp_type,
compression_dict);
s = UncompressBlockContents(uncompression_ctx, raw_block->data(),
raw_block->size(), &contents, format_version,
ioptions, allocator);
s = UncompressBlockContents(
uncompression_ctx, raw_block_contents->data.data(),
raw_block_contents->data.size(), &uncompressed_block_contents,
format_version, ioptions, allocator);
}
if (!s.ok()) {
delete raw_block;
return s;
}
if (raw_block->compression_type() != kNoCompression) {
block->value = new Block(std::move(contents), raw_block->global_seqno(),
read_amp_bytes_per_bit,
statistics); // uncompressed block
if (raw_block_comp_type != kNoCompression) {
cached_block->value = new Block(std::move(uncompressed_block_contents),
seq_no, read_amp_bytes_per_bit,
statistics); // uncompressed block
} else {
block->value = raw_block;
raw_block = nullptr;
cached_block->value =
new Block(std::move(*raw_block_contents), seq_no,
read_amp_bytes_per_bit, ioptions.statistics);
}
// Insert compressed block into compressed block cache.
// Release the hold on the compressed cache entry immediately.
if (block_cache_compressed != nullptr && raw_block != nullptr &&
raw_block->cachable()) {
s = block_cache_compressed->Insert(compressed_block_cache_key, raw_block,
raw_block->ApproximateMemoryUsage(),
&DeleteCachedEntry<Block>);
if (block_cache_compressed != nullptr &&
raw_block_comp_type != kNoCompression && raw_block_contents != nullptr &&
raw_block_contents->own_bytes()) {
#ifndef NDEBUG
assert(raw_block_contents->is_raw_block);
#endif // NDEBUG
// We cannot directly put raw_block_contents because this could point to
// an object in the stack.
BlockContents* block_cont_for_comp_cache =
new BlockContents(std::move(*raw_block_contents));
s = block_cache_compressed->Insert(
compressed_block_cache_key, block_cont_for_comp_cache,
block_cont_for_comp_cache->ApproximateMemoryUsage(),
&DeleteCachedEntry<BlockContents>);
if (s.ok()) {
// Avoid the following code to delete this cached block.
raw_block = nullptr;
RecordTick(statistics, BLOCK_CACHE_COMPRESSED_ADD);
} else {
RecordTick(statistics, BLOCK_CACHE_COMPRESSED_ADD_FAILURES);
delete block_cont_for_comp_cache;
}
}
delete raw_block;
// insert into uncompressed block cache
assert((block->value->compression_type() == kNoCompression));
if (block_cache != nullptr && block->value->cachable()) {
size_t charge = block->value->ApproximateMemoryUsage();
s = block_cache->Insert(block_cache_key, block->value, charge,
&DeleteCachedEntry<Block>, &(block->cache_handle),
priority);
if (block_cache != nullptr && cached_block->value->own_bytes()) {
size_t charge = cached_block->value->ApproximateMemoryUsage();
s = block_cache->Insert(block_cache_key, cached_block->value, charge,
&DeleteCachedEntry<Block>,
&(cached_block->cache_handle), priority);
#ifndef NDEBUG
block_cache->TEST_mark_as_data_block(block_cache_key, charge);
#endif // NDEBUG
if (s.ok()) {
assert(block->cache_handle != nullptr);
assert(cached_block->cache_handle != nullptr);
if (get_context != nullptr) {
get_context->get_context_stats_.num_cache_add++;
get_context->get_context_stats_.num_cache_bytes_write += charge;
@ -1382,12 +1403,12 @@ Status BlockBasedTable::PutDataBlockToCache(
RecordTick(statistics, BLOCK_CACHE_DATA_BYTES_INSERT, charge);
}
}
assert(reinterpret_cast<Block*>(
block_cache->Value(block->cache_handle)) == block->value);
assert(reinterpret_cast<Block*>(block_cache->Value(
cached_block->cache_handle)) == cached_block->value);
} else {
RecordTick(statistics, BLOCK_CACHE_ADD_FAILURES);
delete block->value;
block->value = nullptr;
delete cached_block->value;
cached_block->value = nullptr;
}
}
@ -1561,12 +1582,16 @@ InternalIteratorBase<BlockHandle>* BlockBasedTable::NewIndexIterator(
GetContext* get_context) {
// index reader has already been pre-populated.
if (rep_->index_reader) {
// We don't return pinned datat from index blocks, so no need
// to set `block_contents_pinned`.
return rep_->index_reader->NewIterator(
input_iter, read_options.total_order_seek || disable_prefix_seek,
read_options.fill_cache);
}
// we have a pinned index block
if (rep_->index_entry.IsSet()) {
// We don't return pinned datat from index blocks, so no need
// to set `block_contents_pinned`.
return rep_->index_entry.value->NewIterator(
input_iter, read_options.total_order_seek || disable_prefix_seek,
read_options.fill_cache);
@ -1649,6 +1674,8 @@ InternalIteratorBase<BlockHandle>* BlockBasedTable::NewIndexIterator(
}
assert(cache_handle);
// We don't return pinned datat from index blocks, so no need
// to set `block_contents_pinned`.
auto* iter = index_reader->NewIterator(
input_iter, read_options.total_order_seek || disable_prefix_seek);
@ -1683,9 +1710,9 @@ TBlockIter* BlockBasedTable::NewDataBlockIterator(
if (rep->compression_dict_block) {
compression_dict = rep->compression_dict_block->data;
}
s = MaybeLoadDataBlockToCache(prefetch_buffer, rep, ro, handle,
compression_dict, &block, is_index,
get_context);
s = MaybeReadBlockAndLoadToCache(prefetch_buffer, rep, ro, handle,
compression_dict, &block, is_index,
get_context);
}
TBlockIter* iter;
@ -1711,7 +1738,7 @@ TBlockIter* BlockBasedTable::NewDataBlockIterator(
compression_dict, rep->persistent_cache_options,
is_index ? kDisableGlobalSequenceNumber : rep->global_seqno,
rep->table_options.read_amp_bytes_per_bit,
GetMemoryAllocator(rep->table_options), rep->immortal_table);
GetMemoryAllocator(rep->table_options));
}
if (s.ok()) {
block.value = block_value.release();
@ -1721,10 +1748,20 @@ TBlockIter* BlockBasedTable::NewDataBlockIterator(
if (s.ok()) {
assert(block.value != nullptr);
const bool kTotalOrderSeek = true;
// Block contents are pinned and it is still pinned after the iterator
// is destoryed as long as cleanup functions are moved to another object,
// when:
// 1. block cache handle is set to be released in cleanup function, or
// 2. it's pointing to immortable source. If own_bytes is true then we are
// not reading data from the original source, weather immortal or not.
// Otherwise, the block is pinned iff the source is immortal.
bool block_contents_pinned =
(block.cache_handle != nullptr ||
(!block.value->own_bytes() && rep->immortal_table));
iter = block.value->NewIterator<TBlockIter>(
&rep->internal_comparator, rep->internal_comparator.user_comparator(),
iter, rep->ioptions.statistics, kTotalOrderSeek, key_includes_seq,
index_key_is_full);
index_key_is_full, block_contents_pinned);
if (block.cache_handle != nullptr) {
iter->RegisterCleanup(&ReleaseCachedEntry, block_cache,
block.cache_handle);
@ -1733,7 +1770,7 @@ TBlockIter* BlockBasedTable::NewDataBlockIterator(
// insert a dummy record to block cache to track the memory usage
Cache::Handle* cache_handle;
// There are two other types of cache keys: 1) SST cache key added in
// `MaybeLoadDataBlockToCache` 2) dummy cache key added in
// `MaybeReadBlockAndLoadToCache` 2) dummy cache key added in
// `write_buffer_manager`. Use longer prefix (41 bytes) to differentiate
// from SST cache key(31 bytes), and use non-zero prefix to
// differentiate from `write_buffer_manager`
@ -1769,25 +1806,28 @@ TBlockIter* BlockBasedTable::NewDataBlockIterator(
return iter;
}
Status BlockBasedTable::MaybeLoadDataBlockToCache(
Status BlockBasedTable::MaybeReadBlockAndLoadToCache(
FilePrefetchBuffer* prefetch_buffer, Rep* rep, const ReadOptions& ro,
const BlockHandle& handle, Slice compression_dict,
CachableEntry<Block>* block_entry, bool is_index, GetContext* get_context) {
assert(block_entry != nullptr);
const bool no_io = (ro.read_tier == kBlockCacheTier);
Cache* block_cache = rep->table_options.block_cache.get();
// No point to cache compressed blocks if it never goes away
Cache* block_cache_compressed =
rep->table_options.block_cache_compressed.get();
rep->immortal_table ? nullptr
: rep->table_options.block_cache_compressed.get();
// First, try to get the block from the cache
//
// If either block cache is enabled, we'll try to read from it.
Status s;
char cache_key[kMaxCacheKeyPrefixSize + kMaxVarint64Length];
char compressed_cache_key[kMaxCacheKeyPrefixSize + kMaxVarint64Length];
Slice key /* key to the block cache */;
Slice ckey /* key to the compressed block cache */;
if (block_cache != nullptr || block_cache_compressed != nullptr) {
Statistics* statistics = rep->ioptions.statistics;
char cache_key[kMaxCacheKeyPrefixSize + kMaxVarint64Length];
char compressed_cache_key[kMaxCacheKeyPrefixSize + kMaxVarint64Length];
Slice key, /* key to the block cache */
ckey /* key to the compressed block cache */;
// create key for block cache
if (block_cache != nullptr) {
key = GetCacheKey(rep->cache_key_prefix, rep->cache_key_prefix_size,
@ -1800,32 +1840,41 @@ Status BlockBasedTable::MaybeLoadDataBlockToCache(
compressed_cache_key);
}
s = GetDataBlockFromCache(
key, ckey, block_cache, block_cache_compressed, rep->ioptions, ro,
block_entry, rep->table_options.format_version, compression_dict,
rep->table_options.read_amp_bytes_per_bit, is_index, get_context,
GetMemoryAllocator(rep->table_options));
s = GetDataBlockFromCache(key, ckey, block_cache, block_cache_compressed,
rep, ro, block_entry, compression_dict,
rep->table_options.read_amp_bytes_per_bit,
is_index, get_context);
// Can't find the block from the cache. If I/O is allowed, read from the
// file.
if (block_entry->value == nullptr && !no_io && ro.fill_cache) {
std::unique_ptr<Block> raw_block;
Statistics* statistics = rep->ioptions.statistics;
bool do_decompress =
block_cache_compressed == nullptr && rep->blocks_maybe_compressed;
CompressionType raw_block_comp_type;
BlockContents raw_block_contents;
{
StopWatch sw(rep->ioptions.env, statistics, READ_BLOCK_GET_MICROS);
s = ReadBlockFromFile(
BlockFetcher block_fetcher(
rep->file.get(), prefetch_buffer, rep->footer, ro, handle,
&raw_block, rep->ioptions,
block_cache_compressed == nullptr && rep->blocks_maybe_compressed,
compression_dict, rep->persistent_cache_options,
is_index ? kDisableGlobalSequenceNumber : rep->global_seqno,
rep->table_options.read_amp_bytes_per_bit,
GetMemoryAllocator(rep->table_options), rep->immortal_table);
&raw_block_contents, rep->ioptions,
do_decompress /* do uncompress */, compression_dict,
rep->persistent_cache_options,
GetMemoryAllocator(rep->table_options));
s = block_fetcher.ReadBlockContents();
raw_block_comp_type = block_fetcher.get_compression_type();
}
if (s.ok()) {
SequenceNumber seq_no = rep->get_global_seqno(is_index);
// If filling cache is allowed and a cache is configured, try to put the
// block to the cache.
s = PutDataBlockToCache(
key, ckey, block_cache, block_cache_compressed, ro, rep->ioptions,
block_entry, raw_block.release(), rep->table_options.format_version,
compression_dict, rep->table_options.read_amp_bytes_per_bit,
is_index,
block_entry, &raw_block_contents, raw_block_comp_type,
rep->table_options.format_version, compression_dict, seq_no,
rep->table_options.read_amp_bytes_per_bit, is_index,
is_index && rep->table_options
.cache_index_and_filter_blocks_with_high_priority
? Cache::Priority::HIGH
@ -1868,6 +1917,8 @@ BlockBasedTable::PartitionedIndexIteratorState::NewSecondaryIterator(
RecordTick(rep->ioptions.statistics, BLOCK_CACHE_BYTES_READ,
block_cache->GetUsage(block->second.cache_handle));
Statistics* kNullStats = nullptr;
// We don't return pinned datat from index blocks, so no need
// to set `block_contents_pinned`.
return block->second.value->NewIterator<IndexBlockIter>(
&rep->internal_comparator, rep->internal_comparator.user_comparator(),
nullptr, kNullStats, true, index_key_includes_seq_, index_key_is_full_);
@ -2612,8 +2663,7 @@ bool BlockBasedTable::TEST_KeyInCache(const ReadOptions& options,
Status s;
s = GetDataBlockFromCache(
cache_key, ckey, block_cache, nullptr, rep_->ioptions, options, &block,
rep_->table_options.format_version,
cache_key, ckey, block_cache, nullptr, rep_, options, &block,
rep_->compression_dict_block ? rep_->compression_dict_block->data
: Slice(),
0 /* read_amp_bytes_per_bit */);

@ -257,13 +257,11 @@ class BlockBasedTable : public TableReader {
// @param block_entry value is set to the uncompressed block if found. If
// in uncompressed block cache, also sets cache_handle to reference that
// block.
static Status MaybeLoadDataBlockToCache(FilePrefetchBuffer* prefetch_buffer,
Rep* rep, const ReadOptions& ro,
const BlockHandle& handle,
Slice compression_dict,
CachableEntry<Block>* block_entry,
bool is_index = false,
GetContext* get_context = nullptr);
static Status MaybeReadBlockAndLoadToCache(
FilePrefetchBuffer* prefetch_buffer, Rep* rep, const ReadOptions& ro,
const BlockHandle& handle, Slice compression_dict,
CachableEntry<Block>* block_entry, bool is_index = false,
GetContext* get_context = nullptr);
// For the following two functions:
// if `no_io == true`, we will not try to read filter/index from sst file
@ -301,12 +299,11 @@ class BlockBasedTable : public TableReader {
// dictionary.
static Status GetDataBlockFromCache(
const Slice& block_cache_key, const Slice& compressed_block_cache_key,
Cache* block_cache, Cache* block_cache_compressed,
const ImmutableCFOptions& ioptions, const ReadOptions& read_options,
BlockBasedTable::CachableEntry<Block>* block, uint32_t format_version,
Cache* block_cache, Cache* block_cache_compressed, Rep* rep,
const ReadOptions& read_options,
BlockBasedTable::CachableEntry<Block>* block,
const Slice& compression_dict, size_t read_amp_bytes_per_bit,
bool is_index = false, GetContext* get_context = nullptr,
MemoryAllocator* allocator = nullptr);
bool is_index = false, GetContext* get_context = nullptr);
// Put a raw block (maybe compressed) to the corresponding block caches.
// This method will perform decompression against raw_block if needed and then
@ -314,17 +311,19 @@ class BlockBasedTable : public TableReader {
// On success, Status::OK will be returned; also @block will be populated with
// uncompressed block and its cache handle.
//
// REQUIRES: raw_block is heap-allocated. PutDataBlockToCache() will be
// responsible for releasing its memory if error occurs.
// Allocated memory managed by raw_block_contents will be transferred to
// PutDataBlockToCache(). After the call, the object will be invalid.
// @param compression_dict Data for presetting the compression library's
// dictionary.
static Status PutDataBlockToCache(
const Slice& block_cache_key, const Slice& compressed_block_cache_key,
Cache* block_cache, Cache* block_cache_compressed,
const ReadOptions& read_options, const ImmutableCFOptions& ioptions,
CachableEntry<Block>* block, Block* raw_block, uint32_t format_version,
const Slice& compression_dict, size_t read_amp_bytes_per_bit,
bool is_index = false, Cache::Priority pri = Cache::Priority::LOW,
CachableEntry<Block>* block, BlockContents* raw_block_contents,
CompressionType raw_block_comp_type, uint32_t format_version,
const Slice& compression_dict, SequenceNumber seq_no,
size_t read_amp_bytes_per_bit, bool is_index = false,
Cache::Priority pri = Cache::Priority::LOW,
GetContext* get_context = nullptr, MemoryAllocator* allocator = nullptr);
// Calls (*handle_result)(arg, ...) repeatedly, starting with the entry found
@ -535,6 +534,10 @@ struct BlockBasedTable::Rep {
bool closed = false;
const bool immortal_table;
SequenceNumber get_global_seqno(bool is_index) const {
return is_index ? kDisableGlobalSequenceNumber : global_seqno;
}
};
template <class TBlockIter, typename TValue = Slice>

@ -172,8 +172,7 @@ inline
void BlockFetcher::GetBlockContents() {
if (slice_.data() != used_buf_) {
// the slice content is not the buffer provided
*contents_ = BlockContents(Slice(slice_.data(), block_size_),
immortal_source_, compression_type);
*contents_ = BlockContents(Slice(slice_.data(), block_size_));
} else {
// page can be either uncompressed or compressed, the buffer either stack
// or heap provided. Refer to https://github.com/facebook/rocksdb/pull/4096
@ -182,15 +181,21 @@ void BlockFetcher::GetBlockContents() {
heap_buf_ = AllocateBlock(block_size_ + kBlockTrailerSize, allocator_);
memcpy(heap_buf_.get(), used_buf_, block_size_ + kBlockTrailerSize);
}
*contents_ = BlockContents(std::move(heap_buf_), block_size_, true,
compression_type);
*contents_ = BlockContents(std::move(heap_buf_), block_size_);
}
#ifndef NDEBUG
contents_->is_raw_block = true;
#endif
}
Status BlockFetcher::ReadBlockContents() {
block_size_ = static_cast<size_t>(handle_.size());
if (TryGetUncompressBlockFromPersistentCache()) {
compression_type_ = kNoCompression;
#ifndef NDEBUG
contents_->is_raw_block = true;
#endif // NDEBUG
return Status::OK();
}
if (TryGetFromPrefetchBuffer()) {
@ -231,15 +236,16 @@ Status BlockFetcher::ReadBlockContents() {
PERF_TIMER_GUARD(block_decompress_time);
compression_type =
static_cast<rocksdb::CompressionType>(slice_.data()[block_size_]);
compression_type_ = get_block_compression_type(slice_.data(), block_size_);
if (do_uncompress_ && compression_type != kNoCompression) {
if (do_uncompress_ && compression_type_ != kNoCompression) {
// compressed page, uncompress, update cache
UncompressionContext uncompression_ctx(compression_type, compression_dict_);
UncompressionContext uncompression_ctx(compression_type_,
compression_dict_);
status_ = UncompressBlockContents(uncompression_ctx, slice_.data(),
block_size_, contents_, footer_.version(),
ioptions_, allocator_);
compression_type_ = kNoCompression;
} else {
GetBlockContents();
}

@ -28,8 +28,7 @@ class BlockFetcher {
BlockContents* contents, const ImmutableCFOptions& ioptions,
bool do_uncompress, const Slice& compression_dict,
const PersistentCacheOptions& cache_options,
MemoryAllocator* allocator = nullptr,
const bool immortal_source = false)
MemoryAllocator* allocator = nullptr)
: file_(file),
prefetch_buffer_(prefetch_buffer),
footer_(footer),
@ -38,11 +37,11 @@ class BlockFetcher {
contents_(contents),
ioptions_(ioptions),
do_uncompress_(do_uncompress),
immortal_source_(immortal_source),
compression_dict_(compression_dict),
cache_options_(cache_options),
allocator_(allocator) {}
Status ReadBlockContents();
CompressionType get_compression_type() const { return compression_type_; }
private:
static const uint32_t kDefaultStackBufferSize = 5000;
@ -55,7 +54,6 @@ class BlockFetcher {
BlockContents* contents_;
const ImmutableCFOptions& ioptions_;
bool do_uncompress_;
const bool immortal_source_;
const Slice& compression_dict_;
const PersistentCacheOptions& cache_options_;
MemoryAllocator* allocator_;
@ -66,7 +64,7 @@ class BlockFetcher {
CacheAllocationPtr heap_buf_;
char stack_buf_[kDefaultStackBufferSize];
bool got_from_prefetch_buffer_ = false;
rocksdb::CompressionType compression_type;
rocksdb::CompressionType compression_type_;
// return true if found
bool TryGetUncompressBlockFromPersistentCache();

@ -117,7 +117,6 @@ TEST_F(BlockTest, SimpleTest) {
// create block reader
BlockContents contents;
contents.data = rawblock;
contents.cachable = false;
Block reader(std::move(contents), kDisableGlobalSequenceNumber);
// read contents of block sequentially
@ -188,7 +187,6 @@ TEST_F(BlockTest, ValueDeltaEncodingTest) {
// create block reader
BlockContents contents;
contents.data = rawblock;
contents.cachable = false;
Block reader(std::move(contents), kDisableGlobalSequenceNumber);
const bool kTotalOrderSeek = true;
@ -247,7 +245,6 @@ BlockContents GetBlockContents(std::unique_ptr<BlockBuilder> *builder,
BlockContents contents;
contents.data = rawblock;
contents.cachable = false;
return contents;
}
@ -257,8 +254,7 @@ void CheckBlockContents(BlockContents contents, const int max_key,
const std::vector<std::string> &values) {
const size_t prefix_size = 6;
// create block reader
BlockContents contents_ref(contents.data, contents.cachable,
contents.compression_type);
BlockContents contents_ref(contents.data);
Block reader1(std::move(contents), kDisableGlobalSequenceNumber);
Block reader2(std::move(contents_ref), kDisableGlobalSequenceNumber);
@ -486,7 +482,6 @@ TEST_F(BlockTest, BlockWithReadAmpBitmap) {
// create block reader
BlockContents contents;
contents.data = rawblock;
contents.cachable = true;
Block reader(std::move(contents), kDisableGlobalSequenceNumber,
kBytesPerBit, stats.get());
@ -521,7 +516,6 @@ TEST_F(BlockTest, BlockWithReadAmpBitmap) {
// create block reader
BlockContents contents;
contents.data = rawblock;
contents.cachable = true;
Block reader(std::move(contents), kDisableGlobalSequenceNumber,
kBytesPerBit, stats.get());
@ -558,7 +552,6 @@ TEST_F(BlockTest, BlockWithReadAmpBitmap) {
// create block reader
BlockContents contents;
contents.data = rawblock;
contents.cachable = true;
Block reader(std::move(contents), kDisableGlobalSequenceNumber,
kBytesPerBit, stats.get());

@ -284,7 +284,6 @@ TEST(DataBlockHashIndex, BlockRestartIndexExceedMax) {
// create block reader
BlockContents contents;
contents.data = rawblock;
contents.cachable = false;
Block reader(std::move(contents), kDisableGlobalSequenceNumber);
ASSERT_EQ(reader.IndexType(),
@ -307,7 +306,6 @@ TEST(DataBlockHashIndex, BlockRestartIndexExceedMax) {
// create block reader
BlockContents contents;
contents.data = rawblock;
contents.cachable = false;
Block reader(std::move(contents), kDisableGlobalSequenceNumber);
ASSERT_EQ(reader.IndexType(),
@ -339,7 +337,6 @@ TEST(DataBlockHashIndex, BlockSizeExceedMax) {
// create block reader
BlockContents contents;
contents.data = rawblock;
contents.cachable = false;
Block reader(std::move(contents), kDisableGlobalSequenceNumber);
ASSERT_EQ(reader.IndexType(),
@ -364,7 +361,6 @@ TEST(DataBlockHashIndex, BlockSizeExceedMax) {
// create block reader
BlockContents contents;
contents.data = rawblock;
contents.cachable = false;
Block reader(std::move(contents), kDisableGlobalSequenceNumber);
// the index type have fallen back to binary when build finish.
@ -392,7 +388,6 @@ TEST(DataBlockHashIndex, BlockTestSingleKey) {
// create block reader
BlockContents contents;
contents.data = rawblock;
contents.cachable = false;
Block reader(std::move(contents), kDisableGlobalSequenceNumber);
const InternalKeyComparator icmp(BytewiseComparator());
@ -474,7 +469,6 @@ TEST(DataBlockHashIndex, BlockTestLarge) {
// create block reader
BlockContents contents;
contents.data = rawblock;
contents.cachable = false;
Block reader(std::move(contents), kDisableGlobalSequenceNumber);
const InternalKeyComparator icmp(BytewiseComparator());

@ -301,7 +301,7 @@ Status UncompressBlockContentsForCompressionType(
if (!Snappy_Uncompress(data, n, ubuf.get())) {
return Status::Corruption(snappy_corrupt_msg);
}
*contents = BlockContents(std::move(ubuf), ulength, true, kNoCompression);
*contents = BlockContents(std::move(ubuf), ulength);
break;
}
case kZlibCompression:
@ -314,8 +314,7 @@ Status UncompressBlockContentsForCompressionType(
"Zlib not supported or corrupted Zlib compressed block contents";
return Status::Corruption(zlib_corrupt_msg);
}
*contents =
BlockContents(std::move(ubuf), decompress_size, true, kNoCompression);
*contents = BlockContents(std::move(ubuf), decompress_size);
break;
case kBZip2Compression:
ubuf = BZip2_Uncompress(
@ -327,8 +326,7 @@ Status UncompressBlockContentsForCompressionType(
"Bzip2 not supported or corrupted Bzip2 compressed block contents";
return Status::Corruption(bzip2_corrupt_msg);
}
*contents =
BlockContents(std::move(ubuf), decompress_size, true, kNoCompression);
*contents = BlockContents(std::move(ubuf), decompress_size);
break;
case kLZ4Compression:
ubuf = LZ4_Uncompress(
@ -340,8 +338,7 @@ Status UncompressBlockContentsForCompressionType(
"LZ4 not supported or corrupted LZ4 compressed block contents";
return Status::Corruption(lz4_corrupt_msg);
}
*contents =
BlockContents(std::move(ubuf), decompress_size, true, kNoCompression);
*contents = BlockContents(std::move(ubuf), decompress_size);
break;
case kLZ4HCCompression:
ubuf = LZ4_Uncompress(
@ -353,8 +350,7 @@ Status UncompressBlockContentsForCompressionType(
"LZ4HC not supported or corrupted LZ4HC compressed block contents";
return Status::Corruption(lz4hc_corrupt_msg);
}
*contents =
BlockContents(std::move(ubuf), decompress_size, true, kNoCompression);
*contents = BlockContents(std::move(ubuf), decompress_size);
break;
case kXpressCompression:
// XPRESS allocates memory internally, thus no support for custom
@ -365,8 +361,7 @@ Status UncompressBlockContentsForCompressionType(
"XPRESS not supported or corrupted XPRESS compressed block contents";
return Status::Corruption(xpress_corrupt_msg);
}
*contents =
BlockContents(std::move(ubuf), decompress_size, true, kNoCompression);
*contents = BlockContents(std::move(ubuf), decompress_size);
break;
case kZSTD:
case kZSTDNotFinalCompression:
@ -377,8 +372,7 @@ Status UncompressBlockContentsForCompressionType(
"ZSTD not supported or corrupted ZSTD compressed block contents";
return Status::Corruption(zstd_corrupt_msg);
}
*contents =
BlockContents(std::move(ubuf), decompress_size, true, kNoCompression);
*contents = BlockContents(std::move(ubuf), decompress_size);
break;
default:
return Status::Corruption("bad block type");

@ -189,33 +189,43 @@ Status ReadFooterFromFile(RandomAccessFileReader* file,
// 1-byte type + 32-bit crc
static const size_t kBlockTrailerSize = 5;
inline CompressionType get_block_compression_type(const char* block_data,
size_t block_size) {
return static_cast<CompressionType>(block_data[block_size]);
}
struct BlockContents {
Slice data; // Actual contents of data
bool cachable; // True iff data can be cached
CompressionType compression_type;
CacheAllocationPtr allocation;
BlockContents() : cachable(false), compression_type(kNoCompression) {}
#ifndef NDEBUG
// Whether the block is a raw block, which contains compression type
// byte. It is only used for assertion.
bool is_raw_block = false;
#endif // NDEBUG
BlockContents(const Slice& _data, bool _cachable,
CompressionType _compression_type)
: data(_data), cachable(_cachable), compression_type(_compression_type) {}
BlockContents() {}
BlockContents(CacheAllocationPtr&& _data, size_t _size, bool _cachable,
CompressionType _compression_type)
: data(_data.get(), _size),
cachable(_cachable),
compression_type(_compression_type),
allocation(std::move(_data)) {}
BlockContents(const Slice& _data) : data(_data) {}
BlockContents(std::unique_ptr<char[]>&& _data, size_t _size, bool _cachable,
CompressionType _compression_type)
: data(_data.get(), _size),
cachable(_cachable),
compression_type(_compression_type) {
BlockContents(CacheAllocationPtr&& _data, size_t _size)
: data(_data.get(), _size), allocation(std::move(_data)) {}
BlockContents(std::unique_ptr<char[]>&& _data, size_t _size)
: data(_data.get(), _size) {
allocation.reset(_data.release());
}
bool own_bytes() const { return allocation.get() != nullptr; }
// It's the caller's responsibility to make sure that this is
// for raw block contents, which contains the compression
// byte in the end.
CompressionType get_compression_type() const {
assert(is_raw_block);
return get_block_compression_type(data.data(), data.size());
}
// The additional memory space taken by the block data.
size_t usable_size() const {
if (allocation.get() != nullptr) {
@ -233,15 +243,20 @@ struct BlockContents {
}
}
size_t ApproximateMemoryUsage() const {
return usable_size() + sizeof(*this);
}
BlockContents(BlockContents&& other) ROCKSDB_NOEXCEPT {
*this = std::move(other);
}
BlockContents& operator=(BlockContents&& other) {
data = std::move(other.data);
cachable = other.cachable;
compression_type = other.compression_type;
allocation = std::move(other.allocation);
#ifndef NDEBUG
is_raw_block = other.is_raw_block;
#endif // NDEBUG
return *this;
}
};

@ -175,7 +175,7 @@ Status ReadProperties(const Slice& handle_value, RandomAccessFileReader* file,
FilePrefetchBuffer* prefetch_buffer, const Footer& footer,
const ImmutableCFOptions& ioptions,
TableProperties** table_properties,
bool compression_type_missing) {
bool /*compression_type_missing*/) {
assert(table_properties);
Slice v = handle_value;
@ -195,11 +195,8 @@ Status ReadProperties(const Slice& handle_value, RandomAccessFileReader* file,
file, prefetch_buffer, footer, read_options, handle, &block_contents,
ioptions, false /* decompress */, compression_dict, cache_options);
s = block_fetcher.ReadBlockContents();
// override compression_type when table file is known to contain undefined
// value at compression type marker
if (compression_type_missing) {
block_contents.compression_type = kNoCompression;
}
// property block is never compressed. Need to add uncompress logic if we are
// to compress it..
if (!s.ok()) {
return s;
@ -343,11 +340,8 @@ Status ReadTableProperties(RandomAccessFileReader* file, uint64_t file_size,
if (!s.ok()) {
return s;
}
// override compression_type when table file is known to contain undefined
// value at compression type marker
if (compression_type_missing) {
metaindex_contents.compression_type = kNoCompression;
}
// property blocks are never compressed. Need to add uncompress logic if we
// are to compress it.
Block metaindex_block(std::move(metaindex_contents),
kDisableGlobalSequenceNumber);
std::unique_ptr<InternalIterator> meta_iter(
@ -387,10 +381,10 @@ Status FindMetaBlock(InternalIterator* meta_index_iter,
Status FindMetaBlock(RandomAccessFileReader* file, uint64_t file_size,
uint64_t table_magic_number,
const ImmutableCFOptions &ioptions,
const ImmutableCFOptions& ioptions,
const std::string& meta_block_name,
BlockHandle* block_handle,
bool compression_type_missing) {
bool /*compression_type_missing*/) {
Footer footer;
auto s = ReadFooterFromFile(file, nullptr /* prefetch_buffer */, file_size,
&footer, table_magic_number);
@ -412,11 +406,8 @@ Status FindMetaBlock(RandomAccessFileReader* file, uint64_t file_size,
if (!s.ok()) {
return s;
}
// override compression_type when table file is known to contain undefined
// value at compression type marker
if (compression_type_missing) {
metaindex_contents.compression_type = kNoCompression;
}
// meta blocks are never compressed. Need to add uncompress logic if we are to
// compress it.
Block metaindex_block(std::move(metaindex_contents),
kDisableGlobalSequenceNumber);
@ -432,7 +423,8 @@ Status ReadMetaBlock(RandomAccessFileReader* file,
uint64_t table_magic_number,
const ImmutableCFOptions& ioptions,
const std::string& meta_block_name,
BlockContents* contents, bool compression_type_missing) {
BlockContents* contents,
bool /*compression_type_missing*/) {
Status status;
Footer footer;
status = ReadFooterFromFile(file, prefetch_buffer, file_size, &footer,
@ -457,11 +449,8 @@ Status ReadMetaBlock(RandomAccessFileReader* file,
if (!status.ok()) {
return status;
}
// override compression_type when table file is known to contain undefined
// value at compression type marker
if (compression_type_missing) {
metaindex_contents.compression_type = kNoCompression;
}
// meta block is never compressed. Need to add uncompress logic if we are to
// compress it.
// Finding metablock
Block metaindex_block(std::move(metaindex_contents),

@ -33,7 +33,7 @@ class MockedBlockBasedTable : public BlockBasedTable {
const SliceTransform* prefix_extractor) const override {
Slice slice = slices[filter_blk_handle.offset()];
auto obj = new FullFilterBlockReader(
prefix_extractor, true, BlockContents(slice, false, kNoCompression),
prefix_extractor, true, BlockContents(slice),
rep_->table_options.filter_policy->GetFilterBitsReader(slice), nullptr);
return {obj, nullptr};
}
@ -44,7 +44,7 @@ class MockedBlockBasedTable : public BlockBasedTable {
const SliceTransform* prefix_extractor) const override {
Slice slice = slices[filter_blk_handle.offset()];
auto obj = new FullFilterBlockReader(
prefix_extractor, true, BlockContents(slice, false, kNoCompression),
prefix_extractor, true, BlockContents(slice),
rep_->table_options.filter_policy->GetFilterBitsReader(slice), nullptr);
return obj;
}
@ -149,8 +149,8 @@ class PartitionedFilterBlockTest
new BlockBasedTable::Rep(ioptions, env_options, table_options_, icomp,
!kSkipFilters, 0, !kImmortal)));
auto reader = new PartitionedFilterBlockReader(
prefix_extractor, true, BlockContents(slice, false, kNoCompression),
nullptr, nullptr, icomp, table.get(), pib->seperator_is_key_plus_seq(),
prefix_extractor, true, BlockContents(slice), nullptr, nullptr, icomp,
table.get(), pib->seperator_is_key_plus_seq(),
!pib->get_use_value_delta_encoding());
return reader;
}

@ -29,12 +29,9 @@ void PersistentCacheHelper::InsertUncompressedPage(
const BlockContents& contents) {
assert(cache_options.persistent_cache);
assert(!cache_options.persistent_cache->IsCompressed());
if (!contents.cachable || contents.compression_type != kNoCompression) {
// We shouldn't cache this. Either
// (1) content is not cacheable
// (2) content is compressed
return;
}
// Precondition:
// (1) content is cacheable
// (2) content is not compressed
// construct the page key
char cache_key[BlockBasedTable::kMaxCacheKeyPrefixSize + kMaxVarint64Length];
@ -109,8 +106,7 @@ Status PersistentCacheHelper::LookupUncompressedPage(
// update stats
RecordTick(cache_options.statistics, PERSISTENT_CACHE_HIT);
// construct result and return
*contents =
BlockContents(std::move(data), size, false /*cacheable*/, kNoCompression);
*contents = BlockContents(std::move(data), size);
return Status::OK();
}

@ -232,7 +232,6 @@ class BlockConstructor: public Constructor {
data_ = builder.Finish().ToString();
BlockContents contents;
contents.data = data_;
contents.cachable = false;
block_ = new Block(std::move(contents), kDisableGlobalSequenceNumber);
return Status::OK();
}

Loading…
Cancel
Save