Slow down writes by bytes written

Summary:
With this patch, writes into the database are slowed down to the rate of options.delayed_write_rate (a new option).

The thread synchronization approach I take is to keep the write controller synchronized by the DB mutex, so GetDelay() runs inside the DB mutex, and to minimize how often GetDelay() needs to read the current time. I verified the behavior through db_bench and it seems to work.

hard_rate_limit is deprecated.

options.delayed_write_rate is not yet dynamically changeable; that needs to be addressed in a follow-up.
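For reference, a minimal usage sketch of the new option (illustrative only, not part of the patch; the option and API names are real, but the values and DB path are made up):

    #include "rocksdb/db.h"
    #include "rocksdb/options.h"

    int main() {
      rocksdb::Options options;
      options.create_if_missing = true;
      // New in this patch: cap on the write rate into the DB (bytes/second)
      // once a slowdown condition is hit. Default is 1MB/s.
      options.delayed_write_rate = 2 * 1024 * 1024;  // 2MB/s
      // Existing slowdown triggers that now switch writes to the limited rate.
      options.level0_slowdown_writes_trigger = 8;
      options.soft_rate_limit = 1.1;

      rocksdb::DB* db;
      rocksdb::Status s =
          rocksdb::DB::Open(options, "/tmp/delayed_write_example", &db);
      if (s.ok()) {
        s = db->Put(rocksdb::WriteOptions(), "key", "value");
        delete db;
      }
      return s.ok() ? 0 : 1;
    }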

Test Plan: Add new unit tests in db_test
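For intuition about the delay math the new tests exercise, here is a small standalone sketch (not code from the patch); the 1024-microsecond refill interval matches the patch, and the 10MB/s rate and 20,000,000-byte request match the new write controller sanity test:

    #include <cstdint>
    #include <iostream>

    int main() {
      const uint64_t kMicrosPerSecond = 1000000;
      const uint64_t kRefillInterval = 1024;         // microseconds
      const uint64_t delayed_write_rate = 10000000;  // 10MB/s

      // Bytes granted per refill interval: 10240.
      uint64_t single_refill_amount =
          delayed_write_rate * kRefillInterval / kMicrosPerSecond;

      // A 20,000,000-byte batch group at 10MB/s needs ~2 seconds, so the
      // controller asks the writer to sleep ~2,000,000 microseconds.
      uint64_t big_batch_bytes = 20000000;
      uint64_t sleep_micros =
          big_batch_bytes * kMicrosPerSecond / delayed_write_rate;

      std::cout << single_refill_amount << " bytes per refill, "
                << sleep_micros << " us sleep\n";
      return 0;
    }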

Reviewers: yhchiang, rven, kradhakrishnan, anthony, MarkCallaghan, igor

Reviewed By: igor

Subscribers: ikabiljo, leveldb, dhruba

Differential Revision: https://reviews.facebook.net/D36351
Branch: main
Author: sdong (9 years ago)
Parent: a84df655f3
Commit: 7842920be5
16 changed files (lines changed):

  1. HISTORY.md (2)
  2. db/column_family.cc (67)
  3. db/db_bench.cc (5)
  4. db/db_impl.cc (38)
  5. db/db_impl.h (10)
  6. db/db_test.cc (279)
  7. db/memtable_list_test.cc (2)
  8. db/version_set.cc (4)
  9. db/write_controller.cc (86)
  10. db/write_controller.h (41)
  11. db/write_controller_test.cc (73)
  12. db/write_thread.cc (8)
  13. db/write_thread.h (5)
  14. include/rocksdb/options.h (19)
  15. util/ldb_cmd.cc (4)
  16. util/options.cc (6)

@@ -12,6 +12,8 @@
 * Change default value for options.compaction_filter_factory and options.compaction_filter_factory_v2 to nullptr instead of DefaultCompactionFilterFactory and DefaultCompactionFilterFactoryV2.
 * If CancelAllBackgroundWork is called without doing a flush after doing loads with WAL disabled, the changes which haven't been flushed before the call to CancelAllBackgroundWork will be lost.
 * WBWIIterator::Entry() now returns WriteEntry instead of `const WriteEntry&`
+* options.hard_rate_limit is deprecated.
+* When options.soft_rate_limit or options.level0_slowdown_writes_trigger is triggered, the way to slow down writes is changed to: write rate to DB is limited to options.delayed_write_rate.
 ## 3.11.0 (5/19/2015)
 ### New Features

@@ -36,36 +36,6 @@
 namespace rocksdb {
 
-namespace {
-// This function computes the amount of time in microseconds by which a write
-// should be delayed based on the number of level-0 files according to the
-// following formula:
-// if n < bottom, return 0;
-// if n >= top, return 1000;
-// otherwise, let r = (n - bottom) /
-//                    (top - bottom)
-// and return r^2 * 1000.
-// The goal of this formula is to gradually increase the rate at which writes
-// are slowed. We also tried linear delay (r * 1000), but it seemed to do
-// slightly worse. There is no other particular reason for choosing quadratic.
-uint64_t SlowdownAmount(int n, double bottom, double top) {
-  uint64_t delay;
-  if (n >= top) {
-    delay = 1000;
-  } else if (n < bottom) {
-    delay = 0;
-  } else {
-    // If we are here, we know that:
-    //   level0_start_slowdown <= n < level0_slowdown
-    // since the previous two conditions are false.
-    double how_much = static_cast<double>(n - bottom) / (top - bottom);
-    delay = std::max(how_much * how_much * 1000, 100.0);
-  }
-  assert(delay <= 1000);
-  return delay;
-}
-}  // namespace
-
 ColumnFamilyHandleImpl::ColumnFamilyHandleImpl(
     ColumnFamilyData* column_family_data, DBImpl* db, InstrumentedMutex* mutex)
     : cfd_(column_family_data), db_(db), mutex_(mutex) {
@@ -147,9 +117,6 @@ ColumnFamilyOptions SanitizeOptions(const DBOptions& db_options,
   if (result.max_mem_compaction_level >= result.num_levels) {
     result.max_mem_compaction_level = result.num_levels - 1;
   }
-  if (result.soft_rate_limit > result.hard_rate_limit) {
-    result.soft_rate_limit = result.hard_rate_limit;
-  }
   if (result.max_write_buffer_number < 2) {
     result.max_write_buffer_number = 2;
   }
@@ -456,7 +423,6 @@ void ColumnFamilyData::RecalculateWriteStallConditions(
   auto* vstorage = current_->storage_info();
   const double score = vstorage->max_compaction_score();
   const int max_level = vstorage->max_compaction_score_level();
-
   auto write_controller = column_family_set_->write_controller_;
 
   if (imm()->NumNotFlushed() >= mutable_cf_options.max_write_buffer_number) {
@@ -477,37 +443,18 @@ void ColumnFamilyData::RecalculateWriteStallConditions(
   } else if (mutable_cf_options.level0_slowdown_writes_trigger >= 0 &&
              vstorage->l0_delay_trigger_count() >=
                  mutable_cf_options.level0_slowdown_writes_trigger) {
-    uint64_t slowdown =
-        SlowdownAmount(vstorage->l0_delay_trigger_count(),
-                       mutable_cf_options.level0_slowdown_writes_trigger,
-                       mutable_cf_options.level0_stop_writes_trigger);
-    write_controller_token_ = write_controller->GetDelayToken(slowdown);
-    internal_stats_->AddCFStats(InternalStats::LEVEL0_SLOWDOWN, slowdown);
+    write_controller_token_ = write_controller->GetDelayToken();
+    internal_stats_->AddCFStats(InternalStats::LEVEL0_SLOWDOWN, 1);
     Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
-        "[%s] Stalling writes because we have %d level-0 files (%" PRIu64
-        "us)",
-        name_.c_str(), vstorage->l0_delay_trigger_count(), slowdown);
-  } else if (mutable_cf_options.hard_rate_limit > 1.0 &&
-             score > mutable_cf_options.hard_rate_limit) {
-    uint64_t kHardLimitSlowdown = 1000;
-    write_controller_token_ =
-        write_controller->GetDelayToken(kHardLimitSlowdown);
-    internal_stats_->RecordLevelNSlowdown(max_level, false);
-    Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
-        "[%s] Stalling writes because we hit hard limit on level %d. "
-        "(%" PRIu64 "us)",
-        name_.c_str(), max_level, kHardLimitSlowdown);
+        "[%s] Stalling writes because we have %d level-0 files",
+        name_.c_str(), vstorage->l0_delay_trigger_count());
   } else if (mutable_cf_options.soft_rate_limit > 0.0 &&
              score > mutable_cf_options.soft_rate_limit) {
-    uint64_t slowdown = SlowdownAmount(score,
-                                       mutable_cf_options.soft_rate_limit,
-                                       mutable_cf_options.hard_rate_limit);
-    write_controller_token_ = write_controller->GetDelayToken(slowdown);
+    write_controller_token_ = write_controller->GetDelayToken();
     internal_stats_->RecordLevelNSlowdown(max_level, true);
     Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
-        "[%s] Stalling writes because we hit soft limit on level %d (%" PRIu64
-        "us)",
-        name_.c_str(), max_level, slowdown);
+        "[%s] Stalling writes because we hit soft limit on level %d",
+        name_.c_str(), max_level);
   } else {
     write_controller_token_.reset();
   }

@@ -543,6 +543,10 @@ DEFINE_double(hard_rate_limit, 0.0, "When not equal to 0 this make threads "
              "sleep at each stats reporting interval until the compaction"
              " score for all levels is less than or equal to this value.");
 
+DEFINE_uint64(delayed_write_rate, 2097152u,
+              "Limited bytes allowed to DB when soft_rate_limit or "
+              "level0_slowdown_writes_trigger triggers");
+
 DEFINE_int32(rate_limit_delay_max_milliseconds, 1000,
              "When hard_rate_limit is set then this is the max time a put will"
              " be stalled.");
@@ -2270,6 +2274,7 @@ class Benchmark {
     }
     options.soft_rate_limit = FLAGS_soft_rate_limit;
     options.hard_rate_limit = FLAGS_hard_rate_limit;
+    options.delayed_write_rate = FLAGS_delayed_write_rate;
    options.rate_limit_delay_max_milliseconds =
        FLAGS_rate_limit_delay_max_milliseconds;
    options.table_cache_numshardbits = FLAGS_table_cache_numshardbits;

@@ -225,6 +225,8 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname)
       max_total_in_memory_state_(0),
       is_snapshot_supported_(true),
       write_buffer_(options.db_write_buffer_size),
+      write_controller_(options.delayed_write_rate),
+      last_batch_group_size_(0),
       unscheduled_flushes_(0),
       unscheduled_compactions_(0),
       bg_compaction_scheduled_(0),
@@ -1691,6 +1693,7 @@ Status DBImpl::SetOptions(ColumnFamilyHandle* column_family,
     Log(InfoLogLevel::WARN_LEVEL, db_options_.info_log,
         "[%s] SetOptions failed", cfd->GetName().c_str());
   }
+  LogFlush(db_options_.info_log);
   return s;
 #endif  // ROCKSDB_LITE
 }
@@ -3409,8 +3412,8 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
       status = ScheduleFlushes(&context);
     }
 
-    if (UNLIKELY(status.ok() && (write_controller_.IsStopped() ||
-                                 write_controller_.GetDelay() > 0))) {
+    if (UNLIKELY(status.ok()) &&
+        (write_controller_.IsStopped() || write_controller_.NeedsDelay())) {
       // If writer is stopped, we need to get it going,
       // so schedule flushes/compactions
       if (context.schedule_bg_work_) {
@@ -3418,7 +3421,11 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
       }
       PERF_TIMER_STOP(write_pre_and_post_process_time);
       PERF_TIMER_GUARD(write_delay_time);
-      status = DelayWrite(expiration_time);
+      // We don't know the size of the current batch, so we always use the
+      // size of the previous one. It might create a fairness issue in that
+      // expiration might happen for smaller writes while larger writes can
+      // go through. Can optimize it if it becomes an issue.
+      status = DelayWrite(last_batch_group_size_, expiration_time);
       PERF_TIMER_START(write_pre_and_post_process_time);
     }
@@ -3432,7 +3439,8 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
   autovector<WriteBatch*> write_batch_group;
 
   if (status.ok()) {
-    write_thread_.BuildBatchGroup(&last_writer, &write_batch_group);
+    last_batch_group_size_ =
+        write_thread_.BuildBatchGroup(&last_writer, &write_batch_group);
 
     // Add to log and apply to memtable. We can release the lock
    // during this phase since &w is currently responsible for logging
@@ -3566,24 +3574,36 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
 
 // REQUIRES: mutex_ is held
 // REQUIRES: this thread is currently at the front of the writer queue
-Status DBImpl::DelayWrite(uint64_t expiration_time) {
+Status DBImpl::DelayWrite(uint64_t num_bytes, uint64_t expiration_time) {
   uint64_t time_delayed = 0;
   bool delayed = false;
   bool timed_out = false;
   {
     StopWatch sw(env_, stats_, WRITE_STALL, &time_delayed);
     bool has_timeout = (expiration_time > 0);
-    auto delay = write_controller_.GetDelay();
-    if (write_controller_.IsStopped() == false && delay > 0) {
+    auto delay = write_controller_.GetDelay(env_, num_bytes);
+    if (delay > 0) {
       mutex_.Unlock();
       delayed = true;
       // hopefully we don't have to sleep more than 2 billion microseconds
       TEST_SYNC_POINT("DBImpl::DelayWrite:Sleep");
-      env_->SleepForMicroseconds(static_cast<int>(delay));
+      if (has_timeout) {
+        auto time_now = env_->NowMicros();
+        if (time_now + delay >= expiration_time) {
+          if (expiration_time > time_now) {
+            env_->SleepForMicroseconds(
+                static_cast<int>(expiration_time - time_now));
+          }
+          timed_out = true;
+        }
+      }
+      if (!timed_out) {
+        env_->SleepForMicroseconds(static_cast<int>(delay));
+      }
       mutex_.Lock();
     }
 
-    while (bg_error_.ok() && write_controller_.IsStopped()) {
+    while (!timed_out && bg_error_.ok() && write_controller_.IsStopped()) {
       delayed = true;
       if (has_timeout) {
         TEST_SYNC_POINT("DBImpl::DelayWrite:TimedWait");

@@ -433,7 +433,10 @@ class DBImpl : public DB {
   // concurrent flush memtables to storage.
   Status WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
                                      MemTable* mem, VersionEdit* edit);
-  Status DelayWrite(uint64_t expiration_time);
+
+  // num_bytes: for slowdown case, delay time is calculated based on
+  // `num_bytes` going through.
+  Status DelayWrite(uint64_t num_bytes, uint64_t expiration_time);
 
   Status ScheduleFlushes(WriteContext* context);
@@ -582,6 +585,11 @@ class DBImpl : public DB {
   WriteBatch tmp_batch_;
 
   WriteController write_controller_;
+
+  // Size of the last batch group. In slowdown mode, next write needs to
+  // sleep if it uses up the quota.
+  uint64_t last_batch_group_size_;
+
   FlushScheduler flush_scheduler_;
 
   SnapshotList snapshots_;

@@ -154,9 +154,11 @@ class SpecialEnv : public EnvWrapper {
   std::function<void()>* table_write_callback_;
 
-  int64_t addon_time_;
+  std::atomic<int64_t> addon_time_;
+  bool no_sleep_;
 
-  explicit SpecialEnv(Env* base) : EnvWrapper(base), rnd_(301), addon_time_(0) {
+  explicit SpecialEnv(Env* base)
+      : EnvWrapper(base), rnd_(301), addon_time_(0), no_sleep_(false) {
     delay_sstable_sync_.store(false, std::memory_order_release);
     drop_writes_.store(false, std::memory_order_release);
     no_space_.store(false, std::memory_order_release);
@@ -358,19 +360,27 @@ class SpecialEnv : public EnvWrapper {
 
   virtual void SleepForMicroseconds(int micros) override {
     sleep_counter_.Increment();
-    target()->SleepForMicroseconds(micros);
+    if (no_sleep_) {
+      addon_time_.fetch_add(micros);
+    } else {
+      target()->SleepForMicroseconds(micros);
+    }
   }
 
   virtual Status GetCurrentTime(int64_t* unix_time) override {
     Status s = target()->GetCurrentTime(unix_time);
     if (s.ok()) {
-      *unix_time += addon_time_;
+      *unix_time += addon_time_.load();
     }
     return s;
   }
 
   virtual uint64_t NowNanos() override {
-    return target()->NowNanos() + addon_time_ * 1000;
+    return target()->NowNanos() + addon_time_.load() * 1000;
+  }
+
+  virtual uint64_t NowMicros() override {
+    return target()->NowMicros() + addon_time_.load();
   }
 };
@@ -639,8 +649,8 @@ class DBTest : public testing::Test {
             !options.purge_redundant_kvs_while_flush;
         break;
       case kPerfOptions:
-        options.hard_rate_limit = 2.0;
-        options.rate_limit_delay_max_milliseconds = 2;
+        options.soft_rate_limit = 2.0;
+        options.delayed_write_rate = 8 * 1024 * 1024;
         // TODO -- test more options
         break;
      case kDeletesFilterFirst:
@@ -4134,7 +4144,7 @@ class DelayFilter : public CompactionFilter {
   virtual bool Filter(int level, const Slice& key, const Slice& value,
                       std::string* new_value,
                       bool* value_changed) const override {
-    db_test->env_->addon_time_ += 1000;
+    db_test->env_->addon_time_.fetch_add(1000);
     return true;
   }
@@ -6704,7 +6714,7 @@ TEST_F(DBTest, Snapshot) {
     Put(0, "foo", "0v2");
     Put(1, "foo", "1v2");
 
-    env_->addon_time_++;
+    env_->addon_time_.fetch_add(1);
 
    const Snapshot* s2 = db_->GetSnapshot();
    ASSERT_EQ(2U, GetNumSnapshots());
@@ -11200,7 +11210,7 @@ TEST_F(DBTest, DynamicLevelMaxBytesBase) {
    options.level_compaction_dynamic_level_bytes = true;
    options.max_bytes_for_level_base = 10240;
    options.max_bytes_for_level_multiplier = 4;
-    options.hard_rate_limit = 1.1;
+    options.soft_rate_limit = 1.1;
    options.max_background_compactions = max_background_compactions;
    options.num_levels = 5;
@@ -11493,7 +11503,7 @@ TEST_F(DBTest, DynamicLevelMaxBytesBaseInc) {
  options.level_compaction_dynamic_level_bytes = true;
  options.max_bytes_for_level_base = 10240;
  options.max_bytes_for_level_multiplier = 4;
-  options.hard_rate_limit = 1.1;
+  options.soft_rate_limit = 1.1;
  options.max_background_compactions = 2;
  options.num_levels = 5;
@@ -11545,7 +11555,7 @@ TEST_F(DBTest, MigrateToDynamicLevelMaxBytesBase) {
  options.level_compaction_dynamic_level_bytes = false;
  options.max_bytes_for_level_base = 10240;
  options.max_bytes_for_level_multiplier = 4;
-  options.hard_rate_limit = 1.1;
+  options.soft_rate_limit = 1.1;
  options.num_levels = 8;
 
  DestroyAndReopen(options);
@@ -11743,7 +11753,7 @@ TEST_F(DBTest, DynamicLevelCompressionPerLevel2) {
  options.level0_file_num_compaction_trigger = 2;
  options.level0_slowdown_writes_trigger = 2;
  options.level0_stop_writes_trigger = 2;
-  options.hard_rate_limit = 1.1;
+  options.soft_rate_limit = 1.1;
 
  // Use file size to distinguish levels
  // L1: 10, L2: 20, L3 40, L4 80
@@ -11853,7 +11863,7 @@ TEST_F(DBTest, DynamicCompactionOptions) {
  options.env = env_;
  options.create_if_missing = true;
  options.compression = kNoCompression;
-  options.hard_rate_limit = 1.1;
+  options.soft_rate_limit = 1.1;
  options.write_buffer_size = k64KB;
  options.max_write_buffer_number = 2;
  // Compaction related options
@@ -12042,73 +12052,6 @@ TEST_F(DBTest, DynamicCompactionOptions) {
   dbfull()->TEST_WaitForCompact();
   ASSERT_LT(NumTableFilesAtLevel(0), 4);
 
-  // Test for hard_rate_limit.
-  // First change max_bytes_for_level_base to a big value and populate
-  // L1 - L3. Then thrink max_bytes_for_level_base and disable auto compaction
-  // at the same time, we should see some level with score greater than 2.
-  ASSERT_OK(dbfull()->SetOptions({
-    {"max_bytes_for_level_base", ToString(k1MB) }
-  }));
-  // writing 40 x 64KB = 10 x 256KB
-  // (L1 + L2 + L3) = (1 + 2 + 4) * 256KB
-  for (int i = 0; i < 40; ++i) {
-    gen_l0_kb(i, 64, 32);
-  }
-  dbfull()->TEST_WaitForCompact();
-  ASSERT_TRUE((SizeAtLevel(1) > k1MB * 0.8 &&
-               SizeAtLevel(1) < k1MB * 1.2) ||
-              (SizeAtLevel(2) > 2 * k1MB * 0.8 &&
-               SizeAtLevel(2) < 2 * k1MB * 1.2) ||
-              (SizeAtLevel(3) > 4 * k1MB * 0.8 &&
-               SizeAtLevel(3) < 4 * k1MB * 1.2));
-
-  // Reduce max_bytes_for_level_base and disable compaction at the same time
-  // This should cause score to increase
-  ASSERT_OK(dbfull()->SetOptions({
-    {"disable_auto_compactions", "true"},
-    {"max_bytes_for_level_base", "65536"},
-  }));
-  ASSERT_OK(Put(Key(count), RandomString(&rnd, 1024)));
-  dbfull()->TEST_FlushMemTable(true);
-
-  // Check score is above 2
-  ASSERT_TRUE(SizeAtLevel(1) / k64KB > 2 ||
-              SizeAtLevel(2) / k64KB > 4 ||
-              SizeAtLevel(3) / k64KB > 8);
-
-  // Enfoce hard rate limit. Now set hard_rate_limit to 2,
-  // we should start to see put delay (1000 us) and timeout as a result
-  // (L0 score is not regulated by this limit).
-  ASSERT_OK(dbfull()->SetOptions({
-    {"hard_rate_limit", "2"},
-    {"level0_slowdown_writes_trigger", "18"},
-    {"level0_stop_writes_trigger", "20"}
-  }));
-  ASSERT_OK(Put(Key(count), RandomString(&rnd, 1024)));
-  dbfull()->TEST_FlushMemTable(true);
-
-  std::atomic<int> sleep_count(0);
-  rocksdb::SyncPoint::GetInstance()->SetCallBack(
-      "DBImpl::DelayWrite:Sleep", [&](void* arg) { sleep_count.fetch_add(1); });
-  rocksdb::SyncPoint::GetInstance()->EnableProcessing();
-
-  // Hard rate limit slow down for 1000 us, so default 10ms should be ok
-  ASSERT_OK(Put(Key(count), RandomString(&rnd, 1024), wo));
-  sleep_count.store(0);
-  ASSERT_OK(Put(Key(count), RandomString(&rnd, 1024), wo));
-  ASSERT_GT(sleep_count.load(), 0);
-
-  // Lift the limit and no timeout
-  ASSERT_OK(dbfull()->SetOptions({
-    {"hard_rate_limit", "200"},
-  }));
-  dbfull()->TEST_FlushMemTable(true);
-  sleep_count.store(0);
-  ASSERT_OK(Put(Key(count), RandomString(&rnd, 1024), wo));
-  // Technically, time out is still possible for timing issue.
-  ASSERT_EQ(sleep_count.load(), 0);
-  rocksdb::SyncPoint::GetInstance()->DisableProcessing();
-
   // Test max_mem_compaction_level.
   // Destroy DB and start from scratch
   options.max_background_compactions = 1;
@@ -12784,7 +12727,7 @@ class DelayedMergeOperator : public AssociativeMergeOperator {
  virtual bool Merge(const Slice& key, const Slice* existing_value,
                     const Slice& value, std::string* new_value,
                     Logger* logger) const override {
-    db_test_->env_->addon_time_ += 1000;
+    db_test_->env_->addon_time_.fetch_add(1000);
    return true;
  }
@@ -12799,7 +12742,7 @@ TEST_F(DBTest, MergeTestTime) {
  // Enable time profiling
  SetPerfLevel(kEnableTime);
-  this->env_->addon_time_ = 0;
+  this->env_->addon_time_.store(0);
  Options options;
  options = CurrentOptions(options);
  options.statistics = rocksdb::CreateDBStatistics();
@@ -13070,7 +13013,7 @@ TEST_F(DBTest, TablePropertiesNeedCompactTest) {
  options.target_file_size_base = 2048;
  options.max_bytes_for_level_base = 10240;
  options.max_bytes_for_level_multiplier = 4;
-  options.hard_rate_limit = 1.1;
+  options.soft_rate_limit = 1.1;
  options.num_levels = 8;
 
  std::shared_ptr<TablePropertiesCollectorFactory> collector_factory(
@@ -13496,6 +13439,174 @@ TEST_F(DBTest, SuggestCompactRangeNoTwoLevel0Compactions) {
   TEST_SYNC_POINT("DBTest::SuggestCompactRangeNoTwoLevel0Compactions:2");
 }
 
+TEST_F(DBTest, DelayedWriteRate) {
+  Options options;
+  options.env = env_;
+  env_->no_sleep_ = true;
+  options = CurrentOptions(options);
+  options.write_buffer_size = 100000;  // Small write buffer
+  options.max_write_buffer_number = 256;
+  options.disable_auto_compactions = true;
+  options.level0_file_num_compaction_trigger = 3;
+  options.level0_slowdown_writes_trigger = 3;
+  options.level0_stop_writes_trigger = 999999;
+  options.delayed_write_rate = 200000;  // About 200KB/s limited rate
+
+  CreateAndReopenWithCF({"pikachu"}, options);
+
+  for (int i = 0; i < 3; i++) {
+    Put(Key(i), std::string(10000, 'x'));
+    Flush();
+  }
+
+  // These writes will be slowed down to 1KB/s
+  size_t estimated_total_size = 0;
+  Random rnd(301);
+  for (int i = 0; i < 3000; i++) {
+    auto rand_num = rnd.Uniform(20);
+    // Spread the size range to more.
+    size_t entry_size = rand_num * rand_num * rand_num;
+    WriteOptions wo;
+    // For a small chance, set a timeout.
+    if (rnd.Uniform(20) == 6) {
+      wo.timeout_hint_us = 1500;
+    }
+    Put(Key(i), std::string(entry_size, 'x'), wo);
+    estimated_total_size += entry_size + 20;
+    // Occasionally sleep a while
+    if (rnd.Uniform(20) == 6) {
+      env_->SleepForMicroseconds(2666);
+    }
+  }
+  uint64_t estimated_sleep_time =
+      estimated_total_size / options.delayed_write_rate * 1000000U;
+  ASSERT_GT(env_->addon_time_.load(), estimated_sleep_time * 0.8);
+  ASSERT_LT(env_->addon_time_.load(), estimated_sleep_time * 1.1);
+
+  env_->no_sleep_ = false;
+  rocksdb::SyncPoint::GetInstance()->DisableProcessing();
+}
+
+TEST_F(DBTest, SoftLimit) {
+  Options options;
+  options.env = env_;
+  options = CurrentOptions(options);
+  options.write_buffer_size = 100000;  // Small write buffer
+  options.max_write_buffer_number = 256;
+  options.level0_file_num_compaction_trigger = 3;
+  options.level0_slowdown_writes_trigger = 3;
+  options.level0_stop_writes_trigger = 999999;
+  options.delayed_write_rate = 200000;  // About 200KB/s limited rate
+  options.soft_rate_limit = 1.1;
+  options.target_file_size_base = 99999999;  // All into one file
+  options.max_bytes_for_level_base = 50000;
+  options.compression = kNoCompression;
+
+  Reopen(options);
+  Put(Key(0), "");
+
+  // Only allow two compactions
+  port::Mutex mut;
+  port::CondVar cv(&mut);
+  std::atomic<int> compaction_cnt(0);
+  rocksdb::SyncPoint::GetInstance()->SetCallBack(
+      "VersionSet::LogAndApply:WriteManifest", [&](void* arg) {
+        // Three flushes and the first compaction,
+        // three flushes and the second compaction go through.
+        MutexLock l(&mut);
+        while (compaction_cnt.load() >= 8) {
+          cv.Wait();
+        }
+        compaction_cnt.fetch_add(1);
+      });
+
+  std::atomic<int> sleep_count(0);
+  rocksdb::SyncPoint::GetInstance()->SetCallBack(
+      "DBImpl::DelayWrite:Sleep", [&](void* arg) { sleep_count.fetch_add(1); });
+  rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+
+  for (int i = 0; i < 3; i++) {
+    Put(Key(i), std::string(5000, 'x'));
+    Put(Key(100 - i), std::string(5000, 'x'));
+    Flush();
+  }
+
+  while (compaction_cnt.load() < 4 || NumTableFilesAtLevel(0) > 0) {
+    env_->SleepForMicroseconds(1000);
+  }
+  // Now there is one L1 file but doesn't trigger soft_rate_limit
+  ASSERT_EQ(NumTableFilesAtLevel(1), 1);
+  ASSERT_EQ(sleep_count.load(), 0);
+
+  for (int i = 0; i < 3; i++) {
+    Put(Key(10 + i), std::string(5000, 'x'));
+    Put(Key(90 - i), std::string(5000, 'x'));
+    Flush();
+  }
+  while (compaction_cnt.load() < 8 || NumTableFilesAtLevel(0) > 0) {
+    env_->SleepForMicroseconds(1000);
+  }
+  ASSERT_EQ(NumTableFilesAtLevel(1), 1);
+  ASSERT_EQ(sleep_count.load(), 0);
+
+  // Slowdown is triggered now
+  for (int i = 0; i < 10; i++) {
+    Put(Key(i), std::string(100, 'x'));
+  }
+  ASSERT_GT(sleep_count.load(), 0);
+
+  {
+    MutexLock l(&mut);
+    compaction_cnt.store(7);
+    cv.SignalAll();
+  }
+  while (NumTableFilesAtLevel(1) > 0) {
+    env_->SleepForMicroseconds(1000);
+  }
+
+  // Slowdown is not triggered any more.
+  sleep_count.store(0);
+  // Slowdown is not triggered now
+  for (int i = 0; i < 10; i++) {
+    Put(Key(i), std::string(100, 'x'));
+  }
+  ASSERT_EQ(sleep_count.load(), 0);
+
+  // shrink level base so L2 will hit soft limit easier.
+  ASSERT_OK(dbfull()->SetOptions({
+      {"max_bytes_for_level_base", "5000"},
+  }));
+  compaction_cnt.store(7);
+  Flush();
+
+  while (NumTableFilesAtLevel(0) == 0) {
+    env_->SleepForMicroseconds(1000);
+  }
+
+  // Slowdown is triggered now
+  for (int i = 0; i < 10; i++) {
+    Put(Key(i), std::string(100, 'x'));
+  }
+  ASSERT_GT(sleep_count.load(), 0);
+
+  {
+    MutexLock l(&mut);
+    compaction_cnt.store(7);
+    cv.SignalAll();
+  }
+
+  while (NumTableFilesAtLevel(2) != 0) {
+    env_->SleepForMicroseconds(1000);
+  }
+
+  // Slowdown is not triggered anymore
+  sleep_count.store(0);
+  for (int i = 0; i < 10; i++) {
+    Put(Key(i), std::string(100, 'x'));
+  }
+  ASSERT_EQ(sleep_count.load(), 0);
+  rocksdb::SyncPoint::GetInstance()->DisableProcessing();
+}
+
 } // namespace rocksdb
 
 int main(int argc, char** argv) {

@@ -66,7 +66,7 @@ class MemTableListTest : public testing::Test {
    EnvOptions env_options;
    shared_ptr<Cache> table_cache(NewLRUCache(50000, 16));
    WriteBuffer write_buffer(db_options.db_write_buffer_size);
-    WriteController write_controller;
+    WriteController write_controller(10000000u);
 
    CreateDB();
    VersionSet versions(dbname, &db_options, env_options, table_cache.get(),

@@ -44,6 +44,7 @@
 #include "util/coding.h"
 #include "util/logging.h"
 #include "util/stop_watch.h"
+#include "util/sync_point.h"
 
 namespace rocksdb {

@@ -1924,6 +1925,7 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data,
     mu->Unlock();
 
+    TEST_SYNC_POINT("VersionSet::LogAndApply:WriteManifest");
     if (!edit->IsColumnFamilyManipulation() &&
         db_options_->max_open_files == -1) {
       // unlimited table cache. Pre-load table handle now.

@@ -2494,7 +2496,7 @@ Status VersionSet::ReduceNumberOfLevels(const std::string& dbname,
   ColumnFamilyOptions cf_options(*options);
   std::shared_ptr<Cache> tc(NewLRUCache(options->max_open_files - 10,
                                         options->table_cache_numshardbits));
-  WriteController wc;
+  WriteController wc(options->delayed_write_rate);
   WriteBuffer wb(options->db_write_buffer_size);
   VersionSet versions(dbname, options, env_options, tc.get(), &wb, &wc);
   Status status;

@@ -5,7 +5,9 @@
 #include "db/write_controller.h"
 
+#include <atomic>
 #include <cassert>
+#include "rocksdb/env.h"
 
 namespace rocksdb {
@@ -14,15 +16,83 @@ std::unique_ptr<WriteControllerToken> WriteController::GetStopToken() {
   return std::unique_ptr<WriteControllerToken>(new StopWriteToken(this));
 }
 
-std::unique_ptr<WriteControllerToken> WriteController::GetDelayToken(
-    uint64_t delay_us) {
-  total_delay_us_ += delay_us;
-  return std::unique_ptr<WriteControllerToken>(
-      new DelayWriteToken(this, delay_us));
+std::unique_ptr<WriteControllerToken> WriteController::GetDelayToken() {
+  if (total_delayed_++ == 0) {
+    last_refill_time_ = 0;
+    bytes_left_ = 0;
+  }
+  return std::unique_ptr<WriteControllerToken>(new DelayWriteToken(this));
 }
 
 bool WriteController::IsStopped() const { return total_stopped_ > 0; }
 
-uint64_t WriteController::GetDelay() const { return total_delay_us_; }
+// This is called inside the DB mutex, so we can't sleep and need to minimize
+// how often we read the time.
+// If it turns out to be a performance issue, we can redesign the thread
+// synchronization model here.
+// The function trusts the caller to sleep for the number of microseconds
+// returned.
+uint64_t WriteController::GetDelay(Env* env, uint64_t num_bytes) {
+  if (total_stopped_ > 0) {
+    return 0;
+  }
+  if (total_delayed_ == 0) {
+    return 0;
+  }
+
+  const uint64_t kMicrosPerSecond = 1000000;
+  const uint64_t kRefillInterval = 1024U;
+
+  if (bytes_left_ >= num_bytes) {
+    bytes_left_ -= num_bytes;
+    return 0;
+  }
+  // The frequency to get time inside DB mutex is less than one per refill
+  // interval.
+  auto time_now = env->NowMicros();
+
+  uint64_t sleep_debt = 0;
+  uint64_t time_since_last_refill = 0;
+  if (last_refill_time_ != 0) {
+    if (last_refill_time_ > time_now) {
+      sleep_debt = last_refill_time_ - time_now;
+    } else {
+      time_since_last_refill = time_now - last_refill_time_;
+      bytes_left_ +=
+          static_cast<uint64_t>(static_cast<double>(time_since_last_refill) /
+                                kMicrosPerSecond * delayed_write_rate_);
+      if (time_since_last_refill >= kRefillInterval &&
+          bytes_left_ > num_bytes) {
+        // If refill interval already passed and we have enough bytes
+        // return without extra sleeping.
+        last_refill_time_ = time_now;
+        bytes_left_ -= num_bytes;
+        return 0;
+      }
+    }
+  }
+
+  uint64_t single_refill_amount =
+      delayed_write_rate_ * kRefillInterval / kMicrosPerSecond;
+  if (bytes_left_ + single_refill_amount >= num_bytes) {
+    // Wait until a refill interval
+    // Never trigger expire for less than one refill interval to avoid having
+    // to get the time.
+    bytes_left_ = bytes_left_ + single_refill_amount - num_bytes;
+    last_refill_time_ = time_now + kRefillInterval;
+    return kRefillInterval + sleep_debt;
+  }
+
+  // Need to refill more than one interval. Need to sleep longer. Check
+  // whether expiration will hit
+  // Sleep just until `num_bytes` is allowed.
+  uint64_t sleep_amount =
+      static_cast<uint64_t>(num_bytes /
+                            static_cast<long double>(delayed_write_rate_) *
+                            kMicrosPerSecond) +
+      sleep_debt;
+  last_refill_time_ = time_now + sleep_amount;
+  return sleep_amount;
+}
 
 StopWriteToken::~StopWriteToken() {
   assert(controller_->total_stopped_ >= 1);
@@ -30,8 +100,8 @@ StopWriteToken::~StopWriteToken() {
 }
 
 DelayWriteToken::~DelayWriteToken() {
-  assert(controller_->total_delay_us_ >= delay_us_);
-  controller_->total_delay_us_ -= delay_us_;
+  controller_->total_delayed_--;
+  assert(controller_->total_delayed_ >= 0);
 }
 
 } // namespace rocksdb

@@ -11,6 +11,7 @@
 
 namespace rocksdb {
 
+class Env;
 class WriteControllerToken;
 
 // WriteController is controlling write stalls in our write code-path. Write
@@ -19,20 +20,38 @@ class WriteControllerToken;
 // to be called while holding DB mutex
 class WriteController {
  public:
-  WriteController() : total_stopped_(0), total_delay_us_(0) {}
+  explicit WriteController(uint64_t delayed_write_rate = 1024u * 1024u * 32u)
+      : total_stopped_(0),
+        total_delayed_(0),
+        bytes_left_(0),
+        last_refill_time_(0) {
+    set_delayed_write_rate(delayed_write_rate);
+  }
   ~WriteController() = default;
 
   // When an actor (column family) requests a stop token, all writes will be
   // stopped until the stop token is released (deleted)
   std::unique_ptr<WriteControllerToken> GetStopToken();
   // When an actor (column family) requests a delay token, total delay for all
-  // writes will be increased by delay_us. The delay will last until delay token
-  // is released
-  std::unique_ptr<WriteControllerToken> GetDelayToken(uint64_t delay_us);
+  // writes to the DB will be controlled under the delayed write rate. Every
+  // write needs to call GetDelay() with number of bytes writing to the DB,
+  // which returns number of microseconds to sleep.
+  std::unique_ptr<WriteControllerToken> GetDelayToken();
 
-  // these two methods are querying the state of the WriteController
+  // these two methods are querying the state of the WriteController
   bool IsStopped() const;
-  uint64_t GetDelay() const;
+  bool NeedsDelay() const { return total_delayed_ > 0; }
+
+  // return how many microseconds the caller needs to sleep after the call
+  // num_bytes: how many bytes to put into the DB.
+  // Prerequisite: DB mutex held.
+  uint64_t GetDelay(Env* env, uint64_t num_bytes);
+
+  void set_delayed_write_rate(uint64_t delayed_write_rate) {
+    delayed_write_rate_ = delayed_write_rate;
+    if (delayed_write_rate_ == 0) {
+      // avoid dividing by 0
+      delayed_write_rate_ = 1U;
+    }
+  }
 
  private:
  friend class WriteControllerToken;
@@ -40,7 +59,10 @@ class WriteController {
  friend class DelayWriteToken;
 
  int total_stopped_;
-  uint64_t total_delay_us_;
+  int total_delayed_;
+  uint64_t bytes_left_;
+  uint64_t last_refill_time_;
+  uint64_t delayed_write_rate_;
 };
 
 class WriteControllerToken {
@@ -67,12 +89,9 @@ class StopWriteToken : public WriteControllerToken {
 class DelayWriteToken : public WriteControllerToken {
  public:
-  DelayWriteToken(WriteController* controller, uint64_t delay_us)
-      : WriteControllerToken(controller), delay_us_(delay_us) {}
+  explicit DelayWriteToken(WriteController* controller)
+      : WriteControllerToken(controller) {}
   virtual ~DelayWriteToken();
-
- private:
-  uint64_t delay_us_;
 };
 
 } // namespace rocksdb

@@ -5,14 +5,22 @@
 //
 #include "db/write_controller.h"
 
+#include "rocksdb/env.h"
 #include "util/testharness.h"
 
 namespace rocksdb {
 
 class WriteControllerTest : public testing::Test {};
 
+class TimeSetEnv : public EnvWrapper {
+ public:
+  explicit TimeSetEnv() : EnvWrapper(nullptr) {}
+  uint64_t now_micros_ = 6666;
+  virtual uint64_t NowMicros() override { return now_micros_; }
+};
+
 TEST_F(WriteControllerTest, SanityTest) {
-  WriteController controller;
+  WriteController controller(10000000u);
   auto stop_token_1 = controller.GetStopToken();
   auto stop_token_2 = controller.GetStopToken();
@@ -22,15 +30,66 @@ TEST_F(WriteControllerTest, SanityTest) {
   stop_token_2.reset();
   ASSERT_FALSE(controller.IsStopped());
 
-  auto delay_token_1 = controller.GetDelayToken(5);
-  ASSERT_EQ(static_cast<uint64_t>(5), controller.GetDelay());
-  auto delay_token_2 = controller.GetDelayToken(8);
-  ASSERT_EQ(static_cast<uint64_t>(13), controller.GetDelay());
+  TimeSetEnv env;
+
+  auto delay_token_1 = controller.GetDelayToken();
+  ASSERT_EQ(static_cast<uint64_t>(2000000),
+            controller.GetDelay(&env, 20000000u));
+  env.now_micros_ += 1999900u;  // sleep debt 1000
+
+  auto delay_token_2 = controller.GetDelayToken();
+  // One refill: 10240 bytes allowed, 1000 used, 9240 left
+  ASSERT_EQ(static_cast<uint64_t>(1124), controller.GetDelay(&env, 1000u));
+  env.now_micros_ += 1124u;  // sleep debt 0
 
   delay_token_2.reset();
-  ASSERT_EQ(static_cast<uint64_t>(5), controller.GetDelay());
+  // 1000 used, 8240 left
+  ASSERT_EQ(static_cast<uint64_t>(0), controller.GetDelay(&env, 1000u));
+  env.now_micros_ += 100u;  // sleep credit 100
+  // 1000 used, 7240 left
+  ASSERT_EQ(static_cast<uint64_t>(0), controller.GetDelay(&env, 1000u));
+  env.now_micros_ += 100u;  // sleep credit 200
+  // One refill: 10240 filled, sleep credit generates 2000. 8000 used
+  // 7240 + 10240 + 2000 - 8000 = 11480 left
+  ASSERT_EQ(static_cast<uint64_t>(1024u), controller.GetDelay(&env, 8000u));
+  env.now_micros_ += 200u;  // sleep debt 824
+  // 1000 used, 10480 left.
+  ASSERT_EQ(static_cast<uint64_t>(0), controller.GetDelay(&env, 1000u));
+  env.now_micros_ += 200u;  // sleep debt 624
+  // Out of bound sleep, still 10480 left
+  ASSERT_EQ(static_cast<uint64_t>(3000624u),
+            controller.GetDelay(&env, 30000000u));
+  env.now_micros_ += 3000724u;  // sleep credit 100
+  // 6000 used, 4480 left.
+  ASSERT_EQ(static_cast<uint64_t>(0), controller.GetDelay(&env, 6000u));
+  env.now_micros_ += 200u;  // sleep credit 300
+  // One refill, credit 4480 balance + 3000 credit + 10240 refill
+  // Use 8000, 9720 left
+  ASSERT_EQ(static_cast<uint64_t>(1024u), controller.GetDelay(&env, 8000u));
+  env.now_micros_ += 3024u;  // sleep credit 2000
+  // 1720 left
+  ASSERT_EQ(static_cast<uint64_t>(0u), controller.GetDelay(&env, 8000u));
+  // 1720 balance + 20000 credit = 20170 left
+  // Use 8000, 12170 left
+  ASSERT_EQ(static_cast<uint64_t>(0u), controller.GetDelay(&env, 8000u));
+  // 4170 left
+  ASSERT_EQ(static_cast<uint64_t>(0u), controller.GetDelay(&env, 8000u));
+  // Need a refill
+  ASSERT_EQ(static_cast<uint64_t>(1024u), controller.GetDelay(&env, 9000u));
 
   delay_token_1.reset();
-  ASSERT_EQ(static_cast<uint64_t>(0), controller.GetDelay());
+  ASSERT_EQ(static_cast<uint64_t>(0), controller.GetDelay(&env, 30000000u));
   delay_token_1.reset();
   ASSERT_FALSE(controller.IsStopped());
 }

@@ -87,8 +87,9 @@ void WriteThread::ExitWriteThread(WriteThread::Writer* w,
 //
 // REQUIRES: Writer list must be non-empty
 // REQUIRES: First writer must have a non-nullptr batch
-void WriteThread::BuildBatchGroup(WriteThread::Writer** last_writer,
-                                  autovector<WriteBatch*>* write_batch_group) {
+size_t WriteThread::BuildBatchGroup(
+    WriteThread::Writer** last_writer,
+    autovector<WriteBatch*>* write_batch_group) {
   assert(!writers_.empty());
   Writer* first = writers_.front();
   assert(first->batch != nullptr);
@@ -109,7 +110,7 @@ void WriteThread::BuildBatchGroup(WriteThread::Writer** last_writer,
   if (first->has_callback) {
     // TODO(agiardullo:) Batching not currently supported as this write may
     // fail if the callback function decides to abort this write.
-    return;
+    return size;
   }
 
   std::deque<Writer*>::iterator iter = writers_.begin();
@@ -155,6 +156,7 @@ void WriteThread::BuildBatchGroup(WriteThread::Writer** last_writer,
     w->in_batch_group = true;
     *last_writer = w;
   }
+  return size;
 }
 
 } // namespace rocksdb

@@ -72,8 +72,9 @@ class WriteThread {
   // REQUIRES: db mutex held
   void ExitWriteThread(Writer* w, Writer* last_writer, Status status);
 
-  void BuildBatchGroup(Writer** last_writer,
-                       autovector<WriteBatch*>* write_batch_group);
+  // return total batch group size
+  size_t BuildBatchGroup(Writer** last_writer,
+                         autovector<WriteBatch*>* write_batch_group);
 
  private:
   // Queue of writers.

@@ -480,8 +480,8 @@ struct ColumnFamilyOptions {
   // Dynamically changeable through SetOptions() API
   int max_grandparent_overlap_factor;
 
-  // Puts are delayed 0-1 ms when any level has a compaction score that exceeds
-  // soft_rate_limit. This is ignored when == 0.0.
+  // Puts are delayed to options.delayed_write_rate when any level has a
+  // compaction score that exceeds soft_rate_limit. This is ignored when == 0.0.
   // CONSTRAINT: soft_rate_limit <= hard_rate_limit. If this constraint does not
   // hold, RocksDB will set soft_rate_limit = hard_rate_limit
   //
@@ -490,12 +490,7 @@ struct ColumnFamilyOptions {
   // Dynamically changeable through SetOptions() API
   double soft_rate_limit;
 
-  // Puts are delayed 1ms at a time when any level has a compaction score that
-  // exceeds hard_rate_limit. This is ignored when <= 1.0.
-  //
-  // Default: 0 (disabled)
-  //
-  // Dynamically changeable through SetOptions() API
+  // DEPRECATED -- this option is no longer used
   double hard_rate_limit;
 
   // DEPRECATED -- this option is no longer used
@@ -1031,6 +1026,14 @@ struct DBOptions {
   //
   // Default: false
   bool enable_thread_tracking;
+
+  // The limited write rate to DB if soft_rate_limit or
+  // level0_slowdown_writes_trigger is triggered. It is calculated using
+  // size of user write requests before compression.
+  // Unit: byte per second.
+  //
+  // Default: 1MB/s
+  uint64_t delayed_write_rate;
 };
 
 // Options to control the behavior of a database (passed to DB::Open)

@@ -537,7 +537,7 @@ void DumpManifestFile(std::string file, bool verbose, bool hex) {
   // if VersionSet::DumpManifest() depends on any option done by
   // SanitizeOptions(), we need to initialize it manually.
   options.db_paths.emplace_back("dummy", 0);
-  WriteController wc;
+  WriteController wc(options.delayed_write_rate);
   WriteBuffer wb(options.db_write_buffer_size);
   VersionSet versions(dbname, &options, sopt, tc.get(), &wb, &wc);
   Status s = versions.DumpManifest(options, file, verbose, hex);
@@ -1146,7 +1146,7 @@ Status ReduceDBLevelsCommand::GetOldNumOfLevels(Options& opt,
   std::shared_ptr<Cache> tc(
       NewLRUCache(opt.max_open_files - 10, opt.table_cache_numshardbits));
   const InternalKeyComparator cmp(opt.comparator);
-  WriteController wc;
+  WriteController wc(opt.delayed_write_rate);
   WriteBuffer wb(opt.db_write_buffer_size);
   VersionSet versions(db_path_, &opt, soptions, tc.get(), &wb, &wc);
   std::vector<ColumnFamilyDescriptor> dummy;

@@ -241,7 +241,8 @@ DBOptions::DBOptions()
       bytes_per_sync(0),
       wal_bytes_per_sync(0),
       listeners(),
-      enable_thread_tracking(false) {
+      enable_thread_tracking(false),
+      delayed_write_rate(1024U * 1024U) {
 }
 
 DBOptions::DBOptions(const Options& options)
@@ -286,7 +287,8 @@ DBOptions::DBOptions(const Options& options)
       bytes_per_sync(options.bytes_per_sync),
       wal_bytes_per_sync(options.wal_bytes_per_sync),
       listeners(options.listeners),
-      enable_thread_tracking(options.enable_thread_tracking) {}
+      enable_thread_tracking(options.enable_thread_tracking),
+      delayed_write_rate(options.delayed_write_rate) {}
 
 static const char* const access_hints[] = {
   "NONE", "NORMAL", "SEQUENTIAL", "WILLNEED"
