diff --git a/HISTORY.md b/HISTORY.md
index b5cb8cd25..a115c051a 100644
--- a/HISTORY.md
+++ b/HISTORY.md
@@ -36,6 +36,7 @@
 * Add a new column family option `blob_file_starting_level` to enable writing blob files during flushes and compactions starting from the specified LSM tree level.
 * Add support for timestamped snapshots (#9879)
 * Provide support for AbortIO in posix to cancel submitted asynchronous requests using io_uring.
+* Add support for rate-limiting batched `MultiGet()` APIs
 
 ### Behavior changes
 * DB::Open(), DB::OpenAsSecondary() will fail if a Logger cannot be created (#9984)
diff --git a/db/db_rate_limiter_test.cc b/db/db_rate_limiter_test.cc
index f30af1974..e44cc047d 100644
--- a/db/db_rate_limiter_test.cc
+++ b/db/db_rate_limiter_test.cc
@@ -139,8 +139,6 @@ TEST_P(DBRateLimiterOnReadTest, Get) {
 }
 
 TEST_P(DBRateLimiterOnReadTest, NewMultiGet) {
-  // The new void-returning `MultiGet()` APIs use `MultiRead()`, which does not
-  // yet support rate limiting.
   if (use_direct_io_ && !IsDirectIOSupported()) {
     return;
   }
@@ -149,6 +147,7 @@ TEST_P(DBRateLimiterOnReadTest, NewMultiGet) {
   ASSERT_EQ(0, options_.rate_limiter->GetTotalRequests(Env::IO_USER));
 
   const int kNumKeys = kNumFiles * kNumKeysPerFile;
+  int64_t expected = 0;
   {
     std::vector<std::string> key_bufs;
     key_bufs.reserve(kNumKeys);
@@ -160,13 +159,19 @@ TEST_P(DBRateLimiterOnReadTest, NewMultiGet) {
     }
     std::vector<Status> statuses(kNumKeys);
     std::vector<PinnableSlice> values(kNumKeys);
+    const int64_t prev_total_rl_req = options_.rate_limiter->GetTotalRequests();
     db_->MultiGet(GetReadOptions(), dbfull()->DefaultColumnFamily(), kNumKeys,
                   keys.data(), values.data(), statuses.data());
+    const int64_t cur_total_rl_req = options_.rate_limiter->GetTotalRequests();
     for (int i = 0; i < kNumKeys; ++i) {
-      ASSERT_TRUE(statuses[i].IsNotSupported());
+      ASSERT_TRUE(statuses[i].ok());
     }
+    ASSERT_GT(cur_total_rl_req, prev_total_rl_req);
+    ASSERT_EQ(cur_total_rl_req - prev_total_rl_req,
+              options_.rate_limiter->GetTotalRequests(Env::IO_USER));
   }
-  ASSERT_EQ(0, options_.rate_limiter->GetTotalRequests(Env::IO_USER));
+  expected += kNumKeys;
+  ASSERT_EQ(expected, options_.rate_limiter->GetTotalRequests(Env::IO_USER));
 }
 
 TEST_P(DBRateLimiterOnReadTest, OldMultiGet) {
diff --git a/db_stress_tool/batched_ops_stress.cc b/db_stress_tool/batched_ops_stress.cc
index 13f3aba5c..52287c0ae 100644
--- a/db_stress_tool/batched_ops_stress.cc
+++ b/db_stress_tool/batched_ops_stress.cc
@@ -190,6 +190,8 @@ class BatchedOpsStressTest : public StressTest {
     std::vector<Status> statuses(num_prefixes);
     ReadOptions readoptionscopy = readoptions;
     readoptionscopy.snapshot = db_->GetSnapshot();
+    readoptionscopy.rate_limiter_priority =
+        FLAGS_rate_limit_user_ops ? Env::IO_USER : Env::IO_TOTAL;
     std::vector<std::string> key_str;
     key_str.reserve(num_prefixes);
     key_slices.reserve(num_prefixes);
diff --git a/db_stress_tool/cf_consistency_stress.cc b/db_stress_tool/cf_consistency_stress.cc
index b7cc4c376..4f6530590 100644
--- a/db_stress_tool/cf_consistency_stress.cc
+++ b/db_stress_tool/cf_consistency_stress.cc
@@ -214,12 +214,15 @@ class CfConsistencyStressTest : public StressTest {
     std::vector<PinnableSlice> values(num_keys);
     std::vector<Status> statuses(num_keys);
     ColumnFamilyHandle* cfh = column_families_[rand_column_families[0]];
+    ReadOptions readoptionscopy = read_opts;
+    readoptionscopy.rate_limiter_priority =
+        FLAGS_rate_limit_user_ops ? Env::IO_USER : Env::IO_TOTAL;
     for (size_t i = 0; i < num_keys; ++i) {
       key_str.emplace_back(Key(rand_keys[i]));
       keys.emplace_back(key_str.back());
     }
-    db_->MultiGet(read_opts, cfh, num_keys, keys.data(), values.data(),
+    db_->MultiGet(readoptionscopy, cfh, num_keys, keys.data(), values.data(),
                   statuses.data());
     for (auto s : statuses) {
       if (s.ok()) {
diff --git a/db_stress_tool/no_batched_ops_stress.cc b/db_stress_tool/no_batched_ops_stress.cc
index 08cd9b91a..16e238501 100644
--- a/db_stress_tool/no_batched_ops_stress.cc
+++ b/db_stress_tool/no_batched_ops_stress.cc
@@ -391,6 +391,8 @@ class NonBatchedOpsStressTest : public StressTest {
     if (do_consistency_check) {
       readoptionscopy.snapshot = db_->GetSnapshot();
     }
+    readoptionscopy.rate_limiter_priority =
+        FLAGS_rate_limit_user_ops ? Env::IO_USER : Env::IO_TOTAL;
 
     // To appease clang analyzer
     const bool use_txn = FLAGS_use_txn;
diff --git a/file/random_access_file_reader.cc b/file/random_access_file_reader.cc
index e74b78cc4..d02b7b5f6 100644
--- a/file/random_access_file_reader.cc
+++ b/file/random_access_file_reader.cc
@@ -270,9 +270,6 @@ bool TryMerge(FSReadRequest* dest, const FSReadRequest& src) {
 IOStatus RandomAccessFileReader::MultiRead(
     const IOOptions& opts, FSReadRequest* read_reqs, size_t num_reqs,
     AlignedBuf* aligned_buf, Env::IOPriority rate_limiter_priority) const {
-  if (rate_limiter_priority != Env::IO_TOTAL) {
-    return IOStatus::NotSupported("Unable to rate limit MultiRead()");
-  }
   (void)aligned_buf;  // suppress warning of unused variable in LITE mode
   assert(num_reqs > 0);
 
@@ -359,6 +356,30 @@ IOStatus RandomAccessFileReader::MultiRead(
 
   {
     IOSTATS_CPU_TIMER_GUARD(cpu_read_nanos, clock_);
+    if (rate_limiter_priority != Env::IO_TOTAL && rate_limiter_ != nullptr) {
+      // TODO: ideally we should call `RateLimiter::RequestToken()` for
+      // allowed bytes to multi-read and then consume those bytes by
+      // satisfying as many requests in `MultiRead()` as possible, instead of
+      // what we do here, which can cause a burst when
+      // `total_multi_read_size` is big.
+      size_t total_multi_read_size = 0;
+      assert(fs_reqs != nullptr);
+      for (size_t i = 0; i < num_fs_reqs; ++i) {
+        FSReadRequest& req = fs_reqs[i];
+        total_multi_read_size += req.len;
+      }
+      size_t remaining_bytes = total_multi_read_size;
+      size_t request_bytes = 0;
+      while (remaining_bytes > 0) {
+        request_bytes = std::min(
+            static_cast<size_t>(rate_limiter_->GetSingleBurstBytes()),
+            remaining_bytes);
+        rate_limiter_->Request(request_bytes, rate_limiter_priority,
+                               nullptr /* stats */,
+                               RateLimiter::OpType::kRead);
+        remaining_bytes -= request_bytes;
+      }
+    }
     io_s = file_->MultiRead(fs_reqs, num_fs_reqs, opts, nullptr);
   }
 
diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h
index 72a2f7de1..542955e90 100644
--- a/include/rocksdb/options.h
+++ b/include/rocksdb/options.h
@@ -1641,10 +1641,6 @@ struct ReadOptions {
   // is a `PlainTableFactory`) and cuckoo tables (these can exist when
   // `ColumnFamilyOptions::table_factory` is a `CuckooTableFactory`).
   //
-  // The new `DB::MultiGet()` APIs (i.e., the ones returning `void`) will return
-  // `Status::NotSupported` when that operation requires file read(s) and
-  // `rate_limiter_priority != Env::IO_TOTAL`.
-  //
   // The bytes charged to rate limiter may not exactly match the file read bytes
   // since there are some seemingly insignificant reads, like for file
   // headers/footers, that we currently do not charge to rate limiter.
diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc
index 9cebbbca9..5b20143fd 100644
--- a/tools/db_bench_tool.cc
+++ b/tools/db_bench_tool.cc
@@ -4562,6 +4562,9 @@ class Benchmark {
       options.rate_limiter.reset(NewGenericRateLimiter(
           FLAGS_rate_limiter_bytes_per_sec, FLAGS_rate_limiter_refill_period_us,
           10 /* fairness */,
+          // TODO: replace this with a more general FLAG for deciding
+          // RateLimiter::Mode, as we now also rate-limit foreground reads,
+          // e.g., Get()/MultiGet()
          FLAGS_rate_limit_bg_reads ? RateLimiter::Mode::kReadsOnly
                                    : RateLimiter::Mode::kWritesOnly,
          FLAGS_rate_limiter_auto_tuned));
diff --git a/utilities/backup/backup_engine.cc b/utilities/backup/backup_engine.cc
index a14dbc880..37a8cfff6 100644
--- a/utilities/backup/backup_engine.cc
+++ b/utilities/backup/backup_engine.cc
@@ -296,6 +296,9 @@ class BackupEngineImpl {
     }
   };
 
+  // TODO: deprecate this function once we migrate all of BackupEngine's rate
+  // limiting to lower-level ones (i.e., ones at the file access wrapper
+  // level, like `WritableFileWriter`)
   static void LoopRateLimitRequestHelper(const size_t total_bytes_to_request,
                                          RateLimiter* rate_limiter,
                                          const Env::IOPriority pri,
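
For anyone trying the change out, here is a minimal sketch of how a caller opts into the newly rate-limited batched `MultiGet()`. The database path and limiter settings are illustrative; the APIs used (`NewGenericRateLimiter`, `ReadOptions::rate_limiter_priority`, and the `void`-returning `MultiGet()` overload) are the ones this patch touches.

```cpp
#include <vector>

#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/options.h"
#include "rocksdb/rate_limiter.h"
#include "rocksdb/slice.h"

using namespace ROCKSDB_NAMESPACE;

int main() {
  Options options;
  options.create_if_missing = true;
  // The limiter must be in a mode that covers reads; kAllIo rate-limits
  // both reads and writes.
  options.rate_limiter.reset(NewGenericRateLimiter(
      1 << 20 /* rate_bytes_per_sec */, 100 * 1000 /* refill_period_us */,
      10 /* fairness */, RateLimiter::Mode::kAllIo));

  DB* db = nullptr;
  // Path is illustrative.
  Status s = DB::Open(options, "/tmp/multiget_rate_limit_demo", &db);
  if (!s.ok()) {
    return 1;
  }

  ReadOptions read_opts;
  // With this patch, file reads issued by the batched MultiGet() below are
  // charged to options.rate_limiter at IO_USER priority; previously each
  // status came back as Status::NotSupported whenever a file read was
  // required, as the removed comment in options.h documented.
  read_opts.rate_limiter_priority = Env::IO_USER;

  std::vector<Slice> keys{"k1", "k2"};
  std::vector<PinnableSlice> values(keys.size());
  std::vector<Status> statuses(keys.size());
  db->MultiGet(read_opts, db->DefaultColumnFamily(), keys.size(), keys.data(),
               values.data(), statuses.data());

  delete db;
  return 0;
}
```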
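The charging logic added to `RandomAccessFileReader::MultiRead()` can also be read in isolation: sum the lengths of all file-system read requests, then request that total from the rate limiter in chunks capped at `GetSingleBurstBytes()`, since a single `RateLimiter::Request()` must not exceed one burst. A standalone sketch of that pattern follows; `ChargeMultiReadToRateLimiter` is a hypothetical helper name, not a RocksDB API.

```cpp
#include <algorithm>
#include <cstddef>

#include "rocksdb/env.h"
#include "rocksdb/file_system.h"
#include "rocksdb/rate_limiter.h"

using namespace ROCKSDB_NAMESPACE;

void ChargeMultiReadToRateLimiter(RateLimiter* rate_limiter,
                                  Env::IOPriority pri,
                                  const FSReadRequest* reqs, size_t num_reqs) {
  if (rate_limiter == nullptr || pri == Env::IO_TOTAL) {
    return;  // rate limiting is disabled for this read
  }
  // Total bytes the whole batch will read.
  size_t total = 0;
  for (size_t i = 0; i < num_reqs; ++i) {
    total += reqs[i].len;
  }
  size_t remaining = total;
  while (remaining > 0) {
    // Never ask the limiter for more than one burst at a time, mirroring
    // the loop in RandomAccessFileReader::MultiRead().
    size_t request_bytes = std::min(
        static_cast<size_t>(rate_limiter->GetSingleBurstBytes()), remaining);
    rate_limiter->Request(request_bytes, pri, nullptr /* stats */,
                          RateLimiter::OpType::kRead);
    remaining -= request_bytes;
  }
}
```

As the TODO in the patch notes, charging the whole batch up front means the underlying reads are still issued back to back once the tokens are acquired, so a large `total_multi_read_size` can still produce a read burst; requesting tokens as individual requests are satisfied would smooth this out.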