From 8ec3e725510d1d680b376570fc70587063efc506 Mon Sep 17 00:00:00 2001 From: Andrew Kryczka Date: Wed, 23 Jan 2019 18:11:08 -0800 Subject: [PATCH] Cache dictionary used for decompressing data blocks (#4881) Summary: - If block cache disabled or not used for meta-blocks, `BlockBasedTableReader::Rep::uncompression_dict` owns the `UncompressionDict`. It is preloaded during `PrefetchIndexAndFilterBlocks`. - If block cache is enabled and used for meta-blocks, block cache owns the `UncompressionDict`, which holds dictionary and digested dictionary when needed. It is never prefetched though there is a TODO for this in the code. The cache key is simply the compression dictionary block handle. - New stats for compression dictionary accesses in block cache: "BLOCK_CACHE_COMPRESSION_DICT_*" and "compression_dict_block_read_count" Pull Request resolved: https://github.com/facebook/rocksdb/pull/4881 Differential Revision: D13663801 Pulled By: ajkr fbshipit-source-id: bdcc54044e180855cdcc57639b493b0e016c9a3f --- HISTORY.md | 1 + TARGETS | 1 + build_tools/fbcode_config.sh | 2 +- db/db_block_cache_test.cc | 68 ++++++ include/rocksdb/perf_context.h | 2 + include/rocksdb/statistics.h | 9 +- include/rocksdb/table.h | 4 + monitoring/perf_context.cc | 2 + monitoring/statistics.cc | 10 + table/block_based_table_builder.cc | 5 +- table/block_based_table_reader.cc | 360 +++++++++++++++++++---------- table/block_based_table_reader.h | 45 ++-- table/block_fetcher.cc | 3 +- table/block_fetcher.h | 8 +- table/get_context.cc | 16 ++ table/get_context.h | 4 + table/meta_blocks.cc | 29 ++- table/table_test.cc | 9 +- util/compression.h | 100 +++++--- 19 files changed, 477 insertions(+), 201 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index 0a70a00a3..9fca0c70c 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -5,6 +5,7 @@ * Make DB ignore dropped column families while committing results of atomic flush. * RocksDB may choose to preopen some files even if options.max_open_files != -1. This may make DB open slightly longer. * For users of dictionary compression with ZSTD v0.7.0+, we now reuse the same digested dictionary when compressing each of an SST file's data blocks for faster compression speeds. +* For all users of dictionary compression who set `cache_index_and_filter_blocks == true`, we now store dictionary data used for decompression in the block cache for better control over memory usage. For users of ZSTD v1.1.4+ who compile with -DZSTD_STATIC_LINKING_ONLY, this includes a digested dictionary, which is used to increase decompression speed. ### Public API Change * CompactionPri = kMinOverlappingRatio also uses compensated file size, which boosts file with lots of tombstones to be compacted first. diff --git a/TARGETS b/TARGETS index f9c2ae306..ee0c1774a 100644 --- a/TARGETS +++ b/TARGETS @@ -20,6 +20,7 @@ ROCKSDB_COMPILER_FLAGS = [ "-DBZIP2", "-DLZ4", "-DZSTD", + "-DZSTD_STATIC_LINKING_ONLY", "-DGFLAGS=gflags", "-DNUMA", "-DTBB", diff --git a/build_tools/fbcode_config.sh b/build_tools/fbcode_config.sh index 26e465c38..f46a580bd 100644 --- a/build_tools/fbcode_config.sh +++ b/build_tools/fbcode_config.sh @@ -51,7 +51,7 @@ if test -z $PIC_BUILD; then else ZSTD_LIBS=" $ZSTD_BASE/lib/libzstd_pic.a" fi -CFLAGS+=" -DZSTD" +CFLAGS+=" -DZSTD -DZSTD_STATIC_LINKING_ONLY" # location of gflags headers and libraries GFLAGS_INCLUDE=" -I $GFLAGS_BASE/include/" diff --git a/db/db_block_cache_test.cc b/db/db_block_cache_test.cc index 612e727b6..187a46b3d 100644 --- a/db/db_block_cache_test.cc +++ b/db/db_block_cache_test.cc @@ -631,6 +631,74 @@ TEST_F(DBBlockCacheTest, CompressedCache) { } } +TEST_F(DBBlockCacheTest, CacheCompressionDict) { + const int kNumFiles = 4; + const int kNumEntriesPerFile = 32; + const int kNumBytesPerEntry = 1024; + + // Try all the available libraries that support dictionary compression + std::vector compression_types; +#ifdef ZLIB + compression_types.push_back(kZlibCompression); +#endif // ZLIB +#if LZ4_VERSION_NUMBER >= 10400 + compression_types.push_back(kLZ4Compression); + compression_types.push_back(kLZ4HCCompression); +#endif // LZ4_VERSION_NUMBER >= 10400 +#if ZSTD_VERSION_NUMBER >= 500 + compression_types.push_back(kZSTD); +#endif // ZSTD_VERSION_NUMBER >= 500 + Random rnd(301); + for (auto compression_type : compression_types) { + Options options = CurrentOptions(); + options.compression = compression_type; + options.compression_opts.max_dict_bytes = 4096; + options.create_if_missing = true; + options.num_levels = 2; + options.statistics = rocksdb::CreateDBStatistics(); + options.target_file_size_base = kNumEntriesPerFile * kNumBytesPerEntry; + BlockBasedTableOptions table_options; + table_options.cache_index_and_filter_blocks = true; + table_options.block_cache.reset(new MockCache()); + options.table_factory.reset(new BlockBasedTableFactory(table_options)); + DestroyAndReopen(options); + + for (int i = 0; i < kNumFiles; ++i) { + ASSERT_EQ(i, NumTableFilesAtLevel(0, 0)); + for (int j = 0; j < kNumEntriesPerFile; ++j) { + std::string value = RandomString(&rnd, kNumBytesPerEntry); + ASSERT_OK(Put(Key(j * kNumFiles + i), value.c_str())); + } + ASSERT_OK(Flush()); + } + dbfull()->TEST_WaitForCompact(); + ASSERT_EQ(0, NumTableFilesAtLevel(0)); + ASSERT_EQ(kNumFiles, NumTableFilesAtLevel(1)); + + // Seek to a key in a file. It should cause the SST's dictionary meta-block + // to be read. + RecordCacheCounters(options); + ASSERT_EQ(0, + TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_MISS)); + ASSERT_EQ(0, TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_ADD)); + ASSERT_EQ( + TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT), + 0); + ReadOptions read_options; + ASSERT_NE("NOT_FOUND", Get(Key(kNumFiles * kNumEntriesPerFile - 1))); + // Two blocks missed/added: dictionary and data block + // One block hit: index since it's prefetched + CheckCacheCounters(options, 2 /* expected_misses */, 1 /* expected_hits */, + 2 /* expected_inserts */, 0 /* expected_failures */); + ASSERT_EQ(1, + TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_MISS)); + ASSERT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_ADD)); + ASSERT_GT( + TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT), + 0); + } +} + #endif // ROCKSDB_LITE } // namespace rocksdb diff --git a/include/rocksdb/perf_context.h b/include/rocksdb/perf_context.h index 31740796c..a47b4d5ed 100644 --- a/include/rocksdb/perf_context.h +++ b/include/rocksdb/perf_context.h @@ -67,6 +67,8 @@ struct PerfContext { uint64_t index_block_read_count; // total number of index block reads uint64_t block_cache_filter_hit_count; // total number of filter block hits uint64_t filter_block_read_count; // total number of filter block reads + uint64_t compression_dict_block_read_count; // total number of compression + // dictionary block reads uint64_t block_checksum_time; // total nanos spent on block checksum uint64_t block_decompress_time; // total nanos spent on block decompression diff --git a/include/rocksdb/statistics.h b/include/rocksdb/statistics.h index d2d0a8d27..a814ea0b3 100644 --- a/include/rocksdb/statistics.h +++ b/include/rocksdb/statistics.h @@ -320,12 +320,19 @@ enum Tickers : uint32_t { // # of times snapshot_mutex_ is acquired in the fast path. TXN_SNAPSHOT_MUTEX_OVERHEAD, - // Number of keys actually found in MultiGet calls (vs number requested by caller) + // Number of keys actually found in MultiGet calls (vs number requested by + // caller) // NUMBER_MULTIGET_KEYS_READ gives the number requested by caller NUMBER_MULTIGET_KEYS_FOUND, NO_ITERATOR_CREATED, // number of iterators created NO_ITERATOR_DELETED, // number of iterators deleted + + BLOCK_CACHE_COMPRESSION_DICT_MISS, + BLOCK_CACHE_COMPRESSION_DICT_HIT, + BLOCK_CACHE_COMPRESSION_DICT_ADD, + BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT, + BLOCK_CACHE_COMPRESSION_DICT_BYTES_EVICT, TICKER_ENUM_MAX }; diff --git a/include/rocksdb/table.h b/include/rocksdb/table.h index e3407dada..8d97a60e3 100644 --- a/include/rocksdb/table.h +++ b/include/rocksdb/table.h @@ -62,6 +62,10 @@ struct BlockBasedTableOptions { // TODO(kailiu) Temporarily disable this feature by making the default value // to be false. // + // TODO(ajkr) we need to update names of variables controlling meta-block + // caching as they should now apply to range tombstone and compression + // dictionary meta-blocks, in addition to index and filter meta-blocks. + // // Indicating if we'd put index/filter blocks to the block cache. // If not specified, each "table reader" object will pre-load index/filter // block during table initialization. diff --git a/monitoring/perf_context.cc b/monitoring/perf_context.cc index 645df70e3..1046106e2 100644 --- a/monitoring/perf_context.cc +++ b/monitoring/perf_context.cc @@ -48,6 +48,7 @@ void PerfContext::Reset() { index_block_read_count = 0; block_cache_filter_hit_count = 0; filter_block_read_count = 0; + compression_dict_block_read_count = 0; block_checksum_time = 0; block_decompress_time = 0; get_read_bytes = 0; @@ -163,6 +164,7 @@ std::string PerfContext::ToString(bool exclude_zero_counters) const { PERF_CONTEXT_OUTPUT(index_block_read_count); PERF_CONTEXT_OUTPUT(block_cache_filter_hit_count); PERF_CONTEXT_OUTPUT(filter_block_read_count); + PERF_CONTEXT_OUTPUT(compression_dict_block_read_count); PERF_CONTEXT_OUTPUT(block_checksum_time); PERF_CONTEXT_OUTPUT(block_decompress_time); PERF_CONTEXT_OUTPUT(get_read_bytes); diff --git a/monitoring/statistics.cc b/monitoring/statistics.cc index 285287d20..df843889b 100644 --- a/monitoring/statistics.cc +++ b/monitoring/statistics.cc @@ -168,6 +168,16 @@ const std::vector> TickersNameMap = { {NUMBER_MULTIGET_KEYS_FOUND, "rocksdb.number.multiget.keys.found"}, {NO_ITERATOR_CREATED, "rocksdb.num.iterator.created"}, {NO_ITERATOR_DELETED, "rocksdb.num.iterator.deleted"}, + {BLOCK_CACHE_COMPRESSION_DICT_MISS, + "rocksdb.block.cache.compression.dict.miss"}, + {BLOCK_CACHE_COMPRESSION_DICT_HIT, + "rocksdb.block.cache.compression.dict.hit"}, + {BLOCK_CACHE_COMPRESSION_DICT_ADD, + "rocksdb.block.cache.compression.dict.add"}, + {BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT, + "rocksdb.block.cache.compression.dict.bytes.insert"}, + {BLOCK_CACHE_COMPRESSION_DICT_BYTES_EVICT, + "rocksdb.block.cache.compression.dict.bytes.evict"}, }; const std::vector> HistogramsNameMap = { diff --git a/table/block_based_table_builder.cc b/table/block_based_table_builder.cc index f2b8d9310..60d8a5dbd 100644 --- a/table/block_based_table_builder.cc +++ b/table/block_based_table_builder.cc @@ -322,8 +322,9 @@ struct BlockBasedTableBuilder::Rep { _compression_type, _compression_opts.level), compression_ctx(_compression_type), verify_dict( - _compression_dict == nullptr ? Slice() : Slice(*_compression_dict), - _compression_type), + _compression_dict == nullptr ? std::string() : *_compression_dict, + _compression_type == kZSTD || + _compression_type == kZSTDNotFinalCompression), use_delta_encoding_for_index_values(table_opt.format_version >= 4 && !table_opt.block_align), compressed_cache_key_prefix_size(0), diff --git a/table/block_based_table_reader.cc b/table/block_based_table_reader.cc index c8dc01bfe..b7757ee06 100644 --- a/table/block_based_table_reader.cc +++ b/table/block_based_table_reader.cc @@ -72,20 +72,21 @@ namespace { // The only relevant option is options.verify_checksums for now. // On failure return non-OK. // On success fill *result and return OK - caller owns *result -// @param compression_dict Data for presetting the compression library's +// @param uncompression_dict Data for presetting the compression library's // dictionary. Status ReadBlockFromFile( RandomAccessFileReader* file, FilePrefetchBuffer* prefetch_buffer, const Footer& footer, const ReadOptions& options, const BlockHandle& handle, std::unique_ptr* result, const ImmutableCFOptions& ioptions, - bool do_uncompress, bool maybe_compressed, const Slice& compression_dict, + bool do_uncompress, bool maybe_compressed, + const UncompressionDict& uncompression_dict, const PersistentCacheOptions& cache_options, SequenceNumber global_seqno, size_t read_amp_bytes_per_bit, MemoryAllocator* memory_allocator) { BlockContents contents; BlockFetcher block_fetcher(file, prefetch_buffer, footer, options, handle, &contents, ioptions, do_uncompress, - maybe_compressed, compression_dict, cache_options, - memory_allocator); + maybe_compressed, uncompression_dict, + cache_options, memory_allocator); Status s = block_fetcher.ReadBlockContents(); if (s.ok()) { result->reset(new Block(std::move(contents), global_seqno, @@ -124,6 +125,7 @@ void DeleteCachedEntry(const Slice& /*key*/, void* value) { void DeleteCachedFilterEntry(const Slice& key, void* value); void DeleteCachedIndexEntry(const Slice& key, void* value); +void DeleteCachedUncompressionDictEntry(const Slice& key, void* value); // Release the cached entry and decrement its ref count. void ReleaseCachedEntry(void* arg, void* h) { @@ -241,9 +243,9 @@ class PartitionIndexReader : public IndexReader, public Cleanable { auto s = ReadBlockFromFile( file, prefetch_buffer, footer, ReadOptions(), index_handle, &index_block, ioptions, true /* decompress */, - true /*maybe_compressed*/, Slice() /*compression dict*/, cache_options, - kDisableGlobalSequenceNumber, 0 /* read_amp_bytes_per_bit */, - memory_allocator); + true /*maybe_compressed*/, UncompressionDict::GetEmptyDict(), + cache_options, kDisableGlobalSequenceNumber, + 0 /* read_amp_bytes_per_bit */, memory_allocator); if (s.ok()) { *index_reader = new PartitionIndexReader( @@ -333,16 +335,13 @@ class PartitionIndexReader : public IndexReader, public Cleanable { for (; biter.Valid(); biter.Next()) { handle = biter.value(); BlockBasedTable::CachableEntry block; - Slice compression_dict; - if (rep->compression_dict_block) { - compression_dict = rep->compression_dict_block->data; - } const bool is_index = true; // TODO: Support counter batch update for partitioned index and // filter blocks s = table_->MaybeReadBlockAndLoadToCache( - prefetch_buffer.get(), rep, ro, handle, compression_dict, &block, - is_index, nullptr /* get_context */); + prefetch_buffer.get(), rep, ro, handle, + UncompressionDict::GetEmptyDict(), &block, is_index, + nullptr /* get_context */); assert(s.ok() || block.value == nullptr); if (s.ok() && block.value != nullptr) { @@ -422,9 +421,9 @@ class BinarySearchIndexReader : public IndexReader { auto s = ReadBlockFromFile( file, prefetch_buffer, footer, ReadOptions(), index_handle, &index_block, ioptions, true /* decompress */, - true /*maybe_compressed*/, Slice() /*compression dict*/, cache_options, - kDisableGlobalSequenceNumber, 0 /* read_amp_bytes_per_bit */, - memory_allocator); + true /*maybe_compressed*/, UncompressionDict::GetEmptyDict(), + cache_options, kDisableGlobalSequenceNumber, + 0 /* read_amp_bytes_per_bit */, memory_allocator); if (s.ok()) { *index_reader = new BinarySearchIndexReader( @@ -496,9 +495,9 @@ class HashIndexReader : public IndexReader { auto s = ReadBlockFromFile( file, prefetch_buffer, footer, ReadOptions(), index_handle, &index_block, ioptions, true /* decompress */, - true /*maybe_compressed*/, Slice() /*compression dict*/, cache_options, - kDisableGlobalSequenceNumber, 0 /* read_amp_bytes_per_bit */, - memory_allocator); + true /*maybe_compressed*/, UncompressionDict::GetEmptyDict(), + cache_options, kDisableGlobalSequenceNumber, + 0 /* read_amp_bytes_per_bit */, memory_allocator); if (!s.ok()) { return s; @@ -531,13 +530,12 @@ class HashIndexReader : public IndexReader { return Status::OK(); } - Slice dummy_comp_dict; // Read contents for the blocks BlockContents prefixes_contents; BlockFetcher prefixes_block_fetcher( file, prefetch_buffer, footer, ReadOptions(), prefixes_handle, &prefixes_contents, ioptions, true /*decompress*/, - true /*maybe_compressed*/, dummy_comp_dict /*compression dict*/, + true /*maybe_compressed*/, UncompressionDict::GetEmptyDict(), cache_options, memory_allocator); s = prefixes_block_fetcher.ReadBlockContents(); if (!s.ok()) { @@ -547,7 +545,7 @@ class HashIndexReader : public IndexReader { BlockFetcher prefixes_meta_block_fetcher( file, prefetch_buffer, footer, ReadOptions(), prefixes_meta_handle, &prefixes_meta_contents, ioptions, true /*decompress*/, - true /*maybe_compressed*/, dummy_comp_dict /*compression dict*/, + true /*maybe_compressed*/, UncompressionDict::GetEmptyDict(), cache_options, memory_allocator); s = prefixes_meta_block_fetcher.ReadBlockContents(); if (!s.ok()) { @@ -858,9 +856,6 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions, if (!s.ok()) { return s; } - // Disregard return status of ReadCompressionDictBlock. - s = ReadCompressionDictBlock(rep, prefetch_buffer.get(), meta_iter.get()); - s = PrefetchIndexAndFilterBlocks(rep, prefetch_buffer.get(), meta_iter.get(), new_table.get(), prefix_extractor, prefetch_all, table_options, level, @@ -955,6 +950,11 @@ Status BlockBasedTable::ReadPropertiesBlock( rep->table_properties.reset(table_properties); rep->blocks_maybe_compressed = rep->table_properties->compression_name != CompressionTypeToString(kNoCompression); + rep->blocks_definitely_zstd_compressed = + (rep->table_properties->compression_name == + CompressionTypeToString(kZSTD) || + rep->table_properties->compression_name == + CompressionTypeToString(kZSTDNotFinalCompression)); } } else { ROCKS_LOG_ERROR(rep->ioptions.info_log, @@ -1023,29 +1023,19 @@ Status BlockBasedTable::ReadRangeDelBlock( Status BlockBasedTable::ReadCompressionDictBlock( Rep* rep, FilePrefetchBuffer* prefetch_buffer, - InternalIterator* meta_iter) { + std::unique_ptr* compression_dict_block) { + assert(compression_dict_block != nullptr); Status s; - bool found_compression_dict; - BlockHandle compression_dict_handle; - s = SeekToCompressionDictBlock(meta_iter, &found_compression_dict, - &compression_dict_handle); - if (!s.ok()) { - ROCKS_LOG_WARN( - rep->ioptions.info_log, - "Error when seeking to compression dictionary block from file: %s", - s.ToString().c_str()); - } else if (found_compression_dict && !compression_dict_handle.IsNull()) { - // TODO(andrewkr): Add to block cache if cache_index_and_filter_blocks is - // true. + if (!rep->compression_dict_handle.IsNull()) { std::unique_ptr compression_dict_cont{new BlockContents()}; PersistentCacheOptions cache_options; ReadOptions read_options; - read_options.verify_checksums = false; + read_options.verify_checksums = true; BlockFetcher compression_block_fetcher( rep->file.get(), prefetch_buffer, rep->footer, read_options, - compression_dict_handle, compression_dict_cont.get(), rep->ioptions, - false /* decompress */, false /*maybe_compressed*/, - Slice() /*compression dict*/, cache_options); + rep->compression_dict_handle, compression_dict_cont.get(), + rep->ioptions, false /* decompress */, false /*maybe_compressed*/, + UncompressionDict::GetEmptyDict(), cache_options); s = compression_block_fetcher.ReadBlockContents(); if (!s.ok()) { @@ -1055,7 +1045,7 @@ Status BlockBasedTable::ReadCompressionDictBlock( "block %s", s.ToString().c_str()); } else { - rep->compression_dict_block = std::move(compression_dict_cont); + *compression_dict_block = std::move(compression_dict_cont); } } return s; @@ -1097,6 +1087,13 @@ Status BlockBasedTable::PrefetchIndexAndFilterBlocks( } } + { + // Find compression dictionary handle + bool found_compression_dict; + s = SeekToCompressionDictBlock(meta_iter, &found_compression_dict, + &rep->compression_dict_handle); + } + bool need_upper_bound_check = PrefixExtractorChanged(rep->table_properties.get(), prefix_extractor); @@ -1124,8 +1121,9 @@ Status BlockBasedTable::PrefetchIndexAndFilterBlocks( pin_all || (table_options.pin_top_level_index_and_filter && rep->filter_type == Rep::FilterType::kPartitionedFilter); // pre-fetching of blocks is turned on - // Will use block cache for index/filter blocks access + // Will use block cache for meta-blocks access // Always prefetch index and filter for level 0 + // TODO(ajkr): also prefetch compression dictionary block if (table_options.cache_index_and_filter_blocks) { assert(table_options.block_cache != nullptr); if (prefetch_index) { @@ -1136,10 +1134,12 @@ Status BlockBasedTable::PrefetchIndexAndFilterBlocks( bool disable_prefix_seek = rep->index_type == BlockBasedTableOptions::kHashSearch && need_upper_bound_check; - std::unique_ptr> iter( - new_table->NewIndexIterator(ReadOptions(), disable_prefix_seek, - nullptr, &index_entry)); - s = iter->status(); + if (s.ok()) { + std::unique_ptr> iter( + new_table->NewIndexIterator(ReadOptions(), disable_prefix_seek, + nullptr, &index_entry)); + s = iter->status(); + } if (s.ok()) { // This is the first call to NewIndexIterator() since we're in Open(). // On success it should give us ownership of the `CachableEntry` by @@ -1173,12 +1173,15 @@ Status BlockBasedTable::PrefetchIndexAndFilterBlocks( } } } else { - // If we don't use block cache for index/filter blocks access, we'll - // pre-load these blocks, which will kept in member variables in Rep - // and with a same life-time as this table object. + // If we don't use block cache for meta-block access, we'll pre-load these + // blocks, which will kept in member variables in Rep and with a same life- + // time as this table object. IndexReader* index_reader = nullptr; - s = new_table->CreateIndexReader(prefetch_buffer, &index_reader, meta_iter, - level); + if (s.ok()) { + s = new_table->CreateIndexReader(prefetch_buffer, &index_reader, + meta_iter, level); + } + std::unique_ptr compression_dict_block; if (s.ok()) { rep->index_reader.reset(index_reader); // The partitions of partitioned index are always stored in cache. They @@ -1201,11 +1204,19 @@ Status BlockBasedTable::PrefetchIndexAndFilterBlocks( filter->CacheDependencies(pin_all, rep->table_prefix_extractor.get()); } } + s = ReadCompressionDictBlock(rep, prefetch_buffer, + &compression_dict_block); } else { delete index_reader; } + if (s.ok() && !rep->compression_dict_handle.IsNull()) { + assert(compression_dict_block != nullptr); + // TODO(ajkr): find a way to avoid the `compression_dict_block` data copy + rep->uncompression_dict.reset(new UncompressionDict( + compression_dict_block->data.ToString(), + rep->blocks_definitely_zstd_compressed, rep->ioptions.statistics)); + } } - return s; } @@ -1240,6 +1251,9 @@ size_t BlockBasedTable::ApproximateMemoryUsage() const { if (rep_->index_reader) { usage += rep_->index_reader->ApproximateMemoryUsage(); } + if (rep_->uncompression_dict) { + usage += rep_->uncompression_dict->ApproximateMemoryUsage(); + } return usage; } @@ -1256,7 +1270,7 @@ Status BlockBasedTable::ReadMetaBlock(Rep* rep, rep->file.get(), prefetch_buffer, rep->footer, ReadOptions(), rep->footer.metaindex_handle(), &meta, rep->ioptions, true /* decompress */, true /*maybe_compressed*/, - Slice() /*compression dict*/, rep->persistent_cache_options, + UncompressionDict::GetEmptyDict(), rep->persistent_cache_options, kDisableGlobalSequenceNumber, 0 /* read_amp_bytes_per_bit */, GetMemoryAllocator(rep->table_options)); @@ -1279,8 +1293,9 @@ Status BlockBasedTable::GetDataBlockFromCache( const Slice& block_cache_key, const Slice& compressed_block_cache_key, Cache* block_cache, Cache* block_cache_compressed, Rep* rep, const ReadOptions& read_options, - BlockBasedTable::CachableEntry* block, const Slice& compression_dict, - size_t read_amp_bytes_per_bit, bool is_index, GetContext* get_context) { + BlockBasedTable::CachableEntry* block, + const UncompressionDict& uncompression_dict, size_t read_amp_bytes_per_bit, + bool is_index, GetContext* get_context) { Status s; BlockContents* compressed_block = nullptr; Cache::Handle* block_cache_compressed_handle = nullptr; @@ -1335,8 +1350,7 @@ Status BlockBasedTable::GetDataBlockFromCache( // Retrieve the uncompressed contents into a new buffer BlockContents contents; UncompressionContext context(compression_type); - UncompressionDict dict(compression_dict, compression_type); - UncompressionInfo info(context, dict, compression_type); + UncompressionInfo info(context, uncompression_dict, compression_type); s = UncompressBlockContents(info, compressed_block->data.data(), compressed_block->data.size(), &contents, rep->table_options.format_version, rep->ioptions, @@ -1403,7 +1417,7 @@ Status BlockBasedTable::PutDataBlockToCache( const ReadOptions& /*read_options*/, const ImmutableCFOptions& ioptions, CachableEntry* cached_block, BlockContents* raw_block_contents, CompressionType raw_block_comp_type, uint32_t format_version, - const Slice& compression_dict, SequenceNumber seq_no, + const UncompressionDict& uncompression_dict, SequenceNumber seq_no, size_t read_amp_bytes_per_bit, MemoryAllocator* memory_allocator, bool is_index, Cache::Priority priority, GetContext* get_context) { assert(raw_block_comp_type == kNoCompression || @@ -1415,8 +1429,7 @@ Status BlockBasedTable::PutDataBlockToCache( Statistics* statistics = ioptions.statistics; if (raw_block_comp_type != kNoCompression) { UncompressionContext context(raw_block_comp_type); - UncompressionDict dict(compression_dict, raw_block_comp_type); - UncompressionInfo info(context, dict, raw_block_comp_type); + UncompressionInfo info(context, uncompression_dict, raw_block_comp_type); s = UncompressBlockContents(info, raw_block_contents->data.data(), raw_block_contents->data.size(), &uncompressed_block_contents, format_version, @@ -1522,12 +1535,10 @@ FilterBlockReader* BlockBasedTable::ReadFilter( } BlockContents block; - Slice dummy_comp_dict; - BlockFetcher block_fetcher( rep->file.get(), prefetch_buffer, rep->footer, ReadOptions(), filter_handle, &block, rep->ioptions, false /* decompress */, - false /*maybe_compressed*/, dummy_comp_dict, + false /*maybe_compressed*/, UncompressionDict::GetEmptyDict(), rep->persistent_cache_options, GetMemoryAllocator(rep->table_options)); Status s = block_fetcher.ReadBlockContents(); @@ -1672,6 +1683,84 @@ BlockBasedTable::CachableEntry BlockBasedTable::GetFilter( return { filter, cache_handle }; } +BlockBasedTable::CachableEntry +BlockBasedTable::GetUncompressionDict(Rep* rep, + FilePrefetchBuffer* prefetch_buffer, + bool no_io, GetContext* get_context) { + if (!rep->table_options.cache_index_and_filter_blocks) { + // block cache is either disabled or not used for meta-blocks. In either + // case, BlockBasedTableReader is the owner of the uncompression dictionary. + return {rep->uncompression_dict.get(), nullptr /* cache handle */}; + } + if (rep->compression_dict_handle.IsNull()) { + return {nullptr, nullptr}; + } + char cache_key_buf[kMaxCacheKeyPrefixSize + kMaxVarint64Length]; + auto cache_key = + GetCacheKey(rep->cache_key_prefix, rep->cache_key_prefix_size, + rep->compression_dict_handle, cache_key_buf); + auto cache_handle = GetEntryFromCache( + rep->table_options.block_cache.get(), cache_key, rep->level, + BLOCK_CACHE_COMPRESSION_DICT_MISS, BLOCK_CACHE_COMPRESSION_DICT_HIT, + get_context + ? &get_context->get_context_stats_.num_cache_compression_dict_miss + : nullptr, + get_context + ? &get_context->get_context_stats_.num_cache_compression_dict_hit + : nullptr, + rep->ioptions.statistics, get_context); + UncompressionDict* dict = nullptr; + if (cache_handle != nullptr) { + dict = reinterpret_cast( + rep->table_options.block_cache->Value(cache_handle)); + } else if (no_io) { + // Do not invoke any io. + } else { + std::unique_ptr compression_dict_block; + Status s = + ReadCompressionDictBlock(rep, prefetch_buffer, &compression_dict_block); + size_t usage = 0; + if (s.ok()) { + assert(compression_dict_block != nullptr); + // TODO(ajkr): find a way to avoid the `compression_dict_block` data copy + dict = new UncompressionDict(compression_dict_block->data.ToString(), + rep->blocks_definitely_zstd_compressed, + rep->ioptions.statistics); + usage = dict->ApproximateMemoryUsage(); + s = rep->table_options.block_cache->Insert( + cache_key, dict, usage, &DeleteCachedUncompressionDictEntry, + &cache_handle, + rep->table_options.cache_index_and_filter_blocks_with_high_priority + ? Cache::Priority::HIGH + : Cache::Priority::LOW); + } + if (s.ok()) { + PERF_COUNTER_ADD(compression_dict_block_read_count, 1); + if (get_context != nullptr) { + get_context->get_context_stats_.num_cache_add++; + get_context->get_context_stats_.num_cache_bytes_write += usage; + get_context->get_context_stats_.num_cache_compression_dict_add++; + get_context->get_context_stats_ + .num_cache_compression_dict_bytes_insert += usage; + } else { + RecordTick(rep->ioptions.statistics, BLOCK_CACHE_ADD); + RecordTick(rep->ioptions.statistics, BLOCK_CACHE_BYTES_WRITE, usage); + RecordTick(rep->ioptions.statistics, BLOCK_CACHE_COMPRESSION_DICT_ADD); + RecordTick(rep->ioptions.statistics, + BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT, usage); + } + } else { + // There should be no way to get here if block cache insertion succeeded. + // Though it is still possible something failed earlier. + RecordTick(rep->ioptions.statistics, BLOCK_CACHE_ADD_FAILURES); + delete dict; + dict = nullptr; + assert(cache_handle == nullptr); + } + } + return {dict, cache_handle}; +} + // disable_prefix_seek should be set to true when prefix_extractor found in SST // differs from the one in mutable_cf_options and index type is HashBasedIndex InternalIteratorBase* BlockBasedTable::NewIndexIterator( @@ -1803,49 +1892,56 @@ TBlockIter* BlockBasedTable::NewDataBlockIterator( FilePrefetchBuffer* prefetch_buffer) { PERF_TIMER_GUARD(new_table_block_iter_nanos); - const bool no_io = (ro.read_tier == kBlockCacheTier); Cache* block_cache = rep->table_options.block_cache.get(); CachableEntry block; - Slice compression_dict; - if (s.ok()) { - if (rep->compression_dict_block) { - compression_dict = rep->compression_dict_block->data; - } - s = MaybeReadBlockAndLoadToCache(prefetch_buffer, rep, ro, handle, - compression_dict, &block, is_index, - get_context); - } - TBlockIter* iter; - if (input_iter != nullptr) { - iter = input_iter; - } else { - iter = new TBlockIter; - } - // Didn't get any data from block caches. - if (s.ok() && block.value == nullptr) { - if (no_io) { - // Could not read from block_cache and can't do IO - iter->Invalidate(Status::Incomplete("no blocking io")); - return iter; - } - std::unique_ptr block_value; - { - StopWatch sw(rep->ioptions.env, rep->ioptions.statistics, - READ_BLOCK_GET_MICROS); - s = ReadBlockFromFile( - rep->file.get(), prefetch_buffer, rep->footer, ro, handle, - &block_value, rep->ioptions, - rep->blocks_maybe_compressed /*do_decompress*/, - rep->blocks_maybe_compressed, compression_dict, - rep->persistent_cache_options, - is_index ? kDisableGlobalSequenceNumber : rep->global_seqno, - rep->table_options.read_amp_bytes_per_bit, - GetMemoryAllocator(rep->table_options)); - } + { + const bool no_io = (ro.read_tier == kBlockCacheTier); + auto uncompression_dict_storage = + GetUncompressionDict(rep, prefetch_buffer, no_io, get_context); + const UncompressionDict& uncompression_dict = + uncompression_dict_storage.value == nullptr + ? UncompressionDict::GetEmptyDict() + : *uncompression_dict_storage.value; if (s.ok()) { - block.value = block_value.release(); + s = MaybeReadBlockAndLoadToCache(prefetch_buffer, rep, ro, handle, + uncompression_dict, &block, is_index, + get_context); + } + + if (input_iter != nullptr) { + iter = input_iter; + } else { + iter = new TBlockIter; + } + // Didn't get any data from block caches. + if (s.ok() && block.value == nullptr) { + if (no_io) { + // Could not read from block_cache and can't do IO + iter->Invalidate(Status::Incomplete("no blocking io")); + return iter; + } + std::unique_ptr block_value; + { + StopWatch sw(rep->ioptions.env, rep->ioptions.statistics, + READ_BLOCK_GET_MICROS); + s = ReadBlockFromFile( + rep->file.get(), prefetch_buffer, rep->footer, ro, handle, + &block_value, rep->ioptions, + rep->blocks_maybe_compressed /*do_decompress*/, + rep->blocks_maybe_compressed, uncompression_dict, + rep->persistent_cache_options, + is_index ? kDisableGlobalSequenceNumber : rep->global_seqno, + rep->table_options.read_amp_bytes_per_bit, + GetMemoryAllocator(rep->table_options)); + } + if (s.ok()) { + block.value = block_value.release(); + } } + // TODO(ajkr): also pin compression dictionary block when + // `pin_l0_filter_and_index_blocks_in_cache == true`. + uncompression_dict_storage.Release(block_cache); } if (s.ok()) { @@ -1911,7 +2007,7 @@ TBlockIter* BlockBasedTable::NewDataBlockIterator( Status BlockBasedTable::MaybeReadBlockAndLoadToCache( FilePrefetchBuffer* prefetch_buffer, Rep* rep, const ReadOptions& ro, - const BlockHandle& handle, Slice compression_dict, + const BlockHandle& handle, const UncompressionDict& uncompression_dict, CachableEntry* block_entry, bool is_index, GetContext* get_context) { assert(block_entry != nullptr); const bool no_io = (ro.read_tier == kBlockCacheTier); @@ -1944,7 +2040,7 @@ Status BlockBasedTable::MaybeReadBlockAndLoadToCache( } s = GetDataBlockFromCache(key, ckey, block_cache, block_cache_compressed, - rep, ro, block_entry, compression_dict, + rep, ro, block_entry, uncompression_dict, rep->table_options.read_amp_bytes_per_bit, is_index, get_context); @@ -1962,7 +2058,7 @@ Status BlockBasedTable::MaybeReadBlockAndLoadToCache( rep->file.get(), prefetch_buffer, rep->footer, ro, handle, &raw_block_contents, rep->ioptions, do_decompress /* do uncompress */, rep->blocks_maybe_compressed, - compression_dict, rep->persistent_cache_options, + uncompression_dict, rep->persistent_cache_options, GetMemoryAllocator(rep->table_options), GetMemoryAllocatorForCompressedBlock(rep->table_options)); s = block_fetcher.ReadBlockContents(); @@ -1976,7 +2072,7 @@ Status BlockBasedTable::MaybeReadBlockAndLoadToCache( s = PutDataBlockToCache( key, ckey, block_cache, block_cache_compressed, ro, rep->ioptions, block_entry, &raw_block_contents, raw_block_comp_type, - rep->table_options.format_version, compression_dict, seq_no, + rep->table_options.format_version, uncompression_dict, seq_no, rep->table_options.read_amp_bytes_per_bit, GetMemoryAllocator(rep->table_options), is_index, is_index && rep->table_options @@ -2681,12 +2777,11 @@ Status BlockBasedTable::VerifyChecksumInBlocks( } BlockHandle handle = index_iter->value(); BlockContents contents; - Slice dummy_comp_dict; BlockFetcher block_fetcher( rep_->file.get(), nullptr /* prefetch buffer */, rep_->footer, ReadOptions(), handle, &contents, rep_->ioptions, false /* decompress */, false /*maybe_compressed*/, - dummy_comp_dict /*compression dict*/, rep_->persistent_cache_options); + UncompressionDict::GetEmptyDict(), rep_->persistent_cache_options); s = block_fetcher.ReadBlockContents(); if (!s.ok()) { break; @@ -2707,12 +2802,11 @@ Status BlockBasedTable::VerifyChecksumInBlocks( Slice input = index_iter->value(); s = handle.DecodeFrom(&input); BlockContents contents; - Slice dummy_comp_dict; BlockFetcher block_fetcher( rep_->file.get(), nullptr /* prefetch buffer */, rep_->footer, ReadOptions(), handle, &contents, rep_->ioptions, false /* decompress */, false /*maybe_compressed*/, - dummy_comp_dict /*compression dict*/, rep_->persistent_cache_options); + UncompressionDict::GetEmptyDict(), rep_->persistent_cache_options); s = block_fetcher.ReadBlockContents(); if (!s.ok()) { break; @@ -2740,11 +2834,24 @@ bool BlockBasedTable::TEST_KeyInCache(const ReadOptions& options, Slice ckey; Status s; - s = GetDataBlockFromCache( - cache_key, ckey, block_cache, nullptr, rep_, options, &block, - rep_->compression_dict_block ? rep_->compression_dict_block->data - : Slice(), - 0 /* read_amp_bytes_per_bit */); + if (!rep_->compression_dict_handle.IsNull()) { + std::unique_ptr compression_dict_block; + s = ReadCompressionDictBlock(rep_, nullptr /* prefetch_buffer */, + &compression_dict_block); + if (s.ok()) { + assert(compression_dict_block != nullptr); + UncompressionDict uncompression_dict( + compression_dict_block->data.ToString(), + rep_->blocks_definitely_zstd_compressed); + s = GetDataBlockFromCache(cache_key, ckey, block_cache, nullptr, rep_, + options, &block, uncompression_dict, + 0 /* read_amp_bytes_per_bit */); + } + } else { + s = GetDataBlockFromCache( + cache_key, ckey, block_cache, nullptr, rep_, options, &block, + UncompressionDict::GetEmptyDict(), 0 /* read_amp_bytes_per_bit */); + } assert(s.ok()); bool in_cache = block.value != nullptr; if (in_cache) { @@ -3014,12 +3121,11 @@ Status BlockBasedTable::DumpTable(WritableFile* out_file, BlockHandle handle; if (FindMetaBlock(meta_iter.get(), filter_block_key, &handle).ok()) { BlockContents block; - Slice dummy_comp_dict; BlockFetcher block_fetcher( rep_->file.get(), nullptr /* prefetch_buffer */, rep_->footer, ReadOptions(), handle, &block, rep_->ioptions, false /*decompress*/, false /*maybe_compressed*/, - dummy_comp_dict /*compression dict*/, + UncompressionDict::GetEmptyDict(), rep_->persistent_cache_options); s = block_fetcher.ReadBlockContents(); if (!s.ok()) { @@ -3048,8 +3154,15 @@ Status BlockBasedTable::DumpTable(WritableFile* out_file, } // Output compression dictionary - if (rep_->compression_dict_block != nullptr) { - auto compression_dict = rep_->compression_dict_block->data; + if (!rep_->compression_dict_handle.IsNull()) { + std::unique_ptr compression_dict_block; + s = ReadCompressionDictBlock(rep_, nullptr /* prefetch_buffer */, + &compression_dict_block); + if (!s.ok()) { + return s; + } + assert(compression_dict_block != nullptr); + auto compression_dict = compression_dict_block->data; out_file->Append( "Compression Dictionary:\n" "--------------------------------------\n"); @@ -3292,6 +3405,13 @@ void DeleteCachedIndexEntry(const Slice& /*key*/, void* value) { delete index_reader; } +void DeleteCachedUncompressionDictEntry(const Slice& /*key*/, void* value) { + UncompressionDict* dict = reinterpret_cast(value); + RecordTick(dict->statistics(), BLOCK_CACHE_COMPRESSION_DICT_BYTES_EVICT, + dict->ApproximateMemoryUsage()); + delete dict; +} + } // anonymous namespace } // namespace rocksdb diff --git a/table/block_based_table_reader.h b/table/block_based_table_reader.h index 9f1909bd9..510ca3163 100644 --- a/table/block_based_table_reader.h +++ b/table/block_based_table_reader.h @@ -259,7 +259,7 @@ class BlockBasedTable : public TableReader { // block. static Status MaybeReadBlockAndLoadToCache( FilePrefetchBuffer* prefetch_buffer, Rep* rep, const ReadOptions& ro, - const BlockHandle& handle, Slice compression_dict, + const BlockHandle& handle, const UncompressionDict& uncompression_dict, CachableEntry* block_entry, bool is_index = false, GetContext* get_context = nullptr); @@ -275,6 +275,10 @@ class BlockBasedTable : public TableReader { const bool is_a_filter_partition, bool no_io, GetContext* get_context, const SliceTransform* prefix_extractor = nullptr) const; + static CachableEntry GetUncompressionDict( + Rep* rep, FilePrefetchBuffer* prefetch_buffer, bool no_io, + GetContext* get_context); + // Get the iterator from the index reader. // If input_iter is not set, return new Iterator // If input_iter is set, update it and return it as Iterator @@ -295,15 +299,16 @@ class BlockBasedTable : public TableReader { // block_cache_compressed. // On success, Status::OK with be returned and @block will be populated with // pointer to the block as well as its block handle. - // @param compression_dict Data for presetting the compression library's + // @param uncompression_dict Data for presetting the compression library's // dictionary. static Status GetDataBlockFromCache( const Slice& block_cache_key, const Slice& compressed_block_cache_key, Cache* block_cache, Cache* block_cache_compressed, Rep* rep, const ReadOptions& read_options, BlockBasedTable::CachableEntry* block, - const Slice& compression_dict, size_t read_amp_bytes_per_bit, - bool is_index = false, GetContext* get_context = nullptr); + const UncompressionDict& uncompression_dict, + size_t read_amp_bytes_per_bit, bool is_index = false, + GetContext* get_context = nullptr); // Put a raw block (maybe compressed) to the corresponding block caches. // This method will perform decompression against raw_block if needed and then @@ -313,7 +318,7 @@ class BlockBasedTable : public TableReader { // // Allocated memory managed by raw_block_contents will be transferred to // PutDataBlockToCache(). After the call, the object will be invalid. - // @param compression_dict Data for presetting the compression library's + // @param uncompression_dict Data for presetting the compression library's // dictionary. static Status PutDataBlockToCache( const Slice& block_cache_key, const Slice& compressed_block_cache_key, @@ -321,7 +326,7 @@ class BlockBasedTable : public TableReader { const ReadOptions& read_options, const ImmutableCFOptions& ioptions, CachableEntry* block, BlockContents* raw_block_contents, CompressionType raw_block_comp_type, uint32_t format_version, - const Slice& compression_dict, SequenceNumber seq_no, + const UncompressionDict& uncompression_dict, SequenceNumber seq_no, size_t read_amp_bytes_per_bit, MemoryAllocator* memory_allocator, bool is_index = false, Cache::Priority pri = Cache::Priority::LOW, GetContext* get_context = nullptr); @@ -367,9 +372,9 @@ class BlockBasedTable : public TableReader { Rep* rep, FilePrefetchBuffer* prefetch_buffer, InternalIterator* meta_iter, const InternalKeyComparator& internal_comparator); - static Status ReadCompressionDictBlock(Rep* rep, - FilePrefetchBuffer* prefetch_buffer, - InternalIterator* meta_iter); + static Status ReadCompressionDictBlock( + Rep* rep, FilePrefetchBuffer* prefetch_buffer, + std::unique_ptr* compression_dict_block); static Status PrefetchIndexAndFilterBlocks( Rep* rep, FilePrefetchBuffer* prefetch_buffer, InternalIterator* meta_iter, BlockBasedTable* new_table, @@ -488,11 +493,15 @@ struct BlockBasedTable::Rep { // Footer contains the fixed table information Footer footer; - // index_reader and filter will be populated and used only when - // options.block_cache is nullptr; otherwise we will get the index block via - // the block cache. + // `index_reader`, `filter`, and `uncompression_dict` will be populated (i.e., + // non-nullptr) and used only when options.block_cache is nullptr or when + // `cache_index_and_filter_blocks == false`. Otherwise, we will get the index, + // filter, and compression dictionary blocks via the block cache. In that case + // `dummy_index_reader_offset`, `filter_handle`, and `compression_dict_handle` + // are used to lookup these meta-blocks in block cache. std::unique_ptr index_reader; std::unique_ptr filter; + std::unique_ptr uncompression_dict; enum class FilterType { kNoFilter, @@ -502,13 +511,9 @@ struct BlockBasedTable::Rep { }; FilterType filter_type; BlockHandle filter_handle; + BlockHandle compression_dict_handle; std::shared_ptr table_properties; - // Block containing the data for the compression dictionary. We take ownership - // for the entire block struct, even though we only use its Slice member. This - // is easier because the Slice member depends on the continued existence of - // another member ("allocation"). - std::unique_ptr compression_dict_block; BlockBasedTableOptions::IndexType index_type; bool hash_index_allow_collision; bool whole_key_filtering; @@ -545,6 +550,12 @@ struct BlockBasedTable::Rep { // before reading individual blocks enables certain optimizations. bool blocks_maybe_compressed = true; + // If true, data blocks in this file are definitely ZSTD compressed. If false + // they might not be. When false we skip creating a ZSTD digested + // uncompression dictionary. Even if we get a false negative, things should + // still work, just not as quickly. + bool blocks_definitely_zstd_compressed = false; + bool closed = false; const bool immortal_table; diff --git a/table/block_fetcher.cc b/table/block_fetcher.cc index 5a0da1157..975736737 100644 --- a/table/block_fetcher.cc +++ b/table/block_fetcher.cc @@ -257,8 +257,7 @@ Status BlockFetcher::ReadBlockContents() { if (do_uncompress_ && compression_type_ != kNoCompression) { // compressed page, uncompress, update cache UncompressionContext context(compression_type_); - UncompressionDict dict(compression_dict_, compression_type_); - UncompressionInfo info(context, dict, compression_type_); + UncompressionInfo info(context, uncompression_dict_, compression_type_); status_ = UncompressBlockContents(info, slice_.data(), block_size_, contents_, footer_.version(), ioptions_, memory_allocator_); diff --git a/table/block_fetcher.h b/table/block_fetcher.h index aed73a392..b5fee9415 100644 --- a/table/block_fetcher.h +++ b/table/block_fetcher.h @@ -19,14 +19,14 @@ class BlockFetcher { // The only relevant option is options.verify_checksums for now. // On failure return non-OK. // On success fill *result and return OK - caller owns *result - // @param compression_dict Data for presetting the compression library's + // @param uncompression_dict Data for presetting the compression library's // dictionary. BlockFetcher(RandomAccessFileReader* file, FilePrefetchBuffer* prefetch_buffer, const Footer& footer, const ReadOptions& read_options, const BlockHandle& handle, BlockContents* contents, const ImmutableCFOptions& ioptions, bool do_uncompress, bool maybe_compressed, - const Slice& compression_dict, + const UncompressionDict& uncompression_dict, const PersistentCacheOptions& cache_options, MemoryAllocator* memory_allocator = nullptr, MemoryAllocator* memory_allocator_compressed = nullptr) @@ -39,7 +39,7 @@ class BlockFetcher { ioptions_(ioptions), do_uncompress_(do_uncompress), maybe_compressed_(maybe_compressed), - compression_dict_(compression_dict), + uncompression_dict_(uncompression_dict), cache_options_(cache_options), memory_allocator_(memory_allocator), memory_allocator_compressed_(memory_allocator_compressed) {} @@ -58,7 +58,7 @@ class BlockFetcher { const ImmutableCFOptions& ioptions_; bool do_uncompress_; bool maybe_compressed_; - const Slice& compression_dict_; + const UncompressionDict& uncompression_dict_; const PersistentCacheOptions& cache_options_; MemoryAllocator* memory_allocator_; MemoryAllocator* memory_allocator_compressed_; diff --git a/table/get_context.cc b/table/get_context.cc index 6f0bd2ebb..e1d75fe4e 100644 --- a/table/get_context.cc +++ b/table/get_context.cc @@ -107,6 +107,10 @@ void GetContext::ReportCounters() { RecordTick(statistics_, BLOCK_CACHE_FILTER_HIT, get_context_stats_.num_cache_filter_hit); } + if (get_context_stats_.num_cache_compression_dict_hit > 0) { + RecordTick(statistics_, BLOCK_CACHE_COMPRESSION_DICT_HIT, + get_context_stats_.num_cache_compression_dict_hit); + } if (get_context_stats_.num_cache_index_miss > 0) { RecordTick(statistics_, BLOCK_CACHE_INDEX_MISS, get_context_stats_.num_cache_index_miss); @@ -119,6 +123,10 @@ void GetContext::ReportCounters() { RecordTick(statistics_, BLOCK_CACHE_DATA_MISS, get_context_stats_.num_cache_data_miss); } + if (get_context_stats_.num_cache_compression_dict_miss > 0) { + RecordTick(statistics_, BLOCK_CACHE_COMPRESSION_DICT_MISS, + get_context_stats_.num_cache_compression_dict_miss); + } if (get_context_stats_.num_cache_bytes_read > 0) { RecordTick(statistics_, BLOCK_CACHE_BYTES_READ, get_context_stats_.num_cache_bytes_read); @@ -158,6 +166,14 @@ void GetContext::ReportCounters() { RecordTick(statistics_, BLOCK_CACHE_FILTER_BYTES_INSERT, get_context_stats_.num_cache_filter_bytes_insert); } + if (get_context_stats_.num_cache_compression_dict_add > 0) { + RecordTick(statistics_, BLOCK_CACHE_COMPRESSION_DICT_ADD, + get_context_stats_.num_cache_compression_dict_add); + } + if (get_context_stats_.num_cache_compression_dict_bytes_insert > 0) { + RecordTick(statistics_, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT, + get_context_stats_.num_cache_compression_dict_bytes_insert); + } } bool GetContext::SaveValue(const ParsedInternalKey& parsed_key, diff --git a/table/get_context.h b/table/get_context.h index 407473808..d7d0e9808 100644 --- a/table/get_context.h +++ b/table/get_context.h @@ -21,9 +21,11 @@ struct GetContextStats { uint64_t num_cache_index_hit = 0; uint64_t num_cache_data_hit = 0; uint64_t num_cache_filter_hit = 0; + uint64_t num_cache_compression_dict_hit = 0; uint64_t num_cache_index_miss = 0; uint64_t num_cache_filter_miss = 0; uint64_t num_cache_data_miss = 0; + uint64_t num_cache_compression_dict_miss = 0; uint64_t num_cache_bytes_read = 0; uint64_t num_cache_miss = 0; uint64_t num_cache_add = 0; @@ -34,6 +36,8 @@ struct GetContextStats { uint64_t num_cache_data_bytes_insert = 0; uint64_t num_cache_filter_add = 0; uint64_t num_cache_filter_bytes_insert = 0; + uint64_t num_cache_compression_dict_add = 0; + uint64_t num_cache_compression_dict_bytes_insert = 0; }; class GetContext { diff --git a/table/meta_blocks.cc b/table/meta_blocks.cc index fdf8a5612..744091fbc 100644 --- a/table/meta_blocks.cc +++ b/table/meta_blocks.cc @@ -189,13 +189,12 @@ Status ReadProperties(const Slice& handle_value, RandomAccessFileReader* file, ReadOptions read_options; read_options.verify_checksums = false; Status s; - Slice compression_dict; PersistentCacheOptions cache_options; - BlockFetcher block_fetcher(file, prefetch_buffer, footer, read_options, - handle, &block_contents, ioptions, - false /* decompress */, false /*maybe_compressed*/, - compression_dict, cache_options, memory_allocator); + BlockFetcher block_fetcher( + file, prefetch_buffer, footer, read_options, handle, &block_contents, + ioptions, false /* decompress */, false /*maybe_compressed*/, + UncompressionDict::GetEmptyDict(), cache_options, memory_allocator); s = block_fetcher.ReadBlockContents(); // property block is never compressed. Need to add uncompress logic if we are // to compress it.. @@ -332,14 +331,13 @@ Status ReadTableProperties(RandomAccessFileReader* file, uint64_t file_size, BlockContents metaindex_contents; ReadOptions read_options; read_options.verify_checksums = false; - Slice compression_dict; PersistentCacheOptions cache_options; - BlockFetcher block_fetcher(file, nullptr /* prefetch_buffer */, footer, - read_options, metaindex_handle, - &metaindex_contents, ioptions, - false /* decompress */, false /*maybe_compressed*/, - compression_dict, cache_options, memory_allocator); + BlockFetcher block_fetcher( + file, nullptr /* prefetch_buffer */, footer, read_options, + metaindex_handle, &metaindex_contents, ioptions, false /* decompress */, + false /*maybe_compressed*/, UncompressionDict::GetEmptyDict(), + cache_options, memory_allocator); s = block_fetcher.ReadBlockContents(); if (!s.ok()) { return s; @@ -402,13 +400,12 @@ Status FindMetaBlock(RandomAccessFileReader* file, uint64_t file_size, BlockContents metaindex_contents; ReadOptions read_options; read_options.verify_checksums = false; - Slice compression_dict; PersistentCacheOptions cache_options; BlockFetcher block_fetcher( file, nullptr /* prefetch_buffer */, footer, read_options, metaindex_handle, &metaindex_contents, ioptions, false /* do decompression */, false /*maybe_compressed*/, - compression_dict, cache_options, memory_allocator); + UncompressionDict::GetEmptyDict(), cache_options, memory_allocator); s = block_fetcher.ReadBlockContents(); if (!s.ok()) { return s; @@ -445,13 +442,13 @@ Status ReadMetaBlock(RandomAccessFileReader* file, BlockContents metaindex_contents; ReadOptions read_options; read_options.verify_checksums = false; - Slice compression_dict; PersistentCacheOptions cache_options; BlockFetcher block_fetcher(file, prefetch_buffer, footer, read_options, metaindex_handle, &metaindex_contents, ioptions, false /* decompress */, false /*maybe_compressed*/, - compression_dict, cache_options, memory_allocator); + UncompressionDict::GetEmptyDict(), cache_options, + memory_allocator); status = block_fetcher.ReadBlockContents(); if (!status.ok()) { return status; @@ -478,7 +475,7 @@ Status ReadMetaBlock(RandomAccessFileReader* file, BlockFetcher block_fetcher2( file, prefetch_buffer, footer, read_options, block_handle, contents, ioptions, false /* decompress */, false /*maybe_compressed*/, - compression_dict, cache_options, memory_allocator); + UncompressionDict::GetEmptyDict(), cache_options, memory_allocator); return block_fetcher2.ReadBlockContents(); } diff --git a/table/table_test.cc b/table/table_test.cc index 5ec613bec..e946556c7 100644 --- a/table/table_test.cc +++ b/table/table_test.cc @@ -3576,13 +3576,13 @@ TEST_P(BlockBasedTableTest, PropertiesBlockRestartPointTest) { BlockContents* contents) { ReadOptions read_options; read_options.verify_checksums = false; - Slice compression_dict; PersistentCacheOptions cache_options; BlockFetcher block_fetcher( file, nullptr /* prefetch_buffer */, footer, read_options, handle, contents, ioptions, false /* decompress */, - false /*maybe_compressed*/, compression_dict, cache_options); + false /*maybe_compressed*/, UncompressionDict::GetEmptyDict(), + cache_options); ASSERT_OK(block_fetcher.ReadBlockContents()); }; @@ -3663,13 +3663,12 @@ TEST_P(BlockBasedTableTest, PropertiesMetaBlockLast) { // read metaindex auto metaindex_handle = footer.metaindex_handle(); BlockContents metaindex_contents; - Slice compression_dict; PersistentCacheOptions pcache_opts; BlockFetcher block_fetcher( table_reader.get(), nullptr /* prefetch_buffer */, footer, ReadOptions(), metaindex_handle, &metaindex_contents, ioptions, false /* decompress */, - false /*maybe_compressed*/, compression_dict, pcache_opts, - nullptr /*memory_allocator*/); + false /*maybe_compressed*/, UncompressionDict::GetEmptyDict(), + pcache_opts, nullptr /*memory_allocator*/); ASSERT_OK(block_fetcher.ReadBlockContents()); Block metaindex_block(std::move(metaindex_contents), kDisableGlobalSequenceNumber); diff --git a/util/compression.h b/util/compression.h index 1b73fff76..3076b6260 100644 --- a/util/compression.h +++ b/util/compression.h @@ -11,6 +11,13 @@ #include #include +#ifdef ROCKSDB_MALLOC_USABLE_SIZE +#ifdef OS_FREEBSD +#include +#else // OS_FREEBSD +#include +#endif // OS_FREEBSD +#endif // ROCKSDB_MALLOC_USABLE_SIZE #include #include "rocksdb/options.h" @@ -54,6 +61,16 @@ ZSTD_customMem GetJeZstdAllocationOverrides(); #endif // defined(ROCKSDB_JEMALLOC) && defined(OS_WIN) && // defined(ZSTD_STATIC_LINKING_ONLY) +// We require `ZSTD_sizeof_DDict` and `ZSTD_createDDict_byReference` to use +// `ZSTD_DDict`. The former was introduced in v1.0.0 and the latter was +// introduced in v1.1.3. But an important bug fix for `ZSTD_sizeof_DDict` came +// in v1.1.4, so that is the version we require. As of today's latest version +// (v1.3.8), they are both still in the experimental API, which means they are +// only exported when the compiler flag `ZSTD_STATIC_LINKING_ONLY` is set. +#if defined(ZSTD_STATIC_LINKING_ONLY) && ZSTD_VERSION_NUMBER >= 10104 +#define ROCKSDB_ZSTD_DDICT +#endif // defined(ZSTD_STATIC_LINKING_ONLY) && ZSTD_VERSION_NUMBER >= 10104 + // Cached data represents a portion that can be re-used // If, in the future we have more than one native context to // cache we can arrange this as a tuple @@ -201,42 +218,51 @@ struct CompressionDict { // Holds dictionary and related data, like ZSTD's digested uncompression // dictionary. struct UncompressionDict { -#if ZSTD_VERSION_NUMBER >= 700 +#ifdef ROCKSDB_ZSTD_DDICT ZSTD_DDict* zstd_ddict_; -#endif // ZSTD_VERSION_NUMBER >= 700 - Slice dict_; - -#if ZSTD_VERSION_NUMBER >= 700 - UncompressionDict(Slice dict, CompressionType type) { -#else // ZSTD_VERSION_NUMBER >= 700 - UncompressionDict(Slice dict, CompressionType /*type*/) { -#endif // ZSTD_VERSION_NUMBER >= 700 +#endif // ROCKSDB_ZSTD_DDICT + // Block containing the data for the compression dictionary. It may be + // redundant with the data held in `zstd_ddict_`. + std::string dict_; + // This `Statistics` pointer is intended to be used upon block cache eviction, + // so only needs to be populated on `UncompressionDict`s that'll be inserted + // into block cache. + Statistics* statistics_; + +#ifdef ROCKSDB_ZSTD_DDICT + UncompressionDict(std::string dict, bool using_zstd, + Statistics* _statistics = nullptr) { +#else // ROCKSDB_ZSTD_DDICT + UncompressionDict(std::string dict, bool /*using_zstd*/, + Statistics* _statistics = nullptr) { +#endif // ROCKSDB_ZSTD_DDICT dict_ = std::move(dict); -#if ZSTD_VERSION_NUMBER >= 700 + statistics_ = _statistics; +#ifdef ROCKSDB_ZSTD_DDICT zstd_ddict_ = nullptr; - if (!dict_.empty() && (type == kZSTD || type == kZSTDNotFinalCompression)) { - zstd_ddict_ = ZSTD_createDDict(dict_.data(), dict_.size()); + if (!dict_.empty() && using_zstd) { + zstd_ddict_ = ZSTD_createDDict_byReference(dict_.data(), dict_.size()); assert(zstd_ddict_ != nullptr); } -#endif // ZSTD_VERSION_NUMBER >= 700 +#endif // ROCKSDB_ZSTD_DDICT } ~UncompressionDict() { -#if ZSTD_VERSION_NUMBER >= 700 +#ifdef ROCKSDB_ZSTD_DDICT size_t res = 0; if (zstd_ddict_ != nullptr) { res = ZSTD_freeDDict(zstd_ddict_); } assert(res == 0); // Last I checked they can't fail (void)res; // prevent unused var warning -#endif // ZSTD_VERSION_NUMBER >= 700 +#endif // ROCKSDB_ZSTD_DDICT } -#if ZSTD_VERSION_NUMBER >= 700 +#ifdef ROCKSDB_ZSTD_DDICT const ZSTD_DDict* GetDigestedZstdDDict() const { return zstd_ddict_; } -#endif // ZSTD_VERSION_NUMBER >= 700 +#endif // ROCKSDB_ZSTD_DDICT Slice GetRawDict() const { return dict_; } @@ -245,6 +271,18 @@ struct UncompressionDict { return empty_dict; } + Statistics* statistics() const { return statistics_; } + + size_t ApproximateMemoryUsage() { + size_t usage = 0; + usage += sizeof(struct UncompressionDict); +#ifdef ROCKSDB_ZSTD_DDICT + usage += ZSTD_sizeof_DDict(zstd_ddict_); +#endif // ROCKSDB_ZSTD_DDICT + usage += dict_.size(); + return usage; + } + UncompressionDict() = default; // Disable copy/move UncompressionDict(const CompressionDict&) = delete; @@ -255,11 +293,10 @@ struct UncompressionDict { class CompressionContext { private: - const CompressionType type_; #if defined(ZSTD) && (ZSTD_VERSION_NUMBER >= 500) ZSTD_CCtx* zstd_ctx_ = nullptr; - void CreateNativeContext() { - if (type_ == kZSTD || type_ == kZSTDNotFinalCompression) { + void CreateNativeContext(CompressionType type) { + if (type == kZSTD || type == kZSTDNotFinalCompression) { #ifdef ROCKSDB_ZSTD_CUSTOM_MEM zstd_ctx_ = ZSTD_createCCtx_advanced(port::GetJeZstdAllocationOverrides()); @@ -277,19 +314,18 @@ class CompressionContext { public: // callable inside ZSTD_Compress ZSTD_CCtx* ZSTDPreallocCtx() const { - assert(type_ == kZSTD || type_ == kZSTDNotFinalCompression); + assert(zstd_ctx_ != nullptr); return zstd_ctx_; } #else // ZSTD && (ZSTD_VERSION_NUMBER >= 500) private: - void CreateNativeContext() {} + void CreateNativeContext(CompressionType /* type */) {} void DestroyNativeContext() {} #endif // ZSTD && (ZSTD_VERSION_NUMBER >= 500) public: - explicit CompressionContext(CompressionType comp_type) : type_(comp_type) { - (void)type_; - CreateNativeContext(); + explicit CompressionContext(CompressionType type) { + CreateNativeContext(type); } ~CompressionContext() { DestroyNativeContext(); } CompressionContext(const CompressionContext&) = delete; @@ -316,24 +352,22 @@ class CompressionInfo { class UncompressionContext { private: - const CompressionType type_; CompressionContextCache* ctx_cache_ = nullptr; ZSTDUncompressCachedData uncomp_cached_data_; public: struct NoCache {}; // Do not use context cache, used by TableBuilder - UncompressionContext(NoCache, CompressionType comp_type) : type_(comp_type) {} + UncompressionContext(NoCache, CompressionType /* type */) {} - explicit UncompressionContext(CompressionType comp_type) : type_(comp_type) { - if (type_ == kZSTD || type_ == kZSTDNotFinalCompression) { + explicit UncompressionContext(CompressionType type) { + if (type == kZSTD || type == kZSTDNotFinalCompression) { ctx_cache_ = CompressionContextCache::Instance(); uncomp_cached_data_ = ctx_cache_->GetCachedZSTDUncompressData(); } } ~UncompressionContext() { - if ((type_ == kZSTD || type_ == kZSTDNotFinalCompression) && - uncomp_cached_data_.GetCacheIndex() != -1) { + if (uncomp_cached_data_.GetCacheIndex() != -1) { assert(ctx_cache_ != nullptr); ctx_cache_->ReturnCachedZSTDUncompressData( uncomp_cached_data_.GetCacheIndex()); @@ -1191,13 +1225,13 @@ inline CacheAllocationPtr ZSTD_Uncompress( #if ZSTD_VERSION_NUMBER >= 500 // v0.5.0+ ZSTD_DCtx* context = info.context().GetZSTDContext(); assert(context != nullptr); -#if ZSTD_VERSION_NUMBER >= 700 // v0.7.0+ +#ifdef ROCKSDB_ZSTD_DDICT if (info.dict().GetDigestedZstdDDict() != nullptr) { actual_output_length = ZSTD_decompress_usingDDict( context, output.get(), output_len, input_data, input_length, info.dict().GetDigestedZstdDDict()); } -#endif // ZSTD_VERSION_NUMBER >= 700 +#endif // ROCKSDB_ZSTD_DDICT if (actual_output_length == 0) { actual_output_length = ZSTD_decompress_usingDict( context, output.get(), output_len, input_data, input_length,