diff --git a/CMakeLists.txt b/CMakeLists.txt index 1395952c9..5a9bd5318 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -602,6 +602,7 @@ set(SOURCES utilities/leveldb_options/leveldb_options.cc utilities/lua/rocks_lua_compaction_filter.cc utilities/memory/memory_util.cc + utilities/merge_operators/bytesxor.cc utilities/merge_operators/max.cc utilities/merge_operators/put.cc utilities/merge_operators/string_append/stringappend.cc diff --git a/TARGETS b/TARGETS index 2846bb826..ca0ab48d0 100644 --- a/TARGETS +++ b/TARGETS @@ -242,6 +242,7 @@ cpp_library( "utilities/leveldb_options/leveldb_options.cc", "utilities/lua/rocks_lua_compaction_filter.cc", "utilities/memory/memory_util.cc", + "utilities/merge_operators/bytesxor.cc", "utilities/merge_operators/max.cc", "utilities/merge_operators/put.cc", "utilities/merge_operators/string_append/stringappend.cc", diff --git a/src.mk b/src.mk index 17bf94d76..c2ad87c54 100644 --- a/src.mk +++ b/src.mk @@ -182,6 +182,7 @@ LIB_SOURCES = \ utilities/merge_operators/string_append/stringappend.cc \ utilities/merge_operators/string_append/stringappend2.cc \ utilities/merge_operators/uint64add.cc \ + utilities/merge_operators/bytesxor.cc \ utilities/option_change_migration/option_change_migration.cc \ utilities/options/options_util.cc \ utilities/persistent_cache/block_cache_tier.cc \ diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc index cfa365dea..40e033167 100644 --- a/tools/db_bench_tool.cc +++ b/tools/db_bench_tool.cc @@ -70,6 +70,7 @@ #include "util/xxhash.h" #include "utilities/blob_db/blob_db.h" #include "utilities/merge_operators.h" +#include "utilities/merge_operators/bytesxor.h" #include "utilities/persistent_cache/block_cache_tier.h" #ifdef OS_WIN @@ -107,6 +108,7 @@ DEFINE_string( "readwhilemerging," "readrandomwriterandom," "updaterandom," + "xorupdaterandom," "randomwithverify," "fill100K," "crc32c," @@ -151,6 +153,8 @@ DEFINE_string( "\tprefixscanrandom -- prefix scan N times in random order\n" "\tupdaterandom -- N threads doing read-modify-write for random " "keys\n" + "\txorupdaterandom -- N threads doing read-XOR-write for " + "random keys\n" "\tappendrandom -- N threads doing read-modify-write with " "growing values\n" "\tmergerandom -- same as updaterandom/appendrandom using merge" @@ -2526,6 +2530,8 @@ void VerifyDBFromDB(std::string& truth_db_name) { method = &Benchmark::ReadRandomMergeRandom; } else if (name == "updaterandom") { method = &Benchmark::UpdateRandom; + } else if (name == "xorupdaterandom") { + method = &Benchmark::XORUpdateRandom; } else if (name == "appendrandom") { method = &Benchmark::AppendRandom; } else if (name == "mergerandom") { @@ -4743,6 +4749,58 @@ void VerifyDBFromDB(std::string& truth_db_name) { thread->stats.AddMessage(msg); } + // Read-XOR-write for random keys. Xors the existing value with a randomly + // generated value, and stores the result. Assuming A in the array of bytes + // representing the existing value, we generate an array B of the same size, + // then compute C = A^B as C[i]=A[i]^B[i], and store C + void XORUpdateRandom(ThreadState* thread) { + ReadOptions options(FLAGS_verify_checksum, true); + RandomGenerator gen; + std::string existing_value; + int64_t found = 0; + Duration duration(FLAGS_duration, readwrites_); + + BytesXOROperator xor_operator; + + std::unique_ptr key_guard; + Slice key = AllocateKey(&key_guard); + // the number of iterations is the larger of read_ or write_ + while (!duration.Done(1)) { + DB* db = SelectDB(thread); + GenerateKeyFromInt(thread->rand.Next() % FLAGS_num, FLAGS_num, &key); + + auto status = db->Get(options, key, &existing_value); + if (status.ok()) { + ++found; + } else if (!status.IsNotFound()) { + fprintf(stderr, "Get returned an error: %s\n", + status.ToString().c_str()); + exit(1); + } + + Slice value = gen.Generate(value_size_); + std::string new_value; + + if (status.ok()) { + Slice existing_value_slice = Slice(existing_value); + xor_operator.XOR(&existing_value_slice, value, &new_value); + } else { + xor_operator.XOR(nullptr, value, &new_value); + } + + Status s = db->Put(write_options_, key, Slice(new_value)); + if (!s.ok()) { + fprintf(stderr, "put error: %s\n", s.ToString().c_str()); + exit(1); + } + thread->stats.FinishedOps(nullptr, db, 1); + } + char msg[100]; + snprintf(msg, sizeof(msg), + "( updates:%" PRIu64 " found:%" PRIu64 ")", readwrites_, found); + thread->stats.AddMessage(msg); + } + // Read-modify-write for random keys. // Each operation causes the key grow by value_size (simulating an append). // Generally used for benchmarking against merges of similar type diff --git a/utilities/merge_operators.h b/utilities/merge_operators.h index 602a4d01a..40a19cf86 100644 --- a/utilities/merge_operators.h +++ b/utilities/merge_operators.h @@ -3,13 +3,13 @@ // COPYING file in the root directory) and Apache 2.0 License // (found in the LICENSE.Apache file in the root directory). // -#ifndef MERGE_OPERATORS_H -#define MERGE_OPERATORS_H +#pragma once +#include "rocksdb/merge_operator.h" -#include #include -#include "rocksdb/merge_operator.h" +#include +#include namespace rocksdb { @@ -21,6 +21,7 @@ class MergeOperators { static std::shared_ptr CreateStringAppendOperator(); static std::shared_ptr CreateStringAppendTESTOperator(); static std::shared_ptr CreateMaxOperator(); + static std::shared_ptr CreateBytesXOROperator(); // Will return a different merge operator depending on the string. // TODO: Hook the "name" up to the actual Name() of the MergeOperators? @@ -38,14 +39,13 @@ class MergeOperators { return CreateStringAppendTESTOperator(); } else if (name == "max") { return CreateMaxOperator(); + } else if (name == "bytesxor") { + return CreateBytesXOROperator(); } else { // Empty or unknown, just return nullptr return nullptr; } } - }; -} // namespace rocksdb - -#endif +} // namespace rocksdb diff --git a/utilities/merge_operators/bytesxor.cc b/utilities/merge_operators/bytesxor.cc new file mode 100644 index 000000000..cf9d97664 --- /dev/null +++ b/utilities/merge_operators/bytesxor.cc @@ -0,0 +1,59 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include +#include + +#include "utilities/merge_operators/bytesxor.h" + +namespace rocksdb { + +std::shared_ptr MergeOperators::CreateBytesXOROperator() { + return std::make_shared(); +} + +bool BytesXOROperator::Merge(const Slice& key, + const Slice* existing_value, + const Slice& value, + std::string* new_value, + Logger* logger) const { + XOR(existing_value, value, new_value); + return true; +} + +void BytesXOROperator::XOR(const Slice* existing_value, + const Slice& value, std::string* new_value) const { + if (!existing_value) { + new_value->clear(); + new_value->assign(value.data(), value.size()); + return; + } + + size_t min_size = std::min(existing_value->size(), value.size()); + size_t max_size = std::max(existing_value->size(), value.size()); + + new_value->clear(); + new_value->reserve(max_size); + + const char* existing_value_data = existing_value->data(); + const char* value_data = value.data(); + + for (size_t i = 0; i < min_size; i++) { + new_value->push_back(existing_value_data[i] ^ value_data[i]); + } + + if (existing_value->size() == max_size) { + for (size_t i = min_size; i < max_size; i++) { + new_value->push_back(existing_value_data[i]); + } + } else { + assert(value.size() == max_size); + for (size_t i = min_size; i < max_size; i++) { + new_value->push_back(value_data[i]); + } + } +} + +} // namespace rocksdb diff --git a/utilities/merge_operators/bytesxor.h b/utilities/merge_operators/bytesxor.h new file mode 100644 index 000000000..1562ca852 --- /dev/null +++ b/utilities/merge_operators/bytesxor.h @@ -0,0 +1,42 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#ifndef UTILITIES_MERGE_OPERATORS_BYTESXOR_H_ +#define UTILITIES_MERGE_OPERATORS_BYTESXOR_H_ + +#include +#include +#include +#include "rocksdb/env.h" +#include "rocksdb/merge_operator.h" +#include "rocksdb/slice.h" +#include "util/coding.h" +#include "utilities/merge_operators.h" + +namespace rocksdb { + +// A 'model' merge operator that XORs two (same sized) array of bytes. +// Implemented as an AssociativeMergeOperator for simplicity and example. +class BytesXOROperator : public AssociativeMergeOperator { + public: + // XORs the two array of bytes one byte at a time and stores the result + // in new_value. len is the number of xored bytes, and the length of new_value + virtual bool Merge(const Slice& key, + const Slice* existing_value, + const Slice& value, + std::string* new_value, + Logger* logger) const override; + + virtual const char* Name() const override { + return "BytesXOR"; + } + + void XOR(const Slice* existing_value, const Slice& value, + std::string* new_value) const; +}; + +} // namespace rocksdb + +#endif // UTILITIES_MERGE_OPERATORS_BYTESXOR_H_