diff --git a/HISTORY.md b/HISTORY.md index 999f99087..261ce4ef4 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -1,6 +1,7 @@ # Rocksdb Change Log ## Unreleased ### Public API Change +* Allow preset compression dictionary for improved compression of block-based tables. This is supported for zlib, zstd, and lz4. The compression dictionary's size is configurable via CompressionOptions::max_dict_bytes. * Delete deprecated classes for creating backups (BackupableDB) and restoring from backups (RestoreBackupableDB). Now, BackupEngine should be used for creating backups, and BackupEngineReadOnly should be used for restorations. For more details, see https://github.com/facebook/rocksdb/wiki/How-to-backup-RocksDB%3F * Expose estimate of per-level compression ratio via DB property: "rocksdb.compression-ratio-at-levelN". diff --git a/db/builder.cc b/db/builder.cc index 9c98c0db7..ceb765862 100644 --- a/db/builder.cc +++ b/db/builder.cc @@ -43,14 +43,15 @@ TableBuilder* NewTableBuilder( int_tbl_prop_collector_factories, uint32_t column_family_id, const std::string& column_family_name, WritableFileWriter* file, const CompressionType compression_type, - const CompressionOptions& compression_opts, const bool skip_filters) { + const CompressionOptions& compression_opts, + const std::string* compression_dict, const bool skip_filters) { assert((column_family_id == TablePropertiesCollectorFactory::Context::kUnknownColumnFamily) == column_family_name.empty()); return ioptions.table_factory->NewTableBuilder( TableBuilderOptions(ioptions, internal_comparator, int_tbl_prop_collector_factories, compression_type, - compression_opts, skip_filters, + compression_opts, compression_dict, skip_filters, column_family_name), column_family_id, file); } diff --git a/db/builder.h b/db/builder.h index bf54aed5d..01353ed8d 100644 --- a/db/builder.h +++ b/db/builder.h @@ -36,6 +36,8 @@ class InternalIterator; // @param column_family_name Name of the column family that is also identified // by column_family_id, or empty string if unknown. It must outlive the // TableBuilder returned by this function. +// @param compression_dict Data for presetting the compression library's +// dictionary, or nullptr. TableBuilder* NewTableBuilder( const ImmutableCFOptions& options, const InternalKeyComparator& internal_comparator, @@ -44,6 +46,7 @@ TableBuilder* NewTableBuilder( uint32_t column_family_id, const std::string& column_family_name, WritableFileWriter* file, const CompressionType compression_type, const CompressionOptions& compression_opts, + const std::string* compression_dict = nullptr, const bool skip_filters = false); // Build a Table file from the contents of *iter. The generated file diff --git a/db/c.cc b/db/c.cc index f1868a002..00141da18 100644 --- a/db/c.cc +++ b/db/c.cc @@ -1572,11 +1572,13 @@ void rocksdb_options_set_compression_per_level(rocksdb_options_t* opt, } } -void rocksdb_options_set_compression_options( - rocksdb_options_t* opt, int w_bits, int level, int strategy) { +void rocksdb_options_set_compression_options(rocksdb_options_t* opt, int w_bits, + int level, int strategy, + size_t max_dict_bytes) { opt->rep.compression_opts.window_bits = w_bits; opt->rep.compression_opts.level = level; opt->rep.compression_opts.strategy = strategy; + opt->rep.compression_opts.max_dict_bytes = max_dict_bytes; } void rocksdb_options_set_prefix_extractor( diff --git a/db/c_test.c b/db/c_test.c index e8b031114..7236e01db 100644 --- a/db/c_test.c +++ b/db/c_test.c @@ -322,7 +322,7 @@ int main(int argc, char** argv) { rocksdb_options_set_block_based_table_factory(options, table_options); rocksdb_options_set_compression(options, rocksdb_no_compression); - rocksdb_options_set_compression_options(options, -14, -1, 0); + rocksdb_options_set_compression_options(options, -14, -1, 0, 0); int compression_levels[] = {rocksdb_no_compression, rocksdb_no_compression, rocksdb_no_compression, rocksdb_no_compression}; rocksdb_options_set_compression_per_level(options, compression_levels, 4); diff --git a/db/compaction_job.cc b/db/compaction_job.cc index 527f7408d..f8f471b58 100644 --- a/db/compaction_job.cc +++ b/db/compaction_job.cc @@ -16,12 +16,13 @@ #include #include #include -#include -#include #include +#include +#include #include #include #include +#include #include "db/builder.h" #include "db/db_iter.h" @@ -111,6 +112,7 @@ struct CompactionJob::SubcompactionState { uint64_t overlapped_bytes = 0; // A flag determine whether the key has been seen in ShouldStopBefore() bool seen_key = false; + std::string compression_dict; SubcompactionState(Compaction* c, Slice* _start, Slice* _end, uint64_t size = 0) @@ -125,7 +127,8 @@ struct CompactionJob::SubcompactionState { approx_size(size), grandparent_index(0), overlapped_bytes(0), - seen_key(false) { + seen_key(false), + compression_dict() { assert(compaction != nullptr); } @@ -147,6 +150,7 @@ struct CompactionJob::SubcompactionState { grandparent_index = std::move(o.grandparent_index); overlapped_bytes = std::move(o.overlapped_bytes); seen_key = std::move(o.seen_key); + compression_dict = std::move(o.compression_dict); return *this; } @@ -665,6 +669,30 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { } ColumnFamilyData* cfd = sub_compact->compaction->column_family_data(); + + // To build compression dictionary, we sample the first output file, assuming + // it'll reach the maximum length, and then use the dictionary for compressing + // subsequent output files. The dictionary may be less than max_dict_bytes if + // the first output file's length is less than the maximum. + const int kSampleLenShift = 6; // 2^6 = 64-byte samples + std::set sample_begin_offsets; + if (bottommost_level_ && + cfd->ioptions()->compression_opts.max_dict_bytes > 0) { + const size_t kMaxSamples = + cfd->ioptions()->compression_opts.max_dict_bytes >> kSampleLenShift; + const size_t kOutFileLen = + cfd->GetCurrentMutableCFOptions()->MaxFileSizeForLevel( + compact_->compaction->output_level()); + if (kOutFileLen != port::kMaxSizet) { + const size_t kOutFileNumSamples = kOutFileLen >> kSampleLenShift; + Random64 generator{versions_->NewFileNumber()}; + for (size_t i = 0; i < kMaxSamples; ++i) { + sample_begin_offsets.insert(generator.Uniform(kOutFileNumSamples) + << kSampleLenShift); + } + } + } + auto compaction_filter = cfd->ioptions()->compaction_filter; std::unique_ptr compaction_filter_from_factory = nullptr; if (compaction_filter == nullptr) { @@ -700,6 +728,13 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { auto c_iter = sub_compact->c_iter.get(); c_iter->SeekToFirst(); const auto& c_iter_stats = c_iter->iter_stats(); + auto sample_begin_offset_iter = sample_begin_offsets.cbegin(); + // data_begin_offset and compression_dict are only valid while generating + // dictionary from the first output file. + size_t data_begin_offset = 0; + std::string compression_dict; + compression_dict.reserve(cfd->ioptions()->compression_opts.max_dict_bytes); + // TODO(noetzli): check whether we could check !shutting_down_->... only // only occasionally (see diff D42687) while (status.ok() && !shutting_down_->load(std::memory_order_acquire) && @@ -743,6 +778,55 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { key, c_iter->ikey().sequence); sub_compact->num_output_records++; + if (sub_compact->outputs.size() == 1) { // first output file + // Check if this key/value overlaps any sample intervals; if so, appends + // overlapping portions to the dictionary. + for (const auto& data_elmt : {key, value}) { + size_t data_end_offset = data_begin_offset + data_elmt.size(); + while (sample_begin_offset_iter != sample_begin_offsets.cend() && + *sample_begin_offset_iter < data_end_offset) { + size_t sample_end_offset = + *sample_begin_offset_iter + (1 << kSampleLenShift); + // Invariant: Because we advance sample iterator while processing the + // data_elmt containing the sample's last byte, the current sample + // cannot end before the current data_elmt. + assert(data_begin_offset < sample_end_offset); + + size_t data_elmt_copy_offset, data_elmt_copy_len; + if (*sample_begin_offset_iter <= data_begin_offset) { + // The sample starts before data_elmt starts, so take bytes starting + // at the beginning of data_elmt. + data_elmt_copy_offset = 0; + } else { + // data_elmt starts before the sample starts, so take bytes starting + // at the below offset into data_elmt. + data_elmt_copy_offset = + *sample_begin_offset_iter - data_begin_offset; + } + if (sample_end_offset <= data_end_offset) { + // The sample ends before data_elmt ends, so take as many bytes as + // needed. + data_elmt_copy_len = + sample_end_offset - (data_begin_offset + data_elmt_copy_offset); + } else { + // data_elmt ends before the sample ends, so take all remaining + // bytes in data_elmt. + data_elmt_copy_len = + data_end_offset - (data_begin_offset + data_elmt_copy_offset); + } + compression_dict.append(&data_elmt.data()[data_elmt_copy_offset], + data_elmt_copy_len); + if (sample_end_offset > data_end_offset) { + // Didn't finish sample. Try to finish it with the next data_elmt. + break; + } + // Next sample may require bytes from same data_elmt. + sample_begin_offset_iter++; + } + data_begin_offset = data_end_offset; + } + } + // Close output file if it is big enough // TODO(aekmekji): determine if file should be closed earlier than this // during subcompactions (i.e. if output size, estimated by input size, is @@ -751,8 +835,12 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { if (sub_compact->builder->FileSize() >= sub_compact->compaction->max_output_file_size()) { status = FinishCompactionOutputFile(input->status(), sub_compact); + if (sub_compact->outputs.size() == 1) { + // Use dictionary from first output file for compression of subsequent + // files. + sub_compact->compression_dict = std::move(compression_dict); + } } - c_iter->Next(); } @@ -1020,7 +1108,8 @@ Status CompactionJob::OpenCompactionOutputFile( *cfd->ioptions(), cfd->internal_comparator(), cfd->int_tbl_prop_collector_factories(), cfd->GetID(), cfd->GetName(), sub_compact->outfile.get(), sub_compact->compaction->output_compression(), - cfd->ioptions()->compression_opts, skip_filters)); + cfd->ioptions()->compression_opts, &sub_compact->compression_dict, + skip_filters)); LogFlush(db_options_.info_log); return s; } diff --git a/db/db_test.cc b/db/db_test.cc index e9acfa7e1..2b1eab3b5 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -11,7 +11,6 @@ // in Release build. // which is a pity, it is a good test #include -#include #include #include #include diff --git a/db/db_test2.cc b/db/db_test2.cc index 32487b1c4..641cf3c27 100644 --- a/db/db_test2.cc +++ b/db/db_test2.cc @@ -838,6 +838,98 @@ TEST_P(PinL0IndexAndFilterBlocksTest, INSTANTIATE_TEST_CASE_P(PinL0IndexAndFilterBlocksTest, PinL0IndexAndFilterBlocksTest, ::testing::Bool()); + +TEST_F(DBTest2, PresetCompressionDict) { + const size_t kBlockSizeBytes = 4 << 10; + const size_t kL0FileBytes = 128 << 10; + const size_t kApproxPerBlockOverheadBytes = 50; + const int kNumL0Files = 5; + + Options options; + options.arena_block_size = kBlockSizeBytes; + options.compaction_style = kCompactionStyleUniversal; + options.create_if_missing = true; + options.disable_auto_compactions = true; + options.level0_file_num_compaction_trigger = kNumL0Files; + options.memtable_factory.reset( + new SpecialSkipListFactory(kL0FileBytes / kBlockSizeBytes)); + options.num_levels = 2; + options.target_file_size_base = kL0FileBytes; + options.target_file_size_multiplier = 2; + options.write_buffer_size = kL0FileBytes; + BlockBasedTableOptions table_options; + table_options.block_size = kBlockSizeBytes; + std::vector compression_types; + if (Zlib_Supported()) { + compression_types.push_back(kZlibCompression); + } +#if LZ4_VERSION_NUMBER >= 10400 // r124+ + compression_types.push_back(kLZ4Compression); + compression_types.push_back(kLZ4HCCompression); +#endif // LZ4_VERSION_NUMBER >= 10400 +#if ZSTD_VERSION_NUMBER >= 500 // v0.5.0+ + compression_types.push_back(kZSTDNotFinalCompression); +#endif // ZSTD_VERSION_NUMBER >= 500 + + for (auto compression_type : compression_types) { + options.compression = compression_type; + size_t prev_out_bytes; + for (int i = 0; i < 2; ++i) { + // First iteration: compress without preset dictionary + // Second iteration: compress with preset dictionary + // To make sure the compression dictionary was actually used, we verify + // the compressed size is smaller in the second iteration. Also in the + // second iteration, verify the data we get out is the same data we put + // in. + if (i) { + options.compression_opts.max_dict_bytes = kBlockSizeBytes; + } else { + options.compression_opts.max_dict_bytes = 0; + } + + options.statistics = rocksdb::CreateDBStatistics(); + options.table_factory.reset(NewBlockBasedTableFactory(table_options)); + CreateAndReopenWithCF({"pikachu"}, options); + Random rnd(301); + std::string seq_data = + RandomString(&rnd, kBlockSizeBytes - kApproxPerBlockOverheadBytes); + + ASSERT_EQ(0, NumTableFilesAtLevel(0, 1)); + for (int j = 0; j < kNumL0Files; ++j) { + for (size_t k = 0; k < kL0FileBytes / kBlockSizeBytes + 1; ++k) { + ASSERT_OK(Put(1, Key(static_cast( + j * (kL0FileBytes / kBlockSizeBytes) + k)), + seq_data)); + } + dbfull()->TEST_WaitForFlushMemTable(handles_[1]); + ASSERT_EQ(j + 1, NumTableFilesAtLevel(0, 1)); + } + db_->CompactRange(CompactRangeOptions(), handles_[1], nullptr, nullptr); + ASSERT_EQ(0, NumTableFilesAtLevel(0, 1)); + ASSERT_GT(NumTableFilesAtLevel(1, 1), 0); + + size_t out_bytes = 0; + std::vector files; + GetSstFiles(dbname_, &files); + for (const auto& file : files) { + size_t curr_bytes; + env_->GetFileSize(dbname_ + "/" + file, &curr_bytes); + out_bytes += curr_bytes; + } + + for (size_t j = 0; j < kNumL0Files * (kL0FileBytes / kBlockSizeBytes); + j++) { + ASSERT_EQ(seq_data, Get(1, Key(static_cast(j)))); + } + if (i) { + ASSERT_GT(prev_out_bytes, out_bytes); + } + prev_out_bytes = out_bytes; + DestroyAndReopen(options); + } + } +} + } // namespace rocksdb int main(int argc, char** argv) { diff --git a/include/rocksdb/c.h b/include/rocksdb/c.h index 8bd462464..b61c892d9 100644 --- a/include/rocksdb/c.h +++ b/include/rocksdb/c.h @@ -538,7 +538,7 @@ extern ROCKSDB_LIBRARY_API void rocksdb_options_set_max_open_files( extern ROCKSDB_LIBRARY_API void rocksdb_options_set_max_total_wal_size( rocksdb_options_t* opt, uint64_t n); extern ROCKSDB_LIBRARY_API void rocksdb_options_set_compression_options( - rocksdb_options_t*, int, int, int); + rocksdb_options_t*, int, int, int, size_t); extern ROCKSDB_LIBRARY_API void rocksdb_options_set_prefix_extractor( rocksdb_options_t*, rocksdb_slicetransform_t*); extern ROCKSDB_LIBRARY_API void rocksdb_options_set_num_levels( diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index dd9e4fc09..754653c37 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -137,9 +137,23 @@ struct CompressionOptions { int window_bits; int level; int strategy; - CompressionOptions() : window_bits(-14), level(-1), strategy(0) {} - CompressionOptions(int wbits, int _lev, int _strategy) - : window_bits(wbits), level(_lev), strategy(_strategy) {} + // Maximum size of dictionary used to prime the compression library. Currently + // this dictionary will be constructed by sampling the first output file in a + // subcompaction when the target level is bottommost. This dictionary will be + // loaded into the compression library before compressing/uncompressing each + // data block of subsequent files in the subcompaction. Effectively, this + // improves compression ratios when there are repetitions across data blocks. + // A value of 0 indicates the feature is disabled. + // Default: 0. + uint32_t max_dict_bytes; + + CompressionOptions() + : window_bits(-14), level(-1), strategy(0), max_dict_bytes(0) {} + CompressionOptions(int wbits, int _lev, int _strategy, size_t _max_dict_bytes) + : window_bits(wbits), + level(_lev), + strategy(_strategy), + max_dict_bytes(_max_dict_bytes) {} }; enum UpdateStatus { // Return status For inplace update callback diff --git a/include/rocksdb/table_properties.h b/include/rocksdb/table_properties.h index 3f78cac0f..db7dede60 100644 --- a/include/rocksdb/table_properties.h +++ b/include/rocksdb/table_properties.h @@ -47,6 +47,7 @@ struct TablePropertiesNames { }; extern const std::string kPropertiesBlock; +extern const std::string kCompressionDictBlock; enum EntryType { kEntryPut, diff --git a/table/block_based_table_builder.cc b/table/block_based_table_builder.cc index 141eb1307..32aabcf9a 100644 --- a/table/block_based_table_builder.cc +++ b/table/block_based_table_builder.cc @@ -316,6 +316,7 @@ bool GoodCompressionRatio(size_t compressed_size, size_t raw_size) { Slice CompressBlock(const Slice& raw, const CompressionOptions& compression_options, CompressionType* type, uint32_t format_version, + const Slice& compression_dict, std::string* compressed_output) { if (*type == kNoCompression) { return raw; @@ -335,7 +336,7 @@ Slice CompressBlock(const Slice& raw, if (Zlib_Compress( compression_options, GetCompressFormatForVersion(kZlibCompression, format_version), - raw.data(), raw.size(), compressed_output) && + raw.data(), raw.size(), compressed_output, compression_dict) && GoodCompressionRatio(compressed_output->size(), raw.size())) { return *compressed_output; } @@ -353,7 +354,7 @@ Slice CompressBlock(const Slice& raw, if (LZ4_Compress( compression_options, GetCompressFormatForVersion(kLZ4Compression, format_version), - raw.data(), raw.size(), compressed_output) && + raw.data(), raw.size(), compressed_output, compression_dict) && GoodCompressionRatio(compressed_output->size(), raw.size())) { return *compressed_output; } @@ -362,7 +363,7 @@ Slice CompressBlock(const Slice& raw, if (LZ4HC_Compress( compression_options, GetCompressFormatForVersion(kLZ4HCCompression, format_version), - raw.data(), raw.size(), compressed_output) && + raw.data(), raw.size(), compressed_output, compression_dict) && GoodCompressionRatio(compressed_output->size(), raw.size())) { return *compressed_output; } @@ -376,7 +377,7 @@ Slice CompressBlock(const Slice& raw, break; case kZSTDNotFinalCompression: if (ZSTD_Compress(compression_options, raw.data(), raw.size(), - compressed_output) && + compressed_output, compression_dict) && GoodCompressionRatio(compressed_output->size(), raw.size())) { return *compressed_output; } @@ -469,6 +470,8 @@ struct BlockBasedTableBuilder::Rep { std::string last_key; const CompressionType compression_type; const CompressionOptions compression_opts; + // Data for presetting the compression library's dictionary, or nullptr. + const std::string* compression_dict; TableProperties props; bool closed = false; // Either Finish() or Abandon() has been called. @@ -492,7 +495,8 @@ struct BlockBasedTableBuilder::Rep { int_tbl_prop_collector_factories, uint32_t _column_family_id, WritableFileWriter* f, const CompressionType _compression_type, - const CompressionOptions& _compression_opts, const bool skip_filters, + const CompressionOptions& _compression_opts, + const std::string* _compression_dict, const bool skip_filters, const std::string& _column_family_name) : ioptions(_ioptions), table_options(table_opt), @@ -507,6 +511,7 @@ struct BlockBasedTableBuilder::Rep { table_options.index_block_restart_interval)), compression_type(_compression_type), compression_opts(_compression_opts), + compression_dict(_compression_dict), filter_block(skip_filters ? nullptr : CreateFilterBlockBuilder( _ioptions, table_options)), flush_block_policy( @@ -533,7 +538,8 @@ BlockBasedTableBuilder::BlockBasedTableBuilder( int_tbl_prop_collector_factories, uint32_t column_family_id, WritableFileWriter* file, const CompressionType compression_type, - const CompressionOptions& compression_opts, const bool skip_filters, + const CompressionOptions& compression_opts, + const std::string* compression_dict, const bool skip_filters, const std::string& column_family_name) { BlockBasedTableOptions sanitized_table_options(table_options); if (sanitized_table_options.format_version == 0 && @@ -548,8 +554,8 @@ BlockBasedTableBuilder::BlockBasedTableBuilder( rep_ = new Rep(ioptions, sanitized_table_options, internal_comparator, int_tbl_prop_collector_factories, column_family_id, file, - compression_type, compression_opts, skip_filters, - column_family_name); + compression_type, compression_opts, compression_dict, + skip_filters, column_family_name); if (rep_->filter_block != nullptr) { rep_->filter_block->StartBlock(0); @@ -614,7 +620,7 @@ void BlockBasedTableBuilder::Flush() { assert(!r->closed); if (!ok()) return; if (r->data_block.empty()) return; - WriteBlock(&r->data_block, &r->pending_handle); + WriteBlock(&r->data_block, &r->pending_handle, true /* is_data_block */); if (ok() && !r->table_options.skip_table_builder_flush) { r->status = r->file->Flush(); } @@ -626,13 +632,15 @@ void BlockBasedTableBuilder::Flush() { } void BlockBasedTableBuilder::WriteBlock(BlockBuilder* block, - BlockHandle* handle) { - WriteBlock(block->Finish(), handle); + BlockHandle* handle, + bool is_data_block) { + WriteBlock(block->Finish(), handle, is_data_block); block->Reset(); } void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents, - BlockHandle* handle) { + BlockHandle* handle, + bool is_data_block) { // File format contains a sequence of blocks where each block has: // block_data: uint8[n] // type: uint8 @@ -643,9 +651,13 @@ void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents, auto type = r->compression_type; Slice block_contents; if (raw_block_contents.size() < kCompressionSizeLimit) { - block_contents = - CompressBlock(raw_block_contents, r->compression_opts, &type, - r->table_options.format_version, &r->compressed_output); + Slice compression_dict; + if (is_data_block && r->compression_dict && r->compression_dict->size()) { + compression_dict = *r->compression_dict; + } + block_contents = CompressBlock(raw_block_contents, r->compression_opts, + &type, r->table_options.format_version, + compression_dict, &r->compressed_output); } else { RecordTick(r->ioptions.statistics, NUMBER_BLOCK_NOT_COMPRESSED); type = kNoCompression; @@ -753,7 +765,8 @@ Status BlockBasedTableBuilder::Finish() { assert(!r->closed); r->closed = true; - BlockHandle filter_block_handle, metaindex_block_handle, index_block_handle; + BlockHandle filter_block_handle, metaindex_block_handle, index_block_handle, + compression_dict_block_handle; // Write filter block if (ok() && r->filter_block != nullptr) { auto filter_contents = r->filter_block->Finish(); @@ -784,7 +797,7 @@ Status BlockBasedTableBuilder::Finish() { MetaIndexBuilder meta_index_builder; for (const auto& item : index_blocks.meta_blocks) { BlockHandle block_handle; - WriteBlock(item.second, &block_handle); + WriteBlock(item.second, &block_handle, false /* is_data_block */); meta_index_builder.Add(item.first, block_handle); } @@ -802,7 +815,7 @@ Status BlockBasedTableBuilder::Finish() { meta_index_builder.Add(key, filter_block_handle); } - // Write properties block. + // Write properties and compression dictionary blocks. { PropertyBlockBuilder property_block_builder; r->props.column_family_id = r->column_family_id; @@ -845,9 +858,16 @@ Status BlockBasedTableBuilder::Finish() { kNoCompression, &properties_block_handle ); - meta_index_builder.Add(kPropertiesBlock, properties_block_handle); - } // end of properties block writing + + // Write compression dictionary block + if (r->compression_dict && r->compression_dict->size()) { + WriteRawBlock(*r->compression_dict, kNoCompression, + &compression_dict_block_handle); + meta_index_builder.Add(kCompressionDictBlock, + compression_dict_block_handle); + } + } // end of properties/compression dictionary block writing } // meta blocks // Write index block @@ -855,7 +875,8 @@ Status BlockBasedTableBuilder::Finish() { // flush the meta index block WriteRawBlock(meta_index_builder.Finish(), kNoCompression, &metaindex_block_handle); - WriteBlock(index_blocks.index_block_contents, &index_block_handle); + WriteBlock(index_blocks.index_block_contents, &index_block_handle, + false /* is_data_block */); } // Write footer diff --git a/table/block_based_table_builder.h b/table/block_based_table_builder.h index fed67783e..8172c238e 100644 --- a/table/block_based_table_builder.h +++ b/table/block_based_table_builder.h @@ -34,6 +34,8 @@ class BlockBasedTableBuilder : public TableBuilder { // Create a builder that will store the contents of the table it is // building in *file. Does not close the file. It is up to the // caller to close the file after calling Finish(). + // @param compression_dict Data for presetting the compression library's + // dictionary, or nullptr. BlockBasedTableBuilder( const ImmutableCFOptions& ioptions, const BlockBasedTableOptions& table_options, @@ -42,7 +44,8 @@ class BlockBasedTableBuilder : public TableBuilder { int_tbl_prop_collector_factories, uint32_t column_family_id, WritableFileWriter* file, const CompressionType compression_type, - const CompressionOptions& compression_opts, const bool skip_filters, + const CompressionOptions& compression_opts, + const std::string* compression_dict, const bool skip_filters, const std::string& column_family_name); // REQUIRES: Either Finish() or Abandon() has been called. @@ -82,11 +85,14 @@ class BlockBasedTableBuilder : public TableBuilder { private: bool ok() const { return status().ok(); } + // Call block's Finish() method and then write the finalize block contents to // file. - void WriteBlock(BlockBuilder* block, BlockHandle* handle); + void WriteBlock(BlockBuilder* block, BlockHandle* handle, bool is_data_block); + // Directly write block content to the file. - void WriteBlock(const Slice& block_contents, BlockHandle* handle); + void WriteBlock(const Slice& block_contents, BlockHandle* handle, + bool is_data_block); void WriteRawBlock(const Slice& data, CompressionType, BlockHandle* handle); Status InsertBlockInCache(const Slice& block_contents, const CompressionType type, diff --git a/table/block_based_table_factory.cc b/table/block_based_table_factory.cc index ddd837b83..333da56ab 100644 --- a/table/block_based_table_factory.cc +++ b/table/block_based_table_factory.cc @@ -76,6 +76,7 @@ TableBuilder* BlockBasedTableFactory::NewTableBuilder( table_builder_options.int_tbl_prop_collector_factories, column_family_id, file, table_builder_options.compression_type, table_builder_options.compression_opts, + table_builder_options.compression_dict, table_builder_options.skip_filters, table_builder_options.column_family_name); diff --git a/table/block_based_table_reader.cc b/table/block_based_table_reader.cc index a9a86fc34..849813c41 100644 --- a/table/block_based_table_reader.cc +++ b/table/block_based_table_reader.cc @@ -64,13 +64,16 @@ const size_t kMaxCacheKeyPrefixSize __attribute__((unused)) = // The only relevant option is options.verify_checksums for now. // On failure return non-OK. // On success fill *result and return OK - caller owns *result +// @param compression_dict Data for presetting the compression library's +// dictionary. Status ReadBlockFromFile(RandomAccessFileReader* file, const Footer& footer, const ReadOptions& options, const BlockHandle& handle, std::unique_ptr* result, Env* env, - bool do_uncompress = true) { + bool do_uncompress = true, + const Slice& compression_dict = Slice()) { BlockContents contents; Status s = ReadBlockContents(file, footer, options, handle, &contents, env, - do_uncompress); + do_uncompress, compression_dict); if (s.ok()) { result->reset(new Block(std::move(contents))); } @@ -407,6 +410,11 @@ struct BlockBasedTable::Rep { BlockHandle filter_handle; std::shared_ptr table_properties; + // Block containing the data for the compression dictionary. We take ownership + // for the entire block struct, even though we only use its Slice member. This + // is easier because the Slice member depends on the continued existence of + // another member ("allocation"). + std::unique_ptr compression_dict_block; BlockBasedTableOptions::IndexType index_type; bool hash_index_allow_collision; bool whole_key_filtering; @@ -585,6 +593,31 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions, "Cannot find Properties block from file."); } + // Read the compression dictionary meta block + bool found_compression_dict; + s = SeekToCompressionDictBlock(meta_iter.get(), &found_compression_dict); + if (!s.ok()) { + Log(InfoLogLevel::WARN_LEVEL, rep->ioptions.info_log, + "Cannot seek to compression dictionary block from file: %s", + s.ToString().c_str()); + } else if (found_compression_dict) { + // TODO(andrewkr): Add to block cache if cache_index_and_filter_blocks is + // true. + unique_ptr compression_dict_block{new BlockContents()}; + s = rocksdb::ReadMetaBlock(rep->file.get(), file_size, + kBlockBasedTableMagicNumber, rep->ioptions.env, + rocksdb::kCompressionDictBlock, + compression_dict_block.get()); + if (!s.ok()) { + Log(InfoLogLevel::WARN_LEVEL, rep->ioptions.info_log, + "Encountered error while reading data from compression dictionary " + "block %s", + s.ToString().c_str()); + } else { + rep->compression_dict_block = std::move(compression_dict_block); + } + } + // Determine whether whole key filtering is supported. if (rep->table_properties) { rep->whole_key_filtering &= @@ -727,7 +760,8 @@ Status BlockBasedTable::GetDataBlockFromCache( const Slice& block_cache_key, const Slice& compressed_block_cache_key, Cache* block_cache, Cache* block_cache_compressed, Statistics* statistics, const ReadOptions& read_options, - BlockBasedTable::CachableEntry* block, uint32_t format_version) { + BlockBasedTable::CachableEntry* block, uint32_t format_version, + const Slice& compression_dict) { Status s; Block* compressed_block = nullptr; Cache::Handle* block_cache_compressed_handle = nullptr; @@ -771,7 +805,7 @@ Status BlockBasedTable::GetDataBlockFromCache( BlockContents contents; s = UncompressBlockContents(compressed_block->data(), compressed_block->size(), &contents, - format_version); + format_version, compression_dict); // Insert uncompressed block into block cache if (s.ok()) { @@ -801,7 +835,8 @@ Status BlockBasedTable::PutDataBlockToCache( const Slice& block_cache_key, const Slice& compressed_block_cache_key, Cache* block_cache, Cache* block_cache_compressed, const ReadOptions& read_options, Statistics* statistics, - CachableEntry* block, Block* raw_block, uint32_t format_version) { + CachableEntry* block, Block* raw_block, uint32_t format_version, + const Slice& compression_dict) { assert(raw_block->compression_type() == kNoCompression || block_cache_compressed != nullptr); @@ -810,7 +845,7 @@ Status BlockBasedTable::PutDataBlockToCache( BlockContents contents; if (raw_block->compression_type() != kNoCompression) { s = UncompressBlockContents(raw_block->data(), raw_block->size(), &contents, - format_version); + format_version, compression_dict); } if (!s.ok()) { delete raw_block; @@ -1078,6 +1113,10 @@ InternalIterator* BlockBasedTable::NewDataBlockIterator( } } + Slice compression_dict; + if (rep->compression_dict_block) { + compression_dict = rep->compression_dict_block->data; + } // If either block cache is enabled, we'll try to read from it. if (block_cache != nullptr || block_cache_compressed != nullptr) { Statistics* statistics = rep->ioptions.statistics; @@ -1098,9 +1137,9 @@ InternalIterator* BlockBasedTable::NewDataBlockIterator( compressed_cache_key); } - s = GetDataBlockFromCache(key, ckey, block_cache, block_cache_compressed, - statistics, ro, &block, - rep->table_options.format_version); + s = GetDataBlockFromCache( + key, ckey, block_cache, block_cache_compressed, statistics, ro, &block, + rep->table_options.format_version, compression_dict); if (block.value == nullptr && !no_io && ro.fill_cache) { std::unique_ptr raw_block; @@ -1108,13 +1147,15 @@ InternalIterator* BlockBasedTable::NewDataBlockIterator( StopWatch sw(rep->ioptions.env, statistics, READ_BLOCK_GET_MICROS); s = ReadBlockFromFile(rep->file.get(), rep->footer, ro, handle, &raw_block, rep->ioptions.env, - block_cache_compressed == nullptr); + block_cache_compressed == nullptr, + compression_dict); } if (s.ok()) { s = PutDataBlockToCache(key, ckey, block_cache, block_cache_compressed, ro, statistics, &block, raw_block.release(), - rep->table_options.format_version); + rep->table_options.format_version, + compression_dict); } } } @@ -1132,7 +1173,8 @@ InternalIterator* BlockBasedTable::NewDataBlockIterator( } std::unique_ptr block_value; s = ReadBlockFromFile(rep->file.get(), rep->footer, ro, handle, - &block_value, rep->ioptions.env); + &block_value, rep->ioptions.env, + true /* do_uncompress */, compression_dict); if (s.ok()) { block.value = block_value.release(); } @@ -1454,8 +1496,10 @@ bool BlockBasedTable::TEST_KeyInCache(const ReadOptions& options, Slice ckey; s = GetDataBlockFromCache(cache_key, ckey, block_cache, nullptr, nullptr, - options, &block, - rep_->table_options.format_version); + options, &block, rep_->table_options.format_version, + rep_->compression_dict_block + ? rep_->compression_dict_block->data + : Slice()); assert(s.ok()); bool in_cache = block.value != nullptr; if (in_cache) { @@ -1605,6 +1649,10 @@ Status BlockBasedTable::DumpTable(WritableFile* out_file) { out_file->Append(" Properties block handle: "); out_file->Append(meta_iter->value().ToString(true).c_str()); out_file->Append("\n"); + } else if (meta_iter->key() == rocksdb::kCompressionDictBlock) { + out_file->Append(" Compression dictionary block handle: "); + out_file->Append(meta_iter->value().ToString(true).c_str()); + out_file->Append("\n"); } else if (strstr(meta_iter->key().ToString().c_str(), "filter.rocksdb.") != nullptr) { out_file->Append(" Filter block handle: "); diff --git a/table/block_based_table_reader.h b/table/block_based_table_reader.h index 6a88d9d9a..3e1feaa8d 100644 --- a/table/block_based_table_reader.h +++ b/table/block_based_table_reader.h @@ -165,11 +165,15 @@ class BlockBasedTable : public TableReader { // block_cache_compressed. // On success, Status::OK with be returned and @block will be populated with // pointer to the block as well as its block handle. + // @param compression_dict Data for presetting the compression library's + // dictionary. static Status GetDataBlockFromCache( const Slice& block_cache_key, const Slice& compressed_block_cache_key, Cache* block_cache, Cache* block_cache_compressed, Statistics* statistics, const ReadOptions& read_options, - BlockBasedTable::CachableEntry* block, uint32_t format_version); + BlockBasedTable::CachableEntry* block, uint32_t format_version, + const Slice& compression_dict); + // Put a raw block (maybe compressed) to the corresponding block caches. // This method will perform decompression against raw_block if needed and then // populate the block caches. @@ -178,11 +182,14 @@ class BlockBasedTable : public TableReader { // // REQUIRES: raw_block is heap-allocated. PutDataBlockToCache() will be // responsible for releasing its memory if error occurs. + // @param compression_dict Data for presetting the compression library's + // dictionary. static Status PutDataBlockToCache( const Slice& block_cache_key, const Slice& compressed_block_cache_key, Cache* block_cache, Cache* block_cache_compressed, const ReadOptions& read_options, Statistics* statistics, - CachableEntry* block, Block* raw_block, uint32_t format_version); + CachableEntry* block, Block* raw_block, uint32_t format_version, + const Slice& compression_dict); // Calls (*handle_result)(arg, ...) repeatedly, starting with the entry found // after a call to Seek(key), until handle_result returns false. diff --git a/table/format.cc b/table/format.cc index e5a3c0c28..628e08af1 100644 --- a/table/format.cc +++ b/table/format.cc @@ -296,7 +296,8 @@ Status ReadBlock(RandomAccessFileReader* file, const Footer& footer, Status ReadBlockContents(RandomAccessFileReader* file, const Footer& footer, const ReadOptions& options, const BlockHandle& handle, BlockContents* contents, Env* env, - bool decompression_requested) { + bool decompression_requested, + const Slice& compression_dict) { Status status; Slice slice; size_t n = static_cast(handle.size()); @@ -326,7 +327,8 @@ Status ReadBlockContents(RandomAccessFileReader* file, const Footer& footer, compression_type = static_cast(slice.data()[n]); if (decompression_requested && compression_type != kNoCompression) { - return UncompressBlockContents(slice.data(), n, contents, footer.version()); + return UncompressBlockContents(slice.data(), n, contents, footer.version(), + compression_dict); } if (slice.data() != used_buf) { @@ -351,8 +353,8 @@ Status ReadBlockContents(RandomAccessFileReader* file, const Footer& footer, // free this buffer. // format_version is the block format as defined in include/rocksdb/table.h Status UncompressBlockContents(const char* data, size_t n, - BlockContents* contents, - uint32_t format_version) { + BlockContents* contents, uint32_t format_version, + const Slice& compression_dict) { std::unique_ptr ubuf; int decompress_size = 0; assert(data[n] != kNoCompression); @@ -374,7 +376,8 @@ Status UncompressBlockContents(const char* data, size_t n, case kZlibCompression: ubuf.reset(Zlib_Uncompress( data, n, &decompress_size, - GetCompressFormatForVersion(kZlibCompression, format_version))); + GetCompressFormatForVersion(kZlibCompression, format_version), + compression_dict)); if (!ubuf) { static char zlib_corrupt_msg[] = "Zlib not supported or corrupted Zlib compressed block contents"; @@ -398,7 +401,8 @@ Status UncompressBlockContents(const char* data, size_t n, case kLZ4Compression: ubuf.reset(LZ4_Uncompress( data, n, &decompress_size, - GetCompressFormatForVersion(kLZ4Compression, format_version))); + GetCompressFormatForVersion(kLZ4Compression, format_version), + compression_dict)); if (!ubuf) { static char lz4_corrupt_msg[] = "LZ4 not supported or corrupted LZ4 compressed block contents"; @@ -410,7 +414,8 @@ Status UncompressBlockContents(const char* data, size_t n, case kLZ4HCCompression: ubuf.reset(LZ4_Uncompress( data, n, &decompress_size, - GetCompressFormatForVersion(kLZ4HCCompression, format_version))); + GetCompressFormatForVersion(kLZ4HCCompression, format_version), + compression_dict)); if (!ubuf) { static char lz4hc_corrupt_msg[] = "LZ4HC not supported or corrupted LZ4HC compressed block contents"; @@ -430,7 +435,7 @@ Status UncompressBlockContents(const char* data, size_t n, BlockContents(std::move(ubuf), decompress_size, true, kNoCompression); break; case kZSTDNotFinalCompression: - ubuf.reset(ZSTD_Uncompress(data, n, &decompress_size)); + ubuf.reset(ZSTD_Uncompress(data, n, &decompress_size, compression_dict)); if (!ubuf) { static char zstd_corrupt_msg[] = "ZSTD not supported or corrupted ZSTD compressed block contents"; diff --git a/table/format.h b/table/format.h index 15203dfd9..8f89eee7c 100644 --- a/table/format.h +++ b/table/format.h @@ -193,7 +193,7 @@ struct BlockContents { compression_type(_compression_type), allocation(std::move(_data)) {} - BlockContents(BlockContents&& other) { *this = std::move(other); } + BlockContents(BlockContents&& other) noexcept { *this = std::move(other); } BlockContents& operator=(BlockContents&& other) { data = std::move(other.data); @@ -211,7 +211,8 @@ extern Status ReadBlockContents(RandomAccessFileReader* file, const ReadOptions& options, const BlockHandle& handle, BlockContents* contents, Env* env, - bool do_uncompress); + bool do_uncompress, + const Slice& compression_dict = Slice()); // The 'data' points to the raw block contents read in from file. // This method allocates a new heap buffer and the raw block @@ -222,7 +223,8 @@ extern Status ReadBlockContents(RandomAccessFileReader* file, // util/compression.h extern Status UncompressBlockContents(const char* data, size_t n, BlockContents* contents, - uint32_t compress_format_version); + uint32_t compress_format_version, + const Slice& compression_dict); // Implementation details follow. Clients should ignore, diff --git a/table/sst_file_writer.cc b/table/sst_file_writer.cc index ddc3d5821..0940b0d23 100644 --- a/table/sst_file_writer.cc +++ b/table/sst_file_writer.cc @@ -115,7 +115,8 @@ Status SstFileWriter::Open(const std::string& file_path) { TableBuilderOptions table_builder_options( r->ioptions, r->internal_comparator, &int_tbl_prop_collector_factories, - compression_type, r->ioptions.compression_opts, false /* skip_filters */, + compression_type, r->ioptions.compression_opts, + nullptr /* compression_dict */, false /* skip_filters */, r->column_family_name); r->file_writer.reset( new WritableFileWriter(std::move(sst_file), r->env_options)); diff --git a/table/table_builder.h b/table/table_builder.h index fb94b38fe..be19636f6 100644 --- a/table/table_builder.h +++ b/table/table_builder.h @@ -52,13 +52,15 @@ struct TableBuilderOptions { const std::vector>* _int_tbl_prop_collector_factories, CompressionType _compression_type, - const CompressionOptions& _compression_opts, bool _skip_filters, + const CompressionOptions& _compression_opts, + const std::string* _compression_dict, bool _skip_filters, const std::string& _column_family_name) : ioptions(_ioptions), internal_comparator(_internal_comparator), int_tbl_prop_collector_factories(_int_tbl_prop_collector_factories), compression_type(_compression_type), compression_opts(_compression_opts), + compression_dict(_compression_dict), skip_filters(_skip_filters), column_family_name(_column_family_name) {} const ImmutableCFOptions& ioptions; @@ -67,6 +69,8 @@ struct TableBuilderOptions { int_tbl_prop_collector_factories; CompressionType compression_type; const CompressionOptions& compression_opts; + // Data for presetting the compression library's dictionary, or nullptr. + const std::string* compression_dict; bool skip_filters; // only used by BlockBasedTableBuilder const std::string& column_family_name; }; diff --git a/table/table_properties.cc b/table/table_properties.cc index af3bae387..7ca2072f2 100644 --- a/table/table_properties.cc +++ b/table/table_properties.cc @@ -40,6 +40,19 @@ namespace { props, key, ToString(value), prop_delim, kv_delim ); } + + // Seek to the specified meta block. + // Return true if it successfully seeks to that block. + Status SeekToMetaBlock(InternalIterator* meta_iter, + const std::string& block_name, bool* is_found) { + *is_found = true; + meta_iter->Seek(block_name); + if (meta_iter->status().ok() && + (!meta_iter->Valid() || meta_iter->key() != block_name)) { + *is_found = false; + } + return meta_iter->status(); + } } std::string TableProperties::ToString( @@ -146,21 +159,22 @@ const std::string TablePropertiesNames::kPropertyCollectors = extern const std::string kPropertiesBlock = "rocksdb.properties"; // Old property block name for backward compatibility extern const std::string kPropertiesBlockOldName = "rocksdb.stats"; +extern const std::string kCompressionDictBlock = "rocksdb.compression_dict"; // Seek to the properties block. // Return true if it successfully seeks to the properties block. Status SeekToPropertiesBlock(InternalIterator* meta_iter, bool* is_found) { - *is_found = true; - meta_iter->Seek(kPropertiesBlock); - if (meta_iter->status().ok() && - (!meta_iter->Valid() || meta_iter->key() != kPropertiesBlock)) { - meta_iter->Seek(kPropertiesBlockOldName); - if (meta_iter->status().ok() && - (!meta_iter->Valid() || meta_iter->key() != kPropertiesBlockOldName)) { - *is_found = false; - } + Status status = SeekToMetaBlock(meta_iter, kPropertiesBlock, is_found); + if (!*is_found && status.ok()) { + status = SeekToMetaBlock(meta_iter, kPropertiesBlockOldName, is_found); } - return meta_iter->status(); + return status; +} + +// Seek to the compression dictionary block. +// Return true if it successfully seeks to that block. +Status SeekToCompressionDictBlock(InternalIterator* meta_iter, bool* is_found) { + return SeekToMetaBlock(meta_iter, kCompressionDictBlock, is_found); } } // namespace rocksdb diff --git a/table/table_properties_internal.h b/table/table_properties_internal.h index 77042acbb..3d3a4b5f8 100644 --- a/table/table_properties_internal.h +++ b/table/table_properties_internal.h @@ -17,4 +17,9 @@ class InternalIterator; // set to true. Status SeekToPropertiesBlock(InternalIterator* meta_iter, bool* is_found); +// Seek to the compression dictionary block. +// If it successfully seeks to the properties block, "is_found" will be +// set to true. +Status SeekToCompressionDictBlock(InternalIterator* meta_iter, bool* is_found); + } // namespace rocksdb diff --git a/table/table_reader_bench.cc b/table/table_reader_bench.cc index d0d2959f2..0a227d1e3 100644 --- a/table/table_reader_bench.cc +++ b/table/table_reader_bench.cc @@ -98,8 +98,9 @@ void TableReaderBenchmark(Options& opts, EnvOptions& env_options, tb = opts.table_factory->NewTableBuilder( TableBuilderOptions(ioptions, ikc, &int_tbl_prop_collector_factories, CompressionType::kNoCompression, - CompressionOptions(), false /* skip_filters */, - kDefaultColumnFamilyName), + CompressionOptions(), + nullptr /* compression_dict */, + false /* skip_filters */, kDefaultColumnFamilyName), 0 /* column_family_id */, file_writer.get()); } else { s = DB::Open(opts, dbname, &db); diff --git a/table/table_test.cc b/table/table_test.cc index 92143ed39..4a3f4ed07 100644 --- a/table/table_test.cc +++ b/table/table_test.cc @@ -307,10 +307,11 @@ class TableConstructor: public Constructor { int_tbl_prop_collector_factories; std::string column_family_name; builder.reset(ioptions.table_factory->NewTableBuilder( - TableBuilderOptions( - ioptions, internal_comparator, &int_tbl_prop_collector_factories, - options.compression, CompressionOptions(), false /* skip_filters */, - column_family_name), + TableBuilderOptions(ioptions, internal_comparator, + &int_tbl_prop_collector_factories, + options.compression, CompressionOptions(), + nullptr /* compression_dict */, + false /* skip_filters */, column_family_name), TablePropertiesCollectorFactory::Context::kUnknownColumnFamily, file_writer_.get())); @@ -2039,6 +2040,7 @@ TEST_F(PlainTableTest, BasicPlainTableProperties) { std::unique_ptr builder(factory.NewTableBuilder( TableBuilderOptions(ioptions, ikc, &int_tbl_prop_collector_factories, kNoCompression, CompressionOptions(), + nullptr /* compression_dict */, false /* skip_filters */, column_family_name), TablePropertiesCollectorFactory::Context::kUnknownColumnFamily, file_writer.get())); diff --git a/tools/sst_dump_test.cc b/tools/sst_dump_test.cc index 85b7bf04a..bef88bc23 100644 --- a/tools/sst_dump_test.cc +++ b/tools/sst_dump_test.cc @@ -62,6 +62,7 @@ void createSST(const std::string& file_name, tb.reset(opts.table_factory->NewTableBuilder( TableBuilderOptions(imoptions, ikc, &int_tbl_prop_collector_factories, CompressionType::kNoCompression, CompressionOptions(), + nullptr /* compression_dict */, false /* skip_filters */, column_family_name), TablePropertiesCollectorFactory::Context::kUnknownColumnFamily, file_writer.get())); diff --git a/tools/sst_dump_tool.cc b/tools/sst_dump_tool.cc index 52bfb554d..7b4eb888c 100644 --- a/tools/sst_dump_tool.cc +++ b/tools/sst_dump_tool.cc @@ -187,7 +187,7 @@ int SstFileReader::ShowAllCompressionSizes(size_t block_size) { { CompressionType::kSnappyCompression, "kSnappyCompression" }, { CompressionType::kZlibCompression, "kZlibCompression" }, { CompressionType::kBZip2Compression, "kBZip2Compression" }, - { CompressionType::kLZ4Compression, "kLZ4Compression" }, + { CompressionType::kLZ4Compression, "kLZ4Compression" }, { CompressionType::kLZ4HCCompression, "kLZ4HCCompression" }, { CompressionType::kXpressCompression, "kXpressCompression" }, { CompressionType::kZSTDNotFinalCompression, "kZSTDNotFinalCompression" } @@ -196,9 +196,10 @@ int SstFileReader::ShowAllCompressionSizes(size_t block_size) { for (auto& i : compressions) { CompressionOptions compress_opt; std::string column_family_name; - TableBuilderOptions tb_opts(imoptions, ikc, &block_based_table_factories, i.first, - compress_opt, false /* skip_filters */, - column_family_name); + TableBuilderOptions tb_opts(imoptions, ikc, &block_based_table_factories, + i.first, compress_opt, + nullptr /* compression_dict */, + false /* skip_filters */, column_family_name); uint64_t file_size = CalculateCompressedTableSize(tb_opts, block_size); fprintf(stdout, "Compression: %s", i.second); fprintf(stdout, " Size: %" PRIu64 "\n", file_size); diff --git a/util/compression.h b/util/compression.h index 20e52833f..bc27200ea 100644 --- a/util/compression.h +++ b/util/compression.h @@ -196,10 +196,12 @@ inline bool GetDecompressedSizeInfo(const char** input_data, // block header // compress_format_version == 2 -- decompressed size is included in the block // header in varint32 format +// @param compression_dict Data for presetting the compression library's +// dictionary. inline bool Zlib_Compress(const CompressionOptions& opts, - uint32_t compress_format_version, - const char* input, size_t length, - ::std::string* output) { + uint32_t compress_format_version, const char* input, + size_t length, ::std::string* output, + const Slice& compression_dict = Slice()) { #ifdef ZLIB if (length > std::numeric_limits::max()) { // Can't compress more than 4GB @@ -229,6 +231,16 @@ inline bool Zlib_Compress(const CompressionOptions& opts, return false; } + if (compression_dict.size()) { + // Initialize the compression library's dictionary + st = deflateSetDictionary( + &_stream, reinterpret_cast(compression_dict.data()), + static_cast(compression_dict.size())); + if (st != Z_OK) { + return false; + } + } + // Compress the input, and put compressed data in output. _stream.next_in = (Bytef *)input; _stream.avail_in = static_cast(length); @@ -266,9 +278,12 @@ inline bool Zlib_Compress(const CompressionOptions& opts, // block header // compress_format_version == 2 -- decompressed size is included in the block // header in varint32 format +// @param compression_dict Data for presetting the compression library's +// dictionary. inline char* Zlib_Uncompress(const char* input_data, size_t input_length, int* decompress_size, uint32_t compress_format_version, + const Slice& compression_dict = Slice(), int windowBits = -14) { #ifdef ZLIB uint32_t output_len = 0; @@ -298,6 +313,16 @@ inline char* Zlib_Uncompress(const char* input_data, size_t input_length, return nullptr; } + if (compression_dict.size()) { + // Initialize the compression library's dictionary + st = inflateSetDictionary( + &_stream, reinterpret_cast(compression_dict.data()), + static_cast(compression_dict.size())); + if (st != Z_OK) { + return nullptr; + } + } + _stream.next_in = (Bytef *)input_data; _stream.avail_in = static_cast(input_length); @@ -497,9 +522,12 @@ inline char* BZip2_Uncompress(const char* input_data, size_t input_length, // block header using memcpy, which makes database non-portable) // compress_format_version == 2 -- decompressed size is included in the block // header in varint32 format +// @param compression_dict Data for presetting the compression library's +// dictionary. inline bool LZ4_Compress(const CompressionOptions& opts, uint32_t compress_format_version, const char* input, - size_t length, ::std::string* output) { + size_t length, ::std::string* output, + const Slice compression_dict = Slice()) { #ifdef LZ4 if (length > std::numeric_limits::max()) { // Can't compress more than 4GB @@ -519,18 +547,31 @@ inline bool LZ4_Compress(const CompressionOptions& opts, char* p = const_cast(output->c_str()); memcpy(p, &length, sizeof(length)); } + int compress_bound = LZ4_compressBound(static_cast(length)); + output->resize(static_cast(output_header_len + compress_bound)); + + int outlen; +#if LZ4_VERSION_NUMBER >= 10400 // r124+ + LZ4_stream_t* stream = LZ4_createStream(); + if (compression_dict.size()) { + LZ4_loadDict(stream, compression_dict.data(), + static_cast(compression_dict.size())); + } + outlen = LZ4_compress_limitedOutput_continue( + stream, input, &(*output)[output_header_len], static_cast(length), + compress_bound); + LZ4_freeStream(stream); +#else // up to r123 + outlen = LZ4_compress_limitedOutput(input, &(*output)[output_header_len], + static_cast(length), compress_bound); +#endif // LZ4_VERSION_NUMBER >= 10400 - int compressBound = LZ4_compressBound(static_cast(length)); - output->resize(static_cast(output_header_len + compressBound)); - int outlen = - LZ4_compress_limitedOutput(input, &(*output)[output_header_len], - static_cast(length), compressBound); if (outlen == 0) { return false; } output->resize(static_cast(output_header_len + outlen)); return true; -#endif +#endif // LZ4 return false; } @@ -538,9 +579,12 @@ inline bool LZ4_Compress(const CompressionOptions& opts, // block header using memcpy, which makes database non-portable) // compress_format_version == 2 -- decompressed size is included in the block // header in varint32 format +// @param compression_dict Data for presetting the compression library's +// dictionary. inline char* LZ4_Uncompress(const char* input_data, size_t input_length, int* decompress_size, - uint32_t compress_format_version) { + uint32_t compress_format_version, + const Slice& compression_dict = Slice()) { #ifdef LZ4 uint32_t output_len = 0; if (compress_format_version == 2) { @@ -559,17 +603,31 @@ inline char* LZ4_Uncompress(const char* input_data, size_t input_length, input_length -= 8; input_data += 8; } + char* output = new char[output_len]; +#if LZ4_VERSION_NUMBER >= 10400 // r124+ + LZ4_streamDecode_t* stream = LZ4_createStreamDecode(); + if (compression_dict.size()) { + LZ4_setStreamDecode(stream, compression_dict.data(), + static_cast(compression_dict.size())); + } + *decompress_size = LZ4_decompress_safe_continue( + stream, input_data, output, static_cast(input_length), + static_cast(output_len)); + LZ4_freeStreamDecode(stream); +#else // up to r123 *decompress_size = LZ4_decompress_safe(input_data, output, static_cast(input_length), static_cast(output_len)); +#endif // LZ4_VERSION_NUMBER >= 10400 + if (*decompress_size < 0) { delete[] output; return nullptr; } assert(*decompress_size == static_cast(output_len)); return output; -#endif +#endif // LZ4 return nullptr; } @@ -577,9 +635,12 @@ inline char* LZ4_Uncompress(const char* input_data, size_t input_length, // block header using memcpy, which makes database non-portable) // compress_format_version == 2 -- decompressed size is included in the block // header in varint32 format +// @param compression_dict Data for presetting the compression library's +// dictionary. inline bool LZ4HC_Compress(const CompressionOptions& opts, uint32_t compress_format_version, const char* input, - size_t length, ::std::string* output) { + size_t length, ::std::string* output, + const Slice& compression_dict = Slice()) { #ifdef LZ4 if (length > std::numeric_limits::max()) { // Can't compress more than 4GB @@ -599,25 +660,46 @@ inline bool LZ4HC_Compress(const CompressionOptions& opts, char* p = const_cast(output->c_str()); memcpy(p, &length, sizeof(length)); } + int compress_bound = LZ4_compressBound(static_cast(length)); + output->resize(static_cast(output_header_len + compress_bound)); - int compressBound = LZ4_compressBound(static_cast(length)); - output->resize(static_cast(output_header_len + compressBound)); int outlen; -#ifdef LZ4_VERSION_MAJOR // they only started defining this since r113 +#if LZ4_VERSION_NUMBER >= 10400 // r124+ + LZ4_streamHC_t* stream = LZ4_createStreamHC(); + LZ4_resetStreamHC(stream, opts.level); + const char* compression_dict_data = + compression_dict.size() > 0 ? compression_dict.data() : nullptr; + size_t compression_dict_size = compression_dict.size(); + LZ4_loadDictHC(stream, compression_dict_data, + static_cast(compression_dict_size)); + +#if LZ4_VERSION_NUMBER >= 10700 // r129+ + outlen = + LZ4_compress_HC_continue(stream, input, &(*output)[output_header_len], + static_cast(length), compress_bound); +#else // r124-r128 + outlen = LZ4_compressHC_limitedOutput_continue( + stream, input, &(*output)[output_header_len], static_cast(length), + compress_bound); +#endif // LZ4_VERSION_NUMBER >= 10700 + LZ4_freeStreamHC(stream); + +#elif LZ4_VERSION_MAJOR // r113-r123 outlen = LZ4_compressHC2_limitedOutput(input, &(*output)[output_header_len], static_cast(length), - compressBound, opts.level); -#else + compress_bound, opts.level); +#else // up to r112 outlen = LZ4_compressHC_limitedOutput(input, &(*output)[output_header_len], - static_cast(length), compressBound); -#endif + static_cast(length), compress_bound); +#endif // LZ4_VERSION_NUMBER >= 10400 + if (outlen == 0) { return false; } output->resize(static_cast(output_header_len + outlen)); return true; -#endif +#endif // LZ4 return false; } @@ -637,8 +719,11 @@ inline char* XPRESS_Uncompress(const char* input_data, size_t input_length, } +// @param compression_dict Data for presetting the compression library's +// dictionary. inline bool ZSTD_Compress(const CompressionOptions& opts, const char* input, - size_t length, ::std::string* output) { + size_t length, ::std::string* output, + const Slice& compression_dict = Slice()) { #ifdef ZSTD if (length > std::numeric_limits::max()) { // Can't compress more than 4GB @@ -650,8 +735,17 @@ inline bool ZSTD_Compress(const CompressionOptions& opts, const char* input, size_t compressBound = ZSTD_compressBound(length); output->resize(static_cast(output_header_len + compressBound)); - size_t outlen = ZSTD_compress(&(*output)[output_header_len], compressBound, - input, length, opts.level); + size_t outlen; +#if ZSTD_VERSION_NUMBER >= 500 // v0.5.0+ + ZSTD_CCtx* context = ZSTD_createCCtx(); + outlen = ZSTD_compress_usingDict( + context, &(*output)[output_header_len], compressBound, input, length, + compression_dict.data(), compression_dict.size(), opts.level); + ZSTD_freeCCtx(context); +#else // up to v0.4.x + outlen = ZSTD_compress(&(*output)[output_header_len], compressBound, input, + length, opts.level); +#endif // ZSTD_VERSION_NUMBER >= 500 if (outlen == 0) { return false; } @@ -661,8 +755,11 @@ inline bool ZSTD_Compress(const CompressionOptions& opts, const char* input, return false; } +// @param compression_dict Data for presetting the compression library's +// dictionary. inline char* ZSTD_Uncompress(const char* input_data, size_t input_length, - int* decompress_size) { + int* decompress_size, + const Slice& compression_dict = Slice()) { #ifdef ZSTD uint32_t output_len = 0; if (!compression::GetDecompressedSizeInfo(&input_data, &input_length, @@ -671,8 +768,17 @@ inline char* ZSTD_Uncompress(const char* input_data, size_t input_length, } char* output = new char[output_len]; - size_t actual_output_length = + size_t actual_output_length; +#if ZSTD_VERSION_NUMBER >= 500 // v0.5.0+ + ZSTD_DCtx* context = ZSTD_createDCtx(); + actual_output_length = ZSTD_decompress_usingDict( + context, output, output_len, input_data, input_length, + compression_dict.data(), compression_dict.size()); + ZSTD_freeDCtx(context); +#else // up to v0.4.x + actual_output_length = ZSTD_decompress(output, output_len, input_data, input_length); +#endif // ZSTD_VERSION_NUMBER >= 500 assert(actual_output_length == output_len); *decompress_size = static_cast(actual_output_length); return output; diff --git a/util/options.cc b/util/options.cc index 36bad372e..f195cd49c 100644 --- a/util/options.cc +++ b/util/options.cc @@ -507,6 +507,9 @@ void ColumnFamilyOptions::Dump(Logger* log) const { compression_opts.level); Header(log, " Options.compression_opts.strategy: %d", compression_opts.strategy); + Header(log, + " Options.compression_opts.max_dict_bytes: %" ROCKSDB_PRIszt, + compression_opts.max_dict_bytes); Header(log, " Options.level0_file_num_compaction_trigger: %d", level0_file_num_compaction_trigger); Header(log, " Options.level0_slowdown_writes_trigger: %d", diff --git a/util/options_helper.cc b/util/options_helper.cc index 674aed7b6..77402188d 100644 --- a/util/options_helper.cc +++ b/util/options_helper.cc @@ -806,12 +806,20 @@ Status ParseColumnFamilyOption(const std::string& name, new_options->compression_opts.level = ParseInt(value.substr(start, end - start)); start = end + 1; - if (start >= value.size()) { + end = value.find(':', start); + if (end == std::string::npos) { return Status::InvalidArgument( "unable to parse the specified CF option " + name); } new_options->compression_opts.strategy = ParseInt(value.substr(start, value.size() - start)); + start = end + 1; + if (start >= value.size()) { + return Status::InvalidArgument( + "unable to parse the specified CF option " + name); + } + new_options->compression_opts.max_dict_bytes = + ParseInt(value.substr(start, value.size() - start)); } else if (name == "compaction_options_fifo") { new_options->compaction_options_fifo.max_table_files_size = ParseUint64(value); diff --git a/util/options_test.cc b/util/options_test.cc index 7ca41291f..879cc3c6b 100644 --- a/util/options_test.cc +++ b/util/options_test.cc @@ -103,7 +103,7 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) { "kLZ4HCCompression:" "kXpressCompression:" "kZSTDNotFinalCompression"}, - {"compression_opts", "4:5:6"}, + {"compression_opts", "4:5:6:7"}, {"num_levels", "8"}, {"level0_file_num_compaction_trigger", "8"}, {"level0_slowdown_writes_trigger", "9"}, @@ -201,6 +201,7 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) { ASSERT_EQ(new_cf_opt.compression_opts.window_bits, 4); ASSERT_EQ(new_cf_opt.compression_opts.level, 5); ASSERT_EQ(new_cf_opt.compression_opts.strategy, 6); + ASSERT_EQ(new_cf_opt.compression_opts.max_dict_bytes, 7); ASSERT_EQ(new_cf_opt.num_levels, 8); ASSERT_EQ(new_cf_opt.level0_file_num_compaction_trigger, 8); ASSERT_EQ(new_cf_opt.level0_slowdown_writes_trigger, 9);