diff --git a/db/version_set.cc b/db/version_set.cc index a97686cda..bda1b56ab 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -430,17 +430,30 @@ static bool SaveValue(void* arg, const ParsedInternalKey& parsed_key, return false; } -static bool NewestFirst(FileMetaData* a, FileMetaData* b) { +namespace { +bool NewestFirst(FileMetaData* a, FileMetaData* b) { return a->number > b->number; } -static bool NewestFirstBySeqNo(FileMetaData* a, FileMetaData* b) { - if (a->smallest_seqno > b->smallest_seqno) { - assert(a->largest_seqno > b->largest_seqno); - return true; +bool NewestFirstBySeqNo(FileMetaData* a, FileMetaData* b) { + if (a->smallest_seqno != b->smallest_seqno) { + return a->smallest_seqno > b->smallest_seqno; } - assert(a->largest_seqno <= b->largest_seqno); - return false; + if (a->largest_seqno != b->largest_seqno) { + return a->largest_seqno > b->largest_seqno; + } + // Break ties by file number + return NewestFirst(a, b); } +bool BySmallestKey(FileMetaData* a, FileMetaData* b, + const InternalKeyComparator* cmp) { + int r = cmp->Compare(a->smallest, b->smallest); + if (r != 0) { + return (r < 0); + } + // Break ties by file number + return (a->number < b->number); +} +} // anonymous namespace Version::Version(VersionSet* vset, uint64_t version_number) : vset_(vset), @@ -1172,22 +1185,32 @@ struct VersionSet::ManifestWriter { // Versions that contain full copies of the intermediate state. class VersionSet::Builder { private: - // Helper to sort by v->files_[file_number].smallest - struct BySmallestKey { + // Helper to sort v->files_ + // kLevel0LevelCompaction -- NewestFirst + // kLevel0UniversalCompaction -- NewestFirstBySeqNo + // kLevelNon0 -- BySmallestKey + struct FileComparator { + enum SortMethod { + kLevel0LevelCompaction = 0, + kLevel0UniversalCompaction = 1, + kLevelNon0 = 2, + } sort_method; const InternalKeyComparator* internal_comparator; bool operator()(FileMetaData* f1, FileMetaData* f2) const { - int r = internal_comparator->Compare(f1->smallest, f2->smallest); - if (r != 0) { - return (r < 0); - } else { - // Break ties by file number - return (f1->number < f2->number); + switch (sort_method) { + case kLevel0LevelCompaction: + return NewestFirst(f1, f2); + case kLevel0UniversalCompaction: + return NewestFirstBySeqNo(f1, f2); + case kLevelNon0: + return BySmallestKey(f1, f2, internal_comparator); } + assert(false); } }; - typedef std::set FileSet; + typedef std::set FileSet; struct LevelState { std::set deleted_files; FileSet* added_files; @@ -1196,16 +1219,24 @@ class VersionSet::Builder { VersionSet* vset_; Version* base_; LevelState* levels_; + FileComparator level_zero_cmp_; + FileComparator level_nonzero_cmp_; public: // Initialize a builder with the files from *base and other info from *vset Builder(VersionSet* vset, Version* base) : vset_(vset), base_(base) { base_->Ref(); levels_ = new LevelState[base->NumberLevels()]; - BySmallestKey cmp; - cmp.internal_comparator = &vset_->icmp_; - for (int level = 0; level < base->NumberLevels(); level++) { - levels_[level].added_files = new FileSet(cmp); + level_zero_cmp_.sort_method = + (vset_->options_->compaction_style == kCompactionStyleUniversal) + ? FileComparator::kLevel0UniversalCompaction + : FileComparator::kLevel0LevelCompaction; + level_nonzero_cmp_.sort_method = FileComparator::kLevelNon0; + level_nonzero_cmp_.internal_comparator = &vset_->icmp_; + + levels_[0].added_files = new FileSet(level_zero_cmp_); + for (int level = 1; level < base->NumberLevels(); level++) { + levels_[level].added_files = new FileSet(level_nonzero_cmp_); } } @@ -1239,16 +1270,24 @@ class VersionSet::Builder { void CheckConsistency(Version* v) { #ifndef NDEBUG + // make sure the files are sorted correctly for (int level = 0; level < v->NumberLevels(); level++) { - // Make sure there is no overlap in levels > 0 - if (level > 0) { - for (uint32_t i = 1; i < v->files_[level].size(); i++) { - const InternalKey& prev_end = v->files_[level][i-1]->largest; - const InternalKey& this_begin = v->files_[level][i]->smallest; - if (vset_->icmp_.Compare(prev_end, this_begin) >= 0) { + for (size_t i = 1; i < v->files_[level].size(); i++) { + auto f1 = v->files_[level][i - 1]; + auto f2 = v->files_[level][i]; + if (level == 0) { + assert(level_zero_cmp_(f1, f2)); + if (vset_->options_->compaction_style == kCompactionStyleUniversal) { + assert(f1->largest_seqno > f2->largest_seqno); + } + } else { + assert(level_nonzero_cmp_(f1, f2)); + + // Make sure there is no overlap in levels > 0 + if (vset_->icmp_.Compare(f1->largest, f2->smallest) >= 0) { fprintf(stderr, "overlapping ranges in same level %s vs. %s\n", - prev_end.DebugString().c_str(), - this_begin.DebugString().c_str()); + (f1->largest).DebugString().c_str(), + (f2->smallest).DebugString().c_str()); abort(); } } @@ -1347,9 +1386,8 @@ class VersionSet::Builder { void SaveTo(Version* v) { CheckConsistency(base_); CheckConsistency(v); - BySmallestKey cmp; - cmp.internal_comparator = &vset_->icmp_; for (int level = 0; level < base_->NumberLevels(); level++) { + const auto& cmp = (level == 0) ? level_zero_cmp_ : level_nonzero_cmp_; // Merge the set of added files with the set of pre-existing files. // Drop any deleted files. Store the result in *v. const auto& base_files = base_->files_[level]; @@ -1375,13 +1413,6 @@ class VersionSet::Builder { } } - // TODO(icanadi) do it in the loop above, which already sorts the files - // Pre-sort level0 for Get() - if (v->vset_->options_->compaction_style == kCompactionStyleUniversal) { - std::sort(v->files_[0].begin(), v->files_[0].end(), NewestFirstBySeqNo); - } else { - std::sort(v->files_[0].begin(), v->files_[0].end(), NewestFirst); - } CheckConsistency(v); }