Clean up InstallSuperVersion

Summary:
We go to great lengths to make sure MaybeScheduleFlushOrCompaction() is called outside of write thread. But anyway, it's still called in the mutex, so it's not that much cheaper.

This diff removes the "optimization" and cleans up the code a bit.

Test Plan: make check

Reviewers: rven, sdong

Reviewed By: sdong

Subscribers: dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D40113
main
Igor Canadi 11 years ago
parent 1369f015ee
commit 25d600569d
  1. 93
      db/db_impl.cc
  2. 16
      db/db_impl.h
  3. 4
      db/db_impl_experimental.cc

@ -95,7 +95,6 @@ void DumpRocksDBBuildVersion(Logger * log);
struct DBImpl::WriteContext { struct DBImpl::WriteContext {
autovector<SuperVersion*> superversions_to_free_; autovector<SuperVersion*> superversions_to_free_;
autovector<MemTable*> memtables_to_free_; autovector<MemTable*> memtables_to_free_;
bool schedule_bg_work_ = false;
~WriteContext() { ~WriteContext() {
for (auto& sv : superversions_to_free_) { for (auto& sv : superversions_to_free_) {
@ -1249,7 +1248,8 @@ Status DBImpl::FlushMemTableToOutputFile(
Status s = flush_job.Run(&file_meta); Status s = flush_job.Run(&file_meta);
if (s.ok()) { if (s.ok()) {
InstallSuperVersionBackground(cfd, job_context, mutable_cf_options); InstallSuperVersionAndScheduleWorkWrapper(cfd, job_context,
mutable_cf_options);
if (madeProgress) { if (madeProgress) {
*madeProgress = 1; *madeProgress = 1;
} }
@ -1578,8 +1578,8 @@ Status DBImpl::CompactFilesImpl(
compaction_job.Install(&status, *c->mutable_cf_options(), &mutex_); compaction_job.Install(&status, *c->mutable_cf_options(), &mutex_);
if (status.ok()) { if (status.ok()) {
InstallSuperVersionBackground(c->column_family_data(), job_context, InstallSuperVersionAndScheduleWorkWrapper(
*c->mutable_cf_options()); c->column_family_data(), job_context, *c->mutable_cf_options());
} }
c->ReleaseCompactionFiles(s); c->ReleaseCompactionFiles(s);
c.reset(); c.reset();
@ -1791,7 +1791,7 @@ Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) {
status = versions_->LogAndApply(cfd, mutable_cf_options, &edit, &mutex_, status = versions_->LogAndApply(cfd, mutable_cf_options, &edit, &mutex_,
directories_.GetDbDir()); directories_.GetDbDir());
superversion_to_free = InstallSuperVersion( superversion_to_free = InstallSuperVersionAndScheduleWork(
cfd, new_superversion, mutable_cf_options); cfd, new_superversion, mutable_cf_options);
new_superversion = nullptr; new_superversion = nullptr;
@ -1945,9 +1945,9 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
s = write_thread_.EnterWriteThread(&w, 0); s = write_thread_.EnterWriteThread(&w, 0);
assert(s.ok() && !w.done); // No timeout and nobody should do our job assert(s.ok() && !w.done); // No timeout and nobody should do our job
// SetNewMemtableAndNewLogFile() will release and reacquire mutex // SwitchMemtable() will release and reacquire mutex
// during execution // during execution
s = SetNewMemtableAndNewLogFile(cfd, &context); s = SwitchMemtable(cfd, &context);
write_thread_.ExitWriteThread(&w, &w, s); write_thread_.ExitWriteThread(&w, &w, s);
cfd->imm()->FlushRequested(); cfd->imm()->FlushRequested();
@ -2410,10 +2410,10 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context,
c->inputs(0)->size()); c->inputs(0)->size());
// There are three things that can change compaction score: // There are three things that can change compaction score:
// 1) When flush or compaction finish. This case is covered by // 1) When flush or compaction finish. This case is covered by
// InstallSuperVersion() // InstallSuperVersionAndScheduleWork
// 2) When MutableCFOptions changes. This case is also covered by // 2) When MutableCFOptions changes. This case is also covered by
// InstallSuperVersion(), because this is when the new options take // InstallSuperVersionAndScheduleWork, because this is when the new
// effect. // options take effect.
// 3) When we Pick a new compaction, we "remove" those files being // 3) When we Pick a new compaction, we "remove" those files being
// compacted from the calculation, which then influences compaction // compacted from the calculation, which then influences compaction
// score. Here we check if we need the new compaction even without the // score. Here we check if we need the new compaction even without the
@ -2449,8 +2449,8 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context,
status = versions_->LogAndApply(c->column_family_data(), status = versions_->LogAndApply(c->column_family_data(),
*c->mutable_cf_options(), c->edit(), *c->mutable_cf_options(), c->edit(),
&mutex_, directories_.GetDbDir()); &mutex_, directories_.GetDbDir());
InstallSuperVersionBackground(c->column_family_data(), job_context, InstallSuperVersionAndScheduleWorkWrapper(
*c->mutable_cf_options()); c->column_family_data(), job_context, *c->mutable_cf_options());
LogToBuffer(log_buffer, "[%s] Deleted %d files\n", LogToBuffer(log_buffer, "[%s] Deleted %d files\n",
c->column_family_data()->GetName().c_str(), c->column_family_data()->GetName().c_str(),
c->num_input_files(0)); c->num_input_files(0));
@ -2486,8 +2486,8 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context,
*c->mutable_cf_options(), c->edit(), *c->mutable_cf_options(), c->edit(),
&mutex_, directories_.GetDbDir()); &mutex_, directories_.GetDbDir());
// Use latest MutableCFOptions // Use latest MutableCFOptions
InstallSuperVersionBackground(c->column_family_data(), job_context, InstallSuperVersionAndScheduleWorkWrapper(
*c->mutable_cf_options()); c->column_family_data(), job_context, *c->mutable_cf_options());
VersionStorageInfo::LevelSummaryStorage tmp; VersionStorageInfo::LevelSummaryStorage tmp;
c->column_family_data()->internal_stats()->IncBytesMoved(c->level() + 1, c->column_family_data()->internal_stats()->IncBytesMoved(c->level() + 1,
@ -2532,8 +2532,8 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context,
compaction_job.Install(&status, *c->mutable_cf_options(), &mutex_); compaction_job.Install(&status, *c->mutable_cf_options(), &mutex_);
if (status.ok()) { if (status.ok()) {
InstallSuperVersionBackground(c->column_family_data(), job_context, InstallSuperVersionAndScheduleWorkWrapper(
*c->mutable_cf_options()); c->column_family_data(), job_context, *c->mutable_cf_options());
} }
*madeProgress = true; *madeProgress = true;
} }
@ -2695,26 +2695,25 @@ Status DBImpl::Get(const ReadOptions& read_options,
// * malloc one SuperVersion() outside of the lock -- new_superversion // * malloc one SuperVersion() outside of the lock -- new_superversion
// * delete SuperVersion()s outside of the lock -- superversions_to_free // * delete SuperVersion()s outside of the lock -- superversions_to_free
// //
// However, if InstallSuperVersion() gets called twice with the same // However, if InstallSuperVersionAndScheduleWork() gets called twice with the
// job_context, we can't reuse the SuperVersion() that got // same job_context, we can't reuse the SuperVersion() that got
// malloced // malloced because
// because
// first call already used it. In that rare case, we take a hit and create a // first call already used it. In that rare case, we take a hit and create a
// new SuperVersion() inside of the mutex. We do similar thing // new SuperVersion() inside of the mutex. We do similar thing
// for superversion_to_free // for superversion_to_free
void DBImpl::InstallSuperVersionBackground( void DBImpl::InstallSuperVersionAndScheduleWorkWrapper(
ColumnFamilyData* cfd, JobContext* job_context, ColumnFamilyData* cfd, JobContext* job_context,
const MutableCFOptions& mutable_cf_options) { const MutableCFOptions& mutable_cf_options) {
mutex_.AssertHeld(); mutex_.AssertHeld();
SuperVersion* old_superversion = InstallSuperVersion( SuperVersion* old_superversion = InstallSuperVersionAndScheduleWork(
cfd, job_context->new_superversion, mutable_cf_options); cfd, job_context->new_superversion, mutable_cf_options);
job_context->new_superversion = nullptr; job_context->new_superversion = nullptr;
job_context->superversions_to_free.push_back(old_superversion); job_context->superversions_to_free.push_back(old_superversion);
} }
SuperVersion* DBImpl::InstallSuperVersion( SuperVersion* DBImpl::InstallSuperVersionAndScheduleWork(
ColumnFamilyData* cfd, SuperVersion* new_sv, ColumnFamilyData* cfd, SuperVersion* new_sv,
const MutableCFOptions& mutable_cf_options, bool dont_schedule_bg_work) { const MutableCFOptions& mutable_cf_options) {
mutex_.AssertHeld(); mutex_.AssertHeld();
// Update max_total_in_memory_state_ // Update max_total_in_memory_state_
@ -2729,14 +2728,10 @@ SuperVersion* DBImpl::InstallSuperVersion(
new_sv ? new_sv : new SuperVersion(), &mutex_, mutable_cf_options); new_sv ? new_sv : new SuperVersion(), &mutex_, mutable_cf_options);
// Whenever we install new SuperVersion, we might need to issue new flushes or // Whenever we install new SuperVersion, we might need to issue new flushes or
// compactions. dont_schedule_bg_work is true when scheduling from write // compactions.
// thread and we don't want to add additional overhead. Callers promise to SchedulePendingFlush(cfd);
// call SchedulePendingFlush() and MaybeScheduleFlushOrCompaction() eventually SchedulePendingCompaction(cfd);
if (!dont_schedule_bg_work) { MaybeScheduleFlushOrCompaction();
SchedulePendingFlush(cfd);
SchedulePendingCompaction(cfd);
MaybeScheduleFlushOrCompaction();
}
// Update max_total_in_memory_state_ // Update max_total_in_memory_state_
max_total_in_memory_state_ = max_total_in_memory_state_ =
@ -2947,7 +2942,7 @@ Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& cf_options,
auto* cfd = auto* cfd =
versions_->GetColumnFamilySet()->GetColumnFamily(column_family_name); versions_->GetColumnFamilySet()->GetColumnFamily(column_family_name);
assert(cfd != nullptr); assert(cfd != nullptr);
delete InstallSuperVersion( delete InstallSuperVersionAndScheduleWork(
cfd, nullptr, *cfd->GetLatestMutableCFOptions()); cfd, nullptr, *cfd->GetLatestMutableCFOptions());
if (!cfd->mem()->IsSnapshotSupported()) { if (!cfd->mem()->IsSnapshotSupported()) {
@ -3371,15 +3366,15 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
continue; continue;
} }
if (cfd->GetLogNumber() <= flush_column_family_if_log_file) { if (cfd->GetLogNumber() <= flush_column_family_if_log_file) {
status = SetNewMemtableAndNewLogFile(cfd, &context); status = SwitchMemtable(cfd, &context);
if (!status.ok()) { if (!status.ok()) {
break; break;
} }
cfd->imm()->FlushRequested(); cfd->imm()->FlushRequested();
SchedulePendingFlush(cfd); SchedulePendingFlush(cfd);
context.schedule_bg_work_ = true;
} }
} }
MaybeScheduleFlushOrCompaction();
} else if (UNLIKELY(write_buffer_.ShouldFlush())) { } else if (UNLIKELY(write_buffer_.ShouldFlush())) {
Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log, Log(InfoLogLevel::INFO_LEVEL, db_options_.info_log,
"Flushing all column families. Write buffer is using %" PRIu64 "Flushing all column families. Write buffer is using %" PRIu64
@ -3392,13 +3387,12 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
continue; continue;
} }
if (!cfd->mem()->IsEmpty()) { if (!cfd->mem()->IsEmpty()) {
status = SetNewMemtableAndNewLogFile(cfd, &context); status = SwitchMemtable(cfd, &context);
if (!status.ok()) { if (!status.ok()) {
break; break;
} }
cfd->imm()->FlushRequested(); cfd->imm()->FlushRequested();
SchedulePendingFlush(cfd); SchedulePendingFlush(cfd);
context.schedule_bg_work_ = true;
} }
} }
MaybeScheduleFlushOrCompaction(); MaybeScheduleFlushOrCompaction();
@ -3414,11 +3408,6 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
if (UNLIKELY(status.ok()) && if (UNLIKELY(status.ok()) &&
(write_controller_.IsStopped() || write_controller_.NeedsDelay())) { (write_controller_.IsStopped() || write_controller_.NeedsDelay())) {
// If writer is stopped, we need to get it going,
// so schedule flushes/compactions
if (context.schedule_bg_work_) {
MaybeScheduleFlushOrCompaction();
}
PERF_TIMER_STOP(write_pre_and_post_process_time); PERF_TIMER_STOP(write_pre_and_post_process_time);
PERF_TIMER_GUARD(write_delay_time); PERF_TIMER_GUARD(write_delay_time);
// We don't know size of curent batch so that we always use the size // We don't know size of curent batch so that we always use the size
@ -3560,9 +3549,6 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
mutex_.AssertHeld(); mutex_.AssertHeld();
write_thread_.ExitWriteThread(&w, last_writer, status); write_thread_.ExitWriteThread(&w, last_writer, status);
if (context.schedule_bg_work_) {
MaybeScheduleFlushOrCompaction();
}
mutex_.Unlock(); mutex_.Unlock();
if (status.IsTimedOut()) { if (status.IsTimedOut()) {
@ -3633,9 +3619,7 @@ Status DBImpl::DelayWrite(uint64_t num_bytes, uint64_t expiration_time) {
Status DBImpl::ScheduleFlushes(WriteContext* context) { Status DBImpl::ScheduleFlushes(WriteContext* context) {
ColumnFamilyData* cfd; ColumnFamilyData* cfd;
while ((cfd = flush_scheduler_.GetNextColumnFamily()) != nullptr) { while ((cfd = flush_scheduler_.GetNextColumnFamily()) != nullptr) {
auto status = SetNewMemtableAndNewLogFile(cfd, context); auto status = SwitchMemtable(cfd, context);
SchedulePendingFlush(cfd);
context->schedule_bg_work_ = true;
if (cfd->Unref()) { if (cfd->Unref()) {
delete cfd; delete cfd;
} }
@ -3648,8 +3632,7 @@ Status DBImpl::ScheduleFlushes(WriteContext* context) {
// REQUIRES: mutex_ is held // REQUIRES: mutex_ is held
// REQUIRES: this thread is currently at the front of the writer queue // REQUIRES: this thread is currently at the front of the writer queue
Status DBImpl::SetNewMemtableAndNewLogFile(ColumnFamilyData* cfd, Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
WriteContext* context) {
mutex_.AssertHeld(); mutex_.AssertHeld();
unique_ptr<WritableFile> lfile; unique_ptr<WritableFile> lfile;
log::Writer* new_log = nullptr; log::Writer* new_log = nullptr;
@ -3719,8 +3702,8 @@ Status DBImpl::SetNewMemtableAndNewLogFile(ColumnFamilyData* cfd,
cfd->imm()->Add(cfd->mem(), &context->memtables_to_free_); cfd->imm()->Add(cfd->mem(), &context->memtables_to_free_);
new_mem->Ref(); new_mem->Ref();
cfd->SetMemtable(new_mem); cfd->SetMemtable(new_mem);
context->superversions_to_free_.push_back( context->superversions_to_free_.push_back(InstallSuperVersionAndScheduleWork(
InstallSuperVersion(cfd, new_superversion, mutable_cf_options, true)); cfd, new_superversion, mutable_cf_options));
return s; return s;
} }
@ -4010,8 +3993,8 @@ Status DBImpl::DeleteFile(std::string name) {
status = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(), status = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(),
&edit, &mutex_, directories_.GetDbDir()); &edit, &mutex_, directories_.GetDbDir());
if (status.ok()) { if (status.ok()) {
InstallSuperVersionBackground(cfd, &job_context, InstallSuperVersionAndScheduleWorkWrapper(
*cfd->GetLatestMutableCFOptions()); cfd, &job_context, *cfd->GetLatestMutableCFOptions());
} }
FindObsoleteFiles(&job_context, false); FindObsoleteFiles(&job_context, false);
} // lock released here } // lock released here
@ -4253,7 +4236,7 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname,
} }
if (s.ok()) { if (s.ok()) {
for (auto cfd : *impl->versions_->GetColumnFamilySet()) { for (auto cfd : *impl->versions_->GetColumnFamilySet()) {
delete impl->InstallSuperVersion( delete impl->InstallSuperVersionAndScheduleWork(
cfd, nullptr, *cfd->GetLatestMutableCFOptions()); cfd, nullptr, *cfd->GetLatestMutableCFOptions());
} }
impl->alive_log_files_.push_back( impl->alive_log_files_.push_back(

@ -440,8 +440,7 @@ class DBImpl : public DB {
Status ScheduleFlushes(WriteContext* context); Status ScheduleFlushes(WriteContext* context);
Status SetNewMemtableAndNewLogFile(ColumnFamilyData* cfd, Status SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context);
WriteContext* context);
// Force current memtable contents to be flushed. // Force current memtable contents to be flushed.
Status FlushMemTable(ColumnFamilyData* cfd, const FlushOptions& options); Status FlushMemTable(ColumnFamilyData* cfd, const FlushOptions& options);
@ -719,21 +718,16 @@ class DBImpl : public DB {
// the InstallSuperVersion() function. Background threads carry // the InstallSuperVersion() function. Background threads carry
// job_context which can have new_superversion already // job_context which can have new_superversion already
// allocated. // allocated.
void InstallSuperVersionBackground( void InstallSuperVersionAndScheduleWorkWrapper(
ColumnFamilyData* cfd, JobContext* job_context, ColumnFamilyData* cfd, JobContext* job_context,
const MutableCFOptions& mutable_cf_options); const MutableCFOptions& mutable_cf_options);
// All ColumnFamily state changes go through this function. Here we analyze // All ColumnFamily state changes go through this function. Here we analyze
// the new state and we schedule background work if we detect that the new // the new state and we schedule background work if we detect that the new
// state needs flush or compaction. // state needs flush or compaction.
// If dont_schedule_bg_work == true, then caller asks us to not schedule flush SuperVersion* InstallSuperVersionAndScheduleWork(
// or compaction here, but it also promises to schedule needed background ColumnFamilyData* cfd, SuperVersion* new_sv,
// work. We use this to scheduling background compactions when we are in the const MutableCFOptions& mutable_cf_options);
// write thread, which is very performance critical. Caller schedules
// background work as soon as it exits the write thread
SuperVersion* InstallSuperVersion(ColumnFamilyData* cfd, SuperVersion* new_sv,
const MutableCFOptions& mutable_cf_options,
bool dont_schedule_bg_work = false);
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
using DB::GetPropertiesOfAllTables; using DB::GetPropertiesOfAllTables;

@ -137,8 +137,8 @@ Status DBImpl::PromoteL0(ColumnFamilyHandle* column_family, int target_level) {
status = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(), status = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(),
&edit, &mutex_, directories_.GetDbDir()); &edit, &mutex_, directories_.GetDbDir());
if (status.ok()) { if (status.ok()) {
InstallSuperVersionBackground(cfd, &job_context, InstallSuperVersionAndScheduleWorkWrapper(
*cfd->GetLatestMutableCFOptions()); cfd, &job_context, *cfd->GetLatestMutableCFOptions());
} }
} // lock released here } // lock released here
LogFlush(db_options_.info_log); LogFlush(db_options_.info_log);

Loading…
Cancel
Save