diff --git a/include/rocksdb/env.h b/include/rocksdb/env.h
index 76a3bcf79..c32abc4f1 100644
--- a/include/rocksdb/env.h
+++ b/include/rocksdb/env.h
@@ -434,7 +434,13 @@ class Env {
   static std::string PriorityToString(Priority priority);
 
   // Priority for requesting bytes in rate limiter scheduler
-  enum IOPriority { IO_LOW = 0, IO_HIGH = 1, IO_TOTAL = 2 };
+  enum IOPriority {
+    IO_LOW = 0,
+    IO_MID = 1,
+    IO_HIGH = 2,
+    IO_USER = 3,
+    IO_TOTAL = 4
+  };
 
   // Arrange to run "(*function)(arg)" once in a background thread, in
   // the thread pool specified by pri. By default, jobs go to the 'LOW'
diff --git a/util/rate_limiter.cc b/util/rate_limiter.cc
index a98400a9b..7d7936e5f 100644
--- a/util/rate_limiter.cc
+++ b/util/rate_limiter.cc
@@ -68,23 +68,28 @@ GenericRateLimiter::GenericRateLimiter(
       prev_num_drains_(0),
       max_bytes_per_sec_(rate_bytes_per_sec),
       tuned_time_(NowMicrosMonotonic()) {
-  total_requests_[0] = 0;
-  total_requests_[1] = 0;
-  total_bytes_through_[0] = 0;
-  total_bytes_through_[1] = 0;
+  for (int i = Env::IO_LOW; i < Env::IO_TOTAL; ++i) {
+    total_requests_[i] = 0;
+    total_bytes_through_[i] = 0;
+  }
 }
 
 GenericRateLimiter::~GenericRateLimiter() {
   MutexLock g(&request_mutex_);
   stop_ = true;
-  requests_to_wait_ = static_cast<int32_t>(queue_[Env::IO_LOW].size() +
-                                           queue_[Env::IO_HIGH].size());
-  for (auto& r : queue_[Env::IO_HIGH]) {
-    r->cv.Signal();
+  std::deque<Req*>::size_type queues_size_sum = 0;
+  for (int i = Env::IO_LOW; i < Env::IO_TOTAL; ++i) {
+    queues_size_sum += queue_[i].size();
   }
-  for (auto& r : queue_[Env::IO_LOW]) {
-    r->cv.Signal();
+  requests_to_wait_ = static_cast<int32_t>(queues_size_sum);
+
+  for (int i = Env::IO_TOTAL - 1; i >= Env::IO_LOW; --i) {
+    std::deque<Req*> queue = queue_[i];
+    for (auto& r : queue) {
+      r->cv.Signal();
+    }
   }
+
   while (requests_to_wait_ > 0) {
     exit_cv_.Wait();
   }
@@ -171,10 +176,12 @@ void GenericRateLimiter::Request(int64_t bytes, const Env::IOPriority pri,
       // If there is any remaining requests, make sure there exists at least
      // one candidate is awake for future duties by signaling a front request
      // of a queue.
-      if (!queue_[Env::IO_HIGH].empty()) {
-        queue_[Env::IO_HIGH].front()->cv.Signal();
-      } else if (!queue_[Env::IO_LOW].empty()) {
-        queue_[Env::IO_LOW].front()->cv.Signal();
+      for (int i = Env::IO_TOTAL - 1; i >= Env::IO_LOW; --i) {
+        std::deque<Req*> queue = queue_[i];
+        if (!queue.empty()) {
+          queue.front()->cv.Signal();
+          break;
+        }
       }
     }
   }
@@ -205,6 +212,45 @@ void GenericRateLimiter::Request(int64_t bytes, const Env::IOPriority pri,
   }
 }
 
+std::vector<Env::IOPriority>
+GenericRateLimiter::GeneratePriorityIterationOrder() {
+  std::vector<Env::IOPriority> pri_iteration_order(Env::IO_TOTAL /* 4 */);
+  // We make Env::IO_USER a superior priority by always iterating its queue
+  // first
+  pri_iteration_order[0] = Env::IO_USER;
+
+  bool high_pri_iterated_after_mid_low_pri = rnd_.OneIn(fairness_);
+  TEST_SYNC_POINT_CALLBACK(
+      "GenericRateLimiter::GeneratePriorityIterationOrder::"
+      "PostRandomOneInFairnessForHighPri",
+      &high_pri_iterated_after_mid_low_pri);
+  bool mid_pri_iterated_after_low_pri = rnd_.OneIn(fairness_);
+  TEST_SYNC_POINT_CALLBACK(
+      "GenericRateLimiter::GeneratePriorityIterationOrder::"
+      "PostRandomOneInFairnessForMidPri",
+      &mid_pri_iterated_after_low_pri);
+
+  if (high_pri_iterated_after_mid_low_pri) {
+    pri_iteration_order[3] = Env::IO_HIGH;
+    pri_iteration_order[2] =
+        mid_pri_iterated_after_low_pri ? Env::IO_MID : Env::IO_LOW;
+    pri_iteration_order[1] =
+        (pri_iteration_order[2] == Env::IO_MID) ? Env::IO_LOW : Env::IO_MID;
+  } else {
+    pri_iteration_order[1] = Env::IO_HIGH;
+    pri_iteration_order[3] =
+        mid_pri_iterated_after_low_pri ? Env::IO_MID : Env::IO_LOW;
+    pri_iteration_order[2] =
+        (pri_iteration_order[3] == Env::IO_MID) ? Env::IO_LOW : Env::IO_MID;
+  }
+
+  TEST_SYNC_POINT_CALLBACK(
+      "GenericRateLimiter::GeneratePriorityIterationOrder::"
+      "PreReturnPriIterationOrder",
+      &pri_iteration_order);
+  return pri_iteration_order;
+}
+
 void GenericRateLimiter::RefillBytesAndGrantRequests() {
   TEST_SYNC_POINT("GenericRateLimiter::RefillBytesAndGrantRequests");
   next_refill_us_ = NowMicrosMonotonic() + refill_period_us_;
@@ -215,10 +261,13 @@ void GenericRateLimiter::RefillBytesAndGrantRequests() {
     available_bytes_ += refill_bytes_per_period;
   }
 
-  int use_low_pri_first = rnd_.OneIn(fairness_) ? 0 : 1;
-  for (int q = 0; q < 2; ++q) {
-    auto use_pri = (use_low_pri_first == q) ? Env::IO_LOW : Env::IO_HIGH;
-    auto* queue = &queue_[use_pri];
+  std::vector<Env::IOPriority> pri_iteration_order =
+      GeneratePriorityIterationOrder();
+
+  for (int i = Env::IO_LOW; i < Env::IO_TOTAL; ++i) {
+    assert(!pri_iteration_order.empty());
+    Env::IOPriority current_pri = pri_iteration_order[i];
+    auto* queue = &queue_[current_pri];
     while (!queue->empty()) {
       auto* next_req = queue->front();
       if (available_bytes_ < next_req->request_bytes) {
@@ -232,7 +281,7 @@ void GenericRateLimiter::RefillBytesAndGrantRequests() {
       }
       available_bytes_ -= next_req->request_bytes;
       next_req->request_bytes = 0;
-      total_bytes_through_[use_pri] += next_req->bytes;
+      total_bytes_through_[current_pri] += next_req->bytes;
       queue->pop_front();
 
       next_req->granted = true;
diff --git a/util/rate_limiter.h b/util/rate_limiter.h
index aa5e6e5e1..f3f593c36 100644
--- a/util/rate_limiter.h
+++ b/util/rate_limiter.h
@@ -50,8 +50,11 @@ class GenericRateLimiter : public RateLimiter {
       const Env::IOPriority pri = Env::IO_TOTAL) const override {
     MutexLock g(&request_mutex_);
     if (pri == Env::IO_TOTAL) {
-      return total_bytes_through_[Env::IO_LOW] +
-             total_bytes_through_[Env::IO_HIGH];
+      int64_t total_bytes_through_sum = 0;
+      for (int i = Env::IO_LOW; i < Env::IO_TOTAL; ++i) {
+        total_bytes_through_sum += total_bytes_through_[i];
+      }
+      return total_bytes_through_sum;
     }
     return total_bytes_through_[pri];
   }
@@ -60,7 +63,11 @@ class GenericRateLimiter : public RateLimiter {
       const Env::IOPriority pri = Env::IO_TOTAL) const override {
     MutexLock g(&request_mutex_);
     if (pri == Env::IO_TOTAL) {
-      return total_requests_[Env::IO_LOW] + total_requests_[Env::IO_HIGH];
+      int64_t total_requests_sum = 0;
+      for (int i = Env::IO_LOW; i < Env::IO_TOTAL; ++i) {
+        total_requests_sum += total_requests_[i];
+      }
+      return total_requests_sum;
     }
     return total_requests_[pri];
   }
@@ -71,6 +78,7 @@ class GenericRateLimiter : public RateLimiter {
 
  private:
   void RefillBytesAndGrantRequests();
+  std::vector<Env::IOPriority> GeneratePriorityIterationOrder();
   int64_t CalculateRefillBytesPerPeriod(int64_t rate_bytes_per_sec);
   Status Tune();
 
diff --git a/util/rate_limiter_test.cc b/util/rate_limiter_test.cc
index a4b1de2f4..27bbc2d31 100644
--- a/util/rate_limiter_test.cc
+++ b/util/rate_limiter_test.cc
@@ -11,6 +11,7 @@
 
 #include <chrono>
 #include <cinttypes>
+#include <cstdint>
 #include <limits>
 
 #include "db/db_test_util.h"
@@ -35,6 +36,59 @@ TEST_F(RateLimiterTest, StartStop) {
   std::unique_ptr<RateLimiter> limiter(NewGenericRateLimiter(100, 100, 10));
 }
 
+TEST_F(RateLimiterTest, GetTotalBytesThrough) {
+  std::unique_ptr<RateLimiter> limiter(NewGenericRateLimiter(
+      20 /* rate_bytes_per_sec */, 1000 * 1000 /* refill_period_us */,
+      10 /* fairness */));
+  for (int i = Env::IO_LOW; i <= Env::IO_TOTAL; ++i) {
+    ASSERT_EQ(limiter->GetTotalBytesThrough(static_cast<Env::IOPriority>(i)),
+              0);
+  }
+
+  std::int64_t request_byte = 10;
+  std::int64_t request_byte_sum = 0;
+  for (int i = Env::IO_LOW; i < Env::IO_TOTAL; ++i) {
+    limiter->Request(request_byte, static_cast<Env::IOPriority>(i),
+                     nullptr /* stats */, RateLimiter::OpType::kWrite);
+    request_byte_sum += request_byte;
+  }
+
+  for (int i = Env::IO_LOW; i < Env::IO_TOTAL; ++i) {
+    EXPECT_EQ(limiter->GetTotalBytesThrough(static_cast<Env::IOPriority>(i)),
+              request_byte)
+        << "Failed to track total_bytes_through_ correctly when IOPriority = "
+        << static_cast<Env::IOPriority>(i);
+  }
+  EXPECT_EQ(limiter->GetTotalBytesThrough(Env::IO_TOTAL), request_byte_sum)
+      << "Failed to track total_bytes_through_ correctly when IOPriority = "
+         "Env::IO_TOTAL";
+}
+
+TEST_F(RateLimiterTest, GetTotalRequests) {
+  std::unique_ptr<RateLimiter> limiter(NewGenericRateLimiter(
+      20 /* rate_bytes_per_sec */, 1000 * 1000 /* refill_period_us */,
+      10 /* fairness */));
+  for (int i = Env::IO_LOW; i <= Env::IO_TOTAL; ++i) {
+    ASSERT_EQ(limiter->GetTotalRequests(static_cast<Env::IOPriority>(i)), 0);
+  }
+
+  std::int64_t total_requests_sum = 0;
+  for (int i = Env::IO_LOW; i < Env::IO_TOTAL; ++i) {
+    limiter->Request(10, static_cast<Env::IOPriority>(i), nullptr /* stats */,
+                     RateLimiter::OpType::kWrite);
+    total_requests_sum += 1;
+  }
+
+  for (int i = Env::IO_LOW; i < Env::IO_TOTAL; ++i) {
+    EXPECT_EQ(limiter->GetTotalRequests(static_cast<Env::IOPriority>(i)), 1)
+        << "Failed to track total_requests_ correctly when IOPriority = "
+        << static_cast<Env::IOPriority>(i);
+  }
+  EXPECT_EQ(limiter->GetTotalRequests(Env::IO_TOTAL), total_requests_sum)
+      << "Failed to track total_requests_ correctly when IOPriority = "
+         "Env::IO_TOTAL";
+}
+
 TEST_F(RateLimiterTest, Modes) {
   for (auto mode : {RateLimiter::Mode::kWritesOnly,
                     RateLimiter::Mode::kReadsOnly, RateLimiter::Mode::kAllIo}) {
@@ -60,6 +114,75 @@ TEST_F(RateLimiterTest, Modes) {
   }
 }
 
+TEST_F(RateLimiterTest, GeneratePriorityIterationOrder) {
+  std::unique_ptr<RateLimiter> limiter(NewGenericRateLimiter(
+      20 /* rate_bytes_per_sec */, 1000 * 1000 /* refill_period_us */,
+      10 /* fairness */));
+
+  bool possible_random_one_in_fairness_results_for_high_mid_pri[4][2] = {
+      {false, false}, {false, true}, {true, false}, {true, true}};
+  std::vector<Env::IOPriority> possible_priority_iteration_orders[4] = {
+      {Env::IO_USER, Env::IO_HIGH, Env::IO_MID, Env::IO_LOW},
+      {Env::IO_USER, Env::IO_HIGH, Env::IO_LOW, Env::IO_MID},
+      {Env::IO_USER, Env::IO_MID, Env::IO_LOW, Env::IO_HIGH},
+      {Env::IO_USER, Env::IO_LOW, Env::IO_MID, Env::IO_HIGH}};
+
+  for (int i = 0; i < 4; ++i) {
+    SyncPoint::GetInstance()->SetCallBack(
+        "GenericRateLimiter::GeneratePriorityIterationOrder::"
+        "PostRandomOneInFairnessForHighPri",
+        [&](void* arg) {
+          bool* high_pri_iterated_after_mid_low_pri = (bool*)arg;
+          *high_pri_iterated_after_mid_low_pri =
+              possible_random_one_in_fairness_results_for_high_mid_pri[i][0];
+        });
+
+    SyncPoint::GetInstance()->SetCallBack(
+        "GenericRateLimiter::GeneratePriorityIterationOrder::"
+        "PostRandomOneInFairnessForMidPri",
+        [&](void* arg) {
+          bool* mid_pri_iterated_after_low_pri = (bool*)arg;
+          *mid_pri_iterated_after_low_pri =
+              possible_random_one_in_fairness_results_for_high_mid_pri[i][1];
+        });
+
+    SyncPoint::GetInstance()->SetCallBack(
+        "GenericRateLimiter::GeneratePriorityIterationOrder::"
+        "PreReturnPriIterationOrder",
+        [&](void* arg) {
+          std::vector<Env::IOPriority>* pri_iteration_order =
+              (std::vector<Env::IOPriority>*)arg;
+          EXPECT_EQ(*pri_iteration_order, possible_priority_iteration_orders[i])
+              << "Failed to generate priority iteration order correctly when "
+                 "high_pri_iterated_after_mid_low_pri = "
+              << possible_random_one_in_fairness_results_for_high_mid_pri[i][0]
+              << ", mid_pri_iterated_after_low_pri = "
+              << possible_random_one_in_fairness_results_for_high_mid_pri[i][1]
+              << std::endl;
+        });
+
+    SyncPoint::GetInstance()->EnableProcessing();
+
+    limiter->Request(20 /* request max bytes to drain so that refill and order
+                           generation will be triggered every time
+                           GenericRateLimiter::Request() is called */
+                     ,
+                     Env::IO_USER, nullptr /* stats */,
+                     RateLimiter::OpType::kWrite);
+  }
+
+  SyncPoint::GetInstance()->DisableProcessing();
+  SyncPoint::GetInstance()->ClearCallBack(
+      "GenericRateLimiter::GeneratePriorityIterationOrder::"
+      "PreReturnPriIterationOrder");
+  SyncPoint::GetInstance()->ClearCallBack(
+      "GenericRateLimiter::GeneratePriorityIterationOrder::"
+      "PostRandomOneInFairnessForMidPri");
+  SyncPoint::GetInstance()->ClearCallBack(
+      "GenericRateLimiter::GeneratePriorityIterationOrder::"
+      "PostRandomOneInFairnessForHighPri");
+}
+
 TEST_F(RateLimiterTest, Rate) {
   auto* env = Env::Default();
   struct Arg {
@@ -83,11 +206,24 @@ TEST_F(RateLimiterTest, Rate) {
     Random r((uint32_t)(thread_clock->NowNanos() %
                         std::numeric_limits<int64_t>::max()));
     while (thread_clock->NowMicros() < until) {
+      for (int i = 0; i < static_cast<int>(r.Skewed(arg->burst * 2) + 1); ++i) {
+        arg->limiter->Request(r.Uniform(arg->request_size - 1) + 1,
+                              Env::IO_USER, nullptr /* stats */,
+                              RateLimiter::OpType::kWrite);
+      }
+
       for (int i = 0; i < static_cast<int>(r.Skewed(arg->burst) + 1); ++i) {
         arg->limiter->Request(r.Uniform(arg->request_size - 1) + 1,
                               Env::IO_HIGH, nullptr /* stats */,
                               RateLimiter::OpType::kWrite);
       }
+
+      for (int i = 0; i < static_cast<int>(r.Skewed(arg->burst / 2 + 1) + 1);
+           ++i) {
+        arg->limiter->Request(r.Uniform(arg->request_size - 1) + 1, Env::IO_MID,
+                              nullptr /* stats */, RateLimiter::OpType::kWrite);
+      }
+
       arg->limiter->Request(r.Uniform(arg->request_size - 1) + 1, Env::IO_LOW,
                             nullptr /* stats */, RateLimiter::OpType::kWrite);
     }
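Below is a small, self-contained usage sketch (not part of the patch) of how a caller could exercise the expanded priority set through the public RateLimiter API, along the same lines as the new unit tests above. The standalone main(), the example file name, the 1 MB/s rate, and the 4 KB request size are illustrative assumptions only.

// rate_limiter_priority_example.cc (hypothetical example, not in the patch)
#include <cstdint>
#include <iostream>
#include <memory>

#include "rocksdb/env.h"
#include "rocksdb/rate_limiter.h"

int main() {
  using ROCKSDB_NAMESPACE::Env;
  using ROCKSDB_NAMESPACE::NewGenericRateLimiter;
  using ROCKSDB_NAMESPACE::RateLimiter;

  // 1 MB/s, with the same refill period (100 ms) and fairness (10) that
  // NewGenericRateLimiter uses as defaults.
  std::unique_ptr<RateLimiter> limiter(NewGenericRateLimiter(
      1 << 20 /* rate_bytes_per_sec */, 100 * 1000 /* refill_period_us */,
      10 /* fairness */));

  // Charge one 4 KB write against each of the four priorities. With this
  // patch, IO_USER is always drained first on a refill, while the relative
  // order of IO_HIGH / IO_MID / IO_LOW is re-randomized per refill based on
  // the fairness parameter.
  for (int pri = Env::IO_LOW; pri < Env::IO_TOTAL; ++pri) {
    limiter->Request(4096 /* bytes */, static_cast<Env::IOPriority>(pri),
                     nullptr /* stats */, RateLimiter::OpType::kWrite);
  }

  // Per-priority accounting now covers all four queues; Env::IO_TOTAL sums
  // them, matching the new GetTotalBytesThrough / GetTotalRequests behavior.
  for (int pri = Env::IO_LOW; pri < Env::IO_TOTAL; ++pri) {
    std::cout << "pri " << pri << ": "
              << limiter->GetTotalBytesThrough(
                     static_cast<Env::IOPriority>(pri))
              << " bytes, "
              << limiter->GetTotalRequests(static_cast<Env::IOPriority>(pri))
              << " request(s)\n";
  }
  std::cout << "total: " << limiter->GetTotalBytesThrough(Env::IO_TOTAL)
            << " bytes\n";
  return 0;
}

In a full RocksDB deployment the limiter would normally be handed to the database through DBOptions::rate_limiter rather than called directly, but direct Request() calls as above are exactly how the new tests drive the priority handling.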