Feature for sampling and reporting compressibility (#4842)

Summary:
This is a feature to sample data-block compressibility and report it as stats. 1 in N (tunable) blocks is sampled for compressibility using two algorithms:
1. lz4 or snappy for fast compression
2. zstd or zlib for slow but higher compression.

The stats are reported to the caller as raw-bytes and compressed-bytes. The block continues to be compressed for storage using the specified CompressionType.

The db_bench_tool now has a command line option for specifying the sampling rate. Its default value is 0 (no sampling). To test the overhead for a certain value, users can compare the performance of db_bench_tool, varying the sampling rate. It is unlikely to have a noticeable impact for high values like 20 (i.e. sampling only 1 in 20 blocks).
Pull Request resolved: https://github.com/facebook/rocksdb/pull/4842

Differential Revision: D13629011

Pulled By: shobhitdayal

fbshipit-source-id: 14ca668bcab6499b2a1734edf848eb62a4f4fafa
main
Shobhit Dayal 6 years ago committed by Facebook Github Bot
parent 20d49da90c
commit b45b1cde3e
  1. 7
      HISTORY.md
  2. 25
      db/builder.cc
  3. 2
      db/builder.h
  4. 1
      db/compaction_job.cc
  5. 1
      db/db_impl_open.cc
  6. 3
      db/flush_job.cc
  7. 9
      db/repair.cc
  8. 7
      db/table_properties_collector.cc
  9. 8
      db/table_properties_collector.h
  10. 10
      db/table_properties_collector_test.cc
  11. 6
      include/rocksdb/advanced_options.h
  12. 8
      include/rocksdb/table_properties.h
  13. 7
      options/cf_options.h
  14. 3
      options/options.cc
  15. 7
      options/options_helper.cc
  16. 1
      options/options_settable_test.cc
  17. 167
      table/block_based_table_builder.cc
  18. 5
      table/block_based_table_builder.h
  19. 1
      table/block_based_table_factory.cc
  20. 5
      table/data_block_hash_index_test.cc
  21. 10
      table/meta_blocks.cc
  22. 5
      table/meta_blocks.h
  23. 7
      table/sst_file_writer.cc
  24. 8
      table/sst_file_writer_collectors.h
  25. 4
      table/table_builder.h
  26. 5
      table/table_reader_bench.cc
  27. 26
      table/table_test.cc
  28. 17
      tools/db_bench_tool.cc
  29. 5
      tools/sst_dump_test.cc
  30. 8
      tools/sst_dump_tool.cc
  31. 11
      util/compression.h
  32. 7
      utilities/blob_db/blob_db_impl.cc

@ -10,6 +10,13 @@
* Fix JEMALLOC_CXX_THROW macro missing from older Jemalloc versions, causing build failures on some platforms.
## Unreleased
### New Features
* Added a feature to perform data-block sampling for compressibility, and report stats to user.
### Public API Change
### Bug fixes
## 6.0.0 (2/19/2019)
### New Features
* Enabled checkpoint on readonly db (DBImplReadOnly).

@ -47,8 +47,8 @@ TableBuilder* NewTableBuilder(
int_tbl_prop_collector_factories,
uint32_t column_family_id, const std::string& column_family_name,
WritableFileWriter* file, const CompressionType compression_type,
const CompressionOptions& compression_opts, int level,
const bool skip_filters, const uint64_t creation_time,
uint64_t sample_for_compression, const CompressionOptions& compression_opts,
int level, const bool skip_filters, const uint64_t creation_time,
const uint64_t oldest_key_time, const uint64_t target_file_size) {
assert((column_family_id ==
TablePropertiesCollectorFactory::Context::kUnknownColumnFamily) ==
@ -56,9 +56,9 @@ TableBuilder* NewTableBuilder(
return ioptions.table_factory->NewTableBuilder(
TableBuilderOptions(ioptions, moptions, internal_comparator,
int_tbl_prop_collector_factories, compression_type,
compression_opts, skip_filters, column_family_name,
level, creation_time, oldest_key_time,
target_file_size),
sample_for_compression, compression_opts,
skip_filters, column_family_name, level,
creation_time, oldest_key_time, target_file_size),
column_family_id, file);
}
@ -75,11 +75,12 @@ Status BuildTable(
std::vector<SequenceNumber> snapshots,
SequenceNumber earliest_write_conflict_snapshot,
SnapshotChecker* snapshot_checker, const CompressionType compression,
const CompressionOptions& compression_opts, bool paranoid_file_checks,
InternalStats* internal_stats, TableFileCreationReason reason,
EventLogger* event_logger, int job_id, const Env::IOPriority io_priority,
TableProperties* table_properties, int level, const uint64_t creation_time,
const uint64_t oldest_key_time, Env::WriteLifeTimeHint write_hint) {
uint64_t sample_for_compression, const CompressionOptions& compression_opts,
bool paranoid_file_checks, InternalStats* internal_stats,
TableFileCreationReason reason, EventLogger* event_logger, int job_id,
const Env::IOPriority io_priority, TableProperties* table_properties,
int level, const uint64_t creation_time, const uint64_t oldest_key_time,
Env::WriteLifeTimeHint write_hint) {
assert((column_family_id ==
TablePropertiesCollectorFactory::Context::kUnknownColumnFamily) ==
column_family_name.empty());
@ -133,8 +134,8 @@ Status BuildTable(
ioptions, mutable_cf_options, internal_comparator,
int_tbl_prop_collector_factories, column_family_id,
column_family_name, file_writer.get(), compression,
compression_opts_for_flush, level, false /* skip_filters */,
creation_time, oldest_key_time);
sample_for_compression, compression_opts_for_flush, level,
false /* skip_filters */, creation_time, oldest_key_time);
}
MergeHelper merge(env, internal_comparator.user_comparator(),

@ -47,6 +47,7 @@ TableBuilder* NewTableBuilder(
int_tbl_prop_collector_factories,
uint32_t column_family_id, const std::string& column_family_name,
WritableFileWriter* file, const CompressionType compression_type,
const uint64_t sample_for_compression,
const CompressionOptions& compression_opts, int level,
const bool skip_filters = false, const uint64_t creation_time = 0,
const uint64_t oldest_key_time = 0, const uint64_t target_file_size = 0);
@ -72,6 +73,7 @@ extern Status BuildTable(
std::vector<SequenceNumber> snapshots,
SequenceNumber earliest_write_conflict_snapshot,
SnapshotChecker* snapshot_checker, const CompressionType compression,
const uint64_t sample_for_compression,
const CompressionOptions& compression_opts, bool paranoid_file_checks,
InternalStats* internal_stats, TableFileCreationReason reason,
EventLogger* event_logger = nullptr, int job_id = 0,

@ -1499,6 +1499,7 @@ Status CompactionJob::OpenCompactionOutputFile(
cfd->internal_comparator(), cfd->int_tbl_prop_collector_factories(),
cfd->GetID(), cfd->GetName(), sub_compact->outfile.get(),
sub_compact->compaction->output_compression(),
0 /*sample_for_compression */,
sub_compact->compaction->output_compression_opts(),
sub_compact->compaction->output_level(), skip_filters,
output_file_creation_time, 0 /* oldest_key_time */,

@ -1029,6 +1029,7 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
cfd->int_tbl_prop_collector_factories(), cfd->GetID(), cfd->GetName(),
snapshot_seqs, earliest_write_conflict_snapshot, snapshot_checker,
GetCompressionFlush(*cfd->ioptions(), mutable_cf_options),
mutable_cf_options.sample_for_compression,
cfd->ioptions()->compression_opts, paranoid_file_checks,
cfd->internal_stats(), TableFileCreationReason::kRecovery,
&event_logger_, job_id, Env::IO_HIGH, nullptr /* table_properties */,

@ -372,7 +372,8 @@ Status FlushJob::WriteLevel0Table() {
cfd_->int_tbl_prop_collector_factories(), cfd_->GetID(),
cfd_->GetName(), existing_snapshots_,
earliest_write_conflict_snapshot_, snapshot_checker_,
output_compression_, cfd_->ioptions()->compression_opts,
output_compression_, mutable_cf_options_.sample_for_compression,
cfd_->ioptions()->compression_opts,
mutable_cf_options_.paranoid_file_checks, cfd_->internal_stats(),
TableFileCreationReason::kFlush, event_logger_, job_context_->job_id,
Env::IO_HIGH, &table_properties_, 0 /* level */, current_time,

@ -430,10 +430,11 @@ class Repairer {
&meta, cfd->internal_comparator(),
cfd->int_tbl_prop_collector_factories(), cfd->GetID(), cfd->GetName(),
{}, kMaxSequenceNumber, snapshot_checker, kNoCompression,
CompressionOptions(), false, nullptr /* internal_stats */,
TableFileCreationReason::kRecovery, nullptr /* event_logger */,
0 /* job_id */, Env::IO_HIGH, nullptr /* table_properties */,
-1 /* level */, current_time, write_hint);
0 /* sample_for_compression */, CompressionOptions(), false,
nullptr /* internal_stats */, TableFileCreationReason::kRecovery,
nullptr /* event_logger */, 0 /* job_id */, Env::IO_HIGH,
nullptr /* table_properties */, -1 /* level */, current_time,
write_hint);
ROCKS_LOG_INFO(db_options_.info_log,
"Log #%" PRIu64 ": %d ops saved to Table #%" PRIu64 " %s",
log, counter, meta.fd.GetNumber(),

@ -41,6 +41,13 @@ Status UserKeyTablePropertiesCollector::InternalAdd(const Slice& key,
ikey.sequence, file_size);
}
// Forwards the per-block size statistics to the wrapped user collector.
//   blockRawBytes            - first size argument, passed through unchanged
//   blockCompressedBytesFast - second size argument, passed through unchanged
//   blockCompressedBytesSlow - third size argument, passed through unchanged
// Parameter names match the declaration in the class (blockRawBytes, not
// bLockRawBytes).
void UserKeyTablePropertiesCollector::BlockAdd(
    uint64_t blockRawBytes, uint64_t blockCompressedBytesFast,
    uint64_t blockCompressedBytesSlow) {
  collector_->BlockAdd(blockRawBytes, blockCompressedBytesFast,
                       blockCompressedBytesSlow);
}
Status UserKeyTablePropertiesCollector::Finish(
UserCollectedProperties* properties) {
return collector_->Finish(properties);

@ -27,6 +27,10 @@ class IntTblPropCollector {
virtual Status InternalAdd(const Slice& key, const Slice& value,
uint64_t file_size) = 0;
virtual void BlockAdd(uint64_t blockRawBytes,
uint64_t blockCompressedBytesFast,
uint64_t blockCompressedBytesSlow) = 0;
virtual UserCollectedProperties GetReadableProperties() const = 0;
virtual bool NeedCompact() const { return false; }
@ -60,6 +64,10 @@ class UserKeyTablePropertiesCollector : public IntTblPropCollector {
virtual Status InternalAdd(const Slice& key, const Slice& value,
uint64_t file_size) override;
virtual void BlockAdd(uint64_t blockRawBytes,
uint64_t blockCompressedBytesFast,
uint64_t blockCompressedBytesSlow) override;
virtual Status Finish(UserCollectedProperties* properties) override;
virtual const char* Name() const override { return collector_->Name(); }

@ -52,7 +52,8 @@ void MakeBuilder(const Options& options, const ImmutableCFOptions& ioptions,
builder->reset(NewTableBuilder(
ioptions, moptions, internal_comparator, int_tbl_prop_collector_factories,
kTestColumnFamilyId, kTestColumnFamilyName, writable->get(),
options.compression, options.compression_opts, unknown_level));
options.compression, options.sample_for_compression,
options.compression_opts, unknown_level));
}
} // namespace
@ -172,6 +173,13 @@ class RegularKeysStartWithAInternal : public IntTblPropCollector {
return Status::OK();
}
// Deliberate no-op: this collector does nothing with per-block sizes.
void BlockAdd(uint64_t /* blockRawBytes */,
              uint64_t /* blockCompressedBytesFast */,
              uint64_t /* blockCompressedBytesSlow */) override {}
UserCollectedProperties GetReadableProperties() const override {
return UserCollectedProperties{};
}

@ -644,6 +644,12 @@ struct AdvancedColumnFamilyOptions {
// Dynamically changeable through SetOptions() API
uint64_t ttl = 0;
// If this option is set then 1 in N blocks are compressed
// using a fast (lz4 or snappy) and a slow (zstd or zlib) compression
// algorithm. The compressibility is reported as stats and the stored
// data is left uncompressed (unless compression is also requested).
uint64_t sample_for_compression = 0;
// Create ColumnFamilyOptions with default values for all fields
AdvancedColumnFamilyOptions();
// Create ColumnFamilyOptions from Options

@ -92,6 +92,14 @@ class TablePropertiesCollector {
return Add(key, value);
}
// Invoked after each new block is cut. The default implementation ignores
// the notification; collectors interested in per-block sizes can override it.
virtual void BlockAdd(uint64_t /* blockRawBytes */,
                      uint64_t /* blockCompressedBytesFast */,
                      uint64_t /* blockCompressedBytesSlow */) {}
// Finish() will be called when a table has already been built and is ready
// for writing the properties block.
// @params properties User will add their collected statistics to

@ -159,7 +159,8 @@ struct MutableCFOptions {
options.max_sequential_skip_in_iterations),
paranoid_file_checks(options.paranoid_file_checks),
report_bg_io_stats(options.report_bg_io_stats),
compression(options.compression) {
compression(options.compression),
sample_for_compression(options.sample_for_compression) {
RefreshDerivedOptions(options.num_levels, options.compaction_style);
}
@ -189,7 +190,8 @@ struct MutableCFOptions {
max_sequential_skip_in_iterations(0),
paranoid_file_checks(false),
report_bg_io_stats(false),
compression(Snappy_Supported() ? kSnappyCompression : kNoCompression) {}
compression(Snappy_Supported() ? kSnappyCompression : kNoCompression),
sample_for_compression(0) {}
explicit MutableCFOptions(const Options& options);
@ -243,6 +245,7 @@ struct MutableCFOptions {
bool paranoid_file_checks;
bool report_bg_io_stats;
CompressionType compression;
uint64_t sample_for_compression;
// Derived options
// Per-level target file size.

@ -87,7 +87,8 @@ AdvancedColumnFamilyOptions::AdvancedColumnFamilyOptions(const Options& options)
paranoid_file_checks(options.paranoid_file_checks),
force_consistency_checks(options.force_consistency_checks),
report_bg_io_stats(options.report_bg_io_stats),
ttl(options.ttl) {
ttl(options.ttl),
sample_for_compression(options.sample_for_compression) {
assert(memtable_factory.get() != nullptr);
if (max_bytes_for_level_multiplier_additional.size() <
static_cast<unsigned int>(num_levels)) {

@ -193,6 +193,7 @@ ColumnFamilyOptions BuildColumnFamilyOptions(
cf_opts.paranoid_file_checks = mutable_cf_options.paranoid_file_checks;
cf_opts.report_bg_io_stats = mutable_cf_options.report_bg_io_stats;
cf_opts.compression = mutable_cf_options.compression;
cf_opts.sample_for_compression = mutable_cf_options.sample_for_compression;
cf_opts.table_factory = options.table_factory;
// TODO(yhchiang): find some way to handle the following derived options
@ -1927,7 +1928,11 @@ std::unordered_map<std::string, OptionTypeInfo>
{"ttl",
{offset_of(&ColumnFamilyOptions::ttl), OptionType::kUInt64T,
OptionVerificationType::kNormal, true,
offsetof(struct MutableCFOptions, ttl)}}};
offsetof(struct MutableCFOptions, ttl)}},
{"sample_for_compression",
{offset_of(&ColumnFamilyOptions::sample_for_compression),
OptionType::kUInt64T, OptionVerificationType::kNormal, true,
offsetof(struct MutableCFOptions, sample_for_compression)}}};
std::unordered_map<std::string, OptionTypeInfo>
OptionsHelper::fifo_compaction_options_type_info = {

@ -451,6 +451,7 @@ TEST_F(OptionsSettableTest, ColumnFamilyOptionsAllFieldsSettable) {
"disable_auto_compactions=false;"
"report_bg_io_stats=true;"
"ttl=60;"
"sample_for_compression=0;"
"compaction_options_fifo={max_table_files_size=3;allow_"
"compaction=false;};",
new_options));

@ -101,83 +101,105 @@ bool GoodCompressionRatio(size_t compressed_size, size_t raw_size) {
return compressed_size < raw_size - (raw_size / 8u);
}
} // namespace
// format_version is the block format as defined in include/rocksdb/table.h
Slice CompressBlock(const Slice& raw, const CompressionInfo& compression_info,
CompressionType* type, uint32_t format_version,
bool CompressBlockInternal(const Slice& raw,
const CompressionInfo& compression_info,
uint32_t format_version,
std::string* compressed_output) {
*type = compression_info.type();
if (compression_info.type() == kNoCompression) {
return raw;
}
// Will return compressed block contents if (1) the compression method is
// supported in this platform and (2) the compression rate is "good enough".
switch (compression_info.type()) {
case kSnappyCompression:
if (Snappy_Compress(compression_info, raw.data(), raw.size(),
compressed_output) &&
GoodCompressionRatio(compressed_output->size(), raw.size())) {
return *compressed_output;
}
break; // fall back to no compression.
return Snappy_Compress(compression_info, raw.data(), raw.size(),
compressed_output);
case kZlibCompression:
if (Zlib_Compress(
return Zlib_Compress(
compression_info,
GetCompressFormatForVersion(kZlibCompression, format_version),
raw.data(), raw.size(), compressed_output) &&
GoodCompressionRatio(compressed_output->size(), raw.size())) {
return *compressed_output;
}
break; // fall back to no compression.
raw.data(), raw.size(), compressed_output);
case kBZip2Compression:
if (BZip2_Compress(
return BZip2_Compress(
compression_info,
GetCompressFormatForVersion(kBZip2Compression, format_version),
raw.data(), raw.size(), compressed_output) &&
GoodCompressionRatio(compressed_output->size(), raw.size())) {
return *compressed_output;
}
break; // fall back to no compression.
raw.data(), raw.size(), compressed_output);
case kLZ4Compression:
if (LZ4_Compress(
return LZ4_Compress(
compression_info,
GetCompressFormatForVersion(kLZ4Compression, format_version),
raw.data(), raw.size(), compressed_output) &&
GoodCompressionRatio(compressed_output->size(), raw.size())) {
return *compressed_output;
}
break; // fall back to no compression.
raw.data(), raw.size(), compressed_output);
case kLZ4HCCompression:
if (LZ4HC_Compress(
return LZ4HC_Compress(
compression_info,
GetCompressFormatForVersion(kLZ4HCCompression, format_version),
raw.data(), raw.size(), compressed_output) &&
GoodCompressionRatio(compressed_output->size(), raw.size())) {
return *compressed_output;
}
break; // fall back to no compression.
raw.data(), raw.size(), compressed_output);
case kXpressCompression:
if (XPRESS_Compress(raw.data(), raw.size(),
compressed_output) &&
GoodCompressionRatio(compressed_output->size(), raw.size())) {
return *compressed_output;
}
break;
return XPRESS_Compress(raw.data(), raw.size(), compressed_output);
case kZSTD:
case kZSTDNotFinalCompression:
if (ZSTD_Compress(compression_info, raw.data(), raw.size(),
compressed_output) &&
return ZSTD_Compress(compression_info, raw.data(), raw.size(),
compressed_output);
default:
// Do not recognize this compression type
return false;
}
}
} // namespace
// format_version is the block format as defined in include/rocksdb/table.h
Slice CompressBlock(const Slice& raw, const CompressionInfo& info,
CompressionType* type, uint32_t format_version,
bool do_sample, std::string* compressed_output,
std::string* sampled_output_fast,
std::string* sampled_output_slow) {
*type = info.type();
if (info.type() == kNoCompression && !info.SampleForCompression()) {
return raw;
}
// If requested, we sample one in every N block with a
// fast and slow compression algorithm and report the stats.
// The users can use these stats to decide if it is worthwhile
// enabling compression and they also get a hint about which
// compression algorithm will be beneficial.
if (do_sample && info.SampleForCompression() &&
Random::GetTLSInstance()->OneIn((int)info.SampleForCompression()) &&
sampled_output_fast && sampled_output_slow) {
// Sampling with a fast compression algorithm
if (LZ4_Supported() || Snappy_Supported()) {
CompressionType c =
LZ4_Supported() ? kLZ4Compression : kSnappyCompression;
CompressionContext context(c);
CompressionOptions options;
CompressionInfo info_tmp(options, context,
CompressionDict::GetEmptyDict(), c,
info.SampleForCompression());
CompressBlockInternal(raw, info_tmp, format_version, sampled_output_fast);
}
// Sampling with a slow but high-compression algorithm
if (ZSTD_Supported() || Zlib_Supported()) {
CompressionType c = ZSTD_Supported() ? kZSTD : kZlibCompression;
CompressionContext context(c);
CompressionOptions options;
CompressionInfo info_tmp(options, context,
CompressionDict::GetEmptyDict(), c,
info.SampleForCompression());
CompressBlockInternal(raw, info_tmp, format_version, sampled_output_slow);
}
}
// Actually compress the data
if (*type != kNoCompression) {
if (CompressBlockInternal(raw, info, format_version, compressed_output) &&
GoodCompressionRatio(compressed_output->size(), raw.size())) {
return *compressed_output;
}
break; // fall back to no compression.
default: {} // Do not recognize this compression type
}
// Compression method is not supported, or not good compression ratio, so just
// fall back to uncompressed form.
// Compression method is not supported, or not good
// compression ratio, so just fall back to uncompressed form.
*type = kNoCompression;
return raw;
}
@ -217,6 +239,14 @@ class BlockBasedTableBuilder::BlockBasedTablePropertiesCollector
return Status::OK();
}
// Deliberate no-op: this collector has no interest in collecting stats
// for blocks.
virtual void BlockAdd(uint64_t /* blockRawBytes */,
                      uint64_t /* blockCompressedBytesFast */,
                      uint64_t /* blockCompressedBytesSlow */) override {}
Status Finish(UserCollectedProperties* properties) override {
std::string val;
PutFixed32(&val, static_cast<uint32_t>(index_type_));
@ -269,6 +299,7 @@ struct BlockBasedTableBuilder::Rep {
std::string last_key;
CompressionType compression_type;
uint64_t sample_for_compression;
CompressionOptions compression_opts;
std::unique_ptr<CompressionDict> compression_dict;
CompressionContext compression_ctx;
@ -328,6 +359,7 @@ struct BlockBasedTableBuilder::Rep {
int_tbl_prop_collector_factories,
uint32_t _column_family_id, WritableFileWriter* f,
const CompressionType _compression_type,
const uint64_t _sample_for_compression,
const CompressionOptions& _compression_opts, const bool skip_filters,
const std::string& _column_family_name, const uint64_t _creation_time,
const uint64_t _oldest_key_time, const uint64_t _target_file_size)
@ -350,6 +382,7 @@ struct BlockBasedTableBuilder::Rep {
range_del_block(1 /* block_restart_interval */),
internal_prefix_transform(_moptions.prefix_extractor.get()),
compression_type(_compression_type),
sample_for_compression(_sample_for_compression),
compression_opts(_compression_opts),
compression_dict(),
compression_ctx(_compression_type),
@ -415,6 +448,7 @@ BlockBasedTableBuilder::BlockBasedTableBuilder(
int_tbl_prop_collector_factories,
uint32_t column_family_id, WritableFileWriter* file,
const CompressionType compression_type,
const uint64_t sample_for_compression,
const CompressionOptions& compression_opts, const bool skip_filters,
const std::string& column_family_name, const uint64_t creation_time,
const uint64_t oldest_key_time, const uint64_t target_file_size) {
@ -430,11 +464,11 @@ BlockBasedTableBuilder::BlockBasedTableBuilder(
sanitized_table_options.format_version = 1;
}
rep_ = new Rep(ioptions, moptions, sanitized_table_options,
internal_comparator, int_tbl_prop_collector_factories,
column_family_id, file, compression_type, compression_opts,
skip_filters, column_family_name, creation_time,
oldest_key_time, target_file_size);
rep_ = new Rep(
ioptions, moptions, sanitized_table_options, internal_comparator,
int_tbl_prop_collector_factories, column_family_id, file,
compression_type, sample_for_compression, compression_opts, skip_filters,
column_family_name, creation_time, oldest_key_time, target_file_size);
if (rep_->filter_builder != nullptr) {
rep_->filter_builder->StartBlock(0);
@ -558,6 +592,7 @@ void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents,
Rep* r = rep_;
auto type = r->compression_type;
uint64_t sample_for_compression = r->sample_for_compression;
Slice block_contents;
bool abort_compression = false;
@ -581,10 +616,20 @@ void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents,
}
assert(compression_dict != nullptr);
CompressionInfo compression_info(r->compression_opts, r->compression_ctx,
*compression_dict, r->compression_type);
block_contents =
CompressBlock(raw_block_contents, compression_info, &type,
r->table_options.format_version, &r->compressed_output);
*compression_dict, type,
sample_for_compression);
std::string sampled_output_fast;
std::string sampled_output_slow;
block_contents = CompressBlock(
raw_block_contents, compression_info, &type,
r->table_options.format_version, is_data_block /* do_sample */,
&r->compressed_output, &sampled_output_fast, &sampled_output_slow);
// notify collectors on block add
NotifyCollectTableCollectorsOnBlockAdd(
r->table_properties_collectors, raw_block_contents.size(),
sampled_output_fast.size(), sampled_output_slow.size());
// Some of the compression algorithms are known to be unreliable. If
// the verify_compression flag is set then try to de-compress the

@ -45,6 +45,7 @@ class BlockBasedTableBuilder : public TableBuilder {
int_tbl_prop_collector_factories,
uint32_t column_family_id, WritableFileWriter* file,
const CompressionType compression_type,
const uint64_t sample_for_compression,
const CompressionOptions& compression_opts, const bool skip_filters,
const std::string& column_family_name, const uint64_t creation_time = 0,
const uint64_t oldest_key_time = 0, const uint64_t target_file_size = 0);
@ -137,6 +138,8 @@ class BlockBasedTableBuilder : public TableBuilder {
Slice CompressBlock(const Slice& raw, const CompressionInfo& info,
CompressionType* type, uint32_t format_version,
std::string* compressed_output);
bool do_sample, std::string* compressed_output,
std::string* sampled_output_fast,
std::string* sampled_output_slow);
} // namespace rocksdb

@ -214,6 +214,7 @@ TableBuilder* BlockBasedTableFactory::NewTableBuilder(
table_options_, table_builder_options.internal_comparator,
table_builder_options.int_tbl_prop_collector_factories, column_family_id,
file, table_builder_options.compression_type,
table_builder_options.sample_for_compression,
table_builder_options.compression_opts,
table_builder_options.skip_filters,
table_builder_options.column_family_name,

@ -558,8 +558,9 @@ void TestBoundary(InternalKey& ik1, std::string& v1, InternalKey& ik2,
builder.reset(ioptions.table_factory->NewTableBuilder(
TableBuilderOptions(ioptions, moptions, internal_comparator,
&int_tbl_prop_collector_factories,
options.compression, CompressionOptions(),
false /* skip_filters */, column_family_name, level_),
options.compression, options.sample_for_compression,
CompressionOptions(), false /* skip_filters */,
column_family_name, level_),
TablePropertiesCollectorFactory::Context::kUnknownColumnFamily,
file_writer.get()));

@ -151,6 +151,16 @@ bool NotifyCollectTableCollectorsOnAdd(
return all_succeeded;
}
// Relays one block's size statistics to every collector in `collectors`,
// visiting them in container order. Each collector's BlockAdd() receives the
// three byte counts unchanged.
void NotifyCollectTableCollectorsOnBlockAdd(
    const std::vector<std::unique_ptr<IntTblPropCollector>>& collectors,
    const uint64_t blockRawBytes, const uint64_t blockCompressedBytesFast,
    const uint64_t blockCompressedBytesSlow) {
  for (const auto& entry : collectors) {
    entry->BlockAdd(blockRawBytes, blockCompressedBytesFast,
                    blockCompressedBytesSlow);
  }
}
bool NotifyCollectTableCollectorsOnFinish(
const std::vector<std::unique_ptr<IntTblPropCollector>>& collectors,
Logger* info_log, PropertyBlockBuilder* builder) {

@ -83,6 +83,11 @@ bool NotifyCollectTableCollectorsOnAdd(
const std::vector<std::unique_ptr<IntTblPropCollector>>& collectors,
Logger* info_log);
void NotifyCollectTableCollectorsOnBlockAdd(
const std::vector<std::unique_ptr<IntTblPropCollector>>& collectors,
uint64_t blockRawBytes, uint64_t blockCompressedBytesFast,
uint64_t blockCompressedBytesSlow);
// NotifyCollectTableCollectorsOnFinish() triggers the `Finish` event for all
// property collectors. The collected properties will be added to `builder`.
bool NotifyCollectTableCollectorsOnFinish(

@ -202,6 +202,8 @@ Status SstFileWriter::Open(const std::string& file_path) {
compression_type = r->mutable_cf_options.compression;
compression_opts = r->ioptions.compression_opts;
}
uint64_t sample_for_compression =
r->mutable_cf_options.sample_for_compression;
std::vector<std::unique_ptr<IntTblPropCollectorFactory>>
int_tbl_prop_collector_factories;
@ -234,8 +236,9 @@ Status SstFileWriter::Open(const std::string& file_path) {
TableBuilderOptions table_builder_options(
r->ioptions, r->mutable_cf_options, r->internal_comparator,
&int_tbl_prop_collector_factories, compression_type, compression_opts,
r->skip_filters, r->column_family_name, unknown_level);
&int_tbl_prop_collector_factories, compression_type,
sample_for_compression, compression_opts, r->skip_filters,
r->column_family_name, unknown_level);
r->file_writer.reset(new WritableFileWriter(
std::move(sst_file), file_path, r->env_options, r->ioptions.env,
nullptr /* stats */, r->ioptions.listeners));

@ -33,6 +33,14 @@ class SstFileWriterPropertiesCollector : public IntTblPropCollector {
return Status::OK();
}
// Deliberate no-op: this collector has no interest in collecting stats
// for blocks.
virtual void BlockAdd(uint64_t /* blockRawBytes */,
                      uint64_t /* blockCompressedBytesFast */,
                      uint64_t /* blockCompressedBytesSlow */) override {}
virtual Status Finish(UserCollectedProperties* properties) override {
// File version
std::string version_val;

@ -73,7 +73,7 @@ struct TableBuilderOptions {
const InternalKeyComparator& _internal_comparator,
const std::vector<std::unique_ptr<IntTblPropCollectorFactory>>*
_int_tbl_prop_collector_factories,
CompressionType _compression_type,
CompressionType _compression_type, uint64_t _sample_for_compression,
const CompressionOptions& _compression_opts, bool _skip_filters,
const std::string& _column_family_name, int _level,
const uint64_t _creation_time = 0, const int64_t _oldest_key_time = 0,
@ -83,6 +83,7 @@ struct TableBuilderOptions {
internal_comparator(_internal_comparator),
int_tbl_prop_collector_factories(_int_tbl_prop_collector_factories),
compression_type(_compression_type),
sample_for_compression(_sample_for_compression),
compression_opts(_compression_opts),
skip_filters(_skip_filters),
column_family_name(_column_family_name),
@ -96,6 +97,7 @@ struct TableBuilderOptions {
const std::vector<std::unique_ptr<IntTblPropCollectorFactory>>*
int_tbl_prop_collector_factories;
CompressionType compression_type;
uint64_t sample_for_compression;
const CompressionOptions& compression_opts;
bool skip_filters; // only used by BlockBasedTableBuilder
const std::string& column_family_name;

@ -100,8 +100,9 @@ void TableReaderBenchmark(Options& opts, EnvOptions& env_options,
tb = opts.table_factory->NewTableBuilder(
TableBuilderOptions(
ioptions, moptions, ikc, &int_tbl_prop_collector_factories,
CompressionType::kNoCompression, CompressionOptions(),
false /* skip_filters */, kDefaultColumnFamilyName, unknown_level),
CompressionType::kNoCompression, 0 /* sample_for_compression */,
CompressionOptions(), false /* skip_filters */,
kDefaultColumnFamilyName, unknown_level),
0 /* column_family_id */, file_writer.get());
} else {
s = DB::Open(opts, dbname, &db);

@ -328,9 +328,9 @@ class TableConstructor: public Constructor {
builder.reset(ioptions.table_factory->NewTableBuilder(
TableBuilderOptions(ioptions, moptions, internal_comparator,
&int_tbl_prop_collector_factories,
options.compression, CompressionOptions(),
false /* skip_filters */, column_family_name,
level_),
options.compression, options.sample_for_compression,
CompressionOptions(), false /* skip_filters */,
column_family_name, level_),
TablePropertiesCollectorFactory::Context::kUnknownColumnFamily,
file_writer_.get()));
@ -2627,10 +2627,10 @@ TEST_F(PlainTableTest, BasicPlainTableProperties) {
std::string column_family_name;
int unknown_level = -1;
std::unique_ptr<TableBuilder> builder(factory.NewTableBuilder(
TableBuilderOptions(ioptions, moptions, ikc,
&int_tbl_prop_collector_factories, kNoCompression,
CompressionOptions(), false /* skip_filters */,
column_family_name, unknown_level),
TableBuilderOptions(
ioptions, moptions, ikc, &int_tbl_prop_collector_factories,
kNoCompression, 0 /* sample_for_compression */, CompressionOptions(),
false /* skip_filters */, column_family_name, unknown_level),
TablePropertiesCollectorFactory::Context::kUnknownColumnFamily,
file_writer.get()));
@ -3256,8 +3256,8 @@ TEST_P(BlockBasedTableTest, DISABLED_TableWithGlobalSeqno) {
std::unique_ptr<TableBuilder> builder(options.table_factory->NewTableBuilder(
TableBuilderOptions(ioptions, moptions, ikc,
&int_tbl_prop_collector_factories, kNoCompression,
CompressionOptions(), false /* skip_filters */,
column_family_name, -1),
0 /* sample_for_compression */, CompressionOptions(),
false /* skip_filters */, column_family_name, -1),
TablePropertiesCollectorFactory::Context::kUnknownColumnFamily,
file_writer.get()));
@ -3436,8 +3436,8 @@ TEST_P(BlockBasedTableTest, BlockAlignTest) {
std::unique_ptr<TableBuilder> builder(options.table_factory->NewTableBuilder(
TableBuilderOptions(ioptions, moptions, ikc,
&int_tbl_prop_collector_factories, kNoCompression,
CompressionOptions(), false /* skip_filters */,
column_family_name, -1),
0 /* sample_for_compression */, CompressionOptions(),
false /* skip_filters */, column_family_name, -1),
TablePropertiesCollectorFactory::Context::kUnknownColumnFamily,
file_writer.get()));
@ -3529,8 +3529,8 @@ TEST_P(BlockBasedTableTest, PropertiesBlockRestartPointTest) {
std::unique_ptr<TableBuilder> builder(options.table_factory->NewTableBuilder(
TableBuilderOptions(ioptions, moptions, ikc,
&int_tbl_prop_collector_factories, kNoCompression,
CompressionOptions(), false /* skip_filters */,
column_family_name, -1),
0 /* sample_for_compression */, CompressionOptions(),
false /* skip_filters */, column_family_name, -1),
TablePropertiesCollectorFactory::Context::kUnknownColumnFamily,
file_writer.get()));

@ -801,6 +801,8 @@ DEFINE_string(compression_type, "snappy",
static enum rocksdb::CompressionType FLAGS_compression_type_e =
rocksdb::kSnappyCompression;
DEFINE_int64(sample_for_compression, 0, "Sample every N block for compression");
DEFINE_int32(compression_level, rocksdb::CompressionOptions().level,
"Compression level. The meaning of this value is library-"
"dependent. If unset, we try to use the default for the library "
@ -2195,6 +2197,8 @@ class Benchmark {
auto compression = CompressionTypeToString(FLAGS_compression_type_e);
fprintf(stdout, "Compression: %s\n", compression.c_str());
fprintf(stdout, "Compression sampling rate: %" PRId64 "\n",
FLAGS_sample_for_compression);
switch (FLAGS_rep_factory) {
case kPrefixHash:
@ -2234,7 +2238,8 @@ class Benchmark {
CompressionOptions opts;
CompressionContext context(FLAGS_compression_type_e);
CompressionInfo info(opts, context, CompressionDict::GetEmptyDict(),
FLAGS_compression_type_e);
FLAGS_compression_type_e,
FLAGS_sample_for_compression);
bool result = CompressSlice(info, Slice(input_str), &compressed);
if (!result) {
@ -3101,7 +3106,8 @@ void VerifyDBFromDB(std::string& truth_db_name) {
CompressionOptions opts;
CompressionContext context(FLAGS_compression_type_e);
CompressionInfo info(opts, context, CompressionDict::GetEmptyDict(),
FLAGS_compression_type_e);
FLAGS_compression_type_e,
FLAGS_sample_for_compression);
// Compress 1G
while (ok && bytes < int64_t(1) << 30) {
compressed.clear();
@ -3129,9 +3135,9 @@ void VerifyDBFromDB(std::string& truth_db_name) {
CompressionContext compression_ctx(FLAGS_compression_type_e);
CompressionOptions compression_opts;
CompressionInfo compression_info(compression_opts, compression_ctx,
CompressionDict::GetEmptyDict(),
FLAGS_compression_type_e);
CompressionInfo compression_info(
compression_opts, compression_ctx, CompressionDict::GetEmptyDict(),
FLAGS_compression_type_e, FLAGS_sample_for_compression);
UncompressionContext uncompression_ctx(FLAGS_compression_type_e);
UncompressionInfo uncompression_info(uncompression_ctx,
UncompressionDict::GetEmptyDict(),
@ -3488,6 +3494,7 @@ void VerifyDBFromDB(std::string& truth_db_name) {
options.level0_slowdown_writes_trigger =
FLAGS_level0_slowdown_writes_trigger;
options.compression = FLAGS_compression_type_e;
options.sample_for_compression = FLAGS_sample_for_compression;
options.WAL_ttl_seconds = FLAGS_wal_ttl_seconds;
options.WAL_size_limit_MB = FLAGS_wal_size_limit_MB;
options.max_total_wal_size = FLAGS_max_total_wal_size;

@ -59,8 +59,9 @@ void createSST(const Options& opts, const std::string& file_name) {
tb.reset(opts.table_factory->NewTableBuilder(
TableBuilderOptions(
imoptions, moptions, ikc, &int_tbl_prop_collector_factories,
CompressionType::kNoCompression, CompressionOptions(),
false /* skip_filters */, column_family_name, unknown_level),
CompressionType::kNoCompression, 0 /* sample_for_compression */,
CompressionOptions(), false /* skip_filters */, column_family_name,
unknown_level),
TablePropertiesCollectorFactory::Context::kUnknownColumnFamily,
file_writer.get()));

@ -214,10 +214,10 @@ int SstFileDumper::ShowAllCompressionSizes(
CompressionOptions compress_opt;
std::string column_family_name;
int unknown_level = -1;
TableBuilderOptions tb_opts(imoptions, moptions, ikc,
&block_based_table_factories, i.first,
compress_opt, false /* skip_filters */,
column_family_name, unknown_level);
TableBuilderOptions tb_opts(
imoptions, moptions, ikc, &block_based_table_factories, i.first,
0 /* sample_for_compression */, compress_opt,
false /* skip_filters */, column_family_name, unknown_level);
uint64_t file_size = CalculateCompressedTableSize(tb_opts, block_size);
fprintf(stdout, "Compression: %s", i.second);
fprintf(stdout, " Size: %" PRIu64 "\n", file_size);

@ -333,17 +333,24 @@ class CompressionInfo {
const CompressionContext& context_;
const CompressionDict& dict_;
const CompressionType type_;
const uint64_t sample_for_compression_;
public:
CompressionInfo(const CompressionOptions& _opts,
const CompressionContext& _context,
const CompressionDict& _dict, CompressionType _type)
: opts_(_opts), context_(_context), dict_(_dict), type_(_type) {}
const CompressionDict& _dict, CompressionType _type,
uint64_t _sample_for_compression)
: opts_(_opts),
context_(_context),
dict_(_dict),
type_(_type),
sample_for_compression_(_sample_for_compression) {}
const CompressionOptions& options() const { return opts_; }
const CompressionContext& context() const { return context_; }
const CompressionDict& dict() const { return dict_; }
CompressionType type() const { return type_; }
uint64_t SampleForCompression() const { return sample_for_compression_; }
};
class UncompressionContext {

@ -755,9 +755,10 @@ Slice BlobDBImpl::GetCompressedSlice(const Slice& raw,
CompressionType type = bdb_options_.compression;
CompressionOptions opts;
CompressionContext context(type);
CompressionInfo info(opts, context, CompressionDict::GetEmptyDict(), type);
CompressBlock(raw, info, &type, kBlockBasedTableVersionFormat,
compression_output);
CompressionInfo info(opts, context, CompressionDict::GetEmptyDict(), type,
0 /* sample_for_compression */);
CompressBlock(raw, info, &type, kBlockBasedTableVersionFormat, false,
compression_output, nullptr, nullptr);
return *compression_output;
}

Loading…
Cancel
Save