When slowdown is triggered, reduce the write rate

Summary: It's usually hard for users to pick a good value for options.delayed_write_rate. With this diff, once the slowdown condition is triggered, we greedily reduce the write rate if the estimated pending compaction bytes increase. If the estimated pending compaction bytes drop, we increase the write rate again.

Test Plan:
Add a unit test
Test with db_bench setting:
TEST_TMPDIR=/dev/shm/ ./db_bench --benchmarks=fillrandom -num=10000000 --soft_pending_compaction_bytes_limit=1000000000 --hard_pending_compaction_bytes_limit=3000000000 --delayed_write_rate=100000000

and make sure that without this commit a write stop happens, but with this commit it does not.

Reviewers: igor, anthony, rven, yhchiang, kradhakrishnan, IslamAbdelRahman

Reviewed By: IslamAbdelRahman

Subscribers: leveldb, dhruba

Differential Revision: https://reviews.facebook.net/D52131
main
sdong 9 years ago
parent 445d5b8c5c
commit b9f77ba12b
  1. 1
      HISTORY.md
  2. 93
      db/column_family.cc
  3. 20
      db/column_family.h
  4. 268
      db/column_family_test.cc
  5. 4
      db/db_impl.h
  6. 57
      db/db_test.cc
  7. 1
      db/version_set.cc
  8. 4
      db/version_set.h
  9. 12
      db/write_controller.cc
  10. 18
      db/write_controller.h
  11. 33
      db/write_controller_test.cc
  12. 2
      include/rocksdb/immutable_options.h
  13. 9
      include/rocksdb/options.h
  14. 1
      util/options.cc

@ -6,6 +6,7 @@
* Deprecate options.soft_rate_limit and add options.soft_pending_compaction_bytes_limit. * Deprecate options.soft_rate_limit and add options.soft_pending_compaction_bytes_limit.
* If options.max_write_buffer_number > 3, writes will be slowed down when writing to the last write buffer to delay a full stop. * If options.max_write_buffer_number > 3, writes will be slowed down when writing to the last write buffer to delay a full stop.
* Introduce CompactionJobInfo::compaction_reason, this field include the reason to trigger the compaction. * Introduce CompactionJobInfo::compaction_reason, this field include the reason to trigger the compaction.
* After slowdown is triggered, if estimated pending compaction bytes keep increasing, slow down writes further.
## 4.3.0 (12/8/2015) ## 4.3.0 (12/8/2015)
### New Features ### New Features

@ -315,7 +315,8 @@ ColumnFamilyData::ColumnFamilyData(
log_number_(0), log_number_(0),
column_family_set_(column_family_set), column_family_set_(column_family_set),
pending_flush_(false), pending_flush_(false),
pending_compaction_(false) { pending_compaction_(false),
prev_compaction_needed_bytes_(0) {
Ref(); Ref();
// Convert user defined table properties collector factories to internal ones. // Convert user defined table properties collector factories to internal ones.
@ -433,11 +434,64 @@ void ColumnFamilyData::SetDropped() {
column_family_set_->RemoveColumnFamily(this); column_family_set_->RemoveColumnFamily(this);
} }
// Factor by which SetupDelay() divides (to slow down) or multiplies (to speed
// up) the delayed write rate on each adjustment step.
const double kSlowdownRatio = 1.2;
namespace {
// Picks the write rate to use for a write delay and returns a delay token
// obtained from `write_controller` with that rate.
//
// max_write_rate: upper bound for the rate; also the rate used verbatim when
//     auto compactions are disabled.
// write_controller: its current delayed_write_rate() is the starting point of
//     the adjustment, and GetDelayToken() is called on it with the result.
// compaction_needed_bytes / prev_compaction_needed_bytes: current and previous
//     estimates of pending compaction bytes, compared to decide the direction
//     of the adjustment.
// auto_compactions_disabled: when true, no adjustment is made.
std::unique_ptr<WriteControllerToken> SetupDelay(
    uint64_t max_write_rate, WriteController* write_controller,
    uint64_t compaction_needed_bytes, uint64_t prev_compaction_needed_bytes,
    bool auto_compactions_disabled) {
  const uint64_t kMinWriteRate = 1024u;  // Minimum write rate 1KB/s.

  uint64_t write_rate = write_controller->delayed_write_rate();

  if (auto_compactions_disabled) {
    // When auto compaction is disabled, always use the value user gave.
    write_rate = max_write_rate;
  } else if (write_controller->NeedsDelay() && max_write_rate > kMinWriteRate) {
    // If user gives rate less than kMinWriteRate, don't adjust it.
    //
    // If already delayed, need to adjust based on previous compaction debt.
    // When two or more column families require a delay, we always increase or
    // reduce the write rate based on information for one single column
    // family. It is likely to be OK but we can improve if there is a problem.
    // Ignore the compaction_needed_bytes = 0 case because
    // compaction_needed_bytes is only available in level-based compaction.
    //
    // If the compaction debt stays the same as previously, we also further
    // slow down. It usually means a mem table is full. It's mainly for the
    // case where both flush and compaction are much slower than the speed we
    // insert to mem tables, so we need to actively slow down before we get a
    // feedback signal from compaction and flushes, to avoid the full stop
    // caused by hitting the max write buffer number.
    if (prev_compaction_needed_bytes > 0 &&
        prev_compaction_needed_bytes <= compaction_needed_bytes) {
      // Explicit casts: the scaled value is deliberately truncated back to an
      // integer rate (same result as the implicit conversion, no warning).
      write_rate = static_cast<uint64_t>(static_cast<double>(write_rate) /
                                         kSlowdownRatio);
      if (write_rate < kMinWriteRate) {
        write_rate = kMinWriteRate;
      }
    } else if (prev_compaction_needed_bytes > compaction_needed_bytes) {
      // We are speeding up by ratio of kSlowdownRatio when we have paid
      // compaction debt. But we'll never speed up to faster than the write
      // rate given by users.
      write_rate = static_cast<uint64_t>(static_cast<double>(write_rate) *
                                         kSlowdownRatio);
      if (write_rate > max_write_rate) {
        write_rate = max_write_rate;
      }
    }
  }
  return write_controller->GetDelayToken(write_rate);
}
}  // namespace
void ColumnFamilyData::RecalculateWriteStallConditions( void ColumnFamilyData::RecalculateWriteStallConditions(
const MutableCFOptions& mutable_cf_options) { const MutableCFOptions& mutable_cf_options) {
if (current_ != nullptr) { if (current_ != nullptr) {
auto* vstorage = current_->storage_info(); auto* vstorage = current_->storage_info();
auto write_controller = column_family_set_->write_controller_; auto write_controller = column_family_set_->write_controller_;
uint64_t compaction_needed_bytes =
vstorage->estimated_compaction_needed_bytes();
if (imm()->NumNotFlushed() >= mutable_cf_options.max_write_buffer_number) { if (imm()->NumNotFlushed() >= mutable_cf_options.max_write_buffer_number) {
write_controller_token_ = write_controller->GetStopToken(); write_controller_token_ = write_controller->GetStopToken();
@ -450,13 +504,18 @@ void ColumnFamilyData::RecalculateWriteStallConditions(
} else if (mutable_cf_options.max_write_buffer_number > 3 && } else if (mutable_cf_options.max_write_buffer_number > 3 &&
imm()->NumNotFlushed() >= imm()->NumNotFlushed() >=
mutable_cf_options.max_write_buffer_number - 1) { mutable_cf_options.max_write_buffer_number - 1) {
write_controller_token_ = write_controller->GetDelayToken(); write_controller_token_ =
SetupDelay(ioptions_.delayed_write_rate, write_controller,
compaction_needed_bytes, prev_compaction_needed_bytes_,
mutable_cf_options.disable_auto_compactions);
internal_stats_->AddCFStats(InternalStats::MEMTABLE_SLOWDOWN, 1); internal_stats_->AddCFStats(InternalStats::MEMTABLE_SLOWDOWN, 1);
Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log, Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
"[%s] Stalling writes because we have %d immutable memtables " "[%s] Stalling writes because we have %d immutable memtables "
"(waiting for flush), max_write_buffer_number is set to %d", "(waiting for flush), max_write_buffer_number is set to %d "
"rate %" PRIu64,
name_.c_str(), imm()->NumNotFlushed(), name_.c_str(), imm()->NumNotFlushed(),
mutable_cf_options.max_write_buffer_number); mutable_cf_options.max_write_buffer_number,
write_controller->delayed_write_rate());
} else if (vstorage->l0_delay_trigger_count() >= } else if (vstorage->l0_delay_trigger_count() >=
mutable_cf_options.level0_stop_writes_trigger) { mutable_cf_options.level0_stop_writes_trigger) {
write_controller_token_ = write_controller->GetStopToken(); write_controller_token_ = write_controller->GetStopToken();
@ -469,7 +528,7 @@ void ColumnFamilyData::RecalculateWriteStallConditions(
"[%s] Stopping writes because we have %d level-0 files", "[%s] Stopping writes because we have %d level-0 files",
name_.c_str(), vstorage->l0_delay_trigger_count()); name_.c_str(), vstorage->l0_delay_trigger_count());
} else if (mutable_cf_options.hard_pending_compaction_bytes_limit > 0 && } else if (mutable_cf_options.hard_pending_compaction_bytes_limit > 0 &&
vstorage->estimated_compaction_needed_bytes() >= compaction_needed_bytes >=
mutable_cf_options.hard_pending_compaction_bytes_limit) { mutable_cf_options.hard_pending_compaction_bytes_limit) {
write_controller_token_ = write_controller->GetStopToken(); write_controller_token_ = write_controller->GetStopToken();
internal_stats_->AddCFStats( internal_stats_->AddCFStats(
@ -477,32 +536,42 @@ void ColumnFamilyData::RecalculateWriteStallConditions(
Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log, Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
"[%s] Stopping writes because of estimated pending compaction " "[%s] Stopping writes because of estimated pending compaction "
"bytes %" PRIu64, "bytes %" PRIu64,
name_.c_str(), vstorage->estimated_compaction_needed_bytes()); name_.c_str(), compaction_needed_bytes);
} else if (mutable_cf_options.level0_slowdown_writes_trigger >= 0 && } else if (mutable_cf_options.level0_slowdown_writes_trigger >= 0 &&
vstorage->l0_delay_trigger_count() >= vstorage->l0_delay_trigger_count() >=
mutable_cf_options.level0_slowdown_writes_trigger) { mutable_cf_options.level0_slowdown_writes_trigger) {
write_controller_token_ = write_controller->GetDelayToken(); write_controller_token_ =
SetupDelay(ioptions_.delayed_write_rate, write_controller,
compaction_needed_bytes, prev_compaction_needed_bytes_,
mutable_cf_options.disable_auto_compactions);
internal_stats_->AddCFStats(InternalStats::LEVEL0_SLOWDOWN_TOTAL, 1); internal_stats_->AddCFStats(InternalStats::LEVEL0_SLOWDOWN_TOTAL, 1);
if (compaction_picker_->IsLevel0CompactionInProgress()) { if (compaction_picker_->IsLevel0CompactionInProgress()) {
internal_stats_->AddCFStats( internal_stats_->AddCFStats(
InternalStats::LEVEL0_SLOWDOWN_WITH_COMPACTION, 1); InternalStats::LEVEL0_SLOWDOWN_WITH_COMPACTION, 1);
} }
Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log, Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
"[%s] Stalling writes because we have %d level-0 files", "[%s] Stalling writes because we have %d level-0 files "
name_.c_str(), vstorage->l0_delay_trigger_count()); "rate %" PRIu64,
name_.c_str(), vstorage->l0_delay_trigger_count(),
write_controller->delayed_write_rate());
} else if (mutable_cf_options.soft_pending_compaction_bytes_limit > 0 && } else if (mutable_cf_options.soft_pending_compaction_bytes_limit > 0 &&
vstorage->estimated_compaction_needed_bytes() >= vstorage->estimated_compaction_needed_bytes() >=
mutable_cf_options.soft_pending_compaction_bytes_limit) { mutable_cf_options.soft_pending_compaction_bytes_limit) {
write_controller_token_ = write_controller->GetDelayToken(); write_controller_token_ =
SetupDelay(ioptions_.delayed_write_rate, write_controller,
compaction_needed_bytes, prev_compaction_needed_bytes_,
mutable_cf_options.disable_auto_compactions);
internal_stats_->AddCFStats( internal_stats_->AddCFStats(
InternalStats::SOFT_PENDING_COMPACTION_BYTES_LIMIT, 1); InternalStats::SOFT_PENDING_COMPACTION_BYTES_LIMIT, 1);
Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log, Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
"[%s] Stalling writes because of estimated pending compaction " "[%s] Stalling writes because of estimated pending compaction "
"bytes %" PRIu64, "bytes %" PRIu64 " rate %" PRIu64,
name_.c_str(), vstorage->estimated_compaction_needed_bytes()); name_.c_str(), vstorage->estimated_compaction_needed_bytes(),
write_controller->delayed_write_rate());
} else { } else {
write_controller_token_.reset(); write_controller_token_.reset();
} }
prev_compaction_needed_bytes_ = compaction_needed_bytes;
} }
} }

@ -44,6 +44,8 @@ class LogBuffer;
class InstrumentedMutex; class InstrumentedMutex;
class InstrumentedMutexLock; class InstrumentedMutexLock;
extern const double kSlowdownRatio;
// ColumnFamilyHandleImpl is the class that clients use to access different // ColumnFamilyHandleImpl is the class that clients use to access different
// column families. It has non-trivial destructor, which gets called when client // column families. It has non-trivial destructor, which gets called when client
// is done using the column family // is done using the column family
@ -305,6 +307,14 @@ class ColumnFamilyData {
bool pending_flush() { return pending_flush_; } bool pending_flush() { return pending_flush_; }
bool pending_compaction() { return pending_compaction_; } bool pending_compaction() { return pending_compaction_; }
// Recalculates the write stall conditions, which change only during
// compaction, when a new memtable is added, and/or when the compaction
// score is recalculated. The resulting values are used by
// DBImpl::MakeRoomForWrite to decide whether a write stall is needed.
void RecalculateWriteStallConditions(
const MutableCFOptions& mutable_cf_options);
private: private:
friend class ColumnFamilySet; friend class ColumnFamilySet;
ColumnFamilyData(uint32_t id, const std::string& name, ColumnFamilyData(uint32_t id, const std::string& name,
@ -314,14 +324,6 @@ class ColumnFamilyData {
const DBOptions* db_options, const EnvOptions& env_options, const DBOptions* db_options, const EnvOptions& env_options,
ColumnFamilySet* column_family_set); ColumnFamilySet* column_family_set);
// Recalculate some small conditions, which are changed only during
// compaction, adding new memtable and/or
// recalculation of compaction score. These values are used in
// DBImpl::MakeRoomForWrite function to decide, if it need to make
// a write stall
void RecalculateWriteStallConditions(
const MutableCFOptions& mutable_cf_options);
uint32_t id_; uint32_t id_;
const std::string name_; const std::string name_;
Version* dummy_versions_; // Head of circular doubly-linked list of versions. Version* dummy_versions_; // Head of circular doubly-linked list of versions.
@ -382,6 +384,8 @@ class ColumnFamilyData {
// If true --> this ColumnFamily is currently present in // If true --> this ColumnFamily is currently present in
// DBImpl::compaction_queue_ // DBImpl::compaction_queue_
bool pending_compaction_; bool pending_compaction_;
uint64_t prev_compaction_needed_bytes_;
}; };
// ColumnFamilySet has interesting thread-safety requirements // ColumnFamilySet has interesting thread-safety requirements

@ -2127,6 +2127,274 @@ TEST_F(ColumnFamilyTest, CreateAndDropRace) {
} }
#endif // !ROCKSDB_LITE #endif // !ROCKSDB_LITE
// Drives RecalculateWriteStallConditions() on the default column family
// through a scripted sequence of estimated-pending-compaction-bytes values and
// L0 delay trigger counts, and after every step checks the write controller's
// stopped/delayed state and, when delayed, its delayed write rate.
// The steps are stateful: each expectation depends on all previous steps.
TEST_F(ColumnFamilyTest, WriteStallSingleColumnFamily) {
const uint64_t kBaseRate = 810000u;
db_options_.delayed_write_rate = kBaseRate;
Open({"default"});
ColumnFamilyData* cfd =
static_cast<ColumnFamilyHandleImpl*>(db_->DefaultColumnFamily())->cfd();
VersionStorageInfo* vstorage = cfd->current()->storage_info();
MutableCFOptions mutable_cf_options(
Options(db_options_, column_family_options_),
ImmutableCFOptions(Options(db_options_, column_family_options_)));
// Thresholds used by every step below: soft limit 200, hard limit 2000,
// L0 slowdown at 20 files, L0 stop effectively out of reach.
mutable_cf_options.level0_slowdown_writes_trigger = 20;
mutable_cf_options.level0_stop_writes_trigger = 10000;
mutable_cf_options.soft_pending_compaction_bytes_limit = 200;
mutable_cf_options.hard_pending_compaction_bytes_limit = 2000;
// Below the soft limit: neither stopped nor delayed.
vstorage->TEST_set_estimated_compaction_needed_bytes(50);
cfd->RecalculateWriteStallConditions(mutable_cf_options);
ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay());
// Crossing the soft limit starts a delay at the configured base rate.
vstorage->TEST_set_estimated_compaction_needed_bytes(201);
cfd->RecalculateWriteStallConditions(mutable_cf_options);
ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
ASSERT_EQ(kBaseRate, dbfull()->TEST_write_controler().delayed_write_rate());
// Estimate grows while delayed: rate is reduced by one 1.2 step.
vstorage->TEST_set_estimated_compaction_needed_bytes(400);
cfd->RecalculateWriteStallConditions(mutable_cf_options);
ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
ASSERT_EQ(kBaseRate / 1.2,
dbfull()->TEST_write_controler().delayed_write_rate());
// Grows again: a second 1.2 step down.
vstorage->TEST_set_estimated_compaction_needed_bytes(500);
cfd->RecalculateWriteStallConditions(mutable_cf_options);
ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
ASSERT_EQ(kBaseRate / 1.2 / 1.2,
dbfull()->TEST_write_controler().delayed_write_rate());
// Estimate drops: rate recovers by one step.
vstorage->TEST_set_estimated_compaction_needed_bytes(450);
cfd->RecalculateWriteStallConditions(mutable_cf_options);
ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
ASSERT_EQ(kBaseRate / 1.2,
dbfull()->TEST_write_controler().delayed_write_rate());
// Drops again: rate is back at the base rate.
vstorage->TEST_set_estimated_compaction_needed_bytes(205);
cfd->RecalculateWriteStallConditions(mutable_cf_options);
ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
ASSERT_EQ(kBaseRate, dbfull()->TEST_write_controler().delayed_write_rate());
// Further drops must not push the rate above the base rate.
vstorage->TEST_set_estimated_compaction_needed_bytes(202);
cfd->RecalculateWriteStallConditions(mutable_cf_options);
ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
ASSERT_EQ(kBaseRate, dbfull()->TEST_write_controler().delayed_write_rate());
vstorage->TEST_set_estimated_compaction_needed_bytes(201);
cfd->RecalculateWriteStallConditions(mutable_cf_options);
ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
ASSERT_EQ(kBaseRate, dbfull()->TEST_write_controler().delayed_write_rate());
// Back under the soft limit: the delay is lifted.
vstorage->TEST_set_estimated_compaction_needed_bytes(198);
cfd->RecalculateWriteStallConditions(mutable_cf_options);
ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay());
// A fresh delay starts from the rate the controller currently holds
// (the base rate here), without an immediate reduction.
vstorage->TEST_set_estimated_compaction_needed_bytes(399);
cfd->RecalculateWriteStallConditions(mutable_cf_options);
ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
ASSERT_EQ(kBaseRate, dbfull()->TEST_write_controler().delayed_write_rate());
vstorage->TEST_set_estimated_compaction_needed_bytes(599);
cfd->RecalculateWriteStallConditions(mutable_cf_options);
ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
ASSERT_EQ(kBaseRate / 1.2,
dbfull()->TEST_write_controler().delayed_write_rate());
// At or above the hard limit: writes are stopped rather than delayed.
vstorage->TEST_set_estimated_compaction_needed_bytes(2001);
cfd->RecalculateWriteStallConditions(mutable_cf_options);
ASSERT_TRUE(dbfull()->TEST_write_controler().IsStopped());
ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay());
vstorage->TEST_set_estimated_compaction_needed_bytes(3001);
cfd->RecalculateWriteStallConditions(mutable_cf_options);
ASSERT_TRUE(dbfull()->TEST_write_controler().IsStopped());
ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay());
// Leaving the stop for the delay range: the rate reached before the stop
// (kBaseRate / 1.2) is expected to still be in effect.
vstorage->TEST_set_estimated_compaction_needed_bytes(390);
cfd->RecalculateWriteStallConditions(mutable_cf_options);
ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
ASSERT_EQ(kBaseRate / 1.2,
dbfull()->TEST_write_controler().delayed_write_rate());
vstorage->TEST_set_estimated_compaction_needed_bytes(100);
cfd->RecalculateWriteStallConditions(mutable_cf_options);
ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay());
// L0 count at or above level0_slowdown_writes_trigger also causes a delay;
// the previously reached rate is kept when the delay starts.
vstorage->set_l0_delay_trigger_count(100);
cfd->RecalculateWriteStallConditions(mutable_cf_options);
ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
ASSERT_EQ(kBaseRate / 1.2,
dbfull()->TEST_write_controler().delayed_write_rate());
// Still delayed with an unchanged byte estimate: expects a further step down.
vstorage->set_l0_delay_trigger_count(101);
cfd->RecalculateWriteStallConditions(mutable_cf_options);
ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
ASSERT_EQ(kBaseRate / 1.2 / 1.2,
dbfull()->TEST_write_controler().delayed_write_rate());
// Delay reason switches from L0 count to the soft byte limit; the estimate
// grew (100 -> 300), so the rate steps down again.
vstorage->set_l0_delay_trigger_count(0);
vstorage->TEST_set_estimated_compaction_needed_bytes(300);
cfd->RecalculateWriteStallConditions(mutable_cf_options);
ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
ASSERT_EQ(kBaseRate / 1.2 / 1.2 / 1.2,
dbfull()->TEST_write_controler().delayed_write_rate());
// Byte estimate unchanged at 300 while still delayed: another step down.
vstorage->set_l0_delay_trigger_count(101);
cfd->RecalculateWriteStallConditions(mutable_cf_options);
ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
ASSERT_EQ(kBaseRate / 1.2 / 1.2 / 1.2 / 1.2,
dbfull()->TEST_write_controler().delayed_write_rate());
// Estimate drops (300 -> 200) while the L0 delay persists: one step up.
vstorage->TEST_set_estimated_compaction_needed_bytes(200);
cfd->RecalculateWriteStallConditions(mutable_cf_options);
ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
ASSERT_EQ(kBaseRate / 1.2 / 1.2 / 1.2,
dbfull()->TEST_write_controler().delayed_write_rate());
// Clear both delay causes.
vstorage->set_l0_delay_trigger_count(0);
vstorage->TEST_set_estimated_compaction_needed_bytes(0);
cfd->RecalculateWriteStallConditions(mutable_cf_options);
ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay());
// With auto compactions disabled, the rate is expected to stay at the base
// rate no matter how the L0 count or byte estimate change.
mutable_cf_options.disable_auto_compactions = true;
dbfull()->TEST_write_controler().set_delayed_write_rate(kBaseRate);
cfd->RecalculateWriteStallConditions(mutable_cf_options);
ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay());
vstorage->set_l0_delay_trigger_count(50);
cfd->RecalculateWriteStallConditions(mutable_cf_options);
ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
ASSERT_EQ(kBaseRate, dbfull()->TEST_write_controler().delayed_write_rate());
vstorage->set_l0_delay_trigger_count(60);
vstorage->TEST_set_estimated_compaction_needed_bytes(300);
cfd->RecalculateWriteStallConditions(mutable_cf_options);
ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
ASSERT_EQ(kBaseRate, dbfull()->TEST_write_controler().delayed_write_rate());
vstorage->set_l0_delay_trigger_count(70);
vstorage->TEST_set_estimated_compaction_needed_bytes(500);
cfd->RecalculateWriteStallConditions(mutable_cf_options);
ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
ASSERT_EQ(kBaseRate, dbfull()->TEST_write_controler().delayed_write_rate());
// Re-enabling auto compactions restores the adaptive behavior: a growing
// estimate (500 -> 501) reduces the rate by one step.
mutable_cf_options.disable_auto_compactions = false;
vstorage->set_l0_delay_trigger_count(71);
vstorage->TEST_set_estimated_compaction_needed_bytes(501);
cfd->RecalculateWriteStallConditions(mutable_cf_options);
ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
ASSERT_EQ(kBaseRate / 1.2,
dbfull()->TEST_write_controler().delayed_write_rate());
Close();
}
// Exercises RecalculateWriteStallConditions() with two column families
// ("default" via cfd/vstorage and "one" via cfd1/vstorage1) and checks the
// write controller's state and delayed write rate after each recalculation.
// The steps are stateful: each expectation depends on all previous steps.
TEST_F(ColumnFamilyTest, WriteStallTwoColumnFamilies) {
const uint64_t kBaseRate = 810000u;
db_options_.delayed_write_rate = kBaseRate;
Open();
CreateColumnFamilies({"one"});
ColumnFamilyData* cfd =
static_cast<ColumnFamilyHandleImpl*>(db_->DefaultColumnFamily())->cfd();
VersionStorageInfo* vstorage = cfd->current()->storage_info();
ColumnFamilyData* cfd1 =
static_cast<ColumnFamilyHandleImpl*>(handles_[1])->cfd();
VersionStorageInfo* vstorage1 = cfd1->current()->storage_info();
MutableCFOptions mutable_cf_options(
Options(db_options_, column_family_options_),
ImmutableCFOptions(Options(db_options_, column_family_options_)));
mutable_cf_options.level0_slowdown_writes_trigger = 20;
mutable_cf_options.level0_stop_writes_trigger = 10000;
mutable_cf_options.soft_pending_compaction_bytes_limit = 200;
mutable_cf_options.hard_pending_compaction_bytes_limit = 2000;
// NOTE(review): mutable_cf_options1 is set up here but never passed to any
// call below; every recalculation uses mutable_cf_options. Confirm whether
// the cfd1 calls were meant to use it.
MutableCFOptions mutable_cf_options1 = mutable_cf_options;
mutable_cf_options1.soft_pending_compaction_bytes_limit = 500;
// Default column family below the soft limit: no stall.
vstorage->TEST_set_estimated_compaction_needed_bytes(50);
cfd->RecalculateWriteStallConditions(mutable_cf_options);
ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay());
// The estimate of "one" is raised, but only the default column family is
// recalculated, so no delay is expected yet.
// NOTE(review): presumably intentional; confirm cfd (not cfd1) is meant.
vstorage1->TEST_set_estimated_compaction_needed_bytes(201);
cfd->RecalculateWriteStallConditions(mutable_cf_options);
ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay());
// "one" is recalculated above the soft limit: delay starts at the base rate.
vstorage1->TEST_set_estimated_compaction_needed_bytes(600);
cfd1->RecalculateWriteStallConditions(mutable_cf_options);
ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
ASSERT_EQ(kBaseRate, dbfull()->TEST_write_controler().delayed_write_rate());
// Default column family stays under its soft limit: the delay requested by
// "one" remains and the rate is unchanged.
vstorage->TEST_set_estimated_compaction_needed_bytes(70);
cfd->RecalculateWriteStallConditions(mutable_cf_options);
ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
ASSERT_EQ(kBaseRate, dbfull()->TEST_write_controler().delayed_write_rate());
// Estimate of "one" grows (600 -> 800): one step down.
vstorage1->TEST_set_estimated_compaction_needed_bytes(800);
cfd1->RecalculateWriteStallConditions(mutable_cf_options);
ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
ASSERT_EQ(kBaseRate / 1.2,
dbfull()->TEST_write_controler().delayed_write_rate());
// Default column family now also exceeds the soft limit with a growing
// estimate (70 -> 300): the shared rate steps down again.
vstorage->TEST_set_estimated_compaction_needed_bytes(300);
cfd->RecalculateWriteStallConditions(mutable_cf_options);
ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
ASSERT_EQ(kBaseRate / 1.2 / 1.2,
dbfull()->TEST_write_controler().delayed_write_rate());
// Estimate of "one" shrinks (800 -> 700): one step up.
vstorage1->TEST_set_estimated_compaction_needed_bytes(700);
cfd1->RecalculateWriteStallConditions(mutable_cf_options);
ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
ASSERT_EQ(kBaseRate / 1.2,
dbfull()->TEST_write_controler().delayed_write_rate());
// Estimate of the default column family grows (300 -> 500): one step down.
vstorage->TEST_set_estimated_compaction_needed_bytes(500);
cfd->RecalculateWriteStallConditions(mutable_cf_options);
ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
ASSERT_EQ(kBaseRate / 1.2 / 1.2,
dbfull()->TEST_write_controler().delayed_write_rate());
// Estimate of "one" shrinks (700 -> 600): one step up.
vstorage1->TEST_set_estimated_compaction_needed_bytes(600);
cfd1->RecalculateWriteStallConditions(mutable_cf_options);
ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
ASSERT_EQ(kBaseRate / 1.2,
dbfull()->TEST_write_controler().delayed_write_rate());
}
} // namespace rocksdb } // namespace rocksdb
int main(int argc, char** argv) { int main(int argc, char** argv) {

@ -341,9 +341,7 @@ class DBImpl : public DB {
Cache* TEST_table_cache() { return table_cache_.get(); } Cache* TEST_table_cache() { return table_cache_.get(); }
const WriteController& TEST_write_controler() const { WriteController& TEST_write_controler() { return write_controller_; }
return write_controller_;
}
#endif // NDEBUG #endif // NDEBUG

@ -9042,47 +9042,68 @@ TEST_F(DBTest, FlushesInParallelWithCompactRange) {
} }
TEST_F(DBTest, DelayedWriteRate) { TEST_F(DBTest, DelayedWriteRate) {
const int kEntriesPerMemTable = 100;
const int kTotalFlushes = 20;
Options options; Options options;
env_->SetBackgroundThreads(1, Env::LOW);
options.env = env_; options.env = env_;
env_->no_sleep_ = true; env_->no_sleep_ = true;
options = CurrentOptions(options); options = CurrentOptions(options);
options.write_buffer_size = 100000; // Small write buffer options.write_buffer_size = 100000000;
options.max_write_buffer_number = 256; options.max_write_buffer_number = 256;
options.disable_auto_compactions = true; options.max_background_compactions = 1;
options.level0_file_num_compaction_trigger = 3; options.level0_file_num_compaction_trigger = 3;
options.level0_slowdown_writes_trigger = 3; options.level0_slowdown_writes_trigger = 3;
options.level0_stop_writes_trigger = 999999; options.level0_stop_writes_trigger = 999999;
options.delayed_write_rate = 200000; // About 200KB/s limited rate options.delayed_write_rate = 20000000; // Start with 20MB/s
options.memtable_factory.reset(
new SpecialSkipListFactory(kEntriesPerMemTable));
CreateAndReopenWithCF({"pikachu"}, options); CreateAndReopenWithCF({"pikachu"}, options);
// Block compactions
test::SleepingBackgroundTask sleeping_task_low;
env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
Env::Priority::LOW);
for (int i = 0; i < 3; i++) { for (int i = 0; i < 3; i++) {
Put(Key(i), std::string(10000, 'x')); Put(Key(i), std::string(10000, 'x'));
Flush(); Flush();
} }
// These writes will be slowed down to 1KB/s // These writes will be slowed down to 1KB/s
size_t estimated_total_size = 0; uint64_t estimated_sleep_time = 0;
Random rnd(301); Random rnd(301);
for (int i = 0; i < 3000; i++) { Put("", "");
auto rand_num = rnd.Uniform(20); uint64_t cur_rate = options.delayed_write_rate;
// Spread the size range to more. for (int i = 0; i < kTotalFlushes; i++) {
size_t entry_size = rand_num * rand_num * rand_num; uint64_t size_memtable = 0;
WriteOptions wo; for (int j = 0; j < kEntriesPerMemTable; j++) {
Put(Key(i), std::string(entry_size, 'x'), wo); auto rand_num = rnd.Uniform(20);
estimated_total_size += entry_size + 20; // Spread the size range to more.
// Occasionally sleep a while size_t entry_size = rand_num * rand_num * rand_num;
if (rnd.Uniform(20) == 6) { WriteOptions wo;
env_->SleepForMicroseconds(2666); Put(Key(i), std::string(entry_size, 'x'), wo);
size_memtable += entry_size + 18;
// Occasionally sleep a while
if (rnd.Uniform(20) == 6) {
env_->SleepForMicroseconds(2666);
}
} }
dbfull()->TEST_WaitForFlushMemTable();
estimated_sleep_time += size_memtable * 1000000u / cur_rate;
// Slow down twice. One for memtable switch and one for flush finishes.
cur_rate /= kSlowdownRatio * kSlowdownRatio;
} }
uint64_t estimated_sleep_time = // Estimate the total sleep time fall into the rough range.
estimated_total_size / options.delayed_write_rate * 1000000U; ASSERT_GT(env_->addon_time_.load(), estimated_sleep_time / 2);
ASSERT_GT(env_->addon_time_.load(), estimated_sleep_time * 0.8); ASSERT_LT(env_->addon_time_.load(), estimated_sleep_time * 2);
ASSERT_LT(env_->addon_time_.load(), estimated_sleep_time * 1.1);
env_->no_sleep_ = false; env_->no_sleep_ = false;
rocksdb::SyncPoint::GetInstance()->DisableProcessing(); rocksdb::SyncPoint::GetInstance()->DisableProcessing();
sleeping_task_low.WakeUp();
sleeping_task_low.WaitUntilDone();
} }
TEST_F(DBTest, HardLimit) { TEST_F(DBTest, HardLimit) {

@ -1119,6 +1119,7 @@ void VersionStorageInfo::EstimateCompactionBytesNeeded(
const MutableCFOptions& mutable_cf_options) { const MutableCFOptions& mutable_cf_options) {
// Only implemented for level-based compaction // Only implemented for level-based compaction
if (compaction_style_ != kCompactionStyleLevel) { if (compaction_style_ != kCompactionStyleLevel) {
estimated_compaction_needed_bytes_ = 0;
return; return;
} }

@ -325,6 +325,10 @@ class VersionStorageInfo {
return estimated_compaction_needed_bytes_; return estimated_compaction_needed_bytes_;
} }
void TEST_set_estimated_compaction_needed_bytes(uint64_t v) {
estimated_compaction_needed_bytes_ = v;
}
private: private:
const InternalKeyComparator* internal_comparator_; const InternalKeyComparator* internal_comparator_;
const Comparator* user_comparator_; const Comparator* user_comparator_;

@ -16,11 +16,13 @@ std::unique_ptr<WriteControllerToken> WriteController::GetStopToken() {
return std::unique_ptr<WriteControllerToken>(new StopWriteToken(this)); return std::unique_ptr<WriteControllerToken>(new StopWriteToken(this));
} }
std::unique_ptr<WriteControllerToken> WriteController::GetDelayToken() { std::unique_ptr<WriteControllerToken> WriteController::GetDelayToken(
if (total_delayed_++ == 0) { uint64_t write_rate) {
last_refill_time_ = 0; total_delayed_++;
bytes_left_ = 0; // Reset counters.
} last_refill_time_ = 0;
bytes_left_ = 0;
set_delayed_write_rate(write_rate);
return std::unique_ptr<WriteControllerToken>(new DelayWriteToken(this)); return std::unique_ptr<WriteControllerToken>(new DelayWriteToken(this));
} }

@ -20,12 +20,12 @@ class WriteControllerToken;
// to be called while holding DB mutex // to be called while holding DB mutex
class WriteController { class WriteController {
public: public:
explicit WriteController(uint64_t delayed_write_rate = 1024u * 1024u * 32u) explicit WriteController(uint64_t _delayed_write_rate = 1024u * 1024u * 32u)
: total_stopped_(0), : total_stopped_(0),
total_delayed_(0), total_delayed_(0),
bytes_left_(0), bytes_left_(0),
last_refill_time_(0) { last_refill_time_(0) {
set_delayed_write_rate(delayed_write_rate); set_delayed_write_rate(_delayed_write_rate);
} }
~WriteController() = default; ~WriteController() = default;
@ -36,7 +36,8 @@ class WriteController {
// writes to the DB will be controlled under the delayed write rate. Every // writes to the DB will be controlled under the delayed write rate. Every
// write needs to call GetDelay() with number of bytes writing to the DB, // write needs to call GetDelay() with number of bytes writing to the DB,
// which returns number of microseconds to sleep. // which returns number of microseconds to sleep.
std::unique_ptr<WriteControllerToken> GetDelayToken(); std::unique_ptr<WriteControllerToken> GetDelayToken(
uint64_t delayed_write_rate);
// these two methods are querying the state of the WriteController // these two methods are querying the state of the WriteController
bool IsStopped() const; bool IsStopped() const;
@ -45,13 +46,14 @@ class WriteController {
// num_bytes: the number of bytes to put into the DB. // num_bytes: the number of bytes to put into the DB.
// Prerequisite: DB mutex held. // Prerequisite: DB mutex held.
uint64_t GetDelay(Env* env, uint64_t num_bytes); uint64_t GetDelay(Env* env, uint64_t num_bytes);
void set_delayed_write_rate(uint64_t delayed_write_rate) { void set_delayed_write_rate(uint64_t write_rate) {
delayed_write_rate_ = delayed_write_rate; // avoid divide 0
if (delayed_write_rate_ == 0) { if (write_rate == 0) {
// avoid divide 0 write_rate = 1u;
delayed_write_rate_ = 1U;
} }
delayed_write_rate_ = write_rate;
} }
uint64_t delayed_write_rate() const { return delayed_write_rate_; }
private: private:
friend class WriteControllerToken; friend class WriteControllerToken;

@ -19,6 +19,28 @@ class TimeSetEnv : public EnvWrapper {
virtual uint64_t NowMicros() override { return now_micros_; } virtual uint64_t NowMicros() override { return now_micros_; }
}; };
TEST_F(WriteControllerTest, ChangeDelayRateTest) {
TimeSetEnv env;
WriteController controller(10000000u);
auto delay_token_0 =
controller.GetDelayToken(controller.delayed_write_rate());
ASSERT_EQ(static_cast<uint64_t>(2000000),
controller.GetDelay(&env, 20000000u));
auto delay_token_1 = controller.GetDelayToken(2000000u);
ASSERT_EQ(static_cast<uint64_t>(10000000),
controller.GetDelay(&env, 20000000u));
auto delay_token_2 = controller.GetDelayToken(1000000u);
ASSERT_EQ(static_cast<uint64_t>(20000000),
controller.GetDelay(&env, 20000000u));
auto delay_token_3 = controller.GetDelayToken(20000000u);
ASSERT_EQ(static_cast<uint64_t>(1000000),
controller.GetDelay(&env, 20000000u));
auto delay_token_4 =
controller.GetDelayToken(controller.delayed_write_rate() * 2);
ASSERT_EQ(static_cast<uint64_t>(500000),
controller.GetDelay(&env, 20000000u));
}
TEST_F(WriteControllerTest, SanityTest) { TEST_F(WriteControllerTest, SanityTest) {
WriteController controller(10000000u); WriteController controller(10000000u);
auto stop_token_1 = controller.GetStopToken(); auto stop_token_1 = controller.GetStopToken();
@ -32,12 +54,19 @@ TEST_F(WriteControllerTest, SanityTest) {
TimeSetEnv env; TimeSetEnv env;
auto delay_token_1 = controller.GetDelayToken(); auto delay_token_1 = controller.GetDelayToken(10000000u);
ASSERT_EQ(static_cast<uint64_t>(2000000),
controller.GetDelay(&env, 20000000u));
env.now_micros_ += 1999900u; // sleep debt 1000
auto delay_token_2 = controller.GetDelayToken(10000000u);
// Rate reset after changing the token.
ASSERT_EQ(static_cast<uint64_t>(2000000), ASSERT_EQ(static_cast<uint64_t>(2000000),
controller.GetDelay(&env, 20000000u)); controller.GetDelay(&env, 20000000u));
env.now_micros_ += 1999900u; // sleep debt 1000 env.now_micros_ += 1999900u; // sleep debt 1000
auto delay_token_2 = controller.GetDelayToken();
// One refill: 10240 bytes allowed, 1000 used, 9240 left // One refill: 10240 bytes allowed, 1000 used, 9240 left
ASSERT_EQ(static_cast<uint64_t>(1124), controller.GetDelay(&env, 1000u)); ASSERT_EQ(static_cast<uint64_t>(1124), controller.GetDelay(&env, 1000u));
env.now_micros_ += 1124u; // sleep debt 0 env.now_micros_ += 1124u; // sleep debt 0

@ -50,6 +50,8 @@ struct ImmutableCFOptions {
Env* env; Env* env;
uint64_t delayed_write_rate;
// Allow the OS to mmap file for reading sst tables. Default: false // Allow the OS to mmap file for reading sst tables. Default: false
bool allow_mmap_reads; bool allow_mmap_reads;

@ -1162,9 +1162,12 @@ struct DBOptions {
// Default: false // Default: false
bool enable_thread_tracking; bool enable_thread_tracking;
// The limited write rate to DB if soft_rate_limit or // The limited write rate to DB if soft_pending_compaction_bytes_limit or
// level0_slowdown_writes_trigger is triggered. It is calculated using // level0_slowdown_writes_trigger is triggered, or we are writing to the
// size of user write requests before compression. // last mem table allowed and we allow more than 3 mem tables. It is
// calculated using size of user write requests before compression.
// RocksDB may decide to slow down more if the compaction still
// gets behind further.
// Unit: byte per second. // Unit: byte per second.
// //
// Default: 1MB/s // Default: 1MB/s

@ -51,6 +51,7 @@ ImmutableCFOptions::ImmutableCFOptions(const Options& options)
info_log(options.info_log.get()), info_log(options.info_log.get()),
statistics(options.statistics.get()), statistics(options.statistics.get()),
env(options.env), env(options.env),
delayed_write_rate(options.delayed_write_rate),
allow_mmap_reads(options.allow_mmap_reads), allow_mmap_reads(options.allow_mmap_reads),
allow_mmap_writes(options.allow_mmap_writes), allow_mmap_writes(options.allow_mmap_writes),
db_paths(options.db_paths), db_paths(options.db_paths),

Loading…
Cancel
Save