db_stress: generate the key based on Zipfian distribution (hot key) (#6163)

Summary:
In the current db_stress, all the keys are generated randomly and follows the uniform distribution. In order to test some corner cases that some key are always updated or read, we need to generate the key based on other distributions. In this PR, the key is generated based on Zipfian distribution and the skewness can be controlled by setting hot_key_alpha (0.8 to 1.5 is suggested). The larger hot_key_alpha is, the more skewed will be. Not that, usually, if hot_key_alpha is larger than 2, there might be only 1 or 2 keys that are generated. If hot_key_alpha is 0, it generate the key follows uniform distribution (random key)

Testing plan: pass the db_stress and printed the keys to make sure it follows the distribution.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6163

Differential Revision: D18978480

Pulled By: zhichao-cao

fbshipit-source-id: e123b4865477f7478e83fb581f9576bada334680
main
Zhichao Cao 5 years ago committed by Facebook Github Bot
parent db7c687523
commit fbda25f57a
  1. 94
      db_stress_tool/db_stress_common.cc
  2. 3
      db_stress_tool/db_stress_common.h
  3. 10
      db_stress_tool/db_stress_gflags.cc
  4. 2
      db_stress_tool/db_stress_tool.cc

@ -10,14 +10,73 @@
#ifdef GFLAGS
#include "db_stress_tool/db_stress_common.h"
#include <cmath>
rocksdb::Env* FLAGS_env = rocksdb::Env::Default();
enum rocksdb::CompressionType FLAGS_compression_type_e =
rocksdb::kSnappyCompression;
enum rocksdb::ChecksumType FLAGS_checksum_type_e = rocksdb::kCRC32c;
enum RepFactory FLAGS_rep_factory = kSkipList;
std::vector<double> sum_probs(100001);
int64_t zipf_sum_size = 100000;
namespace rocksdb {
// Zipfian distribution is generated based on a pre-calculated array.
// It should be used before start the stress test.
// First, the probability distribution function (PDF) of this Zipfian follows
// power low. P(x) = 1/(x^alpha).
// So we calculate the PDF when x is from 0 to zipf_sum_size in first for loop
// and add the PDF value togetger as c. So we get the total probability in c.
// Next, we calculate inverse CDF of Zipfian and store the value of each in
// an array (sum_probs). The rank is from 0 to zipf_sum_size. For example, for
// integer k, its Zipfian CDF value is sum_probs[k].
// Third, when we need to get an integer whose probability follows Zipfian
// distribution, we use a rand_seed [0,1] which follows uniform distribution
// as a seed and search it in the sum_probs via binary search. When we find
// the closest sum_probs[i] of rand_seed, i is the integer that in
// [0, zipf_sum_size] following Zipfian distribution with parameter alpha.
// Finally, we can scale i to [0, max_key] scale.
// In order to avoid that hot keys are close to each other and skew towards 0,
// we use Rando64 to shuffle it.
void InitializeHotKeyGenerator(double alpha) {
double c = 0;
for (int64_t i = 1; i <= zipf_sum_size; i++) {
c = c + (1.0 / std::pow(static_cast<double>(i), alpha));
}
c = 1.0 / c;
sum_probs[0] = 0;
for (int64_t i = 1; i <= zipf_sum_size; i++) {
sum_probs[i] =
sum_probs[i - 1] + c / std::pow(static_cast<double>(i), alpha);
}
}
// Generate one key that follows the Zipfian distribution. The skewness
// is decided by the parameter alpha. Input is the rand_seed [0,1] and
// the max of the key to be generated. If we directly return tmp_zipf_seed,
// the closer to 0, the higher probability will be. To randomly distribute
// the hot keys in [0, max_key], we use Random64 to shuffle it.
int64_t GetOneHotKeyID(double rand_seed, int64_t max_key) {
int64_t low = 1, mid, high = zipf_sum_size, zipf = 0;
while (low <= high) {
mid = std::floor((low + high) / 2);
if (sum_probs[mid] >= rand_seed && sum_probs[mid - 1] < rand_seed) {
zipf = mid;
break;
} else if (sum_probs[mid] >= rand_seed) {
high = mid - 1;
} else {
low = mid + 1;
}
}
int64_t tmp_zipf_seed = static_cast<int64_t>(
std::floor(zipf * max_key / (static_cast<double>(zipf_sum_size))));
Random64 rand_local(tmp_zipf_seed);
return rand_local.Next() % max_key;
}
void PoolSizeChangeThread(void* v) {
assert(FLAGS_compaction_thread_pool_adjust_interval > 0);
ThreadState* thread = reinterpret_cast<ThreadState*>(v);
@ -65,14 +124,33 @@ void PrintKeyValue(int cf, uint64_t key, const char* value, size_t sz) {
key, sz, tmp.c_str());
}
// Note that if hot_key_alpha != 0, it generates the key based on Zipfian
// distribution. Keys are randomly scattered to [0, FLAGS_max_key]. It does
// not ensure the order of the keys being generated and the keys does not have
// the active range which is related to FLAGS_active_width.
int64_t GenerateOneKey(ThreadState* thread, uint64_t iteration) {
const double completed_ratio =
static_cast<double>(iteration) / FLAGS_ops_per_thread;
const int64_t base_key = static_cast<int64_t>(
completed_ratio * (FLAGS_max_key - FLAGS_active_width));
return base_key + thread->rand.Next() % FLAGS_active_width;
int64_t rand_seed = base_key + thread->rand.Next() % FLAGS_active_width;
int64_t cur_key = rand_seed;
if (FLAGS_hot_key_alpha != 0) {
// If set the Zipfian distribution Alpha to non 0, use Zipfian
double float_rand =
(static_cast<double>(thread->rand.Next() % FLAGS_max_key)) /
FLAGS_max_key;
cur_key = GetOneHotKeyID(float_rand, FLAGS_max_key);
}
return cur_key;
}
// Note that if hot_key_alpha != 0, it generates the key based on Zipfian
// distribution. Keys being generated are in random order.
// If user want to generate keys based on uniform distribution, user needs to
// set hot_key_alpha == 0. It will generate the random keys in increasing
// order in the key array (ensure key[i] >= key[i+1]) and constrained in a
// range related to FLAGS_active_width.
std::vector<int64_t> GenerateNKeys(ThreadState* thread, int num_keys,
uint64_t iteration) {
const double completed_ratio =
@ -84,9 +162,17 @@ std::vector<int64_t> GenerateNKeys(ThreadState* thread, int num_keys,
int64_t next_key = base_key + thread->rand.Next() % FLAGS_active_width;
keys.push_back(next_key);
for (int i = 1; i < num_keys; ++i) {
// This may result in some duplicate keys
next_key = next_key + thread->rand.Next() %
(FLAGS_active_width - (next_key - base_key));
// Generate the key follows zipfian distribution
if (FLAGS_hot_key_alpha != 0) {
double float_rand =
(static_cast<double>(thread->rand.Next() % FLAGS_max_key)) /
FLAGS_max_key;
next_key = GetOneHotKeyID(float_rand, FLAGS_max_key);
} else {
// This may result in some duplicate keys
next_key = next_key + thread->rand.Next() %
(FLAGS_active_width - (next_key - base_key));
}
keys.push_back(next_key);
}
return keys;

@ -79,6 +79,7 @@ using GFLAGS_NAMESPACE::SetUsageMessage;
DECLARE_uint64(seed);
DECLARE_bool(read_only);
DECLARE_int64(max_key);
DECLARE_double(hot_key_alpha);
DECLARE_int32(column_families);
DECLARE_string(options_file);
DECLARE_int64(active_width);
@ -372,5 +373,7 @@ extern size_t GenerateValue(uint32_t rand, char* v, size_t max_sz);
extern StressTest* CreateCfConsistencyStressTest();
extern StressTest* CreateBatchedOpsStressTest();
extern StressTest* CreateNonBatchedOpsStressTest();
extern void InitializeHotKeyGenerator(double alpha);
extern int64_t GetOneHotKeyID(double rand_seed, int64_t max_key);
} // namespace rocksdb
#endif // GFLAGS

@ -30,6 +30,16 @@ DEFINE_int64(max_key, 1 * KB * KB,
DEFINE_int32(column_families, 10, "Number of column families");
DEFINE_double(
hot_key_alpha, 0,
"Use Zipfian distribution to generate the key "
"distribution. If it is not specified, write path will use random "
"distribution to generate the keys. The parameter is [0, double_max]). "
"However, the larger alpha is, the more shewed will be. If alpha is "
"larger than 2, it is likely that only 1 key will be accessed. The "
"Recommended value is [0.8-1.5]. The distribution is also related to "
"max_key and total iterations of generating the hot key. ");
DEFINE_string(
options_file, "",
"The path to a RocksDB options file. If specified, then db_stress will "

@ -190,6 +190,8 @@ int db_stress_tool(int argc, char** argv) {
} else {
stress.reset(CreateNonBatchedOpsStressTest());
}
// Initialize the Zipfian pre-calculated array
InitializeHotKeyGenerator(FLAGS_hot_key_alpha);
if (RunStressTest(stress.get())) {
return 0;
} else {

Loading…
Cancel
Save