diff --git a/HISTORY.md b/HISTORY.md
index 7c1aaaba0..6f2c16f80 100644
--- a/HISTORY.md
+++ b/HISTORY.md
@@ -5,6 +5,8 @@
 ### New Features
 * `DBOptions::bytes_per_sync` and `DBOptions::wal_bytes_per_sync` can now be changed dynamically, `DBOptions::wal_bytes_per_sync` will flush all memtables and switch to a new WAL file.
+* Support dynamic adjustment of rate limit according to demand for background I/O. It can be enabled by passing `true` to the `auto_tuned` parameter in `NewGenericRateLimiter()`. The value passed as `rate_bytes_per_sec` will still be respected as an upper bound.
+
 ### Bug Fixes
 * Fix a potential data inconsistency issue during point-in-time recovery. `DB::Open()` will abort if column family inconsistency is found during PIT recovery.
diff --git a/Makefile b/Makefile
index 487bc65df..791ffc7a3 100644
--- a/Makefile
+++ b/Makefile
@@ -1272,7 +1272,7 @@ env_test: env/env_test.o $(LIBOBJECTS) $(TESTHARNESS)
 fault_injection_test: db/fault_injection_test.o $(LIBOBJECTS) $(TESTHARNESS)
 	$(AM_LINK)
 
-rate_limiter_test: util/rate_limiter_test.o $(LIBOBJECTS) $(TESTHARNESS)
+rate_limiter_test: util/rate_limiter_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
 	$(AM_LINK)
 
 delete_scheduler_test: util/delete_scheduler_test.o $(LIBOBJECTS) $(TESTHARNESS)
diff --git a/include/rocksdb/rate_limiter.h b/include/rocksdb/rate_limiter.h
index 838c98a6d..841838035 100644
--- a/include/rocksdb/rate_limiter.h
+++ b/include/rocksdb/rate_limiter.h
@@ -127,9 +127,13 @@ class RateLimiter {
 // 1/fairness chance even though high-pri requests exist to avoid starvation.
 // You should be good by leaving it at default 10.
 // @mode: Mode indicates which types of operations count against the limit.
+// @auto_tuned: Enables dynamic adjustment of rate limit within the range
+//              `[rate_bytes_per_sec / 20, rate_bytes_per_sec]`, according to
+//              the recent demand for background I/O.
 extern RateLimiter* NewGenericRateLimiter(
     int64_t rate_bytes_per_sec, int64_t refill_period_us = 100 * 1000,
     int32_t fairness = 10,
-    RateLimiter::Mode mode = RateLimiter::Mode::kWritesOnly);
+    RateLimiter::Mode mode = RateLimiter::Mode::kWritesOnly,
+    bool auto_tuned = false);
 
 }  // namespace rocksdb
diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc
index a23a247fe..3fa45a693 100644
--- a/tools/db_bench_tool.cc
+++ b/tools/db_bench_tool.cc
@@ -826,6 +826,10 @@ DEFINE_int32(rate_limit_delay_max_milliseconds, 1000,
 DEFINE_uint64(rate_limiter_bytes_per_sec, 0, "Set options.rate_limiter value.");
 
+DEFINE_bool(rate_limiter_auto_tuned, false,
+            "Enable dynamic adjustment of rate limit according to demand for "
+            "background I/O");
+
 DEFINE_bool(rate_limit_bg_reads, false,
             "Use options.rate_limiter on compaction reads");
 
@@ -3257,7 +3261,8 @@ void VerifyDBFromDB(std::string& truth_db_name) {
           FLAGS_rate_limiter_bytes_per_sec, 100 * 1000 /* refill_period_us */,
           10 /* fairness */,
           FLAGS_rate_limit_bg_reads ? RateLimiter::Mode::kReadsOnly
-                                    : RateLimiter::Mode::kWritesOnly));
+                                    : RateLimiter::Mode::kWritesOnly,
+          FLAGS_rate_limiter_auto_tuned));
     }
 
     if (FLAGS_num_multi_db <= 1) {
diff --git a/util/rate_limiter.cc b/util/rate_limiter.cc
index b9160b250..9d23c38f7 100644
--- a/util/rate_limiter.cc
+++ b/util/rate_limiter.cc
@@ -45,13 +45,15 @@ struct GenericRateLimiter::Req {
 GenericRateLimiter::GenericRateLimiter(int64_t rate_bytes_per_sec,
                                        int64_t refill_period_us,
-                                       int32_t fairness, RateLimiter::Mode mode)
+                                       int32_t fairness, RateLimiter::Mode mode,
+                                       Env* env, bool auto_tuned)
     : RateLimiter(mode),
      refill_period_us_(refill_period_us),
-      rate_bytes_per_sec_(rate_bytes_per_sec),
+      rate_bytes_per_sec_(auto_tuned ? rate_bytes_per_sec / 2
+                                     : rate_bytes_per_sec),
      refill_bytes_per_period_(
-          CalculateRefillBytesPerPeriod(rate_bytes_per_sec)),
-      env_(Env::Default()),
+          CalculateRefillBytesPerPeriod(rate_bytes_per_sec_)),
+      env_(env),
      stop_(false),
      exit_cv_(&request_mutex_),
      requests_to_wait_(0),
@@ -59,7 +61,12 @@ GenericRateLimiter::GenericRateLimiter(int64_t rate_bytes_per_sec,
      next_refill_us_(NowMicrosMonotonic(env_)),
      fairness_(fairness > 100 ? 100 : fairness),
      rnd_((uint32_t)time(nullptr)),
-      leader_(nullptr) {
+      leader_(nullptr),
+      auto_tuned_(auto_tuned),
+      num_drains_(0),
+      prev_num_drains_(0),
+      max_bytes_per_sec_(rate_bytes_per_sec),
+      tuned_time_(NowMicrosMonotonic(env_)) {
   total_requests_[0] = 0;
   total_requests_[1] = 0;
   total_bytes_through_[0] = 0;
@@ -98,6 +105,16 @@ void GenericRateLimiter::Request(int64_t bytes, const Env::IOPriority pri,
   TEST_SYNC_POINT_CALLBACK("GenericRateLimiter::Request:1",
                            &rate_bytes_per_sec_);
   MutexLock g(&request_mutex_);
+
+  if (auto_tuned_) {
+    static const int kRefillsPerTune = 100;
+    std::chrono::microseconds now(NowMicrosMonotonic(env_));
+    if (now - tuned_time_ >=
+        kRefillsPerTune * std::chrono::microseconds(refill_period_us_)) {
+      Tune();
+    }
+  }
+
   if (stop_) {
     return;
   }
@@ -138,6 +155,7 @@ void GenericRateLimiter::Request(int64_t bytes, const Env::IOPriority pri,
      } else {
        int64_t wait_until = env_->NowMicros() + delta;
        RecordTick(stats, NUMBER_RATE_LIMITER_DRAINS);
+        ++num_drains_;
        timedout = r.cv.TimedWait(wait_until);
      }
    } else {
@@ -256,15 +274,66 @@ int64_t GenericRateLimiter::CalculateRefillBytesPerPeriod(
   }
 }
 
+Status GenericRateLimiter::Tune() {
+  const int kLowWatermarkPct = 50;
+  const int kHighWatermarkPct = 90;
+  const int kAdjustFactorPct = 5;
+  // computed rate limit will be in
+  // `[max_bytes_per_sec_ / kAllowedRangeFactor, max_bytes_per_sec_]`.
+  const int kAllowedRangeFactor = 20;
+
+  std::chrono::microseconds prev_tuned_time = tuned_time_;
+  tuned_time_ = std::chrono::microseconds(NowMicrosMonotonic(env_));
+
+  int64_t elapsed_intervals = (tuned_time_ - prev_tuned_time +
+                               std::chrono::microseconds(refill_period_us_) -
+                               std::chrono::microseconds(1)) /
+                              std::chrono::microseconds(refill_period_us_);
+  // We tune every kRefillsPerTune intervals, so the overflow and division-by-
+  // zero conditions should never happen.
+  assert(num_drains_ - prev_num_drains_ <= port::kMaxInt64 / 100);
+  assert(elapsed_intervals > 0);
+  int64_t drained_pct =
+      (num_drains_ - prev_num_drains_) * 100 / elapsed_intervals;
+
+  int64_t prev_bytes_per_sec = GetBytesPerSecond();
+  int64_t new_bytes_per_sec;
+  if (drained_pct == 0) {
+    new_bytes_per_sec = max_bytes_per_sec_ / kAllowedRangeFactor;
+  } else if (drained_pct < kLowWatermarkPct) {
+    // sanitize to prevent overflow
+    int64_t sanitized_prev_bytes_per_sec =
+        std::min(prev_bytes_per_sec, port::kMaxInt64 / 100);
+    new_bytes_per_sec =
+        std::max(max_bytes_per_sec_ / kAllowedRangeFactor,
+                 sanitized_prev_bytes_per_sec * 100 / (100 + kAdjustFactorPct));
+  } else if (drained_pct > kHighWatermarkPct) {
+    // sanitize to prevent overflow
+    int64_t sanitized_prev_bytes_per_sec = std::min(
+        prev_bytes_per_sec, port::kMaxInt64 / (100 + kAdjustFactorPct));
+    new_bytes_per_sec =
+        std::min(max_bytes_per_sec_,
+                 sanitized_prev_bytes_per_sec * (100 + kAdjustFactorPct) / 100);
+  } else {
+    new_bytes_per_sec = prev_bytes_per_sec;
+  }
+  if (new_bytes_per_sec != prev_bytes_per_sec) {
+    SetBytesPerSecond(new_bytes_per_sec);
+  }
+  num_drains_ = prev_num_drains_;
+  return Status::OK();
+}
+
 RateLimiter* NewGenericRateLimiter(
     int64_t rate_bytes_per_sec, int64_t refill_period_us /* = 100 * 1000 */,
     int32_t fairness /* = 10 */,
-    RateLimiter::Mode mode /* = RateLimiter::Mode::kWritesOnly */) {
+    RateLimiter::Mode mode /* = RateLimiter::Mode::kWritesOnly */,
+    bool auto_tuned /* = false */) {
   assert(rate_bytes_per_sec > 0);
   assert(refill_period_us > 0);
   assert(fairness > 0);
   return new GenericRateLimiter(rate_bytes_per_sec, refill_period_us, fairness,
-                                mode);
+                                mode, Env::Default(), auto_tuned);
 }
 
 }  // namespace rocksdb
diff --git a/util/rate_limiter.h b/util/rate_limiter.h
index 0564bd07c..cb91f0aeb 100644
--- a/util/rate_limiter.h
+++ b/util/rate_limiter.h
@@ -11,20 +11,21 @@
 #include 
 #include 
+#include <chrono>
 #include 
 
 #include "port/port.h"
-#include "util/mutexlock.h"
-#include "util/random.h"
 #include "rocksdb/env.h"
 #include "rocksdb/rate_limiter.h"
+#include "util/mutexlock.h"
+#include "util/random.h"
 
 namespace rocksdb {
 
 class GenericRateLimiter : public RateLimiter {
  public:
   GenericRateLimiter(int64_t refill_bytes, int64_t refill_period_us,
-                     int32_t fairness,
-                     RateLimiter::Mode mode = RateLimiter::Mode::kWritesOnly);
+                     int32_t fairness, RateLimiter::Mode mode, Env* env,
+                     bool auto_tuned);
 
   virtual ~GenericRateLimiter();
 
@@ -68,6 +69,8 @@ class GenericRateLimiter : public RateLimiter {
  private:
   void Refill();
   int64_t CalculateRefillBytesPerPeriod(int64_t rate_bytes_per_sec);
+  Status Tune();
+
   uint64_t NowMicrosMonotonic(Env* env) {
     return env->NowNanos() / std::milli::den;
   }
@@ -99,6 +102,12 @@ class GenericRateLimiter : public RateLimiter {
   struct Req;
   Req* leader_;
   std::deque<Req*> queue_[Env::IO_TOTAL];
+
+  bool auto_tuned_;
+  int64_t num_drains_;
+  int64_t prev_num_drains_;
+  const int64_t max_bytes_per_sec_;
+  std::chrono::microseconds tuned_time_;
 };
 
 }  // namespace rocksdb
diff --git a/util/rate_limiter_test.cc b/util/rate_limiter_test.cc
index f099808b5..95da6cd53 100644
--- a/util/rate_limiter_test.cc
+++ b/util/rate_limiter_test.cc
@@ -12,8 +12,12 @@
 #endif
 
 #include "util/rate_limiter.h"
+
 #include 
+#include <chrono>
 #include 
+
+#include "db/db_test_util.h"
 #include "rocksdb/env.h"
 #include "util/random.h"
 #include "util/sync_point.h"
@@ -25,7 +29,9 @@ namespace rocksdb {
 class RateLimiterTest : public testing::Test {};
 
 TEST_F(RateLimiterTest, OverflowRate) {
-  GenericRateLimiter limiter(port::kMaxInt64, 1000, 10);
+  GenericRateLimiter limiter(port::kMaxInt64, 1000, 10,
+                             RateLimiter::Mode::kWritesOnly, Env::Default(),
+                             false /* auto_tuned */);
   ASSERT_GT(limiter.GetSingleBurstBytes(), 1000000000ll);
 }
 
@@ -36,9 +42,9 @@ TEST_F(RateLimiterTest, StartStop) {
 TEST_F(RateLimiterTest, Modes) {
   for (auto mode : {RateLimiter::Mode::kWritesOnly,
                     RateLimiter::Mode::kReadsOnly, RateLimiter::Mode::kAllIo}) {
-    GenericRateLimiter limiter(2000 /* rate_bytes_per_sec */,
-                               1000 * 1000 /* refill_period_us */,
-                               10 /* fairness */, mode);
+    GenericRateLimiter limiter(
+        2000 /* rate_bytes_per_sec */, 1000 * 1000 /* refill_period_us */,
+        10 /* fairness */, mode, Env::Default(), false /* auto_tuned */);
     limiter.Request(1000 /* bytes */, Env::IO_HIGH, nullptr /* stats */,
                     RateLimiter::OpType::kRead);
     if (mode == RateLimiter::Mode::kWritesOnly) {
@@ -147,7 +153,9 @@ TEST_F(RateLimiterTest, LimitChangeTest) {
   // refill per second
   for (int iter = 0; iter < 2; iter++) {
     std::shared_ptr<RateLimiter> limiter =
-        std::make_shared<GenericRateLimiter>(target, refill_period, 10);
+        std::make_shared<GenericRateLimiter>(
+            target, refill_period, 10, RateLimiter::Mode::kWritesOnly,
+            Env::Default(), false /* auto_tuned */);
     rocksdb::SyncPoint::GetInstance()->LoadDependency(
         {{"GenericRateLimiter::Request",
           "RateLimiterTest::LimitChangeTest:changeLimitStart"},
@@ -172,6 +180,57 @@
   }
 }
 
+TEST_F(RateLimiterTest, AutoTuneIncreaseWhenFull) {
+  const std::chrono::seconds kTimePerRefill(1);
+  const int kRefillsPerTune = 100;  // needs to match util/rate_limiter.cc
+
+  SpecialEnv special_env(Env::Default());
+  special_env.no_slowdown_ = true;
+  special_env.time_elapse_only_sleep_ = true;
+
+  auto stats = CreateDBStatistics();
+  std::unique_ptr<RateLimiter> rate_limiter(new GenericRateLimiter(
+      1000 /* rate_bytes_per_sec */,
+      std::chrono::microseconds(kTimePerRefill).count(), 10 /* fairness */,
+      RateLimiter::Mode::kWritesOnly, &special_env, true /* auto_tuned */));
+
+  // Use callback to advance time because we need to advance (1) after Request()
+  // has determined the bytes are not available; and (2) before Refill()
+  // computes the next refill time (ensuring refill time in the future allows
+  // the next request to drain the rate limiter).
+  rocksdb::SyncPoint::GetInstance()->SetCallBack(
+      "GenericRateLimiter::Refill", [&](void* arg) {
+        special_env.SleepForMicroseconds(static_cast<int>(
+            std::chrono::microseconds(kTimePerRefill).count()));
+      });
+  rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+
+  // verify rate limit increases after a sequence of periods where rate limiter
+  // is always drained
+  int64_t orig_bytes_per_sec = rate_limiter->GetSingleBurstBytes();
+  rate_limiter->Request(orig_bytes_per_sec, Env::IO_HIGH, stats.get(),
+                        RateLimiter::OpType::kWrite);
+  while (std::chrono::microseconds(special_env.NowMicros()) <=
+         kRefillsPerTune * kTimePerRefill) {
+    rate_limiter->Request(orig_bytes_per_sec, Env::IO_HIGH, stats.get(),
+                          RateLimiter::OpType::kWrite);
+  }
+  int64_t new_bytes_per_sec = rate_limiter->GetSingleBurstBytes();
+  ASSERT_GT(new_bytes_per_sec, orig_bytes_per_sec);
+
+  rocksdb::SyncPoint::GetInstance()->DisableProcessing();
+
+  // decreases after a sequence of periods where rate limiter is not drained
+  orig_bytes_per_sec = new_bytes_per_sec;
+  special_env.SleepForMicroseconds(static_cast<int>(
+      kRefillsPerTune * std::chrono::microseconds(kTimePerRefill).count()));
+  // make a request so tuner can be triggered
+  rate_limiter->Request(1 /* bytes */, Env::IO_HIGH, stats.get(),
+                        RateLimiter::OpType::kWrite);
+  new_bytes_per_sec = rate_limiter->GetSingleBurstBytes();
+  ASSERT_LT(new_bytes_per_sec, orig_bytes_per_sec);
+}
+
 }  // namespace rocksdb
 
 int main(int argc, char** argv) {
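
For reference, a minimal usage sketch of the API this diff adds (not part of the change itself): it builds an auto-tuned rate limiter with `NewGenericRateLimiter()` and installs it in `Options::rate_limiter`, mirroring what db_bench does above when `--rate_limiter_auto_tuned` is passed. The 64 MB/s cap and the database path are placeholder values.

    #include <memory>

    #include "rocksdb/db.h"
    #include "rocksdb/options.h"
    #include "rocksdb/rate_limiter.h"

    int main() {
      rocksdb::Options options;
      options.create_if_missing = true;

      // rate_bytes_per_sec acts as an upper bound: with auto_tuned enabled the
      // limiter starts at half of it and is adjusted within
      // [rate_bytes_per_sec / 20, rate_bytes_per_sec] based on recent drains.
      options.rate_limiter.reset(rocksdb::NewGenericRateLimiter(
          64 << 20 /* rate_bytes_per_sec: placeholder 64MB/s cap */,
          100 * 1000 /* refill_period_us */, 10 /* fairness */,
          rocksdb::RateLimiter::Mode::kWritesOnly, true /* auto_tuned */));

      rocksdb::DB* db = nullptr;
      rocksdb::Status s =
          rocksdb::DB::Open(options, "/tmp/auto_tuned_example", &db);
      if (!s.ok()) {
        return 1;
      }
      delete db;
      return 0;
    }

Concretely, per the constants in GenericRateLimiter::Tune() above, the tuner runs roughly every 100 refill periods: if the limiter was drained in more than 90% of the elapsed intervals the rate grows by 5% (capped at rate_bytes_per_sec), if it was drained in fewer than 50% it shrinks by about 5% (floored at rate_bytes_per_sec / 20), and if it was never drained it drops straight to the floor. The same behavior can be exercised from db_bench with --rate_limiter_bytes_per_sec plus the new --rate_limiter_auto_tuned flag.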