diff --git a/HISTORY.md b/HISTORY.md index 2ef40c4a9..dc8a44bbf 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -22,6 +22,7 @@ * Add Cache.GetPinnedUsage() to get the size of memory occupied by entries that are in use by the system. * DB:Open() will fail if the compression specified in Options is not linked with the binary. If you see this failure, recompile RocksDB with compression libraries present on your system. Also, previously our default compression was snappy. This behavior is now changed. Now, the default compression is snappy only if it's available on the system. If it isn't we change the default to kNoCompression. * We changed how we account for memory used in block cache. Previously, we only counted the sum of block sizes currently present in block cache. Now, we count the actual memory usage of the blocks. For example, a block of size 4.5KB will use 8KB memory with jemalloc. This might decrease your memory usage and possibly decrease performance. Increase block cache size if you see this happening after an upgrade. +* Add BackupEngineImpl.options_.max_background_operations to specify the maximum number of operations that may be performed in parallel. Add support for parallelized backup and restore. ## 3.11.0 (5/19/2015) ### New Features diff --git a/include/rocksdb/utilities/backupable_db.h b/include/rocksdb/utilities/backupable_db.h index 956ab3d18..aeaa8f3b2 100644 --- a/include/rocksdb/utilities/backupable_db.h +++ b/include/rocksdb/utilities/backupable_db.h @@ -87,6 +87,11 @@ struct BackupableDBOptions { // *turn it on only if you know what you're doing* bool share_files_with_checksum; + // Up to this many background threads will copy files for CreateNewBackup() + // and RestoreDBFromBackup() + // Default: 1 + int max_background_operations; + void Dump(Logger* logger) const; explicit BackupableDBOptions(const std::string& _backup_dir, @@ -96,7 +101,8 @@ struct BackupableDBOptions { bool _destroy_old_data = false, bool _backup_log_files = true, uint64_t _backup_rate_limit = 0, - uint64_t _restore_rate_limit = 0) + uint64_t _restore_rate_limit = 0, + int _max_background_operations = 1) : backup_dir(_backup_dir), backup_env(_backup_env), share_table_files(_share_table_files), @@ -106,7 +112,8 @@ struct BackupableDBOptions { backup_log_files(_backup_log_files), backup_rate_limit(_backup_rate_limit), restore_rate_limit(_restore_rate_limit), - share_files_with_checksum(false) { + share_files_with_checksum(false), + max_background_operations(_max_background_operations) { assert(share_table_files || !share_files_with_checksum); } }; diff --git a/util/channel.h b/util/channel.h new file mode 100644 index 000000000..a8987163f --- /dev/null +++ b/util/channel.h @@ -0,0 +1,67 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#include +#include +#include +#include + +#pragma once + +namespace rocksdb { + +template +class channel { + public: + explicit channel() : eof_(false) {} + + channel(const channel&) = delete; + void operator=(const channel&) = delete; + + void sendEof() { + std::lock_guard lk(lock_); + eof_ = true; + cv_.notify_all(); + } + + bool eof() { + std::lock_guard lk(lock_); + return buffer_.empty() && eof_; + } + + size_t size() const { + std::lock_guard lk(lock_); + return buffer_.size(); + } + + // writes elem to the queue + void write(T&& elem) { + std::unique_lock lk(lock_); + buffer_.emplace(std::forward(elem)); + cv_.notify_one(); + } + + /// Moves a dequeued element onto elem, blocking until an element + /// is available. + // returns false if EOF + bool read(T& elem) { + std::unique_lock lk(lock_); + cv_.wait(lk, [&] { return eof_ || !buffer_.empty(); }); + if (eof_ && buffer_.empty()) { + return false; + } + elem = std::move(buffer_.front()); + buffer_.pop(); + cv_.notify_one(); + return true; + } + + private: + std::condition_variable cv_; + std::mutex lock_; + std::queue buffer_; + bool eof_; +}; +} // namespace rocksdb diff --git a/utilities/backupable/backupable_db.cc b/utilities/backupable/backupable_db.cc index ab640ed45..d7d128f9f 100644 --- a/utilities/backupable/backupable_db.cc +++ b/utilities/backupable/backupable_db.cc @@ -11,6 +11,7 @@ #include "rocksdb/utilities/backupable_db.h" #include "db/filename.h" +#include "util/channel.h" #include "util/coding.h" #include "util/crc32c.h" #include "util/logging.h" @@ -30,7 +31,10 @@ #include #include #include +#include +#include #include +#include namespace rocksdb { @@ -99,18 +103,21 @@ std::string BackupStatistics::ToString() const { } void BackupableDBOptions::Dump(Logger* logger) const { - Log(logger, " Options.backup_dir: %s", backup_dir.c_str()); - Log(logger, " Options.backup_env: %p", backup_env); - Log(logger, " Options.share_table_files: %d", + Log(logger, " Options.backup_dir: %s", backup_dir.c_str()); + Log(logger, " Options.backup_env: %p", backup_env); + Log(logger, " Options.share_table_files: %d", static_cast(share_table_files)); - Log(logger, " Options.info_log: %p", info_log); - Log(logger, " Options.sync: %d", static_cast(sync)); - Log(logger, " Options.destroy_old_data: %d", + Log(logger, " Options.info_log: %p", info_log); + Log(logger, " Options.sync: %d", static_cast(sync)); + Log(logger, " Options.destroy_old_data: %d", static_cast(destroy_old_data)); - Log(logger, " Options.backup_log_files: %d", + Log(logger, " Options.backup_log_files: %d", static_cast(backup_log_files)); - Log(logger, " Options.backup_rate_limit: %" PRIu64, backup_rate_limit); - Log(logger, "Options.restore_rate_limit: %" PRIu64, restore_rate_limit); + Log(logger, " Options.backup_rate_limit: %" PRIu64, backup_rate_limit); + Log(logger, " Options.restore_rate_limit: %" PRIu64, + restore_rate_limit); + Log(logger, "Options.max_background_operations: %d", + max_background_operations); } // -------- BackupEngineImpl class --------- @@ -303,21 +310,92 @@ class BackupEngineImpl : public BackupEngine { uint64_t* size = nullptr, uint32_t* checksum_value = nullptr, uint64_t size_limit = 0); - // if size_limit == 0, there is no size limit, copy everything - Status BackupFile(BackupID backup_id, - BackupMeta* backup, - bool shared, - const std::string& src_dir, - const std::string& src_fname, // starts with "/" - BackupRateLimiter* rate_limiter, - uint64_t size_limit = 0, - bool shared_checksum = false); Status CalculateChecksum(const std::string& src, Env* src_env, uint64_t size_limit, uint32_t* checksum_value); + struct CopyResult { + uint64_t size; + uint32_t checksum_value; + Status status; + }; + struct CopyWorkItem { + std::string src_path; + std::string dst_path; + Env* src_env; + Env* dst_env; + bool sync; + BackupRateLimiter* rate_limiter; + uint64_t size_limit; + std::promise result; + CopyWorkItem() {} + CopyWorkItem(std::string _src_path, + std::string _dst_path, + Env* _src_env, + Env* _dst_env, + bool _sync, + BackupRateLimiter* _rate_limiter, + uint64_t _size_limit) + : src_path(std::move(_src_path)), + dst_path(std::move(_dst_path)), + src_env(_src_env), + dst_env(_dst_env), + sync(_sync), + rate_limiter(_rate_limiter), + size_limit(_size_limit) {} + }; + + struct BackupAfterCopyWorkItem { + std::future result; + bool shared; + bool needed_to_copy; + Env* backup_env; + std::string dst_path_tmp; + std::string dst_path; + std::string dst_relative; + BackupAfterCopyWorkItem() {} + BackupAfterCopyWorkItem(std::future _result, + bool _shared, + bool _needed_to_copy, + Env* _backup_env, + std::string _dst_path_tmp, + std::string _dst_path, + std::string _dst_relative) + : result(std::move(_result)), + shared(_shared), + needed_to_copy(_needed_to_copy), + backup_env(_backup_env), + dst_path_tmp(std::move(_dst_path_tmp)), + dst_path(std::move(_dst_path)), + dst_relative(std::move(_dst_relative)) {} + }; + + struct RestoreAfterCopyWorkItem { + std::future result; + uint32_t checksum_value; + RestoreAfterCopyWorkItem() {} + RestoreAfterCopyWorkItem(std::future _result, + uint32_t _checksum_value) + : result(std::move(_result)), + checksum_value(_checksum_value) {} + }; + + channel files_to_copy_; + std::vector threads_; + + Status AddBackupFileWorkItem( + std::unordered_set& live_dst_paths, + std::vector& backup_items_to_finish, + BackupID backup_id, + bool shared, + const std::string& src_dir, + const std::string& src_fname, // starts with "/" + BackupRateLimiter* rate_limiter, + uint64_t size_limit = 0, + bool shared_checksum = false); + // backup state data BackupID latest_backup_id_; std::map> backups_; @@ -365,6 +443,27 @@ BackupEngineImpl::BackupEngineImpl(Env* db_env, backup_env_(options.backup_env != nullptr ? options.backup_env : db_env_), copy_file_buffer_size_(kDefaultCopyFileBufferSize), read_only_(read_only) { + + // set up threads perform copies from files_to_copy_ in the background + for (int t = 0; t < options_.max_background_operations; t++) { + threads_.emplace_back([&]() { + CopyWorkItem work_item; + while (files_to_copy_.read(work_item)) { + CopyResult result; + result.status = CopyFile(work_item.src_path, + work_item.dst_path, + work_item.src_env, + work_item.dst_env, + work_item.sync, + work_item.rate_limiter, + &result.size, + &result.checksum_value, + work_item.size_limit); + work_item.result.set_value(std::move(result)); + } + }); + } + if (read_only_) { Log(options_.info_log, "Starting read_only backup engine"); } @@ -487,9 +586,20 @@ BackupEngineImpl::BackupEngineImpl(Env* db_env, Log(options_.info_log, "Initialized BackupEngine"); } -BackupEngineImpl::~BackupEngineImpl() { LogFlush(options_.info_log); } +BackupEngineImpl::~BackupEngineImpl() { + files_to_copy_.sendEof(); + for (int i = 0; i < options_.max_background_operations; i++) { + threads_[i].join(); + } + LogFlush(options_.info_log); +} Status BackupEngineImpl::CreateNewBackup(DB* db, bool flush_before_backup) { + if (options_.max_background_operations > 1 && + options_.backup_rate_limit != 0) { + return Status::InvalidArgument( + "Multi-threaded backups cannot use a backup_rate_limit"); + } assert(!read_only_); Status s; std::vector live_files; @@ -539,7 +649,15 @@ Status BackupEngineImpl::CreateNewBackup(DB* db, bool flush_before_backup) { options_.backup_rate_limit, copy_file_buffer_size_)); } - // copy live_files + // A set into which we will insert the dst_paths that are calculated for live + // files and live WAL files. + // This is used to check whether a live files shares a dst_path with another + // live file. + std::unordered_set live_dst_paths; + live_dst_paths.reserve(live_files.size() + live_wal_files.size()); + + std::vector backup_items_to_finish; + // Add a CopyWorkItem to the channel for each live file for (size_t i = 0; s.ok() && i < live_files.size(); ++i) { uint64_t number; FileType type; @@ -555,27 +673,49 @@ Status BackupEngineImpl::CreateNewBackup(DB* db, bool flush_before_backup) { // rules: // * if it's kTableFile, then it's shared // * if it's kDescriptorFile, limit the size to manifest_file_size - s = BackupFile(new_backup_id, - new_backup.get(), - options_.share_table_files && type == kTableFile, - db->GetName(), /* src_dir */ - live_files[i], /* src_fname */ - rate_limiter.get(), - (type == kDescriptorFile) ? manifest_file_size : 0, - options_.share_files_with_checksum && type == kTableFile); - } - - // copy WAL files + s = AddBackupFileWorkItem( + live_dst_paths, + backup_items_to_finish, + new_backup_id, + options_.share_table_files && type == kTableFile, + db->GetName(), + live_files[i], + rate_limiter.get(), + (type == kDescriptorFile) ? manifest_file_size : 0, + options_.share_files_with_checksum && type == kTableFile); + } + // Add a CopyWorkItem to the channel for each WAL file for (size_t i = 0; s.ok() && i < live_wal_files.size(); ++i) { if (live_wal_files[i]->Type() == kAliveLogFile) { // we only care about live log files // copy the file into backup_dir/files// - s = BackupFile(new_backup_id, - new_backup.get(), - false, /* not shared */ - db->GetOptions().wal_dir, - live_wal_files[i]->PathName(), - rate_limiter.get()); + s = AddBackupFileWorkItem(live_dst_paths, + backup_items_to_finish, + new_backup_id, + false, /* not shared */ + db->GetOptions().wal_dir, + live_wal_files[i]->PathName(), + rate_limiter.get()); + } + } + + Status item_status; + for (auto& item : backup_items_to_finish) { + item.result.wait(); + auto result = item.result.get(); + item_status = result.status; + if (item_status.ok() && item.shared && item.needed_to_copy) { + item_status = item.backup_env->RenameFile(item.dst_path_tmp, + item.dst_path); + } + if (item_status.ok()) { + item_status = new_backup.get()->AddFile( + std::make_shared(item.dst_relative, + result.size, + result.checksum_value)); + } + if (!item_status.ok()) { + s = item_status; } } @@ -737,6 +877,11 @@ BackupEngineImpl::GetCorruptedBackups( Status BackupEngineImpl::RestoreDBFromBackup( BackupID backup_id, const std::string& db_dir, const std::string& wal_dir, const RestoreOptions& restore_options) { + if (options_.max_background_operations > 1 && + options_.restore_rate_limit != 0) { + return Status::InvalidArgument( + "Multi-threaded restores cannot use a restore_rate_limit"); + } auto corrupt_itr = corrupt_backups_.find(backup_id); if (corrupt_itr != corrupt_backups_.end()) { return corrupt_itr->second.first; @@ -794,6 +939,7 @@ Status BackupEngineImpl::RestoreDBFromBackup( options_.restore_rate_limit, copy_file_buffer_size_)); } Status s; + std::vector restore_items_to_finish; for (const auto& file_info : backup->GetFiles()) { const std::string &file = file_info->filename; std::string dst; @@ -823,14 +969,30 @@ Status BackupEngineImpl::RestoreDBFromBackup( "/" + dst; Log(options_.info_log, "Restoring %s to %s\n", file.c_str(), dst.c_str()); - uint32_t checksum_value; - s = CopyFile(GetAbsolutePath(file), dst, backup_env_, db_env_, false, - rate_limiter.get(), nullptr /* size */, &checksum_value); - if (!s.ok()) { + CopyWorkItem copy_work_item(GetAbsolutePath(file), + dst, + backup_env_, + db_env_, + false, + rate_limiter.get(), + 0 /* size_limit */); + RestoreAfterCopyWorkItem after_copy_work_item( + copy_work_item.result.get_future(), + file_info->checksum_value); + files_to_copy_.write(std::move(copy_work_item)); + restore_items_to_finish.push_back(std::move(after_copy_work_item)); + } + Status item_status; + for (auto& item : restore_items_to_finish) { + item.result.wait(); + auto result = item.result.get(); + item_status = result.status; + // Note: It is possible that both of the following bad-status cases occur + // during copying. But, we only return one status. + if (!item_status.ok()) { + s = item_status; break; - } - - if (file_info->checksum_value != checksum_value) { + } else if (item.checksum_value != result.checksum_value) { s = Status::Corruption("Checksum check failed"); break; } @@ -972,13 +1134,16 @@ Status BackupEngineImpl::CopyFile( } // src_fname will always start with "/" -Status BackupEngineImpl::BackupFile(BackupID backup_id, BackupMeta* backup, - bool shared, const std::string& src_dir, - const std::string& src_fname, - BackupRateLimiter* rate_limiter, - uint64_t size_limit, - bool shared_checksum) { - +Status BackupEngineImpl::AddBackupFileWorkItem( + std::unordered_set& live_dst_paths, + std::vector& backup_items_to_finish, + BackupID backup_id, + bool shared, + const std::string& src_dir, + const std::string& src_fname, + BackupRateLimiter* rate_limiter, + uint64_t size_limit, + bool shared_checksum) { assert(src_fname.size() > 0 && src_fname[0] == '/'); std::string dst_relative = src_fname.substr(1); std::string dst_relative_tmp; @@ -1012,17 +1177,20 @@ Status BackupEngineImpl::BackupFile(BackupID backup_id, BackupMeta* backup, std::string dst_path = GetAbsolutePath(dst_relative); std::string dst_path_tmp = GetAbsolutePath(dst_relative_tmp); - // if it's shared, we also need to check if it exists -- if it does, - // no need to copy it again + // if it's shared, we also need to check if it exists -- if it does, no need + // to copy it again. bool need_to_copy = true; - if (shared && backup_env_->FileExists(dst_path)) { + // true if dst_path is the same path as another live file + const bool same_path = + live_dst_paths.find(dst_path) != live_dst_paths.end(); + if (shared && (backup_env_->FileExists(dst_path) || same_path)) { need_to_copy = false; if (shared_checksum) { Log(options_.info_log, "%s already present, with checksum %u and size %" PRIu64, src_fname.c_str(), checksum_value, size); } else if (backuped_file_infos_.find(dst_relative) == - backuped_file_infos_.end()) { + backuped_file_infos_.end() && !same_path) { // file already exists, but it's not referenced by any backup. overwrite // the file Log(options_.info_log, @@ -1040,25 +1208,44 @@ Status BackupEngineImpl::BackupFile(BackupID backup_id, BackupMeta* backup, &checksum_value); } } + live_dst_paths.insert(dst_path); + if (need_to_copy) { Log(options_.info_log, "Copying %s to %s", src_fname.c_str(), - dst_path_tmp.c_str()); - s = CopyFile(src_dir + src_fname, - dst_path_tmp, - db_env_, - backup_env_, - options_.sync, - rate_limiter, - &size, - &checksum_value, - size_limit); - if (s.ok() && shared) { - s = backup_env_->RenameFile(dst_path_tmp, dst_path); - } - } - if (s.ok()) { - s = backup->AddFile(std::make_shared( - dst_relative, size, checksum_value)); + dst_path_tmp.c_str()); + CopyWorkItem copy_work_item(src_dir + src_fname, + dst_path_tmp, + db_env_, + backup_env_, + options_.sync, + rate_limiter, + size_limit); + BackupAfterCopyWorkItem after_copy_work_item( + copy_work_item.result.get_future(), + shared, + need_to_copy, + backup_env_, + dst_path_tmp, + dst_path, + dst_relative); + files_to_copy_.write(std::move(copy_work_item)); + backup_items_to_finish.push_back(std::move(after_copy_work_item)); + } else { + std::promise promise_result; + BackupAfterCopyWorkItem after_copy_work_item( + promise_result.get_future(), + shared, + need_to_copy, + backup_env_, + dst_path_tmp, + dst_path, + dst_relative); + backup_items_to_finish.push_back(std::move(after_copy_work_item)); + CopyResult result; + result.status = s; + result.size = size; + result.checksum_value = checksum_value; + promise_result.set_value(std::move(result)); } return s; } diff --git a/utilities/backupable/backupable_db_test.cc b/utilities/backupable/backupable_db_test.cc index 9c24140fb..29f3c1c01 100644 --- a/utilities/backupable/backupable_db_test.cc +++ b/utilities/backupable/backupable_db_test.cc @@ -21,6 +21,7 @@ #include "util/string_util.h" #include "util/testutil.h" #include "util/auto_roll_logger.h" +#include "util/mock_env.h" namespace rocksdb { @@ -361,6 +362,7 @@ class BackupableDBTest : public testing::Test { // set up envs env_ = Env::Default(); + mock_env_.reset(new MockEnv(env_)); test_db_env_.reset(new TestEnv(env_)); test_backup_env_.reset(new TestEnv(env_)); file_manager_.reset(new FileManager(env_)); @@ -377,6 +379,9 @@ class BackupableDBTest : public testing::Test { backupable_options_.reset(new BackupableDBOptions( backupdir_, test_backup_env_.get(), true, logger_.get(), true)); + // most tests will use multi-threaded backups + backupable_options_->max_background_operations = 7; + // delete old files in db DestroyDB(dbname_, Options()); } @@ -474,6 +479,7 @@ class BackupableDBTest : public testing::Test { // envs Env* env_; + unique_ptr mock_env_; unique_ptr test_db_env_; unique_ptr test_backup_env_; unique_ptr file_manager_; @@ -555,6 +561,30 @@ TEST_F(BackupableDBTest, NoDoubleCopy) { CloseBackupableDB(); } +// Verify that backup works when the database environment is not the same as +// the backup environment +// TODO(agf): Make all/most tests use different db and backup environments. +// This will probably require more implementation of MockEnv. +// For example, MockEnv::RenameFile() must be able to rename +// directories. +TEST_F(BackupableDBTest, DifferentEnvs) { + test_db_env_.reset(new TestEnv(mock_env_.get())); + options_.env = test_db_env_.get(); + + OpenBackupableDB(true, true); + + // should write 5 DB files + LATEST_BACKUP + one meta file + test_backup_env_->SetLimitWrittenFiles(7); + test_backup_env_->ClearWrittenFiles(); + test_db_env_->SetLimitWrittenFiles(0); + dummy_db_->live_files_ = { "/00010.sst", "/00011.sst", + "/CURRENT", "/MANIFEST-01" }; + dummy_db_->wal_files_ = {{"/00011.log", true}, {"/00012.log", false}}; + ASSERT_OK(db_->CreateNewBackup(false)); + + CloseBackupableDB(); +} + // test various kind of corruptions that may happen: // 1. Not able to write a file for backup - that backup should fail, // everything else should work @@ -966,6 +996,8 @@ TEST_F(BackupableDBTest, RateLimiting) { backupable_options_->backup_rate_limit = limit.first; backupable_options_->restore_rate_limit = limit.second; + // rate-limiting backups must be single-threaded + backupable_options_->max_background_operations = 1; options_.compression = kNoCompression; OpenBackupableDB(true); size_t bytes_written = FillDB(db_.get(), 0, 100000);