diff --git a/HISTORY.md b/HISTORY.md index aea78b98a..30f6c55aa 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -6,6 +6,7 @@ * Add DBOptions::skip_stats_update_on_db_open. When it is on, DB::Open() will run faster as it skips the random reads required for loading necessary stats from SST files to optimize compaction. * RollbackToSavePoint() in WriteBatch/WriteBatchWithIndex * Add NewCompactOnDeletionCollectorFactory() in utilities/table_properties_collectors, which allows rocksdb to mark a SST file as need-compaction when it observes at least D deletion entries in any N consecutive entries in that SST file. Note that this feature depends on an experimental NeedCompact() API --- the result of this API will not persist after DB restart. +* Add DBOptions::delete_scheduler. Use NewDeleteScheduler() in include/rocksdb/delete_scheduler.h to create a DeleteScheduler that can be shared among multiple RocksDB instances to control the file deletion rate of SST files that exist in the first db_path. ### Public API Changes * Deprecated WriteOptions::timeout_hint_us. We no longer support write timeout. If you really need this option, talk to us and we might consider returning it. 
diff --git a/Makefile b/Makefile index 4e705b69d..37d8214c5 100644 --- a/Makefile +++ b/Makefile @@ -285,6 +285,7 @@ TESTS = \ thread_local_test \ geodb_test \ rate_limiter_test \ + delete_scheduler_test \ options_test \ event_logger_test \ cuckoo_table_builder_test \ @@ -785,6 +786,9 @@ fault_injection_test: db/fault_injection_test.o $(LIBOBJECTS) $(TESTHARNESS) rate_limiter_test: util/rate_limiter_test.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) +delete_scheduler_test: util/delete_scheduler_test.o $(LIBOBJECTS) $(TESTHARNESS) + $(AM_LINK) + filename_test: db/filename_test.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) diff --git a/db/db_impl.cc b/db/db_impl.cc index 3d98fbc7d..f6d0b8efc 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -54,6 +54,7 @@ #include "port/likely.h" #include "rocksdb/compaction_filter.h" #include "rocksdb/db.h" +#include "rocksdb/delete_scheduler.h" #include "rocksdb/env.h" #include "rocksdb/merge_operator.h" #include "rocksdb/version.h" @@ -750,7 +751,13 @@ void DBImpl::PurgeObsoleteFiles(const JobContext& state) { continue; } #endif // !ROCKSDB_LITE - auto file_deletion_status = env_->DeleteFile(fname); + Status file_deletion_status; + if (db_options_.delete_scheduler != nullptr && type == kTableFile && + path_id == 0) { + file_deletion_status = db_options_.delete_scheduler->DeleteFile(fname); + } else { + file_deletion_status = env_->DeleteFile(fname); + } if (file_deletion_status.ok()) { Log(InfoLogLevel::DEBUG_LEVEL, db_options_.info_log, "[JOB %d] Delete %s type=%d #%" PRIu64 " -- %s\n", state.job_id, diff --git a/db/db_test.cc b/db/db_test.cc index c74cf370e..287854274 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -35,6 +35,7 @@ #include "rocksdb/compaction_filter.h" #include "rocksdb/convenience.h" #include "rocksdb/db.h" +#include "rocksdb/delete_scheduler.h" #include "rocksdb/env.h" #include "rocksdb/experimental.h" #include "rocksdb/filter_policy.h" @@ -58,6 +59,7 @@ #include "utilities/merge_operators.h" #include 
"util/logging.h" #include "util/compression.h" +#include "util/delete_scheduler_impl.h" #include "util/mutexlock.h" #include "util/rate_limiter.h" #include "util/statistics.h" @@ -8255,6 +8257,145 @@ TEST_F(DBTest, DeletingOldWalAfterDrop) { EXPECT_GT(lognum2, lognum1); } +TEST_F(DBTest, RateLimitedDelete) { + rocksdb::SyncPoint::GetInstance()->LoadDependency({ + {"DBTest::RateLimitedDelete:1", + "DeleteSchedulerImpl::BackgroundEmptyTrash"}, + }); + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); + + Options options = CurrentOptions(); + options.disable_auto_compactions = true; + env_->no_sleep_ = true; + options.env = env_; + + std::string trash_dir = test::TmpDir(env_) + "/trash"; + int64_t rate_bytes_per_sec = 1024 * 10; // 10 KB / sec + Status s; + options.delete_scheduler.reset(NewDeleteScheduler( + env_, trash_dir, rate_bytes_per_sec, nullptr, false, &s)); + ASSERT_OK(s); + + Destroy(last_options_); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + ASSERT_OK(TryReopen(options)); + // Create 4 files in L0 + for (char v = 'a'; v <= 'd'; v++) { + ASSERT_OK(Put("Key2", DummyString(1024, v))); + ASSERT_OK(Put("Key3", DummyString(1024, v))); + ASSERT_OK(Put("Key4", DummyString(1024, v))); + ASSERT_OK(Put("Key1", DummyString(1024, v))); + ASSERT_OK(Put("Key4", DummyString(1024, v))); + ASSERT_OK(Flush()); + } + // We created 4 sst files in L0 + ASSERT_EQ("4", FilesPerLevel(0)); + + uint64_t total_files_size = 0; + std::vector metadata; + db_->GetLiveFilesMetaData(&metadata); + for (const auto& meta : metadata) { + total_files_size += meta.size; + } + + // Compaction will move the 4 files in L0 to trash and create 1 L1 file + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + ASSERT_EQ("0,1", FilesPerLevel(0)); + + // Hold BackgroundEmptyTrash + TEST_SYNC_POINT("DBTest::RateLimitedDelete:1"); + + uint64_t delete_start_time = env_->NowMicros(); + reinterpret_cast(options.delete_scheduler.get()) + ->TEST_WaitForEmptyTrash(); + 
uint64_t time_spent_deleting = env_->NowMicros() - delete_start_time; + uint64_t expected_delete_time = + ((total_files_size * 1000000) / rate_bytes_per_sec); + ASSERT_GT(time_spent_deleting, expected_delete_time * 0.9); + ASSERT_LT(time_spent_deleting, expected_delete_time * 1.1); + printf("Delete time = %" PRIu64 ", Expected delete time = %" PRIu64 + ", Ratio %f\n", + time_spent_deleting, expected_delete_time, + static_cast(time_spent_deleting) / expected_delete_time); + + env_->no_sleep_ = false; + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); +} + +// Create a DB with 2 db_paths, and generate multiple files in the 2 +// db_paths using CompactRangeOptions, make sure that files that were +// deleted from first db_path were deleted using DeleteScheduler and +// files in the second path were not. +TEST_F(DBTest, DeleteSchedulerMultipleDBPaths) { + int bg_delete_file = 0; + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "DeleteSchedulerImpl::DeleteTrashFile:DeleteFile", + [&](void* arg) { bg_delete_file++; }); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + + Options options = CurrentOptions(); + options.disable_auto_compactions = true; + options.db_paths.emplace_back(dbname_, 1024 * 100); + options.db_paths.emplace_back(dbname_ + "_2", 1024 * 100); + env_->no_sleep_ = true; + options.env = env_; + + std::string trash_dir = test::TmpDir(env_) + "/trash"; + int64_t rate_bytes_per_sec = 1024 * 1024; // 1 MB / sec + Status s; + options.delete_scheduler.reset(NewDeleteScheduler( + env_, trash_dir, rate_bytes_per_sec, nullptr, false, &s)); + ASSERT_OK(s); + + DestroyAndReopen(options); + + // Create 4 files in L0 + for (int i = 0; i < 4; i++) { + ASSERT_OK(Put("Key" + ToString(i), DummyString(1024, 'A'))); + ASSERT_OK(Flush()); + } + // We created 4 sst files in L0 + ASSERT_EQ("4", FilesPerLevel(0)); + // Compaction will delete files from L0 in first db path and generate a new + // file in L1 in second db path + CompactRangeOptions 
compact_options; + compact_options.target_path_id = 1; + Slice begin("Key0"); + Slice end("Key3"); + ASSERT_OK(db_->CompactRange(compact_options, &begin, &end)); + ASSERT_EQ("0,1", FilesPerLevel(0)); + + // Create 4 files in L0 + for (int i = 4; i < 8; i++) { + ASSERT_OK(Put("Key" + ToString(i), DummyString(1024, 'B'))); + ASSERT_OK(Flush()); + } + ASSERT_EQ("4,1", FilesPerLevel(0)); + + // Compaction will delete files from L0 in first db path and generate a new + // file in L1 in second db path + begin = "Key4"; + end = "Key7"; + ASSERT_OK(db_->CompactRange(compact_options, &begin, &end)); + ASSERT_EQ("0,2", FilesPerLevel(0)); + + reinterpret_cast(options.delete_scheduler.get()) + ->TEST_WaitForEmptyTrash(); + ASSERT_EQ(bg_delete_file, 8); + + compact_options.bottommost_level_compaction = + BottommostLevelCompaction::kForce; + ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr)); + ASSERT_EQ("0,1", FilesPerLevel(0)); + + reinterpret_cast(options.delete_scheduler.get()) + ->TEST_WaitForEmptyTrash(); + ASSERT_EQ(bg_delete_file, 8); + + env_->no_sleep_ = false; + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); +} + } // namespace rocksdb #endif diff --git a/include/rocksdb/delete_scheduler.h b/include/rocksdb/delete_scheduler.h new file mode 100644 index 000000000..34849a611 --- /dev/null +++ b/include/rocksdb/delete_scheduler.h @@ -0,0 +1,62 @@ +// Copyright (c) 2015, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. 
+ +#pragma once + +#include +#include + +#include "rocksdb/status.h" + +namespace rocksdb { + +class Env; +class Logger; + +// DeleteScheduler allows the DB to enforce a rate limit on file deletion. +// Instead of deleting files immediately, files are moved to trash_dir +// and deleted in a background thread that applies a sleep penalty between +// deletes if they are happening at a rate faster than rate_bytes_per_sec. +// +// Rate limiting can be turned off by setting rate_bytes_per_sec = 0. In this +// case DeleteScheduler will delete files immediately. +class DeleteScheduler { + public: + virtual ~DeleteScheduler() {} + + // Return delete rate limit in bytes per second + virtual int64_t GetRateBytesPerSecond() = 0; + + // Move file to trash directory and schedule its deletion + virtual Status DeleteFile(const std::string& fname) = 0; + + // Return a map containing errors that happened in the background thread + // file_path => error status + virtual std::map GetBackgroundErrors() = 0; +}; + +// Create a new DeleteScheduler that can be shared among multiple RocksDB +// instances to control the file deletion rate. +// +// @env: Pointer to Env object, please see "rocksdb/env.h". +// @trash_dir: Path to the directory where deleted files will be moved into +// to be deleted in a background thread while applying rate limiting. If this +// directory does not exist, it will be created. This directory should not be +// used by any other process or any other DeleteScheduler. +// @rate_bytes_per_sec: How many bytes should be deleted per second. If this +// value is set to 1024 (1 KB / sec) and we deleted a file of size 4 KB +// in 1 second, we will wait for another 3 seconds before we delete other +// files. Set to 0 to disable rate limiting. +// @info_log: If not nullptr, info_log will be used to log errors. +// @delete_exisitng_trash: If set to true, the newly created DeleteScheduler +// will delete files that already exist in trash_dir. 
+// @status: If not nullptr, status will contain any errors that happened while +// creating the missing trash_dir or deleting existing files in trash. +extern DeleteScheduler* NewDeleteScheduler( + Env* env, const std::string& trash_dir, int64_t rate_bytes_per_sec, + std::shared_ptr info_log = nullptr, + bool delete_exisitng_trash = true, Status* status = nullptr); + +} // namespace rocksdb diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index c9246b4b3..7f9e42114 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -43,6 +43,7 @@ class TableFactory; class MemTableRepFactory; class TablePropertiesCollectorFactory; class RateLimiter; +class DeleteScheduler; class Slice; class SliceTransform; class Statistics; @@ -786,6 +787,13 @@ struct DBOptions { // Default: nullptr std::shared_ptr rate_limiter; + // Used to control the file deletion rate; can be shared among multiple + // RocksDB instances. delete_scheduler is only used to delete table files that + // need to be deleted from the first db_path (db_name if db_paths is empty); + // other file types and other db_paths won't be affected by delete_scheduler. + // Default: nullptr (disabled) + std::shared_ptr delete_scheduler; + // Any internal progress/error information generated by the db will // be written to info_log if it is non-nullptr, or to a file stored // in the same directory as the DB contents if info_log is nullptr. diff --git a/src.mk b/src.mk index 8311a77ed..5744df2f1 100644 --- a/src.mk +++ b/src.mk @@ -83,6 +83,7 @@ LIB_SOURCES = \ util/compaction_job_stats_impl.cc \ util/crc32c.cc \ util/db_info_dumper.cc \ + util/delete_scheduler_impl.cc \ util/dynamic_bloom.cc \ util/env.cc \ util/env_hdfs.cc \ diff --git a/util/delete_scheduler_impl.cc b/util/delete_scheduler_impl.cc new file mode 100644 index 000000000..2b9a4cf23 --- /dev/null +++ b/util/delete_scheduler_impl.cc @@ -0,0 +1,228 @@ +// Copyright (c) 2015, Facebook, Inc. All rights reserved. 
+// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#include +#include + +#include "port/port.h" +#include "rocksdb/env.h" +#include "util/delete_scheduler_impl.h" +#include "util/mutexlock.h" +#include "util/sync_point.h" + +namespace rocksdb { + +DeleteSchedulerImpl::DeleteSchedulerImpl(Env* env, const std::string& trash_dir, + int64_t rate_bytes_per_sec, + std::shared_ptr info_log) + : env_(env), + trash_dir_(trash_dir), + rate_bytes_per_sec_(rate_bytes_per_sec), + pending_files_(0), + closing_(false), + cv_(&mu_), + info_log_(info_log) { + if (rate_bytes_per_sec_ == 0) { + // Rate limiting is disabled, no background thread is started + bg_thread_.reset(); + } else { + bg_thread_.reset( + new std::thread(&DeleteSchedulerImpl::BackgroundEmptyTrash, this)); + } +} + +DeleteSchedulerImpl::~DeleteSchedulerImpl() { + { + MutexLock l(&mu_); + closing_ = true; + cv_.SignalAll(); + } + if (bg_thread_) { + bg_thread_->join(); + } +} + +Status DeleteSchedulerImpl::DeleteFile(const std::string& file_path) { + if (rate_bytes_per_sec_ == 0) { + // Rate limiting is disabled + return env_->DeleteFile(file_path); + } + + // Move file to trash + std::string path_in_trash; + Status s = MoveToTrash(file_path, &path_in_trash); + if (!s.ok()) { + Log(InfoLogLevel::ERROR_LEVEL, info_log_, + "Failed to move %s to trash directory (%s)", file_path.c_str(), + trash_dir_.c_str()); + return env_->DeleteFile(file_path); + } + + // Add file to delete queue + { + MutexLock l(&mu_); + queue_.push(path_in_trash); + pending_files_++; + if (pending_files_ == 1) { + cv_.SignalAll(); + } + } + return s; +} + +std::map DeleteSchedulerImpl::GetBackgroundErrors() { + MutexLock l(&mu_); + return bg_errors_; +} + +Status DeleteSchedulerImpl::MoveToTrash(const std::string& file_path, + std::string* path_in_trash) { + Status s; + // Figure out the name of the 
file in trash folder + size_t idx = file_path.rfind("/"); + if (idx == std::string::npos || idx == file_path.size() - 1) { + return Status::InvalidArgument("file_path is corrupted"); + } + *path_in_trash = trash_dir_ + file_path.substr(idx); + std::string unique_suffix = ""; + + if (*path_in_trash == file_path) { + // This file is already in trash + return s; + } + + // TODO(tec) : Implement Env::RenameFileIfNotExist and remove + // file_move_mu_ mutex. + MutexLock l(&file_move_mu_); + while (true) { + s = env_->FileExists(*path_in_trash + unique_suffix); + if (s.IsNotFound()) { + // We found a path for our file in trash + *path_in_trash += unique_suffix; + s = env_->RenameFile(file_path, *path_in_trash); + break; + } else if (s.ok()) { + // Name conflict, generate new random suffix + unique_suffix = env_->GenerateUniqueId(); + } else { + // Error during FileExists call, we cannot continue + break; + } + } + return s; +} + +void DeleteSchedulerImpl::BackgroundEmptyTrash() { + TEST_SYNC_POINT("DeleteSchedulerImpl::BackgroundEmptyTrash"); + + while (true) { + MutexLock l(&mu_); + while (queue_.empty() && !closing_) { + cv_.Wait(); + } + + if (closing_) { + return; + } + + // Delete all files in queue_ + uint64_t start_time = env_->NowMicros(); + uint64_t total_deleted_bytes = 0; + while (!queue_.empty() && !closing_) { + std::string path_in_trash = queue_.front(); + queue_.pop(); + + // We don't need to hold the lock while deleting the file + mu_.Unlock(); + uint64_t deleted_bytes = 0; + // Delete file from trash and update total_deleted_bytes + Status s = DeleteTrashFile(path_in_trash, &deleted_bytes); + total_deleted_bytes += deleted_bytes; + mu_.Lock(); + + if (!s.ok()) { + bg_errors_[path_in_trash] = s; + } + + // Apply penalty if necessary + uint64_t total_penlty = + ((total_deleted_bytes * kMicrosInSecond) / rate_bytes_per_sec_); + while (!closing_ && !cv_.TimedWait(start_time + total_penlty)) {} + + pending_files_--; + if (pending_files_ == 0) { + // Unblock 
TEST_WaitForEmptyTrash since there are no more files waiting + // to be deleted + cv_.SignalAll(); + } + } + } +} + +Status DeleteSchedulerImpl::DeleteTrashFile(const std::string& path_in_trash, + uint64_t* deleted_bytes) { + uint64_t file_size; + Status s = env_->GetFileSize(path_in_trash, &file_size); + if (s.ok()) { + TEST_SYNC_POINT("DeleteSchedulerImpl::DeleteTrashFile:DeleteFile"); + s = env_->DeleteFile(path_in_trash); + } + + if (!s.ok()) { + // Error while getting file size or while deleting + Log(InfoLogLevel::ERROR_LEVEL, info_log_, + "Failed to delete %s from trash -- %s", path_in_trash.c_str(), + s.ToString().c_str()); + *deleted_bytes = 0; + } else { + *deleted_bytes = file_size; + } + + return s; +} + +void DeleteSchedulerImpl::TEST_WaitForEmptyTrash() { + MutexLock l(&mu_); + while (pending_files_ > 0 && !closing_) { + cv_.Wait(); + } +} + +DeleteScheduler* NewDeleteScheduler(Env* env, const std::string& trash_dir, + int64_t rate_bytes_per_sec, + std::shared_ptr info_log, + bool delete_exisitng_trash, + Status* status) { + DeleteScheduler* res = + new DeleteSchedulerImpl(env, trash_dir, rate_bytes_per_sec, info_log); + + Status s; + if (trash_dir != "") { + s = env->CreateDirIfMissing(trash_dir); + if (s.ok() && delete_exisitng_trash) { + std::vector files_in_trash; + s = env->GetChildren(trash_dir, &files_in_trash); + if (s.ok()) { + for (const std::string& trash_file : files_in_trash) { + if (trash_file == "." || trash_file == "..") { + continue; + } + Status file_delete = res->DeleteFile(trash_dir + "/" + trash_file); + if (s.ok() && !file_delete.ok()) { + s = file_delete; + } + } + } + } + } + + if (status) { + *status = s; + } + + return res; +} + +} // namespace rocksdb diff --git a/util/delete_scheduler_impl.h b/util/delete_scheduler_impl.h new file mode 100644 index 000000000..a5b2e2a04 --- /dev/null +++ b/util/delete_scheduler_impl.h @@ -0,0 +1,81 @@ +// Copyright (c) 2015, Facebook, Inc. All rights reserved. 
+// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#pragma once + +#include +#include +#include +#include + +#include "port/port.h" + +#include "rocksdb/delete_scheduler.h" +#include "rocksdb/status.h" + +namespace rocksdb { + +class Env; +class Logger; + +class DeleteSchedulerImpl : public DeleteScheduler { + public: + DeleteSchedulerImpl(Env* env, const std::string& trash_dir, + int64_t rate_bytes_per_sec, + std::shared_ptr info_log); + + ~DeleteSchedulerImpl(); + + // Return delete rate limit in bytes per second + int64_t GetRateBytesPerSecond() { return rate_bytes_per_sec_; } + + // Move file to trash directory and schedule its deletion + Status DeleteFile(const std::string& fname); + + // Wait for all files being deleted in the background to finish or for + // destructor to be called. + void TEST_WaitForEmptyTrash(); + + // Return a map containing errors that happened in BackgroundEmptyTrash + // file_path => error status + std::map GetBackgroundErrors(); + + private: + Status MoveToTrash(const std::string& file_path, std::string* path_in_trash); + + Status DeleteTrashFile(const std::string& path_in_trash, + uint64_t* deleted_bytes); + + void BackgroundEmptyTrash(); + + Env* env_; + // Path to the trash directory + std::string trash_dir_; + // Maximum number of bytes that should be deleted per second + int64_t rate_bytes_per_sec_; + // Mutex to protect queue_, pending_files_, bg_errors_, closing_ + port::Mutex mu_; + // Queue of files in trash that need to be deleted + std::queue queue_; + // Number of files in trash that are waiting to be deleted + int32_t pending_files_; + // Errors that happened in BackgroundEmptyTrash (file_path => error) + std::map bg_errors_; + // Set to true in ~DeleteSchedulerImpl() to force BackgroundEmptyTrash to stop + bool closing_; + // Condition 
variable signaled in these conditions + // - pending_files_ value changes from 0 => 1 + // - pending_files_ value changes from 1 => 0 + // - closing_ value is set to true + port::CondVar cv_; + // Background thread running BackgroundEmptyTrash + std::unique_ptr bg_thread_; + // Mutex to protect threads from file name conflicts + port::Mutex file_move_mu_; + std::shared_ptr info_log_; + static const uint64_t kMicrosInSecond = 1000 * 1000LL; +}; + +} // namespace rocksdb diff --git a/util/delete_scheduler_test.cc b/util/delete_scheduler_test.cc new file mode 100644 index 000000000..4c4997376 --- /dev/null +++ b/util/delete_scheduler_test.cc @@ -0,0 +1,439 @@ +// Copyright (c) 2015, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#ifndef __STDC_FORMAT_MACROS +#define __STDC_FORMAT_MACROS +#endif + +#include +#include +#include +#include + +#include "rocksdb/delete_scheduler.h" +#include "rocksdb/env.h" +#include "rocksdb/options.h" +#include "util/delete_scheduler_impl.h" +#include "util/string_util.h" +#include "util/sync_point.h" +#include "util/testharness.h" + +namespace rocksdb { + +class DeleteSchedulerTest : public testing::Test { + public: + DeleteSchedulerTest() : env_(Env::Default()) { + dummy_files_dir_ = test::TmpDir(env_) + "/dummy_data_dir"; + DestroyAndCreateDir(dummy_files_dir_); + trash_dir_ = test::TmpDir(env_) + "/trash"; + DestroyAndCreateDir(trash_dir_); + } + + ~DeleteSchedulerTest() { + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); + rocksdb::SyncPoint::GetInstance()->LoadDependency({}); + rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks(); + DestroyDir(dummy_files_dir_); + if (delete_scheduler_ != nullptr) { + delete delete_scheduler_; + delete_scheduler_ = nullptr; + } + } + + void WaitForEmptyTrash() { + 
reinterpret_cast(delete_scheduler_) + ->TEST_WaitForEmptyTrash(); + } + + void DestroyDir(const std::string& dir) { + if (env_->FileExists(dir).IsNotFound()) { + return; + } + std::vector files_in_dir; + EXPECT_OK(env_->GetChildren(dir, &files_in_dir)); + for (auto& file_in_dir : files_in_dir) { + if (file_in_dir == "." || file_in_dir == "..") { + continue; + } + EXPECT_OK(env_->DeleteFile(dir + "/" + file_in_dir)); + } + EXPECT_OK(env_->DeleteDir(dir)); + } + + void DestroyAndCreateDir(const std::string& dir) { + DestroyDir(dir); + EXPECT_OK(env_->CreateDir(dir)); + } + + int CountFilesInDir(const std::string& dir) { + std::vector files_in_dir; + EXPECT_OK(env_->GetChildren(dir, &files_in_dir)); + // Ignore "." and ".." + return static_cast(files_in_dir.size()) - 2; + } + + std::string NewDummyFile(const std::string& file_name, uint64_t size = 1024) { + std::string file_path = dummy_files_dir_ + "/" + file_name; + std::unique_ptr f; + env_->NewWritableFile(file_path, &f, EnvOptions()); + std::string data(size, 'A'); + EXPECT_OK(f->Append(data)); + EXPECT_OK(f->Close()); + return file_path; + } + + Env* env_; + std::string dummy_files_dir_; + std::string trash_dir_; + int64_t rate_bytes_per_sec_; + DeleteScheduler* delete_scheduler_; +}; + +// Test the basic functionality of DeleteScheduler (Rate Limiting). 
+// 1- Create 100 dummy files +// 2- Delete the 100 dummy files using DeleteScheduler +// 3- Wait for DeleteScheduler to delete all files in trash +// 4- Measure time spent in steps 2 and 3 and make sure it matches the expected +// time from a rate limited delete +// 5- Make sure that all created files were completely deleted +TEST_F(DeleteSchedulerTest, BasicRateLimiting) { + int num_files = 100; // 100 files + uint64_t file_size = 1024; // every file is 1 KB + std::vector delete_kbs_per_sec = {512, 200, 100, 50, 25}; + + for (size_t t = 0; t < delete_kbs_per_sec.size(); t++) { + DestroyAndCreateDir(dummy_files_dir_); + rate_bytes_per_sec_ = delete_kbs_per_sec[t] * 1024; + delete_scheduler_ = + NewDeleteScheduler(env_, trash_dir_, rate_bytes_per_sec_); + + // Create 100 dummy files, every file is 1 KB + std::vector generated_files; + uint64_t total_files_size = 0; + for (int i = 0; i < num_files; i++) { + std::string file_name = "file" + ToString(i) + ".data"; + generated_files.push_back(NewDummyFile(file_name, file_size)); + total_files_size += file_size; + } + + // Delete dummy files and measure time spent to empty trash + uint64_t delete_start_time = env_->NowMicros(); + for (int i = 0; i < num_files; i++) { + ASSERT_OK(delete_scheduler_->DeleteFile(generated_files[i])); + } + ASSERT_EQ(CountFilesInDir(dummy_files_dir_), 0); + + WaitForEmptyTrash(); + uint64_t time_spent_deleting = env_->NowMicros() - delete_start_time; + uint64_t expected_delete_time = + ((total_files_size * 1000000) / rate_bytes_per_sec_); + ASSERT_GT(time_spent_deleting, expected_delete_time * 0.9); + ASSERT_LT(time_spent_deleting, expected_delete_time * 1.1); + printf("Delete time = %" PRIu64 ", Expected delete time = %" PRIu64 + ", Ratio %f\n", + time_spent_deleting, expected_delete_time, + static_cast(time_spent_deleting) / expected_delete_time); + + ASSERT_EQ(CountFilesInDir(trash_dir_), 0); + auto bg_errors = delete_scheduler_->GetBackgroundErrors(); + ASSERT_EQ(bg_errors.size(), 0); + } +} 
+ +// Same as the BasicRateLimiting test but delete files in multiple threads. +// 1- Create 100 dummy files +// 2- Delete the 100 dummy files using DeleteScheduler using 10 threads +// 3- Wait for DeleteScheduler to delete all files in queue +// 4- Measure time spent in steps 2 and 3 and make sure it matches the expected +// time from a rate limited delete +// 5- Make sure that all created files were completely deleted +TEST_F(DeleteSchedulerTest, RateLimitingMultiThreaded) { + int thread_cnt = 10; + int num_files = 10; // 10 files per thread + uint64_t file_size = 1024; // every file is 1 KB + std::vector delete_kbs_per_sec = {512, 200, 100, 50, 25}; + + for (size_t t = 0; t < delete_kbs_per_sec.size(); t++) { + DestroyAndCreateDir(dummy_files_dir_); + rate_bytes_per_sec_ = delete_kbs_per_sec[t] * 1024; + delete_scheduler_ = + NewDeleteScheduler(env_, trash_dir_, rate_bytes_per_sec_); + + // Create 100 dummy files, every file is 1 KB + std::vector generated_files; + uint64_t total_files_size = 0; + for (int i = 0; i < num_files * thread_cnt; i++) { + std::string file_name = "file" + ToString(i) + ".data"; + generated_files.push_back(NewDummyFile(file_name, file_size)); + total_files_size += file_size; + } + + // Delete dummy files using 10 threads and measure time spent to empty trash + uint64_t delete_start_time = env_->NowMicros(); + std::atomic thread_num(0); + std::vector threads; + for (int i = 0; i < thread_cnt; i++) { + threads.emplace_back([&]() { + int idx = thread_num.fetch_add(1); + int range_start = idx * num_files; + int range_end = range_start + num_files; + for (int j = range_start; j < range_end; j++){ + ASSERT_OK(delete_scheduler_->DeleteFile(generated_files[j])); + } + }); + } + + for (size_t i = 0; i < threads.size(); i++) { + threads[i].join(); + } + ASSERT_EQ(CountFilesInDir(dummy_files_dir_), 0); + + WaitForEmptyTrash(); + uint64_t time_spent_deleting = env_->NowMicros() - delete_start_time; + uint64_t expected_delete_time = + ((total_files_size 
* 1000000) / rate_bytes_per_sec_); + ASSERT_GT(time_spent_deleting, expected_delete_time * 0.9); + ASSERT_LT(time_spent_deleting, expected_delete_time * 1.1); + printf("Delete time = %" PRIu64 ", Expected delete time = %" PRIu64 + ", Ratio %f\n", + time_spent_deleting, expected_delete_time, + static_cast(time_spent_deleting) / expected_delete_time); + + ASSERT_EQ(CountFilesInDir(trash_dir_), 0); + auto bg_errors = delete_scheduler_->GetBackgroundErrors(); + ASSERT_EQ(bg_errors.size(), 0); + } +} + +// Disable rate limiting by setting rate_bytes_per_sec_ to 0 and make sure +// that when DeleteScheduler deletes a file it deletes it immediately and +// doesn't move it to trash +TEST_F(DeleteSchedulerTest, DisableRateLimiting) { + int bg_delete_file = 0; + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "DeleteSchedulerImpl::DeleteTrashFile:DeleteFile", + [&](void* arg) { bg_delete_file++; }); + + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + + delete_scheduler_ = NewDeleteScheduler(env_, "", 0); + + for (int i = 0; i < 10; i++) { + // Every file we delete will be deleted immediately + std::string dummy_file = NewDummyFile("dummy.data"); + ASSERT_OK(delete_scheduler_->DeleteFile(dummy_file)); + ASSERT_TRUE(env_->FileExists(dummy_file).IsNotFound()); + ASSERT_EQ(CountFilesInDir(dummy_files_dir_), 0); + ASSERT_EQ(CountFilesInDir(trash_dir_), 0); + } + + ASSERT_EQ(bg_delete_file, 0); + + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); +} + +// Testing that moving files to trash with the same name is not a problem +// 1- Create 10 files with the same name "conflict.data" +// 2- Delete the 10 files using DeleteScheduler +// 3- Make sure that trash directory contains 10 files ("conflict.data" x 10) +// --- Hold DeleteSchedulerImpl::BackgroundEmptyTrash --- +// 4- Make sure that files are deleted from trash +TEST_F(DeleteSchedulerTest, ConflictNames) { + rocksdb::SyncPoint::GetInstance()->LoadDependency({ + {"DeleteSchedulerTest::ConflictNames:1", + 
"DeleteSchedulerImpl::BackgroundEmptyTrash"}, + }); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + + rate_bytes_per_sec_ = 1024 * 1024; // 1 MB/sec + delete_scheduler_ = NewDeleteScheduler(env_, trash_dir_, rate_bytes_per_sec_); + + // Create "conflict.data" and move it to trash 10 times + for (int i = 0; i < 10; i++) { + std::string dummy_file = NewDummyFile("conflict.data"); + ASSERT_OK(delete_scheduler_->DeleteFile(dummy_file)); + } + ASSERT_EQ(CountFilesInDir(dummy_files_dir_), 0); + // 10 files ("conflict.data" x 10) in trash + ASSERT_EQ(CountFilesInDir(trash_dir_), 10); + + // Hold BackgroundEmptyTrash + TEST_SYNC_POINT("DeleteSchedulerTest::ConflictNames:1"); + WaitForEmptyTrash(); + ASSERT_EQ(CountFilesInDir(trash_dir_), 0); + + auto bg_errors = delete_scheduler_->GetBackgroundErrors(); + ASSERT_EQ(bg_errors.size(), 0); + + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); +} + +// 1- Create 10 dummy files +// 2- Delete the 10 files using DeleteScheduler (move them to trash) +// 3- Delete the 10 files directly (using env_->DeleteFile) +// --- Hold DeleteSchedulerImpl::BackgroundEmptyTrash --- +// 4- Make sure that DeleteScheduler failed to delete the 10 files and +// reported 10 background errors +TEST_F(DeleteSchedulerTest, BackgroundError) { + rocksdb::SyncPoint::GetInstance()->LoadDependency({ + {"DeleteSchedulerTest::BackgroundError:1", + "DeleteSchedulerImpl::BackgroundEmptyTrash"}, + }); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + + rate_bytes_per_sec_ = 1024 * 1024; // 1 MB/sec + delete_scheduler_ = NewDeleteScheduler(env_, trash_dir_, rate_bytes_per_sec_); + + // Generate 10 dummy files and move them to trash + for (int i = 0; i < 10; i++) { + std::string file_name = "data_" + ToString(i) + ".data"; + ASSERT_OK(delete_scheduler_->DeleteFile(NewDummyFile(file_name))); + } + ASSERT_EQ(CountFilesInDir(dummy_files_dir_), 0); + ASSERT_EQ(CountFilesInDir(trash_dir_), 10); + + // Delete 10 files from trash, this will cause 
background errors in + // BackgroundEmptyTrash since we already deleted the files it was + // going to delete + for (int i = 0; i < 10; i++) { + std::string file_name = "data_" + ToString(i) + ".data"; + ASSERT_OK(env_->DeleteFile(trash_dir_ + "/" + file_name)); + } + + // Hold BackgroundEmptyTrash + TEST_SYNC_POINT("DeleteSchedulerTest::BackgroundError:1"); + WaitForEmptyTrash(); + auto bg_errors = delete_scheduler_->GetBackgroundErrors(); + ASSERT_EQ(bg_errors.size(), 10); + + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); +} + +// 1- Create 10 files in trash +// 2- Create a DeleteScheduler with delete_exisitng_trash = true +// 3- Wait for DeleteScheduler to delete all files in queue +// 4- Make sure that all files in trash directory were deleted +TEST_F(DeleteSchedulerTest, TrashWithExistingFiles) { + std::vector dummy_files; + for (int i = 0; i < 10; i++) { + std::string file_name = "data_" + ToString(i) + ".data"; + std::string trash_path = trash_dir_ + "/" + file_name; + ASSERT_OK(env_->RenameFile(NewDummyFile(file_name), trash_path)); + } + ASSERT_EQ(CountFilesInDir(trash_dir_), 10); + + Status s; + rate_bytes_per_sec_ = 1024 * 1024; // 1 MB/sec + delete_scheduler_ = NewDeleteScheduler(env_, trash_dir_, rate_bytes_per_sec_, + nullptr, true, &s); + ASSERT_OK(s); + + WaitForEmptyTrash(); + ASSERT_EQ(CountFilesInDir(trash_dir_), 0); + + auto bg_errors = delete_scheduler_->GetBackgroundErrors(); + ASSERT_EQ(bg_errors.size(), 0); +} + +// 1- Create 10 dummy files +// 2- Delete 10 dummy files using DeleteScheduler +// 3- Wait for DeleteScheduler to delete all files in queue +// 4- Make sure all files in trash directory were deleted +// 5- Repeat previous steps 5 times +TEST_F(DeleteSchedulerTest, StartBGEmptyTrashMultipleTimes) { + int bg_delete_file = 0; + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "DeleteSchedulerImpl::DeleteTrashFile:DeleteFile", + [&](void* arg) { bg_delete_file++; }); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + + 
rate_bytes_per_sec_ = 1024 * 1024; // 1 MB / sec + delete_scheduler_ = NewDeleteScheduler(env_, trash_dir_, rate_bytes_per_sec_); + + // Move files to trash, wait for empty trash, start again + for (int run = 1; run <= 5; run++) { + // Generate 10 dummy files and move them to trash + for (int i = 0; i < 10; i++) { + std::string file_name = "data_" + ToString(i) + ".data"; + ASSERT_OK(delete_scheduler_->DeleteFile(NewDummyFile(file_name))); + } + ASSERT_EQ(CountFilesInDir(dummy_files_dir_), 0); + WaitForEmptyTrash(); + ASSERT_EQ(bg_delete_file, 10 * run); + ASSERT_EQ(CountFilesInDir(trash_dir_), 0); + + auto bg_errors = delete_scheduler_->GetBackgroundErrors(); + ASSERT_EQ(bg_errors.size(), 0); + } + + ASSERT_EQ(bg_delete_file, 50); + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); +} + +// 1- Create a DeleteScheduler with very slow rate limit (1 Byte / sec) +// 2- Delete 100 files using DeleteScheduler +// 3- Delete the DeleteScheduler (call the destructor while queue is not empty) +// 4- Make sure that not all files were deleted from trash and that +// DeleteScheduler background thread did not delete all files +TEST_F(DeleteSchedulerTest, DestructorWithNonEmptyQueue) { + int bg_delete_file = 0; + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "DeleteSchedulerImpl::DeleteTrashFile:DeleteFile", + [&](void* arg) { bg_delete_file++; }); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + + rate_bytes_per_sec_ = 1; // 1 Byte / sec + delete_scheduler_ = NewDeleteScheduler(env_, trash_dir_, rate_bytes_per_sec_); + + for (int i = 0; i < 100; i++) { + std::string file_name = "data_" + ToString(i) + ".data"; + ASSERT_OK(delete_scheduler_->DeleteFile(NewDummyFile(file_name))); + } + + // Deleting 100 files will need >28 hours to delete + // we will delete the DeleteScheduler while delete queue is not empty + delete delete_scheduler_; + delete_scheduler_ = nullptr; + + ASSERT_LT(bg_delete_file, 100); + ASSERT_GT(CountFilesInDir(trash_dir_), 0); + + 
rocksdb::SyncPoint::GetInstance()->DisableProcessing(); +} + +// 1- Delete the trash directory +// 2- Delete 10 files using DeleteScheduler +// 3- Make sure that the 10 files were deleted immediately since DeleteScheduler +// failed to move them to trash directory +TEST_F(DeleteSchedulerTest, MoveToTrashError) { + int bg_delete_file = 0; + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "DeleteSchedulerImpl::DeleteTrashFile:DeleteFile", + [&](void* arg) { bg_delete_file++; }); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + + rate_bytes_per_sec_ = 1024; // 1 Kb / sec + delete_scheduler_ = NewDeleteScheduler(env_, trash_dir_, rate_bytes_per_sec_); + + // We will delete the trash directory, that mean that DeleteScheduler wont + // be able to move files to trash and will delete files them immediately. + DestroyDir(trash_dir_); + for (int i = 0; i < 10; i++) { + std::string file_name = "data_" + ToString(i) + ".data"; + ASSERT_OK(delete_scheduler_->DeleteFile(NewDummyFile(file_name))); + } + + ASSERT_EQ(CountFilesInDir(dummy_files_dir_), 0); + ASSERT_EQ(bg_delete_file, 0); + + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); +} + +} // namespace rocksdb + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/util/options.cc b/util/options.cc index 76aa3c8a0..f42d41a78 100644 --- a/util/options.cc +++ b/util/options.cc @@ -21,6 +21,7 @@ #include "rocksdb/cache.h" #include "rocksdb/compaction_filter.h" #include "rocksdb/comparator.h" +#include "rocksdb/delete_scheduler.h" #include "rocksdb/env.h" #include "rocksdb/memtablerep.h" #include "rocksdb/merge_operator.h" @@ -199,6 +200,7 @@ DBOptions::DBOptions() paranoid_checks(true), env(Env::Default()), rate_limiter(nullptr), + delete_scheduler(nullptr), info_log(nullptr), #ifdef NDEBUG info_log_level(INFO_LEVEL), @@ -250,6 +252,7 @@ DBOptions::DBOptions(const Options& options) paranoid_checks(options.paranoid_checks), env(options.env), 
rate_limiter(options.rate_limiter), + delete_scheduler(options.delete_scheduler), info_log(options.info_log), info_log_level(options.info_log_level), max_open_files(options.max_open_files), @@ -360,6 +363,8 @@ void DBOptions::Dump(Logger* log) const { use_adaptive_mutex); Warn(log, " Options.rate_limiter: %p", rate_limiter.get()); + Warn(log, " Options.delete_scheduler.rate_bytes_per_sec: %" PRIi64, + delete_scheduler ? delete_scheduler->GetRateBytesPerSecond() : 0); Warn(log, " Options.bytes_per_sync: %" PRIu64, bytes_per_sync); Warn(log, " Options.wal_bytes_per_sync: %" PRIu64,