From c5cd31c12bb02fa0db0a01caabede0dd9a69e7d8 Mon Sep 17 00:00:00 2001 From: Hui Xiao Date: Thu, 10 Feb 2022 10:17:53 -0800 Subject: [PATCH] Fix TSAN data race in EventListenerTest.MultiCF (#9528) Summary: **Context:** `EventListenerTest.MultiCF` occasionally failed on TSAN data race as below: ``` WARNING: ThreadSanitizer: data race (pid=2047633) Read of size 8 at 0x7b6000001440 by main thread: #0 std::vector >::size() const /usr/bin/../lib/gcc/x86_64-linux-gnu/9/../../../../include/c++/9/bits/stl_vector.h:916:40 (listener_test+0x52337c) https://github.com/facebook/rocksdb/issues/1 rocksdb::EventListenerTest_MultiCF_Test::TestBody() /home/circleci/project/db/listener_test.cc:384:7 (listener_test+0x52337c) Previous write of size 8 at 0x7b6000001440 by thread T2: #0 void std::vector >::_M_realloc_insert(__gnu_cxx::__normal_iterator > >, rocksdb::DB* const&) /usr/bin/../lib/gcc/x86_64-linux-gnu/9/../../../../include/c++/9/bits/vector.tcc:503:31 (listener_test+0x550654) https://github.com/facebook/rocksdb/issues/1 std::vector >::push_back(rocksdb::DB* const&) /usr/bin/../lib/gcc/x86_64-linux-gnu/9/../../../../include/c++/9/bits/stl_vector.h:1195:4 (listener_test+0x550654) https://github.com/facebook/rocksdb/issues/2 rocksdb::TestFlushListener::OnFlushCompleted(rocksdb::DB*, rocksdb::FlushJobInfo const&) /home/circleci/project/db/listener_test.cc:255:18 (listener_test+0x550654) ``` After investigation, it is due to the following: (1) `ASSERT_OK(Flush(i));` before the read `std::vector::size()` is supposed to be [blocked on `DB::Impl::bg_cv_` for memtable flush to finish](https://github.com/facebook/rocksdb/blob/320d9a8e8a1b6998f92934f87fc71ad8bd6d4596/db/db_impl/db_impl_compaction_flush.cc#L2319) and get signaled [at the end of background flush ](https://github.com/facebook/rocksdb/blob/320d9a8e8a1b6998f92934f87fc71ad8bd6d4596/db/db_impl/db_impl_compaction_flush.cc#L2830), which happens after the write `std::vector::push_back()` . So the sequence of execution should have been synchronized as `call flush() -> write -> return from flush() -> read` and would not cause any TSAN data race. - The subsequent `ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());` serves a similar purpose based on [the previous attempt to deflake the test.](https://github.com/facebook/rocksdb/pull/9084) (2) However, there are multiple places in the code can signal this `DB::Impl::bg_cv_` and mistakenly wake up `ASSERT_OK(Flush(i));` (or `ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());`) too early (and with the lock available to them), resulting in non-synchronized read and write thus a TSAN data race. - Reproduced by the following, suggested by ajkr: ``` diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index 4ff87c1e4..52492e9cf 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -22,7 +22,7 @@ #include "test_util/sync_point.h" #include "util/cast_util.h" #include "util/concurrent_task_limiter_impl.h" namespace ROCKSDB_NAMESPACE { bool DBImpl::EnoughRoomForCompaction( @@ -855,6 +855,7 @@ void DBImpl::NotifyOnFlushCompleted( mutable_cf_options.level0_stop_writes_trigger); // release lock while notifying events mutex_.Unlock(); + bg_cv_.SignalAll(); ``` **Summary:** - Added synchornization between read and write by ` ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency()` mechanism Pull Request resolved: https://github.com/facebook/rocksdb/pull/9528 Test Plan: `./listener_test --gtest_filter=EventListenerTest.MultiCF --gtest_repeat=10` - pre-fix: ``` Repeating all tests (iteration 3) Note: Google Test filter = EventListenerTest.MultiCF [==========] Running 1 test from 1 test case. [----------] Global test environment set-up. [----------] 1 test from EventListenerTest [ RUN ] EventListenerTest.MultiCF ================== WARNING: ThreadSanitizer: data race (pid=3377137) Read of size 8 at 0x7b6000000840 by main thread: #0 std::vector >::size() https://github.com/facebook/rocksdb/issues/1 rocksdb::EventListenerTest_MultiCF_Test::TestBody() db/listener_test.cc:384 (listener_test+0x4bb300) Previous write of size 8 at 0x7b6000000840 by thread T2: #0 void std::vector >::_M_realloc_insert(__gnu_cxx::__normal_iterator > >, rocksdb::DB* const&) https://github.com/facebook/rocksdb/issues/1 std::vector >::push_back(rocksdb::DB* const&) https://github.com/facebook/rocksdb/issues/2 rocksdb::TestFlushListener::OnFlushCompleted(rocksdb::DB*, rocksdb::FlushJobInfo const&) db/listener_test.cc:255 (listener_test+0x4e820f) ``` - post-fix: `All passed` Reviewed By: ajkr Differential Revision: D34085791 Pulled By: hx235 fbshipit-source-id: f877aa687ea1d5cb6f31ef8c4772625d22868e8b --- db/db_impl/db_impl_compaction_flush.cc | 2 ++ db/listener_test.cc | 21 +++++++++++++++------ 2 files changed, 17 insertions(+), 6 deletions(-) diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index 5550d935b..e249b82d9 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -868,6 +868,8 @@ void DBImpl::NotifyOnFlushCompleted( for (auto listener : immutable_db_options_.listeners) { listener->OnFlushCompleted(this, *info); } + TEST_SYNC_POINT( + "DBImpl::NotifyOnFlushCompleted::PostAllOnFlushCompleted"); } flush_jobs_info->clear(); } diff --git a/db/listener_test.cc b/db/listener_test.cc index 1bad5c19f..bef81f050 100644 --- a/db/listener_test.cc +++ b/db/listener_test.cc @@ -378,18 +378,27 @@ TEST_F(EventListenerTest, MultiCF) { ASSERT_OK(Put(5, "nikitich", std::string(90000, 'n'))); ASSERT_OK(Put(6, "alyosha", std::string(90000, 'a'))); ASSERT_OK(Put(7, "popovich", std::string(90000, 'p'))); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + for (int i = 1; i < 8; ++i) { + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency( + {{"DBImpl::NotifyOnFlushCompleted::PostAllOnFlushCompleted", + "EventListenerTest.MultiCF:PreVerifyListener"}}); ASSERT_OK(Flush(i)); - ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable()); + TEST_SYNC_POINT("EventListenerTest.MultiCF:PreVerifyListener"); ASSERT_EQ(listener->flushed_dbs_.size(), i); ASSERT_EQ(listener->flushed_column_family_names_.size(), i); + // make sure callback functions are called in the right order + if (i == 7) { + for (size_t j = 0; j < cf_names.size(); j++) { + ASSERT_EQ(listener->flushed_dbs_[j], db_); + ASSERT_EQ(listener->flushed_column_family_names_[j], cf_names[j]); + } + } } - // make sure callback functions are called in the right order - for (size_t i = 0; i < cf_names.size(); i++) { - ASSERT_EQ(listener->flushed_dbs_[i], db_); - ASSERT_EQ(listener->flushed_column_family_names_[i], cf_names[i]); - } + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); Close(); } }