diff --git a/db/db_sst_test.cc b/db/db_sst_test.cc index 903fca762..d32993577 100644 --- a/db/db_sst_test.cc +++ b/db/db_sst_test.cc @@ -328,9 +328,10 @@ TEST_F(DBSSTTest, RateLimitedDelete) { std::string trash_dir = test::TmpDir(env_) + "/trash"; int64_t rate_bytes_per_sec = 1024 * 10; // 10 Kbs / Sec Status s; - options.sst_file_manager.reset(NewSstFileManager( - env_, nullptr, trash_dir, rate_bytes_per_sec, false, &s)); + options.sst_file_manager.reset( + NewSstFileManager(env_, nullptr, trash_dir, 0, false, &s)); ASSERT_OK(s); + options.sst_file_manager->SetDeleteRateBytesPerSecond(rate_bytes_per_sec); auto sfm = static_cast(options.sst_file_manager.get()); ASSERT_OK(TryReopen(options)); diff --git a/include/rocksdb/sst_file_manager.h b/include/rocksdb/sst_file_manager.h index f8fee2742..7490615c5 100644 --- a/include/rocksdb/sst_file_manager.h +++ b/include/rocksdb/sst_file_manager.h @@ -50,6 +50,11 @@ class SstFileManager { // Return delete rate limit in bytes per second. // thread-safe virtual int64_t GetDeleteRateBytesPerSecond() = 0; + + // Update the delete rate limit in bytes per second. 
+ // A rate <= 0 disables rate limiting; files are then deleted immediately + // thread-safe + virtual void SetDeleteRateBytesPerSecond(int64_t delete_rate) = 0; }; // Create a new SstFileManager that can be shared among multiple RocksDB diff --git a/util/delete_scheduler.cc b/util/delete_scheduler.cc index 34a6935c7..d11a55a9a 100644 --- a/util/delete_scheduler.cc +++ b/util/delete_scheduler.cc @@ -30,13 +30,8 @@ DeleteScheduler::DeleteScheduler(Env* env, const std::string& trash_dir, cv_(&mu_), info_log_(info_log), sst_file_manager_(sst_file_manager) { - if (rate_bytes_per_sec_ <= 0) { - // Rate limiting is disabled - bg_thread_.reset(); - } else { - bg_thread_.reset( - new port::Thread(&DeleteScheduler::BackgroundEmptyTrash, this)); - } + bg_thread_.reset( + new port::Thread(&DeleteScheduler::BackgroundEmptyTrash, this)); } DeleteScheduler::~DeleteScheduler() { @@ -52,8 +47,9 @@ DeleteScheduler::~DeleteScheduler() { Status DeleteScheduler::DeleteFile(const std::string& file_path) { Status s; - if (rate_bytes_per_sec_ <= 0) { + if (rate_bytes_per_sec_.load() <= 0) { // Rate limiting is disabled + TEST_SYNC_POINT("DeleteScheduler::DeleteFile"); s = env_->DeleteFile(file_path); if (s.ok() && sst_file_manager_) { sst_file_manager_->OnDeleteFile(file_path); @@ -147,7 +143,16 @@ void DeleteScheduler::BackgroundEmptyTrash() { // Delete all files in queue_ uint64_t start_time = env_->NowMicros(); uint64_t total_deleted_bytes = 0; + int64_t current_delete_rate = rate_bytes_per_sec_.load(); while (!queue_.empty() && !closing_) { + if (current_delete_rate != rate_bytes_per_sec_.load()) { + // Delete rate was changed; restart the penalty accounting from now + current_delete_rate = rate_bytes_per_sec_.load(); + start_time = env_->NowMicros(); + total_deleted_bytes = 0; + } + + // Get new file to delete std::string path_in_trash = queue_.front(); queue_.pop(); @@ -164,9 +169,16 @@ void DeleteScheduler::BackgroundEmptyTrash() { } // Apply penlty if necessary - uint64_t total_penlty = - ((total_deleted_bytes * 
kMicrosInSecond) / rate_bytes_per_sec_); - while (!closing_ && !cv_.TimedWait(start_time + total_penlty)) {} + uint64_t total_penlty; + if (current_delete_rate > 0) { + // rate limiting is enabled + total_penlty = + ((total_deleted_bytes * kMicrosInSecond) / current_delete_rate); + while (!closing_ && !cv_.TimedWait(start_time + total_penlty)) {} + } else { + // rate limiting is disabled + total_penlty = 0; + } TEST_SYNC_POINT_CALLBACK("DeleteScheduler::BackgroundEmptyTrash:Wait", &total_penlty); diff --git a/util/delete_scheduler.h b/util/delete_scheduler.h index 878a117e5..678bf5c17 100644 --- a/util/delete_scheduler.h +++ b/util/delete_scheduler.h @@ -39,7 +39,12 @@ class DeleteScheduler { ~DeleteScheduler(); // Return delete rate limit in bytes per second - int64_t GetRateBytesPerSecond() { return rate_bytes_per_sec_; } + int64_t GetRateBytesPerSecond() { return rate_bytes_per_sec_.load(); } + + // Set delete rate limit in bytes per second (<= 0 disables rate limiting) + void SetRateBytesPerSecond(int64_t bytes_per_sec) { + return rate_bytes_per_sec_.store(bytes_per_sec); + } // Move file to trash directory and schedule it's deletion Status DeleteFile(const std::string& fname); @@ -64,7 +69,7 @@ class DeleteScheduler { // Path to the trash directory std::string trash_dir_; // Maximum number of bytes that should be deleted per second - int64_t rate_bytes_per_sec_; + std::atomic rate_bytes_per_sec_; // Mutex to protect queue_, pending_files_, bg_errors_, closing_ InstrumentedMutex mu_; // Queue of files in trash that need to be deleted diff --git a/util/delete_scheduler_test.cc b/util/delete_scheduler_test.cc index 087ba38af..0301c23cf 100644 --- a/util/delete_scheduler_test.cc +++ b/util/delete_scheduler_test.cc @@ -422,6 +422,90 @@ TEST_F(DeleteSchedulerTest, MoveToTrashError) { rocksdb::SyncPoint::GetInstance()->DisableProcessing(); } + +TEST_F(DeleteSchedulerTest, DynamicRateLimiting1) { + std::vector penalties; + int bg_delete_file = 0; + int fg_delete_file = 0; + 
rocksdb::SyncPoint::GetInstance()->SetCallBack( + "DeleteScheduler::DeleteTrashFile:DeleteFile", + [&](void* arg) { bg_delete_file++; }); + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "DeleteScheduler::DeleteFile", + [&](void* arg) { fg_delete_file++; }); + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "DeleteScheduler::BackgroundEmptyTrash:Wait", + [&](void* arg) { penalties.push_back(*(static_cast(arg))); }); + + rocksdb::SyncPoint::GetInstance()->LoadDependency({ + {"DeleteSchedulerTest::DynamicRateLimiting1:1", + "DeleteScheduler::BackgroundEmptyTrash"}, + }); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + + rate_bytes_per_sec_ = 0; // Disable rate limiting initially + NewDeleteScheduler(); + + + int num_files = 10; // 10 files + uint64_t file_size = 1024; // every file is 1 kb + + std::vector delete_kbs_per_sec = {512, 200, 0, 100, 50, -2, 25}; + for (size_t t = 0; t < delete_kbs_per_sec.size(); t++) { + penalties.clear(); + bg_delete_file = 0; + fg_delete_file = 0; + rocksdb::SyncPoint::GetInstance()->ClearTrace(); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + + DestroyAndCreateDir(dummy_files_dir_); + rate_bytes_per_sec_ = delete_kbs_per_sec[t] * 1024; + delete_scheduler_->SetRateBytesPerSecond(rate_bytes_per_sec_); + + // Create num_files dummy files, every file is 1 Kb + std::vector generated_files; + for (int i = 0; i < num_files; i++) { + std::string file_name = "file" + ToString(i) + ".data"; + generated_files.push_back(NewDummyFile(file_name, file_size)); + } + + // Delete dummy files and measure time spent to empty trash + for (int i = 0; i < num_files; i++) { + ASSERT_OK(delete_scheduler_->DeleteFile(generated_files[i])); + } + ASSERT_EQ(CountFilesInDir(dummy_files_dir_), 0); + + if (rate_bytes_per_sec_ > 0) { + uint64_t delete_start_time = env_->NowMicros(); + TEST_SYNC_POINT("DeleteSchedulerTest::DynamicRateLimiting1:1"); + delete_scheduler_->WaitForEmptyTrash(); + uint64_t time_spent_deleting = env_->NowMicros() - 
delete_start_time; + + auto bg_errors = delete_scheduler_->GetBackgroundErrors(); + ASSERT_EQ(bg_errors.size(), 0); + + uint64_t total_files_size = 0; + uint64_t expected_penlty = 0; + ASSERT_EQ(penalties.size(), num_files); + for (int i = 0; i < num_files; i++) { + total_files_size += file_size; + expected_penlty = ((total_files_size * 1000000) / rate_bytes_per_sec_); + ASSERT_EQ(expected_penlty, penalties[i]); + } + ASSERT_GT(time_spent_deleting, expected_penlty * 0.9); + ASSERT_EQ(bg_delete_file, num_files); + ASSERT_EQ(fg_delete_file, 0); + } else { + ASSERT_EQ(penalties.size(), 0); + ASSERT_EQ(bg_delete_file, 0); + ASSERT_EQ(fg_delete_file, num_files); + } + + ASSERT_EQ(CountFilesInDir(trash_dir_), 0); + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); + } +} + } // namespace rocksdb int main(int argc, char** argv) { diff --git a/util/sst_file_manager_impl.cc b/util/sst_file_manager_impl.cc index 208971cb5..863118938 100644 --- a/util/sst_file_manager_impl.cc +++ b/util/sst_file_manager_impl.cc @@ -87,6 +87,10 @@ int64_t SstFileManagerImpl::GetDeleteRateBytesPerSecond() { return delete_scheduler_.GetRateBytesPerSecond(); } +void SstFileManagerImpl::SetDeleteRateBytesPerSecond(int64_t delete_rate) { + return delete_scheduler_.SetRateBytesPerSecond(delete_rate); +} + Status SstFileManagerImpl::ScheduleFileDeletion(const std::string& file_path) { return delete_scheduler_.DeleteFile(file_path); } @@ -127,7 +131,7 @@ SstFileManager* NewSstFileManager(Env* env, std::shared_ptr info_log, new SstFileManagerImpl(env, info_log, trash_dir, rate_bytes_per_sec); Status s; - if (trash_dir != "" && rate_bytes_per_sec > 0) { + if (trash_dir != "") { s = env->CreateDirIfMissing(trash_dir); if (s.ok() && delete_existing_trash) { std::vector files_in_trash; diff --git a/util/sst_file_manager_impl.h b/util/sst_file_manager_impl.h index 543c9d95a..c9df6e9ca 100644 --- a/util/sst_file_manager_impl.h +++ b/util/sst_file_manager_impl.h @@ -64,6 +64,9 @@ class 
SstFileManagerImpl : public SstFileManager { // Return delete rate limit in bytes per second. virtual int64_t GetDeleteRateBytesPerSecond() override; + // Update the delete rate limit in bytes per second. + virtual void SetDeleteRateBytesPerSecond(int64_t delete_rate) override; + // Move file to trash directory and schedule it's deletion. virtual Status ScheduleFileDeletion(const std::string& file_path);