From 8ca433f912f1d90058d7174d1923d571c341257f Mon Sep 17 00:00:00 2001 From: Andrew Kryczka Date: Tue, 22 Feb 2022 12:13:39 -0800 Subject: [PATCH] Fix test race conditions with OnFlushCompleted() (#9617) Summary: We often see flaky tests due to `DB::Flush()` or `DBImpl::TEST_WaitForFlushMemTable()` not waiting until event listeners complete. For example, https://github.com/facebook/rocksdb/issues/9084, https://github.com/facebook/rocksdb/issues/9400, https://github.com/facebook/rocksdb/issues/9528, plus two new ones this week: "EventListenerTest.OnSingleDBFlushTest" and "DBFlushTest.FireOnFlushCompletedAfterCommittedResult". I ran a `make check` with the below race condition-coercing patch and fixed the issues it found, except those in old BlobDB. ``` diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index 0e1864788..aaba68c4a 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -861,6 +861,8 @@ void DBImpl::NotifyOnFlushCompleted( mutable_cf_options.level0_stop_writes_trigger); // release lock while notifying events mutex_.Unlock(); + bg_cv_.SignalAll(); + sleep(1); { for (auto& info : *flush_jobs_info) { info->triggered_writes_slowdown = triggered_writes_slowdown; ``` I did not fix the old BlobDB issues because old BlobDB appears to have a fundamental (non-test) issue. In particular, it uses an EventListener to keep track of the files. OnFlushCompleted() could be delayed until even after a compaction involving that flushed file completes, causing the compaction to unexpectedly delete an untracked file. 
Pull Request resolved: https://github.com/facebook/rocksdb/pull/9617 Test Plan: `make check` including the race condition coercing patch Reviewed By: hx235 Differential Revision: D34384022 Pulled By: ajkr fbshipit-source-id: 2652ded39b415277c5d6a628414345223930514e --- db/compact_files_test.cc | 9 +++++++++ db/db_flush_test.cc | 3 +++ db/db_test2.cc | 8 ++++++++ db/listener_test.cc | 16 +++++++++++++--- 4 files changed, 33 insertions(+), 3 deletions(-) diff --git a/db/compact_files_test.cc b/db/compact_files_test.cc index 4793adddf..29e3494ea 100644 --- a/db/compact_files_test.cc +++ b/db/compact_files_test.cc @@ -138,6 +138,9 @@ TEST_F(CompactFilesTest, MultipleLevel) { collector->ClearFlushedFiles(); ASSERT_OK(db->Put(WriteOptions(), ToString(i), "")); ASSERT_OK(db->Flush(FlushOptions())); + // Ensure background work is fully finished including listener callbacks + // before accessing listener state. + ASSERT_OK(static_cast_with_check<DBImpl>(db)->TEST_WaitForBackgroundWork()); auto l0_files = collector->GetFlushedFiles(); ASSERT_OK(db->CompactFiles(CompactionOptions(), l0_files, i)); @@ -296,6 +299,9 @@ TEST_F(CompactFilesTest, CapturingPendingFiles) { ASSERT_OK(db->Flush(FlushOptions())); } + // Ensure background work is fully finished including listener callbacks + // before accessing listener state. + ASSERT_OK(static_cast_with_check<DBImpl>(db)->TEST_WaitForBackgroundWork()); auto l0_files = collector->GetFlushedFiles(); EXPECT_EQ(5, l0_files.size()); @@ -414,6 +420,9 @@ TEST_F(CompactFilesTest, SentinelCompressionType) { ASSERT_OK(db->Put(WriteOptions(), "key", "val")); ASSERT_OK(db->Flush(FlushOptions())); + // Ensure background work is fully finished including listener callbacks + // before accessing listener state. 
+ ASSERT_OK(static_cast_with_check<DBImpl>(db)->TEST_WaitForBackgroundWork()); auto l0_files = collector->GetFlushedFiles(); ASSERT_EQ(1, l0_files.size()); diff --git a/db/db_flush_test.cc b/db/db_flush_test.cc index b4ca7f019..2fe3c559d 100644 --- a/db/db_flush_test.cc +++ b/db/db_flush_test.cc @@ -1574,6 +1574,9 @@ TEST_F(DBFlushTest, FireOnFlushCompletedAfterCommittedResult) { flush_opts.wait = false; ASSERT_OK(db_->Flush(flush_opts)); t1.join(); + // Ensure background work is fully finished including listener callbacks + // before accessing listener state. + ASSERT_OK(dbfull()->TEST_WaitForBackgroundWork()); ASSERT_TRUE(listener->completed1); ASSERT_TRUE(listener->completed2); SyncPoint::GetInstance()->DisableProcessing(); diff --git a/db/db_test2.cc b/db/db_test2.cc index d8f54938f..7cecc2911 100644 --- a/db/db_test2.cc +++ b/db/db_test2.cc @@ -410,6 +410,9 @@ TEST_P(DBTestSharedWriteBufferAcrossCFs, SharedWriteBufferAcrossCFs) { ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[1])); ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[2])); ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[3])); + // Ensure background work is fully finished including listener callbacks + // before accessing listener state. + ASSERT_OK(dbfull()->TEST_WaitForBackgroundWork()); }; // Create some data and flush "default" and "nikitich" so that they @@ -588,6 +591,11 @@ TEST_F(DBTest2, SharedWriteBufferLimitAcrossDB) { ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[1])); ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[2])); ASSERT_OK(static_cast<DBImpl*>(db2)->TEST_WaitForFlushMemTable()); + // Ensure background work is fully finished including listener callbacks + // before accessing listener state. 
+ ASSERT_OK(dbfull()->TEST_WaitForBackgroundWork()); + ASSERT_OK( + static_cast_with_check<DBImpl>(db2)->TEST_WaitForBackgroundWork()); }; // Trigger a flush on cf2 diff --git a/db/listener_test.cc b/db/listener_test.cc index bef81f050..036762a44 100644 --- a/db/listener_test.cc +++ b/db/listener_test.cc @@ -340,6 +340,9 @@ TEST_F(EventListenerTest, OnSingleDBFlushTest) { for (int i = 1; i < 8; ++i) { ASSERT_OK(Flush(i)); ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable()); + // Ensure background work is fully finished including listener callbacks + // before accessing listener state. + ASSERT_OK(dbfull()->TEST_WaitForBackgroundWork()); ASSERT_EQ(listener->flushed_dbs_.size(), i); ASSERT_EQ(listener->flushed_column_family_names_.size(), i); } @@ -462,6 +465,13 @@ TEST_F(EventListenerTest, MultiDBMultiListeners) { } } + for (int d = 0; d < kNumDBs; ++d) { + // Ensure background work is fully finished including listener callbacks + // before accessing listener state. + ASSERT_OK( + static_cast_with_check<DBImpl>(dbs[d])->TEST_WaitForBackgroundWork()); + } + for (auto* listener : listeners) { int pos = 0; for (size_t c = 0; c < cf_names.size(); ++c) { @@ -523,10 +533,10 @@ TEST_F(EventListenerTest, DisableBGCompaction) { ASSERT_OK(db_->Flush(fo, handles_[1])); db_->GetColumnFamilyMetaData(handles_[1], &cf_meta); } - ASSERT_GE(listener->slowdown_count, kSlowdownTrigger * 9); - // We don't want the listener executing during DBTestBase::Close() due to - // race on handles_. + // Ensure background work is fully finished including listener callbacks + // before accessing listener state. ASSERT_OK(dbfull()->TEST_WaitForBackgroundWork()); + ASSERT_GE(listener->slowdown_count, kSlowdownTrigger * 9); } class TestCompactionReasonListener : public EventListener {