diff --git a/HISTORY.md b/HISTORY.md index 038a242da..ee8400312 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -16,6 +16,7 @@ * Block based table now makes use of prefix bloom filter if it is a full fulter. * Block based table remembers whether a whole key or prefix based bloom filter is supported in SST files. Do a sanity check when reading the file with users' configuration. * Fixed a bug in ReadOnlyBackupEngine that deleted corrupted backups in some cases, even though the engine was ReadOnly +* options.level_compaction_dynamic_level_bytes, a feature to allow RocksDB to pick dynamic base of bytes for levels. With this feature turned on, we will automatically adjust max bytes for each level. The goal of this feature is to have lower bound on size amplification. For more details, see comments in options.h. ### Public API changes * Deprecated skip_log_error_on_recovery option diff --git a/db/column_family.cc b/db/column_family.cc index 0335db9c0..72888136c 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -100,9 +100,9 @@ const Comparator* ColumnFamilyHandleImpl::user_comparator() const { return cfd()->user_comparator(); } -ColumnFamilyOptions SanitizeOptions(const InternalKeyComparator* icmp, - const ColumnFamilyOptions& src, - Logger* info_log) { +ColumnFamilyOptions SanitizeOptions(const DBOptions& db_options, + const InternalKeyComparator* icmp, + const ColumnFamilyOptions& src) { ColumnFamilyOptions result = src; result.comparator = icmp; #ifdef OS_MACOSX @@ -168,7 +168,7 @@ ColumnFamilyOptions SanitizeOptions(const InternalKeyComparator* icmp, result.level0_slowdown_writes_trigger || result.level0_slowdown_writes_trigger < result.level0_file_num_compaction_trigger) { - Warn(info_log, + Warn(db_options.info_log.get(), "This condition must be satisfied: " "level0_stop_writes_trigger(%d) >= " "level0_slowdown_writes_trigger(%d) >= " @@ -185,7 +185,7 @@ ColumnFamilyOptions SanitizeOptions(const InternalKeyComparator* icmp, result.level0_slowdown_writes_trigger) { result.level0_stop_writes_trigger = result.level0_slowdown_writes_trigger; } - Warn(info_log, + Warn(db_options.info_log.get(), "Adjust the value to " "level0_stop_writes_trigger(%d)" "level0_slowdown_writes_trigger(%d)" @@ -194,6 +194,16 @@ ColumnFamilyOptions SanitizeOptions(const InternalKeyComparator* icmp, result.level0_slowdown_writes_trigger, result.level0_file_num_compaction_trigger); } + if (result.level_compaction_dynamic_level_bytes) { + if (result.compaction_style != kCompactionStyleLevel || + db_options.db_paths.size() > 1U) { + // 1. level_compaction_dynamic_level_bytes only makes sense for + // level-based compaction. + // 2. we don't yet know how to make both of this feature and multiple + // DB path work. + result.level_compaction_dynamic_level_bytes = false; + } + } return result; } @@ -269,8 +279,8 @@ ColumnFamilyData::ColumnFamilyData( refs_(0), dropped_(false), internal_comparator_(cf_options.comparator), - options_(*db_options, SanitizeOptions(&internal_comparator_, cf_options, - db_options->info_log.get())), + options_(*db_options, + SanitizeOptions(*db_options, &internal_comparator_, cf_options)), ioptions_(options_), mutable_cf_options_(options_, ioptions_), write_buffer_(write_buffer), diff --git a/db/column_family.h b/db/column_family.h index e4c1803db..ee615e045 100644 --- a/db/column_family.h +++ b/db/column_family.h @@ -122,9 +122,9 @@ struct SuperVersion { static void* const kSVObsolete; }; -extern ColumnFamilyOptions SanitizeOptions(const InternalKeyComparator* icmp, - const ColumnFamilyOptions& src, - Logger* info_log); +extern ColumnFamilyOptions SanitizeOptions(const DBOptions& db_options, + const InternalKeyComparator* icmp, + const ColumnFamilyOptions& src); class ColumnFamilySet; diff --git a/db/column_family_test.cc b/db/column_family_test.cc index 36fcb0080..cb837dfea 100644 --- a/db/column_family_test.cc +++ b/db/column_family_test.cc @@ -1020,7 +1020,7 @@ TEST(ColumnFamilyTest, CreateMissingColumnFamilies) { } TEST(ColumnFamilyTest, SanitizeOptions) { - DumbLogger logger; + DBOptions db_options; for (int i = 1; i <= 3; i++) { for (int j = 1; j <= 3; j++) { for (int k = 1; k <= 3; k++) { @@ -1028,7 +1028,8 @@ TEST(ColumnFamilyTest, SanitizeOptions) { original.level0_stop_writes_trigger = i; original.level0_slowdown_writes_trigger = j; original.level0_file_num_compaction_trigger = k; - ColumnFamilyOptions result = SanitizeOptions(NULL, original, &logger); + ColumnFamilyOptions result = + SanitizeOptions(db_options, nullptr, original); ASSERT_TRUE(result.level0_stop_writes_trigger >= result.level0_slowdown_writes_trigger); ASSERT_TRUE(result.level0_slowdown_writes_trigger >= diff --git a/db/compaction.cc b/db/compaction.cc index 56be34ef3..682fb61db 100644 --- a/db/compaction.cc +++ b/db/compaction.cc @@ -39,13 +39,13 @@ void Compaction::SetInputVersion(Version* _input_version) { edit_->SetColumnFamily(cfd_->GetID()); } -Compaction::Compaction(int number_levels, int start_level, int out_level, +Compaction::Compaction(int number_levels, int _start_level, int out_level, uint64_t target_file_size, uint64_t max_grandparent_overlap_bytes, uint32_t output_path_id, CompressionType output_compression, bool seek_compaction, bool deletion_compaction) - : start_level_(start_level), + : start_level_(_start_level), output_level_(out_level), max_output_file_size_(target_file_size), max_grandparent_overlap_bytes_(max_grandparent_overlap_bytes), @@ -242,6 +242,32 @@ void Compaction::SetupBottomMostLevel(VersionStorageInfo* vstorage, } } +// Sample output: +// If compacting 3 L0 files, 2 L3 files and 1 L4 file, and outputting to L5, +// print: "3@0 + 2@3 + 1@4 files to L5" +const char* Compaction::InputLevelSummary( + InputLevelSummaryBuffer* scratch) const { + int len = 0; + bool is_first = true; + for (auto& input_level : inputs_) { + if (input_level.empty()) { + continue; + } + if (!is_first) { + len += + snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len, " + "); + } else { + is_first = false; + } + len += snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len, + "%zu@%d", input_level.size(), input_level.level); + } + snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len, + " files to L%d", output_level()); + + return scratch->buffer; +} + void Compaction::ReleaseCompactionFiles(Status status) { cfd_->compaction_picker()->ReleaseCompactionFiles(this, status); } diff --git a/db/compaction.h b/db/compaction.h index 12905163b..3a95f2416 100644 --- a/db/compaction.h +++ b/db/compaction.h @@ -52,6 +52,8 @@ class Compaction { return inputs_[compaction_input_level].level; } + int start_level() const { return start_level_; } + // Outputs will go to this level int output_level() const { return output_level_; } @@ -189,6 +191,12 @@ class Compaction { return &inputs_[l]; } + struct InputLevelSummaryBuffer { + char buffer[128]; + }; + + const char* InputLevelSummary(InputLevelSummaryBuffer* scratch) const; + private: friend class CompactionPicker; friend class UniversalCompactionPicker; diff --git a/db/compaction_job.cc b/db/compaction_job.cc index 36cc46412..bbe576d1d 100644 --- a/db/compaction_job.cc +++ b/db/compaction_job.cc @@ -231,15 +231,19 @@ void CompactionJob::Prepare() { compact_->CleanupBatchBuffer(); compact_->CleanupMergedBuffer(); + auto* compaction = compact_->compaction; + // Generate file_levels_ for compaction berfore making Iterator - compact_->compaction->GenerateFileLevels(); + compaction->GenerateFileLevels(); ColumnFamilyData* cfd = compact_->compaction->column_family_data(); assert(cfd != nullptr); - LogToBuffer( - log_buffer_, "[%s] [JOB %d] Compacting %d@%d + %d@%d files, score %.2f", - cfd->GetName().c_str(), job_id_, compact_->compaction->num_input_files(0), - compact_->compaction->level(), compact_->compaction->num_input_files(1), - compact_->compaction->output_level(), compact_->compaction->score()); + { + Compaction::InputLevelSummaryBuffer inputs_summary; + LogToBuffer(log_buffer_, "[%s] [JOB %d] Compacting %s, score %.2f", + cfd->GetName().c_str(), job_id_, + compaction->InputLevelSummary(&inputs_summary), + compaction->score()); + } char scratch[2345]; compact_->compaction->Summary(scratch, sizeof(scratch)); LogToBuffer(log_buffer_, "[%s] Compaction start summary: %s\n", @@ -959,39 +963,40 @@ Status CompactionJob::FinishCompactionOutputFile(Iterator* input) { Status CompactionJob::InstallCompactionResults(InstrumentedMutex* db_mutex) { db_mutex->AssertHeld(); + auto* compaction = compact_->compaction; // paranoia: verify that the files that we started with // still exist in the current version and in the same original level. // This ensures that a concurrent compaction did not erroneously // pick the same files to compact_. - if (!versions_->VerifyCompactionFileConsistency(compact_->compaction)) { + if (!versions_->VerifyCompactionFileConsistency(compaction)) { + Compaction::InputLevelSummaryBuffer inputs_summary; + Log(InfoLogLevel::ERROR_LEVEL, db_options_.info_log, - "[%s] [JOB %d] Compaction %d@%d + %d@%d files aborted", - compact_->compaction->column_family_data()->GetName().c_str(), job_id_, - compact_->compaction->num_input_files(0), compact_->compaction->level(), - compact_->compaction->num_input_files(1), - compact_->compaction->output_level()); + "[%s] [JOB %d] Compaction %s aborted", + compaction->column_family_data()->GetName().c_str(), job_id_, + compaction->InputLevelSummary(&inputs_summary)); return Status::Corruption("Compaction input files inconsistent"); } - Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log, - "[%s] [JOB %d] Compacted %d@%d + %d@%d files => %" PRIu64 " bytes", - compact_->compaction->column_family_data()->GetName().c_str(), job_id_, - compact_->compaction->num_input_files(0), compact_->compaction->level(), - compact_->compaction->num_input_files(1), - compact_->compaction->output_level(), compact_->total_bytes); + { + Compaction::InputLevelSummaryBuffer inputs_summary; + Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log, + "[%s] [JOB %d] Compacted %s => %" PRIu64 " bytes", + compaction->column_family_data()->GetName().c_str(), job_id_, + compaction->InputLevelSummary(&inputs_summary), compact_->total_bytes); + } // Add compaction outputs - compact_->compaction->AddInputDeletions(compact_->compaction->edit()); + compaction->AddInputDeletions(compact_->compaction->edit()); for (size_t i = 0; i < compact_->outputs.size(); i++) { const CompactionState::Output& out = compact_->outputs[i]; - compact_->compaction->edit()->AddFile( - compact_->compaction->output_level(), out.number, out.path_id, - out.file_size, out.smallest, out.largest, out.smallest_seqno, - out.largest_seqno); + compaction->edit()->AddFile( + compaction->output_level(), out.number, out.path_id, out.file_size, + out.smallest, out.largest, out.smallest_seqno, out.largest_seqno); } - return versions_->LogAndApply( - compact_->compaction->column_family_data(), mutable_cf_options_, - compact_->compaction->edit(), db_mutex, db_directory_); + return versions_->LogAndApply(compaction->column_family_data(), + mutable_cf_options_, compaction->edit(), + db_mutex, db_directory_); } // Given a sequence number, return the sequence number of the diff --git a/db/compaction_picker.cc b/db/compaction_picker.cc index f74e63436..fe9e2a23c 100644 --- a/db/compaction_picker.cc +++ b/db/compaction_picker.cc @@ -116,7 +116,7 @@ bool CompactionPicker::ExpandWhileOverlapping(const std::string& cf_name, assert(c != nullptr); // If inputs are empty then there is nothing to expand. if (c->inputs_[0].empty()) { - assert(c->inputs_[1].empty()); + assert(c->inputs(c->num_input_levels() - 1)->empty()); // This isn't good compaction return false; } @@ -157,10 +157,10 @@ bool CompactionPicker::ExpandWhileOverlapping(const std::string& cf_name, } if (c->inputs_[0].empty() || FilesInCompaction(c->inputs_[0].files) || (c->level() != c->output_level() && - ParentRangeInCompaction(vstorage, &smallest, &largest, level, - &parent_index))) { + RangeInCompaction(vstorage, &smallest, &largest, c->output_level(), + &parent_index))) { c->inputs_[0].clear(); - c->inputs_[1].clear(); + c->inputs_[c->num_input_levels() - 1].clear(); if (!c->inputs_[0].empty()) { Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log, "[%s] ExpandWhileOverlapping() failure because some of the necessary" @@ -267,22 +267,24 @@ Status CompactionPicker::GetCompactionInputsFromFileNumbers( // Returns true if any one of the parent files are being compacted -bool CompactionPicker::ParentRangeInCompaction(VersionStorageInfo* vstorage, - const InternalKey* smallest, - const InternalKey* largest, - int level, int* parent_index) { +bool CompactionPicker::RangeInCompaction(VersionStorageInfo* vstorage, + const InternalKey* smallest, + const InternalKey* largest, int level, + int* level_index) { std::vector inputs; - assert(level + 1 < NumberLevels()); + assert(level < NumberLevels()); - vstorage->GetOverlappingInputs(level + 1, smallest, largest, &inputs, - *parent_index, parent_index); + vstorage->GetOverlappingInputs(level, smallest, largest, &inputs, + *level_index, level_index); return FilesInCompaction(inputs); } -// Populates the set of inputs from "level+1" that overlap with "level". -// Will also attempt to expand "level" if that doesn't expand "level+1" -// or cause "level" to include a file for compaction that has an overlapping -// user-key with another file. +// Populates the set of inputs of all other levels that overlap with the +// start level. +// Now we assume all levels except start level and output level are empty. +// Will also attempt to expand "start level" if that doesn't expand +// "output level" or cause "level" to include a file for compaction that has an +// overlapping user-key with another file. void CompactionPicker::SetupOtherInputs( const std::string& cf_name, const MutableCFOptions& mutable_cf_options, VersionStorageInfo* vstorage, Compaction* c) { @@ -293,32 +295,41 @@ void CompactionPicker::SetupOtherInputs( return; } + // For now, we only support merging two levels, start level and output level. + // We need to assert other levels are empty. + for (int l = c->start_level() + 1; l < c->output_level(); l++) { + assert(vstorage->NumLevelFiles(l) == 0); + } + const int level = c->level(); InternalKey smallest, largest; // Get the range one last time. GetRange(c->inputs_[0].files, &smallest, &largest); - // Populate the set of next-level files (inputs_[1]) to include in compaction - vstorage->GetOverlappingInputs(level + 1, &smallest, &largest, - &c->inputs_[1].files, c->parent_index_, - &c->parent_index_); + // Populate the set of next-level files (inputs_GetOutputLevelInputs()) to + // include in compaction + vstorage->GetOverlappingInputs(c->output_level(), &smallest, &largest, + &c->inputs_[c->num_input_levels() - 1].files, + c->parent_index_, &c->parent_index_); // Get entire range covered by compaction InternalKey all_start, all_limit; - GetRange(c->inputs_[0].files, c->inputs_[1].files, &all_start, &all_limit); + GetRange(c->inputs_[0].files, c->inputs_[c->num_input_levels() - 1].files, + &all_start, &all_limit); // See if we can further grow the number of inputs in "level" without // changing the number of "level+1" files we pick up. We also choose NOT // to expand if this would cause "level" to include some entries for some // user key, while excluding other entries for the same user key. This // can happen when one user key spans multiple files. - if (!c->inputs_[1].empty()) { + if (!c->inputs(c->num_input_levels() - 1)->empty()) { std::vector expanded0; vstorage->GetOverlappingInputs(level, &all_start, &all_limit, &expanded0, c->base_index_, nullptr); const uint64_t inputs0_size = TotalCompensatedFileSize(c->inputs_[0].files); - const uint64_t inputs1_size = TotalCompensatedFileSize(c->inputs_[1].files); + const uint64_t inputs1_size = + TotalCompensatedFileSize(c->inputs_[c->num_input_levels() - 1].files); const uint64_t expanded0_size = TotalCompensatedFileSize(expanded0); uint64_t limit = mutable_cf_options.ExpandedCompactionByteSizeLimit(level); if (expanded0.size() > c->inputs_[0].size() && @@ -328,32 +339,34 @@ void CompactionPicker::SetupOtherInputs( InternalKey new_start, new_limit; GetRange(expanded0, &new_start, &new_limit); std::vector expanded1; - vstorage->GetOverlappingInputs(level + 1, &new_start, &new_limit, + vstorage->GetOverlappingInputs(c->output_level(), &new_start, &new_limit, &expanded1, c->parent_index_, &c->parent_index_); - if (expanded1.size() == c->inputs_[1].size() && + if (expanded1.size() == c->inputs(c->num_input_levels() - 1)->size() && !FilesInCompaction(expanded1)) { Log(InfoLogLevel::INFO_LEVEL, ioptions_.info_log, "[%s] Expanding@%d %zu+%zu (%" PRIu64 "+%" PRIu64 " bytes) to %zu+%zu (%" PRIu64 "+%" PRIu64 "bytes)\n", - cf_name.c_str(), level, c->inputs_[0].size(), c->inputs_[1].size(), - inputs0_size, inputs1_size, expanded0.size(), expanded1.size(), - expanded0_size, inputs1_size); + cf_name.c_str(), level, c->inputs_[0].size(), + c->inputs(c->num_input_levels() - 1)->size(), inputs0_size, + inputs1_size, expanded0.size(), expanded1.size(), expanded0_size, + inputs1_size); smallest = new_start; largest = new_limit; c->inputs_[0].files = expanded0; - c->inputs_[1].files = expanded1; - GetRange(c->inputs_[0].files, c->inputs_[1].files, - &all_start, &all_limit); + c->inputs_[c->num_input_levels() - 1].files = expanded1; + GetRange(c->inputs_[0].files, + c->inputs_[c->num_input_levels() - 1].files, &all_start, + &all_limit); } } } // Compute the set of grandparent files that overlap this compaction // (parent == level+1; grandparent == level+2) - if (level + 2 < NumberLevels()) { - vstorage->GetOverlappingInputs(level + 2, &all_start, &all_limit, - &c->grandparents_); + if (c->output_level() + 1 < NumberLevels()) { + vstorage->GetOverlappingInputs(c->output_level() + 1, &all_start, + &all_limit, &c->grandparents_); } } @@ -682,9 +695,6 @@ Compaction* LevelCompactionPicker::PickCompaction( Compaction* c = nullptr; int level = -1; - // We prefer compactions triggered by too much data in a level over - // the compactions triggered by seeks. - // // Find the compactions by size on all levels. for (int i = 0; i < NumberLevels() - 1; i++) { double score = vstorage->CompactionScore(i); @@ -723,15 +733,15 @@ Compaction* LevelCompactionPicker::PickCompaction( // cause the 'smallest' and 'largest' key to get extended to a // larger range. So, re-invoke GetRange to get the new key range GetRange(c->inputs_[0].files, &smallest, &largest); - if (ParentRangeInCompaction(vstorage, &smallest, &largest, level, - &c->parent_index_)) { + if (RangeInCompaction(vstorage, &smallest, &largest, c->output_level(), + &c->parent_index_)) { delete c; return nullptr; } assert(!c->inputs_[0].empty()); } - // Setup "level+1" files (inputs_[1]) + // Setup input files from output level SetupOtherInputs(cf_name, mutable_cf_options, vstorage, c); // mark all the files that are being compacted @@ -810,12 +820,19 @@ Compaction* LevelCompactionPicker::PickCompactionBySize( } assert(level >= 0); - assert(level + 1 < NumberLevels()); - c = new Compaction(vstorage->num_levels(), level, level + 1, - mutable_cf_options.MaxFileSizeForLevel(level + 1), + int output_level; + if (level == 0) { + output_level = vstorage->base_level(); + } else { + output_level = level + 1; + } + assert(output_level < NumberLevels()); + + c = new Compaction(vstorage->num_levels(), level, output_level, + mutable_cf_options.MaxFileSizeForLevel(output_level), mutable_cf_options.MaxGrandParentOverlapBytes(level), - GetPathId(ioptions_, mutable_cf_options, level + 1), - GetCompressionType(ioptions_, level + 1)); + GetPathId(ioptions_, mutable_cf_options, output_level), + GetCompressionType(ioptions_, output_level)); c->score_ = score; // Pick the largest file in this level that is not already @@ -850,8 +867,8 @@ Compaction* LevelCompactionPicker::PickCompactionBySize( // Do not pick this file if its parents at level+1 are being compacted. // Maybe we can avoid redoing this work in SetupOtherInputs int parent_index = -1; - if (ParentRangeInCompaction(vstorage, &f->smallest, &f->largest, level, - &parent_index)) { + if (RangeInCompaction(vstorage, &f->smallest, &f->largest, + c->output_level(), &parent_index)) { continue; } c->inputs_[0].files.push_back(f); diff --git a/db/compaction_picker.h b/db/compaction_picker.h index b1a934651..7a0c6e7bc 100644 --- a/db/compaction_picker.h +++ b/db/compaction_picker.h @@ -139,10 +139,9 @@ class CompactionPicker { VersionStorageInfo* vstorage, Compaction* c); // Returns true if any one of the parent files are being compacted - bool ParentRangeInCompaction(VersionStorageInfo* vstorage, - const InternalKey* smallest, - const InternalKey* largest, int level, - int* index); + bool RangeInCompaction(VersionStorageInfo* vstorage, + const InternalKey* smallest, + const InternalKey* largest, int level, int* index); void SetupOtherInputs(const std::string& cf_name, const MutableCFOptions& mutable_cf_options, diff --git a/db/compaction_picker_test.cc b/db/compaction_picker_test.cc index ca7ba014f..811b970c1 100644 --- a/db/compaction_picker_test.cc +++ b/db/compaction_picker_test.cc @@ -59,6 +59,7 @@ class CompactionPickerTest { options_.num_levels = num_levels; vstorage_.reset(new VersionStorageInfo( &icmp_, ucmp_, options_.num_levels, style, nullptr)); + vstorage_->CalculateBaseBytes(ioptions_, mutable_cf_options_); } void DeleteVersionStorage() { @@ -82,6 +83,7 @@ class CompactionPickerTest { } void UpdateVersionStorageInfo() { + vstorage_->CalculateBaseBytes(ioptions_, mutable_cf_options_); vstorage_->UpdateFilesBySize(); vstorage_->UpdateNumNonEmptyLevels(); vstorage_->GenerateFileIndexer(); @@ -186,9 +188,10 @@ TEST(CompactionPickerTest, LevelMaxScore) { TEST(CompactionPickerTest, NeedsCompactionLevel) { const int kLevels = 6; const int kFileCount = 20; + for (int level = 0; level < kLevels - 1; ++level) { - uint64_t file_size = - mutable_cf_options_.MaxBytesForLevel(level) * 2 / kFileCount; + NewVersionStorage(kLevels, kCompactionStyleLevel); + uint64_t file_size = vstorage_->MaxBytesForLevel(level) * 2 / kFileCount; for (int file_count = 1; file_count <= kFileCount; ++file_count) { // start a brand new version in each test. NewVersionStorage(kLevels, kCompactionStyleLevel); @@ -207,6 +210,137 @@ TEST(CompactionPickerTest, NeedsCompactionLevel) { } } +TEST(CompactionPickerTest, Level0TriggerDynamic) { + int num_levels = ioptions_.num_levels; + ioptions_.level_compaction_dynamic_level_bytes = true; + mutable_cf_options_.level0_file_num_compaction_trigger = 2; + mutable_cf_options_.max_bytes_for_level_base = 200; + mutable_cf_options_.max_bytes_for_level_multiplier = 10; + NewVersionStorage(num_levels, kCompactionStyleLevel); + Add(0, 1U, "150", "200"); + Add(0, 2U, "200", "250"); + + UpdateVersionStorageInfo(); + + std::unique_ptr compaction(level_compaction_picker.PickCompaction( + cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_)); + ASSERT_TRUE(compaction.get() != nullptr); + ASSERT_EQ(2U, compaction->num_input_files(0)); + ASSERT_EQ(1U, compaction->input(0, 0)->fd.GetNumber()); + ASSERT_EQ(2U, compaction->input(0, 1)->fd.GetNumber()); + ASSERT_EQ(num_levels, static_cast(compaction->num_input_levels())); + ASSERT_EQ(num_levels - 1, compaction->output_level()); +} + +TEST(CompactionPickerTest, Level0TriggerDynamic2) { + int num_levels = ioptions_.num_levels; + ioptions_.level_compaction_dynamic_level_bytes = true; + mutable_cf_options_.level0_file_num_compaction_trigger = 2; + mutable_cf_options_.max_bytes_for_level_base = 200; + mutable_cf_options_.max_bytes_for_level_multiplier = 10; + NewVersionStorage(num_levels, kCompactionStyleLevel); + Add(0, 1U, "150", "200"); + Add(0, 2U, "200", "250"); + Add(num_levels - 1, 3U, "200", "250", 300U); + + UpdateVersionStorageInfo(); + ASSERT_EQ(vstorage_->base_level(), num_levels - 2); + + std::unique_ptr compaction(level_compaction_picker.PickCompaction( + cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_)); + ASSERT_TRUE(compaction.get() != nullptr); + ASSERT_EQ(2U, compaction->num_input_files(0)); + ASSERT_EQ(1U, compaction->input(0, 0)->fd.GetNumber()); + ASSERT_EQ(2U, compaction->input(0, 1)->fd.GetNumber()); + ASSERT_EQ(num_levels - 1, static_cast(compaction->num_input_levels())); + ASSERT_EQ(num_levels - 2, compaction->output_level()); +} + +TEST(CompactionPickerTest, Level0TriggerDynamic3) { + int num_levels = ioptions_.num_levels; + ioptions_.level_compaction_dynamic_level_bytes = true; + mutable_cf_options_.level0_file_num_compaction_trigger = 2; + mutable_cf_options_.max_bytes_for_level_base = 200; + mutable_cf_options_.max_bytes_for_level_multiplier = 10; + NewVersionStorage(num_levels, kCompactionStyleLevel); + Add(0, 1U, "150", "200"); + Add(0, 2U, "200", "250"); + Add(num_levels - 1, 3U, "200", "250", 300U); + Add(num_levels - 1, 4U, "300", "350", 3000U); + + UpdateVersionStorageInfo(); + ASSERT_EQ(vstorage_->base_level(), num_levels - 3); + + std::unique_ptr compaction(level_compaction_picker.PickCompaction( + cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_)); + ASSERT_TRUE(compaction.get() != nullptr); + ASSERT_EQ(2U, compaction->num_input_files(0)); + ASSERT_EQ(1U, compaction->input(0, 0)->fd.GetNumber()); + ASSERT_EQ(2U, compaction->input(0, 1)->fd.GetNumber()); + ASSERT_EQ(num_levels - 2, static_cast(compaction->num_input_levels())); + ASSERT_EQ(num_levels - 3, compaction->output_level()); +} + +TEST(CompactionPickerTest, Level0TriggerDynamic4) { + int num_levels = ioptions_.num_levels; + ioptions_.level_compaction_dynamic_level_bytes = true; + mutable_cf_options_.level0_file_num_compaction_trigger = 2; + mutable_cf_options_.max_bytes_for_level_base = 200; + mutable_cf_options_.max_bytes_for_level_multiplier = 10; + NewVersionStorage(num_levels, kCompactionStyleLevel); + Add(0, 1U, "150", "200"); + Add(0, 2U, "200", "250"); + Add(num_levels - 1, 3U, "200", "250", 300U); + Add(num_levels - 1, 4U, "300", "350", 3000U); + Add(num_levels - 3, 5U, "150", "180", 3U); + Add(num_levels - 3, 6U, "181", "300", 3U); + Add(num_levels - 3, 7U, "400", "450", 3U); + + UpdateVersionStorageInfo(); + ASSERT_EQ(vstorage_->base_level(), num_levels - 3); + + std::unique_ptr compaction(level_compaction_picker.PickCompaction( + cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_)); + ASSERT_TRUE(compaction.get() != nullptr); + ASSERT_EQ(2U, compaction->num_input_files(0)); + ASSERT_EQ(1U, compaction->input(0, 0)->fd.GetNumber()); + ASSERT_EQ(2U, compaction->input(0, 1)->fd.GetNumber()); + ASSERT_EQ(2U, compaction->num_input_files(num_levels - 3)); + ASSERT_EQ(5U, compaction->input(num_levels - 3, 0)->fd.GetNumber()); + ASSERT_EQ(6U, compaction->input(num_levels - 3, 1)->fd.GetNumber()); + ASSERT_EQ(num_levels - 2, static_cast(compaction->num_input_levels())); + ASSERT_EQ(num_levels - 3, compaction->output_level()); +} + +TEST(CompactionPickerTest, LevelTriggerDynamic4) { + int num_levels = ioptions_.num_levels; + ioptions_.level_compaction_dynamic_level_bytes = true; + mutable_cf_options_.level0_file_num_compaction_trigger = 2; + mutable_cf_options_.max_bytes_for_level_base = 200; + mutable_cf_options_.max_bytes_for_level_multiplier = 10; + NewVersionStorage(num_levels, kCompactionStyleLevel); + Add(0, 1U, "150", "200"); + Add(num_levels - 1, 3U, "200", "250", 300U); + Add(num_levels - 1, 4U, "300", "350", 3000U); + Add(num_levels - 1, 4U, "400", "450", 3U); + Add(num_levels - 2, 5U, "150", "180", 300U); + Add(num_levels - 2, 6U, "181", "350", 500U); + Add(num_levels - 2, 7U, "400", "450", 200U); + + UpdateVersionStorageInfo(); + + std::unique_ptr compaction(level_compaction_picker.PickCompaction( + cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_)); + ASSERT_TRUE(compaction.get() != nullptr); + ASSERT_EQ(1U, compaction->num_input_files(0)); + ASSERT_EQ(6U, compaction->input(0, 0)->fd.GetNumber()); + ASSERT_EQ(2U, compaction->num_input_files(1)); + ASSERT_EQ(3U, compaction->input(1, 0)->fd.GetNumber()); + ASSERT_EQ(4U, compaction->input(1, 1)->fd.GetNumber()); + ASSERT_EQ(2U, compaction->num_input_levels()); + ASSERT_EQ(num_levels - 1, compaction->output_level()); +} + TEST(CompactionPickerTest, NeedsCompactionUniversal) { NewVersionStorage(1, kCompactionStyleUniversal); UniversalCompactionPicker universal_compaction_picker( diff --git a/db/db_bench.cc b/db/db_bench.cc index 6d6492a68..66f63c20a 100644 --- a/db/db_bench.cc +++ b/db/db_bench.cc @@ -335,6 +335,9 @@ DEFINE_int32(target_file_size_multiplier, 1, DEFINE_uint64(max_bytes_for_level_base, 10 * 1048576, "Max bytes for level-1"); +DEFINE_bool(level_compaction_dynamic_level_bytes, false, + "Whether level size base is dynamic"); + DEFINE_int32(max_bytes_for_level_multiplier, 10, "A multiplier to compute max bytes for level-N (N >= 2)"); @@ -1933,6 +1936,8 @@ class Benchmark { options.target_file_size_base = FLAGS_target_file_size_base; options.target_file_size_multiplier = FLAGS_target_file_size_multiplier; options.max_bytes_for_level_base = FLAGS_max_bytes_for_level_base; + options.level_compaction_dynamic_level_bytes = + FLAGS_level_compaction_dynamic_level_bytes; options.max_bytes_for_level_multiplier = FLAGS_max_bytes_for_level_multiplier; options.filter_deletes = FLAGS_filter_deletes; diff --git a/db/db_impl.cc b/db/db_impl.cc index 31f58b612..75229a67f 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -108,8 +108,7 @@ Options SanitizeOptions(const std::string& dbname, const InternalKeyComparator* icmp, const Options& src) { auto db_options = SanitizeOptions(dbname, DBOptions(src)); - auto cf_options = SanitizeOptions(icmp, ColumnFamilyOptions(src), - db_options.info_log.get()); + auto cf_options = SanitizeOptions(db_options, icmp, ColumnFamilyOptions(src)); return Options(db_options, cf_options); } @@ -1514,8 +1513,7 @@ int DBImpl::FindMinimumEmptyLevelFitting(ColumnFamilyData* cfd, // stop if level i is not empty if (vstorage->NumLevelFiles(i) > 0) break; // stop if level i is too small (cannot fit the level files) - if (mutable_cf_options.MaxBytesForLevel(i) < - vstorage->NumLevelBytes(level)) { + if (vstorage->MaxBytesForLevel(i) < vstorage->NumLevelBytes(level)) { break; } diff --git a/db/db_test.cc b/db/db_test.cc index 884d91e3b..afe893a1e 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -2825,9 +2825,8 @@ TEST(DBTest, IgnoreRecoveredLog) { Options options = CurrentOptions(); options.create_if_missing = true; options.merge_operator = MergeOperators::CreateUInt64AddOperator(); - options.wal_dir = dbname_ + "/wal"; - Destroy(options); - Reopen(options); + options.wal_dir = dbname_ + "/logs"; + DestroyAndReopen(options); // fill up the DB std::string one, two; @@ -10184,6 +10183,239 @@ TEST(DBTest, ThreadStatusSingleCompaction) { #endif // ROCKSDB_USING_THREAD_STATUS +TEST(DBTest, DynamicLevelMaxBytesBase) { + // Use InMemoryEnv, or it would be too slow. + unique_ptr env(new MockEnv(env_)); + + const int kNKeys = 1000; + int keys[kNKeys]; + + auto verify_func = [&]() { + for (int i = 0; i < kNKeys; i++) { + ASSERT_NE("NOT_FOUND", Get(Key(i))); + ASSERT_NE("NOT_FOUND", Get(Key(kNKeys * 2 + i))); + if (i < kNKeys / 10) { + ASSERT_EQ("NOT_FOUND", Get(Key(kNKeys + keys[i]))); + } else { + ASSERT_NE("NOT_FOUND", Get(Key(kNKeys + keys[i]))); + } + } + }; + + Random rnd(301); + for (int ordered_insert = 0; ordered_insert <= 1; ordered_insert++) { + for (int i = 0; i < kNKeys; i++) { + keys[i] = i; + } + if (ordered_insert == 0) { + std::random_shuffle(std::begin(keys), std::end(keys)); + } + for (int max_background_compactions = 1; max_background_compactions < 4; + max_background_compactions += 2) { + Options options; + options.env = env.get(); + options.create_if_missing = true; + options.db_write_buffer_size = 2048; + options.write_buffer_size = 2048; + options.max_write_buffer_number = 2; + options.level0_file_num_compaction_trigger = 2; + options.level0_slowdown_writes_trigger = 2; + options.level0_stop_writes_trigger = 2; + options.target_file_size_base = 2048; + options.level_compaction_dynamic_level_bytes = true; + options.max_bytes_for_level_base = 10240; + options.max_bytes_for_level_multiplier = 4; + options.hard_rate_limit = 1.1; + options.max_background_compactions = max_background_compactions; + options.num_levels = 5; + + DestroyAndReopen(options); + + for (int i = 0; i < kNKeys; i++) { + int key = keys[i]; + ASSERT_OK(Put(Key(kNKeys + key), RandomString(&rnd, 102))); + ASSERT_OK(Put(Key(key), RandomString(&rnd, 102))); + ASSERT_OK(Put(Key(kNKeys * 2 + key), RandomString(&rnd, 102))); + ASSERT_OK(Delete(Key(kNKeys + keys[i / 10]))); + env_->SleepForMicroseconds(5000); + } + + uint64_t int_prop; + ASSERT_TRUE(db_->GetIntProperty("rocksdb.background-errors", &int_prop)); + ASSERT_EQ(0U, int_prop); + + // Verify DB + for (int j = 0; j < 2; j++) { + verify_func(); + if (j == 0) { + Reopen(options); + } + } + + // Test compact range works + dbfull()->CompactRange(nullptr, nullptr); + // All data should be in the last level. + ColumnFamilyMetaData cf_meta; + db_->GetColumnFamilyMetaData(&cf_meta); + ASSERT_EQ(5U, cf_meta.levels.size()); + for (int i = 0; i < 4; i++) { + ASSERT_EQ(0U, cf_meta.levels[i].files.size()); + } + ASSERT_GT(cf_meta.levels[4U].files.size(), 0U); + verify_func(); + + Close(); + } + } + + env_->SetBackgroundThreads(1, Env::LOW); + env_->SetBackgroundThreads(1, Env::HIGH); +} + +// Test specific cases in dynamic max bytes +TEST(DBTest, DynamicLevelMaxBytesBase2) { + Random rnd(301); + int kMaxKey = 1000000; + + Options options = CurrentOptions(); + options.create_if_missing = true; + options.db_write_buffer_size = 2048; + options.write_buffer_size = 2048; + options.max_write_buffer_number = 2; + options.level0_file_num_compaction_trigger = 2; + options.level0_slowdown_writes_trigger = 9999; + options.level0_stop_writes_trigger = 9999; + options.target_file_size_base = 2048; + options.level_compaction_dynamic_level_bytes = true; + options.max_bytes_for_level_base = 10240; + options.max_bytes_for_level_multiplier = 4; + options.max_background_compactions = 2; + options.num_levels = 5; + options.expanded_compaction_factor = 0; // Force not expanding in compactions + BlockBasedTableOptions table_options; + table_options.block_size = 1024; + options.table_factory.reset(NewBlockBasedTableFactory(table_options)); + + DestroyAndReopen(options); + ASSERT_OK(dbfull()->SetOptions({ + {"disable_auto_compactions", "true"}, + })); + + uint64_t int_prop; + std::string str_prop; + + // Initial base level is the last level + ASSERT_TRUE(db_->GetIntProperty("rocksdb.base-level", &int_prop)); + ASSERT_EQ(4U, int_prop); + + // Put about 7K to L0 + for (int i = 0; i < 70; i++) { + ASSERT_OK(Put(Key(static_cast(rnd.Uniform(kMaxKey))), + RandomString(&rnd, 80))); + } + ASSERT_OK(dbfull()->SetOptions({ + {"disable_auto_compactions", "false"}, + })); + Flush(); + dbfull()->TEST_WaitForCompact(); + ASSERT_TRUE(db_->GetIntProperty("rocksdb.base-level", &int_prop)); + ASSERT_EQ(4U, int_prop); + + // Insert extra about 3.5K to L0. After they are compacted to L4, base level + // should be changed to L3. + ASSERT_OK(dbfull()->SetOptions({ + {"disable_auto_compactions", "true"}, + })); + for (int i = 0; i < 70; i++) { + ASSERT_OK(Put(Key(static_cast(rnd.Uniform(kMaxKey))), + RandomString(&rnd, 80))); + } + + ASSERT_OK(dbfull()->SetOptions({ + {"disable_auto_compactions", "false"}, + })); + Flush(); + dbfull()->TEST_WaitForCompact(); + ASSERT_TRUE(db_->GetIntProperty("rocksdb.base-level", &int_prop)); + ASSERT_EQ(3U, int_prop); + ASSERT_TRUE(db_->GetProperty("rocksdb.num-files-at-level3", &str_prop)); + ASSERT_EQ("0", str_prop); + + // Trigger parallel compaction, and the first one would change the base + // level. + // Hold compaction jobs to make sure + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "CompactionJob::Run:Start", + [&]() { env_->SleepForMicroseconds(100000); }); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + ASSERT_OK(dbfull()->SetOptions({ + {"disable_auto_compactions", "true"}, + })); + // Write about 10K more + for (int i = 0; i < 100; i++) { + ASSERT_OK(Put(Key(static_cast(rnd.Uniform(kMaxKey))), + RandomString(&rnd, 80))); + } + ASSERT_OK(dbfull()->SetOptions({ + {"disable_auto_compactions", "false"}, + })); + Flush(); + // Wait for 200 milliseconds before proceeding compactions to make sure two + // parallel ones are executed. + env_->SleepForMicroseconds(200000); + dbfull()->TEST_WaitForCompact(); + ASSERT_TRUE(db_->GetIntProperty("rocksdb.base-level", &int_prop)); + ASSERT_EQ(3U, int_prop); + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); + + // Trigger a condition that the compaction changes base level and L0->Lbase + // happens at the same time. + // We try to make last levels' targets to be 10K, 40K, 160K, add triggers + // another compaction from 40K->160K. + ASSERT_OK(dbfull()->SetOptions({ + {"disable_auto_compactions", "true"}, + })); + // Write about 150K more + for (int i = 0; i < 1350; i++) { + ASSERT_OK(Put(Key(static_cast(rnd.Uniform(kMaxKey))), + RandomString(&rnd, 80))); + } + ASSERT_OK(dbfull()->SetOptions({ + {"disable_auto_compactions", "false"}, + })); + Flush(); + dbfull()->TEST_WaitForCompact(); + ASSERT_TRUE(db_->GetIntProperty("rocksdb.base-level", &int_prop)); + ASSERT_EQ(2U, int_prop); + + // Keep Writing data until base level changed 2->1. There will be L0->L2 + // compaction going on at the same time. + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + for (int attempt = 0; attempt <= 20; attempt++) { + // Write about 5K more data with two flushes. It should be flush to level 2 + // but when it is applied, base level is already 1. + for (int i = 0; i < 50; i++) { + ASSERT_OK(Put(Key(static_cast(rnd.Uniform(kMaxKey))), + RandomString(&rnd, 80))); + } + Flush(); + + ASSERT_TRUE(db_->GetIntProperty("rocksdb.base-level", &int_prop)); + if (int_prop == 2U) { + env_->SleepForMicroseconds(50000); + } else { + break; + } + } + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); + rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks(); + + env_->SleepForMicroseconds(200000); + + ASSERT_TRUE(db_->GetIntProperty("rocksdb.base-level", &int_prop)); + ASSERT_EQ(1U, int_prop); +} + TEST(DBTest, DynamicCompactionOptions) { // minimum write buffer size is enforced at 64KB const uint64_t k32KB = 1 << 15; diff --git a/db/internal_stats.cc b/db/internal_stats.cc index e557f9e06..6d7cd72bd 100644 --- a/db/internal_stats.cc +++ b/db/internal_stats.cc @@ -142,6 +142,8 @@ DBPropertyType GetPropertyType(const Slice& property, bool* is_int_property, return kOldestSnapshotTime; } else if (in == "num-live-versions") { return kNumLiveVersions; + } else if (in == "base-level") { + return kBaseLevel; } return kUnknown; } @@ -284,6 +286,9 @@ bool InternalStats::GetIntProperty(DBPropertyType property_type, *value = db->IsFileDeletionsEnabled(); return true; #endif + case kBaseLevel: + *value = vstorage->base_level(); + return true; default: return false; } diff --git a/db/internal_stats.h b/db/internal_stats.h index 02bdabd09..9d2b7788c 100644 --- a/db/internal_stats.h +++ b/db/internal_stats.h @@ -51,6 +51,7 @@ enum DBPropertyType : uint32_t { kNumSnapshots, // Number of snapshots in the system kOldestSnapshotTime, // Unix timestamp of the first snapshot kNumLiveVersions, + kBaseLevel, // The level that L0 data is compacted to }; extern DBPropertyType GetPropertyType(const Slice& property, diff --git a/db/version_builder_test.cc b/db/version_builder_test.cc index a48b4e3a2..33d3fa269 100644 --- a/db/version_builder_test.cc +++ b/db/version_builder_test.cc @@ -53,15 +53,17 @@ class VersionBuilderTest { void Add(int level, uint32_t file_number, const char* smallest, const char* largest, uint64_t file_size = 0, uint32_t path_id = 0, - SequenceNumber smallest_seq = 100, - SequenceNumber largest_seq = 100, + SequenceNumber smallest_seq = 100, SequenceNumber largest_seq = 100, uint64_t num_entries = 0, uint64_t num_deletions = 0, - bool sampled = false) { + bool sampled = false, SequenceNumber smallest_seqno = 0, + SequenceNumber largest_seqno = 0) { assert(level < vstorage_.num_levels()); FileMetaData* f = new FileMetaData; f->fd = FileDescriptor(file_number, path_id, file_size); f->smallest = GetInternalKey(smallest, smallest_seq); f->largest = GetInternalKey(largest, largest_seq); + f->smallest_seqno = smallest_seqno; + f->largest_seqno = largest_seqno; f->compensated_file_size = file_size; f->refs = 0; f->num_entries = num_entries; @@ -78,20 +80,31 @@ class VersionBuilderTest { vstorage_.UpdateNumNonEmptyLevels(); vstorage_.GenerateFileIndexer(); vstorage_.GenerateLevelFilesBrief(); + vstorage_.CalculateBaseBytes(ioptions_, mutable_cf_options_); vstorage_.SetFinalized(); } }; +void UnrefFilesInVersion(VersionStorageInfo* new_vstorage) { + for (int i = 0; i < new_vstorage->num_levels(); i++) { + for (auto* f : new_vstorage->LevelFiles(i)) { + if (--f->refs == 0) { + delete f; + } + } + } +} + TEST(VersionBuilderTest, ApplyAndSaveTo) { Add(0, 1U, "150", "200", 100U); - // Level 1 score 1.2 + Add(1, 66U, "150", "200", 100U); Add(1, 88U, "201", "300", 100U); - // Level 2 score 1.8. File 7 is the largest. Should be picked + Add(2, 6U, "150", "179", 100U); Add(2, 7U, "180", "220", 100U); Add(2, 8U, "221", "300", 100U); - // Level 3 score slightly larger than 1 + Add(3, 26U, "150", "170", 100U); Add(3, 27U, "171", "179", 100U); Add(3, 28U, "191", "220", 100U); @@ -115,13 +128,83 @@ TEST(VersionBuilderTest, ApplyAndSaveTo) { ASSERT_EQ(400U, new_vstorage.NumLevelBytes(2)); ASSERT_EQ(300U, new_vstorage.NumLevelBytes(3)); - for (int i = 0; i < new_vstorage.num_levels(); i++) { - for (auto* f : new_vstorage.LevelFiles(i)) { - if (--f->refs == 0) { - delete f; - } - } - } + UnrefFilesInVersion(&new_vstorage); +} + +TEST(VersionBuilderTest, ApplyAndSaveToDynamic) { + ioptions_.level_compaction_dynamic_level_bytes = true; + + Add(0, 1U, "150", "200", 100U, 0, 200U, 200U, 0, 0, false, 200U, 200U); + Add(0, 88U, "201", "300", 100U, 0, 100U, 100U, 0, 0, false, 100U, 100U); + + Add(4, 6U, "150", "179", 100U); + Add(4, 7U, "180", "220", 100U); + Add(4, 8U, "221", "300", 100U); + + Add(5, 26U, "150", "170", 100U); + Add(5, 27U, "171", "179", 100U); + UpdateVersionStorageInfo(); + + VersionEdit version_edit; + version_edit.AddFile(3, 666, 0, 100U, GetInternalKey("301"), + GetInternalKey("350"), 200, 200); + version_edit.DeleteFile(0, 1U); + version_edit.DeleteFile(0, 88U); + + EnvOptions env_options; + + VersionBuilder version_builder(env_options, nullptr, &vstorage_); + + VersionStorageInfo new_vstorage(&icmp_, ucmp_, options_.num_levels, + kCompactionStyleLevel, nullptr); + version_builder.Apply(&version_edit); + version_builder.SaveTo(&new_vstorage); + + ASSERT_EQ(0U, new_vstorage.NumLevelBytes(0)); + ASSERT_EQ(100U, new_vstorage.NumLevelBytes(3)); + ASSERT_EQ(300U, new_vstorage.NumLevelBytes(4)); + ASSERT_EQ(200U, new_vstorage.NumLevelBytes(5)); + + UnrefFilesInVersion(&new_vstorage); +} + +TEST(VersionBuilderTest, ApplyAndSaveToDynamic2) { + ioptions_.level_compaction_dynamic_level_bytes = true; + + Add(0, 1U, "150", "200", 100U, 0, 200U, 200U, 0, 0, false, 200U, 200U); + Add(0, 88U, "201", "300", 100U, 0, 100U, 100U, 0, 0, false, 100U, 100U); + + Add(4, 6U, "150", "179", 100U); + Add(4, 7U, "180", "220", 100U); + Add(4, 8U, "221", "300", 100U); + + Add(5, 26U, "150", "170", 100U); + Add(5, 27U, "171", "179", 100U); + UpdateVersionStorageInfo(); + + VersionEdit version_edit; + version_edit.AddFile(4, 666, 0, 100U, GetInternalKey("301"), + GetInternalKey("350"), 200, 200); + version_edit.DeleteFile(0, 1U); + version_edit.DeleteFile(0, 88U); + version_edit.DeleteFile(4, 6U); + version_edit.DeleteFile(4, 7U); + version_edit.DeleteFile(4, 8U); + + EnvOptions env_options; + + VersionBuilder version_builder(env_options, nullptr, &vstorage_); + + VersionStorageInfo new_vstorage(&icmp_, ucmp_, options_.num_levels, + kCompactionStyleLevel, nullptr); + version_builder.Apply(&version_edit); + version_builder.SaveTo(&new_vstorage); + + ASSERT_EQ(0U, new_vstorage.NumLevelBytes(0)); + ASSERT_EQ(100U, new_vstorage.NumLevelBytes(4)); + ASSERT_EQ(200U, new_vstorage.NumLevelBytes(5)); + + UnrefFilesInVersion(&new_vstorage); } TEST(VersionBuilderTest, ApplyMultipleAndSaveTo) { @@ -150,13 +233,7 @@ TEST(VersionBuilderTest, ApplyMultipleAndSaveTo) { ASSERT_EQ(500U, new_vstorage.NumLevelBytes(2)); - for (int i = 0; i < new_vstorage.num_levels(); i++) { - for (auto* f : new_vstorage.LevelFiles(i)) { - if (--f->refs == 0) { - delete f; - } - } - } + UnrefFilesInVersion(&new_vstorage); } TEST(VersionBuilderTest, ApplyDeleteAndSaveTo) { @@ -193,13 +270,7 @@ TEST(VersionBuilderTest, ApplyDeleteAndSaveTo) { ASSERT_EQ(300U, new_vstorage.NumLevelBytes(2)); - for (int i = 0; i < new_vstorage.num_levels(); i++) { - for (auto* f : new_vstorage.LevelFiles(i)) { - if (--f->refs == 0) { - delete f; - } - } - } + UnrefFilesInVersion(&new_vstorage); } TEST(VersionBuilderTest, EstimatedActiveKeys) { diff --git a/db/version_set.cc b/db/version_set.cc index 2824b5944..61353c0cc 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -726,6 +726,7 @@ VersionStorageInfo::VersionStorageInfo( file_indexer_(user_comparator), compaction_style_(compaction_style), files_(new std::vector[num_levels_]), + base_level_(1), files_by_size_(num_levels_), next_file_to_compact_by_size_(num_levels_), compaction_score_(num_levels_), @@ -856,10 +857,11 @@ void VersionStorageInfo::GenerateLevelFilesBrief() { } } -void Version::PrepareApply() { +void Version::PrepareApply(const MutableCFOptions& mutable_cf_options) { UpdateAccumulatedStats(); - storage_info_.UpdateFilesBySize(); storage_info_.UpdateNumNonEmptyLevels(); + storage_info_.CalculateBaseBytes(*cfd_->ioptions(), mutable_cf_options); + storage_info_.UpdateFilesBySize(); storage_info_.GenerateFileIndexer(); storage_info_.GenerateLevelFilesBrief(); } @@ -1018,7 +1020,7 @@ void VersionStorageInfo::ComputeCompactionScore( } } score = static_cast(level_bytes_no_compacting) / - mutable_cf_options.MaxBytesForLevel(level); + MaxBytesForLevel(level); if (max_score < score) { max_score = score; max_score_level = level; @@ -1077,6 +1079,45 @@ void VersionStorageInfo::AddFile(int level, FileMetaData* f) { level_files->push_back(f); } +// Version::PrepareApply() need to be called before calling the function, or +// following functions called: +// 1. UpdateNumNonEmptyLevels(); +// 2. CalculateBaseBytes(); +// 3. UpdateFilesBySize(); +// 4. GenerateFileIndexer(); +// 5. GenerateLevelFilesBrief(); +void VersionStorageInfo::SetFinalized() { + finalized_ = true; +#ifndef NDEBUG + assert(base_level_ >= 1 && (num_levels() <= 1 || base_level_ < num_levels())); + // Verify all levels newer than base_level are empty except L0 + for (int level = 1; level < base_level(); level++) { + assert(NumLevelBytes(level) == 0); + } + uint64_t max_bytes_prev_level = 0; + for (int level = base_level(); level < num_levels() - 1; level++) { + if (LevelFiles(level).size() == 0) { + continue; + } + assert(MaxBytesForLevel(level) >= max_bytes_prev_level); + max_bytes_prev_level = MaxBytesForLevel(level); + } + int num_empty_non_l0_level = 0; + for (int level = 0; level < num_levels(); level++) { + assert(LevelFiles(level).size() == 0 || + LevelFiles(level).size() == LevelFilesBrief(level).num_files); + if (level > 0 && NumLevelBytes(level) > 0) { + num_empty_non_l0_level++; + } + if (LevelFiles(level).size() > 0) { + assert(level < num_non_empty_levels()); + } + } + assert(compaction_level_.size() > 0); + assert(compaction_level_.size() == compaction_score_.size()); +#endif +} + void VersionStorageInfo::UpdateNumNonEmptyLevels() { num_non_empty_levels_ = num_levels_; for (int i = num_levels_ - 1; i >= 0; i--) { @@ -1399,7 +1440,15 @@ uint64_t VersionStorageInfo::NumLevelBytes(int level) const { const char* VersionStorageInfo::LevelSummary( LevelSummaryStorage* scratch) const { - int len = snprintf(scratch->buffer, sizeof(scratch->buffer), "files["); + int len = 0; + if (num_levels() > 1) { + assert(base_level_ < static_cast(level_max_bytes_.size())); + len = snprintf(scratch->buffer, sizeof(scratch->buffer), + "base level %d max bytes base %" PRIu64, base_level_, + level_max_bytes_[base_level_]); + } + len += + snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len, " files["); for (int i = 0; i < num_levels(); i++) { int sz = sizeof(scratch->buffer) - len; int ret = snprintf(scratch->buffer + len, sz, "%d ", int(files_[i].size())); @@ -1452,6 +1501,113 @@ int64_t VersionStorageInfo::MaxNextLevelOverlappingBytes() { return result; } +uint64_t VersionStorageInfo::MaxBytesForLevel(int level) const { + // Note: the result for level zero is not really used since we set + // the level-0 compaction threshold based on number of files. + assert(level >= 0); + assert(level < static_cast(level_max_bytes_.size())); + return level_max_bytes_[level]; +} + +void VersionStorageInfo::CalculateBaseBytes(const ImmutableCFOptions& ioptions, + const MutableCFOptions& options) { + level_max_bytes_.resize(ioptions.num_levels); + if (!ioptions.level_compaction_dynamic_level_bytes) { + base_level_ = 1; + + // Calculate for static bytes base case + for (int i = 0; i < ioptions.num_levels; ++i) { + if (i == 0 && ioptions.compaction_style == kCompactionStyleUniversal) { + level_max_bytes_[i] = options.max_bytes_for_level_base; + } else if (i > 1) { + level_max_bytes_[i] = MultiplyCheckOverflow( + MultiplyCheckOverflow(level_max_bytes_[i - 1], + options.max_bytes_for_level_multiplier), + options.max_bytes_for_level_multiplier_additional[i - 1]); + } else { + level_max_bytes_[i] = options.max_bytes_for_level_base; + } + } + } else { + uint64_t max_level_size = 0; + + int first_non_empty_level = -1; + // Find size of non-L0 level of most data. + // Cannot use the size of the last level because it can be empty or less + // than previous levels after compaction. + for (int i = 1; i < num_levels_; i++) { + uint64_t total_size = 0; + for (const auto& f : files_[i]) { + total_size += f->fd.GetFileSize(); + } + if (total_size > 0 && first_non_empty_level == -1) { + first_non_empty_level = i; + } + if (total_size > max_level_size) { + max_level_size = total_size; + } + } + + // Prefill every level's max bytes to disallow compaction from there. + for (int i = 0; i < num_levels_; i++) { + level_max_bytes_[i] = std::numeric_limits::max(); + } + + if (max_level_size == 0) { + // No data for L1 and up. L0 compacts to last level directly. + // No compaction from L1+ needs to be scheduled. + base_level_ = num_levels_ - 1; + } else { + uint64_t base_bytes_max = options.max_bytes_for_level_base; + uint64_t base_bytes_min = + base_bytes_max / options.max_bytes_for_level_multiplier; + + // Try whether we can make last level's target size to be max_level_size + uint64_t cur_level_size = max_level_size; + for (int i = num_levels_ - 2; i >= first_non_empty_level; i--) { + // Round up after dividing + cur_level_size /= options.max_bytes_for_level_multiplier; + } + + // Calculate base level and its size. + int base_level_size; + if (cur_level_size <= base_bytes_min) { + // Case 1. If we make target size of last level to be max_level_size, + // target size of the first non-empty level would be smaller than + // base_bytes_min. We set it be base_bytes_min. + base_level_size = static_cast(base_bytes_min + 1); + base_level_ = first_non_empty_level; + Warn(ioptions.info_log, + "More existing levels in DB than needed. " + "max_bytes_for_level_multiplier may not be guaranteed."); + } else { + // Find base level (where L0 data is compacted to). + base_level_ = first_non_empty_level; + while (base_level_ > 1 && cur_level_size > base_bytes_max) { + --base_level_; + cur_level_size = + cur_level_size / options.max_bytes_for_level_multiplier; + } + if (cur_level_size > base_bytes_max) { + // Even L1 will be too large + assert(base_level_ == 1); + base_level_size = static_cast(base_bytes_max); + } else { + base_level_size = static_cast(cur_level_size); + } + } + + int level_size = base_level_size; + for (int i = base_level_; i < num_levels_; i++) { + if (i > base_level_) { + level_size = level_size * options.max_bytes_for_level_multiplier; + } + level_max_bytes_[i] = level_size; + } + } + } +} + void Version::AddLiveFiles(std::vector* live) { for (int level = 0; level < storage_info_.num_levels(); level++) { const std::vector& files = storage_info_.files_[level]; @@ -1679,7 +1835,7 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data, if (!edit->IsColumnFamilyManipulation()) { // This is cpu-heavy operations, which should be called outside mutex. - v->PrepareApply(); + v->PrepareApply(mutable_cf_options); } // Write new record to MANIFEST log @@ -2102,7 +2258,7 @@ Status VersionSet::Recover( builder->SaveTo(v->storage_info()); // Install recovered version - v->PrepareApply(); + v->PrepareApply(*cfd->GetLatestMutableCFOptions()); AppendVersion(cfd, v); } @@ -2436,7 +2592,7 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname, Version* v = new Version(cfd, this, current_version_number_++); builder->SaveTo(v->storage_info()); - v->PrepareApply(); + v->PrepareApply(*cfd->GetLatestMutableCFOptions()); printf("--------------- Column family \"%s\" (ID %u) --------------\n", cfd->GetName().c_str(), (unsigned int)cfd->GetID()); @@ -2700,6 +2856,18 @@ bool VersionSet::VerifyCompactionFileConsistency(Compaction* c) { "[%s] compaction output being applied to a different base version from" " input version", c->column_family_data()->GetName().c_str()); + if (c->start_level() == 0 && c->num_input_levels() > 2U) { + // We are doing a L0->base_level compaction. The assumption is if + // base level is not L1, levels from L1 to base_level - 1 is empty. + // This is ensured by having one compaction from L0 going on at the + // same time in level-based compaction. So that during the time, no + // compaction/flush can put files to those levels. + for (int l = c->start_level() + 1; l < c->output_level(); l++) { + if (vstorage->NumLevelFiles(l) != 0) { + return false; + } + } + } } for (size_t input = 0; input < c->num_input_levels(); ++input) { @@ -2797,6 +2965,9 @@ ColumnFamilyData* VersionSet::CreateColumnFamily( Version* v = new Version(new_cfd, this, current_version_number_++); + // Fill level target base information. + v->storage_info()->CalculateBaseBytes(*new_cfd->ioptions(), + *new_cfd->GetLatestMutableCFOptions()); AppendVersion(new_cfd, v); // GetLatestMutableCFOptions() is safe here without mutex since the // cfd is not available to client diff --git a/db/version_set.h b/db/version_set.h index 353454a19..5d56e128c 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -63,8 +63,7 @@ class MergeIteratorBuilder; // REQUIRES: "file_level.files" contains a sorted list of // non-overlapping files. extern int FindFile(const InternalKeyComparator& icmp, - const LevelFilesBrief& file_level, - const Slice& key); + const LevelFilesBrief& file_level, const Slice& key); // Returns true iff some file in "files" overlaps the user key range // [*smallest,*largest]. @@ -72,19 +71,18 @@ extern int FindFile(const InternalKeyComparator& icmp, // largest==nullptr represents a key largest than all keys in the DB. // REQUIRES: If disjoint_sorted_files, file_level.files[] // contains disjoint ranges in sorted order. -extern bool SomeFileOverlapsRange( - const InternalKeyComparator& icmp, - bool disjoint_sorted_files, - const LevelFilesBrief& file_level, - const Slice* smallest_user_key, - const Slice* largest_user_key); +extern bool SomeFileOverlapsRange(const InternalKeyComparator& icmp, + bool disjoint_sorted_files, + const LevelFilesBrief& file_level, + const Slice* smallest_user_key, + const Slice* largest_user_key); // Generate LevelFilesBrief from vector // Would copy smallest_key and largest_key data to sequential memory // arena: Arena used to allocate the memory extern void DoGenerateLevelFilesBrief(LevelFilesBrief* file_level, - const std::vector& files, - Arena* arena); + const std::vector& files, + Arena* arena); class VersionStorageInfo { public: @@ -98,7 +96,7 @@ class VersionStorageInfo { void AddFile(int level, FileMetaData* f); - void SetFinalized() { finalized_ = true; } + void SetFinalized(); // Update num_non_empty_levels_. void UpdateNumNonEmptyLevels(); @@ -148,15 +146,17 @@ class VersionStorageInfo { int* file_index = nullptr); // return index of overlap file void GetOverlappingInputsBinarySearch( - int level, const Slice& begin, // nullptr means before all keys - const Slice& end, // nullptr means after all keys + int level, + const Slice& begin, // nullptr means before all keys + const Slice& end, // nullptr means after all keys std::vector* inputs, int hint_index, // index of overlap file int* file_index); // return index of overlap file void ExtendOverlappingInputs( - int level, const Slice& begin, // nullptr means before all keys - const Slice& end, // nullptr means after all keys + int level, + const Slice& begin, // nullptr means before all keys + const Slice& end, // nullptr means after all keys std::vector* inputs, unsigned int index); // start extending from this index @@ -213,6 +213,8 @@ class VersionStorageInfo { return files_by_size_[level]; } + int base_level() const { return base_level_; } + // REQUIRES: lock is held // Set the index that is used to offset into files_by_size_ to find // the next compaction candidate file. @@ -281,12 +283,22 @@ class VersionStorageInfo { return internal_comparator_; } + // Returns maximum total bytes of data on a given level. + uint64_t MaxBytesForLevel(int level) const; + + // Must be called after any change to MutableCFOptions. + void CalculateBaseBytes(const ImmutableCFOptions& ioptions, + const MutableCFOptions& options); + private: const InternalKeyComparator* internal_comparator_; const Comparator* user_comparator_; int num_levels_; // Number of levels int num_non_empty_levels_; // Number of levels. Any level larger than it // is guaranteed to be empty. + // Per-level max bytes + std::vector level_max_bytes_; + // A short brief metadata of files per level autovector level_files_brief_; FileIndexer file_indexer_; @@ -298,6 +310,10 @@ class VersionStorageInfo { // in increasing order of keys std::vector* files_; + // Level that L0 data should be compacted to. All levels < base_level_ should + // be empty. + int base_level_; + // A list for the same set of files that are stored in files_, // but files in each level are now sorted based on file // size. The file with the largest size is at the front. @@ -366,7 +382,7 @@ class Version { // Loads some stats information from files. Call without mutex held. It needs // to be called before applying the version to the version set. - void PrepareApply(); + void PrepareApply(const MutableCFOptions& mutable_cf_options); // Reference count management (so Versions do not disappear out from // under live iterators) @@ -407,7 +423,6 @@ class Version { ColumnFamilyData* cfd() const { return cfd_; } - // Return the next Version in the linked list. Used for debug only Version* TEST_Next() const { return next_; diff --git a/db/version_set_test.cc b/db/version_set_test.cc index 9920a9e05..11244dd64 100644 --- a/db/version_set_test.cc +++ b/db/version_set_test.cc @@ -73,6 +73,149 @@ TEST(GenerateLevelFilesBriefTest, Multiple) { ASSERT_EQ(0, Compare()); } +class CountingLogger : public Logger { + public: + CountingLogger() : log_count(0) {} + using Logger::Logv; + virtual void Logv(const char* format, va_list ap) override { log_count++; } + int log_count; +}; + +Options GetOptionsWithNumLevels(int num_levels, + std::shared_ptr logger) { + Options opt; + opt.num_levels = num_levels; + opt.info_log = logger; + return opt; +} + +class VersionStorageInfoTest { + public: + const Comparator* ucmp_; + InternalKeyComparator icmp_; + std::shared_ptr logger_; + Options options_; + ImmutableCFOptions ioptions_; + MutableCFOptions mutable_cf_options_; + VersionStorageInfo vstorage_; + + InternalKey GetInternalKey(const char* ukey, + SequenceNumber smallest_seq = 100) { + return InternalKey(ukey, smallest_seq, kTypeValue); + } + + VersionStorageInfoTest() + : ucmp_(BytewiseComparator()), + icmp_(ucmp_), + logger_(new CountingLogger()), + options_(GetOptionsWithNumLevels(6, logger_)), + ioptions_(options_), + mutable_cf_options_(options_, ioptions_), + vstorage_(&icmp_, ucmp_, 6, kCompactionStyleLevel, nullptr) {} + + ~VersionStorageInfoTest() { + for (int i = 0; i < vstorage_.num_levels(); i++) { + for (auto* f : vstorage_.LevelFiles(i)) { + if (--f->refs == 0) { + delete f; + } + } + } + } + + void Add(int level, uint32_t file_number, const char* smallest, + const char* largest, uint64_t file_size = 0) { + assert(level < vstorage_.num_levels()); + FileMetaData* f = new FileMetaData; + f->fd = FileDescriptor(file_number, 0, file_size); + f->smallest = GetInternalKey(smallest, 0); + f->largest = GetInternalKey(largest, 0); + f->compensated_file_size = file_size; + f->refs = 0; + f->num_entries = 0; + f->num_deletions = 0; + vstorage_.AddFile(level, f); + } +}; + +TEST(VersionStorageInfoTest, MaxBytesForLevelStatic) { + ioptions_.level_compaction_dynamic_level_bytes = false; + mutable_cf_options_.max_bytes_for_level_base = 10; + mutable_cf_options_.max_bytes_for_level_multiplier = 5; + Add(4, 100U, "1", "2"); + Add(5, 101U, "1", "2"); + + vstorage_.CalculateBaseBytes(ioptions_, mutable_cf_options_); + ASSERT_EQ(vstorage_.MaxBytesForLevel(1), 10U); + ASSERT_EQ(vstorage_.MaxBytesForLevel(2), 50U); + ASSERT_EQ(vstorage_.MaxBytesForLevel(3), 250U); + ASSERT_EQ(vstorage_.MaxBytesForLevel(4), 1250U); + + ASSERT_EQ(0, logger_->log_count); +} + +TEST(VersionStorageInfoTest, MaxBytesForLevelDynamic) { + ioptions_.level_compaction_dynamic_level_bytes = true; + mutable_cf_options_.max_bytes_for_level_base = 1000; + mutable_cf_options_.max_bytes_for_level_multiplier = 5; + Add(5, 1U, "1", "2", 500U); + + vstorage_.CalculateBaseBytes(ioptions_, mutable_cf_options_); + ASSERT_EQ(0, logger_->log_count); + ASSERT_EQ(vstorage_.base_level(), 5); + + Add(5, 2U, "3", "4", 550U); + vstorage_.CalculateBaseBytes(ioptions_, mutable_cf_options_); + ASSERT_EQ(0, logger_->log_count); + ASSERT_EQ(vstorage_.MaxBytesForLevel(4), 210U); + ASSERT_EQ(vstorage_.base_level(), 4); + + Add(4, 3U, "3", "4", 550U); + vstorage_.CalculateBaseBytes(ioptions_, mutable_cf_options_); + ASSERT_EQ(0, logger_->log_count); + ASSERT_EQ(vstorage_.MaxBytesForLevel(4), 210U); + ASSERT_EQ(vstorage_.base_level(), 4); + + Add(3, 4U, "3", "4", 250U); + Add(3, 5U, "5", "7", 300U); + vstorage_.CalculateBaseBytes(ioptions_, mutable_cf_options_); + ASSERT_EQ(1, logger_->log_count); + ASSERT_EQ(vstorage_.MaxBytesForLevel(4), 1005U); + ASSERT_EQ(vstorage_.MaxBytesForLevel(3), 201U); + ASSERT_EQ(vstorage_.base_level(), 3); + + Add(1, 6U, "3", "4", 5U); + Add(1, 7U, "8", "9", 5U); + logger_->log_count = 0; + vstorage_.CalculateBaseBytes(ioptions_, mutable_cf_options_); + ASSERT_EQ(1, logger_->log_count); + ASSERT_GT(vstorage_.MaxBytesForLevel(4), 1005U); + ASSERT_GT(vstorage_.MaxBytesForLevel(3), 1005U); + ASSERT_EQ(vstorage_.MaxBytesForLevel(2), 1005U); + ASSERT_EQ(vstorage_.MaxBytesForLevel(1), 201U); + ASSERT_EQ(vstorage_.base_level(), 1); +} + +TEST(VersionStorageInfoTest, MaxBytesForLevelDynamicLotsOfData) { + ioptions_.level_compaction_dynamic_level_bytes = true; + mutable_cf_options_.max_bytes_for_level_base = 100; + mutable_cf_options_.max_bytes_for_level_multiplier = 2; + Add(0, 1U, "1", "2", 50U); + Add(1, 2U, "1", "2", 50U); + Add(2, 3U, "1", "2", 500U); + Add(3, 4U, "1", "2", 500U); + Add(4, 5U, "1", "2", 1700U); + Add(5, 6U, "1", "2", 500U); + + vstorage_.CalculateBaseBytes(ioptions_, mutable_cf_options_); + ASSERT_EQ(vstorage_.MaxBytesForLevel(4), 800U); + ASSERT_EQ(vstorage_.MaxBytesForLevel(3), 400U); + ASSERT_EQ(vstorage_.MaxBytesForLevel(2), 200U); + ASSERT_EQ(vstorage_.MaxBytesForLevel(1), 100U); + ASSERT_EQ(vstorage_.base_level(), 1); + ASSERT_EQ(0, logger_->log_count); +} + class FindLevelFileTest { public: LevelFilesBrief file_level_; diff --git a/include/rocksdb/immutable_options.h b/include/rocksdb/immutable_options.h index 790685fbc..1551d2674 100644 --- a/include/rocksdb/immutable_options.h +++ b/include/rocksdb/immutable_options.h @@ -87,6 +87,8 @@ struct ImmutableCFOptions { CompressionOptions compression_opts; + bool level_compaction_dynamic_level_bytes; + Options::AccessHint access_hint_on_compaction_start; int num_levels; diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index cbc0093f5..f1565498d 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -343,6 +343,65 @@ struct ColumnFamilyOptions { // Dynamically changeable through SetOptions() API uint64_t max_bytes_for_level_base; + // If true, RocksDB will pick target size of each level dynamically. + // We will pick a base level b >= 1. L0 will be directly merged into level b, + // instead of always into level 1. Level 1 to b-1 need to be empty. + // We try to pick b and its target size so that + // 1. target size is in the range of + // (max_bytes_for_level_base / max_bytes_for_level_multiplier, + // max_bytes_for_level_base] + // 2. target size of the last level (level num_levels-1) equals to extra size + // of the level. + // At the same time max_bytes_for_level_multiplier and + // max_bytes_for_level_multiplier_additional are still satisfied. + // + // With this option on, from an empty DB, we make last level the base level, + // which means merging L0 data into the last level, until it exceeds + // max_bytes_for_level_base. And then we make the second last level to be + // base level, to start to merge L0 data to second last level, with its + // target size to be 1/max_bytes_for_level_multiplier of the last level's + // extra size. After the data accumulates more so that we need to move the + // base level to the third last one, and so on. + // + // For example, assume max_bytes_for_level_multiplier=10, num_levels=6, + // and max_bytes_for_level_base=10MB. + // Target sizes of level 1 to 5 starts with: + // [- - - - 10MB] + // with base level is level. Target sizes of level 1 to 4 are not applicable + // because they will not be used. + // Until the size of Level 5 grows to more than 10MB, say 11MB, we make + // base target to level 4 and now the targets looks like: + // [- - - 1.1MB 11MB] + // While data are accumulated, size targets are tuned based on actual data + // of level 5. When level 5 has 50MB of data, the target is like: + // [- - - 5MB 50MB] + // Until level 5's actual size is more than 100MB, say 101MB. Now if we keep + // level 4 to be the base level, its target size needs to be 10.1MB, which + // doesn't satisfy the target size range. So now we make level 3 the target + // size and the target sizes of the levels look like: + // [- - 1.01MB 10.1MB 101MB] + // In the same way, while level 5 further grows, all levels' targets grow, + // like + // [- - 5MB 50MB 500MB] + // Until level 5 exceeds 1000MB and becomes 1001MB, we make level 2 the + // base level and make levels' target sizes like this: + // [- 1.001MB 10.01MB 100.1MB 1001MB] + // and go on... + // + // By doing it, we give max_bytes_for_level_multiplier a priority against + // max_bytes_for_level_base, for a more predictable LSM tree shape. It is + // useful to limit worse case space amplification. + // + // max_bytes_for_level_multiplier_additional is ignored with this flag on. + // + // Turning this feature on or off for an existing DB can cause unexpected + // LSM tree structure so it's not recommended. + // + // NOTE: this option is experimental + // + // Default: false + bool level_compaction_dynamic_level_bytes; + // Default: 10. // // Dynamically changeable through SetOptions() API diff --git a/util/mutable_cf_options.cc b/util/mutable_cf_options.cc index 4ec2a4138..187a97ae6 100644 --- a/util/mutable_cf_options.cc +++ b/util/mutable_cf_options.cc @@ -18,7 +18,6 @@ namespace rocksdb { -namespace { // Multiple two operands. If they overflow, return op1. uint64_t MultiplyCheckOverflow(uint64_t op1, int op2) { if (op1 == 0) { @@ -33,26 +32,18 @@ uint64_t MultiplyCheckOverflow(uint64_t op1, int op2) { } return op1 * casted_op2; } -} // anonymous namespace void MutableCFOptions::RefreshDerivedOptions( const ImmutableCFOptions& ioptions) { max_file_size.resize(ioptions.num_levels); - level_max_bytes.resize(ioptions.num_levels); for (int i = 0; i < ioptions.num_levels; ++i) { if (i == 0 && ioptions.compaction_style == kCompactionStyleUniversal) { max_file_size[i] = ULLONG_MAX; - level_max_bytes[i] = max_bytes_for_level_base; } else if (i > 1) { max_file_size[i] = MultiplyCheckOverflow(max_file_size[i - 1], target_file_size_multiplier); - level_max_bytes[i] = MultiplyCheckOverflow( - MultiplyCheckOverflow(level_max_bytes[i - 1], - max_bytes_for_level_multiplier), - max_bytes_for_level_multiplier_additional[i - 1]); } else { max_file_size[i] = target_file_size_base; - level_max_bytes[i] = max_bytes_for_level_base; } } } @@ -62,13 +53,6 @@ uint64_t MutableCFOptions::MaxFileSizeForLevel(int level) const { assert(level < (int)max_file_size.size()); return max_file_size[level]; } -uint64_t MutableCFOptions::MaxBytesForLevel(int level) const { - // Note: the result for level zero is not really used since we set - // the level-0 compaction threshold based on number of files. - assert(level >= 0); - assert(level < (int)level_max_bytes.size()); - return level_max_bytes[level]; -} uint64_t MutableCFOptions::MaxGrandParentOverlapBytes(int level) const { return MaxFileSizeForLevel(level) * max_grandparent_overlap_factor; } diff --git a/util/mutable_cf_options.h b/util/mutable_cf_options.h index 9f876ace0..6c3b13ed3 100644 --- a/util/mutable_cf_options.h +++ b/util/mutable_cf_options.h @@ -79,8 +79,6 @@ struct MutableCFOptions { // Get the max file size in a given level. uint64_t MaxFileSizeForLevel(int level) const; - // Returns maximum total bytes of data on a given level. - uint64_t MaxBytesForLevel(int level) const; // Returns maximum total overlap bytes with grandparent // level (i.e., level+2) before we stop building a single // file in level->level+1 compaction. @@ -124,8 +122,8 @@ struct MutableCFOptions { // Derived options // Per-level target file size. std::vector max_file_size; - // Per-level max bytes - std::vector level_max_bytes; }; +uint64_t MultiplyCheckOverflow(uint64_t op1, int op2); + } // namespace rocksdb diff --git a/util/options.cc b/util/options.cc index cfebae61e..2c126d028 100644 --- a/util/options.cc +++ b/util/options.cc @@ -35,43 +35,48 @@ namespace rocksdb { ImmutableCFOptions::ImmutableCFOptions(const Options& options) - : compaction_style(options.compaction_style), - compaction_options_universal(options.compaction_options_universal), - compaction_options_fifo(options.compaction_options_fifo), - prefix_extractor(options.prefix_extractor.get()), - comparator(options.comparator), - merge_operator(options.merge_operator.get()), - compaction_filter(options.compaction_filter), - compaction_filter_factory(options.compaction_filter_factory.get()), - compaction_filter_factory_v2(options.compaction_filter_factory_v2.get()), - inplace_update_support(options.inplace_update_support), - inplace_callback(options.inplace_callback), - info_log(options.info_log.get()), - statistics(options.statistics.get()), - env(options.env), - allow_mmap_reads(options.allow_mmap_reads), - allow_mmap_writes(options.allow_mmap_writes), - db_paths(options.db_paths), - memtable_factory(options.memtable_factory.get()), - table_factory(options.table_factory.get()), - table_properties_collector_factories( - options.table_properties_collector_factories), - advise_random_on_open(options.advise_random_on_open), - bloom_locality(options.bloom_locality), - purge_redundant_kvs_while_flush(options.purge_redundant_kvs_while_flush), - min_partial_merge_operands(options.min_partial_merge_operands), - disable_data_sync(options.disableDataSync), - use_fsync(options.use_fsync), - compression(options.compression), - compression_per_level(options.compression_per_level), - compression_opts(options.compression_opts), - access_hint_on_compaction_start(options.access_hint_on_compaction_start), - num_levels(options.num_levels), - optimize_filters_for_hits(options.optimize_filters_for_hits) + : compaction_style(options.compaction_style), + compaction_options_universal(options.compaction_options_universal), + compaction_options_fifo(options.compaction_options_fifo), + prefix_extractor(options.prefix_extractor.get()), + comparator(options.comparator), + merge_operator(options.merge_operator.get()), + compaction_filter(options.compaction_filter), + compaction_filter_factory(options.compaction_filter_factory.get()), + compaction_filter_factory_v2(options.compaction_filter_factory_v2.get()), + inplace_update_support(options.inplace_update_support), + inplace_callback(options.inplace_callback), + info_log(options.info_log.get()), + statistics(options.statistics.get()), + env(options.env), + allow_mmap_reads(options.allow_mmap_reads), + allow_mmap_writes(options.allow_mmap_writes), + db_paths(options.db_paths), + memtable_factory(options.memtable_factory.get()), + table_factory(options.table_factory.get()), + table_properties_collector_factories( + options.table_properties_collector_factories), + advise_random_on_open(options.advise_random_on_open), + bloom_locality(options.bloom_locality), + purge_redundant_kvs_while_flush(options.purge_redundant_kvs_while_flush), + min_partial_merge_operands(options.min_partial_merge_operands), + disable_data_sync(options.disableDataSync), + use_fsync(options.use_fsync), + compression(options.compression), + compression_per_level(options.compression_per_level), + compression_opts(options.compression_opts), + level_compaction_dynamic_level_bytes( + options.level_compaction_dynamic_level_bytes), + access_hint_on_compaction_start(options.access_hint_on_compaction_start), + num_levels(options.num_levels), + optimize_filters_for_hits(options.optimize_filters_for_hits) #ifndef ROCKSDB_LITE - , listeners(options.listeners) {} + , + listeners(options.listeners) { +} #else // ROCKSDB_LITE - {} +{ +} #endif // ROCKSDB_LITE ColumnFamilyOptions::ColumnFamilyOptions() @@ -94,6 +99,7 @@ ColumnFamilyOptions::ColumnFamilyOptions() target_file_size_base(2 * 1048576), target_file_size_multiplier(1), max_bytes_for_level_base(10 * 1048576), + level_compaction_dynamic_level_bytes(false), max_bytes_for_level_multiplier(10), max_bytes_for_level_multiplier_additional(num_levels, 1), expanded_compaction_factor(25), @@ -123,9 +129,10 @@ ColumnFamilyOptions::ColumnFamilyOptions() min_partial_merge_operands(2), optimize_filters_for_hits(false) #ifndef ROCKSDB_LITE - , listeners() { + , + listeners() { #else // ROCKSDB_LITE - { +{ #endif // ROCKSDB_LITE assert(memtable_factory.get() != nullptr); } @@ -153,6 +160,8 @@ ColumnFamilyOptions::ColumnFamilyOptions(const Options& options) target_file_size_base(options.target_file_size_base), target_file_size_multiplier(options.target_file_size_multiplier), max_bytes_for_level_base(options.max_bytes_for_level_base), + level_compaction_dynamic_level_bytes( + options.level_compaction_dynamic_level_bytes), max_bytes_for_level_multiplier(options.max_bytes_for_level_multiplier), max_bytes_for_level_multiplier_additional( options.max_bytes_for_level_multiplier_additional), @@ -189,9 +198,10 @@ ColumnFamilyOptions::ColumnFamilyOptions(const Options& options) min_partial_merge_operands(options.min_partial_merge_operands), optimize_filters_for_hits(options.optimize_filters_for_hits) #ifndef ROCKSDB_LITE - , listeners(options.listeners) { + , + listeners(options.listeners) { #else // ROCKSDB_LITE - { +{ #endif // ROCKSDB_LITE assert(memtable_factory.get() != nullptr); if (max_bytes_for_level_multiplier_additional.size() < @@ -411,6 +421,8 @@ void ColumnFamilyOptions::Dump(Logger* log) const { target_file_size_multiplier); Log(log," Options.max_bytes_for_level_base: %" PRIu64, max_bytes_for_level_base); + Log(log, "Options.level_compaction_dynamic_level_bytes: %d", + level_compaction_dynamic_level_bytes); Log(log," Options.max_bytes_for_level_multiplier: %d", max_bytes_for_level_multiplier); for (int i = 0; i < num_levels; i++) { diff --git a/util/options_helper.cc b/util/options_helper.cc index 028f8f38b..9501c2637 100644 --- a/util/options_helper.cc +++ b/util/options_helper.cc @@ -410,6 +410,9 @@ bool ParseColumnFamilyOption(const std::string& name, const std::string& value, ParseInt(value.substr(start, value.size() - start)); } else if (name == "num_levels") { new_options->num_levels = ParseInt(value); + } else if (name == "level_compaction_dynamic_level_bytes") { + new_options->level_compaction_dynamic_level_bytes = + ParseBoolean(name, value); } else if (name == "purge_redundant_kvs_while_flush") { new_options->purge_redundant_kvs_while_flush = ParseBoolean(name, value); diff --git a/util/options_test.cc b/util/options_test.cc index 001e6aa86..b5b7b140f 100644 --- a/util/options_test.cc +++ b/util/options_test.cc @@ -92,50 +92,52 @@ TEST(OptionsTest, LooseCondition) { #ifndef ROCKSDB_LITE // GetOptionsFromMap is not supported in ROCKSDB_LITE TEST(OptionsTest, GetOptionsFromMapTest) { std::unordered_map cf_options_map = { - {"write_buffer_size", "1"}, - {"max_write_buffer_number", "2"}, - {"min_write_buffer_number_to_merge", "3"}, - {"compression", "kSnappyCompression"}, - {"compression_per_level", "kNoCompression:" - "kSnappyCompression:" - "kZlibCompression:" - "kBZip2Compression:" - "kLZ4Compression:" - "kLZ4HCCompression"}, - {"compression_opts", "4:5:6"}, - {"num_levels", "7"}, - {"level0_file_num_compaction_trigger", "8"}, - {"level0_slowdown_writes_trigger", "9"}, - {"level0_stop_writes_trigger", "10"}, - {"max_mem_compaction_level", "11"}, - {"target_file_size_base", "12"}, - {"target_file_size_multiplier", "13"}, - {"max_bytes_for_level_base", "14"}, - {"max_bytes_for_level_multiplier", "15"}, - {"max_bytes_for_level_multiplier_additional", "16:17:18"}, - {"expanded_compaction_factor", "19"}, - {"source_compaction_factor", "20"}, - {"max_grandparent_overlap_factor", "21"}, - {"soft_rate_limit", "1.1"}, - {"hard_rate_limit", "2.1"}, - {"arena_block_size", "22"}, - {"disable_auto_compactions", "true"}, - {"purge_redundant_kvs_while_flush", "1"}, - {"compaction_style", "kCompactionStyleLevel"}, - {"verify_checksums_in_compaction", "false"}, - {"compaction_options_fifo", "23"}, - {"filter_deletes", "0"}, - {"max_sequential_skip_in_iterations", "24"}, - {"inplace_update_support", "true"}, - {"inplace_update_num_locks", "25"}, - {"memtable_prefix_bloom_bits", "26"}, - {"memtable_prefix_bloom_probes", "27"}, - {"memtable_prefix_bloom_huge_page_tlb_size", "28"}, - {"bloom_locality", "29"}, - {"max_successive_merges", "30"}, - {"min_partial_merge_operands", "31"}, - {"prefix_extractor", "fixed:31"}, - {"optimize_filters_for_hits", "true"}, + {"write_buffer_size", "1"}, + {"max_write_buffer_number", "2"}, + {"min_write_buffer_number_to_merge", "3"}, + {"compression", "kSnappyCompression"}, + {"compression_per_level", + "kNoCompression:" + "kSnappyCompression:" + "kZlibCompression:" + "kBZip2Compression:" + "kLZ4Compression:" + "kLZ4HCCompression"}, + {"compression_opts", "4:5:6"}, + {"num_levels", "7"}, + {"level0_file_num_compaction_trigger", "8"}, + {"level0_slowdown_writes_trigger", "9"}, + {"level0_stop_writes_trigger", "10"}, + {"max_mem_compaction_level", "11"}, + {"target_file_size_base", "12"}, + {"target_file_size_multiplier", "13"}, + {"max_bytes_for_level_base", "14"}, + {"level_compaction_dynamic_level_bytes", "true"}, + {"max_bytes_for_level_multiplier", "15"}, + {"max_bytes_for_level_multiplier_additional", "16:17:18"}, + {"expanded_compaction_factor", "19"}, + {"source_compaction_factor", "20"}, + {"max_grandparent_overlap_factor", "21"}, + {"soft_rate_limit", "1.1"}, + {"hard_rate_limit", "2.1"}, + {"arena_block_size", "22"}, + {"disable_auto_compactions", "true"}, + {"purge_redundant_kvs_while_flush", "1"}, + {"compaction_style", "kCompactionStyleLevel"}, + {"verify_checksums_in_compaction", "false"}, + {"compaction_options_fifo", "23"}, + {"filter_deletes", "0"}, + {"max_sequential_skip_in_iterations", "24"}, + {"inplace_update_support", "true"}, + {"inplace_update_num_locks", "25"}, + {"memtable_prefix_bloom_bits", "26"}, + {"memtable_prefix_bloom_probes", "27"}, + {"memtable_prefix_bloom_huge_page_tlb_size", "28"}, + {"bloom_locality", "29"}, + {"max_successive_merges", "30"}, + {"min_partial_merge_operands", "31"}, + {"prefix_extractor", "fixed:31"}, + {"optimize_filters_for_hits", "true"}, }; std::unordered_map db_options_map = { @@ -198,6 +200,7 @@ TEST(OptionsTest, GetOptionsFromMapTest) { ASSERT_EQ(new_cf_opt.target_file_size_base, static_cast(12)); ASSERT_EQ(new_cf_opt.target_file_size_multiplier, 13); ASSERT_EQ(new_cf_opt.max_bytes_for_level_base, 14U); + ASSERT_EQ(new_cf_opt.level_compaction_dynamic_level_bytes, true); ASSERT_EQ(new_cf_opt.max_bytes_for_level_multiplier, 15); ASSERT_EQ(new_cf_opt.max_bytes_for_level_multiplier_additional.size(), 3U); ASSERT_EQ(new_cf_opt.max_bytes_for_level_multiplier_additional[0], 16); diff --git a/util/sync_point.cc b/util/sync_point.cc index 68e99e07b..22e12f682 100644 --- a/util/sync_point.cc +++ b/util/sync_point.cc @@ -34,6 +34,20 @@ bool SyncPoint::PredecessorsAllCleared(const std::string& point) { return true; } +void SyncPoint::SetCallBack(const std::string point, + std::function callback) { + std::unique_lock lock(mutex_); + callbacks_[point] = callback; +} + +void SyncPoint::ClearAllCallBacks() { + std::unique_lock lock(mutex_); + while (num_callbacks_running_ > 0) { + cv_.wait(lock); + } + callbacks_.clear(); +} + void SyncPoint::EnableProcessing() { std::unique_lock lock(mutex_); enabled_ = true; @@ -54,6 +68,16 @@ void SyncPoint::Process(const std::string& point) { if (!enabled_) return; + auto callback_pair = callbacks_.find(point); + if (callback_pair != callbacks_.end()) { + num_callbacks_running_++; + mutex_.unlock(); + callback_pair->second(); + mutex_.lock(); + num_callbacks_running_--; + cv_.notify_all(); + } + while (!PredecessorsAllCleared(point)) { cv_.wait(lock); } diff --git a/util/sync_point.h b/util/sync_point.h index b4b61a9fc..fda62c731 100644 --- a/util/sync_point.h +++ b/util/sync_point.h @@ -38,6 +38,11 @@ class SyncPoint { // sync points void LoadDependency(const std::vector& dependencies); + // Set up a call back function in sync point. + void SetCallBack(const std::string point, std::function callback); + // Clear all call back functions. + void ClearAllCallBacks(); + // enable sync point processing (disabled on startup) void EnableProcessing(); @@ -60,12 +65,14 @@ class SyncPoint { // successor/predecessor map loaded from LoadDependency std::unordered_map> successors_; std::unordered_map> predecessors_; + std::unordered_map > callbacks_; std::mutex mutex_; std::condition_variable cv_; // sync points that have been passed through std::unordered_set cleared_points_; bool enabled_ = false; + int num_callbacks_running_ = 0; }; } // namespace rocksdb