diff --git a/HISTORY.md b/HISTORY.md index fd2576caf..16ee9609d 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -6,6 +6,7 @@ * Deprecate options.soft_rate_limit and add options.soft_pending_compaction_bytes_limit. * If options.max_write_buffer_number > 3, writes will be slowed down when writing to the last write buffer to delay a full stop. * Introduce CompactionJobInfo::compaction_reason, this field include the reason to trigger the compaction. +* After slow down is triggered, if estimated pending compaction bytes keep increasing, slowdown more. ## 4.3.0 (12/8/2015) ### New Features diff --git a/db/column_family.cc b/db/column_family.cc index 04f5db67d..2ef5a907f 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -315,7 +315,8 @@ ColumnFamilyData::ColumnFamilyData( log_number_(0), column_family_set_(column_family_set), pending_flush_(false), - pending_compaction_(false) { + pending_compaction_(false), + prev_compaction_needed_bytes_(0) { Ref(); // Convert user defined table properties collector factories to internal ones. @@ -433,11 +434,64 @@ void ColumnFamilyData::SetDropped() { column_family_set_->RemoveColumnFamily(this); } +const double kSlowdownRatio = 1.2; + +namespace { +std::unique_ptr SetupDelay( + uint64_t max_write_rate, WriteController* write_controller, + uint64_t compaction_needed_bytes, uint64_t prev_compaction_neeed_bytes, + bool auto_comapctions_disabled) { + const uint64_t kMinWriteRate = 1024u; // Minimum write rate 1KB/s. + + uint64_t write_rate = write_controller->delayed_write_rate(); + + if (auto_comapctions_disabled) { + // When auto compaction is disabled, always use the value user gave. + write_rate = max_write_rate; + } else if (write_controller->NeedsDelay() && max_write_rate > kMinWriteRate) { + // If user gives rate less than kMinWriteRate, don't adjust it. + // + // If already delayed, need to adjust based on previous compaction debt. + // When there are two or more column families require delay, we always + // increase or reduce write rate based on information for one single + // column family. It is likely to be OK but we can improve if there is a + // problem. + // Ignore compaction_needed_bytes = 0 case because compaction_needed_bytes + // is only available in level-based compaction + // + // If the compaction debt stays the same as previously, we also further slow + // down. It usually means a mem table is full. It's mainly for the case + // where both of flush and compaction are much slower than the speed we + // insert to mem tables, so we need to actively slow down before we get + // feedback signal from compaction and flushes to avoid the full stop + // because of hitting the max write buffer number. + if (prev_compaction_neeed_bytes > 0 && + prev_compaction_neeed_bytes <= compaction_needed_bytes) { + write_rate /= kSlowdownRatio; + if (write_rate < kMinWriteRate) { + write_rate = kMinWriteRate; + } + } else if (prev_compaction_neeed_bytes > compaction_needed_bytes) { + // We are speeding up by ratio of kSlowdownRatio when we have paid + // compaction debt. But we'll never speed up to faster than the write rate + // given by users. + write_rate *= kSlowdownRatio; + if (write_rate > max_write_rate) { + write_rate = max_write_rate; + } + } + } + return write_controller->GetDelayToken(write_rate); +} +} // namespace + void ColumnFamilyData::RecalculateWriteStallConditions( const MutableCFOptions& mutable_cf_options) { if (current_ != nullptr) { auto* vstorage = current_->storage_info(); auto write_controller = column_family_set_->write_controller_; + uint64_t compaction_needed_bytes = + vstorage->estimated_compaction_needed_bytes(); if (imm()->NumNotFlushed() >= mutable_cf_options.max_write_buffer_number) { write_controller_token_ = write_controller->GetStopToken(); @@ -450,13 +504,18 @@ void ColumnFamilyData::RecalculateWriteStallConditions( } else if (mutable_cf_options.max_write_buffer_number > 3 && imm()->NumNotFlushed() >= mutable_cf_options.max_write_buffer_number - 1) { - write_controller_token_ = write_controller->GetDelayToken(); + write_controller_token_ = + SetupDelay(ioptions_.delayed_write_rate, write_controller, + compaction_needed_bytes, prev_compaction_needed_bytes_, + mutable_cf_options.disable_auto_compactions); internal_stats_->AddCFStats(InternalStats::MEMTABLE_SLOWDOWN, 1); Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log, "[%s] Stalling writes because we have %d immutable memtables " - "(waiting for flush), max_write_buffer_number is set to %d", + "(waiting for flush), max_write_buffer_number is set to %d " + "rate %" PRIu64, name_.c_str(), imm()->NumNotFlushed(), - mutable_cf_options.max_write_buffer_number); + mutable_cf_options.max_write_buffer_number, + write_controller->delayed_write_rate()); } else if (vstorage->l0_delay_trigger_count() >= mutable_cf_options.level0_stop_writes_trigger) { write_controller_token_ = write_controller->GetStopToken(); @@ -469,7 +528,7 @@ void ColumnFamilyData::RecalculateWriteStallConditions( "[%s] Stopping writes because we have %d level-0 files", name_.c_str(), vstorage->l0_delay_trigger_count()); } else if (mutable_cf_options.hard_pending_compaction_bytes_limit > 0 && - vstorage->estimated_compaction_needed_bytes() >= + compaction_needed_bytes >= mutable_cf_options.hard_pending_compaction_bytes_limit) { write_controller_token_ = write_controller->GetStopToken(); internal_stats_->AddCFStats( @@ -477,32 +536,42 @@ void ColumnFamilyData::RecalculateWriteStallConditions( Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log, "[%s] Stopping writes because of estimated pending compaction " "bytes %" PRIu64, - name_.c_str(), vstorage->estimated_compaction_needed_bytes()); + name_.c_str(), compaction_needed_bytes); } else if (mutable_cf_options.level0_slowdown_writes_trigger >= 0 && vstorage->l0_delay_trigger_count() >= mutable_cf_options.level0_slowdown_writes_trigger) { - write_controller_token_ = write_controller->GetDelayToken(); + write_controller_token_ = + SetupDelay(ioptions_.delayed_write_rate, write_controller, + compaction_needed_bytes, prev_compaction_needed_bytes_, + mutable_cf_options.disable_auto_compactions); internal_stats_->AddCFStats(InternalStats::LEVEL0_SLOWDOWN_TOTAL, 1); if (compaction_picker_->IsLevel0CompactionInProgress()) { internal_stats_->AddCFStats( InternalStats::LEVEL0_SLOWDOWN_WITH_COMPACTION, 1); } Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log, - "[%s] Stalling writes because we have %d level-0 files", - name_.c_str(), vstorage->l0_delay_trigger_count()); + "[%s] Stalling writes because we have %d level-0 files " + "rate %" PRIu64, + name_.c_str(), vstorage->l0_delay_trigger_count(), + write_controller->delayed_write_rate()); } else if (mutable_cf_options.soft_pending_compaction_bytes_limit > 0 && vstorage->estimated_compaction_needed_bytes() >= mutable_cf_options.soft_pending_compaction_bytes_limit) { - write_controller_token_ = write_controller->GetDelayToken(); + write_controller_token_ = + SetupDelay(ioptions_.delayed_write_rate, write_controller, + compaction_needed_bytes, prev_compaction_needed_bytes_, + mutable_cf_options.disable_auto_compactions); internal_stats_->AddCFStats( InternalStats::SOFT_PENDING_COMPACTION_BYTES_LIMIT, 1); Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log, "[%s] Stalling writes because of estimated pending compaction " - "bytes %" PRIu64, - name_.c_str(), vstorage->estimated_compaction_needed_bytes()); + "bytes %" PRIu64 " rate %" PRIu64, + name_.c_str(), vstorage->estimated_compaction_needed_bytes(), + write_controller->delayed_write_rate()); } else { write_controller_token_.reset(); } + prev_compaction_needed_bytes_ = compaction_needed_bytes; } } diff --git a/db/column_family.h b/db/column_family.h index 40a6d6910..64bb1c9a1 100644 --- a/db/column_family.h +++ b/db/column_family.h @@ -44,6 +44,8 @@ class LogBuffer; class InstrumentedMutex; class InstrumentedMutexLock; +extern const double kSlowdownRatio; + // ColumnFamilyHandleImpl is the class that clients use to access different // column families. It has non-trivial destructor, which gets called when client // is done using the column family @@ -305,6 +307,14 @@ class ColumnFamilyData { bool pending_flush() { return pending_flush_; } bool pending_compaction() { return pending_compaction_; } + // Recalculate some small conditions, which are changed only during + // compaction, adding new memtable and/or + // recalculation of compaction score. These values are used in + // DBImpl::MakeRoomForWrite function to decide, if it need to make + // a write stall + void RecalculateWriteStallConditions( + const MutableCFOptions& mutable_cf_options); + private: friend class ColumnFamilySet; ColumnFamilyData(uint32_t id, const std::string& name, @@ -314,14 +324,6 @@ class ColumnFamilyData { const DBOptions* db_options, const EnvOptions& env_options, ColumnFamilySet* column_family_set); - // Recalculate some small conditions, which are changed only during - // compaction, adding new memtable and/or - // recalculation of compaction score. These values are used in - // DBImpl::MakeRoomForWrite function to decide, if it need to make - // a write stall - void RecalculateWriteStallConditions( - const MutableCFOptions& mutable_cf_options); - uint32_t id_; const std::string name_; Version* dummy_versions_; // Head of circular doubly-linked list of versions. @@ -382,6 +384,8 @@ class ColumnFamilyData { // If true --> this ColumnFamily is currently present in // DBImpl::compaction_queue_ bool pending_compaction_; + + uint64_t prev_compaction_needed_bytes_; }; // ColumnFamilySet has interesting thread-safety requirements diff --git a/db/column_family_test.cc b/db/column_family_test.cc index e4e18c2f4..1e5d5fd79 100644 --- a/db/column_family_test.cc +++ b/db/column_family_test.cc @@ -2127,6 +2127,274 @@ TEST_F(ColumnFamilyTest, CreateAndDropRace) { } #endif // !ROCKSDB_LITE +TEST_F(ColumnFamilyTest, WriteStallSingleColumnFamily) { + const uint64_t kBaseRate = 810000u; + db_options_.delayed_write_rate = kBaseRate; + Open({"default"}); + ColumnFamilyData* cfd = + static_cast(db_->DefaultColumnFamily())->cfd(); + + VersionStorageInfo* vstorage = cfd->current()->storage_info(); + + MutableCFOptions mutable_cf_options( + Options(db_options_, column_family_options_), + ImmutableCFOptions(Options(db_options_, column_family_options_))); + + mutable_cf_options.level0_slowdown_writes_trigger = 20; + mutable_cf_options.level0_stop_writes_trigger = 10000; + mutable_cf_options.soft_pending_compaction_bytes_limit = 200; + mutable_cf_options.hard_pending_compaction_bytes_limit = 2000; + + vstorage->TEST_set_estimated_compaction_needed_bytes(50); + cfd->RecalculateWriteStallConditions(mutable_cf_options); + ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped()); + ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay()); + + vstorage->TEST_set_estimated_compaction_needed_bytes(201); + cfd->RecalculateWriteStallConditions(mutable_cf_options); + ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped()); + ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); + ASSERT_EQ(kBaseRate, dbfull()->TEST_write_controler().delayed_write_rate()); + + vstorage->TEST_set_estimated_compaction_needed_bytes(400); + cfd->RecalculateWriteStallConditions(mutable_cf_options); + ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped()); + ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); + ASSERT_EQ(kBaseRate / 1.2, + dbfull()->TEST_write_controler().delayed_write_rate()); + + vstorage->TEST_set_estimated_compaction_needed_bytes(500); + cfd->RecalculateWriteStallConditions(mutable_cf_options); + ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped()); + ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); + ASSERT_EQ(kBaseRate / 1.2 / 1.2, + dbfull()->TEST_write_controler().delayed_write_rate()); + + vstorage->TEST_set_estimated_compaction_needed_bytes(450); + cfd->RecalculateWriteStallConditions(mutable_cf_options); + ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped()); + ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); + ASSERT_EQ(kBaseRate / 1.2, + dbfull()->TEST_write_controler().delayed_write_rate()); + + vstorage->TEST_set_estimated_compaction_needed_bytes(205); + cfd->RecalculateWriteStallConditions(mutable_cf_options); + ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped()); + ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); + ASSERT_EQ(kBaseRate, dbfull()->TEST_write_controler().delayed_write_rate()); + + vstorage->TEST_set_estimated_compaction_needed_bytes(202); + cfd->RecalculateWriteStallConditions(mutable_cf_options); + ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped()); + ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); + ASSERT_EQ(kBaseRate, dbfull()->TEST_write_controler().delayed_write_rate()); + + vstorage->TEST_set_estimated_compaction_needed_bytes(201); + cfd->RecalculateWriteStallConditions(mutable_cf_options); + ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped()); + ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); + ASSERT_EQ(kBaseRate, dbfull()->TEST_write_controler().delayed_write_rate()); + + vstorage->TEST_set_estimated_compaction_needed_bytes(198); + cfd->RecalculateWriteStallConditions(mutable_cf_options); + ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped()); + ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay()); + + vstorage->TEST_set_estimated_compaction_needed_bytes(399); + cfd->RecalculateWriteStallConditions(mutable_cf_options); + ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped()); + ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); + ASSERT_EQ(kBaseRate, dbfull()->TEST_write_controler().delayed_write_rate()); + + vstorage->TEST_set_estimated_compaction_needed_bytes(599); + cfd->RecalculateWriteStallConditions(mutable_cf_options); + ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped()); + ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); + ASSERT_EQ(kBaseRate / 1.2, + dbfull()->TEST_write_controler().delayed_write_rate()); + + vstorage->TEST_set_estimated_compaction_needed_bytes(2001); + cfd->RecalculateWriteStallConditions(mutable_cf_options); + ASSERT_TRUE(dbfull()->TEST_write_controler().IsStopped()); + ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay()); + + vstorage->TEST_set_estimated_compaction_needed_bytes(3001); + cfd->RecalculateWriteStallConditions(mutable_cf_options); + ASSERT_TRUE(dbfull()->TEST_write_controler().IsStopped()); + ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay()); + + vstorage->TEST_set_estimated_compaction_needed_bytes(390); + cfd->RecalculateWriteStallConditions(mutable_cf_options); + ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped()); + ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); + ASSERT_EQ(kBaseRate / 1.2, + dbfull()->TEST_write_controler().delayed_write_rate()); + + vstorage->TEST_set_estimated_compaction_needed_bytes(100); + cfd->RecalculateWriteStallConditions(mutable_cf_options); + ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped()); + ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay()); + + vstorage->set_l0_delay_trigger_count(100); + cfd->RecalculateWriteStallConditions(mutable_cf_options); + ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped()); + ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); + ASSERT_EQ(kBaseRate / 1.2, + dbfull()->TEST_write_controler().delayed_write_rate()); + + vstorage->set_l0_delay_trigger_count(101); + cfd->RecalculateWriteStallConditions(mutable_cf_options); + ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped()); + ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); + ASSERT_EQ(kBaseRate / 1.2 / 1.2, + dbfull()->TEST_write_controler().delayed_write_rate()); + + vstorage->set_l0_delay_trigger_count(0); + vstorage->TEST_set_estimated_compaction_needed_bytes(300); + cfd->RecalculateWriteStallConditions(mutable_cf_options); + ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped()); + ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); + ASSERT_EQ(kBaseRate / 1.2 / 1.2 / 1.2, + dbfull()->TEST_write_controler().delayed_write_rate()); + + vstorage->set_l0_delay_trigger_count(101); + cfd->RecalculateWriteStallConditions(mutable_cf_options); + ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped()); + ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); + ASSERT_EQ(kBaseRate / 1.2 / 1.2 / 1.2 / 1.2, + dbfull()->TEST_write_controler().delayed_write_rate()); + + vstorage->TEST_set_estimated_compaction_needed_bytes(200); + cfd->RecalculateWriteStallConditions(mutable_cf_options); + ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped()); + ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); + ASSERT_EQ(kBaseRate / 1.2 / 1.2 / 1.2, + dbfull()->TEST_write_controler().delayed_write_rate()); + + vstorage->set_l0_delay_trigger_count(0); + vstorage->TEST_set_estimated_compaction_needed_bytes(0); + cfd->RecalculateWriteStallConditions(mutable_cf_options); + ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped()); + ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay()); + + mutable_cf_options.disable_auto_compactions = true; + dbfull()->TEST_write_controler().set_delayed_write_rate(kBaseRate); + cfd->RecalculateWriteStallConditions(mutable_cf_options); + ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped()); + ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay()); + + vstorage->set_l0_delay_trigger_count(50); + cfd->RecalculateWriteStallConditions(mutable_cf_options); + ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped()); + ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); + ASSERT_EQ(kBaseRate, dbfull()->TEST_write_controler().delayed_write_rate()); + + vstorage->set_l0_delay_trigger_count(60); + vstorage->TEST_set_estimated_compaction_needed_bytes(300); + cfd->RecalculateWriteStallConditions(mutable_cf_options); + ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped()); + ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); + ASSERT_EQ(kBaseRate, dbfull()->TEST_write_controler().delayed_write_rate()); + + vstorage->set_l0_delay_trigger_count(70); + vstorage->TEST_set_estimated_compaction_needed_bytes(500); + cfd->RecalculateWriteStallConditions(mutable_cf_options); + ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped()); + ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); + ASSERT_EQ(kBaseRate, dbfull()->TEST_write_controler().delayed_write_rate()); + + mutable_cf_options.disable_auto_compactions = false; + vstorage->set_l0_delay_trigger_count(71); + vstorage->TEST_set_estimated_compaction_needed_bytes(501); + cfd->RecalculateWriteStallConditions(mutable_cf_options); + ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped()); + ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); + ASSERT_EQ(kBaseRate / 1.2, + dbfull()->TEST_write_controler().delayed_write_rate()); + Close(); +} + +TEST_F(ColumnFamilyTest, WriteStallTwoColumnFamilies) { + const uint64_t kBaseRate = 810000u; + db_options_.delayed_write_rate = kBaseRate; + Open(); + CreateColumnFamilies({"one"}); + ColumnFamilyData* cfd = + static_cast(db_->DefaultColumnFamily())->cfd(); + VersionStorageInfo* vstorage = cfd->current()->storage_info(); + + ColumnFamilyData* cfd1 = + static_cast(handles_[1])->cfd(); + VersionStorageInfo* vstorage1 = cfd1->current()->storage_info(); + + MutableCFOptions mutable_cf_options( + Options(db_options_, column_family_options_), + ImmutableCFOptions(Options(db_options_, column_family_options_))); + mutable_cf_options.level0_slowdown_writes_trigger = 20; + mutable_cf_options.level0_stop_writes_trigger = 10000; + mutable_cf_options.soft_pending_compaction_bytes_limit = 200; + mutable_cf_options.hard_pending_compaction_bytes_limit = 2000; + + MutableCFOptions mutable_cf_options1 = mutable_cf_options; + mutable_cf_options1.soft_pending_compaction_bytes_limit = 500; + + vstorage->TEST_set_estimated_compaction_needed_bytes(50); + cfd->RecalculateWriteStallConditions(mutable_cf_options); + ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped()); + ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay()); + + vstorage1->TEST_set_estimated_compaction_needed_bytes(201); + cfd->RecalculateWriteStallConditions(mutable_cf_options); + ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped()); + ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay()); + + vstorage1->TEST_set_estimated_compaction_needed_bytes(600); + cfd1->RecalculateWriteStallConditions(mutable_cf_options); + ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped()); + ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); + ASSERT_EQ(kBaseRate, dbfull()->TEST_write_controler().delayed_write_rate()); + + vstorage->TEST_set_estimated_compaction_needed_bytes(70); + cfd->RecalculateWriteStallConditions(mutable_cf_options); + ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped()); + ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); + ASSERT_EQ(kBaseRate, dbfull()->TEST_write_controler().delayed_write_rate()); + + vstorage1->TEST_set_estimated_compaction_needed_bytes(800); + cfd1->RecalculateWriteStallConditions(mutable_cf_options); + ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped()); + ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); + ASSERT_EQ(kBaseRate / 1.2, + dbfull()->TEST_write_controler().delayed_write_rate()); + + vstorage->TEST_set_estimated_compaction_needed_bytes(300); + cfd->RecalculateWriteStallConditions(mutable_cf_options); + ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped()); + ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); + ASSERT_EQ(kBaseRate / 1.2 / 1.2, + dbfull()->TEST_write_controler().delayed_write_rate()); + + vstorage1->TEST_set_estimated_compaction_needed_bytes(700); + cfd1->RecalculateWriteStallConditions(mutable_cf_options); + ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped()); + ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); + ASSERT_EQ(kBaseRate / 1.2, + dbfull()->TEST_write_controler().delayed_write_rate()); + + vstorage->TEST_set_estimated_compaction_needed_bytes(500); + cfd->RecalculateWriteStallConditions(mutable_cf_options); + ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped()); + ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); + ASSERT_EQ(kBaseRate / 1.2 / 1.2, + dbfull()->TEST_write_controler().delayed_write_rate()); + + vstorage1->TEST_set_estimated_compaction_needed_bytes(600); + cfd1->RecalculateWriteStallConditions(mutable_cf_options); + ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped()); + ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); + ASSERT_EQ(kBaseRate / 1.2, + dbfull()->TEST_write_controler().delayed_write_rate()); +} } // namespace rocksdb int main(int argc, char** argv) { diff --git a/db/db_impl.h b/db/db_impl.h index a37a7a128..a6653b091 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -341,9 +341,7 @@ class DBImpl : public DB { Cache* TEST_table_cache() { return table_cache_.get(); } - const WriteController& TEST_write_controler() const { - return write_controller_; - } + WriteController& TEST_write_controler() { return write_controller_; } #endif // NDEBUG diff --git a/db/db_test.cc b/db/db_test.cc index 79eb0917a..741b0168d 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -9042,47 +9042,68 @@ TEST_F(DBTest, FlushesInParallelWithCompactRange) { } TEST_F(DBTest, DelayedWriteRate) { + const int kEntriesPerMemTable = 100; + const int kTotalFlushes = 20; + Options options; + env_->SetBackgroundThreads(1, Env::LOW); options.env = env_; env_->no_sleep_ = true; options = CurrentOptions(options); - options.write_buffer_size = 100000; // Small write buffer + options.write_buffer_size = 100000000; options.max_write_buffer_number = 256; - options.disable_auto_compactions = true; + options.max_background_compactions = 1; options.level0_file_num_compaction_trigger = 3; options.level0_slowdown_writes_trigger = 3; options.level0_stop_writes_trigger = 999999; - options.delayed_write_rate = 200000; // About 200KB/s limited rate + options.delayed_write_rate = 20000000; // Start with 200MB/s + options.memtable_factory.reset( + new SpecialSkipListFactory(kEntriesPerMemTable)); CreateAndReopenWithCF({"pikachu"}, options); + // Block compactions + test::SleepingBackgroundTask sleeping_task_low; + env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low, + Env::Priority::LOW); + for (int i = 0; i < 3; i++) { Put(Key(i), std::string(10000, 'x')); Flush(); } // These writes will be slowed down to 1KB/s - size_t estimated_total_size = 0; + uint64_t estimated_sleep_time = 0; Random rnd(301); - for (int i = 0; i < 3000; i++) { - auto rand_num = rnd.Uniform(20); - // Spread the size range to more. - size_t entry_size = rand_num * rand_num * rand_num; - WriteOptions wo; - Put(Key(i), std::string(entry_size, 'x'), wo); - estimated_total_size += entry_size + 20; - // Occasionally sleep a while - if (rnd.Uniform(20) == 6) { - env_->SleepForMicroseconds(2666); + Put("", ""); + uint64_t cur_rate = options.delayed_write_rate; + for (int i = 0; i < kTotalFlushes; i++) { + uint64_t size_memtable = 0; + for (int j = 0; j < kEntriesPerMemTable; j++) { + auto rand_num = rnd.Uniform(20); + // Spread the size range to more. + size_t entry_size = rand_num * rand_num * rand_num; + WriteOptions wo; + Put(Key(i), std::string(entry_size, 'x'), wo); + size_memtable += entry_size + 18; + // Occasionally sleep a while + if (rnd.Uniform(20) == 6) { + env_->SleepForMicroseconds(2666); + } } + dbfull()->TEST_WaitForFlushMemTable(); + estimated_sleep_time += size_memtable * 1000000u / cur_rate; + // Slow down twice. One for memtable switch and one for flush finishes. + cur_rate /= kSlowdownRatio * kSlowdownRatio; } - uint64_t estimated_sleep_time = - estimated_total_size / options.delayed_write_rate * 1000000U; - ASSERT_GT(env_->addon_time_.load(), estimated_sleep_time * 0.8); - ASSERT_LT(env_->addon_time_.load(), estimated_sleep_time * 1.1); + // Estimate the total sleep time fall into the rough range. + ASSERT_GT(env_->addon_time_.load(), estimated_sleep_time / 2); + ASSERT_LT(env_->addon_time_.load(), estimated_sleep_time * 2); env_->no_sleep_ = false; rocksdb::SyncPoint::GetInstance()->DisableProcessing(); + sleeping_task_low.WakeUp(); + sleeping_task_low.WaitUntilDone(); } TEST_F(DBTest, HardLimit) { diff --git a/db/version_set.cc b/db/version_set.cc index 81e51ed31..a1e2a2dd1 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -1119,6 +1119,7 @@ void VersionStorageInfo::EstimateCompactionBytesNeeded( const MutableCFOptions& mutable_cf_options) { // Only implemented for level-based compaction if (compaction_style_ != kCompactionStyleLevel) { + estimated_compaction_needed_bytes_ = 0; return; } diff --git a/db/version_set.h b/db/version_set.h index edbb48404..2d9d93f6f 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -325,6 +325,10 @@ class VersionStorageInfo { return estimated_compaction_needed_bytes_; } + void TEST_set_estimated_compaction_needed_bytes(uint64_t v) { + estimated_compaction_needed_bytes_ = v; + } + private: const InternalKeyComparator* internal_comparator_; const Comparator* user_comparator_; diff --git a/db/write_controller.cc b/db/write_controller.cc index 3bbb11609..7a933ec42 100644 --- a/db/write_controller.cc +++ b/db/write_controller.cc @@ -16,11 +16,13 @@ std::unique_ptr WriteController::GetStopToken() { return std::unique_ptr(new StopWriteToken(this)); } -std::unique_ptr WriteController::GetDelayToken() { - if (total_delayed_++ == 0) { - last_refill_time_ = 0; - bytes_left_ = 0; - } +std::unique_ptr WriteController::GetDelayToken( + uint64_t write_rate) { + total_delayed_++; + // Reset counters. + last_refill_time_ = 0; + bytes_left_ = 0; + set_delayed_write_rate(write_rate); return std::unique_ptr(new DelayWriteToken(this)); } diff --git a/db/write_controller.h b/db/write_controller.h index 50e5a99be..a5d498c3a 100644 --- a/db/write_controller.h +++ b/db/write_controller.h @@ -20,12 +20,12 @@ class WriteControllerToken; // to be called while holding DB mutex class WriteController { public: - explicit WriteController(uint64_t delayed_write_rate = 1024u * 1024u * 32u) + explicit WriteController(uint64_t _delayed_write_rate = 1024u * 1024u * 32u) : total_stopped_(0), total_delayed_(0), bytes_left_(0), last_refill_time_(0) { - set_delayed_write_rate(delayed_write_rate); + set_delayed_write_rate(_delayed_write_rate); } ~WriteController() = default; @@ -36,7 +36,8 @@ class WriteController { // writes to the DB will be controlled under the delayed write rate. Every // write needs to call GetDelay() with number of bytes writing to the DB, // which returns number of microseconds to sleep. - std::unique_ptr GetDelayToken(); + std::unique_ptr GetDelayToken( + uint64_t delayed_write_rate); // these two metods are querying the state of the WriteController bool IsStopped() const; @@ -45,13 +46,14 @@ class WriteController { // num_bytes: how many number of bytes to put into the DB. // Prerequisite: DB mutex held. uint64_t GetDelay(Env* env, uint64_t num_bytes); - void set_delayed_write_rate(uint64_t delayed_write_rate) { - delayed_write_rate_ = delayed_write_rate; - if (delayed_write_rate_ == 0) { - // avoid divide 0 - delayed_write_rate_ = 1U; + void set_delayed_write_rate(uint64_t write_rate) { + // avoid divide 0 + if (write_rate == 0) { + write_rate = 1u; } + delayed_write_rate_ = write_rate; } + uint64_t delayed_write_rate() const { return delayed_write_rate_; } private: friend class WriteControllerToken; diff --git a/db/write_controller_test.cc b/db/write_controller_test.cc index aa8175d65..dc5614855 100644 --- a/db/write_controller_test.cc +++ b/db/write_controller_test.cc @@ -19,6 +19,28 @@ class TimeSetEnv : public EnvWrapper { virtual uint64_t NowMicros() override { return now_micros_; } }; +TEST_F(WriteControllerTest, ChangeDelayRateTest) { + TimeSetEnv env; + WriteController controller(10000000u); + auto delay_token_0 = + controller.GetDelayToken(controller.delayed_write_rate()); + ASSERT_EQ(static_cast(2000000), + controller.GetDelay(&env, 20000000u)); + auto delay_token_1 = controller.GetDelayToken(2000000u); + ASSERT_EQ(static_cast(10000000), + controller.GetDelay(&env, 20000000u)); + auto delay_token_2 = controller.GetDelayToken(1000000u); + ASSERT_EQ(static_cast(20000000), + controller.GetDelay(&env, 20000000u)); + auto delay_token_3 = controller.GetDelayToken(20000000u); + ASSERT_EQ(static_cast(1000000), + controller.GetDelay(&env, 20000000u)); + auto delay_token_4 = + controller.GetDelayToken(controller.delayed_write_rate() * 2); + ASSERT_EQ(static_cast(500000), + controller.GetDelay(&env, 20000000u)); +} + TEST_F(WriteControllerTest, SanityTest) { WriteController controller(10000000u); auto stop_token_1 = controller.GetStopToken(); @@ -32,12 +54,19 @@ TEST_F(WriteControllerTest, SanityTest) { TimeSetEnv env; - auto delay_token_1 = controller.GetDelayToken(); + auto delay_token_1 = controller.GetDelayToken(10000000u); + ASSERT_EQ(static_cast(2000000), + controller.GetDelay(&env, 20000000u)); + + env.now_micros_ += 1999900u; // sleep debt 1000 + + auto delay_token_2 = controller.GetDelayToken(10000000u); + // Rate reset after changing the token. ASSERT_EQ(static_cast(2000000), controller.GetDelay(&env, 20000000u)); env.now_micros_ += 1999900u; // sleep debt 1000 - auto delay_token_2 = controller.GetDelayToken(); + // One refill: 10240 bytes allowed, 1000 used, 9240 left ASSERT_EQ(static_cast(1124), controller.GetDelay(&env, 1000u)); env.now_micros_ += 1124u; // sleep debt 0 diff --git a/include/rocksdb/immutable_options.h b/include/rocksdb/immutable_options.h index 589f14e99..52978691b 100644 --- a/include/rocksdb/immutable_options.h +++ b/include/rocksdb/immutable_options.h @@ -50,6 +50,8 @@ struct ImmutableCFOptions { Env* env; + uint64_t delayed_write_rate; + // Allow the OS to mmap file for reading sst tables. Default: false bool allow_mmap_reads; diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index a94571f95..8acddbfb7 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -1162,9 +1162,12 @@ struct DBOptions { // Default: false bool enable_thread_tracking; - // The limited write rate to DB if soft_rate_limit or - // level0_slowdown_writes_trigger is triggered. It is calculated using - // size of user write requests before compression. + // The limited write rate to DB if soft_pending_compaction_bytes_limit or + // level0_slowdown_writes_trigger is triggered, or we are writing to the + // last mem table allowed and we allow more than 3 mem tables. It is + // calculated using size of user write requests before compression. + // RocksDB may decide to slow down more if the compaction still + // gets behind further. // Unit: byte per second. // // Default: 1MB/s diff --git a/util/options.cc b/util/options.cc index b59cc8d79..ad7dd9adc 100644 --- a/util/options.cc +++ b/util/options.cc @@ -51,6 +51,7 @@ ImmutableCFOptions::ImmutableCFOptions(const Options& options) info_log(options.info_log.get()), statistics(options.statistics.get()), env(options.env), + delayed_write_rate(options.delayed_write_rate), allow_mmap_reads(options.allow_mmap_reads), allow_mmap_writes(options.allow_mmap_writes), db_paths(options.db_paths),