Sort files correctly in Builder::SaveTo

Summary:
Previously, we used to sort all files by BySmallestFirst comparator and then re-sort level0 files in the Finalize() (recently moved to end of SaveTo).

In this diff, I chose the correct comparator at the beginning and sort the files correctly in Builder::SaveTo.

I also added a verification that all files are sorted correctly in CheckConsistency()

NOTE: This diff depends on D17037

Test Plan: make check. Will also run db_stress

Reviewers: dhruba, haobo, sdong, ljin

Reviewed By: ljin

CC: leveldb

Differential Revision: https://reviews.facebook.net/D17049
main
Igor Canadi 11 years ago
parent 954679bb0f
commit 1e9621d4e5
  1. 105
      db/version_set.cc

@ -430,17 +430,30 @@ static bool SaveValue(void* arg, const ParsedInternalKey& parsed_key,
return false;
}
static bool NewestFirst(FileMetaData* a, FileMetaData* b) {
namespace {
bool NewestFirst(FileMetaData* a, FileMetaData* b) {
return a->number > b->number;
}
static bool NewestFirstBySeqNo(FileMetaData* a, FileMetaData* b) {
if (a->smallest_seqno > b->smallest_seqno) {
assert(a->largest_seqno > b->largest_seqno);
return true;
bool NewestFirstBySeqNo(FileMetaData* a, FileMetaData* b) {
if (a->smallest_seqno != b->smallest_seqno) {
return a->smallest_seqno > b->smallest_seqno;
}
assert(a->largest_seqno <= b->largest_seqno);
return false;
if (a->largest_seqno != b->largest_seqno) {
return a->largest_seqno > b->largest_seqno;
}
// Break ties by file number
return NewestFirst(a, b);
}
bool BySmallestKey(FileMetaData* a, FileMetaData* b,
const InternalKeyComparator* cmp) {
int r = cmp->Compare(a->smallest, b->smallest);
if (r != 0) {
return (r < 0);
}
// Break ties by file number
return (a->number < b->number);
}
} // anonymous namespace
Version::Version(VersionSet* vset, uint64_t version_number)
: vset_(vset),
@ -1172,22 +1185,32 @@ struct VersionSet::ManifestWriter {
// Versions that contain full copies of the intermediate state.
class VersionSet::Builder {
private:
// Helper to sort by v->files_[file_number].smallest
struct BySmallestKey {
// Helper to sort v->files_
// kLevel0LevelCompaction -- NewestFirst
// kLevel0UniversalCompaction -- NewestFirstBySeqNo
// kLevelNon0 -- BySmallestKey
struct FileComparator {
enum SortMethod {
kLevel0LevelCompaction = 0,
kLevel0UniversalCompaction = 1,
kLevelNon0 = 2,
} sort_method;
const InternalKeyComparator* internal_comparator;
bool operator()(FileMetaData* f1, FileMetaData* f2) const {
int r = internal_comparator->Compare(f1->smallest, f2->smallest);
if (r != 0) {
return (r < 0);
} else {
// Break ties by file number
return (f1->number < f2->number);
switch (sort_method) {
case kLevel0LevelCompaction:
return NewestFirst(f1, f2);
case kLevel0UniversalCompaction:
return NewestFirstBySeqNo(f1, f2);
case kLevelNon0:
return BySmallestKey(f1, f2, internal_comparator);
}
assert(false);
}
};
typedef std::set<FileMetaData*, BySmallestKey> FileSet;
typedef std::set<FileMetaData*, FileComparator> FileSet;
struct LevelState {
std::set<uint64_t> deleted_files;
FileSet* added_files;
@ -1196,16 +1219,24 @@ class VersionSet::Builder {
VersionSet* vset_;
Version* base_;
LevelState* levels_;
FileComparator level_zero_cmp_;
FileComparator level_nonzero_cmp_;
public:
// Initialize a builder with the files from *base and other info from *vset
Builder(VersionSet* vset, Version* base) : vset_(vset), base_(base) {
base_->Ref();
levels_ = new LevelState[base->NumberLevels()];
BySmallestKey cmp;
cmp.internal_comparator = &vset_->icmp_;
for (int level = 0; level < base->NumberLevels(); level++) {
levels_[level].added_files = new FileSet(cmp);
level_zero_cmp_.sort_method =
(vset_->options_->compaction_style == kCompactionStyleUniversal)
? FileComparator::kLevel0UniversalCompaction
: FileComparator::kLevel0LevelCompaction;
level_nonzero_cmp_.sort_method = FileComparator::kLevelNon0;
level_nonzero_cmp_.internal_comparator = &vset_->icmp_;
levels_[0].added_files = new FileSet(level_zero_cmp_);
for (int level = 1; level < base->NumberLevels(); level++) {
levels_[level].added_files = new FileSet(level_nonzero_cmp_);
}
}
@ -1239,16 +1270,24 @@ class VersionSet::Builder {
void CheckConsistency(Version* v) {
#ifndef NDEBUG
// make sure the files are sorted correctly
for (int level = 0; level < v->NumberLevels(); level++) {
// Make sure there is no overlap in levels > 0
if (level > 0) {
for (uint32_t i = 1; i < v->files_[level].size(); i++) {
const InternalKey& prev_end = v->files_[level][i-1]->largest;
const InternalKey& this_begin = v->files_[level][i]->smallest;
if (vset_->icmp_.Compare(prev_end, this_begin) >= 0) {
for (size_t i = 1; i < v->files_[level].size(); i++) {
auto f1 = v->files_[level][i - 1];
auto f2 = v->files_[level][i];
if (level == 0) {
assert(level_zero_cmp_(f1, f2));
if (vset_->options_->compaction_style == kCompactionStyleUniversal) {
assert(f1->largest_seqno > f2->largest_seqno);
}
} else {
assert(level_nonzero_cmp_(f1, f2));
// Make sure there is no overlap in levels > 0
if (vset_->icmp_.Compare(f1->largest, f2->smallest) >= 0) {
fprintf(stderr, "overlapping ranges in same level %s vs. %s\n",
prev_end.DebugString().c_str(),
this_begin.DebugString().c_str());
(f1->largest).DebugString().c_str(),
(f2->smallest).DebugString().c_str());
abort();
}
}
@ -1347,9 +1386,8 @@ class VersionSet::Builder {
void SaveTo(Version* v) {
CheckConsistency(base_);
CheckConsistency(v);
BySmallestKey cmp;
cmp.internal_comparator = &vset_->icmp_;
for (int level = 0; level < base_->NumberLevels(); level++) {
const auto& cmp = (level == 0) ? level_zero_cmp_ : level_nonzero_cmp_;
// Merge the set of added files with the set of pre-existing files.
// Drop any deleted files. Store the result in *v.
const auto& base_files = base_->files_[level];
@ -1375,13 +1413,6 @@ class VersionSet::Builder {
}
}
// TODO(icanadi) do it in the loop above, which already sorts the files
// Pre-sort level0 for Get()
if (v->vset_->options_->compaction_style == kCompactionStyleUniversal) {
std::sort(v->files_[0].begin(), v->files_[0].end(), NewestFirstBySeqNo);
} else {
std::sort(v->files_[0].begin(), v->files_[0].end(), NewestFirst);
}
CheckConsistency(v);
}

Loading…
Cancel
Save