Merge branch 'master' into columnfamilies

Conflicts:
	db/db_impl.cc
	db/db_test.cc
	tools/db_stress.cc
main
Igor Canadi 11 years ago
commit e1f56e12cf
  1. 8
      db/db_impl.cc
  2. 103
      db/db_test.cc
  3. 68
      db/memtable.cc
  4. 14
      db/memtable.h
  5. 6
      tools/db_crashtest.py
  6. 6
      tools/db_crashtest2.py
  7. 9
      tools/db_stress.cc
  8. 1
      util/arena.cc
  9. 7
      util/arena.h

@ -960,8 +960,7 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, SequenceNumber* max_sequence,
// no need to refcount since client still doesn't have access
// to the DB and can not drop column families while we iterate
for (auto cfd : *versions_->GetColumnFamilySet()) {
if (cfd->mem()->ApproximateMemoryUsage() >
cfd->options()->write_buffer_size) {
if (cfd->mem()->ShouldFlush()) {
// If this asserts, it means that InsertInto failed in
// filtering updates to already-flushed column families
assert(cfd->GetLogNumber() <= log_number);
@ -1764,7 +1763,7 @@ void DBImpl::MaybeScheduleFlushOrCompaction() {
if (bg_flush_scheduled_ < options_.max_background_flushes) {
bg_flush_scheduled_++;
env_->Schedule(&DBImpl::BGWorkFlush, this, Env::Priority::HIGH);
} else {
} else if (options_.max_background_flushes > 0) {
bg_schedule_needed_ = true;
}
}
@ -3636,8 +3635,7 @@ Status DBImpl::MakeRoomForWrite(ColumnFamilyData* cfd, bool force) {
allow_delay = false; // Do not delay a single write more than once
mutex_.Lock();
delayed_writes_++;
} else if (!force && (cfd->mem()->ApproximateMemoryUsage() <=
cfd->options()->write_buffer_size)) {
} else if (!force && !cfd->mem()->ShouldFlush()) {
// There is room in current memtable
if (allow_delay) {
DelayLoggingAndReset();

@ -8,6 +8,7 @@
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include <algorithm>
#include <iostream>
#include <set>
#include <unistd.h>
#include <unordered_set>
@ -23,20 +24,20 @@
#include "rocksdb/env.h"
#include "rocksdb/filter_policy.h"
#include "rocksdb/perf_context.h"
#include "table/plain_table_factory.h"
#include "rocksdb/slice.h"
#include "rocksdb/slice_transform.h"
#include "rocksdb/table.h"
#include "rocksdb/table_properties.h"
#include "table/block_based_table_factory.h"
#include "table/plain_table_factory.h"
#include "util/hash.h"
#include "util/hash_linklist_rep.h"
#include "utilities/merge_operators.h"
#include "util/logging.h"
#include "util/mutexlock.h"
#include "util/statistics.h"
#include "util/testharness.h"
#include "util/testutil.h"
#include "utilities/merge_operators.h"
namespace rocksdb {
@ -976,6 +977,28 @@ void VerifyTableProperties(DB* db, uint64_t expected_entries_size) {
ASSERT_EQ(expected_entries_size, sum);
}
std::unordered_map<std::string, size_t> GetMemoryUsage(MemTable* memtable) {
const auto& arena = memtable->TEST_GetArena();
return {{"memtable.approximate.usage", memtable->ApproximateMemoryUsage()},
{"arena.approximate.usage", arena.ApproximateMemoryUsage()},
{"arena.allocated.memory", arena.MemoryAllocatedBytes()},
{"arena.unused.bytes", arena.AllocatedAndUnused()},
{"irregular.blocks", arena.IrregularBlockNum()}};
}
void PrintMemoryUsage(const std::unordered_map<std::string, size_t>& usage) {
for (const auto& item : usage) {
std::cout << "\t" << item.first << ": " << item.second << std::endl;
}
}
void AddRandomKV(MemTable* memtable, Random* rnd, size_t arena_block_size) {
memtable->Add(0, kTypeValue, RandomString(rnd, 20) /* key */,
// make sure we will be able to generate some over sized entries
RandomString(rnd, rnd->Uniform(arena_block_size / 4) * 1.15 +
10) /* value */);
}
TEST(DBTest, Empty) {
do {
Options options = CurrentOptions();
@ -2069,7 +2092,7 @@ TEST(DBTest, NumImmutableMemTable) {
options.write_buffer_size = 1000000;
CreateAndReopenWithCF({"pikachu"}, &options);
std::string big_value(1000000, 'x');
std::string big_value(1000000 * 2, 'x');
std::string num;
SetPerfLevel(kEnableTime);;
@ -2353,6 +2376,10 @@ TEST(DBTest, CompactionTrigger) {
ASSERT_EQ(NumTableFilesAtLevel(1, 1), 1);
}
// TODO(kailiu) The tests on UniversalCompaction has some issues:
// 1. A lot of magic numbers ("11" or "12").
// 2. Made assumption on the memtable flush conidtions, which may change from
// time to time.
TEST(DBTest, UniversalCompactionTrigger) {
Options options = CurrentOptions();
options.compaction_style = kCompactionStyleUniversal;
@ -2369,7 +2396,7 @@ TEST(DBTest, UniversalCompactionTrigger) {
// compaction.
for (int num = 0; num < options.level0_file_num_compaction_trigger - 1;
num++) {
// Write 120KB (12 values, each 10K)
// Write 110KB (11 values, each 10K)
for (int i = 0; i < 12; i++) {
ASSERT_OK(Put(1, Key(key_idx), RandomString(&rnd, 10000)));
key_idx++;
@ -2380,7 +2407,7 @@ TEST(DBTest, UniversalCompactionTrigger) {
// Generate one more file at level-0, which should trigger level-0
// compaction.
for (int i = 0; i < 12; i++) {
for (int i = 0; i < 11; i++) {
ASSERT_OK(Put(1, Key(key_idx), RandomString(&rnd, 10000)));
key_idx++;
}
@ -2403,8 +2430,8 @@ TEST(DBTest, UniversalCompactionTrigger) {
ASSERT_OK(Flush(1));
for (int num = 0; num < options.level0_file_num_compaction_trigger - 3;
num++) {
// Write 120KB (12 values, each 10K)
for (int i = 0; i < 12; i++) {
// Write 110KB (11 values, each 10K)
for (int i = 0; i < 11; i++) {
ASSERT_OK(Put(1, Key(key_idx), RandomString(&rnd, 10000)));
key_idx++;
}
@ -2414,7 +2441,7 @@ TEST(DBTest, UniversalCompactionTrigger) {
// Generate one more file at level-0, which should trigger level-0
// compaction.
for (int i = 0; i < 12; i++) {
for (int i = 0; i < 11; i++) {
ASSERT_OK(Put(1, Key(key_idx), RandomString(&rnd, 10000)));
key_idx++;
}
@ -2431,8 +2458,8 @@ TEST(DBTest, UniversalCompactionTrigger) {
// generating new files at level 0.
for (int num = 0; num < options.level0_file_num_compaction_trigger - 3;
num++) {
// Write 120KB (12 values, each 10K)
for (int i = 0; i < 12; i++) {
// Write 110KB (11 values, each 10K)
for (int i = 0; i < 11; i++) {
ASSERT_OK(Put(1, Key(key_idx), RandomString(&rnd, 10000)));
key_idx++;
}
@ -2457,7 +2484,7 @@ TEST(DBTest, UniversalCompactionTrigger) {
// Stage 4:
// Now we have 3 files at level 0, with size 4, 2.4, 2. Let's generate a
// new file of size 1.
for (int i = 0; i < 12; i++) {
for (int i = 0; i < 11; i++) {
ASSERT_OK(Put(1, Key(key_idx), RandomString(&rnd, 10000)));
key_idx++;
}
@ -2471,7 +2498,7 @@ TEST(DBTest, UniversalCompactionTrigger) {
// Stage 5:
// Now we have 4 files at level 0, with size 4, 2.4, 2, 1. Let's generate
// a new file of size 1.
for (int i = 0; i < 12; i++) {
for (int i = 0; i < 11; i++) {
ASSERT_OK(Put(1, Key(key_idx), RandomString(&rnd, 10000)));
key_idx++;
}
@ -2500,8 +2527,8 @@ TEST(DBTest, UniversalCompactionSizeAmplification) {
// Generate two files in Level 0. Both files are approx the same size.
for (int num = 0; num < options.level0_file_num_compaction_trigger - 1;
num++) {
// Write 120KB (12 values, each 10K)
for (int i = 0; i < 12; i++) {
// Write 110KB (11 values, each 10K)
for (int i = 0; i < 11; i++) {
ASSERT_OK(Put(1, Key(key_idx), RandomString(&rnd, 10000)));
key_idx++;
}
@ -2534,8 +2561,8 @@ TEST(DBTest, UniversalCompactionOptions) {
int key_idx = 0;
for (int num = 0; num < options.level0_file_num_compaction_trigger; num++) {
// Write 120KB (12 values, each 10K)
for (int i = 0; i < 12; i++) {
// Write 110KB (11 values, each 10K)
for (int i = 0; i < 11; i++) {
ASSERT_OK(Put(1, Key(key_idx), RandomString(&rnd, 10000)));
key_idx++;
}
@ -2573,8 +2600,8 @@ TEST(DBTest, UniversalCompactionStopStyleSimilarSize) {
for (int num = 0;
num < options.level0_file_num_compaction_trigger-1;
num++) {
// Write 120KB (12 values, each 10K)
for (int i = 0; i < 12; i++) {
// Write 110KB (11 values, each 10K)
for (int i = 0; i < 11; i++) {
ASSERT_OK(Put(Key(key_idx), RandomString(&rnd, 10000)));
key_idx++;
}
@ -2584,7 +2611,7 @@ TEST(DBTest, UniversalCompactionStopStyleSimilarSize) {
// Generate one more file at level-0, which should trigger level-0
// compaction.
for (int i = 0; i < 12; i++) {
for (int i = 0; i < 11; i++) {
ASSERT_OK(Put(Key(key_idx), RandomString(&rnd, 10000)));
key_idx++;
}
@ -2605,8 +2632,8 @@ TEST(DBTest, UniversalCompactionStopStyleSimilarSize) {
for (int num = 0;
num < options.level0_file_num_compaction_trigger-3;
num++) {
// Write 120KB (12 values, each 10K)
for (int i = 0; i < 12; i++) {
// Write 110KB (11 values, each 10K)
for (int i = 0; i < 11; i++) {
ASSERT_OK(Put(Key(key_idx), RandomString(&rnd, 10000)));
key_idx++;
}
@ -2616,7 +2643,7 @@ TEST(DBTest, UniversalCompactionStopStyleSimilarSize) {
// Generate one more file at level-0, which should trigger level-0
// compaction.
for (int i = 0; i < 12; i++) {
for (int i = 0; i < 11; i++) {
ASSERT_OK(Put(Key(key_idx), RandomString(&rnd, 10000)));
key_idx++;
}
@ -2627,7 +2654,7 @@ TEST(DBTest, UniversalCompactionStopStyleSimilarSize) {
// Stage 3:
// Now we have 3 files at level 0, with size 4, 0.4, 2. Generate one
// more file at level-0, which should trigger level-0 compaction.
for (int i = 0; i < 12; i++) {
for (int i = 0; i < 11; i++) {
ASSERT_OK(Put(Key(key_idx), RandomString(&rnd, 10000)));
key_idx++;
}
@ -2744,54 +2771,54 @@ TEST(DBTest, UniversalCompactionCompressRatio1) {
// The first compaction (2) is compressed.
for (int num = 0; num < 2; num++) {
// Write 120KB (12 values, each 10K)
for (int i = 0; i < 12; i++) {
// Write 110KB (11 values, each 10K)
for (int i = 0; i < 11; i++) {
ASSERT_OK(Put(Key(key_idx), CompressibleString(&rnd, 10000)));
key_idx++;
}
dbfull()->TEST_WaitForFlushMemTable();
dbfull()->TEST_WaitForCompact();
}
ASSERT_LT((int ) dbfull()->TEST_GetLevel0TotalSize(), 120000 * 2 * 0.9);
ASSERT_LT((int)dbfull()->TEST_GetLevel0TotalSize(), 110000 * 2 * 0.9);
// The second compaction (4) is compressed
for (int num = 0; num < 2; num++) {
// Write 120KB (12 values, each 10K)
for (int i = 0; i < 12; i++) {
// Write 110KB (11 values, each 10K)
for (int i = 0; i < 11; i++) {
ASSERT_OK(Put(Key(key_idx), CompressibleString(&rnd, 10000)));
key_idx++;
}
dbfull()->TEST_WaitForFlushMemTable();
dbfull()->TEST_WaitForCompact();
}
ASSERT_LT((int ) dbfull()->TEST_GetLevel0TotalSize(), 120000 * 4 * 0.9);
ASSERT_LT((int)dbfull()->TEST_GetLevel0TotalSize(), 110000 * 4 * 0.9);
// The third compaction (2 4) is compressed since this time it is
// (1 1 3.2) and 3.2/5.2 doesn't reach ratio.
for (int num = 0; num < 2; num++) {
// Write 120KB (12 values, each 10K)
for (int i = 0; i < 12; i++) {
// Write 110KB (11 values, each 10K)
for (int i = 0; i < 11; i++) {
ASSERT_OK(Put(Key(key_idx), CompressibleString(&rnd, 10000)));
key_idx++;
}
dbfull()->TEST_WaitForFlushMemTable();
dbfull()->TEST_WaitForCompact();
}
ASSERT_LT((int ) dbfull()->TEST_GetLevel0TotalSize(), 120000 * 6 * 0.9);
ASSERT_LT((int)dbfull()->TEST_GetLevel0TotalSize(), 110000 * 6 * 0.9);
// When we start for the compaction up to (2 4 8), the latest
// compressed is not compressed.
for (int num = 0; num < 8; num++) {
// Write 120KB (12 values, each 10K)
for (int i = 0; i < 12; i++) {
// Write 110KB (11 values, each 10K)
for (int i = 0; i < 11; i++) {
ASSERT_OK(Put(Key(key_idx), CompressibleString(&rnd, 10000)));
key_idx++;
}
dbfull()->TEST_WaitForFlushMemTable();
dbfull()->TEST_WaitForCompact();
}
ASSERT_GT((int) dbfull()->TEST_GetLevel0TotalSize(),
120000 * 12 * 0.8 + 110000 * 2);
ASSERT_GT((int)dbfull()->TEST_GetLevel0TotalSize(),
110000 * 11 * 0.8 + 110000 * 2);
}
TEST(DBTest, UniversalCompactionCompressRatio2) {
@ -2817,8 +2844,8 @@ TEST(DBTest, UniversalCompactionCompressRatio2) {
dbfull()->TEST_WaitForFlushMemTable();
dbfull()->TEST_WaitForCompact();
}
ASSERT_LT((int ) dbfull()->TEST_GetLevel0TotalSize(),
120000 * 12 * 0.8 + 110000 * 2);
ASSERT_LT((int)dbfull()->TEST_GetLevel0TotalSize(),
120000 * 12 * 0.8 + 120000 * 2);
}
#endif

@ -10,6 +10,7 @@
#include "db/memtable.h"
#include <memory>
#include <algorithm>
#include "db/dbformat.h"
#include "db/merge_context.h"
@ -32,6 +33,8 @@ MemTable::MemTable(const InternalKeyComparator& cmp,
const Options& options)
: comparator_(cmp),
refs_(0),
kArenaBlockSize(OptimizeBlockSize(options.arena_block_size)),
kWriteBufferSize(options.write_buffer_size),
arena_(options.arena_block_size),
table_(options.memtable_factory->CreateMemTableRep(
comparator_, &arena_, options.prefix_extractor.get())),
@ -42,7 +45,11 @@ MemTable::MemTable(const InternalKeyComparator& cmp,
mem_next_logfile_number_(0),
locks_(options.inplace_update_support ? options.inplace_update_num_locks
: 0),
prefix_extractor_(options.prefix_extractor.get()) {
prefix_extractor_(options.prefix_extractor.get()),
should_flush_(ShouldFlushNow()) {
// if should_flush_ == true without an entry inserted, something must have
// gone wrong already.
assert(!should_flush_);
if (prefix_extractor_ && options.memtable_prefix_bloom_bits > 0) {
prefix_bloom_.reset(new DynamicBloom(options.memtable_prefix_bloom_bits,
options.memtable_prefix_bloom_probes));
@ -57,6 +64,60 @@ size_t MemTable::ApproximateMemoryUsage() {
return arena_.ApproximateMemoryUsage() + table_->ApproximateMemoryUsage();
}
bool MemTable::ShouldFlushNow() const {
// In a lot of times, we cannot allocate arena blocks that exactly matches the
// buffer size. Thus we have to decide if we should over-allocate or
// under-allocate.
// This constant avariable can be interpreted as: if we still have more than
// "kAllowOverAllocationRatio * kArenaBlockSize" space left, we'd try to over
// allocate one more block.
const double kAllowOverAllocationRatio = 0.6;
// If arena still have room for new block allocation, we can safely say it
// shouldn't flush.
auto allocated_memory =
table_->ApproximateMemoryUsage() + arena_.MemoryAllocatedBytes();
if (allocated_memory + kArenaBlockSize * kAllowOverAllocationRatio <
kWriteBufferSize) {
return false;
}
// if user keeps adding entries that exceeds kWriteBufferSize, we need to
// flush
// earlier even though we still have much available memory left.
if (allocated_memory > kWriteBufferSize * (1 + kAllowOverAllocationRatio)) {
return true;
}
// In this code path, Arena has already allocated its "last block", which
// means the total allocatedmemory size is either:
// (1) "moderately" over allocated the memory (no more than `0.4 * arena
// block size`. Or,
// (2) the allocated memory is less than write buffer size, but we'll stop
// here since if we allocate a new arena block, we'll over allocate too much
// more (half of the arena block size) memory.
//
// In either case, to avoid over-allocate, the last block will stop allocation
// when its usage reaches a certain ratio, which we carefully choose "0.75
// full" as the stop condition because it addresses the following issue with
// great simplicity: What if the next inserted entry's size is
// bigger than AllocatedAndUnused()?
//
// The answer is: if the entry size is also bigger than 0.25 *
// kArenaBlockSize, a dedicated block will be allocated for it; otherwise
// arena will anyway skip the AllocatedAndUnused() and allocate a new, empty
// and regular block. In either case, we *overly* over-allocated.
//
// Therefore, setting the last block to be at most "0.75 full" avoids both
// cases.
//
// NOTE: the average percentage of waste space of this approach can be counted
// as: "arena block size * 0.25 / write buffer size". User who specify a small
// write buffer size and/or big arena block size may suffer.
return arena_.AllocatedAndUnused() < kArenaBlockSize / 4;
}
int MemTable::KeyComparator::operator()(const char* prefix_len_key1,
const char* prefix_len_key2) const {
// Internal keys are encoded as length-prefixed strings.
@ -198,6 +259,8 @@ void MemTable::Add(SequenceNumber s, ValueType type,
if (first_seqno_ == 0) {
first_seqno_ = s;
}
should_flush_ = ShouldFlushNow();
}
// Callback from MemTable::Get()
@ -460,13 +523,16 @@ bool MemTable::UpdateCallback(SequenceNumber seq,
}
}
RecordTick(options.statistics.get(), NUMBER_KEYS_UPDATED);
should_flush_ = ShouldFlushNow();
return true;
} else if (status == UpdateStatus::UPDATED) {
Add(seq, kTypeValue, key, Slice(str_value));
RecordTick(options.statistics.get(), NUMBER_KEYS_WRITTEN);
should_flush_ = ShouldFlushNow();
return true;
} else if (status == UpdateStatus::UPDATE_FAILED) {
// No action required. Return.
should_flush_ = ShouldFlushNow();
return true;
}
}

@ -64,6 +64,10 @@ class MemTable {
// operations on the same MemTable.
size_t ApproximateMemoryUsage();
// This method heuristically determines if the memtable should continue to
// host more data.
bool ShouldFlush() const { return should_flush_; }
// Return an iterator that yields the contents of the memtable.
//
// The caller must ensure that the underlying MemTable remains live
@ -153,13 +157,20 @@ class MemTable {
return comparator_.comparator;
}
const Arena& TEST_GetArena() const { return arena_; }
private:
// Dynamically check if we can add more incoming entries.
bool ShouldFlushNow() const;
friend class MemTableIterator;
friend class MemTableBackwardIterator;
friend class MemTableList;
KeyComparator comparator_;
int refs_;
const size_t kArenaBlockSize;
const size_t kWriteBufferSize;
Arena arena_;
unique_ptr<MemTableRep> table_;
@ -187,6 +198,9 @@ class MemTable {
const SliceTransform* const prefix_extractor_;
std::unique_ptr<DynamicBloom> prefix_bloom_;
// a flag indicating if a memtable has met the criteria to flush
bool should_flush_;
};
extern const char* EncodeKey(std::string* scratch, const Slice& target);

@ -84,9 +84,9 @@ def main(argv):
--cache_size=1048576
--open_files=500000
--verify_checksum=1
--sync=%s
--sync=0
--disable_wal=0
--disable_data_sync=%s
--disable_data_sync=1
--target_file_size_base=2097152
--target_file_size_multiplier=2
--max_write_buffer_number=3
@ -101,8 +101,6 @@ def main(argv):
tempfile.mkdtemp(),
random.randint(0, 1),
random.randint(0, 1),
random.randint(0, 1),
random.randint(0, 1),
random.randint(0, 1)))
child = subprocess.Popen([cmd],

@ -98,9 +98,9 @@ def main(argv):
--cache_size=1048576
--open_files=500000
--verify_checksum=1
--sync=%s
--sync=0
--disable_wal=0
--disable_data_sync=%s
--disable_data_sync=1
--target_file_size_base=2097152
--target_file_size_multiplier=2
--max_write_buffer_number=3
@ -117,8 +117,6 @@ def main(argv):
random.randint(0, 1),
random.randint(0, 1),
random.randint(0, 1),
random.randint(0, 1),
random.randint(0, 1),
additional_opts))
print "Running:" + cmd + "\n"

@ -1232,10 +1232,17 @@ class StressTest {
for (size_t cf = 0; cf < column_families_.size(); ++cf) {
if (!thread->rand.OneIn(2)) {
// Use iterator to verify this range
options.prefix_seek = FLAGS_prefix_size > 0;
unique_ptr<Iterator> iter(
db_->NewIterator(options, column_families_[cf]));
iter->Seek(Key(start));
for (long i = start; i < end; i++) {
// TODO(ljin): update "long" to uint64_t
// Reseek when the prefix changes
if (i % (static_cast<int64_t>(1) << 8 * (8 - FLAGS_prefix_size)) ==
0) {
iter->Seek(Key(i));
}
std::string from_db;
std::string keystr = Key(i);
Slice k = keystr;
@ -1266,10 +1273,10 @@ class StressTest {
std::string keystr = Key(i);
Slice k = keystr;
Status s = db_->Get(options, column_families_[cf], k, &from_db);
VerifyValue(cf, i, options, shared, from_db, s, true);
if (from_db.length()) {
PrintKeyValue(cf, i, from_db.data(), from_db.length());
}
VerifyValue(cf, i, options, shared, from_db, s, true);
}
}
}

@ -42,6 +42,7 @@ Arena::~Arena() {
char* Arena::AllocateFallback(size_t bytes, bool aligned) {
if (bytes > kBlockSize / 4) {
++irregular_block_num;
// Object is more than a quarter of our block size. Allocate it separately
// to avoid wasting too much space in leftover bytes.
return AllocateNewBlock(bytes);

@ -46,12 +46,19 @@ class Arena {
size_t MemoryAllocatedBytes() const { return blocks_memory_; }
size_t AllocatedAndUnused() const { return alloc_bytes_remaining_; }
// If an allocation is too big, we'll allocate an irregular block with the
// same size of that allocation.
virtual size_t IrregularBlockNum() const { return irregular_block_num; }
private:
// Number of bytes allocated in one block
const size_t kBlockSize;
// Array of new[] allocated memory blocks
typedef std::vector<char*> Blocks;
Blocks blocks_;
size_t irregular_block_num = 0;
// Stats for current active block.
// For each block, we allocate aligned memory chucks from one end and

Loading…
Cancel
Save