Reunite checkpoint and backup core logic

Summary:
These code paths forked when checkpoint was introduced by copy/pasting the core backup logic. Over time they diverged and bug fixes were sometimes applied to one but not the other (like the fix to include all relevant WALs for 2PC), or it required extra effort to fix both (like the fix to forge the CURRENT file). This diff reunites the code paths by extracting the core logic into a function, CreateCustomCheckpoint(), that is customizable via callbacks to implement both checkpoint and backup.

Related changes:

- flush_before_backup is now forcibly enabled when 2PC is enabled
- Extracted CheckpointImpl class definition into a header file. This is so the function, CreateCustomCheckpoint(), can be called by internal rocksdb code but not exposed to users.
- Implemented more functions in DummyDB/DummyLogFile (in backupable_db_test.cc) that are used by CreateCustomCheckpoint().
Closes https://github.com/facebook/rocksdb/pull/1932

Differential Revision: D4622986

Pulled By: ajkr

fbshipit-source-id: 157723884236ee3999a682673b64f7457a7a0d87
main
Andrew Kryczka 8 years ago committed by Facebook Github Bot
parent 72c21fb3f2
commit e5e545a021
  1. 2
      CMakeLists.txt
  2. 2
      TARGETS
  3. 2
      include/rocksdb/utilities/backupable_db.h
  4. 2
      src.mk
  5. 142
      utilities/backupable/backupable_db.cc
  6. 35
      utilities/backupable/backupable_db_test.cc
  7. 260
      utilities/checkpoint/checkpoint_impl.cc
  8. 55
      utilities/checkpoint/checkpoint_impl.h

@ -446,7 +446,7 @@ set(SOURCES
util/xxhash.cc util/xxhash.cc
utilities/backupable/backupable_db.cc utilities/backupable/backupable_db.cc
utilities/blob_db/blob_db.cc utilities/blob_db/blob_db.cc
utilities/checkpoint/checkpoint.cc utilities/checkpoint/checkpoint_impl.cc
utilities/col_buf_decoder.cc utilities/col_buf_decoder.cc
utilities/col_buf_encoder.cc utilities/col_buf_encoder.cc
utilities/column_aware_encoding_util.cc utilities/column_aware_encoding_util.cc

@ -195,7 +195,7 @@ cpp_library(
"util/xxhash.cc", "util/xxhash.cc",
"utilities/backupable/backupable_db.cc", "utilities/backupable/backupable_db.cc",
"utilities/blob_db/blob_db.cc", "utilities/blob_db/blob_db.cc",
"utilities/checkpoint/checkpoint.cc", "utilities/checkpoint/checkpoint_impl.cc",
"utilities/compaction_filters/remove_emptyvalue_compactionfilter.cc", "utilities/compaction_filters/remove_emptyvalue_compactionfilter.cc",
"utilities/convenience/info_log_finder.cc", "utilities/convenience/info_log_finder.cc",
"utilities/date_tiered/date_tiered_db_impl.cc", "utilities/date_tiered/date_tiered_db_impl.cc",

@ -256,12 +256,14 @@ class BackupEngine {
BackupEngine** backup_engine_ptr); BackupEngine** backup_engine_ptr);
// same as CreateNewBackup, but stores extra application metadata // same as CreateNewBackup, but stores extra application metadata
// Flush will always trigger if 2PC is enabled.
virtual Status CreateNewBackupWithMetadata( virtual Status CreateNewBackupWithMetadata(
DB* db, const std::string& app_metadata, bool flush_before_backup = false, DB* db, const std::string& app_metadata, bool flush_before_backup = false,
std::function<void()> progress_callback = []() {}) = 0; std::function<void()> progress_callback = []() {}) = 0;
// Captures the state of the database in the latest backup // Captures the state of the database in the latest backup
// NOT a thread safe call // NOT a thread safe call
// Flush will always trigger if 2PC is enabled.
virtual Status CreateNewBackup(DB* db, bool flush_before_backup = false, virtual Status CreateNewBackup(DB* db, bool flush_before_backup = false,
std::function<void()> progress_callback = std::function<void()> progress_callback =
[]() {}) { []() {}) {

@ -150,7 +150,7 @@ LIB_SOURCES = \
util/xxhash.cc \ util/xxhash.cc \
utilities/backupable/backupable_db.cc \ utilities/backupable/backupable_db.cc \
utilities/blob_db/blob_db.cc \ utilities/blob_db/blob_db.cc \
utilities/checkpoint/checkpoint.cc \ utilities/checkpoint/checkpoint_impl.cc \
utilities/compaction_filters/remove_emptyvalue_compactionfilter.cc \ utilities/compaction_filters/remove_emptyvalue_compactionfilter.cc \
utilities/convenience/info_log_finder.cc \ utilities/convenience/info_log_finder.cc \
utilities/date_tiered/date_tiered_db_impl.cc \ utilities/date_tiered/date_tiered_db_impl.cc \

@ -21,6 +21,7 @@
#include "util/logging.h" #include "util/logging.h"
#include "util/string_util.h" #include "util/string_util.h"
#include "util/sync_point.h" #include "util/sync_point.h"
#include "utilities/checkpoint/checkpoint_impl.h"
#ifndef __STDC_FORMAT_MACROS #ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS #define __STDC_FORMAT_MACROS
@ -702,28 +703,6 @@ Status BackupEngineImpl::CreateNewBackupWithMetadata(
if (app_metadata.size() > kMaxAppMetaSize) { if (app_metadata.size() > kMaxAppMetaSize) {
return Status::InvalidArgument("App metadata too large"); return Status::InvalidArgument("App metadata too large");
} }
Status s;
std::vector<std::string> live_files;
VectorLogPtr live_wal_files;
uint64_t manifest_file_size = 0;
uint64_t sequence_number = db->GetLatestSequenceNumber();
s = db->DisableFileDeletions();
if (s.ok()) {
// this will return live_files prefixed with "/"
s = db->GetLiveFiles(live_files, &manifest_file_size, flush_before_backup);
}
// if we didn't flush before backup, we need to also get WAL files
if (s.ok() && !flush_before_backup && options_.backup_log_files) {
// returns file names prefixed with "/"
s = db->GetSortedWalFiles(live_wal_files);
}
if (!s.ok()) {
db->EnableFileDeletions(false);
return s;
}
TEST_SYNC_POINT("BackupEngineImpl::CreateNewBackup:SavedLiveFiles1");
TEST_SYNC_POINT("BackupEngineImpl::CreateNewBackup:SavedLiveFiles2");
BackupID new_backup_id = latest_backup_id_ + 1; BackupID new_backup_id = latest_backup_id_ + 1;
@ -735,7 +714,6 @@ Status BackupEngineImpl::CreateNewBackupWithMetadata(
assert(ret.second == true); assert(ret.second == true);
auto& new_backup = ret.first->second; auto& new_backup = ret.first->second;
new_backup->RecordTimestamp(); new_backup->RecordTimestamp();
new_backup->SetSequenceNumber(sequence_number);
new_backup->SetAppMetadata(app_metadata); new_backup->SetAppMetadata(app_metadata);
auto start_backup = backup_env_-> NowMicros(); auto start_backup = backup_env_-> NowMicros();
@ -745,7 +723,7 @@ Status BackupEngineImpl::CreateNewBackupWithMetadata(
new_backup_id); new_backup_id);
auto private_tmp_dir = GetAbsolutePath(GetPrivateFileRel(new_backup_id, true)); auto private_tmp_dir = GetAbsolutePath(GetPrivateFileRel(new_backup_id, true));
s = backup_env_->FileExists(private_tmp_dir); Status s = backup_env_->FileExists(private_tmp_dir);
if (s.ok()) { if (s.ok()) {
// maybe last backup failed and left partial state behind, clean it up // maybe last backup failed and left partial state behind, clean it up
s = GarbageCollect(); s = GarbageCollect();
@ -767,81 +745,53 @@ Status BackupEngineImpl::CreateNewBackupWithMetadata(
// This is used to check whether a live files shares a dst_path with another // This is used to check whether a live files shares a dst_path with another
// live file. // live file.
std::unordered_set<std::string> live_dst_paths; std::unordered_set<std::string> live_dst_paths;
live_dst_paths.reserve(live_files.size() + live_wal_files.size());
// Pre-fetch sizes for data files
std::unordered_map<std::string, uint64_t> data_path_to_size;
if (s.ok()) {
s = InsertPathnameToSizeBytes(db->GetName(), db_env_, &data_path_to_size);
}
std::vector<BackupAfterCopyOrCreateWorkItem> backup_items_to_finish; std::vector<BackupAfterCopyOrCreateWorkItem> backup_items_to_finish;
// Add a CopyOrCreateWorkItem to the channel for each live file // Add a CopyOrCreateWorkItem to the channel for each live file
std::string manifest_fname, current_fname; db->DisableFileDeletions();
for (size_t i = 0; s.ok() && i < live_files.size(); ++i) { if (s.ok()) {
uint64_t number; CheckpointImpl checkpoint(db);
FileType type; uint64_t sequence_number = 0;
bool ok = ParseFileName(live_files[i], &number, &type); s = checkpoint.CreateCustomCheckpoint(
if (!ok) { db->GetDBOptions(),
assert(false); [&](const std::string& src_dirname, const std::string& fname,
return Status::Corruption("Can't parse file name. This is very bad"); FileType) {
} // custom checkpoint will switch to calling copy_file_cb after it sees
// we should only get sst, manifest and current files here // NotSupported returned from link_file_cb.
assert(type == kTableFile || type == kDescriptorFile || return Status::NotSupported();
type == kCurrentFile || type == kOptionsFile); } /* link_file_cb */,
[&](const std::string& src_dirname, const std::string& fname,
if (type == kCurrentFile) { uint64_t size_limit_bytes, FileType type) {
// We will craft the current file manually to ensure it's consistent with if (type == kLogFile && !options_.backup_log_files) {
// the manifest number. This is necessary because current's file contents return Status::OK();
// can change during backup. }
current_fname = live_files[i]; Log(options_.info_log, "add file for backup %s", fname.c_str());
continue; uint64_t size_bytes = 0;
} else if (type == kDescriptorFile) { Status st;
manifest_fname = live_files[i]; if (type == kTableFile) {
} st = db_env_->GetFileSize(src_dirname + fname, &size_bytes);
}
auto data_path_to_size_iter = if (st.ok()) {
data_path_to_size.find(db->GetName() + live_files[i]); st = AddBackupFileWorkItem(
uint64_t size_bytes = data_path_to_size_iter == data_path_to_size.end() live_dst_paths, backup_items_to_finish, new_backup_id,
? port::kMaxUint64 options_.share_table_files && type == kTableFile, src_dirname,
: data_path_to_size_iter->second; fname, rate_limiter, size_bytes, size_limit_bytes,
options_.share_files_with_checksum && type == kTableFile,
// rules: progress_callback);
// * if it's kTableFile, then it's shared }
// * if it's kDescriptorFile, limit the size to manifest_file_size return st;
s = AddBackupFileWorkItem( } /* copy_file_cb */,
live_dst_paths, backup_items_to_finish, new_backup_id, [&](const std::string& fname, const std::string& contents, FileType) {
options_.share_table_files && type == kTableFile, db->GetName(), Log(options_.info_log, "add file for backup %s", fname.c_str());
live_files[i], rate_limiter, size_bytes, return AddBackupFileWorkItem(
(type == kDescriptorFile) ? manifest_file_size : 0, live_dst_paths, backup_items_to_finish, new_backup_id,
options_.share_files_with_checksum && type == kTableFile, false /* shared */, "" /* src_dir */, fname, rate_limiter,
progress_callback); contents.size(), 0 /* size_limit */, false /* shared_checksum */,
} progress_callback, contents);
if (s.ok() && !current_fname.empty() && !manifest_fname.empty()) { } /* create_file_cb */,
// Write the current file with the manifest filename as its contents. &sequence_number, flush_before_backup ? 0 : port::kMaxUint64);
s = AddBackupFileWorkItem( if (s.ok()) {
live_dst_paths, backup_items_to_finish, new_backup_id, new_backup->SetSequenceNumber(sequence_number);
false /* shared */, "" /* src_dir */, CurrentFileName(""), rate_limiter,
manifest_fname.size(), 0 /* size_limit */, false /* shared_checksum */,
progress_callback, manifest_fname.substr(1) + "\n");
}
ROCKS_LOG_INFO(options_.info_log,
"begin add wal files for backup -- %" ROCKSDB_PRIszt,
live_wal_files.size());
// Add a CopyOrCreateWorkItem to the channel for each WAL file
for (size_t i = 0; s.ok() && i < live_wal_files.size(); ++i) {
uint64_t size_bytes = live_wal_files[i]->SizeFileBytes();
if (live_wal_files[i]->Type() == kAliveLogFile) {
ROCKS_LOG_INFO(options_.info_log,
"add wal file for backup %s -- %" PRIu64,
live_wal_files[i]->PathName().c_str(), size_bytes);
// we only care about live log files
// copy the file into backup_dir/files/<new backup>/
s = AddBackupFileWorkItem(live_dst_paths, backup_items_to_finish,
new_backup_id, false, /* not shared */
db->GetOptions().wal_dir,
live_wal_files[i]->PathName(), rate_limiter,
size_bytes, size_bytes);
} }
} }
ROCKS_LOG_INFO(options_.info_log, "add files for backup done, wait finish."); ROCKS_LOG_INFO(options_.info_log, "add files for backup done, wait finish.");

@ -61,6 +61,10 @@ class DummyDB : public StackableDB {
return options_; return options_;
} }
virtual DBOptions GetDBOptions() const override {
return DBOptions(options_);
}
virtual Status EnableFileDeletions(bool force) override { virtual Status EnableFileDeletions(bool force) override {
EXPECT_TRUE(!deletions_enabled_); EXPECT_TRUE(!deletions_enabled_);
deletions_enabled_ = true; deletions_enabled_ = true;
@ -106,9 +110,9 @@ class DummyDB : public StackableDB {
} }
virtual SequenceNumber StartSequence() const override { virtual SequenceNumber StartSequence() const override {
// backupabledb should not need this method // this seqnum guarantees the dummy file will be included in the backup
EXPECT_TRUE(false); // as long as it is alive.
return 0; return kMaxSequenceNumber;
} }
virtual uint64_t SizeFileBytes() const override { virtual uint64_t SizeFileBytes() const override {
@ -204,6 +208,14 @@ class TestEnv : public EnvWrapper {
return EnvWrapper::DeleteFile(fname); return EnvWrapper::DeleteFile(fname);
} }
virtual Status DeleteDir(const std::string& dirname) override {
MutexLock l(&mutex_);
if (fail_delete_files_) {
return Status::IOError();
}
return EnvWrapper::DeleteDir(dirname);
}
void AssertWrittenFiles(std::vector<std::string>& should_have_written) { void AssertWrittenFiles(std::vector<std::string>& should_have_written) {
MutexLock l(&mutex_); MutexLock l(&mutex_);
std::sort(should_have_written.begin(), should_have_written.end()); std::sort(should_have_written.begin(), should_have_written.end());
@ -266,6 +278,19 @@ class TestEnv : public EnvWrapper {
} }
return EnvWrapper::GetChildrenFileAttributes(dir, r); return EnvWrapper::GetChildrenFileAttributes(dir, r);
} }
Status GetFileSize(const std::string& path, uint64_t* size_bytes) override {
if (filenames_for_mocked_attrs_.size() > 0) {
auto fname = path.substr(path.find_last_of('/'));
auto filename_iter = std::find(filenames_for_mocked_attrs_.begin(),
filenames_for_mocked_attrs_.end(), fname);
if (filename_iter != filenames_for_mocked_attrs_.end()) {
*size_bytes = 10;
return Status::OK();
}
return Status::NotFound(fname);
}
return EnvWrapper::GetFileSize(path, size_bytes);
}
void SetCreateDirIfMissingFailure(bool fail) { void SetCreateDirIfMissingFailure(bool fail) {
create_dir_if_missing_failure_ = fail; create_dir_if_missing_failure_ = fail;
@ -1374,10 +1399,10 @@ TEST_F(BackupableDBTest, ChangeManifestDuringBackupCreation) {
FillDB(db_.get(), 0, 100); FillDB(db_.get(), 0, 100);
rocksdb::SyncPoint::GetInstance()->LoadDependency({ rocksdb::SyncPoint::GetInstance()->LoadDependency({
{"BackupEngineImpl::CreateNewBackup:SavedLiveFiles1", {"CheckpointImpl::CreateCheckpoint:SavedLiveFiles1",
"VersionSet::LogAndApply:WriteManifest"}, "VersionSet::LogAndApply:WriteManifest"},
{"VersionSet::LogAndApply:WriteManifestDone", {"VersionSet::LogAndApply:WriteManifestDone",
"BackupEngineImpl::CreateNewBackup:SavedLiveFiles2"}, "CheckpointImpl::CreateCheckpoint:SavedLiveFiles2"},
}); });
rocksdb::SyncPoint::GetInstance()->EnableProcessing(); rocksdb::SyncPoint::GetInstance()->EnableProcessing();

@ -9,7 +9,7 @@
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
#include "rocksdb/utilities/checkpoint.h" #include "utilities/checkpoint/checkpoint_impl.h"
#ifndef __STDC_FORMAT_MACROS #ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS #define __STDC_FORMAT_MACROS
@ -18,37 +18,20 @@
#include <inttypes.h> #include <inttypes.h>
#include <algorithm> #include <algorithm>
#include <string> #include <string>
#include <vector>
#include "db/wal_manager.h" #include "db/wal_manager.h"
#include "port/port.h" #include "port/port.h"
#include "rocksdb/db.h" #include "rocksdb/db.h"
#include "rocksdb/env.h" #include "rocksdb/env.h"
#include "rocksdb/transaction_log.h" #include "rocksdb/transaction_log.h"
#include "rocksdb/utilities/checkpoint.h"
#include "util/file_util.h" #include "util/file_util.h"
#include "util/filename.h" #include "util/filename.h"
#include "util/sync_point.h" #include "util/sync_point.h"
namespace rocksdb { namespace rocksdb {
class CheckpointImpl : public Checkpoint {
public:
// Creates a Checkpoint object to be used for creating openable snapshots
explicit CheckpointImpl(DB* db) : db_(db) {}
// Builds an openable snapshot of RocksDB on the same disk, which
// accepts an output directory on the same disk, and under the directory
// (1) hard-linked SST files pointing to existing live SST files
// SST files will be copied if output directory is on a different filesystem
// (2) a copied manifest files and other files
// The directory should not already exist and will be created by this API.
// The directory will be an absolute path
using Checkpoint::CreateCheckpoint;
virtual Status CreateCheckpoint(const std::string& checkpoint_dir,
uint64_t log_size_for_flush) override;
private:
DB* db_;
};
Status Checkpoint::Create(DB* db, Checkpoint** checkpoint_ptr) { Status Checkpoint::Create(DB* db, Checkpoint** checkpoint_ptr) {
*checkpoint_ptr = new CheckpointImpl(db); *checkpoint_ptr = new CheckpointImpl(db);
return Status::OK(); return Status::OK();
@ -62,16 +45,9 @@ Status Checkpoint::CreateCheckpoint(const std::string& checkpoint_dir,
// Builds an openable snapshot of RocksDB // Builds an openable snapshot of RocksDB
Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir, Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir,
uint64_t log_size_for_flush) { uint64_t log_size_for_flush) {
Status s;
std::vector<std::string> live_files;
uint64_t manifest_file_size = 0;
DBOptions db_options = db_->GetDBOptions(); DBOptions db_options = db_->GetDBOptions();
uint64_t min_log_num = port::kMaxUint64;
uint64_t sequence_number = db_->GetLatestSequenceNumber();
bool same_fs = true;
VectorLogPtr live_wal_files;
s = db_->GetEnv()->FileExists(checkpoint_dir); Status s = db_->GetEnv()->FileExists(checkpoint_dir);
if (s.ok()) { if (s.ok()) {
return Status::InvalidArgument("Directory exists"); return Status::InvalidArgument("Directory exists");
} else if (!s.IsNotFound()) { } else if (!s.IsNotFound()) {
@ -79,29 +55,124 @@ Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir,
return s; return s;
} }
s = db_->DisableFileDeletions(); ROCKS_LOG_INFO(
db_options.info_log,
"Started the snapshot process -- creating snapshot in directory %s",
checkpoint_dir.c_str());
std::string full_private_path = checkpoint_dir + ".tmp";
// create snapshot directory
s = db_->GetEnv()->CreateDir(full_private_path);
uint64_t sequence_number = 0;
if (s.ok()) {
db_->DisableFileDeletions();
s = CreateCustomCheckpoint(
db_options,
[&](const std::string& src_dirname, const std::string& fname,
FileType) {
ROCKS_LOG_INFO(db_options.info_log, "Hard Linking %s", fname.c_str());
return db_->GetEnv()->LinkFile(src_dirname + fname,
full_private_path + fname);
} /* link_file_cb */,
[&](const std::string& src_dirname, const std::string& fname,
uint64_t size_limit_bytes, FileType) {
ROCKS_LOG_INFO(db_options.info_log, "Copying %s", fname.c_str());
return CopyFile(db_->GetEnv(), src_dirname + fname,
full_private_path + fname, size_limit_bytes,
db_options.use_fsync);
} /* copy_file_cb */,
[&](const std::string& fname, const std::string& contents, FileType) {
ROCKS_LOG_INFO(db_options.info_log, "Creating %s", fname.c_str());
return CreateFile(db_->GetEnv(), full_private_path + fname, contents);
} /* create_file_cb */,
&sequence_number, log_size_for_flush);
// we copied all the files, enable file deletions
db_->EnableFileDeletions(false);
}
if (s.ok()) {
// move tmp private backup to real snapshot directory
s = db_->GetEnv()->RenameFile(full_private_path, checkpoint_dir);
}
if (s.ok()) {
unique_ptr<Directory> checkpoint_directory;
db_->GetEnv()->NewDirectory(checkpoint_dir, &checkpoint_directory);
if (checkpoint_directory != nullptr) {
s = checkpoint_directory->Fsync();
}
}
if (s.ok()) {
// here we know that we succeeded and installed the new snapshot
ROCKS_LOG_INFO(db_options.info_log, "Snapshot DONE. All is good");
ROCKS_LOG_INFO(db_options.info_log, "Snapshot sequence number: %" PRIu64,
sequence_number);
} else {
// clean all the files we might have created
ROCKS_LOG_INFO(db_options.info_log, "Snapshot failed -- %s",
s.ToString().c_str());
// we have to delete the dir and all its children
std::vector<std::string> subchildren;
db_->GetEnv()->GetChildren(full_private_path, &subchildren);
for (auto& subchild : subchildren) {
std::string subchild_path = full_private_path + "/" + subchild;
Status s1 = db_->GetEnv()->DeleteFile(subchild_path);
ROCKS_LOG_INFO(db_options.info_log, "Delete file %s -- %s",
subchild_path.c_str(), s1.ToString().c_str());
}
// finally delete the private dir
Status s1 = db_->GetEnv()->DeleteDir(full_private_path);
ROCKS_LOG_INFO(db_options.info_log, "Delete dir %s -- %s",
full_private_path.c_str(), s1.ToString().c_str());
}
return s;
}
Status CheckpointImpl::CreateCustomCheckpoint(
const DBOptions& db_options,
std::function<Status(const std::string& src_dirname,
const std::string& src_fname, FileType type)>
link_file_cb,
std::function<Status(const std::string& src_dirname,
const std::string& src_fname,
uint64_t size_limit_bytes, FileType type)>
copy_file_cb,
std::function<Status(const std::string& fname, const std::string& contents,
FileType type)>
create_file_cb,
uint64_t* sequence_number, uint64_t log_size_for_flush) {
Status s;
std::vector<std::string> live_files;
uint64_t manifest_file_size = 0;
uint64_t min_log_num = port::kMaxUint64;
*sequence_number = db_->GetLatestSequenceNumber();
bool same_fs = true;
VectorLogPtr live_wal_files;
bool flush_memtable = true; bool flush_memtable = true;
if (s.ok()) { if (s.ok()) {
if (!db_options.allow_2pc) { if (!db_options.allow_2pc) {
// If out standing log files are small, we skip the flush. if (log_size_for_flush == port::kMaxUint64) {
s = db_->GetSortedWalFiles(live_wal_files); flush_memtable = false;
} else if (log_size_for_flush > 0) {
// If out standing log files are small, we skip the flush.
s = db_->GetSortedWalFiles(live_wal_files);
if (!s.ok()) { if (!s.ok()) {
db_->EnableFileDeletions(false); return s;
return s; }
}
// Don't flush column families if total log size is smaller than // Don't flush column families if total log size is smaller than
// log_size_for_flush. We copy the log files instead. // log_size_for_flush. We copy the log files instead.
// We may be able to cover 2PC case too. // We may be able to cover 2PC case too.
uint64_t total_wal_size = 0; uint64_t total_wal_size = 0;
for (auto& wal : live_wal_files) { for (auto& wal : live_wal_files) {
total_wal_size += wal->SizeFileBytes(); total_wal_size += wal->SizeFileBytes();
} }
if (total_wal_size < log_size_for_flush) { if (total_wal_size < log_size_for_flush) {
flush_memtable = false; flush_memtable = false;
}
live_wal_files.clear();
} }
live_wal_files.clear();
} }
// this will return live_files prefixed with "/" // this will return live_files prefixed with "/"
@ -112,7 +183,6 @@ Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir,
// Need to refetch the live files to recapture the snapshot. // Need to refetch the live files to recapture the snapshot.
if (!db_->GetIntProperty(DB::Properties::kMinLogNumberToKeep, if (!db_->GetIntProperty(DB::Properties::kMinLogNumberToKeep,
&min_log_num)) { &min_log_num)) {
db_->EnableFileDeletions(false);
return Status::InvalidArgument( return Status::InvalidArgument(
"2PC enabled but cannot fine the min log number to keep."); "2PC enabled but cannot fine the min log number to keep.");
} }
@ -127,12 +197,12 @@ Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir,
// be skipped. 000003.log contains commit message of tnx1, but we don't // be skipped. 000003.log contains commit message of tnx1, but we don't
// have respective prepare record for it. // have respective prepare record for it.
// In order to avoid this situation, we need to force flush to make sure // In order to avoid this situation, we need to force flush to make sure
// all transactions commited before getting min_log_num will be flushed // all transactions committed before getting min_log_num will be flushed
// to SST files. // to SST files.
// We cannot get min_log_num before calling the GetLiveFiles() for the // We cannot get min_log_num before calling the GetLiveFiles() for the
// first time, because if we do that, all the logs files will be included, // first time, because if we do that, all the logs files will be included,
// far more than needed. // far more than needed.
s = db_->GetLiveFiles(live_files, &manifest_file_size, /* flush */ true); s = db_->GetLiveFiles(live_files, &manifest_file_size, flush_memtable);
} }
TEST_SYNC_POINT("CheckpointImpl::CreateCheckpoint:SavedLiveFiles1"); TEST_SYNC_POINT("CheckpointImpl::CreateCheckpoint:SavedLiveFiles1");
@ -143,20 +213,10 @@ Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir,
s = db_->GetSortedWalFiles(live_wal_files); s = db_->GetSortedWalFiles(live_wal_files);
} }
if (!s.ok()) { if (!s.ok()) {
db_->EnableFileDeletions(false);
return s; return s;
} }
size_t wal_size = live_wal_files.size(); size_t wal_size = live_wal_files.size();
ROCKS_LOG_INFO(
db_options.info_log,
"Started the snapshot process -- creating snapshot in directory %s",
checkpoint_dir.c_str());
std::string full_private_path = checkpoint_dir + ".tmp";
// create snapshot directory
s = db_->GetEnv()->CreateDir(full_private_path);
// copy/hard link live_files // copy/hard link live_files
std::string manifest_fname, current_fname; std::string manifest_fname, current_fname;
@ -188,25 +248,21 @@ Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir,
// * if it's kDescriptorFile, limit the size to manifest_file_size // * if it's kDescriptorFile, limit the size to manifest_file_size
// * always copy if cross-device link // * always copy if cross-device link
if ((type == kTableFile) && same_fs) { if ((type == kTableFile) && same_fs) {
ROCKS_LOG_INFO(db_options.info_log, "Hard Linking %s", src_fname.c_str()); s = link_file_cb(db_->GetName(), src_fname, type);
s = db_->GetEnv()->LinkFile(db_->GetName() + src_fname,
full_private_path + src_fname);
if (s.IsNotSupported()) { if (s.IsNotSupported()) {
same_fs = false; same_fs = false;
s = Status::OK(); s = Status::OK();
} }
} }
if ((type != kTableFile) || (!same_fs)) { if ((type != kTableFile) || (!same_fs)) {
ROCKS_LOG_INFO(db_options.info_log, "Copying %s", src_fname.c_str()); s = copy_file_cb(db_->GetName(), src_fname,
s = CopyFile(db_->GetEnv(), db_->GetName() + src_fname, (type == kDescriptorFile) ? manifest_file_size : 0,
full_private_path + src_fname, type);
(type == kDescriptorFile) ? manifest_file_size : 0,
db_options.use_fsync);
} }
} }
if (s.ok() && !current_fname.empty() && !manifest_fname.empty()) { if (s.ok() && !current_fname.empty() && !manifest_fname.empty()) {
s = CreateFile(db_->GetEnv(), full_private_path + current_fname, create_file_cb(current_fname, manifest_fname.substr(1) + "\n",
manifest_fname.substr(1) + "\n"); kCurrentFile);
} }
ROCKS_LOG_INFO(db_options.info_log, "Number of log files %" ROCKSDB_PRIszt, ROCKS_LOG_INFO(db_options.info_log, "Number of log files %" ROCKSDB_PRIszt,
live_wal_files.size()); live_wal_files.size());
@ -216,82 +272,32 @@ Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir,
for (size_t i = 0; s.ok() && i < wal_size; ++i) { for (size_t i = 0; s.ok() && i < wal_size; ++i) {
if ((live_wal_files[i]->Type() == kAliveLogFile) && if ((live_wal_files[i]->Type() == kAliveLogFile) &&
(!flush_memtable || (!flush_memtable ||
live_wal_files[i]->StartSequence() >= sequence_number || live_wal_files[i]->StartSequence() >= *sequence_number ||
live_wal_files[i]->LogNumber() >= min_log_num)) { live_wal_files[i]->LogNumber() >= min_log_num)) {
if (i + 1 == wal_size) { if (i + 1 == wal_size) {
ROCKS_LOG_INFO(db_options.info_log, "Copying %s", s = copy_file_cb(db_options.wal_dir, live_wal_files[i]->PathName(),
live_wal_files[i]->PathName().c_str()); live_wal_files[i]->SizeFileBytes(), kLogFile);
s = CopyFile(db_->GetEnv(),
db_options.wal_dir + live_wal_files[i]->PathName(),
full_private_path + live_wal_files[i]->PathName(),
live_wal_files[i]->SizeFileBytes(), db_options.use_fsync);
break; break;
} }
if (same_fs) { if (same_fs) {
// we only care about live log files // we only care about live log files
ROCKS_LOG_INFO(db_options.info_log, "Hard Linking %s", s = link_file_cb(db_options.wal_dir, live_wal_files[i]->PathName(),
live_wal_files[i]->PathName().c_str()); kLogFile);
s = db_->GetEnv()->LinkFile(
db_options.wal_dir + live_wal_files[i]->PathName(),
full_private_path + live_wal_files[i]->PathName());
if (s.IsNotSupported()) { if (s.IsNotSupported()) {
same_fs = false; same_fs = false;
s = Status::OK(); s = Status::OK();
} }
} }
if (!same_fs) { if (!same_fs) {
ROCKS_LOG_INFO(db_options.info_log, "Copying %s", s = copy_file_cb(db_options.wal_dir, live_wal_files[i]->PathName(), 0,
live_wal_files[i]->PathName().c_str()); kLogFile);
s = CopyFile(db_->GetEnv(),
db_options.wal_dir + live_wal_files[i]->PathName(),
full_private_path + live_wal_files[i]->PathName(), 0,
db_options.use_fsync);
} }
} }
} }
// we copied all the files, enable file deletions
db_->EnableFileDeletions(false);
if (s.ok()) {
// move tmp private backup to real snapshot directory
s = db_->GetEnv()->RenameFile(full_private_path, checkpoint_dir);
}
if (s.ok()) {
unique_ptr<Directory> checkpoint_directory;
db_->GetEnv()->NewDirectory(checkpoint_dir, &checkpoint_directory);
if (checkpoint_directory != nullptr) {
s = checkpoint_directory->Fsync();
}
}
if (!s.ok()) {
// clean all the files we might have created
ROCKS_LOG_INFO(db_options.info_log, "Snapshot failed -- %s",
s.ToString().c_str());
// we have to delete the dir and all its children
std::vector<std::string> subchildren;
db_->GetEnv()->GetChildren(full_private_path, &subchildren);
for (auto& subchild : subchildren) {
std::string subchild_path = full_private_path + "/" + subchild;
Status s1 = db_->GetEnv()->DeleteFile(subchild_path);
ROCKS_LOG_INFO(db_options.info_log, "Delete file %s -- %s",
subchild_path.c_str(), s1.ToString().c_str());
}
// finally delete the private dir
Status s1 = db_->GetEnv()->DeleteDir(full_private_path);
ROCKS_LOG_INFO(db_options.info_log, "Delete dir %s -- %s",
full_private_path.c_str(), s1.ToString().c_str());
return s;
}
// here we know that we succeeded and installed the new snapshot
ROCKS_LOG_INFO(db_options.info_log, "Snapshot DONE. All is good");
ROCKS_LOG_INFO(db_options.info_log, "Snapshot sequence number: %" PRIu64,
sequence_number);
return s; return s;
} }
} // namespace rocksdb } // namespace rocksdb
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE

@ -0,0 +1,55 @@
// Copyright (c) 2017-present, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#pragma once
#ifndef ROCKSDB_LITE
#include "rocksdb/utilities/checkpoint.h"

#include <functional>
#include <string>

#include "rocksdb/db.h"
#include "util/filename.h"
namespace rocksdb {
// Implements Checkpoint by building an openable snapshot of a live DB.
// The core logic is exposed via CreateCustomCheckpoint() so that internal
// rocksdb code (e.g. the backup engine) can reuse it without it being part
// of the public API.
class CheckpointImpl : public Checkpoint {
 public:
  // Creates a Checkpoint object to be used for creating openable snapshots
  // of `db`. Does not take ownership of `db`; the DB must outlive this object.
  explicit CheckpointImpl(DB* db) : db_(db) {}

  // Builds an openable snapshot of RocksDB on the same disk, which
  // accepts an output directory on the same disk, and under the directory
  // (1) hard-linked SST files pointing to existing live SST files
  //     (SST files will be copied if the output directory is on a different
  //     filesystem)
  // (2) a copy of the MANIFEST and other required files
  // The directory should not already exist and will be created by this API.
  // The directory will be an absolute path.
  using Checkpoint::CreateCheckpoint;
  virtual Status CreateCheckpoint(const std::string& checkpoint_dir,
                                  uint64_t log_size_for_flush) override;

  // Checkpoint logic can be customized by providing callbacks for link, copy,
  // or create, which lets the same core logic implement both checkpoint
  // (link/copy into a directory) and backup (schedule copy work items).
  //
  // link_file_cb: called to hard-link the live file `src_dirname` + `fname`
  //     into the checkpoint. Returning Status::NotSupported() makes the
  //     caller fall back to copy_file_cb for that file and all later ones.
  // copy_file_cb: called to copy the live file `src_dirname` + `fname`;
  //     `size_limit_bytes` bounds how many bytes to copy (used to pin the
  //     MANIFEST to the size captured by GetLiveFiles()).
  // create_file_cb: called to create a file named `fname` with the given
  //     `contents` (used to forge a consistent CURRENT file).
  // sequence_number: out-param; set to the DB's latest sequence number when
  //     the checkpoint began.
  // log_size_for_flush: flush-avoidance threshold. If the total WAL size is
  //     below this value the memtable flush is skipped and WALs are copied
  //     instead; port::kMaxUint64 always skips the flush. NOTE(review):
  //     semantics inferred from checkpoint_impl.cc in this change — confirm
  //     against that file before relying on edge cases.
  Status CreateCustomCheckpoint(
      const DBOptions& db_options,
      std::function<Status(const std::string& src_dirname,
                           const std::string& fname, FileType type)>
          link_file_cb,
      std::function<Status(const std::string& src_dirname,
                           const std::string& fname, uint64_t size_limit_bytes,
                           FileType type)>
          copy_file_cb,
      std::function<Status(const std::string& fname,
                           const std::string& contents, FileType type)>
          create_file_cb,
      uint64_t* sequence_number, uint64_t log_size_for_flush);

 private:
  DB* db_;  // not owned
};
} // namespace rocksdb
#endif // ROCKSDB_LITE
Loading…
Cancel
Save