diff --git a/include/rocksdb/utilities/checkpoint.h b/include/rocksdb/utilities/checkpoint.h index b4523c25e..342625618 100644 --- a/include/rocksdb/utilities/checkpoint.h +++ b/include/rocksdb/utilities/checkpoint.h @@ -27,7 +27,14 @@ class Checkpoint { // (2) a copied manifest files and other files // The directory should not already exist and will be created by this API. // The directory will be an absolute path - virtual Status CreateCheckpoint(const std::string& checkpoint_dir); + // log_size_for_flush: if the total log file size is equal or larger than + // this value, then a flush is triggered for all the column families. The + // default value is 0, which means flush is always triggered. If you move + // away from the default, the checkpoint may not contain up-to-date data + // if WAL writing is not always enabled. + // Flush will always trigger if it is 2PC. + virtual Status CreateCheckpoint(const std::string& checkpoint_dir, + uint64_t log_size_for_flush = 0); virtual ~Checkpoint() {} }; diff --git a/utilities/checkpoint/checkpoint.cc b/utilities/checkpoint/checkpoint.cc index cc6e97184..840762435 100644 --- a/utilities/checkpoint/checkpoint.cc +++ b/utilities/checkpoint/checkpoint.cc @@ -42,7 +42,8 @@ class CheckpointImpl : public Checkpoint { // The directory should not already exist and will be created by this API. // The directory will be an absolute path using Checkpoint::CreateCheckpoint; - virtual Status CreateCheckpoint(const std::string& checkpoint_dir) override; + virtual Status CreateCheckpoint(const std::string& checkpoint_dir, + uint64_t log_size_for_flush) override; private: DB* db_; @@ -53,12 +54,14 @@ Status Checkpoint::Create(DB* db, Checkpoint** checkpoint_ptr) { return Status::OK(); } -Status Checkpoint::CreateCheckpoint(const std::string& checkpoint_dir) { +Status Checkpoint::CreateCheckpoint(const std::string& checkpoint_dir, + uint64_t log_size_for_flush) { return Status::NotSupported(""); } // Builds an openable snapshot of RocksDB -Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir) { +Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir, + uint64_t log_size_for_flush) { Status s; std::vector live_files; uint64_t manifest_file_size = 0; @@ -77,9 +80,32 @@ Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir) { } s = db_->DisableFileDeletions(); + bool flush_memtable = true; if (s.ok()) { + if (!db_options.allow_2pc) { + // If out standing log files are small, we skip the flush. + s = db_->GetSortedWalFiles(live_wal_files); + + if (!s.ok()) { + db_->EnableFileDeletions(false); + return s; + } + + // Don't flush column families if total log size is smaller than + // log_size_for_flush. We copy the log files instead. + // We may be able to cover 2PC case too. + uint64_t total_wal_size = 0; + for (auto& wal : live_wal_files) { + total_wal_size += wal->SizeFileBytes(); + } + if (total_wal_size < log_size_for_flush) { + flush_memtable = false; + } + live_wal_files.clear(); + } + // this will return live_files prefixed with "/" - s = db_->GetLiveFiles(live_files, &manifest_file_size); + s = db_->GetLiveFiles(live_files, &manifest_file_size, flush_memtable); if (s.ok() && db_options.allow_2pc) { // If 2PC is enabled, we need to get minimum log number after the flush. @@ -189,7 +215,8 @@ Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir) { // that has changes after the last flush. for (size_t i = 0; s.ok() && i < wal_size; ++i) { if ((live_wal_files[i]->Type() == kAliveLogFile) && - (live_wal_files[i]->StartSequence() >= sequence_number || + (!flush_memtable || + live_wal_files[i]->StartSequence() >= sequence_number || live_wal_files[i]->LogNumber() >= min_log_num)) { if (i + 1 == wal_size) { ROCKS_LOG_INFO(db_options.info_log, "Copying %s", diff --git a/utilities/checkpoint/checkpoint_test.cc b/utilities/checkpoint/checkpoint_test.cc index be9fe240f..46988898b 100644 --- a/utilities/checkpoint/checkpoint_test.cc +++ b/utilities/checkpoint/checkpoint_test.cc @@ -217,58 +217,60 @@ class CheckpointTest : public testing::Test { }; TEST_F(CheckpointTest, GetSnapshotLink) { - Options options; - const std::string snapshot_name = test::TmpDir(env_) + "/snapshot"; - DB* snapshotDB; - ReadOptions roptions; - std::string result; - Checkpoint* checkpoint; + for (uint64_t log_size_for_fush : {0, 1000000}) { + Options options; + const std::string snapshot_name = test::TmpDir(env_) + "/snapshot"; + DB* snapshotDB; + ReadOptions roptions; + std::string result; + Checkpoint* checkpoint; - options = CurrentOptions(); - delete db_; - db_ = nullptr; - ASSERT_OK(DestroyDB(dbname_, options)); - ASSERT_OK(DestroyDB(snapshot_name, options)); - env_->DeleteDir(snapshot_name); + options = CurrentOptions(); + delete db_; + db_ = nullptr; + ASSERT_OK(DestroyDB(dbname_, options)); + ASSERT_OK(DestroyDB(snapshot_name, options)); + env_->DeleteDir(snapshot_name); - // Create a database - Status s; - options.create_if_missing = true; - ASSERT_OK(DB::Open(options, dbname_, &db_)); - std::string key = std::string("foo"); - ASSERT_OK(Put(key, "v1")); - // Take a snapshot - ASSERT_OK(Checkpoint::Create(db_, &checkpoint)); - ASSERT_OK(checkpoint->CreateCheckpoint(snapshot_name)); - ASSERT_OK(Put(key, "v2")); - ASSERT_EQ("v2", Get(key)); - ASSERT_OK(Flush()); - ASSERT_EQ("v2", Get(key)); - // Open snapshot and verify contents while DB is running - options.create_if_missing = false; - ASSERT_OK(DB::Open(options, snapshot_name, &snapshotDB)); - ASSERT_OK(snapshotDB->Get(roptions, key, &result)); - ASSERT_EQ("v1", result); - delete snapshotDB; - snapshotDB = nullptr; - delete db_; - db_ = nullptr; + // Create a database + Status s; + options.create_if_missing = true; + ASSERT_OK(DB::Open(options, dbname_, &db_)); + std::string key = std::string("foo"); + ASSERT_OK(Put(key, "v1")); + // Take a snapshot + ASSERT_OK(Checkpoint::Create(db_, &checkpoint)); + ASSERT_OK(checkpoint->CreateCheckpoint(snapshot_name, log_size_for_fush)); + ASSERT_OK(Put(key, "v2")); + ASSERT_EQ("v2", Get(key)); + ASSERT_OK(Flush()); + ASSERT_EQ("v2", Get(key)); + // Open snapshot and verify contents while DB is running + options.create_if_missing = false; + ASSERT_OK(DB::Open(options, snapshot_name, &snapshotDB)); + ASSERT_OK(snapshotDB->Get(roptions, key, &result)); + ASSERT_EQ("v1", result); + delete snapshotDB; + snapshotDB = nullptr; + delete db_; + db_ = nullptr; - // Destroy original DB - ASSERT_OK(DestroyDB(dbname_, options)); + // Destroy original DB + ASSERT_OK(DestroyDB(dbname_, options)); - // Open snapshot and verify contents - options.create_if_missing = false; - dbname_ = snapshot_name; - ASSERT_OK(DB::Open(options, dbname_, &db_)); - ASSERT_EQ("v1", Get(key)); - delete db_; - db_ = nullptr; - ASSERT_OK(DestroyDB(dbname_, options)); - delete checkpoint; + // Open snapshot and verify contents + options.create_if_missing = false; + dbname_ = snapshot_name; + ASSERT_OK(DB::Open(options, dbname_, &db_)); + ASSERT_EQ("v1", Get(key)); + delete db_; + db_ = nullptr; + ASSERT_OK(DestroyDB(dbname_, options)); + delete checkpoint; - // Restore DB name - dbname_ = test::TmpDir(env_) + "/db_test"; + // Restore DB name + dbname_ = test::TmpDir(env_) + "/db_test"; + } } TEST_F(CheckpointTest, CheckpointCF) { @@ -345,6 +347,71 @@ TEST_F(CheckpointTest, CheckpointCF) { ASSERT_OK(DestroyDB(snapshot_name, options)); } +TEST_F(CheckpointTest, CheckpointCFNoFlush) { + Options options = CurrentOptions(); + CreateAndReopenWithCF({"one", "two", "three", "four", "five"}, options); + + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + + ASSERT_OK(Put(0, "Default", "Default")); + ASSERT_OK(Put(1, "one", "one")); + Flush(); + ASSERT_OK(Put(2, "two", "two")); + + const std::string snapshot_name = test::TmpDir(env_) + "/snapshot"; + DB* snapshotDB; + ReadOptions roptions; + std::string result; + std::vector cphandles; + + ASSERT_OK(DestroyDB(snapshot_name, options)); + env_->DeleteDir(snapshot_name); + + Status s; + // Take a snapshot + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::BackgroundCallFlush:start", [&](void* arg) { + // Flush should never trigger. + ASSERT_TRUE(false); + }); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + Checkpoint* checkpoint; + ASSERT_OK(Checkpoint::Create(db_, &checkpoint)); + ASSERT_OK(checkpoint->CreateCheckpoint(snapshot_name, 1000000)); + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); + + delete checkpoint; + ASSERT_OK(Put(1, "one", "two")); + ASSERT_OK(Flush(1)); + ASSERT_OK(Put(2, "two", "twentytwo")); + Close(); + EXPECT_OK(DestroyDB(dbname_, options)); + + // Open snapshot and verify contents while DB is running + options.create_if_missing = false; + std::vector cfs; + cfs = {kDefaultColumnFamilyName, "one", "two", "three", "four", "five"}; + std::vector column_families; + for (size_t i = 0; i < cfs.size(); ++i) { + column_families.push_back(ColumnFamilyDescriptor(cfs[i], options)); + } + ASSERT_OK(DB::Open(options, snapshot_name, column_families, &cphandles, + &snapshotDB)); + ASSERT_OK(snapshotDB->Get(roptions, cphandles[0], "Default", &result)); + ASSERT_EQ("Default", result); + ASSERT_OK(snapshotDB->Get(roptions, cphandles[1], "one", &result)); + ASSERT_EQ("one", result); + ASSERT_OK(snapshotDB->Get(roptions, cphandles[2], "two", &result)); + ASSERT_EQ("two", result); + for (auto h : cphandles) { + delete h; + } + cphandles.clear(); + delete snapshotDB; + snapshotDB = nullptr; + ASSERT_OK(DestroyDB(snapshot_name, options)); +} + TEST_F(CheckpointTest, CurrentFileModifiedWhileCheckpointing) { const std::string kSnapshotName = test::TmpDir(env_) + "/snapshot"; ASSERT_OK(DestroyDB(kSnapshotName, CurrentOptions()));