From 554c06dd18d32866eb2430497a70c947923a4dbb Mon Sep 17 00:00:00 2001 From: Dhruba Borthakur Date: Thu, 13 Jun 2013 22:09:08 -0700 Subject: [PATCH 1/8] Reduce write amplification by merging files in L0 back into L0 Summary: There is a new option called hybrid_mode which, when switched on, causes HBase style compactions. Files from L0 are compacted back into L0. The meat of this compaction algorithm is in PickCompactionHybrid(). All files reside in L0. That means all files have overlapping keys. Each file covers a time range, i.e. each file contains a range of keys that were inserted around the same time. The start-seqno and the end-seqno refer to the timeframe when these keys were inserted. Files that have contiguous seqno ranges are compacted together into a larger file. All files are ordered from most recent to oldest. The compaction algorithm starts looking for candidate files from the most recent file. It continues to add more files to the same compaction run as long as the total size of the files chosen so far is smaller than the size of the next candidate file. This logic needs to be debated and validated. The above logic should reduce write amplification to a large extent... will publish numbers shortly. Test Plan: dbstress runs for 6 hours with no data corruption (tested so far). Differential Revision: https://reviews.facebook.net/D11289 --- db/builder.cc | 9 ++ db/db_bench.cc | 8 ++ db/db_impl.cc | 35 ++++-- db/db_test.cc | 4 +- db/dbformat.h | 9 ++ db/repair.cc | 8 +- db/version_edit.cc | 25 +++- db/version_edit.h | 9 +- db/version_edit_test.cc | 4 +- db/version_set.cc | 254 +++++++++++++++++++++++++++++++++++--- db/version_set.h | 18 ++- include/leveldb/options.h | 3 + tools/db_stress.cc | 8 ++ util/options.cc | 5 +- 14 files changed, 365 insertions(+), 34 deletions(-) diff --git a/db/builder.cc b/db/builder.cc index 2b7c59283..db09c0d8c 100644 --- a/db/builder.cc +++ b/db/builder.cc @@ -28,6 +28,7 @@ Status BuildTable(const std::string& dbname, const SequenceNumber earliest_seqno_in_memtable) { Status s; meta->file_size = 0; + meta->smallest_seqno = meta->largest_seqno = 0; iter->SeekToFirst(); // If the sequence number of the smallest entry in the memtable is @@ -50,6 +51,8 @@ Status BuildTable(const std::string& dbname, // the first key is the smallest key Slice key = iter->key(); meta->smallest.DecodeFrom(key); + meta->smallest_seqno = GetInternalKeySeqno(key); + meta->largest_seqno = meta->smallest_seqno; MergeHelper merge(user_comparator, options.merge_operator, options.info_log.get(), @@ -124,12 +127,18 @@ Status BuildTable(const std::string& dbname, // output last key builder->Add(Slice(prev_key), Slice(prev_value)); meta->largest.DecodeFrom(Slice(prev_key)); + SequenceNumber seqno = GetInternalKeySeqno(Slice(prev_key)); + meta->smallest_seqno = std::min(meta->smallest_seqno, seqno); + meta->largest_seqno = std::max(meta->largest_seqno, seqno); } else { for (; iter->Valid(); iter->Next()) { Slice key = iter->key(); meta->largest.DecodeFrom(key); builder->Add(key, iter->value()); + SequenceNumber seqno = GetInternalKeySeqno(key); + meta->smallest_seqno = std::min(meta->smallest_seqno, seqno); + meta->largest_seqno = std::max(meta->largest_seqno, seqno); } } diff --git a/db/db_bench.cc b/db/db_bench.cc index 179896a77..ad0765a31 100644 --- a/db/db_bench.cc +++ b/db/db_bench.cc @@ -137,6 +137,9 @@ static int FLAGS_min_write_buffer_number_to_merge = 0; // This is initialized to default value of 1 in "main" function.
static int FLAGS_max_background_compactions = 0; +// Run database in hybrid mode where all data resides in L0. +static bool FLAGS_hybrid_mode = false; + // Number of bytes to use as a cache of uncompressed data. // Negative means use default settings. static long FLAGS_cache_size = -1; @@ -1104,6 +1107,7 @@ unique_ptr GenerateKeyFromInt(int v, const char* suffix = "") options.min_write_buffer_number_to_merge = FLAGS_min_write_buffer_number_to_merge; options.max_background_compactions = FLAGS_max_background_compactions; + options.hybrid_mode = FLAGS_hybrid_mode; options.block_size = FLAGS_block_size; options.filter_policy = filter_policy_; options.max_open_files = FLAGS_open_files; @@ -1986,6 +1990,8 @@ int main(int argc, char** argv) { FLAGS_open_files = leveldb::Options().max_open_files; FLAGS_max_background_compactions = leveldb::Options().max_background_compactions; + FLAGS_hybrid_mode = + leveldb::Options().hybrid_mode; // Compression test code above refers to FLAGS_block_size FLAGS_block_size = leveldb::Options().block_size; FLAGS_use_os_buffer = leveldb::EnvOptions().use_os_buffer; @@ -2044,6 +2050,8 @@ int main(int argc, char** argv) { FLAGS_min_write_buffer_number_to_merge = n; } else if (sscanf(argv[i], "--max_background_compactions=%d%c", &n, &junk) == 1) { FLAGS_max_background_compactions = n; + } else if (sscanf(argv[i], "--hybrid_mode=%d%c", &n, &junk) == 1) { + FLAGS_hybrid_mode = n; } else if (sscanf(argv[i], "--cache_size=%ld%c", &l, &junk) == 1) { FLAGS_cache_size = l; } else if (sscanf(argv[i], "--block_size=%d%c", &n, &junk) == 1) { diff --git a/db/db_impl.cc b/db/db_impl.cc index c5c156feb..d5bd3cbd9 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -75,6 +75,7 @@ struct DBImpl::CompactionState { uint64_t number; uint64_t file_size; InternalKey smallest, largest; + SequenceNumber smallest_seqno, largest_seqno; }; std::vector outputs; std::list allocated_file_numbers; @@ -759,7 +760,8 @@ Status DBImpl::WriteLevel0TableForRecovery(MemTable* mem, VersionEdit* edit) { int level = 0; if (s.ok() && meta.file_size > 0) { edit->AddFile(level, meta.number, meta.file_size, - meta.smallest, meta.largest); + meta.smallest, meta.largest, + meta.smallest_seqno, meta.largest_seqno); } CompactionStats stats; @@ -833,11 +835,13 @@ Status DBImpl::WriteLevel0Table(std::vector &mems, VersionEdit* edit, // insert files directly into higher levels because some other // threads could be concurrently producing compacted files for // that key range. 
- if (base != nullptr && options_.max_background_compactions <= 1) { + if (base != nullptr && options_.max_background_compactions <= 1 && + !options_.hybrid_mode) { level = base->PickLevelForMemTableOutput(min_user_key, max_user_key); } edit->AddFile(level, meta.number, meta.file_size, - meta.smallest, meta.largest); + meta.smallest, meta.largest, + meta.smallest_seqno, meta.largest_seqno); } CompactionStats stats; @@ -1356,7 +1360,8 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, FileMetaData* f = c->input(0, 0); c->edit()->DeleteFile(c->level(), f->number); c->edit()->AddFile(c->level() + 1, f->number, f->file_size, - f->smallest, f->largest); + f->smallest, f->largest, + f->smallest_seqno, f->largest_seqno); status = versions_->LogAndApply(c->edit(), &mutex_); VersionSet::LevelSummaryStorage tmp; Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n", @@ -1468,6 +1473,7 @@ Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) { out.number = file_number; out.smallest.Clear(); out.largest.Clear(); + out.smallest_seqno = out.largest_seqno = 0; compact->outputs.push_back(out); // Make the output file @@ -1478,10 +1484,10 @@ Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) { // Over-estimate slightly so we don't end up just barely crossing // the threshold. compact->outfile->SetPreallocationBlockSize( - 1.1 * versions_->MaxFileSizeForLevel(compact->compaction->level() + 1)); + 1.1 * versions_->MaxFileSizeForLevel(compact->compaction->output_level())); compact->builder.reset(new TableBuilder(options_, compact->outfile.get(), - compact->compaction->level() + 1)); + compact->compaction->output_level())); } return s; } @@ -1572,8 +1578,9 @@ Status DBImpl::InstallCompactionResults(CompactionState* compact) { for (size_t i = 0; i < compact->outputs.size(); i++) { const CompactionState::Output& out = compact->outputs[i]; compact->compaction->edit()->AddFile( - level + 1, - out.number, out.file_size, out.smallest, out.largest); + options_.hybrid_mode? level : level + 1, + out.number, out.file_size, out.smallest, out.largest, + out.smallest_seqno, out.largest_seqno); } return versions_->LogAndApply(compact->compaction->edit(), &mutex_); } @@ -1821,7 +1828,10 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { // If this is the bottommost level (no files in lower levels) // and the earliest snapshot is larger than this seqno // then we can squash the seqno to zero. - if (bottommost_level && ikey.sequence < earliest_snapshot && + // Hybrid mode depends on the sequence number to determine + // time-order of files that is needed for compactions. 
+ if (!options_.hybrid_mode && + bottommost_level && ikey.sequence < earliest_snapshot && ikey.type != kTypeMerge) { assert(ikey.type != kTypeDeletion); // make a copy because updating in place would cause problems @@ -1841,11 +1851,18 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { break; } } + SequenceNumber seqno = GetInternalKeySeqno(newkey); if (compact->builder->NumEntries() == 0) { compact->current_output()->smallest.DecodeFrom(newkey); + compact->current_output()->smallest_seqno = seqno; + } else { + compact->current_output()->smallest_seqno = + std::min(compact->current_output()->smallest_seqno, seqno); } compact->current_output()->largest.DecodeFrom(newkey); compact->builder->Add(newkey, value); + compact->current_output()->largest_seqno = + std::max(compact->current_output()->largest_seqno, seqno); // Close output file if it is big enough if (compact->builder->FileSize() >= diff --git a/db/db_test.cc b/db/db_test.cc index 7ce4ac419..8275029b4 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -3306,7 +3306,7 @@ void BM_LogAndApply(int iters, int num_base_files) { for (int i = 0; i < num_base_files; i++) { InternalKey start(MakeKey(2*fnum), 1, kTypeValue); InternalKey limit(MakeKey(2*fnum+1), 1, kTypeDeletion); - vbase.AddFile(2, fnum++, 1 /* file size */, start, limit); + vbase.AddFile(2, fnum++, 1 /* file size */, start, limit, 1, 1); } ASSERT_OK(vset.LogAndApply(&vbase, &mu)); @@ -3317,7 +3317,7 @@ void BM_LogAndApply(int iters, int num_base_files) { vedit.DeleteFile(2, fnum); InternalKey start(MakeKey(2*fnum), 1, kTypeValue); InternalKey limit(MakeKey(2*fnum+1), 1, kTypeDeletion); - vedit.AddFile(2, fnum++, 1 /* file size */, start, limit); + vedit.AddFile(2, fnum++, 1 /* file size */, start, limit, 1, 1); vset.LogAndApply(&vedit, &mu); } uint64_t stop_micros = env->NowMicros(); diff --git a/db/dbformat.h b/db/dbformat.h index 1a0516670..5d596ad1a 100644 --- a/db/dbformat.h +++ b/db/dbformat.h @@ -173,6 +173,15 @@ inline void UpdateInternalKey(char* internal_key, EncodeFixed64(seqtype, newval); } +// Get the sequence number from the internal key +inline uint64_t GetInternalKeySeqno(const Slice& internal_key) { + const size_t n = internal_key.size(); + assert(n >= 8); + uint64_t num = DecodeFixed64(internal_key.data() + n - 8); + return num >> 8; +} + + // A helper class useful for DBImpl::Get() class LookupKey { public: diff --git a/db/repair.cc b/db/repair.cc index d1c0c4525..09406781a 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -88,6 +88,7 @@ class Repairer { private: struct TableInfo { FileMetaData meta; + SequenceNumber min_sequence; SequenceNumber max_sequence; }; @@ -263,6 +264,7 @@ class Repairer { ReadOptions(), storage_options_, t->meta.number, t->meta.file_size); bool empty = true; ParsedInternalKey parsed; + t->min_sequence = 0; t->max_sequence = 0; for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { Slice key = iter->key(); @@ -279,6 +281,9 @@ class Repairer { t->meta.smallest.DecodeFrom(key); } t->meta.largest.DecodeFrom(key); + if (parsed.sequence < t->min_sequence) { + t->min_sequence = parsed.sequence; + } if (parsed.sequence > t->max_sequence) { t->max_sequence = parsed.sequence; } @@ -319,7 +324,8 @@ class Repairer { // TODO(opt): separate out into multiple levels const TableInfo& t = tables_[i]; edit_->AddFile(0, t.meta.number, t.meta.file_size, - t.meta.smallest, t.meta.largest); + t.meta.smallest, t.meta.largest, + t.min_sequence, t.max_sequence); } //fprintf(stderr, "NewDescriptor:\n%s\n", edit_.DebugString().c_str()); diff --git 
a/db/version_edit.cc b/db/version_edit.cc index ed63c1013..65a63947a 100644 --- a/db/version_edit.cc +++ b/db/version_edit.cc @@ -20,7 +20,10 @@ enum Tag { kDeletedFile = 6, kNewFile = 7, // 8 was used for large value refs - kPrevLogNumber = 9 + kPrevLogNumber = 9, + + // these are new formats divergent from open source leveldb + kNewFile2 = 100 // store smallest & largest seqno }; void VersionEdit::Clear() { @@ -76,12 +79,14 @@ void VersionEdit::EncodeTo(std::string* dst) const { for (size_t i = 0; i < new_files_.size(); i++) { const FileMetaData& f = new_files_[i].second; - PutVarint32(dst, kNewFile); + PutVarint32(dst, kNewFile2); PutVarint32(dst, new_files_[i].first); // level PutVarint64(dst, f.number); PutVarint64(dst, f.file_size); PutLengthPrefixedSlice(dst, f.smallest.Encode()); PutLengthPrefixedSlice(dst, f.largest.Encode()); + PutVarint64(dst, f.smallest_seqno); + PutVarint64(dst, f.largest_seqno); } } @@ -201,6 +206,22 @@ Status VersionEdit::DecodeFrom(const Slice& src) { } break; + case kNewFile2: + if (GetLevel(&input, &level, &msg) && + GetVarint64(&input, &f.number) && + GetVarint64(&input, &f.file_size) && + GetInternalKey(&input, &f.smallest) && + GetInternalKey(&input, &f.largest) && + GetVarint64(&input, &f.smallest_seqno) && + GetVarint64(&input, &f.largest_seqno) ) { + new_files_.push_back(std::make_pair(level, f)); + } else { + if (!msg) { + msg = "new-file2 entry"; + } + } + break; + default: msg = "unknown tag"; break; diff --git a/db/version_edit.h b/db/version_edit.h index 2743e9e0d..7037763b8 100644 --- a/db/version_edit.h +++ b/db/version_edit.h @@ -22,6 +22,8 @@ struct FileMetaData { InternalKey smallest; // Smallest internal key served by table InternalKey largest; // Largest internal key served by table bool being_compacted; // Is this file undergoing compaction? 
+ SequenceNumber smallest_seqno;// The smallest seqno in this file + SequenceNumber largest_seqno; // The largest seqno in this file FileMetaData() : refs(0), allowed_seeks(1 << 30), file_size(0), being_compacted(false) { } @@ -67,12 +69,17 @@ class VersionEdit { void AddFile(int level, uint64_t file, uint64_t file_size, const InternalKey& smallest, - const InternalKey& largest) { + const InternalKey& largest, + const SequenceNumber& smallest_seqno, + const SequenceNumber& largest_seqno) { FileMetaData f; f.number = file; f.file_size = file_size; f.smallest = smallest; f.largest = largest; + f.smallest_seqno = smallest_seqno; + f.largest_seqno = largest_seqno; + assert(smallest_seqno <= largest_seqno); new_files_.push_back(std::make_pair(level, f)); } diff --git a/db/version_edit_test.cc b/db/version_edit_test.cc index b211eb1a9..26b69199e 100644 --- a/db/version_edit_test.cc +++ b/db/version_edit_test.cc @@ -27,7 +27,9 @@ TEST(VersionEditTest, EncodeDecode) { TestEncodeDecode(edit); edit.AddFile(3, kBig + 300 + i, kBig + 400 + i, InternalKey("foo", kBig + 500 + i, kTypeValue), - InternalKey("zoo", kBig + 600 + i, kTypeDeletion)); + InternalKey("zoo", kBig + 600 + i, kTypeDeletion), + kBig + 500 + i, + kBig + 600 + i); edit.DeleteFile(4, kBig + 700 + i); edit.SetCompactPointer(i, InternalKey("x", kBig + 900 + i, kTypeValue)); } diff --git a/db/version_set.cc b/db/version_set.cc index 9b01d935e..a31cbe01a 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -5,6 +5,7 @@ #include "db/version_set.h" #include <algorithm> +#include <climits> #include <stdio.h> #include "db/filename.h" #include "db/log_reader.h" @@ -309,6 +310,14 @@ static bool SaveValue(void* arg, const Slice& ikey, const Slice& v, bool didIO){ static bool NewestFirst(FileMetaData* a, FileMetaData* b) { return a->number > b->number; } +static bool NewestFirstBySeqNo(FileMetaData* a, FileMetaData* b) { + if (a->smallest_seqno > b->smallest_seqno) { + assert(a->largest_seqno > b->largest_seqno); + return true; + } + assert(a->largest_seqno <= b->largest_seqno); + return false; +} Version::Version(VersionSet* vset, uint64_t version_number) : vset_(vset), next_(this), prev_(this), refs_(0), @@ -375,7 +384,11 @@ void Version::Get(const ReadOptions& options, } if (tmp.empty()) continue; - std::sort(tmp.begin(), tmp.end(), NewestFirst); + if (vset_->options_->hybrid_mode) { + std::sort(tmp.begin(), tmp.end(), NewestFirstBySeqNo); + } else { + std::sort(tmp.begin(), tmp.end(), NewestFirst); + } files = &tmp[0]; num_files = tmp.size(); } else { @@ -1011,7 +1024,10 @@ void VersionSet::Init(int num_levels) { int target_file_size_multiplier = options_->target_file_size_multiplier; int max_bytes_multiplier = options_->max_bytes_for_level_multiplier; for (int i = 0; i < num_levels; i++) { - if (i > 1) { + if (i == 0) { + max_file_size_[i] = LLONG_MAX; + level_max_bytes_[i] = options_->max_bytes_for_level_base; + } else if (i > 1) { max_file_size_[i] = max_file_size_[i-1] * target_file_size_multiplier; level_max_bytes_[i] = level_max_bytes_[i-1] * max_bytes_multiplier * options_->max_bytes_for_level_multiplier_additional[i-1]; @@ -1558,17 +1574,32 @@ void VersionSet::Finalize(Version* v, } } -// a static compator used to sort files based on their size -static bool compareSize(const VersionSet::Fsize& first, +// A static comparator used to sort files based on their size +// In normal mode: descending size +static bool compareSizeDescending(const VersionSet::Fsize& first, const VersionSet::Fsize& second) { return (first.file->file_size > second.file->file_size); } +// A static comparator used to sort files based on their seqno +// In hybrid mode: descending seqno +static bool compareSeqnoDescending(const VersionSet::Fsize& first, + const VersionSet::Fsize& second) { + if (first.file->smallest_seqno > second.file->smallest_seqno) { + assert(first.file->largest_seqno > second.file->largest_seqno); + return true; + } + assert(first.file->largest_seqno <= second.file->largest_seqno); + return false; +} // sort all files in level1 to level(n-1) based on file size void VersionSet::UpdateFilesBySize(Version* v) { // No need to sort the highest level because it is never compacted. - for (int level = 0; level < NumberLevels()-1; level++) { + int max_level = options_->hybrid_mode? NumberLevels() : + NumberLevels() - 1; + + for (int level = 0; level < max_level; level++) { const std::vector<FileMetaData*>& files = v->files_[level]; std::vector<int>& files_by_size = v->files_by_size_[level]; @@ -1582,12 +1613,18 @@ void VersionSet::UpdateFilesBySize(Version* v) { } // sort the top number_of_files_to_sort_ based on file size - int num = Version::number_of_files_to_sort_; - if (num > (int)temp.size()) { - num = temp.size(); + if (options_->hybrid_mode) { + int num = temp.size(); + std::partial_sort(temp.begin(), temp.begin() + num, + temp.end(), compareSeqnoDescending); + } else { + int num = Version::number_of_files_to_sort_; + if (num > (int)temp.size()) { + num = temp.size(); + } + std::partial_sort(temp.begin(), temp.begin() + num, + temp.end(), compareSizeDescending); } - std::partial_sort(temp.begin(), temp.begin() + num, - temp.end(), compareSize); assert(temp.size() == files.size()); // initialize files_by_size_ @@ -1620,7 +1657,8 @@ Status VersionSet::WriteSnapshot(log::Writer* log) { const std::vector<FileMetaData*>& files = current_->files_[level]; for (size_t i = 0; i < files.size(); i++) { const FileMetaData* f = files[i]; - edit.AddFile(level, f->number, f->file_size, f->smallest, f->largest); + edit.AddFile(level, f->number, f->file_size, f->smallest, f->largest, + f->smallest_seqno, f->largest_seqno); } } @@ -1664,6 +1702,23 @@ const char* VersionSet::LevelDataSizeSummary( return scratch->buffer; } +const char* VersionSet::LevelFileSummary( + FileSummaryStorage* scratch, int level) const { + int len = snprintf(scratch->buffer, sizeof(scratch->buffer), "files_size["); + for (unsigned int i = 0; i < current_->files_[level].size(); i++) { + FileMetaData* f = current_->files_[level][i]; + int sz = sizeof(scratch->buffer) - len; + int ret = snprintf(scratch->buffer + len, sz, "#%ld(seq=%ld,sz=%ld,%d) ", + f->number, f->smallest_seqno, + f->file_size, f->being_compacted); + if (ret < 0 || ret >= sz) + break; + len += ret; + } + snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len, "]"); + return scratch->buffer; +} + // Opens the mainfest file and reads all records // till it finds the record we are looking for.
bool VersionSet::ManifestContains(const std::string& record) const { @@ -1961,6 +2016,166 @@ void VersionSet::SizeBeingCompacted(std::vector<uint64_t>& sizes) { } } +Compaction* VersionSet::PickCompactionHybrid(int level, double score) { + assert (level == 0); + + // percentage flexibility while comparing file sizes + uint64_t ratio = 1; + + if ((current_->files_[level].size() <= + (unsigned int)options_->level0_file_num_compaction_trigger)) { + Log(options_->info_log, "XXX Hybrid: nothing to do\n"); + return nullptr; + } + VersionSet::FileSummaryStorage tmp; + Log(options_->info_log, "Hybrid: candidate files(%lu): %s\n", + current_->files_[level].size(), + LevelFileSummary(&tmp, 0)); + + Compaction* c = nullptr; + c = new Compaction(level, level, MaxFileSizeForLevel(level), + LLONG_MAX, NumberLevels()); + c->score_ = score; + + // The files are sorted from newest first to oldest last. + std::vector<int>& file_by_time = current_->files_by_size_[level]; + FileMetaData* f = nullptr; + bool done = false; + assert(file_by_time.size() == current_->files_[level].size()); + + unsigned int max_files_to_compact = UINT_MAX; + + // Make two passes. The first pass considers a candidate file + // only if it is smaller than the total size accumulated so far. + // The second pass does not look at the slope of the + // file-size curve to decide what to pick for compaction. + for (int iter = 0; !done && iter < 2; iter++) { + + for (unsigned int loop = 0; loop < file_by_time.size(); ) { + + // Skip files that are already being compacted + for (f = nullptr; loop < file_by_time.size(); loop++) { + int index = file_by_time[loop]; + f = current_->files_[level][index]; + + if (!f->being_compacted) { + break; + } + Log(options_->info_log, "Hybrid: file %ld[%d] being compacted, skipping", + f->number, loop); + f = nullptr; + } + + // This file is not being compacted. Consider it as the + // first candidate to be compacted. + unsigned int candidate_count = 1; + uint64_t candidate_size = f != nullptr? f->file_size : 0; + if (f != nullptr) { + Log(options_->info_log, "Hybrid: Possible candidate file %ld[%d] %s.", + f->number, loop, iter == 0? "" : "forced "); + } + + // Check if the succeeding files need compaction. + for (unsigned int i = loop+1; + candidate_count < max_files_to_compact && i < file_by_time.size(); + i++) { + int index = file_by_time[i]; + FileMetaData* f = current_->files_[level][index]; + if (f->being_compacted) { + break; + } + // If this is the first iteration, then we pick files if the + // total candidate file size (increased by the specified ratio) + // is still larger than the next candidate file. + if (iter == 0) { + uint64_t sz = (candidate_size * (100 + ratio)) /100; + if (sz < f->file_size) { + break; + } + } + candidate_count++; + candidate_size += f->file_size; + } + + // Found a series of consecutive files that need compaction. + if (candidate_count > 1) { + for (unsigned int i = loop; i < loop + candidate_count; i++) { + int index = file_by_time[i]; + FileMetaData* f = current_->files_[level][index]; + c->inputs_[0].push_back(f); + Log(options_->info_log, "Hybrid: Picking file %ld[%d] with size %ld %s", + f->number, i, f->file_size, + (iter == 0 ?
"" : "forced")); + } + done = true; + break; + } else { + for (unsigned int i = loop; + i < loop + candidate_count && i < file_by_time.size(); i++) { + int index = file_by_time[i]; + FileMetaData* f = current_->files_[level][index]; + Log(options_->info_log, "Hybrid: Skipping file %ld[%d] with size %ld %d %s", + f->number, i, f->file_size, f->being_compacted, + (iter == 0 ? "" : "forced")); + } + } + loop += candidate_count; + } + assert(done || c->inputs_[0].size() == 0); + + // If we are unable to find a normal compaction run and we are still + // above the compaction threshold, iterate again to pick compaction + // candidates, this time without considering their size differences. + if (!done) { + int files_not_in_compaction = 0; + for (unsigned int i = 0; i < current_->files_[level].size(); i++) { + f = current_->files_[level][i]; + if (!f->being_compacted) { + files_not_in_compaction++; + } + } + int expected_num_files = files_not_in_compaction + + compactions_in_progress_[level].size(); + if (expected_num_files <= + options_->level0_file_num_compaction_trigger + 1) { + done = true; // nothing more to do + } else { + max_files_to_compact = expected_num_files - + options_->level0_file_num_compaction_trigger; + Log(options_->info_log, "Hybrid: second loop with maxfiles %d", + max_files_to_compact); + } + } + } + if (c->inputs_[0].size() <= 1) { + Log(options_->info_log, "XXX Hybrid: only %ld files, nothing to do.\n", + c->inputs_[0].size()); + delete c; + return nullptr; + } + + // validate that all the chosen files are non overlapping in time + FileMetaData* newerfile __attribute__((unused)) = nullptr; + for (unsigned int i = 0; i < c->inputs_[0].size(); i++) { + FileMetaData* f = c->inputs_[0][i]; + assert (f->smallest_seqno <= f->largest_seqno); + assert(newerfile == nullptr || + newerfile->smallest_seqno > f->largest_seqno); + newerfile = f; + } + + c->input_version_ = current_; + c->input_version_->Ref(); + + // mark all the files that are being compacted + c->MarkFilesBeingCompacted(true); + + // remember this currently undergoing compaction + compactions_in_progress_[level].insert(c); + + return c; +} + Compaction* VersionSet::PickCompactionBySize(int level, double score) { Compaction* c = nullptr; @@ -1974,7 +2189,7 @@ Compaction* VersionSet::PickCompactionBySize(int level, double score) { assert(level >= 0); assert(level+1 < NumberLevels()); - c = new Compaction(level, MaxFileSizeForLevel(level), + c = new Compaction(level, level+1, MaxFileSizeForLevel(level), MaxGrandParentOverlapBytes(level), NumberLevels()); c->score_ = score; @@ -2044,6 +2259,13 @@ Compaction* VersionSet::PickCompaction() { current_->vset_->SizeBeingCompacted(size_being_compacted); Finalize(current_, size_being_compacted); + // In hybrid mode compact L0 files back into L0. + if (options_->hybrid_mode) { + int level = 0; + c = PickCompactionHybrid(level, current_->compaction_score_[level]); + return c; + } + // We prefer compactions triggered by too much data in a level over // the compactions triggered by seeks. 
// @@ -2072,7 +2294,7 @@ Compaction* VersionSet::PickCompaction() { if (level != 0 || compactions_in_progress_[0].empty()) { if(!ParentRangeInCompaction(&f->smallest, &f->largest, level, &parent_index)) { - c = new Compaction(level, MaxFileSizeForLevel(level), + c = new Compaction(level, level, MaxFileSizeForLevel(level), MaxGrandParentOverlapBytes(level), NumberLevels(), true); c->inputs_[0].push_back(f); c->parent_index_ = parent_index; @@ -2246,8 +2468,9 @@ Compaction* VersionSet::CompactRange( } } } + int out_level = options_->hybrid_mode ? level : level+1; - Compaction* c = new Compaction(level, MaxFileSizeForLevel(level), + Compaction* c = new Compaction(level, out_level, MaxFileSizeForLevel(level), MaxGrandParentOverlapBytes(level), NumberLevels()); c->input_version_ = current_; c->input_version_->Ref(); @@ -2261,10 +2484,11 @@ Compaction* VersionSet::CompactRange( return c; } -Compaction::Compaction(int level, uint64_t target_file_size, +Compaction::Compaction(int level, int out_level, uint64_t target_file_size, uint64_t max_grandparent_overlap_bytes, int number_levels, bool seek_compaction) : level_(level), + out_level_(out_level), max_output_file_size_(target_file_size), maxGrandParentOverlapBytes_(max_grandparent_overlap_bytes), input_version_(nullptr), diff --git a/db/version_set.h b/db/version_set.h index ba924126f..85c02b973 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -340,6 +340,9 @@ class VersionSet { struct LevelSummaryStorage { char buffer[100]; }; + struct FileSummaryStorage { + char buffer[1000]; + }; const char* LevelSummary(LevelSummaryStorage* scratch) const; // printf contents (for debugging) @@ -350,6 +353,10 @@ class VersionSet { // of files per level. Uses *scratch as backing store. const char* LevelDataSizeSummary(LevelSummaryStorage* scratch) const; + // Return a human-readable short (single-line) summary of files + // in a specified level. Uses *scratch as backing store. + const char* LevelFileSummary(FileSummaryStorage* scratch, int level) const; + // Return the size of the current manifest file const uint64_t ManifestFileSize() { return current_->offset_manifest_file_; } @@ -359,6 +366,9 @@ class VersionSet { // function will return nullptr. Compaction* PickCompactionBySize(int level, double score); + // Pick files to compact in hybrid mode + Compaction* PickCompactionHybrid(int level, double score); + // Free up the files that were participated in a compaction void ReleaseCompactionFiles(Compaction* c, Status status); @@ -489,9 +499,12 @@ class Compaction { ~Compaction(); // Return the level that is being compacted. Inputs from "level" - // and "level+1" will be merged to produce a set of "level+1" files. + // will be merged. int level() const { return level_; } + // Outputs will go to this level + int output_level() const { return out_level_; } + // Return the object that holds the edits to the descriptor done // by this compaction. 
VersionEdit* edit() { return edit_; } @@ -534,11 +547,12 @@ class Compaction { friend class Version; friend class VersionSet; - explicit Compaction(int level, uint64_t target_file_size, + explicit Compaction(int level, int out_level, uint64_t target_file_size, uint64_t max_grandparent_overlap_bytes, int number_levels, bool seek_compaction = false); int level_; + int out_level_; // levels to which output files are stored uint64_t max_output_file_size_; int64_t maxGrandParentOverlapBytes_; Version* input_version_; diff --git a/include/leveldb/options.h b/include/leveldb/options.h index fa69a7eff..465e83a10 100644 --- a/include/leveldb/options.h +++ b/include/leveldb/options.h @@ -476,6 +476,9 @@ struct Options { // Default: 0 uint64_t bytes_per_sync; + // Hybrid Mode. There is only a single level and files in L0 are + // compacted back into L0. Default: false + bool hybrid_mode; }; // Options that control read operations diff --git a/tools/db_stress.cc b/tools/db_stress.cc index 02afe306c..4c671d84f 100644 --- a/tools/db_stress.cc +++ b/tools/db_stress.cc @@ -88,6 +88,9 @@ static int FLAGS_max_write_buffer_number = 0; // This is initialized to default value of 1 in "main" function. static int FLAGS_max_background_compactions = 0; +// This is initialized to default value of false +static bool FLAGS_hybrid_mode = false; + // Number of bytes to use as a cache of uncompressed data. static long FLAGS_cache_size = 2 * KB * KB * KB; @@ -930,6 +933,7 @@ class StressTest { options.write_buffer_size = FLAGS_write_buffer_size; options.max_write_buffer_number = FLAGS_max_write_buffer_number; options.max_background_compactions = FLAGS_max_background_compactions; + options.hybrid_mode = FLAGS_hybrid_mode; options.block_size = FLAGS_block_size; options.filter_policy = filter_policy_; options.max_open_files = FLAGS_open_files; @@ -1016,6 +1020,8 @@ int main(int argc, char** argv) { FLAGS_open_files = leveldb::Options().max_open_files; FLAGS_max_background_compactions = leveldb::Options().max_background_compactions; + FLAGS_hybrid_mode = + leveldb::Options().hybrid_mode; FLAGS_level0_file_num_compaction_trigger = leveldb::Options().level0_file_num_compaction_trigger; FLAGS_level0_slowdown_writes_trigger = @@ -1068,6 +1074,8 @@ int main(int argc, char** argv) { FLAGS_max_write_buffer_number = n; } else if (sscanf(argv[i], "--max_background_compactions=%d%c", &n, &junk) == 1) { FLAGS_max_background_compactions = n; + } else if (sscanf(argv[i], "--hybrid_mode=%d%c", &n, &junk) == 1) { + FLAGS_hybrid_mode = n; } else if (sscanf(argv[i], "--cache_size=%ld%c", &l, &junk) == 1) { FLAGS_cache_size = l; } else if (sscanf(argv[i], "--block_size=%d%c", &n, &junk) == 1) { diff --git a/util/options.cc b/util/options.cc index a8222ad5c..47dae8398 100644 --- a/util/options.cc +++ b/util/options.cc @@ -76,7 +76,8 @@ Options::Options() advise_random_on_open(true), access_hint_on_compaction_start(NORMAL), use_adaptive_mutex(false), - bytes_per_sync(0) { + bytes_per_sync(0), + hybrid_mode(false) { } static const char* const access_hints[] = { @@ -217,6 +218,8 @@ Options::Dump(Logger* log) const use_adaptive_mutex); Log(log," Options.bytes_per_sync: %ld", bytes_per_sync); + Log(log," Options.hybrid_mode: %d", + hybrid_mode); } // Options::Dump // From 47c4191fe8d35af986042bed16e56d5f85ab9184 Mon Sep 17 00:00:00 2001 From: Dhruba Borthakur Date: Thu, 13 Jun 2013 22:09:08 -0700 Subject: [PATCH 2/8] Reduce write amplification by merging files in L0 back into L0 Summary: There is a new option called hybrid_mode which, when switched 
on, causes HBase style compactions. Files from L0 are compacted back into L0. The meat of this compaction algorithm is in PickCompactionHybrid(). All files reside in L0. That means all files have overlapping keys. Each file covers a time range, i.e. each file contains a range of keys that were inserted around the same time. The start-seqno and the end-seqno refer to the timeframe when these keys were inserted. Files that have contiguous seqno ranges are compacted together into a larger file. All files are ordered from most recent to oldest. The compaction algorithm starts looking for candidate files from the most recent file. It continues to add more files to the same compaction run as long as the total size of the files chosen so far is smaller than the size of the next candidate file. This logic needs to be debated and validated. The above logic should reduce write amplification to a large extent... will publish numbers shortly. Test Plan: dbstress runs for 6 hours with no data corruption (tested so far). Differential Revision: https://reviews.facebook.net/D11289 --- db/builder.cc | 9 ++ db/db_bench.cc | 26 ++++ db/db_impl.cc | 35 +++-- db/db_test.cc | 4 +- db/dbformat.h | 9 ++ db/repair.cc | 8 +- db/version_edit.cc | 25 +++- db/version_edit.h | 9 +- db/version_edit_test.cc | 4 +- db/version_set.cc | 259 +++++++++++++++++++++++++++++++++-- db/version_set.h | 18 ++- include/leveldb/options.h | 11 ++ include/leveldb/statistics.h | 6 +- tools/db_stress.cc | 8 ++ util/options.cc | 11 +- 15 files changed, 406 insertions(+), 36 deletions(-) diff --git a/db/builder.cc b/db/builder.cc index 2b7c59283..db09c0d8c 100644 --- a/db/builder.cc +++ b/db/builder.cc @@ -28,6 +28,7 @@ Status BuildTable(const std::string& dbname, const SequenceNumber earliest_seqno_in_memtable) { Status s; meta->file_size = 0; + meta->smallest_seqno = meta->largest_seqno = 0; iter->SeekToFirst(); // If the sequence number of the smallest entry in the memtable is @@ -50,6 +51,8 @@ Status BuildTable(const std::string& dbname, // the first key is the smallest key Slice key = iter->key(); meta->smallest.DecodeFrom(key); + meta->smallest_seqno = GetInternalKeySeqno(key); + meta->largest_seqno = meta->smallest_seqno; MergeHelper merge(user_comparator, options.merge_operator, options.info_log.get(), @@ -124,12 +127,18 @@ Status BuildTable(const std::string& dbname, // output last key builder->Add(Slice(prev_key), Slice(prev_value)); meta->largest.DecodeFrom(Slice(prev_key)); + SequenceNumber seqno = GetInternalKeySeqno(Slice(prev_key)); + meta->smallest_seqno = std::min(meta->smallest_seqno, seqno); + meta->largest_seqno = std::max(meta->largest_seqno, seqno); } else { for (; iter->Valid(); iter->Next()) { Slice key = iter->key(); meta->largest.DecodeFrom(key); builder->Add(key, iter->value()); + SequenceNumber seqno = GetInternalKeySeqno(key); + meta->smallest_seqno = std::min(meta->smallest_seqno, seqno); + meta->largest_seqno = std::max(meta->largest_seqno, seqno); } } diff --git a/db/db_bench.cc b/db/db_bench.cc index 179896a77..f45777801 100644 --- a/db/db_bench.cc +++ b/db/db_bench.cc @@ -137,6 +137,15 @@ static int FLAGS_min_write_buffer_number_to_merge = 0; // This is initialized to default value of 1 in "main" function. static int FLAGS_max_background_compactions = 0; +// Run database in hybrid mode where all data resides in L0. +static bool FLAGS_hybrid_mode = false; + +// Percentage flexibility while comparing file sizes.
+static int FLAGS_hybrid_size_ratio = 1; + +// The minimum number of files in a single compaction run. +static int FLAGS_hybrid_min_numfiles_in_single_compaction = 2; + // Number of bytes to use as a cache of uncompressed data. // Negative means use default settings. static long FLAGS_cache_size = -1; @@ -1104,6 +1113,10 @@ unique_ptr GenerateKeyFromInt(int v, const char* suffix = "") options.min_write_buffer_number_to_merge = FLAGS_min_write_buffer_number_to_merge; options.max_background_compactions = FLAGS_max_background_compactions; + options.hybrid_mode = FLAGS_hybrid_mode; + options.hybrid_size_ratio = FLAGS_hybrid_size_ratio; + options.hybrid_min_numfiles_in_single_compaction = + FLAGS_hybrid_min_numfiles_in_single_compaction; options.block_size = FLAGS_block_size; options.filter_policy = filter_policy_; options.max_open_files = FLAGS_open_files; @@ -1986,6 +1999,12 @@ int main(int argc, char** argv) { FLAGS_open_files = leveldb::Options().max_open_files; FLAGS_max_background_compactions = leveldb::Options().max_background_compactions; + FLAGS_hybrid_mode = + leveldb::Options().hybrid_mode; + FLAGS_hybrid_size_ratio = + leveldb::Options().hybrid_size_ratio; + FLAGS_hybrid_min_numfiles_in_single_compaction = + leveldb::Options().hybrid_min_numfiles_in_single_compaction; // Compression test code above refers to FLAGS_block_size FLAGS_block_size = leveldb::Options().block_size; FLAGS_use_os_buffer = leveldb::EnvOptions().use_os_buffer; @@ -2044,6 +2063,13 @@ int main(int argc, char** argv) { FLAGS_min_write_buffer_number_to_merge = n; } else if (sscanf(argv[i], "--max_background_compactions=%d%c", &n, &junk) == 1) { FLAGS_max_background_compactions = n; + } else if (sscanf(argv[i], "--hybrid_mode=%d%c", &n, &junk) == 1) { + FLAGS_hybrid_mode = n; + } else if (sscanf(argv[i], "--hybrid_size_ratio=%d%c", &n, &junk) == 1) { + FLAGS_hybrid_size_ratio = n; + } else if (sscanf(argv[i], "--hybrid_min_numfiles_in_single_compaction=%d%c", + &n, &junk) == 1) { + FLAGS_hybrid_min_numfiles_in_single_compaction = n; } else if (sscanf(argv[i], "--cache_size=%ld%c", &l, &junk) == 1) { FLAGS_cache_size = l; } else if (sscanf(argv[i], "--block_size=%d%c", &n, &junk) == 1) { diff --git a/db/db_impl.cc b/db/db_impl.cc index c5c156feb..d5bd3cbd9 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -75,6 +75,7 @@ struct DBImpl::CompactionState { uint64_t number; uint64_t file_size; InternalKey smallest, largest; + SequenceNumber smallest_seqno, largest_seqno; }; std::vector outputs; std::list allocated_file_numbers; @@ -759,7 +760,8 @@ Status DBImpl::WriteLevel0TableForRecovery(MemTable* mem, VersionEdit* edit) { int level = 0; if (s.ok() && meta.file_size > 0) { edit->AddFile(level, meta.number, meta.file_size, - meta.smallest, meta.largest); + meta.smallest, meta.largest, + meta.smallest_seqno, meta.largest_seqno); } CompactionStats stats; @@ -833,11 +835,13 @@ Status DBImpl::WriteLevel0Table(std::vector &mems, VersionEdit* edit, // insert files directly into higher levels because some other // threads could be concurrently producing compacted files for // that key range. 
- if (base != nullptr && options_.max_background_compactions <= 1) { + if (base != nullptr && options_.max_background_compactions <= 1 && + !options_.hybrid_mode) { level = base->PickLevelForMemTableOutput(min_user_key, max_user_key); } edit->AddFile(level, meta.number, meta.file_size, - meta.smallest, meta.largest); + meta.smallest, meta.largest, + meta.smallest_seqno, meta.largest_seqno); } CompactionStats stats; @@ -1356,7 +1360,8 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, FileMetaData* f = c->input(0, 0); c->edit()->DeleteFile(c->level(), f->number); c->edit()->AddFile(c->level() + 1, f->number, f->file_size, - f->smallest, f->largest); + f->smallest, f->largest, + f->smallest_seqno, f->largest_seqno); status = versions_->LogAndApply(c->edit(), &mutex_); VersionSet::LevelSummaryStorage tmp; Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n", @@ -1468,6 +1473,7 @@ Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) { out.number = file_number; out.smallest.Clear(); out.largest.Clear(); + out.smallest_seqno = out.largest_seqno = 0; compact->outputs.push_back(out); // Make the output file @@ -1478,10 +1484,10 @@ Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) { // Over-estimate slightly so we don't end up just barely crossing // the threshold. compact->outfile->SetPreallocationBlockSize( - 1.1 * versions_->MaxFileSizeForLevel(compact->compaction->level() + 1)); + 1.1 * versions_->MaxFileSizeForLevel(compact->compaction->output_level())); compact->builder.reset(new TableBuilder(options_, compact->outfile.get(), - compact->compaction->level() + 1)); + compact->compaction->output_level())); } return s; } @@ -1572,8 +1578,9 @@ Status DBImpl::InstallCompactionResults(CompactionState* compact) { for (size_t i = 0; i < compact->outputs.size(); i++) { const CompactionState::Output& out = compact->outputs[i]; compact->compaction->edit()->AddFile( - level + 1, - out.number, out.file_size, out.smallest, out.largest); + options_.hybrid_mode? level : level + 1, + out.number, out.file_size, out.smallest, out.largest, + out.smallest_seqno, out.largest_seqno); } return versions_->LogAndApply(compact->compaction->edit(), &mutex_); } @@ -1821,7 +1828,10 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { // If this is the bottommost level (no files in lower levels) // and the earliest snapshot is larger than this seqno // then we can squash the seqno to zero. - if (bottommost_level && ikey.sequence < earliest_snapshot && + // Hybrid mode depends on the sequence number to determine + // time-order of files that is needed for compactions. 
+ if (!options_.hybrid_mode && + bottommost_level && ikey.sequence < earliest_snapshot && ikey.type != kTypeMerge) { assert(ikey.type != kTypeDeletion); // make a copy because updating in place would cause problems @@ -1841,11 +1851,18 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { break; } } + SequenceNumber seqno = GetInternalKeySeqno(newkey); if (compact->builder->NumEntries() == 0) { compact->current_output()->smallest.DecodeFrom(newkey); + compact->current_output()->smallest_seqno = seqno; + } else { + compact->current_output()->smallest_seqno = + std::min(compact->current_output()->smallest_seqno, seqno); } compact->current_output()->largest.DecodeFrom(newkey); compact->builder->Add(newkey, value); + compact->current_output()->largest_seqno = + std::max(compact->current_output()->largest_seqno, seqno); // Close output file if it is big enough if (compact->builder->FileSize() >= diff --git a/db/db_test.cc b/db/db_test.cc index 7ce4ac419..8275029b4 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -3306,7 +3306,7 @@ void BM_LogAndApply(int iters, int num_base_files) { for (int i = 0; i < num_base_files; i++) { InternalKey start(MakeKey(2*fnum), 1, kTypeValue); InternalKey limit(MakeKey(2*fnum+1), 1, kTypeDeletion); - vbase.AddFile(2, fnum++, 1 /* file size */, start, limit); + vbase.AddFile(2, fnum++, 1 /* file size */, start, limit, 1, 1); } ASSERT_OK(vset.LogAndApply(&vbase, &mu)); @@ -3317,7 +3317,7 @@ void BM_LogAndApply(int iters, int num_base_files) { vedit.DeleteFile(2, fnum); InternalKey start(MakeKey(2*fnum), 1, kTypeValue); InternalKey limit(MakeKey(2*fnum+1), 1, kTypeDeletion); - vedit.AddFile(2, fnum++, 1 /* file size */, start, limit); + vedit.AddFile(2, fnum++, 1 /* file size */, start, limit, 1, 1); vset.LogAndApply(&vedit, &mu); } uint64_t stop_micros = env->NowMicros(); diff --git a/db/dbformat.h b/db/dbformat.h index 1a0516670..5d596ad1a 100644 --- a/db/dbformat.h +++ b/db/dbformat.h @@ -173,6 +173,15 @@ inline void UpdateInternalKey(char* internal_key, EncodeFixed64(seqtype, newval); } +// Get the sequence number from the internal key +inline uint64_t GetInternalKeySeqno(const Slice& internal_key) { + const size_t n = internal_key.size(); + assert(n >= 8); + uint64_t num = DecodeFixed64(internal_key.data() + n - 8); + return num >> 8; +} + + // A helper class useful for DBImpl::Get() class LookupKey { public: diff --git a/db/repair.cc b/db/repair.cc index d1c0c4525..09406781a 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -88,6 +88,7 @@ class Repairer { private: struct TableInfo { FileMetaData meta; + SequenceNumber min_sequence; SequenceNumber max_sequence; }; @@ -263,6 +264,7 @@ class Repairer { ReadOptions(), storage_options_, t->meta.number, t->meta.file_size); bool empty = true; ParsedInternalKey parsed; + t->min_sequence = 0; t->max_sequence = 0; for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { Slice key = iter->key(); @@ -279,6 +281,9 @@ class Repairer { t->meta.smallest.DecodeFrom(key); } t->meta.largest.DecodeFrom(key); + if (parsed.sequence < t->min_sequence) { + t->min_sequence = parsed.sequence; + } if (parsed.sequence > t->max_sequence) { t->max_sequence = parsed.sequence; } @@ -319,7 +324,8 @@ class Repairer { // TODO(opt): separate out into multiple levels const TableInfo& t = tables_[i]; edit_->AddFile(0, t.meta.number, t.meta.file_size, - t.meta.smallest, t.meta.largest); + t.meta.smallest, t.meta.largest, + t.min_sequence, t.max_sequence); } //fprintf(stderr, "NewDescriptor:\n%s\n", edit_.DebugString().c_str()); diff --git 
a/db/version_edit.cc b/db/version_edit.cc index ed63c1013..65a63947a 100644 --- a/db/version_edit.cc +++ b/db/version_edit.cc @@ -20,7 +20,10 @@ enum Tag { kDeletedFile = 6, kNewFile = 7, // 8 was used for large value refs - kPrevLogNumber = 9 + kPrevLogNumber = 9, + + // these are new formats divergent from open source leveldb + kNewFile2 = 100 // store smallest & largest seqno }; void VersionEdit::Clear() { @@ -76,12 +79,14 @@ void VersionEdit::EncodeTo(std::string* dst) const { for (size_t i = 0; i < new_files_.size(); i++) { const FileMetaData& f = new_files_[i].second; - PutVarint32(dst, kNewFile); + PutVarint32(dst, kNewFile2); PutVarint32(dst, new_files_[i].first); // level PutVarint64(dst, f.number); PutVarint64(dst, f.file_size); PutLengthPrefixedSlice(dst, f.smallest.Encode()); PutLengthPrefixedSlice(dst, f.largest.Encode()); + PutVarint64(dst, f.smallest_seqno); + PutVarint64(dst, f.largest_seqno); } } @@ -201,6 +206,22 @@ Status VersionEdit::DecodeFrom(const Slice& src) { } break; + case kNewFile2: + if (GetLevel(&input, &level, &msg) && + GetVarint64(&input, &f.number) && + GetVarint64(&input, &f.file_size) && + GetInternalKey(&input, &f.smallest) && + GetInternalKey(&input, &f.largest) && + GetVarint64(&input, &f.smallest_seqno) && + GetVarint64(&input, &f.largest_seqno) ) { + new_files_.push_back(std::make_pair(level, f)); + } else { + if (!msg) { + msg = "new-file2 entry"; + } + } + break; + default: msg = "unknown tag"; break; diff --git a/db/version_edit.h b/db/version_edit.h index 2743e9e0d..7037763b8 100644 --- a/db/version_edit.h +++ b/db/version_edit.h @@ -22,6 +22,8 @@ struct FileMetaData { InternalKey smallest; // Smallest internal key served by table InternalKey largest; // Largest internal key served by table bool being_compacted; // Is this file undergoing compaction? 
+ SequenceNumber smallest_seqno;// The smallest seqno in this file + SequenceNumber largest_seqno; // The largest seqno in this file FileMetaData() : refs(0), allowed_seeks(1 << 30), file_size(0), being_compacted(false) { } @@ -67,12 +69,17 @@ class VersionEdit { void AddFile(int level, uint64_t file, uint64_t file_size, const InternalKey& smallest, - const InternalKey& largest) { + const InternalKey& largest, + const SequenceNumber& smallest_seqno, + const SequenceNumber& largest_seqno) { FileMetaData f; f.number = file; f.file_size = file_size; f.smallest = smallest; f.largest = largest; + f.smallest_seqno = smallest_seqno; + f.largest_seqno = largest_seqno; + assert(smallest_seqno <= largest_seqno); new_files_.push_back(std::make_pair(level, f)); } diff --git a/db/version_edit_test.cc b/db/version_edit_test.cc index b211eb1a9..26b69199e 100644 --- a/db/version_edit_test.cc +++ b/db/version_edit_test.cc @@ -27,7 +27,9 @@ TEST(VersionEditTest, EncodeDecode) { TestEncodeDecode(edit); edit.AddFile(3, kBig + 300 + i, kBig + 400 + i, InternalKey("foo", kBig + 500 + i, kTypeValue), - InternalKey("zoo", kBig + 600 + i, kTypeDeletion)); + InternalKey("zoo", kBig + 600 + i, kTypeDeletion), + kBig + 500 + i, + kBig + 600 + i); edit.DeleteFile(4, kBig + 700 + i); edit.SetCompactPointer(i, InternalKey("x", kBig + 900 + i, kTypeValue)); } diff --git a/db/version_set.cc b/db/version_set.cc index 9b01d935e..0398bf6f6 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -5,6 +5,7 @@ #include "db/version_set.h" #include <algorithm> +#include <climits> #include <stdio.h> #include "db/filename.h" #include "db/log_reader.h" @@ -309,6 +310,14 @@ static bool SaveValue(void* arg, const Slice& ikey, const Slice& v, bool didIO){ static bool NewestFirst(FileMetaData* a, FileMetaData* b) { return a->number > b->number; } +static bool NewestFirstBySeqNo(FileMetaData* a, FileMetaData* b) { + if (a->smallest_seqno > b->smallest_seqno) { + assert(a->largest_seqno > b->largest_seqno); + return true; + } + assert(a->largest_seqno <= b->largest_seqno); + return false; +} Version::Version(VersionSet* vset, uint64_t version_number) : vset_(vset), next_(this), prev_(this), refs_(0), @@ -375,7 +384,11 @@ void Version::Get(const ReadOptions& options, } if (tmp.empty()) continue; - std::sort(tmp.begin(), tmp.end(), NewestFirst); + if (vset_->options_->hybrid_mode) { + std::sort(tmp.begin(), tmp.end(), NewestFirstBySeqNo); + } else { + std::sort(tmp.begin(), tmp.end(), NewestFirst); + } files = &tmp[0]; num_files = tmp.size(); } else { @@ -1011,7 +1024,10 @@ void VersionSet::Init(int num_levels) { int target_file_size_multiplier = options_->target_file_size_multiplier; int max_bytes_multiplier = options_->max_bytes_for_level_multiplier; for (int i = 0; i < num_levels; i++) { - if (i > 1) { + if (i == 0) { + max_file_size_[i] = LLONG_MAX; + level_max_bytes_[i] = options_->max_bytes_for_level_base; + } else if (i > 1) { max_file_size_[i] = max_file_size_[i-1] * target_file_size_multiplier; level_max_bytes_[i] = level_max_bytes_[i-1] * max_bytes_multiplier * options_->max_bytes_for_level_multiplier_additional[i-1]; @@ -1558,17 +1574,32 @@ void VersionSet::Finalize(Version* v, } } -// a static compator used to sort files based on their size -static bool compareSize(const VersionSet::Fsize& first, +// A static comparator used to sort files based on their size +// In normal mode: descending size +static bool compareSizeDescending(const VersionSet::Fsize& first, const VersionSet::Fsize& second) { return (first.file->file_size > second.file->file_size); } +// A static comparator used to sort files based on their seqno +// In hybrid mode: descending seqno +static bool compareSeqnoDescending(const VersionSet::Fsize& first, + const VersionSet::Fsize& second) { + if (first.file->smallest_seqno > second.file->smallest_seqno) { + assert(first.file->largest_seqno > second.file->largest_seqno); + return true; + } + assert(first.file->largest_seqno <= second.file->largest_seqno); + return false; +} // sort all files in level1 to level(n-1) based on file size void VersionSet::UpdateFilesBySize(Version* v) { // No need to sort the highest level because it is never compacted. - for (int level = 0; level < NumberLevels()-1; level++) { + int max_level = options_->hybrid_mode? NumberLevels() : + NumberLevels() - 1; + + for (int level = 0; level < max_level; level++) { const std::vector<FileMetaData*>& files = v->files_[level]; std::vector<int>& files_by_size = v->files_by_size_[level]; @@ -1582,12 +1613,18 @@ void VersionSet::UpdateFilesBySize(Version* v) { } // sort the top number_of_files_to_sort_ based on file size - int num = Version::number_of_files_to_sort_; - if (num > (int)temp.size()) { - num = temp.size(); + if (options_->hybrid_mode) { + int num = temp.size(); + std::partial_sort(temp.begin(), temp.begin() + num, + temp.end(), compareSeqnoDescending); + } else { + int num = Version::number_of_files_to_sort_; + if (num > (int)temp.size()) { + num = temp.size(); + } + std::partial_sort(temp.begin(), temp.begin() + num, + temp.end(), compareSizeDescending); } - std::partial_sort(temp.begin(), temp.begin() + num, - temp.end(), compareSize); assert(temp.size() == files.size()); // initialize files_by_size_ @@ -1620,7 +1657,8 @@ Status VersionSet::WriteSnapshot(log::Writer* log) { const std::vector<FileMetaData*>& files = current_->files_[level]; for (size_t i = 0; i < files.size(); i++) { const FileMetaData* f = files[i]; - edit.AddFile(level, f->number, f->file_size, f->smallest, f->largest); + edit.AddFile(level, f->number, f->file_size, f->smallest, f->largest, + f->smallest_seqno, f->largest_seqno); } } @@ -1664,6 +1702,23 @@ const char* VersionSet::LevelDataSizeSummary( return scratch->buffer; } +const char* VersionSet::LevelFileSummary( + FileSummaryStorage* scratch, int level) const { + int len = snprintf(scratch->buffer, sizeof(scratch->buffer), "files_size["); + for (unsigned int i = 0; i < current_->files_[level].size(); i++) { + FileMetaData* f = current_->files_[level][i]; + int sz = sizeof(scratch->buffer) - len; + int ret = snprintf(scratch->buffer + len, sz, "#%ld(seq=%ld,sz=%ld,%d) ", + f->number, f->smallest_seqno, + f->file_size, f->being_compacted); + if (ret < 0 || ret >= sz) + break; + len += ret; + } + snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len, "]"); + return scratch->buffer; +} + // Opens the mainfest file and reads all records // till it finds the record we are looking for.
bool VersionSet::ManifestContains(const std::string& record) const { @@ -1961,6 +2016,171 @@ void VersionSet::SizeBeingCompacted(std::vector<uint64_t>& sizes) { } } +Compaction* VersionSet::PickCompactionHybrid(int level, double score) { + assert (level == 0); + + // percentage flexibility while comparing file sizes + uint64_t ratio = (uint64_t)options_->hybrid_size_ratio; + + if ((current_->files_[level].size() <= + (unsigned int)options_->level0_file_num_compaction_trigger)) { + Log(options_->info_log, "Hybrid: nothing to do\n"); + return nullptr; + } + VersionSet::FileSummaryStorage tmp; + Log(options_->info_log, "Hybrid: candidate files(%lu): %s\n", + current_->files_[level].size(), + LevelFileSummary(&tmp, 0)); + + Compaction* c = nullptr; + c = new Compaction(level, level, MaxFileSizeForLevel(level), + LLONG_MAX, NumberLevels()); + c->score_ = score; + + // The files are sorted from newest first to oldest last. + std::vector<int>& file_by_time = current_->files_by_size_[level]; + FileMetaData* f = nullptr; + bool done = false; + assert(file_by_time.size() == current_->files_[level].size()); + + unsigned int max_files_to_compact = UINT_MAX; + + // Make two passes. The first pass considers a candidate file + // only if it is smaller than the total size accumulated so far. + // The second pass does not look at the slope of the + // file-size curve to decide what to pick for compaction. + for (int iter = 0; !done && iter < 2; iter++) { + + for (unsigned int loop = 0; loop < file_by_time.size(); ) { + + // Skip files that are already being compacted + for (f = nullptr; loop < file_by_time.size(); loop++) { + int index = file_by_time[loop]; + f = current_->files_[level][index]; + + if (!f->being_compacted) { + break; + } + Log(options_->info_log, "Hybrid: file %ld[%d] being compacted, skipping", + f->number, loop); + f = nullptr; + } + + // This file is not being compacted. Consider it as the + // first candidate to be compacted. + unsigned int candidate_count = 1; + uint64_t candidate_size = f != nullptr? f->file_size : 0; + if (f != nullptr) { + Log(options_->info_log, "Hybrid: Possible candidate file %ld[%d] %s.", + f->number, loop, iter == 0? "" : "forced "); + } + + // Check if the succeeding files need compaction. + for (unsigned int i = loop+1; + candidate_count < max_files_to_compact && i < file_by_time.size(); + i++) { + int index = file_by_time[i]; + FileMetaData* f = current_->files_[level][index]; + if (f->being_compacted) { + break; + } + // If this is the first iteration, then we pick files if the + // total candidate file size (increased by the specified ratio) + // is still larger than the next candidate file. + if (iter == 0) { + uint64_t sz = (candidate_size * (100 + ratio)) /100; + if (sz < f->file_size) { + break; + } + } + candidate_count++; + candidate_size += f->file_size; + } + + // Found a series of consecutive files that need compaction. + if (candidate_count >= (unsigned int) + options_->hybrid_min_numfiles_in_single_compaction) { + for (unsigned int i = loop; i < loop + candidate_count; i++) { + int index = file_by_time[i]; + FileMetaData* f = current_->files_[level][index]; + c->inputs_[0].push_back(f); + Log(options_->info_log, "Hybrid: Picking file %ld[%d] with size %ld %s", + f->number, i, f->file_size, + (iter == 0 ?
"" : "forced")); + } + done = true; + break; + } else { + for (unsigned int i = loop; + i < loop + candidate_count && i < file_by_time.size(); i++) { + int index = file_by_time[i]; + FileMetaData* f = current_->files_[level][index]; + Log(options_->info_log, "Hybrid: Skipping file %ld[%d] with size %ld %d %s", + f->number, i, f->file_size, f->being_compacted, + (iter == 0 ? "" : "forced")); + } + } + loop += candidate_count; + } + assert(done || c->inputs_[0].size() == 0); + + // If we are unable to find a normal compaction run and we are still + // above the compaction threshold, iterate again to pick compaction + // candidates, this time without considering their size differences. + if (!done) { + int files_not_in_compaction = 0; + for (unsigned int i = 0; i < current_->files_[level].size(); i++) { + f = current_->files_[level][i]; + if (!f->being_compacted) { + files_not_in_compaction++; + } + } + int expected_num_files = files_not_in_compaction + + compactions_in_progress_[level].size(); + if (expected_num_files <= + options_->level0_file_num_compaction_trigger + 1) { + done = true; // nothing more to do + } else { + max_files_to_compact = expected_num_files - + options_->level0_file_num_compaction_trigger; + Log(options_->info_log, "Hybrid: second loop with maxfiles %d", + max_files_to_compact); + } + } + } + if (c->inputs_[0].size() <= 1) { + Log(options_->info_log, "Hybrid: only %ld files, nothing to do.\n", + c->inputs_[0].size()); + delete c; + return nullptr; + } + + // validate that all the chosen files are non overlapping in time + FileMetaData* newerfile __attribute__((unused)) = nullptr; + for (unsigned int i = 0; i < c->inputs_[0].size(); i++) { + FileMetaData* f = c->inputs_[0][i]; + assert (f->smallest_seqno <= f->largest_seqno); + assert(newerfile == nullptr || + newerfile->smallest_seqno > f->largest_seqno); + newerfile = f; + } + + // update statistics + options_->statistics->measureTime(NUM_FILES_IN_SINGLE_COMPACTION, + c->inputs_[0].size()); + + c->input_version_ = current_; + c->input_version_->Ref(); + + // mark all the files that are being compacted + c->MarkFilesBeingCompacted(true); + + // remember this currently undergoing compaction + compactions_in_progress_[level].insert(c); + + return c; +} + Compaction* VersionSet::PickCompactionBySize(int level, double score) { Compaction* c = nullptr; @@ -1974,7 +2194,7 @@ Compaction* VersionSet::PickCompactionBySize(int level, double score) { assert(level >= 0); assert(level+1 < NumberLevels()); - c = new Compaction(level, MaxFileSizeForLevel(level), + c = new Compaction(level, level+1, MaxFileSizeForLevel(level), MaxGrandParentOverlapBytes(level), NumberLevels()); c->score_ = score; @@ -2044,6 +2264,13 @@ Compaction* VersionSet::PickCompaction() { current_->vset_->SizeBeingCompacted(size_being_compacted); Finalize(current_, size_being_compacted); + // In hybrid mode compact L0 files back into L0. + if (options_->hybrid_mode) { + int level = 0; + c = PickCompactionHybrid(level, current_->compaction_score_[level]); + return c; + } + // We prefer compactions triggered by too much data in a level over // the compactions triggered by seeks. 
//
@@ -2072,7 +2299,7 @@ Compaction* VersionSet::PickCompaction() {
   if (level != 0 || compactions_in_progress_[0].empty()) {
     if(!ParentRangeInCompaction(&f->smallest, &f->largest, level,
                                 &parent_index)) {
-      c = new Compaction(level, MaxFileSizeForLevel(level),
+      c = new Compaction(level, level, MaxFileSizeForLevel(level),
         MaxGrandParentOverlapBytes(level), NumberLevels(), true);
       c->inputs_[0].push_back(f);
       c->parent_index_ = parent_index;
@@ -2246,8 +2473,9 @@ Compaction* VersionSet::CompactRange(
       }
     }
   }
+  int out_level = options_->hybrid_mode ? level : level+1;

-  Compaction* c = new Compaction(level, MaxFileSizeForLevel(level),
+  Compaction* c = new Compaction(level, out_level, MaxFileSizeForLevel(level),
     MaxGrandParentOverlapBytes(level), NumberLevels());
   c->input_version_ = current_;
   c->input_version_->Ref();
@@ -2261,10 +2489,11 @@ Compaction* VersionSet::CompactRange(
   return c;
 }

-Compaction::Compaction(int level, uint64_t target_file_size,
+Compaction::Compaction(int level, int out_level, uint64_t target_file_size,
   uint64_t max_grandparent_overlap_bytes, int number_levels,
   bool seek_compaction)
     : level_(level),
+      out_level_(out_level),
       max_output_file_size_(target_file_size),
       maxGrandParentOverlapBytes_(max_grandparent_overlap_bytes),
       input_version_(nullptr),
diff --git a/db/version_set.h b/db/version_set.h
index ba924126f..85c02b973 100644
--- a/db/version_set.h
+++ b/db/version_set.h
@@ -340,6 +340,9 @@ class VersionSet {
   struct LevelSummaryStorage {
     char buffer[100];
   };
+  struct FileSummaryStorage {
+    char buffer[1000];
+  };
   const char* LevelSummary(LevelSummaryStorage* scratch) const;

   // printf contents (for debugging)
@@ -350,6 +353,10 @@ class VersionSet {
   // of files per level. Uses *scratch as backing store.
   const char* LevelDataSizeSummary(LevelSummaryStorage* scratch) const;

+  // Return a human-readable short (single-line) summary of files
+  // in a specified level. Uses *scratch as backing store.
+  const char* LevelFileSummary(FileSummaryStorage* scratch, int level) const;
+
   // Return the size of the current manifest file
   const uint64_t ManifestFileSize() { return current_->offset_manifest_file_; }

@@ -359,6 +366,9 @@ class VersionSet {
   // function will return nullptr.
   Compaction* PickCompactionBySize(int level, double score);

+  // Pick files to compact in hybrid mode
+  Compaction* PickCompactionHybrid(int level, double score);
+
   // Free up the files that participated in a compaction
   void ReleaseCompactionFiles(Compaction* c, Status status);

@@ -489,9 +499,12 @@ class Compaction {
   ~Compaction();

   // Return the level that is being compacted. Inputs from "level"
-  // and "level+1" will be merged to produce a set of "level+1" files.
+  // will be merged.
   int level() const { return level_; }

+  // Outputs will go to this level
+  int output_level() const { return out_level_; }
+
   // Return the object that holds the edits to the descriptor done
   // by this compaction.
VersionEdit* edit() { return edit_; }
@@ -534,11 +547,12 @@ class Compaction {
   friend class Version;
   friend class VersionSet;

-  explicit Compaction(int level, uint64_t target_file_size,
+  explicit Compaction(int level, int out_level, uint64_t target_file_size,
     uint64_t max_grandparent_overlap_bytes, int number_levels,
     bool seek_compaction = false);

   int level_;
+  int out_level_;             // level to which output files are stored
   uint64_t max_output_file_size_;
   int64_t maxGrandParentOverlapBytes_;
   Version* input_version_;
diff --git a/include/leveldb/options.h b/include/leveldb/options.h
index fa69a7eff..b3f85e691 100644
--- a/include/leveldb/options.h
+++ b/include/leveldb/options.h
@@ -476,6 +476,17 @@ struct Options {
   // Default: 0
   uint64_t bytes_per_sync;

+  // Hybrid Mode. There is only a single level and files in L0 are
+  // compacted back into L0. Default: false
+  bool hybrid_mode;
+
+  // Percentage flexibility while comparing file size. If the candidate file(s)
+  // size is no more than 1% smaller than the next file's size, then include
+  // next file into this candidate set.
+  // Default: 1
+  int hybrid_size_ratio;
+
+  // The minimum number of files in a single compaction run. Default: 2
+  int hybrid_min_numfiles_in_single_compaction;
 };

 // Options that control read operations
diff --git a/include/leveldb/statistics.h b/include/leveldb/statistics.h
index 155974659..883d30c41 100644
--- a/include/leveldb/statistics.h
+++ b/include/leveldb/statistics.h
@@ -106,7 +106,8 @@ enum Histograms {
   READ_BLOCK_COMPACTION_MICROS = 9,
   READ_BLOCK_GET_MICROS = 10,
   WRITE_RAW_BLOCK_MICROS = 11,
-  HISTOGRAM_ENUM_MAX = 12
+  NUM_FILES_IN_SINGLE_COMPACTION = 12,
+  HISTOGRAM_ENUM_MAX = 13
 };

 const std::vector<std::pair<Histograms, std::string>> HistogramsNameMap = {
@@ -121,7 +122,8 @@ const std::vector<std::pair<Histograms, std::string>> HistogramsNameMap = {
   { DB_MULTIGET, "rocksdb.db.multiget.micros" },
   { READ_BLOCK_COMPACTION_MICROS, "rocksdb.read.block.compaction.micros" },
   { READ_BLOCK_GET_MICROS, "rocksdb.read.block.get.micros" },
-  { WRITE_RAW_BLOCK_MICROS, "rocksdb.write.raw.block.micros" }
+  { WRITE_RAW_BLOCK_MICROS, "rocksdb.write.raw.block.micros" },
+  { NUM_FILES_IN_SINGLE_COMPACTION, "rocksdb.numfiles.in.singlecompaction" }
 };

 struct HistogramData {
diff --git a/tools/db_stress.cc b/tools/db_stress.cc
index 02afe306c..4c671d84f 100644
--- a/tools/db_stress.cc
+++ b/tools/db_stress.cc
@@ -88,6 +88,9 @@ static int FLAGS_max_write_buffer_number = 0;
 // This is initialized to default value of 1 in "main" function.
 static int FLAGS_max_background_compactions = 0;

+// This is initialized to default value of false
+static bool FLAGS_hybrid_mode = false;
+
 // Number of bytes to use as a cache of uncompressed data.
static long FLAGS_cache_size = 2 * KB * KB * KB;
@@ -930,6 +933,7 @@ class StressTest {
     options.write_buffer_size = FLAGS_write_buffer_size;
     options.max_write_buffer_number = FLAGS_max_write_buffer_number;
     options.max_background_compactions = FLAGS_max_background_compactions;
+    options.hybrid_mode = FLAGS_hybrid_mode;
     options.block_size = FLAGS_block_size;
     options.filter_policy = filter_policy_;
     options.max_open_files = FLAGS_open_files;
@@ -1016,6 +1020,8 @@ int main(int argc, char** argv) {
   FLAGS_open_files = leveldb::Options().max_open_files;
   FLAGS_max_background_compactions =
     leveldb::Options().max_background_compactions;
+  FLAGS_hybrid_mode =
+    leveldb::Options().hybrid_mode;
   FLAGS_level0_file_num_compaction_trigger =
     leveldb::Options().level0_file_num_compaction_trigger;
   FLAGS_level0_slowdown_writes_trigger =
@@ -1068,6 +1074,8 @@ int main(int argc, char** argv) {
       FLAGS_max_write_buffer_number = n;
     } else if (sscanf(argv[i], "--max_background_compactions=%d%c", &n, &junk) == 1) {
       FLAGS_max_background_compactions = n;
+    } else if (sscanf(argv[i], "--hybrid_mode=%d%c", &n, &junk) == 1) {
+      FLAGS_hybrid_mode = n;
     } else if (sscanf(argv[i], "--cache_size=%ld%c", &l, &junk) == 1) {
       FLAGS_cache_size = l;
     } else if (sscanf(argv[i], "--block_size=%d%c", &n, &junk) == 1) {
diff --git a/util/options.cc b/util/options.cc
index a8222ad5c..a38434749 100644
--- a/util/options.cc
+++ b/util/options.cc
@@ -76,7 +76,10 @@ Options::Options()
       advise_random_on_open(true),
       access_hint_on_compaction_start(NORMAL),
       use_adaptive_mutex(false),
-      bytes_per_sync(0) {
+      bytes_per_sync(0),
+      hybrid_mode(false),
+      hybrid_size_ratio(1),
+      hybrid_min_numfiles_in_single_compaction(2) {
 }

 static const char* const access_hints[] = {
@@ -217,6 +220,12 @@ Options::Dump(Logger* log) const
         use_adaptive_mutex);
     Log(log," Options.bytes_per_sync: %ld",
         bytes_per_sync);
+    Log(log," Options.hybrid_mode: %d",
+        hybrid_mode);
+    Log(log," Options.hybrid_size_ratio: %d",
+        hybrid_size_ratio);
+    Log(log,"Options.hybrid_min_numfiles_in_single_compaction: %d",
+        hybrid_min_numfiles_in_single_compaction);
 }   // Options::Dump
//

From 116ec527f2e0f790e1220fdea0cbb9cd8ba81f0d Mon Sep 17 00:00:00 2001
From: Dhruba Borthakur
Date: Wed, 3 Jul 2013 15:32:49 -0700
Subject: [PATCH 3/8] Renamed 'hybrid_compaction' to be 'Universal Compaction'.

Summary: All the universal compaction parameters are encapsulated
in a new file universal_compaction.h

Test Plan: make check
---
 db/db_bench.cc                         | 42 +++++++++----------
 db/db_impl.cc                          |  9 ++--
 db/version_set.cc                      | 32 ++++++++-------
 include/leveldb/options.h              | 21 +++++-----
 include/leveldb/universal_compaction.h | 57 ++++++++++++++++++++++++++
 tools/db_stress.cc                     | 12 +++---
 util/ldb_cmd.cc                        |  2 +-
 util/options.cc                        | 18 ++++----
 8 files changed, 128 insertions(+), 65 deletions(-)
 create mode 100644 include/leveldb/universal_compaction.h

diff --git a/db/db_bench.cc b/db/db_bench.cc
index f45777801..ecd979ec8 100644
--- a/db/db_bench.cc
+++ b/db/db_bench.cc
@@ -9,6 +9,7 @@
 #include "db/db_impl.h"
 #include "db/version_set.h"
 #include "db/db_statistics.h"
+#include "leveldb/options.h"
 #include "leveldb/cache.h"
 #include "leveldb/db.h"
 #include "leveldb/env.h"
@@ -125,7 +126,7 @@ static int FLAGS_max_write_buffer_number = 0;
 // The minimum number of write buffers that will be merged together
 // before writing to storage. This is cheap because it is an
 // in-memory merge. 
If this feature is not enabled, then all these
-// write buffers are flushed to L0 as seperate files and this increases
+// write buffers are flushed to L0 as separate files and this increases
 // read amplification because a get request has to check in all of these
 // files. Also, an in-memory merge may result in writing lesser
 // data to storage if there are duplicate records in each of these
@@ -137,14 +138,14 @@ static int FLAGS_min_write_buffer_number_to_merge = 0;
 // This is initialized to default value of 1 in "main" function.
 static int FLAGS_max_background_compactions = 0;

-// Run database in hybrid mode where all data resides in L0.
-static bool FLAGS_hybrid_mode = false;
+// style of compaction: level-based vs universal
+static leveldb::CompactionStyle FLAGS_compaction_style = leveldb::kCompactionStyleLevel;

 // Percentage flexibility while comparing file size.
-static int FLAGS_hybrid_size_ratio = 1;
+static int FLAGS_universal_size_ratio = 1;

 // The minimum number of files in a single compaction run.
-static int FLAGS_hybrid_min_numfiles_in_single_compaction = 2;
+static int FLAGS_compaction_universal_min_merge_width = 2;

 // Number of bytes to use as a cache of uncompressed data.
 // Negative means use default settings.
@@ -1113,10 +1114,10 @@ unique_ptr GenerateKeyFromInt(int v, const char* suffix = "")
   options.min_write_buffer_number_to_merge =
     FLAGS_min_write_buffer_number_to_merge;
   options.max_background_compactions = FLAGS_max_background_compactions;
-  options.hybrid_mode = FLAGS_hybrid_mode;
-  options.hybrid_size_ratio = FLAGS_hybrid_size_ratio;
-  options.hybrid_min_numfiles_in_single_compaction =
-    FLAGS_hybrid_min_numfiles_in_single_compaction;
+  options.compaction_style = FLAGS_compaction_style;
+  options.compaction_options_universal.size_ratio = FLAGS_universal_size_ratio;
+  options.compaction_options_universal.min_merge_width =
+    FLAGS_compaction_universal_min_merge_width;
   options.block_size = FLAGS_block_size;
   options.filter_policy = filter_policy_;
   options.max_open_files = FLAGS_open_files;
@@ -1999,12 +2000,11 @@ int main(int argc, char** argv) {
   FLAGS_open_files = leveldb::Options().max_open_files;
   FLAGS_max_background_compactions =
     leveldb::Options().max_background_compactions;
-  FLAGS_hybrid_mode =
-    leveldb::Options().hybrid_mode;
-  FLAGS_hybrid_size_ratio =
-    leveldb::Options().hybrid_size_ratio;
-  FLAGS_hybrid_min_numfiles_in_single_compaction =
-    leveldb::Options().hybrid_min_numfiles_in_single_compaction;
+  FLAGS_compaction_style = leveldb::Options().compaction_style;
+  FLAGS_universal_size_ratio =
+    leveldb::Options().compaction_options_universal.size_ratio;
+  FLAGS_compaction_universal_min_merge_width =
+    leveldb::Options().compaction_options_universal.min_merge_width;
   // Compression test code above refers to FLAGS_block_size
   FLAGS_block_size = leveldb::Options().block_size;
   FLAGS_use_os_buffer = leveldb::EnvOptions().use_os_buffer;
@@ -2063,13 +2063,13 @@ int main(int argc, char** argv) {
       FLAGS_min_write_buffer_number_to_merge = n;
     } else if (sscanf(argv[i], "--max_background_compactions=%d%c", &n, &junk) == 1) {
       FLAGS_max_background_compactions = n;
-    } else if (sscanf(argv[i], "--hybrid_mode=%d%c", &n, &junk) == 1) {
-      FLAGS_hybrid_mode = n;
-    } else if (sscanf(argv[i], "--hybrid_size_ratio=%d%c", &n, &junk) == 1) {
-      FLAGS_hybrid_size_ratio = n;
-    } else if (sscanf(argv[i], "--hybrid_min_numfiles_in_single_compaction=%d%c",
+    } else if (sscanf(argv[i], "--compaction_style=%d%c", &n, &junk) == 1) {
+      FLAGS_compaction_style = (leveldb::CompactionStyle)n;
+    
} else if (sscanf(argv[i], "--universal_size_ratio=%d%c", &n, &junk) == 1) {
+      FLAGS_universal_size_ratio = n;
+    } else if (sscanf(argv[i], "--universal_min_merge_width=%d%c",
                &n, &junk) == 1) {
-      FLAGS_hybrid_min_numfiles_in_single_compaction = n;
+      FLAGS_compaction_universal_min_merge_width = n;
     } else if (sscanf(argv[i], "--cache_size=%ld%c", &l, &junk) == 1) {
       FLAGS_cache_size = l;
     } else if (sscanf(argv[i], "--block_size=%d%c", &n, &junk) == 1) {
diff --git a/db/db_impl.cc b/db/db_impl.cc
index d5bd3cbd9..37fe889a5 100644
--- a/db/db_impl.cc
+++ b/db/db_impl.cc
@@ -836,7 +836,7 @@ Status DBImpl::WriteLevel0Table(std::vector<MemTable*> &mems, VersionEdit* edit,
   // threads could be concurrently producing compacted files for
   // that key range.
   if (base != nullptr && options_.max_background_compactions <= 1 &&
-      !options_.hybrid_mode) {
+      options_.compaction_style == kCompactionStyleLevel) {
     level = base->PickLevelForMemTableOutput(min_user_key, max_user_key);
   }
   edit->AddFile(level, meta.number, meta.file_size,
@@ -1578,7 +1578,8 @@ Status DBImpl::InstallCompactionResults(CompactionState* compact) {
   for (size_t i = 0; i < compact->outputs.size(); i++) {
     const CompactionState::Output& out = compact->outputs[i];
     compact->compaction->edit()->AddFile(
-        options_.hybrid_mode? level : level + 1,
+        (options_.compaction_style == kCompactionStyleUniversal) ?
+            level : level + 1,
         out.number, out.file_size, out.smallest, out.largest,
         out.smallest_seqno, out.largest_seqno);
   }
@@ -1828,9 +1829,9 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
     // If this is the bottommost level (no files in lower levels)
     // and the earliest snapshot is larger than this seqno
     // then we can squash the seqno to zero.
-    // Hybrid mode depends on the sequence number to determine
+    // Universal mode depends on the sequence number to determine
     // time-order of files that is needed for compactions.
-    if (!options_.hybrid_mode &&
+    if (options_.compaction_style == kCompactionStyleLevel &&
         bottommost_level && ikey.sequence < earliest_snapshot &&
         ikey.type != kTypeMerge) {
       assert(ikey.type != kTypeDeletion);
diff --git a/db/version_set.cc b/db/version_set.cc
index 0398bf6f6..3cd2b6683 100644
--- a/db/version_set.cc
+++ b/db/version_set.cc
@@ -384,7 +384,7 @@ void Version::Get(const ReadOptions& options,
     }
     if (tmp.empty()) continue;

-    if (vset_->options_->hybrid_mode) {
+    if (vset_->options_->compaction_style == kCompactionStyleUniversal) {
       std::sort(tmp.begin(), tmp.end(), NewestFirstBySeqNo);
     } else {
       std::sort(tmp.begin(), tmp.end(), NewestFirst);
@@ -1581,7 +1581,7 @@ static bool compareSizeDescending(const VersionSet::Fsize& first,
   return (first.file->file_size > second.file->file_size);
 }
 // A static comparator used to sort files based on their seqno
-// In hybrid mode: descending seqno
+// In universal style: descending seqno
 static bool compareSeqnoDescending(const VersionSet::Fsize& first,
                                    const VersionSet::Fsize& second) {
   if (first.file->smallest_seqno > second.file->smallest_seqno) {
@@ -1596,8 +1596,8 @@ static bool compareSeqnoDescending(const VersionSet::Fsize& first,

 void VersionSet::UpdateFilesBySize(Version* v) {
   // No need to sort the highest level because it is never compacted.
-  int max_level = options_->hybrid_mode? NumberLevels() :
-                  NumberLevels() - 1;
+  int max_level = (options_->compaction_style == kCompactionStyleUniversal) ? 
+                  NumberLevels() : NumberLevels() - 1;

   for (int level = 0; level < max_level; level++) {

@@ -1613,7 +1613,7 @@ void VersionSet::UpdateFilesBySize(Version* v) {
     }

     // sort the top number_of_files_to_sort_ based on file size
-    if (options_->hybrid_mode) {
+    if (options_->compaction_style == kCompactionStyleUniversal) {
       int num = temp.size();
       std::partial_sort(temp.begin(), temp.begin() + num,
                         temp.end(), compareSeqnoDescending);
@@ -2020,7 +2020,11 @@ Compaction* VersionSet::PickCompactionHybrid(int level, double score) {
   assert (level == 0);

   // percentage flexibility while comparing file sizes
-  uint64_t ratio = (uint64_t)options_->hybrid_size_ratio;
+  uint64_t ratio = options_->compaction_options_universal.size_ratio;
+  unsigned int min_merge_width =
+    options_->compaction_options_universal.min_merge_width;
+  unsigned int max_merge_width =
+    options_->compaction_options_universal.max_merge_width;

   if ((current_->files_[level].size() <=
       (unsigned int)options_->level0_file_num_compaction_trigger)) {
@@ -2043,7 +2047,7 @@ Compaction* VersionSet::PickCompactionHybrid(int level, double score) {
   bool done = false;
   assert(file_by_time.size() == current_->files_[level].size());

-  unsigned int max_files_to_compact = UINT_MAX;
+  unsigned int max_files_to_compact = std::min(max_merge_width, UINT_MAX);

   // Make two passes. The first pass considers a candidate file
@@ -2098,8 +2102,7 @@ Compaction* VersionSet::PickCompactionHybrid(int level, double score) {
       }

       // Found a series of consecutive files that need compaction.
-      if (candidate_count >= (unsigned int)
-          options_->hybrid_min_numfiles_in_single_compaction) {
+      if (candidate_count >= (unsigned int)min_merge_width) {
         for (unsigned int i = loop; i < loop + candidate_count; i++) {
           int index = file_by_time[i];
           FileMetaData* f = current_->files_[level][index];
@@ -2141,8 +2144,8 @@ Compaction* VersionSet::PickCompactionHybrid(int level, double score) {
           options_->level0_file_num_compaction_trigger + 1) {
         done = true;     // nothing more to do
       } else {
-        max_files_to_compact = expected_num_files -
-          options_->level0_file_num_compaction_trigger;
+        max_files_to_compact = std::min((int)max_merge_width,
+          expected_num_files - options_->level0_file_num_compaction_trigger);
         Log(options_->info_log, "Hybrid: second loop with maxfiles %d",
             max_files_to_compact);
       }
@@ -2264,8 +2267,8 @@ Compaction* VersionSet::PickCompaction() {
   current_->vset_->SizeBeingCompacted(size_being_compacted);
   Finalize(current_, size_being_compacted);

-  // In hybrid mode compact L0 files back into L0.
-  if (options_->hybrid_mode) {
+  // In universal style of compaction, compact L0 files back into L0.
+  if (options_->compaction_style == kCompactionStyleUniversal) {
     int level = 0;
     c = PickCompactionHybrid(level, current_->compaction_score_[level]);
     return c;
@@ -2473,7 +2476,7 @@ Compaction* VersionSet::CompactRange(
       }
     }
   }
-  int out_level = options_->hybrid_mode ? level : level+1;
+  int out_level = (options_->compaction_style == kCompactionStyleUniversal) ? 
+                  level : level+1;

   Compaction* c = new Compaction(level, out_level, MaxFileSizeForLevel(level),
     MaxGrandParentOverlapBytes(level), NumberLevels());
diff --git a/include/leveldb/options.h b/include/leveldb/options.h
index b3f85e691..ac16b032c 100644
--- a/include/leveldb/options.h
+++ b/include/leveldb/options.h
@@ -12,6 +12,7 @@
 #include
 #include "leveldb/slice.h"
 #include "leveldb/statistics.h"
+#include "leveldb/universal_compaction.h"

 namespace leveldb {

@@ -39,6 +40,11 @@ enum CompressionType {
   kBZip2Compression = 0x3
 };

+enum CompactionStyle {
+  kCompactionStyleLevel = 0x0,     // level based compaction style
+  kCompactionStyleUniversal = 0x1  // Universal compaction style
+};
+
 // Compression options for different compression algorithms like Zlib
 struct CompressionOptions {
   int window_bits;
@@ -132,7 +138,7 @@ struct Options {
   int max_write_buffer_number;

   // The minimum number of write buffers that will be merged together
-  // before writing to storage.  If set to 1, then
+  // before writing to storage. If set to 1, then
   // all write buffers are flushed to L0 as individual files and this increases
   // read amplification because a get request has to check in all of these
   // files. Also, an in-memory merge may result in writing lesser
@@ -476,17 +482,12 @@ struct Options {
   // Default: 0
   uint64_t bytes_per_sync;

-  // Hybrid Mode. There is only a single level and files in L0 are
+  // The compaction style
   // compacted back into L0. Default: false
-  bool hybrid_mode;
-
-  // Percentage flexibility while comparing file size. If the candidate file(s)
-  // size is no more than 1% smaller than the next file's size, then include
-  // next file into this candidate set.
-  // Default: 1
-  int hybrid_size_ratio;
+  CompactionStyle compaction_style;

-  // The minimum number of files in a single compaction run. Default: 2
-  int hybrid_min_numfiles_in_single_compaction;
+  // The options needed to support Universal Style compactions
+  CompactionOptionsUniversal compaction_options_universal;
 };

 // Options that control read operations
diff --git a/include/leveldb/universal_compaction.h b/include/leveldb/universal_compaction.h
new file mode 100644
index 000000000..df8402ca4
--- /dev/null
+++ b/include/leveldb/universal_compaction.h
@@ -0,0 +1,57 @@
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#ifndef STORAGE_ROCKSDB_UNIVERSAL_COMPACTION_OPTIONS_H
+#define STORAGE_ROCKSDB_UNIVERSAL_COMPACTION_OPTIONS_H
+
+#include
+#include
+#include
+#include
+#include
+#include "leveldb/slice.h"
+#include "leveldb/statistics.h"
+
+namespace leveldb {
+
+//
+// Algorithm used to make a compaction request stop picking new files
+// into a single compaction run
+//
+enum CompactionStopStyle {
+  kCompactionStopStyleSimilarSize, // pick files of similar size
+  kCompactionStopStyleTotalSize    // total size of picked files > next file
+};
+
+class CompactionOptionsUniversal {
+ public:
+
+  // Percentage flexibility while comparing file size. If the candidate file(s)
+  // size is no more than 1% smaller than the next file's size, then include
+  // next file into this candidate set.
+  // Default: 1
+  unsigned int size_ratio;
+
+  // The minimum number of files in a single compaction run. Default: 2
+  unsigned int min_merge_width;
+
+  // The maximum number of files in a single compaction run. 
Default: UINT_MAX
+  unsigned int max_merge_width;
+
+  // The algorithm used to stop picking files into a single compaction run
+  // Default: kCompactionStopStyleTotalSize
+  CompactionStopStyle stop_style;
+
+  // Default set of parameters
+  CompactionOptionsUniversal() :
+    size_ratio(1),
+    min_merge_width(2),
+    max_merge_width(UINT_MAX),
+    stop_style(kCompactionStopStyleTotalSize) {
+  }
+};
+
+}  // namespace leveldb
+
+#endif  // STORAGE_ROCKSDB_UNIVERSAL_COMPACTION_OPTIONS_H
diff --git a/tools/db_stress.cc b/tools/db_stress.cc
index 4c671d84f..cf4951b5c 100644
--- a/tools/db_stress.cc
+++ b/tools/db_stress.cc
@@ -89,7 +89,7 @@ static int FLAGS_max_write_buffer_number = 0;
 static int FLAGS_max_background_compactions = 0;

 // This is initialized to default value of false
-static bool FLAGS_hybrid_mode = false;
+static leveldb::CompactionStyle FLAGS_compaction_style = leveldb::kCompactionStyleLevel;

 // Number of bytes to use as a cache of uncompressed data.
 static long FLAGS_cache_size = 2 * KB * KB * KB;
@@ -933,7 +933,7 @@ class StressTest {
     options.write_buffer_size = FLAGS_write_buffer_size;
     options.max_write_buffer_number = FLAGS_max_write_buffer_number;
     options.max_background_compactions = FLAGS_max_background_compactions;
-    options.hybrid_mode = FLAGS_hybrid_mode;
+    options.compaction_style = FLAGS_compaction_style;
     options.block_size = FLAGS_block_size;
     options.filter_policy = filter_policy_;
     options.max_open_files = FLAGS_open_files;
@@ -1020,8 +1020,8 @@ int main(int argc, char** argv) {
   FLAGS_open_files = leveldb::Options().max_open_files;
   FLAGS_max_background_compactions =
     leveldb::Options().max_background_compactions;
-  FLAGS_hybrid_mode =
-    leveldb::Options().hybrid_mode;
+  FLAGS_compaction_style =
+    leveldb::Options().compaction_style;
   FLAGS_level0_file_num_compaction_trigger =
     leveldb::Options().level0_file_num_compaction_trigger;
   FLAGS_level0_slowdown_writes_trigger =
@@ -1074,8 +1074,8 @@ int main(int argc, char** argv) {
       FLAGS_max_write_buffer_number = n;
     } else if (sscanf(argv[i], "--max_background_compactions=%d%c", &n, &junk) == 1) {
       FLAGS_max_background_compactions = n;
-    } else if (sscanf(argv[i], "--hybrid_mode=%d%c", &n, &junk) == 1) {
-      FLAGS_hybrid_mode = n;
+    } else if (sscanf(argv[i], "--compaction_style=%d%c", &n, &junk) == 1) {
+      FLAGS_compaction_style = (leveldb::CompactionStyle)n;
     } else if (sscanf(argv[i], "--cache_size=%ld%c", &l, &junk) == 1) {
       FLAGS_cache_size = l;
     } else if (sscanf(argv[i], "--block_size=%d%c", &n, &junk) == 1) {
diff --git a/util/ldb_cmd.cc b/util/ldb_cmd.cc
index 780d3d2a4..d4fdf8c48 100644
--- a/util/ldb_cmd.cc
+++ b/util/ldb_cmd.cc
@@ -1070,7 +1070,7 @@ void ApproxSizeCommand::DoCommand() {
   uint64_t sizes[1];
   db_->GetApproximateSizes(ranges, 1, sizes);
   fprintf(stdout, "%ld\n", sizes[0]);
-  /* Wierd that GetApproximateSizes() returns void, although documentation
+  /* Weird that GetApproximateSizes() returns void, although documentation
    * says that it returns a Status object. 
if (!st.ok()) {
     exec_state_ = LDBCommandExecuteResult::FAILED(st.ToString());
diff --git a/util/options.cc b/util/options.cc
index a38434749..c58d9614d 100644
--- a/util/options.cc
+++ b/util/options.cc
@@ -77,9 +77,7 @@ Options::Options()
       access_hint_on_compaction_start(NORMAL),
       use_adaptive_mutex(false),
       bytes_per_sync(0),
-      hybrid_mode(false),
-      hybrid_size_ratio(1),
-      hybrid_min_numfiles_in_single_compaction(2) {
+      compaction_style(kCompactionStyleLevel) {
 }

 static const char* const access_hints[] = {
@@ -220,12 +218,14 @@ Options::Dump(Logger* log) const
         use_adaptive_mutex);
     Log(log," Options.bytes_per_sync: %ld",
         bytes_per_sync);
-    Log(log," Options.hybrid_mode: %d",
-        hybrid_mode);
-    Log(log," Options.hybrid_size_ratio: %d",
-        hybrid_size_ratio);
-    Log(log,"Options.hybrid_min_numfiles_in_single_compaction: %d",
-        hybrid_min_numfiles_in_single_compaction);
+    Log(log," Options.compaction_style: %d",
+        compaction_style);
+    Log(log," Options.compaction_options_universal.size_ratio: %d",
+        compaction_options_universal.size_ratio);
+    Log(log," Options.compaction_options_universal.min_merge_width: %d",
+        compaction_options_universal.min_merge_width);
+    Log(log," Options.compaction_options_universal.max_merge_width: %d",
+        compaction_options_universal.max_merge_width);
 }   // Options::Dump
//

From 7cb8d462d5078088395075ed96f5582e0e34f24c Mon Sep 17 00:00:00 2001
From: Dhruba Borthakur
Date: Tue, 9 Jul 2013 16:08:54 -0700
Subject: [PATCH 4/8] Rename PickCompactionHybrid to PickCompactionUniversal.

Summary:
Rename PickCompactionHybrid to PickCompactionUniversal.
Changed a few LOG messages from "Hybrid:" to "Universal:".

Test Plan:

Reviewers:

CC:

Task ID: #

Blame Rev:
---
 db/version_set.cc         | 20 ++++++++++----------
 db/version_set.h          |  4 ++--
 include/leveldb/options.h |  3 +--
 3 files changed, 13 insertions(+), 14 deletions(-)

diff --git a/db/version_set.cc b/db/version_set.cc
index 3cd2b6683..088550120 100644
--- a/db/version_set.cc
+++ b/db/version_set.cc
@@ -2016,7 +2016,7 @@ void VersionSet::SizeBeingCompacted(std::vector<uint64_t>& sizes) {
   }
 }

-Compaction* VersionSet::PickCompactionHybrid(int level, double score) {
+Compaction* VersionSet::PickCompactionUniversal(int level, double score) {
   assert (level == 0);

   // percentage flexibility while comparing file sizes
@@ -2028,11 +2028,11 @@ Compaction* VersionSet::PickCompactionHybrid(int level, double score) {

   if ((current_->files_[level].size() <=
       (unsigned int)options_->level0_file_num_compaction_trigger)) {
-    Log(options_->info_log, "Hybrid: nothing to do\n");
+    Log(options_->info_log, "Universal: nothing to do\n");
     return nullptr;
   }
   VersionSet::FileSummaryStorage tmp;
-  Log(options_->info_log, "Hybrid: candidate files(%lu): %s\n",
+  Log(options_->info_log, "Universal: candidate files(%lu): %s\n",
       current_->files_[level].size(),
       LevelFileSummary(&tmp, 0));

@@ -2065,7 +2065,7 @@ Compaction* VersionSet::PickCompactionHybrid(int level, double score) {
       if (!f->being_compacted) {
         break;
       }
-      Log(options_->info_log, "Hybrid: file %ld[%d] being compacted, skipping",
+      Log(options_->info_log, "Universal: file %ld[%d] being compacted, skipping",
           f->number, loop);
       f = nullptr;
     }
@@ -2075,7 +2075,7 @@ Compaction* VersionSet::PickCompactionHybrid(int level, double score) {
     unsigned int candidate_count = 1;
     uint64_t candidate_size = f != nullptr? f->file_size : 0;
     if (f != nullptr) {
-      Log(options_->info_log, "Hybrid: Possible candidate file %ld[%d] %s.",
+      Log(options_->info_log, "Universal: Possible candidate file %ld[%d] %s.",
          f->number, loop, iter == 0? 
"" : "forced "); } @@ -2107,7 +2107,7 @@ Compaction* VersionSet::PickCompactionHybrid(int level, double score) { int index = file_by_time[i]; FileMetaData* f = current_->files_[level][index]; c->inputs_[0].push_back(f); - Log(options_->info_log, "Hybrid: Picking file %ld[%d] with size %ld %s", + Log(options_->info_log, "Universal: Picking file %ld[%d] with size %ld %s", f->number, i, f->file_size, (iter == 0 ? "" : "forced")); } @@ -2118,7 +2118,7 @@ Compaction* VersionSet::PickCompactionHybrid(int level, double score) { i < loop + candidate_count && i < file_by_time.size(); i++) { int index = file_by_time[i]; FileMetaData* f = current_->files_[level][index]; - Log(options_->info_log, "Hybrid: Skipping file %ld[%d] with size %ld %d %s", + Log(options_->info_log, "Universal: Skipping file %ld[%d] with size %ld %d %s", f->number, i, f->file_size, f->being_compacted, (iter == 0 ? "" : "forced")); } @@ -2146,13 +2146,13 @@ Compaction* VersionSet::PickCompactionHybrid(int level, double score) { } else { max_files_to_compact = std::min((int)max_merge_width, expected_num_files - options_->level0_file_num_compaction_trigger); - Log(options_->info_log, "Hybrid: second loop with maxfiles %d", + Log(options_->info_log, "Universal: second loop with maxfiles %d", max_files_to_compact); } } } if (c->inputs_[0].size() <= 1) { - Log(options_->info_log, "Hybrid: only %ld files, nothing to do.\n", + Log(options_->info_log, "Universal: only %ld files, nothing to do.\n", c->inputs_[0].size()); delete c; return nullptr; @@ -2270,7 +2270,7 @@ Compaction* VersionSet::PickCompaction() { // In universal style of compaction, compact L0 files back into L0. if (options_->compaction_style == kCompactionStyleUniversal) { int level = 0; - c = PickCompactionHybrid(level, current_->compaction_score_[level]); + c = PickCompactionUniversal(level, current_->compaction_score_[level]); return c; } diff --git a/db/version_set.h b/db/version_set.h index 85c02b973..bec738340 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -366,8 +366,8 @@ class VersionSet { // function will return nullptr. Compaction* PickCompactionBySize(int level, double score); - // Pick files to compact in hybrid mode - Compaction* PickCompactionHybrid(int level, double score); + // Pick files to compact in Universal mode + Compaction* PickCompactionUniversal(int level, double score); // Free up the files that were participated in a compaction void ReleaseCompactionFiles(Compaction* c, Status status); diff --git a/include/leveldb/options.h b/include/leveldb/options.h index ac16b032c..1e88c8c8f 100644 --- a/include/leveldb/options.h +++ b/include/leveldb/options.h @@ -482,8 +482,7 @@ struct Options { // Default: 0 uint64_t bytes_per_sync; - // The compaction style - // compacted back into L0. Default: false + // The compaction style. Default: kCompactionStyleLevel CompactionStyle compaction_style; // The options needed to support Universal Style compactions From 289efe992257f6e6bda379181a78506658661e0a Mon Sep 17 00:00:00 2001 From: Dhruba Borthakur Date: Tue, 9 Jul 2013 16:17:00 -0700 Subject: [PATCH 5/8] Update statistics only if needed. Summary: Update statistics only if needed. 
Test Plan:

Reviewers:

CC:

Task ID: #

Blame Rev:
---
 db/version_set.cc | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/db/version_set.cc b/db/version_set.cc
index 088550120..876e41e84 100644
--- a/db/version_set.cc
+++ b/db/version_set.cc
@@ -2169,8 +2169,10 @@ Compaction* VersionSet::PickCompactionUniversal(int level, double score) {
   }

   // update statistics
-  options_->statistics->measureTime(NUM_FILES_IN_SINGLE_COMPACTION,
-                                    c->inputs_[0].size());
+  if (options_->statistics != nullptr) {
+    options_->statistics->measureTime(NUM_FILES_IN_SINGLE_COMPACTION,
+                                      c->inputs_[0].size());
+  }

   c->input_version_ = current_;
   c->input_version_->Ref();

From 76a4923307a178b66bf294dff3560db0ef62efdb Mon Sep 17 00:00:00 2001
From: Dhruba Borthakur
Date: Wed, 17 Jul 2013 13:56:24 -0700
Subject: [PATCH 6/8] Make file-sizes and grandparentoverlap to be unsigned to
 avoid bad comparisons.

Summary:
The maxGrandParentOverlapBytes_ was signed which was causing
an erroneous comparison between signed and unsigned longs.
This, in turn, was causing compaction-created-output-files
to be very small in size.

Test Plan: make check

Differential Revision: https://reviews.facebook.net/D11727
---
 db/version_set.cc | 22 +++++++++++-----------
 db/version_set.h  |  8 ++++----
 2 files changed, 15 insertions(+), 15 deletions(-)

diff --git a/db/version_set.cc b/db/version_set.cc
index 876e41e84..4b2774ecd 100644
--- a/db/version_set.cc
+++ b/db/version_set.cc
@@ -23,8 +23,8 @@
 namespace leveldb {

-static int64_t TotalFileSize(const std::vector<FileMetaData*>& files) {
-  int64_t sum = 0;
+static uint64_t TotalFileSize(const std::vector<FileMetaData*>& files) {
+  uint64_t sum = 0;
   for (size_t i = 0; i < files.size() && files[i]; i++) {
     sum += files[i]->file_size;
   }
@@ -527,7 +527,7 @@ int Version::PickLevelForMemTableOutput(
       break;
     }
     GetOverlappingInputs(level + 2, &start, &limit, &overlaps);
-    const int64_t sum = TotalFileSize(overlaps);
+    const uint64_t sum = TotalFileSize(overlaps);
     if (sum > vset_->MaxGrandParentOverlapBytes(level)) {
       break;
     }
@@ -1824,14 +1824,14 @@ int64_t VersionSet::NumLevelBytes(int level) const {
 }

 int64_t VersionSet::MaxNextLevelOverlappingBytes() {
-  int64_t result = 0;
+  uint64_t result = 0;
   std::vector<FileMetaData*> overlaps;
   for (int level = 1; level < NumberLevels() - 1; level++) {
     for (size_t i = 0; i < current_->files_[level].size(); i++) {
       const FileMetaData* f = current_->files_[level][i];
       current_->GetOverlappingInputs(level+1, &f->smallest, &f->largest,
                                      &overlaps);
-      const int64_t sum = TotalFileSize(overlaps);
+      const uint64_t sum = TotalFileSize(overlaps);
       if (sum > result) {
         result = sum;
       }
@@ -1927,13 +1927,13 @@ uint64_t VersionSet::MaxFileSizeForLevel(int level) {
   return max_file_size_[level];
 }

-int64_t VersionSet::ExpandedCompactionByteSizeLimit(int level) {
+uint64_t VersionSet::ExpandedCompactionByteSizeLimit(int level) {
   uint64_t result = MaxFileSizeForLevel(level);
   result *= options_->expanded_compaction_factor;
   return result;
 }

-int64_t VersionSet::MaxGrandParentOverlapBytes(int level) {
+uint64_t VersionSet::MaxGrandParentOverlapBytes(int level) {
   uint64_t result = MaxFileSizeForLevel(level);
   result *= options_->max_grandparent_overlap_factor;
   return result;
@@ -2394,10 +2394,10 @@ void VersionSet::SetupOtherInputs(Compaction* c) {
     std::vector<FileMetaData*> expanded0;
     current_->GetOverlappingInputs(level, &all_start, &all_limit,
                                    &expanded0, c->base_index_, nullptr);
-    const int64_t inputs0_size = TotalFileSize(c->inputs_[0]);
-    const int64_t inputs1_size = TotalFileSize(c->inputs_[1]);
-    const int64_t expanded0_size 
= TotalFileSize(expanded0);
-    int64_t limit = ExpandedCompactionByteSizeLimit(level);
+    const uint64_t inputs0_size = TotalFileSize(c->inputs_[0]);
+    const uint64_t inputs1_size = TotalFileSize(c->inputs_[1]);
+    const uint64_t expanded0_size = TotalFileSize(expanded0);
+    uint64_t limit = ExpandedCompactionByteSizeLimit(level);
     if (expanded0.size() > c->inputs_[0].size() &&
         inputs1_size + expanded0_size < limit &&
         !FilesInCompaction(expanded0)) {
diff --git a/db/version_set.h b/db/version_set.h
index bec738340..56eec7ecd 100644
--- a/db/version_set.h
+++ b/db/version_set.h
@@ -422,9 +422,9 @@ class VersionSet {

   bool ManifestContains(const std::string& record) const;

-  int64_t ExpandedCompactionByteSizeLimit(int level);
+  uint64_t ExpandedCompactionByteSizeLimit(int level);

-  int64_t MaxGrandParentOverlapBytes(int level);
+  uint64_t MaxGrandParentOverlapBytes(int level);

   Env* const env_;
   const std::string dbname_;
@@ -554,7 +554,7 @@ class Compaction {
   int level_;
   int out_level_;             // level to which output files are stored
   uint64_t max_output_file_size_;
-  int64_t maxGrandParentOverlapBytes_;
+  uint64_t maxGrandParentOverlapBytes_;
   Version* input_version_;
   VersionEdit* edit_;
   int number_levels_;
@@ -569,7 +569,7 @@ class Compaction {
   std::vector<FileMetaData*> grandparents_;
   size_t grandparent_index_;  // Index in grandparent_starts_
   bool seen_key_;             // Some output key has been seen
-  int64_t overlapped_bytes_;  // Bytes of overlap between current output
+  uint64_t overlapped_bytes_; // Bytes of overlap between current output
                               // and grandparent files
   int base_index_;    // index of the file in files_[level_]
   int parent_index_;  // index of some file with same range in files_[level_+1]

From 9357a53a7daf9071fc7e46bb6b511c318f02fdbe Mon Sep 17 00:00:00 2001
From: Dhruba Borthakur
Date: Wed, 17 Jul 2013 15:08:56 -0700
Subject: [PATCH 7/8] Fix merge problems with options.

Summary:
Fix merge problems with options.

Test Plan:

Reviewers:

CC:

Task ID: #

Blame Rev:
---
 util/options.cc | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/util/options.cc b/util/options.cc
index 0ceba55a7..c2c7e15eb 100644
--- a/util/options.cc
+++ b/util/options.cc
@@ -75,7 +75,7 @@ Options::Options()
       access_hint_on_compaction_start(NORMAL),
       use_adaptive_mutex(false),
       bytes_per_sync(0),
-      compaction_style(kCompactionStyleLevel) {
+      compaction_style(kCompactionStyleLevel), deletes_check_filter_first(false) {
 }

From a91fdf1b998d9a2e5afb636b5b84712de47659d7 Mon Sep 17 00:00:00 2001
From: Dhruba Borthakur
Date: Wed, 24 Jul 2013 14:20:54 -0700
Subject: [PATCH 8/8] The target file size for L0 files was incorrectly set to
 LLONG_MAX.

Summary:
The target file size should be a valid value. Only if
UniversalCompactionStyle is enabled should the max file size
be set to LLONG_MAX. 
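In outline, the per-level target size computed by VersionSet::Init after this change looks roughly like the following (simplified sketch with illustrative names, not the literal code; the actual diff is below):

#include <climits>
#include <stdint.h>

// Simplified: only universal compaction leaves L0 effectively unbounded,
// because L0 files are merged back into L0 and must not be split up.
uint64_t TargetFileSize(int level, bool universal_style,
                        uint64_t base_size, int multiplier) {
  if (level == 0 && universal_style) {
    return ULLONG_MAX;  // no per-file size limit for universal L0
  }
  uint64_t size = base_size;
  for (int i = 1; i < level; i++) {
    size *= multiplier;  // grow the target geometrically per level
  }
  return size;
}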
Test Plan: Reviewers: CC: Task ID: # Blame Rev: --- db/version_set.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/db/version_set.cc b/db/version_set.cc index c8877fb82..79965c962 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -1039,8 +1039,8 @@ void VersionSet::Init(int num_levels) { int target_file_size_multiplier = options_->target_file_size_multiplier; int max_bytes_multiplier = options_->max_bytes_for_level_multiplier; for (int i = 0; i < num_levels; i++) { - if (i == 0) { - max_file_size_[i] = LLONG_MAX; + if (i == 0 && options_->compaction_style == kCompactionStyleUniversal) { + max_file_size_[i] = ULLONG_MAX; level_max_bytes_[i] = options_->max_bytes_for_level_base; } else if (i > 1) { max_file_size_[i] = max_file_size_[i-1] * target_file_size_multiplier;
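As a closing illustration of why patches 6 and 8 above insist on unsigned 64-bit types throughout (standalone example, not code from the patches): in a mixed comparison the signed operand is implicitly converted to unsigned, so a limit that goes negative, or signed sentinels mixed into unsigned arithmetic, can make size checks misfire silently.

#include <cstdint>
#include <cstdio>

int main() {
  int64_t max_overlap = -1;        // a signed limit gone negative
  uint64_t overlapped_bytes = 10;  // an unsigned running total
  // max_overlap is converted to uint64_t here (wrapping to
  // 18446744073709551615), so this guard never fires; compilers
  // warn about the pattern with -Wsign-compare.
  if (overlapped_bytes > max_overlap) {
    std::printf("stop current output file\n");
  } else {
    std::printf("limit silently ignored\n");  // this branch runs
  }
  return 0;
}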