From a28df7a75a60f71b8ccea9c16a50c5ab420a0290 Mon Sep 17 00:00:00 2001 From: Yanqin Jin Date: Tue, 15 Sep 2020 20:04:58 -0700 Subject: [PATCH] Add basic support for user-defined timestamp to db_bench (#7389) Summary: Update db_bench so that we can run it with user-defined timestamp. Currently, only 64-bit timestamp is supported, while others are disabled by assertion. Pull Request resolved: https://github.com/facebook/rocksdb/pull/7389 Test Plan: ./db_bench -benchmarks=fillseq,fillrandom,readrandom,readsequential,....., -user_timestamp_size=8 Reviewed By: ltamasi Differential Revision: D23720830 Pulled By: riversand963 fbshipit-source-id: 486eacbb82de9a5441e79a61bfa9beef6581608a --- tools/db_bench_tool.cc | 336 ++++++++++++++++++++++++++++++++++++----- 1 file changed, 302 insertions(+), 34 deletions(-) diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc index 73fa35ac6..0e3aa223b 100644 --- a/tools/db_bench_tool.cc +++ b/tools/db_bench_tool.cc @@ -292,6 +292,9 @@ static bool ValidateUint32Range(const char* flagname, uint64_t value) { DEFINE_int32(key_size, 16, "size of each key"); +DEFINE_int32(user_timestamp_size, 0, + "number of bytes in a user-defined timestamp"); + DEFINE_int32(num_multi_db, 0, "Number of DBs used in the benchmark. 0 means single DB."); @@ -1339,6 +1342,10 @@ DEFINE_bool(report_file_operations, false, "if report number of file " "operations"); DEFINE_int32(readahead_size, 0, "Iterator readahead size"); +DEFINE_bool(read_with_latest_user_timestamp, true, + "If true, always use the current latest timestamp for read. If " + "false, choose a random timestamp from the past."); + static const bool FLAGS_soft_rate_limit_dummy __attribute__((__unused__)) = RegisterFlagValidator(&FLAGS_soft_rate_limit, &ValidateRateLimit); @@ -2246,6 +2253,25 @@ class TimestampEmulator { TimestampEmulator() : timestamp_(0) {} uint64_t Get() const { return timestamp_.load(); } void Inc() { timestamp_++; } + Slice Allocate(char* scratch) { + // TODO: support larger timestamp sizes + assert(FLAGS_user_timestamp_size == 8); + assert(scratch); + uint64_t ts = timestamp_.fetch_add(1); + EncodeFixed64(scratch, ts); + return Slice(scratch, FLAGS_user_timestamp_size); + } + Slice GetTimestampForRead(Random64& rand, char* scratch) { + assert(FLAGS_user_timestamp_size == 8); + assert(scratch); + if (FLAGS_read_with_latest_user_timestamp) { + return Allocate(scratch); + } + // Choose a random timestamp from the past. + uint64_t ts = rand.Next() % Get(); + EncodeFixed64(scratch, ts); + return Slice(scratch, FLAGS_user_timestamp_size); + } }; // State shared by all concurrent executions of the same benchmark. @@ -2277,10 +2303,8 @@ struct ThreadState { Stats stats; SharedState* shared; - /* implicit */ ThreadState(int index) - : tid(index), - rand((FLAGS_seed ? FLAGS_seed : 1000) + index) { - } + explicit ThreadState(int index) + : tid(index), rand((FLAGS_seed ? FLAGS_seed : 1000) + index) {} }; class Duration { @@ -2331,6 +2355,7 @@ class Benchmark { std::vector multi_dbs_; int64_t num_; int key_size_; + int user_timestamp_size_; int prefix_size_; int64_t keys_per_prefix_; int64_t entries_per_batch_; @@ -2406,6 +2431,8 @@ class Benchmark { std::shared_ptr listener_; + std::unique_ptr mock_app_clock_; + bool SanityCheck() { if (FLAGS_compression_ratio > 1) { fprintf(stderr, "compression_ratio should be between 0 and 1\n"); @@ -2454,7 +2481,9 @@ class Benchmark { void PrintHeader() { PrintEnvironment(); - fprintf(stdout, "Keys: %d bytes each\n", FLAGS_key_size); + fprintf(stdout, + "Keys: %d bytes each (+ %d bytes user-defined timestamp)\n", + FLAGS_key_size, FLAGS_user_timestamp_size); auto avg_value_size = FLAGS_value_size; if (FLAGS_value_size_distribution_type_e == kFixed) { fprintf(stdout, "Values: %d bytes each (%d bytes after compression)\n", @@ -2694,6 +2723,7 @@ class Benchmark { prefix_extractor_(NewFixedPrefixTransform(FLAGS_prefix_size)), num_(FLAGS_num), key_size_(FLAGS_key_size), + user_timestamp_size_(FLAGS_user_timestamp_size), prefix_size_(FLAGS_prefix_size), keys_per_prefix_(FLAGS_keys_per_prefix), entries_per_batch_(1), @@ -2769,6 +2799,9 @@ class Benchmark { } listener_.reset(new ErrorHandlerListener()); + if (user_timestamp_size_ > 0) { + mock_app_clock_.reset(new TimestampEmulator()); + } } ~Benchmark() { @@ -2788,17 +2821,19 @@ class Benchmark { } // Generate key according to the given specification and random number. - // The resulting key will have the following format (if keys_per_prefix_ - // is positive), extra trailing bytes are either cut off or padded with '0'. - // The prefix value is derived from key value. - // ---------------------------- - // | prefix 00000 | key 00000 | - // ---------------------------- - // If keys_per_prefix_ is 0, the key is simply a binary representation of - // random number followed by trailing '0's - // ---------------------------- - // | key 00000 | - // ---------------------------- + // The resulting key will have the following format: + // - If keys_per_prefix_ is positive, extra trailing bytes are either cut + // off or padded with '0'. + // The prefix value is derived from key value. + // ---------------------------- + // | prefix 00000 | key 00000 | + // ---------------------------- + // + // - If keys_per_prefix_ is 0, the key is simply a binary representation of + // random number followed by trailing '0's + // ---------------------------- + // | key 00000 | + // ---------------------------- void GenerateKeyFromInt(uint64_t v, int64_t num_keys, Slice* key) { if (!keys_.empty()) { assert(FLAGS_use_existing_keys); @@ -4037,6 +4072,14 @@ class Benchmark { options.enable_thread_tracking = true; } + if (FLAGS_user_timestamp_size > 0) { + if (FLAGS_user_timestamp_size != 8) { + fprintf(stderr, "Only 64 bits timestamps are supported.\n"); + exit(1); + } + options.comparator = ROCKSDB_NAMESPACE::test::ComparatorWithU64Ts(); + } + #ifndef ROCKSDB_LITE if (FLAGS_readonly && FLAGS_transaction_db) { fprintf(stderr, "Cannot use readonly flag with transaction_db\n"); @@ -4430,7 +4473,8 @@ class Benchmark { } RandomGenerator gen; - WriteBatch batch; + WriteBatch batch(/*reserved_bytes=*/0, /*max_bytes=*/0, + user_timestamp_size_); Status s; int64_t bytes = 0; @@ -4449,6 +4493,11 @@ class Benchmark { } } + std::unique_ptr ts_guard; + if (user_timestamp_size_ > 0) { + ts_guard.reset(new char[user_timestamp_size_]); + } + int64_t stage = 0; int64_t num_written = 0; while (!duration.Done(entries_per_batch_)) { @@ -4492,8 +4541,8 @@ class Benchmark { batch.Put(db_with_cfh->GetCfh(rand_num), key, val); } - batch_bytes += val.size() + key_size_; - bytes += val.size() + key_size_; + batch_bytes += val.size() + key_size_ + user_timestamp_size_; + bytes += val.size() + key_size_ + user_timestamp_size_; ++num_written; if (writes_per_range_tombstone_ > 0 && num_written > writes_before_delete_range_ && @@ -4549,6 +4598,15 @@ class Benchmark { // once per write. thread->stats.ResetLastOpTime(); } + if (user_timestamp_size_ > 0) { + Slice user_ts = mock_app_clock_->Allocate(ts_guard.get()); + s = batch.AssignTimestamp(user_ts); + if (!s.ok()) { + fprintf(stderr, "assign timestamp to write batch: %s\n", + s.ToString().c_str()); + ErrorExit(); + } + } if (!use_blob_db_) { s = db_with_cfh->db->Write(write_options_, &batch); } @@ -4905,6 +4963,13 @@ class Benchmark { void ReadSequential(ThreadState* thread, DB* db) { ReadOptions options(FLAGS_verify_checksum, true); options.tailing = FLAGS_use_tailing_iterator; + std::unique_ptr ts_guard; + Slice ts; + if (user_timestamp_size_ > 0) { + ts_guard.reset(new char[user_timestamp_size_]); + ts = mock_app_clock_->GetTimestampForRead(thread->rand, ts_guard.get()); + options.timestamp = &ts; + } Iterator* iter = db->NewIterator(options); int64_t i = 0; @@ -5026,6 +5091,11 @@ class Benchmark { std::unique_ptr key_guard; Slice key = AllocateKey(&key_guard); std::string value; + Slice ts; + std::unique_ptr ts_guard; + if (user_timestamp_size_ > 0) { + ts_guard.reset(new char[user_timestamp_size_]); + } DB* db = SelectDBWithCfh(thread)->db; int64_t pot = 1; @@ -5039,7 +5109,15 @@ class Benchmark { int64_t key_rand = thread->rand.Next() & (pot - 1); GenerateKeyFromInt(key_rand, FLAGS_num, &key); ++read; - auto status = db->Get(options, key, &value); + std::string ts_ret; + std::string* ts_ptr = nullptr; + if (user_timestamp_size_ > 0) { + ts = mock_app_clock_->GetTimestampForRead(thread->rand, + ts_guard.get()); + options.timestamp = &ts; + ts_ptr = &ts_ret; + } + auto status = db->Get(options, key, &value, ts_ptr); if (status.ok()) { ++found; } else if (!status.IsNotFound()) { @@ -5103,6 +5181,11 @@ class Benchmark { std::unique_ptr key_guard; Slice key = AllocateKey(&key_guard); PinnableSlice pinnable_val; + std::unique_ptr ts_guard; + Slice ts; + if (user_timestamp_size_ > 0) { + ts_guard.reset(new char[user_timestamp_size_]); + } Duration duration(FLAGS_duration, reads_); while (!duration.Done(1)) { @@ -5126,19 +5209,26 @@ class Benchmark { } GenerateKeyFromInt(key_rand, FLAGS_num, &key); read++; + std::string ts_ret; + std::string* ts_ptr = nullptr; + if (user_timestamp_size_ > 0) { + ts = mock_app_clock_->GetTimestampForRead(thread->rand, ts_guard.get()); + options.timestamp = &ts; + ts_ptr = &ts_ret; + } Status s; if (FLAGS_num_column_families > 1) { s = db_with_cfh->db->Get(options, db_with_cfh->GetCfh(key_rand), key, - &pinnable_val); + &pinnable_val, ts_ptr); } else { pinnable_val.Reset(); s = db_with_cfh->db->Get(options, db_with_cfh->db->DefaultColumnFamily(), key, - &pinnable_val); + &pinnable_val, ts_ptr); } if (s.ok()) { found++; - bytes += key.size() + pinnable_val.size(); + bytes += key.size() + pinnable_val.size() + user_timestamp_size_; } else if (!s.IsNotFound()) { fprintf(stderr, "Get returned an error: %s\n", s.ToString().c_str()); abort(); @@ -5184,6 +5274,11 @@ class Benchmark { keys.push_back(AllocateKey(&key_guards.back())); } + std::unique_ptr ts_guard; + if (user_timestamp_size_ > 0) { + ts_guard.reset(new char[user_timestamp_size_]); + } + Duration duration(FLAGS_duration, reads_); while (!duration.Done(1)) { DB* db = SelectDB(thread); @@ -5202,6 +5297,11 @@ class Benchmark { GenerateKeyFromInt(GetRandomKey(&thread->rand), FLAGS_num, &keys[i]); } } + Slice ts; + if (user_timestamp_size_ > 0) { + ts = mock_app_clock_->GetTimestampForRead(thread->rand, ts_guard.get()); + options.timestamp = &ts; + } if (!FLAGS_multiread_batched) { std::vector statuses = db->MultiGet(options, keys, &values); assert(static_cast(statuses.size()) == entries_per_batch_); @@ -5718,8 +5818,17 @@ class Benchmark { void IteratorCreation(ThreadState* thread) { Duration duration(FLAGS_duration, reads_); ReadOptions options(FLAGS_verify_checksum, true); + std::unique_ptr ts_guard; + if (user_timestamp_size_ > 0) { + ts_guard.reset(new char[user_timestamp_size_]); + } while (!duration.Done(1)) { DB* db = SelectDB(thread); + Slice ts; + if (user_timestamp_size_ > 0) { + ts = mock_app_clock_->GetTimestampForRead(thread->rand, ts_guard.get()); + options.timestamp = &ts; + } Iterator* iter = db->NewIterator(options); delete iter; thread->stats.FinishedOps(nullptr, db, 1, kOthers); @@ -5743,6 +5852,13 @@ class Benchmark { options.prefix_same_as_start = FLAGS_prefix_same_as_start; options.tailing = FLAGS_use_tailing_iterator; options.readahead_size = FLAGS_readahead_size; + std::unique_ptr ts_guard; + Slice ts; + if (user_timestamp_size_ > 0) { + ts_guard.reset(new char[user_timestamp_size_]); + ts = mock_app_clock_->GetTimestampForRead(thread->rand, ts_guard.get()); + options.timestamp = &ts; + } Iterator* single_iter = nullptr; std::vector multi_iters; @@ -5866,11 +5982,17 @@ class Benchmark { } void DoDelete(ThreadState* thread, bool seq) { - WriteBatch batch; + WriteBatch batch(/*reserved_bytes=*/0, /*max_bytes=*/0, + user_timestamp_size_); Duration duration(seq ? 0 : FLAGS_duration, deletes_); int64_t i = 0; std::unique_ptr key_guard; Slice key = AllocateKey(&key_guard); + std::unique_ptr ts_guard; + Slice ts; + if (user_timestamp_size_ > 0) { + ts_guard.reset(new char[user_timestamp_size_]); + } while (!duration.Done(entries_per_batch_)) { DB* db = SelectDB(thread); @@ -5880,7 +6002,16 @@ class Benchmark { GenerateKeyFromInt(k, FLAGS_num, &key); batch.Delete(key); } - auto s = db->Write(write_options_, &batch); + Status s; + if (user_timestamp_size_ > 0) { + ts = mock_app_clock_->Allocate(ts_guard.get()); + s = batch.AssignTimestamp(ts); + if (!s.ok()) { + fprintf(stderr, "assign timestamp: %s\n", s.ToString().c_str()); + ErrorExit(); + } + } + s = db->Write(write_options_, &batch); thread->stats.FinishedOps(nullptr, db, entries_per_batch_, kDelete); if (!s.ok()) { fprintf(stderr, "del error: %s\n", s.ToString().c_str()); @@ -5930,6 +6061,10 @@ class Benchmark { std::unique_ptr key_guard; Slice key = AllocateKey(&key_guard); + std::unique_ptr ts_guard; + if (user_timestamp_size_ > 0) { + ts_guard.reset(new char[user_timestamp_size_]); + } uint32_t written = 0; bool hint_printed = false; @@ -5961,18 +6096,27 @@ class Benchmark { Status s; Slice val = gen.Generate(); + Slice ts; + if (user_timestamp_size_ > 0) { + ts = mock_app_clock_->Allocate(ts_guard.get()); + write_options_.timestamp = &ts; + } if (write_merge == kWrite) { s = db->Put(write_options_, key, val); } else { s = db->Merge(write_options_, key, val); } + // Restore write_options_ + if (user_timestamp_size_ > 0) { + write_options_.timestamp = nullptr; + } written++; if (!s.ok()) { fprintf(stderr, "put or merge error: %s\n", s.ToString().c_str()); exit(1); } - bytes += key.size() + val.size(); + bytes += key.size() + val.size() + user_timestamp_size_; thread->stats.FinishedOps(&db_, db_.db, 1, kWrite); if (FLAGS_benchmark_write_rate_limit > 0) { @@ -5999,6 +6143,13 @@ class Benchmark { } assert(db_.db != nullptr); ReadOptions read_options; + std::unique_ptr ts_guard; + Slice ts; + if (user_timestamp_size_ > 0) { + ts_guard.reset(new char[user_timestamp_size_]); + ts = mock_app_clock_->GetTimestampForRead(thread->rand, ts_guard.get()); + read_options.timestamp = &ts; + } Iterator* iter = db_.db->NewIterator(read_options); fprintf(stderr, "num reads to do %" PRIu64 "\n", reads_); @@ -6030,13 +6181,26 @@ class Benchmark { std::string suffixes[3] = {"2", "1", "0"}; std::string keys[3]; - WriteBatch batch; + WriteBatch batch(/*reserved_bytes=*/0, /*max_bytes=*/0, + user_timestamp_size_); Status s; for (int i = 0; i < 3; i++) { keys[i] = key.ToString() + suffixes[i]; batch.Put(keys[i], value); } + std::unique_ptr ts_guard; + if (user_timestamp_size_ > 0) { + ts_guard.reset(new char[user_timestamp_size_]); + Slice ts = mock_app_clock_->Allocate(ts_guard.get()); + s = batch.AssignTimestamp(ts); + if (!s.ok()) { + fprintf(stderr, "assign timestamp to batch: %s\n", + s.ToString().c_str()); + ErrorExit(); + } + } + s = db->Write(writeoptions, &batch); return s; } @@ -6049,13 +6213,25 @@ class Benchmark { std::string suffixes[3] = {"1", "2", "0"}; std::string keys[3]; - WriteBatch batch; + WriteBatch batch(0, 0, user_timestamp_size_); Status s; for (int i = 0; i < 3; i++) { keys[i] = key.ToString() + suffixes[i]; batch.Delete(keys[i]); } + std::unique_ptr ts_guard; + if (user_timestamp_size_ > 0) { + ts_guard.reset(new char[user_timestamp_size_]); + Slice ts = mock_app_clock_->Allocate(ts_guard.get()); + s = batch.AssignTimestamp(ts); + if (!s.ok()) { + fprintf(stderr, "assign timestamp to batch: %s\n", + s.ToString().c_str()); + ErrorExit(); + } + } + s = db->Write(writeoptions, &batch); return s; } @@ -6070,6 +6246,15 @@ class Benchmark { Slice key_slices[3]; std::string values[3]; ReadOptions readoptionscopy = readoptions; + + std::unique_ptr ts_guard; + Slice ts; + if (user_timestamp_size_ > 0) { + ts_guard.reset(new char[user_timestamp_size_]); + ts = mock_app_clock_->Allocate(ts_guard.get()); + readoptionscopy.timestamp = &ts; + } + readoptionscopy.snapshot = db->GetSnapshot(); Status s; for (int i = 0; i < 3; i++) { @@ -6192,6 +6377,11 @@ class Benchmark { std::unique_ptr key_guard; Slice key = AllocateKey(&key_guard); + std::unique_ptr ts_guard; + if (user_timestamp_size_ > 0) { + ts_guard.reset(new char[user_timestamp_size_]); + } + // the number of iterations is the larger of read_ or write_ while (!duration.Done(1)) { DB* db = SelectDB(thread); @@ -6203,6 +6393,12 @@ class Benchmark { } if (get_weight > 0) { // do all the gets first + Slice ts; + if (user_timestamp_size_ > 0) { + ts = mock_app_clock_->GetTimestampForRead(thread->rand, + ts_guard.get()); + options.timestamp = &ts; + } Status s = db->Get(options, key, &value); if (!s.ok() && !s.IsNotFound()) { fprintf(stderr, "get error: %s\n", s.ToString().c_str()); @@ -6217,6 +6413,11 @@ class Benchmark { } else if (put_weight > 0) { // then do all the corresponding number of puts // for all the gets we have done earlier + Slice ts; + if (user_timestamp_size_ > 0) { + ts = mock_app_clock_->Allocate(ts_guard.get()); + write_options_.timestamp = &ts; + } Status s = db->Put(write_options_, key, gen.Generate()); if (!s.ok()) { fprintf(stderr, "put error: %s\n", s.ToString().c_str()); @@ -6246,15 +6447,25 @@ class Benchmark { std::unique_ptr key_guard; Slice key = AllocateKey(&key_guard); + std::unique_ptr ts_guard; + if (user_timestamp_size_ > 0) { + ts_guard.reset(new char[user_timestamp_size_]); + } // the number of iterations is the larger of read_ or write_ while (!duration.Done(1)) { DB* db = SelectDB(thread); GenerateKeyFromInt(thread->rand.Next() % FLAGS_num, FLAGS_num, &key); + Slice ts; + if (user_timestamp_size_ > 0) { + // Read with newest timestamp because we are doing rmw. + ts = mock_app_clock_->Allocate(ts_guard.get()); + options.timestamp = &ts; + } auto status = db->Get(options, key, &value); if (status.ok()) { ++found; - bytes += key.size() + value.size(); + bytes += key.size() + value.size() + user_timestamp_size_; } else if (!status.IsNotFound()) { fprintf(stderr, "Get returned an error: %s\n", status.ToString().c_str()); @@ -6268,12 +6479,16 @@ class Benchmark { } Slice val = gen.Generate(); + if (user_timestamp_size_ > 0) { + ts = mock_app_clock_->Allocate(ts_guard.get()); + write_options_.timestamp = &ts; + } Status s = db->Put(write_options_, key, val); if (!s.ok()) { fprintf(stderr, "put error: %s\n", s.ToString().c_str()); exit(1); } - bytes += key.size() + val.size(); + bytes += key.size() + val.size() + user_timestamp_size_; thread->stats.FinishedOps(nullptr, db, 1, kUpdate); } char msg[100]; @@ -6298,10 +6513,19 @@ class Benchmark { std::unique_ptr key_guard; Slice key = AllocateKey(&key_guard); + std::unique_ptr ts_guard; + if (user_timestamp_size_ > 0) { + ts_guard.reset(new char[user_timestamp_size_]); + } // the number of iterations is the larger of read_ or write_ while (!duration.Done(1)) { DB* db = SelectDB(thread); GenerateKeyFromInt(thread->rand.Next() % FLAGS_num, FLAGS_num, &key); + Slice ts; + if (user_timestamp_size_ > 0) { + ts = mock_app_clock_->Allocate(ts_guard.get()); + options.timestamp = &ts; + } auto status = db->Get(options, key, &existing_value); if (status.ok()) { @@ -6322,6 +6546,11 @@ class Benchmark { xor_operator.XOR(nullptr, value, &new_value); } + if (user_timestamp_size_ > 0) { + ts = mock_app_clock_->Allocate(ts_guard.get()); + write_options_.timestamp = &ts; + } + Status s = db->Put(write_options_, key, Slice(new_value)); if (!s.ok()) { fprintf(stderr, "put error: %s\n", s.ToString().c_str()); @@ -6347,16 +6576,25 @@ class Benchmark { std::unique_ptr key_guard; Slice key = AllocateKey(&key_guard); + std::unique_ptr ts_guard; + if (user_timestamp_size_ > 0) { + ts_guard.reset(new char[user_timestamp_size_]); + } // The number of iterations is the larger of read_ or write_ Duration duration(FLAGS_duration, readwrites_); while (!duration.Done(1)) { DB* db = SelectDB(thread); GenerateKeyFromInt(thread->rand.Next() % FLAGS_num, FLAGS_num, &key); + Slice ts; + if (user_timestamp_size_ > 0) { + ts = mock_app_clock_->Allocate(ts_guard.get()); + options.timestamp = &ts; + } auto status = db->Get(options, key, &value); if (status.ok()) { ++found; - bytes += key.size() + value.size(); + bytes += key.size() + value.size() + user_timestamp_size_; } else if (!status.IsNotFound()) { fprintf(stderr, "Get returned an error: %s\n", status.ToString().c_str()); @@ -6374,13 +6612,18 @@ class Benchmark { } value.append(operand.data(), operand.size()); + if (user_timestamp_size_ > 0) { + ts = mock_app_clock_->Allocate(ts_guard.get()); + write_options_.timestamp = &ts; + } + // Write back to the database Status s = db->Put(write_options_, key, value); if (!s.ok()) { fprintf(stderr, "put error: %s\n", s.ToString().c_str()); ErrorExit(); } - bytes += key.size() + value.size(); + bytes += key.size() + value.size() + user_timestamp_size_; thread->stats.FinishedOps(nullptr, db, 1, kUpdate); } @@ -6506,8 +6749,15 @@ class Benchmark { thread->stats.Start(thread->tid); DB* db = SelectDB(thread); - std::unique_ptr iter( - db->NewIterator(ReadOptions(FLAGS_verify_checksum, true))); + ReadOptions read_opts(FLAGS_verify_checksum, true); + std::unique_ptr ts_guard; + Slice ts; + if (user_timestamp_size_ > 0) { + ts_guard.reset(new char[user_timestamp_size_]); + ts = mock_app_clock_->GetTimestampForRead(thread->rand, ts_guard.get()); + read_opts.timestamp = &ts; + } + std::unique_ptr iter(db->NewIterator(read_opts)); std::unique_ptr key_guard; Slice key = AllocateKey(&key_guard); @@ -6735,6 +6985,10 @@ class Benchmark { void RandomReplaceKeys(ThreadState* thread) { std::unique_ptr key_guard; Slice key = AllocateKey(&key_guard); + std::unique_ptr ts_guard; + if (user_timestamp_size_ > 0) { + ts_guard.reset(new char[user_timestamp_size_]); + } std::vector counters(FLAGS_numdistinct, 0); size_t max_counter = 50; RandomGenerator gen; @@ -6743,6 +6997,11 @@ class Benchmark { DB* db = SelectDB(thread); for (int64_t i = 0; i < FLAGS_numdistinct; i++) { GenerateKeyFromInt(i * max_counter, FLAGS_num, &key); + Slice ts; + if (user_timestamp_size_ > 0) { + ts = mock_app_clock_->Allocate(ts_guard.get()); + write_options_.timestamp = &ts; + } s = db->Put(write_options_, key, gen.Generate()); if (!s.ok()) { fprintf(stderr, "Operation failed: %s\n", s.ToString().c_str()); @@ -6762,12 +7021,21 @@ class Benchmark { static_cast(0)); GenerateKeyFromInt(key_id * max_counter + counters[key_id], FLAGS_num, &key); + Slice ts; + if (user_timestamp_size_ > 0) { + ts = mock_app_clock_->Allocate(ts_guard.get()); + write_options_.timestamp = &ts; + } s = FLAGS_use_single_deletes ? db->SingleDelete(write_options_, key) : db->Delete(write_options_, key); if (s.ok()) { counters[key_id] = (counters[key_id] + 1) % max_counter; GenerateKeyFromInt(key_id * max_counter + counters[key_id], FLAGS_num, &key); + if (user_timestamp_size_ > 0) { + ts = mock_app_clock_->Allocate(ts_guard.get()); + write_options_.timestamp = &ts; + } s = db->Put(write_options_, key, Slice()); }