From 8515bd50c9cc01fe970ba5ef7c63561465c43549 Mon Sep 17 00:00:00 2001
From: Changyu Bi
Date: Tue, 24 May 2022 10:28:57 -0700
Subject: [PATCH] Support read rate-limiting in SequentialFileReader (#9973)

Summary:
Added rate limiter and read rate-limiting support to SequentialFileReader. Call sites of SequentialFileReader::Read have been updated to pass an appropriate IO priority (or a TODO was left and IO_TOTAL specified for now).

The PR is separated into four commits: the first added the rate-limiting support, along with some fixes in the unit test, since the number of bytes requested from the rate limiter by SequentialFileReader is not exact (there is overcharge at EOF). The second commit fixed this by allowing SequentialFileReader to check the file size and determine how many bytes are left in the file to read. The third commit added benchmark-related code. The fourth commit moved the logic of using the file size to avoid overcharging the rate limiter into the backup engine (the main user of SequentialFileReader).

Pull Request resolved: https://github.com/facebook/rocksdb/pull/9973

Test Plan:
- `make check`; backup_engine_test covers usage of SequentialFileReader with a rate limiter.
- Ran db_bench to check that rate limiting throttles as expected: verified that reads and writes together are throttled at 2MB/s, in 0.2MB chunks spaced 100ms apart.
  - Set up: `./db_bench --benchmarks=fillrandom -db=/dev/shm/test_rocksdb`
  - Benchmark:
```
strace -ttfe read,write ./db_bench --benchmarks=backup -db=/dev/shm/test_rocksdb --backup_rate_limit=2097152 --use_existing_db
strace -ttfe read,write ./db_bench --benchmarks=restore -db=/dev/shm/test_rocksdb --restore_rate_limit=2097152 --use_existing_db
```
- Ran db_bench on backup and restore to ensure no performance regression.
  - backup (avg over 50 runs): pre-change: 1.90443e+06 micros/op; post-change: 1.8993e+06 micros/op (improved by 0.2%)
  - restore (avg over 50 runs): pre-change: 1.79105e+06 micros/op; post-change: 1.78192e+06 micros/op (improved by 0.5%)
```
# Set up
./db_bench --benchmarks=fillrandom -db=/tmp/test_rocksdb -num=10000000
# Benchmark
TEST_TMPDIR=/tmp/test_rocksdb
NUM_RUN=50
for ((j=0;j<$NUM_RUN;j++))
do
  ./db_bench -db=$TEST_TMPDIR -num=10000000 -benchmarks=backup -use_existing_db | egrep 'backup'
  # Restore
  #./db_bench -db=$TEST_TMPDIR -num=10000000 -benchmarks=restore -use_existing_db
done > rate_limit.txt && awk -v NUM_RUN=$NUM_RUN '{sum+=$3;sum_sqrt+=$3^2}END{print sum/NUM_RUN, sqrt(sum_sqrt/NUM_RUN-(sum/NUM_RUN)^2)}' rate_limit.txt >> rate_limit_2.txt
```

Reviewed By: hx235

Differential Revision: D36327418

Pulled By: cbi42

fbshipit-source-id: e75d4307cff815945482df5ba630c1e88d064691
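To make the new API concrete, here is a minimal sketch of a call site that opts into rate limiting. The file name, 2MB/s rate, and chunk size are hypothetical; the `Create` and `Read` signatures are the ones introduced by this patch:
```
#include "file/sequence_file_reader.h"
#include "rocksdb/rate_limiter.h"

using namespace ROCKSDB_NAMESPACE;

IOStatus RateLimitedScan() {
  // Shared limiter; 2MB/s is an arbitrary example rate.
  std::shared_ptr<RateLimiter> limiter(NewGenericRateLimiter(2 << 20));
  std::unique_ptr<SequentialFileReader> reader;
  IOStatus io_s = SequentialFileReader::Create(
      FileSystem::Default(), "example.log", FileOptions(), &reader,
      nullptr /* dbg */, limiter.get());
  if (!io_s.ok()) {
    return io_s;
  }
  constexpr size_t kReadChunk = 4096;
  char scratch[kReadChunk];
  Slice result;
  do {
    // Passing a real priority (not Env::IO_TOTAL) charges the reader's
    // internal rate limiter for each chunk.
    io_s = reader->Read(kReadChunk, &result, scratch, Env::IO_LOW);
  } while (io_s.ok() && !result.empty());
  return io_s;
}
```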
---
 db/log_reader.cc                  |  26 ++++++--
 db/repair.cc                      |   2 +-
 db/transaction_log_impl.cc        |   5 +-
 env/io_posix.h                    |   3 +-
 file/file_util.cc                 |   5 +-
 file/line_file_reader.cc          |  13 ++--
 file/line_file_reader.h           |   5 +-
 file/random_access_file_reader.cc |   2 +-
 file/sequence_file_reader.cc      | 100 ++++++++++++++++++++----------
 file/sequence_file_reader.h       |  26 ++++++--
 options/options_parser.cc         |   2 +-
 tools/db_bench_tool.cc            |  88 +++++++++++++++++++++++++-
 tools/ldb_cmd.cc                  |   5 +-
 tools/trace_analyzer_test.cc      |   3 +-
 tools/trace_analyzer_tool.cc      |   3 +-
 util/file_reader_writer_test.cc   |  22 ++++---
 utilities/backup/backup_engine.cc |  80 ++++++++----------------
 17 files changed, 265 insertions(+), 125 deletions(-)

diff --git a/db/log_reader.cc b/db/log_reader.cc
index fbf50d7c1..0c1852e82 100644
--- a/db/log_reader.cc
+++ b/db/log_reader.cc
@@ -303,8 +303,14 @@ void Reader::UnmarkEOFInternal() {
   }
   Slice read_buffer;
-  Status status = file_->Read(remaining, &read_buffer,
-                              backing_store_ + eof_offset_);
+  // TODO: rate limit log reader with appropriate priority.
+  // TODO: avoid overcharging rate limiter:
+  // Note that the Read here might overcharge SequentialFileReader's internal
+  // rate limiter if priority is not IO_TOTAL, e.g., when there is not enough
+  // content left until EOF to read.
+  Status status =
+      file_->Read(remaining, &read_buffer, backing_store_ + eof_offset_,
+                  Env::IO_TOTAL /* rate_limiter_priority */);
   size_t added = read_buffer.size();
   end_of_buffer_offset_ += added;
@@ -349,7 +355,13 @@ bool Reader::ReadMore(size_t* drop_size, int *error) {
   if (!eof_ && !read_error_) {
     // Last read was a full read, so this is a trailer to skip
     buffer_.clear();
-    Status status = file_->Read(kBlockSize, &buffer_, backing_store_);
+    // TODO: rate limit log reader with appropriate priority.
+    // TODO: avoid overcharging rate limiter:
+    // Note that the Read here might overcharge SequentialFileReader's internal
+    // rate limiter if priority is not IO_TOTAL, e.g., when there is not enough
+    // content left until EOF to read.
+    Status status = file_->Read(kBlockSize, &buffer_, backing_store_,
+                                Env::IO_TOTAL /* rate_limiter_priority */);
     TEST_SYNC_POINT_CALLBACK("LogReader::ReadMore:AfterReadFile", &status);
     end_of_buffer_offset_ += buffer_.size();
     if (!status.ok()) {
@@ -639,7 +651,13 @@ bool FragmentBufferedReader::TryReadMore(size_t* drop_size, int* error) {
   if (!eof_ && !read_error_) {
     // Last read was a full read, so this is a trailer to skip
     buffer_.clear();
-    Status status = file_->Read(kBlockSize, &buffer_, backing_store_);
+    // TODO: rate limit log reader with appropriate priority.
+    // TODO: avoid overcharging rate limiter:
+    // Note that the Read here might overcharge SequentialFileReader's internal
+    // rate limiter if priority is not IO_TOTAL, e.g., when there is not enough
+    // content left until EOF to read.
+    Status status = file_->Read(kBlockSize, &buffer_, backing_store_,
+                                Env::IO_TOTAL /* rate_limiter_priority */);
     end_of_buffer_offset_ += buffer_.size();
     if (!status.ok()) {
       buffer_.clear();
diff --git a/db/repair.cc b/db/repair.cc
index 4515be84b..f0beb3ff4 100644
--- a/db/repair.cc
+++ b/db/repair.cc
@@ -358,7 +358,7 @@ class Repairer {
     std::unique_ptr<SequentialFileReader> lfile_reader;
     Status status = SequentialFileReader::Create(
         fs, logname, fs->OptimizeForLogRead(file_options_), &lfile_reader,
-        nullptr);
+        nullptr /* dbg */, nullptr /* rate limiter */);
     if (!status.ok()) {
       return status;
     }
diff --git a/db/transaction_log_impl.cc b/db/transaction_log_impl.cc
index 15cd8a28d..044adc2c5 100644
--- a/db/transaction_log_impl.cc
+++ b/db/transaction_log_impl.cc
@@ -67,8 +67,9 @@ Status TransactionLogIteratorImpl::OpenLogFile(
     }
   }
   if (s.ok()) {
-    file_reader->reset(new SequentialFileReader(
-        std::move(file), fname, io_tracer_, options_->listeners));
+    file_reader->reset(new SequentialFileReader(std::move(file), fname,
+                                                io_tracer_, options_->listeners,
+                                                options_->rate_limiter.get()));
   }
   return s;
 }
diff --git a/env/io_posix.h b/env/io_posix.h
index 0ff787a70..1aacd75a3 100644
--- a/env/io_posix.h
+++ b/env/io_posix.h
@@ -187,8 +187,7 @@ class PosixSequentialFile : public FSSequentialFile {
  public:
   PosixSequentialFile(const std::string& fname, FILE* file, int fd,
-                      size_t logical_block_size,
-                      const EnvOptions& options);
+                      size_t logical_block_size, const EnvOptions& options);
   virtual ~PosixSequentialFile();

   virtual IOStatus Read(size_t n, const IOOptions& opts, Slice* result,
diff --git a/file/file_util.cc b/file/file_util.cc
index 011f12455..d7858f3c8 100644
--- a/file/file_util.cc
+++ b/file/file_util.cc
@@ -49,7 +49,10 @@ IOStatus CopyFile(FileSystem* fs, const std::string& source,
   Slice slice;
   while (size > 0) {
     size_t bytes_to_read = std::min(sizeof(buffer), static_cast<size_t>(size));
-    io_s = status_to_io_status(src_reader->Read(bytes_to_read, &slice, buffer));
+    // TODO: rate limit copy file
+    io_s = status_to_io_status(
+        src_reader->Read(bytes_to_read, &slice, buffer,
+                         Env::IO_TOTAL /* rate_limiter_priority */));
     if (!io_s.ok()) {
       return io_s;
     }
diff --git a/file/line_file_reader.cc b/file/line_file_reader.cc
index af00d7780..50c415dc6 100644
--- a/file/line_file_reader.cc
+++ b/file/line_file_reader.cc
@@ -15,16 +15,20 @@ IOStatus LineFileReader::Create(const std::shared_ptr<FileSystem>& fs,
                                 const std::string& fname,
                                 const FileOptions& file_opts,
                                 std::unique_ptr<LineFileReader>* reader,
-                                IODebugContext* dbg) {
+                                IODebugContext* dbg,
+                                RateLimiter* rate_limiter) {
   std::unique_ptr<FSSequentialFile> file;
   IOStatus io_s = fs->NewSequentialFile(fname, file_opts, &file, dbg);
   if (io_s.ok()) {
-    reader->reset(new LineFileReader(std::move(file), fname));
+    reader->reset(new LineFileReader(
+        std::move(file), fname, nullptr,
+        std::vector<std::shared_ptr<EventListener>>{}, rate_limiter));
   }
   return io_s;
 }

-bool LineFileReader::ReadLine(std::string* out) {
+bool LineFileReader::ReadLine(std::string* out,
+                              Env::IOPriority rate_limiter_priority) {
   assert(out);
   if (!io_status_.ok()) {
     // Status should be checked (or permit unchecked) any time we return false.
@@ -50,7 +54,8 @@ bool LineFileReader::ReadLine(std::string* out) {
       // else flush and reload buffer
       out->append(buf_begin_, buf_end_ - buf_begin_);
       Slice result;
-      io_status_ = sfr_.Read(buf_.size(), &result, buf_.data());
+      io_status_ =
+          sfr_.Read(buf_.size(), &result, buf_.data(), rate_limiter_priority);
       IOSTATS_ADD(bytes_read, result.size());
       if (!io_status_.ok()) {
         io_status_.MustCheck();
diff --git a/file/line_file_reader.h b/file/line_file_reader.h
index 4b4a9d564..cc302d311 100644
--- a/file/line_file_reader.h
+++ b/file/line_file_reader.h
@@ -32,7 +32,7 @@ class LineFileReader {
   static IOStatus Create(const std::shared_ptr<FileSystem>& fs,
                          const std::string& fname, const FileOptions& file_opts,
                          std::unique_ptr<LineFileReader>* reader,
-                         IODebugContext* dbg);
+                         IODebugContext* dbg, RateLimiter* rate_limiter);

   LineFileReader(const LineFileReader&) = delete;
   LineFileReader& operator=(const LineFileReader&) = delete;
@@ -41,7 +41,8 @@ class LineFileReader {
   // the line to `out`, without delimiter, or returning false on failure. You
   // must check GetStatus() to determine whether the failure was just
   // end-of-file (OK status) or an I/O error (another status).
-  bool ReadLine(std::string* out);
+  // The internal rate limiter will be charged at the specified priority.
+  bool ReadLine(std::string* out, Env::IOPriority rate_limiter_priority);

   // Returns the number of the line most recently returned from ReadLine.
   // Return value is unspecified if ReadLine has returned false due to
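A sketch of how a consumer drives the revised `ReadLine` loop, assuming a `RateLimiter* rate_limiter` is already in scope (the file name and the `Env::IO_LOW` priority are illustrative):
```
std::unique_ptr<LineFileReader> reader;
IOStatus io_s =
    LineFileReader::Create(FileSystem::Default(), "example.txt", FileOptions(),
                           &reader, nullptr /* dbg */, rate_limiter);
if (io_s.ok()) {
  std::string line;
  while (reader->ReadLine(&line, Env::IO_LOW /* rate_limiter_priority */)) {
    // ... process `line` ...
  }
  // false means either EOF or an I/O error; disambiguate via GetStatus().
  io_s = reader->GetStatus();
}
```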
diff --git a/file/random_access_file_reader.cc b/file/random_access_file_reader.cc
index a919b6298..44580e4aa 100644
--- a/file/random_access_file_reader.cc
+++ b/file/random_access_file_reader.cc
@@ -283,7 +283,7 @@ IOStatus RandomAccessFileReader::MultiRead(
 #endif  // !NDEBUG

   // To be paranoid modify scratch a little bit, so in case underlying
-  // FileSystem doesn't fill the buffer but return succee and `scratch` returns
+  // FileSystem doesn't fill the buffer but return success and `scratch` returns
   // contains a previous block, returned value will not pass checksum.
   // This byte might not change anything for direct I/O case, but it's OK.
   for (size_t i = 0; i < num_reqs; i++) {
diff --git a/file/sequence_file_reader.cc b/file/sequence_file_reader.cc
index e8591589c..d51d5be46 100644
--- a/file/sequence_file_reader.cc
+++ b/file/sequence_file_reader.cc
@@ -25,16 +25,18 @@ namespace ROCKSDB_NAMESPACE {
 IOStatus SequentialFileReader::Create(
     const std::shared_ptr<FileSystem>& fs, const std::string& fname,
     const FileOptions& file_opts, std::unique_ptr<SequentialFileReader>* reader,
-    IODebugContext* dbg) {
+    IODebugContext* dbg, RateLimiter* rate_limiter) {
   std::unique_ptr<FSSequentialFile> file;
   IOStatus io_s = fs->NewSequentialFile(fname, file_opts, &file, dbg);
   if (io_s.ok()) {
-    reader->reset(new SequentialFileReader(std::move(file), fname));
+    reader->reset(new SequentialFileReader(std::move(file), fname, nullptr, {},
+                                           rate_limiter));
   }
   return io_s;
 }

-IOStatus SequentialFileReader::Read(size_t n, Slice* result, char* scratch) {
+IOStatus SequentialFileReader::Read(size_t n, Slice* result, char* scratch,
+                                    Env::IOPriority rate_limiter_priority) {
   IOStatus io_s;
   if (use_direct_io()) {
 #ifndef ROCKSDB_LITE
@@ -55,30 +57,48 @@ IOStatus SequentialFileReader::Read(size_t n, Slice* result, char* scratch) {
     buf.Alignment(alignment);
     buf.AllocateNewBuffer(size);

-    Slice tmp;
-    uint64_t orig_offset = 0;
-    FileOperationInfo::StartTimePoint start_ts;
-    if (ShouldNotifyListeners()) {
-      orig_offset = aligned_offset + buf.CurrentSize();
-      start_ts = FileOperationInfo::StartNow();
+    while (buf.CurrentSize() < size) {
+      size_t allowed;
+      if (rate_limiter_priority != Env::IO_TOTAL && rate_limiter_ != nullptr) {
+        allowed = rate_limiter_->RequestToken(
+            buf.Capacity() - buf.CurrentSize(), buf.Alignment(),
+            rate_limiter_priority, nullptr /* stats */,
+            RateLimiter::OpType::kRead);
+      } else {
+        assert(buf.CurrentSize() == 0);
+        allowed = size;
+      }
+
+      Slice tmp;
+      uint64_t orig_offset = 0;
+      FileOperationInfo::StartTimePoint start_ts;
+      if (ShouldNotifyListeners()) {
+        orig_offset = aligned_offset + buf.CurrentSize();
+        start_ts = FileOperationInfo::StartNow();
+      }
+      io_s = file_->PositionedRead(aligned_offset + buf.CurrentSize(), allowed,
+                                   IOOptions(), &tmp, buf.Destination(),
+                                   nullptr /* dbg */);
+      if (ShouldNotifyListeners()) {
+        auto finish_ts = FileOperationInfo::FinishNow();
+        NotifyOnFileReadFinish(orig_offset, tmp.size(), start_ts, finish_ts,
+                               io_s);
+      }
+      buf.Size(buf.CurrentSize() + tmp.size());
+      if (!io_s.ok() || tmp.size() < allowed) {
+        break;
+      }
     }
-    io_s = file_->PositionedRead(aligned_offset, size, IOOptions(), &tmp,
-                                 buf.BufferStart(), nullptr);
-    if (io_s.ok() && offset_advance < tmp.size()) {
-      buf.Size(tmp.size());
+
+    if (io_s.ok() && offset_advance < buf.CurrentSize()) {
       r = buf.Read(scratch, offset_advance,
-                   std::min(tmp.size() - offset_advance, n));
+                   std::min(buf.CurrentSize() - offset_advance, n));
     }
     *result = Slice(scratch, r);
-    if (ShouldNotifyListeners()) {
-      auto finish_ts = FileOperationInfo::FinishNow();
-      NotifyOnFileReadFinish(orig_offset, tmp.size(), start_ts, finish_ts,
-                             io_s);
-    }
 #endif  // !ROCKSDB_LITE
   } else {
     // To be paranoid, modify scratch a little bit, so in case underlying
-    // FileSystem doesn't fill the buffer but return succee and `scratch`
+    // FileSystem doesn't fill the buffer but return success and `scratch`
     // returns contains a previous block, returned value will not pass
     // checksum.
     // It's hard to find useful byte for direct I/O case, so we skip it.
@@ -86,22 +106,38 @@ IOStatus SequentialFileReader::Read(size_t n, Slice* result, char* scratch) {
       scratch[0]++;
     }

+    size_t read = 0;
+    while (read < n) {
+      size_t allowed;
+      if (rate_limiter_priority != Env::IO_TOTAL && rate_limiter_ != nullptr) {
+        allowed = rate_limiter_->RequestToken(
+            n - read, 0 /* alignment */, rate_limiter_priority,
+            nullptr /* stats */, RateLimiter::OpType::kRead);
+      } else {
+        allowed = n;
+      }
 #ifndef ROCKSDB_LITE
-    FileOperationInfo::StartTimePoint start_ts;
-    if (ShouldNotifyListeners()) {
-      start_ts = FileOperationInfo::StartNow();
-    }
+      FileOperationInfo::StartTimePoint start_ts;
+      if (ShouldNotifyListeners()) {
+        start_ts = FileOperationInfo::StartNow();
+      }
 #endif
-
-    io_s = file_->Read(n, IOOptions(), result, scratch, nullptr);
-
+      Slice tmp;
+      io_s = file_->Read(allowed, IOOptions(), &tmp, scratch + read,
+                         nullptr /* dbg */);
 #ifndef ROCKSDB_LITE
-    if (ShouldNotifyListeners()) {
-      auto finish_ts = FileOperationInfo::FinishNow();
-      size_t offset = offset_.fetch_add(result->size());
-      NotifyOnFileReadFinish(offset, result->size(), start_ts, finish_ts, io_s);
-    }
+      if (ShouldNotifyListeners()) {
+        auto finish_ts = FileOperationInfo::FinishNow();
+        size_t offset = offset_.fetch_add(tmp.size());
+        NotifyOnFileReadFinish(offset, tmp.size(), start_ts, finish_ts, io_s);
+      }
 #endif
+      read += tmp.size();
+      if (!io_s.ok() || tmp.size() < allowed) {
+        break;
+      }
+    }
+    *result = Slice(scratch, read);
   }
   IOSTATS_ADD(bytes_read, result->size());
   return io_s;
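Both the direct-I/O branch and the buffered branch of `Read` now share the same chunked shape. Condensed for readability (a restatement of the loop above, not additional code from the patch):
```
// Request tokens, read at most the granted amount, and stop on error or a
// short read (EOF). RequestToken may grant fewer bytes than asked for,
// which is why the read proceeds in a loop.
size_t read = 0;
IOStatus io_s;
while (read < n) {
  size_t allowed = n - read;
  if (rate_limiter_priority != Env::IO_TOTAL && rate_limiter_ != nullptr) {
    allowed = rate_limiter_->RequestToken(n - read, 0 /* alignment */,
                                          rate_limiter_priority,
                                          nullptr /* stats */,
                                          RateLimiter::OpType::kRead);
  }
  Slice tmp;
  io_s = file_->Read(allowed, IOOptions(), &tmp, scratch + read,
                     nullptr /* dbg */);
  read += tmp.size();
  if (!io_s.ok() || tmp.size() < allowed) {
    break;
  }
}
*result = Slice(scratch, read);
```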
diff --git a/file/sequence_file_reader.h b/file/sequence_file_reader.h
index 00051c080..baea10eb7 100644
--- a/file/sequence_file_reader.h
+++ b/file/sequence_file_reader.h
@@ -57,15 +57,19 @@ class SequentialFileReader {
   FSSequentialFilePtr file_;
   std::atomic<size_t> offset_{0};  // read offset
   std::vector<std::shared_ptr<EventListener>> listeners_{};
+  RateLimiter* rate_limiter_;

  public:
   explicit SequentialFileReader(
       std::unique_ptr<FSSequentialFile>&& _file, const std::string& _file_name,
       const std::shared_ptr<IOTracer>& io_tracer = nullptr,
-      const std::vector<std::shared_ptr<EventListener>>& listeners = {})
+      const std::vector<std::shared_ptr<EventListener>>& listeners = {},
+      RateLimiter* rate_limiter =
+          nullptr)  // TODO: migrate call sites to provide rate limiter
       : file_name_(_file_name),
         file_(std::move(_file), io_tracer, _file_name),
-        listeners_() {
+        listeners_(),
+        rate_limiter_(rate_limiter) {
 #ifndef ROCKSDB_LITE
     AddFileIOListeners(listeners);
 #else
@@ -77,11 +81,14 @@ class SequentialFileReader {
       std::unique_ptr<FSSequentialFile>&& _file, const std::string& _file_name,
       size_t _readahead_size,
       const std::shared_ptr<IOTracer>& io_tracer = nullptr,
-      const std::vector<std::shared_ptr<EventListener>>& listeners = {})
+      const std::vector<std::shared_ptr<EventListener>>& listeners = {},
+      RateLimiter* rate_limiter =
+          nullptr)  // TODO: migrate call sites to provide rate limiter
       : file_name_(_file_name),
         file_(NewReadaheadSequentialFile(std::move(_file), _readahead_size),
               io_tracer, _file_name),
-        listeners_() {
+        listeners_(),
+        rate_limiter_(rate_limiter) {
 #ifndef ROCKSDB_LITE
     AddFileIOListeners(listeners);
 #else
@@ -91,12 +98,19 @@ class SequentialFileReader {
   static IOStatus Create(const std::shared_ptr<FileSystem>& fs,
                          const std::string& fname, const FileOptions& file_opts,
                          std::unique_ptr<SequentialFileReader>* reader,
-                         IODebugContext* dbg);
+                         IODebugContext* dbg, RateLimiter* rate_limiter);

   SequentialFileReader(const SequentialFileReader&) = delete;
   SequentialFileReader& operator=(const SequentialFileReader&) = delete;

-  IOStatus Read(size_t n, Slice* result, char* scratch);
+  // `rate_limiter_priority` is used to charge the internal rate limiter when
+  // enabled. The special value `Env::IO_TOTAL` makes this operation bypass the
+  // rate limiter. The amount charged to the internal rate limiter is n, even
+  // when less than n bytes are actually read (e.g. at end of file). To avoid
+  // overcharging the rate limiter, the caller can use file size to cap n to
+  // read until end of file.
+  IOStatus Read(size_t n, Slice* result, char* scratch,
+                Env::IOPriority rate_limiter_priority);

   IOStatus Skip(uint64_t n);
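The overcharge caveat in the comment above is what the backup engine hunks at the bottom of this patch work around: a caller that knows the file size can cap each request so the limiter is never charged for bytes past EOF. A sketch of that pattern (`file_size`, `kBufferSize`, `reader`, and `buf` are illustrative; the file size could come from, e.g., `FileSystem::GetFileSize`):
```
// Cap each Read at the bytes remaining in the file so the final request
// cannot overcharge the rate limiter.
uint64_t remaining = file_size;
Slice data;
while (remaining > 0) {
  size_t to_read =
      static_cast<size_t>(std::min<uint64_t>(kBufferSize, remaining));
  io_s = reader->Read(to_read, &data, buf.get(),
                      Env::IO_LOW /* rate_limiter_priority */);
  if (!io_s.ok() || data.empty()) {
    break;
  }
  remaining -= data.size();
  // ... consume `data` ...
}
```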
diff --git a/options/options_parser.cc b/options/options_parser.cc
index 73ccbdc4e..40ae47487 100644
--- a/options/options_parser.cc
+++ b/options/options_parser.cc
@@ -271,7 +271,7 @@ Status RocksDBOptionsParser::Parse(const ConfigOptions& config_options_in,
   std::unordered_map<std::string, std::string> opt_map;
   std::string line;
   // we only support single-lined statement.
-  while (lf_reader.ReadLine(&line)) {
+  while (lf_reader.ReadLine(&line, Env::IO_TOTAL /* rate_limiter_priority */)) {
     int line_num = static_cast<int>(lf_reader.GetLineNumber());
     line = TrimAndRemoveComment(line);
     if (line.empty()) {
diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc
index ef4203fbf..f59275998 100644
--- a/tools/db_bench_tool.cc
+++ b/tools/db_bench_tool.cc
@@ -60,6 +60,7 @@
 #include "rocksdb/slice_transform.h"
 #include "rocksdb/stats_history.h"
 #include "rocksdb/table.h"
+#include "rocksdb/utilities/backup_engine.h"
 #include "rocksdb/utilities/object_registry.h"
 #include "rocksdb/utilities/optimistic_transaction_db.h"
 #include "rocksdb/utilities/options_type.h"
@@ -159,8 +160,10 @@ IF_ROCKSDB_LITE("",
     "randomtransaction,"
     "randomreplacekeys,"
     "timeseries,"
-    "getmergeoperands",
+    "getmergeoperands,",
     "readrandomoperands,"
+    "backup,"
+    "restore"

     "Comma-separated list of operations to run in the specified"
     " order. Available benchmarks:\n"
@@ -250,7 +253,10 @@ IF_ROCKSDB_LITE("",
     "\treadrandomoperands -- read random keys using `GetMergeOperands()`. An "
     "operation includes a rare but possible retry in case it got "
    "`Status::Incomplete()`. This happens upon encountering more keys than "
-    "have ever been seen by the thread (or eight initially)\n");
+    "have ever been seen by the thread (or eight initially)\n"
+    "\tbackup -- Create a backup of the current DB and verify that a new backup is created. "
+    "Rate limit can be specified through --backup_rate_limit\n"
+    "\trestore -- Restore the DB from the latest backup available. Rate limit can be specified through --restore_rate_limit\n");

 DEFINE_int64(num, 1000000, "Number of key/values to place in database");
@@ -1146,6 +1152,22 @@ DEFINE_bool(charge_table_reader, false,
             "CacheEntryRoleOptions::charged of"
             "CacheEntryRole::kBlockBasedTableReader");

+DEFINE_uint64(backup_rate_limit, 0ull,
+              "If non-zero, db_bench will rate limit reads and writes for DB "
+              "backup. This "
+              "is the global rate in bytes/second.");
+
+DEFINE_uint64(restore_rate_limit, 0ull,
+              "If non-zero, db_bench will rate limit reads and writes for DB "
+              "restore. This "
+              "is the global rate in bytes/second.");
+
+DEFINE_string(backup_dir, "",
+              "If not empty string, use the given dir for backup.");
+
+DEFINE_string(restore_dir, "",
+              "If not empty string, use the given dir for restore.");
+
 static enum ROCKSDB_NAMESPACE::CompressionType StringToCompressionType(
     const char* ctype) {
   assert(ctype);
@@ -3512,6 +3534,12 @@ class Benchmark {
       } else if (name == "readrandomoperands") {
         read_operands_ = true;
         method = &Benchmark::ReadRandom;
+#ifndef ROCKSDB_LITE
+      } else if (name == "backup") {
+        method = &Benchmark::Backup;
+      } else if (name == "restore") {
+        method = &Benchmark::Restore;
+#endif
       } else if (!name.empty()) {  // No error message for empty name
         fprintf(stderr, "unknown benchmark '%s'\n", name.c_str());
         ErrorExit();
@@ -3544,6 +3572,13 @@ class Benchmark {
         fprintf(stdout, "DB path: [%s]\n", FLAGS_db.c_str());

 #ifndef ROCKSDB_LITE
+        if (name == "backup") {
+          std::cout << "Backup path: [" << FLAGS_backup_dir << "]" << std::endl;
+        } else if (name == "restore") {
+          std::cout << "Backup path: [" << FLAGS_backup_dir << "]" << std::endl;
+          std::cout << "Restore path: [" << FLAGS_restore_dir << "]"
+                    << std::endl;
+        }
         // A trace_file option can be provided both for trace and replay
         // operations. But db_bench does not support tracing and replaying at
         // the same time, for now. So, start tracing only when it is not a
@@ -8224,6 +8259,47 @@ class Benchmark {
     }
   }

+  void Backup(ThreadState* thread) {
+    DB* db = SelectDB(thread);
+    std::unique_ptr<BackupEngineOptions> engine_options(
+        new BackupEngineOptions(FLAGS_backup_dir));
+    Status s;
+    BackupEngine* backup_engine;
+    if (FLAGS_backup_rate_limit > 0) {
+      engine_options->backup_rate_limiter.reset(NewGenericRateLimiter(
+          FLAGS_backup_rate_limit, 100000 /* refill_period_us */,
+          10 /* fairness */, RateLimiter::Mode::kAllIo));
+    }
+    // Build new backup of the entire DB
+    engine_options->destroy_old_data = true;
+    s = BackupEngine::Open(FLAGS_env, *engine_options, &backup_engine);
+    assert(s.ok());
+    s = backup_engine->CreateNewBackup(db);
+    assert(s.ok());
+    std::vector<BackupInfo> backup_info;
+    backup_engine->GetBackupInfo(&backup_info);
+    // Verify that a new backup is created
+    assert(backup_info.size() == 1);
+  }
+
+  void Restore(ThreadState* /* thread */) {
+    std::unique_ptr<BackupEngineOptions> engine_options(
+        new BackupEngineOptions(FLAGS_backup_dir));
+    if (FLAGS_restore_rate_limit > 0) {
+      engine_options->restore_rate_limiter.reset(NewGenericRateLimiter(
+          FLAGS_restore_rate_limit, 100000 /* refill_period_us */,
+          10 /* fairness */, RateLimiter::Mode::kAllIo));
+    }
+    BackupEngineReadOnly* backup_engine;
+    Status s =
+        BackupEngineReadOnly::Open(FLAGS_env, *engine_options, &backup_engine);
+    assert(s.ok());
+    s = backup_engine->RestoreDBFromLatestBackup(FLAGS_restore_dir,
+                                                 FLAGS_restore_dir);
+    assert(s.ok());
+    delete backup_engine;
+  }
+
 #endif  // ROCKSDB_LITE
 };

@@ -8369,6 +8445,14 @@ int db_bench_tool(int argc, char** argv) {
     FLAGS_db = default_db_path;
   }

+  if (FLAGS_backup_dir.empty()) {
+    FLAGS_backup_dir = FLAGS_db + "/backup";
+  }
+
+  if (FLAGS_restore_dir.empty()) {
+    FLAGS_restore_dir = FLAGS_db + "/restore";
+  }
+
   if (FLAGS_stats_interval_seconds > 0) {
     // When both are set then FLAGS_stats_interval determines the frequency
     // at which the timer is checked for FLAGS_stats_interval_seconds
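Outside of db_bench, library users get the same behavior by wiring rate limiters through `BackupEngineOptions`, mirroring `Benchmark::Backup()` above (the backup directory and 4MB/s rate are placeholders; `db` is an open `DB*`):
```
BackupEngineOptions engine_options("/path/to/backup_dir");
engine_options.backup_rate_limiter.reset(NewGenericRateLimiter(
    4 << 20 /* rate_bytes_per_sec */, 100 * 1000 /* refill_period_us */,
    10 /* fairness */, RateLimiter::Mode::kAllIo));
BackupEngine* backup_engine = nullptr;
Status s = BackupEngine::Open(Env::Default(), engine_options, &backup_engine);
if (s.ok()) {
  // Backup reads now charge backup_rate_limiter at Env::IO_LOW.
  s = backup_engine->CreateNewBackup(db);
  delete backup_engine;
}
```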
diff --git a/tools/ldb_cmd.cc b/tools/ldb_cmd.cc
index 26d1cb1d0..78696f817 100644
--- a/tools/ldb_cmd.cc
+++ b/tools/ldb_cmd.cc
@@ -2578,8 +2578,9 @@ void DumpWalFile(Options options, std::string wal_file, bool print_header,
   const auto& fs = options.env->GetFileSystem();
   FileOptions soptions(options);
   std::unique_ptr<SequentialFileReader> wal_file_reader;
-  Status status = SequentialFileReader::Create(fs, wal_file, soptions,
-                                               &wal_file_reader, nullptr);
+  Status status = SequentialFileReader::Create(
+      fs, wal_file, soptions, &wal_file_reader, nullptr /* dbg */,
+      nullptr /* rate_limiter */);
   if (!status.ok()) {
     if (exec_state) {
       *exec_state = LDBCommandExecuteResult::Failed("Failed to open WAL file " +
diff --git a/tools/trace_analyzer_test.cc b/tools/trace_analyzer_test.cc
index c057c5ab2..619a3468a 100644
--- a/tools/trace_analyzer_test.cc
+++ b/tools/trace_analyzer_test.cc
@@ -160,7 +160,8 @@ class TraceAnalyzerTest : public testing::Test {
     std::vector<std::string> result;
     std::string line;
-    while (lf_reader.ReadLine(&line)) {
+    while (
+        lf_reader.ReadLine(&line, Env::IO_TOTAL /* rate_limiter_priority */)) {
       result.push_back(line);
     }
diff --git a/tools/trace_analyzer_tool.cc b/tools/trace_analyzer_tool.cc
index 6423352cd..578f14bfd 100644
--- a/tools/trace_analyzer_tool.cc
+++ b/tools/trace_analyzer_tool.cc
@@ -1054,7 +1054,8 @@ Status TraceAnalyzer::ReProcessing() {
       LineFileReader lf_reader(
           std::move(file), whole_key_path,
           kTraceFileReadaheadSize /* filereadahead_size */);
-      for (cfs_[cf_id].w_count = 0; lf_reader.ReadLine(&get_key);
+      for (cfs_[cf_id].w_count = 0;
+           lf_reader.ReadLine(&get_key,
+                              Env::IO_TOTAL /* rate_limiter_priority */);
            ++cfs_[cf_id].w_count) {
         input_key = ROCKSDB_NAMESPACE::LDBCommand::HexToString(get_key);
         for (int type = 0; type < kTaTypeNum; type++) {
diff --git a/util/file_reader_writer_test.cc b/util/file_reader_writer_test.cc
index 5e222c5ed..bf9c09244 100644
--- a/util/file_reader_writer_test.cc
+++ b/util/file_reader_writer_test.cc
@@ -597,7 +597,8 @@ class ReadaheadSequentialFileTest : public testing::Test,
   ReadaheadSequentialFileTest() {}
   std::string Read(size_t n) {
     Slice result;
-    Status s = test_read_holder_->Read(n, &result, scratch_.get());
+    Status s = test_read_holder_->Read(
+        n, &result, scratch_.get(), Env::IO_TOTAL /* rate_limiter_priority */);
     EXPECT_TRUE(s.ok() || s.IsInvalidArgument());
     return std::string(result.data(), result.size());
   }
@@ -724,10 +725,11 @@ TEST(LineFileReaderTest, LineFileReaderTest) {
   {
     std::unique_ptr<LineFileReader> reader;
     ASSERT_OK(LineFileReader::Create(fs, "testfile", FileOptions(), &reader,
-                                     nullptr));
+                                     nullptr /* dbg */,
+                                     nullptr /* rate_limiter */));
     std::string line;
     int count = 0;
-    while (reader->ReadLine(&line)) {
+    while (reader->ReadLine(&line, Env::IO_TOTAL /* rate_limiter_priority */)) {
       ASSERT_EQ(line, GenerateLine(count));
       ++count;
       ASSERT_EQ(static_cast<int>(reader->GetLineNumber()), count);
@@ -736,7 +738,8 @@ TEST(LineFileReaderTest, LineFileReaderTest) {
     ASSERT_EQ(count, nlines);
     ASSERT_EQ(static_cast<int>(reader->GetLineNumber()), count);
     // And still
-    ASSERT_FALSE(reader->ReadLine(&line));
+    ASSERT_FALSE(
+        reader->ReadLine(&line, Env::IO_TOTAL /* rate_limiter_priority */));
     ASSERT_OK(reader->GetStatus());
     ASSERT_EQ(static_cast<int>(reader->GetLineNumber()), count);
   }
@@ -745,12 +748,14 @@ TEST(LineFileReaderTest, LineFileReaderTest) {
   {
     std::unique_ptr<LineFileReader> reader;
     ASSERT_OK(LineFileReader::Create(fs, "testfile", FileOptions(), &reader,
-                                     nullptr));
+                                     nullptr /* dbg */,
+                                     nullptr /* rate_limiter */));
     std::string line;
     int count = 0;
     // Read part way through the file
     while (count < nlines / 4) {
-      ASSERT_TRUE(reader->ReadLine(&line));
+      ASSERT_TRUE(
+          reader->ReadLine(&line, Env::IO_TOTAL /* rate_limiter_priority */));
       ASSERT_EQ(line, GenerateLine(count));
       ++count;
      ASSERT_EQ(static_cast<int>(reader->GetLineNumber()), count);
@@ -767,7 +772,7 @@ TEST(LineFileReaderTest, LineFileReaderTest) {
     });
     SyncPoint::GetInstance()->EnableProcessing();

-    while (reader->ReadLine(&line)) {
+    while (reader->ReadLine(&line, Env::IO_TOTAL /* rate_limiter_priority */)) {
       ASSERT_EQ(line, GenerateLine(count));
       ++count;
       ASSERT_EQ(static_cast<int>(reader->GetLineNumber()), count);
     }
     ASSERT_EQ(callback_count, 1);

     // Still get error & no retry
-    ASSERT_FALSE(reader->ReadLine(&line));
+    ASSERT_FALSE(
+        reader->ReadLine(&line, Env::IO_TOTAL /* rate_limiter_priority */));
     ASSERT_TRUE(reader->GetStatus().IsCorruption());
     ASSERT_EQ(callback_count, 1);
diff --git a/utilities/backup/backup_engine.cc b/utilities/backup/backup_engine.cc
index 9771e2cb0..2681a65ca 100644
--- a/utilities/backup/backup_engine.cc
+++ b/utilities/backup/backup_engine.cc
@@ -1922,7 +1922,7 @@ IOStatus BackupEngineImpl::RestoreDBFromBackup(
         GetAbsolutePath(file), dst, Temperature::kUnknown /* src_temp */,
         file_info->temp, "" /* contents */, backup_env_, db_env_,
         EnvOptions() /* src_env_options */, options_.sync,
-        options_.restore_rate_limiter.get(), 0 /* size_limit */,
+        options_.restore_rate_limiter.get(), file_info->size,
         nullptr /* stats */);
     RestoreAfterCopyOrCreateWorkItem after_copy_or_create_work_item(
         copy_or_create_work_item.result.get_future(), file, dst,
@@ -2104,7 +2104,8 @@ IOStatus BackupEngineImpl::CopyOrCreateFile(
     // Return back current temperature in FileSystem
     *src_temperature = src_file->GetTemperature();

-    src_reader.reset(new SequentialFileReader(std::move(src_file), src));
+    src_reader.reset(new SequentialFileReader(
+        std::move(src_file), src, nullptr /* io_tracer */, {}, rate_limiter));
     buf.reset(new char[buf_size]);
   }

@@ -2116,11 +2117,8 @@ IOStatus BackupEngineImpl::CopyOrCreateFile(
     if (!src.empty()) {
       size_t buffer_to_read =
           (buf_size < size_limit) ? buf_size : static_cast<size_t>(size_limit);
-      io_s = src_reader->Read(buffer_to_read, &data, buf.get());
-      if (rate_limiter != nullptr) {
-        rate_limiter->Request(data.size(), Env::IO_LOW, nullptr /* stats */,
-                              RateLimiter::OpType::kRead);
-      }
+      io_s = src_reader->Read(buffer_to_read, &data, buf.get(),
+                              Env::IO_LOW /* rate_limiter_priority */);
       *bytes_toward_next_callback += data.size();
     } else {
       data = contents;
@@ -2421,16 +2419,14 @@ IOStatus BackupEngineImpl::ReadFileAndComputeChecksum(
   std::unique_ptr<SequentialFileReader> src_reader;
   auto file_options = FileOptions(src_env_options);
   file_options.temperature = src_temperature;
-  IOStatus io_s = SequentialFileReader::Create(src_fs, src, file_options,
-                                               &src_reader, nullptr);
+  RateLimiter* rate_limiter = options_.backup_rate_limiter.get();
+  IOStatus io_s = SequentialFileReader::Create(
+      src_fs, src, file_options, &src_reader, nullptr /* dbg */, rate_limiter);
   if (!io_s.ok()) {
     return io_s;
   }

-  RateLimiter* rate_limiter = options_.backup_rate_limiter.get();
-  size_t buf_size =
-      rate_limiter ? static_cast<size_t>(rate_limiter->GetSingleBurstBytes())
-                   : kDefaultCopyFileBufferSize;
+  size_t buf_size = kDefaultCopyFileBufferSize;
   std::unique_ptr<char[]> buf(new char[buf_size]);
   Slice data;

@@ -2440,11 +2436,8 @@ IOStatus BackupEngineImpl::ReadFileAndComputeChecksum(
     }
     size_t buffer_to_read =
        (buf_size < size_limit) ? buf_size : static_cast<size_t>(size_limit);
-    io_s = src_reader->Read(buffer_to_read, &data, buf.get());
-    if (rate_limiter != nullptr) {
-      rate_limiter->Request(data.size(), Env::IO_LOW, nullptr /* stats */,
-                            RateLimiter::OpType::kRead);
-    }
+    io_s = src_reader->Read(buffer_to_read, &data, buf.get(),
+                            Env::IO_LOW /* rate_limiter_priority */);
     if (!io_s.ok()) {
       return io_s;
     }
@@ -2847,7 +2840,8 @@ IOStatus BackupEngineImpl::BackupMeta::LoadFromFile(
   std::unique_ptr<LineFileReader> backup_meta_reader;
   {
     IOStatus io_s = LineFileReader::Create(fs_, meta_filename_, FileOptions(),
-                                           &backup_meta_reader, nullptr);
+                                           &backup_meta_reader,
+                                           nullptr /* dbg */, rate_limiter);
     if (!io_s.ok()) {
       return io_s;
     }
@@ -2859,12 +2853,8 @@ IOStatus BackupEngineImpl::BackupMeta::LoadFromFile(

   // Failures handled at the end
   std::string line;
-  if (backup_meta_reader->ReadLine(&line)) {
-    if (rate_limiter != nullptr) {
-      LoopRateLimitRequestHelper(line.size(), rate_limiter, Env::IO_LOW,
-                                 nullptr /* stats */,
-                                 RateLimiter::OpType::kRead);
-    }
+  if (backup_meta_reader->ReadLine(&line,
+                                   Env::IO_LOW /* rate_limiter_priority */)) {
     if (StartsWith(line, kSchemaVersionPrefix)) {
       std::string ver = line.substr(kSchemaVersionPrefix.size());
       if (ver == "2" || StartsWith(ver, "2.")) {
@@ -2880,29 +2870,17 @@ IOStatus BackupEngineImpl::BackupMeta::LoadFromFile(
   }
   if (!line.empty()) {
     timestamp_ = std::strtoull(line.c_str(), nullptr, /*base*/ 10);
-  } else if (backup_meta_reader->ReadLine(&line)) {
-    if (rate_limiter != nullptr) {
-      LoopRateLimitRequestHelper(line.size(), rate_limiter, Env::IO_LOW,
-                                 nullptr /* stats */,
-                                 RateLimiter::OpType::kRead);
-    }
+  } else if (backup_meta_reader->ReadLine(
+                 &line, Env::IO_LOW /* rate_limiter_priority */)) {
     timestamp_ = std::strtoull(line.c_str(), nullptr, /*base*/ 10);
   }
-  if (backup_meta_reader->ReadLine(&line)) {
-    if (rate_limiter != nullptr) {
-      LoopRateLimitRequestHelper(line.size(), rate_limiter, Env::IO_LOW,
-                                 nullptr /* stats */,
-                                 RateLimiter::OpType::kRead);
-    }
+  if (backup_meta_reader->ReadLine(&line,
+                                   Env::IO_LOW /* rate_limiter_priority */)) {
     sequence_number_ = std::strtoull(line.c_str(), nullptr, /*base*/ 10);
   }
   uint32_t num_files = UINT32_MAX;
-  while (backup_meta_reader->ReadLine(&line)) {
-    if (rate_limiter != nullptr) {
-      LoopRateLimitRequestHelper(line.size(), rate_limiter, Env::IO_LOW,
-                                 nullptr /* stats */,
-                                 RateLimiter::OpType::kRead);
-    }
+  while (backup_meta_reader->ReadLine(
+      &line, Env::IO_LOW /* rate_limiter_priority */)) {
     if (line.empty()) {
       return IOStatus::Corruption("Unexpected empty line");
     }
@@ -2941,12 +2919,8 @@ IOStatus BackupEngineImpl::BackupMeta::LoadFromFile(
   }
   std::vector<std::shared_ptr<FileInfo>> files;
   bool footer_present = false;
-  while (backup_meta_reader->ReadLine(&line)) {
-    if (rate_limiter != nullptr) {
-      LoopRateLimitRequestHelper(line.size(), rate_limiter, Env::IO_LOW,
-                                 nullptr /* stats */,
-                                 RateLimiter::OpType::kRead);
-    }
+  while (backup_meta_reader->ReadLine(
+      &line, Env::IO_LOW /* rate_limiter_priority */)) {
     std::vector<std::string> components = StringSplit(line, ' ');

     if (components.size() < 1) {
@@ -3046,12 +3020,8 @@ IOStatus BackupEngineImpl::BackupMeta::LoadFromFile(
   if (footer_present) {
     assert(schema_major_version >= 2);
-    while (backup_meta_reader->ReadLine(&line)) {
-      if (rate_limiter != nullptr) {
-        LoopRateLimitRequestHelper(line.size(), rate_limiter, Env::IO_LOW,
-                                   nullptr /* stats */,
-                                   RateLimiter::OpType::kRead);
-      }
+    while (backup_meta_reader->ReadLine(
+        &line, Env::IO_LOW /* rate_limiter_priority */)) {
       if (line.empty()) {
         return
IOStatus::Corruption("Unexpected empty line"); }