diff --git a/INSTALL.md b/INSTALL.md index 607450f85..21e8d26f0 100644 --- a/INSTALL.md +++ b/INSTALL.md @@ -85,4 +85,4 @@ SSE4.2 is used to speed up CRC32 when calculating data checksum. We did not run any production workloads on it. * **iOS**: - * Run: `TARGET_OS=IOS make static_lib` + * Run: `TARGET_OS=IOS make static_lib`. When building the project which uses rocksdb iOS library, make sure to define two important pre-processing macros: `ROCKSDB_LITE` and `IOS_CROSS_COMPILE`. diff --git a/Makefile b/Makefile index c6b5404da..8abcfee18 100644 --- a/Makefile +++ b/Makefile @@ -179,22 +179,26 @@ ifneq ($(PLATFORM_SHARED_VERSIONED),true) SHARED1 = ${LIBNAME}.$(PLATFORM_SHARED_EXT) SHARED2 = $(SHARED1) SHARED3 = $(SHARED1) +SHARED4 = $(SHARED1) SHARED = $(SHARED1) else -# Update db.h if you change these. SHARED_MAJOR = $(ROCKSDB_MAJOR) SHARED_MINOR = $(ROCKSDB_MINOR) +SHARED_PATCH = $(ROCKSDB_PATCH) SHARED1 = ${LIBNAME}.$(PLATFORM_SHARED_EXT) SHARED2 = $(SHARED1).$(SHARED_MAJOR) SHARED3 = $(SHARED1).$(SHARED_MAJOR).$(SHARED_MINOR) -SHARED = $(SHARED1) $(SHARED2) $(SHARED3) -$(SHARED1): $(SHARED3) - ln -fs $(SHARED3) $(SHARED1) -$(SHARED2): $(SHARED3) - ln -fs $(SHARED3) $(SHARED2) +SHARED4 = $(SHARED1).$(SHARED_MAJOR).$(SHARED_MINOR).$(SHARED_PATCH) +SHARED = $(SHARED1) $(SHARED2) $(SHARED3) $(SHARED4) +$(SHARED1): $(SHARED4) + ln -fs $(SHARED4) $(SHARED1) +$(SHARED2): $(SHARED4) + ln -fs $(SHARED4) $(SHARED2) +$(SHARED3): $(SHARED4) + ln -fs $(SHARED4) $(SHARED3) endif -$(SHARED3): +$(SHARED4): $(CXX) $(PLATFORM_SHARED_LDFLAGS)$(SHARED2) $(CXXFLAGS) $(PLATFORM_SHARED_CFLAGS) $(SOURCES) $(LDFLAGS) -o $@ endif # PLATFORM_SHARED_EXT diff --git a/build_tools/build_detect_platform b/build_tools/build_detect_platform index 8479e3127..c026782f6 100755 --- a/build_tools/build_detect_platform +++ b/build_tools/build_detect_platform @@ -326,6 +326,10 @@ PLATFORM_CXXFLAGS="$PLATFORM_CXXFLAGS $COMMON_FLAGS" VALGRIND_VER="$VALGRIND_VER" +ROCKSDB_MAJOR=`build_tools/version.sh major` +ROCKSDB_MINOR=`build_tools/version.sh minor` +ROCKSDB_PATCH=`build_tools/version.sh patch` + echo "CC=$CC" >> "$OUTPUT" echo "CXX=$CXX" >> "$OUTPUT" echo "PLATFORM=$PLATFORM" >> "$OUTPUT" @@ -341,3 +345,6 @@ echo "PLATFORM_SHARED_VERSIONED=$PLATFORM_SHARED_VERSIONED" >> "$OUTPUT" echo "EXEC_LDFLAGS=$EXEC_LDFLAGS" >> "$OUTPUT" echo "JEMALLOC_INCLUDE=$JEMALLOC_INCLUDE" >> "$OUTPUT" echo "JEMALLOC_LIB=$JEMALLOC_LIB" >> "$OUTPUT" +echo "ROCKSDB_MAJOR=$ROCKSDB_MAJOR" >> "$OUTPUT" +echo "ROCKSDB_MINOR=$ROCKSDB_MINOR" >> "$OUTPUT" +echo "ROCKSDB_PATCH=$ROCKSDB_PATCH" >> "$OUTPUT" diff --git a/build_tools/version.sh b/build_tools/version.sh new file mode 100755 index 000000000..afa7ed277 --- /dev/null +++ b/build_tools/version.sh @@ -0,0 +1,14 @@ +#!/bin/sh +if [ $# == 0 ]; then + echo "Usage: $0 major|minor|patch" + exit 1 +fi +if [ $1 = "major" ]; then + cat include/rocksdb/version.h | grep MAJOR | head -n1 | awk '{print $3}' +fi +if [ $1 = "minor" ]; then + cat include/rocksdb/version.h | grep MINOR | head -n1 | awk '{print $3}' +fi +if [ $1 = "patch" ]; then + cat include/rocksdb/version.h | grep PATCH | head -n1 | awk '{print $3}' +fi diff --git a/db/column_family.cc b/db/column_family.cc index f95090225..0beb23c91 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -230,7 +230,7 @@ ColumnFamilyData::ColumnFamilyData(uint32_t id, const std::string& name, internal_comparator_(cf_options.comparator), options_(*db_options, SanitizeOptions(&internal_comparator_, cf_options)), ioptions_(options_), - mutable_cf_options_(options_), + mutable_cf_options_(options_, ioptions_), mem_(nullptr), imm_(options_.min_write_buffer_number_to_merge), super_version_(nullptr), @@ -245,27 +245,27 @@ ColumnFamilyData::ColumnFamilyData(uint32_t id, const std::string& name, // if dummy_versions is nullptr, then this is a dummy column family. if (dummy_versions != nullptr) { internal_stats_.reset( - new InternalStats(options_.num_levels, db_options->env, this)); + new InternalStats(ioptions_.num_levels, db_options->env, this)); table_cache_.reset(new TableCache(ioptions_, env_options, table_cache)); - if (options_.compaction_style == kCompactionStyleUniversal) { + if (ioptions_.compaction_style == kCompactionStyleUniversal) { compaction_picker_.reset( - new UniversalCompactionPicker(&options_, &internal_comparator_)); - } else if (options_.compaction_style == kCompactionStyleLevel) { + new UniversalCompactionPicker(ioptions_, &internal_comparator_)); + } else if (ioptions_.compaction_style == kCompactionStyleLevel) { compaction_picker_.reset( - new LevelCompactionPicker(&options_, &internal_comparator_)); + new LevelCompactionPicker(ioptions_, &internal_comparator_)); } else { - assert(options_.compaction_style == kCompactionStyleFIFO); + assert(ioptions_.compaction_style == kCompactionStyleFIFO); compaction_picker_.reset( - new FIFOCompactionPicker(&options_, &internal_comparator_)); + new FIFOCompactionPicker(ioptions_, &internal_comparator_)); } - Log(options_.info_log, "Options for column family \"%s\":\n", + Log(ioptions_.info_log, "Options for column family \"%s\":\n", name.c_str()); const ColumnFamilyOptions* cf_options = &options_; - cf_options->Dump(options_.info_log.get()); + cf_options->Dump(ioptions_.info_log); } - RecalculateWriteStallConditions(); + RecalculateWriteStallConditions(mutable_cf_options_); } // DB mutex held @@ -318,7 +318,8 @@ ColumnFamilyData::~ColumnFamilyData() { } } -void ColumnFamilyData::RecalculateWriteStallConditions() { +void ColumnFamilyData::RecalculateWriteStallConditions( + const MutableCFOptions& mutable_cf_options) { if (current_ != nullptr) { const double score = current_->MaxCompactionScore(); const int max_level = current_->MaxCompactionScoreLevel(); @@ -328,26 +329,27 @@ void ColumnFamilyData::RecalculateWriteStallConditions() { if (imm()->size() == options_.max_write_buffer_number) { write_controller_token_ = write_controller->GetStopToken(); internal_stats_->AddCFStats(InternalStats::MEMTABLE_COMPACTION, 1); - Log(options_.info_log, + Log(ioptions_.info_log, "[%s] Stopping writes because we have %d immutable memtables " "(waiting for flush)", name_.c_str(), imm()->size()); } else if (current_->NumLevelFiles(0) >= - options_.level0_stop_writes_trigger) { + mutable_cf_options.level0_stop_writes_trigger) { write_controller_token_ = write_controller->GetStopToken(); internal_stats_->AddCFStats(InternalStats::LEVEL0_NUM_FILES, 1); - Log(options_.info_log, + Log(ioptions_.info_log, "[%s] Stopping writes because we have %d level-0 files", name_.c_str(), current_->NumLevelFiles(0)); - } else if (options_.level0_slowdown_writes_trigger >= 0 && + } else if (mutable_cf_options.level0_slowdown_writes_trigger >= 0 && current_->NumLevelFiles(0) >= - options_.level0_slowdown_writes_trigger) { + mutable_cf_options.level0_slowdown_writes_trigger) { uint64_t slowdown = SlowdownAmount( - current_->NumLevelFiles(0), options_.level0_slowdown_writes_trigger, - options_.level0_stop_writes_trigger); + current_->NumLevelFiles(0), + mutable_cf_options.level0_slowdown_writes_trigger, + mutable_cf_options.level0_stop_writes_trigger); write_controller_token_ = write_controller->GetDelayToken(slowdown); internal_stats_->AddCFStats(InternalStats::LEVEL0_SLOWDOWN, slowdown); - Log(options_.info_log, + Log(ioptions_.info_log, "[%s] Stalling writes because we have %d level-0 files (%" PRIu64 "us)", name_.c_str(), current_->NumLevelFiles(0), slowdown); @@ -358,7 +360,7 @@ void ColumnFamilyData::RecalculateWriteStallConditions() { write_controller->GetDelayToken(kHardLimitSlowdown); internal_stats_->RecordLevelNSlowdown(max_level, kHardLimitSlowdown, false); - Log(options_.info_log, + Log(ioptions_.info_log, "[%s] Stalling writes because we hit hard limit on level %d. " "(%" PRIu64 "us)", name_.c_str(), max_level, kHardLimitSlowdown); @@ -368,7 +370,7 @@ void ColumnFamilyData::RecalculateWriteStallConditions() { options_.hard_rate_limit); write_controller_token_ = write_controller->GetDelayToken(slowdown); internal_stats_->RecordLevelNSlowdown(max_level, slowdown, true); - Log(options_.info_log, + Log(ioptions_.info_log, "[%s] Stalling writes because we hit soft limit on level %d (%" PRIu64 "us)", name_.c_str(), max_level, slowdown); @@ -393,19 +395,21 @@ void ColumnFamilyData::CreateNewMemtable(const MemTableOptions& moptions) { mem_->Ref(); } -Compaction* ColumnFamilyData::PickCompaction(LogBuffer* log_buffer) { - auto result = compaction_picker_->PickCompaction(current_, log_buffer); +Compaction* ColumnFamilyData::PickCompaction( + const MutableCFOptions& mutable_options, LogBuffer* log_buffer) { + auto result = compaction_picker_->PickCompaction( + mutable_options, current_, log_buffer); return result; } -Compaction* ColumnFamilyData::CompactRange(int input_level, int output_level, - uint32_t output_path_id, - const InternalKey* begin, - const InternalKey* end, - InternalKey** compaction_end) { - return compaction_picker_->CompactRange(current_, input_level, output_level, - output_path_id, begin, end, - compaction_end); +Compaction* ColumnFamilyData::CompactRange( + const MutableCFOptions& mutable_cf_options, + int input_level, int output_level, uint32_t output_path_id, + const InternalKey* begin, const InternalKey* end, + InternalKey** compaction_end) { + return compaction_picker_->CompactRange( + mutable_cf_options, current_, input_level, output_level, + output_path_id, begin, end, compaction_end); } SuperVersion* ColumnFamilyData::GetReferencedSuperVersion( @@ -443,11 +447,11 @@ SuperVersion* ColumnFamilyData::GetThreadLocalSuperVersion( sv = static_cast(ptr); if (sv == SuperVersion::kSVObsolete || sv->version_number != super_version_number_.load()) { - RecordTick(options_.statistics.get(), NUMBER_SUPERVERSION_ACQUIRES); + RecordTick(ioptions_.statistics, NUMBER_SUPERVERSION_ACQUIRES); SuperVersion* sv_to_delete = nullptr; if (sv && sv->Unref()) { - RecordTick(options_.statistics.get(), NUMBER_SUPERVERSION_CLEANUPS); + RecordTick(ioptions_.statistics, NUMBER_SUPERVERSION_CLEANUPS); db_mutex->Lock(); // NOTE: underlying resources held by superversion (sst files) might // not be released until the next background job. @@ -502,7 +506,7 @@ SuperVersion* ColumnFamilyData::InstallSuperVersion( // Reset SuperVersions cached in thread local storage ResetThreadLocalSuperVersions(); - RecalculateWriteStallConditions(); + RecalculateWriteStallConditions(mutable_cf_options); if (old_superversion != nullptr && old_superversion->Unref()) { old_superversion->Cleanup(); @@ -533,6 +537,7 @@ bool ColumnFamilyData::SetOptions( if (GetMutableOptionsFromStrings(mutable_cf_options_, options_map, &new_mutable_cf_options)) { mutable_cf_options_ = new_mutable_cf_options; + mutable_cf_options_.RefreshDerivedOptions(ioptions_); return true; } return false; diff --git a/db/column_family.h b/db/column_family.h index 65b4b53ba..9c415c2a8 100644 --- a/db/column_family.h +++ b/db/column_family.h @@ -203,11 +203,14 @@ class ColumnFamilyData { TableCache* table_cache() const { return table_cache_.get(); } // See documentation in compaction_picker.h - Compaction* PickCompaction(LogBuffer* log_buffer); - Compaction* CompactRange(int input_level, int output_level, - uint32_t output_path_id, const InternalKey* begin, - const InternalKey* end, - InternalKey** compaction_end); + // REQUIRES: DB mutex held + Compaction* PickCompaction(const MutableCFOptions& mutable_options, + LogBuffer* log_buffer); + Compaction* CompactRange( + const MutableCFOptions& mutable_cf_options, + int input_level, int output_level, uint32_t output_path_id, + const InternalKey* begin, const InternalKey* end, + InternalKey** compaction_end); CompactionPicker* compaction_picker() { return compaction_picker_.get(); } // thread-safe @@ -260,7 +263,8 @@ class ColumnFamilyData { // recalculation of compaction score. These values are used in // DBImpl::MakeRoomForWrite function to decide, if it need to make // a write stall - void RecalculateWriteStallConditions(); + void RecalculateWriteStallConditions( + const MutableCFOptions& mutable_cf_options); uint32_t id_; const std::string name_; diff --git a/db/compaction.cc b/db/compaction.cc index 28a3174b0..f02feeee7 100644 --- a/db/compaction.cc +++ b/db/compaction.cc @@ -56,7 +56,6 @@ Compaction::Compaction(Version* input_version, int start_level, int out_level, is_full_compaction_(false), is_manual_compaction_(false), level_ptrs_(std::vector(number_levels_)) { - cfd_->Ref(); input_version_->Ref(); edit_ = new VersionEdit(); @@ -267,12 +266,12 @@ void Compaction::Summary(char* output, int len) { snprintf(output + write, len - write, "]"); } -uint64_t Compaction::OutputFilePreallocationSize() { +uint64_t Compaction::OutputFilePreallocationSize( + const MutableCFOptions& mutable_options) { uint64_t preallocation_size = 0; if (cfd_->ioptions()->compaction_style == kCompactionStyleLevel) { - preallocation_size = - cfd_->compaction_picker()->MaxFileSizeForLevel(output_level()); + preallocation_size = mutable_options.MaxFileSizeForLevel(output_level()); } else { for (int level = 0; level < num_input_levels(); ++level) { for (const auto& f : inputs_[level].files) { diff --git a/db/compaction.h b/db/compaction.h index 6000f636b..7c490946a 100644 --- a/db/compaction.h +++ b/db/compaction.h @@ -10,6 +10,7 @@ #pragma once #include "util/arena.h" #include "util/autovector.h" +#include "util/mutable_cf_options.h" #include "db/version_set.h" namespace rocksdb { @@ -151,10 +152,14 @@ class Compaction { // Was this compaction triggered manually by the client? bool IsManualCompaction() { return is_manual_compaction_; } + // Return the MutableCFOptions that should be used throughout the compaction + // procedure + const MutableCFOptions* mutable_cf_options() { return &mutable_cf_options_; } + // Returns the size in bytes that the output file should be preallocated to. // In level compaction, that is max_file_size_. In universal compaction, that // is the sum of all input file sizes. - uint64_t OutputFilePreallocationSize(); + uint64_t OutputFilePreallocationSize(const MutableCFOptions& mutable_options); private: friend class CompactionPicker; @@ -171,6 +176,7 @@ class Compaction { const int output_level_; // levels to which output files are stored uint64_t max_output_file_size_; uint64_t max_grandparent_overlap_bytes_; + MutableCFOptions mutable_cf_options_; Version* input_version_; VersionEdit* edit_; int number_levels_; diff --git a/db/compaction_picker.cc b/db/compaction_picker.cc index eb434eeac..84bd95839 100644 --- a/db/compaction_picker.cc +++ b/db/compaction_picker.cc @@ -35,70 +35,36 @@ namespace { // If enable_compression is false, then compression is always disabled no // matter what the values of the other two parameters are. // Otherwise, the compression type is determined based on options and level. -CompressionType GetCompressionType(const Options& options, int level, - const bool enable_compression = true) { +CompressionType GetCompressionType( + const ImmutableCFOptions& ioptions, int level, + const bool enable_compression = true) { if (!enable_compression) { // disable compression return kNoCompression; } // If the use has specified a different compression level for each level, // then pick the compression for that level. - if (!options.compression_per_level.empty()) { - const int n = options.compression_per_level.size() - 1; + if (!ioptions.compression_per_level.empty()) { + const int n = ioptions.compression_per_level.size() - 1; // It is possible for level_ to be -1; in that case, we use level // 0's compression. This occurs mostly in backwards compatibility // situations when the builder doesn't know what level the file // belongs to. Likewise, if level is beyond the end of the // specified compression levels, use the last value. - return options.compression_per_level[std::max(0, std::min(level, n))]; + return ioptions.compression_per_level[std::max(0, std::min(level, n))]; } else { - return options.compression; + return ioptions.compression; } } -// Multiple two operands. If they overflow, return op1. -uint64_t MultiplyCheckOverflow(uint64_t op1, int op2) { - if (op1 == 0) { - return 0; - } - if (op2 <= 0) { - return op1; - } - uint64_t casted_op2 = (uint64_t) op2; - if (std::numeric_limits::max() / op1 < casted_op2) { - return op1; - } - return op1 * casted_op2; -} } // anonymous namespace -CompactionPicker::CompactionPicker(const Options* options, +CompactionPicker::CompactionPicker(const ImmutableCFOptions& ioptions, const InternalKeyComparator* icmp) - : compactions_in_progress_(options->num_levels), - options_(options), - num_levels_(options->num_levels), + : ioptions_(ioptions), + compactions_in_progress_(ioptions_.num_levels), icmp_(icmp) { - - max_file_size_.reset(new uint64_t[NumberLevels()]); - level_max_bytes_.reset(new uint64_t[NumberLevels()]); - int target_file_size_multiplier = options_->target_file_size_multiplier; - int max_bytes_multiplier = options_->max_bytes_for_level_multiplier; - for (int i = 0; i < NumberLevels(); i++) { - if (i == 0 && options_->compaction_style == kCompactionStyleUniversal) { - max_file_size_[i] = ULLONG_MAX; - level_max_bytes_[i] = options_->max_bytes_for_level_base; - } else if (i > 1) { - max_file_size_[i] = MultiplyCheckOverflow(max_file_size_[i - 1], - target_file_size_multiplier); - level_max_bytes_[i] = MultiplyCheckOverflow( - MultiplyCheckOverflow(level_max_bytes_[i - 1], max_bytes_multiplier), - options_->max_bytes_for_level_multiplier_additional[i - 1]); - } else { - max_file_size_[i] = options_->target_file_size_base; - level_max_bytes_[i] = options_->max_bytes_for_level_base; - } - } } CompactionPicker::~CompactionPicker() {} @@ -126,26 +92,6 @@ void CompactionPicker::ReleaseCompactionFiles(Compaction* c, Status status) { } } -uint64_t CompactionPicker::MaxFileSizeForLevel(int level) const { - assert(level >= 0); - assert(level < NumberLevels()); - return max_file_size_[level]; -} - -uint64_t CompactionPicker::MaxGrandParentOverlapBytes(int level) { - uint64_t result = MaxFileSizeForLevel(level); - result *= options_->max_grandparent_overlap_factor; - return result; -} - -double CompactionPicker::MaxBytesForLevel(int level) { - // Note: the result for level zero is not really used since we set - // the level-0 compaction threshold based on number of files. - assert(level >= 0); - assert(level < NumberLevels()); - return level_max_bytes_[level]; -} - void CompactionPicker::GetRange(const std::vector& inputs, InternalKey* smallest, InternalKey* largest) { assert(!inputs.empty()); @@ -214,7 +160,7 @@ bool CompactionPicker::ExpandWhileOverlapping(Compaction* c) { // compaction, then we must drop/cancel this compaction. int parent_index = -1; if (c->inputs_[0].empty()) { - Log(options_->info_log, + Log(ioptions_.info_log, "[%s] ExpandWhileOverlapping() failure because zero input files", c->column_family_data()->GetName().c_str()); } @@ -229,12 +175,6 @@ bool CompactionPicker::ExpandWhileOverlapping(Compaction* c) { return true; } -uint64_t CompactionPicker::ExpandedCompactionByteSizeLimit(int level) { - uint64_t result = MaxFileSizeForLevel(level); - result *= options_->expanded_compaction_factor; - return result; -} - // Returns true if any one of specified files are being compacted bool CompactionPicker::FilesInCompaction(std::vector& files) { for (unsigned int i = 0; i < files.size(); i++) { @@ -262,7 +202,8 @@ bool CompactionPicker::ParentRangeInCompaction(Version* version, // Will also attempt to expand "level" if that doesn't expand "level+1" // or cause "level" to include a file for compaction that has an overlapping // user-key with another file. -void CompactionPicker::SetupOtherInputs(Compaction* c) { +void CompactionPicker::SetupOtherInputs( + const MutableCFOptions& mutable_cf_options, Compaction* c) { // If inputs are empty, then there is nothing to expand. // If both input and output levels are the same, no need to consider // files at level "level+1" @@ -298,7 +239,7 @@ void CompactionPicker::SetupOtherInputs(Compaction* c) { const uint64_t inputs0_size = TotalCompensatedFileSize(c->inputs_[0].files); const uint64_t inputs1_size = TotalCompensatedFileSize(c->inputs_[1].files); const uint64_t expanded0_size = TotalCompensatedFileSize(expanded0); - uint64_t limit = ExpandedCompactionByteSizeLimit(level); + uint64_t limit = mutable_cf_options.ExpandedCompactionByteSizeLimit(level); if (expanded0.size() > c->inputs_[0].size() && inputs1_size + expanded0_size < limit && !FilesInCompaction(expanded0) && @@ -311,7 +252,7 @@ void CompactionPicker::SetupOtherInputs(Compaction* c) { &c->parent_index_); if (expanded1.size() == c->inputs_[1].size() && !FilesInCompaction(expanded1)) { - Log(options_->info_log, + Log(ioptions_.info_log, "[%s] Expanding@%d %zu+%zu (%" PRIu64 "+%" PRIu64 " bytes) to %zu+%zu (%" PRIu64 "+%" PRIu64 "bytes)\n", c->column_family_data()->GetName().c_str(), level, @@ -336,21 +277,20 @@ void CompactionPicker::SetupOtherInputs(Compaction* c) { } } -Compaction* CompactionPicker::CompactRange(Version* version, int input_level, - int output_level, - uint32_t output_path_id, - const InternalKey* begin, - const InternalKey* end, - InternalKey** compaction_end) { +Compaction* CompactionPicker::CompactRange( + const MutableCFOptions& mutable_cf_options, Version* version, + int input_level, int output_level, uint32_t output_path_id, + const InternalKey* begin, const InternalKey* end, + InternalKey** compaction_end) { // CompactionPickerFIFO has its own implementation of compact range - assert(options_->compaction_style != kCompactionStyleFIFO); + assert(ioptions_.compaction_style != kCompactionStyleFIFO); std::vector inputs; bool covering_the_whole_range = true; // All files are 'overlapping' in universal style compaction. // We have to compact the entire range in one shot. - if (options_->compaction_style == kCompactionStyleUniversal) { + if (ioptions_.compaction_style == kCompactionStyleUniversal) { begin = nullptr; end = nullptr; } @@ -364,8 +304,8 @@ Compaction* CompactionPicker::CompactRange(Version* version, int input_level, // and we must not pick one file and drop another older file if the // two files overlap. if (input_level > 0) { - const uint64_t limit = - MaxFileSizeForLevel(input_level) * options_->source_compaction_factor; + const uint64_t limit = mutable_cf_options.MaxFileSizeForLevel(input_level) * + mutable_cf_options.source_compaction_factor; uint64_t total = 0; for (size_t i = 0; i + 1 < inputs.size(); ++i) { uint64_t s = inputs[i]->compensated_file_size; @@ -378,22 +318,24 @@ Compaction* CompactionPicker::CompactRange(Version* version, int input_level, } } } - assert(output_path_id < static_cast(options_->db_paths.size())); + assert(output_path_id < static_cast(ioptions_.db_paths.size())); Compaction* c = new Compaction( - version, input_level, output_level, MaxFileSizeForLevel(output_level), - MaxGrandParentOverlapBytes(input_level), output_path_id, - GetCompressionType(*options_, output_level)); + version, input_level, output_level, + mutable_cf_options.MaxFileSizeForLevel(output_level), + mutable_cf_options.MaxGrandParentOverlapBytes(input_level), + output_path_id, + GetCompressionType(ioptions_, output_level)); c->inputs_[0].files = inputs; if (ExpandWhileOverlapping(c) == false) { delete c; - Log(options_->info_log, + Log(ioptions_.info_log, "[%s] Could not compact due to expansion failure.\n", version->cfd_->GetName().c_str()); return nullptr; } - SetupOtherInputs(c); + SetupOtherInputs(mutable_cf_options, c); if (covering_the_whole_range) { *compaction_end = nullptr; @@ -408,12 +350,14 @@ Compaction* CompactionPicker::CompactRange(Version* version, int input_level, c->SetupBottomMostLevel(true); c->is_manual_compaction_ = true; + c->mutable_cf_options_ = mutable_cf_options; return c; } -Compaction* LevelCompactionPicker::PickCompaction(Version* version, - LogBuffer* log_buffer) { +Compaction* LevelCompactionPicker::PickCompaction( + const MutableCFOptions& mutable_cf_options, + Version* version, LogBuffer* log_buffer) { Compaction* c = nullptr; int level = -1; @@ -421,7 +365,7 @@ Compaction* LevelCompactionPicker::PickCompaction(Version* version, // and also in LogAndApply(), otherwise the values could be stale. std::vector size_being_compacted(NumberLevels() - 1); SizeBeingCompacted(size_being_compacted); - version->ComputeCompactionScore(size_being_compacted); + version->ComputeCompactionScore(mutable_cf_options, size_being_compacted); // We prefer compactions triggered by too much data in a level over // the compactions triggered by seeks. @@ -432,7 +376,8 @@ Compaction* LevelCompactionPicker::PickCompaction(Version* version, version->compaction_score_[i] <= version->compaction_score_[i - 1]); level = version->compaction_level_[i]; if ((version->compaction_score_[i] >= 1)) { - c = PickCompactionBySize(version, level, version->compaction_score_[i]); + c = PickCompactionBySize(mutable_cf_options, version, level, + version->compaction_score_[i]); if (c == nullptr || ExpandWhileOverlapping(c) == false) { delete c; c = nullptr; @@ -472,7 +417,7 @@ Compaction* LevelCompactionPicker::PickCompaction(Version* version, } // Setup "level+1" files (inputs_[1]) - SetupOtherInputs(c); + SetupOtherInputs(mutable_cf_options, c); // mark all the files that are being compacted c->MarkFilesBeingCompacted(true); @@ -483,12 +428,13 @@ Compaction* LevelCompactionPicker::PickCompaction(Version* version, // remember this currently undergoing compaction compactions_in_progress_[level].insert(c); + c->mutable_cf_options_ = mutable_cf_options; return c; } -Compaction* LevelCompactionPicker::PickCompactionBySize(Version* version, - int level, - double score) { +Compaction* LevelCompactionPicker::PickCompactionBySize( + const MutableCFOptions& mutable_cf_options, + Version* version, int level, double score) { Compaction* c = nullptr; // level 0 files are overlapping. So we cannot pick more @@ -501,9 +447,10 @@ Compaction* LevelCompactionPicker::PickCompactionBySize(Version* version, assert(level >= 0); assert(level + 1 < NumberLevels()); - c = new Compaction(version, level, level + 1, MaxFileSizeForLevel(level + 1), - MaxGrandParentOverlapBytes(level), 0, - GetCompressionType(*options_, level + 1)); + c = new Compaction(version, level, level + 1, + mutable_cf_options.MaxFileSizeForLevel(level + 1), + mutable_cf_options.MaxGrandParentOverlapBytes(level), 0, + GetCompressionType(ioptions_, level + 1)); c->score_ = score; // Pick the largest file in this level that is not already @@ -563,13 +510,14 @@ Compaction* LevelCompactionPicker::PickCompactionBySize(Version* version, // Universal style of compaction. Pick files that are contiguous in // time-range to compact. // -Compaction* UniversalCompactionPicker::PickCompaction(Version* version, - LogBuffer* log_buffer) { +Compaction* UniversalCompactionPicker::PickCompaction( + const MutableCFOptions& mutable_cf_options, + Version* version, LogBuffer* log_buffer) { int level = 0; double score = version->compaction_score_[0]; if ((version->files_[level].size() < - (unsigned int)options_->level0_file_num_compaction_trigger)) { + (unsigned int)mutable_cf_options.level0_file_num_compaction_trigger)) { LogToBuffer(log_buffer, "[%s] Universal: nothing to do\n", version->cfd_->GetName().c_str()); return nullptr; @@ -581,17 +529,18 @@ Compaction* UniversalCompactionPicker::PickCompaction(Version* version, // Check for size amplification first. Compaction* c; - if ((c = PickCompactionUniversalSizeAmp(version, score, log_buffer)) != - nullptr) { + if ((c = PickCompactionUniversalSizeAmp( + mutable_cf_options, version, score, log_buffer)) != nullptr) { LogToBuffer(log_buffer, "[%s] Universal: compacting for size amp\n", version->cfd_->GetName().c_str()); } else { // Size amplification is within limits. Try reducing read // amplification while maintaining file size ratios. - unsigned int ratio = options_->compaction_options_universal.size_ratio; + unsigned int ratio = ioptions_.compaction_options_universal.size_ratio; - if ((c = PickCompactionUniversalReadAmp(version, score, ratio, UINT_MAX, - log_buffer)) != nullptr) { + if ((c = PickCompactionUniversalReadAmp( + mutable_cf_options, version, score, ratio, + UINT_MAX, log_buffer)) != nullptr) { LogToBuffer(log_buffer, "[%s] Universal: compacting for size ratio\n", version->cfd_->GetName().c_str()); } else { @@ -600,9 +549,10 @@ Compaction* UniversalCompactionPicker::PickCompaction(Version* version, // compaction without looking at filesize ratios and try to reduce // the number of files to fewer than level0_file_num_compaction_trigger. unsigned int num_files = version->files_[level].size() - - options_->level0_file_num_compaction_trigger; + mutable_cf_options.level0_file_num_compaction_trigger; if ((c = PickCompactionUniversalReadAmp( - version, score, UINT_MAX, num_files, log_buffer)) != nullptr) { + mutable_cf_options, version, score, UINT_MAX, + num_files, log_buffer)) != nullptr) { LogToBuffer(log_buffer, "[%s] Universal: compacting for file num\n", version->cfd_->GetName().c_str()); } @@ -628,7 +578,7 @@ Compaction* UniversalCompactionPicker::PickCompaction(Version* version, c->bottommost_level_ = c->inputs_[0].files.back() == last_file; // update statistics - MeasureTime(options_->statistics.get(), + MeasureTime(ioptions_.statistics, NUM_FILES_IN_SINGLE_COMPACTION, c->inputs_[0].size()); // mark all the files that are being compacted @@ -642,11 +592,12 @@ Compaction* UniversalCompactionPicker::PickCompaction(Version* version, c->is_full_compaction_ = (c->inputs_[0].size() == c->input_version_->files_[0].size()); + c->mutable_cf_options_ = mutable_cf_options; return c; } -uint32_t UniversalCompactionPicker::GetPathId(const Options& options, - uint64_t file_size) { +uint32_t UniversalCompactionPicker::GetPathId( + const ImmutableCFOptions& ioptions, uint64_t file_size) { // Two conditions need to be satisfied: // (1) the target path needs to be able to hold the file's size // (2) Total size left in this and previous paths need to be not @@ -662,11 +613,11 @@ uint32_t UniversalCompactionPicker::GetPathId(const Options& options, // considered in this algorithm. So the target size can be violated in // that case. We need to improve it. uint64_t accumulated_size = 0; - uint64_t future_size = - file_size * (100 - options.compaction_options_universal.size_ratio) / 100; + uint64_t future_size = file_size * + (100 - ioptions.compaction_options_universal.size_ratio) / 100; uint32_t p = 0; - for (; p < options.db_paths.size() - 1; p++) { - uint64_t target_size = options.db_paths[p].target_size; + for (; p < ioptions.db_paths.size() - 1; p++) { + uint64_t target_size = ioptions.db_paths[p].target_size; if (target_size > file_size && accumulated_size + (target_size - file_size) > future_size) { return p; @@ -681,14 +632,15 @@ uint32_t UniversalCompactionPicker::GetPathId(const Options& options, // the next file in time order. // Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp( - Version* version, double score, unsigned int ratio, + const MutableCFOptions& mutable_cf_options, Version* version, + double score, unsigned int ratio, unsigned int max_number_of_files_to_compact, LogBuffer* log_buffer) { int level = 0; unsigned int min_merge_width = - options_->compaction_options_universal.min_merge_width; + ioptions_.compaction_options_universal.min_merge_width; unsigned int max_merge_width = - options_->compaction_options_universal.max_merge_width; + ioptions_.compaction_options_universal.max_merge_width; // The files are sorted from newest first to oldest last. const auto& files = version->files_[level]; @@ -750,7 +702,8 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp( if (sz < static_cast(f->fd.GetFileSize())) { break; } - if (options_->compaction_options_universal.stop_style == kCompactionStopStyleSimilarSize) { + if (ioptions_.compaction_options_universal.stop_style == + kCompactionStopStyleSimilarSize) { // Similar-size stopping rule: also check the last picked file isn't // far larger than the next candidate file. sz = (f->fd.GetFileSize() * (100.0 + ratio)) / 100.0; @@ -794,7 +747,7 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp( // size ratio of compression. bool enable_compression = true; int ratio_to_compress = - options_->compaction_options_universal.compression_size_percent; + ioptions_.compaction_options_universal.compression_size_percent; if (ratio_to_compress >= 0) { uint64_t total_size = version->NumLevelBytes(level); uint64_t older_file_size = 0; @@ -812,11 +765,12 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp( for (unsigned int i = 0; i < first_index_after; i++) { estimated_total_size += files[i]->fd.GetFileSize(); } - uint32_t path_id = GetPathId(*options_, estimated_total_size); + uint32_t path_id = GetPathId(ioptions_, estimated_total_size); Compaction* c = new Compaction( - version, level, level, MaxFileSizeForLevel(level), LLONG_MAX, path_id, - GetCompressionType(*options_, level, enable_compression)); + version, level, level, mutable_cf_options.MaxFileSizeForLevel(level), + LLONG_MAX, path_id, GetCompressionType(ioptions_, level, + enable_compression)); c->score_ = score; for (unsigned int i = start_index; i < first_index_after; i++) { @@ -841,11 +795,12 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp( // min_merge_width and max_merge_width). // Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp( - Version* version, double score, LogBuffer* log_buffer) { + const MutableCFOptions& mutable_cf_options, Version* version, + double score, LogBuffer* log_buffer) { int level = 0; // percentage flexibilty while reducing size amplification - uint64_t ratio = options_->compaction_options_universal. + uint64_t ratio = ioptions_.compaction_options_universal. max_size_amplification_percent; // The files are sorted from newest first to oldest last. @@ -920,20 +875,21 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp( "earliest-file-size %" PRIu64, version->cfd_->GetName().c_str(), candidate_size, earliest_file_size); } - assert(start_index >= 0 && start_index < files.size() - 1); + assert(start_index < files.size() - 1); // Estimate total file size uint64_t estimated_total_size = 0; for (unsigned int loop = start_index; loop < files.size(); loop++) { estimated_total_size += files[loop]->fd.GetFileSize(); } - uint32_t path_id = GetPathId(*options_, estimated_total_size); + uint32_t path_id = GetPathId(ioptions_, estimated_total_size); // create a compaction request // We always compact all the files, so always compress. Compaction* c = - new Compaction(version, level, level, MaxFileSizeForLevel(level), - LLONG_MAX, path_id, GetCompressionType(*options_, level)); + new Compaction(version, level, level, + mutable_cf_options.MaxFileSizeForLevel(level), + LLONG_MAX, path_id, GetCompressionType(ioptions_, level)); c->score_ = score; for (unsigned int loop = start_index; loop < files.size(); loop++) { f = c->input_version_->files_[level][loop]; @@ -948,22 +904,23 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp( return c; } -Compaction* FIFOCompactionPicker::PickCompaction(Version* version, - LogBuffer* log_buffer) { +Compaction* FIFOCompactionPicker::PickCompaction( + const MutableCFOptions& mutable_cf_options, + Version* version, LogBuffer* log_buffer) { assert(version->NumberLevels() == 1); uint64_t total_size = 0; for (const auto& file : version->files_[0]) { total_size += file->compensated_file_size; } - if (total_size <= options_->compaction_options_fifo.max_table_files_size || + if (total_size <= ioptions_.compaction_options_fifo.max_table_files_size || version->files_[0].size() == 0) { // total size not exceeded LogToBuffer(log_buffer, "[%s] FIFO compaction: nothing to do. Total size %" PRIu64 ", max size %" PRIu64 "\n", version->cfd_->GetName().c_str(), total_size, - options_->compaction_options_fifo.max_table_files_size); + ioptions_.compaction_options_fifo.max_table_files_size); return nullptr; } @@ -988,28 +945,29 @@ Compaction* FIFOCompactionPicker::PickCompaction(Version* version, LogToBuffer(log_buffer, "[%s] FIFO compaction: picking file %" PRIu64 " with size %s for deletion", version->cfd_->GetName().c_str(), f->fd.GetNumber(), tmp_fsize); - if (total_size <= options_->compaction_options_fifo.max_table_files_size) { + if (total_size <= ioptions_.compaction_options_fifo.max_table_files_size) { break; } } c->MarkFilesBeingCompacted(true); compactions_in_progress_[0].insert(c); - + c->mutable_cf_options_ = mutable_cf_options; return c; } Compaction* FIFOCompactionPicker::CompactRange( + const MutableCFOptions& mutable_cf_options, Version* version, int input_level, int output_level, uint32_t output_path_id, const InternalKey* begin, const InternalKey* end, InternalKey** compaction_end) { assert(input_level == 0); assert(output_level == 0); *compaction_end = nullptr; - LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, options_->info_log.get()); - Compaction* c = PickCompaction(version, &log_buffer); + LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, ioptions_.info_log); + Compaction* c = PickCompaction(mutable_cf_options, version, &log_buffer); if (c != nullptr) { - assert(output_path_id < static_cast(options_->db_paths.size())); + assert(output_path_id < static_cast(ioptions_.db_paths.size())); c->output_path_id_ = output_path_id; } log_buffer.FlushBufferToLog(); diff --git a/db/compaction_picker.h b/db/compaction_picker.h index c1e27c471..138b97eb4 100644 --- a/db/compaction_picker.h +++ b/db/compaction_picker.h @@ -13,6 +13,7 @@ #include "rocksdb/status.h" #include "rocksdb/options.h" #include "rocksdb/env.h" +#include "util/mutable_cf_options.h" #include #include @@ -26,15 +27,17 @@ class Version; class CompactionPicker { public: - CompactionPicker(const Options* options, const InternalKeyComparator* icmp); + CompactionPicker(const ImmutableCFOptions& ioptions, + const InternalKeyComparator* icmp); virtual ~CompactionPicker(); // Pick level and inputs for a new compaction. // Returns nullptr if there is no compaction to be done. // Otherwise returns a pointer to a heap-allocated object that // describes the compaction. Caller should delete the result. - virtual Compaction* PickCompaction(Version* version, - LogBuffer* log_buffer) = 0; + virtual Compaction* PickCompaction( + const MutableCFOptions& mutable_cf_options, + Version* version, LogBuffer* log_buffer) = 0; // Return a compaction object for compacting the range [begin,end] in // the specified level. Returns nullptr if there is nothing in that @@ -47,11 +50,11 @@ class CompactionPicker { // compaction_end will be set to nullptr. // Client is responsible for compaction_end storage -- when called, // *compaction_end should point to valid InternalKey! - virtual Compaction* CompactRange(Version* version, int input_level, - int output_level, uint32_t output_path_id, - const InternalKey* begin, - const InternalKey* end, - InternalKey** compaction_end); + virtual Compaction* CompactRange( + const MutableCFOptions& mutable_cf_options, Version* version, + int input_level, int output_level, uint32_t output_path_id, + const InternalKey* begin, const InternalKey* end, + InternalKey** compaction_end); // Given the current number of levels, returns the lowest allowed level // for compaction input. @@ -64,19 +67,8 @@ class CompactionPicker { // compactions per level void SizeBeingCompacted(std::vector& sizes); - // Returns maximum total overlap bytes with grandparent - // level (i.e., level+2) before we stop building a single - // file in level->level+1 compaction. - uint64_t MaxGrandParentOverlapBytes(int level); - - // Returns maximum total bytes of data on a given level. - double MaxBytesForLevel(int level); - - // Get the max file size in a given level. - uint64_t MaxFileSizeForLevel(int level) const; - protected: - int NumberLevels() const { return num_levels_; } + int NumberLevels() const { return ioptions_.num_levels; } // Stores the minimal range that covers all entries in inputs in // *smallest, *largest. @@ -103,8 +95,6 @@ class CompactionPicker { // Will return false if it is impossible to apply this compaction. bool ExpandWhileOverlapping(Compaction* c); - uint64_t ExpandedCompactionByteSizeLimit(int level); - // Returns true if any one of the specified files are being compacted bool FilesInCompaction(std::vector& files); @@ -113,32 +103,27 @@ class CompactionPicker { const InternalKey* largest, int level, int* index); - void SetupOtherInputs(Compaction* c); + void SetupOtherInputs(const MutableCFOptions& mutable_cf_options, + Compaction* c); + + const ImmutableCFOptions& ioptions_; // record all the ongoing compactions for all levels std::vector> compactions_in_progress_; - // Per-level target file size. - std::unique_ptr max_file_size_; - - // Per-level max bytes - std::unique_ptr level_max_bytes_; - - const Options* const options_; private: - int num_levels_; - const InternalKeyComparator* const icmp_; }; class UniversalCompactionPicker : public CompactionPicker { public: - UniversalCompactionPicker(const Options* options, + UniversalCompactionPicker(const ImmutableCFOptions& ioptions, const InternalKeyComparator* icmp) - : CompactionPicker(options, icmp) {} - virtual Compaction* PickCompaction(Version* version, - LogBuffer* log_buffer) override; + : CompactionPicker(ioptions, icmp) {} + virtual Compaction* PickCompaction( + const MutableCFOptions& mutable_cf_options, + Version* version, LogBuffer* log_buffer) override; // The maxinum allowed input level. Always return 0. virtual int MaxInputLevel(int current_num_levels) const override { @@ -147,27 +132,30 @@ class UniversalCompactionPicker : public CompactionPicker { private: // Pick Universal compaction to limit read amplification - Compaction* PickCompactionUniversalReadAmp(Version* version, double score, - unsigned int ratio, - unsigned int num_files, - LogBuffer* log_buffer); + Compaction* PickCompactionUniversalReadAmp( + const MutableCFOptions& mutable_cf_options, + Version* version, double score, unsigned int ratio, + unsigned int num_files, LogBuffer* log_buffer); // Pick Universal compaction to limit space amplification. - Compaction* PickCompactionUniversalSizeAmp(Version* version, double score, - LogBuffer* log_buffer); + Compaction* PickCompactionUniversalSizeAmp( + const MutableCFOptions& mutable_cf_options, + Version* version, double score, LogBuffer* log_buffer); // Pick a path ID to place a newly generated file, with its estimated file // size. - static uint32_t GetPathId(const Options& options, uint64_t file_size); + static uint32_t GetPathId(const ImmutableCFOptions& ioptions, + uint64_t file_size); }; class LevelCompactionPicker : public CompactionPicker { public: - LevelCompactionPicker(const Options* options, + LevelCompactionPicker(const ImmutableCFOptions& ioptions, const InternalKeyComparator* icmp) - : CompactionPicker(options, icmp) {} - virtual Compaction* PickCompaction(Version* version, - LogBuffer* log_buffer) override; + : CompactionPicker(ioptions, icmp) {} + virtual Compaction* PickCompaction( + const MutableCFOptions& mutable_cf_options, + Version* version, LogBuffer* log_buffer) override; // Returns current_num_levels - 2, meaning the last level cannot be // compaction input level. @@ -180,23 +168,25 @@ class LevelCompactionPicker : public CompactionPicker { // Returns nullptr if there is no compaction to be done. // If level is 0 and there is already a compaction on that level, this // function will return nullptr. - Compaction* PickCompactionBySize(Version* version, int level, double score); + Compaction* PickCompactionBySize(const MutableCFOptions& mutable_cf_options, + Version* version, int level, double score); }; class FIFOCompactionPicker : public CompactionPicker { public: - FIFOCompactionPicker(const Options* options, + FIFOCompactionPicker(const ImmutableCFOptions& ioptions, const InternalKeyComparator* icmp) - : CompactionPicker(options, icmp) {} + : CompactionPicker(ioptions, icmp) {} - virtual Compaction* PickCompaction(Version* version, - LogBuffer* log_buffer) override; + virtual Compaction* PickCompaction( + const MutableCFOptions& mutable_cf_options, + Version* version, LogBuffer* log_buffer) override; - virtual Compaction* CompactRange(Version* version, int input_level, - int output_level, uint32_t output_path_id, - const InternalKey* begin, - const InternalKey* end, - InternalKey** compaction_end) override; + virtual Compaction* CompactRange( + const MutableCFOptions& mutable_cf_options, Version* version, + int input_level, int output_level, uint32_t output_path_id, + const InternalKey* begin, const InternalKey* end, + InternalKey** compaction_end) override; // The maxinum allowed input level. Always return 0. virtual int MaxInputLevel(int current_num_levels) const override { diff --git a/db/corruption_test.cc b/db/corruption_test.cc index 09d78f89f..4fcea0d5a 100644 --- a/db/corruption_test.cc +++ b/db/corruption_test.cc @@ -131,7 +131,7 @@ class CorruptionTest { ASSERT_GE(max_expected, correct); } - void CorruptFile(const std::string fname, int offset, int bytes_to_corrupt) { + void CorruptFile(const std::string& fname, int offset, int bytes_to_corrupt) { struct stat sbuf; if (stat(fname.c_str(), &sbuf) != 0) { const char* msg = strerror(errno); diff --git a/db/cuckoo_table_db_test.cc b/db/cuckoo_table_db_test.cc index 2652d1776..4beee59e4 100644 --- a/db/cuckoo_table_db_test.cc +++ b/db/cuckoo_table_db_test.cc @@ -218,6 +218,7 @@ TEST(CuckooTableDBTest, Uint64Comparator) { // Add more keys. ASSERT_OK(Delete(Uint64Key(2))); // Delete. + dbfull()->TEST_FlushMemTable(); ASSERT_OK(Put(Uint64Key(3), "v0")); // Update. ASSERT_OK(Put(Uint64Key(4), "v4")); dbfull()->TEST_FlushMemTable(); diff --git a/db/db_impl.cc b/db/db_impl.cc index 7f5a382c0..680a22cb3 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1410,7 +1410,8 @@ Status DBImpl::RecoverLogFiles(const std::vector& log_numbers, // VersionSet::next_file_number_ always to be strictly greater than any // log number versions_->MarkFileNumberUsed(max_log_number + 1); - status = versions_->LogAndApply(cfd, edit, &mutex_); + status = versions_->LogAndApply( + cfd, *cfd->GetLatestMutableCFOptions(), edit, &mutex_); if (!status.ok()) { // Recovery failed break; @@ -1479,8 +1480,9 @@ Status DBImpl::WriteLevel0TableForRecovery(ColumnFamilyData* cfd, MemTable* mem, } Status DBImpl::WriteLevel0Table(ColumnFamilyData* cfd, - autovector& mems, VersionEdit* edit, - uint64_t* filenumber, LogBuffer* log_buffer) { + const MutableCFOptions& mutable_cf_options, + const autovector& mems, + VersionEdit* edit, uint64_t* filenumber, LogBuffer* log_buffer) { mutex_.AssertHeld(); const uint64_t start_micros = env_->NowMicros(); FileMetaData meta; @@ -1560,7 +1562,8 @@ Status DBImpl::WriteLevel0Table(ColumnFamilyData* cfd, if (base != nullptr && db_options_.max_background_compactions <= 1 && db_options_.max_background_flushes == 0 && cfd->ioptions()->compaction_style == kCompactionStyleLevel) { - level = base->PickLevelForMemTableOutput(min_user_key, max_user_key); + level = base->PickLevelForMemTableOutput( + mutable_cf_options, min_user_key, max_user_key); } edit->AddFile(level, meta.fd.GetNumber(), meta.fd.GetPathId(), meta.fd.GetFileSize(), meta.smallest, meta.largest, @@ -1577,10 +1580,9 @@ Status DBImpl::WriteLevel0Table(ColumnFamilyData* cfd, return s; } -Status DBImpl::FlushMemTableToOutputFile(ColumnFamilyData* cfd, - bool* madeProgress, - DeletionState& deletion_state, - LogBuffer* log_buffer) { +Status DBImpl::FlushMemTableToOutputFile( + ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options, + bool* madeProgress, DeletionState& deletion_state, LogBuffer* log_buffer) { mutex_.AssertHeld(); assert(cfd->imm()->size() != 0); assert(cfd->imm()->IsFlushPending()); @@ -1607,8 +1609,10 @@ Status DBImpl::FlushMemTableToOutputFile(ColumnFamilyData* cfd, edit->SetLogNumber(mems.back()->GetNextLogNumber()); edit->SetColumnFamily(cfd->GetID()); + // This will release and re-acquire the mutex. - Status s = WriteLevel0Table(cfd, mems, edit, &file_number, log_buffer); + Status s = WriteLevel0Table(cfd, mutable_cf_options, mems, edit, + &file_number, log_buffer); if (s.ok() && shutting_down_.Acquire_Load() && cfd->IsDropped()) { s = Status::ShutdownInProgress( @@ -1620,14 +1624,13 @@ Status DBImpl::FlushMemTableToOutputFile(ColumnFamilyData* cfd, } else { // Replace immutable memtable with the generated Table s = cfd->imm()->InstallMemtableFlushResults( - cfd, mems, versions_.get(), &mutex_, db_options_.info_log.get(), - file_number, &pending_outputs_, &deletion_state.memtables_to_free, - db_directory_.get(), log_buffer); + cfd, mutable_cf_options, mems, versions_.get(), &mutex_, + db_options_.info_log.get(), file_number, &pending_outputs_, + &deletion_state.memtables_to_free, db_directory_.get(), log_buffer); } if (s.ok()) { - // Use latest MutableCFOptions - InstallSuperVersion(cfd, deletion_state); + InstallSuperVersion(cfd, deletion_state, mutable_cf_options); if (madeProgress) { *madeProgress = 1; } @@ -1726,7 +1729,8 @@ bool DBImpl::SetOptions(ColumnFamilyHandle* column_family, } // return the same level if it cannot be moved -int DBImpl::FindMinimumEmptyLevelFitting(ColumnFamilyData* cfd, int level) { +int DBImpl::FindMinimumEmptyLevelFitting(ColumnFamilyData* cfd, + const MutableCFOptions& mutable_cf_options, int level) { mutex_.AssertHeld(); Version* current = cfd->current(); int minimum_level = level; @@ -1734,7 +1738,7 @@ int DBImpl::FindMinimumEmptyLevelFitting(ColumnFamilyData* cfd, int level) { // stop if level i is not empty if (current->NumLevelFiles(i) > 0) break; // stop if level i is too small (cannot fit the level files) - if (cfd->compaction_picker()->MaxBytesForLevel(i) < + if (mutable_cf_options.MaxBytesForLevel(i) < current->NumLevelBytes(level)) { break; } @@ -1770,10 +1774,12 @@ Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) { bg_cv_.Wait(); } + const MutableCFOptions mutable_cf_options = + *cfd->GetLatestMutableCFOptions(); // move to a smaller level int to_level = target_level; if (target_level < 0) { - to_level = FindMinimumEmptyLevelFitting(cfd, level); + to_level = FindMinimumEmptyLevelFitting(cfd, mutable_cf_options, level); } assert(to_level <= level); @@ -1794,9 +1800,10 @@ Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) { Log(db_options_.info_log, "[%s] Apply version edit:\n%s", cfd->GetName().c_str(), edit.DebugString().data()); - status = versions_->LogAndApply(cfd, &edit, &mutex_, db_directory_.get()); - // Use latest MutableCFOptions - superversion_to_free = cfd->InstallSuperVersion(new_superversion, &mutex_); + status = versions_->LogAndApply(cfd, + mutable_cf_options, &edit, &mutex_, db_directory_.get()); + superversion_to_free = cfd->InstallSuperVersion( + new_superversion, &mutex_, mutable_cf_options); new_superversion = nullptr; Log(db_options_.info_log, "[%s] LogAndApply: %s\n", cfd->GetName().c_str(), @@ -2058,6 +2065,8 @@ Status DBImpl::BackgroundFlush(bool* madeProgress, for (auto cfd : *versions_->GetColumnFamilySet()) { cfd->Ref(); Status flush_status; + const MutableCFOptions mutable_cf_options = + *cfd->GetLatestMutableCFOptions(); while (flush_status.ok() && cfd->imm()->IsFlushPending()) { LogToBuffer( log_buffer, @@ -2065,8 +2074,8 @@ Status DBImpl::BackgroundFlush(bool* madeProgress, "family [%s], flush slots available %d", cfd->GetName().c_str(), db_options_.max_background_flushes - bg_flush_scheduled_); - flush_status = FlushMemTableToOutputFile(cfd, madeProgress, - deletion_state, log_buffer); + flush_status = FlushMemTableToOutputFile( + cfd, mutable_cf_options, madeProgress, deletion_state, log_buffer); } if (call_status.ok() && !flush_status.ok()) { call_status = flush_status; @@ -2259,6 +2268,8 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, // FLUSH preempts compaction Status flush_stat; for (auto cfd : *versions_->GetColumnFamilySet()) { + const MutableCFOptions mutable_cf_options = + *cfd->GetLatestMutableCFOptions(); while (cfd->imm()->IsFlushPending()) { LogToBuffer( log_buffer, @@ -2266,8 +2277,8 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, "compaction slots available %d", db_options_.max_background_compactions - bg_compaction_scheduled_); cfd->Ref(); - flush_stat = FlushMemTableToOutputFile(cfd, madeProgress, deletion_state, - log_buffer); + flush_stat = FlushMemTableToOutputFile( + cfd, mutable_cf_options, madeProgress, deletion_state, log_buffer); cfd->Unref(); if (!flush_stat.ok()) { if (is_manual) { @@ -2281,15 +2292,18 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, } } + // Compaction makes a copy of the latest MutableCFOptions. It should be used + // throughout the compaction procedure to make sure consistency. It will + // eventually be installed into SuperVersion unique_ptr c; InternalKey manual_end_storage; InternalKey* manual_end = &manual_end_storage; if (is_manual) { ManualCompaction* m = manual_compaction_; assert(m->in_progress); - c.reset(m->cfd->CompactRange(m->input_level, m->output_level, - m->output_path_id, m->begin, m->end, - &manual_end)); + c.reset(m->cfd->CompactRange( + *m->cfd->GetLatestMutableCFOptions(), m->input_level, m->output_level, + m->output_path_id, m->begin, m->end, &manual_end)); if (!c) { m->done = true; } @@ -2306,7 +2320,11 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, // no need to refcount in iteration since it's always under a mutex for (auto cfd : *versions_->GetColumnFamilySet()) { if (!cfd->options()->disable_auto_compactions) { - c.reset(cfd->PickCompaction(log_buffer)); + // NOTE: try to avoid unnecessary copy of MutableCFOptions if + // compaction is not necessary. Need to make sure mutex is held + // until we make a copy in the following code + c.reset(cfd->PickCompaction( + *cfd->GetLatestMutableCFOptions(), log_buffer)); if (c != nullptr) { // update statistics MeasureTime(stats_, NUM_FILES_IN_SINGLE_COMPACTION, @@ -2331,10 +2349,11 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, for (const auto& f : *c->inputs(0)) { c->edit()->DeleteFile(c->level(), f->fd.GetNumber()); } - status = versions_->LogAndApply(c->column_family_data(), c->edit(), &mutex_, - db_directory_.get()); - // Use latest MutableCFOptions - InstallSuperVersion(c->column_family_data(), deletion_state); + status = versions_->LogAndApply( + c->column_family_data(), *c->mutable_cf_options(), c->edit(), + &mutex_, db_directory_.get()); + InstallSuperVersion(c->column_family_data(), deletion_state, + *c->mutable_cf_options()); LogToBuffer(log_buffer, "[%s] Deleted %d files\n", c->column_family_data()->GetName().c_str(), c->num_input_files(0)); @@ -2348,10 +2367,12 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, c->edit()->AddFile(c->level() + 1, f->fd.GetNumber(), f->fd.GetPathId(), f->fd.GetFileSize(), f->smallest, f->largest, f->smallest_seqno, f->largest_seqno); - status = versions_->LogAndApply(c->column_family_data(), c->edit(), &mutex_, - db_directory_.get()); + status = versions_->LogAndApply(c->column_family_data(), + *c->mutable_cf_options(), + c->edit(), &mutex_, db_directory_.get()); // Use latest MutableCFOptions - InstallSuperVersion(c->column_family_data(), deletion_state); + InstallSuperVersion(c->column_family_data(), deletion_state, + *c->mutable_cf_options()); Version::LevelSummaryStorage tmp; LogToBuffer( @@ -2366,7 +2387,8 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, } else { MaybeScheduleFlushOrCompaction(); // do more compaction work in parallel. CompactionState* compact = new CompactionState(c.get()); - status = DoCompactionWork(compact, deletion_state, log_buffer); + status = DoCompactionWork(compact, *c->mutable_cf_options(), + deletion_state, log_buffer); CleanupCompaction(compact, status); c->ReleaseCompactionFiles(status); c->ReleaseInputs(); @@ -2468,7 +2490,8 @@ void DBImpl::ReleaseCompactionUnusedFileNumbers(CompactionState* compact) { } } -Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) { +Status DBImpl::OpenCompactionOutputFile( + CompactionState* compact, const MutableCFOptions& mutable_cf_options) { assert(compact != nullptr); assert(compact->builder == nullptr); uint64_t file_number; @@ -2500,7 +2523,7 @@ Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) { if (s.ok()) { compact->outfile->SetIOPriority(Env::IO_LOW); compact->outfile->SetPreallocationBlockSize( - compact->compaction->OutputFilePreallocationSize()); + compact->compaction->OutputFilePreallocationSize(mutable_cf_options)); ColumnFamilyData* cfd = compact->compaction->column_family_data(); compact->builder.reset(NewTableBuilder( @@ -2570,7 +2593,7 @@ Status DBImpl::FinishCompactionOutputFile(CompactionState* compact, Status DBImpl::InstallCompactionResults(CompactionState* compact, - LogBuffer* log_buffer) { + const MutableCFOptions& mutable_cf_options, LogBuffer* log_buffer) { mutex_.AssertHeld(); // paranoia: verify that the files that we started with @@ -2604,6 +2627,7 @@ Status DBImpl::InstallCompactionResults(CompactionState* compact, out.smallest_seqno, out.largest_seqno); } return versions_->LogAndApply(compact->compaction->column_family_data(), + mutable_cf_options, compact->compaction->edit(), &mutex_, db_directory_.get()); } @@ -2635,8 +2659,8 @@ inline SequenceNumber DBImpl::findEarliestVisibleSnapshot( } uint64_t DBImpl::CallFlushDuringCompaction(ColumnFamilyData* cfd, - DeletionState& deletion_state, - LogBuffer* log_buffer) { + const MutableCFOptions& mutable_cf_options, DeletionState& deletion_state, + LogBuffer* log_buffer) { if (db_options_.max_background_flushes > 0) { // flush thread will take care of this return 0; @@ -2646,7 +2670,8 @@ uint64_t DBImpl::CallFlushDuringCompaction(ColumnFamilyData* cfd, mutex_.Lock(); if (cfd->imm()->IsFlushPending()) { cfd->Ref(); - FlushMemTableToOutputFile(cfd, nullptr, deletion_state, log_buffer); + FlushMemTableToOutputFile(cfd, mutable_cf_options, nullptr, + deletion_state, log_buffer); cfd->Unref(); bg_cv_.SignalAll(); // Wakeup DelayWrite() if necessary } @@ -2658,6 +2683,7 @@ uint64_t DBImpl::CallFlushDuringCompaction(ColumnFamilyData* cfd, } Status DBImpl::ProcessKeyValueCompaction( + const MutableCFOptions& mutable_cf_options, bool is_snapshot_supported, SequenceNumber visible_at_tip, SequenceNumber earliest_snapshot, @@ -2721,7 +2747,8 @@ Status DBImpl::ProcessKeyValueCompaction( // TODO(icanadi) this currently only checks if flush is necessary on // compacting column family. we should also check if flush is necessary on // other column families, too - imm_micros += CallFlushDuringCompaction(cfd, deletion_state, log_buffer); + imm_micros += CallFlushDuringCompaction( + cfd, mutable_cf_options, deletion_state, log_buffer); Slice key; Slice value; @@ -2922,7 +2949,7 @@ Status DBImpl::ProcessKeyValueCompaction( // Open output file if necessary if (compact->builder == nullptr) { - status = OpenCompactionOutputFile(compact); + status = OpenCompactionOutputFile(compact, mutable_cf_options); if (!status.ok()) { break; } @@ -3059,12 +3086,12 @@ void DBImpl::CallCompactionFilterV2(CompactionState* compact, } Status DBImpl::DoCompactionWork(CompactionState* compact, + const MutableCFOptions& mutable_cf_options, DeletionState& deletion_state, LogBuffer* log_buffer) { assert(compact); compact->CleanupBatchBuffer(); compact->CleanupMergedBuffer(); - bool prefix_initialized = false; // Generate file_levels_ for compaction berfore making Iterator compact->compaction->GenerateFileLevels(); @@ -3130,6 +3157,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact, if (!compaction_filter_v2) { status = ProcessKeyValueCompaction( + mutable_cf_options, is_snapshot_supported, visible_at_tip, earliest_snapshot, @@ -3149,6 +3177,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact, // 2) send value_buffer to compaction filter and alternate the values; // 3) merge value_buffer with ineligible_value_buffer; // 4) run the modified "compaction" using the old for loop. + bool prefix_initialized = false; shared_ptr backup_input( versions_->MakeInputIterator(compact->compaction)); backup_input->SeekToFirst(); @@ -3158,7 +3187,8 @@ Status DBImpl::DoCompactionWork(CompactionState* compact, // TODO(icanadi) this currently only checks if flush is necessary on // compacting column family. we should also check if flush is necessary on // other column families, too - imm_micros += CallFlushDuringCompaction(cfd, deletion_state, log_buffer); + imm_micros += CallFlushDuringCompaction(cfd, mutable_cf_options, + deletion_state, log_buffer); Slice key = backup_input->key(); Slice value = backup_input->value(); @@ -3208,6 +3238,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact, // Done buffering for the current prefix. Spit it out to disk // Now just iterate through all the kv-pairs status = ProcessKeyValueCompaction( + mutable_cf_options, is_snapshot_supported, visible_at_tip, earliest_snapshot, @@ -3244,6 +3275,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact, compact->MergeKeyValueSliceBuffer(&cfd->internal_comparator()); status = ProcessKeyValueCompaction( + mutable_cf_options, is_snapshot_supported, visible_at_tip, earliest_snapshot, @@ -3266,6 +3298,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact, } compact->MergeKeyValueSliceBuffer(&cfd->internal_comparator()); status = ProcessKeyValueCompaction( + mutable_cf_options, is_snapshot_supported, visible_at_tip, earliest_snapshot, @@ -3333,9 +3366,8 @@ Status DBImpl::DoCompactionWork(CompactionState* compact, ReleaseCompactionUnusedFileNumbers(compact); if (status.ok()) { - status = InstallCompactionResults(compact, log_buffer); - // Use latest MutableCFOptions - InstallSuperVersion(cfd, deletion_state); + status = InstallCompactionResults(compact, mutable_cf_options, log_buffer); + InstallSuperVersion(cfd, deletion_state, mutable_cf_options); } Version::LevelSummaryStorage tmp; LogToBuffer( @@ -3434,16 +3466,16 @@ Status DBImpl::Get(const ReadOptions& options, // first call already used it. In that rare case, we take a hit and create a // new SuperVersion() inside of the mutex. We do similar thing // for superversion_to_free -void DBImpl::InstallSuperVersion(ColumnFamilyData* cfd, - DeletionState& deletion_state) { +void DBImpl::InstallSuperVersion( + ColumnFamilyData* cfd, DeletionState& deletion_state, + const MutableCFOptions& mutable_cf_options) { mutex_.AssertHeld(); // if new_superversion == nullptr, it means somebody already used it SuperVersion* new_superversion = (deletion_state.new_superversion != nullptr) ? deletion_state.new_superversion : new SuperVersion(); - // Use latest MutableCFOptions SuperVersion* old_superversion = - cfd->InstallSuperVersion(new_superversion, &mutex_); + cfd->InstallSuperVersion(new_superversion, &mutex_, mutable_cf_options); deletion_state.new_superversion = nullptr; deletion_state.superversions_to_free.push_back(old_superversion); } @@ -3627,15 +3659,17 @@ Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& options, // LogAndApply will both write the creation in MANIFEST and create // ColumnFamilyData object - Status s = versions_->LogAndApply(nullptr, &edit, &mutex_, - db_directory_.get(), false, &options); + Options opt(db_options_, options); + Status s = versions_->LogAndApply(nullptr, + MutableCFOptions(opt, ImmutableCFOptions(opt)), + &edit, &mutex_, db_directory_.get(), false, &options); if (s.ok()) { single_column_family_mode_ = false; auto cfd = versions_->GetColumnFamilySet()->GetColumnFamily(column_family_name); assert(cfd != nullptr); - // Use latest MutableCFOptions - delete cfd->InstallSuperVersion(new SuperVersion(), &mutex_); + delete cfd->InstallSuperVersion(new SuperVersion(), &mutex_, + *cfd->GetLatestMutableCFOptions()); *handle = new ColumnFamilyHandleImpl(cfd, this, &mutex_); Log(db_options_.info_log, "Created column family [%s] (ID %u)", column_family_name.c_str(), (unsigned)cfd->GetID()); @@ -3671,7 +3705,8 @@ Status DBImpl::DropColumnFamily(ColumnFamilyHandle* column_family) { WriteThread::Writer w(&mutex_); s = write_thread_.EnterWriteThread(&w, 0); assert(s.ok() && !w.done); // No timeout and nobody should do our job - s = versions_->LogAndApply(cfd, &edit, &mutex_); + s = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(), + &edit, &mutex_); write_thread_.ExitWriteThread(&w, &w, s); } } @@ -4037,11 +4072,10 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { RecordTick(stats_, WAL_FILE_BYTES, log_size); if (status.ok() && options.sync) { RecordTick(stats_, WAL_FILE_SYNCED); + StopWatch sw(env_, stats_, WAL_FILE_SYNC_MICROS); if (db_options_.use_fsync) { - StopWatch(env_, stats_, WAL_FILE_SYNC_MICROS); status = log_->file()->Fsync(); } else { - StopWatch(env_, stats_, WAL_FILE_SYNC_MICROS); status = log_->file()->Sync(); } } @@ -4451,9 +4485,11 @@ Status DBImpl::DeleteFile(std::string name) { } } edit.DeleteFile(level, number); - status = versions_->LogAndApply(cfd, &edit, &mutex_, db_directory_.get()); + status = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(), + &edit, &mutex_, db_directory_.get()); if (status.ok()) { - InstallSuperVersion(cfd, deletion_state); + InstallSuperVersion(cfd, deletion_state, + *cfd->GetLatestMutableCFOptions()); } FindObsoleteFiles(deletion_state, false); } // lock released here @@ -4682,8 +4718,8 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname, } if (s.ok()) { for (auto cfd : *impl->versions_->GetColumnFamilySet()) { - // Use latest MutableCFOptions - delete cfd->InstallSuperVersion(new SuperVersion(), &impl->mutex_); + delete cfd->InstallSuperVersion(new SuperVersion(), &impl->mutex_, + *cfd->GetLatestMutableCFOptions()); } impl->alive_log_files_.push_back( DBImpl::LogFileNumberSize(impl->logfile_number_)); diff --git a/db/db_impl.h b/db/db_impl.h index c6baf9c95..f1a81e00c 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -347,9 +347,9 @@ class DBImpl : public DB { // Flush the in-memory write buffer to storage. Switches to a new // log-file/memtable and writes a new descriptor iff successful. - Status FlushMemTableToOutputFile(ColumnFamilyData* cfd, bool* madeProgress, - DeletionState& deletion_state, - LogBuffer* log_buffer); + Status FlushMemTableToOutputFile( + ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options, + bool* madeProgress, DeletionState& deletion_state, LogBuffer* log_buffer); // REQUIRES: log_numbers are sorted in ascending order Status RecoverLogFiles(const std::vector& log_numbers, @@ -362,9 +362,10 @@ class DBImpl : public DB { // concurrent flush memtables to storage. Status WriteLevel0TableForRecovery(ColumnFamilyData* cfd, MemTable* mem, VersionEdit* edit); - Status WriteLevel0Table(ColumnFamilyData* cfd, autovector& mems, - VersionEdit* edit, uint64_t* filenumber, - LogBuffer* log_buffer); + Status WriteLevel0Table(ColumnFamilyData* cfd, + const MutableCFOptions& mutable_cf_options, + const autovector& mems, + VersionEdit* edit, uint64_t* filenumber, LogBuffer* log_buffer); void DelayWrite(uint64_t expiration_time); @@ -393,6 +394,7 @@ class DBImpl : public DB { LogBuffer* log_buffer); void CleanupCompaction(CompactionState* compact, Status status); Status DoCompactionWork(CompactionState* compact, + const MutableCFOptions& mutable_cf_options, DeletionState& deletion_state, LogBuffer* log_buffer); @@ -400,12 +402,13 @@ class DBImpl : public DB { // preempt compaction, since it's higher prioirty // Returns: micros spent executing uint64_t CallFlushDuringCompaction(ColumnFamilyData* cfd, - DeletionState& deletion_state, - LogBuffer* log_buffer); + const MutableCFOptions& mutable_cf_options, DeletionState& deletion_state, + LogBuffer* log_buffer); // Call compaction filter if is_compaction_v2 is not true. Then iterate // through input and compact the kv-pairs Status ProcessKeyValueCompaction( + const MutableCFOptions& mutable_cf_options, bool is_snapshot_supported, SequenceNumber visible_at_tip, SequenceNumber earliest_snapshot, @@ -422,10 +425,11 @@ class DBImpl : public DB { void CallCompactionFilterV2(CompactionState* compact, CompactionFilterV2* compaction_filter_v2); - Status OpenCompactionOutputFile(CompactionState* compact); + Status OpenCompactionOutputFile(CompactionState* compact, + const MutableCFOptions& mutable_cf_options); Status FinishCompactionOutputFile(CompactionState* compact, Iterator* input); Status InstallCompactionResults(CompactionState* compact, - LogBuffer* log_buffer); + const MutableCFOptions& mutable_cf_options, LogBuffer* log_buffer); void AllocateCompactionOutputFileNumbers(CompactionState* compact); void ReleaseCompactionUnusedFileNumbers(CompactionState* compact); @@ -467,7 +471,8 @@ class DBImpl : public DB { // Return the minimum empty level that could hold the total data in the // input level. Return the input level, if such level could not be found. - int FindMinimumEmptyLevelFitting(ColumnFamilyData* cfd, int level); + int FindMinimumEmptyLevelFitting(ColumnFamilyData* cfd, + const MutableCFOptions& mutable_cf_options, int level); // Move the files in the input level to the target level. // If target_level < 0, automatically calculate the minimum level that could @@ -621,7 +626,8 @@ class DBImpl : public DB { // the cfd->InstallSuperVersion() function. Background threads carry // deletion_state which can have new_superversion already allocated. void InstallSuperVersion(ColumnFamilyData* cfd, - DeletionState& deletion_state); + DeletionState& deletion_state, + const MutableCFOptions& mutable_cf_options); // Find Super version and reference it. Based on options, it might return // the thread local cached one. diff --git a/db/db_iter.cc b/db/db_iter.cc index db86ebc2c..815562c9f 100644 --- a/db/db_iter.cc +++ b/db/db_iter.cc @@ -287,7 +287,6 @@ void DBIter::MergeValuesNewToOld() { std::deque operands; operands.push_front(iter_->value().ToString()); - std::string merge_result; // Temporary string to hold merge result later ParsedInternalKey ikey; for (iter_->Next(); iter_->Valid(); iter_->Next()) { if (!ParseKey(&ikey)) { diff --git a/db/db_test.cc b/db/db_test.cc index c09cc74df..d402a3578 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -874,6 +874,18 @@ class DBTest { return atoi(property.c_str()); } + uint64_t SizeAtLevel(int level) { + std::vector metadata; + db_->GetLiveFilesMetaData(&metadata); + uint64_t sum = 0; + for (const auto& m : metadata) { + if (m.level == level) { + sum += m.size; + } + } + return sum; + } + int TotalTableFiles(int cf = 0, int levels = -1) { if (levels == -1) { levels = CurrentOptions().num_levels; @@ -4672,9 +4684,9 @@ TEST(DBTest, CompactionFilterContextManual) { ASSERT_EQ(NumTableFilesAtLevel(0), 1); // Verify total number of keys is correct after manual compaction. - int count = 0; - int total = 0; { + int count = 0; + int total = 0; Arena arena; ScopedArenaIterator iter(dbfull()->TEST_NewInternalIterator(&arena)); iter->SeekToFirst(); @@ -6138,7 +6150,7 @@ class WrappedBloom : public FilterPolicy { const FilterPolicy* filter_; mutable uint32_t counter_; - rocksdb::Slice convertKey(const rocksdb::Slice key) const { + rocksdb::Slice convertKey(const rocksdb::Slice& key) const { return key; } }; @@ -8193,7 +8205,6 @@ static void RandomTimeoutWriter(void* arg) { if (write_opt.timeout_hint_us == 0 || put_duration + kTimerBias < write_opt.timeout_hint_us) { ASSERT_OK(s); - std::string result; } if (s.IsTimedOut()) { timeout_count++; @@ -8527,6 +8538,102 @@ TEST(DBTest, DisableDataSyncTest) { } } +TEST(DBTest, DynamicCompactionOptions) { + const uint64_t k64KB = 1 << 16; + const uint64_t k128KB = 1 << 17; + const uint64_t k256KB = 1 << 18; + const uint64_t k5KB = 5 * 1024; + Options options; + options.env = env_; + options.create_if_missing = true; + options.compression = kNoCompression; + options.max_background_compactions = 4; + options.hard_rate_limit = 1.1; + options.write_buffer_size = k128KB; + options.max_write_buffer_number = 2; + // Compaction related options + options.level0_file_num_compaction_trigger = 3; + options.level0_slowdown_writes_trigger = 10; + options.level0_stop_writes_trigger = 20; + options.max_grandparent_overlap_factor = 10; + options.expanded_compaction_factor = 25; + options.source_compaction_factor = 1; + options.target_file_size_base = k128KB; + options.target_file_size_multiplier = 1; + options.max_bytes_for_level_base = k256KB; + options.max_bytes_for_level_multiplier = 4; + DestroyAndReopen(&options); + + auto gen_l0_kb = [this](int start, int size, int stride = 1) { + Random rnd(301); + std::vector values; + for (int i = 0; i < size; i++) { + values.push_back(RandomString(&rnd, 1024)); + ASSERT_OK(Put(Key(start + stride * i), values[i])); + } + dbfull()->TEST_WaitForFlushMemTable(); + }; + + // Write 3 files that have the same key range, trigger compaction and + // result in one L1 file + gen_l0_kb(0, 128); + ASSERT_EQ(NumTableFilesAtLevel(0), 1); + gen_l0_kb(0, 128); + ASSERT_EQ(NumTableFilesAtLevel(0), 2); + gen_l0_kb(0, 128); + dbfull()->TEST_WaitForCompact(); + ASSERT_EQ("0,1", FilesPerLevel()); + std::vector metadata; + db_->GetLiveFilesMetaData(&metadata); + ASSERT_EQ(1U, metadata.size()); + ASSERT_LE(metadata[0].size, k128KB + k5KB); // < 128KB + 5KB + ASSERT_GE(metadata[0].size, k128KB - k5KB); // > 128B - 5KB + + // Make compaction trigger and file size smaller + ASSERT_TRUE(dbfull()->SetOptions({ + {"level0_file_num_compaction_trigger", "2"}, + {"target_file_size_base", "65536"} + })); + + gen_l0_kb(0, 128); + ASSERT_EQ("1,1", FilesPerLevel()); + gen_l0_kb(0, 128); + dbfull()->TEST_WaitForCompact(); + ASSERT_EQ("0,2", FilesPerLevel()); + metadata.clear(); + db_->GetLiveFilesMetaData(&metadata); + ASSERT_EQ(2U, metadata.size()); + ASSERT_LE(metadata[0].size, k64KB + k5KB); // < 64KB + 5KB + ASSERT_GE(metadata[0].size, k64KB - k5KB); // > 64KB - 5KB + + // Change base level size to 1MB + ASSERT_TRUE(dbfull()->SetOptions({ {"max_bytes_for_level_base", "1048576"} })); + + // writing 56 x 128KB => 7MB + // (L1 + L2) = (1 + 4) * 1MB = 5MB + for (int i = 0; i < 56; ++i) { + gen_l0_kb(i, 128, 56); + } + dbfull()->TEST_WaitForCompact(); + ASSERT_TRUE(SizeAtLevel(1) < 1048576 * 1.1); + ASSERT_TRUE(SizeAtLevel(2) < 4 * 1048576 * 1.1); + + // Change multiplier to 2 with smaller base + ASSERT_TRUE(dbfull()->SetOptions({ + {"max_bytes_for_level_multiplier", "2"}, + {"max_bytes_for_level_base", "262144"} + })); + + // writing 16 x 128KB + // (L1 + L2 + L3) = (1 + 2 + 4) * 256KB + for (int i = 0; i < 16; ++i) { + gen_l0_kb(i, 128, 50); + } + dbfull()->TEST_WaitForCompact(); + ASSERT_TRUE(SizeAtLevel(1) < 262144 * 1.1); + ASSERT_TRUE(SizeAtLevel(2) < 2 * 262144 * 1.1); + ASSERT_TRUE(SizeAtLevel(3) < 4 * 262144 * 1.1); +} } // namespace rocksdb diff --git a/db/deletefile_test.cc b/db/deletefile_test.cc index a5af31284..f1cd4b040 100644 --- a/db/deletefile_test.cc +++ b/db/deletefile_test.cc @@ -148,7 +148,6 @@ class DeleteFileTest { TEST(DeleteFileTest, AddKeysAndQueryLevels) { CreateTwoLevels(); std::vector metadata; - std::vector keysinlevel; db_->GetLiveFilesMetaData(&metadata); std::string level1file = ""; diff --git a/db/forward_iterator.cc b/db/forward_iterator.cc index 6b78c4037..684045e05 100644 --- a/db/forward_iterator.cc +++ b/db/forward_iterator.cc @@ -125,7 +125,8 @@ ForwardIterator::ForwardIterator(DBImpl* db, const ReadOptions& read_options, mutable_iter_(nullptr), current_(nullptr), valid_(false), - is_prev_set_(false) {} + is_prev_set_(false), + is_prev_inclusive_(false) {} ForwardIterator::~ForwardIterator() { Cleanup(); @@ -314,11 +315,12 @@ void ForwardIterator::SeekInternal(const Slice& internal_key, } } - if (seek_to_first || immutable_min_heap_.empty()) { + if (seek_to_first) { is_prev_set_ = false; } else { prev_key_.SetKey(internal_key); is_prev_set_ = true; + is_prev_inclusive_ = true; } } else if (current_ && current_ != mutable_iter_) { // current_ is one of immutable iterators, push it back to the heap @@ -343,8 +345,20 @@ void ForwardIterator::Next() { } } else if (current_ != mutable_iter_) { // It is going to advance immutable iterator - prev_key_.SetKey(current_->key()); - is_prev_set_ = true; + + bool update_prev_key = true; + if (is_prev_set_ && prefix_extractor_) { + // advance prev_key_ to current_ only if they share the same prefix + update_prev_key = + prefix_extractor_->Transform(prev_key_.GetKey()).compare( + prefix_extractor_->Transform(current_->key())) == 0; + } + + if (update_prev_key) { + prev_key_.SetKey(current_->key()); + is_prev_set_ = true; + is_prev_inclusive_ = false; + } } current_->Next(); @@ -476,7 +490,14 @@ void ForwardIterator::UpdateCurrent() { } bool ForwardIterator::NeedToSeekImmutable(const Slice& target) { - if (!valid_ || !is_prev_set_) { + // We maintain the interval (prev_key_, immutable_min_heap_.top()->key()) + // such that there are no records with keys within that range in + // immutable_min_heap_. Since immutable structures (SST files and immutable + // memtables) can't change in this version, we don't need to do a seek if + // 'target' belongs to that interval (immutable_min_heap_.top() is already + // at the correct position). + + if (!valid_ || !current_ || !is_prev_set_) { return true; } Slice prev_key = prev_key_.GetKey(); @@ -485,13 +506,17 @@ bool ForwardIterator::NeedToSeekImmutable(const Slice& target) { return true; } if (cfd_->internal_comparator().InternalKeyComparator::Compare( - prev_key, target) >= 0) { + prev_key, target) >= (is_prev_inclusive_ ? 1 : 0)) { return true; } - if (immutable_min_heap_.empty() || - cfd_->internal_comparator().InternalKeyComparator::Compare( - target, current_ == mutable_iter_ ? immutable_min_heap_.top()->key() - : current_->key()) > 0) { + + if (immutable_min_heap_.empty() && current_ == mutable_iter_) { + // Nothing to seek on. + return false; + } + if (cfd_->internal_comparator().InternalKeyComparator::Compare( + target, current_ == mutable_iter_ ? immutable_min_heap_.top()->key() + : current_->key()) > 0) { return true; } return false; diff --git a/db/forward_iterator.h b/db/forward_iterator.h index 653a0ac0c..4d3761ee1 100644 --- a/db/forward_iterator.h +++ b/db/forward_iterator.h @@ -101,6 +101,7 @@ class ForwardIterator : public Iterator { IterKey prev_key_; bool is_prev_set_; + bool is_prev_inclusive_; Arena arena_; }; diff --git a/db/log_and_apply_bench.cc b/db/log_and_apply_bench.cc index 3a5535d2d..eba0a2787 100644 --- a/db/log_and_apply_bench.cc +++ b/db/log_and_apply_bench.cc @@ -60,7 +60,8 @@ void BM_LogAndApply(int iters, int num_base_files) { InternalKey limit(MakeKey(2 * fnum + 1), 1, kTypeDeletion); vbase.AddFile(2, ++fnum, 0, 1 /* file size */, start, limit, 1, 1); } - ASSERT_OK(vset->LogAndApply(default_cfd, &vbase, &mu)); + ASSERT_OK(vset->LogAndApply(default_cfd, + *default_cfd->GetLatestMutableCFOptions(), &vbase, &mu)); } for (int i = 0; i < iters; i++) { @@ -69,7 +70,8 @@ void BM_LogAndApply(int iters, int num_base_files) { InternalKey start(MakeKey(2 * fnum), 1, kTypeValue); InternalKey limit(MakeKey(2 * fnum + 1), 1, kTypeDeletion); vedit.AddFile(2, ++fnum, 0, 1 /* file size */, start, limit, 1, 1); - vset->LogAndApply(default_cfd, &vedit, &mu); + vset->LogAndApply(default_cfd, *default_cfd->GetLatestMutableCFOptions(), + &vedit, &mu); } delete vset; } diff --git a/db/memtable.cc b/db/memtable.cc index bdfbc805f..b9b99a684 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -413,7 +413,6 @@ static bool SaveValue(void* arg, const char* entry) { *(s->found_final_value) = true; return false; } - std::string merge_result; // temporary area for merge results later Slice v = GetLengthPrefixedSlice(key_ptr + key_length); *(s->merge_in_progress) = true; merge_context->PushOperand(v); diff --git a/db/memtable_list.cc b/db/memtable_list.cc index 728b1c0a0..bd48f1f47 100644 --- a/db/memtable_list.cc +++ b/db/memtable_list.cc @@ -160,7 +160,8 @@ void MemTableList::RollbackMemtableFlush(const autovector& mems, // Record a successful flush in the manifest file Status MemTableList::InstallMemtableFlushResults( - ColumnFamilyData* cfd, const autovector& mems, VersionSet* vset, + ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options, + const autovector& mems, VersionSet* vset, port::Mutex* mu, Logger* info_log, uint64_t file_number, FileNumToPathIdMap* pending_outputs, autovector* to_delete, Directory* db_directory, LogBuffer* log_buffer) { @@ -197,7 +198,7 @@ Status MemTableList::InstallMemtableFlushResults( cfd->GetName().c_str(), (unsigned long)m->file_number_); // this can release and reacquire the mutex. - s = vset->LogAndApply(cfd, &m->edit_, mu, db_directory); + s = vset->LogAndApply(cfd, mutable_cf_options, &m->edit_, mu, db_directory); // we will be changing the version in the next code path, // so we better create a new one, since versions are immutable diff --git a/db/memtable_list.h b/db/memtable_list.h index 92688825a..d93c7df92 100644 --- a/db/memtable_list.h +++ b/db/memtable_list.h @@ -113,7 +113,8 @@ class MemTableList { // Commit a successful flush in the manifest file Status InstallMemtableFlushResults( - ColumnFamilyData* cfd, const autovector& m, VersionSet* vset, + ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options, + const autovector& m, VersionSet* vset, port::Mutex* mu, Logger* info_log, uint64_t file_number, FileNumToPathIdMap* pending_outputs, autovector* to_delete, Directory* db_directory, LogBuffer* log_buffer); diff --git a/db/repair.cc b/db/repair.cc index 2773d4c71..80fb92bd9 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -220,7 +220,7 @@ class Repairer { Slice record; WriteBatch batch; MemTable* mem = new MemTable(icmp_, ioptions_, - MemTableOptions(MutableCFOptions(options_), options_)); + MemTableOptions(MutableCFOptions(options_, ioptions_), options_)); auto cf_mems_default = new ColumnFamilyMemTablesDefault(mem, &options_); mem->Ref(); int counter = 0; diff --git a/db/version_set.cc b/db/version_set.cc index 10649fa6c..78241d1f0 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -672,7 +672,7 @@ Version::Version(ColumnFamilyData* cfd, VersionSet* vset, } } -void Version::Get(const ReadOptions& options, +void Version::Get(const ReadOptions& read_options, const LookupKey& k, std::string* value, Status* status, @@ -691,8 +691,8 @@ void Version::Get(const ReadOptions& options, &file_indexer_, user_comparator_, internal_comparator_); FdWithKeyRange* f = fp.GetNextFile(); while (f != nullptr) { - *status = table_cache_->Get(options, *internal_comparator_, f->fd, ikey, - &get_context); + *status = table_cache_->Get(read_options, *internal_comparator_, f->fd, + ikey, &get_context); // TODO: examine the behavior for corrupted key if (!status->ok()) { return; @@ -746,9 +746,10 @@ void Version::GenerateFileLevels() { } } -void Version::PrepareApply(std::vector& size_being_compacted) { +void Version::PrepareApply(const MutableCFOptions& mutable_cf_options, + std::vector& size_being_compacted) { UpdateTemporaryStats(); - ComputeCompactionScore(size_being_compacted); + ComputeCompactionScore(mutable_cf_options, size_being_compacted); UpdateFilesBySize(); UpdateNumNonEmptyLevels(); file_indexer_.UpdateIndex(&arena_, num_non_empty_levels_, files_); @@ -817,13 +818,13 @@ void Version::UpdateTemporaryStats() { } void Version::ComputeCompactionScore( + const MutableCFOptions& mutable_cf_options, std::vector& size_being_compacted) { double max_score = 0; int max_score_level = 0; int max_input_level = cfd_->compaction_picker()->MaxInputLevel(NumberLevels()); - for (int level = 0; level <= max_input_level; level++) { double score; if (level == 0) { @@ -849,21 +850,22 @@ void Version::ComputeCompactionScore( if (cfd_->ioptions()->compaction_style == kCompactionStyleFIFO) { score = static_cast(total_size) / cfd_->options()->compaction_options_fifo.max_table_files_size; - } else if (numfiles >= cfd_->options()->level0_stop_writes_trigger) { + } else if (numfiles >= mutable_cf_options.level0_stop_writes_trigger) { // If we are slowing down writes, then we better compact that first score = 1000000; - } else if (numfiles >= cfd_->options()->level0_slowdown_writes_trigger) { + } else if (numfiles >= + mutable_cf_options.level0_slowdown_writes_trigger) { score = 10000; } else { score = static_cast(numfiles) / - cfd_->options()->level0_file_num_compaction_trigger; + mutable_cf_options.level0_file_num_compaction_trigger; } } else { // Compute the ratio of current size to size limit. const uint64_t level_bytes = TotalCompensatedFileSize(files_[level]) - size_being_compacted[level]; score = static_cast(level_bytes) / - cfd_->compaction_picker()->MaxBytesForLevel(level); + mutable_cf_options.MaxBytesForLevel(level); if (max_score < score) { max_score = score; max_score_level = level; @@ -993,6 +995,7 @@ bool Version::OverlapInLevel(int level, } int Version::PickLevelForMemTableOutput( + const MutableCFOptions& mutable_cf_options, const Slice& smallest_user_key, const Slice& largest_user_key) { int level = 0; @@ -1013,7 +1016,7 @@ int Version::PickLevelForMemTableOutput( } GetOverlappingInputs(level + 2, &start, &limit, &overlaps); const uint64_t sum = TotalFileSize(overlaps); - if (sum > cfd_->compaction_picker()->MaxGrandParentOverlapBytes(level)) { + if (sum > mutable_cf_options.MaxGrandParentOverlapBytes(level)) { break; } level++; @@ -1216,7 +1219,7 @@ bool Version::HasOverlappingUserKey( // Check the last file in inputs against the file after it size_t last_file = FindFile(cfd_->internal_comparator(), file_level, inputs->back()->largest.Encode()); - assert(0 <= last_file && last_file < kNumFiles); // File should exist! + assert(last_file < kNumFiles); // File should exist! if (last_file < kNumFiles-1) { // If not the last file const Slice last_key_in_input = ExtractUserKey( files[last_file].largest_key); @@ -1231,7 +1234,7 @@ bool Version::HasOverlappingUserKey( // Check the first file in inputs against the file just before it size_t first_file = FindFile(cfd_->internal_comparator(), file_level, inputs->front()->smallest.Encode()); - assert(0 <= first_file && first_file <= last_file); // File should exist! + assert(first_file <= last_file); // File should exist! if (first_file > 0) { // If not first file const Slice& first_key_in_input = ExtractUserKey( files[first_file].smallest_key); @@ -1246,7 +1249,7 @@ bool Version::HasOverlappingUserKey( return false; } -int64_t Version::NumLevelBytes(int level) const { +uint64_t Version::NumLevelBytes(int level) const { assert(level >= 0); assert(level < NumberLevels()); return TotalFileSize(files_[level]); @@ -1653,16 +1656,17 @@ void VersionSet::AppendVersion(ColumnFamilyData* column_family_data, } Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data, + const MutableCFOptions& mutable_cf_options, VersionEdit* edit, port::Mutex* mu, Directory* db_directory, bool new_descriptor_log, - const ColumnFamilyOptions* options) { + const ColumnFamilyOptions* new_cf_options) { mu->AssertHeld(); // column_family_data can be nullptr only if this is column_family_add. // in that case, we also need to specify ColumnFamilyOptions if (column_family_data == nullptr) { assert(edit->is_column_family_add_); - assert(options != nullptr); + assert(new_cf_options != nullptr); } // queue our request @@ -1777,7 +1781,7 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data, if (!edit->IsColumnFamilyManipulation()) { // This is cpu-heavy operations, which should be called outside mutex. - v->PrepareApply(size_being_compacted); + v->PrepareApply(mutable_cf_options, size_being_compacted); } // Write new record to MANIFEST log @@ -1853,8 +1857,8 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data, if (edit->is_column_family_add_) { // no group commit on column family add assert(batch_edits.size() == 1); - assert(options != nullptr); - CreateColumnFamily(*options, edit); + assert(new_cf_options != nullptr); + CreateColumnFamily(*new_cf_options, edit); } else if (edit->is_column_family_drop_) { assert(batch_edits.size() == 1); column_family_data->SetDropped(); @@ -2169,7 +2173,7 @@ Status VersionSet::Recover( // there were some column families in the MANIFEST that weren't specified // in the argument. This is OK in read_only mode - if (read_only == false && column_families_not_found.size() > 0) { + if (read_only == false && !column_families_not_found.empty()) { std::string list_of_not_found; for (const auto& cf : column_families_not_found) { list_of_not_found += ", " + cf.second; @@ -2198,7 +2202,7 @@ Status VersionSet::Recover( // Install recovered version std::vector size_being_compacted(v->NumberLevels() - 1); cfd->compaction_picker()->SizeBeingCompacted(size_being_compacted); - v->PrepareApply(size_being_compacted); + v->PrepareApply(*cfd->GetLatestMutableCFOptions(), size_being_compacted); AppendVersion(cfd, v); } @@ -2374,11 +2378,13 @@ Status VersionSet::ReduceNumberOfLevels(const std::string& dbname, current_version->files_ = new_files_list; current_version->num_levels_ = new_levels; + MutableCFOptions mutable_cf_options(*options, ImmutableCFOptions(*options)); VersionEdit ve; port::Mutex dummy_mutex; MutexLock l(&dummy_mutex); - return versions.LogAndApply(versions.GetColumnFamilySet()->GetDefault(), &ve, - &dummy_mutex, nullptr, true); + return versions.LogAndApply( + versions.GetColumnFamilySet()->GetDefault(), + mutable_cf_options, &ve, &dummy_mutex, nullptr, true); } Status VersionSet::DumpManifest(Options& options, std::string& dscname, @@ -2530,7 +2536,7 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname, builder->SaveTo(v); std::vector size_being_compacted(v->NumberLevels() - 1); cfd->compaction_picker()->SizeBeingCompacted(size_being_compacted); - v->PrepareApply(size_being_compacted); + v->PrepareApply(*cfd->GetLatestMutableCFOptions(), size_being_compacted); delete builder; printf("--------------- Column family \"%s\" (ID %u) --------------\n", diff --git a/db/version_set.h b/db/version_set.h index 4a27a9592..05e6e9a65 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -103,14 +103,18 @@ class Version { // We use compaction scores to figure out which compaction to do next // REQUIRES: If Version is not yet saved to current_, it can be called without // a lock. Once a version is saved to current_, call only with mutex held - void ComputeCompactionScore(std::vector& size_being_compacted); + void ComputeCompactionScore( + const MutableCFOptions& mutable_cf_options, + std::vector& size_being_compacted); // Generate file_levels_ from files_ void GenerateFileLevels(); // Update scores, pre-calculated variables. It needs to be called before // applying the version to the version set. - void PrepareApply(std::vector& size_being_compacted); + void PrepareApply( + const MutableCFOptions& mutable_cf_options, + std::vector& size_being_compacted); // Reference count management (so Versions do not disappear out from // under live iterators) @@ -169,7 +173,8 @@ class Version { // Return the level at which we should place a new memtable compaction // result that covers the range [smallest_user_key,largest_user_key]. - int PickLevelForMemTableOutput(const Slice& smallest_user_key, + int PickLevelForMemTableOutput(const MutableCFOptions& mutable_cf_options, + const Slice& smallest_user_key, const Slice& largest_user_key); int NumberLevels() const { return num_levels_; } @@ -178,7 +183,7 @@ class Version { int NumLevelFiles(int level) const { return files_[level].size(); } // Return the combined file size of all files at the specified level. - int64_t NumLevelBytes(int level) const; + uint64_t NumLevelBytes(int level) const; // Return a human-readable short (single-line) summary of the number // of files per level. Uses *scratch as backing store. @@ -369,7 +374,9 @@ class VersionSet { // column_family_options has to be set if edit is column family add // REQUIRES: *mu is held on entry. // REQUIRES: no other thread concurrently calls LogAndApply() - Status LogAndApply(ColumnFamilyData* column_family_data, VersionEdit* edit, + Status LogAndApply(ColumnFamilyData* column_family_data, + const MutableCFOptions& mutable_cf_options, + VersionEdit* edit, port::Mutex* mu, Directory* db_directory = nullptr, bool new_descriptor_log = false, const ColumnFamilyOptions* column_family_options = diff --git a/db/write_batch_test.cc b/db/write_batch_test.cc index ba7451078..cb4048214 100644 --- a/db/write_batch_test.cc +++ b/db/write_batch_test.cc @@ -27,8 +27,9 @@ static std::string PrintContents(WriteBatch* b) { auto factory = std::make_shared(); Options options; options.memtable_factory = factory; - MemTable* mem = new MemTable(cmp, ImmutableCFOptions(options), - MemTableOptions(MutableCFOptions(options), options)); + ImmutableCFOptions ioptions(options); + MemTable* mem = new MemTable(cmp, ioptions, + MemTableOptions(MutableCFOptions(options, ioptions), options)); mem->Ref(); std::string state; ColumnFamilyMemTablesDefault cf_mems_default(mem, &options); diff --git a/include/rocksdb/immutable_options.h b/include/rocksdb/immutable_options.h index 54b676626..2dd50f756 100644 --- a/include/rocksdb/immutable_options.h +++ b/include/rocksdb/immutable_options.h @@ -22,6 +22,7 @@ struct ImmutableCFOptions { CompactionStyle compaction_style; CompactionOptionsUniversal compaction_options_universal; + CompactionOptionsFIFO compaction_options_fifo; const SliceTransform* prefix_extractor; @@ -79,6 +80,8 @@ struct ImmutableCFOptions { CompressionOptions compression_opts; Options::AccessHint access_hint_on_compaction_start; + + int num_levels; }; } // namespace rocksdb diff --git a/include/rocksdb/version.h b/include/rocksdb/version.h index d6ccaeda5..285278854 100644 --- a/include/rocksdb/version.h +++ b/include/rocksdb/version.h @@ -4,9 +4,8 @@ // of patent rights can be found in the PATENTS file in the same directory. #pragma once -// Also update Makefile if you change these #define ROCKSDB_MAJOR 3 -#define ROCKSDB_MINOR 5 +#define ROCKSDB_MINOR 6 #define ROCKSDB_PATCH 0 // Do not use these. We made the mistake of declaring macros starting with diff --git a/java/Makefile b/java/Makefile index dec0480dc..4ee73daf9 100644 --- a/java/Makefile +++ b/java/Makefile @@ -38,6 +38,7 @@ test: java javac org/rocksdb/test/*.java java -ea -Djava.library.path=.:../ -cp "$(ROCKSDB_JAR):.:./*" org.rocksdb.WriteBatchTest java -ea -Djava.library.path=.:../ -cp "$(ROCKSDB_JAR):.:./*" org.rocksdb.test.BackupableDBTest + java -ea -Djava.library.path=.:../ -cp "$(ROCKSDB_JAR):.:./*" org.rocksdb.test.FilterTest java -ea -Djava.library.path=.:../ -cp "$(ROCKSDB_JAR):.:./*" org.rocksdb.test.OptionsTest java -ea -Djava.library.path=.:../ -cp "$(ROCKSDB_JAR):.:./*" org.rocksdb.test.ReadOptionsTest java -ea -Djava.library.path=.:../ -cp "$(ROCKSDB_JAR):.:./*" org.rocksdb.test.StatisticsCollectorTest diff --git a/java/RocksDBSample.java b/java/RocksDBSample.java index bd5a85076..c9a30476a 100644 --- a/java/RocksDBSample.java +++ b/java/RocksDBSample.java @@ -80,9 +80,10 @@ public class RocksDBSample { 10000, 10)); options.setRateLimiterConfig(new GenericRateLimiterConfig(10000000)); + Filter bloomFilter = new BloomFilter(10); BlockBasedTableConfig table_options = new BlockBasedTableConfig(); table_options.setBlockCacheSize(64 * SizeUnit.KB) - .setFilterBitsPerKey(10) + .setFilter(bloomFilter) .setCacheNumShardBits(6) .setBlockSizeDeviation(5) .setBlockRestartInterval(10) diff --git a/java/org/rocksdb/BlockBasedTableConfig.java b/java/org/rocksdb/BlockBasedTableConfig.java index bdb27d6c2..9a6967a95 100644 --- a/java/org/rocksdb/BlockBasedTableConfig.java +++ b/java/org/rocksdb/BlockBasedTableConfig.java @@ -18,7 +18,7 @@ public class BlockBasedTableConfig extends TableFormatConfig { blockSizeDeviation_ = 10; blockRestartInterval_ = 16; wholeKeyFiltering_ = true; - bitsPerKey_ = 10; + filter_ = null; cacheIndexAndFilterBlocks_ = false; hashIndexAllowCollision_ = true; blockCacheCompressedSize_ = 0; @@ -182,30 +182,30 @@ public class BlockBasedTableConfig extends TableFormatConfig { * * Filter instance can be re-used in multiple options instances. * - * @param Filter policy java instance. + * @param Filter Filter Policy java instance. * @return the reference to the current config. */ - public BlockBasedTableConfig setFilterBitsPerKey(int bitsPerKey) { - bitsPerKey_ = bitsPerKey; + public BlockBasedTableConfig setFilter(Filter filter) { + filter_ = filter; return this; } - + /** * Indicating if we'd put index/filter blocks to the block cache. If not specified, each "table reader" object will pre-load index/filter block during table initialization. - * + * * @return if index and filter blocks should be put in block cache. */ public boolean cacheIndexAndFilterBlocks() { return cacheIndexAndFilterBlocks_; } - + /** * Indicating if we'd put index/filter blocks to the block cache. If not specified, each "table reader" object will pre-load index/filter block during table initialization. - * + * * @param index and filter blocks should be put in block cache. * @return the reference to the current config. */ @@ -214,25 +214,25 @@ public class BlockBasedTableConfig extends TableFormatConfig { cacheIndexAndFilterBlocks_ = cacheIndexAndFilterBlocks; return this; } - + /** * Influence the behavior when kHashSearch is used. if false, stores a precise prefix to block range mapping if true, does not store prefix and allows prefix hash collision (less memory consumption) - * + * * @return if hash collisions should be allowed. */ public boolean hashIndexAllowCollision() { return hashIndexAllowCollision_; } - + /** * Influence the behavior when kHashSearch is used. if false, stores a precise prefix to block range mapping if true, does not store prefix and allows prefix hash collision (less memory consumption) - * + * * @param if hash collisions should be allowed. * @return the reference to the current config. */ @@ -241,21 +241,21 @@ public class BlockBasedTableConfig extends TableFormatConfig { hashIndexAllowCollision_ = hashIndexAllowCollision; return this; } - + /** * Size of compressed block cache. If 0, then block_cache_compressed is set * to null. - * + * * @return size of compressed block cache. */ public long blockCacheCompressedSize() { return blockCacheCompressedSize_; } - + /** * Size of compressed block cache. If 0, then block_cache_compressed is set * to null. - * + * * @param size of compressed block cache. * @return the reference to the current config. */ @@ -264,7 +264,7 @@ public class BlockBasedTableConfig extends TableFormatConfig { blockCacheCompressedSize_ = blockCacheCompressedSize; return this; } - + /** * Controls the number of shards for the block compressed cache. * This is applied only if blockCompressedCacheSize is set to non-negative. @@ -276,7 +276,7 @@ public class BlockBasedTableConfig extends TableFormatConfig { public int blockCacheCompressedNumShardBits() { return blockCacheCompressedNumShardBits_; } - + /** * Controls the number of shards for the block compressed cache. * This is applied only if blockCompressedCacheSize is set to non-negative. @@ -293,17 +293,23 @@ public class BlockBasedTableConfig extends TableFormatConfig { } @Override protected long newTableFactoryHandle() { + long filterHandle = 0; + if (filter_ != null) { + filterHandle = filter_.nativeHandle_; + } + return newTableFactoryHandle(noBlockCache_, blockCacheSize_, blockCacheNumShardBits_, blockSize_, blockSizeDeviation_, - blockRestartInterval_, wholeKeyFiltering_, bitsPerKey_, - cacheIndexAndFilterBlocks_, hashIndexAllowCollision_, - blockCacheCompressedSize_, blockCacheCompressedNumShardBits_); + blockRestartInterval_, wholeKeyFiltering_, + filterHandle, cacheIndexAndFilterBlocks_, + hashIndexAllowCollision_, blockCacheCompressedSize_, + blockCacheCompressedNumShardBits_); } private native long newTableFactoryHandle( boolean noBlockCache, long blockCacheSize, int blockCacheNumShardBits, long blockSize, int blockSizeDeviation, int blockRestartInterval, - boolean wholeKeyFiltering, int bitsPerKey, + boolean wholeKeyFiltering, long filterPolicyHandle, boolean cacheIndexAndFilterBlocks, boolean hashIndexAllowCollision, long blockCacheCompressedSize, int blockCacheCompressedNumShardBits); @@ -315,7 +321,7 @@ public class BlockBasedTableConfig extends TableFormatConfig { private int blockSizeDeviation_; private int blockRestartInterval_; private boolean wholeKeyFiltering_; - private int bitsPerKey_; + private Filter filter_; private boolean cacheIndexAndFilterBlocks_; private boolean hashIndexAllowCollision_; private long blockCacheCompressedSize_; diff --git a/java/org/rocksdb/test/FilterTest.java b/java/org/rocksdb/test/FilterTest.java new file mode 100644 index 000000000..7475d2c34 --- /dev/null +++ b/java/org/rocksdb/test/FilterTest.java @@ -0,0 +1,27 @@ +// Copyright (c) 2014, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +package org.rocksdb.test; + +import org.rocksdb.*; + +public class FilterTest { + static { + RocksDB.loadLibrary(); + } + public static void main(String[] args) { + Options options = new Options(); + // test table config without filter + BlockBasedTableConfig blockConfig = new BlockBasedTableConfig(); + options.setTableFormatConfig(blockConfig); + options.dispose(); + // new Bloom filter + options = new Options(); + blockConfig = new BlockBasedTableConfig(); + blockConfig.setFilter(new BloomFilter()); + options.setTableFormatConfig(blockConfig); + System.out.println("Filter test passed"); + } +} diff --git a/java/rocksjni/table.cc b/java/rocksjni/table.cc index 500cb255e..846526292 100644 --- a/java/rocksjni/table.cc +++ b/java/rocksjni/table.cc @@ -37,7 +37,7 @@ jlong Java_org_rocksdb_BlockBasedTableConfig_newTableFactoryHandle( JNIEnv* env, jobject jobj, jboolean no_block_cache, jlong block_cache_size, jint block_cache_num_shardbits, jlong block_size, jint block_size_deviation, jint block_restart_interval, jboolean whole_key_filtering, - jint bits_per_key, jboolean cache_index_and_filter_blocks, + jlong jfilterPolicy, jboolean cache_index_and_filter_blocks, jboolean hash_index_allow_collision, jlong block_cache_compressed_size, jint block_cache_compressd_num_shard_bits) { rocksdb::BlockBasedTableOptions options; @@ -55,8 +55,9 @@ jlong Java_org_rocksdb_BlockBasedTableConfig_newTableFactoryHandle( options.block_size_deviation = block_size_deviation; options.block_restart_interval = block_restart_interval; options.whole_key_filtering = whole_key_filtering; - if (bits_per_key > 0) { - options.filter_policy.reset(rocksdb::NewBloomFilterPolicy(bits_per_key)); + if (jfilterPolicy > 0) { + options.filter_policy.reset( + reinterpret_cast(jfilterPolicy)); } options.cache_index_and_filter_blocks = cache_index_and_filter_blocks; options.hash_index_allow_collision = hash_index_allow_collision; @@ -69,6 +70,6 @@ jlong Java_org_rocksdb_BlockBasedTableConfig_newTableFactoryHandle( options.block_cache = rocksdb::NewLRUCache(block_cache_compressed_size); } } - + return reinterpret_cast(rocksdb::NewBlockBasedTableFactory(options)); } diff --git a/java/rocksjni/write_batch.cc b/java/rocksjni/write_batch.cc index 0492ea1be..46e7a6fa0 100644 --- a/java/rocksjni/write_batch.cc +++ b/java/rocksjni/write_batch.cc @@ -206,7 +206,8 @@ jbyteArray Java_org_rocksdb_WriteBatchTest_getContents( options.memtable_factory = factory; rocksdb::MemTable* mem = new rocksdb::MemTable( cmp, rocksdb::ImmutableCFOptions(options), - rocksdb::MemTableOptions(rocksdb::MutableCFOptions(options), options)); + rocksdb::MemTableOptions(rocksdb::MutableCFOptions(options, + rocksdb::ImmutableCFOptions(options)), options)); mem->Ref(); std::string state; rocksdb::ColumnFamilyMemTablesDefault cf_mems_default(mem, &options); diff --git a/table/block_based_table_builder.cc b/table/block_based_table_builder.cc index 2f373fff1..9e4328cd4 100644 --- a/table/block_based_table_builder.cc +++ b/table/block_based_table_builder.cc @@ -721,7 +721,6 @@ Status BlockBasedTableBuilder::Finish() { // Write properties block. { PropertyBlockBuilder property_block_builder; - std::vector failed_user_prop_collectors; r->props.filter_policy_name = r->table_options.filter_policy != nullptr ? r->table_options.filter_policy->Name() : ""; r->props.index_size = diff --git a/table/bloom_block.cc b/table/bloom_block.cc index c44ab66ca..cfea8a2c5 100644 --- a/table/bloom_block.cc +++ b/table/bloom_block.cc @@ -11,7 +11,7 @@ namespace rocksdb { -void BloomBlockBuilder::AddKeysHashes(const std::vector keys_hashes) { +void BloomBlockBuilder::AddKeysHashes(const std::vector& keys_hashes) { for (auto hash : keys_hashes) { bloom_.AddHash(hash); } diff --git a/table/bloom_block.h b/table/bloom_block.h index d55453eda..7ef5d14b6 100644 --- a/table/bloom_block.h +++ b/table/bloom_block.h @@ -26,7 +26,7 @@ class BloomBlockBuilder { uint32_t GetNumBlocks() const { return bloom_.GetNumBlocks(); } - void AddKeysHashes(const std::vector keys_hashes); + void AddKeysHashes(const std::vector& keys_hashes); Slice Finish(); diff --git a/table/cuckoo_table_reader.cc b/table/cuckoo_table_reader.cc index f8da4e288..c0ca38bb7 100644 --- a/table/cuckoo_table_reader.cc +++ b/table/cuckoo_table_reader.cc @@ -191,9 +191,9 @@ class CuckooTableIterator : public Iterator { private: struct BucketComparator { - BucketComparator(const Slice file_data, const Comparator* ucomp, + BucketComparator(const Slice& file_data, const Comparator* ucomp, uint32_t bucket_len, uint32_t user_key_len, - const Slice target = Slice()) + const Slice& target = Slice()) : file_data_(file_data), ucomp_(ucomp), bucket_len_(bucket_len), diff --git a/table/format.cc b/table/format.cc index db11f9d4a..768e00165 100644 --- a/table/format.cc +++ b/table/format.cc @@ -334,9 +334,9 @@ Status UncompressBlockContents(const char* data, size_t n, case kZlibCompression: ubuf = std::unique_ptr( port::Zlib_Uncompress(data, n, &decompress_size)); - static char zlib_corrupt_msg[] = - "Zlib not supported or corrupted Zlib compressed block contents"; if (!ubuf) { + static char zlib_corrupt_msg[] = + "Zlib not supported or corrupted Zlib compressed block contents"; return Status::Corruption(zlib_corrupt_msg); } *contents = @@ -345,9 +345,9 @@ Status UncompressBlockContents(const char* data, size_t n, case kBZip2Compression: ubuf = std::unique_ptr( port::BZip2_Uncompress(data, n, &decompress_size)); - static char bzip2_corrupt_msg[] = - "Bzip2 not supported or corrupted Bzip2 compressed block contents"; if (!ubuf) { + static char bzip2_corrupt_msg[] = + "Bzip2 not supported or corrupted Bzip2 compressed block contents"; return Status::Corruption(bzip2_corrupt_msg); } *contents = @@ -356,9 +356,9 @@ Status UncompressBlockContents(const char* data, size_t n, case kLZ4Compression: ubuf = std::unique_ptr( port::LZ4_Uncompress(data, n, &decompress_size)); - static char lz4_corrupt_msg[] = - "LZ4 not supported or corrupted LZ4 compressed block contents"; if (!ubuf) { + static char lz4_corrupt_msg[] = + "LZ4 not supported or corrupted LZ4 compressed block contents"; return Status::Corruption(lz4_corrupt_msg); } *contents = @@ -367,9 +367,9 @@ Status UncompressBlockContents(const char* data, size_t n, case kLZ4HCCompression: ubuf = std::unique_ptr( port::LZ4_Uncompress(data, n, &decompress_size)); - static char lz4hc_corrupt_msg[] = - "LZ4HC not supported or corrupted LZ4HC compressed block contents"; if (!ubuf) { + static char lz4hc_corrupt_msg[] = + "LZ4HC not supported or corrupted LZ4HC compressed block contents"; return Status::Corruption(lz4hc_corrupt_msg); } *contents = diff --git a/table/plain_table_factory.cc b/table/plain_table_factory.cc index de23cc902..fae0d8018 100644 --- a/table/plain_table_factory.cc +++ b/table/plain_table_factory.cc @@ -52,10 +52,10 @@ std::string PlainTableFactory::GetPrintableTableOptions() const { snprintf(buffer, kBufferSize, " hash_table_ratio: %lf\n", hash_table_ratio_); ret.append(buffer); - snprintf(buffer, kBufferSize, " index_sparseness: %zd\n", + snprintf(buffer, kBufferSize, " index_sparseness: %zu\n", index_sparseness_); ret.append(buffer); - snprintf(buffer, kBufferSize, " huge_page_tlb_size: %zd\n", + snprintf(buffer, kBufferSize, " huge_page_tlb_size: %zu\n", huge_page_tlb_size_); ret.append(buffer); snprintf(buffer, kBufferSize, " encoding_type: %d\n", diff --git a/table/table_test.cc b/table/table_test.cc index 1b032db53..e4657e8cd 100644 --- a/table/table_test.cc +++ b/table/table_test.cc @@ -437,8 +437,9 @@ class MemTableConstructor: public Constructor { table_factory_(new SkipListFactory) { Options options; options.memtable_factory = table_factory_; - memtable_ = new MemTable(internal_comparator_, ImmutableCFOptions(options), - MemTableOptions(MutableCFOptions(options), options)); + ImmutableCFOptions ioptions(options); + memtable_ = new MemTable(internal_comparator_, ioptions, + MemTableOptions(MutableCFOptions(options, ioptions), options)); memtable_->Ref(); } ~MemTableConstructor() { @@ -452,8 +453,9 @@ class MemTableConstructor: public Constructor { delete memtable_->Unref(); Options options; options.memtable_factory = table_factory_; - memtable_ = new MemTable(internal_comparator_, ImmutableCFOptions(options), - MemTableOptions(MutableCFOptions(options), options)); + ImmutableCFOptions mem_ioptions(options); + memtable_ = new MemTable(internal_comparator_, mem_ioptions, + MemTableOptions(MutableCFOptions(options, mem_ioptions), options)); memtable_->Ref(); int seq = 1; for (KVMap::const_iterator it = data.begin(); @@ -1216,7 +1218,7 @@ static std::string RandomString(Random* rnd, int len) { return r; } -void AddInternalKey(TableConstructor* c, const std::string prefix, +void AddInternalKey(TableConstructor* c, const std::string& prefix, int suffix_len = 800) { static Random rnd(1023); InternalKey k(prefix + RandomString(&rnd, 800), 0, kTypeValue); @@ -1864,8 +1866,9 @@ TEST(MemTableTest, Simple) { auto table_factory = std::make_shared(); Options options; options.memtable_factory = table_factory; - MemTable* memtable = new MemTable(cmp, ImmutableCFOptions(options), - MemTableOptions(MutableCFOptions(options), options)); + ImmutableCFOptions ioptions(options); + MemTable* memtable = new MemTable(cmp, ioptions, + MemTableOptions(MutableCFOptions(options, ioptions), options)); memtable->Ref(); WriteBatch batch; WriteBatchInternal::SetSequence(&batch, 100); diff --git a/util/cache_test.cc b/util/cache_test.cc index c12cdb7e1..74109ff0c 100644 --- a/util/cache_test.cc +++ b/util/cache_test.cc @@ -386,7 +386,7 @@ class Value { namespace { void deleter(const Slice& key, void* value) { - delete (Value *)value; + delete static_cast(value); } } // namespace diff --git a/util/ldb_cmd.cc b/util/ldb_cmd.cc index 9f00757b8..8eda39bf9 100644 --- a/util/ldb_cmd.cc +++ b/util/ldb_cmd.cc @@ -325,7 +325,7 @@ bool LDBCommand::ParseKeyValue(const string& line, string* key, string* value, bool LDBCommand::ValidateCmdLineOptions() { for (map::const_iterator itr = option_map_.begin(); - itr != option_map_.end(); itr++) { + itr != option_map_.end(); ++itr) { if (find(valid_cmd_line_options_.begin(), valid_cmd_line_options_.end(), itr->first) == valid_cmd_line_options_.end()) { @@ -335,7 +335,7 @@ bool LDBCommand::ValidateCmdLineOptions() { } for (vector::const_iterator itr = flags_.begin(); - itr != flags_.end(); itr++) { + itr != flags_.end(); ++itr) { if (find(valid_cmd_line_options_.begin(), valid_cmd_line_options_.end(), *itr) == valid_cmd_line_options_.end()) { @@ -1538,7 +1538,7 @@ void BatchPutCommand::DoCommand() { WriteBatch batch; for (vector>::const_iterator itr - = key_values_.begin(); itr != key_values_.end(); itr++) { + = key_values_.begin(); itr != key_values_.end(); ++itr) { batch.Put(itr->first, itr->second); } Status st = db_->Write(WriteOptions(), &batch); diff --git a/util/ldb_cmd_execute_result.h b/util/ldb_cmd_execute_result.h index b9121b2b0..b8e6c4634 100644 --- a/util/ldb_cmd_execute_result.h +++ b/util/ldb_cmd_execute_result.h @@ -13,15 +13,10 @@ public: EXEC_NOT_STARTED = 0, EXEC_SUCCEED = 1, EXEC_FAILED = 2, }; - LDBCommandExecuteResult() { - state_ = EXEC_NOT_STARTED; - message_ = ""; - } + LDBCommandExecuteResult() : state_(EXEC_NOT_STARTED), message_("") {} - LDBCommandExecuteResult(State state, std::string& msg) { - state_ = state; - message_ = msg; - } + LDBCommandExecuteResult(State state, std::string& msg) : + state_(state), message_(msg) {} std::string ToString() { std::string ret; diff --git a/util/mutable_cf_options.cc b/util/mutable_cf_options.cc new file mode 100644 index 000000000..1c710c656 --- /dev/null +++ b/util/mutable_cf_options.cc @@ -0,0 +1,72 @@ +// Copyright (c) 2014, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#include +#include +#include "rocksdb/options.h" +#include "rocksdb/immutable_options.h" +#include "util/mutable_cf_options.h" + +namespace rocksdb { + +namespace { +// Multiple two operands. If they overflow, return op1. +uint64_t MultiplyCheckOverflow(uint64_t op1, int op2) { + if (op1 == 0) { + return 0; + } + if (op2 <= 0) { + return op1; + } + uint64_t casted_op2 = (uint64_t) op2; + if (std::numeric_limits::max() / op1 < casted_op2) { + return op1; + } + return op1 * casted_op2; +} +} // anonymous namespace + +void MutableCFOptions::RefreshDerivedOptions( + const ImmutableCFOptions& ioptions) { + max_file_size.resize(ioptions.num_levels); + level_max_bytes.resize(ioptions.num_levels); + for (int i = 0; i < ioptions.num_levels; ++i) { + if (i == 0 && ioptions.compaction_style == kCompactionStyleUniversal) { + max_file_size[i] = ULLONG_MAX; + level_max_bytes[i] = max_bytes_for_level_base; + } else if (i > 1) { + max_file_size[i] = MultiplyCheckOverflow(max_file_size[i - 1], + target_file_size_multiplier); + level_max_bytes[i] = MultiplyCheckOverflow( + MultiplyCheckOverflow(level_max_bytes[i - 1], + max_bytes_for_level_multiplier), + max_bytes_for_level_multiplier_additional[i - 1]); + } else { + max_file_size[i] = target_file_size_base; + level_max_bytes[i] = max_bytes_for_level_base; + } + } +} + +uint64_t MutableCFOptions::MaxFileSizeForLevel(int level) const { + assert(level >= 0); + assert(level < (int)max_file_size.size()); + return max_file_size[level]; +} +uint64_t MutableCFOptions::MaxBytesForLevel(int level) const { + // Note: the result for level zero is not really used since we set + // the level-0 compaction threshold based on number of files. + assert(level >= 0); + assert(level < (int)level_max_bytes.size()); + return level_max_bytes[level]; +} +uint64_t MutableCFOptions::MaxGrandParentOverlapBytes(int level) const { + return MaxFileSizeForLevel(level) * max_grandparent_overlap_factor; +} +uint64_t MutableCFOptions::ExpandedCompactionByteSizeLimit(int level) const { + return MaxFileSizeForLevel(level) * expanded_compaction_factor; +} + +} // namespace rocksdb diff --git a/util/mutable_cf_options.h b/util/mutable_cf_options.h index 39ebe2d85..02f63fed4 100644 --- a/util/mutable_cf_options.h +++ b/util/mutable_cf_options.h @@ -5,12 +5,14 @@ #pragma once +#include #include "rocksdb/options.h" +#include "rocksdb/immutable_options.h" namespace rocksdb { struct MutableCFOptions { - explicit MutableCFOptions(const Options& options) + MutableCFOptions(const Options& options, const ImmutableCFOptions& ioptions) : write_buffer_size(options.write_buffer_size), arena_block_size(options.arena_block_size), memtable_prefix_bloom_bits(options.memtable_prefix_bloom_bits), @@ -18,7 +20,22 @@ struct MutableCFOptions { memtable_prefix_bloom_huge_page_tlb_size( options.memtable_prefix_bloom_huge_page_tlb_size), max_successive_merges(options.max_successive_merges), - filter_deletes(options.filter_deletes) { + filter_deletes(options.filter_deletes), + level0_file_num_compaction_trigger( + options.level0_file_num_compaction_trigger), + level0_slowdown_writes_trigger(options.level0_slowdown_writes_trigger), + level0_stop_writes_trigger(options.level0_stop_writes_trigger), + max_grandparent_overlap_factor(options.max_grandparent_overlap_factor), + expanded_compaction_factor(options.expanded_compaction_factor), + source_compaction_factor(options.source_compaction_factor), + target_file_size_base(options.target_file_size_base), + target_file_size_multiplier(options.target_file_size_multiplier), + max_bytes_for_level_base(options.max_bytes_for_level_base), + max_bytes_for_level_multiplier(options.max_bytes_for_level_multiplier), + max_bytes_for_level_multiplier_additional( + options.max_bytes_for_level_multiplier_additional) + { + RefreshDerivedOptions(ioptions); } MutableCFOptions() : write_buffer_size(0), @@ -27,8 +44,33 @@ struct MutableCFOptions { memtable_prefix_bloom_probes(0), memtable_prefix_bloom_huge_page_tlb_size(0), max_successive_merges(0), - filter_deletes(false) {} + filter_deletes(false), + level0_file_num_compaction_trigger(0), + level0_slowdown_writes_trigger(0), + level0_stop_writes_trigger(0), + max_grandparent_overlap_factor(0), + expanded_compaction_factor(0), + source_compaction_factor(0), + target_file_size_base(0), + target_file_size_multiplier(0), + max_bytes_for_level_base(0), + max_bytes_for_level_multiplier(0) + {} + // Must be called after any change to MutableCFOptions + void RefreshDerivedOptions(const ImmutableCFOptions& ioptions); + + // Get the max file size in a given level. + uint64_t MaxFileSizeForLevel(int level) const; + // Returns maximum total bytes of data on a given level. + uint64_t MaxBytesForLevel(int level) const; + // Returns maximum total overlap bytes with grandparent + // level (i.e., level+2) before we stop building a single + // file in level->level+1 compaction. + uint64_t MaxGrandParentOverlapBytes(int level) const; + uint64_t ExpandedCompactionByteSizeLimit(int level) const; + + // Memtable related options size_t write_buffer_size; size_t arena_block_size; uint32_t memtable_prefix_bloom_bits; @@ -36,6 +78,25 @@ struct MutableCFOptions { size_t memtable_prefix_bloom_huge_page_tlb_size; size_t max_successive_merges; bool filter_deletes; + + // Compaction related options + int level0_file_num_compaction_trigger; + int level0_slowdown_writes_trigger; + int level0_stop_writes_trigger; + int max_grandparent_overlap_factor; + int expanded_compaction_factor; + int source_compaction_factor; + int target_file_size_base; + int target_file_size_multiplier; + uint64_t max_bytes_for_level_base; + int max_bytes_for_level_multiplier; + std::vector max_bytes_for_level_multiplier_additional; + + // Derived options + // Per-level target file size. + std::vector max_file_size; + // Per-level max bytes + std::vector level_max_bytes; }; } // namespace rocksdb diff --git a/util/options.cc b/util/options.cc index 8716b465d..b5dc98317 100644 --- a/util/options.cc +++ b/util/options.cc @@ -35,6 +35,7 @@ namespace rocksdb { ImmutableCFOptions::ImmutableCFOptions(const Options& options) : compaction_style(options.compaction_style), compaction_options_universal(options.compaction_options_universal), + compaction_options_fifo(options.compaction_options_fifo), prefix_extractor(options.prefix_extractor.get()), comparator(options.comparator), merge_operator(options.merge_operator.get()), @@ -60,7 +61,8 @@ ImmutableCFOptions::ImmutableCFOptions(const Options& options) compression(options.compression), compression_per_level(options.compression_per_level), compression_opts(options.compression_opts), - access_hint_on_compaction_start(options.access_hint_on_compaction_start) {} + access_hint_on_compaction_start(options.access_hint_on_compaction_start), + num_levels(options.num_levels) {} ColumnFamilyOptions::ColumnFamilyOptions() : comparator(BytewiseComparator()), diff --git a/util/options_helper.cc b/util/options_helper.cc index 35c3f63df..2a61c8b69 100644 --- a/util/options_helper.cc +++ b/util/options_helper.cc @@ -4,6 +4,7 @@ // of patent rights can be found in the PATENTS file in the same directory. #include +#include #include "rocksdb/options.h" #include "util/options_helper.h" @@ -73,8 +74,8 @@ CompactionStyle ParseCompactionStyle(const std::string& type) { } // anonymouse namespace template -bool ParseMemtableOption(const std::string& name, const std::string& value, - OptionsType* new_options) { +bool ParseMemtableOptions(const std::string& name, const std::string& value, + OptionsType* new_options) { if (name == "write_buffer_size") { new_options->write_buffer_size = ParseInt64(value); } else if (name == "arena_block_size") { @@ -96,6 +97,50 @@ bool ParseMemtableOption(const std::string& name, const std::string& value, return true; } +template +bool ParseCompactionOptions(const std::string& name, const std::string& value, + OptionsType* new_options) { + if (name == "level0_file_num_compaction_trigger") { + new_options->level0_file_num_compaction_trigger = ParseInt(value); + } else if (name == "level0_slowdown_writes_trigger") { + new_options->level0_slowdown_writes_trigger = ParseInt(value); + } else if (name == "level0_stop_writes_trigger") { + new_options->level0_stop_writes_trigger = ParseInt(value); + } else if (name == "max_grandparent_overlap_factor") { + new_options->max_grandparent_overlap_factor = ParseInt(value); + } else if (name == "expanded_compaction_factor") { + new_options->expanded_compaction_factor = ParseInt(value); + } else if (name == "source_compaction_factor") { + new_options->source_compaction_factor = ParseInt(value); + } else if (name == "target_file_size_base") { + new_options->target_file_size_base = ParseInt(value); + } else if (name == "target_file_size_multiplier") { + new_options->target_file_size_multiplier = ParseInt(value); + } else if (name == "max_bytes_for_level_base") { + new_options->max_bytes_for_level_base = ParseUint64(value); + } else if (name == "max_bytes_for_level_multiplier") { + new_options->max_bytes_for_level_multiplier = ParseInt(value); + } else if (name == "max_bytes_for_level_multiplier_additional") { + new_options->max_bytes_for_level_multiplier_additional.clear(); + size_t start = 0; + while (true) { + size_t end = value.find_first_of(':', start); + if (end == std::string::npos) { + new_options->max_bytes_for_level_multiplier_additional.push_back( + ParseInt(value.substr(start))); + break; + } else { + new_options->max_bytes_for_level_multiplier_additional.push_back( + ParseInt(value.substr(start, end - start))); + start = end + 1; + } + } + } else { + return false; + } + return true; +} + bool GetMutableOptionsFromStrings( const MutableCFOptions& base_options, const std::unordered_map& options_map, @@ -104,7 +149,8 @@ bool GetMutableOptionsFromStrings( *new_options = base_options; try { for (const auto& o : options_map) { - if (ParseMemtableOption(o.first, o.second, new_options)) { + if (ParseMemtableOptions(o.first, o.second, new_options)) { + } else if (ParseCompactionOptions(o.first, o.second, new_options)) { } else { return false; } @@ -123,7 +169,8 @@ bool GetOptionsFromStrings( *new_options = base_options; for (const auto& o : options_map) { try { - if (ParseMemtableOption(o.first, o.second, new_options)) { + if (ParseMemtableOptions(o.first, o.second, new_options)) { + } else if (ParseCompactionOptions(o.first, o.second, new_options)) { } else if (o.first == "max_write_buffer_number") { new_options->max_write_buffer_number = ParseInt(o.second); } else if (o.first == "min_write_buffer_number_to_merge") { @@ -168,43 +215,8 @@ bool GetOptionsFromStrings( ParseInt(o.second.substr(start, o.second.size() - start)); } else if (o.first == "num_levels") { new_options->num_levels = ParseInt(o.second); - } else if (o.first == "level0_file_num_compaction_trigger") { - new_options->level0_file_num_compaction_trigger = ParseInt(o.second); - } else if (o.first == "level0_slowdown_writes_trigger") { - new_options->level0_slowdown_writes_trigger = ParseInt(o.second); - } else if (o.first == "level0_stop_writes_trigger") { - new_options->level0_stop_writes_trigger = ParseInt(o.second); } else if (o.first == "max_mem_compaction_level") { new_options->max_mem_compaction_level = ParseInt(o.second); - } else if (o.first == "target_file_size_base") { - new_options->target_file_size_base = ParseUint64(o.second); - } else if (o.first == "target_file_size_multiplier") { - new_options->target_file_size_multiplier = ParseInt(o.second); - } else if (o.first == "max_bytes_for_level_base") { - new_options->max_bytes_for_level_base = ParseUint64(o.second); - } else if (o.first == "max_bytes_for_level_multiplier") { - new_options->max_bytes_for_level_multiplier = ParseInt(o.second); - } else if (o.first == "max_bytes_for_level_multiplier_additional") { - new_options->max_bytes_for_level_multiplier_additional.clear(); - size_t start = 0; - while (true) { - size_t end = o.second.find_first_of(':', start); - if (end == std::string::npos) { - new_options->max_bytes_for_level_multiplier_additional.push_back( - ParseInt(o.second.substr(start))); - break; - } else { - new_options->max_bytes_for_level_multiplier_additional.push_back( - ParseInt(o.second.substr(start, end - start))); - start = end + 1; - } - } - } else if (o.first == "expanded_compaction_factor") { - new_options->expanded_compaction_factor = ParseInt(o.second); - } else if (o.first == "source_compaction_factor") { - new_options->source_compaction_factor = ParseInt(o.second); - } else if (o.first == "max_grandparent_overlap_factor") { - new_options->max_grandparent_overlap_factor = ParseInt(o.second); } else if (o.first == "soft_rate_limit") { new_options->soft_rate_limit = ParseDouble(o.second); } else if (o.first == "hard_rate_limit") { diff --git a/util/signal_test.cc b/util/signal_test.cc index f51fa548e..b23ad6a98 100644 --- a/util/signal_test.cc +++ b/util/signal_test.cc @@ -9,6 +9,7 @@ namespace { void f0() { char *p = nullptr; + // cppcheck-suppress nullPointer *p = 10; /* SIGSEGV here!! */ } diff --git a/utilities/backupable/backupable_db_test.cc b/utilities/backupable/backupable_db_test.cc index a585d1a9c..281837773 100644 --- a/utilities/backupable/backupable_db_test.cc +++ b/utilities/backupable/backupable_db_test.cc @@ -228,7 +228,7 @@ class FileManager : public EnvWrapper { public: explicit FileManager(Env* t) : EnvWrapper(t), rnd_(5) {} - Status DeleteRandomFileInDir(const std::string dir) { + Status DeleteRandomFileInDir(const std::string& dir) { std::vector children; GetChildren(dir, &children); if (children.size() <= 2) { // . and .. diff --git a/utilities/document/document_db.cc b/utilities/document/document_db.cc index 8e15a52ca..b19618533 100644 --- a/utilities/document/document_db.cc +++ b/utilities/document/document_db.cc @@ -33,7 +33,7 @@ namespace { // > 0 <=> lhs == rhs // TODO(icanadi) move this to JSONDocument? int DocumentCompare(const JSONDocument& lhs, const JSONDocument& rhs) { - assert(rhs.IsObject() == false && rhs.IsObject() == false && + assert(lhs.IsObject() == false && rhs.IsObject() == false && lhs.type() == rhs.type()); switch (lhs.type()) { @@ -376,7 +376,7 @@ class IndexKey { class SimpleSortedIndex : public Index { public: - SimpleSortedIndex(const std::string field, const std::string& name) + SimpleSortedIndex(const std::string& field, const std::string& name) : field_(field), name_(name) {} virtual const char* Name() const override { return name_.c_str(); } @@ -407,7 +407,6 @@ class SimpleSortedIndex : public Index { assert(interval != nullptr); // because index is useful Direction direction; - std::string op; const JSONDocument* limit; if (interval->lower_bound != nullptr) { limit = interval->lower_bound; diff --git a/utilities/document/document_db_test.cc b/utilities/document/document_db_test.cc index d4c632cce..5b36a2060 100644 --- a/utilities/document/document_db_test.cc +++ b/utilities/document/document_db_test.cc @@ -56,7 +56,7 @@ class DocumentDBTest { } } - JSONDocument* Parse(const std::string doc) { + JSONDocument* Parse(const std::string& doc) { return JSONDocument::ParseJSON(ConvertQuotes(doc).c_str()); } diff --git a/utilities/spatialdb/spatial_db.cc b/utilities/spatialdb/spatial_db.cc index 9c44027c8..6fbb780bc 100644 --- a/utilities/spatialdb/spatial_db.cc +++ b/utilities/spatialdb/spatial_db.cc @@ -369,7 +369,7 @@ class SpatialIndexCursor : public Cursor { } delete spatial_iterator; - valid_ = valid_ && primary_key_ids_.size() > 0; + valid_ = valid_ && !primary_key_ids_.empty(); if (valid_) { primary_keys_iterator_ = primary_key_ids_.begin(); diff --git a/utilities/ttl/db_ttl_impl.h b/utilities/ttl/db_ttl_impl.h index 84fb55568..92b8eab7f 100644 --- a/utilities/ttl/db_ttl_impl.h +++ b/utilities/ttl/db_ttl_impl.h @@ -206,7 +206,7 @@ class TtlCompactionFilterFactory : public CompactionFilterFactory { class TtlMergeOperator : public MergeOperator { public: - explicit TtlMergeOperator(const std::shared_ptr merge_op, + explicit TtlMergeOperator(const std::shared_ptr& merge_op, Env* env) : user_merge_op_(merge_op), env_(env) { assert(merge_op); diff --git a/utilities/ttl/ttl_test.cc b/utilities/ttl/ttl_test.cc index e6d64e54e..d1c1235c3 100644 --- a/utilities/ttl/ttl_test.cc +++ b/utilities/ttl/ttl_test.cc @@ -120,7 +120,7 @@ class TtlTest { static FlushOptions flush_opts; WriteBatch batch; kv_it_ = kvmap_.begin(); - for (int i = 0; i < num_ops && kv_it_ != kvmap_.end(); i++, kv_it_++) { + for (int i = 0; i < num_ops && kv_it_ != kvmap_.end(); i++, ++kv_it_) { switch (batch_ops[i]) { case PUT: batch.Put(kv_it_->first, kv_it_->second); @@ -145,7 +145,7 @@ class TtlTest { static FlushOptions flush_opts; kv_it_ = kvmap_.begin(); advance(kv_it_, start_pos_map); - for (int i = 0; kv_it_ != kvmap_.end() && i < num_entries; i++, kv_it_++) { + for (int i = 0; kv_it_ != kvmap_.end() && i < num_entries; i++, ++kv_it_) { ASSERT_OK(cf == nullptr ? db_ttl_->Put(wopts, kv_it_->first, kv_it_->second) : db_ttl_->Put(wopts, cf, kv_it_->first, kv_it_->second)); @@ -207,7 +207,7 @@ class TtlTest { kv_it_ = kvmap_.begin(); advance(kv_it_, st_pos); std::string v; - for (int i = 0; kv_it_ != kvmap_.end() && i < span; i++, kv_it_++) { + for (int i = 0; kv_it_ != kvmap_.end() && i < span; i++, ++kv_it_) { Status s = (cf == nullptr) ? db_ttl_->Get(ropts, kv_it_->first, &v) : db_ttl_->Get(ropts, cf, kv_it_->first, &v); if (s.ok() != check) { @@ -252,7 +252,7 @@ class TtlTest { } else { // dbiter should have found out kvmap_[st_pos] for (int i = st_pos; kv_it_ != kvmap_.end() && i < st_pos + span; - i++, kv_it_++) { + i++, ++kv_it_) { ASSERT_TRUE(dbiter->Valid()); ASSERT_EQ(dbiter->value().compare(kv_it_->second), 0); dbiter->Next(); @@ -263,7 +263,7 @@ class TtlTest { class TestFilter : public CompactionFilter { public: - TestFilter(const int64_t kSampleSize, const std::string kNewValue) + TestFilter(const int64_t kSampleSize, const std::string& kNewValue) : kSampleSize_(kSampleSize), kNewValue_(kNewValue) { } @@ -311,7 +311,7 @@ class TtlTest { class TestFilterFactory : public CompactionFilterFactory { public: - TestFilterFactory(const int64_t kSampleSize, const std::string kNewValue) + TestFilterFactory(const int64_t kSampleSize, const std::string& kNewValue) : kSampleSize_(kSampleSize), kNewValue_(kNewValue) { }