diff --git a/HISTORY.md b/HISTORY.md
index 8bac7fd1e..bca3080c9 100644
--- a/HISTORY.md
+++ b/HISTORY.md
@@ -2,11 +2,12 @@
 ## Unreleased
 ### Public API Change
 * Iterator::SeekForPrev is now a pure virtual method. This is to prevent user who implement the Iterator interface fail to implement SeekForPrev by mistake.
-- Add `include_end` option to make the range end exclusive when `include_end == false` in `DeleteFilesInRange()`.
+* Add `include_end` option to make the range end exclusive when `include_end == false` in `DeleteFilesInRange()`.
+* Add `CompactRangeOptions::allow_write_stall`, which makes `CompactRange` start working immediately, even if it causes user writes to stall. The default value is false, meaning we add delay to `CompactRange` calls until stalling can be avoided when possible. Note this delay is not present in previous RocksDB versions.
 ### New Features
 * Improve the performance of iterators doing long range scans by using readahead.
-- Add new function `DeleteFilesInRanges()` to delete files in multiple ranges at once for better performance.
+* Add new function `DeleteFilesInRanges()` to delete files in multiple ranges at once for better performance.
 
 ### Bug Fixes
 * Fix `DisableFileDeletions()` followed by `GetSortedWalFiles()` to not return obsolete WAL files that `PurgeObsoleteFiles()` is going to delete.
diff --git a/db/column_family.cc b/db/column_family.cc
index c05373293..ad5019e80 100644
--- a/db/column_family.cc
+++ b/db/column_family.cc
@@ -632,6 +632,41 @@ int GetL0ThresholdSpeedupCompaction(int level0_file_num_compaction_trigger,
   }
 }  // namespace
 
+std::pair<WriteStallCondition, ColumnFamilyData::WriteStallCause>
+ColumnFamilyData::GetWriteStallConditionAndCause(
+    int num_unflushed_memtables, int num_l0_files,
+    uint64_t num_compaction_needed_bytes,
+    const MutableCFOptions& mutable_cf_options) {
+  if (num_unflushed_memtables >= mutable_cf_options.max_write_buffer_number) {
+    return {WriteStallCondition::kStopped, WriteStallCause::kMemtableLimit};
+  } else if (!mutable_cf_options.disable_auto_compactions &&
+             num_l0_files >= mutable_cf_options.level0_stop_writes_trigger) {
+    return {WriteStallCondition::kStopped, WriteStallCause::kL0FileCountLimit};
+  } else if (!mutable_cf_options.disable_auto_compactions &&
+             mutable_cf_options.hard_pending_compaction_bytes_limit > 0 &&
+             num_compaction_needed_bytes >=
+                 mutable_cf_options.hard_pending_compaction_bytes_limit) {
+    return {WriteStallCondition::kStopped,
+            WriteStallCause::kPendingCompactionBytes};
+  } else if (mutable_cf_options.max_write_buffer_number > 3 &&
+             num_unflushed_memtables >=
+                 mutable_cf_options.max_write_buffer_number - 1) {
+    return {WriteStallCondition::kDelayed, WriteStallCause::kMemtableLimit};
+  } else if (!mutable_cf_options.disable_auto_compactions &&
+             mutable_cf_options.level0_slowdown_writes_trigger >= 0 &&
+             num_l0_files >=
+                 mutable_cf_options.level0_slowdown_writes_trigger) {
+    return {WriteStallCondition::kDelayed, WriteStallCause::kL0FileCountLimit};
+  } else if (!mutable_cf_options.disable_auto_compactions &&
+             mutable_cf_options.soft_pending_compaction_bytes_limit > 0 &&
+             num_compaction_needed_bytes >=
+                 mutable_cf_options.soft_pending_compaction_bytes_limit) {
+    return {WriteStallCondition::kDelayed,
+            WriteStallCause::kPendingCompactionBytes};
+  }
+  return {WriteStallCondition::kNormal, WriteStallCause::kNone};
+}
+
 WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions(
     const MutableCFOptions& mutable_cf_options) {
   auto write_stall_condition = WriteStallCondition::kNormal;
@@ -641,25 +676,29 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions(
   uint64_t compaction_needed_bytes =
       vstorage->estimated_compaction_needed_bytes();
+  auto write_stall_condition_and_cause = GetWriteStallConditionAndCause(
+      imm()->NumNotFlushed(), vstorage->l0_delay_trigger_count(),
+      vstorage->estimated_compaction_needed_bytes(), mutable_cf_options);
+  write_stall_condition = write_stall_condition_and_cause.first;
+  auto write_stall_cause = write_stall_condition_and_cause.second;
+
   bool was_stopped = write_controller->IsStopped();
   bool needed_delay = write_controller->NeedsDelay();
 
-  if (imm()->NumNotFlushed() >= mutable_cf_options.max_write_buffer_number) {
+  if (write_stall_condition == WriteStallCondition::kStopped &&
+      write_stall_cause == WriteStallCause::kMemtableLimit) {
     write_controller_token_ = write_controller->GetStopToken();
     internal_stats_->AddCFStats(InternalStats::MEMTABLE_LIMIT_STOPS, 1);
-    write_stall_condition = WriteStallCondition::kStopped;
     ROCKS_LOG_WARN(
         ioptions_.info_log,
         "[%s] Stopping writes because we have %d immutable memtables "
         "(waiting for flush), max_write_buffer_number is set to %d",
         name_.c_str(), imm()->NumNotFlushed(),
         mutable_cf_options.max_write_buffer_number);
-  } else if (!mutable_cf_options.disable_auto_compactions &&
-             vstorage->l0_delay_trigger_count() >=
-                 mutable_cf_options.level0_stop_writes_trigger) {
+  } else if (write_stall_condition == WriteStallCondition::kStopped &&
+             write_stall_cause == WriteStallCause::kL0FileCountLimit) {
     write_controller_token_ = write_controller->GetStopToken();
     internal_stats_->AddCFStats(InternalStats::L0_FILE_COUNT_LIMIT_STOPS, 1);
-    write_stall_condition = WriteStallCondition::kStopped;
     if (compaction_picker_->IsLevel0CompactionInProgress()) {
       internal_stats_->AddCFStats(
           InternalStats::LOCKED_L0_FILE_COUNT_LIMIT_STOPS, 1);
@@ -667,28 +706,23 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions(
     ROCKS_LOG_WARN(ioptions_.info_log,
                    "[%s] Stopping writes because we have %d level-0 files",
                    name_.c_str(), vstorage->l0_delay_trigger_count());
-  } else if (!mutable_cf_options.disable_auto_compactions &&
-             mutable_cf_options.hard_pending_compaction_bytes_limit > 0 &&
-             compaction_needed_bytes >=
-                 mutable_cf_options.hard_pending_compaction_bytes_limit) {
+  } else if (write_stall_condition == WriteStallCondition::kStopped &&
+             write_stall_cause == WriteStallCause::kPendingCompactionBytes) {
     write_controller_token_ = write_controller->GetStopToken();
     internal_stats_->AddCFStats(
         InternalStats::PENDING_COMPACTION_BYTES_LIMIT_STOPS, 1);
-    write_stall_condition = WriteStallCondition::kStopped;
     ROCKS_LOG_WARN(
         ioptions_.info_log,
         "[%s] Stopping writes because of estimated pending compaction "
         "bytes %" PRIu64,
         name_.c_str(), compaction_needed_bytes);
-  } else if (mutable_cf_options.max_write_buffer_number > 3 &&
-             imm()->NumNotFlushed() >=
-                 mutable_cf_options.max_write_buffer_number - 1) {
+  } else if (write_stall_condition == WriteStallCondition::kDelayed &&
+             write_stall_cause == WriteStallCause::kMemtableLimit) {
     write_controller_token_ =
         SetupDelay(write_controller, compaction_needed_bytes,
                    prev_compaction_needed_bytes_, was_stopped,
                    mutable_cf_options.disable_auto_compactions);
     internal_stats_->AddCFStats(InternalStats::MEMTABLE_LIMIT_SLOWDOWNS, 1);
-    write_stall_condition = WriteStallCondition::kDelayed;
     ROCKS_LOG_WARN(
         ioptions_.info_log,
         "[%s] Stalling writes because we have %d immutable memtables "
@@ -697,10 +731,8 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions(
         name_.c_str(), imm()->NumNotFlushed(),
         mutable_cf_options.max_write_buffer_number,
         write_controller->delayed_write_rate());
-  } else if (!mutable_cf_options.disable_auto_compactions &&
-             mutable_cf_options.level0_slowdown_writes_trigger >= 0 &&
-             vstorage->l0_delay_trigger_count() >=
-                 mutable_cf_options.level0_slowdown_writes_trigger) {
+  } else if (write_stall_condition == WriteStallCondition::kDelayed &&
+             write_stall_cause == WriteStallCause::kL0FileCountLimit) {
     // L0 is the last two files from stopping.
     bool near_stop = vstorage->l0_delay_trigger_count() >=
                      mutable_cf_options.level0_stop_writes_trigger - 2;
@@ -710,7 +742,6 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions(
                    mutable_cf_options.disable_auto_compactions);
     internal_stats_->AddCFStats(InternalStats::L0_FILE_COUNT_LIMIT_SLOWDOWNS,
                                 1);
-    write_stall_condition = WriteStallCondition::kDelayed;
     if (compaction_picker_->IsLevel0CompactionInProgress()) {
       internal_stats_->AddCFStats(
           InternalStats::LOCKED_L0_FILE_COUNT_LIMIT_SLOWDOWNS, 1);
@@ -720,10 +751,8 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions(
                    "rate %" PRIu64,
                    name_.c_str(), vstorage->l0_delay_trigger_count(),
                    write_controller->delayed_write_rate());
-  } else if (!mutable_cf_options.disable_auto_compactions &&
-             mutable_cf_options.soft_pending_compaction_bytes_limit > 0 &&
-             vstorage->estimated_compaction_needed_bytes() >=
-                 mutable_cf_options.soft_pending_compaction_bytes_limit) {
+  } else if (write_stall_condition == WriteStallCondition::kDelayed &&
+             write_stall_cause == WriteStallCause::kPendingCompactionBytes) {
     // If the distance to hard limit is less than 1/4 of the gap between soft
     // and
     // hard bytes limit, we think it is near stop and speed up the slowdown.
@@ -741,7 +770,6 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions(
                    mutable_cf_options.disable_auto_compactions);
     internal_stats_->AddCFStats(
         InternalStats::PENDING_COMPACTION_BYTES_LIMIT_SLOWDOWNS, 1);
-    write_stall_condition = WriteStallCondition::kDelayed;
     ROCKS_LOG_WARN(
         ioptions_.info_log,
         "[%s] Stalling writes because of estimated pending compaction "
@@ -749,6 +777,7 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions(
         name_.c_str(), vstorage->estimated_compaction_needed_bytes(),
         write_controller->delayed_write_rate());
   } else {
+    assert(write_stall_condition == WriteStallCondition::kNormal);
     if (vstorage->l0_delay_trigger_count() >=
         GetL0ThresholdSpeedupCompaction(
             mutable_cf_options.level0_file_num_compaction_trigger,
diff --git a/db/column_family.h b/db/column_family.h
index ad0ec536b..e5abb485e 100644
--- a/db/column_family.h
+++ b/db/column_family.h
@@ -30,6 +30,7 @@ namespace rocksdb {
 
 class Version;
 class VersionSet;
+class VersionStorageInfo;
 class MemTable;
 class MemTableListVersion;
 class CompactionPicker;
@@ -335,6 +336,17 @@ class ColumnFamilyData {
   bool pending_flush() { return pending_flush_; }
   bool pending_compaction() { return pending_compaction_; }
 
+  enum class WriteStallCause {
+    kNone,
+    kMemtableLimit,
+    kL0FileCountLimit,
+    kPendingCompactionBytes,
+  };
+  static std::pair<WriteStallCondition, WriteStallCause>
+  GetWriteStallConditionAndCause(int num_unflushed_memtables, int num_l0_files,
+                                 uint64_t num_compaction_needed_bytes,
+                                 const MutableCFOptions& mutable_cf_options);
+
   // Recalculate some small conditions, which are changed only during
   // compaction, adding new memtable and/or
   // recalculation of compaction score. These values are used in
diff --git a/db/db_compaction_test.cc b/db/db_compaction_test.cc
index 4225ddf66..3b77228d8 100644
--- a/db/db_compaction_test.cc
+++ b/db/db_compaction_test.cc
@@ -3105,6 +3105,218 @@ TEST_F(DBCompactionTest, CompactBottomLevelFilesWithDeletions) {
   }
 }
 
+TEST_F(DBCompactionTest, CompactRangeDelayedByL0FileCount) {
+  // Verify that, when `CompactRangeOptions::allow_write_stall == false`, manual
+  // compaction only triggers flush after it's sure stall won't be triggered for
+  // L0 file count going too high.
+  const int kNumL0FilesTrigger = 4;
+  const int kNumL0FilesLimit = 8;
+  // i == 0: verifies normal case where stall is avoided by delay
+  // i == 1: verifies no delay in edge case where stall trigger is same as
+  //         compaction trigger, so stall can't be avoided
+  for (int i = 0; i < 2; ++i) {
+    Options options = CurrentOptions();
+    options.level0_slowdown_writes_trigger = kNumL0FilesLimit;
+    if (i == 0) {
+      options.level0_file_num_compaction_trigger = kNumL0FilesTrigger;
+    } else {
+      options.level0_file_num_compaction_trigger = kNumL0FilesLimit;
+    }
+    Reopen(options);
+
+    if (i == 0) {
+      // ensure the auto compaction doesn't finish until manual compaction has
+      // had a chance to be delayed.
+      rocksdb::SyncPoint::GetInstance()->LoadDependency(
+          {{"DBImpl::CompactRange:StallWait", "CompactionJob::Run():End"}});
+    } else {
+      // ensure the auto-compaction doesn't finish until manual compaction has
+      // continued without delay.
+      rocksdb::SyncPoint::GetInstance()->LoadDependency(
+          {{"DBImpl::CompactRange:StallWaitDone", "CompactionJob::Run():End"}});
+    }
+    rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+
+    Random rnd(301);
+    for (int j = 0; j < kNumL0FilesLimit - 1; ++j) {
+      for (int k = 0; k < 2; ++k) {
+        ASSERT_OK(Put(Key(k), RandomString(&rnd, 1024)));
+      }
+      Flush();
+    }
+    auto manual_compaction_thread = port::Thread([this]() {
+      CompactRangeOptions cro;
+      cro.allow_write_stall = false;
+      db_->CompactRange(cro, nullptr, nullptr);
+    });
+
+    manual_compaction_thread.join();
+    dbfull()->TEST_WaitForCompact();
+    ASSERT_EQ(0, NumTableFilesAtLevel(0));
+    ASSERT_GT(NumTableFilesAtLevel(1), 0);
+    rocksdb::SyncPoint::GetInstance()->DisableProcessing();
+  }
+}
+
+TEST_F(DBCompactionTest, CompactRangeDelayedByImmMemTableCount) {
+  // Verify that, when `CompactRangeOptions::allow_write_stall == false`, manual
+  // compaction only triggers flush after it's sure stall won't be triggered for
+  // immutable memtable count going too high.
+  const int kNumImmMemTableLimit = 8;
+  // i == 0: verifies normal case where stall is avoided by delay
+  // i == 1: verifies no delay in edge case where stall trigger is same as flush
+  //         trigger, so stall can't be avoided
+  for (int i = 0; i < 2; ++i) {
+    Options options = CurrentOptions();
+    options.disable_auto_compactions = true;
+    // the delay limit is one less than the stop limit. This test focuses on
+    // avoiding delay limit, but this option sets stop limit, so add one.
+    options.max_write_buffer_number = kNumImmMemTableLimit + 1;
+    if (i == 1) {
+      options.min_write_buffer_number_to_merge = kNumImmMemTableLimit;
+    }
+    Reopen(options);
+
+    if (i == 0) {
+      // ensure the flush doesn't finish until manual compaction has had a
+      // chance to be delayed.
+      rocksdb::SyncPoint::GetInstance()->LoadDependency(
+          {{"DBImpl::CompactRange:StallWait", "FlushJob::WriteLevel0Table"}});
+    } else {
+      // ensure the flush doesn't finish until manual compaction has continued
+      // without delay.
+      rocksdb::SyncPoint::GetInstance()->LoadDependency(
+          {{"DBImpl::CompactRange:StallWaitDone",
+            "FlushJob::WriteLevel0Table"}});
+    }
+    rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+
+    Random rnd(301);
+    for (int j = 0; j < kNumImmMemTableLimit - 1; ++j) {
+      ASSERT_OK(Put(Key(0), RandomString(&rnd, 1024)));
+      FlushOptions flush_opts;
+      flush_opts.wait = false;
+      dbfull()->Flush(flush_opts);
+    }
+
+    auto manual_compaction_thread = port::Thread([this]() {
+      CompactRangeOptions cro;
+      cro.allow_write_stall = false;
+      db_->CompactRange(cro, nullptr, nullptr);
+    });
+
+    manual_compaction_thread.join();
+    dbfull()->TEST_WaitForFlushMemTable();
+    ASSERT_EQ(0, NumTableFilesAtLevel(0));
+    ASSERT_GT(NumTableFilesAtLevel(1), 0);
+    rocksdb::SyncPoint::GetInstance()->DisableProcessing();
+  }
+}
+
+TEST_F(DBCompactionTest, CompactRangeShutdownWhileDelayed) {
+  // Verify that, when `CompactRangeOptions::allow_write_stall == false`, delay
+  // does not hang if CF is dropped or DB is closed
+  const int kNumL0FilesTrigger = 4;
+  const int kNumL0FilesLimit = 8;
+  Options options = CurrentOptions();
+  options.level0_file_num_compaction_trigger = kNumL0FilesTrigger;
+  options.level0_slowdown_writes_trigger = kNumL0FilesLimit;
+  // i == 0: DB::DropColumnFamily() on CompactRange's target CF unblocks it
+  // i == 1: DB::CancelAllBackgroundWork() unblocks CompactRange. This is to
+  //         simulate what happens during Close as we can't call Close (it
+  //         blocks on the auto-compaction, making a cycle).
+  for (int i = 0; i < 2; ++i) {
+    CreateAndReopenWithCF({"one"}, options);
+    // The calls to close CF/DB wait until the manual compaction stalls.
+    // The auto-compaction waits until the manual compaction finishes to ensure
+    // the signal comes from closing CF/DB, not from compaction making progress.
+    rocksdb::SyncPoint::GetInstance()->LoadDependency(
+        {{"DBImpl::CompactRange:StallWait",
+          "DBCompactionTest::CompactRangeShutdownWhileDelayed:PreShutdown"},
+         {"DBCompactionTest::CompactRangeShutdownWhileDelayed:PostManual",
+          "CompactionJob::Run():End"}});
+    rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+
+    Random rnd(301);
+    for (int j = 0; j < kNumL0FilesLimit - 1; ++j) {
+      for (int k = 0; k < 2; ++k) {
+        ASSERT_OK(Put(1, Key(k), RandomString(&rnd, 1024)));
+      }
+      Flush(1);
+    }
+    auto manual_compaction_thread = port::Thread([this]() {
+      CompactRangeOptions cro;
+      cro.allow_write_stall = false;
+      ASSERT_TRUE(db_->CompactRange(cro, handles_[1], nullptr, nullptr)
+                      .IsShutdownInProgress());
+    });
+
+    TEST_SYNC_POINT(
+        "DBCompactionTest::CompactRangeShutdownWhileDelayed:PreShutdown");
+    if (i == 0) {
+      ASSERT_OK(db_->DropColumnFamily(handles_[1]));
+    } else {
+      dbfull()->CancelAllBackgroundWork(false /* wait */);
+    }
+    manual_compaction_thread.join();
+    TEST_SYNC_POINT(
+        "DBCompactionTest::CompactRangeShutdownWhileDelayed:PostManual");
+    dbfull()->TEST_WaitForCompact();
+    rocksdb::SyncPoint::GetInstance()->DisableProcessing();
+  }
+}
+
+TEST_F(DBCompactionTest, CompactRangeSkipFlushAfterDelay) {
+  // Verify that, when `CompactRangeOptions::allow_write_stall == false`,
+  // CompactRange skips its flush if the delay is long enough that the memtables
+  // existing at the beginning of the call have already been flushed.
+  const int kNumL0FilesTrigger = 4;
+  const int kNumL0FilesLimit = 8;
+  Options options = CurrentOptions();
+  options.level0_slowdown_writes_trigger = kNumL0FilesLimit;
+  options.level0_file_num_compaction_trigger = kNumL0FilesTrigger;
+  Reopen(options);
+
+  Random rnd(301);
+  // The manual flush includes the memtable that was active when CompactRange
+  // began. So it unblocks CompactRange and precludes its flush. Throughout the
+  // test, stall conditions are upheld via high L0 file count.
+  rocksdb::SyncPoint::GetInstance()->LoadDependency(
+      {{"DBImpl::CompactRange:StallWait",
+        "DBCompactionTest::CompactRangeSkipFlushAfterDelay:PreFlush"},
+       {"DBCompactionTest::CompactRangeSkipFlushAfterDelay:PostFlush",
+        "DBImpl::CompactRange:StallWaitDone"},
+       {"DBImpl::CompactRange:StallWaitDone", "CompactionJob::Run():End"}});
+  rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+
+  for (int i = 0; i < kNumL0FilesLimit - 1; ++i) {
+    for (int j = 0; j < 2; ++j) {
+      ASSERT_OK(Put(Key(j), RandomString(&rnd, 1024)));
+    }
+    Flush();
+  }
+  auto manual_compaction_thread = port::Thread([this]() {
+    CompactRangeOptions cro;
+    cro.allow_write_stall = false;
+    db_->CompactRange(cro, nullptr, nullptr);
+  });
+
+  TEST_SYNC_POINT("DBCompactionTest::CompactRangeSkipFlushAfterDelay:PreFlush");
+  Put(ToString(0), RandomString(&rnd, 1024));
+  Flush();
+  Put(ToString(0), RandomString(&rnd, 1024));
+  TEST_SYNC_POINT("DBCompactionTest::CompactRangeSkipFlushAfterDelay:PostFlush");
+  manual_compaction_thread.join();
+
+  // If CompactRange's flush was skipped, the final Put above will still be
+  // in the active memtable.
+  std::string num_keys_in_memtable;
+  db_->GetProperty(DB::Properties::kNumEntriesActiveMemTable, &num_keys_in_memtable);
+  ASSERT_EQ(ToString(1), num_keys_in_memtable);
+
+  rocksdb::SyncPoint::GetInstance()->DisableProcessing();
+}
+
 INSTANTIATE_TEST_CASE_P(DBCompactionTestWithParam, DBCompactionTestWithParam,
                         ::testing::Values(std::make_tuple(1, true),
                                           std::make_tuple(1, false),
diff --git a/db/db_impl.cc b/db/db_impl.cc
index 7136a813e..2cd232355 100644
--- a/db/db_impl.cc
+++ b/db/db_impl.cc
@@ -549,6 +549,7 @@ Status DBImpl::SetOptions(ColumnFamilyHandle* column_family,
       persist_options_status = WriteOptionsFile(
           false /*need_mutex_lock*/, true /*need_enter_write_thread*/);
+      bg_cv_.SignalAll();
     }
   }
   sv_context.Clean();
@@ -1433,6 +1434,7 @@ Status DBImpl::DropColumnFamilyImpl(ColumnFamilyHandle* column_family) {
      }
      is_snapshot_supported_ = new_is_snapshot_supported;
    }
+    bg_cv_.SignalAll();
  }
 
  if (s.ok()) {
diff --git a/db/db_impl.h b/db/db_impl.h
index 364c22f9c..f7af551e4 100644
--- a/db/db_impl.h
+++ b/db/db_impl.h
@@ -963,6 +963,8 @@ class DBImpl : public DB {
   // * whenever num_running_ingest_file_ goes to 0.
   // * whenever pending_purge_obsolete_files_ goes to 0.
   // * whenever disable_delete_obsolete_files_ goes to 0.
+  // * whenever SetOptions successfully updates options.
+  // * whenever a column family is dropped.
   InstrumentedCondVar bg_cv_;
   // Writes are protected by locking both mutex_ and log_write_mutex_, and reads
   // must be under either mutex_ or log_write_mutex_. Since after ::Open,
diff --git a/db/db_impl_compaction_flush.cc b/db/db_impl_compaction_flush.cc
index b9e1b9b56..6bf5beddc 100644
--- a/db/db_impl_compaction_flush.cc
+++ b/db/db_impl_compaction_flush.cc
@@ -290,10 +290,65 @@ Status DBImpl::CompactRange(const CompactRangeOptions& options,
   auto cfd = cfh->cfd();
   bool exclusive = options.exclusive_manual_compaction;
 
-  Status s = FlushMemTable(cfd, FlushOptions(), FlushReason::kManualCompaction);
-  if (!s.ok()) {
-    LogFlush(immutable_db_options_.info_log);
-    return s;
+  bool flush_needed = true;
+  if (!options.allow_write_stall) {
+    InstrumentedMutexLock l(&mutex_);
+    uint64_t orig_active_memtable_id = cfd->mem()->GetID();
+    WriteStallCondition write_stall_condition = WriteStallCondition::kNormal;
+    do {
+      if (write_stall_condition != WriteStallCondition::kNormal) {
+        TEST_SYNC_POINT("DBImpl::CompactRange:StallWait");
+        ROCKS_LOG_INFO(immutable_db_options_.info_log,
+                       "[%s] CompactRange waiting on stall conditions to clear",
+                       cfd->GetName().c_str());
+        bg_cv_.Wait();
+      }
+      if (cfd->IsDropped() || shutting_down_.load(std::memory_order_acquire)) {
+        return Status::ShutdownInProgress();
+      }
+
+      uint64_t earliest_memtable_id =
+          std::min(cfd->mem()->GetID(), cfd->imm()->GetEarliestMemTableID());
+      if (earliest_memtable_id > orig_active_memtable_id) {
+        // We waited so long that the memtable we were originally waiting on was
+        // flushed.
+        flush_needed = false;
+        break;
+      }
+
+      const auto& mutable_cf_options = *cfd->GetLatestMutableCFOptions();
+      const auto* vstorage = cfd->current()->storage_info();
+
+      // Skip stalling check if we're below auto-flush and auto-compaction
+      // triggers. If it stalled in these conditions, that'd mean the stall
+      // triggers are so low that stalling is needed for any background work. In
+      // that case we shouldn't wait since background work won't be scheduled.
+      if (cfd->imm()->NumNotFlushed() <
+              cfd->ioptions()->min_write_buffer_number_to_merge &&
+          vstorage->l0_delay_trigger_count() <
+              mutable_cf_options.level0_file_num_compaction_trigger) {
+        break;
+      }
+
+      // check whether one extra immutable memtable or an extra L0 file would
+      // cause write stalling mode to be entered. It could still enter stall
+      // mode due to pending compaction bytes, but that's less common
+      write_stall_condition =
+          ColumnFamilyData::GetWriteStallConditionAndCause(
+              cfd->imm()->NumNotFlushed() + 1,
+              vstorage->l0_delay_trigger_count() + 1,
+              vstorage->estimated_compaction_needed_bytes(), mutable_cf_options)
+              .first;
+    } while (write_stall_condition != WriteStallCondition::kNormal);
+  }
+  TEST_SYNC_POINT("DBImpl::CompactRange:StallWaitDone");
+  Status s;
+  if (flush_needed) {
+    s = FlushMemTable(cfd, FlushOptions(), FlushReason::kManualCompaction);
+    if (!s.ok()) {
+      LogFlush(immutable_db_options_.info_log);
+      return s;
+    }
   }
 
   int max_level_with_files = 0;
diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h
index f027ed85b..24f5f8fd0 100644
--- a/include/rocksdb/options.h
+++ b/include/rocksdb/options.h
@@ -1205,6 +1205,9 @@ struct CompactRangeOptions {
   // if there is a compaction filter
   BottommostLevelCompaction bottommost_level_compaction =
       BottommostLevelCompaction::kIfHaveCompactionFilter;
+  // If true, will execute immediately even if doing so would cause the DB to
+  // enter write stall mode. Otherwise, it'll sleep until load is low enough.
+  bool allow_write_stall = false;
 };
 
 // IngestExternalFileOptions is used by IngestExternalFile()
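
Usage note (not part of the patch): a minimal sketch of how a caller might exercise the new option once this change lands. The database path, the error handling, and the `main` wrapper below are illustrative assumptions rather than code from this diff; `DB::Open`, `CompactRangeOptions`, and `DB::CompactRange` are the existing public API that the new `allow_write_stall` field plugs into.

// Hypothetical example: run a manual full-range compaction that yields to
// foreground writes instead of stalling them (allow_write_stall = false is
// also the default introduced by this patch).
#include <cassert>
#include "rocksdb/db.h"
#include "rocksdb/options.h"

int main() {
  rocksdb::DB* db = nullptr;
  rocksdb::Options db_options;
  db_options.create_if_missing = true;
  // "/tmp/allow_write_stall_demo" is an illustrative path, not from the patch.
  rocksdb::Status s =
      rocksdb::DB::Open(db_options, "/tmp/allow_write_stall_demo", &db);
  assert(s.ok());

  rocksdb::CompactRangeOptions cro;
  cro.allow_write_stall = false;  // wait for stall conditions to clear first
  // nullptr begin/end compacts the whole key range of the default CF.
  s = db->CompactRange(cro, nullptr, nullptr);
  // With allow_write_stall == false, the call may return ShutdownInProgress
  // if the column family is dropped or the DB starts closing while it waits.
  assert(s.ok() || s.IsShutdownInProgress());

  delete db;
  return 0;
}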