diff --git a/db_stress_tool/db_stress_common.cc b/db_stress_tool/db_stress_common.cc index d1d681b7e..870f79044 100644 --- a/db_stress_tool/db_stress_common.cc +++ b/db_stress_tool/db_stress_common.cc @@ -10,14 +10,73 @@ #ifdef GFLAGS #include "db_stress_tool/db_stress_common.h" +#include rocksdb::Env* FLAGS_env = rocksdb::Env::Default(); enum rocksdb::CompressionType FLAGS_compression_type_e = rocksdb::kSnappyCompression; enum rocksdb::ChecksumType FLAGS_checksum_type_e = rocksdb::kCRC32c; enum RepFactory FLAGS_rep_factory = kSkipList; +std::vector sum_probs(100001); +int64_t zipf_sum_size = 100000; namespace rocksdb { + +// Zipfian distribution is generated based on a pre-calculated array. +// It should be used before start the stress test. +// First, the probability distribution function (PDF) of this Zipfian follows +// power low. P(x) = 1/(x^alpha). +// So we calculate the PDF when x is from 0 to zipf_sum_size in first for loop +// and add the PDF value togetger as c. So we get the total probability in c. +// Next, we calculate inverse CDF of Zipfian and store the value of each in +// an array (sum_probs). The rank is from 0 to zipf_sum_size. For example, for +// integer k, its Zipfian CDF value is sum_probs[k]. +// Third, when we need to get an integer whose probability follows Zipfian +// distribution, we use a rand_seed [0,1] which follows uniform distribution +// as a seed and search it in the sum_probs via binary search. When we find +// the closest sum_probs[i] of rand_seed, i is the integer that in +// [0, zipf_sum_size] following Zipfian distribution with parameter alpha. +// Finally, we can scale i to [0, max_key] scale. +// In order to avoid that hot keys are close to each other and skew towards 0, +// we use Rando64 to shuffle it. +void InitializeHotKeyGenerator(double alpha) { + double c = 0; + for (int64_t i = 1; i <= zipf_sum_size; i++) { + c = c + (1.0 / std::pow(static_cast(i), alpha)); + } + c = 1.0 / c; + + sum_probs[0] = 0; + for (int64_t i = 1; i <= zipf_sum_size; i++) { + sum_probs[i] = + sum_probs[i - 1] + c / std::pow(static_cast(i), alpha); + } +} + +// Generate one key that follows the Zipfian distribution. The skewness +// is decided by the parameter alpha. Input is the rand_seed [0,1] and +// the max of the key to be generated. If we directly return tmp_zipf_seed, +// the closer to 0, the higher probability will be. To randomly distribute +// the hot keys in [0, max_key], we use Random64 to shuffle it. +int64_t GetOneHotKeyID(double rand_seed, int64_t max_key) { + int64_t low = 1, mid, high = zipf_sum_size, zipf = 0; + while (low <= high) { + mid = std::floor((low + high) / 2); + if (sum_probs[mid] >= rand_seed && sum_probs[mid - 1] < rand_seed) { + zipf = mid; + break; + } else if (sum_probs[mid] >= rand_seed) { + high = mid - 1; + } else { + low = mid + 1; + } + } + int64_t tmp_zipf_seed = static_cast( + std::floor(zipf * max_key / (static_cast(zipf_sum_size)))); + Random64 rand_local(tmp_zipf_seed); + return rand_local.Next() % max_key; +} + void PoolSizeChangeThread(void* v) { assert(FLAGS_compaction_thread_pool_adjust_interval > 0); ThreadState* thread = reinterpret_cast(v); @@ -65,14 +124,33 @@ void PrintKeyValue(int cf, uint64_t key, const char* value, size_t sz) { key, sz, tmp.c_str()); } +// Note that if hot_key_alpha != 0, it generates the key based on Zipfian +// distribution. Keys are randomly scattered to [0, FLAGS_max_key]. It does +// not ensure the order of the keys being generated and the keys does not have +// the active range which is related to FLAGS_active_width. int64_t GenerateOneKey(ThreadState* thread, uint64_t iteration) { const double completed_ratio = static_cast(iteration) / FLAGS_ops_per_thread; const int64_t base_key = static_cast( completed_ratio * (FLAGS_max_key - FLAGS_active_width)); - return base_key + thread->rand.Next() % FLAGS_active_width; + int64_t rand_seed = base_key + thread->rand.Next() % FLAGS_active_width; + int64_t cur_key = rand_seed; + if (FLAGS_hot_key_alpha != 0) { + // If set the Zipfian distribution Alpha to non 0, use Zipfian + double float_rand = + (static_cast(thread->rand.Next() % FLAGS_max_key)) / + FLAGS_max_key; + cur_key = GetOneHotKeyID(float_rand, FLAGS_max_key); + } + return cur_key; } +// Note that if hot_key_alpha != 0, it generates the key based on Zipfian +// distribution. Keys being generated are in random order. +// If user want to generate keys based on uniform distribution, user needs to +// set hot_key_alpha == 0. It will generate the random keys in increasing +// order in the key array (ensure key[i] >= key[i+1]) and constrained in a +// range related to FLAGS_active_width. std::vector GenerateNKeys(ThreadState* thread, int num_keys, uint64_t iteration) { const double completed_ratio = @@ -84,9 +162,17 @@ std::vector GenerateNKeys(ThreadState* thread, int num_keys, int64_t next_key = base_key + thread->rand.Next() % FLAGS_active_width; keys.push_back(next_key); for (int i = 1; i < num_keys; ++i) { - // This may result in some duplicate keys - next_key = next_key + thread->rand.Next() % - (FLAGS_active_width - (next_key - base_key)); + // Generate the key follows zipfian distribution + if (FLAGS_hot_key_alpha != 0) { + double float_rand = + (static_cast(thread->rand.Next() % FLAGS_max_key)) / + FLAGS_max_key; + next_key = GetOneHotKeyID(float_rand, FLAGS_max_key); + } else { + // This may result in some duplicate keys + next_key = next_key + thread->rand.Next() % + (FLAGS_active_width - (next_key - base_key)); + } keys.push_back(next_key); } return keys; diff --git a/db_stress_tool/db_stress_common.h b/db_stress_tool/db_stress_common.h index cb6c36049..c1e2e0f3b 100644 --- a/db_stress_tool/db_stress_common.h +++ b/db_stress_tool/db_stress_common.h @@ -79,6 +79,7 @@ using GFLAGS_NAMESPACE::SetUsageMessage; DECLARE_uint64(seed); DECLARE_bool(read_only); DECLARE_int64(max_key); +DECLARE_double(hot_key_alpha); DECLARE_int32(column_families); DECLARE_string(options_file); DECLARE_int64(active_width); @@ -372,5 +373,7 @@ extern size_t GenerateValue(uint32_t rand, char* v, size_t max_sz); extern StressTest* CreateCfConsistencyStressTest(); extern StressTest* CreateBatchedOpsStressTest(); extern StressTest* CreateNonBatchedOpsStressTest(); +extern void InitializeHotKeyGenerator(double alpha); +extern int64_t GetOneHotKeyID(double rand_seed, int64_t max_key); } // namespace rocksdb #endif // GFLAGS diff --git a/db_stress_tool/db_stress_gflags.cc b/db_stress_tool/db_stress_gflags.cc index b56e0fe1b..8dd6f6b05 100644 --- a/db_stress_tool/db_stress_gflags.cc +++ b/db_stress_tool/db_stress_gflags.cc @@ -30,6 +30,16 @@ DEFINE_int64(max_key, 1 * KB * KB, DEFINE_int32(column_families, 10, "Number of column families"); +DEFINE_double( + hot_key_alpha, 0, + "Use Zipfian distribution to generate the key " + "distribution. If it is not specified, write path will use random " + "distribution to generate the keys. The parameter is [0, double_max]). " + "However, the larger alpha is, the more shewed will be. If alpha is " + "larger than 2, it is likely that only 1 key will be accessed. The " + "Recommended value is [0.8-1.5]. The distribution is also related to " + "max_key and total iterations of generating the hot key. "); + DEFINE_string( options_file, "", "The path to a RocksDB options file. If specified, then db_stress will " diff --git a/db_stress_tool/db_stress_tool.cc b/db_stress_tool/db_stress_tool.cc index 5fb6dee00..0acc657ea 100644 --- a/db_stress_tool/db_stress_tool.cc +++ b/db_stress_tool/db_stress_tool.cc @@ -190,6 +190,8 @@ int db_stress_tool(int argc, char** argv) { } else { stress.reset(CreateNonBatchedOpsStressTest()); } + // Initialize the Zipfian pre-calculated array + InitializeHotKeyGenerator(FLAGS_hot_key_alpha); if (RunStressTest(stress.get())) { return 0; } else {