From 25d600569db4fa001c0ad7dfe76901311b0b0ccc Mon Sep 17 00:00:00 2001 From: Igor Canadi Date: Wed, 17 Jun 2015 12:37:59 -0700 Subject: [PATCH] Clean up InstallSuperVersion Summary: We go to great lengths to make sure MaybeScheduleFlushOrCompaction() is called outside of write thread. But anyway, it's still called in the mutex, so it's not that much cheaper. This diff removes the "optimization" and cleans up the code a bit. Test Plan: make check Reviewers: rven, sdong Reviewed By: sdong Subscribers: dhruba, leveldb Differential Revision: https://reviews.facebook.net/D40113 --- db/db_impl.cc | 93 ++++++++++++++++---------------------- db/db_impl.h | 16 ++----- db/db_impl_experimental.cc | 4 +- 3 files changed, 45 insertions(+), 68 deletions(-) diff --git a/db/db_impl.cc b/db/db_impl.cc index aca964f84..b767a1ff3 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -95,7 +95,6 @@ void DumpRocksDBBuildVersion(Logger * log); struct DBImpl::WriteContext { autovector superversions_to_free_; autovector memtables_to_free_; - bool schedule_bg_work_ = false; ~WriteContext() { for (auto& sv : superversions_to_free_) { @@ -1249,7 +1248,8 @@ Status DBImpl::FlushMemTableToOutputFile( Status s = flush_job.Run(&file_meta); if (s.ok()) { - InstallSuperVersionBackground(cfd, job_context, mutable_cf_options); + InstallSuperVersionAndScheduleWorkWrapper(cfd, job_context, + mutable_cf_options); if (madeProgress) { *madeProgress = 1; } @@ -1578,8 +1578,8 @@ Status DBImpl::CompactFilesImpl( compaction_job.Install(&status, *c->mutable_cf_options(), &mutex_); if (status.ok()) { - InstallSuperVersionBackground(c->column_family_data(), job_context, - *c->mutable_cf_options()); + InstallSuperVersionAndScheduleWorkWrapper( + c->column_family_data(), job_context, *c->mutable_cf_options()); } c->ReleaseCompactionFiles(s); c.reset(); @@ -1791,7 +1791,7 @@ Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) { status = versions_->LogAndApply(cfd, mutable_cf_options, &edit, &mutex_, directories_.GetDbDir()); - superversion_to_free = InstallSuperVersion( + superversion_to_free = InstallSuperVersionAndScheduleWork( cfd, new_superversion, mutable_cf_options); new_superversion = nullptr; @@ -1945,9 +1945,9 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd, s = write_thread_.EnterWriteThread(&w, 0); assert(s.ok() && !w.done); // No timeout and nobody should do our job - // SetNewMemtableAndNewLogFile() will release and reacquire mutex + // SwitchMemtable() will release and reacquire mutex // during execution - s = SetNewMemtableAndNewLogFile(cfd, &context); + s = SwitchMemtable(cfd, &context); write_thread_.ExitWriteThread(&w, &w, s); cfd->imm()->FlushRequested(); @@ -2410,10 +2410,10 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context, c->inputs(0)->size()); // There are three things that can change compaction score: // 1) When flush or compaction finish. This case is covered by - // InstallSuperVersion() + // InstallSuperVersionAndScheduleWork // 2) When MutableCFOptions changes. This case is also covered by - // InstallSuperVersion(), because this is when the new options take - // effect. + // InstallSuperVersionAndScheduleWork, because this is when the new + // options take effect. // 3) When we Pick a new compaction, we "remove" those files being // compacted from the calculation, which then influences compaction // score. Here we check if we need the new compaction even without the @@ -2449,8 +2449,8 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context, status = versions_->LogAndApply(c->column_family_data(), *c->mutable_cf_options(), c->edit(), &mutex_, directories_.GetDbDir()); - InstallSuperVersionBackground(c->column_family_data(), job_context, - *c->mutable_cf_options()); + InstallSuperVersionAndScheduleWorkWrapper( + c->column_family_data(), job_context, *c->mutable_cf_options()); LogToBuffer(log_buffer, "[%s] Deleted %d files\n", c->column_family_data()->GetName().c_str(), c->num_input_files(0)); @@ -2486,8 +2486,8 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context, *c->mutable_cf_options(), c->edit(), &mutex_, directories_.GetDbDir()); // Use latest MutableCFOptions - InstallSuperVersionBackground(c->column_family_data(), job_context, - *c->mutable_cf_options()); + InstallSuperVersionAndScheduleWorkWrapper( + c->column_family_data(), job_context, *c->mutable_cf_options()); VersionStorageInfo::LevelSummaryStorage tmp; c->column_family_data()->internal_stats()->IncBytesMoved(c->level() + 1, @@ -2532,8 +2532,8 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context, compaction_job.Install(&status, *c->mutable_cf_options(), &mutex_); if (status.ok()) { - InstallSuperVersionBackground(c->column_family_data(), job_context, - *c->mutable_cf_options()); + InstallSuperVersionAndScheduleWorkWrapper( + c->column_family_data(), job_context, *c->mutable_cf_options()); } *madeProgress = true; } @@ -2695,26 +2695,25 @@ Status DBImpl::Get(const ReadOptions& read_options, // * malloc one SuperVersion() outside of the lock -- new_superversion // * delete SuperVersion()s outside of the lock -- superversions_to_free // -// However, if InstallSuperVersion() gets called twice with the same -// job_context, we can't reuse the SuperVersion() that got -// malloced -// because +// However, if InstallSuperVersionAndScheduleWork() gets called twice with the +// same job_context, we can't reuse the SuperVersion() that got +// malloced because // first call already used it. In that rare case, we take a hit and create a // new SuperVersion() inside of the mutex. We do similar thing // for superversion_to_free -void DBImpl::InstallSuperVersionBackground( +void DBImpl::InstallSuperVersionAndScheduleWorkWrapper( ColumnFamilyData* cfd, JobContext* job_context, const MutableCFOptions& mutable_cf_options) { mutex_.AssertHeld(); - SuperVersion* old_superversion = InstallSuperVersion( + SuperVersion* old_superversion = InstallSuperVersionAndScheduleWork( cfd, job_context->new_superversion, mutable_cf_options); job_context->new_superversion = nullptr; job_context->superversions_to_free.push_back(old_superversion); } -SuperVersion* DBImpl::InstallSuperVersion( +SuperVersion* DBImpl::InstallSuperVersionAndScheduleWork( ColumnFamilyData* cfd, SuperVersion* new_sv, - const MutableCFOptions& mutable_cf_options, bool dont_schedule_bg_work) { + const MutableCFOptions& mutable_cf_options) { mutex_.AssertHeld(); // Update max_total_in_memory_state_ @@ -2729,14 +2728,10 @@ SuperVersion* DBImpl::InstallSuperVersion( new_sv ? new_sv : new SuperVersion(), &mutex_, mutable_cf_options); // Whenever we install new SuperVersion, we might need to issue new flushes or - // compactions. dont_schedule_bg_work is true when scheduling from write - // thread and we don't want to add additional overhead. Callers promise to - // call SchedulePendingFlush() and MaybeScheduleFlushOrCompaction() eventually - if (!dont_schedule_bg_work) { - SchedulePendingFlush(cfd); - SchedulePendingCompaction(cfd); - MaybeScheduleFlushOrCompaction(); - } + // compactions. + SchedulePendingFlush(cfd); + SchedulePendingCompaction(cfd); + MaybeScheduleFlushOrCompaction(); // Update max_total_in_memory_state_ max_total_in_memory_state_ = @@ -2947,7 +2942,7 @@ Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& cf_options, auto* cfd = versions_->GetColumnFamilySet()->GetColumnFamily(column_family_name); assert(cfd != nullptr); - delete InstallSuperVersion( + delete InstallSuperVersionAndScheduleWork( cfd, nullptr, *cfd->GetLatestMutableCFOptions()); if (!cfd->mem()->IsSnapshotSupported()) { @@ -3371,15 +3366,15 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, continue; } if (cfd->GetLogNumber() <= flush_column_family_if_log_file) { - status = SetNewMemtableAndNewLogFile(cfd, &context); + status = SwitchMemtable(cfd, &context); if (!status.ok()) { break; } cfd->imm()->FlushRequested(); SchedulePendingFlush(cfd); - context.schedule_bg_work_ = true; } } + MaybeScheduleFlushOrCompaction(); } else if (UNLIKELY(write_buffer_.ShouldFlush())) { Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log, "Flushing all column families. Write buffer is using %" PRIu64 @@ -3392,13 +3387,12 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, continue; } if (!cfd->mem()->IsEmpty()) { - status = SetNewMemtableAndNewLogFile(cfd, &context); + status = SwitchMemtable(cfd, &context); if (!status.ok()) { break; } cfd->imm()->FlushRequested(); SchedulePendingFlush(cfd); - context.schedule_bg_work_ = true; } } MaybeScheduleFlushOrCompaction(); @@ -3414,11 +3408,6 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, if (UNLIKELY(status.ok()) && (write_controller_.IsStopped() || write_controller_.NeedsDelay())) { - // If writer is stopped, we need to get it going, - // so schedule flushes/compactions - if (context.schedule_bg_work_) { - MaybeScheduleFlushOrCompaction(); - } PERF_TIMER_STOP(write_pre_and_post_process_time); PERF_TIMER_GUARD(write_delay_time); // We don't know size of curent batch so that we always use the size @@ -3560,9 +3549,6 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, mutex_.AssertHeld(); write_thread_.ExitWriteThread(&w, last_writer, status); - if (context.schedule_bg_work_) { - MaybeScheduleFlushOrCompaction(); - } mutex_.Unlock(); if (status.IsTimedOut()) { @@ -3633,9 +3619,7 @@ Status DBImpl::DelayWrite(uint64_t num_bytes, uint64_t expiration_time) { Status DBImpl::ScheduleFlushes(WriteContext* context) { ColumnFamilyData* cfd; while ((cfd = flush_scheduler_.GetNextColumnFamily()) != nullptr) { - auto status = SetNewMemtableAndNewLogFile(cfd, context); - SchedulePendingFlush(cfd); - context->schedule_bg_work_ = true; + auto status = SwitchMemtable(cfd, context); if (cfd->Unref()) { delete cfd; } @@ -3648,8 +3632,7 @@ Status DBImpl::ScheduleFlushes(WriteContext* context) { // REQUIRES: mutex_ is held // REQUIRES: this thread is currently at the front of the writer queue -Status DBImpl::SetNewMemtableAndNewLogFile(ColumnFamilyData* cfd, - WriteContext* context) { +Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) { mutex_.AssertHeld(); unique_ptr lfile; log::Writer* new_log = nullptr; @@ -3719,8 +3702,8 @@ Status DBImpl::SetNewMemtableAndNewLogFile(ColumnFamilyData* cfd, cfd->imm()->Add(cfd->mem(), &context->memtables_to_free_); new_mem->Ref(); cfd->SetMemtable(new_mem); - context->superversions_to_free_.push_back( - InstallSuperVersion(cfd, new_superversion, mutable_cf_options, true)); + context->superversions_to_free_.push_back(InstallSuperVersionAndScheduleWork( + cfd, new_superversion, mutable_cf_options)); return s; } @@ -4010,8 +3993,8 @@ Status DBImpl::DeleteFile(std::string name) { status = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(), &edit, &mutex_, directories_.GetDbDir()); if (status.ok()) { - InstallSuperVersionBackground(cfd, &job_context, - *cfd->GetLatestMutableCFOptions()); + InstallSuperVersionAndScheduleWorkWrapper( + cfd, &job_context, *cfd->GetLatestMutableCFOptions()); } FindObsoleteFiles(&job_context, false); } // lock released here @@ -4253,7 +4236,7 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname, } if (s.ok()) { for (auto cfd : *impl->versions_->GetColumnFamilySet()) { - delete impl->InstallSuperVersion( + delete impl->InstallSuperVersionAndScheduleWork( cfd, nullptr, *cfd->GetLatestMutableCFOptions()); } impl->alive_log_files_.push_back( diff --git a/db/db_impl.h b/db/db_impl.h index 89afda987..0d7e1b472 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -440,8 +440,7 @@ class DBImpl : public DB { Status ScheduleFlushes(WriteContext* context); - Status SetNewMemtableAndNewLogFile(ColumnFamilyData* cfd, - WriteContext* context); + Status SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context); // Force current memtable contents to be flushed. Status FlushMemTable(ColumnFamilyData* cfd, const FlushOptions& options); @@ -719,21 +718,16 @@ class DBImpl : public DB { // the InstallSuperVersion() function. Background threads carry // job_context which can have new_superversion already // allocated. - void InstallSuperVersionBackground( + void InstallSuperVersionAndScheduleWorkWrapper( ColumnFamilyData* cfd, JobContext* job_context, const MutableCFOptions& mutable_cf_options); // All ColumnFamily state changes go through this function. Here we analyze // the new state and we schedule background work if we detect that the new // state needs flush or compaction. - // If dont_schedule_bg_work == true, then caller asks us to not schedule flush - // or compaction here, but it also promises to schedule needed background - // work. We use this to scheduling background compactions when we are in the - // write thread, which is very performance critical. Caller schedules - // background work as soon as it exits the write thread - SuperVersion* InstallSuperVersion(ColumnFamilyData* cfd, SuperVersion* new_sv, - const MutableCFOptions& mutable_cf_options, - bool dont_schedule_bg_work = false); + SuperVersion* InstallSuperVersionAndScheduleWork( + ColumnFamilyData* cfd, SuperVersion* new_sv, + const MutableCFOptions& mutable_cf_options); #ifndef ROCKSDB_LITE using DB::GetPropertiesOfAllTables; diff --git a/db/db_impl_experimental.cc b/db/db_impl_experimental.cc index 65529307b..6bf0ba6a1 100644 --- a/db/db_impl_experimental.cc +++ b/db/db_impl_experimental.cc @@ -137,8 +137,8 @@ Status DBImpl::PromoteL0(ColumnFamilyHandle* column_family, int target_level) { status = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(), &edit, &mutex_, directories_.GetDbDir()); if (status.ok()) { - InstallSuperVersionBackground(cfd, &job_context, - *cfd->GetLatestMutableCFOptions()); + InstallSuperVersionAndScheduleWorkWrapper( + cfd, &job_context, *cfd->GetLatestMutableCFOptions()); } } // lock released here LogFlush(db_options_.info_log);