Changes and enhancements to compression stats, thresholds (#11388)

Summary:
## Option API updates
* Add new CompressionOptions::max_compressed_bytes_per_kb, which corresponds to 1024.0 / min allowable compression ratio. This avoids the hard-coded minimum ratio of 8/7.
* Remove unnecessary constructor for CompressionOptions.
* Document undocumented CompressionOptions. Use idiom for default values shown clearly in one place (not precariously repeated).
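The conversion between a minimum acceptable compression ratio and the new option can be sketched in standalone form (the helper names are hypothetical and simply mirror the rounding in `CompressionOptions::SetMinRatio()`; the snippet does not depend on RocksDB headers):

```cpp
#include <cassert>

// Convert a minimum acceptable compression ratio (uncompressed size over
// compressed size) into the equivalent max_compressed_bytes_per_kb value,
// rounding to the nearest integer.
int MaxCompressedBytesPerKb(double min_ratio) {
  return static_cast<int>(1024.0 / min_ratio + 0.5);
}

// The inverse: the minimum acceptable ratio implied by a given threshold.
double MinRatioFromThreshold(int max_compressed_bytes_per_kb) {
  return 1024.0 / max_compressed_bytes_per_kb;
}
```

For the old hard-coded minimum ratio of 8/7, this yields 1024 * 7 / 8 = 896 bytes per KB, which is the new default.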

## Stat API updates
* Deprecate the BYTES_COMPRESSED, BYTES_DECOMPRESSED histograms. Histograms incur substantial extra space & time costs compared to tickers, and the distribution of uncompressed data block sizes tends to be uninteresting. If we're interested in that distribution, I don't see why it should be limited to blocks stored as compressed.
* Deprecate the NUMBER_BLOCK_NOT_COMPRESSED ticker, because the name is very confusing.
* New or existing tickers relevant to compression:
  * BYTES_COMPRESSED_FROM
  * BYTES_COMPRESSED_TO
  * BYTES_COMPRESSION_BYPASSED
  * BYTES_COMPRESSION_REJECTED
  * COMPACT_WRITE_BYTES + FLUSH_WRITE_BYTES (both existing)
  * NUMBER_BLOCK_COMPRESSED (existing)
  * NUMBER_BLOCK_COMPRESSION_BYPASSED
  * NUMBER_BLOCK_COMPRESSION_REJECTED
  * BYTES_DECOMPRESSED_FROM
  * BYTES_DECOMPRESSED_TO

We can compute a number of things with these stats:
* "Successful" compression ratio: BYTES_COMPRESSED_FROM / BYTES_COMPRESSED_TO
* Compression ratio of data on which compression was attempted: (BYTES_COMPRESSED_FROM + BYTES_COMPRESSION_REJECTED) / (BYTES_COMPRESSED_TO + BYTES_COMPRESSION_REJECTED)
* Compression ratio of data that could be eligible for compression: (BYTES_COMPRESSED_FROM + X) / (BYTES_COMPRESSED_TO + X) where X = BYTES_COMPRESSION_REJECTED + BYTES_COMPRESSION_BYPASSED
* Overall SST compression ratio (compression disabled vs. actual): (Y - BYTES_COMPRESSED_TO + BYTES_COMPRESSED_FROM) / Y where Y = COMPACT_WRITE_BYTES + FLUSH_WRITE_BYTES

Keeping _REJECTED separate from _BYPASSED helps us to understand "wasted" CPU time in compression.
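The derived ratios above can be sketched as plain functions over the ticker values (a standalone sketch; the struct fields mirror the ticker names, and the numbers used for checking are illustrative only):

```cpp
#include <cassert>
#include <cstdint>

// Snapshot of the compression-related tickers described above.
struct CompressionTickers {
  uint64_t bytes_compressed_from;
  uint64_t bytes_compressed_to;
  uint64_t bytes_compression_bypassed;
  uint64_t bytes_compression_rejected;
  uint64_t compact_write_bytes;
  uint64_t flush_write_bytes;
};

// "Successful" compression ratio.
double SuccessfulRatio(const CompressionTickers& t) {
  return static_cast<double>(t.bytes_compressed_from) / t.bytes_compressed_to;
}

// Ratio over data on which compression was attempted (rejected blocks are
// stored at their uncompressed size, so the same term is added to both sides).
double AttemptedRatio(const CompressionTickers& t) {
  return static_cast<double>(t.bytes_compressed_from +
                             t.bytes_compression_rejected) /
         (t.bytes_compressed_to + t.bytes_compression_rejected);
}

// Ratio over all data that could be eligible for compression.
double EligibleRatio(const CompressionTickers& t) {
  uint64_t x = t.bytes_compression_rejected + t.bytes_compression_bypassed;
  return static_cast<double>(t.bytes_compressed_from + x) /
         (t.bytes_compressed_to + x);
}

// Overall SST ratio: bytes that would have been written with compression
// disabled, over bytes actually written.
double OverallSstRatio(const CompressionTickers& t) {
  uint64_t y = t.compact_write_bytes + t.flush_write_bytes;
  return static_cast<double>(y - t.bytes_compressed_to +
                             t.bytes_compressed_from) / y;
}
```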

## BlockBasedTableBuilder
Various small refactorings, optimizations, and name clean-ups.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/11388

Test Plan:
unit tests added

* `options_settable_test.cc`: use non-deprecated idiom for configuring CompressionOptions from string. The old idiom is tested elsewhere and does not need to be updated to support the new field.

Reviewed By: ajkr

Differential Revision: D45128202

Pulled By: pdillinger

fbshipit-source-id: 5a652bf5c022b7ec340cf79018cccf0686962803
Peter Dillinger 2 years ago committed by Facebook GitHub Bot
parent adc9001f20
commit d79be3dca2
Files changed (lines changed):
* HISTORY.md (2)
* db/blob/blob_file_builder.cc (1)
* db/db_statistics_test.cc (95)
* include/rocksdb/advanced_options.h (90)
* include/rocksdb/statistics.h (37)
* monitoring/statistics.cc (10)
* options/cf_options.cc (4)
* options/options_settable_test.cc (9)
* table/block_based/block_based_table_builder.cc (103)
* table/format.cc (4)
* table/sst_file_dumper.cc (3)
* table/table_test.cc (66)
* test_util/testharness.h (5)
* util/stop_watch.h (2)

@@ -3,6 +3,7 @@
### Public API Changes
* `SstFileWriter::DeleteRange()` now returns `Status::InvalidArgument` if the range's end key comes before its start key according to the user comparator. Previously the behavior was undefined.
* Add `multi_get_for_update` to C API.
* Remove unnecessary constructor for CompressionOptions.
### Behavior changes
* Changed default block cache size from an 8MB to 32MB LRUCache, which increases the default number of cache shards from 16 to 64. This change is intended to minimize cache mutex contention under stress conditions. See https://github.com/facebook/rocksdb/wiki/Block-Cache for more information.
@@ -17,6 +18,7 @@
### New Features
* Add experimental `PerfContext` counters `iter_{next|prev|seek}_count` for db iterator, each counting the times of corresponding API being called.
* Allow runtime changes to whether `WriteBufferManager` allows stall or not by calling `SetAllowStall()`
* Added statistics tickers BYTES_COMPRESSED_FROM, BYTES_COMPRESSED_TO, BYTES_COMPRESSION_BYPASSED, BYTES_COMPRESSION_REJECTED, NUMBER_BLOCK_COMPRESSION_BYPASSED, and NUMBER_BLOCK_COMPRESSION_REJECTED. Disabled/deprecated histograms BYTES_COMPRESSED and BYTES_DECOMPRESSED, and ticker NUMBER_BLOCK_NOT_COMPRESSED. The new tickers offer more insight into compression ratios, rejected vs. disabled compression, etc. (#11388)
* New statistics `rocksdb.file.read.{flush|compaction}.micros` that measure read time of block-based SST tables or blob files during flush or compaction.
### Bug Fixes

@@ -259,6 +259,7 @@ Status BlobFileBuilder::CompressBlobIfNeeded(
return Status::OK();
}
// TODO: allow user CompressionOptions, including max_compressed_bytes_per_kb
CompressionOptions opts;
CompressionContext context(blob_compression_type_);
constexpr uint64_t sample_for_compression = 0;

@@ -49,47 +49,102 @@ TEST_F(DBStatisticsTest, CompressionStatsTest) {
options.compression = type;
options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
options.statistics->set_stats_level(StatsLevel::kExceptTimeForMutex);
BlockBasedTableOptions bbto;
bbto.enable_index_compression = false;
options.table_factory.reset(NewBlockBasedTableFactory(bbto));
DestroyAndReopen(options);
int kNumKeysWritten = 100000;
auto PopStat = [&](Tickers t) -> uint64_t {
return options.statistics->getAndResetTickerCount(t);
};
int kNumKeysWritten = 100;
double compress_to = 0.5;
// About three KVs per block
int len = static_cast<int>(BlockBasedTableOptions().block_size / 3);
int uncomp_est = kNumKeysWritten * (len + 20);
// Check that compressions occur and are counted when compression is turned on
Random rnd(301);
std::string buf;
// Check that compressions occur and are counted when compression is turned on
for (int i = 0; i < kNumKeysWritten; ++i) {
// compressible string
ASSERT_OK(Put(Key(i), rnd.RandomString(128) + std::string(128, 'a')));
ASSERT_OK(
Put(Key(i), test::CompressibleString(&rnd, compress_to, len, &buf)));
}
ASSERT_OK(Flush());
ASSERT_GT(options.statistics->getTickerCount(NUMBER_BLOCK_COMPRESSED), 0);
EXPECT_EQ(34, PopStat(NUMBER_BLOCK_COMPRESSED));
EXPECT_NEAR2(uncomp_est, PopStat(BYTES_COMPRESSED_FROM), uncomp_est / 10);
EXPECT_NEAR2(uncomp_est * compress_to, PopStat(BYTES_COMPRESSED_TO),
uncomp_est / 10);
EXPECT_EQ(0, PopStat(NUMBER_BLOCK_DECOMPRESSED));
EXPECT_EQ(0, PopStat(BYTES_DECOMPRESSED_FROM));
EXPECT_EQ(0, PopStat(BYTES_DECOMPRESSED_TO));
// And decompressions
for (int i = 0; i < kNumKeysWritten; ++i) {
auto r = Get(Key(i));
}
ASSERT_GT(options.statistics->getTickerCount(NUMBER_BLOCK_DECOMPRESSED), 0);
EXPECT_EQ(34, PopStat(NUMBER_BLOCK_DECOMPRESSED));
EXPECT_NEAR2(uncomp_est, PopStat(BYTES_DECOMPRESSED_TO), uncomp_est / 10);
EXPECT_NEAR2(uncomp_est * compress_to, PopStat(BYTES_DECOMPRESSED_FROM),
uncomp_est / 10);
options.compression = kNoCompression;
EXPECT_EQ(0, PopStat(BYTES_COMPRESSION_BYPASSED));
EXPECT_EQ(0, PopStat(BYTES_COMPRESSION_REJECTED));
EXPECT_EQ(0, PopStat(NUMBER_BLOCK_COMPRESSION_BYPASSED));
EXPECT_EQ(0, PopStat(NUMBER_BLOCK_COMPRESSION_REJECTED));
// Check when compression is rejected.
compress_to = 0.95;
DestroyAndReopen(options);
uint64_t currentCompressions =
options.statistics->getTickerCount(NUMBER_BLOCK_COMPRESSED);
uint64_t currentDecompressions =
options.statistics->getTickerCount(NUMBER_BLOCK_DECOMPRESSED);
// Check that compressions do not occur when turned off
for (int i = 0; i < kNumKeysWritten; ++i) {
// compressible string
ASSERT_OK(Put(Key(i), rnd.RandomString(128) + std::string(128, 'a')));
ASSERT_OK(
Put(Key(i), test::CompressibleString(&rnd, compress_to, len, &buf)));
}
ASSERT_OK(Flush());
ASSERT_EQ(options.statistics->getTickerCount(NUMBER_BLOCK_COMPRESSED) -
currentCompressions,
0);
for (int i = 0; i < kNumKeysWritten; ++i) {
auto r = Get(Key(i));
}
EXPECT_EQ(34, PopStat(NUMBER_BLOCK_COMPRESSION_REJECTED));
EXPECT_NEAR2(uncomp_est, PopStat(BYTES_COMPRESSION_REJECTED),
uncomp_est / 10);
EXPECT_EQ(0, PopStat(NUMBER_BLOCK_COMPRESSED));
EXPECT_EQ(0, PopStat(NUMBER_BLOCK_COMPRESSION_BYPASSED));
EXPECT_EQ(0, PopStat(NUMBER_BLOCK_DECOMPRESSED));
EXPECT_EQ(0, PopStat(BYTES_COMPRESSED_FROM));
EXPECT_EQ(0, PopStat(BYTES_COMPRESSED_TO));
EXPECT_EQ(0, PopStat(BYTES_COMPRESSION_BYPASSED));
EXPECT_EQ(0, PopStat(BYTES_DECOMPRESSED_FROM));
EXPECT_EQ(0, PopStat(BYTES_DECOMPRESSED_TO));
// Check when compression is disabled.
options.compression = kNoCompression;
DestroyAndReopen(options);
for (int i = 0; i < kNumKeysWritten; ++i) {
ASSERT_OK(
Put(Key(i), test::CompressibleString(&rnd, compress_to, len, &buf)));
}
ASSERT_OK(Flush());
for (int i = 0; i < kNumKeysWritten; ++i) {
auto r = Get(Key(i));
}
ASSERT_EQ(options.statistics->getTickerCount(NUMBER_BLOCK_DECOMPRESSED) -
currentDecompressions,
0);
EXPECT_EQ(34, PopStat(NUMBER_BLOCK_COMPRESSION_BYPASSED));
EXPECT_NEAR2(uncomp_est, PopStat(BYTES_COMPRESSION_BYPASSED),
uncomp_est / 10);
EXPECT_EQ(0, PopStat(NUMBER_BLOCK_COMPRESSED));
EXPECT_EQ(0, PopStat(NUMBER_BLOCK_COMPRESSION_REJECTED));
EXPECT_EQ(0, PopStat(NUMBER_BLOCK_DECOMPRESSED));
EXPECT_EQ(0, PopStat(BYTES_COMPRESSED_FROM));
EXPECT_EQ(0, PopStat(BYTES_COMPRESSED_TO));
EXPECT_EQ(0, PopStat(BYTES_COMPRESSION_REJECTED));
EXPECT_EQ(0, PopStat(BYTES_DECOMPRESSED_FROM));
EXPECT_EQ(0, PopStat(BYTES_DECOMPRESSED_TO));
}
TEST_F(DBStatisticsTest, MutexWaitStatsDisabledByDefault) {

@@ -85,17 +85,28 @@ struct CompactionOptionsFIFO {
// Compression options for different compression algorithms like Zlib
struct CompressionOptions {
// ==> BEGIN options that can be set by deprecated configuration syntax, <==
// ==> e.g. compression_opts=5:6:7:8:9:10:true:11:false <==
// ==> Please use compression_opts={level=6;strategy=7;} form instead. <==
// RocksDB's generic default compression level. Internally it'll be translated
// to the default compression level specific to the library being used (see
// comment above `ColumnFamilyOptions::compression`).
//
// The default value is the max 16-bit int as it'll be written out in OPTIONS
// file, which should be portable.
const static int kDefaultCompressionLevel = 32767;
static constexpr int kDefaultCompressionLevel = 32767;
// zlib only: windowBits parameter. See https://www.zlib.net/manual.html
int window_bits = -14;
// Compression "level" applicable to zstd, zlib, LZ4. Except for
// kDefaultCompressionLevel (see above), the meaning of each value depends
// on the compression algorithm.
int level = kDefaultCompressionLevel;
int window_bits;
int level;
int strategy;
// zlib only: strategy parameter. See https://www.zlib.net/manual.html
int strategy = 0;
// Maximum size of dictionaries used to prime the compression library.
// Enabling dictionary can improve compression ratios when there are
@@ -117,18 +128,14 @@ struct CompressionOptions {
// If block cache insertion fails with `Status::MemoryLimit` (i.e., it is
// full), we finalize the dictionary with whatever data we have and then stop
// buffering.
//
// Default: 0.
uint32_t max_dict_bytes;
uint32_t max_dict_bytes = 0;
// Maximum size of training data passed to zstd's dictionary trainer. Using
// zstd's dictionary trainer can achieve even better compression ratio
// improvements than using `max_dict_bytes` alone.
//
// The training data will be used to generate a dictionary of max_dict_bytes.
//
// Default: 0.
uint32_t zstd_max_train_bytes;
uint32_t zstd_max_train_bytes = 0;
// Number of threads for parallel compression.
// Parallel compression is enabled only if threads > 1.
@@ -141,9 +148,7 @@
// compressed size is in flight when compression is parallelized. To be
// reasonably accurate, this inflation is also estimated by using historical
// compression ratio and current bytes inflight.
//
// Default: 1.
uint32_t parallel_threads;
uint32_t parallel_threads = 1;
// When the compression options are set by the user, it will be set to "true".
// For bottommost_compression_opts, to enable it, user must set enabled=true.
@@ -152,9 +157,7 @@
//
// For compression_opts, if compression_opts.enabled=false, it is still
// used as compression options for compression process.
//
// Default: false.
bool enabled;
bool enabled = false;
// Limit on data buffering when gathering samples to build a dictionary. Zero
// means no limit. When dictionary is disabled (`max_dict_bytes == 0`),
@@ -173,9 +176,7 @@
// `zstd_max_train_bytes` (when enabled) can restrict how many samples we can
// pass to the dictionary trainer. Configuring it below `max_dict_bytes` can
// restrict the size of the final dictionary.
//
// Default: 0 (unlimited)
uint64_t max_dict_buffer_bytes;
uint64_t max_dict_buffer_bytes = 0;
// Use zstd trainer to generate dictionaries. When this option is set to true,
// zstd_max_train_bytes of training data sampled from max_dict_buffer_bytes
@@ -187,34 +188,29 @@
// data will be passed to this API. Using this API should save CPU time on
// dictionary training, but the compression ratio may not be as good as using
// a dictionary trainer.
//
// Default: true
bool use_zstd_dict_trainer;
CompressionOptions()
: window_bits(-14),
level(kDefaultCompressionLevel),
strategy(0),
max_dict_bytes(0),
zstd_max_train_bytes(0),
parallel_threads(1),
enabled(false),
max_dict_buffer_bytes(0),
use_zstd_dict_trainer(true) {}
CompressionOptions(int wbits, int _lev, int _strategy,
uint32_t _max_dict_bytes, uint32_t _zstd_max_train_bytes,
uint32_t _parallel_threads, bool _enabled,
uint64_t _max_dict_buffer_bytes,
bool _use_zstd_dict_trainer)
: window_bits(wbits),
level(_lev),
strategy(_strategy),
max_dict_bytes(_max_dict_bytes),
zstd_max_train_bytes(_zstd_max_train_bytes),
parallel_threads(_parallel_threads),
enabled(_enabled),
max_dict_buffer_bytes(_max_dict_buffer_bytes),
use_zstd_dict_trainer(_use_zstd_dict_trainer) {}
bool use_zstd_dict_trainer = true;
// ===> END options that can be set by deprecated configuration syntax <===
// ===> Use compression_opts={level=6;strategy=7;} form for below opts <===
// Essentially specifies a minimum acceptable compression ratio. A block is
// stored uncompressed if the compressed block does not achieve this ratio,
// because the downstream cost of decompression is not considered worth such
// a small savings (if any).
// However, the ratio is specified in a way that is efficient for checking.
// An integer from 1 to 1024 indicates the maximum allowable compressed bytes
// per 1KB of input, so the minimum acceptable ratio is 1024.0 / this value.
// For example, for a minimum ratio of 1.5:1, set to 683. See SetMinRatio().
// Default: abandon use of compression for a specific block or entry if
// compressed by less than 12.5% (minimum ratio of 1.143:1).
int max_compressed_bytes_per_kb = 1024 * 7 / 8;
// A convenience function for setting max_compressed_bytes_per_kb based on a
// minimum acceptable compression ratio (uncompressed size over compressed
// size).
void SetMinRatio(double min_ratio) {
max_compressed_bytes_per_kb = static_cast<int>(1024.0 / min_ratio + 0.5);
}
};
// Temperature of a file. Used to pass to FileSystem for a different

@@ -201,6 +201,7 @@ enum Tickers : uint32_t {
NUMBER_BLOCK_COMPRESSED,
NUMBER_BLOCK_DECOMPRESSED,
// DEPRECATED / unused (see NUMBER_BLOCK_COMPRESSION_*)
NUMBER_BLOCK_NOT_COMPRESSED,
MERGE_OPERATION_TOTAL_TIME,
FILTER_OPERATION_TOTAL_TIME,
@@ -435,6 +436,36 @@ enum Tickers : uint32_t {
// # of times timestamps can successfully help skip the table access
TIMESTAMP_FILTER_TABLE_FILTERED,
// Number of input bytes (uncompressed) to compression for SST blocks that
// are stored compressed.
BYTES_COMPRESSED_FROM,
// Number of output bytes (compressed) from compression for SST blocks that
// are stored compressed.
BYTES_COMPRESSED_TO,
// Number of uncompressed bytes for SST blocks that are stored uncompressed
// because compression type is kNoCompression, or some error case caused
// compression not to run or produce an output. Index blocks are only counted
// if enable_index_compression is true.
BYTES_COMPRESSION_BYPASSED,
// Number of input bytes (uncompressed) to compression for SST blocks that
// are stored uncompressed because the compression result was rejected,
// either because the ratio was not acceptable (see
// CompressionOptions::max_compressed_bytes_per_kb) or found invalid by the
// `verify_compression` option.
BYTES_COMPRESSION_REJECTED,
// Like BYTES_COMPRESSION_BYPASSED but counting number of blocks
NUMBER_BLOCK_COMPRESSION_BYPASSED,
// Like BYTES_COMPRESSION_REJECTED but counting number of blocks
NUMBER_BLOCK_COMPRESSION_REJECTED,
// Number of input bytes (compressed) to decompression in reading compressed
// SST blocks from storage.
BYTES_DECOMPRESSED_FROM,
// Number of output bytes (uncompressed) from decompression in reading
// compressed SST blocks from storage.
BYTES_DECOMPRESSED_TO,
TICKER_ENUM_MAX
};
@@ -486,10 +517,8 @@ enum Histograms : uint32_t {
BYTES_PER_WRITE,
BYTES_PER_MULTIGET,
// number of bytes compressed/decompressed
// number of bytes is when uncompressed; i.e. before/after respectively
BYTES_COMPRESSED,
BYTES_DECOMPRESSED,
BYTES_COMPRESSED, // DEPRECATED / unused (see BYTES_COMPRESSED_{FROM,TO})
BYTES_DECOMPRESSED, // DEPRECATED / unused (see BYTES_DECOMPRESSED_{FROM,TO})
COMPRESSION_TIMES_NANOS,
DECOMPRESSION_TIMES_NANOS,
// Number of merge operands passed to the merge operator in user read

@@ -222,6 +222,16 @@ const std::vector<std::pair<Tickers, std::string>> TickersNameMap = {
{TIMESTAMP_FILTER_TABLE_CHECKED, "rocksdb.timestamp.filter.table.checked"},
{TIMESTAMP_FILTER_TABLE_FILTERED,
"rocksdb.timestamp.filter.table.filtered"},
{BYTES_COMPRESSED_FROM, "rocksdb.bytes.compressed.from"},
{BYTES_COMPRESSED_TO, "rocksdb.bytes.compressed.to"},
{BYTES_COMPRESSION_BYPASSED, "rocksdb.bytes.compression_bypassed"},
{BYTES_COMPRESSION_REJECTED, "rocksdb.bytes.compression.rejected"},
{NUMBER_BLOCK_COMPRESSION_BYPASSED,
"rocksdb.number.block_compression_bypassed"},
{NUMBER_BLOCK_COMPRESSION_REJECTED,
"rocksdb.number.block_compression_rejected"},
{BYTES_DECOMPRESSED_FROM, "rocksdb.bytes.decompressed.from"},
{BYTES_DECOMPRESSED_TO, "rocksdb.bytes.decompressed.to"},
};
const std::vector<std::pair<Histograms, std::string>> HistogramsNameMap = {

@@ -147,6 +147,10 @@ static std::unordered_map<std::string, OptionTypeInfo>
{"strategy",
{offsetof(struct CompressionOptions, strategy), OptionType::kInt,
OptionVerificationType::kNormal, OptionTypeFlags::kMutable}},
{"max_compressed_bytes_per_kb",
{offsetof(struct CompressionOptions, max_compressed_bytes_per_kb),
OptionType::kInt, OptionVerificationType::kNormal,
OptionTypeFlags::kMutable}},
{"max_dict_bytes",
{offsetof(struct CompressionOptions, max_dict_bytes), OptionType::kInt,
OptionVerificationType::kNormal, OptionTypeFlags::kMutable}},

@@ -497,8 +497,13 @@ TEST_F(OptionsSettableTest, ColumnFamilyOptionsAllFieldsSettable) {
"max_bytes_for_level_multiplier=60;"
"memtable_factory=SkipListFactory;"
"compression=kNoCompression;"
"compression_opts=5:6:7:8:9:10:true:11:false;"
"bottommost_compression_opts=4:5:6:7:8:9:true:10:true;"
"compression_opts={max_dict_buffer_bytes=5;use_zstd_dict_trainer=true;"
"enabled=false;parallel_threads=6;zstd_max_train_bytes=7;strategy=8;max_"
"dict_bytes=9;level=10;window_bits=11;max_compressed_bytes_per_kb=987;};"
"bottommost_compression_opts={max_dict_buffer_bytes=4;use_zstd_dict_"
"trainer=true;enabled=true;parallel_threads=5;zstd_max_train_bytes=6;"
"strategy=7;max_dict_bytes=8;level=9;window_bits=10;max_compressed_bytes_"
"per_kb=876;};"
"bottommost_compression=kDisableCompressionOption;"
"level0_stop_writes_trigger=33;"
"num_levels=99;"

@@ -104,9 +104,12 @@ FilterBlockBuilder* CreateFilterBlockBuilder(
}
}
bool GoodCompressionRatio(size_t compressed_size, size_t uncomp_size) {
// Check to see if compressed less than 12.5%
return compressed_size < uncomp_size - (uncomp_size / 8u);
bool GoodCompressionRatio(size_t compressed_size, size_t uncomp_size,
int max_compressed_bytes_per_kb) {
// For efficiency, avoid floating point and division
return compressed_size <=
(static_cast<uint64_t>(max_compressed_bytes_per_kb) * uncomp_size) >>
10;
}
} // namespace
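The multiply-and-shift form above can be checked in isolation (a standalone copy of the new predicate with illustrative sanity checks; it approximates the old `compressed_size < uncomp_size - (uncomp_size / 8u)` check at the default threshold of 896, differing only at the exact boundary):

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Accept a compressed block only if it is no larger than
// uncomp_size * max_compressed_bytes_per_kb / 1024, computed with a
// multiply and a right shift instead of floating point or division.
bool GoodCompressionRatio(size_t compressed_size, size_t uncomp_size,
                          int max_compressed_bytes_per_kb) {
  return compressed_size <=
         (static_cast<uint64_t>(max_compressed_bytes_per_kb) * uncomp_size) >>
             10;
}
```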
@@ -114,7 +117,7 @@ bool GoodCompressionRatio(size_t compressed_size, size_t uncomp_size) {
// format_version is the block format as defined in include/rocksdb/table.h
Slice CompressBlock(const Slice& uncompressed_data, const CompressionInfo& info,
CompressionType* type, uint32_t format_version,
bool do_sample, std::string* compressed_output,
bool allow_sample, std::string* compressed_output,
std::string* sampled_output_fast,
std::string* sampled_output_slow) {
assert(type);
@@ -126,7 +129,7 @@ Slice CompressBlock(const Slice& uncompressed_data, const CompressionInfo& info,
// The users can use these stats to decide if it is worthwhile
// enabling compression and they also get a hint about which
// compression algorithm will be beneficial.
if (do_sample && info.SampleForCompression() &&
if (allow_sample && info.SampleForCompression() &&
Random::GetTLSInstance()->OneIn(
static_cast<int>(info.SampleForCompression()))) {
// Sampling with a fast compression algorithm
@@ -159,7 +162,8 @@ Slice CompressBlock(const Slice& uncompressed_data, const CompressionInfo& info,
}
}
if (info.type() == kNoCompression) {
int max_compressed_bytes_per_kb = info.options().max_compressed_bytes_per_kb;
if (info.type() == kNoCompression || max_compressed_bytes_per_kb <= 0) {
*type = kNoCompression;
return uncompressed_data;
}
@@ -175,8 +179,8 @@ Slice CompressBlock(const Slice& uncompressed_data, const CompressionInfo& info,
// Check the compression ratio; if it's not good enough, just fall back to
// uncompressed
if (!GoodCompressionRatio(compressed_output->size(),
uncompressed_data.size())) {
if (!GoodCompressionRatio(compressed_output->size(), uncompressed_data.size(),
max_compressed_bytes_per_kb)) {
*type = kNoCompression;
return uncompressed_data;
}
@@ -1108,25 +1112,17 @@ void BlockBasedTableBuilder::CompressAndVerifyBlock(
const CompressionContext& compression_ctx, UncompressionContext* verify_ctx,
std::string* compressed_output, Slice* block_contents,
CompressionType* type, Status* out_status) {
// File format contains a sequence of blocks where each block has:
// block_data: uint8[n]
// type: uint8
// crc: uint32
Rep* r = rep_;
bool is_status_ok = ok();
if (!r->IsParallelCompressionEnabled()) {
assert(is_status_ok);
}
*type = r->compression_type;
uint64_t sample_for_compression = r->sample_for_compression;
bool abort_compression = false;
StopWatchNano timer(
r->ioptions.clock,
ShouldReportDetailedTime(r->ioptions.env, r->ioptions.stats));
if (is_status_ok && uncompressed_block_data.size() < kCompressionSizeLimit) {
StopWatchNano timer(
r->ioptions.clock,
ShouldReportDetailedTime(r->ioptions.env, r->ioptions.stats));
if (is_data_block) {
r->compressible_input_data_bytes.fetch_add(uncompressed_block_data.size(),
std::memory_order_relaxed);
@@ -1139,14 +1135,14 @@ void BlockBasedTableBuilder::CompressAndVerifyBlock(
}
assert(compression_dict != nullptr);
CompressionInfo compression_info(r->compression_opts, compression_ctx,
*compression_dict, *type,
sample_for_compression);
*compression_dict, r->compression_type,
r->sample_for_compression);
std::string sampled_output_fast;
std::string sampled_output_slow;
*block_contents = CompressBlock(
uncompressed_block_data, compression_info, type,
r->table_options.format_version, is_data_block /* do_sample */,
r->table_options.format_version, is_data_block /* allow_sample */,
compressed_output, &sampled_output_fast, &sampled_output_slow);
if (sampled_output_slow.size() > 0 || sampled_output_fast.size() > 0) {
@@ -1179,35 +1175,38 @@ void BlockBasedTableBuilder::CompressAndVerifyBlock(
BlockContents contents;
UncompressionInfo uncompression_info(*verify_ctx, *verify_dict,
r->compression_type);
Status stat = UncompressBlockData(
Status uncompress_status = UncompressBlockData(
uncompression_info, block_contents->data(), block_contents->size(),
&contents, r->table_options.format_version, r->ioptions);
if (stat.ok()) {
bool compressed_ok =
contents.data.compare(uncompressed_block_data) == 0;
if (!compressed_ok) {
if (uncompress_status.ok()) {
bool data_match = contents.data.compare(uncompressed_block_data) == 0;
if (!data_match) {
// The result of the compression was invalid. abort.
abort_compression = true;
const char* const msg =
"Decompressed block did not match pre-compression block";
ROCKS_LOG_ERROR(r->ioptions.logger, "%s", msg);
*out_status = Status::Corruption(msg);
*type = kNoCompression;
}
} else {
// Decompression reported an error. abort.
*out_status = Status::Corruption(std::string("Could not decompress: ") +
stat.getState());
abort_compression = true;
uncompress_status.getState());
*type = kNoCompression;
}
}
if (timer.IsStarted()) {
RecordTimeToHistogram(r->ioptions.stats, COMPRESSION_TIMES_NANOS,
timer.ElapsedNanos());
}
} else {
// Block is too big to be compressed.
// Status is not OK, or block is too big to be compressed.
if (is_data_block) {
r->uncompressible_input_data_bytes.fetch_add(
uncompressed_block_data.size(), std::memory_order_relaxed);
}
abort_compression = true;
*type = kNoCompression;
}
if (is_data_block) {
r->uncompressible_input_data_bytes.fetch_add(kBlockTrailerSize,
@@ -1216,26 +1215,32 @@ void BlockBasedTableBuilder::CompressAndVerifyBlock(
// Abort compression if the block is too big, or did not pass
// verification.
if (abort_compression) {
RecordTick(r->ioptions.stats, NUMBER_BLOCK_NOT_COMPRESSED);
*type = kNoCompression;
if (*type == kNoCompression) {
*block_contents = uncompressed_block_data;
} else if (*type != kNoCompression) {
if (ShouldReportDetailedTime(r->ioptions.env, r->ioptions.stats)) {
RecordTimeToHistogram(r->ioptions.stats, COMPRESSION_TIMES_NANOS,
timer.ElapsedNanos());
}
RecordInHistogram(r->ioptions.stats, BYTES_COMPRESSED,
uncompressed_block_data.size());
bool compression_attempted = !compressed_output->empty();
RecordTick(r->ioptions.stats, compression_attempted
? NUMBER_BLOCK_COMPRESSION_REJECTED
: NUMBER_BLOCK_COMPRESSION_BYPASSED);
RecordTick(r->ioptions.stats,
compression_attempted ? BYTES_COMPRESSION_REJECTED
: BYTES_COMPRESSION_BYPASSED,
uncompressed_block_data.size());
} else {
RecordTick(r->ioptions.stats, NUMBER_BLOCK_COMPRESSED);
} else if (*type != r->compression_type) {
RecordTick(r->ioptions.stats, NUMBER_BLOCK_NOT_COMPRESSED);
RecordTick(r->ioptions.stats, BYTES_COMPRESSED_FROM,
uncompressed_block_data.size());
RecordTick(r->ioptions.stats, BYTES_COMPRESSED_TO,
compressed_output->size());
}
}
void BlockBasedTableBuilder::WriteMaybeCompressedBlock(
const Slice& block_contents, CompressionType type, BlockHandle* handle,
const Slice& block_contents, CompressionType comp_type, BlockHandle* handle,
BlockType block_type, const Slice* uncompressed_block_data) {
// File format contains a sequence of blocks where each block has:
// block_data: uint8[n]
// compression_type: uint8
// checksum: uint32
Rep* r = rep_;
bool is_data_block = block_type == BlockType::kData;
// Old, misleading name of this function: WriteRawBlock
@@ -1246,7 +1251,7 @@ void BlockBasedTableBuilder::WriteMaybeCompressedBlock(
assert(io_status().ok());
if (uncompressed_block_data == nullptr) {
uncompressed_block_data = &block_contents;
assert(type == kNoCompression);
assert(comp_type == kNoCompression);
}
{
@@ -1258,10 +1263,10 @@ void BlockBasedTableBuilder::WriteMaybeCompressedBlock(
}
std::array<char, kBlockTrailerSize> trailer;
trailer[0] = type;
trailer[0] = comp_type;
uint32_t checksum = ComputeBuiltinChecksumWithLastByte(
r->table_options.checksum, block_contents.data(), block_contents.size(),
/*last_byte*/ type);
/*last_byte*/ comp_type);
if (block_type == BlockType::kFilter) {
Status s = r->filter_builder->MaybePostVerifyFilter(block_contents);

@@ -530,8 +530,8 @@ Status UncompressBlockData(const UncompressionInfo& uncompression_info,
RecordTimeToHistogram(ioptions.stats, DECOMPRESSION_TIMES_NANOS,
timer.ElapsedNanos());
}
RecordTimeToHistogram(ioptions.stats, BYTES_DECOMPRESSED,
out_contents->data.size());
RecordTick(ioptions.stats, BYTES_DECOMPRESSED_FROM, size);
RecordTick(ioptions.stats, BYTES_DECOMPRESSED_TO, out_contents->data.size());
RecordTick(ioptions.stats, NUMBER_BLOCK_DECOMPRESSED);
TEST_SYNC_POINT_CALLBACK("UncompressBlockData:TamperWithReturnValue",

@@ -315,7 +315,8 @@ Status SstFileDumper::ShowCompressionSize(
const uint64_t compressed_blocks =
opts.statistics->getAndResetTickerCount(NUMBER_BLOCK_COMPRESSED);
const uint64_t not_compressed_blocks =
opts.statistics->getAndResetTickerCount(NUMBER_BLOCK_NOT_COMPRESSED);
opts.statistics->getAndResetTickerCount(
NUMBER_BLOCK_COMPRESSION_REJECTED);
// When the option enable_index_compression is true,
// NUMBER_BLOCK_COMPRESSED is incremented for index block(s).
if ((compressed_blocks + not_compressed_blocks) > num_data_blocks) {

@@ -1718,7 +1718,8 @@ TEST_P(BlockBasedTableTest, BasicBlockBasedTableProperties) {
MutableCFOptions moptions(options);
c.Finish(options, ioptions, moptions, table_options,
GetPlainInternalComparator(options.comparator), &keys, &kvmap);
ASSERT_EQ(options.statistics->getTickerCount(NUMBER_BLOCK_NOT_COMPRESSED), 0);
ASSERT_EQ(
options.statistics->getTickerCount(NUMBER_BLOCK_COMPRESSION_REJECTED), 0);
auto& props = *c.GetTableReader()->GetTableProperties();
ASSERT_EQ(kvmap.size(), props.num_entries);
@@ -5079,6 +5080,69 @@ TEST_P(BlockBasedTableTest, PropertiesBlockRestartPointTest) {
}
}
TEST_P(BlockBasedTableTest, CompressionRatioThreshold) {
Options options;
if (Snappy_Supported()) {
options.compression = kSnappyCompression;
fprintf(stderr, "using snappy\n");
} else if (Zlib_Supported()) {
options.compression = kZlibCompression;
fprintf(stderr, "using zlib\n");
} else if (BZip2_Supported()) {
options.compression = kBZip2Compression;
fprintf(stderr, "using bzip2\n");
} else if (LZ4_Supported()) {
options.compression = kLZ4Compression;
fprintf(stderr, "using lz4\n");
} else if (XPRESS_Supported()) {
options.compression = kXpressCompression;
fprintf(stderr, "using xpress\n");
} else if (ZSTD_Supported()) {
options.compression = kZSTD;
fprintf(stderr, "using ZSTD\n");
} else {
fprintf(stderr, "skipping test, compression disabled\n");
return;
}
BlockBasedTableOptions table_options = GetBlockBasedTableOptions();
int len = 10000;
Random rnd(301);
std::vector<std::string> keys;
stl_wrappers::KVMap kvmap;
// Test the max_compressed_bytes_per_kb option
for (int threshold : {0, 1, 100, 400, 600, 900, 1024}) {
SCOPED_TRACE("threshold=" + std::to_string(threshold));
options.compression_opts.max_compressed_bytes_per_kb = threshold;
ImmutableOptions ioptions(options);
MutableCFOptions moptions(options);
for (double compressible_to : {0.25, 0.75}) {
SCOPED_TRACE("compressible_to=" + std::to_string(compressible_to));
TableConstructor c(BytewiseComparator(),
true /* convert_to_internal_key_ */);
std::string buf;
c.Add("x", test::CompressibleString(&rnd, compressible_to, len, &buf));
// write an SST file
c.Finish(options, ioptions, moptions, table_options,
GetPlainInternalComparator(options.comparator), &keys, &kvmap);
size_t table_file_size = c.TEST_GetSink()->contents().size();
size_t approx_sst_overhead = 1000;
if (compressible_to < threshold / 1024.0) {
// Should be compressed
EXPECT_NEAR2(len * compressible_to + approx_sst_overhead,
table_file_size, len / 10);
} else {
// Should not be compressed
EXPECT_NEAR2(len + approx_sst_overhead, table_file_size, len / 10);
}
}
}
}
TEST_P(BlockBasedTableTest, PropertiesMetaBlockLast) {
// The properties meta-block should come at the end since we always need to
// read it when opening a file, unlike index/filter/other meta-blocks, which

@@ -51,6 +51,11 @@
GTEST_SUCCESS_("BYPASSED: " m); \
} while (false) /* user ; */
// Avoid "loss of precision" warnings when passing in 64-bit integers
#define EXPECT_NEAR2(val1, val2, abs_error) \
EXPECT_NEAR(static_cast<double>(val1), static_cast<double>(val2), \
static_cast<double>(abs_error))
#include <string>
#include "port/stack_trace.h"

@@ -126,6 +126,8 @@ class StopWatchNano {
return (clock_ != nullptr) ? ElapsedNanos(reset) : 0U;
}
bool IsStarted() { return start_ != 0; }
private:
SystemClock* clock_;
uint64_t start_;
