From a09ee1069d927b61ffd0d0e36b45b91e15275e7c Mon Sep 17 00:00:00 2001
From: Schalk-Willem Kruger
Date: Fri, 10 Jan 2014 17:33:56 -0800
Subject: [PATCH] Improve RocksDB "get" performance by computing merge result
 in memtable

Summary:
Added an option (max_successive_merges) that can be used to specify the
maximum number of successive merge operations on a key in the memtable.
This can be used to improve performance of the "get" operation. If many
successive merge operations are performed on a key, the performance of
"get" operations on the key deteriorates, as the value has to be computed
for each "get" operation by applying all the successive merge operations.

FB Task ID: #3428853

Test Plan:
make all check
db_bench --benchmarks=readrandommergerandom
counter_stress_test

Reviewers: haobo, vamsi, dhruba, sdong

Reviewed By: haobo

CC: zshao

Differential Revision: https://reviews.facebook.net/D14991
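Example usage (an illustrative sketch only, not part of this diff: it
assumes the uint64add operator from utilities/merge_operators.h and an
arbitrary DB path):

    #include <cassert>
    #include <cstdint>
    #include <string>

    #include "rocksdb/db.h"
    #include "utilities/merge_operators.h"

    int main() {
      rocksdb::Options options;
      options.create_if_missing = true;
      options.merge_operator =
          rocksdb::MergeOperators::CreateUInt64AddOperator();
      options.max_successive_merges = 16;  // the new option; 0 disables it

      rocksdb::DB* db;
      rocksdb::Status s = rocksdb::DB::Open(options, "/tmp/merge_example", &db);
      assert(s.ok());

      // Merge 8-byte deltas into a counter.
      uint64_t one = 1;
      rocksdb::Slice delta(reinterpret_cast<const char*>(&one), sizeof(one));
      for (int i = 0; i < 100; ++i) {
        db->Merge(rocksdb::WriteOptions(), "counter", delta);
      }

      // Without the cap this Get would replay all 100 operands; with it,
      // at most 16 pending operands remain in the memtable.
      std::string value;
      s = db->Get(rocksdb::ReadOptions(), "counter", &value);
      assert(s.ok() && value.size() == sizeof(uint64_t));

      delete db;
      return 0;
    }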
---
 db/db_bench.cc            |  97 ++++++++++++++++++++++++++++++-
 db/memtable.cc            |  33 +++++++++++
 db/memtable.h             |   5 ++
 db/merge_test.cc          | 118 +++++++++++++++++++++++++++++++++++++-
 db/write_batch.cc         |  58 ++++++++++++++++++-
 include/rocksdb/options.h |  11 ++++
 util/options.cc           |   5 +-
 7 files changed, 320 insertions(+), 7 deletions(-)

diff --git a/db/db_bench.cc b/db/db_bench.cc
index eb5d7cb42..e0ba58281 100644
--- a/db/db_bench.cc
+++ b/db/db_bench.cc
@@ -94,6 +94,8 @@ DEFINE_string(benchmarks,
           "\tmergerandom -- same as updaterandom/appendrandom using merge"
           " operator. "
           "Must be used with merge_operator\n"
+          "\treadrandommergerandom -- perform N random read-or-merge "
+          "operations. Must be used with merge_operator\n"
           "\tseekrandom -- N random seeks\n"
           "\tcrc32c -- repeated crc32c of 4K of data\n"
           "\tacquireload -- load N*1000 times\n"
@@ -112,6 +114,11 @@ DEFINE_int64(numdistinct, 1000,
              "read/write on fewer keys so that gets are more likely to find the"
              " key and puts are more likely to update the same key");
 
+DEFINE_int64(merge_keys, -1,
+             "Number of distinct keys to use for MergeRandom and "
+             "ReadRandomMergeRandom. "
+             "If negative, there will be FLAGS_num keys.");
+
 DEFINE_int64(reads, -1, "Number of read operations to do. "
              "If negative, do FLAGS_num reads.");
 
@@ -297,6 +304,11 @@ DEFINE_int32(readwritepercent, 90, "Ratio of reads to reads/writes (expressed"
              "default value 90 means 90% operations out of all reads and writes"
              " operations are reads. In other words, 9 gets for every 1 put.");
 
+DEFINE_int32(mergereadpercent, 70, "Ratio of merges to merges&reads (expressed"
+             " as percentage) for the ReadRandomMergeRandom workload. The"
+             " default value 70 means 70% out of all read and merge operations"
+             " are merges. In other words, 7 merges for every 3 gets.");
+
 DEFINE_int32(deletepercent, 2, "Percentage of deletes out of reads/writes/"
              "deletes (used in RandomWithVerify only). RandomWithVerify "
              "calculates writepercent as (100 - FLAGS_readwritepercent - "
@@ -446,6 +458,9 @@ DEFINE_uint64(bytes_per_sync, rocksdb::Options().bytes_per_sync,
 DEFINE_bool(filter_deletes, false, " On true, deletes use bloom-filter and drop"
             " the delete if key not present");
 
+DEFINE_int32(max_successive_merges, 0, "Maximum number of successive merge"
+             " operations on a key in the memtable");
+
 static bool ValidatePrefixSize(const char* flagname, int32_t value) {
   if (value < 0 || value>=2000000000) {
     fprintf(stderr, "Invalid value for --%s: %d. 0<= PrefixSize <=2000000000\n",
@@ -784,6 +799,7 @@ class Benchmark {
   long long reads_;
   long long writes_;
   long long readwrites_;
+  long long merge_keys_;
   int heap_counter_;
   char keyFormat_[100];  // will contain the format of key. e.g "%016d"
   void PrintHeader() {
@@ -958,6 +974,7 @@ class Benchmark {
     readwrites_((FLAGS_writes < 0 && FLAGS_reads < 0) ? FLAGS_num :
                 ((FLAGS_writes > FLAGS_reads) ? FLAGS_writes : FLAGS_reads)
                ),
+    merge_keys_(FLAGS_merge_keys < 0 ? FLAGS_num : FLAGS_merge_keys),
     heap_counter_(0) {
     std::vector<std::string> files;
     FLAGS_env->GetChildren(FLAGS_db, &files);
@@ -985,8 +1002,8 @@ class Benchmark {
   }
 
   unique_ptr<char []> GenerateKeyFromInt(long long v, const char* suffix = "") {
-    unique_ptr<char []> keyInStr(new char[kMaxKeySize]);
-    snprintf(keyInStr.get(), kMaxKeySize, keyFormat_, v, suffix);
+    unique_ptr<char []> keyInStr(new char[kMaxKeySize + 1]);
+    snprintf(keyInStr.get(), kMaxKeySize + 1, keyFormat_, v, suffix);
     return keyInStr;
   }
 
@@ -1087,6 +1104,14 @@ class Benchmark {
         method = &Benchmark::ReadWhileWriting;
       } else if (name == Slice("readrandomwriterandom")) {
         method = &Benchmark::ReadRandomWriteRandom;
+      } else if (name == Slice("readrandommergerandom")) {
+        if (FLAGS_merge_operator.empty()) {
+          fprintf(stdout, "%-12s : skipped (--merge_operator is unknown)\n",
+                  name.ToString().c_str());
+          method = nullptr;
+        } else {
+          method = &Benchmark::ReadRandomMergeRandom;
+        }
       } else if (name == Slice("updaterandom")) {
         method = &Benchmark::UpdateRandom;
       } else if (name == Slice("appendrandom")) {
@@ -1421,6 +1446,7 @@ class Benchmark {
               FLAGS_merge_operator.c_str());
       exit(1);
     }
+    options.max_successive_merges = FLAGS_max_successive_merges;
 
     // set universal style compaction configurations, if applicable
     if (FLAGS_universal_size_ratio != 0) {
@@ -2375,13 +2401,16 @@ class Benchmark {
   //
   // For example, use FLAGS_merge_operator="uint64add" and FLAGS_value_size=8
   // to simulate random additions over 64-bit integers using merge.
+  //
+  // The number of merges on the same key can be controlled by adjusting
+  // FLAGS_merge_keys.
   void MergeRandom(ThreadState* thread) {
     RandomGenerator gen;
 
    // The number of iterations is the larger of reads_ or writes_
     Duration duration(FLAGS_duration, readwrites_);
     while (!duration.Done(1)) {
-      const long long k = thread->rand.Next() % FLAGS_num;
+      const long long k = thread->rand.Next() % merge_keys_;
       unique_ptr<char []> key = GenerateKeyFromInt(k);
 
       Status s = db_->Merge(write_options_, key.get(),
@@ -2400,6 +2429,68 @@ class Benchmark {
     thread->stats.AddMessage(msg);
   }
 
+  // Read and merge random keys. The number of reads and merges is controlled
+  // by adjusting FLAGS_num and FLAGS_mergereadpercent. The number of distinct
+  // keys (and thus also the number of reads and merges on the same key) can
+  // be adjusted with FLAGS_merge_keys.
+  //
+  // As with MergeRandom, the merge operator to use should be defined by
+  // FLAGS_merge_operator.
+  void ReadRandomMergeRandom(ThreadState* thread) {
+    ReadOptions options(FLAGS_verify_checksum, true);
+    RandomGenerator gen;
+    std::string value;
+    long long num_hits = 0;
+    long long num_gets = 0;
+    long long num_merges = 0;
+    size_t max_length = 0;
+
+    // the number of iterations is the larger of reads_ or writes_
+    Duration duration(FLAGS_duration, readwrites_);
+
+    while (!duration.Done(1)) {
+      const long long k = thread->rand.Next() % merge_keys_;
+      unique_ptr<char []> key = GenerateKeyFromInt(k);
+
+      bool do_merge = int(thread->rand.Next() % 100) < FLAGS_mergereadpercent;
+
+      if (do_merge) {
+        Status s = db_->Merge(write_options_, key.get(),
+                              gen.Generate(value_size_));
+        if (!s.ok()) {
+          fprintf(stderr, "merge error: %s\n", s.ToString().c_str());
+          exit(1);
+        }
+
+        num_merges++;
+
+      } else {
+        Status s = db_->Get(options, key.get(), &value);
+        if (value.length() > max_length)
+          max_length = value.length();
+
+        if (!s.ok() && !s.IsNotFound()) {
+          fprintf(stderr, "get error: %s\n", s.ToString().c_str());
+          // we continue after error rather than exiting so that we can
+          // find more errors if any
+        } else if (!s.IsNotFound()) {
+          num_hits++;
+        }
+
+        num_gets++;
+      }
+
+      thread->stats.FinishedSingleOp(db_);
+    }
+
+    char msg[100];
+    snprintf(msg, sizeof(msg),
+             "(reads:%lld merges:%lld total:%lld hits:%lld maxlength:%zu)",
+             num_gets, num_merges, readwrites_, num_hits, max_length);
+    thread->stats.AddMessage(msg);
+  }
+
+
   void Compact(ThreadState* thread) {
     db_->CompactRange(nullptr, nullptr);
   }
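For reference, a fuller invocation of the new benchmark than the Test
Plan's one-liner might look like this (the flag values are arbitrary;
uint64add pairs with --value_size=8, as the MergeRandom comment notes):

    ./db_bench --benchmarks=readrandommergerandom --merge_operator=uint64add \
        --value_size=8 --merge_keys=1000 --mergereadpercent=70 \
        --max_successive_merges=16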
diff --git a/db/memtable.cc b/db/memtable.cc
index 675a314ff..7881ce5bd 100644
--- a/db/memtable.cc
+++ b/db/memtable.cc
@@ -326,4 +326,37 @@ bool MemTable::Update(SequenceNumber seq, ValueType type,
   // Key doesn't exist
   return false;
 }
+
+size_t MemTable::CountSuccessiveMergeEntries(const LookupKey& key) {
+  Slice memkey = key.memtable_key();
+
+  // A total ordered iterator is costly for some memtablerep (prefix aware
+  // reps). By passing in the user key, we allow efficient iterator creation.
+  // The iterator only needs to be ordered within the same user key.
+  std::shared_ptr<MemTableRep::Iterator> iter(
+      table_->GetIterator(key.user_key()));
+  iter->Seek(memkey.data());
+
+  size_t num_successive_merges = 0;
+
+  for (; iter->Valid(); iter->Next()) {
+    // Each memtable entry starts with a varint32 internal-key length,
+    // followed by the user key and an 8-byte tag (sequence << 8 | type).
+    const char* entry = iter->key();
+    uint32_t key_length;
+    const char* iter_key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length);
+    if (comparator_.comparator.user_comparator()->Compare(
+            Slice(iter_key_ptr, key_length - 8), key.user_key()) != 0) {
+      // Reached an entry for a different user key; stop counting.
+      break;
+    }
+
+    const uint64_t tag = DecodeFixed64(iter_key_ptr + key_length - 8);
+    if (static_cast<ValueType>(tag & 0xff) != kTypeMerge) {
+      // Reached a non-merge entry (value or deletion); stop counting.
+      break;
+    }
+
+    ++num_successive_merges;
+  }
+
+  return num_successive_merges;
+}
+
 }  // namespace rocksdb
diff --git a/db/memtable.h b/db/memtable.h
index 79d5ba2d0..12ccf3d37 100644
--- a/db/memtable.h
+++ b/db/memtable.h
@@ -107,6 +107,11 @@ class MemTable {
                       const Slice& key,
                       const Slice& value);
 
+  // Returns the number of successive merge entries starting from the newest
+  // entry for the key, up to the last non-merge entry or the last entry for
+  // the key in the memtable.
+  size_t CountSuccessiveMergeEntries(const LookupKey& key);
+
   // Returns the edits area that is needed for flushing the memtable
   VersionEdit* GetEdits() { return &edit_; }
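The tag arithmetic the loop above relies on can be illustrated standalone
(a minimal sketch; the enum values mirror db/dbformat.h, everything else
is invented for illustration):

    #include <cassert>
    #include <cstdint>

    // The internal-key tag packs (sequence << 8) | value_type.
    enum ValueType : uint8_t {
      kTypeDeletion = 0x0,
      kTypeValue = 0x1,
      kTypeMerge = 0x2
    };

    uint64_t PackTag(uint64_t sequence, ValueType t) {
      return (sequence << 8) | t;
    }

    int main() {
      const uint64_t tag = PackTag(/*sequence=*/42, kTypeMerge);
      // CountSuccessiveMergeEntries keeps counting while the low byte of
      // each tag for the same user key decodes to kTypeMerge.
      assert(static_cast<ValueType>(tag & 0xff) == kTypeMerge);
      assert((tag >> 8) == 42);
      return 0;
    }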
diff --git a/db/merge_test.cc b/db/merge_test.cc
index 0c14aff2c..38acd8b29 100644
--- a/db/merge_test.cc
+++ b/db/merge_test.cc
@@ -14,6 +14,7 @@
 #include "rocksdb/merge_operator.h"
 #include "db/dbformat.h"
 #include "db/db_impl.h"
+#include "db/write_batch_internal.h"
 #include "utilities/merge_operators.h"
 #include "util/testharness.h"
 #include "utilities/utility_db.h"
@@ -21,13 +22,52 @@
 using namespace std;
 using namespace rocksdb;
 
+namespace {
+  int numMergeOperatorCalls;
+
+  void resetNumMergeOperatorCalls() {
+    numMergeOperatorCalls = 0;
+  }
+}
+
+// Wraps the uint64add operator and counts how often it is invoked, so the
+// tests below can assert exactly when merges are applied.
+class CountMergeOperator : public AssociativeMergeOperator {
+ public:
+  CountMergeOperator() {
+    mergeOperator_ = MergeOperators::CreateUInt64AddOperator();
+  }
+
+  virtual bool Merge(const Slice& key,
+                     const Slice* existing_value,
+                     const Slice& value,
+                     std::string* new_value,
+                     Logger* logger) const override {
+    ++numMergeOperatorCalls;
+    return mergeOperator_->PartialMerge(
+        key,
+        *existing_value,
+        value,
+        new_value,
+        logger);
+  }
+
+  virtual const char* Name() const override {
+    return "UInt64AddOperator";
+  }
+
+ private:
+  std::shared_ptr<MergeOperator> mergeOperator_;
+};
+
-std::shared_ptr<DB> OpenDb(const string& dbname, const bool ttl = false) {
+std::shared_ptr<DB> OpenDb(
+    const string& dbname,
+    const bool ttl = false,
+    const unsigned max_successive_merges = 0) {
   DB* db;
   StackableDB* sdb;
   Options options;
   options.create_if_missing = true;
-  options.merge_operator = MergeOperators::CreateUInt64AddOperator();
+  options.merge_operator = std::make_shared<CountMergeOperator>();
+  options.max_successive_merges = max_successive_merges;
   Status s;
   DestroyDB(dbname, Options());
   if (ttl) {
@@ -243,6 +283,67 @@ void testCounters(Counters& counters, DB* db, bool test_compaction) {
   }
 }
 
+void testSuccessiveMerge(
+    Counters& counters, int max_num_merges, int num_merges) {
+
+  counters.assert_remove("z");
+  uint64_t sum = 0;
+
+  for (int i = 1; i <= num_merges; ++i) {
+    resetNumMergeOperatorCalls();
+    counters.assert_add("z", i);
+    sum += i;
+
+    if (i % (max_num_merges + 1) == 0) {
+      // This add crossed the cap: the pending operands were folded.
+      assert(numMergeOperatorCalls == max_num_merges + 1);
+    } else {
+      // Below the cap, the operand is simply appended to the memtable.
+      assert(numMergeOperatorCalls == 0);
+    }
+
+    resetNumMergeOperatorCalls();
+    assert(counters.assert_get("z") == sum);
+    assert(numMergeOperatorCalls == i % (max_num_merges + 1));
+  }
+}
+
+void testSingleBatchSuccessiveMerge(
+    DB* db,
+    int max_num_merges,
+    int num_merges) {
+  assert(num_merges > max_num_merges);
+
+  Slice key("BatchSuccessiveMerge");
+  uint64_t merge_value = 1;
+  Slice merge_value_slice((char *)&merge_value, sizeof(merge_value));
+
+  // Create the batch
+  WriteBatch batch;
+  for (int i = 0; i < num_merges; ++i) {
+    batch.Merge(key, merge_value_slice);
+  }
+
+  // Apply to memtable and count the number of merges
+  resetNumMergeOperatorCalls();
+  {
+    Status s = db->Write(WriteOptions(), &batch);
+    assert(s.ok());
+  }
+  assert(numMergeOperatorCalls ==
+         num_merges - (num_merges % (max_num_merges + 1)));
+
+  // Get the value
+  resetNumMergeOperatorCalls();
+  string get_value_str;
+  {
+    Status s = db->Get(ReadOptions(), key, &get_value_str);
+    assert(s.ok());
+  }
+  assert(get_value_str.size() == sizeof(uint64_t));
+  uint64_t get_value = DecodeFixed64(&get_value_str[0]);
+  assert(get_value == num_merges * merge_value);
+  assert(numMergeOperatorCalls == (num_merges % (max_num_merges + 1)));
+}
+
 void runTest(int argc, const string& dbname, const bool use_ttl = false) {
   auto db = OpenDb(dbname, use_ttl);
 
@@ -265,6 +366,19 @@ void runTest(int argc, const string& dbname, const bool use_ttl = false) {
   }
 
   DestroyDB(dbname, Options());
+  db.reset();
+
+  {
+    cout << "Test merge in memtable... \n";
+    unsigned maxMerge = 5;
+    auto db = OpenDb(dbname, use_ttl, maxMerge);
+    MergeBasedCounters counters(db, 0);
+    testCounters(counters, db.get(), compact);
+    testSuccessiveMerge(counters, maxMerge, maxMerge * 2);
+    testSingleBatchSuccessiveMerge(db.get(), 5, 7);
+    DestroyDB(dbname, Options());
+  }
 }
 
 int main(int argc, char *argv[]) {
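To make the counting assertions concrete: with max_num_merges = 5, each
add is a plain memtable append (zero operator calls) until the pending
operands reach the cap; the next add then folds them, costing
max_num_merges + 1 = 6 calls (five to reassemble the pending operands via
Get, one to fold in the new operand). Hence
testSingleBatchSuccessiveMerge(db.get(), 5, 7) expects 7 - (7 % 6) = 6
calls while the batch is written and 7 % 6 = 1 call for the final Get,
which sees the folded value plus a single remaining merge entry.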
diff --git a/db/write_batch.cc b/db/write_batch.cc
index c04930bbf..2cfc8bd7d 100644
--- a/db/write_batch.cc
+++ b/db/write_batch.cc
@@ -21,6 +21,7 @@
 
 #include "rocksdb/write_batch.h"
 #include "rocksdb/options.h"
+#include "rocksdb/merge_operator.h"
 #include "db/dbformat.h"
 #include "db/db_impl.h"
 #include "db/memtable.h"
@@ -203,7 +204,62 @@ class MemTableInserter : public WriteBatch::Handler {
     sequence_++;
   }
   virtual void Merge(const Slice& key, const Slice& value) {
-    mem_->Add(sequence_, kTypeMerge, key, value);
+    bool perform_merge = false;
+
+    if (options_->max_successive_merges > 0 && db_ != nullptr) {
+      LookupKey lkey(key, sequence_);
+
+      // Count the number of successive merges at the head
+      // of the key in the memtable
+      size_t num_merges = mem_->CountSuccessiveMergeEntries(lkey);
+
+      if (num_merges >= options_->max_successive_merges) {
+        perform_merge = true;
+      }
+    }
+
+    if (perform_merge) {
+      // 1) Get the existing value
+      std::string get_value;
+
+      // Pass in the sequence number so that we also include previous merge
+      // operations in the same batch.
+      SnapshotImpl read_from_snapshot;
+      read_from_snapshot.number_ = sequence_;
+      ReadOptions read_options;
+      read_options.snapshot = &read_from_snapshot;
+
+      db_->Get(read_options, key, &get_value);
+      Slice get_value_slice = Slice(get_value);
+
+      // 2) Apply this merge
+      auto merge_operator = options_->merge_operator.get();
+      assert(merge_operator);
+
+      std::deque<std::string> operands;
+      operands.push_front(value.ToString());
+      std::string new_value;
+      if (!merge_operator->FullMerge(key,
+                                     &get_value_slice,
+                                     operands,
+                                     &new_value,
+                                     options_->info_log.get())) {
+        // Failed to merge!
+        RecordTick(options_->statistics.get(), NUMBER_MERGE_FAILURES);
+
+        // Store the delta in memtable
+        perform_merge = false;
+      } else {
+        // 3) Add value to memtable
+        mem_->Add(sequence_, kTypeValue, key, new_value);
+      }
+    }
+
+    if (!perform_merge) {
+      // Add the merge operand to the memtable
+      mem_->Add(sequence_, kTypeMerge, key, value);
+    }
+
     sequence_++;
   }
   virtual void Delete(const Slice& key) {
diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h
index b7eaff37d..b84bdcf38 100644
--- a/include/rocksdb/options.h
+++ b/include/rocksdb/options.h
@@ -643,6 +643,17 @@ struct Options {
   // Number of locks used for inplace update
   // Default: 10000, if inplace_update_support = true, else 0.
   size_t inplace_update_num_locks;
+
+  // Maximum number of successive merge operations on a key in the memtable.
+  //
+  // When a merge operation is added to the memtable and the maximum number of
+  // successive merges is reached, the value of the key will be calculated and
+  // inserted into the memtable instead of the merge operation. This will
+  // ensure that there are never more than max_successive_merges merge
+  // operations in the memtable.
+  //
+  // Default: 0 (disabled)
+  size_t max_successive_merges;
 };
 
 //
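Distilled, the write path above preserves a simple invariant: at most
max_successive_merges operands are ever pending for a key in the
memtable. A self-contained toy model of that decision (plain C++ with
uint64 addition standing in for the merge operator; all names are
invented for illustration, none of this is RocksDB code):

    #include <cstddef>
    #include <cstdint>
    #include <vector>

    struct KeyState {
      uint64_t base = 0;               // folded value (kTypeValue analogue)
      std::vector<uint64_t> operands;  // pending merges (kTypeMerge analogue)
    };

    void MergeWithCap(KeyState& ks, uint64_t operand, size_t cap) {
      if (cap > 0 && ks.operands.size() >= cap) {
        // Analogue of Get + FullMerge + Add(kTypeValue): fold everything.
        for (uint64_t op : ks.operands) ks.base += op;
        ks.base += operand;
        ks.operands.clear();
      } else {
        // Analogue of Add(kTypeMerge): just record the operand.
        ks.operands.push_back(operand);
      }
    }

    uint64_t Read(const KeyState& ks) {
      uint64_t v = ks.base;
      for (uint64_t op : ks.operands) v += op;  // at most `cap` operands
      return v;
    }

A reader therefore applies at most cap pending operands, which is exactly
the bound promised by the options.h comment above.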
diff --git a/util/options.cc b/util/options.cc
index 198d55384..64cabc8ca 100644
--- a/util/options.cc
+++ b/util/options.cc
@@ -101,7 +101,8 @@ Options::Options()
       table_factory(
           std::shared_ptr<TableFactory>(new BlockBasedTableFactory())),
       inplace_update_support(false),
-      inplace_update_num_locks(10000) {
+      inplace_update_num_locks(10000),
+      max_successive_merges(0) {
   assert(memtable_factory.get() != nullptr);
 }
 
@@ -292,6 +293,8 @@ Options::Dump(Logger* log) const
           inplace_update_support);
   Log(log, "         Options.inplace_update_num_locks: %zd",
       inplace_update_num_locks);
+  Log(log, "            Options.max_successive_merges: %zd",
+      max_successive_merges);
 }   // Options::Dump
 
 //