db_bench cleanup

Summary:
clean up the db_bench a little bit. also avoid allocating memory for key
in the loop

Test Plan:
I verified a run with filluniquerandom & readrandom. Iterator seek will be used lot
to measure performance. Will fix whatever comes up

Reviewers: haobo, igor, yhchiang

Reviewed By: igor

CC: leveldb

Differential Revision: https://reviews.facebook.net/D17559
main
Lei Jin 10 years ago
parent beeee9dccc
commit 0c1126d4cf
  1. 625
      db/db_bench.cc
  2. 7
      port/port_example.h
  3. 4
      port/port_posix.h

@ -42,7 +42,6 @@
DEFINE_string(benchmarks,
"fillseq,"
"fillsync,"
"fillrandom,"
@ -53,6 +52,7 @@ DEFINE_string(benchmarks,
"readreverse,"
"compact,"
"readrandom,"
"multireadrandom,"
"readseq,"
"readtocache,"
"readreverse,"
@ -64,8 +64,7 @@ DEFINE_string(benchmarks,
"crc32c,"
"compress,"
"uncompress,"
"acquireload,"
"fillfromstdin,",
"acquireload,",
"Comma-separated list of operations to run in the specified order"
"Actual benchmarks:\n"
@ -129,16 +128,8 @@ DEFINE_int64(merge_keys, -1,
DEFINE_int64(reads, -1, "Number of read operations to do. "
"If negative, do FLAGS_num reads.");
DEFINE_int64(read_range, 1, "When ==1 reads use ::Get, when >1 reads use"
" an iterator");
DEFINE_bool(use_prefix_blooms, false, "Whether to place prefixes in blooms");
DEFINE_int32(bloom_locality, 0, "Control bloom filter probes locality");
DEFINE_bool(use_prefix_api, false, "Whether to set ReadOptions.prefix for"
" prefixscanrandom. If true, use_prefix_blooms must also be true.");
DEFINE_int64(seed, 0, "Seed base for random number generators. "
"When 0 it is deterministic.");
@ -278,12 +269,6 @@ DEFINE_bool(disable_wal, false, "If true, do not write WAL for write.");
DEFINE_string(wal_dir, "", "If not empty, use the given dir for WAL");
DEFINE_bool(use_snapshot, false, "If true, create a snapshot per query when"
" randomread benchmark is used");
DEFINE_bool(get_approx, false, "If true, call GetApproximateSizes per query"
" when read_range is > 1 and randomread benchmark is used");
DEFINE_int32(num_levels, 7, "The total number of levels");
DEFINE_int32(target_file_size_base, 2 * 1048576, "Target file size at level-1");
@ -461,20 +446,9 @@ DEFINE_string(compaction_fadvice, "NORMAL",
static auto FLAGS_compaction_fadvice_e =
rocksdb::Options().access_hint_on_compaction_start;
DEFINE_bool(use_multiget, false,
"Use multiget to access a series of keys instead of get");
DEFINE_bool(use_tailing_iterator, false,
"Use tailing iterator to access a series of keys instead of get");
DEFINE_int64(keys_per_multiget, 90, "If use_multiget is true, determines number"
" of keys to group per call Arbitrary default is good because it"
" agrees with readwritepercent");
// TODO: Apply this flag to generic Get calls too. Currently only with Multiget
DEFINE_bool(warn_missing_keys, true, "Print a message to user when a key is"
" missing in a Get/MultiGet call");
DEFINE_bool(use_adaptive_mutex, rocksdb::Options().use_adaptive_mutex,
"Use adaptive mutex");
@ -798,7 +772,7 @@ class Duration {
start_at_ = FLAGS_env->NowMicros();
}
bool Done(int increment) {
bool Done(int64_t increment) {
if (increment <= 0) increment = 1; // avoid Done(0) and infinite loops
ops_ += increment;
@ -834,7 +808,7 @@ class Benchmark {
int key_size_;
int prefix_size_;
int64_t keys_per_prefix_;
int entries_per_batch_;
int64_t entries_per_batch_;
WriteOptions write_options_;
int64_t reads_;
int64_t writes_;
@ -1062,6 +1036,10 @@ class Benchmark {
delete prefix_extractor_;
}
Slice AllocateKey() {
return Slice(new char[key_size_], key_size_);
}
// Generate key according to the given specification and random number.
// The resulting key will have the following format (if keys_per_prefix_
// is positive), extra trailing bytes are either cut off or paddd with '0'.
@ -1074,10 +1052,8 @@ class Benchmark {
// ----------------------------
// | key 00000 |
// ----------------------------
std::string GenerateKeyFromInt(uint64_t v, int64_t num_keys) {
std::string key;
key.resize(key_size_);
char* start = &(key[0]);
void GenerateKeyFromInt(uint64_t v, int64_t num_keys, Slice* key) {
char* start = const_cast<char*>(key->data());
char* pos = start;
if (keys_per_prefix_ > 0) {
int64_t num_prefix = num_keys / keys_per_prefix_;
@ -1109,8 +1085,6 @@ class Benchmark {
if (key_size_ > pos - start) {
memset(pos, '0', key_size_ - (pos - start));
}
return key;
}
void Run() {
@ -1155,9 +1129,6 @@ class Benchmark {
} else if (name == Slice("fillrandom")) {
fresh_db = true;
method = &Benchmark::WriteRandom;
} else if (name == Slice("fillfromstdin")) {
fresh_db = true;
method = &Benchmark::WriteFromStdin;
} else if (name == Slice("filluniquerandom")) {
fresh_db = true;
if (num_threads > 1) {
@ -1189,19 +1160,18 @@ class Benchmark {
method = &Benchmark::ReadReverse;
} else if (name == Slice("readrandom")) {
method = &Benchmark::ReadRandom;
} else if (name == Slice("multireadrandom")) {
method = &Benchmark::MultiReadRandom;
} else if (name == Slice("readmissing")) {
method = &Benchmark::ReadMissing;
++key_size_;
method = &Benchmark::ReadRandom;
} else if (name == Slice("newiterator")) {
method = &Benchmark::IteratorCreation;
} else if (name == Slice("seekrandom")) {
method = &Benchmark::SeekRandom;
} else if (name == Slice("readhot")) {
method = &Benchmark::ReadHot;
} else if (name == Slice("readrandomsmall")) {
reads_ /= 1000;
method = &Benchmark::ReadRandom;
} else if (name == Slice("prefixscanrandom")) {
method = &Benchmark::PrefixScanRandom;
} else if (name == Slice("deleteseq")) {
method = &Benchmark::DeleteSeq;
} else if (name == Slice("deleterandom")) {
@ -1215,10 +1185,9 @@ class Benchmark {
if (FLAGS_merge_operator.empty()) {
fprintf(stdout, "%-12s : skipped (--merge_operator is unknown)\n",
name.ToString().c_str());
method = nullptr;
} else {
method = &Benchmark::ReadRandomMergeRandom;
exit(1);
}
method = &Benchmark::ReadRandomMergeRandom;
} else if (name == Slice("updaterandom")) {
method = &Benchmark::UpdateRandom;
} else if (name == Slice("appendrandom")) {
@ -1227,10 +1196,9 @@ class Benchmark {
if (FLAGS_merge_operator.empty()) {
fprintf(stdout, "%-12s : skipped (--merge_operator is unknown)\n",
name.ToString().c_str());
method = nullptr;
} else {
method = &Benchmark::MergeRandom;
exit(1);
}
method = &Benchmark::MergeRandom;
} else if (name == Slice("randomwithverify")) {
method = &Benchmark::RandomWithVerify;
} else if (name == Slice("compact")) {
@ -1243,8 +1211,6 @@ class Benchmark {
method = &Benchmark::Compress;
} else if (name == Slice("uncompress")) {
method = &Benchmark::Uncompress;
} else if (name == Slice("heapprofile")) {
HeapProfile();
} else if (name == Slice("stats")) {
PrintStats("rocksdb.stats");
} else if (name == Slice("levelstats")) {
@ -1254,6 +1220,7 @@ class Benchmark {
} else {
if (name != Slice()) { // No error message for empty name
fprintf(stderr, "unknown benchmark '%s'\n", name.ToString().c_str());
exit(1);
}
}
@ -1540,7 +1507,7 @@ class Benchmark {
options.compaction_style = FLAGS_compaction_style_e;
options.block_size = FLAGS_block_size;
options.filter_policy = filter_policy_;
if (FLAGS_use_plain_table || FLAGS_use_prefix_blooms) {
if (FLAGS_use_plain_table) {
options.prefix_extractor.reset(
NewFixedPrefixTransform(FLAGS_prefix_size));
}
@ -1715,54 +1682,6 @@ class Benchmark {
DoWrite(thread, UNIQUE_RANDOM);
}
void writeOrFail(WriteBatch& batch) {
Status s = db_->Write(write_options_, &batch);
if (!s.ok()) {
fprintf(stderr, "put error: %s\n", s.ToString().c_str());
exit(1);
}
}
void WriteFromStdin(ThreadState* thread) {
size_t count = 0;
WriteBatch batch;
const size_t bufferLen = 32 << 20;
unique_ptr<char[]> line = unique_ptr<char[]>(new char[bufferLen]);
char* linep = line.get();
const int batchSize = 100 << 10;
const char columnSeparator = '\t';
const char lineSeparator = '\n';
while (fgets(linep, bufferLen, stdin) != nullptr) {
++count;
char* tab = std::find(linep, linep + bufferLen, columnSeparator);
if (tab == linep + bufferLen) {
fprintf(stderr, "[Error] No Key delimiter TAB at line %zu\n", count);
continue;
}
Slice key(linep, tab - linep);
tab++;
char* endLine = std::find(tab, linep + bufferLen, lineSeparator);
if (endLine == linep + bufferLen) {
fprintf(stderr, "[Error] No ENTER at end of line # %zu\n", count);
continue;
}
Slice value(tab, endLine - tab);
thread->stats.FinishedSingleOp(db_);
thread->stats.AddBytes(endLine - linep - 1);
if (batch.Count() < batchSize) {
batch.Put(key, value);
continue;
}
writeOrFail(batch);
batch.Clear();
}
if (batch.Count() > 0) {
writeOrFail(batch);
}
}
void DoWrite(ThreadState* thread, WriteMode write_mode) {
const int test_duration = write_mode == RANDOM ? FLAGS_duration : 0;
const int64_t num_ops = writes_ == 0 ? num_ : writes_;
@ -1783,10 +1702,13 @@ class Benchmark {
WriteBatch batch;
Status s;
int64_t bytes = 0;
int i = 0;
int64_t i = 0;
Slice key = AllocateKey();
std::unique_ptr<const char[]> key_guard(key.data());
while (!duration.Done(entries_per_batch_)) {
batch.Clear();
for (int j = 0; j < entries_per_batch_; j++) {
for (int64_t j = 0; j < entries_per_batch_; j++) {
int64_t k = 0;
switch(write_mode) {
case SEQUENTIAL:
@ -1825,9 +1747,9 @@ class Benchmark {
break;
}
};
std::string key = GenerateKeyFromInt(k, FLAGS_num);
GenerateKeyFromInt(k, FLAGS_num, &key);
batch.Put(key, gen.Generate(value_size_));
bytes += value_size_ + key.size();
bytes += value_size_ + key_size_;
thread->stats.FinishedSingleOp(db_);
}
s = db_->Write(write_options_, &batch);
@ -1866,135 +1788,22 @@ class Benchmark {
thread->stats.AddBytes(bytes);
}
// Calls MultiGet over a list of keys from a random distribution.
// Returns the total number of keys found.
long MultiGetRandom(ReadOptions& options, int num_keys,
Random64* rand, int64_t range, const char* suffix) {
assert(num_keys > 0);
std::vector<Slice> keys(num_keys);
std::vector<std::string> values(num_keys);
std::vector<std::string> gen_keys(num_keys);
int i;
int64_t k;
// Fill the keys vector
for(i=0; i<num_keys; ++i) {
k = rand->Next() % range;
gen_keys[i] = GenerateKeyFromInt(k, range) + suffix;
keys[i] = gen_keys[i];
}
if (FLAGS_use_snapshot) {
options.snapshot = db_->GetSnapshot();
}
// Apply the operation
std::vector<Status> statuses = db_->MultiGet(options, keys, &values);
assert((long)statuses.size() == num_keys);
assert((long)keys.size() == num_keys); // Should always be the case.
assert((long)values.size() == num_keys);
if (FLAGS_use_snapshot) {
db_->ReleaseSnapshot(options.snapshot);
options.snapshot = nullptr;
}
// Count number found
long found = 0;
for(i=0; i<num_keys; ++i) {
if (statuses[i].ok()){
++found;
} else if (FLAGS_warn_missing_keys == true) {
// Key not found, or error.
fprintf(stderr, "get error: %s\n", statuses[i].ToString().c_str());
}
}
return found;
}
void ReadRandom(ThreadState* thread) {
ReadOptions options(FLAGS_verify_checksum, true);
Duration duration(FLAGS_duration, reads_);
int64_t found = 0;
int64_t read = 0;
if (FLAGS_use_multiget) { // MultiGet
const long& kpg = FLAGS_keys_per_multiget; // keys per multiget group
long keys_left = reads_;
// Recalculate number of keys per group, and call MultiGet until done
long num_keys;
while(num_keys = std::min(keys_left, kpg), !duration.Done(num_keys)) {
read += num_keys;
found +=
MultiGetRandom(options, num_keys, &thread->rand, FLAGS_num, "");
thread->stats.FinishedSingleOp(db_);
keys_left -= num_keys;
}
} else if (FLAGS_use_tailing_iterator) { // use tailing iterator for gets
options.tailing = true;
Iterator* iter = db_->NewIterator(options);
while (!duration.Done(1)) {
const int64_t k = thread->rand.Next() % FLAGS_num;
std::string key = GenerateKeyFromInt(k, FLAGS_num);
iter->Seek(key);
read++;
if (iter->Valid() && iter->key().compare(Slice(key)) == 0) {
found++;
}
thread->stats.FinishedSingleOp(db_);
}
delete iter;
} else { // Regular case. Do one "get" at a time Get
options.tailing = true;
options.prefix_seek = (FLAGS_prefix_size == 0);
Iterator* iter = db_->NewIterator(options);
std::string value;
while (!duration.Done(1)) {
const int64_t k = thread->rand.Next() % FLAGS_num;
std::string key = GenerateKeyFromInt(k, FLAGS_num);
if (FLAGS_use_snapshot) {
options.snapshot = db_->GetSnapshot();
}
if (FLAGS_read_range < 2) {
read++;
if (db_->Get(options, key, &value).ok()) {
found++;
}
} else {
int count = 1;
if (FLAGS_get_approx) {
std::string key2 =
GenerateKeyFromInt(k + static_cast<int>(FLAGS_read_range),
FLAGS_num + FLAGS_read_range);
Range range(key, key2);
uint64_t sizes;
db_->GetApproximateSizes(&range, 1, &sizes);
}
read += FLAGS_read_range;
for (iter->Seek(key);
iter->Valid() && count <= FLAGS_read_range;
++count, iter->Next()) {
found++;
}
}
if (FLAGS_use_snapshot) {
db_->ReleaseSnapshot(options.snapshot);
options.snapshot = nullptr;
}
int64_t found = 0;
ReadOptions options(FLAGS_verify_checksum, true);
Slice key = AllocateKey();
std::unique_ptr<const char[]> key_guard(key.data());
std::string value;
thread->stats.FinishedSingleOp(db_);
Duration duration(FLAGS_duration, reads_);
while (!duration.Done(1)) {
GenerateKeyFromInt(thread->rand.Next() % FLAGS_num, FLAGS_num, &key);
read++;
if (db_->Get(options, key, &value).ok()) {
found++;
}
delete iter;
thread->stats.FinishedSingleOp(db_);
}
char msg[100];
@ -2008,113 +1817,41 @@ class Benchmark {
}
}
void PrefixScanRandom(ThreadState* thread) {
if (FLAGS_use_prefix_api) {
assert(FLAGS_use_prefix_blooms);
assert(FLAGS_bloom_bits >= 1);
}
ReadOptions options(FLAGS_verify_checksum, true);
Duration duration(FLAGS_duration, reads_);
// Calls MultiGet over a list of keys from a random distribution.
// Returns the total number of keys found.
void MultiReadRandom(ThreadState* thread) {
int64_t read = 0;
int64_t found = 0;
while (!duration.Done(1)) {
std::string value;
const int k = thread->rand.Next() % FLAGS_num;
std::string key = GenerateKeyFromInt(k, FLAGS_num);
Slice skey(key);
Slice prefix = prefix_extractor_->Transform(skey);
options.prefix = FLAGS_use_prefix_api ? &prefix : nullptr;
Iterator* iter = db_->NewIterator(options);
for (iter->Seek(skey);
iter->Valid() && iter->key().starts_with(prefix);
iter->Next()) {
found++;
}
delete iter;
thread->stats.FinishedSingleOp(db_);
}
char msg[100];
snprintf(msg, sizeof(msg), "(%" PRIu64 " of %" PRIu64 " found)",
found, reads_);
thread->stats.AddMessage(msg);
}
void ReadMissing(ThreadState* thread) {
FLAGS_warn_missing_keys = false; // Never warn about missing keys
Duration duration(FLAGS_duration, reads_);
ReadOptions options(FLAGS_verify_checksum, true);
if (FLAGS_use_multiget) {
const long& kpg = FLAGS_keys_per_multiget; // keys per multiget group
long keys_left = reads_;
// Recalculate number of keys per group, and call MultiGet until done
long num_keys;
long found;
while(num_keys = std::min(keys_left, kpg), !duration.Done(num_keys)) {
found =
MultiGetRandom(options, num_keys, &thread->rand, FLAGS_num, ".");
// We should not find any key since the key we try to get has a
// different suffix
if (found) {
assert(false);
}
thread->stats.FinishedSingleOp(db_);
keys_left -= num_keys;
}
} else { // Regular case (not MultiGet)
std::string value;
Status s;
while (!duration.Done(1)) {
const int64_t k = thread->rand.Next() % FLAGS_num;
std::string key = GenerateKeyFromInt(k, FLAGS_num) + ".";
s = db_->Get(options, key, &value);
assert(!s.ok() && s.IsNotFound());
thread->stats.FinishedSingleOp(db_);
}
std::vector<Slice> keys(entries_per_batch_);
std::vector<std::string> values(entries_per_batch_);
while (keys.size() < entries_per_batch_) {
keys.push_back(AllocateKey());
}
}
void ReadHot(ThreadState* thread) {
Duration duration(FLAGS_duration, reads_);
ReadOptions options(FLAGS_verify_checksum, true);
const int64_t range = (FLAGS_num + 99) / 100;
int64_t found = 0;
if (FLAGS_use_multiget) {
const int64_t kpg = FLAGS_keys_per_multiget; // keys per multiget group
int64_t keys_left = reads_;
// Recalculate number of keys per group, and call MultiGet until done
long num_keys;
while(num_keys = std::min(keys_left, kpg), !duration.Done(num_keys)) {
found += MultiGetRandom(options, num_keys, &thread->rand, range, "");
thread->stats.FinishedSingleOp(db_);
keys_left -= num_keys;
while (!duration.Done(1)) {
for (int64_t i = 0; i < entries_per_batch_; ++i) {
GenerateKeyFromInt(thread->rand.Next() % FLAGS_num,
FLAGS_num, &keys[i]);
}
} else {
std::string value;
while (!duration.Done(1)) {
const int64_t k = thread->rand.Next() % range;
std::string key = GenerateKeyFromInt(k, range);
if (db_->Get(options, key, &value).ok()) {
std::vector<Status> statuses = db_->MultiGet(options, keys, &values);
assert(statuses.size() == entries_per_batch_);
read += entries_per_batch_;
for (int64_t i = 0; i < entries_per_batch_; ++i) {
if (statuses[i].ok()) {
++found;
}
thread->stats.FinishedSingleOp(db_);
}
}
for (auto& k : keys) {
delete k.data();
}
char msg[100];
snprintf(msg, sizeof(msg), "(%" PRIu64 " of %" PRIu64 " found)",
found, reads_);
found, read);
thread->stats.AddMessage(msg);
}
@ -2129,44 +1866,53 @@ class Benchmark {
}
void SeekRandom(ThreadState* thread) {
Duration duration(FLAGS_duration, reads_);
ReadOptions options(FLAGS_verify_checksum, true);
std::string value;
int64_t read = 0;
int64_t found = 0;
ReadOptions options(FLAGS_verify_checksum, true);
options.tailing = FLAGS_use_tailing_iterator;
auto* iter = db_->NewIterator(options);
Slice key = AllocateKey();
std::unique_ptr<const char[]> key_guard(key.data());
Duration duration(FLAGS_duration, reads_);
while (!duration.Done(1)) {
Iterator* iter = db_->NewIterator(options);
const int64_t k = thread->rand.Next() % FLAGS_num;
std::string key = GenerateKeyFromInt(k, FLAGS_num);
GenerateKeyFromInt(thread->rand.Next() % FLAGS_num, FLAGS_num, &key);
iter->Seek(key);
if (iter->Valid() && iter->key() == Slice(key)) found++;
delete iter;
read++;
if (iter->Valid() && iter->key().compare(key) == 0) {
found++;
}
thread->stats.FinishedSingleOp(db_);
}
delete iter;
char msg[100];
snprintf(msg, sizeof(msg), "(%" PRIu64 " of %" PRIu64 " found)",
found, num_);
found, read);
thread->stats.AddMessage(msg);
}
void DoDelete(ThreadState* thread, bool seq) {
WriteBatch batch;
Status s;
Duration duration(seq ? 0 : FLAGS_duration, num_);
long i = 0;
int64_t i = 0;
Slice key = AllocateKey();
std::unique_ptr<const char[]> key_guard(key.data());
while (!duration.Done(entries_per_batch_)) {
batch.Clear();
for (int j = 0; j < entries_per_batch_; j++) {
const int64_t k = seq ? i+j : (thread->rand.Next() % FLAGS_num);
std::string key = GenerateKeyFromInt(k, FLAGS_num);
for (int64_t j = 0; j < entries_per_batch_; ++j) {
const int64_t k = seq ? i + j : (thread->rand.Next() % FLAGS_num);
GenerateKeyFromInt(k, FLAGS_num, &key);
batch.Delete(key);
thread->stats.FinishedSingleOp(db_);
}
s = db_->Write(write_options_, &batch);
auto s = db_->Write(write_options_, &batch);
if (!s.ok()) {
fprintf(stderr, "del error: %s\n", s.ToString().c_str());
exit(1);
}
++i;
i += entries_per_batch_;
}
}
@ -2197,6 +1943,9 @@ class Benchmark {
// Don't merge stats from this thread with the readers.
thread->stats.SetExcludeFromMerge();
Slice key = AllocateKey();
std::unique_ptr<const char[]> key_guard(key.data());
while (true) {
{
MutexLock l(&thread->shared->mu);
@ -2206,8 +1955,7 @@ class Benchmark {
}
}
const int64_t k = thread->rand.Next() % FLAGS_num;
std::string key = GenerateKeyFromInt(k, FLAGS_num);
GenerateKeyFromInt(thread->rand.Next() % FLAGS_num, FLAGS_num, &key);
Status s = db_->Put(write_options_, key, gen.Generate(value_size_));
if (!s.ok()) {
fprintf(stderr, "put error: %s\n", s.ToString().c_str());
@ -2235,7 +1983,7 @@ class Benchmark {
// Given a key K and value V, this puts (K+"0", V), (K+"1", V), (K+"2", V)
// in DB atomically i.e in a single batch. Also refer GetMany.
Status PutMany(const WriteOptions& writeoptions,
const Slice& key, const Slice& value) {
const Slice& key, const Slice& value) {
std::string suffixes[3] = {"2", "1", "0"};
std::string keys[3];
@ -2273,7 +2021,7 @@ class Benchmark {
// in the same snapshot, and verifies that all the values are identical.
// ASSUMES that PutMany was used to put (K, V) into the DB.
Status GetMany(const ReadOptions& readoptions,
const Slice& key, std::string* value) {
const Slice& key, std::string* value) {
std::string suffixes[3] = {"0", "1", "2"};
std::string keys[3];
Slice key_slices[3];
@ -2328,16 +2076,19 @@ class Benchmark {
int64_t puts_done = 0;
int64_t deletes_done = 0;
Slice key = AllocateKey();
std::unique_ptr<const char[]> key_guard(key.data());
// the number of iterations is the larger of read_ or write_
for (int64_t i = 0; i < readwrites_; i++) {
const int64_t k = thread->rand.Next() % (FLAGS_numdistinct);
std::string key = GenerateKeyFromInt(k, FLAGS_numdistinct);
if (get_weight == 0 && put_weight == 0 && delete_weight == 0) {
// one batch completed, reinitialize for next batch
get_weight = FLAGS_readwritepercent;
delete_weight = FLAGS_deletepercent;
put_weight = 100 - get_weight - delete_weight;
}
GenerateKeyFromInt(thread->rand.Next() % FLAGS_numdistinct,
FLAGS_numdistinct, &key);
if (get_weight > 0) {
// do all the gets first
Status s = GetMany(options, key, &value);
@ -2383,12 +2134,6 @@ class Benchmark {
// This is different from ReadWhileWriting because it does not use
// an extra thread.
void ReadRandomWriteRandom(ThreadState* thread) {
if (FLAGS_use_multiget){
// Separate function for multiget (for ease of reading)
ReadRandomWriteRandomMultiGet(thread);
return;
}
ReadOptions options(FLAGS_verify_checksum, true);
RandomGenerator gen;
std::string value;
@ -2399,28 +2144,18 @@ class Benchmark {
int64_t writes_done = 0;
Duration duration(FLAGS_duration, readwrites_);
Slice key = AllocateKey();
std::unique_ptr<const char[]> key_guard(key.data());
// the number of iterations is the larger of read_ or write_
while (!duration.Done(1)) {
const int64_t k = thread->rand.Next() % FLAGS_num;
std::string key = GenerateKeyFromInt(k, FLAGS_num);
GenerateKeyFromInt(thread->rand.Next() % FLAGS_num, FLAGS_num, &key);
if (get_weight == 0 && put_weight == 0) {
// one batch completed, reinitialize for next batch
get_weight = FLAGS_readwritepercent;
put_weight = 100 - get_weight;
}
if (get_weight > 0) {
if (FLAGS_use_snapshot) {
options.snapshot = db_->GetSnapshot();
}
if (FLAGS_get_approx) {
std::string key2 = GenerateKeyFromInt(k + 1, FLAGS_num + 1);
Range range(key, key2);
uint64_t sizes;
db_->GetApproximateSizes(&range, 1, &sizes);
}
// do all the gets first
Status s = db_->Get(options, key, &value);
if (!s.ok() && !s.IsNotFound()) {
@ -2430,14 +2165,8 @@ class Benchmark {
} else if (!s.IsNotFound()) {
found++;
}
get_weight--;
reads_done++;
if (FLAGS_use_snapshot) {
db_->ReleaseSnapshot(options.snapshot);
}
} else if (put_weight > 0) {
// then do all the corresponding number of puts
// for all the gets we have done earlier
@ -2458,82 +2187,6 @@ class Benchmark {
thread->stats.AddMessage(msg);
}
// ReadRandomWriteRandom (with multiget)
// Does FLAGS_keys_per_multiget reads (per multiget), followed by some puts.
// FLAGS_readwritepercent will specify the ratio of gets to puts.
// e.g.: If FLAGS_keys_per_multiget == 100 and FLAGS_readwritepercent == 75
// Then each block will do 100 multigets and 33 puts
// So there are 133 operations in-total: 100 of them (75%) are gets, and 33
// of them (25%) are puts.
void ReadRandomWriteRandomMultiGet(ThreadState* thread) {
ReadOptions options(FLAGS_verify_checksum, true);
RandomGenerator gen;
// For multiget
const long& kpg = FLAGS_keys_per_multiget; // keys per multiget group
long keys_left = readwrites_; // number of keys still left to read
long num_keys; // number of keys to read in current group
long num_put_keys; // number of keys to put in current group
int64_t found = 0;
int64_t reads_done = 0;
int64_t writes_done = 0;
int64_t multigets_done = 0;
// the number of iterations is the larger of read_ or write_
Duration duration(FLAGS_duration, readwrites_);
while(true) {
// Read num_keys keys, then write num_put_keys keys.
// The ratio of num_keys to num_put_keys is always FLAGS_readwritepercent
// And num_keys is set to be FLAGS_keys_per_multiget (kpg)
// num_put_keys is calculated accordingly (to maintain the ratio)
// Note: On the final iteration, num_keys and num_put_keys will be smaller
num_keys = std::min(keys_left*(FLAGS_readwritepercent + 99)/100, kpg);
num_put_keys = num_keys * (100-FLAGS_readwritepercent)
/ FLAGS_readwritepercent;
// This will break the loop when duration is complete
if (duration.Done(num_keys + num_put_keys)) {
break;
}
// A quick check to make sure our formula doesn't break on edge cases
assert(num_keys >= 1);
assert(num_keys + num_put_keys <= keys_left);
// Apply the MultiGet operations
found += MultiGetRandom(options, num_keys, &thread->rand, FLAGS_num, "");
++multigets_done;
reads_done+=num_keys;
thread->stats.FinishedSingleOp(db_);
// Now do the puts
int i;
int64_t k;
for(i=0; i<num_put_keys; ++i) {
k = thread->rand.Next() % FLAGS_num;
std::string key = GenerateKeyFromInt(k, FLAGS_num);
Status s = db_->Put(write_options_, key,
gen.Generate(value_size_));
if (!s.ok()) {
fprintf(stderr, "put error: %s\n", s.ToString().c_str());
exit(1);
}
writes_done++;
thread->stats.FinishedSingleOp(db_);
}
keys_left -= (num_keys + num_put_keys);
}
char msg[100];
snprintf(msg, sizeof(msg),
"( reads:%" PRIu64 " writes:%" PRIu64 " total:%" PRIu64 \
" multiget_ops:%" PRIu64 " found:%" PRIu64 ")",
reads_done, writes_done, readwrites_, multigets_done, found);
thread->stats.AddMessage(msg);
}
//
// Read-modify-write for random keys
void UpdateRandom(ThreadState* thread) {
@ -2543,30 +2196,16 @@ class Benchmark {
int64_t found = 0;
Duration duration(FLAGS_duration, readwrites_);
Slice key = AllocateKey();
std::unique_ptr<const char[]> key_guard(key.data());
// the number of iterations is the larger of read_ or write_
while (!duration.Done(1)) {
const int64_t k = thread->rand.Next() % FLAGS_num;
std::string key = GenerateKeyFromInt(k, FLAGS_num);
if (FLAGS_use_snapshot) {
options.snapshot = db_->GetSnapshot();
}
if (FLAGS_get_approx) {
std::string key2 = GenerateKeyFromInt(k + 1, FLAGS_num + 1);
Range range(key, key2);
uint64_t sizes;
db_->GetApproximateSizes(&range, 1, &sizes);
}
GenerateKeyFromInt(thread->rand.Next() % FLAGS_num, FLAGS_num, &key);
if (db_->Get(options, key, &value).ok()) {
found++;
}
if (FLAGS_use_snapshot) {
db_->ReleaseSnapshot(options.snapshot);
}
Status s = db_->Put(write_options_, key, gen.Generate(value_size_));
if (!s.ok()) {
fprintf(stderr, "put error: %s\n", s.ToString().c_str());
@ -2589,22 +2228,12 @@ class Benchmark {
std::string value;
int64_t found = 0;
Slice key = AllocateKey();
std::unique_ptr<const char[]> key_guard(key.data());
// The number of iterations is the larger of read_ or write_
Duration duration(FLAGS_duration, readwrites_);
while (!duration.Done(1)) {
const int64_t k = thread->rand.Next() % FLAGS_num;
std::string key = GenerateKeyFromInt(k, FLAGS_num);
if (FLAGS_use_snapshot) {
options.snapshot = db_->GetSnapshot();
}
if (FLAGS_get_approx) {
std::string key2 = GenerateKeyFromInt(k + 1, FLAGS_num + 1);
Range range(key, key2);
uint64_t sizes;
db_->GetApproximateSizes(&range, 1, &sizes);
}
GenerateKeyFromInt(thread->rand.Next() % FLAGS_num, FLAGS_num, &key);
// Get the existing value
if (db_->Get(options, key, &value).ok()) {
@ -2614,10 +2243,6 @@ class Benchmark {
value.clear();
}
if (FLAGS_use_snapshot) {
db_->ReleaseSnapshot(options.snapshot);
}
// Update the value (by appending data)
Slice operand = gen.Generate(value_size_);
if (value.size() > 0) {
@ -2634,6 +2259,7 @@ class Benchmark {
}
thread->stats.FinishedSingleOp(db_);
}
char msg[100];
snprintf(msg, sizeof(msg), "( updates:%" PRIu64 " found:%" PRIu64 ")",
readwrites_, found);
@ -2653,11 +2279,12 @@ class Benchmark {
void MergeRandom(ThreadState* thread) {
RandomGenerator gen;
Slice key = AllocateKey();
std::unique_ptr<const char[]> key_guard(key.data());
// The number of iterations is the larger of read_ or write_
Duration duration(FLAGS_duration, readwrites_);
while (!duration.Done(1)) {
const int64_t k = thread->rand.Next() % merge_keys_;
std::string key = GenerateKeyFromInt(k, merge_keys_);
GenerateKeyFromInt(thread->rand.Next() % merge_keys_, merge_keys_, &key);
Status s = db_->Merge(write_options_, key, gen.Generate(value_size_));
@ -2690,12 +2317,12 @@ class Benchmark {
int64_t num_merges = 0;
size_t max_length = 0;
Slice key = AllocateKey();
std::unique_ptr<const char[]> key_guard(key.data());
// the number of iterations is the larger of read_ or write_
Duration duration(FLAGS_duration, readwrites_);
while (!duration.Done(1)) {
const int64_t k = thread->rand.Next() % merge_keys_;
std::string key = GenerateKeyFromInt(k, merge_keys_);
GenerateKeyFromInt(thread->rand.Next() % merge_keys_, merge_keys_, &key);
bool do_merge = int(thread->rand.Next() % 100) < FLAGS_mergereadpercent;
@ -2727,6 +2354,7 @@ class Benchmark {
thread->stats.FinishedSingleOp(db_);
}
char msg[100];
snprintf(msg, sizeof(msg),
"(reads:%" PRIu64 " merges:%" PRIu64 " total:%" PRIu64 " hits:%" \
@ -2735,7 +2363,6 @@ class Benchmark {
thread->stats.AddMessage(msg);
}
void Compact(ThreadState* thread) {
db_->CompactRange(nullptr, nullptr);
}
@ -2747,28 +2374,6 @@ class Benchmark {
}
fprintf(stdout, "\n%s\n", stats.c_str());
}
static void WriteToFile(void* arg, const char* buf, int n) {
reinterpret_cast<WritableFile*>(arg)->Append(Slice(buf, n));
}
void HeapProfile() {
char fname[100];
EnvOptions soptions;
snprintf(fname, sizeof(fname), "%s/heap-%04d", FLAGS_db.c_str(),
++heap_counter_);
unique_ptr<WritableFile> file;
Status s = FLAGS_env->NewWritableFile(fname, &file, soptions);
if (!s.ok()) {
fprintf(stderr, "%s\n", s.ToString().c_str());
return;
}
bool ok = port::GetHeapProfile(WriteToFile, file.get());
if (!ok) {
fprintf(stderr, "heap profiling not supported\n");
FLAGS_env->DeleteFile(fname);
}
}
};
} // namespace rocksdb

@ -127,13 +127,6 @@ extern bool Snappy_GetUncompressedLength(const char* input, size_t length,
extern bool Snappy_Uncompress(const char* input_data, size_t input_length,
char* output);
// ------------------ Miscellaneous -------------------
// If heap profiling is not supported, returns false.
// Else repeatedly calls (*func)(arg, data, n) and then returns true.
// The concatenation of all "data[0,n-1]" fragments is the heap profile.
extern bool GetHeapProfile(void (*func)(void*, const char*, int), void* arg);
} // namespace port
} // namespace rocksdb

@ -476,10 +476,6 @@ inline bool LZ4HC_Compress(const CompressionOptions &opts, const char* input,
return false;
}
inline bool GetHeapProfile(void (*func)(void *, const char *, int), void *arg) {
return false;
}
#define CACHE_LINE_SIZE 64U
} // namespace port

Loading…
Cancel
Save