make compaction related options changeable

Summary:
make compaction related options changeable. Most of changes are tedious,
following the same convention: grabs MutableCFOptions at the beginning
of compaction under mutex, then pass it throughout the job and register
it in SuperVersion at the end.

Test Plan: make all check

Reviewers: igor, yhchiang, sdong

Reviewed By: sdong

Subscribers: leveldb

Differential Revision: https://reviews.facebook.net/D23349
main
Lei Jin 10 years ago
parent d122e7bcf4
commit 5ec53f3edf
  1. 75
      db/column_family.cc
  2. 16
      db/column_family.h
  3. 7
      db/compaction.cc
  4. 8
      db/compaction.h
  5. 238
      db/compaction_picker.cc
  6. 107
      db/compaction_picker.h
  7. 159
      db/db_impl.cc
  8. 30
      db/db_impl.h
  9. 108
      db/db_test.cc
  10. 6
      db/log_and_apply_bench.cc
  11. 5
      db/memtable_list.cc
  12. 3
      db/memtable_list.h
  13. 2
      db/repair.cc
  14. 48
      db/version_set.cc
  15. 17
      db/version_set.h
  16. 5
      db/write_batch_test.cc
  17. 3
      include/rocksdb/immutable_options.h
  18. 15
      table/table_test.cc
  19. 72
      util/mutable_cf_options.cc
  20. 67
      util/mutable_cf_options.h
  21. 4
      util/options.cc
  22. 90
      util/options_helper.cc

@ -230,7 +230,7 @@ ColumnFamilyData::ColumnFamilyData(uint32_t id, const std::string& name,
internal_comparator_(cf_options.comparator),
options_(*db_options, SanitizeOptions(&internal_comparator_, cf_options)),
ioptions_(options_),
mutable_cf_options_(options_),
mutable_cf_options_(options_, ioptions_),
mem_(nullptr),
imm_(options_.min_write_buffer_number_to_merge),
super_version_(nullptr),
@ -245,27 +245,27 @@ ColumnFamilyData::ColumnFamilyData(uint32_t id, const std::string& name,
// if dummy_versions is nullptr, then this is a dummy column family.
if (dummy_versions != nullptr) {
internal_stats_.reset(
new InternalStats(options_.num_levels, db_options->env, this));
new InternalStats(ioptions_.num_levels, db_options->env, this));
table_cache_.reset(new TableCache(ioptions_, env_options, table_cache));
if (options_.compaction_style == kCompactionStyleUniversal) {
if (ioptions_.compaction_style == kCompactionStyleUniversal) {
compaction_picker_.reset(
new UniversalCompactionPicker(&options_, &internal_comparator_));
} else if (options_.compaction_style == kCompactionStyleLevel) {
new UniversalCompactionPicker(ioptions_, &internal_comparator_));
} else if (ioptions_.compaction_style == kCompactionStyleLevel) {
compaction_picker_.reset(
new LevelCompactionPicker(&options_, &internal_comparator_));
new LevelCompactionPicker(ioptions_, &internal_comparator_));
} else {
assert(options_.compaction_style == kCompactionStyleFIFO);
assert(ioptions_.compaction_style == kCompactionStyleFIFO);
compaction_picker_.reset(
new FIFOCompactionPicker(&options_, &internal_comparator_));
new FIFOCompactionPicker(ioptions_, &internal_comparator_));
}
Log(options_.info_log, "Options for column family \"%s\":\n",
Log(ioptions_.info_log, "Options for column family \"%s\":\n",
name.c_str());
const ColumnFamilyOptions* cf_options = &options_;
cf_options->Dump(options_.info_log.get());
cf_options->Dump(ioptions_.info_log);
}
RecalculateWriteStallConditions();
RecalculateWriteStallConditions(mutable_cf_options_);
}
// DB mutex held
@ -318,7 +318,8 @@ ColumnFamilyData::~ColumnFamilyData() {
}
}
void ColumnFamilyData::RecalculateWriteStallConditions() {
void ColumnFamilyData::RecalculateWriteStallConditions(
const MutableCFOptions& mutable_cf_options) {
if (current_ != nullptr) {
const double score = current_->MaxCompactionScore();
const int max_level = current_->MaxCompactionScoreLevel();
@ -328,26 +329,27 @@ void ColumnFamilyData::RecalculateWriteStallConditions() {
if (imm()->size() == options_.max_write_buffer_number) {
write_controller_token_ = write_controller->GetStopToken();
internal_stats_->AddCFStats(InternalStats::MEMTABLE_COMPACTION, 1);
Log(options_.info_log,
Log(ioptions_.info_log,
"[%s] Stopping writes because we have %d immutable memtables "
"(waiting for flush)",
name_.c_str(), imm()->size());
} else if (current_->NumLevelFiles(0) >=
options_.level0_stop_writes_trigger) {
mutable_cf_options.level0_stop_writes_trigger) {
write_controller_token_ = write_controller->GetStopToken();
internal_stats_->AddCFStats(InternalStats::LEVEL0_NUM_FILES, 1);
Log(options_.info_log,
Log(ioptions_.info_log,
"[%s] Stopping writes because we have %d level-0 files",
name_.c_str(), current_->NumLevelFiles(0));
} else if (options_.level0_slowdown_writes_trigger >= 0 &&
} else if (mutable_cf_options.level0_slowdown_writes_trigger >= 0 &&
current_->NumLevelFiles(0) >=
options_.level0_slowdown_writes_trigger) {
mutable_cf_options.level0_slowdown_writes_trigger) {
uint64_t slowdown = SlowdownAmount(
current_->NumLevelFiles(0), options_.level0_slowdown_writes_trigger,
options_.level0_stop_writes_trigger);
current_->NumLevelFiles(0),
mutable_cf_options.level0_slowdown_writes_trigger,
mutable_cf_options.level0_stop_writes_trigger);
write_controller_token_ = write_controller->GetDelayToken(slowdown);
internal_stats_->AddCFStats(InternalStats::LEVEL0_SLOWDOWN, slowdown);
Log(options_.info_log,
Log(ioptions_.info_log,
"[%s] Stalling writes because we have %d level-0 files (%" PRIu64
"us)",
name_.c_str(), current_->NumLevelFiles(0), slowdown);
@ -358,7 +360,7 @@ void ColumnFamilyData::RecalculateWriteStallConditions() {
write_controller->GetDelayToken(kHardLimitSlowdown);
internal_stats_->RecordLevelNSlowdown(max_level, kHardLimitSlowdown,
false);
Log(options_.info_log,
Log(ioptions_.info_log,
"[%s] Stalling writes because we hit hard limit on level %d. "
"(%" PRIu64 "us)",
name_.c_str(), max_level, kHardLimitSlowdown);
@ -368,7 +370,7 @@ void ColumnFamilyData::RecalculateWriteStallConditions() {
options_.hard_rate_limit);
write_controller_token_ = write_controller->GetDelayToken(slowdown);
internal_stats_->RecordLevelNSlowdown(max_level, slowdown, true);
Log(options_.info_log,
Log(ioptions_.info_log,
"[%s] Stalling writes because we hit soft limit on level %d (%" PRIu64
"us)",
name_.c_str(), max_level, slowdown);
@ -393,19 +395,21 @@ void ColumnFamilyData::CreateNewMemtable(const MemTableOptions& moptions) {
mem_->Ref();
}
Compaction* ColumnFamilyData::PickCompaction(LogBuffer* log_buffer) {
auto result = compaction_picker_->PickCompaction(current_, log_buffer);
Compaction* ColumnFamilyData::PickCompaction(
const MutableCFOptions& mutable_options, LogBuffer* log_buffer) {
auto result = compaction_picker_->PickCompaction(
mutable_options, current_, log_buffer);
return result;
}
Compaction* ColumnFamilyData::CompactRange(int input_level, int output_level,
uint32_t output_path_id,
const InternalKey* begin,
const InternalKey* end,
InternalKey** compaction_end) {
return compaction_picker_->CompactRange(current_, input_level, output_level,
output_path_id, begin, end,
compaction_end);
Compaction* ColumnFamilyData::CompactRange(
const MutableCFOptions& mutable_cf_options,
int input_level, int output_level, uint32_t output_path_id,
const InternalKey* begin, const InternalKey* end,
InternalKey** compaction_end) {
return compaction_picker_->CompactRange(
mutable_cf_options, current_, input_level, output_level,
output_path_id, begin, end, compaction_end);
}
SuperVersion* ColumnFamilyData::GetReferencedSuperVersion(
@ -443,11 +447,11 @@ SuperVersion* ColumnFamilyData::GetThreadLocalSuperVersion(
sv = static_cast<SuperVersion*>(ptr);
if (sv == SuperVersion::kSVObsolete ||
sv->version_number != super_version_number_.load()) {
RecordTick(options_.statistics.get(), NUMBER_SUPERVERSION_ACQUIRES);
RecordTick(ioptions_.statistics, NUMBER_SUPERVERSION_ACQUIRES);
SuperVersion* sv_to_delete = nullptr;
if (sv && sv->Unref()) {
RecordTick(options_.statistics.get(), NUMBER_SUPERVERSION_CLEANUPS);
RecordTick(ioptions_.statistics, NUMBER_SUPERVERSION_CLEANUPS);
db_mutex->Lock();
// NOTE: underlying resources held by superversion (sst files) might
// not be released until the next background job.
@ -502,7 +506,7 @@ SuperVersion* ColumnFamilyData::InstallSuperVersion(
// Reset SuperVersions cached in thread local storage
ResetThreadLocalSuperVersions();
RecalculateWriteStallConditions();
RecalculateWriteStallConditions(mutable_cf_options);
if (old_superversion != nullptr && old_superversion->Unref()) {
old_superversion->Cleanup();
@ -533,6 +537,7 @@ bool ColumnFamilyData::SetOptions(
if (GetMutableOptionsFromStrings(mutable_cf_options_, options_map,
&new_mutable_cf_options)) {
mutable_cf_options_ = new_mutable_cf_options;
mutable_cf_options_.RefreshDerivedOptions(ioptions_);
return true;
}
return false;

@ -203,11 +203,14 @@ class ColumnFamilyData {
TableCache* table_cache() const { return table_cache_.get(); }
// See documentation in compaction_picker.h
Compaction* PickCompaction(LogBuffer* log_buffer);
Compaction* CompactRange(int input_level, int output_level,
uint32_t output_path_id, const InternalKey* begin,
const InternalKey* end,
InternalKey** compaction_end);
// REQUIRES: DB mutex held
Compaction* PickCompaction(const MutableCFOptions& mutable_options,
LogBuffer* log_buffer);
Compaction* CompactRange(
const MutableCFOptions& mutable_cf_options,
int input_level, int output_level, uint32_t output_path_id,
const InternalKey* begin, const InternalKey* end,
InternalKey** compaction_end);
CompactionPicker* compaction_picker() { return compaction_picker_.get(); }
// thread-safe
@ -260,7 +263,8 @@ class ColumnFamilyData {
// recalculation of compaction score. These values are used in
// DBImpl::MakeRoomForWrite function to decide, if it need to make
// a write stall
void RecalculateWriteStallConditions();
void RecalculateWriteStallConditions(
const MutableCFOptions& mutable_cf_options);
uint32_t id_;
const std::string name_;

@ -56,7 +56,6 @@ Compaction::Compaction(Version* input_version, int start_level, int out_level,
is_full_compaction_(false),
is_manual_compaction_(false),
level_ptrs_(std::vector<size_t>(number_levels_)) {
cfd_->Ref();
input_version_->Ref();
edit_ = new VersionEdit();
@ -267,12 +266,12 @@ void Compaction::Summary(char* output, int len) {
snprintf(output + write, len - write, "]");
}
uint64_t Compaction::OutputFilePreallocationSize() {
uint64_t Compaction::OutputFilePreallocationSize(
const MutableCFOptions& mutable_options) {
uint64_t preallocation_size = 0;
if (cfd_->ioptions()->compaction_style == kCompactionStyleLevel) {
preallocation_size =
cfd_->compaction_picker()->MaxFileSizeForLevel(output_level());
preallocation_size = mutable_options.MaxFileSizeForLevel(output_level());
} else {
for (int level = 0; level < num_input_levels(); ++level) {
for (const auto& f : inputs_[level].files) {

@ -10,6 +10,7 @@
#pragma once
#include "util/arena.h"
#include "util/autovector.h"
#include "util/mutable_cf_options.h"
#include "db/version_set.h"
namespace rocksdb {
@ -151,10 +152,14 @@ class Compaction {
// Was this compaction triggered manually by the client?
bool IsManualCompaction() { return is_manual_compaction_; }
// Return the MutableCFOptions that should be used throughout the compaction
// procedure
const MutableCFOptions* mutable_cf_options() { return &mutable_cf_options_; }
// Returns the size in bytes that the output file should be preallocated to.
// In level compaction, that is max_file_size_. In universal compaction, that
// is the sum of all input file sizes.
uint64_t OutputFilePreallocationSize();
uint64_t OutputFilePreallocationSize(const MutableCFOptions& mutable_options);
private:
friend class CompactionPicker;
@ -171,6 +176,7 @@ class Compaction {
const int output_level_; // levels to which output files are stored
uint64_t max_output_file_size_;
uint64_t max_grandparent_overlap_bytes_;
MutableCFOptions mutable_cf_options_;
Version* input_version_;
VersionEdit* edit_;
int number_levels_;

@ -35,70 +35,36 @@ namespace {
// If enable_compression is false, then compression is always disabled no
// matter what the values of the other two parameters are.
// Otherwise, the compression type is determined based on options and level.
CompressionType GetCompressionType(const Options& options, int level,
const bool enable_compression = true) {
CompressionType GetCompressionType(
const ImmutableCFOptions& ioptions, int level,
const bool enable_compression = true) {
if (!enable_compression) {
// disable compression
return kNoCompression;
}
// If the use has specified a different compression level for each level,
// then pick the compression for that level.
if (!options.compression_per_level.empty()) {
const int n = options.compression_per_level.size() - 1;
if (!ioptions.compression_per_level.empty()) {
const int n = ioptions.compression_per_level.size() - 1;
// It is possible for level_ to be -1; in that case, we use level
// 0's compression. This occurs mostly in backwards compatibility
// situations when the builder doesn't know what level the file
// belongs to. Likewise, if level is beyond the end of the
// specified compression levels, use the last value.
return options.compression_per_level[std::max(0, std::min(level, n))];
return ioptions.compression_per_level[std::max(0, std::min(level, n))];
} else {
return options.compression;
return ioptions.compression;
}
}
// Multiple two operands. If they overflow, return op1.
uint64_t MultiplyCheckOverflow(uint64_t op1, int op2) {
if (op1 == 0) {
return 0;
}
if (op2 <= 0) {
return op1;
}
uint64_t casted_op2 = (uint64_t) op2;
if (std::numeric_limits<uint64_t>::max() / op1 < casted_op2) {
return op1;
}
return op1 * casted_op2;
}
} // anonymous namespace
CompactionPicker::CompactionPicker(const Options* options,
CompactionPicker::CompactionPicker(const ImmutableCFOptions& ioptions,
const InternalKeyComparator* icmp)
: compactions_in_progress_(options->num_levels),
options_(options),
num_levels_(options->num_levels),
: ioptions_(ioptions),
compactions_in_progress_(ioptions_.num_levels),
icmp_(icmp) {
max_file_size_.reset(new uint64_t[NumberLevels()]);
level_max_bytes_.reset(new uint64_t[NumberLevels()]);
int target_file_size_multiplier = options_->target_file_size_multiplier;
int max_bytes_multiplier = options_->max_bytes_for_level_multiplier;
for (int i = 0; i < NumberLevels(); i++) {
if (i == 0 && options_->compaction_style == kCompactionStyleUniversal) {
max_file_size_[i] = ULLONG_MAX;
level_max_bytes_[i] = options_->max_bytes_for_level_base;
} else if (i > 1) {
max_file_size_[i] = MultiplyCheckOverflow(max_file_size_[i - 1],
target_file_size_multiplier);
level_max_bytes_[i] = MultiplyCheckOverflow(
MultiplyCheckOverflow(level_max_bytes_[i - 1], max_bytes_multiplier),
options_->max_bytes_for_level_multiplier_additional[i - 1]);
} else {
max_file_size_[i] = options_->target_file_size_base;
level_max_bytes_[i] = options_->max_bytes_for_level_base;
}
}
}
CompactionPicker::~CompactionPicker() {}
@ -126,26 +92,6 @@ void CompactionPicker::ReleaseCompactionFiles(Compaction* c, Status status) {
}
}
uint64_t CompactionPicker::MaxFileSizeForLevel(int level) const {
assert(level >= 0);
assert(level < NumberLevels());
return max_file_size_[level];
}
uint64_t CompactionPicker::MaxGrandParentOverlapBytes(int level) {
uint64_t result = MaxFileSizeForLevel(level);
result *= options_->max_grandparent_overlap_factor;
return result;
}
double CompactionPicker::MaxBytesForLevel(int level) {
// Note: the result for level zero is not really used since we set
// the level-0 compaction threshold based on number of files.
assert(level >= 0);
assert(level < NumberLevels());
return level_max_bytes_[level];
}
void CompactionPicker::GetRange(const std::vector<FileMetaData*>& inputs,
InternalKey* smallest, InternalKey* largest) {
assert(!inputs.empty());
@ -214,7 +160,7 @@ bool CompactionPicker::ExpandWhileOverlapping(Compaction* c) {
// compaction, then we must drop/cancel this compaction.
int parent_index = -1;
if (c->inputs_[0].empty()) {
Log(options_->info_log,
Log(ioptions_.info_log,
"[%s] ExpandWhileOverlapping() failure because zero input files",
c->column_family_data()->GetName().c_str());
}
@ -229,12 +175,6 @@ bool CompactionPicker::ExpandWhileOverlapping(Compaction* c) {
return true;
}
uint64_t CompactionPicker::ExpandedCompactionByteSizeLimit(int level) {
uint64_t result = MaxFileSizeForLevel(level);
result *= options_->expanded_compaction_factor;
return result;
}
// Returns true if any one of specified files are being compacted
bool CompactionPicker::FilesInCompaction(std::vector<FileMetaData*>& files) {
for (unsigned int i = 0; i < files.size(); i++) {
@ -262,7 +202,8 @@ bool CompactionPicker::ParentRangeInCompaction(Version* version,
// Will also attempt to expand "level" if that doesn't expand "level+1"
// or cause "level" to include a file for compaction that has an overlapping
// user-key with another file.
void CompactionPicker::SetupOtherInputs(Compaction* c) {
void CompactionPicker::SetupOtherInputs(
const MutableCFOptions& mutable_cf_options, Compaction* c) {
// If inputs are empty, then there is nothing to expand.
// If both input and output levels are the same, no need to consider
// files at level "level+1"
@ -298,7 +239,7 @@ void CompactionPicker::SetupOtherInputs(Compaction* c) {
const uint64_t inputs0_size = TotalCompensatedFileSize(c->inputs_[0].files);
const uint64_t inputs1_size = TotalCompensatedFileSize(c->inputs_[1].files);
const uint64_t expanded0_size = TotalCompensatedFileSize(expanded0);
uint64_t limit = ExpandedCompactionByteSizeLimit(level);
uint64_t limit = mutable_cf_options.ExpandedCompactionByteSizeLimit(level);
if (expanded0.size() > c->inputs_[0].size() &&
inputs1_size + expanded0_size < limit &&
!FilesInCompaction(expanded0) &&
@ -311,7 +252,7 @@ void CompactionPicker::SetupOtherInputs(Compaction* c) {
&c->parent_index_);
if (expanded1.size() == c->inputs_[1].size() &&
!FilesInCompaction(expanded1)) {
Log(options_->info_log,
Log(ioptions_.info_log,
"[%s] Expanding@%d %zu+%zu (%" PRIu64 "+%" PRIu64
" bytes) to %zu+%zu (%" PRIu64 "+%" PRIu64 "bytes)\n",
c->column_family_data()->GetName().c_str(), level,
@ -336,21 +277,20 @@ void CompactionPicker::SetupOtherInputs(Compaction* c) {
}
}
Compaction* CompactionPicker::CompactRange(Version* version, int input_level,
int output_level,
uint32_t output_path_id,
const InternalKey* begin,
const InternalKey* end,
InternalKey** compaction_end) {
Compaction* CompactionPicker::CompactRange(
const MutableCFOptions& mutable_cf_options, Version* version,
int input_level, int output_level, uint32_t output_path_id,
const InternalKey* begin, const InternalKey* end,
InternalKey** compaction_end) {
// CompactionPickerFIFO has its own implementation of compact range
assert(options_->compaction_style != kCompactionStyleFIFO);
assert(ioptions_.compaction_style != kCompactionStyleFIFO);
std::vector<FileMetaData*> inputs;
bool covering_the_whole_range = true;
// All files are 'overlapping' in universal style compaction.
// We have to compact the entire range in one shot.
if (options_->compaction_style == kCompactionStyleUniversal) {
if (ioptions_.compaction_style == kCompactionStyleUniversal) {
begin = nullptr;
end = nullptr;
}
@ -364,8 +304,8 @@ Compaction* CompactionPicker::CompactRange(Version* version, int input_level,
// and we must not pick one file and drop another older file if the
// two files overlap.
if (input_level > 0) {
const uint64_t limit =
MaxFileSizeForLevel(input_level) * options_->source_compaction_factor;
const uint64_t limit = mutable_cf_options.MaxFileSizeForLevel(input_level) *
mutable_cf_options.source_compaction_factor;
uint64_t total = 0;
for (size_t i = 0; i + 1 < inputs.size(); ++i) {
uint64_t s = inputs[i]->compensated_file_size;
@ -378,22 +318,24 @@ Compaction* CompactionPicker::CompactRange(Version* version, int input_level,
}
}
}
assert(output_path_id < static_cast<uint32_t>(options_->db_paths.size()));
assert(output_path_id < static_cast<uint32_t>(ioptions_.db_paths.size()));
Compaction* c = new Compaction(
version, input_level, output_level, MaxFileSizeForLevel(output_level),
MaxGrandParentOverlapBytes(input_level), output_path_id,
GetCompressionType(*options_, output_level));
version, input_level, output_level,
mutable_cf_options.MaxFileSizeForLevel(output_level),
mutable_cf_options.MaxGrandParentOverlapBytes(input_level),
output_path_id,
GetCompressionType(ioptions_, output_level));
c->inputs_[0].files = inputs;
if (ExpandWhileOverlapping(c) == false) {
delete c;
Log(options_->info_log,
Log(ioptions_.info_log,
"[%s] Could not compact due to expansion failure.\n",
version->cfd_->GetName().c_str());
return nullptr;
}
SetupOtherInputs(c);
SetupOtherInputs(mutable_cf_options, c);
if (covering_the_whole_range) {
*compaction_end = nullptr;
@ -408,12 +350,14 @@ Compaction* CompactionPicker::CompactRange(Version* version, int input_level,
c->SetupBottomMostLevel(true);
c->is_manual_compaction_ = true;
c->mutable_cf_options_ = mutable_cf_options;
return c;
}
Compaction* LevelCompactionPicker::PickCompaction(Version* version,
LogBuffer* log_buffer) {
Compaction* LevelCompactionPicker::PickCompaction(
const MutableCFOptions& mutable_cf_options,
Version* version, LogBuffer* log_buffer) {
Compaction* c = nullptr;
int level = -1;
@ -421,7 +365,7 @@ Compaction* LevelCompactionPicker::PickCompaction(Version* version,
// and also in LogAndApply(), otherwise the values could be stale.
std::vector<uint64_t> size_being_compacted(NumberLevels() - 1);
SizeBeingCompacted(size_being_compacted);
version->ComputeCompactionScore(size_being_compacted);
version->ComputeCompactionScore(mutable_cf_options, size_being_compacted);
// We prefer compactions triggered by too much data in a level over
// the compactions triggered by seeks.
@ -432,7 +376,8 @@ Compaction* LevelCompactionPicker::PickCompaction(Version* version,
version->compaction_score_[i] <= version->compaction_score_[i - 1]);
level = version->compaction_level_[i];
if ((version->compaction_score_[i] >= 1)) {
c = PickCompactionBySize(version, level, version->compaction_score_[i]);
c = PickCompactionBySize(mutable_cf_options, version, level,
version->compaction_score_[i]);
if (c == nullptr || ExpandWhileOverlapping(c) == false) {
delete c;
c = nullptr;
@ -472,7 +417,7 @@ Compaction* LevelCompactionPicker::PickCompaction(Version* version,
}
// Setup "level+1" files (inputs_[1])
SetupOtherInputs(c);
SetupOtherInputs(mutable_cf_options, c);
// mark all the files that are being compacted
c->MarkFilesBeingCompacted(true);
@ -483,12 +428,13 @@ Compaction* LevelCompactionPicker::PickCompaction(Version* version,
// remember this currently undergoing compaction
compactions_in_progress_[level].insert(c);
c->mutable_cf_options_ = mutable_cf_options;
return c;
}
Compaction* LevelCompactionPicker::PickCompactionBySize(Version* version,
int level,
double score) {
Compaction* LevelCompactionPicker::PickCompactionBySize(
const MutableCFOptions& mutable_cf_options,
Version* version, int level, double score) {
Compaction* c = nullptr;
// level 0 files are overlapping. So we cannot pick more
@ -501,9 +447,10 @@ Compaction* LevelCompactionPicker::PickCompactionBySize(Version* version,
assert(level >= 0);
assert(level + 1 < NumberLevels());
c = new Compaction(version, level, level + 1, MaxFileSizeForLevel(level + 1),
MaxGrandParentOverlapBytes(level), 0,
GetCompressionType(*options_, level + 1));
c = new Compaction(version, level, level + 1,
mutable_cf_options.MaxFileSizeForLevel(level + 1),
mutable_cf_options.MaxGrandParentOverlapBytes(level), 0,
GetCompressionType(ioptions_, level + 1));
c->score_ = score;
// Pick the largest file in this level that is not already
@ -563,13 +510,14 @@ Compaction* LevelCompactionPicker::PickCompactionBySize(Version* version,
// Universal style of compaction. Pick files that are contiguous in
// time-range to compact.
//
Compaction* UniversalCompactionPicker::PickCompaction(Version* version,
LogBuffer* log_buffer) {
Compaction* UniversalCompactionPicker::PickCompaction(
const MutableCFOptions& mutable_cf_options,
Version* version, LogBuffer* log_buffer) {
int level = 0;
double score = version->compaction_score_[0];
if ((version->files_[level].size() <
(unsigned int)options_->level0_file_num_compaction_trigger)) {
(unsigned int)mutable_cf_options.level0_file_num_compaction_trigger)) {
LogToBuffer(log_buffer, "[%s] Universal: nothing to do\n",
version->cfd_->GetName().c_str());
return nullptr;
@ -581,17 +529,18 @@ Compaction* UniversalCompactionPicker::PickCompaction(Version* version,
// Check for size amplification first.
Compaction* c;
if ((c = PickCompactionUniversalSizeAmp(version, score, log_buffer)) !=
nullptr) {
if ((c = PickCompactionUniversalSizeAmp(
mutable_cf_options, version, score, log_buffer)) != nullptr) {
LogToBuffer(log_buffer, "[%s] Universal: compacting for size amp\n",
version->cfd_->GetName().c_str());
} else {
// Size amplification is within limits. Try reducing read
// amplification while maintaining file size ratios.
unsigned int ratio = options_->compaction_options_universal.size_ratio;
unsigned int ratio = ioptions_.compaction_options_universal.size_ratio;
if ((c = PickCompactionUniversalReadAmp(version, score, ratio, UINT_MAX,
log_buffer)) != nullptr) {
if ((c = PickCompactionUniversalReadAmp(
mutable_cf_options, version, score, ratio,
UINT_MAX, log_buffer)) != nullptr) {
LogToBuffer(log_buffer, "[%s] Universal: compacting for size ratio\n",
version->cfd_->GetName().c_str());
} else {
@ -600,9 +549,10 @@ Compaction* UniversalCompactionPicker::PickCompaction(Version* version,
// compaction without looking at filesize ratios and try to reduce
// the number of files to fewer than level0_file_num_compaction_trigger.
unsigned int num_files = version->files_[level].size() -
options_->level0_file_num_compaction_trigger;
mutable_cf_options.level0_file_num_compaction_trigger;
if ((c = PickCompactionUniversalReadAmp(
version, score, UINT_MAX, num_files, log_buffer)) != nullptr) {
mutable_cf_options, version, score, UINT_MAX,
num_files, log_buffer)) != nullptr) {
LogToBuffer(log_buffer, "[%s] Universal: compacting for file num\n",
version->cfd_->GetName().c_str());
}
@ -628,7 +578,7 @@ Compaction* UniversalCompactionPicker::PickCompaction(Version* version,
c->bottommost_level_ = c->inputs_[0].files.back() == last_file;
// update statistics
MeasureTime(options_->statistics.get(),
MeasureTime(ioptions_.statistics,
NUM_FILES_IN_SINGLE_COMPACTION, c->inputs_[0].size());
// mark all the files that are being compacted
@ -642,11 +592,12 @@ Compaction* UniversalCompactionPicker::PickCompaction(Version* version,
c->is_full_compaction_ =
(c->inputs_[0].size() == c->input_version_->files_[0].size());
c->mutable_cf_options_ = mutable_cf_options;
return c;
}
uint32_t UniversalCompactionPicker::GetPathId(const Options& options,
uint64_t file_size) {
uint32_t UniversalCompactionPicker::GetPathId(
const ImmutableCFOptions& ioptions, uint64_t file_size) {
// Two conditions need to be satisfied:
// (1) the target path needs to be able to hold the file's size
// (2) Total size left in this and previous paths need to be not
@ -662,11 +613,11 @@ uint32_t UniversalCompactionPicker::GetPathId(const Options& options,
// considered in this algorithm. So the target size can be violated in
// that case. We need to improve it.
uint64_t accumulated_size = 0;
uint64_t future_size =
file_size * (100 - options.compaction_options_universal.size_ratio) / 100;
uint64_t future_size = file_size *
(100 - ioptions.compaction_options_universal.size_ratio) / 100;
uint32_t p = 0;
for (; p < options.db_paths.size() - 1; p++) {
uint64_t target_size = options.db_paths[p].target_size;
for (; p < ioptions.db_paths.size() - 1; p++) {
uint64_t target_size = ioptions.db_paths[p].target_size;
if (target_size > file_size &&
accumulated_size + (target_size - file_size) > future_size) {
return p;
@ -681,14 +632,15 @@ uint32_t UniversalCompactionPicker::GetPathId(const Options& options,
// the next file in time order.
//
Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp(
Version* version, double score, unsigned int ratio,
const MutableCFOptions& mutable_cf_options, Version* version,
double score, unsigned int ratio,
unsigned int max_number_of_files_to_compact, LogBuffer* log_buffer) {
int level = 0;
unsigned int min_merge_width =
options_->compaction_options_universal.min_merge_width;
ioptions_.compaction_options_universal.min_merge_width;
unsigned int max_merge_width =
options_->compaction_options_universal.max_merge_width;
ioptions_.compaction_options_universal.max_merge_width;
// The files are sorted from newest first to oldest last.
const auto& files = version->files_[level];
@ -750,7 +702,8 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp(
if (sz < static_cast<double>(f->fd.GetFileSize())) {
break;
}
if (options_->compaction_options_universal.stop_style == kCompactionStopStyleSimilarSize) {
if (ioptions_.compaction_options_universal.stop_style ==
kCompactionStopStyleSimilarSize) {
// Similar-size stopping rule: also check the last picked file isn't
// far larger than the next candidate file.
sz = (f->fd.GetFileSize() * (100.0 + ratio)) / 100.0;
@ -794,7 +747,7 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp(
// size ratio of compression.
bool enable_compression = true;
int ratio_to_compress =
options_->compaction_options_universal.compression_size_percent;
ioptions_.compaction_options_universal.compression_size_percent;
if (ratio_to_compress >= 0) {
uint64_t total_size = version->NumLevelBytes(level);
uint64_t older_file_size = 0;
@ -812,11 +765,12 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp(
for (unsigned int i = 0; i < first_index_after; i++) {
estimated_total_size += files[i]->fd.GetFileSize();
}
uint32_t path_id = GetPathId(*options_, estimated_total_size);
uint32_t path_id = GetPathId(ioptions_, estimated_total_size);
Compaction* c = new Compaction(
version, level, level, MaxFileSizeForLevel(level), LLONG_MAX, path_id,
GetCompressionType(*options_, level, enable_compression));
version, level, level, mutable_cf_options.MaxFileSizeForLevel(level),
LLONG_MAX, path_id, GetCompressionType(ioptions_, level,
enable_compression));
c->score_ = score;
for (unsigned int i = start_index; i < first_index_after; i++) {
@ -841,11 +795,12 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp(
// min_merge_width and max_merge_width).
//
Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp(
Version* version, double score, LogBuffer* log_buffer) {
const MutableCFOptions& mutable_cf_options, Version* version,
double score, LogBuffer* log_buffer) {
int level = 0;
// percentage flexibilty while reducing size amplification
uint64_t ratio = options_->compaction_options_universal.
uint64_t ratio = ioptions_.compaction_options_universal.
max_size_amplification_percent;
// The files are sorted from newest first to oldest last.
@ -927,13 +882,14 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp(
for (unsigned int loop = start_index; loop < files.size(); loop++) {
estimated_total_size += files[loop]->fd.GetFileSize();
}
uint32_t path_id = GetPathId(*options_, estimated_total_size);
uint32_t path_id = GetPathId(ioptions_, estimated_total_size);
// create a compaction request
// We always compact all the files, so always compress.
Compaction* c =
new Compaction(version, level, level, MaxFileSizeForLevel(level),
LLONG_MAX, path_id, GetCompressionType(*options_, level));
new Compaction(version, level, level,
mutable_cf_options.MaxFileSizeForLevel(level),
LLONG_MAX, path_id, GetCompressionType(ioptions_, level));
c->score_ = score;
for (unsigned int loop = start_index; loop < files.size(); loop++) {
f = c->input_version_->files_[level][loop];
@ -948,22 +904,23 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp(
return c;
}
Compaction* FIFOCompactionPicker::PickCompaction(Version* version,
LogBuffer* log_buffer) {
Compaction* FIFOCompactionPicker::PickCompaction(
const MutableCFOptions& mutable_cf_options,
Version* version, LogBuffer* log_buffer) {
assert(version->NumberLevels() == 1);
uint64_t total_size = 0;
for (const auto& file : version->files_[0]) {
total_size += file->compensated_file_size;
}
if (total_size <= options_->compaction_options_fifo.max_table_files_size ||
if (total_size <= ioptions_.compaction_options_fifo.max_table_files_size ||
version->files_[0].size() == 0) {
// total size not exceeded
LogToBuffer(log_buffer,
"[%s] FIFO compaction: nothing to do. Total size %" PRIu64
", max size %" PRIu64 "\n",
version->cfd_->GetName().c_str(), total_size,
options_->compaction_options_fifo.max_table_files_size);
ioptions_.compaction_options_fifo.max_table_files_size);
return nullptr;
}
@ -988,28 +945,29 @@ Compaction* FIFOCompactionPicker::PickCompaction(Version* version,
LogToBuffer(log_buffer, "[%s] FIFO compaction: picking file %" PRIu64
" with size %s for deletion",
version->cfd_->GetName().c_str(), f->fd.GetNumber(), tmp_fsize);
if (total_size <= options_->compaction_options_fifo.max_table_files_size) {
if (total_size <= ioptions_.compaction_options_fifo.max_table_files_size) {
break;
}
}
c->MarkFilesBeingCompacted(true);
compactions_in_progress_[0].insert(c);
c->mutable_cf_options_ = mutable_cf_options;
return c;
}
Compaction* FIFOCompactionPicker::CompactRange(
const MutableCFOptions& mutable_cf_options,
Version* version, int input_level, int output_level,
uint32_t output_path_id, const InternalKey* begin, const InternalKey* end,
InternalKey** compaction_end) {
assert(input_level == 0);
assert(output_level == 0);
*compaction_end = nullptr;
LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, options_->info_log.get());
Compaction* c = PickCompaction(version, &log_buffer);
LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, ioptions_.info_log);
Compaction* c = PickCompaction(mutable_cf_options, version, &log_buffer);
if (c != nullptr) {
assert(output_path_id < static_cast<uint32_t>(options_->db_paths.size()));
assert(output_path_id < static_cast<uint32_t>(ioptions_.db_paths.size()));
c->output_path_id_ = output_path_id;
}
log_buffer.FlushBufferToLog();

@ -13,6 +13,7 @@
#include "rocksdb/status.h"
#include "rocksdb/options.h"
#include "rocksdb/env.h"
#include "util/mutable_cf_options.h"
#include <vector>
#include <memory>
@ -26,15 +27,17 @@ class Version;
class CompactionPicker {
public:
CompactionPicker(const Options* options, const InternalKeyComparator* icmp);
CompactionPicker(const ImmutableCFOptions& ioptions,
const InternalKeyComparator* icmp);
virtual ~CompactionPicker();
// Pick level and inputs for a new compaction.
// Returns nullptr if there is no compaction to be done.
// Otherwise returns a pointer to a heap-allocated object that
// describes the compaction. Caller should delete the result.
virtual Compaction* PickCompaction(Version* version,
LogBuffer* log_buffer) = 0;
virtual Compaction* PickCompaction(
const MutableCFOptions& mutable_cf_options,
Version* version, LogBuffer* log_buffer) = 0;
// Return a compaction object for compacting the range [begin,end] in
// the specified level. Returns nullptr if there is nothing in that
@ -47,11 +50,11 @@ class CompactionPicker {
// compaction_end will be set to nullptr.
// Client is responsible for compaction_end storage -- when called,
// *compaction_end should point to valid InternalKey!
virtual Compaction* CompactRange(Version* version, int input_level,
int output_level, uint32_t output_path_id,
const InternalKey* begin,
const InternalKey* end,
InternalKey** compaction_end);
virtual Compaction* CompactRange(
const MutableCFOptions& mutable_cf_options, Version* version,
int input_level, int output_level, uint32_t output_path_id,
const InternalKey* begin, const InternalKey* end,
InternalKey** compaction_end);
// Given the current number of levels, returns the lowest allowed level
// for compaction input.
@ -64,19 +67,8 @@ class CompactionPicker {
// compactions per level
void SizeBeingCompacted(std::vector<uint64_t>& sizes);
// Returns maximum total overlap bytes with grandparent
// level (i.e., level+2) before we stop building a single
// file in level->level+1 compaction.
uint64_t MaxGrandParentOverlapBytes(int level);
// Returns maximum total bytes of data on a given level.
double MaxBytesForLevel(int level);
// Get the max file size in a given level.
uint64_t MaxFileSizeForLevel(int level) const;
protected:
int NumberLevels() const { return num_levels_; }
int NumberLevels() const { return ioptions_.num_levels; }
// Stores the minimal range that covers all entries in inputs in
// *smallest, *largest.
@ -103,8 +95,6 @@ class CompactionPicker {
// Will return false if it is impossible to apply this compaction.
bool ExpandWhileOverlapping(Compaction* c);
uint64_t ExpandedCompactionByteSizeLimit(int level);
// Returns true if any one of the specified files are being compacted
bool FilesInCompaction(std::vector<FileMetaData*>& files);
@ -113,32 +103,30 @@ class CompactionPicker {
const InternalKey* largest, int level,
int* index);
void SetupOtherInputs(Compaction* c);
void SetupOtherInputs(const MutableCFOptions& mutable_cf_options,
Compaction* c);
const ImmutableCFOptions& ioptions_;
// record all the ongoing compactions for all levels
std::vector<std::set<Compaction*>> compactions_in_progress_;
// Per-level target file size.
std::unique_ptr<uint64_t[]> max_file_size_;
// Per-level max bytes
std::unique_ptr<uint64_t[]> level_max_bytes_;
const Options* const options_;
private:
int num_levels_;
const InternalKeyComparator* const icmp_;
int max_grandparent_overlap_factor_;
int expanded_compaction_factor_;
};
class UniversalCompactionPicker : public CompactionPicker {
public:
UniversalCompactionPicker(const Options* options,
UniversalCompactionPicker(const ImmutableCFOptions& ioptions,
const InternalKeyComparator* icmp)
: CompactionPicker(options, icmp) {}
virtual Compaction* PickCompaction(Version* version,
LogBuffer* log_buffer) override;
: CompactionPicker(ioptions, icmp) {}
virtual Compaction* PickCompaction(
const MutableCFOptions& mutable_cf_options,
Version* version, LogBuffer* log_buffer) override;
// The maxinum allowed input level. Always return 0.
virtual int MaxInputLevel(int current_num_levels) const override {
@ -147,27 +135,30 @@ class UniversalCompactionPicker : public CompactionPicker {
private:
// Pick Universal compaction to limit read amplification
Compaction* PickCompactionUniversalReadAmp(Version* version, double score,
unsigned int ratio,
unsigned int num_files,
LogBuffer* log_buffer);
Compaction* PickCompactionUniversalReadAmp(
const MutableCFOptions& mutable_cf_options,
Version* version, double score, unsigned int ratio,
unsigned int num_files, LogBuffer* log_buffer);
// Pick Universal compaction to limit space amplification.
Compaction* PickCompactionUniversalSizeAmp(Version* version, double score,
LogBuffer* log_buffer);
Compaction* PickCompactionUniversalSizeAmp(
const MutableCFOptions& mutable_cf_options,
Version* version, double score, LogBuffer* log_buffer);
// Pick a path ID to place a newly generated file, with its estimated file
// size.
static uint32_t GetPathId(const Options& options, uint64_t file_size);
static uint32_t GetPathId(const ImmutableCFOptions& ioptions,
uint64_t file_size);
};
class LevelCompactionPicker : public CompactionPicker {
public:
LevelCompactionPicker(const Options* options,
LevelCompactionPicker(const ImmutableCFOptions& ioptions,
const InternalKeyComparator* icmp)
: CompactionPicker(options, icmp) {}
virtual Compaction* PickCompaction(Version* version,
LogBuffer* log_buffer) override;
: CompactionPicker(ioptions, icmp) {}
virtual Compaction* PickCompaction(
const MutableCFOptions& mutable_cf_options,
Version* version, LogBuffer* log_buffer) override;
// Returns current_num_levels - 2, meaning the last level cannot be
// compaction input level.
@ -180,23 +171,25 @@ class LevelCompactionPicker : public CompactionPicker {
// Returns nullptr if there is no compaction to be done.
// If level is 0 and there is already a compaction on that level, this
// function will return nullptr.
Compaction* PickCompactionBySize(Version* version, int level, double score);
Compaction* PickCompactionBySize(const MutableCFOptions& mutable_cf_options,
Version* version, int level, double score);
};
class FIFOCompactionPicker : public CompactionPicker {
public:
FIFOCompactionPicker(const Options* options,
FIFOCompactionPicker(const ImmutableCFOptions& ioptions,
const InternalKeyComparator* icmp)
: CompactionPicker(options, icmp) {}
: CompactionPicker(ioptions, icmp) {}
virtual Compaction* PickCompaction(Version* version,
LogBuffer* log_buffer) override;
virtual Compaction* PickCompaction(
const MutableCFOptions& mutable_cf_options,
Version* version, LogBuffer* log_buffer) override;
virtual Compaction* CompactRange(Version* version, int input_level,
int output_level, uint32_t output_path_id,
const InternalKey* begin,
const InternalKey* end,
InternalKey** compaction_end) override;
virtual Compaction* CompactRange(
const MutableCFOptions& mutable_cf_options, Version* version,
int input_level, int output_level, uint32_t output_path_id,
const InternalKey* begin, const InternalKey* end,
InternalKey** compaction_end) override;
// The maxinum allowed input level. Always return 0.
virtual int MaxInputLevel(int current_num_levels) const override {

@ -1410,7 +1410,8 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
// VersionSet::next_file_number_ always to be strictly greater than any
// log number
versions_->MarkFileNumberUsed(max_log_number + 1);
status = versions_->LogAndApply(cfd, edit, &mutex_);
status = versions_->LogAndApply(
cfd, *cfd->GetLatestMutableCFOptions(), edit, &mutex_);
if (!status.ok()) {
// Recovery failed
break;
@ -1479,8 +1480,9 @@ Status DBImpl::WriteLevel0TableForRecovery(ColumnFamilyData* cfd, MemTable* mem,
}
Status DBImpl::WriteLevel0Table(ColumnFamilyData* cfd,
autovector<MemTable*>& mems, VersionEdit* edit,
uint64_t* filenumber, LogBuffer* log_buffer) {
const MutableCFOptions& mutable_cf_options,
const autovector<MemTable*>& mems,
VersionEdit* edit, uint64_t* filenumber, LogBuffer* log_buffer) {
mutex_.AssertHeld();
const uint64_t start_micros = env_->NowMicros();
FileMetaData meta;
@ -1560,7 +1562,8 @@ Status DBImpl::WriteLevel0Table(ColumnFamilyData* cfd,
if (base != nullptr && db_options_.max_background_compactions <= 1 &&
db_options_.max_background_flushes == 0 &&
cfd->ioptions()->compaction_style == kCompactionStyleLevel) {
level = base->PickLevelForMemTableOutput(min_user_key, max_user_key);
level = base->PickLevelForMemTableOutput(
mutable_cf_options, min_user_key, max_user_key);
}
edit->AddFile(level, meta.fd.GetNumber(), meta.fd.GetPathId(),
meta.fd.GetFileSize(), meta.smallest, meta.largest,
@ -1577,10 +1580,9 @@ Status DBImpl::WriteLevel0Table(ColumnFamilyData* cfd,
return s;
}
Status DBImpl::FlushMemTableToOutputFile(ColumnFamilyData* cfd,
bool* madeProgress,
DeletionState& deletion_state,
LogBuffer* log_buffer) {
Status DBImpl::FlushMemTableToOutputFile(
ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options,
bool* madeProgress, DeletionState& deletion_state, LogBuffer* log_buffer) {
mutex_.AssertHeld();
assert(cfd->imm()->size() != 0);
assert(cfd->imm()->IsFlushPending());
@ -1607,8 +1609,10 @@ Status DBImpl::FlushMemTableToOutputFile(ColumnFamilyData* cfd,
edit->SetLogNumber(mems.back()->GetNextLogNumber());
edit->SetColumnFamily(cfd->GetID());
// This will release and re-acquire the mutex.
Status s = WriteLevel0Table(cfd, mems, edit, &file_number, log_buffer);
Status s = WriteLevel0Table(cfd, mutable_cf_options, mems, edit,
&file_number, log_buffer);
if (s.ok() && shutting_down_.Acquire_Load() && cfd->IsDropped()) {
s = Status::ShutdownInProgress(
@ -1620,14 +1624,13 @@ Status DBImpl::FlushMemTableToOutputFile(ColumnFamilyData* cfd,
} else {
// Replace immutable memtable with the generated Table
s = cfd->imm()->InstallMemtableFlushResults(
cfd, mems, versions_.get(), &mutex_, db_options_.info_log.get(),
file_number, &pending_outputs_, &deletion_state.memtables_to_free,
db_directory_.get(), log_buffer);
cfd, mutable_cf_options, mems, versions_.get(), &mutex_,
db_options_.info_log.get(), file_number, &pending_outputs_,
&deletion_state.memtables_to_free, db_directory_.get(), log_buffer);
}
if (s.ok()) {
// Use latest MutableCFOptions
InstallSuperVersion(cfd, deletion_state);
InstallSuperVersion(cfd, deletion_state, mutable_cf_options);
if (madeProgress) {
*madeProgress = 1;
}
@ -1726,7 +1729,8 @@ bool DBImpl::SetOptions(ColumnFamilyHandle* column_family,
}
// return the same level if it cannot be moved
int DBImpl::FindMinimumEmptyLevelFitting(ColumnFamilyData* cfd, int level) {
int DBImpl::FindMinimumEmptyLevelFitting(ColumnFamilyData* cfd,
const MutableCFOptions& mutable_cf_options, int level) {
mutex_.AssertHeld();
Version* current = cfd->current();
int minimum_level = level;
@ -1734,7 +1738,7 @@ int DBImpl::FindMinimumEmptyLevelFitting(ColumnFamilyData* cfd, int level) {
// stop if level i is not empty
if (current->NumLevelFiles(i) > 0) break;
// stop if level i is too small (cannot fit the level files)
if (cfd->compaction_picker()->MaxBytesForLevel(i) <
if (mutable_cf_options.MaxBytesForLevel(i) <
current->NumLevelBytes(level)) {
break;
}
@ -1770,10 +1774,12 @@ Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) {
bg_cv_.Wait();
}
const MutableCFOptions mutable_cf_options =
*cfd->GetLatestMutableCFOptions();
// move to a smaller level
int to_level = target_level;
if (target_level < 0) {
to_level = FindMinimumEmptyLevelFitting(cfd, level);
to_level = FindMinimumEmptyLevelFitting(cfd, mutable_cf_options, level);
}
assert(to_level <= level);
@ -1794,9 +1800,10 @@ Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) {
Log(db_options_.info_log, "[%s] Apply version edit:\n%s",
cfd->GetName().c_str(), edit.DebugString().data());
status = versions_->LogAndApply(cfd, &edit, &mutex_, db_directory_.get());
// Use latest MutableCFOptions
superversion_to_free = cfd->InstallSuperVersion(new_superversion, &mutex_);
status = versions_->LogAndApply(cfd,
mutable_cf_options, &edit, &mutex_, db_directory_.get());
superversion_to_free = cfd->InstallSuperVersion(
new_superversion, &mutex_, mutable_cf_options);
new_superversion = nullptr;
Log(db_options_.info_log, "[%s] LogAndApply: %s\n", cfd->GetName().c_str(),
@ -2058,6 +2065,8 @@ Status DBImpl::BackgroundFlush(bool* madeProgress,
for (auto cfd : *versions_->GetColumnFamilySet()) {
cfd->Ref();
Status flush_status;
const MutableCFOptions mutable_cf_options =
*cfd->GetLatestMutableCFOptions();
while (flush_status.ok() && cfd->imm()->IsFlushPending()) {
LogToBuffer(
log_buffer,
@ -2065,8 +2074,8 @@ Status DBImpl::BackgroundFlush(bool* madeProgress,
"family [%s], flush slots available %d",
cfd->GetName().c_str(),
db_options_.max_background_flushes - bg_flush_scheduled_);
flush_status = FlushMemTableToOutputFile(cfd, madeProgress,
deletion_state, log_buffer);
flush_status = FlushMemTableToOutputFile(
cfd, mutable_cf_options, madeProgress, deletion_state, log_buffer);
}
if (call_status.ok() && !flush_status.ok()) {
call_status = flush_status;
@ -2259,6 +2268,8 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
// FLUSH preempts compaction
Status flush_stat;
for (auto cfd : *versions_->GetColumnFamilySet()) {
const MutableCFOptions mutable_cf_options =
*cfd->GetLatestMutableCFOptions();
while (cfd->imm()->IsFlushPending()) {
LogToBuffer(
log_buffer,
@ -2266,8 +2277,8 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
"compaction slots available %d",
db_options_.max_background_compactions - bg_compaction_scheduled_);
cfd->Ref();
flush_stat = FlushMemTableToOutputFile(cfd, madeProgress, deletion_state,
log_buffer);
flush_stat = FlushMemTableToOutputFile(
cfd, mutable_cf_options, madeProgress, deletion_state, log_buffer);
cfd->Unref();
if (!flush_stat.ok()) {
if (is_manual) {
@ -2281,15 +2292,18 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
}
}
// Compaction makes a copy of the latest MutableCFOptions. It should be used
// throughout the compaction procedure to make sure consistency. It will
// eventually be installed into SuperVersion
unique_ptr<Compaction> c;
InternalKey manual_end_storage;
InternalKey* manual_end = &manual_end_storage;
if (is_manual) {
ManualCompaction* m = manual_compaction_;
assert(m->in_progress);
c.reset(m->cfd->CompactRange(m->input_level, m->output_level,
m->output_path_id, m->begin, m->end,
&manual_end));
c.reset(m->cfd->CompactRange(
*m->cfd->GetLatestMutableCFOptions(), m->input_level, m->output_level,
m->output_path_id, m->begin, m->end, &manual_end));
if (!c) {
m->done = true;
}
@ -2306,7 +2320,11 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
// no need to refcount in iteration since it's always under a mutex
for (auto cfd : *versions_->GetColumnFamilySet()) {
if (!cfd->options()->disable_auto_compactions) {
c.reset(cfd->PickCompaction(log_buffer));
// NOTE: try to avoid unnecessary copy of MutableCFOptions if
// compaction is not necessary. Need to make sure mutex is held
// until we make a copy in the following code
c.reset(cfd->PickCompaction(
*cfd->GetLatestMutableCFOptions(), log_buffer));
if (c != nullptr) {
// update statistics
MeasureTime(stats_, NUM_FILES_IN_SINGLE_COMPACTION,
@ -2331,10 +2349,11 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
for (const auto& f : *c->inputs(0)) {
c->edit()->DeleteFile(c->level(), f->fd.GetNumber());
}
status = versions_->LogAndApply(c->column_family_data(), c->edit(), &mutex_,
db_directory_.get());
// Use latest MutableCFOptions
InstallSuperVersion(c->column_family_data(), deletion_state);
status = versions_->LogAndApply(
c->column_family_data(), *c->mutable_cf_options(), c->edit(),
&mutex_, db_directory_.get());
InstallSuperVersion(c->column_family_data(), deletion_state,
*c->mutable_cf_options());
LogToBuffer(log_buffer, "[%s] Deleted %d files\n",
c->column_family_data()->GetName().c_str(),
c->num_input_files(0));
@ -2348,10 +2367,12 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
c->edit()->AddFile(c->level() + 1, f->fd.GetNumber(), f->fd.GetPathId(),
f->fd.GetFileSize(), f->smallest, f->largest,
f->smallest_seqno, f->largest_seqno);
status = versions_->LogAndApply(c->column_family_data(), c->edit(), &mutex_,
db_directory_.get());
status = versions_->LogAndApply(c->column_family_data(),
*c->mutable_cf_options(),
c->edit(), &mutex_, db_directory_.get());
// Use latest MutableCFOptions
InstallSuperVersion(c->column_family_data(), deletion_state);
InstallSuperVersion(c->column_family_data(), deletion_state,
*c->mutable_cf_options());
Version::LevelSummaryStorage tmp;
LogToBuffer(
@ -2366,7 +2387,8 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
} else {
MaybeScheduleFlushOrCompaction(); // do more compaction work in parallel.
CompactionState* compact = new CompactionState(c.get());
status = DoCompactionWork(compact, deletion_state, log_buffer);
status = DoCompactionWork(compact, *c->mutable_cf_options(),
deletion_state, log_buffer);
CleanupCompaction(compact, status);
c->ReleaseCompactionFiles(status);
c->ReleaseInputs();
@ -2468,7 +2490,8 @@ void DBImpl::ReleaseCompactionUnusedFileNumbers(CompactionState* compact) {
}
}
Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) {
Status DBImpl::OpenCompactionOutputFile(
CompactionState* compact, const MutableCFOptions& mutable_cf_options) {
assert(compact != nullptr);
assert(compact->builder == nullptr);
uint64_t file_number;
@ -2500,7 +2523,7 @@ Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) {
if (s.ok()) {
compact->outfile->SetIOPriority(Env::IO_LOW);
compact->outfile->SetPreallocationBlockSize(
compact->compaction->OutputFilePreallocationSize());
compact->compaction->OutputFilePreallocationSize(mutable_cf_options));
ColumnFamilyData* cfd = compact->compaction->column_family_data();
compact->builder.reset(NewTableBuilder(
@ -2570,7 +2593,7 @@ Status DBImpl::FinishCompactionOutputFile(CompactionState* compact,
Status DBImpl::InstallCompactionResults(CompactionState* compact,
LogBuffer* log_buffer) {
const MutableCFOptions& mutable_cf_options, LogBuffer* log_buffer) {
mutex_.AssertHeld();
// paranoia: verify that the files that we started with
@ -2604,6 +2627,7 @@ Status DBImpl::InstallCompactionResults(CompactionState* compact,
out.smallest_seqno, out.largest_seqno);
}
return versions_->LogAndApply(compact->compaction->column_family_data(),
mutable_cf_options,
compact->compaction->edit(), &mutex_,
db_directory_.get());
}
@ -2635,8 +2659,8 @@ inline SequenceNumber DBImpl::findEarliestVisibleSnapshot(
}
uint64_t DBImpl::CallFlushDuringCompaction(ColumnFamilyData* cfd,
DeletionState& deletion_state,
LogBuffer* log_buffer) {
const MutableCFOptions& mutable_cf_options, DeletionState& deletion_state,
LogBuffer* log_buffer) {
if (db_options_.max_background_flushes > 0) {
// flush thread will take care of this
return 0;
@ -2646,7 +2670,8 @@ uint64_t DBImpl::CallFlushDuringCompaction(ColumnFamilyData* cfd,
mutex_.Lock();
if (cfd->imm()->IsFlushPending()) {
cfd->Ref();
FlushMemTableToOutputFile(cfd, nullptr, deletion_state, log_buffer);
FlushMemTableToOutputFile(cfd, mutable_cf_options, nullptr,
deletion_state, log_buffer);
cfd->Unref();
bg_cv_.SignalAll(); // Wakeup DelayWrite() if necessary
}
@ -2658,6 +2683,7 @@ uint64_t DBImpl::CallFlushDuringCompaction(ColumnFamilyData* cfd,
}
Status DBImpl::ProcessKeyValueCompaction(
const MutableCFOptions& mutable_cf_options,
bool is_snapshot_supported,
SequenceNumber visible_at_tip,
SequenceNumber earliest_snapshot,
@ -2721,7 +2747,8 @@ Status DBImpl::ProcessKeyValueCompaction(
// TODO(icanadi) this currently only checks if flush is necessary on
// compacting column family. we should also check if flush is necessary on
// other column families, too
imm_micros += CallFlushDuringCompaction(cfd, deletion_state, log_buffer);
imm_micros += CallFlushDuringCompaction(
cfd, mutable_cf_options, deletion_state, log_buffer);
Slice key;
Slice value;
@ -2922,7 +2949,7 @@ Status DBImpl::ProcessKeyValueCompaction(
// Open output file if necessary
if (compact->builder == nullptr) {
status = OpenCompactionOutputFile(compact);
status = OpenCompactionOutputFile(compact, mutable_cf_options);
if (!status.ok()) {
break;
}
@ -3059,6 +3086,7 @@ void DBImpl::CallCompactionFilterV2(CompactionState* compact,
}
Status DBImpl::DoCompactionWork(CompactionState* compact,
const MutableCFOptions& mutable_cf_options,
DeletionState& deletion_state,
LogBuffer* log_buffer) {
assert(compact);
@ -3129,6 +3157,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
if (!compaction_filter_v2) {
status = ProcessKeyValueCompaction(
mutable_cf_options,
is_snapshot_supported,
visible_at_tip,
earliest_snapshot,
@ -3158,7 +3187,8 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
// TODO(icanadi) this currently only checks if flush is necessary on
// compacting column family. we should also check if flush is necessary on
// other column families, too
imm_micros += CallFlushDuringCompaction(cfd, deletion_state, log_buffer);
imm_micros += CallFlushDuringCompaction(cfd, mutable_cf_options,
deletion_state, log_buffer);
Slice key = backup_input->key();
Slice value = backup_input->value();
@ -3208,6 +3238,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
// Done buffering for the current prefix. Spit it out to disk
// Now just iterate through all the kv-pairs
status = ProcessKeyValueCompaction(
mutable_cf_options,
is_snapshot_supported,
visible_at_tip,
earliest_snapshot,
@ -3244,6 +3275,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
compact->MergeKeyValueSliceBuffer(&cfd->internal_comparator());
status = ProcessKeyValueCompaction(
mutable_cf_options,
is_snapshot_supported,
visible_at_tip,
earliest_snapshot,
@ -3266,6 +3298,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
}
compact->MergeKeyValueSliceBuffer(&cfd->internal_comparator());
status = ProcessKeyValueCompaction(
mutable_cf_options,
is_snapshot_supported,
visible_at_tip,
earliest_snapshot,
@ -3333,9 +3366,8 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
ReleaseCompactionUnusedFileNumbers(compact);
if (status.ok()) {
status = InstallCompactionResults(compact, log_buffer);
// Use latest MutableCFOptions
InstallSuperVersion(cfd, deletion_state);
status = InstallCompactionResults(compact, mutable_cf_options, log_buffer);
InstallSuperVersion(cfd, deletion_state, mutable_cf_options);
}
Version::LevelSummaryStorage tmp;
LogToBuffer(
@ -3434,16 +3466,16 @@ Status DBImpl::Get(const ReadOptions& options,
// first call already used it. In that rare case, we take a hit and create a
// new SuperVersion() inside of the mutex. We do similar thing
// for superversion_to_free
void DBImpl::InstallSuperVersion(ColumnFamilyData* cfd,
DeletionState& deletion_state) {
void DBImpl::InstallSuperVersion(
ColumnFamilyData* cfd, DeletionState& deletion_state,
const MutableCFOptions& mutable_cf_options) {
mutex_.AssertHeld();
// if new_superversion == nullptr, it means somebody already used it
SuperVersion* new_superversion =
(deletion_state.new_superversion != nullptr) ?
deletion_state.new_superversion : new SuperVersion();
// Use latest MutableCFOptions
SuperVersion* old_superversion =
cfd->InstallSuperVersion(new_superversion, &mutex_);
cfd->InstallSuperVersion(new_superversion, &mutex_, mutable_cf_options);
deletion_state.new_superversion = nullptr;
deletion_state.superversions_to_free.push_back(old_superversion);
}
@ -3627,15 +3659,17 @@ Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& options,
// LogAndApply will both write the creation in MANIFEST and create
// ColumnFamilyData object
Status s = versions_->LogAndApply(nullptr, &edit, &mutex_,
db_directory_.get(), false, &options);
Options opt(db_options_, options);
Status s = versions_->LogAndApply(nullptr,
MutableCFOptions(opt, ImmutableCFOptions(opt)),
&edit, &mutex_, db_directory_.get(), false, &options);
if (s.ok()) {
single_column_family_mode_ = false;
auto cfd =
versions_->GetColumnFamilySet()->GetColumnFamily(column_family_name);
assert(cfd != nullptr);
// Use latest MutableCFOptions
delete cfd->InstallSuperVersion(new SuperVersion(), &mutex_);
delete cfd->InstallSuperVersion(new SuperVersion(), &mutex_,
*cfd->GetLatestMutableCFOptions());
*handle = new ColumnFamilyHandleImpl(cfd, this, &mutex_);
Log(db_options_.info_log, "Created column family [%s] (ID %u)",
column_family_name.c_str(), (unsigned)cfd->GetID());
@ -3671,7 +3705,8 @@ Status DBImpl::DropColumnFamily(ColumnFamilyHandle* column_family) {
WriteThread::Writer w(&mutex_);
s = write_thread_.EnterWriteThread(&w, 0);
assert(s.ok() && !w.done); // No timeout and nobody should do our job
s = versions_->LogAndApply(cfd, &edit, &mutex_);
s = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(),
&edit, &mutex_);
write_thread_.ExitWriteThread(&w, &w, s);
}
}
@ -4450,9 +4485,11 @@ Status DBImpl::DeleteFile(std::string name) {
}
}
edit.DeleteFile(level, number);
status = versions_->LogAndApply(cfd, &edit, &mutex_, db_directory_.get());
status = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(),
&edit, &mutex_, db_directory_.get());
if (status.ok()) {
InstallSuperVersion(cfd, deletion_state);
InstallSuperVersion(cfd, deletion_state,
*cfd->GetLatestMutableCFOptions());
}
FindObsoleteFiles(deletion_state, false);
} // lock released here
@ -4681,8 +4718,8 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname,
}
if (s.ok()) {
for (auto cfd : *impl->versions_->GetColumnFamilySet()) {
// Use latest MutableCFOptions
delete cfd->InstallSuperVersion(new SuperVersion(), &impl->mutex_);
delete cfd->InstallSuperVersion(new SuperVersion(), &impl->mutex_,
*cfd->GetLatestMutableCFOptions());
}
impl->alive_log_files_.push_back(
DBImpl::LogFileNumberSize(impl->logfile_number_));

@ -347,9 +347,9 @@ class DBImpl : public DB {
// Flush the in-memory write buffer to storage. Switches to a new
// log-file/memtable and writes a new descriptor iff successful.
Status FlushMemTableToOutputFile(ColumnFamilyData* cfd, bool* madeProgress,
DeletionState& deletion_state,
LogBuffer* log_buffer);
Status FlushMemTableToOutputFile(
ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options,
bool* madeProgress, DeletionState& deletion_state, LogBuffer* log_buffer);
// REQUIRES: log_numbers are sorted in ascending order
Status RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
@ -362,9 +362,10 @@ class DBImpl : public DB {
// concurrent flush memtables to storage.
Status WriteLevel0TableForRecovery(ColumnFamilyData* cfd, MemTable* mem,
VersionEdit* edit);
Status WriteLevel0Table(ColumnFamilyData* cfd, autovector<MemTable*>& mems,
VersionEdit* edit, uint64_t* filenumber,
LogBuffer* log_buffer);
Status WriteLevel0Table(ColumnFamilyData* cfd,
const MutableCFOptions& mutable_cf_options,
const autovector<MemTable*>& mems,
VersionEdit* edit, uint64_t* filenumber, LogBuffer* log_buffer);
void DelayWrite(uint64_t expiration_time);
@ -393,6 +394,7 @@ class DBImpl : public DB {
LogBuffer* log_buffer);
void CleanupCompaction(CompactionState* compact, Status status);
Status DoCompactionWork(CompactionState* compact,
const MutableCFOptions& mutable_cf_options,
DeletionState& deletion_state,
LogBuffer* log_buffer);
@ -400,12 +402,13 @@ class DBImpl : public DB {
// preempt compaction, since it's higher prioirty
// Returns: micros spent executing
uint64_t CallFlushDuringCompaction(ColumnFamilyData* cfd,
DeletionState& deletion_state,
LogBuffer* log_buffer);
const MutableCFOptions& mutable_cf_options, DeletionState& deletion_state,
LogBuffer* log_buffer);
// Call compaction filter if is_compaction_v2 is not true. Then iterate
// through input and compact the kv-pairs
Status ProcessKeyValueCompaction(
const MutableCFOptions& mutable_cf_options,
bool is_snapshot_supported,
SequenceNumber visible_at_tip,
SequenceNumber earliest_snapshot,
@ -422,10 +425,11 @@ class DBImpl : public DB {
void CallCompactionFilterV2(CompactionState* compact,
CompactionFilterV2* compaction_filter_v2);
Status OpenCompactionOutputFile(CompactionState* compact);
Status OpenCompactionOutputFile(CompactionState* compact,
const MutableCFOptions& mutable_cf_options);
Status FinishCompactionOutputFile(CompactionState* compact, Iterator* input);
Status InstallCompactionResults(CompactionState* compact,
LogBuffer* log_buffer);
const MutableCFOptions& mutable_cf_options, LogBuffer* log_buffer);
void AllocateCompactionOutputFileNumbers(CompactionState* compact);
void ReleaseCompactionUnusedFileNumbers(CompactionState* compact);
@ -467,7 +471,8 @@ class DBImpl : public DB {
// Return the minimum empty level that could hold the total data in the
// input level. Return the input level, if such level could not be found.
int FindMinimumEmptyLevelFitting(ColumnFamilyData* cfd, int level);
int FindMinimumEmptyLevelFitting(ColumnFamilyData* cfd,
const MutableCFOptions& mutable_cf_options, int level);
// Move the files in the input level to the target level.
// If target_level < 0, automatically calculate the minimum level that could
@ -621,7 +626,8 @@ class DBImpl : public DB {
// the cfd->InstallSuperVersion() function. Background threads carry
// deletion_state which can have new_superversion already allocated.
void InstallSuperVersion(ColumnFamilyData* cfd,
DeletionState& deletion_state);
DeletionState& deletion_state,
const MutableCFOptions& mutable_cf_options);
// Find Super version and reference it. Based on options, it might return
// the thread local cached one.

@ -874,6 +874,18 @@ class DBTest {
return atoi(property.c_str());
}
uint64_t SizeAtLevel(int level) {
std::vector<LiveFileMetaData> metadata;
db_->GetLiveFilesMetaData(&metadata);
uint64_t sum = 0;
for (const auto& m : metadata) {
if (m.level == level) {
sum += m.size;
}
}
return sum;
}
int TotalTableFiles(int cf = 0, int levels = -1) {
if (levels == -1) {
levels = CurrentOptions().num_levels;
@ -8527,6 +8539,102 @@ TEST(DBTest, DisableDataSyncTest) {
}
}
TEST(DBTest, DynamicCompactionOptions) {
const uint64_t k64KB = 1 << 16;
const uint64_t k128KB = 1 << 17;
const uint64_t k256KB = 1 << 18;
const uint64_t k5KB = 5 * 1024;
Options options;
options.env = env_;
options.create_if_missing = true;
options.compression = kNoCompression;
options.max_background_compactions = 4;
options.hard_rate_limit = 1.1;
options.write_buffer_size = k128KB;
options.max_write_buffer_number = 2;
// Compaction related options
options.level0_file_num_compaction_trigger = 3;
options.level0_slowdown_writes_trigger = 10;
options.level0_stop_writes_trigger = 20;
options.max_grandparent_overlap_factor = 10;
options.expanded_compaction_factor = 25;
options.source_compaction_factor = 1;
options.target_file_size_base = k128KB;
options.target_file_size_multiplier = 1;
options.max_bytes_for_level_base = k256KB;
options.max_bytes_for_level_multiplier = 4;
DestroyAndReopen(&options);
auto gen_l0_kb = [this](int start, int size, int stride = 1) {
Random rnd(301);
std::vector<std::string> values;
for (int i = 0; i < size; i++) {
values.push_back(RandomString(&rnd, 1024));
ASSERT_OK(Put(Key(start + stride * i), values[i]));
}
dbfull()->TEST_WaitForFlushMemTable();
};
// Write 3 files that have the same key range, trigger compaction and
// result in one L1 file
gen_l0_kb(0, 128);
ASSERT_EQ(NumTableFilesAtLevel(0), 1);
gen_l0_kb(0, 128);
ASSERT_EQ(NumTableFilesAtLevel(0), 2);
gen_l0_kb(0, 128);
dbfull()->TEST_WaitForCompact();
ASSERT_EQ("0,1", FilesPerLevel());
std::vector<LiveFileMetaData> metadata;
db_->GetLiveFilesMetaData(&metadata);
ASSERT_EQ(1, metadata.size());
ASSERT_LE(metadata[0].size, k128KB + k5KB); // < 128KB + 5KB
ASSERT_GE(metadata[0].size, k128KB - k5KB); // > 128B - 5KB
// Make compaction trigger and file size smaller
ASSERT_TRUE(dbfull()->SetOptions({
{"level0_file_num_compaction_trigger", "2"},
{"target_file_size_base", "65536"}
}));
gen_l0_kb(0, 128);
ASSERT_EQ("1,1", FilesPerLevel());
gen_l0_kb(0, 128);
dbfull()->TEST_WaitForCompact();
ASSERT_EQ("0,2", FilesPerLevel());
metadata.clear();
db_->GetLiveFilesMetaData(&metadata);
ASSERT_EQ(2, metadata.size());
ASSERT_LE(metadata[0].size, k64KB + k5KB); // < 64KB + 5KB
ASSERT_GE(metadata[0].size, k64KB - k5KB); // > 64KB - 5KB
// Change base level size to 1MB
ASSERT_TRUE(dbfull()->SetOptions({ {"max_bytes_for_level_base", "1048576"} }));
// writing 56 x 128KB => 7MB
// (L1 + L2) = (1 + 4) * 1MB = 5MB
for (int i = 0; i < 56; ++i) {
gen_l0_kb(i, 128, 56);
}
dbfull()->TEST_WaitForCompact();
ASSERT_TRUE(SizeAtLevel(1) < 1048576 * 1.1);
ASSERT_TRUE(SizeAtLevel(2) < 4 * 1048576 * 1.1);
// Change multiplier to 2 with smaller base
ASSERT_TRUE(dbfull()->SetOptions({
{"max_bytes_for_level_multiplier", "2"},
{"max_bytes_for_level_base", "262144"}
}));
// writing 16 x 128KB
// (L1 + L2 + L3) = (1 + 2 + 4) * 256KB
for (int i = 0; i < 16; ++i) {
gen_l0_kb(i, 128, 50);
}
dbfull()->TEST_WaitForCompact();
ASSERT_TRUE(SizeAtLevel(1) < 262144 * 1.1);
ASSERT_TRUE(SizeAtLevel(2) < 2 * 262144 * 1.1);
ASSERT_TRUE(SizeAtLevel(3) < 4 * 262144 * 1.1);
}
} // namespace rocksdb

@ -60,7 +60,8 @@ void BM_LogAndApply(int iters, int num_base_files) {
InternalKey limit(MakeKey(2 * fnum + 1), 1, kTypeDeletion);
vbase.AddFile(2, ++fnum, 0, 1 /* file size */, start, limit, 1, 1);
}
ASSERT_OK(vset->LogAndApply(default_cfd, &vbase, &mu));
ASSERT_OK(vset->LogAndApply(default_cfd,
*default_cfd->GetLatestMutableCFOptions(), &vbase, &mu));
}
for (int i = 0; i < iters; i++) {
@ -69,7 +70,8 @@ void BM_LogAndApply(int iters, int num_base_files) {
InternalKey start(MakeKey(2 * fnum), 1, kTypeValue);
InternalKey limit(MakeKey(2 * fnum + 1), 1, kTypeDeletion);
vedit.AddFile(2, ++fnum, 0, 1 /* file size */, start, limit, 1, 1);
vset->LogAndApply(default_cfd, &vedit, &mu);
vset->LogAndApply(default_cfd, *default_cfd->GetLatestMutableCFOptions(),
&vedit, &mu);
}
delete vset;
}

@ -160,7 +160,8 @@ void MemTableList::RollbackMemtableFlush(const autovector<MemTable*>& mems,
// Record a successful flush in the manifest file
Status MemTableList::InstallMemtableFlushResults(
ColumnFamilyData* cfd, const autovector<MemTable*>& mems, VersionSet* vset,
ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options,
const autovector<MemTable*>& mems, VersionSet* vset,
port::Mutex* mu, Logger* info_log, uint64_t file_number,
FileNumToPathIdMap* pending_outputs, autovector<MemTable*>* to_delete,
Directory* db_directory, LogBuffer* log_buffer) {
@ -197,7 +198,7 @@ Status MemTableList::InstallMemtableFlushResults(
cfd->GetName().c_str(), (unsigned long)m->file_number_);
// this can release and reacquire the mutex.
s = vset->LogAndApply(cfd, &m->edit_, mu, db_directory);
s = vset->LogAndApply(cfd, mutable_cf_options, &m->edit_, mu, db_directory);
// we will be changing the version in the next code path,
// so we better create a new one, since versions are immutable

@ -113,7 +113,8 @@ class MemTableList {
// Commit a successful flush in the manifest file
Status InstallMemtableFlushResults(
ColumnFamilyData* cfd, const autovector<MemTable*>& m, VersionSet* vset,
ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options,
const autovector<MemTable*>& m, VersionSet* vset,
port::Mutex* mu, Logger* info_log, uint64_t file_number,
FileNumToPathIdMap* pending_outputs, autovector<MemTable*>* to_delete,
Directory* db_directory, LogBuffer* log_buffer);

@ -220,7 +220,7 @@ class Repairer {
Slice record;
WriteBatch batch;
MemTable* mem = new MemTable(icmp_, ioptions_,
MemTableOptions(MutableCFOptions(options_), options_));
MemTableOptions(MutableCFOptions(options_, ioptions_), options_));
auto cf_mems_default = new ColumnFamilyMemTablesDefault(mem, &options_);
mem->Ref();
int counter = 0;

@ -672,7 +672,7 @@ Version::Version(ColumnFamilyData* cfd, VersionSet* vset,
}
}
void Version::Get(const ReadOptions& options,
void Version::Get(const ReadOptions& read_options,
const LookupKey& k,
std::string* value,
Status* status,
@ -691,8 +691,8 @@ void Version::Get(const ReadOptions& options,
&file_indexer_, user_comparator_, internal_comparator_);
FdWithKeyRange* f = fp.GetNextFile();
while (f != nullptr) {
*status = table_cache_->Get(options, *internal_comparator_, f->fd, ikey,
&get_context);
*status = table_cache_->Get(read_options, *internal_comparator_, f->fd,
ikey, &get_context);
// TODO: examine the behavior for corrupted key
if (!status->ok()) {
return;
@ -746,9 +746,10 @@ void Version::GenerateFileLevels() {
}
}
void Version::PrepareApply(std::vector<uint64_t>& size_being_compacted) {
void Version::PrepareApply(const MutableCFOptions& mutable_cf_options,
std::vector<uint64_t>& size_being_compacted) {
UpdateTemporaryStats();
ComputeCompactionScore(size_being_compacted);
ComputeCompactionScore(mutable_cf_options, size_being_compacted);
UpdateFilesBySize();
UpdateNumNonEmptyLevels();
file_indexer_.UpdateIndex(&arena_, num_non_empty_levels_, files_);
@ -817,13 +818,13 @@ void Version::UpdateTemporaryStats() {
}
void Version::ComputeCompactionScore(
const MutableCFOptions& mutable_cf_options,
std::vector<uint64_t>& size_being_compacted) {
double max_score = 0;
int max_score_level = 0;
int max_input_level =
cfd_->compaction_picker()->MaxInputLevel(NumberLevels());
for (int level = 0; level <= max_input_level; level++) {
double score;
if (level == 0) {
@ -849,21 +850,22 @@ void Version::ComputeCompactionScore(
if (cfd_->ioptions()->compaction_style == kCompactionStyleFIFO) {
score = static_cast<double>(total_size) /
cfd_->options()->compaction_options_fifo.max_table_files_size;
} else if (numfiles >= cfd_->options()->level0_stop_writes_trigger) {
} else if (numfiles >= mutable_cf_options.level0_stop_writes_trigger) {
// If we are slowing down writes, then we better compact that first
score = 1000000;
} else if (numfiles >= cfd_->options()->level0_slowdown_writes_trigger) {
} else if (numfiles >=
mutable_cf_options.level0_slowdown_writes_trigger) {
score = 10000;
} else {
score = static_cast<double>(numfiles) /
cfd_->options()->level0_file_num_compaction_trigger;
mutable_cf_options.level0_file_num_compaction_trigger;
}
} else {
// Compute the ratio of current size to size limit.
const uint64_t level_bytes =
TotalCompensatedFileSize(files_[level]) - size_being_compacted[level];
score = static_cast<double>(level_bytes) /
cfd_->compaction_picker()->MaxBytesForLevel(level);
mutable_cf_options.MaxBytesForLevel(level);
if (max_score < score) {
max_score = score;
max_score_level = level;
@ -993,6 +995,7 @@ bool Version::OverlapInLevel(int level,
}
int Version::PickLevelForMemTableOutput(
const MutableCFOptions& mutable_cf_options,
const Slice& smallest_user_key,
const Slice& largest_user_key) {
int level = 0;
@ -1013,7 +1016,7 @@ int Version::PickLevelForMemTableOutput(
}
GetOverlappingInputs(level + 2, &start, &limit, &overlaps);
const uint64_t sum = TotalFileSize(overlaps);
if (sum > cfd_->compaction_picker()->MaxGrandParentOverlapBytes(level)) {
if (sum > mutable_cf_options.MaxGrandParentOverlapBytes(level)) {
break;
}
level++;
@ -1246,7 +1249,7 @@ bool Version::HasOverlappingUserKey(
return false;
}
int64_t Version::NumLevelBytes(int level) const {
uint64_t Version::NumLevelBytes(int level) const {
assert(level >= 0);
assert(level < NumberLevels());
return TotalFileSize(files_[level]);
@ -1653,16 +1656,17 @@ void VersionSet::AppendVersion(ColumnFamilyData* column_family_data,
}
Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data,
const MutableCFOptions& mutable_cf_options,
VersionEdit* edit, port::Mutex* mu,
Directory* db_directory, bool new_descriptor_log,
const ColumnFamilyOptions* options) {
const ColumnFamilyOptions* new_cf_options) {
mu->AssertHeld();
// column_family_data can be nullptr only if this is column_family_add.
// in that case, we also need to specify ColumnFamilyOptions
if (column_family_data == nullptr) {
assert(edit->is_column_family_add_);
assert(options != nullptr);
assert(new_cf_options != nullptr);
}
// queue our request
@ -1777,7 +1781,7 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data,
if (!edit->IsColumnFamilyManipulation()) {
// This is cpu-heavy operations, which should be called outside mutex.
v->PrepareApply(size_being_compacted);
v->PrepareApply(mutable_cf_options, size_being_compacted);
}
// Write new record to MANIFEST log
@ -1853,8 +1857,8 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data,
if (edit->is_column_family_add_) {
// no group commit on column family add
assert(batch_edits.size() == 1);
assert(options != nullptr);
CreateColumnFamily(*options, edit);
assert(new_cf_options != nullptr);
CreateColumnFamily(*new_cf_options, edit);
} else if (edit->is_column_family_drop_) {
assert(batch_edits.size() == 1);
column_family_data->SetDropped();
@ -2198,7 +2202,7 @@ Status VersionSet::Recover(
// Install recovered version
std::vector<uint64_t> size_being_compacted(v->NumberLevels() - 1);
cfd->compaction_picker()->SizeBeingCompacted(size_being_compacted);
v->PrepareApply(size_being_compacted);
v->PrepareApply(*cfd->GetLatestMutableCFOptions(), size_being_compacted);
AppendVersion(cfd, v);
}
@ -2374,11 +2378,13 @@ Status VersionSet::ReduceNumberOfLevels(const std::string& dbname,
current_version->files_ = new_files_list;
current_version->num_levels_ = new_levels;
MutableCFOptions mutable_cf_options(*options, ImmutableCFOptions(*options));
VersionEdit ve;
port::Mutex dummy_mutex;
MutexLock l(&dummy_mutex);
return versions.LogAndApply(versions.GetColumnFamilySet()->GetDefault(), &ve,
&dummy_mutex, nullptr, true);
return versions.LogAndApply(
versions.GetColumnFamilySet()->GetDefault(),
mutable_cf_options, &ve, &dummy_mutex, nullptr, true);
}
Status VersionSet::DumpManifest(Options& options, std::string& dscname,
@ -2530,7 +2536,7 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname,
builder->SaveTo(v);
std::vector<uint64_t> size_being_compacted(v->NumberLevels() - 1);
cfd->compaction_picker()->SizeBeingCompacted(size_being_compacted);
v->PrepareApply(size_being_compacted);
v->PrepareApply(*cfd->GetLatestMutableCFOptions(), size_being_compacted);
delete builder;
printf("--------------- Column family \"%s\" (ID %u) --------------\n",

@ -103,14 +103,18 @@ class Version {
// We use compaction scores to figure out which compaction to do next
// REQUIRES: If Version is not yet saved to current_, it can be called without
// a lock. Once a version is saved to current_, call only with mutex held
void ComputeCompactionScore(std::vector<uint64_t>& size_being_compacted);
void ComputeCompactionScore(
const MutableCFOptions& mutable_cf_options,
std::vector<uint64_t>& size_being_compacted);
// Generate file_levels_ from files_
void GenerateFileLevels();
// Update scores, pre-calculated variables. It needs to be called before
// applying the version to the version set.
void PrepareApply(std::vector<uint64_t>& size_being_compacted);
void PrepareApply(
const MutableCFOptions& mutable_cf_options,
std::vector<uint64_t>& size_being_compacted);
// Reference count management (so Versions do not disappear out from
// under live iterators)
@ -169,7 +173,8 @@ class Version {
// Return the level at which we should place a new memtable compaction
// result that covers the range [smallest_user_key,largest_user_key].
int PickLevelForMemTableOutput(const Slice& smallest_user_key,
int PickLevelForMemTableOutput(const MutableCFOptions& mutable_cf_options,
const Slice& smallest_user_key,
const Slice& largest_user_key);
int NumberLevels() const { return num_levels_; }
@ -178,7 +183,7 @@ class Version {
int NumLevelFiles(int level) const { return files_[level].size(); }
// Return the combined file size of all files at the specified level.
int64_t NumLevelBytes(int level) const;
uint64_t NumLevelBytes(int level) const;
// Return a human-readable short (single-line) summary of the number
// of files per level. Uses *scratch as backing store.
@ -369,7 +374,9 @@ class VersionSet {
// column_family_options has to be set if edit is column family add
// REQUIRES: *mu is held on entry.
// REQUIRES: no other thread concurrently calls LogAndApply()
Status LogAndApply(ColumnFamilyData* column_family_data, VersionEdit* edit,
Status LogAndApply(ColumnFamilyData* column_family_data,
const MutableCFOptions& mutable_cf_options,
VersionEdit* edit,
port::Mutex* mu, Directory* db_directory = nullptr,
bool new_descriptor_log = false,
const ColumnFamilyOptions* column_family_options =

@ -27,8 +27,9 @@ static std::string PrintContents(WriteBatch* b) {
auto factory = std::make_shared<SkipListFactory>();
Options options;
options.memtable_factory = factory;
MemTable* mem = new MemTable(cmp, ImmutableCFOptions(options),
MemTableOptions(MutableCFOptions(options), options));
ImmutableCFOptions ioptions(options);
MemTable* mem = new MemTable(cmp, ioptions,
MemTableOptions(MutableCFOptions(options, ioptions), options));
mem->Ref();
std::string state;
ColumnFamilyMemTablesDefault cf_mems_default(mem, &options);

@ -22,6 +22,7 @@ struct ImmutableCFOptions {
CompactionStyle compaction_style;
CompactionOptionsUniversal compaction_options_universal;
CompactionOptionsFIFO compaction_options_fifo;
const SliceTransform* prefix_extractor;
@ -79,6 +80,8 @@ struct ImmutableCFOptions {
CompressionOptions compression_opts;
Options::AccessHint access_hint_on_compaction_start;
int num_levels;
};
} // namespace rocksdb

@ -437,8 +437,9 @@ class MemTableConstructor: public Constructor {
table_factory_(new SkipListFactory) {
Options options;
options.memtable_factory = table_factory_;
memtable_ = new MemTable(internal_comparator_, ImmutableCFOptions(options),
MemTableOptions(MutableCFOptions(options), options));
ImmutableCFOptions ioptions(options);
memtable_ = new MemTable(internal_comparator_, ioptions,
MemTableOptions(MutableCFOptions(options, ioptions), options));
memtable_->Ref();
}
~MemTableConstructor() {
@ -452,8 +453,9 @@ class MemTableConstructor: public Constructor {
delete memtable_->Unref();
Options options;
options.memtable_factory = table_factory_;
memtable_ = new MemTable(internal_comparator_, ImmutableCFOptions(options),
MemTableOptions(MutableCFOptions(options), options));
ImmutableCFOptions mem_ioptions(options);
memtable_ = new MemTable(internal_comparator_, mem_ioptions,
MemTableOptions(MutableCFOptions(options, mem_ioptions), options));
memtable_->Ref();
int seq = 1;
for (KVMap::const_iterator it = data.begin();
@ -1864,8 +1866,9 @@ TEST(MemTableTest, Simple) {
auto table_factory = std::make_shared<SkipListFactory>();
Options options;
options.memtable_factory = table_factory;
MemTable* memtable = new MemTable(cmp, ImmutableCFOptions(options),
MemTableOptions(MutableCFOptions(options), options));
ImmutableCFOptions ioptions(options);
MemTable* memtable = new MemTable(cmp, ioptions,
MemTableOptions(MutableCFOptions(options, ioptions), options));
memtable->Ref();
WriteBatch batch;
WriteBatchInternal::SetSequence(&batch, 100);

@ -0,0 +1,72 @@
// Copyright (c) 2014, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#include <limits>
#include <cassert>
#include "rocksdb/options.h"
#include "rocksdb/immutable_options.h"
#include "util/mutable_cf_options.h"
namespace rocksdb {
namespace {
// Multiple two operands. If they overflow, return op1.
uint64_t MultiplyCheckOverflow(uint64_t op1, int op2) {
if (op1 == 0) {
return 0;
}
if (op2 <= 0) {
return op1;
}
uint64_t casted_op2 = (uint64_t) op2;
if (std::numeric_limits<uint64_t>::max() / op1 < casted_op2) {
return op1;
}
return op1 * casted_op2;
}
} // anonymous namespace
void MutableCFOptions::RefreshDerivedOptions(
const ImmutableCFOptions& ioptions) {
max_file_size.resize(ioptions.num_levels);
level_max_bytes.resize(ioptions.num_levels);
for (int i = 0; i < ioptions.num_levels; ++i) {
if (i == 0 && ioptions.compaction_style == kCompactionStyleUniversal) {
max_file_size[i] = ULLONG_MAX;
level_max_bytes[i] = max_bytes_for_level_base;
} else if (i > 1) {
max_file_size[i] = MultiplyCheckOverflow(max_file_size[i - 1],
target_file_size_multiplier);
level_max_bytes[i] = MultiplyCheckOverflow(
MultiplyCheckOverflow(level_max_bytes[i - 1],
max_bytes_for_level_multiplier),
max_bytes_for_level_multiplier_additional[i - 1]);
} else {
max_file_size[i] = target_file_size_base;
level_max_bytes[i] = max_bytes_for_level_base;
}
}
}
uint64_t MutableCFOptions::MaxFileSizeForLevel(int level) const {
assert(level >= 0);
assert(level < (int)max_file_size.size());
return max_file_size[level];
}
uint64_t MutableCFOptions::MaxBytesForLevel(int level) const {
// Note: the result for level zero is not really used since we set
// the level-0 compaction threshold based on number of files.
assert(level >= 0);
assert(level < (int)level_max_bytes.size());
return level_max_bytes[level];
}
uint64_t MutableCFOptions::MaxGrandParentOverlapBytes(int level) const {
return MaxFileSizeForLevel(level) * max_grandparent_overlap_factor;
}
uint64_t MutableCFOptions::ExpandedCompactionByteSizeLimit(int level) const {
return MaxFileSizeForLevel(level) * expanded_compaction_factor;
}
} // namespace rocksdb

@ -5,12 +5,14 @@
#pragma once
#include <vector>
#include "rocksdb/options.h"
#include "rocksdb/immutable_options.h"
namespace rocksdb {
struct MutableCFOptions {
explicit MutableCFOptions(const Options& options)
MutableCFOptions(const Options& options, const ImmutableCFOptions& ioptions)
: write_buffer_size(options.write_buffer_size),
arena_block_size(options.arena_block_size),
memtable_prefix_bloom_bits(options.memtable_prefix_bloom_bits),
@ -18,7 +20,22 @@ struct MutableCFOptions {
memtable_prefix_bloom_huge_page_tlb_size(
options.memtable_prefix_bloom_huge_page_tlb_size),
max_successive_merges(options.max_successive_merges),
filter_deletes(options.filter_deletes) {
filter_deletes(options.filter_deletes),
level0_file_num_compaction_trigger(
options.level0_file_num_compaction_trigger),
level0_slowdown_writes_trigger(options.level0_slowdown_writes_trigger),
level0_stop_writes_trigger(options.level0_stop_writes_trigger),
max_grandparent_overlap_factor(options.max_grandparent_overlap_factor),
expanded_compaction_factor(options.expanded_compaction_factor),
source_compaction_factor(options.source_compaction_factor),
target_file_size_base(options.target_file_size_base),
target_file_size_multiplier(options.target_file_size_multiplier),
max_bytes_for_level_base(options.max_bytes_for_level_base),
max_bytes_for_level_multiplier(options.max_bytes_for_level_multiplier),
max_bytes_for_level_multiplier_additional(
options.max_bytes_for_level_multiplier_additional)
{
RefreshDerivedOptions(ioptions);
}
MutableCFOptions()
: write_buffer_size(0),
@ -27,8 +44,33 @@ struct MutableCFOptions {
memtable_prefix_bloom_probes(0),
memtable_prefix_bloom_huge_page_tlb_size(0),
max_successive_merges(0),
filter_deletes(false) {}
filter_deletes(false),
level0_file_num_compaction_trigger(0),
level0_slowdown_writes_trigger(0),
level0_stop_writes_trigger(0),
max_grandparent_overlap_factor(0),
expanded_compaction_factor(0),
source_compaction_factor(0),
target_file_size_base(0),
target_file_size_multiplier(0),
max_bytes_for_level_base(0),
max_bytes_for_level_multiplier(0)
{}
// Must be called after any change to MutableCFOptions
void RefreshDerivedOptions(const ImmutableCFOptions& ioptions);
// Get the max file size in a given level.
uint64_t MaxFileSizeForLevel(int level) const;
// Returns maximum total bytes of data on a given level.
uint64_t MaxBytesForLevel(int level) const;
// Returns maximum total overlap bytes with grandparent
// level (i.e., level+2) before we stop building a single
// file in level->level+1 compaction.
uint64_t MaxGrandParentOverlapBytes(int level) const;
uint64_t ExpandedCompactionByteSizeLimit(int level) const;
// Memtable related options
size_t write_buffer_size;
size_t arena_block_size;
uint32_t memtable_prefix_bloom_bits;
@ -36,6 +78,25 @@ struct MutableCFOptions {
size_t memtable_prefix_bloom_huge_page_tlb_size;
size_t max_successive_merges;
bool filter_deletes;
// Compaction related options
int level0_file_num_compaction_trigger;
int level0_slowdown_writes_trigger;
int level0_stop_writes_trigger;
int max_grandparent_overlap_factor;
int expanded_compaction_factor;
int source_compaction_factor;
int target_file_size_base;
int target_file_size_multiplier;
uint64_t max_bytes_for_level_base;
int max_bytes_for_level_multiplier;
std::vector<int> max_bytes_for_level_multiplier_additional;
// Derived options
// Per-level target file size.
std::vector<uint64_t> max_file_size;
// Per-level max bytes
std::vector<uint64_t> level_max_bytes;
};
} // namespace rocksdb

@ -35,6 +35,7 @@ namespace rocksdb {
ImmutableCFOptions::ImmutableCFOptions(const Options& options)
: compaction_style(options.compaction_style),
compaction_options_universal(options.compaction_options_universal),
compaction_options_fifo(options.compaction_options_fifo),
prefix_extractor(options.prefix_extractor.get()),
comparator(options.comparator),
merge_operator(options.merge_operator.get()),
@ -60,7 +61,8 @@ ImmutableCFOptions::ImmutableCFOptions(const Options& options)
compression(options.compression),
compression_per_level(options.compression_per_level),
compression_opts(options.compression_opts),
access_hint_on_compaction_start(options.access_hint_on_compaction_start) {}
access_hint_on_compaction_start(options.access_hint_on_compaction_start),
num_levels(options.num_levels) {}
ColumnFamilyOptions::ColumnFamilyOptions()
: comparator(BytewiseComparator()),

@ -4,6 +4,7 @@
// of patent rights can be found in the PATENTS file in the same directory.
#include <cassert>
#include <unordered_set>
#include "rocksdb/options.h"
#include "util/options_helper.h"
@ -73,8 +74,8 @@ CompactionStyle ParseCompactionStyle(const std::string& type) {
} // anonymouse namespace
template<typename OptionsType>
bool ParseMemtableOption(const std::string& name, const std::string& value,
OptionsType* new_options) {
bool ParseMemtableOptions(const std::string& name, const std::string& value,
OptionsType* new_options) {
if (name == "write_buffer_size") {
new_options->write_buffer_size = ParseInt64(value);
} else if (name == "arena_block_size") {
@ -96,6 +97,50 @@ bool ParseMemtableOption(const std::string& name, const std::string& value,
return true;
}
template<typename OptionsType>
bool ParseCompactionOptions(const std::string& name, const std::string& value,
OptionsType* new_options) {
if (name == "level0_file_num_compaction_trigger") {
new_options->level0_file_num_compaction_trigger = ParseInt(value);
} else if (name == "level0_slowdown_writes_trigger") {
new_options->level0_slowdown_writes_trigger = ParseInt(value);
} else if (name == "level0_stop_writes_trigger") {
new_options->level0_stop_writes_trigger = ParseInt(value);
} else if (name == "max_grandparent_overlap_factor") {
new_options->max_grandparent_overlap_factor = ParseInt(value);
} else if (name == "expanded_compaction_factor") {
new_options->expanded_compaction_factor = ParseInt(value);
} else if (name == "source_compaction_factor") {
new_options->source_compaction_factor = ParseInt(value);
} else if (name == "target_file_size_base") {
new_options->target_file_size_base = ParseInt(value);
} else if (name == "target_file_size_multiplier") {
new_options->target_file_size_multiplier = ParseInt(value);
} else if (name == "max_bytes_for_level_base") {
new_options->max_bytes_for_level_base = ParseUint64(value);
} else if (name == "max_bytes_for_level_multiplier") {
new_options->max_bytes_for_level_multiplier = ParseInt(value);
} else if (name == "max_bytes_for_level_multiplier_additional") {
new_options->max_bytes_for_level_multiplier_additional.clear();
size_t start = 0;
while (true) {
size_t end = value.find_first_of(':', start);
if (end == std::string::npos) {
new_options->max_bytes_for_level_multiplier_additional.push_back(
ParseInt(value.substr(start)));
break;
} else {
new_options->max_bytes_for_level_multiplier_additional.push_back(
ParseInt(value.substr(start, end - start)));
start = end + 1;
}
}
} else {
return false;
}
return true;
}
bool GetMutableOptionsFromStrings(
const MutableCFOptions& base_options,
const std::unordered_map<std::string, std::string>& options_map,
@ -104,7 +149,8 @@ bool GetMutableOptionsFromStrings(
*new_options = base_options;
try {
for (const auto& o : options_map) {
if (ParseMemtableOption(o.first, o.second, new_options)) {
if (ParseMemtableOptions(o.first, o.second, new_options)) {
} else if (ParseCompactionOptions(o.first, o.second, new_options)) {
} else {
return false;
}
@ -123,7 +169,8 @@ bool GetOptionsFromStrings(
*new_options = base_options;
for (const auto& o : options_map) {
try {
if (ParseMemtableOption(o.first, o.second, new_options)) {
if (ParseMemtableOptions(o.first, o.second, new_options)) {
} else if (ParseCompactionOptions(o.first, o.second, new_options)) {
} else if (o.first == "max_write_buffer_number") {
new_options->max_write_buffer_number = ParseInt(o.second);
} else if (o.first == "min_write_buffer_number_to_merge") {
@ -168,43 +215,8 @@ bool GetOptionsFromStrings(
ParseInt(o.second.substr(start, o.second.size() - start));
} else if (o.first == "num_levels") {
new_options->num_levels = ParseInt(o.second);
} else if (o.first == "level0_file_num_compaction_trigger") {
new_options->level0_file_num_compaction_trigger = ParseInt(o.second);
} else if (o.first == "level0_slowdown_writes_trigger") {
new_options->level0_slowdown_writes_trigger = ParseInt(o.second);
} else if (o.first == "level0_stop_writes_trigger") {
new_options->level0_stop_writes_trigger = ParseInt(o.second);
} else if (o.first == "max_mem_compaction_level") {
new_options->max_mem_compaction_level = ParseInt(o.second);
} else if (o.first == "target_file_size_base") {
new_options->target_file_size_base = ParseUint64(o.second);
} else if (o.first == "target_file_size_multiplier") {
new_options->target_file_size_multiplier = ParseInt(o.second);
} else if (o.first == "max_bytes_for_level_base") {
new_options->max_bytes_for_level_base = ParseUint64(o.second);
} else if (o.first == "max_bytes_for_level_multiplier") {
new_options->max_bytes_for_level_multiplier = ParseInt(o.second);
} else if (o.first == "max_bytes_for_level_multiplier_additional") {
new_options->max_bytes_for_level_multiplier_additional.clear();
size_t start = 0;
while (true) {
size_t end = o.second.find_first_of(':', start);
if (end == std::string::npos) {
new_options->max_bytes_for_level_multiplier_additional.push_back(
ParseInt(o.second.substr(start)));
break;
} else {
new_options->max_bytes_for_level_multiplier_additional.push_back(
ParseInt(o.second.substr(start, end - start)));
start = end + 1;
}
}
} else if (o.first == "expanded_compaction_factor") {
new_options->expanded_compaction_factor = ParseInt(o.second);
} else if (o.first == "source_compaction_factor") {
new_options->source_compaction_factor = ParseInt(o.second);
} else if (o.first == "max_grandparent_overlap_factor") {
new_options->max_grandparent_overlap_factor = ParseInt(o.second);
} else if (o.first == "soft_rate_limit") {
new_options->soft_rate_limit = ParseDouble(o.second);
} else if (o.first == "hard_rate_limit") {

Loading…
Cancel
Save