revert perf_context and io_stats to __thread

Summary:
https://github.com/facebook/rocksdb/pull/2380 introduces a regression by replacing __thread with ThreadLocalPtr. Revert the thread local implementation back.
Closes https://github.com/facebook/rocksdb/pull/2485

Differential Revision: D5308050

Pulled By: lightmark

fbshipit-source-id: 2676e9c22edf76e8133d3f4c50e2711e11a95480
main
Aaron Gao 8 years ago committed by Facebook Github Bot
parent 5c97a7c066
commit 0025a36409
  1. 1
      include/rocksdb/perf_context.h
  2. 16
      monitoring/iostats_context.cc
  3. 23
      monitoring/perf_context.cc
  4. 5
      monitoring/perf_context_imp.h
  5. 65
      util/thread_local_test.cc

@ -154,7 +154,6 @@ struct PerfContext {
// if defined(NPERF_CONTEXT), then the pointer is not thread-local // if defined(NPERF_CONTEXT), then the pointer is not thread-local
PerfContext* get_perf_context(); PerfContext* get_perf_context();
} }
#endif #endif

@ -10,16 +10,16 @@
namespace rocksdb { namespace rocksdb {
ThreadLocalPtr iostats_context([](void* ptr) { #ifdef ROCKSDB_SUPPORT_THREAD_LOCAL
auto* p = static_cast<IOStatsContext*>(ptr); __thread IOStatsContext iostats_context;
delete p; #endif
});
IOStatsContext* get_iostats_context() { IOStatsContext* get_iostats_context() {
if (iostats_context.Get() == nullptr) { #ifdef ROCKSDB_SUPPORT_THREAD_LOCAL
iostats_context.Reset(static_cast<void*>(new IOStatsContext())); return &iostats_context;
} #else
return static_cast<IOStatsContext*>(iostats_context.Get()); return nullptr;
#endif
} }
void IOStatsContext::Reset() { void IOStatsContext::Reset() {

@ -8,27 +8,28 @@
#include <sstream> #include <sstream>
#include "monitoring/perf_context_imp.h" #include "monitoring/perf_context_imp.h"
#include "util/thread_local.h"
namespace rocksdb { namespace rocksdb {
#ifdef NPERF_CONTEXT #if defined(NPERF_CONTEXT) || !defined(ROCKSDB_SUPPORT_THREAD_LOCAL)
PerfContext perf_context; PerfContext perf_context;
#else #else
ThreadLocalPtr perf_context([](void* ptr) { #if defined(OS_SOLARIS)
auto* p = static_cast<PerfContext*>(ptr); __thread PerfContext perf_context_;
delete p; #else
}); __thread PerfContext perf_context;
#endif
#endif #endif
PerfContext* get_perf_context() { PerfContext* get_perf_context() {
#ifdef NPERF_CONTEXT #if defined(NPERF_CONTEXT) || !defined(ROCKSDB_SUPPORT_THREAD_LOCAL)
return &perf_context; return &perf_context;
#else #else
if (perf_context.Get() == nullptr) { #if defined(OS_SOLARIS)
perf_context.Reset(static_cast<void*>(new PerfContext())); return &perf_context_;
} #else
return static_cast<PerfContext*>(perf_context.Get()); return &perf_context;
#endif
#endif #endif
} }

@ -44,7 +44,10 @@ namespace rocksdb {
#define PERF_TIMER_MEASURE(metric) perf_step_timer_##metric.Measure(); #define PERF_TIMER_MEASURE(metric) perf_step_timer_##metric.Measure();
// Increase metric value // Increase metric value
#define PERF_COUNTER_ADD(metric, value) get_perf_context()->metric += value; #define PERF_COUNTER_ADD(metric, value) \
if (perf_level >= PerfLevel::kEnableCount) { \
get_perf_context()->metric += value; \
}
#endif #endif

@ -67,54 +67,53 @@ TEST_F(ThreadLocalTest, UniqueIdTest) {
port::Mutex mu; port::Mutex mu;
port::CondVar cv(&mu); port::CondVar cv(&mu);
// perf_context and iostats_context take 2 ids ASSERT_EQ(IDChecker::PeekId(), 0u);
ASSERT_EQ(IDChecker::PeekId(), 2u);
// New ThreadLocal instance bumps id by 1 // New ThreadLocal instance bumps id by 1
{ {
// Id used 2 // Id used 0
Params p1(&mu, &cv, nullptr, 1u); Params p1(&mu, &cv, nullptr, 1u);
ASSERT_EQ(IDChecker::PeekId(), 3u); ASSERT_EQ(IDChecker::PeekId(), 1u);
// Id used 3 // Id used 1
Params p2(&mu, &cv, nullptr, 1u); Params p2(&mu, &cv, nullptr, 1u);
ASSERT_EQ(IDChecker::PeekId(), 4u); ASSERT_EQ(IDChecker::PeekId(), 2u);
// Id used 4 // Id used 2
Params p3(&mu, &cv, nullptr, 1u); Params p3(&mu, &cv, nullptr, 1u);
ASSERT_EQ(IDChecker::PeekId(), 5u); ASSERT_EQ(IDChecker::PeekId(), 3u);
// Id used 5 // Id used 3
Params p4(&mu, &cv, nullptr, 1u); Params p4(&mu, &cv, nullptr, 1u);
ASSERT_EQ(IDChecker::PeekId(), 6u); ASSERT_EQ(IDChecker::PeekId(), 4u);
} }
// id 5, 4, 3, 2 are in the free queue in order // id 3, 2, 1, 0 are in the free queue in order
ASSERT_EQ(IDChecker::PeekId(), 2u); ASSERT_EQ(IDChecker::PeekId(), 0u);
// pick up 2 // pick up 0
Params p1(&mu, &cv, nullptr, 1u); Params p1(&mu, &cv, nullptr, 1u);
ASSERT_EQ(IDChecker::PeekId(), 3u); ASSERT_EQ(IDChecker::PeekId(), 1u);
// pick up 3 // pick up 1
Params* p2 = new Params(&mu, &cv, nullptr, 1u); Params* p2 = new Params(&mu, &cv, nullptr, 1u);
ASSERT_EQ(IDChecker::PeekId(), 4u); ASSERT_EQ(IDChecker::PeekId(), 2u);
// pick up 4 // pick up 2
Params p3(&mu, &cv, nullptr, 1u); Params p3(&mu, &cv, nullptr, 1u);
ASSERT_EQ(IDChecker::PeekId(), 5u); ASSERT_EQ(IDChecker::PeekId(), 3u);
// return up 3 // return up 1
delete p2; delete p2;
ASSERT_EQ(IDChecker::PeekId(), 1u);
// Now we have 3, 1 in queue
// pick up 1
Params p4(&mu, &cv, nullptr, 1u);
ASSERT_EQ(IDChecker::PeekId(), 3u); ASSERT_EQ(IDChecker::PeekId(), 3u);
// Now we have 4, 2 in queue
// pick up 3 // pick up 3
Params p4(&mu, &cv, nullptr, 1u);
ASSERT_EQ(IDChecker::PeekId(), 5u);
// pick up 5
Params p5(&mu, &cv, nullptr, 1u); Params p5(&mu, &cv, nullptr, 1u);
// next new id // next new id
ASSERT_EQ(IDChecker::PeekId(), 6u); ASSERT_EQ(IDChecker::PeekId(), 4u);
// After exit, id sequence in queue: // After exit, id sequence in queue:
// 5, 4, 3, 2(, 1, 0) // 3, 1, 2, 0
} }
#endif // __clang_analyzer__ #endif // __clang_analyzer__
TEST_F(ThreadLocalTest, SequentialReadWriteTest) { TEST_F(ThreadLocalTest, SequentialReadWriteTest) {
// global id list carries over 5, 4, 3, 2 // global id list carries over 3, 1, 2, 0
ASSERT_EQ(IDChecker::PeekId(), 2u); ASSERT_EQ(IDChecker::PeekId(), 0u);
port::Mutex mu; port::Mutex mu;
port::CondVar cv(&mu); port::CondVar cv(&mu);
@ -144,7 +143,7 @@ TEST_F(ThreadLocalTest, SequentialReadWriteTest) {
}; };
for (int iter = 0; iter < 1024; ++iter) { for (int iter = 0; iter < 1024; ++iter) {
ASSERT_EQ(IDChecker::PeekId(), 3u); ASSERT_EQ(IDChecker::PeekId(), 1u);
// Another new thread, read/write should not see value from previous thread // Another new thread, read/write should not see value from previous thread
env_->StartThread(func, static_cast<void*>(&p)); env_->StartThread(func, static_cast<void*>(&p));
mu.Lock(); mu.Lock();
@ -152,13 +151,13 @@ TEST_F(ThreadLocalTest, SequentialReadWriteTest) {
cv.Wait(); cv.Wait();
} }
mu.Unlock(); mu.Unlock();
ASSERT_EQ(IDChecker::PeekId(), 3u); ASSERT_EQ(IDChecker::PeekId(), 1u);
} }
} }
TEST_F(ThreadLocalTest, ConcurrentReadWriteTest) { TEST_F(ThreadLocalTest, ConcurrentReadWriteTest) {
// global id list carries over 5, 4, 3, 2 // global id list carries over 3, 1, 2, 0
ASSERT_EQ(IDChecker::PeekId(), 2u); ASSERT_EQ(IDChecker::PeekId(), 0u);
ThreadLocalPtr tls2; ThreadLocalPtr tls2;
port::Mutex mu1; port::Mutex mu1;
@ -239,11 +238,11 @@ TEST_F(ThreadLocalTest, ConcurrentReadWriteTest) {
} }
mu2.Unlock(); mu2.Unlock();
ASSERT_EQ(IDChecker::PeekId(), 5u); ASSERT_EQ(IDChecker::PeekId(), 3u);
} }
TEST_F(ThreadLocalTest, Unref) { TEST_F(ThreadLocalTest, Unref) {
ASSERT_EQ(IDChecker::PeekId(), 2u); ASSERT_EQ(IDChecker::PeekId(), 0u);
auto unref = [](void* ptr) { auto unref = [](void* ptr) {
auto& p = *static_cast<Params*>(ptr); auto& p = *static_cast<Params*>(ptr);

Loading…
Cancel
Save