From 6ea41f852708cf09d861894d33e1b65cd1d81c45 Mon Sep 17 00:00:00 2001 From: Yi Wu Date: Tue, 12 Jul 2016 15:30:38 -0700 Subject: [PATCH] Fix deadlock when trying update options when write stalls Summary: When write stalls because of auto compaction is disabled, or stop write trigger is reached, user may change these two options to unblock writes. Unfortunately we had issue where the write thread will block the attempt to persist the options, thus creating a deadlock. This diff fix the issue and add two test cases to detect such deadlock. Test Plan: Run unit tests. Also, revert db_impl.cc to master (but don't revert `DBImpl::BackgroundCompaction:Finish` sync point) and run db_options_test. Both tests should hit deadlock. Reviewers: sdong Reviewed By: sdong Subscribers: andrewkr, dhruba, leveldb Differential Revision: https://reviews.facebook.net/D60627 --- CMakeLists.txt | 1 + Makefile | 3 ++ db/db_impl.cc | 35 +++++++++++---- db/db_impl.h | 5 +++ db/db_options_test.cc | 101 ++++++++++++++++++++++++++++++++++++++++++ src.mk | 1 + 6 files changed, 137 insertions(+), 9 deletions(-) create mode 100644 db/db_options_test.cc diff --git a/CMakeLists.txt b/CMakeLists.txt index 00b93188e..082b13c22 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -347,6 +347,7 @@ set(TESTS db/db_inplace_update_test.cc db/db_iter_test.cc db/db_log_iter_test.cc + db/db_options_test.cc db/db_properties_test.cc db/db_table_properties_test.cc db/db_tailing_iter_test.cc diff --git a/Makefile b/Makefile index 2467fd06f..541069510 100644 --- a/Makefile +++ b/Makefile @@ -926,6 +926,9 @@ db_inplace_update_test: db/db_inplace_update_test.o db/db_test_util.o $(LIBOBJEC db_iterator_test: db/db_iterator_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) +db_options_test: db/db_options_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS) + $(AM_LINK) + db_sst_test: db/db_sst_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) diff --git a/db/db_impl.cc b/db/db_impl.cc index 61bdfc4da..8a6f5831e 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -2288,6 +2288,20 @@ void DBImpl::NotifyOnCompactionCompleted( #endif // ROCKSDB_LITE } +bool DBImpl::NeedFlushOrCompaction(const MutableCFOptions& base_options, + const MutableCFOptions& new_options) { + return (base_options.disable_auto_compactions && + !new_options.disable_auto_compactions) || + base_options.level0_slowdown_writes_trigger < + new_options.level0_slowdown_writes_trigger || + base_options.level0_stop_writes_trigger < + new_options.level0_stop_writes_trigger || + base_options.soft_pending_compaction_bytes_limit < + new_options.soft_pending_compaction_bytes_limit || + base_options.hard_pending_compaction_bytes_limit < + new_options.hard_pending_compaction_bytes_limit; +} + Status DBImpl::SetOptions(ColumnFamilyHandle* column_family, const std::unordered_map& options_map) { #ifdef ROCKSDB_LITE @@ -2301,6 +2315,7 @@ Status DBImpl::SetOptions(ColumnFamilyHandle* column_family, return Status::InvalidArgument("empty input"); } + MutableCFOptions prev_options = *cfd->GetLatestMutableCFOptions(); MutableCFOptions new_options; Status s; Status persist_options_status; @@ -2309,8 +2324,15 @@ Status DBImpl::SetOptions(ColumnFamilyHandle* column_family, s = cfd->SetOptions(options_map); if (s.ok()) { new_options = *cfd->GetLatestMutableCFOptions(); - } - if (s.ok()) { + if (NeedFlushOrCompaction(prev_options, new_options)) { + // Trigger possible flush/compactions. This has to be before we persist + // options to file, otherwise there will be a deadlock with writer + // thread. + auto* old_sv = + InstallSuperVersionAndScheduleWork(cfd, nullptr, new_options); + delete old_sv; + } + // Persist RocksDB options under the single write thread WriteThread::Writer w; write_thread_.EnterUnbatched(&w, &mutex_); @@ -2760,13 +2782,7 @@ Status DBImpl::EnableAutoCompaction( for (auto cf_ptr : column_family_handles) { Status status = this->SetOptions(cf_ptr, {{"disable_auto_compactions", "false"}}); - if (status.ok()) { - ColumnFamilyData* cfd = - reinterpret_cast(cf_ptr)->cfd(); - InstrumentedMutexLock guard_lock(&mutex_); - delete this->InstallSuperVersionAndScheduleWork( - cfd, nullptr, *cfd->GetLatestMutableCFOptions()); - } else { + if (!status.ok()) { s = status; } } @@ -3468,6 +3484,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, } m->in_progress = false; // not being processed anymore } + TEST_SYNC_POINT("DBImpl::BackgroundCompaction:Finish"); return status; } diff --git a/db/db_impl.h b/db/db_impl.h index 34d27f5ac..add0d751d 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -670,6 +670,11 @@ class DBImpl : public DB { Status BackgroundFlush(bool* madeProgress, JobContext* job_context, LogBuffer* log_buffer); + // Compare options before and after to see whether flush or compaction is + // needed immediately after dynamic option change. + bool NeedFlushOrCompaction(const MutableCFOptions& base_options, + const MutableCFOptions& new_options); + void PrintStatistics(); // dump rocksdb.stats to LOG diff --git a/db/db_options_test.cc b/db/db_options_test.cc new file mode 100644 index 000000000..569a085cc --- /dev/null +++ b/db/db_options_test.cc @@ -0,0 +1,101 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. +#include "db/db_test_util.h" +#include "port/stack_trace.h" +#include "util/sync_point.h" + +namespace rocksdb { + +class DBOptionsTest : public DBTestBase { + public: + DBOptionsTest() : DBTestBase("/db_options_test") {} +}; + +// When write stalls, user can enable auto compaction to unblock writes. +// However, we had an issue where the stalled write thread blocks the attempt +// to persist auto compaction option, thus creating a deadlock. The test +// verifies the issue is fixed. +TEST_F(DBOptionsTest, EnableAutoCompactionToUnblockWrites) { + Options options; + options.disable_auto_compactions = true; + options.write_buffer_size = 1000 * 1000; // 1M + options.level0_file_num_compaction_trigger = 1; + options.level0_slowdown_writes_trigger = 1; + options.level0_stop_writes_trigger = 1; + options.compression = kNoCompression; + + SyncPoint::GetInstance()->LoadDependency( + {{"DBImpl::DelayWrite:Wait", + "DBOptionsTest::EnableAutoCompactionToUnblockWrites:1"}, + {"DBImpl::BackgroundCompaction:Finish", + "DBOptionsTest::EnableAutoCompactionToUnblockWrites:1"}}); + SyncPoint::GetInstance()->EnableProcessing(); + + // Stall writes. + Reopen(options); + env_->StartThread( + [](void* arg) { + std::string value(1000, 'v'); + auto* t = static_cast(arg); + for (int i = 0; i < 2000; i++) { + ASSERT_OK(t->Put(t->Key(i), value)); + } + }, + this); + TEST_SYNC_POINT("DBOptionsTest::EnableAutoCompactionToUnblockWrites:1"); + ASSERT_TRUE(dbfull()->TEST_write_controler().IsStopped()); + ColumnFamilyHandle* handle = dbfull()->DefaultColumnFamily(); + // We will get a deadlock here if we hit the issue. + dbfull()->EnableAutoCompaction({handle}); + env_->WaitForJoin(); +} + +// Similar to EnableAutoCompactionAfterStallDeadlock. See comments there. +TEST_F(DBOptionsTest, ToggleStopTriggerToUnblockWrites) { + Options options; + options.disable_auto_compactions = true; + options.write_buffer_size = 1000 * 1000; // 1M + options.level0_file_num_compaction_trigger = 1; + options.level0_slowdown_writes_trigger = 1; + options.level0_stop_writes_trigger = 1; + options.compression = kNoCompression; + + SyncPoint::GetInstance()->LoadDependency( + {{"DBImpl::DelayWrite:Wait", + "DBOptionsTest::ToggleStopTriggerToUnblockWrites:1"}, + {"DBImpl::BackgroundCompaction:Finish", + "DBOptionsTest::ToggleStopTriggerToUnblockWrites:1"}}); + SyncPoint::GetInstance()->EnableProcessing(); + + // Stall writes. + Reopen(options); + env_->StartThread( + [](void* arg) { + std::string value(1000, 'v'); + auto* t = static_cast(arg); + for (int i = 0; i < 2000; i++) { + ASSERT_OK(t->Put(t->Key(i), value)); + } + }, + this); + TEST_SYNC_POINT("DBOptionsTest::ToggleStopTriggerToUnblockWrites:1"); + ASSERT_TRUE(dbfull()->TEST_write_controler().IsStopped()); + // We will get a deadlock here if we hit the issue. + dbfull()->SetOptions({{"level0_stop_writes_trigger", "1000000"}, + {"level0_slowdown_writes_trigger", "1000000"}}); + env_->WaitForJoin(); +} + +} // namespace rocksdb + +int main(int argc, char** argv) { + rocksdb::port::InstallStackTraceHandler(); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/src.mk b/src.mk index 3488da0a6..4a320f28a 100644 --- a/src.mk +++ b/src.mk @@ -217,6 +217,7 @@ MAIN_SOURCES = \ db/db_inplace_update_test.cc \ db/db_iterator_test.cc \ db/db_log_iter_test.cc \ + db/db_options_test.cc \ db/db_sst_test.cc \ db/db_tailing_iter_test.cc \ db/db_universal_compaction_test.cc \