Limit buffering for collecting samples for compression dictionary (#7970)

Summary:
For dictionary compression, we need to collect some representative samples of the data to be compressed, which we use to either generate or train (when `CompressionOptions::zstd_max_train_bytes > 0`) a dictionary. Previously, the strategy was to buffer all the data blocks during flush, and up to the target file size during compaction. That strategy allowed us to randomly pick samples from as wide a range as possible that'd be guaranteed to land in a single output file.

However, some users try to make huge files in memory-constrained environments, where this strategy can cause OOM. This PR introduces an option, `CompressionOptions::max_dict_buffer_bytes`, that limits how much data blocks are buffered before we switch to unbuffered mode (which means creating the per-SST dictionary, writing out the buffered data, and compressing/writing new blocks as soon as they are built). It is not strict as we currently buffer more than just data blocks -- also keys are buffered. But it does make a step towards giving users predictable memory usage.

Related changes include:

- Changed sampling for dictionary compression to select unique data blocks when there is limited availability of data blocks
- Made use of `BlockBuilder::SwapAndReset()` to save an allocation+memcpy when buffering data blocks for building a dictionary
- Changed `ParseBoolean()` to accept an input containing characters after the boolean. This is necessary since, with this PR, a value for `CompressionOptions::enabled` is no longer necessarily the final component in the `CompressionOptions` string.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/7970

Test Plan:
- updated `CompressionOptions` unit tests to verify limit is respected (to the extent expected in the current implementation) in various scenarios of flush/compaction to bottommost/non-bottommost level
- looked at jemalloc heap profiles right before and after switching to unbuffered mode during flush/compaction. Verified memory usage in buffering is proportional to the limit set.

Reviewed By: pdillinger

Differential Revision: D26467994

Pulled By: ajkr

fbshipit-source-id: 3da4ef9fba59974e4ef40e40c01611002c861465
main
Andrew Kryczka 4 years ago committed by Facebook GitHub Bot
parent cf14cb3e29
commit d904233d2f
  1. 1
      HISTORY.md
  2. 13
      db/c.cc
  3. 118
      db/db_test2.cc
  4. 1
      db_stress_tool/db_stress_common.h
  5. 4
      db_stress_tool/db_stress_gflags.cc
  6. 2
      db_stress_tool/db_stress_test_base.cc
  7. 29
      include/rocksdb/advanced_options.h
  8. 6
      include/rocksdb/c.h
  9. 21
      java/rocksjni/compression_options.cc
  10. 90
      options/cf_options.cc
  11. 9
      options/options.cc
  12. 14
      options/options_settable_test.cc
  13. 8
      options/options_test.cc
  14. 82
      table/block_based/block_based_table_builder.cc
  15. 5
      table/block_based/block_based_table_builder.h
  16. 4
      table/sst_file_dumper.cc
  17. 3
      table/sst_file_dumper.h
  18. 6
      tools/db_bench_tool.cc
  19. 7
      tools/db_crashtest.py
  20. 18
      tools/sst_dump_tool.cc
  21. 3
      util/compression.h

@ -20,6 +20,7 @@
### Public API Change
* Added a "only_mutable_options" flag to the ConfigOptions. When this flag is "true", the Configurable functions and convenience methods (such as GetDBOptionsFromString) will only deal with options that are marked as mutable. When this flag is true, only options marked as mutable can be configured (a Status::InvalidArgument will be returned) and options not marked as mutable will not be returned or compared. The default is "false", meaning to compare all options.
* Add new Append and PositionedAppend APIs to FileSystem to bring the data verification information (data checksum information) from upper layer (e.g., WritableFileWriter) to the storage layer. In this way, the customized FileSystem is able to verify the correctness of data being written to the storage on time. Add checksum_handoff_file_types to DBOptions. User can use this option to control which file types (Currently supported file tyes: kWALFile, kTableFile, kDescriptorFile.) should use the new Append and PositionedAppend APIs to handoff the verification information. Currently, RocksDB only use crc32c to calculate the checksum for write handoff.
* Add an option, `CompressionOptions::max_dict_buffer_bytes`, to limit the in-memory buffering for selecting samples for generating/training a dictionary. The limit is currently loosely adhered to.
## 6.17.0 (01/15/2021)
### Behavior Changes

@ -2774,6 +2774,14 @@ void rocksdb_options_set_bottommost_compression_options_zstd_max_train_bytes(
opt->rep.bottommost_compression_opts.enabled = enabled;
}
void rocksdb_options_set_bottommost_compression_options_max_dict_buffer_bytes(
rocksdb_options_t* opt, uint64_t max_dict_buffer_bytes,
unsigned char enabled) {
opt->rep.bottommost_compression_opts.max_dict_buffer_bytes =
max_dict_buffer_bytes;
opt->rep.bottommost_compression_opts.enabled = enabled;
}
void rocksdb_options_set_compression_options(rocksdb_options_t* opt, int w_bits,
int level, int strategy,
int max_dict_bytes) {
@ -2788,6 +2796,11 @@ void rocksdb_options_set_compression_options_zstd_max_train_bytes(
opt->rep.compression_opts.zstd_max_train_bytes = zstd_max_train_bytes;
}
void rocksdb_options_set_compression_options_max_dict_buffer_bytes(
rocksdb_options_t* opt, uint64_t max_dict_buffer_bytes) {
opt->rep.compression_opts.max_dict_buffer_bytes = max_dict_buffer_bytes;
}
void rocksdb_options_set_prefix_extractor(
rocksdb_options_t* opt, rocksdb_slicetransform_t* prefix_extractor) {
opt->rep.prefix_extractor.reset(prefix_extractor);

@ -1410,67 +1410,94 @@ INSTANTIATE_TEST_CASE_P(
TEST_P(PresetCompressionDictTest, Flush) {
// Verifies that dictionary is generated and written during flush only when
// `ColumnFamilyOptions::compression` enables dictionary.
// `ColumnFamilyOptions::compression` enables dictionary. Also verifies the
// size of the dictionary is within expectations according to the limit on
// buffering set by `CompressionOptions::max_dict_buffer_bytes`.
const size_t kValueLen = 256;
const size_t kKeysPerFile = 1 << 10;
const size_t kDictLen = 4 << 10;
const size_t kDictLen = 16 << 10;
const size_t kBlockLen = 4 << 10;
Options options = CurrentOptions();
if (bottommost_) {
options.bottommost_compression = compression_type_;
options.bottommost_compression_opts.enabled = true;
options.bottommost_compression_opts.max_dict_bytes = kDictLen;
options.bottommost_compression_opts.max_dict_buffer_bytes = kBlockLen;
} else {
options.compression = compression_type_;
options.compression_opts.max_dict_bytes = kDictLen;
options.compression_opts.max_dict_buffer_bytes = kBlockLen;
}
options.memtable_factory.reset(new SpecialSkipListFactory(kKeysPerFile));
options.statistics = CreateDBStatistics();
BlockBasedTableOptions bbto;
bbto.block_size = kBlockLen;
bbto.cache_index_and_filter_blocks = true;
options.table_factory.reset(NewBlockBasedTableFactory(bbto));
Reopen(options);
uint64_t prev_compression_dict_misses =
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_MISS);
Random rnd(301);
for (size_t i = 0; i <= kKeysPerFile; ++i) {
ASSERT_OK(Put(Key(static_cast<int>(i)), rnd.RandomString(kValueLen)));
}
ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
// If there's a compression dictionary, it should have been loaded when the
// flush finished, incurring a cache miss.
uint64_t expected_compression_dict_misses;
// We can use `BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT` to detect whether a
// compression dictionary exists since dictionaries would be preloaded when
// the flush finishes.
if (bottommost_) {
expected_compression_dict_misses = prev_compression_dict_misses;
// Flush is never considered bottommost. This should change in the future
// since flushed files may have nothing underneath them, like the one in
// this test case.
ASSERT_EQ(
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT),
0);
} else {
expected_compression_dict_misses = prev_compression_dict_misses + 1;
ASSERT_GT(
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT),
0);
// TODO(ajkr): fix the below assertion to work with ZSTD. The expectation on
// number of bytes needs to be adjusted in case the cached block is in
// ZSTD's digested dictionary format.
if (compression_type_ != kZSTD &&
compression_type_ != kZSTDNotFinalCompression) {
// Although we limited buffering to `kBlockLen`, there may be up to two
// blocks of data included in the dictionary since we only check limit
// after each block is built.
ASSERT_LE(TestGetTickerCount(options,
BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT),
2 * kBlockLen);
}
}
ASSERT_EQ(expected_compression_dict_misses,
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_MISS));
}
TEST_P(PresetCompressionDictTest, CompactNonBottommost) {
// Verifies that dictionary is generated and written during compaction to
// non-bottommost level only when `ColumnFamilyOptions::compression` enables
// dictionary.
// dictionary. Also verifies the size of the dictionary is within expectations
// according to the limit on buffering set by
// `CompressionOptions::max_dict_buffer_bytes`.
const size_t kValueLen = 256;
const size_t kKeysPerFile = 1 << 10;
const size_t kDictLen = 4 << 10;
const size_t kDictLen = 16 << 10;
const size_t kBlockLen = 4 << 10;
Options options = CurrentOptions();
if (bottommost_) {
options.bottommost_compression = compression_type_;
options.bottommost_compression_opts.enabled = true;
options.bottommost_compression_opts.max_dict_bytes = kDictLen;
options.bottommost_compression_opts.max_dict_buffer_bytes = kBlockLen;
} else {
options.compression = compression_type_;
options.compression_opts.max_dict_bytes = kDictLen;
options.compression_opts.max_dict_buffer_bytes = kBlockLen;
}
options.disable_auto_compactions = true;
options.statistics = CreateDBStatistics();
BlockBasedTableOptions bbto;
bbto.block_size = kBlockLen;
bbto.cache_index_and_filter_blocks = true;
options.table_factory.reset(NewBlockBasedTableFactory(bbto));
Reopen(options);
@ -1492,8 +1519,8 @@ TEST_P(PresetCompressionDictTest, CompactNonBottommost) {
ASSERT_EQ("2,0,1", FilesPerLevel(0));
#endif // ROCKSDB_LITE
uint64_t prev_compression_dict_misses =
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_MISS);
uint64_t prev_compression_dict_bytes_inserted =
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT);
// This L0->L1 compaction merges the two L0 files into L1. The produced L1
// file is not bottommost due to the existing L2 file covering the same key-
// range.
@ -1501,38 +1528,58 @@ TEST_P(PresetCompressionDictTest, CompactNonBottommost) {
#ifndef ROCKSDB_LITE
ASSERT_EQ("0,1,1", FilesPerLevel(0));
#endif // ROCKSDB_LITE
// If there's a compression dictionary, it should have been loaded when the
// compaction finished, incurring a cache miss.
uint64_t expected_compression_dict_misses;
// We can use `BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT` to detect whether a
// compression dictionary exists since dictionaries would be preloaded when
// the compaction finishes.
if (bottommost_) {
expected_compression_dict_misses = prev_compression_dict_misses;
ASSERT_EQ(
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT),
prev_compression_dict_bytes_inserted);
} else {
expected_compression_dict_misses = prev_compression_dict_misses + 1;
ASSERT_GT(
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT),
prev_compression_dict_bytes_inserted);
// TODO(ajkr): fix the below assertion to work with ZSTD. The expectation on
// number of bytes needs to be adjusted in case the cached block is in
// ZSTD's digested dictionary format.
if (compression_type_ != kZSTD &&
compression_type_ != kZSTDNotFinalCompression) {
// Although we limited buffering to `kBlockLen`, there may be up to two
// blocks of data included in the dictionary since we only check limit
// after each block is built.
ASSERT_LE(TestGetTickerCount(options,
BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT),
prev_compression_dict_bytes_inserted + 2 * kBlockLen);
}
}
ASSERT_EQ(expected_compression_dict_misses,
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_MISS));
}
TEST_P(PresetCompressionDictTest, CompactBottommost) {
// Verifies that dictionary is generated and written during compaction to
// non-bottommost level only when either `ColumnFamilyOptions::compression` or
// `ColumnFamilyOptions::bottommost_compression` enables dictionary.
// `ColumnFamilyOptions::bottommost_compression` enables dictionary. Also
// verifies the size of the dictionary is within expectations according to the
// limit on buffering set by `CompressionOptions::max_dict_buffer_bytes`.
const size_t kValueLen = 256;
const size_t kKeysPerFile = 1 << 10;
const size_t kDictLen = 4 << 10;
const size_t kDictLen = 16 << 10;
const size_t kBlockLen = 4 << 10;
Options options = CurrentOptions();
if (bottommost_) {
options.bottommost_compression = compression_type_;
options.bottommost_compression_opts.enabled = true;
options.bottommost_compression_opts.max_dict_bytes = kDictLen;
options.bottommost_compression_opts.max_dict_buffer_bytes = kBlockLen;
} else {
options.compression = compression_type_;
options.compression_opts.max_dict_bytes = kDictLen;
options.compression_opts.max_dict_buffer_bytes = kBlockLen;
}
options.disable_auto_compactions = true;
options.statistics = CreateDBStatistics();
BlockBasedTableOptions bbto;
bbto.block_size = kBlockLen;
bbto.cache_index_and_filter_blocks = true;
options.table_factory.reset(NewBlockBasedTableFactory(bbto));
Reopen(options);
@ -1548,17 +1595,28 @@ TEST_P(PresetCompressionDictTest, CompactBottommost) {
ASSERT_EQ("2", FilesPerLevel(0));
#endif // ROCKSDB_LITE
uint64_t prev_compression_dict_misses =
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_MISS);
uint64_t prev_compression_dict_bytes_inserted =
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT);
CompactRangeOptions cro;
ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
#ifndef ROCKSDB_LITE
ASSERT_EQ("0,1", FilesPerLevel(0));
#endif // ROCKSDB_LITE
// If there's a compression dictionary, it should have been loaded when the
// compaction finished, incurring a cache miss.
ASSERT_EQ(prev_compression_dict_misses + 1,
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_MISS));
ASSERT_GT(
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT),
prev_compression_dict_bytes_inserted);
// TODO(ajkr): fix the below assertion to work with ZSTD. The expectation on
// number of bytes needs to be adjusted in case the cached block is in ZSTD's
// digested dictionary format.
if (compression_type_ != kZSTD &&
compression_type_ != kZSTDNotFinalCompression) {
// Although we limited buffering to `kBlockLen`, there may be up to two
// blocks of data included in the dictionary since we only check limit after
// each block is built.
ASSERT_LE(
TestGetTickerCount(options, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT),
prev_compression_dict_bytes_inserted + 2 * kBlockLen);
}
}
class CompactionCompressionListener : public EventListener {

@ -206,6 +206,7 @@ DECLARE_string(bottommost_compression_type);
DECLARE_int32(compression_max_dict_bytes);
DECLARE_int32(compression_zstd_max_train_bytes);
DECLARE_int32(compression_parallel_threads);
DECLARE_uint64(compression_max_dict_buffer_bytes);
DECLARE_string(checksum_type);
DECLARE_string(hdfs);
DECLARE_string(env_uri);

@ -661,6 +661,10 @@ DEFINE_int32(compression_zstd_max_train_bytes, 0,
DEFINE_int32(compression_parallel_threads, 1,
"Number of threads for parallel compression.");
DEFINE_uint64(compression_max_dict_buffer_bytes, 0,
"Buffering limit for SST file data to sample for dictionary "
"compression.");
DEFINE_string(bottommost_compression_type, "disable",
"Algorithm to use to compress bottommost level of the database. "
"\"disable\" means disabling the feature");

@ -2083,6 +2083,8 @@ void StressTest::Open() {
FLAGS_compression_zstd_max_train_bytes;
options_.compression_opts.parallel_threads =
FLAGS_compression_parallel_threads;
options_.compression_opts.max_dict_buffer_bytes =
FLAGS_compression_max_dict_buffer_bytes;
options_.create_if_missing = true;
options_.max_manifest_file_size = FLAGS_max_manifest_file_size;
options_.inplace_update_support = FLAGS_in_place_update;

@ -143,6 +143,27 @@ struct CompressionOptions {
// Default: false.
bool enabled;
// Limit on data buffering when gathering samples to build a dictionary. Zero
// means no limit. When dictionary is disabled (`max_dict_bytes == 0`),
// enabling this limit (`max_dict_buffer_bytes != 0`) has no effect.
//
// In compaction, the buffering is limited to the target file size (see
// `target_file_size_base` and `target_file_size_multiplier`) even if this
// setting permits more buffering. Since we cannot determine where the file
// should be cut until data blocks are compressed with dictionary, buffering
// more than the target file size could lead to selecting samples that belong
// to a later output SST.
//
// Limiting too strictly may harm dictionary effectiveness since it forces
// RocksDB to pick samples from the initial portion of the output SST, which
// may not be representative of the whole file. Configuring this limit below
// `zstd_max_train_bytes` (when enabled) can restrict how many samples we can
// pass to the dictionary trainer. Configuring it below `max_dict_bytes` can
// restrict the size of the final dictionary.
//
// Default: 0 (unlimited)
uint64_t max_dict_buffer_bytes;
CompressionOptions()
: window_bits(-14),
level(kDefaultCompressionLevel),
@ -150,17 +171,19 @@ struct CompressionOptions {
max_dict_bytes(0),
zstd_max_train_bytes(0),
parallel_threads(1),
enabled(false) {}
enabled(false),
max_dict_buffer_bytes(0) {}
CompressionOptions(int wbits, int _lev, int _strategy, int _max_dict_bytes,
int _zstd_max_train_bytes, int _parallel_threads,
bool _enabled)
bool _enabled, uint64_t _max_dict_buffer_bytes)
: window_bits(wbits),
level(_lev),
strategy(_strategy),
max_dict_bytes(_max_dict_bytes),
zstd_max_train_bytes(_zstd_max_train_bytes),
parallel_threads(_parallel_threads),
enabled(_enabled) {}
enabled(_enabled),
max_dict_buffer_bytes(_max_dict_buffer_bytes) {}
};
enum UpdateStatus { // Return status For inplace update callback

@ -998,11 +998,17 @@ extern ROCKSDB_LIBRARY_API void
rocksdb_options_set_compression_options_zstd_max_train_bytes(rocksdb_options_t*,
int);
extern ROCKSDB_LIBRARY_API void
rocksdb_options_set_compression_options_max_dict_buffer_bytes(
rocksdb_options_t*, uint64_t);
extern ROCKSDB_LIBRARY_API void
rocksdb_options_set_bottommost_compression_options(rocksdb_options_t*, int, int,
int, int, unsigned char);
extern ROCKSDB_LIBRARY_API void
rocksdb_options_set_bottommost_compression_options_zstd_max_train_bytes(
rocksdb_options_t*, int, unsigned char);
extern ROCKSDB_LIBRARY_API void
rocksdb_options_set_bottommost_compression_options_max_dict_buffer_bytes(
rocksdb_options_t*, uint64_t, unsigned char);
extern ROCKSDB_LIBRARY_API void rocksdb_options_set_prefix_extractor(
rocksdb_options_t*, rocksdb_slicetransform_t*);
extern ROCKSDB_LIBRARY_API void rocksdb_options_set_num_levels(

@ -132,6 +132,27 @@ jint Java_org_rocksdb_CompressionOptions_zstdMaxTrainBytes(
return static_cast<jint>(opt->zstd_max_train_bytes);
}
/*
* Class: org_rocksdb_CompressionOptions
* Method: setMaxDictBufferBytes
* Signature: (JJ)V
*/
void Java_org_rocksdb_CompressionOptions_setMaxDictBufferBytes(
JNIEnv*, jobject, jlong jhandle, jlong jmax_dict_buffer_bytes) {
auto* opt = reinterpret_cast<ROCKSDB_NAMESPACE::CompressionOptions*>(jhandle);
opt->max_dict_buffer_bytes = static_cast<uint64_t>(jmax_dict_buffer_bytes);
}
/*
* Class: org_rocksdb_CompressionOptions
* Method: maxDictBufferBytes
* Signature: (J)J
*/
jlong Java_org_rocksdb_CompressionOptions_maxDictBufferBytes(JNIEnv*, jobject,
jlong jhandle) {
auto* opt = reinterpret_cast<ROCKSDB_NAMESPACE::CompressionOptions*>(jhandle);
return static_cast<jlong>(opt->max_dict_buffer_bytes);
}
/*
* Class: org_rocksdb_CompressionOptions
* Method: setEnabled

@ -51,83 +51,85 @@ int offset_of(T1 AdvancedColumnFamilyOptions::*member) {
static Status ParseCompressionOptions(const std::string& value,
const std::string& name,
CompressionOptions& compression_opts) {
size_t start = 0;
size_t end = value.find(':');
if (end == std::string::npos) {
const char kDelimiter = ':';
std::istringstream field_stream(value);
std::string field;
if (!std::getline(field_stream, field, kDelimiter)) {
return Status::InvalidArgument("unable to parse the specified CF option " +
name);
}
compression_opts.window_bits = ParseInt(value.substr(start, end - start));
start = end + 1;
end = value.find(':', start);
if (end == std::string::npos) {
compression_opts.window_bits = ParseInt(field);
if (!std::getline(field_stream, field, kDelimiter)) {
return Status::InvalidArgument("unable to parse the specified CF option " +
name);
}
compression_opts.level = ParseInt(value.substr(start, end - start));
start = end + 1;
if (start >= value.size()) {
compression_opts.level = ParseInt(field);
if (!std::getline(field_stream, field, kDelimiter)) {
return Status::InvalidArgument("unable to parse the specified CF option " +
name);
}
end = value.find(':', start);
compression_opts.strategy =
ParseInt(value.substr(start, value.size() - start));
compression_opts.strategy = ParseInt(field);
// max_dict_bytes is optional for backwards compatibility
if (end != std::string::npos) {
start = end + 1;
if (start >= value.size()) {
if (!field_stream.eof()) {
if (!std::getline(field_stream, field, kDelimiter)) {
return Status::InvalidArgument(
"unable to parse the specified CF option " + name);
}
compression_opts.max_dict_bytes =
ParseInt(value.substr(start, value.size() - start));
end = value.find(':', start);
compression_opts.max_dict_bytes = ParseInt(field);
}
// zstd_max_train_bytes is optional for backwards compatibility
if (end != std::string::npos) {
start = end + 1;
if (start >= value.size()) {
if (!field_stream.eof()) {
if (!std::getline(field_stream, field, kDelimiter)) {
return Status::InvalidArgument(
"unable to parse the specified CF option " + name);
}
compression_opts.zstd_max_train_bytes =
ParseInt(value.substr(start, value.size() - start));
end = value.find(':', start);
compression_opts.zstd_max_train_bytes = ParseInt(field);
}
// parallel_threads is optional for backwards compatibility
if (end != std::string::npos) {
start = end + 1;
if (start >= value.size()) {
if (!field_stream.eof()) {
if (!std::getline(field_stream, field, kDelimiter)) {
return Status::InvalidArgument(
"unable to parse the specified CF option " + name);
}
// Since parallel_threads comes before enabled but was added optionally
// later, we need to check if this is the final token (meaning it is the
// enabled bit), or if there is another token (meaning this one is
// parallel_threads)
end = value.find(':', start);
if (end != std::string::npos) {
compression_opts.parallel_threads =
ParseInt(value.substr(start, value.size() - start));
// enabled bit), or if there are more tokens (meaning this one is
// parallel_threads).
if (!field_stream.eof()) {
compression_opts.parallel_threads = ParseInt(field);
} else {
// parallel_threads is not serialized with this format, but enabled is
compression_opts.parallel_threads = CompressionOptions().parallel_threads;
compression_opts.enabled =
ParseBoolean("", value.substr(start, value.size() - start));
compression_opts.enabled = ParseBoolean("", field);
}
}
// enabled is optional for backwards compatibility
if (end != std::string::npos) {
start = end + 1;
if (start >= value.size()) {
if (!field_stream.eof()) {
if (!std::getline(field_stream, field, kDelimiter)) {
return Status::InvalidArgument(
"unable to parse the specified CF option " + name);
}
compression_opts.enabled = ParseBoolean("", field);
}
// max_dict_buffer_bytes is optional for backwards compatibility
if (!field_stream.eof()) {
if (!std::getline(field_stream, field, kDelimiter)) {
return Status::InvalidArgument(
"unable to parse the specified CF option " + name);
}
compression_opts.enabled =
ParseBoolean("", value.substr(start, value.size() - start));
compression_opts.max_dict_buffer_bytes = ParseUint64(field);
}
if (!field_stream.eof()) {
return Status::InvalidArgument("unable to parse the specified CF option " +
name);
}
return Status::OK();
}
@ -161,6 +163,10 @@ static std::unordered_map<std::string, OptionTypeInfo>
{"enabled",
{offsetof(struct CompressionOptions, enabled), OptionType::kBoolean,
OptionVerificationType::kNormal, OptionTypeFlags::kMutable}},
{"max_dict_buffer_bytes",
{offsetof(struct CompressionOptions, max_dict_buffer_bytes),
OptionType::kUInt64T, OptionVerificationType::kNormal,
OptionTypeFlags::kMutable}},
};
static std::unordered_map<std::string, OptionTypeInfo>

@ -201,6 +201,11 @@ void ColumnFamilyOptions::Dump(Logger* log) const {
ROCKS_LOG_HEADER(
log, " Options.bottommost_compression_opts.enabled: %s",
bottommost_compression_opts.enabled ? "true" : "false");
ROCKS_LOG_HEADER(
log,
" Options.bottommost_compression_opts.max_dict_buffer_bytes: "
"%" PRIu64,
bottommost_compression_opts.max_dict_buffer_bytes);
ROCKS_LOG_HEADER(log, " Options.compression_opts.window_bits: %d",
compression_opts.window_bits);
ROCKS_LOG_HEADER(log, " Options.compression_opts.level: %d",
@ -222,6 +227,10 @@ void ColumnFamilyOptions::Dump(Logger* log) const {
ROCKS_LOG_HEADER(log,
" Options.compression_opts.enabled: %s",
compression_opts.enabled ? "true" : "false");
ROCKS_LOG_HEADER(log,
" Options.compression_opts.max_dict_buffer_bytes: "
"%" PRIu64,
compression_opts.max_dict_buffer_bytes);
ROCKS_LOG_HEADER(log, " Options.level0_file_num_compaction_trigger: %d",
level0_file_num_compaction_trigger);
ROCKS_LOG_HEADER(log, " Options.level0_slowdown_writes_trigger: %d",

@ -407,14 +407,14 @@ TEST_F(OptionsSettableTest, ColumnFamilyOptionsAllFieldsSettable) {
// Count padding bytes by setting all bytes in the memory to a special char,
// copy a well constructed struct to this memory and see how many special
// bytes left.
ColumnFamilyOptions* options = new (options_ptr) ColumnFamilyOptions();
FillWithSpecialChar(options_ptr, sizeof(ColumnFamilyOptions),
kColumnFamilyOptionsExcluded);
// It based on the behavior of compiler that padding bytes are not changed
// when copying the struct. It's prone to failure when compiler behavior
// changes. We verify there is unset bytes to detect the case.
*options = ColumnFamilyOptions();
// Invoke a user-defined constructor in the hope that it does not overwrite
// padding bytes. Note that previously we relied on the implicitly-defined
// copy-assignment operator (i.e., `*options = ColumnFamilyOptions();`) here,
// which did in fact modify padding bytes.
ColumnFamilyOptions* options = new (options_ptr) ColumnFamilyOptions();
// Deprecatd option which is not initialized. Need to set it to avoid
// Valgrind error
@ -472,8 +472,8 @@ TEST_F(OptionsSettableTest, ColumnFamilyOptionsAllFieldsSettable) {
"max_bytes_for_level_multiplier=60;"
"memtable_factory=SkipListFactory;"
"compression=kNoCompression;"
"compression_opts=5:6:7:8:9:true;"
"bottommost_compression_opts=4:5:6:7:8:true;"
"compression_opts=5:6:7:8:9:10:true:11;"
"bottommost_compression_opts=4:5:6:7:8:9:true:10;"
"bottommost_compression=kDisableCompressionOption;"
"level0_stop_writes_trigger=33;"
"num_levels=99;"

@ -725,12 +725,18 @@ TEST_F(OptionsTest, CompressionOptionsFromString) {
ASSERT_OK(GetColumnFamilyOptionsFromString(
ignore, ColumnFamilyOptions(), "compression_opts=5:6:7:8:9:x:false",
&base_cf_opt));
ASSERT_NOK(GetColumnFamilyOptionsFromString(
ASSERT_OK(GetColumnFamilyOptionsFromString(
config_options, ColumnFamilyOptions(),
"compression_opts=1:2:3:4:5:6:true:8", &base_cf_opt));
ASSERT_OK(GetColumnFamilyOptionsFromString(
ignore, ColumnFamilyOptions(), "compression_opts=1:2:3:4:5:6:true:8",
&base_cf_opt));
ASSERT_NOK(GetColumnFamilyOptionsFromString(
config_options, ColumnFamilyOptions(),
"compression_opts=1:2:3:4:5:6:true:8:9", &base_cf_opt));
ASSERT_OK(GetColumnFamilyOptionsFromString(
ignore, ColumnFamilyOptions(), "compression_opts=1:2:3:4:5:6:true:8:9",
&base_cf_opt));
ASSERT_NOK(GetColumnFamilyOptionsFromString(
config_options, ColumnFamilyOptions(), "compression_opts={unknown=bad;}",
&base_cf_opt));

@ -11,24 +11,25 @@
#include <assert.h>
#include <stdio.h>
#include <atomic>
#include <list>
#include <map>
#include <memory>
#include <numeric>
#include <string>
#include <unordered_map>
#include <utility>
#include "db/dbformat.h"
#include "index_builder.h"
#include "memory/memory_allocator.h"
#include "rocksdb/cache.h"
#include "rocksdb/comparator.h"
#include "rocksdb/env.h"
#include "rocksdb/flush_block_policy.h"
#include "rocksdb/merge_operator.h"
#include "rocksdb/table.h"
#include "table/block_based/block.h"
#include "table/block_based/block_based_filter_block.h"
#include "table/block_based/block_based_table_factory.h"
@ -40,8 +41,6 @@
#include "table/block_based/partitioned_filter_block.h"
#include "table/format.h"
#include "table/table_builder.h"
#include "memory/memory_allocator.h"
#include "util/coding.h"
#include "util/compression.h"
#include "util/crc32c.h"
@ -306,6 +305,10 @@ struct BlockBasedTableBuilder::Rep {
kClosed,
};
State state;
// `kBuffered` state is allowed only as long as the buffering of uncompressed
// data blocks (see `data_block_and_keys_buffers`) does not exceed
// `buffer_limit`.
uint64_t buffer_limit;
const bool use_delta_encoding_for_index_values;
std::unique_ptr<FilterBlockBuilder> filter_builder;
@ -321,7 +324,6 @@ struct BlockBasedTableBuilder::Rep {
const std::string& column_family_name;
uint64_t creation_time = 0;
uint64_t oldest_key_time = 0;
const uint64_t target_file_size;
uint64_t file_creation_time = 0;
// DB IDs
@ -407,7 +409,7 @@ struct BlockBasedTableBuilder::Rep {
const CompressionOptions& _compression_opts, const bool skip_filters,
const int _level_at_creation, const std::string& _column_family_name,
const uint64_t _creation_time, const uint64_t _oldest_key_time,
const uint64_t _target_file_size, const uint64_t _file_creation_time,
const uint64_t target_file_size, const uint64_t _file_creation_time,
const std::string& _db_id, const std::string& _db_session_id)
: ioptions(_ioptions),
moptions(_moptions),
@ -448,13 +450,20 @@ struct BlockBasedTableBuilder::Rep {
column_family_name(_column_family_name),
creation_time(_creation_time),
oldest_key_time(_oldest_key_time),
target_file_size(_target_file_size),
file_creation_time(_file_creation_time),
db_id(_db_id),
db_session_id(_db_session_id),
db_host_id(ioptions.db_host_id),
status_ok(true),
io_status_ok(true) {
if (target_file_size == 0) {
buffer_limit = compression_opts.max_dict_buffer_bytes;
} else if (compression_opts.max_dict_buffer_bytes == 0) {
buffer_limit = target_file_size;
} else {
buffer_limit =
std::min(target_file_size, compression_opts.max_dict_buffer_bytes);
}
for (uint32_t i = 0; i < compression_opts.parallel_threads; i++) {
compression_ctxs[i].reset(new CompressionContext(compression_type));
}
@ -896,8 +905,8 @@ void BlockBasedTableBuilder::Add(const Slice& key, const Slice& value) {
r->first_key_in_next_block = &key;
Flush();
if (r->state == Rep::State::kBuffered && r->target_file_size != 0 &&
r->data_begin_offset > r->target_file_size) {
if (r->state == Rep::State::kBuffered && r->buffer_limit != 0 &&
r->data_begin_offset > r->buffer_limit) {
EnterUnbuffered();
}
@ -997,23 +1006,28 @@ void BlockBasedTableBuilder::Flush() {
void BlockBasedTableBuilder::WriteBlock(BlockBuilder* block,
BlockHandle* handle,
bool is_data_block) {
WriteBlock(block->Finish(), handle, is_data_block);
block->Reset();
block->Finish();
std::string raw_block_contents;
block->SwapAndReset(raw_block_contents);
if (rep_->state == Rep::State::kBuffered) {
assert(is_data_block);
assert(!rep_->data_block_and_keys_buffers.empty());
rep_->data_block_and_keys_buffers.back().first =
std::move(raw_block_contents);
rep_->data_begin_offset +=
rep_->data_block_and_keys_buffers.back().first.size();
return;
}
WriteBlock(raw_block_contents, handle, is_data_block);
}
void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents,
BlockHandle* handle,
bool is_data_block) {
Rep* r = rep_;
assert(r->state == Rep::State::kUnbuffered);
Slice block_contents;
CompressionType type;
if (r->state == Rep::State::kBuffered) {
assert(is_data_block);
assert(!r->data_block_and_keys_buffers.empty());
r->data_block_and_keys_buffers.back().first = raw_block_contents.ToString();
r->data_begin_offset += r->data_block_and_keys_buffers.back().first.size();
return;
}
Status compress_status;
CompressAndVerifyBlock(raw_block_contents, is_data_block,
*(r->compression_ctxs[0]), r->verify_ctxs[0].get(),
@ -1630,14 +1644,38 @@ void BlockBasedTableBuilder::EnterUnbuffered() {
const size_t kSampleBytes = r->compression_opts.zstd_max_train_bytes > 0
? r->compression_opts.zstd_max_train_bytes
: r->compression_opts.max_dict_bytes;
// If buffer size is reasonable, we pre-generate a permutation to enforce
// uniqueness. This prevents wasting samples on duplicates, which is
// particularly likely when not many blocks were buffered.
std::vector<uint16_t> data_block_order;
size_t data_block_order_idx = 0;
if (r->data_block_and_keys_buffers.size() <= ((1 << 16) - 1)) {
data_block_order.resize(r->data_block_and_keys_buffers.size());
std::iota(data_block_order.begin(), data_block_order.end(),
static_cast<uint16_t>(0));
// We could be smarter and interleave the shuffling and sample appending
// logic. Then we could terminate as soon as `kSampleBytes` is reached,
// saving some shuffling computation.
RandomShuffle(data_block_order.begin(), data_block_order.end(),
static_cast<uint32_t>(r->creation_time));
}
Random64 generator{r->creation_time};
std::string compression_dict_samples;
std::vector<size_t> compression_dict_sample_lens;
if (!r->data_block_and_keys_buffers.empty()) {
while (compression_dict_samples.size() < kSampleBytes) {
size_t rand_idx =
static_cast<size_t>(
generator.Uniform(r->data_block_and_keys_buffers.size()));
while ((data_block_order.empty() ||
data_block_order_idx < data_block_order.size()) &&
compression_dict_samples.size() < kSampleBytes) {
size_t rand_idx;
if (data_block_order.empty()) {
rand_idx = static_cast<size_t>(
generator.Uniform(r->data_block_and_keys_buffers.size()));
} else {
rand_idx = data_block_order[data_block_order_idx];
++data_block_order_idx;
}
size_t copy_len =
std::min(kSampleBytes - compression_dict_samples.size(),
r->data_block_and_keys_buffers[rand_idx].first.size());

@ -117,8 +117,9 @@ class BlockBasedTableBuilder : public TableBuilder {
// REQUIRES: `rep_->state == kBuffered`
void EnterUnbuffered();
// Call block's Finish() method
// and then write the compressed block contents to file.
// Call block's Finish() method and then
// - in buffered mode, buffer the uncompressed block contents.
// - in unbuffered mode, write the compressed block contents to file.
void WriteBlock(BlockBuilder* block, BlockHandle* handle, bool is_data_block);
// Compress and write block content to the file.

@ -234,7 +234,8 @@ Status SstFileDumper::ShowAllCompressionSizes(
const std::vector<std::pair<CompressionType, const char*>>&
compression_types,
int32_t compress_level_from, int32_t compress_level_to,
uint32_t max_dict_bytes, uint32_t zstd_max_train_bytes) {
uint32_t max_dict_bytes, uint32_t zstd_max_train_bytes,
uint64_t max_dict_buffer_bytes) {
fprintf(stdout, "Block Size: %" ROCKSDB_PRIszt "\n", block_size);
for (auto& i : compression_types) {
if (CompressionTypeSupported(i.first)) {
@ -242,6 +243,7 @@ Status SstFileDumper::ShowAllCompressionSizes(
CompressionOptions compress_opt;
compress_opt.max_dict_bytes = max_dict_bytes;
compress_opt.zstd_max_train_bytes = zstd_max_train_bytes;
compress_opt.max_dict_buffer_bytes = max_dict_buffer_bytes;
for (int32_t j = compress_level_from; j <= compress_level_to; j++) {
fprintf(stdout, "Compression level: %d", j);
compress_opt.level = j;

@ -40,7 +40,8 @@ class SstFileDumper {
const std::vector<std::pair<CompressionType, const char*>>&
compression_types,
int32_t compress_level_from, int32_t compress_level_to,
uint32_t max_dict_bytes, uint32_t zstd_max_train_bytes);
uint32_t max_dict_bytes, uint32_t zstd_max_train_bytes,
uint64_t max_dict_buffer_bytes);
Status ShowCompressionSize(size_t block_size, CompressionType compress_type,
const CompressionOptions& compress_opt);

@ -989,6 +989,10 @@ DEFINE_int32(min_level_to_compress, -1, "If non-negative, compression starts"
DEFINE_int32(compression_parallel_threads, 1,
"Number of threads for parallel compression.");
DEFINE_uint64(compression_max_dict_buffer_bytes,
ROCKSDB_NAMESPACE::CompressionOptions().max_dict_buffer_bytes,
"Maximum bytes to buffer to collect samples for dictionary.");
static bool ValidateTableCacheNumshardbits(const char* flagname,
int32_t value) {
if (0 >= value || value > 20) {
@ -4136,6 +4140,8 @@ class Benchmark {
FLAGS_compression_zstd_max_train_bytes;
options.compression_opts.parallel_threads =
FLAGS_compression_parallel_threads;
options.compression_opts.max_dict_buffer_bytes =
FLAGS_compression_max_dict_buffer_bytes;
// If this is a block based table, set some related options
auto table_options =
options.table_factory->GetOptions<BlockBasedTableOptions>();

@ -51,6 +51,7 @@ default_params = {
# Disabled compression_parallel_threads as the feature is not stable
# lambda: random.choice([1] * 9 + [4])
"compression_parallel_threads": 1,
"compression_max_dict_buffer_bytes": lambda: (1 << random.randint(0, 40)) - 1,
"clear_column_family_one_in": 0,
"compact_files_one_in": 1000000,
"compact_range_one_in": 1000000,
@ -284,8 +285,10 @@ blob_params = {
def finalize_and_sanitize(src_params):
dest_params = dict([(k, v() if callable(v) else v)
for (k, v) in src_params.items()])
if dest_params.get("compression_type") != "zstd" or \
dest_params.get("compression_max_dict_bytes") == 0:
if dest_params.get("compression_max_dict_bytes") == 0:
dest_params["compression_zstd_max_train_bytes"] = 0
dest_params["compression_max_dict_buffer_bytes"] = 0
if dest_params.get("compression_type") != "zstd":
dest_params["compression_zstd_max_train_bytes"] = 0
if dest_params.get("allow_concurrent_memtable_write", 1) == 1:
dest_params["memtablerep"] = "skip_list"

@ -103,6 +103,9 @@ void print_help(bool to_stderr) {
--compression_zstd_max_train_bytes=<uint32_t>
Maximum size of training data passed to zstd's dictionary trainer
--compression_max_dict_buffer_bytes=<int64_t>
Limit on buffer size from which we collect samples for dictionary generation.
)");
}
@ -166,6 +169,8 @@ int SSTDumpTool::Run(int argc, char const* const* argv, Options options) {
ROCKSDB_NAMESPACE::CompressionOptions().max_dict_bytes;
uint32_t compression_zstd_max_train_bytes =
ROCKSDB_NAMESPACE::CompressionOptions().zstd_max_train_bytes;
uint64_t compression_max_dict_buffer_bytes =
ROCKSDB_NAMESPACE::CompressionOptions().max_dict_buffer_bytes;
int64_t tmp_val;
@ -276,6 +281,17 @@ int SSTDumpTool::Run(int argc, char const* const* argv, Options options) {
return 1;
}
compression_zstd_max_train_bytes = static_cast<uint32_t>(tmp_val);
} else if (ParseIntArg(argv[i], "--compression_max_dict_buffer_bytes=",
"compression_max_dict_buffer_bytes must be numeric",
&tmp_val)) {
if (tmp_val < 0) {
fprintf(stderr,
"compression_max_dict_buffer_bytes must be positive: '%s'\n",
argv[i]);
print_help(/*to_stderr*/ true);
return 1;
}
compression_max_dict_buffer_bytes = static_cast<uint64_t>(tmp_val);
} else if (strcmp(argv[i], "--help") == 0) {
print_help(/*to_stderr*/ false);
return 0;
@ -403,7 +419,7 @@ int SSTDumpTool::Run(int argc, char const* const* argv, Options options) {
set_block_size ? block_size : 16384,
compression_types.empty() ? kCompressions : compression_types,
compress_level_from, compress_level_to, compression_max_dict_bytes,
compression_zstd_max_train_bytes);
compression_zstd_max_train_bytes, compression_max_dict_buffer_bytes);
if (!st.ok()) {
fprintf(stderr, "Failed to recompress: %s\n", st.ToString().c_str());
exit(1);

@ -627,6 +627,9 @@ inline std::string CompressionOptionsToString(
result.append("enabled=")
.append(ToString(compression_options.enabled))
.append("; ");
result.append("max_dict_buffer_bytes=")
.append(ToString(compression_options.max_dict_buffer_bytes))
.append("; ");
return result;
}

Loading…
Cancel
Save