From 5ec53f3edf62bec1b690ce12fb21a6c52203f3c8 Mon Sep 17 00:00:00 2001 From: Lei Jin Date: Wed, 1 Oct 2014 16:19:16 -0700 Subject: [PATCH] make compaction related options changeable Summary: make compaction related options changeable. Most of changes are tedious, following the same convention: grabs MutableCFOptions at the beginning of compaction under mutex, then pass it throughout the job and register it in SuperVersion at the end. Test Plan: make all check Reviewers: igor, yhchiang, sdong Reviewed By: sdong Subscribers: leveldb Differential Revision: https://reviews.facebook.net/D23349 --- db/column_family.cc | 75 +++++---- db/column_family.h | 16 +- db/compaction.cc | 7 +- db/compaction.h | 8 +- db/compaction_picker.cc | 238 ++++++++++++---------------- db/compaction_picker.h | 107 ++++++------- db/db_impl.cc | 159 ++++++++++++------- db/db_impl.h | 30 ++-- db/db_test.cc | 108 +++++++++++++ db/log_and_apply_bench.cc | 6 +- db/memtable_list.cc | 5 +- db/memtable_list.h | 3 +- db/repair.cc | 2 +- db/version_set.cc | 48 +++--- db/version_set.h | 17 +- db/write_batch_test.cc | 5 +- include/rocksdb/immutable_options.h | 3 + table/table_test.cc | 15 +- util/mutable_cf_options.cc | 72 +++++++++ util/mutable_cf_options.h | 67 +++++++- util/options.cc | 4 +- util/options_helper.cc | 90 ++++++----- 22 files changed, 686 insertions(+), 399 deletions(-) create mode 100644 util/mutable_cf_options.cc diff --git a/db/column_family.cc b/db/column_family.cc index f95090225..0beb23c91 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -230,7 +230,7 @@ ColumnFamilyData::ColumnFamilyData(uint32_t id, const std::string& name, internal_comparator_(cf_options.comparator), options_(*db_options, SanitizeOptions(&internal_comparator_, cf_options)), ioptions_(options_), - mutable_cf_options_(options_), + mutable_cf_options_(options_, ioptions_), mem_(nullptr), imm_(options_.min_write_buffer_number_to_merge), super_version_(nullptr), @@ -245,27 +245,27 @@ ColumnFamilyData::ColumnFamilyData(uint32_t id, const std::string& name, // if dummy_versions is nullptr, then this is a dummy column family. if (dummy_versions != nullptr) { internal_stats_.reset( - new InternalStats(options_.num_levels, db_options->env, this)); + new InternalStats(ioptions_.num_levels, db_options->env, this)); table_cache_.reset(new TableCache(ioptions_, env_options, table_cache)); - if (options_.compaction_style == kCompactionStyleUniversal) { + if (ioptions_.compaction_style == kCompactionStyleUniversal) { compaction_picker_.reset( - new UniversalCompactionPicker(&options_, &internal_comparator_)); - } else if (options_.compaction_style == kCompactionStyleLevel) { + new UniversalCompactionPicker(ioptions_, &internal_comparator_)); + } else if (ioptions_.compaction_style == kCompactionStyleLevel) { compaction_picker_.reset( - new LevelCompactionPicker(&options_, &internal_comparator_)); + new LevelCompactionPicker(ioptions_, &internal_comparator_)); } else { - assert(options_.compaction_style == kCompactionStyleFIFO); + assert(ioptions_.compaction_style == kCompactionStyleFIFO); compaction_picker_.reset( - new FIFOCompactionPicker(&options_, &internal_comparator_)); + new FIFOCompactionPicker(ioptions_, &internal_comparator_)); } - Log(options_.info_log, "Options for column family \"%s\":\n", + Log(ioptions_.info_log, "Options for column family \"%s\":\n", name.c_str()); const ColumnFamilyOptions* cf_options = &options_; - cf_options->Dump(options_.info_log.get()); + cf_options->Dump(ioptions_.info_log); } - RecalculateWriteStallConditions(); + RecalculateWriteStallConditions(mutable_cf_options_); } // DB mutex held @@ -318,7 +318,8 @@ ColumnFamilyData::~ColumnFamilyData() { } } -void ColumnFamilyData::RecalculateWriteStallConditions() { +void ColumnFamilyData::RecalculateWriteStallConditions( + const MutableCFOptions& mutable_cf_options) { if (current_ != nullptr) { const double score = current_->MaxCompactionScore(); const int max_level = current_->MaxCompactionScoreLevel(); @@ -328,26 +329,27 @@ void ColumnFamilyData::RecalculateWriteStallConditions() { if (imm()->size() == options_.max_write_buffer_number) { write_controller_token_ = write_controller->GetStopToken(); internal_stats_->AddCFStats(InternalStats::MEMTABLE_COMPACTION, 1); - Log(options_.info_log, + Log(ioptions_.info_log, "[%s] Stopping writes because we have %d immutable memtables " "(waiting for flush)", name_.c_str(), imm()->size()); } else if (current_->NumLevelFiles(0) >= - options_.level0_stop_writes_trigger) { + mutable_cf_options.level0_stop_writes_trigger) { write_controller_token_ = write_controller->GetStopToken(); internal_stats_->AddCFStats(InternalStats::LEVEL0_NUM_FILES, 1); - Log(options_.info_log, + Log(ioptions_.info_log, "[%s] Stopping writes because we have %d level-0 files", name_.c_str(), current_->NumLevelFiles(0)); - } else if (options_.level0_slowdown_writes_trigger >= 0 && + } else if (mutable_cf_options.level0_slowdown_writes_trigger >= 0 && current_->NumLevelFiles(0) >= - options_.level0_slowdown_writes_trigger) { + mutable_cf_options.level0_slowdown_writes_trigger) { uint64_t slowdown = SlowdownAmount( - current_->NumLevelFiles(0), options_.level0_slowdown_writes_trigger, - options_.level0_stop_writes_trigger); + current_->NumLevelFiles(0), + mutable_cf_options.level0_slowdown_writes_trigger, + mutable_cf_options.level0_stop_writes_trigger); write_controller_token_ = write_controller->GetDelayToken(slowdown); internal_stats_->AddCFStats(InternalStats::LEVEL0_SLOWDOWN, slowdown); - Log(options_.info_log, + Log(ioptions_.info_log, "[%s] Stalling writes because we have %d level-0 files (%" PRIu64 "us)", name_.c_str(), current_->NumLevelFiles(0), slowdown); @@ -358,7 +360,7 @@ void ColumnFamilyData::RecalculateWriteStallConditions() { write_controller->GetDelayToken(kHardLimitSlowdown); internal_stats_->RecordLevelNSlowdown(max_level, kHardLimitSlowdown, false); - Log(options_.info_log, + Log(ioptions_.info_log, "[%s] Stalling writes because we hit hard limit on level %d. " "(%" PRIu64 "us)", name_.c_str(), max_level, kHardLimitSlowdown); @@ -368,7 +370,7 @@ void ColumnFamilyData::RecalculateWriteStallConditions() { options_.hard_rate_limit); write_controller_token_ = write_controller->GetDelayToken(slowdown); internal_stats_->RecordLevelNSlowdown(max_level, slowdown, true); - Log(options_.info_log, + Log(ioptions_.info_log, "[%s] Stalling writes because we hit soft limit on level %d (%" PRIu64 "us)", name_.c_str(), max_level, slowdown); @@ -393,19 +395,21 @@ void ColumnFamilyData::CreateNewMemtable(const MemTableOptions& moptions) { mem_->Ref(); } -Compaction* ColumnFamilyData::PickCompaction(LogBuffer* log_buffer) { - auto result = compaction_picker_->PickCompaction(current_, log_buffer); +Compaction* ColumnFamilyData::PickCompaction( + const MutableCFOptions& mutable_options, LogBuffer* log_buffer) { + auto result = compaction_picker_->PickCompaction( + mutable_options, current_, log_buffer); return result; } -Compaction* ColumnFamilyData::CompactRange(int input_level, int output_level, - uint32_t output_path_id, - const InternalKey* begin, - const InternalKey* end, - InternalKey** compaction_end) { - return compaction_picker_->CompactRange(current_, input_level, output_level, - output_path_id, begin, end, - compaction_end); +Compaction* ColumnFamilyData::CompactRange( + const MutableCFOptions& mutable_cf_options, + int input_level, int output_level, uint32_t output_path_id, + const InternalKey* begin, const InternalKey* end, + InternalKey** compaction_end) { + return compaction_picker_->CompactRange( + mutable_cf_options, current_, input_level, output_level, + output_path_id, begin, end, compaction_end); } SuperVersion* ColumnFamilyData::GetReferencedSuperVersion( @@ -443,11 +447,11 @@ SuperVersion* ColumnFamilyData::GetThreadLocalSuperVersion( sv = static_cast(ptr); if (sv == SuperVersion::kSVObsolete || sv->version_number != super_version_number_.load()) { - RecordTick(options_.statistics.get(), NUMBER_SUPERVERSION_ACQUIRES); + RecordTick(ioptions_.statistics, NUMBER_SUPERVERSION_ACQUIRES); SuperVersion* sv_to_delete = nullptr; if (sv && sv->Unref()) { - RecordTick(options_.statistics.get(), NUMBER_SUPERVERSION_CLEANUPS); + RecordTick(ioptions_.statistics, NUMBER_SUPERVERSION_CLEANUPS); db_mutex->Lock(); // NOTE: underlying resources held by superversion (sst files) might // not be released until the next background job. @@ -502,7 +506,7 @@ SuperVersion* ColumnFamilyData::InstallSuperVersion( // Reset SuperVersions cached in thread local storage ResetThreadLocalSuperVersions(); - RecalculateWriteStallConditions(); + RecalculateWriteStallConditions(mutable_cf_options); if (old_superversion != nullptr && old_superversion->Unref()) { old_superversion->Cleanup(); @@ -533,6 +537,7 @@ bool ColumnFamilyData::SetOptions( if (GetMutableOptionsFromStrings(mutable_cf_options_, options_map, &new_mutable_cf_options)) { mutable_cf_options_ = new_mutable_cf_options; + mutable_cf_options_.RefreshDerivedOptions(ioptions_); return true; } return false; diff --git a/db/column_family.h b/db/column_family.h index 65b4b53ba..9c415c2a8 100644 --- a/db/column_family.h +++ b/db/column_family.h @@ -203,11 +203,14 @@ class ColumnFamilyData { TableCache* table_cache() const { return table_cache_.get(); } // See documentation in compaction_picker.h - Compaction* PickCompaction(LogBuffer* log_buffer); - Compaction* CompactRange(int input_level, int output_level, - uint32_t output_path_id, const InternalKey* begin, - const InternalKey* end, - InternalKey** compaction_end); + // REQUIRES: DB mutex held + Compaction* PickCompaction(const MutableCFOptions& mutable_options, + LogBuffer* log_buffer); + Compaction* CompactRange( + const MutableCFOptions& mutable_cf_options, + int input_level, int output_level, uint32_t output_path_id, + const InternalKey* begin, const InternalKey* end, + InternalKey** compaction_end); CompactionPicker* compaction_picker() { return compaction_picker_.get(); } // thread-safe @@ -260,7 +263,8 @@ class ColumnFamilyData { // recalculation of compaction score. These values are used in // DBImpl::MakeRoomForWrite function to decide, if it need to make // a write stall - void RecalculateWriteStallConditions(); + void RecalculateWriteStallConditions( + const MutableCFOptions& mutable_cf_options); uint32_t id_; const std::string name_; diff --git a/db/compaction.cc b/db/compaction.cc index 28a3174b0..f02feeee7 100644 --- a/db/compaction.cc +++ b/db/compaction.cc @@ -56,7 +56,6 @@ Compaction::Compaction(Version* input_version, int start_level, int out_level, is_full_compaction_(false), is_manual_compaction_(false), level_ptrs_(std::vector(number_levels_)) { - cfd_->Ref(); input_version_->Ref(); edit_ = new VersionEdit(); @@ -267,12 +266,12 @@ void Compaction::Summary(char* output, int len) { snprintf(output + write, len - write, "]"); } -uint64_t Compaction::OutputFilePreallocationSize() { +uint64_t Compaction::OutputFilePreallocationSize( + const MutableCFOptions& mutable_options) { uint64_t preallocation_size = 0; if (cfd_->ioptions()->compaction_style == kCompactionStyleLevel) { - preallocation_size = - cfd_->compaction_picker()->MaxFileSizeForLevel(output_level()); + preallocation_size = mutable_options.MaxFileSizeForLevel(output_level()); } else { for (int level = 0; level < num_input_levels(); ++level) { for (const auto& f : inputs_[level].files) { diff --git a/db/compaction.h b/db/compaction.h index 6000f636b..7c490946a 100644 --- a/db/compaction.h +++ b/db/compaction.h @@ -10,6 +10,7 @@ #pragma once #include "util/arena.h" #include "util/autovector.h" +#include "util/mutable_cf_options.h" #include "db/version_set.h" namespace rocksdb { @@ -151,10 +152,14 @@ class Compaction { // Was this compaction triggered manually by the client? bool IsManualCompaction() { return is_manual_compaction_; } + // Return the MutableCFOptions that should be used throughout the compaction + // procedure + const MutableCFOptions* mutable_cf_options() { return &mutable_cf_options_; } + // Returns the size in bytes that the output file should be preallocated to. // In level compaction, that is max_file_size_. In universal compaction, that // is the sum of all input file sizes. - uint64_t OutputFilePreallocationSize(); + uint64_t OutputFilePreallocationSize(const MutableCFOptions& mutable_options); private: friend class CompactionPicker; @@ -171,6 +176,7 @@ class Compaction { const int output_level_; // levels to which output files are stored uint64_t max_output_file_size_; uint64_t max_grandparent_overlap_bytes_; + MutableCFOptions mutable_cf_options_; Version* input_version_; VersionEdit* edit_; int number_levels_; diff --git a/db/compaction_picker.cc b/db/compaction_picker.cc index add3556d8..84bd95839 100644 --- a/db/compaction_picker.cc +++ b/db/compaction_picker.cc @@ -35,70 +35,36 @@ namespace { // If enable_compression is false, then compression is always disabled no // matter what the values of the other two parameters are. // Otherwise, the compression type is determined based on options and level. -CompressionType GetCompressionType(const Options& options, int level, - const bool enable_compression = true) { +CompressionType GetCompressionType( + const ImmutableCFOptions& ioptions, int level, + const bool enable_compression = true) { if (!enable_compression) { // disable compression return kNoCompression; } // If the use has specified a different compression level for each level, // then pick the compression for that level. - if (!options.compression_per_level.empty()) { - const int n = options.compression_per_level.size() - 1; + if (!ioptions.compression_per_level.empty()) { + const int n = ioptions.compression_per_level.size() - 1; // It is possible for level_ to be -1; in that case, we use level // 0's compression. This occurs mostly in backwards compatibility // situations when the builder doesn't know what level the file // belongs to. Likewise, if level is beyond the end of the // specified compression levels, use the last value. - return options.compression_per_level[std::max(0, std::min(level, n))]; + return ioptions.compression_per_level[std::max(0, std::min(level, n))]; } else { - return options.compression; + return ioptions.compression; } } -// Multiple two operands. If they overflow, return op1. -uint64_t MultiplyCheckOverflow(uint64_t op1, int op2) { - if (op1 == 0) { - return 0; - } - if (op2 <= 0) { - return op1; - } - uint64_t casted_op2 = (uint64_t) op2; - if (std::numeric_limits::max() / op1 < casted_op2) { - return op1; - } - return op1 * casted_op2; -} } // anonymous namespace -CompactionPicker::CompactionPicker(const Options* options, +CompactionPicker::CompactionPicker(const ImmutableCFOptions& ioptions, const InternalKeyComparator* icmp) - : compactions_in_progress_(options->num_levels), - options_(options), - num_levels_(options->num_levels), + : ioptions_(ioptions), + compactions_in_progress_(ioptions_.num_levels), icmp_(icmp) { - - max_file_size_.reset(new uint64_t[NumberLevels()]); - level_max_bytes_.reset(new uint64_t[NumberLevels()]); - int target_file_size_multiplier = options_->target_file_size_multiplier; - int max_bytes_multiplier = options_->max_bytes_for_level_multiplier; - for (int i = 0; i < NumberLevels(); i++) { - if (i == 0 && options_->compaction_style == kCompactionStyleUniversal) { - max_file_size_[i] = ULLONG_MAX; - level_max_bytes_[i] = options_->max_bytes_for_level_base; - } else if (i > 1) { - max_file_size_[i] = MultiplyCheckOverflow(max_file_size_[i - 1], - target_file_size_multiplier); - level_max_bytes_[i] = MultiplyCheckOverflow( - MultiplyCheckOverflow(level_max_bytes_[i - 1], max_bytes_multiplier), - options_->max_bytes_for_level_multiplier_additional[i - 1]); - } else { - max_file_size_[i] = options_->target_file_size_base; - level_max_bytes_[i] = options_->max_bytes_for_level_base; - } - } } CompactionPicker::~CompactionPicker() {} @@ -126,26 +92,6 @@ void CompactionPicker::ReleaseCompactionFiles(Compaction* c, Status status) { } } -uint64_t CompactionPicker::MaxFileSizeForLevel(int level) const { - assert(level >= 0); - assert(level < NumberLevels()); - return max_file_size_[level]; -} - -uint64_t CompactionPicker::MaxGrandParentOverlapBytes(int level) { - uint64_t result = MaxFileSizeForLevel(level); - result *= options_->max_grandparent_overlap_factor; - return result; -} - -double CompactionPicker::MaxBytesForLevel(int level) { - // Note: the result for level zero is not really used since we set - // the level-0 compaction threshold based on number of files. - assert(level >= 0); - assert(level < NumberLevels()); - return level_max_bytes_[level]; -} - void CompactionPicker::GetRange(const std::vector& inputs, InternalKey* smallest, InternalKey* largest) { assert(!inputs.empty()); @@ -214,7 +160,7 @@ bool CompactionPicker::ExpandWhileOverlapping(Compaction* c) { // compaction, then we must drop/cancel this compaction. int parent_index = -1; if (c->inputs_[0].empty()) { - Log(options_->info_log, + Log(ioptions_.info_log, "[%s] ExpandWhileOverlapping() failure because zero input files", c->column_family_data()->GetName().c_str()); } @@ -229,12 +175,6 @@ bool CompactionPicker::ExpandWhileOverlapping(Compaction* c) { return true; } -uint64_t CompactionPicker::ExpandedCompactionByteSizeLimit(int level) { - uint64_t result = MaxFileSizeForLevel(level); - result *= options_->expanded_compaction_factor; - return result; -} - // Returns true if any one of specified files are being compacted bool CompactionPicker::FilesInCompaction(std::vector& files) { for (unsigned int i = 0; i < files.size(); i++) { @@ -262,7 +202,8 @@ bool CompactionPicker::ParentRangeInCompaction(Version* version, // Will also attempt to expand "level" if that doesn't expand "level+1" // or cause "level" to include a file for compaction that has an overlapping // user-key with another file. -void CompactionPicker::SetupOtherInputs(Compaction* c) { +void CompactionPicker::SetupOtherInputs( + const MutableCFOptions& mutable_cf_options, Compaction* c) { // If inputs are empty, then there is nothing to expand. // If both input and output levels are the same, no need to consider // files at level "level+1" @@ -298,7 +239,7 @@ void CompactionPicker::SetupOtherInputs(Compaction* c) { const uint64_t inputs0_size = TotalCompensatedFileSize(c->inputs_[0].files); const uint64_t inputs1_size = TotalCompensatedFileSize(c->inputs_[1].files); const uint64_t expanded0_size = TotalCompensatedFileSize(expanded0); - uint64_t limit = ExpandedCompactionByteSizeLimit(level); + uint64_t limit = mutable_cf_options.ExpandedCompactionByteSizeLimit(level); if (expanded0.size() > c->inputs_[0].size() && inputs1_size + expanded0_size < limit && !FilesInCompaction(expanded0) && @@ -311,7 +252,7 @@ void CompactionPicker::SetupOtherInputs(Compaction* c) { &c->parent_index_); if (expanded1.size() == c->inputs_[1].size() && !FilesInCompaction(expanded1)) { - Log(options_->info_log, + Log(ioptions_.info_log, "[%s] Expanding@%d %zu+%zu (%" PRIu64 "+%" PRIu64 " bytes) to %zu+%zu (%" PRIu64 "+%" PRIu64 "bytes)\n", c->column_family_data()->GetName().c_str(), level, @@ -336,21 +277,20 @@ void CompactionPicker::SetupOtherInputs(Compaction* c) { } } -Compaction* CompactionPicker::CompactRange(Version* version, int input_level, - int output_level, - uint32_t output_path_id, - const InternalKey* begin, - const InternalKey* end, - InternalKey** compaction_end) { +Compaction* CompactionPicker::CompactRange( + const MutableCFOptions& mutable_cf_options, Version* version, + int input_level, int output_level, uint32_t output_path_id, + const InternalKey* begin, const InternalKey* end, + InternalKey** compaction_end) { // CompactionPickerFIFO has its own implementation of compact range - assert(options_->compaction_style != kCompactionStyleFIFO); + assert(ioptions_.compaction_style != kCompactionStyleFIFO); std::vector inputs; bool covering_the_whole_range = true; // All files are 'overlapping' in universal style compaction. // We have to compact the entire range in one shot. - if (options_->compaction_style == kCompactionStyleUniversal) { + if (ioptions_.compaction_style == kCompactionStyleUniversal) { begin = nullptr; end = nullptr; } @@ -364,8 +304,8 @@ Compaction* CompactionPicker::CompactRange(Version* version, int input_level, // and we must not pick one file and drop another older file if the // two files overlap. if (input_level > 0) { - const uint64_t limit = - MaxFileSizeForLevel(input_level) * options_->source_compaction_factor; + const uint64_t limit = mutable_cf_options.MaxFileSizeForLevel(input_level) * + mutable_cf_options.source_compaction_factor; uint64_t total = 0; for (size_t i = 0; i + 1 < inputs.size(); ++i) { uint64_t s = inputs[i]->compensated_file_size; @@ -378,22 +318,24 @@ Compaction* CompactionPicker::CompactRange(Version* version, int input_level, } } } - assert(output_path_id < static_cast(options_->db_paths.size())); + assert(output_path_id < static_cast(ioptions_.db_paths.size())); Compaction* c = new Compaction( - version, input_level, output_level, MaxFileSizeForLevel(output_level), - MaxGrandParentOverlapBytes(input_level), output_path_id, - GetCompressionType(*options_, output_level)); + version, input_level, output_level, + mutable_cf_options.MaxFileSizeForLevel(output_level), + mutable_cf_options.MaxGrandParentOverlapBytes(input_level), + output_path_id, + GetCompressionType(ioptions_, output_level)); c->inputs_[0].files = inputs; if (ExpandWhileOverlapping(c) == false) { delete c; - Log(options_->info_log, + Log(ioptions_.info_log, "[%s] Could not compact due to expansion failure.\n", version->cfd_->GetName().c_str()); return nullptr; } - SetupOtherInputs(c); + SetupOtherInputs(mutable_cf_options, c); if (covering_the_whole_range) { *compaction_end = nullptr; @@ -408,12 +350,14 @@ Compaction* CompactionPicker::CompactRange(Version* version, int input_level, c->SetupBottomMostLevel(true); c->is_manual_compaction_ = true; + c->mutable_cf_options_ = mutable_cf_options; return c; } -Compaction* LevelCompactionPicker::PickCompaction(Version* version, - LogBuffer* log_buffer) { +Compaction* LevelCompactionPicker::PickCompaction( + const MutableCFOptions& mutable_cf_options, + Version* version, LogBuffer* log_buffer) { Compaction* c = nullptr; int level = -1; @@ -421,7 +365,7 @@ Compaction* LevelCompactionPicker::PickCompaction(Version* version, // and also in LogAndApply(), otherwise the values could be stale. std::vector size_being_compacted(NumberLevels() - 1); SizeBeingCompacted(size_being_compacted); - version->ComputeCompactionScore(size_being_compacted); + version->ComputeCompactionScore(mutable_cf_options, size_being_compacted); // We prefer compactions triggered by too much data in a level over // the compactions triggered by seeks. @@ -432,7 +376,8 @@ Compaction* LevelCompactionPicker::PickCompaction(Version* version, version->compaction_score_[i] <= version->compaction_score_[i - 1]); level = version->compaction_level_[i]; if ((version->compaction_score_[i] >= 1)) { - c = PickCompactionBySize(version, level, version->compaction_score_[i]); + c = PickCompactionBySize(mutable_cf_options, version, level, + version->compaction_score_[i]); if (c == nullptr || ExpandWhileOverlapping(c) == false) { delete c; c = nullptr; @@ -472,7 +417,7 @@ Compaction* LevelCompactionPicker::PickCompaction(Version* version, } // Setup "level+1" files (inputs_[1]) - SetupOtherInputs(c); + SetupOtherInputs(mutable_cf_options, c); // mark all the files that are being compacted c->MarkFilesBeingCompacted(true); @@ -483,12 +428,13 @@ Compaction* LevelCompactionPicker::PickCompaction(Version* version, // remember this currently undergoing compaction compactions_in_progress_[level].insert(c); + c->mutable_cf_options_ = mutable_cf_options; return c; } -Compaction* LevelCompactionPicker::PickCompactionBySize(Version* version, - int level, - double score) { +Compaction* LevelCompactionPicker::PickCompactionBySize( + const MutableCFOptions& mutable_cf_options, + Version* version, int level, double score) { Compaction* c = nullptr; // level 0 files are overlapping. So we cannot pick more @@ -501,9 +447,10 @@ Compaction* LevelCompactionPicker::PickCompactionBySize(Version* version, assert(level >= 0); assert(level + 1 < NumberLevels()); - c = new Compaction(version, level, level + 1, MaxFileSizeForLevel(level + 1), - MaxGrandParentOverlapBytes(level), 0, - GetCompressionType(*options_, level + 1)); + c = new Compaction(version, level, level + 1, + mutable_cf_options.MaxFileSizeForLevel(level + 1), + mutable_cf_options.MaxGrandParentOverlapBytes(level), 0, + GetCompressionType(ioptions_, level + 1)); c->score_ = score; // Pick the largest file in this level that is not already @@ -563,13 +510,14 @@ Compaction* LevelCompactionPicker::PickCompactionBySize(Version* version, // Universal style of compaction. Pick files that are contiguous in // time-range to compact. // -Compaction* UniversalCompactionPicker::PickCompaction(Version* version, - LogBuffer* log_buffer) { +Compaction* UniversalCompactionPicker::PickCompaction( + const MutableCFOptions& mutable_cf_options, + Version* version, LogBuffer* log_buffer) { int level = 0; double score = version->compaction_score_[0]; if ((version->files_[level].size() < - (unsigned int)options_->level0_file_num_compaction_trigger)) { + (unsigned int)mutable_cf_options.level0_file_num_compaction_trigger)) { LogToBuffer(log_buffer, "[%s] Universal: nothing to do\n", version->cfd_->GetName().c_str()); return nullptr; @@ -581,17 +529,18 @@ Compaction* UniversalCompactionPicker::PickCompaction(Version* version, // Check for size amplification first. Compaction* c; - if ((c = PickCompactionUniversalSizeAmp(version, score, log_buffer)) != - nullptr) { + if ((c = PickCompactionUniversalSizeAmp( + mutable_cf_options, version, score, log_buffer)) != nullptr) { LogToBuffer(log_buffer, "[%s] Universal: compacting for size amp\n", version->cfd_->GetName().c_str()); } else { // Size amplification is within limits. Try reducing read // amplification while maintaining file size ratios. - unsigned int ratio = options_->compaction_options_universal.size_ratio; + unsigned int ratio = ioptions_.compaction_options_universal.size_ratio; - if ((c = PickCompactionUniversalReadAmp(version, score, ratio, UINT_MAX, - log_buffer)) != nullptr) { + if ((c = PickCompactionUniversalReadAmp( + mutable_cf_options, version, score, ratio, + UINT_MAX, log_buffer)) != nullptr) { LogToBuffer(log_buffer, "[%s] Universal: compacting for size ratio\n", version->cfd_->GetName().c_str()); } else { @@ -600,9 +549,10 @@ Compaction* UniversalCompactionPicker::PickCompaction(Version* version, // compaction without looking at filesize ratios and try to reduce // the number of files to fewer than level0_file_num_compaction_trigger. unsigned int num_files = version->files_[level].size() - - options_->level0_file_num_compaction_trigger; + mutable_cf_options.level0_file_num_compaction_trigger; if ((c = PickCompactionUniversalReadAmp( - version, score, UINT_MAX, num_files, log_buffer)) != nullptr) { + mutable_cf_options, version, score, UINT_MAX, + num_files, log_buffer)) != nullptr) { LogToBuffer(log_buffer, "[%s] Universal: compacting for file num\n", version->cfd_->GetName().c_str()); } @@ -628,7 +578,7 @@ Compaction* UniversalCompactionPicker::PickCompaction(Version* version, c->bottommost_level_ = c->inputs_[0].files.back() == last_file; // update statistics - MeasureTime(options_->statistics.get(), + MeasureTime(ioptions_.statistics, NUM_FILES_IN_SINGLE_COMPACTION, c->inputs_[0].size()); // mark all the files that are being compacted @@ -642,11 +592,12 @@ Compaction* UniversalCompactionPicker::PickCompaction(Version* version, c->is_full_compaction_ = (c->inputs_[0].size() == c->input_version_->files_[0].size()); + c->mutable_cf_options_ = mutable_cf_options; return c; } -uint32_t UniversalCompactionPicker::GetPathId(const Options& options, - uint64_t file_size) { +uint32_t UniversalCompactionPicker::GetPathId( + const ImmutableCFOptions& ioptions, uint64_t file_size) { // Two conditions need to be satisfied: // (1) the target path needs to be able to hold the file's size // (2) Total size left in this and previous paths need to be not @@ -662,11 +613,11 @@ uint32_t UniversalCompactionPicker::GetPathId(const Options& options, // considered in this algorithm. So the target size can be violated in // that case. We need to improve it. uint64_t accumulated_size = 0; - uint64_t future_size = - file_size * (100 - options.compaction_options_universal.size_ratio) / 100; + uint64_t future_size = file_size * + (100 - ioptions.compaction_options_universal.size_ratio) / 100; uint32_t p = 0; - for (; p < options.db_paths.size() - 1; p++) { - uint64_t target_size = options.db_paths[p].target_size; + for (; p < ioptions.db_paths.size() - 1; p++) { + uint64_t target_size = ioptions.db_paths[p].target_size; if (target_size > file_size && accumulated_size + (target_size - file_size) > future_size) { return p; @@ -681,14 +632,15 @@ uint32_t UniversalCompactionPicker::GetPathId(const Options& options, // the next file in time order. // Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp( - Version* version, double score, unsigned int ratio, + const MutableCFOptions& mutable_cf_options, Version* version, + double score, unsigned int ratio, unsigned int max_number_of_files_to_compact, LogBuffer* log_buffer) { int level = 0; unsigned int min_merge_width = - options_->compaction_options_universal.min_merge_width; + ioptions_.compaction_options_universal.min_merge_width; unsigned int max_merge_width = - options_->compaction_options_universal.max_merge_width; + ioptions_.compaction_options_universal.max_merge_width; // The files are sorted from newest first to oldest last. const auto& files = version->files_[level]; @@ -750,7 +702,8 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp( if (sz < static_cast(f->fd.GetFileSize())) { break; } - if (options_->compaction_options_universal.stop_style == kCompactionStopStyleSimilarSize) { + if (ioptions_.compaction_options_universal.stop_style == + kCompactionStopStyleSimilarSize) { // Similar-size stopping rule: also check the last picked file isn't // far larger than the next candidate file. sz = (f->fd.GetFileSize() * (100.0 + ratio)) / 100.0; @@ -794,7 +747,7 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp( // size ratio of compression. bool enable_compression = true; int ratio_to_compress = - options_->compaction_options_universal.compression_size_percent; + ioptions_.compaction_options_universal.compression_size_percent; if (ratio_to_compress >= 0) { uint64_t total_size = version->NumLevelBytes(level); uint64_t older_file_size = 0; @@ -812,11 +765,12 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp( for (unsigned int i = 0; i < first_index_after; i++) { estimated_total_size += files[i]->fd.GetFileSize(); } - uint32_t path_id = GetPathId(*options_, estimated_total_size); + uint32_t path_id = GetPathId(ioptions_, estimated_total_size); Compaction* c = new Compaction( - version, level, level, MaxFileSizeForLevel(level), LLONG_MAX, path_id, - GetCompressionType(*options_, level, enable_compression)); + version, level, level, mutable_cf_options.MaxFileSizeForLevel(level), + LLONG_MAX, path_id, GetCompressionType(ioptions_, level, + enable_compression)); c->score_ = score; for (unsigned int i = start_index; i < first_index_after; i++) { @@ -841,11 +795,12 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp( // min_merge_width and max_merge_width). // Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp( - Version* version, double score, LogBuffer* log_buffer) { + const MutableCFOptions& mutable_cf_options, Version* version, + double score, LogBuffer* log_buffer) { int level = 0; // percentage flexibilty while reducing size amplification - uint64_t ratio = options_->compaction_options_universal. + uint64_t ratio = ioptions_.compaction_options_universal. max_size_amplification_percent; // The files are sorted from newest first to oldest last. @@ -927,13 +882,14 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp( for (unsigned int loop = start_index; loop < files.size(); loop++) { estimated_total_size += files[loop]->fd.GetFileSize(); } - uint32_t path_id = GetPathId(*options_, estimated_total_size); + uint32_t path_id = GetPathId(ioptions_, estimated_total_size); // create a compaction request // We always compact all the files, so always compress. Compaction* c = - new Compaction(version, level, level, MaxFileSizeForLevel(level), - LLONG_MAX, path_id, GetCompressionType(*options_, level)); + new Compaction(version, level, level, + mutable_cf_options.MaxFileSizeForLevel(level), + LLONG_MAX, path_id, GetCompressionType(ioptions_, level)); c->score_ = score; for (unsigned int loop = start_index; loop < files.size(); loop++) { f = c->input_version_->files_[level][loop]; @@ -948,22 +904,23 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp( return c; } -Compaction* FIFOCompactionPicker::PickCompaction(Version* version, - LogBuffer* log_buffer) { +Compaction* FIFOCompactionPicker::PickCompaction( + const MutableCFOptions& mutable_cf_options, + Version* version, LogBuffer* log_buffer) { assert(version->NumberLevels() == 1); uint64_t total_size = 0; for (const auto& file : version->files_[0]) { total_size += file->compensated_file_size; } - if (total_size <= options_->compaction_options_fifo.max_table_files_size || + if (total_size <= ioptions_.compaction_options_fifo.max_table_files_size || version->files_[0].size() == 0) { // total size not exceeded LogToBuffer(log_buffer, "[%s] FIFO compaction: nothing to do. Total size %" PRIu64 ", max size %" PRIu64 "\n", version->cfd_->GetName().c_str(), total_size, - options_->compaction_options_fifo.max_table_files_size); + ioptions_.compaction_options_fifo.max_table_files_size); return nullptr; } @@ -988,28 +945,29 @@ Compaction* FIFOCompactionPicker::PickCompaction(Version* version, LogToBuffer(log_buffer, "[%s] FIFO compaction: picking file %" PRIu64 " with size %s for deletion", version->cfd_->GetName().c_str(), f->fd.GetNumber(), tmp_fsize); - if (total_size <= options_->compaction_options_fifo.max_table_files_size) { + if (total_size <= ioptions_.compaction_options_fifo.max_table_files_size) { break; } } c->MarkFilesBeingCompacted(true); compactions_in_progress_[0].insert(c); - + c->mutable_cf_options_ = mutable_cf_options; return c; } Compaction* FIFOCompactionPicker::CompactRange( + const MutableCFOptions& mutable_cf_options, Version* version, int input_level, int output_level, uint32_t output_path_id, const InternalKey* begin, const InternalKey* end, InternalKey** compaction_end) { assert(input_level == 0); assert(output_level == 0); *compaction_end = nullptr; - LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, options_->info_log.get()); - Compaction* c = PickCompaction(version, &log_buffer); + LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, ioptions_.info_log); + Compaction* c = PickCompaction(mutable_cf_options, version, &log_buffer); if (c != nullptr) { - assert(output_path_id < static_cast(options_->db_paths.size())); + assert(output_path_id < static_cast(ioptions_.db_paths.size())); c->output_path_id_ = output_path_id; } log_buffer.FlushBufferToLog(); diff --git a/db/compaction_picker.h b/db/compaction_picker.h index c1e27c471..9862bdfea 100644 --- a/db/compaction_picker.h +++ b/db/compaction_picker.h @@ -13,6 +13,7 @@ #include "rocksdb/status.h" #include "rocksdb/options.h" #include "rocksdb/env.h" +#include "util/mutable_cf_options.h" #include #include @@ -26,15 +27,17 @@ class Version; class CompactionPicker { public: - CompactionPicker(const Options* options, const InternalKeyComparator* icmp); + CompactionPicker(const ImmutableCFOptions& ioptions, + const InternalKeyComparator* icmp); virtual ~CompactionPicker(); // Pick level and inputs for a new compaction. // Returns nullptr if there is no compaction to be done. // Otherwise returns a pointer to a heap-allocated object that // describes the compaction. Caller should delete the result. - virtual Compaction* PickCompaction(Version* version, - LogBuffer* log_buffer) = 0; + virtual Compaction* PickCompaction( + const MutableCFOptions& mutable_cf_options, + Version* version, LogBuffer* log_buffer) = 0; // Return a compaction object for compacting the range [begin,end] in // the specified level. Returns nullptr if there is nothing in that @@ -47,11 +50,11 @@ class CompactionPicker { // compaction_end will be set to nullptr. // Client is responsible for compaction_end storage -- when called, // *compaction_end should point to valid InternalKey! - virtual Compaction* CompactRange(Version* version, int input_level, - int output_level, uint32_t output_path_id, - const InternalKey* begin, - const InternalKey* end, - InternalKey** compaction_end); + virtual Compaction* CompactRange( + const MutableCFOptions& mutable_cf_options, Version* version, + int input_level, int output_level, uint32_t output_path_id, + const InternalKey* begin, const InternalKey* end, + InternalKey** compaction_end); // Given the current number of levels, returns the lowest allowed level // for compaction input. @@ -64,19 +67,8 @@ class CompactionPicker { // compactions per level void SizeBeingCompacted(std::vector& sizes); - // Returns maximum total overlap bytes with grandparent - // level (i.e., level+2) before we stop building a single - // file in level->level+1 compaction. - uint64_t MaxGrandParentOverlapBytes(int level); - - // Returns maximum total bytes of data on a given level. - double MaxBytesForLevel(int level); - - // Get the max file size in a given level. - uint64_t MaxFileSizeForLevel(int level) const; - protected: - int NumberLevels() const { return num_levels_; } + int NumberLevels() const { return ioptions_.num_levels; } // Stores the minimal range that covers all entries in inputs in // *smallest, *largest. @@ -103,8 +95,6 @@ class CompactionPicker { // Will return false if it is impossible to apply this compaction. bool ExpandWhileOverlapping(Compaction* c); - uint64_t ExpandedCompactionByteSizeLimit(int level); - // Returns true if any one of the specified files are being compacted bool FilesInCompaction(std::vector& files); @@ -113,32 +103,30 @@ class CompactionPicker { const InternalKey* largest, int level, int* index); - void SetupOtherInputs(Compaction* c); + void SetupOtherInputs(const MutableCFOptions& mutable_cf_options, + Compaction* c); + + const ImmutableCFOptions& ioptions_; // record all the ongoing compactions for all levels std::vector> compactions_in_progress_; - // Per-level target file size. - std::unique_ptr max_file_size_; - - // Per-level max bytes - std::unique_ptr level_max_bytes_; - - const Options* const options_; private: - int num_levels_; - const InternalKeyComparator* const icmp_; + + int max_grandparent_overlap_factor_; + int expanded_compaction_factor_; }; class UniversalCompactionPicker : public CompactionPicker { public: - UniversalCompactionPicker(const Options* options, + UniversalCompactionPicker(const ImmutableCFOptions& ioptions, const InternalKeyComparator* icmp) - : CompactionPicker(options, icmp) {} - virtual Compaction* PickCompaction(Version* version, - LogBuffer* log_buffer) override; + : CompactionPicker(ioptions, icmp) {} + virtual Compaction* PickCompaction( + const MutableCFOptions& mutable_cf_options, + Version* version, LogBuffer* log_buffer) override; // The maxinum allowed input level. Always return 0. virtual int MaxInputLevel(int current_num_levels) const override { @@ -147,27 +135,30 @@ class UniversalCompactionPicker : public CompactionPicker { private: // Pick Universal compaction to limit read amplification - Compaction* PickCompactionUniversalReadAmp(Version* version, double score, - unsigned int ratio, - unsigned int num_files, - LogBuffer* log_buffer); + Compaction* PickCompactionUniversalReadAmp( + const MutableCFOptions& mutable_cf_options, + Version* version, double score, unsigned int ratio, + unsigned int num_files, LogBuffer* log_buffer); // Pick Universal compaction to limit space amplification. - Compaction* PickCompactionUniversalSizeAmp(Version* version, double score, - LogBuffer* log_buffer); + Compaction* PickCompactionUniversalSizeAmp( + const MutableCFOptions& mutable_cf_options, + Version* version, double score, LogBuffer* log_buffer); // Pick a path ID to place a newly generated file, with its estimated file // size. - static uint32_t GetPathId(const Options& options, uint64_t file_size); + static uint32_t GetPathId(const ImmutableCFOptions& ioptions, + uint64_t file_size); }; class LevelCompactionPicker : public CompactionPicker { public: - LevelCompactionPicker(const Options* options, + LevelCompactionPicker(const ImmutableCFOptions& ioptions, const InternalKeyComparator* icmp) - : CompactionPicker(options, icmp) {} - virtual Compaction* PickCompaction(Version* version, - LogBuffer* log_buffer) override; + : CompactionPicker(ioptions, icmp) {} + virtual Compaction* PickCompaction( + const MutableCFOptions& mutable_cf_options, + Version* version, LogBuffer* log_buffer) override; // Returns current_num_levels - 2, meaning the last level cannot be // compaction input level. @@ -180,23 +171,25 @@ class LevelCompactionPicker : public CompactionPicker { // Returns nullptr if there is no compaction to be done. // If level is 0 and there is already a compaction on that level, this // function will return nullptr. - Compaction* PickCompactionBySize(Version* version, int level, double score); + Compaction* PickCompactionBySize(const MutableCFOptions& mutable_cf_options, + Version* version, int level, double score); }; class FIFOCompactionPicker : public CompactionPicker { public: - FIFOCompactionPicker(const Options* options, + FIFOCompactionPicker(const ImmutableCFOptions& ioptions, const InternalKeyComparator* icmp) - : CompactionPicker(options, icmp) {} + : CompactionPicker(ioptions, icmp) {} - virtual Compaction* PickCompaction(Version* version, - LogBuffer* log_buffer) override; + virtual Compaction* PickCompaction( + const MutableCFOptions& mutable_cf_options, + Version* version, LogBuffer* log_buffer) override; - virtual Compaction* CompactRange(Version* version, int input_level, - int output_level, uint32_t output_path_id, - const InternalKey* begin, - const InternalKey* end, - InternalKey** compaction_end) override; + virtual Compaction* CompactRange( + const MutableCFOptions& mutable_cf_options, Version* version, + int input_level, int output_level, uint32_t output_path_id, + const InternalKey* begin, const InternalKey* end, + InternalKey** compaction_end) override; // The maxinum allowed input level. Always return 0. virtual int MaxInputLevel(int current_num_levels) const override { diff --git a/db/db_impl.cc b/db/db_impl.cc index 1a2b7f7b2..680a22cb3 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1410,7 +1410,8 @@ Status DBImpl::RecoverLogFiles(const std::vector& log_numbers, // VersionSet::next_file_number_ always to be strictly greater than any // log number versions_->MarkFileNumberUsed(max_log_number + 1); - status = versions_->LogAndApply(cfd, edit, &mutex_); + status = versions_->LogAndApply( + cfd, *cfd->GetLatestMutableCFOptions(), edit, &mutex_); if (!status.ok()) { // Recovery failed break; @@ -1479,8 +1480,9 @@ Status DBImpl::WriteLevel0TableForRecovery(ColumnFamilyData* cfd, MemTable* mem, } Status DBImpl::WriteLevel0Table(ColumnFamilyData* cfd, - autovector& mems, VersionEdit* edit, - uint64_t* filenumber, LogBuffer* log_buffer) { + const MutableCFOptions& mutable_cf_options, + const autovector& mems, + VersionEdit* edit, uint64_t* filenumber, LogBuffer* log_buffer) { mutex_.AssertHeld(); const uint64_t start_micros = env_->NowMicros(); FileMetaData meta; @@ -1560,7 +1562,8 @@ Status DBImpl::WriteLevel0Table(ColumnFamilyData* cfd, if (base != nullptr && db_options_.max_background_compactions <= 1 && db_options_.max_background_flushes == 0 && cfd->ioptions()->compaction_style == kCompactionStyleLevel) { - level = base->PickLevelForMemTableOutput(min_user_key, max_user_key); + level = base->PickLevelForMemTableOutput( + mutable_cf_options, min_user_key, max_user_key); } edit->AddFile(level, meta.fd.GetNumber(), meta.fd.GetPathId(), meta.fd.GetFileSize(), meta.smallest, meta.largest, @@ -1577,10 +1580,9 @@ Status DBImpl::WriteLevel0Table(ColumnFamilyData* cfd, return s; } -Status DBImpl::FlushMemTableToOutputFile(ColumnFamilyData* cfd, - bool* madeProgress, - DeletionState& deletion_state, - LogBuffer* log_buffer) { +Status DBImpl::FlushMemTableToOutputFile( + ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options, + bool* madeProgress, DeletionState& deletion_state, LogBuffer* log_buffer) { mutex_.AssertHeld(); assert(cfd->imm()->size() != 0); assert(cfd->imm()->IsFlushPending()); @@ -1607,8 +1609,10 @@ Status DBImpl::FlushMemTableToOutputFile(ColumnFamilyData* cfd, edit->SetLogNumber(mems.back()->GetNextLogNumber()); edit->SetColumnFamily(cfd->GetID()); + // This will release and re-acquire the mutex. - Status s = WriteLevel0Table(cfd, mems, edit, &file_number, log_buffer); + Status s = WriteLevel0Table(cfd, mutable_cf_options, mems, edit, + &file_number, log_buffer); if (s.ok() && shutting_down_.Acquire_Load() && cfd->IsDropped()) { s = Status::ShutdownInProgress( @@ -1620,14 +1624,13 @@ Status DBImpl::FlushMemTableToOutputFile(ColumnFamilyData* cfd, } else { // Replace immutable memtable with the generated Table s = cfd->imm()->InstallMemtableFlushResults( - cfd, mems, versions_.get(), &mutex_, db_options_.info_log.get(), - file_number, &pending_outputs_, &deletion_state.memtables_to_free, - db_directory_.get(), log_buffer); + cfd, mutable_cf_options, mems, versions_.get(), &mutex_, + db_options_.info_log.get(), file_number, &pending_outputs_, + &deletion_state.memtables_to_free, db_directory_.get(), log_buffer); } if (s.ok()) { - // Use latest MutableCFOptions - InstallSuperVersion(cfd, deletion_state); + InstallSuperVersion(cfd, deletion_state, mutable_cf_options); if (madeProgress) { *madeProgress = 1; } @@ -1726,7 +1729,8 @@ bool DBImpl::SetOptions(ColumnFamilyHandle* column_family, } // return the same level if it cannot be moved -int DBImpl::FindMinimumEmptyLevelFitting(ColumnFamilyData* cfd, int level) { +int DBImpl::FindMinimumEmptyLevelFitting(ColumnFamilyData* cfd, + const MutableCFOptions& mutable_cf_options, int level) { mutex_.AssertHeld(); Version* current = cfd->current(); int minimum_level = level; @@ -1734,7 +1738,7 @@ int DBImpl::FindMinimumEmptyLevelFitting(ColumnFamilyData* cfd, int level) { // stop if level i is not empty if (current->NumLevelFiles(i) > 0) break; // stop if level i is too small (cannot fit the level files) - if (cfd->compaction_picker()->MaxBytesForLevel(i) < + if (mutable_cf_options.MaxBytesForLevel(i) < current->NumLevelBytes(level)) { break; } @@ -1770,10 +1774,12 @@ Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) { bg_cv_.Wait(); } + const MutableCFOptions mutable_cf_options = + *cfd->GetLatestMutableCFOptions(); // move to a smaller level int to_level = target_level; if (target_level < 0) { - to_level = FindMinimumEmptyLevelFitting(cfd, level); + to_level = FindMinimumEmptyLevelFitting(cfd, mutable_cf_options, level); } assert(to_level <= level); @@ -1794,9 +1800,10 @@ Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) { Log(db_options_.info_log, "[%s] Apply version edit:\n%s", cfd->GetName().c_str(), edit.DebugString().data()); - status = versions_->LogAndApply(cfd, &edit, &mutex_, db_directory_.get()); - // Use latest MutableCFOptions - superversion_to_free = cfd->InstallSuperVersion(new_superversion, &mutex_); + status = versions_->LogAndApply(cfd, + mutable_cf_options, &edit, &mutex_, db_directory_.get()); + superversion_to_free = cfd->InstallSuperVersion( + new_superversion, &mutex_, mutable_cf_options); new_superversion = nullptr; Log(db_options_.info_log, "[%s] LogAndApply: %s\n", cfd->GetName().c_str(), @@ -2058,6 +2065,8 @@ Status DBImpl::BackgroundFlush(bool* madeProgress, for (auto cfd : *versions_->GetColumnFamilySet()) { cfd->Ref(); Status flush_status; + const MutableCFOptions mutable_cf_options = + *cfd->GetLatestMutableCFOptions(); while (flush_status.ok() && cfd->imm()->IsFlushPending()) { LogToBuffer( log_buffer, @@ -2065,8 +2074,8 @@ Status DBImpl::BackgroundFlush(bool* madeProgress, "family [%s], flush slots available %d", cfd->GetName().c_str(), db_options_.max_background_flushes - bg_flush_scheduled_); - flush_status = FlushMemTableToOutputFile(cfd, madeProgress, - deletion_state, log_buffer); + flush_status = FlushMemTableToOutputFile( + cfd, mutable_cf_options, madeProgress, deletion_state, log_buffer); } if (call_status.ok() && !flush_status.ok()) { call_status = flush_status; @@ -2259,6 +2268,8 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, // FLUSH preempts compaction Status flush_stat; for (auto cfd : *versions_->GetColumnFamilySet()) { + const MutableCFOptions mutable_cf_options = + *cfd->GetLatestMutableCFOptions(); while (cfd->imm()->IsFlushPending()) { LogToBuffer( log_buffer, @@ -2266,8 +2277,8 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, "compaction slots available %d", db_options_.max_background_compactions - bg_compaction_scheduled_); cfd->Ref(); - flush_stat = FlushMemTableToOutputFile(cfd, madeProgress, deletion_state, - log_buffer); + flush_stat = FlushMemTableToOutputFile( + cfd, mutable_cf_options, madeProgress, deletion_state, log_buffer); cfd->Unref(); if (!flush_stat.ok()) { if (is_manual) { @@ -2281,15 +2292,18 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, } } + // Compaction makes a copy of the latest MutableCFOptions. It should be used + // throughout the compaction procedure to make sure consistency. It will + // eventually be installed into SuperVersion unique_ptr c; InternalKey manual_end_storage; InternalKey* manual_end = &manual_end_storage; if (is_manual) { ManualCompaction* m = manual_compaction_; assert(m->in_progress); - c.reset(m->cfd->CompactRange(m->input_level, m->output_level, - m->output_path_id, m->begin, m->end, - &manual_end)); + c.reset(m->cfd->CompactRange( + *m->cfd->GetLatestMutableCFOptions(), m->input_level, m->output_level, + m->output_path_id, m->begin, m->end, &manual_end)); if (!c) { m->done = true; } @@ -2306,7 +2320,11 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, // no need to refcount in iteration since it's always under a mutex for (auto cfd : *versions_->GetColumnFamilySet()) { if (!cfd->options()->disable_auto_compactions) { - c.reset(cfd->PickCompaction(log_buffer)); + // NOTE: try to avoid unnecessary copy of MutableCFOptions if + // compaction is not necessary. Need to make sure mutex is held + // until we make a copy in the following code + c.reset(cfd->PickCompaction( + *cfd->GetLatestMutableCFOptions(), log_buffer)); if (c != nullptr) { // update statistics MeasureTime(stats_, NUM_FILES_IN_SINGLE_COMPACTION, @@ -2331,10 +2349,11 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, for (const auto& f : *c->inputs(0)) { c->edit()->DeleteFile(c->level(), f->fd.GetNumber()); } - status = versions_->LogAndApply(c->column_family_data(), c->edit(), &mutex_, - db_directory_.get()); - // Use latest MutableCFOptions - InstallSuperVersion(c->column_family_data(), deletion_state); + status = versions_->LogAndApply( + c->column_family_data(), *c->mutable_cf_options(), c->edit(), + &mutex_, db_directory_.get()); + InstallSuperVersion(c->column_family_data(), deletion_state, + *c->mutable_cf_options()); LogToBuffer(log_buffer, "[%s] Deleted %d files\n", c->column_family_data()->GetName().c_str(), c->num_input_files(0)); @@ -2348,10 +2367,12 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, c->edit()->AddFile(c->level() + 1, f->fd.GetNumber(), f->fd.GetPathId(), f->fd.GetFileSize(), f->smallest, f->largest, f->smallest_seqno, f->largest_seqno); - status = versions_->LogAndApply(c->column_family_data(), c->edit(), &mutex_, - db_directory_.get()); + status = versions_->LogAndApply(c->column_family_data(), + *c->mutable_cf_options(), + c->edit(), &mutex_, db_directory_.get()); // Use latest MutableCFOptions - InstallSuperVersion(c->column_family_data(), deletion_state); + InstallSuperVersion(c->column_family_data(), deletion_state, + *c->mutable_cf_options()); Version::LevelSummaryStorage tmp; LogToBuffer( @@ -2366,7 +2387,8 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, } else { MaybeScheduleFlushOrCompaction(); // do more compaction work in parallel. CompactionState* compact = new CompactionState(c.get()); - status = DoCompactionWork(compact, deletion_state, log_buffer); + status = DoCompactionWork(compact, *c->mutable_cf_options(), + deletion_state, log_buffer); CleanupCompaction(compact, status); c->ReleaseCompactionFiles(status); c->ReleaseInputs(); @@ -2468,7 +2490,8 @@ void DBImpl::ReleaseCompactionUnusedFileNumbers(CompactionState* compact) { } } -Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) { +Status DBImpl::OpenCompactionOutputFile( + CompactionState* compact, const MutableCFOptions& mutable_cf_options) { assert(compact != nullptr); assert(compact->builder == nullptr); uint64_t file_number; @@ -2500,7 +2523,7 @@ Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) { if (s.ok()) { compact->outfile->SetIOPriority(Env::IO_LOW); compact->outfile->SetPreallocationBlockSize( - compact->compaction->OutputFilePreallocationSize()); + compact->compaction->OutputFilePreallocationSize(mutable_cf_options)); ColumnFamilyData* cfd = compact->compaction->column_family_data(); compact->builder.reset(NewTableBuilder( @@ -2570,7 +2593,7 @@ Status DBImpl::FinishCompactionOutputFile(CompactionState* compact, Status DBImpl::InstallCompactionResults(CompactionState* compact, - LogBuffer* log_buffer) { + const MutableCFOptions& mutable_cf_options, LogBuffer* log_buffer) { mutex_.AssertHeld(); // paranoia: verify that the files that we started with @@ -2604,6 +2627,7 @@ Status DBImpl::InstallCompactionResults(CompactionState* compact, out.smallest_seqno, out.largest_seqno); } return versions_->LogAndApply(compact->compaction->column_family_data(), + mutable_cf_options, compact->compaction->edit(), &mutex_, db_directory_.get()); } @@ -2635,8 +2659,8 @@ inline SequenceNumber DBImpl::findEarliestVisibleSnapshot( } uint64_t DBImpl::CallFlushDuringCompaction(ColumnFamilyData* cfd, - DeletionState& deletion_state, - LogBuffer* log_buffer) { + const MutableCFOptions& mutable_cf_options, DeletionState& deletion_state, + LogBuffer* log_buffer) { if (db_options_.max_background_flushes > 0) { // flush thread will take care of this return 0; @@ -2646,7 +2670,8 @@ uint64_t DBImpl::CallFlushDuringCompaction(ColumnFamilyData* cfd, mutex_.Lock(); if (cfd->imm()->IsFlushPending()) { cfd->Ref(); - FlushMemTableToOutputFile(cfd, nullptr, deletion_state, log_buffer); + FlushMemTableToOutputFile(cfd, mutable_cf_options, nullptr, + deletion_state, log_buffer); cfd->Unref(); bg_cv_.SignalAll(); // Wakeup DelayWrite() if necessary } @@ -2658,6 +2683,7 @@ uint64_t DBImpl::CallFlushDuringCompaction(ColumnFamilyData* cfd, } Status DBImpl::ProcessKeyValueCompaction( + const MutableCFOptions& mutable_cf_options, bool is_snapshot_supported, SequenceNumber visible_at_tip, SequenceNumber earliest_snapshot, @@ -2721,7 +2747,8 @@ Status DBImpl::ProcessKeyValueCompaction( // TODO(icanadi) this currently only checks if flush is necessary on // compacting column family. we should also check if flush is necessary on // other column families, too - imm_micros += CallFlushDuringCompaction(cfd, deletion_state, log_buffer); + imm_micros += CallFlushDuringCompaction( + cfd, mutable_cf_options, deletion_state, log_buffer); Slice key; Slice value; @@ -2922,7 +2949,7 @@ Status DBImpl::ProcessKeyValueCompaction( // Open output file if necessary if (compact->builder == nullptr) { - status = OpenCompactionOutputFile(compact); + status = OpenCompactionOutputFile(compact, mutable_cf_options); if (!status.ok()) { break; } @@ -3059,6 +3086,7 @@ void DBImpl::CallCompactionFilterV2(CompactionState* compact, } Status DBImpl::DoCompactionWork(CompactionState* compact, + const MutableCFOptions& mutable_cf_options, DeletionState& deletion_state, LogBuffer* log_buffer) { assert(compact); @@ -3129,6 +3157,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact, if (!compaction_filter_v2) { status = ProcessKeyValueCompaction( + mutable_cf_options, is_snapshot_supported, visible_at_tip, earliest_snapshot, @@ -3158,7 +3187,8 @@ Status DBImpl::DoCompactionWork(CompactionState* compact, // TODO(icanadi) this currently only checks if flush is necessary on // compacting column family. we should also check if flush is necessary on // other column families, too - imm_micros += CallFlushDuringCompaction(cfd, deletion_state, log_buffer); + imm_micros += CallFlushDuringCompaction(cfd, mutable_cf_options, + deletion_state, log_buffer); Slice key = backup_input->key(); Slice value = backup_input->value(); @@ -3208,6 +3238,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact, // Done buffering for the current prefix. Spit it out to disk // Now just iterate through all the kv-pairs status = ProcessKeyValueCompaction( + mutable_cf_options, is_snapshot_supported, visible_at_tip, earliest_snapshot, @@ -3244,6 +3275,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact, compact->MergeKeyValueSliceBuffer(&cfd->internal_comparator()); status = ProcessKeyValueCompaction( + mutable_cf_options, is_snapshot_supported, visible_at_tip, earliest_snapshot, @@ -3266,6 +3298,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact, } compact->MergeKeyValueSliceBuffer(&cfd->internal_comparator()); status = ProcessKeyValueCompaction( + mutable_cf_options, is_snapshot_supported, visible_at_tip, earliest_snapshot, @@ -3333,9 +3366,8 @@ Status DBImpl::DoCompactionWork(CompactionState* compact, ReleaseCompactionUnusedFileNumbers(compact); if (status.ok()) { - status = InstallCompactionResults(compact, log_buffer); - // Use latest MutableCFOptions - InstallSuperVersion(cfd, deletion_state); + status = InstallCompactionResults(compact, mutable_cf_options, log_buffer); + InstallSuperVersion(cfd, deletion_state, mutable_cf_options); } Version::LevelSummaryStorage tmp; LogToBuffer( @@ -3434,16 +3466,16 @@ Status DBImpl::Get(const ReadOptions& options, // first call already used it. In that rare case, we take a hit and create a // new SuperVersion() inside of the mutex. We do similar thing // for superversion_to_free -void DBImpl::InstallSuperVersion(ColumnFamilyData* cfd, - DeletionState& deletion_state) { +void DBImpl::InstallSuperVersion( + ColumnFamilyData* cfd, DeletionState& deletion_state, + const MutableCFOptions& mutable_cf_options) { mutex_.AssertHeld(); // if new_superversion == nullptr, it means somebody already used it SuperVersion* new_superversion = (deletion_state.new_superversion != nullptr) ? deletion_state.new_superversion : new SuperVersion(); - // Use latest MutableCFOptions SuperVersion* old_superversion = - cfd->InstallSuperVersion(new_superversion, &mutex_); + cfd->InstallSuperVersion(new_superversion, &mutex_, mutable_cf_options); deletion_state.new_superversion = nullptr; deletion_state.superversions_to_free.push_back(old_superversion); } @@ -3627,15 +3659,17 @@ Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& options, // LogAndApply will both write the creation in MANIFEST and create // ColumnFamilyData object - Status s = versions_->LogAndApply(nullptr, &edit, &mutex_, - db_directory_.get(), false, &options); + Options opt(db_options_, options); + Status s = versions_->LogAndApply(nullptr, + MutableCFOptions(opt, ImmutableCFOptions(opt)), + &edit, &mutex_, db_directory_.get(), false, &options); if (s.ok()) { single_column_family_mode_ = false; auto cfd = versions_->GetColumnFamilySet()->GetColumnFamily(column_family_name); assert(cfd != nullptr); - // Use latest MutableCFOptions - delete cfd->InstallSuperVersion(new SuperVersion(), &mutex_); + delete cfd->InstallSuperVersion(new SuperVersion(), &mutex_, + *cfd->GetLatestMutableCFOptions()); *handle = new ColumnFamilyHandleImpl(cfd, this, &mutex_); Log(db_options_.info_log, "Created column family [%s] (ID %u)", column_family_name.c_str(), (unsigned)cfd->GetID()); @@ -3671,7 +3705,8 @@ Status DBImpl::DropColumnFamily(ColumnFamilyHandle* column_family) { WriteThread::Writer w(&mutex_); s = write_thread_.EnterWriteThread(&w, 0); assert(s.ok() && !w.done); // No timeout and nobody should do our job - s = versions_->LogAndApply(cfd, &edit, &mutex_); + s = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(), + &edit, &mutex_); write_thread_.ExitWriteThread(&w, &w, s); } } @@ -4450,9 +4485,11 @@ Status DBImpl::DeleteFile(std::string name) { } } edit.DeleteFile(level, number); - status = versions_->LogAndApply(cfd, &edit, &mutex_, db_directory_.get()); + status = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(), + &edit, &mutex_, db_directory_.get()); if (status.ok()) { - InstallSuperVersion(cfd, deletion_state); + InstallSuperVersion(cfd, deletion_state, + *cfd->GetLatestMutableCFOptions()); } FindObsoleteFiles(deletion_state, false); } // lock released here @@ -4681,8 +4718,8 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname, } if (s.ok()) { for (auto cfd : *impl->versions_->GetColumnFamilySet()) { - // Use latest MutableCFOptions - delete cfd->InstallSuperVersion(new SuperVersion(), &impl->mutex_); + delete cfd->InstallSuperVersion(new SuperVersion(), &impl->mutex_, + *cfd->GetLatestMutableCFOptions()); } impl->alive_log_files_.push_back( DBImpl::LogFileNumberSize(impl->logfile_number_)); diff --git a/db/db_impl.h b/db/db_impl.h index c6baf9c95..f1a81e00c 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -347,9 +347,9 @@ class DBImpl : public DB { // Flush the in-memory write buffer to storage. Switches to a new // log-file/memtable and writes a new descriptor iff successful. - Status FlushMemTableToOutputFile(ColumnFamilyData* cfd, bool* madeProgress, - DeletionState& deletion_state, - LogBuffer* log_buffer); + Status FlushMemTableToOutputFile( + ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options, + bool* madeProgress, DeletionState& deletion_state, LogBuffer* log_buffer); // REQUIRES: log_numbers are sorted in ascending order Status RecoverLogFiles(const std::vector& log_numbers, @@ -362,9 +362,10 @@ class DBImpl : public DB { // concurrent flush memtables to storage. Status WriteLevel0TableForRecovery(ColumnFamilyData* cfd, MemTable* mem, VersionEdit* edit); - Status WriteLevel0Table(ColumnFamilyData* cfd, autovector& mems, - VersionEdit* edit, uint64_t* filenumber, - LogBuffer* log_buffer); + Status WriteLevel0Table(ColumnFamilyData* cfd, + const MutableCFOptions& mutable_cf_options, + const autovector& mems, + VersionEdit* edit, uint64_t* filenumber, LogBuffer* log_buffer); void DelayWrite(uint64_t expiration_time); @@ -393,6 +394,7 @@ class DBImpl : public DB { LogBuffer* log_buffer); void CleanupCompaction(CompactionState* compact, Status status); Status DoCompactionWork(CompactionState* compact, + const MutableCFOptions& mutable_cf_options, DeletionState& deletion_state, LogBuffer* log_buffer); @@ -400,12 +402,13 @@ class DBImpl : public DB { // preempt compaction, since it's higher prioirty // Returns: micros spent executing uint64_t CallFlushDuringCompaction(ColumnFamilyData* cfd, - DeletionState& deletion_state, - LogBuffer* log_buffer); + const MutableCFOptions& mutable_cf_options, DeletionState& deletion_state, + LogBuffer* log_buffer); // Call compaction filter if is_compaction_v2 is not true. Then iterate // through input and compact the kv-pairs Status ProcessKeyValueCompaction( + const MutableCFOptions& mutable_cf_options, bool is_snapshot_supported, SequenceNumber visible_at_tip, SequenceNumber earliest_snapshot, @@ -422,10 +425,11 @@ class DBImpl : public DB { void CallCompactionFilterV2(CompactionState* compact, CompactionFilterV2* compaction_filter_v2); - Status OpenCompactionOutputFile(CompactionState* compact); + Status OpenCompactionOutputFile(CompactionState* compact, + const MutableCFOptions& mutable_cf_options); Status FinishCompactionOutputFile(CompactionState* compact, Iterator* input); Status InstallCompactionResults(CompactionState* compact, - LogBuffer* log_buffer); + const MutableCFOptions& mutable_cf_options, LogBuffer* log_buffer); void AllocateCompactionOutputFileNumbers(CompactionState* compact); void ReleaseCompactionUnusedFileNumbers(CompactionState* compact); @@ -467,7 +471,8 @@ class DBImpl : public DB { // Return the minimum empty level that could hold the total data in the // input level. Return the input level, if such level could not be found. - int FindMinimumEmptyLevelFitting(ColumnFamilyData* cfd, int level); + int FindMinimumEmptyLevelFitting(ColumnFamilyData* cfd, + const MutableCFOptions& mutable_cf_options, int level); // Move the files in the input level to the target level. // If target_level < 0, automatically calculate the minimum level that could @@ -621,7 +626,8 @@ class DBImpl : public DB { // the cfd->InstallSuperVersion() function. Background threads carry // deletion_state which can have new_superversion already allocated. void InstallSuperVersion(ColumnFamilyData* cfd, - DeletionState& deletion_state); + DeletionState& deletion_state, + const MutableCFOptions& mutable_cf_options); // Find Super version and reference it. Based on options, it might return // the thread local cached one. diff --git a/db/db_test.cc b/db/db_test.cc index 986d5810e..a2479d58e 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -874,6 +874,18 @@ class DBTest { return atoi(property.c_str()); } + uint64_t SizeAtLevel(int level) { + std::vector metadata; + db_->GetLiveFilesMetaData(&metadata); + uint64_t sum = 0; + for (const auto& m : metadata) { + if (m.level == level) { + sum += m.size; + } + } + return sum; + } + int TotalTableFiles(int cf = 0, int levels = -1) { if (levels == -1) { levels = CurrentOptions().num_levels; @@ -8527,6 +8539,102 @@ TEST(DBTest, DisableDataSyncTest) { } } +TEST(DBTest, DynamicCompactionOptions) { + const uint64_t k64KB = 1 << 16; + const uint64_t k128KB = 1 << 17; + const uint64_t k256KB = 1 << 18; + const uint64_t k5KB = 5 * 1024; + Options options; + options.env = env_; + options.create_if_missing = true; + options.compression = kNoCompression; + options.max_background_compactions = 4; + options.hard_rate_limit = 1.1; + options.write_buffer_size = k128KB; + options.max_write_buffer_number = 2; + // Compaction related options + options.level0_file_num_compaction_trigger = 3; + options.level0_slowdown_writes_trigger = 10; + options.level0_stop_writes_trigger = 20; + options.max_grandparent_overlap_factor = 10; + options.expanded_compaction_factor = 25; + options.source_compaction_factor = 1; + options.target_file_size_base = k128KB; + options.target_file_size_multiplier = 1; + options.max_bytes_for_level_base = k256KB; + options.max_bytes_for_level_multiplier = 4; + DestroyAndReopen(&options); + + auto gen_l0_kb = [this](int start, int size, int stride = 1) { + Random rnd(301); + std::vector values; + for (int i = 0; i < size; i++) { + values.push_back(RandomString(&rnd, 1024)); + ASSERT_OK(Put(Key(start + stride * i), values[i])); + } + dbfull()->TEST_WaitForFlushMemTable(); + }; + + // Write 3 files that have the same key range, trigger compaction and + // result in one L1 file + gen_l0_kb(0, 128); + ASSERT_EQ(NumTableFilesAtLevel(0), 1); + gen_l0_kb(0, 128); + ASSERT_EQ(NumTableFilesAtLevel(0), 2); + gen_l0_kb(0, 128); + dbfull()->TEST_WaitForCompact(); + ASSERT_EQ("0,1", FilesPerLevel()); + std::vector metadata; + db_->GetLiveFilesMetaData(&metadata); + ASSERT_EQ(1, metadata.size()); + ASSERT_LE(metadata[0].size, k128KB + k5KB); // < 128KB + 5KB + ASSERT_GE(metadata[0].size, k128KB - k5KB); // > 128B - 5KB + + // Make compaction trigger and file size smaller + ASSERT_TRUE(dbfull()->SetOptions({ + {"level0_file_num_compaction_trigger", "2"}, + {"target_file_size_base", "65536"} + })); + + gen_l0_kb(0, 128); + ASSERT_EQ("1,1", FilesPerLevel()); + gen_l0_kb(0, 128); + dbfull()->TEST_WaitForCompact(); + ASSERT_EQ("0,2", FilesPerLevel()); + metadata.clear(); + db_->GetLiveFilesMetaData(&metadata); + ASSERT_EQ(2, metadata.size()); + ASSERT_LE(metadata[0].size, k64KB + k5KB); // < 64KB + 5KB + ASSERT_GE(metadata[0].size, k64KB - k5KB); // > 64KB - 5KB + + // Change base level size to 1MB + ASSERT_TRUE(dbfull()->SetOptions({ {"max_bytes_for_level_base", "1048576"} })); + + // writing 56 x 128KB => 7MB + // (L1 + L2) = (1 + 4) * 1MB = 5MB + for (int i = 0; i < 56; ++i) { + gen_l0_kb(i, 128, 56); + } + dbfull()->TEST_WaitForCompact(); + ASSERT_TRUE(SizeAtLevel(1) < 1048576 * 1.1); + ASSERT_TRUE(SizeAtLevel(2) < 4 * 1048576 * 1.1); + + // Change multiplier to 2 with smaller base + ASSERT_TRUE(dbfull()->SetOptions({ + {"max_bytes_for_level_multiplier", "2"}, + {"max_bytes_for_level_base", "262144"} + })); + + // writing 16 x 128KB + // (L1 + L2 + L3) = (1 + 2 + 4) * 256KB + for (int i = 0; i < 16; ++i) { + gen_l0_kb(i, 128, 50); + } + dbfull()->TEST_WaitForCompact(); + ASSERT_TRUE(SizeAtLevel(1) < 262144 * 1.1); + ASSERT_TRUE(SizeAtLevel(2) < 2 * 262144 * 1.1); + ASSERT_TRUE(SizeAtLevel(3) < 4 * 262144 * 1.1); +} } // namespace rocksdb diff --git a/db/log_and_apply_bench.cc b/db/log_and_apply_bench.cc index 3a5535d2d..eba0a2787 100644 --- a/db/log_and_apply_bench.cc +++ b/db/log_and_apply_bench.cc @@ -60,7 +60,8 @@ void BM_LogAndApply(int iters, int num_base_files) { InternalKey limit(MakeKey(2 * fnum + 1), 1, kTypeDeletion); vbase.AddFile(2, ++fnum, 0, 1 /* file size */, start, limit, 1, 1); } - ASSERT_OK(vset->LogAndApply(default_cfd, &vbase, &mu)); + ASSERT_OK(vset->LogAndApply(default_cfd, + *default_cfd->GetLatestMutableCFOptions(), &vbase, &mu)); } for (int i = 0; i < iters; i++) { @@ -69,7 +70,8 @@ void BM_LogAndApply(int iters, int num_base_files) { InternalKey start(MakeKey(2 * fnum), 1, kTypeValue); InternalKey limit(MakeKey(2 * fnum + 1), 1, kTypeDeletion); vedit.AddFile(2, ++fnum, 0, 1 /* file size */, start, limit, 1, 1); - vset->LogAndApply(default_cfd, &vedit, &mu); + vset->LogAndApply(default_cfd, *default_cfd->GetLatestMutableCFOptions(), + &vedit, &mu); } delete vset; } diff --git a/db/memtable_list.cc b/db/memtable_list.cc index 728b1c0a0..bd48f1f47 100644 --- a/db/memtable_list.cc +++ b/db/memtable_list.cc @@ -160,7 +160,8 @@ void MemTableList::RollbackMemtableFlush(const autovector& mems, // Record a successful flush in the manifest file Status MemTableList::InstallMemtableFlushResults( - ColumnFamilyData* cfd, const autovector& mems, VersionSet* vset, + ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options, + const autovector& mems, VersionSet* vset, port::Mutex* mu, Logger* info_log, uint64_t file_number, FileNumToPathIdMap* pending_outputs, autovector* to_delete, Directory* db_directory, LogBuffer* log_buffer) { @@ -197,7 +198,7 @@ Status MemTableList::InstallMemtableFlushResults( cfd->GetName().c_str(), (unsigned long)m->file_number_); // this can release and reacquire the mutex. - s = vset->LogAndApply(cfd, &m->edit_, mu, db_directory); + s = vset->LogAndApply(cfd, mutable_cf_options, &m->edit_, mu, db_directory); // we will be changing the version in the next code path, // so we better create a new one, since versions are immutable diff --git a/db/memtable_list.h b/db/memtable_list.h index 92688825a..d93c7df92 100644 --- a/db/memtable_list.h +++ b/db/memtable_list.h @@ -113,7 +113,8 @@ class MemTableList { // Commit a successful flush in the manifest file Status InstallMemtableFlushResults( - ColumnFamilyData* cfd, const autovector& m, VersionSet* vset, + ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options, + const autovector& m, VersionSet* vset, port::Mutex* mu, Logger* info_log, uint64_t file_number, FileNumToPathIdMap* pending_outputs, autovector* to_delete, Directory* db_directory, LogBuffer* log_buffer); diff --git a/db/repair.cc b/db/repair.cc index 2773d4c71..80fb92bd9 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -220,7 +220,7 @@ class Repairer { Slice record; WriteBatch batch; MemTable* mem = new MemTable(icmp_, ioptions_, - MemTableOptions(MutableCFOptions(options_), options_)); + MemTableOptions(MutableCFOptions(options_, ioptions_), options_)); auto cf_mems_default = new ColumnFamilyMemTablesDefault(mem, &options_); mem->Ref(); int counter = 0; diff --git a/db/version_set.cc b/db/version_set.cc index 1d1d53813..a092277fa 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -672,7 +672,7 @@ Version::Version(ColumnFamilyData* cfd, VersionSet* vset, } } -void Version::Get(const ReadOptions& options, +void Version::Get(const ReadOptions& read_options, const LookupKey& k, std::string* value, Status* status, @@ -691,8 +691,8 @@ void Version::Get(const ReadOptions& options, &file_indexer_, user_comparator_, internal_comparator_); FdWithKeyRange* f = fp.GetNextFile(); while (f != nullptr) { - *status = table_cache_->Get(options, *internal_comparator_, f->fd, ikey, - &get_context); + *status = table_cache_->Get(read_options, *internal_comparator_, f->fd, + ikey, &get_context); // TODO: examine the behavior for corrupted key if (!status->ok()) { return; @@ -746,9 +746,10 @@ void Version::GenerateFileLevels() { } } -void Version::PrepareApply(std::vector& size_being_compacted) { +void Version::PrepareApply(const MutableCFOptions& mutable_cf_options, + std::vector& size_being_compacted) { UpdateTemporaryStats(); - ComputeCompactionScore(size_being_compacted); + ComputeCompactionScore(mutable_cf_options, size_being_compacted); UpdateFilesBySize(); UpdateNumNonEmptyLevels(); file_indexer_.UpdateIndex(&arena_, num_non_empty_levels_, files_); @@ -817,13 +818,13 @@ void Version::UpdateTemporaryStats() { } void Version::ComputeCompactionScore( + const MutableCFOptions& mutable_cf_options, std::vector& size_being_compacted) { double max_score = 0; int max_score_level = 0; int max_input_level = cfd_->compaction_picker()->MaxInputLevel(NumberLevels()); - for (int level = 0; level <= max_input_level; level++) { double score; if (level == 0) { @@ -849,21 +850,22 @@ void Version::ComputeCompactionScore( if (cfd_->ioptions()->compaction_style == kCompactionStyleFIFO) { score = static_cast(total_size) / cfd_->options()->compaction_options_fifo.max_table_files_size; - } else if (numfiles >= cfd_->options()->level0_stop_writes_trigger) { + } else if (numfiles >= mutable_cf_options.level0_stop_writes_trigger) { // If we are slowing down writes, then we better compact that first score = 1000000; - } else if (numfiles >= cfd_->options()->level0_slowdown_writes_trigger) { + } else if (numfiles >= + mutable_cf_options.level0_slowdown_writes_trigger) { score = 10000; } else { score = static_cast(numfiles) / - cfd_->options()->level0_file_num_compaction_trigger; + mutable_cf_options.level0_file_num_compaction_trigger; } } else { // Compute the ratio of current size to size limit. const uint64_t level_bytes = TotalCompensatedFileSize(files_[level]) - size_being_compacted[level]; score = static_cast(level_bytes) / - cfd_->compaction_picker()->MaxBytesForLevel(level); + mutable_cf_options.MaxBytesForLevel(level); if (max_score < score) { max_score = score; max_score_level = level; @@ -993,6 +995,7 @@ bool Version::OverlapInLevel(int level, } int Version::PickLevelForMemTableOutput( + const MutableCFOptions& mutable_cf_options, const Slice& smallest_user_key, const Slice& largest_user_key) { int level = 0; @@ -1013,7 +1016,7 @@ int Version::PickLevelForMemTableOutput( } GetOverlappingInputs(level + 2, &start, &limit, &overlaps); const uint64_t sum = TotalFileSize(overlaps); - if (sum > cfd_->compaction_picker()->MaxGrandParentOverlapBytes(level)) { + if (sum > mutable_cf_options.MaxGrandParentOverlapBytes(level)) { break; } level++; @@ -1246,7 +1249,7 @@ bool Version::HasOverlappingUserKey( return false; } -int64_t Version::NumLevelBytes(int level) const { +uint64_t Version::NumLevelBytes(int level) const { assert(level >= 0); assert(level < NumberLevels()); return TotalFileSize(files_[level]); @@ -1653,16 +1656,17 @@ void VersionSet::AppendVersion(ColumnFamilyData* column_family_data, } Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data, + const MutableCFOptions& mutable_cf_options, VersionEdit* edit, port::Mutex* mu, Directory* db_directory, bool new_descriptor_log, - const ColumnFamilyOptions* options) { + const ColumnFamilyOptions* new_cf_options) { mu->AssertHeld(); // column_family_data can be nullptr only if this is column_family_add. // in that case, we also need to specify ColumnFamilyOptions if (column_family_data == nullptr) { assert(edit->is_column_family_add_); - assert(options != nullptr); + assert(new_cf_options != nullptr); } // queue our request @@ -1777,7 +1781,7 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data, if (!edit->IsColumnFamilyManipulation()) { // This is cpu-heavy operations, which should be called outside mutex. - v->PrepareApply(size_being_compacted); + v->PrepareApply(mutable_cf_options, size_being_compacted); } // Write new record to MANIFEST log @@ -1853,8 +1857,8 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data, if (edit->is_column_family_add_) { // no group commit on column family add assert(batch_edits.size() == 1); - assert(options != nullptr); - CreateColumnFamily(*options, edit); + assert(new_cf_options != nullptr); + CreateColumnFamily(*new_cf_options, edit); } else if (edit->is_column_family_drop_) { assert(batch_edits.size() == 1); column_family_data->SetDropped(); @@ -2198,7 +2202,7 @@ Status VersionSet::Recover( // Install recovered version std::vector size_being_compacted(v->NumberLevels() - 1); cfd->compaction_picker()->SizeBeingCompacted(size_being_compacted); - v->PrepareApply(size_being_compacted); + v->PrepareApply(*cfd->GetLatestMutableCFOptions(), size_being_compacted); AppendVersion(cfd, v); } @@ -2374,11 +2378,13 @@ Status VersionSet::ReduceNumberOfLevels(const std::string& dbname, current_version->files_ = new_files_list; current_version->num_levels_ = new_levels; + MutableCFOptions mutable_cf_options(*options, ImmutableCFOptions(*options)); VersionEdit ve; port::Mutex dummy_mutex; MutexLock l(&dummy_mutex); - return versions.LogAndApply(versions.GetColumnFamilySet()->GetDefault(), &ve, - &dummy_mutex, nullptr, true); + return versions.LogAndApply( + versions.GetColumnFamilySet()->GetDefault(), + mutable_cf_options, &ve, &dummy_mutex, nullptr, true); } Status VersionSet::DumpManifest(Options& options, std::string& dscname, @@ -2530,7 +2536,7 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname, builder->SaveTo(v); std::vector size_being_compacted(v->NumberLevels() - 1); cfd->compaction_picker()->SizeBeingCompacted(size_being_compacted); - v->PrepareApply(size_being_compacted); + v->PrepareApply(*cfd->GetLatestMutableCFOptions(), size_being_compacted); delete builder; printf("--------------- Column family \"%s\" (ID %u) --------------\n", diff --git a/db/version_set.h b/db/version_set.h index 4a27a9592..05e6e9a65 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -103,14 +103,18 @@ class Version { // We use compaction scores to figure out which compaction to do next // REQUIRES: If Version is not yet saved to current_, it can be called without // a lock. Once a version is saved to current_, call only with mutex held - void ComputeCompactionScore(std::vector& size_being_compacted); + void ComputeCompactionScore( + const MutableCFOptions& mutable_cf_options, + std::vector& size_being_compacted); // Generate file_levels_ from files_ void GenerateFileLevels(); // Update scores, pre-calculated variables. It needs to be called before // applying the version to the version set. - void PrepareApply(std::vector& size_being_compacted); + void PrepareApply( + const MutableCFOptions& mutable_cf_options, + std::vector& size_being_compacted); // Reference count management (so Versions do not disappear out from // under live iterators) @@ -169,7 +173,8 @@ class Version { // Return the level at which we should place a new memtable compaction // result that covers the range [smallest_user_key,largest_user_key]. - int PickLevelForMemTableOutput(const Slice& smallest_user_key, + int PickLevelForMemTableOutput(const MutableCFOptions& mutable_cf_options, + const Slice& smallest_user_key, const Slice& largest_user_key); int NumberLevels() const { return num_levels_; } @@ -178,7 +183,7 @@ class Version { int NumLevelFiles(int level) const { return files_[level].size(); } // Return the combined file size of all files at the specified level. - int64_t NumLevelBytes(int level) const; + uint64_t NumLevelBytes(int level) const; // Return a human-readable short (single-line) summary of the number // of files per level. Uses *scratch as backing store. @@ -369,7 +374,9 @@ class VersionSet { // column_family_options has to be set if edit is column family add // REQUIRES: *mu is held on entry. // REQUIRES: no other thread concurrently calls LogAndApply() - Status LogAndApply(ColumnFamilyData* column_family_data, VersionEdit* edit, + Status LogAndApply(ColumnFamilyData* column_family_data, + const MutableCFOptions& mutable_cf_options, + VersionEdit* edit, port::Mutex* mu, Directory* db_directory = nullptr, bool new_descriptor_log = false, const ColumnFamilyOptions* column_family_options = diff --git a/db/write_batch_test.cc b/db/write_batch_test.cc index ba7451078..cb4048214 100644 --- a/db/write_batch_test.cc +++ b/db/write_batch_test.cc @@ -27,8 +27,9 @@ static std::string PrintContents(WriteBatch* b) { auto factory = std::make_shared(); Options options; options.memtable_factory = factory; - MemTable* mem = new MemTable(cmp, ImmutableCFOptions(options), - MemTableOptions(MutableCFOptions(options), options)); + ImmutableCFOptions ioptions(options); + MemTable* mem = new MemTable(cmp, ioptions, + MemTableOptions(MutableCFOptions(options, ioptions), options)); mem->Ref(); std::string state; ColumnFamilyMemTablesDefault cf_mems_default(mem, &options); diff --git a/include/rocksdb/immutable_options.h b/include/rocksdb/immutable_options.h index 54b676626..2dd50f756 100644 --- a/include/rocksdb/immutable_options.h +++ b/include/rocksdb/immutable_options.h @@ -22,6 +22,7 @@ struct ImmutableCFOptions { CompactionStyle compaction_style; CompactionOptionsUniversal compaction_options_universal; + CompactionOptionsFIFO compaction_options_fifo; const SliceTransform* prefix_extractor; @@ -79,6 +80,8 @@ struct ImmutableCFOptions { CompressionOptions compression_opts; Options::AccessHint access_hint_on_compaction_start; + + int num_levels; }; } // namespace rocksdb diff --git a/table/table_test.cc b/table/table_test.cc index df662ad88..e4657e8cd 100644 --- a/table/table_test.cc +++ b/table/table_test.cc @@ -437,8 +437,9 @@ class MemTableConstructor: public Constructor { table_factory_(new SkipListFactory) { Options options; options.memtable_factory = table_factory_; - memtable_ = new MemTable(internal_comparator_, ImmutableCFOptions(options), - MemTableOptions(MutableCFOptions(options), options)); + ImmutableCFOptions ioptions(options); + memtable_ = new MemTable(internal_comparator_, ioptions, + MemTableOptions(MutableCFOptions(options, ioptions), options)); memtable_->Ref(); } ~MemTableConstructor() { @@ -452,8 +453,9 @@ class MemTableConstructor: public Constructor { delete memtable_->Unref(); Options options; options.memtable_factory = table_factory_; - memtable_ = new MemTable(internal_comparator_, ImmutableCFOptions(options), - MemTableOptions(MutableCFOptions(options), options)); + ImmutableCFOptions mem_ioptions(options); + memtable_ = new MemTable(internal_comparator_, mem_ioptions, + MemTableOptions(MutableCFOptions(options, mem_ioptions), options)); memtable_->Ref(); int seq = 1; for (KVMap::const_iterator it = data.begin(); @@ -1864,8 +1866,9 @@ TEST(MemTableTest, Simple) { auto table_factory = std::make_shared(); Options options; options.memtable_factory = table_factory; - MemTable* memtable = new MemTable(cmp, ImmutableCFOptions(options), - MemTableOptions(MutableCFOptions(options), options)); + ImmutableCFOptions ioptions(options); + MemTable* memtable = new MemTable(cmp, ioptions, + MemTableOptions(MutableCFOptions(options, ioptions), options)); memtable->Ref(); WriteBatch batch; WriteBatchInternal::SetSequence(&batch, 100); diff --git a/util/mutable_cf_options.cc b/util/mutable_cf_options.cc new file mode 100644 index 000000000..1c710c656 --- /dev/null +++ b/util/mutable_cf_options.cc @@ -0,0 +1,72 @@ +// Copyright (c) 2014, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#include +#include +#include "rocksdb/options.h" +#include "rocksdb/immutable_options.h" +#include "util/mutable_cf_options.h" + +namespace rocksdb { + +namespace { +// Multiple two operands. If they overflow, return op1. +uint64_t MultiplyCheckOverflow(uint64_t op1, int op2) { + if (op1 == 0) { + return 0; + } + if (op2 <= 0) { + return op1; + } + uint64_t casted_op2 = (uint64_t) op2; + if (std::numeric_limits::max() / op1 < casted_op2) { + return op1; + } + return op1 * casted_op2; +} +} // anonymous namespace + +void MutableCFOptions::RefreshDerivedOptions( + const ImmutableCFOptions& ioptions) { + max_file_size.resize(ioptions.num_levels); + level_max_bytes.resize(ioptions.num_levels); + for (int i = 0; i < ioptions.num_levels; ++i) { + if (i == 0 && ioptions.compaction_style == kCompactionStyleUniversal) { + max_file_size[i] = ULLONG_MAX; + level_max_bytes[i] = max_bytes_for_level_base; + } else if (i > 1) { + max_file_size[i] = MultiplyCheckOverflow(max_file_size[i - 1], + target_file_size_multiplier); + level_max_bytes[i] = MultiplyCheckOverflow( + MultiplyCheckOverflow(level_max_bytes[i - 1], + max_bytes_for_level_multiplier), + max_bytes_for_level_multiplier_additional[i - 1]); + } else { + max_file_size[i] = target_file_size_base; + level_max_bytes[i] = max_bytes_for_level_base; + } + } +} + +uint64_t MutableCFOptions::MaxFileSizeForLevel(int level) const { + assert(level >= 0); + assert(level < (int)max_file_size.size()); + return max_file_size[level]; +} +uint64_t MutableCFOptions::MaxBytesForLevel(int level) const { + // Note: the result for level zero is not really used since we set + // the level-0 compaction threshold based on number of files. + assert(level >= 0); + assert(level < (int)level_max_bytes.size()); + return level_max_bytes[level]; +} +uint64_t MutableCFOptions::MaxGrandParentOverlapBytes(int level) const { + return MaxFileSizeForLevel(level) * max_grandparent_overlap_factor; +} +uint64_t MutableCFOptions::ExpandedCompactionByteSizeLimit(int level) const { + return MaxFileSizeForLevel(level) * expanded_compaction_factor; +} + +} // namespace rocksdb diff --git a/util/mutable_cf_options.h b/util/mutable_cf_options.h index 39ebe2d85..02f63fed4 100644 --- a/util/mutable_cf_options.h +++ b/util/mutable_cf_options.h @@ -5,12 +5,14 @@ #pragma once +#include #include "rocksdb/options.h" +#include "rocksdb/immutable_options.h" namespace rocksdb { struct MutableCFOptions { - explicit MutableCFOptions(const Options& options) + MutableCFOptions(const Options& options, const ImmutableCFOptions& ioptions) : write_buffer_size(options.write_buffer_size), arena_block_size(options.arena_block_size), memtable_prefix_bloom_bits(options.memtable_prefix_bloom_bits), @@ -18,7 +20,22 @@ struct MutableCFOptions { memtable_prefix_bloom_huge_page_tlb_size( options.memtable_prefix_bloom_huge_page_tlb_size), max_successive_merges(options.max_successive_merges), - filter_deletes(options.filter_deletes) { + filter_deletes(options.filter_deletes), + level0_file_num_compaction_trigger( + options.level0_file_num_compaction_trigger), + level0_slowdown_writes_trigger(options.level0_slowdown_writes_trigger), + level0_stop_writes_trigger(options.level0_stop_writes_trigger), + max_grandparent_overlap_factor(options.max_grandparent_overlap_factor), + expanded_compaction_factor(options.expanded_compaction_factor), + source_compaction_factor(options.source_compaction_factor), + target_file_size_base(options.target_file_size_base), + target_file_size_multiplier(options.target_file_size_multiplier), + max_bytes_for_level_base(options.max_bytes_for_level_base), + max_bytes_for_level_multiplier(options.max_bytes_for_level_multiplier), + max_bytes_for_level_multiplier_additional( + options.max_bytes_for_level_multiplier_additional) + { + RefreshDerivedOptions(ioptions); } MutableCFOptions() : write_buffer_size(0), @@ -27,8 +44,33 @@ struct MutableCFOptions { memtable_prefix_bloom_probes(0), memtable_prefix_bloom_huge_page_tlb_size(0), max_successive_merges(0), - filter_deletes(false) {} + filter_deletes(false), + level0_file_num_compaction_trigger(0), + level0_slowdown_writes_trigger(0), + level0_stop_writes_trigger(0), + max_grandparent_overlap_factor(0), + expanded_compaction_factor(0), + source_compaction_factor(0), + target_file_size_base(0), + target_file_size_multiplier(0), + max_bytes_for_level_base(0), + max_bytes_for_level_multiplier(0) + {} + // Must be called after any change to MutableCFOptions + void RefreshDerivedOptions(const ImmutableCFOptions& ioptions); + + // Get the max file size in a given level. + uint64_t MaxFileSizeForLevel(int level) const; + // Returns maximum total bytes of data on a given level. + uint64_t MaxBytesForLevel(int level) const; + // Returns maximum total overlap bytes with grandparent + // level (i.e., level+2) before we stop building a single + // file in level->level+1 compaction. + uint64_t MaxGrandParentOverlapBytes(int level) const; + uint64_t ExpandedCompactionByteSizeLimit(int level) const; + + // Memtable related options size_t write_buffer_size; size_t arena_block_size; uint32_t memtable_prefix_bloom_bits; @@ -36,6 +78,25 @@ struct MutableCFOptions { size_t memtable_prefix_bloom_huge_page_tlb_size; size_t max_successive_merges; bool filter_deletes; + + // Compaction related options + int level0_file_num_compaction_trigger; + int level0_slowdown_writes_trigger; + int level0_stop_writes_trigger; + int max_grandparent_overlap_factor; + int expanded_compaction_factor; + int source_compaction_factor; + int target_file_size_base; + int target_file_size_multiplier; + uint64_t max_bytes_for_level_base; + int max_bytes_for_level_multiplier; + std::vector max_bytes_for_level_multiplier_additional; + + // Derived options + // Per-level target file size. + std::vector max_file_size; + // Per-level max bytes + std::vector level_max_bytes; }; } // namespace rocksdb diff --git a/util/options.cc b/util/options.cc index 8716b465d..b5dc98317 100644 --- a/util/options.cc +++ b/util/options.cc @@ -35,6 +35,7 @@ namespace rocksdb { ImmutableCFOptions::ImmutableCFOptions(const Options& options) : compaction_style(options.compaction_style), compaction_options_universal(options.compaction_options_universal), + compaction_options_fifo(options.compaction_options_fifo), prefix_extractor(options.prefix_extractor.get()), comparator(options.comparator), merge_operator(options.merge_operator.get()), @@ -60,7 +61,8 @@ ImmutableCFOptions::ImmutableCFOptions(const Options& options) compression(options.compression), compression_per_level(options.compression_per_level), compression_opts(options.compression_opts), - access_hint_on_compaction_start(options.access_hint_on_compaction_start) {} + access_hint_on_compaction_start(options.access_hint_on_compaction_start), + num_levels(options.num_levels) {} ColumnFamilyOptions::ColumnFamilyOptions() : comparator(BytewiseComparator()), diff --git a/util/options_helper.cc b/util/options_helper.cc index 35c3f63df..2a61c8b69 100644 --- a/util/options_helper.cc +++ b/util/options_helper.cc @@ -4,6 +4,7 @@ // of patent rights can be found in the PATENTS file in the same directory. #include +#include #include "rocksdb/options.h" #include "util/options_helper.h" @@ -73,8 +74,8 @@ CompactionStyle ParseCompactionStyle(const std::string& type) { } // anonymouse namespace template -bool ParseMemtableOption(const std::string& name, const std::string& value, - OptionsType* new_options) { +bool ParseMemtableOptions(const std::string& name, const std::string& value, + OptionsType* new_options) { if (name == "write_buffer_size") { new_options->write_buffer_size = ParseInt64(value); } else if (name == "arena_block_size") { @@ -96,6 +97,50 @@ bool ParseMemtableOption(const std::string& name, const std::string& value, return true; } +template +bool ParseCompactionOptions(const std::string& name, const std::string& value, + OptionsType* new_options) { + if (name == "level0_file_num_compaction_trigger") { + new_options->level0_file_num_compaction_trigger = ParseInt(value); + } else if (name == "level0_slowdown_writes_trigger") { + new_options->level0_slowdown_writes_trigger = ParseInt(value); + } else if (name == "level0_stop_writes_trigger") { + new_options->level0_stop_writes_trigger = ParseInt(value); + } else if (name == "max_grandparent_overlap_factor") { + new_options->max_grandparent_overlap_factor = ParseInt(value); + } else if (name == "expanded_compaction_factor") { + new_options->expanded_compaction_factor = ParseInt(value); + } else if (name == "source_compaction_factor") { + new_options->source_compaction_factor = ParseInt(value); + } else if (name == "target_file_size_base") { + new_options->target_file_size_base = ParseInt(value); + } else if (name == "target_file_size_multiplier") { + new_options->target_file_size_multiplier = ParseInt(value); + } else if (name == "max_bytes_for_level_base") { + new_options->max_bytes_for_level_base = ParseUint64(value); + } else if (name == "max_bytes_for_level_multiplier") { + new_options->max_bytes_for_level_multiplier = ParseInt(value); + } else if (name == "max_bytes_for_level_multiplier_additional") { + new_options->max_bytes_for_level_multiplier_additional.clear(); + size_t start = 0; + while (true) { + size_t end = value.find_first_of(':', start); + if (end == std::string::npos) { + new_options->max_bytes_for_level_multiplier_additional.push_back( + ParseInt(value.substr(start))); + break; + } else { + new_options->max_bytes_for_level_multiplier_additional.push_back( + ParseInt(value.substr(start, end - start))); + start = end + 1; + } + } + } else { + return false; + } + return true; +} + bool GetMutableOptionsFromStrings( const MutableCFOptions& base_options, const std::unordered_map& options_map, @@ -104,7 +149,8 @@ bool GetMutableOptionsFromStrings( *new_options = base_options; try { for (const auto& o : options_map) { - if (ParseMemtableOption(o.first, o.second, new_options)) { + if (ParseMemtableOptions(o.first, o.second, new_options)) { + } else if (ParseCompactionOptions(o.first, o.second, new_options)) { } else { return false; } @@ -123,7 +169,8 @@ bool GetOptionsFromStrings( *new_options = base_options; for (const auto& o : options_map) { try { - if (ParseMemtableOption(o.first, o.second, new_options)) { + if (ParseMemtableOptions(o.first, o.second, new_options)) { + } else if (ParseCompactionOptions(o.first, o.second, new_options)) { } else if (o.first == "max_write_buffer_number") { new_options->max_write_buffer_number = ParseInt(o.second); } else if (o.first == "min_write_buffer_number_to_merge") { @@ -168,43 +215,8 @@ bool GetOptionsFromStrings( ParseInt(o.second.substr(start, o.second.size() - start)); } else if (o.first == "num_levels") { new_options->num_levels = ParseInt(o.second); - } else if (o.first == "level0_file_num_compaction_trigger") { - new_options->level0_file_num_compaction_trigger = ParseInt(o.second); - } else if (o.first == "level0_slowdown_writes_trigger") { - new_options->level0_slowdown_writes_trigger = ParseInt(o.second); - } else if (o.first == "level0_stop_writes_trigger") { - new_options->level0_stop_writes_trigger = ParseInt(o.second); } else if (o.first == "max_mem_compaction_level") { new_options->max_mem_compaction_level = ParseInt(o.second); - } else if (o.first == "target_file_size_base") { - new_options->target_file_size_base = ParseUint64(o.second); - } else if (o.first == "target_file_size_multiplier") { - new_options->target_file_size_multiplier = ParseInt(o.second); - } else if (o.first == "max_bytes_for_level_base") { - new_options->max_bytes_for_level_base = ParseUint64(o.second); - } else if (o.first == "max_bytes_for_level_multiplier") { - new_options->max_bytes_for_level_multiplier = ParseInt(o.second); - } else if (o.first == "max_bytes_for_level_multiplier_additional") { - new_options->max_bytes_for_level_multiplier_additional.clear(); - size_t start = 0; - while (true) { - size_t end = o.second.find_first_of(':', start); - if (end == std::string::npos) { - new_options->max_bytes_for_level_multiplier_additional.push_back( - ParseInt(o.second.substr(start))); - break; - } else { - new_options->max_bytes_for_level_multiplier_additional.push_back( - ParseInt(o.second.substr(start, end - start))); - start = end + 1; - } - } - } else if (o.first == "expanded_compaction_factor") { - new_options->expanded_compaction_factor = ParseInt(o.second); - } else if (o.first == "source_compaction_factor") { - new_options->source_compaction_factor = ParseInt(o.second); - } else if (o.first == "max_grandparent_overlap_factor") { - new_options->max_grandparent_overlap_factor = ParseInt(o.second); } else if (o.first == "soft_rate_limit") { new_options->soft_rate_limit = ParseDouble(o.second); } else if (o.first == "hard_rate_limit") {