From e5e545a021542f95712039b67b9863175433a88b Mon Sep 17 00:00:00 2001
From: Andrew Kryczka
Date: Mon, 24 Apr 2017 14:57:27 -0700
Subject: [PATCH] Reunite checkpoint and backup core logic

Summary:
The checkpoint and backup code paths forked when checkpoint was introduced by
copy/pasting the core backup logic. Over time they diverged, and bug fixes were
sometimes applied to one but not the other (like the fix to include all
relevant WALs for 2PC), or required extra effort to apply to both (like the fix
to forge the CURRENT file). This diff reunites the code paths by extracting the
core logic into a function, CreateCustomCheckpoint(), that is customizable via
callbacks to implement both checkpoint and backup.

Related changes:

- flush_before_backup is now forcibly enabled when 2PC is enabled.
- Extracted the CheckpointImpl class definition into a header file so that
  CreateCustomCheckpoint() can be called by internal RocksDB code but is not
  exposed to users.
- Implemented more functions in DummyDB/DummyLogFile (in backupable_db_test.cc)
  that are used by CreateCustomCheckpoint().

Closes https://github.com/facebook/rocksdb/pull/1932

Differential Revision: D4622986

Pulled By: ajkr

fbshipit-source-id: 157723884236ee3999a682673b64f7457a7a0d87
---
 CMakeLists.txt                             |   2 +-
 TARGETS                                    |   2 +-
 include/rocksdb/utilities/backupable_db.h  |   2 +
 src.mk                                     |   2 +-
 utilities/backupable/backupable_db.cc      | 142 ++++------
 utilities/backupable/backupable_db_test.cc |  35 ++-
 .../{checkpoint.cc => checkpoint_impl.cc}  | 260 +++++++++---------
 utilities/checkpoint/checkpoint_impl.h     |  55 ++++
 8 files changed, 269 insertions(+), 231 deletions(-)
 rename utilities/checkpoint/{checkpoint.cc => checkpoint_impl.cc} (65%)
 create mode 100644 utilities/checkpoint/checkpoint_impl.h

diff --git a/CMakeLists.txt b/CMakeLists.txt
index c015454a9..b26b1a39e 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -446,7 +446,7 @@ set(SOURCES
         util/xxhash.cc
         utilities/backupable/backupable_db.cc
         utilities/blob_db/blob_db.cc
-        utilities/checkpoint/checkpoint.cc
+        utilities/checkpoint/checkpoint_impl.cc
         utilities/col_buf_decoder.cc
         utilities/col_buf_encoder.cc
         utilities/column_aware_encoding_util.cc
diff --git a/TARGETS b/TARGETS
index 728b023ef..715110ba5 100644
--- a/TARGETS
+++ b/TARGETS
@@ -195,7 +195,7 @@ cpp_library(
     "util/xxhash.cc",
     "utilities/backupable/backupable_db.cc",
     "utilities/blob_db/blob_db.cc",
-    "utilities/checkpoint/checkpoint.cc",
+    "utilities/checkpoint/checkpoint_impl.cc",
     "utilities/compaction_filters/remove_emptyvalue_compactionfilter.cc",
     "utilities/convenience/info_log_finder.cc",
     "utilities/date_tiered/date_tiered_db_impl.cc",
diff --git a/include/rocksdb/utilities/backupable_db.h b/include/rocksdb/utilities/backupable_db.h
index d80601c03..071f3e480 100644
--- a/include/rocksdb/utilities/backupable_db.h
+++ b/include/rocksdb/utilities/backupable_db.h
@@ -256,12 +256,14 @@ class BackupEngine {
                               BackupEngine** backup_engine_ptr);

   // same as CreateNewBackup, but stores extra application metadata
+  // Flush will always be triggered when 2PC is enabled.
   virtual Status CreateNewBackupWithMetadata(
       DB* db, const std::string& app_metadata, bool flush_before_backup = false,
       std::function<void()> progress_callback = []() {}) = 0;

   // Captures the state of the database in the latest backup
   // NOT a thread safe call
+  // Flush will always be triggered when 2PC is enabled.
virtual Status CreateNewBackup(DB* db, bool flush_before_backup = false, std::function progress_callback = []() {}) { diff --git a/src.mk b/src.mk index 6f4303cc2..39f4218d0 100644 --- a/src.mk +++ b/src.mk @@ -150,7 +150,7 @@ LIB_SOURCES = \ util/xxhash.cc \ utilities/backupable/backupable_db.cc \ utilities/blob_db/blob_db.cc \ - utilities/checkpoint/checkpoint.cc \ + utilities/checkpoint/checkpoint_impl.cc \ utilities/compaction_filters/remove_emptyvalue_compactionfilter.cc \ utilities/convenience/info_log_finder.cc \ utilities/date_tiered/date_tiered_db_impl.cc \ diff --git a/utilities/backupable/backupable_db.cc b/utilities/backupable/backupable_db.cc index 4e5aff20a..c40c78179 100644 --- a/utilities/backupable/backupable_db.cc +++ b/utilities/backupable/backupable_db.cc @@ -21,6 +21,7 @@ #include "util/logging.h" #include "util/string_util.h" #include "util/sync_point.h" +#include "utilities/checkpoint/checkpoint_impl.h" #ifndef __STDC_FORMAT_MACROS #define __STDC_FORMAT_MACROS @@ -702,28 +703,6 @@ Status BackupEngineImpl::CreateNewBackupWithMetadata( if (app_metadata.size() > kMaxAppMetaSize) { return Status::InvalidArgument("App metadata too large"); } - Status s; - std::vector live_files; - VectorLogPtr live_wal_files; - uint64_t manifest_file_size = 0; - uint64_t sequence_number = db->GetLatestSequenceNumber(); - - s = db->DisableFileDeletions(); - if (s.ok()) { - // this will return live_files prefixed with "/" - s = db->GetLiveFiles(live_files, &manifest_file_size, flush_before_backup); - } - // if we didn't flush before backup, we need to also get WAL files - if (s.ok() && !flush_before_backup && options_.backup_log_files) { - // returns file names prefixed with "/" - s = db->GetSortedWalFiles(live_wal_files); - } - if (!s.ok()) { - db->EnableFileDeletions(false); - return s; - } - TEST_SYNC_POINT("BackupEngineImpl::CreateNewBackup:SavedLiveFiles1"); - TEST_SYNC_POINT("BackupEngineImpl::CreateNewBackup:SavedLiveFiles2"); BackupID new_backup_id = latest_backup_id_ + 1; @@ -735,7 +714,6 @@ Status BackupEngineImpl::CreateNewBackupWithMetadata( assert(ret.second == true); auto& new_backup = ret.first->second; new_backup->RecordTimestamp(); - new_backup->SetSequenceNumber(sequence_number); new_backup->SetAppMetadata(app_metadata); auto start_backup = backup_env_-> NowMicros(); @@ -745,7 +723,7 @@ Status BackupEngineImpl::CreateNewBackupWithMetadata( new_backup_id); auto private_tmp_dir = GetAbsolutePath(GetPrivateFileRel(new_backup_id, true)); - s = backup_env_->FileExists(private_tmp_dir); + Status s = backup_env_->FileExists(private_tmp_dir); if (s.ok()) { // maybe last backup failed and left partial state behind, clean it up s = GarbageCollect(); @@ -767,81 +745,53 @@ Status BackupEngineImpl::CreateNewBackupWithMetadata( // This is used to check whether a live files shares a dst_path with another // live file. std::unordered_set live_dst_paths; - live_dst_paths.reserve(live_files.size() + live_wal_files.size()); - - // Pre-fetch sizes for data files - std::unordered_map data_path_to_size; - if (s.ok()) { - s = InsertPathnameToSizeBytes(db->GetName(), db_env_, &data_path_to_size); - } std::vector backup_items_to_finish; // Add a CopyOrCreateWorkItem to the channel for each live file - std::string manifest_fname, current_fname; - for (size_t i = 0; s.ok() && i < live_files.size(); ++i) { - uint64_t number; - FileType type; - bool ok = ParseFileName(live_files[i], &number, &type); - if (!ok) { - assert(false); - return Status::Corruption("Can't parse file name. 
This is very bad"); - } - // we should only get sst, manifest and current files here - assert(type == kTableFile || type == kDescriptorFile || - type == kCurrentFile || type == kOptionsFile); - - if (type == kCurrentFile) { - // We will craft the current file manually to ensure it's consistent with - // the manifest number. This is necessary because current's file contents - // can change during backup. - current_fname = live_files[i]; - continue; - } else if (type == kDescriptorFile) { - manifest_fname = live_files[i]; - } - - auto data_path_to_size_iter = - data_path_to_size.find(db->GetName() + live_files[i]); - uint64_t size_bytes = data_path_to_size_iter == data_path_to_size.end() - ? port::kMaxUint64 - : data_path_to_size_iter->second; - - // rules: - // * if it's kTableFile, then it's shared - // * if it's kDescriptorFile, limit the size to manifest_file_size - s = AddBackupFileWorkItem( - live_dst_paths, backup_items_to_finish, new_backup_id, - options_.share_table_files && type == kTableFile, db->GetName(), - live_files[i], rate_limiter, size_bytes, - (type == kDescriptorFile) ? manifest_file_size : 0, - options_.share_files_with_checksum && type == kTableFile, - progress_callback); - } - if (s.ok() && !current_fname.empty() && !manifest_fname.empty()) { - // Write the current file with the manifest filename as its contents. - s = AddBackupFileWorkItem( - live_dst_paths, backup_items_to_finish, new_backup_id, - false /* shared */, "" /* src_dir */, CurrentFileName(""), rate_limiter, - manifest_fname.size(), 0 /* size_limit */, false /* shared_checksum */, - progress_callback, manifest_fname.substr(1) + "\n"); - } - ROCKS_LOG_INFO(options_.info_log, - "begin add wal files for backup -- %" ROCKSDB_PRIszt, - live_wal_files.size()); - // Add a CopyOrCreateWorkItem to the channel for each WAL file - for (size_t i = 0; s.ok() && i < live_wal_files.size(); ++i) { - uint64_t size_bytes = live_wal_files[i]->SizeFileBytes(); - if (live_wal_files[i]->Type() == kAliveLogFile) { - ROCKS_LOG_INFO(options_.info_log, - "add wal file for backup %s -- %" PRIu64, - live_wal_files[i]->PathName().c_str(), size_bytes); - // we only care about live log files - // copy the file into backup_dir/files// - s = AddBackupFileWorkItem(live_dst_paths, backup_items_to_finish, - new_backup_id, false, /* not shared */ - db->GetOptions().wal_dir, - live_wal_files[i]->PathName(), rate_limiter, - size_bytes, size_bytes); + db->DisableFileDeletions(); + if (s.ok()) { + CheckpointImpl checkpoint(db); + uint64_t sequence_number = 0; + s = checkpoint.CreateCustomCheckpoint( + db->GetDBOptions(), + [&](const std::string& src_dirname, const std::string& fname, + FileType) { + // custom checkpoint will switch to calling copy_file_cb after it sees + // NotSupported returned from link_file_cb. 
+ return Status::NotSupported(); + } /* link_file_cb */, + [&](const std::string& src_dirname, const std::string& fname, + uint64_t size_limit_bytes, FileType type) { + if (type == kLogFile && !options_.backup_log_files) { + return Status::OK(); + } + Log(options_.info_log, "add file for backup %s", fname.c_str()); + uint64_t size_bytes = 0; + Status st; + if (type == kTableFile) { + st = db_env_->GetFileSize(src_dirname + fname, &size_bytes); + } + if (st.ok()) { + st = AddBackupFileWorkItem( + live_dst_paths, backup_items_to_finish, new_backup_id, + options_.share_table_files && type == kTableFile, src_dirname, + fname, rate_limiter, size_bytes, size_limit_bytes, + options_.share_files_with_checksum && type == kTableFile, + progress_callback); + } + return st; + } /* copy_file_cb */, + [&](const std::string& fname, const std::string& contents, FileType) { + Log(options_.info_log, "add file for backup %s", fname.c_str()); + return AddBackupFileWorkItem( + live_dst_paths, backup_items_to_finish, new_backup_id, + false /* shared */, "" /* src_dir */, fname, rate_limiter, + contents.size(), 0 /* size_limit */, false /* shared_checksum */, + progress_callback, contents); + } /* create_file_cb */, + &sequence_number, flush_before_backup ? 0 : port::kMaxUint64); + if (s.ok()) { + new_backup->SetSequenceNumber(sequence_number); } } ROCKS_LOG_INFO(options_.info_log, "add files for backup done, wait finish."); diff --git a/utilities/backupable/backupable_db_test.cc b/utilities/backupable/backupable_db_test.cc index 507b6fbb6..6e6e38d3f 100644 --- a/utilities/backupable/backupable_db_test.cc +++ b/utilities/backupable/backupable_db_test.cc @@ -61,6 +61,10 @@ class DummyDB : public StackableDB { return options_; } + virtual DBOptions GetDBOptions() const override { + return DBOptions(options_); + } + virtual Status EnableFileDeletions(bool force) override { EXPECT_TRUE(!deletions_enabled_); deletions_enabled_ = true; @@ -106,9 +110,9 @@ class DummyDB : public StackableDB { } virtual SequenceNumber StartSequence() const override { - // backupabledb should not need this method - EXPECT_TRUE(false); - return 0; + // this seqnum guarantees the dummy file will be included in the backup + // as long as it is alive. 
+ return kMaxSequenceNumber; } virtual uint64_t SizeFileBytes() const override { @@ -204,6 +208,14 @@ class TestEnv : public EnvWrapper { return EnvWrapper::DeleteFile(fname); } + virtual Status DeleteDir(const std::string& dirname) override { + MutexLock l(&mutex_); + if (fail_delete_files_) { + return Status::IOError(); + } + return EnvWrapper::DeleteDir(dirname); + } + void AssertWrittenFiles(std::vector& should_have_written) { MutexLock l(&mutex_); std::sort(should_have_written.begin(), should_have_written.end()); @@ -266,6 +278,19 @@ class TestEnv : public EnvWrapper { } return EnvWrapper::GetChildrenFileAttributes(dir, r); } + Status GetFileSize(const std::string& path, uint64_t* size_bytes) override { + if (filenames_for_mocked_attrs_.size() > 0) { + auto fname = path.substr(path.find_last_of('/')); + auto filename_iter = std::find(filenames_for_mocked_attrs_.begin(), + filenames_for_mocked_attrs_.end(), fname); + if (filename_iter != filenames_for_mocked_attrs_.end()) { + *size_bytes = 10; + return Status::OK(); + } + return Status::NotFound(fname); + } + return EnvWrapper::GetFileSize(path, size_bytes); + } void SetCreateDirIfMissingFailure(bool fail) { create_dir_if_missing_failure_ = fail; @@ -1374,10 +1399,10 @@ TEST_F(BackupableDBTest, ChangeManifestDuringBackupCreation) { FillDB(db_.get(), 0, 100); rocksdb::SyncPoint::GetInstance()->LoadDependency({ - {"BackupEngineImpl::CreateNewBackup:SavedLiveFiles1", + {"CheckpointImpl::CreateCheckpoint:SavedLiveFiles1", "VersionSet::LogAndApply:WriteManifest"}, {"VersionSet::LogAndApply:WriteManifestDone", - "BackupEngineImpl::CreateNewBackup:SavedLiveFiles2"}, + "CheckpointImpl::CreateCheckpoint:SavedLiveFiles2"}, }); rocksdb::SyncPoint::GetInstance()->EnableProcessing(); diff --git a/utilities/checkpoint/checkpoint.cc b/utilities/checkpoint/checkpoint_impl.cc similarity index 65% rename from utilities/checkpoint/checkpoint.cc rename to utilities/checkpoint/checkpoint_impl.cc index fc9e16f5c..3242946af 100644 --- a/utilities/checkpoint/checkpoint.cc +++ b/utilities/checkpoint/checkpoint_impl.cc @@ -9,7 +9,7 @@ #ifndef ROCKSDB_LITE -#include "rocksdb/utilities/checkpoint.h" +#include "utilities/checkpoint/checkpoint_impl.h" #ifndef __STDC_FORMAT_MACROS #define __STDC_FORMAT_MACROS @@ -18,37 +18,20 @@ #include #include #include +#include + #include "db/wal_manager.h" #include "port/port.h" #include "rocksdb/db.h" #include "rocksdb/env.h" #include "rocksdb/transaction_log.h" +#include "rocksdb/utilities/checkpoint.h" #include "util/file_util.h" #include "util/filename.h" #include "util/sync_point.h" namespace rocksdb { -class CheckpointImpl : public Checkpoint { - public: - // Creates a Checkpoint object to be used for creating openable snapshots - explicit CheckpointImpl(DB* db) : db_(db) {} - - // Builds an openable snapshot of RocksDB on the same disk, which - // accepts an output directory on the same disk, and under the directory - // (1) hard-linked SST files pointing to existing live SST files - // SST files will be copied if output directory is on a different filesystem - // (2) a copied manifest files and other files - // The directory should not already exist and will be created by this API. 
-  // The directory will be an absolute path
-  using Checkpoint::CreateCheckpoint;
-  virtual Status CreateCheckpoint(const std::string& checkpoint_dir,
-                                  uint64_t log_size_for_flush) override;
-
- private:
-  DB* db_;
-};
-
 Status Checkpoint::Create(DB* db, Checkpoint** checkpoint_ptr) {
   *checkpoint_ptr = new CheckpointImpl(db);
   return Status::OK();
@@ -62,16 +45,9 @@ Status Checkpoint::CreateCheckpoint(const std::string& checkpoint_dir,
 // Builds an openable snapshot of RocksDB
 Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir,
                                         uint64_t log_size_for_flush) {
-  Status s;
-  std::vector<std::string> live_files;
-  uint64_t manifest_file_size = 0;
   DBOptions db_options = db_->GetDBOptions();
-  uint64_t min_log_num = port::kMaxUint64;
-  uint64_t sequence_number = db_->GetLatestSequenceNumber();
-  bool same_fs = true;
-  VectorLogPtr live_wal_files;
-  s = db_->GetEnv()->FileExists(checkpoint_dir);
+  Status s = db_->GetEnv()->FileExists(checkpoint_dir);
   if (s.ok()) {
     return Status::InvalidArgument("Directory exists");
   } else if (!s.IsNotFound()) {
@@ -79,29 +55,124 @@ Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir,
     return s;
   }
-  s = db_->DisableFileDeletions();
+  ROCKS_LOG_INFO(
+      db_options.info_log,
+      "Started the snapshot process -- creating snapshot in directory %s",
+      checkpoint_dir.c_str());
+  std::string full_private_path = checkpoint_dir + ".tmp";
+  // create snapshot directory
+  s = db_->GetEnv()->CreateDir(full_private_path);
+  uint64_t sequence_number = 0;
+  if (s.ok()) {
+    db_->DisableFileDeletions();
+    s = CreateCustomCheckpoint(
+        db_options,
+        [&](const std::string& src_dirname, const std::string& fname,
+            FileType) {
+          ROCKS_LOG_INFO(db_options.info_log, "Hard Linking %s", fname.c_str());
+          return db_->GetEnv()->LinkFile(src_dirname + fname,
+                                         full_private_path + fname);
+        } /* link_file_cb */,
+        [&](const std::string& src_dirname, const std::string& fname,
+            uint64_t size_limit_bytes, FileType) {
+          ROCKS_LOG_INFO(db_options.info_log, "Copying %s", fname.c_str());
+          return CopyFile(db_->GetEnv(), src_dirname + fname,
+                          full_private_path + fname, size_limit_bytes,
+                          db_options.use_fsync);
+        } /* copy_file_cb */,
+        [&](const std::string& fname, const std::string& contents, FileType) {
+          ROCKS_LOG_INFO(db_options.info_log, "Creating %s", fname.c_str());
+          return CreateFile(db_->GetEnv(), full_private_path + fname, contents);
+        } /* create_file_cb */,
+        &sequence_number, log_size_for_flush);
+    // we copied all the files, enable file deletions
+    db_->EnableFileDeletions(false);
+  }
+
+  if (s.ok()) {
+    // move tmp private backup to real snapshot directory
+    s = db_->GetEnv()->RenameFile(full_private_path, checkpoint_dir);
+  }
+  if (s.ok()) {
+    unique_ptr<Directory> checkpoint_directory;
+    db_->GetEnv()->NewDirectory(checkpoint_dir, &checkpoint_directory);
+    if (checkpoint_directory != nullptr) {
+      s = checkpoint_directory->Fsync();
+    }
+  }
+
+  if (s.ok()) {
+    // here we know that we succeeded and installed the new snapshot
+    ROCKS_LOG_INFO(db_options.info_log, "Snapshot DONE. 
All is good"); + ROCKS_LOG_INFO(db_options.info_log, "Snapshot sequence number: %" PRIu64, + sequence_number); + } else { + // clean all the files we might have created + ROCKS_LOG_INFO(db_options.info_log, "Snapshot failed -- %s", + s.ToString().c_str()); + // we have to delete the dir and all its children + std::vector subchildren; + db_->GetEnv()->GetChildren(full_private_path, &subchildren); + for (auto& subchild : subchildren) { + std::string subchild_path = full_private_path + "/" + subchild; + Status s1 = db_->GetEnv()->DeleteFile(subchild_path); + ROCKS_LOG_INFO(db_options.info_log, "Delete file %s -- %s", + subchild_path.c_str(), s1.ToString().c_str()); + } + // finally delete the private dir + Status s1 = db_->GetEnv()->DeleteDir(full_private_path); + ROCKS_LOG_INFO(db_options.info_log, "Delete dir %s -- %s", + full_private_path.c_str(), s1.ToString().c_str()); + } + return s; +} + +Status CheckpointImpl::CreateCustomCheckpoint( + const DBOptions& db_options, + std::function + link_file_cb, + std::function + copy_file_cb, + std::function + create_file_cb, + uint64_t* sequence_number, uint64_t log_size_for_flush) { + Status s; + std::vector live_files; + uint64_t manifest_file_size = 0; + uint64_t min_log_num = port::kMaxUint64; + *sequence_number = db_->GetLatestSequenceNumber(); + bool same_fs = true; + VectorLogPtr live_wal_files; + bool flush_memtable = true; if (s.ok()) { if (!db_options.allow_2pc) { - // If out standing log files are small, we skip the flush. - s = db_->GetSortedWalFiles(live_wal_files); + if (log_size_for_flush == port::kMaxUint64) { + flush_memtable = false; + } else if (log_size_for_flush > 0) { + // If out standing log files are small, we skip the flush. + s = db_->GetSortedWalFiles(live_wal_files); - if (!s.ok()) { - db_->EnableFileDeletions(false); - return s; - } + if (!s.ok()) { + return s; + } - // Don't flush column families if total log size is smaller than - // log_size_for_flush. We copy the log files instead. - // We may be able to cover 2PC case too. - uint64_t total_wal_size = 0; - for (auto& wal : live_wal_files) { - total_wal_size += wal->SizeFileBytes(); - } - if (total_wal_size < log_size_for_flush) { - flush_memtable = false; + // Don't flush column families if total log size is smaller than + // log_size_for_flush. We copy the log files instead. + // We may be able to cover 2PC case too. + uint64_t total_wal_size = 0; + for (auto& wal : live_wal_files) { + total_wal_size += wal->SizeFileBytes(); + } + if (total_wal_size < log_size_for_flush) { + flush_memtable = false; + } + live_wal_files.clear(); } - live_wal_files.clear(); } // this will return live_files prefixed with "/" @@ -112,7 +183,6 @@ Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir, // Need to refetch the live files to recapture the snapshot. if (!db_->GetIntProperty(DB::Properties::kMinLogNumberToKeep, &min_log_num)) { - db_->EnableFileDeletions(false); return Status::InvalidArgument( "2PC enabled but cannot fine the min log number to keep."); } @@ -127,12 +197,12 @@ Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir, // be skipped. 000003.log contains commit message of tnx1, but we don't // have respective prepare record for it. // In order to avoid this situation, we need to force flush to make sure - // all transactions commited before getting min_log_num will be flushed + // all transactions committed before getting min_log_num will be flushed // to SST files. 
// We cannot get min_log_num before calling the GetLiveFiles() for the // first time, because if we do that, all the logs files will be included, // far more than needed. - s = db_->GetLiveFiles(live_files, &manifest_file_size, /* flush */ true); + s = db_->GetLiveFiles(live_files, &manifest_file_size, flush_memtable); } TEST_SYNC_POINT("CheckpointImpl::CreateCheckpoint:SavedLiveFiles1"); @@ -143,20 +213,10 @@ Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir, s = db_->GetSortedWalFiles(live_wal_files); } if (!s.ok()) { - db_->EnableFileDeletions(false); return s; } size_t wal_size = live_wal_files.size(); - ROCKS_LOG_INFO( - db_options.info_log, - "Started the snapshot process -- creating snapshot in directory %s", - checkpoint_dir.c_str()); - - std::string full_private_path = checkpoint_dir + ".tmp"; - - // create snapshot directory - s = db_->GetEnv()->CreateDir(full_private_path); // copy/hard link live_files std::string manifest_fname, current_fname; @@ -188,25 +248,21 @@ Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir, // * if it's kDescriptorFile, limit the size to manifest_file_size // * always copy if cross-device link if ((type == kTableFile) && same_fs) { - ROCKS_LOG_INFO(db_options.info_log, "Hard Linking %s", src_fname.c_str()); - s = db_->GetEnv()->LinkFile(db_->GetName() + src_fname, - full_private_path + src_fname); + s = link_file_cb(db_->GetName(), src_fname, type); if (s.IsNotSupported()) { same_fs = false; s = Status::OK(); } } if ((type != kTableFile) || (!same_fs)) { - ROCKS_LOG_INFO(db_options.info_log, "Copying %s", src_fname.c_str()); - s = CopyFile(db_->GetEnv(), db_->GetName() + src_fname, - full_private_path + src_fname, - (type == kDescriptorFile) ? manifest_file_size : 0, - db_options.use_fsync); + s = copy_file_cb(db_->GetName(), src_fname, + (type == kDescriptorFile) ? 
manifest_file_size : 0, + type); } } if (s.ok() && !current_fname.empty() && !manifest_fname.empty()) { - s = CreateFile(db_->GetEnv(), full_private_path + current_fname, - manifest_fname.substr(1) + "\n"); + create_file_cb(current_fname, manifest_fname.substr(1) + "\n", + kCurrentFile); } ROCKS_LOG_INFO(db_options.info_log, "Number of log files %" ROCKSDB_PRIszt, live_wal_files.size()); @@ -216,82 +272,32 @@ Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir, for (size_t i = 0; s.ok() && i < wal_size; ++i) { if ((live_wal_files[i]->Type() == kAliveLogFile) && (!flush_memtable || - live_wal_files[i]->StartSequence() >= sequence_number || + live_wal_files[i]->StartSequence() >= *sequence_number || live_wal_files[i]->LogNumber() >= min_log_num)) { if (i + 1 == wal_size) { - ROCKS_LOG_INFO(db_options.info_log, "Copying %s", - live_wal_files[i]->PathName().c_str()); - s = CopyFile(db_->GetEnv(), - db_options.wal_dir + live_wal_files[i]->PathName(), - full_private_path + live_wal_files[i]->PathName(), - live_wal_files[i]->SizeFileBytes(), db_options.use_fsync); + s = copy_file_cb(db_options.wal_dir, live_wal_files[i]->PathName(), + live_wal_files[i]->SizeFileBytes(), kLogFile); break; } if (same_fs) { // we only care about live log files - ROCKS_LOG_INFO(db_options.info_log, "Hard Linking %s", - live_wal_files[i]->PathName().c_str()); - s = db_->GetEnv()->LinkFile( - db_options.wal_dir + live_wal_files[i]->PathName(), - full_private_path + live_wal_files[i]->PathName()); + s = link_file_cb(db_options.wal_dir, live_wal_files[i]->PathName(), + kLogFile); if (s.IsNotSupported()) { same_fs = false; s = Status::OK(); } } if (!same_fs) { - ROCKS_LOG_INFO(db_options.info_log, "Copying %s", - live_wal_files[i]->PathName().c_str()); - s = CopyFile(db_->GetEnv(), - db_options.wal_dir + live_wal_files[i]->PathName(), - full_private_path + live_wal_files[i]->PathName(), 0, - db_options.use_fsync); + s = copy_file_cb(db_options.wal_dir, live_wal_files[i]->PathName(), 0, + kLogFile); } } } - // we copied all the files, enable file deletions - db_->EnableFileDeletions(false); - - if (s.ok()) { - // move tmp private backup to real snapshot directory - s = db_->GetEnv()->RenameFile(full_private_path, checkpoint_dir); - } - if (s.ok()) { - unique_ptr checkpoint_directory; - db_->GetEnv()->NewDirectory(checkpoint_dir, &checkpoint_directory); - if (checkpoint_directory != nullptr) { - s = checkpoint_directory->Fsync(); - } - } - - if (!s.ok()) { - // clean all the files we might have created - ROCKS_LOG_INFO(db_options.info_log, "Snapshot failed -- %s", - s.ToString().c_str()); - // we have to delete the dir and all its children - std::vector subchildren; - db_->GetEnv()->GetChildren(full_private_path, &subchildren); - for (auto& subchild : subchildren) { - std::string subchild_path = full_private_path + "/" + subchild; - Status s1 = db_->GetEnv()->DeleteFile(subchild_path); - ROCKS_LOG_INFO(db_options.info_log, "Delete file %s -- %s", - subchild_path.c_str(), s1.ToString().c_str()); - } - // finally delete the private dir - Status s1 = db_->GetEnv()->DeleteDir(full_private_path); - ROCKS_LOG_INFO(db_options.info_log, "Delete dir %s -- %s", - full_private_path.c_str(), s1.ToString().c_str()); - return s; - } - - // here we know that we succeeded and installed the new snapshot - ROCKS_LOG_INFO(db_options.info_log, "Snapshot DONE. 
All is good"); - ROCKS_LOG_INFO(db_options.info_log, "Snapshot sequence number: %" PRIu64, - sequence_number); - return s; } + } // namespace rocksdb #endif // ROCKSDB_LITE diff --git a/utilities/checkpoint/checkpoint_impl.h b/utilities/checkpoint/checkpoint_impl.h new file mode 100644 index 000000000..f364b9ee6 --- /dev/null +++ b/utilities/checkpoint/checkpoint_impl.h @@ -0,0 +1,55 @@ +// Copyright (c) 2017-present, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#pragma once +#ifndef ROCKSDB_LITE + +#include "rocksdb/utilities/checkpoint.h" + +#include +#include "rocksdb/db.h" +#include "util/filename.h" + +namespace rocksdb { + +class CheckpointImpl : public Checkpoint { + public: + // Creates a Checkpoint object to be used for creating openable snapshots + explicit CheckpointImpl(DB* db) : db_(db) {} + + // Builds an openable snapshot of RocksDB on the same disk, which + // accepts an output directory on the same disk, and under the directory + // (1) hard-linked SST files pointing to existing live SST files + // SST files will be copied if output directory is on a different filesystem + // (2) a copied manifest files and other files + // The directory should not already exist and will be created by this API. + // The directory will be an absolute path + using Checkpoint::CreateCheckpoint; + virtual Status CreateCheckpoint(const std::string& checkpoint_dir, + uint64_t log_size_for_flush) override; + + // Checkpoint logic can be customized by providing callbacks for link, copy, + // or create. + Status CreateCustomCheckpoint( + const DBOptions& db_options, + std::function + link_file_cb, + std::function + copy_file_cb, + std::function + create_file_cb, + uint64_t* sequence_number, uint64_t log_size_for_flush); + + private: + DB* db_; +}; + +} // namespace rocksdb + +#endif // ROCKSDB_LITE