diff --git a/db/compaction_picker_test.cc b/db/compaction_picker_test.cc
index 73071a75a..9766efb91 100644
--- a/db/compaction_picker_test.cc
+++ b/db/compaction_picker_test.cc
@@ -552,6 +552,91 @@ TEST_F(CompactionPickerTest, OverlappingUserKeys3) {
   ASSERT_EQ(7U, compaction->input(1, 1)->fd.GetNumber());
 }
 
+TEST_F(CompactionPickerTest, EstimateCompactionBytesNeeded1) {
+  int num_levels = ioptions_.num_levels;
+  ioptions_.level_compaction_dynamic_level_bytes = false;
+  mutable_cf_options_.level0_file_num_compaction_trigger = 3;
+  mutable_cf_options_.max_bytes_for_level_base = 1000;
+  mutable_cf_options_.max_bytes_for_level_multiplier = 10;
+  NewVersionStorage(num_levels, kCompactionStyleLevel);
+  Add(0, 1U, "150", "200", 200);
+  Add(0, 2U, "150", "200", 200);
+  Add(0, 3U, "150", "200", 200);
+  // Level 1 is over target by 200
+  Add(1, 4U, "400", "500", 600);
+  Add(1, 5U, "600", "700", 600);
+  // Level 2 is less than target 10000 even added size of level 1
+  Add(2, 6U, "150", "200", 2500);
+  Add(2, 7U, "201", "210", 2000);
+  Add(2, 8U, "300", "310", 2500);
+  Add(2, 9U, "400", "500", 2500);
+  // Level 3 exceeds target 100,000 of 1000
+  Add(3, 10U, "400", "500", 101000);
+  // Level 4 exceeds target 1,000,000 of 500 after adding size from level 3
+  Add(4, 11U, "400", "500", 999500);
+  Add(5, 12U, "400", "500", 8000000);
+
+  UpdateVersionStorageInfo();
+
+  ASSERT_EQ(2200u + 11000u + 5500u,
+            vstorage_->estimated_compaction_needed_bytes());
+}
+
+TEST_F(CompactionPickerTest, EstimateCompactionBytesNeeded2) {
+  int num_levels = ioptions_.num_levels;
+  ioptions_.level_compaction_dynamic_level_bytes = false;
+  mutable_cf_options_.level0_file_num_compaction_trigger = 3;
+  mutable_cf_options_.max_bytes_for_level_base = 1000;
+  mutable_cf_options_.max_bytes_for_level_multiplier = 10;
+  NewVersionStorage(num_levels, kCompactionStyleLevel);
+  Add(0, 1U, "150", "200", 200);
+  Add(0, 2U, "150", "200", 200);
+  Add(0, 4U, "150", "200", 200);
+  Add(0, 5U, "150", "200", 200);
+  Add(0, 6U, "150", "200", 200);
+  // Level 1 is over target by 400 after adding 1000 bytes from level 0
+  Add(1, 7U, "400", "500", 200);
+  Add(1, 8U, "600", "700", 200);
+  // Level 2 is less than target 10000 even added size of level 1
+  Add(2, 9U, "150", "200", 9500);
+  Add(3, 10U, "400", "500", 101000);
+
+  UpdateVersionStorageInfo();
+
+  ASSERT_EQ(1400u + 4400u + 11000u,
+            vstorage_->estimated_compaction_needed_bytes());
+}
+
+TEST_F(CompactionPickerTest, EstimateCompactionBytesNeededDynamicLevel) {
+  int num_levels = ioptions_.num_levels;
+  ioptions_.level_compaction_dynamic_level_bytes = true;
+  mutable_cf_options_.level0_file_num_compaction_trigger = 3;
+  mutable_cf_options_.max_bytes_for_level_base = 1000;
+  mutable_cf_options_.max_bytes_for_level_multiplier = 10;
+  NewVersionStorage(num_levels, kCompactionStyleLevel);
+
+  // Set Last level size 50000
+  // num_levels - 1 target 5000
+  // num_levels - 2 is base level with target 500
+  Add(num_levels - 1, 10U, "400", "500", 50000);
+
+  Add(0, 1U, "150", "200", 200);
+  Add(0, 2U, "150", "200", 200);
+  Add(0, 4U, "150", "200", 200);
+  Add(0, 5U, "150", "200", 200);
+  Add(0, 6U, "150", "200", 200);
+  // num_levels - 3 is over target by 100 + 1000
+  Add(num_levels - 3, 7U, "400", "500", 300);
+  Add(num_levels - 3, 8U, "600", "700", 300);
+  // Level 2 is over target by 1100 + 100
+  Add(num_levels - 2, 9U, "150", "200", 5100);
+
+  UpdateVersionStorageInfo();
+
+  ASSERT_EQ(1600u + 12100u + 13200u,
+            vstorage_->estimated_compaction_needed_bytes());
+}
+
 }  // namespace rocksdb
 
 int main(int argc, char** argv) {
diff --git a/db/db_test.cc b/db/db_test.cc
index 9412a78b0..3c8db7d69 100644
--- a/db/db_test.cc
+++ b/db/db_test.cc
@@ -2287,6 +2287,58 @@ TEST_F(DBTest, ApproximateMemoryUsage) {
   ASSERT_EQ(unflushed_mem, all_mem);
 }
 
+TEST_F(DBTest, EstimatePendingCompBytes) {
+  // Limit both background thread pools to one thread and block the LOW pool.
+  env_->SetBackgroundThreads(1, Env::HIGH);
+  env_->SetBackgroundThreads(1, Env::LOW);
+  SleepingBackgroundTask sleeping_task_low;
+  env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
+                 Env::Priority::LOW);
+
+  Options options = CurrentOptions();
+  WriteOptions writeOpt = WriteOptions();
+  writeOpt.disableWAL = true;
+  options.compaction_style = kCompactionStyleLevel;
+  options.level0_file_num_compaction_trigger = 2;
+  options.max_background_compactions = 1;
+  options.max_background_flushes = 1;
+  options.max_write_buffer_number = 10;
+  options.min_write_buffer_number_to_merge = 1;
+  options.max_write_buffer_number_to_maintain = 0;
+  options.write_buffer_size = 1000000;
+  Reopen(options);
+
+  std::string big_value(1000000 * 2, 'x');
+  std::string num;
+  uint64_t int_num;
+
+  ASSERT_OK(dbfull()->Put(writeOpt, "k1", big_value));
+  Flush();
+  ASSERT_TRUE(dbfull()->GetIntProperty(
+      "rocksdb.estimate-pending-compaction-bytes", &int_num));
+  ASSERT_EQ(int_num, 0U);
+
+  ASSERT_OK(dbfull()->Put(writeOpt, "k2", big_value));
+  Flush();
+  ASSERT_TRUE(dbfull()->GetIntProperty(
+      "rocksdb.estimate-pending-compaction-bytes", &int_num));
+  ASSERT_EQ(int_num, 0U);
+
+  ASSERT_OK(dbfull()->Put(writeOpt, "k3", big_value));
+  Flush();
+  ASSERT_TRUE(dbfull()->GetIntProperty(
+      "rocksdb.estimate-pending-compaction-bytes", &int_num));
+  ASSERT_GT(int_num, 0U);
+
+  sleeping_task_low.WakeUp();
+  sleeping_task_low.WaitUntilDone();
+
+  dbfull()->TEST_WaitForCompact();
+  ASSERT_TRUE(dbfull()->GetIntProperty(
+      "rocksdb.estimate-pending-compaction-bytes", &int_num));
+  ASSERT_EQ(int_num, 0U);
+}
+
 TEST_F(DBTest, FLUSH) {
   do {
     CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
diff --git a/db/internal_stats.cc b/db/internal_stats.cc
index 5e9333a13..19ab73659 100644
--- a/db/internal_stats.cc
+++ b/db/internal_stats.cc
@@ -123,6 +123,8 @@ static const std::string num_live_versions = "num-live-versions";
 static const std::string estimate_live_data_size = "estimate-live-data-size";
 static const std::string base_level = "base-level";
 static const std::string total_sst_files_size = "total-sst-files-size";
+static const std::string estimate_pending_comp_bytes =
+    "estimate-pending-compaction-bytes";
 
 const std::string DB::Properties::kNumFilesAtLevelPrefix =
     rocksdb_prefix + num_files_at_level_prefix;
@@ -168,6 +170,8 @@ const std::string DB::Properties::kEstimateLiveDataSize =
     rocksdb_prefix + estimate_live_data_size;
 const std::string DB::Properties::kTotalSstFilesSize =
     rocksdb_prefix + total_sst_files_size;
+const std::string DB::Properties::kEstimatePendingCompactionBytes =
+    rocksdb_prefix + estimate_pending_comp_bytes;
 
 DBPropertyType GetPropertyType(const Slice& property, bool* is_int_property,
                                bool* need_out_of_mutex) {
@@ -241,6 +245,8 @@ DBPropertyType GetPropertyType(const Slice& property, bool* is_int_property,
     return kBaseLevel;
   } else if (in == total_sst_files_size) {
     return kTotalSstFilesSize;
+  } else if (in == estimate_pending_comp_bytes) {
+    return kEstimatePendingCompactionBytes;
   }
   return kUnknown;
 }
@@ -409,6 +415,9 @@ bool InternalStats::GetIntProperty(DBPropertyType property_type,
     case kTotalSstFilesSize:
       *value = cfd_->GetTotalSstFilesSize();
       return true;
+    case kEstimatePendingCompactionBytes:
+      *value = vstorage->estimated_compaction_needed_bytes();
+      return true;
     default:
       return false;
   }
diff --git a/db/internal_stats.h b/db/internal_stats.h
index 57d9e4ab5..c746f397f 100644
--- a/db/internal_stats.h
+++ b/db/internal_stats.h
@@ -60,9 +60,10 @@ enum DBPropertyType : uint32_t {
   kNumSnapshots,        // Number of snapshots in the system
   kOldestSnapshotTime,  // Unix timestamp of the first snapshot
   kNumLiveVersions,
-  kEstimateLiveDataSize,  // Estimated amount of live data in bytes
-  kTotalSstFilesSize,     // Total size of all sst files.
-  kBaseLevel,             // The level that L0 data is compacted to
+  kEstimateLiveDataSize,            // Estimated amount of live data in bytes
+  kTotalSstFilesSize,               // Total size of all sst files.
+  kBaseLevel,                       // The level that L0 data is compacted to
+  kEstimatePendingCompactionBytes,  // Estimated bytes pending compaction
 };
 
 extern DBPropertyType GetPropertyType(const Slice& property,
diff --git a/db/version_set.cc b/db/version_set.cc
index e7284a0d6..2740e45ea 100644
--- a/db/version_set.cc
+++ b/db/version_set.cc
@@ -762,6 +762,7 @@ VersionStorageInfo::VersionStorageInfo(
       accumulated_num_non_deletions_(0),
       accumulated_num_deletions_(0),
       num_samples_(0),
+      estimated_compaction_needed_bytes_(0),
       finalized_(false) {
   if (ref_vstorage != nullptr) {
     accumulated_file_size_ = ref_vstorage->accumulated_file_size_;
@@ -1014,6 +1015,62 @@ int VersionStorageInfo::MaxInputLevel() const {
   return 0;
 }
 
+void VersionStorageInfo::EstimateCompactionBytesNeeded(
+    const MutableCFOptions& mutable_cf_options) {
+  // Only implemented for level-based compaction
+  if (compaction_style_ != kCompactionStyleLevel) {
+    return;
+  }
+
+  // Start from Level 0, if level 0 qualifies compaction to level 1,
+  // we estimate the size of compaction.
+  // Then we move on to the next level and see whether it qualifies compaction
+  // to the next level. The size of the level is estimated as the actual size
+  // on the level plus the input bytes from the previous level if there is any.
+  // If it exceeds, take the exceeded bytes as compaction input and add the size
+  // of the compaction size to total size.
+  // We keep doing it to Level 2, 3, etc, until the last level and return the
+  // accumulated bytes.
+
+  uint64_t bytes_compact_to_next_level = 0;
+  // Level 0
+  bool level0_compact_triggered = false;
+  if (static_cast<int>(files_[0].size()) >
+      mutable_cf_options.level0_file_num_compaction_trigger) {
+    level0_compact_triggered = true;
+    for (auto* f : files_[0]) {
+      bytes_compact_to_next_level += f->fd.GetFileSize();
+    }
+    estimated_compaction_needed_bytes_ = bytes_compact_to_next_level;
+  } else {
+    estimated_compaction_needed_bytes_ = 0;
+  }
+
+  // Level 1 and up.
+  for (int level = base_level(); level <= MaxInputLevel(); level++) {
+    uint64_t level_size = 0;
+    for (auto* f : files_[level]) {
+      level_size += f->fd.GetFileSize();
+    }
+    if (level == base_level() && level0_compact_triggered) {
+      // Add base level size to compaction if level0 compaction triggered.
+      estimated_compaction_needed_bytes_ += level_size;
+    }
+    // Add size added by previous compaction
+    level_size += bytes_compact_to_next_level;
+    bytes_compact_to_next_level = 0;
+    uint64_t level_target = MaxBytesForLevel(level);
+    if (level_size > level_target) {
+      bytes_compact_to_next_level = level_size - level_target;
+      // Simplify to assume the actual compaction fan-out ratio is always
+      // mutable_cf_options.max_bytes_for_level_multiplier.
+      estimated_compaction_needed_bytes_ +=
+          bytes_compact_to_next_level *
+          (1 + mutable_cf_options.max_bytes_for_level_multiplier);
+    }
+  }
+}
+
 void VersionStorageInfo::ComputeCompactionScore(
     const MutableCFOptions& mutable_cf_options,
     const CompactionOptionsFIFO& compaction_options_fifo) {
@@ -1098,6 +1155,7 @@ void VersionStorageInfo::ComputeCompactionScore(
     }
   }
   ComputeFilesMarkedForCompaction();
+  EstimateCompactionBytesNeeded(mutable_cf_options);
 }
 
 void VersionStorageInfo::ComputeFilesMarkedForCompaction() {
diff --git a/db/version_set.h b/db/version_set.h
index 39a7a2cf3..6702144b0 100644
--- a/db/version_set.h
+++ b/db/version_set.h
@@ -121,6 +121,10 @@ class VersionStorageInfo {
       const MutableCFOptions& mutable_cf_options,
       const CompactionOptionsFIFO& compaction_options_fifo);
 
+  // Recompute estimated_compaction_needed_bytes_ (level compaction only)
+  void EstimateCompactionBytesNeeded(
+      const MutableCFOptions& mutable_cf_options);
+
   // This computes files_marked_for_compaction_ and is called by
   // ComputeCompactionScore()
   void ComputeFilesMarkedForCompaction();
@@ -315,6 +319,10 @@ class VersionStorageInfo {
   // Returns an estimate of the amount of live data in bytes.
   uint64_t EstimateLiveDataSize() const;
 
+  uint64_t estimated_compaction_needed_bytes() const {
+    return estimated_compaction_needed_bytes_;
+  }
+
  private:
   const InternalKeyComparator* internal_comparator_;
   const Comparator* user_comparator_;
@@ -389,6 +397,9 @@ class VersionStorageInfo {
   uint64_t accumulated_num_deletions_;
   // the number of samples
   uint64_t num_samples_;
+  // Estimated bytes needed to be compacted until all levels' size is down to
+  // target sizes.
+  uint64_t estimated_compaction_needed_bytes_;
 
   bool finalized_;
 
diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h
index 311ed8983..0fb11dee8 100644
--- a/include/rocksdb/db.h
+++ b/include/rocksdb/db.h
@@ -330,6 +330,10 @@ class DB {
 //  "rocksdb.estimate-live-data-size"
 //  "rocksdb.total-sst-files-size" - total size of all used sst files, this may
 //      slow down online queries if there are too many files.
+//  "rocksdb.base-level" - the level that L0 data is compacted to.
+//  "rocksdb.estimate-pending-compaction-bytes" - estimated total number of
+//      bytes compaction needs to rewrite the data to get all levels down
+//      to under target size. Not valid for other compactions than level-based.
 #ifndef ROCKSDB_LITE
   struct Properties {
     static const std::string kNumFilesAtLevelPrefix;
@@ -356,6 +360,7 @@ class DB {
     static const std::string kNumLiveVersions;
     static const std::string kEstimateLiveDataSize;
     static const std::string kTotalSstFilesSize;
+    static const std::string kEstimatePendingCompactionBytes;
   };
 #endif /* ROCKSDB_LITE */
 
@@ -387,6 +392,8 @@ class DB {
   //  "rocksdb.num-live-versions"
   //  "rocksdb.estimate-live-data-size"
   //  "rocksdb.total-sst-files-size"
+  //  "rocksdb.base-level"
+  //  "rocksdb.estimate-pending-compaction-bytes"
   virtual bool GetIntProperty(ColumnFamilyHandle* column_family,
                               const Slice& property, uint64_t* value) = 0;
   virtual bool GetIntProperty(const Slice& property, uint64_t* value) {