[rocksdb] make more options dynamic

Summary:
make more ColumnFamilyOptions dynamic:
- compression
- soft_pending_compaction_bytes_limit
- hard_pending_compaction_bytes_limit
- min_partial_merge_operands
- report_bg_io_stats
- paranoid_file_checks

Test Plan:
Add sanity check in `db_test.cc` for all above options except for soft_pending_compaction_bytes_limit and hard_pending_compaction_bytes_limit.
All passed.

Reviewers: andrewkr, sdong, IslamAbdelRahman

Reviewed By: IslamAbdelRahman

Subscribers: andrewkr, dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D57519
main
Aaron Gao 8 years ago
parent bac3be7c46
commit 43afd72bee
  1. 6
      db/builder.cc
  2. 5
      db/builder.h
  3. 17
      db/compaction.cc
  4. 8
      db/compaction_job.cc
  5. 1
      db/compaction_job.h
  6. 159
      db/compaction_picker.cc
  7. 63
      db/compaction_picker.h
  8. 1
      db/compaction_picker_test.cc
  9. 29
      db/db_impl.cc
  10. 4
      db/db_impl.h
  11. 10
      db/db_impl_debug.cc
  12. 16
      db/db_sst_test.cc
  13. 84
      db/db_test.cc
  14. 8
      db/flush_job.cc
  15. 6
      db/repair.cc
  16. 4
      include/rocksdb/immutable_options.h
  17. 4
      include/rocksdb/sst_file_writer.h
  18. 12
      table/sst_file_writer.cc
  19. 15
      util/mutable_cf_options.h
  20. 2
      util/options.cc
  21. 10
      util/options_helper.cc

@ -59,8 +59,8 @@ TableBuilder* NewTableBuilder(
Status BuildTable(
const std::string& dbname, Env* env, const ImmutableCFOptions& ioptions,
const EnvOptions& env_options, TableCache* table_cache,
InternalIterator* iter, FileMetaData* meta,
const MutableCFOptions& mutable_cf_options, const EnvOptions& env_options,
TableCache* table_cache, InternalIterator* iter, FileMetaData* meta,
const InternalKeyComparator& internal_comparator,
const std::vector<std::unique_ptr<IntTblPropCollectorFactory>>*
int_tbl_prop_collector_factories,
@ -113,7 +113,7 @@ Status BuildTable(
MergeHelper merge(env, internal_comparator.user_comparator(),
ioptions.merge_operator, nullptr, ioptions.info_log,
ioptions.min_partial_merge_operands,
mutable_cf_options.min_partial_merge_operands,
true /* internal key corruption is not ok */,
snapshots.empty() ? 0 : snapshots.back());

@ -19,6 +19,7 @@
#include "rocksdb/table_properties.h"
#include "rocksdb/types.h"
#include "util/event_logger.h"
#include "util/mutable_cf_options.h"
namespace rocksdb {
@ -61,8 +62,8 @@ TableBuilder* NewTableBuilder(
// by column_family_id, or empty string if unknown.
extern Status BuildTable(
const std::string& dbname, Env* env, const ImmutableCFOptions& options,
const EnvOptions& env_options, TableCache* table_cache,
InternalIterator* iter, FileMetaData* meta,
const MutableCFOptions& mutable_cf_options, const EnvOptions& env_options,
TableCache* table_cache, InternalIterator* iter, FileMetaData* meta,
const InternalKeyComparator& internal_comparator,
const std::vector<std::unique_ptr<IntTblPropCollectorFactory>>*
int_tbl_prop_collector_factories,

@ -16,8 +16,8 @@
#include <inttypes.h>
#include <vector>
#include "rocksdb/compaction_filter.h"
#include "db/column_family.h"
#include "rocksdb/compaction_filter.h"
#include "util/logging.h"
#include "util/sync_point.h"
@ -205,8 +205,9 @@ Compaction::~Compaction() {
bool Compaction::InputCompressionMatchesOutput() const {
VersionStorageInfo* vstorage = input_version_->storage_info();
int base_level = vstorage->base_level();
bool matches = (GetCompressionType(*cfd_->ioptions(), vstorage, start_level_,
base_level) == output_compression_);
bool matches =
(GetCompressionType(*cfd_->ioptions(), vstorage, mutable_cf_options_,
start_level_, base_level) == output_compression_);
if (matches) {
TEST_SYNC_POINT("Compaction::InputCompressionMatchesOutput:Matches");
return true;
@ -292,8 +293,8 @@ bool Compaction::KeyNotExistsBeyondOutputLevel(
void Compaction::MarkFilesBeingCompacted(bool mark_as_compacted) {
for (size_t i = 0; i < num_input_levels(); i++) {
for (size_t j = 0; j < inputs_[i].size(); j++) {
assert(mark_as_compacted ? !inputs_[i][j]->being_compacted :
inputs_[i][j]->being_compacted);
assert(mark_as_compacted ? !inputs_[i][j]->being_compacted
: inputs_[i][j]->being_compacted);
inputs_[i][j]->being_compacted = mark_as_compacted;
}
}
@ -368,10 +369,8 @@ int InputSummary(const std::vector<FileMetaData*>& files, char* output,
void Compaction::Summary(char* output, int len) {
int write =
snprintf(output, len, "Base version %" PRIu64
" Base level %d, inputs: [",
input_version_->GetVersionNumber(),
start_level_);
snprintf(output, len, "Base version %" PRIu64 " Base level %d, inputs: [",
input_version_->GetVersionNumber(), start_level_);
if (write < 0 || write >= len) {
return;
}

@ -471,9 +471,9 @@ void CompactionJob::GenSubcompactionBoundaries() {
// Group the ranges into subcompactions
const double min_file_fill_percent = 4.0 / 5;
uint64_t max_output_files = static_cast<uint64_t>(std::ceil(
sum / min_file_fill_percent /
cfd->GetCurrentMutableCFOptions()->MaxFileSizeForLevel(out_lvl)));
uint64_t max_output_files = static_cast<uint64_t>(
std::ceil(sum / min_file_fill_percent /
c->mutable_cf_options()->MaxFileSizeForLevel(out_lvl)));
uint64_t subcompactions =
std::min({static_cast<uint64_t>(ranges.size()),
static_cast<uint64_t>(db_options_.max_subcompactions),
@ -704,7 +704,7 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
MergeHelper merge(
env_, cfd->user_comparator(), cfd->ioptions()->merge_operator,
compaction_filter, db_options_.info_log.get(),
cfd->ioptions()->min_partial_merge_operands,
mutable_cf_options->min_partial_merge_operands,
false /* internal key corruption is expected */,
existing_snapshots_.empty() ? 0 : existing_snapshots_.back(),
compact_->compaction->level(), db_options_.statistics.get());

@ -122,6 +122,7 @@ class CompactionJob {
const std::string& dbname_;
const DBOptions& db_options_;
const EnvOptions& env_options_;
Env* env_;
VersionSet* versions_;
std::atomic<bool>* shutting_down_;

@ -70,7 +70,8 @@ struct UserKeyComparator {
};
typedef std::priority_queue<InputFileInfo, std::vector<InputFileInfo>,
UserKeyComparator> SmallestKeyHeap;
UserKeyComparator>
SmallestKeyHeap;
// This function creates the heap that is used to find if the files are
// overlapping during universal compaction when the allow_trivial_move
@ -110,6 +111,7 @@ SmallestKeyHeap create_level_heap(Compaction* c, const Comparator* ucmp) {
// Otherwise, the compression type is determined based on options and level.
CompressionType GetCompressionType(const ImmutableCFOptions& ioptions,
const VersionStorageInfo* vstorage,
const MutableCFOptions& mutable_cf_options,
int level, int base_level,
const bool enable_compression) {
if (!enable_compression) {
@ -123,7 +125,7 @@ CompressionType GetCompressionType(const ImmutableCFOptions& ioptions,
level > base_level && level >= (vstorage->num_non_empty_levels() - 1)) {
return ioptions.bottommost_compression;
}
// If the use has specified a different compression level for each level,
// If the user has specified a different compression level for each level,
// then pick the compression for that level.
if (!ioptions.compression_per_level.empty()) {
assert(level == 0 || level >= base_level);
@ -137,7 +139,7 @@ CompressionType GetCompressionType(const ImmutableCFOptions& ioptions,
// specified compression levels, use the last value.
return ioptions.compression_per_level[std::max(0, std::min(idx, n))];
} else {
return ioptions.compression;
return mutable_cf_options.compression;
}
}
@ -198,10 +200,9 @@ void CompactionPicker::GetRange(const CompactionInputFiles& inputs1,
InternalKey smallest1, smallest2, largest1, largest2;
GetRange(inputs1, &smallest1, &largest1);
GetRange(inputs2, &smallest2, &largest2);
*smallest = icmp_->Compare(smallest1, smallest2) < 0 ?
smallest1 : smallest2;
*largest = icmp_->Compare(largest1, largest2) < 0 ?
largest2 : largest1;
*smallest =
icmp_->Compare(smallest1, smallest2) < 0 ? smallest1 : smallest2;
*largest = icmp_->Compare(largest1, largest2) < 0 ? largest2 : largest1;
}
}
@ -266,9 +267,9 @@ Compaction* CompactionPicker::FormCompaction(
VersionStorageInfo* vstorage, const MutableCFOptions& mutable_cf_options,
uint32_t output_path_id) {
uint64_t max_grandparent_overlap_bytes =
output_level + 1 < vstorage->num_levels() ?
mutable_cf_options.MaxGrandParentOverlapBytes(output_level + 1) :
std::numeric_limits<uint64_t>::max();
output_level + 1 < vstorage->num_levels()
? mutable_cf_options.MaxGrandParentOverlapBytes(output_level + 1)
: std::numeric_limits<uint64_t>::max();
assert(input_files.size());
// TODO(rven ): we might be able to run concurrent level 0 compaction
@ -292,8 +293,7 @@ Compaction* CompactionPicker::FormCompaction(
Status CompactionPicker::GetCompactionInputsFromFileNumbers(
std::vector<CompactionInputFiles>* input_files,
std::unordered_set<uint64_t>* input_set,
const VersionStorageInfo* vstorage,
std::unordered_set<uint64_t>* input_set, const VersionStorageInfo* vstorage,
const CompactionOptions& compact_options) const {
if (input_set->size() == 0U) {
return Status::InvalidArgument(
@ -331,8 +331,8 @@ Status CompactionPicker::GetCompactionInputsFromFileNumbers(
return Status::InvalidArgument(message);
}
for (int level = first_non_empty_level;
level <= last_non_empty_level; ++level) {
for (int level = first_non_empty_level; level <= last_non_empty_level;
++level) {
matched_input_files[level].level = level;
input_files->emplace_back(std::move(matched_input_files[level]));
}
@ -340,8 +340,6 @@ Status CompactionPicker::GetCompactionInputsFromFileNumbers(
return Status::OK();
}
// Returns true if any one of the parent files are being compacted
bool CompactionPicker::RangeInCompaction(VersionStorageInfo* vstorage,
const InternalKey* smallest,
@ -513,7 +511,8 @@ Compaction* CompactionPicker::CompactRange(
vstorage, mutable_cf_options, std::move(inputs), output_level,
mutable_cf_options.MaxFileSizeForLevel(output_level),
/* max_grandparent_overlap_bytes */ LLONG_MAX, output_path_id,
GetCompressionType(ioptions_, vstorage, output_level, 1),
GetCompressionType(ioptions_, vstorage, mutable_cf_options,
output_level, 1),
/* grandparents */ {}, /* is manual */ true);
if (start_level == 0) {
level0_compactions_in_progress_.insert(c);
@ -550,7 +549,7 @@ Compaction* CompactionPicker::CompactRange(
// two files overlap.
if (input_level > 0) {
const uint64_t limit = mutable_cf_options.MaxFileSizeForLevel(input_level) *
mutable_cf_options.source_compaction_factor;
mutable_cf_options.source_compaction_factor;
uint64_t total = 0;
for (size_t i = 0; i + 1 < inputs.size(); ++i) {
uint64_t s = inputs[i]->compensated_file_size;
@ -613,8 +612,9 @@ Compaction* CompactionPicker::CompactRange(
vstorage, mutable_cf_options, std::move(compaction_inputs), output_level,
mutable_cf_options.MaxFileSizeForLevel(output_level),
mutable_cf_options.MaxGrandParentOverlapBytes(input_level),
output_path_id, GetCompressionType(ioptions_, vstorage, output_level,
vstorage->base_level()),
output_path_id,
GetCompressionType(ioptions_, vstorage, mutable_cf_options, output_level,
vstorage->base_level()),
std::move(grandparents), /* is manual compaction */ true);
TEST_SYNC_POINT_CALLBACK("CompactionPicker::CompactRange:Return", compaction);
@ -638,9 +638,8 @@ Compaction* CompactionPicker::CompactRange(
#ifndef ROCKSDB_LITE
namespace {
// Test whether two files have overlapping key-ranges.
bool HaveOverlappingKeyRanges(
const Comparator* c,
const SstFileMetaData& a, const SstFileMetaData& b) {
bool HaveOverlappingKeyRanges(const Comparator* c, const SstFileMetaData& a,
const SstFileMetaData& b) {
if (c->Compare(a.smallestkey, b.smallestkey) >= 0) {
if (c->Compare(a.smallestkey, b.largestkey) <= 0) {
// b.smallestkey <= a.smallestkey <= b.largestkey
@ -664,9 +663,8 @@ bool HaveOverlappingKeyRanges(
} // namespace
Status CompactionPicker::SanitizeCompactionInputFilesForAllLevels(
std::unordered_set<uint64_t>* input_files,
const ColumnFamilyMetaData& cf_meta,
const int output_level) const {
std::unordered_set<uint64_t>* input_files,
const ColumnFamilyMetaData& cf_meta, const int output_level) const {
auto& levels = cf_meta.levels;
auto comparator = icmp_->user_comparator();
@ -719,18 +717,17 @@ Status CompactionPicker::SanitizeCompactionInputFilesForAllLevels(
// has overlapping key-range with other non-compaction input
// files in the same level.
while (first_included > 0) {
if (comparator->Compare(
current_files[first_included - 1].largestkey,
current_files[first_included].smallestkey) < 0) {
if (comparator->Compare(current_files[first_included - 1].largestkey,
current_files[first_included].smallestkey) <
0) {
break;
}
first_included--;
}
while (last_included < static_cast<int>(current_files.size()) - 1) {
if (comparator->Compare(
current_files[last_included + 1].smallestkey,
current_files[last_included].largestkey) > 0) {
if (comparator->Compare(current_files[last_included + 1].smallestkey,
current_files[last_included].largestkey) > 0) {
break;
}
last_included++;
@ -740,33 +737,31 @@ Status CompactionPicker::SanitizeCompactionInputFilesForAllLevels(
// include all files between the first and the last compaction input files.
for (int f = first_included; f <= last_included; ++f) {
if (current_files[f].being_compacted) {
return Status::Aborted(
"Necessary compaction input file " + current_files[f].name +
" is currently being compacted.");
return Status::Aborted("Necessary compaction input file " +
current_files[f].name +
" is currently being compacted.");
}
input_files->insert(
TableFileNameToNumber(current_files[f].name));
input_files->insert(TableFileNameToNumber(current_files[f].name));
}
// update smallest and largest key
if (l == 0) {
for (int f = first_included; f <= last_included; ++f) {
if (comparator->Compare(
smallestkey, current_files[f].smallestkey) > 0) {
if (comparator->Compare(smallestkey, current_files[f].smallestkey) >
0) {
smallestkey = current_files[f].smallestkey;
}
if (comparator->Compare(
largestkey, current_files[f].largestkey) < 0) {
if (comparator->Compare(largestkey, current_files[f].largestkey) < 0) {
largestkey = current_files[f].largestkey;
}
}
} else {
if (comparator->Compare(
smallestkey, current_files[first_included].smallestkey) > 0) {
if (comparator->Compare(smallestkey,
current_files[first_included].smallestkey) > 0) {
smallestkey = current_files[first_included].smallestkey;
}
if (comparator->Compare(
largestkey, current_files[last_included].largestkey) < 0) {
if (comparator->Compare(largestkey,
current_files[last_included].largestkey) < 0) {
largestkey = current_files[last_included].largestkey;
}
}
@ -783,16 +778,15 @@ Status CompactionPicker::SanitizeCompactionInputFilesForAllLevels(
// time and not by key
for (int m = std::max(l, 1); m <= output_level; ++m) {
for (auto& next_lv_file : levels[m].files) {
if (HaveOverlappingKeyRanges(
comparator, aggregated_file_meta, next_lv_file)) {
if (HaveOverlappingKeyRanges(comparator, aggregated_file_meta,
next_lv_file)) {
if (next_lv_file.being_compacted) {
return Status::Aborted(
"File " + next_lv_file.name +
" that has overlapping key range with one of the compaction "
" input file is currently being compacted.");
}
input_files->insert(
TableFileNameToNumber(next_lv_file.name));
input_files->insert(TableFileNameToNumber(next_lv_file.name));
}
}
}
@ -802,28 +796,25 @@ Status CompactionPicker::SanitizeCompactionInputFilesForAllLevels(
Status CompactionPicker::SanitizeCompactionInputFiles(
std::unordered_set<uint64_t>* input_files,
const ColumnFamilyMetaData& cf_meta,
const int output_level) const {
const ColumnFamilyMetaData& cf_meta, const int output_level) const {
assert(static_cast<int>(cf_meta.levels.size()) - 1 ==
cf_meta.levels[cf_meta.levels.size() - 1].level);
if (output_level >= static_cast<int>(cf_meta.levels.size())) {
return Status::InvalidArgument(
"Output level for column family " + cf_meta.name +
" must between [0, " +
ToString(cf_meta.levels[cf_meta.levels.size() - 1].level) +
"].");
ToString(cf_meta.levels[cf_meta.levels.size() - 1].level) + "].");
}
if (output_level > MaxOutputLevel()) {
return Status::InvalidArgument(
"Exceed the maximum output level defined by "
"the current compaction algorithm --- " +
ToString(MaxOutputLevel()));
ToString(MaxOutputLevel()));
}
if (output_level < 0) {
return Status::InvalidArgument(
"Output level cannot be negative.");
return Status::InvalidArgument("Output level cannot be negative.");
}
if (input_files->size() == 0) {
@ -831,8 +822,8 @@ Status CompactionPicker::SanitizeCompactionInputFiles(
"A compaction must contain at least one file.");
}
Status s = SanitizeCompactionInputFilesForAllLevels(
input_files, cf_meta, output_level);
Status s = SanitizeCompactionInputFilesForAllLevels(input_files, cf_meta,
output_level);
if (!s.ok()) {
return s;
@ -846,10 +837,9 @@ Status CompactionPicker::SanitizeCompactionInputFiles(
for (auto file_meta : level_meta.files) {
if (file_num == TableFileNameToNumber(file_meta.name)) {
if (file_meta.being_compacted) {
return Status::Aborted(
"Specified compaction input file " +
MakeTableFileName("", file_num) +
" is already being compacted.");
return Status::Aborted("Specified compaction input file " +
MakeTableFileName("", file_num) +
" is already being compacted.");
}
found = true;
break;
@ -861,8 +851,7 @@ Status CompactionPicker::SanitizeCompactionInputFiles(
}
if (!found) {
return Status::InvalidArgument(
"Specified compaction input file " +
MakeTableFileName("", file_num) +
"Specified compaction input file " + MakeTableFileName("", file_num) +
" does not exist in column family " + cf_meta.name + ".");
}
}
@ -871,8 +860,8 @@ Status CompactionPicker::SanitizeCompactionInputFiles(
}
#endif // !ROCKSDB_LITE
bool LevelCompactionPicker::NeedsCompaction(const VersionStorageInfo* vstorage)
const {
bool LevelCompactionPicker::NeedsCompaction(
const VersionStorageInfo* vstorage) const {
if (!vstorage->FilesMarkedForCompaction().empty()) {
return true;
}
@ -1018,7 +1007,7 @@ Compaction* LevelCompactionPicker::PickCompaction(
CompactionInputFiles output_level_inputs;
output_level_inputs.level = output_level;
if (!SetupOtherInputs(cf_name, mutable_cf_options, vstorage, &inputs,
&output_level_inputs, &parent_index, base_index)) {
&output_level_inputs, &parent_index, base_index)) {
return nullptr;
}
@ -1034,7 +1023,7 @@ Compaction* LevelCompactionPicker::PickCompaction(
mutable_cf_options.MaxFileSizeForLevel(output_level),
mutable_cf_options.MaxGrandParentOverlapBytes(level),
GetPathId(ioptions_, mutable_cf_options, output_level),
GetCompressionType(ioptions_, vstorage, output_level,
GetCompressionType(ioptions_, vstorage, mutable_cf_options, output_level,
vstorage->base_level()),
std::move(grandparents), is_manual, score,
false /* deletion_compaction */, compaction_reason);
@ -1448,8 +1437,9 @@ uint32_t UniversalCompactionPicker::GetPathId(
// considered in this algorithm. So the target size can be violated in
// that case. We need to improve it.
uint64_t accumulated_size = 0;
uint64_t future_size = file_size *
(100 - ioptions.compaction_options_universal.size_ratio) / 100;
uint64_t future_size =
file_size * (100 - ioptions.compaction_options_universal.size_ratio) /
100;
uint32_t p = 0;
assert(!ioptions.db_paths.empty());
for (; p < ioptions.db_paths.size() - 1; p++) {
@ -1473,17 +1463,17 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp(
unsigned int max_number_of_files_to_compact,
const std::vector<SortedRun>& sorted_runs, LogBuffer* log_buffer) {
unsigned int min_merge_width =
ioptions_.compaction_options_universal.min_merge_width;
ioptions_.compaction_options_universal.min_merge_width;
unsigned int max_merge_width =
ioptions_.compaction_options_universal.max_merge_width;
ioptions_.compaction_options_universal.max_merge_width;
const SortedRun* sr = nullptr;
bool done = false;
size_t start_index = 0;
unsigned int candidate_count = 0;
unsigned int max_files_to_compact = std::min(max_merge_width,
max_number_of_files_to_compact);
unsigned int max_files_to_compact =
std::min(max_merge_width, max_number_of_files_to_compact);
min_merge_width = std::max(min_merge_width, 2U);
// Caller checks the size before executing this function. This invariant is
@ -1595,7 +1585,7 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp(
uint64_t older_file_size = 0;
for (size_t i = sorted_runs.size() - 1; i >= first_index_after; i--) {
older_file_size += sorted_runs[i].size;
if (older_file_size * 100L >= total_size * (long) ratio_to_compress) {
if (older_file_size * 100L >= total_size * (long)ratio_to_compress) {
enable_compression = false;
break;
}
@ -1647,8 +1637,8 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp(
return new Compaction(
vstorage, mutable_cf_options, std::move(inputs), output_level,
mutable_cf_options.MaxFileSizeForLevel(output_level), LLONG_MAX, path_id,
GetCompressionType(ioptions_, vstorage, start_level, 1,
enable_compression),
GetCompressionType(ioptions_, vstorage, mutable_cf_options, start_level,
1, enable_compression),
/* grandparents */ {}, /* is manual */ false, score,
false /* deletion_compaction */, compaction_reason);
}
@ -1664,8 +1654,8 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp(
VersionStorageInfo* vstorage, double score,
const std::vector<SortedRun>& sorted_runs, LogBuffer* log_buffer) {
// percentage flexibilty while reducing size amplification
uint64_t ratio = ioptions_.compaction_options_universal.
max_size_amplification_percent;
uint64_t ratio =
ioptions_.compaction_options_universal.max_size_amplification_percent;
unsigned int candidate_count = 0;
uint64_t candidate_size = 0;
@ -1676,7 +1666,7 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp(
for (size_t loop = 0; loop < sorted_runs.size() - 1; loop++) {
sr = &sorted_runs[loop];
if (!sr->being_compacted) {
start_index = loop; // Consider this as the first candidate.
start_index = loop; // Consider this as the first candidate.
break;
}
char file_num_buf[kFormatFileNumberBufSize];
@ -1688,7 +1678,7 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp(
}
if (sr == nullptr) {
return nullptr; // no candidate files
return nullptr; // no candidate files
}
{
char file_num_buf[kFormatFileNumberBufSize];
@ -1773,14 +1763,15 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp(
vstorage->num_levels() - 1,
mutable_cf_options.MaxFileSizeForLevel(vstorage->num_levels() - 1),
/* max_grandparent_overlap_bytes */ LLONG_MAX, path_id,
GetCompressionType(ioptions_, vstorage, vstorage->num_levels() - 1, 1),
GetCompressionType(ioptions_, vstorage, mutable_cf_options,
vstorage->num_levels() - 1, 1),
/* grandparents */ {}, /* is manual */ false, score,
false /* deletion_compaction */,
CompactionReason::kUniversalSizeAmplification);
}
bool FIFOCompactionPicker::NeedsCompaction(const VersionStorageInfo* vstorage)
const {
bool FIFOCompactionPicker::NeedsCompaction(
const VersionStorageInfo* vstorage) const {
const int kLevel0 = 0;
return vstorage->CompactionScore(kLevel0) >= 1;
}

@ -63,23 +63,20 @@ class CompactionPicker {
InternalKey** compaction_end, bool* manual_conflict);
// The maximum allowed output level. Default value is NumberLevels() - 1.
virtual int MaxOutputLevel() const {
return NumberLevels() - 1;
}
virtual int MaxOutputLevel() const { return NumberLevels() - 1; }
virtual bool NeedsCompaction(const VersionStorageInfo* vstorage) const = 0;
// Sanitize the input set of compaction input files.
// When the input parameters do not describe a valid compaction, the
// function will try to fix the input_files by adding necessary
// files. If it's not possible to conver an invalid input_files
// into a valid one by adding more files, the function will return a
// non-ok status with specific reason.
// Sanitize the input set of compaction input files.
// When the input parameters do not describe a valid compaction, the
// function will try to fix the input_files by adding necessary
// files. If it's not possible to conver an invalid input_files
// into a valid one by adding more files, the function will return a
// non-ok status with specific reason.
#ifndef ROCKSDB_LITE
Status SanitizeCompactionInputFiles(
std::unordered_set<uint64_t>* input_files,
const ColumnFamilyMetaData& cf_meta,
const int output_level) const;
Status SanitizeCompactionInputFiles(std::unordered_set<uint64_t>* input_files,
const ColumnFamilyMetaData& cf_meta,
const int output_level) const;
#endif // ROCKSDB_LITE
// Free up the files that participated in a compaction
@ -123,15 +120,15 @@ class CompactionPicker {
// Stores the minimal range that covers all entries in inputs in
// *smallest, *largest.
// REQUIRES: inputs is not empty
void GetRange(const CompactionInputFiles& inputs,
InternalKey* smallest, InternalKey* largest);
void GetRange(const CompactionInputFiles& inputs, InternalKey* smallest,
InternalKey* largest);
// Stores the minimal range that covers all entries in inputs1 and inputs2
// in *smallest, *largest.
// REQUIRES: inputs is not empty
void GetRange(const CompactionInputFiles& inputs1,
const CompactionInputFiles& inputs2,
InternalKey* smallest, InternalKey* largest);
const CompactionInputFiles& inputs2, InternalKey* smallest,
InternalKey* largest);
// Add more files to the inputs on "level" to make sure that
// no newer version of a key is compacted to "level+1" while leaving an older
@ -166,13 +163,12 @@ class CompactionPicker {
const ImmutableCFOptions& ioptions_;
// A helper function to SanitizeCompactionInputFiles() that
// sanitizes "input_files" by adding necessary files.
// A helper function to SanitizeCompactionInputFiles() that
// sanitizes "input_files" by adding necessary files.
#ifndef ROCKSDB_LITE
virtual Status SanitizeCompactionInputFilesForAllLevels(
std::unordered_set<uint64_t>* input_files,
const ColumnFamilyMetaData& cf_meta,
const int output_level) const;
const ColumnFamilyMetaData& cf_meta, const int output_level) const;
#endif // ROCKSDB_LITE
// Keeps track of all compactions that are running on Level0.
@ -192,8 +188,8 @@ class LevelCompactionPicker : public CompactionPicker {
VersionStorageInfo* vstorage,
LogBuffer* log_buffer) override;
virtual bool NeedsCompaction(const VersionStorageInfo* vstorage) const
override;
virtual bool NeedsCompaction(
const VersionStorageInfo* vstorage) const override;
// Pick a path ID to place a newly generated file, with its level
static uint32_t GetPathId(const ImmutableCFOptions& ioptions,
@ -232,8 +228,8 @@ class UniversalCompactionPicker : public CompactionPicker {
virtual int MaxOutputLevel() const override { return NumberLevels() - 1; }
virtual bool NeedsCompaction(const VersionStorageInfo* vstorage) const
override;
virtual bool NeedsCompaction(
const VersionStorageInfo* vstorage) const override;
private:
struct SortedRun {
@ -307,19 +303,17 @@ class FIFOCompactionPicker : public CompactionPicker {
InternalKey** compaction_end, bool* manual_conflict) override;
// The maximum allowed output level. Always returns 0.
virtual int MaxOutputLevel() const override {
return 0;
}
virtual int MaxOutputLevel() const override { return 0; }
virtual bool NeedsCompaction(const VersionStorageInfo* vstorage) const
override;
virtual bool NeedsCompaction(
const VersionStorageInfo* vstorage) const override;
};
class NullCompactionPicker : public CompactionPicker {
public:
NullCompactionPicker(const ImmutableCFOptions& ioptions,
const InternalKeyComparator* icmp) :
CompactionPicker(ioptions, icmp) {}
const InternalKeyComparator* icmp)
: CompactionPicker(ioptions, icmp) {}
virtual ~NullCompactionPicker() {}
// Always return "nullptr"
@ -342,8 +336,8 @@ class NullCompactionPicker : public CompactionPicker {
}
// Always returns false.
virtual bool NeedsCompaction(const VersionStorageInfo* vstorage) const
override {
virtual bool NeedsCompaction(
const VersionStorageInfo* vstorage) const override {
return false;
}
};
@ -351,6 +345,7 @@ class NullCompactionPicker : public CompactionPicker {
CompressionType GetCompressionType(const ImmutableCFOptions& ioptions,
const VersionStorageInfo* vstorage,
const MutableCFOptions& mutable_cf_options,
int level, int base_level,
const bool enable_compression = true);

@ -467,7 +467,6 @@ TEST_F(CompactionPickerTest, NeedsCompactionFIFO) {
fifo_options_.max_table_files_size = kMaxSize;
ioptions_.compaction_options_fifo = fifo_options_;
FIFOCompactionPicker fifo_compaction_picker(ioptions_, &icmp_);
UpdateVersionStorageInfo();
// must return false when there's no files.
ASSERT_EQ(fifo_compaction_picker.NeedsCompaction(vstorage_.get()), false);

@ -241,7 +241,9 @@ static Status ValidateOptions(
return Status::OK();
}
CompressionType GetCompressionFlush(const ImmutableCFOptions& ioptions) {
CompressionType GetCompressionFlush(
const ImmutableCFOptions& ioptions,
const MutableCFOptions& mutable_cf_options) {
// Compressing memtable flushes might not help unless the sequential load
// optimization is used for leveled compaction. Otherwise the CPU and
// latency overhead is not offset by saving much space.
@ -258,7 +260,7 @@ CompressionType GetCompressionFlush(const ImmutableCFOptions& ioptions) {
}
if (can_compress) {
return ioptions.compression;
return mutable_cf_options.compression;
} else {
return kNoCompression;
}
@ -1629,6 +1631,9 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
" Level-0 table #%" PRIu64 ": started",
cfd->GetName().c_str(), meta.fd.GetNumber());
// Get the latest mutable cf options while the mutex is still locked
const MutableCFOptions mutable_cf_options =
*cfd->GetLatestMutableCFOptions();
bool paranoid_file_checks =
cfd->GetLatestMutableCFOptions()->paranoid_file_checks;
{
@ -1639,11 +1644,11 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
snapshots_.GetAll(&earliest_write_conflict_snapshot);
s = BuildTable(
dbname_, env_, *cfd->ioptions(), env_options_, cfd->table_cache(),
iter.get(), &meta, cfd->internal_comparator(),
dbname_, env_, *cfd->ioptions(), mutable_cf_options, env_options_,
cfd->table_cache(), iter.get(), &meta, cfd->internal_comparator(),
cfd->int_tbl_prop_collector_factories(), cfd->GetID(), cfd->GetName(),
snapshot_seqs, earliest_write_conflict_snapshot,
GetCompressionFlush(*cfd->ioptions()),
GetCompressionFlush(*cfd->ioptions(), mutable_cf_options),
cfd->ioptions()->compression_opts, paranoid_file_checks,
cfd->internal_stats(), TableFileCreationReason::kRecovery,
&event_logger_, job_id);
@ -1690,13 +1695,13 @@ Status DBImpl::FlushMemTableToOutputFile(
std::vector<SequenceNumber> snapshot_seqs =
snapshots_.GetAll(&earliest_write_conflict_snapshot);
FlushJob flush_job(dbname_, cfd, db_options_, mutable_cf_options,
env_options_, versions_.get(), &mutex_, &shutting_down_,
snapshot_seqs, earliest_write_conflict_snapshot,
job_context, log_buffer, directories_.GetDbDir(),
directories_.GetDataDir(0U),
GetCompressionFlush(*cfd->ioptions()), stats_,
&event_logger_, mutable_cf_options.report_bg_io_stats);
FlushJob flush_job(
dbname_, cfd, db_options_, mutable_cf_options, env_options_,
versions_.get(), &mutex_, &shutting_down_, snapshot_seqs,
earliest_write_conflict_snapshot, job_context, log_buffer,
directories_.GetDbDir(), directories_.GetDataDir(0U),
GetCompressionFlush(*cfd->ioptions(), mutable_cf_options), stats_,
&event_logger_, mutable_cf_options.report_bg_io_stats);
FileMetaData file_meta;

@ -344,6 +344,10 @@ class DBImpl : public DB {
Status TEST_GetAllImmutableCFOptions(
std::unordered_map<std::string, const ImmutableCFOptions*>* iopts_map);
// Return the lastest MutableCFOptions of of a column family
Status TEST_GetLatestMutableCFOptions(ColumnFamilyHandle* column_family,
MutableCFOptions* mutable_cf_opitons);
Cache* TEST_table_cache() { return table_cache_.get(); }
WriteController& TEST_write_controler() { return write_controller_; }

@ -168,5 +168,15 @@ uint64_t DBImpl::TEST_FindMinLogContainingOutstandingPrep() {
uint64_t DBImpl::TEST_FindMinPrepLogReferencedByMemTable() {
return FindMinPrepLogReferencedByMemTable();
}
Status DBImpl::TEST_GetLatestMutableCFOptions(
ColumnFamilyHandle* column_family, MutableCFOptions* mutable_cf_options) {
InstrumentedMutexLock l(&mutex_);
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
*mutable_cf_options = *cfh->cfd()->GetLatestMutableCFOptions();
return Status::OK();
}
} // namespace rocksdb
#endif // NDEBUG

@ -754,9 +754,8 @@ TEST_F(DBSSTTest, AddExternalSstFile) {
env_->CreateDir(sst_files_folder);
Options options = CurrentOptions();
options.env = env_;
const ImmutableCFOptions ioptions(options);
SstFileWriter sst_file_writer(EnvOptions(), ioptions, options.comparator);
SstFileWriter sst_file_writer(EnvOptions(), options, options.comparator);
// file1.sst (0 => 99)
std::string file1 = sst_files_folder + "file1.sst";
@ -934,9 +933,7 @@ TEST_F(DBSSTTest, AddExternalSstFilePurgeObsoleteFilesBug) {
env_->CreateDir(sst_files_folder);
Options options = CurrentOptions();
options.env = env_;
const ImmutableCFOptions ioptions(options);
SstFileWriter sst_file_writer(EnvOptions(), ioptions, options.comparator);
SstFileWriter sst_file_writer(EnvOptions(), options, options.comparator);
// file1.sst (0 => 500)
std::string sst_file_path = sst_files_folder + "file1.sst";
@ -985,7 +982,7 @@ TEST_F(DBSSTTest, AddExternalSstFileNoCopy) {
options.env = env_;
const ImmutableCFOptions ioptions(options);
SstFileWriter sst_file_writer(EnvOptions(), ioptions, options.comparator);
SstFileWriter sst_file_writer(EnvOptions(), options, options.comparator);
// file1.sst (0 => 99)
std::string file1 = sst_files_folder + "file1.sst";
@ -1063,7 +1060,6 @@ TEST_F(DBSSTTest, AddExternalSstFileMultiThreaded) {
do {
env_->CreateDir(sst_files_folder);
Options options = CurrentOptions();
const ImmutableCFOptions ioptions(options);
std::atomic<int> thread_num(0);
std::function<void()> write_file_func = [&]() {
@ -1071,7 +1067,7 @@ TEST_F(DBSSTTest, AddExternalSstFileMultiThreaded) {
int range_start = file_idx * keys_per_file;
int range_end = range_start + keys_per_file;
SstFileWriter sst_file_writer(EnvOptions(), ioptions, options.comparator);
SstFileWriter sst_file_writer(EnvOptions(), options, options.comparator);
ASSERT_OK(sst_file_writer.Open(file_names[file_idx]));
@ -1154,8 +1150,8 @@ TEST_F(DBSSTTest, AddExternalSstFileOverlappingRanges) {
env_->CreateDir(sst_files_folder);
Options options = CurrentOptions();
DestroyAndReopen(options);
const ImmutableCFOptions ioptions(options);
SstFileWriter sst_file_writer(EnvOptions(), ioptions, options.comparator);
SstFileWriter sst_file_writer(EnvOptions(), options, options.comparator);
printf("Option config = %d\n", option_config_);
std::vector<std::pair<int, int>> key_ranges;

@ -10,12 +10,12 @@
// Introduction of SyncPoint effectively disabled building and running this test
// in Release build.
// which is a pity, it is a good test
#include <fcntl.h>
#include <algorithm>
#include <set>
#include <thread>
#include <unordered_set>
#include <utility>
#include <fcntl.h>
#ifndef OS_WIN
#include <unistd.h>
#endif
@ -23,10 +23,10 @@
#include <alloca.h>
#endif
#include "db/filename.h"
#include "db/dbformat.h"
#include "db/db_impl.h"
#include "db/db_test_util.h"
#include "db/dbformat.h"
#include "db/filename.h"
#include "db/job_context.h"
#include "db/version_set.h"
#include "db/write_batch_internal.h"
@ -47,27 +47,27 @@
#include "rocksdb/table.h"
#include "rocksdb/table_properties.h"
#include "rocksdb/thread_status.h"
#include "rocksdb/utilities/write_batch_with_index.h"
#include "rocksdb/utilities/checkpoint.h"
#include "rocksdb/utilities/optimistic_transaction_db.h"
#include "rocksdb/utilities/write_batch_with_index.h"
#include "table/block_based_table_factory.h"
#include "table/mock_table.h"
#include "table/plain_table_factory.h"
#include "table/scoped_arena_iterator.h"
#include "util/compression.h"
#include "util/file_reader_writer.h"
#include "util/hash.h"
#include "utilities/merge_operators.h"
#include "util/logging.h"
#include "util/compression.h"
#include "util/mock_env.h"
#include "util/mutexlock.h"
#include "util/rate_limiter.h"
#include "util/string_util.h"
#include "util/sync_point.h"
#include "util/testharness.h"
#include "util/testutil.h"
#include "util/mock_env.h"
#include "util/string_util.h"
#include "util/thread_status_util.h"
#include "util/xfunc.h"
#include "utilities/merge_operators.h"
namespace rocksdb {
@ -125,8 +125,8 @@ TEST_F(DBTest, MockEnvTest) {
ASSERT_TRUE(!iterator->Valid());
delete iterator;
// TEST_FlushMemTable() is not supported in ROCKSDB_LITE
#ifndef ROCKSDB_LITE
// TEST_FlushMemTable() is not supported in ROCKSDB_LITE
#ifndef ROCKSDB_LITE
DBImpl* dbi = reinterpret_cast<DBImpl*>(db);
ASSERT_OK(dbi->TEST_FlushMemTable());
@ -135,7 +135,7 @@ TEST_F(DBTest, MockEnvTest) {
ASSERT_OK(db->Get(ReadOptions(), keys[i], &res));
ASSERT_TRUE(res == vals[i]);
}
#endif // ROCKSDB_LITE
#endif // ROCKSDB_LITE
delete db;
}
@ -322,7 +322,8 @@ TEST_F(DBTest, CompactedDB) {
// MultiGet
std::vector<std::string> values;
std::vector<Status> status_list = dbfull()->MultiGet(ReadOptions(),
std::vector<Status> status_list = dbfull()->MultiGet(
ReadOptions(),
std::vector<Slice>({Slice("aaa"), Slice("ccc"), Slice("eee"),
Slice("ggg"), Slice("iii"), Slice("kkk")}),
&values);
@ -662,8 +663,8 @@ TEST_F(DBTest, GetFromImmutableLayer) {
// Block sync calls
env_->delay_sstable_sync_.store(true, std::memory_order_release);
Put(1, "k1", std::string(100000, 'x')); // Fill memtable
Put(1, "k2", std::string(100000, 'y')); // Trigger flush
Put(1, "k1", std::string(100000, 'x')); // Fill memtable
Put(1, "k2", std::string(100000, 'y')); // Trigger flush
ASSERT_EQ("v1", Get(1, "foo"));
ASSERT_EQ("NOT_FOUND", Get(0, "foo"));
// Release sync calls
@ -3192,8 +3193,8 @@ class ModelDB : public DB {
virtual Status GetUpdatesSince(
rocksdb::SequenceNumber, unique_ptr<rocksdb::TransactionLogIterator>*,
const TransactionLogIterator::ReadOptions&
read_options = TransactionLogIterator::ReadOptions()) override {
const TransactionLogIterator::ReadOptions& read_options =
TransactionLogIterator::ReadOptions()) override {
return Status::NotSupported("Not supported in Model DB");
}
@ -3279,8 +3280,7 @@ static bool CompareIterators(int step, DB* model, DB* db,
ok && miter->Valid() && dbiter->Valid(); miter->Next(), dbiter->Next()) {
count++;
if (miter->key().compare(dbiter->key()) != 0) {
fprintf(stderr, "step %d: Key mismatch: '%s' vs. '%s'\n",
step,
fprintf(stderr, "step %d: Key mismatch: '%s' vs. '%s'\n", step,
EscapeString(miter->key()).c_str(),
EscapeString(dbiter->key()).c_str());
ok = false;
@ -3483,7 +3483,6 @@ TEST_F(DBTest, BlockBasedTablePrefixIndexTest) {
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
options.prefix_extractor.reset(NewFixedPrefixTransform(1));
Reopen(options);
ASSERT_OK(Put("k1", "v1"));
Flush();
@ -3682,7 +3681,6 @@ TEST_F(DBTest, TableOptionsSanitizeTest) {
ASSERT_OK(TryReopen(options));
}
// On Windows you can have either memory mapped file or a file
// with unbuffered access. So this asserts and does not make
// sense to run
@ -4912,6 +4910,48 @@ TEST_F(DBTest, DynamicMiscOptions) {
dbfull()->TEST_FlushMemTable(true);
// No reseek
assert_reseek_count(300, 1);
MutableCFOptions mutable_cf_options;
CreateAndReopenWithCF({"pikachu"}, options);
// Test soft_pending_compaction_bytes_limit,
// hard_pending_compaction_bytes_limit
ASSERT_OK(dbfull()->SetOptions(
handles_[1], {{"soft_pending_compaction_bytes_limit", "200"},
{"hard_pending_compaction_bytes_limit", "300"}}));
ASSERT_OK(dbfull()->TEST_GetLatestMutableCFOptions(handles_[1],
&mutable_cf_options));
ASSERT_EQ(200, mutable_cf_options.soft_pending_compaction_bytes_limit);
ASSERT_EQ(300, mutable_cf_options.hard_pending_compaction_bytes_limit);
// Test report_bg_io_stats
ASSERT_OK(
dbfull()->SetOptions(handles_[1], {{"report_bg_io_stats", "true"}}));
// sanity check
ASSERT_OK(dbfull()->TEST_GetLatestMutableCFOptions(handles_[1],
&mutable_cf_options));
ASSERT_EQ(true, mutable_cf_options.report_bg_io_stats);
// Test min_partial_merge_operands
ASSERT_OK(
dbfull()->SetOptions(handles_[1], {{"min_partial_merge_operands", "4"}}));
ASSERT_OK(dbfull()->TEST_GetLatestMutableCFOptions(handles_[1],
&mutable_cf_options));
ASSERT_EQ(4, mutable_cf_options.min_partial_merge_operands);
// Test compression
// sanity check
ASSERT_OK(dbfull()->SetOptions({{"compression", "kNoCompression"}}));
ASSERT_OK(dbfull()->TEST_GetLatestMutableCFOptions(handles_[0],
&mutable_cf_options));
ASSERT_EQ(CompressionType::kNoCompression, mutable_cf_options.compression);
ASSERT_OK(dbfull()->SetOptions({{"compression", "kSnappyCompression"}}));
ASSERT_OK(dbfull()->TEST_GetLatestMutableCFOptions(handles_[0],
&mutable_cf_options));
ASSERT_EQ(CompressionType::kSnappyCompression,
mutable_cf_options.compression);
// Test paranoid_file_checks already done in db_block_cache_test
ASSERT_OK(
dbfull()->SetOptions(handles_[1], {{"paranoid_file_checks", "true"}}));
ASSERT_OK(dbfull()->TEST_GetLatestMutableCFOptions(handles_[1],
&mutable_cf_options));
ASSERT_EQ(true, mutable_cf_options.report_bg_io_stats);
}
#endif // ROCKSDB_LITE
@ -4960,7 +5000,7 @@ TEST_F(DBTest, EncodeDecompressedBlockSizeTest) {
// iter 3 -- lz4HC
// iter 4 -- xpress
CompressionType compressions[] = {kZlibCompression, kBZip2Compression,
kLZ4Compression, kLZ4HCCompression,
kLZ4Compression, kLZ4HCCompression,
kXpressCompression};
for (auto comp : compressions) {
if (!CompressionTypeSupported(comp)) {
@ -5843,7 +5883,7 @@ TEST_F(DBTest, LastWriteBufferDelay) {
TEST_F(DBTest, FailWhenCompressionNotSupportedTest) {
CompressionType compressions[] = {kZlibCompression, kBZip2Compression,
kLZ4Compression, kLZ4HCCompression,
kLZ4Compression, kLZ4HCCompression,
kXpressCompression};
for (auto comp : compressions) {
if (!CompressionTypeSupported(comp)) {

@ -264,10 +264,10 @@ Status FlushJob::WriteLevel0Table(const autovector<MemTable*>& mems,
TEST_SYNC_POINT_CALLBACK("FlushJob::WriteLevel0Table:output_compression",
&output_compression_);
s = BuildTable(
dbname_, db_options_.env, *cfd_->ioptions(), env_options_,
cfd_->table_cache(), iter.get(), meta, cfd_->internal_comparator(),
cfd_->int_tbl_prop_collector_factories(), cfd_->GetID(),
cfd_->GetName(), existing_snapshots_,
dbname_, db_options_.env, *cfd_->ioptions(), mutable_cf_options_,
env_options_, cfd_->table_cache(), iter.get(), meta,
cfd_->internal_comparator(), cfd_->int_tbl_prop_collector_factories(),
cfd_->GetID(), cfd_->GetName(), existing_snapshots_,
earliest_write_conflict_snapshot_, output_compression_,
cfd_->ioptions()->compression_opts,
mutable_cf_options_.paranoid_file_checks, cfd_->internal_stats(),

@ -291,9 +291,11 @@ class Repairer {
ro.total_order_seek = true;
Arena arena;
ScopedArenaIterator iter(mem->NewIterator(ro, &arena));
MutableCFOptions mutable_cf_options(options_, ioptions_);
status = BuildTable(
dbname_, env_, ioptions_, env_options_, table_cache_, iter.get(),
&meta, icmp_, &int_tbl_prop_collector_factories_,
dbname_, env_, ioptions_, mutable_cf_options, env_options_,
table_cache_, iter.get(), &meta, icmp_,
&int_tbl_prop_collector_factories_,
TablePropertiesCollectorFactory::Context::kUnknownColumnFamily,
std::string() /* column_family_name */, {}, kMaxSequenceNumber,
kNoCompression, CompressionOptions(), false,

@ -75,14 +75,10 @@ struct ImmutableCFOptions {
bool purge_redundant_kvs_while_flush;
uint32_t min_partial_merge_operands;
bool disable_data_sync;
bool use_fsync;
CompressionType compression;
std::vector<CompressionType> compression_per_level;
CompressionType bottommost_compression;

@ -8,6 +8,7 @@
#include "rocksdb/env.h"
#include "rocksdb/immutable_options.h"
#include "rocksdb/types.h"
#include "util/mutable_cf_options.h"
namespace rocksdb {
@ -49,8 +50,7 @@ struct ExternalSstFileInfo {
// All keys in files generated by SstFileWriter will have sequence number = 0
class SstFileWriter {
public:
SstFileWriter(const EnvOptions& env_options,
const ImmutableCFOptions& ioptions,
SstFileWriter(const EnvOptions& env_options, const Options& options,
const Comparator* user_comparator);
~SstFileWriter();

@ -71,25 +71,27 @@ class SstFileWriter::SstFileWriterPropertiesCollectorFactory
};
struct SstFileWriter::Rep {
Rep(const EnvOptions& _env_options, const ImmutableCFOptions& _ioptions,
Rep(const EnvOptions& _env_options, const Options& options,
const Comparator* _user_comparator)
: env_options(_env_options),
ioptions(_ioptions),
ioptions(options),
mutable_cf_options(options, ioptions),
internal_comparator(_user_comparator) {}
std::unique_ptr<WritableFileWriter> file_writer;
std::unique_ptr<TableBuilder> builder;
EnvOptions env_options;
ImmutableCFOptions ioptions;
MutableCFOptions mutable_cf_options;
InternalKeyComparator internal_comparator;
ExternalSstFileInfo file_info;
std::string column_family_name;
};
SstFileWriter::SstFileWriter(const EnvOptions& env_options,
const ImmutableCFOptions& ioptions,
const Options& options,
const Comparator* user_comparator)
: rep_(new Rep(env_options, ioptions, user_comparator)) {}
: rep_(new Rep(env_options, options, user_comparator)) {}
SstFileWriter::~SstFileWriter() { delete rep_; }
@ -102,7 +104,7 @@ Status SstFileWriter::Open(const std::string& file_path) {
return s;
}
CompressionType compression_type = r->ioptions.compression;
CompressionType compression_type = r->mutable_cf_options.compression;
if (!r->ioptions.compression_per_level.empty()) {
// Use the compression of the last level if we have per level compression
compression_type = *(r->ioptions.compression_per_level.rbegin());

@ -6,8 +6,9 @@
#pragma once
#include <vector>
#include "rocksdb/options.h"
#include "rocksdb/immutable_options.h"
#include "rocksdb/options.h"
#include "util/compression.h"
namespace rocksdb {
@ -47,9 +48,9 @@ struct MutableCFOptions {
max_sequential_skip_in_iterations(
options.max_sequential_skip_in_iterations),
paranoid_file_checks(options.paranoid_file_checks),
report_bg_io_stats(options.report_bg_io_stats)
{
report_bg_io_stats(options.report_bg_io_stats),
compression(options.compression),
min_partial_merge_operands(options.min_partial_merge_operands) {
RefreshDerivedOptions(ioptions);
}
MutableCFOptions()
@ -80,7 +81,9 @@ struct MutableCFOptions {
max_subcompactions(1),
max_sequential_skip_in_iterations(0),
paranoid_file_checks(false),
report_bg_io_stats(false) {}
report_bg_io_stats(false),
compression(Snappy_Supported() ? kSnappyCompression : kNoCompression),
min_partial_merge_operands(2) {}
// Must be called after any change to MutableCFOptions
void RefreshDerivedOptions(const ImmutableCFOptions& ioptions);
@ -136,6 +139,8 @@ struct MutableCFOptions {
uint64_t max_sequential_skip_in_iterations;
bool paranoid_file_checks;
bool report_bg_io_stats;
CompressionType compression;
uint32_t min_partial_merge_operands;
// Derived options
// Per-level target file size.

@ -61,10 +61,8 @@ ImmutableCFOptions::ImmutableCFOptions(const Options& options)
advise_random_on_open(options.advise_random_on_open),
bloom_locality(options.bloom_locality),
purge_redundant_kvs_while_flush(options.purge_redundant_kvs_while_flush),
min_partial_merge_operands(options.min_partial_merge_operands),
disable_data_sync(options.disableDataSync),
use_fsync(options.use_fsync),
compression(options.compression),
compression_per_level(options.compression_per_level),
bottommost_compression(options.bottommost_compression),
compression_opts(options.compression_opts),

@ -622,6 +622,16 @@ bool ParseMiscOptions(const std::string& name, const std::string& value,
new_options->max_sequential_skip_in_iterations = ParseUint64(value);
} else if (name == "paranoid_file_checks") {
new_options->paranoid_file_checks = ParseBoolean(name, value);
} else if (name == "report_bg_io_stats") {
new_options->report_bg_io_stats = ParseBoolean(name, value);
} else if (name == "compression") {
bool is_ok = ParseEnum<CompressionType>(compression_type_string_map, value,
&new_options->compression);
if (!is_ok) {
return false;
}
} else if (name == "min_partial_merge_operands") {
new_options->min_partial_merge_operands = ParseUint32(value);
} else {
return false;
}

Loading…
Cancel
Save