diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc
index 19589e54f..c4437092f 100644
--- a/tools/db_bench_tool.cc
+++ b/tools/db_bench_tool.cc
@@ -32,6 +32,7 @@
 #include
 #include
 #include
+#include <queue>
 #include
 #include
 
@@ -351,6 +352,42 @@ DEFINE_uint32(overwrite_window_size, 1,
              "Warning: large values can affect throughput. "
              "Valid overwrite_window_size values: [1, kMaxUint32].");
 
+DEFINE_uint64(
+    disposable_entries_delete_delay, 0,
+    "Minimum delay in microseconds for the series of Deletes "
+    "to be issued. When 0 the insertion of the last disposable entry is "
+    "immediately followed by the issuance of the Deletes. "
+    "(only compatible with fillanddeleteuniquerandom benchmark).");
+
+DEFINE_uint64(disposable_entries_batch_size, 0,
+              "Number of consecutively inserted disposable KV entries "
+              "that will be deleted after 'delete_delay' microseconds. "
+              "A series of Deletes is always issued once all the "
+              "disposable KV entries it targets have been inserted "
+              "into the DB. When 0 no deletes are issued and a "
+              "regular 'filluniquerandom' benchmark occurs. "
+              "(only compatible with fillanddeleteuniquerandom benchmark)");
+
+DEFINE_int32(disposable_entries_value_size, 64,
+             "Size of the values (in bytes) of the entries targeted by "
+             "selective deletes. "
+             "(only compatible with fillanddeleteuniquerandom benchmark)");
+
+DEFINE_uint64(
+    persistent_entries_batch_size, 0,
+    "Number of KV entries being inserted right before the deletes "
+    "targeting the disposable KV entries are issued. These "
+    "persistent keys are not targeted by the deletes, and will always "
+    "remain valid in the DB. (only compatible with "
+    "--benchmarks='fillanddeleteuniquerandom' "
+    "and used when --disposable_entries_batch_size is > 0).");
+
+DEFINE_int32(persistent_entries_value_size, 64,
+             "Size of the values (in bytes) of the entries not targeted by "
+             "deletes. (only compatible with "
+             "--benchmarks='fillanddeleteuniquerandom' "
+             "and used when --disposable_entries_batch_size is > 0).");
+
 DEFINE_double(read_random_exp_range, 0.0,
               "Read random's key will be generated using distribution of "
               "num * exp(-r) where r is uniform number from 0 to this value. "
@@ -3265,12 +3302,13 @@ class Benchmark {
     } else if (name == "fillrandom") {
       fresh_db = true;
       method = &Benchmark::WriteRandom;
-    } else if (name == "filluniquerandom") {
+    } else if (name == "filluniquerandom" ||
+               name == "fillanddeleteuniquerandom") {
       fresh_db = true;
       if (num_threads > 1) {
         fprintf(stderr,
-                "filluniquerandom multithreaded not supported"
-                ", use 1 thread");
+                "filluniquerandom and fillanddeleteuniquerandom "
+                "multithreaded not supported, use 1 thread");
         num_threads = 1;
       }
       method = &Benchmark::WriteUniqueRandom;
@@ -4663,6 +4701,13 @@ class Benchmark {
     return std::numeric_limits<uint64_t>::max();
   }
 
+  // Only available for UNIQUE_RANDOM mode.
+  uint64_t Fetch(uint64_t index) {
+    assert(mode_ == UNIQUE_RANDOM);
+    assert(index < values_.size());
+    return values_[index];
+  }
+
  private:
   Random64* rand_;
   WriteMode mode_;
@@ -4734,7 +4779,7 @@ class Benchmark {
     std::unique_ptr<const char[]> end_key_guard;
     Slice end_key = AllocateKey(&end_key_guard);
     double p = 0.0;
-    uint64_t num_overwrites = 0, num_unique_keys = 0;
+    uint64_t num_overwrites = 0, num_unique_keys = 0, num_selective_deletes = 0;
     // If user set overwrite_probability flag,
     // check if value is in [0.0,1.0].
     if (FLAGS_overwrite_probability > 0.0) {
@@ -4763,6 +4808,49 @@
     std::deque<int64_t> inserted_key_window;
     Random64 reservoir_id_gen(FLAGS_seed);
 
+    // --- Variables used in disposable/persistent keys simulation:
+    // The following variables are used when
+    // disposable_entries_batch_size is >0. We simulate a workload
+    // where the following sequence is repeated multiple times:
+    // "A set of keys S1 is inserted ('disposable entries'), then after
+    // some delay another set of keys S2 is inserted ('persistent entries')
+    // and the first set of keys S1 is deleted. S2 artificially represents
+    // the insertion of hypothetical results from some undefined computation
+    // done on the first set of keys S1. The next sequence can start as soon
+    // as the last disposable entry in the set S1 of this sequence is
+    // inserted, if the delay is non-negligible."
+    bool skip_for_loop = false, is_disposable_entry = true;
+    std::vector<uint64_t> disposable_entries_index(num_key_gens, 0);
+    std::vector<uint64_t> persistent_ent_and_del_index(num_key_gens, 0);
+    const uint64_t kNumDispAndPersEntries =
+        FLAGS_disposable_entries_batch_size +
+        FLAGS_persistent_entries_batch_size;
+    if (kNumDispAndPersEntries > 0) {
+      if ((write_mode != UNIQUE_RANDOM) || (writes_per_range_tombstone_ > 0) ||
+          (p > 0.0)) {
+        fprintf(
+            stderr,
+            "Disposable/persistent deletes are not compatible with overwrites "
+            "and DeleteRanges, and are only supported in filluniquerandom.\n");
+        ErrorExit();
+      }
+      if (FLAGS_disposable_entries_value_size < 0 ||
+          FLAGS_persistent_entries_value_size < 0) {
+        fprintf(
+            stderr,
+            "disposable_entries_value_size and persistent_entries_value_size "
+            "have to be positive.\n");
+        ErrorExit();
+      }
+    }
+    Random rnd_disposable_entry(static_cast<uint32_t>(FLAGS_seed));
+    std::string random_value;
+    // Queue that stores scheduled timestamp of disposable entries deletes,
+    // along with starting index of disposable entry keys to delete.
+    std::vector<std::queue<std::pair<uint64_t, uint64_t>>> disposable_entries_q(
+        num_key_gens);
+    // --- End of variables used in disposable/persistent keys simulation.
+
     std::vector<std::unique_ptr<const char[]>> expanded_key_guards;
     std::vector<Slice> expanded_keys;
     if (FLAGS_expand_range_tombstones) {
@@ -4814,11 +4902,101 @@
               inserted_key_window.push_back(rand_num);
             }
           }
+        } else if (kNumDispAndPersEntries > 0) {
+          // Check if queue is non-empty and if we need to insert
+          // 'persistent' KV entries (KV entries that are never deleted)
+          // and delete disposable entries previously inserted.
+          if (!disposable_entries_q[id].empty() &&
+              (disposable_entries_q[id].front().first <
+               FLAGS_env->NowMicros())) {
+            // If we need to perform a "merge op" pattern,
+            // we first write all the persistent KV entries not targeted
+            // by deletes, and then we write the disposable entries deletes.
+            if (persistent_ent_and_del_index[id] <
+                FLAGS_persistent_entries_batch_size) {
+              // Generate key to insert.
+              rand_num =
+                  key_gens[id]->Fetch(disposable_entries_q[id].front().second +
+                                      FLAGS_disposable_entries_batch_size +
+                                      persistent_ent_and_del_index[id]);
+              persistent_ent_and_del_index[id]++;
+              is_disposable_entry = false;
+              skip_for_loop = false;
+            } else if (persistent_ent_and_del_index[id] <
+                       kNumDispAndPersEntries) {
+              // Find key of the entry to delete.
+              rand_num =
+                  key_gens[id]->Fetch(disposable_entries_q[id].front().second +
+                                      (persistent_ent_and_del_index[id] -
+                                       FLAGS_persistent_entries_batch_size));
+              persistent_ent_and_del_index[id]++;
+              GenerateKeyFromInt(rand_num, FLAGS_num, &key);
+              // For the delete operation, everything happens here and we
+              // skip the rest of the for-loop, which is designed for
+              // inserts.
+              if (FLAGS_num_column_families <= 1) {
+                batch.Delete(key);
+              } else {
+                // We use same rand_num as seed for key and column family so
+                // that we can deterministically find the cfh corresponding to a
+                // particular key while reading the key.
+                batch.Delete(db_with_cfh->GetCfh(rand_num), key);
+              }
+              // A delete only includes Key+Timestamp (no value).
+              batch_bytes += key_size_ + user_timestamp_size_;
+              bytes += key_size_ + user_timestamp_size_;
+              num_selective_deletes++;
+              // Skip rest of the for-loop (j=0, j<entries_per_batch_-1).
+              skip_for_loop = true;
+            }
+
+            // Check if we are done inserting the persistent entries and
+            // issuing the deletes targeting the disposable entries of the
+            // current job: if so, remove the job from the queue and reset
+            // the persistent-entry/delete index.
+            if ((!disposable_entries_q[id].empty()) &&
+                (disposable_entries_q[id].front().first <
+                 FLAGS_env->NowMicros()) &&
+                persistent_ent_and_del_index[id] == kNumDispAndPersEntries) {
+              disposable_entries_q[id].pop();
+              persistent_ent_and_del_index[id] = 0;
+            }
+
+            // If we are deleting disposable entries, skip the rest of the
+            // for-loop since there are no key-value inserts at this moment in
+            // time.
+            if (skip_for_loop) {
+              continue;
+            }
+
+          }
+          // If no job is in the queue, then we keep inserting disposable KV
+          // entries that will be deleted later by a series of deletes.
+          else {
+            rand_num = key_gens[id]->Fetch(disposable_entries_index[id]);
+            disposable_entries_index[id]++;
+            is_disposable_entry = true;
+            if ((disposable_entries_index[id] %
+                 FLAGS_disposable_entries_batch_size) == 0) {
+              // Skip the persistent KV entries inserts for now
+              disposable_entries_index[id] +=
+                  FLAGS_persistent_entries_batch_size;
+            }
+          }
         } else {
           rand_num = key_gens[id]->Next();
         }
         GenerateKeyFromInt(rand_num, FLAGS_num, &key);
-        Slice val = gen.Generate();
+        Slice val;
+        if (kNumDispAndPersEntries > 0) {
+          random_value = rnd_disposable_entry.RandomString(
+              is_disposable_entry ? FLAGS_disposable_entries_value_size
+                                  : FLAGS_persistent_entries_value_size);
+          val = Slice(random_value);
+          num_unique_keys++;
+        } else {
+          val = gen.Generate();
+        }
         if (use_blob_db_) {
 #ifndef ROCKSDB_LITE
           // Stacked BlobDB
@@ -4843,6 +5021,23 @@
         batch_bytes += val.size() + key_size_ + user_timestamp_size_;
         bytes += val.size() + key_size_ + user_timestamp_size_;
         ++num_written;
+
+        // If all disposable entries have been inserted, then we need to
+        // add in the job queue a call for 'persistent entry insertions +
+        // disposable entry deletions'.
+        if (kNumDispAndPersEntries > 0 && is_disposable_entry &&
+            ((disposable_entries_index[id] % kNumDispAndPersEntries) == 0)) {
+          // Queue contains [timestamp, starting_idx]:
+          // timestamp = current_time + delay (minimum absolute time at which
+          // to start issuing the selective deletes); starting_idx = index in
+          // the keygen of the rand_num used to generate the key of the first
+          // KV entry to delete (= key of the first selective delete).
+          disposable_entries_q[id].push(std::make_pair(
+              FLAGS_env->NowMicros() +
+                  FLAGS_disposable_entries_delete_delay /* timestamp */,
+              disposable_entries_index[id] - kNumDispAndPersEntries
+              /*starting idx*/));
+        }
         if (writes_per_range_tombstone_ > 0 &&
             num_written > writes_before_delete_range_ &&
             (num_written - writes_before_delete_range_) /
@@ -4946,9 +5141,14 @@
     }
     if ((write_mode == UNIQUE_RANDOM) && (p > 0.0)) {
       fprintf(stdout,
-              "Number of unique keys inerted: %" PRIu64
+              "Number of unique keys inserted: %" PRIu64
               ".\nNumber of overwrites: %" PRIu64 "\n",
               num_unique_keys, num_overwrites);
+    } else if (kNumDispAndPersEntries > 0) {
+      fprintf(stdout,
+              "Number of unique keys inserted (disposable+persistent): %" PRIu64
+              ".\nNumber of 'disposable entry deletes': %" PRIu64 "\n",
+              num_written, num_selective_deletes);
     }
     thread->stats.AddBytes(bytes);
   }
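Example invocation (a sketch only; the flag values below are illustrative, not taken from any test, and assume a db_bench binary built with this change):

  ./db_bench --benchmarks=fillanddeleteuniquerandom --num=1000000 \
      --disposable_entries_batch_size=1000 --disposable_entries_value_size=64 \
      --persistent_entries_batch_size=100 --persistent_entries_value_size=128 \
      --disposable_entries_delete_delay=1000000

With these illustrative values, each batch of 1000 disposable entries is followed, at least one second (1000000 microseconds) later, by the insertion of 100 persistent entries and by 1000 Deletes targeting the disposable entries of that batch. Setting --disposable_entries_batch_size=0 issues no deletes, so the run degenerates into a regular filluniquerandom benchmark.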