Charge read to rate limiter in BackupEngine (#8722)

Summary:
Context:
While all the non-trivial write operations in BackupEngine go through the RateLimiter, reads currently do not. In general, this is not a huge issue because (especially since some I/O efficiency fixes) reads in BackupEngine are mostly limited by corresponding writes, for both backup and restore. But in principle we should charge the RateLimiter for reads as well.
- Charged read operations in `BackupEngineImpl::CopyOrCreateFile`, `BackupEngineImpl::ReadFileAndComputeChecksum`, `BackupEngineImpl::BackupMeta::LoadFromFile` and `BackupEngineImpl::GetFileDbIdentities`

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8722

Test Plan:
- Passed existing tests
- Passed added unit tests

Reviewed By: pdillinger

Differential Revision: D30610464

Pulled By: hx235

fbshipit-source-id: 9b08c9387159a5385c8d390d6666377a0d0117e5
main
hx235 3 years ago committed by Facebook GitHub Bot
parent dd092c2d11
commit 45175ca2e1
  1. 1
      HISTORY.md
  2. 6
      include/rocksdb/utilities/backup_engine.h
  3. 59
      utilities/backupable/backupable_db.cc
  4. 170
      utilities/backupable/backupable_db_test.cc

@ -8,6 +8,7 @@
* Fix a race in item ref counting in LRUCache when promoting an item from the SecondaryCache.
* Fix a race in BackupEngine if RateLimiter is reconfigured during concurrent Restore operations.
* Fix a bug on POSIX in which failure to create a lock file (e.g. out of space) can prevent future LockFile attempts in the same process on the same file from succeeding.
* Fix a bug that backup_rate_limiter and restore_rate_limiter in BackupEngine could not limit read rates.
### New Features
* RemoteCompaction's interface now includes `db_name`, `db_id`, `session_id`, which could help the user uniquely identify compaction job between db instances and sessions.

@ -76,6 +76,9 @@ struct BackupEngineOptions {
// Max bytes that can be transferred in a second during backup.
// If 0, go as fast as you can
// This limit only applies to writes. To also limit reads,
// a rate limiter able to also limit reads (e.g, its mode = kAllIo)
// have to be passed in through the option "backup_rate_limiter"
// Default: 0
uint64_t backup_rate_limit;
@ -86,6 +89,9 @@ struct BackupEngineOptions {
// Max bytes that can be transferred in a second during restore.
// If 0, go as fast as you can
// This limit only applies to writes. To also limit reads,
// a rate limiter able to also limit reads (e.g, its mode = kAllIo)
// have to be passed in through the option "restore_rate_limiter"
// Default: 0
uint64_t restore_rate_limit;

@ -407,7 +407,7 @@ class BackupEngineImpl {
Status LoadFromFile(
const std::string& backup_dir,
const std::unordered_map<std::string, uint64_t>& abs_path_to_size,
Logger* info_log,
RateLimiter* rate_limiter, Logger* info_log,
std::unordered_set<std::string>* reported_ignored_fields);
Status StoreToFile(
bool sync, const TEST_FutureSchemaVersion2Options* test_future_options);
@ -550,7 +550,8 @@ class BackupEngineImpl {
// Obtain db_id and db_session_id from the table properties of file_path
Status GetFileDbIdentities(Env* src_env, const EnvOptions& src_env_options,
const std::string& file_path, std::string* db_id,
const std::string& file_path,
RateLimiter* rate_limiter, std::string* db_id,
std::string* db_session_id);
struct CopyOrCreateResult {
@ -1088,7 +1089,8 @@ Status BackupEngineImpl::Initialize() {
&abs_path_to_size);
if (s.ok()) {
s = backup_iter->second->LoadFromFile(
options_.backup_dir, abs_path_to_size, options_.info_log,
options_.backup_dir, abs_path_to_size,
options_.backup_rate_limiter.get(), options_.info_log,
&reported_ignored_fields_);
}
if (s.IsCorruption() || s.IsNotSupported()) {
@ -1954,6 +1956,10 @@ Status BackupEngineImpl::CopyOrCreateFile(
size_t buffer_to_read =
(buf_size < size_limit) ? buf_size : static_cast<size_t>(size_limit);
s = src_reader->Read(buffer_to_read, &data, buf.get());
if (rate_limiter != nullptr) {
rate_limiter->Request(data.size(), Env::IO_LOW, nullptr /* stats */,
RateLimiter::OpType::kRead);
}
processed_buffer_size += buffer_to_read;
} else {
data = contents;
@ -2046,8 +2052,8 @@ Status BackupEngineImpl::AddBackupFileWorkItem(
// Prepare db_session_id to add to the file name
// Ignore the returned status
// In the failed cases, db_id and db_session_id will be empty
GetFileDbIdentities(db_env_, src_env_options, src_dir + fname, &db_id,
&db_session_id)
GetFileDbIdentities(db_env_, src_env_options, src_dir + fname,
rate_limiter, &db_id, &db_session_id)
.PermitUncheckedError();
}
// Calculate checksum if checksum and db session id are not available.
@ -2259,7 +2265,10 @@ Status BackupEngineImpl::ReadFileAndComputeChecksum(
size_t buffer_to_read =
(buf_size < size_limit) ? buf_size : static_cast<size_t>(size_limit);
s = src_reader->Read(buffer_to_read, &data, buf.get());
if (rate_limiter != nullptr) {
rate_limiter->Request(data.size(), Env::IO_LOW, nullptr /* stats */,
RateLimiter::OpType::kRead);
}
if (!s.ok()) {
return s;
}
@ -2276,6 +2285,7 @@ Status BackupEngineImpl::ReadFileAndComputeChecksum(
Status BackupEngineImpl::GetFileDbIdentities(Env* src_env,
const EnvOptions& src_env_options,
const std::string& file_path,
RateLimiter* rate_limiter,
std::string* db_id,
std::string* db_session_id) {
assert(db_id != nullptr || db_session_id != nullptr);
@ -2300,6 +2310,13 @@ Status BackupEngineImpl::GetFileDbIdentities(Env* src_env,
table_properties = sst_reader.GetInitTableProperties();
} else {
table_properties = tp.get();
if (table_properties != nullptr && rate_limiter != nullptr) {
// sizeof(*table_properties) is a sufficent but far-from-exact
// approximation of read bytes due to metaindex block, std::string
// properties and varint compression
rate_limiter->Request(sizeof(*table_properties), Env::IO_LOW,
nullptr /* stats */, RateLimiter::OpType::kRead);
}
}
} else {
ROCKS_LOG_INFO(options_.info_log, "Failed to read %s: %s",
@ -2623,7 +2640,7 @@ const std::string kNonIgnorableFieldPrefix{"ni::"};
Status BackupEngineImpl::BackupMeta::LoadFromFile(
const std::string& backup_dir,
const std::unordered_map<std::string, uint64_t>& abs_path_to_size,
Logger* info_log,
RateLimiter* rate_limiter, Logger* info_log,
std::unordered_set<std::string>* reported_ignored_fields) {
assert(reported_ignored_fields);
assert(Empty());
@ -2645,6 +2662,10 @@ Status BackupEngineImpl::BackupMeta::LoadFromFile(
// Failures handled at the end
std::string line;
if (backup_meta_reader->ReadLine(&line)) {
if (rate_limiter != nullptr) {
rate_limiter->Request(line.size(), Env::IO_LOW, nullptr /* stats */,
RateLimiter::OpType::kRead);
}
if (StartsWith(line, kSchemaVersionPrefix)) {
std::string ver = line.substr(kSchemaVersionPrefix.size());
if (ver == "2" || StartsWith(ver, "2.")) {
@ -2658,14 +2679,28 @@ Status BackupEngineImpl::BackupMeta::LoadFromFile(
return Status::Corruption("Unexpected empty line");
}
}
if (!line.empty() || backup_meta_reader->ReadLine(&line)) {
if (!line.empty()) {
timestamp_ = std::strtoull(line.c_str(), nullptr, /*base*/ 10);
} else if (backup_meta_reader->ReadLine(&line)) {
if (rate_limiter != nullptr) {
rate_limiter->Request(line.size(), Env::IO_LOW, nullptr /* stats */,
RateLimiter::OpType::kRead);
}
timestamp_ = std::strtoull(line.c_str(), nullptr, /*base*/ 10);
}
if (backup_meta_reader->ReadLine(&line)) {
if (rate_limiter != nullptr) {
rate_limiter->Request(line.size(), Env::IO_LOW, nullptr /* stats */,
RateLimiter::OpType::kRead);
}
sequence_number_ = std::strtoull(line.c_str(), nullptr, /*base*/ 10);
}
uint32_t num_files = UINT32_MAX;
while (backup_meta_reader->ReadLine(&line)) {
if (rate_limiter != nullptr) {
rate_limiter->Request(line.size(), Env::IO_LOW, nullptr /* stats */,
RateLimiter::OpType::kRead);
}
if (line.empty()) {
return Status::Corruption("Unexpected empty line");
}
@ -2705,6 +2740,10 @@ Status BackupEngineImpl::BackupMeta::LoadFromFile(
std::vector<std::shared_ptr<FileInfo>> files;
bool footer_present = false;
while (backup_meta_reader->ReadLine(&line)) {
if (rate_limiter != nullptr) {
rate_limiter->Request(line.size(), Env::IO_LOW, nullptr /* stats */,
RateLimiter::OpType::kRead);
}
std::vector<std::string> components = StringSplit(line, ' ');
if (components.size() < 1) {
@ -2792,6 +2831,10 @@ Status BackupEngineImpl::BackupMeta::LoadFromFile(
if (footer_present) {
assert(schema_major_version >= 2);
while (backup_meta_reader->ReadLine(&line)) {
if (rate_limiter != nullptr) {
rate_limiter->Request(line.size(), Env::IO_LOW, nullptr /* stats */,
RateLimiter::OpType::kRead);
}
if (line.empty()) {
return Status::Corruption("Unexpected empty line");
}

@ -13,6 +13,8 @@
#include <algorithm>
#include <array>
#include <cstddef>
#include <cstdint>
#include <limits>
#include <random>
#include <string>
@ -2622,6 +2624,174 @@ TEST_P(BackupEngineRateLimitingTestWithParam, RateLimiting) {
AssertBackupConsistency(0, 0, 100000, 100010);
}
TEST_P(BackupEngineRateLimitingTestWithParam, RateLimitingVerifyBackup) {
const std::size_t kMicrosPerSec = 1000 * 1000LL;
std::shared_ptr<RateLimiter> backupThrottler(NewGenericRateLimiter(
1, 100 * 1000 /* refill_period_us */, 10 /* fairness */,
RateLimiter::Mode::kAllIo /* mode */));
bool makeThrottler = std::get<0>(GetParam());
if (makeThrottler) {
backupable_options_->backup_rate_limiter = backupThrottler;
}
bool is_single_threaded = std::get<1>(GetParam()) == 0 ? true : false;
backupable_options_->max_background_operations = is_single_threaded ? 1 : 10;
const std::uint64_t backup_rate_limiter_limit = std::get<2>(GetParam()).first;
if (makeThrottler) {
backupable_options_->backup_rate_limiter->SetBytesPerSecond(
backup_rate_limiter_limit);
} else {
backupable_options_->backup_rate_limit = backup_rate_limiter_limit;
}
DestroyDB(dbname_, Options());
OpenDBAndBackupEngine(true /* destroy_old_data */);
FillDB(db_.get(), 0, 100000);
ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(),
false /* flush_before_backup */));
std::vector<BackupInfo> backup_infos;
BackupInfo backup_info;
backup_engine_->GetBackupInfo(&backup_infos);
ASSERT_EQ(1, backup_infos.size());
const int backup_id = 1;
ASSERT_EQ(backup_id, backup_infos[0].backup_id);
ASSERT_OK(backup_engine_->GetBackupInfo(backup_id, &backup_info,
true /* include_file_details */));
std::uint64_t bytes_read_during_verify_backup = 0;
for (BackupFileInfo backup_file_info : backup_info.file_details) {
bytes_read_during_verify_backup += backup_file_info.size;
}
auto start_verify_backup = db_chroot_env_->NowMicros();
ASSERT_OK(
backup_engine_->VerifyBackup(backup_id, true /* verify_with_checksum */));
auto verify_backup_time = db_chroot_env_->NowMicros() - start_verify_backup;
auto rate_limited_verify_backup_time =
(bytes_read_during_verify_backup * kMicrosPerSec) /
backup_rate_limiter_limit;
if (makeThrottler) {
EXPECT_GE(verify_backup_time, 0.8 * rate_limited_verify_backup_time);
}
CloseDBAndBackupEngine();
AssertBackupConsistency(backup_id, 0, 100000, 100010);
DestroyDB(dbname_, Options());
}
TEST_P(BackupEngineRateLimitingTestWithParam, RateLimitingChargeReadInBackup) {
bool is_single_threaded = std::get<1>(GetParam()) == 0 ? true : false;
backupable_options_->max_background_operations = is_single_threaded ? 1 : 10;
const std::uint64_t backup_rate_limiter_limit = std::get<2>(GetParam()).first;
std::shared_ptr<RateLimiter> backup_rate_limiter(NewGenericRateLimiter(
backup_rate_limiter_limit, 100 * 1000 /* refill_period_us */,
10 /* fairness */, RateLimiter::Mode::kWritesOnly /* mode */));
backupable_options_->backup_rate_limiter = backup_rate_limiter;
DestroyDB(dbname_, Options());
OpenDBAndBackupEngine(true /* destroy_old_data */, false /* dummy */,
kShareWithChecksum /* shared_option */);
FillDB(db_.get(), 0, 10);
ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(),
false /* flush_before_backup */));
std::int64_t total_bytes_through_with_no_read_charged =
backup_rate_limiter->GetTotalBytesThrough();
CloseBackupEngine();
backup_rate_limiter.reset(NewGenericRateLimiter(
backup_rate_limiter_limit, 100 * 1000 /* refill_period_us */,
10 /* fairness */, RateLimiter::Mode::kAllIo /* mode */));
backupable_options_->backup_rate_limiter = backup_rate_limiter;
OpenBackupEngine(true);
ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(),
false /* flush_before_backup */));
std::int64_t total_bytes_through_with_read_charged =
backup_rate_limiter->GetTotalBytesThrough();
EXPECT_GT(total_bytes_through_with_read_charged,
total_bytes_through_with_no_read_charged);
CloseDBAndBackupEngine();
AssertBackupConsistency(1, 0, 10, 20);
DestroyDB(dbname_, Options());
}
TEST_P(BackupEngineRateLimitingTestWithParam, RateLimitingChargeReadInRestore) {
bool is_single_threaded = std::get<1>(GetParam()) == 0 ? true : false;
backupable_options_->max_background_operations = is_single_threaded ? 1 : 10;
const std::uint64_t restore_rate_limiter_limit =
std::get<2>(GetParam()).second;
std::shared_ptr<RateLimiter> restore_rate_limiter(NewGenericRateLimiter(
restore_rate_limiter_limit, 100 * 1000 /* refill_period_us */,
10 /* fairness */, RateLimiter::Mode::kWritesOnly /* mode */));
backupable_options_->restore_rate_limiter = restore_rate_limiter;
DestroyDB(dbname_, Options());
OpenDBAndBackupEngine(true /* destroy_old_data */);
FillDB(db_.get(), 0, 10);
ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(),
false /* flush_before_backup */));
CloseDBAndBackupEngine();
DestroyDB(dbname_, Options());
OpenBackupEngine(false /* destroy_old_data */);
ASSERT_OK(backup_engine_->RestoreDBFromLatestBackup(dbname_, dbname_));
std::int64_t total_bytes_through_with_no_read_charged =
restore_rate_limiter->GetTotalBytesThrough();
CloseBackupEngine();
DestroyDB(dbname_, Options());
restore_rate_limiter.reset(NewGenericRateLimiter(
restore_rate_limiter_limit, 100 * 1000 /* refill_period_us */,
10 /* fairness */, RateLimiter::Mode::kAllIo /* mode */));
backupable_options_->restore_rate_limiter = restore_rate_limiter;
OpenBackupEngine(false /* destroy_old_data */);
ASSERT_OK(backup_engine_->RestoreDBFromLatestBackup(dbname_, dbname_));
std::int64_t total_bytes_through_with_read_charged =
restore_rate_limiter->GetTotalBytesThrough();
EXPECT_EQ(total_bytes_through_with_read_charged,
total_bytes_through_with_no_read_charged * 2);
CloseBackupEngine();
AssertBackupConsistency(1, 0, 10, 20);
DestroyDB(dbname_, Options());
}
TEST_P(BackupEngineRateLimitingTestWithParam,
RateLimitingChargeReadInInitialize) {
bool is_single_threaded = std::get<1>(GetParam()) == 0 ? true : false;
backupable_options_->max_background_operations = is_single_threaded ? 1 : 10;
const std::uint64_t backup_rate_limiter_limit = std::get<2>(GetParam()).first;
std::shared_ptr<RateLimiter> backup_rate_limiter(NewGenericRateLimiter(
backup_rate_limiter_limit, 100 * 1000 /* refill_period_us */,
10 /* fairness */, RateLimiter::Mode::kAllIo /* mode */));
backupable_options_->backup_rate_limiter = backup_rate_limiter;
DestroyDB(dbname_, Options());
OpenDBAndBackupEngine(true /* destroy_old_data */);
FillDB(db_.get(), 0, 10);
ASSERT_OK(backup_engine_->CreateNewBackup(db_.get(),
false /* flush_before_backup */));
CloseDBAndBackupEngine();
AssertBackupConsistency(1, 0, 10, 20);
std::int64_t total_bytes_through_before_initialize =
backupable_options_->backup_rate_limiter->GetTotalBytesThrough();
OpenDBAndBackupEngine(false /* destroy_old_data */);
// We charge read in BackupEngineImpl::BackupMeta::LoadFromFile,
// which is called in BackupEngineImpl::Initialize() during
// OpenBackupEngine(false)
EXPECT_GT(backupable_options_->backup_rate_limiter->GetTotalBytesThrough(),
total_bytes_through_before_initialize);
CloseDBAndBackupEngine();
DestroyDB(dbname_, Options());
}
#endif // !defined(ROCKSDB_VALGRIND_RUN) || defined(ROCKSDB_FULL_VALGRIND_RUN)
TEST_F(BackupEngineTest, ReadOnlyBackupEngine) {

Loading…
Cancel
Save