Write a benchmark to emulate time series data

Summary: Add a benchmark to `db_bench`. In this benchmark, a write thread will populate time series data in the format of 'id | timestamp', and multiple read threads will randomly retrieve all data from one id at a time.

Test Plan: Run the benchmark: `num=134217728;bpl=536870912;mb=67108864;overlap=10;mcz=2;del=300000000;levels=6;ctrig=4;delay=8;stop=12;wbn=3;mbc=20;wbs=134217728;dds=0;sync=0;t=32;vs=800;bs=4096;cs=17179869184;of=500000;wps=0;si=10000000; kir=100000; dir=/data/users/jhli/test/; ./db_bench --benchmarks=timeseries --disable_seek_compaction=1 --mmap_read=0 --statistics=1 --histogram=1 --num=$num --threads=$t --value_size=$vs --block_size=$bs --cache_size=$cs --bloom_bits=10 --cache_numshardbits=6 --open_files=$of --verify_checksum=1 --db=$dir --sync=$sync --disable_wal=0 --compression_type=none --stats_interval=$si --compression_ratio=1 --disable_data_sync=$dds --write_buffer_size=$wbs --target_file_size_base=$mb --max_write_buffer_number=$wbn --max_background_compactions=$mbc --level0_file_num_compaction_trigger=$ctrig --level0_slowdown_writes_trigger=$delay --level0_stop_writes_trigger=$stop --num_levels=$levels --delete_obsolete_files_period_micros=$del --min_level_to_compress=$mcz --max_grandparent_overlap_factor=$overlap --stats_per_interval=1 --max_bytes_for_level_base=$bpl --use_existing_db=0 --key_id_range=$kir`

Reviewers: andrewkr, sdong

Reviewed By: sdong

Subscribers: lgalanis, andrewkr, dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D60651
main
omegaga 9 years ago
parent 9ae92f50b2
commit 64046e581c
  1. 229
      tools/db_bench_tool.cc

@ -110,7 +110,8 @@ DEFINE_string(benchmarks,
"acquireload,"
"fillseekseq,"
"randomtransaction,"
"randomreplacekeys",
"randomreplacekeys,"
"timeseries",
"Comma-separated list of operations to run in the specified"
" order. Available benchmarks:\n"
@ -163,6 +164,8 @@ DEFINE_string(benchmarks,
"verify correctness\n"
"\trandomreplacekeys -- randomly replaces N keys by deleting "
"the old version and putting the new version\n\n"
"\ttimeseries -- 1 writer generates time series data "
"and multiple readers doing random reads on id\n\n"
"Meta operations:\n"
"\tcompact -- Compact the entire DB\n"
"\tstats -- Print DB stats\n"
@ -778,6 +781,24 @@ DEFINE_double(stddev, 2000.0,
"Standard deviation of normal distribution used for picking keys"
" (used in RandomReplaceKeys only).");
DEFINE_int32(key_id_range, 100000,
"Range of possible value of key id (used in TimeSeries only).");
DEFINE_string(expire_style, "none",
"Style to remove expired time entries. Can be one of the options "
"below: none (do not expired data), compaction_filter (use a "
"compaction filter to remove expired data), delete (seek IDs and "
"remove expired data) (used in TimeSeries only).");
DEFINE_uint64(
time_range, 100000,
"Range of timestamp that store in the database (used in TimeSeries"
" only).");
DEFINE_int32(num_deletion_threads, 1,
"Number of threads to do deletion (used in TimeSeries and delete "
"expire_style only).");
DEFINE_int32(max_successive_merges, 0, "Maximum number of successive merge"
" operations on a key in the memtable");
@ -1527,6 +1548,16 @@ class Stats {
}
};
class TimestampEmulator {
private:
std::atomic<uint64_t> timestamp_;
public:
TimestampEmulator() : timestamp_(0) {}
uint64_t Get() const { return timestamp_.load(); }
void Inc() { timestamp_++; }
};
// State shared by all concurrent executions of the same benchmark.
struct SharedState {
port::Mutex mu;
@ -1806,6 +1837,38 @@ class Benchmark {
#endif
}
static bool KeyExpired(const TimestampEmulator* timestamp_emulator,
const Slice& key) {
const char* pos = key.data();
pos += 8;
uint64_t timestamp = 0;
if (port::kLittleEndian) {
int bytes_to_fill = 8;
for (int i = 0; i < bytes_to_fill; ++i) {
timestamp |= (static_cast<uint64_t>(static_cast<unsigned char>(pos[i]))
<< ((bytes_to_fill - i - 1) << 3));
}
} else {
memcpy(&timestamp, pos, sizeof(timestamp));
}
return timestamp_emulator->Get() - timestamp > FLAGS_time_range;
}
class ExpiredTimeFilter : public CompactionFilter {
public:
explicit ExpiredTimeFilter(
const std::shared_ptr<TimestampEmulator>& timestamp_emulator)
: timestamp_emulator_(timestamp_emulator) {}
bool Filter(int level, const Slice& key, const Slice& existing_value,
std::string* new_value, bool* value_changed) const override {
return KeyExpired(timestamp_emulator_.get(), key);
}
const char* Name() const override { return "ExpiredTimeFilter"; }
private:
std::shared_ptr<TimestampEmulator> timestamp_emulator_;
};
public:
Benchmark()
: cache_(
@ -1905,7 +1968,7 @@ class Benchmark {
// Generate key according to the given specification and random number.
// The resulting key will have the following format (if keys_per_prefix_
// is positive), extra trailing bytes are either cut off or paddd with '0'.
// is positive), extra trailing bytes are either cut off or padded with '0'.
// The prefix value is derived from key value.
// ----------------------------
// | prefix 00000 | key 00000 |
@ -1962,6 +2025,7 @@ class Benchmark {
PrintHeader();
std::stringstream benchmark_stream(FLAGS_benchmarks);
std::string name;
std::unique_ptr<ExpiredTimeFilter> filter;
while (std::getline(benchmark_stream, name, ',')) {
// Sanitize parameters
num_ = FLAGS_num;
@ -2104,6 +2168,15 @@ class Benchmark {
} else if (name == "randomreplacekeys") {
fresh_db = true;
method = &Benchmark::RandomReplaceKeys;
} else if (name == "timeseries") {
timestamp_emulator_.reset(new TimestampEmulator());
if (FLAGS_expire_style == "compaction_filter") {
filter.reset(new ExpiredTimeFilter(timestamp_emulator_));
fprintf(stdout, "Compaction filter is used to remove expired data");
open_options_.compaction_filter = filter.get();
}
fresh_db = true;
method = &Benchmark::TimeSeries;
} else if (name == "stats") {
PrintStats("rocksdb.stats");
} else if (name == "levelstats") {
@ -2153,6 +2226,7 @@ class Benchmark {
private:
std::unique_ptr<Env> flashcache_aware_env_;
std::shared_ptr<TimestampEmulator> timestamp_emulator_;
struct ThreadArg {
Benchmark* bm;
@ -3403,9 +3477,9 @@ class Benchmark {
Status s;
if (write_merge == kWrite) {
s = db->Put(write_options_, key, gen.Generate(value_size_));
s = db->Put(write_options_, key, gen.Generate(value_size_));
} else {
s = db->Merge(write_options_, key, gen.Generate(value_size_));
s = db->Merge(write_options_, key, gen.Generate(value_size_));
}
if (!s.ok()) {
@ -4025,6 +4099,153 @@ class Benchmark {
thread->stats.AddMessage(msg);
}
void TimeSeriesReadOrDelete(ThreadState* thread, bool do_deletion) {
ReadOptions options(FLAGS_verify_checksum, true);
int64_t read = 0;
int64_t found = 0;
int64_t bytes = 0;
Iterator* iter = nullptr;
// Only work on single database
assert(db_.db != nullptr);
iter = db_.db->NewIterator(options);
std::unique_ptr<const char[]> key_guard;
Slice key = AllocateKey(&key_guard);
char value_buffer[256];
while (true) {
{
MutexLock l(&thread->shared->mu);
if (thread->shared->num_done >= 1) {
// Write thread have finished
break;
}
}
if (!FLAGS_use_tailing_iterator) {
delete iter;
iter = db_.db->NewIterator(options);
}
// Pick a Iterator to use
int64_t key_id = thread->rand.Next() % FLAGS_key_id_range;
GenerateKeyFromInt(key_id, FLAGS_num, &key);
// Reset last 8 bytes to 0
char* start = const_cast<char*>(key.data());
start += key.size() - 8;
memset(start, 0, 8);
++read;
bool key_found = false;
// Seek the prefix
for (iter->Seek(key); iter->Valid() && iter->key().starts_with(key);
iter->Next()) {
key_found = true;
// Copy out iterator's value to make sure we read them.
if (do_deletion) {
bytes += iter->key().size();
if (KeyExpired(timestamp_emulator_.get(), iter->key())) {
thread->stats.FinishedOps(&db_, db_.db, 1, kDelete);
db_.db->Delete(write_options_, iter->key());
} else {
break;
}
} else {
bytes += iter->key().size() + iter->value().size();
thread->stats.FinishedOps(&db_, db_.db, 1, kRead);
Slice value = iter->value();
memcpy(value_buffer, value.data(),
std::min(value.size(), sizeof(value_buffer)));
assert(iter->status().ok());
}
}
found += key_found;
}
delete iter;
char msg[100];
snprintf(msg, sizeof(msg), "(%" PRIu64 " of %" PRIu64 " found)", found,
read);
thread->stats.AddBytes(bytes);
thread->stats.AddMessage(msg);
if (FLAGS_perf_level > rocksdb::PerfLevel::kDisable) {
thread->stats.AddMessage(perf_context.ToString());
}
}
void TimeSeriesWrite(ThreadState* thread) {
// Special thread that keeps writing until other threads are done.
RandomGenerator gen;
int64_t bytes = 0;
// Don't merge stats from this thread with the readers.
thread->stats.SetExcludeFromMerge();
std::unique_ptr<RateLimiter> write_rate_limiter;
if (FLAGS_benchmark_write_rate_limit > 0) {
write_rate_limiter.reset(
NewGenericRateLimiter(FLAGS_benchmark_write_rate_limit));
}
std::unique_ptr<const char[]> key_guard;
Slice key = AllocateKey(&key_guard);
Duration duration(FLAGS_duration, writes_);
while (!duration.Done(1)) {
DB* db = SelectDB(thread);
uint64_t key_id = thread->rand.Next() % FLAGS_key_id_range;
// Write key id
GenerateKeyFromInt(key_id, FLAGS_num, &key);
// Write timestamp
char* start = const_cast<char*>(key.data());
char* pos = start + 8;
int bytes_to_fill =
std::min(key_size_ - static_cast<int>(pos - start), 8);
uint64_t timestamp_value = timestamp_emulator_->Get();
if (port::kLittleEndian) {
for (int i = 0; i < bytes_to_fill; ++i) {
pos[i] = (timestamp_value >> ((bytes_to_fill - i - 1) << 3)) & 0xFF;
}
} else {
memcpy(pos, static_cast<void*>(&timestamp_value), bytes_to_fill);
}
timestamp_emulator_->Inc();
Status s;
s = db->Put(write_options_, key, gen.Generate(value_size_));
if (!s.ok()) {
fprintf(stderr, "put error: %s\n", s.ToString().c_str());
exit(1);
}
bytes = key.size() + value_size_;
thread->stats.FinishedOps(&db_, db_.db, 1, kWrite);
thread->stats.AddBytes(bytes);
if (FLAGS_benchmark_write_rate_limit > 0) {
write_rate_limiter->Request(
entries_per_batch_ * (value_size_ + key_size_), Env::IO_HIGH);
}
}
}
void TimeSeries(ThreadState* thread) {
if (thread->tid > 0) {
bool do_deletion = FLAGS_expire_style == "delete" &&
thread->tid <= FLAGS_num_deletion_threads;
TimeSeriesReadOrDelete(thread, do_deletion);
} else {
TimeSeriesWrite(thread);
thread->stats.Stop();
thread->stats.Report("timeseries write");
}
}
void Compact(ThreadState* thread) {
DB* db = SelectDB(thread);
db->CompactRange(CompactRangeOptions(), nullptr, nullptr);

Loading…
Cancel
Save