diff --git a/db/column_family.cc b/db/column_family.cc index 1b96bbcf7..fe56db721 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -535,6 +535,8 @@ Compaction* ColumnFamilyData::PickCompaction( return result; } +const int ColumnFamilyData::kCompactAllLevels = -1; + Compaction* ColumnFamilyData::CompactRange( const MutableCFOptions& mutable_cf_options, int input_level, int output_level, uint32_t output_path_id, diff --git a/db/column_family.h b/db/column_family.h index e90a757c4..f9db0df6c 100644 --- a/db/column_family.h +++ b/db/column_family.h @@ -236,6 +236,9 @@ class ColumnFamilyData { // REQUIRES: DB mutex held Compaction* PickCompaction(const MutableCFOptions& mutable_options, LogBuffer* log_buffer); + // A flag to tell a manual compaction is to compact all levels together + // instad of for specific level. + static const int kCompactAllLevels; // REQUIRES: DB mutex held Compaction* CompactRange( const MutableCFOptions& mutable_cf_options, diff --git a/db/compaction_picker.cc b/db/compaction_picker.cc index ca813e38b..90293004c 100644 --- a/db/compaction_picker.cc +++ b/db/compaction_picker.cc @@ -16,6 +16,7 @@ #include #include #include +#include "db/column_family.h" #include "db/filename.h" #include "util/log_buffer.h" #include "util/statistics.h" @@ -357,6 +358,45 @@ Compaction* CompactionPicker::CompactRange( // CompactionPickerFIFO has its own implementation of compact range assert(ioptions_.compaction_style != kCompactionStyleFIFO); + if (input_level == ColumnFamilyData::kCompactAllLevels) { + assert(ioptions_.compaction_style == kCompactionStyleUniversal); + + // Universal compaction with more than one level always compacts all the + // files together to the last level. + assert(vstorage->num_levels() > 1); + // DBImpl::CompactRange() set output level to be the last level + assert(output_level == vstorage->num_levels() - 1); + // DBImpl::RunManualCompaction will make full range for universal compaction + assert(begin == nullptr); + assert(end == nullptr); + *compaction_end = nullptr; + + int start_level = 0; + for (; start_level < vstorage->num_levels() && + vstorage->NumLevelFiles(start_level) == 0; + start_level++) { + } + if (start_level == vstorage->num_levels()) { + return nullptr; + } + + std::vector inputs(vstorage->num_levels() - + start_level); + for (int level = start_level; level < vstorage->num_levels(); level++) { + inputs[level - start_level].level = level; + auto& files = inputs[level - start_level].files; + for (FileMetaData* f : vstorage->LevelFiles(level)) { + files.push_back(f); + } + } + return new Compaction( + vstorage, mutable_cf_options, std::move(inputs), output_level, + mutable_cf_options.MaxFileSizeForLevel(output_level), + /* max_grandparent_overlap_bytes */ LLONG_MAX, output_path_id, + GetCompressionType(ioptions_, output_level, 1), + /* grandparents */ {}, /* is manual */ true); + } + CompactionInputFiles inputs; inputs.level = input_level; bool covering_the_whole_range = true; diff --git a/db/db_impl.cc b/db/db_impl.cc index 1461e7d22..1a1807d48 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1266,25 +1266,41 @@ Status DBImpl::CompactRange(ColumnFamilyHandle* column_family, } } } - for (int level = 0; level <= max_level_with_files; level++) { - // in case the compaction is unversal or if we're compacting the - // bottom-most level, the output level will be the same as input one. - // level 0 can never be the bottommost level (i.e. if all files are in level - // 0, we will compact to level 1) - if (cfd->ioptions()->compaction_style == kCompactionStyleUniversal || - cfd->ioptions()->compaction_style == kCompactionStyleFIFO || - (level == max_level_with_files && level > 0)) { - s = RunManualCompaction(cfd, level, level, target_path_id, begin, end); - } else { - // TODO(sdong) Skip empty levels if possible. - s = RunManualCompaction(cfd, level, level + 1, target_path_id, begin, - end); + + if (cfd->ioptions()->compaction_style == kCompactionStyleUniversal && + cfd->NumberLevels() > 1) { + // Always compact all files together. + int output_level = 0; + if (cfd->ioptions()->compaction_style == kCompactionStyleUniversal && + cfd->NumberLevels() > 1) { + output_level = cfd->NumberLevels() - 1; } - if (!s.ok()) { - LogFlush(db_options_.info_log); - return s; + s = RunManualCompaction(cfd, ColumnFamilyData::kCompactAllLevels, + output_level, target_path_id, begin, end); + } else { + for (int level = 0; level <= max_level_with_files; level++) { + // in case the compaction is unversal or if we're compacting the + // bottom-most level, the output level will be the same as input one. + // level 0 can never be the bottommost level (i.e. if all files are in + // level 0, we will compact to level 1) + if (cfd->ioptions()->compaction_style == kCompactionStyleUniversal || + cfd->ioptions()->compaction_style == kCompactionStyleFIFO || + (level == max_level_with_files && level > 0)) { + s = RunManualCompaction(cfd, level, level, target_path_id, begin, end); + } else { + // TODO(sdong) Skip empty levels if possible. + s = RunManualCompaction(cfd, level, level + 1, target_path_id, begin, + end); + } + if (!s.ok()) { + break; + } } } + if (!s.ok()) { + LogFlush(db_options_.info_log); + return s; + } if (reduce_level) { s = ReFitLevel(cfd, max_level_with_files, target_level); @@ -1668,7 +1684,8 @@ SequenceNumber DBImpl::GetLatestSequenceNumber() const { Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level, int output_level, uint32_t output_path_id, const Slice* begin, const Slice* end) { - assert(input_level >= 0); + assert(input_level == ColumnFamilyData::kCompactAllLevels || + input_level >= 0); InternalKey begin_storage, end_storage; diff --git a/db/db_test.cc b/db/db_test.cc index f94e0ce21..96973aeb2 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -5250,7 +5250,7 @@ TEST_F(DBTest, IncreaseUniversalCompactionNumLevels) { int max_key3 = 800; // Stage 1: open a DB with universal compaction, num_levels=1 - Options options; + Options options = CurrentOptions(); options.compaction_style = kCompactionStyleUniversal; options.num_levels = 1; options.write_buffer_size = 100 << 10; // 100KB @@ -5272,7 +5272,6 @@ TEST_F(DBTest, IncreaseUniversalCompactionNumLevels) { ASSERT_EQ(non_level0_num_files, 0); // Stage 2: reopen with universal compaction, num_levels=4 - options = CurrentOptions(); options.compaction_style = kCompactionStyleUniversal; options.num_levels = 4; options = CurrentOptions(options); @@ -5302,7 +5301,6 @@ TEST_F(DBTest, IncreaseUniversalCompactionNumLevels) { // Need to restart it once to remove higher level records in manifest. ReopenWithColumnFamilies({"default", "pikachu"}, options); // Final reopen - options = CurrentOptions(); options.compaction_style = kCompactionStyleUniversal; options.num_levels = 1; options = CurrentOptions(options); @@ -5808,8 +5806,12 @@ TEST_F(DBTest, CompactionFilterWithValueChange) { // push all files to lower levels ASSERT_OK(Flush(1)); - dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1]); - dbfull()->TEST_CompactRange(1, nullptr, nullptr, handles_[1]); + if (option_config_ != kUniversalCompactionMultiLevel) { + dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1]); + dbfull()->TEST_CompactRange(1, nullptr, nullptr, handles_[1]); + } else { + dbfull()->CompactRange(handles_[1], nullptr, nullptr); + } // re-write all data again for (int i = 0; i < 100001; i++) { @@ -5821,8 +5823,12 @@ TEST_F(DBTest, CompactionFilterWithValueChange) { // push all files to lower levels. This should // invoke the compaction filter for all 100000 keys. ASSERT_OK(Flush(1)); - dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1]); - dbfull()->TEST_CompactRange(1, nullptr, nullptr, handles_[1]); + if (option_config_ != kUniversalCompactionMultiLevel) { + dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1]); + dbfull()->TEST_CompactRange(1, nullptr, nullptr, handles_[1]); + } else { + dbfull()->CompactRange(handles_[1], nullptr, nullptr); + } // verify that all keys now have the new value that // was set by the compaction process. @@ -7145,11 +7151,15 @@ TEST_F(DBTest, DropWrites) { env_->drop_writes_.store(true, std::memory_order_release); env_->sleep_counter_.Reset(); for (int i = 0; i < 5; i++) { - for (int level = 0; level < dbfull()->NumberLevels(); level++) { - if (level > 0 && level == dbfull()->NumberLevels() - 1) { - break; + if (option_config_ != kUniversalCompactionMultiLevel) { + for (int level = 0; level < dbfull()->NumberLevels(); level++) { + if (level > 0 && level == dbfull()->NumberLevels() - 1) { + break; + } + dbfull()->TEST_CompactRange(level, nullptr, nullptr); } - dbfull()->TEST_CompactRange(level, nullptr, nullptr); + } else { + dbfull()->CompactRange(nullptr, nullptr); } }