diff --git a/CMakeLists.txt b/CMakeLists.txt
index c015454a9..b26b1a39e 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -446,7 +446,7 @@ set(SOURCES
         util/xxhash.cc
         utilities/backupable/backupable_db.cc
         utilities/blob_db/blob_db.cc
-        utilities/checkpoint/checkpoint.cc
+        utilities/checkpoint/checkpoint_impl.cc
         utilities/col_buf_decoder.cc
         utilities/col_buf_encoder.cc
         utilities/column_aware_encoding_util.cc
diff --git a/TARGETS b/TARGETS
index 728b023ef..715110ba5 100644
--- a/TARGETS
+++ b/TARGETS
@@ -195,7 +195,7 @@ cpp_library(
         "util/xxhash.cc",
         "utilities/backupable/backupable_db.cc",
         "utilities/blob_db/blob_db.cc",
-        "utilities/checkpoint/checkpoint.cc",
+        "utilities/checkpoint/checkpoint_impl.cc",
         "utilities/compaction_filters/remove_emptyvalue_compactionfilter.cc",
         "utilities/convenience/info_log_finder.cc",
         "utilities/date_tiered/date_tiered_db_impl.cc",
diff --git a/include/rocksdb/utilities/backupable_db.h b/include/rocksdb/utilities/backupable_db.h
index d80601c03..071f3e480 100644
--- a/include/rocksdb/utilities/backupable_db.h
+++ b/include/rocksdb/utilities/backupable_db.h
@@ -256,12 +256,14 @@ class BackupEngine {
                       BackupEngine** backup_engine_ptr);
 
   // same as CreateNewBackup, but stores extra application metadata
+  // Flush will always trigger if 2PC is enabled.
   virtual Status CreateNewBackupWithMetadata(
       DB* db, const std::string& app_metadata, bool flush_before_backup = false,
       std::function<void()> progress_callback = []() {}) = 0;
 
   // Captures the state of the database in the latest backup
   // NOT a thread safe call
+  // Flush will always trigger if 2PC is enabled.
   virtual Status CreateNewBackup(DB* db, bool flush_before_backup = false,
                                  std::function<void()> progress_callback =
                                      []() {}) {
diff --git a/src.mk b/src.mk
index 6f4303cc2..39f4218d0 100644
--- a/src.mk
+++ b/src.mk
@@ -150,7 +150,7 @@ LIB_SOURCES = \
   util/xxhash.cc \
   utilities/backupable/backupable_db.cc \
   utilities/blob_db/blob_db.cc \
-  utilities/checkpoint/checkpoint.cc \
+  utilities/checkpoint/checkpoint_impl.cc \
   utilities/compaction_filters/remove_emptyvalue_compactionfilter.cc \
   utilities/convenience/info_log_finder.cc \
   utilities/date_tiered/date_tiered_db_impl.cc \
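Note on the backupable_db.h change above: flush_before_backup stays a hint only -- once the DB runs with 2PC enabled, a flush always happens. A minimal, hypothetical usage sketch (not part of the patch; the paths and metadata string are invented, the calls are the existing BackupEngine API):

#include <cassert>

#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/utilities/backupable_db.h"

// Open a DB and take one backup with attached metadata. With allow_2pc=true
// on the DB, a flush is triggered even though flush_before_backup is false.
int main() {
  rocksdb::DB* db = nullptr;
  rocksdb::Options options;
  options.create_if_missing = true;
  rocksdb::Status s =
      rocksdb::DB::Open(options, "/tmp/db_example", &db);  // hypothetical path
  assert(s.ok());

  rocksdb::BackupEngine* backup_engine = nullptr;
  s = rocksdb::BackupEngine::Open(
      rocksdb::Env::Default(),
      rocksdb::BackupableDBOptions("/tmp/db_example_backup"),  // hypothetical path
      &backup_engine);
  assert(s.ok());

  // app_metadata is free-form; flush_before_backup=false copies WAL files
  // instead of flushing (unless 2PC forces the flush, per the comment above).
  s = backup_engine->CreateNewBackupWithMetadata(db, "build-1234",
                                                 false /* flush_before_backup */);
  assert(s.ok());

  delete backup_engine;
  delete db;
  return 0;
}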
TEST_SYNC_POINT("BackupEngineImpl::CreateNewBackup:SavedLiveFiles2"); BackupID new_backup_id = latest_backup_id_ + 1; @@ -735,7 +714,6 @@ Status BackupEngineImpl::CreateNewBackupWithMetadata( assert(ret.second == true); auto& new_backup = ret.first->second; new_backup->RecordTimestamp(); - new_backup->SetSequenceNumber(sequence_number); new_backup->SetAppMetadata(app_metadata); auto start_backup = backup_env_-> NowMicros(); @@ -745,7 +723,7 @@ Status BackupEngineImpl::CreateNewBackupWithMetadata( new_backup_id); auto private_tmp_dir = GetAbsolutePath(GetPrivateFileRel(new_backup_id, true)); - s = backup_env_->FileExists(private_tmp_dir); + Status s = backup_env_->FileExists(private_tmp_dir); if (s.ok()) { // maybe last backup failed and left partial state behind, clean it up s = GarbageCollect(); @@ -767,81 +745,53 @@ Status BackupEngineImpl::CreateNewBackupWithMetadata( // This is used to check whether a live files shares a dst_path with another // live file. std::unordered_set live_dst_paths; - live_dst_paths.reserve(live_files.size() + live_wal_files.size()); - - // Pre-fetch sizes for data files - std::unordered_map data_path_to_size; - if (s.ok()) { - s = InsertPathnameToSizeBytes(db->GetName(), db_env_, &data_path_to_size); - } std::vector backup_items_to_finish; // Add a CopyOrCreateWorkItem to the channel for each live file - std::string manifest_fname, current_fname; - for (size_t i = 0; s.ok() && i < live_files.size(); ++i) { - uint64_t number; - FileType type; - bool ok = ParseFileName(live_files[i], &number, &type); - if (!ok) { - assert(false); - return Status::Corruption("Can't parse file name. This is very bad"); - } - // we should only get sst, manifest and current files here - assert(type == kTableFile || type == kDescriptorFile || - type == kCurrentFile || type == kOptionsFile); - - if (type == kCurrentFile) { - // We will craft the current file manually to ensure it's consistent with - // the manifest number. This is necessary because current's file contents - // can change during backup. - current_fname = live_files[i]; - continue; - } else if (type == kDescriptorFile) { - manifest_fname = live_files[i]; - } - - auto data_path_to_size_iter = - data_path_to_size.find(db->GetName() + live_files[i]); - uint64_t size_bytes = data_path_to_size_iter == data_path_to_size.end() - ? port::kMaxUint64 - : data_path_to_size_iter->second; - - // rules: - // * if it's kTableFile, then it's shared - // * if it's kDescriptorFile, limit the size to manifest_file_size - s = AddBackupFileWorkItem( - live_dst_paths, backup_items_to_finish, new_backup_id, - options_.share_table_files && type == kTableFile, db->GetName(), - live_files[i], rate_limiter, size_bytes, - (type == kDescriptorFile) ? manifest_file_size : 0, - options_.share_files_with_checksum && type == kTableFile, - progress_callback); - } - if (s.ok() && !current_fname.empty() && !manifest_fname.empty()) { - // Write the current file with the manifest filename as its contents. 
-    s = AddBackupFileWorkItem(
-        live_dst_paths, backup_items_to_finish, new_backup_id,
-        false /* shared */, "" /* src_dir */, CurrentFileName(""), rate_limiter,
-        manifest_fname.size(), 0 /* size_limit */, false /* shared_checksum */,
-        progress_callback, manifest_fname.substr(1) + "\n");
-  }
-  ROCKS_LOG_INFO(options_.info_log,
-                 "begin add wal files for backup -- %" ROCKSDB_PRIszt,
-                 live_wal_files.size());
-  // Add a CopyOrCreateWorkItem to the channel for each WAL file
-  for (size_t i = 0; s.ok() && i < live_wal_files.size(); ++i) {
-    uint64_t size_bytes = live_wal_files[i]->SizeFileBytes();
-    if (live_wal_files[i]->Type() == kAliveLogFile) {
-      ROCKS_LOG_INFO(options_.info_log,
-                     "add wal file for backup %s -- %" PRIu64,
-                     live_wal_files[i]->PathName().c_str(), size_bytes);
-      // we only care about live log files
-      // copy the file into backup_dir/files/<new backup>/
-      s = AddBackupFileWorkItem(live_dst_paths, backup_items_to_finish,
-                                new_backup_id, false, /* not shared */
-                                db->GetOptions().wal_dir,
-                                live_wal_files[i]->PathName(), rate_limiter,
-                                size_bytes, size_bytes);
+  db->DisableFileDeletions();
+  if (s.ok()) {
+    CheckpointImpl checkpoint(db);
+    uint64_t sequence_number = 0;
+    s = checkpoint.CreateCustomCheckpoint(
+        db->GetDBOptions(),
+        [&](const std::string& src_dirname, const std::string& fname,
+            FileType) {
+          // custom checkpoint will switch to calling copy_file_cb after it sees
+          // NotSupported returned from link_file_cb.
+          return Status::NotSupported();
+        } /* link_file_cb */,
+        [&](const std::string& src_dirname, const std::string& fname,
+            uint64_t size_limit_bytes, FileType type) {
+          if (type == kLogFile && !options_.backup_log_files) {
+            return Status::OK();
+          }
+          Log(options_.info_log, "add file for backup %s", fname.c_str());
+          uint64_t size_bytes = 0;
+          Status st;
+          if (type == kTableFile) {
+            st = db_env_->GetFileSize(src_dirname + fname, &size_bytes);
+          }
+          if (st.ok()) {
+            st = AddBackupFileWorkItem(
+                live_dst_paths, backup_items_to_finish, new_backup_id,
+                options_.share_table_files && type == kTableFile, src_dirname,
+                fname, rate_limiter, size_bytes, size_limit_bytes,
+                options_.share_files_with_checksum && type == kTableFile,
+                progress_callback);
+          }
+          return st;
+        } /* copy_file_cb */,
+        [&](const std::string& fname, const std::string& contents, FileType) {
+          Log(options_.info_log, "add file for backup %s", fname.c_str());
+          return AddBackupFileWorkItem(
+              live_dst_paths, backup_items_to_finish, new_backup_id,
+              false /* shared */, "" /* src_dir */, fname, rate_limiter,
+              contents.size(), 0 /* size_limit */, false /* shared_checksum */,
+              progress_callback, contents);
+        } /* create_file_cb */,
+        &sequence_number, flush_before_backup ? 0 : port::kMaxUint64);
+    if (s.ok()) {
+      new_backup->SetSequenceNumber(sequence_number);
     }
   }
   ROCKS_LOG_INFO(options_.info_log, "add files for backup done, wait finish.");
diff --git a/utilities/backupable/backupable_db_test.cc b/utilities/backupable/backupable_db_test.cc
index 507b6fbb6..6e6e38d3f 100644
--- a/utilities/backupable/backupable_db_test.cc
+++ b/utilities/backupable/backupable_db_test.cc
@@ -61,6 +61,10 @@ class DummyDB : public StackableDB {
     return options_;
   }
 
+  virtual DBOptions GetDBOptions() const override {
+    return DBOptions(options_);
+  }
+
   virtual Status EnableFileDeletions(bool force) override {
     EXPECT_TRUE(!deletions_enabled_);
     deletions_enabled_ = true;
@@ -106,9 +110,9 @@ class DummyDB : public StackableDB {
   }
 
   virtual SequenceNumber StartSequence() const override {
-    // backupabledb should not need this method
-    EXPECT_TRUE(false);
-    return 0;
+    // this seqnum guarantees the dummy file will be included in the backup
+    // as long as it is alive.
+    return kMaxSequenceNumber;
   }
 
   virtual uint64_t SizeFileBytes() const override {
@@ -204,6 +208,14 @@ class TestEnv : public EnvWrapper {
     return EnvWrapper::DeleteFile(fname);
   }
 
+  virtual Status DeleteDir(const std::string& dirname) override {
+    MutexLock l(&mutex_);
+    if (fail_delete_files_) {
+      return Status::IOError();
+    }
+    return EnvWrapper::DeleteDir(dirname);
+  }
+
   void AssertWrittenFiles(std::vector<std::string>& should_have_written) {
     MutexLock l(&mutex_);
     std::sort(should_have_written.begin(), should_have_written.end());
@@ -266,6 +278,19 @@ class TestEnv : public EnvWrapper {
     }
     return EnvWrapper::GetChildrenFileAttributes(dir, r);
   }
+  Status GetFileSize(const std::string& path, uint64_t* size_bytes) override {
+    if (filenames_for_mocked_attrs_.size() > 0) {
+      auto fname = path.substr(path.find_last_of('/'));
+      auto filename_iter = std::find(filenames_for_mocked_attrs_.begin(),
+                                     filenames_for_mocked_attrs_.end(), fname);
+      if (filename_iter != filenames_for_mocked_attrs_.end()) {
+        *size_bytes = 10;
+        return Status::OK();
+      }
+      return Status::NotFound(fname);
+    }
+    return EnvWrapper::GetFileSize(path, size_bytes);
+  }
 
   void SetCreateDirIfMissingFailure(bool fail) {
     create_dir_if_missing_failure_ = fail;
@@ -1374,10 +1399,10 @@ TEST_F(BackupableDBTest, ChangeManifestDuringBackupCreation) {
   FillDB(db_.get(), 0, 100);
 
   rocksdb::SyncPoint::GetInstance()->LoadDependency({
-      {"BackupEngineImpl::CreateNewBackup:SavedLiveFiles1",
+      {"CheckpointImpl::CreateCheckpoint:SavedLiveFiles1",
       "VersionSet::LogAndApply:WriteManifest"},
      {"VersionSet::LogAndApply:WriteManifestDone",
-      "BackupEngineImpl::CreateNewBackup:SavedLiveFiles2"},
+      "CheckpointImpl::CreateCheckpoint:SavedLiveFiles2"},
  });
   rocksdb::SyncPoint::GetInstance()->EnableProcessing();
diff --git a/utilities/checkpoint/checkpoint.cc b/utilities/checkpoint/checkpoint_impl.cc
similarity index 65%
rename from utilities/checkpoint/checkpoint.cc
rename to utilities/checkpoint/checkpoint_impl.cc
index fc9e16f5c..3242946af 100644
--- a/utilities/checkpoint/checkpoint.cc
+++ b/utilities/checkpoint/checkpoint_impl.cc
@@ -9,7 +9,7 @@
 
 #ifndef ROCKSDB_LITE
 
-#include "rocksdb/utilities/checkpoint.h"
+#include "utilities/checkpoint/checkpoint_impl.h"
 
 #ifndef __STDC_FORMAT_MACROS
 #define __STDC_FORMAT_MACROS
@@ -18,37 +18,20 @@
 #include <inttypes.h>
 #include <algorithm>
 #include <string>
+#include <vector>
+
 #include "db/wal_manager.h"
 #include "port/port.h"
 #include "rocksdb/db.h"
 #include "rocksdb/env.h"
 #include "rocksdb/transaction_log.h"
+#include "rocksdb/utilities/checkpoint.h"
 #include "util/file_util.h"
 #include "util/filename.h"
#include "util/sync_point.h" namespace rocksdb { -class CheckpointImpl : public Checkpoint { - public: - // Creates a Checkpoint object to be used for creating openable snapshots - explicit CheckpointImpl(DB* db) : db_(db) {} - - // Builds an openable snapshot of RocksDB on the same disk, which - // accepts an output directory on the same disk, and under the directory - // (1) hard-linked SST files pointing to existing live SST files - // SST files will be copied if output directory is on a different filesystem - // (2) a copied manifest files and other files - // The directory should not already exist and will be created by this API. - // The directory will be an absolute path - using Checkpoint::CreateCheckpoint; - virtual Status CreateCheckpoint(const std::string& checkpoint_dir, - uint64_t log_size_for_flush) override; - - private: - DB* db_; -}; - Status Checkpoint::Create(DB* db, Checkpoint** checkpoint_ptr) { *checkpoint_ptr = new CheckpointImpl(db); return Status::OK(); @@ -62,16 +45,9 @@ Status Checkpoint::CreateCheckpoint(const std::string& checkpoint_dir, // Builds an openable snapshot of RocksDB Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir, uint64_t log_size_for_flush) { - Status s; - std::vector live_files; - uint64_t manifest_file_size = 0; DBOptions db_options = db_->GetDBOptions(); - uint64_t min_log_num = port::kMaxUint64; - uint64_t sequence_number = db_->GetLatestSequenceNumber(); - bool same_fs = true; - VectorLogPtr live_wal_files; - s = db_->GetEnv()->FileExists(checkpoint_dir); + Status s = db_->GetEnv()->FileExists(checkpoint_dir); if (s.ok()) { return Status::InvalidArgument("Directory exists"); } else if (!s.IsNotFound()) { @@ -79,29 +55,124 @@ Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir, return s; } - s = db_->DisableFileDeletions(); + ROCKS_LOG_INFO( + db_options.info_log, + "Started the snapshot process -- creating snapshot in directory %s", + checkpoint_dir.c_str()); + std::string full_private_path = checkpoint_dir + ".tmp"; + // create snapshot directory + s = db_->GetEnv()->CreateDir(full_private_path); + uint64_t sequence_number = 0; + if (s.ok()) { + db_->DisableFileDeletions(); + s = CreateCustomCheckpoint( + db_options, + [&](const std::string& src_dirname, const std::string& fname, + FileType) { + ROCKS_LOG_INFO(db_options.info_log, "Hard Linking %s", fname.c_str()); + return db_->GetEnv()->LinkFile(src_dirname + fname, + full_private_path + fname); + } /* link_file_cb */, + [&](const std::string& src_dirname, const std::string& fname, + uint64_t size_limit_bytes, FileType) { + ROCKS_LOG_INFO(db_options.info_log, "Copying %s", fname.c_str()); + return CopyFile(db_->GetEnv(), src_dirname + fname, + full_private_path + fname, size_limit_bytes, + db_options.use_fsync); + } /* copy_file_cb */, + [&](const std::string& fname, const std::string& contents, FileType) { + ROCKS_LOG_INFO(db_options.info_log, "Creating %s", fname.c_str()); + return CreateFile(db_->GetEnv(), full_private_path + fname, contents); + } /* create_file_cb */, + &sequence_number, log_size_for_flush); + // we copied all the files, enable file deletions + db_->EnableFileDeletions(false); + } + + if (s.ok()) { + // move tmp private backup to real snapshot directory + s = db_->GetEnv()->RenameFile(full_private_path, checkpoint_dir); + } + if (s.ok()) { + unique_ptr checkpoint_directory; + db_->GetEnv()->NewDirectory(checkpoint_dir, &checkpoint_directory); + if (checkpoint_directory != nullptr) { + s = checkpoint_directory->Fsync(); + 
+    }
+  }
+
+  if (s.ok()) {
+    // here we know that we succeeded and installed the new snapshot
+    ROCKS_LOG_INFO(db_options.info_log, "Snapshot DONE. All is good");
+    ROCKS_LOG_INFO(db_options.info_log, "Snapshot sequence number: %" PRIu64,
+                   sequence_number);
+  } else {
+    // clean all the files we might have created
+    ROCKS_LOG_INFO(db_options.info_log, "Snapshot failed -- %s",
+                   s.ToString().c_str());
+    // we have to delete the dir and all its children
+    std::vector<std::string> subchildren;
+    db_->GetEnv()->GetChildren(full_private_path, &subchildren);
+    for (auto& subchild : subchildren) {
+      std::string subchild_path = full_private_path + "/" + subchild;
+      Status s1 = db_->GetEnv()->DeleteFile(subchild_path);
+      ROCKS_LOG_INFO(db_options.info_log, "Delete file %s -- %s",
+                     subchild_path.c_str(), s1.ToString().c_str());
+    }
+    // finally delete the private dir
+    Status s1 = db_->GetEnv()->DeleteDir(full_private_path);
+    ROCKS_LOG_INFO(db_options.info_log, "Delete dir %s -- %s",
+                   full_private_path.c_str(), s1.ToString().c_str());
+  }
+  return s;
+}
+
+Status CheckpointImpl::CreateCustomCheckpoint(
+    const DBOptions& db_options,
+    std::function<Status(const std::string& src_dirname,
+                         const std::string& fname, FileType type)>
+        link_file_cb,
+    std::function<Status(const std::string& src_dirname,
+                         const std::string& fname, uint64_t size_limit_bytes,
+                         FileType type)>
+        copy_file_cb,
+    std::function<Status(const std::string& fname, const std::string& contents,
+                         FileType type)>
+        create_file_cb,
+    uint64_t* sequence_number, uint64_t log_size_for_flush) {
+  Status s;
+  std::vector<std::string> live_files;
+  uint64_t manifest_file_size = 0;
+  uint64_t min_log_num = port::kMaxUint64;
+  *sequence_number = db_->GetLatestSequenceNumber();
+  bool same_fs = true;
+  VectorLogPtr live_wal_files;
+
   bool flush_memtable = true;
   if (s.ok()) {
     if (!db_options.allow_2pc) {
-      // If out standing log files are small, we skip the flush.
-      s = db_->GetSortedWalFiles(live_wal_files);
+      if (log_size_for_flush == port::kMaxUint64) {
+        flush_memtable = false;
+      } else if (log_size_for_flush > 0) {
+        // If outstanding log files are small, we skip the flush.
+        s = db_->GetSortedWalFiles(live_wal_files);
-      if (!s.ok()) {
-        db_->EnableFileDeletions(false);
-        return s;
-      }
+        if (!s.ok()) {
+          return s;
+        }
-      // Don't flush column families if total log size is smaller than
-      // log_size_for_flush. We copy the log files instead.
-      // We may be able to cover 2PC case too.
-      uint64_t total_wal_size = 0;
-      for (auto& wal : live_wal_files) {
-        total_wal_size += wal->SizeFileBytes();
-      }
-      if (total_wal_size < log_size_for_flush) {
-        flush_memtable = false;
+        // Don't flush column families if total log size is smaller than
+        // log_size_for_flush. We copy the log files instead.
+        // We may be able to cover 2PC case too.
+        uint64_t total_wal_size = 0;
+        for (auto& wal : live_wal_files) {
+          total_wal_size += wal->SizeFileBytes();
+        }
+        if (total_wal_size < log_size_for_flush) {
+          flush_memtable = false;
+        }
+        live_wal_files.clear();
       }
-      live_wal_files.clear();
     }
 
     // this will return live_files prefixed with "/"
@@ -112,7 +183,6 @@ Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir,
     // Need to refetch the live files to recapture the snapshot.
     if (!db_->GetIntProperty(DB::Properties::kMinLogNumberToKeep,
                              &min_log_num)) {
-      db_->EnableFileDeletions(false);
       return Status::InvalidArgument(
           "2PC enabled but cannot fine the min log number to keep.");
     }
@@ -127,12 +197,12 @@
     // be skipped. 000003.log contains commit message of tnx1, but we don't
    // have respective prepare record for it.
     // In order to avoid this situation, we need to force flush to make sure
-    // all transactions commited before getting min_log_num will be flushed
+    // all transactions committed before getting min_log_num will be flushed
     // to SST files.
     // We cannot get min_log_num before calling the GetLiveFiles() for the
     // first time, because if we do that, all the logs files will be included,
     // far more than needed.
-    s = db_->GetLiveFiles(live_files, &manifest_file_size, /* flush */ true);
+    s = db_->GetLiveFiles(live_files, &manifest_file_size, flush_memtable);
   }
 
   TEST_SYNC_POINT("CheckpointImpl::CreateCheckpoint:SavedLiveFiles1");
@@ -143,20 +213,10 @@
     s = db_->GetSortedWalFiles(live_wal_files);
   }
   if (!s.ok()) {
-    db_->EnableFileDeletions(false);
     return s;
   }
 
   size_t wal_size = live_wal_files.size();
-  ROCKS_LOG_INFO(
-      db_options.info_log,
-      "Started the snapshot process -- creating snapshot in directory %s",
-      checkpoint_dir.c_str());
-
-  std::string full_private_path = checkpoint_dir + ".tmp";
-
-  // create snapshot directory
-  s = db_->GetEnv()->CreateDir(full_private_path);
 
   // copy/hard link live_files
   std::string manifest_fname, current_fname;
@@ -188,25 +248,21 @@
     // * if it's kDescriptorFile, limit the size to manifest_file_size
     // * always copy if cross-device link
     if ((type == kTableFile) && same_fs) {
-      ROCKS_LOG_INFO(db_options.info_log, "Hard Linking %s", src_fname.c_str());
-      s = db_->GetEnv()->LinkFile(db_->GetName() + src_fname,
-                                  full_private_path + src_fname);
+      s = link_file_cb(db_->GetName(), src_fname, type);
       if (s.IsNotSupported()) {
         same_fs = false;
         s = Status::OK();
       }
     }
     if ((type != kTableFile) || (!same_fs)) {
-      ROCKS_LOG_INFO(db_options.info_log, "Copying %s", src_fname.c_str());
-      s = CopyFile(db_->GetEnv(), db_->GetName() + src_fname,
-                   full_private_path + src_fname,
-                   (type == kDescriptorFile) ? manifest_file_size : 0,
-                   db_options.use_fsync);
+      s = copy_file_cb(db_->GetName(), src_fname,
+                       (type == kDescriptorFile) ? manifest_file_size : 0,
+                       type);
     }
   }
 
   if (s.ok() && !current_fname.empty() && !manifest_fname.empty()) {
-    s = CreateFile(db_->GetEnv(), full_private_path + current_fname,
-                   manifest_fname.substr(1) + "\n");
+    create_file_cb(current_fname, manifest_fname.substr(1) + "\n",
+                   kCurrentFile);
   }
   ROCKS_LOG_INFO(db_options.info_log, "Number of log files %" ROCKSDB_PRIszt,
                  live_wal_files.size());
@@ -216,82 +272,32 @@
   for (size_t i = 0; s.ok() && i < wal_size; ++i) {
     if ((live_wal_files[i]->Type() == kAliveLogFile) &&
         (!flush_memtable ||
-         live_wal_files[i]->StartSequence() >= sequence_number ||
+         live_wal_files[i]->StartSequence() >= *sequence_number ||
          live_wal_files[i]->LogNumber() >= min_log_num)) {
       if (i + 1 == wal_size) {
-        ROCKS_LOG_INFO(db_options.info_log, "Copying %s",
-                       live_wal_files[i]->PathName().c_str());
-        s = CopyFile(db_->GetEnv(),
-                     db_options.wal_dir + live_wal_files[i]->PathName(),
-                     full_private_path + live_wal_files[i]->PathName(),
-                     live_wal_files[i]->SizeFileBytes(), db_options.use_fsync);
+        s = copy_file_cb(db_options.wal_dir, live_wal_files[i]->PathName(),
+                         live_wal_files[i]->SizeFileBytes(), kLogFile);
         break;
       }
       if (same_fs) {
         // we only care about live log files
-        ROCKS_LOG_INFO(db_options.info_log, "Hard Linking %s",
-                       live_wal_files[i]->PathName().c_str());
-        s = db_->GetEnv()->LinkFile(
-            db_options.wal_dir + live_wal_files[i]->PathName(),
-            full_private_path + live_wal_files[i]->PathName());
+        s = link_file_cb(db_options.wal_dir, live_wal_files[i]->PathName(),
+                         kLogFile);
         if (s.IsNotSupported()) {
           same_fs = false;
          s = Status::OK();
        }
       }
       if (!same_fs) {
-        ROCKS_LOG_INFO(db_options.info_log, "Copying %s",
-                       live_wal_files[i]->PathName().c_str());
-        s = CopyFile(db_->GetEnv(),
-                     db_options.wal_dir + live_wal_files[i]->PathName(),
-                     full_private_path + live_wal_files[i]->PathName(), 0,
-                     db_options.use_fsync);
+        s = copy_file_cb(db_options.wal_dir, live_wal_files[i]->PathName(), 0,
+                         kLogFile);
       }
     }
   }
-  // we copied all the files, enable file deletions
-  db_->EnableFileDeletions(false);
-
-  if (s.ok()) {
-    // move tmp private backup to real snapshot directory
-    s = db_->GetEnv()->RenameFile(full_private_path, checkpoint_dir);
-  }
-  if (s.ok()) {
-    unique_ptr<Directory> checkpoint_directory;
-    db_->GetEnv()->NewDirectory(checkpoint_dir, &checkpoint_directory);
-    if (checkpoint_directory != nullptr) {
-      s = checkpoint_directory->Fsync();
-    }
-  }
-
-  if (!s.ok()) {
-    // clean all the files we might have created
-    ROCKS_LOG_INFO(db_options.info_log, "Snapshot failed -- %s",
-                   s.ToString().c_str());
-    // we have to delete the dir and all its children
-    std::vector<std::string> subchildren;
-    db_->GetEnv()->GetChildren(full_private_path, &subchildren);
-    for (auto& subchild : subchildren) {
-      std::string subchild_path = full_private_path + "/" + subchild;
-      Status s1 = db_->GetEnv()->DeleteFile(subchild_path);
-      ROCKS_LOG_INFO(db_options.info_log, "Delete file %s -- %s",
-                     subchild_path.c_str(), s1.ToString().c_str());
-    }
-    // finally delete the private dir
-    Status s1 = db_->GetEnv()->DeleteDir(full_private_path);
-    ROCKS_LOG_INFO(db_options.info_log, "Delete dir %s -- %s",
-                   full_private_path.c_str(), s1.ToString().c_str());
-    return s;
-  }
-
-  // here we know that we succeeded and installed the new snapshot
-  ROCKS_LOG_INFO(db_options.info_log, "Snapshot DONE. All is good");
All is good"); - ROCKS_LOG_INFO(db_options.info_log, "Snapshot sequence number: %" PRIu64, - sequence_number); - return s; } + } // namespace rocksdb #endif // ROCKSDB_LITE diff --git a/utilities/checkpoint/checkpoint_impl.h b/utilities/checkpoint/checkpoint_impl.h new file mode 100644 index 000000000..f364b9ee6 --- /dev/null +++ b/utilities/checkpoint/checkpoint_impl.h @@ -0,0 +1,55 @@ +// Copyright (c) 2017-present, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#pragma once +#ifndef ROCKSDB_LITE + +#include "rocksdb/utilities/checkpoint.h" + +#include +#include "rocksdb/db.h" +#include "util/filename.h" + +namespace rocksdb { + +class CheckpointImpl : public Checkpoint { + public: + // Creates a Checkpoint object to be used for creating openable snapshots + explicit CheckpointImpl(DB* db) : db_(db) {} + + // Builds an openable snapshot of RocksDB on the same disk, which + // accepts an output directory on the same disk, and under the directory + // (1) hard-linked SST files pointing to existing live SST files + // SST files will be copied if output directory is on a different filesystem + // (2) a copied manifest files and other files + // The directory should not already exist and will be created by this API. + // The directory will be an absolute path + using Checkpoint::CreateCheckpoint; + virtual Status CreateCheckpoint(const std::string& checkpoint_dir, + uint64_t log_size_for_flush) override; + + // Checkpoint logic can be customized by providing callbacks for link, copy, + // or create. + Status CreateCustomCheckpoint( + const DBOptions& db_options, + std::function + link_file_cb, + std::function + copy_file_cb, + std::function + create_file_cb, + uint64_t* sequence_number, uint64_t log_size_for_flush); + + private: + DB* db_; +}; + +} // namespace rocksdb + +#endif // ROCKSDB_LITE