Reduce access to atomic variables in a test (#10909)

Summary:
With the TSAN build on CircleCI (see mini-tsan in .circleci/config), `SeqAdvanceConcurrentTest.SeqAdvanceConcurrent` sometimes gets stuck when an experimental feature called "unordered write" is enabled. The stack trace is the following:
```
Thread 7 (Thread 0x7f2284a1c700 (LWP 481523) "write_prepared_"):
#0  0x00000000004fa3f5 in __tsan_atomic64_load () at ./db/merge_context.h:15
#1  0x00000000005e5942 in std::__atomic_base<unsigned long>::load (this=0x7b74000012f8, __m=std::memory_order_seq_cst) at /usr/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/atomic_base.h:481
#2  std::__atomic_base<unsigned long>::operator unsigned long (this=0x7b74000012f8) at /usr/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/atomic_base.h:341
#3  0x00000000005bf001 in rocksdb::SeqAdvanceConcurrentTest_SeqAdvanceConcurrent_Test::TestBody()::$_9::operator()(void*) const (this=0x7b14000085e8) at utilities/transactions/write_prepared_transaction_test.cc:1702

Thread 6 (Thread 0x7f228421b700 (LWP 481521) "write_prepared_"):
#0  0x000000000052178c in __tsan::MetaMap::GetAndLock(__tsan::ThreadState*, unsigned long, unsigned long, bool, bool) () at ./db/merge_context.h:15
#1  0x00000000004fa48e in __tsan_atomic64_load () at ./db/merge_context.h:15
#2  0x00000000005e5942 in std::__atomic_base<unsigned long>::load (this=0x7b74000012f8, __m=std::memory_order_seq_cst) at /usr/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/atomic_base.h:481
#3  std::__atomic_base<unsigned long>::operator unsigned long (this=0x7b74000012f8) at /usr/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/atomic_base.h:341
#4  0x00000000005bf001 in rocksdb::SeqAdvanceConcurrentTest_SeqAdvanceConcurrent_Test::TestBody()::$_9::operator()(void*) const (this=0x7b14000085e8) at utilities/transactions/write_prepared_transaction_test.cc:1702
```

This is problematic and suspicious: two threads get stuck at the same place (https://github.com/facebook/rocksdb/blob/7.8.fb/utilities/transactions/write_prepared_transaction_test.cc#L1694:L1707) trying to load from the same atomic variable, and it is not clear why two threads can reach the same point.

The stack trace suggests a possible deadlock, since the two threads are on the same write thread (one is doing Prepare(), while the other is trying to Commit()).
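For illustration only, here is a minimal standalone sketch (not taken from this PR; only the `linked` counter name mirrors the test) of why the old callback pattern can hang: with `linked++` followed by a separate load, two threads can both increment before either re-reads the counter, so neither observes `linked == 1`, no thread takes the coordinator branch, and every busy-wait loop spins forever. Reading the pre-increment value returned by `fetch_add` removes that window.
```
#include <atomic>
#include <cstddef>

std::atomic<std::size_t> linked{0};

// Old pattern: increment and check are two separate atomic accesses.
// Interleaving: T1 runs linked++, T2 runs linked++, then both load
// linked == 2; neither takes the "first thread" branch, so the
// coordination step never runs and the other waiters spin forever.
void old_callback() {
  linked++;
  if (linked == 1) {
    // ... wait until the others are linked too ...
  }
}

// New pattern: fetch_add returns the pre-increment value, which is
// unique per thread, so exactly one thread observes 0 and becomes the
// coordinator regardless of interleaving.
void new_callback() {
  std::size_t orig_linked = linked.fetch_add(1, std::memory_order_acq_rel);
  if (orig_linked == 0) {
    // ... wait until the others are linked too ...
  }
}
```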

Pull Request resolved: https://github.com/facebook/rocksdb/pull/10909

Test Plan:
On CircleCI mini-tsan, apply the following patch first so that there is a higher chance of hitting the same problematic situation:
```
diff --git a/utilities/transactions/write_prepared_transaction_test.cc b/utilities/transactions/write_prepared_transaction_test.cc
index 4bc1f3744..bd5dc4924 100644
--- a/utilities/transactions/write_prepared_transaction_test.cc
+++ b/utilities/transactions/write_prepared_transaction_test.cc
@@ -1714,13 +1714,13 @@ TEST_P(SeqAdvanceConcurrentTest, SeqAdvanceConcurrent) {
       size_t d = (n % base[bi + 1]) / base[bi];
       switch (d) {
         case 0:
-          threads.emplace_back(txn_t0, bi);
+          threads.emplace_back(txn_t3, bi);
           break;
         case 1:
-          threads.emplace_back(txn_t1, bi);
+          threads.emplace_back(txn_t3, bi);
           break;
         case 2:
-          threads.emplace_back(txn_t2, bi);
+          threads.emplace_back(txn_t3, bi);
           break;
         case 3:
           threads.emplace_back(txn_t3, bi);
```
Then build and run the test:
```
COMPILE_WITH_TSAN=1 CC=clang-13 CXX=clang++-13 ROCKSDB_DISABLE_ALIGNED_NEW=1 USE_CLANG=1 make V=1 -j32 check
gtest-parallel -r 100 ./write_prepared_transaction_test --gtest_filter=TwoWriteQueues/SeqAdvanceConcurrentTest.SeqAdvanceConcurrent/19
```
In the above, `SeqAdvanceConcurrent/19` is used because tests 10 to 19 correspond to unordered write, in which Prepare() and Commit() can both enter the same write thread.
Before this PR, there was a high chance of hitting the deadlock. With this PR, no deadlock has been encountered so far.

Reviewed By: ltamasi

Differential Revision: D40869387

Pulled By: riversand963

fbshipit-source-id: 81e82a70c263e4f3417597a201b081ee54f1deab
Changes in utilities/transactions/write_prepared_transaction_test.cc:
```
@@ -1684,22 +1684,23 @@ TEST_P(SeqAdvanceConcurrentTest, SeqAdvanceConcurrent) {
     expected_commits = 0;
     std::vector<port::Thread> threads;
 
-    linked = 0;
+    linked.store(0, std::memory_order_release);
     std::atomic<bool> batch_formed(false);
     ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
         "WriteThread::EnterAsBatchGroupLeader:End",
         [&](void* /*arg*/) { batch_formed = true; });
     ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
         "WriteThread::JoinBatchGroup:Wait", [&](void* /*arg*/) {
-          linked++;
-          if (linked == 1) {
+          size_t orig_linked = linked.fetch_add(1, std::memory_order_acq_rel);
+          if (orig_linked == 0) {
             // Wait until the others are linked too.
-            while (linked < first_group_size) {
+            while (linked.load(std::memory_order_acquire) < first_group_size) {
             }
-          } else if (linked == 1 + first_group_size) {
+          } else if (orig_linked == first_group_size) {
             // Make the 2nd batch of the rest of writes plus any followup
             // commits from the first batch
-            while (linked < txn_cnt + commit_writes) {
+            while (linked.load(std::memory_order_acquire) <
+                   txn_cnt + commit_writes) {
             }
           }
           // Then we will have one or more batches consisting of follow-up
@@ -1731,14 +1732,15 @@ TEST_P(SeqAdvanceConcurrentTest, SeqAdvanceConcurrent) {
         FAIL();
       }
       // wait to be linked
-      while (linked.load() <= bi) {
+      while (linked.load(std::memory_order_acquire) <= bi) {
       }
       // after a queue of size first_group_size
       if (bi + 1 == first_group_size) {
         while (!batch_formed) {
         }
         // to make it more deterministic, wait until the commits are linked
-        while (linked.load() <= bi + expected_commits) {
+        while (linked.load(std::memory_order_acquire) <=
+               bi + expected_commits) {
         }
       }
     }
```
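
As a sanity check of the pattern the diff adopts, here is a hypothetical standalone program (not part of the test suite) that asserts `fetch_add` elects exactly one "first" thread no matter how the increments interleave:
```
#include <atomic>
#include <cassert>
#include <cstddef>
#include <thread>
#include <vector>

int main() {
  std::atomic<std::size_t> linked{0};
  std::atomic<int> coordinators{0};
  std::vector<std::thread> threads;
  for (int i = 0; i < 8; ++i) {
    threads.emplace_back([&] {
      // Each thread gets a distinct pre-increment value from fetch_add.
      std::size_t orig = linked.fetch_add(1, std::memory_order_acq_rel);
      if (orig == 0) {
        coordinators.fetch_add(1, std::memory_order_relaxed);
      }
    });
  }
  for (auto& t : threads) {
    t.join();
  }
  // Exactly one thread observed 0, so exactly one coordinator was elected.
  assert(coordinators.load() == 1);
  return 0;
}
```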
