From 1f802773bc11c99e24fcc43bce6ac599e95be6af Mon Sep 17 00:00:00 2001 From: Yanqin Jin Date: Fri, 3 Aug 2018 17:34:07 -0700 Subject: [PATCH] Update JobContext. (#3949) Summary: In the past, we assume that a job modifies a single column family. Therefore, a job can create at most one superversion since each superversion corresponds to one column family. This assumption leads to the fact that a `JobContext` has only one member variable called `superversion_context`. Now we want to support group flush of column families, indicating that each job can create multiple superversions. Therefore, we need to make the following change to accommodate this new feature. Add a vector of `SuperVersionContext` to `JobContext` to support installing superversions for multiple column families in one job context. This PR is a subset of [PR 3752](https://github.com/facebook/rocksdb/pull/3752). Pull Request resolved: https://github.com/facebook/rocksdb/pull/3949 Differential Revision: D8864895 Pulled By: riversand963 fbshipit-source-id: 5937a48817276370d3c8172db9c8aafc826d97ca --- db/db_impl.cc | 12 ++++++------ db/db_impl_compaction_flush.cc | 12 ++++++------ db/db_impl_experimental.cc | 6 +++--- db/job_context.h | 29 ++++++++++++++++++++++++----- 4 files changed, 39 insertions(+), 20 deletions(-) diff --git a/db/db_impl.cc b/db/db_impl.cc index 471853e5c..a9fde0b23 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -2236,9 +2236,9 @@ Status DBImpl::DeleteFile(std::string name) { status = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(), &edit, &mutex_, directories_.GetDbDir()); if (status.ok()) { - InstallSuperVersionAndScheduleWork(cfd, &job_context.superversion_context, - *cfd->GetLatestMutableCFOptions(), - FlushReason::kDeleteFiles); + InstallSuperVersionAndScheduleWork( + cfd, &job_context.superversion_contexts[0], + *cfd->GetLatestMutableCFOptions(), FlushReason::kDeleteFiles); } FindObsoleteFiles(&job_context, false); } // lock released here @@ -2321,9 +2321,9 @@ Status DBImpl::DeleteFilesInRanges(ColumnFamilyHandle* column_family, status = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(), &edit, &mutex_, directories_.GetDbDir()); if (status.ok()) { - InstallSuperVersionAndScheduleWork(cfd, &job_context.superversion_context, - *cfd->GetLatestMutableCFOptions(), - FlushReason::kDeleteFiles); + InstallSuperVersionAndScheduleWork( + cfd, &job_context.superversion_contexts[0], + *cfd->GetLatestMutableCFOptions(), FlushReason::kDeleteFiles); } for (auto* deleted_file : deleted_files) { deleted_file->being_compacted = false; diff --git a/db/db_impl_compaction_flush.cc b/db/db_impl_compaction_flush.cc index 18727e812..0d4afcbda 100644 --- a/db/db_impl_compaction_flush.cc +++ b/db/db_impl_compaction_flush.cc @@ -160,8 +160,8 @@ Status DBImpl::FlushMemTableToOutputFile( } if (s.ok()) { - InstallSuperVersionAndScheduleWork(cfd, &job_context->superversion_context, - mutable_cf_options); + InstallSuperVersionAndScheduleWork( + cfd, &job_context->superversion_contexts[0], mutable_cf_options); if (made_progress) { *made_progress = 1; } @@ -691,7 +691,7 @@ Status DBImpl::CompactFilesImpl( Status status = compaction_job.Install(*c->mutable_cf_options()); if (status.ok()) { InstallSuperVersionAndScheduleWork( - c->column_family_data(), &job_context->superversion_context, + c->column_family_data(), &job_context->superversion_contexts[0], *c->mutable_cf_options(), FlushReason::kManualCompaction); } c->ReleaseCompactionFiles(s); @@ -1775,7 +1775,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, *c->mutable_cf_options(), c->edit(), &mutex_, directories_.GetDbDir()); InstallSuperVersionAndScheduleWork( - c->column_family_data(), &job_context->superversion_context, + c->column_family_data(), &job_context->superversion_contexts[0], *c->mutable_cf_options(), FlushReason::kAutoCompaction); ROCKS_LOG_BUFFER(log_buffer, "[%s] Deleted %d files\n", c->column_family_data()->GetName().c_str(), @@ -1822,7 +1822,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, &mutex_, directories_.GetDbDir()); // Use latest MutableCFOptions InstallSuperVersionAndScheduleWork( - c->column_family_data(), &job_context->superversion_context, + c->column_family_data(), &job_context->superversion_contexts[0], *c->mutable_cf_options(), FlushReason::kAutoCompaction); VersionStorageInfo::LevelSummaryStorage tmp; @@ -1899,7 +1899,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, status = compaction_job.Install(*c->mutable_cf_options()); if (status.ok()) { InstallSuperVersionAndScheduleWork( - c->column_family_data(), &job_context->superversion_context, + c->column_family_data(), &job_context->superversion_contexts[0], *c->mutable_cf_options(), FlushReason::kAutoCompaction); } *made_progress = true; diff --git a/db/db_impl_experimental.cc b/db/db_impl_experimental.cc index def5755a3..47a880199 100644 --- a/db/db_impl_experimental.cc +++ b/db/db_impl_experimental.cc @@ -138,9 +138,9 @@ Status DBImpl::PromoteL0(ColumnFamilyHandle* column_family, int target_level) { status = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(), &edit, &mutex_, directories_.GetDbDir()); if (status.ok()) { - InstallSuperVersionAndScheduleWork( - cfd, &job_context.superversion_context, - *cfd->GetLatestMutableCFOptions()); + InstallSuperVersionAndScheduleWork(cfd, + &job_context.superversion_contexts[0], + *cfd->GetLatestMutableCFOptions()); } } // lock released here LogFlush(immutable_db_options_.info_log); diff --git a/db/job_context.h b/db/job_context.h index aa9a805ba..af640dab2 100644 --- a/db/job_context.h +++ b/db/job_context.h @@ -35,6 +35,14 @@ struct SuperVersionContext { explicit SuperVersionContext(bool create_superversion = false) : new_superversion(create_superversion ? new SuperVersion() : nullptr) {} + explicit SuperVersionContext(SuperVersionContext&& other) + : superversions_to_free(std::move(other.superversions_to_free)), +#ifndef ROCKSDB_DISABLE_STALL_NOTIFICATION + write_stall_notifications(std::move(other.write_stall_notifications)), +#endif + new_superversion(std::move(other.new_superversion)) { + } + void NewSuperVersion() { new_superversion = unique_ptr(new SuperVersion()); } @@ -98,8 +106,15 @@ struct JobContext { } inline bool HaveSomethingToClean() const { + bool sv_have_sth = false; + for (const auto& sv_ctx : superversion_contexts) { + if (sv_ctx.HaveSomethingToDelete()) { + sv_have_sth = true; + break; + } + } return memtables_to_free.size() > 0 || logs_to_free.size() > 0 || - superversion_context.HaveSomethingToDelete(); + sv_have_sth; } // Structure to store information for candidate files to delete. @@ -142,7 +157,8 @@ struct JobContext { // a list of memtables to be free autovector memtables_to_free; - SuperVersionContext superversion_context; + // contexts for installing superversions for multiple column families + std::vector superversion_contexts; autovector logs_to_free; @@ -158,13 +174,14 @@ struct JobContext { size_t num_alive_log_files = 0; uint64_t size_log_to_delete = 0; - explicit JobContext(int _job_id, bool create_superversion = false) - : superversion_context(create_superversion) { + explicit JobContext(int _job_id, bool create_superversion = false) { job_id = _job_id; manifest_file_number = 0; pending_manifest_file_number = 0; log_number = 0; prev_log_number = 0; + superversion_contexts.emplace_back( + SuperVersionContext(create_superversion)); } // For non-empty JobContext Clean() has to be called at least once before @@ -173,7 +190,9 @@ struct JobContext { // doing potentially slow Clean() with locked DB mutex. void Clean() { // free superversions - superversion_context.Clean(); + for (auto& sv_context : superversion_contexts) { + sv_context.Clean(); + } // free pending memtables for (auto m : memtables_to_free) { delete m;