diff --git a/include/rocksdb/utilities/backupable_db.h b/include/rocksdb/utilities/backupable_db.h index 6ae211476..27c1b49ac 100644 --- a/include/rocksdb/utilities/backupable_db.h +++ b/include/rocksdb/utilities/backupable_db.h @@ -75,11 +75,21 @@ struct BackupableDBOptions { // Default: 0 uint64_t backup_rate_limit; + // Backup rate limiter. Used to control transfer speed for backup. If this is + // not null, backup_rate_limit is ignored. + // Default: nullptr + std::shared_ptr backup_rate_limiter{nullptr}; + // Max bytes that can be transferred in a second during restore. // If 0, go as fast as you can // Default: 0 uint64_t restore_rate_limit; + // Restore rate limiter. Used to control transfer speed during restore. If + // this is not null, restore_rate_limit is ignored. + // Default: nullptr + std::shared_ptr restore_rate_limiter{nullptr}; + // Only used if share_table_files is set to true. If true, will consider that // backups can come from different databases, hence a sst is not uniquely // identifed by its name, but by the triple (file name, crc32, file length) diff --git a/utilities/backupable/backupable_db.cc b/utilities/backupable/backupable_db.cc index 5c2c856da..422896e85 100644 --- a/utilities/backupable/backupable_db.cc +++ b/utilities/backupable/backupable_db.cc @@ -497,7 +497,18 @@ BackupEngineImpl::BackupEngineImpl(Env* db_env, db_env_(db_env), backup_env_(options.backup_env != nullptr ? options.backup_env : db_env_), copy_file_buffer_size_(kDefaultCopyFileBufferSize), - read_only_(read_only) {} + read_only_(read_only) { + if (options_.backup_rate_limiter == nullptr && + options_.backup_rate_limit > 0) { + options_.backup_rate_limiter.reset( + NewGenericRateLimiter(options_.backup_rate_limit)); + } + if (options_.restore_rate_limiter == nullptr && + options_.restore_rate_limit > 0) { + options_.restore_rate_limiter.reset( + NewGenericRateLimiter(options_.restore_rate_limit)); + } +} BackupEngineImpl::~BackupEngineImpl() { files_to_copy_or_create_.sendEof(); @@ -703,9 +714,8 @@ Status BackupEngineImpl::CreateNewBackupWithMetadata( s = backup_env_->CreateDir( GetAbsolutePath(GetPrivateFileRel(new_backup_id, true))); - unique_ptr rate_limiter; - if (options_.backup_rate_limit > 0) { - rate_limiter.reset(NewGenericRateLimiter(options_.backup_rate_limit)); + RateLimiter* rate_limiter = options_.backup_rate_limiter.get(); + if (rate_limiter) { copy_file_buffer_size_ = rate_limiter->GetSingleBurstBytes(); } @@ -758,7 +768,7 @@ Status BackupEngineImpl::CreateNewBackupWithMetadata( s = AddBackupFileWorkItem( live_dst_paths, backup_items_to_finish, new_backup_id, options_.share_table_files && type == kTableFile, db->GetName(), - live_files[i], rate_limiter.get(), size_bytes, + live_files[i], rate_limiter, size_bytes, (type == kDescriptorFile) ? manifest_file_size : 0, options_.share_files_with_checksum && type == kTableFile, progress_callback); @@ -767,10 +777,9 @@ Status BackupEngineImpl::CreateNewBackupWithMetadata( // Write the current file with the manifest filename as its contents. s = AddBackupFileWorkItem( live_dst_paths, backup_items_to_finish, new_backup_id, - false /* shared */, "" /* src_dir */, CurrentFileName(""), - rate_limiter.get(), manifest_fname.size(), 0 /* size_limit */, - false /* shared_checksum */, progress_callback, - manifest_fname.substr(1) + "\n"); + false /* shared */, "" /* src_dir */, CurrentFileName(""), rate_limiter, + manifest_fname.size(), 0 /* size_limit */, false /* shared_checksum */, + progress_callback, manifest_fname.substr(1) + "\n"); } // Pre-fetch sizes for WAL files @@ -797,8 +806,8 @@ Status BackupEngineImpl::CreateNewBackupWithMetadata( s = AddBackupFileWorkItem(live_dst_paths, backup_items_to_finish, new_backup_id, false, /* not shared */ db->GetOptions().wal_dir, - live_wal_files[i]->PathName(), - rate_limiter.get(), size_bytes); + live_wal_files[i]->PathName(), rate_limiter, + size_bytes); } } @@ -1043,9 +1052,8 @@ Status BackupEngineImpl::RestoreDBFromBackup( DeleteChildren(db_dir); } - unique_ptr rate_limiter; - if (options_.restore_rate_limit > 0) { - rate_limiter.reset(NewGenericRateLimiter(options_.restore_rate_limit)); + RateLimiter* rate_limiter = options_.restore_rate_limiter.get(); + if (rate_limiter) { copy_file_buffer_size_ = rate_limiter->GetSingleBurstBytes(); } Status s; @@ -1081,7 +1089,7 @@ Status BackupEngineImpl::RestoreDBFromBackup( Log(options_.info_log, "Restoring %s to %s\n", file.c_str(), dst.c_str()); CopyOrCreateWorkItem copy_or_create_work_item( GetAbsolutePath(file), dst, "" /* contents */, backup_env_, db_env_, - false, rate_limiter.get(), 0 /* size_limit */); + false, rate_limiter, 0 /* size_limit */); RestoreAfterCopyOrCreateWorkItem after_copy_or_create_work_item( copy_or_create_work_item.result.get_future(), file_info->checksum_value); diff --git a/utilities/backupable/backupable_db_test.cc b/utilities/backupable/backupable_db_test.cc index 680704b23..c41ce94d2 100644 --- a/utilities/backupable/backupable_db_test.cc +++ b/utilities/backupable/backupable_db_test.cc @@ -16,6 +16,7 @@ #include "db/filename.h" #include "port/port.h" #include "port/stack_trace.h" +#include "rocksdb/rate_limiter.h" #include "rocksdb/transaction_log.h" #include "rocksdb/types.h" #include "rocksdb/utilities/backupable_db.h" @@ -1117,45 +1118,58 @@ TEST_F(BackupableDBTest, KeepLogFiles) { } TEST_F(BackupableDBTest, RateLimiting) { - // iter 0 -- single threaded - // iter 1 -- multi threaded - for (int iter = 0; iter < 2; ++iter) { - uint64_t const KB = 1024 * 1024; - size_t const kMicrosPerSec = 1000 * 1000LL; - - std::vector> limits( - {{KB, 5 * KB}, {2 * KB, 3 * KB}}); - - for (const auto& limit : limits) { - // destroy old data - DestroyDB(dbname_, options_); - - backupable_options_->backup_rate_limit = limit.first; - backupable_options_->restore_rate_limit = limit.second; - backupable_options_->max_background_operations = (iter == 0) ? 1 : 10; - options_.compression = kNoCompression; - OpenDBAndBackupEngine(true); - size_t bytes_written = FillDB(db_.get(), 0, 100000); + size_t const kMicrosPerSec = 1000 * 1000LL; + uint64_t const MB = 1024 * 1024; - auto start_backup = db_chroot_env_->NowMicros(); - ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), false)); - auto backup_time = db_chroot_env_->NowMicros() - start_backup; - auto rate_limited_backup_time = (bytes_written * kMicrosPerSec) / - backupable_options_->backup_rate_limit; - ASSERT_GT(backup_time, 0.8 * rate_limited_backup_time); + const std::vector> limits( + {{1 * MB, 5 * MB}, {2 * MB, 3 * MB}}); - CloseDBAndBackupEngine(); + std::shared_ptr backupThrottler(NewGenericRateLimiter(1)); + std::shared_ptr restoreThrottler(NewGenericRateLimiter(1)); - OpenBackupEngine(); - auto start_restore = db_chroot_env_->NowMicros(); - ASSERT_OK(backup_engine_->RestoreDBFromLatestBackup(dbname_, dbname_)); - auto restore_time = db_chroot_env_->NowMicros() - start_restore; - CloseBackupEngine(); - auto rate_limited_restore_time = (bytes_written * kMicrosPerSec) / - backupable_options_->restore_rate_limit; - ASSERT_GT(restore_time, 0.8 * rate_limited_restore_time); - - AssertBackupConsistency(0, 0, 100000, 100010); + for (bool makeThrottler : {false, true}) { + if (makeThrottler) { + backupable_options_->backup_rate_limiter = backupThrottler; + backupable_options_->restore_rate_limiter = restoreThrottler; + } + // iter 0 -- single threaded + // iter 1 -- multi threaded + for (int iter = 0; iter < 2; ++iter) { + for (const auto& limit : limits) { + // destroy old data + DestroyDB(dbname_, Options()); + if (makeThrottler) { + backupThrottler->SetBytesPerSecond(limit.first); + restoreThrottler->SetBytesPerSecond(limit.second); + } else { + backupable_options_->backup_rate_limit = limit.first; + backupable_options_->restore_rate_limit = limit.second; + } + backupable_options_->max_background_operations = (iter == 0) ? 1 : 10; + options_.compression = kNoCompression; + OpenDBAndBackupEngine(true); + size_t bytes_written = FillDB(db_.get(), 0, 100000); + + auto start_backup = db_chroot_env_->NowMicros(); + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), false)); + auto backup_time = db_chroot_env_->NowMicros() - start_backup; + auto rate_limited_backup_time = + (bytes_written * kMicrosPerSec) / limit.first; + ASSERT_GT(backup_time, 0.8 * rate_limited_backup_time); + + CloseDBAndBackupEngine(); + + OpenBackupEngine(); + auto start_restore = db_chroot_env_->NowMicros(); + ASSERT_OK(backup_engine_->RestoreDBFromLatestBackup(dbname_, dbname_)); + auto restore_time = db_chroot_env_->NowMicros() - start_restore; + CloseBackupEngine(); + auto rate_limited_restore_time = + (bytes_written * kMicrosPerSec) / limit.second; + ASSERT_GT(restore_time, 0.8 * rate_limited_restore_time); + + AssertBackupConsistency(0, 0, 100000, 100010); + } } } }