From 95dda37858dcabc9e73578d10b979f6932cb49f0 Mon Sep 17 00:00:00 2001 From: Dhruba Borthakur Date: Wed, 31 Oct 2012 22:01:57 -0700 Subject: [PATCH] Move filesize-based-sorting to outside the Mutex Summary: When a new version is created, we sort all the files at every level based on their size. This is necessary because we want to compact the largest file first. The sorting takes quite a bit of CPU. Moved the sorting code to be outside the mutex. Also, the earlier code was sorting files at all levels but we do not need to sort the highest-number level because those files are never the cause of any compaction. To reduce sorting costs, we sort only the first few files in each level because it is likely that those are the only files in that level that will be picked for compaction. At steady state, I have seen that this patch increases throughput from 1500 writes/sec to 1700 writes/sec at the end of a 72 hour run. The cpu saving by not sorting the last level was not distinctive in this test run because there were only 100K files in the highest numbered level. I expect the cpu saving to be significant when the number of files is much higher. This is mostly an early preview and not ready for rigorous review. With this patch, the writes/sec is now bottlenecked not by the sorting code but by GetOverlappingInputs. I am working on a patch to optimize GetOverlappingInputs. 
Test Plan: make check Reviewers: MarkCallaghan, heyongqiang Reviewed By: heyongqiang Differential Revision: https://reviews.facebook.net/D6411 --- db/db_impl.cc | 7 ++- db/version_set.cc | 112 +++++++++++++++++++++++++++++++--------------- db/version_set.h | 33 +++++++++++++- 3 files changed, 110 insertions(+), 42 deletions(-) diff --git a/db/db_impl.cc b/db/db_impl.cc index d89cbdf8c..435138b87 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -414,7 +414,6 @@ void DBImpl::PurgeObsoleteFiles(DeletionState& state) { } void DBImpl::EvictObsoleteFiles(DeletionState& state) { - mutex_.AssertHeld(); for (unsigned int i = 0; i < state.files_to_evict.size(); i++) { table_cache_->Evict(state.files_to_evict[i]); } @@ -946,8 +945,8 @@ void DBImpl::BackgroundCall() { if (!deletion_state.live.empty()) { mutex_.Unlock(); PurgeObsoleteFiles(deletion_state); - mutex_.Lock(); EvictObsoleteFiles(deletion_state); + mutex_.Lock(); } bg_compaction_scheduled_--; @@ -1020,13 +1019,13 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, static_cast(f->file_size), status.ToString().c_str(), versions_->LevelSummary(&tmp)); - versions_->ReleaseCompactionFiles(c); + versions_->ReleaseCompactionFiles(c, status); *madeProgress = true; } else { CompactionState* compact = new CompactionState(c); status = DoCompactionWork(compact); CleanupCompaction(compact); - versions_->ReleaseCompactionFiles(c); + versions_->ReleaseCompactionFiles(c, status); c->ReleaseInputs(); FindObsoleteFiles(deletion_state); *madeProgress = true; diff --git a/db/version_set.cc b/db/version_set.cc index 211cc4123..7cb550d2d 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -269,6 +269,7 @@ static bool NewestFirst(FileMetaData* a, FileMetaData* b) { Version::Version(VersionSet* vset, uint64_t version_number) : vset_(vset), next_(this), prev_(this), refs_(0), files_by_size_(vset->NumberLevels()), + next_file_to_compact_by_size_(vset->NumberLevels()), file_to_compact_(NULL), file_to_compact_level_(-1), 
compaction_score_(vset->NumberLevels()), @@ -677,11 +678,6 @@ class VersionSet::Builder { Version* base_; LevelState* levels_; - typedef struct fsize { - int index; - FileMetaData* file; - } Fsize; - public: // Initialize a builder with the files from *base and other info from *vset Builder(VersionSet* vset, Version* base) @@ -734,7 +730,6 @@ } } } - assert(v->files_[level].size() == v->files_by_size_[level].size()); } #endif } @@ -832,7 +827,6 @@ MaybeAddFile(v, level, *base_iter); } } - UpdateFilesBySize(v); CheckConsistency(v); } @@ -850,34 +844,6 @@ files->push_back(f); } } - - static bool compareSize(const Fsize& first, const Fsize& second) { - return (first.file->file_size > second.file->file_size); - } - - void UpdateFilesBySize(Version* v) { - - for (int level = 0; level < vset_->NumberLevels(); level++) { - // populate a temp vector for sorting based on size - const std::vector& files = v->files_[level]; - std::vector temp(files.size()); - for (unsigned int i = 0; i < files.size(); i++) { - temp[i].index = i; - temp[i].file = files[i]; - } - - // do the sort based on file size - std::sort(temp.begin(), temp.end(), compareSize); - assert(temp.size() == files.size()); - - // initialize files_by_size_ - std::vector& files_by_size = v->files_by_size_[level]; - for (unsigned int i = 0; i < temp.size(); i++) { - files_by_size.push_back(temp[i].index); - } - assert(v->files_[level].size() == v->files_by_size_[level].size()); - } - } }; VersionSet::VersionSet(const std::string& dbname, @@ -1002,7 +968,11 @@ Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu, // because &w is ensuring that all new writes get queued. { mu->Unlock(); + + // The calls to Finalize and UpdateFilesBySize are cpu-heavy + // and are best called outside the mutex. 
Finalize(v); + UpdateFilesBySize(v); // Write new record to MANIFEST log if (s.ok()) { @@ -1416,6 +1386,47 @@ void VersionSet::Finalize(Version* v) { } } +// a static comparator used to sort files based on their size +static bool compareSize(const VersionSet::Fsize& first, + const VersionSet::Fsize& second) { + return (first.file->file_size > second.file->file_size); +} + +// sort all files in level 0 to level (n-2) based on file size +void VersionSet::UpdateFilesBySize(Version* v) { + + // No need to sort the highest level because it is never compacted. + for (int level = 0; level < NumberLevels()-1; level++) { + + const std::vector& files = v->files_[level]; + std::vector& files_by_size = v->files_by_size_[level]; + assert(files_by_size.size() == 0); + + // populate a temp vector for sorting based on size + std::vector temp(files.size()); + for (unsigned int i = 0; i < files.size(); i++) { + temp[i].index = i; + temp[i].file = files[i]; + } + + // sort the top number_of_files_to_sort_ based on file size + int num = Version::number_of_files_to_sort_; + if (num > (int)temp.size()) { + num = temp.size(); + } + std::partial_sort(temp.begin(), temp.begin() + num, + temp.end(), compareSize); + assert(temp.size() == files.size()); + + // initialize files_by_size_ + for (unsigned int i = 0; i < temp.size(); i++) { + files_by_size.push_back(temp[i].index); + } + v->next_file_to_compact_by_size_[level] = 0; + assert(v->files_[level].size() == v->files_by_size_[level].size()); + } +} + Status VersionSet::WriteSnapshot(log::Writer* log) { // TODO: Break up into multiple records to reduce memory usage on recovery? @@ -1706,9 +1717,12 @@ bool VersionSet::VerifyCompactionFileConsistency(Compaction* c) { // Clear all files to indicate that they are not being compacted // Delete this compaction from the list of running compactions. 
-void VersionSet::ReleaseCompactionFiles(Compaction* c) { +void VersionSet::ReleaseCompactionFiles(Compaction* c, Status status) { c->MarkFilesBeingCompacted(false); compactions_in_progress_[c->level()].erase(c); + if (!status.ok()) { + c->ResetNextCompactionIndex(); + } } // The total size of files that are currently being compacted @@ -1746,12 +1760,18 @@ Compaction* VersionSet::PickCompactionBySize(int level) { // Pick the largest file in this level that is not already // being compacted std::vector& file_size = current_->files_by_size_[level]; - for (unsigned int i = 0; i < file_size.size(); i++) { + + // record the first file that is not yet compacted + int nextIndex = -1; + + for (unsigned int i = current_->next_file_to_compact_by_size_[level]; + i < file_size.size(); i++) { int index = file_size[i]; FileMetaData* f = current_->files_[level][index]; // check to verify files are arranged in descending size assert((i == file_size.size() - 1) || + (i >= VersionSet::number_of_files_to_sort_-1) || (f->file_size >= current_->files_[level][file_size[i+1]]->file_size)); // do not pick a file to compact if it is being compacted @@ -1759,6 +1779,16 @@ Compaction* VersionSet::PickCompactionBySize(int level) { if (f->being_compacted) { continue; } + + // remember the startIndex for the next call to PickCompaction + if (nextIndex == -1) { + nextIndex = i; + } + + //if (i > Version::number_of_files_to_sort_) { + // Log(options_->info_log, "XXX Looking at index %d", i); + //} + // Do not pick this file if its parents at level+1 are being compacted. 
// Maybe we can avoid redoing this work in SetupOtherInputs int parent_index = -1; @@ -1775,6 +1805,10 @@ Compaction* VersionSet::PickCompactionBySize(int level) { delete c; c = NULL; } + + // store where to start the iteration in the next call to PickCompaction + current_->next_file_to_compact_by_size_[level] = nextIndex; + return c; } @@ -2082,6 +2116,10 @@ void Compaction::ReleaseInputs() { } } +void Compaction::ResetNextCompactionIndex() { + input_version_->ResetNextCompactionIndex(level_); +} + static void InputSummary(std::vector& files, char* output, int len) { diff --git a/db/version_set.h b/db/version_set.h index 654ee1de5..d46a6ea2b 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -152,6 +152,17 @@ class Version { // This vector stores the index of the file from files_. std::vector< std::vector > files_by_size_; + // An index into files_by_size_ that specifies the first + // file that is not yet compacted + std::vector next_file_to_compact_by_size_; + + // Only the first few entries of files_by_size_ are sorted. + // There is no need to sort all the files because it is likely + // that on a running system, we need to look at only the first + // few largest files because a new version is created every few + // seconds/minutes (because of concurrent compactions). + static const int number_of_files_to_sort_ = 50; + // Next file to compact based on seek stats. FileMetaData* file_to_compact_; int file_to_compact_level_; @@ -176,6 +187,12 @@ class Version { ~Version(); + // re-initializes the index that is used to offset into files_by_size_ + // to find the next compaction candidate file. 
+ void ResetNextCompactionIndex(int level) { + next_file_to_compact_by_size_[level] = 0; + } + // No copying allowed Version(const Version&); void operator=(const Version&); @@ -330,7 +347,7 @@ class VersionSet { Compaction* PickCompactionBySize(int level); // Free up the files that were participated in a compaction - void ReleaseCompactionFiles(Compaction* c); + void ReleaseCompactionFiles(Compaction* c, Status status); // verify that the files that we started with for a compaction // still exist in the current version and in the same original level. @@ -338,6 +355,16 @@ class VersionSet { // pick the same files to compact. bool VerifyCompactionFileConsistency(Compaction* c); + // used to sort files by size + typedef struct fsize { + int index; + FileMetaData* file; + } Fsize; + + // Sort all files for this version based on their file size and + // record results in files_by_size_. The largest files are listed first. + void UpdateFilesBySize(Version *v); + private: class Builder; struct ManifestWriter; @@ -513,6 +540,10 @@ class Compaction { // mark (or clear) all files that are being compacted void MarkFilesBeingCompacted(bool); + + // In case of compaction error, reset the nextIndex that is used + // to pick up the next file to be compacted from files_by_size_ + void ResetNextCompactionIndex(); }; } // namespace leveldb