diff --git a/HISTORY.md b/HISTORY.md index 821e58b24..ec65434d3 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -19,6 +19,7 @@ ### Public API change * Remove obsolete implementation details FullKey and ParseFullKey from public API +* Add a public API RateLimiter::GetTotalPendingRequests() for the total number of requests that are pending for bytes in the rate limiter. ## 6.24.0 (2021-08-20) ### Bug Fixes diff --git a/include/rocksdb/rate_limiter.h b/include/rocksdb/rate_limiter.h index 0ee89f5c8..518db7aa6 100644 --- a/include/rocksdb/rate_limiter.h +++ b/include/rocksdb/rate_limiter.h @@ -89,6 +89,10 @@ class RateLimiter { virtual int64_t GetTotalRequests( const Env::IOPriority pri = Env::IO_TOTAL) const = 0; + // Total # of requests that are pending for bytes in rate limiter + virtual int64_t GetTotalPendingRequests( + const Env::IOPriority pri = Env::IO_TOTAL) const = 0; + virtual int64_t GetBytesPerSecond() const = 0; virtual bool IsRateLimited(OpType op_type) { diff --git a/util/rate_limiter.cc b/util/rate_limiter.cc index 7d7936e5f..2faafdbbb 100644 --- a/util/rate_limiter.cc +++ b/util/rate_limiter.cc @@ -143,7 +143,8 @@ void GenericRateLimiter::Request(int64_t bytes, const Env::IOPriority pri, // Request cannot be satisfied at this moment, enqueue Req r(bytes, &request_mutex_); queue_[pri].push_back(&r); - + TEST_SYNC_POINT_CALLBACK("GenericRateLimiter::Request:PostEnqueueRequest", + &request_mutex_); // A thread representing a queued request coordinates with other such threads. // There are two main duties. // diff --git a/util/rate_limiter.h b/util/rate_limiter.h index f3f593c36..7596374b0 100644 --- a/util/rate_limiter.h +++ b/util/rate_limiter.h @@ -72,6 +72,19 @@ class GenericRateLimiter : public RateLimiter { return total_requests_[pri]; } + virtual int64_t GetTotalPendingRequests( + const Env::IOPriority pri = Env::IO_TOTAL) const override { + MutexLock g(&request_mutex_); + if (pri == Env::IO_TOTAL) { + int64_t total_pending_requests_sum = 0; + for (int i = Env::IO_LOW; i < Env::IO_TOTAL; ++i) { + total_pending_requests_sum += static_cast(queue_[i].size()); + } + return total_pending_requests_sum; + } + return static_cast(queue_[pri].size()); + } + virtual int64_t GetBytesPerSecond() const override { return rate_bytes_per_sec_; } diff --git a/util/rate_limiter_test.cc b/util/rate_limiter_test.cc index 27bbc2d31..cae82da34 100644 --- a/util/rate_limiter_test.cc +++ b/util/rate_limiter_test.cc @@ -15,6 +15,7 @@ #include #include "db/db_test_util.h" +#include "port/port.h" #include "rocksdb/system_clock.h" #include "test_util/sync_point.h" #include "test_util/testharness.h" @@ -89,6 +90,47 @@ TEST_F(RateLimiterTest, GetTotalRequests) { "Env::IO_TOTAL"; } +TEST_F(RateLimiterTest, GetTotalPendingRequests) { + std::unique_ptr limiter( + NewGenericRateLimiter(20 /* rate_bytes_per_sec */)); + for (int i = Env::IO_LOW; i <= Env::IO_TOTAL; ++i) { + ASSERT_EQ(limiter->GetTotalPendingRequests(static_cast(i)), + 0); + } + // This is a variable for making sure the following callback is called + // and the assertions in it are indeed excuted + bool nonzero_pending_requests_verified_ = false; + SyncPoint::GetInstance()->SetCallBack( + "GenericRateLimiter::Request:PostEnqueueRequest", [&](void* arg) { + port::Mutex* request_mutex = (port::Mutex*)arg; + // We temporarily unlock the mutex so that the following + // GetTotalPendingRequests() can acquire it + request_mutex->Unlock(); + EXPECT_EQ(limiter->GetTotalPendingRequests(Env::IO_USER), 1); + EXPECT_EQ(limiter->GetTotalPendingRequests(Env::IO_HIGH), 0); + EXPECT_EQ(limiter->GetTotalPendingRequests(Env::IO_MID), 0); + EXPECT_EQ(limiter->GetTotalPendingRequests(Env::IO_LOW), 0); + EXPECT_EQ(limiter->GetTotalPendingRequests(Env::IO_TOTAL), 1); + // We lock the mutex again so that the request thread can resume running + // with the mutex locked + request_mutex->Lock(); + nonzero_pending_requests_verified_ = true; + }); + + SyncPoint::GetInstance()->EnableProcessing(); + limiter->Request(20, Env::IO_USER, nullptr /* stats */, + RateLimiter::OpType::kWrite); + ASSERT_EQ(nonzero_pending_requests_verified_, true); + EXPECT_EQ(limiter->GetTotalPendingRequests(Env::IO_USER), 0); + EXPECT_EQ(limiter->GetTotalPendingRequests(Env::IO_HIGH), 0); + EXPECT_EQ(limiter->GetTotalPendingRequests(Env::IO_MID), 0); + EXPECT_EQ(limiter->GetTotalPendingRequests(Env::IO_LOW), 0); + EXPECT_EQ(limiter->GetTotalPendingRequests(Env::IO_TOTAL), 0); + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearCallBack( + "GenericRateLimiter::Request:PostEnqueueRequest"); +} + TEST_F(RateLimiterTest, Modes) { for (auto mode : {RateLimiter::Mode::kWritesOnly, RateLimiter::Mode::kReadsOnly, RateLimiter::Mode::kAllIo}) {