diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index 710fde5b4..dde80d451 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -1314,32 +1314,28 @@ void DBImpl::SchedulePurge() { void DBImpl::BackgroundCallPurge() { mutex_.Lock(); - // We use one single loop to clear both queues so that after existing the loop - // both queues are empty. This is stricter than what is needed, but can make - // it easier for us to reason the correctness. - while (!purge_queue_.empty() || !logs_to_free_queue_.empty()) { - // Check logs_to_free_queue_ first and close log writers. - if (!logs_to_free_queue_.empty()) { - assert(!logs_to_free_queue_.empty()); - log::Writer* log_writer = *(logs_to_free_queue_.begin()); - logs_to_free_queue_.pop_front(); - mutex_.Unlock(); - delete log_writer; - mutex_.Lock(); - } else { - auto purge_file = purge_queue_.begin(); - auto fname = purge_file->fname; - auto dir_to_sync = purge_file->dir_to_sync; - auto type = purge_file->type; - auto number = purge_file->number; - auto job_id = purge_file->job_id; - purge_queue_.pop_front(); + while (!logs_to_free_queue_.empty()) { + assert(!logs_to_free_queue_.empty()); + log::Writer* log_writer = *(logs_to_free_queue_.begin()); + logs_to_free_queue_.pop_front(); + mutex_.Unlock(); + delete log_writer; + mutex_.Lock(); + } + for (const auto& file : purge_files_) { + const PurgeFileInfo& purge_file = file.second; + const std::string& fname = purge_file.fname; + const std::string& dir_to_sync = purge_file.dir_to_sync; + FileType type = purge_file.type; + uint64_t number = purge_file.number; + int job_id = purge_file.job_id; - mutex_.Unlock(); - DeleteObsoleteFileImpl(job_id, fname, dir_to_sync, type, number); - mutex_.Lock(); - } + mutex_.Unlock(); + DeleteObsoleteFileImpl(job_id, fname, dir_to_sync, type, number); + mutex_.Lock(); } + purge_files_.clear(); + bg_purge_scheduled_--; bg_cv_.SignalAll(); diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index 3552a7cf2..84acc8452 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -347,7 +347,8 @@ class DBImpl : public DB { uint64_t* manifest_file_size, bool flush_memtable = true) override; virtual Status GetSortedWalFiles(VectorLogPtr& files) override; - virtual Status GetCurrentWalFile(std::unique_ptr* current_log_file) override; + virtual Status GetCurrentWalFile( + std::unique_ptr* current_log_file) override; virtual Status GetUpdatesSince( SequenceNumber seq_number, std::unique_ptr* iter, @@ -1784,12 +1785,12 @@ class DBImpl : public DB { // ColumnFamilyData::pending_compaction_ == true) std::deque compaction_queue_; - // A queue to store filenames of the files to be purged - std::deque purge_queue_; + // A map to store file numbers and filenames of the files to be purged + std::unordered_map purge_files_; // A vector to store the file numbers that have been assigned to certain // JobContext. Current implementation tracks ssts only. - std::vector files_grabbed_for_purge_; + std::unordered_set files_grabbed_for_purge_; // A queue to store log writers to close std::deque logs_to_free_queue_; diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index 724d54f09..d8e8a5f8d 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -2090,7 +2090,7 @@ void DBImpl::SchedulePendingPurge(std::string fname, std::string dir_to_sync, FileType type, uint64_t number, int job_id) { mutex_.AssertHeld(); PurgeFileInfo file_info(fname, dir_to_sync, type, number, job_id); - purge_queue_.push_back(std::move(file_info)); + purge_files_.insert({{number, std::move(file_info)}}); } void DBImpl::BGWorkFlush(void* arg) { @@ -3077,34 +3077,20 @@ void DBImpl::InstallSuperVersionAndScheduleWork( } // ShouldPurge is called by FindObsoleteFiles when doing a full scan, -// and db mutex (mutex_) should already be held. This function performs a -// linear scan of an vector (files_grabbed_for_purge_) in search of a -// certain element. We expect FindObsoleteFiles with full scan to occur once -// every 10 hours by default, and the size of the vector is small. -// Therefore, the cost is affordable even if the mutex is held. +// and db mutex (mutex_) should already be held. // Actually, the current implementation of FindObsoleteFiles with // full_scan=true can issue I/O requests to obtain list of files in // directories, e.g. env_->getChildren while holding db mutex. -// In the future, if we want to reduce the cost of search, we may try to keep -// the vector sorted. bool DBImpl::ShouldPurge(uint64_t file_number) const { - for (auto fn : files_grabbed_for_purge_) { - if (file_number == fn) { - return false; - } - } - for (const auto& purge_file_info : purge_queue_) { - if (purge_file_info.number == file_number) { - return false; - } - } - return true; + return files_grabbed_for_purge_.find(file_number) == + files_grabbed_for_purge_.end() && + purge_files_.find(file_number) == purge_files_.end(); } // MarkAsGrabbedForPurge is called by FindObsoleteFiles, and db mutex // (mutex_) should already be held. void DBImpl::MarkAsGrabbedForPurge(uint64_t file_number) { - files_grabbed_for_purge_.emplace_back(file_number); + files_grabbed_for_purge_.insert(file_number); } void DBImpl::SetSnapshotChecker(SnapshotChecker* snapshot_checker) { diff --git a/db/db_impl/db_impl_files.cc b/db/db_impl/db_impl_files.cc index 3c5fd4fcd..670d8e731 100644 --- a/db/db_impl/db_impl_files.cc +++ b/db/db_impl/db_impl_files.cc @@ -15,6 +15,7 @@ #include "db/memtable_list.h" #include "file/file_util.h" #include "file/sst_file_manager_impl.h" +#include "util/autovector.h" namespace rocksdb { @@ -495,13 +496,15 @@ void DBImpl::PurgeObsoleteFiles(JobContext& state, bool schedule_only) { // After purging obsolete files, remove them from files_grabbed_for_purge_. // Use a temporary vector to perform bulk deletion via swap. InstrumentedMutexLock guard_lock(&mutex_); - std::vector tmp; + autovector to_be_removed; for (auto fn : files_grabbed_for_purge_) { - if (files_to_del.count(fn) == 0) { - tmp.emplace_back(fn); + if (files_to_del.count(fn) != 0) { + to_be_removed.emplace_back(fn); } } - files_grabbed_for_purge_.swap(tmp); + for (auto fn : to_be_removed) { + files_grabbed_for_purge_.erase(fn); + } } // Delete old info log files. diff --git a/db/obsolete_files_test.cc b/db/obsolete_files_test.cc index 3a78869c9..305d1e3a3 100644 --- a/db/obsolete_files_test.cc +++ b/db/obsolete_files_test.cc @@ -171,8 +171,8 @@ TEST_F(ObsoleteFilesTest, RaceForObsoleteFileDeletion) { }); SyncPoint::GetInstance()->SetCallBack( "DBImpl::CloseHelper:PendingPurgeFinished", [&](void* arg) { - std::vector* files_grabbed_for_purge_ptr = - reinterpret_cast*>(arg); + std::unordered_set* files_grabbed_for_purge_ptr = + reinterpret_cast*>(arg); ASSERT_TRUE(files_grabbed_for_purge_ptr->empty()); }); SyncPoint::GetInstance()->EnableProcessing();