Fix deadlock when changing options while writes are stalled.

DBImpl::SetOptions() now compares the mutable column family options before
and after the change (DBImpl::NeedFlushOrCompaction) and, when the change
enables auto compaction or raises a write-stall trigger/limit, installs a new
super version and schedules flush/compaction work *before* entering the write
thread to persist the options file. Doing it afterwards deadlocks with a
stalled writer thread. DBImpl::EnableAutoCompaction() relies on SetOptions()
for this instead of scheduling the work itself.

Adds db/db_options_test.cc with two regression tests and a
"DBImpl::BackgroundCompaction:Finish" sync point used by them.

diff --git a/CMakeLists.txt b/CMakeLists.txt
index 00b93188e..082b13c22 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -347,6 +347,7 @@ set(TESTS
         db/db_inplace_update_test.cc
         db/db_iter_test.cc
         db/db_log_iter_test.cc
+        db/db_options_test.cc
         db/db_properties_test.cc
         db/db_table_properties_test.cc
         db/db_tailing_iter_test.cc
diff --git a/Makefile b/Makefile
index 2467fd06f..541069510 100644
--- a/Makefile
+++ b/Makefile
@@ -926,6 +926,9 @@ db_inplace_update_test: db/db_inplace_update_test.o db/db_test_util.o $(LIBOBJEC
 db_iterator_test: db/db_iterator_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
 	$(AM_LINK)
 
+db_options_test: db/db_options_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
+	$(AM_LINK)
+
 db_sst_test: db/db_sst_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
 	$(AM_LINK)
 
diff --git a/db/db_impl.cc b/db/db_impl.cc
index 61bdfc4da..8a6f5831e 100644
--- a/db/db_impl.cc
+++ b/db/db_impl.cc
@@ -2288,6 +2288,20 @@ void DBImpl::NotifyOnCompactionCompleted(
 #endif  // ROCKSDB_LITE
 }
 
+bool DBImpl::NeedFlushOrCompaction(const MutableCFOptions& base_options,
+                                   const MutableCFOptions& new_options) {
+  return (base_options.disable_auto_compactions &&
+          !new_options.disable_auto_compactions) ||
+         base_options.level0_slowdown_writes_trigger <
+             new_options.level0_slowdown_writes_trigger ||
+         base_options.level0_stop_writes_trigger <
+             new_options.level0_stop_writes_trigger ||
+         base_options.soft_pending_compaction_bytes_limit <
+             new_options.soft_pending_compaction_bytes_limit ||
+         base_options.hard_pending_compaction_bytes_limit <
+             new_options.hard_pending_compaction_bytes_limit;
+}
+
 Status DBImpl::SetOptions(ColumnFamilyHandle* column_family,
     const std::unordered_map<std::string, std::string>& options_map) {
 #ifdef ROCKSDB_LITE
@@ -2301,6 +2315,7 @@ Status DBImpl::SetOptions(ColumnFamilyHandle* column_family,
     return Status::InvalidArgument("empty input");
   }
 
+  MutableCFOptions prev_options = *cfd->GetLatestMutableCFOptions();
   MutableCFOptions new_options;
   Status s;
   Status persist_options_status;
@@ -2309,8 +2324,15 @@ Status DBImpl::SetOptions(ColumnFamilyHandle* column_family,
     s = cfd->SetOptions(options_map);
     if (s.ok()) {
       new_options = *cfd->GetLatestMutableCFOptions();
-    }
-    if (s.ok()) {
+      if (NeedFlushOrCompaction(prev_options, new_options)) {
+        // Trigger possible flush/compactions. This has to be before we persist
+        // options to file, otherwise there will be a deadlock with writer
+        // thread.
+        auto* old_sv =
+            InstallSuperVersionAndScheduleWork(cfd, nullptr, new_options);
+        delete old_sv;
+      }
+
       // Persist RocksDB options under the single write thread
       WriteThread::Writer w;
       write_thread_.EnterUnbatched(&w, &mutex_);
@@ -2760,13 +2782,7 @@ Status DBImpl::EnableAutoCompaction(
   for (auto cf_ptr : column_family_handles) {
     Status status =
         this->SetOptions(cf_ptr, {{"disable_auto_compactions", "false"}});
-    if (status.ok()) {
-      ColumnFamilyData* cfd =
-          reinterpret_cast<ColumnFamilyHandleImpl*>(cf_ptr)->cfd();
-      InstrumentedMutexLock guard_lock(&mutex_);
-      delete this->InstallSuperVersionAndScheduleWork(
-          cfd, nullptr, *cfd->GetLatestMutableCFOptions());
-    } else {
+    if (!status.ok()) {
       s = status;
     }
   }
@@ -3468,6 +3484,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
     }
     m->in_progress = false;  // not being processed anymore
   }
+  TEST_SYNC_POINT("DBImpl::BackgroundCompaction:Finish");
   return status;
 }
 
diff --git a/db/db_impl.h b/db/db_impl.h
index 34d27f5ac..add0d751d 100644
--- a/db/db_impl.h
+++ b/db/db_impl.h
@@ -670,6 +670,11 @@ class DBImpl : public DB {
   Status BackgroundFlush(bool* madeProgress, JobContext* job_context,
                          LogBuffer* log_buffer);
 
+  // Compare options before and after to see whether flush or compaction is
+  // needed immediately after dynamic option change.
+  bool NeedFlushOrCompaction(const MutableCFOptions& base_options,
+                             const MutableCFOptions& new_options);
+
   void PrintStatistics();
 
   // dump rocksdb.stats to LOG
diff --git a/db/db_options_test.cc b/db/db_options_test.cc
new file mode 100644
index 000000000..569a085cc
--- /dev/null
+++ b/db/db_options_test.cc
@@ -0,0 +1,101 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under the BSD-style license found in the
+// LICENSE file in the root directory of this source tree. An additional grant
+// of patent rights can be found in the PATENTS file in the same directory.
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+#include "db/db_test_util.h"
+#include "port/stack_trace.h"
+#include "util/sync_point.h"
+
+namespace rocksdb {
+
+class DBOptionsTest : public DBTestBase {
+ public:
+  DBOptionsTest() : DBTestBase("/db_options_test") {}
+};
+
+// When write stalls, user can enable auto compaction to unblock writes.
+// However, we had an issue where the stalled write thread blocks the attempt
+// to persist auto compaction option, thus creating a deadlock. The test
+// verifies the issue is fixed.
+TEST_F(DBOptionsTest, EnableAutoCompactionToUnblockWrites) {
+  Options options;
+  options.disable_auto_compactions = true;
+  options.write_buffer_size = 1000 * 1000;  // 1M
+  options.level0_file_num_compaction_trigger = 1;
+  options.level0_slowdown_writes_trigger = 1;
+  options.level0_stop_writes_trigger = 1;
+  options.compression = kNoCompression;
+
+  SyncPoint::GetInstance()->LoadDependency(
+      {{"DBImpl::DelayWrite:Wait",
+        "DBOptionsTest::EnableAutoCompactionToUnblockWrites:1"},
+       {"DBImpl::BackgroundCompaction:Finish",
+        "DBOptionsTest::EnableAutoCompactionToUnblockWrites:1"}});
+  SyncPoint::GetInstance()->EnableProcessing();
+
+  // Stall writes.
+  Reopen(options);
+  env_->StartThread(
+      [](void* arg) {
+        std::string value(1000, 'v');
+        auto* t = static_cast<DBOptionsTest*>(arg);
+        for (int i = 0; i < 2000; i++) {
+          ASSERT_OK(t->Put(t->Key(i), value));
+        }
+      },
+      this);
+  TEST_SYNC_POINT("DBOptionsTest::EnableAutoCompactionToUnblockWrites:1");
+  ASSERT_TRUE(dbfull()->TEST_write_controler().IsStopped());
+  ColumnFamilyHandle* handle = dbfull()->DefaultColumnFamily();
+  // We will get a deadlock here if we hit the issue.
+  dbfull()->EnableAutoCompaction({handle});
+  env_->WaitForJoin();
+}
+
+// Similar to EnableAutoCompactionToUnblockWrites. See comments there.
+TEST_F(DBOptionsTest, ToggleStopTriggerToUnblockWrites) {
+  Options options;
+  options.disable_auto_compactions = true;
+  options.write_buffer_size = 1000 * 1000;  // 1M
+  options.level0_file_num_compaction_trigger = 1;
+  options.level0_slowdown_writes_trigger = 1;
+  options.level0_stop_writes_trigger = 1;
+  options.compression = kNoCompression;
+
+  SyncPoint::GetInstance()->LoadDependency(
+      {{"DBImpl::DelayWrite:Wait",
+        "DBOptionsTest::ToggleStopTriggerToUnblockWrites:1"},
+       {"DBImpl::BackgroundCompaction:Finish",
+        "DBOptionsTest::ToggleStopTriggerToUnblockWrites:1"}});
+  SyncPoint::GetInstance()->EnableProcessing();
+
+  // Stall writes.
+  Reopen(options);
+  env_->StartThread(
+      [](void* arg) {
+        std::string value(1000, 'v');
+        auto* t = static_cast<DBOptionsTest*>(arg);
+        for (int i = 0; i < 2000; i++) {
+          ASSERT_OK(t->Put(t->Key(i), value));
+        }
+      },
+      this);
+  TEST_SYNC_POINT("DBOptionsTest::ToggleStopTriggerToUnblockWrites:1");
+  ASSERT_TRUE(dbfull()->TEST_write_controler().IsStopped());
+  // We will get a deadlock here if we hit the issue.
+  dbfull()->SetOptions({{"level0_stop_writes_trigger", "1000000"},
+                        {"level0_slowdown_writes_trigger", "1000000"}});
+  env_->WaitForJoin();
+}
+
+}  // namespace rocksdb
+
+int main(int argc, char** argv) {
+  rocksdb::port::InstallStackTraceHandler();
+  ::testing::InitGoogleTest(&argc, argv);
+  return RUN_ALL_TESTS();
+}
diff --git a/src.mk b/src.mk
index 3488da0a6..4a320f28a 100644
--- a/src.mk
+++ b/src.mk
@@ -217,6 +217,7 @@ MAIN_SOURCES = \
   db/db_inplace_update_test.cc                                          \
   db/db_iterator_test.cc                                                \
   db/db_log_iter_test.cc                                                \
+  db/db_options_test.cc                                                 \
   db/db_sst_test.cc                                                     \
   db/db_tailing_iter_test.cc                                            \
   db/db_universal_compaction_test.cc                                    \