diff --git a/db/log_reader.cc b/db/log_reader.cc index fbf50d7c1..0c1852e82 100644 --- a/db/log_reader.cc +++ b/db/log_reader.cc @@ -303,8 +303,14 @@ void Reader::UnmarkEOFInternal() { } Slice read_buffer; - Status status = file_->Read(remaining, &read_buffer, - backing_store_ + eof_offset_); + // TODO: rate limit log reader with approriate priority. + // TODO: avoid overcharging rate limiter: + // Note that the Read here might overcharge SequentialFileReader's internal + // rate limiter if priority is not IO_TOTAL, e.g., when there is not enough + // content left until EOF to read. + Status status = + file_->Read(remaining, &read_buffer, backing_store_ + eof_offset_, + Env::IO_TOTAL /* rate_limiter_priority */); size_t added = read_buffer.size(); end_of_buffer_offset_ += added; @@ -349,7 +355,13 @@ bool Reader::ReadMore(size_t* drop_size, int *error) { if (!eof_ && !read_error_) { // Last read was a full read, so this is a trailer to skip buffer_.clear(); - Status status = file_->Read(kBlockSize, &buffer_, backing_store_); + // TODO: rate limit log reader with approriate priority. + // TODO: avoid overcharging rate limiter: + // Note that the Read here might overcharge SequentialFileReader's internal + // rate limiter if priority is not IO_TOTAL, e.g., when there is not enough + // content left until EOF to read. + Status status = file_->Read(kBlockSize, &buffer_, backing_store_, + Env::IO_TOTAL /* rate_limiter_priority */); TEST_SYNC_POINT_CALLBACK("LogReader::ReadMore:AfterReadFile", &status); end_of_buffer_offset_ += buffer_.size(); if (!status.ok()) { @@ -639,7 +651,13 @@ bool FragmentBufferedReader::TryReadMore(size_t* drop_size, int* error) { if (!eof_ && !read_error_) { // Last read was a full read, so this is a trailer to skip buffer_.clear(); - Status status = file_->Read(kBlockSize, &buffer_, backing_store_); + // TODO: rate limit log reader with approriate priority. + // TODO: avoid overcharging rate limiter: + // Note that the Read here might overcharge SequentialFileReader's internal + // rate limiter if priority is not IO_TOTAL, e.g., when there is not enough + // content left until EOF to read. + Status status = file_->Read(kBlockSize, &buffer_, backing_store_, + Env::IO_TOTAL /* rate_limiter_priority */); end_of_buffer_offset_ += buffer_.size(); if (!status.ok()) { buffer_.clear(); diff --git a/db/repair.cc b/db/repair.cc index 4515be84b..f0beb3ff4 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -358,7 +358,7 @@ class Repairer { std::unique_ptr lfile_reader; Status status = SequentialFileReader::Create( fs, logname, fs->OptimizeForLogRead(file_options_), &lfile_reader, - nullptr); + nullptr /* dbg */, nullptr /* rate limiter */); if (!status.ok()) { return status; } diff --git a/db/transaction_log_impl.cc b/db/transaction_log_impl.cc index 15cd8a28d..044adc2c5 100644 --- a/db/transaction_log_impl.cc +++ b/db/transaction_log_impl.cc @@ -67,8 +67,9 @@ Status TransactionLogIteratorImpl::OpenLogFile( } } if (s.ok()) { - file_reader->reset(new SequentialFileReader( - std::move(file), fname, io_tracer_, options_->listeners)); + file_reader->reset(new SequentialFileReader(std::move(file), fname, + io_tracer_, options_->listeners, + options_->rate_limiter.get())); } return s; } diff --git a/env/io_posix.h b/env/io_posix.h index 0ff787a70..1aacd75a3 100644 --- a/env/io_posix.h +++ b/env/io_posix.h @@ -187,8 +187,7 @@ class PosixSequentialFile : public FSSequentialFile { public: PosixSequentialFile(const std::string& fname, FILE* file, int fd, - size_t logical_block_size, - const EnvOptions& options); + size_t logical_block_size, const EnvOptions& options); virtual ~PosixSequentialFile(); virtual IOStatus Read(size_t n, const IOOptions& opts, Slice* result, diff --git a/file/file_util.cc b/file/file_util.cc index 011f12455..d7858f3c8 100644 --- a/file/file_util.cc +++ b/file/file_util.cc @@ -49,7 +49,10 @@ IOStatus CopyFile(FileSystem* fs, const std::string& source, Slice slice; while (size > 0) { size_t bytes_to_read = std::min(sizeof(buffer), static_cast(size)); - io_s = status_to_io_status(src_reader->Read(bytes_to_read, &slice, buffer)); + // TODO: rate limit copy file + io_s = status_to_io_status( + src_reader->Read(bytes_to_read, &slice, buffer, + Env::IO_TOTAL /* rate_limiter_priority */)); if (!io_s.ok()) { return io_s; } diff --git a/file/line_file_reader.cc b/file/line_file_reader.cc index af00d7780..50c415dc6 100644 --- a/file/line_file_reader.cc +++ b/file/line_file_reader.cc @@ -15,16 +15,20 @@ IOStatus LineFileReader::Create(const std::shared_ptr& fs, const std::string& fname, const FileOptions& file_opts, std::unique_ptr* reader, - IODebugContext* dbg) { + IODebugContext* dbg, + RateLimiter* rate_limiter) { std::unique_ptr file; IOStatus io_s = fs->NewSequentialFile(fname, file_opts, &file, dbg); if (io_s.ok()) { - reader->reset(new LineFileReader(std::move(file), fname)); + reader->reset(new LineFileReader( + std::move(file), fname, nullptr, + std::vector>{}, rate_limiter)); } return io_s; } -bool LineFileReader::ReadLine(std::string* out) { +bool LineFileReader::ReadLine(std::string* out, + Env::IOPriority rate_limiter_priority) { assert(out); if (!io_status_.ok()) { // Status should be checked (or permit unchecked) any time we return false. @@ -50,7 +54,8 @@ bool LineFileReader::ReadLine(std::string* out) { // else flush and reload buffer out->append(buf_begin_, buf_end_ - buf_begin_); Slice result; - io_status_ = sfr_.Read(buf_.size(), &result, buf_.data()); + io_status_ = + sfr_.Read(buf_.size(), &result, buf_.data(), rate_limiter_priority); IOSTATS_ADD(bytes_read, result.size()); if (!io_status_.ok()) { io_status_.MustCheck(); diff --git a/file/line_file_reader.h b/file/line_file_reader.h index 4b4a9d564..cc302d311 100644 --- a/file/line_file_reader.h +++ b/file/line_file_reader.h @@ -32,7 +32,7 @@ class LineFileReader { static IOStatus Create(const std::shared_ptr& fs, const std::string& fname, const FileOptions& file_opts, std::unique_ptr* reader, - IODebugContext* dbg); + IODebugContext* dbg, RateLimiter* rate_limiter); LineFileReader(const LineFileReader&) = delete; LineFileReader& operator=(const LineFileReader&) = delete; @@ -41,7 +41,8 @@ class LineFileReader { // the line to `out`, without delimiter, or returning false on failure. You // must check GetStatus() to determine whether the failure was just // end-of-file (OK status) or an I/O error (another status). - bool ReadLine(std::string* out); + // The internal rate limiter will be charged at the specified priority. + bool ReadLine(std::string* out, Env::IOPriority rate_limiter_priority); // Returns the number of the line most recently returned from ReadLine. // Return value is unspecified if ReadLine has returned false due to diff --git a/file/random_access_file_reader.cc b/file/random_access_file_reader.cc index a919b6298..44580e4aa 100644 --- a/file/random_access_file_reader.cc +++ b/file/random_access_file_reader.cc @@ -283,7 +283,7 @@ IOStatus RandomAccessFileReader::MultiRead( #endif // !NDEBUG // To be paranoid modify scratch a little bit, so in case underlying - // FileSystem doesn't fill the buffer but return succee and `scratch` returns + // FileSystem doesn't fill the buffer but return success and `scratch` returns // contains a previous block, returned value will not pass checksum. // This byte might not change anything for direct I/O case, but it's OK. for (size_t i = 0; i < num_reqs; i++) { diff --git a/file/sequence_file_reader.cc b/file/sequence_file_reader.cc index e8591589c..d51d5be46 100644 --- a/file/sequence_file_reader.cc +++ b/file/sequence_file_reader.cc @@ -25,16 +25,18 @@ namespace ROCKSDB_NAMESPACE { IOStatus SequentialFileReader::Create( const std::shared_ptr& fs, const std::string& fname, const FileOptions& file_opts, std::unique_ptr* reader, - IODebugContext* dbg) { + IODebugContext* dbg, RateLimiter* rate_limiter) { std::unique_ptr file; IOStatus io_s = fs->NewSequentialFile(fname, file_opts, &file, dbg); if (io_s.ok()) { - reader->reset(new SequentialFileReader(std::move(file), fname)); + reader->reset(new SequentialFileReader(std::move(file), fname, nullptr, {}, + rate_limiter)); } return io_s; } -IOStatus SequentialFileReader::Read(size_t n, Slice* result, char* scratch) { +IOStatus SequentialFileReader::Read(size_t n, Slice* result, char* scratch, + Env::IOPriority rate_limiter_priority) { IOStatus io_s; if (use_direct_io()) { #ifndef ROCKSDB_LITE @@ -55,30 +57,48 @@ IOStatus SequentialFileReader::Read(size_t n, Slice* result, char* scratch) { buf.Alignment(alignment); buf.AllocateNewBuffer(size); - Slice tmp; - uint64_t orig_offset = 0; - FileOperationInfo::StartTimePoint start_ts; - if (ShouldNotifyListeners()) { - orig_offset = aligned_offset + buf.CurrentSize(); - start_ts = FileOperationInfo::StartNow(); + while (buf.CurrentSize() < size) { + size_t allowed; + if (rate_limiter_priority != Env::IO_TOTAL && rate_limiter_ != nullptr) { + allowed = rate_limiter_->RequestToken( + buf.Capacity() - buf.CurrentSize(), buf.Alignment(), + rate_limiter_priority, nullptr /* stats */, + RateLimiter::OpType::kRead); + } else { + assert(buf.CurrentSize() == 0); + allowed = size; + } + + Slice tmp; + uint64_t orig_offset = 0; + FileOperationInfo::StartTimePoint start_ts; + if (ShouldNotifyListeners()) { + orig_offset = aligned_offset + buf.CurrentSize(); + start_ts = FileOperationInfo::StartNow(); + } + io_s = file_->PositionedRead(aligned_offset + buf.CurrentSize(), allowed, + IOOptions(), &tmp, buf.Destination(), + nullptr /* dbg */); + if (ShouldNotifyListeners()) { + auto finish_ts = FileOperationInfo::FinishNow(); + NotifyOnFileReadFinish(orig_offset, tmp.size(), start_ts, finish_ts, + io_s); + } + buf.Size(buf.CurrentSize() + tmp.size()); + if (!io_s.ok() || tmp.size() < allowed) { + break; + } } - io_s = file_->PositionedRead(aligned_offset, size, IOOptions(), &tmp, - buf.BufferStart(), nullptr); - if (io_s.ok() && offset_advance < tmp.size()) { - buf.Size(tmp.size()); + + if (io_s.ok() && offset_advance < buf.CurrentSize()) { r = buf.Read(scratch, offset_advance, - std::min(tmp.size() - offset_advance, n)); + std::min(buf.CurrentSize() - offset_advance, n)); } *result = Slice(scratch, r); - if (ShouldNotifyListeners()) { - auto finish_ts = FileOperationInfo::FinishNow(); - NotifyOnFileReadFinish(orig_offset, tmp.size(), start_ts, finish_ts, - io_s); - } #endif // !ROCKSDB_LITE } else { // To be paranoid, modify scratch a little bit, so in case underlying - // FileSystem doesn't fill the buffer but return succee and `scratch` + // FileSystem doesn't fill the buffer but return success and `scratch` // returns contains a previous block, returned value will not pass // checksum. // It's hard to find useful byte for direct I/O case, so we skip it. @@ -86,22 +106,38 @@ IOStatus SequentialFileReader::Read(size_t n, Slice* result, char* scratch) { scratch[0]++; } + size_t read = 0; + while (read < n) { + size_t allowed; + if (rate_limiter_priority != Env::IO_TOTAL && rate_limiter_ != nullptr) { + allowed = rate_limiter_->RequestToken( + n - read, 0 /* alignment */, rate_limiter_priority, + nullptr /* stats */, RateLimiter::OpType::kRead); + } else { + allowed = n; + } #ifndef ROCKSDB_LITE - FileOperationInfo::StartTimePoint start_ts; - if (ShouldNotifyListeners()) { - start_ts = FileOperationInfo::StartNow(); - } + FileOperationInfo::StartTimePoint start_ts; + if (ShouldNotifyListeners()) { + start_ts = FileOperationInfo::StartNow(); + } #endif - - io_s = file_->Read(n, IOOptions(), result, scratch, nullptr); - + Slice tmp; + io_s = file_->Read(allowed, IOOptions(), &tmp, scratch + read, + nullptr /* dbg */); #ifndef ROCKSDB_LITE - if (ShouldNotifyListeners()) { - auto finish_ts = FileOperationInfo::FinishNow(); - size_t offset = offset_.fetch_add(result->size()); - NotifyOnFileReadFinish(offset, result->size(), start_ts, finish_ts, io_s); - } + if (ShouldNotifyListeners()) { + auto finish_ts = FileOperationInfo::FinishNow(); + size_t offset = offset_.fetch_add(tmp.size()); + NotifyOnFileReadFinish(offset, tmp.size(), start_ts, finish_ts, io_s); + } #endif + read += tmp.size(); + if (!io_s.ok() || tmp.size() < allowed) { + break; + } + } + *result = Slice(scratch, read); } IOSTATS_ADD(bytes_read, result->size()); return io_s; diff --git a/file/sequence_file_reader.h b/file/sequence_file_reader.h index 00051c080..baea10eb7 100644 --- a/file/sequence_file_reader.h +++ b/file/sequence_file_reader.h @@ -57,15 +57,19 @@ class SequentialFileReader { FSSequentialFilePtr file_; std::atomic offset_{0}; // read offset std::vector> listeners_{}; + RateLimiter* rate_limiter_; public: explicit SequentialFileReader( std::unique_ptr&& _file, const std::string& _file_name, const std::shared_ptr& io_tracer = nullptr, - const std::vector>& listeners = {}) + const std::vector>& listeners = {}, + RateLimiter* rate_limiter = + nullptr) // TODO: migrate call sites to provide rate limiter : file_name_(_file_name), file_(std::move(_file), io_tracer, _file_name), - listeners_() { + listeners_(), + rate_limiter_(rate_limiter) { #ifndef ROCKSDB_LITE AddFileIOListeners(listeners); #else @@ -77,11 +81,14 @@ class SequentialFileReader { std::unique_ptr&& _file, const std::string& _file_name, size_t _readahead_size, const std::shared_ptr& io_tracer = nullptr, - const std::vector>& listeners = {}) + const std::vector>& listeners = {}, + RateLimiter* rate_limiter = + nullptr) // TODO: migrate call sites to provide rate limiter : file_name_(_file_name), file_(NewReadaheadSequentialFile(std::move(_file), _readahead_size), io_tracer, _file_name), - listeners_() { + listeners_(), + rate_limiter_(rate_limiter) { #ifndef ROCKSDB_LITE AddFileIOListeners(listeners); #else @@ -91,12 +98,19 @@ class SequentialFileReader { static IOStatus Create(const std::shared_ptr& fs, const std::string& fname, const FileOptions& file_opts, std::unique_ptr* reader, - IODebugContext* dbg); + IODebugContext* dbg, RateLimiter* rate_limiter); SequentialFileReader(const SequentialFileReader&) = delete; SequentialFileReader& operator=(const SequentialFileReader&) = delete; - IOStatus Read(size_t n, Slice* result, char* scratch); + // `rate_limiter_priority` is used to charge the internal rate limiter when + // enabled. The special value `Env::IO_TOTAL` makes this operation bypass the + // rate limiter. The amount charged to the internal rate limiter is n, even + // when less than n bytes are actually read (e.g. at end of file). To avoid + // overcharging the rate limiter, the caller can use file size to cap n to + // read until end of file. + IOStatus Read(size_t n, Slice* result, char* scratch, + Env::IOPriority rate_limiter_priority); IOStatus Skip(uint64_t n); diff --git a/options/options_parser.cc b/options/options_parser.cc index 73ccbdc4e..40ae47487 100644 --- a/options/options_parser.cc +++ b/options/options_parser.cc @@ -271,7 +271,7 @@ Status RocksDBOptionsParser::Parse(const ConfigOptions& config_options_in, std::unordered_map opt_map; std::string line; // we only support single-lined statement. - while (lf_reader.ReadLine(&line)) { + while (lf_reader.ReadLine(&line, Env::IO_TOTAL /* rate_limiter_priority */)) { int line_num = static_cast(lf_reader.GetLineNumber()); line = TrimAndRemoveComment(line); if (line.empty()) { diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc index ef4203fbf..f59275998 100644 --- a/tools/db_bench_tool.cc +++ b/tools/db_bench_tool.cc @@ -60,6 +60,7 @@ #include "rocksdb/slice_transform.h" #include "rocksdb/stats_history.h" #include "rocksdb/table.h" +#include "rocksdb/utilities/backup_engine.h" #include "rocksdb/utilities/object_registry.h" #include "rocksdb/utilities/optimistic_transaction_db.h" #include "rocksdb/utilities/options_type.h" @@ -159,8 +160,10 @@ IF_ROCKSDB_LITE("", "randomtransaction," "randomreplacekeys," "timeseries," - "getmergeoperands", + "getmergeoperands,", "readrandomoperands," + "backup," + "restore" "Comma-separated list of operations to run in the specified" " order. Available benchmarks:\n" @@ -250,7 +253,10 @@ IF_ROCKSDB_LITE("", "\treadrandomoperands -- read random keys using `GetMergeOperands()`. An " "operation includes a rare but possible retry in case it got " "`Status::Incomplete()`. This happens upon encountering more keys than " - "have ever been seen by the thread (or eight initially)\n"); + "have ever been seen by the thread (or eight initially)\n" + "\tbackup -- Create a backup of the current DB and verify that a new backup is corrected. " + "Rate limit can be specified through --backup_rate_limit\n" + "\trestore -- Restore the DB from the latest backup available, rate limit can be specified through --restore_rate_limit\n"); DEFINE_int64(num, 1000000, "Number of key/values to place in database"); @@ -1146,6 +1152,22 @@ DEFINE_bool(charge_table_reader, false, "CacheEntryRoleOptions::charged of" "CacheEntryRole::kBlockBasedTableReader"); +DEFINE_uint64(backup_rate_limit, 0ull, + "If non-zero, db_bench will rate limit reads and writes for DB " + "backup. This " + "is the global rate in ops/second."); + +DEFINE_uint64(restore_rate_limit, 0ull, + "If non-zero, db_bench will rate limit reads and writes for DB " + "restore. This " + "is the global rate in ops/second."); + +DEFINE_string(backup_dir, "", + "If not empty string, use the given dir for backup."); + +DEFINE_string(restore_dir, "", + "If not empty string, use the given dir for restore."); + static enum ROCKSDB_NAMESPACE::CompressionType StringToCompressionType( const char* ctype) { assert(ctype); @@ -3512,6 +3534,12 @@ class Benchmark { } else if (name == "readrandomoperands") { read_operands_ = true; method = &Benchmark::ReadRandom; +#ifndef ROCKSDB_LITE + } else if (name == "backup") { + method = &Benchmark::Backup; + } else if (name == "restore") { + method = &Benchmark::Restore; +#endif } else if (!name.empty()) { // No error message for empty name fprintf(stderr, "unknown benchmark '%s'\n", name.c_str()); ErrorExit(); @@ -3544,6 +3572,13 @@ class Benchmark { fprintf(stdout, "DB path: [%s]\n", FLAGS_db.c_str()); #ifndef ROCKSDB_LITE + if (name == "backup") { + std::cout << "Backup path: [" << FLAGS_backup_dir << "]" << std::endl; + } else if (name == "restore") { + std::cout << "Backup path: [" << FLAGS_backup_dir << "]" << std::endl; + std::cout << "Restore path: [" << FLAGS_restore_dir << "]" + << std::endl; + } // A trace_file option can be provided both for trace and replay // operations. But db_bench does not support tracing and replaying at // the same time, for now. So, start tracing only when it is not a @@ -8224,6 +8259,47 @@ class Benchmark { } } + void Backup(ThreadState* thread) { + DB* db = SelectDB(thread); + std::unique_ptr engine_options( + new BackupEngineOptions(FLAGS_backup_dir)); + Status s; + BackupEngine* backup_engine; + if (FLAGS_backup_rate_limit > 0) { + engine_options->backup_rate_limiter.reset(NewGenericRateLimiter( + FLAGS_backup_rate_limit, 100000 /* refill_period_us */, + 10 /* fairness */, RateLimiter::Mode::kAllIo)); + } + // Build new backup of the entire DB + engine_options->destroy_old_data = true; + s = BackupEngine::Open(FLAGS_env, *engine_options, &backup_engine); + assert(s.ok()); + s = backup_engine->CreateNewBackup(db); + assert(s.ok()); + std::vector backup_info; + backup_engine->GetBackupInfo(&backup_info); + // Verify that a new backup is created + assert(backup_info.size() == 1); + } + + void Restore(ThreadState* /* thread */) { + std::unique_ptr engine_options( + new BackupEngineOptions(FLAGS_backup_dir)); + if (FLAGS_restore_rate_limit > 0) { + engine_options->restore_rate_limiter.reset(NewGenericRateLimiter( + FLAGS_restore_rate_limit, 100000 /* refill_period_us */, + 10 /* fairness */, RateLimiter::Mode::kAllIo)); + } + BackupEngineReadOnly* backup_engine; + Status s = + BackupEngineReadOnly::Open(FLAGS_env, *engine_options, &backup_engine); + assert(s.ok()); + s = backup_engine->RestoreDBFromLatestBackup(FLAGS_restore_dir, + FLAGS_restore_dir); + assert(s.ok()); + delete backup_engine; + } + #endif // ROCKSDB_LITE }; @@ -8369,6 +8445,14 @@ int db_bench_tool(int argc, char** argv) { FLAGS_db = default_db_path; } + if (FLAGS_backup_dir.empty()) { + FLAGS_backup_dir = FLAGS_db + "/backup"; + } + + if (FLAGS_restore_dir.empty()) { + FLAGS_restore_dir = FLAGS_db + "/restore"; + } + if (FLAGS_stats_interval_seconds > 0) { // When both are set then FLAGS_stats_interval determines the frequency // at which the timer is checked for FLAGS_stats_interval_seconds diff --git a/tools/ldb_cmd.cc b/tools/ldb_cmd.cc index 26d1cb1d0..78696f817 100644 --- a/tools/ldb_cmd.cc +++ b/tools/ldb_cmd.cc @@ -2578,8 +2578,9 @@ void DumpWalFile(Options options, std::string wal_file, bool print_header, const auto& fs = options.env->GetFileSystem(); FileOptions soptions(options); std::unique_ptr wal_file_reader; - Status status = SequentialFileReader::Create(fs, wal_file, soptions, - &wal_file_reader, nullptr); + Status status = SequentialFileReader::Create( + fs, wal_file, soptions, &wal_file_reader, nullptr /* dbg */, + nullptr /* rate_limiter */); if (!status.ok()) { if (exec_state) { *exec_state = LDBCommandExecuteResult::Failed("Failed to open WAL file " + diff --git a/tools/trace_analyzer_test.cc b/tools/trace_analyzer_test.cc index c057c5ab2..619a3468a 100644 --- a/tools/trace_analyzer_test.cc +++ b/tools/trace_analyzer_test.cc @@ -160,7 +160,8 @@ class TraceAnalyzerTest : public testing::Test { std::vector result; std::string line; - while (lf_reader.ReadLine(&line)) { + while ( + lf_reader.ReadLine(&line, Env::IO_TOTAL /* rate_limiter_priority */)) { result.push_back(line); } diff --git a/tools/trace_analyzer_tool.cc b/tools/trace_analyzer_tool.cc index 6423352cd..578f14bfd 100644 --- a/tools/trace_analyzer_tool.cc +++ b/tools/trace_analyzer_tool.cc @@ -1054,7 +1054,8 @@ Status TraceAnalyzer::ReProcessing() { LineFileReader lf_reader( std::move(file), whole_key_path, kTraceFileReadaheadSize /* filereadahead_size */); - for (cfs_[cf_id].w_count = 0; lf_reader.ReadLine(&get_key); + for (cfs_[cf_id].w_count = 0; lf_reader.ReadLine( + &get_key, Env::IO_TOTAL /* rate_limiter_priority */); ++cfs_[cf_id].w_count) { input_key = ROCKSDB_NAMESPACE::LDBCommand::HexToString(get_key); for (int type = 0; type < kTaTypeNum; type++) { diff --git a/util/file_reader_writer_test.cc b/util/file_reader_writer_test.cc index 5e222c5ed..bf9c09244 100644 --- a/util/file_reader_writer_test.cc +++ b/util/file_reader_writer_test.cc @@ -597,7 +597,8 @@ class ReadaheadSequentialFileTest : public testing::Test, ReadaheadSequentialFileTest() {} std::string Read(size_t n) { Slice result; - Status s = test_read_holder_->Read(n, &result, scratch_.get()); + Status s = test_read_holder_->Read( + n, &result, scratch_.get(), Env::IO_TOTAL /* rate_limiter_priority*/); EXPECT_TRUE(s.ok() || s.IsInvalidArgument()); return std::string(result.data(), result.size()); } @@ -724,10 +725,11 @@ TEST(LineFileReaderTest, LineFileReaderTest) { { std::unique_ptr reader; ASSERT_OK(LineFileReader::Create(fs, "testfile", FileOptions(), &reader, - nullptr)); + nullptr /* dbg */, + nullptr /* rate_limiter */)); std::string line; int count = 0; - while (reader->ReadLine(&line)) { + while (reader->ReadLine(&line, Env::IO_TOTAL /* rate_limiter_priority */)) { ASSERT_EQ(line, GenerateLine(count)); ++count; ASSERT_EQ(static_cast(reader->GetLineNumber()), count); @@ -736,7 +738,8 @@ TEST(LineFileReaderTest, LineFileReaderTest) { ASSERT_EQ(count, nlines); ASSERT_EQ(static_cast(reader->GetLineNumber()), count); // And still - ASSERT_FALSE(reader->ReadLine(&line)); + ASSERT_FALSE( + reader->ReadLine(&line, Env::IO_TOTAL /* rate_limiter_priority */)); ASSERT_OK(reader->GetStatus()); ASSERT_EQ(static_cast(reader->GetLineNumber()), count); } @@ -745,12 +748,14 @@ TEST(LineFileReaderTest, LineFileReaderTest) { { std::unique_ptr reader; ASSERT_OK(LineFileReader::Create(fs, "testfile", FileOptions(), &reader, - nullptr)); + nullptr /* dbg */, + nullptr /* rate_limiter */)); std::string line; int count = 0; // Read part way through the file while (count < nlines / 4) { - ASSERT_TRUE(reader->ReadLine(&line)); + ASSERT_TRUE( + reader->ReadLine(&line, Env::IO_TOTAL /* rate_limiter_priority */)); ASSERT_EQ(line, GenerateLine(count)); ++count; ASSERT_EQ(static_cast(reader->GetLineNumber()), count); @@ -767,7 +772,7 @@ TEST(LineFileReaderTest, LineFileReaderTest) { }); SyncPoint::GetInstance()->EnableProcessing(); - while (reader->ReadLine(&line)) { + while (reader->ReadLine(&line, Env::IO_TOTAL /* rate_limiter_priority */)) { ASSERT_EQ(line, GenerateLine(count)); ++count; ASSERT_EQ(static_cast(reader->GetLineNumber()), count); @@ -777,7 +782,8 @@ TEST(LineFileReaderTest, LineFileReaderTest) { ASSERT_EQ(callback_count, 1); // Still get error & no retry - ASSERT_FALSE(reader->ReadLine(&line)); + ASSERT_FALSE( + reader->ReadLine(&line, Env::IO_TOTAL /* rate_limiter_priority */)); ASSERT_TRUE(reader->GetStatus().IsCorruption()); ASSERT_EQ(callback_count, 1); diff --git a/utilities/backup/backup_engine.cc b/utilities/backup/backup_engine.cc index 9771e2cb0..2681a65ca 100644 --- a/utilities/backup/backup_engine.cc +++ b/utilities/backup/backup_engine.cc @@ -1922,7 +1922,7 @@ IOStatus BackupEngineImpl::RestoreDBFromBackup( GetAbsolutePath(file), dst, Temperature::kUnknown /* src_temp */, file_info->temp, "" /* contents */, backup_env_, db_env_, EnvOptions() /* src_env_options */, options_.sync, - options_.restore_rate_limiter.get(), 0 /* size_limit */, + options_.restore_rate_limiter.get(), file_info->size, nullptr /* stats */); RestoreAfterCopyOrCreateWorkItem after_copy_or_create_work_item( copy_or_create_work_item.result.get_future(), file, dst, @@ -2104,7 +2104,8 @@ IOStatus BackupEngineImpl::CopyOrCreateFile( // Return back current temperature in FileSystem *src_temperature = src_file->GetTemperature(); - src_reader.reset(new SequentialFileReader(std::move(src_file), src)); + src_reader.reset(new SequentialFileReader( + std::move(src_file), src, nullptr /* io_tracer */, {}, rate_limiter)); buf.reset(new char[buf_size]); } @@ -2116,11 +2117,8 @@ IOStatus BackupEngineImpl::CopyOrCreateFile( if (!src.empty()) { size_t buffer_to_read = (buf_size < size_limit) ? buf_size : static_cast(size_limit); - io_s = src_reader->Read(buffer_to_read, &data, buf.get()); - if (rate_limiter != nullptr) { - rate_limiter->Request(data.size(), Env::IO_LOW, nullptr /* stats */, - RateLimiter::OpType::kRead); - } + io_s = src_reader->Read(buffer_to_read, &data, buf.get(), + Env::IO_LOW /* rate_limiter_priority */); *bytes_toward_next_callback += data.size(); } else { data = contents; @@ -2421,16 +2419,14 @@ IOStatus BackupEngineImpl::ReadFileAndComputeChecksum( std::unique_ptr src_reader; auto file_options = FileOptions(src_env_options); file_options.temperature = src_temperature; - IOStatus io_s = SequentialFileReader::Create(src_fs, src, file_options, - &src_reader, nullptr); + RateLimiter* rate_limiter = options_.backup_rate_limiter.get(); + IOStatus io_s = SequentialFileReader::Create( + src_fs, src, file_options, &src_reader, nullptr /* dbg */, rate_limiter); if (!io_s.ok()) { return io_s; } - RateLimiter* rate_limiter = options_.backup_rate_limiter.get(); - size_t buf_size = - rate_limiter ? static_cast(rate_limiter->GetSingleBurstBytes()) - : kDefaultCopyFileBufferSize; + size_t buf_size = kDefaultCopyFileBufferSize; std::unique_ptr buf(new char[buf_size]); Slice data; @@ -2440,11 +2436,8 @@ IOStatus BackupEngineImpl::ReadFileAndComputeChecksum( } size_t buffer_to_read = (buf_size < size_limit) ? buf_size : static_cast(size_limit); - io_s = src_reader->Read(buffer_to_read, &data, buf.get()); - if (rate_limiter != nullptr) { - rate_limiter->Request(data.size(), Env::IO_LOW, nullptr /* stats */, - RateLimiter::OpType::kRead); - } + io_s = src_reader->Read(buffer_to_read, &data, buf.get(), + Env::IO_LOW /* rate_limiter_priority */); if (!io_s.ok()) { return io_s; } @@ -2847,7 +2840,8 @@ IOStatus BackupEngineImpl::BackupMeta::LoadFromFile( std::unique_ptr backup_meta_reader; { IOStatus io_s = LineFileReader::Create(fs_, meta_filename_, FileOptions(), - &backup_meta_reader, nullptr); + &backup_meta_reader, + nullptr /* dbg */, rate_limiter); if (!io_s.ok()) { return io_s; } @@ -2859,12 +2853,8 @@ IOStatus BackupEngineImpl::BackupMeta::LoadFromFile( // Failures handled at the end std::string line; - if (backup_meta_reader->ReadLine(&line)) { - if (rate_limiter != nullptr) { - LoopRateLimitRequestHelper(line.size(), rate_limiter, Env::IO_LOW, - nullptr /* stats */, - RateLimiter::OpType::kRead); - } + if (backup_meta_reader->ReadLine(&line, + Env::IO_LOW /* rate_limiter_priority */)) { if (StartsWith(line, kSchemaVersionPrefix)) { std::string ver = line.substr(kSchemaVersionPrefix.size()); if (ver == "2" || StartsWith(ver, "2.")) { @@ -2880,29 +2870,17 @@ IOStatus BackupEngineImpl::BackupMeta::LoadFromFile( } if (!line.empty()) { timestamp_ = std::strtoull(line.c_str(), nullptr, /*base*/ 10); - } else if (backup_meta_reader->ReadLine(&line)) { - if (rate_limiter != nullptr) { - LoopRateLimitRequestHelper(line.size(), rate_limiter, Env::IO_LOW, - nullptr /* stats */, - RateLimiter::OpType::kRead); - } + } else if (backup_meta_reader->ReadLine( + &line, Env::IO_LOW /* rate_limiter_priority */)) { timestamp_ = std::strtoull(line.c_str(), nullptr, /*base*/ 10); } - if (backup_meta_reader->ReadLine(&line)) { - if (rate_limiter != nullptr) { - LoopRateLimitRequestHelper(line.size(), rate_limiter, Env::IO_LOW, - nullptr /* stats */, - RateLimiter::OpType::kRead); - } + if (backup_meta_reader->ReadLine(&line, + Env::IO_LOW /* rate_limiter_priority */)) { sequence_number_ = std::strtoull(line.c_str(), nullptr, /*base*/ 10); } uint32_t num_files = UINT32_MAX; - while (backup_meta_reader->ReadLine(&line)) { - if (rate_limiter != nullptr) { - LoopRateLimitRequestHelper(line.size(), rate_limiter, Env::IO_LOW, - nullptr /* stats */, - RateLimiter::OpType::kRead); - } + while (backup_meta_reader->ReadLine( + &line, Env::IO_LOW /* rate_limiter_priority */)) { if (line.empty()) { return IOStatus::Corruption("Unexpected empty line"); } @@ -2941,12 +2919,8 @@ IOStatus BackupEngineImpl::BackupMeta::LoadFromFile( } std::vector> files; bool footer_present = false; - while (backup_meta_reader->ReadLine(&line)) { - if (rate_limiter != nullptr) { - LoopRateLimitRequestHelper(line.size(), rate_limiter, Env::IO_LOW, - nullptr /* stats */, - RateLimiter::OpType::kRead); - } + while (backup_meta_reader->ReadLine( + &line, Env::IO_LOW /* rate_limiter_priority */)) { std::vector components = StringSplit(line, ' '); if (components.size() < 1) { @@ -3046,12 +3020,8 @@ IOStatus BackupEngineImpl::BackupMeta::LoadFromFile( if (footer_present) { assert(schema_major_version >= 2); - while (backup_meta_reader->ReadLine(&line)) { - if (rate_limiter != nullptr) { - LoopRateLimitRequestHelper(line.size(), rate_limiter, Env::IO_LOW, - nullptr /* stats */, - RateLimiter::OpType::kRead); - } + while (backup_meta_reader->ReadLine( + &line, Env::IO_LOW /* rate_limiter_priority */)) { if (line.empty()) { return IOStatus::Corruption("Unexpected empty line"); }