From 5aaef91d4ab3ba2ca6f2566647049c2a2b55281e Mon Sep 17 00:00:00 2001 From: Aaron Gao Date: Tue, 5 Jul 2016 18:09:59 -0700 Subject: [PATCH] group multiple batch of flush into one manifest file (one call to LogAndApply) Summary: Currently, if several flush outputs are committed together, we issue each manifest write per batch (1 batch = 1 flush = 1 sst file = 1+ continuous memtables). Each manifest write requires one fsync and one fsync to parent directory. In some cases, it becomes the bottleneck of write. We should batch them and write in one manifest write when possible. Test Plan: ` ./db_bench -benchmarks="fillseq" -max_write_buffer_number=16 -max_background_flushes=16 -disable_auto_compactions=true -min_write_buffer_number_to_merge=1 -write_buffer_size=65536 -level0_stop_writes_trigger=10000 -level0_slowdown_writes_trigger=10000` **Before** ``` Initializing RocksDB Options from the specified file Initializing RocksDB Options from command-line flags RocksDB: version 4.9 Date: Fri Jul 1 15:38:17 2016 CPU: 32 * Intel(R) Xeon(R) CPU E5-2660 0 @ 2.20GHz CPUCache: 20480 KB Keys: 16 bytes each Values: 100 bytes each (50 bytes after compression) Entries: 1000000 Prefix: 0 bytes Keys per prefix: 0 RawSize: 110.6 MB (estimated) FileSize: 62.9 MB (estimated) Write rate: 0 bytes/second Compression: Snappy Memtablerep: skip_list Perf Level: 1 WARNING: Assertions are enabled; benchmarks unnecessarily slow ------------------------------------------------ Initializing RocksDB Options from the specified file Initializing RocksDB Options from command-line flags DB path: [/tmp/rocksdbtest-112628/dbbench] fillseq : 166.277 micros/op 6014 ops/sec; 0.7 MB/s ``` **After** ``` Initializing RocksDB Options from the specified file Initializing RocksDB Options from command-line flags RocksDB: version 4.9 Date: Fri Jul 1 15:35:05 2016 CPU: 32 * Intel(R) Xeon(R) CPU E5-2660 0 @ 2.20GHz CPUCache: 20480 KB Keys: 16 bytes each Values: 100 bytes each (50 bytes after compression) Entries: 1000000 Prefix: 0 bytes Keys per prefix: 0 RawSize: 110.6 MB (estimated) FileSize: 62.9 MB (estimated) Write rate: 0 bytes/second Compression: Snappy Memtablerep: skip_list Perf Level: 1 WARNING: Assertions are enabled; benchmarks unnecessarily slow ------------------------------------------------ Initializing RocksDB Options from the specified file Initializing RocksDB Options from command-line flags DB path: [/tmp/rocksdbtest-112628/dbbench] fillseq : 52.328 micros/op 19110 ops/sec; 2.1 MB/s ``` Reviewers: andrewkr, IslamAbdelRahman, yhchiang, sdong Reviewed By: sdong Subscribers: igor, andrewkr, dhruba, leveldb Differential Revision: https://reviews.facebook.net/D60075 --- db/memtable_list.cc | 39 +++++++++++++++++++--------- db/version_set.cc | 62 ++++++++++++++++++++++++++++----------------- db/version_set.h | 13 ++++++++++ 3 files changed, 79 insertions(+), 35 deletions(-) diff --git a/db/memtable_list.cc b/db/memtable_list.cc index 685d34b83..03b3f1a9a 100644 --- a/db/memtable_list.cc +++ b/db/memtable_list.cc @@ -306,17 +306,28 @@ Status MemTableList::InstallMemtableFlushResults( // scan all memtables from the earliest, and commit those // (in that order) that have finished flushing. Memetables // are always committed in the order that they were created. - while (!current_->memlist_.empty() && s.ok()) { - MemTable* m = current_->memlist_.back(); // get the last element + uint64_t batch_file_number = 0; + size_t batch_count = 0; + autovector edit_list; + auto& memlist = current_->memlist_; + // enumerate from the last (earliest) element to see how many batch finished + for (auto it = memlist.rbegin(); it != memlist.rend(); ++it) { + MemTable* m = *it; if (!m->flush_completed_) { break; } + if (it == memlist.rbegin() || batch_file_number != m->file_number_) { + batch_file_number = m->file_number_; + LogToBuffer(log_buffer, "[%s] Level-0 commit table #%" PRIu64 " started", + cfd->GetName().c_str(), m->file_number_); + edit_list.push_back(&m->edit_); + } + batch_count++; + } - LogToBuffer(log_buffer, "[%s] Level-0 commit table #%" PRIu64 " started", - cfd->GetName().c_str(), m->file_number_); - + if (batch_count > 0) { // this can release and reacquire the mutex. - s = vset->LogAndApply(cfd, mutable_cf_options, &m->edit_, mu, db_directory); + s = vset->LogAndApply(cfd, mutable_cf_options, edit_list, mu, db_directory); // we will be changing the version in the next code path, // so we better create a new one, since versions are immutable @@ -325,14 +336,19 @@ Status MemTableList::InstallMemtableFlushResults( // All the later memtables that have the same filenum // are part of the same batch. They can be committed now. uint64_t mem_id = 1; // how many memtables have been flushed. - do { - if (s.ok()) { // commit new state + if (s.ok()) { // commit new state + while (batch_count-- > 0) { + MemTable* m = current_->memlist_.back(); LogToBuffer(log_buffer, "[%s] Level-0 commit table #%" PRIu64 ": memtable #%" PRIu64 " done", cfd->GetName().c_str(), m->file_number_, mem_id); assert(m->file_number_ > 0); current_->Remove(m, to_delete); - } else { + ++mem_id; + } + } else { + for (auto it = current_->memlist_.rbegin(); batch_count-- > 0; it++) { + MemTable* m = *it; // commit failed. setup state so that we can flush again. LogToBuffer(log_buffer, "Level-0 commit table #%" PRIu64 ": memtable #%" PRIu64 " failed", @@ -343,10 +359,9 @@ Status MemTableList::InstallMemtableFlushResults( num_flush_not_started_++; m->file_number_ = 0; imm_flush_needed.store(true, std::memory_order_release); + ++mem_id; } - ++mem_id; - } while (!current_->memlist_.empty() && (nullptr != (m = current_->memlist_.back())) && - (m->file_number_ == file_number)); + } } commit_in_progress_ = false; return s; diff --git a/db/version_set.cc b/db/version_set.cc index 6863f0df8..2bd87386b 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -2077,11 +2077,11 @@ struct VersionSet::ManifestWriter { bool done; InstrumentedCondVar cv; ColumnFamilyData* cfd; - VersionEdit* edit; + const autovector& edit_list; explicit ManifestWriter(InstrumentedMutex* mu, ColumnFamilyData* _cfd, - VersionEdit* e) - : done(false), cv(mu), cfd(_cfd), edit(e) {} + const autovector& e) + : done(false), cv(mu), cfd(_cfd), edit_list(e) {} }; VersionSet::VersionSet(const std::string& dbname, const DBOptions* db_options, @@ -2150,20 +2150,32 @@ void VersionSet::AppendVersion(ColumnFamilyData* column_family_data, Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data, const MutableCFOptions& mutable_cf_options, - VersionEdit* edit, InstrumentedMutex* mu, - Directory* db_directory, bool new_descriptor_log, + const autovector& edit_list, + InstrumentedMutex* mu, Directory* db_directory, + bool new_descriptor_log, const ColumnFamilyOptions* new_cf_options) { mu->AssertHeld(); + // num of edits + auto num_edits = edit_list.size(); + if (num_edits == 0) { + return Status::OK(); + } else if (num_edits > 1) { + // no group commits for column family add or drop + for (auto& edit : edit_list) { + assert(!edit->IsColumnFamilyManipulation()); + } + } // column_family_data can be nullptr only if this is column_family_add. // in that case, we also need to specify ColumnFamilyOptions if (column_family_data == nullptr) { - assert(edit->is_column_family_add_); + assert(num_edits == 1); + assert(edit_list[0]->is_column_family_add_); assert(new_cf_options != nullptr); } // queue our request - ManifestWriter w(mu, column_family_data, edit); + ManifestWriter w(mu, column_family_data, edit_list); manifest_writers_.push_back(&w); while (!w.done && &w != manifest_writers_.front()) { w.cv.Wait(); @@ -2183,7 +2195,7 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data, return Status::ShutdownInProgress(); } - std::vector batch_edits; + autovector batch_edits; Version* v = nullptr; std::unique_ptr builder_guard(nullptr); @@ -2191,24 +2203,26 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data, ManifestWriter* last_writer = &w; assert(!manifest_writers_.empty()); assert(manifest_writers_.front() == &w); - if (edit->IsColumnFamilyManipulation()) { + if (w.edit_list.front()->IsColumnFamilyManipulation()) { // no group commits for column family add or drop - LogAndApplyCFHelper(edit); - batch_edits.push_back(edit); + LogAndApplyCFHelper(w.edit_list.front()); + batch_edits.push_back(w.edit_list.front()); } else { v = new Version(column_family_data, this, current_version_number_++); builder_guard.reset(new BaseReferencedVersionBuilder(column_family_data)); auto* builder = builder_guard->version_builder(); for (const auto& writer : manifest_writers_) { - if (writer->edit->IsColumnFamilyManipulation() || + if (writer->edit_list.front()->IsColumnFamilyManipulation() || writer->cfd->GetID() != column_family_data->GetID()) { // no group commits for column family add or drop // also, group commits across column families are not supported break; } last_writer = writer; - LogAndApplyHelper(column_family_data, builder, v, last_writer->edit, mu); - batch_edits.push_back(last_writer->edit); + for (const auto& edit : writer->edit_list) { + LogAndApplyHelper(column_family_data, builder, v, edit, mu); + batch_edits.push_back(edit); + } } builder->SaveTo(v->storage_info()); } @@ -2231,7 +2245,8 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data, if (new_descriptor_log) { // if we're writing out new snapshot make sure to persist max column family if (column_family_set_->GetMaxColumnFamily() > 0) { - edit->SetMaxColumnFamily(column_family_set_->GetMaxColumnFamily()); + w.edit_list.front()->SetMaxColumnFamily( + column_family_set_->GetMaxColumnFamily()); } } @@ -2242,7 +2257,7 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data, mu->Unlock(); TEST_SYNC_POINT("VersionSet::LogAndApply:WriteManifest"); - if (!edit->IsColumnFamilyManipulation() && + if (!w.edit_list.front()->IsColumnFamilyManipulation() && db_options_->max_open_files == -1) { // unlimited table cache. Pre-load table handle now. // Need to do it out of the mutex. @@ -2268,12 +2283,13 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data, unique_ptr file_writer( new WritableFileWriter(std::move(descriptor_file), opt_env_opts)); - descriptor_log_.reset(new log::Writer(std::move(file_writer), 0, false)); + descriptor_log_.reset( + new log::Writer(std::move(file_writer), 0, false)); s = WriteSnapshot(descriptor_log_.get()); } } - if (!edit->IsColumnFamilyManipulation()) { + if (!w.edit_list.front()->IsColumnFamilyManipulation()) { // This is cpu-heavy operations, which should be called outside mutex. v->PrepareApply(mutable_cf_options, true); } @@ -2315,7 +2331,7 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data, new_manifest_file_size = descriptor_log_->file()->GetFileSize(); } - if (edit->is_column_family_drop_) { + if (w.edit_list.front()->is_column_family_drop_) { TEST_SYNC_POINT("VersionSet::LogAndApply::ColumnFamilyDrop:0"); TEST_SYNC_POINT("VersionSet::LogAndApply::ColumnFamilyDrop:1"); TEST_SYNC_POINT("VersionSet::LogAndApply::ColumnFamilyDrop:2"); @@ -2335,12 +2351,12 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data, // Install the new version if (s.ok()) { - if (edit->is_column_family_add_) { + if (w.edit_list.front()->is_column_family_add_) { // no group commit on column family add assert(batch_edits.size() == 1); assert(new_cf_options != nullptr); - CreateColumnFamily(*new_cf_options, edit); - } else if (edit->is_column_family_drop_) { + CreateColumnFamily(*new_cf_options, w.edit_list.front()); + } else if (w.edit_list.front()->is_column_family_drop_) { assert(batch_edits.size() == 1); column_family_data->SetDropped(); if (column_family_data->Unref()) { @@ -2363,7 +2379,7 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data, manifest_file_number_ = pending_manifest_file_number_; manifest_file_size_ = new_manifest_file_size; - prev_log_number_ = edit->prev_log_number_; + prev_log_number_ = w.edit_list.front()->prev_log_number_; } else { std::string version_edits; for (auto& e : batch_edits) { diff --git a/db/version_set.h b/db/version_set.h index e379d9cce..83270b35b 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -589,6 +589,19 @@ class VersionSet { const MutableCFOptions& mutable_cf_options, VersionEdit* edit, InstrumentedMutex* mu, Directory* db_directory = nullptr, bool new_descriptor_log = false, + const ColumnFamilyOptions* column_family_options = nullptr) { + autovector edit_list; + edit_list.push_back(edit); + return LogAndApply(column_family_data, mutable_cf_options, edit_list, mu, + db_directory, new_descriptor_log, column_family_options); + } + // The batch version. If edit_list.size() > 1, caller must ensure that + // no edit in the list column family add or drop + Status LogAndApply( + ColumnFamilyData* column_family_data, + const MutableCFOptions& mutable_cf_options, + const autovector& edit_list, InstrumentedMutex* mu, + Directory* db_directory = nullptr, bool new_descriptor_log = false, const ColumnFamilyOptions* column_family_options = nullptr); // Recover the last saved descriptor from persistent storage.