From 9edda37027d0d082d92f97c30b6f104c28047e37 Mon Sep 17 00:00:00 2001 From: Siying Dong Date: Thu, 17 Oct 2013 13:33:39 -0700 Subject: [PATCH] Universal Compaction to Have a Size Percentage Threshold To Decide Whether to Compress Summary: This patch adds a option for universal compaction to allow us to only compress output files if the files compacted previously did not yet reach a specified ratio, to save CPU costs in some cases. Compression is always skipped for flushing. This is because the size information is not easy to evaluate for flushing case. We can improve it later. Test Plan: add test DBTest.UniversalCompactionCompressRatio1 and DBTest.UniversalCompactionCompressRatio12 Reviewers: dhruba, haobo Reviewed By: dhruba CC: leveldb Differential Revision: https://reviews.facebook.net/D13467 --- db/builder.cc | 6 +- db/builder.h | 3 +- db/db_impl.cc | 12 +++- db/db_impl.h | 3 + db/db_test.cc | 98 ++++++++++++++++++++++++++ db/repair.cc | 2 +- db/version_set.cc | 30 ++++++-- db/version_set.h | 6 +- include/rocksdb/table_builder.h | 7 +- include/rocksdb/universal_compaction.h | 18 +++++ table/table_builder.cc | 37 ++++++---- 11 files changed, 195 insertions(+), 27 deletions(-) diff --git a/db/builder.cc b/db/builder.cc index af9cff7ef..b9206b4e2 100644 --- a/db/builder.cc +++ b/db/builder.cc @@ -30,7 +30,8 @@ Status BuildTable(const std::string& dbname, FileMetaData* meta, const Comparator* user_comparator, const SequenceNumber newest_snapshot, - const SequenceNumber earliest_seqno_in_memtable) { + const SequenceNumber earliest_seqno_in_memtable, + const bool enable_compression) { Status s; meta->file_size = 0; meta->smallest_seqno = meta->largest_seqno = 0; @@ -51,7 +52,8 @@ Status BuildTable(const std::string& dbname, if (!s.ok()) { return s; } - TableBuilder* builder = new TableBuilder(options, file.get(), 0); + TableBuilder* builder = new TableBuilder(options, file.get(), 0, + enable_compression); // the first key is the smallest key Slice key = iter->key(); diff --git a/db/builder.h b/db/builder.h index dcb52e873..a09033e94 100644 --- a/db/builder.h +++ b/db/builder.h @@ -35,6 +35,7 @@ extern Status BuildTable(const std::string& dbname, FileMetaData* meta, const Comparator* user_comparator, const SequenceNumber newest_snapshot, - const SequenceNumber earliest_seqno_in_memtable); + const SequenceNumber earliest_seqno_in_memtable, + const bool enable_compression); } // namespace rocksdb diff --git a/db/db_impl.cc b/db/db_impl.cc index b05d3b18c..812c679b7 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -821,7 +821,7 @@ Status DBImpl::WriteLevel0TableForRecovery(MemTable* mem, VersionEdit* edit) { s = BuildTable(dbname_, env_, options_, storage_options_, table_cache_.get(), iter, &meta, user_comparator(), newest_snapshot, - earliest_seqno_in_memtable); + earliest_seqno_in_memtable, true); mutex_.Lock(); } @@ -877,10 +877,15 @@ Status DBImpl::WriteLevel0Table(std::vector &mems, VersionEdit* edit, Status s; { mutex_.Unlock(); + // We skip compression if universal compression is used and the size + // threshold is set for compression. + bool enable_compression = (options_.compaction_style + != kCompactionStyleUniversal || + options_.compaction_options_universal.compression_size_percent < 0); s = BuildTable(dbname_, env_, options_, storage_options_, table_cache_.get(), iter, &meta, user_comparator(), newest_snapshot, - earliest_seqno_in_memtable); + earliest_seqno_in_memtable, enable_compression); mutex_.Lock(); } base->Unref(); @@ -1724,7 +1729,8 @@ Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) { 1.1 * versions_->MaxFileSizeForLevel(compact->compaction->output_level())); compact->builder.reset(new TableBuilder(options_, compact->outfile.get(), - compact->compaction->output_level())); + compact->compaction->output_level(), + compact->compaction->enable_compression())); } return s; } diff --git a/db/db_impl.h b/db/db_impl.h index 6a06e79bc..7833066ba 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -114,6 +114,9 @@ class DBImpl : public DB { // Trigger's a background call for testing. void TEST_PurgeObsoleteteWAL(); + // get total level0 file size. Only for testing. + uint64_t TEST_GetLevel0TotalSize() { return versions_->NumLevelBytes(0);} + protected: Env* const env_; const std::string dbname_; diff --git a/db/db_test.cc b/db/db_test.cc index ae58d7cb9..6695f5f3c 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -54,6 +54,12 @@ static std::string RandomString(Random* rnd, int len) { return r; } +static std::string CompressibleString(Random* rnd, int len) { + std::string r; + test::CompressibleString(rnd, 0.8, len, &r); + return r; +} + namespace anon { class AtomicCounter { private: @@ -1867,6 +1873,7 @@ TEST(DBTest, UniversalCompactionOptions) { options.write_buffer_size = 100<<10; //100KB options.level0_file_num_compaction_trigger = 4; options.num_levels = 1; + options.compaction_options_universal.compression_size_percent = -1; Reopen(&options); Random rnd(301); @@ -1894,6 +1901,97 @@ TEST(DBTest, UniversalCompactionOptions) { } } +TEST(DBTest, UniversalCompactionCompressRatio1) { + Options options = CurrentOptions(); + options.compaction_style = kCompactionStyleUniversal; + options.write_buffer_size = 100<<10; //100KB + options.level0_file_num_compaction_trigger = 2; + options.num_levels = 1; + options.compaction_options_universal.compression_size_percent = 70; + Reopen(&options); + + Random rnd(301); + int key_idx = 0; + + // The first compaction (2) is compressed. + for (int num = 0; num < 2; num++) { + // Write 120KB (12 values, each 10K) + for (int i = 0; i < 12; i++) { + ASSERT_OK(Put(Key(key_idx), CompressibleString(&rnd, 10000))); + key_idx++; + } + dbfull()->TEST_WaitForFlushMemTable(); + dbfull()->TEST_WaitForCompact(); + } + ASSERT_LT((int ) dbfull()->TEST_GetLevel0TotalSize(), 120000 * 2 * 0.9); + + // The second compaction (4) is compressed + for (int num = 0; num < 2; num++) { + // Write 120KB (12 values, each 10K) + for (int i = 0; i < 12; i++) { + ASSERT_OK(Put(Key(key_idx), CompressibleString(&rnd, 10000))); + key_idx++; + } + dbfull()->TEST_WaitForFlushMemTable(); + dbfull()->TEST_WaitForCompact(); + } + ASSERT_LT((int ) dbfull()->TEST_GetLevel0TotalSize(), 120000 * 4 * 0.9); + + // The third compaction (2 4) is compressed since this time it is + // (1 1 3.2) and 3.2/5.2 doesn't reach ratio. + for (int num = 0; num < 2; num++) { + // Write 120KB (12 values, each 10K) + for (int i = 0; i < 12; i++) { + ASSERT_OK(Put(Key(key_idx), CompressibleString(&rnd, 10000))); + key_idx++; + } + dbfull()->TEST_WaitForFlushMemTable(); + dbfull()->TEST_WaitForCompact(); + } + ASSERT_LT((int ) dbfull()->TEST_GetLevel0TotalSize(), 120000 * 6 * 0.9); + + // When we start for the compaction up to (2 4 8), the latest + // compressed is not compressed. + for (int num = 0; num < 8; num++) { + // Write 120KB (12 values, each 10K) + for (int i = 0; i < 12; i++) { + ASSERT_OK(Put(Key(key_idx), CompressibleString(&rnd, 10000))); + key_idx++; + } + dbfull()->TEST_WaitForFlushMemTable(); + dbfull()->TEST_WaitForCompact(); + } + ASSERT_GT((int ) dbfull()->TEST_GetLevel0TotalSize(), + 120000 * 12 * 0.8 + 110000 * 2); +} + +TEST(DBTest, UniversalCompactionCompressRatio2) { + Options options = CurrentOptions(); + options.compaction_style = kCompactionStyleUniversal; + options.write_buffer_size = 100<<10; //100KB + options.level0_file_num_compaction_trigger = 2; + options.num_levels = 1; + options.compaction_options_universal.compression_size_percent = 95; + Reopen(&options); + + Random rnd(301); + int key_idx = 0; + + // When we start for the compaction up to (2 4 8), the latest + // compressed is compressed given the size ratio to compress. + for (int num = 0; num < 14; num++) { + // Write 120KB (12 values, each 10K) + for (int i = 0; i < 12; i++) { + ASSERT_OK(Put(Key(key_idx), CompressibleString(&rnd, 10000))); + key_idx++; + } + dbfull()->TEST_WaitForFlushMemTable(); + dbfull()->TEST_WaitForCompact(); + } + ASSERT_LT((int ) dbfull()->TEST_GetLevel0TotalSize(), + 120000 * 12 * 0.8 + 110000 * 2); +} + TEST(DBTest, ConvertCompactionStyle) { Random rnd(301); int max_key_level_insert = 200; diff --git a/db/repair.cc b/db/repair.cc index c9eeb203c..252f24e45 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -226,7 +226,7 @@ class Repairer { Iterator* iter = mem->NewIterator(); status = BuildTable(dbname_, env_, options_, storage_options_, table_cache_, iter, &meta, - icmp_.user_comparator(), 0, 0); + icmp_.user_comparator(), 0, 0, true); delete iter; mem->Unref(); mem = nullptr; diff --git a/db/version_set.cc b/db/version_set.cc index f9e23493f..5c93550cf 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -2250,8 +2250,10 @@ Compaction* VersionSet::PickCompactionUniversalSizeAmp( assert(start_index >= 0 && start_index < file_by_time.size() - 1); // create a compaction request + // We always compact all the files, so always compress. Compaction* c = new Compaction(level, level, MaxFileSizeForLevel(level), - LLONG_MAX, NumberLevels()); + LLONG_MAX, NumberLevels(), false, + true); c->score_ = score; for (unsigned int loop = start_index; loop < file_by_time.size(); loop++) { int index = file_by_time[loop]; @@ -2356,11 +2358,30 @@ Compaction* VersionSet::PickCompactionUniversalReadAmp( if (!done || candidate_count <= 1) { return nullptr; } + unsigned int first_index_after = start_index + candidate_count; + // Compression is enabled if files compacted earlier already reached + // size ratio of compression. + bool enable_compression = true; + int ratio_to_compress = + options_->compaction_options_universal.compression_size_percent; + if (ratio_to_compress >= 0) { + uint64_t total_size = TotalFileSize(current_->files_[level]); + uint64_t older_file_size = 0; + for (unsigned int i = file_by_time.size() - 1; i >= first_index_after; + i--) { + older_file_size += current_->files_[level][file_by_time[i]]->file_size; + if (older_file_size * 100L >= total_size * (long) ratio_to_compress) { + enable_compression = false; + break; + } + } + } Compaction* c = new Compaction(level, level, MaxFileSizeForLevel(level), - LLONG_MAX, NumberLevels()); + LLONG_MAX, NumberLevels(), false, + enable_compression); c->score_ = score; - for (unsigned int i = start_index; i < start_index + candidate_count; i++) { + for (unsigned int i = start_index; i < first_index_after; i++) { int index = file_by_time[i]; FileMetaData* f = current_->files_[level][index]; c->inputs_[0].push_back(f); @@ -2884,7 +2905,7 @@ Compaction* VersionSet::CompactRange( Compaction::Compaction(int level, int out_level, uint64_t target_file_size, uint64_t max_grandparent_overlap_bytes, int number_levels, - bool seek_compaction) + bool seek_compaction, bool enable_compression) : level_(level), out_level_(out_level), max_output_file_size_(target_file_size), @@ -2892,6 +2913,7 @@ Compaction::Compaction(int level, int out_level, uint64_t target_file_size, input_version_(nullptr), number_levels_(number_levels), seek_compaction_(seek_compaction), + enable_compression_(enable_compression), grandparent_index_(0), seen_key_(false), overlapped_bytes_(0), diff --git a/db/version_set.h b/db/version_set.h index 8e24892b5..da933857c 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -558,6 +558,9 @@ class Compaction { // Maximum size of files to build during this compaction. uint64_t MaxOutputFileSize() const { return max_output_file_size_; } + // Whether compression will be enabled for compaction outputs + bool enable_compression() const { return enable_compression_; } + // Is this a trivial compaction that can be implemented by just // moving a single input file to the next level (no merging or splitting) bool IsTrivialMove() const; @@ -592,7 +595,7 @@ class Compaction { explicit Compaction(int level, int out_level, uint64_t target_file_size, uint64_t max_grandparent_overlap_bytes, int number_levels, - bool seek_compaction = false); + bool seek_compaction = false, bool enable_compression = true); int level_; int out_level_; // levels to which output files are stored @@ -603,6 +606,7 @@ class Compaction { int number_levels_; bool seek_compaction_; + bool enable_compression_; // Each compaction reads inputs from "level_" and "level_+1" std::vector inputs_[2]; // The two sets of inputs diff --git a/include/rocksdb/table_builder.h b/include/rocksdb/table_builder.h index 8cb3440f4..da0e07570 100644 --- a/include/rocksdb/table_builder.h +++ b/include/rocksdb/table_builder.h @@ -30,7 +30,12 @@ class TableBuilder { // caller to close the file after calling Finish(). The output file // will be part of level specified by 'level'. A value of -1 means // that the caller does not know which level the output file will reside. - TableBuilder(const Options& options, WritableFile* file, int level=-1); + // + // If enable_compression=true, this table will follow the compression + // setting given in parameter options. If enable_compression=false, the + // table will not be compressed. + TableBuilder(const Options& options, WritableFile* file, int level=-1, + const bool enable_compression=true); // REQUIRES: Either Finish() or Abandon() has been called. ~TableBuilder(); diff --git a/include/rocksdb/universal_compaction.h b/include/rocksdb/universal_compaction.h index 81144e8c8..46de688fc 100644 --- a/include/rocksdb/universal_compaction.h +++ b/include/rocksdb/universal_compaction.h @@ -51,6 +51,23 @@ class CompactionOptionsUniversal { // 300 bytes of storage. unsigned int max_size_amplification_percent; + // If this option is set to be -1 (the default value), all the output files + // will follow compression type specified. + // + // If this option is not negative, we will try to make sure compressed + // size is just above this value. In normal cases, at least this percentage + // of data will be compressed. + // When we are compacting to a new file, here is the criteria whether + // it needs to be compressed: assuming here are the list of files sorted + // by generation time: + // A1...An B1...Bm C1...Ct + // where A1 is the newest and Ct is the oldest, and we are going to compact + // B1...Bm, we calculate the total size of all the files as total_size, as + // well as the total size of C1...Ct as total_C, the compaction output file + // will be compressed iff + // total_C / total_size < this percentage + int compression_size_percent; + // The algorithm used to stop picking files into a single compaction run // Default: kCompactionStopStyleTotalSize CompactionStopStyle stop_style; @@ -61,6 +78,7 @@ class CompactionOptionsUniversal { min_merge_width(2), max_merge_width(UINT_MAX), max_size_amplification_percent(200), + compression_size_percent(-1), stop_style(kCompactionStopStyleTotalSize) { } }; diff --git a/table/table_builder.cc b/table/table_builder.cc index f4996f75d..7665ef793 100644 --- a/table/table_builder.cc +++ b/table/table_builder.cc @@ -68,6 +68,8 @@ struct TableBuilder::Rep { BlockBuilder data_block; BlockBuilder index_block; std::string last_key; + // Whether enable compression in this table. + bool enable_compression; uint64_t num_entries = 0; uint64_t num_data_blocks = 0; @@ -92,12 +94,13 @@ struct TableBuilder::Rep { std::string compressed_output; - Rep(const Options& opt, WritableFile* f) + Rep(const Options& opt, WritableFile* f, bool enable_compression) : options(opt), index_block_options(opt), file(f), data_block(&options), index_block(&index_block_options), + enable_compression(enable_compression), filter_block(opt.filter_policy == nullptr ? nullptr : new FilterBlockBuilder(opt)), pending_index_entry(false) { @@ -106,8 +109,8 @@ struct TableBuilder::Rep { }; TableBuilder::TableBuilder(const Options& options, WritableFile* file, - int level) - : rep_(new Rep(options, file)), level_(level) { + int level, const bool enable_compression) + : rep_(new Rep(options, file, enable_compression)), level_(level) { if (rep_->filter_block != nullptr) { rep_->filter_block->StartBlock(0); } @@ -209,18 +212,24 @@ void TableBuilder::WriteBlock(BlockBuilder* block, BlockHandle* handle) { Slice block_contents; std::string* compressed = &r->compressed_output; CompressionType type; - // If the use has specified a different compression level for each level, - // then pick the compresison for that level. - if (!r->options.compression_per_level.empty()) { - const int n = r->options.compression_per_level.size(); - // It is possible for level_ to be -1; in that case, we use level - // 0's compression. This occurs mostly in backwards compatibility - // situations when the builder doesn't know what level the file - // belongs to. Likewise, if level_ is beyond the end of the - // specified compression levels, use the last value. - type = r->options.compression_per_level[std::max(0, std::min(level_, n))]; + if (!r->enable_compression) { + // disable compression + type = kNoCompression; } else { - type = r->options.compression; + // If the use has specified a different compression level for each level, + // then pick the compresison for that level. + if (!r->options.compression_per_level.empty()) { + const int n = r->options.compression_per_level.size(); + // It is possible for level_ to be -1; in that case, we use level + // 0's compression. This occurs mostly in backwards compatibility + // situations when the builder doesn't know what level the file + // belongs to. Likewise, if level_ is beyond the end of the + // specified compression levels, use the last value. + type = r->options.compression_per_level[std::max(0, + std::min(level_, n))]; + } else { + type = r->options.compression; + } } switch (type) { case kNoCompression: