Revert "Digest ZSTD compression dictionary once per SST file (#4251)" (#4347)

Summary:
Reverting is needed to unblock a user building against master, who is blocked for multiple days due to a thread-safety issue in `GetEmptyDict`. We haven't been able to fix it quickly, so reverting.

Simply ran `git revert 6c40806e51a89386d2b066fddf73d3fd03a36f65`. There were no merge conflicts.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/4347

Differential Revision: D9668365

Pulled By: ajkr

fbshipit-source-id: 0c56334f0a23cf5ee0233d4e4679eae6709739cd
main
Andrew Kryczka 6 years ago committed by Facebook Github Bot
parent 64324e329e
commit 2c14662213
  1. 77
      table/block_based_table_builder.cc
  2. 2
      table/block_based_table_builder.h
  3. 21
      table/block_based_table_reader.cc
  4. 11
      table/block_fetcher.cc
  5. 21
      table/format.cc
  6. 11
      table/format.h
  7. 52
      tools/db_bench_tool.cc
  8. 332
      util/compression.h
  9. 14
      utilities/blob_db/blob_db_impl.cc
  10. 8
      utilities/blob_db/blob_dump_tool.cc
  11. 18
      utilities/column_aware_encoding_util.cc

@ -103,19 +103,19 @@ bool GoodCompressionRatio(size_t compressed_size, size_t raw_size) {
} // namespace } // namespace
// format_version is the block format as defined in include/rocksdb/table.h // format_version is the block format as defined in include/rocksdb/table.h
Slice CompressBlock(const Slice& raw, const CompressionInfo& compression_info, Slice CompressBlock(const Slice& raw, const CompressionContext& compression_ctx,
CompressionType* type, uint32_t format_version, CompressionType* type, uint32_t format_version,
std::string* compressed_output) { std::string* compressed_output) {
*type = compression_info.type(); *type = compression_ctx.type();
if (compression_info.type() == kNoCompression) { if (compression_ctx.type() == kNoCompression) {
return raw; return raw;
} }
// Will return compressed block contents if (1) the compression method is // Will return compressed block contents if (1) the compression method is
// supported in this platform and (2) the compression rate is "good enough". // supported in this platform and (2) the compression rate is "good enough".
switch (compression_info.type()) { switch (compression_ctx.type()) {
case kSnappyCompression: case kSnappyCompression:
if (Snappy_Compress(compression_info, raw.data(), raw.size(), if (Snappy_Compress(compression_ctx, raw.data(), raw.size(),
compressed_output) && compressed_output) &&
GoodCompressionRatio(compressed_output->size(), raw.size())) { GoodCompressionRatio(compressed_output->size(), raw.size())) {
return *compressed_output; return *compressed_output;
@ -123,7 +123,7 @@ Slice CompressBlock(const Slice& raw, const CompressionInfo& compression_info,
break; // fall back to no compression. break; // fall back to no compression.
case kZlibCompression: case kZlibCompression:
if (Zlib_Compress( if (Zlib_Compress(
compression_info, compression_ctx,
GetCompressFormatForVersion(kZlibCompression, format_version), GetCompressFormatForVersion(kZlibCompression, format_version),
raw.data(), raw.size(), compressed_output) && raw.data(), raw.size(), compressed_output) &&
GoodCompressionRatio(compressed_output->size(), raw.size())) { GoodCompressionRatio(compressed_output->size(), raw.size())) {
@ -132,7 +132,7 @@ Slice CompressBlock(const Slice& raw, const CompressionInfo& compression_info,
break; // fall back to no compression. break; // fall back to no compression.
case kBZip2Compression: case kBZip2Compression:
if (BZip2_Compress( if (BZip2_Compress(
compression_info, compression_ctx,
GetCompressFormatForVersion(kBZip2Compression, format_version), GetCompressFormatForVersion(kBZip2Compression, format_version),
raw.data(), raw.size(), compressed_output) && raw.data(), raw.size(), compressed_output) &&
GoodCompressionRatio(compressed_output->size(), raw.size())) { GoodCompressionRatio(compressed_output->size(), raw.size())) {
@ -141,7 +141,7 @@ Slice CompressBlock(const Slice& raw, const CompressionInfo& compression_info,
break; // fall back to no compression. break; // fall back to no compression.
case kLZ4Compression: case kLZ4Compression:
if (LZ4_Compress( if (LZ4_Compress(
compression_info, compression_ctx,
GetCompressFormatForVersion(kLZ4Compression, format_version), GetCompressFormatForVersion(kLZ4Compression, format_version),
raw.data(), raw.size(), compressed_output) && raw.data(), raw.size(), compressed_output) &&
GoodCompressionRatio(compressed_output->size(), raw.size())) { GoodCompressionRatio(compressed_output->size(), raw.size())) {
@ -150,7 +150,7 @@ Slice CompressBlock(const Slice& raw, const CompressionInfo& compression_info,
break; // fall back to no compression. break; // fall back to no compression.
case kLZ4HCCompression: case kLZ4HCCompression:
if (LZ4HC_Compress( if (LZ4HC_Compress(
compression_info, compression_ctx,
GetCompressFormatForVersion(kLZ4HCCompression, format_version), GetCompressFormatForVersion(kLZ4HCCompression, format_version),
raw.data(), raw.size(), compressed_output) && raw.data(), raw.size(), compressed_output) &&
GoodCompressionRatio(compressed_output->size(), raw.size())) { GoodCompressionRatio(compressed_output->size(), raw.size())) {
@ -166,7 +166,7 @@ Slice CompressBlock(const Slice& raw, const CompressionInfo& compression_info,
break; break;
case kZSTD: case kZSTD:
case kZSTDNotFinalCompression: case kZSTDNotFinalCompression:
if (ZSTD_Compress(compression_info, raw.data(), raw.size(), if (ZSTD_Compress(compression_ctx, raw.data(), raw.size(),
compressed_output) && compressed_output) &&
GoodCompressionRatio(compressed_output->size(), raw.size())) { GoodCompressionRatio(compressed_output->size(), raw.size())) {
return *compressed_output; return *compressed_output;
@ -260,9 +260,8 @@ struct BlockBasedTableBuilder::Rep {
PartitionedIndexBuilder* p_index_builder_ = nullptr; PartitionedIndexBuilder* p_index_builder_ = nullptr;
std::string last_key; std::string last_key;
CompressionType compression_type; // Compression dictionary or nullptr
CompressionOptions compression_opts; const std::string* compression_dict;
CompressionDict compression_dict;
CompressionContext compression_ctx; CompressionContext compression_ctx;
std::unique_ptr<UncompressionContext> verify_ctx; std::unique_ptr<UncompressionContext> verify_ctx;
TableProperties props; TableProperties props;
@ -313,9 +312,8 @@ struct BlockBasedTableBuilder::Rep {
table_options.data_block_hash_table_util_ratio), table_options.data_block_hash_table_util_ratio),
range_del_block(1 /* block_restart_interval */), range_del_block(1 /* block_restart_interval */),
internal_prefix_transform(_moptions.prefix_extractor.get()), internal_prefix_transform(_moptions.prefix_extractor.get()),
compression_type(_compression_type), compression_dict(_compression_dict),
compression_opts(_compression_opts), compression_ctx(_compression_type, _compression_opts),
compression_ctx(_compression_type),
use_delta_encoding_for_index_values(table_opt.format_version >= 4 && use_delta_encoding_for_index_values(table_opt.format_version >= 4 &&
!table_opt.block_align), !table_opt.block_align),
compressed_cache_key_prefix_size(0), compressed_cache_key_prefix_size(0),
@ -326,15 +324,6 @@ struct BlockBasedTableBuilder::Rep {
column_family_name(_column_family_name), column_family_name(_column_family_name),
creation_time(_creation_time), creation_time(_creation_time),
oldest_key_time(_oldest_key_time) { oldest_key_time(_oldest_key_time) {
if (_compression_dict != nullptr) {
compression_dict.Init(*_compression_dict,
CompressionDict::Mode::kCompression,
_compression_type, _compression_opts.level);
} else {
compression_dict.Init(Slice() /* dict */,
CompressionDict::Mode::kEmpty,
_compression_type, _compression_opts.level);
}
if (table_options.index_type == if (table_options.index_type ==
BlockBasedTableOptions::kTwoLevelIndexSearch) { BlockBasedTableOptions::kTwoLevelIndexSearch) {
p_index_builder_ = PartitionedIndexBuilder::CreateIndexBuilder( p_index_builder_ = PartitionedIndexBuilder::CreateIndexBuilder(
@ -365,7 +354,7 @@ struct BlockBasedTableBuilder::Rep {
_moptions.prefix_extractor != nullptr)); _moptions.prefix_extractor != nullptr));
if (table_options.verify_compression) { if (table_options.verify_compression) {
verify_ctx.reset(new UncompressionContext(UncompressionContext::NoCache(), verify_ctx.reset(new UncompressionContext(UncompressionContext::NoCache(),
compression_type)); compression_ctx.type()));
} }
} }
@ -509,7 +498,7 @@ void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents,
assert(ok()); assert(ok());
Rep* r = rep_; Rep* r = rep_;
auto type = r->compression_type; auto type = r->compression_ctx.type();
Slice block_contents; Slice block_contents;
bool abort_compression = false; bool abort_compression = false;
@ -517,12 +506,24 @@ void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents,
ShouldReportDetailedTime(r->ioptions.env, r->ioptions.statistics)); ShouldReportDetailedTime(r->ioptions.env, r->ioptions.statistics));
if (raw_block_contents.size() < kCompressionSizeLimit) { if (raw_block_contents.size() < kCompressionSizeLimit) {
CompressionInfo compression_info( Slice compression_dict;
r->compression_opts, r->compression_ctx, if (is_data_block && r->compression_dict && r->compression_dict->size()) {
is_data_block ? r->compression_dict : CompressionDict::GetEmptyDict(), r->compression_ctx.dict() = *r->compression_dict;
r->compression_type); if (r->table_options.verify_compression) {
assert(r->verify_ctx != nullptr);
r->verify_ctx->dict() = *r->compression_dict;
}
} else {
// Clear dictionary
r->compression_ctx.dict() = Slice();
if (r->table_options.verify_compression) {
assert(r->verify_ctx != nullptr);
r->verify_ctx->dict() = Slice();
}
}
block_contents = block_contents =
CompressBlock(raw_block_contents, compression_info, &type, CompressBlock(raw_block_contents, r->compression_ctx, &type,
r->table_options.format_version, &r->compressed_output); r->table_options.format_version, &r->compressed_output);
// Some of the compression algorithms are known to be unreliable. If // Some of the compression algorithms are known to be unreliable. If
@ -531,12 +532,8 @@ void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents,
if (type != kNoCompression && r->table_options.verify_compression) { if (type != kNoCompression && r->table_options.verify_compression) {
// Retrieve the uncompressed contents into a new buffer // Retrieve the uncompressed contents into a new buffer
BlockContents contents; BlockContents contents;
UncompressionInfo uncompression_info(
*r->verify_ctx,
is_data_block ? r->compression_dict : CompressionDict::GetEmptyDict(),
r->compression_type);
Status stat = UncompressBlockContentsForCompressionType( Status stat = UncompressBlockContentsForCompressionType(
uncompression_info, block_contents.data(), block_contents.size(), *r->verify_ctx, block_contents.data(), block_contents.size(),
&contents, r->table_options.format_version, r->ioptions); &contents, r->table_options.format_version, r->ioptions);
if (stat.ok()) { if (stat.ok()) {
@ -783,7 +780,7 @@ void BlockBasedTableBuilder::WritePropertiesBlock(
? rep_->ioptions.merge_operator->Name() ? rep_->ioptions.merge_operator->Name()
: "nullptr"; : "nullptr";
rep_->props.compression_name = rep_->props.compression_name =
CompressionTypeToString(rep_->compression_type); CompressionTypeToString(rep_->compression_ctx.type());
rep_->props.prefix_extractor_name = rep_->props.prefix_extractor_name =
rep_->moptions.prefix_extractor != nullptr rep_->moptions.prefix_extractor != nullptr
? rep_->moptions.prefix_extractor->Name() ? rep_->moptions.prefix_extractor->Name()
@ -832,10 +829,10 @@ void BlockBasedTableBuilder::WritePropertiesBlock(
void BlockBasedTableBuilder::WriteCompressionDictBlock( void BlockBasedTableBuilder::WriteCompressionDictBlock(
MetaIndexBuilder* meta_index_builder) { MetaIndexBuilder* meta_index_builder) {
if (rep_->compression_dict.GetRawDict().size()) { if (rep_->compression_dict && rep_->compression_dict->size()) {
BlockHandle compression_dict_block_handle; BlockHandle compression_dict_block_handle;
if (ok()) { if (ok()) {
WriteRawBlock(rep_->compression_dict.GetRawDict(), kNoCompression, WriteRawBlock(*rep_->compression_dict, kNoCompression,
&compression_dict_block_handle); &compression_dict_block_handle);
} }
if (ok()) { if (ok()) {

@ -131,7 +131,7 @@ class BlockBasedTableBuilder : public TableBuilder {
const uint64_t kCompressionSizeLimit = std::numeric_limits<int>::max(); const uint64_t kCompressionSizeLimit = std::numeric_limits<int>::max();
}; };
Slice CompressBlock(const Slice& raw, const CompressionInfo& info, Slice CompressBlock(const Slice& raw, const CompressionContext& compression_ctx,
CompressionType* type, uint32_t format_version, CompressionType* type, uint32_t format_version,
std::string* compressed_output); std::string* compressed_output);

@ -1226,12 +1226,9 @@ Status BlockBasedTable::GetDataBlockFromCache(
// Retrieve the uncompressed contents into a new buffer // Retrieve the uncompressed contents into a new buffer
BlockContents contents; BlockContents contents;
UncompressionContext context(compressed_block->compression_type()); UncompressionContext uncompresssion_ctx(compressed_block->compression_type(),
CompressionDict dict; compression_dict);
dict.Init(compression_dict, CompressionDict::Mode::kUncompression, s = UncompressBlockContents(uncompresssion_ctx, compressed_block->data(),
compressed_block->compression_type());
UncompressionInfo info(context, dict, compressed_block->compression_type());
s = UncompressBlockContents(info, compressed_block->data(),
compressed_block->size(), &contents, compressed_block->size(), &contents,
format_version, ioptions); format_version, ioptions);
@ -1304,13 +1301,11 @@ Status BlockBasedTable::PutDataBlockToCache(
BlockContents contents; BlockContents contents;
Statistics* statistics = ioptions.statistics; Statistics* statistics = ioptions.statistics;
if (raw_block->compression_type() != kNoCompression) { if (raw_block->compression_type() != kNoCompression) {
UncompressionContext context(raw_block->compression_type()); UncompressionContext uncompression_ctx(raw_block->compression_type(),
CompressionDict dict; compression_dict);
dict.Init(compression_dict, CompressionDict::Mode::kUncompression, s = UncompressBlockContents(uncompression_ctx, raw_block->data(),
raw_block->compression_type()); raw_block->size(), &contents, format_version,
UncompressionInfo info(context, dict, raw_block->compression_type()); ioptions);
s = UncompressBlockContents(info, raw_block->data(), raw_block->size(),
&contents, format_version, ioptions);
} }
if (!s.ok()) { if (!s.ok()) {
delete raw_block; delete raw_block;

@ -227,13 +227,10 @@ Status BlockFetcher::ReadBlockContents() {
if (do_uncompress_ && compression_type != kNoCompression) { if (do_uncompress_ && compression_type != kNoCompression) {
// compressed page, uncompress, update cache // compressed page, uncompress, update cache
UncompressionContext context(compression_type); UncompressionContext uncompression_ctx(compression_type, compression_dict_);
CompressionDict dict; status_ =
dict.Init(compression_dict_, CompressionDict::Mode::kUncompression, UncompressBlockContents(uncompression_ctx, slice_.data(), block_size_,
compression_type); contents_, footer_.version(), ioptions_);
UncompressionInfo info(context, dict, compression_type);
status_ = UncompressBlockContents(info, slice_.data(), block_size_,
contents_, footer_.version(), ioptions_);
} else { } else {
GetBlockContents(); GetBlockContents();
} }

@ -284,18 +284,18 @@ Status ReadFooterFromFile(RandomAccessFileReader* file,
} }
Status UncompressBlockContentsForCompressionType( Status UncompressBlockContentsForCompressionType(
const UncompressionInfo& uncompression_info, const char* data, size_t n, const UncompressionContext& uncompression_ctx, const char* data, size_t n,
BlockContents* contents, uint32_t format_version, BlockContents* contents, uint32_t format_version,
const ImmutableCFOptions& ioptions) { const ImmutableCFOptions& ioptions) {
std::unique_ptr<char[]> ubuf; std::unique_ptr<char[]> ubuf;
assert(uncompression_info.type() != kNoCompression && assert(uncompression_ctx.type() != kNoCompression &&
"Invalid compression type"); "Invalid compression type");
StopWatchNano timer(ioptions.env, StopWatchNano timer(ioptions.env,
ShouldReportDetailedTime(ioptions.env, ioptions.statistics)); ShouldReportDetailedTime(ioptions.env, ioptions.statistics));
int decompress_size = 0; int decompress_size = 0;
switch (uncompression_info.type()) { switch (uncompression_ctx.type()) {
case kSnappyCompression: { case kSnappyCompression: {
size_t ulength = 0; size_t ulength = 0;
static char snappy_corrupt_msg[] = static char snappy_corrupt_msg[] =
@ -312,7 +312,7 @@ Status UncompressBlockContentsForCompressionType(
} }
case kZlibCompression: case kZlibCompression:
ubuf.reset(Zlib_Uncompress( ubuf.reset(Zlib_Uncompress(
uncompression_info, data, n, &decompress_size, uncompression_ctx, data, n, &decompress_size,
GetCompressFormatForVersion(kZlibCompression, format_version))); GetCompressFormatForVersion(kZlibCompression, format_version)));
if (!ubuf) { if (!ubuf) {
static char zlib_corrupt_msg[] = static char zlib_corrupt_msg[] =
@ -336,7 +336,7 @@ Status UncompressBlockContentsForCompressionType(
break; break;
case kLZ4Compression: case kLZ4Compression:
ubuf.reset(LZ4_Uncompress( ubuf.reset(LZ4_Uncompress(
uncompression_info, data, n, &decompress_size, uncompression_ctx, data, n, &decompress_size,
GetCompressFormatForVersion(kLZ4Compression, format_version))); GetCompressFormatForVersion(kLZ4Compression, format_version)));
if (!ubuf) { if (!ubuf) {
static char lz4_corrupt_msg[] = static char lz4_corrupt_msg[] =
@ -348,7 +348,7 @@ Status UncompressBlockContentsForCompressionType(
break; break;
case kLZ4HCCompression: case kLZ4HCCompression:
ubuf.reset(LZ4_Uncompress( ubuf.reset(LZ4_Uncompress(
uncompression_info, data, n, &decompress_size, uncompression_ctx, data, n, &decompress_size,
GetCompressFormatForVersion(kLZ4HCCompression, format_version))); GetCompressFormatForVersion(kLZ4HCCompression, format_version)));
if (!ubuf) { if (!ubuf) {
static char lz4hc_corrupt_msg[] = static char lz4hc_corrupt_msg[] =
@ -370,8 +370,7 @@ Status UncompressBlockContentsForCompressionType(
break; break;
case kZSTD: case kZSTD:
case kZSTDNotFinalCompression: case kZSTDNotFinalCompression:
ubuf.reset( ubuf.reset(ZSTD_Uncompress(uncompression_ctx, data, n, &decompress_size));
ZSTD_Uncompress(uncompression_info, data, n, &decompress_size));
if (!ubuf) { if (!ubuf) {
static char zstd_corrupt_msg[] = static char zstd_corrupt_msg[] =
"ZSTD not supported or corrupted ZSTD compressed block contents"; "ZSTD not supported or corrupted ZSTD compressed block contents";
@ -401,14 +400,14 @@ Status UncompressBlockContentsForCompressionType(
// buffer is returned via 'result' and it is upto the caller to // buffer is returned via 'result' and it is upto the caller to
// free this buffer. // free this buffer.
// format_version is the block format as defined in include/rocksdb/table.h // format_version is the block format as defined in include/rocksdb/table.h
Status UncompressBlockContents(const UncompressionInfo& uncompression_info, Status UncompressBlockContents(const UncompressionContext& uncompression_ctx,
const char* data, size_t n, const char* data, size_t n,
BlockContents* contents, uint32_t format_version, BlockContents* contents, uint32_t format_version,
const ImmutableCFOptions& ioptions) { const ImmutableCFOptions& ioptions) {
assert(data[n] != kNoCompression); assert(data[n] != kNoCompression);
assert(data[n] == uncompression_info.type()); assert(data[n] == uncompression_ctx.type());
return UncompressBlockContentsForCompressionType( return UncompressBlockContentsForCompressionType(
uncompression_info, data, n, contents, format_version, ioptions); uncompression_ctx, data, n, contents, format_version, ioptions);
} }
} // namespace rocksdb } // namespace rocksdb

@ -250,17 +250,16 @@ extern Status ReadBlockContents(
// free this buffer. // free this buffer.
// For description of compress_format_version and possible values, see // For description of compress_format_version and possible values, see
// util/compression.h // util/compression.h
extern Status UncompressBlockContents(const UncompressionInfo& info, extern Status UncompressBlockContents(
const char* data, size_t n, const UncompressionContext& uncompression_ctx, const char* data, size_t n,
BlockContents* contents, BlockContents* contents, uint32_t compress_format_version,
uint32_t compress_format_version, const ImmutableCFOptions& ioptions);
const ImmutableCFOptions& ioptions);
// This is an extension to UncompressBlockContents that accepts // This is an extension to UncompressBlockContents that accepts
// a specific compression type. This is used by un-wrapped blocks // a specific compression type. This is used by un-wrapped blocks
// with no compression header. // with no compression header.
extern Status UncompressBlockContentsForCompressionType( extern Status UncompressBlockContentsForCompressionType(
const UncompressionInfo& info, const char* data, size_t n, const UncompressionContext& uncompression_ctx, const char* data, size_t n,
BlockContents* contents, uint32_t compress_format_version, BlockContents* contents, uint32_t compress_format_version,
const ImmutableCFOptions& ioptions); const ImmutableCFOptions& ioptions);

@ -1991,28 +1991,28 @@ class Benchmark {
return true; return true;
} }
inline bool CompressSlice(const CompressionInfo& compression_info, inline bool CompressSlice(const CompressionContext& compression_ctx,
const Slice& input, std::string* compressed) { const Slice& input, std::string* compressed) {
bool ok = true; bool ok = true;
switch (FLAGS_compression_type_e) { switch (FLAGS_compression_type_e) {
case rocksdb::kSnappyCompression: case rocksdb::kSnappyCompression:
ok = Snappy_Compress(compression_info, input.data(), input.size(), ok = Snappy_Compress(compression_ctx, input.data(), input.size(),
compressed); compressed);
break; break;
case rocksdb::kZlibCompression: case rocksdb::kZlibCompression:
ok = Zlib_Compress(compression_info, 2, input.data(), input.size(), ok = Zlib_Compress(compression_ctx, 2, input.data(), input.size(),
compressed); compressed);
break; break;
case rocksdb::kBZip2Compression: case rocksdb::kBZip2Compression:
ok = BZip2_Compress(compression_info, 2, input.data(), input.size(), ok = BZip2_Compress(compression_ctx, 2, input.data(), input.size(),
compressed); compressed);
break; break;
case rocksdb::kLZ4Compression: case rocksdb::kLZ4Compression:
ok = LZ4_Compress(compression_info, 2, input.data(), input.size(), ok = LZ4_Compress(compression_ctx, 2, input.data(), input.size(),
compressed); compressed);
break; break;
case rocksdb::kLZ4HCCompression: case rocksdb::kLZ4HCCompression:
ok = LZ4HC_Compress(compression_info, 2, input.data(), input.size(), ok = LZ4HC_Compress(compression_ctx, 2, input.data(), input.size(),
compressed); compressed);
break; break;
case rocksdb::kXpressCompression: case rocksdb::kXpressCompression:
@ -2020,7 +2020,7 @@ class Benchmark {
input.size(), compressed); input.size(), compressed);
break; break;
case rocksdb::kZSTD: case rocksdb::kZSTD:
ok = ZSTD_Compress(compression_info, input.data(), input.size(), ok = ZSTD_Compress(compression_ctx, input.data(), input.size(),
compressed); compressed);
break; break;
default: default:
@ -2103,11 +2103,10 @@ class Benchmark {
const int len = FLAGS_block_size; const int len = FLAGS_block_size;
std::string input_str(len, 'y'); std::string input_str(len, 'y');
std::string compressed; std::string compressed;
CompressionOptions opts; CompressionContext compression_ctx(FLAGS_compression_type_e,
CompressionContext context(FLAGS_compression_type_e); Options().compression_opts);
CompressionInfo info(opts, context, CompressionDict::GetEmptyDict(), bool result =
FLAGS_compression_type_e); CompressSlice(compression_ctx, Slice(input_str), &compressed);
bool result = CompressSlice(info, Slice(input_str), &compressed);
if (!result) { if (!result) {
fprintf(stdout, "WARNING: %s compression is not enabled\n", fprintf(stdout, "WARNING: %s compression is not enabled\n",
@ -2957,14 +2956,13 @@ void VerifyDBFromDB(std::string& truth_db_name) {
int64_t produced = 0; int64_t produced = 0;
bool ok = true; bool ok = true;
std::string compressed; std::string compressed;
CompressionOptions opts; CompressionContext compression_ctx(FLAGS_compression_type_e,
CompressionContext context(FLAGS_compression_type_e); Options().compression_opts);
CompressionInfo info(opts, context, CompressionDict::GetEmptyDict(),
FLAGS_compression_type_e);
// Compress 1G // Compress 1G
while (ok && bytes < int64_t(1) << 30) { while (ok && bytes < int64_t(1) << 30) {
compressed.clear(); compressed.clear();
ok = CompressSlice(info, input, &compressed); ok = CompressSlice(compression_ctx, input, &compressed);
produced += compressed.size(); produced += compressed.size();
bytes += input.size(); bytes += input.size();
thread->stats.FinishedOps(nullptr, nullptr, 1, kCompress); thread->stats.FinishedOps(nullptr, nullptr, 1, kCompress);
@ -2986,17 +2984,11 @@ void VerifyDBFromDB(std::string& truth_db_name) {
Slice input = gen.Generate(FLAGS_block_size); Slice input = gen.Generate(FLAGS_block_size);
std::string compressed; std::string compressed;
CompressionContext compression_ctx(FLAGS_compression_type_e);
CompressionOptions compression_opts;
CompressionInfo compression_info(compression_opts, compression_ctx,
CompressionDict::GetEmptyDict(),
FLAGS_compression_type_e);
UncompressionContext uncompression_ctx(FLAGS_compression_type_e); UncompressionContext uncompression_ctx(FLAGS_compression_type_e);
UncompressionInfo uncompression_info(uncompression_ctx, CompressionContext compression_ctx(FLAGS_compression_type_e,
CompressionDict::GetEmptyDict(), Options().compression_opts);
FLAGS_compression_type_e);
bool ok = CompressSlice(compression_info, input, &compressed); bool ok = CompressSlice(compression_ctx, input, &compressed);
int64_t bytes = 0; int64_t bytes = 0;
int decompress_size; int decompress_size;
while (ok && bytes < 1024 * 1048576) { while (ok && bytes < 1024 * 1048576) {
@ -3016,7 +3008,7 @@ void VerifyDBFromDB(std::string& truth_db_name) {
break; break;
} }
case rocksdb::kZlibCompression: case rocksdb::kZlibCompression:
uncompressed = Zlib_Uncompress(uncompression_info, compressed.data(), uncompressed = Zlib_Uncompress(uncompression_ctx, compressed.data(),
compressed.size(), &decompress_size, 2); compressed.size(), &decompress_size, 2);
ok = uncompressed != nullptr; ok = uncompressed != nullptr;
break; break;
@ -3026,12 +3018,12 @@ void VerifyDBFromDB(std::string& truth_db_name) {
ok = uncompressed != nullptr; ok = uncompressed != nullptr;
break; break;
case rocksdb::kLZ4Compression: case rocksdb::kLZ4Compression:
uncompressed = LZ4_Uncompress(uncompression_info, compressed.data(), uncompressed = LZ4_Uncompress(uncompression_ctx, compressed.data(),
compressed.size(), &decompress_size, 2); compressed.size(), &decompress_size, 2);
ok = uncompressed != nullptr; ok = uncompressed != nullptr;
break; break;
case rocksdb::kLZ4HCCompression: case rocksdb::kLZ4HCCompression:
uncompressed = LZ4_Uncompress(uncompression_info, compressed.data(), uncompressed = LZ4_Uncompress(uncompression_ctx, compressed.data(),
compressed.size(), &decompress_size, 2); compressed.size(), &decompress_size, 2);
ok = uncompressed != nullptr; ok = uncompressed != nullptr;
break; break;
@ -3041,7 +3033,7 @@ void VerifyDBFromDB(std::string& truth_db_name) {
ok = uncompressed != nullptr; ok = uncompressed != nullptr;
break; break;
case rocksdb::kZSTD: case rocksdb::kZSTD:
uncompressed = ZSTD_Uncompress(uncompression_info, compressed.data(), uncompressed = ZSTD_Uncompress(uncompression_ctx, compressed.data(),
compressed.size(), &decompress_size); compressed.size(), &decompress_size);
ok = uncompressed != nullptr; ok = uncompressed != nullptr;
break; break;

@ -133,147 +133,13 @@ class ZSTDUncompressCachedData {
namespace rocksdb { namespace rocksdb {
// Holds dictionary and related data, like ZSTD's digested dictionary. // Instantiate this class and pass it to the uncompression API below
struct CompressionDict {
enum class Mode {
kUninit,
kEmpty, // An empty one can be used for both compression and uncompression
kCompression,
kUncompression,
};
#if ZSTD_VERSION_NUMBER >= 700
union {
ZSTD_CDict* zstd_cdict_;
ZSTD_DDict* zstd_ddict_;
};
#endif // ZSTD_VERSION_NUMBER >= 700
Mode mode_ = Mode::kUninit;
Slice dict_;
public:
static const CompressionDict& GetEmptyDict() {
static CompressionDict empty_dict{};
static bool init = false;
if (!init) {
empty_dict.Init(Slice() /* dict */, Mode::kEmpty,
false /* use_zstd_trainer */);
init = true;
}
return empty_dict;
}
void Init(Slice dict, Mode mode, CompressionType type, int level = -1) {
return Init(dict, mode, type == kZSTD || type == kZSTDNotFinalCompression,
level);
}
private:
#if ZSTD_VERSION_NUMBER >= 700
void Init(Slice dict, Mode mode, bool use_zstd_trainer, int level = -1) {
#else // ZSTD_VERSION_NUMBER >= 700
void Init(Slice dict, Mode mode, bool /* use_zstd_trainer */,
int /*level*/ = -1) {
#endif // ZSTD_VERSION_NUMBER >= 700
assert(mode_ == Mode::kUninit);
dict_ = std::move(dict);
mode_ = mode;
switch (mode) {
case Mode::kUninit:
assert(false);
break;
case Mode::kEmpty:
break;
case Mode::kCompression:
#if ZSTD_VERSION_NUMBER >= 700
zstd_cdict_ = nullptr;
if (!dict_.empty() && use_zstd_trainer) {
if (level == CompressionOptions::kDefaultCompressionLevel) {
// 3 is the value of ZSTD_CLEVEL_DEFAULT (not exposed publicly), see
// https://github.com/facebook/zstd/issues/1148
level = 3;
}
// Should be safe (but slower) if below call fails as we'll use the
// raw dictionary to compress.
zstd_cdict_ = ZSTD_createCDict(dict_.data(), dict_.size(), level);
assert(zstd_cdict_ != nullptr);
}
#endif // ZSTD_VERSION_NUMBER >= 700
break;
case Mode::kUncompression:
#if ZSTD_VERSION_NUMBER >= 700
zstd_ddict_ = nullptr;
if (!dict_.empty() && use_zstd_trainer) {
zstd_ddict_ = ZSTD_createDDict(dict_.data(), dict_.size());
assert(zstd_ddict_ != nullptr);
}
#endif // ZSTD_VERSION_NUMBER >= 700
break;
}
}
public:
~CompressionDict() {
#if ZSTD_VERSION_NUMBER >= 700
size_t res = 0;
switch (mode_) {
case Mode::kUninit:
break;
case Mode::kEmpty:
break;
case Mode::kCompression:
if (zstd_cdict_ != nullptr) {
res = ZSTD_freeCDict(zstd_cdict_);
}
break;
case Mode::kUncompression:
if (zstd_ddict_ != nullptr) {
res = ZSTD_freeDDict(zstd_ddict_);
}
break;
}
assert(res == 0); // Last I checked they can't fail
(void)res; // prevent unused var warning
#endif // ZSTD_VERSION_NUMBER >= 700
}
#if ZSTD_VERSION_NUMBER >= 700
const ZSTD_CDict* GetDigestedZstdCDict() const {
assert(mode_ != Mode::kUninit);
if (mode_ == Mode::kEmpty) {
return nullptr;
}
assert(mode_ == Mode::kCompression);
return zstd_cdict_;
}
const ZSTD_DDict* GetDigestedZstdDDict() const {
assert(mode_ != Mode::kUninit);
if (mode_ == Mode::kEmpty) {
return nullptr;
}
assert(mode_ == Mode::kUncompression);
return zstd_ddict_;
}
#endif // ZSTD_VERSION_NUMBER >= 700
Slice GetRawDict() const {
assert(mode_ != Mode::kUninit);
assert(mode_ != Mode::kEmpty || dict_.empty());
return dict_;
}
CompressionDict() = default;
// Disable copy/move
CompressionDict(const CompressionDict&) = delete;
CompressionDict& operator=(const CompressionDict&) = delete;
CompressionDict(CompressionDict&&) = delete;
CompressionDict& operator=(CompressionDict&&) = delete;
};
class CompressionContext { class CompressionContext {
#if defined(ZSTD) && (ZSTD_VERSION_NUMBER >= 500)
private: private:
const CompressionType type_; const CompressionType type_;
const CompressionOptions opts_;
Slice dict_;
#if defined(ZSTD) && (ZSTD_VERSION_NUMBER >= 500)
ZSTD_CCtx* zstd_ctx_ = nullptr; ZSTD_CCtx* zstd_ctx_ = nullptr;
void CreateNativeContext() { void CreateNativeContext() {
if (type_ == kZSTD || type_ == kZSTDNotFinalCompression) { if (type_ == kZSTD || type_ == kZSTDNotFinalCompression) {
@ -297,45 +163,35 @@ class CompressionContext {
assert(type_ == kZSTD || type_ == kZSTDNotFinalCompression); assert(type_ == kZSTD || type_ == kZSTDNotFinalCompression);
return zstd_ctx_; return zstd_ctx_;
} }
explicit CompressionContext(CompressionType comp_type) : type_(comp_type) {
CreateNativeContext();
}
#else // ZSTD && (ZSTD_VERSION_NUMBER >= 500) #else // ZSTD && (ZSTD_VERSION_NUMBER >= 500)
public:
explicit CompressionContext(CompressionType /* comp_type */) {}
private: private:
void CreateNativeContext() {} void CreateNativeContext() {}
void DestroyNativeContext() {} void DestroyNativeContext() {}
#endif // ZSTD && (ZSTD_VERSION_NUMBER >= 500) #endif // ZSTD && (ZSTD_VERSION_NUMBER >= 500)
public: public:
explicit CompressionContext(CompressionType comp_type) : type_(comp_type) {
CreateNativeContext();
}
CompressionContext(CompressionType comp_type, const CompressionOptions& opts,
const Slice& comp_dict = Slice())
: type_(comp_type), opts_(opts), dict_(comp_dict) {
CreateNativeContext();
}
~CompressionContext() { DestroyNativeContext(); } ~CompressionContext() { DestroyNativeContext(); }
CompressionContext(const CompressionContext&) = delete; CompressionContext(const CompressionContext&) = delete;
CompressionContext& operator=(const CompressionContext&) = delete; CompressionContext& operator=(const CompressionContext&) = delete;
};
class CompressionInfo {
const CompressionOptions& opts_;
const CompressionContext& context_;
const CompressionDict& dict_;
const CompressionType type_;
public:
CompressionInfo(const CompressionOptions& _opts,
const CompressionContext& _context,
const CompressionDict& _dict, CompressionType _type)
: opts_(_opts), context_(_context), dict_(_dict), type_(_type) {}
const CompressionOptions& options() const { return opts_; } const CompressionOptions& options() const { return opts_; }
const CompressionContext& context() const { return context_; }
const CompressionDict& dict() const { return dict_; }
CompressionType type() const { return type_; } CompressionType type() const { return type_; }
const Slice& dict() const { return dict_; }
Slice& dict() { return dict_; }
}; };
// Instantiate this class and pass it to the uncompression API below
class UncompressionContext { class UncompressionContext {
private: private:
const CompressionType type_; CompressionType type_;
Slice dict_;
CompressionContextCache* ctx_cache_ = nullptr; CompressionContextCache* ctx_cache_ = nullptr;
ZSTDUncompressCachedData uncomp_cached_data_; ZSTDUncompressCachedData uncomp_cached_data_;
@ -343,8 +199,10 @@ class UncompressionContext {
struct NoCache {}; struct NoCache {};
// Do not use context cache, used by TableBuilder // Do not use context cache, used by TableBuilder
UncompressionContext(NoCache, CompressionType comp_type) : type_(comp_type) {} UncompressionContext(NoCache, CompressionType comp_type) : type_(comp_type) {}
explicit UncompressionContext(CompressionType comp_type)
explicit UncompressionContext(CompressionType comp_type) : type_(comp_type) { : UncompressionContext(comp_type, Slice()) {}
UncompressionContext(CompressionType comp_type, const Slice& comp_dict)
: type_(comp_type), dict_(comp_dict) {
if (type_ == kZSTD || type_ == kZSTDNotFinalCompression) { if (type_ == kZSTD || type_ == kZSTDNotFinalCompression) {
ctx_cache_ = CompressionContextCache::Instance(); ctx_cache_ = CompressionContextCache::Instance();
uncomp_cached_data_ = ctx_cache_->GetCachedZSTDUncompressData(); uncomp_cached_data_ = ctx_cache_->GetCachedZSTDUncompressData();
@ -364,21 +222,9 @@ class UncompressionContext {
ZSTDUncompressCachedData::ZSTDNativeContext GetZSTDContext() const { ZSTDUncompressCachedData::ZSTDNativeContext GetZSTDContext() const {
return uncomp_cached_data_.Get(); return uncomp_cached_data_.Get();
} }
};
class UncompressionInfo {
const UncompressionContext& context_;
const CompressionDict& dict_;
const CompressionType type_;
public:
UncompressionInfo(const UncompressionContext& _context,
const CompressionDict& _dict, CompressionType _type)
: context_(_context), dict_(_dict), type_(_type) {}
const UncompressionContext& context() const { return context_; }
const CompressionDict& dict() const { return dict_; }
CompressionType type() const { return type_; } CompressionType type() const { return type_; }
const Slice& dict() const { return dict_; }
Slice& dict() { return dict_; }
}; };
inline bool Snappy_Supported() { inline bool Snappy_Supported() {
@ -497,8 +343,9 @@ inline std::string CompressionTypeToString(CompressionType compression_type) {
// 2 -- Zlib, BZip2 and LZ4 encode decompressed size as Varint32 just before the // 2 -- Zlib, BZip2 and LZ4 encode decompressed size as Varint32 just before the
// start of compressed block. Snappy format is the same as version 1. // start of compressed block. Snappy format is the same as version 1.
inline bool Snappy_Compress(const CompressionInfo& /*info*/, const char* input, inline bool Snappy_Compress(const CompressionContext& /*ctx*/,
size_t length, ::std::string* output) { const char* input, size_t length,
::std::string* output) {
#ifdef SNAPPY #ifdef SNAPPY
output->resize(snappy::MaxCompressedLength(length)); output->resize(snappy::MaxCompressedLength(length));
size_t outlen; size_t outlen;
@ -563,7 +410,7 @@ inline bool GetDecompressedSizeInfo(const char** input_data,
// header in varint32 format // header in varint32 format
// @param compression_dict Data for presetting the compression library's // @param compression_dict Data for presetting the compression library's
// dictionary. // dictionary.
inline bool Zlib_Compress(const CompressionInfo& info, inline bool Zlib_Compress(const CompressionContext& ctx,
uint32_t compress_format_version, const char* input, uint32_t compress_format_version, const char* input,
size_t length, ::std::string* output) { size_t length, ::std::string* output) {
#ifdef ZLIB #ifdef ZLIB
@ -588,25 +435,24 @@ inline bool Zlib_Compress(const CompressionInfo& info,
// The default value is 8. See zconf.h for more details. // The default value is 8. See zconf.h for more details.
static const int memLevel = 8; static const int memLevel = 8;
int level; int level;
if (info.options().level == CompressionOptions::kDefaultCompressionLevel) { if (ctx.options().level == CompressionOptions::kDefaultCompressionLevel) {
level = Z_DEFAULT_COMPRESSION; level = Z_DEFAULT_COMPRESSION;
} else { } else {
level = info.options().level; level = ctx.options().level;
} }
z_stream _stream; z_stream _stream;
memset(&_stream, 0, sizeof(z_stream)); memset(&_stream, 0, sizeof(z_stream));
int st = deflateInit2(&_stream, level, Z_DEFLATED, info.options().window_bits, int st = deflateInit2(&_stream, level, Z_DEFLATED, ctx.options().window_bits,
memLevel, info.options().strategy); memLevel, ctx.options().strategy);
if (st != Z_OK) { if (st != Z_OK) {
return false; return false;
} }
Slice compression_dict = info.dict().GetRawDict(); if (ctx.dict().size()) {
if (compression_dict.size()) {
// Initialize the compression library's dictionary // Initialize the compression library's dictionary
st = deflateSetDictionary( st = deflateSetDictionary(&_stream,
&_stream, reinterpret_cast<const Bytef*>(compression_dict.data()), reinterpret_cast<const Bytef*>(ctx.dict().data()),
static_cast<unsigned int>(compression_dict.size())); static_cast<unsigned int>(ctx.dict().size()));
if (st != Z_OK) { if (st != Z_OK) {
deflateEnd(&_stream); deflateEnd(&_stream);
return false; return false;
@ -634,7 +480,7 @@ inline bool Zlib_Compress(const CompressionInfo& info,
deflateEnd(&_stream); deflateEnd(&_stream);
return compressed; return compressed;
#else #else
(void)info; (void)ctx;
(void)compress_format_version; (void)compress_format_version;
(void)input; (void)input;
(void)length; (void)length;
@ -649,7 +495,7 @@ inline bool Zlib_Compress(const CompressionInfo& info,
// header in varint32 format // header in varint32 format
// @param compression_dict Data for presetting the compression library's // @param compression_dict Data for presetting the compression library's
// dictionary. // dictionary.
inline char* Zlib_Uncompress(const UncompressionInfo& info, inline char* Zlib_Uncompress(const UncompressionContext& ctx,
const char* input_data, size_t input_length, const char* input_data, size_t input_length,
int* decompress_size, int* decompress_size,
uint32_t compress_format_version, uint32_t compress_format_version,
@ -682,12 +528,11 @@ inline char* Zlib_Uncompress(const UncompressionInfo& info,
return nullptr; return nullptr;
} }
Slice compression_dict = info.dict().GetRawDict(); if (ctx.dict().size()) {
if (compression_dict.size()) {
// Initialize the compression library's dictionary // Initialize the compression library's dictionary
st = inflateSetDictionary( st = inflateSetDictionary(&_stream,
&_stream, reinterpret_cast<const Bytef*>(compression_dict.data()), reinterpret_cast<const Bytef*>(ctx.dict().data()),
static_cast<unsigned int>(compression_dict.size())); static_cast<unsigned int>(ctx.dict().size()));
if (st != Z_OK) { if (st != Z_OK) {
return nullptr; return nullptr;
} }
@ -740,7 +585,7 @@ inline char* Zlib_Uncompress(const UncompressionInfo& info,
inflateEnd(&_stream); inflateEnd(&_stream);
return output; return output;
#else #else
(void)info; (void)ctx;
(void)input_data; (void)input_data;
(void)input_length; (void)input_length;
(void)decompress_size; (void)decompress_size;
@ -754,7 +599,7 @@ inline char* Zlib_Uncompress(const UncompressionInfo& info,
// block header // block header
// compress_format_version == 2 -- decompressed size is included in the block // compress_format_version == 2 -- decompressed size is included in the block
// header in varint32 format // header in varint32 format
inline bool BZip2_Compress(const CompressionInfo& /*info*/, inline bool BZip2_Compress(const CompressionContext& /*ctx*/,
uint32_t compress_format_version, const char* input, uint32_t compress_format_version, const char* input,
size_t length, ::std::string* output) { size_t length, ::std::string* output) {
#ifdef BZIP2 #ifdef BZIP2
@ -901,7 +746,7 @@ inline char* BZip2_Uncompress(const char* input_data, size_t input_length,
// header in varint32 format // header in varint32 format
// @param compression_dict Data for presetting the compression library's // @param compression_dict Data for presetting the compression library's
// dictionary. // dictionary.
inline bool LZ4_Compress(const CompressionInfo& info, inline bool LZ4_Compress(const CompressionContext& ctx,
uint32_t compress_format_version, const char* input, uint32_t compress_format_version, const char* input,
size_t length, ::std::string* output) { size_t length, ::std::string* output) {
#ifdef LZ4 #ifdef LZ4
@ -929,10 +774,9 @@ inline bool LZ4_Compress(const CompressionInfo& info,
int outlen; int outlen;
#if LZ4_VERSION_NUMBER >= 10400 // r124+ #if LZ4_VERSION_NUMBER >= 10400 // r124+
LZ4_stream_t* stream = LZ4_createStream(); LZ4_stream_t* stream = LZ4_createStream();
Slice compression_dict = info.dict().GetRawDict(); if (ctx.dict().size()) {
if (compression_dict.size()) { LZ4_loadDict(stream, ctx.dict().data(),
LZ4_loadDict(stream, compression_dict.data(), static_cast<int>(ctx.dict().size()));
static_cast<int>(compression_dict.size()));
} }
#if LZ4_VERSION_NUMBER >= 10700 // r129+ #if LZ4_VERSION_NUMBER >= 10700 // r129+
outlen = outlen =
@ -955,7 +799,7 @@ inline bool LZ4_Compress(const CompressionInfo& info,
output->resize(static_cast<size_t>(output_header_len + outlen)); output->resize(static_cast<size_t>(output_header_len + outlen));
return true; return true;
#else // LZ4 #else // LZ4
(void)info; (void)ctx;
(void)compress_format_version; (void)compress_format_version;
(void)input; (void)input;
(void)length; (void)length;
@ -970,7 +814,7 @@ inline bool LZ4_Compress(const CompressionInfo& info,
// header in varint32 format // header in varint32 format
// @param compression_dict Data for presetting the compression library's // @param compression_dict Data for presetting the compression library's
// dictionary. // dictionary.
inline char* LZ4_Uncompress(const UncompressionInfo& info, inline char* LZ4_Uncompress(const UncompressionContext& ctx,
const char* input_data, size_t input_length, const char* input_data, size_t input_length,
int* decompress_size, int* decompress_size,
uint32_t compress_format_version) { uint32_t compress_format_version) {
@ -996,10 +840,9 @@ inline char* LZ4_Uncompress(const UncompressionInfo& info,
char* output = new char[output_len]; char* output = new char[output_len];
#if LZ4_VERSION_NUMBER >= 10400 // r124+ #if LZ4_VERSION_NUMBER >= 10400 // r124+
LZ4_streamDecode_t* stream = LZ4_createStreamDecode(); LZ4_streamDecode_t* stream = LZ4_createStreamDecode();
Slice compression_dict = info.dict().GetRawDict(); if (ctx.dict().size()) {
if (compression_dict.size()) { LZ4_setStreamDecode(stream, ctx.dict().data(),
LZ4_setStreamDecode(stream, compression_dict.data(), static_cast<int>(ctx.dict().size()));
static_cast<int>(compression_dict.size()));
} }
*decompress_size = LZ4_decompress_safe_continue( *decompress_size = LZ4_decompress_safe_continue(
stream, input_data, output, static_cast<int>(input_length), stream, input_data, output, static_cast<int>(input_length),
@ -1018,7 +861,7 @@ inline char* LZ4_Uncompress(const UncompressionInfo& info,
assert(*decompress_size == static_cast<int>(output_len)); assert(*decompress_size == static_cast<int>(output_len));
return output; return output;
#else // LZ4 #else // LZ4
(void)info; (void)ctx;
(void)input_data; (void)input_data;
(void)input_length; (void)input_length;
(void)decompress_size; (void)decompress_size;
@ -1033,7 +876,7 @@ inline char* LZ4_Uncompress(const UncompressionInfo& info,
// header in varint32 format // header in varint32 format
// @param compression_dict Data for presetting the compression library's // @param compression_dict Data for presetting the compression library's
// dictionary. // dictionary.
inline bool LZ4HC_Compress(const CompressionInfo& info, inline bool LZ4HC_Compress(const CompressionContext& ctx,
uint32_t compress_format_version, const char* input, uint32_t compress_format_version, const char* input,
size_t length, ::std::string* output) { size_t length, ::std::string* output) {
#ifdef LZ4 #ifdef LZ4
@ -1060,18 +903,17 @@ inline bool LZ4HC_Compress(const CompressionInfo& info,
int outlen; int outlen;
int level; int level;
if (info.options().level == CompressionOptions::kDefaultCompressionLevel) { if (ctx.options().level == CompressionOptions::kDefaultCompressionLevel) {
level = 0; // lz4hc.h says any value < 1 will be sanitized to default level = 0; // lz4hc.h says any value < 1 will be sanitized to default
} else { } else {
level = info.options().level; level = ctx.options().level;
} }
#if LZ4_VERSION_NUMBER >= 10400 // r124+ #if LZ4_VERSION_NUMBER >= 10400 // r124+
LZ4_streamHC_t* stream = LZ4_createStreamHC(); LZ4_streamHC_t* stream = LZ4_createStreamHC();
LZ4_resetStreamHC(stream, level); LZ4_resetStreamHC(stream, level);
Slice compression_dict = info.dict().GetRawDict();
const char* compression_dict_data = const char* compression_dict_data =
compression_dict.size() > 0 ? compression_dict.data() : nullptr; ctx.dict().size() > 0 ? ctx.dict().data() : nullptr;
size_t compression_dict_size = compression_dict.size(); size_t compression_dict_size = ctx.dict().size();
LZ4_loadDictHC(stream, compression_dict_data, LZ4_loadDictHC(stream, compression_dict_data,
static_cast<int>(compression_dict_size)); static_cast<int>(compression_dict_size));
@ -1102,7 +944,7 @@ inline bool LZ4HC_Compress(const CompressionInfo& info,
output->resize(static_cast<size_t>(output_header_len + outlen)); output->resize(static_cast<size_t>(output_header_len + outlen));
return true; return true;
#else // LZ4 #else // LZ4
(void)info; (void)ctx;
(void)compress_format_version; (void)compress_format_version;
(void)input; (void)input;
(void)length; (void)length;
@ -1136,7 +978,9 @@ inline char* XPRESS_Uncompress(const char* /*input_data*/,
} }
#endif #endif
inline bool ZSTD_Compress(const CompressionInfo& info, const char* input, // @param compression_dict Data for presetting the compression library's
// dictionary.
inline bool ZSTD_Compress(const CompressionContext& ctx, const char* input,
size_t length, ::std::string* output) { size_t length, ::std::string* output) {
#ifdef ZSTD #ifdef ZSTD
if (length > std::numeric_limits<uint32_t>::max()) { if (length > std::numeric_limits<uint32_t>::max()) {
@ -1151,29 +995,19 @@ inline bool ZSTD_Compress(const CompressionInfo& info, const char* input,
output->resize(static_cast<size_t>(output_header_len + compressBound)); output->resize(static_cast<size_t>(output_header_len + compressBound));
size_t outlen = 0; size_t outlen = 0;
int level; int level;
if (info.options().level == CompressionOptions::kDefaultCompressionLevel) { if (ctx.options().level == CompressionOptions::kDefaultCompressionLevel) {
// 3 is the value of ZSTD_CLEVEL_DEFAULT (not exposed publicly), see // 3 is the value of ZSTD_CLEVEL_DEFAULT (not exposed publicly), see
// https://github.com/facebook/zstd/issues/1148 // https://github.com/facebook/zstd/issues/1148
level = 3; level = 3;
} else { } else {
level = info.options().level; level = ctx.options().level;
} }
#if ZSTD_VERSION_NUMBER >= 500 // v0.5.0+ #if ZSTD_VERSION_NUMBER >= 500 // v0.5.0+
ZSTD_CCtx* context = info.context().ZSTDPreallocCtx(); ZSTD_CCtx* context = ctx.ZSTDPreallocCtx();
assert(context != nullptr); assert(context != nullptr);
#if ZSTD_VERSION_NUMBER >= 700 // v0.7.0+ outlen = ZSTD_compress_usingDict(context, &(*output)[output_header_len],
if (info.dict().GetDigestedZstdCDict() != nullptr) { compressBound, input, length,
outlen = ZSTD_compress_usingCDict(context, &(*output)[output_header_len], ctx.dict().data(), ctx.dict().size(), level);
compressBound, input, length,
info.dict().GetDigestedZstdCDict());
}
#endif // ZSTD_VERSION_NUMBER >= 700
if (outlen == 0) {
outlen = ZSTD_compress_usingDict(context, &(*output)[output_header_len],
compressBound, input, length,
info.dict().GetRawDict().data(),
info.dict().GetRawDict().size(), level);
}
#else // up to v0.4.x #else // up to v0.4.x
outlen = ZSTD_compress(&(*output)[output_header_len], compressBound, input, outlen = ZSTD_compress(&(*output)[output_header_len], compressBound, input,
length, level); length, level);
@ -1184,7 +1018,7 @@ inline bool ZSTD_Compress(const CompressionInfo& info, const char* input,
output->resize(output_header_len + outlen); output->resize(output_header_len + outlen);
return true; return true;
#else // ZSTD #else // ZSTD
(void)info; (void)ctx;
(void)input; (void)input;
(void)length; (void)length;
(void)output; (void)output;
@ -1194,7 +1028,7 @@ inline bool ZSTD_Compress(const CompressionInfo& info, const char* input,
// @param compression_dict Data for presetting the compression library's // @param compression_dict Data for presetting the compression library's
// dictionary. // dictionary.
inline char* ZSTD_Uncompress(const UncompressionInfo& info, inline char* ZSTD_Uncompress(const UncompressionContext& ctx,
const char* input_data, size_t input_length, const char* input_data, size_t input_length,
int* decompress_size) { int* decompress_size) {
#ifdef ZSTD #ifdef ZSTD
@ -1205,24 +1039,14 @@ inline char* ZSTD_Uncompress(const UncompressionInfo& info,
} }
char* output = new char[output_len]; char* output = new char[output_len];
size_t actual_output_length = 0; size_t actual_output_length;
#if ZSTD_VERSION_NUMBER >= 500 // v0.5.0+ #if ZSTD_VERSION_NUMBER >= 500 // v0.5.0+
ZSTD_DCtx* context = info.context().GetZSTDContext(); ZSTD_DCtx* context = ctx.GetZSTDContext();
assert(context != nullptr); assert(context != nullptr);
#if ZSTD_VERSION_NUMBER >= 700 // v0.7.0+ actual_output_length = ZSTD_decompress_usingDict(
if (info.dict().GetDigestedZstdDDict() != nullptr) { context, output, output_len, input_data, input_length, ctx.dict().data(),
actual_output_length = ZSTD_decompress_usingDDict( ctx.dict().size());
context, output, output_len, input_data, input_length,
info.dict().GetDigestedZstdDDict());
}
#endif // ZSTD_VERSION_NUMBER >= 700
if (actual_output_length == 0) {
actual_output_length = ZSTD_decompress_usingDict(
context, output, output_len, input_data, input_length,
info.dict().GetRawDict().data(), info.dict().GetRawDict().size());
}
#else // up to v0.4.x #else // up to v0.4.x
(void) info;
actual_output_length = actual_output_length =
ZSTD_decompress(output, output_len, input_data, input_length); ZSTD_decompress(output, output_len, input_data, input_length);
#endif // ZSTD_VERSION_NUMBER >= 500 #endif // ZSTD_VERSION_NUMBER >= 500
@ -1230,7 +1054,7 @@ inline char* ZSTD_Uncompress(const UncompressionInfo& info,
*decompress_size = static_cast<int>(actual_output_length); *decompress_size = static_cast<int>(actual_output_length);
return output; return output;
#else // ZSTD #else // ZSTD
(void)info; (void)ctx;
(void)input_data; (void)input_data;
(void)input_length; (void)input_length;
(void)decompress_size; (void)decompress_size;
@ -1245,10 +1069,6 @@ inline std::string ZSTD_TrainDictionary(const std::string& samples,
// available for dynamic linking until v1.1.3. For now we enable the feature // available for dynamic linking until v1.1.3. For now we enable the feature
// in v1.1.3+ only. // in v1.1.3+ only.
#if ZSTD_VERSION_NUMBER >= 10103 // v1.1.3+ #if ZSTD_VERSION_NUMBER >= 10103 // v1.1.3+
assert(samples.empty() == sample_lens.empty());
if (samples.empty()) {
return "";
}
std::string dict_data(max_dict_bytes, '\0'); std::string dict_data(max_dict_bytes, '\0');
size_t dict_len = ZDICT_trainFromBuffer( size_t dict_len = ZDICT_trainFromBuffer(
&dict_data[0], max_dict_bytes, &samples[0], &sample_lens[0], &dict_data[0], max_dict_bytes, &samples[0], &sample_lens[0],

@ -738,11 +738,9 @@ Slice BlobDBImpl::GetCompressedSlice(const Slice& raw,
return raw; return raw;
} }
StopWatch compression_sw(env_, statistics_, BLOB_DB_COMPRESSION_MICROS); StopWatch compression_sw(env_, statistics_, BLOB_DB_COMPRESSION_MICROS);
CompressionType type = bdb_options_.compression; CompressionType ct = bdb_options_.compression;
CompressionOptions opts; CompressionContext compression_ctx(ct);
CompressionContext context(type); CompressBlock(raw, compression_ctx, &ct, kBlockBasedTableVersionFormat,
CompressionInfo info(opts, context, CompressionDict::GetEmptyDict(), type);
CompressBlock(raw, info, &type, kBlockBasedTableVersionFormat,
compression_output); compression_output);
return *compression_output; return *compression_output;
} }
@ -1075,11 +1073,9 @@ Status BlobDBImpl::GetBlobValue(const Slice& key, const Slice& index_entry,
{ {
StopWatch decompression_sw(env_, statistics_, StopWatch decompression_sw(env_, statistics_,
BLOB_DB_DECOMPRESSION_MICROS); BLOB_DB_DECOMPRESSION_MICROS);
UncompressionContext context(bfile->compression()); UncompressionContext uncompression_ctx(bfile->compression());
UncompressionInfo info(context, CompressionDict::GetEmptyDict(),
bfile->compression());
s = UncompressBlockContentsForCompressionType( s = UncompressBlockContentsForCompressionType(
info, blob_value.data(), blob_value.size(), &contents, uncompression_ctx, blob_value.data(), blob_value.size(), &contents,
kBlockBasedTableVersionFormat, *(cfh->cfd()->ioptions())); kBlockBasedTableVersionFormat, *(cfh->cfd()->ioptions()));
} }
value->PinSelf(contents.data); value->PinSelf(contents.data);

@ -208,12 +208,10 @@ Status BlobDumpTool::DumpRecord(DisplayType show_key, DisplayType show_blob,
if (compression != kNoCompression && if (compression != kNoCompression &&
(show_uncompressed_blob != DisplayType::kNone || show_summary)) { (show_uncompressed_blob != DisplayType::kNone || show_summary)) {
BlockContents contents; BlockContents contents;
UncompressionContext context(compression); UncompressionContext uncompression_ctx(compression);
UncompressionInfo info(context, CompressionDict::GetEmptyDict(),
compression);
s = UncompressBlockContentsForCompressionType( s = UncompressBlockContentsForCompressionType(
info, slice.data() + key_size, static_cast<size_t>(value_size), &contents, uncompression_ctx, slice.data() + key_size, static_cast<size_t>(value_size),
2 /*compress_format_version*/, ImmutableCFOptions(Options())); &contents, 2 /*compress_format_version*/, ImmutableCFOptions(Options()));
if (!s.ok()) { if (!s.ok()) {
return s; return s;
} }

@ -89,9 +89,8 @@ void ColumnAwareEncodingReader::DecodeBlocks(
CompressionType type = CompressionType type =
(CompressionType)slice_final_with_bit[slice_final_with_bit.size() - 1]; (CompressionType)slice_final_with_bit[slice_final_with_bit.size() - 1];
if (type != kNoCompression) { if (type != kNoCompression) {
UncompressionContext context(type); UncompressionContext uncompression_ctx(type);
UncompressionInfo info(context, CompressionDict::GetEmptyDict(), type); UncompressBlockContents(uncompression_ctx, slice_final_with_bit.c_str(),
UncompressBlockContents(info, slice_final_with_bit.c_str(),
slice_final_with_bit.size() - 1, &contents, slice_final_with_bit.size() - 1, &contents,
format_version, ioptions); format_version, ioptions);
content_ptr = contents.data.data(); content_ptr = contents.data.data();
@ -172,9 +171,8 @@ void ColumnAwareEncodingReader::DecodeBlocksFromRowFormat(
CompressionType type = CompressionType type =
(CompressionType)slice_final_with_bit[slice_final_with_bit.size() - 1]; (CompressionType)slice_final_with_bit[slice_final_with_bit.size() - 1];
if (type != kNoCompression) { if (type != kNoCompression) {
UncompressionContext context(type); UncompressionContext uncompression_ctx(type);
UncompressionInfo info(context, CompressionDict::GetEmptyDict(), type); UncompressBlockContents(uncompression_ctx, slice_final_with_bit.c_str(),
UncompressBlockContents(info, slice_final_with_bit.c_str(),
slice_final_with_bit.size() - 1, &contents, slice_final_with_bit.size() - 1, &contents,
format_version, ioptions); format_version, ioptions);
decoded_content = std::string(contents.data.data(), contents.data.size()); decoded_content = std::string(contents.data.data(), contents.data.size());
@ -245,12 +243,10 @@ namespace {
void CompressDataBlock(const std::string& output_content, Slice* slice_final, void CompressDataBlock(const std::string& output_content, Slice* slice_final,
CompressionType* type, std::string* compressed_output) { CompressionType* type, std::string* compressed_output) {
CompressionContext context(*type); CompressionContext compression_ctx(*type);
CompressionOptions opts;
CompressionInfo info(opts, context, CompressionDict::GetEmptyDict(), *type);
uint32_t format_version = 2; // hard-coded version uint32_t format_version = 2; // hard-coded version
*slice_final = CompressBlock(output_content, info, type, format_version, *slice_final = CompressBlock(output_content, compression_ctx, type,
compressed_output); format_version, compressed_output);
} }
} // namespace } // namespace

Loading…
Cancel
Save