From 554c06dd18d32866eb2430497a70c947923a4dbb Mon Sep 17 00:00:00 2001 From: Dhruba Borthakur Date: Thu, 13 Jun 2013 22:09:08 -0700 Subject: [PATCH] Reduce write amplification by merging files in L0 back into L0 Summary: There is a new option called hybrid_mode which, when switched on, causes HBase style compactions. Files from L0 are compacted back into L0. This meat of this compaction algorithm is in PickCompactionHybrid(). All files reside in L0. That means all files have overlapping keys. Each file has a time-bound, i.e. each file contains a range of keys that were inserted around the same time. The start-seqno and the end-seqno refers to the timeframe when these keys were inserted. Files that have contiguous seqno are compacted together into a larger file. All files are ordered from most recent to the oldest. The current compaction algorithm starts to look for candidate files starting from the most recent file. It continues to add more files to the same compaction run as long as the sum of the files chosen till now is smaller than the next candidate file size. This logic needs to be debated and validated. The above logic should reduce write amplification to a large extent... will publish numbers shortly. Test Plan: dbstress runs for 6 hours with no data corruption (tested so far). Differential Revision: https://reviews.facebook.net/D11289 --- db/builder.cc | 9 ++ db/db_bench.cc | 8 ++ db/db_impl.cc | 35 ++++-- db/db_test.cc | 4 +- db/dbformat.h | 9 ++ db/repair.cc | 8 +- db/version_edit.cc | 25 +++- db/version_edit.h | 9 +- db/version_edit_test.cc | 4 +- db/version_set.cc | 254 +++++++++++++++++++++++++++++++++++--- db/version_set.h | 18 ++- include/leveldb/options.h | 3 + tools/db_stress.cc | 8 ++ util/options.cc | 5 +- 14 files changed, 365 insertions(+), 34 deletions(-) diff --git a/db/builder.cc b/db/builder.cc index 2b7c59283..db09c0d8c 100644 --- a/db/builder.cc +++ b/db/builder.cc @@ -28,6 +28,7 @@ Status BuildTable(const std::string& dbname, const SequenceNumber earliest_seqno_in_memtable) { Status s; meta->file_size = 0; + meta->smallest_seqno = meta->largest_seqno = 0; iter->SeekToFirst(); // If the sequence number of the smallest entry in the memtable is @@ -50,6 +51,8 @@ Status BuildTable(const std::string& dbname, // the first key is the smallest key Slice key = iter->key(); meta->smallest.DecodeFrom(key); + meta->smallest_seqno = GetInternalKeySeqno(key); + meta->largest_seqno = meta->smallest_seqno; MergeHelper merge(user_comparator, options.merge_operator, options.info_log.get(), @@ -124,12 +127,18 @@ Status BuildTable(const std::string& dbname, // output last key builder->Add(Slice(prev_key), Slice(prev_value)); meta->largest.DecodeFrom(Slice(prev_key)); + SequenceNumber seqno = GetInternalKeySeqno(Slice(prev_key)); + meta->smallest_seqno = std::min(meta->smallest_seqno, seqno); + meta->largest_seqno = std::max(meta->largest_seqno, seqno); } else { for (; iter->Valid(); iter->Next()) { Slice key = iter->key(); meta->largest.DecodeFrom(key); builder->Add(key, iter->value()); + SequenceNumber seqno = GetInternalKeySeqno(key); + meta->smallest_seqno = std::min(meta->smallest_seqno, seqno); + meta->largest_seqno = std::max(meta->largest_seqno, seqno); } } diff --git a/db/db_bench.cc b/db/db_bench.cc index 179896a77..ad0765a31 100644 --- a/db/db_bench.cc +++ b/db/db_bench.cc @@ -137,6 +137,9 @@ static int FLAGS_min_write_buffer_number_to_merge = 0; // This is initialized to default value of 1 in "main" function. static int FLAGS_max_background_compactions = 0; +// Run database in hybrid mode where all data resides in L0. +static bool FLAGS_hybrid_mode = false; + // Number of bytes to use as a cache of uncompressed data. // Negative means use default settings. static long FLAGS_cache_size = -1; @@ -1104,6 +1107,7 @@ unique_ptr GenerateKeyFromInt(int v, const char* suffix = "") options.min_write_buffer_number_to_merge = FLAGS_min_write_buffer_number_to_merge; options.max_background_compactions = FLAGS_max_background_compactions; + options.hybrid_mode = FLAGS_hybrid_mode; options.block_size = FLAGS_block_size; options.filter_policy = filter_policy_; options.max_open_files = FLAGS_open_files; @@ -1986,6 +1990,8 @@ int main(int argc, char** argv) { FLAGS_open_files = leveldb::Options().max_open_files; FLAGS_max_background_compactions = leveldb::Options().max_background_compactions; + FLAGS_hybrid_mode = + leveldb::Options().hybrid_mode; // Compression test code above refers to FLAGS_block_size FLAGS_block_size = leveldb::Options().block_size; FLAGS_use_os_buffer = leveldb::EnvOptions().use_os_buffer; @@ -2044,6 +2050,8 @@ int main(int argc, char** argv) { FLAGS_min_write_buffer_number_to_merge = n; } else if (sscanf(argv[i], "--max_background_compactions=%d%c", &n, &junk) == 1) { FLAGS_max_background_compactions = n; + } else if (sscanf(argv[i], "--hybrid_mode=%d%c", &n, &junk) == 1) { + FLAGS_hybrid_mode = n; } else if (sscanf(argv[i], "--cache_size=%ld%c", &l, &junk) == 1) { FLAGS_cache_size = l; } else if (sscanf(argv[i], "--block_size=%d%c", &n, &junk) == 1) { diff --git a/db/db_impl.cc b/db/db_impl.cc index c5c156feb..d5bd3cbd9 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -75,6 +75,7 @@ struct DBImpl::CompactionState { uint64_t number; uint64_t file_size; InternalKey smallest, largest; + SequenceNumber smallest_seqno, largest_seqno; }; std::vector outputs; std::list allocated_file_numbers; @@ -759,7 +760,8 @@ Status DBImpl::WriteLevel0TableForRecovery(MemTable* mem, VersionEdit* edit) { int level = 0; if (s.ok() && meta.file_size > 0) { edit->AddFile(level, meta.number, meta.file_size, - meta.smallest, meta.largest); + meta.smallest, meta.largest, + meta.smallest_seqno, meta.largest_seqno); } CompactionStats stats; @@ -833,11 +835,13 @@ Status DBImpl::WriteLevel0Table(std::vector &mems, VersionEdit* edit, // insert files directly into higher levels because some other // threads could be concurrently producing compacted files for // that key range. - if (base != nullptr && options_.max_background_compactions <= 1) { + if (base != nullptr && options_.max_background_compactions <= 1 && + !options_.hybrid_mode) { level = base->PickLevelForMemTableOutput(min_user_key, max_user_key); } edit->AddFile(level, meta.number, meta.file_size, - meta.smallest, meta.largest); + meta.smallest, meta.largest, + meta.smallest_seqno, meta.largest_seqno); } CompactionStats stats; @@ -1356,7 +1360,8 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, FileMetaData* f = c->input(0, 0); c->edit()->DeleteFile(c->level(), f->number); c->edit()->AddFile(c->level() + 1, f->number, f->file_size, - f->smallest, f->largest); + f->smallest, f->largest, + f->smallest_seqno, f->largest_seqno); status = versions_->LogAndApply(c->edit(), &mutex_); VersionSet::LevelSummaryStorage tmp; Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n", @@ -1468,6 +1473,7 @@ Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) { out.number = file_number; out.smallest.Clear(); out.largest.Clear(); + out.smallest_seqno = out.largest_seqno = 0; compact->outputs.push_back(out); // Make the output file @@ -1478,10 +1484,10 @@ Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) { // Over-estimate slightly so we don't end up just barely crossing // the threshold. compact->outfile->SetPreallocationBlockSize( - 1.1 * versions_->MaxFileSizeForLevel(compact->compaction->level() + 1)); + 1.1 * versions_->MaxFileSizeForLevel(compact->compaction->output_level())); compact->builder.reset(new TableBuilder(options_, compact->outfile.get(), - compact->compaction->level() + 1)); + compact->compaction->output_level())); } return s; } @@ -1572,8 +1578,9 @@ Status DBImpl::InstallCompactionResults(CompactionState* compact) { for (size_t i = 0; i < compact->outputs.size(); i++) { const CompactionState::Output& out = compact->outputs[i]; compact->compaction->edit()->AddFile( - level + 1, - out.number, out.file_size, out.smallest, out.largest); + options_.hybrid_mode? level : level + 1, + out.number, out.file_size, out.smallest, out.largest, + out.smallest_seqno, out.largest_seqno); } return versions_->LogAndApply(compact->compaction->edit(), &mutex_); } @@ -1821,7 +1828,10 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { // If this is the bottommost level (no files in lower levels) // and the earliest snapshot is larger than this seqno // then we can squash the seqno to zero. - if (bottommost_level && ikey.sequence < earliest_snapshot && + // Hybrid mode depends on the sequence number to determine + // time-order of files that is needed for compactions. + if (!options_.hybrid_mode && + bottommost_level && ikey.sequence < earliest_snapshot && ikey.type != kTypeMerge) { assert(ikey.type != kTypeDeletion); // make a copy because updating in place would cause problems @@ -1841,11 +1851,18 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { break; } } + SequenceNumber seqno = GetInternalKeySeqno(newkey); if (compact->builder->NumEntries() == 0) { compact->current_output()->smallest.DecodeFrom(newkey); + compact->current_output()->smallest_seqno = seqno; + } else { + compact->current_output()->smallest_seqno = + std::min(compact->current_output()->smallest_seqno, seqno); } compact->current_output()->largest.DecodeFrom(newkey); compact->builder->Add(newkey, value); + compact->current_output()->largest_seqno = + std::max(compact->current_output()->largest_seqno, seqno); // Close output file if it is big enough if (compact->builder->FileSize() >= diff --git a/db/db_test.cc b/db/db_test.cc index 7ce4ac419..8275029b4 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -3306,7 +3306,7 @@ void BM_LogAndApply(int iters, int num_base_files) { for (int i = 0; i < num_base_files; i++) { InternalKey start(MakeKey(2*fnum), 1, kTypeValue); InternalKey limit(MakeKey(2*fnum+1), 1, kTypeDeletion); - vbase.AddFile(2, fnum++, 1 /* file size */, start, limit); + vbase.AddFile(2, fnum++, 1 /* file size */, start, limit, 1, 1); } ASSERT_OK(vset.LogAndApply(&vbase, &mu)); @@ -3317,7 +3317,7 @@ void BM_LogAndApply(int iters, int num_base_files) { vedit.DeleteFile(2, fnum); InternalKey start(MakeKey(2*fnum), 1, kTypeValue); InternalKey limit(MakeKey(2*fnum+1), 1, kTypeDeletion); - vedit.AddFile(2, fnum++, 1 /* file size */, start, limit); + vedit.AddFile(2, fnum++, 1 /* file size */, start, limit, 1, 1); vset.LogAndApply(&vedit, &mu); } uint64_t stop_micros = env->NowMicros(); diff --git a/db/dbformat.h b/db/dbformat.h index 1a0516670..5d596ad1a 100644 --- a/db/dbformat.h +++ b/db/dbformat.h @@ -173,6 +173,15 @@ inline void UpdateInternalKey(char* internal_key, EncodeFixed64(seqtype, newval); } +// Get the sequence number from the internal key +inline uint64_t GetInternalKeySeqno(const Slice& internal_key) { + const size_t n = internal_key.size(); + assert(n >= 8); + uint64_t num = DecodeFixed64(internal_key.data() + n - 8); + return num >> 8; +} + + // A helper class useful for DBImpl::Get() class LookupKey { public: diff --git a/db/repair.cc b/db/repair.cc index d1c0c4525..09406781a 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -88,6 +88,7 @@ class Repairer { private: struct TableInfo { FileMetaData meta; + SequenceNumber min_sequence; SequenceNumber max_sequence; }; @@ -263,6 +264,7 @@ class Repairer { ReadOptions(), storage_options_, t->meta.number, t->meta.file_size); bool empty = true; ParsedInternalKey parsed; + t->min_sequence = 0; t->max_sequence = 0; for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { Slice key = iter->key(); @@ -279,6 +281,9 @@ class Repairer { t->meta.smallest.DecodeFrom(key); } t->meta.largest.DecodeFrom(key); + if (parsed.sequence < t->min_sequence) { + t->min_sequence = parsed.sequence; + } if (parsed.sequence > t->max_sequence) { t->max_sequence = parsed.sequence; } @@ -319,7 +324,8 @@ class Repairer { // TODO(opt): separate out into multiple levels const TableInfo& t = tables_[i]; edit_->AddFile(0, t.meta.number, t.meta.file_size, - t.meta.smallest, t.meta.largest); + t.meta.smallest, t.meta.largest, + t.min_sequence, t.max_sequence); } //fprintf(stderr, "NewDescriptor:\n%s\n", edit_.DebugString().c_str()); diff --git a/db/version_edit.cc b/db/version_edit.cc index ed63c1013..65a63947a 100644 --- a/db/version_edit.cc +++ b/db/version_edit.cc @@ -20,7 +20,10 @@ enum Tag { kDeletedFile = 6, kNewFile = 7, // 8 was used for large value refs - kPrevLogNumber = 9 + kPrevLogNumber = 9, + + // these are new formats divergent from open source leveldb + kNewFile2 = 100 // store smallest & largest seqno }; void VersionEdit::Clear() { @@ -76,12 +79,14 @@ void VersionEdit::EncodeTo(std::string* dst) const { for (size_t i = 0; i < new_files_.size(); i++) { const FileMetaData& f = new_files_[i].second; - PutVarint32(dst, kNewFile); + PutVarint32(dst, kNewFile2); PutVarint32(dst, new_files_[i].first); // level PutVarint64(dst, f.number); PutVarint64(dst, f.file_size); PutLengthPrefixedSlice(dst, f.smallest.Encode()); PutLengthPrefixedSlice(dst, f.largest.Encode()); + PutVarint64(dst, f.smallest_seqno); + PutVarint64(dst, f.largest_seqno); } } @@ -201,6 +206,22 @@ Status VersionEdit::DecodeFrom(const Slice& src) { } break; + case kNewFile2: + if (GetLevel(&input, &level, &msg) && + GetVarint64(&input, &f.number) && + GetVarint64(&input, &f.file_size) && + GetInternalKey(&input, &f.smallest) && + GetInternalKey(&input, &f.largest) && + GetVarint64(&input, &f.smallest_seqno) && + GetVarint64(&input, &f.largest_seqno) ) { + new_files_.push_back(std::make_pair(level, f)); + } else { + if (!msg) { + msg = "new-file2 entry"; + } + } + break; + default: msg = "unknown tag"; break; diff --git a/db/version_edit.h b/db/version_edit.h index 2743e9e0d..7037763b8 100644 --- a/db/version_edit.h +++ b/db/version_edit.h @@ -22,6 +22,8 @@ struct FileMetaData { InternalKey smallest; // Smallest internal key served by table InternalKey largest; // Largest internal key served by table bool being_compacted; // Is this file undergoing compaction? + SequenceNumber smallest_seqno;// The smallest seqno in this file + SequenceNumber largest_seqno; // The largest seqno in this file FileMetaData() : refs(0), allowed_seeks(1 << 30), file_size(0), being_compacted(false) { } @@ -67,12 +69,17 @@ class VersionEdit { void AddFile(int level, uint64_t file, uint64_t file_size, const InternalKey& smallest, - const InternalKey& largest) { + const InternalKey& largest, + const SequenceNumber& smallest_seqno, + const SequenceNumber& largest_seqno) { FileMetaData f; f.number = file; f.file_size = file_size; f.smallest = smallest; f.largest = largest; + f.smallest_seqno = smallest_seqno; + f.largest_seqno = largest_seqno; + assert(smallest_seqno <= largest_seqno); new_files_.push_back(std::make_pair(level, f)); } diff --git a/db/version_edit_test.cc b/db/version_edit_test.cc index b211eb1a9..26b69199e 100644 --- a/db/version_edit_test.cc +++ b/db/version_edit_test.cc @@ -27,7 +27,9 @@ TEST(VersionEditTest, EncodeDecode) { TestEncodeDecode(edit); edit.AddFile(3, kBig + 300 + i, kBig + 400 + i, InternalKey("foo", kBig + 500 + i, kTypeValue), - InternalKey("zoo", kBig + 600 + i, kTypeDeletion)); + InternalKey("zoo", kBig + 600 + i, kTypeDeletion), + kBig + 500 + i, + kBig + 600 + i); edit.DeleteFile(4, kBig + 700 + i); edit.SetCompactPointer(i, InternalKey("x", kBig + 900 + i, kTypeValue)); } diff --git a/db/version_set.cc b/db/version_set.cc index 9b01d935e..a31cbe01a 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -5,6 +5,7 @@ #include "db/version_set.h" #include +#include #include #include "db/filename.h" #include "db/log_reader.h" @@ -309,6 +310,14 @@ static bool SaveValue(void* arg, const Slice& ikey, const Slice& v, bool didIO){ static bool NewestFirst(FileMetaData* a, FileMetaData* b) { return a->number > b->number; } +static bool NewestFirstBySeqNo(FileMetaData* a, FileMetaData* b) { + if (a->smallest_seqno > b->smallest_seqno) { + assert(a->largest_seqno > b->largest_seqno); + return true; + } + assert(a->largest_seqno <= b->largest_seqno); + return false; +} Version::Version(VersionSet* vset, uint64_t version_number) : vset_(vset), next_(this), prev_(this), refs_(0), @@ -375,7 +384,11 @@ void Version::Get(const ReadOptions& options, } if (tmp.empty()) continue; - std::sort(tmp.begin(), tmp.end(), NewestFirst); + if (vset_->options_->hybrid_mode) { + std::sort(tmp.begin(), tmp.end(), NewestFirstBySeqNo); + } else { + std::sort(tmp.begin(), tmp.end(), NewestFirst); + } files = &tmp[0]; num_files = tmp.size(); } else { @@ -1011,7 +1024,10 @@ void VersionSet::Init(int num_levels) { int target_file_size_multiplier = options_->target_file_size_multiplier; int max_bytes_multiplier = options_->max_bytes_for_level_multiplier; for (int i = 0; i < num_levels; i++) { - if (i > 1) { + if (i == 0) { + max_file_size_[i] = LLONG_MAX; + level_max_bytes_[i] = options_->max_bytes_for_level_base; + } else if (i > 1) { max_file_size_[i] = max_file_size_[i-1] * target_file_size_multiplier; level_max_bytes_[i] = level_max_bytes_[i-1] * max_bytes_multiplier * options_->max_bytes_for_level_multiplier_additional[i-1]; @@ -1558,17 +1574,32 @@ void VersionSet::Finalize(Version* v, } } -// a static compator used to sort files based on their size -static bool compareSize(const VersionSet::Fsize& first, +// A static compator used to sort files based on their size +// In normal mode: descending size +static bool compareSizeDescending(const VersionSet::Fsize& first, const VersionSet::Fsize& second) { return (first.file->file_size > second.file->file_size); } +// A static compator used to sort files based on their seqno +// In hybrid mode: descending seqno +static bool compareSeqnoDescending(const VersionSet::Fsize& first, + const VersionSet::Fsize& second) { + if (first.file->smallest_seqno > second.file->smallest_seqno) { + assert(first.file->largest_seqno > second.file->largest_seqno); + return true; + } + assert(first.file->largest_seqno <= second.file->largest_seqno); + return false; +} // sort all files in level1 to level(n-1) based on file size void VersionSet::UpdateFilesBySize(Version* v) { // No need to sort the highest level because it is never compacted. - for (int level = 0; level < NumberLevels()-1; level++) { + int max_level = options_->hybrid_mode? NumberLevels() : + NumberLevels() - 1; + + for (int level = 0; level < max_level; level++) { const std::vector& files = v->files_[level]; std::vector& files_by_size = v->files_by_size_[level]; @@ -1582,12 +1613,18 @@ void VersionSet::UpdateFilesBySize(Version* v) { } // sort the top number_of_files_to_sort_ based on file size - int num = Version::number_of_files_to_sort_; - if (num > (int)temp.size()) { - num = temp.size(); + if (options_->hybrid_mode) { + int num = temp.size(); + std::partial_sort(temp.begin(), temp.begin() + num, + temp.end(), compareSeqnoDescending); + } else { + int num = Version::number_of_files_to_sort_; + if (num > (int)temp.size()) { + num = temp.size(); + } + std::partial_sort(temp.begin(), temp.begin() + num, + temp.end(), compareSizeDescending); } - std::partial_sort(temp.begin(), temp.begin() + num, - temp.end(), compareSize); assert(temp.size() == files.size()); // initialize files_by_size_ @@ -1620,7 +1657,8 @@ Status VersionSet::WriteSnapshot(log::Writer* log) { const std::vector& files = current_->files_[level]; for (size_t i = 0; i < files.size(); i++) { const FileMetaData* f = files[i]; - edit.AddFile(level, f->number, f->file_size, f->smallest, f->largest); + edit.AddFile(level, f->number, f->file_size, f->smallest, f->largest, + f->smallest_seqno, f->largest_seqno); } } @@ -1664,6 +1702,23 @@ const char* VersionSet::LevelDataSizeSummary( return scratch->buffer; } +const char* VersionSet::LevelFileSummary( + FileSummaryStorage* scratch, int level) const { + int len = snprintf(scratch->buffer, sizeof(scratch->buffer), "files_size["); + for (unsigned int i = 0; i < current_->files_[level].size(); i++) { + FileMetaData* f = current_->files_[level][i]; + int sz = sizeof(scratch->buffer) - len; + int ret = snprintf(scratch->buffer + len, sz, "#%ld(seq=%ld,sz=%ld,%d) ", + f->number, f->smallest_seqno, + f->file_size, f->being_compacted); + if (ret < 0 || ret >= sz) + break; + len += ret; + } + snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len, "]"); + return scratch->buffer; +} + // Opens the mainfest file and reads all records // till it finds the record we are looking for. bool VersionSet::ManifestContains(const std::string& record) const { @@ -1961,6 +2016,166 @@ void VersionSet::SizeBeingCompacted(std::vector& sizes) { } } +Compaction* VersionSet::PickCompactionHybrid(int level, double score) { + assert (level == 0); + + // percentage flexibilty while comparing file sizes + uint64_t ratio = 1; + + if ((current_->files_[level].size() <= + (unsigned int)options_->level0_file_num_compaction_trigger)) { + Log(options_->info_log, "XXX Hybrid: nothing to do\n"); + return nullptr; + } + VersionSet::FileSummaryStorage tmp; + Log(options_->info_log, "Hybrid: candidate files(%lu): %s\n", + current_->files_[level].size(), + LevelFileSummary(&tmp, 0)); + + Compaction* c = nullptr; + c = new Compaction(level, level, MaxFileSizeForLevel(level), + LLONG_MAX, NumberLevels()); + c->score_ = score; + + // The files are sorted from newest first to oldest last. + std::vector& file_by_time = current_->files_by_size_[level]; + FileMetaData* f = nullptr; + bool done = false; + assert(file_by_time.size() == current_->files_[level].size()); + + unsigned int max_files_to_compact = UINT_MAX; + + // Make two pass. The first pass considers a candidate file + // only if it is smaller than the total size accumulated so far. + // The second pass does not look at the slope of the + // file-size curve to decide what to pick for compaction. + for (int iter = 0; !done && iter < 2; iter++) { + + for (unsigned int loop = 0; loop < file_by_time.size(); ) { + + // Skip files that are already being compacted + for (f = nullptr; loop < file_by_time.size(); loop++) { + int index = file_by_time[loop]; + f = current_->files_[level][index]; + + if (!f->being_compacted) { + break; + } + Log(options_->info_log, "Hybrid: file %ld[%d] being compacted, skipping", + f->number, loop); + f = nullptr; + } + + // This file is not being compacted. Consider it as the + // first candidate to be compacted. + unsigned int candidate_count = 1; + uint64_t candidate_size = f != nullptr? f->file_size : 0; + if (f != nullptr) { + Log(options_->info_log, "Hybrid: Possible candidate file %ld[%d] %s.", + f->number, loop, iter == 0? "" : "forced "); + } + + // Check if the suceeding files need compaction. + for (unsigned int i = loop+1; + candidate_count < max_files_to_compact && i < file_by_time.size(); + i++) { + int index = file_by_time[i]; + FileMetaData* f = current_->files_[level][index]; + if (f->being_compacted) { + break; + } + // If this is the first iteration, then we pick files if the + // total candidate file size (increased by the specified ratio) + // is still larger than the next candidate file. + if (iter == 0) { + uint64_t sz = (candidate_size * (100 + ratio)) /100; + if (sz < f->file_size) { + break; + } + } + candidate_count++; + candidate_size += f->file_size; + } + + // Found a series of consecutive files that need compaction. + if (candidate_count > 1) { + for (unsigned int i = loop; i < loop + candidate_count; i++) { + int index = file_by_time[i]; + FileMetaData* f = current_->files_[level][index]; + c->inputs_[0].push_back(f); + Log(options_->info_log, "Hybrid: Picking file %ld[%d] with size %ld %s", + f->number, i, f->file_size, + (iter == 0 ? "" : "forced")); + } + done = true; + break; + } else { + for (unsigned int i = loop; + i < loop + candidate_count && i < file_by_time.size(); i++) { + int index = file_by_time[i]; + FileMetaData* f = current_->files_[level][index]; + Log(options_->info_log, "Hybrid: Skipping file %ld[%d] with size %ld %d %s", + f->number, i, f->file_size, f->being_compacted, + (iter == 0 ? "" : "forced")); + } + } + loop += candidate_count; + } + assert(done || c->inputs_[0].size() == 0); + + // If we are unable to find a normal compaction run and we are still + // above the compaction threshold, iterate again to pick compaction + // candidates, this time without considering their size differences. + if (!done) { + int files_not_in_compaction = 0; + for (unsigned int i = 0; i < current_->files_[level].size(); i++) { + f = current_->files_[level][i]; + if (!f->being_compacted) { + files_not_in_compaction++; + } + } + int expected_num_files = files_not_in_compaction + + compactions_in_progress_[level].size(); + if (expected_num_files <= + options_->level0_file_num_compaction_trigger + 1) { + done = true; // nothing more to do + } else { + max_files_to_compact = expected_num_files - + options_->level0_file_num_compaction_trigger; + Log(options_->info_log, "Hybrid: second loop with maxfiles %d", + max_files_to_compact); + } + } + } + if (c->inputs_[0].size() <= 1) { + Log(options_->info_log, "XXX Hybrid: only %ld files, nothing to do.\n", + c->inputs_[0].size()); + delete c; + return nullptr; + } + + // validate that all the chosen files are non overlapping in time + FileMetaData* newerfile __attribute__((unused)) = nullptr; + for (unsigned int i = 0; i < c->inputs_[0].size(); i++) { + FileMetaData* f = c->inputs_[0][i]; + assert (f->smallest_seqno <= f->largest_seqno); + assert(newerfile == nullptr || + newerfile->smallest_seqno > f->largest_seqno); + newerfile = f; + } + + c->input_version_ = current_; + c->input_version_->Ref(); + + // mark all the files that are being compacted + c->MarkFilesBeingCompacted(true); + + // remember this currently undergoing compaction + compactions_in_progress_[level].insert(c); + + return c; +} + Compaction* VersionSet::PickCompactionBySize(int level, double score) { Compaction* c = nullptr; @@ -1974,7 +2189,7 @@ Compaction* VersionSet::PickCompactionBySize(int level, double score) { assert(level >= 0); assert(level+1 < NumberLevels()); - c = new Compaction(level, MaxFileSizeForLevel(level), + c = new Compaction(level, level+1, MaxFileSizeForLevel(level), MaxGrandParentOverlapBytes(level), NumberLevels()); c->score_ = score; @@ -2044,6 +2259,13 @@ Compaction* VersionSet::PickCompaction() { current_->vset_->SizeBeingCompacted(size_being_compacted); Finalize(current_, size_being_compacted); + // In hybrid mode compact L0 files back into L0. + if (options_->hybrid_mode) { + int level = 0; + c = PickCompactionHybrid(level, current_->compaction_score_[level]); + return c; + } + // We prefer compactions triggered by too much data in a level over // the compactions triggered by seeks. // @@ -2072,7 +2294,7 @@ Compaction* VersionSet::PickCompaction() { if (level != 0 || compactions_in_progress_[0].empty()) { if(!ParentRangeInCompaction(&f->smallest, &f->largest, level, &parent_index)) { - c = new Compaction(level, MaxFileSizeForLevel(level), + c = new Compaction(level, level, MaxFileSizeForLevel(level), MaxGrandParentOverlapBytes(level), NumberLevels(), true); c->inputs_[0].push_back(f); c->parent_index_ = parent_index; @@ -2246,8 +2468,9 @@ Compaction* VersionSet::CompactRange( } } } + int out_level = options_->hybrid_mode ? level : level+1; - Compaction* c = new Compaction(level, MaxFileSizeForLevel(level), + Compaction* c = new Compaction(level, out_level, MaxFileSizeForLevel(level), MaxGrandParentOverlapBytes(level), NumberLevels()); c->input_version_ = current_; c->input_version_->Ref(); @@ -2261,10 +2484,11 @@ Compaction* VersionSet::CompactRange( return c; } -Compaction::Compaction(int level, uint64_t target_file_size, +Compaction::Compaction(int level, int out_level, uint64_t target_file_size, uint64_t max_grandparent_overlap_bytes, int number_levels, bool seek_compaction) : level_(level), + out_level_(out_level), max_output_file_size_(target_file_size), maxGrandParentOverlapBytes_(max_grandparent_overlap_bytes), input_version_(nullptr), diff --git a/db/version_set.h b/db/version_set.h index ba924126f..85c02b973 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -340,6 +340,9 @@ class VersionSet { struct LevelSummaryStorage { char buffer[100]; }; + struct FileSummaryStorage { + char buffer[1000]; + }; const char* LevelSummary(LevelSummaryStorage* scratch) const; // printf contents (for debugging) @@ -350,6 +353,10 @@ class VersionSet { // of files per level. Uses *scratch as backing store. const char* LevelDataSizeSummary(LevelSummaryStorage* scratch) const; + // Return a human-readable short (single-line) summary of files + // in a specified level. Uses *scratch as backing store. + const char* LevelFileSummary(FileSummaryStorage* scratch, int level) const; + // Return the size of the current manifest file const uint64_t ManifestFileSize() { return current_->offset_manifest_file_; } @@ -359,6 +366,9 @@ class VersionSet { // function will return nullptr. Compaction* PickCompactionBySize(int level, double score); + // Pick files to compact in hybrid mode + Compaction* PickCompactionHybrid(int level, double score); + // Free up the files that were participated in a compaction void ReleaseCompactionFiles(Compaction* c, Status status); @@ -489,9 +499,12 @@ class Compaction { ~Compaction(); // Return the level that is being compacted. Inputs from "level" - // and "level+1" will be merged to produce a set of "level+1" files. + // will be merged. int level() const { return level_; } + // Outputs will go to this level + int output_level() const { return out_level_; } + // Return the object that holds the edits to the descriptor done // by this compaction. VersionEdit* edit() { return edit_; } @@ -534,11 +547,12 @@ class Compaction { friend class Version; friend class VersionSet; - explicit Compaction(int level, uint64_t target_file_size, + explicit Compaction(int level, int out_level, uint64_t target_file_size, uint64_t max_grandparent_overlap_bytes, int number_levels, bool seek_compaction = false); int level_; + int out_level_; // levels to which output files are stored uint64_t max_output_file_size_; int64_t maxGrandParentOverlapBytes_; Version* input_version_; diff --git a/include/leveldb/options.h b/include/leveldb/options.h index fa69a7eff..465e83a10 100644 --- a/include/leveldb/options.h +++ b/include/leveldb/options.h @@ -476,6 +476,9 @@ struct Options { // Default: 0 uint64_t bytes_per_sync; + // Hybrid Mode. There is only a single level and files in L0 are + // compacted back into L0. Default: false + bool hybrid_mode; }; // Options that control read operations diff --git a/tools/db_stress.cc b/tools/db_stress.cc index 02afe306c..4c671d84f 100644 --- a/tools/db_stress.cc +++ b/tools/db_stress.cc @@ -88,6 +88,9 @@ static int FLAGS_max_write_buffer_number = 0; // This is initialized to default value of 1 in "main" function. static int FLAGS_max_background_compactions = 0; +// This is initialized to default value of false +static bool FLAGS_hybrid_mode = false; + // Number of bytes to use as a cache of uncompressed data. static long FLAGS_cache_size = 2 * KB * KB * KB; @@ -930,6 +933,7 @@ class StressTest { options.write_buffer_size = FLAGS_write_buffer_size; options.max_write_buffer_number = FLAGS_max_write_buffer_number; options.max_background_compactions = FLAGS_max_background_compactions; + options.hybrid_mode = FLAGS_hybrid_mode; options.block_size = FLAGS_block_size; options.filter_policy = filter_policy_; options.max_open_files = FLAGS_open_files; @@ -1016,6 +1020,8 @@ int main(int argc, char** argv) { FLAGS_open_files = leveldb::Options().max_open_files; FLAGS_max_background_compactions = leveldb::Options().max_background_compactions; + FLAGS_hybrid_mode = + leveldb::Options().hybrid_mode; FLAGS_level0_file_num_compaction_trigger = leveldb::Options().level0_file_num_compaction_trigger; FLAGS_level0_slowdown_writes_trigger = @@ -1068,6 +1074,8 @@ int main(int argc, char** argv) { FLAGS_max_write_buffer_number = n; } else if (sscanf(argv[i], "--max_background_compactions=%d%c", &n, &junk) == 1) { FLAGS_max_background_compactions = n; + } else if (sscanf(argv[i], "--hybrid_mode=%d%c", &n, &junk) == 1) { + FLAGS_hybrid_mode = n; } else if (sscanf(argv[i], "--cache_size=%ld%c", &l, &junk) == 1) { FLAGS_cache_size = l; } else if (sscanf(argv[i], "--block_size=%d%c", &n, &junk) == 1) { diff --git a/util/options.cc b/util/options.cc index a8222ad5c..47dae8398 100644 --- a/util/options.cc +++ b/util/options.cc @@ -76,7 +76,8 @@ Options::Options() advise_random_on_open(true), access_hint_on_compaction_start(NORMAL), use_adaptive_mutex(false), - bytes_per_sync(0) { + bytes_per_sync(0), + hybrid_mode(false) { } static const char* const access_hints[] = { @@ -217,6 +218,8 @@ Options::Dump(Logger* log) const use_adaptive_mutex); Log(log," Options.bytes_per_sync: %ld", bytes_per_sync); + Log(log," Options.hybrid_mode: %d", + hybrid_mode); } // Options::Dump //