diff --git a/db/db_impl.cc b/db/db_impl.cc index d89cbdf8c..435138b87 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -414,7 +414,6 @@ void DBImpl::PurgeObsoleteFiles(DeletionState& state) { } void DBImpl::EvictObsoleteFiles(DeletionState& state) { - mutex_.AssertHeld(); for (unsigned int i = 0; i < state.files_to_evict.size(); i++) { table_cache_->Evict(state.files_to_evict[i]); } @@ -946,8 +945,8 @@ void DBImpl::BackgroundCall() { if (!deletion_state.live.empty()) { mutex_.Unlock(); PurgeObsoleteFiles(deletion_state); - mutex_.Lock(); EvictObsoleteFiles(deletion_state); + mutex_.Lock(); } bg_compaction_scheduled_--; @@ -1020,13 +1019,13 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, static_cast(f->file_size), status.ToString().c_str(), versions_->LevelSummary(&tmp)); - versions_->ReleaseCompactionFiles(c); + versions_->ReleaseCompactionFiles(c, status); *madeProgress = true; } else { CompactionState* compact = new CompactionState(c); status = DoCompactionWork(compact); CleanupCompaction(compact); - versions_->ReleaseCompactionFiles(c); + versions_->ReleaseCompactionFiles(c, status); c->ReleaseInputs(); FindObsoleteFiles(deletion_state); *madeProgress = true; diff --git a/db/version_set.cc b/db/version_set.cc index 211cc4123..7cb550d2d 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -269,6 +269,7 @@ static bool NewestFirst(FileMetaData* a, FileMetaData* b) { Version::Version(VersionSet* vset, uint64_t version_number) : vset_(vset), next_(this), prev_(this), refs_(0), files_by_size_(vset->NumberLevels()), + next_file_to_compact_by_size_(vset->NumberLevels()), file_to_compact_(NULL), file_to_compact_level_(-1), compaction_score_(vset->NumberLevels()), @@ -677,11 +678,6 @@ class VersionSet::Builder { Version* base_; LevelState* levels_; - typedef struct fsize { - int index; - FileMetaData* file; - } Fsize; - public: // Initialize a builder with the files from *base and other info from *vset Builder(VersionSet* vset, Version* base) @@ -734,7 
+730,6 @@ class VersionSet::Builder { } } } - assert(v->files_[level].size() == v->files_by_size_[level].size()); } #endif } @@ -832,7 +827,6 @@ class VersionSet::Builder { MaybeAddFile(v, level, *base_iter); } } - UpdateFilesBySize(v); CheckConsistency(v); } @@ -850,34 +844,6 @@ class VersionSet::Builder { files->push_back(f); } } - - static bool compareSize(const Fsize& first, const Fsize& second) { - return (first.file->file_size > second.file->file_size); - } - - void UpdateFilesBySize(Version* v) { - - for (int level = 0; level < vset_->NumberLevels(); level++) { - // populate a temp vector for sorting based on size - const std::vector& files = v->files_[level]; - std::vector temp(files.size()); - for (unsigned int i = 0; i < files.size(); i++) { - temp[i].index = i; - temp[i].file = files[i]; - } - - // do the sort based on file size - std::sort(temp.begin(), temp.end(), compareSize); - assert(temp.size() == files.size()); - - // initialize files_by_size_ - std::vector& files_by_size = v->files_by_size_[level]; - for (unsigned int i = 0; i < temp.size(); i++) { - files_by_size.push_back(temp[i].index); - } - assert(v->files_[level].size() == v->files_by_size_[level].size()); - } - } }; VersionSet::VersionSet(const std::string& dbname, @@ -1002,7 +968,11 @@ Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu, // because &w is ensuring that all new writes get queued. { mu->Unlock(); + + // The calls to Finalize and UpdateFilesBySize are cpu-heavy + // and are best made outside the mutex.
Finalize(v); + UpdateFilesBySize(v); // Write new record to MANIFEST log if (s.ok()) { @@ -1416,6 +1386,47 @@ void VersionSet::Finalize(Version* v) { } } +// a static comparator used to sort files by size, largest first +static bool compareSize(const VersionSet::Fsize& first, + const VersionSet::Fsize& second) { + return (first.file->file_size > second.file->file_size); +} + +// sort the files in every level except the highest based on file size +void VersionSet::UpdateFilesBySize(Version* v) { + + // No need to sort the highest level because it is never compacted. + for (int level = 0; level < NumberLevels()-1; level++) { + + const std::vector& files = v->files_[level]; + std::vector& files_by_size = v->files_by_size_[level]; + assert(files_by_size.size() == 0); + + // populate a temp vector for sorting based on size + std::vector temp(files.size()); + for (unsigned int i = 0; i < files.size(); i++) { + temp[i].index = i; + temp[i].file = files[i]; + } + + // sort the top number_of_files_to_sort_ based on file size + int num = Version::number_of_files_to_sort_; + if (num > (int)temp.size()) { + num = temp.size(); + } + std::partial_sort(temp.begin(), temp.begin() + num, + temp.end(), compareSize); + assert(temp.size() == files.size()); + + // initialize files_by_size_ + for (unsigned int i = 0; i < temp.size(); i++) { + files_by_size.push_back(temp[i].index); + } + v->next_file_to_compact_by_size_[level] = 0; + assert(v->files_[level].size() == v->files_by_size_[level].size()); + } +} + Status VersionSet::WriteSnapshot(log::Writer* log) { // TODO: Break up into multiple records to reduce memory usage on recovery? @@ -1706,9 +1717,12 @@ bool VersionSet::VerifyCompactionFileConsistency(Compaction* c) { // Clear all files to indicate that they are not being compacted // Delete this compaction from the list of running compactions.
-void VersionSet::ReleaseCompactionFiles(Compaction* c) { +void VersionSet::ReleaseCompactionFiles(Compaction* c, Status status) { c->MarkFilesBeingCompacted(false); compactions_in_progress_[c->level()].erase(c); + if (!status.ok()) { + c->ResetNextCompactionIndex(); + } } // The total size of files that are currently being compacted @@ -1746,12 +1760,18 @@ Compaction* VersionSet::PickCompactionBySize(int level) { // Pick the largest file in this level that is not already // being compacted std::vector& file_size = current_->files_by_size_[level]; - for (unsigned int i = 0; i < file_size.size(); i++) { + + // index of the first file seen that is not already being compacted + int nextIndex = -1; + + for (unsigned int i = current_->next_file_to_compact_by_size_[level]; + i < file_size.size(); i++) { int index = file_size[i]; FileMetaData* f = current_->files_[level][index]; // check to verify files are arranged in descending size assert((i == file_size.size() - 1) || + (i >= Version::number_of_files_to_sort_-1) || (f->file_size >= current_->files_[level][file_size[i+1]]->file_size)); // do not pick a file to compact if it is being compacted @@ -1759,6 +1779,16 @@ Compaction* VersionSet::PickCompactionBySize(int level) { if (f->being_compacted) { continue; } + + // remember the startIndex for the next call to PickCompaction + if (nextIndex == -1) { + nextIndex = i; + } + + //if (i > Version::number_of_files_to_sort_) { + // Log(options_->info_log, "XXX Looking at index %d", i); + //} + // Do not pick this file if its parents at level+1 are being compacted.
// Maybe we can avoid redoing this work in SetupOtherInputs int parent_index = -1; @@ -1775,6 +1805,10 @@ Compaction* VersionSet::PickCompactionBySize(int level) { delete c; c = NULL; } + + // store where to start the iteration in the next call to PickCompaction + current_->next_file_to_compact_by_size_[level] = nextIndex; + return c; } @@ -2082,6 +2116,10 @@ void Compaction::ReleaseInputs() { } } +void Compaction::ResetNextCompactionIndex() { + input_version_->ResetNextCompactionIndex(level_); +} + static void InputSummary(std::vector& files, char* output, int len) { diff --git a/db/version_set.h b/db/version_set.h index 654ee1de5..d46a6ea2b 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -152,6 +152,17 @@ class Version { // This vector stores the index of the file from files_. std::vector< std::vector > files_by_size_; + // Per-level index into files_by_size_ that specifies the first + // file that is not yet being compacted + std::vector next_file_to_compact_by_size_; + + // Only the first number_of_files_to_sort_ entries of files_by_size_ are sorted. + // There is no need to sort all the files because it is likely + // that on a running system, we need to look at only the first + // few largest files because a new version is created every few + // seconds/minutes (because of concurrent compactions). + static const int number_of_files_to_sort_ = 50; + // Next file to compact based on seek stats. FileMetaData* file_to_compact_; int file_to_compact_level_; @@ -176,6 +187,12 @@ class Version { ~Version(); + // re-initializes the index that is used to offset into files_by_size_ + // to find the next compaction candidate file.
+ void ResetNextCompactionIndex(int level) { + next_file_to_compact_by_size_[level] = 0; + } + // No copying allowed Version(const Version&); void operator=(const Version&); @@ -330,7 +347,7 @@ class VersionSet { Compaction* PickCompactionBySize(int level); // Free up the files that were participated in a compaction - void ReleaseCompactionFiles(Compaction* c); + void ReleaseCompactionFiles(Compaction* c, Status status); // verify that the files that we started with for a compaction // still exist in the current version and in the same original level. @@ -338,6 +355,16 @@ class VersionSet { // pick the same files to compact. bool VerifyCompactionFileConsistency(Compaction* c); + // pairs a file with its index in files_; used to sort files by size + typedef struct fsize { + int index; + FileMetaData* file; + } Fsize; + + // For every level except the highest, order the files by size (largest + // first) and record the result in files_by_size_; only the top entries are sorted. + void UpdateFilesBySize(Version *v); + private: class Builder; struct ManifestWriter; @@ -513,6 +540,10 @@ class Compaction { // mark (or clear) all files that are being compacted void MarkFilesBeingCompacted(bool); + + // In case of compaction error, reset the nextIndex that is used + // to pick up the next file to be compacted from files_by_size_ + void ResetNextCompactionIndex(); }; } // namespace leveldb