diff --git a/db/builder.cc b/db/builder.cc index 76c087346..27643ddd9 100644 --- a/db/builder.cc +++ b/db/builder.cc @@ -28,6 +28,7 @@ Status BuildTable(const std::string& dbname, const SequenceNumber earliest_seqno_in_memtable) { Status s; meta->file_size = 0; + meta->smallest_seqno = meta->largest_seqno = 0; iter->SeekToFirst(); // If the sequence number of the smallest entry in the memtable is @@ -48,7 +49,10 @@ Status BuildTable(const std::string& dbname, TableBuilder* builder = new TableBuilder(options, file.get(), 0); // the first key is the smallest key - meta->smallest.DecodeFrom(iter->key()); + Slice key = iter->key(); + meta->smallest.DecodeFrom(key); + meta->smallest_seqno = GetInternalKeySeqno(key); + meta->largest_seqno = meta->smallest_seqno; MergeHelper merge(user_comparator, options.merge_operator, options.info_log.get(), @@ -135,12 +139,18 @@ Status BuildTable(const std::string& dbname, // The last key is the largest key meta->largest.DecodeFrom(Slice(prev_key)); + SequenceNumber seqno = GetInternalKeySeqno(Slice(prev_key)); + meta->smallest_seqno = std::min(meta->smallest_seqno, seqno); + meta->largest_seqno = std::max(meta->largest_seqno, seqno); } else { for (; iter->Valid(); iter->Next()) { Slice key = iter->key(); meta->largest.DecodeFrom(key); builder->Add(key, iter->value()); + SequenceNumber seqno = GetInternalKeySeqno(key); + meta->smallest_seqno = std::min(meta->smallest_seqno, seqno); + meta->largest_seqno = std::max(meta->largest_seqno, seqno); } } diff --git a/db/db_bench.cc b/db/db_bench.cc index 5f5eb506b..d7d9a9547 100644 --- a/db/db_bench.cc +++ b/db/db_bench.cc @@ -9,6 +9,7 @@ #include "db/db_impl.h" #include "db/version_set.h" #include "db/db_statistics.h" +#include "leveldb/options.h" #include "leveldb/cache.h" #include "leveldb/db.h" #include "leveldb/env.h" @@ -125,7 +126,7 @@ static int FLAGS_max_write_buffer_number = 0; // The minimum number of write buffers that will be merged together // before writing to storage. 
This is cheap because it is an // in-memory merge. If this feature is not enabled, then all these -// write buffers are fushed to L0 as seperate files and this increases +// write buffers are flushed to L0 as separate files and this increases // read amplification because a get request has to check in all of these // files. Also, an in-memory merge may result in writing lesser // data to storage if there are duplicate records in each of these @@ -137,6 +138,15 @@ static int FLAGS_min_write_buffer_number_to_merge = 0; // This is initialized to default value of 1 in "main" function. static int FLAGS_max_background_compactions = 0; +// style of compaction: level-based vs universal +static leveldb::CompactionStyle FLAGS_compaction_style = leveldb::kCompactionStyleLevel; + +// Percentage flexibility while comparing file size. +static int FLAGS_universal_size_ratio = 1; + +// The minimum number of files in a single compaction run. +static int FLAGS_compaction_universal_min_merge_width = 2; + // Number of bytes to use as a cache of uncompressed data. // Negative means use default settings. 
static long FLAGS_cache_size = -1; @@ -1104,6 +1114,10 @@ unique_ptr GenerateKeyFromInt(int v, const char* suffix = "") options.min_write_buffer_number_to_merge = FLAGS_min_write_buffer_number_to_merge; options.max_background_compactions = FLAGS_max_background_compactions; + options.compaction_style = FLAGS_compaction_style; + options.compaction_options_universal.size_ratio = FLAGS_universal_size_ratio; + options.compaction_options_universal.min_merge_width = + FLAGS_compaction_universal_min_merge_width; options.block_size = FLAGS_block_size; options.filter_policy = filter_policy_; options.max_open_files = FLAGS_open_files; @@ -1988,6 +2002,11 @@ int main(int argc, char** argv) { FLAGS_open_files = leveldb::Options().max_open_files; FLAGS_max_background_compactions = leveldb::Options().max_background_compactions; + FLAGS_compaction_style = leveldb::Options().compaction_style; + FLAGS_universal_size_ratio = + leveldb::Options().compaction_options_universal.size_ratio; + FLAGS_compaction_universal_min_merge_width = + leveldb::Options().compaction_options_universal.min_merge_width; // Compression test code above refers to FLAGS_block_size FLAGS_block_size = leveldb::Options().block_size; FLAGS_use_os_buffer = leveldb::EnvOptions().use_os_buffer; @@ -2047,6 +2066,13 @@ int main(int argc, char** argv) { } else if (sscanf(argv[i], "--max_background_compactions=%d%c", &n, &junk) == 1) { FLAGS_max_background_compactions = n; + } else if (sscanf(argv[i], "--compaction_style=%d%c", &n, &junk) == 1) { + FLAGS_compaction_style = (leveldb::CompactionStyle)n; + } else if (sscanf(argv[i], "--universal_size_ratio=%d%c", &n, &junk) == 1) { + FLAGS_universal_size_ratio = n; + } else if (sscanf(argv[i], "--universal_min_merge_width=%d%c", + &n, &junk) == 1) { + FLAGS_compaction_universal_min_merge_width = n; } else if (sscanf(argv[i], "--cache_size=%ld%c", &l, &junk) == 1) { FLAGS_cache_size = l; } else if (sscanf(argv[i], "--block_size=%d%c", &n, &junk) == 1) { diff --git 
a/db/db_impl.cc b/db/db_impl.cc index 3fc387e17..dd942f36b 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -76,6 +76,7 @@ struct DBImpl::CompactionState { uint64_t number; uint64_t file_size; InternalKey smallest, largest; + SequenceNumber smallest_seqno, largest_seqno; }; std::vector outputs; std::list allocated_file_numbers; @@ -780,7 +781,8 @@ Status DBImpl::WriteLevel0TableForRecovery(MemTable* mem, VersionEdit* edit) { int level = 0; if (s.ok() && meta.file_size > 0) { edit->AddFile(level, meta.number, meta.file_size, - meta.smallest, meta.largest); + meta.smallest, meta.largest, + meta.smallest_seqno, meta.largest_seqno); } CompactionStats stats; @@ -854,11 +856,13 @@ Status DBImpl::WriteLevel0Table(std::vector &mems, VersionEdit* edit, // insert files directly into higher levels because some other // threads could be concurrently producing compacted files for // that key range. - if (base != nullptr && options_.max_background_compactions <= 1) { + if (base != nullptr && options_.max_background_compactions <= 1 && + options_.compaction_style == kCompactionStyleLevel) { level = base->PickLevelForMemTableOutput(min_user_key, max_user_key); } edit->AddFile(level, meta.number, meta.file_size, - meta.smallest, meta.largest); + meta.smallest, meta.largest, + meta.smallest_seqno, meta.largest_seqno); } CompactionStats stats; @@ -991,7 +995,8 @@ void DBImpl::ReFitLevel(int level) { VersionEdit edit(NumberLevels()); for (const auto& f : versions_->current()->files_[level]) { edit.DeleteFile(level, f->number); - edit.AddFile(to_level, f->number, f->file_size, f->smallest, f->largest); + edit.AddFile(to_level, f->number, f->file_size, f->smallest, f->largest, + f->smallest_seqno, f->largest_seqno); } Log(options_.info_log, "Apply version edit:\n%s", edit.DebugString().data()); @@ -1452,7 +1457,8 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, FileMetaData* f = c->input(0, 0); c->edit()->DeleteFile(c->level(), f->number); c->edit()->AddFile(c->level() + 1, 
f->number, f->file_size, - f->smallest, f->largest); + f->smallest, f->largest, + f->smallest_seqno, f->largest_seqno); status = versions_->LogAndApply(c->edit(), &mutex_); VersionSet::LevelSummaryStorage tmp; Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n", @@ -1564,6 +1570,7 @@ Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) { out.number = file_number; out.smallest.Clear(); out.largest.Clear(); + out.smallest_seqno = out.largest_seqno = 0; compact->outputs.push_back(out); // Make the output file @@ -1574,10 +1581,10 @@ Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) { // Over-estimate slightly so we don't end up just barely crossing // the threshold. compact->outfile->SetPreallocationBlockSize( - 1.1 * versions_->MaxFileSizeForLevel(compact->compaction->level() + 1)); + 1.1 * versions_->MaxFileSizeForLevel(compact->compaction->output_level())); compact->builder.reset(new TableBuilder(options_, compact->outfile.get(), - compact->compaction->level() + 1)); + compact->compaction->output_level())); } return s; } @@ -1668,8 +1675,10 @@ Status DBImpl::InstallCompactionResults(CompactionState* compact) { for (size_t i = 0; i < compact->outputs.size(); i++) { const CompactionState::Output& out = compact->outputs[i]; compact->compaction->edit()->AddFile( - level + 1, - out.number, out.file_size, out.smallest, out.largest); + (options_.compaction_style == kCompactionStyleUniversal) ? + level : level + 1, + out.number, out.file_size, out.smallest, out.largest, + out.smallest_seqno, out.largest_seqno); } return versions_->LogAndApply(compact->compaction->edit(), &mutex_); } @@ -1950,7 +1959,8 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { // If this is the bottommost level (no files in lower levels) // and the earliest snapshot is larger than this seqno // then we can squash the seqno to zero. 
- if (bottommost_level && ikey.sequence < earliest_snapshot && + if (options_.compaction_style == kCompactionStyleLevel && + bottommost_level && ikey.sequence < earliest_snapshot && ikey.type != kTypeMerge) { assert(ikey.type != kTypeDeletion); // make a copy because updating in place would cause problems @@ -1970,11 +1980,19 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { break; } } + + SequenceNumber seqno = GetInternalKeySeqno(newkey); if (compact->builder->NumEntries() == 0) { compact->current_output()->smallest.DecodeFrom(newkey); + compact->current_output()->smallest_seqno = seqno; + } else { + compact->current_output()->smallest_seqno = + std::min(compact->current_output()->smallest_seqno, seqno); } compact->current_output()->largest.DecodeFrom(newkey); compact->builder->Add(newkey, value); + compact->current_output()->largest_seqno = + std::max(compact->current_output()->largest_seqno, seqno); // Close output file if it is big enough if (compact->builder->FileSize() >= diff --git a/db/db_test.cc b/db/db_test.cc index 61fdae82a..b861a4cc4 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -3401,7 +3401,7 @@ void BM_LogAndApply(int iters, int num_base_files) { for (int i = 0; i < num_base_files; i++) { InternalKey start(MakeKey(2*fnum), 1, kTypeValue); InternalKey limit(MakeKey(2*fnum+1), 1, kTypeDeletion); - vbase.AddFile(2, fnum++, 1 /* file size */, start, limit); + vbase.AddFile(2, fnum++, 1 /* file size */, start, limit, 1, 1); } ASSERT_OK(vset.LogAndApply(&vbase, &mu)); @@ -3412,7 +3412,7 @@ void BM_LogAndApply(int iters, int num_base_files) { vedit.DeleteFile(2, fnum); InternalKey start(MakeKey(2*fnum), 1, kTypeValue); InternalKey limit(MakeKey(2*fnum+1), 1, kTypeDeletion); - vedit.AddFile(2, fnum++, 1 /* file size */, start, limit); + vedit.AddFile(2, fnum++, 1 /* file size */, start, limit, 1, 1); vset.LogAndApply(&vedit, &mu); } uint64_t stop_micros = env->NowMicros(); diff --git a/db/dbformat.h b/db/dbformat.h index 
1a0516670..5d596ad1a 100644 --- a/db/dbformat.h +++ b/db/dbformat.h @@ -173,6 +173,15 @@ inline void UpdateInternalKey(char* internal_key, EncodeFixed64(seqtype, newval); } +// Get the sequence number from the internal key +inline uint64_t GetInternalKeySeqno(const Slice& internal_key) { + const size_t n = internal_key.size(); + assert(n >= 8); + uint64_t num = DecodeFixed64(internal_key.data() + n - 8); + return num >> 8; +} + + // A helper class useful for DBImpl::Get() class LookupKey { public: diff --git a/db/repair.cc b/db/repair.cc index 049aabb3d..3737b09c2 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -88,6 +88,7 @@ class Repairer { private: struct TableInfo { FileMetaData meta; + SequenceNumber min_sequence; SequenceNumber max_sequence; }; @@ -264,6 +265,7 @@ class Repairer { ReadOptions(), storage_options_, t->meta.number, t->meta.file_size); bool empty = true; ParsedInternalKey parsed; + t->min_sequence = 0; t->max_sequence = 0; for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { Slice key = iter->key(); @@ -280,6 +282,9 @@ class Repairer { t->meta.smallest.DecodeFrom(key); } t->meta.largest.DecodeFrom(key); + if (parsed.sequence < t->min_sequence) { + t->min_sequence = parsed.sequence; + } if (parsed.sequence > t->max_sequence) { t->max_sequence = parsed.sequence; } @@ -320,7 +325,8 @@ class Repairer { // TODO(opt): separate out into multiple levels const TableInfo& t = tables_[i]; edit_->AddFile(0, t.meta.number, t.meta.file_size, - t.meta.smallest, t.meta.largest); + t.meta.smallest, t.meta.largest, + t.min_sequence, t.max_sequence); } //fprintf(stderr, "NewDescriptor:\n%s\n", edit_.DebugString().c_str()); diff --git a/db/version_edit.cc b/db/version_edit.cc index ed63c1013..65a63947a 100644 --- a/db/version_edit.cc +++ b/db/version_edit.cc @@ -20,7 +20,10 @@ enum Tag { kDeletedFile = 6, kNewFile = 7, // 8 was used for large value refs - kPrevLogNumber = 9 + kPrevLogNumber = 9, + + // these are new formats divergent from open source leveldb + 
kNewFile2 = 100 // store smallest & largest seqno }; void VersionEdit::Clear() { @@ -76,12 +79,14 @@ void VersionEdit::EncodeTo(std::string* dst) const { for (size_t i = 0; i < new_files_.size(); i++) { const FileMetaData& f = new_files_[i].second; - PutVarint32(dst, kNewFile); + PutVarint32(dst, kNewFile2); PutVarint32(dst, new_files_[i].first); // level PutVarint64(dst, f.number); PutVarint64(dst, f.file_size); PutLengthPrefixedSlice(dst, f.smallest.Encode()); PutLengthPrefixedSlice(dst, f.largest.Encode()); + PutVarint64(dst, f.smallest_seqno); + PutVarint64(dst, f.largest_seqno); } } @@ -201,6 +206,22 @@ Status VersionEdit::DecodeFrom(const Slice& src) { } break; + case kNewFile2: + if (GetLevel(&input, &level, &msg) && + GetVarint64(&input, &f.number) && + GetVarint64(&input, &f.file_size) && + GetInternalKey(&input, &f.smallest) && + GetInternalKey(&input, &f.largest) && + GetVarint64(&input, &f.smallest_seqno) && + GetVarint64(&input, &f.largest_seqno) ) { + new_files_.push_back(std::make_pair(level, f)); + } else { + if (!msg) { + msg = "new-file2 entry"; + } + } + break; + default: msg = "unknown tag"; break; diff --git a/db/version_edit.h b/db/version_edit.h index 2743e9e0d..7037763b8 100644 --- a/db/version_edit.h +++ b/db/version_edit.h @@ -22,6 +22,8 @@ struct FileMetaData { InternalKey smallest; // Smallest internal key served by table InternalKey largest; // Largest internal key served by table bool being_compacted; // Is this file undergoing compaction? 
+ SequenceNumber smallest_seqno;// The smallest seqno in this file + SequenceNumber largest_seqno; // The largest seqno in this file FileMetaData() : refs(0), allowed_seeks(1 << 30), file_size(0), being_compacted(false) { } @@ -67,12 +69,17 @@ class VersionEdit { void AddFile(int level, uint64_t file, uint64_t file_size, const InternalKey& smallest, - const InternalKey& largest) { + const InternalKey& largest, + const SequenceNumber& smallest_seqno, + const SequenceNumber& largest_seqno) { FileMetaData f; f.number = file; f.file_size = file_size; f.smallest = smallest; f.largest = largest; + f.smallest_seqno = smallest_seqno; + f.largest_seqno = largest_seqno; + assert(smallest_seqno <= largest_seqno); new_files_.push_back(std::make_pair(level, f)); } diff --git a/db/version_edit_test.cc b/db/version_edit_test.cc index b211eb1a9..26b69199e 100644 --- a/db/version_edit_test.cc +++ b/db/version_edit_test.cc @@ -27,7 +27,9 @@ TEST(VersionEditTest, EncodeDecode) { TestEncodeDecode(edit); edit.AddFile(3, kBig + 300 + i, kBig + 400 + i, InternalKey("foo", kBig + 500 + i, kTypeValue), - InternalKey("zoo", kBig + 600 + i, kTypeDeletion)); + InternalKey("zoo", kBig + 600 + i, kTypeDeletion), + kBig + 500 + i, + kBig + 600 + i); edit.DeleteFile(4, kBig + 700 + i); edit.SetCompactPointer(i, InternalKey("x", kBig + 900 + i, kTypeValue)); } diff --git a/db/version_set.cc b/db/version_set.cc index 9ea9ad809..1b5001d59 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -5,6 +5,7 @@ #include "db/version_set.h" #include +#include #include #include "db/filename.h" #include "db/log_reader.h" @@ -22,8 +23,8 @@ namespace leveldb { -static int64_t TotalFileSize(const std::vector& files) { - int64_t sum = 0; +static uint64_t TotalFileSize(const std::vector& files) { + uint64_t sum = 0; for (size_t i = 0; i < files.size() && files[i]; i++) { sum += files[i]->file_size; } @@ -338,6 +339,14 @@ static bool SaveValue(void* arg, const Slice& ikey, const Slice& v, bool didIO){ static 
bool NewestFirst(FileMetaData* a, FileMetaData* b) { return a->number > b->number; } +static bool NewestFirstBySeqNo(FileMetaData* a, FileMetaData* b) { + if (a->smallest_seqno > b->smallest_seqno) { + assert(a->largest_seqno > b->largest_seqno); + return true; + } + assert(a->largest_seqno <= b->largest_seqno); + return false; +} Version::Version(VersionSet* vset, uint64_t version_number) : vset_(vset), next_(this), prev_(this), refs_(0), @@ -434,7 +443,11 @@ void Version::Get(const ReadOptions& options, if (important_files.empty()) continue; if (level == 0) { - std::sort(important_files.begin(), important_files.end(), NewestFirst); + if (vset_->options_->compaction_style == kCompactionStyleUniversal) { + std::sort(important_files.begin(), important_files.end(), NewestFirstBySeqNo); + } else { + std::sort(important_files.begin(), important_files.end(), NewestFirst); + } } else { // Sanity check to make sure that the files are correctly sorted #ifndef NDEBUG @@ -565,7 +578,7 @@ int Version::PickLevelForMemTableOutput( break; } GetOverlappingInputs(level + 2, &start, &limit, &overlaps); - const int64_t sum = TotalFileSize(overlaps); + const uint64_t sum = TotalFileSize(overlaps); if (sum > vset_->MaxGrandParentOverlapBytes(level)) { break; } @@ -1109,7 +1122,10 @@ void VersionSet::Init(int num_levels) { int target_file_size_multiplier = options_->target_file_size_multiplier; int max_bytes_multiplier = options_->max_bytes_for_level_multiplier; for (int i = 0; i < num_levels; i++) { - if (i > 1) { + if (i == 0 && options_->compaction_style == kCompactionStyleUniversal) { + max_file_size_[i] = ULLONG_MAX; + level_max_bytes_[i] = options_->max_bytes_for_level_base; + } else if (i > 1) { max_file_size_[i] = max_file_size_[i-1] * target_file_size_multiplier; level_max_bytes_[i] = level_max_bytes_[i-1] * max_bytes_multiplier * options_->max_bytes_for_level_multiplier_additional[i-1]; @@ -1656,17 +1672,32 @@ void VersionSet::Finalize(Version* v, } } -// a static compator 
used to sort files based on their size -static bool compareSize(const VersionSet::Fsize& first, +// A static comparator used to sort files based on their size +// In normal mode: descending size +static bool compareSizeDescending(const VersionSet::Fsize& first, const VersionSet::Fsize& second) { return (first.file->file_size > second.file->file_size); } +// A static comparator used to sort files based on their seqno +// In universal style: descending seqno +static bool compareSeqnoDescending(const VersionSet::Fsize& first, + const VersionSet::Fsize& second) { + if (first.file->smallest_seqno > second.file->smallest_seqno) { + assert(first.file->largest_seqno > second.file->largest_seqno); + return true; + } + assert(first.file->largest_seqno <= second.file->largest_seqno); + return false; +} // sort all files in level1 to level(n-1) based on file size void VersionSet::UpdateFilesBySize(Version* v) { // No need to sort the highest level because it is never compacted. - for (int level = 0; level < NumberLevels()-1; level++) { + int max_level = (options_->compaction_style == kCompactionStyleUniversal) ? 
+ NumberLevels() : NumberLevels() - 1; + + for (int level = 0; level < max_level; level++) { const std::vector& files = v->files_[level]; std::vector& files_by_size = v->files_by_size_[level]; @@ -1680,12 +1711,18 @@ void VersionSet::UpdateFilesBySize(Version* v) { } // sort the top number_of_files_to_sort_ based on file size - int num = Version::number_of_files_to_sort_; - if (num > (int)temp.size()) { - num = temp.size(); + if (options_->compaction_style == kCompactionStyleUniversal) { + int num = temp.size(); + std::partial_sort(temp.begin(), temp.begin() + num, + temp.end(), compareSeqnoDescending); + } else { + int num = Version::number_of_files_to_sort_; + if (num > (int)temp.size()) { + num = temp.size(); + } + std::partial_sort(temp.begin(), temp.begin() + num, + temp.end(), compareSizeDescending); } - std::partial_sort(temp.begin(), temp.begin() + num, - temp.end(), compareSize); assert(temp.size() == files.size()); // initialize files_by_size_ @@ -1718,7 +1755,8 @@ Status VersionSet::WriteSnapshot(log::Writer* log) { const std::vector& files = current_->files_[level]; for (size_t i = 0; i < files.size(); i++) { const FileMetaData* f = files[i]; - edit.AddFile(level, f->number, f->file_size, f->smallest, f->largest); + edit.AddFile(level, f->number, f->file_size, f->smallest, f->largest, + f->smallest_seqno, f->largest_seqno); } } @@ -1762,6 +1800,23 @@ const char* VersionSet::LevelDataSizeSummary( return scratch->buffer; } +const char* VersionSet::LevelFileSummary( + FileSummaryStorage* scratch, int level) const { + int len = snprintf(scratch->buffer, sizeof(scratch->buffer), "files_size["); + for (unsigned int i = 0; i < current_->files_[level].size(); i++) { + FileMetaData* f = current_->files_[level][i]; + int sz = sizeof(scratch->buffer) - len; + int ret = snprintf(scratch->buffer + len, sz, "#%ld(seq=%ld,sz=%ld,%d) ", + f->number, f->smallest_seqno, + f->file_size, f->being_compacted); + if (ret < 0 || ret >= sz) + break; + len += ret; + } + 
snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len, "]"); + return scratch->buffer; +} + // Opens the mainfest file and reads all records // till it finds the record we are looking for. bool VersionSet::ManifestContains(const std::string& record) const { @@ -1867,14 +1922,14 @@ int64_t VersionSet::NumLevelBytes(int level) const { } int64_t VersionSet::MaxNextLevelOverlappingBytes() { - int64_t result = 0; + uint64_t result = 0; std::vector overlaps; for (int level = 1; level < NumberLevels() - 1; level++) { for (size_t i = 0; i < current_->files_[level].size(); i++) { const FileMetaData* f = current_->files_[level][i]; current_->GetOverlappingInputs(level+1, &f->smallest, &f->largest, &overlaps); - const int64_t sum = TotalFileSize(overlaps); + const uint64_t sum = TotalFileSize(overlaps); if (sum > result) { result = sum; } @@ -1970,13 +2025,13 @@ uint64_t VersionSet::MaxFileSizeForLevel(int level) { return max_file_size_[level]; } -int64_t VersionSet::ExpandedCompactionByteSizeLimit(int level) { +uint64_t VersionSet::ExpandedCompactionByteSizeLimit(int level) { uint64_t result = MaxFileSizeForLevel(level); result *= options_->expanded_compaction_factor; return result; } -int64_t VersionSet::MaxGrandParentOverlapBytes(int level) { +uint64_t VersionSet::MaxGrandParentOverlapBytes(int level) { uint64_t result = MaxFileSizeForLevel(level); result *= options_->max_grandparent_overlap_factor; return result; @@ -2059,6 +2114,176 @@ void VersionSet::SizeBeingCompacted(std::vector& sizes) { } } +Compaction* VersionSet::PickCompactionUniversal(int level, double score) { + assert (level == 0); + + // percentage flexibility while comparing file sizes + uint64_t ratio = options_->compaction_options_universal.size_ratio; + unsigned int min_merge_width = + options_->compaction_options_universal.min_merge_width; + unsigned int max_merge_width = + options_->compaction_options_universal.max_merge_width; + + if ((current_->files_[level].size() <= + (unsigned 
int)options_->level0_file_num_compaction_trigger)) { + Log(options_->info_log, "Universal: nothing to do\n"); + return nullptr; + } + VersionSet::FileSummaryStorage tmp; + Log(options_->info_log, "Universal: candidate files(%lu): %s\n", + current_->files_[level].size(), + LevelFileSummary(&tmp, 0)); + + Compaction* c = nullptr; + c = new Compaction(level, level, MaxFileSizeForLevel(level), + LLONG_MAX, NumberLevels()); + c->score_ = score; + + // The files are sorted from newest first to oldest last. + std::vector& file_by_time = current_->files_by_size_[level]; + FileMetaData* f = nullptr; + bool done = false; + assert(file_by_time.size() == current_->files_[level].size()); + + unsigned int max_files_to_compact = std::min(max_merge_width, UINT_MAX); + + // Make two passes. The first pass considers a candidate file + // only if it is smaller than the total size accumulated so far. + // The second pass does not look at the slope of the + // file-size curve to decide what to pick for compaction. + for (int iter = 0; !done && iter < 2; iter++) { + + for (unsigned int loop = 0; loop < file_by_time.size(); ) { + + // Skip files that are already being compacted + for (f = nullptr; loop < file_by_time.size(); loop++) { + int index = file_by_time[loop]; + f = current_->files_[level][index]; + + if (!f->being_compacted) { + break; + } + Log(options_->info_log, "Universal: file %ld[%d] being compacted, skipping", + f->number, loop); + f = nullptr; + } + + // This file is not being compacted. Consider it as the + // first candidate to be compacted. + unsigned int candidate_count = 1; + uint64_t candidate_size = f != nullptr? f->file_size : 0; + if (f != nullptr) { + Log(options_->info_log, "Universal: Possible candidate file %ld[%d] %s.", + f->number, loop, iter == 0? "" : "forced "); + } + + // Check if the succeeding files need compaction. 
+ for (unsigned int i = loop+1; + candidate_count < max_files_to_compact && i < file_by_time.size(); + i++) { + int index = file_by_time[i]; + FileMetaData* f = current_->files_[level][index]; + if (f->being_compacted) { + break; + } + // If this is the first iteration, then we pick files if the + // total candidate file size (increased by the specified ratio) + // is still larger than the next candidate file. + if (iter == 0) { + uint64_t sz = (candidate_size * (100 + ratio)) /100; + if (sz < f->file_size) { + break; + } + } + candidate_count++; + candidate_size += f->file_size; + } + + // Found a series of consecutive files that need compaction. + if (candidate_count >= (unsigned int)min_merge_width) { + for (unsigned int i = loop; i < loop + candidate_count; i++) { + int index = file_by_time[i]; + FileMetaData* f = current_->files_[level][index]; + c->inputs_[0].push_back(f); + Log(options_->info_log, "Universal: Picking file %ld[%d] with size %ld %s", + f->number, i, f->file_size, + (iter == 0 ? "" : "forced")); + } + done = true; + break; + } else { + for (unsigned int i = loop; + i < loop + candidate_count && i < file_by_time.size(); i++) { + int index = file_by_time[i]; + FileMetaData* f = current_->files_[level][index]; + Log(options_->info_log, "Universal: Skipping file %ld[%d] with size %ld %d %s", + f->number, i, f->file_size, f->being_compacted, + (iter == 0 ? "" : "forced")); + } + } + loop += candidate_count; + } + assert(done || c->inputs_[0].size() == 0); + + // If we are unable to find a normal compaction run and we are still + // above the compaction threshold, iterate again to pick compaction + // candidates, this time without considering their size differences. 
+ if (!done) { + int files_not_in_compaction = 0; + for (unsigned int i = 0; i < current_->files_[level].size(); i++) { + f = current_->files_[level][i]; + if (!f->being_compacted) { + files_not_in_compaction++; + } + } + int expected_num_files = files_not_in_compaction + + compactions_in_progress_[level].size(); + if (expected_num_files <= + options_->level0_file_num_compaction_trigger + 1) { + done = true; // nothing more to do + } else { + max_files_to_compact = std::min((int)max_merge_width, + expected_num_files - options_->level0_file_num_compaction_trigger); + Log(options_->info_log, "Universal: second loop with maxfiles %d", + max_files_to_compact); + } + } + } + if (c->inputs_[0].size() <= 1) { + Log(options_->info_log, "Universal: only %ld files, nothing to do.\n", + c->inputs_[0].size()); + delete c; + return nullptr; + } + + // validate that all the chosen files are non overlapping in time + FileMetaData* newerfile __attribute__((unused)) = nullptr; + for (unsigned int i = 0; i < c->inputs_[0].size(); i++) { + FileMetaData* f = c->inputs_[0][i]; + assert (f->smallest_seqno <= f->largest_seqno); + assert(newerfile == nullptr || + newerfile->smallest_seqno > f->largest_seqno); + newerfile = f; + } + + // update statistics + if (options_->statistics != nullptr) { + options_->statistics->measureTime(NUM_FILES_IN_SINGLE_COMPACTION, + c->inputs_[0].size()); + } + + c->input_version_ = current_; + c->input_version_->Ref(); + + // mark all the files that are being compacted + c->MarkFilesBeingCompacted(true); + + // remember this currently undergoing compaction + compactions_in_progress_[level].insert(c); + + return c; +} + Compaction* VersionSet::PickCompactionBySize(int level, double score) { Compaction* c = nullptr; @@ -2072,7 +2297,7 @@ Compaction* VersionSet::PickCompactionBySize(int level, double score) { assert(level >= 0); assert(level+1 < NumberLevels()); - c = new Compaction(level, MaxFileSizeForLevel(level+1), + c = new Compaction(level, level+1, 
MaxFileSizeForLevel(level+1), MaxGrandParentOverlapBytes(level), NumberLevels()); c->score_ = score; @@ -2142,6 +2367,13 @@ Compaction* VersionSet::PickCompaction() { current_->vset_->SizeBeingCompacted(size_being_compacted); Finalize(current_, size_being_compacted); + // In universal style of compaction, compact L0 files back into L0. + if (options_->compaction_style == kCompactionStyleUniversal) { + int level = 0; + c = PickCompactionUniversal(level, current_->compaction_score_[level]); + return c; + } + // We prefer compactions triggered by too much data in a level over // the compactions triggered by seeks. // @@ -2169,9 +2401,9 @@ Compaction* VersionSet::PickCompaction() { // Only allow one level 0 compaction at a time. // Do not pick this file if its parents at level+1 are being compacted. if (level != 0 || compactions_in_progress_[0].empty()) { - if (!ParentRangeInCompaction(&f->smallest, &f->largest, level, - &parent_index)) { - c = new Compaction(level, MaxFileSizeForLevel(level+1), + if(!ParentRangeInCompaction(&f->smallest, &f->largest, level, + &parent_index)) { + c = new Compaction(level, level, MaxFileSizeForLevel(level+1), MaxGrandParentOverlapBytes(level), NumberLevels(), true); c->inputs_[0].push_back(f); c->parent_index_ = parent_index; @@ -2331,10 +2563,10 @@ void VersionSet::SetupOtherInputs(Compaction* c) { std::vector expanded0; current_->GetOverlappingInputs(level, &all_start, &all_limit, &expanded0, c->base_index_, nullptr); - const int64_t inputs0_size = TotalFileSize(c->inputs_[0]); - const int64_t inputs1_size = TotalFileSize(c->inputs_[1]); - const int64_t expanded0_size = TotalFileSize(expanded0); - int64_t limit = ExpandedCompactionByteSizeLimit(level); + const uint64_t inputs0_size = TotalFileSize(c->inputs_[0]); + const uint64_t inputs1_size = TotalFileSize(c->inputs_[1]); + const uint64_t expanded0_size = TotalFileSize(expanded0); + uint64_t limit = ExpandedCompactionByteSizeLimit(level); if (expanded0.size() > c->inputs_[0].size() 
&& inputs1_size + expanded0_size < limit && !FilesInCompaction(expanded0) && @@ -2414,7 +2646,10 @@ Compaction* VersionSet::CompactRange( } } } - Compaction* c = new Compaction(level, MaxFileSizeForLevel(level+1), + int out_level = (options_->compaction_style == kCompactionStyleUniversal) ? + level : level+1; + + Compaction* c = new Compaction(level, out_level, MaxFileSizeForLevel(out_level), MaxGrandParentOverlapBytes(level), NumberLevels()); c->inputs_[0] = inputs; @@ -2435,10 +2670,11 @@ Compaction* VersionSet::CompactRange( return c; } -Compaction::Compaction(int level, uint64_t target_file_size, +Compaction::Compaction(int level, int out_level, uint64_t target_file_size, uint64_t max_grandparent_overlap_bytes, int number_levels, bool seek_compaction) : level_(level), + out_level_(out_level), max_output_file_size_(target_file_size), maxGrandParentOverlapBytes_(max_grandparent_overlap_bytes), input_version_(nullptr), diff --git a/db/version_set.h b/db/version_set.h index a8403868b..25ed97750 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -352,6 +352,9 @@ class VersionSet { struct LevelSummaryStorage { char buffer[100]; }; + struct FileSummaryStorage { + char buffer[1000]; + }; const char* LevelSummary(LevelSummaryStorage* scratch) const; // printf contents (for debugging) @@ -362,6 +365,10 @@ class VersionSet { // of files per level. Uses *scratch as backing store. const char* LevelDataSizeSummary(LevelSummaryStorage* scratch) const; + // Return a human-readable short (single-line) summary of files + // in a specified level. Uses *scratch as backing store. + const char* LevelFileSummary(FileSummaryStorage* scratch, int level) const; + // Return the size of the current manifest file const uint64_t ManifestFileSize() { return current_->offset_manifest_file_; } @@ -371,6 +378,9 @@ class VersionSet { // function will return nullptr. 
Compaction* PickCompactionBySize(int level, double score); + // Pick files to compact in Universal mode + Compaction* PickCompactionUniversal(int level, double score); + // Free up the files that were participated in a compaction void ReleaseCompactionFiles(Compaction* c, Status status); @@ -426,9 +436,9 @@ class VersionSet { bool ManifestContains(const std::string& record) const; - int64_t ExpandedCompactionByteSizeLimit(int level); + uint64_t ExpandedCompactionByteSizeLimit(int level); - int64_t MaxGrandParentOverlapBytes(int level); + uint64_t MaxGrandParentOverlapBytes(int level); Env* const env_; const std::string dbname_; @@ -503,9 +513,12 @@ class Compaction { ~Compaction(); // Return the level that is being compacted. Inputs from "level" - // and "level+1" will be merged to produce a set of "level+1" files. + // will be merged. int level() const { return level_; } + // Outputs will go to this level + int output_level() const { return out_level_; } + // Return the object that holds the edits to the descriptor done // by this compaction. 
VersionEdit* edit() { return edit_; } @@ -548,13 +561,14 @@ class Compaction { friend class Version; friend class VersionSet; - explicit Compaction(int level, uint64_t target_file_size, + explicit Compaction(int level, int out_level, uint64_t target_file_size, uint64_t max_grandparent_overlap_bytes, int number_levels, bool seek_compaction = false); int level_; + int out_level_; // levels to which output files are stored uint64_t max_output_file_size_; - int64_t maxGrandParentOverlapBytes_; + uint64_t maxGrandParentOverlapBytes_; Version* input_version_; VersionEdit* edit_; int number_levels_; @@ -569,7 +583,7 @@ class Compaction { std::vector grandparents_; size_t grandparent_index_; // Index in grandparent_starts_ bool seen_key_; // Some output key has been seen - int64_t overlapped_bytes_; // Bytes of overlap between current output + uint64_t overlapped_bytes_; // Bytes of overlap between current output // and grandparent files int base_index_; // index of the file in files_[level_] int parent_index_; // index of some file with same range in files_[level_+1] diff --git a/include/leveldb/options.h b/include/leveldb/options.h index 1bf63c33f..52b6a2058 100644 --- a/include/leveldb/options.h +++ b/include/leveldb/options.h @@ -12,6 +12,7 @@ #include #include "leveldb/slice.h" #include "leveldb/statistics.h" +#include "leveldb/universal_compaction.h" #include "leveldb/memtablerep.h" namespace leveldb { @@ -40,6 +41,11 @@ enum CompressionType { kBZip2Compression = 0x3 }; +enum CompactionStyle { + kCompactionStyleLevel = 0x0, // level based compaction style + kCompactionStyleUniversal = 0x1 // Universal compaction style +}; + // Compression options for different compression algorithms like Zlib struct CompressionOptions { int window_bits; @@ -483,6 +489,12 @@ struct Options { // Default: 0 uint64_t bytes_per_sync; + // The compaction style. 
Default: kCompactionStyleLevel + CompactionStyle compaction_style; + + // The options needed to support Universal Style compactions + CompactionOptionsUniversal compaction_options_universal; + // Use KeyMayExist API to filter deletes when this is true. // If KeyMayExist returns false, i.e. the key definitely does not exist, then // the delete is a noop. KeyMayExist only incurs in-memory look up. @@ -494,7 +506,6 @@ struct Options { // Default: a factory that provides a skip-list-based implementation of // MemTableRep. std::shared_ptr memtable_factory; - }; // Options that control read operations diff --git a/include/leveldb/statistics.h b/include/leveldb/statistics.h index 679236fee..5ab7f8ed6 100644 --- a/include/leveldb/statistics.h +++ b/include/leveldb/statistics.h @@ -115,7 +115,8 @@ enum Histograms { STALL_L0_NUM_FILES_COUNT = 14, HARD_RATE_LIMIT_DELAY_COUNT = 15, SOFT_RATE_LIMIT_DELAY_COUNT = 16, - HISTOGRAM_ENUM_MAX = 17 + NUM_FILES_IN_SINGLE_COMPACTION = 17, + HISTOGRAM_ENUM_MAX = 18 }; const std::vector> HistogramsNameMap = { @@ -135,7 +136,8 @@ const std::vector> HistogramsNameMap = { { STALL_MEMTABLE_COMPACTION_COUNT, "rocksdb.memtable.compaction.count"}, { STALL_L0_NUM_FILES_COUNT, "rocksdb.num.files.stall.count"}, { HARD_RATE_LIMIT_DELAY_COUNT, "rocksdb.hard.rate.limit.delay.count"}, - { SOFT_RATE_LIMIT_DELAY_COUNT, "rocksdb.soft.rate.limit.delay.count"} + { SOFT_RATE_LIMIT_DELAY_COUNT, "rocksdb.soft.rate.limit.delay.count"}, + { NUM_FILES_IN_SINGLE_COMPACTION, "rocksdb.numfiles.in.singlecompaction" } }; struct HistogramData { diff --git a/include/leveldb/universal_compaction.h b/include/leveldb/universal_compaction.h new file mode 100644 index 000000000..df8402ca4 --- /dev/null +++ b/include/leveldb/universal_compaction.h @@ -0,0 +1,57 @@ +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. 
See the AUTHORS file for names of contributors. + +#ifndef STORAGE_ROCKSDB_UNIVERSAL_COMPACTION_OPTIONS_H +#define STORAGE_ROCKSDB_UNIVERSAL_COMPACTION_OPTIONS_H + +#include +#include +#include +#include +#include +#include +#include "leveldb/slice.h" +#include "leveldb/statistics.h" + +namespace leveldb { + +// +// Algorithm used to make a compaction request stop picking new files +// into a single compaction run +// +enum CompactionStopStyle { + kCompactionStopStyleSimilarSize, // pick files of similar size + kCompactionStopStyleTotalSize // total size of picked files > next file +}; + +class CompactionOptionsUniversal { + public: + + // Percentage flexibility while comparing file size. If the candidate file(s) + // size is 1% smaller than the next file's size, then include next file into + // this candidate set. // Default: 1 + unsigned int size_ratio; + + // The minimum number of files in a single compaction run. Default: 2 + unsigned int min_merge_width; + + // The maximum number of files in a single compaction run. Default: UINT_MAX + unsigned int max_merge_width; + + // The algorithm used to stop picking files into a single compaction run + // Default: kCompactionStopStyleTotalSize + CompactionStopStyle stop_style; + + // Default set of parameters + CompactionOptionsUniversal() : + size_ratio(1), + min_merge_width(2), + max_merge_width(UINT_MAX), + stop_style(kCompactionStopStyleTotalSize) { + } +}; + +} // namespace leveldb + +#endif // STORAGE_ROCKSDB_UNIVERSAL_COMPACTION_OPTIONS_H diff --git a/tools/db_stress.cc b/tools/db_stress.cc index 15f08cd2a..cce60b16e 100644 --- a/tools/db_stress.cc +++ b/tools/db_stress.cc @@ -88,6 +88,9 @@ static int FLAGS_max_write_buffer_number = 0; // This is initialized to default value of 1 in "main" function. 
static int FLAGS_max_background_compactions = 0; +// This is initialized to default value of kCompactionStyleLevel +static leveldb::CompactionStyle FLAGS_compaction_style = leveldb::kCompactionStyleLevel; + +// Number of bytes to use as a cache of uncompressed data. static long FLAGS_cache_size = 2 * KB * KB * KB; @@ -935,6 +938,7 @@ class StressTest { options.write_buffer_size = FLAGS_write_buffer_size; options.max_write_buffer_number = FLAGS_max_write_buffer_number; options.max_background_compactions = FLAGS_max_background_compactions; + options.compaction_style = FLAGS_compaction_style; options.block_size = FLAGS_block_size; options.filter_policy = filter_policy_; options.max_open_files = FLAGS_open_files; @@ -1021,6 +1025,8 @@ int main(int argc, char** argv) { FLAGS_open_files = leveldb::Options().max_open_files; FLAGS_max_background_compactions = leveldb::Options().max_background_compactions; + FLAGS_compaction_style = + leveldb::Options().compaction_style; FLAGS_level0_file_num_compaction_trigger = leveldb::Options().level0_file_num_compaction_trigger; FLAGS_level0_slowdown_writes_trigger = @@ -1073,6 +1079,8 @@ int main(int argc, char** argv) { FLAGS_max_write_buffer_number = n; } else if (sscanf(argv[i], "--max_background_compactions=%d%c", &n, &junk) == 1) { FLAGS_max_background_compactions = n; + } else if (sscanf(argv[i], "--compaction_style=%d%c", &n, &junk) == 1) { + FLAGS_compaction_style = (leveldb::CompactionStyle)n; } else if (sscanf(argv[i], "--cache_size=%ld%c", &l, &junk) == 1) { FLAGS_cache_size = l; } else if (sscanf(argv[i], "--block_size=%d%c", &n, &junk) == 1) { diff --git a/util/ldb_cmd.cc b/util/ldb_cmd.cc index 083683ee5..86e4fbb00 100644 --- a/util/ldb_cmd.cc +++ b/util/ldb_cmd.cc @@ -1197,7 +1197,7 @@ void ApproxSizeCommand::DoCommand() { uint64_t sizes[1]; db_->GetApproximateSizes(ranges, 1, sizes); fprintf(stdout, "%ld\n", sizes[0]); - /* Wierd that GetApproximateSizes() returns void, although documentation + /* Weird that GetApproximateSizes() 
returns void, although documentation * says that it returns a Status object. if (!st.ok()) { exec_state_ = LDBCommandExecuteResult::FAILED(st.ToString()); diff --git a/util/options.cc b/util/options.cc index 5f41a1c15..800c8d321 100644 --- a/util/options.cc +++ b/util/options.cc @@ -78,6 +78,7 @@ Options::Options() access_hint_on_compaction_start(NORMAL), use_adaptive_mutex(false), bytes_per_sync(0), + compaction_style(kCompactionStyleLevel), filter_deletes(false), memtable_factory(std::shared_ptr(new SkipListFactory)) { assert(memtable_factory.get() != nullptr); @@ -218,6 +219,14 @@ Options::Dump(Logger* log) const bytes_per_sync); Log(log," Options.filter_deletes: %d", filter_deletes); + Log(log," Options.compaction_style: %d", + compaction_style); + Log(log," Options.compaction_options_universal.size_ratio: %d", + compaction_options_universal.size_ratio); + Log(log," Options.compaction_options_universal.min_merge_width: %d", + compaction_options_universal.min_merge_width); + Log(log," Options.compaction_options_universal.max_merge_width: %d", + compaction_options_universal.max_merge_width); } // Options::Dump //