Checkpoint dir options fix (#8572)

Summary:
Originally the 2 options `db_log_dir` and `wal_dir` will be reused in a snapshot db since the options files are just copied. By default, if `wal_dir` was not set when a db was created, it is set to the db's dir. Therefore, the snapshot db will use the same WAL dir. If both the original db and the snapshot db write to or delete from the WAL dir, one may modify or delete files which belong to the other. The same applies to `db_log_dir` as well, but as info log files are not copied or linked, it is simpler for this option.

2 arguments are added to `Checkpoint::CreateCheckpoint()`, allowing to override these 2 options.

`wal_dir`:  If the function argument `wal_dir` is empty, or set to the original db location, or the checkpoint location, the snapshot's `wal_dir` option will be updated to the checkpoint location. Otherwise, the absolute path specified in the argument will be used. During checkpointing, live WAL files will be copied or linked the new location, instead of the current WAL dir specified in the original db.

`db_log_dir`: Same as `wal_dir`, but no files will be copied or linked.

A new unit test was added: `CheckpointTest.CheckpointWithOptionsDirsTest`.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8572

Test Plan:
New unit test
```
checkpoint_test --gtest_filter="CheckpointTest.CheckpointWithOptionsDirsTest"
```

Output
```
Note: Google Test filter = CheckpointTest.CheckpointWithOptionsDirsTest
[==========] Running 1 test from 1 test case.
[----------] Global test environment set-up.
[----------] 1 test from CheckpointTest
[ RUN      ] CheckpointTest.CheckpointWithOptionsDirsTest
[       OK ] CheckpointTest.CheckpointWithOptionsDirsTest (11712 ms)
[----------] 1 test from CheckpointTest (11712 ms total)

[----------] Global test environment tear-down
[==========] 1 test from 1 test case ran. (11713 ms total)
[  PASSED  ] 1 test.
```
This test will fail without this patch. Just modify the code to remove the 2 arguments introduced in this patch in `CreateCheckpoint()`.

Reviewed By: zhichao-cao

Differential Revision: D29832761

Pulled By: autopear

fbshipit-source-id: e6a639b4d674380df82998c0839e79cab695fe29
main
Merlin Mao 3 years ago committed by Facebook GitHub Bot
parent 3b27725245
commit 55f7ded80d
  1. 6
      include/rocksdb/utilities/checkpoint.h
  2. 124
      utilities/checkpoint/checkpoint_impl.cc
  3. 9
      utilities/checkpoint/checkpoint_impl.h
  4. 149
      utilities/checkpoint/checkpoint_test.cc

@ -40,9 +40,13 @@ class Checkpoint {
// sequence_number_ptr: if it is not nullptr, the value it points to will be
// set to the DB's sequence number. The default value of this parameter is
// nullptr.
// db_log_dir / wal_dir: override db_log_dir or wal_dir option in the
// snapshot. If empty, checkpoint_dir will be used.
virtual Status CreateCheckpoint(const std::string& checkpoint_dir,
uint64_t log_size_for_flush = 0,
uint64_t* sequence_number_ptr = nullptr);
uint64_t* sequence_number_ptr = nullptr,
const std::string& db_log_dir = "",
const std::string& wal_dir = "");
// Exports all live SST files of a specified Column Family onto export_dir,
// returning SST files information in metadata.

@ -20,12 +20,14 @@
#include "db/wal_manager.h"
#include "file/file_util.h"
#include "file/filename.h"
#include "options/options_parser.h"
#include "port/port.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/metadata.h"
#include "rocksdb/transaction_log.h"
#include "rocksdb/utilities/checkpoint.h"
#include "rocksdb/utilities/options_util.h"
#include "test_util/sync_point.h"
#include "util/cast_util.h"
#include "util/file_checksum_helper.h"
@ -39,7 +41,9 @@ Status Checkpoint::Create(DB* db, Checkpoint** checkpoint_ptr) {
Status Checkpoint::CreateCheckpoint(const std::string& /*checkpoint_dir*/,
uint64_t /*log_size_for_flush*/,
uint64_t* /*sequence_number_ptr*/) {
uint64_t* /*sequence_number_ptr*/,
const std::string& /*db_log_dir*/,
const std::string& /*wal_dir*/) {
return Status::NotSupported("");
}
@ -76,7 +80,9 @@ Status Checkpoint::ExportColumnFamily(
// Builds an openable snapshot of RocksDB
Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir,
uint64_t log_size_for_flush,
uint64_t* sequence_number_ptr) {
uint64_t* sequence_number_ptr,
const std::string& db_log_dir,
const std::string& wal_dir) {
DBOptions db_options = db_->GetDBOptions();
Status s = db_->GetEnv()->FileExists(checkpoint_dir);
@ -101,15 +107,54 @@ Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir,
return Status::InvalidArgument("invalid checkpoint directory name");
}
std::string full_private_path =
checkpoint_dir.substr(0, final_nonslash_idx + 1) + ".tmp";
ROCKS_LOG_INFO(
db_options.info_log,
"Snapshot process -- using temporary directory %s",
full_private_path.c_str());
std::string parsed_checkpoint_dir =
checkpoint_dir.substr(0, final_nonslash_idx + 1);
std::string full_private_path = parsed_checkpoint_dir + ".tmp";
ROCKS_LOG_INFO(db_options.info_log,
"Snapshot process -- using temporary directory %s",
full_private_path.c_str());
CleanStagingDirectory(full_private_path, db_options.info_log.get());
// create snapshot directory
s = db_->GetEnv()->CreateDir(full_private_path);
// Remove the last `/`s if needed
std::string parsed_log_dir =
db_log_dir.empty()
? ""
: db_log_dir.substr(0, db_log_dir.find_last_not_of('/') + 1);
std::string parsed_wal_dir =
wal_dir.empty() ? ""
: wal_dir.substr(0, wal_dir.find_last_not_of('/') + 1);
// Info log files are not copied or linked, just update the option value.
std::string value_log_dir = parsed_log_dir == db_->GetName() ||
parsed_log_dir == parsed_checkpoint_dir
? ""
: parsed_log_dir;
// If the wal_dir is empty, or the same as the source db dir, update the
// option value to the checkpoint dir.
std::string value_wal_dir; // Option value to override
std::string new_wal_dir; // The target location to copy/link WAL files
if (parsed_wal_dir.empty() || parsed_wal_dir == db_->GetName() ||
parsed_wal_dir == parsed_checkpoint_dir) {
value_wal_dir = parsed_checkpoint_dir;
new_wal_dir = full_private_path; // Copy to the temp dir
} else {
value_wal_dir = parsed_wal_dir;
std::string prefix = parsed_checkpoint_dir + "/";
// If checkpoint_dir is parent of wal_dir, create the wal dir inside the tmp
// dir; otherwise, create it directly.
new_wal_dir =
parsed_wal_dir.rfind(prefix, 0) == 0
? full_private_path + "/" + parsed_wal_dir.substr(prefix.size())
: parsed_wal_dir;
s = db_->GetEnv()->FileExists(new_wal_dir);
if (s.IsNotFound()) {
s = db_->GetEnv()->CreateDir(new_wal_dir);
}
}
uint64_t sequence_number = 0;
if (s.ok()) {
// enable file deletions
@ -120,21 +165,32 @@ Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir,
s = CreateCustomCheckpoint(
db_options,
[&](const std::string& src_dirname, const std::string& fname,
FileType) {
FileType type) {
ROCKS_LOG_INFO(db_options.info_log, "Hard Linking %s",
fname.c_str());
return db_->GetFileSystem()->LinkFile(src_dirname + fname,
full_private_path + fname,
IOOptions(), nullptr);
// WAL file links may be created in another location.
return db_->GetFileSystem()->LinkFile(
src_dirname + fname,
(type == kWalFile ? new_wal_dir : full_private_path) + fname,
IOOptions(), nullptr);
} /* link_file_cb */,
[&](const std::string& src_dirname, const std::string& fname,
uint64_t size_limit_bytes, FileType,
uint64_t size_limit_bytes, FileType type,
const std::string& /* checksum_func_name */,
const std::string& /* checksum_val */) {
ROCKS_LOG_INFO(db_options.info_log, "Copying %s", fname.c_str());
return CopyFile(db_->GetFileSystem(), src_dirname + fname,
full_private_path + fname, size_limit_bytes,
db_options.use_fsync);
if (type == kOptionsFile) {
// Modify and rewrite option files
return CopyOptionsFile(src_dirname + fname,
full_private_path + fname, value_log_dir,
value_wal_dir);
} else {
// Copy other files. WAL files may be copied to another location.
return Status(CopyFile(
db_->GetFileSystem(), src_dirname + fname,
(type == kWalFile ? new_wal_dir : full_private_path) + fname,
size_limit_bytes, db_options.use_fsync));
}
} /* copy_file_cb */,
[&](const std::string& fname, const std::string& contents, FileType) {
ROCKS_LOG_INFO(db_options.info_log, "Creating %s", fname.c_str());
@ -154,11 +210,12 @@ Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir,
if (s.ok()) {
// move tmp private backup to real snapshot directory
s = db_->GetEnv()->RenameFile(full_private_path, checkpoint_dir);
s = db_->GetEnv()->RenameFile(full_private_path, parsed_checkpoint_dir);
}
if (s.ok()) {
std::unique_ptr<Directory> checkpoint_directory;
s = db_->GetEnv()->NewDirectory(checkpoint_dir, &checkpoint_directory);
s = db_->GetEnv()->NewDirectory(parsed_checkpoint_dir,
&checkpoint_directory);
if (s.ok() && checkpoint_directory != nullptr) {
s = checkpoint_directory->Fsync();
}
@ -588,6 +645,37 @@ Status CheckpointImpl::ExportFilesInMetaData(
return s;
}
Status CheckpointImpl::CopyOptionsFile(const std::string& src_file,
const std::string& target_file,
const std::string& db_log_dir,
const std::string& wal_dir) {
Status s;
DBOptions src_db_options;
std::vector<ColumnFamilyDescriptor> src_cf_descs;
s = LoadOptionsFromFile(ConfigOptions(), src_file, &src_db_options,
&src_cf_descs);
if (!s.ok()) {
return s;
}
// Override these 2 options
src_db_options.db_log_dir = db_log_dir;
src_db_options.wal_dir = wal_dir;
std::vector<std::string> src_cf_names;
std::vector<ColumnFamilyOptions> src_cf_opts;
src_cf_names.reserve(src_cf_descs.size());
src_cf_opts.reserve(src_cf_descs.size());
for (ColumnFamilyDescriptor desc : src_cf_descs) {
src_cf_names.push_back(desc.name);
src_cf_opts.push_back(desc.options);
}
return PersistRocksDBOptions(src_db_options, src_cf_names, src_cf_opts,
target_file, db_->GetFileSystem());
}
} // namespace ROCKSDB_NAMESPACE
#endif // ROCKSDB_LITE

@ -20,7 +20,9 @@ class CheckpointImpl : public Checkpoint {
Status CreateCheckpoint(const std::string& checkpoint_dir,
uint64_t log_size_for_flush,
uint64_t* sequence_number_ptr) override;
uint64_t* sequence_number_ptr,
const std::string& db_log_dir,
const std::string& wal_dir) override;
Status ExportColumnFamily(ColumnFamilyHandle* handle,
const std::string& export_dir,
@ -57,6 +59,11 @@ class CheckpointImpl : public Checkpoint {
const std::string& fname)>
copy_file_cb);
Status CopyOptionsFile(const std::string& src_file,
const std::string& target_file,
const std::string& db_log_dir,
const std::string& wal_dir);
private:
DB* db_;
};

@ -14,16 +14,21 @@
#ifndef OS_WIN
#include <unistd.h>
#endif
#include <iomanip>
#include <iostream>
#include <sstream>
#include <thread>
#include <utility>
#include <vector>
#include "db/db_impl/db_impl.h"
#include "file/file_util.h"
#include "port/port.h"
#include "port/stack_trace.h"
#include "rocksdb/convenience.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/utilities/options_util.h"
#include "rocksdb/utilities/transaction_db.h"
#include "test_util/sync_point.h"
#include "test_util/testharness.h"
@ -259,6 +264,12 @@ class CheckpointTest : public testing::Test {
}
return result;
}
static std::string IntToFixedWidthString(size_t i, int len) {
std::stringstream ss;
ss << std::setw(len) << std::setfill('0') << i;
return ss.str();
}
};
TEST_F(CheckpointTest, GetSnapshotLink) {
@ -902,6 +913,144 @@ TEST_F(CheckpointTest, CheckpointReadOnlyDBWithMultipleColumnFamilies) {
delete snapshot_db;
}
TEST_F(CheckpointTest, CheckpointWithOptionsDirsTest) {
// If the checkpoint and the source db share the same wal_dir, files may be
// corrupted if both write to or delete from the same wal_dir. db_log_dir
// should also be updated during checkpointing, but it is less important since
// log files are not copied or linked to the checkpoint.
// 8 bytes key, 1 kB record, 4 kB MemTable. Each batch should trigger 25
// flushes
const int key_len = 8;
const int value_len = 1016;
const size_t num_keys = 100;
const size_t buffer_size = 4096;
std::string value(value_len, ' ');
std::vector<std::string> dirs1 = {"", "", "", ""};
std::vector<std::string> dirs2 = {"/logs", "/wal", "", ""};
std::vector<std::string> dirs3 = {"/logs", "/wal", "/logs", "/wal"};
for (auto dirs : {dirs1, dirs2, dirs3}) {
std::string src_log_dir = dirs[0].empty() ? "" : dbname_ + dirs[0];
std::string src_wal_dir = dirs[1].empty() ? "" : dbname_ + dirs[1];
std::string snap_log_dir = dirs[2].empty() ? "" : snapshot_name_ + dirs[2];
std::string snap_wal_dir = dirs[3].empty() ? "" : snapshot_name_ + dirs[3];
Options src_opts = CurrentOptions();
WriteOptions w_opts;
ReadOptions r_opts;
DB* snapshotDB;
Checkpoint* checkpoint;
src_opts = CurrentOptions();
delete db_;
db_ = nullptr;
ASSERT_OK(DestroyDB(dbname_, src_opts));
// Create a database
src_opts.create_if_missing = true;
src_opts.write_buffer_size = buffer_size;
src_opts.OptimizeUniversalStyleCompaction(buffer_size);
src_opts.db_log_dir = src_log_dir;
src_opts.wal_dir = src_wal_dir;
ASSERT_OK(DB::Open(src_opts, dbname_, &db_));
// Write to src db
for (size_t i = 1; i <= num_keys; i++) {
ASSERT_OK(db_->Put(w_opts, IntToFixedWidthString(i, key_len), value));
}
// Take a snapshot
ASSERT_OK(Checkpoint::Create(db_, &checkpoint));
ASSERT_OK(checkpoint->CreateCheckpoint(snapshot_name_, 0, nullptr,
snap_log_dir, snap_wal_dir));
// Write to src db again
for (size_t i = num_keys + 1; i <= num_keys * 2; i++) {
ASSERT_OK(db_->Put(w_opts, IntToFixedWidthString(i, key_len), value));
}
std::string result;
std::string key1 = IntToFixedWidthString(num_keys, key_len);
std::string key2 = IntToFixedWidthString(num_keys * 2, key_len);
std::string key3 = IntToFixedWidthString(num_keys * 3, key_len);
ASSERT_OK(db_->Get(r_opts, key1, &result));
ASSERT_OK(db_->Get(r_opts, key2, &result));
// Open snapshot with its own options
DBOptions snap_opts;
std::vector<ColumnFamilyDescriptor> snap_cfs;
ASSERT_OK(LoadLatestOptions(ConfigOptions(), snapshot_name_, &snap_opts,
&snap_cfs));
ASSERT_EQ(snap_opts.db_log_dir, snap_log_dir);
ASSERT_EQ(snap_opts.wal_dir,
snap_wal_dir.empty() ? snapshot_name_ : snap_wal_dir);
std::vector<ColumnFamilyHandle*> handles;
ASSERT_OK(
DB::Open(snap_opts, snapshot_name_, snap_cfs, &handles, &snapshotDB));
for (ColumnFamilyHandle* handle : handles) {
delete handle;
}
handles.clear();
ASSERT_OK(snapshotDB->Get(r_opts, key1, &result));
ASSERT_TRUE(snapshotDB->Get(r_opts, key2, &result).IsNotFound());
// Write to snapshot
for (size_t i = num_keys * 2 + 1; i <= num_keys * 3; i++) {
ASSERT_OK(
snapshotDB->Put(w_opts, IntToFixedWidthString(i, key_len), value));
}
ASSERT_OK(snapshotDB->Get(r_opts, key3, &result));
ASSERT_TRUE(db_->Get(r_opts, key3, &result).IsNotFound());
// Close and reopen the snapshot
delete snapshotDB;
ASSERT_OK(
DB::Open(snap_opts, snapshot_name_, snap_cfs, &handles, &snapshotDB));
for (ColumnFamilyHandle* handle : handles) {
delete handle;
}
handles.clear();
ASSERT_TRUE(snapshotDB->Get(r_opts, key2, &result).IsNotFound());
ASSERT_OK(snapshotDB->Get(r_opts, key3, &result));
delete snapshotDB;
// Close and reopen the source db
delete db_;
src_opts.create_if_missing = false;
ASSERT_OK(DB::Open(src_opts, dbname_, &db_));
ASSERT_OK(db_->Get(r_opts, key2, &result));
ASSERT_TRUE(db_->Get(r_opts, key3, &result).IsNotFound());
delete db_;
// Delete the snapshot
Options del_opts;
del_opts.db_log_dir = snap_opts.db_log_dir;
del_opts.wal_dir = snap_opts.wal_dir;
ASSERT_OK(DestroyDB(snapshot_name_, del_opts));
// Reopen the source db again
ASSERT_OK(DB::Open(src_opts, dbname_, &db_));
delete db_;
db_ = nullptr;
ASSERT_OK(DestroyDB(dbname_, src_opts));
dbname_ = test::PerThreadDBPath(env_, "db_test");
delete checkpoint;
}
}
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {

Loading…
Cancel
Save