Improve write buffer manager (and allow the size to be tracked in block cache)

Summary:
Improve write buffer manager in several ways:
1. Size is tracked when arena block is allocated, rather than every allocation, so that it can better track actual memory usage and the tracking overhead is slightly lower.
2. We start to trigger memtable flush when 7/8 of the memory cap hits, instead of 100%, and make 100% much harder to hit.
3. Allow a cache object to be passed into buffer manager and the size allocated by memtable can be costed there. This can help users have one single memory cap across block cache and memtable.
Closes https://github.com/facebook/rocksdb/pull/2350

Differential Revision: D5110648

Pulled By: siying

fbshipit-source-id: b4238113094bf22574001e446b5d88523ba00017
main
Siying Dong 7 years ago committed by Facebook Github Bot
parent a4d9c02511
commit 95b0e89b5d
  1. 4
      CMakeLists.txt
  2. 2
      HISTORY.md
  3. 4
      Makefile
  4. 1328
      TARGETS
  5. 6
      db/db_dynamic_level_test.cc
  6. 6
      db/db_memtable_test.cc
  7. 24
      db/db_test.cc
  8. 148
      db/db_test2.cc
  9. 6
      db/db_test_util.h
  10. 29
      db/memtable.cc
  11. 6
      db/memtable.h
  12. 17
      include/rocksdb/memtablerep.h
  13. 47
      include/rocksdb/write_buffer_manager.h
  14. 46
      memtable/alloc_tracker.cc
  15. 7
      memtable/hash_cuckoo_rep.cc
  16. 2
      memtable/hash_cuckoo_rep.h
  17. 19
      memtable/hash_linklist_rep.cc
  18. 2
      memtable/hash_linklist_rep.h
  19. 8
      memtable/hash_skiplist_rep.cc
  20. 2
      memtable/hash_skiplist_rep.h
  21. 52
      memtable/memtable_allocator.h
  22. 3
      memtable/memtablerep_bench.cc
  23. 24
      memtable/skiplistrep.cc
  24. 19
      memtable/vectorrep.cc
  25. 125
      memtable/write_buffer_manager.cc
  26. 141
      memtable/write_buffer_manager_test.cc
  27. 4
      options/options_helper.h
  28. 6
      src.mk
  29. 8
      tools/db_bench_tool.cc
  30. 27
      util/allocator.h
  31. 32
      util/arena.cc
  32. 4
      util/arena.h
  33. 6
      util/arena_test.cc
  34. 6
      util/concurrent_arena.cc
  35. 1
      util/concurrent_arena.h

@ -382,12 +382,13 @@ set(SOURCES
env/env_chroot.cc
env/env_hdfs.cc
env/mock_env.cc
memtable/alloc_tracker.cc
memtable/hash_cuckoo_rep.cc
memtable/hash_linklist_rep.cc
memtable/hash_skiplist_rep.cc
memtable/memtable_allocator.cc
memtable/skiplistrep.cc
memtable/vectorrep.cc
memtable/write_buffer_manager.cc
monitoring/histogram.cc
monitoring/histogram_windowing.cc
monitoring/instrumented_mutex.cc
@ -666,6 +667,7 @@ set(TESTS
env/mock_env_test.cc
memtable/inlineskiplist_test.cc
memtable/skiplist_test.cc
memtable/write_buffer_manager_test.cc
monitoring/histogram_test.cc
monitoring/iostats_context_test.cc
monitoring/statistics_test.cc

@ -7,6 +7,8 @@
### New Features
* Change ticker/histogram statistics implementations to use core-local storage. This improves aggregation speed compared to our previous thread-local approach, particularly for applications with many threads.
* Users can pass a cache object to write buffer manager, so that they can cap memory usage for memtable and block cache using one single limit.
* Flush will be triggered when 7/8 of the limit introduced by write_buffer_manager or db_write_buffer_size is triggered, so that the hard threshold is hard to hit.
## 5.5.0 (05/17/2017)
### New Features

@ -399,6 +399,7 @@ TESTS = \
external_sst_file_test \
prefix_test \
skiplist_test \
write_buffer_manager_test \
stringappend_test \
ttl_test \
date_tiered_test \
@ -1222,6 +1223,9 @@ inlineskiplist_test: memtable/inlineskiplist_test.o $(LIBOBJECTS) $(TESTHARNESS)
skiplist_test: memtable/skiplist_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)
write_buffer_manager_test: memtable/write_buffer_manager_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)
version_edit_test: db/version_edit_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)

1328
TARGETS

File diff suppressed because it is too large Load Diff

@ -59,7 +59,6 @@ TEST_F(DBTestDynamicLevel, DynamicLevelMaxBytesBase) {
Options options;
options.env = env.get();
options.create_if_missing = true;
options.db_write_buffer_size = 2048;
options.write_buffer_size = 2048;
options.max_write_buffer_number = 2;
options.level0_file_num_compaction_trigger = 2;
@ -129,7 +128,6 @@ TEST_F(DBTestDynamicLevel, DynamicLevelMaxBytesBase2) {
Options options = CurrentOptions();
options.create_if_missing = true;
options.db_write_buffer_size = 204800;
options.write_buffer_size = 20480;
options.max_write_buffer_number = 2;
options.level0_file_num_compaction_trigger = 2;
@ -286,7 +284,6 @@ TEST_F(DBTestDynamicLevel, DynamicLevelMaxBytesCompactRange) {
Options options = CurrentOptions();
options.create_if_missing = true;
options.db_write_buffer_size = 2048;
options.write_buffer_size = 2048;
options.max_write_buffer_number = 2;
options.level0_file_num_compaction_trigger = 2;
@ -364,7 +361,6 @@ TEST_F(DBTestDynamicLevel, DynamicLevelMaxBytesCompactRange) {
TEST_F(DBTestDynamicLevel, DynamicLevelMaxBytesBaseInc) {
Options options = CurrentOptions();
options.create_if_missing = true;
options.db_write_buffer_size = 2048;
options.write_buffer_size = 2048;
options.max_write_buffer_number = 2;
options.level0_file_num_compaction_trigger = 2;
@ -377,6 +373,7 @@ TEST_F(DBTestDynamicLevel, DynamicLevelMaxBytesBaseInc) {
options.soft_rate_limit = 1.1;
options.max_background_compactions = 2;
options.num_levels = 5;
options.max_compaction_bytes = 100000000;
DestroyAndReopen(options);
@ -416,7 +413,6 @@ TEST_F(DBTestDynamicLevel, DISABLED_MigrateToDynamicLevelMaxBytesBase) {
Options options;
options.create_if_missing = true;
options.db_write_buffer_size = 2048;
options.write_buffer_size = 2048;
options.max_write_buffer_number = 8;
options.level0_file_num_compaction_trigger = 4;

@ -23,7 +23,7 @@ class DBMemTableTest : public DBTestBase {
class MockMemTableRep : public MemTableRep {
public:
explicit MockMemTableRep(MemTableAllocator* allocator, MemTableRep* rep)
explicit MockMemTableRep(Allocator* allocator, MemTableRep* rep)
: MemTableRep(allocator), rep_(rep), num_insert_with_hint_(0) {}
virtual KeyHandle Allocate(const size_t len, char** buf) override {
@ -74,7 +74,7 @@ class MockMemTableRep : public MemTableRep {
class MockMemTableRepFactory : public MemTableRepFactory {
public:
virtual MemTableRep* CreateMemTableRep(const MemTableRep::KeyComparator& cmp,
MemTableAllocator* allocator,
Allocator* allocator,
const SliceTransform* transform,
Logger* logger) override {
SkipListFactory factory;
@ -85,7 +85,7 @@ class MockMemTableRepFactory : public MemTableRepFactory {
}
virtual MemTableRep* CreateMemTableRep(const MemTableRep::KeyComparator& cmp,
MemTableAllocator* allocator,
Allocator* allocator,
const SliceTransform* transform,
Logger* logger,
uint32_t column_family_id) override {

@ -62,7 +62,6 @@
#include "util/file_reader_writer.h"
#include "util/filename.h"
#include "util/hash.h"
#include "util/logging.h"
#include "util/mutexlock.h"
#include "util/rate_limiter.h"
#include "util/string_util.h"
@ -3692,19 +3691,14 @@ TEST_F(DBTest, DynamicLevelCompressionPerLevel2) {
Random rnd(301);
Options options;
options.create_if_missing = true;
options.db_write_buffer_size = 6000;
options.write_buffer_size = 6000;
options.db_write_buffer_size = 6000000;
options.write_buffer_size = 600000;
options.max_write_buffer_number = 2;
options.level0_file_num_compaction_trigger = 2;
options.level0_slowdown_writes_trigger = 2;
options.level0_stop_writes_trigger = 2;
options.soft_pending_compaction_bytes_limit = 1024 * 1024;
// Use file size to distinguish levels
// L1: 10, L2: 20, L3 40, L4 80
// L0 is less than 30
options.target_file_size_base = 10;
options.target_file_size_multiplier = 2;
options.target_file_size_base = 20;
options.level_compaction_dynamic_level_bytes = true;
options.max_bytes_for_level_base = 200;
@ -3741,10 +3735,11 @@ TEST_F(DBTest, DynamicLevelCompressionPerLevel2) {
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
for (int i = 0; i < 100; i++) {
ASSERT_OK(Put(Key(keys[i]), RandomString(&rnd, 200)));
if (i % 25 == 0) {
dbfull()->TEST_WaitForFlushMemTable();
std::string value = RandomString(&rnd, 200);
ASSERT_OK(Put(Key(keys[i]), value));
if (i % 25 == 24) {
Flush();
dbfull()->TEST_WaitForCompact();
}
}
@ -3785,7 +3780,8 @@ TEST_F(DBTest, DynamicLevelCompressionPerLevel2) {
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
for (int i = 101; i < 500; i++) {
ASSERT_OK(Put(Key(keys[i]), RandomString(&rnd, 200)));
std::string value = RandomString(&rnd, 200);
ASSERT_OK(Put(Key(keys[i]), value));
if (i % 100 == 99) {
Flush();
dbfull()->TEST_WaitForCompact();

@ -167,20 +167,48 @@ TEST_F(DBTest2, MaxSuccessiveMergesChangeWithDBRecovery) {
#ifndef ROCKSDB_LITE
class DBTestSharedWriteBufferAcrossCFs
: public DBTestBase,
public testing::WithParamInterface<bool> {
public testing::WithParamInterface<std::tuple<bool, bool>> {
public:
DBTestSharedWriteBufferAcrossCFs()
: DBTestBase("/db_test_shared_write_buffer") {}
void SetUp() override { use_old_interface_ = GetParam(); }
void SetUp() override {
use_old_interface_ = std::get<0>(GetParam());
cost_cache_ = std::get<1>(GetParam());
}
bool use_old_interface_;
bool cost_cache_;
};
TEST_P(DBTestSharedWriteBufferAcrossCFs, SharedWriteBufferAcrossCFs) {
Options options = CurrentOptions();
options.arena_block_size = 4096;
// Avoid undeterministic value by malloc_usable_size();
// Force arena block size to 1
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"Arena::Arena:0", [&](void* arg) {
size_t* block_size = static_cast<size_t*>(arg);
*block_size = 1;
});
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"Arena::AllocateNewBlock:0", [&](void* arg) {
std::pair<size_t*, size_t*>* pair =
static_cast<std::pair<size_t*, size_t*>*>(arg);
*std::get<0>(*pair) = *std::get<1>(*pair);
});
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
// The total soft write buffer size is about 105000
std::shared_ptr<Cache> cache = NewLRUCache(4 * 1024 * 1024, 2);
ASSERT_LT(cache->GetUsage(), 1024 * 1024);
if (use_old_interface_) {
options.db_write_buffer_size = 100000; // this is the real limit
options.db_write_buffer_size = 120000; // this is the real limit
} else if (!cost_cache_) {
options.write_buffer_manager.reset(new WriteBufferManager(114285));
} else {
options.write_buffer_manager.reset(new WriteBufferManager(100000));
options.write_buffer_manager.reset(new WriteBufferManager(114285, cache));
}
options.write_buffer_size = 500000; // this is never hit
CreateAndReopenWithCF({"pikachu", "dobrynia", "nikitich"}, options);
@ -188,6 +216,13 @@ TEST_P(DBTestSharedWriteBufferAcrossCFs, SharedWriteBufferAcrossCFs) {
WriteOptions wo;
wo.disableWAL = true;
std::function<void()> wait_flush = [&]() {
dbfull()->TEST_WaitForFlushMemTable(handles_[0]);
dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
dbfull()->TEST_WaitForFlushMemTable(handles_[2]);
dbfull()->TEST_WaitForFlushMemTable(handles_[3]);
};
// Create some data and flush "default" and "nikitich" so that they
// are newer CFs created.
ASSERT_OK(Put(3, Key(1), DummyString(1), wo));
@ -201,13 +236,20 @@ TEST_P(DBTestSharedWriteBufferAcrossCFs, SharedWriteBufferAcrossCFs) {
static_cast<uint64_t>(1));
ASSERT_OK(Put(3, Key(1), DummyString(30000), wo));
if (cost_cache_) {
ASSERT_GE(cache->GetUsage(), 1024 * 1024);
ASSERT_LE(cache->GetUsage(), 2 * 1024 * 1024);
}
wait_flush();
ASSERT_OK(Put(0, Key(1), DummyString(60000), wo));
if (cost_cache_) {
ASSERT_GE(cache->GetUsage(), 1024 * 1024);
ASSERT_LE(cache->GetUsage(), 2 * 1024 * 1024);
}
wait_flush();
ASSERT_OK(Put(2, Key(1), DummyString(1), wo));
// No flush should trigger
dbfull()->TEST_WaitForFlushMemTable(handles_[0]);
dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
dbfull()->TEST_WaitForFlushMemTable(handles_[2]);
dbfull()->TEST_WaitForFlushMemTable(handles_[3]);
wait_flush();
{
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"),
static_cast<uint64_t>(1));
@ -221,11 +263,9 @@ TEST_P(DBTestSharedWriteBufferAcrossCFs, SharedWriteBufferAcrossCFs) {
// Trigger a flush. Flushing "nikitich".
ASSERT_OK(Put(3, Key(2), DummyString(30000), wo));
wait_flush();
ASSERT_OK(Put(0, Key(1), DummyString(1), wo));
dbfull()->TEST_WaitForFlushMemTable(handles_[0]);
dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
dbfull()->TEST_WaitForFlushMemTable(handles_[2]);
dbfull()->TEST_WaitForFlushMemTable(handles_[3]);
wait_flush();
{
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"),
static_cast<uint64_t>(1));
@ -239,12 +279,11 @@ TEST_P(DBTestSharedWriteBufferAcrossCFs, SharedWriteBufferAcrossCFs) {
// Without hitting the threshold, no flush should trigger.
ASSERT_OK(Put(2, Key(1), DummyString(30000), wo));
wait_flush();
ASSERT_OK(Put(2, Key(1), DummyString(1), wo));
wait_flush();
ASSERT_OK(Put(2, Key(1), DummyString(1), wo));
dbfull()->TEST_WaitForFlushMemTable(handles_[0]);
dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
dbfull()->TEST_WaitForFlushMemTable(handles_[2]);
dbfull()->TEST_WaitForFlushMemTable(handles_[3]);
wait_flush();
{
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"),
static_cast<uint64_t>(1));
@ -259,14 +298,15 @@ TEST_P(DBTestSharedWriteBufferAcrossCFs, SharedWriteBufferAcrossCFs) {
// Hit the write buffer limit again. "default"
// will have been flushed.
ASSERT_OK(Put(2, Key(2), DummyString(10000), wo));
wait_flush();
ASSERT_OK(Put(3, Key(1), DummyString(1), wo));
wait_flush();
ASSERT_OK(Put(0, Key(1), DummyString(1), wo));
wait_flush();
ASSERT_OK(Put(0, Key(1), DummyString(1), wo));
wait_flush();
ASSERT_OK(Put(0, Key(1), DummyString(1), wo));
dbfull()->TEST_WaitForFlushMemTable(handles_[0]);
dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
dbfull()->TEST_WaitForFlushMemTable(handles_[2]);
dbfull()->TEST_WaitForFlushMemTable(handles_[3]);
wait_flush();
{
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"),
static_cast<uint64_t>(2));
@ -281,13 +321,14 @@ TEST_P(DBTestSharedWriteBufferAcrossCFs, SharedWriteBufferAcrossCFs) {
// Trigger another flush. This time "dobrynia". "pikachu" should not
// be flushed, althrough it was never flushed.
ASSERT_OK(Put(1, Key(1), DummyString(1), wo));
wait_flush();
ASSERT_OK(Put(2, Key(1), DummyString(80000), wo));
wait_flush();
ASSERT_OK(Put(1, Key(1), DummyString(1), wo));
wait_flush();
ASSERT_OK(Put(2, Key(1), DummyString(1), wo));
dbfull()->TEST_WaitForFlushMemTable(handles_[0]);
dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
dbfull()->TEST_WaitForFlushMemTable(handles_[2]);
dbfull()->TEST_WaitForFlushMemTable(handles_[3]);
wait_flush();
{
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"),
static_cast<uint64_t>(2));
@ -298,16 +339,45 @@ TEST_P(DBTestSharedWriteBufferAcrossCFs, SharedWriteBufferAcrossCFs) {
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "nikitich"),
static_cast<uint64_t>(2));
}
if (cost_cache_) {
ASSERT_GE(cache->GetUsage(), 1024 * 1024);
Close();
options.write_buffer_manager.reset();
ASSERT_LT(cache->GetUsage(), 1024 * 1024);
}
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
}
INSTANTIATE_TEST_CASE_P(DBTestSharedWriteBufferAcrossCFs,
DBTestSharedWriteBufferAcrossCFs, ::testing::Bool());
DBTestSharedWriteBufferAcrossCFs,
::testing::Values(std::make_tuple(true, false),
std::make_tuple(false, false),
std::make_tuple(false, true)));
TEST_F(DBTest2, SharedWriteBufferLimitAcrossDB) {
std::string dbname2 = test::TmpDir(env_) + "/db_shared_wb_db2";
Options options = CurrentOptions();
options.arena_block_size = 4096;
// Avoid undeterministic value by malloc_usable_size();
// Force arena block size to 1
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"Arena::Arena:0", [&](void* arg) {
size_t* block_size = static_cast<size_t*>(arg);
*block_size = 1;
});
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"Arena::AllocateNewBlock:0", [&](void* arg) {
std::pair<size_t*, size_t*>* pair =
static_cast<std::pair<size_t*, size_t*>*>(arg);
*std::get<0>(*pair) = *std::get<1>(*pair);
});
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
options.write_buffer_size = 500000; // this is never hit
options.write_buffer_manager.reset(new WriteBufferManager(100000));
// Use a write buffer total size so that the soft limit is about
// 105000.
options.write_buffer_manager.reset(new WriteBufferManager(120000));
CreateAndReopenWithCF({"cf1", "cf2"}, options);
ASSERT_OK(DestroyDB(dbname2, options));
@ -317,17 +387,25 @@ TEST_F(DBTest2, SharedWriteBufferLimitAcrossDB) {
WriteOptions wo;
wo.disableWAL = true;
std::function<void()> wait_flush = [&]() {
dbfull()->TEST_WaitForFlushMemTable(handles_[0]);
dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
dbfull()->TEST_WaitForFlushMemTable(handles_[2]);
static_cast<DBImpl*>(db2)->TEST_WaitForFlushMemTable();
};
// Trigger a flush on cf2
ASSERT_OK(Put(2, Key(1), DummyString(70000), wo));
wait_flush();
ASSERT_OK(Put(0, Key(1), DummyString(20000), wo));
wait_flush();
// Insert to DB2
ASSERT_OK(db2->Put(wo, Key(2), DummyString(20000)));
wait_flush();
ASSERT_OK(Put(2, Key(1), DummyString(1), wo));
dbfull()->TEST_WaitForFlushMemTable(handles_[0]);
dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
dbfull()->TEST_WaitForFlushMemTable(handles_[2]);
wait_flush();
static_cast<DBImpl*>(db2)->TEST_WaitForFlushMemTable();
{
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default") +
@ -340,10 +418,9 @@ TEST_F(DBTest2, SharedWriteBufferLimitAcrossDB) {
// Triggering to flush another CF in DB1
ASSERT_OK(db2->Put(wo, Key(2), DummyString(70000)));
wait_flush();
ASSERT_OK(Put(2, Key(1), DummyString(1), wo));
dbfull()->TEST_WaitForFlushMemTable(handles_[0]);
dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
dbfull()->TEST_WaitForFlushMemTable(handles_[2]);
wait_flush();
{
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"),
static_cast<uint64_t>(1));
@ -357,10 +434,9 @@ TEST_F(DBTest2, SharedWriteBufferLimitAcrossDB) {
// Triggering flush in DB2.
ASSERT_OK(db2->Put(wo, Key(3), DummyString(40000)));
wait_flush();
ASSERT_OK(db2->Put(wo, Key(1), DummyString(1)));
dbfull()->TEST_WaitForFlushMemTable(handles_[0]);
dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
dbfull()->TEST_WaitForFlushMemTable(handles_[2]);
wait_flush();
static_cast<DBImpl*>(db2)->TEST_WaitForFlushMemTable();
{
ASSERT_EQ(GetNumberOfSstFilesForColumnFamily(db_, "default"),
@ -375,6 +451,8 @@ TEST_F(DBTest2, SharedWriteBufferLimitAcrossDB) {
delete db2;
ASSERT_OK(DestroyDB(dbname2, options));
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
}
namespace {

@ -123,8 +123,8 @@ enum SkipPolicy { kSkipNone = 0, kSkipNoSnapshot = 1, kSkipNoPrefix = 2 };
// A hacky skip list mem table that triggers flush after number of entries.
class SpecialMemTableRep : public MemTableRep {
public:
explicit SpecialMemTableRep(MemTableAllocator* allocator,
MemTableRep* memtable, int num_entries_flush)
explicit SpecialMemTableRep(Allocator* allocator, MemTableRep* memtable,
int num_entries_flush)
: MemTableRep(allocator),
memtable_(memtable),
num_entries_flush_(num_entries_flush),
@ -186,7 +186,7 @@ class SpecialSkipListFactory : public MemTableRepFactory {
using MemTableRepFactory::CreateMemTableRep;
virtual MemTableRep* CreateMemTableRep(
const MemTableRep::KeyComparator& compare, MemTableAllocator* allocator,
const MemTableRep::KeyComparator& compare, Allocator* allocator,
const SliceTransform* transform, Logger* logger) override {
return new SpecialMemTableRep(
allocator, factory_.CreateMemTableRep(compare, allocator, transform, 0),

@ -13,7 +13,6 @@
#include <memory>
#include <algorithm>
#include <limits>
#include "db/dbformat.h"
#include "db/merge_context.h"
@ -37,7 +36,6 @@
#include "util/memory_usage.h"
#include "util/murmurhash.h"
#include "util/mutexlock.h"
#include "util/stop_watch.h"
namespace rocksdb {
@ -68,14 +66,18 @@ MemTable::MemTable(const InternalKeyComparator& cmp,
moptions_(ioptions, mutable_cf_options),
refs_(0),
kArenaBlockSize(OptimizeBlockSize(moptions_.arena_block_size)),
arena_(moptions_.arena_block_size,
mutable_cf_options.memtable_huge_page_size),
allocator_(&arena_, write_buffer_manager),
mem_tracker_(write_buffer_manager),
arena_(
moptions_.arena_block_size,
(write_buffer_manager != nullptr && write_buffer_manager->enabled())
? &mem_tracker_
: nullptr,
mutable_cf_options.memtable_huge_page_size),
table_(ioptions.memtable_factory->CreateMemTableRep(
comparator_, &allocator_, ioptions.prefix_extractor,
ioptions.info_log, column_family_id)),
comparator_, &arena_, ioptions.prefix_extractor, ioptions.info_log,
column_family_id)),
range_del_table_(SkipListFactory().CreateMemTableRep(
comparator_, &allocator_, nullptr /* transform */, ioptions.info_log,
comparator_, &arena_, nullptr /* transform */, ioptions.info_log,
column_family_id)),
is_range_del_table_empty_(true),
data_size_(0),
@ -103,13 +105,16 @@ MemTable::MemTable(const InternalKeyComparator& cmp,
if (prefix_extractor_ && moptions_.memtable_prefix_bloom_bits > 0) {
prefix_bloom_.reset(new DynamicBloom(
&allocator_, moptions_.memtable_prefix_bloom_bits,
ioptions.bloom_locality, 6 /* hard coded 6 probes */, nullptr,
moptions_.memtable_huge_page_size, ioptions.info_log));
&arena_, moptions_.memtable_prefix_bloom_bits, ioptions.bloom_locality,
6 /* hard coded 6 probes */, nullptr, moptions_.memtable_huge_page_size,
ioptions.info_log));
}
}
MemTable::~MemTable() { assert(refs_ == 0); }
MemTable::~MemTable() {
mem_tracker_.FreeMem();
assert(refs_ == 0);
}
size_t MemTable::ApproximateMemoryUsage() {
autovector<size_t> usages = {arena_.ApproximateMemoryUsage(),

@ -20,12 +20,12 @@
#include "db/dbformat.h"
#include "db/range_del_aggregator.h"
#include "db/version_edit.h"
#include "memtable/memtable_allocator.h"
#include "monitoring/instrumented_mutex.h"
#include "options/cf_options.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/memtablerep.h"
#include "util/allocator.h"
#include "util/concurrent_arena.h"
#include "util/dynamic_bloom.h"
#include "util/hash.h"
@ -319,7 +319,7 @@ class MemTable {
// write anything to this MemTable(). (Ie. do not call Add() or Update()).
void MarkImmutable() {
table_->MarkReadOnly();
allocator_.DoneAllocating();
mem_tracker_.DoneAllocating();
}
// return true if the current MemTableRep supports merge operator.
@ -361,8 +361,8 @@ class MemTable {
const MemTableOptions moptions_;
int refs_;
const size_t kArenaBlockSize;
AllocTracker mem_tracker_;
ConcurrentArena arena_;
MemTableAllocator allocator_;
unique_ptr<MemTableRep> table_;
unique_ptr<MemTableRep> range_del_table_;
bool is_range_del_table_empty_;

@ -43,7 +43,7 @@
namespace rocksdb {
class Arena;
class MemTableAllocator;
class Allocator;
class LookupKey;
class Slice;
class SliceTransform;
@ -68,7 +68,7 @@ class MemTableRep {
virtual ~KeyComparator() { }
};
explicit MemTableRep(MemTableAllocator* allocator) : allocator_(allocator) {}
explicit MemTableRep(Allocator* allocator) : allocator_(allocator) {}
// Allocate a buf of len size for storing key. The idea is that a
// specific memtable representation knows its underlying data structure
@ -208,7 +208,7 @@ class MemTableRep {
// user key.
virtual Slice UserKey(const char* key) const;
MemTableAllocator* allocator_;
Allocator* allocator_;
};
// This is the base class for all factories that are used by RocksDB to create
@ -218,11 +218,10 @@ class MemTableRepFactory {
virtual ~MemTableRepFactory() {}
virtual MemTableRep* CreateMemTableRep(const MemTableRep::KeyComparator&,
MemTableAllocator*,
const SliceTransform*,
Allocator*, const SliceTransform*,
Logger* logger) = 0;
virtual MemTableRep* CreateMemTableRep(
const MemTableRep::KeyComparator& key_cmp, MemTableAllocator* allocator,
const MemTableRep::KeyComparator& key_cmp, Allocator* allocator,
const SliceTransform* slice_transform, Logger* logger,
uint32_t /* column_family_id */) {
return CreateMemTableRep(key_cmp, allocator, slice_transform, logger);
@ -248,8 +247,7 @@ class SkipListFactory : public MemTableRepFactory {
using MemTableRepFactory::CreateMemTableRep;
virtual MemTableRep* CreateMemTableRep(const MemTableRep::KeyComparator&,
MemTableAllocator*,
const SliceTransform*,
Allocator*, const SliceTransform*,
Logger* logger) override;
virtual const char* Name() const override { return "SkipListFactory"; }
@ -276,8 +274,7 @@ class VectorRepFactory : public MemTableRepFactory {
using MemTableRepFactory::CreateMemTableRep;
virtual MemTableRep* CreateMemTableRep(const MemTableRep::KeyComparator&,
MemTableAllocator*,
const SliceTransform*,
Allocator*, const SliceTransform*,
Logger* logger) override;
virtual const char* Name() const override {

@ -16,17 +16,19 @@
#include <atomic>
#include <cstddef>
#include "rocksdb/cache.h"
namespace rocksdb {
class WriteBufferManager {
public:
// _buffer_size = 0 indicates no limit. Memory won't be tracked,
// _buffer_size = 0 indicates no limit. Memory won't be capped.
// memory_usage() won't be valid and ShouldFlush() will always return true.
explicit WriteBufferManager(size_t _buffer_size)
: buffer_size_(_buffer_size), memory_used_(0) {}
~WriteBufferManager() {}
// if `cache` is provided, we'll put dummy entries in the cache and cost
// the memory allocated to the cache. It can be used even if _buffer_size = 0.
explicit WriteBufferManager(size_t _buffer_size,
std::shared_ptr<Cache> cache = {});
~WriteBufferManager();
bool enabled() const { return buffer_size_ != 0; }
@ -34,21 +36,41 @@ class WriteBufferManager {
size_t memory_usage() const {
return memory_used_.load(std::memory_order_relaxed);
}
size_t mutable_memtable_memory_usage() const {
return memory_active_.load(std::memory_order_relaxed);
}
size_t buffer_size() const { return buffer_size_; }
// Should only be called from write thread
bool ShouldFlush() const {
return enabled() && memory_usage() >= buffer_size();
// Flush if memory usage hits a hard limit, or total size that hasn't been
// scheduled to free hits a soft limit, which is 7/8 of the hard limit.
return enabled() &&
(memory_usage() >= buffer_size() ||
mutable_memtable_memory_usage() >= buffer_size() / 8 * 7);
}
// Should only be called from write thread
void ReserveMem(size_t mem) {
if (enabled()) {
if (cache_rep_ != nullptr) {
ReserveMemWithCache(mem);
} else if (enabled()) {
memory_used_.fetch_add(mem, std::memory_order_relaxed);
}
if (enabled()) {
memory_active_.fetch_add(mem, std::memory_order_relaxed);
}
}
void FreeMem(size_t mem) {
// We are in the process of freeing `mem` bytes, so it is not considered
// when checking the soft limit.
void ScheduleFreeMem(size_t mem) {
if (enabled()) {
memory_active_.fetch_sub(mem, std::memory_order_relaxed);
}
}
void FreeMem(size_t mem) {
if (cache_rep_ != nullptr) {
FreeMemWithCache(mem);
} else if (enabled()) {
memory_used_.fetch_sub(mem, std::memory_order_relaxed);
}
}
@ -56,6 +78,13 @@ class WriteBufferManager {
private:
const size_t buffer_size_;
std::atomic<size_t> memory_used_;
// Memory that hasn't been scheduled to free.
std::atomic<size_t> memory_active_;
struct CacheRep;
std::unique_ptr<CacheRep> cache_rep_;
void ReserveMemWithCache(size_t mem);
void FreeMemWithCache(size_t mem);
// No copying allowed
WriteBufferManager(const WriteBufferManager&) = delete;

@ -9,53 +9,53 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "memtable/memtable_allocator.h"
#include <assert.h>
#include "rocksdb/write_buffer_manager.h"
#include "util/allocator.h"
#include "util/arena.h"
namespace rocksdb {
MemTableAllocator::MemTableAllocator(Allocator* allocator,
WriteBufferManager* write_buffer_manager)
: allocator_(allocator),
write_buffer_manager_(write_buffer_manager),
bytes_allocated_(0) {}
AllocTracker::AllocTracker(WriteBufferManager* write_buffer_manager)
: write_buffer_manager_(write_buffer_manager),
bytes_allocated_(0),
done_allocating_(false),
freed_(false) {}
MemTableAllocator::~MemTableAllocator() { DoneAllocating(); }
AllocTracker::~AllocTracker() { FreeMem(); }
char* MemTableAllocator::Allocate(size_t bytes) {
void AllocTracker::Allocate(size_t bytes) {
assert(write_buffer_manager_ != nullptr);
if (write_buffer_manager_->enabled()) {
bytes_allocated_.fetch_add(bytes, std::memory_order_relaxed);
write_buffer_manager_->ReserveMem(bytes);
}
return allocator_->Allocate(bytes);
}
char* MemTableAllocator::AllocateAligned(size_t bytes, size_t huge_page_size,
Logger* logger) {
assert(write_buffer_manager_ != nullptr);
if (write_buffer_manager_->enabled()) {
bytes_allocated_.fetch_add(bytes, std::memory_order_relaxed);
write_buffer_manager_->ReserveMem(bytes);
void AllocTracker::DoneAllocating() {
if (write_buffer_manager_ != nullptr && !done_allocating_) {
if (write_buffer_manager_->enabled()) {
write_buffer_manager_->ScheduleFreeMem(
bytes_allocated_.load(std::memory_order_relaxed));
} else {
assert(bytes_allocated_.load(std::memory_order_relaxed) == 0);
}
done_allocating_ = true;
}
return allocator_->AllocateAligned(bytes, huge_page_size, logger);
}
void MemTableAllocator::DoneAllocating() {
if (write_buffer_manager_ != nullptr) {
void AllocTracker::FreeMem() {
if (!done_allocating_) {
DoneAllocating();
}
if (write_buffer_manager_ != nullptr && !freed_) {
if (write_buffer_manager_->enabled()) {
write_buffer_manager_->FreeMem(
bytes_allocated_.load(std::memory_order_relaxed));
} else {
assert(bytes_allocated_.load(std::memory_order_relaxed) == 0);
}
write_buffer_manager_ = nullptr;
freed_ = true;
}
}
size_t MemTableAllocator::BlockSize() const { return allocator_->BlockSize(); }
} // namespace rocksdb

@ -61,8 +61,7 @@ struct CuckooStep {
class HashCuckooRep : public MemTableRep {
public:
explicit HashCuckooRep(const MemTableRep::KeyComparator& compare,
MemTableAllocator* allocator,
const size_t bucket_count,
Allocator* allocator, const size_t bucket_count,
const unsigned int hash_func_count,
const size_t approximate_entry_size)
: MemTableRep(allocator),
@ -198,7 +197,7 @@ class HashCuckooRep : public MemTableRep {
private:
const MemTableRep::KeyComparator& compare_;
// the pointer to Allocator to allocate memory, immutable after construction.
MemTableAllocator* const allocator_;
Allocator* const allocator_;
// the number of hash bucket in the hash table.
const size_t bucket_count_;
// approximate size of each entry
@ -625,7 +624,7 @@ void HashCuckooRep::Iterator::SeekToLast() {
} // anom namespace
MemTableRep* HashCuckooRepFactory::CreateMemTableRep(
const MemTableRep::KeyComparator& compare, MemTableAllocator* allocator,
const MemTableRep::KeyComparator& compare, Allocator* allocator,
const SliceTransform* transform, Logger* logger) {
// The estimated average fullness. The write performance of any close hash
// degrades as the fullness of the mem-table increases. Setting kFullness

@ -30,7 +30,7 @@ class HashCuckooRepFactory : public MemTableRepFactory {
using MemTableRepFactory::CreateMemTableRep;
virtual MemTableRep* CreateMemTableRep(
const MemTableRep::KeyComparator& compare, MemTableAllocator* allocator,
const MemTableRep::KeyComparator& compare, Allocator* allocator,
const SliceTransform* transform, Logger* logger) override;
virtual const char* Name() const override { return "HashCuckooRepFactory"; }

@ -58,7 +58,7 @@ struct SkipListBucketHeader {
MemtableSkipList skip_list;
explicit SkipListBucketHeader(const MemTableRep::KeyComparator& cmp,
MemTableAllocator* allocator, uint32_t count)
Allocator* allocator, uint32_t count)
: Counting_header(this, // Pointing to itself to indicate header type.
count),
skip_list(cmp, allocator) {}
@ -164,7 +164,7 @@ struct Node {
class HashLinkListRep : public MemTableRep {
public:
HashLinkListRep(const MemTableRep::KeyComparator& compare,
MemTableAllocator* allocator, const SliceTransform* transform,
Allocator* allocator, const SliceTransform* transform,
size_t bucket_size, uint32_t threshold_use_skiplist,
size_t huge_page_tlb_size, Logger* logger,
int bucket_entries_logging_threshold,
@ -496,14 +496,11 @@ class HashLinkListRep : public MemTableRep {
};
};
HashLinkListRep::HashLinkListRep(const MemTableRep::KeyComparator& compare,
MemTableAllocator* allocator,
const SliceTransform* transform,
size_t bucket_size,
uint32_t threshold_use_skiplist,
size_t huge_page_tlb_size, Logger* logger,
int bucket_entries_logging_threshold,
bool if_log_bucket_dist_when_flash)
HashLinkListRep::HashLinkListRep(
const MemTableRep::KeyComparator& compare, Allocator* allocator,
const SliceTransform* transform, size_t bucket_size,
uint32_t threshold_use_skiplist, size_t huge_page_tlb_size, Logger* logger,
int bucket_entries_logging_threshold, bool if_log_bucket_dist_when_flash)
: MemTableRep(allocator),
bucket_size_(bucket_size),
// Threshold to use skip list doesn't make sense if less than 3, so we
@ -831,7 +828,7 @@ Node* HashLinkListRep::FindGreaterOrEqualInBucket(Node* head,
} // anon namespace
MemTableRep* HashLinkListRepFactory::CreateMemTableRep(
const MemTableRep::KeyComparator& compare, MemTableAllocator* allocator,
const MemTableRep::KeyComparator& compare, Allocator* allocator,
const SliceTransform* transform, Logger* logger) {
return new HashLinkListRep(compare, allocator, transform, bucket_count_,
threshold_use_skiplist_, huge_page_tlb_size_,

@ -30,7 +30,7 @@ class HashLinkListRepFactory : public MemTableRepFactory {
using MemTableRepFactory::CreateMemTableRep;
virtual MemTableRep* CreateMemTableRep(
const MemTableRep::KeyComparator& compare, MemTableAllocator* allocator,
const MemTableRep::KeyComparator& compare, Allocator* allocator,
const SliceTransform* transform, Logger* logger) override;
virtual const char* Name() const override {

@ -26,7 +26,7 @@ namespace {
class HashSkipListRep : public MemTableRep {
public:
HashSkipListRep(const MemTableRep::KeyComparator& compare,
MemTableAllocator* allocator, const SliceTransform* transform,
Allocator* allocator, const SliceTransform* transform,
size_t bucket_size, int32_t skiplist_height,
int32_t skiplist_branching_factor);
@ -65,7 +65,7 @@ class HashSkipListRep : public MemTableRep {
const MemTableRep::KeyComparator& compare_;
// immutable after construction
MemTableAllocator* const allocator_;
Allocator* const allocator_;
inline size_t GetHash(const Slice& slice) const {
return MurmurHash(slice.data(), static_cast<int>(slice.size()), 0) %
@ -233,7 +233,7 @@ class HashSkipListRep : public MemTableRep {
};
HashSkipListRep::HashSkipListRep(const MemTableRep::KeyComparator& compare,
MemTableAllocator* allocator,
Allocator* allocator,
const SliceTransform* transform,
size_t bucket_size, int32_t skiplist_height,
int32_t skiplist_branching_factor)
@ -336,7 +336,7 @@ MemTableRep::Iterator* HashSkipListRep::GetDynamicPrefixIterator(Arena* arena) {
} // anon namespace
MemTableRep* HashSkipListRepFactory::CreateMemTableRep(
const MemTableRep::KeyComparator& compare, MemTableAllocator* allocator,
const MemTableRep::KeyComparator& compare, Allocator* allocator,
const SliceTransform* transform, Logger* logger) {
return new HashSkipListRep(compare, allocator, transform, bucket_count_,
skiplist_height_, skiplist_branching_factor_);

@ -27,7 +27,7 @@ class HashSkipListRepFactory : public MemTableRepFactory {
using MemTableRepFactory::CreateMemTableRep;
virtual MemTableRep* CreateMemTableRep(
const MemTableRep::KeyComparator& compare, MemTableAllocator* allocator,
const MemTableRep::KeyComparator& compare, Allocator* allocator,
const SliceTransform* transform, Logger* logger) override;
virtual const char* Name() const override {

@ -1,52 +0,0 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
// This source code is also licensed under the GPLv2 license found in the
// COPYING file in the root directory of this source tree.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
//
// This is used by the MemTable to allocate write buffer memory. It connects
// to WriteBufferManager so we can track and enforce overall write buffer
// limits.
#pragma once
#include <atomic>
#include "rocksdb/write_buffer_manager.h"
#include "util/allocator.h"
namespace rocksdb {
class Logger;
class MemTableAllocator : public Allocator {
public:
explicit MemTableAllocator(Allocator* allocator,
WriteBufferManager* write_buffer_manager);
~MemTableAllocator();
// Allocator interface
char* Allocate(size_t bytes) override;
char* AllocateAligned(size_t bytes, size_t huge_page_size = 0,
Logger* logger = nullptr) override;
size_t BlockSize() const override;
// Call when we're finished allocating memory so we can free it from
// the write buffer's limit.
void DoneAllocating();
private:
Allocator* allocator_;
WriteBufferManager* write_buffer_manager_;
std::atomic<size_t> bytes_allocated_;
// No copying allowed
MemTableAllocator(const MemTableAllocator&);
void operator=(const MemTableAllocator&);
};
} // namespace rocksdb

@ -627,11 +627,10 @@ int main(int argc, char** argv) {
rocksdb::MemTable::KeyComparator key_comp(internal_key_comp);
rocksdb::Arena arena;
rocksdb::WriteBufferManager wb(FLAGS_write_buffer_size);
rocksdb::MemTableAllocator memtable_allocator(&arena, &wb);
uint64_t sequence;
auto createMemtableRep = [&] {
sequence = 0;
return factory->CreateMemTableRep(key_comp, &memtable_allocator,
return factory->CreateMemTableRep(key_comp, &arena,
options.prefix_extractor.get(),
options.info_log.get());
};

@ -20,16 +20,18 @@ class SkipListRep : public MemTableRep {
friend class LookaheadIterator;
public:
explicit SkipListRep(const MemTableRep::KeyComparator& compare,
MemTableAllocator* allocator,
const SliceTransform* transform, const size_t lookahead)
: MemTableRep(allocator), skip_list_(compare, allocator), cmp_(compare),
transform_(transform), lookahead_(lookahead) {
}
virtual KeyHandle Allocate(const size_t len, char** buf) override {
*buf = skip_list_.AllocateKey(len);
return static_cast<KeyHandle>(*buf);
explicit SkipListRep(const MemTableRep::KeyComparator& compare,
Allocator* allocator, const SliceTransform* transform,
const size_t lookahead)
: MemTableRep(allocator),
skip_list_(compare, allocator),
cmp_(compare),
transform_(transform),
lookahead_(lookahead) {}
virtual KeyHandle Allocate(const size_t len, char** buf) override {
*buf = skip_list_.AllocateKey(len);
return static_cast<KeyHandle>(*buf);
}
// Insert key into the list.
@ -269,7 +271,7 @@ public:
}
MemTableRep* SkipListFactory::CreateMemTableRep(
const MemTableRep::KeyComparator& compare, MemTableAllocator* allocator,
const MemTableRep::KeyComparator& compare, Allocator* allocator,
const SliceTransform* transform, Logger* logger) {
return new SkipListRep(compare, allocator, transform, lookahead_);
}

@ -27,8 +27,7 @@ using namespace stl_wrappers;
class VectorRep : public MemTableRep {
public:
VectorRep(const KeyComparator& compare, MemTableAllocator* allocator,
size_t count);
VectorRep(const KeyComparator& compare, Allocator* allocator, size_t count);
// Insert key into the collection. (The caller will pack key and value into a
// single buffer and pass that in as the parameter to Insert)
@ -138,13 +137,15 @@ size_t VectorRep::ApproximateMemoryUsage() {
);
}
VectorRep::VectorRep(const KeyComparator& compare, MemTableAllocator* allocator,
VectorRep::VectorRep(const KeyComparator& compare, Allocator* allocator,
size_t count)
: MemTableRep(allocator),
bucket_(new Bucket()),
immutable_(false),
sorted_(false),
compare_(compare) { bucket_.get()->reserve(count); }
: MemTableRep(allocator),
bucket_(new Bucket()),
immutable_(false),
sorted_(false),
compare_(compare) {
bucket_.get()->reserve(count);
}
VectorRep::Iterator::Iterator(class VectorRep* vrep,
std::shared_ptr<std::vector<const char*>> bucket,
@ -296,7 +297,7 @@ MemTableRep::Iterator* VectorRep::GetIterator(Arena* arena) {
} // anon namespace
MemTableRep* VectorRepFactory::CreateMemTableRep(
const MemTableRep::KeyComparator& compare, MemTableAllocator* allocator,
const MemTableRep::KeyComparator& compare, Allocator* allocator,
const SliceTransform*, Logger* logger) {
return new VectorRep(compare, allocator, count_);
}

@ -0,0 +1,125 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
// This source code is also licensed under the GPLv2 license found in the
// COPYING file in the root directory of this source tree.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "rocksdb/write_buffer_manager.h"
#include <mutex>
#include "util/coding.h"
namespace rocksdb {
#ifndef ROCKSDB_LITE
namespace {
const size_t kSizeDummyEntry = 1024 * 1024;
// The key will be longer than keys for blocks in SST files so they won't
// conflict.
const size_t kCacheKeyPrefix = kMaxVarint64Length * 4 + 1;
} // namespace
struct WriteBufferManager::CacheRep {
std::shared_ptr<Cache> cache_;
std::mutex cache_mutex_;
std::atomic<size_t> cache_allocated_size_;
// The non-prefix part will be updated according to the ID to use.
char cache_key_[kCacheKeyPrefix + kMaxVarint64Length];
uint64_t next_cache_key_id_ = 0;
std::vector<Cache::Handle*> dummy_handles_;
explicit CacheRep(std::shared_ptr<Cache> cache)
: cache_(cache), cache_allocated_size_(0) {
memset(cache_key_, 0, kCacheKeyPrefix);
size_t pointer_size = sizeof(const void*);
assert(pointer_size <= kCacheKeyPrefix);
memcpy(cache_key_, static_cast<const void*>(this), pointer_size);
}
Slice GetNextCacheKey() {
memset(cache_key_ + kCacheKeyPrefix, 0, kMaxVarint64Length);
char* end =
EncodeVarint64(cache_key_ + kCacheKeyPrefix, next_cache_key_id_++);
return Slice(cache_key_, static_cast<size_t>(end - cache_key_));
}
};
#else
struct WriteBufferManager::CacheRep {};
#endif // ROCKSDB_LITE
WriteBufferManager::WriteBufferManager(size_t _buffer_size,
std::shared_ptr<Cache> cache)
: buffer_size_(_buffer_size),
memory_used_(0),
memory_active_(0),
cache_rep_(nullptr) {
#ifndef ROCKSDB_LITE
if (cache) {
// Construct the cache key using the pointer to this.
cache_rep_.reset(new CacheRep(cache));
}
#endif // ROCKSDB_LITE
}
WriteBufferManager::~WriteBufferManager() {
#ifndef ROCKSDB_LITE
if (cache_rep_) {
for (auto* handle : cache_rep_->dummy_handles_) {
cache_rep_->cache_->Release(handle, true);
}
}
#endif // ROCKSDB_LITE
}
// Should only be called from write thread
void WriteBufferManager::ReserveMemWithCache(size_t mem) {
#ifndef ROCKSDB_LITE
assert(cache_rep_ != nullptr);
// Use a mutex to protect various data structures. Can be optimzied to a
// lock-free solution if it ends up with a performance bottleneck.
std::lock_guard<std::mutex> lock(cache_rep_->cache_mutex_);
size_t new_mem_used = memory_used_.load(std::memory_order_relaxed) + mem;
memory_used_.store(new_mem_used, std::memory_order_relaxed);
while (new_mem_used > cache_rep_->cache_allocated_size_) {
// Expand size by at least 1MB.
// Add a dummy record to the cache
Cache::Handle* handle;
cache_rep_->cache_->Insert(cache_rep_->GetNextCacheKey(), nullptr,
kSizeDummyEntry, nullptr, &handle);
cache_rep_->dummy_handles_.push_back(handle);
cache_rep_->cache_allocated_size_ += kSizeDummyEntry;
}
#endif // ROCKSDB_LITE
}
void WriteBufferManager::FreeMemWithCache(size_t mem) {
#ifndef ROCKSDB_LITE
assert(cache_rep_ != nullptr);
// Use a mutex to protect various data structures. Can be optimzied to a
// lock-free solution if it ends up with a performance bottleneck.
std::lock_guard<std::mutex> lock(cache_rep_->cache_mutex_);
size_t new_mem_used = memory_used_.load(std::memory_order_relaxed) - mem;
memory_used_.store(new_mem_used, std::memory_order_relaxed);
// Gradually shrink memory costed in the block cache if the actual
// usage is less than 3/4 of what we reserve from the block cache.
// We do this becausse:
// 1. we don't pay the cost of the block cache immediately a memtable is
// freed, as block cache insert is expensive;
// 2. eventually, if we walk away from a temporary memtable size increase,
// we make sure shrink the memory costed in block cache over time.
// In this way, we only shrink costed memory showly even there is enough
// margin.
if (new_mem_used < cache_rep_->cache_allocated_size_ / 4 * 3 &&
cache_rep_->cache_allocated_size_ - kSizeDummyEntry > new_mem_used) {
assert(!cache_rep_->dummy_handles_.empty());
cache_rep_->cache_->Release(cache_rep_->dummy_handles_.back(), true);
cache_rep_->dummy_handles_.pop_back();
cache_rep_->cache_allocated_size_ -= kSizeDummyEntry;
}
#endif // ROCKSDB_LITE
}
} // namespace rocksdb

@ -0,0 +1,141 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
// This source code is also licensed under the GPLv2 license found in the
// COPYING file in the root directory of this source tree.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "rocksdb/write_buffer_manager.h"
#include "util/testharness.h"
namespace rocksdb {
class WriteBufferManagerTest : public testing::Test {};
#ifndef ROCKSDB_LITE
TEST_F(WriteBufferManagerTest, ShouldFlush) {
// A write buffer manager of size 50MB
std::unique_ptr<WriteBufferManager> wbf(
new WriteBufferManager(10 * 1024 * 1024));
wbf->ReserveMem(8 * 1024 * 1024);
ASSERT_FALSE(wbf->ShouldFlush());
// 90% of the hard limit will hit the condition
wbf->ReserveMem(1 * 1024 * 1024);
ASSERT_TRUE(wbf->ShouldFlush());
// Scheduling for feeing will release the condition
wbf->ScheduleFreeMem(1 * 1024 * 1024);
ASSERT_FALSE(wbf->ShouldFlush());
wbf->ReserveMem(2 * 1024 * 1024);
ASSERT_TRUE(wbf->ShouldFlush());
wbf->ScheduleFreeMem(5 * 1024 * 1024);
// hard limit still hit
ASSERT_TRUE(wbf->ShouldFlush());
wbf->FreeMem(10 * 1024 * 1024);
ASSERT_FALSE(wbf->ShouldFlush());
}
TEST_F(WriteBufferManagerTest, CacheCost) {
// 1GB cache
std::shared_ptr<Cache> cache = NewLRUCache(1024 * 1024 * 1024, 4);
// A write buffer manager of size 50MB
std::unique_ptr<WriteBufferManager> wbf(
new WriteBufferManager(50 * 1024 * 1024, cache));
// Allocate 1.5MB will allocate 2MB
wbf->ReserveMem(1536 * 1024);
ASSERT_GE(cache->GetPinnedUsage(), 2 * 1024 * 1024);
ASSERT_LT(cache->GetPinnedUsage(), 2 * 1024 * 1024 + 10000);
// Allocate another 2MB
wbf->ReserveMem(2 * 1024 * 1024);
ASSERT_GE(cache->GetPinnedUsage(), 4 * 1024 * 1024);
ASSERT_LT(cache->GetPinnedUsage(), 4 * 1024 * 1024 + 10000);
// Allocate another 20MB
wbf->ReserveMem(20 * 1024 * 1024);
ASSERT_GE(cache->GetPinnedUsage(), 24 * 1024 * 1024);
ASSERT_LT(cache->GetPinnedUsage(), 24 * 1024 * 1024 + 10000);
// Free 2MB will not cause any change in cache cost
wbf->FreeMem(2 * 1024 * 1024);
ASSERT_GE(cache->GetPinnedUsage(), 24 * 1024 * 1024);
ASSERT_LT(cache->GetPinnedUsage(), 24 * 1024 * 1024 + 10000);
ASSERT_FALSE(wbf->ShouldFlush());
// Allocate another 30MB
wbf->ReserveMem(30 * 1024 * 1024);
ASSERT_GE(cache->GetPinnedUsage(), 52 * 1024 * 1024);
ASSERT_LT(cache->GetPinnedUsage(), 52 * 1024 * 1024 + 10000);
ASSERT_TRUE(wbf->ShouldFlush());
ASSERT_TRUE(wbf->ShouldFlush());
wbf->ScheduleFreeMem(20 * 1024 * 1024);
ASSERT_GE(cache->GetPinnedUsage(), 52 * 1024 * 1024);
ASSERT_LT(cache->GetPinnedUsage(), 52 * 1024 * 1024 + 10000);
// Still need flush as the hard limit hits
ASSERT_TRUE(wbf->ShouldFlush());
// Free 20MB will releae 1MB from cache
wbf->FreeMem(20 * 1024 * 1024);
ASSERT_GE(cache->GetPinnedUsage(), 51 * 1024 * 1024);
ASSERT_LT(cache->GetPinnedUsage(), 51 * 1024 * 1024 + 10000);
ASSERT_FALSE(wbf->ShouldFlush());
// Every free will release 1MB if still not hit 3/4
wbf->FreeMem(16 * 1024);
ASSERT_GE(cache->GetPinnedUsage(), 50 * 1024 * 1024);
ASSERT_LT(cache->GetPinnedUsage(), 50 * 1024 * 1024 + 10000);
wbf->FreeMem(16 * 1024);
ASSERT_GE(cache->GetPinnedUsage(), 49 * 1024 * 1024);
ASSERT_LT(cache->GetPinnedUsage(), 49 * 1024 * 1024 + 10000);
// Free 2MB will not cause any change in cache cost
wbf->ReserveMem(2 * 1024 * 1024);
ASSERT_GE(cache->GetPinnedUsage(), 49 * 1024 * 1024);
ASSERT_LT(cache->GetPinnedUsage(), 49 * 1024 * 1024 + 10000);
wbf->FreeMem(16 * 1024);
ASSERT_GE(cache->GetPinnedUsage(), 48 * 1024 * 1024);
ASSERT_LT(cache->GetPinnedUsage(), 48 * 1024 * 1024 + 10000);
// Destory write buffer manger should free everything
wbf.reset();
ASSERT_LT(cache->GetPinnedUsage(), 1024 * 1024);
}
TEST_F(WriteBufferManagerTest, NoCapCacheCost) {
// 1GB cache
std::shared_ptr<Cache> cache = NewLRUCache(1024 * 1024 * 1024, 4);
// A write buffer manager of size 256MB
std::unique_ptr<WriteBufferManager> wbf(new WriteBufferManager(0, cache));
// Allocate 1.5MB will allocate 2MB
wbf->ReserveMem(10 * 1024 * 1024);
ASSERT_GE(cache->GetPinnedUsage(), 10 * 1024 * 1024);
ASSERT_LT(cache->GetPinnedUsage(), 10 * 1024 * 1024 + 10000);
ASSERT_FALSE(wbf->ShouldFlush());
wbf->FreeMem(9 * 1024 * 1024);
for (int i = 0; i < 10; i++) {
wbf->FreeMem(16 * 1024);
}
ASSERT_GE(cache->GetPinnedUsage(), 1024 * 1024);
ASSERT_LT(cache->GetPinnedUsage(), 1024 * 1024 + 10000);
}
#endif // ROCKSDB_LITE
} // namespace rocksdb
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

@ -344,8 +344,8 @@ static std::unordered_map<std::string, OptionTypeInfo> db_options_type_info = {
OptionType::kBoolean, OptionVerificationType::kNormal, true,
offsetof(struct MutableDBOptions, avoid_flush_during_shutdown)}},
{"allow_ingest_behind",
{offsetof(struct DBOptions, allow_ingest_behind),
OptionType::kBoolean, OptionVerificationType::kNormal, false,
{offsetof(struct DBOptions, allow_ingest_behind), OptionType::kBoolean,
OptionVerificationType::kNormal, false,
offsetof(struct ImmutableDBOptions, allow_ingest_behind)}}};
// offset_of is used to get the offset of a class data member

@ -60,13 +60,14 @@ LIB_SOURCES = \
env/env_hdfs.cc \
env/env_posix.cc \
env/io_posix.cc \
env/mock_env.cc \
env/mock_env.cc \
memtable/alloc_tracker.cc \
memtable/hash_cuckoo_rep.cc \
memtable/hash_linklist_rep.cc \
memtable/hash_skiplist_rep.cc \
memtable/memtable_allocator.cc \
memtable/skiplistrep.cc \
memtable/vectorrep.cc \
memtable/write_buffer_manager.cc \
monitoring/histogram.cc \
monitoring/histogram_windowing.cc \
monitoring/instrumented_mutex.cc \
@ -286,6 +287,7 @@ MAIN_SOURCES = \
memtable/inlineskiplist_test.cc \
memtable/memtablerep_bench.cc \
memtable/skiplist_test.cc \
memtable/write_buffer_manager_test.cc \
monitoring/histogram_test.cc \
monitoring/iostats_context_test.cc \
monitoring/statistics_test.cc \

@ -279,6 +279,9 @@ DEFINE_bool(enable_numa, false,
DEFINE_int64(db_write_buffer_size, rocksdb::Options().db_write_buffer_size,
"Number of bytes to buffer in all memtables before compacting");
DEFINE_bool(cost_write_buffer_to_cache, false,
"The usage of memtable is costed to the block cache");
DEFINE_int64(write_buffer_size, rocksdb::Options().write_buffer_size,
"Number of bytes to buffer in memtable before compacting");
@ -2823,7 +2826,10 @@ void VerifyDBFromDB(std::string& truth_db_name) {
options.create_missing_column_families = FLAGS_num_column_families > 1;
options.max_open_files = FLAGS_open_files;
options.db_write_buffer_size = FLAGS_db_write_buffer_size;
if (FLAGS_cost_write_buffer_to_cache || FLAGS_db_write_buffer_size != 0) {
options.write_buffer_manager.reset(
new WriteBufferManager(FLAGS_db_write_buffer_size, cache_));
}
options.write_buffer_size = FLAGS_write_buffer_size;
options.max_write_buffer_number = FLAGS_max_write_buffer_number;
options.min_write_buffer_number_to_merge =

@ -13,8 +13,9 @@
// when the allocator object is destroyed. See the Arena class for more info.
#pragma once
#include <cstddef>
#include <cerrno>
#include <cstddef>
#include "rocksdb/write_buffer_manager.h"
namespace rocksdb {
@ -31,4 +32,28 @@ class Allocator {
virtual size_t BlockSize() const = 0;
};
class AllocTracker {
public:
explicit AllocTracker(WriteBufferManager* write_buffer_manager);
~AllocTracker();
void Allocate(size_t bytes);
// Call when we're finished allocating memory so we can free it from
// the write buffer's limit.
void DoneAllocating();
void FreeMem();
bool is_freed() const { return write_buffer_manager_ == nullptr || freed_; }
private:
WriteBufferManager* write_buffer_manager_;
std::atomic<size_t> bytes_allocated_;
bool done_allocating_;
bool freed_;
// No copying allowed
AllocTracker(const AllocTracker&);
void operator=(const AllocTracker&);
};
} // namespace rocksdb

@ -24,6 +24,7 @@
#include "port/port.h"
#include "rocksdb/env.h"
#include "util/logging.h"
#include "util/sync_point.h"
namespace rocksdb {
@ -49,10 +50,11 @@ size_t OptimizeBlockSize(size_t block_size) {
return block_size;
}
Arena::Arena(size_t block_size, size_t huge_page_size)
: kBlockSize(OptimizeBlockSize(block_size)) {
Arena::Arena(size_t block_size, AllocTracker* tracker, size_t huge_page_size)
: kBlockSize(OptimizeBlockSize(block_size)), tracker_(tracker) {
assert(kBlockSize >= kMinBlockSize && kBlockSize <= kMaxBlockSize &&
kBlockSize % kAlignUnit == 0);
TEST_SYNC_POINT_CALLBACK("Arena::Arena:0", const_cast<size_t*>(&kBlockSize));
alloc_bytes_remaining_ = sizeof(inline_block_);
blocks_memory_ += alloc_bytes_remaining_;
aligned_alloc_ptr_ = inline_block_;
@ -63,9 +65,16 @@ Arena::Arena(size_t block_size, size_t huge_page_size)
hugetlb_size_ = ((kBlockSize - 1U) / hugetlb_size_ + 1U) * hugetlb_size_;
}
#endif
if (tracker_ != nullptr) {
tracker_->Allocate(kInlineSize);
}
}
Arena::~Arena() {
if (tracker_ != nullptr) {
assert(tracker_->is_freed());
tracker_->FreeMem();
}
for (const auto& block : blocks_) {
delete[] block;
}
@ -134,6 +143,9 @@ char* Arena::AllocateFromHugePage(size_t bytes) {
// the following shouldn't throw because of the above reserve()
huge_blocks_.emplace_back(MmapInfo(addr, bytes));
blocks_memory_ += bytes;
if (tracker_ != nullptr) {
tracker_->Allocate(bytes);
}
return reinterpret_cast<char*>(addr);
#else
return nullptr;
@ -190,12 +202,22 @@ char* Arena::AllocateNewBlock(size_t block_bytes) {
blocks_.reserve(blocks_.size() + 1);
char* block = new char[block_bytes];
size_t allocated_size;
#ifdef ROCKSDB_MALLOC_USABLE_SIZE
blocks_memory_ += malloc_usable_size(block);
allocated_size = malloc_usable_size(block);
#ifndef NDEBUG
// It's hard to predict what malloc_usable_size() returns.
// A callback can allow users to change the costed size.
std::pair<size_t*, size_t*> pair(&allocated_size, &block_bytes);
TEST_SYNC_POINT_CALLBACK("Arena::AllocateNewBlock:0", &pair);
#endif // NDEBUG
#else
blocks_memory_ += block_bytes;
allocated_size = block_bytes;
#endif // ROCKSDB_MALLOC_USABLE_SIZE
blocks_memory_ += allocated_size;
if (tracker_ != nullptr) {
tracker_->Allocate(allocated_size);
}
// the following shouldn't throw because of the above reserve()
blocks_.push_back(block);
return block;

@ -40,7 +40,8 @@ class Arena : public Allocator {
// huge_page_size: if 0, don't use huge page TLB. If > 0 (should set to the
// supported hugepage size of the system), block allocation will try huge
// page TLB first. If allocation fails, will fall back to normal case.
explicit Arena(size_t block_size = kMinBlockSize, size_t huge_page_size = 0);
explicit Arena(size_t block_size = kMinBlockSize,
AllocTracker* tracker = nullptr, size_t huge_page_size = 0);
~Arena();
char* Allocate(size_t bytes) override;
@ -114,6 +115,7 @@ class Arena : public Allocator {
// Bytes of memory in blocks allocated so far
size_t blocks_memory_ = 0;
AllocTracker* tracker_;
};
inline char* Arena::Allocate(size_t bytes) {

@ -37,7 +37,7 @@ void MemoryAllocatedBytesTest(size_t huge_page_size) {
size_t bsz = 32 * 1024; // block size
size_t expected_memory_allocated;
Arena arena(bsz, huge_page_size);
Arena arena(bsz, nullptr, huge_page_size);
// requested size > quarter of a block:
// allocate requested size separately
@ -89,7 +89,7 @@ static void ApproximateMemoryUsageTest(size_t huge_page_size) {
const size_t kBlockSize = 4096;
const size_t kEntrySize = kBlockSize / 8;
const size_t kZero = 0;
Arena arena(kBlockSize, huge_page_size);
Arena arena(kBlockSize, nullptr, huge_page_size);
ASSERT_EQ(kZero, arena.ApproximateMemoryUsage());
// allocate inline bytes
@ -131,7 +131,7 @@ static void ApproximateMemoryUsageTest(size_t huge_page_size) {
static void SimpleTest(size_t huge_page_size) {
std::vector<std::pair<size_t, char*>> allocated;
Arena arena(Arena::kMinBlockSize, huge_page_size);
Arena arena(Arena::kMinBlockSize, nullptr, huge_page_size);
const int N = 100000;
size_t bytes = 0;
Random rnd(301);

@ -11,7 +11,6 @@
#include "util/concurrent_arena.h"
#include <thread>
#include "port/likely.h"
#include "port/port.h"
#include "util/random.h"
@ -21,10 +20,11 @@ namespace rocksdb {
__thread size_t ConcurrentArena::tls_cpuid = 0;
#endif
ConcurrentArena::ConcurrentArena(size_t block_size, size_t huge_page_size)
ConcurrentArena::ConcurrentArena(size_t block_size, AllocTracker* tracker,
size_t huge_page_size)
: shard_block_size_(block_size / 8),
shards_(),
arena_(block_size, huge_page_size) {
arena_(block_size, tracker, huge_page_size) {
Fixup();
}

@ -46,6 +46,7 @@ class ConcurrentArena : public Allocator {
// shards compute their shard_block_size as a fraction of block_size
// that varies according to the hardware concurrency level.
explicit ConcurrentArena(size_t block_size = Arena::kMinBlockSize,
AllocTracker* tracker = nullptr,
size_t huge_page_size = 0);
char* Allocate(size_t bytes) override {

Loading…
Cancel
Save