diff --git a/db/db_impl.cc b/db/db_impl.cc index 1178d0ea4..9e36b500d 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -960,8 +960,7 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, SequenceNumber* max_sequence, // no need to refcount since client still doesn't have access // to the DB and can not drop column families while we iterate for (auto cfd : *versions_->GetColumnFamilySet()) { - if (cfd->mem()->ApproximateMemoryUsage() > - cfd->options()->write_buffer_size) { + if (cfd->mem()->ShouldFlush()) { // If this asserts, it means that InsertInto failed in // filtering updates to already-flushed column families assert(cfd->GetLogNumber() <= log_number); @@ -1764,7 +1763,7 @@ void DBImpl::MaybeScheduleFlushOrCompaction() { if (bg_flush_scheduled_ < options_.max_background_flushes) { bg_flush_scheduled_++; env_->Schedule(&DBImpl::BGWorkFlush, this, Env::Priority::HIGH); - } else { + } else if (options_.max_background_flushes > 0) { bg_schedule_needed_ = true; } } @@ -3636,8 +3635,7 @@ Status DBImpl::MakeRoomForWrite(ColumnFamilyData* cfd, bool force) { allow_delay = false; // Do not delay a single write more than once mutex_.Lock(); delayed_writes_++; - } else if (!force && (cfd->mem()->ApproximateMemoryUsage() <= - cfd->options()->write_buffer_size)) { + } else if (!force && !cfd->mem()->ShouldFlush()) { // There is room in current memtable if (allow_delay) { DelayLoggingAndReset(); diff --git a/db/db_test.cc b/db/db_test.cc index 5b97fa383..3a9381dd5 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -8,6 +8,7 @@ // found in the LICENSE file. See the AUTHORS file for names of contributors. #include +#include #include #include #include @@ -23,20 +24,20 @@ #include "rocksdb/env.h" #include "rocksdb/filter_policy.h" #include "rocksdb/perf_context.h" -#include "table/plain_table_factory.h" #include "rocksdb/slice.h" #include "rocksdb/slice_transform.h" #include "rocksdb/table.h" #include "rocksdb/table_properties.h" #include "table/block_based_table_factory.h" +#include "table/plain_table_factory.h" #include "util/hash.h" #include "util/hash_linklist_rep.h" +#include "utilities/merge_operators.h" #include "util/logging.h" #include "util/mutexlock.h" #include "util/statistics.h" #include "util/testharness.h" #include "util/testutil.h" -#include "utilities/merge_operators.h" namespace rocksdb { @@ -976,6 +977,28 @@ void VerifyTableProperties(DB* db, uint64_t expected_entries_size) { ASSERT_EQ(expected_entries_size, sum); } +std::unordered_map GetMemoryUsage(MemTable* memtable) { + const auto& arena = memtable->TEST_GetArena(); + return {{"memtable.approximate.usage", memtable->ApproximateMemoryUsage()}, + {"arena.approximate.usage", arena.ApproximateMemoryUsage()}, + {"arena.allocated.memory", arena.MemoryAllocatedBytes()}, + {"arena.unused.bytes", arena.AllocatedAndUnused()}, + {"irregular.blocks", arena.IrregularBlockNum()}}; +} + +void PrintMemoryUsage(const std::unordered_map& usage) { + for (const auto& item : usage) { + std::cout << "\t" << item.first << ": " << item.second << std::endl; + } +} + +void AddRandomKV(MemTable* memtable, Random* rnd, size_t arena_block_size) { + memtable->Add(0, kTypeValue, RandomString(rnd, 20) /* key */, + // make sure we will be able to generate some over sized entries + RandomString(rnd, rnd->Uniform(arena_block_size / 4) * 1.15 + + 10) /* value */); +} + TEST(DBTest, Empty) { do { Options options = CurrentOptions(); @@ -2069,7 +2092,7 @@ TEST(DBTest, NumImmutableMemTable) { options.write_buffer_size = 1000000; CreateAndReopenWithCF({"pikachu"}, &options); - std::string big_value(1000000, 'x'); + std::string big_value(1000000 * 2, 'x'); std::string num; SetPerfLevel(kEnableTime);; @@ -2353,6 +2376,10 @@ TEST(DBTest, CompactionTrigger) { ASSERT_EQ(NumTableFilesAtLevel(1, 1), 1); } +// TODO(kailiu) The tests on UniversalCompaction has some issues: +// 1. A lot of magic numbers ("11" or "12"). +// 2. Made assumption on the memtable flush conidtions, which may change from +// time to time. TEST(DBTest, UniversalCompactionTrigger) { Options options = CurrentOptions(); options.compaction_style = kCompactionStyleUniversal; @@ -2369,7 +2396,7 @@ TEST(DBTest, UniversalCompactionTrigger) { // compaction. for (int num = 0; num < options.level0_file_num_compaction_trigger - 1; num++) { - // Write 120KB (12 values, each 10K) + // Write 110KB (11 values, each 10K) for (int i = 0; i < 12; i++) { ASSERT_OK(Put(1, Key(key_idx), RandomString(&rnd, 10000))); key_idx++; @@ -2380,7 +2407,7 @@ TEST(DBTest, UniversalCompactionTrigger) { // Generate one more file at level-0, which should trigger level-0 // compaction. - for (int i = 0; i < 12; i++) { + for (int i = 0; i < 11; i++) { ASSERT_OK(Put(1, Key(key_idx), RandomString(&rnd, 10000))); key_idx++; } @@ -2403,8 +2430,8 @@ TEST(DBTest, UniversalCompactionTrigger) { ASSERT_OK(Flush(1)); for (int num = 0; num < options.level0_file_num_compaction_trigger - 3; num++) { - // Write 120KB (12 values, each 10K) - for (int i = 0; i < 12; i++) { + // Write 110KB (11 values, each 10K) + for (int i = 0; i < 11; i++) { ASSERT_OK(Put(1, Key(key_idx), RandomString(&rnd, 10000))); key_idx++; } @@ -2414,7 +2441,7 @@ TEST(DBTest, UniversalCompactionTrigger) { // Generate one more file at level-0, which should trigger level-0 // compaction. - for (int i = 0; i < 12; i++) { + for (int i = 0; i < 11; i++) { ASSERT_OK(Put(1, Key(key_idx), RandomString(&rnd, 10000))); key_idx++; } @@ -2431,8 +2458,8 @@ TEST(DBTest, UniversalCompactionTrigger) { // generating new files at level 0. for (int num = 0; num < options.level0_file_num_compaction_trigger - 3; num++) { - // Write 120KB (12 values, each 10K) - for (int i = 0; i < 12; i++) { + // Write 110KB (11 values, each 10K) + for (int i = 0; i < 11; i++) { ASSERT_OK(Put(1, Key(key_idx), RandomString(&rnd, 10000))); key_idx++; } @@ -2457,7 +2484,7 @@ TEST(DBTest, UniversalCompactionTrigger) { // Stage 4: // Now we have 3 files at level 0, with size 4, 2.4, 2. Let's generate a // new file of size 1. - for (int i = 0; i < 12; i++) { + for (int i = 0; i < 11; i++) { ASSERT_OK(Put(1, Key(key_idx), RandomString(&rnd, 10000))); key_idx++; } @@ -2471,7 +2498,7 @@ TEST(DBTest, UniversalCompactionTrigger) { // Stage 5: // Now we have 4 files at level 0, with size 4, 2.4, 2, 1. Let's generate // a new file of size 1. - for (int i = 0; i < 12; i++) { + for (int i = 0; i < 11; i++) { ASSERT_OK(Put(1, Key(key_idx), RandomString(&rnd, 10000))); key_idx++; } @@ -2500,8 +2527,8 @@ TEST(DBTest, UniversalCompactionSizeAmplification) { // Generate two files in Level 0. Both files are approx the same size. for (int num = 0; num < options.level0_file_num_compaction_trigger - 1; num++) { - // Write 120KB (12 values, each 10K) - for (int i = 0; i < 12; i++) { + // Write 110KB (11 values, each 10K) + for (int i = 0; i < 11; i++) { ASSERT_OK(Put(1, Key(key_idx), RandomString(&rnd, 10000))); key_idx++; } @@ -2534,8 +2561,8 @@ TEST(DBTest, UniversalCompactionOptions) { int key_idx = 0; for (int num = 0; num < options.level0_file_num_compaction_trigger; num++) { - // Write 120KB (12 values, each 10K) - for (int i = 0; i < 12; i++) { + // Write 110KB (11 values, each 10K) + for (int i = 0; i < 11; i++) { ASSERT_OK(Put(1, Key(key_idx), RandomString(&rnd, 10000))); key_idx++; } @@ -2573,8 +2600,8 @@ TEST(DBTest, UniversalCompactionStopStyleSimilarSize) { for (int num = 0; num < options.level0_file_num_compaction_trigger-1; num++) { - // Write 120KB (12 values, each 10K) - for (int i = 0; i < 12; i++) { + // Write 110KB (11 values, each 10K) + for (int i = 0; i < 11; i++) { ASSERT_OK(Put(Key(key_idx), RandomString(&rnd, 10000))); key_idx++; } @@ -2584,7 +2611,7 @@ TEST(DBTest, UniversalCompactionStopStyleSimilarSize) { // Generate one more file at level-0, which should trigger level-0 // compaction. - for (int i = 0; i < 12; i++) { + for (int i = 0; i < 11; i++) { ASSERT_OK(Put(Key(key_idx), RandomString(&rnd, 10000))); key_idx++; } @@ -2605,8 +2632,8 @@ TEST(DBTest, UniversalCompactionStopStyleSimilarSize) { for (int num = 0; num < options.level0_file_num_compaction_trigger-3; num++) { - // Write 120KB (12 values, each 10K) - for (int i = 0; i < 12; i++) { + // Write 110KB (11 values, each 10K) + for (int i = 0; i < 11; i++) { ASSERT_OK(Put(Key(key_idx), RandomString(&rnd, 10000))); key_idx++; } @@ -2616,7 +2643,7 @@ TEST(DBTest, UniversalCompactionStopStyleSimilarSize) { // Generate one more file at level-0, which should trigger level-0 // compaction. - for (int i = 0; i < 12; i++) { + for (int i = 0; i < 11; i++) { ASSERT_OK(Put(Key(key_idx), RandomString(&rnd, 10000))); key_idx++; } @@ -2627,7 +2654,7 @@ TEST(DBTest, UniversalCompactionStopStyleSimilarSize) { // Stage 3: // Now we have 3 files at level 0, with size 4, 0.4, 2. Generate one // more file at level-0, which should trigger level-0 compaction. - for (int i = 0; i < 12; i++) { + for (int i = 0; i < 11; i++) { ASSERT_OK(Put(Key(key_idx), RandomString(&rnd, 10000))); key_idx++; } @@ -2744,54 +2771,54 @@ TEST(DBTest, UniversalCompactionCompressRatio1) { // The first compaction (2) is compressed. for (int num = 0; num < 2; num++) { - // Write 120KB (12 values, each 10K) - for (int i = 0; i < 12; i++) { + // Write 110KB (11 values, each 10K) + for (int i = 0; i < 11; i++) { ASSERT_OK(Put(Key(key_idx), CompressibleString(&rnd, 10000))); key_idx++; } dbfull()->TEST_WaitForFlushMemTable(); dbfull()->TEST_WaitForCompact(); } - ASSERT_LT((int ) dbfull()->TEST_GetLevel0TotalSize(), 120000 * 2 * 0.9); + ASSERT_LT((int)dbfull()->TEST_GetLevel0TotalSize(), 110000 * 2 * 0.9); // The second compaction (4) is compressed for (int num = 0; num < 2; num++) { - // Write 120KB (12 values, each 10K) - for (int i = 0; i < 12; i++) { + // Write 110KB (11 values, each 10K) + for (int i = 0; i < 11; i++) { ASSERT_OK(Put(Key(key_idx), CompressibleString(&rnd, 10000))); key_idx++; } dbfull()->TEST_WaitForFlushMemTable(); dbfull()->TEST_WaitForCompact(); } - ASSERT_LT((int ) dbfull()->TEST_GetLevel0TotalSize(), 120000 * 4 * 0.9); + ASSERT_LT((int)dbfull()->TEST_GetLevel0TotalSize(), 110000 * 4 * 0.9); // The third compaction (2 4) is compressed since this time it is // (1 1 3.2) and 3.2/5.2 doesn't reach ratio. for (int num = 0; num < 2; num++) { - // Write 120KB (12 values, each 10K) - for (int i = 0; i < 12; i++) { + // Write 110KB (11 values, each 10K) + for (int i = 0; i < 11; i++) { ASSERT_OK(Put(Key(key_idx), CompressibleString(&rnd, 10000))); key_idx++; } dbfull()->TEST_WaitForFlushMemTable(); dbfull()->TEST_WaitForCompact(); } - ASSERT_LT((int ) dbfull()->TEST_GetLevel0TotalSize(), 120000 * 6 * 0.9); + ASSERT_LT((int)dbfull()->TEST_GetLevel0TotalSize(), 110000 * 6 * 0.9); // When we start for the compaction up to (2 4 8), the latest // compressed is not compressed. for (int num = 0; num < 8; num++) { - // Write 120KB (12 values, each 10K) - for (int i = 0; i < 12; i++) { + // Write 110KB (11 values, each 10K) + for (int i = 0; i < 11; i++) { ASSERT_OK(Put(Key(key_idx), CompressibleString(&rnd, 10000))); key_idx++; } dbfull()->TEST_WaitForFlushMemTable(); dbfull()->TEST_WaitForCompact(); } - ASSERT_GT((int) dbfull()->TEST_GetLevel0TotalSize(), - 120000 * 12 * 0.8 + 110000 * 2); + ASSERT_GT((int)dbfull()->TEST_GetLevel0TotalSize(), + 110000 * 11 * 0.8 + 110000 * 2); } TEST(DBTest, UniversalCompactionCompressRatio2) { @@ -2817,8 +2844,8 @@ TEST(DBTest, UniversalCompactionCompressRatio2) { dbfull()->TEST_WaitForFlushMemTable(); dbfull()->TEST_WaitForCompact(); } - ASSERT_LT((int ) dbfull()->TEST_GetLevel0TotalSize(), - 120000 * 12 * 0.8 + 110000 * 2); + ASSERT_LT((int)dbfull()->TEST_GetLevel0TotalSize(), + 120000 * 12 * 0.8 + 120000 * 2); } #endif diff --git a/db/memtable.cc b/db/memtable.cc index d0f54ce4b..5d3e195ce 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -10,6 +10,7 @@ #include "db/memtable.h" #include +#include #include "db/dbformat.h" #include "db/merge_context.h" @@ -32,6 +33,8 @@ MemTable::MemTable(const InternalKeyComparator& cmp, const Options& options) : comparator_(cmp), refs_(0), + kArenaBlockSize(OptimizeBlockSize(options.arena_block_size)), + kWriteBufferSize(options.write_buffer_size), arena_(options.arena_block_size), table_(options.memtable_factory->CreateMemTableRep( comparator_, &arena_, options.prefix_extractor.get())), @@ -42,7 +45,11 @@ MemTable::MemTable(const InternalKeyComparator& cmp, mem_next_logfile_number_(0), locks_(options.inplace_update_support ? options.inplace_update_num_locks : 0), - prefix_extractor_(options.prefix_extractor.get()) { + prefix_extractor_(options.prefix_extractor.get()), + should_flush_(ShouldFlushNow()) { + // if should_flush_ == true without an entry inserted, something must have + // gone wrong already. + assert(!should_flush_); if (prefix_extractor_ && options.memtable_prefix_bloom_bits > 0) { prefix_bloom_.reset(new DynamicBloom(options.memtable_prefix_bloom_bits, options.memtable_prefix_bloom_probes)); @@ -57,6 +64,60 @@ size_t MemTable::ApproximateMemoryUsage() { return arena_.ApproximateMemoryUsage() + table_->ApproximateMemoryUsage(); } +bool MemTable::ShouldFlushNow() const { + // In a lot of times, we cannot allocate arena blocks that exactly matches the + // buffer size. Thus we have to decide if we should over-allocate or + // under-allocate. + // This constant avariable can be interpreted as: if we still have more than + // "kAllowOverAllocationRatio * kArenaBlockSize" space left, we'd try to over + // allocate one more block. + const double kAllowOverAllocationRatio = 0.6; + + // If arena still have room for new block allocation, we can safely say it + // shouldn't flush. + auto allocated_memory = + table_->ApproximateMemoryUsage() + arena_.MemoryAllocatedBytes(); + + if (allocated_memory + kArenaBlockSize * kAllowOverAllocationRatio < + kWriteBufferSize) { + return false; + } + + // if user keeps adding entries that exceeds kWriteBufferSize, we need to + // flush + // earlier even though we still have much available memory left. + if (allocated_memory > kWriteBufferSize * (1 + kAllowOverAllocationRatio)) { + return true; + } + + // In this code path, Arena has already allocated its "last block", which + // means the total allocatedmemory size is either: + // (1) "moderately" over allocated the memory (no more than `0.4 * arena + // block size`. Or, + // (2) the allocated memory is less than write buffer size, but we'll stop + // here since if we allocate a new arena block, we'll over allocate too much + // more (half of the arena block size) memory. + // + // In either case, to avoid over-allocate, the last block will stop allocation + // when its usage reaches a certain ratio, which we carefully choose "0.75 + // full" as the stop condition because it addresses the following issue with + // great simplicity: What if the next inserted entry's size is + // bigger than AllocatedAndUnused()? + // + // The answer is: if the entry size is also bigger than 0.25 * + // kArenaBlockSize, a dedicated block will be allocated for it; otherwise + // arena will anyway skip the AllocatedAndUnused() and allocate a new, empty + // and regular block. In either case, we *overly* over-allocated. + // + // Therefore, setting the last block to be at most "0.75 full" avoids both + // cases. + // + // NOTE: the average percentage of waste space of this approach can be counted + // as: "arena block size * 0.25 / write buffer size". User who specify a small + // write buffer size and/or big arena block size may suffer. + return arena_.AllocatedAndUnused() < kArenaBlockSize / 4; +} + int MemTable::KeyComparator::operator()(const char* prefix_len_key1, const char* prefix_len_key2) const { // Internal keys are encoded as length-prefixed strings. @@ -198,6 +259,8 @@ void MemTable::Add(SequenceNumber s, ValueType type, if (first_seqno_ == 0) { first_seqno_ = s; } + + should_flush_ = ShouldFlushNow(); } // Callback from MemTable::Get() @@ -460,13 +523,16 @@ bool MemTable::UpdateCallback(SequenceNumber seq, } } RecordTick(options.statistics.get(), NUMBER_KEYS_UPDATED); + should_flush_ = ShouldFlushNow(); return true; } else if (status == UpdateStatus::UPDATED) { Add(seq, kTypeValue, key, Slice(str_value)); RecordTick(options.statistics.get(), NUMBER_KEYS_WRITTEN); + should_flush_ = ShouldFlushNow(); return true; } else if (status == UpdateStatus::UPDATE_FAILED) { // No action required. Return. + should_flush_ = ShouldFlushNow(); return true; } } diff --git a/db/memtable.h b/db/memtable.h index 38602e3aa..3d392820c 100644 --- a/db/memtable.h +++ b/db/memtable.h @@ -64,6 +64,10 @@ class MemTable { // operations on the same MemTable. size_t ApproximateMemoryUsage(); + // This method heuristically determines if the memtable should continue to + // host more data. + bool ShouldFlush() const { return should_flush_; } + // Return an iterator that yields the contents of the memtable. // // The caller must ensure that the underlying MemTable remains live @@ -153,13 +157,20 @@ class MemTable { return comparator_.comparator; } + const Arena& TEST_GetArena() const { return arena_; } + private: + // Dynamically check if we can add more incoming entries. + bool ShouldFlushNow() const; + friend class MemTableIterator; friend class MemTableBackwardIterator; friend class MemTableList; KeyComparator comparator_; int refs_; + const size_t kArenaBlockSize; + const size_t kWriteBufferSize; Arena arena_; unique_ptr table_; @@ -187,6 +198,9 @@ class MemTable { const SliceTransform* const prefix_extractor_; std::unique_ptr prefix_bloom_; + + // a flag indicating if a memtable has met the criteria to flush + bool should_flush_; }; extern const char* EncodeKey(std::string* scratch, const Slice& target); diff --git a/tools/db_crashtest.py b/tools/db_crashtest.py index 16603cfd6..2979a0508 100644 --- a/tools/db_crashtest.py +++ b/tools/db_crashtest.py @@ -84,9 +84,9 @@ def main(argv): --cache_size=1048576 --open_files=500000 --verify_checksum=1 - --sync=%s + --sync=0 --disable_wal=0 - --disable_data_sync=%s + --disable_data_sync=1 --target_file_size_base=2097152 --target_file_size_multiplier=2 --max_write_buffer_number=3 @@ -101,8 +101,6 @@ def main(argv): tempfile.mkdtemp(), random.randint(0, 1), random.randint(0, 1), - random.randint(0, 1), - random.randint(0, 1), random.randint(0, 1))) child = subprocess.Popen([cmd], diff --git a/tools/db_crashtest2.py b/tools/db_crashtest2.py index 6a28a0ba4..68cc42cf3 100644 --- a/tools/db_crashtest2.py +++ b/tools/db_crashtest2.py @@ -98,9 +98,9 @@ def main(argv): --cache_size=1048576 --open_files=500000 --verify_checksum=1 - --sync=%s + --sync=0 --disable_wal=0 - --disable_data_sync=%s + --disable_data_sync=1 --target_file_size_base=2097152 --target_file_size_multiplier=2 --max_write_buffer_number=3 @@ -117,8 +117,6 @@ def main(argv): random.randint(0, 1), random.randint(0, 1), random.randint(0, 1), - random.randint(0, 1), - random.randint(0, 1), additional_opts)) print "Running:" + cmd + "\n" diff --git a/tools/db_stress.cc b/tools/db_stress.cc index e6af188d5..f14e5b2bb 100644 --- a/tools/db_stress.cc +++ b/tools/db_stress.cc @@ -1232,10 +1232,17 @@ class StressTest { for (size_t cf = 0; cf < column_families_.size(); ++cf) { if (!thread->rand.OneIn(2)) { // Use iterator to verify this range + options.prefix_seek = FLAGS_prefix_size > 0; unique_ptr iter( db_->NewIterator(options, column_families_[cf])); iter->Seek(Key(start)); for (long i = start; i < end; i++) { + // TODO(ljin): update "long" to uint64_t + // Reseek when the prefix changes + if (i % (static_cast(1) << 8 * (8 - FLAGS_prefix_size)) == + 0) { + iter->Seek(Key(i)); + } std::string from_db; std::string keystr = Key(i); Slice k = keystr; @@ -1266,10 +1273,10 @@ class StressTest { std::string keystr = Key(i); Slice k = keystr; Status s = db_->Get(options, column_families_[cf], k, &from_db); + VerifyValue(cf, i, options, shared, from_db, s, true); if (from_db.length()) { PrintKeyValue(cf, i, from_db.data(), from_db.length()); } - VerifyValue(cf, i, options, shared, from_db, s, true); } } } diff --git a/util/arena.cc b/util/arena.cc index dffc8b88e..9b2cb82d1 100644 --- a/util/arena.cc +++ b/util/arena.cc @@ -42,6 +42,7 @@ Arena::~Arena() { char* Arena::AllocateFallback(size_t bytes, bool aligned) { if (bytes > kBlockSize / 4) { + ++irregular_block_num; // Object is more than a quarter of our block size. Allocate it separately // to avoid wasting too much space in leftover bytes. return AllocateNewBlock(bytes); diff --git a/util/arena.h b/util/arena.h index bfa7fe4d8..e6963355b 100644 --- a/util/arena.h +++ b/util/arena.h @@ -46,12 +46,19 @@ class Arena { size_t MemoryAllocatedBytes() const { return blocks_memory_; } + size_t AllocatedAndUnused() const { return alloc_bytes_remaining_; } + + // If an allocation is too big, we'll allocate an irregular block with the + // same size of that allocation. + virtual size_t IrregularBlockNum() const { return irregular_block_num; } + private: // Number of bytes allocated in one block const size_t kBlockSize; // Array of new[] allocated memory blocks typedef std::vector Blocks; Blocks blocks_; + size_t irregular_block_num = 0; // Stats for current active block. // For each block, we allocate aligned memory chucks from one end and