From 8a85946f58bc8db59c35685672ead1b5cae15c23 Mon Sep 17 00:00:00 2001
From: Hui Xiao
Date: Tue, 30 Aug 2022 16:24:01 -0700
Subject: [PATCH] Add missing mutex when reading from shared variable
 bg_bottom_compaction_scheduled_, bg_compaction_scheduled_ (#10610)

Summary:
**Context/Summary:**
According to https://github.com/facebook/rocksdb/blob/7.6.fb/db/compaction/compaction_job.h#L328-L332, any read of `*bg_compaction_scheduled_` or `*bg_bottom_compaction_scheduled_` should be protected by the DB mutex, which was not the case for some assert statements. This leads to a data race that can be reproduced by the following command:

```
db=/dev/shm/rocksdb_crashtest_blackbox
exp=/dev/shm/rocksdb_crashtest_expected
rm -rf $db $exp
mkdir -p $exp

./db_stress --clear_column_family_one_in=0 --column_families=1 --db=$db --delpercent=10 --delrangepercent=0 --destroy_db_initially=1 --expected_values_dir=$exp --iterpercent=0 --key_len_percent_dist=1,30,69 --max_key=1000000 --max_key_len=3 --prefixpercent=0 --readpercent=0 --reopen=0 --ops_per_thread=100000000 --value_size_mult=32 --writepercent=90 --compaction_pri=4 --use_txn=1 --level_compaction_dynamic_level_bytes=True --compaction_ttl=0 --compact_files_one_in=1000000 --compact_range_one_in=1000000 --value_size_mult=32 --verify_db_one_in=1000 --write_buffer_size=65536 --mark_for_compaction_one_file_in=10 --max_background_compactions=20 --max_key=25000000 --max_key_len=3 --max_write_buffer_number=3 --max_write_buffer_size_to_maintain=2097152 --target_file_size_base=2097152 --target_file_size_multiplier=2
```
```
WARNING: ThreadSanitizer: data race (pid=73424)
  Read of size 4 at 0x7b8c0000151c by thread T13:
    #0 ReleaseSubcompactionResources internal_repo_rocksdb/repo/db/compaction/compaction_job.cc:390 (db_stress+0x630aa3)
    #1 rocksdb::CompactionJob::Run() internal_repo_rocksdb/repo/db/compaction/compaction_job.cc:741 (db_stress+0x630aa3)
    #2 rocksdb::DBImpl::BackgroundCompaction(bool*, rocksdb::JobContext*, rocksdb::LogBuffer*, rocksdb::DBImpl::PrepickedCompaction*, rocksdb::Env::Priority) internal_repo_rocksdb/repo/db/db_impl/db_impl_compaction_flush.cc:3436 (db_stress+0x60b2cc)
    #3 rocksdb::DBImpl::BackgroundCallCompaction(rocksdb::DBImpl::PrepickedCompaction*, rocksdb::Env::Priority) internal_repo_rocksdb/repo/db/db_impl/db_impl_compaction_flush.cc:2950 (db_stress+0x606d79)
    #4 rocksdb::DBImpl::BGWorkCompaction(void*) internal_repo_rocksdb/repo/db/db_impl/db_impl_compaction_flush.cc:2693 (db_stress+0x60356a)

  Previous write of size 4 at 0x7b8c0000151c by thread T12 (mutexes: write M438955329917552448):
    #0 rocksdb::DBImpl::BackgroundCallCompaction(rocksdb::DBImpl::PrepickedCompaction*, rocksdb::Env::Priority) internal_repo_rocksdb/repo/db/db_impl/db_impl_compaction_flush.cc:3018 (db_stress+0x6072a1)
    #1 rocksdb::DBImpl::BGWorkCompaction(void*) internal_repo_rocksdb/repo/db/db_impl/db_impl_compaction_flush.cc:2693 (db_stress+0x60356a)

  Location is heap block of size 6720 at 0x7b8c00000000 allocated by main thread:
    #0 operator new(unsigned long, std::align_val_t) (db_stress+0xbab5bb)
    #1 rocksdb::DBImpl::Open(rocksdb::DBOptions const&, std::__cxx11::basic_string, std::allocator > const&, std::vector > const&, std::vector >*, rocksdb::DB**, bool, bool) internal_repo_rocksdb/repo/db/db_impl/db_impl_open.cc:1811 (db_stress+0x69769a)
    #2 rocksdb::TransactionDB::Open(rocksdb::DBOptions const&, rocksdb::TransactionDBOptions const&, std::__cxx11::basic_string, std::allocator > const&, std::vector > const&, std::vector >*, rocksdb::TransactionDB**) internal_repo_rocksdb/repo/utilities/transactions/pessimistic_transaction_db.cc:258 (db_stress+0x8ae1f4)
    #3 rocksdb::StressTest::Open(rocksdb::SharedState*) internal_repo_rocksdb/repo/db_stress_tool/db_stress_test_base.cc:2611 (db_stress+0x32b927)
    #4 rocksdb::StressTest::InitDb(rocksdb::SharedState*) internal_repo_rocksdb/repo/db_stress_tool/db_stress_test_base.cc:290 (db_stress+0x34712c)
```

This PR adds all the missing mutex locks that should have been in place.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/10610

Test Plan:
- Passed the repro command above
- Existing CI

Reviewed By: riversand963

Differential Revision: D39143016

Pulled By: hx235

fbshipit-source-id: 51dd4db55ad306f3dbda5d0dd54d6f2513cf70f2
---
 db/compaction/compaction_job.cc | 28 +++++++++++++++-------------
 1 file changed, 15 insertions(+), 13 deletions(-)

diff --git a/db/compaction/compaction_job.cc b/db/compaction/compaction_job.cc
index a76e91696..90f960178 100644
--- a/db/compaction/compaction_job.cc
+++ b/db/compaction/compaction_job.cc
@@ -321,6 +321,7 @@ void CompactionJob::AcquireSubcompactionResources(
               ->write_controller()
               ->NeedSpeedupCompaction())
           .max_compactions;
+  InstrumentedMutexLock l(db_mutex_);
   // Apply min function first since We need to compute the extra subcompaction
   // against compaction limits. And then try to reserve threads for extra
   // subcompactions. The actual number of reserved threads could be less than
@@ -329,7 +330,6 @@ void CompactionJob::AcquireSubcompactionResources(
       std::max(max_db_compactions - *bg_compaction_scheduled_ -
                    *bg_bottom_compaction_scheduled_,
                0);
-  db_mutex_->Lock();
   // Reservation only supports backgrdoun threads of which the priority is
   // between BOTTOM and HIGH. Need to degrade the priority to HIGH if the
   // origin thread_pri_ is higher than that. Similar to ReleaseThreads().
@@ -346,7 +346,6 @@ void CompactionJob::AcquireSubcompactionResources(
   } else {
     *bg_compaction_scheduled_ += extra_num_subcompaction_threads_reserved_;
   }
-  db_mutex_->Unlock();
 }
 
 void CompactionJob::ShrinkSubcompactionResources(uint64_t num_extra_resources) {
@@ -380,17 +379,20 @@ void CompactionJob::ReleaseSubcompactionResources() {
   if (extra_num_subcompaction_threads_reserved_ == 0) {
     return;
   }
-  // The number of reserved threads becomes larger than 0 only if the
-  // compaction prioity is round robin and there is no sufficient
-  // sub-compactions available
-
-  // The scheduled compaction must be no less than 1 + extra number
-  // subcompactions using acquired resources since this compaction job has not
-  // finished yet
-  assert(*bg_bottom_compaction_scheduled_ >=
-             1 + extra_num_subcompaction_threads_reserved_ ||
-         *bg_compaction_scheduled_ >=
-             1 + extra_num_subcompaction_threads_reserved_);
+  {
+    InstrumentedMutexLock l(db_mutex_);
+    // The number of reserved threads becomes larger than 0 only if the
+    // compaction prioity is round robin and there is no sufficient
+    // sub-compactions available
+
+    // The scheduled compaction must be no less than 1 + extra number
+    // subcompactions using acquired resources since this compaction job has not
+    // finished yet
+    assert(*bg_bottom_compaction_scheduled_ >=
+               1 + extra_num_subcompaction_threads_reserved_ ||
+           *bg_compaction_scheduled_ >=
+               1 + extra_num_subcompaction_threads_reserved_);
+  }
   ShrinkSubcompactionResources(extra_num_subcompaction_threads_reserved_);
 }
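
For context on the locking pattern the patch switches to: `InstrumentedMutexLock` is an RAII guard, so the DB mutex is released automatically when the guard goes out of scope, and the extra braces around the `assert` in `ReleaseSubcompactionResources()` end that scope before `ShrinkSubcompactionResources()` is called. Below is a minimal, self-contained sketch of the same idea. It is not RocksDB code: it substitutes `std::mutex` / `std::lock_guard` for RocksDB's `InstrumentedMutex` / `InstrumentedMutexLock`, and the names `BackgroundState`, `ScheduleCompaction`, and `CheckScheduledInvariant` are invented for illustration.

```
#include <cassert>
#include <mutex>
#include <thread>

struct BackgroundState {
  std::mutex db_mutex;                     // stand-in for DBImpl's db_mutex_
  int bg_compaction_scheduled = 0;         // stand-in for *bg_compaction_scheduled_
  int bg_bottom_compaction_scheduled = 0;  // stand-in for *bg_bottom_compaction_scheduled_
};

// Writer side: the scheduling counters are only modified under the mutex.
void ScheduleCompaction(BackgroundState& s) {
  std::lock_guard<std::mutex> l(s.db_mutex);  // RAII: unlocked on scope exit
  ++s.bg_compaction_scheduled;
}

void UnscheduleCompaction(BackgroundState& s) {
  std::lock_guard<std::mutex> l(s.db_mutex);
  --s.bg_compaction_scheduled;
}

// Reader side: even a debug-only assert must take the same mutex, otherwise
// its read races with the writers above (the race ThreadSanitizer reported).
void CheckScheduledInvariant(BackgroundState& s, int extra_reserved) {
  {
    std::lock_guard<std::mutex> l(s.db_mutex);  // scoped, like the patch's block
    assert(s.bg_compaction_scheduled >= 1 + extra_reserved ||
           s.bg_bottom_compaction_scheduled >= 1 + extra_reserved);
    (void)extra_reserved;  // silence unused warnings when NDEBUG drops the assert
  }
  // The mutex is already released here, before any follow-up work that may
  // want to take it again (ShrinkSubcompactionResources() in the real code).
}

int main() {
  BackgroundState s;
  ScheduleCompaction(s);
  // A concurrent thread checking the invariant is now race-free.
  std::thread t([&s] { CheckScheduledInvariant(s, /*extra_reserved=*/0); });
  t.join();
  UnscheduleCompaction(s);
  return 0;
}
```

The point mirrored from the patch is twofold: a read inside an `assert` is just as racy as any other read unless it holds the mutex the writers use, and a scoped guard cannot be left locked on an early-return path the way the removed manual `db_mutex_->Lock()` / `Unlock()` pair could.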