From 06f378d75edfc08d2e3d1af42bab2f63bd0267df Mon Sep 17 00:00:00 2001 From: Siying Dong Date: Thu, 21 Feb 2019 17:23:05 -0800 Subject: [PATCH] When closing BlobDB, should first wait for all background tasks (#5005) Summary: When closing a BlobDB, it only waits for background tasks to finish as the last thing, but the background task may access some variables that are destroyed. The fix is to introduce a shutdown function in the timer queue and call the function as the first thing when destorying BlobDB. Pull Request resolved: https://github.com/facebook/rocksdb/pull/5005 Differential Revision: D14170342 Pulled By: siying fbshipit-source-id: 081e6a2d99b9765d5956cf6cdfc290c07270c233 --- util/timer_queue.h | 16 +++++-- utilities/blob_db/blob_db_impl.cc | 8 ++++ utilities/blob_db/blob_db_test.cc | 69 +++++++++++++++++++++++++++++++ 3 files changed, 90 insertions(+), 3 deletions(-) diff --git a/util/timer_queue.h b/util/timer_queue.h index f068ffefb..bd8a4f850 100644 --- a/util/timer_queue.h +++ b/util/timer_queue.h @@ -22,8 +22,6 @@ #pragma once -#include "port/port.h" - #include #include #include @@ -33,6 +31,9 @@ #include #include +#include "port/port.h" +#include "util/sync_point.h" + // Allows execution of handlers at a specified time in the future // Guarantees: // - All handlers are executed ONCE, even if cancelled (aborted parameter will @@ -48,7 +49,13 @@ class TimerQueue { public: TimerQueue() : m_th(&TimerQueue::run, this) {} - ~TimerQueue() { + ~TimerQueue() { shutdown(); } + + // This function is not thread-safe. + void shutdown() { + if (closed_) { + return; + } cancelAll(); // Abusing the timer queue to trigger the shutdown. add(0, [this](bool) { @@ -56,6 +63,7 @@ class TimerQueue { return std::make_pair(false, 0); }); m_th.join(); + closed_ = true; } // Adds a new timer @@ -67,6 +75,7 @@ class TimerQueue { WorkItem item; Clock::time_point tp = Clock::now(); item.end = tp + std::chrono::milliseconds(milliseconds); + TEST_SYNC_POINT_CALLBACK("TimeQueue::Add:item.end", &item.end); item.period = milliseconds; item.handler = std::move(handler); @@ -217,4 +226,5 @@ class TimerQueue { std::vector& getContainer() { return this->c; } } m_items; rocksdb::port::Thread m_th; + bool closed_ = false; }; diff --git a/utilities/blob_db/blob_db_impl.cc b/utilities/blob_db/blob_db_impl.cc index d1f6c87ef..7c938c16e 100644 --- a/utilities/blob_db/blob_db_impl.cc +++ b/utilities/blob_db/blob_db_impl.cc @@ -94,6 +94,7 @@ BlobDBImpl::BlobDBImpl(const std::string& dbname, } BlobDBImpl::~BlobDBImpl() { + tqueue_.shutdown(); // CancelAllBackgroundWork(db_, true); Status s __attribute__((__unused__)) = Close(); assert(s.ok()); @@ -1308,6 +1309,9 @@ std::pair BlobDBImpl::EvictExpiredFiles(bool aborted) { return std::make_pair(false, -1); } + TEST_SYNC_POINT("BlobDBImpl::EvictExpiredFiles:0"); + TEST_SYNC_POINT("BlobDBImpl::EvictExpiredFiles:1"); + std::vector> process_files; uint64_t now = EpochNow(); { @@ -1322,6 +1326,10 @@ std::pair BlobDBImpl::EvictExpiredFiles(bool aborted) { } } + TEST_SYNC_POINT("BlobDBImpl::EvictExpiredFiles:2"); + TEST_SYNC_POINT("BlobDBImpl::EvictExpiredFiles:3"); + TEST_SYNC_POINT_CALLBACK("BlobDBImpl::EvictExpiredFiles:cb", nullptr); + SequenceNumber seq = GetLatestSequenceNumber(); { MutexLock l(&write_mutex_); diff --git a/utilities/blob_db/blob_db_test.cc b/utilities/blob_db/blob_db_test.cc index bc86d04b3..f9b4063c7 100644 --- a/utilities/blob_db/blob_db_test.cc +++ b/utilities/blob_db/blob_db_test.cc @@ -6,6 +6,7 @@ #ifndef ROCKSDB_LITE #include +#include #include #include #include @@ -72,6 +73,12 @@ class BlobDBTest : public testing::Test { Open(bdb_options, options); } + void Close() { + assert(blob_db_ != nullptr); + delete blob_db_; + blob_db_ = nullptr; + } + void Destroy() { if (blob_db_) { Options options = blob_db_->GetOptions(); @@ -1542,6 +1549,68 @@ TEST_F(BlobDBTest, DisableFileDeletions) { } } +TEST_F(BlobDBTest, ShutdownWait) { + BlobDBOptions bdb_options; + bdb_options.ttl_range_secs = 100; + bdb_options.min_blob_size = 0; + bdb_options.disable_background_tasks = false; + Options options; + options.env = mock_env_.get(); + + SyncPoint::GetInstance()->LoadDependency({ + {"BlobDBImpl::EvictExpiredFiles:0", "BlobDBTest.ShutdownWait:0"}, + {"BlobDBTest.ShutdownWait:1", "BlobDBImpl::EvictExpiredFiles:1"}, + {"BlobDBImpl::EvictExpiredFiles:2", "BlobDBTest.ShutdownWait:2"}, + {"BlobDBTest.ShutdownWait:3", "BlobDBImpl::EvictExpiredFiles:3"}, + }); + // Force all tasks to be scheduled immediately. + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "TimeQueue::Add:item.end", [&](void *arg) { + std::chrono::steady_clock::time_point *tp = + static_cast(arg); + *tp = + std::chrono::steady_clock::now() - std::chrono::milliseconds(10000); + }); + + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "BlobDBImpl::EvictExpiredFiles:cb", [&](void * /*arg*/) { + // Sleep 3 ms to increase the chance of data race. + // We've synced up the code so that EvictExpiredFiles() + // is called concurrently with ~BlobDBImpl(). + // ~BlobDBImpl() is supposed to wait for all background + // task to shutdown before doing anything else. In order + // to use the same test to reproduce a bug of the waiting + // logic, we wait a little bit here, so that TSAN can + // catch the data race. + // We should improve the test if we find a better way. + Env::Default()->SleepForMicroseconds(3000); + }); + + SyncPoint::GetInstance()->EnableProcessing(); + + Open(bdb_options, options); + mock_env_->set_current_time(50); + std::map data; + ASSERT_OK(PutWithTTL("foo", "bar", 100, &data)); + auto blob_files = blob_db_impl()->TEST_GetBlobFiles(); + ASSERT_EQ(1, blob_files.size()); + auto blob_file = blob_files[0]; + ASSERT_FALSE(blob_file->Immutable()); + ASSERT_FALSE(blob_file->Obsolete()); + VerifyDB(data); + + TEST_SYNC_POINT("BlobDBTest.ShutdownWait:0"); + mock_env_->set_current_time(250); + // The key should expired now. + TEST_SYNC_POINT("BlobDBTest.ShutdownWait:1"); + + TEST_SYNC_POINT("BlobDBTest.ShutdownWait:2"); + TEST_SYNC_POINT("BlobDBTest.ShutdownWait:3"); + Close(); + + SyncPoint::GetInstance()->DisableProcessing(); +} + } // namespace blob_db } // namespace rocksdb