Move filesize-based-sorting to outside the Mutex

Summary:
When a new version is created, we sort all the files at every
level based on their size. This is necessary because we want
to compact the largest file first. The sorting takes quite a
bit of CPU.

Moved the sorting code outside the mutex. Also, the earlier
code was sorting files at all levels, but we do not need to
sort the highest-numbered level because those files are never
the cause of any compaction. To reduce sorting costs further,
we sort only the first few files in each level, because those
are likely the only files in that level that will be picked
for compaction.
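
For illustration, here is a minimal, self-contained sketch of the
partial-sort idea (FileMeta, OrderBySize and the default limit of 50
are hypothetical stand-ins, not the structures used by this patch):

  #include <algorithm>
  #include <cstdint>
  #include <vector>

  // Hypothetical stand-in for the per-file metadata.
  struct FileMeta {
    uint64_t file_size;
  };

  // Return indices into `files` such that the first `limit` entries
  // reference the largest files, in descending size order. The tail
  // of the index vector is left unsorted, which is what keeps this
  // cheap compared to a full sort.
  std::vector<int> OrderBySize(const std::vector<FileMeta>& files,
                               size_t limit = 50) {
    std::vector<int> by_size(files.size());
    for (size_t i = 0; i < files.size(); i++) {
      by_size[i] = static_cast<int>(i);
    }
    size_t num = std::min(limit, by_size.size());
    std::partial_sort(by_size.begin(), by_size.begin() + num,
                      by_size.end(),
                      [&files](int a, int b) {
                        return files[a].file_size > files[b].file_size;
                      });
    return by_size;
  }

Because a version is immutable once it is installed, this ordering can
be computed without holding the mutex and attached to the new version
before it becomes current.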

At steady state, I have seen this patch increase throughput
from 1500 writes/sec to 1700 writes/sec at the end of a 72-hour
run. The CPU saving from not sorting the last level was not
noticeable in this test run because there were only 100K files
in the highest-numbered level. I expect the saving to be
significant when the number of files is much higher.

This is mostly an early preview and not ready for rigorous review.

With this patch, writes/sec is bottlenecked not by the sorting
code but by GetOverlappingInputs. I am working on a patch to
optimize GetOverlappingInputs.

Test Plan: make check

Reviewers: MarkCallaghan, heyongqiang

Reviewed By: heyongqiang

Differential Revision: https://reviews.facebook.net/D6411
Branch: main
Author: Dhruba Borthakur (12 years ago)
Parent: 18cb6004d2
Commit: 95dda37858
Changed files:
  db/db_impl.cc (7)
  db/version_set.cc (112)
  db/version_set.h (33)

--- a/db/db_impl.cc
+++ b/db/db_impl.cc
@@ -414,7 +414,6 @@ void DBImpl::PurgeObsoleteFiles(DeletionState& state) {
 }
 
 void DBImpl::EvictObsoleteFiles(DeletionState& state) {
-  mutex_.AssertHeld();
   for (unsigned int i = 0; i < state.files_to_evict.size(); i++) {
     table_cache_->Evict(state.files_to_evict[i]);
   }
@@ -946,8 +945,8 @@ void DBImpl::BackgroundCall() {
   if (!deletion_state.live.empty()) {
     mutex_.Unlock();
     PurgeObsoleteFiles(deletion_state);
-    mutex_.Lock();
     EvictObsoleteFiles(deletion_state);
+    mutex_.Lock();
   }
 
   bg_compaction_scheduled_--;
@@ -1020,13 +1019,13 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
         static_cast<unsigned long long>(f->file_size),
         status.ToString().c_str(),
         versions_->LevelSummary(&tmp));
-    versions_->ReleaseCompactionFiles(c);
+    versions_->ReleaseCompactionFiles(c, status);
     *madeProgress = true;
   } else {
     CompactionState* compact = new CompactionState(c);
     status = DoCompactionWork(compact);
     CleanupCompaction(compact);
-    versions_->ReleaseCompactionFiles(c);
+    versions_->ReleaseCompactionFiles(c, status);
     c->ReleaseInputs();
     FindObsoleteFiles(deletion_state);
     *madeProgress = true;

--- a/db/version_set.cc
+++ b/db/version_set.cc
@@ -269,6 +269,7 @@ static bool NewestFirst(FileMetaData* a, FileMetaData* b) {
 Version::Version(VersionSet* vset, uint64_t version_number)
     : vset_(vset), next_(this), prev_(this), refs_(0),
       files_by_size_(vset->NumberLevels()),
+      next_file_to_compact_by_size_(vset->NumberLevels()),
       file_to_compact_(NULL),
       file_to_compact_level_(-1),
       compaction_score_(vset->NumberLevels()),
@@ -677,11 +678,6 @@ class VersionSet::Builder {
   Version* base_;
   LevelState* levels_;
 
-  typedef struct fsize {
-    int index;
-    FileMetaData* file;
-  } Fsize;
-
  public:
   // Initialize a builder with the files from *base and other info from *vset
   Builder(VersionSet* vset, Version* base)
@@ -734,7 +730,6 @@ class VersionSet::Builder {
           }
         }
       }
-      assert(v->files_[level].size() == v->files_by_size_[level].size());
     }
 #endif
   }
@@ -832,7 +827,6 @@ class VersionSet::Builder {
         MaybeAddFile(v, level, *base_iter);
       }
     }
-    UpdateFilesBySize(v);
     CheckConsistency(v);
   }
@@ -850,34 +844,6 @@ class VersionSet::Builder {
       files->push_back(f);
     }
   }
 
-  static bool compareSize(const Fsize& first, const Fsize& second) {
-    return (first.file->file_size > second.file->file_size);
-  }
-
-  void UpdateFilesBySize(Version* v) {
-    for (int level = 0; level < vset_->NumberLevels(); level++) {
-      // populate a temp vector for sorting based on size
-      const std::vector<FileMetaData*>& files = v->files_[level];
-      std::vector<Fsize> temp(files.size());
-      for (unsigned int i = 0; i < files.size(); i++) {
-        temp[i].index = i;
-        temp[i].file = files[i];
-      }
-
-      // do the sort based on file size
-      std::sort(temp.begin(), temp.end(), compareSize);
-      assert(temp.size() == files.size());
-
-      // initialize files_by_size_
-      std::vector<int>& files_by_size = v->files_by_size_[level];
-      for (unsigned int i = 0; i < temp.size(); i++) {
-        files_by_size.push_back(temp[i].index);
-      }
-      assert(v->files_[level].size() == v->files_by_size_[level].size());
-    }
-  }
 };
 
 VersionSet::VersionSet(const std::string& dbname,
@@ -1002,7 +968,11 @@ Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu,
   // because &w is ensuring that all new writes get queued.
   {
     mu->Unlock();
+
+    // The calls to Finalize and UpdateFilesBySize are cpu-heavy
+    // and are best done outside the mutex.
     Finalize(v);
+    UpdateFilesBySize(v);
 
     // Write new record to MANIFEST log
     if (s.ok()) {
@@ -1416,6 +1386,47 @@ void VersionSet::Finalize(Version* v) {
   }
 }
 
+// a static comparator used to sort files based on their size
+static bool compareSize(const VersionSet::Fsize& first,
+                        const VersionSet::Fsize& second) {
+  return (first.file->file_size > second.file->file_size);
+}
+
+// sort all files in level1 to level(n-1) based on file size
+void VersionSet::UpdateFilesBySize(Version* v) {
+  // No need to sort the highest level because it is never compacted.
+  for (int level = 0; level < NumberLevels()-1; level++) {
+    const std::vector<FileMetaData*>& files = v->files_[level];
+    std::vector<int>& files_by_size = v->files_by_size_[level];
+    assert(files_by_size.size() == 0);
+
+    // populate a temp vector for sorting based on size
+    std::vector<Fsize> temp(files.size());
+    for (unsigned int i = 0; i < files.size(); i++) {
+      temp[i].index = i;
+      temp[i].file = files[i];
+    }
+
+    // sort the top number_of_files_to_sort_ based on file size
+    int num = Version::number_of_files_to_sort_;
+    if (num > (int)temp.size()) {
+      num = temp.size();
+    }
+    std::partial_sort(temp.begin(), temp.begin() + num,
+                      temp.end(), compareSize);
+    assert(temp.size() == files.size());
+
+    // initialize files_by_size_
+    for (unsigned int i = 0; i < temp.size(); i++) {
+      files_by_size.push_back(temp[i].index);
+    }
+    v->next_file_to_compact_by_size_[level] = 0;
+    assert(v->files_[level].size() == v->files_by_size_[level].size());
+  }
+}
+
 Status VersionSet::WriteSnapshot(log::Writer* log) {
   // TODO: Break up into multiple records to reduce memory usage on recovery?
@@ -1706,9 +1717,12 @@ bool VersionSet::VerifyCompactionFileConsistency(Compaction* c) {
 // Clear all files to indicate that they are not being compacted
 // Delete this compaction from the list of running compactions.
-void VersionSet::ReleaseCompactionFiles(Compaction* c) {
+void VersionSet::ReleaseCompactionFiles(Compaction* c, Status status) {
   c->MarkFilesBeingCompacted(false);
   compactions_in_progress_[c->level()].erase(c);
+  if (!status.ok()) {
+    c->ResetNextCompactionIndex();
+  }
 }
 
 // The total size of files that are currently being compacted
@@ -1746,12 +1760,18 @@ Compaction* VersionSet::PickCompactionBySize(int level) {
   // Pick the largest file in this level that is not already
   // being compacted
   std::vector<int>& file_size = current_->files_by_size_[level];
-  for (unsigned int i = 0; i < file_size.size(); i++) {
+
+  // record the first file that is not yet compacted
+  int nextIndex = -1;
+
+  for (unsigned int i = current_->next_file_to_compact_by_size_[level];
+       i < file_size.size(); i++) {
     int index = file_size[i];
     FileMetaData* f = current_->files_[level][index];
 
     // check to verify files are arranged in descending size
     assert((i == file_size.size() - 1) ||
+           (i >= Version::number_of_files_to_sort_-1) ||
            (f->file_size >= current_->files_[level][file_size[i+1]]->file_size));
 
     // do not pick a file to compact if it is being compacted
@@ -1759,6 +1779,16 @@ Compaction* VersionSet::PickCompactionBySize(int level) {
     if (f->being_compacted) {
       continue;
     }
+
+    // remember the startIndex for the next call to PickCompaction
+    if (nextIndex == -1) {
+      nextIndex = i;
+    }
+
+    //if (i > Version::number_of_files_to_sort_) {
+    //  Log(options_->info_log, "XXX Looking at index %d", i);
+    //}
+
     // Do not pick this file if its parents at level+1 are being compacted.
     // Maybe we can avoid redoing this work in SetupOtherInputs
     int parent_index = -1;
@@ -1775,6 +1805,10 @@ Compaction* VersionSet::PickCompactionBySize(int level) {
     delete c;
     c = NULL;
   }
+
+  // store where to start the iteration in the next call to PickCompaction
+  current_->next_file_to_compact_by_size_[level] = nextIndex;
+
   return c;
 }
@@ -2082,6 +2116,10 @@ void Compaction::ReleaseInputs() {
   }
 }
 
+void Compaction::ResetNextCompactionIndex() {
+  input_version_->ResetNextCompactionIndex(level_);
+}
+
 static void InputSummary(std::vector<FileMetaData*>& files,
                          char* output,
                          int len) {

--- a/db/version_set.h
+++ b/db/version_set.h
@@ -152,6 +152,17 @@ class Version {
   // This vector stores the index of the file from files_.
   std::vector< std::vector<int> > files_by_size_;
 
+  // An index into files_by_size_ that specifies the first
+  // file that is not yet compacted
+  std::vector<int> next_file_to_compact_by_size_;
+
+  // Only the first few entries of files_by_size_ are sorted.
+  // There is no need to sort all the files because it is likely
+  // that on a running system, we need to look at only the first
+  // few largest files because a new version is created every few
+  // seconds/minutes (because of concurrent compactions).
+  static const int number_of_files_to_sort_ = 50;
+
   // Next file to compact based on seek stats.
   FileMetaData* file_to_compact_;
   int file_to_compact_level_;
@@ -176,6 +187,12 @@ class Version {
   ~Version();
 
+  // re-initializes the index that is used to offset into files_by_size_
+  // to find the next compaction candidate file.
+  void ResetNextCompactionIndex(int level) {
+    next_file_to_compact_by_size_[level] = 0;
+  }
+
   // No copying allowed
   Version(const Version&);
   void operator=(const Version&);
@@ -330,7 +347,7 @@ class VersionSet {
   Compaction* PickCompactionBySize(int level);
 
   // Free up the files that were participated in a compaction
-  void ReleaseCompactionFiles(Compaction* c);
+  void ReleaseCompactionFiles(Compaction* c, Status status);
 
   // verify that the files that we started with for a compaction
   // still exist in the current version and in the same original level.
@@ -338,6 +355,16 @@ class VersionSet {
   // pick the same files to compact.
   bool VerifyCompactionFileConsistency(Compaction* c);
 
+  // used to sort files by size
+  typedef struct fsize {
+    int index;
+    FileMetaData* file;
+  } Fsize;
+
+  // Sort all files for this version based on their file size and
+  // record results in files_by_size_. The largest files are listed first.
+  void UpdateFilesBySize(Version *v);
+
  private:
   class Builder;
   struct ManifestWriter;
@@ -513,6 +540,10 @@ class Compaction {
   // mark (or clear) all files that are being compacted
   void MarkFilesBeingCompacted(bool);
 
+  // In case of compaction error, reset the nextIndex that is used
+  // to pick up the next file to be compacted from files_by_size_
+  void ResetNextCompactionIndex();
+
 };
 
 } // namespace leveldb
