Avoid updating memtable allocated bytes if write_buffer_size is not set

Summary: If options.write_buffer_size is not set, nor options.write_buffer_manager, no need to update the bytes allocated counter in MemTableAllocator, which is expensive in parallel memtable insert case. Remove it can improve parallel memtable insert throughput by 10% with write batch size 128.

Test Plan:
Run benchmarks
TEST_TMPDIR=/dev/shm/ ./db_bench --benchmarks=fillrandom -disable_auto_compactions -level0_slowdown_writes_trigger=9999 -level0_stop_writes_trigger=9999 -num=10000000 --writes=1000000 -max_background_flushes=16 -max_write_buffer_number=16 --threads=32 --batch_size=128   -allow_concurrent_memtable_write -enable_write_thread_adaptive_yield

The throughput grows 10% with the benchmark.

Reviewers: andrewkr, yiwu, IslamAbdelRahman, igor, ngbronson

Reviewed By: ngbronson

Subscribers: ngbronson, leveldb, andrewkr, dhruba

Differential Revision: https://reviews.facebook.net/D60465
main
sdong 8 years ago
parent dda6c72ac8
commit 6797e6ffac
  1. 20
      db/memtable_allocator.cc
  2. 15
      include/rocksdb/write_buffer_manager.h

@ -25,23 +25,31 @@ MemTableAllocator::~MemTableAllocator() { DoneAllocating(); }
char* MemTableAllocator::Allocate(size_t bytes) { char* MemTableAllocator::Allocate(size_t bytes) {
assert(write_buffer_manager_ != nullptr); assert(write_buffer_manager_ != nullptr);
bytes_allocated_.fetch_add(bytes, std::memory_order_relaxed); if (write_buffer_manager_->enabled()) {
write_buffer_manager_->ReserveMem(bytes); bytes_allocated_.fetch_add(bytes, std::memory_order_relaxed);
write_buffer_manager_->ReserveMem(bytes);
}
return allocator_->Allocate(bytes); return allocator_->Allocate(bytes);
} }
char* MemTableAllocator::AllocateAligned(size_t bytes, size_t huge_page_size, char* MemTableAllocator::AllocateAligned(size_t bytes, size_t huge_page_size,
Logger* logger) { Logger* logger) {
assert(write_buffer_manager_ != nullptr); assert(write_buffer_manager_ != nullptr);
bytes_allocated_.fetch_add(bytes, std::memory_order_relaxed); if (write_buffer_manager_->enabled()) {
write_buffer_manager_->ReserveMem(bytes); bytes_allocated_.fetch_add(bytes, std::memory_order_relaxed);
write_buffer_manager_->ReserveMem(bytes);
}
return allocator_->AllocateAligned(bytes, huge_page_size, logger); return allocator_->AllocateAligned(bytes, huge_page_size, logger);
} }
void MemTableAllocator::DoneAllocating() { void MemTableAllocator::DoneAllocating() {
if (write_buffer_manager_ != nullptr) { if (write_buffer_manager_ != nullptr) {
write_buffer_manager_->FreeMem( if (write_buffer_manager_->enabled()) {
bytes_allocated_.load(std::memory_order_relaxed)); write_buffer_manager_->FreeMem(
bytes_allocated_.load(std::memory_order_relaxed));
} else {
assert(bytes_allocated_.load(std::memory_order_relaxed) == 0);
}
write_buffer_manager_ = nullptr; write_buffer_manager_ = nullptr;
} }
} }

@ -19,11 +19,16 @@ namespace rocksdb {
class WriteBufferManager { class WriteBufferManager {
public: public:
// _buffer_size = 0 indicates no limit. Memory won't be tracked,
// memory_usage() won't be valid and ShouldFlush() will always return true.
explicit WriteBufferManager(size_t _buffer_size) explicit WriteBufferManager(size_t _buffer_size)
: buffer_size_(_buffer_size), memory_used_(0) {} : buffer_size_(_buffer_size), memory_used_(0) {}
~WriteBufferManager() {} ~WriteBufferManager() {}
bool enabled() const { return buffer_size_ != 0; }
// Only valid if enabled()
size_t memory_usage() const { size_t memory_usage() const {
return memory_used_.load(std::memory_order_relaxed); return memory_used_.load(std::memory_order_relaxed);
} }
@ -31,15 +36,19 @@ class WriteBufferManager {
// Should only be called from write thread // Should only be called from write thread
bool ShouldFlush() const { bool ShouldFlush() const {
return buffer_size() > 0 && memory_usage() >= buffer_size(); return enabled() && memory_usage() >= buffer_size();
} }
// Should only be called from write thread // Should only be called from write thread
void ReserveMem(size_t mem) { void ReserveMem(size_t mem) {
memory_used_.fetch_add(mem, std::memory_order_relaxed); if (enabled()) {
memory_used_.fetch_add(mem, std::memory_order_relaxed);
}
} }
void FreeMem(size_t mem) { void FreeMem(size_t mem) {
memory_used_.fetch_sub(mem, std::memory_order_relaxed); if (enabled()) {
memory_used_.fetch_sub(mem, std::memory_order_relaxed);
}
} }
private: private:

Loading…
Cancel
Save