Add rate limiter priority to ReadOptions (#9424)

Summary:
Users can set the priority for file reads associated with their operation by setting `ReadOptions::rate_limiter_priority` to something other than `Env::IO_TOTAL`. Rate limiting `VerifyChecksum()` and `VerifyFileChecksums()` is the motivation for this PR, so it also includes benchmarks and minor bug fixes to get that working.

`RandomAccessFileReader::Read()` already had support for rate limiting compaction reads. I changed that rate limiting to be non-specific to compaction; it is instead performed according to the passed-in `Env::IOPriority`. Compaction read rate limiting is now supported by setting `rate_limiter_priority = Env::IO_LOW` on its `ReadOptions`.
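To illustrate the reworked internal API (the full signature appears in the `file/random_access_file_reader.h` hunk below), here is a minimal sketch of a caller passing the priority through explicitly; the surrounding variables are placeholders rather than code from this PR:

```cpp
// Sketch: a read charged to the internal rate limiter at the priority taken
// from the operation's `ReadOptions`; `Env::IO_TOTAL` bypasses the limiter.
Slice result;
IOStatus io_s =
    file_reader->Read(IOOptions(), offset, n, &result, scratch,
                      nullptr /* aligned_buf */,
                      read_options.rate_limiter_priority);
```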

There is no default value for the new `Env::IOPriority` parameter to `RandomAccessFileReader::Read()`. That means this PR goes through all callers (in some cases multiple layers up the call stack) to find a `ReadOptions` to provide the priority. There are TODOs for cases where I believe it would be good to let the user control the priority some day (e.g., file footer reads), and no TODOs for cases where I believe it doesn't matter (e.g., trace file reads).

The API doc lists only the cases where a file read associated with a provided `ReadOptions` cannot be rate limited. For cases like file ingestion checksum calculation, there is no API through which to provide a `ReadOptions` or `Env::IOPriority`, so I didn't count those as missing.
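For reference, a minimal sketch of the user-facing usage this PR enables, assuming the DB was opened with `DBOptions::rate_limiter` configured (illustrative, not code from this PR):

```cpp
// Sketch: charge checksum-verification reads to the DB's rate limiter at
// user priority. `Env::IO_TOTAL` (the default) leaves reads uncharged.
ReadOptions ro;
ro.rate_limiter_priority = Env::IO_USER;
Status s = db->VerifyChecksum(ro);
```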

Pull Request resolved: https://github.com/facebook/rocksdb/pull/9424

Test Plan:
- new unit tests
- new benchmarks on a ~50MB database with a 1MB/s read rate limit and 100ms refill interval; verified with strace that reads are chunked (at 0.1MB per chunk) and spaced roughly 100ms apart.
  - setup command: `./db_bench -benchmarks=fillrandom,compact -db=/tmp/testdb -target_file_size_base=1048576 -disable_auto_compactions=true -file_checksum=true`
  - benchmarks command: `strace -ttfe pread64 ./db_bench -benchmarks=verifychecksum,verifyfilechecksums -use_existing_db=true -db=/tmp/testdb -rate_limiter_bytes_per_sec=1048576 -rate_limit_bg_reads=1 -rate_limit_user_ops=true -file_checksum=true`
- crash test using IO_USER priority on non-validation reads with https://github.com/facebook/rocksdb/issues/9567 reverted: `python3 tools/db_crashtest.py blackbox --max_key=1000000 --write_buffer_size=524288 --target_file_size_base=524288 --level_compaction_dynamic_level_bytes=true --duration=3600 --rate_limit_bg_reads=true --rate_limit_user_ops=true --rate_limiter_bytes_per_sec=10485760 --interval=10`

Reviewed By: hx235

Differential Revision: D33747386

Pulled By: ajkr

fbshipit-source-id: a2d985e97912fba8c54763798e04f006ccc56e0c
Branch: main
Author: Andrew Kryczka (committed by Facebook GitHub Bot)
Parent: 1cda273dc3
Commit: babe56ddba
47 changed files:

CMakeLists.txt | 1
HISTORY.md | 1
Makefile | 3
TARGETS | 7
db/blob/blob_file_reader.cc | 30
db/blob/blob_file_reader.h | 3
db/blob/blob_log_sequential_reader.cc | 6
db/compaction/compaction_job.cc | 1
db/convenience.cc | 5
db/db_impl/db_impl.cc | 3
db/db_rate_limiter_test.cc | 262
db/db_test2.cc | 122
db/external_sst_file_ingestion_job.cc | 9
db_stress_tool/cf_consistency_stress.cc | 4
db_stress_tool/db_stress_common.h | 1
db_stress_tool/db_stress_gflags.cc | 4
db_stress_tool/db_stress_test_base.cc | 14
db_stress_tool/multi_ops_txns_stress.cc | 12
db_stress_tool/no_batched_ops_stress.cc | 2
file/file_prefetch_buffer.cc | 13
file/file_prefetch_buffer.h | 29
file/file_util.cc | 6
file/file_util.h | 16
file/random_access_file_reader.cc | 28
file/random_access_file_reader.h | 13
file/random_access_file_reader_test.cc | 20
include/rocksdb/options.h | 35
src.mk | 1
table/block_based/block_based_table_reader.cc | 10
table/block_based/partitioned_filter_block.cc | 3
table/block_based/partitioned_index_reader.cc | 3
table/block_fetcher.cc | 18
table/cuckoo/cuckoo_table_builder_test.cc | 3
table/cuckoo/cuckoo_table_reader.cc | 6
table/format.cc | 13
table/mock_table.cc | 3
table/plain/plain_table_key_coding.cc | 4
table/plain/plain_table_reader.cc | 6
table/sst_file_dumper.cc | 3
table/table_test.cc | 14
tools/db_bench_tool.cc | 51
utilities/blob_db/blob_db_impl.cc | 5
utilities/blob_db/blob_dump_tool.cc | 4
utilities/blob_db/blob_file.cc | 20
utilities/cache_dump_load_impl.h | 3
utilities/persistent_cache/block_cache_tier_file.cc | 2
utilities/trace/file_trace_reader_writer.cc | 5

@@ -1196,6 +1196,7 @@ if(WITH_TESTS)
         db/db_options_test.cc
         db/db_properties_test.cc
         db/db_range_del_test.cc
+        db/db_rate_limiter_test.cc
         db/db_secondary_test.cc
         db/db_sst_test.cc
         db/db_statistics_test.cc

@@ -13,6 +13,7 @@
 ### Public API changes
 * Require C++17 compatible compiler (GCC >= 7, Clang >= 5, Visual Studio >= 2017). See #9388.
+* Added `ReadOptions::rate_limiter_priority`. When set to something other than `Env::IO_TOTAL`, the internal rate limiter (`DBOptions::rate_limiter`) will be charged at the specified priority for file reads associated with the API to which the `ReadOptions` was provided.
 * Remove HDFS support from main repo.
 * Remove librados support from main repo.
 * Remove obsolete backupable_db.h and type alias `BackupableDBOptions`. Use backup_engine.h and `BackupEngineOptions`. Similar renamings are in the C and Java APIs.

@@ -1533,6 +1533,9 @@ db_options_test: $(OBJ_DIR)/db/db_options_test.o $(TEST_LIBRARY) $(LIBRARY)
 db_range_del_test: $(OBJ_DIR)/db/db_range_del_test.o $(TEST_LIBRARY) $(LIBRARY)
 	$(AM_LINK)
+db_rate_limiter_test: $(OBJ_DIR)/db/db_rate_limiter_test.o $(TEST_LIBRARY) $(LIBRARY)
+	$(AM_LINK)
 db_sst_test: $(OBJ_DIR)/db/db_sst_test.o $(TEST_LIBRARY) $(LIBRARY)
 	$(AM_LINK)

@@ -1441,6 +1441,13 @@ ROCKS_TESTS = [
         [],
         [],
     ],
+    [
+        "db_rate_limiter_test",
+        "db/db_rate_limiter_test.cc",
+        "parallel",
+        [],
+        [],
+    ],
     [
         "db_secondary_test",
         "db/db_secondary_test.cc",

@@ -148,9 +148,10 @@ Status BlobFileReader::ReadHeader(const RandomAccessFileReader* file_reader,
   constexpr uint64_t read_offset = 0;
   constexpr size_t read_size = BlobLogHeader::kSize;

-  const Status s =
-      ReadFromFile(file_reader, read_offset, read_size, statistics,
-                   &header_slice, &buf, &aligned_buf);
+  // TODO: rate limit reading headers from blob files.
+  const Status s = ReadFromFile(file_reader, read_offset, read_size,
+                                statistics, &header_slice, &buf, &aligned_buf,
+                                Env::IO_TOTAL /* rate_limiter_priority */);
   if (!s.ok()) {
     return s;
   }
@@ -198,9 +199,10 @@ Status BlobFileReader::ReadFooter(const RandomAccessFileReader* file_reader,
   const uint64_t read_offset = file_size - BlobLogFooter::kSize;
   constexpr size_t read_size = BlobLogFooter::kSize;

-  const Status s =
-      ReadFromFile(file_reader, read_offset, read_size, statistics,
-                   &footer_slice, &buf, &aligned_buf);
+  // TODO: rate limit reading footers from blob files.
+  const Status s = ReadFromFile(file_reader, read_offset, read_size,
+                                statistics, &footer_slice, &buf, &aligned_buf,
+                                Env::IO_TOTAL /* rate_limiter_priority */);
   if (!s.ok()) {
     return s;
   }
@@ -230,7 +232,8 @@ Status BlobFileReader::ReadFooter(const RandomAccessFileReader* file_reader,
 Status BlobFileReader::ReadFromFile(const RandomAccessFileReader* file_reader,
                                     uint64_t read_offset, size_t read_size,
                                     Statistics* statistics, Slice* slice,
-                                    Buffer* buf, AlignedBuf* aligned_buf) {
+                                    Buffer* buf, AlignedBuf* aligned_buf,
+                                    Env::IOPriority rate_limiter_priority) {
   assert(slice);
   assert(buf);
   assert(aligned_buf);
@@ -245,13 +248,13 @@ Status BlobFileReader::ReadFromFile(const RandomAccessFileReader* file_reader,
     constexpr char* scratch = nullptr;

     s = file_reader->Read(IOOptions(), read_offset, read_size, slice, scratch,
-                          aligned_buf);
+                          aligned_buf, rate_limiter_priority);
   } else {
     buf->reset(new char[read_size]);
     constexpr AlignedBuf* aligned_scratch = nullptr;

     s = file_reader->Read(IOOptions(), read_offset, read_size, slice,
-                          buf->get(), aligned_scratch);
+                          buf->get(), aligned_scratch, rate_limiter_priority);
   }

   if (!s.ok()) {
@@ -323,7 +326,8 @@ Status BlobFileReader::GetBlob(const ReadOptions& read_options,
     prefetched = prefetch_buffer->TryReadFromCache(
         IOOptions(), file_reader_.get(), record_offset,
-        static_cast<size_t>(record_size), &record_slice, &s, for_compaction);
+        static_cast<size_t>(record_size), &record_slice, &s,
+        read_options.rate_limiter_priority, for_compaction);
     if (!s.ok()) {
       return s;
     }
@@ -334,7 +338,8 @@ Status BlobFileReader::GetBlob(const ReadOptions& read_options,
     const Status s = ReadFromFile(file_reader_.get(), record_offset,
                                   static_cast<size_t>(record_size), statistics_,
-                                  &record_slice, &buf, &aligned_buf);
+                                  &record_slice, &buf, &aligned_buf,
+                                  read_options.rate_limiter_priority);
     if (!s.ok()) {
       return s;
     }
@@ -424,7 +429,8 @@ void BlobFileReader::MultiGetBlob(
   }
   TEST_SYNC_POINT("BlobFileReader::MultiGetBlob:ReadFromFile");
   s = file_reader_->MultiRead(IOOptions(), read_reqs.data(), read_reqs.size(),
-                              direct_io ? &aligned_buf : nullptr);
+                              direct_io ? &aligned_buf : nullptr,
+                              read_options.rate_limiter_priority);
   if (!s.ok()) {
     for (auto& req : read_reqs) {
       req.status.PermitUncheckedError();

@@ -83,7 +83,8 @@ class BlobFileReader {
   static Status ReadFromFile(const RandomAccessFileReader* file_reader,
                              uint64_t read_offset, size_t read_size,
                              Statistics* statistics, Slice* slice, Buffer* buf,
-                             AlignedBuf* aligned_buf);
+                             AlignedBuf* aligned_buf,
+                             Env::IOPriority rate_limiter_priority);

   static Status VerifyBlob(const Slice& record_slice, const Slice& user_key,
                            uint64_t value_size);

@@ -28,8 +28,10 @@ Status BlobLogSequentialReader::ReadSlice(uint64_t size, Slice* slice,
   assert(file_);

   StopWatch read_sw(clock_, statistics_, BLOB_DB_BLOB_FILE_READ_MICROS);
-  Status s = file_->Read(IOOptions(), next_byte_, static_cast<size_t>(size),
-                         slice, buf, nullptr);
+  // TODO: rate limit `BlobLogSequentialReader` reads (it appears unused?)
+  Status s =
+      file_->Read(IOOptions(), next_byte_, static_cast<size_t>(size), slice,
+                  buf, nullptr, Env::IO_TOTAL /* rate_limiter_priority */);
   next_byte_ += size;
   if (!s.ok()) {
     return s;

@@ -1264,6 +1264,7 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
   ReadOptions read_options;
   read_options.verify_checksums = true;
   read_options.fill_cache = false;
+  read_options.rate_limiter_priority = Env::IO_LOW;
   // Compaction iterators shouldn't be confined to a single prefix.
   // Compactions use Seek() for
   // (a) concurrent compactions,

@@ -56,7 +56,10 @@ Status VerifySstFileChecksum(const Options& options,
   }
   std::unique_ptr<TableReader> table_reader;
   std::unique_ptr<RandomAccessFileReader> file_reader(
-      new RandomAccessFileReader(std::move(file), file_path));
+      new RandomAccessFileReader(
+          std::move(file), file_path, ioptions.clock, nullptr /* io_tracer */,
+          nullptr /* stats */, 0 /* hist_type */, nullptr /* file_read_hist */,
+          ioptions.rate_limiter.get()));
   const bool kImmortal = true;
   s = ioptions.table_factory->NewTableReader(
       TableReaderOptions(ioptions, options.prefix_extractor, env_options,

@@ -5175,7 +5175,8 @@ Status DBImpl::VerifyFullFileChecksum(const std::string& file_checksum_expected,
       fs_.get(), fname, immutable_db_options_.file_checksum_gen_factory.get(),
       func_name_expected, &file_checksum, &func_name,
       read_options.readahead_size, immutable_db_options_.allow_mmap_reads,
-      io_tracer_, immutable_db_options_.rate_limiter.get());
+      io_tracer_, immutable_db_options_.rate_limiter.get(),
+      read_options.rate_limiter_priority);
   if (s.ok()) {
     assert(func_name_expected == func_name);
     if (file_checksum != file_checksum_expected) {

@@ -0,0 +1,262 @@
// Copyright (c) 2022-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "db/db_test_util.h"
#include "port/stack_trace.h"
#include "util/file_checksum_helper.h"
namespace ROCKSDB_NAMESPACE {
class DBRateLimiterTest
: public DBTestBase,
public ::testing::WithParamInterface<std::tuple<bool, bool, bool>> {
public:
DBRateLimiterTest()
: DBTestBase("db_rate_limiter_test", /*env_do_fsync=*/false),
use_direct_io_(std::get<0>(GetParam())),
use_block_cache_(std::get<1>(GetParam())),
use_readahead_(std::get<2>(GetParam())) {}
void Init() {
options_ = GetOptions();
Reopen(options_);
for (int i = 0; i < kNumFiles; ++i) {
for (int j = 0; j < kNumKeysPerFile; ++j) {
ASSERT_OK(Put(Key(i * kNumKeysPerFile + j), "val"));
}
ASSERT_OK(Flush());
}
MoveFilesToLevel(1);
}
BlockBasedTableOptions GetTableOptions() {
BlockBasedTableOptions table_options;
table_options.no_block_cache = !use_block_cache_;
return table_options;
}
ReadOptions GetReadOptions() {
ReadOptions read_options;
read_options.rate_limiter_priority = Env::IO_USER;
read_options.readahead_size = use_readahead_ ? kReadaheadBytes : 0;
return read_options;
}
Options GetOptions() {
Options options = CurrentOptions();
options.disable_auto_compactions = true;
options.file_checksum_gen_factory.reset(new FileChecksumGenCrc32cFactory());
options.rate_limiter.reset(NewGenericRateLimiter(
1 << 20 /* rate_bytes_per_sec */, 100 * 1000 /* refill_period_us */,
10 /* fairness */, RateLimiter::Mode::kAllIo));
options.table_factory.reset(NewBlockBasedTableFactory(GetTableOptions()));
options.use_direct_reads = use_direct_io_;
return options;
}
protected:
const static int kNumKeysPerFile = 1;
const static int kNumFiles = 3;
const static int kReadaheadBytes = 32 << 10; // 32KB
Options options_;
const bool use_direct_io_;
const bool use_block_cache_;
const bool use_readahead_;
};
std::string GetTestNameSuffix(
::testing::TestParamInfo<std::tuple<bool, bool, bool>> info) {
std::ostringstream oss;
if (std::get<0>(info.param)) {
oss << "DirectIO";
} else {
oss << "BufferedIO";
}
if (std::get<1>(info.param)) {
oss << "_BlockCache";
} else {
oss << "_NoBlockCache";
}
if (std::get<2>(info.param)) {
oss << "_Readahead";
} else {
oss << "_NoReadahead";
}
return oss.str();
}
#ifndef ROCKSDB_LITE
INSTANTIATE_TEST_CASE_P(DBRateLimiterTest, DBRateLimiterTest,
::testing::Combine(::testing::Bool(), ::testing::Bool(),
::testing::Bool()),
GetTestNameSuffix);
#else // ROCKSDB_LITE
// Cannot use direct I/O in lite mode.
INSTANTIATE_TEST_CASE_P(DBRateLimiterTest, DBRateLimiterTest,
::testing::Combine(::testing::Values(false),
::testing::Bool(),
::testing::Bool()),
GetTestNameSuffix);
#endif // ROCKSDB_LITE
TEST_P(DBRateLimiterTest, Get) {
if (use_direct_io_ && !IsDirectIOSupported()) {
return;
}
Init();
ASSERT_EQ(0, options_.rate_limiter->GetTotalRequests(Env::IO_USER));
int expected = 0;
for (int i = 0; i < kNumFiles; ++i) {
{
std::string value;
ASSERT_OK(db_->Get(GetReadOptions(), Key(i * kNumKeysPerFile), &value));
++expected;
}
ASSERT_EQ(expected, options_.rate_limiter->GetTotalRequests(Env::IO_USER));
{
std::string value;
ASSERT_OK(db_->Get(GetReadOptions(), Key(i * kNumKeysPerFile), &value));
if (!use_block_cache_) {
++expected;
}
}
ASSERT_EQ(expected, options_.rate_limiter->GetTotalRequests(Env::IO_USER));
}
}
TEST_P(DBRateLimiterTest, NewMultiGet) {
// The new void-returning `MultiGet()` APIs use `MultiRead()`, which does not
// yet support rate limiting.
if (use_direct_io_ && !IsDirectIOSupported()) {
return;
}
Init();
ASSERT_EQ(0, options_.rate_limiter->GetTotalRequests(Env::IO_USER));
const int kNumKeys = kNumFiles * kNumKeysPerFile;
{
std::vector<std::string> key_bufs;
key_bufs.reserve(kNumKeys);
std::vector<Slice> keys;
keys.reserve(kNumKeys);
for (int i = 0; i < kNumKeys; ++i) {
key_bufs.emplace_back(Key(i));
keys.emplace_back(key_bufs[i]);
}
std::vector<Status> statuses(kNumKeys);
std::vector<PinnableSlice> values(kNumKeys);
db_->MultiGet(GetReadOptions(), dbfull()->DefaultColumnFamily(), kNumKeys,
keys.data(), values.data(), statuses.data());
for (int i = 0; i < kNumKeys; ++i) {
ASSERT_TRUE(statuses[i].IsNotSupported());
}
}
ASSERT_EQ(0, options_.rate_limiter->GetTotalRequests(Env::IO_USER));
}
TEST_P(DBRateLimiterTest, OldMultiGet) {
// The old `vector<Status>`-returning `MultiGet()` APIs use `Read()`, which
// supports rate limiting.
if (use_direct_io_ && !IsDirectIOSupported()) {
return;
}
Init();
ASSERT_EQ(0, options_.rate_limiter->GetTotalRequests(Env::IO_USER));
const int kNumKeys = kNumFiles * kNumKeysPerFile;
int expected = 0;
{
std::vector<std::string> key_bufs;
key_bufs.reserve(kNumKeys);
std::vector<Slice> keys;
keys.reserve(kNumKeys);
for (int i = 0; i < kNumKeys; ++i) {
key_bufs.emplace_back(Key(i));
keys.emplace_back(key_bufs[i]);
}
std::vector<std::string> values;
std::vector<Status> statuses =
db_->MultiGet(GetReadOptions(), keys, &values);
for (int i = 0; i < kNumKeys; ++i) {
ASSERT_OK(statuses[i]);
}
}
expected += kNumKeys;
ASSERT_EQ(expected, options_.rate_limiter->GetTotalRequests(Env::IO_USER));
}
TEST_P(DBRateLimiterTest, Iterator) {
if (use_direct_io_ && !IsDirectIOSupported()) {
return;
}
Init();
std::unique_ptr<Iterator> iter(db_->NewIterator(GetReadOptions()));
ASSERT_EQ(0, options_.rate_limiter->GetTotalRequests(Env::IO_USER));
int expected = 0;
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
++expected;
ASSERT_EQ(expected, options_.rate_limiter->GetTotalRequests(Env::IO_USER));
}
for (iter->SeekToLast(); iter->Valid(); iter->Prev()) {
// When `use_block_cache_ == true`, the reverse scan will access the blocks
// loaded to cache during the above forward scan, in which case no further
// file reads are expected.
if (!use_block_cache_) {
++expected;
}
}
// Reverse scan does not read evenly (one block per iteration) due to
// descending seqno ordering, so wait until after the loop to check total.
ASSERT_EQ(expected, options_.rate_limiter->GetTotalRequests(Env::IO_USER));
}
#if !defined(ROCKSDB_LITE)
TEST_P(DBRateLimiterTest, VerifyChecksum) {
if (use_direct_io_ && !IsDirectIOSupported()) {
return;
}
Init();
ASSERT_EQ(0, options_.rate_limiter->GetTotalRequests(Env::IO_USER));
ASSERT_OK(db_->VerifyChecksum(GetReadOptions()));
// The files are tiny so there should have just been one read per file.
int expected = kNumFiles;
ASSERT_EQ(expected, options_.rate_limiter->GetTotalRequests(Env::IO_USER));
}
TEST_P(DBRateLimiterTest, VerifyFileChecksums) {
if (use_direct_io_ && !IsDirectIOSupported()) {
return;
}
Init();
ASSERT_EQ(0, options_.rate_limiter->GetTotalRequests(Env::IO_USER));
ASSERT_OK(db_->VerifyFileChecksums(GetReadOptions()));
// The files are tiny so there should have just been one read per file.
int expected = kNumFiles;
ASSERT_EQ(expected, options_.rate_limiter->GetTotalRequests(Env::IO_USER));
}
#endif // !defined(ROCKSDB_LITE)
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {
ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

@@ -3937,68 +3937,74 @@ TEST_F(DBTest2, RateLimitedCompactionReads) {
   const int kBytesPerKey = 1024;
   const int kNumL0Files = 4;

-  for (auto use_direct_io : {false, true}) {
-    if (use_direct_io && !IsDirectIOSupported()) {
-      continue;
-    }
-    Options options = CurrentOptions();
-    options.compression = kNoCompression;
-    options.level0_file_num_compaction_trigger = kNumL0Files;
-    options.memtable_factory.reset(
-        test::NewSpecialSkipListFactory(kNumKeysPerFile));
-    // takes roughly one second, split into 100 x 10ms intervals. Each interval
-    // permits 5.12KB, which is smaller than the block size, so this test
-    // exercises the code for chunking reads.
-    options.rate_limiter.reset(NewGenericRateLimiter(
-        static_cast<int64_t>(kNumL0Files * kNumKeysPerFile *
-                             kBytesPerKey) /* rate_bytes_per_sec */,
-        10 * 1000 /* refill_period_us */, 10 /* fairness */,
-        RateLimiter::Mode::kReadsOnly));
-    options.use_direct_reads = options.use_direct_io_for_flush_and_compaction =
-        use_direct_io;
-    BlockBasedTableOptions bbto;
-    bbto.block_size = 16384;
-    bbto.no_block_cache = true;
-    options.table_factory.reset(NewBlockBasedTableFactory(bbto));
-    DestroyAndReopen(options);
-
-    for (int i = 0; i < kNumL0Files; ++i) {
-      for (int j = 0; j <= kNumKeysPerFile; ++j) {
-        ASSERT_OK(Put(Key(j), DummyString(kBytesPerKey)));
-      }
-      ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
-      if (i + 1 < kNumL0Files) {
-        ASSERT_EQ(i + 1, NumTableFilesAtLevel(0));
-      }
-    }
-    ASSERT_OK(dbfull()->TEST_WaitForCompact());
-    ASSERT_EQ(0, NumTableFilesAtLevel(0));
-
-    ASSERT_EQ(0, options.rate_limiter->GetTotalBytesThrough(Env::IO_HIGH));
-    // should be slightly above 512KB due to non-data blocks read. Arbitrarily
-    // chose 1MB as the upper bound on the total bytes read.
-    size_t rate_limited_bytes =
-        options.rate_limiter->GetTotalBytesThrough(Env::IO_LOW);
-    // Include the explicit prefetch of the footer in direct I/O case.
-    size_t direct_io_extra = use_direct_io ? 512 * 1024 : 0;
-    ASSERT_GE(
-        rate_limited_bytes,
-        static_cast<size_t>(kNumKeysPerFile * kBytesPerKey * kNumL0Files));
-    ASSERT_LT(
-        rate_limited_bytes,
-        static_cast<size_t>(2 * kNumKeysPerFile * kBytesPerKey * kNumL0Files +
-                            direct_io_extra));
-
-    Iterator* iter = db_->NewIterator(ReadOptions());
-    ASSERT_OK(iter->status());
-    for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
-      ASSERT_EQ(iter->value().ToString(), DummyString(kBytesPerKey));
-    }
-    delete iter;
-    // bytes read for user iterator shouldn't count against the rate limit.
-    ASSERT_EQ(rate_limited_bytes,
-              static_cast<size_t>(
-                  options.rate_limiter->GetTotalBytesThrough(Env::IO_LOW)));
+  for (int compaction_readahead_size : {0, 32 << 10}) {
+    for (auto use_direct_io : {false, true}) {
+      if (use_direct_io && !IsDirectIOSupported()) {
+        continue;
+      }
+      Options options = CurrentOptions();
+      options.compaction_readahead_size = compaction_readahead_size;
+      options.compression = kNoCompression;
+      options.level0_file_num_compaction_trigger = kNumL0Files;
+      options.memtable_factory.reset(
+          test::NewSpecialSkipListFactory(kNumKeysPerFile));
+      // takes roughly one second, split into 100 x 10ms intervals. Each
+      // interval permits 5.12KB, which is smaller than the block size, so this
+      // test exercises the code for chunking reads.
+      options.rate_limiter.reset(NewGenericRateLimiter(
+          static_cast<int64_t>(kNumL0Files * kNumKeysPerFile *
+                               kBytesPerKey) /* rate_bytes_per_sec */,
+          10 * 1000 /* refill_period_us */, 10 /* fairness */,
+          RateLimiter::Mode::kReadsOnly));
+      options.use_direct_reads =
+          options.use_direct_io_for_flush_and_compaction = use_direct_io;
+      BlockBasedTableOptions bbto;
+      bbto.block_size = 16384;
+      bbto.no_block_cache = true;
+      options.table_factory.reset(NewBlockBasedTableFactory(bbto));
+      DestroyAndReopen(options);
+
+      for (int i = 0; i < kNumL0Files; ++i) {
+        for (int j = 0; j <= kNumKeysPerFile; ++j) {
+          ASSERT_OK(Put(Key(j), DummyString(kBytesPerKey)));
+        }
+        ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
+        if (i + 1 < kNumL0Files) {
+          ASSERT_EQ(i + 1, NumTableFilesAtLevel(0));
+        }
+      }
+      ASSERT_OK(dbfull()->TEST_WaitForCompact());
+      ASSERT_EQ(0, NumTableFilesAtLevel(0));
+
+      // should be slightly above 512KB due to non-data blocks read. Arbitrarily
+      // chose 1MB as the upper bound on the total bytes read.
+      size_t rate_limited_bytes =
+          options.rate_limiter->GetTotalBytesThrough(Env::IO_TOTAL);
+      // There must be no charges at non-`IO_LOW` priorities.
+      ASSERT_EQ(rate_limited_bytes,
+                static_cast<size_t>(
+                    options.rate_limiter->GetTotalBytesThrough(Env::IO_LOW)));
+      // Include the explicit prefetch of the footer in direct I/O case.
+      size_t direct_io_extra = use_direct_io ? 512 * 1024 : 0;
+      ASSERT_GE(
+          rate_limited_bytes,
+          static_cast<size_t>(kNumKeysPerFile * kBytesPerKey * kNumL0Files));
+      ASSERT_LT(
+          rate_limited_bytes,
+          static_cast<size_t>(2 * kNumKeysPerFile * kBytesPerKey * kNumL0Files +
+                              direct_io_extra));
+
+      Iterator* iter = db_->NewIterator(ReadOptions());
+      ASSERT_OK(iter->status());
+      for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
+        ASSERT_EQ(iter->value().ToString(), DummyString(kBytesPerKey));
+      }
+      delete iter;
+      // bytes read for user iterator shouldn't count against the rate limit.
+      ASSERT_EQ(rate_limited_bytes,
+                static_cast<size_t>(
+                    options.rate_limiter->GetTotalBytesThrough(Env::IO_LOW)));
+    }
   }
 }
 #endif  // ROCKSDB_LITE

@@ -212,6 +212,8 @@ Status ExternalSstFileIngestionJob::Prepare(
       std::string generated_checksum;
       std::string generated_checksum_func_name;
       std::string requested_checksum_func_name;
+      // TODO: rate limit file reads for checksum calculation during file
+      // ingestion.
       IOStatus io_s = GenerateOneFileChecksum(
           fs_.get(), files_to_ingest_[i].internal_file_path,
           db_options_.file_checksum_gen_factory.get(),
@@ -219,7 +221,8 @@ Status ExternalSstFileIngestionJob::Prepare(
           &generated_checksum_func_name,
           ingestion_options_.verify_checksums_readahead_size,
           db_options_.allow_mmap_reads, io_tracer_,
-          db_options_.rate_limiter.get());
+          db_options_.rate_limiter.get(),
+          Env::IO_TOTAL /* rate_limiter_priority */);
       if (!io_s.ok()) {
         status = io_s;
         ROCKS_LOG_WARN(db_options_.info_log,
@@ -907,12 +910,14 @@ IOStatus ExternalSstFileIngestionJob::GenerateChecksumForIngestedFile(
   std::string file_checksum;
   std::string file_checksum_func_name;
   std::string requested_checksum_func_name;
+  // TODO: rate limit file reads for checksum calculation during file ingestion.
   IOStatus io_s = GenerateOneFileChecksum(
       fs_.get(), file_to_ingest->internal_file_path,
       db_options_.file_checksum_gen_factory.get(), requested_checksum_func_name,
       &file_checksum, &file_checksum_func_name,
       ingestion_options_.verify_checksums_readahead_size,
-      db_options_.allow_mmap_reads, io_tracer_, db_options_.rate_limiter.get());
+      db_options_.allow_mmap_reads, io_tracer_, db_options_.rate_limiter.get(),
+      Env::IO_TOTAL /* rate_limiter_priority */);
   if (!io_s.ok()) {
     return io_s;
   }

@@ -286,6 +286,8 @@ class CfConsistencyStressTest : public StressTest {
   }

   void VerifyDb(ThreadState* thread) const override {
+    // This `ReadOptions` is for validation purposes. Ignore
+    // `FLAGS_rate_limit_user_ops` to avoid slowing any validation.
     ReadOptions options(FLAGS_verify_checksum, true);
     // We must set total_order_seek to true because we are doing a SeekToFirst
     // on a column family whose memtables may support (by default) prefix-based
@@ -472,6 +474,8 @@ class CfConsistencyStressTest : public StressTest {
       *checksum = ret;
       return iter->status();
     };
+    // This `ReadOptions` is for validation purposes. Ignore
+    // `FLAGS_rate_limit_user_ops` to avoid slowing any validation.
     ReadOptions ropts;
     ropts.total_order_seek = true;
     ropts.snapshot = snapshot_guard.get();

@@ -175,6 +175,7 @@ DECLARE_double(max_bytes_for_level_multiplier);
 DECLARE_int32(range_deletion_width);
 DECLARE_uint64(rate_limiter_bytes_per_sec);
 DECLARE_bool(rate_limit_bg_reads);
+DECLARE_bool(rate_limit_user_ops);
 DECLARE_uint64(sst_file_manager_bytes_per_sec);
 DECLARE_uint64(sst_file_manager_bytes_per_truncate);
 DECLARE_bool(use_txn);

@@ -546,6 +546,10 @@ DEFINE_uint64(rate_limiter_bytes_per_sec, 0, "Set options.rate_limiter value.");
 DEFINE_bool(rate_limit_bg_reads, false,
             "Use options.rate_limiter on compaction reads");

+DEFINE_bool(rate_limit_user_ops, false,
+            "When true use Env::IO_USER priority level to charge internal rate "
+            "limiter for reads associated with user operations.");
+
 DEFINE_uint64(sst_file_manager_bytes_per_sec, 0,
               "Set `Options::sst_file_manager` to delete at this rate. By "
               "default the deletion rate is unbounded.");

@@ -349,6 +349,8 @@ bool StressTest::VerifySecondaries() {
     fprintf(stderr, "Secondary failed to catch up with primary\n");
     return false;
   }
+  // This `ReadOptions` is for validation purposes. Ignore
+  // `FLAGS_rate_limit_user_ops` to avoid slowing any validation.
   ReadOptions ropts;
   ropts.total_order_seek = true;
   // Verify only the default column family since the primary may have
@@ -397,6 +399,8 @@ Status StressTest::AssertSame(DB* db, ColumnFamilyHandle* cf,
   if (cf->GetName() != snap_state.cf_at_name) {
     return s;
   }
+  // This `ReadOptions` is for validation purposes. Ignore
+  // `FLAGS_rate_limit_user_ops` to avoid slowing any validation.
   ReadOptions ropt;
   ropt.snapshot = snap_state.snapshot;
   Slice ts;
@@ -633,6 +637,8 @@ Status StressTest::RollbackTxn(Transaction* txn) {
 void StressTest::OperateDb(ThreadState* thread) {
   ReadOptions read_opts(FLAGS_verify_checksum, true);
+  read_opts.rate_limiter_priority =
+      FLAGS_rate_limit_user_ops ? Env::IO_USER : Env::IO_TOTAL;
   WriteOptions write_opts;
   auto shared = thread->shared;
   char value[100];
@@ -1133,6 +1139,8 @@ Status StressTest::TestIterate(ThreadState* thread,
   // to bounds, prefix extractor or reseeking. Sometimes we are comparing
   // iterators with the same set-up, and it doesn't hurt to check them
   // to be equal.
+  // This `ReadOptions` is for validation purposes. Ignore
+  // `FLAGS_rate_limit_user_ops` to avoid slowing any validation.
   ReadOptions cmp_ro;
   cmp_ro.timestamp = readoptionscopy.timestamp;
   cmp_ro.snapshot = snapshot;
@@ -1573,6 +1581,8 @@ Status StressTest::TestBackupRestore(
   std::string key_str = Key(rand_keys[0]);
   Slice key = key_str;
   std::string restored_value;
+  // This `ReadOptions` is for validation purposes. Ignore
+  // `FLAGS_rate_limit_user_ops` to avoid slowing any validation.
   ReadOptions read_opts;
   std::string ts_str;
   Slice ts;
@@ -1943,6 +1953,8 @@ void StressTest::TestAcquireSnapshot(ThreadState* thread,
                                      const std::string& keystr, uint64_t i) {
   Slice key = keystr;
   ColumnFamilyHandle* column_family = column_families_[rand_column_family];
+  // This `ReadOptions` is for validation purposes. Ignore
+  // `FLAGS_rate_limit_user_ops` to avoid slowing any validation.
   ReadOptions ropt;
 #ifndef ROCKSDB_LITE
   auto db_impl = static_cast_with_check<DBImpl>(db_->GetRootDB());
@@ -2096,6 +2108,8 @@ uint32_t StressTest::GetRangeHash(ThreadState* thread, const Snapshot* snapshot,
                                   const Slice& end_key) {
   const std::string kCrcCalculatorSepearator = ";";
   uint32_t crc = 0;
+  // This `ReadOptions` is for validation purposes. Ignore
+  // `FLAGS_rate_limit_user_ops` to avoid slowing any validation.
   ReadOptions ro;
   ro.snapshot = snapshot;
   ro.total_order_seek = true;

@@ -477,6 +477,8 @@ Status MultiOpsTxnsStressTest::PrimaryKeyUpdateTxn(ThreadState* thread,
   });

   ReadOptions ropts;
+  ropts.rate_limiter_priority =
+      FLAGS_rate_limit_user_ops ? Env::IO_USER : Env::IO_TOTAL;
   std::string value;
   s = txn->GetForUpdate(ropts, old_pk, &value);
   if (!s.ok()) {
@@ -596,6 +598,8 @@ Status MultiOpsTxnsStressTest::SecondaryKeyUpdateTxn(ThreadState* thread,
   }
   ropts.total_order_seek = true;
   ropts.iterate_upper_bound = &iter_ub;
+  ropts.rate_limiter_priority =
+      FLAGS_rate_limit_user_ops ? Env::IO_USER : Env::IO_TOTAL;
   it = txn->GetIterator(ropts);
   assert(it);
@@ -620,6 +624,8 @@ Status MultiOpsTxnsStressTest::SecondaryKeyUpdateTxn(ThreadState* thread,
     std::string pk = Record::EncodePrimaryKey(record.a_value());
     std::string value;
     ReadOptions read_opts;
+    read_opts.rate_limiter_priority =
+        FLAGS_rate_limit_user_ops ? Env::IO_USER : Env::IO_TOTAL;
     read_opts.snapshot = txn->GetSnapshot();
     s = txn->GetForUpdate(read_opts, pk, &value);
     if (s.IsBusy() || s.IsTimedOut() || s.IsTryAgain() ||
@@ -722,6 +728,8 @@ Status MultiOpsTxnsStressTest::UpdatePrimaryIndexValueTxn(ThreadState* thread,
     RollbackTxn(txn).PermitUncheckedError();
   });
   ReadOptions ropts;
+  ropts.rate_limiter_priority =
+      FLAGS_rate_limit_user_ops ? Env::IO_USER : Env::IO_TOTAL;
   std::string value;
   s = txn->GetForUpdate(ropts, pk_str, &value);
   if (!s.ok()) {
@@ -851,6 +859,8 @@ void MultiOpsTxnsStressTest::VerifyDb(ThreadState* thread) const {
   std::string iter_ub_str(buf, sizeof(buf));
   Slice iter_ub = iter_ub_str;
+  // This `ReadOptions` is for validation purposes. Ignore
+  // `FLAGS_rate_limit_user_ops` to avoid slowing any validation.
   ReadOptions ropts;
   ropts.snapshot = snapshot;
   ropts.total_order_seek = true;
@@ -870,6 +880,8 @@ void MultiOpsTxnsStressTest::VerifyDb(ThreadState* thread) const {
   std::reverse(buf, buf + sizeof(buf));
   const std::string start_key(buf, sizeof(buf));
+  // This `ReadOptions` is for validation purposes. Ignore
+  // `FLAGS_rate_limit_user_ops` to avoid slowing any validation.
   ReadOptions ropts;
   ropts.snapshot = snapshot;
   ropts.total_order_seek = true;

@@ -21,6 +21,8 @@ class NonBatchedOpsStressTest : public StressTest {
   virtual ~NonBatchedOpsStressTest() {}

   void VerifyDb(ThreadState* thread) const override {
+    // This `ReadOptions` is for validation purposes. Ignore
+    // `FLAGS_rate_limit_user_ops` to avoid slowing any validation.
     ReadOptions options(FLAGS_verify_checksum, true);
     std::string ts_str;
     Slice ts;

@@ -24,7 +24,7 @@ namespace ROCKSDB_NAMESPACE {
 Status FilePrefetchBuffer::Prefetch(const IOOptions& opts,
                                     RandomAccessFileReader* reader,
                                     uint64_t offset, size_t n,
-                                    bool for_compaction) {
+                                    Env::IOPriority rate_limiter_priority) {
   if (!enable_ || reader == nullptr) {
     return Status::OK();
   }
@@ -90,7 +90,8 @@ Status FilePrefetchBuffer::Prefetch(const IOOptions& opts,
   Slice result;
   size_t read_len = static_cast<size_t>(roundup_len - chunk_len);
   s = reader->Read(opts, rounddown_offset + chunk_len, read_len, &result,
-                   buffer_.BufferStart() + chunk_len, nullptr, for_compaction);
+                   buffer_.BufferStart() + chunk_len, nullptr,
+                   rate_limiter_priority);
   if (!s.ok()) {
     return s;
   }
@@ -111,7 +112,8 @@ bool FilePrefetchBuffer::TryReadFromCache(const IOOptions& opts,
                                           RandomAccessFileReader* reader,
                                           uint64_t offset, size_t n,
                                           Slice* result, Status* status,
-                                          bool for_compaction) {
+                                          Env::IOPriority rate_limiter_priority,
+                                          bool for_compaction /* = false */) {
   if (track_min_offset_ && offset < min_offset_read_) {
     min_offset_read_ = static_cast<size_t>(offset);
   }
@@ -132,7 +134,7 @@ bool FilePrefetchBuffer::TryReadFromCache(const IOOptions& opts,
     Status s;
     if (for_compaction) {
       s = Prefetch(opts, reader, offset, std::max(n, readahead_size_),
-                   for_compaction);
+                   rate_limiter_priority);
     } else {
       if (implicit_auto_readahead_) {
         // Prefetch only if this read is sequential otherwise reset
@@ -152,7 +154,8 @@ bool FilePrefetchBuffer::TryReadFromCache(const IOOptions& opts,
           return false;
         }
       }
-      s = Prefetch(opts, reader, offset, n + readahead_size_, for_compaction);
+      s = Prefetch(opts, reader, offset, n + readahead_size_,
+                   rate_limiter_priority);
     }
     if (!s.ok()) {
       if (status) {

@@ -68,12 +68,14 @@ class FilePrefetchBuffer {
         num_file_reads_(kMinNumFileReadsToStartAutoReadahead + 1) {}

   // Load data into the buffer from a file.
   // reader : the file reader.
   // offset : the file offset to start reading from.
   // n : the number of bytes to read.
-  // for_compaction : if prefetch is done for compaction read.
+  // rate_limiter_priority : rate limiting priority, or `Env::IO_TOTAL` to
+  // bypass.
   Status Prefetch(const IOOptions& opts, RandomAccessFileReader* reader,
-                  uint64_t offset, size_t n, bool for_compaction = false);
+                  uint64_t offset, size_t n,
+                  Env::IOPriority rate_limiter_priority);

   // Tries returning the data for a file read from this buffer if that data is
   // in the buffer.
@@ -81,15 +83,18 @@ class FilePrefetchBuffer {
   // It also does the exponential readahead when readahead_size is set as part
   // of the constructor.
   //
   // opts : the IO options to use.
   // reader : the file reader.
   // offset : the file offset.
   // n : the number of bytes.
   // result : output buffer to put the data into.
   // s : output status.
-  // for_compaction : true if cache read is done for compaction read.
+  // rate_limiter_priority : rate limiting priority, or `Env::IO_TOTAL` to
+  // bypass.
+  // for_compaction : true if cache read is done for compaction read.
   bool TryReadFromCache(const IOOptions& opts, RandomAccessFileReader* reader,
                         uint64_t offset, size_t n, Slice* result, Status* s,
+                        Env::IOPriority rate_limiter_priority,
                         bool for_compaction = false);

   // The minimum `offset` ever passed to TryReadFromCache(). This will only be

@@ -123,7 +123,8 @@ IOStatus GenerateOneFileChecksum(
     const std::string& requested_checksum_func_name, std::string* file_checksum,
     std::string* file_checksum_func_name,
     size_t verify_checksums_readahead_size, bool allow_mmap_reads,
-    std::shared_ptr<IOTracer>& io_tracer, RateLimiter* rate_limiter) {
+    std::shared_ptr<IOTracer>& io_tracer, RateLimiter* rate_limiter,
+    Env::IOPriority rate_limiter_priority) {
   if (checksum_factory == nullptr) {
     return IOStatus::InvalidArgument("Checksum factory is invalid");
   }
@@ -195,7 +196,8 @@ IOStatus GenerateOneFileChecksum(
         static_cast<size_t>(std::min(uint64_t{readahead_size}, size));
     if (!prefetch_buffer.TryReadFromCache(
             opts, reader.get(), offset, bytes_to_read, &slice,
-            nullptr /* status */, false /* for_compaction */)) {
+            nullptr /* status */, rate_limiter_priority,
+            false /* for_compaction */)) {
       return IOStatus::Corruption("file read failed");
     }
     if (slice.size() == 0) {

@@ -51,20 +51,8 @@ extern IOStatus GenerateOneFileChecksum(
     const std::string& requested_checksum_func_name, std::string* file_checksum,
     std::string* file_checksum_func_name,
     size_t verify_checksums_readahead_size, bool allow_mmap_reads,
-    std::shared_ptr<IOTracer>& io_tracer, RateLimiter* rate_limiter = nullptr);
-
-inline IOStatus GenerateOneFileChecksum(
-    const std::shared_ptr<FileSystem>& fs, const std::string& file_path,
-    FileChecksumGenFactory* checksum_factory,
-    const std::string& requested_checksum_func_name, std::string* file_checksum,
-    std::string* file_checksum_func_name,
-    size_t verify_checksums_readahead_size, bool allow_mmap_reads,
-    std::shared_ptr<IOTracer>& io_tracer) {
-  return GenerateOneFileChecksum(
-      fs.get(), file_path, checksum_factory, requested_checksum_func_name,
-      file_checksum, file_checksum_func_name, verify_checksums_readahead_size,
-      allow_mmap_reads, io_tracer);
-}
+    std::shared_ptr<IOTracer>& io_tracer, RateLimiter* rate_limiter,
+    Env::IOPriority rate_limiter_priority);

 inline IOStatus PrepareIOFromReadOptions(const ReadOptions& ro,
                                          SystemClock* clock, IOOptions& opts) {

@@ -116,10 +116,10 @@ IOStatus RandomAccessFileReader::Create(
   return io_s;
 }

-IOStatus RandomAccessFileReader::Read(const IOOptions& opts, uint64_t offset,
-                                      size_t n, Slice* result, char* scratch,
-                                      AlignedBuf* aligned_buf,
-                                      bool for_compaction) const {
+IOStatus RandomAccessFileReader::Read(
+    const IOOptions& opts, uint64_t offset, size_t n, Slice* result,
+    char* scratch, AlignedBuf* aligned_buf,
+    Env::IOPriority rate_limiter_priority) const {
   (void)aligned_buf;

   TEST_SYNC_POINT_CALLBACK("RandomAccessFileReader::Read", nullptr);
@@ -153,10 +153,11 @@ IOStatus RandomAccessFileReader::Read(const IOOptions& opts, uint64_t offset,
       buf.AllocateNewBuffer(read_size);
       while (buf.CurrentSize() < read_size) {
         size_t allowed;
-        if (for_compaction && rate_limiter_ != nullptr) {
+        if (rate_limiter_priority != Env::IO_TOTAL &&
+            rate_limiter_ != nullptr) {
           allowed = rate_limiter_->RequestToken(
               buf.Capacity() - buf.CurrentSize(), buf.Alignment(),
-              Env::IOPriority::IO_LOW, stats_, RateLimiter::OpType::kRead);
+              rate_limiter_priority, stats_, RateLimiter::OpType::kRead);
         } else {
           assert(buf.CurrentSize() == 0);
           allowed = read_size;
@@ -212,12 +213,13 @@ IOStatus RandomAccessFileReader::Read(const IOOptions& opts, uint64_t offset,
       const char* res_scratch = nullptr;
       while (pos < n) {
         size_t allowed;
-        if (for_compaction && rate_limiter_ != nullptr) {
+        if (rate_limiter_priority != Env::IO_TOTAL &&
+            rate_limiter_ != nullptr) {
           if (rate_limiter_->IsRateLimited(RateLimiter::OpType::kRead)) {
             sw.DelayStart();
           }
           allowed = rate_limiter_->RequestToken(n - pos, 0 /* alignment */,
-                                                Env::IOPriority::IO_LOW, stats_,
+                                                rate_limiter_priority, stats_,
                                                 RateLimiter::OpType::kRead);
           if (rate_limiter_->IsRateLimited(RateLimiter::OpType::kRead)) {
             sw.DelayStop();
@@ -311,10 +313,12 @@ bool TryMerge(FSReadRequest* dest, const FSReadRequest& src) {
   return true;
 }

-IOStatus RandomAccessFileReader::MultiRead(const IOOptions& opts,
-                                           FSReadRequest* read_reqs,
-                                           size_t num_reqs,
-                                           AlignedBuf* aligned_buf) const {
+IOStatus RandomAccessFileReader::MultiRead(
+    const IOOptions& opts, FSReadRequest* read_reqs, size_t num_reqs,
+    AlignedBuf* aligned_buf, Env::IOPriority rate_limiter_priority) const {
+  if (rate_limiter_priority != Env::IO_TOTAL) {
+    return IOStatus::NotSupported("Unable to rate limit MultiRead()");
+  }
   (void)aligned_buf;  // suppress warning of unused variable in LITE mode
   assert(num_reqs > 0);

@@ -139,17 +139,26 @@ class RandomAccessFileReader {
   // 2. Otherwise, scratch is not used and can be null, the aligned_buf owns
   //    the internally allocated buffer on return, and the result refers to a
   //    region in aligned_buf.
+  //
+  // `rate_limiter_priority` is used to charge the internal rate limiter when
+  // enabled. The special value `Env::IO_TOTAL` makes this operation bypass the
+  // rate limiter.
   IOStatus Read(const IOOptions& opts, uint64_t offset, size_t n, Slice* result,
                 char* scratch, AlignedBuf* aligned_buf,
-                bool for_compaction = false) const;
+                Env::IOPriority rate_limiter_priority) const;

   // REQUIRES:
   // num_reqs > 0, reqs do not overlap, and offsets in reqs are increasing.
   // In non-direct IO mode, aligned_buf should be null;
   // In direct IO mode, aligned_buf stores the aligned buffer allocated inside
   // MultiRead, the result Slices in reqs refer to aligned_buf.
+  //
+  // `rate_limiter_priority` will be used to charge the internal rate limiter.
+  // It is not yet supported so the client must provide the special value
+  // `Env::IO_TOTAL` to bypass the rate limiter.
   IOStatus MultiRead(const IOOptions& opts, FSReadRequest* reqs,
-                     size_t num_reqs, AlignedBuf* aligned_buf) const;
+                     size_t num_reqs, AlignedBuf* aligned_buf,
+                     Env::IOPriority rate_limiter_priority) const;

   IOStatus Prefetch(uint64_t offset, size_t n) const {
     return file_->Prefetch(offset, n, IOOptions(), nullptr);

@@ -85,9 +85,9 @@ TEST_F(RandomAccessFileReaderTest, ReadDirectIO) {
   size_t len = page_size / 3;
   Slice result;
   AlignedBuf buf;
-  for (bool for_compaction : {true, false}) {
-    ASSERT_OK(r->Read(IOOptions(), offset, len, &result, nullptr, &buf,
-                      for_compaction));
+  for (Env::IOPriority rate_limiter_priority : {Env::IO_LOW, Env::IO_TOTAL}) {
+    ASSERT_OK(r->Read(IOOptions(), offset, len, &result, nullptr, &buf,
+                      rate_limiter_priority));
     ASSERT_EQ(result.ToString(), content.substr(offset, len));
   }
 }
@@ -138,8 +138,8 @@ TEST_F(RandomAccessFileReaderTest, MultiReadDirectIO) {
     reqs.push_back(std::move(r0));
     reqs.push_back(std::move(r1));
     AlignedBuf aligned_buf;
-    ASSERT_OK(
-        r->MultiRead(IOOptions(), reqs.data(), reqs.size(), &aligned_buf));
+    ASSERT_OK(r->MultiRead(IOOptions(), reqs.data(), reqs.size(), &aligned_buf,
+                           Env::IO_TOTAL /* rate_limiter_priority */));

     AssertResult(content, reqs);
@@ -183,8 +183,8 @@ TEST_F(RandomAccessFileReaderTest, MultiReadDirectIO) {
     reqs.push_back(std::move(r1));
    reqs.push_back(std::move(r2));
     AlignedBuf aligned_buf;
-    ASSERT_OK(
-        r->MultiRead(IOOptions(), reqs.data(), reqs.size(), &aligned_buf));
+    ASSERT_OK(r->MultiRead(IOOptions(), reqs.data(), reqs.size(), &aligned_buf,
+                           Env::IO_TOTAL /* rate_limiter_priority */));

     AssertResult(content, reqs);
@@ -228,8 +228,8 @@ TEST_F(RandomAccessFileReaderTest, MultiReadDirectIO) {
     reqs.push_back(std::move(r1));
     reqs.push_back(std::move(r2));
     AlignedBuf aligned_buf;
-    ASSERT_OK(
-        r->MultiRead(IOOptions(), reqs.data(), reqs.size(), &aligned_buf));
+    ASSERT_OK(r->MultiRead(IOOptions(), reqs.data(), reqs.size(), &aligned_buf,
+                           Env::IO_TOTAL /* rate_limiter_priority */));

     AssertResult(content, reqs);
@@ -265,8 +265,8 @@ TEST_F(RandomAccessFileReaderTest, MultiReadDirectIO) {
     reqs.push_back(std::move(r0));
     reqs.push_back(std::move(r1));
     AlignedBuf aligned_buf;
-    ASSERT_OK(
-        r->MultiRead(IOOptions(), reqs.data(), reqs.size(), &aligned_buf));
+    ASSERT_OK(r->MultiRead(IOOptions(), reqs.data(), reqs.size(), &aligned_buf,
+                           Env::IO_TOTAL /* rate_limiter_priority */));

     AssertResult(content, reqs);

@ -491,9 +491,18 @@ struct DBOptions {
// Default: Env::Default() // Default: Env::Default()
Env* env = Env::Default(); Env* env = Env::Default();
// Use to control write/read rate of flush and compaction. Flush has higher // Limits internal file read/write bandwidth:
// priority than compaction. Rate limiting is disabled if nullptr. //
// If rate limiter is enabled, bytes_per_sync is set to 1MB by default. // - Flush requests write bandwidth at `Env::IOPriority::IO_HIGH`
// - Compaction requests read and write bandwidth at
// `Env::IOPriority::IO_LOW`
// - Reads associated with a `ReadOptions` can be charged at
// `ReadOptions::rate_limiter_priority` (see that option's API doc for usage
// and limitations).
//
// Rate limiting is disabled if nullptr. If rate limiter is enabled,
// bytes_per_sync is set to 1MB by default.
//
// Default: nullptr // Default: nullptr
std::shared_ptr<RateLimiter> rate_limiter = nullptr; std::shared_ptr<RateLimiter> rate_limiter = nullptr;
@@ -1560,6 +1569,26 @@ struct ReadOptions {
  // Default: false
  bool adaptive_readahead;

+  // For file reads associated with this option, charge the internal rate
+  // limiter (see `DBOptions::rate_limiter`) at the specified priority. The
+  // special value `Env::IO_TOTAL` disables charging the rate limiter.
+  //
+  // Rate limiting is bypassed, regardless of this option's value, for file
+  // reads on plain tables (these can exist when
+  // `ColumnFamilyOptions::table_factory` is a `PlainTableFactory`) and cuckoo
+  // tables (these can exist when `ColumnFamilyOptions::table_factory` is a
+  // `CuckooTableFactory`).
+  //
+  // The new `DB::MultiGet()` APIs (i.e., the ones returning `void`) will
+  // return `Status::NotSupported` via their per-key `statuses` output when
+  // that operation requires file read(s) and
+  // `rate_limiter_priority != Env::IO_TOTAL`.
+  //
+  // The bytes charged to the rate limiter may not exactly match the file read
+  // bytes since there are some seemingly insignificant reads, like for file
+  // headers/footers, that we currently do not charge to the rate limiter.
+  //
+  // Default: `Env::IO_TOTAL`.
+  Env::IOPriority rate_limiter_priority = Env::IO_TOTAL;
+
  ReadOptions();
  ReadOptions(bool cksum, bool cache);
};
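
And a sketch of the user-facing flow the new option enables, assuming `db` was opened with a read-charging rate limiter as in the previous sketch:

  rocksdb::ReadOptions ro;
  ro.rate_limiter_priority = rocksdb::Env::IO_USER;
  // Checksum-verification reads are now paced by DBOptions::rate_limiter.
  rocksdb::Status s = db->VerifyChecksum(ro);
  assert(s.ok());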

@@ -445,6 +445,7 @@ TEST_MAIN_SOURCES = \
  db/db_options_test.cc \
  db/db_properties_test.cc \
  db/db_range_del_test.cc \
+  db/db_rate_limiter_test.cc \
  db/db_secondary_test.cc \
  db/db_sst_test.cc \
  db/db_statistics_test.cc \

@@ -771,7 +771,9 @@ Status BlockBasedTable::PrefetchTail(
  IOOptions opts;
  Status s = file->PrepareIOOptions(ro, opts);
  if (s.ok()) {
-    s = (*prefetch_buffer)->Prefetch(opts, file, prefetch_off, prefetch_len);
+    s = (*prefetch_buffer)
+            ->Prefetch(opts, file, prefetch_off, prefetch_len,
+                       ro.rate_limiter_priority);
  }
  return s;
}
@@ -1730,8 +1732,8 @@ void BlockBasedTable::RetrieveMultipleBlocks(
    IOOptions opts;
    IOStatus s = file->PrepareIOOptions(options, opts);
    if (s.ok()) {
-      s = file->MultiRead(opts, &read_reqs[0], read_reqs.size(),
-                          &direct_io_buf);
+      s = file->MultiRead(opts, &read_reqs[0], read_reqs.size(), &direct_io_buf,
+                          options.rate_limiter_priority);
    }
    if (!s.ok()) {
      // Discard all the results in this batch if there is any time out
@@ -2981,7 +2983,7 @@ Status BlockBasedTable::VerifyChecksumInBlocks(
    BlockHandle handle = index_iter->value().handle;
    BlockContents contents;
    BlockFetcher block_fetcher(
-        rep_->file.get(), &prefetch_buffer, rep_->footer, ReadOptions(), handle,
+        rep_->file.get(), &prefetch_buffer, rep_->footer, read_options, handle,
        &contents, rep_->ioptions, false /* decompress */,
        false /*maybe_compressed*/, BlockType::kData,
        UncompressionDict::GetEmptyDict(), rep_->persistent_cache_options);

@@ -503,7 +503,8 @@ Status PartitionedFilterBlockReader::CacheDependencies(const ReadOptions& ro,
  s = rep->file->PrepareIOOptions(ro, opts);
  if (s.ok()) {
    s = prefetch_buffer->Prefetch(opts, rep->file.get(), prefetch_off,
-                                  static_cast<size_t>(prefetch_len));
+                                  static_cast<size_t>(prefetch_len),
+                                  ro.rate_limiter_priority);
  }
  if (!s.ok()) {
    return s;

@@ -158,7 +158,8 @@ Status PartitionIndexReader::CacheDependencies(const ReadOptions& ro,
  Status s = rep->file->PrepareIOOptions(ro, opts);
  if (s.ok()) {
    s = prefetch_buffer->Prefetch(opts, rep->file.get(), prefetch_off,
-                                  static_cast<size_t>(prefetch_len));
+                                  static_cast<size_t>(prefetch_len),
+                                  ro.rate_limiter_priority);
  }
  if (!s.ok()) {
    return s;

@@ -70,9 +70,9 @@ inline bool BlockFetcher::TryGetFromPrefetchBuffer() {
    IOOptions opts;
    IOStatus io_s = file_->PrepareIOOptions(read_options_, opts);
    if (io_s.ok() &&
-        prefetch_buffer_->TryReadFromCache(opts, file_, handle_.offset(),
-                                           block_size_with_trailer_, &slice_,
-                                           &io_s, for_compaction_)) {
+        prefetch_buffer_->TryReadFromCache(
+            opts, file_, handle_.offset(), block_size_with_trailer_, &slice_,
+            &io_s, read_options_.rate_limiter_priority, for_compaction_)) {
      ProcessTrailerIfPresent();
      if (!io_status_.ok()) {
        return true;
@@ -245,17 +245,17 @@ IOStatus BlockFetcher::ReadBlockContents() {
  if (io_status_.ok()) {
    if (file_->use_direct_io()) {
      PERF_TIMER_GUARD(block_read_time);
-      io_status_ =
-          file_->Read(opts, handle_.offset(), block_size_with_trailer_,
-                      &slice_, nullptr, &direct_io_buf_, for_compaction_);
+      io_status_ = file_->Read(
+          opts, handle_.offset(), block_size_with_trailer_, &slice_, nullptr,
+          &direct_io_buf_, read_options_.rate_limiter_priority);
      PERF_COUNTER_ADD(block_read_count, 1);
      used_buf_ = const_cast<char*>(slice_.data());
    } else {
      PrepareBufferForBlockFromFile();
      PERF_TIMER_GUARD(block_read_time);
-      io_status_ =
-          file_->Read(opts, handle_.offset(), block_size_with_trailer_,
-                      &slice_, used_buf_, nullptr, for_compaction_);
+      io_status_ = file_->Read(opts, handle_.offset(),
+                               block_size_with_trailer_, &slice_, used_buf_,
+                               nullptr, read_options_.rate_limiter_priority);
      PERF_COUNTER_ADD(block_read_count, 1);
#ifndef NDEBUG
      if (slice_.data() == &stack_buf_[0]) {

@@ -114,7 +114,8 @@ class CuckooBuilderTest : public testing::Test {
    for (uint32_t i = 0; i + 1 < table_size + cuckoo_block_size; ++i) {
      Slice read_slice;
      ASSERT_OK(file_reader->Read(IOOptions(), i * bucket_size, bucket_size,
-                                  &read_slice, nullptr, nullptr));
+                                  &read_slice, nullptr, nullptr,
+                                  Env::IO_TOTAL /* rate_limiter_priority */));
      size_t key_idx =
          std::find(expected_locations.begin(), expected_locations.end(), i) -
          expected_locations.begin();

@@ -141,8 +141,10 @@ CuckooTableReader::CuckooTableReader(
  cuckoo_block_size_ = *reinterpret_cast<const uint32_t*>(
      cuckoo_block_size->second.data());
  cuckoo_block_bytes_minus_one_ = cuckoo_block_size_ * bucket_length_ - 1;
-  status_ = file_->Read(IOOptions(), 0, static_cast<size_t>(file_size),
-                        &file_data_, nullptr, nullptr);
+  // TODO: rate limit reads of whole cuckoo tables.
+  status_ =
+      file_->Read(IOOptions(), 0, static_cast<size_t>(file_size), &file_data_,
+                  nullptr, nullptr, Env::IO_TOTAL /* rate_limiter_priority */);
}

Status CuckooTableReader::Get(const ReadOptions& /*readOptions*/,

@@ -369,17 +369,20 @@ Status ReadFooterFromFile(const IOOptions& opts, RandomAccessFileReader* file,
  // the required data is not in the prefetch buffer. Once deadline is enabled
  // for iterator, TryReadFromCache might do a readahead. Revisit to see if we
  // need to pass a timeout at that point
+  // TODO: rate limit footer reads.
  if (prefetch_buffer == nullptr ||
-      !prefetch_buffer->TryReadFromCache(IOOptions(), file, read_offset,
-                                         Footer::kMaxEncodedLength,
-                                         &footer_input, nullptr)) {
+      !prefetch_buffer->TryReadFromCache(
+          IOOptions(), file, read_offset, Footer::kMaxEncodedLength,
+          &footer_input, nullptr, Env::IO_TOTAL /* rate_limiter_priority */)) {
    if (file->use_direct_io()) {
      s = file->Read(opts, read_offset, Footer::kMaxEncodedLength,
-                     &footer_input, nullptr, &internal_buf);
+                     &footer_input, nullptr, &internal_buf,
+                     Env::IO_TOTAL /* rate_limiter_priority */);
    } else {
      footer_buf.reserve(Footer::kMaxEncodedLength);
      s = file->Read(opts, read_offset, Footer::kMaxEncodedLength,
-                     &footer_input, &footer_buf[0], nullptr);
+                     &footer_input, &footer_buf[0], nullptr,
+                     Env::IO_TOTAL /* rate_limiter_priority */);
    }
    if (!s.ok()) return s;
  }

@@ -291,7 +291,8 @@ Status MockTableFactory::GetIDFromFile(RandomAccessFileReader* file,
                                       uint32_t* id) const {
  char buf[4];
  Slice result;
-  Status s = file->Read(IOOptions(), 0, 4, &result, buf, nullptr);
+  Status s = file->Read(IOOptions(), 0, 4, &result, buf, nullptr,
+                        Env::IO_TOTAL /* rate_limiter_priority */);
  assert(result.size() == 4);
  *id = DecodeFixed32(buf);
  return s;

@@ -212,9 +212,11 @@ bool PlainTableFileReader::ReadNonMmap(uint32_t file_offset, uint32_t len,
    new_buffer->buf_len = 0;
  }
  Slice read_result;
+  // TODO: rate limit plain table reads.
  Status s =
      file_info_->file->Read(IOOptions(), file_offset, size_to_read,
-                             &read_result, new_buffer->buf.get(), nullptr);
+                             &read_result, new_buffer->buf.get(), nullptr,
+                             Env::IO_TOTAL /* rate_limiter_priority */);
  if (!s.ok()) {
    status_ = s;
    return false;

@@ -288,9 +288,9 @@ void PlainTableReader::FillBloom(const std::vector<uint32_t>& prefix_hashes) {
Status PlainTableReader::MmapDataIfNeeded() {
  if (file_info_.is_mmap_mode) {
    // Get mmapped memory.
-    return file_info_.file->Read(IOOptions(), 0,
-                                 static_cast<size_t>(file_size_),
-                                 &file_info_.file_data, nullptr, nullptr);
+    return file_info_.file->Read(
+        IOOptions(), 0, static_cast<size_t>(file_size_), &file_info_.file_data,
+        nullptr, nullptr, Env::IO_TOTAL /* rate_limiter_priority */);
  }
  return Status::OK();
}

@@ -107,7 +107,8 @@ Status SstFileDumper::GetTableReader(const std::string& file_path) {
      uint64_t prefetch_off = file_size - prefetch_size;
      IOOptions opts;
      s = prefetch_buffer.Prefetch(opts, file_.get(), prefetch_off,
-                                   static_cast<size_t>(prefetch_size));
+                                   static_cast<size_t>(prefetch_size),
+                                   Env::IO_TOTAL /* rate_limiter_priority */);

      s = ReadFooterFromFile(opts, file_.get(), &prefetch_buffer, file_size,
                             &footer);

@@ -1319,7 +1319,7 @@ class FileChecksumTestHelper {
    uint64_t offset = 0;
    Status s;
    s = file_reader_->Read(IOOptions(), offset, 2048, &result, scratch.get(),
-                           nullptr, false);
+                           nullptr, Env::IO_TOTAL /* rate_limiter_priority */);
    if (!s.ok()) {
      return s;
    }
@@ -1327,7 +1327,8 @@ class FileChecksumTestHelper {
      file_checksum_generator->Update(scratch.get(), result.size());
      offset += static_cast<uint64_t>(result.size());
      s = file_reader_->Read(IOOptions(), offset, 2048, &result, scratch.get(),
-                             nullptr, false);
+                             nullptr,
+                             Env::IO_TOTAL /* rate_limiter_priority */);
      if (!s.ok()) {
        return s;
      }
@@ -5001,13 +5002,16 @@ TEST_F(BBTTailPrefetchTest, FilePrefetchBufferMinOffset) {
  IOOptions opts;
  buffer.TryReadFromCache(opts, nullptr /* reader */, 500 /* offset */,
                          10 /* n */, nullptr /* result */,
-                          nullptr /* status */);
+                          nullptr /* status */,
+                          Env::IO_TOTAL /* rate_limiter_priority */);
  buffer.TryReadFromCache(opts, nullptr /* reader */, 480 /* offset */,
                          10 /* n */, nullptr /* result */,
-                          nullptr /* status */);
+                          nullptr /* status */,
+                          Env::IO_TOTAL /* rate_limiter_priority */);
  buffer.TryReadFromCache(opts, nullptr /* reader */, 490 /* offset */,
                          10 /* n */, nullptr /* result */,
-                          nullptr /* status */);
+                          nullptr /* status */,
+                          Env::IO_TOTAL /* rate_limiter_priority */);
  ASSERT_EQ(480, buffer.min_offset_read());
}

@@ -77,6 +77,7 @@
#include "util/cast_util.h"
#include "util/compression.h"
#include "util/crc32c.h"
+#include "util/file_checksum_helper.h"
#include "util/gflags_compat.h"
#include "util/mutexlock.h"
#include "util/random.h"
@@ -1077,6 +1078,14 @@ DEFINE_bool(adaptive_readahead, false,
            "carry forward internal auto readahead size from one file to next "
            "file at each level during iteration");

+DEFINE_bool(rate_limit_user_ops, false,
+            "When true use Env::IO_USER priority level to charge internal rate "
+            "limiter for reads associated with user operations.");
+
+DEFINE_bool(file_checksum, false,
+            "When true use FileChecksumGenCrc32cFactory for "
+            "file_checksum_gen_factory.");
+
static enum ROCKSDB_NAMESPACE::CompressionType StringToCompressionType(
    const char* ctype) {
  assert(ctype);
@@ -3047,6 +3056,8 @@ class Benchmark {
    read_options_ = ReadOptions(FLAGS_verify_checksum, true);
    read_options_.total_order_seek = FLAGS_total_order_seek;
    read_options_.prefix_same_as_start = FLAGS_prefix_same_as_start;
+    read_options_.rate_limiter_priority =
+        FLAGS_rate_limit_user_ops ? Env::IO_USER : Env::IO_TOTAL;
    read_options_.tailing = FLAGS_use_tailing_iterator;
    read_options_.readahead_size = FLAGS_readahead_size;
    read_options_.adaptive_readahead = FLAGS_adaptive_readahead;
@@ -3313,6 +3324,12 @@ class Benchmark {
#endif  // ROCKSDB_LITE
    } else if (name == "getmergeoperands") {
      method = &Benchmark::GetMergeOperands;
+#ifndef ROCKSDB_LITE
+    } else if (name == "verifychecksum") {
+      method = &Benchmark::VerifyChecksum;
+    } else if (name == "verifyfilechecksums") {
+      method = &Benchmark::VerifyFileChecksums;
+#endif  // ROCKSDB_LITE
    } else if (!name.empty()) {  // No error message for empty name
      fprintf(stderr, "unknown benchmark '%s'\n", name.c_str());
      ErrorExit();
@@ -4267,6 +4284,11 @@ class Benchmark {
    options.listeners.emplace_back(listener_);

+    if (FLAGS_file_checksum) {
+      options.file_checksum_gen_factory.reset(
+          new FileChecksumGenCrc32cFactory());
+    }
+
    if (FLAGS_num_multi_db <= 1) {
      OpenDb(options, FLAGS_db, &db_);
    } else {
@@ -7207,6 +7229,35 @@ class Benchmark {
  }

#ifndef ROCKSDB_LITE
+  void VerifyChecksum(ThreadState* thread) {
+    DB* db = SelectDB(thread);
+    ReadOptions ro;
+    ro.adaptive_readahead = FLAGS_adaptive_readahead;
+    ro.rate_limiter_priority =
+        FLAGS_rate_limit_user_ops ? Env::IO_USER : Env::IO_TOTAL;
+    ro.readahead_size = FLAGS_readahead_size;
+    Status s = db->VerifyChecksum(ro);
+    if (!s.ok()) {
+      fprintf(stderr, "VerifyChecksum() failed: %s\n", s.ToString().c_str());
+      exit(1);
+    }
+  }
+
+  void VerifyFileChecksums(ThreadState* thread) {
+    DB* db = SelectDB(thread);
+    ReadOptions ro;
+    ro.adaptive_readahead = FLAGS_adaptive_readahead;
+    ro.rate_limiter_priority =
+        FLAGS_rate_limit_user_ops ? Env::IO_USER : Env::IO_TOTAL;
+    ro.readahead_size = FLAGS_readahead_size;
+    Status s = db->VerifyFileChecksums(ro);
+    if (!s.ok()) {
+      fprintf(stderr, "VerifyFileChecksums() failed: %s\n",
+              s.ToString().c_str());
+      exit(1);
+    }
+  }
+
  // This benchmark stress tests Transactions. For a given --duration (or
  // total number of --writes, a Transaction will perform a read-modify-write
  // to increment the value of a key in each of N(--transaction-sets) sets of

@@ -1541,15 +1541,16 @@ Status BlobDBImpl::GetRawBlobFromFile(const Slice& key, uint64_t file_number,
  {
    StopWatch read_sw(clock_, statistics_, BLOB_DB_BLOB_FILE_READ_MICROS);
+    // TODO: rate limit old blob DB file reads.
    if (reader->use_direct_io()) {
      s = reader->Read(IOOptions(), record_offset,
                       static_cast<size_t>(record_size), &blob_record, nullptr,
-                       &aligned_buf);
+                       &aligned_buf, Env::IO_TOTAL /* rate_limiter_priority */);
    } else {
      buf.reserve(static_cast<size_t>(record_size));
      s = reader->Read(IOOptions(), record_offset,
                       static_cast<size_t>(record_size), &blob_record, &buf[0],
-                       nullptr);
+                       nullptr, Env::IO_TOTAL /* rate_limiter_priority */);
    }
    RecordTick(statistics_, BLOB_DB_BLOB_FILE_BYTES_READ, blob_record.size());
  }

@@ -103,8 +103,8 @@ Status BlobDumpTool::Read(uint64_t offset, size_t size, Slice* result) {
    }
    buffer_.reset(new char[buffer_size_]);
  }
-  Status s =
-      reader_->Read(IOOptions(), offset, size, result, buffer_.get(), nullptr);
+  Status s = reader_->Read(IOOptions(), offset, size, result, buffer_.get(),
+                           nullptr, Env::IO_TOTAL /* rate_limiter_priority */);
  if (!s.ok()) {
    return s;
  }

@@ -112,13 +112,16 @@ Status BlobFile::ReadFooter(BlobLogFooter* bf) {
  std::string buf;
  AlignedBuf aligned_buf;
  Status s;
+  // TODO: rate limit reading footers from blob files.
  if (ra_file_reader_->use_direct_io()) {
    s = ra_file_reader_->Read(IOOptions(), footer_offset, BlobLogFooter::kSize,
-                              &result, nullptr, &aligned_buf);
+                              &result, nullptr, &aligned_buf,
+                              Env::IO_TOTAL /* rate_limiter_priority */);
  } else {
    buf.reserve(BlobLogFooter::kSize + 10);
    s = ra_file_reader_->Read(IOOptions(), footer_offset, BlobLogFooter::kSize,
-                              &result, &buf[0], nullptr);
+                              &result, &buf[0], nullptr,
+                              Env::IO_TOTAL /* rate_limiter_priority */);
  }
  if (!s.ok()) return s;
  if (result.size() != BlobLogFooter::kSize) {
@@ -235,13 +238,16 @@ Status BlobFile::ReadMetadata(const std::shared_ptr<FileSystem>& fs,
  std::string header_buf;
  AlignedBuf aligned_buf;
  Slice header_slice;
+  // TODO: rate limit reading headers from blob files.
  if (file_reader->use_direct_io()) {
    s = file_reader->Read(IOOptions(), 0, BlobLogHeader::kSize, &header_slice,
-                          nullptr, &aligned_buf);
+                          nullptr, &aligned_buf,
+                          Env::IO_TOTAL /* rate_limiter_priority */);
  } else {
    header_buf.reserve(BlobLogHeader::kSize);
    s = file_reader->Read(IOOptions(), 0, BlobLogHeader::kSize, &header_slice,
-                          &header_buf[0], nullptr);
+                          &header_buf[0], nullptr,
+                          Env::IO_TOTAL /* rate_limiter_priority */);
  }
  if (!s.ok()) {
    ROCKS_LOG_ERROR(info_log_,
@@ -275,15 +281,17 @@ Status BlobFile::ReadMetadata(const std::shared_ptr<FileSystem>& fs,
  }
  std::string footer_buf;
  Slice footer_slice;
+  // TODO: rate limit reading footers from blob files.
  if (file_reader->use_direct_io()) {
    s = file_reader->Read(IOOptions(), file_size - BlobLogFooter::kSize,
                          BlobLogFooter::kSize, &footer_slice, nullptr,
-                          &aligned_buf);
+                          &aligned_buf,
+                          Env::IO_TOTAL /* rate_limiter_priority */);
  } else {
    footer_buf.reserve(BlobLogFooter::kSize);
    s = file_reader->Read(IOOptions(), file_size - BlobLogFooter::kSize,
                          BlobLogFooter::kSize, &footer_slice, &footer_buf[0],
-                          nullptr);
+                          nullptr, Env::IO_TOTAL /* rate_limiter_priority */);
  }
  if (!s.ok()) {
    ROCKS_LOG_ERROR(info_log_,

@@ -258,7 +258,8 @@ class FromFileCacheDumpReader : public CacheDumpReader {
    while (to_read > 0) {
      io_s = file_reader_->Read(IOOptions(), offset_, to_read, &result_,
-                                buffer_, nullptr);
+                                buffer_, nullptr,
+                                Env::IO_TOTAL /* rate_limiter_priority */);
      if (!io_s.ok()) {
        return io_s;
      }

@@ -238,7 +238,7 @@ bool RandomAccessCacheFile::Read(const LBA& lba, Slice* key, Slice* val,
  Slice result;
  Status s = freader_->Read(IOOptions(), lba.off_, lba.size_, &result, scratch,
-                            nullptr);
+                            nullptr, Env::IO_TOTAL /* rate_limiter_priority */);
  if (!s.ok()) {
    Error(log_, "Error reading from file %s. %s", Path().c_str(),
          s.ToString().c_str());

@@ -42,7 +42,8 @@ Status FileTraceReader::Reset() {
Status FileTraceReader::Read(std::string* data) {
  assert(file_reader_ != nullptr);
  Status s = file_reader_->Read(IOOptions(), offset_, kTraceMetadataSize,
-                                &result_, buffer_, nullptr);
+                                &result_, buffer_, nullptr,
+                                Env::IO_TOTAL /* rate_limiter_priority */);
  if (!s.ok()) {
    return s;
  }
@@ -67,7 +68,7 @@ Status FileTraceReader::Read(std::string* data) {
        bytes_to_read > kBufferSize ? kBufferSize : bytes_to_read;
    while (to_read > 0) {
      s = file_reader_->Read(IOOptions(), offset_, to_read, &result_, buffer_,
-                             nullptr);
+                             nullptr, Env::IO_TOTAL /* rate_limiter_priority */);
      if (!s.ok()) {
        return s;
      }
