A heuristic way to check if a memtable is full

Summary:
This is is based on https://reviews.facebook.net/D15027. It's not finished but I would like to give a prototype to avoid arena over-allocation while making better use of the already allocated memory blocks.

Instead of check approximate memtable size, we will take a deeper look at the arena, which incorporate essential idea that @sdong suggests: flush when arena has allocated its last and the last is "almost full"

Test Plan: N/A

Reviewers: haobo, sdong

Reviewed By: sdong

CC: leveldb, sdong

Differential Revision: https://reviews.facebook.net/D15051
main
Kai Liu 11 years ago
parent 7b7793e97a
commit 11da8bc5df
  1. 6
      db/db_impl.cc
  2. 105
      db/db_test.cc
  3. 68
      db/memtable.cc
  4. 14
      db/memtable.h
  5. 1
      util/arena.cc
  6. 7
      util/arena.h

@ -1094,8 +1094,7 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, SequenceNumber* max_sequence,
*max_sequence = last_seq; *max_sequence = last_seq;
} }
if (!read_only && if (!read_only && mem_->ShouldFlush()) {
mem_->ApproximateMemoryUsage() > options_.write_buffer_size) {
status = WriteLevel0TableForRecovery(mem_, &edit); status = WriteLevel0TableForRecovery(mem_, &edit);
// we still want to clear memtable, even if the recovery failed // we still want to clear memtable, even if the recovery failed
delete mem_->Unref(); delete mem_->Unref();
@ -3533,8 +3532,7 @@ Status DBImpl::MakeRoomForWrite(bool force,
allow_delay = false; // Do not delay a single write more than once allow_delay = false; // Do not delay a single write more than once
mutex_.Lock(); mutex_.Lock();
delayed_writes_++; delayed_writes_++;
} else if (!force && } else if (!force && !mem_->ShouldFlush()) {
(mem_->ApproximateMemoryUsage() <= options_.write_buffer_size)) {
// There is room in current memtable // There is room in current memtable
if (allow_delay) { if (allow_delay) {
DelayLoggingAndReset(); DelayLoggingAndReset();

@ -8,6 +8,7 @@
// found in the LICENSE file. See the AUTHORS file for names of contributors. // found in the LICENSE file. See the AUTHORS file for names of contributors.
#include <algorithm> #include <algorithm>
#include <iostream>
#include <set> #include <set>
#include <unistd.h> #include <unistd.h>
#include <unordered_set> #include <unordered_set>
@ -23,20 +24,20 @@
#include "rocksdb/env.h" #include "rocksdb/env.h"
#include "rocksdb/filter_policy.h" #include "rocksdb/filter_policy.h"
#include "rocksdb/perf_context.h" #include "rocksdb/perf_context.h"
#include "table/plain_table_factory.h"
#include "rocksdb/slice.h" #include "rocksdb/slice.h"
#include "rocksdb/slice_transform.h" #include "rocksdb/slice_transform.h"
#include "rocksdb/table.h" #include "rocksdb/table.h"
#include "rocksdb/table_properties.h" #include "rocksdb/table_properties.h"
#include "table/block_based_table_factory.h" #include "table/block_based_table_factory.h"
#include "table/plain_table_factory.h"
#include "util/hash.h" #include "util/hash.h"
#include "util/hash_linklist_rep.h" #include "util/hash_linklist_rep.h"
#include "utilities/merge_operators.h"
#include "util/logging.h" #include "util/logging.h"
#include "util/mutexlock.h" #include "util/mutexlock.h"
#include "util/statistics.h" #include "util/statistics.h"
#include "util/testharness.h" #include "util/testharness.h"
#include "util/testutil.h" #include "util/testutil.h"
#include "utilities/merge_operators.h"
namespace rocksdb { namespace rocksdb {
@ -850,6 +851,28 @@ void VerifyTableProperties(DB* db, uint64_t expected_entries_size) {
ASSERT_EQ(expected_entries_size, sum); ASSERT_EQ(expected_entries_size, sum);
} }
std::unordered_map<std::string, size_t> GetMemoryUsage(MemTable* memtable) {
const auto& arena = memtable->TEST_GetArena();
return {{"memtable.approximate.usage", memtable->ApproximateMemoryUsage()},
{"arena.approximate.usage", arena.ApproximateMemoryUsage()},
{"arena.allocated.memory", arena.MemoryAllocatedBytes()},
{"arena.unused.bytes", arena.AllocatedAndUnused()},
{"irregular.blocks", arena.IrregularBlockNum()}};
}
void PrintMemoryUsage(const std::unordered_map<std::string, size_t>& usage) {
for (const auto& item : usage) {
std::cout << "\t" << item.first << ": " << item.second << std::endl;
}
}
void AddRandomKV(MemTable* memtable, Random* rnd, size_t arena_block_size) {
memtable->Add(0, kTypeValue, RandomString(rnd, 20) /* key */,
// make sure we will be able to generate some over sized entries
RandomString(rnd, rnd->Uniform(arena_block_size / 4) * 1.15 +
10) /* value */);
}
TEST(DBTest, Empty) { TEST(DBTest, Empty) {
do { do {
Options options = CurrentOptions(); Options options = CurrentOptions();
@ -1922,7 +1945,7 @@ TEST(DBTest, NumImmutableMemTable) {
options.write_buffer_size = 1000000; options.write_buffer_size = 1000000;
Reopen(&options); Reopen(&options);
std::string big_value(1000000, 'x'); std::string big_value(1000000 * 2, 'x');
std::string num; std::string num;
SetPerfLevel(kEnableTime);; SetPerfLevel(kEnableTime);;
@ -2205,6 +2228,10 @@ TEST(DBTest, CompactionTrigger) {
ASSERT_EQ(NumTableFilesAtLevel(1), 1); ASSERT_EQ(NumTableFilesAtLevel(1), 1);
} }
// TODO(kailiu) The tests on UniversalCompaction has some issues:
// 1. A lot of magic numbers ("11" or "12").
// 2. Made assumption on the memtable flush conidtions, which may change from
// time to time.
TEST(DBTest, UniversalCompactionTrigger) { TEST(DBTest, UniversalCompactionTrigger) {
Options options = CurrentOptions(); Options options = CurrentOptions();
options.compaction_style = kCompactionStyleUniversal; options.compaction_style = kCompactionStyleUniversal;
@ -2222,8 +2249,8 @@ TEST(DBTest, UniversalCompactionTrigger) {
for (int num = 0; for (int num = 0;
num < options.level0_file_num_compaction_trigger-1; num < options.level0_file_num_compaction_trigger-1;
num++) { num++) {
// Write 120KB (12 values, each 10K) // Write 110KB (11 values, each 10K)
for (int i = 0; i < 12; i++) { for (int i = 0; i < 11; i++) {
ASSERT_OK(Put(Key(key_idx), RandomString(&rnd, 10000))); ASSERT_OK(Put(Key(key_idx), RandomString(&rnd, 10000)));
key_idx++; key_idx++;
} }
@ -2233,7 +2260,7 @@ TEST(DBTest, UniversalCompactionTrigger) {
// Generate one more file at level-0, which should trigger level-0 // Generate one more file at level-0, which should trigger level-0
// compaction. // compaction.
for (int i = 0; i < 12; i++) { for (int i = 0; i < 11; i++) {
ASSERT_OK(Put(Key(key_idx), RandomString(&rnd, 10000))); ASSERT_OK(Put(Key(key_idx), RandomString(&rnd, 10000)));
key_idx++; key_idx++;
} }
@ -2257,8 +2284,8 @@ TEST(DBTest, UniversalCompactionTrigger) {
for (int num = 0; for (int num = 0;
num < options.level0_file_num_compaction_trigger-3; num < options.level0_file_num_compaction_trigger-3;
num++) { num++) {
// Write 120KB (12 values, each 10K) // Write 110KB (11 values, each 10K)
for (int i = 0; i < 12; i++) { for (int i = 0; i < 11; i++) {
ASSERT_OK(Put(Key(key_idx), RandomString(&rnd, 10000))); ASSERT_OK(Put(Key(key_idx), RandomString(&rnd, 10000)));
key_idx++; key_idx++;
} }
@ -2268,7 +2295,7 @@ TEST(DBTest, UniversalCompactionTrigger) {
// Generate one more file at level-0, which should trigger level-0 // Generate one more file at level-0, which should trigger level-0
// compaction. // compaction.
for (int i = 0; i < 12; i++) { for (int i = 0; i < 11; i++) {
ASSERT_OK(Put(Key(key_idx), RandomString(&rnd, 10000))); ASSERT_OK(Put(Key(key_idx), RandomString(&rnd, 10000)));
key_idx++; key_idx++;
} }
@ -2286,8 +2313,8 @@ TEST(DBTest, UniversalCompactionTrigger) {
for (int num = 0; for (int num = 0;
num < options.level0_file_num_compaction_trigger-3; num < options.level0_file_num_compaction_trigger-3;
num++) { num++) {
// Write 120KB (12 values, each 10K) // Write 110KB (11 values, each 10K)
for (int i = 0; i < 12; i++) { for (int i = 0; i < 11; i++) {
ASSERT_OK(Put(Key(key_idx), RandomString(&rnd, 10000))); ASSERT_OK(Put(Key(key_idx), RandomString(&rnd, 10000)));
key_idx++; key_idx++;
} }
@ -2312,7 +2339,7 @@ TEST(DBTest, UniversalCompactionTrigger) {
// Stage 4: // Stage 4:
// Now we have 3 files at level 0, with size 4, 2.4, 2. Let's generate a // Now we have 3 files at level 0, with size 4, 2.4, 2. Let's generate a
// new file of size 1. // new file of size 1.
for (int i = 0; i < 12; i++) { for (int i = 0; i < 11; i++) {
ASSERT_OK(Put(Key(key_idx), RandomString(&rnd, 10000))); ASSERT_OK(Put(Key(key_idx), RandomString(&rnd, 10000)));
key_idx++; key_idx++;
} }
@ -2326,7 +2353,7 @@ TEST(DBTest, UniversalCompactionTrigger) {
// Stage 5: // Stage 5:
// Now we have 4 files at level 0, with size 4, 2.4, 2, 1. Let's generate // Now we have 4 files at level 0, with size 4, 2.4, 2, 1. Let's generate
// a new file of size 1. // a new file of size 1.
for (int i = 0; i < 12; i++) { for (int i = 0; i < 11; i++) {
ASSERT_OK(Put(Key(key_idx), RandomString(&rnd, 10000))); ASSERT_OK(Put(Key(key_idx), RandomString(&rnd, 10000)));
key_idx++; key_idx++;
} }
@ -2356,8 +2383,8 @@ TEST(DBTest, UniversalCompactionSizeAmplification) {
for (int num = 0; for (int num = 0;
num < options.level0_file_num_compaction_trigger-1; num < options.level0_file_num_compaction_trigger-1;
num++) { num++) {
// Write 120KB (12 values, each 10K) // Write 110KB (11 values, each 10K)
for (int i = 0; i < 12; i++) { for (int i = 0; i < 11; i++) {
ASSERT_OK(Put(Key(key_idx), RandomString(&rnd, 10000))); ASSERT_OK(Put(Key(key_idx), RandomString(&rnd, 10000)));
key_idx++; key_idx++;
} }
@ -2392,8 +2419,8 @@ TEST(DBTest, UniversalCompactionOptions) {
for (int num = 0; for (int num = 0;
num < options.level0_file_num_compaction_trigger; num < options.level0_file_num_compaction_trigger;
num++) { num++) {
// Write 120KB (12 values, each 10K) // Write 110KB (11 values, each 10K)
for (int i = 0; i < 12; i++) { for (int i = 0; i < 11; i++) {
ASSERT_OK(Put(Key(key_idx), RandomString(&rnd, 10000))); ASSERT_OK(Put(Key(key_idx), RandomString(&rnd, 10000)));
key_idx++; key_idx++;
} }
@ -2431,8 +2458,8 @@ TEST(DBTest, UniversalCompactionStopStyleSimilarSize) {
for (int num = 0; for (int num = 0;
num < options.level0_file_num_compaction_trigger-1; num < options.level0_file_num_compaction_trigger-1;
num++) { num++) {
// Write 120KB (12 values, each 10K) // Write 110KB (11 values, each 10K)
for (int i = 0; i < 12; i++) { for (int i = 0; i < 11; i++) {
ASSERT_OK(Put(Key(key_idx), RandomString(&rnd, 10000))); ASSERT_OK(Put(Key(key_idx), RandomString(&rnd, 10000)));
key_idx++; key_idx++;
} }
@ -2442,7 +2469,7 @@ TEST(DBTest, UniversalCompactionStopStyleSimilarSize) {
// Generate one more file at level-0, which should trigger level-0 // Generate one more file at level-0, which should trigger level-0
// compaction. // compaction.
for (int i = 0; i < 12; i++) { for (int i = 0; i < 11; i++) {
ASSERT_OK(Put(Key(key_idx), RandomString(&rnd, 10000))); ASSERT_OK(Put(Key(key_idx), RandomString(&rnd, 10000)));
key_idx++; key_idx++;
} }
@ -2463,8 +2490,8 @@ TEST(DBTest, UniversalCompactionStopStyleSimilarSize) {
for (int num = 0; for (int num = 0;
num < options.level0_file_num_compaction_trigger-3; num < options.level0_file_num_compaction_trigger-3;
num++) { num++) {
// Write 120KB (12 values, each 10K) // Write 110KB (11 values, each 10K)
for (int i = 0; i < 12; i++) { for (int i = 0; i < 11; i++) {
ASSERT_OK(Put(Key(key_idx), RandomString(&rnd, 10000))); ASSERT_OK(Put(Key(key_idx), RandomString(&rnd, 10000)));
key_idx++; key_idx++;
} }
@ -2474,7 +2501,7 @@ TEST(DBTest, UniversalCompactionStopStyleSimilarSize) {
// Generate one more file at level-0, which should trigger level-0 // Generate one more file at level-0, which should trigger level-0
// compaction. // compaction.
for (int i = 0; i < 12; i++) { for (int i = 0; i < 11; i++) {
ASSERT_OK(Put(Key(key_idx), RandomString(&rnd, 10000))); ASSERT_OK(Put(Key(key_idx), RandomString(&rnd, 10000)));
key_idx++; key_idx++;
} }
@ -2485,7 +2512,7 @@ TEST(DBTest, UniversalCompactionStopStyleSimilarSize) {
// Stage 3: // Stage 3:
// Now we have 3 files at level 0, with size 4, 0.4, 2. Generate one // Now we have 3 files at level 0, with size 4, 0.4, 2. Generate one
// more file at level-0, which should trigger level-0 compaction. // more file at level-0, which should trigger level-0 compaction.
for (int i = 0; i < 12; i++) { for (int i = 0; i < 11; i++) {
ASSERT_OK(Put(Key(key_idx), RandomString(&rnd, 10000))); ASSERT_OK(Put(Key(key_idx), RandomString(&rnd, 10000)));
key_idx++; key_idx++;
} }
@ -2593,54 +2620,54 @@ TEST(DBTest, UniversalCompactionCompressRatio1) {
// The first compaction (2) is compressed. // The first compaction (2) is compressed.
for (int num = 0; num < 2; num++) { for (int num = 0; num < 2; num++) {
// Write 120KB (12 values, each 10K) // Write 110KB (11 values, each 10K)
for (int i = 0; i < 12; i++) { for (int i = 0; i < 11; i++) {
ASSERT_OK(Put(Key(key_idx), CompressibleString(&rnd, 10000))); ASSERT_OK(Put(Key(key_idx), CompressibleString(&rnd, 10000)));
key_idx++; key_idx++;
} }
dbfull()->TEST_WaitForFlushMemTable(); dbfull()->TEST_WaitForFlushMemTable();
dbfull()->TEST_WaitForCompact(); dbfull()->TEST_WaitForCompact();
} }
ASSERT_LT((int ) dbfull()->TEST_GetLevel0TotalSize(), 120000 * 2 * 0.9); ASSERT_LT((int)dbfull()->TEST_GetLevel0TotalSize(), 110000 * 2 * 0.9);
// The second compaction (4) is compressed // The second compaction (4) is compressed
for (int num = 0; num < 2; num++) { for (int num = 0; num < 2; num++) {
// Write 120KB (12 values, each 10K) // Write 110KB (11 values, each 10K)
for (int i = 0; i < 12; i++) { for (int i = 0; i < 11; i++) {
ASSERT_OK(Put(Key(key_idx), CompressibleString(&rnd, 10000))); ASSERT_OK(Put(Key(key_idx), CompressibleString(&rnd, 10000)));
key_idx++; key_idx++;
} }
dbfull()->TEST_WaitForFlushMemTable(); dbfull()->TEST_WaitForFlushMemTable();
dbfull()->TEST_WaitForCompact(); dbfull()->TEST_WaitForCompact();
} }
ASSERT_LT((int ) dbfull()->TEST_GetLevel0TotalSize(), 120000 * 4 * 0.9); ASSERT_LT((int)dbfull()->TEST_GetLevel0TotalSize(), 110000 * 4 * 0.9);
// The third compaction (2 4) is compressed since this time it is // The third compaction (2 4) is compressed since this time it is
// (1 1 3.2) and 3.2/5.2 doesn't reach ratio. // (1 1 3.2) and 3.2/5.2 doesn't reach ratio.
for (int num = 0; num < 2; num++) { for (int num = 0; num < 2; num++) {
// Write 120KB (12 values, each 10K) // Write 110KB (11 values, each 10K)
for (int i = 0; i < 12; i++) { for (int i = 0; i < 11; i++) {
ASSERT_OK(Put(Key(key_idx), CompressibleString(&rnd, 10000))); ASSERT_OK(Put(Key(key_idx), CompressibleString(&rnd, 10000)));
key_idx++; key_idx++;
} }
dbfull()->TEST_WaitForFlushMemTable(); dbfull()->TEST_WaitForFlushMemTable();
dbfull()->TEST_WaitForCompact(); dbfull()->TEST_WaitForCompact();
} }
ASSERT_LT((int ) dbfull()->TEST_GetLevel0TotalSize(), 120000 * 6 * 0.9); ASSERT_LT((int)dbfull()->TEST_GetLevel0TotalSize(), 110000 * 6 * 0.9);
// When we start for the compaction up to (2 4 8), the latest // When we start for the compaction up to (2 4 8), the latest
// compressed is not compressed. // compressed is not compressed.
for (int num = 0; num < 8; num++) { for (int num = 0; num < 8; num++) {
// Write 120KB (12 values, each 10K) // Write 110KB (11 values, each 10K)
for (int i = 0; i < 12; i++) { for (int i = 0; i < 11; i++) {
ASSERT_OK(Put(Key(key_idx), CompressibleString(&rnd, 10000))); ASSERT_OK(Put(Key(key_idx), CompressibleString(&rnd, 10000)));
key_idx++; key_idx++;
} }
dbfull()->TEST_WaitForFlushMemTable(); dbfull()->TEST_WaitForFlushMemTable();
dbfull()->TEST_WaitForCompact(); dbfull()->TEST_WaitForCompact();
} }
ASSERT_GT((int) dbfull()->TEST_GetLevel0TotalSize(), ASSERT_GT((int)dbfull()->TEST_GetLevel0TotalSize(),
120000 * 12 * 0.8 + 110000 * 2); 110000 * 11 * 0.8 + 110000 * 2);
} }
TEST(DBTest, UniversalCompactionCompressRatio2) { TEST(DBTest, UniversalCompactionCompressRatio2) {
@ -2666,8 +2693,8 @@ TEST(DBTest, UniversalCompactionCompressRatio2) {
dbfull()->TEST_WaitForFlushMemTable(); dbfull()->TEST_WaitForFlushMemTable();
dbfull()->TEST_WaitForCompact(); dbfull()->TEST_WaitForCompact();
} }
ASSERT_LT((int ) dbfull()->TEST_GetLevel0TotalSize(), ASSERT_LT((int)dbfull()->TEST_GetLevel0TotalSize(),
120000 * 12 * 0.8 + 110000 * 2); 120000 * 12 * 0.8 + 120000 * 2);
} }
#endif #endif

@ -10,6 +10,7 @@
#include "db/memtable.h" #include "db/memtable.h"
#include <memory> #include <memory>
#include <algorithm>
#include "db/dbformat.h" #include "db/dbformat.h"
#include "db/merge_context.h" #include "db/merge_context.h"
@ -31,6 +32,8 @@ namespace rocksdb {
MemTable::MemTable(const InternalKeyComparator& cmp, const Options& options) MemTable::MemTable(const InternalKeyComparator& cmp, const Options& options)
: comparator_(cmp), : comparator_(cmp),
refs_(0), refs_(0),
kArenaBlockSize(OptimizeBlockSize(options.arena_block_size)),
kWriteBufferSize(options.write_buffer_size),
arena_(options.arena_block_size), arena_(options.arena_block_size),
table_(options.memtable_factory->CreateMemTableRep( table_(options.memtable_factory->CreateMemTableRep(
comparator_, &arena_, options.prefix_extractor.get())), comparator_, &arena_, options.prefix_extractor.get())),
@ -42,7 +45,11 @@ MemTable::MemTable(const InternalKeyComparator& cmp, const Options& options)
mem_logfile_number_(0), mem_logfile_number_(0),
locks_(options.inplace_update_support ? options.inplace_update_num_locks locks_(options.inplace_update_support ? options.inplace_update_num_locks
: 0), : 0),
prefix_extractor_(options.prefix_extractor.get()) { prefix_extractor_(options.prefix_extractor.get()),
should_flush_(ShouldFlushNow()) {
// if should_flush_ == true without an entry inserted, something must have
// gone wrong already.
assert(!should_flush_);
if (prefix_extractor_ && options.memtable_prefix_bloom_bits > 0) { if (prefix_extractor_ && options.memtable_prefix_bloom_bits > 0) {
prefix_bloom_.reset(new DynamicBloom(options.memtable_prefix_bloom_bits, prefix_bloom_.reset(new DynamicBloom(options.memtable_prefix_bloom_bits,
options.memtable_prefix_bloom_probes)); options.memtable_prefix_bloom_probes));
@ -57,6 +64,60 @@ size_t MemTable::ApproximateMemoryUsage() {
return arena_.ApproximateMemoryUsage() + table_->ApproximateMemoryUsage(); return arena_.ApproximateMemoryUsage() + table_->ApproximateMemoryUsage();
} }
bool MemTable::ShouldFlushNow() const {
// In a lot of times, we cannot allocate arena blocks that exactly matches the
// buffer size. Thus we have to decide if we should over-allocate or
// under-allocate.
// This constant avariable can be interpreted as: if we still have more than
// "kAllowOverAllocationRatio * kArenaBlockSize" space left, we'd try to over
// allocate one more block.
const double kAllowOverAllocationRatio = 0.6;
// If arena still have room for new block allocation, we can safely say it
// shouldn't flush.
auto allocated_memory =
table_->ApproximateMemoryUsage() + arena_.MemoryAllocatedBytes();
if (allocated_memory + kArenaBlockSize * kAllowOverAllocationRatio <
kWriteBufferSize) {
return false;
}
// if user keeps adding entries that exceeds kWriteBufferSize, we need to
// flush
// earlier even though we still have much available memory left.
if (allocated_memory > kWriteBufferSize * (1 + kAllowOverAllocationRatio)) {
return true;
}
// In this code path, Arena has already allocated its "last block", which
// means the total allocatedmemory size is either:
// (1) "moderately" over allocated the memory (no more than `0.4 * arena
// block size`. Or,
// (2) the allocated memory is less than write buffer size, but we'll stop
// here since if we allocate a new arena block, we'll over allocate too much
// more (half of the arena block size) memory.
//
// In either case, to avoid over-allocate, the last block will stop allocation
// when its usage reaches a certain ratio, which we carefully choose "0.75
// full" as the stop condition because it addresses the following issue with
// great simplicity: What if the next inserted entry's size is
// bigger than AllocatedAndUnused()?
//
// The answer is: if the entry size is also bigger than 0.25 *
// kArenaBlockSize, a dedicated block will be allocated for it; otherwise
// arena will anyway skip the AllocatedAndUnused() and allocate a new, empty
// and regular block. In either case, we *overly* over-allocated.
//
// Therefore, setting the last block to be at most "0.75 full" avoids both
// cases.
//
// NOTE: the average percentage of waste space of this approach can be counted
// as: "arena block size * 0.25 / write buffer size". User who specify a small
// write buffer size and/or big arena block size may suffer.
return arena_.AllocatedAndUnused() < kArenaBlockSize / 4;
}
int MemTable::KeyComparator::operator()(const char* prefix_len_key1, int MemTable::KeyComparator::operator()(const char* prefix_len_key1,
const char* prefix_len_key2) const { const char* prefix_len_key2) const {
// Internal keys are encoded as length-prefixed strings. // Internal keys are encoded as length-prefixed strings.
@ -198,6 +259,8 @@ void MemTable::Add(SequenceNumber s, ValueType type,
if (first_seqno_ == 0) { if (first_seqno_ == 0) {
first_seqno_ = s; first_seqno_ = s;
} }
should_flush_ = ShouldFlushNow();
} }
// Callback from MemTable::Get() // Callback from MemTable::Get()
@ -460,13 +523,16 @@ bool MemTable::UpdateCallback(SequenceNumber seq,
} }
} }
RecordTick(options.statistics.get(), NUMBER_KEYS_UPDATED); RecordTick(options.statistics.get(), NUMBER_KEYS_UPDATED);
should_flush_ = ShouldFlushNow();
return true; return true;
} else if (status == UpdateStatus::UPDATED) { } else if (status == UpdateStatus::UPDATED) {
Add(seq, kTypeValue, key, Slice(str_value)); Add(seq, kTypeValue, key, Slice(str_value));
RecordTick(options.statistics.get(), NUMBER_KEYS_WRITTEN); RecordTick(options.statistics.get(), NUMBER_KEYS_WRITTEN);
should_flush_ = ShouldFlushNow();
return true; return true;
} else if (status == UpdateStatus::UPDATE_FAILED) { } else if (status == UpdateStatus::UPDATE_FAILED) {
// No action required. Return. // No action required. Return.
should_flush_ = ShouldFlushNow();
return true; return true;
} }
} }

@ -64,6 +64,10 @@ class MemTable {
// operations on the same MemTable. // operations on the same MemTable.
size_t ApproximateMemoryUsage(); size_t ApproximateMemoryUsage();
// This method heuristically determines if the memtable should continue to
// host more data.
bool ShouldFlush() const { return should_flush_; }
// Return an iterator that yields the contents of the memtable. // Return an iterator that yields the contents of the memtable.
// //
// The caller must ensure that the underlying MemTable remains live // The caller must ensure that the underlying MemTable remains live
@ -161,13 +165,20 @@ class MemTable {
return comparator_.comparator; return comparator_.comparator;
} }
const Arena& TEST_GetArena() const { return arena_; }
private: private:
// Dynamically check if we can add more incoming entries.
bool ShouldFlushNow() const;
friend class MemTableIterator; friend class MemTableIterator;
friend class MemTableBackwardIterator; friend class MemTableBackwardIterator;
friend class MemTableList; friend class MemTableList;
KeyComparator comparator_; KeyComparator comparator_;
int refs_; int refs_;
const size_t kArenaBlockSize;
const size_t kWriteBufferSize;
Arena arena_; Arena arena_;
unique_ptr<MemTableRep> table_; unique_ptr<MemTableRep> table_;
@ -199,6 +210,9 @@ class MemTable {
const SliceTransform* const prefix_extractor_; const SliceTransform* const prefix_extractor_;
std::unique_ptr<DynamicBloom> prefix_bloom_; std::unique_ptr<DynamicBloom> prefix_bloom_;
// a flag indicating if a memtable has met the criteria to flush
bool should_flush_;
}; };
extern const char* EncodeKey(std::string* scratch, const Slice& target); extern const char* EncodeKey(std::string* scratch, const Slice& target);

@ -42,6 +42,7 @@ Arena::~Arena() {
char* Arena::AllocateFallback(size_t bytes, bool aligned) { char* Arena::AllocateFallback(size_t bytes, bool aligned) {
if (bytes > kBlockSize / 4) { if (bytes > kBlockSize / 4) {
++irregular_block_num;
// Object is more than a quarter of our block size. Allocate it separately // Object is more than a quarter of our block size. Allocate it separately
// to avoid wasting too much space in leftover bytes. // to avoid wasting too much space in leftover bytes.
return AllocateNewBlock(bytes); return AllocateNewBlock(bytes);

@ -46,12 +46,19 @@ class Arena {
size_t MemoryAllocatedBytes() const { return blocks_memory_; } size_t MemoryAllocatedBytes() const { return blocks_memory_; }
size_t AllocatedAndUnused() const { return alloc_bytes_remaining_; }
// If an allocation is too big, we'll allocate an irregular block with the
// same size of that allocation.
virtual size_t IrregularBlockNum() const { return irregular_block_num; }
private: private:
// Number of bytes allocated in one block // Number of bytes allocated in one block
const size_t kBlockSize; const size_t kBlockSize;
// Array of new[] allocated memory blocks // Array of new[] allocated memory blocks
typedef std::vector<char*> Blocks; typedef std::vector<char*> Blocks;
Blocks blocks_; Blocks blocks_;
size_t irregular_block_num = 0;
// Stats for current active block. // Stats for current active block.
// For each block, we allocate aligned memory chucks from one end and // For each block, we allocate aligned memory chucks from one end and

Loading…
Cancel
Save