Revert to storing UncompressionDicts in the cache (#5645)

Summary:
PR https://github.com/facebook/rocksdb/issues/5584 decoupled the uncompression dictionary object from the underlying block data; however, this defeats the purpose of the digested ZSTD dictionary, since the whole point
of the digest is to create it once and reuse it over and over again. This patch goes back to
storing the uncompression dictionary itself in the cache (which should be now safe to do,
since it no longer includes a Statistics pointer), while preserving the rest of the refactoring.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5645

Test Plan: make asan_check

Differential Revision: D16551864

Pulled By: ltamasi

fbshipit-source-id: 2a7e2d34bb16e70e3c816506d5afe1d842057800
main
Levi Tamasi 5 years ago committed by Facebook Github Bot
parent d8a27d9331
commit df8c307d63
  1. 186
      table/block_based/block_based_table_reader.cc
  2. 6
      table/block_based/block_based_table_reader.h
  3. 72
      table/block_based/uncompression_dict_reader.cc
  4. 21
      table/block_based/uncompression_dict_reader.h
  5. 69
      util/compression.h

@ -72,6 +72,57 @@ BlockBasedTable::~BlockBasedTable() {
std::atomic<uint64_t> BlockBasedTable::next_cache_key_id_(0); std::atomic<uint64_t> BlockBasedTable::next_cache_key_id_(0);
template <typename TBlocklike>
class BlocklikeTraits;
template <>
class BlocklikeTraits<BlockContents> {
public:
static BlockContents* Create(BlockContents&& contents,
SequenceNumber /* global_seqno */,
size_t /* read_amp_bytes_per_bit */,
Statistics* /* statistics */,
bool /* using_zstd */) {
return new BlockContents(std::move(contents));
}
static uint32_t GetNumRestarts(const BlockContents& /* contents */) {
return 0;
}
};
template <>
class BlocklikeTraits<Block> {
public:
static Block* Create(BlockContents&& contents, SequenceNumber global_seqno,
size_t read_amp_bytes_per_bit, Statistics* statistics,
bool /* using_zstd */) {
return new Block(std::move(contents), global_seqno, read_amp_bytes_per_bit,
statistics);
}
static uint32_t GetNumRestarts(const Block& block) {
return block.NumRestarts();
}
};
template <>
class BlocklikeTraits<UncompressionDict> {
public:
static UncompressionDict* Create(BlockContents&& contents,
SequenceNumber /* global_seqno */,
size_t /* read_amp_bytes_per_bit */,
Statistics* /* statistics */,
bool using_zstd) {
return new UncompressionDict(contents.data, std::move(contents.allocation),
using_zstd);
}
static uint32_t GetNumRestarts(const UncompressionDict& /* dict */) {
return 0;
}
};
namespace { namespace {
// Read the block identified by "handle" from "file". // Read the block identified by "handle" from "file".
// The only relevant option is options.verify_checksums for now. // The only relevant option is options.verify_checksums for now.
@ -79,15 +130,16 @@ namespace {
// On success fill *result and return OK - caller owns *result // On success fill *result and return OK - caller owns *result
// @param uncompression_dict Data for presetting the compression library's // @param uncompression_dict Data for presetting the compression library's
// dictionary. // dictionary.
template <typename TBlocklike>
Status ReadBlockFromFile( Status ReadBlockFromFile(
RandomAccessFileReader* file, FilePrefetchBuffer* prefetch_buffer, RandomAccessFileReader* file, FilePrefetchBuffer* prefetch_buffer,
const Footer& footer, const ReadOptions& options, const BlockHandle& handle, const Footer& footer, const ReadOptions& options, const BlockHandle& handle,
std::unique_ptr<Block>* result, const ImmutableCFOptions& ioptions, std::unique_ptr<TBlocklike>* result, const ImmutableCFOptions& ioptions,
bool do_uncompress, bool maybe_compressed, BlockType block_type, bool do_uncompress, bool maybe_compressed, BlockType block_type,
const UncompressionDict& uncompression_dict, const UncompressionDict& uncompression_dict,
const PersistentCacheOptions& cache_options, SequenceNumber global_seqno, const PersistentCacheOptions& cache_options, SequenceNumber global_seqno,
size_t read_amp_bytes_per_bit, MemoryAllocator* memory_allocator, size_t read_amp_bytes_per_bit, MemoryAllocator* memory_allocator,
bool for_compaction = false) { bool for_compaction, bool using_zstd) {
assert(result); assert(result);
BlockContents contents; BlockContents contents;
@ -97,34 +149,9 @@ Status ReadBlockFromFile(
cache_options, memory_allocator, nullptr, for_compaction); cache_options, memory_allocator, nullptr, for_compaction);
Status s = block_fetcher.ReadBlockContents(); Status s = block_fetcher.ReadBlockContents();
if (s.ok()) { if (s.ok()) {
result->reset(new Block(std::move(contents), global_seqno, result->reset(BlocklikeTraits<TBlocklike>::Create(
read_amp_bytes_per_bit, ioptions.statistics)); std::move(contents), global_seqno, read_amp_bytes_per_bit,
} ioptions.statistics, using_zstd));
return s;
}
Status ReadBlockFromFile(
RandomAccessFileReader* file, FilePrefetchBuffer* prefetch_buffer,
const Footer& footer, const ReadOptions& options, const BlockHandle& handle,
std::unique_ptr<BlockContents>* result, const ImmutableCFOptions& ioptions,
bool do_uncompress, bool maybe_compressed, BlockType block_type,
const UncompressionDict& uncompression_dict,
const PersistentCacheOptions& cache_options,
SequenceNumber /* global_seqno */, size_t /* read_amp_bytes_per_bit */,
MemoryAllocator* memory_allocator, bool for_compaction = false) {
assert(result);
result->reset(new BlockContents);
BlockFetcher block_fetcher(
file, prefetch_buffer, footer, options, handle, result->get(), ioptions,
do_uncompress, maybe_compressed, block_type, uncompression_dict,
cache_options, memory_allocator, nullptr, for_compaction);
const Status s = block_fetcher.ReadBlockContents();
if (!s.ok()) {
result->reset();
} }
return s; return s;
@ -1599,7 +1626,8 @@ Status BlockBasedTable::ReadMetaBlock(FilePrefetchBuffer* prefetch_buffer,
true /* decompress */, true /*maybe_compressed*/, BlockType::kMetaIndex, true /* decompress */, true /*maybe_compressed*/, BlockType::kMetaIndex,
UncompressionDict::GetEmptyDict(), rep_->persistent_cache_options, UncompressionDict::GetEmptyDict(), rep_->persistent_cache_options,
kDisableGlobalSequenceNumber, 0 /* read_amp_bytes_per_bit */, kDisableGlobalSequenceNumber, 0 /* read_amp_bytes_per_bit */,
GetMemoryAllocator(rep_->table_options)); GetMemoryAllocator(rep_->table_options), false /* for_compaction */,
rep_->blocks_definitely_zstd_compressed);
if (!s.ok()) { if (!s.ok()) {
ROCKS_LOG_ERROR(rep_->ioptions.info_log, ROCKS_LOG_ERROR(rep_->ioptions.info_log,
@ -1616,38 +1644,6 @@ Status BlockBasedTable::ReadMetaBlock(FilePrefetchBuffer* prefetch_buffer,
return Status::OK(); return Status::OK();
} }
template <typename TBlocklike>
class BlocklikeTraits;
template <>
class BlocklikeTraits<BlockContents> {
public:
static BlockContents* Create(BlockContents&& contents,
SequenceNumber /* global_seqno */,
size_t /* read_amp_bytes_per_bit */,
Statistics* /* statistics */) {
return new BlockContents(std::move(contents));
}
static uint32_t GetNumRestarts(const BlockContents& /* contents */) {
return 0;
}
};
template <>
class BlocklikeTraits<Block> {
public:
static Block* Create(BlockContents&& contents, SequenceNumber global_seqno,
size_t read_amp_bytes_per_bit, Statistics* statistics) {
return new Block(std::move(contents), global_seqno, read_amp_bytes_per_bit,
statistics);
}
static uint32_t GetNumRestarts(const Block& block) {
return block.NumRestarts();
}
};
template <typename TBlocklike> template <typename TBlocklike>
Status BlockBasedTable::GetDataBlockFromCache( Status BlockBasedTable::GetDataBlockFromCache(
const Slice& block_cache_key, const Slice& compressed_block_cache_key, const Slice& block_cache_key, const Slice& compressed_block_cache_key,
@ -1719,7 +1715,8 @@ Status BlockBasedTable::GetDataBlockFromCache(
std::unique_ptr<TBlocklike> block_holder( std::unique_ptr<TBlocklike> block_holder(
BlocklikeTraits<TBlocklike>::Create( BlocklikeTraits<TBlocklike>::Create(
std::move(contents), rep_->get_global_seqno(block_type), std::move(contents), rep_->get_global_seqno(block_type),
read_amp_bytes_per_bit, statistics)); // uncompressed block read_amp_bytes_per_bit, statistics,
rep_->blocks_definitely_zstd_compressed)); // uncompressed block
if (block_cache != nullptr && block_holder->own_bytes() && if (block_cache != nullptr && block_holder->own_bytes() &&
read_options.fill_cache) { read_options.fill_cache) {
@ -1790,11 +1787,11 @@ Status BlockBasedTable::PutDataBlockToCache(
block_holder.reset(BlocklikeTraits<TBlocklike>::Create( block_holder.reset(BlocklikeTraits<TBlocklike>::Create(
std::move(uncompressed_block_contents), seq_no, read_amp_bytes_per_bit, std::move(uncompressed_block_contents), seq_no, read_amp_bytes_per_bit,
statistics)); statistics, rep_->blocks_definitely_zstd_compressed));
} else { } else {
block_holder.reset(BlocklikeTraits<TBlocklike>::Create( block_holder.reset(BlocklikeTraits<TBlocklike>::Create(
std::move(*raw_block_contents), seq_no, read_amp_bytes_per_bit, std::move(*raw_block_contents), seq_no, read_amp_bytes_per_bit,
statistics)); statistics, rep_->blocks_definitely_zstd_compressed));
} }
// Insert compressed block into compressed block cache. // Insert compressed block into compressed block cache.
@ -1912,7 +1909,7 @@ TBlockIter* BlockBasedTable::NewDataBlockIterator(
return iter; return iter;
} }
UncompressionDict uncompression_dict; CachableEntry<UncompressionDict> uncompression_dict;
if (rep_->uncompression_dict_reader) { if (rep_->uncompression_dict_reader) {
const bool no_io = (ro.read_tier == kBlockCacheTier); const bool no_io = (ro.read_tier == kBlockCacheTier);
s = rep_->uncompression_dict_reader->GetOrReadUncompressionDictionary( s = rep_->uncompression_dict_reader->GetOrReadUncompressionDictionary(
@ -1924,9 +1921,13 @@ TBlockIter* BlockBasedTable::NewDataBlockIterator(
} }
} }
const UncompressionDict& dict = uncompression_dict.GetValue()
? *uncompression_dict.GetValue()
: UncompressionDict::GetEmptyDict();
CachableEntry<Block> block; CachableEntry<Block> block;
s = RetrieveBlock(prefetch_buffer, ro, handle, uncompression_dict, &block, s = RetrieveBlock(prefetch_buffer, ro, handle, dict, &block, block_type,
block_type, get_context, lookup_context, for_compaction, get_context, lookup_context, for_compaction,
/* use_cache */ true); /* use_cache */ true);
if (!s.ok()) { if (!s.ok()) {
@ -2255,15 +2256,11 @@ Status BlockBasedTable::MaybeReadBlockAndLoadToCache(
// handles - A vector of block handles. Some of them me be NULL handles // handles - A vector of block handles. Some of them me be NULL handles
// scratch - An optional contiguous buffer to read compressed blocks into // scratch - An optional contiguous buffer to read compressed blocks into
void BlockBasedTable::RetrieveMultipleBlocks( void BlockBasedTable::RetrieveMultipleBlocks(
const ReadOptions& options, const ReadOptions& options, const MultiGetRange* batch,
const MultiGetRange* batch, const autovector<BlockHandle, MultiGetContext::MAX_BATCH_SIZE>* handles,
const autovector<BlockHandle, MultiGetContext::MAX_BATCH_SIZE>* handles,
autovector<Status, MultiGetContext::MAX_BATCH_SIZE>* statuses, autovector<Status, MultiGetContext::MAX_BATCH_SIZE>* statuses,
autovector< autovector<CachableEntry<Block>, MultiGetContext::MAX_BATCH_SIZE>* results,
CachableEntry<Block>, MultiGetContext::MAX_BATCH_SIZE>* results, char* scratch, const UncompressionDict& uncompression_dict) const {
char* scratch,
const UncompressionDict& uncompression_dict) const {
RandomAccessFileReader* file = rep_->file.get(); RandomAccessFileReader* file = rep_->file.get();
const Footer& footer = rep_->footer; const Footer& footer = rep_->footer;
const ImmutableCFOptions& ioptions = rep_->ioptions; const ImmutableCFOptions& ioptions = rep_->ioptions;
@ -2459,7 +2456,8 @@ Status BlockBasedTable::RetrieveBlock(
block_type == BlockType::kData block_type == BlockType::kData
? rep_->table_options.read_amp_bytes_per_bit ? rep_->table_options.read_amp_bytes_per_bit
: 0, : 0,
GetMemoryAllocator(rep_->table_options), for_compaction); GetMemoryAllocator(rep_->table_options), for_compaction,
rep_->blocks_definitely_zstd_compressed);
} }
if (!s.ok()) { if (!s.ok()) {
@ -2488,6 +2486,13 @@ template Status BlockBasedTable::RetrieveBlock<Block>(
GetContext* get_context, BlockCacheLookupContext* lookup_context, GetContext* get_context, BlockCacheLookupContext* lookup_context,
bool for_compaction, bool use_cache) const; bool for_compaction, bool use_cache) const;
template Status BlockBasedTable::RetrieveBlock<UncompressionDict>(
FilePrefetchBuffer* prefetch_buffer, const ReadOptions& ro,
const BlockHandle& handle, const UncompressionDict& uncompression_dict,
CachableEntry<UncompressionDict>* block_entry, BlockType block_type,
GetContext* get_context, BlockCacheLookupContext* lookup_context,
bool for_compaction, bool use_cache) const;
BlockBasedTable::PartitionedIndexIteratorState::PartitionedIndexIteratorState( BlockBasedTable::PartitionedIndexIteratorState::PartitionedIndexIteratorState(
const BlockBasedTable* table, const BlockBasedTable* table,
std::unordered_map<uint64_t, CachableEntry<Block>>* block_map) std::unordered_map<uint64_t, CachableEntry<Block>>* block_map)
@ -3369,7 +3374,7 @@ void BlockBasedTable::MultiGet(const ReadOptions& read_options,
MultiGetRange data_block_range(sst_file_range, sst_file_range.begin(), MultiGetRange data_block_range(sst_file_range, sst_file_range.begin(),
sst_file_range.end()); sst_file_range.end());
UncompressionDict uncompression_dict; CachableEntry<UncompressionDict> uncompression_dict;
Status uncompression_dict_status; Status uncompression_dict_status;
if (rep_->uncompression_dict_reader) { if (rep_->uncompression_dict_reader) {
uncompression_dict_status = uncompression_dict_status =
@ -3379,6 +3384,10 @@ void BlockBasedTable::MultiGet(const ReadOptions& read_options,
&uncompression_dict); &uncompression_dict);
} }
const UncompressionDict& dict = uncompression_dict.GetValue()
? *uncompression_dict.GetValue()
: UncompressionDict::GetEmptyDict();
size_t total_len = 0; size_t total_len = 0;
ReadOptions ro = read_options; ReadOptions ro = read_options;
ro.read_tier = kBlockCacheTier; ro.read_tier = kBlockCacheTier;
@ -3427,10 +3436,10 @@ void BlockBasedTable::MultiGet(const ReadOptions& read_options,
BlockHandle handle = v.handle; BlockHandle handle = v.handle;
BlockCacheLookupContext lookup_data_block_context( BlockCacheLookupContext lookup_data_block_context(
TableReaderCaller::kUserMultiGet); TableReaderCaller::kUserMultiGet);
Status s = RetrieveBlock(nullptr, ro, handle, uncompression_dict, Status s = RetrieveBlock(
&(results.back()), BlockType::kData, nullptr, ro, handle, dict, &(results.back()), BlockType::kData,
miter->get_context, &lookup_data_block_context, miter->get_context, &lookup_data_block_context,
/* for_compaction */ false, /* use_cache */ true); /* for_compaction */ false, /* use_cache */ true);
if (s.IsIncomplete()) { if (s.IsIncomplete()) {
s = Status::OK(); s = Status::OK();
} }
@ -3464,9 +3473,8 @@ void BlockBasedTable::MultiGet(const ReadOptions& read_options,
block_buf.reset(scratch); block_buf.reset(scratch);
} }
} }
RetrieveMultipleBlocks(read_options, RetrieveMultipleBlocks(read_options, &data_block_range, &block_handles,
&data_block_range, &block_handles, &statuses, &results, &statuses, &results, scratch, dict);
scratch, uncompression_dict);
} }
} }
@ -4117,7 +4125,7 @@ Status BlockBasedTable::DumpTable(WritableFile* out_file) {
// Output compression dictionary // Output compression dictionary
if (rep_->uncompression_dict_reader) { if (rep_->uncompression_dict_reader) {
UncompressionDict uncompression_dict; CachableEntry<UncompressionDict> uncompression_dict;
s = rep_->uncompression_dict_reader->GetOrReadUncompressionDictionary( s = rep_->uncompression_dict_reader->GetOrReadUncompressionDictionary(
nullptr /* prefetch_buffer */, false /* no_io */, nullptr /* prefetch_buffer */, false /* no_io */,
nullptr /* get_context */, nullptr /* lookup_context */, nullptr /* get_context */, nullptr /* lookup_context */,
@ -4126,7 +4134,9 @@ Status BlockBasedTable::DumpTable(WritableFile* out_file) {
return s; return s;
} }
const Slice& raw_dict = uncompression_dict.GetRawDict(); assert(uncompression_dict.GetValue());
const Slice& raw_dict = uncompression_dict.GetValue()->GetRawDict();
out_file->Append( out_file->Append(
"Compression Dictionary:\n" "Compression Dictionary:\n"
"--------------------------------------\n"); "--------------------------------------\n");

@ -318,10 +318,10 @@ class BlockBasedTable : public TableReader {
void RetrieveMultipleBlocks( void RetrieveMultipleBlocks(
const ReadOptions& options, const MultiGetRange* batch, const ReadOptions& options, const MultiGetRange* batch,
const autovector<BlockHandle, MultiGetContext::MAX_BATCH_SIZE>* handles, const autovector<BlockHandle, MultiGetContext::MAX_BATCH_SIZE>* handles,
autovector<Status, MultiGetContext::MAX_BATCH_SIZE>* statuses, autovector<Status, MultiGetContext::MAX_BATCH_SIZE>* statuses,
autovector< autovector<CachableEntry<Block>, MultiGetContext::MAX_BATCH_SIZE>*
CachableEntry<Block>, MultiGetContext::MAX_BATCH_SIZE>* results, results,
char* scratch, const UncompressionDict& uncompression_dict) const; char* scratch, const UncompressionDict& uncompression_dict) const;
// Get the iterator from the index reader. // Get the iterator from the index reader.

@ -21,36 +21,36 @@ Status UncompressionDictReader::Create(
assert(!pin || prefetch); assert(!pin || prefetch);
assert(uncompression_dict_reader); assert(uncompression_dict_reader);
CachableEntry<BlockContents> uncompression_dict_block; CachableEntry<UncompressionDict> uncompression_dict;
if (prefetch || !use_cache) { if (prefetch || !use_cache) {
const Status s = ReadUncompressionDictionaryBlock( const Status s = ReadUncompressionDictionary(
table, prefetch_buffer, ReadOptions(), use_cache, table, prefetch_buffer, ReadOptions(), use_cache,
nullptr /* get_context */, lookup_context, &uncompression_dict_block); nullptr /* get_context */, lookup_context, &uncompression_dict);
if (!s.ok()) { if (!s.ok()) {
return s; return s;
} }
if (use_cache && !pin) { if (use_cache && !pin) {
uncompression_dict_block.Reset(); uncompression_dict.Reset();
} }
} }
uncompression_dict_reader->reset( uncompression_dict_reader->reset(
new UncompressionDictReader(table, std::move(uncompression_dict_block))); new UncompressionDictReader(table, std::move(uncompression_dict)));
return Status::OK(); return Status::OK();
} }
Status UncompressionDictReader::ReadUncompressionDictionaryBlock( Status UncompressionDictReader::ReadUncompressionDictionary(
const BlockBasedTable* table, FilePrefetchBuffer* prefetch_buffer, const BlockBasedTable* table, FilePrefetchBuffer* prefetch_buffer,
const ReadOptions& read_options, bool use_cache, GetContext* get_context, const ReadOptions& read_options, bool use_cache, GetContext* get_context,
BlockCacheLookupContext* lookup_context, BlockCacheLookupContext* lookup_context,
CachableEntry<BlockContents>* uncompression_dict_block) { CachableEntry<UncompressionDict>* uncompression_dict) {
// TODO: add perf counter for compression dictionary read time // TODO: add perf counter for compression dictionary read time
assert(table); assert(table);
assert(uncompression_dict_block); assert(uncompression_dict);
assert(uncompression_dict_block->IsEmpty()); assert(uncompression_dict->IsEmpty());
const BlockBasedTable::Rep* const rep = table->get_rep(); const BlockBasedTable::Rep* const rep = table->get_rep();
assert(rep); assert(rep);
@ -58,7 +58,7 @@ Status UncompressionDictReader::ReadUncompressionDictionaryBlock(
const Status s = table->RetrieveBlock( const Status s = table->RetrieveBlock(
prefetch_buffer, read_options, rep->compression_dict_handle, prefetch_buffer, read_options, rep->compression_dict_handle,
UncompressionDict::GetEmptyDict(), uncompression_dict_block, UncompressionDict::GetEmptyDict(), uncompression_dict,
BlockType::kCompressionDictionary, get_context, lookup_context, BlockType::kCompressionDictionary, get_context, lookup_context,
/* for_compaction */ false, use_cache); /* for_compaction */ false, use_cache);
@ -73,15 +73,14 @@ Status UncompressionDictReader::ReadUncompressionDictionaryBlock(
return s; return s;
} }
Status UncompressionDictReader::GetOrReadUncompressionDictionaryBlock( Status UncompressionDictReader::GetOrReadUncompressionDictionary(
FilePrefetchBuffer* prefetch_buffer, bool no_io, GetContext* get_context, FilePrefetchBuffer* prefetch_buffer, bool no_io, GetContext* get_context,
BlockCacheLookupContext* lookup_context, BlockCacheLookupContext* lookup_context,
CachableEntry<BlockContents>* uncompression_dict_block) const { CachableEntry<UncompressionDict>* uncompression_dict) const {
assert(uncompression_dict_block); assert(uncompression_dict);
if (!uncompression_dict_block_.IsEmpty()) { if (!uncompression_dict_.IsEmpty()) {
uncompression_dict_block->SetUnownedValue( uncompression_dict->SetUnownedValue(uncompression_dict_.GetValue());
uncompression_dict_block_.GetValue());
return Status::OK(); return Status::OK();
} }
@ -90,42 +89,17 @@ Status UncompressionDictReader::GetOrReadUncompressionDictionaryBlock(
read_options.read_tier = kBlockCacheTier; read_options.read_tier = kBlockCacheTier;
} }
return ReadUncompressionDictionaryBlock( return ReadUncompressionDictionary(table_, prefetch_buffer, read_options,
table_, prefetch_buffer, read_options, cache_dictionary_blocks(), cache_dictionary_blocks(), get_context,
get_context, lookup_context, uncompression_dict_block); lookup_context, uncompression_dict);
}
Status UncompressionDictReader::GetOrReadUncompressionDictionary(
FilePrefetchBuffer* prefetch_buffer, bool no_io, GetContext* get_context,
BlockCacheLookupContext* lookup_context,
UncompressionDict* uncompression_dict) const {
CachableEntry<BlockContents> uncompression_dict_block;
const Status s = GetOrReadUncompressionDictionaryBlock(
prefetch_buffer, no_io, get_context, lookup_context,
&uncompression_dict_block);
if (!s.ok()) {
return s;
}
assert(uncompression_dict);
assert(table_);
assert(table_->get_rep());
UncompressionDict dict(uncompression_dict_block.GetValue()->data,
table_->get_rep()->blocks_definitely_zstd_compressed);
*uncompression_dict = std::move(dict);
uncompression_dict_block.TransferTo(uncompression_dict);
return Status::OK();
} }
size_t UncompressionDictReader::ApproximateMemoryUsage() const { size_t UncompressionDictReader::ApproximateMemoryUsage() const {
assert(!uncompression_dict_block_.GetOwnValue() || assert(!uncompression_dict_.GetOwnValue() ||
uncompression_dict_block_.GetValue() != nullptr); uncompression_dict_.GetValue() != nullptr);
size_t usage = uncompression_dict_block_.GetOwnValue() size_t usage = uncompression_dict_.GetOwnValue()
? uncompression_dict_block_.GetValue()->ApproximateMemoryUsage() ? uncompression_dict_.GetValue()->ApproximateMemoryUsage()
: 0; : 0;
#ifdef ROCKSDB_MALLOC_USABLE_SIZE #ifdef ROCKSDB_MALLOC_USABLE_SIZE
usage += malloc_usable_size(const_cast<UncompressionDictReader*>(this)); usage += malloc_usable_size(const_cast<UncompressionDictReader*>(this));

@ -33,34 +33,27 @@ class UncompressionDictReader {
Status GetOrReadUncompressionDictionary( Status GetOrReadUncompressionDictionary(
FilePrefetchBuffer* prefetch_buffer, bool no_io, GetContext* get_context, FilePrefetchBuffer* prefetch_buffer, bool no_io, GetContext* get_context,
BlockCacheLookupContext* lookup_context, BlockCacheLookupContext* lookup_context,
UncompressionDict* uncompression_dict) const; CachableEntry<UncompressionDict>* uncompression_dict) const;
size_t ApproximateMemoryUsage() const; size_t ApproximateMemoryUsage() const;
private: private:
UncompressionDictReader( UncompressionDictReader(const BlockBasedTable* t,
const BlockBasedTable* t, CachableEntry<UncompressionDict>&& uncompression_dict)
CachableEntry<BlockContents>&& uncompression_dict_block) : table_(t), uncompression_dict_(std::move(uncompression_dict)) {
: table_(t),
uncompression_dict_block_(std::move(uncompression_dict_block)) {
assert(table_); assert(table_);
} }
bool cache_dictionary_blocks() const; bool cache_dictionary_blocks() const;
static Status ReadUncompressionDictionaryBlock( static Status ReadUncompressionDictionary(
const BlockBasedTable* table, FilePrefetchBuffer* prefetch_buffer, const BlockBasedTable* table, FilePrefetchBuffer* prefetch_buffer,
const ReadOptions& read_options, bool use_cache, GetContext* get_context, const ReadOptions& read_options, bool use_cache, GetContext* get_context,
BlockCacheLookupContext* lookup_context, BlockCacheLookupContext* lookup_context,
CachableEntry<BlockContents>* uncompression_dict_block); CachableEntry<UncompressionDict>* uncompression_dict);
Status GetOrReadUncompressionDictionaryBlock(
FilePrefetchBuffer* prefetch_buffer, bool no_io, GetContext* get_context,
BlockCacheLookupContext* lookup_context,
CachableEntry<BlockContents>* uncompression_dict_block) const;
const BlockBasedTable* table_; const BlockBasedTable* table_;
CachableEntry<BlockContents> uncompression_dict_block_; CachableEntry<UncompressionDict> uncompression_dict_;
}; };
} // namespace rocksdb } // namespace rocksdb

@ -21,7 +21,6 @@
#include <string> #include <string>
#include "memory/memory_allocator.h" #include "memory/memory_allocator.h"
#include "rocksdb/cleanable.h"
#include "rocksdb/options.h" #include "rocksdb/options.h"
#include "rocksdb/table.h" #include "rocksdb/table.h"
#include "util/coding.h" #include "util/coding.h"
@ -217,14 +216,19 @@ struct CompressionDict {
// Holds dictionary and related data, like ZSTD's digested uncompression // Holds dictionary and related data, like ZSTD's digested uncompression
// dictionary. // dictionary.
struct UncompressionDict : public Cleanable { struct UncompressionDict {
// Block containing the data for the compression dictionary. It is non-empty // Block containing the data for the compression dictionary in case the
// only if the constructor that takes a string parameter is used. // constructor that takes a string parameter is used.
std::string dict_; std::string dict_;
// Slice pointing to the compression dictionary data. Points to // Block containing the data for the compression dictionary in case the
// dict_ if the string constructor is used. In the case of the Slice // constructor that takes a Slice parameter is used and the passed in
// constructor, it is a copy of the Slice passed by the caller. // CacheAllocationPtr is not nullptr.
CacheAllocationPtr allocation_;
// Slice pointing to the compression dictionary data. Can point to
// dict_, allocation_, or some other memory location, depending on how
// the object was constructed.
Slice slice_; Slice slice_;
#ifdef ROCKSDB_ZSTD_DDICT #ifdef ROCKSDB_ZSTD_DDICT
@ -232,18 +236,12 @@ struct UncompressionDict : public Cleanable {
ZSTD_DDict* zstd_ddict_ = nullptr; ZSTD_DDict* zstd_ddict_ = nullptr;
#endif // ROCKSDB_ZSTD_DDICT #endif // ROCKSDB_ZSTD_DDICT
// Slice constructor: it is the caller's responsibility to either
// a) make sure slice remains valid throughout the lifecycle of this object OR
// b) transfer the management of the underlying resource (e.g. cache handle)
// to this object, in which case UncompressionDict is self-contained, and the
// resource is guaranteed to be released (via the cleanup logic in Cleanable)
// when UncompressionDict is destroyed.
#ifdef ROCKSDB_ZSTD_DDICT #ifdef ROCKSDB_ZSTD_DDICT
UncompressionDict(Slice slice, bool using_zstd) UncompressionDict(std::string dict, bool using_zstd)
#else // ROCKSDB_ZSTD_DDICT #else // ROCKSDB_ZSTD_DDICT
UncompressionDict(Slice slice, bool /*using_zstd*/) UncompressionDict(std::string dict, bool /* using_zstd */)
#endif // ROCKSDB_ZSTD_DDICT #endif // ROCKSDB_ZSTD_DDICT
: slice_(std::move(slice)) { : dict_(std::move(dict)), slice_(dict_) {
#ifdef ROCKSDB_ZSTD_DDICT #ifdef ROCKSDB_ZSTD_DDICT
if (!slice_.empty() && using_zstd) { if (!slice_.empty() && using_zstd) {
zstd_ddict_ = ZSTD_createDDict_byReference(slice_.data(), slice_.size()); zstd_ddict_ = ZSTD_createDDict_byReference(slice_.data(), slice_.size());
@ -252,14 +250,25 @@ struct UncompressionDict : public Cleanable {
#endif // ROCKSDB_ZSTD_DDICT #endif // ROCKSDB_ZSTD_DDICT
} }
// String constructor: results in a self-contained UncompressionDict. #ifdef ROCKSDB_ZSTD_DDICT
UncompressionDict(std::string dict, bool using_zstd) UncompressionDict(Slice slice, CacheAllocationPtr&& allocation,
: UncompressionDict(Slice(dict), using_zstd) { bool using_zstd)
dict_ = std::move(dict); #else // ROCKSDB_ZSTD_DDICT
UncompressionDict(Slice slice, CacheAllocationPtr&& allocation,
bool /* using_zstd */)
#endif // ROCKSDB_ZSTD_DDICT
: allocation_(std::move(allocation)), slice_(std::move(slice)) {
#ifdef ROCKSDB_ZSTD_DDICT
if (!slice_.empty() && using_zstd) {
zstd_ddict_ = ZSTD_createDDict_byReference(slice_.data(), slice_.size());
assert(zstd_ddict_ != nullptr);
}
#endif // ROCKSDB_ZSTD_DDICT
} }
UncompressionDict(UncompressionDict&& rhs) UncompressionDict(UncompressionDict&& rhs)
: dict_(std::move(rhs.dict_)), : dict_(std::move(rhs.dict_)),
allocation_(std::move(rhs.allocation_)),
slice_(std::move(rhs.slice_)) slice_(std::move(rhs.slice_))
#ifdef ROCKSDB_ZSTD_DDICT #ifdef ROCKSDB_ZSTD_DDICT
, ,
@ -288,6 +297,7 @@ struct UncompressionDict : public Cleanable {
} }
dict_ = std::move(rhs.dict_); dict_ = std::move(rhs.dict_);
allocation_ = std::move(rhs.allocation_);
slice_ = std::move(rhs.slice_); slice_ = std::move(rhs.slice_);
#ifdef ROCKSDB_ZSTD_DDICT #ifdef ROCKSDB_ZSTD_DDICT
@ -298,6 +308,12 @@ struct UncompressionDict : public Cleanable {
return *this; return *this;
} }
// The object is self-contained if the string constructor is used, or the
// Slice constructor is invoked with a non-null allocation. Otherwise, it
// is the caller's responsibility to ensure that the underlying storage
// outlives this object.
bool own_bytes() const { return !dict_.empty() || allocation_; }
const Slice& GetRawDict() const { return slice_; } const Slice& GetRawDict() const { return slice_; }
#ifdef ROCKSDB_ZSTD_DDICT #ifdef ROCKSDB_ZSTD_DDICT
@ -310,12 +326,19 @@ struct UncompressionDict : public Cleanable {
} }
size_t ApproximateMemoryUsage() const { size_t ApproximateMemoryUsage() const {
size_t usage = 0; size_t usage = sizeof(struct UncompressionDict);
usage += sizeof(struct UncompressionDict); usage += dict_.size();
if (allocation_) {
auto allocator = allocation_.get_deleter().allocator;
if (allocator) {
usage += allocator->UsableSize(allocation_.get(), slice_.size());
} else {
usage += slice_.size();
}
}
#ifdef ROCKSDB_ZSTD_DDICT #ifdef ROCKSDB_ZSTD_DDICT
usage += ZSTD_sizeof_DDict(zstd_ddict_); usage += ZSTD_sizeof_DDict(zstd_ddict_);
#endif // ROCKSDB_ZSTD_DDICT #endif // ROCKSDB_ZSTD_DDICT
usage += dict_.size();
return usage; return usage;
} }

Loading…
Cancel
Save