fast look up purge_queue (#5796)

Summary:
purge_queue_ maybe contains thousands sst files, for example manual compact a range. If full scan is triggered at the same time and the total sst files number is large, RocksDB will be blocked at https://github.com/facebook/rocksdb/blob/master/db/db_impl_files.cc#L150 for several seconds. In our environment we have 140,000 sst files and the manual compaction delete about 1000 sst files, it blocked about 2 minutes.

Commandeering https://github.com/facebook/rocksdb/issues/5290.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5796

Differential Revision: D17357775

Pulled By: riversand963

fbshipit-source-id: 20eacca917355b8de975ccc7b1c9a3e7bd5b201a
main
Yi Wu 5 years ago committed by Facebook Github Bot
parent 9a87ae46fd
commit a68d814570
  1. 26
      db/db_impl/db_impl.cc
  2. 9
      db/db_impl/db_impl.h
  3. 26
      db/db_impl/db_impl_compaction_flush.cc
  4. 11
      db/db_impl/db_impl_files.cc
  5. 4
      db/obsolete_files_test.cc

@ -1314,32 +1314,28 @@ void DBImpl::SchedulePurge() {
void DBImpl::BackgroundCallPurge() {
mutex_.Lock();
// We use one single loop to clear both queues so that after existing the loop
// both queues are empty. This is stricter than what is needed, but can make
// it easier for us to reason the correctness.
while (!purge_queue_.empty() || !logs_to_free_queue_.empty()) {
// Check logs_to_free_queue_ first and close log writers.
if (!logs_to_free_queue_.empty()) {
while (!logs_to_free_queue_.empty()) {
assert(!logs_to_free_queue_.empty());
log::Writer* log_writer = *(logs_to_free_queue_.begin());
logs_to_free_queue_.pop_front();
mutex_.Unlock();
delete log_writer;
mutex_.Lock();
} else {
auto purge_file = purge_queue_.begin();
auto fname = purge_file->fname;
auto dir_to_sync = purge_file->dir_to_sync;
auto type = purge_file->type;
auto number = purge_file->number;
auto job_id = purge_file->job_id;
purge_queue_.pop_front();
}
for (const auto& file : purge_files_) {
const PurgeFileInfo& purge_file = file.second;
const std::string& fname = purge_file.fname;
const std::string& dir_to_sync = purge_file.dir_to_sync;
FileType type = purge_file.type;
uint64_t number = purge_file.number;
int job_id = purge_file.job_id;
mutex_.Unlock();
DeleteObsoleteFileImpl(job_id, fname, dir_to_sync, type, number);
mutex_.Lock();
}
}
purge_files_.clear();
bg_purge_scheduled_--;
bg_cv_.SignalAll();

@ -347,7 +347,8 @@ class DBImpl : public DB {
uint64_t* manifest_file_size,
bool flush_memtable = true) override;
virtual Status GetSortedWalFiles(VectorLogPtr& files) override;
virtual Status GetCurrentWalFile(std::unique_ptr<LogFile>* current_log_file) override;
virtual Status GetCurrentWalFile(
std::unique_ptr<LogFile>* current_log_file) override;
virtual Status GetUpdatesSince(
SequenceNumber seq_number, std::unique_ptr<TransactionLogIterator>* iter,
@ -1784,12 +1785,12 @@ class DBImpl : public DB {
// ColumnFamilyData::pending_compaction_ == true)
std::deque<ColumnFamilyData*> compaction_queue_;
// A queue to store filenames of the files to be purged
std::deque<PurgeFileInfo> purge_queue_;
// A map to store file numbers and filenames of the files to be purged
std::unordered_map<uint64_t, PurgeFileInfo> purge_files_;
// A vector to store the file numbers that have been assigned to certain
// JobContext. Current implementation tracks ssts only.
std::vector<uint64_t> files_grabbed_for_purge_;
std::unordered_set<uint64_t> files_grabbed_for_purge_;
// A queue to store log writers to close
std::deque<log::Writer*> logs_to_free_queue_;

@ -2090,7 +2090,7 @@ void DBImpl::SchedulePendingPurge(std::string fname, std::string dir_to_sync,
FileType type, uint64_t number, int job_id) {
mutex_.AssertHeld();
PurgeFileInfo file_info(fname, dir_to_sync, type, number, job_id);
purge_queue_.push_back(std::move(file_info));
purge_files_.insert({{number, std::move(file_info)}});
}
void DBImpl::BGWorkFlush(void* arg) {
@ -3077,34 +3077,20 @@ void DBImpl::InstallSuperVersionAndScheduleWork(
}
// ShouldPurge is called by FindObsoleteFiles when doing a full scan,
// and db mutex (mutex_) should already be held. This function performs a
// linear scan of an vector (files_grabbed_for_purge_) in search of a
// certain element. We expect FindObsoleteFiles with full scan to occur once
// every 10 hours by default, and the size of the vector is small.
// Therefore, the cost is affordable even if the mutex is held.
// and db mutex (mutex_) should already be held.
// Actually, the current implementation of FindObsoleteFiles with
// full_scan=true can issue I/O requests to obtain list of files in
// directories, e.g. env_->getChildren while holding db mutex.
// In the future, if we want to reduce the cost of search, we may try to keep
// the vector sorted.
bool DBImpl::ShouldPurge(uint64_t file_number) const {
for (auto fn : files_grabbed_for_purge_) {
if (file_number == fn) {
return false;
}
}
for (const auto& purge_file_info : purge_queue_) {
if (purge_file_info.number == file_number) {
return false;
}
}
return true;
return files_grabbed_for_purge_.find(file_number) ==
files_grabbed_for_purge_.end() &&
purge_files_.find(file_number) == purge_files_.end();
}
// MarkAsGrabbedForPurge is called by FindObsoleteFiles, and db mutex
// (mutex_) should already be held.
void DBImpl::MarkAsGrabbedForPurge(uint64_t file_number) {
files_grabbed_for_purge_.emplace_back(file_number);
files_grabbed_for_purge_.insert(file_number);
}
void DBImpl::SetSnapshotChecker(SnapshotChecker* snapshot_checker) {

@ -15,6 +15,7 @@
#include "db/memtable_list.h"
#include "file/file_util.h"
#include "file/sst_file_manager_impl.h"
#include "util/autovector.h"
namespace rocksdb {
@ -495,13 +496,15 @@ void DBImpl::PurgeObsoleteFiles(JobContext& state, bool schedule_only) {
// After purging obsolete files, remove them from files_grabbed_for_purge_.
// Use a temporary vector to perform bulk deletion via swap.
InstrumentedMutexLock guard_lock(&mutex_);
std::vector<uint64_t> tmp;
autovector<uint64_t> to_be_removed;
for (auto fn : files_grabbed_for_purge_) {
if (files_to_del.count(fn) == 0) {
tmp.emplace_back(fn);
if (files_to_del.count(fn) != 0) {
to_be_removed.emplace_back(fn);
}
}
files_grabbed_for_purge_.swap(tmp);
for (auto fn : to_be_removed) {
files_grabbed_for_purge_.erase(fn);
}
}
// Delete old info log files.

@ -171,8 +171,8 @@ TEST_F(ObsoleteFilesTest, RaceForObsoleteFileDeletion) {
});
SyncPoint::GetInstance()->SetCallBack(
"DBImpl::CloseHelper:PendingPurgeFinished", [&](void* arg) {
std::vector<uint64_t>* files_grabbed_for_purge_ptr =
reinterpret_cast<std::vector<uint64_t>*>(arg);
std::unordered_set<uint64_t>* files_grabbed_for_purge_ptr =
reinterpret_cast<std::unordered_set<uint64_t>*>(arg);
ASSERT_TRUE(files_grabbed_for_purge_ptr->empty());
});
SyncPoint::GetInstance()->EnableProcessing();

Loading…
Cancel
Save