diff --git a/util/rate_limiter.cc b/util/rate_limiter.cc index 4e836d030..9f2a84e43 100644 --- a/util/rate_limiter.cc +++ b/util/rate_limiter.cc @@ -10,6 +10,7 @@ #include "util/rate_limiter.h" #include "port/port.h" #include "rocksdb/env.h" +#include "util/sync_point.h" namespace rocksdb { @@ -17,7 +18,8 @@ namespace rocksdb { // Pending request struct GenericRateLimiter::Req { explicit Req(int64_t _bytes, port::Mutex* _mu) - : bytes(_bytes), cv(_mu), granted(false) {} + : request_bytes(_bytes), bytes(_bytes), cv(_mu), granted(false) {} + int64_t request_bytes; int64_t bytes; port::CondVar cv; bool granted; @@ -70,7 +72,7 @@ void GenericRateLimiter::SetBytesPerSecond(int64_t bytes_per_second) { void GenericRateLimiter::Request(int64_t bytes, const Env::IOPriority pri) { assert(bytes <= refill_bytes_per_period_.load(std::memory_order_relaxed)); - + TEST_SYNC_POINT("GenericRateLimiter::Request"); MutexLock g(&request_mutex_); if (stop_) { return; @@ -175,6 +177,7 @@ void GenericRateLimiter::Request(int64_t bytes, const Env::IOPriority pri) { } void GenericRateLimiter::Refill() { + TEST_SYNC_POINT("GenericRateLimiter::Refill"); next_refill_us_ = env_->NowMicros() + refill_period_us_; // Carry over the left over quota from the last period auto refill_bytes_per_period = @@ -189,10 +192,14 @@ void GenericRateLimiter::Refill() { auto* queue = &queue_[use_pri]; while (!queue->empty()) { auto* next_req = queue->front(); - if (available_bytes_ < next_req->bytes) { + if (available_bytes_ < next_req->request_bytes) { + // avoid starvation + next_req->request_bytes -= available_bytes_; + available_bytes_ = 0; break; } - available_bytes_ -= next_req->bytes; + available_bytes_ -= next_req->request_bytes; + next_req->request_bytes = 0; total_bytes_through_[use_pri] += next_req->bytes; queue->pop_front(); diff --git a/util/rate_limiter_test.cc b/util/rate_limiter_test.cc index d1152ed56..43fa5ef45 100644 --- a/util/rate_limiter_test.cc +++ b/util/rate_limiter_test.cc @@ -11,12 +11,13 @@ #define __STDC_FORMAT_MACROS #endif +#include "util/rate_limiter.h" #include #include -#include "util/testharness.h" -#include "util/rate_limiter.h" -#include "util/random.h" #include "rocksdb/env.h" +#include "util/random.h" +#include "util/sync_point.h" +#include "util/testharness.h" namespace rocksdb { @@ -92,6 +93,55 @@ TEST_F(RateLimiterTest, Rate) { } } +TEST_F(RateLimiterTest, LimitChangeTest) { + // starvation test when limit changes to a smaller value + int64_t refill_period = 1000 * 1000; + auto* env = Env::Default(); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + struct Arg { + Arg(int32_t _request_size, Env::IOPriority _pri, + std::shared_ptr _limiter) + : request_size(_request_size), pri(_pri), limiter(_limiter) {} + int32_t request_size; + Env::IOPriority pri; + std::shared_ptr limiter; + }; + + auto writer = [](void* p) { + auto* arg = static_cast(p); + arg->limiter->Request(arg->request_size, arg->pri); + }; + + for (uint32_t i = 1; i <= 16; i <<= 1) { + int32_t target = i * 1024 * 10; + // refill per second + for (int iter = 0; iter < 2; iter++) { + std::shared_ptr limiter = + std::make_shared(target, refill_period, 10); + rocksdb::SyncPoint::GetInstance()->LoadDependency( + {{"GenericRateLimiter::Request", + "RateLimiterTest::LimitChangeTest:changeLimitStart"}, + {"RateLimiterTest::LimitChangeTest:changeLimitEnd", + "GenericRateLimiter::Refill"}}); + Arg arg(target, Env::IO_HIGH, limiter); + // The idea behind is to start a request first, then before it refills, + // update limit to a different value (2X/0.5X). No starvation should + // be guaranteed under any situation + // TODO(lightmark): more test cases are welcome. + env->StartThread(writer, &arg); + int32_t new_limit = (target << 1) >> (iter << 1); + TEST_SYNC_POINT("RateLimiterTest::LimitChangeTest:changeLimitStart"); + arg.limiter->SetBytesPerSecond(new_limit); + TEST_SYNC_POINT("RateLimiterTest::LimitChangeTest:changeLimitEnd"); + env->WaitForJoin(); + fprintf(stderr, + "[COMPLETE] request size %" PRIi32 " KB, new limit %" PRIi32 + "KB/sec, refill period %" PRIi64 " ms\n", + target / 1024, new_limit / 1024, refill_period / 1000); + } + } +} + } // namespace rocksdb int main(int argc, char** argv) {