diff --git a/db/column_family.cc b/db/column_family.cc index 482b7111e..1ab8a7653 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -558,8 +558,9 @@ void ColumnFamilyData::RecalculateWriteStallConditions( "(waiting for flush), max_write_buffer_number is set to %d", name_.c_str(), imm()->NumNotFlushed(), mutable_cf_options.max_write_buffer_number); - } else if (vstorage->l0_delay_trigger_count() >= - mutable_cf_options.level0_stop_writes_trigger) { + } else if (!mutable_cf_options.disable_auto_compactions && + vstorage->l0_delay_trigger_count() >= + mutable_cf_options.level0_stop_writes_trigger) { write_controller_token_ = write_controller->GetStopToken(); internal_stats_->AddCFStats(InternalStats::LEVEL0_NUM_FILES_TOTAL, 1); if (compaction_picker_->IsLevel0CompactionInProgress()) { @@ -569,7 +570,8 @@ void ColumnFamilyData::RecalculateWriteStallConditions( Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log, "[%s] Stopping writes because we have %d level-0 files", name_.c_str(), vstorage->l0_delay_trigger_count()); - } else if (mutable_cf_options.hard_pending_compaction_bytes_limit > 0 && + } else if (!mutable_cf_options.disable_auto_compactions && + mutable_cf_options.hard_pending_compaction_bytes_limit > 0 && compaction_needed_bytes >= mutable_cf_options.hard_pending_compaction_bytes_limit) { write_controller_token_ = write_controller->GetStopToken(); @@ -594,7 +596,8 @@ void ColumnFamilyData::RecalculateWriteStallConditions( name_.c_str(), imm()->NumNotFlushed(), mutable_cf_options.max_write_buffer_number, write_controller->delayed_write_rate()); - } else if (mutable_cf_options.level0_slowdown_writes_trigger >= 0 && + } else if (!mutable_cf_options.disable_auto_compactions && + mutable_cf_options.level0_slowdown_writes_trigger >= 0 && vstorage->l0_delay_trigger_count() >= mutable_cf_options.level0_slowdown_writes_trigger) { write_controller_token_ = @@ -611,7 +614,8 @@ void ColumnFamilyData::RecalculateWriteStallConditions( "rate %" PRIu64, 
name_.c_str(), vstorage->l0_delay_trigger_count(), write_controller->delayed_write_rate()); - } else if (mutable_cf_options.soft_pending_compaction_bytes_limit > 0 && + } else if (!mutable_cf_options.disable_auto_compactions && + mutable_cf_options.soft_pending_compaction_bytes_limit > 0 && vstorage->estimated_compaction_needed_bytes() >= mutable_cf_options.soft_pending_compaction_bytes_limit) { write_controller_token_ = diff --git a/db/column_family_test.cc b/db/column_family_test.cc index aa00d59e5..18150970c 100644 --- a/db/column_family_test.cc +++ b/db/column_family_test.cc @@ -2446,6 +2446,7 @@ TEST_F(ColumnFamilyTest, WriteStallSingleColumnFamily) { mutable_cf_options.level0_stop_writes_trigger = 10000; mutable_cf_options.soft_pending_compaction_bytes_limit = 200; mutable_cf_options.hard_pending_compaction_bytes_limit = 2000; + mutable_cf_options.disable_auto_compactions = false; vstorage->TEST_set_estimated_compaction_needed_bytes(50); cfd->RecalculateWriteStallConditions(mutable_cf_options); @@ -2592,16 +2593,17 @@ TEST_F(ColumnFamilyTest, WriteStallSingleColumnFamily) { vstorage->set_l0_delay_trigger_count(50); cfd->RecalculateWriteStallConditions(mutable_cf_options); ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped()); - ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); + ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay()); ASSERT_EQ(kBaseRate, dbfull()->TEST_write_controler().delayed_write_rate()); vstorage->set_l0_delay_trigger_count(60); vstorage->TEST_set_estimated_compaction_needed_bytes(300); cfd->RecalculateWriteStallConditions(mutable_cf_options); ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped()); - ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); + ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay()); ASSERT_EQ(kBaseRate, dbfull()->TEST_write_controler().delayed_write_rate()); + mutable_cf_options.disable_auto_compactions = false; vstorage->set_l0_delay_trigger_count(70); 
vstorage->TEST_set_estimated_compaction_needed_bytes(500); cfd->RecalculateWriteStallConditions(mutable_cf_options); @@ -2609,7 +2611,6 @@ TEST_F(ColumnFamilyTest, WriteStallSingleColumnFamily) { ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); ASSERT_EQ(kBaseRate, dbfull()->TEST_write_controler().delayed_write_rate()); - mutable_cf_options.disable_auto_compactions = false; vstorage->set_l0_delay_trigger_count(71); vstorage->TEST_set_estimated_compaction_needed_bytes(501); cfd->RecalculateWriteStallConditions(mutable_cf_options); diff --git a/db/db_impl.cc b/db/db_impl.cc index 30bd55368..e0f25bc75 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -2358,20 +2358,6 @@ void DBImpl::NotifyOnCompactionCompleted( #endif // ROCKSDB_LITE } -bool DBImpl::NeedFlushOrCompaction(const MutableCFOptions& base_options, - const MutableCFOptions& new_options) { - return (base_options.disable_auto_compactions && - !new_options.disable_auto_compactions) || - base_options.level0_slowdown_writes_trigger < - new_options.level0_slowdown_writes_trigger || - base_options.level0_stop_writes_trigger < - new_options.level0_stop_writes_trigger || - base_options.soft_pending_compaction_bytes_limit < - new_options.soft_pending_compaction_bytes_limit || - base_options.hard_pending_compaction_bytes_limit < - new_options.hard_pending_compaction_bytes_limit; -} - Status DBImpl::SetOptions(ColumnFamilyHandle* column_family, const std::unordered_map<std::string, std::string>& options_map) { #ifdef ROCKSDB_LITE @@ -2385,7 +2371,6 @@ Status DBImpl::SetOptions(ColumnFamilyHandle* column_family, return Status::InvalidArgument("empty input"); } - MutableCFOptions prev_options = *cfd->GetLatestMutableCFOptions(); MutableCFOptions new_options; Status s; Status persist_options_status; @@ -2394,14 +2379,12 @@ Status DBImpl::SetOptions(ColumnFamilyHandle* column_family, s = cfd->SetOptions(options_map); if (s.ok()) { new_options = *cfd->GetLatestMutableCFOptions(); - if (NeedFlushOrCompaction(prev_options, new_options)) { - 
// Trigger possible flush/compactions. This has to be before we persist - // options to file, otherwise there will be a deadlock with writer - // thread. - auto* old_sv = - InstallSuperVersionAndScheduleWork(cfd, nullptr, new_options); - delete old_sv; - } + // Trigger possible flush/compactions. This has to be before we persist + // options to file, otherwise there will be a deadlock with writer + // thread. + auto* old_sv = + InstallSuperVersionAndScheduleWork(cfd, nullptr, new_options); + delete old_sv; // Persist RocksDB options under the single write thread WriteThread::Writer w; @@ -3343,7 +3326,9 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, // NOTE: try to avoid unnecessary copy of MutableCFOptions if // compaction is not necessary. Need to make sure mutex is held // until we make a copy in the following code + TEST_SYNC_POINT("DBImpl::BackgroundCompaction():BeforePickCompaction"); c.reset(cfd->PickCompaction(*mutable_cf_options, log_buffer)); + TEST_SYNC_POINT("DBImpl::BackgroundCompaction():AfterPickCompaction"); if (c != nullptr) { // update statistics MeasureTime(stats_, NUM_FILES_IN_SINGLE_COMPACTION, diff --git a/db/db_impl.h b/db/db_impl.h index ab4cbc0c0..5a1a80f63 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -672,11 +672,6 @@ class DBImpl : public DB { Status BackgroundFlush(bool* madeProgress, JobContext* job_context, LogBuffer* log_buffer); - // Compare options before and after to see whether flush or compaction is - // needed immediately after dynamic option change. - bool NeedFlushOrCompaction(const MutableCFOptions& base_options, - const MutableCFOptions& new_options); - void PrintStatistics(); // dump rocksdb.stats to LOG diff --git a/db/db_options_test.cc b/db/db_options_test.cc index 9df6b4e67..0d484d79f 100644 --- a/db/db_options_test.cc +++ b/db/db_options_test.cc @@ -6,6 +6,9 @@ // Copyright (c) 2011 The LevelDB Authors. All rights reserved. 
// Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. +#include <limits> +#include <string> + #include "db/db_test_util.h" #include "port/stack_trace.h" #include "util/sync_point.h" @@ -20,80 +23,101 @@ class DBOptionsTest : public DBTestBase { // RocksDB lite don't support dynamic options. #ifndef ROCKSDB_LITE -// When write stalls, user can enable auto compaction to unblock writes. -// However, we had an issue where the stalled write thread blocks the attempt -// to persist auto compaction option, thus creating a deadlock. The test -// verifies the issue is fixed. -TEST_F(DBOptionsTest, EnableAutoCompactionToUnblockWrites) { - Options options; - options.disable_auto_compactions = true; - options.write_buffer_size = 1000 * 1000; // 1M - options.level0_file_num_compaction_trigger = 1; - options.level0_slowdown_writes_trigger = 1; - options.level0_stop_writes_trigger = 1; - options.compression = kNoCompression; +TEST_F(DBOptionsTest, EnableAutoCompactionAndTriggerStall) { + const std::string kValue(1024, 'v'); + for (int method_type = 0; method_type < 2; method_type++) { + for (int option_type = 0; option_type < 4; option_type++) { + Options options; + options.create_if_missing = true; + options.disable_auto_compactions = true; + options.write_buffer_size = 1024 * 1024; + options.compression = CompressionType::kNoCompression; + options.level0_file_num_compaction_trigger = 1; + options.level0_stop_writes_trigger = std::numeric_limits<int>::max(); + options.level0_slowdown_writes_trigger = std::numeric_limits<int>::max(); + options.hard_pending_compaction_bytes_limit = + std::numeric_limits<uint64_t>::max(); + options.soft_pending_compaction_bytes_limit = + std::numeric_limits<uint64_t>::max(); - SyncPoint::GetInstance()->LoadDependency( - {{"DBImpl::DelayWrite:Wait", - "DBOptionsTest::EnableAutoCompactionToUnblockWrites:1"}, - {"DBImpl::BackgroundCompaction:Finish", - 
"DBOptionsTest::EnableAutoCompactionToUnblockWrites:1"}}); - SyncPoint::GetInstance()->EnableProcessing(); + DestroyAndReopen(options); + for (int i = 0; i < 1024 * 2; i++) { + Put(Key(i), kValue); + } + dbfull()->TEST_WaitForFlushMemTable(); + ASSERT_EQ(2, NumTableFilesAtLevel(0)); + uint64_t l0_size = SizeAtLevel(0); - // Stall writes. - Reopen(options); - env_->StartThread( - [](void* arg) { - std::string value(1000, 'v'); - auto* t = static_cast<DBOptionsTest*>(arg); - for (int i = 0; i < 2000; i++) { - ASSERT_OK(t->Put(t->Key(i), value)); - } - }, - this); - TEST_SYNC_POINT("DBOptionsTest::EnableAutoCompactionToUnblockWrites:1"); - ASSERT_TRUE(dbfull()->TEST_write_controler().IsStopped()); - ColumnFamilyHandle* handle = dbfull()->DefaultColumnFamily(); - // We will get a deadlock here if we hit the issue. - ASSERT_OK(dbfull()->EnableAutoCompaction({handle})); - env_->WaitForJoin(); -} + switch (option_type) { + case 0: + // test with level0_stop_writes_trigger + options.level0_stop_writes_trigger = 2; + options.level0_slowdown_writes_trigger = 2; + break; + case 1: + options.level0_slowdown_writes_trigger = 2; + break; + case 2: + options.hard_pending_compaction_bytes_limit = l0_size; + options.soft_pending_compaction_bytes_limit = l0_size; + break; + case 3: + options.soft_pending_compaction_bytes_limit = l0_size; + break; + } + Reopen(options); + dbfull()->TEST_WaitForCompact(); + ASSERT_FALSE(dbfull()->TEST_write_controler().IsStopped()); + ASSERT_FALSE(dbfull()->TEST_write_controler().NeedsDelay()); + + SyncPoint::GetInstance()->LoadDependency( + {{"DBOptionsTest::EnableAutoCompactionAndTriggerStall:1", + "BackgroundCallCompaction:0"}, + {"DBImpl::BackgroundCompaction():BeforePickCompaction", + "DBOptionsTest::EnableAutoCompactionAndTriggerStall:2"}, + {"DBOptionsTest::EnableAutoCompactionAndTriggerStall:3", + "DBImpl::BackgroundCompaction():AfterPickCompaction"}}); + // Block background compaction. 
+ SyncPoint::GetInstance()->EnableProcessing(); -// Similar to EnableAutoCompactionAfterStallDeadlock. See comments there. -TEST_F(DBOptionsTest, ToggleStopTriggerToUnblockWrites) { - Options options; - options.disable_auto_compactions = true; - options.write_buffer_size = 1000 * 1000; // 1M - options.level0_file_num_compaction_trigger = 1; - options.level0_slowdown_writes_trigger = 1; - options.level0_stop_writes_trigger = 1; - options.compression = kNoCompression; + switch (method_type) { + case 0: + ASSERT_OK( + dbfull()->SetOptions({{"disable_auto_compactions", "false"}})); + break; + case 1: + ASSERT_OK(dbfull()->EnableAutoCompaction( + {dbfull()->DefaultColumnFamily()})); + break; + } + TEST_SYNC_POINT("DBOptionsTest::EnableAutoCompactionAndTriggerStall:1"); + // Wait for the stall condition to be recalculated. + TEST_SYNC_POINT("DBOptionsTest::EnableAutoCompactionAndTriggerStall:2"); - SyncPoint::GetInstance()->LoadDependency( - {{"DBImpl::DelayWrite:Wait", - "DBOptionsTest::ToggleStopTriggerToUnblockWrites:1"}, - {"DBImpl::BackgroundCompaction:Finish", - "DBOptionsTest::ToggleStopTriggerToUnblockWrites:1"}}); - SyncPoint::GetInstance()->EnableProcessing(); + switch (option_type) { + case 0: + ASSERT_TRUE(dbfull()->TEST_write_controler().IsStopped()); + break; + case 1: + ASSERT_FALSE(dbfull()->TEST_write_controler().IsStopped()); + ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); + break; + case 2: + ASSERT_TRUE(dbfull()->TEST_write_controler().IsStopped()); + break; + case 3: + ASSERT_FALSE(dbfull()->TEST_write_controler().IsStopped()); + ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); + break; + } + TEST_SYNC_POINT("DBOptionsTest::EnableAutoCompactionAndTriggerStall:3"); - // Stall writes. 
- Reopen(options); - env_->StartThread( - [](void* arg) { - std::string value(1000, 'v'); - auto* t = static_cast<DBOptionsTest*>(arg); - for (int i = 0; i < 2000; i++) { - ASSERT_OK(t->Put(t->Key(i), value)); - } - }, - this); - TEST_SYNC_POINT("DBOptionsTest::ToggleStopTriggerToUnblockWrites:1"); - ASSERT_TRUE(dbfull()->TEST_write_controler().IsStopped()); - // We will get a deadlock here if we hit the issue. - ASSERT_OK( - dbfull()->SetOptions({{"level0_stop_writes_trigger", "1000000"}, - {"level0_slowdown_writes_trigger", "1000000"}})); - env_->WaitForJoin(); + // Background compaction executed. + dbfull()->TEST_WaitForCompact(); + ASSERT_FALSE(dbfull()->TEST_write_controler().IsStopped()); + ASSERT_FALSE(dbfull()->TEST_write_controler().NeedsDelay()); + } + } } #endif // ROCKSDB_LITE