Handle concurrent manifest update and backup creation

Summary:
Fixed two related race conditions in backup creation.

(1) CreateNewBackup() uses DB::DisableFileDeletions() to prevent table files
from being deleted while it is copying; however, the MANIFEST file could still
rotate during this time. The fix is to stop deleting the old manifest in the
rotation logic. It will be deleted safely later when PurgeObsoleteFiles() runs
(can only happen when file deletions are enabled).

(2) CreateNewBackup() did not account for the CURRENT file being mutable.
This is significant because the files returned by GetLiveFiles() contain a
particular manifest filename, but the manifest to which CURRENT refers can
change at any time. This causes problems when CURRENT changes between the call
to GetLiveFiles() and when it's copied to the backup directory. To workaround this, I
manually forge a CURRENT file referring to the manifest filename returned in
GetLiveFiles().

(2) also applies to the checkpointing code, so let me know if this approach is
good and I'll make the same change there.

Test Plan:
new test for roll manifest during backup creation.

running the test before this change:

  $ ./backupable_db_test --gtest_filter=BackupableDBTest.ChangeManifestDuringBackupCreation
  ...
  IO error: /tmp/rocksdbtest-9383/backupable_db/MANIFEST-000001: No such file or directory

running the test after this change:

  $ ./backupable_db_test --gtest_filter=BackupableDBTest.ChangeManifestDuringBackupCreation
  ...
  [ RUN      ] BackupableDBTest.ChangeManifestDuringBackupCreation
  [       OK ] BackupableDBTest.ChangeManifestDuringBackupCreation (2836 ms)

Reviewers: IslamAbdelRahman, anthony, sdong

Reviewed By: sdong

Subscribers: dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D54711
main
Andrew Kryczka 9 years ago
parent 3492889ab8
commit 69c471bd9b
  1. 1
      db/deletefile_test.cc
  2. 12
      db/version_set.cc
  3. 290
      utilities/backupable/backupable_db.cc
  4. 28
      utilities/backupable/backupable_db_test.cc

@ -37,6 +37,7 @@ class DeleteFileTest : public testing::Test {
DeleteFileTest() { DeleteFileTest() {
db_ = nullptr; db_ = nullptr;
env_ = Env::Default(); env_ = Env::Default();
options_.delete_obsolete_files_period_micros = 0; // always do full purge
options_.enable_thread_tracking = true; options_.enable_thread_tracking = true;
options_.write_buffer_size = 1024*1024*1000; options_.write_buffer_size = 1024*1024*1000;
options_.target_file_size_base = 1024*1024*1000; options_.target_file_size_base = 1024*1024*1000;

@ -2252,15 +2252,8 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data,
if (s.ok() && new_descriptor_log) { if (s.ok() && new_descriptor_log) {
s = SetCurrentFile(env_, dbname_, pending_manifest_file_number_, s = SetCurrentFile(env_, dbname_, pending_manifest_file_number_,
db_options_->disableDataSync ? nullptr : db_directory); db_options_->disableDataSync ? nullptr : db_directory);
if (s.ok() && pending_manifest_file_number_ > manifest_file_number_) { // Leave the old file behind since PurgeObsoleteFiles will take care of it
// delete old manifest file // later. It's unsafe to delete now since file deletion may be disabled.
Log(InfoLogLevel::INFO_LEVEL, db_options_->info_log,
"Deleting manifest %" PRIu64 " current manifest %" PRIu64 "\n",
manifest_file_number_, pending_manifest_file_number_);
// we don't care about an error here, PurgeObsoleteFiles will take care
// of it later
env_->DeleteFile(DescriptorFileName(dbname_, manifest_file_number_));
}
} }
if (s.ok()) { if (s.ok()) {
@ -2275,6 +2268,7 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data,
} }
LogFlush(db_options_->info_log); LogFlush(db_options_->info_log);
TEST_SYNC_POINT("VersionSet::LogAndApply:WriteManifestDone");
mu->Lock(); mu->Lock();
} }

@ -20,10 +20,11 @@
#include "rocksdb/rate_limiter.h" #include "rocksdb/rate_limiter.h"
#include "rocksdb/transaction_log.h" #include "rocksdb/transaction_log.h"
#include "port/port.h" #include "port/port.h"
#include "util/sync_point.h"
#ifndef __STDC_FORMAT_MACROS #ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS #define __STDC_FORMAT_MACROS
#endif #endif // __STDC_FORMAT_MACROS
#include <inttypes.h> #include <inttypes.h>
#include <stdlib.h> #include <stdlib.h>
@ -39,8 +40,6 @@
#include <thread> #include <thread>
#include <unordered_map> #include <unordered_map>
#include <unordered_set> #include <unordered_set>
#include "port/port.h"
namespace rocksdb { namespace rocksdb {
@ -269,10 +268,18 @@ class BackupEngineImpl : public BackupEngine {
} }
Status PutLatestBackupFileContents(uint32_t latest_backup); Status PutLatestBackupFileContents(uint32_t latest_backup);
// if size_limit == 0, there is no size limit, copy everything
Status CopyFile(const std::string& src, const std::string& dst, Env* src_env, // If size_limit == 0, there is no size limit, copy everything.
//
// Exactly one of src and contents must be non-empty.
//
// @param src If non-empty, the file is copied from this pathname.
// @param contents If non-empty, the file will be created with these contents.
Status CopyOrCreateFile(const std::string& src, const std::string& dst,
const std::string& contents, Env* src_env,
Env* dst_env, bool sync, RateLimiter* rate_limiter, Env* dst_env, bool sync, RateLimiter* rate_limiter,
uint64_t* size = nullptr, uint32_t* checksum_value = nullptr, uint64_t* size = nullptr,
uint32_t* checksum_value = nullptr,
uint64_t size_limit = 0, uint64_t size_limit = 0,
std::function<void()> progress_callback = []() {}); std::function<void()> progress_callback = []() {});
@ -281,31 +288,39 @@ class BackupEngineImpl : public BackupEngine {
uint64_t size_limit, uint64_t size_limit,
uint32_t* checksum_value); uint32_t* checksum_value);
struct CopyResult { struct CopyOrCreateResult {
uint64_t size; uint64_t size;
uint32_t checksum_value; uint32_t checksum_value;
Status status; Status status;
}; };
struct CopyWorkItem {
// Exactly one of src_path and contents must be non-empty. If src_path is
// non-empty, the file is copied from this pathname. Otherwise, if contents is
// non-empty, the file will be created at dst_path with these contents.
struct CopyOrCreateWorkItem {
std::string src_path; std::string src_path;
std::string dst_path; std::string dst_path;
std::string contents;
Env* src_env; Env* src_env;
Env* dst_env; Env* dst_env;
bool sync; bool sync;
RateLimiter* rate_limiter; RateLimiter* rate_limiter;
uint64_t size_limit; uint64_t size_limit;
std::promise<CopyResult> result; std::promise<CopyOrCreateResult> result;
std::function<void()> progress_callback; std::function<void()> progress_callback;
CopyWorkItem() {} CopyOrCreateWorkItem() {}
CopyWorkItem(const CopyWorkItem&) = delete; CopyOrCreateWorkItem(const CopyOrCreateWorkItem&) = delete;
CopyWorkItem& operator=(const CopyWorkItem&) = delete; CopyOrCreateWorkItem& operator=(const CopyOrCreateWorkItem&) = delete;
CopyWorkItem(CopyWorkItem&& o) ROCKSDB_NOEXCEPT { *this = std::move(o); } CopyOrCreateWorkItem(CopyOrCreateWorkItem&& o) ROCKSDB_NOEXCEPT {
*this = std::move(o);
}
CopyWorkItem& operator=(CopyWorkItem&& o) ROCKSDB_NOEXCEPT { CopyOrCreateWorkItem& operator=(CopyOrCreateWorkItem&& o) ROCKSDB_NOEXCEPT {
src_path = std::move(o.src_path); src_path = std::move(o.src_path);
dst_path = std::move(o.dst_path); dst_path = std::move(o.dst_path);
contents = std::move(o.contents);
src_env = o.src_env; src_env = o.src_env;
dst_env = o.dst_env; dst_env = o.dst_env;
sync = o.sync; sync = o.sync;
@ -316,12 +331,14 @@ class BackupEngineImpl : public BackupEngine {
return *this; return *this;
} }
CopyWorkItem(std::string _src_path, std::string _dst_path, Env* _src_env, CopyOrCreateWorkItem(std::string _src_path, std::string _dst_path,
Env* _dst_env, bool _sync, RateLimiter* _rate_limiter, std::string _contents, Env* _src_env, Env* _dst_env,
bool _sync, RateLimiter* _rate_limiter,
uint64_t _size_limit, uint64_t _size_limit,
std::function<void()> _progress_callback = []() {}) std::function<void()> _progress_callback = []() {})
: src_path(std::move(_src_path)), : src_path(std::move(_src_path)),
dst_path(std::move(_dst_path)), dst_path(std::move(_dst_path)),
contents(std::move(_contents)),
src_env(_src_env), src_env(_src_env),
dst_env(_dst_env), dst_env(_dst_env),
sync(_sync), sync(_sync),
@ -330,21 +347,23 @@ class BackupEngineImpl : public BackupEngine {
progress_callback(_progress_callback) {} progress_callback(_progress_callback) {}
}; };
struct BackupAfterCopyWorkItem { struct BackupAfterCopyOrCreateWorkItem {
std::future<CopyResult> result; std::future<CopyOrCreateResult> result;
bool shared; bool shared;
bool needed_to_copy; bool needed_to_copy;
Env* backup_env; Env* backup_env;
std::string dst_path_tmp; std::string dst_path_tmp;
std::string dst_path; std::string dst_path;
std::string dst_relative; std::string dst_relative;
BackupAfterCopyWorkItem() {} BackupAfterCopyOrCreateWorkItem() {}
BackupAfterCopyWorkItem(BackupAfterCopyWorkItem&& o) ROCKSDB_NOEXCEPT { BackupAfterCopyOrCreateWorkItem(BackupAfterCopyOrCreateWorkItem&& o)
ROCKSDB_NOEXCEPT {
*this = std::move(o); *this = std::move(o);
} }
BackupAfterCopyWorkItem& operator=(BackupAfterCopyWorkItem&& o) ROCKSDB_NOEXCEPT { BackupAfterCopyOrCreateWorkItem& operator=(
BackupAfterCopyOrCreateWorkItem&& o) ROCKSDB_NOEXCEPT {
result = std::move(o.result); result = std::move(o.result);
shared = o.shared; shared = o.shared;
needed_to_copy = o.needed_to_copy; needed_to_copy = o.needed_to_copy;
@ -355,9 +374,10 @@ class BackupEngineImpl : public BackupEngine {
return *this; return *this;
} }
BackupAfterCopyWorkItem(std::future<CopyResult>&& _result, bool _shared, BackupAfterCopyOrCreateWorkItem(std::future<CopyOrCreateResult>&& _result,
bool _needed_to_copy, Env* _backup_env, bool _shared, bool _needed_to_copy,
std::string _dst_path_tmp, std::string _dst_path, Env* _backup_env, std::string _dst_path_tmp,
std::string _dst_path,
std::string _dst_relative) std::string _dst_relative)
: result(std::move(_result)), : result(std::move(_result)),
shared(_shared), shared(_shared),
@ -368,18 +388,20 @@ class BackupEngineImpl : public BackupEngine {
dst_relative(std::move(_dst_relative)) {} dst_relative(std::move(_dst_relative)) {}
}; };
struct RestoreAfterCopyWorkItem { struct RestoreAfterCopyOrCreateWorkItem {
std::future<CopyResult> result; std::future<CopyOrCreateResult> result;
uint32_t checksum_value; uint32_t checksum_value;
RestoreAfterCopyWorkItem() {} RestoreAfterCopyOrCreateWorkItem() {}
RestoreAfterCopyWorkItem(std::future<CopyResult>&& _result, RestoreAfterCopyOrCreateWorkItem(std::future<CopyOrCreateResult>&& _result,
uint32_t _checksum_value) uint32_t _checksum_value)
: result(std::move(_result)), checksum_value(_checksum_value) {} : result(std::move(_result)), checksum_value(_checksum_value) {}
RestoreAfterCopyWorkItem(RestoreAfterCopyWorkItem&& o) ROCKSDB_NOEXCEPT { RestoreAfterCopyOrCreateWorkItem(RestoreAfterCopyOrCreateWorkItem&& o)
ROCKSDB_NOEXCEPT {
*this = std::move(o); *this = std::move(o);
} }
RestoreAfterCopyWorkItem& operator=(RestoreAfterCopyWorkItem&& o) ROCKSDB_NOEXCEPT { RestoreAfterCopyOrCreateWorkItem& operator=(
RestoreAfterCopyOrCreateWorkItem&& o) ROCKSDB_NOEXCEPT {
result = std::move(o.result); result = std::move(o.result);
checksum_value = o.checksum_value; checksum_value = o.checksum_value;
return *this; return *this;
@ -388,17 +410,27 @@ class BackupEngineImpl : public BackupEngine {
bool initialized_; bool initialized_;
std::mutex byte_report_mutex_; std::mutex byte_report_mutex_;
channel<CopyWorkItem> files_to_copy_; channel<CopyOrCreateWorkItem> files_to_copy_or_create_;
std::vector<std::thread> threads_; std::vector<std::thread> threads_;
// Adds a file to the backup work queue to be copied or created if it doesn't
// already exist.
//
// Exactly one of src_dir and contents must be non-empty.
//
// @param src_dir If non-empty, the file in this directory named fname will be
// copied.
// @param fname Name of destination file and, in case of copy, source file.
// @param contents If non-empty, the file will be created with these contents.
Status AddBackupFileWorkItem( Status AddBackupFileWorkItem(
std::unordered_set<std::string>& live_dst_paths, std::unordered_set<std::string>& live_dst_paths,
std::vector<BackupAfterCopyWorkItem>& backup_items_to_finish, std::vector<BackupAfterCopyOrCreateWorkItem>& backup_items_to_finish,
BackupID backup_id, bool shared, const std::string& src_dir, BackupID backup_id, bool shared, const std::string& src_dir,
const std::string& src_fname, // starts with "/" const std::string& fname, // starts with "/"
RateLimiter* rate_limiter, uint64_t size_limit = 0, RateLimiter* rate_limiter, uint64_t size_limit = 0,
bool shared_checksum = false, bool shared_checksum = false,
std::function<void()> progress_callback = []() {}); std::function<void()> progress_callback = []() {},
const std::string& contents = std::string());
// backup state data // backup state data
BackupID latest_backup_id_; BackupID latest_backup_id_;
@ -451,7 +483,7 @@ BackupEngineImpl::BackupEngineImpl(Env* db_env,
read_only_(read_only) {} read_only_(read_only) {}
BackupEngineImpl::~BackupEngineImpl() { BackupEngineImpl::~BackupEngineImpl() {
files_to_copy_.sendEof(); files_to_copy_or_create_.sendEof();
for (auto& t : threads_) { for (auto& t : threads_) {
t.join(); t.join();
} }
@ -571,17 +603,18 @@ Status BackupEngineImpl::Initialize() {
} }
} }
// set up threads perform copies from files_to_copy_ in the background // set up threads perform copies from files_to_copy_or_create_ in the
// background
for (int t = 0; t < options_.max_background_operations; t++) { for (int t = 0; t < options_.max_background_operations; t++) {
threads_.emplace_back([&]() { threads_.emplace_back([&]() {
CopyWorkItem work_item; CopyOrCreateWorkItem work_item;
while (files_to_copy_.read(work_item)) { while (files_to_copy_or_create_.read(work_item)) {
CopyResult result; CopyOrCreateResult result;
result.status = result.status = CopyOrCreateFile(
CopyFile(work_item.src_path, work_item.dst_path, work_item.src_env, work_item.src_path, work_item.dst_path, work_item.contents,
work_item.dst_env, work_item.sync, work_item.rate_limiter, work_item.src_env, work_item.dst_env, work_item.sync,
&result.size, &result.checksum_value, work_item.size_limit, work_item.rate_limiter, &result.size, &result.checksum_value,
work_item.progress_callback); work_item.size_limit, work_item.progress_callback);
work_item.result.set_value(std::move(result)); work_item.result.set_value(std::move(result));
} }
}); });
@ -616,6 +649,8 @@ Status BackupEngineImpl::CreateNewBackup(
db->EnableFileDeletions(false); db->EnableFileDeletions(false);
return s; return s;
} }
TEST_SYNC_POINT("BackupEngineImpl::CreateNewBackup:SavedLiveFiles1");
TEST_SYNC_POINT("BackupEngineImpl::CreateNewBackup:SavedLiveFiles2");
BackupID new_backup_id = latest_backup_id_ + 1; BackupID new_backup_id = latest_backup_id_ + 1;
assert(backups_.find(new_backup_id) == backups_.end()); assert(backups_.find(new_backup_id) == backups_.end());
@ -650,8 +685,9 @@ Status BackupEngineImpl::CreateNewBackup(
std::unordered_set<std::string> live_dst_paths; std::unordered_set<std::string> live_dst_paths;
live_dst_paths.reserve(live_files.size() + live_wal_files.size()); live_dst_paths.reserve(live_files.size() + live_wal_files.size());
std::vector<BackupAfterCopyWorkItem> backup_items_to_finish; std::vector<BackupAfterCopyOrCreateWorkItem> backup_items_to_finish;
// Add a CopyWorkItem to the channel for each live file // Add a CopyOrCreateWorkItem to the channel for each live file
std::string manifest_fname, current_fname;
for (size_t i = 0; s.ok() && i < live_files.size(); ++i) { for (size_t i = 0; s.ok() && i < live_files.size(); ++i) {
uint64_t number; uint64_t number;
FileType type; FileType type;
@ -663,6 +699,15 @@ Status BackupEngineImpl::CreateNewBackup(
// we should only get sst, manifest and current files here // we should only get sst, manifest and current files here
assert(type == kTableFile || type == kDescriptorFile || assert(type == kTableFile || type == kDescriptorFile ||
type == kCurrentFile); type == kCurrentFile);
if (type == kCurrentFile) {
// We will craft the current file manually to ensure it's consistent with
// the manifest number. This is necessary because current's file contents
// can change during backup.
current_fname = live_files[i];
continue;
} else if (type == kDescriptorFile) {
manifest_fname = live_files[i];
}
// rules: // rules:
// * if it's kTableFile, then it's shared // * if it's kTableFile, then it's shared
@ -675,7 +720,15 @@ Status BackupEngineImpl::CreateNewBackup(
options_.share_files_with_checksum && type == kTableFile, options_.share_files_with_checksum && type == kTableFile,
progress_callback); progress_callback);
} }
// Add a CopyWorkItem to the channel for each WAL file if (s.ok() && !current_fname.empty() && !manifest_fname.empty()) {
// Write the current file with the manifest filename as its contents.
s = AddBackupFileWorkItem(
live_dst_paths, backup_items_to_finish, new_backup_id,
false /* shared */, "" /* src_dir */, CurrentFileName(""),
rate_limiter.get(), 0 /* size_limit */, false /* shared_checksum */,
progress_callback, manifest_fname.substr(1) + "\n");
}
// Add a CopyOrCreateWorkItem to the channel for each WAL file
for (size_t i = 0; s.ok() && i < live_wal_files.size(); ++i) { for (size_t i = 0; s.ok() && i < live_wal_files.size(); ++i) {
if (live_wal_files[i]->Type() == kAliveLogFile) { if (live_wal_files[i]->Type() == kAliveLogFile) {
// we only care about live log files // we only care about live log files
@ -938,7 +991,7 @@ Status BackupEngineImpl::RestoreDBFromBackup(
copy_file_buffer_size_ = rate_limiter->GetSingleBurstBytes(); copy_file_buffer_size_ = rate_limiter->GetSingleBurstBytes();
} }
Status s; Status s;
std::vector<RestoreAfterCopyWorkItem> restore_items_to_finish; std::vector<RestoreAfterCopyOrCreateWorkItem> restore_items_to_finish;
for (const auto& file_info : backup->GetFiles()) { for (const auto& file_info : backup->GetFiles()) {
const std::string &file = file_info->filename; const std::string &file = file_info->filename;
std::string dst; std::string dst;
@ -968,18 +1021,15 @@ Status BackupEngineImpl::RestoreDBFromBackup(
"/" + dst; "/" + dst;
Log(options_.info_log, "Restoring %s to %s\n", file.c_str(), dst.c_str()); Log(options_.info_log, "Restoring %s to %s\n", file.c_str(), dst.c_str());
CopyWorkItem copy_work_item(GetAbsolutePath(file), CopyOrCreateWorkItem copy_or_create_work_item(
dst, GetAbsolutePath(file), dst, "" /* contents */, backup_env_, db_env_,
backup_env_, false, rate_limiter.get(), 0 /* size_limit */);
db_env_, RestoreAfterCopyOrCreateWorkItem after_copy_or_create_work_item(
false, copy_or_create_work_item.result.get_future(),
rate_limiter.get(),
0 /* size_limit */);
RestoreAfterCopyWorkItem after_copy_work_item(
copy_work_item.result.get_future(),
file_info->checksum_value); file_info->checksum_value);
files_to_copy_.write(std::move(copy_work_item)); files_to_copy_or_create_.write(std::move(copy_or_create_work_item));
restore_items_to_finish.push_back(std::move(after_copy_work_item)); restore_items_to_finish.push_back(
std::move(after_copy_or_create_work_item));
} }
Status item_status; Status item_status;
for (auto& item : restore_items_to_finish) { for (auto& item : restore_items_to_finish) {
@ -1078,12 +1128,12 @@ Status BackupEngineImpl::PutLatestBackupFileContents(uint32_t latest_backup) {
return s; return s;
} }
Status BackupEngineImpl::CopyFile(const std::string& src, Status BackupEngineImpl::CopyOrCreateFile(
const std::string& dst, Env* src_env, const std::string& src, const std::string& dst, const std::string& contents,
Env* dst_env, bool sync, Env* src_env, Env* dst_env, bool sync, RateLimiter* rate_limiter,
RateLimiter* rate_limiter, uint64_t* size, uint64_t* size, uint32_t* checksum_value, uint64_t size_limit,
uint32_t* checksum_value, uint64_t size_limit,
std::function<void()> progress_callback) { std::function<void()> progress_callback) {
assert(src.empty() != contents.empty());
Status s; Status s;
unique_ptr<WritableFile> dst_file; unique_ptr<WritableFile> dst_file;
unique_ptr<SequentialFile> src_file; unique_ptr<SequentialFile> src_file;
@ -1102,9 +1152,9 @@ Status BackupEngineImpl::CopyFile(const std::string& src,
size_limit = std::numeric_limits<uint64_t>::max(); size_limit = std::numeric_limits<uint64_t>::max();
} }
s = src_env->NewSequentialFile(src, &src_file, env_options);
if (s.ok()) {
s = dst_env->NewWritableFile(dst, &dst_file, env_options); s = dst_env->NewWritableFile(dst, &dst_file, env_options);
if (s.ok() && !src.empty()) {
s = src_env->NewSequentialFile(src, &src_file, env_options);
} }
if (!s.ok()) { if (!s.ok()) {
return s; return s;
@ -1112,19 +1162,28 @@ Status BackupEngineImpl::CopyFile(const std::string& src,
unique_ptr<WritableFileWriter> dest_writer( unique_ptr<WritableFileWriter> dest_writer(
new WritableFileWriter(std::move(dst_file), env_options)); new WritableFileWriter(std::move(dst_file), env_options));
unique_ptr<SequentialFileReader> src_reader( unique_ptr<SequentialFileReader> src_reader;
new SequentialFileReader(std::move(src_file))); unique_ptr<char[]> buf;
unique_ptr<char[]> buf(new char[copy_file_buffer_size_]); if (!src.empty()) {
Slice data; src_reader.reset(new SequentialFileReader(std::move(src_file)));
buf.reset(new char[copy_file_buffer_size_]);
}
Slice data;
uint64_t processed_buffer_size = 0; uint64_t processed_buffer_size = 0;
do { do {
if (stop_backup_.load(std::memory_order_acquire)) { if (stop_backup_.load(std::memory_order_acquire)) {
return Status::Incomplete("Backup stopped"); return Status::Incomplete("Backup stopped");
} }
size_t buffer_to_read = (copy_file_buffer_size_ < size_limit) ? if (!src.empty()) {
copy_file_buffer_size_ : size_limit; size_t buffer_to_read = (copy_file_buffer_size_ < size_limit)
? copy_file_buffer_size_
: size_limit;
s = src_reader->Read(buffer_to_read, &data, buf.get()); s = src_reader->Read(buffer_to_read, &data, buf.get());
processed_buffer_size += buffer_to_read;
} else {
data = contents;
}
size_limit -= data.size(); size_limit -= data.size();
if (!s.ok()) { if (!s.ok()) {
@ -1135,38 +1194,38 @@ Status BackupEngineImpl::CopyFile(const std::string& src,
*size += data.size(); *size += data.size();
} }
if (checksum_value != nullptr) { if (checksum_value != nullptr) {
*checksum_value = crc32c::Extend(*checksum_value, data.data(), *checksum_value =
data.size()); crc32c::Extend(*checksum_value, data.data(), data.size());
} }
s = dest_writer->Append(data); s = dest_writer->Append(data);
if (rate_limiter != nullptr) { if (rate_limiter != nullptr) {
rate_limiter->Request(data.size(), Env::IO_LOW); rate_limiter->Request(data.size(), Env::IO_LOW);
} }
processed_buffer_size += buffer_to_read;
if (processed_buffer_size > options_.callback_trigger_interval_size) { if (processed_buffer_size > options_.callback_trigger_interval_size) {
processed_buffer_size -= options_.callback_trigger_interval_size; processed_buffer_size -= options_.callback_trigger_interval_size;
std::lock_guard<std::mutex> lock(byte_report_mutex_); std::lock_guard<std::mutex> lock(byte_report_mutex_);
progress_callback(); progress_callback();
} }
} while (s.ok() && data.size() > 0 && size_limit > 0); } while (s.ok() && contents.empty() && data.size() > 0 && size_limit > 0);
if (s.ok() && sync) { if (s.ok() && sync) {
s = dest_writer->Sync(false); s = dest_writer->Sync(false);
} }
return s; return s;
} }
// src_fname will always start with "/" // fname will always start with "/"
Status BackupEngineImpl::AddBackupFileWorkItem( Status BackupEngineImpl::AddBackupFileWorkItem(
std::unordered_set<std::string>& live_dst_paths, std::unordered_set<std::string>& live_dst_paths,
std::vector<BackupAfterCopyWorkItem>& backup_items_to_finish, std::vector<BackupAfterCopyOrCreateWorkItem>& backup_items_to_finish,
BackupID backup_id, bool shared, const std::string& src_dir, BackupID backup_id, bool shared, const std::string& src_dir,
const std::string& src_fname, RateLimiter* rate_limiter, const std::string& fname, RateLimiter* rate_limiter, uint64_t size_limit,
uint64_t size_limit, bool shared_checksum, bool shared_checksum, std::function<void()> progress_callback,
std::function<void()> progress_callback) { const std::string& contents) {
assert(src_fname.size() > 0 && src_fname[0] == '/'); assert(!fname.empty() && fname[0] == '/');
std::string dst_relative = src_fname.substr(1); assert(contents.empty() != src_dir.empty());
std::string dst_relative = fname.substr(1);
std::string dst_relative_tmp; std::string dst_relative_tmp;
Status s; Status s;
uint64_t size; uint64_t size;
@ -1174,12 +1233,10 @@ Status BackupEngineImpl::AddBackupFileWorkItem(
if (shared && shared_checksum) { if (shared && shared_checksum) {
// add checksum and file length to the file name // add checksum and file length to the file name
s = CalculateChecksum(src_dir + src_fname, s = CalculateChecksum(src_dir + fname, db_env_, size_limit,
db_env_,
size_limit,
&checksum_value); &checksum_value);
if (s.ok()) { if (s.ok()) {
s = db_env_->GetFileSize(src_dir + src_fname, &size); s = db_env_->GetFileSize(src_dir + fname, &size);
} }
if (!s.ok()) { if (!s.ok()) {
return s; return s;
@ -1218,12 +1275,14 @@ Status BackupEngineImpl::AddBackupFileWorkItem(
} }
} }
if (shared && (same_path || file_exists)) { if (!contents.empty()) {
need_to_copy = false;
} else if (shared && (same_path || file_exists)) {
need_to_copy = false; need_to_copy = false;
if (shared_checksum) { if (shared_checksum) {
Log(options_.info_log, Log(options_.info_log,
"%s already present, with checksum %u and size %" PRIu64, "%s already present, with checksum %u and size %" PRIu64,
src_fname.c_str(), checksum_value, size); fname.c_str(), checksum_value, size);
} else if (backuped_file_infos_.find(dst_relative) == } else if (backuped_file_infos_.find(dst_relative) ==
backuped_file_infos_.end() && !same_path) { backuped_file_infos_.end() && !same_path) {
// file already exists, but it's not referenced by any backup. overwrite // file already exists, but it's not referenced by any backup. overwrite
@ -1231,48 +1290,39 @@ Status BackupEngineImpl::AddBackupFileWorkItem(
Log(options_.info_log, Log(options_.info_log,
"%s already present, but not referenced by any backup. We will " "%s already present, but not referenced by any backup. We will "
"overwrite the file.", "overwrite the file.",
src_fname.c_str()); fname.c_str());
need_to_copy = true; need_to_copy = true;
backup_env_->DeleteFile(dst_path); backup_env_->DeleteFile(dst_path);
} else { } else {
// the file is present and referenced by a backup // the file is present and referenced by a backup
db_env_->GetFileSize(src_dir + src_fname, &size); // Ignore error db_env_->GetFileSize(src_dir + fname, &size); // Ignore error
Log(options_.info_log, "%s already present, calculate checksum", Log(options_.info_log, "%s already present, calculate checksum",
src_fname.c_str()); fname.c_str());
s = CalculateChecksum(src_dir + src_fname, db_env_, size_limit, s = CalculateChecksum(src_dir + fname, db_env_, size_limit,
&checksum_value); &checksum_value);
} }
} }
live_dst_paths.insert(dst_path); live_dst_paths.insert(dst_path);
if (need_to_copy) { if (!contents.empty() || need_to_copy) {
Log(options_.info_log, "Copying %s to %s", src_fname.c_str(), Log(options_.info_log, "Copying %s to %s", fname.c_str(),
dst_path_tmp.c_str()); dst_path_tmp.c_str());
CopyWorkItem copy_work_item(src_dir + src_fname, dst_path_tmp, db_env_, CopyOrCreateWorkItem copy_or_create_work_item(
backup_env_, options_.sync, rate_limiter, src_dir.empty() ? "" : src_dir + fname, dst_path_tmp, contents, db_env_,
size_limit, progress_callback); backup_env_, options_.sync, rate_limiter, size_limit,
BackupAfterCopyWorkItem after_copy_work_item( progress_callback);
copy_work_item.result.get_future(), BackupAfterCopyOrCreateWorkItem after_copy_or_create_work_item(
shared, copy_or_create_work_item.result.get_future(), shared, need_to_copy,
need_to_copy, backup_env_, dst_path_tmp, dst_path, dst_relative);
backup_env_, files_to_copy_or_create_.write(std::move(copy_or_create_work_item));
dst_path_tmp, backup_items_to_finish.push_back(std::move(after_copy_or_create_work_item));
dst_path,
dst_relative);
files_to_copy_.write(std::move(copy_work_item));
backup_items_to_finish.push_back(std::move(after_copy_work_item));
} else { } else {
std::promise<CopyResult> promise_result; std::promise<CopyOrCreateResult> promise_result;
BackupAfterCopyWorkItem after_copy_work_item( BackupAfterCopyOrCreateWorkItem after_copy_or_create_work_item(
promise_result.get_future(), promise_result.get_future(), shared, need_to_copy, backup_env_,
shared, dst_path_tmp, dst_path, dst_relative);
need_to_copy, backup_items_to_finish.push_back(std::move(after_copy_or_create_work_item));
backup_env_, CopyOrCreateResult result;
dst_path_tmp,
dst_path,
dst_relative);
backup_items_to_finish.push_back(std::move(after_copy_work_item));
CopyResult result;
result.status = s; result.status = s;
result.size = size; result.size = size;
result.checksum_value = checksum_value; result.checksum_value = checksum_value;

@ -24,6 +24,7 @@
#include "util/random.h" #include "util/random.h"
#include "util/mutexlock.h" #include "util/mutexlock.h"
#include "util/string_util.h" #include "util/string_util.h"
#include "util/sync_point.h"
#include "util/testutil.h" #include "util/testutil.h"
#include "util/mock_env.h" #include "util/mock_env.h"
#include "utilities/backupable/backupable_db_testutil.h" #include "utilities/backupable/backupable_db_testutil.h"
@ -1317,6 +1318,33 @@ TEST_F(BackupableDBTest, EnvFailures) {
} }
} }
// Verify manifest can roll while a backup is being created with the old
// manifest.
TEST_F(BackupableDBTest, ChangeManifestDuringBackupCreation) {
DestroyDB(dbname_, Options());
options_.max_manifest_file_size = 0; // always rollover manifest for file add
OpenDBAndBackupEngine(true);
FillDB(db_.get(), 0, 100);
rocksdb::SyncPoint::GetInstance()->LoadDependency({
{"BackupEngineImpl::CreateNewBackup:SavedLiveFiles1",
"VersionSet::LogAndApply:WriteManifest"},
{"VersionSet::LogAndApply:WriteManifestDone",
"BackupEngineImpl::CreateNewBackup:SavedLiveFiles2"},
});
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
std::thread flush_thread{[this]() { ASSERT_OK(db_->Flush(FlushOptions())); }};
ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), false));
flush_thread.join();
CloseDBAndBackupEngine();
DestroyDB(dbname_, Options());
AssertBackupConsistency(0, 0, 100);
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
}
// see https://github.com/facebook/rocksdb/issues/921 // see https://github.com/facebook/rocksdb/issues/921
TEST_F(BackupableDBTest, Issue921Test) { TEST_F(BackupableDBTest, Issue921Test) {
BackupEngine* backup_engine; BackupEngine* backup_engine;

Loading…
Cancel
Save