Add a new feature to enforce a sync point only active on a thread

Summary: Add markers to sync points. A marked sync point will only be active when it is on the same thread as the marker sync point.

Test Plan: Write a unit test to validate.

Reviewers: sdong, IslamAbdelRahman, andrewkr

Reviewed By: andrewkr

Subscribers: andrewkr, dhruba

Differential Revision: https://reviews.facebook.net/D60375
main
omegaga 8 years ago
parent b954847fca
commit cd4178a015
  1. 21
      db/db_compaction_test.cc
  2. 48
      db/db_test2.cc
  3. 58
      util/sync_point.cc
  4. 18
      util/sync_point.h

@ -949,19 +949,21 @@ TEST_P(DBCompactionTestWithParam, ManualCompactionPartial) {
"DBImpl::BackgroundCompaction:NonTrivial",
[&](void* arg) { non_trivial_move++; });
bool first = true;
bool second = true;
// Purpose of dependencies:
// 4 -> 1: ensure the order of two non-trivial compactions
// 5 -> 2 and 5 -> 3: ensure we do a check before two non-trivial compactions
// are installed
rocksdb::SyncPoint::GetInstance()->LoadDependency(
{{"DBCompaction::ManualPartial:4", "DBCompaction::ManualPartial:1"},
{"DBCompaction::ManualPartial:2", "DBCompaction::ManualPartial:3"},
{"DBCompaction::ManualPartial:5",
"DBImpl::BackgroundCompaction:NonTrivial:AfterRun"}});
{"DBCompaction::ManualPartial:5", "DBCompaction::ManualPartial:2"},
{"DBCompaction::ManualPartial:5", "DBCompaction::ManualPartial:3"}});
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::BackgroundCompaction:NonTrivial:AfterRun", [&](void* arg) {
if (first) {
TEST_SYNC_POINT("DBCompaction::ManualPartial:4");
first = false;
TEST_SYNC_POINT("DBCompaction::ManualPartial:4");
TEST_SYNC_POINT("DBCompaction::ManualPartial:3");
} else if (second) {
} else { // second non-trivial compaction
TEST_SYNC_POINT("DBCompaction::ManualPartial:2");
}
});
@ -1038,6 +1040,7 @@ TEST_P(DBCompactionTestWithParam, ManualCompactionPartial) {
std::string end_string = Key(199);
Slice begin(begin_string);
Slice end(end_string);
// First non-trivial compaction is triggered
ASSERT_OK(db_->CompactRange(compact_options, &begin, &end));
});
@ -1061,15 +1064,17 @@ TEST_P(DBCompactionTestWithParam, ManualCompactionPartial) {
values[i] = RandomString(&rnd, value_size);
ASSERT_OK(Put(Key(i), values[i]));
}
// Second non-trivial compaction is triggered
ASSERT_OK(Flush());
// 3 files in L0
// Before two non-trivial compactions are installed, there are 3 files in L0
ASSERT_EQ("3,0,0,0,0,1,2", FilesPerLevel(0));
TEST_SYNC_POINT("DBCompaction::ManualPartial:5");
// 1 file in L6, 1 file in L1
dbfull()->TEST_WaitForFlushMemTable();
dbfull()->TEST_WaitForCompact();
// After two non-trivial compactions are installed, there is 1 file in L6, and
// 1 file in L1
ASSERT_EQ("0,1,0,0,0,0,1", FilesPerLevel(0));
threads.join();

@ -6,7 +6,9 @@
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include <atomic>
#include <cstdlib>
#include <functional>
#include "db/db_test_util.h"
#include "port/stack_trace.h"
@ -1438,6 +1440,52 @@ TEST_F(DBTest2, PersistentCache) {
}
}
}
namespace {
void CountSyncPoint() {
TEST_SYNC_POINT_CALLBACK("DBTest2::MarkedPoint", nullptr /* arg */);
}
} // namespace
TEST_F(DBTest2, SyncPointMarker) {
std::atomic<int> sync_point_called(0);
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"DBTest2::MarkedPoint",
[&](void* arg) { sync_point_called.fetch_add(1); });
// The first dependency enforces Marker can be loaded before MarkedPoint.
// The second checks that thread 1's MarkedPoint should be disabled here.
// Execution order:
// | Thread 1 | Thread 2 |
// | | Marker |
// | MarkedPoint | |
// | Thread1First | |
// | | MarkedPoint |
rocksdb::SyncPoint::GetInstance()->LoadDependencyAndMarkers(
{{"DBTest2::SyncPointMarker:Thread1First", "DBTest2::MarkedPoint"}},
{{"DBTest2::SyncPointMarker:Marker", "DBTest2::MarkedPoint"}});
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
std::function<void()> func1 = [&]() {
CountSyncPoint();
TEST_SYNC_POINT("DBTest2::SyncPointMarker:Thread1First");
};
std::function<void()> func2 = [&]() {
TEST_SYNC_POINT("DBTest2::SyncPointMarker:Marker");
CountSyncPoint();
};
auto thread1 = std::thread(func1);
auto thread2 = std::thread(func2);
thread1.join();
thread2.join();
// Callback is only executed once
ASSERT_EQ(sync_point_called.load(), 1);
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
}
#endif
} // namespace rocksdb

@ -4,6 +4,7 @@
// of patent rights can be found in the PATENTS file in the same directory.
#include "util/sync_point.h"
#include <thread>
#include "port/port.h"
#include "util/random.h"
@ -39,7 +40,7 @@ SyncPoint* SyncPoint::GetInstance() {
return &sync_point;
}
void SyncPoint::LoadDependency(const std::vector<Dependency>& dependencies) {
void SyncPoint::LoadDependency(const std::vector<SyncPointPair>& dependencies) {
std::unique_lock<std::mutex> lock(mutex_);
successors_.clear();
predecessors_.clear();
@ -51,6 +52,27 @@ void SyncPoint::LoadDependency(const std::vector<Dependency>& dependencies) {
cv_.notify_all();
}
void SyncPoint::LoadDependencyAndMarkers(
const std::vector<SyncPointPair>& dependencies,
const std::vector<SyncPointPair>& markers) {
std::unique_lock<std::mutex> lock(mutex_);
successors_.clear();
predecessors_.clear();
cleared_points_.clear();
markers_.clear();
marked_thread_id_.clear();
for (const auto& dependency : dependencies) {
successors_[dependency.predecessor].push_back(dependency.successor);
predecessors_[dependency.successor].push_back(dependency.predecessor);
}
for (const auto& marker : markers) {
successors_[marker.predecessor].push_back(marker.successor);
predecessors_[marker.successor].push_back(marker.predecessor);
markers_[marker.predecessor].push_back(marker.successor);
}
cv_.notify_all();
}
bool SyncPoint::PredecessorsAllCleared(const std::string& point) {
for (const auto& pred : predecessors_[point]) {
if (cleared_points_.count(pred) == 0) {
@ -89,10 +111,38 @@ void SyncPoint::ClearTrace() {
cleared_points_.clear();
}
bool SyncPoint::DisabledByMarker(const std::string& point,
std::thread::id thread_id) {
auto marked_point_iter = marked_thread_id_.find(point);
return marked_point_iter != marked_thread_id_.end() &&
thread_id != marked_point_iter->second;
}
void SyncPoint::Process(const std::string& point, void* cb_arg) {
std::unique_lock<std::mutex> lock(mutex_);
auto thread_id = std::this_thread::get_id();
auto marker_iter = markers_.find(point);
if (marker_iter != markers_.end()) {
for (auto marked_point : marker_iter->second) {
marked_thread_id_.insert(std::make_pair(marked_point, thread_id));
}
}
if (DisabledByMarker(point, thread_id)) {
return;
}
if (!enabled_) return;
if (!enabled_) {
return;
}
while (!PredecessorsAllCleared(point)) {
cv_.wait(lock);
if (DisabledByMarker(point, thread_id)) {
return;
}
}
auto callback_pair = callbacks_.find(point);
if (callback_pair != callbacks_.end()) {
@ -104,10 +154,6 @@ void SyncPoint::Process(const std::string& point, void* cb_arg) {
cv_.notify_all();
}
while (!PredecessorsAllCleared(point)) {
cv_.wait(lock);
}
cleared_points_.insert(point);
cv_.notify_all();
}

@ -8,8 +8,9 @@
#include <condition_variable>
#include <mutex>
#include <string>
#include <unordered_set>
#include <thread>
#include <unordered_map>
#include <unordered_set>
#include <vector>
// This is only set from db_stress.cc and for testing only.
@ -62,13 +63,21 @@ class SyncPoint {
public:
static SyncPoint* GetInstance();
struct Dependency {
struct SyncPointPair {
std::string predecessor;
std::string successor;
};
// call once at the beginning of a test to setup the dependency between
// sync points
void LoadDependency(const std::vector<Dependency>& dependencies);
void LoadDependency(const std::vector<SyncPointPair>& dependencies);
// call once at the beginning of a test to setup the dependency between
// sync points and setup markers indicating the successor is only enabled
// when it is processed on the same thread as the predecessor.
// When adding a marker, it implicitly adds a dependency for the marker pair.
void LoadDependencyAndMarkers(const std::vector<SyncPointPair>& dependencies,
const std::vector<SyncPointPair>& markers);
// Set up a call back function in sync point.
void SetCallBack(const std::string point,
@ -95,11 +104,14 @@ class SyncPoint {
private:
bool PredecessorsAllCleared(const std::string& point);
bool DisabledByMarker(const std::string& point, std::thread::id thread_id);
// successor/predecessor map loaded from LoadDependency
std::unordered_map<std::string, std::vector<std::string>> successors_;
std::unordered_map<std::string, std::vector<std::string>> predecessors_;
std::unordered_map<std::string, std::function<void(void*)> > callbacks_;
std::unordered_map<std::string, std::vector<std::string> > markers_;
std::unordered_map<std::string, std::thread::id> marked_thread_id_;
std::mutex mutex_;
std::condition_variable cv_;

Loading…
Cancel
Save