Add bottommost_compression option

Summary:
Add a new option that can be used to set a specific compression algorithm for bottommost level.
This option will only affect levels larger than base level.

I have also updated CompactionJobInfo to include the compression algorithm used in compaction

Test Plan:
added new unittest
existing unittests

Reviewers: andrewkr, yhchiang, sdong

Reviewed By: sdong

Subscribers: lightmark, andrewkr, dhruba, yoshinorim

Differential Revision: https://reviews.facebook.net/D57669
main
Islam AbdelRahman 9 years ago
parent bfb6b1b8a8
commit 4b31723433
  1. 5
      HISTORY.md
  2. 5
      db/compaction.cc
  3. 22
      db/compaction_picker.cc
  4. 1
      db/compaction_picker.h
  5. 1
      db/db_impl.cc
  6. 95
      db/db_test2.cc
  7. 2
      include/rocksdb/immutable_options.h
  8. 4
      include/rocksdb/listener.h
  9. 10
      include/rocksdb/options.h
  10. 7
      util/options.cc
  11. 8
      util/options_helper.h
  12. 1
      util/options_settable_test.cc
  13. 3
      util/options_test.cc

@ -1,4 +1,9 @@
# Rocksdb Change Log # Rocksdb Change Log
## Unreleased
### Public API changes
* Add bottommost_compression option, This option can be used to set a specific compression algorithm for the bottommost level (Last level containing files in the DB).
* Introduce CompactionJobInfo::compression, This field state the compression algorithm used to generate the output files of the compaction.
## 4.8.0 (5/2/2016) ## 4.8.0 (5/2/2016)
### Public API Change ### Public API Change
* Allow preset compression dictionary for improved compression of block-based tables. This is supported for zlib, zstd, and lz4. The compression dictionary's size is configurable via CompressionOptions::max_dict_bytes. * Allow preset compression dictionary for improved compression of block-based tables. This is supported for zlib, zstd, and lz4. The compression dictionary's size is configurable via CompressionOptions::max_dict_bytes.

@ -203,8 +203,9 @@ Compaction::~Compaction() {
} }
bool Compaction::InputCompressionMatchesOutput() const { bool Compaction::InputCompressionMatchesOutput() const {
int base_level = input_version_->storage_info()->base_level(); VersionStorageInfo* vstorage = input_version_->storage_info();
bool matches = (GetCompressionType(*cfd_->ioptions(), start_level_, int base_level = vstorage->base_level();
bool matches = (GetCompressionType(*cfd_->ioptions(), vstorage, start_level_,
base_level) == output_compression_); base_level) == output_compression_);
if (matches) { if (matches) {
TEST_SYNC_POINT("Compaction::InputCompressionMatchesOutput:Matches"); TEST_SYNC_POINT("Compaction::InputCompressionMatchesOutput:Matches");

@ -109,12 +109,20 @@ SmallestKeyHeap create_level_heap(Compaction* c, const Comparator* ucmp) {
// matter what the values of the other two parameters are. // matter what the values of the other two parameters are.
// Otherwise, the compression type is determined based on options and level. // Otherwise, the compression type is determined based on options and level.
CompressionType GetCompressionType(const ImmutableCFOptions& ioptions, CompressionType GetCompressionType(const ImmutableCFOptions& ioptions,
const VersionStorageInfo* vstorage,
int level, int base_level, int level, int base_level,
const bool enable_compression) { const bool enable_compression) {
if (!enable_compression) { if (!enable_compression) {
// disable compression // disable compression
return kNoCompression; return kNoCompression;
} }
// If bottommost_compression is set and we are compacting to the
// bottommost level then we should use it.
if (ioptions.bottommost_compression != kDisableCompressionOption &&
level > base_level && level >= (vstorage->num_non_empty_levels() - 1)) {
return ioptions.bottommost_compression;
}
// If the use has specified a different compression level for each level, // If the use has specified a different compression level for each level,
// then pick the compression for that level. // then pick the compression for that level.
if (!ioptions.compression_per_level.empty()) { if (!ioptions.compression_per_level.empty()) {
@ -505,7 +513,7 @@ Compaction* CompactionPicker::CompactRange(
vstorage, mutable_cf_options, std::move(inputs), output_level, vstorage, mutable_cf_options, std::move(inputs), output_level,
mutable_cf_options.MaxFileSizeForLevel(output_level), mutable_cf_options.MaxFileSizeForLevel(output_level),
/* max_grandparent_overlap_bytes */ LLONG_MAX, output_path_id, /* max_grandparent_overlap_bytes */ LLONG_MAX, output_path_id,
GetCompressionType(ioptions_, output_level, 1), GetCompressionType(ioptions_, vstorage, output_level, 1),
/* grandparents */ {}, /* is manual */ true); /* grandparents */ {}, /* is manual */ true);
if (start_level == 0) { if (start_level == 0) {
level0_compactions_in_progress_.insert(c); level0_compactions_in_progress_.insert(c);
@ -605,8 +613,8 @@ Compaction* CompactionPicker::CompactRange(
vstorage, mutable_cf_options, std::move(compaction_inputs), output_level, vstorage, mutable_cf_options, std::move(compaction_inputs), output_level,
mutable_cf_options.MaxFileSizeForLevel(output_level), mutable_cf_options.MaxFileSizeForLevel(output_level),
mutable_cf_options.MaxGrandParentOverlapBytes(input_level), mutable_cf_options.MaxGrandParentOverlapBytes(input_level),
output_path_id, output_path_id, GetCompressionType(ioptions_, vstorage, output_level,
GetCompressionType(ioptions_, output_level, vstorage->base_level()), vstorage->base_level()),
std::move(grandparents), /* is manual compaction */ true); std::move(grandparents), /* is manual compaction */ true);
TEST_SYNC_POINT_CALLBACK("CompactionPicker::CompactRange:Return", compaction); TEST_SYNC_POINT_CALLBACK("CompactionPicker::CompactRange:Return", compaction);
@ -1026,7 +1034,8 @@ Compaction* LevelCompactionPicker::PickCompaction(
mutable_cf_options.MaxFileSizeForLevel(output_level), mutable_cf_options.MaxFileSizeForLevel(output_level),
mutable_cf_options.MaxGrandParentOverlapBytes(level), mutable_cf_options.MaxGrandParentOverlapBytes(level),
GetPathId(ioptions_, mutable_cf_options, output_level), GetPathId(ioptions_, mutable_cf_options, output_level),
GetCompressionType(ioptions_, output_level, vstorage->base_level()), GetCompressionType(ioptions_, vstorage, output_level,
vstorage->base_level()),
std::move(grandparents), is_manual, score, std::move(grandparents), is_manual, score,
false /* deletion_compaction */, compaction_reason); false /* deletion_compaction */, compaction_reason);
@ -1638,7 +1647,8 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp(
return new Compaction( return new Compaction(
vstorage, mutable_cf_options, std::move(inputs), output_level, vstorage, mutable_cf_options, std::move(inputs), output_level,
mutable_cf_options.MaxFileSizeForLevel(output_level), LLONG_MAX, path_id, mutable_cf_options.MaxFileSizeForLevel(output_level), LLONG_MAX, path_id,
GetCompressionType(ioptions_, start_level, 1, enable_compression), GetCompressionType(ioptions_, vstorage, start_level, 1,
enable_compression),
/* grandparents */ {}, /* is manual */ false, score, /* grandparents */ {}, /* is manual */ false, score,
false /* deletion_compaction */, compaction_reason); false /* deletion_compaction */, compaction_reason);
} }
@ -1763,7 +1773,7 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp(
vstorage->num_levels() - 1, vstorage->num_levels() - 1,
mutable_cf_options.MaxFileSizeForLevel(vstorage->num_levels() - 1), mutable_cf_options.MaxFileSizeForLevel(vstorage->num_levels() - 1),
/* max_grandparent_overlap_bytes */ LLONG_MAX, path_id, /* max_grandparent_overlap_bytes */ LLONG_MAX, path_id,
GetCompressionType(ioptions_, vstorage->num_levels() - 1, 1), GetCompressionType(ioptions_, vstorage, vstorage->num_levels() - 1, 1),
/* grandparents */ {}, /* is manual */ false, score, /* grandparents */ {}, /* is manual */ false, score,
false /* deletion_compaction */, false /* deletion_compaction */,
CompactionReason::kUniversalSizeAmplification); CompactionReason::kUniversalSizeAmplification);

@ -350,6 +350,7 @@ class NullCompactionPicker : public CompactionPicker {
#endif // !ROCKSDB_LITE #endif // !ROCKSDB_LITE
CompressionType GetCompressionType(const ImmutableCFOptions& ioptions, CompressionType GetCompressionType(const ImmutableCFOptions& ioptions,
const VersionStorageInfo* vstorage,
int level, int base_level, int level, int base_level,
const bool enable_compression = true); const bool enable_compression = true);

@ -2066,6 +2066,7 @@ void DBImpl::NotifyOnCompactionCompleted(
info.stats = compaction_job_stats; info.stats = compaction_job_stats;
info.table_properties = c->GetOutputTableProperties(); info.table_properties = c->GetOutputTableProperties();
info.compaction_reason = c->compaction_reason(); info.compaction_reason = c->compaction_reason();
info.compression = c->output_compression();
for (size_t i = 0; i < c->num_input_levels(); ++i) { for (size_t i = 0; i < c->num_input_levels(); ++i) {
for (const auto fmd : *c->inputs(i)) { for (const auto fmd : *c->inputs(i)) {
auto fn = TableFileName(db_options_.db_paths, fmd->fd.GetNumber(), auto fn = TableFileName(db_options_.db_paths, fmd->fd.GetNumber(),

@ -817,6 +817,101 @@ TEST_F(DBTest2, FirstSnapshotTest) {
db_->ReleaseSnapshot(s1); db_->ReleaseSnapshot(s1);
} }
class CompactionCompressionListener : public EventListener {
public:
explicit CompactionCompressionListener(Options* db_options)
: db_options_(db_options) {}
void OnCompactionCompleted(DB* db, const CompactionJobInfo& ci) override {
// Figure out last level with files
int bottommost_level = 0;
for (int level = 0; level < db->NumberLevels(); level++) {
std::string files_at_level;
ASSERT_TRUE(
db->GetProperty("rocksdb.num-files-at-level" + NumberToString(level),
&files_at_level));
if (files_at_level != "0") {
bottommost_level = level;
}
}
if (db_options_->bottommost_compression != kDisableCompressionOption &&
ci.output_level == bottommost_level && ci.output_level >= 2) {
ASSERT_EQ(ci.compression, db_options_->bottommost_compression);
} else if (db_options_->compression_per_level.size() != 0) {
ASSERT_EQ(ci.compression,
db_options_->compression_per_level[ci.output_level]);
} else {
ASSERT_EQ(ci.compression, db_options_->compression);
}
max_level_checked = std::max(max_level_checked, ci.output_level);
}
int max_level_checked = 0;
const Options* db_options_;
};
TEST_F(DBTest2, CompressionOptions) {
if (!Zlib_Supported() || !Snappy_Supported()) {
return;
}
Options options = CurrentOptions();
options.level0_file_num_compaction_trigger = 2;
options.max_bytes_for_level_base = 100;
options.max_bytes_for_level_multiplier = 2;
options.num_levels = 7;
options.max_background_compactions = 1;
options.base_background_compactions = 1;
CompactionCompressionListener* listener =
new CompactionCompressionListener(&options);
options.listeners.emplace_back(listener);
const int kKeySize = 5;
const int kValSize = 20;
Random rnd(301);
for (int iter = 0; iter <= 2; iter++) {
listener->max_level_checked = 0;
if (iter == 0) {
// Use different compression algorithms for different levels but
// always use Zlib for bottommost level
options.compression_per_level = {kNoCompression, kNoCompression,
kNoCompression, kSnappyCompression,
kSnappyCompression, kSnappyCompression,
kZlibCompression};
options.compression = kNoCompression;
options.bottommost_compression = kZlibCompression;
} else if (iter == 1) {
// Use Snappy except for bottommost level use ZLib
options.compression_per_level = {};
options.compression = kSnappyCompression;
options.bottommost_compression = kZlibCompression;
} else if (iter == 2) {
// Use Snappy everywhere
options.compression_per_level = {};
options.compression = kSnappyCompression;
options.bottommost_compression = kDisableCompressionOption;
}
DestroyAndReopen(options);
// Write 10 random files
for (int i = 0; i < 10; i++) {
for (int j = 0; j < 5; j++) {
ASSERT_OK(
Put(RandomString(&rnd, kKeySize), RandomString(&rnd, kValSize)));
}
ASSERT_OK(Flush());
dbfull()->TEST_WaitForCompact();
}
// Make sure that we wrote enough to check all 7 levels
ASSERT_EQ(listener->max_level_checked, 6);
}
}
class PinL0IndexAndFilterBlocksTest : public DBTestBase, class PinL0IndexAndFilterBlocksTest : public DBTestBase,
public testing::WithParamInterface<bool> { public testing::WithParamInterface<bool> {
public: public:

@ -85,6 +85,8 @@ struct ImmutableCFOptions {
std::vector<CompressionType> compression_per_level; std::vector<CompressionType> compression_per_level;
CompressionType bottommost_compression;
CompressionOptions compression_opts; CompressionOptions compression_opts;
bool level_compaction_dynamic_level_bytes; bool level_compaction_dynamic_level_bytes;

@ -20,6 +20,7 @@ typedef std::unordered_map<std::string, std::shared_ptr<const TableProperties>>
class DB; class DB;
class Status; class Status;
struct CompactionJobStats; struct CompactionJobStats;
enum CompressionType : char;
enum class TableFileCreationReason { enum class TableFileCreationReason {
kFlush, kFlush,
@ -142,6 +143,9 @@ struct CompactionJobInfo {
// Reason to run the compaction // Reason to run the compaction
CompactionReason compaction_reason; CompactionReason compaction_reason;
// Compression algorithm used for output files
CompressionType compression;
// If non-null, this variable stores detailed information // If non-null, this variable stores detailed information
// about this compaction. // about this compaction.
CompactionJobStats stats; CompactionJobStats stats;

@ -64,6 +64,9 @@ enum CompressionType : char {
kXpressCompression = 0x6, kXpressCompression = 0x6,
// zstd format is not finalized yet so it's subject to changes. // zstd format is not finalized yet so it's subject to changes.
kZSTDNotFinalCompression = 0x40, kZSTDNotFinalCompression = 0x40,
// kDisableCompressionOption is used to disable some compression options.
kDisableCompressionOption = -1,
}; };
enum CompactionStyle : char { enum CompactionStyle : char {
@ -369,6 +372,13 @@ struct ColumnFamilyOptions {
// change when data grows. // change when data grows.
std::vector<CompressionType> compression_per_level; std::vector<CompressionType> compression_per_level;
// Compression algorithm that will be used for the bottommost level that
// contain files. If level-compaction is used, this option will only affect
// levels after base level.
//
// Default: kDisableCompressionOption (Disabled)
CompressionType bottommost_compression;
// different options for compression algorithms // different options for compression algorithms
CompressionOptions compression_opts; CompressionOptions compression_opts;

@ -66,6 +66,7 @@ ImmutableCFOptions::ImmutableCFOptions(const Options& options)
use_fsync(options.use_fsync), use_fsync(options.use_fsync),
compression(options.compression), compression(options.compression),
compression_per_level(options.compression_per_level), compression_per_level(options.compression_per_level),
bottommost_compression(options.bottommost_compression),
compression_opts(options.compression_opts), compression_opts(options.compression_opts),
level_compaction_dynamic_level_bytes( level_compaction_dynamic_level_bytes(
options.level_compaction_dynamic_level_bytes), options.level_compaction_dynamic_level_bytes),
@ -88,6 +89,7 @@ ColumnFamilyOptions::ColumnFamilyOptions()
min_write_buffer_number_to_merge(1), min_write_buffer_number_to_merge(1),
max_write_buffer_number_to_maintain(0), max_write_buffer_number_to_maintain(0),
compression(Snappy_Supported() ? kSnappyCompression : kNoCompression), compression(Snappy_Supported() ? kSnappyCompression : kNoCompression),
bottommost_compression(kDisableCompressionOption),
prefix_extractor(nullptr), prefix_extractor(nullptr),
num_levels(7), num_levels(7),
level0_file_num_compaction_trigger(4), level0_file_num_compaction_trigger(4),
@ -146,6 +148,7 @@ ColumnFamilyOptions::ColumnFamilyOptions(const Options& options)
options.max_write_buffer_number_to_maintain), options.max_write_buffer_number_to_maintain),
compression(options.compression), compression(options.compression),
compression_per_level(options.compression_per_level), compression_per_level(options.compression_per_level),
bottommost_compression(options.bottommost_compression),
compression_opts(options.compression_opts), compression_opts(options.compression_opts),
prefix_extractor(options.prefix_extractor), prefix_extractor(options.prefix_extractor),
num_levels(options.num_levels), num_levels(options.num_levels),
@ -494,6 +497,10 @@ void ColumnFamilyOptions::Dump(Logger* log) const {
Header(log, " Options.compression: %s", Header(log, " Options.compression: %s",
CompressionTypeToString(compression).c_str()); CompressionTypeToString(compression).c_str());
} }
Header(log, " Options.bottommost_compression: %s",
bottommost_compression == kDisableCompressionOption
? "Disabled"
: CompressionTypeToString(bottommost_compression).c_str());
Header(log, " Options.prefix_extractor: %s", Header(log, " Options.prefix_extractor: %s",
prefix_extractor == nullptr ? "nullptr" : prefix_extractor->Name()); prefix_extractor == nullptr ? "nullptr" : prefix_extractor->Name());
Header(log, " Options.num_levels: %d", num_levels); Header(log, " Options.num_levels: %d", num_levels);

@ -460,6 +460,9 @@ static std::unordered_map<std::string, OptionTypeInfo> cf_options_type_info = {
{"compression_per_level", {"compression_per_level",
{offsetof(struct ColumnFamilyOptions, compression_per_level), {offsetof(struct ColumnFamilyOptions, compression_per_level),
OptionType::kVectorCompressionType, OptionVerificationType::kNormal}}, OptionType::kVectorCompressionType, OptionVerificationType::kNormal}},
{"bottommost_compression",
{offsetof(struct ColumnFamilyOptions, bottommost_compression),
OptionType::kCompressionType, OptionVerificationType::kNormal}},
{"comparator", {"comparator",
{offsetof(struct ColumnFamilyOptions, comparator), OptionType::kComparator, {offsetof(struct ColumnFamilyOptions, comparator), OptionType::kComparator,
OptionVerificationType::kByName}}, OptionVerificationType::kByName}},
@ -575,8 +578,9 @@ static std::unordered_map<std::string, CompressionType>
{"kBZip2Compression", kBZip2Compression}, {"kBZip2Compression", kBZip2Compression},
{"kLZ4Compression", kLZ4Compression}, {"kLZ4Compression", kLZ4Compression},
{"kLZ4HCCompression", kLZ4HCCompression}, {"kLZ4HCCompression", kLZ4HCCompression},
{"kXpressCompression", kXpressCompression }, {"kXpressCompression", kXpressCompression},
{"kZSTDNotFinalCompression", kZSTDNotFinalCompression}}; {"kZSTDNotFinalCompression", kZSTDNotFinalCompression},
{"kDisableCompressionOption", kDisableCompressionOption}};
static std::unordered_map<std::string, BlockBasedTableOptions::IndexType> static std::unordered_map<std::string, BlockBasedTableOptions::IndexType>
block_base_table_index_type_string_map = { block_base_table_index_type_string_map = {

@ -397,6 +397,7 @@ TEST_F(OptionsSettableTest, ColumnFamilyOptionsAllFieldsSettable) {
"max_bytes_for_level_multiplier=60;" "max_bytes_for_level_multiplier=60;"
"memtable_factory=SkipListFactory;" "memtable_factory=SkipListFactory;"
"compression=kNoCompression;" "compression=kNoCompression;"
"bottommost_compression=kDisableCompressionOption;"
"min_partial_merge_operands=7576;" "min_partial_merge_operands=7576;"
"level0_stop_writes_trigger=33;" "level0_stop_writes_trigger=33;"
"num_levels=99;" "num_levels=99;"

@ -103,6 +103,7 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) {
"kLZ4HCCompression:" "kLZ4HCCompression:"
"kXpressCompression:" "kXpressCompression:"
"kZSTDNotFinalCompression"}, "kZSTDNotFinalCompression"},
{"bottommost_compression", "kLZ4Compression"},
{"compression_opts", "4:5:6:7"}, {"compression_opts", "4:5:6:7"},
{"num_levels", "8"}, {"num_levels", "8"},
{"level0_file_num_compaction_trigger", "8"}, {"level0_file_num_compaction_trigger", "8"},
@ -202,6 +203,7 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) {
ASSERT_EQ(new_cf_opt.compression_opts.level, 5); ASSERT_EQ(new_cf_opt.compression_opts.level, 5);
ASSERT_EQ(new_cf_opt.compression_opts.strategy, 6); ASSERT_EQ(new_cf_opt.compression_opts.strategy, 6);
ASSERT_EQ(new_cf_opt.compression_opts.max_dict_bytes, 7); ASSERT_EQ(new_cf_opt.compression_opts.max_dict_bytes, 7);
ASSERT_EQ(new_cf_opt.bottommost_compression, kLZ4Compression);
ASSERT_EQ(new_cf_opt.num_levels, 8); ASSERT_EQ(new_cf_opt.num_levels, 8);
ASSERT_EQ(new_cf_opt.level0_file_num_compaction_trigger, 8); ASSERT_EQ(new_cf_opt.level0_file_num_compaction_trigger, 8);
ASSERT_EQ(new_cf_opt.level0_slowdown_writes_trigger, 9); ASSERT_EQ(new_cf_opt.level0_slowdown_writes_trigger, 9);
@ -608,6 +610,7 @@ TEST_F(OptionsTest, GetOptionsFromStringTest) {
ASSERT_EQ(new_options.compression_opts.level, 5); ASSERT_EQ(new_options.compression_opts.level, 5);
ASSERT_EQ(new_options.compression_opts.strategy, 6); ASSERT_EQ(new_options.compression_opts.strategy, 6);
ASSERT_EQ(new_options.compression_opts.max_dict_bytes, 0); ASSERT_EQ(new_options.compression_opts.max_dict_bytes, 0);
ASSERT_EQ(new_options.bottommost_compression, kDisableCompressionOption);
ASSERT_EQ(new_options.write_buffer_size, 10U); ASSERT_EQ(new_options.write_buffer_size, 10U);
ASSERT_EQ(new_options.max_write_buffer_number, 16); ASSERT_EQ(new_options.max_write_buffer_number, 16);
BlockBasedTableOptions new_block_based_table_options = BlockBasedTableOptions new_block_based_table_options =

Loading…
Cancel
Save