Add bottommost_compression_opts to for bottommost_compression (#3985)

Summary:
…ression

 For `CompressionType` we have options `compression` and `bottommost_compression`. Thus, to make the compression options consitent with the compression type when bottommost_compression is enabled, we add the bottommost_compression_opts
Closes https://github.com/facebook/rocksdb/pull/3985

Reviewed By: riversand963

Differential Revision: D8385911

Pulled By: zhichao-cao

fbshipit-source-id: 07bc533dd61bcf1cef5927d8d62901c13d38d5fc
main
Zhichao Cao 6 years ago committed by Facebook Github Bot
parent 235ab9dd32
commit 1f6efabe23
  1. 1
      HISTORY.md
  2. 12
      db/c.cc
  3. 2
      db/compaction.cc
  4. 8
      db/compaction.h
  5. 14
      db/compaction_job.cc
  6. 3
      db/compaction_job_test.cc
  7. 53
      db/compaction_picker.cc
  8. 5
      db/compaction_picker.h
  9. 14
      db/compaction_picker_universal.cc
  10. 19
      include/rocksdb/advanced_options.h
  11. 5
      include/rocksdb/options.h
  12. 24
      java/rocksjni/compression_options.cc
  13. 29
      java/rocksjni/options.cc
  14. 1
      options/cf_options.cc
  15. 2
      options/cf_options.h
  16. 25
      options/options.cc
  17. 113
      options/options_helper.cc
  18. 1
      options/options_settable_test.cc
  19. 21
      options/options_test.cc
  20. 14
      table/sst_file_writer.cc

@ -3,6 +3,7 @@
### Public API Change
* For users of `Statistics` objects created via `CreateDBStatistics()`, the format of the string returned by its `ToString()` method has changed.
* With LRUCache, when high_pri_pool_ratio > 0, midpoint insertion strategy will be enabled to put low-pri items to the tail of low-pri list (the midpoint) when they first inserted into the cache. This is to make cache entries never get hit age out faster, improving cache efficiency when large background scan presents.
* For bottommost_compression, a compatible CompressionOptions is added via `bottommost_compression_opts`. To keep backward compatible, a new boolean `enabled` is added to CompressionOptions. For compression_opts, it will be always used no matter what value of `enabled` is. For bottommost_compression_opts, it will only be used when user set `enabled=true`, otherwise, compression_opts will be used for bottommost_compression as default.
* The "rocksdb.num.entries" table property no longer counts range deletion tombstones as entries.
### New Features

@ -2268,6 +2268,18 @@ void rocksdb_options_set_compression_per_level(rocksdb_options_t* opt,
}
}
void rocksdb_options_set_bottommost_compression_options(rocksdb_options_t* opt,
int w_bits, int level,
int strategy,
int max_dict_bytes,
bool enabled) {
opt->rep.bottommost_compression_opts.window_bits = w_bits;
opt->rep.bottommost_compression_opts.level = level;
opt->rep.bottommost_compression_opts.strategy = strategy;
opt->rep.bottommost_compression_opts.max_dict_bytes = max_dict_bytes;
opt->rep.bottommost_compression_opts.enabled = enabled;
}
void rocksdb_options_set_compression_options(rocksdb_options_t* opt, int w_bits,
int level, int strategy,
int max_dict_bytes) {

@ -134,6 +134,7 @@ Compaction::Compaction(VersionStorageInfo* vstorage,
int _output_level, uint64_t _target_file_size,
uint64_t _max_compaction_bytes, uint32_t _output_path_id,
CompressionType _compression,
CompressionOptions _compression_opts,
uint32_t _max_subcompactions,
std::vector<FileMetaData*> _grandparents,
bool _manual_compaction, double _score,
@ -152,6 +153,7 @@ Compaction::Compaction(VersionStorageInfo* vstorage,
cfd_(nullptr),
output_path_id_(_output_path_id),
output_compression_(_compression),
output_compression_opts_(_compression_opts),
deletion_compaction_(_deletion_compaction),
inputs_(std::move(_inputs)),
grandparents_(std::move(_grandparents)),

@ -40,7 +40,7 @@ class Compaction {
std::vector<CompactionInputFiles> inputs, int output_level,
uint64_t target_file_size, uint64_t max_compaction_bytes,
uint32_t output_path_id, CompressionType compression,
uint32_t max_subcompactions,
CompressionOptions compression_opts, uint32_t max_subcompactions,
std::vector<FileMetaData*> grandparents,
bool manual_compaction = false, double score = -1,
bool deletion_compaction = false,
@ -119,6 +119,11 @@ class Compaction {
// What compression for output
CompressionType output_compression() const { return output_compression_; }
// What compression options for output
CompressionOptions output_compression_opts() const {
return output_compression_opts_;
}
// Whether need to write output file to second DB path.
uint32_t output_path_id() const { return output_path_id_; }
@ -283,6 +288,7 @@ class Compaction {
const uint32_t output_path_id_;
CompressionType output_compression_;
CompressionOptions output_compression_opts_;
// If true, then the comaction can be done by simply deleting input files.
const bool deletion_compaction_;

@ -823,10 +823,13 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
// zstd's dictionary trainer, or just use them directly. Then, the dictionary
// is used for compressing subsequent output files in the same subcompaction.
const bool kUseZstdTrainer =
cfd->ioptions()->compression_opts.zstd_max_train_bytes > 0;
sub_compact->compaction->output_compression_opts().zstd_max_train_bytes >
0;
const size_t kSampleBytes =
kUseZstdTrainer ? cfd->ioptions()->compression_opts.zstd_max_train_bytes
: cfd->ioptions()->compression_opts.max_dict_bytes;
kUseZstdTrainer
? sub_compact->compaction->output_compression_opts()
.zstd_max_train_bytes
: sub_compact->compaction->output_compression_opts().max_dict_bytes;
const int kSampleLenShift = 6; // 2^6 = 64-byte samples
std::set<size_t> sample_begin_offsets;
if (bottommost_level_ && kSampleBytes > 0) {
@ -1029,7 +1032,8 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
if (kUseZstdTrainer) {
sub_compact->compression_dict = ZSTD_TrainDictionary(
dict_sample_data, kSampleLenShift,
cfd->ioptions()->compression_opts.max_dict_bytes);
sub_compact->compaction->output_compression_opts()
.max_dict_bytes);
} else {
sub_compact->compression_dict = std::move(dict_sample_data);
}
@ -1431,7 +1435,7 @@ Status CompactionJob::OpenCompactionOutputFile(
cfd->internal_comparator(), cfd->int_tbl_prop_collector_factories(),
cfd->GetID(), cfd->GetName(), sub_compact->outfile.get(),
sub_compact->compaction->output_compression(),
cfd->ioptions()->compression_opts,
sub_compact->compaction->output_compression_opts(),
sub_compact->compaction->output_level(), &sub_compact->compression_dict,
skip_filters, output_file_creation_time));
LogFlush(db_options_.info_log);

@ -246,7 +246,8 @@ class CompactionJobTest : public testing::Test {
Compaction compaction(cfd->current()->storage_info(), *cfd->ioptions(),
*cfd->GetLatestMutableCFOptions(),
compaction_input_files, 1, 1024 * 1024,
10 * 1024 * 1024, 0, kNoCompression, 0, {}, true);
10 * 1024 * 1024, 0, kNoCompression,
cfd->ioptions()->compression_opts, 0, {}, true);
compaction.SetInputVersion(cfd->current());
LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, db_options_.info_log.get());

@ -110,6 +110,24 @@ CompressionType GetCompressionType(const ImmutableCFOptions& ioptions,
}
}
CompressionOptions GetCompressionOptions(const ImmutableCFOptions& ioptions,
const VersionStorageInfo* vstorage,
int level,
const bool enable_compression) {
if (!enable_compression) {
return ioptions.compression_opts;
}
// If bottommost_compression is set and we are compacting to the
// bottommost level then we should use the specified compression options
// for the bottmomost_compression.
if (ioptions.bottommost_compression != kDisableCompressionOption &&
level >= (vstorage->num_non_empty_levels() - 1) &&
ioptions.bottommost_compression_opts.enabled) {
return ioptions.bottommost_compression_opts;
}
return ioptions.compression_opts;
}
CompactionPicker::CompactionPicker(const ImmutableCFOptions& ioptions,
const InternalKeyComparator* icmp)
: ioptions_(ioptions), icmp_(icmp) {}
@ -301,7 +319,7 @@ Compaction* CompactionPicker::CompactFiles(
new Compaction(vstorage, ioptions_, mutable_cf_options, input_files,
output_level, compact_options.output_file_size_limit,
mutable_cf_options.max_compaction_bytes, output_path_id,
compact_options.compression,
compact_options.compression, ioptions_.compression_opts,
compact_options.max_subcompactions,
/* grandparents */ {}, true);
RegisterCompaction(c);
@ -569,11 +587,13 @@ Compaction* CompactionPicker::CompactRange(
Compaction* c = new Compaction(
vstorage, ioptions_, mutable_cf_options, std::move(inputs),
output_level, MaxFileSizeForLevel(mutable_cf_options, output_level,
ioptions_.compaction_style),
output_level,
MaxFileSizeForLevel(mutable_cf_options, output_level,
ioptions_.compaction_style),
/* max_compaction_bytes */ LLONG_MAX, output_path_id,
GetCompressionType(ioptions_, vstorage, mutable_cf_options,
output_level, 1),
GetCompressionOptions(ioptions_, vstorage, output_level),
max_subcompactions, /* grandparents */ {}, /* is manual */ true);
RegisterCompaction(c);
return c;
@ -679,11 +699,12 @@ Compaction* CompactionPicker::CompactRange(
vstorage, ioptions_, mutable_cf_options, std::move(compaction_inputs),
output_level,
MaxFileSizeForLevel(mutable_cf_options, output_level,
ioptions_.compaction_style, vstorage->base_level(),
ioptions_.level_compaction_dynamic_level_bytes),
ioptions_.compaction_style, vstorage->base_level(),
ioptions_.level_compaction_dynamic_level_bytes),
mutable_cf_options.max_compaction_bytes, output_path_id,
GetCompressionType(ioptions_, vstorage, mutable_cf_options, output_level,
vstorage->base_level()),
GetCompressionOptions(ioptions_, vstorage, output_level),
/* max_subcompactions */ 0, std::move(grandparents),
/* is manual compaction */ true);
@ -1329,12 +1350,13 @@ Compaction* LevelCompactionBuilder::GetCompaction() {
vstorage_, ioptions_, mutable_cf_options_, std::move(compaction_inputs_),
output_level_,
MaxFileSizeForLevel(mutable_cf_options_, output_level_,
ioptions_.compaction_style, vstorage_->base_level(),
ioptions_.level_compaction_dynamic_level_bytes),
ioptions_.compaction_style, vstorage_->base_level(),
ioptions_.level_compaction_dynamic_level_bytes),
mutable_cf_options_.max_compaction_bytes,
GetPathId(ioptions_, mutable_cf_options_, output_level_),
GetCompressionType(ioptions_, vstorage_, mutable_cf_options_,
output_level_, vstorage_->base_level()),
GetCompressionOptions(ioptions_, vstorage_, output_level_),
/* max_subcompactions */ 0, std::move(grandparents_), is_manual_,
start_level_score_, false /* deletion_compaction */, compaction_reason_);
@ -1584,9 +1606,9 @@ Compaction* FIFOCompactionPicker::PickTTLCompaction(
Compaction* c = new Compaction(
vstorage, ioptions_, mutable_cf_options, std::move(inputs), 0, 0, 0, 0,
kNoCompression, /* max_subcompactions */ 0, {}, /* is manual */ false,
vstorage->CompactionScore(0), /* is deletion compaction */ true,
CompactionReason::kFIFOTtl);
kNoCompression, ioptions_.compression_opts, /* max_subcompactions */ 0,
{}, /* is manual */ false, vstorage->CompactionScore(0),
/* is deletion compaction */ true, CompactionReason::kFIFOTtl);
return c;
}
@ -1624,8 +1646,9 @@ Compaction* FIFOCompactionPicker::PickSizeCompaction(
16 * 1024 * 1024 /* output file size limit */,
0 /* max compaction bytes, not applicable */,
0 /* output path ID */, mutable_cf_options.compression,
0 /* max_subcompactions */, {}, /* is manual */ false,
vstorage->CompactionScore(0), /* is deletion compaction */ false,
ioptions_.compression_opts, 0 /* max_subcompactions */, {},
/* is manual */ false, vstorage->CompactionScore(0),
/* is deletion compaction */ false,
CompactionReason::kFIFOReduceNumFiles);
return c;
}
@ -1671,9 +1694,9 @@ Compaction* FIFOCompactionPicker::PickSizeCompaction(
Compaction* c = new Compaction(
vstorage, ioptions_, mutable_cf_options, std::move(inputs), 0, 0, 0, 0,
kNoCompression, /* max_subcompactions */ 0, {}, /* is manual */ false,
vstorage->CompactionScore(0), /* is deletion compaction */ true,
CompactionReason::kFIFOMaxSize);
kNoCompression, ioptions_.compression_opts, /* max_subcompactions */ 0,
{}, /* is manual */ false, vstorage->CompactionScore(0),
/* is deletion compaction */ true, CompactionReason::kFIFOMaxSize);
return c;
}

@ -313,4 +313,9 @@ CompressionType GetCompressionType(const ImmutableCFOptions& ioptions,
int level, int base_level,
const bool enable_compression = true);
CompressionOptions GetCompressionOptions(const ImmutableCFOptions& ioptions,
const VersionStorageInfo* vstorage,
int level,
const bool enable_compression = true);
} // namespace rocksdb

@ -636,6 +636,8 @@ Compaction* UniversalCompactionPicker::PickCompactionToReduceSortedRuns(
LLONG_MAX, path_id,
GetCompressionType(ioptions_, vstorage, mutable_cf_options, start_level,
1, enable_compression),
GetCompressionOptions(ioptions_, vstorage, start_level,
enable_compression),
/* max_subcompactions */ 0, /* grandparents */ {}, /* is manual */ false,
score, false /* deletion_compaction */, compaction_reason);
}
@ -769,12 +771,13 @@ Compaction* UniversalCompactionPicker::PickCompactionToReduceSizeAmp(
}
return new Compaction(
vstorage, ioptions_, mutable_cf_options, std::move(inputs),
output_level, MaxFileSizeForLevel(mutable_cf_options, output_level,
kCompactionStyleUniversal),
vstorage, ioptions_, mutable_cf_options, std::move(inputs), output_level,
MaxFileSizeForLevel(mutable_cf_options, output_level,
kCompactionStyleUniversal),
/* max_grandparent_overlap_bytes */ LLONG_MAX, path_id,
GetCompressionType(ioptions_, vstorage, mutable_cf_options,
output_level, 1),
GetCompressionType(ioptions_, vstorage, mutable_cf_options, output_level,
1),
GetCompressionOptions(ioptions_, vstorage, output_level),
/* max_subcompactions */ 0, /* grandparents */ {}, /* is manual */ false,
score, false /* deletion_compaction */,
CompactionReason::kUniversalSizeAmplification);
@ -894,6 +897,7 @@ Compaction* UniversalCompactionPicker::PickDeleteTriggeredCompaction(
/* max_grandparent_overlap_bytes */ LLONG_MAX, path_id,
GetCompressionType(ioptions_, vstorage, mutable_cf_options, output_level,
1),
GetCompressionOptions(ioptions_, vstorage, output_level),
/* max_subcompactions */ 0, /* grandparents */ {}, /* is manual */ true,
score, false /* deletion_compaction */,
CompactionReason::kFilesMarkedForCompaction);

@ -126,19 +126,32 @@ struct CompressionOptions {
// Default: 0.
uint32_t zstd_max_train_bytes;
// When the compression options are set by the user, it will be set to "true".
// For bottommost_compression_opts, to enable it, user must set enabled=true.
// Otherwise, bottommost compression will use compression_opts as default
// compression options.
//
// For compression_opts, if compression_opts.enabled=false, it is still
// used as compression options for compression process.
//
// Default: false.
bool enabled;
CompressionOptions()
: window_bits(-14),
level(kDefaultCompressionLevel),
strategy(0),
max_dict_bytes(0),
zstd_max_train_bytes(0) {}
zstd_max_train_bytes(0),
enabled(false) {}
CompressionOptions(int wbits, int _lev, int _strategy, int _max_dict_bytes,
int _zstd_max_train_bytes)
int _zstd_max_train_bytes, bool _enabled)
: window_bits(wbits),
level(_lev),
strategy(_strategy),
max_dict_bytes(_max_dict_bytes),
zstd_max_train_bytes(_zstd_max_train_bytes) {}
zstd_max_train_bytes(_zstd_max_train_bytes),
enabled(_enabled) {}
};
enum UpdateStatus { // Return status For inplace update callback

@ -221,6 +221,11 @@ struct ColumnFamilyOptions : public AdvancedColumnFamilyOptions {
// Default: kDisableCompressionOption (Disabled)
CompressionType bottommost_compression = kDisableCompressionOption;
// different options for compression algorithms used by bottommost_compression
// if it is enabled. To enable it, please see the definition of
// CompressionOptions.
CompressionOptions bottommost_compression_opts;
// different options for compression algorithms
CompressionOptions compression_opts;

@ -121,6 +121,30 @@ jint Java_org_rocksdb_CompressionOptions_maxDictBytes(JNIEnv* /*env*/,
return static_cast<jint>(opt->max_dict_bytes);
}
/*
* Class: org_rocksdb_CompressionOptions
* Method: setEnabled
* Signature: (JI)V
*/
void Java_org_rocksdb_CompressionOptions_setEnabled(JNIEnv* /*env*/,
jobject /*jobj*/,
jlong jhandle,
jboolean jenabled) {
auto* opt = reinterpret_cast<rocksdb::CompressionOptions*>(jhandle);
opt->enabled = static_cast<int>(jenabled);
}
/*
* Class: org_rocksdb_CompressionOptions
* Method: Enabled
* Signature: (J)I
*/
jint Java_org_rocksdb_CompressionOptions_enabled(JNIEnv* /*env*/,
jobject /*jobj*/,
jlong jhandle) {
auto* opt = reinterpret_cast<rocksdb::CompressionOptions*>(jhandle);
return static_cast<jint>(opt->enabled);
}
/*
* Class: org_rocksdb_CompressionOptions
* Method: disposeInternal

@ -2081,6 +2081,21 @@ jbyte Java_org_rocksdb_Options_bottommostCompressionType(JNIEnv* /*env*/,
options->bottommost_compression);
}
/*
* Class: org_rocksdb_Options
* Method: setBottommostCompressionOptions
* Signature: (JJ)V
*/
void Java_org_rocksdb_Options_setBottommostCompressionOptions(
JNIEnv* /*env*/, jobject /*jobj*/, jlong jhandle,
jlong jbottommost_compression_options_handle) {
auto* options = reinterpret_cast<rocksdb::Options*>(jhandle);
auto* bottommost_compression_options =
reinterpret_cast<rocksdb::CompressionOptions*>(
jbottommost_compression_options_handle);
options->bottommost_compression_opts = *bottommost_compression_options;
}
/*
* Class: org_rocksdb_Options
* Method: setCompressionOptions
@ -3500,6 +3515,20 @@ jbyte Java_org_rocksdb_ColumnFamilyOptions_bottommostCompressionType(
return rocksdb::CompressionTypeJni::toJavaCompressionType(
cf_options->bottommost_compression);
}
/*
* Class: org_rocksdb_ColumnFamilyOptions
* Method: setBottommostCompressionOptions
* Signature: (JJ)V
*/
void Java_org_rocksdb_ColumnFamilyOptions_setBottommostCompressionOptions(
JNIEnv* /*env*/, jobject /*jobj*/, jlong jhandle,
jlong jbottommost_compression_options_handle) {
auto* cf_options = reinterpret_cast<rocksdb::ColumnFamilyOptions*>(jhandle);
auto* bottommost_compression_options =
reinterpret_cast<rocksdb::CompressionOptions*>(
jbottommost_compression_options_handle);
cf_options->bottommost_compression_opts = *bottommost_compression_options;
}
/*
* Class: org_rocksdb_ColumnFamilyOptions

@ -57,6 +57,7 @@ ImmutableCFOptions::ImmutableCFOptions(const ImmutableDBOptions& db_options,
use_fsync(db_options.use_fsync),
compression_per_level(cf_options.compression_per_level),
bottommost_compression(cf_options.bottommost_compression),
bottommost_compression_opts(cf_options.bottommost_compression_opts),
compression_opts(cf_options.compression_opts),
level_compaction_dynamic_level_bytes(
cf_options.level_compaction_dynamic_level_bytes),

@ -89,6 +89,8 @@ struct ImmutableCFOptions {
CompressionType bottommost_compression;
CompressionOptions bottommost_compression_opts;
CompressionOptions compression_opts;
bool level_compaction_dynamic_level_bytes;

@ -159,6 +159,28 @@ void ColumnFamilyOptions::Dump(Logger* log) const {
min_write_buffer_number_to_merge);
ROCKS_LOG_HEADER(log, " Options.max_write_buffer_number_to_maintain: %d",
max_write_buffer_number_to_maintain);
ROCKS_LOG_HEADER(
log, " Options.bottommost_compression_opts.window_bits: %d",
bottommost_compression_opts.window_bits);
ROCKS_LOG_HEADER(
log, " Options.bottommost_compression_opts.level: %d",
bottommost_compression_opts.level);
ROCKS_LOG_HEADER(
log, " Options.bottommost_compression_opts.strategy: %d",
bottommost_compression_opts.strategy);
ROCKS_LOG_HEADER(
log,
" Options.bottommost_compression_opts.max_dict_bytes: "
"%" ROCKSDB_PRIszt,
bottommost_compression_opts.max_dict_bytes);
ROCKS_LOG_HEADER(
log,
" Options.bottommost_compression_opts.zstd_max_train_bytes: "
"%" ROCKSDB_PRIszt,
bottommost_compression_opts.zstd_max_train_bytes);
ROCKS_LOG_HEADER(
log, " Options.bottommost_compression_opts.enabled: %s",
bottommost_compression_opts.enabled ? "true" : "false");
ROCKS_LOG_HEADER(log, " Options.compression_opts.window_bits: %d",
compression_opts.window_bits);
ROCKS_LOG_HEADER(log, " Options.compression_opts.level: %d",
@ -173,6 +195,9 @@ void ColumnFamilyOptions::Dump(Logger* log) const {
" Options.compression_opts.zstd_max_train_bytes: "
"%" ROCKSDB_PRIszt,
compression_opts.zstd_max_train_bytes);
ROCKS_LOG_HEADER(log,
" Options.compression_opts.enabled: %s",
compression_opts.enabled ? "true" : "false");
ROCKS_LOG_HEADER(log, " Options.level0_file_num_compaction_trigger: %d",
level0_file_num_compaction_trigger);
ROCKS_LOG_HEADER(log, " Options.level0_slowdown_writes_trigger: %d",

@ -856,6 +856,65 @@ Status StringToMap(const std::string& opts_str,
return Status::OK();
}
Status ParseCompressionOptions(const std::string& value, const std::string& name,
CompressionOptions& compression_opts) {
size_t start = 0;
size_t end = value.find(':');
if (end == std::string::npos) {
return Status::InvalidArgument("unable to parse the specified CF option " +
name);
}
compression_opts.window_bits = ParseInt(value.substr(start, end - start));
start = end + 1;
end = value.find(':', start);
if (end == std::string::npos) {
return Status::InvalidArgument("unable to parse the specified CF option " +
name);
}
compression_opts.level = ParseInt(value.substr(start, end - start));
start = end + 1;
if (start >= value.size()) {
return Status::InvalidArgument("unable to parse the specified CF option " +
name);
}
end = value.find(':', start);
compression_opts.strategy =
ParseInt(value.substr(start, value.size() - start));
// max_dict_bytes is optional for backwards compatibility
if (end != std::string::npos) {
start = end + 1;
if (start >= value.size()) {
return Status::InvalidArgument(
"unable to parse the specified CF option " + name);
}
compression_opts.max_dict_bytes =
ParseInt(value.substr(start, value.size() - start));
end = value.find(':', start);
}
// zstd_max_train_bytes is optional for backwards compatibility
if (end != std::string::npos) {
start = end + 1;
if (start >= value.size()) {
return Status::InvalidArgument(
"unable to parse the specified CF option " + name);
}
compression_opts.zstd_max_train_bytes =
ParseInt(value.substr(start, value.size() - start));
end = value.find(':', start);
}
// enabled is optional for backwards compatibility
if (end != std::string::npos) {
start = end + 1;
if (start >= value.size()) {
return Status::InvalidArgument(
"unable to parse the specified CF option " + name);
}
compression_opts.enabled =
ParseBoolean("", value.substr(start, value.size() - start));
}
return Status::OK();
}
Status ParseColumnFamilyOption(const std::string& name,
const std::string& org_value,
ColumnFamilyOptions* new_options,
@ -904,51 +963,17 @@ Status ParseColumnFamilyOption(const std::string& name,
"unable to parse the specified CF option " + name);
}
new_options->memtable_factory.reset(new_mem_factory.release());
} else if (name == "compression_opts") {
size_t start = 0;
size_t end = value.find(':');
if (end == std::string::npos) {
return Status::InvalidArgument(
"unable to parse the specified CF option " + name);
}
new_options->compression_opts.window_bits =
ParseInt(value.substr(start, end - start));
start = end + 1;
end = value.find(':', start);
if (end == std::string::npos) {
return Status::InvalidArgument(
"unable to parse the specified CF option " + name);
}
new_options->compression_opts.level =
ParseInt(value.substr(start, end - start));
start = end + 1;
if (start >= value.size()) {
return Status::InvalidArgument(
"unable to parse the specified CF option " + name);
}
end = value.find(':', start);
new_options->compression_opts.strategy =
ParseInt(value.substr(start, value.size() - start));
// max_dict_bytes is optional for backwards compatibility
if (end != std::string::npos) {
start = end + 1;
if (start >= value.size()) {
return Status::InvalidArgument(
"unable to parse the specified CF option " + name);
}
new_options->compression_opts.max_dict_bytes =
ParseInt(value.substr(start, value.size() - start));
end = value.find(':', start);
} else if (name == "bottommost_compression_opts") {
Status s = ParseCompressionOptions(
value, name, new_options->bottommost_compression_opts);
if (!s.ok()) {
return s;
}
// zstd_max_train_bytes is optional for backwards compatibility
if (end != std::string::npos) {
start = end + 1;
if (start >= value.size()) {
return Status::InvalidArgument(
"unable to parse the specified CF option " + name);
}
new_options->compression_opts.zstd_max_train_bytes =
ParseInt(value.substr(start, value.size() - start));
} else if (name == "compression_opts") {
Status s =
ParseCompressionOptions(value, name, new_options->compression_opts);
if (!s.ok()) {
return s;
}
} else {
auto iter = cf_options_type_info.find(name);

@ -381,6 +381,7 @@ TEST_F(OptionsSettableTest, ColumnFamilyOptionsAllFieldsSettable) {
options->rate_limit_delay_max_milliseconds = 33;
options->compaction_options_universal = CompactionOptionsUniversal();
options->compression_opts = CompressionOptions();
options->bottommost_compression_opts = CompressionOptions();
options->hard_rate_limit = 0;
options->soft_rate_limit = 0;
options->purge_redundant_kvs_while_flush = false;

@ -62,7 +62,8 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) {
"kZSTD:"
"kZSTDNotFinalCompression"},
{"bottommost_compression", "kLZ4Compression"},
{"compression_opts", "4:5:6:7"},
{"bottommost_compression_opts", "5:6:7:8:9:true"},
{"compression_opts", "4:5:6:7:8:true"},
{"num_levels", "8"},
{"level0_file_num_compaction_trigger", "8"},
{"level0_slowdown_writes_trigger", "9"},
@ -159,7 +160,15 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) {
ASSERT_EQ(new_cf_opt.compression_opts.level, 5);
ASSERT_EQ(new_cf_opt.compression_opts.strategy, 6);
ASSERT_EQ(new_cf_opt.compression_opts.max_dict_bytes, 7);
ASSERT_EQ(new_cf_opt.compression_opts.zstd_max_train_bytes, 8);
ASSERT_EQ(new_cf_opt.compression_opts.enabled, true);
ASSERT_EQ(new_cf_opt.bottommost_compression, kLZ4Compression);
ASSERT_EQ(new_cf_opt.bottommost_compression_opts.window_bits, 5);
ASSERT_EQ(new_cf_opt.bottommost_compression_opts.level, 6);
ASSERT_EQ(new_cf_opt.bottommost_compression_opts.strategy, 7);
ASSERT_EQ(new_cf_opt.bottommost_compression_opts.max_dict_bytes, 8);
ASSERT_EQ(new_cf_opt.bottommost_compression_opts.zstd_max_train_bytes, 9);
ASSERT_EQ(new_cf_opt.bottommost_compression_opts.enabled, true);
ASSERT_EQ(new_cf_opt.num_levels, 8);
ASSERT_EQ(new_cf_opt.level0_file_num_compaction_trigger, 8);
ASSERT_EQ(new_cf_opt.level0_slowdown_writes_trigger, 9);
@ -716,6 +725,8 @@ TEST_F(OptionsTest, GetOptionsFromStringTest) {
"write_buffer_size=10;max_write_buffer_number=16;"
"block_based_table_factory={block_cache=1M;block_size=4;};"
"compression_opts=4:5:6;create_if_missing=true;max_open_files=1;"
"bottommost_compression_opts=5:6:7;create_if_missing=true;max_open_files="
"1;"
"rate_limiter_bytes_per_sec=1024",
&new_options));
@ -723,7 +734,15 @@ TEST_F(OptionsTest, GetOptionsFromStringTest) {
ASSERT_EQ(new_options.compression_opts.level, 5);
ASSERT_EQ(new_options.compression_opts.strategy, 6);
ASSERT_EQ(new_options.compression_opts.max_dict_bytes, 0);
ASSERT_EQ(new_options.compression_opts.zstd_max_train_bytes, 0);
ASSERT_EQ(new_options.compression_opts.enabled, false);
ASSERT_EQ(new_options.bottommost_compression, kDisableCompressionOption);
ASSERT_EQ(new_options.bottommost_compression_opts.window_bits, 5);
ASSERT_EQ(new_options.bottommost_compression_opts.level, 6);
ASSERT_EQ(new_options.bottommost_compression_opts.strategy, 7);
ASSERT_EQ(new_options.bottommost_compression_opts.max_dict_bytes, 0);
ASSERT_EQ(new_options.bottommost_compression_opts.zstd_max_train_bytes, 0);
ASSERT_EQ(new_options.bottommost_compression_opts.enabled, false);
ASSERT_EQ(new_options.write_buffer_size, 10U);
ASSERT_EQ(new_options.max_write_buffer_number, 16);
BlockBasedTableOptions new_block_based_table_options =

@ -150,13 +150,21 @@ Status SstFileWriter::Open(const std::string& file_path) {
sst_file->SetIOPriority(r->io_priority);
CompressionType compression_type;
CompressionOptions compression_opts;
if (r->ioptions.bottommost_compression != kDisableCompressionOption) {
compression_type = r->ioptions.bottommost_compression;
if (r->ioptions.bottommost_compression_opts.enabled) {
compression_opts = r->ioptions.bottommost_compression_opts;
} else {
compression_opts = r->ioptions.compression_opts;
}
} else if (!r->ioptions.compression_per_level.empty()) {
// Use the compression of the last level if we have per level compression
compression_type = *(r->ioptions.compression_per_level.rbegin());
compression_opts = r->ioptions.compression_opts;
} else {
compression_type = r->mutable_cf_options.compression;
compression_opts = r->ioptions.compression_opts;
}
std::vector<std::unique_ptr<IntTblPropCollectorFactory>>
@ -190,9 +198,9 @@ Status SstFileWriter::Open(const std::string& file_path) {
TableBuilderOptions table_builder_options(
r->ioptions, r->mutable_cf_options, r->internal_comparator,
&int_tbl_prop_collector_factories, compression_type,
r->ioptions.compression_opts, nullptr /* compression_dict */,
r->skip_filters, r->column_family_name, unknown_level);
&int_tbl_prop_collector_factories, compression_type, compression_opts,
nullptr /* compression_dict */, r->skip_filters, r->column_family_name,
unknown_level);
r->file_writer.reset(
new WritableFileWriter(std::move(sst_file), r->env_options));

Loading…
Cancel
Save