Cache dictionary used for decompressing data blocks (#4881)

Summary:
- If block cache disabled or not used for meta-blocks, `BlockBasedTableReader::Rep::uncompression_dict` owns the `UncompressionDict`. It is preloaded during `PrefetchIndexAndFilterBlocks`.
- If block cache is enabled and used for meta-blocks, block cache owns the `UncompressionDict`, which holds dictionary and digested dictionary when needed. It is never prefetched though there is a TODO for this in the code. The cache key is simply the compression dictionary block handle.
- New stats for compression dictionary accesses in block cache: "BLOCK_CACHE_COMPRESSION_DICT_*" and "compression_dict_block_read_count"
Pull Request resolved: https://github.com/facebook/rocksdb/pull/4881

Differential Revision: D13663801

Pulled By: ajkr

fbshipit-source-id: bdcc54044e180855cdcc57639b493b0e016c9a3f
main
Andrew Kryczka 6 years ago committed by Facebook Github Bot
parent 43defe9872
commit 8ec3e72551
  1. 1
      HISTORY.md
  2. 1
      TARGETS
  3. 2
      build_tools/fbcode_config.sh
  4. 68
      db/db_block_cache_test.cc
  5. 2
      include/rocksdb/perf_context.h
  6. 9
      include/rocksdb/statistics.h
  7. 4
      include/rocksdb/table.h
  8. 2
      monitoring/perf_context.cc
  9. 10
      monitoring/statistics.cc
  10. 5
      table/block_based_table_builder.cc
  11. 288
      table/block_based_table_reader.cc
  12. 45
      table/block_based_table_reader.h
  13. 3
      table/block_fetcher.cc
  14. 8
      table/block_fetcher.h
  15. 16
      table/get_context.cc
  16. 4
      table/get_context.h
  17. 29
      table/meta_blocks.cc
  18. 9
      table/table_test.cc
  19. 100
      util/compression.h

@ -5,6 +5,7 @@
* Make DB ignore dropped column families while committing results of atomic flush.
* RocksDB may choose to preopen some files even if options.max_open_files != -1. This may make DB open slightly longer.
* For users of dictionary compression with ZSTD v0.7.0+, we now reuse the same digested dictionary when compressing each of an SST file's data blocks for faster compression speeds.
* For all users of dictionary compression who set `cache_index_and_filter_blocks == true`, we now store dictionary data used for decompression in the block cache for better control over memory usage. For users of ZSTD v1.1.4+ who compile with -DZSTD_STATIC_LINKING_ONLY, this includes a digested dictionary, which is used to increase decompression speed.
### Public API Change
* CompactionPri = kMinOverlappingRatio also uses compensated file size, which boosts file with lots of tombstones to be compacted first.

@ -20,6 +20,7 @@ ROCKSDB_COMPILER_FLAGS = [
"-DBZIP2",
"-DLZ4",
"-DZSTD",
"-DZSTD_STATIC_LINKING_ONLY",
"-DGFLAGS=gflags",
"-DNUMA",
"-DTBB",

@ -51,7 +51,7 @@ if test -z $PIC_BUILD; then
else
ZSTD_LIBS=" $ZSTD_BASE/lib/libzstd_pic.a"
fi
CFLAGS+=" -DZSTD"
CFLAGS+=" -DZSTD -DZSTD_STATIC_LINKING_ONLY"
# location of gflags headers and libraries
GFLAGS_INCLUDE=" -I $GFLAGS_BASE/include/"

@ -631,6 +631,74 @@ TEST_F(DBBlockCacheTest, CompressedCache) {
}
}
TEST_F(DBBlockCacheTest, CacheCompressionDict) {
const int kNumFiles = 4;
const int kNumEntriesPerFile = 32;
const int kNumBytesPerEntry = 1024;
// Try all the available libraries that support dictionary compression
std::vector<CompressionType> compression_types;
#ifdef ZLIB
compression_types.push_back(kZlibCompression);
#endif // ZLIB
#if LZ4_VERSION_NUMBER >= 10400
compression_types.push_back(kLZ4Compression);
compression_types.push_back(kLZ4HCCompression);
#endif // LZ4_VERSION_NUMBER >= 10400
#if ZSTD_VERSION_NUMBER >= 500
compression_types.push_back(kZSTD);
#endif // ZSTD_VERSION_NUMBER >= 500
Random rnd(301);
for (auto compression_type : compression_types) {
Options options = CurrentOptions();
options.compression = compression_type;
options.compression_opts.max_dict_bytes = 4096;
options.create_if_missing = true;
options.num_levels = 2;
options.statistics = rocksdb::CreateDBStatistics();
options.target_file_size_base = kNumEntriesPerFile * kNumBytesPerEntry;
BlockBasedTableOptions table_options;
table_options.cache_index_and_filter_blocks = true;
table_options.block_cache.reset(new MockCache());
options.table_factory.reset(new BlockBasedTableFactory(table_options));
DestroyAndReopen(options);
for (int i = 0; i < kNumFiles; ++i) {
ASSERT_EQ(i, NumTableFilesAtLevel(0, 0));
for (int j = 0; j < kNumEntriesPerFile; ++j) {
std::string value = RandomString(&rnd, kNumBytesPerEntry);
ASSERT_OK(Put(Key(j * kNumFiles + i), value.c_str()));
}
ASSERT_OK(Flush());
}
dbfull()->TEST_WaitForCompact();
ASSERT_EQ(0, NumTableFilesAtLevel(0));
ASSERT_EQ(kNumFiles, NumTableFilesAtLevel(1));
// Seek to a key in a file. It should cause the SST's dictionary meta-block
// to be read.
RecordCacheCounters(options);
ASSERT_EQ(0,
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_MISS));
ASSERT_EQ(0, TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_ADD));
ASSERT_EQ(
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT),
0);
ReadOptions read_options;
ASSERT_NE("NOT_FOUND", Get(Key(kNumFiles * kNumEntriesPerFile - 1)));
// Two blocks missed/added: dictionary and data block
// One block hit: index since it's prefetched
CheckCacheCounters(options, 2 /* expected_misses */, 1 /* expected_hits */,
2 /* expected_inserts */, 0 /* expected_failures */);
ASSERT_EQ(1,
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_MISS));
ASSERT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_ADD));
ASSERT_GT(
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT),
0);
}
}
#endif // ROCKSDB_LITE
} // namespace rocksdb

@ -67,6 +67,8 @@ struct PerfContext {
uint64_t index_block_read_count; // total number of index block reads
uint64_t block_cache_filter_hit_count; // total number of filter block hits
uint64_t filter_block_read_count; // total number of filter block reads
uint64_t compression_dict_block_read_count; // total number of compression
// dictionary block reads
uint64_t block_checksum_time; // total nanos spent on block checksum
uint64_t block_decompress_time; // total nanos spent on block decompression

@ -320,12 +320,19 @@ enum Tickers : uint32_t {
// # of times snapshot_mutex_ is acquired in the fast path.
TXN_SNAPSHOT_MUTEX_OVERHEAD,
// Number of keys actually found in MultiGet calls (vs number requested by caller)
// Number of keys actually found in MultiGet calls (vs number requested by
// caller)
// NUMBER_MULTIGET_KEYS_READ gives the number requested by caller
NUMBER_MULTIGET_KEYS_FOUND,
NO_ITERATOR_CREATED, // number of iterators created
NO_ITERATOR_DELETED, // number of iterators deleted
BLOCK_CACHE_COMPRESSION_DICT_MISS,
BLOCK_CACHE_COMPRESSION_DICT_HIT,
BLOCK_CACHE_COMPRESSION_DICT_ADD,
BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT,
BLOCK_CACHE_COMPRESSION_DICT_BYTES_EVICT,
TICKER_ENUM_MAX
};

@ -62,6 +62,10 @@ struct BlockBasedTableOptions {
// TODO(kailiu) Temporarily disable this feature by making the default value
// to be false.
//
// TODO(ajkr) we need to update names of variables controlling meta-block
// caching as they should now apply to range tombstone and compression
// dictionary meta-blocks, in addition to index and filter meta-blocks.
//
// Indicating if we'd put index/filter blocks to the block cache.
// If not specified, each "table reader" object will pre-load index/filter
// block during table initialization.

@ -48,6 +48,7 @@ void PerfContext::Reset() {
index_block_read_count = 0;
block_cache_filter_hit_count = 0;
filter_block_read_count = 0;
compression_dict_block_read_count = 0;
block_checksum_time = 0;
block_decompress_time = 0;
get_read_bytes = 0;
@ -163,6 +164,7 @@ std::string PerfContext::ToString(bool exclude_zero_counters) const {
PERF_CONTEXT_OUTPUT(index_block_read_count);
PERF_CONTEXT_OUTPUT(block_cache_filter_hit_count);
PERF_CONTEXT_OUTPUT(filter_block_read_count);
PERF_CONTEXT_OUTPUT(compression_dict_block_read_count);
PERF_CONTEXT_OUTPUT(block_checksum_time);
PERF_CONTEXT_OUTPUT(block_decompress_time);
PERF_CONTEXT_OUTPUT(get_read_bytes);

@ -168,6 +168,16 @@ const std::vector<std::pair<Tickers, std::string>> TickersNameMap = {
{NUMBER_MULTIGET_KEYS_FOUND, "rocksdb.number.multiget.keys.found"},
{NO_ITERATOR_CREATED, "rocksdb.num.iterator.created"},
{NO_ITERATOR_DELETED, "rocksdb.num.iterator.deleted"},
{BLOCK_CACHE_COMPRESSION_DICT_MISS,
"rocksdb.block.cache.compression.dict.miss"},
{BLOCK_CACHE_COMPRESSION_DICT_HIT,
"rocksdb.block.cache.compression.dict.hit"},
{BLOCK_CACHE_COMPRESSION_DICT_ADD,
"rocksdb.block.cache.compression.dict.add"},
{BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT,
"rocksdb.block.cache.compression.dict.bytes.insert"},
{BLOCK_CACHE_COMPRESSION_DICT_BYTES_EVICT,
"rocksdb.block.cache.compression.dict.bytes.evict"},
};
const std::vector<std::pair<Histograms, std::string>> HistogramsNameMap = {

@ -322,8 +322,9 @@ struct BlockBasedTableBuilder::Rep {
_compression_type, _compression_opts.level),
compression_ctx(_compression_type),
verify_dict(
_compression_dict == nullptr ? Slice() : Slice(*_compression_dict),
_compression_type),
_compression_dict == nullptr ? std::string() : *_compression_dict,
_compression_type == kZSTD ||
_compression_type == kZSTDNotFinalCompression),
use_delta_encoding_for_index_values(table_opt.format_version >= 4 &&
!table_opt.block_align),
compressed_cache_key_prefix_size(0),

@ -72,20 +72,21 @@ namespace {
// The only relevant option is options.verify_checksums for now.
// On failure return non-OK.
// On success fill *result and return OK - caller owns *result
// @param compression_dict Data for presetting the compression library's
// @param uncompression_dict Data for presetting the compression library's
// dictionary.
Status ReadBlockFromFile(
RandomAccessFileReader* file, FilePrefetchBuffer* prefetch_buffer,
const Footer& footer, const ReadOptions& options, const BlockHandle& handle,
std::unique_ptr<Block>* result, const ImmutableCFOptions& ioptions,
bool do_uncompress, bool maybe_compressed, const Slice& compression_dict,
bool do_uncompress, bool maybe_compressed,
const UncompressionDict& uncompression_dict,
const PersistentCacheOptions& cache_options, SequenceNumber global_seqno,
size_t read_amp_bytes_per_bit, MemoryAllocator* memory_allocator) {
BlockContents contents;
BlockFetcher block_fetcher(file, prefetch_buffer, footer, options, handle,
&contents, ioptions, do_uncompress,
maybe_compressed, compression_dict, cache_options,
memory_allocator);
maybe_compressed, uncompression_dict,
cache_options, memory_allocator);
Status s = block_fetcher.ReadBlockContents();
if (s.ok()) {
result->reset(new Block(std::move(contents), global_seqno,
@ -124,6 +125,7 @@ void DeleteCachedEntry(const Slice& /*key*/, void* value) {
void DeleteCachedFilterEntry(const Slice& key, void* value);
void DeleteCachedIndexEntry(const Slice& key, void* value);
void DeleteCachedUncompressionDictEntry(const Slice& key, void* value);
// Release the cached entry and decrement its ref count.
void ReleaseCachedEntry(void* arg, void* h) {
@ -241,9 +243,9 @@ class PartitionIndexReader : public IndexReader, public Cleanable {
auto s = ReadBlockFromFile(
file, prefetch_buffer, footer, ReadOptions(), index_handle,
&index_block, ioptions, true /* decompress */,
true /*maybe_compressed*/, Slice() /*compression dict*/, cache_options,
kDisableGlobalSequenceNumber, 0 /* read_amp_bytes_per_bit */,
memory_allocator);
true /*maybe_compressed*/, UncompressionDict::GetEmptyDict(),
cache_options, kDisableGlobalSequenceNumber,
0 /* read_amp_bytes_per_bit */, memory_allocator);
if (s.ok()) {
*index_reader = new PartitionIndexReader(
@ -333,16 +335,13 @@ class PartitionIndexReader : public IndexReader, public Cleanable {
for (; biter.Valid(); biter.Next()) {
handle = biter.value();
BlockBasedTable::CachableEntry<Block> block;
Slice compression_dict;
if (rep->compression_dict_block) {
compression_dict = rep->compression_dict_block->data;
}
const bool is_index = true;
// TODO: Support counter batch update for partitioned index and
// filter blocks
s = table_->MaybeReadBlockAndLoadToCache(
prefetch_buffer.get(), rep, ro, handle, compression_dict, &block,
is_index, nullptr /* get_context */);
prefetch_buffer.get(), rep, ro, handle,
UncompressionDict::GetEmptyDict(), &block, is_index,
nullptr /* get_context */);
assert(s.ok() || block.value == nullptr);
if (s.ok() && block.value != nullptr) {
@ -422,9 +421,9 @@ class BinarySearchIndexReader : public IndexReader {
auto s = ReadBlockFromFile(
file, prefetch_buffer, footer, ReadOptions(), index_handle,
&index_block, ioptions, true /* decompress */,
true /*maybe_compressed*/, Slice() /*compression dict*/, cache_options,
kDisableGlobalSequenceNumber, 0 /* read_amp_bytes_per_bit */,
memory_allocator);
true /*maybe_compressed*/, UncompressionDict::GetEmptyDict(),
cache_options, kDisableGlobalSequenceNumber,
0 /* read_amp_bytes_per_bit */, memory_allocator);
if (s.ok()) {
*index_reader = new BinarySearchIndexReader(
@ -496,9 +495,9 @@ class HashIndexReader : public IndexReader {
auto s = ReadBlockFromFile(
file, prefetch_buffer, footer, ReadOptions(), index_handle,
&index_block, ioptions, true /* decompress */,
true /*maybe_compressed*/, Slice() /*compression dict*/, cache_options,
kDisableGlobalSequenceNumber, 0 /* read_amp_bytes_per_bit */,
memory_allocator);
true /*maybe_compressed*/, UncompressionDict::GetEmptyDict(),
cache_options, kDisableGlobalSequenceNumber,
0 /* read_amp_bytes_per_bit */, memory_allocator);
if (!s.ok()) {
return s;
@ -531,13 +530,12 @@ class HashIndexReader : public IndexReader {
return Status::OK();
}
Slice dummy_comp_dict;
// Read contents for the blocks
BlockContents prefixes_contents;
BlockFetcher prefixes_block_fetcher(
file, prefetch_buffer, footer, ReadOptions(), prefixes_handle,
&prefixes_contents, ioptions, true /*decompress*/,
true /*maybe_compressed*/, dummy_comp_dict /*compression dict*/,
true /*maybe_compressed*/, UncompressionDict::GetEmptyDict(),
cache_options, memory_allocator);
s = prefixes_block_fetcher.ReadBlockContents();
if (!s.ok()) {
@ -547,7 +545,7 @@ class HashIndexReader : public IndexReader {
BlockFetcher prefixes_meta_block_fetcher(
file, prefetch_buffer, footer, ReadOptions(), prefixes_meta_handle,
&prefixes_meta_contents, ioptions, true /*decompress*/,
true /*maybe_compressed*/, dummy_comp_dict /*compression dict*/,
true /*maybe_compressed*/, UncompressionDict::GetEmptyDict(),
cache_options, memory_allocator);
s = prefixes_meta_block_fetcher.ReadBlockContents();
if (!s.ok()) {
@ -858,9 +856,6 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions,
if (!s.ok()) {
return s;
}
// Disregard return status of ReadCompressionDictBlock.
s = ReadCompressionDictBlock(rep, prefetch_buffer.get(), meta_iter.get());
s = PrefetchIndexAndFilterBlocks(rep, prefetch_buffer.get(), meta_iter.get(),
new_table.get(), prefix_extractor,
prefetch_all, table_options, level,
@ -955,6 +950,11 @@ Status BlockBasedTable::ReadPropertiesBlock(
rep->table_properties.reset(table_properties);
rep->blocks_maybe_compressed = rep->table_properties->compression_name !=
CompressionTypeToString(kNoCompression);
rep->blocks_definitely_zstd_compressed =
(rep->table_properties->compression_name ==
CompressionTypeToString(kZSTD) ||
rep->table_properties->compression_name ==
CompressionTypeToString(kZSTDNotFinalCompression));
}
} else {
ROCKS_LOG_ERROR(rep->ioptions.info_log,
@ -1023,29 +1023,19 @@ Status BlockBasedTable::ReadRangeDelBlock(
Status BlockBasedTable::ReadCompressionDictBlock(
Rep* rep, FilePrefetchBuffer* prefetch_buffer,
InternalIterator* meta_iter) {
std::unique_ptr<const BlockContents>* compression_dict_block) {
assert(compression_dict_block != nullptr);
Status s;
bool found_compression_dict;
BlockHandle compression_dict_handle;
s = SeekToCompressionDictBlock(meta_iter, &found_compression_dict,
&compression_dict_handle);
if (!s.ok()) {
ROCKS_LOG_WARN(
rep->ioptions.info_log,
"Error when seeking to compression dictionary block from file: %s",
s.ToString().c_str());
} else if (found_compression_dict && !compression_dict_handle.IsNull()) {
// TODO(andrewkr): Add to block cache if cache_index_and_filter_blocks is
// true.
if (!rep->compression_dict_handle.IsNull()) {
std::unique_ptr<BlockContents> compression_dict_cont{new BlockContents()};
PersistentCacheOptions cache_options;
ReadOptions read_options;
read_options.verify_checksums = false;
read_options.verify_checksums = true;
BlockFetcher compression_block_fetcher(
rep->file.get(), prefetch_buffer, rep->footer, read_options,
compression_dict_handle, compression_dict_cont.get(), rep->ioptions,
false /* decompress */, false /*maybe_compressed*/,
Slice() /*compression dict*/, cache_options);
rep->compression_dict_handle, compression_dict_cont.get(),
rep->ioptions, false /* decompress */, false /*maybe_compressed*/,
UncompressionDict::GetEmptyDict(), cache_options);
s = compression_block_fetcher.ReadBlockContents();
if (!s.ok()) {
@ -1055,7 +1045,7 @@ Status BlockBasedTable::ReadCompressionDictBlock(
"block %s",
s.ToString().c_str());
} else {
rep->compression_dict_block = std::move(compression_dict_cont);
*compression_dict_block = std::move(compression_dict_cont);
}
}
return s;
@ -1097,6 +1087,13 @@ Status BlockBasedTable::PrefetchIndexAndFilterBlocks(
}
}
{
// Find compression dictionary handle
bool found_compression_dict;
s = SeekToCompressionDictBlock(meta_iter, &found_compression_dict,
&rep->compression_dict_handle);
}
bool need_upper_bound_check =
PrefixExtractorChanged(rep->table_properties.get(), prefix_extractor);
@ -1124,8 +1121,9 @@ Status BlockBasedTable::PrefetchIndexAndFilterBlocks(
pin_all || (table_options.pin_top_level_index_and_filter &&
rep->filter_type == Rep::FilterType::kPartitionedFilter);
// pre-fetching of blocks is turned on
// Will use block cache for index/filter blocks access
// Will use block cache for meta-blocks access
// Always prefetch index and filter for level 0
// TODO(ajkr): also prefetch compression dictionary block
if (table_options.cache_index_and_filter_blocks) {
assert(table_options.block_cache != nullptr);
if (prefetch_index) {
@ -1136,10 +1134,12 @@ Status BlockBasedTable::PrefetchIndexAndFilterBlocks(
bool disable_prefix_seek =
rep->index_type == BlockBasedTableOptions::kHashSearch &&
need_upper_bound_check;
if (s.ok()) {
std::unique_ptr<InternalIteratorBase<BlockHandle>> iter(
new_table->NewIndexIterator(ReadOptions(), disable_prefix_seek,
nullptr, &index_entry));
s = iter->status();
}
if (s.ok()) {
// This is the first call to NewIndexIterator() since we're in Open().
// On success it should give us ownership of the `CachableEntry` by
@ -1173,12 +1173,15 @@ Status BlockBasedTable::PrefetchIndexAndFilterBlocks(
}
}
} else {
// If we don't use block cache for index/filter blocks access, we'll
// pre-load these blocks, which will kept in member variables in Rep
// and with a same life-time as this table object.
// If we don't use block cache for meta-block access, we'll pre-load these
// blocks, which will kept in member variables in Rep and with a same life-
// time as this table object.
IndexReader* index_reader = nullptr;
s = new_table->CreateIndexReader(prefetch_buffer, &index_reader, meta_iter,
level);
if (s.ok()) {
s = new_table->CreateIndexReader(prefetch_buffer, &index_reader,
meta_iter, level);
}
std::unique_ptr<const BlockContents> compression_dict_block;
if (s.ok()) {
rep->index_reader.reset(index_reader);
// The partitions of partitioned index are always stored in cache. They
@ -1201,11 +1204,19 @@ Status BlockBasedTable::PrefetchIndexAndFilterBlocks(
filter->CacheDependencies(pin_all, rep->table_prefix_extractor.get());
}
}
s = ReadCompressionDictBlock(rep, prefetch_buffer,
&compression_dict_block);
} else {
delete index_reader;
}
if (s.ok() && !rep->compression_dict_handle.IsNull()) {
assert(compression_dict_block != nullptr);
// TODO(ajkr): find a way to avoid the `compression_dict_block` data copy
rep->uncompression_dict.reset(new UncompressionDict(
compression_dict_block->data.ToString(),
rep->blocks_definitely_zstd_compressed, rep->ioptions.statistics));
}
}
return s;
}
@ -1240,6 +1251,9 @@ size_t BlockBasedTable::ApproximateMemoryUsage() const {
if (rep_->index_reader) {
usage += rep_->index_reader->ApproximateMemoryUsage();
}
if (rep_->uncompression_dict) {
usage += rep_->uncompression_dict->ApproximateMemoryUsage();
}
return usage;
}
@ -1256,7 +1270,7 @@ Status BlockBasedTable::ReadMetaBlock(Rep* rep,
rep->file.get(), prefetch_buffer, rep->footer, ReadOptions(),
rep->footer.metaindex_handle(), &meta, rep->ioptions,
true /* decompress */, true /*maybe_compressed*/,
Slice() /*compression dict*/, rep->persistent_cache_options,
UncompressionDict::GetEmptyDict(), rep->persistent_cache_options,
kDisableGlobalSequenceNumber, 0 /* read_amp_bytes_per_bit */,
GetMemoryAllocator(rep->table_options));
@ -1279,8 +1293,9 @@ Status BlockBasedTable::GetDataBlockFromCache(
const Slice& block_cache_key, const Slice& compressed_block_cache_key,
Cache* block_cache, Cache* block_cache_compressed, Rep* rep,
const ReadOptions& read_options,
BlockBasedTable::CachableEntry<Block>* block, const Slice& compression_dict,
size_t read_amp_bytes_per_bit, bool is_index, GetContext* get_context) {
BlockBasedTable::CachableEntry<Block>* block,
const UncompressionDict& uncompression_dict, size_t read_amp_bytes_per_bit,
bool is_index, GetContext* get_context) {
Status s;
BlockContents* compressed_block = nullptr;
Cache::Handle* block_cache_compressed_handle = nullptr;
@ -1335,8 +1350,7 @@ Status BlockBasedTable::GetDataBlockFromCache(
// Retrieve the uncompressed contents into a new buffer
BlockContents contents;
UncompressionContext context(compression_type);
UncompressionDict dict(compression_dict, compression_type);
UncompressionInfo info(context, dict, compression_type);
UncompressionInfo info(context, uncompression_dict, compression_type);
s = UncompressBlockContents(info, compressed_block->data.data(),
compressed_block->data.size(), &contents,
rep->table_options.format_version, rep->ioptions,
@ -1403,7 +1417,7 @@ Status BlockBasedTable::PutDataBlockToCache(
const ReadOptions& /*read_options*/, const ImmutableCFOptions& ioptions,
CachableEntry<Block>* cached_block, BlockContents* raw_block_contents,
CompressionType raw_block_comp_type, uint32_t format_version,
const Slice& compression_dict, SequenceNumber seq_no,
const UncompressionDict& uncompression_dict, SequenceNumber seq_no,
size_t read_amp_bytes_per_bit, MemoryAllocator* memory_allocator,
bool is_index, Cache::Priority priority, GetContext* get_context) {
assert(raw_block_comp_type == kNoCompression ||
@ -1415,8 +1429,7 @@ Status BlockBasedTable::PutDataBlockToCache(
Statistics* statistics = ioptions.statistics;
if (raw_block_comp_type != kNoCompression) {
UncompressionContext context(raw_block_comp_type);
UncompressionDict dict(compression_dict, raw_block_comp_type);
UncompressionInfo info(context, dict, raw_block_comp_type);
UncompressionInfo info(context, uncompression_dict, raw_block_comp_type);
s = UncompressBlockContents(info, raw_block_contents->data.data(),
raw_block_contents->data.size(),
&uncompressed_block_contents, format_version,
@ -1522,12 +1535,10 @@ FilterBlockReader* BlockBasedTable::ReadFilter(
}
BlockContents block;
Slice dummy_comp_dict;
BlockFetcher block_fetcher(
rep->file.get(), prefetch_buffer, rep->footer, ReadOptions(),
filter_handle, &block, rep->ioptions, false /* decompress */,
false /*maybe_compressed*/, dummy_comp_dict,
false /*maybe_compressed*/, UncompressionDict::GetEmptyDict(),
rep->persistent_cache_options, GetMemoryAllocator(rep->table_options));
Status s = block_fetcher.ReadBlockContents();
@ -1672,6 +1683,84 @@ BlockBasedTable::CachableEntry<FilterBlockReader> BlockBasedTable::GetFilter(
return { filter, cache_handle };
}
BlockBasedTable::CachableEntry<UncompressionDict>
BlockBasedTable::GetUncompressionDict(Rep* rep,
FilePrefetchBuffer* prefetch_buffer,
bool no_io, GetContext* get_context) {
if (!rep->table_options.cache_index_and_filter_blocks) {
// block cache is either disabled or not used for meta-blocks. In either
// case, BlockBasedTableReader is the owner of the uncompression dictionary.
return {rep->uncompression_dict.get(), nullptr /* cache handle */};
}
if (rep->compression_dict_handle.IsNull()) {
return {nullptr, nullptr};
}
char cache_key_buf[kMaxCacheKeyPrefixSize + kMaxVarint64Length];
auto cache_key =
GetCacheKey(rep->cache_key_prefix, rep->cache_key_prefix_size,
rep->compression_dict_handle, cache_key_buf);
auto cache_handle = GetEntryFromCache(
rep->table_options.block_cache.get(), cache_key, rep->level,
BLOCK_CACHE_COMPRESSION_DICT_MISS, BLOCK_CACHE_COMPRESSION_DICT_HIT,
get_context
? &get_context->get_context_stats_.num_cache_compression_dict_miss
: nullptr,
get_context
? &get_context->get_context_stats_.num_cache_compression_dict_hit
: nullptr,
rep->ioptions.statistics, get_context);
UncompressionDict* dict = nullptr;
if (cache_handle != nullptr) {
dict = reinterpret_cast<UncompressionDict*>(
rep->table_options.block_cache->Value(cache_handle));
} else if (no_io) {
// Do not invoke any io.
} else {
std::unique_ptr<const BlockContents> compression_dict_block;
Status s =
ReadCompressionDictBlock(rep, prefetch_buffer, &compression_dict_block);
size_t usage = 0;
if (s.ok()) {
assert(compression_dict_block != nullptr);
// TODO(ajkr): find a way to avoid the `compression_dict_block` data copy
dict = new UncompressionDict(compression_dict_block->data.ToString(),
rep->blocks_definitely_zstd_compressed,
rep->ioptions.statistics);
usage = dict->ApproximateMemoryUsage();
s = rep->table_options.block_cache->Insert(
cache_key, dict, usage, &DeleteCachedUncompressionDictEntry,
&cache_handle,
rep->table_options.cache_index_and_filter_blocks_with_high_priority
? Cache::Priority::HIGH
: Cache::Priority::LOW);
}
if (s.ok()) {
PERF_COUNTER_ADD(compression_dict_block_read_count, 1);
if (get_context != nullptr) {
get_context->get_context_stats_.num_cache_add++;
get_context->get_context_stats_.num_cache_bytes_write += usage;
get_context->get_context_stats_.num_cache_compression_dict_add++;
get_context->get_context_stats_
.num_cache_compression_dict_bytes_insert += usage;
} else {
RecordTick(rep->ioptions.statistics, BLOCK_CACHE_ADD);
RecordTick(rep->ioptions.statistics, BLOCK_CACHE_BYTES_WRITE, usage);
RecordTick(rep->ioptions.statistics, BLOCK_CACHE_COMPRESSION_DICT_ADD);
RecordTick(rep->ioptions.statistics,
BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT, usage);
}
} else {
// There should be no way to get here if block cache insertion succeeded.
// Though it is still possible something failed earlier.
RecordTick(rep->ioptions.statistics, BLOCK_CACHE_ADD_FAILURES);
delete dict;
dict = nullptr;
assert(cache_handle == nullptr);
}
}
return {dict, cache_handle};
}
// disable_prefix_seek should be set to true when prefix_extractor found in SST
// differs from the one in mutable_cf_options and index type is HashBasedIndex
InternalIteratorBase<BlockHandle>* BlockBasedTable::NewIndexIterator(
@ -1803,20 +1892,23 @@ TBlockIter* BlockBasedTable::NewDataBlockIterator(
FilePrefetchBuffer* prefetch_buffer) {
PERF_TIMER_GUARD(new_table_block_iter_nanos);
const bool no_io = (ro.read_tier == kBlockCacheTier);
Cache* block_cache = rep->table_options.block_cache.get();
CachableEntry<Block> block;
Slice compression_dict;
TBlockIter* iter;
{
const bool no_io = (ro.read_tier == kBlockCacheTier);
auto uncompression_dict_storage =
GetUncompressionDict(rep, prefetch_buffer, no_io, get_context);
const UncompressionDict& uncompression_dict =
uncompression_dict_storage.value == nullptr
? UncompressionDict::GetEmptyDict()
: *uncompression_dict_storage.value;
if (s.ok()) {
if (rep->compression_dict_block) {
compression_dict = rep->compression_dict_block->data;
}
s = MaybeReadBlockAndLoadToCache(prefetch_buffer, rep, ro, handle,
compression_dict, &block, is_index,
uncompression_dict, &block, is_index,
get_context);
}
TBlockIter* iter;
if (input_iter != nullptr) {
iter = input_iter;
} else {
@ -1837,7 +1929,7 @@ TBlockIter* BlockBasedTable::NewDataBlockIterator(
rep->file.get(), prefetch_buffer, rep->footer, ro, handle,
&block_value, rep->ioptions,
rep->blocks_maybe_compressed /*do_decompress*/,
rep->blocks_maybe_compressed, compression_dict,
rep->blocks_maybe_compressed, uncompression_dict,
rep->persistent_cache_options,
is_index ? kDisableGlobalSequenceNumber : rep->global_seqno,
rep->table_options.read_amp_bytes_per_bit,
@ -1847,6 +1939,10 @@ TBlockIter* BlockBasedTable::NewDataBlockIterator(
block.value = block_value.release();
}
}
// TODO(ajkr): also pin compression dictionary block when
// `pin_l0_filter_and_index_blocks_in_cache == true`.
uncompression_dict_storage.Release(block_cache);
}
if (s.ok()) {
assert(block.value != nullptr);
@ -1911,7 +2007,7 @@ TBlockIter* BlockBasedTable::NewDataBlockIterator(
Status BlockBasedTable::MaybeReadBlockAndLoadToCache(
FilePrefetchBuffer* prefetch_buffer, Rep* rep, const ReadOptions& ro,
const BlockHandle& handle, Slice compression_dict,
const BlockHandle& handle, const UncompressionDict& uncompression_dict,
CachableEntry<Block>* block_entry, bool is_index, GetContext* get_context) {
assert(block_entry != nullptr);
const bool no_io = (ro.read_tier == kBlockCacheTier);
@ -1944,7 +2040,7 @@ Status BlockBasedTable::MaybeReadBlockAndLoadToCache(
}
s = GetDataBlockFromCache(key, ckey, block_cache, block_cache_compressed,
rep, ro, block_entry, compression_dict,
rep, ro, block_entry, uncompression_dict,
rep->table_options.read_amp_bytes_per_bit,
is_index, get_context);
@ -1962,7 +2058,7 @@ Status BlockBasedTable::MaybeReadBlockAndLoadToCache(
rep->file.get(), prefetch_buffer, rep->footer, ro, handle,
&raw_block_contents, rep->ioptions,
do_decompress /* do uncompress */, rep->blocks_maybe_compressed,
compression_dict, rep->persistent_cache_options,
uncompression_dict, rep->persistent_cache_options,
GetMemoryAllocator(rep->table_options),
GetMemoryAllocatorForCompressedBlock(rep->table_options));
s = block_fetcher.ReadBlockContents();
@ -1976,7 +2072,7 @@ Status BlockBasedTable::MaybeReadBlockAndLoadToCache(
s = PutDataBlockToCache(
key, ckey, block_cache, block_cache_compressed, ro, rep->ioptions,
block_entry, &raw_block_contents, raw_block_comp_type,
rep->table_options.format_version, compression_dict, seq_no,
rep->table_options.format_version, uncompression_dict, seq_no,
rep->table_options.read_amp_bytes_per_bit,
GetMemoryAllocator(rep->table_options), is_index,
is_index && rep->table_options
@ -2681,12 +2777,11 @@ Status BlockBasedTable::VerifyChecksumInBlocks(
}
BlockHandle handle = index_iter->value();
BlockContents contents;
Slice dummy_comp_dict;
BlockFetcher block_fetcher(
rep_->file.get(), nullptr /* prefetch buffer */, rep_->footer,
ReadOptions(), handle, &contents, rep_->ioptions,
false /* decompress */, false /*maybe_compressed*/,
dummy_comp_dict /*compression dict*/, rep_->persistent_cache_options);
UncompressionDict::GetEmptyDict(), rep_->persistent_cache_options);
s = block_fetcher.ReadBlockContents();
if (!s.ok()) {
break;
@ -2707,12 +2802,11 @@ Status BlockBasedTable::VerifyChecksumInBlocks(
Slice input = index_iter->value();
s = handle.DecodeFrom(&input);
BlockContents contents;
Slice dummy_comp_dict;
BlockFetcher block_fetcher(
rep_->file.get(), nullptr /* prefetch buffer */, rep_->footer,
ReadOptions(), handle, &contents, rep_->ioptions,
false /* decompress */, false /*maybe_compressed*/,
dummy_comp_dict /*compression dict*/, rep_->persistent_cache_options);
UncompressionDict::GetEmptyDict(), rep_->persistent_cache_options);
s = block_fetcher.ReadBlockContents();
if (!s.ok()) {
break;
@ -2740,11 +2834,24 @@ bool BlockBasedTable::TEST_KeyInCache(const ReadOptions& options,
Slice ckey;
Status s;
if (!rep_->compression_dict_handle.IsNull()) {
std::unique_ptr<const BlockContents> compression_dict_block;
s = ReadCompressionDictBlock(rep_, nullptr /* prefetch_buffer */,
&compression_dict_block);
if (s.ok()) {
assert(compression_dict_block != nullptr);
UncompressionDict uncompression_dict(
compression_dict_block->data.ToString(),
rep_->blocks_definitely_zstd_compressed);
s = GetDataBlockFromCache(cache_key, ckey, block_cache, nullptr, rep_,
options, &block, uncompression_dict,
0 /* read_amp_bytes_per_bit */);
}
} else {
s = GetDataBlockFromCache(
cache_key, ckey, block_cache, nullptr, rep_, options, &block,
rep_->compression_dict_block ? rep_->compression_dict_block->data
: Slice(),
0 /* read_amp_bytes_per_bit */);
UncompressionDict::GetEmptyDict(), 0 /* read_amp_bytes_per_bit */);
}
assert(s.ok());
bool in_cache = block.value != nullptr;
if (in_cache) {
@ -3014,12 +3121,11 @@ Status BlockBasedTable::DumpTable(WritableFile* out_file,
BlockHandle handle;
if (FindMetaBlock(meta_iter.get(), filter_block_key, &handle).ok()) {
BlockContents block;
Slice dummy_comp_dict;
BlockFetcher block_fetcher(
rep_->file.get(), nullptr /* prefetch_buffer */, rep_->footer,
ReadOptions(), handle, &block, rep_->ioptions,
false /*decompress*/, false /*maybe_compressed*/,
dummy_comp_dict /*compression dict*/,
UncompressionDict::GetEmptyDict(),
rep_->persistent_cache_options);
s = block_fetcher.ReadBlockContents();
if (!s.ok()) {
@ -3048,8 +3154,15 @@ Status BlockBasedTable::DumpTable(WritableFile* out_file,
}
// Output compression dictionary
if (rep_->compression_dict_block != nullptr) {
auto compression_dict = rep_->compression_dict_block->data;
if (!rep_->compression_dict_handle.IsNull()) {
std::unique_ptr<const BlockContents> compression_dict_block;
s = ReadCompressionDictBlock(rep_, nullptr /* prefetch_buffer */,
&compression_dict_block);
if (!s.ok()) {
return s;
}
assert(compression_dict_block != nullptr);
auto compression_dict = compression_dict_block->data;
out_file->Append(
"Compression Dictionary:\n"
"--------------------------------------\n");
@ -3292,6 +3405,13 @@ void DeleteCachedIndexEntry(const Slice& /*key*/, void* value) {
delete index_reader;
}
void DeleteCachedUncompressionDictEntry(const Slice& /*key*/, void* value) {
UncompressionDict* dict = reinterpret_cast<UncompressionDict*>(value);
RecordTick(dict->statistics(), BLOCK_CACHE_COMPRESSION_DICT_BYTES_EVICT,
dict->ApproximateMemoryUsage());
delete dict;
}
} // anonymous namespace
} // namespace rocksdb

@ -259,7 +259,7 @@ class BlockBasedTable : public TableReader {
// block.
static Status MaybeReadBlockAndLoadToCache(
FilePrefetchBuffer* prefetch_buffer, Rep* rep, const ReadOptions& ro,
const BlockHandle& handle, Slice compression_dict,
const BlockHandle& handle, const UncompressionDict& uncompression_dict,
CachableEntry<Block>* block_entry, bool is_index = false,
GetContext* get_context = nullptr);
@ -275,6 +275,10 @@ class BlockBasedTable : public TableReader {
const bool is_a_filter_partition, bool no_io, GetContext* get_context,
const SliceTransform* prefix_extractor = nullptr) const;
static CachableEntry<UncompressionDict> GetUncompressionDict(
Rep* rep, FilePrefetchBuffer* prefetch_buffer, bool no_io,
GetContext* get_context);
// Get the iterator from the index reader.
// If input_iter is not set, return new Iterator
// If input_iter is set, update it and return it as Iterator
@ -295,15 +299,16 @@ class BlockBasedTable : public TableReader {
// block_cache_compressed.
// On success, Status::OK with be returned and @block will be populated with
// pointer to the block as well as its block handle.
// @param compression_dict Data for presetting the compression library's
// @param uncompression_dict Data for presetting the compression library's
// dictionary.
static Status GetDataBlockFromCache(
const Slice& block_cache_key, const Slice& compressed_block_cache_key,
Cache* block_cache, Cache* block_cache_compressed, Rep* rep,
const ReadOptions& read_options,
BlockBasedTable::CachableEntry<Block>* block,
const Slice& compression_dict, size_t read_amp_bytes_per_bit,
bool is_index = false, GetContext* get_context = nullptr);
const UncompressionDict& uncompression_dict,
size_t read_amp_bytes_per_bit, bool is_index = false,
GetContext* get_context = nullptr);
// Put a raw block (maybe compressed) to the corresponding block caches.
// This method will perform decompression against raw_block if needed and then
@ -313,7 +318,7 @@ class BlockBasedTable : public TableReader {
//
// Allocated memory managed by raw_block_contents will be transferred to
// PutDataBlockToCache(). After the call, the object will be invalid.
// @param compression_dict Data for presetting the compression library's
// @param uncompression_dict Data for presetting the compression library's
// dictionary.
static Status PutDataBlockToCache(
const Slice& block_cache_key, const Slice& compressed_block_cache_key,
@ -321,7 +326,7 @@ class BlockBasedTable : public TableReader {
const ReadOptions& read_options, const ImmutableCFOptions& ioptions,
CachableEntry<Block>* block, BlockContents* raw_block_contents,
CompressionType raw_block_comp_type, uint32_t format_version,
const Slice& compression_dict, SequenceNumber seq_no,
const UncompressionDict& uncompression_dict, SequenceNumber seq_no,
size_t read_amp_bytes_per_bit, MemoryAllocator* memory_allocator,
bool is_index = false, Cache::Priority pri = Cache::Priority::LOW,
GetContext* get_context = nullptr);
@ -367,9 +372,9 @@ class BlockBasedTable : public TableReader {
Rep* rep, FilePrefetchBuffer* prefetch_buffer,
InternalIterator* meta_iter,
const InternalKeyComparator& internal_comparator);
static Status ReadCompressionDictBlock(Rep* rep,
FilePrefetchBuffer* prefetch_buffer,
InternalIterator* meta_iter);
static Status ReadCompressionDictBlock(
Rep* rep, FilePrefetchBuffer* prefetch_buffer,
std::unique_ptr<const BlockContents>* compression_dict_block);
static Status PrefetchIndexAndFilterBlocks(
Rep* rep, FilePrefetchBuffer* prefetch_buffer,
InternalIterator* meta_iter, BlockBasedTable* new_table,
@ -488,11 +493,15 @@ struct BlockBasedTable::Rep {
// Footer contains the fixed table information
Footer footer;
// index_reader and filter will be populated and used only when
// options.block_cache is nullptr; otherwise we will get the index block via
// the block cache.
// `index_reader`, `filter`, and `uncompression_dict` will be populated (i.e.,
// non-nullptr) and used only when options.block_cache is nullptr or when
// `cache_index_and_filter_blocks == false`. Otherwise, we will get the index,
// filter, and compression dictionary blocks via the block cache. In that case
// `dummy_index_reader_offset`, `filter_handle`, and `compression_dict_handle`
// are used to lookup these meta-blocks in block cache.
std::unique_ptr<IndexReader> index_reader;
std::unique_ptr<FilterBlockReader> filter;
std::unique_ptr<UncompressionDict> uncompression_dict;
enum class FilterType {
kNoFilter,
@ -502,13 +511,9 @@ struct BlockBasedTable::Rep {
};
FilterType filter_type;
BlockHandle filter_handle;
BlockHandle compression_dict_handle;
std::shared_ptr<const TableProperties> table_properties;
// Block containing the data for the compression dictionary. We take ownership
// for the entire block struct, even though we only use its Slice member. This
// is easier because the Slice member depends on the continued existence of
// another member ("allocation").
std::unique_ptr<const BlockContents> compression_dict_block;
BlockBasedTableOptions::IndexType index_type;
bool hash_index_allow_collision;
bool whole_key_filtering;
@ -545,6 +550,12 @@ struct BlockBasedTable::Rep {
// before reading individual blocks enables certain optimizations.
bool blocks_maybe_compressed = true;
// If true, data blocks in this file are definitely ZSTD compressed. If false
// they might not be. When false we skip creating a ZSTD digested
// uncompression dictionary. Even if we get a false negative, things should
// still work, just not as quickly.
bool blocks_definitely_zstd_compressed = false;
bool closed = false;
const bool immortal_table;

@ -257,8 +257,7 @@ Status BlockFetcher::ReadBlockContents() {
if (do_uncompress_ && compression_type_ != kNoCompression) {
// compressed page, uncompress, update cache
UncompressionContext context(compression_type_);
UncompressionDict dict(compression_dict_, compression_type_);
UncompressionInfo info(context, dict, compression_type_);
UncompressionInfo info(context, uncompression_dict_, compression_type_);
status_ = UncompressBlockContents(info, slice_.data(), block_size_,
contents_, footer_.version(), ioptions_,
memory_allocator_);

@ -19,14 +19,14 @@ class BlockFetcher {
// The only relevant option is options.verify_checksums for now.
// On failure return non-OK.
// On success fill *result and return OK - caller owns *result
// @param compression_dict Data for presetting the compression library's
// @param uncompression_dict Data for presetting the compression library's
// dictionary.
BlockFetcher(RandomAccessFileReader* file,
FilePrefetchBuffer* prefetch_buffer, const Footer& footer,
const ReadOptions& read_options, const BlockHandle& handle,
BlockContents* contents, const ImmutableCFOptions& ioptions,
bool do_uncompress, bool maybe_compressed,
const Slice& compression_dict,
const UncompressionDict& uncompression_dict,
const PersistentCacheOptions& cache_options,
MemoryAllocator* memory_allocator = nullptr,
MemoryAllocator* memory_allocator_compressed = nullptr)
@ -39,7 +39,7 @@ class BlockFetcher {
ioptions_(ioptions),
do_uncompress_(do_uncompress),
maybe_compressed_(maybe_compressed),
compression_dict_(compression_dict),
uncompression_dict_(uncompression_dict),
cache_options_(cache_options),
memory_allocator_(memory_allocator),
memory_allocator_compressed_(memory_allocator_compressed) {}
@ -58,7 +58,7 @@ class BlockFetcher {
const ImmutableCFOptions& ioptions_;
bool do_uncompress_;
bool maybe_compressed_;
const Slice& compression_dict_;
const UncompressionDict& uncompression_dict_;
const PersistentCacheOptions& cache_options_;
MemoryAllocator* memory_allocator_;
MemoryAllocator* memory_allocator_compressed_;

@ -107,6 +107,10 @@ void GetContext::ReportCounters() {
RecordTick(statistics_, BLOCK_CACHE_FILTER_HIT,
get_context_stats_.num_cache_filter_hit);
}
if (get_context_stats_.num_cache_compression_dict_hit > 0) {
RecordTick(statistics_, BLOCK_CACHE_COMPRESSION_DICT_HIT,
get_context_stats_.num_cache_compression_dict_hit);
}
if (get_context_stats_.num_cache_index_miss > 0) {
RecordTick(statistics_, BLOCK_CACHE_INDEX_MISS,
get_context_stats_.num_cache_index_miss);
@ -119,6 +123,10 @@ void GetContext::ReportCounters() {
RecordTick(statistics_, BLOCK_CACHE_DATA_MISS,
get_context_stats_.num_cache_data_miss);
}
if (get_context_stats_.num_cache_compression_dict_miss > 0) {
RecordTick(statistics_, BLOCK_CACHE_COMPRESSION_DICT_MISS,
get_context_stats_.num_cache_compression_dict_miss);
}
if (get_context_stats_.num_cache_bytes_read > 0) {
RecordTick(statistics_, BLOCK_CACHE_BYTES_READ,
get_context_stats_.num_cache_bytes_read);
@ -158,6 +166,14 @@ void GetContext::ReportCounters() {
RecordTick(statistics_, BLOCK_CACHE_FILTER_BYTES_INSERT,
get_context_stats_.num_cache_filter_bytes_insert);
}
if (get_context_stats_.num_cache_compression_dict_add > 0) {
RecordTick(statistics_, BLOCK_CACHE_COMPRESSION_DICT_ADD,
get_context_stats_.num_cache_compression_dict_add);
}
if (get_context_stats_.num_cache_compression_dict_bytes_insert > 0) {
RecordTick(statistics_, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT,
get_context_stats_.num_cache_compression_dict_bytes_insert);
}
}
bool GetContext::SaveValue(const ParsedInternalKey& parsed_key,

@ -21,9 +21,11 @@ struct GetContextStats {
uint64_t num_cache_index_hit = 0;
uint64_t num_cache_data_hit = 0;
uint64_t num_cache_filter_hit = 0;
uint64_t num_cache_compression_dict_hit = 0;
uint64_t num_cache_index_miss = 0;
uint64_t num_cache_filter_miss = 0;
uint64_t num_cache_data_miss = 0;
uint64_t num_cache_compression_dict_miss = 0;
uint64_t num_cache_bytes_read = 0;
uint64_t num_cache_miss = 0;
uint64_t num_cache_add = 0;
@ -34,6 +36,8 @@ struct GetContextStats {
uint64_t num_cache_data_bytes_insert = 0;
uint64_t num_cache_filter_add = 0;
uint64_t num_cache_filter_bytes_insert = 0;
uint64_t num_cache_compression_dict_add = 0;
uint64_t num_cache_compression_dict_bytes_insert = 0;
};
class GetContext {

@ -189,13 +189,12 @@ Status ReadProperties(const Slice& handle_value, RandomAccessFileReader* file,
ReadOptions read_options;
read_options.verify_checksums = false;
Status s;
Slice compression_dict;
PersistentCacheOptions cache_options;
BlockFetcher block_fetcher(file, prefetch_buffer, footer, read_options,
handle, &block_contents, ioptions,
false /* decompress */, false /*maybe_compressed*/,
compression_dict, cache_options, memory_allocator);
BlockFetcher block_fetcher(
file, prefetch_buffer, footer, read_options, handle, &block_contents,
ioptions, false /* decompress */, false /*maybe_compressed*/,
UncompressionDict::GetEmptyDict(), cache_options, memory_allocator);
s = block_fetcher.ReadBlockContents();
// property block is never compressed. Need to add uncompress logic if we are
// to compress it..
@ -332,14 +331,13 @@ Status ReadTableProperties(RandomAccessFileReader* file, uint64_t file_size,
BlockContents metaindex_contents;
ReadOptions read_options;
read_options.verify_checksums = false;
Slice compression_dict;
PersistentCacheOptions cache_options;
BlockFetcher block_fetcher(file, nullptr /* prefetch_buffer */, footer,
read_options, metaindex_handle,
&metaindex_contents, ioptions,
false /* decompress */, false /*maybe_compressed*/,
compression_dict, cache_options, memory_allocator);
BlockFetcher block_fetcher(
file, nullptr /* prefetch_buffer */, footer, read_options,
metaindex_handle, &metaindex_contents, ioptions, false /* decompress */,
false /*maybe_compressed*/, UncompressionDict::GetEmptyDict(),
cache_options, memory_allocator);
s = block_fetcher.ReadBlockContents();
if (!s.ok()) {
return s;
@ -402,13 +400,12 @@ Status FindMetaBlock(RandomAccessFileReader* file, uint64_t file_size,
BlockContents metaindex_contents;
ReadOptions read_options;
read_options.verify_checksums = false;
Slice compression_dict;
PersistentCacheOptions cache_options;
BlockFetcher block_fetcher(
file, nullptr /* prefetch_buffer */, footer, read_options,
metaindex_handle, &metaindex_contents, ioptions,
false /* do decompression */, false /*maybe_compressed*/,
compression_dict, cache_options, memory_allocator);
UncompressionDict::GetEmptyDict(), cache_options, memory_allocator);
s = block_fetcher.ReadBlockContents();
if (!s.ok()) {
return s;
@ -445,13 +442,13 @@ Status ReadMetaBlock(RandomAccessFileReader* file,
BlockContents metaindex_contents;
ReadOptions read_options;
read_options.verify_checksums = false;
Slice compression_dict;
PersistentCacheOptions cache_options;
BlockFetcher block_fetcher(file, prefetch_buffer, footer, read_options,
metaindex_handle, &metaindex_contents, ioptions,
false /* decompress */, false /*maybe_compressed*/,
compression_dict, cache_options, memory_allocator);
UncompressionDict::GetEmptyDict(), cache_options,
memory_allocator);
status = block_fetcher.ReadBlockContents();
if (!status.ok()) {
return status;
@ -478,7 +475,7 @@ Status ReadMetaBlock(RandomAccessFileReader* file,
BlockFetcher block_fetcher2(
file, prefetch_buffer, footer, read_options, block_handle, contents,
ioptions, false /* decompress */, false /*maybe_compressed*/,
compression_dict, cache_options, memory_allocator);
UncompressionDict::GetEmptyDict(), cache_options, memory_allocator);
return block_fetcher2.ReadBlockContents();
}

@ -3576,13 +3576,13 @@ TEST_P(BlockBasedTableTest, PropertiesBlockRestartPointTest) {
BlockContents* contents) {
ReadOptions read_options;
read_options.verify_checksums = false;
Slice compression_dict;
PersistentCacheOptions cache_options;
BlockFetcher block_fetcher(
file, nullptr /* prefetch_buffer */, footer, read_options, handle,
contents, ioptions, false /* decompress */,
false /*maybe_compressed*/, compression_dict, cache_options);
false /*maybe_compressed*/, UncompressionDict::GetEmptyDict(),
cache_options);
ASSERT_OK(block_fetcher.ReadBlockContents());
};
@ -3663,13 +3663,12 @@ TEST_P(BlockBasedTableTest, PropertiesMetaBlockLast) {
// read metaindex
auto metaindex_handle = footer.metaindex_handle();
BlockContents metaindex_contents;
Slice compression_dict;
PersistentCacheOptions pcache_opts;
BlockFetcher block_fetcher(
table_reader.get(), nullptr /* prefetch_buffer */, footer, ReadOptions(),
metaindex_handle, &metaindex_contents, ioptions, false /* decompress */,
false /*maybe_compressed*/, compression_dict, pcache_opts,
nullptr /*memory_allocator*/);
false /*maybe_compressed*/, UncompressionDict::GetEmptyDict(),
pcache_opts, nullptr /*memory_allocator*/);
ASSERT_OK(block_fetcher.ReadBlockContents());
Block metaindex_block(std::move(metaindex_contents),
kDisableGlobalSequenceNumber);

@ -11,6 +11,13 @@
#include <algorithm>
#include <limits>
#ifdef ROCKSDB_MALLOC_USABLE_SIZE
#ifdef OS_FREEBSD
#include <malloc_np.h>
#else // OS_FREEBSD
#include <malloc.h>
#endif // OS_FREEBSD
#endif // ROCKSDB_MALLOC_USABLE_SIZE
#include <string>
#include "rocksdb/options.h"
@ -54,6 +61,16 @@ ZSTD_customMem GetJeZstdAllocationOverrides();
#endif // defined(ROCKSDB_JEMALLOC) && defined(OS_WIN) &&
// defined(ZSTD_STATIC_LINKING_ONLY)
// We require `ZSTD_sizeof_DDict` and `ZSTD_createDDict_byReference` to use
// `ZSTD_DDict`. The former was introduced in v1.0.0 and the latter was
// introduced in v1.1.3. But an important bug fix for `ZSTD_sizeof_DDict` came
// in v1.1.4, so that is the version we require. As of today's latest version
// (v1.3.8), they are both still in the experimental API, which means they are
// only exported when the compiler flag `ZSTD_STATIC_LINKING_ONLY` is set.
#if defined(ZSTD_STATIC_LINKING_ONLY) && ZSTD_VERSION_NUMBER >= 10104
#define ROCKSDB_ZSTD_DDICT
#endif // defined(ZSTD_STATIC_LINKING_ONLY) && ZSTD_VERSION_NUMBER >= 10104
// Cached data represents a portion that can be re-used
// If, in the future we have more than one native context to
// cache we can arrange this as a tuple
@ -201,42 +218,51 @@ struct CompressionDict {
// Holds dictionary and related data, like ZSTD's digested uncompression
// dictionary.
struct UncompressionDict {
#if ZSTD_VERSION_NUMBER >= 700
#ifdef ROCKSDB_ZSTD_DDICT
ZSTD_DDict* zstd_ddict_;
#endif // ZSTD_VERSION_NUMBER >= 700
Slice dict_;
#if ZSTD_VERSION_NUMBER >= 700
UncompressionDict(Slice dict, CompressionType type) {
#else // ZSTD_VERSION_NUMBER >= 700
UncompressionDict(Slice dict, CompressionType /*type*/) {
#endif // ZSTD_VERSION_NUMBER >= 700
#endif // ROCKSDB_ZSTD_DDICT
// Block containing the data for the compression dictionary. It may be
// redundant with the data held in `zstd_ddict_`.
std::string dict_;
// This `Statistics` pointer is intended to be used upon block cache eviction,
// so only needs to be populated on `UncompressionDict`s that'll be inserted
// into block cache.
Statistics* statistics_;
#ifdef ROCKSDB_ZSTD_DDICT
UncompressionDict(std::string dict, bool using_zstd,
Statistics* _statistics = nullptr) {
#else // ROCKSDB_ZSTD_DDICT
UncompressionDict(std::string dict, bool /*using_zstd*/,
Statistics* _statistics = nullptr) {
#endif // ROCKSDB_ZSTD_DDICT
dict_ = std::move(dict);
#if ZSTD_VERSION_NUMBER >= 700
statistics_ = _statistics;
#ifdef ROCKSDB_ZSTD_DDICT
zstd_ddict_ = nullptr;
if (!dict_.empty() && (type == kZSTD || type == kZSTDNotFinalCompression)) {
zstd_ddict_ = ZSTD_createDDict(dict_.data(), dict_.size());
if (!dict_.empty() && using_zstd) {
zstd_ddict_ = ZSTD_createDDict_byReference(dict_.data(), dict_.size());
assert(zstd_ddict_ != nullptr);
}
#endif // ZSTD_VERSION_NUMBER >= 700
#endif // ROCKSDB_ZSTD_DDICT
}
~UncompressionDict() {
#if ZSTD_VERSION_NUMBER >= 700
#ifdef ROCKSDB_ZSTD_DDICT
size_t res = 0;
if (zstd_ddict_ != nullptr) {
res = ZSTD_freeDDict(zstd_ddict_);
}
assert(res == 0); // Last I checked they can't fail
(void)res; // prevent unused var warning
#endif // ZSTD_VERSION_NUMBER >= 700
#endif // ROCKSDB_ZSTD_DDICT
}
#if ZSTD_VERSION_NUMBER >= 700
#ifdef ROCKSDB_ZSTD_DDICT
const ZSTD_DDict* GetDigestedZstdDDict() const {
return zstd_ddict_;
}
#endif // ZSTD_VERSION_NUMBER >= 700
#endif // ROCKSDB_ZSTD_DDICT
Slice GetRawDict() const { return dict_; }
@ -245,6 +271,18 @@ struct UncompressionDict {
return empty_dict;
}
Statistics* statistics() const { return statistics_; }
size_t ApproximateMemoryUsage() {
size_t usage = 0;
usage += sizeof(struct UncompressionDict);
#ifdef ROCKSDB_ZSTD_DDICT
usage += ZSTD_sizeof_DDict(zstd_ddict_);
#endif // ROCKSDB_ZSTD_DDICT
usage += dict_.size();
return usage;
}
UncompressionDict() = default;
// Disable copy/move
UncompressionDict(const CompressionDict&) = delete;
@ -255,11 +293,10 @@ struct UncompressionDict {
class CompressionContext {
private:
const CompressionType type_;
#if defined(ZSTD) && (ZSTD_VERSION_NUMBER >= 500)
ZSTD_CCtx* zstd_ctx_ = nullptr;
void CreateNativeContext() {
if (type_ == kZSTD || type_ == kZSTDNotFinalCompression) {
void CreateNativeContext(CompressionType type) {
if (type == kZSTD || type == kZSTDNotFinalCompression) {
#ifdef ROCKSDB_ZSTD_CUSTOM_MEM
zstd_ctx_ =
ZSTD_createCCtx_advanced(port::GetJeZstdAllocationOverrides());
@ -277,19 +314,18 @@ class CompressionContext {
public:
// callable inside ZSTD_Compress
ZSTD_CCtx* ZSTDPreallocCtx() const {
assert(type_ == kZSTD || type_ == kZSTDNotFinalCompression);
assert(zstd_ctx_ != nullptr);
return zstd_ctx_;
}
#else // ZSTD && (ZSTD_VERSION_NUMBER >= 500)
private:
void CreateNativeContext() {}
void CreateNativeContext(CompressionType /* type */) {}
void DestroyNativeContext() {}
#endif // ZSTD && (ZSTD_VERSION_NUMBER >= 500)
public:
explicit CompressionContext(CompressionType comp_type) : type_(comp_type) {
(void)type_;
CreateNativeContext();
explicit CompressionContext(CompressionType type) {
CreateNativeContext(type);
}
~CompressionContext() { DestroyNativeContext(); }
CompressionContext(const CompressionContext&) = delete;
@ -316,24 +352,22 @@ class CompressionInfo {
class UncompressionContext {
private:
const CompressionType type_;
CompressionContextCache* ctx_cache_ = nullptr;
ZSTDUncompressCachedData uncomp_cached_data_;
public:
struct NoCache {};
// Do not use context cache, used by TableBuilder
UncompressionContext(NoCache, CompressionType comp_type) : type_(comp_type) {}
UncompressionContext(NoCache, CompressionType /* type */) {}
explicit UncompressionContext(CompressionType comp_type) : type_(comp_type) {
if (type_ == kZSTD || type_ == kZSTDNotFinalCompression) {
explicit UncompressionContext(CompressionType type) {
if (type == kZSTD || type == kZSTDNotFinalCompression) {
ctx_cache_ = CompressionContextCache::Instance();
uncomp_cached_data_ = ctx_cache_->GetCachedZSTDUncompressData();
}
}
~UncompressionContext() {
if ((type_ == kZSTD || type_ == kZSTDNotFinalCompression) &&
uncomp_cached_data_.GetCacheIndex() != -1) {
if (uncomp_cached_data_.GetCacheIndex() != -1) {
assert(ctx_cache_ != nullptr);
ctx_cache_->ReturnCachedZSTDUncompressData(
uncomp_cached_data_.GetCacheIndex());
@ -1191,13 +1225,13 @@ inline CacheAllocationPtr ZSTD_Uncompress(
#if ZSTD_VERSION_NUMBER >= 500 // v0.5.0+
ZSTD_DCtx* context = info.context().GetZSTDContext();
assert(context != nullptr);
#if ZSTD_VERSION_NUMBER >= 700 // v0.7.0+
#ifdef ROCKSDB_ZSTD_DDICT
if (info.dict().GetDigestedZstdDDict() != nullptr) {
actual_output_length = ZSTD_decompress_usingDDict(
context, output.get(), output_len, input_data, input_length,
info.dict().GetDigestedZstdDDict());
}
#endif // ZSTD_VERSION_NUMBER >= 700
#endif // ROCKSDB_ZSTD_DDICT
if (actual_output_length == 0) {
actual_output_length = ZSTD_decompress_usingDict(
context, output.get(), output_len, input_data, input_length,

Loading…
Cancel
Save