From cda5fde2d96624df38afc7f02b6b3e699648c62d Mon Sep 17 00:00:00 2001
From: Andrew Kryczka
Date: Wed, 10 May 2017 18:16:31 -0700
Subject: [PATCH] CoreLocalArray class

Summary:
Moved the logic for core-local array out of ConcurrentArena and into a
separate class because I want to reuse it for core-local stats.
Closes https://github.com/facebook/rocksdb/pull/2256

Differential Revision: D5011518

Pulled By: ajkr

fbshipit-source-id: a75a7b8f7b7a42fd6273489ada405f14c6be196a
---
 util/concurrent_arena.cc | 25 ++++--------
 util/concurrent_arena.h  | 25 ++++++------
 util/core_local.h        | 84 ++++++++++++++++++++++++++++++++++++++++
 3 files changed, 103 insertions(+), 31 deletions(-)
 create mode 100644 util/core_local.h

diff --git a/util/concurrent_arena.cc b/util/concurrent_arena.cc
index bd96fde82..7e4310080 100644
--- a/util/concurrent_arena.cc
+++ b/util/concurrent_arena.cc
@@ -18,35 +18,24 @@
 namespace rocksdb {
 
 #ifdef ROCKSDB_SUPPORT_THREAD_LOCAL
-__thread uint32_t ConcurrentArena::tls_cpuid = 0;
+__thread size_t ConcurrentArena::tls_cpuid = 0;
 #endif
 
 ConcurrentArena::ConcurrentArena(size_t block_size, size_t huge_page_size)
-    : shard_block_size_(block_size / 8), arena_(block_size, huge_page_size) {
-  // find a power of two >= num_cpus and >= 8
-  auto num_cpus = std::thread::hardware_concurrency();
-  index_mask_ = 7;
-  while (index_mask_ + 1 < num_cpus) {
-    index_mask_ = index_mask_ * 2 + 1;
-  }
-
-  shards_.reset(new Shard[index_mask_ + 1]);
+    : shard_block_size_(block_size / 8),
+      shards_(),
+      arena_(block_size, huge_page_size) {
   Fixup();
 }
 
 ConcurrentArena::Shard* ConcurrentArena::Repick() {
-  int cpuid = port::PhysicalCoreID();
-  if (UNLIKELY(cpuid < 0)) {
-    // cpu id unavailable, just pick randomly
-    cpuid =
-        Random::GetTLSInstance()->Uniform(static_cast<int>(index_mask_) + 1);
-  }
+  auto shard_and_index = shards_.AccessElementAndIndex();
 #ifdef ROCKSDB_SUPPORT_THREAD_LOCAL
   // even if we are cpu 0, use a non-zero tls_cpuid so we can tell we
   // have repicked
-  tls_cpuid = cpuid | (static_cast<uint32_t>(index_mask_) + 1);
+  tls_cpuid = shard_and_index.second | shards_.Size();
 #endif
-  return &shards_[cpuid & index_mask_];
+  return shard_and_index.first;
 }
 
 }  // namespace rocksdb
diff --git a/util/concurrent_arena.h b/util/concurrent_arena.h
index 2a3d221e4..d2d448dc8 100644
--- a/util/concurrent_arena.h
+++ b/util/concurrent_arena.h
@@ -16,6 +16,7 @@
 #include "port/likely.h"
 #include "util/allocator.h"
 #include "util/arena.h"
+#include "util/core_local.h"
 #include "util/mutexlock.h"
 #include "util/thread_local.h"
 
@@ -65,9 +66,7 @@ class ConcurrentArena : public Allocator {
 
   size_t ApproximateMemoryUsage() const {
     std::unique_lock<SpinMutex> lock(arena_mutex_, std::defer_lock);
-    if (index_mask_ != 0) {
-      lock.lock();
-    }
+    lock.lock();
     return arena_.ApproximateMemoryUsage() - ShardAllocatedAndUnused();
   }
 
@@ -97,18 +96,16 @@ class ConcurrentArena : public Allocator {
   };
 
 #ifdef ROCKSDB_SUPPORT_THREAD_LOCAL
-  static __thread uint32_t tls_cpuid;
+  static __thread size_t tls_cpuid;
 #else
-  enum ZeroFirstEnum : uint32_t { tls_cpuid = 0 };
+  enum ZeroFirstEnum : size_t { tls_cpuid = 0 };
 #endif
 
   char padding0[56] ROCKSDB_FIELD_UNUSED;
 
   size_t shard_block_size_;
 
-  // shards_[i & index_mask_] is valid
-  size_t index_mask_;
-  std::unique_ptr<Shard[]> shards_;
+  CoreLocalArray<Shard> shards_;
 
   Arena arena_;
   mutable SpinMutex arena_mutex_;
@@ -122,15 +119,16 @@ class ConcurrentArena : public Allocator {
 
   size_t ShardAllocatedAndUnused() const {
     size_t total = 0;
-    for (size_t i = 0; i <= index_mask_; ++i) {
-      total +=
-          shards_[i].allocated_and_unused_.load(std::memory_order_relaxed);
+    for (size_t i = 0; i < shards_.Size(); ++i) {
+      total += shards_.AccessAtCore(i)->allocated_and_unused_.load(
+          std::memory_order_relaxed);
     }
     return total;
   }
 
   template <typename Func>
   char* AllocateImpl(size_t bytes, bool force_arena, const Func& func) {
-    uint32_t cpu;
+    size_t cpu;
 
     // Go directly to the arena if the allocation is too large, or if
     // we've never needed to Repick() and the arena mutex is available
@@ -139,7 +137,8 @@
     std::unique_lock<SpinMutex> arena_lock(arena_mutex_, std::defer_lock);
     if (bytes > shard_block_size_ / 4 || force_arena ||
         ((cpu = tls_cpuid) == 0 &&
-         !shards_[0].allocated_and_unused_.load(std::memory_order_relaxed) &&
+         !shards_.AccessAtCore(0)->allocated_and_unused_.load(
+             std::memory_order_relaxed) &&
          arena_lock.try_lock())) {
       if (!arena_lock.owns_lock()) {
         arena_lock.lock();
@@ -150,7 +149,7 @@
     }
 
     // pick a shard from which to allocate
-    Shard* s = &shards_[cpu & index_mask_];
+    Shard* s = shards_.AccessAtCore(cpu & (shards_.Size() - 1));
     if (!s->mutex.try_lock()) {
       s = Repick();
       s->mutex.lock();
diff --git a/util/core_local.h b/util/core_local.h
new file mode 100644
index 000000000..806584d46
--- /dev/null
+++ b/util/core_local.h
@@ -0,0 +1,84 @@
+// Copyright (c) 2017-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under the BSD-style license found in the
+// LICENSE file in the root directory of this source tree. An additional grant
+// of patent rights can be found in the PATENTS file in the same directory.
+// This source code is also licensed under the GPLv2 license found in the
+// COPYING file in the root directory of this source tree.
+
+#pragma once
+
+#include "port/likely.h"
+#include "port/port.h"
+#include "util/random.h"
+
+#include <cstddef>
+#include <thread>
+#include <utility>
+
+namespace rocksdb {
+
+// An array of core-local values. Ideally the value type, T, is cache aligned to
+// prevent false sharing.
+template <typename T>
+class CoreLocalArray {
+ public:
+  CoreLocalArray();
+
+  size_t Size() const;
+  // returns pointer to the element corresponding to the core that the thread
+  // currently runs on.
+  T* Access() const;
+  // same as above, but also returns the core index, which the client can cache
+  // to reduce how often core ID needs to be retrieved. Only do this if some
+  // inaccuracy is tolerable, as the thread may migrate to a different core.
+  std::pair<T*, size_t> AccessElementAndIndex() const;
+  // returns pointer to element for the specified core index. This can be used,
+  // e.g., for aggregation, or if the client caches core index.
+  T* AccessAtCore(size_t core_idx) const;
+
+ private:
+  std::unique_ptr<T[]> data_;
+  size_t size_shift_;
+};
+
+template <typename T>
+CoreLocalArray<T>::CoreLocalArray() {
+  unsigned int num_cpus = std::thread::hardware_concurrency();
+  // find a power of two >= num_cpus and >= 8
+  size_shift_ = 3;
+  while (1u << size_shift_ < num_cpus) {
+    ++size_shift_;
+  }
+  data_.reset(new T[1 << size_shift_]);
+}
+
+template <typename T>
+size_t CoreLocalArray<T>::Size() const {
+  return 1u << size_shift_;
+}
+
+template <typename T>
+T* CoreLocalArray<T>::Access() const {
+  return AccessElementAndIndex().first;
+}
+
+template <typename T>
+std::pair<T*, size_t> CoreLocalArray<T>::AccessElementAndIndex() const {
+  int cpuid = port::PhysicalCoreID();
+  size_t core_idx;
+  if (UNLIKELY(cpuid < 0)) {
+    // cpu id unavailable, just pick randomly
+    core_idx = Random::GetTLSInstance()->Uniform(1 << size_shift_);
+  } else {
+    core_idx = static_cast<size_t>(cpuid & ((1 << size_shift_) - 1));
+  }
+  return {AccessAtCore(core_idx), core_idx};
+}
+
+template <typename T>
+T* CoreLocalArray<T>::AccessAtCore(size_t core_idx) const {
+  assert(core_idx < 1u << size_shift_);
+  return &data_[core_idx];
+}
+
+}  // namespace rocksdb
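
--
Usage sketch (not part of the patch): the summary says CoreLocalArray will be
reused for core-local stats. Below is a minimal illustration of that pattern;
CounterCell and CoreLocalCounter are hypothetical names invented here, while
CoreLocalArray and its Access()/AccessAtCore()/Size() methods are the API
added above.

#include <atomic>
#include <cstddef>
#include <cstdint>

#include "util/core_local.h"

namespace rocksdb {

// Hypothetical element type. Padded toward a 64-byte cache line (assuming an
// 8-byte atomic) because CoreLocalArray's comment recommends cache-aligning T
// to prevent false sharing between adjacent elements.
struct CounterCell {
  std::atomic<uint64_t> value{0};
  char padding[56];
};

// Hypothetical core-local counter. Writers touch only their own core's cell;
// a reader sums across all cells.
class CoreLocalCounter {
 public:
  void Add(uint64_t n) {
    // Access() resolves to the element for the core this thread currently
    // runs on (or a random element if the core ID is unavailable), so
    // concurrent writers rarely contend on the same cache line.
    counters_.Access()->value.fetch_add(n, std::memory_order_relaxed);
  }

  uint64_t Total() const {
    // AccessAtCore() lets an aggregator visit every element by index.
    uint64_t total = 0;
    for (size_t i = 0; i < counters_.Size(); ++i) {
      total +=
          counters_.AccessAtCore(i)->value.load(std::memory_order_relaxed);
    }
    return total;
  }

 private:
  CoreLocalArray<CounterCell> counters_;
};

}  // namespace rocksdb

The aggregation loop mirrors ShardAllocatedAndUnused() in the patch: updates
stay core-local and use relaxed ordering, so Total() is cheap but only
approximately current while writers are active.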