Make backup restore atomic, with sync option (#8568)

Summary:
Guarantees that if a restore is interrupted, DB::Open will fail. This works by
restoring CURRENT first to CURRENT.tmp then as a final step renaming to CURRENT.

Also makes restore respect BackupEngineOptions::sync (default true). When set,
the restore is guaranteed persisted by the time it returns OK. Also makes the above
atomicity guarantee work in case the interruption is power loss or OS crash (not just
process interruption or crash).

Fixes https://github.com/facebook/rocksdb/issues/8500

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8568

Test Plan:
added to backup mini-stress unit test. Passes with
gtest_repeat=100 (whereas fails 7 times without the CURRENT.tmp)

Reviewed By: akankshamahajan15

Differential Revision: D29812605

Pulled By: pdillinger

fbshipit-source-id: 24e9a993b305b1835ca95558fa7a7152e54cda8e
main
Peter Dillinger 3 years ago committed by Facebook GitHub Bot
parent 8ca081780b
commit a7fd1d0881
  1. 2
      HISTORY.md
  2. 9
      include/rocksdb/utilities/backup_engine.h
  3. 57
      utilities/backupable/backupable_db.cc
  4. 174
      utilities/backupable/backupable_db_test.cc

@ -2,6 +2,7 @@
## Unreleased ## Unreleased
### Bug Fixes ### Bug Fixes
* If the primary's CURRENT file is missing or inaccessible, the secondary instance should not hang repeatedly trying to switch to a new MANIFEST. It should instead return the error code encountered while accessing the file. * If the primary's CURRENT file is missing or inaccessible, the secondary instance should not hang repeatedly trying to switch to a new MANIFEST. It should instead return the error code encountered while accessing the file.
* Restoring backups with BackupEngine is now a logically atomic operation, so that if a restore operation is interrupted, DB::Open on it will fail. Using BackupEngineOptions::sync (default) ensures atomicity even in case of power loss or OS crash.
* Fixed a race related to the destruction of `ColumnFamilyData` objects. The earlier logic unlocked the DB mutex before destroying the thread-local `SuperVersion` pointers, which could result in a process crash if another thread managed to get a reference to the `ColumnFamilyData` object. * Fixed a race related to the destruction of `ColumnFamilyData` objects. The earlier logic unlocked the DB mutex before destroying the thread-local `SuperVersion` pointers, which could result in a process crash if another thread managed to get a reference to the `ColumnFamilyData` object.
* Removed a call to `RenameFile()` on a non-existent info log file ("LOG") when opening a new DB. Such a call was guaranteed to fail though did not impact applications since we swallowed the error. Now we also stopped swallowing errors in renaming "LOG" file. * Removed a call to `RenameFile()` on a non-existent info log file ("LOG") when opening a new DB. Such a call was guaranteed to fail though did not impact applications since we swallowed the error. Now we also stopped swallowing errors in renaming "LOG" file.
* Fixed an issue where `OnFlushCompleted` was not called for atomic flush. * Fixed an issue where `OnFlushCompleted` was not called for atomic flush.
@ -17,6 +18,7 @@
### Behavior Changes ### Behavior Changes
* `StringAppendOperator` additionally accepts a string as the delimiter. * `StringAppendOperator` additionally accepts a string as the delimiter.
* BackupEngineOptions::sync (default true) now applies to restoring backups in addition to creating backups. This could slow down restores, but ensures they are fully persisted before returning OK. (Consider increasing max_background_operations to improve performance.)
## 6.23.0 (2021-07-16) ## 6.23.0 (2021-07-16)
### Behavior Changes ### Behavior Changes

@ -56,10 +56,11 @@ struct BackupEngineOptions {
// Default: nullptr // Default: nullptr
Logger* info_log; Logger* info_log;
// If sync == true, we can guarantee you'll get consistent backup even // If sync == true, we can guarantee you'll get consistent backup and
// on a machine crash/reboot. Backup process is slower with sync enabled. // restore even on a machine crash/reboot. Backup and restore processes are
// If sync == false, we don't guarantee anything on machine reboot. However, // slower with sync enabled. If sync == false, we can only guarantee that
// chances are some of the backups are consistent. // other previously synced backups and restores are not modified while
// creating a new one.
// Default: true // Default: true
bool sync; bool sync;

@ -1707,6 +1707,11 @@ Status BackupEngineImpl::RestoreDBFromBackup(const RestoreOptions& options,
} }
Status s; Status s;
std::vector<RestoreAfterCopyOrCreateWorkItem> restore_items_to_finish; std::vector<RestoreAfterCopyOrCreateWorkItem> restore_items_to_finish;
std::string temporary_current_file;
std::string final_current_file;
std::unique_ptr<Directory> db_dir_for_fsync;
std::unique_ptr<Directory> wal_dir_for_fsync;
for (const auto& file_info : backup->GetFiles()) { for (const auto& file_info : backup->GetFiles()) {
const std::string& file = file_info->filename; const std::string& file = file_info->filename;
// 1. get DB filename // 1. get DB filename
@ -1722,13 +1727,36 @@ Status BackupEngineImpl::RestoreDBFromBackup(const RestoreOptions& options,
} }
// 3. Construct the final path // 3. Construct the final path
// kWalFile lives in wal_dir and all the rest live in db_dir // kWalFile lives in wal_dir and all the rest live in db_dir
dst = ((type == kWalFile) ? wal_dir : db_dir) + "/" + dst; if (type == kWalFile) {
dst = wal_dir + "/" + dst;
if (options_.sync && !wal_dir_for_fsync) {
s = db_env_->NewDirectory(wal_dir, &wal_dir_for_fsync);
if (!s.ok()) {
return s;
}
}
} else {
dst = db_dir + "/" + dst;
if (options_.sync && !db_dir_for_fsync) {
s = db_env_->NewDirectory(db_dir, &db_dir_for_fsync);
if (!s.ok()) {
return s;
}
}
}
// For atomicity, initially restore CURRENT file to a temporary name.
// This is useful even without options_.sync e.g. in case the restore
// process is interrupted.
if (type == kCurrentFile) {
final_current_file = dst;
dst = temporary_current_file = dst + ".tmp";
}
ROCKS_LOG_INFO(options_.info_log, "Restoring %s to %s\n", file.c_str(), ROCKS_LOG_INFO(options_.info_log, "Restoring %s to %s\n", file.c_str(),
dst.c_str()); dst.c_str());
CopyOrCreateWorkItem copy_or_create_work_item( CopyOrCreateWorkItem copy_or_create_work_item(
GetAbsolutePath(file), dst, "" /* contents */, backup_env_, db_env_, GetAbsolutePath(file), dst, "" /* contents */, backup_env_, db_env_,
EnvOptions() /* src_env_options */, false, rate_limiter, EnvOptions() /* src_env_options */, options_.sync, rate_limiter,
0 /* size_limit */); 0 /* size_limit */);
RestoreAfterCopyOrCreateWorkItem after_copy_or_create_work_item( RestoreAfterCopyOrCreateWorkItem after_copy_or_create_work_item(
copy_or_create_work_item.result.get_future(), file, dst, copy_or_create_work_item.result.get_future(), file, dst,
@ -1757,6 +1785,31 @@ Status BackupEngineImpl::RestoreDBFromBackup(const RestoreOptions& options,
} }
} }
// When enabled, the first Fsync is to ensure all files are fully persisted
// before renaming CURRENT.tmp
if (s.ok() && db_dir_for_fsync) {
ROCKS_LOG_INFO(options_.info_log, "Restore: fsync\n");
s = db_dir_for_fsync->Fsync();
}
if (s.ok() && wal_dir_for_fsync) {
s = wal_dir_for_fsync->Fsync();
}
if (s.ok() && !temporary_current_file.empty()) {
ROCKS_LOG_INFO(options_.info_log, "Restore: atomic rename CURRENT.tmp\n");
assert(!final_current_file.empty());
s = db_env_->RenameFile(temporary_current_file, final_current_file);
}
if (s.ok() && db_dir_for_fsync && !temporary_current_file.empty()) {
// Second Fsync is to ensure the final atomic rename of DB restore is
// fully persisted even if power goes out right after restore operation
// returns success
assert(db_dir_for_fsync);
s = db_dir_for_fsync->Fsync();
}
ROCKS_LOG_INFO(options_.info_log, "Restoring done -- %s\n", ROCKS_LOG_INFO(options_.info_log, "Restoring done -- %s\n",
s.ToString().c_str()); s.ToString().c_str());
return s; return s;

@ -642,9 +642,22 @@ class BackupEngineTest : public testing::Test {
CreateLoggerFromOptions(dbname_, logger_options, &logger_) CreateLoggerFromOptions(dbname_, logger_options, &logger_)
.PermitUncheckedError(); .PermitUncheckedError();
// The sync option is not easily testable in unit tests, but should be
// smoke tested across all the other backup tests. However, it is
// certainly not worth doubling the runtime of backup tests for it.
// Thus, we can enable sync for one of our alternate testing
// configurations.
constexpr bool kUseSync =
#ifdef ROCKSDB_MODIFY_NPHASH
true;
#else
false;
#endif // ROCKSDB_MODIFY_NPHASH
// set up backup db options // set up backup db options
backupable_options_.reset(new BackupableDBOptions( backupable_options_.reset(new BackupableDBOptions(
backupdir_, test_backup_env_.get(), true, logger_.get(), true)); backupdir_, test_backup_env_.get(), /*share_table_files*/ true,
logger_.get(), kUseSync));
// most tests will use multi-threaded backups // most tests will use multi-threaded backups
backupable_options_->max_background_operations = 7; backupable_options_->max_background_operations = 7;
@ -3122,82 +3135,108 @@ TEST_F(BackupEngineTest, Concurrency) {
Options db_opts = options_; Options db_opts = options_;
db_opts.wal_dir = ""; db_opts.wal_dir = "";
db_opts.create_if_missing = false;
BackupableDBOptions be_opts = *backupable_options_; BackupableDBOptions be_opts = *backupable_options_;
be_opts.destroy_old_data = false; be_opts.destroy_old_data = false;
std::mt19937 rng{std::random_device()()}; std::mt19937 rng{std::random_device()()};
std::array<std::thread, 4> read_threads; std::array<std::thread, 4> read_threads;
std::array<std::thread, 4> restore_verify_threads;
for (uint32_t i = 0; i < read_threads.size(); ++i) { for (uint32_t i = 0; i < read_threads.size(); ++i) {
uint32_t sleep_micros = rng() % 100000; uint32_t sleep_micros = rng() % 100000;
read_threads[i] = std::thread([this, i, sleep_micros, &db_opts, &be_opts] { read_threads[i] = std::thread(
test_db_env_->SleepForMicroseconds(sleep_micros); [this, i, sleep_micros, &db_opts, &be_opts, &restore_verify_threads] {
test_db_env_->SleepForMicroseconds(sleep_micros);
// Whether to also re-open the BackupEngine, potentially seeing
// additional backups
bool reopen = i == 3;
// Whether we are going to restore "latest"
bool latest = i > 1;
BackupEngine* my_be;
if (reopen) {
ASSERT_OK(BackupEngine::Open(test_db_env_.get(), be_opts, &my_be));
} else {
my_be = backup_engine_.get();
}
// Whether to also re-open the BackupEngine, potentially seeing // Verify metadata (we don't receive updates from concurrently
// additional backups // creating a new backup)
bool reopen = i == 3; std::vector<BackupInfo> infos;
// Whether we are going to restore "latest" my_be->GetBackupInfo(&infos);
bool latest = i > 1; const uint32_t count = static_cast<uint32_t>(infos.size());
infos.clear();
if (reopen) {
ASSERT_GE(count, 2U);
ASSERT_LE(count, 4U);
fprintf(stderr, "Reopen saw %u backups\n", count);
} else {
ASSERT_EQ(count, 2U);
}
std::vector<BackupID> ids;
my_be->GetCorruptedBackups(&ids);
ASSERT_EQ(ids.size(), 0U);
// (Eventually, see below) Restore one of the backups, or "latest"
std::string restore_db_dir = dbname_ + "/restore" + ToString(i);
DestroyDir(test_db_env_.get(), restore_db_dir).PermitUncheckedError();
BackupID to_restore;
if (latest) {
to_restore = count;
} else {
to_restore = i + 1;
}
BackupEngine* my_be; // Open restored DB to verify its contents, but test atomic restore
if (reopen) { // by doing it async and ensuring we either get OK or InvalidArgument
ASSERT_OK(BackupEngine::Open(test_db_env_.get(), be_opts, &my_be)); restore_verify_threads[i] =
} else { std::thread([this, &db_opts, restore_db_dir, to_restore] {
my_be = backup_engine_.get(); DB* restored;
} Status s;
for (;;) {
s = DB::Open(db_opts, restore_db_dir, &restored);
if (s.IsInvalidArgument()) {
// Restore hasn't finished
test_db_env_->SleepForMicroseconds(1000);
continue;
} else {
// We should only get InvalidArgument if restore is
// incomplete, or OK if complete
ASSERT_OK(s);
break;
}
}
int factor = std::min(static_cast<int>(to_restore), max_factor);
AssertExists(restored, 0, factor * keys_iteration);
AssertEmpty(restored, factor * keys_iteration,
(factor + 1) * keys_iteration);
delete restored;
});
// (Ok now) Restore one of the backups, or "latest"
if (latest) {
ASSERT_OK(my_be->RestoreDBFromLatestBackup(restore_db_dir,
restore_db_dir));
} else {
ASSERT_OK(my_be->VerifyBackup(to_restore, true));
ASSERT_OK(my_be->RestoreDBFromBackup(to_restore, restore_db_dir,
restore_db_dir));
}
// Verify metadata (we don't receive updates from concurrently // Re-verify metadata (we don't receive updates from concurrently
// creating a new backup) // creating a new backup)
std::vector<BackupInfo> infos; my_be->GetBackupInfo(&infos);
my_be->GetBackupInfo(&infos); ASSERT_EQ(infos.size(), count);
const uint32_t count = static_cast<uint32_t>(infos.size()); my_be->GetCorruptedBackups(&ids);
infos.clear(); ASSERT_EQ(ids.size(), 0);
if (reopen) { // fprintf(stderr, "Finished read thread\n");
ASSERT_GE(count, 2U);
ASSERT_LE(count, 4U);
fprintf(stderr, "Reopen saw %u backups\n", count);
} else {
ASSERT_EQ(count, 2U);
}
std::vector<BackupID> ids;
my_be->GetCorruptedBackups(&ids);
ASSERT_EQ(ids.size(), 0U);
// Restore one of the backups, or "latest"
std::string restore_db_dir = dbname_ + "/restore" + ToString(i);
BackupID to_restore;
if (latest) {
to_restore = count;
ASSERT_OK(
my_be->RestoreDBFromLatestBackup(restore_db_dir, restore_db_dir));
} else {
to_restore = i + 1;
ASSERT_OK(my_be->VerifyBackup(to_restore, true));
ASSERT_OK(my_be->RestoreDBFromBackup(to_restore, restore_db_dir,
restore_db_dir));
}
// Open restored DB to verify its contents if (reopen) {
DB* restored; delete my_be;
ASSERT_OK(DB::Open(db_opts, restore_db_dir, &restored)); }
int factor = std::min(static_cast<int>(to_restore), max_factor); });
AssertExists(restored, 0, factor * keys_iteration);
AssertEmpty(restored, factor * keys_iteration,
(factor + 1) * keys_iteration);
delete restored;
// Re-verify metadata (we don't receive updates from concurrently
// creating a new backup)
my_be->GetBackupInfo(&infos);
ASSERT_EQ(infos.size(), count);
my_be->GetCorruptedBackups(&ids);
ASSERT_EQ(ids.size(), 0);
// fprintf(stderr, "Finished read thread\n");
if (reopen) {
delete my_be;
}
});
} }
BackupEngine* alt_be; BackupEngine* alt_be;
@ -3229,6 +3268,11 @@ TEST_F(BackupEngineTest, Concurrency) {
} }
delete alt_be; delete alt_be;
for (auto& t : restore_verify_threads) {
t.join();
}
CloseDBAndBackupEngine(); CloseDBAndBackupEngine();
} }

Loading…
Cancel
Save