From cff7819dff824b6935c5385dcefde769529e31b6 Mon Sep 17 00:00:00 2001 From: Hui Xiao Date: Tue, 16 Nov 2021 09:50:59 -0800 Subject: [PATCH] Fix BackupEngine's internal callers of GenericRateLimiter::Request() not honoring bytes <= GetSingleBurstBytes() (#9063) Summary: **Context:** Some existing internal calls of `GenericRateLimiter::Request()` in backupable_db.cc and newly added internal calls in https://github.com/facebook/rocksdb/pull/8722/ do not make sure `bytes <= GetSingleBurstBytes()` as required by rate_limiter https://github.com/facebook/rocksdb/blob/master/include/rocksdb/rate_limiter.h#L47. **Impacts of this bug include:** (1) In debug build, when `GenericRateLimiter::Request()` requests bytes greater than `GenericRateLimiter:: kMinRefillBytesPerPeriod = 100` byte, process will crash due to assertion failure. See https://github.com/facebook/rocksdb/pull/9063#discussion_r737034133 and for possible scenario (2) In production build, although there will not be the above crash due to disabled assertion, the bug can lead to a request of small bytes being blocked for a long time by a request of same priority with insanely large bytes from a different thread. See updated https://github.com/facebook/rocksdb/wiki/Rate-Limiter ("Notice that although....the maximum bytes that can be granted in a single request have to be bounded...") for more info. There is an on-going effort to move rate-limiting to file wrapper level so rate limiting in `BackupEngine` and this PR might be made obsolete in the future. **Summary:** - Implemented loop-calling `GenericRateLimiter::Request()` with `bytes <= GetSingleBurstBytes()` as a static private helper function `BackupEngineImpl::LoopRateLimitRequestHelper` -- Considering make this a util function in `RateLimiter` later or do something with `RateLimiter::RequestToken()` - Replaced buggy internal callers with this helper function wherever requested byte is not pre-limited by `GetSingleBurstBytes()` - Removed the minimum refill bytes per period enforced by `GenericRateLimiter` since it is useless and prevents testing `GenericRateLimiter` for extreme case with small refill bytes per period. Pull Request resolved: https://github.com/facebook/rocksdb/pull/9063 Test Plan: - Added a new test that failed the assertion before this change and now passes - It exposed bugs in [the write during creation in `CopyOrCreateFile()`](https://github.com/facebook/rocksdb/blob/df7cc66e171dfa665e34d293717242784195e1da/utilities/backupable/backupable_db.cc#L2034-L2043), [the read of table properties in `GetFileDbIdentities()`](https://github.com/facebook/rocksdb/blob/df7cc66e171dfa665e34d293717242784195e1da/utilities/backupable/backupable_db.cc#L2372-L2378), [some read of metadata in `BackupMeta::LoadFromFile()`](https://github.com/facebook/rocksdb/blob/df7cc66e171dfa665e34d293717242784195e1da/utilities/backupable/backupable_db.cc#L2726) - Passing Existing tests Reviewed By: ajkr Differential Revision: D31824535 Pulled By: hx235 fbshipit-source-id: d2b3dea7a64e2a4b1e6a59fca322f0800a4fcbcc --- HISTORY.md | 2 + util/rate_limiter.cc | 3 +- util/rate_limiter.h | 2 - utilities/backupable/backupable_db.cc | 70 +++++++++--- utilities/backupable/backupable_db_test.cc | 119 +++++++++++++++++++++ 5 files changed, 176 insertions(+), 20 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index dc3f1b2b4..99c5b8b8d 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -19,6 +19,7 @@ ### Behavior Changes * `NUM_FILES_IN_SINGLE_COMPACTION` was only counting the first input level files, now it's including all input files. * `TransactionUtil::CheckKeyForConflicts` can also perform conflict-checking based on user-defined timestamps in addition to sequence numbers. +* Removed `GenericRateLimiter`'s minimum refill bytes per period previously enforced. ### Public Interface Change * When options.ttl is used with leveled compaction with compactinon priority kMinOverlappingRatio, files exceeding half of TTL value will be prioritized more, so that by the time TTL is reached, fewer extra compactions will be scheduled to clear them up. At the same time, when compacting files with data older than half of TTL, output files may be cut off based on those files' boundaries, in order for the early TTL compaction to work properly. @@ -45,6 +46,7 @@ * Fixed a bug where stalled writes would remain stalled forever after the user calls `WriteBufferManager::SetBufferSize()` with `new_size == 0` to dynamically disable memory limiting. * Make `DB::close()` thread-safe. * Fix a bug in atomic flush where one bg flush thread will wait forever for a preceding bg flush thread to commit its result to MANIFEST but encounters an error which is mapped to a soft error (DB not stopped). +* Fix a bug in `BackupEngine` where some internal callers of `GenericRateLimiter::Request()` do not honor `bytes <= GetSingleBurstBytes()`. ### New Features * Print information about blob files when using "ldb list_live_files_metadata" diff --git a/util/rate_limiter.cc b/util/rate_limiter.cc index f7f199625..c59a05c54 100644 --- a/util/rate_limiter.cc +++ b/util/rate_limiter.cc @@ -302,8 +302,7 @@ int64_t GenericRateLimiter::CalculateRefillBytesPerPeriod( // inaccurate but is a number that is large enough. return port::kMaxInt64 / 1000000; } else { - return std::max(kMinRefillBytesPerPeriod, - rate_bytes_per_sec * refill_period_us_ / 1000000); + return rate_bytes_per_sec * refill_period_us_ / 1000000; } } diff --git a/util/rate_limiter.h b/util/rate_limiter.h index dbdb1eed2..3752bf88d 100644 --- a/util/rate_limiter.h +++ b/util/rate_limiter.h @@ -106,8 +106,6 @@ class GenericRateLimiter : public RateLimiter { // This mutex guard all internal states mutable port::Mutex request_mutex_; - const int64_t kMinRefillBytesPerPeriod = 100; - const int64_t refill_period_us_; int64_t rate_bytes_per_sec_; diff --git a/utilities/backupable/backupable_db.cc b/utilities/backupable/backupable_db.cc index e49525c5c..06c9ff3ec 100644 --- a/utilities/backupable/backupable_db.cc +++ b/utilities/backupable/backupable_db.cc @@ -35,7 +35,9 @@ #include "logging/logging.h" #include "monitoring/iostats_context_imp.h" #include "port/port.h" +#include "rocksdb/env.h" #include "rocksdb/rate_limiter.h" +#include "rocksdb/statistics.h" #include "rocksdb/transaction_log.h" #include "table/sst_file_dumper.h" #include "test_util/sync_point.h" @@ -232,6 +234,12 @@ class BackupEngineImpl { } }; + static void LoopRateLimitRequestHelper(const size_t total_bytes_to_request, + RateLimiter* rate_limiter, + const Env::IOPriority pri, + Statistics* stats, + const RateLimiter::OpType op_type); + static inline std::string WithoutTrailingSlash(const std::string& path) { if (path.empty() || path.back() != '/') { return path; @@ -2022,9 +2030,16 @@ IOStatus BackupEngineImpl::CopyOrCreateFile( checksum_value = crc32c::Extend(checksum_value, data.data(), data.size()); } io_s = dest_writer->Append(data); + if (rate_limiter != nullptr) { - rate_limiter->Request(data.size(), Env::IO_LOW, nullptr /* stats */, - RateLimiter::OpType::kWrite); + if (!src.empty()) { + rate_limiter->Request(data.size(), Env::IO_LOW, nullptr /* stats */, + RateLimiter::OpType::kWrite); + } else { + LoopRateLimitRequestHelper(data.size(), rate_limiter, Env::IO_LOW, + nullptr /* stats */, + RateLimiter::OpType::kWrite); + } } while (*bytes_toward_next_callback >= options_.callback_trigger_interval_size) { @@ -2358,8 +2373,9 @@ Status BackupEngineImpl::GetFileDbIdentities(Env* src_env, // sizeof(*table_properties) is a sufficent but far-from-exact // approximation of read bytes due to metaindex block, std::string // properties and varint compression - rate_limiter->Request(sizeof(*table_properties), Env::IO_LOW, - nullptr /* stats */, RateLimiter::OpType::kRead); + LoopRateLimitRequestHelper(sizeof(*table_properties), rate_limiter, + Env::IO_LOW, nullptr /* stats */, + RateLimiter::OpType::kRead); } } } else { @@ -2388,6 +2404,22 @@ Status BackupEngineImpl::GetFileDbIdentities(Env* src_env, } } +void BackupEngineImpl::LoopRateLimitRequestHelper( + const size_t total_bytes_to_request, RateLimiter* rate_limiter, + const Env::IOPriority pri, Statistics* stats, + const RateLimiter::OpType op_type) { + assert(rate_limiter != nullptr); + size_t remaining_bytes = total_bytes_to_request; + size_t request_bytes = 0; + while (remaining_bytes > 0) { + request_bytes = + std::min(static_cast(rate_limiter->GetSingleBurstBytes()), + remaining_bytes); + rate_limiter->Request(request_bytes, pri, stats, op_type); + remaining_bytes -= request_bytes; + } +} + void BackupEngineImpl::DeleteChildren(const std::string& dir, uint32_t file_type_filter) const { std::vector children; @@ -2716,8 +2748,9 @@ IOStatus BackupEngineImpl::BackupMeta::LoadFromFile( std::string line; if (backup_meta_reader->ReadLine(&line)) { if (rate_limiter != nullptr) { - rate_limiter->Request(line.size(), Env::IO_LOW, nullptr /* stats */, - RateLimiter::OpType::kRead); + LoopRateLimitRequestHelper(line.size(), rate_limiter, Env::IO_LOW, + nullptr /* stats */, + RateLimiter::OpType::kRead); } if (StartsWith(line, kSchemaVersionPrefix)) { std::string ver = line.substr(kSchemaVersionPrefix.size()); @@ -2736,23 +2769,26 @@ IOStatus BackupEngineImpl::BackupMeta::LoadFromFile( timestamp_ = std::strtoull(line.c_str(), nullptr, /*base*/ 10); } else if (backup_meta_reader->ReadLine(&line)) { if (rate_limiter != nullptr) { - rate_limiter->Request(line.size(), Env::IO_LOW, nullptr /* stats */, - RateLimiter::OpType::kRead); + LoopRateLimitRequestHelper(line.size(), rate_limiter, Env::IO_LOW, + nullptr /* stats */, + RateLimiter::OpType::kRead); } timestamp_ = std::strtoull(line.c_str(), nullptr, /*base*/ 10); } if (backup_meta_reader->ReadLine(&line)) { if (rate_limiter != nullptr) { - rate_limiter->Request(line.size(), Env::IO_LOW, nullptr /* stats */, - RateLimiter::OpType::kRead); + LoopRateLimitRequestHelper(line.size(), rate_limiter, Env::IO_LOW, + nullptr /* stats */, + RateLimiter::OpType::kRead); } sequence_number_ = std::strtoull(line.c_str(), nullptr, /*base*/ 10); } uint32_t num_files = UINT32_MAX; while (backup_meta_reader->ReadLine(&line)) { if (rate_limiter != nullptr) { - rate_limiter->Request(line.size(), Env::IO_LOW, nullptr /* stats */, - RateLimiter::OpType::kRead); + LoopRateLimitRequestHelper(line.size(), rate_limiter, Env::IO_LOW, + nullptr /* stats */, + RateLimiter::OpType::kRead); } if (line.empty()) { return IOStatus::Corruption("Unexpected empty line"); @@ -2794,8 +2830,9 @@ IOStatus BackupEngineImpl::BackupMeta::LoadFromFile( bool footer_present = false; while (backup_meta_reader->ReadLine(&line)) { if (rate_limiter != nullptr) { - rate_limiter->Request(line.size(), Env::IO_LOW, nullptr /* stats */, - RateLimiter::OpType::kRead); + LoopRateLimitRequestHelper(line.size(), rate_limiter, Env::IO_LOW, + nullptr /* stats */, + RateLimiter::OpType::kRead); } std::vector components = StringSplit(line, ' '); @@ -2885,8 +2922,9 @@ IOStatus BackupEngineImpl::BackupMeta::LoadFromFile( assert(schema_major_version >= 2); while (backup_meta_reader->ReadLine(&line)) { if (rate_limiter != nullptr) { - rate_limiter->Request(line.size(), Env::IO_LOW, nullptr /* stats */, - RateLimiter::OpType::kRead); + LoopRateLimitRequestHelper(line.size(), rate_limiter, Env::IO_LOW, + nullptr /* stats */, + RateLimiter::OpType::kRead); } if (line.empty()) { return IOStatus::Corruption("Unexpected empty line"); diff --git a/utilities/backupable/backupable_db_test.cc b/utilities/backupable/backupable_db_test.cc index dca39c348..0c2ff66b1 100644 --- a/utilities/backupable/backupable_db_test.cc +++ b/utilities/backupable/backupable_db_test.cc @@ -13,20 +13,25 @@ #include #include +#include #include #include #include +#include #include #include #include #include "db/db_impl/db_impl.h" +#include "db/db_test_util.h" #include "env/env_chroot.h" #include "file/filename.h" #include "port/port.h" #include "port/stack_trace.h" +#include "rocksdb/env.h" #include "rocksdb/file_checksum.h" #include "rocksdb/rate_limiter.h" +#include "rocksdb/statistics.h" #include "rocksdb/transaction_log.h" #include "rocksdb/types.h" #include "rocksdb/utilities/options_util.h" @@ -36,6 +41,7 @@ #include "util/cast_util.h" #include "util/mutexlock.h" #include "util/random.h" +#include "util/rate_limiter.h" #include "util/stderr_logger.h" #include "util/string_util.h" #include "utilities/backupable/backupable_db_impl.h" @@ -2803,6 +2809,119 @@ TEST_P(BackupEngineRateLimitingTestWithParam, CloseDBAndBackupEngine(); DestroyDB(dbname_, Options()); } + +class BackupEngineRateLimitingTestWithParam2 + : public BackupEngineTest, + public testing::WithParamInterface< + std::tuple /* limits */>> { + public: + BackupEngineRateLimitingTestWithParam2() {} +}; + +INSTANTIATE_TEST_CASE_P( + LowRefillBytesPerPeriod, BackupEngineRateLimitingTestWithParam2, + ::testing::Values(std::make_tuple(std::make_pair(1, 1)))); +// To verify we don't request over-sized bytes relative to +// refill_bytes_per_period_ in each RateLimiter::Request() called in +// BackupEngine through verifying we don't trigger assertion +// failure on over-sized request in GenericRateLimiter in debug builds +TEST_P(BackupEngineRateLimitingTestWithParam2, + RateLimitingWithLowRefillBytesPerPeriod) { + SpecialEnv special_env(Env::Default(), /*time_elapse_only_sleep*/ true); + + backupable_options_->max_background_operations = 1; + const uint64_t backup_rate_limiter_limit = std::get<0>(GetParam()).first; + std::shared_ptr backup_rate_limiter( + std::make_shared( + backup_rate_limiter_limit, 1000 * 1000 /* refill_period_us */, + 10 /* fairness */, RateLimiter::Mode::kAllIo /* mode */, + special_env.GetSystemClock(), false /* auto_tuned */)); + + backupable_options_->backup_rate_limiter = backup_rate_limiter; + + const uint64_t restore_rate_limiter_limit = std::get<0>(GetParam()).second; + std::shared_ptr restore_rate_limiter( + std::make_shared( + restore_rate_limiter_limit, 1000 * 1000 /* refill_period_us */, + 10 /* fairness */, RateLimiter::Mode::kAllIo /* mode */, + special_env.GetSystemClock(), false /* auto_tuned */)); + + backupable_options_->restore_rate_limiter = restore_rate_limiter; + + // Rate limiter uses `CondVar::TimedWait()`, which does not have access to the + // `Env` to advance its time according to the fake wait duration. The + // workaround is to install a callback that advance the `Env`'s mock time. + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "GenericRateLimiter::Request:PostTimedWait", [&](void* arg) { + int64_t time_waited_us = *static_cast(arg); + special_env.SleepForMicroseconds(static_cast(time_waited_us)); + }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + + DestroyDB(dbname_, Options()); + OpenDBAndBackupEngine(true /* destroy_old_data */, false /* dummy */, + kShareWithChecksum /* shared_option */); + + FillDB(db_.get(), 0, 100); + int64_t total_bytes_through_before_backup = + backupable_options_->backup_rate_limiter->GetTotalBytesThrough(); + EXPECT_OK(backup_engine_->CreateNewBackup(db_.get(), + false /* flush_before_backup */)); + int64_t total_bytes_through_after_backup = + backupable_options_->backup_rate_limiter->GetTotalBytesThrough(); + ASSERT_GT(total_bytes_through_after_backup, + total_bytes_through_before_backup); + + std::vector backup_infos; + BackupInfo backup_info; + backup_engine_->GetBackupInfo(&backup_infos); + ASSERT_EQ(1, backup_infos.size()); + const int backup_id = 1; + ASSERT_EQ(backup_id, backup_infos[0].backup_id); + ASSERT_OK(backup_engine_->GetBackupInfo(backup_id, &backup_info, + true /* include_file_details */)); + int64_t total_bytes_through_before_verify_backup = + backupable_options_->backup_rate_limiter->GetTotalBytesThrough(); + EXPECT_OK( + backup_engine_->VerifyBackup(backup_id, true /* verify_with_checksum */)); + int64_t total_bytes_through_after_verify_backup = + backupable_options_->backup_rate_limiter->GetTotalBytesThrough(); + ASSERT_GT(total_bytes_through_after_verify_backup, + total_bytes_through_before_verify_backup); + + CloseDBAndBackupEngine(); + AssertBackupConsistency(backup_id, 0, 100, 101); + + int64_t total_bytes_through_before_initialize = + backupable_options_->backup_rate_limiter->GetTotalBytesThrough(); + OpenDBAndBackupEngine(false /* destroy_old_data */); + // We charge read in BackupEngineImpl::BackupMeta::LoadFromFile, + // which is called in BackupEngineImpl::Initialize() during + // OpenBackupEngine(false) + int64_t total_bytes_through_after_initialize = + backupable_options_->backup_rate_limiter->GetTotalBytesThrough(); + ASSERT_GT(total_bytes_through_after_initialize, + total_bytes_through_before_initialize); + CloseDBAndBackupEngine(); + + DestroyDB(dbname_, Options()); + OpenBackupEngine(false /* destroy_old_data */); + int64_t total_bytes_through_before_restore = + backupable_options_->restore_rate_limiter->GetTotalBytesThrough(); + EXPECT_OK(backup_engine_->RestoreDBFromLatestBackup(dbname_, dbname_)); + int64_t total_bytes_through_after_restore = + backupable_options_->restore_rate_limiter->GetTotalBytesThrough(); + ASSERT_GT(total_bytes_through_after_restore, + total_bytes_through_before_restore); + CloseBackupEngine(); + + DestroyDB(dbname_, Options()); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearCallBack( + "GenericRateLimiter::Request:PostTimedWait"); +} + #endif // !defined(ROCKSDB_VALGRIND_RUN) || defined(ROCKSDB_FULL_VALGRIND_RUN) TEST_F(BackupEngineTest, ReadOnlyBackupEngine) {