diff --git a/HISTORY.md b/HISTORY.md
index 2480ad92f..41f1e8bd1 100644
--- a/HISTORY.md
+++ b/HISTORY.md
@@ -9,6 +9,7 @@
 
 ### New Features
 * Add experimental `PerfContext` counters `iter_{next|prev|seek}_count` for db iterator, each counting the times of corresponding API being called.
+* Allow runtime changes to whether `WriteBufferManager` allows stall or not by calling `SetAllowStall()`
 
 ## 8.1.0 (03/18/2023)
 ### Behavior changes
diff --git a/db/db_write_buffer_manager_test.cc b/db/db_write_buffer_manager_test.cc
index 294244547..82704e194 100644
--- a/db/db_write_buffer_manager_test.cc
+++ b/db/db_write_buffer_manager_test.cc
@@ -846,6 +846,73 @@ TEST_P(DBWriteBufferManagerTest, StopSwitchingMemTablesOnceFlushing) {
   delete shared_wbm_db;
 }
 
+TEST_F(DBWriteBufferManagerTest, RuntimeChangeableAllowStall) {
+  constexpr int kBigValue = 10000;
+
+  Options options = CurrentOptions();
+  options.write_buffer_manager.reset(
+      new WriteBufferManager(1, nullptr /* cache */, true /* allow_stall */));
+  DestroyAndReopen(options);
+
+  // Pause the flush thread so that
+  // (a) the only way to exit the write stall below is to change `allow_stall`
+  // (b) the write stall is "stable", not interfered with by flushes, so that
+  //     we can check it without flakiness
+  std::unique_ptr<test::SleepingBackgroundTask> sleeping_task(
+      new test::SleepingBackgroundTask());
+  env_->SetBackgroundThreads(1, Env::HIGH);
+  env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask,
+                 sleeping_task.get(), Env::Priority::HIGH);
+  sleeping_task->WaitUntilSleeping();
+
+  // Test 1: test setting `allow_stall` from true to false
+  //
+  // Assert existence of a write stall
+  WriteOptions wo_no_slowdown;
+  wo_no_slowdown.no_slowdown = true;
+  Status s = Put(Key(0), DummyString(kBigValue), wo_no_slowdown);
+  ASSERT_TRUE(s.IsIncomplete());
+  ASSERT_TRUE(s.ToString().find("Write stall") != std::string::npos);
+
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
+      {{"WBMStallInterface::BlockDB",
+        "DBWriteBufferManagerTest::RuntimeChangeableThreadSafeParameters::"
+        "ChangeParameter"}});
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
+  // Test `SetAllowStall()`
+  port::Thread thread1(
+      [&] { ASSERT_OK(Put(Key(0), DummyString(kBigValue))); });
+  port::Thread thread2([&] {
+    TEST_SYNC_POINT(
+        "DBWriteBufferManagerTest::RuntimeChangeableThreadSafeParameters::"
+        "ChangeParameter");
+    options.write_buffer_manager->SetAllowStall(false);
+  });
+
+  // Verify `allow_stall` is successfully set to false in thread2.
+  // Otherwise, thread1's write will be stalled and this test will hang
+  // forever.
+  thread1.join();
+  thread2.join();
+
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+
+  // Test 2: test setting `allow_stall` from false to true
+  //
+  // Assert no write stall
+  ASSERT_OK(Put(Key(0), DummyString(kBigValue), wo_no_slowdown));
+
+  // Test `SetAllowStall()`
+  options.write_buffer_manager->SetAllowStall(true);
+
+  // Verify `allow_stall` is successfully set to true.
+  // Otherwise the following write will not be stalled and will therefore
+  // succeed.
+  s = Put(Key(0), DummyString(kBigValue), wo_no_slowdown);
+  ASSERT_TRUE(s.IsIncomplete());
+  ASSERT_TRUE(s.ToString().find("Write stall") != std::string::npos);
+  sleeping_task->WakeUp();
+}
+
 INSTANTIATE_TEST_CASE_P(DBWriteBufferManagerTest, DBWriteBufferManagerTest,
                         testing::Bool());
diff --git a/include/rocksdb/write_buffer_manager.h b/include/rocksdb/write_buffer_manager.h
index 7fb18196d..61e75c888 100644
--- a/include/rocksdb/write_buffer_manager.h
+++ b/include/rocksdb/write_buffer_manager.h
@@ -81,13 +81,20 @@ class WriteBufferManager final {
     return buffer_size_.load(std::memory_order_relaxed);
   }
 
+  // REQUIRED: `new_size` > 0
   void SetBufferSize(size_t new_size) {
+    assert(new_size > 0);
     buffer_size_.store(new_size, std::memory_order_relaxed);
     mutable_limit_.store(new_size * 7 / 8, std::memory_order_relaxed);
     // Check if stall is active and can be ended.
     MaybeEndWriteStall();
   }
 
+  void SetAllowStall(bool new_allow_stall) {
+    allow_stall_.store(new_allow_stall, std::memory_order_relaxed);
+    MaybeEndWriteStall();
+  }
+
   // Below functions should be called by RocksDB internally.
 
   // Should only be called from write thread
@@ -117,7 +124,7 @@ class WriteBufferManager final {
   //
   // Should only be called by RocksDB internally .
   bool ShouldStall() const {
-    if (!allow_stall_ || !enabled()) {
+    if (!allow_stall_.load(std::memory_order_relaxed) || !enabled()) {
       return false;
     }
 
@@ -165,7 +172,7 @@ class WriteBufferManager final {
   std::list<StallInterface*> queue_;
   // Protects the queue_ and stall_active_.
   std::mutex mu_;
-  bool allow_stall_;
+  std::atomic<bool> allow_stall_;
   // Value should only be changed by BeginWriteStall() and MaybeEndWriteStall()
   // while holding mu_, but it can be read without a lock.
   std::atomic<bool> stall_active_;
diff --git a/memtable/write_buffer_manager.cc b/memtable/write_buffer_manager.cc
index d2cfd3487..ce1789c20 100644
--- a/memtable/write_buffer_manager.cc
+++ b/memtable/write_buffer_manager.cc
@@ -117,7 +117,6 @@ void WriteBufferManager::FreeMemWithCache(size_t mem) {
 
 void WriteBufferManager::BeginWriteStall(StallInterface* wbm_stall) {
   assert(wbm_stall != nullptr);
-  assert(allow_stall_);
 
   // Allocate outside of the lock.
   std::list<StallInterface*> new_node = {wbm_stall};
@@ -140,16 +139,12 @@ void WriteBufferManager::BeginWriteStall(StallInterface* wbm_stall) {
 
 // Called when memory is freed in FreeMem or the buffer size has changed.
 void WriteBufferManager::MaybeEndWriteStall() {
-  // Cannot early-exit on !enabled() because SetBufferSize(0) needs to unblock
-  // the writers.
-  if (!allow_stall_) {
+  // Stall conditions have not been resolved.
+  if (allow_stall_.load(std::memory_order_relaxed) &&
+      IsStallThresholdExceeded()) {
     return;
   }
 
-  if (IsStallThresholdExceeded()) {
-    return;  // Stall conditions have not resolved.
-  }
-
   // Perform all deallocations outside of the lock.
   std::list<StallInterface*> cleanup;
 
@@ -174,7 +169,7 @@ void WriteBufferManager::RemoveDBFromQueue(StallInterface* wbm_stall) {
   // Deallocate the removed nodes outside of the lock.
   std::list<StallInterface*> cleanup;
 
-  if (enabled() && allow_stall_) {
+  if (enabled() && allow_stall_.load(std::memory_order_relaxed)) {
     std::unique_lock<std::mutex> lock(mu_);
     for (auto it = queue_.begin(); it != queue_.end();) {
      auto next = std::next(it);
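
Not part of the patch above: a minimal sketch of how application code might drive the new `SetAllowStall()` API, for example toggling stalling off around a latency-sensitive burst of writes. The DB path, buffer budget, and error handling are illustrative assumptions, not code from this diff.

```cpp
#include <cassert>
#include <memory>

#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/write_buffer_manager.h"

int main() {
  using namespace ROCKSDB_NAMESPACE;

  // Hypothetical setup: a 64 MB shared write buffer budget, with write
  // stalling enabled from the start (third ctor arg is `allow_stall`).
  auto wbm = std::make_shared<WriteBufferManager>(
      64 << 20, nullptr /* cache */, true /* allow_stall */);

  Options options;
  options.create_if_missing = true;
  options.write_buffer_manager = wbm;

  DB* db = nullptr;
  Status s = DB::Open(options, "/tmp/wbm_demo" /* illustrative path */, &db);
  assert(s.ok());

  // Before a latency-sensitive write burst, stop the WriteBufferManager from
  // stalling writers. Per the patch, SetAllowStall(false) also calls
  // MaybeEndWriteStall(), so writers already blocked on the WBM stall are
  // woken up.
  wbm->SetAllowStall(false);

  // ... latency-sensitive writes here ...

  // Re-enable stalling afterwards; subsequent writes may stall again if the
  // memory limit is exceeded.
  wbm->SetAllowStall(true);

  delete db;
  return 0;
}
```

Because `allow_stall_` is now a relaxed `std::atomic<bool>`, this toggle is safe to call from a thread other than the writers, which is exactly what the `RuntimeChangeableAllowStall` test exercises with its two-thread sync-point setup.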