From b25f2afeff04850fb4170d230d5f15c96e873f12 Mon Sep 17 00:00:00 2001 From: Hui Xiao Date: Wed, 22 Sep 2021 19:35:05 -0700 Subject: [PATCH] Return Status::NotSupported() in RateLimiter::GetTotalPendingRequests default impl (#8950) Summary: Context: After more discussion, a fix in https://github.com/facebook/rocksdb/issues/8938 might turn out to be too restrictive for the case where `GetTotalPendingRequests` might be invoked on RateLimiter classes that does not support the recently added API `RateLimiter::GetTotalPendingRequests` (https://github.com/facebook/rocksdb/issues/8890) due to the `assert(false)` in https://github.com/facebook/rocksdb/issues/8938. Furthermore, sentinel value like `-1` proposed in https://github.com/facebook/rocksdb/issues/8938 is easy to be ignored and unchecked. Therefore we decided to adopt `Status::NotSupported()`, which is also a convention of adding new API to public header in RocksDB. - Changed return value type of `RateLimiter::GetTotalPendingRequests` in related declaration/definition - Passed in pointer argument to hold the output instead of returning it as before - Adapted to the changes above in calling `RateLimiter::GetTotalPendingRequests` in test - Minor improvement to `TEST_F(RateLimiterTest, GetTotalPendingRequests)`: added failure message for assertion and replaced repetitive statements with a loop Pull Request resolved: https://github.com/facebook/rocksdb/pull/8950 Reviewed By: ajkr, pdillinger Differential Revision: D31128450 Pulled By: hx235 fbshipit-source-id: 282ac9c4f3dacaa0aec6d0a993161f77ad47a040 --- HISTORY.md | 2 +- include/rocksdb/rate_limiter.h | 16 ++++++++----- util/rate_limiter.h | 11 ++++++--- util/rate_limiter_test.cc | 43 ++++++++++++++++++++++++---------- 4 files changed, 50 insertions(+), 22 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index d5cc3df06..0f5c3a3de 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -33,7 +33,7 @@ * Added new callback APIs `OnBlobFileCreationStarted`,`OnBlobFileCreated`and `OnBlobFileDeleted` in `EventListener` class of listener.h. It notifies listeners during creation/deletion of individual blob files in Integrated BlobDB. It also log blob file creation finished event and deletion event in LOG file. * Batch blob read requests for `DB::MultiGet` using `MultiRead`. * Add support for fallback to local compaction, the user can return `CompactionServiceJobStatus::kUseLocal` to instruct RocksDB to run the compaction locally instead of waiting for the remote compaction result. -* Add built-in rate limiter's implementation for `RateLimiter::GetTotalPendingRequests()` for the total number of requests that are pending for bytes in the rate limiter. +* Add built-in rate limiter's implementation of `RateLimiter::GetTotalPendingRequest(int64_t* total_pending_requests, const Env::IOPriority pri)` for the total number of requests that are pending for bytes in the rate limiter. ### Public API change * Remove obsolete implementation details FullKey and ParseFullKey from public API diff --git a/include/rocksdb/rate_limiter.h b/include/rocksdb/rate_limiter.h index 1e91047de..6bc0e7a4d 100644 --- a/include/rocksdb/rate_limiter.h +++ b/include/rocksdb/rate_limiter.h @@ -11,6 +11,7 @@ #include "rocksdb/env.h" #include "rocksdb/statistics.h" +#include "rocksdb/status.h" namespace ROCKSDB_NAMESPACE { @@ -90,14 +91,17 @@ class RateLimiter { const Env::IOPriority pri = Env::IO_TOTAL) const = 0; // Total # of requests that are pending for bytes in rate limiter - // For convenience, this function is implemented by the RateLimiter returned - // by NewGenericRateLimiter but is not required by RocksDB. The default - // implementation indicates "not supported". - virtual int64_t GetTotalPendingRequests( + // For convenience, this function is supported by the RateLimiter returned + // by NewGenericRateLimiter but is not required by RocksDB. + // + // REQUIRED: total_pending_request != nullptr + virtual Status GetTotalPendingRequests( + int64_t* total_pending_requests, const Env::IOPriority pri = Env::IO_TOTAL) const { - assert(false); + assert(total_pending_requests != nullptr); + (void)total_pending_requests; (void)pri; - return -1; + return Status::NotSupported(); } virtual int64_t GetBytesPerSecond() const = 0; diff --git a/util/rate_limiter.h b/util/rate_limiter.h index 7596374b0..1640ef076 100644 --- a/util/rate_limiter.h +++ b/util/rate_limiter.h @@ -17,6 +17,7 @@ #include "port/port.h" #include "rocksdb/env.h" #include "rocksdb/rate_limiter.h" +#include "rocksdb/status.h" #include "rocksdb/system_clock.h" #include "util/mutexlock.h" #include "util/random.h" @@ -72,17 +73,21 @@ class GenericRateLimiter : public RateLimiter { return total_requests_[pri]; } - virtual int64_t GetTotalPendingRequests( + virtual Status GetTotalPendingRequests( + int64_t* total_pending_requests, const Env::IOPriority pri = Env::IO_TOTAL) const override { + assert(total_pending_requests != nullptr); MutexLock g(&request_mutex_); if (pri == Env::IO_TOTAL) { int64_t total_pending_requests_sum = 0; for (int i = Env::IO_LOW; i < Env::IO_TOTAL; ++i) { total_pending_requests_sum += static_cast(queue_[i].size()); } - return total_pending_requests_sum; + *total_pending_requests = total_pending_requests_sum; + } else { + *total_pending_requests = static_cast(queue_[pri].size()); } - return static_cast(queue_[pri].size()); + return Status::OK(); } virtual int64_t GetBytesPerSecond() const override { diff --git a/util/rate_limiter_test.cc b/util/rate_limiter_test.cc index f99394475..5ea3da475 100644 --- a/util/rate_limiter_test.cc +++ b/util/rate_limiter_test.cc @@ -100,9 +100,11 @@ TEST_F(RateLimiterTest, GetTotalPendingRequests) { std::unique_ptr limiter(NewGenericRateLimiter( 200 /* rate_bytes_per_sec */, 1000 * 1000 /* refill_period_us */, 10 /* fairness */)); + int64_t total_pending_requests = 0; for (int i = Env::IO_LOW; i <= Env::IO_TOTAL; ++i) { - ASSERT_EQ(limiter->GetTotalPendingRequests(static_cast(i)), - 0); + ASSERT_OK(limiter->GetTotalPendingRequests( + &total_pending_requests, static_cast(i))); + ASSERT_EQ(total_pending_requests, 0); } // This is a variable for making sure the following callback is called // and the assertions in it are indeed excuted @@ -113,11 +115,23 @@ TEST_F(RateLimiterTest, GetTotalPendingRequests) { // We temporarily unlock the mutex so that the following // GetTotalPendingRequests() can acquire it request_mutex->Unlock(); - EXPECT_EQ(limiter->GetTotalPendingRequests(Env::IO_USER), 1); - EXPECT_EQ(limiter->GetTotalPendingRequests(Env::IO_HIGH), 0); - EXPECT_EQ(limiter->GetTotalPendingRequests(Env::IO_MID), 0); - EXPECT_EQ(limiter->GetTotalPendingRequests(Env::IO_LOW), 0); - EXPECT_EQ(limiter->GetTotalPendingRequests(Env::IO_TOTAL), 1); + for (int i = Env::IO_LOW; i <= Env::IO_TOTAL; ++i) { + EXPECT_OK(limiter->GetTotalPendingRequests( + &total_pending_requests, static_cast(i))) + << "Failed to return total pending requests for priority level = " + << static_cast(i); + if (i == Env::IO_USER || i == Env::IO_TOTAL) { + EXPECT_EQ(total_pending_requests, 1) + << "Failed to correctly return total pending requests for " + "priority level = " + << static_cast(i); + } else { + EXPECT_EQ(total_pending_requests, 0) + << "Failed to correctly return total pending requests for " + "priority level = " + << static_cast(i); + } + } // We lock the mutex again so that the request thread can resume running // with the mutex locked request_mutex->Lock(); @@ -128,11 +142,16 @@ TEST_F(RateLimiterTest, GetTotalPendingRequests) { limiter->Request(200, Env::IO_USER, nullptr /* stats */, RateLimiter::OpType::kWrite); ASSERT_EQ(nonzero_pending_requests_verified, true); - EXPECT_EQ(limiter->GetTotalPendingRequests(Env::IO_USER), 0); - EXPECT_EQ(limiter->GetTotalPendingRequests(Env::IO_HIGH), 0); - EXPECT_EQ(limiter->GetTotalPendingRequests(Env::IO_MID), 0); - EXPECT_EQ(limiter->GetTotalPendingRequests(Env::IO_LOW), 0); - EXPECT_EQ(limiter->GetTotalPendingRequests(Env::IO_TOTAL), 0); + for (int i = Env::IO_LOW; i <= Env::IO_TOTAL; ++i) { + EXPECT_OK(limiter->GetTotalPendingRequests(&total_pending_requests, + static_cast(i))) + << "Failed to return total pending requests for priority level = " + << static_cast(i); + EXPECT_EQ(total_pending_requests, 0) + << "Failed to correctly return total pending requests for priority " + "level = " + << static_cast(i); + } SyncPoint::GetInstance()->DisableProcessing(); SyncPoint::GetInstance()->ClearCallBack( "GenericRateLimiter::Request:PostEnqueueRequest");