diff --git a/CMakeLists.txt b/CMakeLists.txt index b550602c8..c204baa21 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1196,6 +1196,7 @@ if(WITH_TESTS) db/db_options_test.cc db/db_properties_test.cc db/db_range_del_test.cc + db/db_rate_limiter_test.cc db/db_secondary_test.cc db/db_sst_test.cc db/db_statistics_test.cc diff --git a/HISTORY.md b/HISTORY.md index 2ae0a3715..524391e39 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -13,6 +13,7 @@ ### Public API changes * Require C++17 compatible compiler (GCC >= 7, Clang >= 5, Visual Studio >= 2017). See #9388. +* Added `ReadOptions::rate_limiter_priority`. When set to something other than `Env::IO_TOTAL`, the internal rate limiter (`DBOptions::rate_limiter`) will be charged at the specified priority for file reads associated with the API to which the `ReadOptions` was provided. * Remove HDFS support from main repo. * Remove librados support from main repo. * Remove obsolete backupable_db.h and type alias `BackupableDBOptions`. Use backup_engine.h and `BackupEngineOptions`. Similar renamings are in the C and Java APIs. diff --git a/Makefile b/Makefile index 09f9d5fcd..6a3ae6977 100644 --- a/Makefile +++ b/Makefile @@ -1533,6 +1533,9 @@ db_options_test: $(OBJ_DIR)/db/db_options_test.o $(TEST_LIBRARY) $(LIBRARY) db_range_del_test: $(OBJ_DIR)/db/db_range_del_test.o $(TEST_LIBRARY) $(LIBRARY) $(AM_LINK) +db_rate_limiter_test: $(OBJ_DIR)/db/db_rate_limiter_test.o $(TEST_LIBRARY) $(LIBRARY) + $(AM_LINK) + db_sst_test: $(OBJ_DIR)/db/db_sst_test.o $(TEST_LIBRARY) $(LIBRARY) $(AM_LINK) diff --git a/TARGETS b/TARGETS index bcc468edf..e51c2c278 100644 --- a/TARGETS +++ b/TARGETS @@ -1441,6 +1441,13 @@ ROCKS_TESTS = [ [], [], ], + [ + "db_rate_limiter_test", + "db/db_rate_limiter_test.cc", + "parallel", + [], + [], + ], [ "db_secondary_test", "db/db_secondary_test.cc", diff --git a/db/blob/blob_file_reader.cc b/db/blob/blob_file_reader.cc index 981261001..3651bb4a7 100644 --- a/db/blob/blob_file_reader.cc +++ b/db/blob/blob_file_reader.cc @@ -148,9 +148,10 @@ Status BlobFileReader::ReadHeader(const RandomAccessFileReader* file_reader, constexpr uint64_t read_offset = 0; constexpr size_t read_size = BlobLogHeader::kSize; - const Status s = - ReadFromFile(file_reader, read_offset, read_size, statistics, - &header_slice, &buf, &aligned_buf); + // TODO: rate limit reading headers from blob files. + const Status s = ReadFromFile(file_reader, read_offset, read_size, + statistics, &header_slice, &buf, &aligned_buf, + Env::IO_TOTAL /* rate_limiter_priority */); if (!s.ok()) { return s; } @@ -198,9 +199,10 @@ Status BlobFileReader::ReadFooter(const RandomAccessFileReader* file_reader, const uint64_t read_offset = file_size - BlobLogFooter::kSize; constexpr size_t read_size = BlobLogFooter::kSize; - const Status s = - ReadFromFile(file_reader, read_offset, read_size, statistics, - &footer_slice, &buf, &aligned_buf); + // TODO: rate limit reading footers from blob files. + const Status s = ReadFromFile(file_reader, read_offset, read_size, + statistics, &footer_slice, &buf, &aligned_buf, + Env::IO_TOTAL /* rate_limiter_priority */); if (!s.ok()) { return s; } @@ -230,7 +232,8 @@ Status BlobFileReader::ReadFooter(const RandomAccessFileReader* file_reader, Status BlobFileReader::ReadFromFile(const RandomAccessFileReader* file_reader, uint64_t read_offset, size_t read_size, Statistics* statistics, Slice* slice, - Buffer* buf, AlignedBuf* aligned_buf) { + Buffer* buf, AlignedBuf* aligned_buf, + Env::IOPriority rate_limiter_priority) { assert(slice); assert(buf); assert(aligned_buf); @@ -245,13 +248,13 @@ Status BlobFileReader::ReadFromFile(const RandomAccessFileReader* file_reader, constexpr char* scratch = nullptr; s = file_reader->Read(IOOptions(), read_offset, read_size, slice, scratch, - aligned_buf); + aligned_buf, rate_limiter_priority); } else { buf->reset(new char[read_size]); constexpr AlignedBuf* aligned_scratch = nullptr; s = file_reader->Read(IOOptions(), read_offset, read_size, slice, - buf->get(), aligned_scratch); + buf->get(), aligned_scratch, rate_limiter_priority); } if (!s.ok()) { @@ -323,7 +326,8 @@ Status BlobFileReader::GetBlob(const ReadOptions& read_options, prefetched = prefetch_buffer->TryReadFromCache( IOOptions(), file_reader_.get(), record_offset, - static_cast(record_size), &record_slice, &s, for_compaction); + static_cast(record_size), &record_slice, &s, + read_options.rate_limiter_priority, for_compaction); if (!s.ok()) { return s; } @@ -334,7 +338,8 @@ Status BlobFileReader::GetBlob(const ReadOptions& read_options, const Status s = ReadFromFile(file_reader_.get(), record_offset, static_cast(record_size), statistics_, - &record_slice, &buf, &aligned_buf); + &record_slice, &buf, &aligned_buf, + read_options.rate_limiter_priority); if (!s.ok()) { return s; } @@ -424,7 +429,8 @@ void BlobFileReader::MultiGetBlob( } TEST_SYNC_POINT("BlobFileReader::MultiGetBlob:ReadFromFile"); s = file_reader_->MultiRead(IOOptions(), read_reqs.data(), read_reqs.size(), - direct_io ? &aligned_buf : nullptr); + direct_io ? &aligned_buf : nullptr, + read_options.rate_limiter_priority); if (!s.ok()) { for (auto& req : read_reqs) { req.status.PermitUncheckedError(); diff --git a/db/blob/blob_file_reader.h b/db/blob/blob_file_reader.h index ffd1d11d5..59bf13182 100644 --- a/db/blob/blob_file_reader.h +++ b/db/blob/blob_file_reader.h @@ -83,7 +83,8 @@ class BlobFileReader { static Status ReadFromFile(const RandomAccessFileReader* file_reader, uint64_t read_offset, size_t read_size, Statistics* statistics, Slice* slice, Buffer* buf, - AlignedBuf* aligned_buf); + AlignedBuf* aligned_buf, + Env::IOPriority rate_limiter_priority); static Status VerifyBlob(const Slice& record_slice, const Slice& user_key, uint64_t value_size); diff --git a/db/blob/blob_log_sequential_reader.cc b/db/blob/blob_log_sequential_reader.cc index 448b3b6f7..778725189 100644 --- a/db/blob/blob_log_sequential_reader.cc +++ b/db/blob/blob_log_sequential_reader.cc @@ -28,8 +28,10 @@ Status BlobLogSequentialReader::ReadSlice(uint64_t size, Slice* slice, assert(file_); StopWatch read_sw(clock_, statistics_, BLOB_DB_BLOB_FILE_READ_MICROS); - Status s = file_->Read(IOOptions(), next_byte_, static_cast(size), - slice, buf, nullptr); + // TODO: rate limit `BlobLogSequentialReader` reads (it appears unused?) + Status s = + file_->Read(IOOptions(), next_byte_, static_cast(size), slice, + buf, nullptr, Env::IO_TOTAL /* rate_limiter_priority */); next_byte_ += size; if (!s.ok()) { return s; diff --git a/db/compaction/compaction_job.cc b/db/compaction/compaction_job.cc index cde52cb00..e84ad734f 100644 --- a/db/compaction/compaction_job.cc +++ b/db/compaction/compaction_job.cc @@ -1264,6 +1264,7 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { ReadOptions read_options; read_options.verify_checksums = true; read_options.fill_cache = false; + read_options.rate_limiter_priority = Env::IO_LOW; // Compaction iterators shouldn't be confined to a single prefix. // Compactions use Seek() for // (a) concurrent compactions, diff --git a/db/convenience.cc b/db/convenience.cc index 097d1fd47..713ca8da9 100644 --- a/db/convenience.cc +++ b/db/convenience.cc @@ -56,7 +56,10 @@ Status VerifySstFileChecksum(const Options& options, } std::unique_ptr table_reader; std::unique_ptr file_reader( - new RandomAccessFileReader(std::move(file), file_path)); + new RandomAccessFileReader( + std::move(file), file_path, ioptions.clock, nullptr /* io_tracer */, + nullptr /* stats */, 0 /* hist_type */, nullptr /* file_read_hist */, + ioptions.rate_limiter.get())); const bool kImmortal = true; s = ioptions.table_factory->NewTableReader( TableReaderOptions(ioptions, options.prefix_extractor, env_options, diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index 2a822880d..196b428a3 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -5175,7 +5175,8 @@ Status DBImpl::VerifyFullFileChecksum(const std::string& file_checksum_expected, fs_.get(), fname, immutable_db_options_.file_checksum_gen_factory.get(), func_name_expected, &file_checksum, &func_name, read_options.readahead_size, immutable_db_options_.allow_mmap_reads, - io_tracer_, immutable_db_options_.rate_limiter.get()); + io_tracer_, immutable_db_options_.rate_limiter.get(), + read_options.rate_limiter_priority); if (s.ok()) { assert(func_name_expected == func_name); if (file_checksum != file_checksum_expected) { diff --git a/db/db_rate_limiter_test.cc b/db/db_rate_limiter_test.cc new file mode 100644 index 000000000..7b200add1 --- /dev/null +++ b/db/db_rate_limiter_test.cc @@ -0,0 +1,262 @@ +// Copyright (c) 2022-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "db/db_test_util.h" +#include "port/stack_trace.h" +#include "util/file_checksum_helper.h" + +namespace ROCKSDB_NAMESPACE { + +class DBRateLimiterTest + : public DBTestBase, + public ::testing::WithParamInterface> { + public: + DBRateLimiterTest() + : DBTestBase("db_rate_limiter_test", /*env_do_fsync=*/false), + use_direct_io_(std::get<0>(GetParam())), + use_block_cache_(std::get<1>(GetParam())), + use_readahead_(std::get<2>(GetParam())) {} + + void Init() { + options_ = GetOptions(); + Reopen(options_); + for (int i = 0; i < kNumFiles; ++i) { + for (int j = 0; j < kNumKeysPerFile; ++j) { + ASSERT_OK(Put(Key(i * kNumKeysPerFile + j), "val")); + } + ASSERT_OK(Flush()); + } + MoveFilesToLevel(1); + } + + BlockBasedTableOptions GetTableOptions() { + BlockBasedTableOptions table_options; + table_options.no_block_cache = !use_block_cache_; + return table_options; + } + + ReadOptions GetReadOptions() { + ReadOptions read_options; + read_options.rate_limiter_priority = Env::IO_USER; + read_options.readahead_size = use_readahead_ ? kReadaheadBytes : 0; + return read_options; + } + + Options GetOptions() { + Options options = CurrentOptions(); + options.disable_auto_compactions = true; + options.file_checksum_gen_factory.reset(new FileChecksumGenCrc32cFactory()); + options.rate_limiter.reset(NewGenericRateLimiter( + 1 << 20 /* rate_bytes_per_sec */, 100 * 1000 /* refill_period_us */, + 10 /* fairness */, RateLimiter::Mode::kAllIo)); + options.table_factory.reset(NewBlockBasedTableFactory(GetTableOptions())); + options.use_direct_reads = use_direct_io_; + return options; + } + + protected: + const static int kNumKeysPerFile = 1; + const static int kNumFiles = 3; + const static int kReadaheadBytes = 32 << 10; // 32KB + + Options options_; + const bool use_direct_io_; + const bool use_block_cache_; + const bool use_readahead_; +}; + +std::string GetTestNameSuffix( + ::testing::TestParamInfo> info) { + std::ostringstream oss; + if (std::get<0>(info.param)) { + oss << "DirectIO"; + } else { + oss << "BufferedIO"; + } + if (std::get<1>(info.param)) { + oss << "_BlockCache"; + } else { + oss << "_NoBlockCache"; + } + if (std::get<2>(info.param)) { + oss << "_Readahead"; + } else { + oss << "_NoReadahead"; + } + return oss.str(); +} + +#ifndef ROCKSDB_LITE +INSTANTIATE_TEST_CASE_P(DBRateLimiterTest, DBRateLimiterTest, + ::testing::Combine(::testing::Bool(), ::testing::Bool(), + ::testing::Bool()), + GetTestNameSuffix); +#else // ROCKSDB_LITE +// Cannot use direct I/O in lite mode. +INSTANTIATE_TEST_CASE_P(DBRateLimiterTest, DBRateLimiterTest, + ::testing::Combine(::testing::Values(false), + ::testing::Bool(), + ::testing::Bool()), + GetTestNameSuffix); +#endif // ROCKSDB_LITE + +TEST_P(DBRateLimiterTest, Get) { + if (use_direct_io_ && !IsDirectIOSupported()) { + return; + } + Init(); + + ASSERT_EQ(0, options_.rate_limiter->GetTotalRequests(Env::IO_USER)); + + int expected = 0; + for (int i = 0; i < kNumFiles; ++i) { + { + std::string value; + ASSERT_OK(db_->Get(GetReadOptions(), Key(i * kNumKeysPerFile), &value)); + ++expected; + } + ASSERT_EQ(expected, options_.rate_limiter->GetTotalRequests(Env::IO_USER)); + + { + std::string value; + ASSERT_OK(db_->Get(GetReadOptions(), Key(i * kNumKeysPerFile), &value)); + if (!use_block_cache_) { + ++expected; + } + } + ASSERT_EQ(expected, options_.rate_limiter->GetTotalRequests(Env::IO_USER)); + } +} + +TEST_P(DBRateLimiterTest, NewMultiGet) { + // The new void-returning `MultiGet()` APIs use `MultiRead()`, which does not + // yet support rate limiting. + if (use_direct_io_ && !IsDirectIOSupported()) { + return; + } + Init(); + + ASSERT_EQ(0, options_.rate_limiter->GetTotalRequests(Env::IO_USER)); + + const int kNumKeys = kNumFiles * kNumKeysPerFile; + { + std::vector key_bufs; + key_bufs.reserve(kNumKeys); + std::vector keys; + keys.reserve(kNumKeys); + for (int i = 0; i < kNumKeys; ++i) { + key_bufs.emplace_back(Key(i)); + keys.emplace_back(key_bufs[i]); + } + std::vector statuses(kNumKeys); + std::vector values(kNumKeys); + db_->MultiGet(GetReadOptions(), dbfull()->DefaultColumnFamily(), kNumKeys, + keys.data(), values.data(), statuses.data()); + for (int i = 0; i < kNumKeys; ++i) { + ASSERT_TRUE(statuses[i].IsNotSupported()); + } + } + ASSERT_EQ(0, options_.rate_limiter->GetTotalRequests(Env::IO_USER)); +} + +TEST_P(DBRateLimiterTest, OldMultiGet) { + // The old `vector`-returning `MultiGet()` APIs use `Read()`, which + // supports rate limiting. + if (use_direct_io_ && !IsDirectIOSupported()) { + return; + } + Init(); + + ASSERT_EQ(0, options_.rate_limiter->GetTotalRequests(Env::IO_USER)); + + const int kNumKeys = kNumFiles * kNumKeysPerFile; + int expected = 0; + { + std::vector key_bufs; + key_bufs.reserve(kNumKeys); + std::vector keys; + keys.reserve(kNumKeys); + for (int i = 0; i < kNumKeys; ++i) { + key_bufs.emplace_back(Key(i)); + keys.emplace_back(key_bufs[i]); + } + std::vector values; + std::vector statuses = + db_->MultiGet(GetReadOptions(), keys, &values); + for (int i = 0; i < kNumKeys; ++i) { + ASSERT_OK(statuses[i]); + } + } + expected += kNumKeys; + ASSERT_EQ(expected, options_.rate_limiter->GetTotalRequests(Env::IO_USER)); +} + +TEST_P(DBRateLimiterTest, Iterator) { + if (use_direct_io_ && !IsDirectIOSupported()) { + return; + } + Init(); + + std::unique_ptr iter(db_->NewIterator(GetReadOptions())); + ASSERT_EQ(0, options_.rate_limiter->GetTotalRequests(Env::IO_USER)); + + int expected = 0; + for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { + ++expected; + ASSERT_EQ(expected, options_.rate_limiter->GetTotalRequests(Env::IO_USER)); + } + + for (iter->SeekToLast(); iter->Valid(); iter->Prev()) { + // When `use_block_cache_ == true`, the reverse scan will access the blocks + // loaded to cache during the above forward scan, in which case no further + // file reads are expected. + if (!use_block_cache_) { + ++expected; + } + } + // Reverse scan does not read evenly (one block per iteration) due to + // descending seqno ordering, so wait until after the loop to check total. + ASSERT_EQ(expected, options_.rate_limiter->GetTotalRequests(Env::IO_USER)); +} + +#if !defined(ROCKSDB_LITE) + +TEST_P(DBRateLimiterTest, VerifyChecksum) { + if (use_direct_io_ && !IsDirectIOSupported()) { + return; + } + Init(); + + ASSERT_EQ(0, options_.rate_limiter->GetTotalRequests(Env::IO_USER)); + + ASSERT_OK(db_->VerifyChecksum(GetReadOptions())); + // The files are tiny so there should have just been one read per file. + int expected = kNumFiles; + ASSERT_EQ(expected, options_.rate_limiter->GetTotalRequests(Env::IO_USER)); +} + +TEST_P(DBRateLimiterTest, VerifyFileChecksums) { + if (use_direct_io_ && !IsDirectIOSupported()) { + return; + } + Init(); + + ASSERT_EQ(0, options_.rate_limiter->GetTotalRequests(Env::IO_USER)); + + ASSERT_OK(db_->VerifyFileChecksums(GetReadOptions())); + // The files are tiny so there should have just been one read per file. + int expected = kNumFiles; + ASSERT_EQ(expected, options_.rate_limiter->GetTotalRequests(Env::IO_USER)); +} + +#endif // !defined(ROCKSDB_LITE) + +} // namespace ROCKSDB_NAMESPACE + +int main(int argc, char** argv) { + ROCKSDB_NAMESPACE::port::InstallStackTraceHandler(); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/db/db_test2.cc b/db/db_test2.cc index 5ed08e77f..1d23a3e89 100644 --- a/db/db_test2.cc +++ b/db/db_test2.cc @@ -3937,68 +3937,74 @@ TEST_F(DBTest2, RateLimitedCompactionReads) { const int kBytesPerKey = 1024; const int kNumL0Files = 4; - for (auto use_direct_io : {false, true}) { - if (use_direct_io && !IsDirectIOSupported()) { - continue; - } - Options options = CurrentOptions(); - options.compression = kNoCompression; - options.level0_file_num_compaction_trigger = kNumL0Files; - options.memtable_factory.reset( - test::NewSpecialSkipListFactory(kNumKeysPerFile)); - // takes roughly one second, split into 100 x 10ms intervals. Each interval - // permits 5.12KB, which is smaller than the block size, so this test - // exercises the code for chunking reads. - options.rate_limiter.reset(NewGenericRateLimiter( - static_cast(kNumL0Files * kNumKeysPerFile * - kBytesPerKey) /* rate_bytes_per_sec */, - 10 * 1000 /* refill_period_us */, 10 /* fairness */, - RateLimiter::Mode::kReadsOnly)); - options.use_direct_reads = options.use_direct_io_for_flush_and_compaction = - use_direct_io; - BlockBasedTableOptions bbto; - bbto.block_size = 16384; - bbto.no_block_cache = true; - options.table_factory.reset(NewBlockBasedTableFactory(bbto)); - DestroyAndReopen(options); + for (int compaction_readahead_size : {0, 32 << 10}) { + for (auto use_direct_io : {false, true}) { + if (use_direct_io && !IsDirectIOSupported()) { + continue; + } + Options options = CurrentOptions(); + options.compaction_readahead_size = compaction_readahead_size; + options.compression = kNoCompression; + options.level0_file_num_compaction_trigger = kNumL0Files; + options.memtable_factory.reset( + test::NewSpecialSkipListFactory(kNumKeysPerFile)); + // takes roughly one second, split into 100 x 10ms intervals. Each + // interval permits 5.12KB, which is smaller than the block size, so this + // test exercises the code for chunking reads. + options.rate_limiter.reset(NewGenericRateLimiter( + static_cast(kNumL0Files * kNumKeysPerFile * + kBytesPerKey) /* rate_bytes_per_sec */, + 10 * 1000 /* refill_period_us */, 10 /* fairness */, + RateLimiter::Mode::kReadsOnly)); + options.use_direct_reads = + options.use_direct_io_for_flush_and_compaction = use_direct_io; + BlockBasedTableOptions bbto; + bbto.block_size = 16384; + bbto.no_block_cache = true; + options.table_factory.reset(NewBlockBasedTableFactory(bbto)); + DestroyAndReopen(options); - for (int i = 0; i < kNumL0Files; ++i) { - for (int j = 0; j <= kNumKeysPerFile; ++j) { - ASSERT_OK(Put(Key(j), DummyString(kBytesPerKey))); + for (int i = 0; i < kNumL0Files; ++i) { + for (int j = 0; j <= kNumKeysPerFile; ++j) { + ASSERT_OK(Put(Key(j), DummyString(kBytesPerKey))); + } + ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable()); + if (i + 1 < kNumL0Files) { + ASSERT_EQ(i + 1, NumTableFilesAtLevel(0)); + } } - ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable()); - if (i + 1 < kNumL0Files) { - ASSERT_EQ(i + 1, NumTableFilesAtLevel(0)); + ASSERT_OK(dbfull()->TEST_WaitForCompact()); + ASSERT_EQ(0, NumTableFilesAtLevel(0)); + + // should be slightly above 512KB due to non-data blocks read. Arbitrarily + // chose 1MB as the upper bound on the total bytes read. + size_t rate_limited_bytes = + options.rate_limiter->GetTotalBytesThrough(Env::IO_TOTAL); + // There must be no charges at non-`IO_LOW` priorities. + ASSERT_EQ(rate_limited_bytes, + static_cast( + options.rate_limiter->GetTotalBytesThrough(Env::IO_LOW))); + // Include the explicit prefetch of the footer in direct I/O case. + size_t direct_io_extra = use_direct_io ? 512 * 1024 : 0; + ASSERT_GE( + rate_limited_bytes, + static_cast(kNumKeysPerFile * kBytesPerKey * kNumL0Files)); + ASSERT_LT( + rate_limited_bytes, + static_cast(2 * kNumKeysPerFile * kBytesPerKey * kNumL0Files + + direct_io_extra)); + + Iterator* iter = db_->NewIterator(ReadOptions()); + ASSERT_OK(iter->status()); + for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { + ASSERT_EQ(iter->value().ToString(), DummyString(kBytesPerKey)); } + delete iter; + // bytes read for user iterator shouldn't count against the rate limit. + ASSERT_EQ(rate_limited_bytes, + static_cast( + options.rate_limiter->GetTotalBytesThrough(Env::IO_LOW))); } - ASSERT_OK(dbfull()->TEST_WaitForCompact()); - ASSERT_EQ(0, NumTableFilesAtLevel(0)); - - ASSERT_EQ(0, options.rate_limiter->GetTotalBytesThrough(Env::IO_HIGH)); - // should be slightly above 512KB due to non-data blocks read. Arbitrarily - // chose 1MB as the upper bound on the total bytes read. - size_t rate_limited_bytes = - options.rate_limiter->GetTotalBytesThrough(Env::IO_LOW); - // Include the explicit prefetch of the footer in direct I/O case. - size_t direct_io_extra = use_direct_io ? 512 * 1024 : 0; - ASSERT_GE( - rate_limited_bytes, - static_cast(kNumKeysPerFile * kBytesPerKey * kNumL0Files)); - ASSERT_LT( - rate_limited_bytes, - static_cast(2 * kNumKeysPerFile * kBytesPerKey * kNumL0Files + - direct_io_extra)); - - Iterator* iter = db_->NewIterator(ReadOptions()); - ASSERT_OK(iter->status()); - for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { - ASSERT_EQ(iter->value().ToString(), DummyString(kBytesPerKey)); - } - delete iter; - // bytes read for user iterator shouldn't count against the rate limit. - ASSERT_EQ(rate_limited_bytes, - static_cast( - options.rate_limiter->GetTotalBytesThrough(Env::IO_LOW))); } } #endif // ROCKSDB_LITE diff --git a/db/external_sst_file_ingestion_job.cc b/db/external_sst_file_ingestion_job.cc index e1e873396..3959974df 100644 --- a/db/external_sst_file_ingestion_job.cc +++ b/db/external_sst_file_ingestion_job.cc @@ -212,6 +212,8 @@ Status ExternalSstFileIngestionJob::Prepare( std::string generated_checksum; std::string generated_checksum_func_name; std::string requested_checksum_func_name; + // TODO: rate limit file reads for checksum calculation during file + // ingestion. IOStatus io_s = GenerateOneFileChecksum( fs_.get(), files_to_ingest_[i].internal_file_path, db_options_.file_checksum_gen_factory.get(), @@ -219,7 +221,8 @@ Status ExternalSstFileIngestionJob::Prepare( &generated_checksum_func_name, ingestion_options_.verify_checksums_readahead_size, db_options_.allow_mmap_reads, io_tracer_, - db_options_.rate_limiter.get()); + db_options_.rate_limiter.get(), + Env::IO_TOTAL /* rate_limiter_priority */); if (!io_s.ok()) { status = io_s; ROCKS_LOG_WARN(db_options_.info_log, @@ -907,12 +910,14 @@ IOStatus ExternalSstFileIngestionJob::GenerateChecksumForIngestedFile( std::string file_checksum; std::string file_checksum_func_name; std::string requested_checksum_func_name; + // TODO: rate limit file reads for checksum calculation during file ingestion. IOStatus io_s = GenerateOneFileChecksum( fs_.get(), file_to_ingest->internal_file_path, db_options_.file_checksum_gen_factory.get(), requested_checksum_func_name, &file_checksum, &file_checksum_func_name, ingestion_options_.verify_checksums_readahead_size, - db_options_.allow_mmap_reads, io_tracer_, db_options_.rate_limiter.get()); + db_options_.allow_mmap_reads, io_tracer_, db_options_.rate_limiter.get(), + Env::IO_TOTAL /* rate_limiter_priority */); if (!io_s.ok()) { return io_s; } diff --git a/db_stress_tool/cf_consistency_stress.cc b/db_stress_tool/cf_consistency_stress.cc index c3a4f69b1..ae1e57314 100644 --- a/db_stress_tool/cf_consistency_stress.cc +++ b/db_stress_tool/cf_consistency_stress.cc @@ -286,6 +286,8 @@ class CfConsistencyStressTest : public StressTest { } void VerifyDb(ThreadState* thread) const override { + // This `ReadOptions` is for validation purposes. Ignore + // `FLAGS_rate_limit_user_ops` to avoid slowing any validation. ReadOptions options(FLAGS_verify_checksum, true); // We must set total_order_seek to true because we are doing a SeekToFirst // on a column family whose memtables may support (by default) prefix-based @@ -472,6 +474,8 @@ class CfConsistencyStressTest : public StressTest { *checksum = ret; return iter->status(); }; + // This `ReadOptions` is for validation purposes. Ignore + // `FLAGS_rate_limit_user_ops` to avoid slowing any validation. ReadOptions ropts; ropts.total_order_seek = true; ropts.snapshot = snapshot_guard.get(); diff --git a/db_stress_tool/db_stress_common.h b/db_stress_tool/db_stress_common.h index 22e3c9e43..5f67b0b8e 100644 --- a/db_stress_tool/db_stress_common.h +++ b/db_stress_tool/db_stress_common.h @@ -175,6 +175,7 @@ DECLARE_double(max_bytes_for_level_multiplier); DECLARE_int32(range_deletion_width); DECLARE_uint64(rate_limiter_bytes_per_sec); DECLARE_bool(rate_limit_bg_reads); +DECLARE_bool(rate_limit_user_ops); DECLARE_uint64(sst_file_manager_bytes_per_sec); DECLARE_uint64(sst_file_manager_bytes_per_truncate); DECLARE_bool(use_txn); diff --git a/db_stress_tool/db_stress_gflags.cc b/db_stress_tool/db_stress_gflags.cc index 8d9a04427..4933b35a4 100644 --- a/db_stress_tool/db_stress_gflags.cc +++ b/db_stress_tool/db_stress_gflags.cc @@ -546,6 +546,10 @@ DEFINE_uint64(rate_limiter_bytes_per_sec, 0, "Set options.rate_limiter value."); DEFINE_bool(rate_limit_bg_reads, false, "Use options.rate_limiter on compaction reads"); +DEFINE_bool(rate_limit_user_ops, false, + "When true use Env::IO_USER priority level to charge internal rate " + "limiter for reads associated with user operations."); + DEFINE_uint64(sst_file_manager_bytes_per_sec, 0, "Set `Options::sst_file_manager` to delete at this rate. By " "default the deletion rate is unbounded."); diff --git a/db_stress_tool/db_stress_test_base.cc b/db_stress_tool/db_stress_test_base.cc index 67b0caeb5..3ad358876 100644 --- a/db_stress_tool/db_stress_test_base.cc +++ b/db_stress_tool/db_stress_test_base.cc @@ -349,6 +349,8 @@ bool StressTest::VerifySecondaries() { fprintf(stderr, "Secondary failed to catch up with primary\n"); return false; } + // This `ReadOptions` is for validation purposes. Ignore + // `FLAGS_rate_limit_user_ops` to avoid slowing any validation. ReadOptions ropts; ropts.total_order_seek = true; // Verify only the default column family since the primary may have @@ -397,6 +399,8 @@ Status StressTest::AssertSame(DB* db, ColumnFamilyHandle* cf, if (cf->GetName() != snap_state.cf_at_name) { return s; } + // This `ReadOptions` is for validation purposes. Ignore + // `FLAGS_rate_limit_user_ops` to avoid slowing any validation. ReadOptions ropt; ropt.snapshot = snap_state.snapshot; Slice ts; @@ -633,6 +637,8 @@ Status StressTest::RollbackTxn(Transaction* txn) { void StressTest::OperateDb(ThreadState* thread) { ReadOptions read_opts(FLAGS_verify_checksum, true); + read_opts.rate_limiter_priority = + FLAGS_rate_limit_user_ops ? Env::IO_USER : Env::IO_TOTAL; WriteOptions write_opts; auto shared = thread->shared; char value[100]; @@ -1133,6 +1139,8 @@ Status StressTest::TestIterate(ThreadState* thread, // to bounds, prefix extractor or reseeking. Sometimes we are comparing // iterators with the same set-up, and it doesn't hurt to check them // to be equal. + // This `ReadOptions` is for validation purposes. Ignore + // `FLAGS_rate_limit_user_ops` to avoid slowing any validation. ReadOptions cmp_ro; cmp_ro.timestamp = readoptionscopy.timestamp; cmp_ro.snapshot = snapshot; @@ -1573,6 +1581,8 @@ Status StressTest::TestBackupRestore( std::string key_str = Key(rand_keys[0]); Slice key = key_str; std::string restored_value; + // This `ReadOptions` is for validation purposes. Ignore + // `FLAGS_rate_limit_user_ops` to avoid slowing any validation. ReadOptions read_opts; std::string ts_str; Slice ts; @@ -1943,6 +1953,8 @@ void StressTest::TestAcquireSnapshot(ThreadState* thread, const std::string& keystr, uint64_t i) { Slice key = keystr; ColumnFamilyHandle* column_family = column_families_[rand_column_family]; + // This `ReadOptions` is for validation purposes. Ignore + // `FLAGS_rate_limit_user_ops` to avoid slowing any validation. ReadOptions ropt; #ifndef ROCKSDB_LITE auto db_impl = static_cast_with_check(db_->GetRootDB()); @@ -2096,6 +2108,8 @@ uint32_t StressTest::GetRangeHash(ThreadState* thread, const Snapshot* snapshot, const Slice& end_key) { const std::string kCrcCalculatorSepearator = ";"; uint32_t crc = 0; + // This `ReadOptions` is for validation purposes. Ignore + // `FLAGS_rate_limit_user_ops` to avoid slowing any validation. ReadOptions ro; ro.snapshot = snapshot; ro.total_order_seek = true; diff --git a/db_stress_tool/multi_ops_txns_stress.cc b/db_stress_tool/multi_ops_txns_stress.cc index 9420969fb..e51f17a15 100644 --- a/db_stress_tool/multi_ops_txns_stress.cc +++ b/db_stress_tool/multi_ops_txns_stress.cc @@ -477,6 +477,8 @@ Status MultiOpsTxnsStressTest::PrimaryKeyUpdateTxn(ThreadState* thread, }); ReadOptions ropts; + ropts.rate_limiter_priority = + FLAGS_rate_limit_user_ops ? Env::IO_USER : Env::IO_TOTAL; std::string value; s = txn->GetForUpdate(ropts, old_pk, &value); if (!s.ok()) { @@ -596,6 +598,8 @@ Status MultiOpsTxnsStressTest::SecondaryKeyUpdateTxn(ThreadState* thread, } ropts.total_order_seek = true; ropts.iterate_upper_bound = &iter_ub; + ropts.rate_limiter_priority = + FLAGS_rate_limit_user_ops ? Env::IO_USER : Env::IO_TOTAL; it = txn->GetIterator(ropts); assert(it); @@ -620,6 +624,8 @@ Status MultiOpsTxnsStressTest::SecondaryKeyUpdateTxn(ThreadState* thread, std::string pk = Record::EncodePrimaryKey(record.a_value()); std::string value; ReadOptions read_opts; + read_opts.rate_limiter_priority = + FLAGS_rate_limit_user_ops ? Env::IO_USER : Env::IO_TOTAL; read_opts.snapshot = txn->GetSnapshot(); s = txn->GetForUpdate(read_opts, pk, &value); if (s.IsBusy() || s.IsTimedOut() || s.IsTryAgain() || @@ -722,6 +728,8 @@ Status MultiOpsTxnsStressTest::UpdatePrimaryIndexValueTxn(ThreadState* thread, RollbackTxn(txn).PermitUncheckedError(); }); ReadOptions ropts; + ropts.rate_limiter_priority = + FLAGS_rate_limit_user_ops ? Env::IO_USER : Env::IO_TOTAL; std::string value; s = txn->GetForUpdate(ropts, pk_str, &value); if (!s.ok()) { @@ -851,6 +859,8 @@ void MultiOpsTxnsStressTest::VerifyDb(ThreadState* thread) const { std::string iter_ub_str(buf, sizeof(buf)); Slice iter_ub = iter_ub_str; + // This `ReadOptions` is for validation purposes. Ignore + // `FLAGS_rate_limit_user_ops` to avoid slowing any validation. ReadOptions ropts; ropts.snapshot = snapshot; ropts.total_order_seek = true; @@ -870,6 +880,8 @@ void MultiOpsTxnsStressTest::VerifyDb(ThreadState* thread) const { std::reverse(buf, buf + sizeof(buf)); const std::string start_key(buf, sizeof(buf)); + // This `ReadOptions` is for validation purposes. Ignore + // `FLAGS_rate_limit_user_ops` to avoid slowing any validation. ReadOptions ropts; ropts.snapshot = snapshot; ropts.total_order_seek = true; diff --git a/db_stress_tool/no_batched_ops_stress.cc b/db_stress_tool/no_batched_ops_stress.cc index fdabef965..02a71e30a 100644 --- a/db_stress_tool/no_batched_ops_stress.cc +++ b/db_stress_tool/no_batched_ops_stress.cc @@ -21,6 +21,8 @@ class NonBatchedOpsStressTest : public StressTest { virtual ~NonBatchedOpsStressTest() {} void VerifyDb(ThreadState* thread) const override { + // This `ReadOptions` is for validation purposes. Ignore + // `FLAGS_rate_limit_user_ops` to avoid slowing any validation. ReadOptions options(FLAGS_verify_checksum, true); std::string ts_str; Slice ts; diff --git a/file/file_prefetch_buffer.cc b/file/file_prefetch_buffer.cc index 719e6e7d2..488844c4f 100644 --- a/file/file_prefetch_buffer.cc +++ b/file/file_prefetch_buffer.cc @@ -24,7 +24,7 @@ namespace ROCKSDB_NAMESPACE { Status FilePrefetchBuffer::Prefetch(const IOOptions& opts, RandomAccessFileReader* reader, uint64_t offset, size_t n, - bool for_compaction) { + Env::IOPriority rate_limiter_priority) { if (!enable_ || reader == nullptr) { return Status::OK(); } @@ -90,7 +90,8 @@ Status FilePrefetchBuffer::Prefetch(const IOOptions& opts, Slice result; size_t read_len = static_cast(roundup_len - chunk_len); s = reader->Read(opts, rounddown_offset + chunk_len, read_len, &result, - buffer_.BufferStart() + chunk_len, nullptr, for_compaction); + buffer_.BufferStart() + chunk_len, nullptr, + rate_limiter_priority); if (!s.ok()) { return s; } @@ -111,7 +112,8 @@ bool FilePrefetchBuffer::TryReadFromCache(const IOOptions& opts, RandomAccessFileReader* reader, uint64_t offset, size_t n, Slice* result, Status* status, - bool for_compaction) { + Env::IOPriority rate_limiter_priority, + bool for_compaction /* = false */) { if (track_min_offset_ && offset < min_offset_read_) { min_offset_read_ = static_cast(offset); } @@ -132,7 +134,7 @@ bool FilePrefetchBuffer::TryReadFromCache(const IOOptions& opts, Status s; if (for_compaction) { s = Prefetch(opts, reader, offset, std::max(n, readahead_size_), - for_compaction); + rate_limiter_priority); } else { if (implicit_auto_readahead_) { // Prefetch only if this read is sequential otherwise reset @@ -152,7 +154,8 @@ bool FilePrefetchBuffer::TryReadFromCache(const IOOptions& opts, return false; } } - s = Prefetch(opts, reader, offset, n + readahead_size_, for_compaction); + s = Prefetch(opts, reader, offset, n + readahead_size_, + rate_limiter_priority); } if (!s.ok()) { if (status) { diff --git a/file/file_prefetch_buffer.h b/file/file_prefetch_buffer.h index 6607a34de..a6e135e4e 100644 --- a/file/file_prefetch_buffer.h +++ b/file/file_prefetch_buffer.h @@ -68,12 +68,14 @@ class FilePrefetchBuffer { num_file_reads_(kMinNumFileReadsToStartAutoReadahead + 1) {} // Load data into the buffer from a file. - // reader : the file reader. - // offset : the file offset to start reading from. - // n : the number of bytes to read. - // for_compaction : if prefetch is done for compaction read. + // reader : the file reader. + // offset : the file offset to start reading from. + // n : the number of bytes to read. + // rate_limiter_priority : rate limiting priority, or `Env::IO_TOTAL` to + // bypass. Status Prefetch(const IOOptions& opts, RandomAccessFileReader* reader, - uint64_t offset, size_t n, bool for_compaction = false); + uint64_t offset, size_t n, + Env::IOPriority rate_limiter_priority); // Tries returning the data for a file read from this buffer if that data is // in the buffer. @@ -81,15 +83,18 @@ class FilePrefetchBuffer { // It also does the exponential readahead when readahead_size is set as part // of the constructor. // - // opts : the IO options to use. - // reader : the file reader. - // offset : the file offset. - // n : the number of bytes. - // result : output buffer to put the data into. - // s : output status. - // for_compaction : true if cache read is done for compaction read. + // opts : the IO options to use. + // reader : the file reader. + // offset : the file offset. + // n : the number of bytes. + // result : output buffer to put the data into. + // s : output status. + // rate_limiter_priority : rate limiting priority, or `Env::IO_TOTAL` to + // bypass. + // for_compaction : true if cache read is done for compaction read. bool TryReadFromCache(const IOOptions& opts, RandomAccessFileReader* reader, uint64_t offset, size_t n, Slice* result, Status* s, + Env::IOPriority rate_limiter_priority, bool for_compaction = false); // The minimum `offset` ever passed to TryReadFromCache(). This will nly be diff --git a/file/file_util.cc b/file/file_util.cc index 87e343a53..332f2263d 100644 --- a/file/file_util.cc +++ b/file/file_util.cc @@ -123,7 +123,8 @@ IOStatus GenerateOneFileChecksum( const std::string& requested_checksum_func_name, std::string* file_checksum, std::string* file_checksum_func_name, size_t verify_checksums_readahead_size, bool allow_mmap_reads, - std::shared_ptr& io_tracer, RateLimiter* rate_limiter) { + std::shared_ptr& io_tracer, RateLimiter* rate_limiter, + Env::IOPriority rate_limiter_priority) { if (checksum_factory == nullptr) { return IOStatus::InvalidArgument("Checksum factory is invalid"); } @@ -195,7 +196,8 @@ IOStatus GenerateOneFileChecksum( static_cast(std::min(uint64_t{readahead_size}, size)); if (!prefetch_buffer.TryReadFromCache( opts, reader.get(), offset, bytes_to_read, &slice, - nullptr /* status */, false /* for_compaction */)) { + nullptr /* status */, rate_limiter_priority, + false /* for_compaction */)) { return IOStatus::Corruption("file read failed"); } if (slice.size() == 0) { diff --git a/file/file_util.h b/file/file_util.h index 4728e511d..41ffc98a2 100644 --- a/file/file_util.h +++ b/file/file_util.h @@ -51,20 +51,8 @@ extern IOStatus GenerateOneFileChecksum( const std::string& requested_checksum_func_name, std::string* file_checksum, std::string* file_checksum_func_name, size_t verify_checksums_readahead_size, bool allow_mmap_reads, - std::shared_ptr& io_tracer, RateLimiter* rate_limiter = nullptr); - -inline IOStatus GenerateOneFileChecksum( - const std::shared_ptr& fs, const std::string& file_path, - FileChecksumGenFactory* checksum_factory, - const std::string& requested_checksum_func_name, std::string* file_checksum, - std::string* file_checksum_func_name, - size_t verify_checksums_readahead_size, bool allow_mmap_reads, - std::shared_ptr& io_tracer) { - return GenerateOneFileChecksum( - fs.get(), file_path, checksum_factory, requested_checksum_func_name, - file_checksum, file_checksum_func_name, verify_checksums_readahead_size, - allow_mmap_reads, io_tracer); -} + std::shared_ptr& io_tracer, RateLimiter* rate_limiter, + Env::IOPriority rate_limiter_priority); inline IOStatus PrepareIOFromReadOptions(const ReadOptions& ro, SystemClock* clock, IOOptions& opts) { diff --git a/file/random_access_file_reader.cc b/file/random_access_file_reader.cc index 6857f253a..dc74eb0fb 100644 --- a/file/random_access_file_reader.cc +++ b/file/random_access_file_reader.cc @@ -116,10 +116,10 @@ IOStatus RandomAccessFileReader::Create( return io_s; } -IOStatus RandomAccessFileReader::Read(const IOOptions& opts, uint64_t offset, - size_t n, Slice* result, char* scratch, - AlignedBuf* aligned_buf, - bool for_compaction) const { +IOStatus RandomAccessFileReader::Read( + const IOOptions& opts, uint64_t offset, size_t n, Slice* result, + char* scratch, AlignedBuf* aligned_buf, + Env::IOPriority rate_limiter_priority) const { (void)aligned_buf; TEST_SYNC_POINT_CALLBACK("RandomAccessFileReader::Read", nullptr); @@ -153,10 +153,11 @@ IOStatus RandomAccessFileReader::Read(const IOOptions& opts, uint64_t offset, buf.AllocateNewBuffer(read_size); while (buf.CurrentSize() < read_size) { size_t allowed; - if (for_compaction && rate_limiter_ != nullptr) { + if (rate_limiter_priority != Env::IO_TOTAL && + rate_limiter_ != nullptr) { allowed = rate_limiter_->RequestToken( buf.Capacity() - buf.CurrentSize(), buf.Alignment(), - Env::IOPriority::IO_LOW, stats_, RateLimiter::OpType::kRead); + rate_limiter_priority, stats_, RateLimiter::OpType::kRead); } else { assert(buf.CurrentSize() == 0); allowed = read_size; @@ -212,12 +213,13 @@ IOStatus RandomAccessFileReader::Read(const IOOptions& opts, uint64_t offset, const char* res_scratch = nullptr; while (pos < n) { size_t allowed; - if (for_compaction && rate_limiter_ != nullptr) { + if (rate_limiter_priority != Env::IO_TOTAL && + rate_limiter_ != nullptr) { if (rate_limiter_->IsRateLimited(RateLimiter::OpType::kRead)) { sw.DelayStart(); } allowed = rate_limiter_->RequestToken(n - pos, 0 /* alignment */, - Env::IOPriority::IO_LOW, stats_, + rate_limiter_priority, stats_, RateLimiter::OpType::kRead); if (rate_limiter_->IsRateLimited(RateLimiter::OpType::kRead)) { sw.DelayStop(); @@ -311,10 +313,12 @@ bool TryMerge(FSReadRequest* dest, const FSReadRequest& src) { return true; } -IOStatus RandomAccessFileReader::MultiRead(const IOOptions& opts, - FSReadRequest* read_reqs, - size_t num_reqs, - AlignedBuf* aligned_buf) const { +IOStatus RandomAccessFileReader::MultiRead( + const IOOptions& opts, FSReadRequest* read_reqs, size_t num_reqs, + AlignedBuf* aligned_buf, Env::IOPriority rate_limiter_priority) const { + if (rate_limiter_priority != Env::IO_TOTAL) { + return IOStatus::NotSupported("Unable to rate limit MultiRead()"); + } (void)aligned_buf; // suppress warning of unused variable in LITE mode assert(num_reqs > 0); diff --git a/file/random_access_file_reader.h b/file/random_access_file_reader.h index 8b4ef4b94..a8d4d15d8 100644 --- a/file/random_access_file_reader.h +++ b/file/random_access_file_reader.h @@ -139,17 +139,26 @@ class RandomAccessFileReader { // 2. Otherwise, scratch is not used and can be null, the aligned_buf owns // the internally allocated buffer on return, and the result refers to a // region in aligned_buf. + // + // `rate_limiter_priority` is used to charge the internal rate limiter when + // enabled. The special value `Env::IO_TOTAL` makes this operation bypass the + // rate limiter. IOStatus Read(const IOOptions& opts, uint64_t offset, size_t n, Slice* result, char* scratch, AlignedBuf* aligned_buf, - bool for_compaction = false) const; + Env::IOPriority rate_limiter_priority) const; // REQUIRES: // num_reqs > 0, reqs do not overlap, and offsets in reqs are increasing. // In non-direct IO mode, aligned_buf should be null; // In direct IO mode, aligned_buf stores the aligned buffer allocated inside // MultiRead, the result Slices in reqs refer to aligned_buf. + // + // `rate_limiter_priority` will be used to charge the internal rate limiter. + // It is not yet supported so the client must provide the special value + // `Env::IO_TOTAL` to bypass the rate limiter. IOStatus MultiRead(const IOOptions& opts, FSReadRequest* reqs, - size_t num_reqs, AlignedBuf* aligned_buf) const; + size_t num_reqs, AlignedBuf* aligned_buf, + Env::IOPriority rate_limiter_priority) const; IOStatus Prefetch(uint64_t offset, size_t n) const { return file_->Prefetch(offset, n, IOOptions(), nullptr); diff --git a/file/random_access_file_reader_test.cc b/file/random_access_file_reader_test.cc index 77a0e84a0..0f5402686 100644 --- a/file/random_access_file_reader_test.cc +++ b/file/random_access_file_reader_test.cc @@ -85,9 +85,9 @@ TEST_F(RandomAccessFileReaderTest, ReadDirectIO) { size_t len = page_size / 3; Slice result; AlignedBuf buf; - for (bool for_compaction : {true, false}) { + for (Env::IOPriority rate_limiter_priority : {Env::IO_LOW, Env::IO_TOTAL}) { ASSERT_OK(r->Read(IOOptions(), offset, len, &result, nullptr, &buf, - for_compaction)); + rate_limiter_priority)); ASSERT_EQ(result.ToString(), content.substr(offset, len)); } } @@ -138,8 +138,8 @@ TEST_F(RandomAccessFileReaderTest, MultiReadDirectIO) { reqs.push_back(std::move(r0)); reqs.push_back(std::move(r1)); AlignedBuf aligned_buf; - ASSERT_OK( - r->MultiRead(IOOptions(), reqs.data(), reqs.size(), &aligned_buf)); + ASSERT_OK(r->MultiRead(IOOptions(), reqs.data(), reqs.size(), &aligned_buf, + Env::IO_TOTAL /* rate_limiter_priority */)); AssertResult(content, reqs); @@ -183,8 +183,8 @@ TEST_F(RandomAccessFileReaderTest, MultiReadDirectIO) { reqs.push_back(std::move(r1)); reqs.push_back(std::move(r2)); AlignedBuf aligned_buf; - ASSERT_OK( - r->MultiRead(IOOptions(), reqs.data(), reqs.size(), &aligned_buf)); + ASSERT_OK(r->MultiRead(IOOptions(), reqs.data(), reqs.size(), &aligned_buf, + Env::IO_TOTAL /* rate_limiter_priority */)); AssertResult(content, reqs); @@ -228,8 +228,8 @@ TEST_F(RandomAccessFileReaderTest, MultiReadDirectIO) { reqs.push_back(std::move(r1)); reqs.push_back(std::move(r2)); AlignedBuf aligned_buf; - ASSERT_OK( - r->MultiRead(IOOptions(), reqs.data(), reqs.size(), &aligned_buf)); + ASSERT_OK(r->MultiRead(IOOptions(), reqs.data(), reqs.size(), &aligned_buf, + Env::IO_TOTAL /* rate_limiter_priority */)); AssertResult(content, reqs); @@ -265,8 +265,8 @@ TEST_F(RandomAccessFileReaderTest, MultiReadDirectIO) { reqs.push_back(std::move(r0)); reqs.push_back(std::move(r1)); AlignedBuf aligned_buf; - ASSERT_OK( - r->MultiRead(IOOptions(), reqs.data(), reqs.size(), &aligned_buf)); + ASSERT_OK(r->MultiRead(IOOptions(), reqs.data(), reqs.size(), &aligned_buf, + Env::IO_TOTAL /* rate_limiter_priority */)); AssertResult(content, reqs); diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 2bc80f007..cbbf4cfd0 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -491,9 +491,18 @@ struct DBOptions { // Default: Env::Default() Env* env = Env::Default(); - // Use to control write/read rate of flush and compaction. Flush has higher - // priority than compaction. Rate limiting is disabled if nullptr. - // If rate limiter is enabled, bytes_per_sync is set to 1MB by default. + // Limits internal file read/write bandwidth: + // + // - Flush requests write bandwidth at `Env::IOPriority::IO_HIGH` + // - Compaction requests read and write bandwidth at + // `Env::IOPriority::IO_LOW` + // - Reads associated with a `ReadOptions` can be charged at + // `ReadOptions::rate_limiter_priority` (see that option's API doc for usage + // and limitations). + // + // Rate limiting is disabled if nullptr. If rate limiter is enabled, + // bytes_per_sync is set to 1MB by default. + // // Default: nullptr std::shared_ptr rate_limiter = nullptr; @@ -1560,6 +1569,26 @@ struct ReadOptions { // Default: false bool adaptive_readahead; + // For file reads associated with this option, charge the internal rate + // limiter (see `DBOptions::rate_limiter`) at the specified priority. The + // special value `Env::IO_TOTAL` disables charging the rate limiter. + // + // The rate limiting is bypassed no matter this option's value for file reads + // on plain tables (these can exist when `ColumnFamilyOptions::table_factory` + // is a `PlainTableFactory`) and cuckoo tables (these can exist when + // `ColumnFamilyOptions::table_factory` is a `CuckooTableFactory`). + // + // The new `DB::MultiGet()` APIs (i.e., the ones returning `void`) will return + // `Status::NotSupported` when that operation requires file read(s) and + // `rate_limiter_priority != Env::IO_TOTAL`. + // + // The bytes charged to rate limiter may not exactly match the file read bytes + // since there are some seemingly insignificant reads, like for file + // headers/footers, that we currently do not charge to rate limiter. + // + // Default: `Env::IO_TOTAL`. + Env::IOPriority rate_limiter_priority = Env::IO_TOTAL; + ReadOptions(); ReadOptions(bool cksum, bool cache); }; diff --git a/src.mk b/src.mk index 6c6dee439..d0bd9fd8a 100644 --- a/src.mk +++ b/src.mk @@ -445,6 +445,7 @@ TEST_MAIN_SOURCES = \ db/db_options_test.cc \ db/db_properties_test.cc \ db/db_range_del_test.cc \ + db/db_rate_limiter_test.cc \ db/db_secondary_test.cc \ db/db_sst_test.cc \ db/db_statistics_test.cc \ diff --git a/table/block_based/block_based_table_reader.cc b/table/block_based/block_based_table_reader.cc index 0090b4fea..4908a03af 100644 --- a/table/block_based/block_based_table_reader.cc +++ b/table/block_based/block_based_table_reader.cc @@ -771,7 +771,9 @@ Status BlockBasedTable::PrefetchTail( IOOptions opts; Status s = file->PrepareIOOptions(ro, opts); if (s.ok()) { - s = (*prefetch_buffer)->Prefetch(opts, file, prefetch_off, prefetch_len); + s = (*prefetch_buffer) + ->Prefetch(opts, file, prefetch_off, prefetch_len, + ro.rate_limiter_priority); } return s; } @@ -1730,8 +1732,8 @@ void BlockBasedTable::RetrieveMultipleBlocks( IOOptions opts; IOStatus s = file->PrepareIOOptions(options, opts); if (s.ok()) { - s = file->MultiRead(opts, &read_reqs[0], read_reqs.size(), - &direct_io_buf); + s = file->MultiRead(opts, &read_reqs[0], read_reqs.size(), &direct_io_buf, + options.rate_limiter_priority); } if (!s.ok()) { // Discard all the results in this batch if there is any time out @@ -2981,7 +2983,7 @@ Status BlockBasedTable::VerifyChecksumInBlocks( BlockHandle handle = index_iter->value().handle; BlockContents contents; BlockFetcher block_fetcher( - rep_->file.get(), &prefetch_buffer, rep_->footer, ReadOptions(), handle, + rep_->file.get(), &prefetch_buffer, rep_->footer, read_options, handle, &contents, rep_->ioptions, false /* decompress */, false /*maybe_compressed*/, BlockType::kData, UncompressionDict::GetEmptyDict(), rep_->persistent_cache_options); diff --git a/table/block_based/partitioned_filter_block.cc b/table/block_based/partitioned_filter_block.cc index d3cbe84fb..f9d53aba7 100644 --- a/table/block_based/partitioned_filter_block.cc +++ b/table/block_based/partitioned_filter_block.cc @@ -503,7 +503,8 @@ Status PartitionedFilterBlockReader::CacheDependencies(const ReadOptions& ro, s = rep->file->PrepareIOOptions(ro, opts); if (s.ok()) { s = prefetch_buffer->Prefetch(opts, rep->file.get(), prefetch_off, - static_cast(prefetch_len)); + static_cast(prefetch_len), + ro.rate_limiter_priority); } if (!s.ok()) { return s; diff --git a/table/block_based/partitioned_index_reader.cc b/table/block_based/partitioned_index_reader.cc index 71af6af9a..e295d41a4 100644 --- a/table/block_based/partitioned_index_reader.cc +++ b/table/block_based/partitioned_index_reader.cc @@ -158,7 +158,8 @@ Status PartitionIndexReader::CacheDependencies(const ReadOptions& ro, Status s = rep->file->PrepareIOOptions(ro, opts); if (s.ok()) { s = prefetch_buffer->Prefetch(opts, rep->file.get(), prefetch_off, - static_cast(prefetch_len)); + static_cast(prefetch_len), + ro.rate_limiter_priority); } if (!s.ok()) { return s; diff --git a/table/block_fetcher.cc b/table/block_fetcher.cc index 54604238c..cca6d4911 100644 --- a/table/block_fetcher.cc +++ b/table/block_fetcher.cc @@ -70,9 +70,9 @@ inline bool BlockFetcher::TryGetFromPrefetchBuffer() { IOOptions opts; IOStatus io_s = file_->PrepareIOOptions(read_options_, opts); if (io_s.ok() && - prefetch_buffer_->TryReadFromCache(opts, file_, handle_.offset(), - block_size_with_trailer_, &slice_, - &io_s, for_compaction_)) { + prefetch_buffer_->TryReadFromCache( + opts, file_, handle_.offset(), block_size_with_trailer_, &slice_, + &io_s, read_options_.rate_limiter_priority, for_compaction_)) { ProcessTrailerIfPresent(); if (!io_status_.ok()) { return true; @@ -245,17 +245,17 @@ IOStatus BlockFetcher::ReadBlockContents() { if (io_status_.ok()) { if (file_->use_direct_io()) { PERF_TIMER_GUARD(block_read_time); - io_status_ = - file_->Read(opts, handle_.offset(), block_size_with_trailer_, - &slice_, nullptr, &direct_io_buf_, for_compaction_); + io_status_ = file_->Read( + opts, handle_.offset(), block_size_with_trailer_, &slice_, nullptr, + &direct_io_buf_, read_options_.rate_limiter_priority); PERF_COUNTER_ADD(block_read_count, 1); used_buf_ = const_cast(slice_.data()); } else { PrepareBufferForBlockFromFile(); PERF_TIMER_GUARD(block_read_time); - io_status_ = - file_->Read(opts, handle_.offset(), block_size_with_trailer_, - &slice_, used_buf_, nullptr, for_compaction_); + io_status_ = file_->Read(opts, handle_.offset(), + block_size_with_trailer_, &slice_, used_buf_, + nullptr, read_options_.rate_limiter_priority); PERF_COUNTER_ADD(block_read_count, 1); #ifndef NDEBUG if (slice_.data() == &stack_buf_[0]) { diff --git a/table/cuckoo/cuckoo_table_builder_test.cc b/table/cuckoo/cuckoo_table_builder_test.cc index 5d6ebfbdd..c3f2b5379 100644 --- a/table/cuckoo/cuckoo_table_builder_test.cc +++ b/table/cuckoo/cuckoo_table_builder_test.cc @@ -114,7 +114,8 @@ class CuckooBuilderTest : public testing::Test { for (uint32_t i = 0; i + 1 < table_size + cuckoo_block_size; ++i) { Slice read_slice; ASSERT_OK(file_reader->Read(IOOptions(), i * bucket_size, bucket_size, - &read_slice, nullptr, nullptr)); + &read_slice, nullptr, nullptr, + Env::IO_TOTAL /* rate_limiter_priority */)); size_t key_idx = std::find(expected_locations.begin(), expected_locations.end(), i) - expected_locations.begin(); diff --git a/table/cuckoo/cuckoo_table_reader.cc b/table/cuckoo/cuckoo_table_reader.cc index 44725767b..69c93d76e 100644 --- a/table/cuckoo/cuckoo_table_reader.cc +++ b/table/cuckoo/cuckoo_table_reader.cc @@ -141,8 +141,10 @@ CuckooTableReader::CuckooTableReader( cuckoo_block_size_ = *reinterpret_cast( cuckoo_block_size->second.data()); cuckoo_block_bytes_minus_one_ = cuckoo_block_size_ * bucket_length_ - 1; - status_ = file_->Read(IOOptions(), 0, static_cast(file_size), - &file_data_, nullptr, nullptr); + // TODO: rate limit reads of whole cuckoo tables. + status_ = + file_->Read(IOOptions(), 0, static_cast(file_size), &file_data_, + nullptr, nullptr, Env::IO_TOTAL /* rate_limiter_priority */); } Status CuckooTableReader::Get(const ReadOptions& /*readOptions*/, diff --git a/table/format.cc b/table/format.cc index 9f79db46c..4db3367ea 100644 --- a/table/format.cc +++ b/table/format.cc @@ -369,17 +369,20 @@ Status ReadFooterFromFile(const IOOptions& opts, RandomAccessFileReader* file, // the required data is not in the prefetch buffer. Once deadline is enabled // for iterator, TryReadFromCache might do a readahead. Revisit to see if we // need to pass a timeout at that point + // TODO: rate limit footer reads. if (prefetch_buffer == nullptr || - !prefetch_buffer->TryReadFromCache(IOOptions(), file, read_offset, - Footer::kMaxEncodedLength, - &footer_input, nullptr)) { + !prefetch_buffer->TryReadFromCache( + IOOptions(), file, read_offset, Footer::kMaxEncodedLength, + &footer_input, nullptr, Env::IO_TOTAL /* rate_limiter_priority */)) { if (file->use_direct_io()) { s = file->Read(opts, read_offset, Footer::kMaxEncodedLength, - &footer_input, nullptr, &internal_buf); + &footer_input, nullptr, &internal_buf, + Env::IO_TOTAL /* rate_limiter_priority */); } else { footer_buf.reserve(Footer::kMaxEncodedLength); s = file->Read(opts, read_offset, Footer::kMaxEncodedLength, - &footer_input, &footer_buf[0], nullptr); + &footer_input, &footer_buf[0], nullptr, + Env::IO_TOTAL /* rate_limiter_priority */); } if (!s.ok()) return s; } diff --git a/table/mock_table.cc b/table/mock_table.cc index c40b6a270..03c594bc7 100644 --- a/table/mock_table.cc +++ b/table/mock_table.cc @@ -291,7 +291,8 @@ Status MockTableFactory::GetIDFromFile(RandomAccessFileReader* file, uint32_t* id) const { char buf[4]; Slice result; - Status s = file->Read(IOOptions(), 0, 4, &result, buf, nullptr); + Status s = file->Read(IOOptions(), 0, 4, &result, buf, nullptr, + Env::IO_TOTAL /* rate_limiter_priority */); assert(result.size() == 4); *id = DecodeFixed32(buf); return s; diff --git a/table/plain/plain_table_key_coding.cc b/table/plain/plain_table_key_coding.cc index e3a76f89e..93b198fc5 100644 --- a/table/plain/plain_table_key_coding.cc +++ b/table/plain/plain_table_key_coding.cc @@ -212,9 +212,11 @@ bool PlainTableFileReader::ReadNonMmap(uint32_t file_offset, uint32_t len, new_buffer->buf_len = 0; } Slice read_result; + // TODO: rate limit plain table reads. Status s = file_info_->file->Read(IOOptions(), file_offset, size_to_read, - &read_result, new_buffer->buf.get(), nullptr); + &read_result, new_buffer->buf.get(), nullptr, + Env::IO_TOTAL /* rate_limiter_priority */); if (!s.ok()) { status_ = s; return false; diff --git a/table/plain/plain_table_reader.cc b/table/plain/plain_table_reader.cc index 2d6902a2a..67b16e075 100644 --- a/table/plain/plain_table_reader.cc +++ b/table/plain/plain_table_reader.cc @@ -288,9 +288,9 @@ void PlainTableReader::FillBloom(const std::vector& prefix_hashes) { Status PlainTableReader::MmapDataIfNeeded() { if (file_info_.is_mmap_mode) { // Get mmapped memory. - return file_info_.file->Read(IOOptions(), 0, - static_cast(file_size_), - &file_info_.file_data, nullptr, nullptr); + return file_info_.file->Read( + IOOptions(), 0, static_cast(file_size_), &file_info_.file_data, + nullptr, nullptr, Env::IO_TOTAL /* rate_limiter_priority */); } return Status::OK(); } diff --git a/table/sst_file_dumper.cc b/table/sst_file_dumper.cc index a887c8568..1aad1d5ae 100644 --- a/table/sst_file_dumper.cc +++ b/table/sst_file_dumper.cc @@ -107,7 +107,8 @@ Status SstFileDumper::GetTableReader(const std::string& file_path) { uint64_t prefetch_off = file_size - prefetch_size; IOOptions opts; s = prefetch_buffer.Prefetch(opts, file_.get(), prefetch_off, - static_cast(prefetch_size)); + static_cast(prefetch_size), + Env::IO_TOTAL /* rate_limiter_priority */); s = ReadFooterFromFile(opts, file_.get(), &prefetch_buffer, file_size, &footer); diff --git a/table/table_test.cc b/table/table_test.cc index 3b8d59500..78c126272 100644 --- a/table/table_test.cc +++ b/table/table_test.cc @@ -1319,7 +1319,7 @@ class FileChecksumTestHelper { uint64_t offset = 0; Status s; s = file_reader_->Read(IOOptions(), offset, 2048, &result, scratch.get(), - nullptr, false); + nullptr, Env::IO_TOTAL /* rate_limiter_priority */); if (!s.ok()) { return s; } @@ -1327,7 +1327,8 @@ class FileChecksumTestHelper { file_checksum_generator->Update(scratch.get(), result.size()); offset += static_cast(result.size()); s = file_reader_->Read(IOOptions(), offset, 2048, &result, scratch.get(), - nullptr, false); + nullptr, + Env::IO_TOTAL /* rate_limiter_priority */); if (!s.ok()) { return s; } @@ -5001,13 +5002,16 @@ TEST_F(BBTTailPrefetchTest, FilePrefetchBufferMinOffset) { IOOptions opts; buffer.TryReadFromCache(opts, nullptr /* reader */, 500 /* offset */, 10 /* n */, nullptr /* result */, - nullptr /* status */); + nullptr /* status */, + Env::IO_TOTAL /* rate_limiter_priority */); buffer.TryReadFromCache(opts, nullptr /* reader */, 480 /* offset */, 10 /* n */, nullptr /* result */, - nullptr /* status */); + nullptr /* status */, + Env::IO_TOTAL /* rate_limiter_priority */); buffer.TryReadFromCache(opts, nullptr /* reader */, 490 /* offset */, 10 /* n */, nullptr /* result */, - nullptr /* status */); + nullptr /* status */, + Env::IO_TOTAL /* rate_limiter_priority */); ASSERT_EQ(480, buffer.min_offset_read()); } diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc index c35940398..b4df6a432 100644 --- a/tools/db_bench_tool.cc +++ b/tools/db_bench_tool.cc @@ -77,6 +77,7 @@ #include "util/cast_util.h" #include "util/compression.h" #include "util/crc32c.h" +#include "util/file_checksum_helper.h" #include "util/gflags_compat.h" #include "util/mutexlock.h" #include "util/random.h" @@ -1077,6 +1078,14 @@ DEFINE_bool(adaptive_readahead, false, "carry forward internal auto readahead size from one file to next " "file at each level during iteration"); +DEFINE_bool(rate_limit_user_ops, false, + "When true use Env::IO_USER priority level to charge internal rate " + "limiter for reads associated with user operations."); + +DEFINE_bool(file_checksum, false, + "When true use FileChecksumGenCrc32cFactory for " + "file_checksum_gen_factory."); + static enum ROCKSDB_NAMESPACE::CompressionType StringToCompressionType( const char* ctype) { assert(ctype); @@ -3047,6 +3056,8 @@ class Benchmark { read_options_ = ReadOptions(FLAGS_verify_checksum, true); read_options_.total_order_seek = FLAGS_total_order_seek; read_options_.prefix_same_as_start = FLAGS_prefix_same_as_start; + read_options_.rate_limiter_priority = + FLAGS_rate_limit_user_ops ? Env::IO_USER : Env::IO_TOTAL; read_options_.tailing = FLAGS_use_tailing_iterator; read_options_.readahead_size = FLAGS_readahead_size; read_options_.adaptive_readahead = FLAGS_adaptive_readahead; @@ -3313,6 +3324,12 @@ class Benchmark { #endif // ROCKSDB_LITE } else if (name == "getmergeoperands") { method = &Benchmark::GetMergeOperands; +#ifndef ROCKSDB_LITE + } else if (name == "verifychecksum") { + method = &Benchmark::VerifyChecksum; + } else if (name == "verifyfilechecksums") { + method = &Benchmark::VerifyFileChecksums; +#endif // ROCKSDB_LITE } else if (!name.empty()) { // No error message for empty name fprintf(stderr, "unknown benchmark '%s'\n", name.c_str()); ErrorExit(); @@ -4267,6 +4284,11 @@ class Benchmark { options.listeners.emplace_back(listener_); + if (FLAGS_file_checksum) { + options.file_checksum_gen_factory.reset( + new FileChecksumGenCrc32cFactory()); + } + if (FLAGS_num_multi_db <= 1) { OpenDb(options, FLAGS_db, &db_); } else { @@ -7207,6 +7229,35 @@ class Benchmark { } #ifndef ROCKSDB_LITE + void VerifyChecksum(ThreadState* thread) { + DB* db = SelectDB(thread); + ReadOptions ro; + ro.adaptive_readahead = FLAGS_adaptive_readahead; + ro.rate_limiter_priority = + FLAGS_rate_limit_user_ops ? Env::IO_USER : Env::IO_TOTAL; + ro.readahead_size = FLAGS_readahead_size; + Status s = db->VerifyChecksum(ro); + if (!s.ok()) { + fprintf(stderr, "VerifyChecksum() failed: %s\n", s.ToString().c_str()); + exit(1); + } + } + + void VerifyFileChecksums(ThreadState* thread) { + DB* db = SelectDB(thread); + ReadOptions ro; + ro.adaptive_readahead = FLAGS_adaptive_readahead; + ro.rate_limiter_priority = + FLAGS_rate_limit_user_ops ? Env::IO_USER : Env::IO_TOTAL; + ro.readahead_size = FLAGS_readahead_size; + Status s = db->VerifyFileChecksums(ro); + if (!s.ok()) { + fprintf(stderr, "VerifyFileChecksums() failed: %s\n", + s.ToString().c_str()); + exit(1); + } + } + // This benchmark stress tests Transactions. For a given --duration (or // total number of --writes, a Transaction will perform a read-modify-write // to increment the value of a key in each of N(--transaction-sets) sets of diff --git a/utilities/blob_db/blob_db_impl.cc b/utilities/blob_db/blob_db_impl.cc index 55c655152..6834d0b80 100644 --- a/utilities/blob_db/blob_db_impl.cc +++ b/utilities/blob_db/blob_db_impl.cc @@ -1541,15 +1541,16 @@ Status BlobDBImpl::GetRawBlobFromFile(const Slice& key, uint64_t file_number, { StopWatch read_sw(clock_, statistics_, BLOB_DB_BLOB_FILE_READ_MICROS); + // TODO: rate limit old blob DB file reads. if (reader->use_direct_io()) { s = reader->Read(IOOptions(), record_offset, static_cast(record_size), &blob_record, nullptr, - &aligned_buf); + &aligned_buf, Env::IO_TOTAL /* rate_limiter_priority */); } else { buf.reserve(static_cast(record_size)); s = reader->Read(IOOptions(), record_offset, static_cast(record_size), &blob_record, &buf[0], - nullptr); + nullptr, Env::IO_TOTAL /* rate_limiter_priority */); } RecordTick(statistics_, BLOB_DB_BLOB_FILE_BYTES_READ, blob_record.size()); } diff --git a/utilities/blob_db/blob_dump_tool.cc b/utilities/blob_db/blob_dump_tool.cc index d6b8930ff..3ea627553 100644 --- a/utilities/blob_db/blob_dump_tool.cc +++ b/utilities/blob_db/blob_dump_tool.cc @@ -103,8 +103,8 @@ Status BlobDumpTool::Read(uint64_t offset, size_t size, Slice* result) { } buffer_.reset(new char[buffer_size_]); } - Status s = - reader_->Read(IOOptions(), offset, size, result, buffer_.get(), nullptr); + Status s = reader_->Read(IOOptions(), offset, size, result, buffer_.get(), + nullptr, Env::IO_TOTAL /* rate_limiter_priority */); if (!s.ok()) { return s; } diff --git a/utilities/blob_db/blob_file.cc b/utilities/blob_db/blob_file.cc index f12c3b9b3..d092d45f8 100644 --- a/utilities/blob_db/blob_file.cc +++ b/utilities/blob_db/blob_file.cc @@ -112,13 +112,16 @@ Status BlobFile::ReadFooter(BlobLogFooter* bf) { std::string buf; AlignedBuf aligned_buf; Status s; + // TODO: rate limit reading footers from blob files. if (ra_file_reader_->use_direct_io()) { s = ra_file_reader_->Read(IOOptions(), footer_offset, BlobLogFooter::kSize, - &result, nullptr, &aligned_buf); + &result, nullptr, &aligned_buf, + Env::IO_TOTAL /* rate_limiter_priority */); } else { buf.reserve(BlobLogFooter::kSize + 10); s = ra_file_reader_->Read(IOOptions(), footer_offset, BlobLogFooter::kSize, - &result, &buf[0], nullptr); + &result, &buf[0], nullptr, + Env::IO_TOTAL /* rate_limiter_priority */); } if (!s.ok()) return s; if (result.size() != BlobLogFooter::kSize) { @@ -235,13 +238,16 @@ Status BlobFile::ReadMetadata(const std::shared_ptr& fs, std::string header_buf; AlignedBuf aligned_buf; Slice header_slice; + // TODO: rate limit reading headers from blob files. if (file_reader->use_direct_io()) { s = file_reader->Read(IOOptions(), 0, BlobLogHeader::kSize, &header_slice, - nullptr, &aligned_buf); + nullptr, &aligned_buf, + Env::IO_TOTAL /* rate_limiter_priority */); } else { header_buf.reserve(BlobLogHeader::kSize); s = file_reader->Read(IOOptions(), 0, BlobLogHeader::kSize, &header_slice, - &header_buf[0], nullptr); + &header_buf[0], nullptr, + Env::IO_TOTAL /* rate_limiter_priority */); } if (!s.ok()) { ROCKS_LOG_ERROR(info_log_, @@ -275,15 +281,17 @@ Status BlobFile::ReadMetadata(const std::shared_ptr& fs, } std::string footer_buf; Slice footer_slice; + // TODO: rate limit reading footers from blob files. if (file_reader->use_direct_io()) { s = file_reader->Read(IOOptions(), file_size - BlobLogFooter::kSize, BlobLogFooter::kSize, &footer_slice, nullptr, - &aligned_buf); + &aligned_buf, + Env::IO_TOTAL /* rate_limiter_priority */); } else { footer_buf.reserve(BlobLogFooter::kSize); s = file_reader->Read(IOOptions(), file_size - BlobLogFooter::kSize, BlobLogFooter::kSize, &footer_slice, &footer_buf[0], - nullptr); + nullptr, Env::IO_TOTAL /* rate_limiter_priority */); } if (!s.ok()) { ROCKS_LOG_ERROR(info_log_, diff --git a/utilities/cache_dump_load_impl.h b/utilities/cache_dump_load_impl.h index 28dba671d..0e9ea5286 100644 --- a/utilities/cache_dump_load_impl.h +++ b/utilities/cache_dump_load_impl.h @@ -258,7 +258,8 @@ class FromFileCacheDumpReader : public CacheDumpReader { while (to_read > 0) { io_s = file_reader_->Read(IOOptions(), offset_, to_read, &result_, - buffer_, nullptr); + buffer_, nullptr, + Env::IO_TOTAL /* rate_limiter_priority */); if (!io_s.ok()) { return io_s; } diff --git a/utilities/persistent_cache/block_cache_tier_file.cc b/utilities/persistent_cache/block_cache_tier_file.cc index 149275fb2..dbddc8232 100644 --- a/utilities/persistent_cache/block_cache_tier_file.cc +++ b/utilities/persistent_cache/block_cache_tier_file.cc @@ -238,7 +238,7 @@ bool RandomAccessCacheFile::Read(const LBA& lba, Slice* key, Slice* val, Slice result; Status s = freader_->Read(IOOptions(), lba.off_, lba.size_, &result, scratch, - nullptr); + nullptr, Env::IO_TOTAL /* rate_limiter_priority */); if (!s.ok()) { Error(log_, "Error reading from file %s. %s", Path().c_str(), s.ToString().c_str()); diff --git a/utilities/trace/file_trace_reader_writer.cc b/utilities/trace/file_trace_reader_writer.cc index f2ca74144..5886d3539 100644 --- a/utilities/trace/file_trace_reader_writer.cc +++ b/utilities/trace/file_trace_reader_writer.cc @@ -42,7 +42,8 @@ Status FileTraceReader::Reset() { Status FileTraceReader::Read(std::string* data) { assert(file_reader_ != nullptr); Status s = file_reader_->Read(IOOptions(), offset_, kTraceMetadataSize, - &result_, buffer_, nullptr); + &result_, buffer_, nullptr, + Env::IO_TOTAL /* rate_limiter_priority */); if (!s.ok()) { return s; } @@ -67,7 +68,7 @@ Status FileTraceReader::Read(std::string* data) { bytes_to_read > kBufferSize ? kBufferSize : bytes_to_read; while (to_read > 0) { s = file_reader_->Read(IOOptions(), offset_, to_read, &result_, buffer_, - nullptr); + nullptr, Env::IO_TOTAL /* rate_limiter_priority */); if (!s.ok()) { return s; }