diff --git a/include/rocksdb/utilities/backupable_db.h b/include/rocksdb/utilities/backupable_db.h index f08752d88..c2f07c17c 100644 --- a/include/rocksdb/utilities/backupable_db.h +++ b/include/rocksdb/utilities/backupable_db.h @@ -92,17 +92,20 @@ struct BackupableDBOptions { // Default: 1 int max_background_operations; + // During backup user can get callback every time next + // callback_trigger_interval_size bytes being copied. + // Default: 4194304 + uint64_t callback_trigger_interval_size; + void Dump(Logger* logger) const; - explicit BackupableDBOptions(const std::string& _backup_dir, - Env* _backup_env = nullptr, - bool _share_table_files = true, - Logger* _info_log = nullptr, bool _sync = true, - bool _destroy_old_data = false, - bool _backup_log_files = true, - uint64_t _backup_rate_limit = 0, - uint64_t _restore_rate_limit = 0, - int _max_background_operations = 1) + explicit BackupableDBOptions( + const std::string& _backup_dir, Env* _backup_env = nullptr, + bool _share_table_files = true, Logger* _info_log = nullptr, + bool _sync = true, bool _destroy_old_data = false, + bool _backup_log_files = true, uint64_t _backup_rate_limit = 0, + uint64_t _restore_rate_limit = 0, int _max_background_operations = 1, + uint64_t _callback_trigger_interval_size = 4 * 1024 * 1024) : backup_dir(_backup_dir), backup_env(_backup_env), share_table_files(_share_table_files), @@ -113,7 +116,8 @@ struct BackupableDBOptions { backup_rate_limit(_backup_rate_limit), restore_rate_limit(_restore_rate_limit), share_files_with_checksum(false), - max_background_operations(_max_background_operations) { + max_background_operations(_max_background_operations), + callback_trigger_interval_size(_callback_trigger_interval_size) { assert(share_table_files || !share_files_with_checksum); } }; @@ -213,7 +217,9 @@ class BackupEngine { const BackupableDBOptions& options, BackupEngine** backup_engine_ptr); - virtual Status CreateNewBackup(DB* db, bool flush_before_backup = false) = 0; + virtual Status CreateNewBackup( + DB* db, bool flush_before_backup = false, + std::function progress_callback = []() {}) = 0; virtual Status PurgeOldBackups(uint32_t num_backups_to_keep) = 0; virtual Status DeleteBackup(BackupID backup_id) = 0; virtual void StopBackup() = 0; diff --git a/utilities/backupable/backupable_db.cc b/utilities/backupable/backupable_db.cc index a574cb4d8..16f6d527b 100644 --- a/utilities/backupable/backupable_db.cc +++ b/utilities/backupable/backupable_db.cc @@ -89,7 +89,9 @@ class BackupEngineImpl : public BackupEngine { BackupEngineImpl(Env* db_env, const BackupableDBOptions& options, bool read_only = false); ~BackupEngineImpl(); - Status CreateNewBackup(DB* db, bool flush_before_backup = false) override; + Status CreateNewBackup(DB* db, bool flush_before_backup = false, + std::function progress_callback = []() { + }) override; Status PurgeOldBackups(uint32_t num_backups_to_keep) override; Status DeleteBackup(BackupID backup_id) override; void StopBackup() override { @@ -267,15 +269,11 @@ class BackupEngineImpl : public BackupEngine { Status PutLatestBackupFileContents(uint32_t latest_backup); // if size_limit == 0, there is no size limit, copy everything - Status CopyFile(const std::string& src, - const std::string& dst, - Env* src_env, - Env* dst_env, - bool sync, - RateLimiter* rate_limiter, - uint64_t* size = nullptr, - uint32_t* checksum_value = nullptr, - uint64_t size_limit = 0); + Status CopyFile(const std::string& src, const std::string& dst, Env* src_env, + Env* dst_env, bool sync, RateLimiter* rate_limiter, + uint64_t* size = nullptr, uint32_t* checksum_value = nullptr, + uint64_t size_limit = 0, + std::function progress_callback = []() {}); Status CalculateChecksum(const std::string& src, Env* src_env, @@ -296,6 +294,7 @@ class BackupEngineImpl : public BackupEngine { RateLimiter* rate_limiter; uint64_t size_limit; std::promise result; + std::function progress_callback; CopyWorkItem() {} CopyWorkItem(const CopyWorkItem&) = delete; @@ -312,23 +311,22 @@ class BackupEngineImpl : public BackupEngine { rate_limiter = o.rate_limiter; size_limit = o.size_limit; result = std::move(o.result); + progress_callback = std::move(o.progress_callback); return *this; } - CopyWorkItem(std::string _src_path, - std::string _dst_path, - Env* _src_env, - Env* _dst_env, - bool _sync, - RateLimiter* _rate_limiter, - uint64_t _size_limit) + CopyWorkItem(std::string _src_path, std::string _dst_path, Env* _src_env, + Env* _dst_env, bool _sync, RateLimiter* _rate_limiter, + uint64_t _size_limit, + std::function _progress_callback = []() {}) : src_path(std::move(_src_path)), dst_path(std::move(_dst_path)), src_env(_src_env), dst_env(_dst_env), sync(_sync), rate_limiter(_rate_limiter), - size_limit(_size_limit) {} + size_limit(_size_limit), + progress_callback(_progress_callback) {} }; struct BackupAfterCopyWorkItem { @@ -388,19 +386,18 @@ class BackupEngineImpl : public BackupEngine { }; bool initialized_; + std::mutex byte_report_mutex_; channel files_to_copy_; std::vector threads_; Status AddBackupFileWorkItem( - std::unordered_set& live_dst_paths, - std::vector& backup_items_to_finish, - BackupID backup_id, - bool shared, - const std::string& src_dir, - const std::string& src_fname, // starts with "/" - RateLimiter* rate_limiter, - uint64_t size_limit = 0, - bool shared_checksum = false); + std::unordered_set& live_dst_paths, + std::vector& backup_items_to_finish, + BackupID backup_id, bool shared, const std::string& src_dir, + const std::string& src_fname, // starts with "/" + RateLimiter* rate_limiter, uint64_t size_limit = 0, + bool shared_checksum = false, + std::function progress_callback = []() {}); // backup state data BackupID latest_backup_id_; @@ -578,15 +575,11 @@ Status BackupEngineImpl::Initialize() { CopyWorkItem work_item; while (files_to_copy_.read(work_item)) { CopyResult result; - result.status = CopyFile(work_item.src_path, - work_item.dst_path, - work_item.src_env, - work_item.dst_env, - work_item.sync, - work_item.rate_limiter, - &result.size, - &result.checksum_value, - work_item.size_limit); + result.status = + CopyFile(work_item.src_path, work_item.dst_path, work_item.src_env, + work_item.dst_env, work_item.sync, work_item.rate_limiter, + &result.size, &result.checksum_value, work_item.size_limit, + work_item.progress_callback); work_item.result.set_value(std::move(result)); } }); @@ -597,7 +590,8 @@ Status BackupEngineImpl::Initialize() { return Status::OK(); } -Status BackupEngineImpl::CreateNewBackup(DB* db, bool flush_before_backup) { +Status BackupEngineImpl::CreateNewBackup( + DB* db, bool flush_before_backup, std::function progress_callback) { assert(initialized_); assert(!read_only_); Status s; @@ -672,15 +666,12 @@ Status BackupEngineImpl::CreateNewBackup(DB* db, bool flush_before_backup) { // * if it's kTableFile, then it's shared // * if it's kDescriptorFile, limit the size to manifest_file_size s = AddBackupFileWorkItem( - live_dst_paths, - backup_items_to_finish, - new_backup_id, - options_.share_table_files && type == kTableFile, - db->GetName(), - live_files[i], - rate_limiter.get(), - (type == kDescriptorFile) ? manifest_file_size : 0, - options_.share_files_with_checksum && type == kTableFile); + live_dst_paths, backup_items_to_finish, new_backup_id, + options_.share_table_files && type == kTableFile, db->GetName(), + live_files[i], rate_limiter.get(), + (type == kDescriptorFile) ? manifest_file_size : 0, + options_.share_files_with_checksum && type == kTableFile, + progress_callback); } // Add a CopyWorkItem to the channel for each WAL file for (size_t i = 0; s.ok() && i < live_wal_files.size(); ++i) { @@ -1085,13 +1076,12 @@ Status BackupEngineImpl::PutLatestBackupFileContents(uint32_t latest_backup) { return s; } -Status BackupEngineImpl::CopyFile( - const std::string& src, - const std::string& dst, Env* src_env, - Env* dst_env, bool sync, - RateLimiter* rate_limiter, uint64_t* size, - uint32_t* checksum_value, - uint64_t size_limit) { +Status BackupEngineImpl::CopyFile(const std::string& src, + const std::string& dst, Env* src_env, + Env* dst_env, bool sync, + RateLimiter* rate_limiter, uint64_t* size, + uint32_t* checksum_value, uint64_t size_limit, + std::function progress_callback) { Status s; unique_ptr dst_file; unique_ptr src_file; @@ -1125,6 +1115,7 @@ Status BackupEngineImpl::CopyFile( unique_ptr buf(new char[copy_file_buffer_size_]); Slice data; + uint64_t processed_buffer_size = 0; do { if (stop_backup_.load(std::memory_order_acquire)) { return Status::Incomplete("Backup stopped"); @@ -1149,6 +1140,12 @@ Status BackupEngineImpl::CopyFile( if (rate_limiter != nullptr) { rate_limiter->Request(data.size(), Env::IO_LOW); } + processed_buffer_size += buffer_to_read; + if (processed_buffer_size > options_.callback_trigger_interval_size) { + processed_buffer_size -= options_.callback_trigger_interval_size; + std::lock_guard lock(byte_report_mutex_); + progress_callback(); + } } while (s.ok() && data.size() > 0 && size_limit > 0); if (s.ok() && sync) { @@ -1160,15 +1157,12 @@ Status BackupEngineImpl::CopyFile( // src_fname will always start with "/" Status BackupEngineImpl::AddBackupFileWorkItem( - std::unordered_set& live_dst_paths, - std::vector& backup_items_to_finish, - BackupID backup_id, - bool shared, - const std::string& src_dir, - const std::string& src_fname, - RateLimiter* rate_limiter, - uint64_t size_limit, - bool shared_checksum) { + std::unordered_set& live_dst_paths, + std::vector& backup_items_to_finish, + BackupID backup_id, bool shared, const std::string& src_dir, + const std::string& src_fname, RateLimiter* rate_limiter, + uint64_t size_limit, bool shared_checksum, + std::function progress_callback) { assert(src_fname.size() > 0 && src_fname[0] == '/'); std::string dst_relative = src_fname.substr(1); std::string dst_relative_tmp; @@ -1252,13 +1246,9 @@ Status BackupEngineImpl::AddBackupFileWorkItem( if (need_to_copy) { Log(options_.info_log, "Copying %s to %s", src_fname.c_str(), dst_path_tmp.c_str()); - CopyWorkItem copy_work_item(src_dir + src_fname, - dst_path_tmp, - db_env_, - backup_env_, - options_.sync, - rate_limiter, - size_limit); + CopyWorkItem copy_work_item(src_dir + src_fname, dst_path_tmp, db_env_, + backup_env_, options_.sync, rate_limiter, + size_limit, progress_callback); BackupAfterCopyWorkItem after_copy_work_item( copy_work_item.result.get_future(), shared, diff --git a/utilities/backupable/backupable_db_test.cc b/utilities/backupable/backupable_db_test.cc index 266ce6849..c9e0e9a13 100644 --- a/utilities/backupable/backupable_db_test.cc +++ b/utilities/backupable/backupable_db_test.cc @@ -1172,6 +1172,20 @@ TEST_F(BackupableDBTest, ReadOnlyBackupEngine) { delete db; } +TEST_F(BackupableDBTest, ProgressCallbackDuringBackup) { + DestroyDB(dbname_, Options()); + OpenDBAndBackupEngine(true); + FillDB(db_.get(), 0, 100); + bool is_callback_invoked = false; + ASSERT_OK(backup_engine_->CreateNewBackup( + db_.get(), true, + [&is_callback_invoked]() { is_callback_invoked = true; })); + + ASSERT_TRUE(is_callback_invoked); + CloseDBAndBackupEngine(); + DestroyDB(dbname_, Options()); +} + TEST_F(BackupableDBTest, GarbageCollectionBeforeBackup) { DestroyDB(dbname_, Options()); OpenDBAndBackupEngine(true);