Add `SetAllowStall()` (#11335)

Summary:
**Context/Summary:**
- Allow runtime changes to whether `WriteBufferManager` allows stall or not by calling `SetAllowStall()`
- Misc: some clean up - see PR conversation

Pull Request resolved: https://github.com/facebook/rocksdb/pull/11335

Test Plan: - New UT

Reviewed By: akankshamahajan15

Differential Revision: D44502555

Pulled By: hx235

fbshipit-source-id: 24b5cc57df7734b11d42e4870c06c87b95312b5e
oxigraph-8.3.2
Hui Xiao 2 years ago committed by Facebook GitHub Bot
parent 0efd7b4ba1
commit 39c29372bf
  1. 1
      HISTORY.md
  2. 67
      db/db_write_buffer_manager_test.cc
  3. 11
      include/rocksdb/write_buffer_manager.h
  4. 13
      memtable/write_buffer_manager.cc

@ -9,6 +9,7 @@
### New Features
* Add experimental `PerfContext` counters `iter_{next|prev|seek}_count` for db iterator, each counting the times of corresponding API being called.
* Allow runtime changes to whether `WriteBufferManager` allows stall or not by calling `SetAllowStall()`
## 8.1.0 (03/18/2023)
### Behavior changes

@ -846,6 +846,73 @@ TEST_P(DBWriteBufferManagerTest, StopSwitchingMemTablesOnceFlushing) {
delete shared_wbm_db;
}
TEST_F(DBWriteBufferManagerTest, RuntimeChangeableAllowStall) {
constexpr int kBigValue = 10000;
Options options = CurrentOptions();
options.write_buffer_manager.reset(
new WriteBufferManager(1, nullptr /* cache */, true /* allow_stall */));
DestroyAndReopen(options);
// Pause flush thread so that
// (a) the only way to exist write stall below is to change the `allow_stall`
// (b) the write stall is "stable" without being interfered by flushes so that
// we can check it without flakiness
std::unique_ptr<test::SleepingBackgroundTask> sleeping_task(
new test::SleepingBackgroundTask());
env_->SetBackgroundThreads(1, Env::HIGH);
env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask,
sleeping_task.get(), Env::Priority::HIGH);
sleeping_task->WaitUntilSleeping();
// Test 1: test setting `allow_stall` from true to false
//
// Assert existence of a write stall
WriteOptions wo_no_slowdown;
wo_no_slowdown.no_slowdown = true;
Status s = Put(Key(0), DummyString(kBigValue), wo_no_slowdown);
ASSERT_TRUE(s.IsIncomplete());
ASSERT_TRUE(s.ToString().find("Write stall") != std::string::npos);
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
{{"WBMStallInterface::BlockDB",
"DBWriteBufferManagerTest::RuntimeChangeableThreadSafeParameters::"
"ChangeParameter"}});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
// Test `SetAllowStall()`
port::Thread thread1([&] { ASSERT_OK(Put(Key(0), DummyString(kBigValue))); });
port::Thread thread2([&] {
TEST_SYNC_POINT(
"DBWriteBufferManagerTest::RuntimeChangeableThreadSafeParameters::"
"ChangeParameter");
options.write_buffer_manager->SetAllowStall(false);
});
// Verify `allow_stall` is successfully set to false in thread2.
// Othwerwise, thread1's write will be stalled and this test will hang
// forever.
thread1.join();
thread2.join();
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
// Test 2: test setting `allow_stall` from false to true
//
// Assert no write stall
ASSERT_OK(Put(Key(0), DummyString(kBigValue), wo_no_slowdown));
// Test `SetAllowStall()`
options.write_buffer_manager->SetAllowStall(true);
// Verify `allow_stall` is successfully set to true.
// Otherwise the following write will not be stalled and therefore succeed.
s = Put(Key(0), DummyString(kBigValue), wo_no_slowdown);
ASSERT_TRUE(s.IsIncomplete());
ASSERT_TRUE(s.ToString().find("Write stall") != std::string::npos);
sleeping_task->WakeUp();
}
INSTANTIATE_TEST_CASE_P(DBWriteBufferManagerTest, DBWriteBufferManagerTest,
testing::Bool());

@ -81,13 +81,20 @@ class WriteBufferManager final {
return buffer_size_.load(std::memory_order_relaxed);
}
// REQUIRED: `new_size` > 0
void SetBufferSize(size_t new_size) {
assert(new_size > 0);
buffer_size_.store(new_size, std::memory_order_relaxed);
mutable_limit_.store(new_size * 7 / 8, std::memory_order_relaxed);
// Check if stall is active and can be ended.
MaybeEndWriteStall();
}
void SetAllowStall(bool new_allow_stall) {
allow_stall_.store(new_allow_stall, std::memory_order_relaxed);
MaybeEndWriteStall();
}
// Below functions should be called by RocksDB internally.
// Should only be called from write thread
@ -117,7 +124,7 @@ class WriteBufferManager final {
//
// Should only be called by RocksDB internally .
bool ShouldStall() const {
if (!allow_stall_ || !enabled()) {
if (!allow_stall_.load(std::memory_order_relaxed) || !enabled()) {
return false;
}
@ -165,7 +172,7 @@ class WriteBufferManager final {
std::list<StallInterface*> queue_;
// Protects the queue_ and stall_active_.
std::mutex mu_;
bool allow_stall_;
std::atomic<bool> allow_stall_;
// Value should only be changed by BeginWriteStall() and MaybeEndWriteStall()
// while holding mu_, but it can be read without a lock.
std::atomic<bool> stall_active_;

@ -117,7 +117,6 @@ void WriteBufferManager::FreeMemWithCache(size_t mem) {
void WriteBufferManager::BeginWriteStall(StallInterface* wbm_stall) {
assert(wbm_stall != nullptr);
assert(allow_stall_);
// Allocate outside of the lock.
std::list<StallInterface*> new_node = {wbm_stall};
@ -140,16 +139,12 @@ void WriteBufferManager::BeginWriteStall(StallInterface* wbm_stall) {
// Called when memory is freed in FreeMem or the buffer size has changed.
void WriteBufferManager::MaybeEndWriteStall() {
// Cannot early-exit on !enabled() because SetBufferSize(0) needs to unblock
// the writers.
if (!allow_stall_) {
// Stall conditions have not been resolved.
if (allow_stall_.load(std::memory_order_relaxed) &&
IsStallThresholdExceeded()) {
return;
}
if (IsStallThresholdExceeded()) {
return; // Stall conditions have not resolved.
}
// Perform all deallocations outside of the lock.
std::list<StallInterface*> cleanup;
@ -174,7 +169,7 @@ void WriteBufferManager::RemoveDBFromQueue(StallInterface* wbm_stall) {
// Deallocate the removed nodes outside of the lock.
std::list<StallInterface*> cleanup;
if (enabled() && allow_stall_) {
if (enabled() && allow_stall_.load(std::memory_order_relaxed)) {
std::unique_lock<std::mutex> lock(mu_);
for (auto it = queue_.begin(); it != queue_.end();) {
auto next = std::next(it);

Loading…
Cancel
Save