From 32752551b97468237d2e8b8054a291feefaa6cd3 Mon Sep 17 00:00:00 2001
From: Peter Dillinger
Date: Wed, 1 Sep 2021 14:10:36 -0700
Subject: [PATCH] Fix a buffer size race condition in BackupEngine (#8732)

Summary:
If RateLimiter burst bytes changes during concurrent Restore operations,
the shared copy_file_buffer_size_ field could be written by one operation
while others were reading it, a data race. The fix removes the cached
field and instead computes the buffer size locally, from the limiter's
current burst bytes, in each operation that needs it. (A self-contained
sketch of the race pattern follows the patch.)

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8732

Test Plan: updated unit test fails with TSAN before change, passes after

Reviewed By: ajkr

Differential Revision: D30683879

Pulled By: pdillinger

fbshipit-source-id: d0ddb3587ade91ee2a4d926b475acf7781b03086
---
 HISTORY.md                                 |  1 +
 utilities/backupable/backupable_db.cc      | 38 ++++++++++------------
 utilities/backupable/backupable_db_test.cc | 20 ++++++++++--
 3 files changed, 36 insertions(+), 23 deletions(-)

diff --git a/HISTORY.md b/HISTORY.md
index 94ee05da3..f0aff6294 100644
--- a/HISTORY.md
+++ b/HISTORY.md
@@ -6,6 +6,7 @@
 * Fixed a bug that could lead to duplicate DB ID or DB session ID in POSIX environments without /proc/sys/kernel/random/uuid.
 * Fix a race in DumpStats() with column family destruction due to not taking a Ref on each entry while iterating the ColumnFamilySet.
 * Fix a race in item ref counting in LRUCache when promoting an item from the SecondaryCache.
+* Fix a race in BackupEngine if RateLimiter is reconfigured during concurrent Restore operations.
 
 ### New Features
 * RemoteCompaction's interface now includes `db_name`, `db_id`, `session_id`, which could help the user uniquely identify compaction job between db instances and sessions.
diff --git a/utilities/backupable/backupable_db.cc b/utilities/backupable/backupable_db.cc
index 294fde6ca..5700f45b3 100644
--- a/utilities/backupable/backupable_db.cc
+++ b/utilities/backupable/backupable_db.cc
@@ -785,7 +785,6 @@ class BackupEngineImpl {
   std::unique_ptr<Directory> private_directory_;
 
   static const size_t kDefaultCopyFileBufferSize = 5 * 1024 * 1024LL;  // 5MB
-  mutable size_t copy_file_buffer_size_;
   bool read_only_;
   BackupStatistics backup_statistics_;
   std::unordered_set<std::string> reported_ignored_fields_;
@@ -924,7 +923,6 @@ BackupEngineImpl::BackupEngineImpl(const BackupEngineOptions& options,
       options_(options),
       db_env_(db_env),
       backup_env_(options.backup_env != nullptr ? options.backup_env : db_env_),
-      copy_file_buffer_size_(kDefaultCopyFileBufferSize),
       read_only_(read_only) {
   if (options_.backup_rate_limiter == nullptr &&
       options_.backup_rate_limit > 0) {
@@ -1263,11 +1261,6 @@ Status BackupEngineImpl::CreateNewBackupWithMetadata(
     s = backup_env_->CreateDir(private_dir);
   }
 
-  RateLimiter* rate_limiter = options_.backup_rate_limiter.get();
-  if (rate_limiter) {
-    copy_file_buffer_size_ = static_cast<size_t>(rate_limiter->GetSingleBurstBytes());
-  }
-
   // A set into which we will insert the dst_paths that are calculated for live
   // files and live WAL files.
   // This is used to check whether a live files shares a dst_path with another
@@ -1291,6 +1284,7 @@ Status BackupEngineImpl::CreateNewBackupWithMetadata(
             ? true
            : false;
   EnvOptions src_raw_env_options(db_options);
+  RateLimiter* rate_limiter = options_.backup_rate_limiter.get();
   s = checkpoint.CreateCustomCheckpoint(
       db_options,
       [&](const std::string& /*src_dirname*/, const std::string& /*fname*/,
@@ -1700,11 +1694,6 @@ Status BackupEngineImpl::RestoreDBFromBackup(const RestoreOptions& options,
     DeleteChildren(db_dir);
   }
 
-  RateLimiter* rate_limiter = options_.restore_rate_limiter.get();
-  if (rate_limiter) {
-    copy_file_buffer_size_ =
-        static_cast<size_t>(rate_limiter->GetSingleBurstBytes());
-  }
   Status s;
   std::vector<RestoreAfterCopyOrCreateWorkItem> restore_items_to_finish;
   std::string temporary_current_file;
@@ -1756,8 +1745,8 @@ Status BackupEngineImpl::RestoreDBFromBackup(const RestoreOptions& options,
              dst.c_str());
     CopyOrCreateWorkItem copy_or_create_work_item(
         GetAbsolutePath(file), dst, "" /* contents */, backup_env_, db_env_,
-        EnvOptions() /* src_env_options */, options_.sync, rate_limiter,
-        0 /* size_limit */);
+        EnvOptions() /* src_env_options */, options_.sync,
+        options_.restore_rate_limiter.get(), 0 /* size_limit */);
     RestoreAfterCopyOrCreateWorkItem after_copy_or_create_work_item(
         copy_or_create_work_item.result.get_future(), file, dst,
         file_info->checksum_hex);
@@ -1916,13 +1905,17 @@ Status BackupEngineImpl::CopyOrCreateFile(
     return s;
   }
 
+  size_t buf_size =
+      rate_limiter ? static_cast<size_t>(rate_limiter->GetSingleBurstBytes())
+                   : kDefaultCopyFileBufferSize;
+
   std::unique_ptr<WritableFileWriter> dest_writer(
       new WritableFileWriter(std::move(dst_file), dst, dst_file_options));
   std::unique_ptr<SequentialFileReader> src_reader;
   std::unique_ptr<char[]> buf;
   if (!src.empty()) {
     src_reader.reset(new SequentialFileReader(std::move(src_file), src));
-    buf.reset(new char[copy_file_buffer_size_]);
+    buf.reset(new char[buf_size]);
   }
 
   Slice data;
@@ -1932,9 +1925,8 @@ Status BackupEngineImpl::CopyOrCreateFile(
       return Status::Incomplete("Backup stopped");
     }
     if (!src.empty()) {
-      size_t buffer_to_read = (copy_file_buffer_size_ < size_limit)
-                                  ? copy_file_buffer_size_
-                                  : static_cast<size_t>(size_limit);
+      size_t buffer_to_read =
+          (buf_size < size_limit) ? buf_size : static_cast<size_t>(size_limit);
       s = src_reader->Read(buffer_to_read, &data, buf.get());
       processed_buffer_size += buffer_to_read;
     } else {
@@ -2227,15 +2219,19 @@ Status BackupEngineImpl::ReadFileAndComputeChecksum(
     return s;
   }
 
-  std::unique_ptr<char[]> buf(new char[copy_file_buffer_size_]);
+  RateLimiter* rate_limiter = options_.backup_rate_limiter.get();
+  size_t buf_size =
+      rate_limiter ? static_cast<size_t>(rate_limiter->GetSingleBurstBytes())
+                   : kDefaultCopyFileBufferSize;
+  std::unique_ptr<char[]> buf(new char[buf_size]);
   Slice data;
 
   do {
     if (stop_backup_.load(std::memory_order_acquire)) {
       return Status::Incomplete("Backup stopped");
     }
-    size_t buffer_to_read = (copy_file_buffer_size_ < size_limit) ?
-        copy_file_buffer_size_ : static_cast<size_t>(size_limit);
+    size_t buffer_to_read =
+        (buf_size < size_limit) ? buf_size : static_cast<size_t>(size_limit);
     s = src_reader->Read(buffer_to_read, &data, buf.get());
 
     if (!s.ok()) {
diff --git a/utilities/backupable/backupable_db_test.cc b/utilities/backupable/backupable_db_test.cc
index 2b7c5c8a3..97ed3aec1 100644
--- a/utilities/backupable/backupable_db_test.cc
+++ b/utilities/backupable/backupable_db_test.cc
@@ -3120,6 +3120,13 @@ TEST_F(BackupEngineTest, Concurrency) {
   //
   // Because of the challenges of integrating this into db_stress,
   // this is a non-deterministic mini-stress test here instead.
+
+  // To check for a race condition in handling buffer size based on byte
+  // burst limit, we need a (generous) rate limiter
+  std::shared_ptr<RateLimiter> limiter{NewGenericRateLimiter(1000000000)};
+  backupable_options_->backup_rate_limiter = limiter;
+  backupable_options_->restore_rate_limiter = limiter;
+
   OpenDBAndBackupEngine(true, false, kShareWithChecksum);
 
   static constexpr int keys_iteration = 5000;
@@ -3145,8 +3152,9 @@ TEST_F(BackupEngineTest, Concurrency) {
   std::array<std::thread, 4> restore_verify_threads;
   for (uint32_t i = 0; i < read_threads.size(); ++i) {
     uint32_t sleep_micros = rng() % 100000;
-    read_threads[i] = std::thread(
-        [this, i, sleep_micros, &db_opts, &be_opts, &restore_verify_threads] {
+    read_threads[i] =
+        std::thread([this, i, sleep_micros, &db_opts, &be_opts,
+                     &restore_verify_threads, &limiter] {
           test_db_env_->SleepForMicroseconds(sleep_micros);
 
           // Whether to also re-open the BackupEngine, potentially seeing
@@ -3225,6 +3233,14 @@ TEST_F(BackupEngineTest, Concurrency) {
                 restore_db_dir));
           }
 
+          // Test for race condition in reconfiguring limiter
+          // FIXME: this could set to a different value in all threads, except
+          // GenericRateLimiter::SetBytesPerSecond has a write-write race
+          // reported by TSAN
+          if (i == 0) {
+            limiter->SetBytesPerSecond(2000000000);
+          }
+
           // Re-verify metadata (we don't receive updates from concurrently
           // creating a new backup)
           my_be->GetBackupInfo(&infos);
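Note: for readers outside the RocksDB tree, the following self-contained C++
sketch shows the shape of the race and of the fix. It is illustrative only:
FakeLimiter, Engine, RestoreRacy, and RestoreFixed are invented stand-ins for
RateLimiter and BackupEngineImpl, not RocksDB APIs.

// Sketch of the race pattern fixed by this patch (illustrative names).
#include <atomic>
#include <cstddef>
#include <thread>
#include <vector>

namespace {

constexpr size_t kDefaultBufferSize = 5 * 1024 * 1024;  // 5MB, as in the patch

// Stand-in for RateLimiter: burst bytes can be reconfigured at runtime.
class FakeLimiter {
 public:
  explicit FakeLimiter(size_t burst) : burst_(burst) {}
  void SetBurst(size_t burst) {
    burst_.store(burst, std::memory_order_relaxed);
  }
  size_t GetSingleBurstBytes() const {
    return burst_.load(std::memory_order_relaxed);
  }

 private:
  std::atomic<size_t> burst_;  // atomic, so reconfiguring itself is safe
};

// Stand-in for BackupEngineImpl.
class Engine {
 public:
  explicit Engine(FakeLimiter* limiter) : limiter_(limiter) {}

  // Pre-patch shape: caches the size in a member shared by all concurrent
  // operations; the write below races with reads/writes in other threads.
  size_t RestoreRacy() {
    buffer_size_ = limiter_->GetSingleBurstBytes();  // racy write
    return buffer_size_;                             // racy read
  }

  // Post-patch shape: the size is a local, computed where it is needed.
  size_t RestoreFixed() const {
    size_t buf_size =
        limiter_ ? limiter_->GetSingleBurstBytes() : kDefaultBufferSize;
    return buf_size;
  }

 private:
  FakeLimiter* limiter_;
  size_t buffer_size_ = kDefaultBufferSize;  // the shared state at fault
};

}  // namespace

int main() {
  FakeLimiter limiter(1000000000);
  Engine engine(&limiter);
  std::vector<std::thread> threads;
  for (int i = 0; i < 4; ++i) {
    threads.emplace_back([&engine, &limiter, i] {
      if (i == 0) {
        limiter.SetBurst(2000000000);  // concurrent reconfiguration
      }
      // Calling RestoreRacy() here instead would be a data race on
      // Engine::buffer_size_, visible to TSAN; RestoreFixed() is safe.
      (void)engine.RestoreFixed();
    });
  }
  for (auto& t : threads) {
    t.join();
  }
  return 0;
}

As in the patch, the cure is not more synchronization but less shared state:
the buffer size becomes a stack-local value derived from the limiter's current
burst bytes, so reconfiguring the limiter affects the size used by later file
copies without racing against copies already in flight.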