diff --git a/HISTORY.md b/HISTORY.md
index 559f942c1..7f8efa535 100644
--- a/HISTORY.md
+++ b/HISTORY.md
@@ -12,6 +12,8 @@
 * Change default value for options.compaction_filter_factory and options.compaction_filter_factory_v2 to nullptr instead of DefaultCompactionFilterFactory and DefaultCompactionFilterFactoryV2.
 * If CancelAllBackgroundWork is called without doing a flush after doing loads with WAL disabled, the changes which haven't been flushed before the call to CancelAllBackgroundWork will be lost.
 * WBWIIterator::Entry() now returns WriteEntry instead of `const WriteEntry&`
+* options.hard_rate_limit is deprecated.
+* When options.soft_rate_limit or options.level0_slowdown_writes_trigger is triggered, writes are slowed down differently: the write rate to the DB is limited to options.delayed_write_rate.

 ## 3.11.0 (5/19/2015)
 ### New Features
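To make the changelog entry concrete, here is a minimal sketch (illustrative only, not code from this patch) of how a byte budget translates into a per-write delay under a `delayed_write_rate` limit:

```cpp
#include <cstdint>

// Illustrative sketch: at a limit of `delayed_write_rate` bytes/second, a
// batch of `num_bytes` earns a delay of num_bytes / rate seconds, in micros.
uint64_t IllustrativeDelayMicros(uint64_t num_bytes,
                                 uint64_t delayed_write_rate) {
  const uint64_t kMicrosPerSecond = 1000000;
  if (delayed_write_rate == 0) {
    delayed_write_rate = 1;  // avoid divide-by-zero, as the patch also does
  }
  return num_bytes * kMicrosPerSecond / delayed_write_rate;
}
// e.g. 1MB written at the default 1MB/s limit yields ~1,000,000us (1s) delay.
```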
diff --git a/db/column_family.cc b/db/column_family.cc
index 298b63704..1d4e01732 100644
--- a/db/column_family.cc
+++ b/db/column_family.cc
@@ -36,36 +36,6 @@
 namespace rocksdb {

-namespace {
-// This function computes the amount of time in microseconds by which a write
-// should be delayed based on the number of level-0 files according to the
-// following formula:
-//   if n < bottom, return 0;
-//   if n >= top, return 1000;
-//   otherwise, let r = (n - bottom) /
-//                      (top - bottom)
-//   and return r^2 * 1000.
-// The goal of this formula is to gradually increase the rate at which writes
-// are slowed. We also tried linear delay (r * 1000), but it seemed to do
-// slightly worse. There is no other particular reason for choosing quadratic.
-uint64_t SlowdownAmount(int n, double bottom, double top) {
-  uint64_t delay;
-  if (n >= top) {
-    delay = 1000;
-  } else if (n < bottom) {
-    delay = 0;
-  } else {
-    // If we are here, we know that:
-    //   level0_start_slowdown <= n < level0_slowdown
-    // since the previous two conditions are false.
-    double how_much = static_cast<double>(n - bottom) / (top - bottom);
-    delay = std::max(how_much * how_much * 1000, 100.0);
-  }
-  assert(delay <= 1000);
-  return delay;
-}
-}  // namespace
-
 ColumnFamilyHandleImpl::ColumnFamilyHandleImpl(
     ColumnFamilyData* column_family_data, DBImpl* db, InstrumentedMutex* mutex)
     : cfd_(column_family_data), db_(db), mutex_(mutex) {
@@ -147,9 +117,6 @@ ColumnFamilyOptions SanitizeOptions(const DBOptions& db_options,
   if (result.max_mem_compaction_level >= result.num_levels) {
     result.max_mem_compaction_level = result.num_levels - 1;
   }
-  if (result.soft_rate_limit > result.hard_rate_limit) {
-    result.soft_rate_limit = result.hard_rate_limit;
-  }
   if (result.max_write_buffer_number < 2) {
     result.max_write_buffer_number = 2;
   }
@@ -456,7 +423,6 @@ void ColumnFamilyData::RecalculateWriteStallConditions(
   auto* vstorage = current_->storage_info();
   const double score = vstorage->max_compaction_score();
   const int max_level = vstorage->max_compaction_score_level();
-
   auto write_controller = column_family_set_->write_controller_;

   if (imm()->NumNotFlushed() >= mutable_cf_options.max_write_buffer_number) {
@@ -477,37 +443,18 @@ void ColumnFamilyData::RecalculateWriteStallConditions(
   } else if (mutable_cf_options.level0_slowdown_writes_trigger >= 0 &&
              vstorage->l0_delay_trigger_count() >=
                  mutable_cf_options.level0_slowdown_writes_trigger) {
-    uint64_t slowdown =
-        SlowdownAmount(vstorage->l0_delay_trigger_count(),
-                       mutable_cf_options.level0_slowdown_writes_trigger,
-                       mutable_cf_options.level0_stop_writes_trigger);
-    write_controller_token_ = write_controller->GetDelayToken(slowdown);
-    internal_stats_->AddCFStats(InternalStats::LEVEL0_SLOWDOWN, slowdown);
+    write_controller_token_ = write_controller->GetDelayToken();
+    internal_stats_->AddCFStats(InternalStats::LEVEL0_SLOWDOWN, 1);
     Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
-        "[%s] Stalling writes because we have %d level-0 files (%" PRIu64
-        "us)",
-        name_.c_str(), vstorage->l0_delay_trigger_count(), slowdown);
-  } else if (mutable_cf_options.hard_rate_limit > 1.0 &&
-             score > mutable_cf_options.hard_rate_limit) {
-    uint64_t kHardLimitSlowdown = 1000;
-    write_controller_token_ =
-        write_controller->GetDelayToken(kHardLimitSlowdown);
-    internal_stats_->RecordLevelNSlowdown(max_level, false);
-    Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
-        "[%s] Stalling writes because we hit hard limit on level %d. "
-        "(%" PRIu64 "us)",
-        name_.c_str(), max_level, kHardLimitSlowdown);
+        "[%s] Stalling writes because we have %d level-0 files",
+        name_.c_str(), vstorage->l0_delay_trigger_count());
   } else if (mutable_cf_options.soft_rate_limit > 0.0 &&
              score > mutable_cf_options.soft_rate_limit) {
-    uint64_t slowdown = SlowdownAmount(score,
-                                       mutable_cf_options.soft_rate_limit,
-                                       mutable_cf_options.hard_rate_limit);
-    write_controller_token_ = write_controller->GetDelayToken(slowdown);
+    write_controller_token_ = write_controller->GetDelayToken();
     internal_stats_->RecordLevelNSlowdown(max_level, true);
     Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
-        "[%s] Stalling writes because we hit soft limit on level %d (%" PRIu64
-        "us)",
-        name_.c_str(), max_level, slowdown);
+        "[%s] Stalling writes because we hit soft limit on level %d",
+        name_.c_str(), max_level);
   } else {
     write_controller_token_.reset();
   }
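The key shape change above is that a delay token no longer carries a delay amount; it only records that some column family wants writes slowed down, and the shared controller computes the actual per-write delay elsewhere. A hypothetical sketch of that token pattern (names are illustrative, not the patch's classes):

```cpp
#include <memory>

// Hypothetical sketch of the parameterless-token pattern used above: the
// token is an RAII marker, and the controller just counts outstanding tokens.
class SketchController {
 public:
  struct Token {
    explicit Token(SketchController* c) : c_(c) { ++c_->total_delayed_; }
    ~Token() { --c_->total_delayed_; }
    SketchController* c_;
  };
  std::unique_ptr<Token> GetDelayToken() {
    return std::unique_ptr<Token>(new Token(this));
  }
  bool NeedsDelay() const { return total_delayed_ > 0; }

 private:
  int total_delayed_ = 0;
};
```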
" - "(%" PRIu64 "us)", - name_.c_str(), max_level, kHardLimitSlowdown); + "[%s] Stalling writes because we have %d level-0 files", + name_.c_str(), vstorage->l0_delay_trigger_count()); } else if (mutable_cf_options.soft_rate_limit > 0.0 && score > mutable_cf_options.soft_rate_limit) { - uint64_t slowdown = SlowdownAmount(score, - mutable_cf_options.soft_rate_limit, - mutable_cf_options.hard_rate_limit); - write_controller_token_ = write_controller->GetDelayToken(slowdown); + write_controller_token_ = write_controller->GetDelayToken(); internal_stats_->RecordLevelNSlowdown(max_level, true); Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log, - "[%s] Stalling writes because we hit soft limit on level %d (%" PRIu64 - "us)", - name_.c_str(), max_level, slowdown); + "[%s] Stalling writes because we hit soft limit on level %d", + name_.c_str(), max_level); } else { write_controller_token_.reset(); } diff --git a/db/db_bench.cc b/db/db_bench.cc index d9ee310ce..0de99c974 100644 --- a/db/db_bench.cc +++ b/db/db_bench.cc @@ -543,6 +543,10 @@ DEFINE_double(hard_rate_limit, 0.0, "When not equal to 0 this make threads " "sleep at each stats reporting interval until the compaction" " score for all levels is less than or equal to this value."); +DEFINE_uint64(delayed_write_rate, 2097152u, + "Limited bytes allowed to DB when soft_rate_limit or " + "level0_slowdown_writes_trigger triggers"); + DEFINE_int32(rate_limit_delay_max_milliseconds, 1000, "When hard_rate_limit is set then this is the max time a put will" " be stalled."); @@ -2270,6 +2274,7 @@ class Benchmark { } options.soft_rate_limit = FLAGS_soft_rate_limit; options.hard_rate_limit = FLAGS_hard_rate_limit; + options.delayed_write_rate = FLAGS_delayed_write_rate; options.rate_limit_delay_max_milliseconds = FLAGS_rate_limit_delay_max_milliseconds; options.table_cache_numshardbits = FLAGS_table_cache_numshardbits; diff --git a/db/db_impl.cc b/db/db_impl.cc index 30cf8e496..c1677eb9c 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -225,6 +225,8 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname) max_total_in_memory_state_(0), is_snapshot_supported_(true), write_buffer_(options.db_write_buffer_size), + write_controller_(options.delayed_write_rate), + last_batch_group_size_(0), unscheduled_flushes_(0), unscheduled_compactions_(0), bg_compaction_scheduled_(0), @@ -1691,6 +1693,7 @@ Status DBImpl::SetOptions(ColumnFamilyHandle* column_family, Log(InfoLogLevel::WARN_LEVEL, db_options_.info_log, "[%s] SetOptions failed", cfd->GetName().c_str()); } + LogFlush(db_options_.info_log); return s; #endif // ROCKSDB_LITE } @@ -3409,8 +3412,8 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, status = ScheduleFlushes(&context); } - if (UNLIKELY(status.ok() && (write_controller_.IsStopped() || - write_controller_.GetDelay() > 0))) { + if (UNLIKELY(status.ok()) && + (write_controller_.IsStopped() || write_controller_.NeedsDelay())) { // If writer is stopped, we need to get it going, // so schedule flushes/compactions if (context.schedule_bg_work_) { @@ -3418,7 +3421,11 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, } PERF_TIMER_STOP(write_pre_and_post_process_time); PERF_TIMER_GUARD(write_delay_time); - status = DelayWrite(expiration_time); + // We don't know size of curent batch so that we always use the size + // for previous one. It might create a fairness issue that expiration + // might happen for smaller writes but larger writes can go through. + // Can optimize it if it is an issue. 
+    status = DelayWrite(last_batch_group_size_, expiration_time);
     PERF_TIMER_START(write_pre_and_post_process_time);
   }

@@ -3432,7 +3439,8 @@
   autovector<WriteBatch*> write_batch_group;

   if (status.ok()) {
-    write_thread_.BuildBatchGroup(&last_writer, &write_batch_group);
+    last_batch_group_size_ =
+        write_thread_.BuildBatchGroup(&last_writer, &write_batch_group);

     // Add to log and apply to memtable. We can release the lock
     // during this phase since &w is currently responsible for logging
@@ -3566,24 +3574,36 @@
 // REQUIRES: mutex_ is held
 // REQUIRES: this thread is currently at the front of the writer queue
-Status DBImpl::DelayWrite(uint64_t expiration_time) {
+Status DBImpl::DelayWrite(uint64_t num_bytes, uint64_t expiration_time) {
   uint64_t time_delayed = 0;
   bool delayed = false;
   bool timed_out = false;
   {
     StopWatch sw(env_, stats_, WRITE_STALL, &time_delayed);
     bool has_timeout = (expiration_time > 0);
-    auto delay = write_controller_.GetDelay();
-    if (write_controller_.IsStopped() == false && delay > 0) {
+    auto delay = write_controller_.GetDelay(env_, num_bytes);
+    if (delay > 0) {
       mutex_.Unlock();
       delayed = true;
       // hopefully we don't have to sleep more than 2 billion microseconds
       TEST_SYNC_POINT("DBImpl::DelayWrite:Sleep");
-      env_->SleepForMicroseconds(static_cast<int>(delay));
+      if (has_timeout) {
+        auto time_now = env_->NowMicros();
+        if (time_now + delay >= expiration_time) {
+          if (expiration_time > time_now) {
+            env_->SleepForMicroseconds(
+                static_cast<int>(expiration_time - time_now));
+          }
+          timed_out = true;
+        }
+      }
+      if (!timed_out) {
+        env_->SleepForMicroseconds(static_cast<int>(delay));
+      }
       mutex_.Lock();
     }

-    while (bg_error_.ok() && write_controller_.IsStopped()) {
+    while (!timed_out && bg_error_.ok() && write_controller_.IsStopped()) {
       delayed = true;
       if (has_timeout) {
         TEST_SYNC_POINT("DBImpl::DelayWrite:TimedWait");
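The timeout handling in the new DelayWrite() reduces to one rule: never sleep past the write's deadline, and report a timeout if the full delay would cross it. A simplified sketch of just that decision (standalone helper, not part of the patch; `now_micros` stands in for env_->NowMicros()):

```cpp
#include <cstdint>

// Simplified from the DelayWrite() logic above. Returns true if the write
// times out; *sleep_micros is how long the caller should actually sleep.
bool SleepOrTimeOut(uint64_t delay, uint64_t expiration_time,
                    uint64_t now_micros, uint64_t* sleep_micros) {
  if (expiration_time > 0 && now_micros + delay >= expiration_time) {
    // Sleep only up to the deadline, then time out.
    *sleep_micros =
        expiration_time > now_micros ? expiration_time - now_micros : 0;
    return true;
  }
  *sleep_micros = delay;
  return false;
}
```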
diff --git a/db/db_impl.h b/db/db_impl.h
index 50e470e21..9a42da247 100644
--- a/db/db_impl.h
+++ b/db/db_impl.h
@@ -433,7 +433,10 @@ class DBImpl : public DB {
   // concurrent flush memtables to storage.
   Status WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
                                      MemTable* mem, VersionEdit* edit);
-  Status DelayWrite(uint64_t expiration_time);
+
+  // num_bytes: for the slowdown case, delay time is calculated based on
+  // `num_bytes` going through.
+  Status DelayWrite(uint64_t num_bytes, uint64_t expiration_time);

   Status ScheduleFlushes(WriteContext* context);

@@ -582,6 +585,11 @@ class DBImpl : public DB {
   WriteBatch tmp_batch_;

   WriteController write_controller_;
+
+  // Size of the last batch group. In slowdown mode, the next write needs to
+  // sleep if it uses up the quota.
+  uint64_t last_batch_group_size_;
+
   FlushScheduler flush_scheduler_;

   SnapshotList snapshots_;
diff --git a/db/db_test.cc b/db/db_test.cc
index 8271cc624..a8a6c4bdc 100644
--- a/db/db_test.cc
+++ b/db/db_test.cc
@@ -154,9 +154,11 @@ class SpecialEnv : public EnvWrapper {

   std::function<void()>* table_write_callback_;

-  int64_t addon_time_;
+  std::atomic<int64_t> addon_time_;
+  bool no_sleep_;

-  explicit SpecialEnv(Env* base) : EnvWrapper(base), rnd_(301), addon_time_(0) {
+  explicit SpecialEnv(Env* base)
+      : EnvWrapper(base), rnd_(301), addon_time_(0), no_sleep_(false) {
     delay_sstable_sync_.store(false, std::memory_order_release);
     drop_writes_.store(false, std::memory_order_release);
     no_space_.store(false, std::memory_order_release);
@@ -358,19 +360,27 @@ class SpecialEnv : public EnvWrapper {
   virtual void SleepForMicroseconds(int micros) override {
     sleep_counter_.Increment();
-    target()->SleepForMicroseconds(micros);
+    if (no_sleep_) {
+      addon_time_.fetch_add(micros);
+    } else {
+      target()->SleepForMicroseconds(micros);
+    }
   }

   virtual Status GetCurrentTime(int64_t* unix_time) override {
     Status s = target()->GetCurrentTime(unix_time);
     if (s.ok()) {
-      *unix_time += addon_time_;
+      *unix_time += addon_time_.load();
     }
     return s;
   }

   virtual uint64_t NowNanos() override {
-    return target()->NowNanos() + addon_time_ * 1000;
+    return target()->NowNanos() + addon_time_.load() * 1000;
+  }
+
+  virtual uint64_t NowMicros() override {
+    return target()->NowMicros() + addon_time_.load();
   }
 };

@@ -639,8 +649,8 @@ class DBTest : public testing::Test {
           !options.purge_redundant_kvs_while_flush;
       break;
     case kPerfOptions:
-      options.hard_rate_limit = 2.0;
-      options.rate_limit_delay_max_milliseconds = 2;
+      options.soft_rate_limit = 2.0;
+      options.delayed_write_rate = 8 * 1024 * 1024;
       // TODO -- test more options
       break;
     case kDeletesFilterFirst:
@@ -4134,7 +4144,7 @@ class DelayFilter : public CompactionFilter {
   virtual bool Filter(int level, const Slice& key, const Slice& value,
                      std::string* new_value,
                      bool* value_changed) const override {
-    db_test->env_->addon_time_ += 1000;
+    db_test->env_->addon_time_.fetch_add(1000);
     return true;
   }

@@ -6704,7 +6714,7 @@ TEST_F(DBTest, Snapshot) {
     Put(0, "foo", "0v2");
     Put(1, "foo", "1v2");

-    env_->addon_time_++;
+    env_->addon_time_.fetch_add(1);

     const Snapshot* s2 = db_->GetSnapshot();
     ASSERT_EQ(2U, GetNumSnapshots());
@@ -11200,7 +11210,7 @@ TEST_F(DBTest, DynamicLevelMaxBytesBase) {
     options.level_compaction_dynamic_level_bytes = true;
     options.max_bytes_for_level_base = 10240;
     options.max_bytes_for_level_multiplier = 4;
-    options.hard_rate_limit = 1.1;
+    options.soft_rate_limit = 1.1;
     options.max_background_compactions = max_background_compactions;
     options.num_levels = 5;

@@ -11493,7 +11503,7 @@ TEST_F(DBTest, DynamicLevelMaxBytesBaseInc) {
   options.level_compaction_dynamic_level_bytes = true;
   options.max_bytes_for_level_base = 10240;
   options.max_bytes_for_level_multiplier = 4;
-  options.hard_rate_limit = 1.1;
+  options.soft_rate_limit = 1.1;
   options.max_background_compactions = 2;
   options.num_levels = 5;

@@ -11545,7 +11555,7 @@ TEST_F(DBTest, MigrateToDynamicLevelMaxBytesBase) {
   options.level_compaction_dynamic_level_bytes = false;
   options.max_bytes_for_level_base = 10240;
   options.max_bytes_for_level_multiplier = 4;
-  options.hard_rate_limit = 1.1;
+  options.soft_rate_limit = 1.1;
   options.num_levels = 8;

   DestroyAndReopen(options);
@@ -11743,7 +11753,7 @@ TEST_F(DBTest, DynamicLevelCompressionPerLevel2) {
   options.level0_file_num_compaction_trigger = 2;
   options.level0_slowdown_writes_trigger = 2;
   options.level0_stop_writes_trigger = 2;
-  options.hard_rate_limit = 1.1;
+  options.soft_rate_limit = 1.1;

   // Use file size to distinguish levels
   // L1: 10, L2: 20, L3 40, L4 80
@@ -11853,7 +11863,7 @@ TEST_F(DBTest, DynamicCompactionOptions) {
   options.env = env_;
   options.create_if_missing = true;
   options.compression = kNoCompression;
-  options.hard_rate_limit = 1.1;
+  options.soft_rate_limit = 1.1;
   options.write_buffer_size = k64KB;
   options.max_write_buffer_number = 2;
   // Compaction related options
@@ -12042,73 +12052,6 @@ TEST_F(DBTest, DynamicCompactionOptions) {
   dbfull()->TEST_WaitForCompact();
   ASSERT_LT(NumTableFilesAtLevel(0), 4);

-  // Test for hard_rate_limit.
-  // First change max_bytes_for_level_base to a big value and populate
-  // L1 - L3. Then thrink max_bytes_for_level_base and disable auto compaction
-  // at the same time, we should see some level with score greater than 2.
-  ASSERT_OK(dbfull()->SetOptions({
-    {"max_bytes_for_level_base", ToString(k1MB) }
-  }));
-  // writing 40 x 64KB = 10 x 256KB
-  // (L1 + L2 + L3) = (1 + 2 + 4) * 256KB
-  for (int i = 0; i < 40; ++i) {
-    gen_l0_kb(i, 64, 32);
-  }
-  dbfull()->TEST_WaitForCompact();
-  ASSERT_TRUE((SizeAtLevel(1) > k1MB * 0.8 &&
-               SizeAtLevel(1) < k1MB * 1.2) ||
-              (SizeAtLevel(2) > 2 * k1MB * 0.8 &&
-               SizeAtLevel(2) < 2 * k1MB * 1.2) ||
-              (SizeAtLevel(3) > 4 * k1MB * 0.8 &&
-               SizeAtLevel(3) < 4 * k1MB * 1.2));
-  // Reduce max_bytes_for_level_base and disable compaction at the same time
-  // This should cause score to increase
-  ASSERT_OK(dbfull()->SetOptions({
-    {"disable_auto_compactions", "true"},
-    {"max_bytes_for_level_base", "65536"},
-  }));
-  ASSERT_OK(Put(Key(count), RandomString(&rnd, 1024)));
-  dbfull()->TEST_FlushMemTable(true);
-
-  // Check score is above 2
-  ASSERT_TRUE(SizeAtLevel(1) / k64KB > 2 ||
-              SizeAtLevel(2) / k64KB > 4 ||
-              SizeAtLevel(3) / k64KB > 8);
-
-  // Enfoce hard rate limit. Now set hard_rate_limit to 2,
-  // we should start to see put delay (1000 us) and timeout as a result
-  // (L0 score is not regulated by this limit).
-  ASSERT_OK(dbfull()->SetOptions({
-    {"hard_rate_limit", "2"},
-    {"level0_slowdown_writes_trigger", "18"},
-    {"level0_stop_writes_trigger", "20"}
-  }));
-  ASSERT_OK(Put(Key(count), RandomString(&rnd, 1024)));
-  dbfull()->TEST_FlushMemTable(true);
-
-  std::atomic<int> sleep_count(0);
-  rocksdb::SyncPoint::GetInstance()->SetCallBack(
-      "DBImpl::DelayWrite:Sleep",
-      [&](void* arg) { sleep_count.fetch_add(1); });
-  rocksdb::SyncPoint::GetInstance()->EnableProcessing();
-
-  // Hard rate limit slow down for 1000 us, so default 10ms should be ok
-  ASSERT_OK(Put(Key(count), RandomString(&rnd, 1024), wo));
-  sleep_count.store(0);
-  ASSERT_OK(Put(Key(count), RandomString(&rnd, 1024), wo));
-  ASSERT_GT(sleep_count.load(), 0);
-
-  // Lift the limit and no timeout
-  ASSERT_OK(dbfull()->SetOptions({
-    {"hard_rate_limit", "200"},
-  }));
-  dbfull()->TEST_FlushMemTable(true);
-  sleep_count.store(0);
-  ASSERT_OK(Put(Key(count), RandomString(&rnd, 1024), wo));
-  // Technically, time out is still possible for timing issue.
-  ASSERT_EQ(sleep_count.load(), 0);
-  rocksdb::SyncPoint::GetInstance()->DisableProcessing();
-
-  // Test max_mem_compaction_level.
   // Destroy DB and start from scratch
   options.max_background_compactions = 1;
@@ -12784,7 +12727,7 @@ class DelayedMergeOperator : public AssociativeMergeOperator {
   virtual bool Merge(const Slice& key, const Slice* existing_value,
                      const Slice& value, std::string* new_value,
                      Logger* logger) const override {
-    db_test_->env_->addon_time_ += 1000;
+    db_test_->env_->addon_time_.fetch_add(1000);
     return true;
   }

@@ -12799,7 +12742,7 @@ TEST_F(DBTest, MergeTestTime) {
   // Enable time profiling
   SetPerfLevel(kEnableTime);
-  this->env_->addon_time_ = 0;
+  this->env_->addon_time_.store(0);
   Options options;
   options = CurrentOptions(options);
   options.statistics = rocksdb::CreateDBStatistics();
@@ -13070,7 +13013,7 @@ TEST_F(DBTest, TablePropertiesNeedCompactTest) {
   options.target_file_size_base = 2048;
   options.max_bytes_for_level_base = 10240;
   options.max_bytes_for_level_multiplier = 4;
-  options.hard_rate_limit = 1.1;
+  options.soft_rate_limit = 1.1;
   options.num_levels = 8;

   std::shared_ptr collector_factory(
@@ -13496,6 +13439,174 @@ TEST_F(DBTest, SuggestCompactRangeNoTwoLevel0Compactions) {
   TEST_SYNC_POINT("DBTest::SuggestCompactRangeNoTwoLevel0Compactions:2");
 }

+TEST_F(DBTest, DelayedWriteRate) {
+  Options options;
+  options.env = env_;
+  env_->no_sleep_ = true;
+  options = CurrentOptions(options);
+  options.write_buffer_size = 100000;  // Small write buffer
+  options.max_write_buffer_number = 256;
+  options.disable_auto_compactions = true;
+  options.level0_file_num_compaction_trigger = 3;
+  options.level0_slowdown_writes_trigger = 3;
+  options.level0_stop_writes_trigger = 999999;
+  options.delayed_write_rate = 200000;  // About 200KB/s limited rate
+
+  CreateAndReopenWithCF({"pikachu"}, options);
+
+  for (int i = 0; i < 3; i++) {
+    Put(Key(i), std::string(10000, 'x'));
+    Flush();
+  }
+
+  // These writes will be slowed down to about 200KB/s
+  size_t estimated_total_size = 0;
+  Random rnd(301);
+  for (int i = 0; i < 3000; i++) {
+    auto rand_num = rnd.Uniform(20);
+    // Spread the size range wider.
+    size_t entry_size = rand_num * rand_num * rand_num;
+    WriteOptions wo;
+    // With a small probability, set a timeout.
+    if (rnd.Uniform(20) == 6) {
+      wo.timeout_hint_us = 1500;
+    }
+    Put(Key(i), std::string(entry_size, 'x'), wo);
+    estimated_total_size += entry_size + 20;
+    // Occasionally sleep for a while
+    if (rnd.Uniform(20) == 6) {
+      env_->SleepForMicroseconds(2666);
+    }
+  }
+  uint64_t estimated_sleep_time =
+      estimated_total_size / options.delayed_write_rate * 1000000U;
+  ASSERT_GT(env_->addon_time_.load(), estimated_sleep_time * 0.8);
+  ASSERT_LT(env_->addon_time_.load(), estimated_sleep_time * 1.1);
+
+  env_->no_sleep_ = false;
+  rocksdb::SyncPoint::GetInstance()->DisableProcessing();
+}
+
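The test's assertion follows directly from the rate math: total bytes divided by the rate gives the expected accumulated sleep. A small worked example with illustrative numbers (not the test's exact workload):

```cpp
#include <cstdint>

// Same integer math as the test above: total sleep in microseconds is
// total_bytes / rate (seconds), scaled to micros.
uint64_t EstimatedSleepMicros(uint64_t total_bytes, uint64_t rate) {
  return total_bytes / rate * 1000000U;
}
// e.g. writing ~5MB at a 200,000 bytes/s limit:
// EstimatedSleepMicros(5000000, 200000) == 25 * 1000000, i.e. ~25 seconds.
```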
+TEST_F(DBTest, SoftLimit) {
+  Options options;
+  options.env = env_;
+  options = CurrentOptions(options);
+  options.write_buffer_size = 100000;  // Small write buffer
+  options.max_write_buffer_number = 256;
+  options.level0_file_num_compaction_trigger = 3;
+  options.level0_slowdown_writes_trigger = 3;
+  options.level0_stop_writes_trigger = 999999;
+  options.delayed_write_rate = 200000;  // About 200KB/s limited rate
+  options.soft_rate_limit = 1.1;
+  options.target_file_size_base = 99999999;  // All into one file
+  options.max_bytes_for_level_base = 50000;
+  options.compression = kNoCompression;
+
+  Reopen(options);
+  Put(Key(0), "");
+
+  // Only allow two compactions
+  port::Mutex mut;
+  port::CondVar cv(&mut);
+  std::atomic<int> compaction_cnt(0);
+  rocksdb::SyncPoint::GetInstance()->SetCallBack(
+      "VersionSet::LogAndApply:WriteManifest", [&](void* arg) {
+        // Three flushes and the first compaction,
+        // three flushes and the second compaction go through.
+        MutexLock l(&mut);
+        while (compaction_cnt.load() >= 8) {
+          cv.Wait();
+        }
+        compaction_cnt.fetch_add(1);
+      });
+
+  std::atomic<int> sleep_count(0);
+  rocksdb::SyncPoint::GetInstance()->SetCallBack(
+      "DBImpl::DelayWrite:Sleep",
+      [&](void* arg) { sleep_count.fetch_add(1); });
+  rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+
+  for (int i = 0; i < 3; i++) {
+    Put(Key(i), std::string(5000, 'x'));
+    Put(Key(100 - i), std::string(5000, 'x'));
+    Flush();
+  }
+  while (compaction_cnt.load() < 4 || NumTableFilesAtLevel(0) > 0) {
+    env_->SleepForMicroseconds(1000);
+  }
+  // Now there is one L1 file, but it doesn't trigger soft_rate_limit
+  ASSERT_EQ(NumTableFilesAtLevel(1), 1);
+  ASSERT_EQ(sleep_count.load(), 0);
+
+  for (int i = 0; i < 3; i++) {
+    Put(Key(10 + i), std::string(5000, 'x'));
+    Put(Key(90 - i), std::string(5000, 'x'));
+    Flush();
+  }
+  while (compaction_cnt.load() < 8 || NumTableFilesAtLevel(0) > 0) {
+    env_->SleepForMicroseconds(1000);
+  }
+  ASSERT_EQ(NumTableFilesAtLevel(1), 1);
+  ASSERT_EQ(sleep_count.load(), 0);
+
+  // Slowdown is triggered now
+  for (int i = 0; i < 10; i++) {
+    Put(Key(i), std::string(100, 'x'));
+  }
+  ASSERT_GT(sleep_count.load(), 0);
+
+  {
+    MutexLock l(&mut);
+    compaction_cnt.store(7);
+    cv.SignalAll();
+  }
+  while (NumTableFilesAtLevel(1) > 0) {
+    env_->SleepForMicroseconds(1000);
+  }
+
+  // Slowdown is not triggered any more
+  sleep_count.store(0);
+  for (int i = 0; i < 10; i++) {
+    Put(Key(i), std::string(100, 'x'));
+  }
+  ASSERT_EQ(sleep_count.load(), 0);
+
+  // Shrink the level base so L2 will hit the soft limit more easily.
+  ASSERT_OK(dbfull()->SetOptions({
+      {"max_bytes_for_level_base", "5000"},
+  }));
+  compaction_cnt.store(7);
+  Flush();
+
+  while (NumTableFilesAtLevel(0) == 0) {
+    env_->SleepForMicroseconds(1000);
+  }
+
+  // Slowdown is triggered now
+  for (int i = 0; i < 10; i++) {
+    Put(Key(i), std::string(100, 'x'));
+  }
+  ASSERT_GT(sleep_count.load(), 0);
+
+  {
+    MutexLock l(&mut);
+    compaction_cnt.store(7);
+    cv.SignalAll();
+  }
+
+  while (NumTableFilesAtLevel(2) != 0) {
+    env_->SleepForMicroseconds(1000);
+  }
+
+  // Slowdown is not triggered anymore
+  sleep_count.store(0);
+  for (int i = 0; i < 10; i++) {
+    Put(Key(i), std::string(100, 'x'));
+  }
+  ASSERT_EQ(sleep_count.load(), 0);
+  rocksdb::SyncPoint::GetInstance()->DisableProcessing();
+}
+
 }  // namespace rocksdb

 int main(int argc, char** argv) {
diff --git a/db/memtable_list_test.cc b/db/memtable_list_test.cc
index 598c6b805..d809d0f16 100644
--- a/db/memtable_list_test.cc
+++ b/db/memtable_list_test.cc
@@ -66,7 +66,7 @@ class MemTableListTest : public testing::Test {
     EnvOptions env_options;
     shared_ptr<Cache> table_cache(NewLRUCache(50000, 16));
     WriteBuffer write_buffer(db_options.db_write_buffer_size);
-    WriteController write_controller;
+    WriteController write_controller(10000000u);

     CreateDB();
     VersionSet versions(dbname, &db_options, env_options, table_cache.get(),
diff --git a/db/version_set.cc b/db/version_set.cc
index 7494ce5ee..397e63fda 100644
--- a/db/version_set.cc
+++ b/db/version_set.cc
@@ -44,6 +44,7 @@
 #include "util/coding.h"
 #include "util/logging.h"
 #include "util/stop_watch.h"
+#include "util/sync_point.h"

 namespace rocksdb {

@@ -1924,6 +1925,7 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data,

     mu->Unlock();

+    TEST_SYNC_POINT("VersionSet::LogAndApply:WriteManifest");
     if (!edit->IsColumnFamilyManipulation() &&
         db_options_->max_open_files == -1) {
       // unlimited table cache. Pre-load table handle now.
@@ -2494,7 +2496,7 @@ Status VersionSet::ReduceNumberOfLevels(const std::string& dbname,
   ColumnFamilyOptions cf_options(*options);
   std::shared_ptr<Cache> tc(NewLRUCache(options->max_open_files - 10,
                                         options->table_cache_numshardbits));
-  WriteController wc;
+  WriteController wc(options->delayed_write_rate);
   WriteBuffer wb(options->db_write_buffer_size);
   VersionSet versions(dbname, options, env_options, tc.get(), &wb, &wc);
   Status status;
diff --git a/db/write_controller.cc b/db/write_controller.cc
index bb6f8ecf7..c26f6fbc4 100644
--- a/db/write_controller.cc
+++ b/db/write_controller.cc
@@ -5,7 +5,9 @@

 #include "db/write_controller.h"

+#include <atomic>
 #include <cassert>
+#include "rocksdb/env.h"

 namespace rocksdb {

@@ -14,15 +16,83 @@ std::unique_ptr<WriteControllerToken> WriteController::GetStopToken() {
   return std::unique_ptr<WriteControllerToken>(new StopWriteToken(this));
 }

-std::unique_ptr<WriteControllerToken> WriteController::GetDelayToken(
-    uint64_t delay_us) {
-  total_delay_us_ += delay_us;
-  return std::unique_ptr<WriteControllerToken>(
-      new DelayWriteToken(this, delay_us));
+std::unique_ptr<WriteControllerToken> WriteController::GetDelayToken() {
+  if (total_delayed_++ == 0) {
+    last_refill_time_ = 0;
+    bytes_left_ = 0;
+  }
+  return std::unique_ptr<WriteControllerToken>(new DelayWriteToken(this));
 }

 bool WriteController::IsStopped() const { return total_stopped_ > 0; }

-uint64_t WriteController::GetDelay() const { return total_delay_us_; }
+// This is called inside the DB mutex, so we can't sleep and need to minimize
+// how often we get the time.
+// If it turns out to be a performance issue, we can redesign the thread
+// synchronization model here.
+// The function trusts that the caller will sleep for the number of
+// microseconds returned.
+uint64_t WriteController::GetDelay(Env* env, uint64_t num_bytes) {
+  if (total_stopped_ > 0) {
+    return 0;
+  }
+  if (total_delayed_ == 0) {
+    return 0;
+  }
+
+  const uint64_t kMicrosPerSecond = 1000000;
+  const uint64_t kRefillInterval = 1024U;
+
+  if (bytes_left_ >= num_bytes) {
+    bytes_left_ -= num_bytes;
+    return 0;
+  }
+  // The frequency to get time inside DB mutex is less than one per refill
+  // interval.
+  auto time_now = env->NowMicros();
+
+  uint64_t sleep_debt = 0;
+  uint64_t time_since_last_refill = 0;
+  if (last_refill_time_ != 0) {
+    if (last_refill_time_ > time_now) {
+      sleep_debt = last_refill_time_ - time_now;
+    } else {
+      time_since_last_refill = time_now - last_refill_time_;
+      bytes_left_ +=
+          static_cast<uint64_t>(static_cast<double>(time_since_last_refill) /
+                                kMicrosPerSecond * delayed_write_rate_);
+      if (time_since_last_refill >= kRefillInterval &&
+          bytes_left_ > num_bytes) {
+        // If the refill interval has already passed and we have enough bytes,
+        // return without extra sleeping.
+        last_refill_time_ = time_now;
+        bytes_left_ -= num_bytes;
+        return 0;
+      }
+    }
+  }
+
+  uint64_t single_refill_amount =
+      delayed_write_rate_ * kRefillInterval / kMicrosPerSecond;
+  if (bytes_left_ + single_refill_amount >= num_bytes) {
+    // Wait until a refill interval.
+    // Never trigger expiration for less than one refill interval, to avoid
+    // getting the time too frequently.
+    bytes_left_ = bytes_left_ + single_refill_amount - num_bytes;
+    last_refill_time_ = time_now + kRefillInterval;
+    return kRefillInterval + sleep_debt;
+  }
+
+  // Need to refill for more than one interval, so we need to sleep longer.
+  // The caller checks whether the expiration time will be hit.
+
+  // Sleep just until `num_bytes` is allowed.
+  uint64_t sleep_amount =
+      static_cast<uint64_t>(num_bytes /
+                            static_cast<double>(delayed_write_rate_) *
+                            kMicrosPerSecond) +
+      sleep_debt;
+  last_refill_time_ = time_now + sleep_amount;
+  return sleep_amount;
+}

 StopWriteToken::~StopWriteToken() {
   assert(controller_->total_stopped_ >= 1);
@@ -30,8 +100,8 @@ StopWriteToken::~StopWriteToken() {
 }

 DelayWriteToken::~DelayWriteToken() {
-  assert(controller_->total_delay_us_ >= delay_us_);
-  controller_->total_delay_us_ -= delay_us_;
+  controller_->total_delayed_--;
+  assert(controller_->total_delayed_ >= 0);
 }

 }  // namespace rocksdb
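GetDelay() above is effectively a token bucket that refills `delayed_write_rate` bytes per second in kRefillInterval steps. An illustrative caller-side sketch, using only the interface this patch defines (it mirrors what DBImpl::DelayWrite() does; it is not a drop-in replacement for it):

```cpp
#include <cstdint>
#include "db/write_controller.h"
#include "rocksdb/env.h"

// Ask the controller how long this batch must sleep, then sleep.
// (The real code computes the delay while holding the DB mutex and releases
// the mutex before sleeping.)
void IllustrativeDelay(rocksdb::WriteController* controller,
                       rocksdb::Env* env, uint64_t batch_bytes) {
  if (!controller->NeedsDelay()) {
    return;  // no delay token outstanding; the write proceeds immediately
  }
  uint64_t delay = controller->GetDelay(env, batch_bytes);
  if (delay > 0) {
    env->SleepForMicroseconds(static_cast<int>(delay));
  }
}
```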
diff --git a/db/write_controller.h b/db/write_controller.h
index 32e1d58f1..50e5a99be 100644
--- a/db/write_controller.h
+++ b/db/write_controller.h
@@ -11,6 +11,7 @@

 namespace rocksdb {

+class Env;
 class WriteControllerToken;

 // WriteController is controlling write stalls in our write code-path. Write
@@ -19,20 +20,38 @@ class WriteControllerToken;
 // to be called while holding DB mutex
 class WriteController {
  public:
-  WriteController() : total_stopped_(0), total_delay_us_(0) {}
+  explicit WriteController(uint64_t delayed_write_rate = 1024u * 1024u * 32u)
+      : total_stopped_(0),
+        total_delayed_(0),
+        bytes_left_(0),
+        last_refill_time_(0) {
+    set_delayed_write_rate(delayed_write_rate);
+  }
   ~WriteController() = default;

   // When an actor (column family) requests a stop token, all writes will be
   // stopped until the stop token is released (deleted)
   std::unique_ptr<WriteControllerToken> GetStopToken();
   // When an actor (column family) requests a delay token, total delay for all
-  // writes will be increased by delay_us. The delay will last until delay token
-  // is released
-  std::unique_ptr<WriteControllerToken> GetDelayToken(uint64_t delay_us);
+  // writes to the DB will be controlled under the delayed write rate. Every
+  // write needs to call GetDelay() with the number of bytes writing to the DB,
+  // which returns the number of microseconds to sleep.
+  std::unique_ptr<WriteControllerToken> GetDelayToken();

   // these two metods are querying the state of the WriteController
   bool IsStopped() const;
-  uint64_t GetDelay() const;
+  bool NeedsDelay() const { return total_delayed_ > 0; }
+  // returns how many microseconds the caller needs to sleep after the call
+  // num_bytes: the number of bytes to put into the DB.
+  // Prerequisite: DB mutex held.
+  uint64_t GetDelay(Env* env, uint64_t num_bytes);
+  void set_delayed_write_rate(uint64_t delayed_write_rate) {
+    delayed_write_rate_ = delayed_write_rate;
+    if (delayed_write_rate_ == 0) {
+      // avoid dividing by 0
+      delayed_write_rate_ = 1U;
+    }
+  }

  private:
   friend class WriteControllerToken;
@@ -40,7 +59,10 @@ class WriteController {
   friend class DelayWriteToken;

   int total_stopped_;
-  uint64_t total_delay_us_;
+  int total_delayed_;
+  uint64_t bytes_left_;
+  uint64_t last_refill_time_;
+  uint64_t delayed_write_rate_;
 };

 class WriteControllerToken {
@@ -67,12 +89,9 @@ class StopWriteToken : public WriteControllerToken {

 class DelayWriteToken : public WriteControllerToken {
  public:
-  DelayWriteToken(WriteController* controller, uint64_t delay_us)
-      : WriteControllerToken(controller), delay_us_(delay_us) {}
+  explicit DelayWriteToken(WriteController* controller)
+      : WriteControllerToken(controller) {}
   virtual ~DelayWriteToken();
-
- private:
-  uint64_t delay_us_;
 };

 }  // namespace rocksdb
diff --git a/db/write_controller_test.cc b/db/write_controller_test.cc
index 41f831389..aa8175d65 100644
--- a/db/write_controller_test.cc
+++ b/db/write_controller_test.cc
@@ -5,14 +5,22 @@
 //
 #include "db/write_controller.h"

+#include "rocksdb/env.h"
 #include "util/testharness.h"

 namespace rocksdb {

 class WriteControllerTest : public testing::Test {};

+class TimeSetEnv : public EnvWrapper {
+ public:
+  explicit TimeSetEnv() : EnvWrapper(nullptr) {}
+  uint64_t now_micros_ = 6666;
+  virtual uint64_t NowMicros() override { return now_micros_; }
+};
+
 TEST_F(WriteControllerTest, SanityTest) {
-  WriteController controller;
+  WriteController controller(10000000u);
   auto stop_token_1 = controller.GetStopToken();
   auto stop_token_2 = controller.GetStopToken();

@@ -22,15 +30,66 @@ TEST_F(WriteControllerTest, SanityTest) {
   stop_token_2.reset();
   ASSERT_FALSE(controller.IsStopped());

-  auto delay_token_1 = controller.GetDelayToken(5);
-  ASSERT_EQ(static_cast<uint64_t>(5), controller.GetDelay());
-  auto delay_token_2 = controller.GetDelayToken(8);
-  ASSERT_EQ(static_cast<uint64_t>(13), controller.GetDelay());
+  TimeSetEnv env;
+
+  auto delay_token_1 = controller.GetDelayToken();
+  ASSERT_EQ(static_cast<uint64_t>(2000000),
+            controller.GetDelay(&env, 20000000u));
+
+  env.now_micros_ += 1999900u;  // sleep debt 100
+  auto delay_token_2 = controller.GetDelayToken();
+  // One refill: 10240 bytes allowed, 1000 used, 9240 left
+  ASSERT_EQ(static_cast<uint64_t>(1124), controller.GetDelay(&env, 1000u));
+  env.now_micros_ += 1124u;  // sleep debt 0

   delay_token_2.reset();
-  ASSERT_EQ(static_cast<uint64_t>(5), controller.GetDelay());
+  // 1000 used, 8240 left
+  ASSERT_EQ(static_cast<uint64_t>(0), controller.GetDelay(&env, 1000u));
+
+  env.now_micros_ += 100u;  // sleep credit 100
+  // 1000 used, 7240 left
+  ASSERT_EQ(static_cast<uint64_t>(0), controller.GetDelay(&env, 1000u));
+
+  env.now_micros_ += 100u;  // sleep credit 200
+  // One refill: 10240 filled, sleep credit generates 2000, 8000 used
+  // 7240 + 10240 + 2000 - 8000 = 11480 left
+  ASSERT_EQ(static_cast<uint64_t>(1024u), controller.GetDelay(&env, 8000u));
+
+  env.now_micros_ += 200u;  // sleep debt 824
+  // 1000 used, 10480 left.
+  ASSERT_EQ(static_cast<uint64_t>(0), controller.GetDelay(&env, 1000u));
+
+  env.now_micros_ += 200u;  // sleep debt 624
+  // Out of bound sleep, still 10480 left
+  ASSERT_EQ(static_cast<uint64_t>(3000624u),
+            controller.GetDelay(&env, 30000000u));
+
+  env.now_micros_ += 3000724u;  // sleep credit 100
+  // 6000 used, 4480 left.
+  ASSERT_EQ(static_cast<uint64_t>(0), controller.GetDelay(&env, 6000u));
+
+  env.now_micros_ += 200u;  // sleep credit 300
+  // One refill: 4480 balance + 3000 credit + 10240 refill
+  // Use 8000, 9720 left
+  ASSERT_EQ(static_cast<uint64_t>(1024u), controller.GetDelay(&env, 8000u));
+
+  env.now_micros_ += 3024u;  // sleep credit 2000
+
+  // 1720 left
+  ASSERT_EQ(static_cast<uint64_t>(0u), controller.GetDelay(&env, 8000u));
+
+  // 1720 balance + 20000 credit = 21720 left
+  // Use 8000, 13720 left
+  ASSERT_EQ(static_cast<uint64_t>(0u), controller.GetDelay(&env, 8000u));
+
+  // 5720 left
+  ASSERT_EQ(static_cast<uint64_t>(0u), controller.GetDelay(&env, 8000u));
+
+  // Need a refill
+  ASSERT_EQ(static_cast<uint64_t>(1024u), controller.GetDelay(&env, 9000u));

   delay_token_1.reset();
-  ASSERT_EQ(static_cast<uint64_t>(0), controller.GetDelay());
+  ASSERT_EQ(static_cast<uint64_t>(0), controller.GetDelay(&env, 30000000u));
   delay_token_1.reset();
   ASSERT_FALSE(controller.IsStopped());
 }
diff --git a/db/write_thread.cc b/db/write_thread.cc
index 3d007219e..c77df9253 100644
--- a/db/write_thread.cc
+++ b/db/write_thread.cc
@@ -87,8 +87,9 @@ void WriteThread::ExitWriteThread(WriteThread::Writer* w,
 //
 // REQUIRES: Writer list must be non-empty
 // REQUIRES: First writer must have a non-nullptr batch
-void WriteThread::BuildBatchGroup(WriteThread::Writer** last_writer,
-                                  autovector<WriteBatch*>* write_batch_group) {
+size_t WriteThread::BuildBatchGroup(
+    WriteThread::Writer** last_writer,
+    autovector<WriteBatch*>* write_batch_group) {
   assert(!writers_.empty());
   Writer* first = writers_.front();
   assert(first->batch != nullptr);
@@ -109,7 +110,7 @@ void WriteThread::BuildBatchGroup(WriteThread::Writer** last_writer,
     if (first->has_callback) {
       // TODO(agiardullo:) Batching not currently supported as this write may
       // fail if the callback function decides to abort this write.
-      return;
+      return size;
     }

     std::deque<Writer*>::iterator iter = writers_.begin();
@@ -155,6 +156,7 @@ void WriteThread::BuildBatchGroup(WriteThread::Writer** last_writer,
     w->in_batch_group = true;
     *last_writer = w;
   }
+  return size;
 }

 }  // namespace rocksdb
diff --git a/db/write_thread.h b/db/write_thread.h
index 2ee1224cb..471cbca01 100644
--- a/db/write_thread.h
+++ b/db/write_thread.h
@@ -72,8 +72,9 @@ class WriteThread {
   // REQUIRES: db mutex held
   void ExitWriteThread(Writer* w, Writer* last_writer, Status status);

-  void BuildBatchGroup(Writer** last_writer,
-                       autovector<WriteBatch*>* write_batch_group);
+  // returns the total batch group size
+  size_t BuildBatchGroup(Writer** last_writer,
+                         autovector<WriteBatch*>* write_batch_group);

  private:
   // Queue of writers.
diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h
index 31b94af41..1fab82cbb 100644
--- a/include/rocksdb/options.h
+++ b/include/rocksdb/options.h
@@ -480,8 +480,8 @@ struct ColumnFamilyOptions {
   // Dynamically changeable through SetOptions() API
   int max_grandparent_overlap_factor;

-  // Puts are delayed 0-1 ms when any level has a compaction score that exceeds
-  // soft_rate_limit. This is ignored when == 0.0.
+  // Puts are delayed, with the write rate to the DB limited to
+  // options.delayed_write_rate, when any level has a compaction score that
+  // exceeds soft_rate_limit. This is ignored when == 0.0.
   // CONSTRAINT: soft_rate_limit <= hard_rate_limit. If this constraint does not
   // hold, RocksDB will set soft_rate_limit = hard_rate_limit
   //
@@ -490,12 +490,7 @@ struct ColumnFamilyOptions {
   // Dynamically changeable through SetOptions() API
   double soft_rate_limit;

-  // Puts are delayed 1ms at a time when any level has a compaction score that
-  // exceeds hard_rate_limit. This is ignored when <= 1.0.
-  //
-  // Default: 0 (disabled)
-  //
-  // Dynamically changeable through SetOptions() API
+  // DEPRECATED -- this option is no longer used
   double hard_rate_limit;

   // DEPRECATED -- this options is no longer used
@@ -1031,6 +1026,14 @@ struct DBOptions {
   //
   // Default: false
   bool enable_thread_tracking;
+
+  // The limited write rate to the DB if soft_rate_limit or
+  // level0_slowdown_writes_trigger is triggered. It is calculated using the
+  // size of user write requests before compression.
+  // Unit: bytes per second.
+  //
+  // Default: 1MB/s
+  uint64_t delayed_write_rate;
 };

 // Options to control the behavior of a database (passed to DB::Open)
diff --git a/util/ldb_cmd.cc b/util/ldb_cmd.cc
index e7b29d24f..2e19cd0e9 100644
--- a/util/ldb_cmd.cc
+++ b/util/ldb_cmd.cc
@@ -537,7 +537,7 @@ void DumpManifestFile(std::string file, bool verbose, bool hex) {
   // if VersionSet::DumpManifest() depends on any option done by
   // SanitizeOptions(), we need to initialize it manually.
   options.db_paths.emplace_back("dummy", 0);
-  WriteController wc;
+  WriteController wc(options.delayed_write_rate);
   WriteBuffer wb(options.db_write_buffer_size);
   VersionSet versions(dbname, &options, sopt, tc.get(), &wb, &wc);
   Status s = versions.DumpManifest(options, file, verbose, hex);
@@ -1146,7 +1146,7 @@ Status ReduceDBLevelsCommand::GetOldNumOfLevels(Options& opt,
   std::shared_ptr<Cache> tc(
       NewLRUCache(opt.max_open_files - 10, opt.table_cache_numshardbits));
   const InternalKeyComparator cmp(opt.comparator);
-  WriteController wc;
+  WriteController wc(opt.delayed_write_rate);
   WriteBuffer wb(opt.db_write_buffer_size);
   VersionSet versions(db_path_, &opt, soptions, tc.get(), &wb, &wc);
   std::vector<ColumnFamilyDescriptor> dummy;
diff --git a/util/options.cc b/util/options.cc
index 57c0cab96..111a55d2e 100644
--- a/util/options.cc
+++ b/util/options.cc
@@ -241,7 +241,8 @@ DBOptions::DBOptions()
       bytes_per_sync(0),
       wal_bytes_per_sync(0),
       listeners(),
-      enable_thread_tracking(false) {
+      enable_thread_tracking(false),
+      delayed_write_rate(1024U * 1024U) {
 }

 DBOptions::DBOptions(const Options& options)
@@ -286,7 +287,8 @@ DBOptions::DBOptions(const Options& options)
       bytes_per_sync(options.bytes_per_sync),
       wal_bytes_per_sync(options.wal_bytes_per_sync),
       listeners(options.listeners),
-      enable_thread_tracking(options.enable_thread_tracking) {}
+      enable_thread_tracking(options.enable_thread_tracking),
+      delayed_write_rate(options.delayed_write_rate) {}

 static const char* const access_hints[] = {
   "NONE", "NORMAL", "SEQUENTIAL", "WILLNEED"
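For completeness, a sketch of configuring the new option when opening a DB (the thresholds and rate here are arbitrary example values, not recommendations from this patch):

```cpp
#include <string>
#include "rocksdb/db.h"
#include "rocksdb/options.h"

// Once any level's compaction score exceeds soft_rate_limit, or L0 reaches
// level0_slowdown_writes_trigger files, incoming writes are limited to
// delayed_write_rate bytes per second instead of sleeping a fixed amount.
rocksdb::Status OpenWithDelayedWrites(const std::string& path,
                                      rocksdb::DB** db) {
  rocksdb::Options options;
  options.create_if_missing = true;
  options.soft_rate_limit = 1.5;                 // example threshold
  options.level0_slowdown_writes_trigger = 8;    // example threshold
  options.delayed_write_rate = 2 * 1024 * 1024;  // 2MB/s (default is 1MB/s)
  return rocksdb::DB::Open(options, path, db);
}
```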