You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 
rocksdb/util/concurrent_arena.h

215 lines
7.4 KiB

// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#pragma once
#include <atomic>
#include <memory>
#include <utility>
#include "port/likely.h"
#include "util/allocator.h"
#include "util/arena.h"
#include "util/core_local.h"
#include "util/mutexlock.h"
#include "util/thread_local.h"
// Only generate field unused warning for padding array, or build under
// GCC 4.8.1 will fail.
#ifdef __clang__
#define ROCKSDB_FIELD_UNUSED __attribute__((__unused__))
#else
#define ROCKSDB_FIELD_UNUSED
#endif // __clang__
namespace rocksdb {
class Logger;
// ConcurrentArena wraps an Arena. It makes it thread safe using a fast
// inlined spinlock, and adds small per-core allocation caches to avoid
// contention for small allocations. To avoid any memory waste from the
// per-core shards, they are kept small, they are lazily instantiated
// only if ConcurrentArena actually notices concurrent use, and they
// adjust their size so that there is no fragmentation waste when the
// shard blocks are allocated from the underlying main arena.
class ConcurrentArena : public Allocator {
public:
// block_size and huge_page_size are the same as for Arena (and are
// in fact just passed to the constructor of arena_. The core-local
// shards compute their shard_block_size as a fraction of block_size
// that varies according to the hardware concurrency level.
explicit ConcurrentArena(size_t block_size = Arena::kMinBlockSize,
AllocTracker* tracker = nullptr,
size_t huge_page_size = 0);
char* Allocate(size_t bytes) override {
return AllocateImpl(bytes, false /*force_arena*/,
[=]() { return arena_.Allocate(bytes); });
}
char* AllocateAligned(size_t bytes, size_t huge_page_size = 0,
Logger* logger = nullptr) override {
size_t rounded_up = ((bytes - 1) | (sizeof(void*) - 1)) + 1;
assert(rounded_up >= bytes && rounded_up < bytes + sizeof(void*) &&
(rounded_up % sizeof(void*)) == 0);
return AllocateImpl(rounded_up, huge_page_size != 0 /*force_arena*/, [=]() {
return arena_.AllocateAligned(rounded_up, huge_page_size, logger);
});
}
size_t ApproximateMemoryUsage() const {
std::unique_lock<SpinMutex> lock(arena_mutex_, std::defer_lock);
lock.lock();
return arena_.ApproximateMemoryUsage() - ShardAllocatedAndUnused();
}
size_t MemoryAllocatedBytes() const {
return memory_allocated_bytes_.load(std::memory_order_relaxed);
}
size_t AllocatedAndUnused() const {
return arena_allocated_and_unused_.load(std::memory_order_relaxed) +
ShardAllocatedAndUnused();
}
size_t IrregularBlockNum() const {
return irregular_block_num_.load(std::memory_order_relaxed);
}
size_t BlockSize() const override { return arena_.BlockSize(); }
private:
struct Shard {
char padding[40] ROCKSDB_FIELD_UNUSED;
mutable SpinMutex mutex;
char* free_begin_;
std::atomic<size_t> allocated_and_unused_;
Shard() : allocated_and_unused_(0) {}
};
#ifdef ROCKSDB_SUPPORT_THREAD_LOCAL
static __thread size_t tls_cpuid;
#else
enum ZeroFirstEnum : size_t { tls_cpuid = 0 };
#endif
char padding0[56] ROCKSDB_FIELD_UNUSED;
size_t shard_block_size_;
CoreLocalArray<Shard> shards_;
Arena arena_;
mutable SpinMutex arena_mutex_;
std::atomic<size_t> arena_allocated_and_unused_;
std::atomic<size_t> memory_allocated_bytes_;
std::atomic<size_t> irregular_block_num_;
char padding1[56] ROCKSDB_FIELD_UNUSED;
Shard* Repick();
size_t ShardAllocatedAndUnused() const {
size_t total = 0;
for (size_t i = 0; i < shards_.Size(); ++i) {
total += shards_.AccessAtCore(i)->allocated_and_unused_.load(
std::memory_order_relaxed);
}
return total;
}
template <typename Func>
char* AllocateImpl(size_t bytes, bool force_arena, const Func& func) {
size_t cpu;
// Go directly to the arena if the allocation is too large, or if
// we've never needed to Repick() and the arena mutex is available
// with no waiting. This keeps the fragmentation penalty of
// concurrency zero unless it might actually confer an advantage.
std::unique_lock<SpinMutex> arena_lock(arena_mutex_, std::defer_lock);
if (bytes > shard_block_size_ / 4 || force_arena ||
((cpu = tls_cpuid) == 0 &&
!shards_.AccessAtCore(0)->allocated_and_unused_.load(
std::memory_order_relaxed) &&
arena_lock.try_lock())) {
if (!arena_lock.owns_lock()) {
arena_lock.lock();
}
auto rv = func();
Fixup();
return rv;
}
// pick a shard from which to allocate
Shard* s = shards_.AccessAtCore(cpu & (shards_.Size() - 1));
if (!s->mutex.try_lock()) {
s = Repick();
s->mutex.lock();
}
std::unique_lock<SpinMutex> lock(s->mutex, std::adopt_lock);
size_t avail = s->allocated_and_unused_.load(std::memory_order_relaxed);
if (avail < bytes) {
// reload
std::lock_guard<SpinMutex> reload_lock(arena_mutex_);
// If the arena's current block is within a factor of 2 of the right
// size, we adjust our request to avoid arena waste.
auto exact = arena_allocated_and_unused_.load(std::memory_order_relaxed);
assert(exact == arena_.AllocatedAndUnused());
if (exact >= bytes && arena_.IsInInlineBlock()) {
// If we haven't exhausted arena's inline block yet, allocate from arena
// directly. This ensures that we'll do the first few small allocations
// without allocating any blocks.
// In particular this prevents empty memtables from using
// disproportionately large amount of memory: a memtable allocates on
// the order of 1 KB of memory when created; we wouldn't want to
// allocate a full arena block (typically a few megabytes) for that,
// especially if there are thousands of empty memtables.
auto rv = func();
Fixup();
return rv;
}
avail = exact >= shard_block_size_ / 2 && exact < shard_block_size_ * 2
? exact
: shard_block_size_;
s->free_begin_ = arena_.AllocateAligned(avail);
Fixup();
}
s->allocated_and_unused_.store(avail - bytes, std::memory_order_relaxed);
char* rv;
if ((bytes % sizeof(void*)) == 0) {
// aligned allocation from the beginning
rv = s->free_begin_;
s->free_begin_ += bytes;
} else {
// unaligned from the end
rv = s->free_begin_ + avail - bytes;
}
return rv;
}
void Fixup() {
arena_allocated_and_unused_.store(arena_.AllocatedAndUnused(),
std::memory_order_relaxed);
memory_allocated_bytes_.store(arena_.MemoryAllocatedBytes(),
std::memory_order_relaxed);
irregular_block_num_.store(arena_.IrregularBlockNum(),
std::memory_order_relaxed);
}
ConcurrentArena(const ConcurrentArena&) = delete;
ConcurrentArena& operator=(const ConcurrentArena&) = delete;
};
} // namespace rocksdb