Support delete rate limiting

Summary:
Introduce DeleteScheduler that allow enforcing a rate limit on file deletion
Instead of deleting files immediately, files are moved to trash directory and deleted in a background thread that apply sleep penalty between deletes if needed.

I have updated PurgeObsoleteFiles and PurgeObsoleteWALFiles to use the delete_scheduler instead of env_->DeleteFile

Test Plan:
added delete_scheduler_test
existing unit tests

Reviewers: kradhakrishnan, anthony, rven, yhchiang, sdong

Reviewed By: sdong

Subscribers: dhruba

Differential Revision: https://reviews.facebook.net/D43221
main
Islam AbdelRahman 9 years ago
parent 102ac118b2
commit c45a57b41e
  1. 1
      HISTORY.md
  2. 4
      Makefile
  3. 9
      db/db_impl.cc
  4. 141
      db/db_test.cc
  5. 62
      include/rocksdb/delete_scheduler.h
  6. 8
      include/rocksdb/options.h
  7. 1
      src.mk
  8. 228
      util/delete_scheduler_impl.cc
  9. 81
      util/delete_scheduler_impl.h
  10. 439
      util/delete_scheduler_test.cc
  11. 5
      util/options.cc

@ -6,6 +6,7 @@
* Add DBOptions::skip_stats_update_on_db_open. When it is on, DB::Open() will run faster as it skips the random reads required for loading necessary stats from SST files to optimize compaction. * Add DBOptions::skip_stats_update_on_db_open. When it is on, DB::Open() will run faster as it skips the random reads required for loading necessary stats from SST files to optimize compaction.
* RollbackToSavePoint() in WriteBatch/WriteBatchWithIndex * RollbackToSavePoint() in WriteBatch/WriteBatchWithIndex
* Add NewCompactOnDeletionCollectorFactory() in utilities/table_properties_collectors, which allows rocksdb to mark a SST file as need-compaction when it observes at least D deletion entries in any N consecutive entries in that SST file. Note that this feature depends on an experimental NeedCompact() API --- the result of this API will not persist after DB restart. * Add NewCompactOnDeletionCollectorFactory() in utilities/table_properties_collectors, which allows rocksdb to mark a SST file as need-compaction when it observes at least D deletion entries in any N consecutive entries in that SST file. Note that this feature depends on an experimental NeedCompact() API --- the result of this API will not persist after DB restart.
* Add DBOptions::delete_scheduler. Use NewDeleteScheduler() in include/rocksdb/delete_scheduler.h to create a DeleteScheduler that can be shared among multiple RocksDB instances to control the file deletion rate of SST files that exist in the first db_path.
### Public API Changes ### Public API Changes
* Deprecated WriteOptions::timeout_hint_us. We no longer support write timeout. If you really need this option, talk to us and we might consider returning it. * Deprecated WriteOptions::timeout_hint_us. We no longer support write timeout. If you really need this option, talk to us and we might consider returning it.

@ -285,6 +285,7 @@ TESTS = \
thread_local_test \ thread_local_test \
geodb_test \ geodb_test \
rate_limiter_test \ rate_limiter_test \
delete_scheduler_test \
options_test \ options_test \
event_logger_test \ event_logger_test \
cuckoo_table_builder_test \ cuckoo_table_builder_test \
@ -785,6 +786,9 @@ fault_injection_test: db/fault_injection_test.o $(LIBOBJECTS) $(TESTHARNESS)
rate_limiter_test: util/rate_limiter_test.o $(LIBOBJECTS) $(TESTHARNESS) rate_limiter_test: util/rate_limiter_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK) $(AM_LINK)
delete_scheduler_test: util/delete_scheduler_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)
filename_test: db/filename_test.o $(LIBOBJECTS) $(TESTHARNESS) filename_test: db/filename_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK) $(AM_LINK)

@ -54,6 +54,7 @@
#include "port/likely.h" #include "port/likely.h"
#include "rocksdb/compaction_filter.h" #include "rocksdb/compaction_filter.h"
#include "rocksdb/db.h" #include "rocksdb/db.h"
#include "rocksdb/delete_scheduler.h"
#include "rocksdb/env.h" #include "rocksdb/env.h"
#include "rocksdb/merge_operator.h" #include "rocksdb/merge_operator.h"
#include "rocksdb/version.h" #include "rocksdb/version.h"
@ -750,7 +751,13 @@ void DBImpl::PurgeObsoleteFiles(const JobContext& state) {
continue; continue;
} }
#endif // !ROCKSDB_LITE #endif // !ROCKSDB_LITE
auto file_deletion_status = env_->DeleteFile(fname); Status file_deletion_status;
if (db_options_.delete_scheduler != nullptr && type == kTableFile &&
path_id == 0) {
file_deletion_status = db_options_.delete_scheduler->DeleteFile(fname);
} else {
file_deletion_status = env_->DeleteFile(fname);
}
if (file_deletion_status.ok()) { if (file_deletion_status.ok()) {
Log(InfoLogLevel::DEBUG_LEVEL, db_options_.info_log, Log(InfoLogLevel::DEBUG_LEVEL, db_options_.info_log,
"[JOB %d] Delete %s type=%d #%" PRIu64 " -- %s\n", state.job_id, "[JOB %d] Delete %s type=%d #%" PRIu64 " -- %s\n", state.job_id,

@ -35,6 +35,7 @@
#include "rocksdb/compaction_filter.h" #include "rocksdb/compaction_filter.h"
#include "rocksdb/convenience.h" #include "rocksdb/convenience.h"
#include "rocksdb/db.h" #include "rocksdb/db.h"
#include "rocksdb/delete_scheduler.h"
#include "rocksdb/env.h" #include "rocksdb/env.h"
#include "rocksdb/experimental.h" #include "rocksdb/experimental.h"
#include "rocksdb/filter_policy.h" #include "rocksdb/filter_policy.h"
@ -58,6 +59,7 @@
#include "utilities/merge_operators.h" #include "utilities/merge_operators.h"
#include "util/logging.h" #include "util/logging.h"
#include "util/compression.h" #include "util/compression.h"
#include "util/delete_scheduler_impl.h"
#include "util/mutexlock.h" #include "util/mutexlock.h"
#include "util/rate_limiter.h" #include "util/rate_limiter.h"
#include "util/statistics.h" #include "util/statistics.h"
@ -8255,6 +8257,145 @@ TEST_F(DBTest, DeletingOldWalAfterDrop) {
EXPECT_GT(lognum2, lognum1); EXPECT_GT(lognum2, lognum1);
} }
TEST_F(DBTest, RateLimitedDelete) {
rocksdb::SyncPoint::GetInstance()->LoadDependency({
{"DBTest::RateLimitedDelete:1",
"DeleteSchedulerImpl::BackgroundEmptyTrash"},
});
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
Options options = CurrentOptions();
options.disable_auto_compactions = true;
env_->no_sleep_ = true;
options.env = env_;
std::string trash_dir = test::TmpDir(env_) + "/trash";
int64_t rate_bytes_per_sec = 1024 * 10; // 10 Kbs / Sec
Status s;
options.delete_scheduler.reset(NewDeleteScheduler(
env_, trash_dir, rate_bytes_per_sec, nullptr, false, &s));
ASSERT_OK(s);
Destroy(last_options_);
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
ASSERT_OK(TryReopen(options));
// Create 4 files in L0
for (char v = 'a'; v <= 'd'; v++) {
ASSERT_OK(Put("Key2", DummyString(1024, v)));
ASSERT_OK(Put("Key3", DummyString(1024, v)));
ASSERT_OK(Put("Key4", DummyString(1024, v)));
ASSERT_OK(Put("Key1", DummyString(1024, v)));
ASSERT_OK(Put("Key4", DummyString(1024, v)));
ASSERT_OK(Flush());
}
// We created 4 sst files in L0
ASSERT_EQ("4", FilesPerLevel(0));
uint64_t total_files_size = 0;
std::vector<LiveFileMetaData> metadata;
db_->GetLiveFilesMetaData(&metadata);
for (const auto& meta : metadata) {
total_files_size += meta.size;
}
// Compaction will move the 4 files in L0 to trash and create 1 L1 file
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
ASSERT_EQ("0,1", FilesPerLevel(0));
// Hold BackgroundEmptyTrash
TEST_SYNC_POINT("DBTest::RateLimitedDelete:1");
uint64_t delete_start_time = env_->NowMicros();
reinterpret_cast<DeleteSchedulerImpl*>(options.delete_scheduler.get())
->TEST_WaitForEmptyTrash();
uint64_t time_spent_deleting = env_->NowMicros() - delete_start_time;
uint64_t expected_delete_time =
((total_files_size * 1000000) / rate_bytes_per_sec);
ASSERT_GT(time_spent_deleting, expected_delete_time * 0.9);
ASSERT_LT(time_spent_deleting, expected_delete_time * 1.1);
printf("Delete time = %" PRIu64 ", Expected delete time = %" PRIu64
", Ratio %f\n",
time_spent_deleting, expected_delete_time,
static_cast<double>(time_spent_deleting) / expected_delete_time);
env_->no_sleep_ = false;
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
}
// Create a DB with 2 db_paths, and generate multiple files in the 2
// db_paths using CompactRangeOptions, make sure that files that were
// deleted from first db_path were deleted using DeleteScheduler and
// files in the second path were not.
TEST_F(DBTest, DeleteSchedulerMultipleDBPaths) {
int bg_delete_file = 0;
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"DeleteSchedulerImpl::DeleteTrashFile:DeleteFile",
[&](void* arg) { bg_delete_file++; });
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
Options options = CurrentOptions();
options.disable_auto_compactions = true;
options.db_paths.emplace_back(dbname_, 1024 * 100);
options.db_paths.emplace_back(dbname_ + "_2", 1024 * 100);
env_->no_sleep_ = true;
options.env = env_;
std::string trash_dir = test::TmpDir(env_) + "/trash";
int64_t rate_bytes_per_sec = 1024 * 1024; // 1 Mb / Sec
Status s;
options.delete_scheduler.reset(NewDeleteScheduler(
env_, trash_dir, rate_bytes_per_sec, nullptr, false, &s));
ASSERT_OK(s);
DestroyAndReopen(options);
// Create 4 files in L0
for (int i = 0; i < 4; i++) {
ASSERT_OK(Put("Key" + ToString(i), DummyString(1024, 'A')));
ASSERT_OK(Flush());
}
// We created 4 sst files in L0
ASSERT_EQ("4", FilesPerLevel(0));
// Compaction will delete files from L0 in first db path and generate a new
// file in L1 in second db path
CompactRangeOptions compact_options;
compact_options.target_path_id = 1;
Slice begin("Key0");
Slice end("Key3");
ASSERT_OK(db_->CompactRange(compact_options, &begin, &end));
ASSERT_EQ("0,1", FilesPerLevel(0));
// Create 4 files in L0
for (int i = 4; i < 8; i++) {
ASSERT_OK(Put("Key" + ToString(i), DummyString(1024, 'B')));
ASSERT_OK(Flush());
}
ASSERT_EQ("4,1", FilesPerLevel(0));
// Compaction will delete files from L0 in first db path and generate a new
// file in L1 in second db path
begin = "Key4";
end = "Key7";
ASSERT_OK(db_->CompactRange(compact_options, &begin, &end));
ASSERT_EQ("0,2", FilesPerLevel(0));
reinterpret_cast<DeleteSchedulerImpl*>(options.delete_scheduler.get())
->TEST_WaitForEmptyTrash();
ASSERT_EQ(bg_delete_file, 8);
compact_options.bottommost_level_compaction =
BottommostLevelCompaction::kForce;
ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr));
ASSERT_EQ("0,1", FilesPerLevel(0));
reinterpret_cast<DeleteSchedulerImpl*>(options.delete_scheduler.get())
->TEST_WaitForEmptyTrash();
ASSERT_EQ(bg_delete_file, 8);
env_->no_sleep_ = false;
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
}
} // namespace rocksdb } // namespace rocksdb
#endif #endif

@ -0,0 +1,62 @@
// Copyright (c) 2015, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#pragma once
#include <map>
#include <string>
#include "rocksdb/status.h"
namespace rocksdb {
class Env;
class Logger;
// DeleteScheduler allow the DB to enforce a rate limit on file deletion,
// Instead of deleteing files immediately, files are moved to trash_dir
// and deleted in a background thread that apply sleep penlty between deletes
// if they are happening in a rate faster than rate_bytes_per_sec,
//
// Rate limiting can be turned off by setting rate_bytes_per_sec = 0, In this
// case DeleteScheduler will delete files immediately.
class DeleteScheduler {
public:
virtual ~DeleteScheduler() {}
// Return delete rate limit in bytes per second
virtual int64_t GetRateBytesPerSecond() = 0;
// Move file to trash directory and schedule it's deletion
virtual Status DeleteFile(const std::string& fname) = 0;
// Return a map containing errors that happened in the background thread
// file_path => error status
virtual std::map<std::string, Status> GetBackgroundErrors() = 0;
};
// Create a new DeleteScheduler that can be shared among multiple RocksDB
// instances to control the file deletion rate.
//
// @env: Pointer to Env object, please see "rocksdb/env.h".
// @trash_dir: Path to the directory where deleted files will be moved into
// to be deleted in a background thread while applying rate limiting. If this
// directory dont exist, it will be created. This directory should not be
// used by any other process or any other DeleteScheduler.
// @rate_bytes_per_sec: How many bytes should be deleted per second, If this
// value is set to 1024 (1 Kb / sec) and we deleted a file of size 4 Kb
// in 1 second, we will wait for another 3 seconds before we delete other
// files, Set to 0 to disable rate limiting.
// @info_log: If not nullptr, info_log will be used to log errors.
// @delete_exisitng_trash: If set to true, the newly created DeleteScheduler
// will delete files that already exist in trash_dir.
// @status: If not nullptr, status will contain any errors that happened during
// creating the missing trash_dir or deleting existing files in trash.
extern DeleteScheduler* NewDeleteScheduler(
Env* env, const std::string& trash_dir, int64_t rate_bytes_per_sec,
std::shared_ptr<Logger> info_log = nullptr,
bool delete_exisitng_trash = true, Status* status = nullptr);
} // namespace rocksdb

@ -43,6 +43,7 @@ class TableFactory;
class MemTableRepFactory; class MemTableRepFactory;
class TablePropertiesCollectorFactory; class TablePropertiesCollectorFactory;
class RateLimiter; class RateLimiter;
class DeleteScheduler;
class Slice; class Slice;
class SliceTransform; class SliceTransform;
class Statistics; class Statistics;
@ -786,6 +787,13 @@ struct DBOptions {
// Default: nullptr // Default: nullptr
std::shared_ptr<RateLimiter> rate_limiter; std::shared_ptr<RateLimiter> rate_limiter;
// Use to control files deletion rate, can be used among multiple
// RocksDB instances. delete_scheduler is only used to delete table files that
// need to be deleted from the first db_path (db_name if db_paths is empty),
// other files types and other db_paths wont be affected by delete_scheduler.
// Default: nullptr (disabled)
std::shared_ptr<DeleteScheduler> delete_scheduler;
// Any internal progress/error information generated by the db will // Any internal progress/error information generated by the db will
// be written to info_log if it is non-nullptr, or to a file stored // be written to info_log if it is non-nullptr, or to a file stored
// in the same directory as the DB contents if info_log is nullptr. // in the same directory as the DB contents if info_log is nullptr.

@ -83,6 +83,7 @@ LIB_SOURCES = \
util/compaction_job_stats_impl.cc \ util/compaction_job_stats_impl.cc \
util/crc32c.cc \ util/crc32c.cc \
util/db_info_dumper.cc \ util/db_info_dumper.cc \
util/delete_scheduler_impl.cc \
util/dynamic_bloom.cc \ util/dynamic_bloom.cc \
util/env.cc \ util/env.cc \
util/env_hdfs.cc \ util/env_hdfs.cc \

@ -0,0 +1,228 @@
// Copyright (c) 2015, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#include <thread>
#include <vector>
#include "port/port.h"
#include "rocksdb/env.h"
#include "util/delete_scheduler_impl.h"
#include "util/mutexlock.h"
#include "util/sync_point.h"
namespace rocksdb {
DeleteSchedulerImpl::DeleteSchedulerImpl(Env* env, const std::string& trash_dir,
int64_t rate_bytes_per_sec,
std::shared_ptr<Logger> info_log)
: env_(env),
trash_dir_(trash_dir),
rate_bytes_per_sec_(rate_bytes_per_sec),
pending_files_(0),
closing_(false),
cv_(&mu_),
info_log_(info_log) {
if (rate_bytes_per_sec_ == 0) {
// Rate limiting is disabled
bg_thread_.reset();
} else {
bg_thread_.reset(
new std::thread(&DeleteSchedulerImpl::BackgroundEmptyTrash, this));
}
}
DeleteSchedulerImpl::~DeleteSchedulerImpl() {
{
MutexLock l(&mu_);
closing_ = true;
cv_.SignalAll();
}
if (bg_thread_) {
bg_thread_->join();
}
}
Status DeleteSchedulerImpl::DeleteFile(const std::string& file_path) {
if (rate_bytes_per_sec_ == 0) {
// Rate limiting is disabled
return env_->DeleteFile(file_path);
}
// Move file to trash
std::string path_in_trash;
Status s = MoveToTrash(file_path, &path_in_trash);
if (!s.ok()) {
Log(InfoLogLevel::ERROR_LEVEL, info_log_,
"Failed to move %s to trash directory (%s)", file_path.c_str(),
trash_dir_.c_str());
return env_->DeleteFile(file_path);
}
// Add file to delete queue
{
MutexLock l(&mu_);
queue_.push(path_in_trash);
pending_files_++;
if (pending_files_ == 1) {
cv_.SignalAll();
}
}
return s;
}
std::map<std::string, Status> DeleteSchedulerImpl::GetBackgroundErrors() {
MutexLock l(&mu_);
return bg_errors_;
}
Status DeleteSchedulerImpl::MoveToTrash(const std::string& file_path,
std::string* path_in_trash) {
Status s;
// Figure out the name of the file in trash folder
size_t idx = file_path.rfind("/");
if (idx == std::string::npos || idx == file_path.size() - 1) {
return Status::InvalidArgument("file_path is corrupted");
}
*path_in_trash = trash_dir_ + file_path.substr(idx);
std::string unique_suffix = "";
if (*path_in_trash == file_path) {
// This file is already in trash
return s;
}
// TODO(tec) : Implement Env::RenameFileIfNotExist and remove
// file_move_mu mutex.
MutexLock l(&file_move_mu_);
while (true) {
s = env_->FileExists(*path_in_trash + unique_suffix);
if (s.IsNotFound()) {
// We found a path for our file in trash
*path_in_trash += unique_suffix;
s = env_->RenameFile(file_path, *path_in_trash);
break;
} else if (s.ok()) {
// Name conflict, generate new random suffix
unique_suffix = env_->GenerateUniqueId();
} else {
// Error during FileExists call, we cannot continue
break;
}
}
return s;
}
void DeleteSchedulerImpl::BackgroundEmptyTrash() {
TEST_SYNC_POINT("DeleteSchedulerImpl::BackgroundEmptyTrash");
while (true) {
MutexLock l(&mu_);
while (queue_.empty() && !closing_) {
cv_.Wait();
}
if (closing_) {
return;
}
// Delete all files in queue_
uint64_t start_time = env_->NowMicros();
uint64_t total_deleted_bytes = 0;
while (!queue_.empty() && !closing_) {
std::string path_in_trash = queue_.front();
queue_.pop();
// We dont need to hold the lock while deleting the file
mu_.Unlock();
uint64_t deleted_bytes = 0;
// Delete file from trash and update total_penlty value
Status s = DeleteTrashFile(path_in_trash, &deleted_bytes);
total_deleted_bytes += deleted_bytes;
mu_.Lock();
if (!s.ok()) {
bg_errors_[path_in_trash] = s;
}
// Apply penlty if necessary
uint64_t total_penlty =
((total_deleted_bytes * kMicrosInSecond) / rate_bytes_per_sec_);
while (!closing_ && !cv_.TimedWait(start_time + total_penlty)) {}
pending_files_--;
if (pending_files_ == 0) {
// Unblock TEST_WaitForEmptyTrash since there are no more files waiting
// to be deleted
cv_.SignalAll();
}
}
}
}
Status DeleteSchedulerImpl::DeleteTrashFile(const std::string& path_in_trash,
uint64_t* deleted_bytes) {
uint64_t file_size;
Status s = env_->GetFileSize(path_in_trash, &file_size);
if (s.ok()) {
TEST_SYNC_POINT("DeleteSchedulerImpl::DeleteTrashFile:DeleteFile");
s = env_->DeleteFile(path_in_trash);
}
if (!s.ok()) {
// Error while getting file size or while deleting
Log(InfoLogLevel::ERROR_LEVEL, info_log_,
"Failed to delete %s from trash -- %s", path_in_trash.c_str(),
s.ToString().c_str());
*deleted_bytes = 0;
} else {
*deleted_bytes = file_size;
}
return s;
}
void DeleteSchedulerImpl::TEST_WaitForEmptyTrash() {
MutexLock l(&mu_);
while (pending_files_ > 0 && !closing_) {
cv_.Wait();
}
}
DeleteScheduler* NewDeleteScheduler(Env* env, const std::string& trash_dir,
int64_t rate_bytes_per_sec,
std::shared_ptr<Logger> info_log,
bool delete_exisitng_trash,
Status* status) {
DeleteScheduler* res =
new DeleteSchedulerImpl(env, trash_dir, rate_bytes_per_sec, info_log);
Status s;
if (trash_dir != "") {
s = env->CreateDirIfMissing(trash_dir);
if (s.ok() && delete_exisitng_trash) {
std::vector<std::string> files_in_trash;
s = env->GetChildren(trash_dir, &files_in_trash);
if (s.ok()) {
for (const std::string& trash_file : files_in_trash) {
if (trash_file == "." || trash_file == "..") {
continue;
}
Status file_delete = res->DeleteFile(trash_dir + "/" + trash_file);
if (s.ok() && !file_delete.ok()) {
s = file_delete;
}
}
}
}
}
if (status) {
*status = s;
}
return res;
}
} // namespace rocksdb

@ -0,0 +1,81 @@
// Copyright (c) 2015, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#pragma once
#include <map>
#include <queue>
#include <string>
#include <thread>
#include "port/port.h"
#include "rocksdb/delete_scheduler.h"
#include "rocksdb/status.h"
namespace rocksdb {
class Env;
class Logger;
class DeleteSchedulerImpl : public DeleteScheduler {
public:
DeleteSchedulerImpl(Env* env, const std::string& trash_dir,
int64_t rate_bytes_per_sec,
std::shared_ptr<Logger> info_log);
~DeleteSchedulerImpl();
// Return delete rate limit in bytes per second
int64_t GetRateBytesPerSecond() { return rate_bytes_per_sec_; }
// Move file to trash directory and schedule it's deletion
Status DeleteFile(const std::string& fname);
// Wait for all files being deleteing in the background to finish or for
// destructor to be called.
void TEST_WaitForEmptyTrash();
// Return a map containing errors that happened in BackgroundEmptyTrash
// file_path => error status
std::map<std::string, Status> GetBackgroundErrors();
private:
Status MoveToTrash(const std::string& file_path, std::string* path_in_trash);
Status DeleteTrashFile(const std::string& path_in_trash,
uint64_t* deleted_bytes);
void BackgroundEmptyTrash();
Env* env_;
// Path to the trash directory
std::string trash_dir_;
// Maximum number of bytes that should be deleted per second
int64_t rate_bytes_per_sec_;
// Mutex to protect queue_, pending_files_, bg_errors_, closing_
port::Mutex mu_;
// Queue of files in trash that need to be deleted
std::queue<std::string> queue_;
// Number of files in trash that are waiting to be deleted
int32_t pending_files_;
// Errors that happened in BackgroundEmptyTrash (file_path => error)
std::map<std::string, Status> bg_errors_;
// Set to true in ~DeleteSchedulerImpl() to force BackgroundEmptyTrash to stop
bool closing_;
// Condition variable signaled in these conditions
// - pending_files_ value change from 0 => 1
// - pending_files_ value change from 1 => 0
// - closing_ value is set to true
port::CondVar cv_;
// Background thread running BackgroundEmptyTrash
std::unique_ptr<std::thread> bg_thread_;
// Mutex to protect threads from file name conflicts
port::Mutex file_move_mu_;
std::shared_ptr<Logger> info_log_;
static const uint64_t kMicrosInSecond = 1000 * 1000LL;
};
} // namespace rocksdb

@ -0,0 +1,439 @@
// Copyright (c) 2015, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif
#include <inttypes.h>
#include <atomic>
#include <thread>
#include <vector>
#include "rocksdb/delete_scheduler.h"
#include "rocksdb/env.h"
#include "rocksdb/options.h"
#include "util/delete_scheduler_impl.h"
#include "util/string_util.h"
#include "util/sync_point.h"
#include "util/testharness.h"
namespace rocksdb {
class DeleteSchedulerTest : public testing::Test {
public:
DeleteSchedulerTest() : env_(Env::Default()) {
dummy_files_dir_ = test::TmpDir(env_) + "/dummy_data_dir";
DestroyAndCreateDir(dummy_files_dir_);
trash_dir_ = test::TmpDir(env_) + "/trash";
DestroyAndCreateDir(trash_dir_);
}
~DeleteSchedulerTest() {
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
rocksdb::SyncPoint::GetInstance()->LoadDependency({});
rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();
DestroyDir(dummy_files_dir_);
if (delete_scheduler_ != nullptr) {
delete delete_scheduler_;
delete_scheduler_ = nullptr;
}
}
void WaitForEmptyTrash() {
reinterpret_cast<DeleteSchedulerImpl*>(delete_scheduler_)
->TEST_WaitForEmptyTrash();
}
void DestroyDir(const std::string& dir) {
if (env_->FileExists(dir).IsNotFound()) {
return;
}
std::vector<std::string> files_in_dir;
EXPECT_OK(env_->GetChildren(dir, &files_in_dir));
for (auto& file_in_dir : files_in_dir) {
if (file_in_dir == "." || file_in_dir == "..") {
continue;
}
EXPECT_OK(env_->DeleteFile(dir + "/" + file_in_dir));
}
EXPECT_OK(env_->DeleteDir(dir));
}
void DestroyAndCreateDir(const std::string& dir) {
DestroyDir(dir);
EXPECT_OK(env_->CreateDir(dir));
}
int CountFilesInDir(const std::string& dir) {
std::vector<std::string> files_in_dir;
EXPECT_OK(env_->GetChildren(dir, &files_in_dir));
// Ignore "." and ".."
return static_cast<int>(files_in_dir.size()) - 2;
}
std::string NewDummyFile(const std::string& file_name, uint64_t size = 1024) {
std::string file_path = dummy_files_dir_ + "/" + file_name;
std::unique_ptr<WritableFile> f;
env_->NewWritableFile(file_path, &f, EnvOptions());
std::string data(size, 'A');
EXPECT_OK(f->Append(data));
EXPECT_OK(f->Close());
return file_path;
}
Env* env_;
std::string dummy_files_dir_;
std::string trash_dir_;
int64_t rate_bytes_per_sec_;
DeleteScheduler* delete_scheduler_;
};
// Test the basic functionality of DeleteScheduler (Rate Limiting).
// 1- Create 100 dummy files
// 2- Delete the 100 dummy files using DeleteScheduler
// 3- Wait for DeleteScheduler to delete all files in trash
// 4- Measure time spent in step 2,3 and make sure it matches the expected
// time from a rate limited delete
// 5- Make sure that all created files were completely deleted
TEST_F(DeleteSchedulerTest, BasicRateLimiting) {
int num_files = 100; // 100 files
uint64_t file_size = 1024; // every file is 1 kb
std::vector<uint64_t> delete_kbs_per_sec = {512, 200, 100, 50, 25};
for (size_t t = 0; t < delete_kbs_per_sec.size(); t++) {
DestroyAndCreateDir(dummy_files_dir_);
rate_bytes_per_sec_ = delete_kbs_per_sec[t] * 1024;
delete_scheduler_ =
NewDeleteScheduler(env_, trash_dir_, rate_bytes_per_sec_);
// Create 100 dummy files, every file is 1 Kb
std::vector<std::string> generated_files;
uint64_t total_files_size = 0;
for (int i = 0; i < num_files; i++) {
std::string file_name = "file" + ToString(i) + ".data";
generated_files.push_back(NewDummyFile(file_name, file_size));
total_files_size += file_size;
}
// Delete dummy files and measure time spent to empty trash
uint64_t delete_start_time = env_->NowMicros();
for (int i = 0; i < num_files; i++) {
ASSERT_OK(delete_scheduler_->DeleteFile(generated_files[i]));
}
ASSERT_EQ(CountFilesInDir(dummy_files_dir_), 0);
WaitForEmptyTrash();
uint64_t time_spent_deleting = env_->NowMicros() - delete_start_time;
uint64_t expected_delete_time =
((total_files_size * 1000000) / rate_bytes_per_sec_);
ASSERT_GT(time_spent_deleting, expected_delete_time * 0.9);
ASSERT_LT(time_spent_deleting, expected_delete_time * 1.1);
printf("Delete time = %" PRIu64 ", Expected delete time = %" PRIu64
", Ratio %f\n",
time_spent_deleting, expected_delete_time,
static_cast<double>(time_spent_deleting) / expected_delete_time);
ASSERT_EQ(CountFilesInDir(trash_dir_), 0);
auto bg_errors = delete_scheduler_->GetBackgroundErrors();
ASSERT_EQ(bg_errors.size(), 0);
}
}
// Same as the BasicRateLimiting test but delete files in multiple threads.
// 1- Create 100 dummy files
// 2- Delete the 100 dummy files using DeleteScheduler using 10 threads
// 3- Wait for DeleteScheduler to delete all files in queue
// 4- Measure time spent in step 2,3 and make sure it matches the expected
// time from a rate limited delete
// 5- Make sure that all created files were completely deleted
TEST_F(DeleteSchedulerTest, RateLimitingMultiThreaded) {
int thread_cnt = 10;
int num_files = 10; // 10 files per thread
uint64_t file_size = 1024; // every file is 1 kb
std::vector<uint64_t> delete_kbs_per_sec = {512, 200, 100, 50, 25};
for (size_t t = 0; t < delete_kbs_per_sec.size(); t++) {
DestroyAndCreateDir(dummy_files_dir_);
rate_bytes_per_sec_ = delete_kbs_per_sec[t] * 1024;
delete_scheduler_ =
NewDeleteScheduler(env_, trash_dir_, rate_bytes_per_sec_);
// Create 100 dummy files, every file is 1 Kb
std::vector<std::string> generated_files;
uint64_t total_files_size = 0;
for (int i = 0; i < num_files * thread_cnt; i++) {
std::string file_name = "file" + ToString(i) + ".data";
generated_files.push_back(NewDummyFile(file_name, file_size));
total_files_size += file_size;
}
// Delete dummy files using 10 threads and measure time spent to empty trash
uint64_t delete_start_time = env_->NowMicros();
std::atomic<int> thread_num(0);
std::vector<std::thread> threads;
for (int i = 0; i < thread_cnt; i++) {
threads.emplace_back([&]() {
int idx = thread_num.fetch_add(1);
int range_start = idx * num_files;
int range_end = range_start + num_files;
for (int j = range_start; j < range_end; j++){
ASSERT_OK(delete_scheduler_->DeleteFile(generated_files[j]));
}
});
}
for (size_t i = 0; i < threads.size(); i++) {
threads[i].join();
}
ASSERT_EQ(CountFilesInDir(dummy_files_dir_), 0);
WaitForEmptyTrash();
uint64_t time_spent_deleting = env_->NowMicros() - delete_start_time;
uint64_t expected_delete_time =
((total_files_size * 1000000) / rate_bytes_per_sec_);
ASSERT_GT(time_spent_deleting, expected_delete_time * 0.9);
ASSERT_LT(time_spent_deleting, expected_delete_time * 1.1);
printf("Delete time = %" PRIu64 ", Expected delete time = %" PRIu64
", Ratio %f\n",
time_spent_deleting, expected_delete_time,
static_cast<double>(time_spent_deleting) / expected_delete_time);
ASSERT_EQ(CountFilesInDir(trash_dir_), 0);
auto bg_errors = delete_scheduler_->GetBackgroundErrors();
ASSERT_EQ(bg_errors.size(), 0);
}
}
// Disable rate limiting by setting rate_bytes_per_sec_ to 0 and make sure
// that when DeleteScheduler delete a file it delete it immediately and dont
// move it to trash
TEST_F(DeleteSchedulerTest, DisableRateLimiting) {
int bg_delete_file = 0;
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"DeleteSchedulerImpl::DeleteTrashFile:DeleteFile",
[&](void* arg) { bg_delete_file++; });
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
delete_scheduler_ = NewDeleteScheduler(env_, "", 0);
for (int i = 0; i < 10; i++) {
// Every file we delete will be deleted immediately
std::string dummy_file = NewDummyFile("dummy.data");
ASSERT_OK(delete_scheduler_->DeleteFile(dummy_file));
ASSERT_TRUE(env_->FileExists(dummy_file).IsNotFound());
ASSERT_EQ(CountFilesInDir(dummy_files_dir_), 0);
ASSERT_EQ(CountFilesInDir(trash_dir_), 0);
}
ASSERT_EQ(bg_delete_file, 0);
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
}
// Testing that moving files to trash with the same name is not a problem
// 1- Create 10 files with the same name "conflict.data"
// 2- Delete the 10 files using DeleteScheduler
// 3- Make sure that trash directory contain 10 files ("conflict.data" x 10)
// --- Hold DeleteSchedulerImpl::BackgroundEmptyTrash ---
// 4- Make sure that files are deleted from trash
TEST_F(DeleteSchedulerTest, ConflictNames) {
rocksdb::SyncPoint::GetInstance()->LoadDependency({
{"DeleteSchedulerTest::ConflictNames:1",
"DeleteSchedulerImpl::BackgroundEmptyTrash"},
});
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
rate_bytes_per_sec_ = 1024 * 1024; // 1 Mb/sec
delete_scheduler_ = NewDeleteScheduler(env_, trash_dir_, rate_bytes_per_sec_);
// Create "conflict.data" and move it to trash 10 times
for (int i = 0; i < 10; i++) {
std::string dummy_file = NewDummyFile("conflict.data");
ASSERT_OK(delete_scheduler_->DeleteFile(dummy_file));
}
ASSERT_EQ(CountFilesInDir(dummy_files_dir_), 0);
// 10 files ("conflict.data" x 10) in trash
ASSERT_EQ(CountFilesInDir(trash_dir_), 10);
// Hold BackgroundEmptyTrash
TEST_SYNC_POINT("DeleteSchedulerTest::ConflictNames:1");
WaitForEmptyTrash();
ASSERT_EQ(CountFilesInDir(trash_dir_), 0);
auto bg_errors = delete_scheduler_->GetBackgroundErrors();
ASSERT_EQ(bg_errors.size(), 0);
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
}
// 1- Create 10 dummy files
// 2- Delete the 10 files using DeleteScheduler (move them to trsah)
// 3- Delete the 10 files directly (using env_->DeleteFile)
// --- Hold DeleteSchedulerImpl::BackgroundEmptyTrash ---
// 4- Make sure that DeleteScheduler failed to delete the 10 files and
// reported 10 background errors
TEST_F(DeleteSchedulerTest, BackgroundError) {
rocksdb::SyncPoint::GetInstance()->LoadDependency({
{"DeleteSchedulerTest::BackgroundError:1",
"DeleteSchedulerImpl::BackgroundEmptyTrash"},
});
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
rate_bytes_per_sec_ = 1024 * 1024; // 1 Mb/sec
delete_scheduler_ = NewDeleteScheduler(env_, trash_dir_, rate_bytes_per_sec_);
// Generate 10 dummy files and move them to trash
for (int i = 0; i < 10; i++) {
std::string file_name = "data_" + ToString(i) + ".data";
ASSERT_OK(delete_scheduler_->DeleteFile(NewDummyFile(file_name)));
}
ASSERT_EQ(CountFilesInDir(dummy_files_dir_), 0);
ASSERT_EQ(CountFilesInDir(trash_dir_), 10);
// Delete 10 files from trash, this will cause background errors in
// BackgroundEmptyTrash since we already deleted the files it was
// goind to delete
for (int i = 0; i < 10; i++) {
std::string file_name = "data_" + ToString(i) + ".data";
ASSERT_OK(env_->DeleteFile(trash_dir_ + "/" + file_name));
}
// Hold BackgroundEmptyTrash
TEST_SYNC_POINT("DeleteSchedulerTest::BackgroundError:1");
WaitForEmptyTrash();
auto bg_errors = delete_scheduler_->GetBackgroundErrors();
ASSERT_EQ(bg_errors.size(), 10);
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
}
// 1- Create 10 files in trash
// 2- Create a DeleteScheduler with delete_exisitng_trash = true
// 3- Wait for DeleteScheduler to delete all files in queue
// 4- Make sure that all files in trash directory were deleted
TEST_F(DeleteSchedulerTest, TrashWithExistingFiles) {
std::vector<std::string> dummy_files;
for (int i = 0; i < 10; i++) {
std::string file_name = "data_" + ToString(i) + ".data";
std::string trash_path = trash_dir_ + "/" + file_name;
env_->RenameFile(NewDummyFile(file_name), trash_path);
}
ASSERT_EQ(CountFilesInDir(trash_dir_), 10);
Status s;
rate_bytes_per_sec_ = 1024 * 1024; // 1 Mb/sec
delete_scheduler_ = NewDeleteScheduler(env_, trash_dir_, rate_bytes_per_sec_,
nullptr, true, &s);
ASSERT_OK(s);
WaitForEmptyTrash();
ASSERT_EQ(CountFilesInDir(trash_dir_), 0);
auto bg_errors = delete_scheduler_->GetBackgroundErrors();
ASSERT_EQ(bg_errors.size(), 0);
}
// 1- Create 10 dummy files
// 2- Delete 10 dummy files using DeleteScheduler
// 3- Wait for DeleteScheduler to delete all files in queue
// 4- Make sure all files in trash directory were deleted
// 5- Repeat previous steps 5 times
TEST_F(DeleteSchedulerTest, StartBGEmptyTrashMultipleTimes) {
int bg_delete_file = 0;
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"DeleteSchedulerImpl::DeleteTrashFile:DeleteFile",
[&](void* arg) { bg_delete_file++; });
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
rate_bytes_per_sec_ = 1024 * 1024; // 1 MB / sec
delete_scheduler_ = NewDeleteScheduler(env_, trash_dir_, rate_bytes_per_sec_);
// Move files to trash, wait for empty trash, start again
for (int run = 1; run <= 5; run++) {
// Generate 10 dummy files and move them to trash
for (int i = 0; i < 10; i++) {
std::string file_name = "data_" + ToString(i) + ".data";
ASSERT_OK(delete_scheduler_->DeleteFile(NewDummyFile(file_name)));
}
ASSERT_EQ(CountFilesInDir(dummy_files_dir_), 0);
WaitForEmptyTrash();
ASSERT_EQ(bg_delete_file, 10 * run);
ASSERT_EQ(CountFilesInDir(trash_dir_), 0);
auto bg_errors = delete_scheduler_->GetBackgroundErrors();
ASSERT_EQ(bg_errors.size(), 0);
}
ASSERT_EQ(bg_delete_file, 50);
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
}
// 1- Create a DeleteScheduler with very slow rate limit (1 Byte / sec)
// 2- Delete 100 files using DeleteScheduler
// 3- Delete the DeleteScheduler (call the destructor while queue is not empty)
// 4- Make sure that not all files were deleted from trash and that
// DeleteScheduler background thread did not delete all files
TEST_F(DeleteSchedulerTest, DestructorWithNonEmptyQueue) {
int bg_delete_file = 0;
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"DeleteSchedulerImpl::DeleteTrashFile:DeleteFile",
[&](void* arg) { bg_delete_file++; });
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
rate_bytes_per_sec_ = 1; // 1 Byte / sec
delete_scheduler_ = NewDeleteScheduler(env_, trash_dir_, rate_bytes_per_sec_);
for (int i = 0; i < 100; i++) {
std::string file_name = "data_" + ToString(i) + ".data";
ASSERT_OK(delete_scheduler_->DeleteFile(NewDummyFile(file_name)));
}
// Deleting 100 files will need >28 hours to delete
// we will delete the DeleteScheduler while delete queue is not empty
delete delete_scheduler_;
delete_scheduler_ = nullptr;
ASSERT_LT(bg_delete_file, 100);
ASSERT_GT(CountFilesInDir(trash_dir_), 0);
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
}
// 1- Delete the trash directory
// 2- Delete 10 files using DeleteScheduler
// 3- Make sure that the 10 files were deleted immediately since DeleteScheduler
// failed to move them to trash directory
TEST_F(DeleteSchedulerTest, MoveToTrashError) {
int bg_delete_file = 0;
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"DeleteSchedulerImpl::DeleteTrashFile:DeleteFile",
[&](void* arg) { bg_delete_file++; });
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
rate_bytes_per_sec_ = 1024; // 1 Kb / sec
delete_scheduler_ = NewDeleteScheduler(env_, trash_dir_, rate_bytes_per_sec_);
// We will delete the trash directory, that mean that DeleteScheduler wont
// be able to move files to trash and will delete files them immediately.
DestroyDir(trash_dir_);
for (int i = 0; i < 10; i++) {
std::string file_name = "data_" + ToString(i) + ".data";
ASSERT_OK(delete_scheduler_->DeleteFile(NewDummyFile(file_name)));
}
ASSERT_EQ(CountFilesInDir(dummy_files_dir_), 0);
ASSERT_EQ(bg_delete_file, 0);
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
}
} // namespace rocksdb
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

@ -21,6 +21,7 @@
#include "rocksdb/cache.h" #include "rocksdb/cache.h"
#include "rocksdb/compaction_filter.h" #include "rocksdb/compaction_filter.h"
#include "rocksdb/comparator.h" #include "rocksdb/comparator.h"
#include "rocksdb/delete_scheduler.h"
#include "rocksdb/env.h" #include "rocksdb/env.h"
#include "rocksdb/memtablerep.h" #include "rocksdb/memtablerep.h"
#include "rocksdb/merge_operator.h" #include "rocksdb/merge_operator.h"
@ -199,6 +200,7 @@ DBOptions::DBOptions()
paranoid_checks(true), paranoid_checks(true),
env(Env::Default()), env(Env::Default()),
rate_limiter(nullptr), rate_limiter(nullptr),
delete_scheduler(nullptr),
info_log(nullptr), info_log(nullptr),
#ifdef NDEBUG #ifdef NDEBUG
info_log_level(INFO_LEVEL), info_log_level(INFO_LEVEL),
@ -250,6 +252,7 @@ DBOptions::DBOptions(const Options& options)
paranoid_checks(options.paranoid_checks), paranoid_checks(options.paranoid_checks),
env(options.env), env(options.env),
rate_limiter(options.rate_limiter), rate_limiter(options.rate_limiter),
delete_scheduler(options.delete_scheduler),
info_log(options.info_log), info_log(options.info_log),
info_log_level(options.info_log_level), info_log_level(options.info_log_level),
max_open_files(options.max_open_files), max_open_files(options.max_open_files),
@ -360,6 +363,8 @@ void DBOptions::Dump(Logger* log) const {
use_adaptive_mutex); use_adaptive_mutex);
Warn(log, " Options.rate_limiter: %p", Warn(log, " Options.rate_limiter: %p",
rate_limiter.get()); rate_limiter.get());
Warn(log, " Options.delete_scheduler.rate_bytes_per_sec: %" PRIi64,
delete_scheduler ? delete_scheduler->GetRateBytesPerSecond() : 0);
Warn(log, " Options.bytes_per_sync: %" PRIu64, Warn(log, " Options.bytes_per_sync: %" PRIu64,
bytes_per_sync); bytes_per_sync);
Warn(log, " Options.wal_bytes_per_sync: %" PRIu64, Warn(log, " Options.wal_bytes_per_sync: %" PRIu64,

Loading…
Cancel
Save