Improve Write Stalling System

Summary:
Current write stalling system has the problem of lacking of positive feedback if the restricted rate is already too low. Users sometimes stack in very low slowdown value. With the diff, we add a positive feedback (increasing the slowdown value) if we recover from slowdown state back to normal. To avoid the positive feedback to keep the slowdown value to be to high, we add issue a negative feedback every time we are close to the stop condition. Experiments show it is easier to reach a relative balance than before.

Also increase level0_stop_writes_trigger default from 24 to 32. Since level0_slowdown_writes_trigger default is 20, stop trigger 24 only gives four files as the buffer time to slowdown writes. In order to avoid stop in four files while 20 files have been accumulated, the slowdown value must be very low, which is amost the same as stop. It also doesn't give enough time for the slowdown value to converge. Increase it to 32 will smooth out the system.
Closes https://github.com/facebook/rocksdb/pull/1562

Differential Revision: D4218519

Pulled By: siying

fbshipit-source-id: 95e4088
main
Siying Dong 8 years ago committed by Facebook Github Bot
parent dfb6fe6755
commit cd7c4143d7
  1. 4
      HISTORY.md
  2. 118
      db/column_family.cc
  3. 2
      db/column_family.h
  4. 35
      db/column_family_test.cc
  5. 6
      db/db_test.cc
  6. 2
      db/write_controller.h
  7. 6
      db/write_controller_test.cc
  8. 5
      util/options.cc

@ -1,4 +1,8 @@
# Rocksdb Change Log # Rocksdb Change Log
## Unreleased
### Public API Change
* Options.level0_stop_writes_trigger default value changes from 24 to 32.
## 5.0.0 (11/17/2016) ## 5.0.0 (11/17/2016)
### Public API Change ### Public API Change
* Options::max_bytes_for_level_multiplier is now a double along with all getters and setters. * Options::max_bytes_for_level_multiplier is now a double along with all getters and setters.

@ -492,14 +492,18 @@ ColumnFamilyOptions ColumnFamilyData::GetLatestCFOptions() const {
return BuildColumnFamilyOptions(initial_cf_options_, mutable_cf_options_); return BuildColumnFamilyOptions(initial_cf_options_, mutable_cf_options_);
} }
const double kSlowdownRatio = 1.2; const double kIncSlowdownRatio = 0.8;
const double kDecSlowdownRatio = 1 / kIncSlowdownRatio;
const double kNearStopSlowdownRatio = 0.6;
const double kDelayRecoverSlowdownRatio = 1.4;
namespace { namespace {
// If penalize_stop is true, we further reduce slowdown rate.
std::unique_ptr<WriteControllerToken> SetupDelay( std::unique_ptr<WriteControllerToken> SetupDelay(
WriteController* write_controller, WriteController* write_controller, uint64_t compaction_needed_bytes,
uint64_t compaction_needed_bytes, uint64_t prev_compaction_neeed_bytes, uint64_t prev_compaction_need_bytes, bool penalize_stop,
bool auto_comapctions_disabled) { bool auto_comapctions_disabled) {
const uint64_t kMinWriteRate = 1024u; // Minimum write rate 1KB/s. const uint64_t kMinWriteRate = 16 * 1024u; // Minimum write rate 16KB/s.
uint64_t max_write_rate = write_controller->max_delayed_write_rate(); uint64_t max_write_rate = write_controller->max_delayed_write_rate();
uint64_t write_rate = write_controller->delayed_write_rate(); uint64_t write_rate = write_controller->delayed_write_rate();
@ -524,19 +528,32 @@ std::unique_ptr<WriteControllerToken> SetupDelay(
// insert to mem tables, so we need to actively slow down before we get // insert to mem tables, so we need to actively slow down before we get
// feedback signal from compaction and flushes to avoid the full stop // feedback signal from compaction and flushes to avoid the full stop
// because of hitting the max write buffer number. // because of hitting the max write buffer number.
if (prev_compaction_neeed_bytes > 0 && //
prev_compaction_neeed_bytes <= compaction_needed_bytes) { // If DB just falled into the stop condition, we need to further reduce
write_rate = static_cast<uint64_t>(static_cast<double>(write_rate) / // the write rate to avoid the stop condition.
kSlowdownRatio); if (penalize_stop) {
// Penalize the near stop or stop condition by more agressive slowdown.
// This is to provide the long term slowdown increase signal.
// The penalty is more than the reward of recovering to the normal
// condition.
write_rate = static_cast<uint64_t>(static_cast<double>(write_rate) *
kNearStopSlowdownRatio);
if (write_rate < kMinWriteRate) { if (write_rate < kMinWriteRate) {
write_rate = kMinWriteRate; write_rate = kMinWriteRate;
} }
} else if (prev_compaction_neeed_bytes > compaction_needed_bytes) { } else if (prev_compaction_need_bytes > 0 &&
prev_compaction_need_bytes <= compaction_needed_bytes) {
write_rate = static_cast<uint64_t>(static_cast<double>(write_rate) *
kIncSlowdownRatio);
if (write_rate < kMinWriteRate) {
write_rate = kMinWriteRate;
}
} else if (prev_compaction_need_bytes > compaction_needed_bytes) {
// We are speeding up by ratio of kSlowdownRatio when we have paid // We are speeding up by ratio of kSlowdownRatio when we have paid
// compaction debt. But we'll never speed up to faster than the write rate // compaction debt. But we'll never speed up to faster than the write rate
// given by users. // given by users.
write_rate = static_cast<uint64_t>(static_cast<double>(write_rate) * write_rate = static_cast<uint64_t>(static_cast<double>(write_rate) *
kSlowdownRatio); kDecSlowdownRatio);
if (write_rate > max_write_rate) { if (write_rate > max_write_rate) {
write_rate = max_write_rate; write_rate = max_write_rate;
} }
@ -589,6 +606,9 @@ void ColumnFamilyData::RecalculateWriteStallConditions(
uint64_t compaction_needed_bytes = uint64_t compaction_needed_bytes =
vstorage->estimated_compaction_needed_bytes(); vstorage->estimated_compaction_needed_bytes();
bool was_stopped = write_controller->IsStopped();
bool needed_delay = write_controller->NeedsDelay();
if (imm()->NumNotFlushed() >= mutable_cf_options.max_write_buffer_number) { if (imm()->NumNotFlushed() >= mutable_cf_options.max_write_buffer_number) {
write_controller_token_ = write_controller->GetStopToken(); write_controller_token_ = write_controller->GetStopToken();
internal_stats_->AddCFStats(InternalStats::MEMTABLE_COMPACTION, 1); internal_stats_->AddCFStats(InternalStats::MEMTABLE_COMPACTION, 1);
@ -625,7 +645,7 @@ void ColumnFamilyData::RecalculateWriteStallConditions(
mutable_cf_options.max_write_buffer_number - 1) { mutable_cf_options.max_write_buffer_number - 1) {
write_controller_token_ = write_controller_token_ =
SetupDelay(write_controller, compaction_needed_bytes, SetupDelay(write_controller, compaction_needed_bytes,
prev_compaction_needed_bytes_, prev_compaction_needed_bytes_, was_stopped,
mutable_cf_options.disable_auto_compactions); mutable_cf_options.disable_auto_compactions);
internal_stats_->AddCFStats(InternalStats::MEMTABLE_SLOWDOWN, 1); internal_stats_->AddCFStats(InternalStats::MEMTABLE_SLOWDOWN, 1);
Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log, Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
@ -639,9 +659,12 @@ void ColumnFamilyData::RecalculateWriteStallConditions(
mutable_cf_options.level0_slowdown_writes_trigger >= 0 && mutable_cf_options.level0_slowdown_writes_trigger >= 0 &&
vstorage->l0_delay_trigger_count() >= vstorage->l0_delay_trigger_count() >=
mutable_cf_options.level0_slowdown_writes_trigger) { mutable_cf_options.level0_slowdown_writes_trigger) {
// L0 is the last two files from stopping.
bool near_stop = vstorage->l0_delay_trigger_count() >=
mutable_cf_options.level0_stop_writes_trigger - 2;
write_controller_token_ = write_controller_token_ =
SetupDelay(write_controller, compaction_needed_bytes, SetupDelay(write_controller, compaction_needed_bytes,
prev_compaction_needed_bytes_, prev_compaction_needed_bytes_, was_stopped || near_stop,
mutable_cf_options.disable_auto_compactions); mutable_cf_options.disable_auto_compactions);
internal_stats_->AddCFStats(InternalStats::LEVEL0_SLOWDOWN_TOTAL, 1); internal_stats_->AddCFStats(InternalStats::LEVEL0_SLOWDOWN_TOTAL, 1);
if (compaction_picker_->IsLevel0CompactionInProgress()) { if (compaction_picker_->IsLevel0CompactionInProgress()) {
@ -657,9 +680,20 @@ void ColumnFamilyData::RecalculateWriteStallConditions(
mutable_cf_options.soft_pending_compaction_bytes_limit > 0 && mutable_cf_options.soft_pending_compaction_bytes_limit > 0 &&
vstorage->estimated_compaction_needed_bytes() >= vstorage->estimated_compaction_needed_bytes() >=
mutable_cf_options.soft_pending_compaction_bytes_limit) { mutable_cf_options.soft_pending_compaction_bytes_limit) {
// If the distance to hard limit is less than 1/4 of the gap between soft
// and
// hard bytes limit, we think it is near stop and speed up the slowdown.
bool near_stop =
mutable_cf_options.hard_pending_compaction_bytes_limit > 0 &&
(compaction_needed_bytes -
mutable_cf_options.soft_pending_compaction_bytes_limit) >
3 * (mutable_cf_options.hard_pending_compaction_bytes_limit -
mutable_cf_options.soft_pending_compaction_bytes_limit) /
4;
write_controller_token_ = write_controller_token_ =
SetupDelay(write_controller, compaction_needed_bytes, SetupDelay(write_controller, compaction_needed_bytes,
prev_compaction_needed_bytes_, prev_compaction_needed_bytes_, was_stopped || near_stop,
mutable_cf_options.disable_auto_compactions); mutable_cf_options.disable_auto_compactions);
internal_stats_->AddCFStats( internal_stats_->AddCFStats(
InternalStats::SOFT_PENDING_COMPACTION_BYTES_LIMIT, 1); InternalStats::SOFT_PENDING_COMPACTION_BYTES_LIMIT, 1);
@ -668,31 +702,43 @@ void ColumnFamilyData::RecalculateWriteStallConditions(
"bytes %" PRIu64 " rate %" PRIu64, "bytes %" PRIu64 " rate %" PRIu64,
name_.c_str(), vstorage->estimated_compaction_needed_bytes(), name_.c_str(), vstorage->estimated_compaction_needed_bytes(),
write_controller->delayed_write_rate()); write_controller->delayed_write_rate());
} else if (vstorage->l0_delay_trigger_count() >= } else {
GetL0ThresholdSpeedupCompaction( if (vstorage->l0_delay_trigger_count() >=
mutable_cf_options.level0_file_num_compaction_trigger, GetL0ThresholdSpeedupCompaction(
mutable_cf_options.level0_slowdown_writes_trigger)) { mutable_cf_options.level0_file_num_compaction_trigger,
write_controller_token_ = write_controller->GetCompactionPressureToken(); mutable_cf_options.level0_slowdown_writes_trigger)) {
Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log, write_controller_token_ =
"[%s] Increasing compaction threads because we have %d level-0 " write_controller->GetCompactionPressureToken();
"files ",
name_.c_str(), vstorage->l0_delay_trigger_count());
} else if (vstorage->estimated_compaction_needed_bytes() >=
mutable_cf_options.soft_pending_compaction_bytes_limit / 4) {
// Increase compaction threads if bytes needed for compaction exceeds
// 1/4 of threshold for slowing down.
// If soft pending compaction byte limit is not set, always speed up
// compaction.
write_controller_token_ = write_controller->GetCompactionPressureToken();
if (mutable_cf_options.soft_pending_compaction_bytes_limit > 0) {
Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log, Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
"[%s] Increasing compaction threads because of estimated pending " "[%s] Increasing compaction threads because we have %d level-0 "
"compaction " "files ",
"bytes %" PRIu64, name_.c_str(), vstorage->l0_delay_trigger_count());
name_.c_str(), vstorage->estimated_compaction_needed_bytes()); } else if (vstorage->estimated_compaction_needed_bytes() >=
mutable_cf_options.soft_pending_compaction_bytes_limit / 4) {
// Increase compaction threads if bytes needed for compaction exceeds
// 1/4 of threshold for slowing down.
// If soft pending compaction byte limit is not set, always speed up
// compaction.
write_controller_token_ =
write_controller->GetCompactionPressureToken();
if (mutable_cf_options.soft_pending_compaction_bytes_limit > 0) {
Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
"[%s] Increasing compaction threads because of estimated pending "
"compaction "
"bytes %" PRIu64,
name_.c_str(), vstorage->estimated_compaction_needed_bytes());
}
} else {
write_controller_token_.reset();
}
// If the DB recovers from delay conditions, we reward with reducing
// double the slowdown ratio. This is to balance the long term slowdown
// increase signal.
if (needed_delay) {
uint64_t write_rate = write_controller->delayed_write_rate();
write_controller->set_delayed_write_rate(static_cast<uint64_t>(
static_cast<double>(write_rate) * kDelayRecoverSlowdownRatio));
} }
} else {
write_controller_token_.reset();
} }
prev_compaction_needed_bytes_ = compaction_needed_bytes; prev_compaction_needed_bytes_ = compaction_needed_bytes;
} }

@ -42,7 +42,7 @@ class LogBuffer;
class InstrumentedMutex; class InstrumentedMutex;
class InstrumentedMutexLock; class InstrumentedMutexLock;
extern const double kSlowdownRatio; extern const double kIncSlowdownRatio;
// ColumnFamilyHandleImpl is the class that clients use to access different // ColumnFamilyHandleImpl is the class that clients use to access different
// column families. It has non-trivial destructor, which gets called when client // column families. It has non-trivial destructor, which gets called when client

@ -2440,7 +2440,7 @@ TEST_F(ColumnFamilyTest, CreateAndDropRace) {
#endif // !ROCKSDB_LITE #endif // !ROCKSDB_LITE
TEST_F(ColumnFamilyTest, WriteStallSingleColumnFamily) { TEST_F(ColumnFamilyTest, WriteStallSingleColumnFamily) {
const uint64_t kBaseRate = 810000u; const uint64_t kBaseRate = 800000u;
db_options_.delayed_write_rate = kBaseRate; db_options_.delayed_write_rate = kBaseRate;
db_options_.base_background_compactions = 2; db_options_.base_background_compactions = 2;
db_options_.max_background_compactions = 6; db_options_.max_background_compactions = 6;
@ -2475,7 +2475,7 @@ TEST_F(ColumnFamilyTest, WriteStallSingleColumnFamily) {
cfd->RecalculateWriteStallConditions(mutable_cf_options); cfd->RecalculateWriteStallConditions(mutable_cf_options);
ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped()); ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
ASSERT_EQ(kBaseRate / 1.2, ASSERT_EQ(kBaseRate / 1.25,
dbfull()->TEST_write_controler().delayed_write_rate()); dbfull()->TEST_write_controler().delayed_write_rate());
ASSERT_EQ(6, dbfull()->TEST_BGCompactionsAllowed()); ASSERT_EQ(6, dbfull()->TEST_BGCompactionsAllowed());
@ -2483,14 +2483,14 @@ TEST_F(ColumnFamilyTest, WriteStallSingleColumnFamily) {
cfd->RecalculateWriteStallConditions(mutable_cf_options); cfd->RecalculateWriteStallConditions(mutable_cf_options);
ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped()); ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
ASSERT_EQ(kBaseRate / 1.2 / 1.2, ASSERT_EQ(kBaseRate / 1.25 / 1.25,
dbfull()->TEST_write_controler().delayed_write_rate()); dbfull()->TEST_write_controler().delayed_write_rate());
vstorage->TEST_set_estimated_compaction_needed_bytes(450); vstorage->TEST_set_estimated_compaction_needed_bytes(450);
cfd->RecalculateWriteStallConditions(mutable_cf_options); cfd->RecalculateWriteStallConditions(mutable_cf_options);
ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped()); ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
ASSERT_EQ(kBaseRate / 1.2, ASSERT_EQ(kBaseRate / 1.25,
dbfull()->TEST_write_controler().delayed_write_rate()); dbfull()->TEST_write_controler().delayed_write_rate());
vstorage->TEST_set_estimated_compaction_needed_bytes(205); vstorage->TEST_set_estimated_compaction_needed_bytes(205);
@ -2526,7 +2526,7 @@ TEST_F(ColumnFamilyTest, WriteStallSingleColumnFamily) {
cfd->RecalculateWriteStallConditions(mutable_cf_options); cfd->RecalculateWriteStallConditions(mutable_cf_options);
ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped()); ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
ASSERT_EQ(kBaseRate / 1.2, ASSERT_EQ(kBaseRate / 1.25,
dbfull()->TEST_write_controler().delayed_write_rate()); dbfull()->TEST_write_controler().delayed_write_rate());
vstorage->TEST_set_estimated_compaction_needed_bytes(2001); vstorage->TEST_set_estimated_compaction_needed_bytes(2001);
@ -2544,7 +2544,7 @@ TEST_F(ColumnFamilyTest, WriteStallSingleColumnFamily) {
cfd->RecalculateWriteStallConditions(mutable_cf_options); cfd->RecalculateWriteStallConditions(mutable_cf_options);
ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped()); ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
ASSERT_EQ(kBaseRate / 1.2, ASSERT_EQ(kBaseRate / 1.25,
dbfull()->TEST_write_controler().delayed_write_rate()); dbfull()->TEST_write_controler().delayed_write_rate());
vstorage->TEST_set_estimated_compaction_needed_bytes(100); vstorage->TEST_set_estimated_compaction_needed_bytes(100);
@ -2556,15 +2556,14 @@ TEST_F(ColumnFamilyTest, WriteStallSingleColumnFamily) {
cfd->RecalculateWriteStallConditions(mutable_cf_options); cfd->RecalculateWriteStallConditions(mutable_cf_options);
ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped()); ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
ASSERT_EQ(kBaseRate / 1.2, ASSERT_EQ(kBaseRate, dbfull()->TEST_write_controler().delayed_write_rate());
dbfull()->TEST_write_controler().delayed_write_rate());
ASSERT_EQ(6, dbfull()->TEST_BGCompactionsAllowed()); ASSERT_EQ(6, dbfull()->TEST_BGCompactionsAllowed());
vstorage->set_l0_delay_trigger_count(101); vstorage->set_l0_delay_trigger_count(101);
cfd->RecalculateWriteStallConditions(mutable_cf_options); cfd->RecalculateWriteStallConditions(mutable_cf_options);
ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped()); ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
ASSERT_EQ(kBaseRate / 1.2 / 1.2, ASSERT_EQ(kBaseRate / 1.25,
dbfull()->TEST_write_controler().delayed_write_rate()); dbfull()->TEST_write_controler().delayed_write_rate());
vstorage->set_l0_delay_trigger_count(0); vstorage->set_l0_delay_trigger_count(0);
@ -2572,21 +2571,21 @@ TEST_F(ColumnFamilyTest, WriteStallSingleColumnFamily) {
cfd->RecalculateWriteStallConditions(mutable_cf_options); cfd->RecalculateWriteStallConditions(mutable_cf_options);
ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped()); ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
ASSERT_EQ(kBaseRate / 1.2 / 1.2 / 1.2, ASSERT_EQ(kBaseRate / 1.25 / 1.25,
dbfull()->TEST_write_controler().delayed_write_rate()); dbfull()->TEST_write_controler().delayed_write_rate());
vstorage->set_l0_delay_trigger_count(101); vstorage->set_l0_delay_trigger_count(101);
cfd->RecalculateWriteStallConditions(mutable_cf_options); cfd->RecalculateWriteStallConditions(mutable_cf_options);
ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped()); ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
ASSERT_EQ(kBaseRate / 1.2 / 1.2 / 1.2 / 1.2, ASSERT_EQ(kBaseRate / 1.25 / 1.25 / 1.25,
dbfull()->TEST_write_controler().delayed_write_rate()); dbfull()->TEST_write_controler().delayed_write_rate());
vstorage->TEST_set_estimated_compaction_needed_bytes(200); vstorage->TEST_set_estimated_compaction_needed_bytes(200);
cfd->RecalculateWriteStallConditions(mutable_cf_options); cfd->RecalculateWriteStallConditions(mutable_cf_options);
ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped()); ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
ASSERT_EQ(kBaseRate / 1.2 / 1.2 / 1.2, ASSERT_EQ(kBaseRate / 1.25 / 1.25,
dbfull()->TEST_write_controler().delayed_write_rate()); dbfull()->TEST_write_controler().delayed_write_rate());
vstorage->set_l0_delay_trigger_count(0); vstorage->set_l0_delay_trigger_count(0);
@ -2627,7 +2626,7 @@ TEST_F(ColumnFamilyTest, WriteStallSingleColumnFamily) {
cfd->RecalculateWriteStallConditions(mutable_cf_options); cfd->RecalculateWriteStallConditions(mutable_cf_options);
ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped()); ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
ASSERT_EQ(kBaseRate / 1.2, ASSERT_EQ(kBaseRate / 1.25,
dbfull()->TEST_write_controler().delayed_write_rate()); dbfull()->TEST_write_controler().delayed_write_rate());
} }
@ -2744,35 +2743,35 @@ TEST_F(ColumnFamilyTest, WriteStallTwoColumnFamilies) {
cfd1->RecalculateWriteStallConditions(mutable_cf_options); cfd1->RecalculateWriteStallConditions(mutable_cf_options);
ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped()); ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
ASSERT_EQ(kBaseRate / 1.2, ASSERT_EQ(kBaseRate / 1.25,
dbfull()->TEST_write_controler().delayed_write_rate()); dbfull()->TEST_write_controler().delayed_write_rate());
vstorage->TEST_set_estimated_compaction_needed_bytes(300); vstorage->TEST_set_estimated_compaction_needed_bytes(300);
cfd->RecalculateWriteStallConditions(mutable_cf_options); cfd->RecalculateWriteStallConditions(mutable_cf_options);
ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped()); ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
ASSERT_EQ(kBaseRate / 1.2 / 1.2, ASSERT_EQ(kBaseRate / 1.25 / 1.25,
dbfull()->TEST_write_controler().delayed_write_rate()); dbfull()->TEST_write_controler().delayed_write_rate());
vstorage1->TEST_set_estimated_compaction_needed_bytes(700); vstorage1->TEST_set_estimated_compaction_needed_bytes(700);
cfd1->RecalculateWriteStallConditions(mutable_cf_options); cfd1->RecalculateWriteStallConditions(mutable_cf_options);
ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped()); ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
ASSERT_EQ(kBaseRate / 1.2, ASSERT_EQ(kBaseRate / 1.25,
dbfull()->TEST_write_controler().delayed_write_rate()); dbfull()->TEST_write_controler().delayed_write_rate());
vstorage->TEST_set_estimated_compaction_needed_bytes(500); vstorage->TEST_set_estimated_compaction_needed_bytes(500);
cfd->RecalculateWriteStallConditions(mutable_cf_options); cfd->RecalculateWriteStallConditions(mutable_cf_options);
ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped()); ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
ASSERT_EQ(kBaseRate / 1.2 / 1.2, ASSERT_EQ(kBaseRate / 1.25 / 1.25,
dbfull()->TEST_write_controler().delayed_write_rate()); dbfull()->TEST_write_controler().delayed_write_rate());
vstorage1->TEST_set_estimated_compaction_needed_bytes(600); vstorage1->TEST_set_estimated_compaction_needed_bytes(600);
cfd1->RecalculateWriteStallConditions(mutable_cf_options); cfd1->RecalculateWriteStallConditions(mutable_cf_options);
ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped()); ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
ASSERT_EQ(kBaseRate / 1.2, ASSERT_EQ(kBaseRate / 1.25,
dbfull()->TEST_write_controler().delayed_write_rate()); dbfull()->TEST_write_controler().delayed_write_rate());
} }

@ -5378,7 +5378,7 @@ TEST_F(DBTest, FlushesInParallelWithCompactRange) {
TEST_F(DBTest, DelayedWriteRate) { TEST_F(DBTest, DelayedWriteRate) {
const int kEntriesPerMemTable = 100; const int kEntriesPerMemTable = 100;
const int kTotalFlushes = 20; const int kTotalFlushes = 12;
Options options = CurrentOptions(); Options options = CurrentOptions();
env_->SetBackgroundThreads(1, Env::LOW); env_->SetBackgroundThreads(1, Env::LOW);
@ -5428,8 +5428,8 @@ TEST_F(DBTest, DelayedWriteRate) {
dbfull()->TEST_WaitForFlushMemTable(); dbfull()->TEST_WaitForFlushMemTable();
estimated_sleep_time += size_memtable * 1000000u / cur_rate; estimated_sleep_time += size_memtable * 1000000u / cur_rate;
// Slow down twice. One for memtable switch and one for flush finishes. // Slow down twice. One for memtable switch and one for flush finishes.
cur_rate = static_cast<uint64_t>(static_cast<double>(cur_rate) / cur_rate = static_cast<uint64_t>(static_cast<double>(cur_rate) *
kSlowdownRatio / kSlowdownRatio); kIncSlowdownRatio * kIncSlowdownRatio);
} }
// Estimate the total sleep time fall into the rough range. // Estimate the total sleep time fall into the rough range.
ASSERT_GT(env_->addon_time_.load(), ASSERT_GT(env_->addon_time_.load(),

@ -57,6 +57,8 @@ class WriteController {
// avoid divide 0 // avoid divide 0
if (write_rate == 0) { if (write_rate == 0) {
write_rate = 1u; write_rate = 1u;
} else if (write_rate > max_delayed_write_rate()) {
write_rate = max_delayed_write_rate();
} }
delayed_write_rate_ = write_rate; delayed_write_rate_ = write_rate;
} }

@ -21,7 +21,8 @@ class TimeSetEnv : public EnvWrapper {
TEST_F(WriteControllerTest, ChangeDelayRateTest) { TEST_F(WriteControllerTest, ChangeDelayRateTest) {
TimeSetEnv env; TimeSetEnv env;
WriteController controller(10000000u); WriteController controller(40000000u); // also set max delayed rate
controller.set_delayed_write_rate(10000000u);
auto delay_token_0 = auto delay_token_0 =
controller.GetDelayToken(controller.delayed_write_rate()); controller.GetDelayToken(controller.delayed_write_rate());
ASSERT_EQ(static_cast<uint64_t>(2000000), ASSERT_EQ(static_cast<uint64_t>(2000000),
@ -35,8 +36,9 @@ TEST_F(WriteControllerTest, ChangeDelayRateTest) {
auto delay_token_3 = controller.GetDelayToken(20000000u); auto delay_token_3 = controller.GetDelayToken(20000000u);
ASSERT_EQ(static_cast<uint64_t>(1000000), ASSERT_EQ(static_cast<uint64_t>(1000000),
controller.GetDelay(&env, 20000000u)); controller.GetDelay(&env, 20000000u));
// This is more than max rate. Max delayed rate will be used.
auto delay_token_4 = auto delay_token_4 =
controller.GetDelayToken(controller.delayed_write_rate() * 2); controller.GetDelayToken(controller.delayed_write_rate() * 3);
ASSERT_EQ(static_cast<uint64_t>(500000), ASSERT_EQ(static_cast<uint64_t>(500000),
controller.GetDelay(&env, 20000000u)); controller.GetDelay(&env, 20000000u));
} }

@ -50,7 +50,7 @@ ColumnFamilyOptions::ColumnFamilyOptions()
num_levels(7), num_levels(7),
level0_file_num_compaction_trigger(4), level0_file_num_compaction_trigger(4),
level0_slowdown_writes_trigger(20), level0_slowdown_writes_trigger(20),
level0_stop_writes_trigger(24), level0_stop_writes_trigger(32),
target_file_size_base(64 * 1048576), target_file_size_base(64 * 1048576),
target_file_size_multiplier(1), target_file_size_multiplier(1),
max_bytes_for_level_base(256 * 1048576), max_bytes_for_level_base(256 * 1048576),
@ -672,6 +672,9 @@ ColumnFamilyOptions* ColumnFamilyOptions::OldDefaults(
soft_pending_compaction_bytes_limit = 0; soft_pending_compaction_bytes_limit = 0;
hard_pending_compaction_bytes_limit = 0; hard_pending_compaction_bytes_limit = 0;
} }
if (rocksdb_major_version < 5) {
level0_stop_writes_trigger = 24;
}
compaction_pri = CompactionPri::kByCompensatedSize; compaction_pri = CompactionPri::kByCompensatedSize;
return this; return this;

Loading…
Cancel
Save