From 1026e794a368c1a9c98f442e24e1af12d23b3ffb Mon Sep 17 00:00:00 2001 From: Andrew Kryczka Date: Wed, 4 Oct 2017 19:02:22 -0700 Subject: [PATCH] rate limit auto-tuning Summary: Dynamic adjustment of rate limit according to demand for background I/O. It increases by a factor when limiter is drained too frequently, and decreases by the same factor when limiter is not drained frequently enough. The parameters for this behavior are fixed in `GenericRateLimiter::Tune`. Other changes: - make rate limiter's `Env*` configurable for testing - track the number of drained intervals in RateLimiter so we don't have to rely on stats, which may be shared by a different set of DB instances than the set sharing the RateLimiter. Closes https://github.com/facebook/rocksdb/pull/2899 Differential Revision: D5858704 Pulled By: ajkr fbshipit-source-id: cc2bac30f85e7f6fd63655d0a6732ef9ed7403b1 --- HISTORY.md | 2 + Makefile | 2 +- include/rocksdb/rate_limiter.h | 6 ++- tools/db_bench_tool.cc | 7 ++- util/rate_limiter.cc | 83 +++++++++++++++++++++++++++++++--- util/rate_limiter.h | 17 +++++-- util/rate_limiter_test.cc | 69 ++++++++++++++++++++++++++-- 7 files changed, 167 insertions(+), 19 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index 7c1aaaba0..6f2c16f80 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -5,6 +5,8 @@ ### New Features * `DBOptions::bytes_per_sync` and `DBOptions::wal_bytes_per_sync` can now be changed dynamically, `DBOptions::wal_bytes_per_sync` will flush all memtables and switch to a new WAL file. +* Support dynamic adjustment of rate limit according to demand for background I/O. It can be enabled by passing `true` to the `auto_tuned` parameter in `NewGenericRateLimiter()`. The value passed as `rate_bytes_per_sec` will still be respected as an upper-bound. + ### Bug Fixes * Fix a potential data inconsistency issue during point-in-time recovery. `DB:Open()` will abort if column family inconsistency is found during PIT recovery. 
diff --git a/Makefile b/Makefile index 487bc65df..791ffc7a3 100644 --- a/Makefile +++ b/Makefile @@ -1272,7 +1272,7 @@ env_test: env/env_test.o $(LIBOBJECTS) $(TESTHARNESS) fault_injection_test: db/fault_injection_test.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) -rate_limiter_test: util/rate_limiter_test.o $(LIBOBJECTS) $(TESTHARNESS) +rate_limiter_test: util/rate_limiter_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) delete_scheduler_test: util/delete_scheduler_test.o $(LIBOBJECTS) $(TESTHARNESS) diff --git a/include/rocksdb/rate_limiter.h b/include/rocksdb/rate_limiter.h index 838c98a6d..841838035 100644 --- a/include/rocksdb/rate_limiter.h +++ b/include/rocksdb/rate_limiter.h @@ -127,9 +127,13 @@ class RateLimiter { // 1/fairness chance even though high-pri requests exist to avoid starvation. // You should be good by leaving it at default 10. // @mode: Mode indicates which types of operations count against the limit. +// @auto_tuned: Enables dynamic adjustment of rate limit within the range +// `[rate_bytes_per_sec / 20, rate_bytes_per_sec]`, according to +// the recent demand for background I/O. 
extern RateLimiter* NewGenericRateLimiter( int64_t rate_bytes_per_sec, int64_t refill_period_us = 100 * 1000, int32_t fairness = 10, - RateLimiter::Mode mode = RateLimiter::Mode::kWritesOnly); + RateLimiter::Mode mode = RateLimiter::Mode::kWritesOnly, + bool auto_tuned = false); } // namespace rocksdb diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc index a23a247fe..3fa45a693 100644 --- a/tools/db_bench_tool.cc +++ b/tools/db_bench_tool.cc @@ -826,6 +826,10 @@ DEFINE_int32(rate_limit_delay_max_milliseconds, 1000, DEFINE_uint64(rate_limiter_bytes_per_sec, 0, "Set options.rate_limiter value."); +DEFINE_bool(rate_limiter_auto_tuned, false, + "Enable dynamic adjustment of rate limit according to demand for " + "background I/O"); + DEFINE_bool(rate_limit_bg_reads, false, "Use options.rate_limiter on compaction reads"); @@ -3257,7 +3261,8 @@ void VerifyDBFromDB(std::string& truth_db_name) { FLAGS_rate_limiter_bytes_per_sec, 100 * 1000 /* refill_period_us */, 10 /* fairness */, FLAGS_rate_limit_bg_reads ? RateLimiter::Mode::kReadsOnly - : RateLimiter::Mode::kWritesOnly)); + : RateLimiter::Mode::kWritesOnly, + FLAGS_rate_limiter_auto_tuned)); } if (FLAGS_num_multi_db <= 1) { diff --git a/util/rate_limiter.cc b/util/rate_limiter.cc index b9160b250..9d23c38f7 100644 --- a/util/rate_limiter.cc +++ b/util/rate_limiter.cc @@ -45,13 +45,15 @@ struct GenericRateLimiter::Req { GenericRateLimiter::GenericRateLimiter(int64_t rate_bytes_per_sec, int64_t refill_period_us, - int32_t fairness, RateLimiter::Mode mode) + int32_t fairness, RateLimiter::Mode mode, + Env* env, bool auto_tuned) : RateLimiter(mode), refill_period_us_(refill_period_us), - rate_bytes_per_sec_(rate_bytes_per_sec), + rate_bytes_per_sec_(auto_tuned ? 
rate_bytes_per_sec / 2 + : rate_bytes_per_sec), refill_bytes_per_period_( - CalculateRefillBytesPerPeriod(rate_bytes_per_sec)), - env_(Env::Default()), + CalculateRefillBytesPerPeriod(rate_bytes_per_sec_)), + env_(env), stop_(false), exit_cv_(&request_mutex_), requests_to_wait_(0), @@ -59,7 +61,12 @@ GenericRateLimiter::GenericRateLimiter(int64_t rate_bytes_per_sec, next_refill_us_(NowMicrosMonotonic(env_)), fairness_(fairness > 100 ? 100 : fairness), rnd_((uint32_t)time(nullptr)), - leader_(nullptr) { + leader_(nullptr), + auto_tuned_(auto_tuned), + num_drains_(0), + prev_num_drains_(0), + max_bytes_per_sec_(rate_bytes_per_sec), + tuned_time_(NowMicrosMonotonic(env_)) { total_requests_[0] = 0; total_requests_[1] = 0; total_bytes_through_[0] = 0; @@ -98,6 +105,16 @@ void GenericRateLimiter::Request(int64_t bytes, const Env::IOPriority pri, TEST_SYNC_POINT_CALLBACK("GenericRateLimiter::Request:1", &rate_bytes_per_sec_); MutexLock g(&request_mutex_); + + if (auto_tuned_) { + static const int kRefillsPerTune = 100; + std::chrono::microseconds now(NowMicrosMonotonic(env_)); + if (now - tuned_time_ >= + kRefillsPerTune * std::chrono::microseconds(refill_period_us_)) { + Tune(); + } + } + if (stop_) { return; } @@ -138,6 +155,7 @@ void GenericRateLimiter::Request(int64_t bytes, const Env::IOPriority pri, } else { int64_t wait_until = env_->NowMicros() + delta; RecordTick(stats, NUMBER_RATE_LIMITER_DRAINS); + ++num_drains_; timedout = r.cv.TimedWait(wait_until); } } else { @@ -256,15 +274,66 @@ int64_t GenericRateLimiter::CalculateRefillBytesPerPeriod( } } +Status GenericRateLimiter::Tune() { + const int kLowWatermarkPct = 50; + const int kHighWatermarkPct = 90; + const int kAdjustFactorPct = 5; + // computed rate limit will be in + // `[max_bytes_per_sec_ / kAllowedRangeFactor, max_bytes_per_sec_]`. 
+ const int kAllowedRangeFactor = 20; + + std::chrono::microseconds prev_tuned_time = tuned_time_; + tuned_time_ = std::chrono::microseconds(NowMicrosMonotonic(env_)); + + int64_t elapsed_intervals = (tuned_time_ - prev_tuned_time + + std::chrono::microseconds(refill_period_us_) - + std::chrono::microseconds(1)) / + std::chrono::microseconds(refill_period_us_); + // We tune every kRefillsPerTune intervals, so the overflow and division-by- + // zero conditions should never happen. + assert(num_drains_ - prev_num_drains_ <= port::kMaxInt64 / 100); + assert(elapsed_intervals > 0); + int64_t drained_pct = + (num_drains_ - prev_num_drains_) * 100 / elapsed_intervals; + + int64_t prev_bytes_per_sec = GetBytesPerSecond(); + int64_t new_bytes_per_sec; + if (drained_pct == 0) { + new_bytes_per_sec = max_bytes_per_sec_ / kAllowedRangeFactor; + } else if (drained_pct < kLowWatermarkPct) { + // sanitize to prevent overflow + int64_t sanitized_prev_bytes_per_sec = + std::min(prev_bytes_per_sec, port::kMaxInt64 / 100); + new_bytes_per_sec = + std::max(max_bytes_per_sec_ / kAllowedRangeFactor, + sanitized_prev_bytes_per_sec * 100 / (100 + kAdjustFactorPct)); + } else if (drained_pct > kHighWatermarkPct) { + // sanitize to prevent overflow + int64_t sanitized_prev_bytes_per_sec = std::min( + prev_bytes_per_sec, port::kMaxInt64 / (100 + kAdjustFactorPct)); + new_bytes_per_sec = + std::min(max_bytes_per_sec_, + sanitized_prev_bytes_per_sec * (100 + kAdjustFactorPct) / 100); + } else { + new_bytes_per_sec = prev_bytes_per_sec; + } + if (new_bytes_per_sec != prev_bytes_per_sec) { + SetBytesPerSecond(new_bytes_per_sec); + } + prev_num_drains_ = num_drains_; // baseline for the next Tune() delta + return Status::OK(); +} + RateLimiter* NewGenericRateLimiter( int64_t rate_bytes_per_sec, int64_t refill_period_us /* = 100 * 1000 */, int32_t fairness /* = 10 */, - RateLimiter::Mode mode /* = RateLimiter::Mode::kWritesOnly */) { + RateLimiter::Mode mode /* = RateLimiter::Mode::kWritesOnly */, + bool auto_tuned /* = false */) { 
assert(rate_bytes_per_sec > 0); assert(refill_period_us > 0); assert(fairness > 0); return new GenericRateLimiter(rate_bytes_per_sec, refill_period_us, fairness, - mode); + mode, Env::Default(), auto_tuned); } } // namespace rocksdb diff --git a/util/rate_limiter.h b/util/rate_limiter.h index 0564bd07c..cb91f0aeb 100644 --- a/util/rate_limiter.h +++ b/util/rate_limiter.h @@ -11,20 +11,21 @@ #include #include +#include #include #include "port/port.h" -#include "util/mutexlock.h" -#include "util/random.h" #include "rocksdb/env.h" #include "rocksdb/rate_limiter.h" +#include "util/mutexlock.h" +#include "util/random.h" namespace rocksdb { class GenericRateLimiter : public RateLimiter { public: GenericRateLimiter(int64_t refill_bytes, int64_t refill_period_us, - int32_t fairness, - RateLimiter::Mode mode = RateLimiter::Mode::kWritesOnly); + int32_t fairness, RateLimiter::Mode mode, Env* env, + bool auto_tuned); virtual ~GenericRateLimiter(); @@ -68,6 +69,8 @@ class GenericRateLimiter : public RateLimiter { private: void Refill(); int64_t CalculateRefillBytesPerPeriod(int64_t rate_bytes_per_sec); + Status Tune(); + uint64_t NowMicrosMonotonic(Env* env) { return env->NowNanos() / std::milli::den; } @@ -99,6 +102,12 @@ class GenericRateLimiter : public RateLimiter { struct Req; Req* leader_; std::deque queue_[Env::IO_TOTAL]; + + bool auto_tuned_; + int64_t num_drains_; + int64_t prev_num_drains_; + const int64_t max_bytes_per_sec_; + std::chrono::microseconds tuned_time_; }; } // namespace rocksdb diff --git a/util/rate_limiter_test.cc b/util/rate_limiter_test.cc index f099808b5..95da6cd53 100644 --- a/util/rate_limiter_test.cc +++ b/util/rate_limiter_test.cc @@ -12,8 +12,12 @@ #endif #include "util/rate_limiter.h" + #include +#include #include + +#include "db/db_test_util.h" #include "rocksdb/env.h" #include "util/random.h" #include "util/sync_point.h" @@ -25,7 +29,9 @@ namespace rocksdb { class RateLimiterTest : public testing::Test {}; TEST_F(RateLimiterTest, 
OverflowRate) { - GenericRateLimiter limiter(port::kMaxInt64, 1000, 10); + GenericRateLimiter limiter(port::kMaxInt64, 1000, 10, + RateLimiter::Mode::kWritesOnly, Env::Default(), + false /* auto_tuned */); ASSERT_GT(limiter.GetSingleBurstBytes(), 1000000000ll); } @@ -36,9 +42,9 @@ TEST_F(RateLimiterTest, StartStop) { TEST_F(RateLimiterTest, Modes) { for (auto mode : {RateLimiter::Mode::kWritesOnly, RateLimiter::Mode::kReadsOnly, RateLimiter::Mode::kAllIo}) { - GenericRateLimiter limiter(2000 /* rate_bytes_per_sec */, - 1000 * 1000 /* refill_period_us */, - 10 /* fairness */, mode); + GenericRateLimiter limiter( + 2000 /* rate_bytes_per_sec */, 1000 * 1000 /* refill_period_us */, + 10 /* fairness */, mode, Env::Default(), false /* auto_tuned */); limiter.Request(1000 /* bytes */, Env::IO_HIGH, nullptr /* stats */, RateLimiter::OpType::kRead); if (mode == RateLimiter::Mode::kWritesOnly) { @@ -147,7 +153,9 @@ TEST_F(RateLimiterTest, LimitChangeTest) { // refill per second for (int iter = 0; iter < 2; iter++) { std::shared_ptr limiter = - std::make_shared(target, refill_period, 10); + std::make_shared( + target, refill_period, 10, RateLimiter::Mode::kWritesOnly, + Env::Default(), false /* auto_tuned */); rocksdb::SyncPoint::GetInstance()->LoadDependency( {{"GenericRateLimiter::Request", "RateLimiterTest::LimitChangeTest:changeLimitStart"}, @@ -172,6 +180,57 @@ TEST_F(RateLimiterTest, LimitChangeTest) { } } +TEST_F(RateLimiterTest, AutoTuneIncreaseWhenFull) { + const std::chrono::seconds kTimePerRefill(1); + const int kRefillsPerTune = 100; // needs to match util/rate_limiter.cc + + SpecialEnv special_env(Env::Default()); + special_env.no_slowdown_ = true; + special_env.time_elapse_only_sleep_ = true; + + auto stats = CreateDBStatistics(); + std::unique_ptr rate_limiter(new GenericRateLimiter( + 1000 /* rate_bytes_per_sec */, + std::chrono::microseconds(kTimePerRefill).count(), 10 /* fairness */, + RateLimiter::Mode::kWritesOnly, &special_env, true /* auto_tuned */)); + 
+ // Use callback to advance time because we need to advance (1) after Request() + // has determined the bytes are not available; and (2) before Refill() + // computes the next refill time (ensuring refill time in the future allows + // the next request to drain the rate limiter). + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "GenericRateLimiter::Refill", [&](void* arg) { + special_env.SleepForMicroseconds(static_cast( + std::chrono::microseconds(kTimePerRefill).count())); + }); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + + // verify rate limit increases after a sequence of periods where rate limiter + // is always drained + int64_t orig_bytes_per_sec = rate_limiter->GetSingleBurstBytes(); + rate_limiter->Request(orig_bytes_per_sec, Env::IO_HIGH, stats.get(), + RateLimiter::OpType::kWrite); + while (std::chrono::microseconds(special_env.NowMicros()) <= + kRefillsPerTune * kTimePerRefill) { + rate_limiter->Request(orig_bytes_per_sec, Env::IO_HIGH, stats.get(), + RateLimiter::OpType::kWrite); + } + int64_t new_bytes_per_sec = rate_limiter->GetSingleBurstBytes(); + ASSERT_GT(new_bytes_per_sec, orig_bytes_per_sec); + + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); + + // decreases after a sequence of periods where rate limiter is not drained + orig_bytes_per_sec = new_bytes_per_sec; + special_env.SleepForMicroseconds(static_cast( + kRefillsPerTune * std::chrono::microseconds(kTimePerRefill).count())); + // make a request so tuner can be triggered + rate_limiter->Request(1 /* bytes */, Env::IO_HIGH, stats.get(), + RateLimiter::OpType::kWrite); + new_bytes_per_sec = rate_limiter->GetSingleBurstBytes(); + ASSERT_LT(new_bytes_per_sec, orig_bytes_per_sec); +} + } // namespace rocksdb int main(int argc, char** argv) {