diff --git a/db/memtable_list.cc b/db/memtable_list.cc index 685d34b83..03b3f1a9a 100644 --- a/db/memtable_list.cc +++ b/db/memtable_list.cc @@ -306,17 +306,28 @@ Status MemTableList::InstallMemtableFlushResults( // scan all memtables from the earliest, and commit those // (in that order) that have finished flushing. Memetables // are always committed in the order that they were created. - while (!current_->memlist_.empty() && s.ok()) { - MemTable* m = current_->memlist_.back(); // get the last element + uint64_t batch_file_number = 0; + size_t batch_count = 0; + autovector edit_list; + auto& memlist = current_->memlist_; + // enumerate from the last (earliest) element to see how many batch finished + for (auto it = memlist.rbegin(); it != memlist.rend(); ++it) { + MemTable* m = *it; if (!m->flush_completed_) { break; } + if (it == memlist.rbegin() || batch_file_number != m->file_number_) { + batch_file_number = m->file_number_; + LogToBuffer(log_buffer, "[%s] Level-0 commit table #%" PRIu64 " started", + cfd->GetName().c_str(), m->file_number_); + edit_list.push_back(&m->edit_); + } + batch_count++; + } - LogToBuffer(log_buffer, "[%s] Level-0 commit table #%" PRIu64 " started", - cfd->GetName().c_str(), m->file_number_); - + if (batch_count > 0) { // this can release and reacquire the mutex. - s = vset->LogAndApply(cfd, mutable_cf_options, &m->edit_, mu, db_directory); + s = vset->LogAndApply(cfd, mutable_cf_options, edit_list, mu, db_directory); // we will be changing the version in the next code path, // so we better create a new one, since versions are immutable @@ -325,14 +336,19 @@ Status MemTableList::InstallMemtableFlushResults( // All the later memtables that have the same filenum // are part of the same batch. They can be committed now. uint64_t mem_id = 1; // how many memtables have been flushed. - do { - if (s.ok()) { // commit new state + if (s.ok()) { // commit new state + while (batch_count-- > 0) { + MemTable* m = current_->memlist_.back(); LogToBuffer(log_buffer, "[%s] Level-0 commit table #%" PRIu64 ": memtable #%" PRIu64 " done", cfd->GetName().c_str(), m->file_number_, mem_id); assert(m->file_number_ > 0); current_->Remove(m, to_delete); - } else { + ++mem_id; + } + } else { + for (auto it = current_->memlist_.rbegin(); batch_count-- > 0; it++) { + MemTable* m = *it; // commit failed. setup state so that we can flush again. LogToBuffer(log_buffer, "Level-0 commit table #%" PRIu64 ": memtable #%" PRIu64 " failed", @@ -343,10 +359,9 @@ Status MemTableList::InstallMemtableFlushResults( num_flush_not_started_++; m->file_number_ = 0; imm_flush_needed.store(true, std::memory_order_release); + ++mem_id; } - ++mem_id; - } while (!current_->memlist_.empty() && (nullptr != (m = current_->memlist_.back())) && - (m->file_number_ == file_number)); + } } commit_in_progress_ = false; return s; diff --git a/db/version_set.cc b/db/version_set.cc index 6863f0df8..2bd87386b 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -2077,11 +2077,11 @@ struct VersionSet::ManifestWriter { bool done; InstrumentedCondVar cv; ColumnFamilyData* cfd; - VersionEdit* edit; + const autovector& edit_list; explicit ManifestWriter(InstrumentedMutex* mu, ColumnFamilyData* _cfd, - VersionEdit* e) - : done(false), cv(mu), cfd(_cfd), edit(e) {} + const autovector& e) + : done(false), cv(mu), cfd(_cfd), edit_list(e) {} }; VersionSet::VersionSet(const std::string& dbname, const DBOptions* db_options, @@ -2150,20 +2150,32 @@ void VersionSet::AppendVersion(ColumnFamilyData* column_family_data, Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data, const MutableCFOptions& mutable_cf_options, - VersionEdit* edit, InstrumentedMutex* mu, - Directory* db_directory, bool new_descriptor_log, + const autovector& edit_list, + InstrumentedMutex* mu, Directory* db_directory, + bool new_descriptor_log, const ColumnFamilyOptions* new_cf_options) { mu->AssertHeld(); + // num of edits + auto num_edits = edit_list.size(); + if (num_edits == 0) { + return Status::OK(); + } else if (num_edits > 1) { + // no group commits for column family add or drop + for (auto& edit : edit_list) { + assert(!edit->IsColumnFamilyManipulation()); + } + } // column_family_data can be nullptr only if this is column_family_add. // in that case, we also need to specify ColumnFamilyOptions if (column_family_data == nullptr) { - assert(edit->is_column_family_add_); + assert(num_edits == 1); + assert(edit_list[0]->is_column_family_add_); assert(new_cf_options != nullptr); } // queue our request - ManifestWriter w(mu, column_family_data, edit); + ManifestWriter w(mu, column_family_data, edit_list); manifest_writers_.push_back(&w); while (!w.done && &w != manifest_writers_.front()) { w.cv.Wait(); @@ -2183,7 +2195,7 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data, return Status::ShutdownInProgress(); } - std::vector batch_edits; + autovector batch_edits; Version* v = nullptr; std::unique_ptr builder_guard(nullptr); @@ -2191,24 +2203,26 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data, ManifestWriter* last_writer = &w; assert(!manifest_writers_.empty()); assert(manifest_writers_.front() == &w); - if (edit->IsColumnFamilyManipulation()) { + if (w.edit_list.front()->IsColumnFamilyManipulation()) { // no group commits for column family add or drop - LogAndApplyCFHelper(edit); - batch_edits.push_back(edit); + LogAndApplyCFHelper(w.edit_list.front()); + batch_edits.push_back(w.edit_list.front()); } else { v = new Version(column_family_data, this, current_version_number_++); builder_guard.reset(new BaseReferencedVersionBuilder(column_family_data)); auto* builder = builder_guard->version_builder(); for (const auto& writer : manifest_writers_) { - if (writer->edit->IsColumnFamilyManipulation() || + if (writer->edit_list.front()->IsColumnFamilyManipulation() || writer->cfd->GetID() != column_family_data->GetID()) { // no group commits for column family add or drop // also, group commits across column families are not supported break; } last_writer = writer; - LogAndApplyHelper(column_family_data, builder, v, last_writer->edit, mu); - batch_edits.push_back(last_writer->edit); + for (const auto& edit : writer->edit_list) { + LogAndApplyHelper(column_family_data, builder, v, edit, mu); + batch_edits.push_back(edit); + } } builder->SaveTo(v->storage_info()); } @@ -2231,7 +2245,8 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data, if (new_descriptor_log) { // if we're writing out new snapshot make sure to persist max column family if (column_family_set_->GetMaxColumnFamily() > 0) { - edit->SetMaxColumnFamily(column_family_set_->GetMaxColumnFamily()); + w.edit_list.front()->SetMaxColumnFamily( + column_family_set_->GetMaxColumnFamily()); } } @@ -2242,7 +2257,7 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data, mu->Unlock(); TEST_SYNC_POINT("VersionSet::LogAndApply:WriteManifest"); - if (!edit->IsColumnFamilyManipulation() && + if (!w.edit_list.front()->IsColumnFamilyManipulation() && db_options_->max_open_files == -1) { // unlimited table cache. Pre-load table handle now. // Need to do it out of the mutex. @@ -2268,12 +2283,13 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data, unique_ptr file_writer( new WritableFileWriter(std::move(descriptor_file), opt_env_opts)); - descriptor_log_.reset(new log::Writer(std::move(file_writer), 0, false)); + descriptor_log_.reset( + new log::Writer(std::move(file_writer), 0, false)); s = WriteSnapshot(descriptor_log_.get()); } } - if (!edit->IsColumnFamilyManipulation()) { + if (!w.edit_list.front()->IsColumnFamilyManipulation()) { // This is cpu-heavy operations, which should be called outside mutex. v->PrepareApply(mutable_cf_options, true); } @@ -2315,7 +2331,7 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data, new_manifest_file_size = descriptor_log_->file()->GetFileSize(); } - if (edit->is_column_family_drop_) { + if (w.edit_list.front()->is_column_family_drop_) { TEST_SYNC_POINT("VersionSet::LogAndApply::ColumnFamilyDrop:0"); TEST_SYNC_POINT("VersionSet::LogAndApply::ColumnFamilyDrop:1"); TEST_SYNC_POINT("VersionSet::LogAndApply::ColumnFamilyDrop:2"); @@ -2335,12 +2351,12 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data, // Install the new version if (s.ok()) { - if (edit->is_column_family_add_) { + if (w.edit_list.front()->is_column_family_add_) { // no group commit on column family add assert(batch_edits.size() == 1); assert(new_cf_options != nullptr); - CreateColumnFamily(*new_cf_options, edit); - } else if (edit->is_column_family_drop_) { + CreateColumnFamily(*new_cf_options, w.edit_list.front()); + } else if (w.edit_list.front()->is_column_family_drop_) { assert(batch_edits.size() == 1); column_family_data->SetDropped(); if (column_family_data->Unref()) { @@ -2363,7 +2379,7 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data, manifest_file_number_ = pending_manifest_file_number_; manifest_file_size_ = new_manifest_file_size; - prev_log_number_ = edit->prev_log_number_; + prev_log_number_ = w.edit_list.front()->prev_log_number_; } else { std::string version_edits; for (auto& e : batch_edits) { diff --git a/db/version_set.h b/db/version_set.h index e379d9cce..83270b35b 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -589,6 +589,19 @@ class VersionSet { const MutableCFOptions& mutable_cf_options, VersionEdit* edit, InstrumentedMutex* mu, Directory* db_directory = nullptr, bool new_descriptor_log = false, + const ColumnFamilyOptions* column_family_options = nullptr) { + autovector edit_list; + edit_list.push_back(edit); + return LogAndApply(column_family_data, mutable_cf_options, edit_list, mu, + db_directory, new_descriptor_log, column_family_options); + } + // The batch version. If edit_list.size() > 1, caller must ensure that + // no edit in the list column family add or drop + Status LogAndApply( + ColumnFamilyData* column_family_data, + const MutableCFOptions& mutable_cf_options, + const autovector& edit_list, InstrumentedMutex* mu, + Directory* db_directory = nullptr, bool new_descriptor_log = false, const ColumnFamilyOptions* column_family_options = nullptr); // Recover the last saved descriptor from persistent storage.