// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. // This source code is licensed under both the GPLv2 (found in the // COPYING file in the root directory) and Apache 2.0 License // (found in the LICENSE.Apache file in the root directory). // // Copyright (c) 2011 The LevelDB Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. #include "util/rate_limiter.h" #include #include #include #include #include "db/db_test_util.h" #include "port/port.h" #include "rocksdb/system_clock.h" #include "test_util/sync_point.h" #include "test_util/testharness.h" #include "util/random.h" namespace ROCKSDB_NAMESPACE { // TODO(yhchiang): the rate will not be accurate when we run test in parallel. class RateLimiterTest : public testing::Test { protected: ~RateLimiterTest() override { SyncPoint::GetInstance()->DisableProcessing(); SyncPoint::GetInstance()->ClearAllCallBacks(); } }; TEST_F(RateLimiterTest, OverflowRate) { GenericRateLimiter limiter(std::numeric_limits::max(), 1000, 10, RateLimiter::Mode::kWritesOnly, SystemClock::Default(), false /* auto_tuned */); ASSERT_GT(limiter.GetSingleBurstBytes(), 1000000000ll); } TEST_F(RateLimiterTest, StartStop) { std::unique_ptr limiter(NewGenericRateLimiter(100, 100, 10)); } TEST_F(RateLimiterTest, GetTotalBytesThrough) { std::unique_ptr limiter(NewGenericRateLimiter( 200 /* rate_bytes_per_sec */, 1000 * 1000 /* refill_period_us */, 10 /* fairness */)); for (int i = Env::IO_LOW; i <= Env::IO_TOTAL; ++i) { ASSERT_EQ(limiter->GetTotalBytesThrough(static_cast(i)), 0); } std::int64_t request_byte = 200; std::int64_t request_byte_sum = 0; for (int i = Env::IO_LOW; i < Env::IO_TOTAL; ++i) { limiter->Request(request_byte, static_cast(i), nullptr /* stats */, RateLimiter::OpType::kWrite); request_byte_sum += request_byte; } for (int i = Env::IO_LOW; i < Env::IO_TOTAL; ++i) { EXPECT_EQ(limiter->GetTotalBytesThrough(static_cast(i)), request_byte) << "Failed to track total_bytes_through_ correctly when IOPriority = " << static_cast(i); } EXPECT_EQ(limiter->GetTotalBytesThrough(Env::IO_TOTAL), request_byte_sum) << "Failed to track total_bytes_through_ correctly when IOPriority = " "Env::IO_TOTAL"; } TEST_F(RateLimiterTest, GetTotalRequests) { std::unique_ptr limiter(NewGenericRateLimiter( 200 /* rate_bytes_per_sec */, 1000 * 1000 /* refill_period_us */, 10 /* fairness */)); for (int i = Env::IO_LOW; i <= Env::IO_TOTAL; ++i) { ASSERT_EQ(limiter->GetTotalRequests(static_cast(i)), 0); } std::int64_t total_requests_sum = 0; for (int i = Env::IO_LOW; i < Env::IO_TOTAL; ++i) { limiter->Request(200, static_cast(i), nullptr /* stats */, RateLimiter::OpType::kWrite); total_requests_sum += 1; } for (int i = Env::IO_LOW; i < Env::IO_TOTAL; ++i) { EXPECT_EQ(limiter->GetTotalRequests(static_cast(i)), 1) << "Failed to track total_requests_ correctly when IOPriority = " << static_cast(i); } EXPECT_EQ(limiter->GetTotalRequests(Env::IO_TOTAL), total_requests_sum) << "Failed to track total_requests_ correctly when IOPriority = " "Env::IO_TOTAL"; } TEST_F(RateLimiterTest, GetTotalPendingRequests) { std::unique_ptr limiter(NewGenericRateLimiter( 200 /* rate_bytes_per_sec */, 1000 * 1000 /* refill_period_us */, 10 /* fairness */)); int64_t total_pending_requests = 0; for (int i = Env::IO_LOW; i <= Env::IO_TOTAL; ++i) { ASSERT_OK(limiter->GetTotalPendingRequests( &total_pending_requests, static_cast(i))); ASSERT_EQ(total_pending_requests, 0); } // This is a variable for making sure the following callback is called // and the assertions in it are indeed excuted bool nonzero_pending_requests_verified = false; SyncPoint::GetInstance()->SetCallBack( "GenericRateLimiter::Request:PostEnqueueRequest", [&](void* arg) { port::Mutex* request_mutex = (port::Mutex*)arg; // We temporarily unlock the mutex so that the following // GetTotalPendingRequests() can acquire it request_mutex->Unlock(); for (int i = Env::IO_LOW; i <= Env::IO_TOTAL; ++i) { EXPECT_OK(limiter->GetTotalPendingRequests( &total_pending_requests, static_cast(i))) << "Failed to return total pending requests for priority level = " << static_cast(i); if (i == Env::IO_USER || i == Env::IO_TOTAL) { EXPECT_EQ(total_pending_requests, 1) << "Failed to correctly return total pending requests for " "priority level = " << static_cast(i); } else { EXPECT_EQ(total_pending_requests, 0) << "Failed to correctly return total pending requests for " "priority level = " << static_cast(i); } } // We lock the mutex again so that the request thread can resume running // with the mutex locked request_mutex->Lock(); nonzero_pending_requests_verified = true; }); SyncPoint::GetInstance()->EnableProcessing(); limiter->Request(200, Env::IO_USER, nullptr /* stats */, RateLimiter::OpType::kWrite); ASSERT_EQ(nonzero_pending_requests_verified, true); for (int i = Env::IO_LOW; i <= Env::IO_TOTAL; ++i) { EXPECT_OK(limiter->GetTotalPendingRequests(&total_pending_requests, static_cast(i))) << "Failed to return total pending requests for priority level = " << static_cast(i); EXPECT_EQ(total_pending_requests, 0) << "Failed to correctly return total pending requests for priority " "level = " << static_cast(i); } SyncPoint::GetInstance()->DisableProcessing(); SyncPoint::GetInstance()->ClearCallBack( "GenericRateLimiter::Request:PostEnqueueRequest"); } TEST_F(RateLimiterTest, Modes) { for (auto mode : {RateLimiter::Mode::kWritesOnly, RateLimiter::Mode::kReadsOnly, RateLimiter::Mode::kAllIo}) { GenericRateLimiter limiter(2000 /* rate_bytes_per_sec */, 1000 * 1000 /* refill_period_us */, 10 /* fairness */, mode, SystemClock::Default(), false /* auto_tuned */); limiter.Request(1000 /* bytes */, Env::IO_HIGH, nullptr /* stats */, RateLimiter::OpType::kRead); if (mode == RateLimiter::Mode::kWritesOnly) { ASSERT_EQ(0, limiter.GetTotalBytesThrough(Env::IO_HIGH)); } else { ASSERT_EQ(1000, limiter.GetTotalBytesThrough(Env::IO_HIGH)); } limiter.Request(1000 /* bytes */, Env::IO_HIGH, nullptr /* stats */, RateLimiter::OpType::kWrite); if (mode == RateLimiter::Mode::kAllIo) { ASSERT_EQ(2000, limiter.GetTotalBytesThrough(Env::IO_HIGH)); } else { ASSERT_EQ(1000, limiter.GetTotalBytesThrough(Env::IO_HIGH)); } } } TEST_F(RateLimiterTest, GeneratePriorityIterationOrder) { std::unique_ptr limiter(NewGenericRateLimiter( 200 /* rate_bytes_per_sec */, 1000 * 1000 /* refill_period_us */, 10 /* fairness */)); bool possible_random_one_in_fairness_results_for_high_mid_pri[4][2] = { {false, false}, {false, true}, {true, false}, {true, true}}; std::vector possible_priority_iteration_orders[4] = { {Env::IO_USER, Env::IO_HIGH, Env::IO_MID, Env::IO_LOW}, {Env::IO_USER, Env::IO_HIGH, Env::IO_LOW, Env::IO_MID}, {Env::IO_USER, Env::IO_MID, Env::IO_LOW, Env::IO_HIGH}, {Env::IO_USER, Env::IO_LOW, Env::IO_MID, Env::IO_HIGH}}; for (int i = 0; i < 4; ++i) { // These are variables for making sure the following callbacks are called // and the assertion in the last callback is indeed excuted bool high_pri_iterated_after_mid_low_pri_set = false; bool mid_pri_itereated_after_low_pri_set = false; bool pri_iteration_order_verified = false; SyncPoint::GetInstance()->SetCallBack( "GenericRateLimiter::GeneratePriorityIterationOrderLocked::" "PostRandomOneInFairnessForHighPri", [&](void* arg) { bool* high_pri_iterated_after_mid_low_pri = (bool*)arg; *high_pri_iterated_after_mid_low_pri = possible_random_one_in_fairness_results_for_high_mid_pri[i][0]; high_pri_iterated_after_mid_low_pri_set = true; }); SyncPoint::GetInstance()->SetCallBack( "GenericRateLimiter::GeneratePriorityIterationOrderLocked::" "PostRandomOneInFairnessForMidPri", [&](void* arg) { bool* mid_pri_itereated_after_low_pri = (bool*)arg; *mid_pri_itereated_after_low_pri = possible_random_one_in_fairness_results_for_high_mid_pri[i][1]; mid_pri_itereated_after_low_pri_set = true; }); SyncPoint::GetInstance()->SetCallBack( "GenericRateLimiter::GeneratePriorityIterationOrderLocked::" "PreReturnPriIterationOrder", [&](void* arg) { std::vector* pri_iteration_order = (std::vector*)arg; EXPECT_EQ(*pri_iteration_order, possible_priority_iteration_orders[i]) << "Failed to generate priority iteration order correctly when " "high_pri_iterated_after_mid_low_pri = " << possible_random_one_in_fairness_results_for_high_mid_pri[i][0] << ", mid_pri_itereated_after_low_pri = " << possible_random_one_in_fairness_results_for_high_mid_pri[i][1] << std::endl; pri_iteration_order_verified = true; }); SyncPoint::GetInstance()->EnableProcessing(); limiter->Request(200 /* request max bytes to drain so that refill and order generation will be triggered every time GenericRateLimiter::Request() is called */ , Env::IO_USER, nullptr /* stats */, RateLimiter::OpType::kWrite); ASSERT_EQ(high_pri_iterated_after_mid_low_pri_set, true); ASSERT_EQ(mid_pri_itereated_after_low_pri_set, true); ASSERT_EQ(pri_iteration_order_verified, true); SyncPoint::GetInstance()->DisableProcessing(); SyncPoint::GetInstance()->ClearCallBack( "GenericRateLimiter::GeneratePriorityIterationOrderLocked::" "PreReturnPriIterationOrder"); SyncPoint::GetInstance()->ClearCallBack( "GenericRateLimiter::GeneratePriorityIterationOrderLocked::" "PostRandomOneInFairnessForMidPri"); SyncPoint::GetInstance()->ClearCallBack( "GenericRateLimiter::GeneratePriorityIterationOrderLocked::" "PostRandomOneInFairnessForHighPri"); } } TEST_F(RateLimiterTest, Rate) { auto* env = Env::Default(); struct Arg { Arg(int32_t _target_rate, int _burst) : limiter(NewGenericRateLimiter(_target_rate /* rate_bytes_per_sec */, 100 * 1000 /* refill_period_us */, 10 /* fairness */)), request_size(_target_rate / 10 /* refill period here is 1/10 second */), burst(_burst) {} std::unique_ptr limiter; int32_t request_size; int burst; }; auto writer = [](void* p) { const auto& thread_clock = SystemClock::Default(); auto* arg = static_cast(p); // Test for 2 seconds auto until = thread_clock->NowMicros() + 2 * 1000000; Random r((uint32_t)(thread_clock->NowNanos() % std::numeric_limits::max())); while (thread_clock->NowMicros() < until) { for (int i = 0; i < static_cast(r.Skewed(arg->burst * 2) + 1); ++i) { arg->limiter->Request(r.Uniform(arg->request_size - 1) + 1, Env::IO_USER, nullptr /* stats */, RateLimiter::OpType::kWrite); } for (int i = 0; i < static_cast(r.Skewed(arg->burst) + 1); ++i) { arg->limiter->Request(r.Uniform(arg->request_size - 1) + 1, Env::IO_HIGH, nullptr /* stats */, RateLimiter::OpType::kWrite); } for (int i = 0; i < static_cast(r.Skewed(arg->burst / 2 + 1) + 1); ++i) { arg->limiter->Request(r.Uniform(arg->request_size - 1) + 1, Env::IO_MID, nullptr /* stats */, RateLimiter::OpType::kWrite); } arg->limiter->Request(r.Uniform(arg->request_size - 1) + 1, Env::IO_LOW, nullptr /* stats */, RateLimiter::OpType::kWrite); } }; int samples = 0; int samples_at_minimum = 0; for (int i = 1; i <= 16; i *= 2) { int32_t target = i * 1024 * 10; Arg arg(target, i / 4 + 1); int64_t old_total_bytes_through = 0; for (int iter = 1; iter <= 2; ++iter) { // second iteration changes the target dynamically if (iter == 2) { target *= 2; arg.limiter->SetBytesPerSecond(target); } auto start = env->NowMicros(); for (int t = 0; t < i; ++t) { env->StartThread(writer, &arg); } env->WaitForJoin(); auto elapsed = env->NowMicros() - start; double rate = (arg.limiter->GetTotalBytesThrough() - old_total_bytes_through) * 1000000.0 / elapsed; old_total_bytes_through = arg.limiter->GetTotalBytesThrough(); fprintf(stderr, "request size [1 - %" PRIi32 "], limit %" PRIi32 " KB/sec, actual rate: %lf KB/sec, elapsed %.2lf seconds\n", arg.request_size - 1, target / 1024, rate / 1024, elapsed / 1000000.0); ++samples; if (rate / target >= 0.80) { ++samples_at_minimum; } ASSERT_LE(rate / target, 1.25); } } // This can fail due to slow execution speed, like when using valgrind or in // heavily loaded CI environments bool skip_minimum_rate_check = #if (defined(CIRCLECI) && defined(OS_MACOSX)) || defined(ROCKSDB_VALGRIND_RUN) true; #else getenv("SANDCASTLE"); #endif if (skip_minimum_rate_check) { fprintf(stderr, "Skipped minimum rate check (%d / %d passed)\n", samples_at_minimum, samples); } else { ASSERT_EQ(samples_at_minimum, samples); } } TEST_F(RateLimiterTest, LimitChangeTest) { // starvation test when limit changes to a smaller value int64_t refill_period = 1000 * 1000; auto* env = Env::Default(); ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); struct Arg { Arg(int32_t _request_size, Env::IOPriority _pri, std::shared_ptr _limiter) : request_size(_request_size), pri(_pri), limiter(_limiter) {} int32_t request_size; Env::IOPriority pri; std::shared_ptr limiter; }; auto writer = [](void* p) { auto* arg = static_cast(p); arg->limiter->Request(arg->request_size, arg->pri, nullptr /* stats */, RateLimiter::OpType::kWrite); }; for (uint32_t i = 1; i <= 16; i <<= 1) { int32_t target = i * 1024 * 10; // refill per second for (int iter = 0; iter < 2; iter++) { std::shared_ptr limiter = std::make_shared( target, refill_period, 10, RateLimiter::Mode::kWritesOnly, SystemClock::Default(), false /* auto_tuned */); // After "GenericRateLimiter::Request:1" the mutex is held until the bytes // are refilled. This test could be improved to change the limit when lock // is released in `TimedWait()`. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency( {{"GenericRateLimiter::Request", "RateLimiterTest::LimitChangeTest:changeLimitStart"}, {"RateLimiterTest::LimitChangeTest:changeLimitEnd", "GenericRateLimiter::Request:1"}}); Arg arg(target, Env::IO_HIGH, limiter); // The idea behind is to start a request first, then before it refills, // update limit to a different value (2X/0.5X). No starvation should // be guaranteed under any situation // TODO(lightmark): more test cases are welcome. env->StartThread(writer, &arg); int32_t new_limit = (target << 1) >> (iter << 1); TEST_SYNC_POINT("RateLimiterTest::LimitChangeTest:changeLimitStart"); arg.limiter->SetBytesPerSecond(new_limit); TEST_SYNC_POINT("RateLimiterTest::LimitChangeTest:changeLimitEnd"); env->WaitForJoin(); fprintf(stderr, "[COMPLETE] request size %" PRIi32 " KB, new limit %" PRIi32 "KB/sec, refill period %" PRIi64 " ms\n", target / 1024, new_limit / 1024, refill_period / 1000); } } ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); } TEST_F(RateLimiterTest, AvailableByteSizeExhaustTest) { SpecialEnv special_env(Env::Default(), /*time_elapse_only_sleep*/ true); const std::chrono::seconds kTimePerRefill(1); // This test makes sure available_bytes_ get exhausted first before queuing // any remaining bytes when requested_bytes > available_bytes const int64_t available_bytes_per_period = 500; std::shared_ptr limiter = std::make_shared( available_bytes_per_period, std::chrono::microseconds(kTimePerRefill).count(), 10 /* fairness */, RateLimiter::Mode::kWritesOnly, special_env.GetSystemClock(), false /* auto_tuned */); // Step 1. Request 100 and wait for the refill // so that the remaining available bytes are 400 limiter->Request(100, Env::IO_USER, nullptr /* stats */, RateLimiter::OpType::kWrite); special_env.SleepForMicroseconds( static_cast(std::chrono::microseconds(kTimePerRefill).count())); ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( "GenericRateLimiter::Request:PostEnqueueRequest", [&](void* arg) { port::Mutex* request_mutex = (port::Mutex*)arg; request_mutex->Unlock(); // Step 3. Check GetTotalBytesThrough = available_bytes_per_period // to make sure that the first request (100) and the part of the second // request (400) made through when the remaining of the second request // got queued ASSERT_EQ(available_bytes_per_period, limiter->GetTotalBytesThrough(Env::IO_USER)); request_mutex->Lock(); }); ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); // Step 2. Request 500, which is greater than the remaining available bytes // (400) limiter->Request(500, Env::IO_USER, nullptr /* stats */, RateLimiter::OpType::kWrite); ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearCallBack( "GenericRateLimiter::Request:PostEnqueueRequest"); } TEST_F(RateLimiterTest, AutoTuneIncreaseWhenFull) { const std::chrono::seconds kTimePerRefill(1); const int kRefillsPerTune = 100; // needs to match util/rate_limiter.cc SpecialEnv special_env(Env::Default(), /*time_elapse_only_sleep*/ true); auto stats = CreateDBStatistics(); std::unique_ptr rate_limiter(new GenericRateLimiter( 1000 /* rate_bytes_per_sec */, std::chrono::microseconds(kTimePerRefill).count(), 10 /* fairness */, RateLimiter::Mode::kWritesOnly, special_env.GetSystemClock(), true /* auto_tuned */)); // Rate limiter uses `CondVar::TimedWait()`, which does not have access to the // `Env` to advance its time according to the fake wait duration. The // workaround is to install a callback that advance the `Env`'s mock time. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( "GenericRateLimiter::Request:PostTimedWait", [&](void* arg) { int64_t time_waited_us = *static_cast(arg); special_env.SleepForMicroseconds(static_cast(time_waited_us)); }); ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); // verify rate limit increases after a sequence of periods where rate limiter // is always drained int64_t orig_bytes_per_sec = rate_limiter->GetSingleBurstBytes(); rate_limiter->Request(orig_bytes_per_sec, Env::IO_HIGH, stats.get(), RateLimiter::OpType::kWrite); while (std::chrono::microseconds(special_env.NowMicros()) <= kRefillsPerTune * kTimePerRefill) { rate_limiter->Request(orig_bytes_per_sec, Env::IO_HIGH, stats.get(), RateLimiter::OpType::kWrite); } int64_t new_bytes_per_sec = rate_limiter->GetSingleBurstBytes(); ASSERT_GT(new_bytes_per_sec, orig_bytes_per_sec); ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearCallBack( "GenericRateLimiter::Request:PostTimedWait"); // decreases after a sequence of periods where rate limiter is not drained orig_bytes_per_sec = new_bytes_per_sec; special_env.SleepForMicroseconds(static_cast( kRefillsPerTune * std::chrono::microseconds(kTimePerRefill).count())); // make a request so tuner can be triggered rate_limiter->Request(1 /* bytes */, Env::IO_HIGH, stats.get(), RateLimiter::OpType::kWrite); new_bytes_per_sec = rate_limiter->GetSingleBurstBytes(); ASSERT_LT(new_bytes_per_sec, orig_bytes_per_sec); } } // namespace ROCKSDB_NAMESPACE int main(int argc, char** argv) { ROCKSDB_NAMESPACE::port::InstallStackTraceHandler(); ::testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); }