From b23bbaa82a21f0eb494933e2ce27a666e2ac9ae8 Mon Sep 17 00:00:00 2001 From: sdong Date: Mon, 30 Mar 2015 14:04:21 -0700 Subject: [PATCH] Universal Compactions with Small Files Summary: With this change, we use L1 and up to store compaction outputs in universal compaction. The compaction pick logic stays the same. Outputs are stored in the largest "level" as possible. If options.num_levels=1, it behaves all the same as now. Test Plan: 1) convert most of existing unit tests for universal comapaction to include the option of one level and multiple levels. 2) add a unit test to cover parallel compaction in universal compaction and run it in one level and multiple levels 3) add unit test to migrate from multiple level setting back to one level setting 4) add a unit test to insert keys to trigger multiple rounds of compactions and verify results. Reviewers: rven, kradhakrishnan, yhchiang, igor Reviewed By: igor Subscribers: meyering, leveldb, MarkCallaghan, dhruba Differential Revision: https://reviews.facebook.net/D34539 --- HISTORY.md | 1 + db/column_family.cc | 10 +- db/column_family_test.cc | 1 + db/compaction_picker.cc | 411 +++++++++++++++++++++---------- db/compaction_picker.h | 48 +++- db/db_impl.cc | 12 +- db/db_test.cc | 491 ++++++++++++++++++++++++++++++-------- db/version_set.cc | 63 +++-- db/version_set.h | 12 +- util/mutable_cf_options.h | 7 + util/options.cc | 7 +- 11 files changed, 804 insertions(+), 259 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index 403f06ea9..bf000606b 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -4,6 +4,7 @@ ### New Features * Added an experimental API for handling flashcache devices (blacklists background threads from caching their reads) -- NewFlashcacheAwareEnv +* If universal compaction is used and options.num_levels > 1, compact files are tried to be stored in none-L0 with smaller files based on options.target_file_size_base. 
The limitation of DB size when using universal compaction is greatly mitigated by using more levels. You can set num_levels = 1 to make universal compaction behave as before. If you set num_levels > 1 and want to roll back to a previous version, you need to compact all files to a big file in level 0 (by setting target_file_size_base to be large and CompactRange(, nullptr, nullptr, true, 0) and reopen the DB with the same version to rewrite the manifest, and then you can open it using previous releases. ## 3.10.0 (3/24/2015) ### New Features diff --git a/db/column_family.cc b/db/column_family.cc index 627dedcd4..2921ad87a 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -426,18 +426,18 @@ void ColumnFamilyData::RecalculateWriteStallConditions( "(waiting for flush), max_write_buffer_number is set to %d", name_.c_str(), imm()->size(), mutable_cf_options.max_write_buffer_number); - } else if (vstorage->NumLevelFiles(0) >= + } else if (vstorage->l0_delay_trigger_count() >= mutable_cf_options.level0_stop_writes_trigger) { write_controller_token_ = write_controller->GetStopToken(); internal_stats_->AddCFStats(InternalStats::LEVEL0_NUM_FILES, 1); Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log, "[%s] Stopping writes because we have %d level-0 files", - name_.c_str(), vstorage->NumLevelFiles(0)); + name_.c_str(), vstorage->l0_delay_trigger_count()); } else if (mutable_cf_options.level0_slowdown_writes_trigger >= 0 && - vstorage->NumLevelFiles(0) >= + vstorage->l0_delay_trigger_count() >= mutable_cf_options.level0_slowdown_writes_trigger) { uint64_t slowdown = - SlowdownAmount(vstorage->NumLevelFiles(0), + SlowdownAmount(vstorage->l0_delay_trigger_count(), mutable_cf_options.level0_slowdown_writes_trigger, mutable_cf_options.level0_stop_writes_trigger); write_controller_token_ = write_controller->GetDelayToken(slowdown); @@ -445,7 +445,7 @@ void ColumnFamilyData::RecalculateWriteStallConditions( Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log, "[%s] Stalling 
writes because we have %d level-0 files (%" PRIu64 "us)", - name_.c_str(), vstorage->NumLevelFiles(0), slowdown); + name_.c_str(), vstorage->l0_delay_trigger_count(), slowdown); } else if (mutable_cf_options.hard_rate_limit > 1.0 && score > mutable_cf_options.hard_rate_limit) { uint64_t kHardLimitSlowdown = 1000; diff --git a/db/column_family_test.cc b/db/column_family_test.cc index 1bbd2c2d8..c48180367 100644 --- a/db/column_family_test.cc +++ b/db/column_family_test.cc @@ -807,6 +807,7 @@ TEST_F(ColumnFamilyTest, DifferentCompactionStyles) { default_cf.table_factory.reset(NewBlockBasedTableFactory(table_options)); one.compaction_style = kCompactionStyleUniversal; + one.num_levels = 1; // trigger compaction if there are >= 4 files one.level0_file_num_compaction_trigger = 4; one.write_buffer_size = 100000; diff --git a/db/compaction_picker.cc b/db/compaction_picker.cc index a40a0a52f..ecbe89fad 100644 --- a/db/compaction_picker.cc +++ b/db/compaction_picker.cc @@ -902,6 +902,101 @@ bool UniversalCompactionPicker::NeedsCompaction( return vstorage->CompactionScore(kLevel0) >= 1; } +void UniversalCompactionPicker::SortedRun::Dump(char* out_buf, + size_t out_buf_size, + bool print_path) const { + if (level == 0) { + assert(file != nullptr); + if (file->fd.GetPathId() == 0 || !print_path) { + snprintf(out_buf, out_buf_size, "file %" PRIu64, file->fd.GetNumber()); + } else { + snprintf(out_buf, out_buf_size, "file %" PRIu64 + "(path " + "%" PRIu32 ")", + file->fd.GetNumber(), file->fd.GetPathId()); + } + } else { + snprintf(out_buf, out_buf_size, "level %d", level); + } +} + +void UniversalCompactionPicker::SortedRun::DumpSizeInfo( + char* out_buf, size_t out_buf_size, int sorted_run_count) const { + if (level == 0) { + assert(file != nullptr); + snprintf(out_buf, out_buf_size, + "file %" PRIu64 + "[%d] " + "with size %" PRIu64 " (compensated size %" PRIu64 ")", + file->fd.GetNumber(), sorted_run_count, file->fd.GetFileSize(), + file->compensated_file_size); + } else { + 
snprintf(out_buf, out_buf_size, + "level %d[%d] " + "with size %" PRIu64 " (compensated size %" PRIu64 ")", + level, sorted_run_count, size, compensated_file_size); + } +} + +std::vector +UniversalCompactionPicker::CalculateSortedRuns( + const VersionStorageInfo& vstorage) { + std::vector ret; + for (FileMetaData* f : vstorage.LevelFiles(0)) { + ret.emplace_back(0, f, f->fd.GetFileSize(), f->compensated_file_size, + f->being_compacted); + } + for (int level = 1; level < vstorage.num_levels(); level++) { + uint64_t total_compensated_size = 0U; + uint64_t total_size = 0U; + bool being_compacted = false; + bool is_first = true; + for (FileMetaData* f : vstorage.LevelFiles(level)) { + total_compensated_size += f->compensated_file_size; + total_size += f->fd.GetFileSize(); + // Compaction always includes all files for a non-zero level, so for a + // non-zero level, all the files should share the same being_compacted + // value. + assert(is_first || f->being_compacted == being_compacted); + if (is_first) { + being_compacted = f->being_compacted; + is_first = false; + } + } + if (total_compensated_size > 0) { + ret.emplace_back(level, nullptr, total_size, total_compensated_size, + being_compacted); + } + } + return ret; +} + +#ifndef NDEBUG +namespace { +// smallest_seqno and largest_seqno are set iff. `files` is not empty. +void GetSmallestLargestSeqno(const std::vector& files, + SequenceNumber* smallest_seqno, + SequenceNumber* largest_seqno) { + bool is_first = true; + for (FileMetaData* f : files) { + assert(f->smallest_seqno <= f->largest_seqno); + if (is_first) { + is_first = false; + *smallest_seqno = f->smallest_seqno; + *largest_seqno = f->largest_seqno; + } else { + if (f->smallest_seqno < *smallest_seqno) { + *smallest_seqno = f->smallest_seqno; + } + if (f->largest_seqno > *largest_seqno) { + *largest_seqno = f->largest_seqno; + } + } + } +} +} // namespace +#endif + // Universal style of compaction. Pick files that are contiguous in // time-range to compact. 
// @@ -910,22 +1005,23 @@ Compaction* UniversalCompactionPicker::PickCompaction( VersionStorageInfo* vstorage, LogBuffer* log_buffer) { const int kLevel0 = 0; double score = vstorage->CompactionScore(kLevel0); - const std::vector& level_files = vstorage->LevelFiles(kLevel0); + std::vector sorted_runs = CalculateSortedRuns(*vstorage); - if ((level_files.size() < - (unsigned int)mutable_cf_options.level0_file_num_compaction_trigger)) { + if (sorted_runs.size() < + (unsigned int)mutable_cf_options.level0_file_num_compaction_trigger) { LogToBuffer(log_buffer, "[%s] Universal: nothing to do\n", cf_name.c_str()); return nullptr; } - VersionStorageInfo::FileSummaryStorage tmp; - LogToBuffer(log_buffer, 3072, "[%s] Universal: candidate files(%zu): %s\n", - cf_name.c_str(), level_files.size(), - vstorage->LevelFileSummary(&tmp, kLevel0)); + VersionStorageInfo::LevelSummaryStorage tmp; + LogToBuffer(log_buffer, 3072, "[%s] Universal: sorted runs files(%zu): %s\n", + cf_name.c_str(), sorted_runs.size(), + vstorage->LevelSummary(&tmp)); // Check for size amplification first. Compaction* c; if ((c = PickCompactionUniversalSizeAmp(cf_name, mutable_cf_options, vstorage, - score, log_buffer)) != nullptr) { + score, sorted_runs, log_buffer)) != + nullptr) { LogToBuffer(log_buffer, "[%s] Universal: compacting for size amp\n", cf_name.c_str()); } else { @@ -933,9 +1029,9 @@ Compaction* UniversalCompactionPicker::PickCompaction( // amplification while maintaining file size ratios. 
unsigned int ratio = ioptions_.compaction_options_universal.size_ratio; - if ((c = PickCompactionUniversalReadAmp(cf_name, mutable_cf_options, - vstorage, score, ratio, UINT_MAX, - log_buffer)) != nullptr) { + if ((c = PickCompactionUniversalReadAmp( + cf_name, mutable_cf_options, vstorage, score, ratio, UINT_MAX, + sorted_runs, log_buffer)) != nullptr) { LogToBuffer(log_buffer, "[%s] Universal: compacting for size ratio\n", cf_name.c_str()); } else { @@ -944,11 +1040,11 @@ Compaction* UniversalCompactionPicker::PickCompaction( // compaction without looking at filesize ratios and try to reduce // the number of files to fewer than level0_file_num_compaction_trigger. unsigned int num_files = - static_cast(level_files.size()) - + static_cast(sorted_runs.size()) - mutable_cf_options.level0_file_num_compaction_trigger; if ((c = PickCompactionUniversalReadAmp( cf_name, mutable_cf_options, vstorage, score, UINT_MAX, - num_files, log_buffer)) != nullptr) { + num_files, sorted_runs, log_buffer)) != nullptr) { LogToBuffer(log_buffer, "[%s] Universal: compacting for file num -- %u\n", cf_name.c_str(), num_files); @@ -958,21 +1054,55 @@ Compaction* UniversalCompactionPicker::PickCompaction( if (c == nullptr) { return nullptr; } - assert(c->inputs_[kLevel0].size() > 1); - // validate that all the chosen files are non overlapping in time - FileMetaData* newerfile __attribute__((unused)) = nullptr; - for (unsigned int i = 0; i < c->inputs_[kLevel0].size(); i++) { - FileMetaData* f = c->inputs_[kLevel0][i]; - assert (f->smallest_seqno <= f->largest_seqno); - assert(newerfile == nullptr || - newerfile->smallest_seqno > f->largest_seqno); - newerfile = f; - } +// validate that all the chosen files of L0 are non overlapping in time +#ifndef NDEBUG + SequenceNumber prev_smallest_seqno = 0U; + bool is_first = true; - // Is the earliest file part of this compaction? 
- FileMetaData* last_file = level_files.back(); - c->bottommost_level_ = c->inputs_[kLevel0].files.back() == last_file; + size_t level_index = 0U; + if (c->start_level() == 0) { + for (unsigned int i = 0; i < c->inputs_[0].size(); i++) { + FileMetaData* f = c->inputs_[0][i]; + assert(f->smallest_seqno <= f->largest_seqno); + if (is_first) { + is_first = false; + } else { + assert(prev_smallest_seqno > f->largest_seqno); + } + prev_smallest_seqno = f->smallest_seqno; + } + level_index = 1U; + } + for (; level_index < c->num_input_levels(); level_index++) { + if (c->num_input_files(level_index) != 0) { + SequenceNumber smallest_seqno = 0U; + SequenceNumber largest_seqno = 0U; + GetSmallestLargestSeqno(*(c->inputs(level_index)), &smallest_seqno, + &largest_seqno); + if (is_first) { + is_first = false; + } else { + assert(prev_smallest_seqno > largest_seqno); + } + prev_smallest_seqno = smallest_seqno; + } + } +#endif + auto& last_sr = sorted_runs.back(); + if (c->output_level() < last_sr.level) { + // If it doesn't compact to the last level that has file, it's not the last + // bottom most. + c->bottommost_level_ = false; + } else if (last_sr.level == 0) { + // All files are level 0. Then the compaction is bottom most iff it includes + // the last file. + c->bottommost_level_ = c->inputs_[kLevel0].files.back() == last_sr.file; + } else { + // In this case, it's not level 0 only and the compaction includes the last + // level having files. It is bottom most. + c->bottommost_level_ = true; + } // update statistics MeasureTime(ioptions_.statistics, @@ -985,7 +1115,15 @@ Compaction* UniversalCompactionPicker::PickCompaction( // Record whether this compaction includes all sst files. // For now, it is only relevant in universal compaction mode. 
- c->is_full_compaction_ = (c->inputs_[kLevel0].size() == level_files.size()); + int num_files_in_compaction = 0; + int total_num_files = 0; + for (int level = 0; level < vstorage->num_levels(); level++) { + total_num_files += vstorage->NumLevelFiles(level); + } + for (size_t i = 0; i < c->num_input_levels(); i++) { + num_files_in_compaction += c->num_input_files(i); + } + c->is_full_compaction_ = (num_files_in_compaction == total_num_files); c->mutable_cf_options_ = mutable_cf_options; return c; @@ -1030,18 +1168,14 @@ uint32_t UniversalCompactionPicker::GetPathId( Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp( const std::string& cf_name, const MutableCFOptions& mutable_cf_options, VersionStorageInfo* vstorage, double score, unsigned int ratio, - unsigned int max_number_of_files_to_compact, LogBuffer* log_buffer) { - const int kLevel0 = 0; - + unsigned int max_number_of_files_to_compact, + const std::vector& sorted_runs, LogBuffer* log_buffer) { unsigned int min_merge_width = ioptions_.compaction_options_universal.min_merge_width; unsigned int max_merge_width = ioptions_.compaction_options_universal.max_merge_width; - // The files are sorted from newest first to oldest last. - const auto& files = vstorage->LevelFiles(kLevel0); - - FileMetaData* f = nullptr; + const SortedRun* sr = nullptr; bool done = false; int start_index = 0; unsigned int candidate_count = 0; @@ -1052,40 +1186,43 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp( // Considers a candidate file only if it is smaller than the // total size accumulated so far. 
- for (unsigned int loop = 0; loop < files.size(); loop++) { - + for (unsigned int loop = 0; loop < sorted_runs.size(); loop++) { candidate_count = 0; // Skip files that are already being compacted - for (f = nullptr; loop < files.size(); loop++) { - f = files[loop]; + for (sr = nullptr; loop < sorted_runs.size(); loop++) { + sr = &sorted_runs[loop]; - if (!f->being_compacted) { + if (!sr->being_compacted) { candidate_count = 1; break; } - LogToBuffer(log_buffer, "[%s] Universal: file %" PRIu64 - "[%d] being compacted, skipping", - cf_name.c_str(), f->fd.GetNumber(), loop); - f = nullptr; + char file_num_buf[kFormatFileNumberBufSize]; + sr->Dump(file_num_buf, sizeof(file_num_buf)); + LogToBuffer(log_buffer, + "[%s] Universal: %s" + "[%d] being compacted, skipping", + cf_name.c_str(), file_num_buf, loop); + + sr = nullptr; } // This file is not being compacted. Consider it as the // first candidate to be compacted. - uint64_t candidate_size = f != nullptr? f->compensated_file_size : 0; - if (f != nullptr) { + uint64_t candidate_size = sr != nullptr ? sr->compensated_file_size : 0; + if (sr != nullptr) { char file_num_buf[kFormatFileNumberBufSize]; - FormatFileNumber(f->fd.GetNumber(), f->fd.GetPathId(), file_num_buf, - sizeof(file_num_buf)); - LogToBuffer(log_buffer, "[%s] Universal: Possible candidate file %s[%d].", + sr->Dump(file_num_buf, sizeof(file_num_buf), true); + LogToBuffer(log_buffer, "[%s] Universal: Possible candidate %s[%d].", cf_name.c_str(), file_num_buf, loop); } // Check if the suceeding files need compaction. 
for (unsigned int i = loop + 1; - candidate_count < max_files_to_compact && i < files.size(); i++) { - FileMetaData* suceeding_file = files[i]; - if (suceeding_file->being_compacted) { + candidate_count < max_files_to_compact && i < sorted_runs.size(); + i++) { + const SortedRun* suceeding_sr = &sorted_runs[i]; + if (suceeding_sr->being_compacted) { break; } // Pick files if the total/last candidate file size (increased by the @@ -1095,14 +1232,14 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp( // kCompactionStopStyleSimilarSize, it's simply the size of the last // picked file. double sz = candidate_size * (100.0 + ratio) / 100.0; - if (sz < static_cast(suceeding_file->fd.GetFileSize())) { + if (sz < static_cast(suceeding_sr->size)) { break; } if (ioptions_.compaction_options_universal.stop_style == kCompactionStopStyleSimilarSize) { // Similar-size stopping rule: also check the last picked file isn't // far larger than the next candidate file. - sz = (suceeding_file->fd.GetFileSize() * (100.0 + ratio)) / 100.0; + sz = (suceeding_sr->size * (100.0 + ratio)) / 100.0; if (sz < static_cast(candidate_size)) { // If the small file we've encountered begins a run of similar-size // files, we'll pick them up on a future iteration of the outer @@ -1110,9 +1247,9 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp( // by the last-resort read amp strategy which disregards size ratios. 
break; } - candidate_size = suceeding_file->compensated_file_size; + candidate_size = suceeding_sr->compensated_file_size; } else { // default kCompactionStopStyleTotalSize - candidate_size += suceeding_file->compensated_file_size; + candidate_size += suceeding_sr->compensated_file_size; } candidate_count++; } @@ -1124,15 +1261,12 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp( break; } else { for (unsigned int i = loop; - i < loop + candidate_count && i < files.size(); i++) { - FileMetaData* skipping_file = files[i]; - LogToBuffer(log_buffer, "[%s] Universal: Skipping file %" PRIu64 - "[%d] with size %" PRIu64 - " (compensated size %" PRIu64 ") %d\n", - cf_name.c_str(), f->fd.GetNumber(), i, - skipping_file->fd.GetFileSize(), - skipping_file->compensated_file_size, - skipping_file->being_compacted); + i < loop + candidate_count && i < sorted_runs.size(); i++) { + const SortedRun* skipping_sr = &sorted_runs[i]; + char file_num_buf[256]; + skipping_sr->DumpSizeInfo(file_num_buf, sizeof(file_num_buf), loop); + LogToBuffer(log_buffer, "[%s] Universal: Skipping %s", cf_name.c_str(), + file_num_buf); } } } @@ -1146,10 +1280,14 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp( int ratio_to_compress = ioptions_.compaction_options_universal.compression_size_percent; if (ratio_to_compress >= 0) { - uint64_t total_size = vstorage->NumLevelBytes(kLevel0); + uint64_t total_size = 0; + for (auto& sorted_run : sorted_runs) { + total_size += sorted_run.compensated_file_size; + } + uint64_t older_file_size = 0; - for (size_t i = files.size() - 1; i >= first_index_after; i--) { - older_file_size += files[i]->fd.GetFileSize(); + for (size_t i = sorted_runs.size() - 1; i >= first_index_after; i--) { + older_file_size += sorted_runs[i].size; if (older_file_size * 100L >= total_size * (long) ratio_to_compress) { enable_compression = false; break; @@ -1159,28 +1297,40 @@ Compaction* 
UniversalCompactionPicker::PickCompactionUniversalReadAmp( uint64_t estimated_total_size = 0; for (unsigned int i = 0; i < first_index_after; i++) { - estimated_total_size += files[i]->fd.GetFileSize(); + estimated_total_size += sorted_runs[i].size; } uint32_t path_id = GetPathId(ioptions_, estimated_total_size); + int start_level = sorted_runs[start_index].level; + int output_level; + if (first_index_after == sorted_runs.size()) { + output_level = vstorage->num_levels() - 1; + } else if (sorted_runs[first_index_after].level == 0) { + output_level = 0; + } else { + output_level = sorted_runs[first_index_after].level - 1; + } Compaction* c = new Compaction( - vstorage->num_levels(), kLevel0, kLevel0, - mutable_cf_options.MaxFileSizeForLevel(kLevel0), LLONG_MAX, path_id, - GetCompressionType(ioptions_, kLevel0, 1, enable_compression)); + vstorage->num_levels(), start_level, output_level, + mutable_cf_options.MaxFileSizeForLevel(output_level), LLONG_MAX, path_id, + GetCompressionType(ioptions_, start_level, 1, enable_compression)); c->score_ = score; for (unsigned int i = start_index; i < first_index_after; i++) { - FileMetaData* picking_file = files[i]; - c->inputs_[0].files.push_back(picking_file); - char file_num_buf[kFormatFileNumberBufSize]; - FormatFileNumber(picking_file->fd.GetNumber(), picking_file->fd.GetPathId(), - file_num_buf, sizeof(file_num_buf)); - LogToBuffer(log_buffer, - "[%s] Universal: Picking file %s[%d] " - "with size %" PRIu64 " (compensated size %" PRIu64 ")\n", - cf_name.c_str(), file_num_buf, i, - picking_file->fd.GetFileSize(), - picking_file->compensated_file_size); + auto& picking_sr = sorted_runs[i]; + if (picking_sr.level == 0) { + FileMetaData* picking_file = picking_sr.file; + c->inputs_[0].files.push_back(picking_file); + } else { + auto& files = c->inputs_[picking_sr.level - start_level].files; + for (auto* f : vstorage->LevelFiles(picking_sr.level)) { + files.push_back(f); + } + } + char file_num_buf[256]; + 
picking_sr.DumpSizeInfo(file_num_buf, sizeof(file_num_buf), i); + LogToBuffer(log_buffer, "[%s] Universal: Picking %s", cf_name.c_str(), + file_num_buf); } return c; } @@ -1193,61 +1343,56 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp( // Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp( const std::string& cf_name, const MutableCFOptions& mutable_cf_options, - VersionStorageInfo* vstorage, double score, LogBuffer* log_buffer) { - const int kLevel = 0; - + VersionStorageInfo* vstorage, double score, + const std::vector& sorted_runs, LogBuffer* log_buffer) { // percentage flexibilty while reducing size amplification uint64_t ratio = ioptions_.compaction_options_universal. max_size_amplification_percent; - // The files are sorted from newest first to oldest last. - const auto& files = vstorage->LevelFiles(kLevel); - unsigned int candidate_count = 0; uint64_t candidate_size = 0; unsigned int start_index = 0; - FileMetaData* f = nullptr; + const SortedRun* sr = nullptr; // Skip files that are already being compacted - for (unsigned int loop = 0; loop < files.size() - 1; loop++) { - f = files[loop]; - if (!f->being_compacted) { + for (unsigned int loop = 0; loop < sorted_runs.size() - 1; loop++) { + sr = &sorted_runs[loop]; + if (!sr->being_compacted) { start_index = loop; // Consider this as the first candidate. 
break; } char file_num_buf[kFormatFileNumberBufSize]; - FormatFileNumber(f->fd.GetNumber(), f->fd.GetPathId(), file_num_buf, - sizeof(file_num_buf)); - LogToBuffer(log_buffer, "[%s] Universal: skipping file %s[%d] compacted %s", + sr->Dump(file_num_buf, sizeof(file_num_buf), true); + LogToBuffer(log_buffer, "[%s] Universal: skipping %s[%d] compacted %s", cf_name.c_str(), file_num_buf, loop, " cannot be a candidate to reduce size amp.\n"); - f = nullptr; + sr = nullptr; } - if (f == nullptr) { + if (sr == nullptr) { return nullptr; // no candidate files } - - char file_num_buf[kFormatFileNumberBufSize]; - FormatFileNumber(f->fd.GetNumber(), f->fd.GetPathId(), file_num_buf, - sizeof(file_num_buf)); - LogToBuffer(log_buffer, "[%s] Universal: First candidate file %s[%d] %s", - cf_name.c_str(), file_num_buf, start_index, - " to reduce size amp.\n"); + { + char file_num_buf[kFormatFileNumberBufSize]; + sr->Dump(file_num_buf, sizeof(file_num_buf), true); + LogToBuffer(log_buffer, "[%s] Universal: First candidate %s[%d] %s", + cf_name.c_str(), file_num_buf, start_index, + " to reduce size amp.\n"); + } // keep adding up all the remaining files - for (unsigned int loop = start_index; loop < files.size() - 1; loop++) { - f = files[loop]; - if (f->being_compacted) { - FormatFileNumber(f->fd.GetNumber(), f->fd.GetPathId(), file_num_buf, - sizeof(file_num_buf)); + for (unsigned int loop = start_index; loop < sorted_runs.size() - 1; loop++) { + sr = &sorted_runs[loop]; + if (sr->being_compacted) { + char file_num_buf[kFormatFileNumberBufSize]; + sr->Dump(file_num_buf, sizeof(file_num_buf), true); LogToBuffer( - log_buffer, "[%s] Universal: Possible candidate file %s[%d] %s.", - cf_name.c_str(), file_num_buf, loop, + log_buffer, "[%s] Universal: Possible candidate %s[%d] %s", + cf_name.c_str(), file_num_buf, start_index, " is already being compacted. 
No size amp reduction possible.\n"); return nullptr; } - candidate_size += f->compensated_file_size; + candidate_size += sr->compensated_file_size; candidate_count++; } if (candidate_count == 0) { @@ -1255,7 +1400,7 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp( } // size of earliest file - uint64_t earliest_file_size = files.back()->fd.GetFileSize(); + uint64_t earliest_file_size = sorted_runs.back().size; // size amplification = percentage of additional size if (candidate_size * 100 < ratio * earliest_file_size) { @@ -1272,31 +1417,39 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp( "earliest-file-size %" PRIu64, cf_name.c_str(), candidate_size, earliest_file_size); } - assert(start_index < files.size() - 1); + assert(start_index < sorted_runs.size() - 1); // Estimate total file size uint64_t estimated_total_size = 0; - for (unsigned int loop = start_index; loop < files.size(); loop++) { - estimated_total_size += files[loop]->fd.GetFileSize(); + for (unsigned int loop = start_index; loop < sorted_runs.size(); loop++) { + estimated_total_size += sorted_runs[loop].size; } uint32_t path_id = GetPathId(ioptions_, estimated_total_size); + int start_level = sorted_runs[start_index].level; // create a compaction request // We always compact all the files, so always compress. 
- Compaction* c = - new Compaction(vstorage->num_levels(), kLevel, kLevel, - mutable_cf_options.MaxFileSizeForLevel(kLevel), LLONG_MAX, - path_id, GetCompressionType(ioptions_, kLevel, 1)); + Compaction* c = new Compaction( + vstorage->num_levels(), start_level, vstorage->num_levels() - 1, + mutable_cf_options.MaxFileSizeForLevel(vstorage->num_levels() - 1), + LLONG_MAX, path_id, + GetCompressionType(ioptions_, vstorage->num_levels() - 1, 1)); c->score_ = score; - for (unsigned int loop = start_index; loop < files.size(); loop++) { - f = files[loop]; - c->inputs_[0].files.push_back(f); - LogToBuffer(log_buffer, - "[%s] Universal: size amp picking file %" PRIu64 - "[%d] " - "with size %" PRIu64 " (compensated size %" PRIu64 ")", - cf_name.c_str(), f->fd.GetNumber(), loop, f->fd.GetFileSize(), - f->compensated_file_size); + for (unsigned int loop = start_index; loop < sorted_runs.size(); loop++) { + auto& picking_sr = sorted_runs[loop]; + if (picking_sr.level == 0) { + FileMetaData* f = picking_sr.file; + c->inputs_[0].files.push_back(f); + } else { + auto& files = c->inputs_[picking_sr.level - start_level].files; + for (auto* f : vstorage->LevelFiles(picking_sr.level)) { + files.push_back(f); + } + } + char file_num_buf[256]; + sr->DumpSizeInfo(file_num_buf, sizeof(file_num_buf), loop); + LogToBuffer(log_buffer, "[%s] Universal: size amp picking %s", + cf_name.c_str(), file_num_buf); } return c; } diff --git a/db/compaction_picker.h b/db/compaction_picker.h index 34b5b5bf1..d7f0d69c6 100644 --- a/db/compaction_picker.h +++ b/db/compaction_picker.h @@ -210,30 +210,62 @@ class UniversalCompactionPicker : public CompactionPicker { VersionStorageInfo* vstorage, LogBuffer* log_buffer) override; - // The maxinum allowed input level. Always returns 0. virtual int MaxInputLevel(int current_num_levels) const override { - return 0; + return NumberLevels() - 2; } - // The maximum allowed output level. Always returns 0. 
- virtual int MaxOutputLevel() const override { - return 0; - } + virtual int MaxOutputLevel() const override { return NumberLevels() - 1; } virtual bool NeedsCompaction(const VersionStorageInfo* vstorage) const override; private: + struct SortedRun { + SortedRun(int _level, FileMetaData* _file, uint64_t _size, + uint64_t _compensated_file_size, bool _being_compacted) + : level(_level), + file(_file), + size(_size), + compensated_file_size(_compensated_file_size), + being_compacted(_being_compacted) { + assert(compensated_file_size > 0); + assert(level != 0 || file != nullptr); + } + + void Dump(char* out_buf, size_t out_buf_size, + bool print_path = false) const; + + // sorted_run_count is added into the string to print + void DumpSizeInfo(char* out_buf, size_t out_buf_size, + int sorted_run_count) const; + + int level; + // `file` Will be null for level > 0. For level = 0, the sorted run is + // for this file. + FileMetaData* file; + // For level > 0, `size` and `compensated_file_size` are sum of sizes all + // files in the level. `being_compacted` should be the same for all files + // in a non-zero level. Use the value here. + uint64_t size; + uint64_t compensated_file_size; + bool being_compacted; + }; + // Pick Universal compaction to limit read amplification Compaction* PickCompactionUniversalReadAmp( const std::string& cf_name, const MutableCFOptions& mutable_cf_options, VersionStorageInfo* vstorage, double score, unsigned int ratio, - unsigned int num_files, LogBuffer* log_buffer); + unsigned int num_files, const std::vector& sorted_runs, + LogBuffer* log_buffer); // Pick Universal compaction to limit space amplification. 
Compaction* PickCompactionUniversalSizeAmp( const std::string& cf_name, const MutableCFOptions& mutable_cf_options, - VersionStorageInfo* vstorage, double score, LogBuffer* log_buffer); + VersionStorageInfo* vstorage, double score, + const std::vector& sorted_runs, LogBuffer* log_buffer); + + static std::vector CalculateSortedRuns( + const VersionStorageInfo& vstorage); // Pick a path ID to place a newly generated file, with its estimated file // size. diff --git a/db/db_impl.cc b/db/db_impl.cc index 74ab6657b..a37b660c1 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1284,6 +1284,7 @@ Status DBImpl::CompactRange(ColumnFamilyHandle* column_family, (level == max_level_with_files && level > 0)) { s = RunManualCompaction(cfd, level, level, target_path_id, begin, end); } else { + // TODO(sdong) Skip empty levels if possible. s = RunManualCompaction(cfd, level, level + 1, target_path_id, begin, end); } @@ -2370,7 +2371,9 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context, // We only compacted part of the requested range. Update *m // to the range that is left to be compacted. 
// Universal and FIFO compactions should always compact the whole range - assert(m->cfd->ioptions()->compaction_style != kCompactionStyleUniversal); + assert(m->cfd->ioptions()->compaction_style != + kCompactionStyleUniversal || + m->cfd->ioptions()->num_levels > 1); assert(m->cfd->ioptions()->compaction_style != kCompactionStyleFIFO); m->tmp_storage = *manual_end; m->begin = &m->tmp_storage; @@ -3303,6 +3306,7 @@ Status DBImpl::DelayWrite(uint64_t expiration_time) { mutex_.Unlock(); delayed = true; // hopefully we don't have to sleep more than 2 billion microseconds + TEST_SYNC_POINT("DBImpl::DelayWrite:Sleep"); env_->SleepForMicroseconds(static_cast<int>(delay)); mutex_.Lock(); } @@ -3310,6 +3314,7 @@ Status DBImpl::DelayWrite(uint64_t expiration_time) { while (bg_error_.ok() && write_controller_.IsStopped()) { delayed = true; if (has_timeout) { + TEST_SYNC_POINT("DBImpl::DelayWrite:TimedWait"); bg_cv_.TimedWait(expiration_time); if (env_->NowMicros() > expiration_time) { timed_out = true; @@ -3930,15 +3935,14 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname, if (s.ok()) { for (auto cfd : *impl->versions_->GetColumnFamilySet()) { - if (cfd->ioptions()->compaction_style == kCompactionStyleUniversal || - cfd->ioptions()->compaction_style == kCompactionStyleFIFO) { + if (cfd->ioptions()->compaction_style == kCompactionStyleFIFO) { auto* vstorage = cfd->current()->storage_info(); for (int i = 1; i < vstorage->num_levels(); ++i) { int num_files = vstorage->NumLevelFiles(i); if (num_files > 0) { s = Status::InvalidArgument( "Not all files are at level 0. 
Cannot " - "open with universal or FIFO compaction style."); + "open with FIFO compaction style."); break; } } diff --git a/db/db_test.cc b/db/db_test.cc index 3e3b74c91..b8a067469 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -426,12 +426,13 @@ class DBTest : public testing::Test { kDeletesFilterFirst = 19, kHashSkipList = 20, kUniversalCompaction = 21, - kCompressedBlockCache = 22, - kInfiniteMaxOpenFiles = 23, - kxxHashChecksum = 24, - kFIFOCompaction = 25, - kOptimizeFiltersForHits = 26, - kEnd = 27 + kUniversalCompactionMultiLevel = 22, + kCompressedBlockCache = 23, + kInfiniteMaxOpenFiles = 24, + kxxHashChecksum = 25, + kFIFOCompaction = 26, + kOptimizeFiltersForHits = 27, + kEnd = 28 }; int option_config_; @@ -499,7 +500,8 @@ class DBTest : public testing::Test { continue; } if ((skip_mask & kSkipUniversalCompaction) && - option_config_ == kUniversalCompaction) { + (option_config_ == kUniversalCompaction || + option_config_ == kUniversalCompactionMultiLevel)) { continue; } if ((skip_mask & kSkipMergePut) && option_config_ == kMergePut) { @@ -555,6 +557,13 @@ class DBTest : public testing::Test { options.create_if_missing = true; TryReopen(options); return true; + } else if (option_config_ == kUniversalCompaction) { + option_config_ = kUniversalCompactionMultiLevel; + Destroy(last_options_); + auto options = CurrentOptions(); + options.create_if_missing = true; + TryReopen(options); + return true; } else { return false; } @@ -674,6 +683,11 @@ class DBTest : public testing::Test { break; case kUniversalCompaction: options.compaction_style = kCompactionStyleUniversal; + options.num_levels = 1; + break; + case kUniversalCompactionMultiLevel: + options.compaction_style = kCompactionStyleUniversal; + options.num_levels = 8; break; case kCompressedBlockCache: options.allow_mmap_writes = true; @@ -964,6 +978,32 @@ class DBTest : public testing::Test { return result; } + int NumSortedRuns(int cf = 0) { + ColumnFamilyMetaData cf_meta; + if (cf == 0) { + 
db_->GetColumnFamilyMetaData(&cf_meta); + } else { + db_->GetColumnFamilyMetaData(handles_[cf], &cf_meta); + } + int num_sr = static_cast<int>(cf_meta.levels[0].files.size()); + for (size_t i = 1U; i < cf_meta.levels.size(); i++) { + if (cf_meta.levels[i].files.size() > 0) { + num_sr++; + } + } + return num_sr; + } + + uint64_t TotalSize(int cf = 0) { + ColumnFamilyMetaData cf_meta; + if (cf == 0) { + db_->GetColumnFamilyMetaData(&cf_meta); + } else { + db_->GetColumnFamilyMetaData(handles_[cf], &cf_meta); + } + return cf_meta.size; + } + int NumTableFilesAtLevel(int level, int cf = 0) { std::string property; if (cf == 0) { @@ -990,6 +1030,20 @@ return sum; } + int TotalLiveFiles(int cf = 0) { + ColumnFamilyMetaData cf_meta; + if (cf == 0) { + db_->GetColumnFamilyMetaData(&cf_meta); + } else { + db_->GetColumnFamilyMetaData(handles_[cf], &cf_meta); + } + int num_files = 0; + for (auto& level : cf_meta.levels) { + num_files += level.files.size(); + } + return num_files; + } + int TotalTableFiles(int cf = 0, int levels = -1) { if (levels == -1) { levels = CurrentOptions().num_levels; @@ -3886,14 +3940,26 @@ class ChangeFilterFactory : public CompactionFilterFactory { virtual const char* Name() const override { return "ChangeFilterFactory"; } }; +class DBTestUniversalCompactionBase + : public DBTest, + public ::testing::WithParamInterface<int> { + public: + virtual void SetUp() override { num_levels_ = GetParam(); } + int num_levels_; +}; + +class DBTestUniversalCompaction : public DBTestUniversalCompactionBase {}; + // TODO(kailiu) The tests on UniversalCompaction has some issues: // 1. A lot of magic numbers ("11" or "12"). // 2. Made assumption on the memtable flush conditions, which may change from // time to time. 
-TEST_F(DBTest, UniversalCompactionTrigger) { +TEST_P(DBTestUniversalCompaction, UniversalCompactionTrigger) { Options options; options.compaction_style = kCompactionStyleUniversal; - options.write_buffer_size = 100<<10; //100KB + options.num_levels = num_levels_; + options.write_buffer_size = 100 << 10; // 100KB + options.target_file_size_base = 32 << 10; // 32KB // trigger compaction if there are >= 4 files options.level0_file_num_compaction_trigger = 4; KeepFilterFactory* filter = new KeepFilterFactory(true); @@ -3901,6 +3967,7 @@ TEST_F(DBTest, UniversalCompactionTrigger) { options.compaction_filter_factory.reset(filter); options = CurrentOptions(options); + DestroyAndReopen(options); CreateAndReopenWithCF({"pikachu"}, options); Random rnd(301); @@ -3918,7 +3985,7 @@ TEST_F(DBTest, UniversalCompactionTrigger) { key_idx++; } dbfull()->TEST_WaitForFlushMemTable(handles_[1]); - ASSERT_EQ(NumTableFilesAtLevel(0, 1), num + 1); + ASSERT_EQ(NumSortedRuns(1), num + 1); } // Generate one more file at level-0, which should trigger level-0 @@ -3931,10 +3998,7 @@ TEST_F(DBTest, UniversalCompactionTrigger) { // Suppose each file flushed from mem table has size 1. Now we compact // (level0_file_num_compaction_trigger+1)=4 files and should have a big // file of size 4. - ASSERT_EQ(NumTableFilesAtLevel(0, 1), 1); - for (int i = 1; i < options.num_levels ; i++) { - ASSERT_EQ(NumTableFilesAtLevel(i, 1), 0); - } + ASSERT_EQ(NumSortedRuns(1), 1); // Stage 2: // Now we have one file at level 0, with size 4. 
We also have some data in @@ -3953,7 +4017,7 @@ TEST_F(DBTest, UniversalCompactionTrigger) { key_idx++; } dbfull()->TEST_WaitForFlushMemTable(handles_[1]); - ASSERT_EQ(NumTableFilesAtLevel(0, 1), num + 3); + ASSERT_EQ(NumSortedRuns(1), num + 3); } // Generate one more file at level-0, which should trigger level-0 @@ -3965,10 +4029,7 @@ TEST_F(DBTest, UniversalCompactionTrigger) { dbfull()->TEST_WaitForCompact(); // Before compaction, we have 4 files at level 0, with size 4, 0.4, 1, 1. // After compaction, we should have 2 files, with size 4, 2.4. - ASSERT_EQ(NumTableFilesAtLevel(0, 1), 2); - for (int i = 1; i < options.num_levels ; i++) { - ASSERT_EQ(NumTableFilesAtLevel(i, 1), 0); - } + ASSERT_EQ(NumSortedRuns(1), 2); // Stage 3: // Now we have 2 files at level 0, with size 4 and 2.4. Continue @@ -3981,7 +4042,7 @@ TEST_F(DBTest, UniversalCompactionTrigger) { key_idx++; } dbfull()->TEST_WaitForFlushMemTable(handles_[1]); - ASSERT_EQ(NumTableFilesAtLevel(0, 1), num + 3); + ASSERT_EQ(NumSortedRuns(1), num + 3); } // Generate one more file at level-0, which should trigger level-0 @@ -3993,10 +4054,7 @@ TEST_F(DBTest, UniversalCompactionTrigger) { dbfull()->TEST_WaitForCompact(); // Before compaction, we have 4 files at level 0, with size 4, 2.4, 1, 1. // After compaction, we should have 3 files, with size 4, 2.4, 2. - ASSERT_EQ(NumTableFilesAtLevel(0, 1), 3); - for (int i = 1; i < options.num_levels ; i++) { - ASSERT_EQ(NumTableFilesAtLevel(i, 1), 0); - } + ASSERT_EQ(NumSortedRuns(1), 3); // Stage 4: // Now we have 3 files at level 0, with size 4, 2.4, 2. Let's generate a @@ -4007,10 +4065,7 @@ TEST_F(DBTest, UniversalCompactionTrigger) { } dbfull()->TEST_WaitForCompact(); // Level-0 compaction is triggered, but no file will be picked up. 
- ASSERT_EQ(NumTableFilesAtLevel(0, 1), 4); - for (int i = 1; i < options.num_levels ; i++) { - ASSERT_EQ(NumTableFilesAtLevel(i, 1), 0); - } + ASSERT_EQ(NumSortedRuns(1), 4); // Stage 5: // Now we have 4 files at level 0, with size 4, 2.4, 2, 1. Let's generate @@ -4022,18 +4077,18 @@ TEST_F(DBTest, UniversalCompactionTrigger) { } dbfull()->TEST_WaitForCompact(); // All files at level 0 will be compacted into a single one. - ASSERT_EQ(NumTableFilesAtLevel(0, 1), 1); - for (int i = 1; i < options.num_levels ; i++) { - ASSERT_EQ(NumTableFilesAtLevel(i, 1), 0); - } + ASSERT_EQ(NumSortedRuns(1), 1); } -TEST_F(DBTest, UniversalCompactionSizeAmplification) { +TEST_P(DBTestUniversalCompaction, UniversalCompactionSizeAmplification) { Options options; options.compaction_style = kCompactionStyleUniversal; - options.write_buffer_size = 100<<10; //100KB + options.num_levels = num_levels_; + options.write_buffer_size = 100 << 10; // 100KB + options.target_file_size_base = 32 << 10; // 32KB options.level0_file_num_compaction_trigger = 3; options = CurrentOptions(options); + DestroyAndReopen(options); CreateAndReopenWithCF({"pikachu"}, options); // Trigger compaction if size amplification exceeds 110% @@ -4053,9 +4108,9 @@ TEST_F(DBTest, UniversalCompactionSizeAmplification) { key_idx++; } dbfull()->TEST_WaitForFlushMemTable(handles_[1]); - ASSERT_EQ(NumTableFilesAtLevel(0, 1), num + 1); + ASSERT_EQ(NumSortedRuns(1), num + 1); } - ASSERT_EQ(NumTableFilesAtLevel(0, 1), 2); + ASSERT_EQ(NumSortedRuns(1), 2); // Flush whatever is remaining in memtable. 
This is typically // small, which should not trigger size ratio based compaction @@ -4065,17 +4120,123 @@ TEST_F(DBTest, UniversalCompactionSizeAmplification) { dbfull()->TEST_WaitForCompact(); // Verify that size amplification did occur - ASSERT_EQ(NumTableFilesAtLevel(0, 1), 1); + ASSERT_EQ(NumSortedRuns(1), 1); } -TEST_F(DBTest, UniversalCompactionOptions) { +class DBTestUniversalCompactionMultiLevels + : public DBTestUniversalCompactionBase {}; + +TEST_P(DBTestUniversalCompactionMultiLevels, UniversalCompactionMultiLevels) { Options options; options.compaction_style = kCompactionStyleUniversal; - options.write_buffer_size = 100<<10; //100KB + options.num_levels = num_levels_; + options.write_buffer_size = 100 << 10; // 100KB + options.level0_file_num_compaction_trigger = 8; + options.max_background_compactions = 3; + options.target_file_size_base = 32 * 1024; + options = CurrentOptions(options); + CreateAndReopenWithCF({"pikachu"}, options); + + // Trigger compaction if size amplification exceeds 110% + options.compaction_options_universal.max_size_amplification_percent = 110; + options = CurrentOptions(options); + ReopenWithColumnFamilies({"default", "pikachu"}, options); + + Random rnd(301); + int num_keys = 100000; + for (int i = 0; i < num_keys * 2; i++) { + ASSERT_OK(Put(1, Key(i % num_keys), Key(i))); + } + + dbfull()->TEST_WaitForCompact(); + + for (int i = num_keys; i < num_keys * 2; i++) { + ASSERT_EQ(Get(1, Key(i % num_keys)), Key(i)); + } +} + +INSTANTIATE_TEST_CASE_P(DBTestUniversalCompactionMultiLevels, + DBTestUniversalCompactionMultiLevels, + ::testing::Values(3, 20)); + +class DBTestUniversalCompactionParallel : public DBTestUniversalCompactionBase { +}; + +TEST_P(DBTestUniversalCompactionParallel, UniversalCompactionParallel) { + Options options; + options.compaction_style = kCompactionStyleUniversal; + options.num_levels = num_levels_; + options.write_buffer_size = 1 << 10; // 1KB + options.level0_file_num_compaction_trigger = 3; + 
options.max_background_compactions = 3; + options.max_background_flushes = 3; + options.target_file_size_base = 1 * 1024; + options.compaction_options_universal.max_size_amplification_percent = 110; + options = CurrentOptions(options); + DestroyAndReopen(options); + CreateAndReopenWithCF({"pikachu"}, options); + + // Delay every compaction so multiple compactions will happen. + std::atomic<int> num_compactions_running(0); + std::atomic<bool> has_parallel(false); + rocksdb::SyncPoint::GetInstance()->SetCallBack("CompactionJob::Run():Start", + [&]() { + if (num_compactions_running.fetch_add(1) > 0) { + has_parallel.store(true); + return; + } + for (int nwait = 0; nwait < 20000; nwait++) { + if (has_parallel.load() || num_compactions_running.load() > 1) { + has_parallel.store(true); + break; + } + env_->SleepForMicroseconds(1000); + } + }); + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "CompactionJob::Run():End", + [&]() { num_compactions_running.fetch_add(-1); }); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + + options = CurrentOptions(options); + ReopenWithColumnFamilies({"default", "pikachu"}, options); + + Random rnd(301); + int num_keys = 30000; + for (int i = 0; i < num_keys * 2; i++) { + ASSERT_OK(Put(1, Key(i % num_keys), Key(i))); + } + dbfull()->TEST_WaitForCompact(); + + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); + ASSERT_EQ(num_compactions_running.load(), 0); + ASSERT_TRUE(has_parallel.load()); + + for (int i = num_keys; i < num_keys * 2; i++) { + ASSERT_EQ(Get(1, Key(i % num_keys)), Key(i)); + } + + // Reopen and check. 
+ ReopenWithColumnFamilies({"default", "pikachu"}, options); + for (int i = num_keys; i < num_keys * 2; i++) { + ASSERT_EQ(Get(1, Key(i % num_keys)), Key(i)); + } +} + +INSTANTIATE_TEST_CASE_P(DBTestUniversalCompactionParallel, + DBTestUniversalCompactionParallel, + ::testing::Values(1, 10)); + +TEST_P(DBTestUniversalCompaction, UniversalCompactionOptions) { + Options options; + options.compaction_style = kCompactionStyleUniversal; + options.write_buffer_size = 100 << 10; // 100KB + options.target_file_size_base = 32 << 10; // 32KB options.level0_file_num_compaction_trigger = 4; - options.num_levels = 1; + options.num_levels = num_levels_; options.compaction_options_universal.compression_size_percent = -1; options = CurrentOptions(options); + DestroyAndReopen(options); CreateAndReopenWithCF({"pikachu"}, options); Random rnd(301); @@ -4090,27 +4251,26 @@ TEST_F(DBTest, UniversalCompactionOptions) { dbfull()->TEST_WaitForFlushMemTable(handles_[1]); if (num < options.level0_file_num_compaction_trigger - 1) { - ASSERT_EQ(NumTableFilesAtLevel(0, 1), num + 1); + ASSERT_EQ(NumSortedRuns(1), num + 1); } } dbfull()->TEST_WaitForCompact(); - ASSERT_EQ(NumTableFilesAtLevel(0, 1), 1); - for (int i = 1; i < options.num_levels ; i++) { - ASSERT_EQ(NumTableFilesAtLevel(i, 1), 0); - } + ASSERT_EQ(NumSortedRuns(1), 1); } -TEST_F(DBTest, UniversalCompactionStopStyleSimilarSize) { +TEST_P(DBTestUniversalCompaction, UniversalCompactionStopStyleSimilarSize) { Options options = CurrentOptions(); options.compaction_style = kCompactionStyleUniversal; - options.write_buffer_size = 100<<10; //100KB + options.write_buffer_size = 100 << 10; // 100KB + options.target_file_size_base = 32 << 10; // 32KB // trigger compaction if there are >= 4 files options.level0_file_num_compaction_trigger = 4; options.compaction_options_universal.size_ratio = 10; - options.compaction_options_universal.stop_style = kCompactionStopStyleSimilarSize; - options.num_levels=1; - Reopen(options); + 
options.compaction_options_universal.stop_style = + kCompactionStopStyleSimilarSize; + options.num_levels = num_levels_; + DestroyAndReopen(options); Random rnd(301); int key_idx = 0; @@ -4118,8 +4278,7 @@ TEST_F(DBTest, UniversalCompactionStopStyleSimilarSize) { // Stage 1: // Generate a set of files at level 0, but don't trigger level-0 // compaction. - for (int num = 0; - num < options.level0_file_num_compaction_trigger-1; + for (int num = 0; num < options.level0_file_num_compaction_trigger - 1; num++) { // Write 110KB (11 values, each 10K) for (int i = 0; i < 11; i++) { @@ -4127,7 +4286,7 @@ TEST_F(DBTest, UniversalCompactionStopStyleSimilarSize) { key_idx++; } dbfull()->TEST_WaitForFlushMemTable(); - ASSERT_EQ(NumTableFilesAtLevel(0), num + 1); + ASSERT_EQ(NumSortedRuns(), num + 1); } // Generate one more file at level-0, which should trigger level-0 @@ -4140,7 +4299,7 @@ TEST_F(DBTest, UniversalCompactionStopStyleSimilarSize) { // Suppose each file flushed from mem table has size 1. Now we compact // (level0_file_num_compaction_trigger+1)=4 files and should have a big // file of size 4. - ASSERT_EQ(NumTableFilesAtLevel(0), 1); + ASSERT_EQ(NumSortedRuns(), 1); // Stage 2: // Now we have one file at level 0, with size 4. We also have some data in @@ -4150,8 +4309,7 @@ TEST_F(DBTest, UniversalCompactionStopStyleSimilarSize) { // a level-0 file, with size around 0.4 (according to previously written // data amount). 
dbfull()->Flush(FlushOptions()); - for (int num = 0; - num < options.level0_file_num_compaction_trigger-3; + for (int num = 0; num < options.level0_file_num_compaction_trigger - 3; num++) { // Write 110KB (11 values, each 10K) for (int i = 0; i < 11; i++) { @@ -4159,7 +4317,7 @@ TEST_F(DBTest, UniversalCompactionStopStyleSimilarSize) { key_idx++; } dbfull()->TEST_WaitForFlushMemTable(); - ASSERT_EQ(NumTableFilesAtLevel(0), num + 3); + ASSERT_EQ(NumSortedRuns(), num + 3); } // Generate one more file at level-0, which should trigger level-0 @@ -4171,7 +4329,7 @@ TEST_F(DBTest, UniversalCompactionStopStyleSimilarSize) { dbfull()->TEST_WaitForCompact(); // Before compaction, we have 4 files at level 0, with size 4, 0.4, 1, 1. // After compaction, we should have 3 files, with size 4, 0.4, 2. - ASSERT_EQ(NumTableFilesAtLevel(0), 3); + ASSERT_EQ(NumSortedRuns(), 3); // Stage 3: // Now we have 3 files at level 0, with size 4, 0.4, 2. Generate one // more file at level-0, which should trigger level-0 compaction. @@ -4181,7 +4339,7 @@ TEST_F(DBTest, UniversalCompactionStopStyleSimilarSize) { } dbfull()->TEST_WaitForCompact(); // Level-0 compaction is triggered, but no file will be picked up. 
- ASSERT_EQ(NumTableFilesAtLevel(0), 4); + ASSERT_EQ(NumSortedRuns(), 4); } TEST_F(DBTest, CompressedCache) { @@ -4308,18 +4466,20 @@ static std::string CompressibleString(Random* rnd, int len) { return r; } -TEST_F(DBTest, UniversalCompactionCompressRatio1) { +TEST_P(DBTestUniversalCompaction, UniversalCompactionCompressRatio1) { if (!SnappyCompressionSupported()) { return; } + Options options; options.compaction_style = kCompactionStyleUniversal; - options.write_buffer_size = 100<<10; //100KB + options.write_buffer_size = 100 << 10; // 100KB + options.target_file_size_base = 32 << 10; // 32KB options.level0_file_num_compaction_trigger = 2; - options.num_levels = 1; + options.num_levels = num_levels_; options.compaction_options_universal.compression_size_percent = 70; options = CurrentOptions(options); - Reopen(options); + DestroyAndReopen(options); Random rnd(301); int key_idx = 0; @@ -4334,7 +4494,7 @@ TEST_F(DBTest, UniversalCompactionCompressRatio1) { dbfull()->TEST_WaitForFlushMemTable(); dbfull()->TEST_WaitForCompact(); } - ASSERT_LT((int)dbfull()->TEST_GetLevel0TotalSize(), 110000 * 2 * 0.9); + ASSERT_LT(TotalSize(), 110000U * 2 * 0.9); // The second compaction (4) is compressed for (int num = 0; num < 2; num++) { @@ -4346,7 +4506,7 @@ TEST_F(DBTest, UniversalCompactionCompressRatio1) { dbfull()->TEST_WaitForFlushMemTable(); dbfull()->TEST_WaitForCompact(); } - ASSERT_LT((int)dbfull()->TEST_GetLevel0TotalSize(), 110000 * 4 * 0.9); + ASSERT_LT(TotalSize(), 110000 * 4 * 0.9); // The third compaction (2 4) is compressed since this time it is // (1 1 3.2) and 3.2/5.2 doesn't reach ratio. @@ -4359,7 +4519,7 @@ TEST_F(DBTest, UniversalCompactionCompressRatio1) { dbfull()->TEST_WaitForFlushMemTable(); dbfull()->TEST_WaitForCompact(); } - ASSERT_LT((int)dbfull()->TEST_GetLevel0TotalSize(), 110000 * 6 * 0.9); + ASSERT_LT(TotalSize(), 110000 * 6 * 0.9); // When we start for the compaction up to (2 4 8), the latest // compressed is not compressed. 
@@ -4372,22 +4532,22 @@ TEST_F(DBTest, UniversalCompactionCompressRatio1) { dbfull()->TEST_WaitForFlushMemTable(); dbfull()->TEST_WaitForCompact(); } - ASSERT_GT((int)dbfull()->TEST_GetLevel0TotalSize(), - 110000 * 11 * 0.8 + 110000 * 2); + ASSERT_GT(TotalSize(), 110000 * 11 * 0.8 + 110000 * 2); } -TEST_F(DBTest, UniversalCompactionCompressRatio2) { +TEST_P(DBTestUniversalCompaction, UniversalCompactionCompressRatio2) { if (!SnappyCompressionSupported()) { return; } Options options; options.compaction_style = kCompactionStyleUniversal; - options.write_buffer_size = 100<<10; //100KB + options.write_buffer_size = 100 << 10; // 100KB + options.target_file_size_base = 32 << 10; // 32KB options.level0_file_num_compaction_trigger = 2; - options.num_levels = 1; + options.num_levels = num_levels_; options.compaction_options_universal.compression_size_percent = 95; options = CurrentOptions(options); - Reopen(options); + DestroyAndReopen(options); Random rnd(301); int key_idx = 0; @@ -4403,10 +4563,12 @@ TEST_F(DBTest, UniversalCompactionCompressRatio2) { dbfull()->TEST_WaitForFlushMemTable(); dbfull()->TEST_WaitForCompact(); } - ASSERT_LT((int)dbfull()->TEST_GetLevel0TotalSize(), - 120000 * 12 * 0.8 + 120000 * 2); + ASSERT_LT(TotalSize(), 120000U * 12 * 0.8 + 120000 * 2); } +INSTANTIATE_TEST_CASE_P(UniversalCompactionNumLevels, DBTestUniversalCompaction, + ::testing::Values(1, 3, 5)); + TEST_F(DBTest, FailMoreDbPaths) { Options options = CurrentOptions(); options.db_paths.emplace_back(dbname_, 10000000); @@ -4908,6 +5070,7 @@ TEST_F(DBTest, ConvertCompactionStyle) { // Stage 2: reopen with universal compaction - should fail options = CurrentOptions(); options.compaction_style = kCompactionStyleUniversal; + options.num_levels = 1; options = CurrentOptions(options); Status s = TryReopenWithColumnFamilies({"default", "pikachu"}, options); ASSERT_TRUE(s.IsInvalidArgument()); @@ -4919,6 +5082,7 @@ TEST_F(DBTest, ConvertCompactionStyle) { options.target_file_size_multiplier = 1; 
options.max_bytes_for_level_base = INT_MAX; options.max_bytes_for_level_multiplier = 1; + options.num_levels = 4; options = CurrentOptions(options); ReopenWithColumnFamilies({"default", "pikachu"}, options); @@ -4937,11 +5101,15 @@ TEST_F(DBTest, ConvertCompactionStyle) { // Stage 4: re-open in universal compaction style and do some db operations options = CurrentOptions(); options.compaction_style = kCompactionStyleUniversal; + options.num_levels = 4; options.write_buffer_size = 100<<10; //100KB options.level0_file_num_compaction_trigger = 3; options = CurrentOptions(options); ReopenWithColumnFamilies({"default", "pikachu"}, options); + options.num_levels = 1; + ReopenWithColumnFamilies({"default", "pikachu"}, options); + for (int i = max_key_level_insert / 2; i <= max_key_universal_insert; i++) { ASSERT_OK(Put(1, Key(i), RandomString(&rnd, 10000))); } @@ -4972,6 +5140,99 @@ ASSERT_EQ(keys_in_db, expected_keys); } +TEST_F(DBTest, IncreaseUniversalCompactionNumLevels) { + std::function<void(int)> verify_func = [&](int num_keys_in_db) { + std::string keys_in_db; + Iterator* iter = dbfull()->NewIterator(ReadOptions(), handles_[1]); + for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { + keys_in_db.append(iter->key().ToString()); + keys_in_db.push_back(','); + } + delete iter; + + std::string expected_keys; + for (int i = 0; i <= num_keys_in_db; i++) { + expected_keys.append(Key(i)); + expected_keys.push_back(','); + } + + ASSERT_EQ(keys_in_db, expected_keys); + }; + + Random rnd(301); + int max_key1 = 200; + int max_key2 = 600; + int max_key3 = 800; + + // Stage 1: open a DB with universal compaction, num_levels=1 + Options options; + options.compaction_style = kCompactionStyleUniversal; + options.num_levels = 1; + options.write_buffer_size = 100 << 10; // 100KB + options.level0_file_num_compaction_trigger = 3; + options = CurrentOptions(options); + CreateAndReopenWithCF({"pikachu"}, options); + + for (int i = 0; i <= max_key1; i++) 
{ + // each value is 10K + ASSERT_OK(Put(1, Key(i), RandomString(&rnd, 10000))); + } + ASSERT_OK(Flush(1)); + dbfull()->TEST_WaitForCompact(); + + int non_level0_num_files = 0; + for (int i = 1; i < options.num_levels; i++) { + non_level0_num_files += NumTableFilesAtLevel(i, 1); + } + ASSERT_EQ(non_level0_num_files, 0); + + // Stage 2: reopen with universal compaction, num_levels=4 + options = CurrentOptions(); + options.compaction_style = kCompactionStyleUniversal; + options.num_levels = 4; + options = CurrentOptions(options); + ReopenWithColumnFamilies({"default", "pikachu"}, options); + + verify_func(max_key1); + + // Insert more keys + for (int i = max_key1 + 1; i <= max_key2; i++) { + // each value is 10K + ASSERT_OK(Put(1, Key(i), RandomString(&rnd, 10000))); + } + ASSERT_OK(Flush(1)); + dbfull()->TEST_WaitForCompact(); + + verify_func(max_key2); + // Compaction to non-L0 has happened. + ASSERT_GT(NumTableFilesAtLevel(options.num_levels - 1, 1), 0); + + // Stage 3: Revert it back to one level and revert to num_levels=1. + options.num_levels = 4; + options.target_file_size_base = INT_MAX; + ReopenWithColumnFamilies({"default", "pikachu"}, options); + // Compact all to level 0 + dbfull()->CompactRange(handles_[1], nullptr, nullptr, true /* reduce level */, + 0 /* reduce to level 0 */); + // Need to restart it once to remove higher level records in manifest. 
+ ReopenWithColumnFamilies({"default", "pikachu"}, options); + // Final reopen + options = CurrentOptions(); + options.compaction_style = kCompactionStyleUniversal; + options.num_levels = 1; + options = CurrentOptions(options); + ReopenWithColumnFamilies({"default", "pikachu"}, options); + + // Insert more keys + for (int i = max_key2 + 1; i <= max_key3; i++) { + // each value is 10K + ASSERT_OK(Put(1, Key(i), RandomString(&rnd, 10000))); + } + ASSERT_OK(Flush(1)); + dbfull()->TEST_WaitForCompact(); + verify_func(max_key3); +} + namespace { void MinLevelHelper(DBTest* self, Options& options) { Random rnd(301); @@ -5596,7 +5857,7 @@ TEST_F(DBTest, CompactionFilterContextManual) { // set this flag. dbfull()->CompactRange(nullptr, nullptr); ASSERT_EQ(cfilter_count, 700); - ASSERT_EQ(NumTableFilesAtLevel(0), 1); + ASSERT_EQ(NumSortedRuns(0), 1); // Verify total number of keys is correct after manual compaction. { @@ -5770,7 +6031,7 @@ TEST_F(DBTest, CompactionFilterV2) { dbfull()->TEST_CompactRange(0, nullptr, nullptr); dbfull()->TEST_CompactRange(1, nullptr, nullptr); - ASSERT_EQ(NumTableFilesAtLevel(0), 1); + ASSERT_EQ(NumSortedRuns(0), 1); // All the files are in the lowest level. 
int count = 0; @@ -6568,46 +6829,52 @@ TEST_F(DBTest, ManualCompaction) { } -TEST_F(DBTest, ManualCompactionOutputPathId) { +class DBTestUniversalManualCompactionOutputPathId + : public DBTestUniversalCompactionBase {}; + +TEST_P(DBTestUniversalManualCompactionOutputPathId, + ManualCompactionOutputPathId) { Options options = CurrentOptions(); options.create_if_missing = true; options.db_paths.emplace_back(dbname_, 1000000000); options.db_paths.emplace_back(dbname_ + "_2", 1000000000); options.compaction_style = kCompactionStyleUniversal; + options.num_levels = num_levels_; + options.target_file_size_base = 1 << 30; // Big size options.level0_file_num_compaction_trigger = 10; Destroy(options); DestroyAndReopen(options); CreateAndReopenWithCF({"pikachu"}, options); MakeTables(3, "p", "q", 1); dbfull()->TEST_WaitForCompact(); - ASSERT_EQ("3", FilesPerLevel(1)); + ASSERT_EQ(3, TotalLiveFiles(1)); ASSERT_EQ(3, GetSstFileCount(options.db_paths[0].path)); ASSERT_EQ(0, GetSstFileCount(options.db_paths[1].path)); // Full compaction to DB path 0 db_->CompactRange(handles_[1], nullptr, nullptr, false, -1, 1); - ASSERT_EQ("1", FilesPerLevel(1)); + ASSERT_EQ(1, TotalLiveFiles(1)); ASSERT_EQ(0, GetSstFileCount(options.db_paths[0].path)); ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path)); ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu"}, options); - ASSERT_EQ("1", FilesPerLevel(1)); + ASSERT_EQ(1, TotalLiveFiles(1)); ASSERT_EQ(0, GetSstFileCount(options.db_paths[0].path)); ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path)); MakeTables(1, "p", "q", 1); - ASSERT_EQ("2", FilesPerLevel(1)); + ASSERT_EQ(2, TotalLiveFiles(1)); ASSERT_EQ(1, GetSstFileCount(options.db_paths[0].path)); ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path)); ReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu"}, options); - ASSERT_EQ("2", FilesPerLevel(1)); + ASSERT_EQ(2, TotalLiveFiles(1)); ASSERT_EQ(1, GetSstFileCount(options.db_paths[0].path)); ASSERT_EQ(1, 
GetSstFileCount(options.db_paths[1].path)); // Full compaction to DB path 0 db_->CompactRange(handles_[1], nullptr, nullptr, false, -1, 0); - ASSERT_EQ("1", FilesPerLevel(1)); + ASSERT_EQ(1, TotalLiveFiles(1)); ASSERT_EQ(1, GetSstFileCount(options.db_paths[0].path)); ASSERT_EQ(0, GetSstFileCount(options.db_paths[1].path)); @@ -6616,6 +6883,10 @@ TEST_F(DBTest, ManualCompactionOutputPathId) { .IsInvalidArgument()); } +INSTANTIATE_TEST_CASE_P(DBTestUniversalManualCompactionOutputPathId, + DBTestUniversalManualCompactionOutputPathId, + ::testing::Values(1, 8)); + TEST_F(DBTest, ManualLevelCompactionOutputPathId) { Options options = CurrentOptions(); options.db_paths.emplace_back(dbname_ + "_2", 2 * 10485760); @@ -6796,7 +7067,10 @@ TEST_F(DBTest, DropWrites) { env_->drop_writes_.store(true, std::memory_order_release); env_->sleep_counter_.Reset(); for (int i = 0; i < 5; i++) { - for (int level = 0; level < dbfull()->NumberLevels() - 1; level++) { + for (int level = 0; level < dbfull()->NumberLevels(); level++) { + if (level > 0 && level == dbfull()->NumberLevels() - 1) { + break; + } dbfull()->TEST_CompactRange(level, nullptr, nullptr); } } @@ -9693,6 +9967,7 @@ TEST_F(DBTest, CompactFilesOnUniversalCompaction) { options.create_if_missing = true; options.write_buffer_size = kEntrySize * kEntriesPerBuffer; options.compaction_style = kCompactionStyleLevel; + options.num_levels = 1; options.target_file_size_base = options.write_buffer_size; options.compression = kNoCompression; options = CurrentOptions(options); @@ -10063,11 +10338,19 @@ TEST_F(DBTest, DynamicMemtableOptions) { int count = 0; Random rnd(301); WriteOptions wo; - wo.timeout_hint_us = 1000; + wo.timeout_hint_us = 100000; // Reasonabley long timeout to make sure sleep + // triggers but not forever. 
+ + std::atomic sleep_count(0); + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::DelayWrite:TimedWait", + [&]() { sleep_count.fetch_add(1); }); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); while (Put(Key(count), RandomString(&rnd, 1024), wo).ok() && count < 256) { count++; } + ASSERT_GT(sleep_count.load(), 0); ASSERT_GT(static_cast(count), 128 * 0.8); ASSERT_LT(static_cast(count), 128 * 1.2); @@ -10085,9 +10368,11 @@ TEST_F(DBTest, DynamicMemtableOptions) { env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low2, Env::Priority::LOW); count = 0; + sleep_count.store(0); while (Put(Key(count), RandomString(&rnd, 1024), wo).ok() && count < 1024) { count++; } + ASSERT_GT(sleep_count.load(), 0); ASSERT_GT(static_cast(count), 512 * 0.8); ASSERT_LT(static_cast(count), 512 * 1.2); sleeping_task_low2.WakeUp(); @@ -10103,14 +10388,19 @@ TEST_F(DBTest, DynamicMemtableOptions) { SleepingBackgroundTask sleeping_task_low3; env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low3, Env::Priority::LOW); + count = 0; + sleep_count.store(0); while (Put(Key(count), RandomString(&rnd, 1024), wo).ok() && count < 1024) { count++; } + ASSERT_GT(sleep_count.load(), 0); ASSERT_GT(static_cast(count), 256 * 0.8); ASSERT_LT(static_cast(count), 266 * 1.2); sleeping_task_low3.WakeUp(); sleeping_task_low3.WaitUntilDone(); + + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); } #if ROCKSDB_USING_THREAD_STATUS @@ -11173,17 +11463,29 @@ TEST_F(DBTest, DynamicCompactionOptions) { ASSERT_OK(Put(Key(count), RandomString(&rnd, 1024))); dbfull()->TEST_FlushMemTable(true); + std::atomic sleep_count(0); + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::DelayWrite:Sleep", + [&]() { sleep_count.fetch_add(1); }); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + // Hard rate limit slow down for 1000 us, so default 10ms should be ok - ASSERT_TRUE(Put(Key(count), RandomString(&rnd, 1024), wo).ok()); - wo.timeout_hint_us = 500; - 
ASSERT_TRUE(Put(Key(count), RandomString(&rnd, 1024), wo).IsTimedOut()); + ASSERT_OK(Put(Key(count), RandomString(&rnd, 1024), wo)); + sleep_count.store(0); + ASSERT_OK(Put(Key(count), RandomString(&rnd, 1024), wo)); + ASSERT_GT(sleep_count.load(), 0); // Lift the limit and no timeout ASSERT_OK(dbfull()->SetOptions({ {"hard_rate_limit", "100"} })); dbfull()->TEST_FlushMemTable(true); - ASSERT_TRUE(Put(Key(count), RandomString(&rnd, 1024), wo).ok()); + sleep_count.store(0); + ASSERT_OK(Put(Key(count), RandomString(&rnd, 1024), wo)); + // Technically, time out is still possible for timing issue. + ASSERT_EQ(sleep_count.load(), 0); + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); + // Test max_mem_compaction_level. // Destory DB and start from scratch @@ -11195,8 +11497,7 @@ TEST_F(DBTest, DynamicCompactionOptions) { ASSERT_EQ(NumTableFilesAtLevel(1), 0); ASSERT_EQ(NumTableFilesAtLevel(2), 0); - ASSERT_TRUE(Put("max_mem_compaction_level_key", - RandomString(&rnd, 8)).ok()); + ASSERT_OK(Put("max_mem_compaction_level_key", RandomString(&rnd, 8))); dbfull()->TEST_FlushMemTable(true); ASSERT_EQ(NumTableFilesAtLevel(0), 0); ASSERT_EQ(NumTableFilesAtLevel(1), 0); @@ -11886,8 +12187,8 @@ TEST_F(DBTest, MergeTestTime) { std::string result; db_->Get(opt, "foo", &result); - ASSERT_LT(TestGetTickerCount(options, MERGE_OPERATION_TOTAL_TIME), 2100000); - ASSERT_GT(TestGetTickerCount(options, MERGE_OPERATION_TOTAL_TIME), 1900000); + ASSERT_LT(TestGetTickerCount(options, MERGE_OPERATION_TOTAL_TIME), 2800000); + ASSERT_GT(TestGetTickerCount(options, MERGE_OPERATION_TOTAL_TIME), 1200000); ReadOptions read_options; std::unique_ptr iter(db_->NewIterator(read_options)); @@ -11899,8 +12200,8 @@ TEST_F(DBTest, MergeTestTime) { ASSERT_EQ(1, count); - ASSERT_LT(TestGetTickerCount(options, MERGE_OPERATION_TOTAL_TIME), 4200000); - ASSERT_GT(TestGetTickerCount(options, MERGE_OPERATION_TOTAL_TIME), 3800000); + ASSERT_LT(TestGetTickerCount(options, MERGE_OPERATION_TOTAL_TIME), 6000000); 
+ ASSERT_GT(TestGetTickerCount(options, MERGE_OPERATION_TOTAL_TIME), 3200000); } TEST_F(DBTest, MergeCompactionTimeTest) { diff --git a/db/version_set.cc b/db/version_set.cc index 8d67bf246..7c30a213f 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -726,11 +726,12 @@ VersionStorageInfo::VersionStorageInfo( file_indexer_(user_comparator), compaction_style_(compaction_style), files_(new std::vector[num_levels_]), - base_level_(1), + base_level_(num_levels_ == 1 ? -1 : 1), files_by_size_(num_levels_), next_file_to_compact_by_size_(num_levels_), compaction_score_(num_levels_), compaction_level_(num_levels_), + l0_delay_trigger_count_(0), accumulated_file_size_(0), accumulated_raw_key_size_(0), accumulated_raw_value_size_(0), @@ -991,25 +992,37 @@ void VersionStorageInfo::ComputeCompactionScore( // file size is small (perhaps because of a small write-buffer // setting, or very high compression ratios, or lots of // overwrites/deletions). - int numfiles = 0; + int num_sorted_runs = 0; uint64_t total_size = 0; for (unsigned int i = 0; i < files_[level].size(); i++) { if (!files_[level][i]->being_compacted) { total_size += files_[level][i]->compensated_file_size; - numfiles++; + num_sorted_runs++; } } + if (compaction_style_ == kCompactionStyleUniversal) { + // For universal compaction, we use level0 score to indicate + // compaction score for the whole DB. Adding other levels as if + // they are L0 files. 
+ for (int i = 1; i < num_levels(); i++) { + if (!files_[i].empty() && !files_[i][0]->being_compacted) { + num_sorted_runs++; + } + } + } + if (compaction_style_ == kCompactionStyleFIFO) { score = static_cast(total_size) / compaction_options_fifo.max_table_files_size; - } else if (numfiles >= mutable_cf_options.level0_stop_writes_trigger) { + } else if (num_sorted_runs >= + mutable_cf_options.level0_stop_writes_trigger) { // If we are slowing down writes, then we better compact that first score = 1000000; - } else if (numfiles >= - mutable_cf_options.level0_slowdown_writes_trigger) { + } else if (num_sorted_runs >= + mutable_cf_options.level0_slowdown_writes_trigger) { score = 10000; } else { - score = static_cast(numfiles) / + score = static_cast(num_sorted_runs) / mutable_cf_options.level0_file_num_compaction_trigger; } } else { @@ -1090,7 +1103,12 @@ void VersionStorageInfo::AddFile(int level, FileMetaData* f) { void VersionStorageInfo::SetFinalized() { finalized_ = true; #ifndef NDEBUG - assert(base_level_ >= 1 && (num_levels() <= 1 || base_level_ < num_levels())); + if (compaction_style_ != kCompactionStyleLevel) { + // Not level based compaction. 
+ return; + } + assert(base_level_ < 0 || num_levels() == 1 || + (base_level_ >= 1 && base_level_ < num_levels())); // Verify all levels newer than base_level are empty except L0 for (int level = 1; level < base_level(); level++) { assert(NumLevelBytes(level) == 0); @@ -1442,14 +1460,14 @@ uint64_t VersionStorageInfo::NumLevelBytes(int level) const { const char* VersionStorageInfo::LevelSummary( LevelSummaryStorage* scratch) const { int len = 0; - if (num_levels() > 1) { + if (compaction_style_ == kCompactionStyleLevel && num_levels() > 1) { assert(base_level_ < static_cast(level_max_bytes_.size())); len = snprintf(scratch->buffer, sizeof(scratch->buffer), - "base level %d max bytes base %" PRIu64, base_level_, + "base level %d max bytes base %" PRIu64 " ", base_level_, level_max_bytes_[base_level_]); } len += - snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len, " files["); + snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len, "files["); for (int i = 0; i < num_levels(); i++) { int sz = sizeof(scratch->buffer) - len; int ret = snprintf(scratch->buffer + len, sz, "%d ", int(files_[i].size())); @@ -1512,9 +1530,24 @@ uint64_t VersionStorageInfo::MaxBytesForLevel(int level) const { void VersionStorageInfo::CalculateBaseBytes(const ImmutableCFOptions& ioptions, const MutableCFOptions& options) { + // Special logic to set number of sorted runs. + // It is to match the previous behavior when all files are in L0. + int num_l0_count = static_cast(files_[0].size()); + if (compaction_style_ == kCompactionStyleUniversal) { + // For universal compaction, we use level0 score to indicate + // compaction score for the whole DB. Adding other levels as if + // they are L0 files. 
+ for (int i = 1; i < num_levels(); i++) { + if (!files_[i].empty()) { + num_l0_count++; + } + } + } + set_l0_delay_trigger_count(num_l0_count); + level_max_bytes_.resize(ioptions.num_levels); if (!ioptions.level_compaction_dynamic_level_bytes) { - base_level_ = 1; + base_level_ = (ioptions.compaction_style == kCompactionStyleLevel) ? 1 : -1; // Calculate for static bytes base case for (int i = 0; i < ioptions.num_levels; ++i) { @@ -1524,7 +1557,7 @@ void VersionStorageInfo::CalculateBaseBytes(const ImmutableCFOptions& ioptions, level_max_bytes_[i] = MultiplyCheckOverflow( MultiplyCheckOverflow(level_max_bytes_[i - 1], options.max_bytes_for_level_multiplier), - options.max_bytes_for_level_multiplier_additional[i - 1]); + options.MaxBytesMultiplerAdditional(i - 1)); } else { level_max_bytes_[i] = options.max_bytes_for_level_base; } @@ -2868,7 +2901,9 @@ bool VersionSet::VerifyCompactionFileConsistency(Compaction* c) { "[%s] compaction output being applied to a different base version from" " input version", c->column_family_data()->GetName().c_str()); - if (c->start_level() == 0 && c->num_input_levels() > 2U) { + + if (vstorage->compaction_style_ == kCompactionStyleLevel && + c->start_level() == 0 && c->num_input_levels() > 2U) { // We are doing a L0->base_level compaction. The assumption is if // base level is not L1, levels from L1 to base_level - 1 is empty. // This is ensured by having one compaction from L0 going on at the diff --git a/db/version_set.h b/db/version_set.h index 92c04dfb5..1a1d67be0 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -189,6 +189,14 @@ class VersionStorageInfo { return num_non_empty_levels_; } + // REQUIRES: This version has been finalized. + // (CalculateBaseBytes() is called) + // This may or may not return number of level files. It is to keep backward + // compatible behavior in universal compaction. 
+ int l0_delay_trigger_count() const { return l0_delay_trigger_count_; } + + void set_l0_delay_trigger_count(int v) { l0_delay_trigger_count_ = v; } + // REQUIRES: This version has been saved (see VersionSet::SaveTo) int NumLevelFiles(int level) const { assert(finalized_); @@ -312,7 +320,7 @@ class VersionStorageInfo { std::vector* files_; // Level that L0 data should be compacted to. All levels < base_level_ should - // be empty. + // be empty. -1 if it is not level-compaction so it's not applicable. int base_level_; // A list for the same set of files that are stored in files_, @@ -341,6 +349,8 @@ class VersionStorageInfo { std::vector compaction_level_; double max_compaction_score_ = 0.0; // max score in l1 to ln-1 int max_compaction_score_level_ = 0; // level on which max score occurs + int l0_delay_trigger_count_ = 0; // Count used to trigger slow down and stop + // for number of L0 files. // the following are the sampled temporary stats. // the current accumulated size of sampled files. diff --git a/util/mutable_cf_options.h b/util/mutable_cf_options.h index 6c3b13ed3..593871583 100644 --- a/util/mutable_cf_options.h +++ b/util/mutable_cf_options.h @@ -84,6 +84,13 @@ struct MutableCFOptions { // file in level->level+1 compaction. 
uint64_t MaxGrandParentOverlapBytes(int level) const; uint64_t ExpandedCompactionByteSizeLimit(int level) const; + int MaxBytesMultiplerAdditional(int level) const { + if (level >= + static_cast(max_bytes_for_level_multiplier_additional.size())) { + return 1; + } + return max_bytes_for_level_multiplier_additional[level]; + } void Dump(Logger* log) const; diff --git a/util/options.cc b/util/options.cc index 3b47c3696..223c4cbb2 100644 --- a/util/options.cc +++ b/util/options.cc @@ -420,9 +420,10 @@ void ColumnFamilyOptions::Dump(Logger* log) const { level_compaction_dynamic_level_bytes); Log(log," Options.max_bytes_for_level_multiplier: %d", max_bytes_for_level_multiplier); - for (int i = 0; i < num_levels; i++) { - Log(log,"Options.max_bytes_for_level_multiplier_addtl[%d]: %d", - i, max_bytes_for_level_multiplier_additional[i]); + for (size_t i = 0; i < max_bytes_for_level_multiplier_additional.size(); + i++) { + Log(log, "Options.max_bytes_for_level_multiplier_addtl[%zu]: %d", i, + max_bytes_for_level_multiplier_additional[i]); } Log(log," Options.max_sequential_skip_in_iterations: %" PRIu64, max_sequential_skip_in_iterations);