Move ThreadLocal implementation into .cc

Summary: Closes https://github.com/facebook/rocksdb/pull/1829

Differential Revision: D4502314

Pulled By: siying

fbshipit-source-id: f46fac1
main
Dmitri Smirnov 8 years ago committed by Facebook Github Bot
parent 71d2496afc
commit add8b50cc9
  1. 145
      util/thread_local.cc
  2. 135
      util/thread_local.h
  3. 6
      util/thread_local_test.cc

@ -14,8 +14,141 @@
namespace rocksdb { namespace rocksdb {
struct Entry {
Entry() : ptr(nullptr) {}
Entry(const Entry& e) : ptr(e.ptr.load(std::memory_order_relaxed)) {}
std::atomic<void*> ptr;
};
class StaticMeta;
// This is the structure that is declared as "thread_local" storage.
// The vector keep list of atomic pointer for all instances for "current"
// thread. The vector is indexed by an Id that is unique in process and
// associated with one ThreadLocalPtr instance. The Id is assigned by a
// global StaticMeta singleton. So if we instantiated 3 ThreadLocalPtr
// instances, each thread will have a ThreadData with a vector of size 3:
// ---------------------------------------------------
// | | instance 1 | instance 2 | instnace 3 |
// ---------------------------------------------------
// | thread 1 | void* | void* | void* | <- ThreadData
// ---------------------------------------------------
// | thread 2 | void* | void* | void* | <- ThreadData
// ---------------------------------------------------
// | thread 3 | void* | void* | void* | <- ThreadData
// ---------------------------------------------------
struct ThreadData {
explicit ThreadData(ThreadLocalPtr::StaticMeta* _inst) : entries(), inst(_inst) {}
std::vector<Entry> entries;
ThreadData* next;
ThreadData* prev;
ThreadLocalPtr::StaticMeta* inst;
};
class ThreadLocalPtr::StaticMeta {
public:
StaticMeta();
// Return the next available Id
uint32_t GetId();
// Return the next available Id without claiming it
uint32_t PeekId() const;
// Return the given Id back to the free pool. This also triggers
// UnrefHandler for associated pointer value (if not NULL) for all threads.
void ReclaimId(uint32_t id);
// Return the pointer value for the given id for the current thread.
void* Get(uint32_t id) const;
// Reset the pointer value for the given id for the current thread.
void Reset(uint32_t id, void* ptr);
// Atomically swap the supplied ptr and return the previous value
void* Swap(uint32_t id, void* ptr);
// Atomically compare and swap the provided value only if it equals
// to expected value.
bool CompareAndSwap(uint32_t id, void* ptr, void*& expected);
// Reset all thread local data to replacement, and return non-nullptr
// data for all existing threads
void Scrape(uint32_t id, autovector<void*>* ptrs, void* const replacement);
// Update res by applying func on each thread-local value. Holds a lock that
// prevents unref handler from running during this call, but clients must
// still provide external synchronization since the owning thread can
// access the values without internal locking, e.g., via Get() and Reset().
void Fold(uint32_t id, FoldFunc func, void* res);
// Register the UnrefHandler for id
void SetHandler(uint32_t id, UnrefHandler handler);
// protect inst, next_instance_id_, free_instance_ids_, head_,
// ThreadData.entries
//
// Note that here we prefer function static variable instead of the usual
// global static variable. The reason is that c++ destruction order of
// static variables in the reverse order of their construction order.
// However, C++ does not guarantee any construction order when global
// static variables are defined in different files, while the function
// static variables are initialized when their function are first called.
// As a result, the construction order of the function static variables
// can be controlled by properly invoke their first function calls in
// the right order.
//
// For instance, the following function contains a function static
// variable. We place a dummy function call of this inside
// Env::Default() to ensure the construction order of the construction
// order.
static port::Mutex* Mutex();
// Returns the member mutex of the current StaticMeta. In general,
// Mutex() should be used instead of this one. However, in case where
// the static variable inside Instance() goes out of scope, MemberMutex()
// should be used. One example is OnThreadExit() function.
port::Mutex* MemberMutex() { return &mutex_; }
private:
// Get UnrefHandler for id with acquiring mutex
// REQUIRES: mutex locked
UnrefHandler GetHandler(uint32_t id);
// Triggered before a thread terminates
static void OnThreadExit(void* ptr);
// Add current thread's ThreadData to the global chain
// REQUIRES: mutex locked
void AddThreadData(ThreadData* d);
// Remove current thread's ThreadData from the global chain
// REQUIRES: mutex locked
void RemoveThreadData(ThreadData* d);
static ThreadData* GetThreadLocal();
uint32_t next_instance_id_;
// Used to recycle Ids in case ThreadLocalPtr is instantiated and destroyed
// frequently. This also prevents it from blowing up the vector space.
autovector<uint32_t> free_instance_ids_;
// Chain all thread local structure together. This is necessary since
// when one ThreadLocalPtr gets destroyed, we need to loop over each
// thread's version of pointer corresponding to that instance and
// call UnrefHandler for it.
ThreadData head_;
std::unordered_map<uint32_t, UnrefHandler> handler_map_;
// The private mutex. Developers should always use Mutex() instead of
// using this variable directly.
port::Mutex mutex_;
#ifdef ROCKSDB_SUPPORT_THREAD_LOCAL #ifdef ROCKSDB_SUPPORT_THREAD_LOCAL
__thread ThreadLocalPtr::ThreadData* ThreadLocalPtr::StaticMeta::tls_ = nullptr; // Thread local storage
static __thread ThreadData* tls_;
#endif
// Used to make thread exit trigger possible if !defined(OS_MACOSX).
// Otherwise, used to retrieve thread data.
pthread_key_t pthread_key_;
};
#ifdef ROCKSDB_SUPPORT_THREAD_LOCAL
__thread ThreadData* ThreadLocalPtr::StaticMeta::tls_ = nullptr;
#endif #endif
// Windows doesn't support a per-thread destructor with its // Windows doesn't support a per-thread destructor with its
@ -205,7 +338,7 @@ ThreadLocalPtr::StaticMeta::StaticMeta() : next_instance_id_(0), head_(this) {
#endif #endif
} }
void ThreadLocalPtr::StaticMeta::AddThreadData(ThreadLocalPtr::ThreadData* d) { void ThreadLocalPtr::StaticMeta::AddThreadData(ThreadData* d) {
Mutex()->AssertHeld(); Mutex()->AssertHeld();
d->next = &head_; d->next = &head_;
d->prev = head_.prev; d->prev = head_.prev;
@ -214,14 +347,14 @@ void ThreadLocalPtr::StaticMeta::AddThreadData(ThreadLocalPtr::ThreadData* d) {
} }
void ThreadLocalPtr::StaticMeta::RemoveThreadData( void ThreadLocalPtr::StaticMeta::RemoveThreadData(
ThreadLocalPtr::ThreadData* d) { ThreadData* d) {
Mutex()->AssertHeld(); Mutex()->AssertHeld();
d->next->prev = d->prev; d->next->prev = d->prev;
d->prev->next = d->next; d->prev->next = d->next;
d->next = d->prev = d; d->next = d->prev = d;
} }
ThreadLocalPtr::ThreadData* ThreadLocalPtr::StaticMeta::GetThreadLocal() { ThreadData* ThreadLocalPtr::StaticMeta::GetThreadLocal() {
#ifndef ROCKSDB_SUPPORT_THREAD_LOCAL #ifndef ROCKSDB_SUPPORT_THREAD_LOCAL
// Make this local variable name look like a member variable so that we // Make this local variable name look like a member variable so that we
// can share all the code below // can share all the code below
@ -318,6 +451,10 @@ void ThreadLocalPtr::StaticMeta::Fold(uint32_t id, FoldFunc func, void* res) {
} }
} }
uint32_t ThreadLocalPtr::TEST_PeekId() {
return Instance()->PeekId();
}
void ThreadLocalPtr::StaticMeta::SetHandler(uint32_t id, UnrefHandler handler) { void ThreadLocalPtr::StaticMeta::SetHandler(uint32_t id, UnrefHandler handler) {
MutexLock l(Mutex()); MutexLock l(Mutex());
handler_map_[id] = handler; handler_map_[id] = handler;

@ -74,6 +74,10 @@ class ThreadLocalPtr {
// access the values without internal locking, e.g., via Get() and Reset(). // access the values without internal locking, e.g., via Get() and Reset().
void Fold(FoldFunc func, void* res); void Fold(FoldFunc func, void* res);
// Add here for testing
// Return the next available Id without claiming it
static uint32_t TEST_PeekId();
// Initialize the static singletons of the ThreadLocalPtr. // Initialize the static singletons of the ThreadLocalPtr.
// //
// If this function is not called, then the singletons will be // If this function is not called, then the singletons will be
@ -83,138 +87,9 @@ class ThreadLocalPtr {
// initialized will be no-op. // initialized will be no-op.
static void InitSingletons(); static void InitSingletons();
protected:
struct Entry {
Entry() : ptr(nullptr) {}
Entry(const Entry& e) : ptr(e.ptr.load(std::memory_order_relaxed)) {}
std::atomic<void*> ptr;
};
class StaticMeta; class StaticMeta;
// This is the structure that is declared as "thread_local" storage. private:
// The vector keep list of atomic pointer for all instances for "current"
// thread. The vector is indexed by an Id that is unique in process and
// associated with one ThreadLocalPtr instance. The Id is assigned by a
// global StaticMeta singleton. So if we instantiated 3 ThreadLocalPtr
// instances, each thread will have a ThreadData with a vector of size 3:
// ---------------------------------------------------
// | | instance 1 | instance 2 | instnace 3 |
// ---------------------------------------------------
// | thread 1 | void* | void* | void* | <- ThreadData
// ---------------------------------------------------
// | thread 2 | void* | void* | void* | <- ThreadData
// ---------------------------------------------------
// | thread 3 | void* | void* | void* | <- ThreadData
// ---------------------------------------------------
struct ThreadData {
explicit ThreadData(StaticMeta* _inst) : entries(), inst(_inst) {}
std::vector<Entry> entries;
ThreadData* next;
ThreadData* prev;
StaticMeta* inst;
};
class StaticMeta {
public:
StaticMeta();
// Return the next available Id
uint32_t GetId();
// Return the next available Id without claiming it
uint32_t PeekId() const;
// Return the given Id back to the free pool. This also triggers
// UnrefHandler for associated pointer value (if not NULL) for all threads.
void ReclaimId(uint32_t id);
// Return the pointer value for the given id for the current thread.
void* Get(uint32_t id) const;
// Reset the pointer value for the given id for the current thread.
void Reset(uint32_t id, void* ptr);
// Atomically swap the supplied ptr and return the previous value
void* Swap(uint32_t id, void* ptr);
// Atomically compare and swap the provided value only if it equals
// to expected value.
bool CompareAndSwap(uint32_t id, void* ptr, void*& expected);
// Reset all thread local data to replacement, and return non-nullptr
// data for all existing threads
void Scrape(uint32_t id, autovector<void*>* ptrs, void* const replacement);
// Update res by applying func on each thread-local value. Holds a lock that
// prevents unref handler from running during this call, but clients must
// still provide external synchronization since the owning thread can
// access the values without internal locking, e.g., via Get() and Reset().
void Fold(uint32_t id, FoldFunc func, void* res);
// Register the UnrefHandler for id
void SetHandler(uint32_t id, UnrefHandler handler);
// protect inst, next_instance_id_, free_instance_ids_, head_,
// ThreadData.entries
//
// Note that here we prefer function static variable instead of the usual
// global static variable. The reason is that c++ destruction order of
// static variables in the reverse order of their construction order.
// However, C++ does not guarantee any construction order when global
// static variables are defined in different files, while the function
// static variables are initialized when their function are first called.
// As a result, the construction order of the function static variables
// can be controlled by properly invoke their first function calls in
// the right order.
//
// For instance, the following function contains a function static
// variable. We place a dummy function call of this inside
// Env::Default() to ensure the construction order of the construction
// order.
static port::Mutex* Mutex();
// Returns the member mutex of the current StaticMeta. In general,
// Mutex() should be used instead of this one. However, in case where
// the static variable inside Instance() goes out of scope, MemberMutex()
// should be used. One example is OnThreadExit() function.
port::Mutex* MemberMutex() { return &mutex_; }
private:
// Get UnrefHandler for id with acquiring mutex
// REQUIRES: mutex locked
UnrefHandler GetHandler(uint32_t id);
// Triggered before a thread terminates
static void OnThreadExit(void* ptr);
// Add current thread's ThreadData to the global chain
// REQUIRES: mutex locked
void AddThreadData(ThreadData* d);
// Remove current thread's ThreadData from the global chain
// REQUIRES: mutex locked
void RemoveThreadData(ThreadData* d);
static ThreadData* GetThreadLocal();
uint32_t next_instance_id_;
// Used to recycle Ids in case ThreadLocalPtr is instantiated and destroyed
// frequently. This also prevents it from blowing up the vector space.
autovector<uint32_t> free_instance_ids_;
// Chain all thread local structure together. This is necessary since
// when one ThreadLocalPtr gets destroyed, we need to loop over each
// thread's version of pointer corresponding to that instance and
// call UnrefHandler for it.
ThreadData head_;
std::unordered_map<uint32_t, UnrefHandler> handler_map_;
// The private mutex. Developers should always use Mutex() instead of
// using this variable directly.
port::Mutex mutex_;
#ifdef ROCKSDB_SUPPORT_THREAD_LOCAL
// Thread local storage
static __thread ThreadData* tls_;
#endif
// Used to make thread exit trigger possible if !defined(OS_MACOSX).
// Otherwise, used to retrieve thread data.
pthread_key_t pthread_key_;
};
static StaticMeta* Instance(); static StaticMeta* Instance();

@ -51,8 +51,10 @@ struct Params {
}; };
class IDChecker : public ThreadLocalPtr { class IDChecker : public ThreadLocalPtr {
public: public:
static uint32_t PeekId() { return Instance()->PeekId(); } static uint32_t PeekId() {
return TEST_PeekId();
}
}; };
} // anonymous namespace } // anonymous namespace

Loading…
Cancel
Save