Fix BlockBasedTable not always using memory allocator if available (#4678)

Summary:
Fix block based table reader not using memory_allocator when allocating index blocks and compression dictionary blocks.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/4678

Differential Revision: D13054594

Pulled By: yiwu-arbug

fbshipit-source-id: 379f25bcc665395662511c4f873f4b7b55104ce2
main
Yi Wu 6 years ago committed by Facebook Github Bot
parent 8fe1e06ca0
commit 512a5e3ef8
  1. 4
      HISTORY.md
  2. 133
      table/block_based_table_reader.cc
  3. 6
      table/block_based_table_reader.h
  4. 26
      table/block_fetcher.cc
  5. 18
      table/block_fetcher.h
  6. 42
      table/meta_blocks.cc
  7. 15
      table/meta_blocks.h
  8. 11
      table/table_test.cc

@ -1,8 +1,8 @@
# Rocksdb Change Log
## Unreleased
### New Features
* Introduced `Memoryllocator`, which lets the user specify custom allocator for memory in block cache.
* Improved `DeleteRange` to prevent read performance degradation. The feature is no longer marked as experimental.
### Public API Change
* `NO_ITERATORS` is divided into two counters `NO_ITERATOR_CREATED` and `NO_ITERATOR_DELETE`. Both of them are only increasing now, just as other counters.
### Bug Fixes
@ -11,6 +11,8 @@
## 5.18.0 (11/12/2018)
### New Features
* Introduced `Memoryllocator` interface, which lets the user specify custom allocator for memory in block cache.
* Introduced `JemallocNodumpAllocator` memory allocator. When being use, block cache will be excluded from core dump.
* Introduced `PerfContextByLevel` as part of `PerfContext` which allows storing perf context at each level. Also replaced `__thread` with `thread_local` keyword for perf_context. Added per-level perf context for bloom filter and `Get` query.
* With level_compaction_dynamic_level_bytes = true, level multiplier may be adjusted automatically when Level 0 to 1 compaction is lagged behind.
* Introduced DB option `atomic_flush`. If true, RocksDB supports flushing multiple column families and atomically committing the result to MANIFEST. Useful when WAL is disabled.

@ -78,13 +78,14 @@ Status ReadBlockFromFile(
RandomAccessFileReader* file, FilePrefetchBuffer* prefetch_buffer,
const Footer& footer, const ReadOptions& options, const BlockHandle& handle,
std::unique_ptr<Block>* result, const ImmutableCFOptions& ioptions,
bool do_uncompress, const Slice& compression_dict,
bool do_uncompress, bool maybe_compressed, const Slice& compression_dict,
const PersistentCacheOptions& cache_options, SequenceNumber global_seqno,
size_t read_amp_bytes_per_bit, MemoryAllocator* allocator = nullptr) {
size_t read_amp_bytes_per_bit, MemoryAllocator* memory_allocator) {
BlockContents contents;
BlockFetcher block_fetcher(file, prefetch_buffer, footer, options, handle,
&contents, ioptions, do_uncompress,
compression_dict, cache_options, allocator);
maybe_compressed, compression_dict, cache_options,
memory_allocator);
Status s = block_fetcher.ReadBlockContents();
if (s.ok()) {
result->reset(new Block(std::move(contents), global_seqno,
@ -101,6 +102,13 @@ inline MemoryAllocator* GetMemoryAllocator(
: nullptr;
}
inline MemoryAllocator* GetMemoryAllocatorForCompressedBlock(
const BlockBasedTableOptions& table_options) {
return table_options.block_cache_compressed.get()
? table_options.block_cache_compressed->memory_allocator()
: nullptr;
}
// Delete the resource that is held by the iterator.
template <class ResourceType>
void DeleteHeldResource(void* arg, void* /*ignored*/) {
@ -222,13 +230,15 @@ class PartitionIndexReader : public IndexReader, public Cleanable {
IndexReader** index_reader,
const PersistentCacheOptions& cache_options,
const int level, const bool index_key_includes_seq,
const bool index_value_is_full) {
const bool index_value_is_full,
MemoryAllocator* memory_allocator) {
std::unique_ptr<Block> index_block;
auto s = ReadBlockFromFile(
file, prefetch_buffer, footer, ReadOptions(), index_handle,
&index_block, ioptions, true /* decompress */,
Slice() /*compression dict*/, cache_options,
kDisableGlobalSequenceNumber, 0 /* read_amp_bytes_per_bit */);
true /*maybe_compressed*/, Slice() /*compression dict*/, cache_options,
kDisableGlobalSequenceNumber, 0 /* read_amp_bytes_per_bit */,
memory_allocator);
if (s.ok()) {
*index_reader = new PartitionIndexReader(
@ -401,13 +411,15 @@ class BinarySearchIndexReader : public IndexReader {
IndexReader** index_reader,
const PersistentCacheOptions& cache_options,
const bool index_key_includes_seq,
const bool index_value_is_full) {
const bool index_value_is_full,
MemoryAllocator* memory_allocator) {
std::unique_ptr<Block> index_block;
auto s = ReadBlockFromFile(
file, prefetch_buffer, footer, ReadOptions(), index_handle,
&index_block, ioptions, true /* decompress */,
Slice() /*compression dict*/, cache_options,
kDisableGlobalSequenceNumber, 0 /* read_amp_bytes_per_bit */);
true /*maybe_compressed*/, Slice() /*compression dict*/, cache_options,
kDisableGlobalSequenceNumber, 0 /* read_amp_bytes_per_bit */,
memory_allocator);
if (s.ok()) {
*index_reader = new BinarySearchIndexReader(
@ -473,13 +485,15 @@ class HashIndexReader : public IndexReader {
InternalIterator* meta_index_iter, IndexReader** index_reader,
bool /*hash_index_allow_collision*/,
const PersistentCacheOptions& cache_options,
const bool index_key_includes_seq, const bool index_value_is_full) {
const bool index_key_includes_seq, const bool index_value_is_full,
MemoryAllocator* memory_allocator) {
std::unique_ptr<Block> index_block;
auto s = ReadBlockFromFile(
file, prefetch_buffer, footer, ReadOptions(), index_handle,
&index_block, ioptions, true /* decompress */,
Slice() /*compression dict*/, cache_options,
kDisableGlobalSequenceNumber, 0 /* read_amp_bytes_per_bit */);
true /*maybe_compressed*/, Slice() /*compression dict*/, cache_options,
kDisableGlobalSequenceNumber, 0 /* read_amp_bytes_per_bit */,
memory_allocator);
if (!s.ok()) {
return s;
@ -518,7 +532,8 @@ class HashIndexReader : public IndexReader {
BlockFetcher prefixes_block_fetcher(
file, prefetch_buffer, footer, ReadOptions(), prefixes_handle,
&prefixes_contents, ioptions, true /*decompress*/,
dummy_comp_dict /*compression dict*/, cache_options);
true /*maybe_compressed*/, dummy_comp_dict /*compression dict*/,
cache_options, memory_allocator);
s = prefixes_block_fetcher.ReadBlockContents();
if (!s.ok()) {
return s;
@ -527,7 +542,8 @@ class HashIndexReader : public IndexReader {
BlockFetcher prefixes_meta_block_fetcher(
file, prefetch_buffer, footer, ReadOptions(), prefixes_meta_handle,
&prefixes_meta_contents, ioptions, true /*decompress*/,
dummy_comp_dict /*compression dict*/, cache_options);
true /*maybe_compressed*/, dummy_comp_dict /*compression dict*/,
cache_options, memory_allocator);
s = prefixes_meta_block_fetcher.ReadBlockContents();
if (!s.ok()) {
// TODO: log error
@ -894,7 +910,9 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions,
if (s.ok()) {
s = ReadProperties(meta_iter->value(), rep->file.get(),
prefetch_buffer.get(), rep->footer, rep->ioptions,
&table_properties, false /* compression_type_missing */);
&table_properties,
false /* compression_type_missing */,
nullptr /* memory_allocator */);
}
if (!s.ok()) {
@ -938,7 +956,8 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions,
read_options.verify_checksums = false;
BlockFetcher compression_block_fetcher(
rep->file.get(), prefetch_buffer.get(), rep->footer, read_options,
compression_dict_handle, compression_dict_cont.get(), rep->ioptions, false /* decompress */,
compression_dict_handle, compression_dict_cont.get(), rep->ioptions,
false /* decompress */, false /*maybe_compressed*/,
Slice() /*compression dict*/, cache_options);
s = compression_block_fetcher.ReadBlockContents();
@ -1166,9 +1185,10 @@ Status BlockBasedTable::ReadMetaBlock(Rep* rep,
Status s = ReadBlockFromFile(
rep->file.get(), prefetch_buffer, rep->footer, ReadOptions(),
rep->footer.metaindex_handle(), &meta, rep->ioptions,
true /* decompress */, Slice() /*compression dict*/,
rep->persistent_cache_options, kDisableGlobalSequenceNumber,
0 /* read_amp_bytes_per_bit */, GetMemoryAllocator(rep->table_options));
true /* decompress */, true /*maybe_compressed*/,
Slice() /*compression dict*/, rep->persistent_cache_options,
kDisableGlobalSequenceNumber, 0 /* read_amp_bytes_per_bit */,
GetMemoryAllocator(rep->table_options));
if (!s.ok()) {
ROCKS_LOG_ERROR(rep->ioptions.info_log,
@ -1312,8 +1332,8 @@ Status BlockBasedTable::PutDataBlockToCache(
CachableEntry<Block>* cached_block, BlockContents* raw_block_contents,
CompressionType raw_block_comp_type, uint32_t format_version,
const Slice& compression_dict, SequenceNumber seq_no,
size_t read_amp_bytes_per_bit, bool is_index, Cache::Priority priority,
GetContext* get_context, MemoryAllocator* allocator) {
size_t read_amp_bytes_per_bit, MemoryAllocator* memory_allocator,
bool is_index, Cache::Priority priority, GetContext* get_context) {
assert(raw_block_comp_type == kNoCompression ||
block_cache_compressed != nullptr);
@ -1327,7 +1347,7 @@ Status BlockBasedTable::PutDataBlockToCache(
s = UncompressBlockContents(
uncompression_ctx, raw_block_contents->data.data(),
raw_block_contents->data.size(), &uncompressed_block_contents,
format_version, ioptions, allocator);
format_version, ioptions, memory_allocator);
}
if (!s.ok()) {
return s;
@ -1431,11 +1451,11 @@ FilterBlockReader* BlockBasedTable::ReadFilter(
Slice dummy_comp_dict;
BlockFetcher block_fetcher(rep->file.get(), prefetch_buffer, rep->footer,
ReadOptions(), filter_handle, &block,
rep->ioptions, false /* decompress */,
dummy_comp_dict, rep->persistent_cache_options,
GetMemoryAllocator(rep->table_options));
BlockFetcher block_fetcher(
rep->file.get(), prefetch_buffer, rep->footer, ReadOptions(),
filter_handle, &block, rep->ioptions, false /* decompress */,
false /*maybe_compressed*/, dummy_comp_dict,
rep->persistent_cache_options, GetMemoryAllocator(rep->table_options));
Status s = block_fetcher.ReadBlockContents();
if (!s.ok()) {
@ -1736,8 +1756,10 @@ TBlockIter* BlockBasedTable::NewDataBlockIterator(
READ_BLOCK_GET_MICROS);
s = ReadBlockFromFile(
rep->file.get(), prefetch_buffer, rep->footer, ro, handle,
&block_value, rep->ioptions, rep->blocks_maybe_compressed,
compression_dict, rep->persistent_cache_options,
&block_value, rep->ioptions,
rep->blocks_maybe_compressed /*do_decompress*/,
rep->blocks_maybe_compressed, compression_dict,
rep->persistent_cache_options,
is_index ? kDisableGlobalSequenceNumber : rep->global_seqno,
rep->table_options.read_amp_bytes_per_bit,
GetMemoryAllocator(rep->table_options));
@ -1857,13 +1879,13 @@ Status BlockBasedTable::MaybeReadBlockAndLoadToCache(
BlockContents raw_block_contents;
{
StopWatch sw(rep->ioptions.env, statistics, READ_BLOCK_GET_MICROS);
BlockFetcher block_fetcher(
rep->file.get(), prefetch_buffer, rep->footer, ro, handle,
&raw_block_contents, rep->ioptions,
do_decompress /* do uncompress */, compression_dict,
rep->persistent_cache_options,
GetMemoryAllocator(rep->table_options));
do_decompress /* do uncompress */, rep->blocks_maybe_compressed,
compression_dict, rep->persistent_cache_options,
GetMemoryAllocator(rep->table_options),
GetMemoryAllocatorForCompressedBlock(rep->table_options));
s = block_fetcher.ReadBlockContents();
raw_block_comp_type = block_fetcher.get_compression_type();
}
@ -1876,12 +1898,13 @@ Status BlockBasedTable::MaybeReadBlockAndLoadToCache(
key, ckey, block_cache, block_cache_compressed, ro, rep->ioptions,
block_entry, &raw_block_contents, raw_block_comp_type,
rep->table_options.format_version, compression_dict, seq_no,
rep->table_options.read_amp_bytes_per_bit, is_index,
rep->table_options.read_amp_bytes_per_bit,
GetMemoryAllocator(rep->table_options), is_index,
is_index && rep->table_options
.cache_index_and_filter_blocks_with_high_priority
? Cache::Priority::HIGH
: Cache::Priority::LOW,
get_context, GetMemoryAllocator(rep->table_options));
get_context);
}
}
}
@ -2608,12 +2631,11 @@ Status BlockBasedTable::VerifyChecksumInBlocks(
BlockHandle handle = index_iter->value();
BlockContents contents;
Slice dummy_comp_dict;
BlockFetcher block_fetcher(rep_->file.get(), nullptr /* prefetch buffer */,
rep_->footer, ReadOptions(), handle, &contents,
rep_->ioptions, false /* decompress */,
dummy_comp_dict /*compression dict*/,
rep_->persistent_cache_options,
GetMemoryAllocator(rep_->table_options));
BlockFetcher block_fetcher(
rep_->file.get(), nullptr /* prefetch buffer */, rep_->footer,
ReadOptions(), handle, &contents, rep_->ioptions,
false /* decompress */, false /*maybe_compressed*/,
dummy_comp_dict /*compression dict*/, rep_->persistent_cache_options);
s = block_fetcher.ReadBlockContents();
if (!s.ok()) {
break;
@ -2635,12 +2657,11 @@ Status BlockBasedTable::VerifyChecksumInBlocks(
s = handle.DecodeFrom(&input);
BlockContents contents;
Slice dummy_comp_dict;
BlockFetcher block_fetcher(rep_->file.get(), nullptr /* prefetch buffer */,
rep_->footer, ReadOptions(), handle, &contents,
rep_->ioptions, false /* decompress */,
dummy_comp_dict /*compression dict*/,
rep_->persistent_cache_options,
GetMemoryAllocator(rep_->table_options));
BlockFetcher block_fetcher(
rep_->file.get(), nullptr /* prefetch buffer */, rep_->footer,
ReadOptions(), handle, &contents, rep_->ioptions,
false /* decompress */, false /*maybe_compressed*/,
dummy_comp_dict /*compression dict*/, rep_->persistent_cache_options);
s = block_fetcher.ReadBlockContents();
if (!s.ok()) {
break;
@ -2729,7 +2750,8 @@ Status BlockBasedTable::CreateIndexReader(
rep_->table_properties == nullptr ||
rep_->table_properties->index_key_is_user_key == 0,
rep_->table_properties == nullptr ||
rep_->table_properties->index_value_is_delta_encoded == 0);
rep_->table_properties->index_value_is_delta_encoded == 0,
GetMemoryAllocator(rep_->table_options));
}
case BlockBasedTableOptions::kBinarySearch: {
return BinarySearchIndexReader::Create(
@ -2738,7 +2760,8 @@ Status BlockBasedTable::CreateIndexReader(
rep_->table_properties == nullptr ||
rep_->table_properties->index_key_is_user_key == 0,
rep_->table_properties == nullptr ||
rep_->table_properties->index_value_is_delta_encoded == 0);
rep_->table_properties->index_value_is_delta_encoded == 0,
GetMemoryAllocator(rep_->table_options));
}
case BlockBasedTableOptions::kHashSearch: {
std::unique_ptr<Block> meta_guard;
@ -2760,7 +2783,8 @@ Status BlockBasedTable::CreateIndexReader(
rep_->table_properties == nullptr ||
rep_->table_properties->index_key_is_user_key == 0,
rep_->table_properties == nullptr ||
rep_->table_properties->index_value_is_delta_encoded == 0);
rep_->table_properties->index_value_is_delta_encoded == 0,
GetMemoryAllocator(rep_->table_options));
}
meta_index_iter = meta_iter_guard.get();
}
@ -2773,7 +2797,8 @@ Status BlockBasedTable::CreateIndexReader(
rep_->table_properties == nullptr ||
rep_->table_properties->index_key_is_user_key == 0,
rep_->table_properties == nullptr ||
rep_->table_properties->index_value_is_delta_encoded == 0);
rep_->table_properties->index_value_is_delta_encoded == 0,
GetMemoryAllocator(rep_->table_options));
}
default: {
std::string error_message =
@ -2942,9 +2967,9 @@ Status BlockBasedTable::DumpTable(WritableFile* out_file,
BlockFetcher block_fetcher(
rep_->file.get(), nullptr /* prefetch_buffer */, rep_->footer,
ReadOptions(), handle, &block, rep_->ioptions,
false /*decompress*/, dummy_comp_dict /*compression dict*/,
rep_->persistent_cache_options,
GetMemoryAllocator(rep_->table_options));
false /*decompress*/, false /*maybe_compressed*/,
dummy_comp_dict /*compression dict*/,
rep_->persistent_cache_options);
s = block_fetcher.ReadBlockContents();
if (!s.ok()) {
rep_->filter.reset(new BlockBasedFilterBlockReader(

@ -322,9 +322,9 @@ class BlockBasedTable : public TableReader {
CachableEntry<Block>* block, BlockContents* raw_block_contents,
CompressionType raw_block_comp_type, uint32_t format_version,
const Slice& compression_dict, SequenceNumber seq_no,
size_t read_amp_bytes_per_bit, bool is_index = false,
Cache::Priority pri = Cache::Priority::LOW,
GetContext* get_context = nullptr, MemoryAllocator* allocator = nullptr);
size_t read_amp_bytes_per_bit, MemoryAllocator* memory_allocator,
bool is_index = false, Cache::Priority pri = Cache::Priority::LOW,
GetContext* get_context = nullptr);
// Calls (*handle_result)(arg, ...) repeatedly, starting with the entry found
// after a call to Seek(key), until handle_result returns false.

@ -140,8 +140,13 @@ void BlockFetcher::PrepareBufferForBlockFromFile() {
// If we've got a small enough hunk of data, read it in to the
// trivially allocated stack buffer instead of needing a full malloc()
used_buf_ = &stack_buf_[0];
} else if (maybe_compressed_ && !do_uncompress_) {
compressed_buf_ = AllocateBlock(block_size_ + kBlockTrailerSize,
memory_allocator_compressed_);
used_buf_ = compressed_buf_.get();
} else {
heap_buf_ = AllocateBlock(block_size_ + kBlockTrailerSize, allocator_);
heap_buf_ =
AllocateBlock(block_size_ + kBlockTrailerSize, memory_allocator_);
used_buf_ = heap_buf_.get();
}
}
@ -168,6 +173,12 @@ void BlockFetcher::InsertUncompressedBlockToPersistentCacheIfNeeded() {
}
}
inline void BlockFetcher::CopyBufferToHeap() {
assert(used_buf_ != heap_buf_.get());
heap_buf_ = AllocateBlock(block_size_ + kBlockTrailerSize, memory_allocator_);
memcpy(heap_buf_.get(), used_buf_, block_size_ + kBlockTrailerSize);
}
inline
void BlockFetcher::GetBlockContents() {
if (slice_.data() != used_buf_) {
@ -177,9 +188,14 @@ void BlockFetcher::GetBlockContents() {
// page can be either uncompressed or compressed, the buffer either stack
// or heap provided. Refer to https://github.com/facebook/rocksdb/pull/4096
if (got_from_prefetch_buffer_ || used_buf_ == &stack_buf_[0]) {
assert(used_buf_ != heap_buf_.get());
heap_buf_ = AllocateBlock(block_size_ + kBlockTrailerSize, allocator_);
memcpy(heap_buf_.get(), used_buf_, block_size_ + kBlockTrailerSize);
CopyBufferToHeap();
} else if (used_buf_ == compressed_buf_.get()) {
if (compression_type_ == kNoCompression &&
memory_allocator_ != memory_allocator_compressed_) {
CopyBufferToHeap();
} else {
heap_buf_ = std::move(compressed_buf_);
}
}
*contents_ = BlockContents(std::move(heap_buf_), block_size_);
}
@ -244,7 +260,7 @@ Status BlockFetcher::ReadBlockContents() {
compression_dict_);
status_ = UncompressBlockContents(uncompression_ctx, slice_.data(),
block_size_, contents_, footer_.version(),
ioptions_, allocator_);
ioptions_, memory_allocator_);
compression_type_ = kNoCompression;
} else {
GetBlockContents();

@ -10,7 +10,6 @@
#pragma once
#include "table/block.h"
#include "table/format.h"
#include "util/memory_allocator.h"
namespace rocksdb {
@ -26,9 +25,11 @@ class BlockFetcher {
FilePrefetchBuffer* prefetch_buffer, const Footer& footer,
const ReadOptions& read_options, const BlockHandle& handle,
BlockContents* contents, const ImmutableCFOptions& ioptions,
bool do_uncompress, const Slice& compression_dict,
bool do_uncompress, bool maybe_compressed,
const Slice& compression_dict,
const PersistentCacheOptions& cache_options,
MemoryAllocator* allocator = nullptr)
MemoryAllocator* memory_allocator = nullptr,
MemoryAllocator* memory_allocator_compressed = nullptr)
: file_(file),
prefetch_buffer_(prefetch_buffer),
footer_(footer),
@ -37,9 +38,11 @@ class BlockFetcher {
contents_(contents),
ioptions_(ioptions),
do_uncompress_(do_uncompress),
maybe_compressed_(maybe_compressed),
compression_dict_(compression_dict),
cache_options_(cache_options),
allocator_(allocator) {}
memory_allocator_(memory_allocator),
memory_allocator_compressed_(memory_allocator_compressed) {}
Status ReadBlockContents();
CompressionType get_compression_type() const { return compression_type_; }
@ -54,14 +57,17 @@ class BlockFetcher {
BlockContents* contents_;
const ImmutableCFOptions& ioptions_;
bool do_uncompress_;
bool maybe_compressed_;
const Slice& compression_dict_;
const PersistentCacheOptions& cache_options_;
MemoryAllocator* allocator_;
MemoryAllocator* memory_allocator_;
MemoryAllocator* memory_allocator_compressed_;
Status status_;
Slice slice_;
char* used_buf_ = nullptr;
size_t block_size_;
CacheAllocationPtr heap_buf_;
CacheAllocationPtr compressed_buf_;
char stack_buf_[kDefaultStackBufferSize];
bool got_from_prefetch_buffer_ = false;
rocksdb::CompressionType compression_type_;
@ -72,6 +78,8 @@ class BlockFetcher {
bool TryGetFromPrefetchBuffer();
bool TryGetCompressedBlockFromPersistentCache();
void PrepareBufferForBlockFromFile();
// Copy content from used_buf_ to new heap buffer.
void CopyBufferToHeap();
void GetBlockContents();
void InsertCompressedBlockToPersistentCacheIfNeeded();
void InsertUncompressedBlockToPersistentCacheIfNeeded();

@ -175,7 +175,8 @@ Status ReadProperties(const Slice& handle_value, RandomAccessFileReader* file,
FilePrefetchBuffer* prefetch_buffer, const Footer& footer,
const ImmutableCFOptions& ioptions,
TableProperties** table_properties,
bool /*compression_type_missing*/) {
bool /*compression_type_missing*/,
MemoryAllocator* memory_allocator) {
assert(table_properties);
Slice v = handle_value;
@ -191,9 +192,10 @@ Status ReadProperties(const Slice& handle_value, RandomAccessFileReader* file,
Slice compression_dict;
PersistentCacheOptions cache_options;
BlockFetcher block_fetcher(
file, prefetch_buffer, footer, read_options, handle, &block_contents,
ioptions, false /* decompress */, compression_dict, cache_options);
BlockFetcher block_fetcher(file, prefetch_buffer, footer, read_options,
handle, &block_contents, ioptions,
false /* decompress */, false /*maybe_compressed*/,
compression_dict, cache_options, memory_allocator);
s = block_fetcher.ReadBlockContents();
// property block is never compressed. Need to add uncompress logic if we are
// to compress it..
@ -316,7 +318,8 @@ Status ReadTableProperties(RandomAccessFileReader* file, uint64_t file_size,
uint64_t table_magic_number,
const ImmutableCFOptions& ioptions,
TableProperties** properties,
bool compression_type_missing) {
bool compression_type_missing,
MemoryAllocator* memory_allocator) {
// -- Read metaindex block
Footer footer;
auto s = ReadFooterFromFile(file, nullptr /* prefetch_buffer */, file_size,
@ -332,10 +335,11 @@ Status ReadTableProperties(RandomAccessFileReader* file, uint64_t file_size,
Slice compression_dict;
PersistentCacheOptions cache_options;
BlockFetcher block_fetcher(
file, nullptr /* prefetch_buffer */, footer, read_options,
metaindex_handle, &metaindex_contents, ioptions, false /* decompress */,
compression_dict, cache_options);
BlockFetcher block_fetcher(file, nullptr /* prefetch_buffer */, footer,
read_options, metaindex_handle,
&metaindex_contents, ioptions,
false /* decompress */, false /*maybe_compressed*/,
compression_dict, cache_options, memory_allocator);
s = block_fetcher.ReadBlockContents();
if (!s.ok()) {
return s;
@ -358,7 +362,8 @@ Status ReadTableProperties(RandomAccessFileReader* file, uint64_t file_size,
TableProperties table_properties;
if (found_properties_block == true) {
s = ReadProperties(meta_iter->value(), file, nullptr /* prefetch_buffer */,
footer, ioptions, properties, compression_type_missing);
footer, ioptions, properties, compression_type_missing,
memory_allocator);
} else {
s = Status::NotFound();
}
@ -384,7 +389,8 @@ Status FindMetaBlock(RandomAccessFileReader* file, uint64_t file_size,
const ImmutableCFOptions& ioptions,
const std::string& meta_block_name,
BlockHandle* block_handle,
bool /*compression_type_missing*/) {
bool /*compression_type_missing*/,
MemoryAllocator* memory_allocator) {
Footer footer;
auto s = ReadFooterFromFile(file, nullptr /* prefetch_buffer */, file_size,
&footer, table_magic_number);
@ -401,7 +407,8 @@ Status FindMetaBlock(RandomAccessFileReader* file, uint64_t file_size,
BlockFetcher block_fetcher(
file, nullptr /* prefetch_buffer */, footer, read_options,
metaindex_handle, &metaindex_contents, ioptions,
false /* do decompression */, compression_dict, cache_options);
false /* do decompression */, false /*maybe_compressed*/,
compression_dict, cache_options, memory_allocator);
s = block_fetcher.ReadBlockContents();
if (!s.ok()) {
return s;
@ -423,8 +430,8 @@ Status ReadMetaBlock(RandomAccessFileReader* file,
uint64_t table_magic_number,
const ImmutableCFOptions& ioptions,
const std::string& meta_block_name,
BlockContents* contents,
bool /*compression_type_missing*/) {
BlockContents* contents, bool /*compression_type_missing*/,
MemoryAllocator* memory_allocator) {
Status status;
Footer footer;
status = ReadFooterFromFile(file, prefetch_buffer, file_size, &footer,
@ -443,8 +450,8 @@ Status ReadMetaBlock(RandomAccessFileReader* file,
BlockFetcher block_fetcher(file, prefetch_buffer, footer, read_options,
metaindex_handle, &metaindex_contents, ioptions,
false /* decompress */, compression_dict,
cache_options);
false /* decompress */, false /*maybe_compressed*/,
compression_dict, cache_options, memory_allocator);
status = block_fetcher.ReadBlockContents();
if (!status.ok()) {
return status;
@ -470,7 +477,8 @@ Status ReadMetaBlock(RandomAccessFileReader* file,
// Reading metablock
BlockFetcher block_fetcher2(
file, prefetch_buffer, footer, read_options, block_handle, contents,
ioptions, false /* decompress */, compression_dict, cache_options);
ioptions, false /* decompress */, false /*maybe_compressed*/,
compression_dict, cache_options, memory_allocator);
return block_fetcher2.ReadBlockContents();
}

@ -11,12 +11,13 @@
#include "db/builder.h"
#include "db/table_properties_collector.h"
#include "util/kv_map.h"
#include "rocksdb/comparator.h"
#include "rocksdb/memory_allocator.h"
#include "rocksdb/options.h"
#include "rocksdb/slice.h"
#include "table/block_builder.h"
#include "table/format.h"
#include "util/kv_map.h"
namespace rocksdb {
@ -96,7 +97,8 @@ Status ReadProperties(const Slice& handle_value, RandomAccessFileReader* file,
FilePrefetchBuffer* prefetch_buffer, const Footer& footer,
const ImmutableCFOptions& ioptions,
TableProperties** table_properties,
bool compression_type_missing = false);
bool compression_type_missing = false,
MemoryAllocator* memory_allocator = nullptr);
// Directly read the properties from the properties block of a plain table.
// @returns a status to indicate if the operation succeeded. On success,
@ -110,7 +112,8 @@ Status ReadTableProperties(RandomAccessFileReader* file, uint64_t file_size,
uint64_t table_magic_number,
const ImmutableCFOptions& ioptions,
TableProperties** properties,
bool compression_type_missing = false);
bool compression_type_missing = false,
MemoryAllocator* memory_allocator = nullptr);
// Find the meta block from the meta index block.
Status FindMetaBlock(InternalIterator* meta_index_iter,
@ -123,7 +126,8 @@ Status FindMetaBlock(RandomAccessFileReader* file, uint64_t file_size,
const ImmutableCFOptions& ioptions,
const std::string& meta_block_name,
BlockHandle* block_handle,
bool compression_type_missing = false);
bool compression_type_missing = false,
MemoryAllocator* memory_allocator = nullptr);
// Read the specified meta block with name meta_block_name
// from `file` and initialize `contents` with contents of this block.
@ -134,6 +138,7 @@ Status ReadMetaBlock(RandomAccessFileReader* file,
const ImmutableCFOptions& ioptions,
const std::string& meta_block_name,
BlockContents* contents,
bool compression_type_missing = false);
bool compression_type_missing = false,
MemoryAllocator* memory_allocator = nullptr);
} // namespace rocksdb

@ -3579,10 +3579,10 @@ TEST_P(BlockBasedTableTest, PropertiesBlockRestartPointTest) {
Slice compression_dict;
PersistentCacheOptions cache_options;
BlockFetcher block_fetcher(file, nullptr /* prefetch_buffer */, footer,
read_options, handle, contents, ioptions,
false /* decompress */, compression_dict,
cache_options);
BlockFetcher block_fetcher(
file, nullptr /* prefetch_buffer */, footer, read_options, handle,
contents, ioptions, false /* decompress */,
false /*maybe_compressed*/, compression_dict, cache_options);
ASSERT_OK(block_fetcher.ReadBlockContents());
};
@ -3668,7 +3668,8 @@ TEST_P(BlockBasedTableTest, PropertiesMetaBlockLast) {
BlockFetcher block_fetcher(
table_reader.get(), nullptr /* prefetch_buffer */, footer, ReadOptions(),
metaindex_handle, &metaindex_contents, ioptions, false /* decompress */,
compression_dict, pcache_opts);
false /*maybe_compressed*/, compression_dict, pcache_opts,
nullptr /*memory_allocator*/);
ASSERT_OK(block_fetcher.ReadBlockContents());
Block metaindex_block(std::move(metaindex_contents),
kDisableGlobalSequenceNumber);

Loading…
Cancel
Save