From e5fa4944fcba1df6ac414858ca36d64cefa15d0a Mon Sep 17 00:00:00 2001 From: Lei Jin Date: Fri, 7 Mar 2014 14:43:22 -0800 Subject: [PATCH] use CAS when returning SuperVersion to ThreadLocal Summary: Add a check at the end of GetImpl to release SuperVersion if it becomes obsolete. Also do Scrape() inside InstallSuperVersion so it happens more frequent. Test Plan: make all check running asan_check now Reviewers: igor, haobo, sdong, dhruba Reviewed By: haobo CC: leveldb Differential Revision: https://reviews.facebook.net/D16641 --- db/db_impl.cc | 53 ++++++++++++++++++++++++++++-------- db/db_impl.h | 13 +++++++++ include/rocksdb/statistics.h | 6 ++-- util/thread_local.cc | 25 ++++++++++++++--- util/thread_local.h | 20 ++++++++++---- util/thread_local_test.cc | 20 ++++++++++++-- 6 files changed, 112 insertions(+), 25 deletions(-) diff --git a/db/db_impl.cc b/db/db_impl.cc index b8942c42a..99cfc6e6c 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -63,6 +63,10 @@ namespace rocksdb { +int DBImpl::SuperVersion::dummy = 0; +void* const DBImpl::SuperVersion::kSVInUse = &DBImpl::SuperVersion::dummy; +void* const DBImpl::SuperVersion::kSVObsolete = nullptr; + void DumpLeveldbBuildVersion(Logger * log); // Information kept for every waiting writer @@ -1327,10 +1331,6 @@ Status DBImpl::FlushMemTableToOutputFile(bool* madeProgress, if (s.ok()) { InstallSuperVersion(deletion_state); - // Reset SuperVersions cached in thread local storage - if (options_.allow_thread_local) { - ResetThreadLocalSuperVersions(&deletion_state); - } if (madeProgress) { *madeProgress = 1; } @@ -2874,6 +2874,10 @@ void DBImpl::InstallSuperVersion(DeletionState& deletion_state) { SuperVersion* old_superversion = InstallSuperVersion(new_superversion); deletion_state.new_superversion = nullptr; deletion_state.superversions_to_free.push_back(old_superversion); + // Reset SuperVersions cached in thread local storage + if (options_.allow_thread_local) { + ResetThreadLocalSuperVersions(&deletion_state); + } } DBImpl::SuperVersion* DBImpl::InstallSuperVersion( @@ -2896,9 +2900,12 @@ DBImpl::SuperVersion* DBImpl::InstallSuperVersion( void DBImpl::ResetThreadLocalSuperVersions(DeletionState* deletion_state) { mutex_.AssertHeld(); autovector sv_ptrs; - local_sv_->Scrape(&sv_ptrs); + local_sv_->Scrape(&sv_ptrs, SuperVersion::kSVObsolete); for (auto ptr : sv_ptrs) { assert(ptr); + if (ptr == SuperVersion::kSVInUse) { + continue; + } auto sv = static_cast(ptr); if (static_cast(ptr)->Unref()) { sv->Cleanup(); @@ -2936,10 +2943,17 @@ Status DBImpl::GetImpl(const ReadOptions& options, // is being used while a new SuperVersion is installed, the cached // SuperVersion can become stale. It will eventually get refreshed either // on the next GetImpl() call or next SuperVersion installation. - sv = static_cast(local_sv_->Swap(nullptr)); - if (!sv || sv->version_number != - super_version_number_.load(std::memory_order_relaxed)) { - RecordTick(options_.statistics.get(), NUMBER_SUPERVERSION_UPDATES); + void* ptr = local_sv_->Swap(SuperVersion::kSVInUse); + // Invariant: + // (1) Scrape (always) installs kSVObsolete in ThreadLocal storage + // (2) the Swap above (always) installs kSVInUse, ThreadLocal storage + // should only keep kSVInUse during a GetImpl. + assert(ptr != SuperVersion::kSVInUse); + sv = static_cast(ptr); + if (sv == SuperVersion::kSVObsolete || + sv->version_number != super_version_number_.load( + std::memory_order_relaxed)) { + RecordTick(options_.statistics.get(), NUMBER_SUPERVERSION_ACQUIRES); SuperVersion* sv_to_delete = nullptr; if (sv && sv->Unref()) { @@ -2999,11 +3013,25 @@ Status DBImpl::GetImpl(const ReadOptions& options, mutex_.Unlock(); } - // Release SuperVersion + bool unref_sv = true; if (LIKELY(options_.allow_thread_local)) { // Put the SuperVersion back - local_sv_->Reset(static_cast(sv)); - } else { + void* expected = SuperVersion::kSVInUse; + if (local_sv_->CompareAndSwap(static_cast(sv), expected)) { + // When we see kSVInUse in the ThreadLocal, we are sure ThreadLocal + // storage has not been altered and no Scrape has happend. The + // SuperVersion is still current. + unref_sv = false; + } else { + // ThreadLocal scrape happened in the process of this GetImpl call (after + // thread local Swap() at the beginning and before CompareAndSwap()). + // This means the SuperVersion it holds is obsolete. + assert(expected == SuperVersion::kSVObsolete); + } + } + + if (unref_sv) { + // Release SuperVersion bool delete_sv = false; if (sv->Unref()) { mutex_.Lock(); @@ -3014,6 +3042,7 @@ Status DBImpl::GetImpl(const ReadOptions& options, if (delete_sv) { delete sv; } + RecordTick(options_.statistics.get(), NUMBER_SUPERVERSION_RELEASES); } // Note, tickers are atomic now - no lock protection needed any more. diff --git a/db/db_impl.h b/db/db_impl.h index c27dac849..2877317cb 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -174,9 +174,22 @@ class DBImpl : public DB { void Cleanup(); void Init(MemTable* new_mem, MemTableListVersion* new_imm, Version* new_current); + + // The value of dummy is not actually used. kSVInUse takes its address as a + // mark in the thread local storage to indicate the SuperVersion is in use + // by thread. This way, the value of kSVInUse is guaranteed to have no + // conflict with SuperVersion object address and portable on different + // platform. + static int dummy; + static void* const kSVInUse; + static void* const kSVObsolete; }; static void SuperVersionUnrefHandle(void* ptr) { + // UnrefHandle is called when a thread exists or a ThreadLocalPtr gets + // destroyed. When former happens, the thread shouldn't see kSVInUse. + // When latter happens, we are in ~DBImpl(), no get should happen as well. + assert(ptr != SuperVersion::kSVInUse); DBImpl::SuperVersion* sv = static_cast(ptr); if (sv->Unref()) { sv->db->mutex_.Lock(); diff --git a/include/rocksdb/statistics.h b/include/rocksdb/statistics.h index 82cc7133f..d076f6f76 100644 --- a/include/rocksdb/statistics.h +++ b/include/rocksdb/statistics.h @@ -122,7 +122,8 @@ enum Tickers { // Number of table's properties loaded directly from file, without creating // table reader object. NUMBER_DIRECT_LOAD_TABLE_PROPERTIES, - NUMBER_SUPERVERSION_UPDATES, + NUMBER_SUPERVERSION_ACQUIRES, + NUMBER_SUPERVERSION_RELEASES, TICKER_ENUM_MAX }; @@ -178,7 +179,8 @@ const std::vector> TickersNameMap = { {COMPACT_WRITE_BYTES, "rocksdb.compact.write.bytes"}, {NUMBER_DIRECT_LOAD_TABLE_PROPERTIES, "rocksdb.number.direct.load.table.properties"}, - {NUMBER_SUPERVERSION_UPDATES, "rocksdb.number.superversion_updates"}, + {NUMBER_SUPERVERSION_ACQUIRES, "rocksdb.number.superversion_acquires"}, + {NUMBER_SUPERVERSION_RELEASES, "rocksdb.number.superversion_releases"}, }; /** diff --git a/util/thread_local.cc b/util/thread_local.cc index 2e5d3618b..1b4220b8f 100644 --- a/util/thread_local.cc +++ b/util/thread_local.cc @@ -138,12 +138,25 @@ void* ThreadLocalPtr::StaticMeta::Swap(uint32_t id, void* ptr) { return tls->entries[id].ptr.exchange(ptr, std::memory_order_relaxed); } -void ThreadLocalPtr::StaticMeta::Scrape(uint32_t id, autovector* ptrs) { +bool ThreadLocalPtr::StaticMeta::CompareAndSwap(uint32_t id, void* ptr, + void*& expected) { + auto* tls = GetThreadLocal(); + if (UNLIKELY(id >= tls->entries.size())) { + // Need mutex to protect entries access within ReclaimId + MutexLock l(&mutex_); + tls->entries.resize(id + 1); + } + return tls->entries[id].ptr.compare_exchange_strong(expected, ptr, + std::memory_order_relaxed, std::memory_order_relaxed); +} + +void ThreadLocalPtr::StaticMeta::Scrape(uint32_t id, autovector* ptrs, + void* const replacement) { MutexLock l(&mutex_); for (ThreadData* t = head_.next; t != &head_; t = t->next) { if (id < t->entries.size()) { void* ptr = - t->entries[id].ptr.exchange(nullptr, std::memory_order_relaxed); + t->entries[id].ptr.exchange(replacement, std::memory_order_relaxed); if (ptr != nullptr) { ptrs->push_back(ptr); } @@ -225,8 +238,12 @@ void* ThreadLocalPtr::Swap(void* ptr) { return StaticMeta::Instance()->Swap(id_, ptr); } -void ThreadLocalPtr::Scrape(autovector* ptrs) { - StaticMeta::Instance()->Scrape(id_, ptrs); +bool ThreadLocalPtr::CompareAndSwap(void* ptr, void*& expected) { + return StaticMeta::Instance()->CompareAndSwap(id_, ptr, expected); +} + +void ThreadLocalPtr::Scrape(autovector* ptrs, void* const replacement) { + StaticMeta::Instance()->Scrape(id_, ptrs, replacement); } } // namespace rocksdb diff --git a/util/thread_local.h b/util/thread_local.h index d6fc5f085..d1434e3e5 100644 --- a/util/thread_local.h +++ b/util/thread_local.h @@ -47,9 +47,15 @@ class ThreadLocalPtr { // Atomically swap the supplied ptr and return the previous value void* Swap(void* ptr); - // Return non-nullptr data for all existing threads and reset them - // to nullptr - void Scrape(autovector* ptrs); + // Atomically compare the stored value with expected. Set the new + // pointer value to thread local only if the comparision is true. + // Otherwise, expected returns the stored value. + // Return true on success, false on failure + bool CompareAndSwap(void* ptr, void*& expected); + + // Reset all thread local data to replacement, and return non-nullptr + // data for all existing threads + void Scrape(autovector* ptrs, void* const replacement); protected: struct Entry { @@ -99,8 +105,12 @@ class ThreadLocalPtr { void Reset(uint32_t id, void* ptr); // Atomically swap the supplied ptr and return the previous value void* Swap(uint32_t id, void* ptr); - // Return data for all existing threads and return them to nullptr - void Scrape(uint32_t id, autovector* ptrs); + // Atomically compare and swap the provided value only if it equals + // to expected value. + bool CompareAndSwap(uint32_t id, void* ptr, void*& expected); + // Reset all thread local data to replacement, and return non-nullptr + // data for all existing threads + void Scrape(uint32_t id, autovector* ptrs, void* const replacement); // Register the UnrefHandler for id void SetHandler(uint32_t id, UnrefHandler handler); diff --git a/util/thread_local_test.cc b/util/thread_local_test.cc index 96e35d959..b1a865d5e 100644 --- a/util/thread_local_test.cc +++ b/util/thread_local_test.cc @@ -435,8 +435,8 @@ TEST(ThreadLocalTest, Scrape) { // Scrape all thread local data. No unref at thread // exit or ThreadLocalPtr destruction autovector ptrs; - p.tls1.Scrape(&ptrs); - p.tls2->Scrape(&ptrs); + p.tls1.Scrape(&ptrs, nullptr); + p.tls2->Scrape(&ptrs, nullptr); delete p.tls2; // Signal to exit mu.Lock(); @@ -449,6 +449,22 @@ TEST(ThreadLocalTest, Scrape) { } } +TEST(ThreadLocalTest, CompareAndSwap) { + ThreadLocalPtr tls; + ASSERT_TRUE(tls.Swap(reinterpret_cast(1)) == nullptr); + void* expected = reinterpret_cast(1); + // Swap in 2 + ASSERT_TRUE(tls.CompareAndSwap(reinterpret_cast(2), expected)); + expected = reinterpret_cast(100); + // Fail Swap, still 2 + ASSERT_TRUE(!tls.CompareAndSwap(reinterpret_cast(2), expected)); + ASSERT_EQ(expected, reinterpret_cast(2)); + // Swap in 3 + expected = reinterpret_cast(2); + ASSERT_TRUE(tls.CompareAndSwap(reinterpret_cast(3), expected)); + ASSERT_EQ(tls.Get(), reinterpret_cast(3)); +} + } // namespace rocksdb int main(int argc, char** argv) {