CoreLocalArray class

Summary:
Moved the core-local array logic out of ConcurrentArena and into a separate class, CoreLocalArray, because I want to reuse it for core-local stats.
Closes https://github.com/facebook/rocksdb/pull/2256

Differential Revision: D5011518

Pulled By: ajkr

fbshipit-source-id: a75a7b8f7b7a42fd6273489ada405f14c6be196a
Branch: main
Author: Andrew Kryczka (committed by Facebook Github Bot)
Commit: cda5fde2d9 (parent 93949667cc)
Changed files:
  1. util/concurrent_arena.cc (25 changed lines)
  2. util/concurrent_arena.h (25 changed lines)
  3. util/core_local.h (84 changed lines)

--- a/util/concurrent_arena.cc
+++ b/util/concurrent_arena.cc
@@ -18,35 +18,24 @@
 namespace rocksdb {
 
 #ifdef ROCKSDB_SUPPORT_THREAD_LOCAL
-__thread uint32_t ConcurrentArena::tls_cpuid = 0;
+__thread size_t ConcurrentArena::tls_cpuid = 0;
 #endif
 
 ConcurrentArena::ConcurrentArena(size_t block_size, size_t huge_page_size)
-    : shard_block_size_(block_size / 8), arena_(block_size, huge_page_size) {
-  // find a power of two >= num_cpus and >= 8
-  auto num_cpus = std::thread::hardware_concurrency();
-  index_mask_ = 7;
-  while (index_mask_ + 1 < num_cpus) {
-    index_mask_ = index_mask_ * 2 + 1;
-  }
-  shards_.reset(new Shard[index_mask_ + 1]);
+    : shard_block_size_(block_size / 8),
+      shards_(),
+      arena_(block_size, huge_page_size) {
   Fixup();
 }
 
 ConcurrentArena::Shard* ConcurrentArena::Repick() {
-  int cpuid = port::PhysicalCoreID();
-  if (UNLIKELY(cpuid < 0)) {
-    // cpu id unavailable, just pick randomly
-    cpuid =
-        Random::GetTLSInstance()->Uniform(static_cast<int>(index_mask_) + 1);
-  }
+  auto shard_and_index = shards_.AccessElementAndIndex();
 #ifdef ROCKSDB_SUPPORT_THREAD_LOCAL
   // even if we are cpu 0, use a non-zero tls_cpuid so we can tell we
   // have repicked
-  tls_cpuid = cpuid | (static_cast<int>(index_mask_) + 1);
+  tls_cpuid = shard_and_index.second | shards_.Size();
 #endif
-  return &shards_[cpuid & index_mask_];
+  return shard_and_index.first;
 }
 
 }  // namespace rocksdb
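
A small illustrative sketch (not part of the commit) of why this encoding works: CoreLocalArray sizes itself to a power of two that is at least 8, so OR-ing Size() into the stored value guarantees a non-zero tls_cpuid (zero is reserved to mean "never repicked"), while masking with Size() - 1, as AllocateImpl() does in the header below, still recovers the shard index.

#include <cassert>
#include <cstddef>

int main() {
  const size_t size = 8;  // stands in for shards_.Size(); always a power of two >= 8
  for (size_t core_idx = 0; core_idx < size; ++core_idx) {
    size_t tls_cpuid = core_idx | size;            // what Repick() stores
    assert(tls_cpuid != 0);                        // zero stays reserved for "never repicked"
    assert((tls_cpuid & (size - 1)) == core_idx);  // the mask recovers the shard index
  }
  return 0;
}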

--- a/util/concurrent_arena.h
+++ b/util/concurrent_arena.h
@@ -16,6 +16,7 @@
 #include "port/likely.h"
 #include "util/allocator.h"
 #include "util/arena.h"
+#include "util/core_local.h"
 #include "util/mutexlock.h"
 #include "util/thread_local.h"
@@ -65,9 +66,7 @@ class ConcurrentArena : public Allocator {
   size_t ApproximateMemoryUsage() const {
     std::unique_lock<SpinMutex> lock(arena_mutex_, std::defer_lock);
-    if (index_mask_ != 0) {
-      lock.lock();
-    }
+    lock.lock();
     return arena_.ApproximateMemoryUsage() - ShardAllocatedAndUnused();
   }
@@ -97,18 +96,16 @@ class ConcurrentArena : public Allocator {
   };
 
 #ifdef ROCKSDB_SUPPORT_THREAD_LOCAL
-  static __thread uint32_t tls_cpuid;
+  static __thread size_t tls_cpuid;
 #else
-  enum ZeroFirstEnum : uint32_t { tls_cpuid = 0 };
+  enum ZeroFirstEnum : size_t { tls_cpuid = 0 };
 #endif
 
   char padding0[56] ROCKSDB_FIELD_UNUSED;
 
   size_t shard_block_size_;
 
-  // shards_[i & index_mask_] is valid
-  size_t index_mask_;
-  std::unique_ptr<Shard[]> shards_;
+  CoreLocalArray<Shard> shards_;
 
   Arena arena_;
   mutable SpinMutex arena_mutex_;
@@ -122,15 +119,16 @@ class ConcurrentArena : public Allocator {
   size_t ShardAllocatedAndUnused() const {
     size_t total = 0;
-    for (size_t i = 0; i <= index_mask_; ++i) {
-      total += shards_[i].allocated_and_unused_.load(std::memory_order_relaxed);
+    for (size_t i = 0; i < shards_.Size(); ++i) {
+      total += shards_.AccessAtCore(i)->allocated_and_unused_.load(
+          std::memory_order_relaxed);
     }
     return total;
   }
 
   template <typename Func>
   char* AllocateImpl(size_t bytes, bool force_arena, const Func& func) {
-    uint32_t cpu;
+    size_t cpu;
 
     // Go directly to the arena if the allocation is too large, or if
     // we've never needed to Repick() and the arena mutex is available
@@ -139,7 +137,8 @@ class ConcurrentArena : public Allocator {
     std::unique_lock<SpinMutex> arena_lock(arena_mutex_, std::defer_lock);
     if (bytes > shard_block_size_ / 4 || force_arena ||
         ((cpu = tls_cpuid) == 0 &&
-         !shards_[0].allocated_and_unused_.load(std::memory_order_relaxed) &&
+         !shards_.AccessAtCore(0)->allocated_and_unused_.load(
+             std::memory_order_relaxed) &&
          arena_lock.try_lock())) {
       if (!arena_lock.owns_lock()) {
         arena_lock.lock();
@@ -150,7 +149,7 @@ class ConcurrentArena : public Allocator {
     }
 
     // pick a shard from which to allocate
-    Shard* s = &shards_[cpu & index_mask_];
+    Shard* s = shards_.AccessAtCore(cpu & (shards_.Size() - 1));
     if (!s->mutex.try_lock()) {
       s = Repick();
       s->mutex.lock();
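
For reference, the fast-path condition above can be read as the following standalone predicate. This is purely an illustrative restatement; the function and its parameter names are hypothetical, not part of the commit.

#include <cstddef>

// Hypothetical restatement of the check in ConcurrentArena::AllocateImpl():
// when does an allocation bypass the per-core shards and go straight to the
// shared arena?
bool GoesDirectlyToArena(size_t bytes, size_t shard_block_size, bool force_arena,
                         size_t tls_cpuid, size_t shard0_free_bytes,
                         bool arena_mutex_available) {
  return bytes > shard_block_size / 4 ||  // request too large for a shard block
         force_arena ||                   // caller explicitly wants the arena
         // this thread has never repicked a shard (tls_cpuid == 0), shard 0 has
         // no cached free space, and the arena mutex can be taken immediately
         (tls_cpuid == 0 && shard0_free_bytes == 0 && arena_mutex_available);
}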

--- /dev/null
+++ b/util/core_local.h
@@ -0,0 +1,84 @@
+//  Copyright (c) 2017-present, Facebook, Inc.  All rights reserved.
+//  This source code is licensed under the BSD-style license found in the
+//  LICENSE file in the root directory of this source tree. An additional grant
+//  of patent rights can be found in the PATENTS file in the same directory.
+//  This source code is also licensed under the GPLv2 license found in the
+//  COPYING file in the root directory of this source tree.
+
+#pragma once
+
+#include "port/likely.h"
+#include "port/port.h"
+#include "util/random.h"
+
+#include <cassert>
+#include <cstddef>
+#include <memory>
+#include <thread>
+#include <utility>
+#include <vector>
+
+namespace rocksdb {
+
+// An array of core-local values. Ideally the value type, T, is cache aligned
+// to prevent false sharing.
+template <typename T>
+class CoreLocalArray {
+ public:
+  CoreLocalArray();
+
+  size_t Size() const;
+  // returns pointer to the element corresponding to the core that the thread
+  // currently runs on.
+  T* Access() const;
+  // same as above, but also returns the core index, which the client can cache
+  // to reduce how often core ID needs to be retrieved. Only do this if some
+  // inaccuracy is tolerable, as the thread may migrate to a different core.
+  std::pair<T*, size_t> AccessElementAndIndex() const;
+  // returns pointer to element for the specified core index. This can be used,
+  // e.g., for aggregation, or if the client caches core index.
+  T* AccessAtCore(size_t core_idx) const;
+
+ private:
+  std::unique_ptr<T[]> data_;
+  size_t size_shift_;
+};
+
+template <typename T>
+CoreLocalArray<T>::CoreLocalArray() {
+  unsigned int num_cpus = std::thread::hardware_concurrency();
+  // find a power of two >= num_cpus and >= 8
+  size_shift_ = 3;
+  while (1u << size_shift_ < num_cpus) {
+    ++size_shift_;
+  }
+  data_.reset(new T[1 << size_shift_]);
+}
+
+template <typename T>
+size_t CoreLocalArray<T>::Size() const {
+  return 1u << size_shift_;
+}
+
+template <typename T>
+T* CoreLocalArray<T>::Access() const {
+  return AccessElementAndIndex().first;
+}
+
+template <typename T>
+std::pair<T*, size_t> CoreLocalArray<T>::AccessElementAndIndex() const {
+  int cpuid = port::PhysicalCoreID();
+  size_t core_idx;
+  if (UNLIKELY(cpuid < 0)) {
+    // cpu id unavailable, just pick randomly
+    core_idx = Random::GetTLSInstance()->Uniform(1 << size_shift_);
+  } else {
+    core_idx = static_cast<size_t>(cpuid & ((1 << size_shift_) - 1));
+  }
+  return {AccessAtCore(core_idx), core_idx};
+}
+
+template <typename T>
+T* CoreLocalArray<T>::AccessAtCore(size_t core_idx) const {
+  assert(core_idx < 1u << size_shift_);
+  return &data_[core_idx];
+}
+
+}  // namespace rocksdb
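
Since the stated motivation is reusing this class for core-local stats, here is a hedged sketch of how such a counter might be built on top of CoreLocalArray. The CoreLocalCounter and CounterSlot names, and the alignas(64) padding, are assumptions for illustration only; they are not part of this change.

#include <atomic>
#include <cstddef>
#include <cstdint>

#include "util/core_local.h"

namespace rocksdb {

// Hypothetical per-core counter slot. alignas(64) is a stand-in for
// cache-line alignment, so slots for different cores avoid false sharing.
struct alignas(64) CounterSlot {
  std::atomic<uint64_t> value{0};
};

// Hypothetical counter that keeps one slot per core.
class CoreLocalCounter {
 public:
  // Uncontended in the common case: each thread bumps the slot for the core
  // it is currently running on.
  void Add(uint64_t n) {
    slots_.Access()->value.fetch_add(n, std::memory_order_relaxed);
  }

  // Aggregates across all slots; the result may be slightly stale, but no
  // locks are taken.
  uint64_t Total() const {
    uint64_t total = 0;
    for (size_t i = 0; i < slots_.Size(); ++i) {
      total += slots_.AccessAtCore(i)->value.load(std::memory_order_relaxed);
    }
    return total;
  }

 private:
  CoreLocalArray<CounterSlot> slots_;
};

}  // namespace rocksdb

The design trades a little read-time accuracy (a thread may migrate between cores mid-update) for write paths that rarely touch a cache line shared with another core.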