Return Status::NotSupported() in RateLimiter::GetTotalPendingRequests default impl (#8950)

Summary:
Context:
After more discussion, a fix in https://github.com/facebook/rocksdb/issues/8938 might turn out to be too restrictive for the case where `GetTotalPendingRequests` might be invoked on RateLimiter classes that does not support the recently added API `RateLimiter::GetTotalPendingRequests` (https://github.com/facebook/rocksdb/issues/8890) due to the `assert(false)` in https://github.com/facebook/rocksdb/issues/8938. Furthermore, sentinel value like `-1` proposed in https://github.com/facebook/rocksdb/issues/8938 is easy to be ignored and unchecked. Therefore we decided to adopt `Status::NotSupported()`, which is also a convention of adding new API to public header in RocksDB.
- Changed return value type of  `RateLimiter::GetTotalPendingRequests` in related declaration/definition
- Passed in pointer argument to hold the output instead of returning it as before
- Adapted to the changes above in calling `RateLimiter::GetTotalPendingRequests` in test
- Minor improvement to `TEST_F(RateLimiterTest, GetTotalPendingRequests)`:  added failure message for assertion and replaced repetitive statements with a loop

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8950

Reviewed By: ajkr, pdillinger

Differential Revision: D31128450

Pulled By: hx235

fbshipit-source-id: 282ac9c4f3dacaa0aec6d0a993161f77ad47a040
main
Hui Xiao 3 years ago committed by Facebook GitHub Bot
parent be206db351
commit b25f2afeff
  1. 2
      HISTORY.md
  2. 16
      include/rocksdb/rate_limiter.h
  3. 11
      util/rate_limiter.h
  4. 43
      util/rate_limiter_test.cc

@ -33,7 +33,7 @@
* Added new callback APIs `OnBlobFileCreationStarted`,`OnBlobFileCreated`and `OnBlobFileDeleted` in `EventListener` class of listener.h. It notifies listeners during creation/deletion of individual blob files in Integrated BlobDB. It also log blob file creation finished event and deletion event in LOG file.
* Batch blob read requests for `DB::MultiGet` using `MultiRead`.
* Add support for fallback to local compaction, the user can return `CompactionServiceJobStatus::kUseLocal` to instruct RocksDB to run the compaction locally instead of waiting for the remote compaction result.
* Add built-in rate limiter's implementation for `RateLimiter::GetTotalPendingRequests()` for the total number of requests that are pending for bytes in the rate limiter.
* Add built-in rate limiter's implementation of `RateLimiter::GetTotalPendingRequest(int64_t* total_pending_requests, const Env::IOPriority pri)` for the total number of requests that are pending for bytes in the rate limiter.
### Public API change
* Remove obsolete implementation details FullKey and ParseFullKey from public API

@ -11,6 +11,7 @@
#include "rocksdb/env.h"
#include "rocksdb/statistics.h"
#include "rocksdb/status.h"
namespace ROCKSDB_NAMESPACE {
@ -90,14 +91,17 @@ class RateLimiter {
const Env::IOPriority pri = Env::IO_TOTAL) const = 0;
// Total # of requests that are pending for bytes in rate limiter
// For convenience, this function is implemented by the RateLimiter returned
// by NewGenericRateLimiter but is not required by RocksDB. The default
// implementation indicates "not supported".
virtual int64_t GetTotalPendingRequests(
// For convenience, this function is supported by the RateLimiter returned
// by NewGenericRateLimiter but is not required by RocksDB.
//
// REQUIRED: total_pending_request != nullptr
virtual Status GetTotalPendingRequests(
int64_t* total_pending_requests,
const Env::IOPriority pri = Env::IO_TOTAL) const {
assert(false);
assert(total_pending_requests != nullptr);
(void)total_pending_requests;
(void)pri;
return -1;
return Status::NotSupported();
}
virtual int64_t GetBytesPerSecond() const = 0;

@ -17,6 +17,7 @@
#include "port/port.h"
#include "rocksdb/env.h"
#include "rocksdb/rate_limiter.h"
#include "rocksdb/status.h"
#include "rocksdb/system_clock.h"
#include "util/mutexlock.h"
#include "util/random.h"
@ -72,17 +73,21 @@ class GenericRateLimiter : public RateLimiter {
return total_requests_[pri];
}
virtual int64_t GetTotalPendingRequests(
virtual Status GetTotalPendingRequests(
int64_t* total_pending_requests,
const Env::IOPriority pri = Env::IO_TOTAL) const override {
assert(total_pending_requests != nullptr);
MutexLock g(&request_mutex_);
if (pri == Env::IO_TOTAL) {
int64_t total_pending_requests_sum = 0;
for (int i = Env::IO_LOW; i < Env::IO_TOTAL; ++i) {
total_pending_requests_sum += static_cast<int64_t>(queue_[i].size());
}
return total_pending_requests_sum;
*total_pending_requests = total_pending_requests_sum;
} else {
*total_pending_requests = static_cast<int64_t>(queue_[pri].size());
}
return static_cast<int64_t>(queue_[pri].size());
return Status::OK();
}
virtual int64_t GetBytesPerSecond() const override {

@ -100,9 +100,11 @@ TEST_F(RateLimiterTest, GetTotalPendingRequests) {
std::unique_ptr<RateLimiter> limiter(NewGenericRateLimiter(
200 /* rate_bytes_per_sec */, 1000 * 1000 /* refill_period_us */,
10 /* fairness */));
int64_t total_pending_requests = 0;
for (int i = Env::IO_LOW; i <= Env::IO_TOTAL; ++i) {
ASSERT_EQ(limiter->GetTotalPendingRequests(static_cast<Env::IOPriority>(i)),
0);
ASSERT_OK(limiter->GetTotalPendingRequests(
&total_pending_requests, static_cast<Env::IOPriority>(i)));
ASSERT_EQ(total_pending_requests, 0);
}
// This is a variable for making sure the following callback is called
// and the assertions in it are indeed excuted
@ -113,11 +115,23 @@ TEST_F(RateLimiterTest, GetTotalPendingRequests) {
// We temporarily unlock the mutex so that the following
// GetTotalPendingRequests() can acquire it
request_mutex->Unlock();
EXPECT_EQ(limiter->GetTotalPendingRequests(Env::IO_USER), 1);
EXPECT_EQ(limiter->GetTotalPendingRequests(Env::IO_HIGH), 0);
EXPECT_EQ(limiter->GetTotalPendingRequests(Env::IO_MID), 0);
EXPECT_EQ(limiter->GetTotalPendingRequests(Env::IO_LOW), 0);
EXPECT_EQ(limiter->GetTotalPendingRequests(Env::IO_TOTAL), 1);
for (int i = Env::IO_LOW; i <= Env::IO_TOTAL; ++i) {
EXPECT_OK(limiter->GetTotalPendingRequests(
&total_pending_requests, static_cast<Env::IOPriority>(i)))
<< "Failed to return total pending requests for priority level = "
<< static_cast<Env::IOPriority>(i);
if (i == Env::IO_USER || i == Env::IO_TOTAL) {
EXPECT_EQ(total_pending_requests, 1)
<< "Failed to correctly return total pending requests for "
"priority level = "
<< static_cast<Env::IOPriority>(i);
} else {
EXPECT_EQ(total_pending_requests, 0)
<< "Failed to correctly return total pending requests for "
"priority level = "
<< static_cast<Env::IOPriority>(i);
}
}
// We lock the mutex again so that the request thread can resume running
// with the mutex locked
request_mutex->Lock();
@ -128,11 +142,16 @@ TEST_F(RateLimiterTest, GetTotalPendingRequests) {
limiter->Request(200, Env::IO_USER, nullptr /* stats */,
RateLimiter::OpType::kWrite);
ASSERT_EQ(nonzero_pending_requests_verified, true);
EXPECT_EQ(limiter->GetTotalPendingRequests(Env::IO_USER), 0);
EXPECT_EQ(limiter->GetTotalPendingRequests(Env::IO_HIGH), 0);
EXPECT_EQ(limiter->GetTotalPendingRequests(Env::IO_MID), 0);
EXPECT_EQ(limiter->GetTotalPendingRequests(Env::IO_LOW), 0);
EXPECT_EQ(limiter->GetTotalPendingRequests(Env::IO_TOTAL), 0);
for (int i = Env::IO_LOW; i <= Env::IO_TOTAL; ++i) {
EXPECT_OK(limiter->GetTotalPendingRequests(&total_pending_requests,
static_cast<Env::IOPriority>(i)))
<< "Failed to return total pending requests for priority level = "
<< static_cast<Env::IOPriority>(i);
EXPECT_EQ(total_pending_requests, 0)
<< "Failed to correctly return total pending requests for priority "
"level = "
<< static_cast<Env::IOPriority>(i);
}
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearCallBack(
"GenericRateLimiter::Request:PostEnqueueRequest");

Loading…
Cancel
Save