Refactor sync_point to make implementation either customizable or replaceable

Summary: Closes https://github.com/facebook/rocksdb/pull/3637

Differential Revision: D7354373

Pulled By: ajkr

fbshipit-source-id: 6816c7bbc192ed0fb944942b11c7074bf24eddf1
main
Dmitri Smirnov 6 years ago committed by Facebook Github Bot
parent a993c0139d
commit 53d66df0c4
  1. 1
      CMakeLists.txt
  2. 1
      TARGETS
  3. 1
      src.mk
  4. 150
      util/sync_point.cc
  5. 39
      util/sync_point.h
  6. 129
      util/sync_point_impl.cc
  7. 74
      util/sync_point_impl.h

@ -569,6 +569,7 @@ set(SOURCES
util/status_message.cc
util/string_util.cc
util/sync_point.cc
util/sync_point_impl.cc
util/testutil.cc
util/thread_local.cc
util/threadpool_imp.cc

@ -215,6 +215,7 @@ cpp_library(
"util/status_message.cc",
"util/string_util.cc",
"util/sync_point.cc",
"util/sync_point_impl.cc",
"util/thread_local.cc",
"util/threadpool_imp.cc",
"util/transaction_test_util.cc",

@ -148,6 +148,7 @@ LIB_SOURCES = \
util/status_message.cc \
util/string_util.cc \
util/sync_point.cc \
util/sync_point_impl.cc \
util/thread_local.cc \
util/threadpool_imp.cc \
util/transaction_test_util.cc \

@ -4,10 +4,7 @@
// (found in the LICENSE.Apache file in the root directory).
#include "util/sync_point.h"
#include <functional>
#include <thread>
#include "port/port.h"
#include "util/random.h"
#include "util/sync_point_impl.h"
int rocksdb_kill_odds = 0;
std::vector<std::string> rocksdb_kill_prefix_blacklist;
@ -15,156 +12,57 @@ std::vector<std::string> rocksdb_kill_prefix_blacklist;
#ifndef NDEBUG
namespace rocksdb {
void TestKillRandom(std::string kill_point, int odds,
const std::string& srcfile, int srcline) {
for (auto& p : rocksdb_kill_prefix_blacklist) {
if (kill_point.substr(0, p.length()) == p) {
return;
}
}
assert(odds > 0);
if (odds % 7 == 0) {
// class Random uses multiplier 16807, which is 7^5. If odds are
// multiplier of 7, there might be limited values generated.
odds++;
}
auto* r = Random::GetTLSInstance();
bool crash = r->OneIn(odds);
if (crash) {
port::Crash(srcfile, srcline);
}
}
SyncPoint* SyncPoint::GetInstance() {
static SyncPoint sync_point;
return &sync_point;
}
void SyncPoint::LoadDependency(const std::vector<SyncPointPair>& dependencies) {
std::unique_lock<std::mutex> lock(mutex_);
successors_.clear();
predecessors_.clear();
cleared_points_.clear();
for (const auto& dependency : dependencies) {
successors_[dependency.predecessor].push_back(dependency.successor);
predecessors_[dependency.successor].push_back(dependency.predecessor);
}
cv_.notify_all();
SyncPoint::SyncPoint() :
impl_(new Data) {
}
void SyncPoint::LoadDependencyAndMarkers(
const std::vector<SyncPointPair>& dependencies,
const std::vector<SyncPointPair>& markers) {
std::unique_lock<std::mutex> lock(mutex_);
successors_.clear();
predecessors_.clear();
cleared_points_.clear();
markers_.clear();
marked_thread_id_.clear();
for (const auto& dependency : dependencies) {
successors_[dependency.predecessor].push_back(dependency.successor);
predecessors_[dependency.successor].push_back(dependency.predecessor);
}
for (const auto& marker : markers) {
successors_[marker.predecessor].push_back(marker.successor);
predecessors_[marker.successor].push_back(marker.predecessor);
markers_[marker.predecessor].push_back(marker.successor);
}
cv_.notify_all();
SyncPoint:: ~SyncPoint() {
delete impl_;
}
bool SyncPoint::PredecessorsAllCleared(const std::string& point) {
for (const auto& pred : predecessors_[point]) {
if (cleared_points_.count(pred) == 0) {
return false;
}
}
return true;
void SyncPoint::LoadDependency(const std::vector<SyncPointPair>& dependencies) {
impl_->LoadDependency(dependencies);
}
void SyncPoint::SetCallBack(const std::string point,
std::function<void(void*)> callback) {
std::unique_lock<std::mutex> lock(mutex_);
callbacks_[point] = callback;
void SyncPoint::LoadDependencyAndMarkers(
const std::vector<SyncPointPair>& dependencies,
const std::vector<SyncPointPair>& markers) {
impl_->LoadDependencyAndMarkers(dependencies, markers);
}
void SyncPoint::ClearCallBack(const std::string point) {
std::unique_lock<std::mutex> lock(mutex_);
while (num_callbacks_running_ > 0) {
cv_.wait(lock);
}
callbacks_.erase(point);
void SyncPoint::SetCallBack(const std::string& point,
const std::function<void(void*)>& callback) {
impl_->SetCallBack(point, callback);
}
void SyncPoint::ClearCallBack(const std::string& point) {
impl_->ClearCallBack(point);
}
void SyncPoint::ClearAllCallBacks() {
std::unique_lock<std::mutex> lock(mutex_);
while (num_callbacks_running_ > 0) {
cv_.wait(lock);
}
callbacks_.clear();
impl_->ClearAllCallBacks();
}
void SyncPoint::EnableProcessing() {
std::unique_lock<std::mutex> lock(mutex_);
enabled_ = true;
impl_->EnableProcessing();
}
void SyncPoint::DisableProcessing() {
std::unique_lock<std::mutex> lock(mutex_);
enabled_ = false;
impl_->DisableProcessing();
}
void SyncPoint::ClearTrace() {
std::unique_lock<std::mutex> lock(mutex_);
cleared_points_.clear();
}
bool SyncPoint::DisabledByMarker(const std::string& point,
std::thread::id thread_id) {
auto marked_point_iter = marked_thread_id_.find(point);
return marked_point_iter != marked_thread_id_.end() &&
thread_id != marked_point_iter->second;
impl_->ClearTrace();
}
void SyncPoint::Process(const std::string& point, void* cb_arg) {
std::unique_lock<std::mutex> lock(mutex_);
if (!enabled_) {
return;
}
auto thread_id = std::this_thread::get_id();
auto marker_iter = markers_.find(point);
if (marker_iter != markers_.end()) {
for (auto marked_point : marker_iter->second) {
marked_thread_id_.insert(std::make_pair(marked_point, thread_id));
}
}
if (DisabledByMarker(point, thread_id)) {
return;
}
while (!PredecessorsAllCleared(point)) {
cv_.wait(lock);
if (DisabledByMarker(point, thread_id)) {
return;
}
}
auto callback_pair = callbacks_.find(point);
if (callback_pair != callbacks_.end()) {
num_callbacks_running_++;
mutex_.unlock();
callback_pair->second(cb_arg);
mutex_.lock();
num_callbacks_running_--;
cv_.notify_all();
}
cleared_points_.insert(point);
cv_.notify_all();
impl_->Process(point, cb_arg);
}
} // namespace rocksdb
#endif // NDEBUG

@ -5,13 +5,10 @@
#pragma once
#include <assert.h>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <string>
#include <thread>
#include <unordered_map>
#include <unordered_set>
#include <vector>
// This is only set from db_stress.cc and for testing only.
@ -65,6 +62,10 @@ class SyncPoint {
public:
static SyncPoint* GetInstance();
SyncPoint(const SyncPoint&) = delete;
SyncPoint& operator=(const SyncPoint&) = delete;
~SyncPoint();
struct SyncPointPair {
std::string predecessor;
std::string successor;
@ -81,15 +82,14 @@ class SyncPoint {
void LoadDependencyAndMarkers(const std::vector<SyncPointPair>& dependencies,
const std::vector<SyncPointPair>& markers);
// Set up a call back function in sync point.
// The argument to the callback is passed through from
// TEST_SYNC_POINT_CALLBACK(); nullptr if TEST_SYNC_POINT or
// TEST_IDX_SYNC_POINT was used.
void SetCallBack(const std::string point,
std::function<void(void*)> callback);
void SetCallBack(const std::string& point,
const std::function<void(void*)>& callback);
// Clear callback function by point
void ClearCallBack(const std::string point);
void ClearCallBack(const std::string& point);
// Clear all call back functions.
void ClearAllCallBacks();
@ -111,29 +111,20 @@ class SyncPoint {
// TODO: it might be useful to provide a function that blocks until all
// sync points are cleared.
// We want this to be public so we can
// subclass the implementation
struct Data;
private:
bool PredecessorsAllCleared(const std::string& point);
bool DisabledByMarker(const std::string& point, std::thread::id thread_id);
// successor/predecessor map loaded from LoadDependency
std::unordered_map<std::string, std::vector<std::string>> successors_;
std::unordered_map<std::string, std::vector<std::string>> predecessors_;
std::unordered_map<std::string, std::function<void(void*)> > callbacks_;
std::unordered_map<std::string, std::vector<std::string> > markers_;
std::unordered_map<std::string, std::thread::id> marked_thread_id_;
std::mutex mutex_;
std::condition_variable cv_;
// sync points that have been passed through
std::unordered_set<std::string> cleared_points_;
bool enabled_ = false;
int num_callbacks_running_ = 0;
// Singleton
SyncPoint();
Data* impl_;
};
} // namespace rocksdb
// Use TEST_SYNC_POINT to specify sync points inside code base.
// Sync points can have happens-after dependency on other sync points,
// Sync points can have happens-after depedency on other sync points,
// configured at runtime via SyncPoint::LoadDependency. This could be
// utilized to re-produce race conditions between threads.
// See TransactionLogIteratorRace in db_test.cc for an example use case.

@ -0,0 +1,129 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "util/sync_point_impl.h"
#ifndef NDEBUG
namespace rocksdb {
void TestKillRandom(std::string kill_point, int odds,
const std::string& srcfile, int srcline) {
for (auto& p : rocksdb_kill_prefix_blacklist) {
if (kill_point.substr(0, p.length()) == p) {
return;
}
}
assert(odds > 0);
if (odds % 7 == 0) {
// class Random uses multiplier 16807, which is 7^5. If odds are
// multiplier of 7, there might be limited values generated.
odds++;
}
auto* r = Random::GetTLSInstance();
bool crash = r->OneIn(odds);
if (crash) {
port::Crash(srcfile, srcline);
}
}
void SyncPoint::Data::LoadDependency(const std::vector<SyncPointPair>& dependencies) {
std::lock_guard<std::mutex> lock(mutex_);
successors_.clear();
predecessors_.clear();
cleared_points_.clear();
for (const auto& dependency : dependencies) {
successors_[dependency.predecessor].push_back(dependency.successor);
predecessors_[dependency.successor].push_back(dependency.predecessor);
}
cv_.notify_all();
}
void SyncPoint::Data::LoadDependencyAndMarkers(
const std::vector<SyncPointPair>& dependencies,
const std::vector<SyncPointPair>& markers) {
std::lock_guard<std::mutex> lock(mutex_);
successors_.clear();
predecessors_.clear();
cleared_points_.clear();
markers_.clear();
marked_thread_id_.clear();
for (const auto& dependency : dependencies) {
successors_[dependency.predecessor].push_back(dependency.successor);
predecessors_[dependency.successor].push_back(dependency.predecessor);
}
for (const auto& marker : markers) {
successors_[marker.predecessor].push_back(marker.successor);
predecessors_[marker.successor].push_back(marker.predecessor);
markers_[marker.predecessor].push_back(marker.successor);
}
cv_.notify_all();
}
bool SyncPoint::Data::PredecessorsAllCleared(const std::string& point) {
for (const auto& pred : predecessors_[point]) {
if (cleared_points_.count(pred) == 0) {
return false;
}
}
return true;
}
void SyncPoint::Data::ClearCallBack(const std::string& point) {
std::unique_lock<std::mutex> lock(mutex_);
while (num_callbacks_running_ > 0) {
cv_.wait(lock);
}
callbacks_.erase(point);
}
void SyncPoint::Data::ClearAllCallBacks() {
std::unique_lock<std::mutex> lock(mutex_);
while (num_callbacks_running_ > 0) {
cv_.wait(lock);
}
callbacks_.clear();
}
void SyncPoint::Data::Process(const std::string& point, void* cb_arg) {
std::unique_lock<std::mutex> lock(mutex_);
if (!enabled_) {
return;
}
auto thread_id = std::this_thread::get_id();
auto marker_iter = markers_.find(point);
if (marker_iter != markers_.end()) {
for (auto& marked_point : marker_iter->second) {
marked_thread_id_.emplace(marked_point, thread_id);
}
}
if (DisabledByMarker(point, thread_id)) {
return;
}
while (!PredecessorsAllCleared(point)) {
cv_.wait(lock);
if (DisabledByMarker(point, thread_id)) {
return;
}
}
auto callback_pair = callbacks_.find(point);
if (callback_pair != callbacks_.end()) {
num_callbacks_running_++;
mutex_.unlock();
callback_pair->second(cb_arg);
mutex_.lock();
num_callbacks_running_--;
}
cleared_points_.insert(point);
cv_.notify_all();
}
} // rocksdb
#endif

@ -0,0 +1,74 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "util/sync_point.h"
#include <assert.h>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <string>
#include <thread>
#include <unordered_map>
#include <unordered_set>
#include "port/port.h"
#include "util/random.h"
#pragma once
#ifndef NDEBUG
namespace rocksdb {
struct SyncPoint::Data {
// Enable proper deletion by subclasses
virtual ~Data() {}
// successor/predecessor map loaded from LoadDependency
std::unordered_map<std::string, std::vector<std::string>> successors_;
std::unordered_map<std::string, std::vector<std::string>> predecessors_;
std::unordered_map<std::string, std::function<void(void*)> > callbacks_;
std::unordered_map<std::string, std::vector<std::string> > markers_;
std::unordered_map<std::string, std::thread::id> marked_thread_id_;
std::mutex mutex_;
std::condition_variable cv_;
// sync points that have been passed through
std::unordered_set<std::string> cleared_points_;
bool enabled_ = false;
int num_callbacks_running_ = 0;
void LoadDependency(const std::vector<SyncPointPair>& dependencies);
void LoadDependencyAndMarkers(const std::vector<SyncPointPair>& dependencies,
const std::vector<SyncPointPair>& markers);
bool PredecessorsAllCleared(const std::string& point);
void SetCallBack(const std::string& point,
const std::function<void(void*)>& callback) {
std::lock_guard<std::mutex> lock(mutex_);
callbacks_[point] = callback;
}
void ClearCallBack(const std::string& point);
void ClearAllCallBacks();
void EnableProcessing() {
std::lock_guard<std::mutex> lock(mutex_);
enabled_ = true;
}
void DisableProcessing() {
std::lock_guard<std::mutex> lock(mutex_);
enabled_ = false;
}
void ClearTrace() {
std::lock_guard<std::mutex> lock(mutex_);
cleared_points_.clear();
}
bool DisabledByMarker(const std::string& point,
std::thread::id thread_id) {
auto marked_point_iter = marked_thread_id_.find(point);
return marked_point_iter != marked_thread_id_.end() &&
thread_id != marked_point_iter->second;
}
void Process(const std::string& point, void* cb_arg);
};
}
#endif // NDEBUG
Loading…
Cancel
Save