Ignore write stall triggers when auto-compaction is disabled

Summary:
My understanding is that the purpose of write stall triggers is to wait for auto-compaction to catch up. Without auto-compaction, we don't need to stall writes.

Also with this diff, flush/compaction conditions are recalculated on every dynamic option change. Previously, the conditions were recalculated only when write stall options were changed.

Test Plan: See the new test. Removed two tests that are no longer valid.

Reviewers: IslamAbdelRahman, sdong

Reviewed By: sdong

Subscribers: andrewkr, dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D61437
main
Yi Wu 9 years ago
parent e4609a749b
commit ee027fc19f
  1. 14
      db/column_family.cc
  2. 7
      db/column_family_test.cc
  3. 31
      db/db_impl.cc
  4. 5
      db/db_impl.h
  5. 162
      db/db_options_test.cc

@ -558,8 +558,9 @@ void ColumnFamilyData::RecalculateWriteStallConditions(
"(waiting for flush), max_write_buffer_number is set to %d", "(waiting for flush), max_write_buffer_number is set to %d",
name_.c_str(), imm()->NumNotFlushed(), name_.c_str(), imm()->NumNotFlushed(),
mutable_cf_options.max_write_buffer_number); mutable_cf_options.max_write_buffer_number);
} else if (vstorage->l0_delay_trigger_count() >= } else if (!mutable_cf_options.disable_auto_compactions &&
mutable_cf_options.level0_stop_writes_trigger) { vstorage->l0_delay_trigger_count() >=
mutable_cf_options.level0_stop_writes_trigger) {
write_controller_token_ = write_controller->GetStopToken(); write_controller_token_ = write_controller->GetStopToken();
internal_stats_->AddCFStats(InternalStats::LEVEL0_NUM_FILES_TOTAL, 1); internal_stats_->AddCFStats(InternalStats::LEVEL0_NUM_FILES_TOTAL, 1);
if (compaction_picker_->IsLevel0CompactionInProgress()) { if (compaction_picker_->IsLevel0CompactionInProgress()) {
@ -569,7 +570,8 @@ void ColumnFamilyData::RecalculateWriteStallConditions(
Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log, Log(InfoLogLevel::WARN_LEVEL, ioptions_.info_log,
"[%s] Stopping writes because we have %d level-0 files", "[%s] Stopping writes because we have %d level-0 files",
name_.c_str(), vstorage->l0_delay_trigger_count()); name_.c_str(), vstorage->l0_delay_trigger_count());
} else if (mutable_cf_options.hard_pending_compaction_bytes_limit > 0 && } else if (!mutable_cf_options.disable_auto_compactions &&
mutable_cf_options.hard_pending_compaction_bytes_limit > 0 &&
compaction_needed_bytes >= compaction_needed_bytes >=
mutable_cf_options.hard_pending_compaction_bytes_limit) { mutable_cf_options.hard_pending_compaction_bytes_limit) {
write_controller_token_ = write_controller->GetStopToken(); write_controller_token_ = write_controller->GetStopToken();
@ -594,7 +596,8 @@ void ColumnFamilyData::RecalculateWriteStallConditions(
name_.c_str(), imm()->NumNotFlushed(), name_.c_str(), imm()->NumNotFlushed(),
mutable_cf_options.max_write_buffer_number, mutable_cf_options.max_write_buffer_number,
write_controller->delayed_write_rate()); write_controller->delayed_write_rate());
} else if (mutable_cf_options.level0_slowdown_writes_trigger >= 0 && } else if (!mutable_cf_options.disable_auto_compactions &&
mutable_cf_options.level0_slowdown_writes_trigger >= 0 &&
vstorage->l0_delay_trigger_count() >= vstorage->l0_delay_trigger_count() >=
mutable_cf_options.level0_slowdown_writes_trigger) { mutable_cf_options.level0_slowdown_writes_trigger) {
write_controller_token_ = write_controller_token_ =
@ -611,7 +614,8 @@ void ColumnFamilyData::RecalculateWriteStallConditions(
"rate %" PRIu64, "rate %" PRIu64,
name_.c_str(), vstorage->l0_delay_trigger_count(), name_.c_str(), vstorage->l0_delay_trigger_count(),
write_controller->delayed_write_rate()); write_controller->delayed_write_rate());
} else if (mutable_cf_options.soft_pending_compaction_bytes_limit > 0 && } else if (!mutable_cf_options.disable_auto_compactions &&
mutable_cf_options.soft_pending_compaction_bytes_limit > 0 &&
vstorage->estimated_compaction_needed_bytes() >= vstorage->estimated_compaction_needed_bytes() >=
mutable_cf_options.soft_pending_compaction_bytes_limit) { mutable_cf_options.soft_pending_compaction_bytes_limit) {
write_controller_token_ = write_controller_token_ =

@ -2446,6 +2446,7 @@ TEST_F(ColumnFamilyTest, WriteStallSingleColumnFamily) {
mutable_cf_options.level0_stop_writes_trigger = 10000; mutable_cf_options.level0_stop_writes_trigger = 10000;
mutable_cf_options.soft_pending_compaction_bytes_limit = 200; mutable_cf_options.soft_pending_compaction_bytes_limit = 200;
mutable_cf_options.hard_pending_compaction_bytes_limit = 2000; mutable_cf_options.hard_pending_compaction_bytes_limit = 2000;
mutable_cf_options.disable_auto_compactions = false;
vstorage->TEST_set_estimated_compaction_needed_bytes(50); vstorage->TEST_set_estimated_compaction_needed_bytes(50);
cfd->RecalculateWriteStallConditions(mutable_cf_options); cfd->RecalculateWriteStallConditions(mutable_cf_options);
@ -2592,16 +2593,17 @@ TEST_F(ColumnFamilyTest, WriteStallSingleColumnFamily) {
vstorage->set_l0_delay_trigger_count(50); vstorage->set_l0_delay_trigger_count(50);
cfd->RecalculateWriteStallConditions(mutable_cf_options); cfd->RecalculateWriteStallConditions(mutable_cf_options);
ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped()); ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay());
ASSERT_EQ(kBaseRate, dbfull()->TEST_write_controler().delayed_write_rate()); ASSERT_EQ(kBaseRate, dbfull()->TEST_write_controler().delayed_write_rate());
vstorage->set_l0_delay_trigger_count(60); vstorage->set_l0_delay_trigger_count(60);
vstorage->TEST_set_estimated_compaction_needed_bytes(300); vstorage->TEST_set_estimated_compaction_needed_bytes(300);
cfd->RecalculateWriteStallConditions(mutable_cf_options); cfd->RecalculateWriteStallConditions(mutable_cf_options);
ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped()); ASSERT_TRUE(!dbfull()->TEST_write_controler().IsStopped());
ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay());
ASSERT_EQ(kBaseRate, dbfull()->TEST_write_controler().delayed_write_rate()); ASSERT_EQ(kBaseRate, dbfull()->TEST_write_controler().delayed_write_rate());
mutable_cf_options.disable_auto_compactions = false;
vstorage->set_l0_delay_trigger_count(70); vstorage->set_l0_delay_trigger_count(70);
vstorage->TEST_set_estimated_compaction_needed_bytes(500); vstorage->TEST_set_estimated_compaction_needed_bytes(500);
cfd->RecalculateWriteStallConditions(mutable_cf_options); cfd->RecalculateWriteStallConditions(mutable_cf_options);
@ -2609,7 +2611,6 @@ TEST_F(ColumnFamilyTest, WriteStallSingleColumnFamily) {
ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay()); ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
ASSERT_EQ(kBaseRate, dbfull()->TEST_write_controler().delayed_write_rate()); ASSERT_EQ(kBaseRate, dbfull()->TEST_write_controler().delayed_write_rate());
mutable_cf_options.disable_auto_compactions = false;
vstorage->set_l0_delay_trigger_count(71); vstorage->set_l0_delay_trigger_count(71);
vstorage->TEST_set_estimated_compaction_needed_bytes(501); vstorage->TEST_set_estimated_compaction_needed_bytes(501);
cfd->RecalculateWriteStallConditions(mutable_cf_options); cfd->RecalculateWriteStallConditions(mutable_cf_options);

@ -2358,20 +2358,6 @@ void DBImpl::NotifyOnCompactionCompleted(
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE
} }
bool DBImpl::NeedFlushOrCompaction(const MutableCFOptions& base_options,
const MutableCFOptions& new_options) {
return (base_options.disable_auto_compactions &&
!new_options.disable_auto_compactions) ||
base_options.level0_slowdown_writes_trigger <
new_options.level0_slowdown_writes_trigger ||
base_options.level0_stop_writes_trigger <
new_options.level0_stop_writes_trigger ||
base_options.soft_pending_compaction_bytes_limit <
new_options.soft_pending_compaction_bytes_limit ||
base_options.hard_pending_compaction_bytes_limit <
new_options.hard_pending_compaction_bytes_limit;
}
Status DBImpl::SetOptions(ColumnFamilyHandle* column_family, Status DBImpl::SetOptions(ColumnFamilyHandle* column_family,
const std::unordered_map<std::string, std::string>& options_map) { const std::unordered_map<std::string, std::string>& options_map) {
#ifdef ROCKSDB_LITE #ifdef ROCKSDB_LITE
@ -2385,7 +2371,6 @@ Status DBImpl::SetOptions(ColumnFamilyHandle* column_family,
return Status::InvalidArgument("empty input"); return Status::InvalidArgument("empty input");
} }
MutableCFOptions prev_options = *cfd->GetLatestMutableCFOptions();
MutableCFOptions new_options; MutableCFOptions new_options;
Status s; Status s;
Status persist_options_status; Status persist_options_status;
@ -2394,14 +2379,12 @@ Status DBImpl::SetOptions(ColumnFamilyHandle* column_family,
s = cfd->SetOptions(options_map); s = cfd->SetOptions(options_map);
if (s.ok()) { if (s.ok()) {
new_options = *cfd->GetLatestMutableCFOptions(); new_options = *cfd->GetLatestMutableCFOptions();
if (NeedFlushOrCompaction(prev_options, new_options)) { // Trigger possible flush/compactions. This has to be before we persist
// Trigger possible flush/compactions. This has to be before we persist // options to file, otherwise there will be a deadlock with writer
// options to file, otherwise there will be a deadlock with writer // thread.
// thread. auto* old_sv =
auto* old_sv = InstallSuperVersionAndScheduleWork(cfd, nullptr, new_options);
InstallSuperVersionAndScheduleWork(cfd, nullptr, new_options); delete old_sv;
delete old_sv;
}
// Persist RocksDB options under the single write thread // Persist RocksDB options under the single write thread
WriteThread::Writer w; WriteThread::Writer w;
@ -3343,7 +3326,9 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
// NOTE: try to avoid unnecessary copy of MutableCFOptions if // NOTE: try to avoid unnecessary copy of MutableCFOptions if
// compaction is not necessary. Need to make sure mutex is held // compaction is not necessary. Need to make sure mutex is held
// until we make a copy in the following code // until we make a copy in the following code
TEST_SYNC_POINT("DBImpl::BackgroundCompaction():BeforePickCompaction");
c.reset(cfd->PickCompaction(*mutable_cf_options, log_buffer)); c.reset(cfd->PickCompaction(*mutable_cf_options, log_buffer));
TEST_SYNC_POINT("DBImpl::BackgroundCompaction():AfterPickCompaction");
if (c != nullptr) { if (c != nullptr) {
// update statistics // update statistics
MeasureTime(stats_, NUM_FILES_IN_SINGLE_COMPACTION, MeasureTime(stats_, NUM_FILES_IN_SINGLE_COMPACTION,

@ -672,11 +672,6 @@ class DBImpl : public DB {
Status BackgroundFlush(bool* madeProgress, JobContext* job_context, Status BackgroundFlush(bool* madeProgress, JobContext* job_context,
LogBuffer* log_buffer); LogBuffer* log_buffer);
// Compare options before and after to see whether flush or compaction is
// needed immediately after dynamic option change.
bool NeedFlushOrCompaction(const MutableCFOptions& base_options,
const MutableCFOptions& new_options);
void PrintStatistics(); void PrintStatistics();
// dump rocksdb.stats to LOG // dump rocksdb.stats to LOG

@ -6,6 +6,9 @@
// Copyright (c) 2011 The LevelDB Authors. All rights reserved. // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be // Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors. // found in the LICENSE file. See the AUTHORS file for names of contributors.
#include <limits>
#include <string>
#include "db/db_test_util.h" #include "db/db_test_util.h"
#include "port/stack_trace.h" #include "port/stack_trace.h"
#include "util/sync_point.h" #include "util/sync_point.h"
@ -20,80 +23,101 @@ class DBOptionsTest : public DBTestBase {
// RocksDB lite don't support dynamic options. // RocksDB lite don't support dynamic options.
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
// When write stalls, user can enable auto compaction to unblock writes. TEST_F(DBOptionsTest, EnableAutoCompactionAndTriggerStall) {
// However, we had an issue where the stalled write thread blocks the attempt const std::string kValue(1024, 'v');
// to persist auto compaction option, thus creating a deadlock. The test for (int method_type = 0; method_type < 2; method_type++) {
// verifies the issue is fixed. for (int option_type = 0; option_type < 4; option_type++) {
TEST_F(DBOptionsTest, EnableAutoCompactionToUnblockWrites) { Options options;
Options options; options.create_if_missing = true;
options.disable_auto_compactions = true; options.disable_auto_compactions = true;
options.write_buffer_size = 1000 * 1000; // 1M options.write_buffer_size = 1024 * 1024;
options.level0_file_num_compaction_trigger = 1; options.compression = CompressionType::kNoCompression;
options.level0_slowdown_writes_trigger = 1; options.level0_file_num_compaction_trigger = 1;
options.level0_stop_writes_trigger = 1; options.level0_stop_writes_trigger = std::numeric_limits<int>::max();
options.compression = kNoCompression; options.level0_slowdown_writes_trigger = std::numeric_limits<int>::max();
options.hard_pending_compaction_bytes_limit =
std::numeric_limits<uint64_t>::max();
options.soft_pending_compaction_bytes_limit =
std::numeric_limits<uint64_t>::max();
SyncPoint::GetInstance()->LoadDependency( DestroyAndReopen(options);
{{"DBImpl::DelayWrite:Wait", for (int i = 0; i < 1024 * 2; i++) {
"DBOptionsTest::EnableAutoCompactionToUnblockWrites:1"}, Put(Key(i), kValue);
{"DBImpl::BackgroundCompaction:Finish", }
"DBOptionsTest::EnableAutoCompactionToUnblockWrites:1"}}); dbfull()->TEST_WaitForFlushMemTable();
SyncPoint::GetInstance()->EnableProcessing(); ASSERT_EQ(2, NumTableFilesAtLevel(0));
uint64_t l0_size = SizeAtLevel(0);
// Stall writes. switch (option_type) {
Reopen(options); case 0:
env_->StartThread( // test with level0_stop_writes_trigger
[](void* arg) { options.level0_stop_writes_trigger = 2;
std::string value(1000, 'v'); options.level0_slowdown_writes_trigger = 2;
auto* t = static_cast<DBOptionsTest*>(arg); break;
for (int i = 0; i < 2000; i++) { case 1:
ASSERT_OK(t->Put(t->Key(i), value)); options.level0_slowdown_writes_trigger = 2;
} break;
}, case 2:
this); options.hard_pending_compaction_bytes_limit = l0_size;
TEST_SYNC_POINT("DBOptionsTest::EnableAutoCompactionToUnblockWrites:1"); options.soft_pending_compaction_bytes_limit = l0_size;
ASSERT_TRUE(dbfull()->TEST_write_controler().IsStopped()); break;
ColumnFamilyHandle* handle = dbfull()->DefaultColumnFamily(); case 3:
// We will get a deadlock here if we hit the issue. options.soft_pending_compaction_bytes_limit = l0_size;
ASSERT_OK(dbfull()->EnableAutoCompaction({handle})); break;
env_->WaitForJoin(); }
} Reopen(options);
dbfull()->TEST_WaitForCompact();
ASSERT_FALSE(dbfull()->TEST_write_controler().IsStopped());
ASSERT_FALSE(dbfull()->TEST_write_controler().NeedsDelay());
SyncPoint::GetInstance()->LoadDependency(
{{"DBOptionsTest::EnableAutoCompactionAndTriggerStall:1",
"BackgroundCallCompaction:0"},
{"DBImpl::BackgroundCompaction():BeforePickCompaction",
"DBOptionsTest::EnableAutoCompactionAndTriggerStall:2"},
{"DBOptionsTest::EnableAutoCompactionAndTriggerStall:3",
"DBImpl::BackgroundCompaction():AfterPickCompaction"}});
// Block background compaction.
SyncPoint::GetInstance()->EnableProcessing();
// Similar to EnableAutoCompactionAfterStallDeadlock. See comments there. switch (method_type) {
TEST_F(DBOptionsTest, ToggleStopTriggerToUnblockWrites) { case 0:
Options options; ASSERT_OK(
options.disable_auto_compactions = true; dbfull()->SetOptions({{"disable_auto_compactions", "false"}}));
options.write_buffer_size = 1000 * 1000; // 1M break;
options.level0_file_num_compaction_trigger = 1; case 1:
options.level0_slowdown_writes_trigger = 1; ASSERT_OK(dbfull()->EnableAutoCompaction(
options.level0_stop_writes_trigger = 1; {dbfull()->DefaultColumnFamily()}));
options.compression = kNoCompression; break;
}
TEST_SYNC_POINT("DBOptionsTest::EnableAutoCompactionAndTriggerStall:1");
// Wait for stall condition recalculate.
TEST_SYNC_POINT("DBOptionsTest::EnableAutoCompactionAndTriggerStall:2");
SyncPoint::GetInstance()->LoadDependency( switch (option_type) {
{{"DBImpl::DelayWrite:Wait", case 0:
"DBOptionsTest::ToggleStopTriggerToUnblockWrites:1"}, ASSERT_TRUE(dbfull()->TEST_write_controler().IsStopped());
{"DBImpl::BackgroundCompaction:Finish", break;
"DBOptionsTest::ToggleStopTriggerToUnblockWrites:1"}}); case 1:
SyncPoint::GetInstance()->EnableProcessing(); ASSERT_FALSE(dbfull()->TEST_write_controler().IsStopped());
ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
break;
case 2:
ASSERT_TRUE(dbfull()->TEST_write_controler().IsStopped());
break;
case 3:
ASSERT_FALSE(dbfull()->TEST_write_controler().IsStopped());
ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
break;
}
TEST_SYNC_POINT("DBOptionsTest::EnableAutoCompactionAndTriggerStall:3");
// Stall writes. // Background compaction executed.
Reopen(options); dbfull()->TEST_WaitForCompact();
env_->StartThread( ASSERT_FALSE(dbfull()->TEST_write_controler().IsStopped());
[](void* arg) { ASSERT_FALSE(dbfull()->TEST_write_controler().NeedsDelay());
std::string value(1000, 'v'); }
auto* t = static_cast<DBOptionsTest*>(arg); }
for (int i = 0; i < 2000; i++) {
ASSERT_OK(t->Put(t->Key(i), value));
}
},
this);
TEST_SYNC_POINT("DBOptionsTest::ToggleStopTriggerToUnblockWrites:1");
ASSERT_TRUE(dbfull()->TEST_write_controler().IsStopped());
// We will get a deadlock here if we hit the issue.
ASSERT_OK(
dbfull()->SetOptions({{"level0_stop_writes_trigger", "1000000"},
{"level0_slowdown_writes_trigger", "1000000"}}));
env_->WaitForJoin();
} }
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE

Loading…
Cancel
Save