diff --git a/HISTORY.md b/HISTORY.md index dc3f1b2b4..99c5b8b8d 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -19,6 +19,7 @@ ### Behavior Changes * `NUM_FILES_IN_SINGLE_COMPACTION` was only counting the first input level files, now it's including all input files. * `TransactionUtil::CheckKeyForConflicts` can also perform conflict-checking based on user-defined timestamps in addition to sequence numbers. +* Removed the minimum refill bytes per period (100 bytes) that `GenericRateLimiter` previously enforced. ### Public Interface Change * When options.ttl is used with leveled compaction with compactinon priority kMinOverlappingRatio, files exceeding half of TTL value will be prioritized more, so that by the time TTL is reached, fewer extra compactions will be scheduled to clear them up. At the same time, when compacting files with data older than half of TTL, output files may be cut off based on those files' boundaries, in order for the early TTL compaction to work properly. @@ -45,6 +46,7 @@ * Fixed a bug where stalled writes would remain stalled forever after the user calls `WriteBufferManager::SetBufferSize()` with `new_size == 0` to dynamically disable memory limiting. * Make `DB::close()` thread-safe. * Fix a bug in atomic flush where one bg flush thread will wait forever for a preceding bg flush thread to commit its result to MANIFEST but encounters an error which is mapped to a soft error (DB not stopped). +* Fixed a bug in `BackupEngine` where some internal callers of `GenericRateLimiter::Request()` did not honor the requirement `bytes <= GetSingleBurstBytes()`. ### New Features * Print information about blob files when using "ldb list_live_files_metadata" diff --git a/util/rate_limiter.cc b/util/rate_limiter.cc index f7f199625..c59a05c54 100644 --- a/util/rate_limiter.cc +++ b/util/rate_limiter.cc @@ -302,8 +302,7 @@ int64_t GenericRateLimiter::CalculateRefillBytesPerPeriod( // inaccurate but is a number that is large enough. 
return port::kMaxInt64 / 1000000; } else { - return std::max(kMinRefillBytesPerPeriod, - rate_bytes_per_sec * refill_period_us_ / 1000000); + return rate_bytes_per_sec * refill_period_us_ / 1000000; } } diff --git a/util/rate_limiter.h b/util/rate_limiter.h index dbdb1eed2..3752bf88d 100644 --- a/util/rate_limiter.h +++ b/util/rate_limiter.h @@ -106,8 +106,6 @@ class GenericRateLimiter : public RateLimiter { // This mutex guard all internal states mutable port::Mutex request_mutex_; - const int64_t kMinRefillBytesPerPeriod = 100; - const int64_t refill_period_us_; int64_t rate_bytes_per_sec_; diff --git a/utilities/backupable/backupable_db.cc b/utilities/backupable/backupable_db.cc index e49525c5c..06c9ff3ec 100644 --- a/utilities/backupable/backupable_db.cc +++ b/utilities/backupable/backupable_db.cc @@ -35,7 +35,9 @@ #include "logging/logging.h" #include "monitoring/iostats_context_imp.h" #include "port/port.h" +#include "rocksdb/env.h" #include "rocksdb/rate_limiter.h" +#include "rocksdb/statistics.h" #include "rocksdb/transaction_log.h" #include "table/sst_file_dumper.h" #include "test_util/sync_point.h" @@ -232,6 +234,12 @@ class BackupEngineImpl { } }; + static void LoopRateLimitRequestHelper(const size_t total_bytes_to_request, + RateLimiter* rate_limiter, + const Env::IOPriority pri, + Statistics* stats, + const RateLimiter::OpType op_type); + static inline std::string WithoutTrailingSlash(const std::string& path) { if (path.empty() || path.back() != '/') { return path; @@ -2022,9 +2030,16 @@ IOStatus BackupEngineImpl::CopyOrCreateFile( checksum_value = crc32c::Extend(checksum_value, data.data(), data.size()); } io_s = dest_writer->Append(data); + if (rate_limiter != nullptr) { - rate_limiter->Request(data.size(), Env::IO_LOW, nullptr /* stats */, - RateLimiter::OpType::kWrite); + if (!src.empty()) { + rate_limiter->Request(data.size(), Env::IO_LOW, nullptr /* stats */, + RateLimiter::OpType::kWrite); + } else { + LoopRateLimitRequestHelper(data.size(), 
rate_limiter, Env::IO_LOW,
+                                   nullptr /* stats */,
+                                   RateLimiter::OpType::kWrite);
+      }
     }
     while (*bytes_toward_next_callback >=
            options_.callback_trigger_interval_size) {
@@ -2358,8 +2373,9 @@ Status BackupEngineImpl::GetFileDbIdentities(Env* src_env,
         // sizeof(*table_properties) is a sufficent but far-from-exact
         // approximation of read bytes due to metaindex block, std::string
         // properties and varint compression
-        rate_limiter->Request(sizeof(*table_properties), Env::IO_LOW,
-                              nullptr /* stats */, RateLimiter::OpType::kRead);
+        LoopRateLimitRequestHelper(sizeof(*table_properties), rate_limiter,
+                                   Env::IO_LOW, nullptr /* stats */,
+                                   RateLimiter::OpType::kRead);
       }
     }
   } else {
@@ -2388,6 +2404,31 @@ Status BackupEngineImpl::GetFileDbIdentities(Env* src_env,
   }
 }
 
+// Charges `total_bytes_to_request` bytes against `rate_limiter` in chunks of
+// at most `rate_limiter->GetSingleBurstBytes()` each, so that no single
+// `RateLimiter::Request()` call exceeds the limiter's single-burst size.
+// `pri`, `stats` and `op_type` are forwarded unchanged to every `Request()`.
+void BackupEngineImpl::LoopRateLimitRequestHelper(
+    const size_t total_bytes_to_request, RateLimiter* rate_limiter,
+    const Env::IOPriority pri, Statistics* stats,
+    const RateLimiter::OpType op_type) {
+  assert(rate_limiter != nullptr);
+  size_t remaining_bytes = total_bytes_to_request;
+  while (remaining_bytes > 0) {
+    const int64_t single_burst_bytes = rate_limiter->GetSingleBurstBytes();
+    if (single_burst_bytes <= 0) {
+      // A zero burst size would produce empty chunks and loop forever, and a
+      // negative one would wrap when cast to size_t; stop charging in either
+      // case instead of hanging the backup/restore operation.
+      break;
+    }
+    const size_t request_bytes =
+        std::min(static_cast<size_t>(single_burst_bytes), remaining_bytes);
+    rate_limiter->Request(request_bytes, pri, stats, op_type);
+    remaining_bytes -= request_bytes;
+  }
+}
+
 void BackupEngineImpl::DeleteChildren(const std::string& dir,
                                       uint32_t file_type_filter) const {
   std::vector<std::string> children;
@@ -2716,8 +2757,9 @@ IOStatus BackupEngineImpl::BackupMeta::LoadFromFile(
   std::string line;
   if (backup_meta_reader->ReadLine(&line)) {
     if (rate_limiter != nullptr) {
-      rate_limiter->Request(line.size(), Env::IO_LOW, nullptr /* stats */,
-                            RateLimiter::OpType::kRead);
+      LoopRateLimitRequestHelper(line.size(), rate_limiter, Env::IO_LOW,
+                                 nullptr /* stats */,
+                                 RateLimiter::OpType::kRead);
     }
     if (StartsWith(line, kSchemaVersionPrefix)) {
       std::string ver = line.substr(kSchemaVersionPrefix.size());
@@ -2736,23 +2778,26 @@ IOStatus 
BackupEngineImpl::BackupMeta::LoadFromFile( timestamp_ = std::strtoull(line.c_str(), nullptr, /*base*/ 10); } else if (backup_meta_reader->ReadLine(&line)) { if (rate_limiter != nullptr) { - rate_limiter->Request(line.size(), Env::IO_LOW, nullptr /* stats */, - RateLimiter::OpType::kRead); + LoopRateLimitRequestHelper(line.size(), rate_limiter, Env::IO_LOW, + nullptr /* stats */, + RateLimiter::OpType::kRead); } timestamp_ = std::strtoull(line.c_str(), nullptr, /*base*/ 10); } if (backup_meta_reader->ReadLine(&line)) { if (rate_limiter != nullptr) { - rate_limiter->Request(line.size(), Env::IO_LOW, nullptr /* stats */, - RateLimiter::OpType::kRead); + LoopRateLimitRequestHelper(line.size(), rate_limiter, Env::IO_LOW, + nullptr /* stats */, + RateLimiter::OpType::kRead); } sequence_number_ = std::strtoull(line.c_str(), nullptr, /*base*/ 10); } uint32_t num_files = UINT32_MAX; while (backup_meta_reader->ReadLine(&line)) { if (rate_limiter != nullptr) { - rate_limiter->Request(line.size(), Env::IO_LOW, nullptr /* stats */, - RateLimiter::OpType::kRead); + LoopRateLimitRequestHelper(line.size(), rate_limiter, Env::IO_LOW, + nullptr /* stats */, + RateLimiter::OpType::kRead); } if (line.empty()) { return IOStatus::Corruption("Unexpected empty line"); @@ -2794,8 +2830,9 @@ IOStatus BackupEngineImpl::BackupMeta::LoadFromFile( bool footer_present = false; while (backup_meta_reader->ReadLine(&line)) { if (rate_limiter != nullptr) { - rate_limiter->Request(line.size(), Env::IO_LOW, nullptr /* stats */, - RateLimiter::OpType::kRead); + LoopRateLimitRequestHelper(line.size(), rate_limiter, Env::IO_LOW, + nullptr /* stats */, + RateLimiter::OpType::kRead); } std::vector components = StringSplit(line, ' '); @@ -2885,8 +2922,9 @@ IOStatus BackupEngineImpl::BackupMeta::LoadFromFile( assert(schema_major_version >= 2); while (backup_meta_reader->ReadLine(&line)) { if (rate_limiter != nullptr) { - rate_limiter->Request(line.size(), Env::IO_LOW, nullptr /* stats */, - 
RateLimiter::OpType::kRead); + LoopRateLimitRequestHelper(line.size(), rate_limiter, Env::IO_LOW, + nullptr /* stats */, + RateLimiter::OpType::kRead); } if (line.empty()) { return IOStatus::Corruption("Unexpected empty line"); diff --git a/utilities/backupable/backupable_db_test.cc b/utilities/backupable/backupable_db_test.cc index dca39c348..0c2ff66b1 100644 --- a/utilities/backupable/backupable_db_test.cc +++ b/utilities/backupable/backupable_db_test.cc @@ -13,20 +13,25 @@ #include #include +#include #include #include #include +#include #include #include #include #include "db/db_impl/db_impl.h" +#include "db/db_test_util.h" #include "env/env_chroot.h" #include "file/filename.h" #include "port/port.h" #include "port/stack_trace.h" +#include "rocksdb/env.h" #include "rocksdb/file_checksum.h" #include "rocksdb/rate_limiter.h" +#include "rocksdb/statistics.h" #include "rocksdb/transaction_log.h" #include "rocksdb/types.h" #include "rocksdb/utilities/options_util.h" @@ -36,6 +41,7 @@ #include "util/cast_util.h" #include "util/mutexlock.h" #include "util/random.h" +#include "util/rate_limiter.h" #include "util/stderr_logger.h" #include "util/string_util.h" #include "utilities/backupable/backupable_db_impl.h" @@ -2803,6 +2809,119 @@ TEST_P(BackupEngineRateLimitingTestWithParam, CloseDBAndBackupEngine(); DestroyDB(dbname_, Options()); } + +class BackupEngineRateLimitingTestWithParam2 + : public BackupEngineTest, + public testing::WithParamInterface< + std::tuple /* limits */>> { + public: + BackupEngineRateLimitingTestWithParam2() {} +}; + +INSTANTIATE_TEST_CASE_P( + LowRefillBytesPerPeriod, BackupEngineRateLimitingTestWithParam2, + ::testing::Values(std::make_tuple(std::make_pair(1, 1)))); +// To verify we don't request over-sized bytes relative to +// refill_bytes_per_period_ in each RateLimiter::Request() called in +// BackupEngine through verifying we don't trigger assertion +// failure on over-sized request in GenericRateLimiter in debug builds 
+TEST_P(BackupEngineRateLimitingTestWithParam2, + RateLimitingWithLowRefillBytesPerPeriod) { + SpecialEnv special_env(Env::Default(), /*time_elapse_only_sleep*/ true); + + backupable_options_->max_background_operations = 1; + const uint64_t backup_rate_limiter_limit = std::get<0>(GetParam()).first; + std::shared_ptr backup_rate_limiter( + std::make_shared( + backup_rate_limiter_limit, 1000 * 1000 /* refill_period_us */, + 10 /* fairness */, RateLimiter::Mode::kAllIo /* mode */, + special_env.GetSystemClock(), false /* auto_tuned */)); + + backupable_options_->backup_rate_limiter = backup_rate_limiter; + + const uint64_t restore_rate_limiter_limit = std::get<0>(GetParam()).second; + std::shared_ptr restore_rate_limiter( + std::make_shared( + restore_rate_limiter_limit, 1000 * 1000 /* refill_period_us */, + 10 /* fairness */, RateLimiter::Mode::kAllIo /* mode */, + special_env.GetSystemClock(), false /* auto_tuned */)); + + backupable_options_->restore_rate_limiter = restore_rate_limiter; + + // Rate limiter uses `CondVar::TimedWait()`, which does not have access to the + // `Env` to advance its time according to the fake wait duration. The + // workaround is to install a callback that advance the `Env`'s mock time. 
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "GenericRateLimiter::Request:PostTimedWait", [&](void* arg) { + int64_t time_waited_us = *static_cast(arg); + special_env.SleepForMicroseconds(static_cast(time_waited_us)); + }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + + DestroyDB(dbname_, Options()); + OpenDBAndBackupEngine(true /* destroy_old_data */, false /* dummy */, + kShareWithChecksum /* shared_option */); + + FillDB(db_.get(), 0, 100); + int64_t total_bytes_through_before_backup = + backupable_options_->backup_rate_limiter->GetTotalBytesThrough(); + EXPECT_OK(backup_engine_->CreateNewBackup(db_.get(), + false /* flush_before_backup */)); + int64_t total_bytes_through_after_backup = + backupable_options_->backup_rate_limiter->GetTotalBytesThrough(); + ASSERT_GT(total_bytes_through_after_backup, + total_bytes_through_before_backup); + + std::vector backup_infos; + BackupInfo backup_info; + backup_engine_->GetBackupInfo(&backup_infos); + ASSERT_EQ(1, backup_infos.size()); + const int backup_id = 1; + ASSERT_EQ(backup_id, backup_infos[0].backup_id); + ASSERT_OK(backup_engine_->GetBackupInfo(backup_id, &backup_info, + true /* include_file_details */)); + int64_t total_bytes_through_before_verify_backup = + backupable_options_->backup_rate_limiter->GetTotalBytesThrough(); + EXPECT_OK( + backup_engine_->VerifyBackup(backup_id, true /* verify_with_checksum */)); + int64_t total_bytes_through_after_verify_backup = + backupable_options_->backup_rate_limiter->GetTotalBytesThrough(); + ASSERT_GT(total_bytes_through_after_verify_backup, + total_bytes_through_before_verify_backup); + + CloseDBAndBackupEngine(); + AssertBackupConsistency(backup_id, 0, 100, 101); + + int64_t total_bytes_through_before_initialize = + backupable_options_->backup_rate_limiter->GetTotalBytesThrough(); + OpenDBAndBackupEngine(false /* destroy_old_data */); + // We charge read in BackupEngineImpl::BackupMeta::LoadFromFile, + // which is called in 
BackupEngineImpl::Initialize() during + // OpenBackupEngine(false) + int64_t total_bytes_through_after_initialize = + backupable_options_->backup_rate_limiter->GetTotalBytesThrough(); + ASSERT_GT(total_bytes_through_after_initialize, + total_bytes_through_before_initialize); + CloseDBAndBackupEngine(); + + DestroyDB(dbname_, Options()); + OpenBackupEngine(false /* destroy_old_data */); + int64_t total_bytes_through_before_restore = + backupable_options_->restore_rate_limiter->GetTotalBytesThrough(); + EXPECT_OK(backup_engine_->RestoreDBFromLatestBackup(dbname_, dbname_)); + int64_t total_bytes_through_after_restore = + backupable_options_->restore_rate_limiter->GetTotalBytesThrough(); + ASSERT_GT(total_bytes_through_after_restore, + total_bytes_through_before_restore); + CloseBackupEngine(); + + DestroyDB(dbname_, Options()); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearCallBack( + "GenericRateLimiter::Request:PostTimedWait"); +} + #endif // !defined(ROCKSDB_VALGRIND_RUN) || defined(ROCKSDB_FULL_VALGRIND_RUN) TEST_F(BackupEngineTest, ReadOnlyBackupEngine) {