Be able to decrease background thread's CPU priority when creating database backup (#6602)

Summary:
When creating a database backup, the background threads will not only consume IO resources by copying files, but also consuming CPU such as by computing checksums. During peak times, the CPU consumption by the background threads might affect online queries.

This PR makes it possible to decrease CPU priority of these threads when creating a new backup.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6602

Test Plan: make check

Reviewed By: siying, zhichao-cao

Differential Revision: D20683216

Pulled By: cheng-chang

fbshipit-source-id: 9978b9ed9488e8ce135e90ca083e5b4b7221fd84
main
Cheng Chang 5 years ago committed by Facebook GitHub Bot
parent 3a35542f86
commit ee50b8d499
  1. 1
      HISTORY.md
  2. 2
      db/db_impl/db_impl.cc
  3. 13
      db/db_impl/db_impl_compaction_flush.cc
  4. 2
      db/error_handler.cc
  5. 10
      db/error_handler_fs_test.cc
  6. 6
      db/version_set_test.cc
  7. 4
      file/filename.cc
  8. 4
      file/filename.h
  9. 7
      include/rocksdb/options.h
  10. 103
      include/rocksdb/utilities/backupable_db.h
  11. 30
      port/port_posix.cc
  12. 5
      port/port_posix.h
  13. 23
      port/win/port_win.cc
  14. 4
      port/win/port_win.h
  15. 8
      table/plain/plain_table_builder.cc
  16. 8
      util/threadpool_imp.cc
  17. 111
      utilities/backupable/backupable_db.cc
  18. 81
      utilities/backupable/backupable_db_test.cc

@ -4,6 +4,7 @@
* Fix spelling so that API now has correctly spelled transaction state name `COMMITTED`, while the old misspelled `COMMITED` is still available as an alias.
* Updated default format_version in BlockBasedTableOptions from 2 to 4. SST files generated with the new default can be read by RocksDB versions 5.16 and newer, and use more efficient encoding of keys in index blocks.
* `Cache::Insert` now expects clients to pass in function objects implementing the `Cache::Deleter` interface as deleters instead of plain function pointers.
* A new parameter `CreateBackupOptions` is added to both `BackupEngine::CreateNewBackup` and `BackupEngine::CreateNewBackupWithMetadata`, you can decrease CPU priority of `BackupEngine`'s background threads by setting `decrease_background_thread_cpu_priority` and `background_thread_cpu_priority` in `CreateBackupOptions`.
### Bug Fixes
* Fix a bug where range tombstone blocks in ingested files were cached incorrectly during ingestion. If range tombstones were read from those incorrectly cached blocks, the keys they covered would be exposed.

@ -312,7 +312,7 @@ Status DBImpl::ResumeImpl() {
}
// Make sure the IO Status stored in version set is set to OK.
if(s.ok()) {
if (s.ok()) {
versions_->SetIOStatusOK();
}

@ -208,7 +208,8 @@ Status DBImpl::FlushMemTableToOutputFile(
}
if (!s.ok() && !s.IsShutdownInProgress() && !s.IsColumnFamilyDropped()) {
if (!io_s.ok()&& !io_s.IsShutdownInProgress() && !io_s.IsColumnFamilyDropped()) {
if (!io_s.ok() && !io_s.IsShutdownInProgress() &&
!io_s.IsColumnFamilyDropped()) {
error_handler_.SetBGError(io_s, BackgroundErrorReason::kFlush);
} else {
Status new_bg_error = s;
@ -2876,11 +2877,11 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
} else {
ROCKS_LOG_WARN(immutable_db_options_.info_log, "Compaction error: %s",
status.ToString().c_str());
if (!io_s.ok()) {
error_handler_.SetBGError(io_s, BackgroundErrorReason::kCompaction);
} else {
error_handler_.SetBGError(status, BackgroundErrorReason::kCompaction);
}
if (!io_s.ok()) {
error_handler_.SetBGError(io_s, BackgroundErrorReason::kCompaction);
} else {
error_handler_.SetBGError(status, BackgroundErrorReason::kCompaction);
}
if (c != nullptr && !is_manual && !error_handler_.IsBGWorkStopped()) {
// Put this cfd back in the compaction queue so we can retry after some
// time

@ -256,7 +256,7 @@ Status ErrorHandler::SetBGError(const IOStatus& bg_io_err,
Status bg_err(new_bg_io_err, Status::Severity::kUnrecoverableError);
bg_error_ = bg_err;
EventHelpers::NotifyOnBackgroundError(db_options_.listeners, reason, &s,
db_mutex_, &auto_recovery);
db_mutex_, &auto_recovery);
return bg_error_;
} else if (bg_io_err.GetRetryable()) {
// Second, check if the error is a retryable IO error or not. if it is

@ -183,7 +183,7 @@ TEST_F(DBErrorHandlingFSTest, FLushWriteError) {
TEST_F(DBErrorHandlingFSTest, FLushWritRetryableeError) {
std::shared_ptr<FaultInjectionTestFS> fault_fs(
new FaultInjectionTestFS(FileSystem::Default()));
new FaultInjectionTestFS(FileSystem::Default()));
std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(fault_fs));
std::shared_ptr<ErrorHandlerFSListener> listener(
new ErrorHandlerFSListener());
@ -289,7 +289,7 @@ TEST_F(DBErrorHandlingFSTest, ManifestWriteError) {
TEST_F(DBErrorHandlingFSTest, ManifestWriteRetryableError) {
std::shared_ptr<FaultInjectionTestFS> fault_fs(
new FaultInjectionTestFS(FileSystem::Default()));
new FaultInjectionTestFS(FileSystem::Default()));
std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(fault_fs));
std::shared_ptr<ErrorHandlerFSListener> listener(
new ErrorHandlerFSListener());
@ -457,7 +457,7 @@ TEST_F(DBErrorHandlingFSTest, CompactionManifestWriteError) {
TEST_F(DBErrorHandlingFSTest, CompactionManifestWriteRetryableError) {
std::shared_ptr<FaultInjectionTestFS> fault_fs(
new FaultInjectionTestFS(FileSystem::Default()));
new FaultInjectionTestFS(FileSystem::Default()));
std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(fault_fs));
std::shared_ptr<ErrorHandlerFSListener> listener(
new ErrorHandlerFSListener());
@ -556,7 +556,7 @@ TEST_F(DBErrorHandlingFSTest, CompactionWriteError) {
TEST_F(DBErrorHandlingFSTest, CompactionWriteRetryableError) {
std::shared_ptr<FaultInjectionTestFS> fault_fs(
new FaultInjectionTestFS(FileSystem::Default()));
new FaultInjectionTestFS(FileSystem::Default()));
std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(fault_fs));
std::shared_ptr<ErrorHandlerFSListener> listener(
new ErrorHandlerFSListener());
@ -778,7 +778,7 @@ TEST_F(DBErrorHandlingFSTest, WALWriteError) {
TEST_F(DBErrorHandlingFSTest, WALWriteRetryableError) {
std::shared_ptr<FaultInjectionTestFS> fault_fs(
new FaultInjectionTestFS(FileSystem::Default()));
new FaultInjectionTestFS(FileSystem::Default()));
std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(fault_fs));
std::shared_ptr<ErrorHandlerFSListener> listener(
new ErrorHandlerFSListener());

@ -1378,7 +1378,8 @@ class EmptyDefaultCfNewManifest : public VersionSetTestBase,
TEST_F(EmptyDefaultCfNewManifest, Recover) {
PrepareManifest(nullptr, nullptr, &log_writer_);
log_writer_.reset();
Status s = SetCurrentFile(fs_.get(), dbname_, 1, /*directory_to_fsync=*/nullptr);
Status s =
SetCurrentFile(fs_.get(), dbname_, 1, /*directory_to_fsync=*/nullptr);
ASSERT_OK(s);
std::string manifest_path;
VerifyManifest(&manifest_path);
@ -1440,7 +1441,8 @@ TEST_P(VersionSetTestEmptyDb, OpenFromIncompleteManifest0) {
db_options_.write_dbid_to_manifest = std::get<0>(GetParam());
PrepareManifest(nullptr, nullptr, &log_writer_);
log_writer_.reset();
Status s = SetCurrentFile(fs_.get(), dbname_, 1, /*directory_to_fsync=*/nullptr);
Status s =
SetCurrentFile(fs_.get(), dbname_, 1, /*directory_to_fsync=*/nullptr);
ASSERT_OK(s);
std::string manifest_path;

@ -369,8 +369,8 @@ bool ParseFileName(const std::string& fname, uint64_t* number,
}
IOStatus SetCurrentFile(FileSystem* fs, const std::string& dbname,
uint64_t descriptor_number,
FSDirectory* directory_to_fsync) {
uint64_t descriptor_number,
FSDirectory* directory_to_fsync) {
// Remove leading "dbname/" and add newline to manifest file name
std::string manifest = DescriptorFileName(dbname, descriptor_number);
Slice contents = manifest;

@ -170,8 +170,8 @@ extern bool ParseFileName(const std::string& filename, uint64_t* number,
// Make the CURRENT file point to the descriptor file with the
// specified number.
extern IOStatus SetCurrentFile(FileSystem* fs, const std::string& dbname,
uint64_t descriptor_number,
FSDirectory* directory_to_fsync);
uint64_t descriptor_number,
FSDirectory* directory_to_fsync);
// Make the IDENTITY file for the db
extern Status SetIdentityFile(Env* env, const std::string& dbname,

@ -51,6 +51,13 @@ class InternalKeyComparator;
class WalFilter;
class FileSystem;
enum class CpuPriority {
kIdle = 0,
kLow = 1,
kNormal = 2,
kHigh = 3,
};
// DB contents are stored in a set of blocks, each of which holds a
// sequence of key,value pairs. Each block may be compressed before
// being stored in a file. The following enum describes which

@ -19,6 +19,7 @@
#include "rocksdb/utilities/stackable_db.h"
#include "rocksdb/env.h"
#include "rocksdb/options.h"
#include "rocksdb/status.h"
namespace ROCKSDB_NAMESPACE {
@ -142,6 +143,24 @@ struct BackupableDBOptions {
}
};
struct CreateBackupOptions {
// Flush will always trigger if 2PC is enabled.
// If write-ahead logs are disabled, set flush_before_backup=true to
// avoid losing unflushed key/value pairs from the memtable.
bool flush_before_backup = false;
// Callback for reporting progress.
std::function<void()> progress_callback = []() {};
// If false, background_thread_cpu_priority is ignored.
// Otherwise, the cpu priority can be decreased,
// if you try to increase the priority, the priority will not change.
// The initial priority of the threads is CpuPriority::kNormal,
// so you can decrease to priorities lower than kNormal.
bool decrease_background_thread_cpu_priority = false;
CpuPriority background_thread_cpu_priority = CpuPriority::kNormal;
};
struct RestoreOptions {
// If true, restore won't overwrite the existing log files in wal_dir. It will
// also move all log files from archive directory to wal_dir. Use this option
@ -208,8 +227,13 @@ class BackupEngineReadOnly {
public:
virtual ~BackupEngineReadOnly() {}
static Status Open(Env* db_env, const BackupableDBOptions& options,
static Status Open(const BackupableDBOptions& options, Env* db_env,
BackupEngineReadOnly** backup_engine_ptr);
// keep for backward compatibility.
static Status Open(Env* db_env, const BackupableDBOptions& options,
BackupEngineReadOnly** backup_engine_ptr) {
return BackupEngineReadOnly::Open(options, db_env, backup_engine_ptr);
}
// Returns info about backups in backup_info
// You can GetBackupInfo safely, even with other BackupEngine performing
@ -225,14 +249,29 @@ class BackupEngineReadOnly {
// responsibility to synchronize the operation, i.e. don't delete the backup
// when you're restoring from it
// See also the corresponding doc in BackupEngine
virtual Status RestoreDBFromBackup(const RestoreOptions& options,
BackupID backup_id,
const std::string& db_dir,
const std::string& wal_dir) = 0;
// keep for backward compatibility.
virtual Status RestoreDBFromBackup(
BackupID backup_id, const std::string& db_dir, const std::string& wal_dir,
const RestoreOptions& restore_options = RestoreOptions()) = 0;
const RestoreOptions& options = RestoreOptions()) {
return RestoreDBFromBackup(options, backup_id, db_dir, wal_dir);
}
// See the corresponding doc in BackupEngine
virtual Status RestoreDBFromLatestBackup(const RestoreOptions& options,
const std::string& db_dir,
const std::string& wal_dir) = 0;
// keep for backward compatibility.
virtual Status RestoreDBFromLatestBackup(
const std::string& db_dir, const std::string& wal_dir,
const RestoreOptions& restore_options = RestoreOptions()) = 0;
const RestoreOptions& options = RestoreOptions()) {
return RestoreDBFromLatestBackup(options, db_dir, wal_dir);
}
// checks that each file exists and that the size of the file matches our
// expectations. it does not check file checksum.
@ -253,27 +292,44 @@ class BackupEngine {
// BackupableDBOptions have to be the same as the ones used in previous
// BackupEngines for the same backup directory.
static Status Open(Env* db_env, const BackupableDBOptions& options,
static Status Open(const BackupableDBOptions& options, Env* db_env,
BackupEngine** backup_engine_ptr);
// same as CreateNewBackup, but stores extra application metadata
// Flush will always trigger if 2PC is enabled.
// If write-ahead logs are disabled, set flush_before_backup=true to
// avoid losing unflushed key/value pairs from the memtable.
// keep for backward compatibility.
static Status Open(Env* db_env, const BackupableDBOptions& options,
BackupEngine** backup_engine_ptr) {
return BackupEngine::Open(options, db_env, backup_engine_ptr);
}
// same as CreateNewBackup, but stores extra application metadata.
virtual Status CreateNewBackupWithMetadata(
const CreateBackupOptions& options, DB* db,
const std::string& app_metadata) = 0;
// keep here for backward compatibility.
virtual Status CreateNewBackupWithMetadata(
DB* db, const std::string& app_metadata, bool flush_before_backup = false,
std::function<void()> progress_callback = []() {}) = 0;
std::function<void()> progress_callback = []() {}) {
CreateBackupOptions options;
options.flush_before_backup = flush_before_backup;
options.progress_callback = progress_callback;
return CreateNewBackupWithMetadata(options, db, app_metadata);
}
// Captures the state of the database in the latest backup
// NOT a thread safe call
// Flush will always trigger if 2PC is enabled.
// If write-ahead logs are disabled, set flush_before_backup=true to
// avoid losing unflushed key/value pairs from the memtable.
virtual Status CreateNewBackup(const CreateBackupOptions& options, DB* db) {
return CreateNewBackupWithMetadata(options, db, "");
}
// keep here for backward compatibility.
virtual Status CreateNewBackup(DB* db, bool flush_before_backup = false,
std::function<void()> progress_callback =
[]() {}) {
return CreateNewBackupWithMetadata(db, "", flush_before_backup,
progress_callback);
CreateBackupOptions options;
options.flush_before_backup = flush_before_backup;
options.progress_callback = progress_callback;
return CreateNewBackup(options, db);
}
// Deletes old backups, keeping latest num_backups_to_keep alive.
@ -313,14 +369,29 @@ class BackupEngine {
// database will diverge from backups 4 and 5 and the new backup will fail.
// If you want to create new backup, you will first have to delete backups 4
// and 5.
virtual Status RestoreDBFromBackup(const RestoreOptions& options,
BackupID backup_id,
const std::string& db_dir,
const std::string& wal_dir) = 0;
// keep for backward compatibility.
virtual Status RestoreDBFromBackup(
BackupID backup_id, const std::string& db_dir, const std::string& wal_dir,
const RestoreOptions& restore_options = RestoreOptions()) = 0;
const RestoreOptions& options = RestoreOptions()) {
return RestoreDBFromBackup(options, backup_id, db_dir, wal_dir);
}
// restore from the latest backup
virtual Status RestoreDBFromLatestBackup(const RestoreOptions& options,
const std::string& db_dir,
const std::string& wal_dir) = 0;
// keep for backward compatibility.
virtual Status RestoreDBFromLatestBackup(
const std::string& db_dir, const std::string& wal_dir,
const RestoreOptions& restore_options = RestoreOptions()) = 0;
const RestoreOptions& options = RestoreOptions()) {
return RestoreDBFromLatestBackup(options, db_dir, wal_dir);
}
// checks that each file exists and that the size of the file matches our
// expectations. it does not check file checksum.

@ -19,6 +19,7 @@
#include <stdio.h>
#include <string.h>
#include <sys/resource.h>
#include <sys/syscall.h>
#include <sys/time.h>
#include <unistd.h>
#include <cstdlib>
@ -230,5 +231,34 @@ static size_t GetPageSize() {
const size_t kPageSize = GetPageSize();
void SetCpuPriority(ThreadId id, CpuPriority priority) {
#ifdef OS_LINUX
sched_param param;
param.sched_priority = 0;
switch (priority) {
case CpuPriority::kHigh:
sched_setscheduler(id, SCHED_OTHER, &param);
setpriority(PRIO_PROCESS, id, -20);
break;
case CpuPriority::kNormal:
sched_setscheduler(id, SCHED_OTHER, &param);
setpriority(PRIO_PROCESS, id, 0);
break;
case CpuPriority::kLow:
sched_setscheduler(id, SCHED_OTHER, &param);
setpriority(PRIO_PROCESS, id, 19);
break;
case CpuPriority::kIdle:
sched_setscheduler(id, SCHED_IDLE, &param);
break;
default:
assert(false);
}
#else
(void)id;
(void)priority;
#endif
}
} // namespace port
} // namespace ROCKSDB_NAMESPACE

@ -13,6 +13,7 @@
#include <thread>
#include "rocksdb/options.h"
#include "rocksdb/rocksdb_namespace.h"
// size_t printf formatting named in the manner of C99 standard formatting
@ -214,5 +215,9 @@ extern int GetMaxOpenFiles();
extern const size_t kPageSize;
using ThreadId = pid_t;
extern void SetCpuPriority(ThreadId id, CpuPriority priority);
} // namespace port
} // namespace ROCKSDB_NAMESPACE

@ -159,20 +159,19 @@ DIR* opendir(const char* name) {
std::unique_ptr<DIR> dir(new DIR);
dir->handle_ = RX_FindFirstFileEx(RX_FN(pattern).c_str(),
FindExInfoBasic, // Do not want alternative name
&dir->data_,
FindExSearchNameMatch,
NULL, // lpSearchFilter
0);
dir->handle_ =
RX_FindFirstFileEx(RX_FN(pattern).c_str(),
FindExInfoBasic, // Do not want alternative name
&dir->data_, FindExSearchNameMatch,
NULL, // lpSearchFilter
0);
if (dir->handle_ == INVALID_HANDLE_VALUE) {
return nullptr;
}
RX_FILESTRING x(dir->data_.cFileName, RX_FNLEN(dir->data_.cFileName));
strcpy_s(dir->entry_.d_name, sizeof(dir->entry_.d_name),
FN_TO_RX(x).c_str());
strcpy_s(dir->entry_.d_name, sizeof(dir->entry_.d_name), FN_TO_RX(x).c_str());
return dir.release();
}
@ -195,7 +194,7 @@ struct dirent* readdir(DIR* dirp) {
}
RX_FILESTRING x(dirp->data_.cFileName, RX_FNLEN(dirp->data_.cFileName));
strcpy_s(dirp->entry_.d_name, sizeof(dirp->entry_.d_name),
strcpy_s(dirp->entry_.d_name, sizeof(dirp->entry_.d_name),
FN_TO_RX(x).c_str());
return &dirp->entry_;
@ -265,5 +264,11 @@ int GetMaxOpenFiles() { return -1; }
// Assume 4KB page size
const size_t kPageSize = 4U * 1024U;
void SetCpuPriority(ThreadId id, CpuPriority priority) {
// Not supported
(void)id;
(void)priority;
}
} // namespace port
} // namespace ROCKSDB_NAMESPACE

@ -336,6 +336,10 @@ extern int GetMaxOpenFiles();
std::string utf16_to_utf8(const std::wstring& utf16);
std::wstring utf8_to_utf16(const std::string& utf8);
using ThreadId = int;
extern void SetCpuPriority(ThreadId id, CpuPriority priority);
} // namespace port

@ -258,12 +258,8 @@ Status PlainTableBuilder::Finish() {
// -- Write property block
BlockHandle property_block_handle;
IOStatus s = WriteBlock(
property_block_builder.Finish(),
file_,
&offset_,
&property_block_handle
);
IOStatus s = WriteBlock(property_block_builder.Finish(), file_, &offset_,
&property_block_handle);
if (!s.ok()) {
return std::move(s);
}

@ -232,12 +232,8 @@ void ThreadPoolImpl::Impl::BGThread(size_t thread_id) {
#ifdef OS_LINUX
if (decrease_cpu_priority) {
setpriority(
PRIO_PROCESS,
// Current thread.
0,
// Lowest priority possible.
19);
// 0 means current thread.
port::SetCpuPriority(0, CpuPriority::kLow);
low_cpu_priority = true;
}

@ -88,32 +88,41 @@ void BackupableDBOptions::Dump(Logger* logger) const {
// -------- BackupEngineImpl class ---------
class BackupEngineImpl : public BackupEngine {
public:
BackupEngineImpl(Env* db_env, const BackupableDBOptions& options,
BackupEngineImpl(const BackupableDBOptions& options, Env* db_env,
bool read_only = false);
~BackupEngineImpl() override;
Status CreateNewBackupWithMetadata(DB* db, const std::string& app_metadata,
bool flush_before_backup = false,
std::function<void()> progress_callback =
[]() {}) override;
using BackupEngine::CreateNewBackupWithMetadata;
Status CreateNewBackupWithMetadata(const CreateBackupOptions& options, DB* db,
const std::string& app_metadata) override;
Status PurgeOldBackups(uint32_t num_backups_to_keep) override;
Status DeleteBackup(BackupID backup_id) override;
void StopBackup() override {
stop_backup_.store(true, std::memory_order_release);
}
Status GarbageCollect() override;
// The returned BackupInfos are in chronological order, which means the
// latest backup comes last.
void GetBackupInfo(std::vector<BackupInfo>* backup_info) override;
void GetCorruptedBackups(std::vector<BackupID>* corrupt_backup_ids) override;
Status RestoreDBFromBackup(
BackupID backup_id, const std::string& db_dir, const std::string& wal_dir,
const RestoreOptions& restore_options = RestoreOptions()) override;
Status RestoreDBFromLatestBackup(
const std::string& db_dir, const std::string& wal_dir,
const RestoreOptions& restore_options = RestoreOptions()) override {
return RestoreDBFromBackup(latest_valid_backup_id_, db_dir, wal_dir,
restore_options);
using BackupEngine::RestoreDBFromBackup;
Status RestoreDBFromBackup(const RestoreOptions& options, BackupID backup_id,
const std::string& db_dir,
const std::string& wal_dir) override;
using BackupEngine::RestoreDBFromLatestBackup;
Status RestoreDBFromLatestBackup(const RestoreOptions& options,
const std::string& db_dir,
const std::string& wal_dir) override {
return RestoreDBFromBackup(options, latest_valid_backup_id_, db_dir,
wal_dir);
}
Status VerifyBackup(BackupID backup_id) override;
@ -459,6 +468,7 @@ class BackupEngineImpl : public BackupEngine {
std::mutex byte_report_mutex_;
channel<CopyOrCreateWorkItem> files_to_copy_or_create_;
std::vector<port::Thread> threads_;
std::atomic<CpuPriority> threads_cpu_priority_;
// Certain operations like PurgeOldBackups and DeleteBackup will trigger
// automatic GarbageCollect (true) unless we've already done one in this
// session and have not failed to delete backup files since then (false).
@ -512,10 +522,10 @@ class BackupEngineImpl : public BackupEngine {
static const size_t kMaxAppMetaSize = 1024 * 1024; // 1MB
};
Status BackupEngine::Open(Env* env, const BackupableDBOptions& options,
Status BackupEngine::Open(const BackupableDBOptions& options, Env* env,
BackupEngine** backup_engine_ptr) {
std::unique_ptr<BackupEngineImpl> backup_engine(
new BackupEngineImpl(env, options));
new BackupEngineImpl(options, env));
auto s = backup_engine->Initialize();
if (!s.ok()) {
*backup_engine_ptr = nullptr;
@ -525,9 +535,8 @@ Status BackupEngine::Open(Env* env, const BackupableDBOptions& options,
return Status::OK();
}
BackupEngineImpl::BackupEngineImpl(Env* db_env,
const BackupableDBOptions& options,
bool read_only)
BackupEngineImpl::BackupEngineImpl(const BackupableDBOptions& options,
Env* db_env, bool read_only)
: initialized_(false),
latest_backup_id_(0),
latest_valid_backup_id_(0),
@ -730,6 +739,8 @@ Status BackupEngineImpl::Initialize() {
// set up threads perform copies from files_to_copy_or_create_ in the
// background
threads_cpu_priority_ = CpuPriority::kNormal;
threads_.reserve(options_.max_background_operations);
for (int t = 0; t < options_.max_background_operations; t++) {
threads_.emplace_back([this]() {
#if defined(_GNU_SOURCE) && defined(__GLIBC_PREREQ)
@ -737,8 +748,16 @@ Status BackupEngineImpl::Initialize() {
pthread_setname_np(pthread_self(), "backup_engine");
#endif
#endif
CpuPriority current_priority = CpuPriority::kNormal;
CopyOrCreateWorkItem work_item;
while (files_to_copy_or_create_.read(work_item)) {
CpuPriority priority = threads_cpu_priority_;
if (current_priority != priority) {
TEST_SYNC_POINT_CALLBACK(
"BackupEngineImpl::Initialize:SetCpuPriority", &priority);
port::SetCpuPriority(0, priority);
current_priority = priority;
}
CopyOrCreateResult result;
result.status = CopyOrCreateFile(
work_item.src_path, work_item.dst_path, work_item.contents,
@ -756,14 +775,20 @@ Status BackupEngineImpl::Initialize() {
}
Status BackupEngineImpl::CreateNewBackupWithMetadata(
DB* db, const std::string& app_metadata, bool flush_before_backup,
std::function<void()> progress_callback) {
const CreateBackupOptions& options, DB* db,
const std::string& app_metadata) {
assert(initialized_);
assert(!read_only_);
if (app_metadata.size() > kMaxAppMetaSize) {
return Status::InvalidArgument("App metadata too large");
}
if (options.decrease_background_thread_cpu_priority) {
if (options.background_thread_cpu_priority < threads_cpu_priority_) {
threads_cpu_priority_.store(options.background_thread_cpu_priority);
}
}
BackupID new_backup_id = latest_backup_id_ + 1;
assert(backups_.find(new_backup_id) == backups_.end());
@ -869,7 +894,7 @@ Status BackupEngineImpl::CreateNewBackupWithMetadata(
fname, src_env_options, rate_limiter, size_bytes,
size_limit_bytes,
options_.share_files_with_checksum && type == kTableFile,
progress_callback);
options.progress_callback);
}
return st;
} /* copy_file_cb */,
@ -880,9 +905,9 @@ Status BackupEngineImpl::CreateNewBackupWithMetadata(
false /* shared */, "" /* src_dir */, fname,
EnvOptions() /* src_env_options */, rate_limiter, contents.size(),
0 /* size_limit */, false /* shared_checksum */,
progress_callback, contents);
options.progress_callback, contents);
} /* create_file_cb */,
&sequence_number, flush_before_backup ? 0 : port::kMaxUint64);
&sequence_number, options.flush_before_backup ? 0 : port::kMaxUint64);
if (s.ok()) {
new_backup->SetSequenceNumber(sequence_number);
}
@ -1105,9 +1130,10 @@ BackupEngineImpl::GetCorruptedBackups(
}
}
Status BackupEngineImpl::RestoreDBFromBackup(
BackupID backup_id, const std::string& db_dir, const std::string& wal_dir,
const RestoreOptions& restore_options) {
Status BackupEngineImpl::RestoreDBFromBackup(const RestoreOptions& options,
BackupID backup_id,
const std::string& db_dir,
const std::string& wal_dir) {
assert(initialized_);
auto corrupt_itr = corrupt_backups_.find(backup_id);
if (corrupt_itr != corrupt_backups_.end()) {
@ -1124,13 +1150,13 @@ Status BackupEngineImpl::RestoreDBFromBackup(
ROCKS_LOG_INFO(options_.info_log, "Restoring backup id %u\n", backup_id);
ROCKS_LOG_INFO(options_.info_log, "keep_log_files: %d\n",
static_cast<int>(restore_options.keep_log_files));
static_cast<int>(options.keep_log_files));
// just in case. Ignore errors
db_env_->CreateDirIfMissing(db_dir);
db_env_->CreateDirIfMissing(wal_dir);
if (restore_options.keep_log_files) {
if (options.keep_log_files) {
// delete files in db_dir, but keep all the log files
DeleteChildren(db_dir, 1 << kLogFile);
// move all the files from archive dir to wal_dir
@ -1928,8 +1954,8 @@ Status BackupEngineImpl::BackupMeta::StoreToFile(bool sync) {
// -------- BackupEngineReadOnlyImpl ---------
class BackupEngineReadOnlyImpl : public BackupEngineReadOnly {
public:
BackupEngineReadOnlyImpl(Env* db_env, const BackupableDBOptions& options)
: backup_engine_(new BackupEngineImpl(db_env, options, true)) {}
BackupEngineReadOnlyImpl(const BackupableDBOptions& options, Env* db_env)
: backup_engine_(new BackupEngineImpl(options, db_env, true)) {}
~BackupEngineReadOnlyImpl() override {}
@ -1943,18 +1969,19 @@ class BackupEngineReadOnlyImpl : public BackupEngineReadOnly {
backup_engine_->GetCorruptedBackups(corrupt_backup_ids);
}
Status RestoreDBFromBackup(
BackupID backup_id, const std::string& db_dir, const std::string& wal_dir,
const RestoreOptions& restore_options = RestoreOptions()) override {
return backup_engine_->RestoreDBFromBackup(backup_id, db_dir, wal_dir,
restore_options);
using BackupEngineReadOnly::RestoreDBFromBackup;
Status RestoreDBFromBackup(const RestoreOptions& options, BackupID backup_id,
const std::string& db_dir,
const std::string& wal_dir) override {
return backup_engine_->RestoreDBFromBackup(options, backup_id, db_dir,
wal_dir);
}
Status RestoreDBFromLatestBackup(
const std::string& db_dir, const std::string& wal_dir,
const RestoreOptions& restore_options = RestoreOptions()) override {
return backup_engine_->RestoreDBFromLatestBackup(db_dir, wal_dir,
restore_options);
using BackupEngineReadOnly::RestoreDBFromLatestBackup;
Status RestoreDBFromLatestBackup(const RestoreOptions& options,
const std::string& db_dir,
const std::string& wal_dir) override {
return backup_engine_->RestoreDBFromLatestBackup(options, db_dir, wal_dir);
}
Status VerifyBackup(BackupID backup_id) override {
@ -1967,14 +1994,14 @@ class BackupEngineReadOnlyImpl : public BackupEngineReadOnly {
std::unique_ptr<BackupEngineImpl> backup_engine_;
};
Status BackupEngineReadOnly::Open(Env* env, const BackupableDBOptions& options,
Status BackupEngineReadOnly::Open(const BackupableDBOptions& options, Env* env,
BackupEngineReadOnly** backup_engine_ptr) {
if (options.destroy_old_data) {
return Status::InvalidArgument(
"Can't destroy old data with ReadOnly BackupEngine");
}
std::unique_ptr<BackupEngineReadOnlyImpl> backup_engine(
new BackupEngineReadOnlyImpl(env, options));
new BackupEngineReadOnlyImpl(options, env));
auto s = backup_engine->Initialize();
if (!s.ok()) {
*backup_engine_ptr = nullptr;

@ -1842,6 +1842,87 @@ TEST_P(BackupableDBTestWithParam, BackupUsingDirectIO) {
}
}
TEST_F(BackupableDBTest, BackgroundThreadCpuPriority) {
std::atomic<CpuPriority> priority(CpuPriority::kNormal);
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"BackupEngineImpl::Initialize:SetCpuPriority", [&](void* new_priority) {
priority.store(*reinterpret_cast<CpuPriority*>(new_priority));
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
// 1 thread is easier to test, otherwise, we may not be sure which thread
// actually does the work during CreateNewBackup.
backupable_options_->max_background_operations = 1;
OpenDBAndBackupEngine(true);
{
FillDB(db_.get(), 0, 100);
// by default, cpu priority is not changed.
CreateBackupOptions options;
ASSERT_OK(backup_engine_->CreateNewBackup(options, db_.get()));
ASSERT_EQ(priority, CpuPriority::kNormal);
}
{
FillDB(db_.get(), 101, 200);
// decrease cpu priority from normal to low.
CreateBackupOptions options;
options.decrease_background_thread_cpu_priority = true;
options.background_thread_cpu_priority = CpuPriority::kLow;
ASSERT_OK(backup_engine_->CreateNewBackup(options, db_.get()));
ASSERT_EQ(priority, CpuPriority::kLow);
}
{
FillDB(db_.get(), 201, 300);
// try to upgrade cpu priority back to normal,
// the priority should still low.
CreateBackupOptions options;
options.decrease_background_thread_cpu_priority = true;
options.background_thread_cpu_priority = CpuPriority::kNormal;
ASSERT_OK(backup_engine_->CreateNewBackup(options, db_.get()));
ASSERT_EQ(priority, CpuPriority::kLow);
}
{
FillDB(db_.get(), 301, 400);
// decrease cpu priority from low to idle.
CreateBackupOptions options;
options.decrease_background_thread_cpu_priority = true;
options.background_thread_cpu_priority = CpuPriority::kIdle;
ASSERT_OK(backup_engine_->CreateNewBackup(options, db_.get()));
ASSERT_EQ(priority, CpuPriority::kIdle);
}
{
FillDB(db_.get(), 301, 400);
// reset priority to later verify that it's not updated by SetCpuPriority.
priority = CpuPriority::kNormal;
// setting the same cpu priority won't call SetCpuPriority.
CreateBackupOptions options;
options.decrease_background_thread_cpu_priority = true;
options.background_thread_cpu_priority = CpuPriority::kIdle;
ASSERT_OK(backup_engine_->CreateNewBackup(options, db_.get()));
ASSERT_EQ(priority, CpuPriority::kNormal);
}
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
CloseDBAndBackupEngine();
DestroyDB(dbname_, options_);
}
} // anon namespace
} // namespace ROCKSDB_NAMESPACE

Loading…
Cancel
Save