diff --git a/db/db_impl.cc b/db/db_impl.cc index aca964f84..b767a1ff3 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -95,7 +95,6 @@ void DumpRocksDBBuildVersion(Logger * log); struct DBImpl::WriteContext { autovector superversions_to_free_; autovector memtables_to_free_; - bool schedule_bg_work_ = false; ~WriteContext() { for (auto& sv : superversions_to_free_) { @@ -1249,7 +1248,8 @@ Status DBImpl::FlushMemTableToOutputFile( Status s = flush_job.Run(&file_meta); if (s.ok()) { - InstallSuperVersionBackground(cfd, job_context, mutable_cf_options); + InstallSuperVersionAndScheduleWorkWrapper(cfd, job_context, + mutable_cf_options); if (madeProgress) { *madeProgress = 1; } @@ -1578,8 +1578,8 @@ Status DBImpl::CompactFilesImpl( compaction_job.Install(&status, *c->mutable_cf_options(), &mutex_); if (status.ok()) { - InstallSuperVersionBackground(c->column_family_data(), job_context, - *c->mutable_cf_options()); + InstallSuperVersionAndScheduleWorkWrapper( + c->column_family_data(), job_context, *c->mutable_cf_options()); } c->ReleaseCompactionFiles(s); c.reset(); @@ -1791,7 +1791,7 @@ Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) { status = versions_->LogAndApply(cfd, mutable_cf_options, &edit, &mutex_, directories_.GetDbDir()); - superversion_to_free = InstallSuperVersion( + superversion_to_free = InstallSuperVersionAndScheduleWork( cfd, new_superversion, mutable_cf_options); new_superversion = nullptr; @@ -1945,9 +1945,9 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd, s = write_thread_.EnterWriteThread(&w, 0); assert(s.ok() && !w.done); // No timeout and nobody should do our job - // SetNewMemtableAndNewLogFile() will release and reacquire mutex + // SwitchMemtable() will release and reacquire mutex // during execution - s = SetNewMemtableAndNewLogFile(cfd, &context); + s = SwitchMemtable(cfd, &context); write_thread_.ExitWriteThread(&w, &w, s); cfd->imm()->FlushRequested(); @@ -2410,10 +2410,10 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context, c->inputs(0)->size()); // There are three things that can change compaction score: // 1) When flush or compaction finish. This case is covered by - // InstallSuperVersion() + // InstallSuperVersionAndScheduleWork // 2) When MutableCFOptions changes. This case is also covered by - // InstallSuperVersion(), because this is when the new options take - // effect. + // InstallSuperVersionAndScheduleWork, because this is when the new + // options take effect. // 3) When we Pick a new compaction, we "remove" those files being // compacted from the calculation, which then influences compaction // score. Here we check if we need the new compaction even without the @@ -2449,8 +2449,8 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context, status = versions_->LogAndApply(c->column_family_data(), *c->mutable_cf_options(), c->edit(), &mutex_, directories_.GetDbDir()); - InstallSuperVersionBackground(c->column_family_data(), job_context, - *c->mutable_cf_options()); + InstallSuperVersionAndScheduleWorkWrapper( + c->column_family_data(), job_context, *c->mutable_cf_options()); LogToBuffer(log_buffer, "[%s] Deleted %d files\n", c->column_family_data()->GetName().c_str(), c->num_input_files(0)); @@ -2486,8 +2486,8 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context, *c->mutable_cf_options(), c->edit(), &mutex_, directories_.GetDbDir()); // Use latest MutableCFOptions - InstallSuperVersionBackground(c->column_family_data(), job_context, - *c->mutable_cf_options()); + InstallSuperVersionAndScheduleWorkWrapper( + c->column_family_data(), job_context, *c->mutable_cf_options()); VersionStorageInfo::LevelSummaryStorage tmp; c->column_family_data()->internal_stats()->IncBytesMoved(c->level() + 1, @@ -2532,8 +2532,8 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context, compaction_job.Install(&status, *c->mutable_cf_options(), &mutex_); if (status.ok()) { - InstallSuperVersionBackground(c->column_family_data(), job_context, - *c->mutable_cf_options()); + InstallSuperVersionAndScheduleWorkWrapper( + c->column_family_data(), job_context, *c->mutable_cf_options()); } *madeProgress = true; } @@ -2695,26 +2695,25 @@ Status DBImpl::Get(const ReadOptions& read_options, // * malloc one SuperVersion() outside of the lock -- new_superversion // * delete SuperVersion()s outside of the lock -- superversions_to_free // -// However, if InstallSuperVersion() gets called twice with the same -// job_context, we can't reuse the SuperVersion() that got -// malloced -// because +// However, if InstallSuperVersionAndScheduleWork() gets called twice with the +// same job_context, we can't reuse the SuperVersion() that got +// malloced because // first call already used it. In that rare case, we take a hit and create a // new SuperVersion() inside of the mutex. We do similar thing // for superversion_to_free -void DBImpl::InstallSuperVersionBackground( +void DBImpl::InstallSuperVersionAndScheduleWorkWrapper( ColumnFamilyData* cfd, JobContext* job_context, const MutableCFOptions& mutable_cf_options) { mutex_.AssertHeld(); - SuperVersion* old_superversion = InstallSuperVersion( + SuperVersion* old_superversion = InstallSuperVersionAndScheduleWork( cfd, job_context->new_superversion, mutable_cf_options); job_context->new_superversion = nullptr; job_context->superversions_to_free.push_back(old_superversion); } -SuperVersion* DBImpl::InstallSuperVersion( +SuperVersion* DBImpl::InstallSuperVersionAndScheduleWork( ColumnFamilyData* cfd, SuperVersion* new_sv, - const MutableCFOptions& mutable_cf_options, bool dont_schedule_bg_work) { + const MutableCFOptions& mutable_cf_options) { mutex_.AssertHeld(); // Update max_total_in_memory_state_ @@ -2729,14 +2728,10 @@ SuperVersion* DBImpl::InstallSuperVersion( new_sv ? new_sv : new SuperVersion(), &mutex_, mutable_cf_options); // Whenever we install new SuperVersion, we might need to issue new flushes or - // compactions. dont_schedule_bg_work is true when scheduling from write - // thread and we don't want to add additional overhead. Callers promise to - // call SchedulePendingFlush() and MaybeScheduleFlushOrCompaction() eventually - if (!dont_schedule_bg_work) { - SchedulePendingFlush(cfd); - SchedulePendingCompaction(cfd); - MaybeScheduleFlushOrCompaction(); - } + // compactions. + SchedulePendingFlush(cfd); + SchedulePendingCompaction(cfd); + MaybeScheduleFlushOrCompaction(); // Update max_total_in_memory_state_ max_total_in_memory_state_ = @@ -2947,7 +2942,7 @@ Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& cf_options, auto* cfd = versions_->GetColumnFamilySet()->GetColumnFamily(column_family_name); assert(cfd != nullptr); - delete InstallSuperVersion( + delete InstallSuperVersionAndScheduleWork( cfd, nullptr, *cfd->GetLatestMutableCFOptions()); if (!cfd->mem()->IsSnapshotSupported()) { @@ -3371,15 +3366,15 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, continue; } if (cfd->GetLogNumber() <= flush_column_family_if_log_file) { - status = SetNewMemtableAndNewLogFile(cfd, &context); + status = SwitchMemtable(cfd, &context); if (!status.ok()) { break; } cfd->imm()->FlushRequested(); SchedulePendingFlush(cfd); - context.schedule_bg_work_ = true; } } + MaybeScheduleFlushOrCompaction(); } else if (UNLIKELY(write_buffer_.ShouldFlush())) { Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log, "Flushing all column families. Write buffer is using %" PRIu64 @@ -3392,13 +3387,12 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, continue; } if (!cfd->mem()->IsEmpty()) { - status = SetNewMemtableAndNewLogFile(cfd, &context); + status = SwitchMemtable(cfd, &context); if (!status.ok()) { break; } cfd->imm()->FlushRequested(); SchedulePendingFlush(cfd); - context.schedule_bg_work_ = true; } } MaybeScheduleFlushOrCompaction(); @@ -3414,11 +3408,6 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, if (UNLIKELY(status.ok()) && (write_controller_.IsStopped() || write_controller_.NeedsDelay())) { - // If writer is stopped, we need to get it going, - // so schedule flushes/compactions - if (context.schedule_bg_work_) { - MaybeScheduleFlushOrCompaction(); - } PERF_TIMER_STOP(write_pre_and_post_process_time); PERF_TIMER_GUARD(write_delay_time); // We don't know size of curent batch so that we always use the size @@ -3560,9 +3549,6 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, mutex_.AssertHeld(); write_thread_.ExitWriteThread(&w, last_writer, status); - if (context.schedule_bg_work_) { - MaybeScheduleFlushOrCompaction(); - } mutex_.Unlock(); if (status.IsTimedOut()) { @@ -3633,9 +3619,7 @@ Status DBImpl::DelayWrite(uint64_t num_bytes, uint64_t expiration_time) { Status DBImpl::ScheduleFlushes(WriteContext* context) { ColumnFamilyData* cfd; while ((cfd = flush_scheduler_.GetNextColumnFamily()) != nullptr) { - auto status = SetNewMemtableAndNewLogFile(cfd, context); - SchedulePendingFlush(cfd); - context->schedule_bg_work_ = true; + auto status = SwitchMemtable(cfd, context); if (cfd->Unref()) { delete cfd; } @@ -3648,8 +3632,7 @@ Status DBImpl::ScheduleFlushes(WriteContext* context) { // REQUIRES: mutex_ is held // REQUIRES: this thread is currently at the front of the writer queue -Status DBImpl::SetNewMemtableAndNewLogFile(ColumnFamilyData* cfd, - WriteContext* context) { +Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) { mutex_.AssertHeld(); unique_ptr lfile; log::Writer* new_log = nullptr; @@ -3719,8 +3702,8 @@ Status DBImpl::SetNewMemtableAndNewLogFile(ColumnFamilyData* cfd, cfd->imm()->Add(cfd->mem(), &context->memtables_to_free_); new_mem->Ref(); cfd->SetMemtable(new_mem); - context->superversions_to_free_.push_back( - InstallSuperVersion(cfd, new_superversion, mutable_cf_options, true)); + context->superversions_to_free_.push_back(InstallSuperVersionAndScheduleWork( + cfd, new_superversion, mutable_cf_options)); return s; } @@ -4010,8 +3993,8 @@ Status DBImpl::DeleteFile(std::string name) { status = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(), &edit, &mutex_, directories_.GetDbDir()); if (status.ok()) { - InstallSuperVersionBackground(cfd, &job_context, - *cfd->GetLatestMutableCFOptions()); + InstallSuperVersionAndScheduleWorkWrapper( + cfd, &job_context, *cfd->GetLatestMutableCFOptions()); } FindObsoleteFiles(&job_context, false); } // lock released here @@ -4253,7 +4236,7 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname, } if (s.ok()) { for (auto cfd : *impl->versions_->GetColumnFamilySet()) { - delete impl->InstallSuperVersion( + delete impl->InstallSuperVersionAndScheduleWork( cfd, nullptr, *cfd->GetLatestMutableCFOptions()); } impl->alive_log_files_.push_back( diff --git a/db/db_impl.h b/db/db_impl.h index 89afda987..0d7e1b472 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -440,8 +440,7 @@ class DBImpl : public DB { Status ScheduleFlushes(WriteContext* context); - Status SetNewMemtableAndNewLogFile(ColumnFamilyData* cfd, - WriteContext* context); + Status SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context); // Force current memtable contents to be flushed. Status FlushMemTable(ColumnFamilyData* cfd, const FlushOptions& options); @@ -719,21 +718,16 @@ class DBImpl : public DB { // the InstallSuperVersion() function. Background threads carry // job_context which can have new_superversion already // allocated. - void InstallSuperVersionBackground( + void InstallSuperVersionAndScheduleWorkWrapper( ColumnFamilyData* cfd, JobContext* job_context, const MutableCFOptions& mutable_cf_options); // All ColumnFamily state changes go through this function. Here we analyze // the new state and we schedule background work if we detect that the new // state needs flush or compaction. - // If dont_schedule_bg_work == true, then caller asks us to not schedule flush - // or compaction here, but it also promises to schedule needed background - // work. We use this to scheduling background compactions when we are in the - // write thread, which is very performance critical. Caller schedules - // background work as soon as it exits the write thread - SuperVersion* InstallSuperVersion(ColumnFamilyData* cfd, SuperVersion* new_sv, - const MutableCFOptions& mutable_cf_options, - bool dont_schedule_bg_work = false); + SuperVersion* InstallSuperVersionAndScheduleWork( + ColumnFamilyData* cfd, SuperVersion* new_sv, + const MutableCFOptions& mutable_cf_options); #ifndef ROCKSDB_LITE using DB::GetPropertiesOfAllTables; diff --git a/db/db_impl_experimental.cc b/db/db_impl_experimental.cc index 65529307b..6bf0ba6a1 100644 --- a/db/db_impl_experimental.cc +++ b/db/db_impl_experimental.cc @@ -137,8 +137,8 @@ Status DBImpl::PromoteL0(ColumnFamilyHandle* column_family, int target_level) { status = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(), &edit, &mutex_, directories_.GetDbDir()); if (status.ok()) { - InstallSuperVersionBackground(cfd, &job_context, - *cfd->GetLatestMutableCFOptions()); + InstallSuperVersionAndScheduleWorkWrapper( + cfd, &job_context, *cfd->GetLatestMutableCFOptions()); } } // lock released here LogFlush(db_options_.info_log);