diff --git a/HISTORY.md b/HISTORY.md index 6bff7668b..9cc5b7f58 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -11,6 +11,9 @@ * Added "rocksdb.min-obsolete-sst-number-to-keep" DB property that reports the lower bound on SST file numbers that are being kept from deletion, even if the SSTs are obsolete. * Add xxhash64 checksum support +### Public API Change +* `DBOptions::use_direct_reads` now affects reads issued by `BackupEngine` on the database's SSTs. + ### Bug Fixes * Fix corner case where a write group leader blocked due to write stall blocks other writers in queue with WriteOptions::no_slowdown set. * Fix in-memory range tombstone truncation to avoid erroneously covering newer keys at a lower level, and include range tombstones in compacted files whose largest key is the range tombstone's start key. diff --git a/db/db_test_util.cc b/db/db_test_util.cc index a0ee69c94..fd4d44144 100644 --- a/db/db_test_util.cc +++ b/db/db_test_util.cc @@ -665,19 +665,7 @@ Status DBTestBase::TryReopen(const Options& options) { } bool DBTestBase::IsDirectIOSupported() { - EnvOptions env_options; - env_options.use_mmap_writes = false; - env_options.use_direct_writes = true; - std::string tmp = TempFileName(dbname_, 999); - Status s; - { - std::unique_ptr file; - s = env_->NewWritableFile(tmp, &file, env_options); - } - if (s.ok()) { - s = env_->DeleteFile(tmp); - } - return s.ok(); + return test::IsDirectIOSupported(env_, dbname_); } bool DBTestBase::IsMemoryMappedAccessSupported() const { diff --git a/util/testutil.cc b/util/testutil.cc index b1a79fe93..2f8e31cd5 100644 --- a/util/testutil.cc +++ b/util/testutil.cc @@ -401,5 +401,21 @@ Status DestroyDir(Env* env, const std::string& dir) { return s; } +bool IsDirectIOSupported(Env* env, const std::string& dir) { + EnvOptions env_options; + env_options.use_mmap_writes = false; + env_options.use_direct_writes = true; + std::string tmp = TempFileName(dir, 999); + Status s; + { + std::unique_ptr file; + s = env->NewWritableFile(tmp, &file, env_options); + } + if (s.ok()) { + s = env->DeleteFile(tmp); + } + return s.ok(); +} + } // namespace test } // namespace rocksdb diff --git a/util/testutil.h b/util/testutil.h index c8c7d60d2..2aab3df72 100644 --- a/util/testutil.h +++ b/util/testutil.h @@ -748,5 +748,7 @@ std::string RandomName(Random* rnd, const size_t len); Status DestroyDir(Env* env, const std::string& dir); +bool IsDirectIOSupported(Env* env, const std::string& dir); + } // namespace test } // namespace rocksdb diff --git a/utilities/backupable/backupable_db.cc b/utilities/backupable/backupable_db.cc index e51b724c9..78def188c 100644 --- a/utilities/backupable/backupable_db.cc +++ b/utilities/backupable/backupable_db.cc @@ -305,16 +305,16 @@ class BackupEngineImpl : public BackupEngine { // @param contents If non-empty, the file will be created with these contents. Status CopyOrCreateFile(const std::string& src, const std::string& dst, const std::string& contents, Env* src_env, - Env* dst_env, bool sync, RateLimiter* rate_limiter, + Env* dst_env, const EnvOptions& src_env_options, + bool sync, RateLimiter* rate_limiter, uint64_t* size = nullptr, uint32_t* checksum_value = nullptr, uint64_t size_limit = 0, std::function progress_callback = []() {}); - Status CalculateChecksum(const std::string& src, - Env* src_env, - uint64_t size_limit, - uint32_t* checksum_value); + Status CalculateChecksum(const std::string& src, Env* src_env, + const EnvOptions& src_env_options, + uint64_t size_limit, uint32_t* checksum_value); struct CopyOrCreateResult { uint64_t size; @@ -331,6 +331,7 @@ class BackupEngineImpl : public BackupEngine { std::string contents; Env* src_env; Env* dst_env; + EnvOptions src_env_options; bool sync; RateLimiter* rate_limiter; uint64_t size_limit; @@ -338,14 +339,15 @@ class BackupEngineImpl : public BackupEngine { std::function progress_callback; CopyOrCreateWorkItem() - : src_path(""), - dst_path(""), - contents(""), - src_env(nullptr), - dst_env(nullptr), - sync(false), - rate_limiter(nullptr), - size_limit(0) {} + : src_path(""), + dst_path(""), + contents(""), + src_env(nullptr), + dst_env(nullptr), + src_env_options(), + sync(false), + rate_limiter(nullptr), + size_limit(0) {} CopyOrCreateWorkItem(const CopyOrCreateWorkItem&) = delete; CopyOrCreateWorkItem& operator=(const CopyOrCreateWorkItem&) = delete; @@ -360,6 +362,7 @@ class BackupEngineImpl : public BackupEngine { contents = std::move(o.contents); src_env = o.src_env; dst_env = o.dst_env; + src_env_options = std::move(o.src_env_options); sync = o.sync; rate_limiter = o.rate_limiter; size_limit = o.size_limit; @@ -370,14 +373,15 @@ class BackupEngineImpl : public BackupEngine { CopyOrCreateWorkItem(std::string _src_path, std::string _dst_path, std::string _contents, Env* _src_env, Env* _dst_env, - bool _sync, RateLimiter* _rate_limiter, - uint64_t _size_limit, + EnvOptions _src_env_options, bool _sync, + RateLimiter* _rate_limiter, uint64_t _size_limit, std::function _progress_callback = []() {}) : src_path(std::move(_src_path)), dst_path(std::move(_dst_path)), contents(std::move(_contents)), src_env(_src_env), dst_env(_dst_env), + src_env_options(std::move(_src_env_options)), sync(_sync), rate_limiter(_rate_limiter), size_limit(_size_limit), @@ -471,7 +475,8 @@ class BackupEngineImpl : public BackupEngine { std::vector& backup_items_to_finish, BackupID backup_id, bool shared, const std::string& src_dir, const std::string& fname, // starts with "/" - RateLimiter* rate_limiter, uint64_t size_bytes, uint64_t size_limit = 0, + const EnvOptions& src_env_options, RateLimiter* rate_limiter, + uint64_t size_bytes, uint64_t size_limit = 0, bool shared_checksum = false, std::function progress_callback = []() {}, const std::string& contents = std::string()); @@ -723,9 +728,10 @@ Status BackupEngineImpl::Initialize() { CopyOrCreateResult result; result.status = CopyOrCreateFile( work_item.src_path, work_item.dst_path, work_item.contents, - work_item.src_env, work_item.dst_env, work_item.sync, - work_item.rate_limiter, &result.size, &result.checksum_value, - work_item.size_limit, work_item.progress_callback); + work_item.src_env, work_item.dst_env, work_item.src_env_options, + work_item.sync, work_item.rate_limiter, &result.size, + &result.checksum_value, work_item.size_limit, + work_item.progress_callback); work_item.result.set_value(std::move(result)); } }); @@ -796,8 +802,10 @@ Status BackupEngineImpl::CreateNewBackupWithMetadata( if (s.ok()) { CheckpointImpl checkpoint(db); uint64_t sequence_number = 0; + DBOptions db_options = db->GetDBOptions(); + EnvOptions src_raw_env_options(db_options); s = checkpoint.CreateCustomCheckpoint( - db->GetDBOptions(), + db_options, [&](const std::string& /*src_dirname*/, const std::string& /*fname*/, FileType) { // custom checkpoint will switch to calling copy_file_cb after it sees @@ -815,11 +823,33 @@ Status BackupEngineImpl::CreateNewBackupWithMetadata( if (type == kTableFile) { st = db_env_->GetFileSize(src_dirname + fname, &size_bytes); } + EnvOptions src_env_options; + switch (type) { + case kLogFile: + src_env_options = + db_env_->OptimizeForLogRead(src_raw_env_options); + break; + case kTableFile: + src_env_options = db_env_->OptimizeForCompactionTableRead( + src_raw_env_options, ImmutableDBOptions(db_options)); + break; + case kDescriptorFile: + src_env_options = + db_env_->OptimizeForManifestRead(src_raw_env_options); + break; + default: + // Other backed up files (like options file) are not read by live + // DB, so don't need to worry about avoiding mixing buffered and + // direct I/O. Just use plain defaults. + src_env_options = src_raw_env_options; + break; + } if (st.ok()) { st = AddBackupFileWorkItem( live_dst_paths, backup_items_to_finish, new_backup_id, options_.share_table_files && type == kTableFile, src_dirname, - fname, rate_limiter, size_bytes, size_limit_bytes, + fname, src_env_options, rate_limiter, size_bytes, + size_limit_bytes, options_.share_files_with_checksum && type == kTableFile, progress_callback); } @@ -829,8 +859,9 @@ Status BackupEngineImpl::CreateNewBackupWithMetadata( Log(options_.info_log, "add file for backup %s", fname.c_str()); return AddBackupFileWorkItem( live_dst_paths, backup_items_to_finish, new_backup_id, - false /* shared */, "" /* src_dir */, fname, rate_limiter, - contents.size(), 0 /* size_limit */, false /* shared_checksum */, + false /* shared */, "" /* src_dir */, fname, + EnvOptions() /* src_env_options */, rate_limiter, contents.size(), + 0 /* size_limit */, false /* shared_checksum */, progress_callback, contents); } /* create_file_cb */, &sequence_number, flush_before_backup ? 0 : port::kMaxUint64); @@ -1114,7 +1145,8 @@ Status BackupEngineImpl::RestoreDBFromBackup( dst.c_str()); CopyOrCreateWorkItem copy_or_create_work_item( GetAbsolutePath(file), dst, "" /* contents */, backup_env_, db_env_, - false, rate_limiter, 0 /* size_limit */); + EnvOptions() /* src_env_options */, false, rate_limiter, + 0 /* size_limit */); RestoreAfterCopyOrCreateWorkItem after_copy_or_create_work_item( copy_or_create_work_item.result.get_future(), file_info->checksum_value); @@ -1183,15 +1215,15 @@ Status BackupEngineImpl::VerifyBackup(BackupID backup_id) { Status BackupEngineImpl::CopyOrCreateFile( const std::string& src, const std::string& dst, const std::string& contents, - Env* src_env, Env* dst_env, bool sync, RateLimiter* rate_limiter, - uint64_t* size, uint32_t* checksum_value, uint64_t size_limit, - std::function progress_callback) { + Env* src_env, Env* dst_env, const EnvOptions& src_env_options, bool sync, + RateLimiter* rate_limiter, uint64_t* size, uint32_t* checksum_value, + uint64_t size_limit, std::function progress_callback) { assert(src.empty() != contents.empty()); Status s; std::unique_ptr dst_file; std::unique_ptr src_file; - EnvOptions env_options; - env_options.use_mmap_writes = false; + EnvOptions dst_env_options; + dst_env_options.use_mmap_writes = false; // TODO:(gzh) maybe use direct reads/writes here if possible if (size != nullptr) { *size = 0; @@ -1205,16 +1237,16 @@ Status BackupEngineImpl::CopyOrCreateFile( size_limit = std::numeric_limits::max(); } - s = dst_env->NewWritableFile(dst, &dst_file, env_options); + s = dst_env->NewWritableFile(dst, &dst_file, dst_env_options); if (s.ok() && !src.empty()) { - s = src_env->NewSequentialFile(src, &src_file, env_options); + s = src_env->NewSequentialFile(src, &src_file, src_env_options); } if (!s.ok()) { return s; } std::unique_ptr dest_writer( - new WritableFileWriter(std::move(dst_file), dst, env_options)); + new WritableFileWriter(std::move(dst_file), dst, dst_env_options)); std::unique_ptr src_reader; std::unique_ptr buf; if (!src.empty()) { @@ -1276,9 +1308,10 @@ Status BackupEngineImpl::AddBackupFileWorkItem( std::unordered_set& live_dst_paths, std::vector& backup_items_to_finish, BackupID backup_id, bool shared, const std::string& src_dir, - const std::string& fname, RateLimiter* rate_limiter, uint64_t size_bytes, - uint64_t size_limit, bool shared_checksum, - std::function progress_callback, const std::string& contents) { + const std::string& fname, const EnvOptions& src_env_options, + RateLimiter* rate_limiter, uint64_t size_bytes, uint64_t size_limit, + bool shared_checksum, std::function progress_callback, + const std::string& contents) { assert(!fname.empty() && fname[0] == '/'); assert(contents.empty() != src_dir.empty()); @@ -1289,7 +1322,7 @@ Status BackupEngineImpl::AddBackupFileWorkItem( if (shared && shared_checksum) { // add checksum and file length to the file name - s = CalculateChecksum(src_dir + fname, db_env_, size_limit, + s = CalculateChecksum(src_dir + fname, db_env_, src_env_options, size_limit, &checksum_value); if (!s.ok()) { return s; @@ -1365,8 +1398,8 @@ Status BackupEngineImpl::AddBackupFileWorkItem( // the file is present and referenced by a backup ROCKS_LOG_INFO(options_.info_log, "%s already present, calculate checksum", fname.c_str()); - s = CalculateChecksum(src_dir + fname, db_env_, size_limit, - &checksum_value); + s = CalculateChecksum(src_dir + fname, db_env_, src_env_options, + size_limit, &checksum_value); } } live_dst_paths.insert(final_dest_path); @@ -1376,8 +1409,8 @@ Status BackupEngineImpl::AddBackupFileWorkItem( copy_dest_path->c_str()); CopyOrCreateWorkItem copy_or_create_work_item( src_dir.empty() ? "" : src_dir + fname, *copy_dest_path, contents, - db_env_, backup_env_, options_.sync, rate_limiter, size_limit, - progress_callback); + db_env_, backup_env_, src_env_options, options_.sync, rate_limiter, + size_limit, progress_callback); BackupAfterCopyOrCreateWorkItem after_copy_or_create_work_item( copy_or_create_work_item.result.get_future(), shared, need_to_copy, backup_env_, temp_dest_path, final_dest_path, dst_relative); @@ -1399,6 +1432,7 @@ Status BackupEngineImpl::AddBackupFileWorkItem( } Status BackupEngineImpl::CalculateChecksum(const std::string& src, Env* src_env, + const EnvOptions& src_env_options, uint64_t size_limit, uint32_t* checksum_value) { *checksum_value = 0; @@ -1406,12 +1440,8 @@ Status BackupEngineImpl::CalculateChecksum(const std::string& src, Env* src_env, size_limit = std::numeric_limits::max(); } - EnvOptions env_options; - env_options.use_mmap_writes = false; - env_options.use_direct_reads = false; - std::unique_ptr src_file; - Status s = src_env->NewSequentialFile(src, &src_file, env_options); + Status s = src_env->NewSequentialFile(src, &src_file, src_env_options); if (!s.ok()) { return s; } diff --git a/utilities/backupable/backupable_db_test.cc b/utilities/backupable/backupable_db_test.cc index 15098570b..26ff00e91 100644 --- a/utilities/backupable/backupable_db_test.cc +++ b/utilities/backupable/backupable_db_test.cc @@ -188,7 +188,14 @@ class TestEnv : public EnvWrapper { new TestEnv::DummySequentialFile(dummy_sequential_file_fail_reads_)); return Status::OK(); } else { - return EnvWrapper::NewSequentialFile(f, r, options); + Status s = EnvWrapper::NewSequentialFile(f, r, options); + if (s.ok()) { + if ((*r)->use_direct_io()) { + ++num_direct_seq_readers_; + } + ++num_seq_readers_; + } + return s; } } @@ -200,7 +207,28 @@ class TestEnv : public EnvWrapper { return Status::NotSupported("Sorry, can't do this"); } limit_written_files_--; - return EnvWrapper::NewWritableFile(f, r, options); + Status s = EnvWrapper::NewWritableFile(f, r, options); + if (s.ok()) { + if ((*r)->use_direct_io()) { + ++num_direct_writers_; + } + ++num_writers_; + } + return s; + } + + virtual Status NewRandomAccessFile(const std::string& fname, + unique_ptr* result, + const EnvOptions& options) override { + MutexLock l(&mutex_); + Status s = EnvWrapper::NewRandomAccessFile(fname, result, options); + if (s.ok()) { + if ((*result)->use_direct_io()) { + ++num_direct_rand_readers_; + } + ++num_rand_readers_; + } + return s; } virtual Status DeleteFile(const std::string& fname) override { @@ -316,6 +344,23 @@ class TestEnv : public EnvWrapper { return EnvWrapper::NewDirectory(name, result); } + void ClearFileOpenCounters() { + MutexLock l(&mutex_); + num_rand_readers_ = 0; + num_direct_rand_readers_ = 0; + num_seq_readers_ = 0; + num_direct_seq_readers_ = 0; + num_writers_ = 0; + num_direct_writers_ = 0; + } + + int num_rand_readers() { return num_rand_readers_; } + int num_direct_rand_readers() { return num_direct_rand_readers_; } + int num_seq_readers() { return num_seq_readers_; } + int num_direct_seq_readers() { return num_direct_seq_readers_; } + int num_writers() { return num_writers_; } + int num_direct_writers() { return num_direct_writers_; } + private: port::Mutex mutex_; bool dummy_sequential_file_ = false; @@ -329,6 +374,15 @@ class TestEnv : public EnvWrapper { bool get_children_failure_ = false; bool create_dir_if_missing_failure_ = false; bool new_directory_failure_ = false; + + // Keeps track of how many files of each type were successfully opened, and + // out of those, how many were opened with direct I/O. + std::atomic num_rand_readers_; + std::atomic num_direct_rand_readers_; + std::atomic num_seq_readers_; + std::atomic num_direct_seq_readers_; + std::atomic num_writers_; + std::atomic num_direct_writers_; }; // TestEnv class FileManager : public EnvWrapper { @@ -1634,6 +1688,59 @@ TEST_F(BackupableDBTest, WriteOnlyEngineNoSharedFileDeletion) { AssertBackupConsistency(i + 1, 0, (i + 1) * kNumKeys); } } + +TEST_P(BackupableDBTestWithParam, BackupUsingDirectIO) { + // Tests direct I/O on the backup engine's reads and writes on the DB env and + // backup env + // We use ChrootEnv underneath so the below line checks for direct I/O support + // in the chroot directory, not the true filesystem root. + if (!test::IsDirectIOSupported(test_db_env_.get(), "/")) { + return; + } + const int kNumKeysPerBackup = 100; + const int kNumBackups = 3; + options_.use_direct_reads = true; + OpenDBAndBackupEngine(true /* destroy_old_data */); + for (int i = 0; i < kNumBackups; ++i) { + FillDB(db_.get(), i * kNumKeysPerBackup /* from */, + (i + 1) * kNumKeysPerBackup /* to */); + ASSERT_OK(db_->Flush(FlushOptions())); + + // Clear the file open counters and then do a bunch of backup engine ops. + // For all ops, files should be opened in direct mode. + test_backup_env_->ClearFileOpenCounters(); + test_db_env_->ClearFileOpenCounters(); + CloseBackupEngine(); + OpenBackupEngine(); + ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(), + false /* flush_before_backup */)); + ASSERT_OK(backup_engine_->VerifyBackup(i + 1)); + CloseBackupEngine(); + OpenBackupEngine(); + std::vector backup_infos; + backup_engine_->GetBackupInfo(&backup_infos); + ASSERT_EQ(static_cast(i + 1), backup_infos.size()); + + // Verify backup engine always opened files with direct I/O + ASSERT_EQ(0, test_db_env_->num_writers()); + ASSERT_EQ(0, test_db_env_->num_rand_readers()); + ASSERT_GT(test_db_env_->num_direct_seq_readers(), 0); + // Currently the DB doesn't support reading WALs or manifest with direct + // I/O, so subtract two. + ASSERT_EQ(test_db_env_->num_seq_readers() - 2, + test_db_env_->num_direct_seq_readers()); + ASSERT_EQ(0, test_db_env_->num_rand_readers()); + } + CloseDBAndBackupEngine(); + + for (int i = 0; i < kNumBackups; ++i) { + AssertBackupConsistency(i + 1 /* backup_id */, + i * kNumKeysPerBackup /* start_exist */, + (i + 1) * kNumKeysPerBackup /* end_exist */, + (i + 2) * kNumKeysPerBackup /* end */); + } +} + } // anon namespace } // namespace rocksdb