Backup engine support for direct I/O reads (#4640)

Summary:
Use the `DBOptions` that the backup engine already holds to figure out the right `EnvOptions` to use when reading the DB files. This means that, if a user opened a DB instance with `use_direct_reads=true`, then using `BackupEngine` to back up that DB instance will use direct I/O to read files when calculating checksums and copying. Currently the WALs and manifests would still be read using buffered I/O to prevent mixing direct I/O reads with concurrent buffered I/O writes.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/4640

Differential Revision: D13015268

Pulled By: ajkr

fbshipit-source-id: 77006ad6f3e00ce58374ca4793b785eea0db6269
main
Andrew Kryczka 6 years ago committed by Facebook Github Bot
parent b313019326
commit ea9454700a
  1. 3
      HISTORY.md
  2. 14
      db/db_test_util.cc
  3. 16
      util/testutil.cc
  4. 2
      util/testutil.h
  5. 120
      utilities/backupable/backupable_db.cc
  6. 111
      utilities/backupable/backupable_db_test.cc

@ -11,6 +11,9 @@
* Added "rocksdb.min-obsolete-sst-number-to-keep" DB property that reports the lower bound on SST file numbers that are being kept from deletion, even if the SSTs are obsolete.
* Add xxhash64 checksum support
### Public API Change
* `DBOptions::use_direct_reads` now affects reads issued by `BackupEngine` on the database's SSTs.
### Bug Fixes
* Fix corner case where a write group leader blocked due to write stall blocks other writers in queue with WriteOptions::no_slowdown set.
* Fix in-memory range tombstone truncation to avoid erroneously covering newer keys at a lower level, and include range tombstones in compacted files whose largest key is the range tombstone's start key.

@ -665,19 +665,7 @@ Status DBTestBase::TryReopen(const Options& options) {
}
bool DBTestBase::IsDirectIOSupported() {
EnvOptions env_options;
env_options.use_mmap_writes = false;
env_options.use_direct_writes = true;
std::string tmp = TempFileName(dbname_, 999);
Status s;
{
std::unique_ptr<WritableFile> file;
s = env_->NewWritableFile(tmp, &file, env_options);
}
if (s.ok()) {
s = env_->DeleteFile(tmp);
}
return s.ok();
return test::IsDirectIOSupported(env_, dbname_);
}
bool DBTestBase::IsMemoryMappedAccessSupported() const {

@ -401,5 +401,21 @@ Status DestroyDir(Env* env, const std::string& dir) {
return s;
}
bool IsDirectIOSupported(Env* env, const std::string& dir) {
EnvOptions env_options;
env_options.use_mmap_writes = false;
env_options.use_direct_writes = true;
std::string tmp = TempFileName(dir, 999);
Status s;
{
std::unique_ptr<WritableFile> file;
s = env->NewWritableFile(tmp, &file, env_options);
}
if (s.ok()) {
s = env->DeleteFile(tmp);
}
return s.ok();
}
} // namespace test
} // namespace rocksdb

@ -748,5 +748,7 @@ std::string RandomName(Random* rnd, const size_t len);
Status DestroyDir(Env* env, const std::string& dir);
bool IsDirectIOSupported(Env* env, const std::string& dir);
} // namespace test
} // namespace rocksdb

@ -305,16 +305,16 @@ class BackupEngineImpl : public BackupEngine {
// @param contents If non-empty, the file will be created with these contents.
Status CopyOrCreateFile(const std::string& src, const std::string& dst,
const std::string& contents, Env* src_env,
Env* dst_env, bool sync, RateLimiter* rate_limiter,
Env* dst_env, const EnvOptions& src_env_options,
bool sync, RateLimiter* rate_limiter,
uint64_t* size = nullptr,
uint32_t* checksum_value = nullptr,
uint64_t size_limit = 0,
std::function<void()> progress_callback = []() {});
Status CalculateChecksum(const std::string& src,
Env* src_env,
uint64_t size_limit,
uint32_t* checksum_value);
Status CalculateChecksum(const std::string& src, Env* src_env,
const EnvOptions& src_env_options,
uint64_t size_limit, uint32_t* checksum_value);
struct CopyOrCreateResult {
uint64_t size;
@ -331,6 +331,7 @@ class BackupEngineImpl : public BackupEngine {
std::string contents;
Env* src_env;
Env* dst_env;
EnvOptions src_env_options;
bool sync;
RateLimiter* rate_limiter;
uint64_t size_limit;
@ -338,14 +339,15 @@ class BackupEngineImpl : public BackupEngine {
std::function<void()> progress_callback;
CopyOrCreateWorkItem()
: src_path(""),
dst_path(""),
contents(""),
src_env(nullptr),
dst_env(nullptr),
sync(false),
rate_limiter(nullptr),
size_limit(0) {}
: src_path(""),
dst_path(""),
contents(""),
src_env(nullptr),
dst_env(nullptr),
src_env_options(),
sync(false),
rate_limiter(nullptr),
size_limit(0) {}
CopyOrCreateWorkItem(const CopyOrCreateWorkItem&) = delete;
CopyOrCreateWorkItem& operator=(const CopyOrCreateWorkItem&) = delete;
@ -360,6 +362,7 @@ class BackupEngineImpl : public BackupEngine {
contents = std::move(o.contents);
src_env = o.src_env;
dst_env = o.dst_env;
src_env_options = std::move(o.src_env_options);
sync = o.sync;
rate_limiter = o.rate_limiter;
size_limit = o.size_limit;
@ -370,14 +373,15 @@ class BackupEngineImpl : public BackupEngine {
CopyOrCreateWorkItem(std::string _src_path, std::string _dst_path,
std::string _contents, Env* _src_env, Env* _dst_env,
bool _sync, RateLimiter* _rate_limiter,
uint64_t _size_limit,
EnvOptions _src_env_options, bool _sync,
RateLimiter* _rate_limiter, uint64_t _size_limit,
std::function<void()> _progress_callback = []() {})
: src_path(std::move(_src_path)),
dst_path(std::move(_dst_path)),
contents(std::move(_contents)),
src_env(_src_env),
dst_env(_dst_env),
src_env_options(std::move(_src_env_options)),
sync(_sync),
rate_limiter(_rate_limiter),
size_limit(_size_limit),
@ -471,7 +475,8 @@ class BackupEngineImpl : public BackupEngine {
std::vector<BackupAfterCopyOrCreateWorkItem>& backup_items_to_finish,
BackupID backup_id, bool shared, const std::string& src_dir,
const std::string& fname, // starts with "/"
RateLimiter* rate_limiter, uint64_t size_bytes, uint64_t size_limit = 0,
const EnvOptions& src_env_options, RateLimiter* rate_limiter,
uint64_t size_bytes, uint64_t size_limit = 0,
bool shared_checksum = false,
std::function<void()> progress_callback = []() {},
const std::string& contents = std::string());
@ -723,9 +728,10 @@ Status BackupEngineImpl::Initialize() {
CopyOrCreateResult result;
result.status = CopyOrCreateFile(
work_item.src_path, work_item.dst_path, work_item.contents,
work_item.src_env, work_item.dst_env, work_item.sync,
work_item.rate_limiter, &result.size, &result.checksum_value,
work_item.size_limit, work_item.progress_callback);
work_item.src_env, work_item.dst_env, work_item.src_env_options,
work_item.sync, work_item.rate_limiter, &result.size,
&result.checksum_value, work_item.size_limit,
work_item.progress_callback);
work_item.result.set_value(std::move(result));
}
});
@ -796,8 +802,10 @@ Status BackupEngineImpl::CreateNewBackupWithMetadata(
if (s.ok()) {
CheckpointImpl checkpoint(db);
uint64_t sequence_number = 0;
DBOptions db_options = db->GetDBOptions();
EnvOptions src_raw_env_options(db_options);
s = checkpoint.CreateCustomCheckpoint(
db->GetDBOptions(),
db_options,
[&](const std::string& /*src_dirname*/, const std::string& /*fname*/,
FileType) {
// custom checkpoint will switch to calling copy_file_cb after it sees
@ -815,11 +823,33 @@ Status BackupEngineImpl::CreateNewBackupWithMetadata(
if (type == kTableFile) {
st = db_env_->GetFileSize(src_dirname + fname, &size_bytes);
}
EnvOptions src_env_options;
switch (type) {
case kLogFile:
src_env_options =
db_env_->OptimizeForLogRead(src_raw_env_options);
break;
case kTableFile:
src_env_options = db_env_->OptimizeForCompactionTableRead(
src_raw_env_options, ImmutableDBOptions(db_options));
break;
case kDescriptorFile:
src_env_options =
db_env_->OptimizeForManifestRead(src_raw_env_options);
break;
default:
// Other backed up files (like options file) are not read by live
// DB, so don't need to worry about avoiding mixing buffered and
// direct I/O. Just use plain defaults.
src_env_options = src_raw_env_options;
break;
}
if (st.ok()) {
st = AddBackupFileWorkItem(
live_dst_paths, backup_items_to_finish, new_backup_id,
options_.share_table_files && type == kTableFile, src_dirname,
fname, rate_limiter, size_bytes, size_limit_bytes,
fname, src_env_options, rate_limiter, size_bytes,
size_limit_bytes,
options_.share_files_with_checksum && type == kTableFile,
progress_callback);
}
@ -829,8 +859,9 @@ Status BackupEngineImpl::CreateNewBackupWithMetadata(
Log(options_.info_log, "add file for backup %s", fname.c_str());
return AddBackupFileWorkItem(
live_dst_paths, backup_items_to_finish, new_backup_id,
false /* shared */, "" /* src_dir */, fname, rate_limiter,
contents.size(), 0 /* size_limit */, false /* shared_checksum */,
false /* shared */, "" /* src_dir */, fname,
EnvOptions() /* src_env_options */, rate_limiter, contents.size(),
0 /* size_limit */, false /* shared_checksum */,
progress_callback, contents);
} /* create_file_cb */,
&sequence_number, flush_before_backup ? 0 : port::kMaxUint64);
@ -1114,7 +1145,8 @@ Status BackupEngineImpl::RestoreDBFromBackup(
dst.c_str());
CopyOrCreateWorkItem copy_or_create_work_item(
GetAbsolutePath(file), dst, "" /* contents */, backup_env_, db_env_,
false, rate_limiter, 0 /* size_limit */);
EnvOptions() /* src_env_options */, false, rate_limiter,
0 /* size_limit */);
RestoreAfterCopyOrCreateWorkItem after_copy_or_create_work_item(
copy_or_create_work_item.result.get_future(),
file_info->checksum_value);
@ -1183,15 +1215,15 @@ Status BackupEngineImpl::VerifyBackup(BackupID backup_id) {
Status BackupEngineImpl::CopyOrCreateFile(
const std::string& src, const std::string& dst, const std::string& contents,
Env* src_env, Env* dst_env, bool sync, RateLimiter* rate_limiter,
uint64_t* size, uint32_t* checksum_value, uint64_t size_limit,
std::function<void()> progress_callback) {
Env* src_env, Env* dst_env, const EnvOptions& src_env_options, bool sync,
RateLimiter* rate_limiter, uint64_t* size, uint32_t* checksum_value,
uint64_t size_limit, std::function<void()> progress_callback) {
assert(src.empty() != contents.empty());
Status s;
std::unique_ptr<WritableFile> dst_file;
std::unique_ptr<SequentialFile> src_file;
EnvOptions env_options;
env_options.use_mmap_writes = false;
EnvOptions dst_env_options;
dst_env_options.use_mmap_writes = false;
// TODO:(gzh) maybe use direct reads/writes here if possible
if (size != nullptr) {
*size = 0;
@ -1205,16 +1237,16 @@ Status BackupEngineImpl::CopyOrCreateFile(
size_limit = std::numeric_limits<uint64_t>::max();
}
s = dst_env->NewWritableFile(dst, &dst_file, env_options);
s = dst_env->NewWritableFile(dst, &dst_file, dst_env_options);
if (s.ok() && !src.empty()) {
s = src_env->NewSequentialFile(src, &src_file, env_options);
s = src_env->NewSequentialFile(src, &src_file, src_env_options);
}
if (!s.ok()) {
return s;
}
std::unique_ptr<WritableFileWriter> dest_writer(
new WritableFileWriter(std::move(dst_file), dst, env_options));
new WritableFileWriter(std::move(dst_file), dst, dst_env_options));
std::unique_ptr<SequentialFileReader> src_reader;
std::unique_ptr<char[]> buf;
if (!src.empty()) {
@ -1276,9 +1308,10 @@ Status BackupEngineImpl::AddBackupFileWorkItem(
std::unordered_set<std::string>& live_dst_paths,
std::vector<BackupAfterCopyOrCreateWorkItem>& backup_items_to_finish,
BackupID backup_id, bool shared, const std::string& src_dir,
const std::string& fname, RateLimiter* rate_limiter, uint64_t size_bytes,
uint64_t size_limit, bool shared_checksum,
std::function<void()> progress_callback, const std::string& contents) {
const std::string& fname, const EnvOptions& src_env_options,
RateLimiter* rate_limiter, uint64_t size_bytes, uint64_t size_limit,
bool shared_checksum, std::function<void()> progress_callback,
const std::string& contents) {
assert(!fname.empty() && fname[0] == '/');
assert(contents.empty() != src_dir.empty());
@ -1289,7 +1322,7 @@ Status BackupEngineImpl::AddBackupFileWorkItem(
if (shared && shared_checksum) {
// add checksum and file length to the file name
s = CalculateChecksum(src_dir + fname, db_env_, size_limit,
s = CalculateChecksum(src_dir + fname, db_env_, src_env_options, size_limit,
&checksum_value);
if (!s.ok()) {
return s;
@ -1365,8 +1398,8 @@ Status BackupEngineImpl::AddBackupFileWorkItem(
// the file is present and referenced by a backup
ROCKS_LOG_INFO(options_.info_log,
"%s already present, calculate checksum", fname.c_str());
s = CalculateChecksum(src_dir + fname, db_env_, size_limit,
&checksum_value);
s = CalculateChecksum(src_dir + fname, db_env_, src_env_options,
size_limit, &checksum_value);
}
}
live_dst_paths.insert(final_dest_path);
@ -1376,8 +1409,8 @@ Status BackupEngineImpl::AddBackupFileWorkItem(
copy_dest_path->c_str());
CopyOrCreateWorkItem copy_or_create_work_item(
src_dir.empty() ? "" : src_dir + fname, *copy_dest_path, contents,
db_env_, backup_env_, options_.sync, rate_limiter, size_limit,
progress_callback);
db_env_, backup_env_, src_env_options, options_.sync, rate_limiter,
size_limit, progress_callback);
BackupAfterCopyOrCreateWorkItem after_copy_or_create_work_item(
copy_or_create_work_item.result.get_future(), shared, need_to_copy,
backup_env_, temp_dest_path, final_dest_path, dst_relative);
@ -1399,6 +1432,7 @@ Status BackupEngineImpl::AddBackupFileWorkItem(
}
Status BackupEngineImpl::CalculateChecksum(const std::string& src, Env* src_env,
const EnvOptions& src_env_options,
uint64_t size_limit,
uint32_t* checksum_value) {
*checksum_value = 0;
@ -1406,12 +1440,8 @@ Status BackupEngineImpl::CalculateChecksum(const std::string& src, Env* src_env,
size_limit = std::numeric_limits<uint64_t>::max();
}
EnvOptions env_options;
env_options.use_mmap_writes = false;
env_options.use_direct_reads = false;
std::unique_ptr<SequentialFile> src_file;
Status s = src_env->NewSequentialFile(src, &src_file, env_options);
Status s = src_env->NewSequentialFile(src, &src_file, src_env_options);
if (!s.ok()) {
return s;
}

@ -188,7 +188,14 @@ class TestEnv : public EnvWrapper {
new TestEnv::DummySequentialFile(dummy_sequential_file_fail_reads_));
return Status::OK();
} else {
return EnvWrapper::NewSequentialFile(f, r, options);
Status s = EnvWrapper::NewSequentialFile(f, r, options);
if (s.ok()) {
if ((*r)->use_direct_io()) {
++num_direct_seq_readers_;
}
++num_seq_readers_;
}
return s;
}
}
@ -200,7 +207,28 @@ class TestEnv : public EnvWrapper {
return Status::NotSupported("Sorry, can't do this");
}
limit_written_files_--;
return EnvWrapper::NewWritableFile(f, r, options);
Status s = EnvWrapper::NewWritableFile(f, r, options);
if (s.ok()) {
if ((*r)->use_direct_io()) {
++num_direct_writers_;
}
++num_writers_;
}
return s;
}
virtual Status NewRandomAccessFile(const std::string& fname,
unique_ptr<RandomAccessFile>* result,
const EnvOptions& options) override {
MutexLock l(&mutex_);
Status s = EnvWrapper::NewRandomAccessFile(fname, result, options);
if (s.ok()) {
if ((*result)->use_direct_io()) {
++num_direct_rand_readers_;
}
++num_rand_readers_;
}
return s;
}
virtual Status DeleteFile(const std::string& fname) override {
@ -316,6 +344,23 @@ class TestEnv : public EnvWrapper {
return EnvWrapper::NewDirectory(name, result);
}
void ClearFileOpenCounters() {
MutexLock l(&mutex_);
num_rand_readers_ = 0;
num_direct_rand_readers_ = 0;
num_seq_readers_ = 0;
num_direct_seq_readers_ = 0;
num_writers_ = 0;
num_direct_writers_ = 0;
}
int num_rand_readers() { return num_rand_readers_; }
int num_direct_rand_readers() { return num_direct_rand_readers_; }
int num_seq_readers() { return num_seq_readers_; }
int num_direct_seq_readers() { return num_direct_seq_readers_; }
int num_writers() { return num_writers_; }
int num_direct_writers() { return num_direct_writers_; }
private:
port::Mutex mutex_;
bool dummy_sequential_file_ = false;
@ -329,6 +374,15 @@ class TestEnv : public EnvWrapper {
bool get_children_failure_ = false;
bool create_dir_if_missing_failure_ = false;
bool new_directory_failure_ = false;
// Keeps track of how many files of each type were successfully opened, and
// out of those, how many were opened with direct I/O.
std::atomic<int> num_rand_readers_;
std::atomic<int> num_direct_rand_readers_;
std::atomic<int> num_seq_readers_;
std::atomic<int> num_direct_seq_readers_;
std::atomic<int> num_writers_;
std::atomic<int> num_direct_writers_;
}; // TestEnv
class FileManager : public EnvWrapper {
@ -1634,6 +1688,59 @@ TEST_F(BackupableDBTest, WriteOnlyEngineNoSharedFileDeletion) {
AssertBackupConsistency(i + 1, 0, (i + 1) * kNumKeys);
}
}
TEST_P(BackupableDBTestWithParam, BackupUsingDirectIO) {
// Tests direct I/O on the backup engine's reads and writes on the DB env and
// backup env
// We use ChrootEnv underneath so the below line checks for direct I/O support
// in the chroot directory, not the true filesystem root.
if (!test::IsDirectIOSupported(test_db_env_.get(), "/")) {
return;
}
const int kNumKeysPerBackup = 100;
const int kNumBackups = 3;
options_.use_direct_reads = true;
OpenDBAndBackupEngine(true /* destroy_old_data */);
for (int i = 0; i < kNumBackups; ++i) {
FillDB(db_.get(), i * kNumKeysPerBackup /* from */,
(i + 1) * kNumKeysPerBackup /* to */);
ASSERT_OK(db_->Flush(FlushOptions()));
// Clear the file open counters and then do a bunch of backup engine ops.
// For all ops, files should be opened in direct mode.
test_backup_env_->ClearFileOpenCounters();
test_db_env_->ClearFileOpenCounters();
CloseBackupEngine();
OpenBackupEngine();
ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(),
false /* flush_before_backup */));
ASSERT_OK(backup_engine_->VerifyBackup(i + 1));
CloseBackupEngine();
OpenBackupEngine();
std::vector<BackupInfo> backup_infos;
backup_engine_->GetBackupInfo(&backup_infos);
ASSERT_EQ(static_cast<size_t>(i + 1), backup_infos.size());
// Verify backup engine always opened files with direct I/O
ASSERT_EQ(0, test_db_env_->num_writers());
ASSERT_EQ(0, test_db_env_->num_rand_readers());
ASSERT_GT(test_db_env_->num_direct_seq_readers(), 0);
// Currently the DB doesn't support reading WALs or manifest with direct
// I/O, so subtract two.
ASSERT_EQ(test_db_env_->num_seq_readers() - 2,
test_db_env_->num_direct_seq_readers());
ASSERT_EQ(0, test_db_env_->num_rand_readers());
}
CloseDBAndBackupEngine();
for (int i = 0; i < kNumBackups; ++i) {
AssertBackupConsistency(i + 1 /* backup_id */,
i * kNumKeysPerBackup /* start_exist */,
(i + 1) * kNumKeysPerBackup /* end_exist */,
(i + 2) * kNumKeysPerBackup /* end */);
}
}
} // anon namespace
} // namespace rocksdb

Loading…
Cancel
Save