diff --git a/db/builder.cc b/db/builder.cc index a932fe6fd..ab255e493 100644 --- a/db/builder.cc +++ b/db/builder.cc @@ -59,8 +59,8 @@ TableBuilder* NewTableBuilder( Status BuildTable( const std::string& dbname, Env* env, const ImmutableCFOptions& ioptions, - const EnvOptions& env_options, TableCache* table_cache, - InternalIterator* iter, FileMetaData* meta, + const MutableCFOptions& mutable_cf_options, const EnvOptions& env_options, + TableCache* table_cache, InternalIterator* iter, FileMetaData* meta, const InternalKeyComparator& internal_comparator, const std::vector>* int_tbl_prop_collector_factories, @@ -113,7 +113,7 @@ Status BuildTable( MergeHelper merge(env, internal_comparator.user_comparator(), ioptions.merge_operator, nullptr, ioptions.info_log, - ioptions.min_partial_merge_operands, + mutable_cf_options.min_partial_merge_operands, true /* internal key corruption is not ok */, snapshots.empty() ? 0 : snapshots.back()); diff --git a/db/builder.h b/db/builder.h index 7ef1a29d8..62aa717c6 100644 --- a/db/builder.h +++ b/db/builder.h @@ -19,6 +19,7 @@ #include "rocksdb/table_properties.h" #include "rocksdb/types.h" #include "util/event_logger.h" +#include "util/mutable_cf_options.h" namespace rocksdb { @@ -61,8 +62,8 @@ TableBuilder* NewTableBuilder( // by column_family_id, or empty string if unknown. extern Status BuildTable( const std::string& dbname, Env* env, const ImmutableCFOptions& options, - const EnvOptions& env_options, TableCache* table_cache, - InternalIterator* iter, FileMetaData* meta, + const MutableCFOptions& mutable_cf_options, const EnvOptions& env_options, + TableCache* table_cache, InternalIterator* iter, FileMetaData* meta, const InternalKeyComparator& internal_comparator, const std::vector>* int_tbl_prop_collector_factories, diff --git a/db/compaction.cc b/db/compaction.cc index c70936573..b74069b6b 100644 --- a/db/compaction.cc +++ b/db/compaction.cc @@ -16,8 +16,8 @@ #include #include -#include "rocksdb/compaction_filter.h" #include "db/column_family.h" +#include "rocksdb/compaction_filter.h" #include "util/logging.h" #include "util/sync_point.h" @@ -205,8 +205,9 @@ Compaction::~Compaction() { bool Compaction::InputCompressionMatchesOutput() const { VersionStorageInfo* vstorage = input_version_->storage_info(); int base_level = vstorage->base_level(); - bool matches = (GetCompressionType(*cfd_->ioptions(), vstorage, start_level_, - base_level) == output_compression_); + bool matches = + (GetCompressionType(*cfd_->ioptions(), vstorage, mutable_cf_options_, + start_level_, base_level) == output_compression_); if (matches) { TEST_SYNC_POINT("Compaction::InputCompressionMatchesOutput:Matches"); return true; @@ -292,8 +293,8 @@ bool Compaction::KeyNotExistsBeyondOutputLevel( void Compaction::MarkFilesBeingCompacted(bool mark_as_compacted) { for (size_t i = 0; i < num_input_levels(); i++) { for (size_t j = 0; j < inputs_[i].size(); j++) { - assert(mark_as_compacted ? !inputs_[i][j]->being_compacted : - inputs_[i][j]->being_compacted); + assert(mark_as_compacted ? !inputs_[i][j]->being_compacted + : inputs_[i][j]->being_compacted); inputs_[i][j]->being_compacted = mark_as_compacted; } } @@ -368,10 +369,8 @@ int InputSummary(const std::vector& files, char* output, void Compaction::Summary(char* output, int len) { int write = - snprintf(output, len, "Base version %" PRIu64 - " Base level %d, inputs: [", - input_version_->GetVersionNumber(), - start_level_); + snprintf(output, len, "Base version %" PRIu64 " Base level %d, inputs: [", + input_version_->GetVersionNumber(), start_level_); if (write < 0 || write >= len) { return; } diff --git a/db/compaction_job.cc b/db/compaction_job.cc index c4eb3a12a..d86167d5f 100644 --- a/db/compaction_job.cc +++ b/db/compaction_job.cc @@ -471,9 +471,9 @@ void CompactionJob::GenSubcompactionBoundaries() { // Group the ranges into subcompactions const double min_file_fill_percent = 4.0 / 5; - uint64_t max_output_files = static_cast(std::ceil( - sum / min_file_fill_percent / - cfd->GetCurrentMutableCFOptions()->MaxFileSizeForLevel(out_lvl))); + uint64_t max_output_files = static_cast( + std::ceil(sum / min_file_fill_percent / + c->mutable_cf_options()->MaxFileSizeForLevel(out_lvl))); uint64_t subcompactions = std::min({static_cast(ranges.size()), static_cast(db_options_.max_subcompactions), @@ -704,7 +704,7 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { MergeHelper merge( env_, cfd->user_comparator(), cfd->ioptions()->merge_operator, compaction_filter, db_options_.info_log.get(), - cfd->ioptions()->min_partial_merge_operands, + mutable_cf_options->min_partial_merge_operands, false /* internal key corruption is expected */, existing_snapshots_.empty() ? 0 : existing_snapshots_.back(), compact_->compaction->level(), db_options_.statistics.get()); diff --git a/db/compaction_job.h b/db/compaction_job.h index c6edefbe0..b2a592ea9 100644 --- a/db/compaction_job.h +++ b/db/compaction_job.h @@ -122,6 +122,7 @@ class CompactionJob { const std::string& dbname_; const DBOptions& db_options_; const EnvOptions& env_options_; + Env* env_; VersionSet* versions_; std::atomic* shutting_down_; diff --git a/db/compaction_picker.cc b/db/compaction_picker.cc index 1a36930bc..794986e5b 100644 --- a/db/compaction_picker.cc +++ b/db/compaction_picker.cc @@ -70,7 +70,8 @@ struct UserKeyComparator { }; typedef std::priority_queue, - UserKeyComparator> SmallestKeyHeap; + UserKeyComparator> + SmallestKeyHeap; // This function creates the heap that is used to find if the files are // overlapping during universal compaction when the allow_trivial_move @@ -110,6 +111,7 @@ SmallestKeyHeap create_level_heap(Compaction* c, const Comparator* ucmp) { // Otherwise, the compression type is determined based on options and level. CompressionType GetCompressionType(const ImmutableCFOptions& ioptions, const VersionStorageInfo* vstorage, + const MutableCFOptions& mutable_cf_options, int level, int base_level, const bool enable_compression) { if (!enable_compression) { @@ -123,7 +125,7 @@ CompressionType GetCompressionType(const ImmutableCFOptions& ioptions, level > base_level && level >= (vstorage->num_non_empty_levels() - 1)) { return ioptions.bottommost_compression; } - // If the use has specified a different compression level for each level, + // If the user has specified a different compression level for each level, // then pick the compression for that level. if (!ioptions.compression_per_level.empty()) { assert(level == 0 || level >= base_level); @@ -137,7 +139,7 @@ CompressionType GetCompressionType(const ImmutableCFOptions& ioptions, // specified compression levels, use the last value. return ioptions.compression_per_level[std::max(0, std::min(idx, n))]; } else { - return ioptions.compression; + return mutable_cf_options.compression; } } @@ -198,10 +200,9 @@ void CompactionPicker::GetRange(const CompactionInputFiles& inputs1, InternalKey smallest1, smallest2, largest1, largest2; GetRange(inputs1, &smallest1, &largest1); GetRange(inputs2, &smallest2, &largest2); - *smallest = icmp_->Compare(smallest1, smallest2) < 0 ? - smallest1 : smallest2; - *largest = icmp_->Compare(largest1, largest2) < 0 ? - largest2 : largest1; + *smallest = + icmp_->Compare(smallest1, smallest2) < 0 ? smallest1 : smallest2; + *largest = icmp_->Compare(largest1, largest2) < 0 ? largest2 : largest1; } } @@ -266,9 +267,9 @@ Compaction* CompactionPicker::FormCompaction( VersionStorageInfo* vstorage, const MutableCFOptions& mutable_cf_options, uint32_t output_path_id) { uint64_t max_grandparent_overlap_bytes = - output_level + 1 < vstorage->num_levels() ? - mutable_cf_options.MaxGrandParentOverlapBytes(output_level + 1) : - std::numeric_limits::max(); + output_level + 1 < vstorage->num_levels() + ? mutable_cf_options.MaxGrandParentOverlapBytes(output_level + 1) + : std::numeric_limits::max(); assert(input_files.size()); // TODO(rven ): we might be able to run concurrent level 0 compaction @@ -292,8 +293,7 @@ Compaction* CompactionPicker::FormCompaction( Status CompactionPicker::GetCompactionInputsFromFileNumbers( std::vector* input_files, - std::unordered_set* input_set, - const VersionStorageInfo* vstorage, + std::unordered_set* input_set, const VersionStorageInfo* vstorage, const CompactionOptions& compact_options) const { if (input_set->size() == 0U) { return Status::InvalidArgument( @@ -331,8 +331,8 @@ Status CompactionPicker::GetCompactionInputsFromFileNumbers( return Status::InvalidArgument(message); } - for (int level = first_non_empty_level; - level <= last_non_empty_level; ++level) { + for (int level = first_non_empty_level; level <= last_non_empty_level; + ++level) { matched_input_files[level].level = level; input_files->emplace_back(std::move(matched_input_files[level])); } @@ -340,8 +340,6 @@ Status CompactionPicker::GetCompactionInputsFromFileNumbers( return Status::OK(); } - - // Returns true if any one of the parent files are being compacted bool CompactionPicker::RangeInCompaction(VersionStorageInfo* vstorage, const InternalKey* smallest, @@ -513,7 +511,8 @@ Compaction* CompactionPicker::CompactRange( vstorage, mutable_cf_options, std::move(inputs), output_level, mutable_cf_options.MaxFileSizeForLevel(output_level), /* max_grandparent_overlap_bytes */ LLONG_MAX, output_path_id, - GetCompressionType(ioptions_, vstorage, output_level, 1), + GetCompressionType(ioptions_, vstorage, mutable_cf_options, + output_level, 1), /* grandparents */ {}, /* is manual */ true); if (start_level == 0) { level0_compactions_in_progress_.insert(c); @@ -550,7 +549,7 @@ Compaction* CompactionPicker::CompactRange( // two files overlap. if (input_level > 0) { const uint64_t limit = mutable_cf_options.MaxFileSizeForLevel(input_level) * - mutable_cf_options.source_compaction_factor; + mutable_cf_options.source_compaction_factor; uint64_t total = 0; for (size_t i = 0; i + 1 < inputs.size(); ++i) { uint64_t s = inputs[i]->compensated_file_size; @@ -613,8 +612,9 @@ Compaction* CompactionPicker::CompactRange( vstorage, mutable_cf_options, std::move(compaction_inputs), output_level, mutable_cf_options.MaxFileSizeForLevel(output_level), mutable_cf_options.MaxGrandParentOverlapBytes(input_level), - output_path_id, GetCompressionType(ioptions_, vstorage, output_level, - vstorage->base_level()), + output_path_id, + GetCompressionType(ioptions_, vstorage, mutable_cf_options, output_level, + vstorage->base_level()), std::move(grandparents), /* is manual compaction */ true); TEST_SYNC_POINT_CALLBACK("CompactionPicker::CompactRange:Return", compaction); @@ -638,9 +638,8 @@ Compaction* CompactionPicker::CompactRange( #ifndef ROCKSDB_LITE namespace { // Test whether two files have overlapping key-ranges. -bool HaveOverlappingKeyRanges( - const Comparator* c, - const SstFileMetaData& a, const SstFileMetaData& b) { +bool HaveOverlappingKeyRanges(const Comparator* c, const SstFileMetaData& a, + const SstFileMetaData& b) { if (c->Compare(a.smallestkey, b.smallestkey) >= 0) { if (c->Compare(a.smallestkey, b.largestkey) <= 0) { // b.smallestkey <= a.smallestkey <= b.largestkey @@ -664,9 +663,8 @@ bool HaveOverlappingKeyRanges( } // namespace Status CompactionPicker::SanitizeCompactionInputFilesForAllLevels( - std::unordered_set* input_files, - const ColumnFamilyMetaData& cf_meta, - const int output_level) const { + std::unordered_set* input_files, + const ColumnFamilyMetaData& cf_meta, const int output_level) const { auto& levels = cf_meta.levels; auto comparator = icmp_->user_comparator(); @@ -719,18 +717,17 @@ Status CompactionPicker::SanitizeCompactionInputFilesForAllLevels( // has overlapping key-range with other non-compaction input // files in the same level. while (first_included > 0) { - if (comparator->Compare( - current_files[first_included - 1].largestkey, - current_files[first_included].smallestkey) < 0) { + if (comparator->Compare(current_files[first_included - 1].largestkey, + current_files[first_included].smallestkey) < + 0) { break; } first_included--; } while (last_included < static_cast(current_files.size()) - 1) { - if (comparator->Compare( - current_files[last_included + 1].smallestkey, - current_files[last_included].largestkey) > 0) { + if (comparator->Compare(current_files[last_included + 1].smallestkey, + current_files[last_included].largestkey) > 0) { break; } last_included++; @@ -740,33 +737,31 @@ Status CompactionPicker::SanitizeCompactionInputFilesForAllLevels( // include all files between the first and the last compaction input files. for (int f = first_included; f <= last_included; ++f) { if (current_files[f].being_compacted) { - return Status::Aborted( - "Necessary compaction input file " + current_files[f].name + - " is currently being compacted."); + return Status::Aborted("Necessary compaction input file " + + current_files[f].name + + " is currently being compacted."); } - input_files->insert( - TableFileNameToNumber(current_files[f].name)); + input_files->insert(TableFileNameToNumber(current_files[f].name)); } // update smallest and largest key if (l == 0) { for (int f = first_included; f <= last_included; ++f) { - if (comparator->Compare( - smallestkey, current_files[f].smallestkey) > 0) { + if (comparator->Compare(smallestkey, current_files[f].smallestkey) > + 0) { smallestkey = current_files[f].smallestkey; } - if (comparator->Compare( - largestkey, current_files[f].largestkey) < 0) { + if (comparator->Compare(largestkey, current_files[f].largestkey) < 0) { largestkey = current_files[f].largestkey; } } } else { - if (comparator->Compare( - smallestkey, current_files[first_included].smallestkey) > 0) { + if (comparator->Compare(smallestkey, + current_files[first_included].smallestkey) > 0) { smallestkey = current_files[first_included].smallestkey; } - if (comparator->Compare( - largestkey, current_files[last_included].largestkey) < 0) { + if (comparator->Compare(largestkey, + current_files[last_included].largestkey) < 0) { largestkey = current_files[last_included].largestkey; } } @@ -783,16 +778,15 @@ Status CompactionPicker::SanitizeCompactionInputFilesForAllLevels( // time and not by key for (int m = std::max(l, 1); m <= output_level; ++m) { for (auto& next_lv_file : levels[m].files) { - if (HaveOverlappingKeyRanges( - comparator, aggregated_file_meta, next_lv_file)) { + if (HaveOverlappingKeyRanges(comparator, aggregated_file_meta, + next_lv_file)) { if (next_lv_file.being_compacted) { return Status::Aborted( "File " + next_lv_file.name + " that has overlapping key range with one of the compaction " " input file is currently being compacted."); } - input_files->insert( - TableFileNameToNumber(next_lv_file.name)); + input_files->insert(TableFileNameToNumber(next_lv_file.name)); } } } @@ -802,28 +796,25 @@ Status CompactionPicker::SanitizeCompactionInputFilesForAllLevels( Status CompactionPicker::SanitizeCompactionInputFiles( std::unordered_set* input_files, - const ColumnFamilyMetaData& cf_meta, - const int output_level) const { + const ColumnFamilyMetaData& cf_meta, const int output_level) const { assert(static_cast(cf_meta.levels.size()) - 1 == cf_meta.levels[cf_meta.levels.size() - 1].level); if (output_level >= static_cast(cf_meta.levels.size())) { return Status::InvalidArgument( "Output level for column family " + cf_meta.name + " must between [0, " + - ToString(cf_meta.levels[cf_meta.levels.size() - 1].level) + - "]."); + ToString(cf_meta.levels[cf_meta.levels.size() - 1].level) + "]."); } if (output_level > MaxOutputLevel()) { return Status::InvalidArgument( "Exceed the maximum output level defined by " "the current compaction algorithm --- " + - ToString(MaxOutputLevel())); + ToString(MaxOutputLevel())); } if (output_level < 0) { - return Status::InvalidArgument( - "Output level cannot be negative."); + return Status::InvalidArgument("Output level cannot be negative."); } if (input_files->size() == 0) { @@ -831,8 +822,8 @@ Status CompactionPicker::SanitizeCompactionInputFiles( "A compaction must contain at least one file."); } - Status s = SanitizeCompactionInputFilesForAllLevels( - input_files, cf_meta, output_level); + Status s = SanitizeCompactionInputFilesForAllLevels(input_files, cf_meta, + output_level); if (!s.ok()) { return s; @@ -846,10 +837,9 @@ Status CompactionPicker::SanitizeCompactionInputFiles( for (auto file_meta : level_meta.files) { if (file_num == TableFileNameToNumber(file_meta.name)) { if (file_meta.being_compacted) { - return Status::Aborted( - "Specified compaction input file " + - MakeTableFileName("", file_num) + - " is already being compacted."); + return Status::Aborted("Specified compaction input file " + + MakeTableFileName("", file_num) + + " is already being compacted."); } found = true; break; @@ -861,8 +851,7 @@ Status CompactionPicker::SanitizeCompactionInputFiles( } if (!found) { return Status::InvalidArgument( - "Specified compaction input file " + - MakeTableFileName("", file_num) + + "Specified compaction input file " + MakeTableFileName("", file_num) + " does not exist in column family " + cf_meta.name + "."); } } @@ -871,8 +860,8 @@ Status CompactionPicker::SanitizeCompactionInputFiles( } #endif // !ROCKSDB_LITE -bool LevelCompactionPicker::NeedsCompaction(const VersionStorageInfo* vstorage) - const { +bool LevelCompactionPicker::NeedsCompaction( + const VersionStorageInfo* vstorage) const { if (!vstorage->FilesMarkedForCompaction().empty()) { return true; } @@ -1018,7 +1007,7 @@ Compaction* LevelCompactionPicker::PickCompaction( CompactionInputFiles output_level_inputs; output_level_inputs.level = output_level; if (!SetupOtherInputs(cf_name, mutable_cf_options, vstorage, &inputs, - &output_level_inputs, &parent_index, base_index)) { + &output_level_inputs, &parent_index, base_index)) { return nullptr; } @@ -1034,7 +1023,7 @@ Compaction* LevelCompactionPicker::PickCompaction( mutable_cf_options.MaxFileSizeForLevel(output_level), mutable_cf_options.MaxGrandParentOverlapBytes(level), GetPathId(ioptions_, mutable_cf_options, output_level), - GetCompressionType(ioptions_, vstorage, output_level, + GetCompressionType(ioptions_, vstorage, mutable_cf_options, output_level, vstorage->base_level()), std::move(grandparents), is_manual, score, false /* deletion_compaction */, compaction_reason); @@ -1448,8 +1437,9 @@ uint32_t UniversalCompactionPicker::GetPathId( // considered in this algorithm. So the target size can be violated in // that case. We need to improve it. uint64_t accumulated_size = 0; - uint64_t future_size = file_size * - (100 - ioptions.compaction_options_universal.size_ratio) / 100; + uint64_t future_size = + file_size * (100 - ioptions.compaction_options_universal.size_ratio) / + 100; uint32_t p = 0; assert(!ioptions.db_paths.empty()); for (; p < ioptions.db_paths.size() - 1; p++) { @@ -1473,17 +1463,17 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp( unsigned int max_number_of_files_to_compact, const std::vector& sorted_runs, LogBuffer* log_buffer) { unsigned int min_merge_width = - ioptions_.compaction_options_universal.min_merge_width; + ioptions_.compaction_options_universal.min_merge_width; unsigned int max_merge_width = - ioptions_.compaction_options_universal.max_merge_width; + ioptions_.compaction_options_universal.max_merge_width; const SortedRun* sr = nullptr; bool done = false; size_t start_index = 0; unsigned int candidate_count = 0; - unsigned int max_files_to_compact = std::min(max_merge_width, - max_number_of_files_to_compact); + unsigned int max_files_to_compact = + std::min(max_merge_width, max_number_of_files_to_compact); min_merge_width = std::max(min_merge_width, 2U); // Caller checks the size before executing this function. This invariant is @@ -1595,7 +1585,7 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp( uint64_t older_file_size = 0; for (size_t i = sorted_runs.size() - 1; i >= first_index_after; i--) { older_file_size += sorted_runs[i].size; - if (older_file_size * 100L >= total_size * (long) ratio_to_compress) { + if (older_file_size * 100L >= total_size * (long)ratio_to_compress) { enable_compression = false; break; } @@ -1647,8 +1637,8 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp( return new Compaction( vstorage, mutable_cf_options, std::move(inputs), output_level, mutable_cf_options.MaxFileSizeForLevel(output_level), LLONG_MAX, path_id, - GetCompressionType(ioptions_, vstorage, start_level, 1, - enable_compression), + GetCompressionType(ioptions_, vstorage, mutable_cf_options, start_level, + 1, enable_compression), /* grandparents */ {}, /* is manual */ false, score, false /* deletion_compaction */, compaction_reason); } @@ -1664,8 +1654,8 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp( VersionStorageInfo* vstorage, double score, const std::vector& sorted_runs, LogBuffer* log_buffer) { // percentage flexibilty while reducing size amplification - uint64_t ratio = ioptions_.compaction_options_universal. - max_size_amplification_percent; + uint64_t ratio = + ioptions_.compaction_options_universal.max_size_amplification_percent; unsigned int candidate_count = 0; uint64_t candidate_size = 0; @@ -1676,7 +1666,7 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp( for (size_t loop = 0; loop < sorted_runs.size() - 1; loop++) { sr = &sorted_runs[loop]; if (!sr->being_compacted) { - start_index = loop; // Consider this as the first candidate. + start_index = loop; // Consider this as the first candidate. break; } char file_num_buf[kFormatFileNumberBufSize]; @@ -1688,7 +1678,7 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp( } if (sr == nullptr) { - return nullptr; // no candidate files + return nullptr; // no candidate files } { char file_num_buf[kFormatFileNumberBufSize]; @@ -1773,14 +1763,15 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp( vstorage->num_levels() - 1, mutable_cf_options.MaxFileSizeForLevel(vstorage->num_levels() - 1), /* max_grandparent_overlap_bytes */ LLONG_MAX, path_id, - GetCompressionType(ioptions_, vstorage, vstorage->num_levels() - 1, 1), + GetCompressionType(ioptions_, vstorage, mutable_cf_options, + vstorage->num_levels() - 1, 1), /* grandparents */ {}, /* is manual */ false, score, false /* deletion_compaction */, CompactionReason::kUniversalSizeAmplification); } -bool FIFOCompactionPicker::NeedsCompaction(const VersionStorageInfo* vstorage) - const { +bool FIFOCompactionPicker::NeedsCompaction( + const VersionStorageInfo* vstorage) const { const int kLevel0 = 0; return vstorage->CompactionScore(kLevel0) >= 1; } diff --git a/db/compaction_picker.h b/db/compaction_picker.h index c1b0fabbc..fca731959 100644 --- a/db/compaction_picker.h +++ b/db/compaction_picker.h @@ -63,23 +63,20 @@ class CompactionPicker { InternalKey** compaction_end, bool* manual_conflict); // The maximum allowed output level. Default value is NumberLevels() - 1. - virtual int MaxOutputLevel() const { - return NumberLevels() - 1; - } + virtual int MaxOutputLevel() const { return NumberLevels() - 1; } virtual bool NeedsCompaction(const VersionStorageInfo* vstorage) const = 0; - // Sanitize the input set of compaction input files. - // When the input parameters do not describe a valid compaction, the - // function will try to fix the input_files by adding necessary - // files. If it's not possible to conver an invalid input_files - // into a valid one by adding more files, the function will return a - // non-ok status with specific reason. +// Sanitize the input set of compaction input files. +// When the input parameters do not describe a valid compaction, the +// function will try to fix the input_files by adding necessary +// files. If it's not possible to conver an invalid input_files +// into a valid one by adding more files, the function will return a +// non-ok status with specific reason. #ifndef ROCKSDB_LITE - Status SanitizeCompactionInputFiles( - std::unordered_set* input_files, - const ColumnFamilyMetaData& cf_meta, - const int output_level) const; + Status SanitizeCompactionInputFiles(std::unordered_set* input_files, + const ColumnFamilyMetaData& cf_meta, + const int output_level) const; #endif // ROCKSDB_LITE // Free up the files that participated in a compaction @@ -123,15 +120,15 @@ class CompactionPicker { // Stores the minimal range that covers all entries in inputs in // *smallest, *largest. // REQUIRES: inputs is not empty - void GetRange(const CompactionInputFiles& inputs, - InternalKey* smallest, InternalKey* largest); + void GetRange(const CompactionInputFiles& inputs, InternalKey* smallest, + InternalKey* largest); // Stores the minimal range that covers all entries in inputs1 and inputs2 // in *smallest, *largest. // REQUIRES: inputs is not empty void GetRange(const CompactionInputFiles& inputs1, - const CompactionInputFiles& inputs2, - InternalKey* smallest, InternalKey* largest); + const CompactionInputFiles& inputs2, InternalKey* smallest, + InternalKey* largest); // Add more files to the inputs on "level" to make sure that // no newer version of a key is compacted to "level+1" while leaving an older @@ -166,13 +163,12 @@ class CompactionPicker { const ImmutableCFOptions& ioptions_; - // A helper function to SanitizeCompactionInputFiles() that - // sanitizes "input_files" by adding necessary files. +// A helper function to SanitizeCompactionInputFiles() that +// sanitizes "input_files" by adding necessary files. #ifndef ROCKSDB_LITE virtual Status SanitizeCompactionInputFilesForAllLevels( std::unordered_set* input_files, - const ColumnFamilyMetaData& cf_meta, - const int output_level) const; + const ColumnFamilyMetaData& cf_meta, const int output_level) const; #endif // ROCKSDB_LITE // Keeps track of all compactions that are running on Level0. @@ -192,8 +188,8 @@ class LevelCompactionPicker : public CompactionPicker { VersionStorageInfo* vstorage, LogBuffer* log_buffer) override; - virtual bool NeedsCompaction(const VersionStorageInfo* vstorage) const - override; + virtual bool NeedsCompaction( + const VersionStorageInfo* vstorage) const override; // Pick a path ID to place a newly generated file, with its level static uint32_t GetPathId(const ImmutableCFOptions& ioptions, @@ -232,8 +228,8 @@ class UniversalCompactionPicker : public CompactionPicker { virtual int MaxOutputLevel() const override { return NumberLevels() - 1; } - virtual bool NeedsCompaction(const VersionStorageInfo* vstorage) const - override; + virtual bool NeedsCompaction( + const VersionStorageInfo* vstorage) const override; private: struct SortedRun { @@ -307,19 +303,17 @@ class FIFOCompactionPicker : public CompactionPicker { InternalKey** compaction_end, bool* manual_conflict) override; // The maximum allowed output level. Always returns 0. - virtual int MaxOutputLevel() const override { - return 0; - } + virtual int MaxOutputLevel() const override { return 0; } - virtual bool NeedsCompaction(const VersionStorageInfo* vstorage) const - override; + virtual bool NeedsCompaction( + const VersionStorageInfo* vstorage) const override; }; class NullCompactionPicker : public CompactionPicker { public: NullCompactionPicker(const ImmutableCFOptions& ioptions, - const InternalKeyComparator* icmp) : - CompactionPicker(ioptions, icmp) {} + const InternalKeyComparator* icmp) + : CompactionPicker(ioptions, icmp) {} virtual ~NullCompactionPicker() {} // Always return "nullptr" @@ -342,8 +336,8 @@ class NullCompactionPicker : public CompactionPicker { } // Always returns false. - virtual bool NeedsCompaction(const VersionStorageInfo* vstorage) const - override { + virtual bool NeedsCompaction( + const VersionStorageInfo* vstorage) const override { return false; } }; @@ -351,6 +345,7 @@ class NullCompactionPicker : public CompactionPicker { CompressionType GetCompressionType(const ImmutableCFOptions& ioptions, const VersionStorageInfo* vstorage, + const MutableCFOptions& mutable_cf_options, int level, int base_level, const bool enable_compression = true); diff --git a/db/compaction_picker_test.cc b/db/compaction_picker_test.cc index cdad513ea..571dca4a6 100644 --- a/db/compaction_picker_test.cc +++ b/db/compaction_picker_test.cc @@ -467,7 +467,6 @@ TEST_F(CompactionPickerTest, NeedsCompactionFIFO) { fifo_options_.max_table_files_size = kMaxSize; ioptions_.compaction_options_fifo = fifo_options_; FIFOCompactionPicker fifo_compaction_picker(ioptions_, &icmp_); - UpdateVersionStorageInfo(); // must return false when there's no files. ASSERT_EQ(fifo_compaction_picker.NeedsCompaction(vstorage_.get()), false); diff --git a/db/db_impl.cc b/db/db_impl.cc index 69fdfa7dd..bc9f18177 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -241,7 +241,9 @@ static Status ValidateOptions( return Status::OK(); } -CompressionType GetCompressionFlush(const ImmutableCFOptions& ioptions) { +CompressionType GetCompressionFlush( + const ImmutableCFOptions& ioptions, + const MutableCFOptions& mutable_cf_options) { // Compressing memtable flushes might not help unless the sequential load // optimization is used for leveled compaction. Otherwise the CPU and // latency overhead is not offset by saving much space. @@ -258,7 +260,7 @@ CompressionType GetCompressionFlush(const ImmutableCFOptions& ioptions) { } if (can_compress) { - return ioptions.compression; + return mutable_cf_options.compression; } else { return kNoCompression; } @@ -1629,6 +1631,9 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd, " Level-0 table #%" PRIu64 ": started", cfd->GetName().c_str(), meta.fd.GetNumber()); + // Get the latest mutable cf options while the mutex is still locked + const MutableCFOptions mutable_cf_options = + *cfd->GetLatestMutableCFOptions(); bool paranoid_file_checks = cfd->GetLatestMutableCFOptions()->paranoid_file_checks; { @@ -1639,11 +1644,11 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd, snapshots_.GetAll(&earliest_write_conflict_snapshot); s = BuildTable( - dbname_, env_, *cfd->ioptions(), env_options_, cfd->table_cache(), - iter.get(), &meta, cfd->internal_comparator(), + dbname_, env_, *cfd->ioptions(), mutable_cf_options, env_options_, + cfd->table_cache(), iter.get(), &meta, cfd->internal_comparator(), cfd->int_tbl_prop_collector_factories(), cfd->GetID(), cfd->GetName(), snapshot_seqs, earliest_write_conflict_snapshot, - GetCompressionFlush(*cfd->ioptions()), + GetCompressionFlush(*cfd->ioptions(), mutable_cf_options), cfd->ioptions()->compression_opts, paranoid_file_checks, cfd->internal_stats(), TableFileCreationReason::kRecovery, &event_logger_, job_id); @@ -1690,13 +1695,13 @@ Status DBImpl::FlushMemTableToOutputFile( std::vector snapshot_seqs = snapshots_.GetAll(&earliest_write_conflict_snapshot); - FlushJob flush_job(dbname_, cfd, db_options_, mutable_cf_options, - env_options_, versions_.get(), &mutex_, &shutting_down_, - snapshot_seqs, earliest_write_conflict_snapshot, - job_context, log_buffer, directories_.GetDbDir(), - directories_.GetDataDir(0U), - GetCompressionFlush(*cfd->ioptions()), stats_, - &event_logger_, mutable_cf_options.report_bg_io_stats); + FlushJob flush_job( + dbname_, cfd, db_options_, mutable_cf_options, env_options_, + versions_.get(), &mutex_, &shutting_down_, snapshot_seqs, + earliest_write_conflict_snapshot, job_context, log_buffer, + directories_.GetDbDir(), directories_.GetDataDir(0U), + GetCompressionFlush(*cfd->ioptions(), mutable_cf_options), stats_, + &event_logger_, mutable_cf_options.report_bg_io_stats); FileMetaData file_meta; diff --git a/db/db_impl.h b/db/db_impl.h index 99fbf6963..7e8c05a12 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -344,6 +344,10 @@ class DBImpl : public DB { Status TEST_GetAllImmutableCFOptions( std::unordered_map* iopts_map); + // Return the lastest MutableCFOptions of of a column family + Status TEST_GetLatestMutableCFOptions(ColumnFamilyHandle* column_family, + MutableCFOptions* mutable_cf_opitons); + Cache* TEST_table_cache() { return table_cache_.get(); } WriteController& TEST_write_controler() { return write_controller_; } diff --git a/db/db_impl_debug.cc b/db/db_impl_debug.cc index a62aa7546..37a58a307 100644 --- a/db/db_impl_debug.cc +++ b/db/db_impl_debug.cc @@ -168,5 +168,15 @@ uint64_t DBImpl::TEST_FindMinLogContainingOutstandingPrep() { uint64_t DBImpl::TEST_FindMinPrepLogReferencedByMemTable() { return FindMinPrepLogReferencedByMemTable(); } + +Status DBImpl::TEST_GetLatestMutableCFOptions( + ColumnFamilyHandle* column_family, MutableCFOptions* mutable_cf_options) { + InstrumentedMutexLock l(&mutex_); + + auto cfh = reinterpret_cast(column_family); + *mutable_cf_options = *cfh->cfd()->GetLatestMutableCFOptions(); + return Status::OK(); +} + } // namespace rocksdb #endif // NDEBUG diff --git a/db/db_sst_test.cc b/db/db_sst_test.cc index a77c888ea..1a7e98163 100644 --- a/db/db_sst_test.cc +++ b/db/db_sst_test.cc @@ -754,9 +754,8 @@ TEST_F(DBSSTTest, AddExternalSstFile) { env_->CreateDir(sst_files_folder); Options options = CurrentOptions(); options.env = env_; - const ImmutableCFOptions ioptions(options); - SstFileWriter sst_file_writer(EnvOptions(), ioptions, options.comparator); + SstFileWriter sst_file_writer(EnvOptions(), options, options.comparator); // file1.sst (0 => 99) std::string file1 = sst_files_folder + "file1.sst"; @@ -934,9 +933,7 @@ TEST_F(DBSSTTest, AddExternalSstFilePurgeObsoleteFilesBug) { env_->CreateDir(sst_files_folder); Options options = CurrentOptions(); options.env = env_; - const ImmutableCFOptions ioptions(options); - - SstFileWriter sst_file_writer(EnvOptions(), ioptions, options.comparator); + SstFileWriter sst_file_writer(EnvOptions(), options, options.comparator); // file1.sst (0 => 500) std::string sst_file_path = sst_files_folder + "file1.sst"; @@ -985,7 +982,7 @@ TEST_F(DBSSTTest, AddExternalSstFileNoCopy) { options.env = env_; const ImmutableCFOptions ioptions(options); - SstFileWriter sst_file_writer(EnvOptions(), ioptions, options.comparator); + SstFileWriter sst_file_writer(EnvOptions(), options, options.comparator); // file1.sst (0 => 99) std::string file1 = sst_files_folder + "file1.sst"; @@ -1063,7 +1060,6 @@ TEST_F(DBSSTTest, AddExternalSstFileMultiThreaded) { do { env_->CreateDir(sst_files_folder); Options options = CurrentOptions(); - const ImmutableCFOptions ioptions(options); std::atomic thread_num(0); std::function write_file_func = [&]() { @@ -1071,7 +1067,7 @@ TEST_F(DBSSTTest, AddExternalSstFileMultiThreaded) { int range_start = file_idx * keys_per_file; int range_end = range_start + keys_per_file; - SstFileWriter sst_file_writer(EnvOptions(), ioptions, options.comparator); + SstFileWriter sst_file_writer(EnvOptions(), options, options.comparator); ASSERT_OK(sst_file_writer.Open(file_names[file_idx])); @@ -1154,8 +1150,8 @@ TEST_F(DBSSTTest, AddExternalSstFileOverlappingRanges) { env_->CreateDir(sst_files_folder); Options options = CurrentOptions(); DestroyAndReopen(options); - const ImmutableCFOptions ioptions(options); - SstFileWriter sst_file_writer(EnvOptions(), ioptions, options.comparator); + + SstFileWriter sst_file_writer(EnvOptions(), options, options.comparator); printf("Option config = %d\n", option_config_); std::vector> key_ranges; diff --git a/db/db_test.cc b/db/db_test.cc index 38ab6b794..3a496e65c 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -10,12 +10,12 @@ // Introduction of SyncPoint effectively disabled building and running this test // in Release build. // which is a pity, it is a good test +#include #include #include #include #include #include -#include #ifndef OS_WIN #include #endif @@ -23,10 +23,10 @@ #include #endif -#include "db/filename.h" -#include "db/dbformat.h" #include "db/db_impl.h" #include "db/db_test_util.h" +#include "db/dbformat.h" +#include "db/filename.h" #include "db/job_context.h" #include "db/version_set.h" #include "db/write_batch_internal.h" @@ -47,27 +47,27 @@ #include "rocksdb/table.h" #include "rocksdb/table_properties.h" #include "rocksdb/thread_status.h" -#include "rocksdb/utilities/write_batch_with_index.h" #include "rocksdb/utilities/checkpoint.h" #include "rocksdb/utilities/optimistic_transaction_db.h" +#include "rocksdb/utilities/write_batch_with_index.h" #include "table/block_based_table_factory.h" #include "table/mock_table.h" #include "table/plain_table_factory.h" #include "table/scoped_arena_iterator.h" +#include "util/compression.h" #include "util/file_reader_writer.h" #include "util/hash.h" -#include "utilities/merge_operators.h" #include "util/logging.h" -#include "util/compression.h" +#include "util/mock_env.h" #include "util/mutexlock.h" #include "util/rate_limiter.h" +#include "util/string_util.h" #include "util/sync_point.h" #include "util/testharness.h" #include "util/testutil.h" -#include "util/mock_env.h" -#include "util/string_util.h" #include "util/thread_status_util.h" #include "util/xfunc.h" +#include "utilities/merge_operators.h" namespace rocksdb { @@ -125,8 +125,8 @@ TEST_F(DBTest, MockEnvTest) { ASSERT_TRUE(!iterator->Valid()); delete iterator; - // TEST_FlushMemTable() is not supported in ROCKSDB_LITE - #ifndef ROCKSDB_LITE +// TEST_FlushMemTable() is not supported in ROCKSDB_LITE +#ifndef ROCKSDB_LITE DBImpl* dbi = reinterpret_cast(db); ASSERT_OK(dbi->TEST_FlushMemTable()); @@ -135,7 +135,7 @@ TEST_F(DBTest, MockEnvTest) { ASSERT_OK(db->Get(ReadOptions(), keys[i], &res)); ASSERT_TRUE(res == vals[i]); } - #endif // ROCKSDB_LITE +#endif // ROCKSDB_LITE delete db; } @@ -322,7 +322,8 @@ TEST_F(DBTest, CompactedDB) { // MultiGet std::vector values; - std::vector status_list = dbfull()->MultiGet(ReadOptions(), + std::vector status_list = dbfull()->MultiGet( + ReadOptions(), std::vector({Slice("aaa"), Slice("ccc"), Slice("eee"), Slice("ggg"), Slice("iii"), Slice("kkk")}), &values); @@ -662,8 +663,8 @@ TEST_F(DBTest, GetFromImmutableLayer) { // Block sync calls env_->delay_sstable_sync_.store(true, std::memory_order_release); - Put(1, "k1", std::string(100000, 'x')); // Fill memtable - Put(1, "k2", std::string(100000, 'y')); // Trigger flush + Put(1, "k1", std::string(100000, 'x')); // Fill memtable + Put(1, "k2", std::string(100000, 'y')); // Trigger flush ASSERT_EQ("v1", Get(1, "foo")); ASSERT_EQ("NOT_FOUND", Get(0, "foo")); // Release sync calls @@ -3192,8 +3193,8 @@ class ModelDB : public DB { virtual Status GetUpdatesSince( rocksdb::SequenceNumber, unique_ptr*, - const TransactionLogIterator::ReadOptions& - read_options = TransactionLogIterator::ReadOptions()) override { + const TransactionLogIterator::ReadOptions& read_options = + TransactionLogIterator::ReadOptions()) override { return Status::NotSupported("Not supported in Model DB"); } @@ -3279,8 +3280,7 @@ static bool CompareIterators(int step, DB* model, DB* db, ok && miter->Valid() && dbiter->Valid(); miter->Next(), dbiter->Next()) { count++; if (miter->key().compare(dbiter->key()) != 0) { - fprintf(stderr, "step %d: Key mismatch: '%s' vs. '%s'\n", - step, + fprintf(stderr, "step %d: Key mismatch: '%s' vs. '%s'\n", step, EscapeString(miter->key()).c_str(), EscapeString(dbiter->key()).c_str()); ok = false; @@ -3483,7 +3483,6 @@ TEST_F(DBTest, BlockBasedTablePrefixIndexTest) { options.table_factory.reset(NewBlockBasedTableFactory(table_options)); options.prefix_extractor.reset(NewFixedPrefixTransform(1)); - Reopen(options); ASSERT_OK(Put("k1", "v1")); Flush(); @@ -3682,7 +3681,6 @@ TEST_F(DBTest, TableOptionsSanitizeTest) { ASSERT_OK(TryReopen(options)); } - // On Windows you can have either memory mapped file or a file // with unbuffered access. So this asserts and does not make // sense to run @@ -4912,6 +4910,48 @@ TEST_F(DBTest, DynamicMiscOptions) { dbfull()->TEST_FlushMemTable(true); // No reseek assert_reseek_count(300, 1); + + MutableCFOptions mutable_cf_options; + CreateAndReopenWithCF({"pikachu"}, options); + // Test soft_pending_compaction_bytes_limit, + // hard_pending_compaction_bytes_limit + ASSERT_OK(dbfull()->SetOptions( + handles_[1], {{"soft_pending_compaction_bytes_limit", "200"}, + {"hard_pending_compaction_bytes_limit", "300"}})); + ASSERT_OK(dbfull()->TEST_GetLatestMutableCFOptions(handles_[1], + &mutable_cf_options)); + ASSERT_EQ(200, mutable_cf_options.soft_pending_compaction_bytes_limit); + ASSERT_EQ(300, mutable_cf_options.hard_pending_compaction_bytes_limit); + // Test report_bg_io_stats + ASSERT_OK( + dbfull()->SetOptions(handles_[1], {{"report_bg_io_stats", "true"}})); + // sanity check + ASSERT_OK(dbfull()->TEST_GetLatestMutableCFOptions(handles_[1], + &mutable_cf_options)); + ASSERT_EQ(true, mutable_cf_options.report_bg_io_stats); + // Test min_partial_merge_operands + ASSERT_OK( + dbfull()->SetOptions(handles_[1], {{"min_partial_merge_operands", "4"}})); + ASSERT_OK(dbfull()->TEST_GetLatestMutableCFOptions(handles_[1], + &mutable_cf_options)); + ASSERT_EQ(4, mutable_cf_options.min_partial_merge_operands); + // Test compression + // sanity check + ASSERT_OK(dbfull()->SetOptions({{"compression", "kNoCompression"}})); + ASSERT_OK(dbfull()->TEST_GetLatestMutableCFOptions(handles_[0], + &mutable_cf_options)); + ASSERT_EQ(CompressionType::kNoCompression, mutable_cf_options.compression); + ASSERT_OK(dbfull()->SetOptions({{"compression", "kSnappyCompression"}})); + ASSERT_OK(dbfull()->TEST_GetLatestMutableCFOptions(handles_[0], + &mutable_cf_options)); + ASSERT_EQ(CompressionType::kSnappyCompression, + mutable_cf_options.compression); + // Test paranoid_file_checks already done in db_block_cache_test + ASSERT_OK( + dbfull()->SetOptions(handles_[1], {{"paranoid_file_checks", "true"}})); + ASSERT_OK(dbfull()->TEST_GetLatestMutableCFOptions(handles_[1], + &mutable_cf_options)); + ASSERT_EQ(true, mutable_cf_options.report_bg_io_stats); } #endif // ROCKSDB_LITE @@ -4960,7 +5000,7 @@ TEST_F(DBTest, EncodeDecompressedBlockSizeTest) { // iter 3 -- lz4HC // iter 4 -- xpress CompressionType compressions[] = {kZlibCompression, kBZip2Compression, - kLZ4Compression, kLZ4HCCompression, + kLZ4Compression, kLZ4HCCompression, kXpressCompression}; for (auto comp : compressions) { if (!CompressionTypeSupported(comp)) { @@ -5843,7 +5883,7 @@ TEST_F(DBTest, LastWriteBufferDelay) { TEST_F(DBTest, FailWhenCompressionNotSupportedTest) { CompressionType compressions[] = {kZlibCompression, kBZip2Compression, - kLZ4Compression, kLZ4HCCompression, + kLZ4Compression, kLZ4HCCompression, kXpressCompression}; for (auto comp : compressions) { if (!CompressionTypeSupported(comp)) { diff --git a/db/flush_job.cc b/db/flush_job.cc index 949c02622..abe50c35b 100644 --- a/db/flush_job.cc +++ b/db/flush_job.cc @@ -264,10 +264,10 @@ Status FlushJob::WriteLevel0Table(const autovector& mems, TEST_SYNC_POINT_CALLBACK("FlushJob::WriteLevel0Table:output_compression", &output_compression_); s = BuildTable( - dbname_, db_options_.env, *cfd_->ioptions(), env_options_, - cfd_->table_cache(), iter.get(), meta, cfd_->internal_comparator(), - cfd_->int_tbl_prop_collector_factories(), cfd_->GetID(), - cfd_->GetName(), existing_snapshots_, + dbname_, db_options_.env, *cfd_->ioptions(), mutable_cf_options_, + env_options_, cfd_->table_cache(), iter.get(), meta, + cfd_->internal_comparator(), cfd_->int_tbl_prop_collector_factories(), + cfd_->GetID(), cfd_->GetName(), existing_snapshots_, earliest_write_conflict_snapshot_, output_compression_, cfd_->ioptions()->compression_opts, mutable_cf_options_.paranoid_file_checks, cfd_->internal_stats(), diff --git a/db/repair.cc b/db/repair.cc index 1dcffe7e1..437474cf1 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -291,9 +291,11 @@ class Repairer { ro.total_order_seek = true; Arena arena; ScopedArenaIterator iter(mem->NewIterator(ro, &arena)); + MutableCFOptions mutable_cf_options(options_, ioptions_); status = BuildTable( - dbname_, env_, ioptions_, env_options_, table_cache_, iter.get(), - &meta, icmp_, &int_tbl_prop_collector_factories_, + dbname_, env_, ioptions_, mutable_cf_options, env_options_, + table_cache_, iter.get(), &meta, icmp_, + &int_tbl_prop_collector_factories_, TablePropertiesCollectorFactory::Context::kUnknownColumnFamily, std::string() /* column_family_name */, {}, kMaxSequenceNumber, kNoCompression, CompressionOptions(), false, diff --git a/include/rocksdb/immutable_options.h b/include/rocksdb/immutable_options.h index 0a9c29bf6..9417aa2e9 100644 --- a/include/rocksdb/immutable_options.h +++ b/include/rocksdb/immutable_options.h @@ -75,14 +75,10 @@ struct ImmutableCFOptions { bool purge_redundant_kvs_while_flush; - uint32_t min_partial_merge_operands; - bool disable_data_sync; bool use_fsync; - CompressionType compression; - std::vector compression_per_level; CompressionType bottommost_compression; diff --git a/include/rocksdb/sst_file_writer.h b/include/rocksdb/sst_file_writer.h index fb01feb1f..713ab3ca1 100644 --- a/include/rocksdb/sst_file_writer.h +++ b/include/rocksdb/sst_file_writer.h @@ -8,6 +8,7 @@ #include "rocksdb/env.h" #include "rocksdb/immutable_options.h" #include "rocksdb/types.h" +#include "util/mutable_cf_options.h" namespace rocksdb { @@ -49,8 +50,7 @@ struct ExternalSstFileInfo { // All keys in files generated by SstFileWriter will have sequence number = 0 class SstFileWriter { public: - SstFileWriter(const EnvOptions& env_options, - const ImmutableCFOptions& ioptions, + SstFileWriter(const EnvOptions& env_options, const Options& options, const Comparator* user_comparator); ~SstFileWriter(); diff --git a/table/sst_file_writer.cc b/table/sst_file_writer.cc index 0940b0d23..fd70058c8 100644 --- a/table/sst_file_writer.cc +++ b/table/sst_file_writer.cc @@ -71,25 +71,27 @@ class SstFileWriter::SstFileWriterPropertiesCollectorFactory }; struct SstFileWriter::Rep { - Rep(const EnvOptions& _env_options, const ImmutableCFOptions& _ioptions, + Rep(const EnvOptions& _env_options, const Options& options, const Comparator* _user_comparator) : env_options(_env_options), - ioptions(_ioptions), + ioptions(options), + mutable_cf_options(options, ioptions), internal_comparator(_user_comparator) {} std::unique_ptr file_writer; std::unique_ptr builder; EnvOptions env_options; ImmutableCFOptions ioptions; + MutableCFOptions mutable_cf_options; InternalKeyComparator internal_comparator; ExternalSstFileInfo file_info; std::string column_family_name; }; SstFileWriter::SstFileWriter(const EnvOptions& env_options, - const ImmutableCFOptions& ioptions, + const Options& options, const Comparator* user_comparator) - : rep_(new Rep(env_options, ioptions, user_comparator)) {} + : rep_(new Rep(env_options, options, user_comparator)) {} SstFileWriter::~SstFileWriter() { delete rep_; } @@ -102,7 +104,7 @@ Status SstFileWriter::Open(const std::string& file_path) { return s; } - CompressionType compression_type = r->ioptions.compression; + CompressionType compression_type = r->mutable_cf_options.compression; if (!r->ioptions.compression_per_level.empty()) { // Use the compression of the last level if we have per level compression compression_type = *(r->ioptions.compression_per_level.rbegin()); diff --git a/util/mutable_cf_options.h b/util/mutable_cf_options.h index 8d9dc7d64..28b35d30b 100644 --- a/util/mutable_cf_options.h +++ b/util/mutable_cf_options.h @@ -6,8 +6,9 @@ #pragma once #include -#include "rocksdb/options.h" #include "rocksdb/immutable_options.h" +#include "rocksdb/options.h" +#include "util/compression.h" namespace rocksdb { @@ -47,9 +48,9 @@ struct MutableCFOptions { max_sequential_skip_in_iterations( options.max_sequential_skip_in_iterations), paranoid_file_checks(options.paranoid_file_checks), - report_bg_io_stats(options.report_bg_io_stats) - - { + report_bg_io_stats(options.report_bg_io_stats), + compression(options.compression), + min_partial_merge_operands(options.min_partial_merge_operands) { RefreshDerivedOptions(ioptions); } MutableCFOptions() @@ -80,7 +81,9 @@ struct MutableCFOptions { max_subcompactions(1), max_sequential_skip_in_iterations(0), paranoid_file_checks(false), - report_bg_io_stats(false) {} + report_bg_io_stats(false), + compression(Snappy_Supported() ? kSnappyCompression : kNoCompression), + min_partial_merge_operands(2) {} // Must be called after any change to MutableCFOptions void RefreshDerivedOptions(const ImmutableCFOptions& ioptions); @@ -136,6 +139,8 @@ struct MutableCFOptions { uint64_t max_sequential_skip_in_iterations; bool paranoid_file_checks; bool report_bg_io_stats; + CompressionType compression; + uint32_t min_partial_merge_operands; // Derived options // Per-level target file size. diff --git a/util/options.cc b/util/options.cc index d7f86c877..f73eabca5 100644 --- a/util/options.cc +++ b/util/options.cc @@ -61,10 +61,8 @@ ImmutableCFOptions::ImmutableCFOptions(const Options& options) advise_random_on_open(options.advise_random_on_open), bloom_locality(options.bloom_locality), purge_redundant_kvs_while_flush(options.purge_redundant_kvs_while_flush), - min_partial_merge_operands(options.min_partial_merge_operands), disable_data_sync(options.disableDataSync), use_fsync(options.use_fsync), - compression(options.compression), compression_per_level(options.compression_per_level), bottommost_compression(options.bottommost_compression), compression_opts(options.compression_opts), diff --git a/util/options_helper.cc b/util/options_helper.cc index e42394133..98161138e 100644 --- a/util/options_helper.cc +++ b/util/options_helper.cc @@ -622,6 +622,16 @@ bool ParseMiscOptions(const std::string& name, const std::string& value, new_options->max_sequential_skip_in_iterations = ParseUint64(value); } else if (name == "paranoid_file_checks") { new_options->paranoid_file_checks = ParseBoolean(name, value); + } else if (name == "report_bg_io_stats") { + new_options->report_bg_io_stats = ParseBoolean(name, value); + } else if (name == "compression") { + bool is_ok = ParseEnum(compression_type_string_map, value, + &new_options->compression); + if (!is_ok) { + return false; + } + } else if (name == "min_partial_merge_operands") { + new_options->min_partial_merge_operands = ParseUint32(value); } else { return false; }