From 53d66df0c426efcfc2d81e27cf1b7e7f13869c15 Mon Sep 17 00:00:00 2001 From: Dmitri Smirnov Date: Fri, 23 Mar 2018 12:48:45 -0700 Subject: [PATCH] Refactor sync_point to make implementation either customizable or replaceable Summary: Closes https://github.com/facebook/rocksdb/pull/3637 Differential Revision: D7354373 Pulled By: ajkr fbshipit-source-id: 6816c7bbc192ed0fb944942b11c7074bf24eddf1 --- CMakeLists.txt | 1 + TARGETS | 1 + src.mk | 1 + util/sync_point.cc | 150 +++++++--------------------------------- util/sync_point.h | 39 ++++------- util/sync_point_impl.cc | 129 ++++++++++++++++++++++++++++++++++ util/sync_point_impl.h | 74 ++++++++++++++++++++ 7 files changed, 245 insertions(+), 150 deletions(-) create mode 100644 util/sync_point_impl.cc create mode 100644 util/sync_point_impl.h diff --git a/CMakeLists.txt b/CMakeLists.txt index f2364512b..8c74e1db4 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -569,6 +569,7 @@ set(SOURCES util/status_message.cc util/string_util.cc util/sync_point.cc + util/sync_point_impl.cc util/testutil.cc util/thread_local.cc util/threadpool_imp.cc diff --git a/TARGETS b/TARGETS index 7d0d71818..4123b3582 100644 --- a/TARGETS +++ b/TARGETS @@ -215,6 +215,7 @@ cpp_library( "util/status_message.cc", "util/string_util.cc", "util/sync_point.cc", + "util/sync_point_impl.cc", "util/thread_local.cc", "util/threadpool_imp.cc", "util/transaction_test_util.cc", diff --git a/src.mk b/src.mk index 196686c9f..4089bf0f3 100644 --- a/src.mk +++ b/src.mk @@ -148,6 +148,7 @@ LIB_SOURCES = \ util/status_message.cc \ util/string_util.cc \ util/sync_point.cc \ + util/sync_point_impl.cc \ util/thread_local.cc \ util/threadpool_imp.cc \ util/transaction_test_util.cc \ diff --git a/util/sync_point.cc b/util/sync_point.cc index c8c9fbc26..ce0fa0a97 100644 --- a/util/sync_point.cc +++ b/util/sync_point.cc @@ -4,10 +4,7 @@ // (found in the LICENSE.Apache file in the root directory). #include "util/sync_point.h" -#include -#include -#include "port/port.h" -#include "util/random.h" +#include "util/sync_point_impl.h" int rocksdb_kill_odds = 0; std::vector rocksdb_kill_prefix_blacklist; @@ -15,156 +12,57 @@ std::vector rocksdb_kill_prefix_blacklist; #ifndef NDEBUG namespace rocksdb { -void TestKillRandom(std::string kill_point, int odds, - const std::string& srcfile, int srcline) { - for (auto& p : rocksdb_kill_prefix_blacklist) { - if (kill_point.substr(0, p.length()) == p) { - return; - } - } - - assert(odds > 0); - if (odds % 7 == 0) { - // class Random uses multiplier 16807, which is 7^5. If odds are - // multiplier of 7, there might be limited values generated. - odds++; - } - auto* r = Random::GetTLSInstance(); - bool crash = r->OneIn(odds); - if (crash) { - port::Crash(srcfile, srcline); - } -} - SyncPoint* SyncPoint::GetInstance() { static SyncPoint sync_point; return &sync_point; } -void SyncPoint::LoadDependency(const std::vector& dependencies) { - std::unique_lock lock(mutex_); - successors_.clear(); - predecessors_.clear(); - cleared_points_.clear(); - for (const auto& dependency : dependencies) { - successors_[dependency.predecessor].push_back(dependency.successor); - predecessors_[dependency.successor].push_back(dependency.predecessor); - } - cv_.notify_all(); +SyncPoint::SyncPoint() : + impl_(new Data) { } -void SyncPoint::LoadDependencyAndMarkers( - const std::vector& dependencies, - const std::vector& markers) { - std::unique_lock lock(mutex_); - successors_.clear(); - predecessors_.clear(); - cleared_points_.clear(); - markers_.clear(); - marked_thread_id_.clear(); - for (const auto& dependency : dependencies) { - successors_[dependency.predecessor].push_back(dependency.successor); - predecessors_[dependency.successor].push_back(dependency.predecessor); - } - for (const auto& marker : markers) { - successors_[marker.predecessor].push_back(marker.successor); - predecessors_[marker.successor].push_back(marker.predecessor); - markers_[marker.predecessor].push_back(marker.successor); - } - cv_.notify_all(); +SyncPoint:: ~SyncPoint() { + delete impl_; } -bool SyncPoint::PredecessorsAllCleared(const std::string& point) { - for (const auto& pred : predecessors_[point]) { - if (cleared_points_.count(pred) == 0) { - return false; - } - } - return true; +void SyncPoint::LoadDependency(const std::vector& dependencies) { + impl_->LoadDependency(dependencies); } -void SyncPoint::SetCallBack(const std::string point, - std::function callback) { - std::unique_lock lock(mutex_); - callbacks_[point] = callback; +void SyncPoint::LoadDependencyAndMarkers( + const std::vector& dependencies, + const std::vector& markers) { + impl_->LoadDependencyAndMarkers(dependencies, markers); } -void SyncPoint::ClearCallBack(const std::string point) { - std::unique_lock lock(mutex_); - while (num_callbacks_running_ > 0) { - cv_.wait(lock); - } - callbacks_.erase(point); +void SyncPoint::SetCallBack(const std::string& point, + const std::function& callback) { + impl_->SetCallBack(point, callback); +} + +void SyncPoint::ClearCallBack(const std::string& point) { + impl_->ClearCallBack(point); } void SyncPoint::ClearAllCallBacks() { - std::unique_lock lock(mutex_); - while (num_callbacks_running_ > 0) { - cv_.wait(lock); - } - callbacks_.clear(); + impl_->ClearAllCallBacks(); } void SyncPoint::EnableProcessing() { - std::unique_lock lock(mutex_); - enabled_ = true; + impl_->EnableProcessing(); } void SyncPoint::DisableProcessing() { - std::unique_lock lock(mutex_); - enabled_ = false; + impl_->DisableProcessing(); } void SyncPoint::ClearTrace() { - std::unique_lock lock(mutex_); - cleared_points_.clear(); -} - -bool SyncPoint::DisabledByMarker(const std::string& point, - std::thread::id thread_id) { - auto marked_point_iter = marked_thread_id_.find(point); - return marked_point_iter != marked_thread_id_.end() && - thread_id != marked_point_iter->second; + impl_->ClearTrace(); } void SyncPoint::Process(const std::string& point, void* cb_arg) { - std::unique_lock lock(mutex_); - if (!enabled_) { - return; - } - - auto thread_id = std::this_thread::get_id(); - - auto marker_iter = markers_.find(point); - if (marker_iter != markers_.end()) { - for (auto marked_point : marker_iter->second) { - marked_thread_id_.insert(std::make_pair(marked_point, thread_id)); - } - } - - if (DisabledByMarker(point, thread_id)) { - return; - } - - while (!PredecessorsAllCleared(point)) { - cv_.wait(lock); - if (DisabledByMarker(point, thread_id)) { - return; - } - } - - auto callback_pair = callbacks_.find(point); - if (callback_pair != callbacks_.end()) { - num_callbacks_running_++; - mutex_.unlock(); - callback_pair->second(cb_arg); - mutex_.lock(); - num_callbacks_running_--; - cv_.notify_all(); - } - - cleared_points_.insert(point); - cv_.notify_all(); + impl_->Process(point, cb_arg); } + } // namespace rocksdb #endif // NDEBUG diff --git a/util/sync_point.h b/util/sync_point.h index 3e0831b67..c85be9a48 100644 --- a/util/sync_point.h +++ b/util/sync_point.h @@ -5,13 +5,10 @@ #pragma once #include -#include #include #include #include #include -#include -#include #include // This is only set from db_stress.cc and for testing only. @@ -65,6 +62,10 @@ class SyncPoint { public: static SyncPoint* GetInstance(); + SyncPoint(const SyncPoint&) = delete; + SyncPoint& operator=(const SyncPoint&) = delete; + ~SyncPoint(); + struct SyncPointPair { std::string predecessor; std::string successor; @@ -81,15 +82,14 @@ class SyncPoint { void LoadDependencyAndMarkers(const std::vector& dependencies, const std::vector& markers); - // Set up a call back function in sync point. // The argument to the callback is passed through from // TEST_SYNC_POINT_CALLBACK(); nullptr if TEST_SYNC_POINT or // TEST_IDX_SYNC_POINT was used. - void SetCallBack(const std::string point, - std::function callback); + void SetCallBack(const std::string& point, + const std::function& callback); // Clear callback function by point - void ClearCallBack(const std::string point); + void ClearCallBack(const std::string& point); // Clear all call back functions. void ClearAllCallBacks(); @@ -111,29 +111,20 @@ class SyncPoint { // TODO: it might be useful to provide a function that blocks until all // sync points are cleared. + // We want this to be public so we can + // subclass the implementation + struct Data; + private: - bool PredecessorsAllCleared(const std::string& point); - bool DisabledByMarker(const std::string& point, std::thread::id thread_id); - - // successor/predecessor map loaded from LoadDependency - std::unordered_map> successors_; - std::unordered_map> predecessors_; - std::unordered_map > callbacks_; - std::unordered_map > markers_; - std::unordered_map marked_thread_id_; - - std::mutex mutex_; - std::condition_variable cv_; - // sync points that have been passed through - std::unordered_set cleared_points_; - bool enabled_ = false; - int num_callbacks_running_ = 0; + // Singleton + SyncPoint(); + Data* impl_; }; } // namespace rocksdb // Use TEST_SYNC_POINT to specify sync points inside code base. -// Sync points can have happens-after dependency on other sync points, +// Sync points can have happens-after depedency on other sync points, // configured at runtime via SyncPoint::LoadDependency. This could be // utilized to re-produce race conditions between threads. // See TransactionLogIteratorRace in db_test.cc for an example use case. diff --git a/util/sync_point_impl.cc b/util/sync_point_impl.cc new file mode 100644 index 000000000..ab4cc5ae5 --- /dev/null +++ b/util/sync_point_impl.cc @@ -0,0 +1,129 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "util/sync_point_impl.h" + +#ifndef NDEBUG +namespace rocksdb { + +void TestKillRandom(std::string kill_point, int odds, + const std::string& srcfile, int srcline) { + for (auto& p : rocksdb_kill_prefix_blacklist) { + if (kill_point.substr(0, p.length()) == p) { + return; + } + } + + assert(odds > 0); + if (odds % 7 == 0) { + // class Random uses multiplier 16807, which is 7^5. If odds are + // multiplier of 7, there might be limited values generated. + odds++; + } + auto* r = Random::GetTLSInstance(); + bool crash = r->OneIn(odds); + if (crash) { + port::Crash(srcfile, srcline); + } +} + + +void SyncPoint::Data::LoadDependency(const std::vector& dependencies) { + std::lock_guard lock(mutex_); + successors_.clear(); + predecessors_.clear(); + cleared_points_.clear(); + for (const auto& dependency : dependencies) { + successors_[dependency.predecessor].push_back(dependency.successor); + predecessors_[dependency.successor].push_back(dependency.predecessor); + } + cv_.notify_all(); +} + +void SyncPoint::Data::LoadDependencyAndMarkers( + const std::vector& dependencies, + const std::vector& markers) { + std::lock_guard lock(mutex_); + successors_.clear(); + predecessors_.clear(); + cleared_points_.clear(); + markers_.clear(); + marked_thread_id_.clear(); + for (const auto& dependency : dependencies) { + successors_[dependency.predecessor].push_back(dependency.successor); + predecessors_[dependency.successor].push_back(dependency.predecessor); + } + for (const auto& marker : markers) { + successors_[marker.predecessor].push_back(marker.successor); + predecessors_[marker.successor].push_back(marker.predecessor); + markers_[marker.predecessor].push_back(marker.successor); + } + cv_.notify_all(); +} + +bool SyncPoint::Data::PredecessorsAllCleared(const std::string& point) { + for (const auto& pred : predecessors_[point]) { + if (cleared_points_.count(pred) == 0) { + return false; + } + } + return true; +} + +void SyncPoint::Data::ClearCallBack(const std::string& point) { + std::unique_lock lock(mutex_); + while (num_callbacks_running_ > 0) { + cv_.wait(lock); + } + callbacks_.erase(point); +} + +void SyncPoint::Data::ClearAllCallBacks() { + std::unique_lock lock(mutex_); + while (num_callbacks_running_ > 0) { + cv_.wait(lock); + } + callbacks_.clear(); +} + +void SyncPoint::Data::Process(const std::string& point, void* cb_arg) { + std::unique_lock lock(mutex_); + if (!enabled_) { + return; + } + + auto thread_id = std::this_thread::get_id(); + + auto marker_iter = markers_.find(point); + if (marker_iter != markers_.end()) { + for (auto& marked_point : marker_iter->second) { + marked_thread_id_.emplace(marked_point, thread_id); + } + } + + if (DisabledByMarker(point, thread_id)) { + return; + } + + while (!PredecessorsAllCleared(point)) { + cv_.wait(lock); + if (DisabledByMarker(point, thread_id)) { + return; + } + } + + auto callback_pair = callbacks_.find(point); + if (callback_pair != callbacks_.end()) { + num_callbacks_running_++; + mutex_.unlock(); + callback_pair->second(cb_arg); + mutex_.lock(); + num_callbacks_running_--; + } + cleared_points_.insert(point); + cv_.notify_all(); +} +} // rocksdb +#endif diff --git a/util/sync_point_impl.h b/util/sync_point_impl.h new file mode 100644 index 000000000..8c7bd7a2d --- /dev/null +++ b/util/sync_point_impl.h @@ -0,0 +1,74 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "util/sync_point.h" + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "port/port.h" +#include "util/random.h" + +#pragma once + +#ifndef NDEBUG +namespace rocksdb { +struct SyncPoint::Data { + // Enable proper deletion by subclasses + virtual ~Data() {} + // successor/predecessor map loaded from LoadDependency + std::unordered_map> successors_; + std::unordered_map> predecessors_; + std::unordered_map > callbacks_; + std::unordered_map > markers_; + std::unordered_map marked_thread_id_; + + std::mutex mutex_; + std::condition_variable cv_; + // sync points that have been passed through + std::unordered_set cleared_points_; + bool enabled_ = false; + int num_callbacks_running_ = 0; + + void LoadDependency(const std::vector& dependencies); + void LoadDependencyAndMarkers(const std::vector& dependencies, + const std::vector& markers); + bool PredecessorsAllCleared(const std::string& point); + void SetCallBack(const std::string& point, + const std::function& callback) { + std::lock_guard lock(mutex_); + callbacks_[point] = callback; +} + + void ClearCallBack(const std::string& point); + void ClearAllCallBacks(); + void EnableProcessing() { + std::lock_guard lock(mutex_); + enabled_ = true; + } + void DisableProcessing() { + std::lock_guard lock(mutex_); + enabled_ = false; + } + void ClearTrace() { + std::lock_guard lock(mutex_); + cleared_points_.clear(); + } + bool DisabledByMarker(const std::string& point, + std::thread::id thread_id) { + auto marked_point_iter = marked_thread_id_.find(point); + return marked_point_iter != marked_thread_id_.end() && + thread_id != marked_point_iter->second; + } + void Process(const std::string& point, void* cb_arg); +}; +} +#endif // NDEBUG