From d6c838f1e130d8860407bc771fa6d4ac238859ba Mon Sep 17 00:00:00 2001 From: Islam AbdelRahman Date: Thu, 28 Jan 2016 18:35:01 -0800 Subject: [PATCH] Add SstFileManager (component tracking all SST file in DBs and control the deletion rate) Summary: Add a new class SstFileTracker that will be notified whenever a DB add/delete/move and sst file, it will also replace DeleteScheduler SstFileTracker can be used later to abort writes when we exceed a specific size Test Plan: unit tests Reviewers: rven, anthony, yhchiang, sdong Reviewed By: sdong Subscribers: igor, lovro, march, dhruba Differential Revision: https://reviews.facebook.net/D50469 --- CMakeLists.txt | 3 +- HISTORY.md | 2 + db/compaction_job.cc | 6 + db/db_impl.cc | 43 ++++-- db/db_test.cc | 102 ++++++++++--- db/db_test_util.cc | 18 +++ db/db_test_util.h | 3 + include/rocksdb/delete_scheduler.h | 67 -------- include/rocksdb/options.h | 14 +- include/rocksdb/sst_file_manager.h | 64 ++++++++ src.mk | 3 +- ..._scheduler_impl.cc => delete_scheduler.cc} | 98 +++++------- ...te_scheduler_impl.h => delete_scheduler.h} | 26 ++-- util/delete_scheduler_test.cc | 87 ++++------- util/file_util.cc | 15 +- util/file_util.h | 4 +- util/options.cc | 11 +- util/options_test.cc | 4 +- util/sst_file_manager_impl.cc | 143 ++++++++++++++++++ util/sst_file_manager_impl.h | 77 ++++++++++ 20 files changed, 547 insertions(+), 243 deletions(-) delete mode 100644 include/rocksdb/delete_scheduler.h create mode 100644 include/rocksdb/sst_file_manager.h rename util/{delete_scheduler_impl.cc => delete_scheduler.cc} (63%) rename util/{delete_scheduler_impl.h => delete_scheduler.h} (73%) create mode 100644 util/sst_file_manager_impl.cc create mode 100644 util/sst_file_manager_impl.h diff --git a/CMakeLists.txt b/CMakeLists.txt index b0fd54a46..a80344fe8 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -196,13 +196,14 @@ set(SOURCES util/comparator.cc util/concurrent_arena.cc util/crc32c.cc - util/delete_scheduler_impl.cc + util/delete_scheduler.cc util/dynamic_bloom.cc util/env.cc util/env_hdfs.cc util/event_logger.cc util/file_util.cc util/file_reader_writer.cc + util/sst_file_manager_impl.cc util/filter_policy.cc util/hash.cc util/histogram.cc diff --git a/HISTORY.md b/HISTORY.md index 809cbf76c..f2476b8f1 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -3,9 +3,11 @@ ### Public API Changes * Add a new perf context level between kEnableCount and kEnableTime. Level 2 now doesn't include timers for mutexes. * Statistics of mutex operation durations will not be measured by default. If you want to have them enabled, you need to set Statistics::stats_level_ to kAll. +* DBOptions::delete_scheduler and NewDeleteScheduler() are removed, please use DBOptions::sst_file_manager and NewSstFileManager() instead ### New Features * ldb tool now supports operations to non-default column families. +* Add DBOptions::sst_file_manager. Use NewSstFileManager() in include/rocksdb/sst_file_manager.h to create a SstFileManager that can be used to track the total size of SST files and control the SST files deletion rate. ## 4.4.0 (1/14/2016) ### Public API Changes diff --git a/db/compaction_job.cc b/db/compaction_job.cc index c30ee7736..5be4a2c2e 100644 --- a/db/compaction_job.cc +++ b/db/compaction_job.cc @@ -51,6 +51,7 @@ #include "util/iostats_context_imp.h" #include "util/log_buffer.h" #include "util/logging.h" +#include "util/sst_file_manager_impl.h" #include "util/mutexlock.h" #include "util/perf_context_imp.h" #include "util/stop_watch.h" @@ -498,11 +499,16 @@ Status CompactionJob::Run() { } TablePropertiesCollection tp; + auto sfm = + static_cast(db_options_.sst_file_manager.get()); for (const auto& state : compact_->sub_compact_states) { for (const auto& output : state.outputs) { auto fn = TableFileName(db_options_.db_paths, output.meta.fd.GetNumber(), output.meta.fd.GetPathId()); tp[fn] = output.table_properties; + if (sfm && output.meta.fd.GetPathId() == 0) { + sfm->OnAddFile(fn); + } } } compact_->compaction->SetOutputTableProperties(std::move(tp)); diff --git a/db/db_impl.cc b/db/db_impl.cc index 02060393f..6db05ae66 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -64,7 +64,6 @@ #include "rocksdb/cache.h" #include "rocksdb/compaction_filter.h" #include "rocksdb/db.h" -#include "rocksdb/delete_scheduler.h" #include "rocksdb/env.h" #include "rocksdb/merge_operator.h" #include "rocksdb/sst_file_writer.h" @@ -89,6 +88,7 @@ #include "util/log_buffer.h" #include "util/logging.h" #include "util/mutexlock.h" +#include "util/sst_file_manager_impl.h" #include "util/options_helper.h" #include "util/options_parser.h" #include "util/perf_context_imp.h" @@ -786,8 +786,8 @@ void DBImpl::PurgeObsoleteFiles(const JobContext& state) { } #endif // !ROCKSDB_LITE Status file_deletion_status; - if (type == kTableFile && path_id == 0) { - file_deletion_status = DeleteOrMoveToTrash(&db_options_, fname); + if (type == kTableFile) { + file_deletion_status = DeleteSSTFile(&db_options_, fname, path_id); } else { file_deletion_status = env_->DeleteFile(fname); } @@ -1509,6 +1509,14 @@ Status DBImpl::FlushMemTableToOutputFile( // may temporarily unlock and lock the mutex. NotifyOnFlushCompleted(cfd, &file_meta, mutable_cf_options, job_context->job_id, flush_job.GetTableProperties()); + auto sfm = + static_cast(db_options_.sst_file_manager.get()); + if (sfm) { + // Notify sst_file_manager that a new file was added + std::string file_path = MakeTableFileName(db_options_.db_paths[0].path, + file_meta.fd.GetNumber()); + sfm->OnAddFile(file_path); + } } #endif // ROCKSDB_LITE return s; @@ -5406,6 +5414,25 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname, } impl->mutex_.Unlock(); + auto sfm = static_cast( + impl->db_options_.sst_file_manager.get()); + if (s.ok() && sfm) { + // Notify SstFileManager about all sst files that already exist in + // db_paths[0] when the DB is opened. + auto& db_path = impl->db_options_.db_paths[0]; + std::vector existing_files; + impl->db_options_.env->GetChildren(db_path.path, &existing_files); + for (auto& file_name : existing_files) { + uint64_t file_number; + FileType file_type; + std::string file_path = db_path.path + "/" + file_name; + if (ParseFileName(file_name, &file_number, &file_type) && + file_type == kTableFile) { + sfm->OnAddFile(file_path); + } + } + } + if (s.ok()) { Log(InfoLogLevel::INFO_LEVEL, impl->db_options_.info_log, "DB pointer %p", impl); @@ -5465,7 +5492,7 @@ Status DestroyDB(const std::string& dbname, const Options& options) { if (type == kMetaDatabase) { del = DestroyDB(path_to_delete, options); } else if (type == kTableFile) { - del = DeleteOrMoveToTrash(&options, path_to_delete); + del = DeleteSSTFile(&options, path_to_delete, 0); } else { del = env->DeleteFile(path_to_delete); } @@ -5481,13 +5508,9 @@ Status DestroyDB(const std::string& dbname, const Options& options) { for (size_t i = 0; i < filenames.size(); i++) { if (ParseFileName(filenames[i], &number, &type) && type == kTableFile) { // Lock file will be deleted at end - Status del; std::string table_path = db_path.path + "/" + filenames[i]; - if (path_id == 0) { - del = DeleteOrMoveToTrash(&options, table_path); - } else { - del = env->DeleteFile(table_path); - } + Status del = DeleteSSTFile(&options, table_path, + static_cast(path_id)); if (result.ok() && !del.ok()) { result = del; } diff --git a/db/db_test.cc b/db/db_test.cc index db3524345..76e64d484 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -37,9 +37,9 @@ #include "rocksdb/compaction_filter.h" #include "rocksdb/convenience.h" #include "rocksdb/db.h" -#include "rocksdb/delete_scheduler.h" #include "rocksdb/env.h" #include "rocksdb/experimental.h" +#include "rocksdb/sst_file_manager.h" #include "rocksdb/filter_policy.h" #include "rocksdb/options.h" #include "rocksdb/perf_context.h" @@ -65,9 +65,10 @@ #include "util/compression.h" #include "util/mutexlock.h" #include "util/rate_limiter.h" +#include "util/sst_file_manager_impl.h" #include "util/statistics.h" -#include "util/testharness.h" #include "util/sync_point.h" +#include "util/testharness.h" #include "util/testutil.h" #include "util/mock_env.h" #include "util/string_util.h" @@ -8431,15 +8432,78 @@ TEST_F(DBTest, DeletingOldWalAfterDrop) { } #ifndef ROCKSDB_LITE +TEST_F(DBTest, DBWithSstFileManager) { + std::shared_ptr sst_file_manager(NewSstFileManager(env_)); + auto sfm = static_cast(sst_file_manager.get()); + + int files_added = 0; + int files_deleted = 0; + int files_moved = 0; + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "SstFileManagerImpl::OnAddFile", [&](void* arg) { files_added++; }); + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "SstFileManagerImpl::OnDeleteFile", [&](void* arg) { files_deleted++; }); + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "SstFileManagerImpl::OnMoveFile", [&](void* arg) { files_moved++; }); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + + Options options = CurrentOptions(); + options.sst_file_manager = sst_file_manager; + DestroyAndReopen(options); + + Random rnd(301); + for (int i = 0; i < 25; i++) { + GenerateNewRandomFile(&rnd); + ASSERT_OK(Flush()); + dbfull()->TEST_WaitForFlushMemTable(); + dbfull()->TEST_WaitForCompact(); + // Verify that we are tracking all sst files in dbname_ + ASSERT_EQ(sfm->GetTrackedFiles(), GetAllSSTFiles()); + } + ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + + auto files_in_db = GetAllSSTFiles(); + // Verify that we are tracking all sst files in dbname_ + ASSERT_EQ(sfm->GetTrackedFiles(), files_in_db); + // Verify the total files size + uint64_t total_files_size = 0; + for (auto& file_to_size : files_in_db) { + total_files_size += file_to_size.second; + } + ASSERT_EQ(sfm->GetTotalSize(), total_files_size); + // We flushed at least 25 files + ASSERT_GE(files_added, 25); + // Compaction must have deleted some files + ASSERT_GT(files_deleted, 0); + // No files were moved + ASSERT_EQ(files_moved, 0); + + Close(); + Reopen(options); + ASSERT_EQ(sfm->GetTrackedFiles(), files_in_db); + ASSERT_EQ(sfm->GetTotalSize(), total_files_size); + + // Verify that we track all the files again after the DB is closed and opened + Close(); + sst_file_manager.reset(NewSstFileManager(env_)); + options.sst_file_manager = sst_file_manager; + sfm = static_cast(sst_file_manager.get()); + + Reopen(options); + ASSERT_EQ(sfm->GetTrackedFiles(), files_in_db); + ASSERT_EQ(sfm->GetTotalSize(), total_files_size); + + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); +} + TEST_F(DBTest, RateLimitedDelete) { rocksdb::SyncPoint::GetInstance()->LoadDependency({ - {"DBTest::RateLimitedDelete:1", - "DeleteSchedulerImpl::BackgroundEmptyTrash"}, + {"DBTest::RateLimitedDelete:1", "DeleteScheduler::BackgroundEmptyTrash"}, }); std::vector penalties; rocksdb::SyncPoint::GetInstance()->SetCallBack( - "DeleteSchedulerImpl::BackgroundEmptyTrash:Wait", + "DeleteScheduler::BackgroundEmptyTrash:Wait", [&](void* arg) { penalties.push_back(*(static_cast(arg))); }); rocksdb::SyncPoint::GetInstance()->DisableProcessing(); @@ -8450,9 +8514,10 @@ TEST_F(DBTest, RateLimitedDelete) { std::string trash_dir = test::TmpDir(env_) + "/trash"; int64_t rate_bytes_per_sec = 1024 * 10; // 10 Kbs / Sec Status s; - options.delete_scheduler.reset(NewDeleteScheduler( - env_, trash_dir, rate_bytes_per_sec, nullptr, false, &s)); + options.sst_file_manager.reset(NewSstFileManager( + env_, nullptr, trash_dir, rate_bytes_per_sec, false, &s)); ASSERT_OK(s); + auto sfm = static_cast(options.sst_file_manager.get()); Destroy(last_options_); rocksdb::SyncPoint::GetInstance()->EnableProcessing(); @@ -8479,7 +8544,7 @@ TEST_F(DBTest, RateLimitedDelete) { uint64_t delete_start_time = env_->NowMicros(); // Hold BackgroundEmptyTrash TEST_SYNC_POINT("DBTest::RateLimitedDelete:1"); - options.delete_scheduler->WaitForEmptyTrash(); + sfm->WaitForEmptyTrash(); uint64_t time_spent_deleting = env_->NowMicros() - delete_start_time; uint64_t total_files_size = 0; @@ -8502,7 +8567,7 @@ TEST_F(DBTest, RateLimitedDelete) { TEST_F(DBTest, DeleteSchedulerMultipleDBPaths) { int bg_delete_file = 0; rocksdb::SyncPoint::GetInstance()->SetCallBack( - "DeleteSchedulerImpl::DeleteTrashFile:DeleteFile", + "DeleteScheduler::DeleteTrashFile:DeleteFile", [&](void* arg) { bg_delete_file++; }); rocksdb::SyncPoint::GetInstance()->EnableProcessing(); @@ -8515,9 +8580,10 @@ TEST_F(DBTest, DeleteSchedulerMultipleDBPaths) { std::string trash_dir = test::TmpDir(env_) + "/trash"; int64_t rate_bytes_per_sec = 1024 * 1024; // 1 Mb / Sec Status s; - options.delete_scheduler.reset(NewDeleteScheduler( - env_, trash_dir, rate_bytes_per_sec, nullptr, false, &s)); + options.sst_file_manager.reset(NewSstFileManager( + env_, nullptr, trash_dir, rate_bytes_per_sec, false, &s)); ASSERT_OK(s); + auto sfm = static_cast(options.sst_file_manager.get()); DestroyAndReopen(options); @@ -8551,7 +8617,7 @@ TEST_F(DBTest, DeleteSchedulerMultipleDBPaths) { ASSERT_OK(db_->CompactRange(compact_options, &begin, &end)); ASSERT_EQ("0,2", FilesPerLevel(0)); - options.delete_scheduler->WaitForEmptyTrash(); + sfm->WaitForEmptyTrash(); ASSERT_EQ(bg_delete_file, 8); compact_options.bottommost_level_compaction = @@ -8559,7 +8625,7 @@ TEST_F(DBTest, DeleteSchedulerMultipleDBPaths) { ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr)); ASSERT_EQ("0,1", FilesPerLevel(0)); - options.delete_scheduler->WaitForEmptyTrash(); + sfm->WaitForEmptyTrash(); ASSERT_EQ(bg_delete_file, 8); rocksdb::SyncPoint::GetInstance()->DisableProcessing(); @@ -8568,7 +8634,7 @@ TEST_F(DBTest, DeleteSchedulerMultipleDBPaths) { TEST_F(DBTest, DestroyDBWithRateLimitedDelete) { int bg_delete_file = 0; rocksdb::SyncPoint::GetInstance()->SetCallBack( - "DeleteSchedulerImpl::DeleteTrashFile:DeleteFile", + "DeleteScheduler::DeleteTrashFile:DeleteFile", [&](void* arg) { bg_delete_file++; }); rocksdb::SyncPoint::GetInstance()->EnableProcessing(); @@ -8590,12 +8656,13 @@ TEST_F(DBTest, DestroyDBWithRateLimitedDelete) { std::string trash_dir = test::TmpDir(env_) + "/trash"; int64_t rate_bytes_per_sec = 1024 * 1024; // 1 Mb / Sec Status s; - options.delete_scheduler.reset(NewDeleteScheduler( - env_, trash_dir, rate_bytes_per_sec, nullptr, false, &s)); + options.sst_file_manager.reset(NewSstFileManager( + env_, nullptr, trash_dir, rate_bytes_per_sec, false, &s)); ASSERT_OK(s); ASSERT_OK(DestroyDB(dbname_, options)); - options.delete_scheduler->WaitForEmptyTrash(); + auto sfm = static_cast(options.sst_file_manager.get()); + sfm->WaitForEmptyTrash(); // We have deleted the 4 sst files in the delete_scheduler ASSERT_EQ(bg_delete_file, 4); } @@ -10073,7 +10140,6 @@ TEST_F(DBTest, WalFilterTestWithChangeBatchExtraKeys) { ValidateKeyExistence(db_, keys_must_exist, keys_must_not_exist); } - #endif // ROCKSDB_LITE class SliceTransformLimitedDomain : public SliceTransform { diff --git a/db/db_test_util.cc b/db/db_test_util.cc index 39a7a364f..e6ee304a5 100644 --- a/db/db_test_util.cc +++ b/db/db_test_util.cc @@ -1005,4 +1005,22 @@ void DBTestBase::CopyFile(const std::string& source, ASSERT_OK(destfile->Close()); } +std::unordered_map DBTestBase::GetAllSSTFiles() { + std::unordered_map res; + + std::vector files; + env_->GetChildren(dbname_, &files); + for (auto& file_name : files) { + uint64_t number; + FileType type; + std::string file_path = dbname_ + "/" + file_name; + if (ParseFileName(file_name, &number, &type) && type == kTableFile) { + uint64_t file_size = 0; + env_->GetFileSize(file_path, &file_size); + res[file_path] = file_size; + } + } + return res; +} + } // namespace rocksdb diff --git a/db/db_test_util.h b/db/db_test_util.h index ebf105250..031057bbb 100644 --- a/db/db_test_util.h +++ b/db/db_test_util.h @@ -19,6 +19,7 @@ #endif #include +#include #include #include #include @@ -750,6 +751,8 @@ class DBTestBase : public testing::Test { void CopyFile(const std::string& source, const std::string& destination, uint64_t size = 0); + + std::unordered_map GetAllSSTFiles(); }; } // namespace rocksdb diff --git a/include/rocksdb/delete_scheduler.h b/include/rocksdb/delete_scheduler.h deleted file mode 100644 index 7c3eaee77..000000000 --- a/include/rocksdb/delete_scheduler.h +++ /dev/null @@ -1,67 +0,0 @@ -// Copyright (c) 2015, Facebook, Inc. All rights reserved. -// This source code is licensed under the BSD-style license found in the -// LICENSE file in the root directory of this source tree. An additional grant -// of patent rights can be found in the PATENTS file in the same directory. - -#pragma once - -#include -#include -#include - -#include "rocksdb/status.h" - -namespace rocksdb { - -class Env; -class Logger; - -// DeleteScheduler allow the DB to enforce a rate limit on file deletion, -// Instead of deleteing files immediately, files are moved to trash_dir -// and deleted in a background thread that apply sleep penlty between deletes -// if they are happening in a rate faster than rate_bytes_per_sec, -// -// Rate limiting can be turned off by setting rate_bytes_per_sec = 0, In this -// case DeleteScheduler will delete files immediately. -class DeleteScheduler { - public: - virtual ~DeleteScheduler() {} - - // Return delete rate limit in bytes per second - virtual int64_t GetRateBytesPerSecond() = 0; - - // Move file to trash directory and schedule it's deletion - virtual Status DeleteFile(const std::string& fname) = 0; - - // Return a map containing errors that happened in the background thread - // file_path => error status - virtual std::map GetBackgroundErrors() = 0; - - // Wait for all files being deleteing in the background to finish or for - // destructor to be called. - virtual void WaitForEmptyTrash() = 0; -}; - -// Create a new DeleteScheduler that can be shared among multiple RocksDB -// instances to control the file deletion rate. -// -// @env: Pointer to Env object, please see "rocksdb/env.h". -// @trash_dir: Path to the directory where deleted files will be moved into -// to be deleted in a background thread while applying rate limiting. If this -// directory dont exist, it will be created. This directory should not be -// used by any other process or any other DeleteScheduler. -// @rate_bytes_per_sec: How many bytes should be deleted per second, If this -// value is set to 1024 (1 Kb / sec) and we deleted a file of size 4 Kb -// in 1 second, we will wait for another 3 seconds before we delete other -// files, Set to 0 to disable rate limiting. -// @info_log: If not nullptr, info_log will be used to log errors. -// @delete_exisitng_trash: If set to true, the newly created DeleteScheduler -// will delete files that already exist in trash_dir. -// @status: If not nullptr, status will contain any errors that happened during -// creating the missing trash_dir or deleting existing files in trash. -extern DeleteScheduler* NewDeleteScheduler( - Env* env, const std::string& trash_dir, int64_t rate_bytes_per_sec, - std::shared_ptr info_log = nullptr, - bool delete_exisitng_trash = true, Status* status = nullptr); - -} // namespace rocksdb diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index e7064b3cb..a3f410422 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -33,6 +33,7 @@ class CompactionFilterFactory; class Comparator; class Env; enum InfoLogLevel : unsigned char; +class SstFileManager; class FilterPolicy; class Logger; class MergeOperator; @@ -41,7 +42,6 @@ class TableFactory; class MemTableRepFactory; class TablePropertiesCollectorFactory; class RateLimiter; -class DeleteScheduler; class Slice; class SliceTransform; class Statistics; @@ -830,12 +830,12 @@ struct DBOptions { // Default: nullptr std::shared_ptr rate_limiter; - // Use to control files deletion rate, can be used among multiple - // RocksDB instances. delete_scheduler is only used to delete table files that - // need to be deleted from the first db_path (db_name if db_paths is empty), - // other files types and other db_paths wont be affected by delete_scheduler. - // Default: nullptr (disabled) - std::shared_ptr delete_scheduler; + // Use to track SST files and control their file deletion rate, can be used + // among multiple RocksDB instances, sst_file_manager only track and throttle + // deletes of SST files in first db_path (db_name if db_paths is empty), other + // files and other db_paths wont be tracked or affected by sst_file_manager. + // Default: nullptr + std::shared_ptr sst_file_manager; // Any internal progress/error information generated by the db will // be written to info_log if it is non-nullptr, or to a file stored diff --git a/include/rocksdb/sst_file_manager.h b/include/rocksdb/sst_file_manager.h new file mode 100644 index 000000000..665f01add --- /dev/null +++ b/include/rocksdb/sst_file_manager.h @@ -0,0 +1,64 @@ +// Copyright (c) 2015, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#pragma once + +#include +#include +#include + +#include "rocksdb/status.h" + +namespace rocksdb { + +class Env; +class Logger; + +// SstFileManager is used to track SST files in the DB and control there +// deletion rate. +// All SstFileManager public functions are thread-safe. +class SstFileManager { + public: + virtual ~SstFileManager() {} + + // Return the total size of all tracked files. + // thread-safe + virtual uint64_t GetTotalSize() = 0; + + // Return a map containing all tracked files and there corresponding sizes. + // thread-safe + virtual std::unordered_map GetTrackedFiles() = 0; + + // Return delete rate limit in bytes per second. + // thread-safe + virtual int64_t GetDeleteRateBytesPerSecond() = 0; +}; + +// Create a new SstFileManager that can be shared among multiple RocksDB +// instances to track SST file and control there deletion rate. +// +// @param env: Pointer to Env object, please see "rocksdb/env.h". +// @param info_log: If not nullptr, info_log will be used to log errors. +// +// == Deletion rate limiting specific arguments == +// @param trash_dir: Path to the directory where deleted files will be moved +// to be deleted in a background thread while applying rate limiting. If this +// directory dont exist, it will be created. This directory should not be +// used by any other process or any other SstFileManager, Set to "" to +// disable deletion rate limiting. +// @param rate_bytes_per_sec: How many bytes should be deleted per second, If +// this value is set to 1024 (1 Kb / sec) and we deleted a file of size 4 Kb +// in 1 second, we will wait for another 3 seconds before we delete other +// files, Set to 0 to disable deletion rate limiting. +// @param delete_exisitng_trash: If set to true, the newly created +// SstFileManager will delete files that already exist in trash_dir. +// @param status: If not nullptr, status will contain any errors that happened +// during creating the missing trash_dir or deleting existing files in trash. +extern SstFileManager* NewSstFileManager( + Env* env, std::shared_ptr info_log = nullptr, + std::string trash_dir = "", int64_t rate_bytes_per_sec = 0, + bool delete_exisitng_trash = true, Status* status = nullptr); + +} // namespace rocksdb diff --git a/src.mk b/src.mk index 4e95b1fc1..3fb811144 100644 --- a/src.mk +++ b/src.mk @@ -94,13 +94,14 @@ LIB_SOURCES = \ util/compaction_job_stats_impl.cc \ util/concurrent_arena.cc \ util/crc32c.cc \ - util/delete_scheduler_impl.cc \ + util/delete_scheduler.cc \ util/dynamic_bloom.cc \ util/env.cc \ util/env_hdfs.cc \ util/env_posix.cc \ util/io_posix.cc \ util/thread_posix.cc \ + util/sst_file_manager_impl.cc \ util/file_util.cc \ util/file_reader_writer.cc \ util/filter_policy.cc \ diff --git a/util/delete_scheduler_impl.cc b/util/delete_scheduler.cc similarity index 63% rename from util/delete_scheduler_impl.cc rename to util/delete_scheduler.cc index e0f7511e0..650b8582d 100644 --- a/util/delete_scheduler_impl.cc +++ b/util/delete_scheduler.cc @@ -3,38 +3,40 @@ // LICENSE file in the root directory of this source tree. An additional grant // of patent rights can be found in the PATENTS file in the same directory. -#include "util/delete_scheduler_impl.h" +#include "util/delete_scheduler.h" #include #include #include "port/port.h" #include "rocksdb/env.h" +#include "util/sst_file_manager_impl.h" #include "util/mutexlock.h" #include "util/sync_point.h" namespace rocksdb { -DeleteSchedulerImpl::DeleteSchedulerImpl(Env* env, const std::string& trash_dir, - int64_t rate_bytes_per_sec, - std::shared_ptr info_log) +DeleteScheduler::DeleteScheduler(Env* env, const std::string& trash_dir, + int64_t rate_bytes_per_sec, Logger* info_log, + SstFileManagerImpl* sst_file_manager) : env_(env), trash_dir_(trash_dir), rate_bytes_per_sec_(rate_bytes_per_sec), pending_files_(0), closing_(false), cv_(&mu_), - info_log_(info_log) { - if (rate_bytes_per_sec_ == 0) { + info_log_(info_log), + sst_file_manager_(sst_file_manager) { + if (rate_bytes_per_sec_ <= 0) { // Rate limiting is disabled bg_thread_.reset(); } else { bg_thread_.reset( - new std::thread(&DeleteSchedulerImpl::BackgroundEmptyTrash, this)); + new std::thread(&DeleteScheduler::BackgroundEmptyTrash, this)); } } -DeleteSchedulerImpl::~DeleteSchedulerImpl() { +DeleteScheduler::~DeleteScheduler() { { MutexLock l(&mu_); closing_ = true; @@ -45,20 +47,29 @@ DeleteSchedulerImpl::~DeleteSchedulerImpl() { } } -Status DeleteSchedulerImpl::DeleteFile(const std::string& file_path) { - if (rate_bytes_per_sec_ == 0) { +Status DeleteScheduler::DeleteFile(const std::string& file_path) { + Status s; + if (rate_bytes_per_sec_ <= 0) { // Rate limiting is disabled - return env_->DeleteFile(file_path); + s = env_->DeleteFile(file_path); + if (s.ok() && sst_file_manager_) { + sst_file_manager_->OnDeleteFile(file_path); + } + return s; } // Move file to trash std::string path_in_trash; - Status s = MoveToTrash(file_path, &path_in_trash); + s = MoveToTrash(file_path, &path_in_trash); if (!s.ok()) { Log(InfoLogLevel::ERROR_LEVEL, info_log_, "Failed to move %s to trash directory (%s)", file_path.c_str(), trash_dir_.c_str()); - return env_->DeleteFile(file_path); + s = env_->DeleteFile(file_path); + if (s.ok() && sst_file_manager_) { + sst_file_manager_->OnDeleteFile(file_path); + } + return s; } // Add file to delete queue @@ -73,13 +84,13 @@ Status DeleteSchedulerImpl::DeleteFile(const std::string& file_path) { return s; } -std::map DeleteSchedulerImpl::GetBackgroundErrors() { +std::map DeleteScheduler::GetBackgroundErrors() { MutexLock l(&mu_); return bg_errors_; } -Status DeleteSchedulerImpl::MoveToTrash(const std::string& file_path, - std::string* path_in_trash) { +Status DeleteScheduler::MoveToTrash(const std::string& file_path, + std::string* path_in_trash) { Status s; // Figure out the name of the file in trash folder size_t idx = file_path.rfind("/"); @@ -112,11 +123,14 @@ Status DeleteSchedulerImpl::MoveToTrash(const std::string& file_path, break; } } + if (s.ok() && sst_file_manager_) { + sst_file_manager_->OnMoveFile(file_path, *path_in_trash); + } return s; } -void DeleteSchedulerImpl::BackgroundEmptyTrash() { - TEST_SYNC_POINT("DeleteSchedulerImpl::BackgroundEmptyTrash"); +void DeleteScheduler::BackgroundEmptyTrash() { + TEST_SYNC_POINT("DeleteScheduler::BackgroundEmptyTrash"); while (true) { MutexLock l(&mu_); @@ -151,7 +165,7 @@ void DeleteSchedulerImpl::BackgroundEmptyTrash() { uint64_t total_penlty = ((total_deleted_bytes * kMicrosInSecond) / rate_bytes_per_sec_); while (!closing_ && !cv_.TimedWait(start_time + total_penlty)) {} - TEST_SYNC_POINT_CALLBACK("DeleteSchedulerImpl::BackgroundEmptyTrash:Wait", + TEST_SYNC_POINT_CALLBACK("DeleteScheduler::BackgroundEmptyTrash:Wait", &total_penlty); pending_files_--; @@ -164,12 +178,12 @@ void DeleteSchedulerImpl::BackgroundEmptyTrash() { } } -Status DeleteSchedulerImpl::DeleteTrashFile(const std::string& path_in_trash, - uint64_t* deleted_bytes) { +Status DeleteScheduler::DeleteTrashFile(const std::string& path_in_trash, + uint64_t* deleted_bytes) { uint64_t file_size; Status s = env_->GetFileSize(path_in_trash, &file_size); if (s.ok()) { - TEST_SYNC_POINT("DeleteSchedulerImpl::DeleteTrashFile:DeleteFile"); + TEST_SYNC_POINT("DeleteScheduler::DeleteTrashFile:DeleteFile"); s = env_->DeleteFile(path_in_trash); } @@ -181,51 +195,19 @@ Status DeleteSchedulerImpl::DeleteTrashFile(const std::string& path_in_trash, *deleted_bytes = 0; } else { *deleted_bytes = file_size; + if (sst_file_manager_) { + sst_file_manager_->OnDeleteFile(path_in_trash); + } } return s; } -void DeleteSchedulerImpl::WaitForEmptyTrash() { +void DeleteScheduler::WaitForEmptyTrash() { MutexLock l(&mu_); while (pending_files_ > 0 && !closing_) { cv_.Wait(); } } -DeleteScheduler* NewDeleteScheduler(Env* env, const std::string& trash_dir, - int64_t rate_bytes_per_sec, - std::shared_ptr info_log, - bool delete_exisitng_trash, - Status* status) { - DeleteScheduler* res = - new DeleteSchedulerImpl(env, trash_dir, rate_bytes_per_sec, info_log); - - Status s; - if (trash_dir != "") { - s = env->CreateDirIfMissing(trash_dir); - if (s.ok() && delete_exisitng_trash) { - std::vector files_in_trash; - s = env->GetChildren(trash_dir, &files_in_trash); - if (s.ok()) { - for (const std::string& trash_file : files_in_trash) { - if (trash_file == "." || trash_file == "..") { - continue; - } - Status file_delete = res->DeleteFile(trash_dir + "/" + trash_file); - if (s.ok() && !file_delete.ok()) { - s = file_delete; - } - } - } - } - } - - if (status) { - *status = s; - } - - return res; -} - } // namespace rocksdb diff --git a/util/delete_scheduler_impl.h b/util/delete_scheduler.h similarity index 73% rename from util/delete_scheduler_impl.h rename to util/delete_scheduler.h index 32ef65f0c..8ce2e3005 100644 --- a/util/delete_scheduler_impl.h +++ b/util/delete_scheduler.h @@ -12,21 +12,28 @@ #include "port/port.h" -#include "rocksdb/delete_scheduler.h" #include "rocksdb/status.h" namespace rocksdb { class Env; class Logger; - -class DeleteSchedulerImpl : public DeleteScheduler { +class SstFileManagerImpl; + +// DeleteScheduler allows the DB to enforce a rate limit on file deletion, +// Instead of deleteing files immediately, files are moved to trash_dir +// and deleted in a background thread that apply sleep penlty between deletes +// if they are happening in a rate faster than rate_bytes_per_sec, +// +// Rate limiting can be turned off by setting rate_bytes_per_sec = 0, In this +// case DeleteScheduler will delete files immediately. +class DeleteScheduler { public: - DeleteSchedulerImpl(Env* env, const std::string& trash_dir, - int64_t rate_bytes_per_sec, - std::shared_ptr info_log); + DeleteScheduler(Env* env, const std::string& trash_dir, + int64_t rate_bytes_per_sec, Logger* info_log, + SstFileManagerImpl* sst_file_manager); - ~DeleteSchedulerImpl(); + ~DeleteScheduler(); // Return delete rate limit in bytes per second int64_t GetRateBytesPerSecond() { return rate_bytes_per_sec_; } @@ -63,7 +70,7 @@ class DeleteSchedulerImpl : public DeleteScheduler { int32_t pending_files_; // Errors that happened in BackgroundEmptyTrash (file_path => error) std::map bg_errors_; - // Set to true in ~DeleteSchedulerImpl() to force BackgroundEmptyTrash to stop + // Set to true in ~DeleteScheduler() to force BackgroundEmptyTrash to stop bool closing_; // Condition variable signaled in these conditions // - pending_files_ value change from 0 => 1 @@ -74,7 +81,8 @@ class DeleteSchedulerImpl : public DeleteScheduler { std::unique_ptr bg_thread_; // Mutex to protect threads from file name conflicts port::Mutex file_move_mu_; - std::shared_ptr info_log_; + Logger* info_log_; + SstFileManagerImpl* sst_file_manager_; static const uint64_t kMicrosInSecond = 1000 * 1000LL; }; diff --git a/util/delete_scheduler_test.cc b/util/delete_scheduler_test.cc index fcd821c15..21b8a5b19 100644 --- a/util/delete_scheduler_test.cc +++ b/util/delete_scheduler_test.cc @@ -12,9 +12,9 @@ #include #include -#include "rocksdb/delete_scheduler.h" #include "rocksdb/env.h" #include "rocksdb/options.h" +#include "util/delete_scheduler.h" #include "util/string_util.h" #include "util/sync_point.h" #include "util/testharness.h" @@ -74,6 +74,12 @@ class DeleteSchedulerTest : public testing::Test { return file_path; } + void NewDeleteScheduler() { + ASSERT_OK(env_->CreateDirIfMissing(trash_dir_)); + delete_scheduler_.reset(new DeleteScheduler( + env_, trash_dir_, rate_bytes_per_sec_, nullptr, nullptr)); + } + Env* env_; std::string dummy_files_dir_; std::string trash_dir_; @@ -84,19 +90,19 @@ class DeleteSchedulerTest : public testing::Test { // Test the basic functionality of DeleteScheduler (Rate Limiting). // 1- Create 100 dummy files // 2- Delete the 100 dummy files using DeleteScheduler -// --- Hold DeleteSchedulerImpl::BackgroundEmptyTrash --- +// --- Hold DeleteScheduler::BackgroundEmptyTrash --- // 3- Wait for DeleteScheduler to delete all files in trash // 4- Verify that BackgroundEmptyTrash used to correct penlties for the files // 5- Make sure that all created files were completely deleted TEST_F(DeleteSchedulerTest, BasicRateLimiting) { rocksdb::SyncPoint::GetInstance()->LoadDependency({ {"DeleteSchedulerTest::BasicRateLimiting:1", - "DeleteSchedulerImpl::BackgroundEmptyTrash"}, + "DeleteScheduler::BackgroundEmptyTrash"}, }); std::vector penalties; rocksdb::SyncPoint::GetInstance()->SetCallBack( - "DeleteSchedulerImpl::BackgroundEmptyTrash:Wait", + "DeleteScheduler::BackgroundEmptyTrash:Wait", [&](void* arg) { penalties.push_back(*(static_cast(arg))); }); int num_files = 100; // 100 files @@ -110,8 +116,7 @@ TEST_F(DeleteSchedulerTest, BasicRateLimiting) { DestroyAndCreateDir(dummy_files_dir_); rate_bytes_per_sec_ = delete_kbs_per_sec[t] * 1024; - delete_scheduler_.reset( - NewDeleteScheduler(env_, trash_dir_, rate_bytes_per_sec_)); + NewDeleteScheduler(); // Create 100 dummy files, every file is 1 Kb std::vector generated_files; @@ -152,19 +157,19 @@ TEST_F(DeleteSchedulerTest, BasicRateLimiting) { // Same as the BasicRateLimiting test but delete files in multiple threads. // 1- Create 100 dummy files // 2- Delete the 100 dummy files using DeleteScheduler using 10 threads -// --- Hold DeleteSchedulerImpl::BackgroundEmptyTrash --- +// --- Hold DeleteScheduler::BackgroundEmptyTrash --- // 3- Wait for DeleteScheduler to delete all files in queue // 4- Verify that BackgroundEmptyTrash used to correct penlties for the files // 5- Make sure that all created files were completely deleted TEST_F(DeleteSchedulerTest, RateLimitingMultiThreaded) { rocksdb::SyncPoint::GetInstance()->LoadDependency({ {"DeleteSchedulerTest::RateLimitingMultiThreaded:1", - "DeleteSchedulerImpl::BackgroundEmptyTrash"}, + "DeleteScheduler::BackgroundEmptyTrash"}, }); std::vector penalties; rocksdb::SyncPoint::GetInstance()->SetCallBack( - "DeleteSchedulerImpl::BackgroundEmptyTrash:Wait", + "DeleteScheduler::BackgroundEmptyTrash:Wait", [&](void* arg) { penalties.push_back(*(static_cast(arg))); }); int thread_cnt = 10; @@ -179,8 +184,7 @@ TEST_F(DeleteSchedulerTest, RateLimitingMultiThreaded) { DestroyAndCreateDir(dummy_files_dir_); rate_bytes_per_sec_ = delete_kbs_per_sec[t] * 1024; - delete_scheduler_.reset( - NewDeleteScheduler(env_, trash_dir_, rate_bytes_per_sec_)); + NewDeleteScheduler(); // Create 100 dummy files, every file is 1 Kb std::vector generated_files; @@ -239,12 +243,13 @@ TEST_F(DeleteSchedulerTest, RateLimitingMultiThreaded) { TEST_F(DeleteSchedulerTest, DisableRateLimiting) { int bg_delete_file = 0; rocksdb::SyncPoint::GetInstance()->SetCallBack( - "DeleteSchedulerImpl::DeleteTrashFile:DeleteFile", + "DeleteScheduler::DeleteTrashFile:DeleteFile", [&](void* arg) { bg_delete_file++; }); rocksdb::SyncPoint::GetInstance()->EnableProcessing(); - delete_scheduler_.reset(NewDeleteScheduler(env_, "", 0)); + rate_bytes_per_sec_ = 0; + NewDeleteScheduler(); for (int i = 0; i < 10; i++) { // Every file we delete will be deleted immediately @@ -264,18 +269,17 @@ TEST_F(DeleteSchedulerTest, DisableRateLimiting) { // 1- Create 10 files with the same name "conflict.data" // 2- Delete the 10 files using DeleteScheduler // 3- Make sure that trash directory contain 10 files ("conflict.data" x 10) -// --- Hold DeleteSchedulerImpl::BackgroundEmptyTrash --- +// --- Hold DeleteScheduler::BackgroundEmptyTrash --- // 4- Make sure that files are deleted from trash TEST_F(DeleteSchedulerTest, ConflictNames) { rocksdb::SyncPoint::GetInstance()->LoadDependency({ {"DeleteSchedulerTest::ConflictNames:1", - "DeleteSchedulerImpl::BackgroundEmptyTrash"}, + "DeleteScheduler::BackgroundEmptyTrash"}, }); rocksdb::SyncPoint::GetInstance()->EnableProcessing(); rate_bytes_per_sec_ = 1024 * 1024; // 1 Mb/sec - delete_scheduler_.reset( - NewDeleteScheduler(env_, trash_dir_, rate_bytes_per_sec_)); + NewDeleteScheduler(); // Create "conflict.data" and move it to trash 10 times for (int i = 0; i < 10; i++) { @@ -300,19 +304,18 @@ TEST_F(DeleteSchedulerTest, ConflictNames) { // 1- Create 10 dummy files // 2- Delete the 10 files using DeleteScheduler (move them to trsah) // 3- Delete the 10 files directly (using env_->DeleteFile) -// --- Hold DeleteSchedulerImpl::BackgroundEmptyTrash --- +// --- Hold DeleteScheduler::BackgroundEmptyTrash --- // 4- Make sure that DeleteScheduler failed to delete the 10 files and // reported 10 background errors TEST_F(DeleteSchedulerTest, BackgroundError) { rocksdb::SyncPoint::GetInstance()->LoadDependency({ {"DeleteSchedulerTest::BackgroundError:1", - "DeleteSchedulerImpl::BackgroundEmptyTrash"}, + "DeleteScheduler::BackgroundEmptyTrash"}, }); rocksdb::SyncPoint::GetInstance()->EnableProcessing(); rate_bytes_per_sec_ = 1024 * 1024; // 1 Mb/sec - delete_scheduler_.reset( - NewDeleteScheduler(env_, trash_dir_, rate_bytes_per_sec_)); + NewDeleteScheduler(); // Generate 10 dummy files and move them to trash for (int i = 0; i < 10; i++) { @@ -339,32 +342,6 @@ TEST_F(DeleteSchedulerTest, BackgroundError) { rocksdb::SyncPoint::GetInstance()->DisableProcessing(); } -// 1- Create 10 files in trash -// 2- Create a DeleteScheduler with delete_exisitng_trash = true -// 3- Wait for DeleteScheduler to delete all files in queue -// 4- Make sure that all files in trash directory were deleted -TEST_F(DeleteSchedulerTest, TrashWithExistingFiles) { - std::vector dummy_files; - for (int i = 0; i < 10; i++) { - std::string file_name = "data_" + ToString(i) + ".data"; - std::string trash_path = trash_dir_ + "/" + file_name; - env_->RenameFile(NewDummyFile(file_name), trash_path); - } - ASSERT_EQ(CountFilesInDir(trash_dir_), 10); - - Status s; - rate_bytes_per_sec_ = 1024 * 1024; // 1 Mb/sec - delete_scheduler_.reset(NewDeleteScheduler( - env_, trash_dir_, rate_bytes_per_sec_, nullptr, true, &s)); - ASSERT_OK(s); - - delete_scheduler_->WaitForEmptyTrash(); - ASSERT_EQ(CountFilesInDir(trash_dir_), 0); - - auto bg_errors = delete_scheduler_->GetBackgroundErrors(); - ASSERT_EQ(bg_errors.size(), 0); -} - // 1- Create 10 dummy files // 2- Delete 10 dummy files using DeleteScheduler // 3- Wait for DeleteScheduler to delete all files in queue @@ -373,13 +350,12 @@ TEST_F(DeleteSchedulerTest, TrashWithExistingFiles) { TEST_F(DeleteSchedulerTest, StartBGEmptyTrashMultipleTimes) { int bg_delete_file = 0; rocksdb::SyncPoint::GetInstance()->SetCallBack( - "DeleteSchedulerImpl::DeleteTrashFile:DeleteFile", + "DeleteScheduler::DeleteTrashFile:DeleteFile", [&](void* arg) { bg_delete_file++; }); rocksdb::SyncPoint::GetInstance()->EnableProcessing(); rate_bytes_per_sec_ = 1024 * 1024; // 1 MB / sec - delete_scheduler_.reset( - NewDeleteScheduler(env_, trash_dir_, rate_bytes_per_sec_)); + NewDeleteScheduler(); // Move files to trash, wait for empty trash, start again for (int run = 1; run <= 5; run++) { @@ -409,13 +385,12 @@ TEST_F(DeleteSchedulerTest, StartBGEmptyTrashMultipleTimes) { TEST_F(DeleteSchedulerTest, DestructorWithNonEmptyQueue) { int bg_delete_file = 0; rocksdb::SyncPoint::GetInstance()->SetCallBack( - "DeleteSchedulerImpl::DeleteTrashFile:DeleteFile", + "DeleteScheduler::DeleteTrashFile:DeleteFile", [&](void* arg) { bg_delete_file++; }); rocksdb::SyncPoint::GetInstance()->EnableProcessing(); rate_bytes_per_sec_ = 1; // 1 Byte / sec - delete_scheduler_.reset( - NewDeleteScheduler(env_, trash_dir_, rate_bytes_per_sec_)); + NewDeleteScheduler(); for (int i = 0; i < 100; i++) { std::string file_name = "data_" + ToString(i) + ".data"; @@ -439,13 +414,12 @@ TEST_F(DeleteSchedulerTest, DestructorWithNonEmptyQueue) { TEST_F(DeleteSchedulerTest, MoveToTrashError) { int bg_delete_file = 0; rocksdb::SyncPoint::GetInstance()->SetCallBack( - "DeleteSchedulerImpl::DeleteTrashFile:DeleteFile", + "DeleteScheduler::DeleteTrashFile:DeleteFile", [&](void* arg) { bg_delete_file++; }); rocksdb::SyncPoint::GetInstance()->EnableProcessing(); rate_bytes_per_sec_ = 1024; // 1 Kb / sec - delete_scheduler_.reset( - NewDeleteScheduler(env_, trash_dir_, rate_bytes_per_sec_)); + NewDeleteScheduler(); // We will delete the trash directory, that mean that DeleteScheduler wont // be able to move files to trash and will delete files them immediately. @@ -460,7 +434,6 @@ TEST_F(DeleteSchedulerTest, MoveToTrashError) { rocksdb::SyncPoint::GetInstance()->DisableProcessing(); } - } // namespace rocksdb int main(int argc, char** argv) { diff --git a/util/file_util.cc b/util/file_util.cc index 55eeab722..1e2e84211 100644 --- a/util/file_util.cc +++ b/util/file_util.cc @@ -8,9 +8,9 @@ #include #include -#include "rocksdb/delete_scheduler.h" #include "rocksdb/env.h" #include "rocksdb/options.h" +#include "util/sst_file_manager_impl.h" #include "util/file_reader_writer.h" namespace rocksdb { @@ -66,12 +66,15 @@ Status CopyFile(Env* env, const std::string& source, return Status::OK(); } -Status DeleteOrMoveToTrash(const DBOptions* db_options, - const std::string& fname) { - if (db_options->delete_scheduler == nullptr) { - return db_options->env->DeleteFile(fname); +Status DeleteSSTFile(const DBOptions* db_options, const std::string& fname, + uint32_t path_id) { + // TODO(tec): support sst_file_manager for multiple path_ids + auto sfm = + static_cast(db_options->sst_file_manager.get()); + if (sfm && path_id == 0) { + return sfm->ScheduleFileDeletion(fname); } else { - return db_options->delete_scheduler->DeleteFile(fname); + return db_options->env->DeleteFile(fname); } } diff --git a/util/file_util.h b/util/file_util.h index f3e02fb0b..f19dc6f0b 100644 --- a/util/file_util.h +++ b/util/file_util.h @@ -16,7 +16,7 @@ namespace rocksdb { extern Status CopyFile(Env* env, const std::string& source, const std::string& destination, uint64_t size = 0); -extern Status DeleteOrMoveToTrash(const DBOptions* db_options, - const std::string& fname); +extern Status DeleteSSTFile(const DBOptions* db_options, + const std::string& fname, uint32_t path_id); } // namespace rocksdb diff --git a/util/options.cc b/util/options.cc index 13fee9051..d21d2a24b 100644 --- a/util/options.cc +++ b/util/options.cc @@ -20,8 +20,8 @@ #include "rocksdb/cache.h" #include "rocksdb/compaction_filter.h" #include "rocksdb/comparator.h" -#include "rocksdb/delete_scheduler.h" #include "rocksdb/env.h" +#include "rocksdb/sst_file_manager.h" #include "rocksdb/memtablerep.h" #include "rocksdb/merge_operator.h" #include "rocksdb/slice.h" @@ -213,7 +213,7 @@ DBOptions::DBOptions() paranoid_checks(true), env(Env::Default()), rate_limiter(nullptr), - delete_scheduler(nullptr), + sst_file_manager(nullptr), info_log(nullptr), #ifdef NDEBUG info_log_level(INFO_LEVEL), @@ -281,7 +281,7 @@ DBOptions::DBOptions(const Options& options) paranoid_checks(options.paranoid_checks), env(options.env), rate_limiter(options.rate_limiter), - delete_scheduler(options.delete_scheduler), + sst_file_manager(options.sst_file_manager), info_log(options.info_log), info_log_level(options.info_log_level), max_open_files(options.max_open_files), @@ -433,8 +433,9 @@ void DBOptions::Dump(Logger* log) const { use_adaptive_mutex); Header(log, " Options.rate_limiter: %p", rate_limiter.get()); - Header(log, " Options.delete_scheduler.rate_bytes_per_sec: %" PRIi64, - delete_scheduler ? delete_scheduler->GetRateBytesPerSecond() : 0); + Header( + log, " Options.sst_file_manager.rate_bytes_per_sec: %" PRIi64, + sst_file_manager ? sst_file_manager->GetDeleteRateBytesPerSecond() : 0); Header(log, " Options.bytes_per_sync: %" PRIu64, bytes_per_sync); Header(log, " Options.wal_bytes_per_sync: %" PRIu64, diff --git a/util/options_test.cc b/util/options_test.cc index 09ecbea03..65c45c2b0 100644 --- a/util/options_test.cc +++ b/util/options_test.cc @@ -1609,8 +1609,8 @@ TEST_F(OptionsParserTest, DBOptionsAllFieldsSettable) { {offsetof(struct DBOptions, env), sizeof(Env*)}, {offsetof(struct DBOptions, rate_limiter), sizeof(std::shared_ptr)}, - {offsetof(struct DBOptions, delete_scheduler), - sizeof(std::shared_ptr)}, + {offsetof(struct DBOptions, sst_file_manager), + sizeof(std::shared_ptr)}, {offsetof(struct DBOptions, info_log), sizeof(std::shared_ptr)}, {offsetof(struct DBOptions, statistics), sizeof(std::shared_ptr)}, diff --git a/util/sst_file_manager_impl.cc b/util/sst_file_manager_impl.cc new file mode 100644 index 000000000..f4cc82e5d --- /dev/null +++ b/util/sst_file_manager_impl.cc @@ -0,0 +1,143 @@ +// Copyright (c) 2015, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#include "util/sst_file_manager_impl.h" + +#include + +#include "port/port.h" +#include "rocksdb/env.h" +#include "util/mutexlock.h" +#include "util/sync_point.h" + +namespace rocksdb { + +SstFileManagerImpl::SstFileManagerImpl(Env* env, std::shared_ptr logger, + const std::string& trash_dir, + int64_t rate_bytes_per_sec) + : env_(env), + logger_(logger), + total_files_size_(0), + delete_scheduler_(env, trash_dir, rate_bytes_per_sec, logger.get(), + this) {} + +SstFileManagerImpl::~SstFileManagerImpl() {} + +Status SstFileManagerImpl::OnAddFile(const std::string& file_path) { + uint64_t file_size; + Status s = env_->GetFileSize(file_path, &file_size); + if (s.ok()) { + MutexLock l(&mu_); + OnAddFileImpl(file_path, file_size); + } + TEST_SYNC_POINT("SstFileManagerImpl::OnAddFile"); + return s; +} + +Status SstFileManagerImpl::OnDeleteFile(const std::string& file_path) { + { + MutexLock l(&mu_); + OnDeleteFileImpl(file_path); + } + TEST_SYNC_POINT("SstFileManagerImpl::OnDeleteFile"); + return Status::OK(); +} + +Status SstFileManagerImpl::OnMoveFile(const std::string& old_path, + const std::string& new_path) { + { + MutexLock l(&mu_); + OnAddFileImpl(new_path, tracked_files_[old_path]); + OnDeleteFileImpl(old_path); + } + TEST_SYNC_POINT("SstFileManagerImpl::OnMoveFile"); + return Status::OK(); +} + +uint64_t SstFileManagerImpl::GetTotalSize() { + MutexLock l(&mu_); + return total_files_size_; +} + +std::unordered_map +SstFileManagerImpl::GetTrackedFiles() { + MutexLock l(&mu_); + return tracked_files_; +} + +int64_t SstFileManagerImpl::GetDeleteRateBytesPerSecond() { + return delete_scheduler_.GetRateBytesPerSecond(); +} + +Status SstFileManagerImpl::ScheduleFileDeletion(const std::string& file_path) { + return delete_scheduler_.DeleteFile(file_path); +} + +void SstFileManagerImpl::WaitForEmptyTrash() { + delete_scheduler_.WaitForEmptyTrash(); +} + +void SstFileManagerImpl::OnAddFileImpl(const std::string& file_path, + uint64_t file_size) { + auto tracked_file = tracked_files_.find(file_path); + if (tracked_file != tracked_files_.end()) { + // File was added before, we will just update the size + total_files_size_ -= tracked_file->second; + total_files_size_ += file_size; + } else { + total_files_size_ += file_size; + } + tracked_files_[file_path] = file_size; +} + +void SstFileManagerImpl::OnDeleteFileImpl(const std::string& file_path) { + auto tracked_file = tracked_files_.find(file_path); + if (tracked_file == tracked_files_.end()) { + // File is not tracked + return; + } + + total_files_size_ -= tracked_file->second; + tracked_files_.erase(tracked_file); +} + +SstFileManager* NewSstFileManager(Env* env, std::shared_ptr info_log, + std::string trash_dir, + int64_t rate_bytes_per_sec, + bool delete_exisitng_trash, Status* status) { + SstFileManagerImpl* res = + new SstFileManagerImpl(env, info_log, trash_dir, rate_bytes_per_sec); + + Status s; + if (trash_dir != "" && rate_bytes_per_sec > 0) { + s = env->CreateDirIfMissing(trash_dir); + if (s.ok() && delete_exisitng_trash) { + std::vector files_in_trash; + s = env->GetChildren(trash_dir, &files_in_trash); + if (s.ok()) { + for (const std::string& trash_file : files_in_trash) { + if (trash_file == "." || trash_file == "..") { + continue; + } + + std::string path_in_trash = trash_dir + "/" + trash_file; + res->OnAddFile(path_in_trash); + Status file_delete = res->ScheduleFileDeletion(path_in_trash); + if (s.ok() && !file_delete.ok()) { + s = file_delete; + } + } + } + } + } + + if (status) { + *status = s; + } + + return res; +} + +} // namespace rocksdb diff --git a/util/sst_file_manager_impl.h b/util/sst_file_manager_impl.h new file mode 100644 index 000000000..216f7cf05 --- /dev/null +++ b/util/sst_file_manager_impl.h @@ -0,0 +1,77 @@ +// Copyright (c) 2015, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#pragma once + +#include + +#include "port/port.h" + +#include "rocksdb/sst_file_manager.h" +#include "util/delete_scheduler.h" + +namespace rocksdb { + +class Env; +class Logger; + +// SstFileManager is used to track SST files in the DB and control there +// deletion rate. +// All SstFileManager public functions are thread-safe. +class SstFileManagerImpl : public SstFileManager { + public: + explicit SstFileManagerImpl(Env* env, std::shared_ptr logger, + const std::string& trash_dir, + int64_t rate_bytes_per_sec); + + ~SstFileManagerImpl(); + + // DB will call OnAddFile whenever a new sst file is added. + Status OnAddFile(const std::string& file_path); + + // DB will call OnDeleteFile whenever an sst file is deleted. + Status OnDeleteFile(const std::string& file_path); + + // DB will call OnMoveFile whenever an sst file is move to a new path. + Status OnMoveFile(const std::string& old_path, const std::string& new_path); + + // Return the total size of all tracked files. + uint64_t GetTotalSize() override; + + // Return a map containing all tracked files and there corresponding sizes. + std::unordered_map GetTrackedFiles() override; + + // Return delete rate limit in bytes per second. + virtual int64_t GetDeleteRateBytesPerSecond() override; + + // Move file to trash directory and schedule it's deletion. + virtual Status ScheduleFileDeletion(const std::string& file_path); + + // Wait for all files being deleteing in the background to finish or for + // destructor to be called. + virtual void WaitForEmptyTrash(); + + private: + // REQUIRES: mutex locked + void OnAddFileImpl(const std::string& file_path, uint64_t file_size); + // REQUIRES: mutex locked + void OnDeleteFileImpl(const std::string& file_path); + + Env* env_; + std::shared_ptr logger_; + // Mutex to protect tracked_files_, total_files_size_ + port::Mutex mu_; + // The summation of the sizes of all files in tracked_files_ map + uint64_t total_files_size_; + // A map containing all tracked files and there sizes + // file_path => file_size + std::unordered_map tracked_files_; + // DeleteScheduler used to throttle file deletition, if SstFileManagerImpl was + // created with rate_bytes_per_sec == 0 or trash_dir == "", delete_scheduler_ + // rate limiting will be disabled and will simply delete the files. + DeleteScheduler delete_scheduler_; +}; + +} // namespace rocksdb