Move the uncompression dictionary object out of the block cache (#5584)

Summary:
RocksDB has historically stored uncompression dictionary objects in the block
cache as opposed to storing just the block contents. This neccesitated
evicting the object upon table close. With the new code, only the raw blocks
are stored in the cache, eliminating the need for eviction.

In addition, the patch makes the following improvements:

1) Compression dictionary blocks are now prefetched/pinned similarly to
index/filter blocks.
2) A copy operation got eliminated when the uncompression dictionary is
retrieved.
3) Errors related to retrieving the uncompression dictionary are propagated as
opposed to silently ignored.

Note: the patch temporarily breaks the compression dictionary evicition stats.
They will be fixed in a separate phase.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5584

Test Plan: make asan_check

Differential Revision: D16344151

Pulled By: ltamasi

fbshipit-source-id: 2962b295f5b19628f9da88a3fcebbce5a5017a7b
main
Levi Tamasi 6 years ago committed by Facebook Github Bot
parent 6b7fcc0d5f
commit 092f417037
  1. 1
      CMakeLists.txt
  2. 7
      HISTORY.md
  3. 1
      TARGETS
  4. 67
      db/db_block_cache_test.cc
  5. 6
      db/version_set.cc
  6. 5
      include/rocksdb/cache.h
  7. 1
      src.mk
  8. 7
      table/block_based/block_based_filter_block_test.cc
  9. 249
      table/block_based/block_based_table_reader.cc
  10. 16
      table/block_based/block_based_table_reader.h
  11. 7
      table/block_based/full_filter_block_test.cc
  12. 2
      table/block_based/partitioned_filter_block.cc
  13. 4
      table/block_based/partitioned_filter_block_test.cc
  14. 138
      table/block_based/uncompression_dict_reader.cc
  15. 64
      table/block_based/uncompression_dict_reader.h
  16. 2
      table/table_reader.h
  17. 170
      table/table_test.cc
  18. 93
      util/compression.h

@ -601,6 +601,7 @@ set(SOURCES
table/block_based/full_filter_block.cc
table/block_based/index_builder.cc
table/block_based/partitioned_filter_block.cc
table/block_based/uncompression_dict_reader.cc
table/block_fetcher.cc
table/bloom_block.cc
table/cuckoo/cuckoo_table_builder.cc

@ -6,9 +6,10 @@
### Public API Change
* Now DB::Close() will return Aborted() error when there is unreleased snapshot. Users can retry after all snapshots are released.
* Index and filter blocks are now handled similarly to data blocks with regards to the block cache: instead of storing reader objects in the cache, only the blocks themselves are cached. In addition, index and filter blocks (as well as filter partitions) no longer get evicted from the cache when a table is closed. Moreover, index blocks can now use the compressed block cache (if any).
* Index, filter, and compression dictionary blocks are now handled similarly to data blocks with regards to the block cache: instead of storing objects in the cache, only the blocks themselves are cached. In addition, index, filter, and compression dictionary blocks (as well as filter partitions) no longer get evicted from the cache when a table is closed. Moreover, index blocks can now use the compressed block cache (if any), and cached index blocks can be shared among multiple table readers.
* Partitions of partitioned indexes no longer affect the read amplification statistics.
* Due to the above refactoring, block cache eviction statistics for indexes and filters are temporarily broken. We plan to reintroduce them in a later phase.
* Due to the above refactoring, block cache eviction statistics for indexes, filters, and compression dictionaries are temporarily broken. We plan to reintroduce them in a later phase.
* Errors related to the retrieval of the compression dictionary are now propagated to the user.
* options.keep_log_file_num will be enforced strictly all the time. File names of all log files will be tracked, which may take significantly amount of memory if options.keep_log_file_num is large and either of options.max_log_file_size or options.log_file_time_to_roll is set.
* Add initial support for Get/Put with user timestamps. Users can specify timestamps via ReadOptions and WriteOptions when calling DB::Get and DB::Put.
* Accessing a partition of a partitioned filter or index through a pinned reference is no longer considered a cache hit.
@ -26,6 +27,7 @@
* Allow DBImplSecondary to remove memtables with obsolete data after replaying MANIFEST and WAL.
* Add an option `failed_move_fall_back_to_copy` (default is true) for external SST ingestion. When `move_files` is true and hard link fails, ingestion falls back to copy if `failed_move_fall_back_to_copy` is true. Otherwise, ingestion reports an error.
* Add argument `--secondary_path` to ldb to open the database as the secondary instance. This would keep the original DB intact.
* Compression dictionary blocks are now prefetched and pinned in the cache (based on the customer's settings) the same way as index and filter blocks.
### Performance Improvements
* Reduce binary search when iterator reseek into the same data block.
@ -35,6 +37,7 @@
* Log Writer will flush after finishing the whole record, rather than a fragment.
* Lower MultiGet batching API latency by reading data blocks from disk in parallel
* Improve performance of row_cache: make reads with newer snapshots than data in an SST file share the same cache key, except in some transaction cases.
* The compression dictionary is no longer copied to a new object upon retrieval.
### General Improvements
* Added new status code kColumnFamilyDropped to distinguish between Column Family Dropped and DB Shutdown in progress.

@ -198,6 +198,7 @@ cpp_library(
"table/block_based/full_filter_block.cc",
"table/block_based/index_builder.cc",
"table/block_based/partitioned_filter_block.cc",
"table/block_based/uncompression_dict_reader.cc",
"table/block_fetcher.cc",
"table/bloom_block.cc",
"table/cuckoo/cuckoo_table_builder.cc",

@ -19,6 +19,9 @@ class DBBlockCacheTest : public DBTestBase {
size_t hit_count_ = 0;
size_t insert_count_ = 0;
size_t failure_count_ = 0;
size_t compression_dict_miss_count_ = 0;
size_t compression_dict_hit_count_ = 0;
size_t compression_dict_insert_count_ = 0;
size_t compressed_miss_count_ = 0;
size_t compressed_hit_count_ = 0;
size_t compressed_insert_count_ = 0;
@ -69,6 +72,15 @@ class DBBlockCacheTest : public DBTestBase {
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSED_ADD_FAILURES);
}
void RecordCacheCountersForCompressionDict(const Options& options) {
compression_dict_miss_count_ =
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_MISS);
compression_dict_hit_count_ =
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_HIT);
compression_dict_insert_count_ =
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_ADD);
}
void CheckCacheCounters(const Options& options, size_t expected_misses,
size_t expected_hits, size_t expected_inserts,
size_t expected_failures) {
@ -87,6 +99,28 @@ class DBBlockCacheTest : public DBTestBase {
failure_count_ = new_failure_count;
}
void CheckCacheCountersForCompressionDict(
const Options& options, size_t expected_compression_dict_misses,
size_t expected_compression_dict_hits,
size_t expected_compression_dict_inserts) {
size_t new_compression_dict_miss_count =
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_MISS);
size_t new_compression_dict_hit_count =
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_HIT);
size_t new_compression_dict_insert_count =
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_ADD);
ASSERT_EQ(compression_dict_miss_count_ + expected_compression_dict_misses,
new_compression_dict_miss_count);
ASSERT_EQ(compression_dict_hit_count_ + expected_compression_dict_hits,
new_compression_dict_hit_count);
ASSERT_EQ(
compression_dict_insert_count_ + expected_compression_dict_inserts,
new_compression_dict_insert_count);
compression_dict_miss_count_ = new_compression_dict_miss_count;
compression_dict_hit_count_ = new_compression_dict_hit_count;
compression_dict_insert_count_ = new_compression_dict_insert_count;
}
void CheckCompressedCacheCounters(const Options& options,
size_t expected_misses,
size_t expected_hits,
@ -671,6 +705,8 @@ TEST_F(DBBlockCacheTest, CacheCompressionDict) {
options.table_factory.reset(new BlockBasedTableFactory(table_options));
DestroyAndReopen(options);
RecordCacheCountersForCompressionDict(options);
for (int i = 0; i < kNumFiles; ++i) {
ASSERT_EQ(i, NumTableFilesAtLevel(0, 0));
for (int j = 0; j < kNumEntriesPerFile; ++j) {
@ -683,27 +719,26 @@ TEST_F(DBBlockCacheTest, CacheCompressionDict) {
ASSERT_EQ(0, NumTableFilesAtLevel(0));
ASSERT_EQ(kNumFiles, NumTableFilesAtLevel(1));
// Compression dictionary blocks are preloaded.
CheckCacheCountersForCompressionDict(
options, kNumFiles /* expected_compression_dict_misses */,
0 /* expected_compression_dict_hits */,
kNumFiles /* expected_compression_dict_inserts */);
// Seek to a key in a file. It should cause the SST's dictionary meta-block
// to be read.
RecordCacheCounters(options);
ASSERT_EQ(0,
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_MISS));
ASSERT_EQ(0, TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_ADD));
ASSERT_EQ(
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT),
0);
RecordCacheCountersForCompressionDict(options);
ReadOptions read_options;
ASSERT_NE("NOT_FOUND", Get(Key(kNumFiles * kNumEntriesPerFile - 1)));
// Two blocks missed/added: dictionary and data block
// One block hit: index since it's prefetched
CheckCacheCounters(options, 2 /* expected_misses */, 1 /* expected_hits */,
2 /* expected_inserts */, 0 /* expected_failures */);
ASSERT_EQ(1,
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_MISS));
ASSERT_EQ(1, TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_ADD));
ASSERT_GT(
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT),
0);
// Two block hits: index and dictionary since they are prefetched
// One block missed/added: data block
CheckCacheCounters(options, 1 /* expected_misses */, 2 /* expected_hits */,
1 /* expected_inserts */, 0 /* expected_failures */);
CheckCacheCountersForCompressionDict(
options, 0 /* expected_compression_dict_misses */,
1 /* expected_compression_dict_hits */,
0 /* expected_compression_dict_inserts */);
}
}

@ -3420,16 +3420,10 @@ VersionSet::VersionSet(const std::string& dbname,
env_options_(storage_options),
block_cache_tracer_(block_cache_tracer) {}
void CloseTables(void* ptr, size_t) {
TableReader* table_reader = reinterpret_cast<TableReader*>(ptr);
table_reader->Close();
}
VersionSet::~VersionSet() {
// we need to delete column_family_set_ because its destructor depends on
// VersionSet
Cache* table_cache = column_family_set_->get_table_cache();
table_cache->ApplyToAllCacheEntries(&CloseTables, false /* thread_safe */);
column_family_set_.reset();
for (auto& file : obsolete_files_) {
if (file.metadata->table_reader_handle) {

@ -250,11 +250,6 @@ class Cache {
virtual std::string GetPrintableOptions() const { return ""; }
// Mark the last inserted object as being a raw data block. This will be used
// in tests. The default implementation does nothing.
virtual void TEST_mark_as_data_block(const Slice& /*key*/,
size_t /*charge*/) {}
MemoryAllocator* memory_allocator() const { return memory_allocator_.get(); }
private:

@ -121,6 +121,7 @@ LIB_SOURCES = \
table/block_based/full_filter_block.cc \
table/block_based/index_builder.cc \
table/block_based/partitioned_filter_block.cc \
table/block_based/uncompression_dict_reader.cc \
table/block_fetcher.cc \
table/bloom_block.cc \
table/cuckoo/cuckoo_table_builder.cc \

@ -45,10 +45,7 @@ class TestHashFilter : public FilterPolicy {
class MockBlockBasedTable : public BlockBasedTable {
public:
explicit MockBlockBasedTable(Rep* rep)
: BlockBasedTable(rep, nullptr /* block_cache_tracer */) {
// Initialize what Open normally does as much as necessary for the test
rep->cache_key_prefix_size = 10;
}
: BlockBasedTable(rep, nullptr /* block_cache_tracer */) {}
};
class FilterBlockTest : public testing::Test {
@ -64,7 +61,6 @@ class FilterBlockTest : public testing::Test {
: ioptions_(options_),
env_options_(options_),
icomp_(options_.comparator) {
table_options_.no_block_cache = true;
table_options_.filter_policy.reset(new TestHashFilter);
constexpr bool skip_filters = false;
@ -271,7 +267,6 @@ class BlockBasedFilterBlockTest : public testing::Test {
: ioptions_(options_),
env_options_(options_),
icomp_(options_.comparator) {
table_options_.no_block_cache = true;
table_options_.filter_policy.reset(NewBloomFilterPolicy(10));
constexpr bool skip_filters = false;

@ -63,7 +63,6 @@ extern const std::string kHashIndexPrefixesMetadataBlock;
typedef BlockBasedTable::IndexReader IndexReader;
BlockBasedTable::~BlockBasedTable() {
Close();
delete rep_;
}
@ -148,8 +147,6 @@ void DeleteCachedEntry(const Slice& /*key*/, void* value) {
delete entry;
}
void DeleteCachedUncompressionDictEntry(const Slice& key, void* value);
// Release the cached entry and decrement its ref count.
void ForceReleaseCachedEntry(void* arg, void* h) {
Cache* cache = reinterpret_cast<Cache*>(arg);
@ -1419,37 +1416,6 @@ Status BlockBasedTable::ReadRangeDelBlock(
return s;
}
Status BlockBasedTable::ReadCompressionDictBlock(
FilePrefetchBuffer* prefetch_buffer,
std::unique_ptr<const BlockContents>* compression_dict_block) const {
assert(compression_dict_block != nullptr);
Status s;
if (!rep_->compression_dict_handle.IsNull()) {
std::unique_ptr<BlockContents> compression_dict_cont{new BlockContents()};
PersistentCacheOptions cache_options;
ReadOptions read_options;
read_options.verify_checksums = true;
BlockFetcher compression_block_fetcher(
rep_->file.get(), prefetch_buffer, rep_->footer, read_options,
rep_->compression_dict_handle, compression_dict_cont.get(),
rep_->ioptions, false /* decompress */, false /*maybe_compressed*/,
BlockType::kCompressionDictionary, UncompressionDict::GetEmptyDict(),
cache_options);
s = compression_block_fetcher.ReadBlockContents();
if (!s.ok()) {
ROCKS_LOG_WARN(
rep_->ioptions.info_log,
"Encountered error while reading data from compression dictionary "
"block %s",
s.ToString().c_str());
} else {
*compression_dict_block = std::move(compression_dict_cont);
}
}
return s;
}
Status BlockBasedTable::PrefetchIndexAndFilterBlocks(
FilePrefetchBuffer* prefetch_buffer, InternalIterator* meta_iter,
BlockBasedTable* new_table, bool prefetch_all,
@ -1555,23 +1521,16 @@ Status BlockBasedTable::PrefetchIndexAndFilterBlocks(
}
}
// TODO(ajkr): also prefetch compression dictionary block
// TODO(ajkr): also pin compression dictionary block when
// `pin_l0_filter_and_index_blocks_in_cache == true`.
if (!table_options.cache_index_and_filter_blocks) {
std::unique_ptr<const BlockContents> compression_dict_block;
s = ReadCompressionDictBlock(prefetch_buffer, &compression_dict_block);
if (!rep_->compression_dict_handle.IsNull()) {
std::unique_ptr<UncompressionDictReader> uncompression_dict_reader;
s = UncompressionDictReader::Create(this, prefetch_buffer, use_cache,
prefetch_all, pin_all, lookup_context,
&uncompression_dict_reader);
if (!s.ok()) {
return s;
}
if (!rep_->compression_dict_handle.IsNull()) {
assert(compression_dict_block != nullptr);
// TODO(ajkr): find a way to avoid the `compression_dict_block` data copy
rep_->uncompression_dict.reset(new UncompressionDict(
compression_dict_block->data.ToString(),
rep_->blocks_definitely_zstd_compressed, rep_->ioptions.statistics));
}
rep_->uncompression_dict_reader = std::move(uncompression_dict_reader);
}
assert(s.ok());
@ -1609,8 +1568,8 @@ size_t BlockBasedTable::ApproximateMemoryUsage() const {
if (rep_->index_reader) {
usage += rep_->index_reader->ApproximateMemoryUsage();
}
if (rep_->uncompression_dict) {
usage += rep_->uncompression_dict->ApproximateMemoryUsage();
if (rep_->uncompression_dict_reader) {
usage += rep_->uncompression_dict_reader->ApproximateMemoryUsage();
}
return usage;
}
@ -1757,9 +1716,6 @@ Status BlockBasedTable::GetDataBlockFromCache(
Cache::Handle* cache_handle = nullptr;
s = block_cache->Insert(block_cache_key, block_holder.get(), charge,
&DeleteCachedEntry<TBlocklike>, &cache_handle);
#ifndef NDEBUG
block_cache->TEST_mark_as_data_block(block_cache_key, charge);
#endif // NDEBUG
if (s.ok()) {
assert(cache_handle != nullptr);
block->SetCachedValue(block_holder.release(), block_cache,
@ -1863,9 +1819,6 @@ Status BlockBasedTable::PutDataBlockToCache(
s = block_cache->Insert(block_cache_key, block_holder.get(), charge,
&DeleteCachedEntry<TBlocklike>, &cache_handle,
priority);
#ifndef NDEBUG
block_cache->TEST_mark_as_data_block(block_cache_key, charge);
#endif // NDEBUG
if (s.ok()) {
assert(cache_handle != nullptr);
cached_block->SetCachedValue(block_holder.release(), block_cache,
@ -1914,86 +1867,6 @@ std::unique_ptr<FilterBlockReader> BlockBasedTable::CreateFilterBlockReader(
}
}
CachableEntry<UncompressionDict> BlockBasedTable::GetUncompressionDict(
FilePrefetchBuffer* prefetch_buffer, bool no_io, GetContext* get_context,
BlockCacheLookupContext* lookup_context) const {
if (!rep_->table_options.cache_index_and_filter_blocks) {
// block cache is either disabled or not used for meta-blocks. In either
// case, BlockBasedTableReader is the owner of the uncompression dictionary.
return {rep_->uncompression_dict.get(), nullptr /* cache */,
nullptr /* cache_handle */, false /* own_value */};
}
if (rep_->compression_dict_handle.IsNull()) {
return CachableEntry<UncompressionDict>();
}
char cache_key_buf[kMaxCacheKeyPrefixSize + kMaxVarint64Length];
auto cache_key =
GetCacheKey(rep_->cache_key_prefix, rep_->cache_key_prefix_size,
rep_->compression_dict_handle, cache_key_buf);
auto cache_handle =
GetEntryFromCache(rep_->table_options.block_cache.get(), cache_key,
BlockType::kCompressionDictionary, get_context);
UncompressionDict* dict = nullptr;
bool is_cache_hit = false;
size_t usage = 0;
if (cache_handle != nullptr) {
dict = reinterpret_cast<UncompressionDict*>(
rep_->table_options.block_cache->Value(cache_handle));
is_cache_hit = true;
usage = dict->ApproximateMemoryUsage();
} else if (no_io) {
// Do not invoke any io.
} else {
std::unique_ptr<const BlockContents> compression_dict_block;
Status s =
ReadCompressionDictBlock(prefetch_buffer, &compression_dict_block);
if (s.ok()) {
assert(compression_dict_block != nullptr);
// TODO(ajkr): find a way to avoid the `compression_dict_block` data copy
std::unique_ptr<UncompressionDict> uncompression_dict(
new UncompressionDict(compression_dict_block->data.ToString(),
rep_->blocks_definitely_zstd_compressed,
rep_->ioptions.statistics));
usage = uncompression_dict->ApproximateMemoryUsage();
s = rep_->table_options.block_cache->Insert(
cache_key, uncompression_dict.get(), usage,
&DeleteCachedUncompressionDictEntry, &cache_handle,
rep_->table_options.cache_index_and_filter_blocks_with_high_priority
? Cache::Priority::HIGH
: Cache::Priority::LOW);
if (s.ok()) {
UpdateCacheInsertionMetrics(BlockType::kCompressionDictionary,
get_context, usage);
dict = uncompression_dict.release();
} else {
RecordTick(rep_->ioptions.statistics, BLOCK_CACHE_ADD_FAILURES);
assert(dict == nullptr);
assert(cache_handle == nullptr);
}
}
}
if (block_cache_tracer_ && block_cache_tracer_->is_tracing_enabled() &&
lookup_context) {
// Avoid making copy of block_key and cf_name when constructing the access
// record.
BlockCacheTraceRecord access_record(
rep_->ioptions.env->NowMicros(),
/*block_key=*/"", TraceType::kBlockTraceUncompressionDictBlock,
/*block_size=*/usage, rep_->cf_id_for_tracing(),
/*cf_name=*/"", rep_->level_for_tracing(),
rep_->sst_number_for_tracing(), lookup_context->caller, is_cache_hit,
/*no_insert=*/no_io, lookup_context->get_id,
lookup_context->get_from_user_specified_snapshot,
/*referenced_key=*/"");
block_cache_tracer_->WriteBlockAccess(access_record, cache_key,
rep_->cf_name_for_tracing(),
lookup_context->referenced_key);
}
return {dict, cache_handle ? rep_->table_options.block_cache.get() : nullptr,
cache_handle, false /* own_value */};
}
// disable_prefix_seek should be set to true when prefix_extractor found in SST
// differs from the one in mutable_cf_options and index type is HashBasedIndex
InternalIteratorBase<IndexValue>* BlockBasedTable::NewIndexIterator(
@ -2028,13 +1901,17 @@ TBlockIter* BlockBasedTable::NewDataBlockIterator(
return iter;
}
const bool no_io = (ro.read_tier == kBlockCacheTier);
auto uncompression_dict_storage =
GetUncompressionDict(prefetch_buffer, no_io, get_context, lookup_context);
const UncompressionDict& uncompression_dict =
uncompression_dict_storage.GetValue() == nullptr
? UncompressionDict::GetEmptyDict()
: *uncompression_dict_storage.GetValue();
UncompressionDict uncompression_dict;
if (rep_->uncompression_dict_reader) {
const bool no_io = (ro.read_tier == kBlockCacheTier);
s = rep_->uncompression_dict_reader->GetOrReadUncompressionDictionary(
prefetch_buffer, no_io, get_context, lookup_context,
&uncompression_dict);
if (!s.ok()) {
iter->Invalidate(s);
return iter;
}
}
CachableEntry<Block> block;
s = RetrieveBlock(prefetch_buffer, ro, handle, uncompression_dict, &block,
@ -2268,7 +2145,9 @@ Status BlockBasedTable::MaybeReadBlockAndLoadToCache(
if (block_entry->GetValue() == nullptr && !no_io && ro.fill_cache) {
Statistics* statistics = rep_->ioptions.statistics;
const bool maybe_compressed =
block_type != BlockType::kFilter && rep_->blocks_maybe_compressed;
block_type != BlockType::kFilter &&
block_type != BlockType::kCompressionDictionary &&
rep_->blocks_maybe_compressed;
const bool do_uncompress = maybe_compressed && !block_cache_compressed;
CompressionType raw_block_comp_type;
BlockContents raw_block_contents;
@ -2321,6 +2200,9 @@ Status BlockBasedTable::MaybeReadBlockAndLoadToCache(
case BlockType::kFilter:
trace_block_type = TraceType::kBlockTraceFilterBlock;
break;
case BlockType::kCompressionDictionary:
trace_block_type = TraceType::kBlockTraceUncompressionDictBlock;
break;
case BlockType::kRangeDeletion:
trace_block_type = TraceType::kBlockTraceRangeDeletionBlock;
break;
@ -2568,7 +2450,9 @@ Status BlockBasedTable::RetrieveBlock(
}
const bool maybe_compressed =
block_type != BlockType::kFilter && rep_->blocks_maybe_compressed;
block_type != BlockType::kFilter &&
block_type != BlockType::kCompressionDictionary &&
rep_->blocks_maybe_compressed;
const bool do_uncompress = maybe_compressed;
std::unique_ptr<TBlocklike> block;
@ -3504,12 +3388,17 @@ void BlockBasedTable::MultiGet(const ReadOptions& read_options,
{
MultiGetRange data_block_range(sst_file_range, sst_file_range.begin(),
sst_file_range.end());
auto uncompression_dict_storage = GetUncompressionDict(
nullptr, no_io, sst_file_range.begin()->get_context, &lookup_context);
const UncompressionDict& uncompression_dict =
uncompression_dict_storage.GetValue() == nullptr
? UncompressionDict::GetEmptyDict()
: *uncompression_dict_storage.GetValue();
UncompressionDict uncompression_dict;
Status uncompression_dict_status;
if (rep_->uncompression_dict_reader) {
uncompression_dict_status =
rep_->uncompression_dict_reader->GetOrReadUncompressionDictionary(
nullptr /* prefetch_buffer */, no_io,
sst_file_range.begin()->get_context, &lookup_context,
&uncompression_dict);
}
size_t total_len = 0;
ReadOptions ro = read_options;
ro.read_tier = kBlockCacheTier;
@ -3535,6 +3424,14 @@ void BlockBasedTable::MultiGet(const ReadOptions& read_options,
sst_file_range.SkipKey(miter);
continue;
}
if (!uncompression_dict_status.ok()) {
*(miter->s) = uncompression_dict_status;
data_block_range.SkipKey(miter);
sst_file_range.SkipKey(miter);
continue;
}
statuses.emplace_back();
results.emplace_back();
if (v.handle.offset() == offset) {
@ -4191,23 +4088,25 @@ Status BlockBasedTable::DumpTable(WritableFile* out_file) {
}
// Output compression dictionary
if (!rep_->compression_dict_handle.IsNull()) {
std::unique_ptr<const BlockContents> compression_dict_block;
s = ReadCompressionDictBlock(nullptr /* prefetch_buffer */,
&compression_dict_block);
if (rep_->uncompression_dict_reader) {
UncompressionDict uncompression_dict;
s = rep_->uncompression_dict_reader->GetOrReadUncompressionDictionary(
nullptr /* prefetch_buffer */, false /* no_io */,
nullptr /* get_context */, nullptr /* lookup_context */,
&uncompression_dict);
if (!s.ok()) {
return s;
}
assert(compression_dict_block != nullptr);
auto compression_dict = compression_dict_block->data;
const Slice& raw_dict = uncompression_dict.GetRawDict();
out_file->Append(
"Compression Dictionary:\n"
"--------------------------------------\n");
out_file->Append(" size (bytes): ");
out_file->Append(rocksdb::ToString(compression_dict.size()));
out_file->Append(rocksdb::ToString(raw_dict.size()));
out_file->Append("\n\n");
out_file->Append(" HEX ");
out_file->Append(compression_dict.ToString(true).c_str());
out_file->Append(raw_dict.ToString(true).c_str());
out_file->Append("\n\n");
}
@ -4233,29 +4132,6 @@ Status BlockBasedTable::DumpTable(WritableFile* out_file) {
return s;
}
void BlockBasedTable::Close() {
if (rep_->closed) {
return;
}
// cleanup index, filter, and compression dictionary blocks
// to avoid accessing dangling pointers
if (!rep_->table_options.no_block_cache) {
if (!rep_->compression_dict_handle.IsNull()) {
// Get the compression dictionary block key
char cache_key[kMaxCacheKeyPrefixSize + kMaxVarint64Length];
auto key =
GetCacheKey(rep_->cache_key_prefix, rep_->cache_key_prefix_size,
rep_->compression_dict_handle, cache_key);
Cache* const cache = rep_->table_options.block_cache.get();
cache->Erase(key);
}
}
rep_->closed = true;
}
Status BlockBasedTable::DumpIndexBlock(WritableFile* out_file) {
out_file->Append(
"Index Details:\n"
@ -4431,15 +4307,4 @@ void BlockBasedTable::DumpKeyValue(const Slice& key, const Slice& value,
out_file->Append("\n ------\n");
}
namespace {
void DeleteCachedUncompressionDictEntry(const Slice& /*key*/, void* value) {
UncompressionDict* dict = reinterpret_cast<UncompressionDict*>(value);
RecordTick(dict->statistics(), BLOCK_CACHE_COMPRESSION_DICT_BYTES_EVICT,
dict->ApproximateMemoryUsage());
delete dict;
}
} // anonymous namespace
} // namespace rocksdb

@ -29,6 +29,7 @@
#include "table/block_based/block_type.h"
#include "table/block_based/cachable_entry.h"
#include "table/block_based/filter_block.h"
#include "table/block_based/uncompression_dict_reader.h"
#include "table/format.h"
#include "table/get_context.h"
#include "table/multiget_context.h"
@ -176,8 +177,6 @@ class BlockBasedTable : public TableReader {
Status VerifyChecksum(TableReaderCaller caller) override;
void Close() override;
~BlockBasedTable();
bool TEST_FilterBlockInCache() const;
@ -242,8 +241,11 @@ class BlockBasedTable : public TableReader {
template <typename TBlocklike>
friend class FilterBlockReaderCommon;
friend class PartitionIndexReader;
friend class UncompressionDictReader;
protected:
Rep* rep_;
explicit BlockBasedTable(Rep* rep, BlockCacheTracer* const block_cache_tracer)
@ -313,10 +315,6 @@ class BlockBasedTable : public TableReader {
CachableEntry<Block>, MultiGetContext::MAX_BATCH_SIZE>* results,
char* scratch, const UncompressionDict& uncompression_dict) const;
CachableEntry<UncompressionDict> GetUncompressionDict(
FilePrefetchBuffer* prefetch_buffer, bool no_io, GetContext* get_context,
BlockCacheLookupContext* lookup_context) const;
// Get the iterator from the index reader.
//
// If input_iter is not set, return a new Iterator.
@ -416,9 +414,6 @@ class BlockBasedTable : public TableReader {
InternalIterator* meta_iter,
const InternalKeyComparator& internal_comparator,
BlockCacheLookupContext* lookup_context);
Status ReadCompressionDictBlock(
FilePrefetchBuffer* prefetch_buffer,
std::unique_ptr<const BlockContents>* compression_dict_block) const;
Status PrefetchIndexAndFilterBlocks(
FilePrefetchBuffer* prefetch_buffer, InternalIterator* meta_iter,
BlockBasedTable* new_table, bool prefetch_all,
@ -514,7 +509,7 @@ struct BlockBasedTable::Rep {
std::unique_ptr<IndexReader> index_reader;
std::unique_ptr<FilterBlockReader> filter;
std::unique_ptr<UncompressionDict> uncompression_dict;
std::unique_ptr<UncompressionDictReader> uncompression_dict_reader;
enum class FilterType {
kNoFilter,
@ -566,7 +561,6 @@ struct BlockBasedTable::Rep {
bool index_key_includes_seq = true;
bool index_value_is_full = true;
bool closed = false;
const bool immortal_table;
SequenceNumber get_global_seqno(BlockType block_type) const {

@ -44,10 +44,7 @@ class TestFilterBitsBuilder : public FilterBitsBuilder {
class MockBlockBasedTable : public BlockBasedTable {
public:
explicit MockBlockBasedTable(Rep* rep)
: BlockBasedTable(rep, nullptr /* block_cache_tracer */) {
// Initialize what Open normally does as much as necessary for the test
rep->cache_key_prefix_size = 10;
}
: BlockBasedTable(rep, nullptr /* block_cache_tracer */) {}
};
class TestFilterBitsReader : public FilterBitsReader {
@ -116,7 +113,6 @@ class PluginFullFilterBlockTest : public testing::Test {
: ioptions_(options_),
env_options_(options_),
icomp_(options_.comparator) {
table_options_.no_block_cache = true;
table_options_.filter_policy.reset(new TestHashFilter);
constexpr bool skip_filters = false;
@ -210,7 +206,6 @@ class FullFilterBlockTest : public testing::Test {
: ioptions_(options_),
env_options_(options_),
icomp_(options_.comparator) {
table_options_.no_block_cache = true;
table_options_.filter_policy.reset(NewBloomFilterPolicy(10, false));
constexpr bool skip_filters = false;

@ -324,7 +324,7 @@ void PartitionedFilterBlockReader::CacheDependencies(bool pin) {
prefetch_buffer.reset(new FilePrefetchBuffer());
s = prefetch_buffer->Prefetch(rep->file.get(), prefetch_off,
static_cast<size_t>(prefetch_len));
static_cast<size_t>(prefetch_len));
// After prefetch, read the partitions one by one
ReadOptions read_options;

@ -27,7 +27,6 @@ class MockedBlockBasedTable : public BlockBasedTable {
MockedBlockBasedTable(Rep* rep, PartitionedIndexBuilder* pib)
: BlockBasedTable(rep, /*block_cache_tracer=*/nullptr) {
// Initialize what Open normally does as much as necessary for the test
rep->cache_key_prefix_size = 10;
rep->index_key_includes_seq = pib->seperator_is_key_plus_seq();
rep->index_value_is_full = !pib->get_use_value_delta_encoding();
}
@ -67,9 +66,6 @@ class PartitionedFilterBlockTest
env_options_(options_),
icomp_(options_.comparator) {
table_options_.filter_policy.reset(NewBloomFilterPolicy(10, false));
table_options_.no_block_cache = true; // Otherwise BlockBasedTable::Close
// will access variable that are not
// initialized in our mocked version
table_options_.format_version = GetParam();
table_options_.index_block_restart_interval = 3;
}

@ -0,0 +1,138 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
#include "table/block_based/uncompression_dict_reader.h"
#include "monitoring/perf_context_imp.h"
#include "table/block_based/block_based_table_reader.h"
#include "util/compression.h"
namespace rocksdb {
Status UncompressionDictReader::Create(
const BlockBasedTable* table, FilePrefetchBuffer* prefetch_buffer,
bool use_cache, bool prefetch, bool pin,
BlockCacheLookupContext* lookup_context,
std::unique_ptr<UncompressionDictReader>* uncompression_dict_reader) {
assert(table);
assert(table->get_rep());
assert(!pin || prefetch);
assert(uncompression_dict_reader);
CachableEntry<BlockContents> uncompression_dict_block;
if (prefetch || !use_cache) {
const Status s = ReadUncompressionDictionaryBlock(
table, prefetch_buffer, ReadOptions(), nullptr /* get_context */,
lookup_context, &uncompression_dict_block);
if (!s.ok()) {
return s;
}
if (use_cache && !pin) {
uncompression_dict_block.Reset();
}
}
uncompression_dict_reader->reset(
new UncompressionDictReader(table, std::move(uncompression_dict_block)));
return Status::OK();
}
Status UncompressionDictReader::ReadUncompressionDictionaryBlock(
const BlockBasedTable* table, FilePrefetchBuffer* prefetch_buffer,
const ReadOptions& read_options, GetContext* get_context,
BlockCacheLookupContext* lookup_context,
CachableEntry<BlockContents>* uncompression_dict_block) {
// TODO: add perf counter for compression dictionary read time
assert(table);
assert(uncompression_dict_block);
assert(uncompression_dict_block->IsEmpty());
const BlockBasedTable::Rep* const rep = table->get_rep();
assert(rep);
assert(!rep->compression_dict_handle.IsNull());
const Status s = table->RetrieveBlock(
prefetch_buffer, read_options, rep->compression_dict_handle,
UncompressionDict::GetEmptyDict(), uncompression_dict_block,
BlockType::kCompressionDictionary, get_context, lookup_context);
if (!s.ok()) {
ROCKS_LOG_WARN(
rep->ioptions.info_log,
"Encountered error while reading data from compression dictionary "
"block %s",
s.ToString().c_str());
}
return s;
}
Status UncompressionDictReader::GetOrReadUncompressionDictionaryBlock(
FilePrefetchBuffer* prefetch_buffer, bool no_io, GetContext* get_context,
BlockCacheLookupContext* lookup_context,
CachableEntry<BlockContents>* uncompression_dict_block) const {
assert(uncompression_dict_block);
if (!uncompression_dict_block_.IsEmpty()) {
uncompression_dict_block->SetUnownedValue(
uncompression_dict_block_.GetValue());
return Status::OK();
}
ReadOptions read_options;
if (no_io) {
read_options.read_tier = kBlockCacheTier;
}
return ReadUncompressionDictionaryBlock(table_, prefetch_buffer, read_options,
get_context, lookup_context,
uncompression_dict_block);
}
Status UncompressionDictReader::GetOrReadUncompressionDictionary(
FilePrefetchBuffer* prefetch_buffer, bool no_io, GetContext* get_context,
BlockCacheLookupContext* lookup_context,
UncompressionDict* uncompression_dict) const {
CachableEntry<BlockContents> uncompression_dict_block;
const Status s = GetOrReadUncompressionDictionaryBlock(
prefetch_buffer, no_io, get_context, lookup_context,
&uncompression_dict_block);
if (!s.ok()) {
return s;
}
assert(uncompression_dict);
assert(table_);
assert(table_->get_rep());
UncompressionDict dict(uncompression_dict_block.GetValue()->data,
table_->get_rep()->blocks_definitely_zstd_compressed);
*uncompression_dict = std::move(dict);
uncompression_dict_block.TransferTo(uncompression_dict);
return Status::OK();
}
size_t UncompressionDictReader::ApproximateMemoryUsage() const {
assert(!uncompression_dict_block_.GetOwnValue() ||
uncompression_dict_block_.GetValue() != nullptr);
size_t usage = uncompression_dict_block_.GetOwnValue()
? uncompression_dict_block_.GetValue()->ApproximateMemoryUsage()
: 0;
#ifdef ROCKSDB_MALLOC_USABLE_SIZE
usage += malloc_usable_size(const_cast<UncompressionDictReader*>(this));
#else
usage += sizeof(*this);
#endif // ROCKSDB_MALLOC_USABLE_SIZE
return usage;
}
} // namespace rocksdb

@ -0,0 +1,64 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
#pragma once
#include <cassert>
#include "table/block_based/cachable_entry.h"
#include "table/format.h"
namespace rocksdb {
class BlockBasedTable;
struct BlockCacheLookupContext;
class FilePrefetchBuffer;
class GetContext;
struct ReadOptions;
struct UncompressionDict;
// Provides access to the uncompression dictionary regardless of whether
// it is owned by the reader or stored in the cache, or whether it is pinned
// in the cache or not.
class UncompressionDictReader {
public:
static Status Create(
const BlockBasedTable* table, FilePrefetchBuffer* prefetch_buffer,
bool use_cache, bool prefetch, bool pin,
BlockCacheLookupContext* lookup_context,
std::unique_ptr<UncompressionDictReader>* uncompression_dict_reader);
Status GetOrReadUncompressionDictionary(
FilePrefetchBuffer* prefetch_buffer, bool no_io, GetContext* get_context,
BlockCacheLookupContext* lookup_context,
UncompressionDict* uncompression_dict) const;
size_t ApproximateMemoryUsage() const;
private:
UncompressionDictReader(
const BlockBasedTable* t,
CachableEntry<BlockContents>&& uncompression_dict_block)
: table_(t),
uncompression_dict_block_(std::move(uncompression_dict_block)) {
assert(table_);
}
static Status ReadUncompressionDictionaryBlock(
const BlockBasedTable* table, FilePrefetchBuffer* prefetch_buffer,
const ReadOptions& read_options, GetContext* get_context,
BlockCacheLookupContext* lookup_context,
CachableEntry<BlockContents>* uncompression_dict_block);
Status GetOrReadUncompressionDictionaryBlock(
FilePrefetchBuffer* prefetch_buffer, bool no_io, GetContext* get_context,
BlockCacheLookupContext* lookup_context,
CachableEntry<BlockContents>* uncompression_dict_block) const;
const BlockBasedTable* table_;
CachableEntry<BlockContents> uncompression_dict_block_;
};
} // namespace rocksdb

@ -124,8 +124,6 @@ class TableReader {
virtual Status VerifyChecksum(TableReaderCaller /*caller*/) {
return Status::NotSupported("VerifyChecksum() not supported");
}
virtual void Close() {}
};
} // namespace rocksdb

@ -2889,176 +2889,6 @@ TEST_P(BlockBasedTableTest, BlockReadCountTest) {
}
}
// A wrapper around LRICache that also keeps track of data blocks (in contrast
// with the objects) in the cache. The class is very simple and can be used only
// for trivial tests.
class MockCache : public LRUCache {
public:
MockCache(size_t capacity, int num_shard_bits, bool strict_capacity_limit,
double high_pri_pool_ratio)
: LRUCache(capacity, num_shard_bits, strict_capacity_limit,
high_pri_pool_ratio) {}
Status Insert(const Slice& key, void* value, size_t charge,
void (*deleter)(const Slice& key, void* value),
Handle** handle = nullptr,
Priority priority = Priority::LOW) override {
// Replace the deleter with our own so that we keep track of data blocks
// erased from the cache
deleters_[key.ToString()] = deleter;
return ShardedCache::Insert(key, value, charge, &MockDeleter, handle,
priority);
}
// This is called by the application right after inserting a data block
void TEST_mark_as_data_block(const Slice& key, size_t charge) override {
marked_data_in_cache_[key.ToString()] = charge;
marked_size_ += charge;
}
using DeleterFunc = void (*)(const Slice& key, void* value);
static std::map<std::string, DeleterFunc> deleters_;
static std::map<std::string, size_t> marked_data_in_cache_;
static size_t marked_size_;
static void MockDeleter(const Slice& key, void* value) {
// If the item was marked for being data block, decrease its usage from the
// total data block usage of the cache
if (marked_data_in_cache_.find(key.ToString()) !=
marked_data_in_cache_.end()) {
marked_size_ -= marked_data_in_cache_[key.ToString()];
}
// Then call the origianl deleter
assert(deleters_.find(key.ToString()) != deleters_.end());
auto deleter = deleters_[key.ToString()];
deleter(key, value);
}
};
size_t MockCache::marked_size_ = 0;
std::map<std::string, MockCache::DeleterFunc> MockCache::deleters_;
std::map<std::string, size_t> MockCache::marked_data_in_cache_;
// Block cache can contain raw data blocks as well as general objects. If an
// object depends on the table to be live, it then must be destructed before the
// table is closed. This test makes sure that the only items remains in the
// cache after the table is closed are raw data blocks.
TEST_P(BlockBasedTableTest, NoObjectInCacheAfterTableClose) {
std::vector<CompressionType> compression_types{kNoCompression};
// The following are the compression library versions supporting compression
// dictionaries. See the test case CacheCompressionDict in the
// DBBlockCacheTest suite.
#ifdef ZLIB
compression_types.push_back(kZlibCompression);
#endif // ZLIB
#if LZ4_VERSION_NUMBER >= 10400
compression_types.push_back(kLZ4Compression);
compression_types.push_back(kLZ4HCCompression);
#endif // LZ4_VERSION_NUMBER >= 10400
#if ZSTD_VERSION_NUMBER >= 500
compression_types.push_back(kZSTD);
#endif // ZSTD_VERSION_NUMBER >= 500
for (int level: {-1, 0, 1, 10}) {
for (auto index_type :
{BlockBasedTableOptions::IndexType::kBinarySearch,
BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch}) {
for (bool block_based_filter : {true, false}) {
for (bool partition_filter : {true, false}) {
if (partition_filter &&
(block_based_filter ||
index_type !=
BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch)) {
continue;
}
for (bool index_and_filter_in_cache : {true, false}) {
for (bool pin_l0 : {true, false}) {
for (bool pin_top_level : {true, false}) {
if (pin_l0 && !index_and_filter_in_cache) {
continue;
}
for (auto compression_type : compression_types) {
for (uint32_t max_dict_bytes : {0, 1 << 14}) {
if (compression_type == kNoCompression && max_dict_bytes)
continue;
// Create a table
Options opt;
std::unique_ptr<InternalKeyComparator> ikc;
ikc.reset(new test::PlainInternalKeyComparator(
opt.comparator));
opt.compression = compression_type;
opt.compression_opts.max_dict_bytes = max_dict_bytes;
BlockBasedTableOptions table_options =
GetBlockBasedTableOptions();
table_options.block_size = 1024;
table_options.index_type = index_type;
table_options.pin_l0_filter_and_index_blocks_in_cache =
pin_l0;
table_options.pin_top_level_index_and_filter =
pin_top_level;
table_options.partition_filters = partition_filter;
table_options.cache_index_and_filter_blocks =
index_and_filter_in_cache;
// big enough so we don't ever lose cached values.
table_options.block_cache = std::make_shared<MockCache>(
16 * 1024 * 1024, 4, false, 0.0);
table_options.filter_policy.reset(
rocksdb::NewBloomFilterPolicy(10, block_based_filter));
opt.table_factory.reset(NewBlockBasedTableFactory(
table_options));
bool convert_to_internal_key = false;
TableConstructor c(BytewiseComparator(),
convert_to_internal_key, level);
std::string user_key = "k01";
std::string key =
InternalKey(user_key, 0, kTypeValue).Encode().ToString();
c.Add(key, "hello");
std::vector<std::string> keys;
stl_wrappers::KVMap kvmap;
const ImmutableCFOptions ioptions(opt);
const MutableCFOptions moptions(opt);
c.Finish(opt, ioptions, moptions, table_options, *ikc,
&keys, &kvmap);
// Doing a read to make index/filter loaded into the cache
auto table_reader =
dynamic_cast<BlockBasedTable*>(c.GetTableReader());
PinnableSlice value;
GetContext get_context(opt.comparator, nullptr, nullptr,
nullptr, GetContext::kNotFound, user_key, &value,
nullptr, nullptr, nullptr, nullptr);
InternalKey ikey(user_key, 0, kTypeValue);
auto s = table_reader->Get(ReadOptions(), key, &get_context,
moptions.prefix_extractor.get());
ASSERT_EQ(get_context.State(), GetContext::kFound);
ASSERT_STREQ(value.data(), "hello");
// Close the table
c.ResetTableReader();
auto usage = table_options.block_cache->GetUsage();
auto pinned_usage =
table_options.block_cache->GetPinnedUsage();
// The only usage must be for marked data blocks
ASSERT_EQ(usage, MockCache::marked_size_);
// There must be some pinned data since PinnableSlice has
// not released them yet
ASSERT_GT(pinned_usage, 0);
// Release pinnable slice reousrces
value.Reset();
pinned_usage = table_options.block_cache->GetPinnedUsage();
ASSERT_EQ(pinned_usage, 0);
}
}
}
}
}
}
}
}
} // level
}
TEST_P(BlockBasedTableTest, BlockCacheLeak) {
// Check that when we reopen a table we don't lose access to blocks already
// in the cache. This test checks whether the Table actually makes use of the

@ -21,6 +21,7 @@
#include <string>
#include "memory/memory_allocator.h"
#include "rocksdb/cleanable.h"
#include "rocksdb/options.h"
#include "rocksdb/table.h"
#include "util/coding.h"
@ -216,36 +217,60 @@ struct CompressionDict {
// Holds dictionary and related data, like ZSTD's digested uncompression
// dictionary.
struct UncompressionDict {
struct UncompressionDict : public Cleanable {
// Block containing the data for the compression dictionary. It is non-empty
// only if the constructor that takes a string parameter is used.
std::string dict_;
// Slice pointing to the compression dictionary data. Points to
// dict_ if the string constructor is used. In the case of the Slice
// constructor, it is a copy of the Slice passed by the caller.
Slice slice_;
#ifdef ROCKSDB_ZSTD_DDICT
ZSTD_DDict* zstd_ddict_;
// Processed version of the contents of slice_ for ZSTD compression.
ZSTD_DDict* zstd_ddict_ = nullptr;
#endif // ROCKSDB_ZSTD_DDICT
// Block containing the data for the compression dictionary. It may be
// redundant with the data held in `zstd_ddict_`.
std::string dict_;
// This `Statistics` pointer is intended to be used upon block cache eviction,
// so only needs to be populated on `UncompressionDict`s that'll be inserted
// into block cache.
Statistics* statistics_;
// Slice constructor: it is the caller's responsibility to either
// a) make sure slice remains valid throughout the lifecycle of this object OR
// b) transfer the management of the underlying resource (e.g. cache handle)
// to this object, in which case UncompressionDict is self-contained, and the
// resource is guaranteed to be released (via the cleanup logic in Cleanable)
// when UncompressionDict is destroyed.
#ifdef ROCKSDB_ZSTD_DDICT
UncompressionDict(std::string dict, bool using_zstd,
Statistics* _statistics = nullptr) {
UncompressionDict(Slice slice, bool using_zstd)
#else // ROCKSDB_ZSTD_DDICT
UncompressionDict(std::string dict, bool /*using_zstd*/,
Statistics* _statistics = nullptr) {
UncompressionDict(Slice slice, bool /*using_zstd*/)
#endif // ROCKSDB_ZSTD_DDICT
dict_ = std::move(dict);
statistics_ = _statistics;
: slice_(std::move(slice)) {
#ifdef ROCKSDB_ZSTD_DDICT
zstd_ddict_ = nullptr;
if (!dict_.empty() && using_zstd) {
zstd_ddict_ = ZSTD_createDDict_byReference(dict_.data(), dict_.size());
if (!slice_.empty() && using_zstd) {
zstd_ddict_ = ZSTD_createDDict_byReference(slice_.data(), slice_.size());
assert(zstd_ddict_ != nullptr);
}
#endif // ROCKSDB_ZSTD_DDICT
}
// String constructor: results in a self-contained UncompressionDict.
UncompressionDict(std::string dict, bool using_zstd)
: UncompressionDict(Slice(dict), using_zstd) {
dict_ = std::move(dict);
}
UncompressionDict(UncompressionDict&& rhs)
: dict_(std::move(rhs.dict_)),
slice_(std::move(rhs.slice_))
#ifdef ROCKSDB_ZSTD_DDICT
,
zstd_ddict_(rhs.zstd_ddict_)
#endif
{
#ifdef ROCKSDB_ZSTD_DDICT
rhs.zstd_ddict_ = nullptr;
#endif
}
~UncompressionDict() {
#ifdef ROCKSDB_ZSTD_DDICT
size_t res = 0;
@ -257,20 +282,34 @@ struct UncompressionDict {
#endif // ROCKSDB_ZSTD_DDICT
}
UncompressionDict& operator=(UncompressionDict&& rhs) {
if (this == &rhs) {
return *this;
}
dict_ = std::move(rhs.dict_);
slice_ = std::move(rhs.slice_);
#ifdef ROCKSDB_ZSTD_DDICT
zstd_ddict_ = rhs.zstd_ddict_;
rhs.zstd_ddict_ = nullptr;
#endif
return *this;
}
const Slice& GetRawDict() const { return slice_; }
#ifdef ROCKSDB_ZSTD_DDICT
const ZSTD_DDict* GetDigestedZstdDDict() const { return zstd_ddict_; }
#endif // ROCKSDB_ZSTD_DDICT
Slice GetRawDict() const { return dict_; }
static const UncompressionDict& GetEmptyDict() {
static UncompressionDict empty_dict{};
return empty_dict;
}
Statistics* statistics() const { return statistics_; }
size_t ApproximateMemoryUsage() {
size_t ApproximateMemoryUsage() const {
size_t usage = 0;
usage += sizeof(struct UncompressionDict);
#ifdef ROCKSDB_ZSTD_DDICT
@ -281,11 +320,9 @@ struct UncompressionDict {
}
UncompressionDict() = default;
// Disable copy/move
// Disable copy
UncompressionDict(const CompressionDict&) = delete;
UncompressionDict& operator=(const CompressionDict&) = delete;
UncompressionDict(CompressionDict&&) = delete;
UncompressionDict& operator=(CompressionDict&&) = delete;
};
class CompressionContext {
@ -725,7 +762,7 @@ inline CacheAllocationPtr Zlib_Uncompress(
return nullptr;
}
Slice compression_dict = info.dict().GetRawDict();
const Slice& compression_dict = info.dict().GetRawDict();
if (compression_dict.size()) {
// Initialize the compression library's dictionary
st = inflateSetDictionary(
@ -1040,7 +1077,7 @@ inline CacheAllocationPtr LZ4_Uncompress(const UncompressionInfo& info,
auto output = AllocateBlock(output_len, allocator);
#if LZ4_VERSION_NUMBER >= 10400 // r124+
LZ4_streamDecode_t* stream = LZ4_createStreamDecode();
Slice compression_dict = info.dict().GetRawDict();
const Slice& compression_dict = info.dict().GetRawDict();
if (compression_dict.size()) {
LZ4_setStreamDecode(stream, compression_dict.data(),
static_cast<int>(compression_dict.size()));

Loading…
Cancel
Save