Make MemoryAllocator into a Customizable class (#8980)

Summary:
- Make MemoryAllocator and its implementations into Customizable classes.
- Added a "DefaultMemoryAllocator", which uses new and delete.
- Added a "CountedMemoryAllocator", which counts the number of allocations and frees.
- Updated the existing tests to use these new allocators.
- Changed the memkind allocator test into a generic test that can exercise the various allocators.
- Added tests for creating all of the allocators.
- Added tests that create and verify the JemallocNodumpAllocator using its options.
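
For context, a minimal sketch of what the new Customizable plumbing enables. The API names are from this PR; the block-cache wiring mirrors the new memory_allocator_test.cc and is illustrative only:

    #include "rocksdb/cache.h"
    #include "rocksdb/convenience.h"
    #include "rocksdb/memory_allocator.h"
    using namespace ROCKSDB_NAMESPACE;

    // Look up an allocator by name instead of hard-coding a concrete class.
    std::shared_ptr<MemoryAllocator> allocator;
    Status s = MemoryAllocator::CreateFromString(
        ConfigOptions(), "DefaultMemoryAllocator", &allocator);
    assert(s.ok());
    // The allocator plugs into a block cache exactly as before.
    std::shared_ptr<Cache> cache =
        NewLRUCache(1024 * 1024, 6, false, false, allocator);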

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8980

Reviewed By: zhichao-cao

Differential Revision: D32990403

Pulled By: mrambacher

fbshipit-source-id: 6fdfe8218c10dd8dfef34344a08201be1fa95c76
Branch: main
Author: mrambacher, committed by Facebook GitHub Bot
parent 9828b6d5fd
commit 423538a816
Changed files (number of changed lines in parentheses):
  1. CMakeLists.txt (3)
  2. Makefile (2)
  3. TARGETS (6)
  4. db/db_impl/db_impl_write.cc (4)
  5. db/memtable_list.cc (8)
  6. db/memtable_list.h (6)
  7. db_stress_tool/db_stress_listener.cc (4)
  8. include/rocksdb/memory_allocator.h (18)
  9. memory/jemalloc_nodump_allocator.cc (277)
 10. memory/jemalloc_nodump_allocator.h (46)
 11. memory/memkind_kmem_allocator.cc (15)
 12. memory/memkind_kmem_allocator.h (27)
 13. memory/memkind_kmem_allocator_test.cc (102)
 14. memory/memory_allocator.cc (91)
 15. memory/memory_allocator_test.cc (236)
 16. options/customizable_test.cc (31)
 17. src.mk (3)
 18. table/block_fetcher_test.cc (24)
 19. table/table_test.cc (33)
 20. utilities/memory_allocators.h (104)

--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -746,6 +746,7 @@ set(SOURCES
   memory/concurrent_arena.cc
   memory/jemalloc_nodump_allocator.cc
   memory/memkind_kmem_allocator.cc
+  memory/memory_allocator.cc
   memtable/alloc_tracker.cc
   memtable/hash_linklist_rep.cc
   memtable/hash_skiplist_rep.cc
@@ -1270,7 +1271,7 @@ if(WITH_TESTS)
   logging/env_logger_test.cc
   logging/event_logger_test.cc
   memory/arena_test.cc
-  memory/memkind_kmem_allocator_test.cc
+  memory/memory_allocator_test.cc
   memtable/inlineskiplist_test.cc
   memtable/skiplist_test.cc
   memtable/write_buffer_manager_test.cc

--- a/Makefile
+++ b/Makefile
@@ -1361,7 +1361,7 @@ db_repl_stress: $(OBJ_DIR)/tools/db_repl_stress.o $(LIBRARY)
 arena_test: $(OBJ_DIR)/memory/arena_test.o $(TEST_LIBRARY) $(LIBRARY)
 	$(AM_LINK)
-memkind_kmem_allocator_test: memory/memkind_kmem_allocator_test.o $(TEST_LIBRARY) $(LIBRARY)
+memory_allocator_test: memory/memory_allocator_test.o $(TEST_LIBRARY) $(LIBRARY)
 	$(AM_LINK)
 autovector_test: $(OBJ_DIR)/util/autovector_test.o $(TEST_LIBRARY) $(LIBRARY)

--- a/TARGETS
+++ b/TARGETS
@@ -256,6 +256,7 @@ cpp_library(
         "memory/concurrent_arena.cc",
         "memory/jemalloc_nodump_allocator.cc",
         "memory/memkind_kmem_allocator.cc",
+        "memory/memory_allocator.cc",
         "memtable/alloc_tracker.cc",
         "memtable/hash_linklist_rep.cc",
         "memtable/hash_skiplist_rep.cc",
@@ -586,6 +587,7 @@ cpp_library(
        "memory/concurrent_arena.cc",
        "memory/jemalloc_nodump_allocator.cc",
        "memory/memkind_kmem_allocator.cc",
+       "memory/memory_allocator.cc",
        "memtable/alloc_tracker.cc",
        "memtable/hash_linklist_rep.cc",
        "memtable/hash_skiplist_rep.cc",
@@ -1781,8 +1783,8 @@ ROCKS_TESTS = [
         [],
     ],
     [
-        "memkind_kmem_allocator_test",
-        "memory/memkind_kmem_allocator_test.cc",
+        "memory_allocator_test",
+        "memory/memory_allocator_test.cc",
         "parallel",
         [],
         [],

--- a/db/db_impl/db_impl_write.cc
+++ b/db/db_impl/db_impl_write.cc
@@ -1660,8 +1660,8 @@ Status DBImpl::TrimMemtableHistory(WriteContext* context) {
   }
   for (auto& cfd : cfds) {
     autovector<MemTable*> to_delete;
-    bool trimmed = cfd->imm()->TrimHistory(
-        &context->memtables_to_free_, cfd->mem()->MemoryAllocatedBytes());
+    bool trimmed = cfd->imm()->TrimHistory(&context->memtables_to_free_,
+                                           cfd->mem()->MemoryAllocatedBytes());
     if (trimmed) {
       context->superversion_context.NewSuperVersion();
       assert(context->superversion_context.new_superversion.get() != nullptr);

--- a/db/memtable_list.cc
+++ b/db/memtable_list.cc
@@ -597,8 +597,8 @@ size_t MemTableList::ApproximateUnflushedMemTablesMemoryUsage() {
 size_t MemTableList::ApproximateMemoryUsage() { return current_memory_usage_; }
 
 size_t MemTableList::MemoryAllocatedBytesExcludingLast() const {
-  const size_t usage =
-      current_memory_allocted_bytes_excluding_last_.load(std::memory_order_relaxed);
+  const size_t usage = current_memory_allocted_bytes_excluding_last_.load(
+      std::memory_order_relaxed);
   return usage;
 }
@@ -610,8 +610,8 @@ bool MemTableList::HasHistory() const {
 void MemTableList::UpdateCachedValuesFromMemTableListVersion() {
   const size_t total_memtable_size =
       current_->MemoryAllocatedBytesExcludingLast();
-  current_memory_allocted_bytes_excluding_last_.store(total_memtable_size,
-                                                      std::memory_order_relaxed);
+  current_memory_allocted_bytes_excluding_last_.store(
+      total_memtable_size, std::memory_order_relaxed);
   const bool has_history = current_->HasHistory();
   current_has_history_.store(has_history, std::memory_order_relaxed);

--- a/db/memtable_list.h
+++ b/db/memtable_list.h
@@ -287,9 +287,9 @@ class MemTableList {
   // Returns the cached current_has_history_ value.
   bool HasHistory() const;
 
-  // Updates current_memory_allocted_bytes_excluding_last_ and current_has_history_
-  // from MemTableListVersion. Must be called whenever InstallNewVersion is
-  // called.
+  // Updates current_memory_allocted_bytes_excluding_last_ and
+  // current_has_history_ from MemTableListVersion. Must be called whenever
+  // InstallNewVersion is called.
   void UpdateCachedValuesFromMemTableListVersion();
 
   // `usage` is the current size of the mutable Memtable. When

--- a/db_stress_tool/db_stress_listener.cc
+++ b/db_stress_tool/db_stress_listener.cc
@@ -33,8 +33,8 @@ UniqueIdVerifier::UniqueIdVerifier(const std::string& db_name)
   Status st = fs.CreateDirIfMissing(db_name, opts, nullptr);
   if (!st.ok()) {
-    fprintf(stderr, "Failed to create directory %s: %s\n",
-            db_name.c_str(), st.ToString().c_str());
+    fprintf(stderr, "Failed to create directory %s: %s\n", db_name.c_str(),
+            st.ToString().c_str());
     exit(1);
   }

--- a/include/rocksdb/memory_allocator.h
+++ b/include/rocksdb/memory_allocator.h
@@ -5,22 +5,23 @@
 #pragma once
 
-#include "rocksdb/status.h"
-
 #include <memory>
 
+#include "rocksdb/customizable.h"
+#include "rocksdb/status.h"
+
 namespace ROCKSDB_NAMESPACE {
 
 // MemoryAllocator is an interface that a client can implement to supply custom
 // memory allocation and deallocation methods. See rocksdb/cache.h for more
 // information.
 // All methods should be thread-safe.
-class MemoryAllocator {
+class MemoryAllocator : public Customizable {
  public:
-  virtual ~MemoryAllocator() = default;
-
-  // Name of the cache allocator, printed in the log
-  virtual const char* Name() const = 0;
+  static const char* Type() { return "MemoryAllocator"; }
+  static Status CreateFromString(const ConfigOptions& options,
+                                 const std::string& value,
+                                 std::shared_ptr<MemoryAllocator>* result);
 
   // Allocate a block of at least size. Has to be thread-safe.
   virtual void* Allocate(size_t size) = 0;
@@ -34,9 +35,12 @@ class MemoryAllocator {
     // default implementation just returns the allocation size
     return allocation_size;
   }
+
+  std::string GetId() const override { return GenerateIndividualId(); }
 };
 
 struct JemallocAllocatorOptions {
+  static const char* kName() { return "JemallocAllocatorOptions"; }
+
   // Jemalloc tcache cache allocations by size class. For each size class,
   // it caches between 20 (for large size classes) to 200 (for small size
   // classes). To reduce tcache memory usage in case the allocator is accessed
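
Because the allocator is now Customizable, the jemalloc options above can also be supplied through a configuration string. A sketch based on the strings exercised by the new tests (standard Customizable "id=...; key=value" syntax; the bound values are illustrative):

    std::shared_ptr<MemoryAllocator> allocator;
    ConfigOptions config_options;
    // Creates the allocator and sets its options in one call; returns
    // NotSupported when not built with jemalloc >= 5 and MADV_DONTDUMP.
    Status s = MemoryAllocator::CreateFromString(
        config_options,
        "id=JemallocNodumpAllocator; limit_tcache_size=true; "
        "tcache_size_lower_bound=1024; tcache_size_upper_bound=4096",
        &allocator);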

--- a/memory/jemalloc_nodump_allocator.cc
+++ b/memory/jemalloc_nodump_allocator.cc
@@ -10,22 +10,175 @@
 #include "port/likely.h"
 #include "port/port.h"
+#include "rocksdb/convenience.h"
+#include "rocksdb/utilities/customizable_util.h"
+#include "rocksdb/utilities/object_registry.h"
+#include "rocksdb/utilities/options_type.h"
 #include "util/string_util.h"
 
 namespace ROCKSDB_NAMESPACE {
 #ifdef ROCKSDB_JEMALLOC_NODUMP_ALLOCATOR
 std::atomic<extent_alloc_t*> JemallocNodumpAllocator::original_alloc_{nullptr};
+#endif  // ROCKSDB_JEMALLOC_NODUMP_ALLOCATOR
+
+static std::unordered_map<std::string, OptionTypeInfo> jemalloc_type_info = {
+#ifndef ROCKSDB_LITE
+    {"limit_tcache_size",
+     {offsetof(struct JemallocAllocatorOptions, limit_tcache_size),
+      OptionType::kBoolean, OptionVerificationType::kNormal,
+      OptionTypeFlags::kNone}},
+    {"tcache_size_lower_bound",
+     {offsetof(struct JemallocAllocatorOptions, tcache_size_lower_bound),
+      OptionType::kSizeT, OptionVerificationType::kNormal,
+      OptionTypeFlags::kNone}},
+    {"tcache_size_upper_bound",
+     {offsetof(struct JemallocAllocatorOptions, tcache_size_upper_bound),
+      OptionType::kSizeT, OptionVerificationType::kNormal,
+      OptionTypeFlags::kNone}},
+#endif  // ROCKSDB_LITE
+};
+
+bool JemallocNodumpAllocator::IsSupported(std::string* why) {
+#ifndef ROCKSDB_JEMALLOC
+  *why = "Not compiled with ROCKSDB_JEMALLOC";
+  return false;
+#else
+  static const std::string unsupported =
+      "JemallocNodumpAllocator only available with jemalloc version >= 5 "
+      "and MADV_DONTDUMP is available.";
+  if (!HasJemalloc()) {
+    *why = unsupported;
+    return false;
+  }
+#ifndef ROCKSDB_JEMALLOC_NODUMP_ALLOCATOR
+  *why = unsupported;
+  return false;
+#else
+  return true;
+#endif  // ROCKSDB_JEMALLOC_NODUMP_ALLOCATOR
+#endif  // ROCKSDB_JEMALLOC
+}
+
 JemallocNodumpAllocator::JemallocNodumpAllocator(
-    JemallocAllocatorOptions& options,
-    std::unique_ptr<extent_hooks_t>&& arena_hooks, unsigned arena_index)
+    JemallocAllocatorOptions& options)
     : options_(options),
-      arena_hooks_(std::move(arena_hooks)),
-      arena_index_(arena_index),
-      tcache_(&JemallocNodumpAllocator::DestroyThreadSpecificCache) {}
+#ifdef ROCKSDB_JEMALLOC_NODUMP_ALLOCATOR
+      tcache_(&JemallocNodumpAllocator::DestroyThreadSpecificCache),
+#endif  // ROCKSDB_JEMALLOC_NODUMP_ALLOCATOR
+      arena_index_(0) {
+  RegisterOptions(&options_, &jemalloc_type_info);
+}
+
+#ifdef ROCKSDB_JEMALLOC_NODUMP_ALLOCATOR
+JemallocNodumpAllocator::~JemallocNodumpAllocator() {
+  // Destroy tcache before destroying arena.
+  autovector<void*> tcache_list;
+  tcache_.Scrape(&tcache_list, nullptr);
+  for (void* tcache_index : tcache_list) {
+    DestroyThreadSpecificCache(tcache_index);
+  }
+  if (arena_index_ > 0) {
+    // Destroy arena. Silently ignore error.
+    Status s = DestroyArena(arena_index_);
+    assert(s.ok());
+    s.PermitUncheckedError();
+  }
+}
+
+size_t JemallocNodumpAllocator::UsableSize(void* p,
+                                           size_t /*allocation_size*/) const {
+  return malloc_usable_size(static_cast<void*>(p));
+}
+
+void* JemallocNodumpAllocator::Allocate(size_t size) {
+  int tcache_flag = GetThreadSpecificCache(size);
+  return mallocx(size, MALLOCX_ARENA(arena_index_) | tcache_flag);
+}
+
+void JemallocNodumpAllocator::Deallocate(void* p) {
+  // Obtain tcache.
+  size_t size = 0;
+  if (options_.limit_tcache_size) {
+    size = malloc_usable_size(p);
+  }
+  int tcache_flag = GetThreadSpecificCache(size);
+  // No need to pass arena index to dallocx(). Jemalloc will find arena index
+  // from its own metadata.
+  dallocx(p, tcache_flag);
+}
+
+Status JemallocNodumpAllocator::InitializeArenas() {
+  // Create arena.
+  size_t arena_index_size = sizeof(arena_index_);
+  int ret =
+      mallctl("arenas.create", &arena_index_, &arena_index_size, nullptr, 0);
+  if (ret != 0) {
+    return Status::Incomplete("Failed to create jemalloc arena, error code: " +
+                              ROCKSDB_NAMESPACE::ToString(ret));
+  }
+  assert(arena_index_ != 0);
+
+  // Read existing hooks.
+  std::string key =
+      "arena." + ROCKSDB_NAMESPACE::ToString(arena_index_) + ".extent_hooks";
+  extent_hooks_t* hooks;
+  size_t hooks_size = sizeof(hooks);
+  ret = mallctl(key.c_str(), &hooks, &hooks_size, nullptr, 0);
+  if (ret != 0) {
+    return Status::Incomplete("Failed to read existing hooks, error code: " +
+                              ROCKSDB_NAMESPACE::ToString(ret));
+  }
+
+  // Store existing alloc.
+  extent_alloc_t* original_alloc = hooks->alloc;
+  extent_alloc_t* expected = nullptr;
+  bool success =
+      JemallocNodumpAllocator::original_alloc_.compare_exchange_strong(
+          expected, original_alloc);
+  if (!success && original_alloc != expected) {
+    return Status::Incomplete("Original alloc conflict.");
+  }
+
+  // Set the custom hook.
+  arena_hooks_.reset(new extent_hooks_t(*hooks));
+  arena_hooks_->alloc = &JemallocNodumpAllocator::Alloc;
+  extent_hooks_t* hooks_ptr = arena_hooks_.get();
+  ret = mallctl(key.c_str(), nullptr, nullptr, &hooks_ptr, sizeof(hooks_ptr));
+  if (ret != 0) {
+    return Status::Incomplete("Failed to set custom hook, error code: " +
+                              ROCKSDB_NAMESPACE::ToString(ret));
+  }
+  return Status::OK();
+}
+#endif  // ROCKSDB_JEMALLOC_NODUMP_ALLOCATOR
+
+Status JemallocNodumpAllocator::PrepareOptions(
+    const ConfigOptions& config_options) {
+  std::string message;
+
+  if (!IsSupported(&message)) {
+    return Status::NotSupported(message);
+  } else if (options_.limit_tcache_size &&
+             options_.tcache_size_lower_bound >=
+                 options_.tcache_size_upper_bound) {
+    return Status::InvalidArgument(
+        "tcache_size_lower_bound larger or equal to tcache_size_upper_bound.");
+  } else if (IsMutable()) {
+    Status s = MemoryAllocator::PrepareOptions(config_options);
+#ifdef ROCKSDB_JEMALLOC_NODUMP_ALLOCATOR
+    if (s.ok()) {
+      s = InitializeArenas();
+    }
+#endif  // ROCKSDB_JEMALLOC_NODUMP_ALLOCATOR
+    return s;
+  } else {
+    // Already prepared
+    return Status::OK();
+  }
+}
+
+#ifdef ROCKSDB_JEMALLOC_NODUMP_ALLOCATOR
 int JemallocNodumpAllocator::GetThreadSpecificCache(size_t size) {
   // We always enable tcache. The only corner case is when there are a ton of
   // threads accessing with low frequency, then it could consume a lot of
@@ -50,24 +203,6 @@ int JemallocNodumpAllocator::GetThreadSpecificCache(size_t size) {
   }
   return MALLOCX_TCACHE(*tcache_index);
 }
-
-void* JemallocNodumpAllocator::Allocate(size_t size) {
-  int tcache_flag = GetThreadSpecificCache(size);
-  return mallocx(size, MALLOCX_ARENA(arena_index_) | tcache_flag);
-}
-
-void JemallocNodumpAllocator::Deallocate(void* p) {
-  // Obtain tcache.
-  size_t size = 0;
-  if (options_.limit_tcache_size) {
-    size = malloc_usable_size(p);
-  }
-  int tcache_flag = GetThreadSpecificCache(size);
-  // No need to pass arena index to dallocx(). Jemalloc will find arena index
-  // from its own metadata.
-  dallocx(p, tcache_flag);
-}
-
 void* JemallocNodumpAllocator::Alloc(extent_hooks_t* extent, void* new_addr,
                                      size_t size, size_t alignment, bool* zero,
                                      bool* commit, unsigned arena_ind) {
@@ -91,11 +226,12 @@ void* JemallocNodumpAllocator::Alloc(extent_hooks_t* extent, void* new_addr,
 Status JemallocNodumpAllocator::DestroyArena(unsigned arena_index) {
   assert(arena_index != 0);
-  std::string key = "arena." + ToString(arena_index) + ".destroy";
+  std::string key =
+      "arena." + ROCKSDB_NAMESPACE::ToString(arena_index) + ".destroy";
   int ret = mallctl(key.c_str(), nullptr, 0, nullptr, 0);
   if (ret != 0) {
     return Status::Incomplete("Failed to destroy jemalloc arena, error code: " +
-                              ToString(ret));
+                              ROCKSDB_NAMESPACE::ToString(ret));
   }
   return Status::OK();
 }
@@ -111,22 +247,6 @@ void JemallocNodumpAllocator::DestroyThreadSpecificCache(void* ptr) {
   delete tcache_index;
 }
-
-JemallocNodumpAllocator::~JemallocNodumpAllocator() {
-  // Destroy tcache before destroying arena.
-  autovector<void*> tcache_list;
-  tcache_.Scrape(&tcache_list, nullptr);
-  for (void* tcache_index : tcache_list) {
-    DestroyThreadSpecificCache(tcache_index);
-  }
-
-  // Destroy arena. Silently ignore error.
-  Status s __attribute__((__unused__)) = DestroyArena(arena_index_);
-  assert(s.ok());
-}
-
-size_t JemallocNodumpAllocator::UsableSize(void* p,
-                                           size_t /*allocation_size*/) const {
-  return malloc_usable_size(static_cast<void*>(p));
-}
 #endif  // ROCKSDB_JEMALLOC_NODUMP_ALLOCATOR
 
 Status NewJemallocNodumpAllocator(
@@ -135,72 +255,17 @@ Status NewJemallocNodumpAllocator(
   if (memory_allocator == nullptr) {
     return Status::InvalidArgument("memory_allocator must be non-null.");
   }
-  *memory_allocator = nullptr;
-  Status unsupported = Status::NotSupported(
-      "JemallocNodumpAllocator only available with jemalloc version >= 5 "
-      "and MADV_DONTDUMP is available.");
-#ifndef ROCKSDB_JEMALLOC_NODUMP_ALLOCATOR
+#ifndef ROCKSDB_JEMALLOC
   (void)options;
-  return unsupported;
+  return Status::NotSupported("Not compiled with JEMALLOC");
 #else
-  if (!HasJemalloc()) {
-    return unsupported;
-  }
-  if (options.limit_tcache_size &&
-      options.tcache_size_lower_bound >= options.tcache_size_upper_bound) {
-    return Status::InvalidArgument(
-        "tcache_size_lower_bound larger or equal to tcache_size_upper_bound.");
-  }
-
-  // Create arena.
-  unsigned arena_index = 0;
-  size_t arena_index_size = sizeof(arena_index);
-  int ret =
-      mallctl("arenas.create", &arena_index, &arena_index_size, nullptr, 0);
-  if (ret != 0) {
-    return Status::Incomplete("Failed to create jemalloc arena, error code: " +
-                              ToString(ret));
-  }
-  assert(arena_index != 0);
-
-  // Read existing hooks.
-  std::string key = "arena." + ToString(arena_index) + ".extent_hooks";
-  extent_hooks_t* hooks;
-  size_t hooks_size = sizeof(hooks);
-  ret = mallctl(key.c_str(), &hooks, &hooks_size, nullptr, 0);
-  if (ret != 0) {
-    JemallocNodumpAllocator::DestroyArena(arena_index);
-    return Status::Incomplete("Failed to read existing hooks, error code: " +
-                              ToString(ret));
-  }
-
-  // Store existing alloc.
-  extent_alloc_t* original_alloc = hooks->alloc;
-  extent_alloc_t* expected = nullptr;
-  bool success =
-      JemallocNodumpAllocator::original_alloc_.compare_exchange_strong(
-          expected, original_alloc);
-  if (!success && original_alloc != expected) {
-    JemallocNodumpAllocator::DestroyArena(arena_index);
-    return Status::Incomplete("Original alloc conflict.");
-  }
-
-  // Set the custom hook.
-  std::unique_ptr<extent_hooks_t> new_hooks(new extent_hooks_t(*hooks));
-  new_hooks->alloc = &JemallocNodumpAllocator::Alloc;
-  extent_hooks_t* hooks_ptr = new_hooks.get();
-  ret = mallctl(key.c_str(), nullptr, nullptr, &hooks_ptr, sizeof(hooks_ptr));
-  if (ret != 0) {
-    JemallocNodumpAllocator::DestroyArena(arena_index);
-    return Status::Incomplete("Failed to set custom hook, error code: " +
-                              ToString(ret));
-  }
-
-  // Create cache allocator.
-  memory_allocator->reset(
-      new JemallocNodumpAllocator(options, std::move(new_hooks), arena_index));
-  return Status::OK();
-#endif  // ROCKSDB_JEMALLOC_NODUMP_ALLOCATOR
+  std::unique_ptr<MemoryAllocator> allocator(
+      new JemallocNodumpAllocator(options));
+  Status s = allocator->PrepareOptions(ConfigOptions());
+  if (s.ok()) {
+    memory_allocator->reset(allocator.release());
+  }
+  return s;
+#endif
 }
 }  // namespace ROCKSDB_NAMESPACE
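
NewJemallocNodumpAllocator() keeps its old signature, but validation and arena setup now run through PrepareOptions()/InitializeArenas(). A sketch of the unchanged caller-facing flow, with illustrative values:

    JemallocAllocatorOptions jopts;
    jopts.limit_tcache_size = true;
    jopts.tcache_size_lower_bound = 1024;
    jopts.tcache_size_upper_bound = 4 * 1024;
    std::shared_ptr<MemoryAllocator> allocator;
    // Returns NotSupported without jemalloc, InvalidArgument if the tcache
    // bounds are inverted, and OK with a ready-to-use allocator otherwise.
    Status s = NewJemallocNodumpAllocator(jopts, &allocator);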

--- a/memory/jemalloc_nodump_allocator.h
+++ b/memory/jemalloc_nodump_allocator.h
@@ -12,6 +12,7 @@
 #include "port/port.h"
 #include "rocksdb/memory_allocator.h"
 #include "util/thread_local.h"
+#include "utilities/memory_allocators.h"
 
 #if defined(ROCKSDB_JEMALLOC) && defined(ROCKSDB_PLATFORM_POSIX)
@@ -19,22 +20,38 @@
 #if (JEMALLOC_VERSION_MAJOR >= 5) && defined(MADV_DONTDUMP)
 #define ROCKSDB_JEMALLOC_NODUMP_ALLOCATOR
+#endif  // (JEMALLOC_VERSION_MAJOR >= 5) && MADV_DONTDUMP
+#endif  // ROCKSDB_JEMALLOC && ROCKSDB_PLATFORM_POSIX
 
 namespace ROCKSDB_NAMESPACE {
-class JemallocNodumpAllocator : public MemoryAllocator {
+class JemallocNodumpAllocator : public BaseMemoryAllocator {
  public:
-  JemallocNodumpAllocator(JemallocAllocatorOptions& options,
-                          std::unique_ptr<extent_hooks_t>&& arena_hooks,
-                          unsigned arena_index);
+  explicit JemallocNodumpAllocator(JemallocAllocatorOptions& options);
+#ifdef ROCKSDB_JEMALLOC_NODUMP_ALLOCATOR
   ~JemallocNodumpAllocator();
+#endif  // ROCKSDB_JEMALLOC_NODUMP_ALLOCATOR
 
+  static const char* kClassName() { return "JemallocNodumpAllocator"; }
+  const char* Name() const override { return kClassName(); }
+  static bool IsSupported() {
+    std::string unused;
+    return IsSupported(&unused);
+  }
+  static bool IsSupported(std::string* why);
+  bool IsMutable() const { return arena_index_ == 0; }
 
-  const char* Name() const override { return "JemallocNodumpAllocator"; }
+  Status PrepareOptions(const ConfigOptions& config_options) override;
+
+#ifdef ROCKSDB_JEMALLOC_NODUMP_ALLOCATOR
   void* Allocate(size_t size) override;
   void Deallocate(void* p) override;
   size_t UsableSize(void* p, size_t allocation_size) const override;
+#endif  // ROCKSDB_JEMALLOC_NODUMP_ALLOCATOR
 
  private:
+#ifdef ROCKSDB_JEMALLOC_NODUMP_ALLOCATOR
+  Status InitializeArenas();
+
   friend Status NewJemallocNodumpAllocator(
       JemallocAllocatorOptions& options,
       std::shared_ptr<MemoryAllocator>* memory_allocator);
@@ -53,7 +70,10 @@ class JemallocNodumpAllocator : public MemoryAllocator {
   // Get or create tcache. Return flag suitable to use with `mallocx`:
   // either MALLOCX_TCACHE_NONE or MALLOCX_TCACHE(tc).
   int GetThreadSpecificCache(size_t size);
+#endif  // ROCKSDB_JEMALLOC_NODUMP_ALLOCATOR
 
+  JemallocAllocatorOptions options_;
+
+#ifdef ROCKSDB_JEMALLOC_NODUMP_ALLOCATOR
   // A function pointer to jemalloc default alloc. Use atomic to make sure
   // NewJemallocNodumpAllocator is thread-safe.
   //
@@ -61,18 +81,14 @@ class JemallocNodumpAllocator : public MemoryAllocator {
   // alloc needs to be static to pass to jemalloc as function pointer.
   static std::atomic<extent_alloc_t*> original_alloc_;
 
-  const JemallocAllocatorOptions options_;
-
   // Custom hooks has to outlive corresponding arena.
-  const std::unique_ptr<extent_hooks_t> arena_hooks_;
-
-  // Arena index.
-  const unsigned arena_index_;
+  std::unique_ptr<extent_hooks_t> arena_hooks_;
 
   // Hold thread-local tcache index.
   ThreadLocalPtr tcache_;
-};
+#endif  // ROCKSDB_JEMALLOC_NODUMP_ALLOCATOR
 
+  // Arena index.
+  unsigned arena_index_;
+};
 }  // namespace ROCKSDB_NAMESPACE
-
-#endif  // (JEMALLOC_VERSION_MAJOR >= 5) && MADV_DONTDUMP
-#endif  // ROCKSDB_JEMALLOC && ROCKSDB_PLATFORM_POSIX

--- a/memory/memkind_kmem_allocator.cc
+++ b/memory/memkind_kmem_allocator.cc
@@ -5,11 +5,22 @@
 // (found in the LICENSE.Apache file in the root directory).
 
 #ifdef MEMKIND
+#include <memkind.h>
+#endif  // MEMKIND
 
-#include "memkind_kmem_allocator.h"
+#include "memory/memkind_kmem_allocator.h"
 
 namespace ROCKSDB_NAMESPACE {
+Status MemkindKmemAllocator::PrepareOptions(const ConfigOptions& options) {
+  std::string message;
+  if (!IsSupported(&message)) {
+    return Status::NotSupported(message);
+  } else {
+    return MemoryAllocator::PrepareOptions(options);
+  }
+}
 
+#ifdef MEMKIND
 void* MemkindKmemAllocator::Allocate(size_t size) {
   void* p = memkind_malloc(MEMKIND_DAX_KMEM, size);
   if (p == NULL) {
@@ -28,6 +39,6 @@ size_t MemkindKmemAllocator::UsableSize(void* p,
   return memkind_malloc_usable_size(MEMKIND_DAX_KMEM, p);
 }
 #endif  // ROCKSDB_MALLOC_USABLE_SIZE
+#endif  // MEMKIND
 
 }  // namespace ROCKSDB_NAMESPACE
-#endif  // MEMKIND

--- a/memory/memkind_kmem_allocator.h
+++ b/memory/memkind_kmem_allocator.h
@@ -6,22 +6,37 @@
 #pragma once
 
-#ifdef MEMKIND
-#include <memkind.h>
-
 #include "rocksdb/memory_allocator.h"
+#include "utilities/memory_allocators.h"
 
 namespace ROCKSDB_NAMESPACE {
 
-class MemkindKmemAllocator : public MemoryAllocator {
+class MemkindKmemAllocator : public BaseMemoryAllocator {
  public:
-  const char* Name() const override { return "MemkindKmemAllocator"; };
+  static const char* kClassName() { return "MemkindKmemAllocator"; }
+  const char* Name() const override { return kClassName(); }
+  static bool IsSupported() {
+    std::string unused;
+    return IsSupported(&unused);
+  }
+  static bool IsSupported(std::string* msg) {
+#ifdef MEMKIND
+    return true;
+#else
+    *msg = "Not compiled with MemKind";
+    return false;
+#endif
+  }
+  Status PrepareOptions(const ConfigOptions& options) override;
+
+#ifdef MEMKIND
   void* Allocate(size_t size) override;
   void Deallocate(void* p) override;
 #ifdef ROCKSDB_MALLOC_USABLE_SIZE
   size_t UsableSize(void* p, size_t /*allocation_size*/) const override;
 #endif
+#endif  // MEMKIND
 };
 
 }  // namespace ROCKSDB_NAMESPACE
-#endif  // MEMKIND
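
With the guards above, the class always compiles but may not be usable at runtime; the intended check looks like this (the error handling is illustrative):

    std::string why;
    if (MemkindKmemAllocator::IsSupported(&why)) {
      auto allocator = std::make_shared<MemkindKmemAllocator>();
      // ... pass the allocator to NewLRUCache(), etc.
    } else {
      fprintf(stderr, "MemkindKmemAllocator unavailable: %s\n", why.c_str());
    }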

--- a/memory/memkind_kmem_allocator_test.cc
+++ /dev/null
@@ -1,102 +0,0 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// Copyright (c) 2019 Intel Corporation
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include <cstdio>
#ifdef MEMKIND
#include "memkind_kmem_allocator.h"
#include "rocksdb/cache.h"
#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "table/block_based/block_based_table_factory.h"
#include "test_util/testharness.h"
namespace ROCKSDB_NAMESPACE {
TEST(MemkindKmemAllocatorTest, Allocate) {
MemkindKmemAllocator allocator;
void* p;
try {
p = allocator.Allocate(1024);
} catch (const std::bad_alloc& e) {
return;
}
ASSERT_NE(p, nullptr);
size_t size = allocator.UsableSize(p, 1024);
ASSERT_GE(size, 1024);
allocator.Deallocate(p);
}
TEST(MemkindKmemAllocatorTest, DatabaseBlockCache) {
// Check if a memory node is available for allocation
try {
MemkindKmemAllocator allocator;
allocator.Allocate(1024);
} catch (const std::bad_alloc& e) {
return; // if no node available, skip the test
}
// Create database with block cache using MemkindKmemAllocator
Options options;
std::string dbname = test::PerThreadDBPath("memkind_kmem_allocator_test");
ASSERT_OK(DestroyDB(dbname, options));
options.create_if_missing = true;
std::shared_ptr<Cache> cache = NewLRUCache(
1024 * 1024, 6, false, false, std::make_shared<MemkindKmemAllocator>());
BlockBasedTableOptions table_options;
table_options.block_cache = cache;
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
DB* db = nullptr;
Status s = DB::Open(options, dbname, &db);
ASSERT_OK(s);
ASSERT_NE(db, nullptr);
ASSERT_EQ(cache->GetUsage(), 0);
// Write 2kB (200 values, each 10 bytes)
int num_keys = 200;
WriteOptions wo;
std::string val = "0123456789";
for (int i = 0; i < num_keys; i++) {
std::string key = std::to_string(i);
s = db->Put(wo, Slice(key), Slice(val));
ASSERT_OK(s);
}
ASSERT_OK(db->Flush(FlushOptions())); // Flush all data from memtable so that
// reads are from block cache
// Read and check block cache usage
ReadOptions ro;
std::string result;
for (int i = 0; i < num_keys; i++) {
std::string key = std::to_string(i);
s = db->Get(ro, key, &result);
ASSERT_OK(s);
ASSERT_EQ(result, val);
}
ASSERT_GT(cache->GetUsage(), 2000);
// Close database
s = db->Close();
ASSERT_OK(s);
ASSERT_OK(DestroyDB(dbname, options));
}
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
#else
int main(int /*argc*/, char** /*argv*/) {
printf(
"Skip memkind_kmem_allocator_test as the required library memkind is "
"missing.");
}
#endif // MEMKIND

--- /dev/null
+++ b/memory/memory_allocator.cc
@@ -0,0 +1,91 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "rocksdb/memory_allocator.h"
#include "memory/jemalloc_nodump_allocator.h"
#include "memory/memkind_kmem_allocator.h"
#include "rocksdb/utilities/customizable_util.h"
#include "rocksdb/utilities/object_registry.h"
#include "rocksdb/utilities/options_type.h"
#include "utilities/memory_allocators.h"
namespace ROCKSDB_NAMESPACE {
namespace {
static std::unordered_map<std::string, OptionTypeInfo> ma_wrapper_type_info = {
#ifndef ROCKSDB_LITE
{"target", OptionTypeInfo::AsCustomSharedPtr<MemoryAllocator>(
0, OptionVerificationType::kByName, OptionTypeFlags::kNone)},
#endif // ROCKSDB_LITE
};
#ifndef ROCKSDB_LITE
static int RegisterBuiltinAllocators(ObjectLibrary& library,
const std::string& /*arg*/) {
library.Register<MemoryAllocator>(
DefaultMemoryAllocator::kClassName(),
[](const std::string& /*uri*/, std::unique_ptr<MemoryAllocator>* guard,
std::string* /*errmsg*/) {
guard->reset(new DefaultMemoryAllocator());
return guard->get();
});
library.Register<MemoryAllocator>(
CountedMemoryAllocator::kClassName(),
[](const std::string& /*uri*/, std::unique_ptr<MemoryAllocator>* guard,
std::string* /*errmsg*/) {
guard->reset(new CountedMemoryAllocator(
std::make_shared<DefaultMemoryAllocator>()));
return guard->get();
});
library.Register<MemoryAllocator>(
JemallocNodumpAllocator::kClassName(),
[](const std::string& /*uri*/, std::unique_ptr<MemoryAllocator>* guard,
std::string* errmsg) {
if (JemallocNodumpAllocator::IsSupported(errmsg)) {
JemallocAllocatorOptions options;
guard->reset(new JemallocNodumpAllocator(options));
}
return guard->get();
});
library.Register<MemoryAllocator>(
MemkindKmemAllocator::kClassName(),
[](const std::string& /*uri*/, std::unique_ptr<MemoryAllocator>* guard,
std::string* errmsg) {
if (MemkindKmemAllocator::IsSupported(errmsg)) {
guard->reset(new MemkindKmemAllocator());
}
return guard->get();
});
size_t num_types;
return static_cast<int>(library.GetFactoryCount(&num_types));
}
#endif // ROCKSDB_LITE
} // namespace
MemoryAllocatorWrapper::MemoryAllocatorWrapper(
const std::shared_ptr<MemoryAllocator>& t)
: target_(t) {
RegisterOptions("", &target_, &ma_wrapper_type_info);
}
Status MemoryAllocator::CreateFromString(
const ConfigOptions& options, const std::string& value,
std::shared_ptr<MemoryAllocator>* result) {
#ifndef ROCKSDB_LITE
static std::once_flag once;
std::call_once(once, [&]() {
RegisterBuiltinAllocators(*(ObjectLibrary::Default().get()), "");
});
#else
if (value == DefaultMemoryAllocator::kClassName()) {
result->reset(new DefaultMemoryAllocator());
return Status::OK();
}
#endif // ROCKSDB_LITE
ConfigOptions copy = options;
copy.invoke_prepare_options = true;
return LoadManagedObject<MemoryAllocator>(copy, value, result);
}
} // namespace ROCKSDB_NAMESPACE
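
Out-of-tree allocators can hook into CreateFromString() the same way the built-ins do. A sketch assuming a user-defined MyAllocator class (hypothetical name), mirroring the registration pattern above:

    // Register MyAllocator so CreateFromString() can find it by name.
    ObjectLibrary::Default()->Register<MemoryAllocator>(
        "MyAllocator",
        [](const std::string& /*uri*/, std::unique_ptr<MemoryAllocator>* guard,
           std::string* /*errmsg*/) {
          guard->reset(new MyAllocator());
          return guard->get();
        });
    // Afterwards: MemoryAllocator::CreateFromString(opts, "MyAllocator", &result);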

--- /dev/null
+++ b/memory/memory_allocator_test.cc
@@ -0,0 +1,236 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// Copyright (c) 2019 Intel Corporation
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include <cstdio>
#include "memory/jemalloc_nodump_allocator.h"
#include "memory/memkind_kmem_allocator.h"
#include "rocksdb/cache.h"
#include "rocksdb/convenience.h"
#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "table/block_based/block_based_table_factory.h"
#include "test_util/testharness.h"
#include "utilities/memory_allocators.h"
namespace ROCKSDB_NAMESPACE {
class MemoryAllocatorTest
: public testing::Test,
public ::testing::WithParamInterface<std::tuple<std::string, bool>> {
public:
MemoryAllocatorTest() {
std::tie(id_, supported_) = GetParam();
Status s =
MemoryAllocator::CreateFromString(ConfigOptions(), id_, &allocator_);
if (supported_) {
EXPECT_OK(s);
} else if (!s.ok()) {
EXPECT_TRUE(s.IsNotSupported());
}
}
bool IsSupported() { return supported_; }
std::shared_ptr<MemoryAllocator> allocator_;
std::string id_;
private:
bool supported_;
};
TEST_P(MemoryAllocatorTest, Allocate) {
if (!IsSupported()) {
return;
}
void* p = allocator_->Allocate(1024);
ASSERT_NE(p, nullptr);
size_t size = allocator_->UsableSize(p, 1024);
ASSERT_GE(size, 1024);
allocator_->Deallocate(p);
}
TEST_P(MemoryAllocatorTest, CreateAllocator) {
ConfigOptions config_options;
config_options.ignore_unknown_options = false;
config_options.ignore_unsupported_options = false;
std::shared_ptr<MemoryAllocator> orig, copy;
Status s = MemoryAllocator::CreateFromString(config_options, id_, &orig);
if (!IsSupported()) {
ASSERT_TRUE(s.IsNotSupported());
} else {
ASSERT_OK(s);
ASSERT_NE(orig, nullptr);
#ifndef ROCKSDB_LITE
std::string str = orig->ToString(config_options);
ASSERT_OK(MemoryAllocator::CreateFromString(config_options, str, &copy));
ASSERT_EQ(orig, copy);
#endif // ROCKSDB_LITE
}
}
TEST_P(MemoryAllocatorTest, DatabaseBlockCache) {
if (!IsSupported()) {
// Check if a memory node is available for allocation
return;
}
// Create database with block cache using the MemoryAllocator
Options options;
std::string dbname = test::PerThreadDBPath("allocator_test");
ASSERT_OK(DestroyDB(dbname, options));
options.create_if_missing = true;
BlockBasedTableOptions table_options;
auto cache = NewLRUCache(1024 * 1024, 6, false, false, allocator_);
table_options.block_cache = cache;
options.table_factory.reset(NewBlockBasedTableFactory(table_options));
DB* db = nullptr;
Status s = DB::Open(options, dbname, &db);
ASSERT_OK(s);
ASSERT_NE(db, nullptr);
ASSERT_LE(cache->GetUsage(), 104); // Cache will contain stats
// Write 2kB (200 values, each 10 bytes)
int num_keys = 200;
WriteOptions wo;
std::string val = "0123456789";
for (int i = 0; i < num_keys; i++) {
std::string key = std::to_string(i);
s = db->Put(wo, Slice(key), Slice(val));
ASSERT_OK(s);
}
ASSERT_OK(db->Flush(FlushOptions())); // Flush all data from memtable so that
// reads are from block cache
// Read and check block cache usage
ReadOptions ro;
std::string result;
for (int i = 0; i < num_keys; i++) {
std::string key = std::to_string(i);
s = db->Get(ro, key, &result);
ASSERT_OK(s);
ASSERT_EQ(result, val);
}
ASSERT_GT(cache->GetUsage(), 2000);
// Close database
s = db->Close();
ASSERT_OK(s);
delete db;
ASSERT_OK(DestroyDB(dbname, options));
}
class CreateMemoryAllocatorTest : public testing::Test {
public:
CreateMemoryAllocatorTest() {
config_options_.ignore_unknown_options = false;
config_options_.ignore_unsupported_options = false;
}
ConfigOptions config_options_;
};
TEST_F(CreateMemoryAllocatorTest, JemallocOptionsTest) {
std::shared_ptr<MemoryAllocator> allocator;
std::string id = std::string("id=") + JemallocNodumpAllocator::kClassName();
Status s = MemoryAllocator::CreateFromString(config_options_, id, &allocator);
if (!JemallocNodumpAllocator::IsSupported()) {
ASSERT_TRUE(s.IsNotSupported());
ROCKSDB_GTEST_SKIP("JEMALLOC not supported");
return;
}
ASSERT_OK(s);
ASSERT_NE(allocator, nullptr);
JemallocAllocatorOptions jopts;
auto opts = allocator->GetOptions<JemallocAllocatorOptions>();
ASSERT_NE(opts, nullptr);
ASSERT_EQ(opts->limit_tcache_size, jopts.limit_tcache_size);
ASSERT_EQ(opts->tcache_size_lower_bound, jopts.tcache_size_lower_bound);
ASSERT_EQ(opts->tcache_size_upper_bound, jopts.tcache_size_upper_bound);
ASSERT_NOK(MemoryAllocator::CreateFromString(
config_options_,
id + "; limit_tcache_size=true; tcache_size_lower_bound=4096; "
"tcache_size_upper_bound=1024",
&allocator));
ASSERT_OK(MemoryAllocator::CreateFromString(
config_options_,
id + "; limit_tcache_size=false; tcache_size_lower_bound=4096; "
"tcache_size_upper_bound=1024",
&allocator));
opts = allocator->GetOptions<JemallocAllocatorOptions>();
ASSERT_NE(opts, nullptr);
ASSERT_EQ(opts->limit_tcache_size, false);
ASSERT_EQ(opts->tcache_size_lower_bound, 4096U);
ASSERT_EQ(opts->tcache_size_upper_bound, 1024U);
ASSERT_OK(MemoryAllocator::CreateFromString(
config_options_,
id + "; limit_tcache_size=true; tcache_size_upper_bound=4096; "
"tcache_size_lower_bound=1024",
&allocator));
opts = allocator->GetOptions<JemallocAllocatorOptions>();
ASSERT_NE(opts, nullptr);
ASSERT_EQ(opts->limit_tcache_size, true);
ASSERT_EQ(opts->tcache_size_lower_bound, 1024U);
ASSERT_EQ(opts->tcache_size_upper_bound, 4096U);
}
TEST_F(CreateMemoryAllocatorTest, NewJemallocNodumpAllocator) {
JemallocAllocatorOptions jopts;
std::shared_ptr<MemoryAllocator> allocator;
jopts.limit_tcache_size = true;
jopts.tcache_size_lower_bound = 2 * 1024;
jopts.tcache_size_upper_bound = 1024;
ASSERT_NOK(NewJemallocNodumpAllocator(jopts, nullptr));
Status s = NewJemallocNodumpAllocator(jopts, &allocator);
std::string msg;
if (!JemallocNodumpAllocator::IsSupported(&msg)) {
ASSERT_TRUE(s.IsNotSupported());
ROCKSDB_GTEST_SKIP("JEMALLOC not supported");
return;
}
ASSERT_NOK(s); // Invalid options
ASSERT_EQ(allocator, nullptr);
jopts.tcache_size_upper_bound = 4 * 1024;
ASSERT_OK(NewJemallocNodumpAllocator(jopts, &allocator));
ASSERT_NE(allocator, nullptr);
auto opts = allocator->GetOptions<JemallocAllocatorOptions>();
ASSERT_EQ(opts->tcache_size_upper_bound, jopts.tcache_size_upper_bound);
ASSERT_EQ(opts->tcache_size_lower_bound, jopts.tcache_size_lower_bound);
ASSERT_EQ(opts->limit_tcache_size, jopts.limit_tcache_size);
jopts.limit_tcache_size = false;
ASSERT_OK(NewJemallocNodumpAllocator(jopts, &allocator));
ASSERT_NE(allocator, nullptr);
opts = allocator->GetOptions<JemallocAllocatorOptions>();
ASSERT_EQ(opts->tcache_size_upper_bound, jopts.tcache_size_upper_bound);
ASSERT_EQ(opts->tcache_size_lower_bound, jopts.tcache_size_lower_bound);
ASSERT_EQ(opts->limit_tcache_size, jopts.limit_tcache_size);
}
INSTANTIATE_TEST_CASE_P(DefaultMemoryAllocator, MemoryAllocatorTest,
::testing::Values(std::make_tuple(
DefaultMemoryAllocator::kClassName(), true)));
#ifdef MEMKIND
INSTANTIATE_TEST_CASE_P(
MemkindkMemAllocator, MemoryAllocatorTest,
::testing::Values(std::make_tuple(MemkindKmemAllocator::kClassName(),
MemkindKmemAllocator::IsSupported())));
#endif // MEMKIND
#ifdef ROCKSDB_JEMALLOC
INSTANTIATE_TEST_CASE_P(
JemallocNodumpAllocator, MemoryAllocatorTest,
::testing::Values(std::make_tuple(JemallocNodumpAllocator::kClassName(),
JemallocNodumpAllocator::IsSupported())));
#endif // ROCKSDB_JEMALLOC
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

--- a/options/customizable_test.cc
+++ b/options/customizable_test.cc
@@ -22,6 +22,7 @@
 #include "rocksdb/env_encryption.h"
 #include "rocksdb/file_checksum.h"
 #include "rocksdb/flush_block_policy.h"
+#include "rocksdb/memory_allocator.h"
 #include "rocksdb/rate_limiter.h"
 #include "rocksdb/secondary_cache.h"
 #include "rocksdb/slice_transform.h"
@@ -39,6 +40,7 @@
 #include "util/rate_limiter.h"
 #include "util/string_util.h"
 #include "utilities/compaction_filters/remove_emptyvalue_compactionfilter.h"
+#include "utilities/memory_allocators.h"
 
 #ifndef GFLAGS
 bool FLAGS_enable_print = false;
@@ -1305,6 +1307,12 @@ class MockSliceTransform : public SliceTransform {
   bool InRange(const Slice& /*key*/) const override { return false; }
 };
 
+class MockMemoryAllocator : public BaseMemoryAllocator {
+ public:
+  static const char* kClassName() { return "MockMemoryAllocator"; }
+  const char* Name() const override { return kClassName(); }
+};
+
 #ifndef ROCKSDB_LITE
 class MockEncryptionProvider : public EncryptionProvider {
  public:
@@ -1463,6 +1471,13 @@ static int RegisterLocalObjects(ObjectLibrary& library,
         guard->reset(new MockCipher());
         return guard->get();
       });
+  library.Register<MemoryAllocator>(
+      MockMemoryAllocator::kClassName(),
+      [](const std::string& /*uri*/, std::unique_ptr<MemoryAllocator>* guard,
+         std::string* /* errmsg */) {
+        guard->reset(new MockMemoryAllocator());
+        return guard->get();
+      });
   library.Register<FlushBlockPolicyFactory>(
       TestFlushBlockPolicyFactory::kClassName(),
       [](const std::string& /*uri*/,
@@ -1921,6 +1936,22 @@ TEST_F(LoadCustomizableTest, LoadSystemClockTest) {
   }
 }
 
+TEST_F(LoadCustomizableTest, LoadMemoryAllocatorTest) {
+  std::shared_ptr<MemoryAllocator> result;
+  ASSERT_NOK(MemoryAllocator::CreateFromString(
+      config_options_, MockMemoryAllocator::kClassName(), &result));
+  ASSERT_OK(MemoryAllocator::CreateFromString(
+      config_options_, DefaultMemoryAllocator::kClassName(), &result));
+  ASSERT_NE(result, nullptr);
+  ASSERT_STREQ(result->Name(), DefaultMemoryAllocator::kClassName());
+  if (RegisterTests("Test")) {
+    ASSERT_OK(MemoryAllocator::CreateFromString(
+        config_options_, MockMemoryAllocator::kClassName(), &result));
+    ASSERT_NE(result, nullptr);
+    ASSERT_STREQ(result->Name(), MockMemoryAllocator::kClassName());
+  }
+}
+
 TEST_F(LoadCustomizableTest, LoadRateLimiterTest) {
   std::shared_ptr<RateLimiter> result;
   ASSERT_NOK(RateLimiter::CreateFromString(

--- a/src.mk
+++ b/src.mk
@@ -115,6 +115,7 @@ LIB_SOURCES = \
   memory/concurrent_arena.cc \
   memory/jemalloc_nodump_allocator.cc \
   memory/memkind_kmem_allocator.cc \
+  memory/memory_allocator.cc \
   memtable/alloc_tracker.cc \
   memtable/hash_linklist_rep.cc \
   memtable/hash_skiplist_rep.cc \
@@ -501,7 +502,7 @@ TEST_MAIN_SOURCES = \
   logging/env_logger_test.cc \
   logging/event_logger_test.cc \
   memory/arena_test.cc \
-  memory/memkind_kmem_allocator_test.cc \
+  memory/memory_allocator_test.cc \
   memtable/inlineskiplist_test.cc \
   memtable/skiplist_test.cc \
   memtable/write_buffer_manager_test.cc \

--- a/table/block_fetcher_test.cc
+++ b/table/block_fetcher_test.cc
@@ -18,32 +18,10 @@
 #include "table/block_based/block_based_table_reader.h"
 #include "table/format.h"
 #include "test_util/testharness.h"
+#include "utilities/memory_allocators.h"
 
 namespace ROCKSDB_NAMESPACE {
 namespace {
-class CountedMemoryAllocator : public MemoryAllocator {
- public:
-  const char* Name() const override { return "CountedMemoryAllocator"; }
-  void* Allocate(size_t size) override {
-    num_allocations_++;
-    return static_cast<void*>(new char[size]);
-  }
-  void Deallocate(void* p) override {
-    num_deallocations_++;
-    delete[] static_cast<char*>(p);
-  }
-  int GetNumAllocations() const { return num_allocations_; }
-  int GetNumDeallocations() const { return num_deallocations_; }
-
- private:
-  int num_allocations_ = 0;
-  int num_deallocations_ = 0;
-};
-
 struct MemcpyStats {
   int num_stack_buf_memcpy;
   int num_heap_buf_memcpy;

--- a/table/table_test.cc
+++ b/table/table_test.cc
@@ -69,6 +69,7 @@
 #include "util/file_checksum_helper.h"
 #include "util/random.h"
 #include "util/string_util.h"
+#include "utilities/memory_allocators.h"
 #include "utilities/merge_operators.h"
 
 namespace ROCKSDB_NAMESPACE {
@@ -3635,30 +3636,10 @@ TEST_P(BlockBasedTableTest, BlockCacheLeak) {
   c.ResetTableReader();
 }
 
-namespace {
-class CustomMemoryAllocator : public MemoryAllocator {
- public:
-  const char* Name() const override { return "CustomMemoryAllocator"; }
-
-  void* Allocate(size_t size) override {
-    ++numAllocations;
-    auto ptr = new char[size + 16];
-    memcpy(ptr, "memory_allocator_", 16);  // mangle first 16 bytes
-    return reinterpret_cast<void*>(ptr + 16);
-  }
-  void Deallocate(void* p) override {
-    ++numDeallocations;
-    char* ptr = reinterpret_cast<char*>(p) - 16;
-    delete[] ptr;
-  }
-
-  std::atomic<int> numAllocations;
-  std::atomic<int> numDeallocations;
-};
-}  // namespace
-
 TEST_P(BlockBasedTableTest, MemoryAllocator) {
-  auto custom_memory_allocator = std::make_shared<CustomMemoryAllocator>();
+  auto default_memory_allocator = std::make_shared<DefaultMemoryAllocator>();
+  auto custom_memory_allocator =
+      std::make_shared<CountedMemoryAllocator>(default_memory_allocator);
   {
     Options opt;
     std::unique_ptr<InternalKeyComparator> ikc;
@@ -3701,10 +3682,10 @@ TEST_P(BlockBasedTableTest, MemoryAllocator) {
   // out of scope, block cache should have been deleted, all allocations
   // deallocated
-  EXPECT_EQ(custom_memory_allocator->numAllocations.load(),
-            custom_memory_allocator->numDeallocations.load());
+  EXPECT_EQ(custom_memory_allocator->GetNumAllocations(),
+            custom_memory_allocator->GetNumDeallocations());
   // make sure that allocations actually happened through the cache allocator
-  EXPECT_GT(custom_memory_allocator->numAllocations.load(), 0);
+  EXPECT_GT(custom_memory_allocator->GetNumAllocations(), 0);
 }
 
 // Test the file checksum of block based table

--- /dev/null
+++ b/utilities/memory_allocators.h
@@ -0,0 +1,104 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#pragma once
#include <atomic>
#include "rocksdb/memory_allocator.h"
namespace ROCKSDB_NAMESPACE {
// A memory allocator using new/delete
class DefaultMemoryAllocator : public MemoryAllocator {
public:
static const char* kClassName() { return "DefaultMemoryAllocator"; }
const char* Name() const override { return kClassName(); }
void* Allocate(size_t size) override {
return static_cast<void*>(new char[size]);
}
void Deallocate(void* p) override { delete[] static_cast<char*>(p); }
};
// Base class for a MemoryAllocator. This implementation does nothing
// and implements the methods in failure mode (assert if the methods are
// invoked). Implementations can extend this class and override these methods
// when they are enabled via compiler switches (e.g., the
// JemallocNodumpAllocator can define these methods if ROCKSDB_JEMALLOC is
// defined at compile time). If compiled in "disabled" mode, this class
// provides default/failure implementations. If compiled in "enabled" mode,
// the derived class needs to provide the appropriate "enabled" methods for
// the "real" implementation. Failure of the "real" implementation to
// override any of these methods will result in an assert failure.
class BaseMemoryAllocator : public MemoryAllocator {
public:
void* Allocate(size_t /*size*/) override {
assert(false);
return nullptr;
}
void Deallocate(void* /*p*/) override { assert(false); }
};
// A wrapped MemoryAllocator. Delegates the memory allocator functions to the
// wrapped one.
class MemoryAllocatorWrapper : public MemoryAllocator {
public:
// Initialize a MemoryAllocatorWrapper that delegates all calls to *t
explicit MemoryAllocatorWrapper(const std::shared_ptr<MemoryAllocator>& t);
~MemoryAllocatorWrapper() override {}
// Return the target to which to forward all calls
MemoryAllocator* target() const { return target_.get(); }
// Allocate a block of at least size. Has to be thread-safe.
void* Allocate(size_t size) override { return target_->Allocate(size); }
// Deallocate previously allocated block. Has to be thread-safe.
void Deallocate(void* p) override { return target_->Deallocate(p); }
// Returns the memory size of the block allocated at p. The default
// implementation that just returns the original allocation_size is fine.
size_t UsableSize(void* p, size_t allocation_size) const override {
return target_->UsableSize(p, allocation_size);
}
const Customizable* Inner() const override { return target_.get(); }
protected:
std::shared_ptr<MemoryAllocator> target_;
};
// A memory allocator that counts the number of allocations and deallocations.
// This class is useful if the number of memory allocations/deallocations is
// important.
class CountedMemoryAllocator : public MemoryAllocatorWrapper {
public:
CountedMemoryAllocator()
: MemoryAllocatorWrapper(std::make_shared<DefaultMemoryAllocator>()),
allocations_(0),
deallocations_(0) {}
explicit CountedMemoryAllocator(const std::shared_ptr<MemoryAllocator>& t)
: MemoryAllocatorWrapper(t), allocations_(0), deallocations_(0) {}
static const char* kClassName() { return "CountedMemoryAllocator"; }
const char* Name() const override { return kClassName(); }
std::string GetId() const override { return std::string(Name()); }
void* Allocate(size_t size) override {
allocations_++;
return MemoryAllocatorWrapper::Allocate(size);
}
void Deallocate(void* p) override {
deallocations_++;
MemoryAllocatorWrapper::Deallocate(p);
}
uint64_t GetNumAllocations() const { return allocations_; }
uint64_t GetNumDeallocations() const { return deallocations_; }
private:
std::atomic<uint64_t> allocations_;
std::atomic<uint64_t> deallocations_;
};
} // namespace ROCKSDB_NAMESPACE
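
As a usage note, the counting wrapper composes with any inner allocator; a minimal sketch mirroring how table_test.cc uses it:

    auto counted = std::make_shared<CountedMemoryAllocator>(
        std::make_shared<DefaultMemoryAllocator>());
    void* p = counted->Allocate(1024);
    counted->Deallocate(p);
    // Every Allocate/Deallocate is forwarded to the target and counted.
    assert(counted->GetNumAllocations() == counted->GetNumDeallocations());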