diff --git a/db/db_impl.cc b/db/db_impl.cc index 619b6cc1c..5307b9c12 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -169,12 +169,14 @@ Options SanitizeOptions(const std::string& dbname, // function. auto factory = dynamic_cast( result.memtable_factory.get()); - if (factory != nullptr && + if (factory && factory->GetTransform() != result.prefix_extractor) { Log(result.info_log, "A prefix hash representation factory was supplied " "whose prefix extractor does not match options.prefix_extractor. " "Falling back to skip list representation factory"); result.memtable_factory = std::make_shared(); + } else if (factory) { + Log(result.info_log, "Prefix hash memtable rep is in use."); } } return result; @@ -198,6 +200,7 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname) logfile_number_(0), tmp_batch_(), bg_compaction_scheduled_(0), + bg_flush_scheduled_(0), bg_logstats_scheduled_(false), manual_compaction_(nullptr), logger_(nullptr), @@ -265,7 +268,9 @@ DBImpl::~DBImpl() { } mutex_.Lock(); shutting_down_.Release_Store(this); // Any non-nullptr value is ok - while (bg_compaction_scheduled_ || bg_logstats_scheduled_) { + while (bg_compaction_scheduled_ || + bg_flush_scheduled_ || + bg_logstats_scheduled_) { bg_cv_.Wait(); } mutex_.Unlock(); @@ -285,13 +290,17 @@ void DBImpl::TEST_Destroy_DBImpl() { // wait till all background compactions are done. mutex_.Lock(); - while (bg_compaction_scheduled_ || bg_logstats_scheduled_) { + while (bg_compaction_scheduled_ || + bg_flush_scheduled_ || + bg_logstats_scheduled_) { bg_cv_.Wait(); } // Prevent new compactions from occuring. + bg_work_gate_closed_ = true; const int LargeNumber = 10000000; bg_compaction_scheduled_ += LargeNumber; + mutex_.Unlock(); // force release the lock file. @@ -1015,10 +1024,10 @@ void DBImpl::ReFitLevel(int level, int target_level) { // wait for all background threads to stop bg_work_gate_closed_ = true; - while (bg_compaction_scheduled_ > 0) { + while (bg_compaction_scheduled_ > 0 || bg_flush_scheduled_) { Log(options_.info_log, - "RefitLevel: waiting for background threads to stop: %d", - bg_compaction_scheduled_); + "RefitLevel: waiting for background threads to stop: %d %d", + bg_compaction_scheduled_, bg_flush_scheduled_); bg_cv_.Wait(); } @@ -1369,7 +1378,8 @@ Status DBImpl::TEST_WaitForCompactMemTable() { Status DBImpl::TEST_WaitForCompact() { // Wait until the compaction completes MutexLock l(&mutex_); - while (bg_compaction_scheduled_ && bg_error_.ok()) { + while ((bg_compaction_scheduled_ || bg_flush_scheduled_) && + bg_error_.ok()) { bg_cv_.Wait(); } return bg_error_; @@ -1379,29 +1389,80 @@ void DBImpl::MaybeScheduleCompaction() { mutex_.AssertHeld(); if (bg_work_gate_closed_) { // gate closed for backgrond work - } else if (bg_compaction_scheduled_ >= options_.max_background_compactions) { - // Already scheduled } else if (shutting_down_.Acquire_Load()) { // DB is being deleted; no more background compactions - } else if (!imm_.IsFlushPending(options_.min_write_buffer_number_to_merge) && - manual_compaction_ == nullptr && - !versions_->NeedsCompaction()) { - // No work to be done } else { - bg_compaction_scheduled_++; - env_->Schedule(&DBImpl::BGWork, this); + bool is_flush_pending = + imm_.IsFlushPending(options_.min_write_buffer_number_to_merge); + if (is_flush_pending && + (bg_flush_scheduled_ < options_.max_background_flushes)) { + // memtable flush needed + bg_flush_scheduled_++; + env_->Schedule(&DBImpl::BGWorkFlush, this, Env::Priority::HIGH); + } + + if ((manual_compaction_ || + versions_->NeedsCompaction() || + (is_flush_pending && (options_.max_background_flushes <= 0))) && + bg_compaction_scheduled_ < options_.max_background_compactions) { + // compaction needed, or memtable flush needed but HIGH pool not enabled. + bg_compaction_scheduled_++; + env_->Schedule(&DBImpl::BGWorkCompaction, this, Env::Priority::LOW); + } + } +} + +void DBImpl::BGWorkFlush(void* db) { + reinterpret_cast(db)->BackgroundCallFlush(); +} + +void DBImpl::BGWorkCompaction(void* db) { + reinterpret_cast(db)->BackgroundCallCompaction(); +} + +Status DBImpl::BackgroundFlush() { + Status stat; + while (stat.ok() && + imm_.IsFlushPending(options_.min_write_buffer_number_to_merge)) { + Log(options_.info_log, + "BackgroundCallFlush doing CompactMemTable, flush slots available %d", + options_.max_background_flushes - bg_flush_scheduled_); + stat = CompactMemTable(); } + return stat; } -void DBImpl::BGWork(void* db) { - reinterpret_cast(db)->BackgroundCall(); +void DBImpl::BackgroundCallFlush() { + assert(bg_flush_scheduled_); + MutexLock l(&mutex_); + + if (!shutting_down_.Acquire_Load()) { + Status s = BackgroundFlush(); + if (!s.ok()) { + // Wait a little bit before retrying background compaction in + // case this is an environmental problem and we do not want to + // chew up resources for failed compactions for the duration of + // the problem. + bg_cv_.SignalAll(); // In case a waiter can proceed despite the error + Log(options_.info_log, "Waiting after background flush error: %s", + s.ToString().c_str()); + mutex_.Unlock(); + env_->SleepForMicroseconds(1000000); + mutex_.Lock(); + } + } + + bg_flush_scheduled_--; + + bg_cv_.SignalAll(); } + void DBImpl::TEST_PurgeObsoleteteWAL() { PurgeObsoleteWALFiles(); } -void DBImpl::BackgroundCall() { +void DBImpl::BackgroundCallCompaction() { bool madeProgress = false; DeletionState deletion_state; @@ -1454,6 +1515,7 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, *madeProgress = false; mutex_.AssertHeld(); + // TODO: remove memtable flush from formal compaction while (imm_.IsFlushPending(options_.min_write_buffer_number_to_merge)) { Log(options_.info_log, "BackgroundCompaction doing CompactMemTable, compaction slots available %d", @@ -1830,6 +1892,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { } for (; input->Valid() && !shutting_down_.Acquire_Load(); ) { // Prioritize immutable compaction work + // TODO: remove memtable flush from normal compaction work if (imm_.imm_flush_needed.NoBarrier_Load() != nullptr) { const uint64_t imm_start = env_->NowMicros(); mutex_.Lock(); diff --git a/db/db_impl.h b/db/db_impl.h index def204aa6..c4509c1da 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -187,9 +187,12 @@ class DBImpl : public DB { void LogDBDeployStats(); void MaybeScheduleCompaction(); - static void BGWork(void* db); - void BackgroundCall(); + static void BGWorkCompaction(void* db); + static void BGWorkFlush(void* db); + void BackgroundCallCompaction(); + void BackgroundCallFlush(); Status BackgroundCompaction(bool* madeProgress,DeletionState& deletion_state); + Status BackgroundFlush(); void CleanupCompaction(CompactionState* compact); Status DoCompactionWork(CompactionState* compact); @@ -283,6 +286,9 @@ class DBImpl : public DB { // count how many background compaction been scheduled or is running? int bg_compaction_scheduled_; + // number of background memtable flush jobs, submitted to the HIGH pool + int bg_flush_scheduled_; + // Has a background stats log thread scheduled? bool bg_logstats_scheduled_; diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index c8fc6ea39..eb4014a3f 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -369,10 +369,23 @@ struct Options { // every compaction run. uint64_t delete_obsolete_files_period_micros; - // Maximum number of concurrent background compactions. + // Maximum number of concurrent background jobs, submitted to + // the default LOW priority thread pool // Default: 1 int max_background_compactions; + // Maximum number of concurrent background memtable flush jobs, submitted to + // the HIGH priority thread pool. + // By default, all background jobs (major compaction and memtable flush) go + // to the LOW priority pool. If this option is set to a positive number, + // memtable flush jobs will be submitted to the HIGH priority pool. + // It is important when the same Env is shared by multiple db instances. + // Without a separate pool, long running major compaction jobs could + // potentially block memtable flush jobs of other db instances, leading to + // unnecessary Put stalls. + // Default: 0 + int max_background_flushes; + // Specify the maximal size of the info log file. If the log file // is larger than `max_log_file_size`, a new info log file will // be created. diff --git a/util/options.cc b/util/options.cc index a1d6b81db..963449557 100644 --- a/util/options.cc +++ b/util/options.cc @@ -54,6 +54,7 @@ Options::Options() disable_seek_compaction(false), delete_obsolete_files_period_micros(0), max_background_compactions(1), + max_background_flushes(0), max_log_file_size(0), log_file_time_to_roll(0), keep_log_file_num(1000), @@ -199,6 +200,8 @@ Options::Dump(Logger* log) const delete_obsolete_files_period_micros); Log(log," Options.max_background_compactions: %d", max_background_compactions); + Log(log," Options.max_background_flushes: %d", + max_background_flushes); Log(log," Options.soft_rate_limit: %.2f", soft_rate_limit); Log(log," Options.hard_rate_limit: %.2f",