Fix test race conditions with OnFlushCompleted() (#9617)

Summary:
We often see flaky tests due to `DB::Flush()` or `DBImpl::TEST_WaitForFlushMemTable()` not waiting until event listeners complete. For example, https://github.com/facebook/rocksdb/issues/9084, https://github.com/facebook/rocksdb/issues/9400, https://github.com/facebook/rocksdb/issues/9528, plus two new ones this week: "EventListenerTest.OnSingleDBFlushTest" and "DBFlushTest.FireOnFlushCompletedAfterCommittedResult". I ran a `make check` with the below race condition-coercing patch and fixed  issues it found besides old BlobDB.

```
 diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc
index 0e1864788..aaba68c4a 100644
 --- a/db/db_impl/db_impl_compaction_flush.cc
+++ b/db/db_impl/db_impl_compaction_flush.cc
@@ -861,6 +861,8 @@ void DBImpl::NotifyOnFlushCompleted(
        mutable_cf_options.level0_stop_writes_trigger);
   // release lock while notifying events
   mutex_.Unlock();
+  bg_cv_.SignalAll();
+  sleep(1);
   {
     for (auto& info : *flush_jobs_info) {
       info->triggered_writes_slowdown = triggered_writes_slowdown;
```

The reason I did not fix old BlobDB issues is because it appears to have a fundamental (non-test) issue. In particular, it uses an EventListener to keep track of the files. OnFlushCompleted() could be delayed until even after a compaction involving that flushed file completes, causing the compaction to unexpectedly delete an untracked file.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/9617

Test Plan: `make check` including the race condition coercing patch

Reviewed By: hx235

Differential Revision: D34384022

Pulled By: ajkr

fbshipit-source-id: 2652ded39b415277c5d6a628414345223930514e
main
Andrew Kryczka 3 years ago committed by Facebook GitHub Bot
parent 96978e4d96
commit 8ca433f912
  1. 9
      db/compact_files_test.cc
  2. 3
      db/db_flush_test.cc
  3. 8
      db/db_test2.cc
  4. 16
      db/listener_test.cc

@ -138,6 +138,9 @@ TEST_F(CompactFilesTest, MultipleLevel) {
collector->ClearFlushedFiles(); collector->ClearFlushedFiles();
ASSERT_OK(db->Put(WriteOptions(), ToString(i), "")); ASSERT_OK(db->Put(WriteOptions(), ToString(i), ""));
ASSERT_OK(db->Flush(FlushOptions())); ASSERT_OK(db->Flush(FlushOptions()));
// Ensure background work is fully finished including listener callbacks
// before accessing listener state.
ASSERT_OK(static_cast_with_check<DBImpl>(db)->TEST_WaitForBackgroundWork());
auto l0_files = collector->GetFlushedFiles(); auto l0_files = collector->GetFlushedFiles();
ASSERT_OK(db->CompactFiles(CompactionOptions(), l0_files, i)); ASSERT_OK(db->CompactFiles(CompactionOptions(), l0_files, i));
@ -296,6 +299,9 @@ TEST_F(CompactFilesTest, CapturingPendingFiles) {
ASSERT_OK(db->Flush(FlushOptions())); ASSERT_OK(db->Flush(FlushOptions()));
} }
// Ensure background work is fully finished including listener callbacks
// before accessing listener state.
ASSERT_OK(static_cast_with_check<DBImpl>(db)->TEST_WaitForBackgroundWork());
auto l0_files = collector->GetFlushedFiles(); auto l0_files = collector->GetFlushedFiles();
EXPECT_EQ(5, l0_files.size()); EXPECT_EQ(5, l0_files.size());
@ -414,6 +420,9 @@ TEST_F(CompactFilesTest, SentinelCompressionType) {
ASSERT_OK(db->Put(WriteOptions(), "key", "val")); ASSERT_OK(db->Put(WriteOptions(), "key", "val"));
ASSERT_OK(db->Flush(FlushOptions())); ASSERT_OK(db->Flush(FlushOptions()));
// Ensure background work is fully finished including listener callbacks
// before accessing listener state.
ASSERT_OK(static_cast_with_check<DBImpl>(db)->TEST_WaitForBackgroundWork());
auto l0_files = collector->GetFlushedFiles(); auto l0_files = collector->GetFlushedFiles();
ASSERT_EQ(1, l0_files.size()); ASSERT_EQ(1, l0_files.size());

@ -1574,6 +1574,9 @@ TEST_F(DBFlushTest, FireOnFlushCompletedAfterCommittedResult) {
flush_opts.wait = false; flush_opts.wait = false;
ASSERT_OK(db_->Flush(flush_opts)); ASSERT_OK(db_->Flush(flush_opts));
t1.join(); t1.join();
// Ensure background work is fully finished including listener callbacks
// before accessing listener state.
ASSERT_OK(dbfull()->TEST_WaitForBackgroundWork());
ASSERT_TRUE(listener->completed1); ASSERT_TRUE(listener->completed1);
ASSERT_TRUE(listener->completed2); ASSERT_TRUE(listener->completed2);
SyncPoint::GetInstance()->DisableProcessing(); SyncPoint::GetInstance()->DisableProcessing();

@ -410,6 +410,9 @@ TEST_P(DBTestSharedWriteBufferAcrossCFs, SharedWriteBufferAcrossCFs) {
ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[1])); ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[1]));
ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[2])); ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[2]));
ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[3])); ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[3]));
// Ensure background work is fully finished including listener callbacks
// before accessing listener state.
ASSERT_OK(dbfull()->TEST_WaitForBackgroundWork());
}; };
// Create some data and flush "default" and "nikitich" so that they // Create some data and flush "default" and "nikitich" so that they
@ -588,6 +591,11 @@ TEST_F(DBTest2, SharedWriteBufferLimitAcrossDB) {
ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[1])); ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[1]));
ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[2])); ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[2]));
ASSERT_OK(static_cast<DBImpl*>(db2)->TEST_WaitForFlushMemTable()); ASSERT_OK(static_cast<DBImpl*>(db2)->TEST_WaitForFlushMemTable());
// Ensure background work is fully finished including listener callbacks
// before accessing listener state.
ASSERT_OK(dbfull()->TEST_WaitForBackgroundWork());
ASSERT_OK(
static_cast_with_check<DBImpl>(db2)->TEST_WaitForBackgroundWork());
}; };
// Trigger a flush on cf2 // Trigger a flush on cf2

@ -340,6 +340,9 @@ TEST_F(EventListenerTest, OnSingleDBFlushTest) {
for (int i = 1; i < 8; ++i) { for (int i = 1; i < 8; ++i) {
ASSERT_OK(Flush(i)); ASSERT_OK(Flush(i));
ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable()); ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
// Ensure background work is fully finished including listener callbacks
// before accessing listener state.
ASSERT_OK(dbfull()->TEST_WaitForBackgroundWork());
ASSERT_EQ(listener->flushed_dbs_.size(), i); ASSERT_EQ(listener->flushed_dbs_.size(), i);
ASSERT_EQ(listener->flushed_column_family_names_.size(), i); ASSERT_EQ(listener->flushed_column_family_names_.size(), i);
} }
@ -462,6 +465,13 @@ TEST_F(EventListenerTest, MultiDBMultiListeners) {
} }
} }
for (int d = 0; d < kNumDBs; ++d) {
// Ensure background work is fully finished including listener callbacks
// before accessing listener state.
ASSERT_OK(
static_cast_with_check<DBImpl>(dbs[d])->TEST_WaitForBackgroundWork());
}
for (auto* listener : listeners) { for (auto* listener : listeners) {
int pos = 0; int pos = 0;
for (size_t c = 0; c < cf_names.size(); ++c) { for (size_t c = 0; c < cf_names.size(); ++c) {
@ -523,10 +533,10 @@ TEST_F(EventListenerTest, DisableBGCompaction) {
ASSERT_OK(db_->Flush(fo, handles_[1])); ASSERT_OK(db_->Flush(fo, handles_[1]));
db_->GetColumnFamilyMetaData(handles_[1], &cf_meta); db_->GetColumnFamilyMetaData(handles_[1], &cf_meta);
} }
ASSERT_GE(listener->slowdown_count, kSlowdownTrigger * 9); // Ensure background work is fully finished including listener callbacks
// We don't want the listener executing during DBTestBase::Close() due to // before accessing listener state.
// race on handles_.
ASSERT_OK(dbfull()->TEST_WaitForBackgroundWork()); ASSERT_OK(dbfull()->TEST_WaitForBackgroundWork());
ASSERT_GE(listener->slowdown_count, kSlowdownTrigger * 9);
} }
class TestCompactionReasonListener : public EventListener { class TestCompactionReasonListener : public EventListener {

Loading…
Cancel
Save