diff --git a/db/db_bench.cc b/db/db_bench.cc
index 455a1469a..5e1bb4d26 100644
--- a/db/db_bench.cc
+++ b/db/db_bench.cc
@@ -25,6 +25,7 @@
 #include "util/string_util.h"
 #include "util/testutil.h"
 #include "hdfs/env_hdfs.h"
+#include "utilities/merge_operators.h"

 // Comma-separated list of operations to run in the specified order
 //   Actual benchmarks:
@@ -43,6 +44,9 @@
 //      readwhilewriting -- 1 writer, N threads doing random reads
 //      readrandomwriterandom - N threads doing random-read, random-write
 //      updaterandom  -- N threads doing read-modify-write for random keys
+//      appendrandom  -- N threads doing read-modify-write with growing values
+//      mergerandom   -- same as updaterandom/appendrandom using merge operator
+//                    -- must be used with FLAGS_merge_operator (see below)
 //      seekrandom    -- N random seeks
 //      crc32c        -- repeated crc32c of 4K of data
 //      acquireload   -- load N*1000 times
@@ -349,6 +353,11 @@ static auto FLAGS_bytes_per_sync =
 // On true, deletes use bloom-filter and drop the delete if key not present
 static bool FLAGS_filter_deletes = false;

+// The merge operator to use with the database.
+// If a new merge operator is specified, be sure to use a fresh database.
+// The possible merge operators are defined in utilities/merge_operators.h
+static std::string FLAGS_merge_operator = "";
+
 namespace leveldb {

 // Helper for quickly generating random data.
@@ -628,6 +637,7 @@ class Benchmark {
   int key_size_;
   int entries_per_batch_;
   WriteOptions write_options_;
+  std::shared_ptr<MergeOperator> merge_operator_;
   long reads_;
   long writes_;
   long readwrites_;
@@ -769,6 +779,7 @@ class Benchmark {
     value_size_(FLAGS_value_size),
     key_size_(FLAGS_key_size),
     entries_per_batch_(1),
+    merge_operator_(nullptr),
     reads_(FLAGS_reads < 0 ? FLAGS_num : FLAGS_reads),
     writes_(FLAGS_writes < 0 ? FLAGS_num : FLAGS_writes),
     readwrites_((FLAGS_writes < 0 && FLAGS_reads < 0)? FLAGS_num :
@@ -896,6 +907,16 @@ class Benchmark {
         method = &Benchmark::ReadRandomWriteRandom;
       } else if (name == Slice("updaterandom")) {
         method = &Benchmark::UpdateRandom;
+      } else if (name == Slice("appendrandom")) {
+        method = &Benchmark::AppendRandom;
+      } else if (name == Slice("mergerandom")) {
+        if (FLAGS_merge_operator.empty()) {
+          fprintf(stdout, "%-12s : skipped (--merge_operator is unknown)\n",
+                  name.ToString().c_str());
+          method = nullptr;
+        } else {
+          method = &Benchmark::MergeRandom;
+        }
       } else if (name == Slice("randomwithverify")) {
         method = &Benchmark::RandomWithVerify;
       } else if (name == Slice("compact")) {
@@ -1190,6 +1211,15 @@ class Benchmark {
     options.use_adaptive_mutex = FLAGS_use_adaptive_mutex;
     options.bytes_per_sync = FLAGS_bytes_per_sync;

+    // merge operator options
+    merge_operator_ = MergeOperators::CreateFromStringId(FLAGS_merge_operator);
+    if (merge_operator_ == nullptr && !FLAGS_merge_operator.empty()) {
+      fprintf(stderr, "invalid merge operator: %s\n",
+              FLAGS_merge_operator.c_str());
+      exit(1);
+    }
+    options.merge_operator = merge_operator_.get();
+
     Status s;
     if(FLAGS_read_only) {
       s = DB::OpenForReadOnly(options, FLAGS_db, &db_);
@@ -1915,8 +1945,6 @@ class Benchmark {

   //
   // Read-modify-write for random keys
-  //
-  // TODO: Implement MergeOperator tests here (Read-modify-write)
   void UpdateRandom(ThreadState* thread) {
     ReadOptions options(FLAGS_verify_checksum, true);
     RandomGenerator gen;
@@ -1963,6 +1991,100 @@ class Benchmark {
     thread->stats.AddMessage(msg);
   }

+  // Read-modify-write for random keys.
+  // Each operation causes the key to grow by value_size (simulating an append).
+  // Generally used for benchmarking against merges of similar type
+  void AppendRandom(ThreadState* thread) {
+    ReadOptions options(FLAGS_verify_checksum, true);
+    RandomGenerator gen;
+    std::string value;
+    long found = 0;
+
+    // The number of iterations is the larger of reads_ or writes_
+    Duration duration(FLAGS_duration, readwrites_);
+    while (!duration.Done(1)) {
+      const int k = thread->rand.Next() % FLAGS_num;
+      unique_ptr<char []> key = GenerateKeyFromInt(k);
+
+      if (FLAGS_use_snapshot) {
+        options.snapshot = db_->GetSnapshot();
+      }
+
+      if (FLAGS_get_approx) {
+        char key2[100];
+        snprintf(key2, sizeof(key2), "%016d", k + 1);
+        Slice skey2(key2);
+        Slice skey(key2);
+        Range range(skey, skey2);
+        uint64_t sizes;
+        db_->GetApproximateSizes(&range, 1, &sizes);
+      }
+
+      // Get the existing value
+      if (db_->Get(options, key.get(), &value).ok()) {
+        found++;
+      } else {
+        // If not existing, then just assume an empty string of data
+        value.clear();
+      }
+
+      if (FLAGS_use_snapshot) {
+        db_->ReleaseSnapshot(options.snapshot);
+      }
+
+      // Update the value (by appending data)
+      Slice operand = gen.Generate(value_size_);
+      if (value.size() > 0) {
+        // Use a delimiter to match the semantics for StringAppendOperator
+        value.append(1, ',');
+      }
+      value.append(operand.data(), operand.size());
+
+      // Write back to the database
+      Status s = db_->Put(write_options_, key.get(), value);
+      if (!s.ok()) {
+        fprintf(stderr, "put error: %s\n", s.ToString().c_str());
+        exit(1);
+      }
+      thread->stats.FinishedSingleOp(db_);
+    }
+    char msg[100];
+    snprintf(msg, sizeof(msg), "( updates:%ld found:%ld)", readwrites_, found);
+    thread->stats.AddMessage(msg);
+  }
+
+  // Read-modify-write for random keys (using MergeOperator)
+  // The merge operator to use should be defined by FLAGS_merge_operator
+  // Adjust FLAGS_value_size so that the values are reasonable for this operator
+  // Assumes that the merge operator is non-null (i.e., is well-defined)
+  //
+  // For example, use FLAGS_merge_operator="uint64add" and FLAGS_value_size=8
+  // to simulate random additions over 64-bit integers using merge.
+  void MergeRandom(ThreadState* thread) {
+    RandomGenerator gen;
+
+    // The number of iterations is the larger of reads_ or writes_
+    Duration duration(FLAGS_duration, readwrites_);
+    while (!duration.Done(1)) {
+      const int k = thread->rand.Next() % FLAGS_num;
+      unique_ptr<char []> key = GenerateKeyFromInt(k);
+
+      Status s = db_->Merge(write_options_, key.get(),
+                            gen.Generate(value_size_));
+
+      if (!s.ok()) {
+        fprintf(stderr, "merge error: %s\n", s.ToString().c_str());
+        exit(1);
+      }
+      thread->stats.FinishedSingleOp(db_);
+    }
+
+    // Print some statistics
+    char msg[100];
+    snprintf(msg, sizeof(msg), "( updates:%ld)", readwrites_);
+    thread->stats.AddMessage(msg);
+  }
+
   void Compact(ThreadState* thread) {
     db_->CompactRange(nullptr, nullptr);
   }
@@ -2260,11 +2382,13 @@ int main(int argc, char** argv) {
     } else if (sscanf(argv[i], "--keys_per_multiget=%d%c",
                &n, &junk) == 1) {
       FLAGS_keys_per_multiget = n;
-    } else if (sscanf(argv[i], "--bytes_per_sync=%ld%c", &l, &junk) == 1) {
+    } else if (sscanf(argv[i], "--bytes_per_sync=%ld%c", &l, &junk) == 1) {
       FLAGS_bytes_per_sync = l;
     } else if (sscanf(argv[i], "--filter_deletes=%d%c", &n, &junk)
                == 1 && (n == 0 || n ==1 )) {
       FLAGS_filter_deletes = n;
+    } else if (sscanf(argv[i], "--merge_operator=%s", buf) == 1) {
+      FLAGS_merge_operator = buf;
     } else {
       fprintf(stderr, "Invalid flag '%s'\n", argv[i]);
       exit(1);
diff --git a/tools/db_stress.cc b/tools/db_stress.cc
index 941b326d3..45cf99f01 100644
--- a/tools/db_stress.cc
+++ b/tools/db_stress.cc
@@ -36,6 +36,7 @@
 #include "util/logging.h"
 #include "utilities/ttl/db_ttl.h"
 #include "hdfs/env_hdfs.h"
+#include "utilities/merge_operators.h"

 static const long KB = 1024;

@@ -198,6 +199,9 @@ static bool FLAGS_filter_deletes = false;
 // Level0 compaction start trigger
 static int FLAGS_level0_file_num_compaction_trigger = 0;

+// On true, replaces all writes with a Merge that behaves like a Put
+static bool FLAGS_use_merge_put = false;
+
 namespace leveldb {

 // convert long to a big-endian slice key
@@ -533,6 +537,7 @@ class StressTest {
                       FLAGS_test_batches_snapshots ?
                           sizeof(long) : sizeof(long)-1)),
       db_(nullptr),
+      merge_operator_(MergeOperators::CreatePutOperator()),
       num_times_reopened_(0) {
     if (FLAGS_destroy_db_initially) {
       std::vector<std::string> files;
@@ -548,6 +553,7 @@ class StressTest {

   ~StressTest() {
     delete db_;
+    merge_operator_ = nullptr;
     delete filter_policy_;
     delete prefix_extractor_;
   }
@@ -677,7 +683,11 @@ class StressTest {
       keys[i] += key.ToString();
       values[i] += value.ToString();
       value_slices[i] = values[i];
-      batch.Put(keys[i], value_slices[i]);
+      if (FLAGS_use_merge_put) {
+        batch.Merge(keys[i], value_slices[i]);
+      } else {
+        batch.Put(keys[i], value_slices[i]);
+      }
     }

     s = db_->Write(writeoptions, &batch);
@@ -942,7 +952,11 @@ class StressTest {
         VerifyValue(rand_key, read_opts, *(thread->shared), &from_db, true);
       }
       thread->shared->Put(rand_key, value_base);
-      db_->Put(write_opts, key, v);
+      if (FLAGS_use_merge_put) {
+        db_->Merge(write_opts, key, v);
+      } else {
+        db_->Put(write_opts, key, v);
+      }
       thread->stats.AddBytesForWrites(1, sz);
     } else {
       MultiPut(thread, write_opts, key, v, sz);
@@ -1125,6 +1139,10 @@ class StressTest {
       options.purge_redundant_kvs_while_flush = false;
     }

+    if (FLAGS_use_merge_put) {
+      options.merge_operator = merge_operator_.get();
+    }
+
     fprintf(stdout, "DB path: [%s]\n", FLAGS_db);

     Status s;
@@ -1170,6 +1188,7 @@ class StressTest {
   const SliceTransform* prefix_extractor_;
   DB* db_;
   StackableDB* sdb_;
+  std::shared_ptr<MergeOperator> merge_operator_;
   int num_times_reopened_;
 };
@@ -1335,6 +1354,9 @@ int main(int argc, char** argv) {
     } else if (sscanf(argv[i], "--filter_deletes=%d%c", &n, &junk)
                == 1 && (n == 0 || n == 1)) {
       FLAGS_filter_deletes = n;
+    } else if (sscanf(argv[i], "--use_merge=%d%c", &n, &junk)
+               == 1 && (n == 0 || n == 1)) {
+      FLAGS_use_merge_put = n;
     } else {
       fprintf(stderr, "Invalid flag '%s'\n", argv[i]);
       exit(1);
diff --git a/utilities/merge_operators.h b/utilities/merge_operators.h
index c7c06422a..d4661300e 100644
--- a/utilities/merge_operators.h
+++ b/utilities/merge_operators.h
@@ -2,6 +2,7 @@
 #define MERGE_OPERATORS_H

 #include <memory>
+#include <string>

 #include "leveldb/merge_operator.h"

@@ -9,10 +10,31 @@ namespace leveldb {

 class MergeOperators {
  public:
-  static std::shared_ptr<MergeOperator> CreatePutOperator();
-  static std::shared_ptr<MergeOperator> CreateUInt64AddOperator();
+  static std::shared_ptr<MergeOperator> CreatePutOperator();
+  static std::shared_ptr<MergeOperator> CreateUInt64AddOperator();
+  static std::shared_ptr<MergeOperator> CreateStringAppendOperator();
+  static std::shared_ptr<MergeOperator> CreateStringAppendTESTOperator();
+
+  // Will return a different merge operator depending on the string.
+  // TODO: Hook the "name" up to the actual Name() of the MergeOperators?
+  static std::shared_ptr<MergeOperator> CreateFromStringId(
+      const std::string& name) {
+    if (name == "put") {
+      return CreatePutOperator();
+    } else if (name == "uint64add") {
+      return CreateUInt64AddOperator();
+    } else if (name == "stringappend") {
+      return CreateStringAppendOperator();
+    } else if (name == "stringappendtest") {
+      return CreateStringAppendTESTOperator();
+    } else {
+      // Empty or unknown, just return nullptr
+      return nullptr;
+    }
+  }
+
 };

-}
+}  // namespace leveldb

 #endif
diff --git a/utilities/merge_operators/string_append/stringappend.cc b/utilities/merge_operators/string_append/stringappend.cc
index 9bded8a98..ff5301f27 100644
--- a/utilities/merge_operators/string_append/stringappend.cc
+++ b/utilities/merge_operators/string_append/stringappend.cc
@@ -50,6 +50,10 @@
 const char* StringAppendOperator::Name() const {
   return "StringAppendOperator";
 }

+std::shared_ptr<MergeOperator> MergeOperators::CreateStringAppendOperator() {
+  return std::make_shared<StringAppendOperator>(',');
+}
+
 }  // namespace leveldb
diff --git a/utilities/merge_operators/string_append/stringappend2.cc b/utilities/merge_operators/string_append/stringappend2.cc
index c0930d653..93e65a7b8 100644
--- a/utilities/merge_operators/string_append/stringappend2.cc
+++ b/utilities/merge_operators/string_append/stringappend2.cc
@@ -66,25 +66,38 @@ bool StringAppendTESTOperator::PartialMerge(const Slice& key,
                                             std::string* new_value,
                                             Logger* logger) const {
   return false;
+}

-//  // Clear the *new_value for writing.
-//  assert(new_value);
-//
-//  // Generic append
-//  // Reserve correct size for *new_value, and apply concatenation.
-//  new_value->reserve(left_operand.size() + 1 + right_operand.size());
-//  new_value->assign(left_operand.data(), left_operand.size());
-//  new_value->append(1,delim_);
-//  new_value->append(right_operand.data(), right_operand.size());
-//
-//  return true;
+// A version of PartialMerge that actually performs "partial merging".
+// Use this to simulate the exact behaviour of the StringAppendOperator.
+bool StringAppendTESTOperator::_AssocPartialMerge(const Slice& key,
+                                                  const Slice& left_operand,
+                                                  const Slice& right_operand,
+                                                  std::string* new_value,
+                                                  Logger* logger) const {
+  // Clear the *new_value for writing.
+  assert(new_value);
+  new_value->clear();
+
+  // Generic append
+  // Reserve correct size for *new_value, and apply concatenation.
+  new_value->reserve(left_operand.size() + 1 + right_operand.size());
+  new_value->assign(left_operand.data(), left_operand.size());
+  new_value->append(1, delim_);
+  new_value->append(right_operand.data(), right_operand.size());
+  return true;
 }

 const char* StringAppendTESTOperator::Name() const {
   return "StringAppendTESTOperator";
 }
+
+std::shared_ptr<MergeOperator>
+MergeOperators::CreateStringAppendTESTOperator() {
+  return std::make_shared<StringAppendTESTOperator>(',');
+}
+
 }  // namespace leveldb
diff --git a/utilities/merge_operators/string_append/stringappend2.h b/utilities/merge_operators/string_append/stringappend2.h
index 71135c826..2e9f505bd 100644
--- a/utilities/merge_operators/string_append/stringappend2.h
+++ b/utilities/merge_operators/string_append/stringappend2.h
@@ -35,6 +35,14 @@ class StringAppendTESTOperator : public MergeOperator {
   virtual const char* Name() const override;

  private:
+  // A version of PartialMerge that actually performs "partial merging".
+  // Use this to simulate the exact behaviour of the StringAppendOperator.
+  bool _AssocPartialMerge(const Slice& key,
+                          const Slice& left_operand,
+                          const Slice& right_operand,
+                          std::string* new_value,
+                          Logger* logger) const;
+
   char delim_;   // The delimiter is inserted between elements

 };
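
Usage sketch (not part of the patch): the plumbing above resolves a string id to a MergeOperator via MergeOperators::CreateFromStringId(), keeps the shared_ptr alive, and hands the raw pointer to Options::merge_operator before opening the DB, which is what db_bench does for --merge_operator and db_stress does for --use_merge. The standalone program below follows that same flow; the database path and the key/operand strings are made up for illustration, everything else comes from the headers touched in this diff.

#include <cstdio>
#include <memory>
#include <string>

#include "leveldb/db.h"
#include "leveldb/options.h"
#include "utilities/merge_operators.h"

int main() {
  // Resolve the operator the same way db_bench does for --merge_operator.
  std::string id = "stringappend";  // "put", "uint64add", "stringappendtest" also resolve
  std::shared_ptr<leveldb::MergeOperator> op =
      leveldb::MergeOperators::CreateFromStringId(id);
  if (op == nullptr) {
    fprintf(stderr, "invalid merge operator: %s\n", id.c_str());
    return 1;
  }

  leveldb::Options options;
  options.create_if_missing = true;
  // Keep the shared_ptr in scope and pass the raw pointer, as db_bench does.
  options.merge_operator = op.get();

  leveldb::DB* db = nullptr;
  leveldb::Status s = leveldb::DB::Open(options, "/tmp/mergedemo", &db);  // hypothetical path
  if (!s.ok()) {
    fprintf(stderr, "open error: %s\n", s.ToString().c_str());
    return 1;
  }

  // One MergeRandom-style operation: the read-modify-write is folded into a single Merge().
  s = db->Merge(leveldb::WriteOptions(), "key", "operand");
  if (!s.ok()) {
    fprintf(stderr, "merge error: %s\n", s.ToString().c_str());
  }

  delete db;
  return 0;
}

With the patch applied, the same path would presumably be exercised end to end by something like `db_bench --benchmarks=mergerandom --merge_operator=uint64add --value_size=8` (per the comment on MergeRandom()) and, for the merge-as-put stress path, `db_stress --use_merge=1`.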