diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc index 37bbcc485..f35378bf9 100644 --- a/tools/db_bench_tool.cc +++ b/tools/db_bench_tool.cc @@ -21,6 +21,7 @@ #endif #include #include +#include #include #include #include @@ -102,6 +103,7 @@ DEFINE_string( "compact," "compactall," "multireadrandom," + "mixgraph," "readseq," "readtocache," "readreverse," @@ -932,6 +934,52 @@ DEFINE_uint64( "If non-zero, db_bench will rate-limit the writes going into RocksDB. This " "is the global rate in bytes/second."); +// the parameters of mix_graph +DEFINE_double(key_dist_a, 0.0, + "The parameter 'a' of key access distribution model " + "f(x)=a*x^b"); +DEFINE_double(key_dist_b, 0.0, + "The parameter 'b' of key access distribution model " + "f(x)=a*x^b"); +DEFINE_double(value_theta, 0.0, + "The parameter 'theta' of Generized Pareto Distribution " + "f(x)=(1/sigma)*(1+k*(x-theta)/sigma)^-(1/k+1)"); +DEFINE_double(value_k, 0.0, + "The parameter 'k' of Generized Pareto Distribution " + "f(x)=(1/sigma)*(1+k*(x-theta)/sigma)^-(1/k+1)"); +DEFINE_double(value_sigma, 0.0, + "The parameter 'theta' of Generized Pareto Distribution " + "f(x)=(1/sigma)*(1+k*(x-theta)/sigma)^-(1/k+1)"); +DEFINE_double(iter_theta, 0.0, + "The parameter 'theta' of Generized Pareto Distribution " + "f(x)=(1/sigma)*(1+k*(x-theta)/sigma)^-(1/k+1)"); +DEFINE_double(iter_k, 0.0, + "The parameter 'k' of Generized Pareto Distribution " + "f(x)=(1/sigma)*(1+k*(x-theta)/sigma)^-(1/k+1)"); +DEFINE_double(iter_sigma, 0.0, + "The parameter 'sigma' of Generized Pareto Distribution " + "f(x)=(1/sigma)*(1+k*(x-theta)/sigma)^-(1/k+1)"); +DEFINE_double(mix_get_ratio, 1.0, + "The ratio of Get queries of mix_graph workload"); +DEFINE_double(mix_put_ratio, 0.0, + "The ratio of Put queries of mix_graph workload"); +DEFINE_double(mix_seek_ratio, 0.0, + "The ratio of Seek queries of mix_graph workload"); +DEFINE_int64(mix_max_scan_len, 10000, "The max scan length of Iterator"); +DEFINE_int64(mix_ave_kv_size, 512, + "The average key-value size of this workload"); +DEFINE_int64(mix_max_value_size, 1024, "The max value size of this workload"); +DEFINE_double( + sine_mix_rate_noise, 0.0, + "Add the noise ratio to the sine rate, it is between 0.0 and 1.0"); +DEFINE_bool(sine_mix_rate, false, + "Enable the sine QPS control on the mix workload"); +DEFINE_uint64( + sine_mix_rate_interval_milliseconds, 10000, + "Interval of which the sine wave read_rate_limit is recalculated"); +DEFINE_int64(mix_accesses, -1, + "The total query accesses of mix_graph workload"); + DEFINE_uint64( benchmark_read_rate_limit, 0, "If non-zero, db_bench will rate-limit the reads from RocksDB. This " @@ -2627,6 +2675,8 @@ void VerifyDBFromDB(std::string& truth_db_name) { fprintf(stderr, "entries_per_batch = %" PRIi64 "\n", entries_per_batch_); method = &Benchmark::MultiReadRandom; + } else if (name == "mixgraph") { + method = &Benchmark::MixGraph; } else if (name == "readmissing") { ++key_size_; method = &Benchmark::ReadRandom; @@ -4536,6 +4586,249 @@ void VerifyDBFromDB(std::string& truth_db_name) { thread->stats.AddMessage(msg); } + // THe reverse function of Pareto function + int64_t ParetoCdfInversion(double u, double theta, double k, double sigma) { + double ret; + if (k == 0.0) { + ret = theta - sigma * std::log(u); + } else { + ret = theta + sigma * (std::pow(u, -1 * k) - 1) / k; + } + return static_cast(ceil(ret)); + } + // inversion of y=ax^b + int64_t PowerCdfInversion(double u, double a, double b) { + double ret; + ret = std::pow((u / a), (1 / b)); + return static_cast(ceil(ret)); + } + + // Add the noice to the QPS + double AddNoise(double origin, double noise_ratio) { + if (noise_ratio < 0.0 || noise_ratio > 1.0) { + return origin; + } + int band_int = static_cast(FLAGS_sine_a); + double delta = (rand() % band_int - band_int / 2) * noise_ratio; + if (origin + delta < 0) { + return origin; + } else { + return (origin + delta); + } + } + + // decide the query type + // 0 Get, 1 Put, 2 Seek, 3 SeekForPrev, 4 Delete, 5 SingleDelete, 6 merge + class QueryDecider { + public: + std::vector type_; + std::vector ratio_; + int range_; + + QueryDecider() {} + ~QueryDecider() {} + + Status Initiate(std::vector ratio_input) { + int range_max = 1000; + double sum = 0.0; + for (auto& ratio : ratio_input) { + sum += ratio; + } + range_ = 0; + for (auto& ratio : ratio_input) { + range_ += static_cast(ceil(range_max * (ratio / sum))); + type_.push_back(range_); + ratio_.push_back(ratio / sum); + } + return Status::OK(); + } + + int GetType(int64_t rand_num) { + if (rand_num < 0) { + rand_num = rand_num * (-1); + } + int pos = static_cast(rand_num % range_); + for (int i = 0; i < static_cast(type_.size()); i++) { + if (pos < type_[i]) { + return i; + } + } + return 0; + } + }; + + // The graph wokrload mixed with Get, Put, Iterator + void MixGraph(ThreadState* thread) { + int64_t read = 0; // including single gets and Next of iterators + int64_t gets = 0; + int64_t puts = 0; + int64_t found = 0; + int64_t seek = 0; + int64_t seek_found = 0; + int64_t bytes = 0; + int64_t value_max = FLAGS_mix_max_value_size; + int64_t scan_len_max = FLAGS_mix_max_scan_len; + double write_rate = 1000000.0; + double read_rate = 1000000.0; + std::vector ratio; + char value_buffer[2 * value_max]; + QueryDecider query; + RandomGenerator gen; + Status s; + + ReadOptions options(FLAGS_verify_checksum, true); + std::unique_ptr key_guard; + Slice key = AllocateKey(&key_guard); + PinnableSlice pinnable_val; + ratio.push_back(FLAGS_mix_get_ratio); + ratio.push_back(FLAGS_mix_put_ratio); + ratio.push_back(FLAGS_mix_seek_ratio); + query.Initiate(ratio); + + // the limit of qps initiation + if (FLAGS_sine_a != 0 || FLAGS_sine_d != 0) { + thread->shared->read_rate_limiter.reset(NewGenericRateLimiter( + read_rate, 100000 /* refill_period_us */, 10 /* fairness */, + RateLimiter::Mode::kReadsOnly)); + thread->shared->write_rate_limiter.reset( + NewGenericRateLimiter(write_rate)); + } + + Duration duration(FLAGS_duration, reads_); + while (!duration.Done(1)) { + DBWithColumnFamilies* db_with_cfh = SelectDBWithCfh(thread); + int64_t rand_v, key_rand, key_seed; + rand_v = GetRandomKey(&thread->rand) % FLAGS_num; + double u = static_cast(rand_v) / FLAGS_num; + key_seed = PowerCdfInversion(u, FLAGS_key_dist_a, FLAGS_key_dist_b); + Random64 rand(key_seed); + key_rand = static_cast(rand.Next()) % FLAGS_num; + GenerateKeyFromInt(key_rand, FLAGS_num, &key); + int query_type = query.GetType(rand_v); + + // change the qps + uint64_t now = FLAGS_env->NowMicros(); + uint64_t usecs_since_last; + if (now > thread->stats.GetSineInterval()) { + usecs_since_last = now - thread->stats.GetSineInterval(); + } else { + usecs_since_last = 0; + } + + if (usecs_since_last > + (FLAGS_sine_mix_rate_interval_milliseconds * uint64_t{1000})) { + double usecs_since_start = + static_cast(now - thread->stats.GetStart()); + thread->stats.ResetSineInterval(); + double mix_rate_with_noise = AddNoise( + SineRate(usecs_since_start / 1000000.0), FLAGS_sine_mix_rate_noise); + read_rate = mix_rate_with_noise * (query.ratio_[0] + query.ratio_[2]); + write_rate = + mix_rate_with_noise * query.ratio_[1] * FLAGS_mix_ave_kv_size; + + thread->shared->write_rate_limiter.reset( + NewGenericRateLimiter(write_rate)); + thread->shared->read_rate_limiter.reset(NewGenericRateLimiter( + read_rate, + FLAGS_sine_mix_rate_interval_milliseconds * uint64_t{1000}, 10, + RateLimiter::Mode::kReadsOnly)); + } + // Start the query + if (query_type == 0) { + // the Get query + gets++; + read++; + if (FLAGS_num_column_families > 1) { + s = db_with_cfh->db->Get(options, db_with_cfh->GetCfh(key_rand), key, + &pinnable_val); + } else { + pinnable_val.Reset(); + s = db_with_cfh->db->Get(options, + db_with_cfh->db->DefaultColumnFamily(), key, + &pinnable_val); + } + + if (s.ok()) { + found++; + bytes += key.size() + pinnable_val.size(); + } else if (!s.IsNotFound()) { + fprintf(stderr, "Get returned an error: %s\n", s.ToString().c_str()); + abort(); + } + + if (thread->shared->read_rate_limiter.get() != nullptr && + read % 256 == 255) { + thread->shared->read_rate_limiter->Request( + 256, Env::IO_HIGH, nullptr /* stats */, + RateLimiter::OpType::kRead); + } + + } else if (query_type == 1) { + // the Put query + puts++; + int64_t value_size = ParetoCdfInversion( + u, FLAGS_value_theta, FLAGS_value_k, FLAGS_value_sigma); + if (value_size < 0) { + value_size = 10; + } else if (value_size > value_max) { + value_size = value_size % value_max; + } + s = db_with_cfh->db->Put(write_options_, key, gen.Generate(value_size)); + if (!s.ok()) { + fprintf(stderr, "put error: %s\n", s.ToString().c_str()); + exit(1); + } + + if (thread->shared->write_rate_limiter) { + thread->shared->write_rate_limiter->Request( + key.size() + value_size, Env::IO_HIGH, nullptr /*stats*/, + RateLimiter::OpType::kWrite); + } + + } else if (query_type == 2) { + // Seek query + if (db_with_cfh->db != nullptr) { + Iterator* single_iter = nullptr; + single_iter = db_with_cfh->db->NewIterator(options); + if (single_iter != nullptr) { + single_iter->Seek(key); + seek++; + read++; + if (single_iter->Valid() && single_iter->key().compare(key) == 0) { + seek_found++; + } + int64_t scan_length = + ParetoCdfInversion(u, FLAGS_iter_theta, FLAGS_iter_k, + FLAGS_iter_sigma) % + scan_len_max; + for (int64_t j = 0; j < scan_length && single_iter->Valid(); j++) { + Slice value = single_iter->value(); + memcpy(value_buffer, value.data(), + std::min(value.size(), sizeof(value_buffer))); + bytes += single_iter->key().size() + single_iter->value().size(); + single_iter->Next(); + assert(single_iter->status().ok()); + } + } + delete single_iter; + } + } + } + char msg[100]; + snprintf(msg, sizeof(msg), + "( Gets:%" PRIu64 " Puts:%" PRIu64 " Seek:%" PRIu64 " of %" PRIu64 + " in %" PRIu64 " found)\n", + gets, puts, seek, found, read); + + thread->stats.AddBytes(bytes); + thread->stats.AddMessage(msg); + + if (FLAGS_perf_level > rocksdb::PerfLevel::kDisable) { + thread->stats.AddMessage(std::string("PERF_CONTEXT:\n") + + get_perf_context()->ToString()); + } + } + void IteratorCreation(ThreadState* thread) { Duration duration(FLAGS_duration, reads_); ReadOptions options(FLAGS_verify_checksum, true);