diff --git a/utilities/backupable/backupable_db.cc b/utilities/backupable/backupable_db.cc index e88326952..965f0b5c5 100644 --- a/utilities/backupable/backupable_db.cc +++ b/utilities/backupable/backupable_db.cc @@ -28,6 +28,7 @@ #include #include #include +#include #include #include #include @@ -51,7 +52,10 @@ class BackupRateLimiter { micros_start_time_(env->NowMicros()), bytes_since_start_(0) {} + // thread safe void ReportAndWait(uint64_t bytes_since_last_call) { + std::unique_lock lk(lock_); + bytes_since_start_ += bytes_since_last_call; if (bytes_since_start_ < bytes_per_check_) { // not enough bytes to be rate-limited @@ -75,6 +79,7 @@ class BackupRateLimiter { private: Env* env_; + std::mutex lock_; uint64_t max_bytes_per_second_; uint64_t bytes_per_check_; uint64_t micros_start_time_; @@ -338,9 +343,9 @@ class BackupEngineImpl : public BackupEngine { CopyWorkItem(const CopyWorkItem&) = delete; CopyWorkItem& operator=(const CopyWorkItem&) = delete; - CopyWorkItem(CopyWorkItem&& o) { *this = std::move(o); } + CopyWorkItem(CopyWorkItem&& o) noexcept { *this = std::move(o); } - CopyWorkItem& operator=(CopyWorkItem&& o) { + CopyWorkItem& operator=(CopyWorkItem&& o) noexcept { src_path = std::move(o.src_path); dst_path = std::move(o.dst_path); src_env = o.src_env; @@ -378,11 +383,11 @@ class BackupEngineImpl : public BackupEngine { std::string dst_relative; BackupAfterCopyWorkItem() {} - BackupAfterCopyWorkItem(BackupAfterCopyWorkItem&& o) { + BackupAfterCopyWorkItem(BackupAfterCopyWorkItem&& o) noexcept { *this = std::move(o); } - BackupAfterCopyWorkItem& operator=(BackupAfterCopyWorkItem&& o) { + BackupAfterCopyWorkItem& operator=(BackupAfterCopyWorkItem&& o) noexcept { result = std::move(o.result); shared = o.shared; needed_to_copy = o.needed_to_copy; @@ -413,11 +418,11 @@ class BackupEngineImpl : public BackupEngine { RestoreAfterCopyWorkItem(std::future&& _result, uint32_t _checksum_value) : result(std::move(_result)), checksum_value(_checksum_value) {} - RestoreAfterCopyWorkItem(RestoreAfterCopyWorkItem&& o) { + RestoreAfterCopyWorkItem(RestoreAfterCopyWorkItem&& o) noexcept { *this = std::move(o); } - RestoreAfterCopyWorkItem& operator=(RestoreAfterCopyWorkItem&& o) { + RestoreAfterCopyWorkItem& operator=(RestoreAfterCopyWorkItem&& o) noexcept { result = std::move(o.result); checksum_value = o.checksum_value; return *this; @@ -672,11 +677,6 @@ Status BackupEngineImpl::Initialize() { Status BackupEngineImpl::CreateNewBackup(DB* db, bool flush_before_backup) { assert(initialized_); - if (options_.max_background_operations > 1 && - options_.backup_rate_limit != 0) { - return Status::InvalidArgument( - "Multi-threaded backups cannot use a backup_rate_limit"); - } assert(!read_only_); Status s; std::vector live_files; @@ -968,11 +968,6 @@ Status BackupEngineImpl::RestoreDBFromBackup( BackupID backup_id, const std::string& db_dir, const std::string& wal_dir, const RestoreOptions& restore_options) { assert(initialized_); - if (options_.max_background_operations > 1 && - options_.restore_rate_limit != 0) { - return Status::InvalidArgument( - "Multi-threaded restores cannot use a restore_rate_limit"); - } auto corrupt_itr = corrupt_backups_.find(backup_id); if (corrupt_itr != corrupt_backups_.end()) { return corrupt_itr->second.first; @@ -1146,7 +1141,8 @@ Status BackupEngineImpl::PutLatestBackupFileContents(uint32_t latest_backup) { unique_ptr file_writer( new WritableFileWriter(std::move(file), env_options)); char file_contents[10]; - int len = sprintf(file_contents, "%u\n", latest_backup); + int len = + snprintf(file_contents, sizeof(file_contents), "%u\n", latest_backup); s = file_writer->Append(Slice(file_contents, len)); if (s.ok() && options_.sync) { file_writer->Sync(false); diff --git a/utilities/backupable/backupable_db_test.cc b/utilities/backupable/backupable_db_test.cc index 8cd4d356c..ee8e2d44c 100644 --- a/utilities/backupable/backupable_db_test.cc +++ b/utilities/backupable/backupable_db_test.cc @@ -1037,43 +1037,46 @@ TEST_F(BackupableDBTest, KeepLogFiles) { } TEST_F(BackupableDBTest, RateLimiting) { - uint64_t const KB = 1024 * 1024; - size_t const kMicrosPerSec = 1000 * 1000LL; + // iter 0 -- single threaded + // iter 1 -- multi threaded + for (int iter = 0; iter < 2; ++iter) { + uint64_t const KB = 1024 * 1024; + size_t const kMicrosPerSec = 1000 * 1000LL; - std::vector> limits( - {{KB, 5 * KB}, {2 * KB, 3 * KB}}); + std::vector> limits( + {{KB, 5 * KB}, {2 * KB, 3 * KB}}); - for (const auto& limit : limits) { - // destroy old data - DestroyDB(dbname_, Options()); + for (const auto& limit : limits) { + // destroy old data + DestroyDB(dbname_, Options()); - backupable_options_->backup_rate_limit = limit.first; - backupable_options_->restore_rate_limit = limit.second; - // rate-limiting backups must be single-threaded - backupable_options_->max_background_operations = 1; - options_.compression = kNoCompression; - OpenDBAndBackupEngine(true); - size_t bytes_written = FillDB(db_.get(), 0, 100000); - - auto start_backup = env_->NowMicros(); - ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), false)); - auto backup_time = env_->NowMicros() - start_backup; - auto rate_limited_backup_time = (bytes_written * kMicrosPerSec) / - backupable_options_->backup_rate_limit; - ASSERT_GT(backup_time, 0.8 * rate_limited_backup_time); + backupable_options_->backup_rate_limit = limit.first; + backupable_options_->restore_rate_limit = limit.second; + backupable_options_->max_background_operations = (iter == 0) ? 1 : 10; + options_.compression = kNoCompression; + OpenDBAndBackupEngine(true); + size_t bytes_written = FillDB(db_.get(), 0, 100000); - CloseDBAndBackupEngine(); + auto start_backup = env_->NowMicros(); + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), false)); + auto backup_time = env_->NowMicros() - start_backup; + auto rate_limited_backup_time = (bytes_written * kMicrosPerSec) / + backupable_options_->backup_rate_limit; + ASSERT_GT(backup_time, 0.8 * rate_limited_backup_time); - OpenBackupEngine(); - auto start_restore = env_->NowMicros(); - ASSERT_OK(backup_engine_->RestoreDBFromLatestBackup(dbname_, dbname_)); - auto restore_time = env_->NowMicros() - start_restore; - CloseBackupEngine(); - auto rate_limited_restore_time = (bytes_written * kMicrosPerSec) / - backupable_options_->restore_rate_limit; - ASSERT_GT(restore_time, 0.8 * rate_limited_restore_time); + CloseDBAndBackupEngine(); - AssertBackupConsistency(0, 0, 100000, 100010); + OpenBackupEngine(); + auto start_restore = env_->NowMicros(); + ASSERT_OK(backup_engine_->RestoreDBFromLatestBackup(dbname_, dbname_)); + auto restore_time = env_->NowMicros() - start_restore; + CloseBackupEngine(); + auto rate_limited_restore_time = (bytes_written * kMicrosPerSec) / + backupable_options_->restore_rate_limit; + ASSERT_GT(restore_time, 0.8 * rate_limited_restore_time); + + AssertBackupConsistency(0, 0, 100000, 100010); + } } }