diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index 4378f3212..49ce9d3a0 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -229,6 +229,8 @@ Status DBImpl::FlushMemTableToOutputFile( mutex_.Lock(); if (log_io_s.ok() && synced_wals.IsWalAddition()) { log_io_s = status_to_io_status(ApplyWALToManifest(&synced_wals)); + TEST_SYNC_POINT_CALLBACK("DBImpl::FlushMemTableToOutputFile:CommitWal:1", + nullptr); } if (!log_io_s.ok() && !log_io_s.IsShutdownInProgress() && diff --git a/db/db_wal_test.cc b/db/db_wal_test.cc index 96b5d4f91..5b5ec76af 100644 --- a/db/db_wal_test.cc +++ b/db/db_wal_test.cc @@ -1523,8 +1523,25 @@ TEST_F(DBWALTest, RaceInstallFlushResultsWithWalObsoletion) { /*wait=*/false, /*allow_write_stall=*/true, handles_[0])); bool called = false; + std::atomic bg_flush_threads{0}; + std::atomic wal_synced{false}; SyncPoint::GetInstance()->DisableProcessing(); SyncPoint::GetInstance()->ClearAllCallBacks(); + SyncPoint::GetInstance()->SetCallBack( + "DBImpl::BackgroundCallFlush:start", [&](void* /*arg*/) { + int cur = bg_flush_threads.load(); + int desired = cur + 1; + if (cur > 0 || + !bg_flush_threads.compare_exchange_strong(cur, desired)) { + while (!wal_synced.load()) { + // Wait until the other bg flush thread finishes committing WAL sync + // operation to the MANIFEST. + } + } + }); + SyncPoint::GetInstance()->SetCallBack( + "DBImpl::FlushMemTableToOutputFile:CommitWal:1", + [&](void* /*arg*/) { wal_synced.store(true); }); // This callback will be called when the first bg flush thread reaches the // point before entering the MANIFEST write queue after flushing the SST // file.