From c4e19b77e896e1100032d2f3bd46885b93d6170f Mon Sep 17 00:00:00 2001 From: omegaga Date: Tue, 21 Jun 2016 18:41:23 -0700 Subject: [PATCH] Add a read option to enable background purge when cleaning up iterators Summary: Add a read option `background_purge_on_iterator_cleanup` to avoid deleting files in foreground when destroying iterators. Instead, a job is scheduled in high priority queue and would be executed in a separate background thread. Test Plan: Add a variant of PurgeObsoleteFileTest. Turn on background purge option in the new test, and use sleeping task to ensure files are deleted in background. Reviewers: IslamAbdelRahman, sdong Reviewed By: IslamAbdelRahman Subscribers: andrewkr, dhruba Differential Revision: https://reviews.facebook.net/D59499 --- HISTORY.md | 1 + db/db_impl.cc | 146 +++++++++++++++++++++++++++++--------- db/db_impl.h | 40 +++++++++-- db/deletefile_test.cc | 99 ++++++++++++++++++++++++-- include/rocksdb/options.h | 6 ++ util/options.cc | 2 + 6 files changed, 251 insertions(+), 43 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index f2afdd959..0ced2e137 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -7,6 +7,7 @@ ### New Features * Add avoid_flush_during_recovery option. +* Add a read option background_purge_on_iterator_cleanup to avoid deleting files in foreground when destroying iterators. Instead, a job is scheduled in high priority queue and would be executed in a separate background thread. ## 4.9.0 (6/9/2016) ### Public API changes diff --git a/db/db_impl.cc b/db/db_impl.cc index 35e39ee17..8ab9777b4 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -328,6 +328,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname) num_running_compactions_(0), bg_flush_scheduled_(0), num_running_flushes_(0), + bg_purge_scheduled_(0), disable_delete_obsolete_files_(0), delete_obsolete_files_next_run_( options.env->NowMicros() + @@ -407,7 +408,9 @@ DBImpl::~DBImpl() { bg_flush_scheduled_ -= flushes_unscheduled; // Wait for background work to finish - while (bg_compaction_scheduled_ || bg_flush_scheduled_) { + while (bg_compaction_scheduled_ || bg_flush_scheduled_ || + bg_purge_scheduled_) { + TEST_SYNC_POINT("DBImpl::~DBImpl:WaitJob"); bg_cv_.Wait(); } EraseThreadStatusDbInfo(); @@ -880,11 +883,42 @@ bool CompareCandidateFile(const JobContext::CandidateFileInfo& first, } }; // namespace +// Delete obsolete files and log status and information of file deletion +void DBImpl::DeleteObsoleteFileImpl(Status file_deletion_status, int job_id, + const std::string& fname, FileType type, + uint64_t number, uint32_t path_id) { + if (type == kTableFile) { + file_deletion_status = DeleteSSTFile(&db_options_, fname, path_id); + } else { + file_deletion_status = env_->DeleteFile(fname); + } + if (file_deletion_status.ok()) { + Log(InfoLogLevel::DEBUG_LEVEL, db_options_.info_log, + "[JOB %d] Delete %s type=%d #%" PRIu64 " -- %s\n", job_id, + fname.c_str(), type, number, file_deletion_status.ToString().c_str()); + } else if (env_->FileExists(fname).IsNotFound()) { + Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log, + "[JOB %d] Tried to delete a non-existing file %s type=%d #%" PRIu64 + " -- %s\n", + job_id, fname.c_str(), type, number, + file_deletion_status.ToString().c_str()); + } else { + Log(InfoLogLevel::ERROR_LEVEL, db_options_.info_log, + "[JOB %d] Failed to delete %s type=%d #%" PRIu64 " -- %s\n", job_id, + fname.c_str(), type, number, file_deletion_status.ToString().c_str()); + } + if (type == kTableFile) { + EventHelpers::LogAndNotifyTableFileDeletion( + &event_logger_, job_id, number, fname, file_deletion_status, GetName(), + db_options_.listeners); + } +} + // Diffs the files listed in filenames and those that do not // belong to live files are posibly removed. Also, removes all the // files in sst_delete_files and log_delete_files. // It is not necessary to hold the mutex when invoking this method. -void DBImpl::PurgeObsoleteFiles(const JobContext& state) { +void DBImpl::PurgeObsoleteFiles(const JobContext& state, bool schedule_only) { // we'd better have sth to delete assert(state.HaveSomethingToDelete()); @@ -1012,33 +1046,12 @@ void DBImpl::PurgeObsoleteFiles(const JobContext& state) { } #endif // !ROCKSDB_LITE Status file_deletion_status; - if (type == kTableFile) { - file_deletion_status = DeleteSSTFile(&db_options_, fname, path_id); + if (schedule_only) { + InstrumentedMutexLock guard_lock(&mutex_); + SchedulePendingPurge(fname, type, number, path_id, state.job_id); } else { - file_deletion_status = env_->DeleteFile(fname); - } - if (file_deletion_status.ok()) { - Log(InfoLogLevel::DEBUG_LEVEL, db_options_.info_log, - "[JOB %d] Delete %s type=%d #%" PRIu64 " -- %s\n", state.job_id, - fname.c_str(), type, number, - file_deletion_status.ToString().c_str()); - } else if (env_->FileExists(fname).IsNotFound()) { - Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log, - "[JOB %d] Tried to delete a non-existing file %s type=%d #%" PRIu64 - " -- %s\n", - state.job_id, fname.c_str(), type, number, - file_deletion_status.ToString().c_str()); - } else { - Log(InfoLogLevel::ERROR_LEVEL, db_options_.info_log, - "[JOB %d] Failed to delete %s type=%d #%" PRIu64 " -- %s\n", - state.job_id, fname.c_str(), type, number, - file_deletion_status.ToString().c_str()); - } - if (type == kTableFile) { - EventHelpers::LogAndNotifyTableFileDeletion( - &event_logger_, state.job_id, number, fname, - file_deletion_status, GetName(), - db_options_.listeners); + DeleteObsoleteFileImpl(file_deletion_status, state.job_id, fname, type, + number, path_id); } } @@ -2800,6 +2813,15 @@ void DBImpl::MaybeScheduleFlushOrCompaction() { } } +void DBImpl::SchedulePurge() { + mutex_.AssertHeld(); + assert(opened_successfully_); + + // Purge operations are put into High priority queue + bg_purge_scheduled_++; + env_->Schedule(&DBImpl::BGWorkPurge, this, Env::Priority::HIGH, nullptr); +} + int DBImpl::BGCompactionsAllowed() const { if (write_controller_.NeedSpeedupCompaction()) { return db_options_.max_background_compactions; @@ -2854,6 +2876,14 @@ void DBImpl::SchedulePendingCompaction(ColumnFamilyData* cfd) { } } +void DBImpl::SchedulePendingPurge(std::string fname, FileType type, + uint64_t number, uint32_t path_id, + int job_id) { + mutex_.AssertHeld(); + PurgeFileInfo file_info(fname, type, number, path_id, job_id); + purge_queue_.push_back(std::move(file_info)); +} + void DBImpl::BGWorkFlush(void* db) { IOSTATS_SET_THREAD_POOL_ID(Env::Priority::HIGH); TEST_SYNC_POINT("DBImpl::BGWorkFlush"); @@ -2869,6 +2899,12 @@ void DBImpl::BGWorkCompaction(void* arg) { reinterpret_cast(ca.db)->BackgroundCallCompaction(ca.m); } +void DBImpl::BGWorkPurge(void* db) { + IOSTATS_SET_THREAD_POOL_ID(Env::Priority::HIGH); + TEST_SYNC_POINT("DBImpl::BGWorkPurge"); + reinterpret_cast(db)->BackgroundCallPurge(); +} + void DBImpl::UnscheduleCallback(void* arg) { CompactionArg ca = *(reinterpret_cast(arg)); delete reinterpret_cast(arg); @@ -2878,6 +2914,34 @@ void DBImpl::UnscheduleCallback(void* arg) { TEST_SYNC_POINT("DBImpl::UnscheduleCallback"); } +void DBImpl::BackgroundCallPurge() { + mutex_.Lock(); + + while (!purge_queue_.empty()) { + auto purge_file = purge_queue_.begin(); + auto fname = purge_file->fname; + auto type = purge_file->type; + auto number = purge_file->number; + auto path_id = purge_file->path_id; + auto job_id = purge_file->job_id; + purge_queue_.pop_front(); + + mutex_.Unlock(); + Status file_deletion_status; + DeleteObsoleteFileImpl(file_deletion_status, job_id, fname, type, number, + path_id); + mutex_.Lock(); + } + bg_purge_scheduled_--; + + bg_cv_.SignalAll(); + // IMPORTANT:there should be no code after calling SignalAll. This call may + // signal the DB destructor that it's OK to proceed with destruction. In + // that case, all DB variables will be dealloacated and referencing them + // will cause trouble. + mutex_.Unlock(); +} + Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context, LogBuffer* log_buffer) { mutex_.AssertHeld(); @@ -3477,12 +3541,17 @@ bool DBImpl::MCOverlap(ManualCompaction* m, ManualCompaction* m1) { namespace { struct IterState { - IterState(DBImpl* _db, InstrumentedMutex* _mu, SuperVersion* _super_version) - : db(_db), mu(_mu), super_version(_super_version) {} + IterState(DBImpl* _db, InstrumentedMutex* _mu, SuperVersion* _super_version, + const ReadOptions* _read_options) + : db(_db), + mu(_mu), + super_version(_super_version), + read_options(_read_options) {} DBImpl* db; InstrumentedMutex* mu; SuperVersion* super_version; + const ReadOptions* read_options; }; static void CleanupIteratorState(void* arg1, void* arg2) { @@ -3492,6 +3561,8 @@ static void CleanupIteratorState(void* arg1, void* arg2) { // Job id == 0 means that this is not our background process, but rather // user thread JobContext job_context(0); + bool background_purge = + state->read_options->background_purge_on_iterator_cleanup; state->mu->Lock(); state->super_version->Cleanup(); @@ -3500,7 +3571,17 @@ static void CleanupIteratorState(void* arg1, void* arg2) { delete state->super_version; if (job_context.HaveSomethingToDelete()) { - state->db->PurgeObsoleteFiles(job_context); + if (background_purge) { + // PurgeObsoleteFiles here does not delete files. Instead, it adds the + // files to be deleted to a job queue, and deletes it in a separate + // background thread. + state->db->PurgeObsoleteFiles(job_context, true /* schedule only */); + state->mu->Lock(); + state->db->SchedulePurge(); + state->mu->Unlock(); + } else { + state->db->PurgeObsoleteFiles(job_context); + } } job_context.Clean(); } @@ -3526,7 +3607,8 @@ InternalIterator* DBImpl::NewInternalIterator(const ReadOptions& read_options, super_version->current->AddIterators(read_options, env_options_, &merge_iter_builder); internal_iter = merge_iter_builder.Finish(); - IterState* cleanup = new IterState(this, &mutex_, super_version); + IterState* cleanup = + new IterState(this, &mutex_, super_version, &read_options); internal_iter->RegisterCleanup(CleanupIteratorState, cleanup, nullptr); return internal_iter; diff --git a/db/db_impl.h b/db/db_impl.h index 3c432582e..6a39ba5fe 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -374,7 +374,10 @@ class DBImpl : public DB { // belong to live files are posibly removed. Also, removes all the // files in sst_delete_files and log_delete_files. // It is not necessary to hold the mutex when invoking this method. - void PurgeObsoleteFiles(const JobContext& background_contet); + void PurgeObsoleteFiles(const JobContext& background_contet, + bool schedule_only = false); + + void SchedulePurge(); ColumnFamilyHandle* DefaultColumnFamily() const override; @@ -554,6 +557,8 @@ class DBImpl : public DB { struct WriteContext; + struct PurgeFileInfo; + Status NewDB(); // Recover the descriptor from persistent storage. May do a significant @@ -569,6 +574,10 @@ class DBImpl : public DB { // Delete any unneeded files and stale in-memory entries. void DeleteObsoleteFiles(); + // Delete obsolete files and log status and information of file deletion + void DeleteObsoleteFileImpl(Status file_deletion_status, int job_id, + const std::string& fname, FileType type, + uint64_t number, uint32_t path_id); // Background process needs to call // auto x = CaptureCurrentFileNumberInPendingOutputs() @@ -640,11 +649,15 @@ class DBImpl : public DB { void MaybeScheduleFlushOrCompaction(); void SchedulePendingFlush(ColumnFamilyData* cfd); void SchedulePendingCompaction(ColumnFamilyData* cfd); + void SchedulePendingPurge(std::string fname, FileType type, uint64_t number, + uint32_t path_id, int job_id); static void BGWorkCompaction(void* arg); static void BGWorkFlush(void* db); + static void BGWorkPurge(void* arg); static void UnscheduleCallback(void* arg); void BackgroundCallCompaction(void* arg); void BackgroundCallFlush(); + void BackgroundCallPurge(); Status BackgroundCompaction(bool* madeProgress, JobContext* job_context, LogBuffer* log_buffer, void* m = 0); Status BackgroundFlush(bool* madeProgress, JobContext* job_context, @@ -695,9 +708,9 @@ class DBImpl : public DB { // * if AnyManualCompaction, whenever a compaction finishes, even if it hasn't // made any progress // * whenever a compaction made any progress - // * whenever bg_flush_scheduled_ value decreases (i.e. whenever a flush is - // done, even if it didn't make any progress) - // * whenever there is an error in background flush or compaction + // * whenever bg_flush_scheduled_ or bg_purge_scheduled_ value decreases + // (i.e. whenever a flush is done, even if it didn't make any progress) + // * whenever there is an error in background purge, flush or compaction InstrumentedCondVar bg_cv_; uint64_t logfile_number_; std::deque @@ -817,6 +830,19 @@ class DBImpl : public DB { // State is protected with db mutex. std::list pending_outputs_; + // PurgeFileInfo is a structure to hold information of files to be deleted in + // purge_queue_ + struct PurgeFileInfo { + std::string fname; + FileType type; + uint64_t number; + uint32_t path_id; + int job_id; + PurgeFileInfo(std::string fn, FileType t, uint64_t num, uint32_t pid, + int jid) + : fname(fn), type(t), number(num), path_id(pid), job_id(jid) {} + }; + // flush_queue_ and compaction_queue_ hold column families that we need to // flush and compact, respectively. // A column family is inserted into flush_queue_ when it satisfies condition @@ -841,6 +867,9 @@ class DBImpl : public DB { // invariant(column family present in compaction_queue_ <==> // ColumnFamilyData::pending_compaction_ == true) std::deque compaction_queue_; + + // A queue to store filenames of the files to be purged + std::deque purge_queue_; int unscheduled_flushes_; int unscheduled_compactions_; @@ -856,6 +885,9 @@ class DBImpl : public DB { // stores the number of flushes are currently running int num_running_flushes_; + // number of background obsolete file purge jobs, submitted to the HIGH pool + int bg_purge_scheduled_; + // Information for a manual compaction struct ManualCompaction { ColumnFamilyData* cfd; diff --git a/db/deletefile_test.cc b/db/deletefile_test.cc index 57fafa5e7..cd284ee72 100644 --- a/db/deletefile_test.cc +++ b/db/deletefile_test.cc @@ -9,20 +9,21 @@ #ifndef ROCKSDB_LITE -#include "rocksdb/db.h" +#include +#include +#include +#include #include "db/db_impl.h" #include "db/filename.h" #include "db/version_set.h" #include "db/write_batch_internal.h" +#include "rocksdb/db.h" +#include "rocksdb/env.h" +#include "rocksdb/transaction_log.h" #include "util/string_util.h" +#include "util/sync_point.h" #include "util/testharness.h" #include "util/testutil.h" -#include "rocksdb/env.h" -#include "rocksdb/transaction_log.h" -#include -#include -#include -#include namespace rocksdb { @@ -152,6 +153,15 @@ class DeleteFileTest : public testing::Test { ASSERT_EQ(required_manifest, manifest_cnt); } + static void DoSleep(void* arg) { + auto test = reinterpret_cast(arg); + test->env_->SleepForMicroseconds(2 * 1000 * 1000); + } + + // An empty job to guard all jobs are processed + static void GuardFinish(void* arg) { + TEST_SYNC_POINT("DeleteFileTest::GuardFinish"); + } }; TEST_F(DeleteFileTest, AddKeysAndQueryLevels) { @@ -231,6 +241,81 @@ TEST_F(DeleteFileTest, PurgeObsoleteFilesTest) { CloseDB(); } +TEST_F(DeleteFileTest, BackgroundPurgeTest) { + std::string first("0"), last("999999"); + CompactRangeOptions compact_options; + compact_options.change_level = true; + compact_options.target_level = 2; + Slice first_slice(first), last_slice(last); + + // We keep an iterator alive + Iterator* itr = 0; + CreateTwoLevels(); + ReadOptions options; + options.background_purge_on_iterator_cleanup = true; + itr = db_->NewIterator(options); + db_->CompactRange(compact_options, &first_slice, &last_slice); + // 3 sst after compaction with live iterator + CheckFileTypeCounts(dbname_, 0, 3, 1); + test::SleepingBackgroundTask sleeping_task_before; + env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, + &sleeping_task_before, Env::Priority::HIGH); + delete itr; + test::SleepingBackgroundTask sleeping_task_after; + env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, + &sleeping_task_after, Env::Priority::HIGH); + + // Make sure no purges are executed foreground + CheckFileTypeCounts(dbname_, 0, 3, 1); + sleeping_task_before.WakeUp(); + sleeping_task_before.WaitUntilDone(); + + // Make sure all background purges are executed + sleeping_task_after.WakeUp(); + sleeping_task_after.WaitUntilDone(); + // 1 sst after iterator deletion + CheckFileTypeCounts(dbname_, 0, 1, 1); + + CloseDB(); +} + +TEST_F(DeleteFileTest, BackgroundPurgeTestMultipleJobs) { + std::string first("0"), last("999999"); + CompactRangeOptions compact_options; + compact_options.change_level = true; + compact_options.target_level = 2; + Slice first_slice(first), last_slice(last); + + // We keep an iterator alive + CreateTwoLevels(); + ReadOptions options; + options.background_purge_on_iterator_cleanup = true; + Iterator* itr1 = db_->NewIterator(options); + CreateTwoLevels(); + Iterator* itr2 = db_->NewIterator(options); + db_->CompactRange(compact_options, &first_slice, &last_slice); + // 5 sst files after 2 compactions with 2 live iterators + CheckFileTypeCounts(dbname_, 0, 5, 1); + + // ~DBImpl should wait until all BGWorkPurge are finished + rocksdb::SyncPoint::GetInstance()->LoadDependency( + {{"DBImpl::~DBImpl:WaitJob", "DBImpl::BGWorkPurge"}, + {"DeleteFileTest::GuardFinish", + "DeleteFileTest::BackgroundPurgeTestMultipleJobs:DBClose"}}); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + + delete itr1; + env_->Schedule(&DeleteFileTest::DoSleep, this, Env::Priority::HIGH); + delete itr2; + env_->Schedule(&DeleteFileTest::GuardFinish, nullptr, Env::Priority::HIGH); + CloseDB(); + + TEST_SYNC_POINT("DeleteFileTest::BackgroundPurgeTestMultipleJobs:DBClose"); + // 1 sst after iterator deletion + CheckFileTypeCounts(dbname_, 0, 1, 1); + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); +} + TEST_F(DeleteFileTest, DeleteFileWithIterator) { CreateTwoLevels(); ReadOptions options; diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 4dbd585cf..501d99aed 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -1487,6 +1487,12 @@ struct ReadOptions { // Default: false bool pin_data; + // If true, when PurgeObsoleteFile is called in CleanupIteratorState, we + // schedule a background job in the flush job queue and delete obsolete files + // in background. + // Default: false + bool background_purge_on_iterator_cleanup; + // If non-zero, NewIterator will create a new table reader which // performs reads of the given size. Using a large size (> 2MB) can // improve the performance of forward iteration on spinning disks. diff --git a/util/options.cc b/util/options.cc index 50a1af2f9..6bb3be4a9 100644 --- a/util/options.cc +++ b/util/options.cc @@ -811,6 +811,7 @@ ReadOptions::ReadOptions() total_order_seek(false), prefix_same_as_start(false), pin_data(false), + background_purge_on_iterator_cleanup(false), readahead_size(0) { XFUNC_TEST("", "managed_options", managed_options, xf_manage_options, reinterpret_cast(this)); @@ -827,6 +828,7 @@ ReadOptions::ReadOptions(bool cksum, bool cache) total_order_seek(false), prefix_same_as_start(false), pin_data(false), + background_purge_on_iterator_cleanup(false), readahead_size(0) { XFUNC_TEST("", "managed_options", managed_options, xf_manage_options, reinterpret_cast(this));