Allow checkpointing without flushing

Summary:
Add a parameter to Checkpoint::CreateCheckpoint() so that flush can be skipped if total log file size is within a threshold.
Closes https://github.com/facebook/rocksdb/pull/1993

Differential Revision: D4719842

Pulled By: siying

fbshipit-source-id: 4f9d9e1
main
Siying Dong 8 years ago committed by Facebook Github Bot
parent 17866ecc3a
commit 9ef3627fd3
  1. 9
      include/rocksdb/utilities/checkpoint.h
  2. 37
      utilities/checkpoint/checkpoint.cc
  3. 69
      utilities/checkpoint/checkpoint_test.cc

@ -27,7 +27,14 @@ class Checkpoint {
// (2) a copied manifest files and other files // (2) a copied manifest files and other files
// The directory should not already exist and will be created by this API. // The directory should not already exist and will be created by this API.
// The directory will be an absolute path // The directory will be an absolute path
virtual Status CreateCheckpoint(const std::string& checkpoint_dir); // log_size_for_flush: if the total log file size is equal or larger than
// this value, then a flush is triggered for all the column families. The
// default value is 0, which means flush is always triggered. If you move
// away from the default, the checkpoint may not contain up-to-date data
// if WAL writing is not always enabled.
// Flush will always trigger if it is 2PC.
virtual Status CreateCheckpoint(const std::string& checkpoint_dir,
uint64_t log_size_for_flush = 0);
virtual ~Checkpoint() {} virtual ~Checkpoint() {}
}; };

@ -42,7 +42,8 @@ class CheckpointImpl : public Checkpoint {
// The directory should not already exist and will be created by this API. // The directory should not already exist and will be created by this API.
// The directory will be an absolute path // The directory will be an absolute path
using Checkpoint::CreateCheckpoint; using Checkpoint::CreateCheckpoint;
virtual Status CreateCheckpoint(const std::string& checkpoint_dir) override; virtual Status CreateCheckpoint(const std::string& checkpoint_dir,
uint64_t log_size_for_flush) override;
private: private:
DB* db_; DB* db_;
@ -53,12 +54,14 @@ Status Checkpoint::Create(DB* db, Checkpoint** checkpoint_ptr) {
return Status::OK(); return Status::OK();
} }
Status Checkpoint::CreateCheckpoint(const std::string& checkpoint_dir) { Status Checkpoint::CreateCheckpoint(const std::string& checkpoint_dir,
uint64_t log_size_for_flush) {
return Status::NotSupported(""); return Status::NotSupported("");
} }
// Builds an openable snapshot of RocksDB // Builds an openable snapshot of RocksDB
Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir) { Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir,
uint64_t log_size_for_flush) {
Status s; Status s;
std::vector<std::string> live_files; std::vector<std::string> live_files;
uint64_t manifest_file_size = 0; uint64_t manifest_file_size = 0;
@ -77,9 +80,32 @@ Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir) {
} }
s = db_->DisableFileDeletions(); s = db_->DisableFileDeletions();
bool flush_memtable = true;
if (s.ok()) { if (s.ok()) {
if (!db_options.allow_2pc) {
// If out standing log files are small, we skip the flush.
s = db_->GetSortedWalFiles(live_wal_files);
if (!s.ok()) {
db_->EnableFileDeletions(false);
return s;
}
// Don't flush column families if total log size is smaller than
// log_size_for_flush. We copy the log files instead.
// We may be able to cover 2PC case too.
uint64_t total_wal_size = 0;
for (auto& wal : live_wal_files) {
total_wal_size += wal->SizeFileBytes();
}
if (total_wal_size < log_size_for_flush) {
flush_memtable = false;
}
live_wal_files.clear();
}
// this will return live_files prefixed with "/" // this will return live_files prefixed with "/"
s = db_->GetLiveFiles(live_files, &manifest_file_size); s = db_->GetLiveFiles(live_files, &manifest_file_size, flush_memtable);
if (s.ok() && db_options.allow_2pc) { if (s.ok() && db_options.allow_2pc) {
// If 2PC is enabled, we need to get minimum log number after the flush. // If 2PC is enabled, we need to get minimum log number after the flush.
@ -189,7 +215,8 @@ Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir) {
// that has changes after the last flush. // that has changes after the last flush.
for (size_t i = 0; s.ok() && i < wal_size; ++i) { for (size_t i = 0; s.ok() && i < wal_size; ++i) {
if ((live_wal_files[i]->Type() == kAliveLogFile) && if ((live_wal_files[i]->Type() == kAliveLogFile) &&
(live_wal_files[i]->StartSequence() >= sequence_number || (!flush_memtable ||
live_wal_files[i]->StartSequence() >= sequence_number ||
live_wal_files[i]->LogNumber() >= min_log_num)) { live_wal_files[i]->LogNumber() >= min_log_num)) {
if (i + 1 == wal_size) { if (i + 1 == wal_size) {
ROCKS_LOG_INFO(db_options.info_log, "Copying %s", ROCKS_LOG_INFO(db_options.info_log, "Copying %s",

@ -217,6 +217,7 @@ class CheckpointTest : public testing::Test {
}; };
TEST_F(CheckpointTest, GetSnapshotLink) { TEST_F(CheckpointTest, GetSnapshotLink) {
for (uint64_t log_size_for_fush : {0, 1000000}) {
Options options; Options options;
const std::string snapshot_name = test::TmpDir(env_) + "/snapshot"; const std::string snapshot_name = test::TmpDir(env_) + "/snapshot";
DB* snapshotDB; DB* snapshotDB;
@ -239,7 +240,7 @@ TEST_F(CheckpointTest, GetSnapshotLink) {
ASSERT_OK(Put(key, "v1")); ASSERT_OK(Put(key, "v1"));
// Take a snapshot // Take a snapshot
ASSERT_OK(Checkpoint::Create(db_, &checkpoint)); ASSERT_OK(Checkpoint::Create(db_, &checkpoint));
ASSERT_OK(checkpoint->CreateCheckpoint(snapshot_name)); ASSERT_OK(checkpoint->CreateCheckpoint(snapshot_name, log_size_for_fush));
ASSERT_OK(Put(key, "v2")); ASSERT_OK(Put(key, "v2"));
ASSERT_EQ("v2", Get(key)); ASSERT_EQ("v2", Get(key));
ASSERT_OK(Flush()); ASSERT_OK(Flush());
@ -269,6 +270,7 @@ TEST_F(CheckpointTest, GetSnapshotLink) {
// Restore DB name // Restore DB name
dbname_ = test::TmpDir(env_) + "/db_test"; dbname_ = test::TmpDir(env_) + "/db_test";
}
} }
TEST_F(CheckpointTest, CheckpointCF) { TEST_F(CheckpointTest, CheckpointCF) {
@ -345,6 +347,71 @@ TEST_F(CheckpointTest, CheckpointCF) {
ASSERT_OK(DestroyDB(snapshot_name, options)); ASSERT_OK(DestroyDB(snapshot_name, options));
} }
TEST_F(CheckpointTest, CheckpointCFNoFlush) {
Options options = CurrentOptions();
CreateAndReopenWithCF({"one", "two", "three", "four", "five"}, options);
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
ASSERT_OK(Put(0, "Default", "Default"));
ASSERT_OK(Put(1, "one", "one"));
Flush();
ASSERT_OK(Put(2, "two", "two"));
const std::string snapshot_name = test::TmpDir(env_) + "/snapshot";
DB* snapshotDB;
ReadOptions roptions;
std::string result;
std::vector<ColumnFamilyHandle*> cphandles;
ASSERT_OK(DestroyDB(snapshot_name, options));
env_->DeleteDir(snapshot_name);
Status s;
// Take a snapshot
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::BackgroundCallFlush:start", [&](void* arg) {
// Flush should never trigger.
ASSERT_TRUE(false);
});
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
Checkpoint* checkpoint;
ASSERT_OK(Checkpoint::Create(db_, &checkpoint));
ASSERT_OK(checkpoint->CreateCheckpoint(snapshot_name, 1000000));
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
delete checkpoint;
ASSERT_OK(Put(1, "one", "two"));
ASSERT_OK(Flush(1));
ASSERT_OK(Put(2, "two", "twentytwo"));
Close();
EXPECT_OK(DestroyDB(dbname_, options));
// Open snapshot and verify contents while DB is running
options.create_if_missing = false;
std::vector<std::string> cfs;
cfs = {kDefaultColumnFamilyName, "one", "two", "three", "four", "five"};
std::vector<ColumnFamilyDescriptor> column_families;
for (size_t i = 0; i < cfs.size(); ++i) {
column_families.push_back(ColumnFamilyDescriptor(cfs[i], options));
}
ASSERT_OK(DB::Open(options, snapshot_name, column_families, &cphandles,
&snapshotDB));
ASSERT_OK(snapshotDB->Get(roptions, cphandles[0], "Default", &result));
ASSERT_EQ("Default", result);
ASSERT_OK(snapshotDB->Get(roptions, cphandles[1], "one", &result));
ASSERT_EQ("one", result);
ASSERT_OK(snapshotDB->Get(roptions, cphandles[2], "two", &result));
ASSERT_EQ("two", result);
for (auto h : cphandles) {
delete h;
}
cphandles.clear();
delete snapshotDB;
snapshotDB = nullptr;
ASSERT_OK(DestroyDB(snapshot_name, options));
}
TEST_F(CheckpointTest, CurrentFileModifiedWhileCheckpointing) { TEST_F(CheckpointTest, CurrentFileModifiedWhileCheckpointing) {
const std::string kSnapshotName = test::TmpDir(env_) + "/snapshot"; const std::string kSnapshotName = test::TmpDir(env_) + "/snapshot";
ASSERT_OK(DestroyDB(kSnapshotName, CurrentOptions())); ASSERT_OK(DestroyDB(kSnapshotName, CurrentOptions()));

Loading…
Cancel
Save