diff --git a/util/file_util.cc b/util/file_util.cc index 0748a4cf9..c14309da2 100644 --- a/util/file_util.cc +++ b/util/file_util.cc @@ -66,6 +66,22 @@ Status CopyFile(Env* env, const std::string& source, return Status::OK(); } +// Utility function to create a file with the provided contents +Status CreateFile(Env* env, const std::string& destination, + const std::string& contents) { + const EnvOptions soptions; + Status s; + unique_ptr dest_writer; + + unique_ptr destfile; + s = env->NewWritableFile(destination, &destfile, soptions); + if (!s.ok()) { + return s; + } + dest_writer.reset(new WritableFileWriter(std::move(destfile), soptions)); + return dest_writer->Append(Slice(contents)); +} + Status DeleteSSTFile(const DBOptions* db_options, const std::string& fname, uint32_t path_id) { // TODO(tec): support sst_file_manager for multiple path_ids diff --git a/util/file_util.h b/util/file_util.h index b5cb0cf66..5b2320e33 100644 --- a/util/file_util.h +++ b/util/file_util.h @@ -16,6 +16,9 @@ namespace rocksdb { extern Status CopyFile(Env* env, const std::string& source, const std::string& destination, uint64_t size = 0); +extern Status CreateFile(Env* env, const std::string& destination, + const std::string& contents); + extern Status DeleteSSTFile(const DBOptions* db_options, const std::string& fname, uint32_t path_id); diff --git a/utilities/checkpoint/checkpoint.cc b/utilities/checkpoint/checkpoint.cc index dd41d9cdf..b8543bb5b 100644 --- a/utilities/checkpoint/checkpoint.cc +++ b/utilities/checkpoint/checkpoint.cc @@ -24,6 +24,7 @@ #include "rocksdb/env.h" #include "rocksdb/transaction_log.h" #include "util/file_util.h" +#include "util/sync_point.h" #include "port/port.h" namespace rocksdb { @@ -76,7 +77,9 @@ Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir) { s = db_->DisableFileDeletions(); if (s.ok()) { // this will return live_files prefixed with "/" - s = db_->GetLiveFiles(live_files, &manifest_file_size, true); + s = db_->GetLiveFiles(live_files, &manifest_file_size); + TEST_SYNC_POINT("CheckpointImpl::CreateCheckpoint:SavedLiveFiles1"); + TEST_SYNC_POINT("CheckpointImpl::CreateCheckpoint:SavedLiveFiles2"); } // if we have more than one column family, we need to also get WAL files if (s.ok()) { @@ -98,6 +101,7 @@ Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir) { s = db_->GetEnv()->CreateDir(full_private_path); // copy/hard link live_files + std::string manifest_fname, current_fname; for (size_t i = 0; s.ok() && i < live_files.size(); ++i) { uint64_t number; FileType type; @@ -110,6 +114,15 @@ Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir) { assert(type == kTableFile || type == kDescriptorFile || type == kCurrentFile); assert(live_files[i].size() > 0 && live_files[i][0] == '/'); + if (type == kCurrentFile) { + // We will craft the current file manually to ensure it's consistent with + // the manifest number. This is necessary because current's file contents + // can change during checkpoint creation. + current_fname = live_files[i]; + continue; + } else if (type == kDescriptorFile) { + manifest_fname = live_files[i]; + } std::string src_fname = live_files[i]; // rules: @@ -132,6 +145,10 @@ Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir) { (type == kDescriptorFile) ? manifest_file_size : 0); } } + if (s.ok() && !current_fname.empty() && !manifest_fname.empty()) { + s = CreateFile(db_->GetEnv(), full_private_path + current_fname, + manifest_fname.substr(1) + "\n"); + } Log(db_->GetOptions().info_log, "Number of log files %" ROCKSDB_PRIszt, live_wal_files.size()); diff --git a/utilities/checkpoint/checkpoint_test.cc b/utilities/checkpoint/checkpoint_test.cc index 42d180bba..3336e5af5 100644 --- a/utilities/checkpoint/checkpoint_test.cc +++ b/utilities/checkpoint/checkpoint_test.cc @@ -346,6 +346,50 @@ TEST_F(DBTest, CheckpointCF) { ASSERT_OK(DestroyDB(snapshot_name, options)); } +TEST_F(DBTest, CurrentFileModifiedWhileCheckpointing) { + const std::string kSnapshotName = test::TmpDir(env_) + "/snapshot"; + ASSERT_OK(DestroyDB(kSnapshotName, CurrentOptions())); + env_->DeleteDir(kSnapshotName); + + Options options = CurrentOptions(); + options.max_manifest_file_size = 0; // always rollover manifest for file add + Reopen(options); + + rocksdb::SyncPoint::GetInstance()->LoadDependency( + {// Get past the flush in the checkpoint thread before adding any keys to + // the db so the checkpoint thread won't hit the WriteManifest + // syncpoints. + {"DBImpl::GetLiveFiles:1", + "DBTest::CurrentFileModifiedWhileCheckpointing:PrePut"}, + // Roll the manifest during checkpointing right after live files are + // snapshotted. + {"CheckpointImpl::CreateCheckpoint:SavedLiveFiles1", + "VersionSet::LogAndApply:WriteManifest"}, + {"VersionSet::LogAndApply:WriteManifestDone", + "CheckpointImpl::CreateCheckpoint:SavedLiveFiles2"}}); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + + std::thread t([&]() { + Checkpoint* checkpoint; + ASSERT_OK(Checkpoint::Create(db_, &checkpoint)); + ASSERT_OK(checkpoint->CreateCheckpoint(kSnapshotName)); + delete checkpoint; + }); + TEST_SYNC_POINT("DBTest::CurrentFileModifiedWhileCheckpointing:PrePut"); + ASSERT_OK(Put("Default", "Default1")); + ASSERT_OK(Flush()); + t.join(); + + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); + + DB* snapshotDB; + // Successful Open() implies that CURRENT pointed to the manifest in the + // checkpoint. + ASSERT_OK(DB::Open(options, kSnapshotName, &snapshotDB)); + delete snapshotDB; + snapshotDB = nullptr; +} + } // namespace rocksdb int main(int argc, char** argv) {