|
|
@ -247,6 +247,7 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname) |
|
|
|
super_version_(nullptr), |
|
|
|
super_version_(nullptr), |
|
|
|
tmp_batch_(), |
|
|
|
tmp_batch_(), |
|
|
|
bg_compaction_scheduled_(0), |
|
|
|
bg_compaction_scheduled_(0), |
|
|
|
|
|
|
|
bg_manual_only_(0), |
|
|
|
bg_flush_scheduled_(0), |
|
|
|
bg_flush_scheduled_(0), |
|
|
|
bg_logstats_scheduled_(false), |
|
|
|
bg_logstats_scheduled_(false), |
|
|
|
manual_compaction_(nullptr), |
|
|
|
manual_compaction_(nullptr), |
|
|
@ -1605,45 +1606,44 @@ void DBImpl::TEST_CompactRange(int level, const Slice* begin,const Slice* end) { |
|
|
|
|
|
|
|
|
|
|
|
MutexLock l(&mutex_); |
|
|
|
MutexLock l(&mutex_); |
|
|
|
|
|
|
|
|
|
|
|
// When a manual compaction arrives, temporarily throttle down
|
|
|
|
// When a manual compaction arrives, temporarily disable scheduling of
|
|
|
|
// the number of background compaction threads to 1. This is
|
|
|
|
// non-manual compactions and wait until the number of scheduled compaction
|
|
|
|
// needed to ensure that this manual compaction can compact
|
|
|
|
// jobs drops to zero. This is needed to ensure that this manual compaction
|
|
|
|
// any range of keys/files. We artificialy increase
|
|
|
|
// can compact any range of keys/files.
|
|
|
|
// bg_compaction_scheduled_ by a large number, this causes
|
|
|
|
//
|
|
|
|
// the system to have a single background thread. Now,
|
|
|
|
// bg_manual_only_ is non-zero when at least one thread is inside
|
|
|
|
// this manual compaction can progress without stomping
|
|
|
|
// TEST_CompactRange(), i.e. during that time no other compaction will
|
|
|
|
// on any other concurrent compactions.
|
|
|
|
// get scheduled (see MaybeScheduleFlushOrCompaction).
|
|
|
|
const int LargeNumber = 10000000; |
|
|
|
//
|
|
|
|
const int newvalue = options_.max_background_compactions-1; |
|
|
|
// Note that the following loop doesn't stop more that one thread calling
|
|
|
|
bg_compaction_scheduled_ += LargeNumber; |
|
|
|
// TEST_CompactRange() from getting to the second while loop below.
|
|
|
|
while (bg_compaction_scheduled_ > LargeNumber) { |
|
|
|
// However, only one of them will actually schedule compaction, while
|
|
|
|
Log(options_.info_log, "Manual compaction request waiting for background threads to fall below 1"); |
|
|
|
// others will wait on a condition variable until it completes.
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
++bg_manual_only_; |
|
|
|
|
|
|
|
while (bg_compaction_scheduled_ > 0) { |
|
|
|
|
|
|
|
Log(options_.info_log, |
|
|
|
|
|
|
|
"Manual compaction waiting for all other scheduled background " |
|
|
|
|
|
|
|
"compactions to finish"); |
|
|
|
bg_cv_.Wait(); |
|
|
|
bg_cv_.Wait(); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
Log(options_.info_log, "Manual compaction starting"); |
|
|
|
Log(options_.info_log, "Manual compaction starting"); |
|
|
|
|
|
|
|
|
|
|
|
while (!manual.done) { |
|
|
|
while (!manual.done && !shutting_down_.Acquire_Load() && bg_error_.ok()) { |
|
|
|
while (manual_compaction_ != nullptr) { |
|
|
|
assert(bg_manual_only_ > 0); |
|
|
|
bg_cv_.Wait(); |
|
|
|
if (manual_compaction_ != nullptr) { |
|
|
|
} |
|
|
|
// Running either this or some other manual compaction
|
|
|
|
manual_compaction_ = &manual; |
|
|
|
|
|
|
|
if (bg_compaction_scheduled_ == LargeNumber) { |
|
|
|
|
|
|
|
bg_compaction_scheduled_ = newvalue; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
MaybeScheduleFlushOrCompaction(); |
|
|
|
|
|
|
|
while (manual_compaction_ == &manual) { |
|
|
|
|
|
|
|
bg_cv_.Wait(); |
|
|
|
bg_cv_.Wait(); |
|
|
|
|
|
|
|
} else { |
|
|
|
|
|
|
|
manual_compaction_ = &manual; |
|
|
|
|
|
|
|
MaybeScheduleFlushOrCompaction(); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
assert(!manual.in_progress); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// wait till there are no background threads scheduled
|
|
|
|
assert(!manual.in_progress); |
|
|
|
bg_compaction_scheduled_ += LargeNumber; |
|
|
|
assert(bg_manual_only_ > 0); |
|
|
|
while (bg_compaction_scheduled_ > LargeNumber + newvalue) { |
|
|
|
--bg_manual_only_; |
|
|
|
Log(options_.info_log, "Manual compaction resetting background threads"); |
|
|
|
|
|
|
|
bg_cv_.Wait(); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
bg_compaction_scheduled_ = 0; |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
Status DBImpl::FlushMemTable(const FlushOptions& options) { |
|
|
|
Status DBImpl::FlushMemTable(const FlushOptions& options) { |
|
|
@ -1708,11 +1708,16 @@ void DBImpl::MaybeScheduleFlushOrCompaction() { |
|
|
|
env_->Schedule(&DBImpl::BGWorkFlush, this, Env::Priority::HIGH); |
|
|
|
env_->Schedule(&DBImpl::BGWorkFlush, this, Env::Priority::HIGH); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Schedule BGWorkCompaction if there's a compaction pending (or a memtable
|
|
|
|
|
|
|
|
// flush, but the HIGH pool is not enabled). Do it only if
|
|
|
|
|
|
|
|
// max_background_compactions hasn't been reached and, in case
|
|
|
|
|
|
|
|
// bg_manual_only_ > 0, if it's a manual compaction.
|
|
|
|
if ((manual_compaction_ || |
|
|
|
if ((manual_compaction_ || |
|
|
|
versions_->NeedsCompaction() || |
|
|
|
versions_->NeedsCompaction() || |
|
|
|
(is_flush_pending && (options_.max_background_flushes <= 0))) && |
|
|
|
(is_flush_pending && (options_.max_background_flushes <= 0))) && |
|
|
|
bg_compaction_scheduled_ < options_.max_background_compactions) { |
|
|
|
bg_compaction_scheduled_ < options_.max_background_compactions && |
|
|
|
// compaction needed, or memtable flush needed but HIGH pool not enabled.
|
|
|
|
(!bg_manual_only_ || manual_compaction_)) { |
|
|
|
|
|
|
|
|
|
|
|
bg_compaction_scheduled_++; |
|
|
|
bg_compaction_scheduled_++; |
|
|
|
env_->Schedule(&DBImpl::BGWorkCompaction, this, Env::Priority::LOW); |
|
|
|
env_->Schedule(&DBImpl::BGWorkCompaction, this, Env::Priority::LOW); |
|
|
|
} |
|
|
|
} |
|
|
|