db_bench supports for generating random variable sized value. (#6386)

Summary:
1. `db_bench` now supports `value_size_distribution_type`, `value_size_min`, `value_size_max` options for generating random variable sized value.
2. Added `blob_db_compression_type` option for BlobDB to enable blob compression.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6386

Differential Revision: D19859406

Pulled By: zhichao-cao

fbshipit-source-id: ace52674090023fde15d832392110bf288a8e215
main
Burton Li 5 years ago committed by Facebook Github Bot
parent 3e49249d30
commit e64508917b
  1. 3
      HISTORY.md
  2. 264
      tools/db_bench_tool.cc

@ -20,7 +20,8 @@
* Disable recycle_log_file_num when an inconsistent recovery modes are requested: kPointInTimeRecovery and kAbsoluteConsistency
### New Features
* Added the checksum for each SST file generated by Flush or Compaction. Added sst_file_checksum_func to Options such that user can plugin their own SST file checksum function via override the FileChecksumFunc class. If user does not set the sst_file_checksum_func, SST file checksum calculation will not be enabled. The checksum information inlcuding uint32_t checksum value and a checksum function name (string). The checksum information is stored in FileMetadata in version store and also logged to MANIFEST. A new tool is added to LDB such that user can dump out a list of file checksum information from MANIFEST (stored in an unordered_map).
* Added the checksum for each SST file generated by Flush or Compaction. Added sst_file_checksum_func to Options such that user can plugin their own SST file checksum function via override the FileChecksumFunc class. If user does not set the sst_file_checksum_func, SST file checksum calculation will not be enabled. The checksum information inlcuding uint32_t checksum value and a checksum function name (string). The checksum information is stored in FileMetadata in version store and also logged to MANIFEST. A new tool is added to LDB such that user can dump out a list of file checksum information from MANIFEST (stored in an unordered_map).
* `db_bench` now supports `value_size_distribution_type`, `value_size_min`, `value_size_max` options for generating random variable sized value. Added `blob_db_compression_type` option for BlobDB to enable blob compression.
## 6.7.0 (01/21/2020)
### Public API Change

@ -136,7 +136,7 @@ DEFINE_string(
" key order and keep the shape of the LSM tree\n"
"\toverwrite -- overwrite N values in random key order in"
" async mode\n"
"\tfillsync -- write N/100 values in random key order in "
"\tfillsync -- write N/1000 values in random key order in "
"sync mode\n"
"\tfill100K -- write N/1000 100K values in random order in"
" async mode\n"
@ -245,7 +245,15 @@ DEFINE_int32(threads, 1, "Number of concurrent threads to run.");
DEFINE_int32(duration, 0, "Time in seconds for the random-ops tests to run."
" When 0 then num & reads determine the test duration");
DEFINE_int32(value_size, 100, "Size of each value");
DEFINE_string(value_size_distribution_type, "fixed",
"Value size distribution type: fixed, uniform, normal");
DEFINE_int32(value_size, 100, "Size of each value in fixed distribution");
static unsigned int value_size = 100;
DEFINE_int32(value_size_min, 100, "Min size of random value");
DEFINE_int32(value_size_max, 102400, "Max size of random value");
DEFINE_int32(seek_nexts, 0,
"How many times to call Next() after Seek() in "
@ -787,6 +795,11 @@ DEFINE_uint64(blob_db_file_size,
rocksdb::blob_db::BlobDBOptions().blob_file_size,
"Target size of each blob file.");
DEFINE_string(blob_db_compression_type, "snappy",
"Algorithm to use to compress blob in blob file");
static enum rocksdb::CompressionType FLAGS_blob_db_compression_type_e =
rocksdb::kSnappyCompression;
// Secondary DB instance Options
DEFINE_bool(use_secondary_db, false,
"Open a RocksDB secondary instance. A primary instance can be "
@ -1418,20 +1431,135 @@ class ReportFileOpEnv : public EnvWrapper {
} // namespace
enum DistributionType : unsigned char {
kFixed = 0,
kUniform,
kNormal
};
static enum DistributionType FLAGS_value_size_distribution_type_e = kFixed;
static enum DistributionType StringToDistributionType(const char* ctype) {
assert(ctype);
if (!strcasecmp(ctype, "fixed"))
return kFixed;
else if (!strcasecmp(ctype, "uniform"))
return kUniform;
else if (!strcasecmp(ctype, "normal"))
return kNormal;
fprintf(stdout, "Cannot parse distribution type '%s'\n", ctype);
return kFixed; // default value
}
class BaseDistribution {
public:
BaseDistribution(unsigned int min, unsigned int max) :
min_value_size_(min),
max_value_size_(max) {}
virtual ~BaseDistribution() {}
unsigned int Generate() {
auto val = Get();
if (NeedTruncate()) {
val = std::max(min_value_size_, val);
val = std::min(max_value_size_, val);
}
return val;
}
private:
virtual unsigned int Get() = 0;
virtual bool NeedTruncate() {
return true;
}
unsigned int min_value_size_;
unsigned int max_value_size_;
};
class FixedDistribution : public BaseDistribution
{
public:
FixedDistribution(unsigned int size) :
BaseDistribution(size, size),
size_(size) {}
private:
virtual unsigned int Get() override {
return size_;
}
virtual bool NeedTruncate() override {
return false;
}
unsigned int size_;
};
class NormalDistribution
: public BaseDistribution, public std::normal_distribution<double> {
public:
NormalDistribution(unsigned int min, unsigned int max) :
BaseDistribution(min, max),
// 99.7% values within the range [min, max].
std::normal_distribution<double>((double)(min + max) / 2.0 /*mean*/,
(double)(max - min) / 6.0 /*stddev*/),
gen_(rd_()) {}
private:
virtual unsigned int Get() override {
return static_cast<unsigned int>((*this)(gen_));
}
std::random_device rd_;
std::mt19937 gen_;
};
class UniformDistribution
: public BaseDistribution,
public std::uniform_int_distribution<unsigned int> {
public:
UniformDistribution(unsigned int min, unsigned int max) :
BaseDistribution(min, max),
std::uniform_int_distribution<unsigned int>(min, max),
gen_(rd_()) {}
private:
virtual unsigned int Get() override {
return (*this)(gen_);
}
virtual bool NeedTruncate() override {
return false;
}
std::random_device rd_;
std::mt19937 gen_;
};
// Helper for quickly generating random data.
class RandomGenerator {
private:
std::string data_;
unsigned int pos_;
std::unique_ptr<BaseDistribution> dist_;
public:
RandomGenerator() {
auto max_value_size = FLAGS_value_size_max;
switch (FLAGS_value_size_distribution_type_e) {
case kUniform:
dist_.reset(new UniformDistribution(FLAGS_value_size_min,
FLAGS_value_size_max));
break;
case kNormal:
dist_.reset(new NormalDistribution(FLAGS_value_size_min,
FLAGS_value_size_max));
break;
case kFixed:
default:
dist_.reset(new FixedDistribution(value_size));
max_value_size = value_size;
}
// We use a limited amount of data over and over again and ensure
// that it is larger than the compression window (32KB), and also
// large enough to serve all typical value sizes we want to write.
Random rnd(301);
std::string piece;
while (data_.size() < (unsigned)std::max(1048576, FLAGS_value_size)) {
while (data_.size() < (unsigned)std::max(1048576, max_value_size)) {
// Add a short fragment that is as compressible as specified
// by FLAGS_compression_ratio.
test::CompressibleString(&rnd, FLAGS_compression_ratio, 100, &piece);
@ -1449,13 +1577,9 @@ class RandomGenerator {
return Slice(data_.data() + pos_ - len, len);
}
Slice GenerateWithTTL(unsigned int len) {
assert(len <= data_.size());
if (pos_ + len > data_.size()) {
pos_ = 0;
}
pos_ += len;
return Slice(data_.data() + pos_ - len, len);
Slice Generate() {
auto len = dist_->Generate();
return Generate(len);
}
};
@ -2139,7 +2263,6 @@ class Benchmark {
DBWithColumnFamilies db_;
std::vector<DBWithColumnFamilies> multi_dbs_;
int64_t num_;
int value_size_;
int key_size_;
int prefix_size_;
int64_t keys_per_prefix_;
@ -2265,17 +2388,28 @@ class Benchmark {
void PrintHeader() {
PrintEnvironment();
fprintf(stdout, "Keys: %d bytes each\n", FLAGS_key_size);
fprintf(stdout, "Values: %d bytes each (%d bytes after compression)\n",
FLAGS_value_size,
static_cast<int>(FLAGS_value_size * FLAGS_compression_ratio + 0.5));
auto avg_value_size = FLAGS_value_size;
if (FLAGS_value_size_distribution_type_e == kFixed) {
fprintf(stdout, "Values: %d bytes each (%d bytes after compression)\n",
avg_value_size,
static_cast<int>(avg_value_size * FLAGS_compression_ratio + 0.5));
} else {
avg_value_size = (FLAGS_value_size_min + FLAGS_value_size_max) / 2;
fprintf(stdout, "Values: %d avg bytes each (%d bytes after compression)\n",
avg_value_size,
static_cast<int>(avg_value_size * FLAGS_compression_ratio + 0.5));
fprintf(stdout, "Values Distribution: %s (min: %d, max: %d)\n",
FLAGS_value_size_distribution_type.c_str(),
FLAGS_value_size_min, FLAGS_value_size_max);
}
fprintf(stdout, "Entries: %" PRIu64 "\n", num_);
fprintf(stdout, "Prefix: %d bytes\n", FLAGS_prefix_size);
fprintf(stdout, "Keys per prefix: %" PRIu64 "\n", keys_per_prefix_);
fprintf(stdout, "RawSize: %.1f MB (estimated)\n",
((static_cast<int64_t>(FLAGS_key_size + FLAGS_value_size) * num_)
((static_cast<int64_t>(FLAGS_key_size + avg_value_size) * num_)
/ 1048576.0));
fprintf(stdout, "FileSize: %.1f MB (estimated)\n",
(((FLAGS_key_size + FLAGS_value_size * FLAGS_compression_ratio)
(((FLAGS_key_size + avg_value_size * FLAGS_compression_ratio)
* num_)
/ 1048576.0));
fprintf(stdout, "Write rate: %" PRIu64 " bytes/second\n",
@ -2479,7 +2613,6 @@ class Benchmark {
: nullptr),
prefix_extractor_(NewFixedPrefixTransform(FLAGS_prefix_size)),
num_(FLAGS_num),
value_size_(FLAGS_value_size),
key_size_(FLAGS_key_size),
prefix_size_(FLAGS_prefix_size),
keys_per_prefix_(FLAGS_keys_per_prefix),
@ -2702,7 +2835,7 @@ class Benchmark {
reads_ = (FLAGS_reads < 0 ? FLAGS_num : FLAGS_reads);
writes_ = (FLAGS_writes < 0 ? FLAGS_num : FLAGS_writes);
deletes_ = (FLAGS_deletes < 0 ? FLAGS_num : FLAGS_deletes);
value_size_ = FLAGS_value_size;
value_size = FLAGS_value_size;
key_size_ = FLAGS_key_size;
entries_per_batch_ = FLAGS_batch_size;
writes_before_delete_range_ = FLAGS_writes_before_delete_range;
@ -2804,7 +2937,7 @@ class Benchmark {
} else if (name == "fill100K") {
fresh_db = true;
num_ /= 1000;
value_size_ = 100 * 1000;
value_size = 100 * 1000;
method = &Benchmark::WriteRandom;
} else if (name == "readseq") {
method = &Benchmark::ReadSequential;
@ -4016,6 +4149,7 @@ class Benchmark {
blob_db_options.min_blob_size = FLAGS_blob_db_min_blob_size;
blob_db_options.bytes_per_sync = FLAGS_blob_db_bytes_per_sync;
blob_db_options.blob_file_size = FLAGS_blob_db_file_size;
blob_db_options.compression = FLAGS_blob_db_compression_type_e;
blob_db::BlobDB* ptr = nullptr;
s = blob_db::BlobDB::Open(options, blob_db_options, db_name, &ptr);
if (s.ok()) {
@ -4211,23 +4345,14 @@ class Benchmark {
size_t id = thread->rand.Next() % num_key_gens;
DBWithColumnFamilies* db_with_cfh = SelectDBWithCfh(id);
batch.Clear();
if (thread->shared->write_rate_limiter.get() != nullptr) {
thread->shared->write_rate_limiter->Request(
entries_per_batch_ * (value_size_ + key_size_), Env::IO_HIGH,
nullptr /* stats */, RateLimiter::OpType::kWrite);
// Set time at which last op finished to Now() to hide latency and
// sleep from rate limiter. Also, do the check once per batch, not
// once per write.
thread->stats.ResetLastOpTime();
}
int64_t batch_bytes = 0;
for (int64_t j = 0; j < entries_per_batch_; j++) {
int64_t rand_num = key_gens[id]->Next();
GenerateKeyFromInt(rand_num, FLAGS_num, &key);
Slice val = gen.Generate();
if (use_blob_db_) {
#ifndef ROCKSDB_LITE
Slice val = gen.Generate(value_size_);
blob_db::BlobDB* blobdb =
static_cast<blob_db::BlobDB*>(db_with_cfh->db);
if (FLAGS_blob_db_max_ttl_range > 0) {
@ -4238,15 +4363,16 @@ class Benchmark {
}
#endif // ROCKSDB_LITE
} else if (FLAGS_num_column_families <= 1) {
batch.Put(key, gen.Generate(value_size_));
batch.Put(key, val);
} else {
// We use same rand_num as seed for key and column family so that we
// can deterministically find the cfh corresponding to a particular
// key while reading the key.
batch.Put(db_with_cfh->GetCfh(rand_num), key,
gen.Generate(value_size_));
val);
}
bytes += value_size_ + key_size_;
batch_bytes += val.size() + key_size_;
bytes += val.size() + key_size_;
++num_written;
if (writes_per_range_tombstone_ > 0 &&
num_written > writes_before_delete_range_ &&
@ -4293,6 +4419,15 @@ class Benchmark {
}
}
}
if (thread->shared->write_rate_limiter.get() != nullptr) {
thread->shared->write_rate_limiter->Request(
batch_bytes, Env::IO_HIGH,
nullptr /* stats */, RateLimiter::OpType::kWrite);
// Set time at which last op finished to Now() to hide latency and
// sleep from rate limiter. Also, do the check once per batch, not
// once per write.
thread->stats.ResetLastOpTime();
}
if (!use_blob_db_) {
s = db_with_cfh->db->Write(write_options_, &batch);
}
@ -5346,16 +5481,16 @@ class Benchmark {
} else if (query_type == 1) {
// the Put query
puts++;
int64_t value_size = ParetoCdfInversion(
int64_t val_size = ParetoCdfInversion(
u, FLAGS_value_theta, FLAGS_value_k, FLAGS_value_sigma);
if (value_size < 0) {
value_size = 10;
} else if (value_size > value_max) {
value_size = value_size % value_max;
if (val_size < 0) {
val_size = 10;
} else if (val_size > value_max) {
val_size = val_size % value_max;
}
s = db_with_cfh->db->Put(
write_options_, key,
gen.Generate(static_cast<unsigned int>(value_size)));
gen.Generate(static_cast<unsigned int>(val_size)));
if (!s.ok()) {
fprintf(stderr, "put error: %s\n", s.ToString().c_str());
exit(1);
@ -5363,7 +5498,7 @@ class Benchmark {
if (thread->shared->write_rate_limiter) {
thread->shared->write_rate_limiter->Request(
key.size() + value_size, Env::IO_HIGH, nullptr /*stats*/,
key.size() + val_size, Env::IO_HIGH, nullptr /*stats*/,
RateLimiter::OpType::kWrite);
}
thread->stats.FinishedOps(db_with_cfh, db_with_cfh->db, 1, kWrite);
@ -5657,10 +5792,11 @@ class Benchmark {
GenerateKeyFromInt(thread->rand.Next() % FLAGS_num, FLAGS_num, &key);
Status s;
Slice val = gen.Generate();
if (write_merge == kWrite) {
s = db->Put(write_options_, key, gen.Generate(value_size_));
s = db->Put(write_options_, key, val);
} else {
s = db->Merge(write_options_, key, gen.Generate(value_size_));
s = db->Merge(write_options_, key, val);
}
written++;
@ -5668,12 +5804,12 @@ class Benchmark {
fprintf(stderr, "put or merge error: %s\n", s.ToString().c_str());
exit(1);
}
bytes += key.size() + value_size_;
bytes += key.size() + val.size();
thread->stats.FinishedOps(&db_, db_.db, 1, kWrite);
if (FLAGS_benchmark_write_rate_limit > 0) {
write_rate_limiter->Request(
entries_per_batch_ * (value_size_ + key_size_), Env::IO_HIGH,
key.size() + val.size(), Env::IO_HIGH,
nullptr /* stats */, RateLimiter::OpType::kWrite);
}
}
@ -5845,7 +5981,7 @@ class Benchmark {
} else if (put_weight > 0) {
// then do all the corresponding number of puts
// for all the gets we have done earlier
Status s = PutMany(db, write_options_, key, gen.Generate(value_size_));
Status s = PutMany(db, write_options_, key, gen.Generate());
if (!s.ok()) {
fprintf(stderr, "putmany error: %s\n", s.ToString().c_str());
exit(1);
@ -5913,7 +6049,7 @@ class Benchmark {
} else if (put_weight > 0) {
// then do all the corresponding number of puts
// for all the gets we have done earlier
Status s = db->Put(write_options_, key, gen.Generate(value_size_));
Status s = db->Put(write_options_, key, gen.Generate());
if (!s.ok()) {
fprintf(stderr, "put error: %s\n", s.ToString().c_str());
exit(1);
@ -5959,16 +6095,17 @@ class Benchmark {
if (thread->shared->write_rate_limiter) {
thread->shared->write_rate_limiter->Request(
key.size() + value_size_, Env::IO_HIGH, nullptr /*stats*/,
key.size() + value.size(), Env::IO_HIGH, nullptr /*stats*/,
RateLimiter::OpType::kWrite);
}
Status s = db->Put(write_options_, key, gen.Generate(value_size_));
Slice val = gen.Generate();
Status s = db->Put(write_options_, key, val);
if (!s.ok()) {
fprintf(stderr, "put error: %s\n", s.ToString().c_str());
exit(1);
}
bytes += key.size() + value_size_;
bytes += key.size() + val.size();
thread->stats.FinishedOps(nullptr, db, 1, kUpdate);
}
char msg[100];
@ -6007,7 +6144,7 @@ class Benchmark {
exit(1);
}
Slice value = gen.Generate(value_size_);
Slice value = gen.Generate(static_cast<unsigned int>(existing_value.size()));
std::string new_value;
if (status.ok()) {
@ -6062,7 +6199,7 @@ class Benchmark {
}
// Update the value (by appending data)
Slice operand = gen.Generate(value_size_);
Slice operand = gen.Generate();
if (value.size() > 0) {
// Use a delimiter to match the semantics for StringAppendOperator
value.append(1,',');
@ -6109,21 +6246,22 @@ class Benchmark {
GenerateKeyFromInt(key_rand, merge_keys_, &key);
Status s;
Slice val = gen.Generate();
if (FLAGS_num_column_families > 1) {
s = db_with_cfh->db->Merge(write_options_,
db_with_cfh->GetCfh(key_rand), key,
gen.Generate(value_size_));
val);
} else {
s = db_with_cfh->db->Merge(write_options_,
db_with_cfh->db->DefaultColumnFamily(), key,
gen.Generate(value_size_));
val);
}
if (!s.ok()) {
fprintf(stderr, "merge error: %s\n", s.ToString().c_str());
exit(1);
}
bytes += key.size() + value_size_;
bytes += key.size() + val.size();
thread->stats.FinishedOps(nullptr, db_with_cfh->db, 1, kMerge);
}
@ -6161,7 +6299,7 @@ class Benchmark {
bool do_merge = int(thread->rand.Next() % 100) < FLAGS_mergereadpercent;
if (do_merge) {
Status s = db->Merge(write_options_, key, gen.Generate(value_size_));
Status s = db->Merge(write_options_, key, gen.Generate());
if (!s.ok()) {
fprintf(stderr, "merge error: %s\n", s.ToString().c_str());
exit(1);
@ -6437,7 +6575,7 @@ class Benchmark {
DB* db = SelectDB(thread);
for (int64_t i = 0; i < FLAGS_numdistinct; i++) {
GenerateKeyFromInt(i * max_counter, FLAGS_num, &key);
s = db->Put(write_options_, key, gen.Generate(value_size_));
s = db->Put(write_options_, key, gen.Generate());
if (!s.ok()) {
fprintf(stderr, "Operation failed: %s\n", s.ToString().c_str());
exit(1);
@ -6604,20 +6742,20 @@ class Benchmark {
timestamp_emulator_->Inc();
Status s;
s = db->Put(write_options_, key, gen.Generate(value_size_));
Slice val = gen.Generate();
s = db->Put(write_options_, key, val);
if (!s.ok()) {
fprintf(stderr, "put error: %s\n", s.ToString().c_str());
exit(1);
}
bytes = key.size() + value_size_;
bytes = key.size() + val.size();
thread->stats.FinishedOps(&db_, db_.db, 1, kWrite);
thread->stats.AddBytes(bytes);
if (FLAGS_benchmark_write_rate_limit > 0) {
write_rate_limiter->Request(
entries_per_batch_ * (value_size_ + key_size_), Env::IO_HIGH,
key.size() + val.size(), Env::IO_HIGH,
nullptr /* stats */, RateLimiter::OpType::kWrite);
}
}
@ -6798,6 +6936,9 @@ int db_bench_tool(int argc, char** argv) {
FLAGS_compression_type_e =
StringToCompressionType(FLAGS_compression_type.c_str());
FLAGS_blob_db_compression_type_e =
StringToCompressionType(FLAGS_blob_db_compression_type.c_str());
#ifndef ROCKSDB_LITE
if (!FLAGS_hdfs.empty() && !FLAGS_env_uri.empty()) {
fprintf(stderr, "Cannot provide both --hdfs and --env_uri.\n");
@ -6834,6 +6975,9 @@ int db_bench_tool(int argc, char** argv) {
FLAGS_compaction_fadvice.c_str());
}
FLAGS_value_size_distribution_type_e =
StringToDistributionType(FLAGS_value_size_distribution_type.c_str());
FLAGS_rep_factory = StringToRepFactory(FLAGS_memtablerep.c_str());
// Note options sanitization may increase thread pool sizes according to

Loading…
Cancel
Save