Add rate-limiting support to batched MultiGet() (#10159)

Summary:
**Context/Summary:**
https://github.com/facebook/rocksdb/pull/9424 added rate-limiting support for user reads, but it did not cover batched `MultiGet()`s that call `RandomAccessFileReader::MultiRead()`. The reason is that the ideal rate limiting is harder to implement there than for `RandomAccessFileReader::Read()`: we would first call `RateLimiter::RequestToken()` for the bytes allowed to be multi-read and then consume those bytes by satisfying as many requests in `MultiRead()` as possible. For example, it can be tricky to decide whether we want partially fulfilled requests within one `MultiRead()` or not.
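
To make the open question concrete, here is a rough sketch of one possible shape of that ideal scheme. It is not part of this change and not RocksDB code: `CountServable` is a hypothetical helper, and it simply assumes that requests are served whole and in order, with no partial fulfilment.

```cpp
#include <cstddef>
#include <vector>

// Hypothetical helper (not a RocksDB API): given the byte budget a rate
// limiter has granted, return how many leading read requests can be served
// in full. Requests are never split; anything that does not fit waits for
// the next grant.
inline std::size_t CountServable(const std::vector<std::size_t>& request_lens,
                                 std::size_t granted_bytes) {
  std::size_t served = 0;
  std::size_t used = 0;  // invariant: used <= granted_bytes
  while (served < request_lens.size() &&
         request_lens[served] <= granted_bytes - used) {
    used += request_lens[served];
    ++served;
  }
  return served;
}
```

With requests of 100, 200 and 300 bytes and a 350-byte grant, only the first two would be served in this round. A variant that allows partial fulfilment would need to track an offset into the third request, which is the kind of complication mentioned above.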

However, due to a recent urgent user request, we decided to pursue an elementary (but conditionally ineffective) solution: before doing a `MultiRead()`, we accumulate enough rate-limiter requests to cover the total bytes that `MultiRead()` needs. This is not ideal when the total is huge, because the read then consumes a large amount of rate-limiter bandwidth at once and causes a burst on disk, which is not what we ultimately want from a rate limiter. Follow-up work is therefore noted in TODO comments.
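
The accumulation step can be sketched in isolation as follows. This is a simplified illustration, not the code in this commit: `MockRateLimiter` and `ChargeForMultiRead` are hypothetical stand-ins, and the mock only records requests where a real limiter would block until the bytes become available.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical stand-in for a rate limiter; not the RocksDB class.
struct MockRateLimiter {
  std::size_t single_burst_bytes;    // largest size a single request may have
  std::vector<std::size_t> granted;  // sizes of all requests made so far

  // A real limiter would block here until `bytes` are available.
  void Request(std::size_t bytes) {
    assert(bytes <= single_burst_bytes);
    granted.push_back(bytes);
  }
};

// Charge the limiter for the total size of one batched read, in chunks no
// larger than one burst, before the read itself is issued.
inline void ChargeForMultiRead(MockRateLimiter& limiter,
                               const std::vector<std::size_t>& request_lens) {
  assert(limiter.single_burst_bytes > 0);
  std::size_t remaining = 0;
  for (std::size_t len : request_lens) {
    remaining += len;
  }
  while (remaining > 0) {
    const std::size_t chunk = std::min(limiter.single_burst_bytes, remaining);
    limiter.Request(chunk);
    remaining -= chunk;
  }
}
```

With a 200-byte burst size and reads totalling 2000 bytes, the loop issues ten 200-byte requests. Only after all of them are granted does the whole read go to disk at once, which is the burst described above.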

Pull Request resolved: https://github.com/facebook/rocksdb/pull/10159

Test Plan:
- Modified existing unit test `DBRateLimiterOnReadTest/DBRateLimiterOnReadTest.NewMultiGet`
- Traced the underlying `io_uring_enter` system calls and verified that they are correctly spaced 10 seconds apart when running `strace -ftt -e trace=io_uring_enter ./db_bench -benchmarks=multireadrandom -db=/dev/shm/testdb2 -readonly -num=50 -threads=1 -multiread_batched=1 -batch_size=100 -duration=10 -rate_limiter_bytes_per_sec=200 -rate_limiter_refill_period_us=1000000 -rate_limit_bg_reads=1 -disable_auto_compactions=1 -rate_limit_user_ops=1`, where each `MultiRead()` reads about 2000 bytes (inspected with a debugger) and the rate limiter grants 200 bytes per second.
- Stress test:
   - Verified that `./db_stress (-test_cf_consistency=1/test_batches_snapshots=1) -use_multiget=1 -cache_size=1048576 -rate_limiter_bytes_per_sec=10241024 -rate_limit_bg_reads=1 -rate_limit_user_ops=1` works
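
The 10-second spacing expected in the `strace` check follows from the flags. Below is a small arithmetic sketch, assuming the limiter starts with no stored budget and that each read is exactly 2000 bytes (the plan only says "about 2000"). The helper names are illustrative, not RocksDB functions.

```cpp
#include <cstdint>

// Bytes made available in each refill period.
constexpr std::int64_t BytesPerRefill(std::int64_t bytes_per_sec,
                                      std::int64_t refill_period_us) {
  return bytes_per_sec * refill_period_us / 1000000;
}

// Refill periods needed to accumulate `read_bytes`, rounded up.
constexpr std::int64_t RefillsNeeded(std::int64_t read_bytes,
                                     std::int64_t bytes_per_refill) {
  return (read_bytes + bytes_per_refill - 1) / bytes_per_refill;
}

// -rate_limiter_bytes_per_sec=200, -rate_limiter_refill_period_us=1000000
static_assert(BytesPerRefill(200, 1000000) == 200, "200 bytes per 1 s period");
// A 2000-byte MultiRead() needs 10 periods of 1 second each.
static_assert(RefillsNeeded(2000, 200) == 10, "10 refill periods");
```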

Reviewed By: ajkr, anand1976

Differential Revision: D37135172

Pulled By: hx235

fbshipit-source-id: 73b8e8f14761e5d4b77235dfe5d41f4eea968bcd
Hui Xiao 3 years ago committed by Facebook GitHub Bot
parent c965c9ef65
commit a5d773e077
 HISTORY.md                              |  1
 db/db_rate_limiter_test.cc              | 13
 db_stress_tool/batched_ops_stress.cc    |  2
 db_stress_tool/cf_consistency_stress.cc |  5
 db_stress_tool/no_batched_ops_stress.cc |  2
 file/random_access_file_reader.cc       | 27
 include/rocksdb/options.h               |  4
 tools/db_bench_tool.cc                  |  3
 utilities/backup/backup_engine.cc       |  3

--- a/HISTORY.md
+++ b/HISTORY.md
@@ -36,6 +36,7 @@
 * Add a new column family option `blob_file_starting_level` to enable writing blob files during flushes and compactions starting from the specified LSM tree level.
 * Add support for timestamped snapshots (#9879)
 * Provide support for AbortIO in posix to cancel submitted asynchronous requests using io_uring.
+* Add support for rate-limiting batched `MultiGet()` APIs

 ### Behavior changes
 * DB::Open(), DB::OpenAsSecondary() will fail if a Logger cannot be created (#9984)

--- a/db/db_rate_limiter_test.cc
+++ b/db/db_rate_limiter_test.cc
@@ -139,8 +139,6 @@ TEST_P(DBRateLimiterOnReadTest, Get) {
 }

 TEST_P(DBRateLimiterOnReadTest, NewMultiGet) {
-  // The new void-returning `MultiGet()` APIs use `MultiRead()`, which does not
-  // yet support rate limiting.
   if (use_direct_io_ && !IsDirectIOSupported()) {
     return;
   }
@@ -149,6 +147,7 @@ TEST_P(DBRateLimiterOnReadTest, NewMultiGet) {
   ASSERT_EQ(0, options_.rate_limiter->GetTotalRequests(Env::IO_USER));
   const int kNumKeys = kNumFiles * kNumKeysPerFile;
+  int64_t expected = 0;
   {
     std::vector<std::string> key_bufs;
     key_bufs.reserve(kNumKeys);
@@ -160,13 +159,19 @@ TEST_P(DBRateLimiterOnReadTest, NewMultiGet) {
     }
     std::vector<Status> statuses(kNumKeys);
     std::vector<PinnableSlice> values(kNumKeys);
+    const int64_t prev_total_rl_req = options_.rate_limiter->GetTotalRequests();
     db_->MultiGet(GetReadOptions(), dbfull()->DefaultColumnFamily(), kNumKeys,
                   keys.data(), values.data(), statuses.data());
+    const int64_t cur_total_rl_req = options_.rate_limiter->GetTotalRequests();
     for (int i = 0; i < kNumKeys; ++i) {
-      ASSERT_TRUE(statuses[i].IsNotSupported());
+      ASSERT_TRUE(statuses[i].ok());
     }
+    ASSERT_GT(cur_total_rl_req, prev_total_rl_req);
+    ASSERT_EQ(cur_total_rl_req - prev_total_rl_req,
+              options_.rate_limiter->GetTotalRequests(Env::IO_USER));
   }
-  ASSERT_EQ(0, options_.rate_limiter->GetTotalRequests(Env::IO_USER));
+  expected += kNumKeys;
+  ASSERT_EQ(expected, options_.rate_limiter->GetTotalRequests(Env::IO_USER));
 }

 TEST_P(DBRateLimiterOnReadTest, OldMultiGet) {

--- a/db_stress_tool/batched_ops_stress.cc
+++ b/db_stress_tool/batched_ops_stress.cc
@@ -190,6 +190,8 @@ class BatchedOpsStressTest : public StressTest {
     std::vector<Status> statuses(num_prefixes);
     ReadOptions readoptionscopy = readoptions;
     readoptionscopy.snapshot = db_->GetSnapshot();
+    readoptionscopy.rate_limiter_priority =
+        FLAGS_rate_limit_user_ops ? Env::IO_USER : Env::IO_TOTAL;
     std::vector<std::string> key_str;
     key_str.reserve(num_prefixes);
     key_slices.reserve(num_prefixes);

--- a/db_stress_tool/cf_consistency_stress.cc
+++ b/db_stress_tool/cf_consistency_stress.cc
@@ -214,12 +214,15 @@ class CfConsistencyStressTest : public StressTest {
     std::vector<PinnableSlice> values(num_keys);
     std::vector<Status> statuses(num_keys);
     ColumnFamilyHandle* cfh = column_families_[rand_column_families[0]];
+    ReadOptions readoptionscopy = read_opts;
+    readoptionscopy.rate_limiter_priority =
+        FLAGS_rate_limit_user_ops ? Env::IO_USER : Env::IO_TOTAL;
     for (size_t i = 0; i < num_keys; ++i) {
       key_str.emplace_back(Key(rand_keys[i]));
       keys.emplace_back(key_str.back());
     }
-    db_->MultiGet(read_opts, cfh, num_keys, keys.data(), values.data(),
+    db_->MultiGet(readoptionscopy, cfh, num_keys, keys.data(), values.data(),
                   statuses.data());
     for (auto s : statuses) {
       if (s.ok()) {

--- a/db_stress_tool/no_batched_ops_stress.cc
+++ b/db_stress_tool/no_batched_ops_stress.cc
@@ -391,6 +391,8 @@ class NonBatchedOpsStressTest : public StressTest {
     if (do_consistency_check) {
       readoptionscopy.snapshot = db_->GetSnapshot();
     }
+    readoptionscopy.rate_limiter_priority =
+        FLAGS_rate_limit_user_ops ? Env::IO_USER : Env::IO_TOTAL;
     // To appease clang analyzer
     const bool use_txn = FLAGS_use_txn;

--- a/file/random_access_file_reader.cc
+++ b/file/random_access_file_reader.cc
@@ -270,9 +270,6 @@ bool TryMerge(FSReadRequest* dest, const FSReadRequest& src) {
 IOStatus RandomAccessFileReader::MultiRead(
     const IOOptions& opts, FSReadRequest* read_reqs, size_t num_reqs,
     AlignedBuf* aligned_buf, Env::IOPriority rate_limiter_priority) const {
-  if (rate_limiter_priority != Env::IO_TOTAL) {
-    return IOStatus::NotSupported("Unable to rate limit MultiRead()");
-  }
   (void)aligned_buf; // suppress warning of unused variable in LITE mode
   assert(num_reqs > 0);
@@ -359,6 +356,30 @@ IOStatus RandomAccessFileReader::MultiRead(
     {
       IOSTATS_CPU_TIMER_GUARD(cpu_read_nanos, clock_);
+      if (rate_limiter_priority != Env::IO_TOTAL && rate_limiter_ != nullptr) {
+        // TODO: ideally we should call `RateLimiter::RequestToken()` for
+        // allowed bytes to multi-read and then consume those bytes by
+        // satisfying as many requests in `MultiRead()` as possible, instead of
+        // what we do here, which can cause burst when the
+        // `total_multi_read_size` is big.
+        size_t total_multi_read_size = 0;
+        assert(fs_reqs != nullptr);
+        for (size_t i = 0; i < num_fs_reqs; ++i) {
+          FSReadRequest& req = fs_reqs[i];
+          total_multi_read_size += req.len;
+        }
+        size_t remaining_bytes = total_multi_read_size;
+        size_t request_bytes = 0;
+        while (remaining_bytes > 0) {
+          request_bytes = std::min(
+              static_cast<size_t>(rate_limiter_->GetSingleBurstBytes()),
+              remaining_bytes);
+          rate_limiter_->Request(request_bytes, rate_limiter_priority,
+                                 nullptr /* stats */,
+                                 RateLimiter::OpType::kRead);
+          remaining_bytes -= request_bytes;
+        }
+      }
       io_s = file_->MultiRead(fs_reqs, num_fs_reqs, opts, nullptr);
     }

--- a/include/rocksdb/options.h
+++ b/include/rocksdb/options.h
@@ -1641,10 +1641,6 @@ struct ReadOptions {
   // is a `PlainTableFactory`) and cuckoo tables (these can exist when
   // `ColumnFamilyOptions::table_factory` is a `CuckooTableFactory`).
   //
-  // The new `DB::MultiGet()` APIs (i.e., the ones returning `void`) will return
-  // `Status::NotSupported` when that operation requires file read(s) and
-  // `rate_limiter_priority != Env::IO_TOTAL`.
-  //
   // The bytes charged to rate limiter may not exactly match the file read bytes
   // since there are some seemingly insignificant reads, like for file
   // headers/footers, that we currently do not charge to rate limiter.

--- a/tools/db_bench_tool.cc
+++ b/tools/db_bench_tool.cc
@@ -4562,6 +4562,9 @@ class Benchmark {
       options.rate_limiter.reset(NewGenericRateLimiter(
           FLAGS_rate_limiter_bytes_per_sec,
           FLAGS_rate_limiter_refill_period_us, 10 /* fairness */,
+          // TODO: replace this with a more general FLAG for deciding
+          // RateLimiter::Mode as now we also rate-limit foreground reads e.g,
+          // Get()/MultiGet()
           FLAGS_rate_limit_bg_reads ? RateLimiter::Mode::kReadsOnly
                                     : RateLimiter::Mode::kWritesOnly,
           FLAGS_rate_limiter_auto_tuned));

--- a/utilities/backup/backup_engine.cc
+++ b/utilities/backup/backup_engine.cc
@@ -296,6 +296,9 @@ class BackupEngineImpl {
     }
   };

+  // TODO: deprecate this function once we migrate all BackupEngine's rate
+  // limiting to lower-level ones (i.e, ones in file access wrapper level like
+  // `WritableFileWriter`)
   static void LoopRateLimitRequestHelper(const size_t total_bytes_to_request,
                                          RateLimiter* rate_limiter,
                                          const Env::IOPriority pri,
