diff --git a/db/builder.cc b/db/builder.cc index af9cff7ef..b9206b4e2 100644 --- a/db/builder.cc +++ b/db/builder.cc @@ -30,7 +30,8 @@ Status BuildTable(const std::string& dbname, FileMetaData* meta, const Comparator* user_comparator, const SequenceNumber newest_snapshot, - const SequenceNumber earliest_seqno_in_memtable) { + const SequenceNumber earliest_seqno_in_memtable, + const bool enable_compression) { Status s; meta->file_size = 0; meta->smallest_seqno = meta->largest_seqno = 0; @@ -51,7 +52,8 @@ Status BuildTable(const std::string& dbname, if (!s.ok()) { return s; } - TableBuilder* builder = new TableBuilder(options, file.get(), 0); + TableBuilder* builder = new TableBuilder(options, file.get(), 0, + enable_compression); // the first key is the smallest key Slice key = iter->key(); diff --git a/db/builder.h b/db/builder.h index dcb52e873..a09033e94 100644 --- a/db/builder.h +++ b/db/builder.h @@ -35,6 +35,7 @@ extern Status BuildTable(const std::string& dbname, FileMetaData* meta, const Comparator* user_comparator, const SequenceNumber newest_snapshot, - const SequenceNumber earliest_seqno_in_memtable); + const SequenceNumber earliest_seqno_in_memtable, + const bool enable_compression); } // namespace rocksdb diff --git a/db/db_impl.cc b/db/db_impl.cc index b05d3b18c..812c679b7 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -821,7 +821,7 @@ Status DBImpl::WriteLevel0TableForRecovery(MemTable* mem, VersionEdit* edit) { s = BuildTable(dbname_, env_, options_, storage_options_, table_cache_.get(), iter, &meta, user_comparator(), newest_snapshot, - earliest_seqno_in_memtable); + earliest_seqno_in_memtable, true); mutex_.Lock(); } @@ -877,10 +877,15 @@ Status DBImpl::WriteLevel0Table(std::vector &mems, VersionEdit* edit, Status s; { mutex_.Unlock(); + // We skip compression if universal compression is used and the size + // threshold is set for compression. 
+ bool enable_compression = (options_.compaction_style + != kCompactionStyleUniversal || + options_.compaction_options_universal.compression_size_percent < 0); s = BuildTable(dbname_, env_, options_, storage_options_, table_cache_.get(), iter, &meta, user_comparator(), newest_snapshot, - earliest_seqno_in_memtable); + earliest_seqno_in_memtable, enable_compression); mutex_.Lock(); } base->Unref(); @@ -1724,7 +1729,8 @@ Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) { 1.1 * versions_->MaxFileSizeForLevel(compact->compaction->output_level())); compact->builder.reset(new TableBuilder(options_, compact->outfile.get(), - compact->compaction->output_level())); + compact->compaction->output_level(), + compact->compaction->enable_compression())); } return s; } diff --git a/db/db_impl.h b/db/db_impl.h index 6a06e79bc..7833066ba 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -114,6 +114,9 @@ class DBImpl : public DB { // Trigger's a background call for testing. void TEST_PurgeObsoleteteWAL(); + // get total level0 file size. Only for testing. 
+ uint64_t TEST_GetLevel0TotalSize() { return versions_->NumLevelBytes(0);} + protected: Env* const env_; const std::string dbname_; diff --git a/db/db_test.cc b/db/db_test.cc index ae58d7cb9..6695f5f3c 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -54,6 +54,12 @@ static std::string RandomString(Random* rnd, int len) { return r; } +static std::string CompressibleString(Random* rnd, int len) { + std::string r; + test::CompressibleString(rnd, 0.8, len, &r); + return r; +} + namespace anon { class AtomicCounter { private: @@ -1867,6 +1873,7 @@ TEST(DBTest, UniversalCompactionOptions) { options.write_buffer_size = 100<<10; //100KB options.level0_file_num_compaction_trigger = 4; options.num_levels = 1; + options.compaction_options_universal.compression_size_percent = -1; Reopen(&options); Random rnd(301); @@ -1894,6 +1901,97 @@ TEST(DBTest, UniversalCompactionOptions) { } } +TEST(DBTest, UniversalCompactionCompressRatio1) { + Options options = CurrentOptions(); + options.compaction_style = kCompactionStyleUniversal; + options.write_buffer_size = 100<<10; //100KB + options.level0_file_num_compaction_trigger = 2; + options.num_levels = 1; + options.compaction_options_universal.compression_size_percent = 70; + Reopen(&options); + + Random rnd(301); + int key_idx = 0; + + // The first compaction (2) is compressed. 
+ for (int num = 0; num < 2; num++) { + // Write 120KB (12 values, each 10K) + for (int i = 0; i < 12; i++) { + ASSERT_OK(Put(Key(key_idx), CompressibleString(&rnd, 10000))); + key_idx++; + } + dbfull()->TEST_WaitForFlushMemTable(); + dbfull()->TEST_WaitForCompact(); + } + ASSERT_LT((int ) dbfull()->TEST_GetLevel0TotalSize(), 120000 * 2 * 0.9); + + // The second compaction (4) is compressed + for (int num = 0; num < 2; num++) { + // Write 120KB (12 values, each 10K) + for (int i = 0; i < 12; i++) { + ASSERT_OK(Put(Key(key_idx), CompressibleString(&rnd, 10000))); + key_idx++; + } + dbfull()->TEST_WaitForFlushMemTable(); + dbfull()->TEST_WaitForCompact(); + } + ASSERT_LT((int ) dbfull()->TEST_GetLevel0TotalSize(), 120000 * 4 * 0.9); + + // The third compaction (2 4) is compressed since this time it is + // (1 1 3.2) and 3.2/5.2 doesn't reach ratio. + for (int num = 0; num < 2; num++) { + // Write 120KB (12 values, each 10K) + for (int i = 0; i < 12; i++) { + ASSERT_OK(Put(Key(key_idx), CompressibleString(&rnd, 10000))); + key_idx++; + } + dbfull()->TEST_WaitForFlushMemTable(); + dbfull()->TEST_WaitForCompact(); + } + ASSERT_LT((int ) dbfull()->TEST_GetLevel0TotalSize(), 120000 * 6 * 0.9); + + // When we start for the compaction up to (2 4 8), the latest + // compressed is not compressed. 
+ for (int num = 0; num < 8; num++) { + // Write 120KB (12 values, each 10K) + for (int i = 0; i < 12; i++) { + ASSERT_OK(Put(Key(key_idx), CompressibleString(&rnd, 10000))); + key_idx++; + } + dbfull()->TEST_WaitForFlushMemTable(); + dbfull()->TEST_WaitForCompact(); + } + ASSERT_GT((int ) dbfull()->TEST_GetLevel0TotalSize(), + 120000 * 12 * 0.8 + 110000 * 2); +} + +TEST(DBTest, UniversalCompactionCompressRatio2) { + Options options = CurrentOptions(); + options.compaction_style = kCompactionStyleUniversal; + options.write_buffer_size = 100<<10; //100KB + options.level0_file_num_compaction_trigger = 2; + options.num_levels = 1; + options.compaction_options_universal.compression_size_percent = 95; + Reopen(&options); + + Random rnd(301); + int key_idx = 0; + + // When we start for the compaction up to (2 4 8), the latest + // compressed is compressed given the size ratio to compress. + for (int num = 0; num < 14; num++) { + // Write 120KB (12 values, each 10K) + for (int i = 0; i < 12; i++) { + ASSERT_OK(Put(Key(key_idx), CompressibleString(&rnd, 10000))); + key_idx++; + } + dbfull()->TEST_WaitForFlushMemTable(); + dbfull()->TEST_WaitForCompact(); + } + ASSERT_LT((int ) dbfull()->TEST_GetLevel0TotalSize(), + 120000 * 12 * 0.8 + 110000 * 2); +} + TEST(DBTest, ConvertCompactionStyle) { Random rnd(301); int max_key_level_insert = 200; diff --git a/db/repair.cc b/db/repair.cc index c9eeb203c..252f24e45 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -226,7 +226,7 @@ class Repairer { Iterator* iter = mem->NewIterator(); status = BuildTable(dbname_, env_, options_, storage_options_, table_cache_, iter, &meta, - icmp_.user_comparator(), 0, 0); + icmp_.user_comparator(), 0, 0, true); delete iter; mem->Unref(); mem = nullptr; diff --git a/db/version_set.cc b/db/version_set.cc index f9e23493f..5c93550cf 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -2250,8 +2250,10 @@ Compaction* VersionSet::PickCompactionUniversalSizeAmp( assert(start_index >= 0 && start_index < 
file_by_time.size() - 1); // create a compaction request + // We always compact all the files, so always compress. Compaction* c = new Compaction(level, level, MaxFileSizeForLevel(level), - LLONG_MAX, NumberLevels()); + LLONG_MAX, NumberLevels(), false, + true); c->score_ = score; for (unsigned int loop = start_index; loop < file_by_time.size(); loop++) { int index = file_by_time[loop]; @@ -2356,11 +2358,30 @@ Compaction* VersionSet::PickCompactionUniversalReadAmp( if (!done || candidate_count <= 1) { return nullptr; } + unsigned int first_index_after = start_index + candidate_count; + // Compression is enabled if files compacted earlier already reached + // size ratio of compression. + bool enable_compression = true; + int ratio_to_compress = + options_->compaction_options_universal.compression_size_percent; + if (ratio_to_compress >= 0) { + uint64_t total_size = TotalFileSize(current_->files_[level]); + uint64_t older_file_size = 0; + for (unsigned int i = file_by_time.size() - 1; i >= first_index_after; + i--) { + older_file_size += current_->files_[level][file_by_time[i]]->file_size; + if (older_file_size * 100L >= total_size * (long) ratio_to_compress) { + enable_compression = false; + break; + } + } + } Compaction* c = new Compaction(level, level, MaxFileSizeForLevel(level), - LLONG_MAX, NumberLevels()); + LLONG_MAX, NumberLevels(), false, + enable_compression); c->score_ = score; - for (unsigned int i = start_index; i < start_index + candidate_count; i++) { + for (unsigned int i = start_index; i < first_index_after; i++) { int index = file_by_time[i]; FileMetaData* f = current_->files_[level][index]; c->inputs_[0].push_back(f); @@ -2884,7 +2905,7 @@ Compaction* VersionSet::CompactRange( Compaction::Compaction(int level, int out_level, uint64_t target_file_size, uint64_t max_grandparent_overlap_bytes, int number_levels, - bool seek_compaction) + bool seek_compaction, bool enable_compression) : level_(level), out_level_(out_level), 
max_output_file_size_(target_file_size), @@ -2892,6 +2913,7 @@ Compaction::Compaction(int level, int out_level, uint64_t target_file_size, input_version_(nullptr), number_levels_(number_levels), seek_compaction_(seek_compaction), + enable_compression_(enable_compression), grandparent_index_(0), seen_key_(false), overlapped_bytes_(0), diff --git a/db/version_set.h b/db/version_set.h index 8e24892b5..da933857c 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -558,6 +558,9 @@ class Compaction { // Maximum size of files to build during this compaction. uint64_t MaxOutputFileSize() const { return max_output_file_size_; } + // Whether compression will be enabled for compaction outputs + bool enable_compression() const { return enable_compression_; } + // Is this a trivial compaction that can be implemented by just // moving a single input file to the next level (no merging or splitting) bool IsTrivialMove() const; @@ -592,7 +595,7 @@ class Compaction { explicit Compaction(int level, int out_level, uint64_t target_file_size, uint64_t max_grandparent_overlap_bytes, int number_levels, - bool seek_compaction = false); + bool seek_compaction = false, bool enable_compression = true); int level_; int out_level_; // levels to which output files are stored @@ -603,6 +606,7 @@ class Compaction { int number_levels_; bool seek_compaction_; + bool enable_compression_; // Each compaction reads inputs from "level_" and "level_+1" std::vector inputs_[2]; // The two sets of inputs diff --git a/include/rocksdb/table_builder.h b/include/rocksdb/table_builder.h index 8cb3440f4..da0e07570 100644 --- a/include/rocksdb/table_builder.h +++ b/include/rocksdb/table_builder.h @@ -30,7 +30,12 @@ class TableBuilder { // caller to close the file after calling Finish(). The output file // will be part of level specified by 'level'. A value of -1 means // that the caller does not know which level the output file will reside. 
- TableBuilder(const Options& options, WritableFile* file, int level=-1); + // + // If enable_compression=true, this table will follow the compression + // setting given in parameter options. If enable_compression=false, the + // table will not be compressed. + TableBuilder(const Options& options, WritableFile* file, int level=-1, + const bool enable_compression=true); // REQUIRES: Either Finish() or Abandon() has been called. ~TableBuilder(); diff --git a/include/rocksdb/universal_compaction.h b/include/rocksdb/universal_compaction.h index 81144e8c8..46de688fc 100644 --- a/include/rocksdb/universal_compaction.h +++ b/include/rocksdb/universal_compaction.h @@ -51,6 +51,23 @@ class CompactionOptionsUniversal { // 300 bytes of storage. unsigned int max_size_amplification_percent; + // If this option is set to be -1 (the default value), all the output files + // will follow compression type specified. + // + // If this option is not negative, we will try to make sure compressed + // size is just above this value. In normal cases, at least this percentage + // of data will be compressed. 
+ // When we are compacting to a new file, here is the criteria whether + // it needs to be compressed: assuming here are the list of files sorted + // by generation time: + // A1...An B1...Bm C1...Ct + // where A1 is the newest and Ct is the oldest, and we are going to compact + // B1...Bm, we calculate the total size of all the files as total_size, as + // well as the total size of C1...Ct as total_C, the compaction output file + // will be compressed iff + // total_C / total_size < this percentage + int compression_size_percent; + // The algorithm used to stop picking files into a single compaction run // Default: kCompactionStopStyleTotalSize CompactionStopStyle stop_style; @@ -61,6 +78,7 @@ class CompactionOptionsUniversal { min_merge_width(2), max_merge_width(UINT_MAX), max_size_amplification_percent(200), + compression_size_percent(-1), stop_style(kCompactionStopStyleTotalSize) { } }; diff --git a/table/table_builder.cc b/table/table_builder.cc index f4996f75d..7665ef793 100644 --- a/table/table_builder.cc +++ b/table/table_builder.cc @@ -68,6 +68,8 @@ struct TableBuilder::Rep { BlockBuilder data_block; BlockBuilder index_block; std::string last_key; + // Whether enable compression in this table. + bool enable_compression; uint64_t num_entries = 0; uint64_t num_data_blocks = 0; @@ -92,12 +94,13 @@ struct TableBuilder::Rep { std::string compressed_output; - Rep(const Options& opt, WritableFile* f) + Rep(const Options& opt, WritableFile* f, bool enable_compression) : options(opt), index_block_options(opt), file(f), data_block(&options), index_block(&index_block_options), + enable_compression(enable_compression), filter_block(opt.filter_policy == nullptr ? 
nullptr : new FilterBlockBuilder(opt)), pending_index_entry(false) { @@ -106,8 +109,8 @@ }; TableBuilder::TableBuilder(const Options& options, WritableFile* file, - int level) - : rep_(new Rep(options, file)), level_(level) { + int level, const bool enable_compression) + : rep_(new Rep(options, file, enable_compression)), level_(level) { if (rep_->filter_block != nullptr) { rep_->filter_block->StartBlock(0); } @@ -209,18 +212,24 @@ void TableBuilder::WriteBlock(BlockBuilder* block, BlockHandle* handle) { Slice block_contents; std::string* compressed = &r->compressed_output; CompressionType type; - // If the use has specified a different compression level for each level, - // then pick the compresison for that level. - if (!r->options.compression_per_level.empty()) { - const int n = r->options.compression_per_level.size(); - // It is possible for level_ to be -1; in that case, we use level - // 0's compression. This occurs mostly in backwards compatibility - // situations when the builder doesn't know what level the file - // belongs to. Likewise, if level_ is beyond the end of the - // specified compression levels, use the last value. - type = r->options.compression_per_level[std::max(0, std::min(level_, n))]; + if (!r->enable_compression) { + // disable compression + type = kNoCompression; } else { - type = r->options.compression; + // If the user has specified a different compression level for each level, + // then pick the compression for that level. + if (!r->options.compression_per_level.empty()) { + const int n = r->options.compression_per_level.size(); + // It is possible for level_ to be -1; in that case, we use level + // 0's compression. This occurs mostly in backwards compatibility + // situations when the builder doesn't know what level the file + // belongs to. Likewise, if level_ is beyond the end of the + // specified compression levels, use the last value. 
+ type = r->options.compression_per_level[std::max(0, + std::min(level_, n - 1))]; + } else { + type = r->options.compression; + } } switch (type) { case kNoCompression: