Introducing deletes for stress test

Summary: Stress test modified to do deletes and later verify them

Test Plan: running the test: db_stress

Reviewers: dhruba, heyongqiang, asad, sheki, MarkCallaghan

Reviewed By: dhruba

Differential Revision: https://reviews.facebook.net/D6567
main
amayank 12 years ago
parent 391885c4e4
commit 9e97bfdcde
  1. 90
      tools/db_stress.cc

@ -98,8 +98,11 @@ static int FLAGS_level0_stop_writes_trigger = 12;
// Number of files in level-0 that will slow down writes. // Number of files in level-0 that will slow down writes.
static int FLAGS_level0_slowdown_writes_trigger = 8; static int FLAGS_level0_slowdown_writes_trigger = 8;
// Ratio of reads to writes (expressed as a percentage) // Ratio of reads to total workload (expressed as a percentage)
static unsigned int FLAGS_readwritepercent = 10; static unsigned int FLAGS_readpercent = 10;
// Ratio of deletes to total workload (expressed as a percentage)
static unsigned int FLAGS_delpercent = 30;
// Option to disable compation triggered by read. // Option to disable compation triggered by read.
static int FLAGS_disable_seek_compaction = false; static int FLAGS_disable_seek_compaction = false;
@ -133,6 +136,7 @@ class Stats {
double seconds_; double seconds_;
long done_; long done_;
long writes_; long writes_;
long deletes_;
int next_report_; int next_report_;
size_t bytes_; size_t bytes_;
double last_op_finish_; double last_op_finish_;
@ -146,6 +150,7 @@ class Stats {
hist_.Clear(); hist_.Clear();
done_ = 0; done_ = 0;
writes_ = 0; writes_ = 0;
deletes_ = 0;
bytes_ = 0; bytes_ = 0;
seconds_ = 0; seconds_ = 0;
start_ = FLAGS_env->NowMicros(); start_ = FLAGS_env->NowMicros();
@ -157,6 +162,7 @@ class Stats {
hist_.Merge(other.hist_); hist_.Merge(other.hist_);
done_ += other.done_; done_ += other.done_;
writes_ += other.writes_; writes_ += other.writes_;
deletes_ += other.deletes_;
bytes_ += other.bytes_; bytes_ += other.bytes_;
seconds_ += other.seconds_; seconds_ += other.seconds_;
if (other.start_ < start_) start_ = other.start_; if (other.start_ < start_) start_ = other.start_;
@ -199,6 +205,10 @@ class Stats {
bytes_ += n; bytes_ += n;
} }
void AddOneDelete() {
deletes_ ++;
}
void Report(const char* name) { void Report(const char* name) {
std::string extra; std::string extra;
if (bytes_ < 1 || done_ < 1) { if (bytes_ < 1 || done_ < 1) {
@ -216,6 +226,7 @@ class Stats {
seconds_ * 1e6 / done_, (long)throughput); seconds_ * 1e6 / done_, (long)throughput);
fprintf(stdout, "%-12s: Wrote %.2f MB (%.2f MB/sec) (%ld%% of %ld ops)\n", fprintf(stdout, "%-12s: Wrote %.2f MB (%.2f MB/sec) (%ld%% of %ld ops)\n",
"", bytes_mb, rate, (100*writes_)/done_, done_); "", bytes_mb, rate, (100*writes_)/done_, done_);
fprintf(stdout, "%-12s: Deleted %ld times\n", "", deletes_);
if (FLAGS_histogram) { if (FLAGS_histogram) {
fprintf(stdout, "Microseconds per op:\n%s\n", hist_.ToString().c_str()); fprintf(stdout, "Microseconds per op:\n%s\n", hist_.ToString().c_str());
@ -282,7 +293,7 @@ class SharedState {
num_initialized_++; num_initialized_++;
} }
void IncPopulated() { void IncOperated() {
num_populated_++; num_populated_++;
} }
@ -294,7 +305,7 @@ class SharedState {
return num_initialized_ >= num_threads_; return num_initialized_ >= num_threads_;
} }
bool AllPopulated() const { bool AllOperated() const {
return num_populated_ >= num_threads_; return num_populated_ >= num_threads_;
} }
@ -330,6 +341,10 @@ class SharedState {
return values_[key]; return values_[key];
} }
void Delete(long key) const {
values_[key] = SENTINEL;
}
uint32_t GetSeed() const { uint32_t GetSeed() const {
return seed_; return seed_;
} }
@ -405,8 +420,8 @@ class StressTest {
FLAGS_env->StartThread(ThreadBody, threads[i]); FLAGS_env->StartThread(ThreadBody, threads[i]);
} }
// Each thread goes through the following states: // Each thread goes through the following states:
// initializing -> wait for others to init -> populate // initializing -> wait for others to init -> read/populate/depopulate
// wait for others to populate -> verify -> done // wait for others to operate -> verify -> done
{ {
MutexLock l(shared.GetMutex()); MutexLock l(shared.GetMutex());
@ -414,10 +429,11 @@ class StressTest {
shared.GetCondVar()->Wait(); shared.GetCondVar()->Wait();
} }
fprintf(stdout, "Starting to populate db\n"); fprintf(stdout, "Starting database operations\n");
shared.SetStart(); shared.SetStart();
shared.GetCondVar()->SignalAll(); shared.GetCondVar()->SignalAll();
while (!shared.AllPopulated()) { while (!shared.AllOperated()) {
shared.GetCondVar()->Wait(); shared.GetCondVar()->Wait();
} }
@ -438,7 +454,7 @@ class StressTest {
delete threads[i]; delete threads[i];
threads[i] = NULL; threads[i] = NULL;
} }
fprintf(stdout, "Verification successfull\n"); fprintf(stdout, "Verification successful\n");
PrintStatistics(); PrintStatistics();
} }
@ -458,13 +474,12 @@ class StressTest {
shared->GetCondVar()->Wait(); shared->GetCondVar()->Wait();
} }
} }
thread->shared->GetStressTest()->OperateDb(thread);
thread->shared->GetStressTest()->PopulateDb(thread);
{ {
MutexLock l(shared->GetMutex()); MutexLock l(shared->GetMutex());
shared->IncPopulated(); shared->IncOperated();
if (shared->AllPopulated()) { if (shared->AllOperated()) {
shared->GetCondVar()->SignalAll(); shared->GetCondVar()->SignalAll();
} }
while (!shared->VerifyStarted()) { while (!shared->VerifyStarted()) {
@ -484,10 +499,10 @@ class StressTest {
} }
void PopulateDb(ThreadState* thread) { void OperateDb(ThreadState* thread) {
ReadOptions read_opts(FLAGS_verify_checksum, true); ReadOptions read_opts(FLAGS_verify_checksum, true);
WriteOptions write_opts; WriteOptions write_opts;
char value[100], prev_value[100]; char value[100];
long max_key = thread->shared->GetMaxKey(); long max_key = thread->shared->GetMaxKey();
std::string from_db; std::string from_db;
if (FLAGS_sync) { if (FLAGS_sync) {
@ -496,21 +511,31 @@ class StressTest {
write_opts.disableWAL = FLAGS_disable_wal; write_opts.disableWAL = FLAGS_disable_wal;
thread->stats.Start(); thread->stats.Start();
for (long i=0; i < FLAGS_ops_per_thread; i++) { for (long i = 0; i < FLAGS_ops_per_thread; i++) {
long rand_key = thread->rand.Next() % max_key; long rand_key = thread->rand.Next() % max_key;
Slice key((char*)&rand_key, sizeof(rand_key)); Slice key((char*)&rand_key, sizeof(rand_key));
if (FLAGS_readwritepercent > thread->rand.Uniform(100)) { //Read:10%;Delete:30%;Write:60%
// introduce some read load. unsigned int probability_operation = thread->rand.Uniform(100);
if (probability_operation < FLAGS_readpercent) {
// read load
db_->Get(read_opts, key, &from_db); db_->Get(read_opts, key, &from_db);
} else if (probability_operation < FLAGS_delpercent + FLAGS_readpercent) {
//introduce delete load
{
MutexLock l(thread->shared->GetMutexForKey(rand_key));
thread->shared->Delete(rand_key);
db_->Delete(write_opts, key);
}
thread->stats.AddOneDelete();
} else { } else {
// write load
uint32_t value_base = thread->rand.Next(); uint32_t value_base = thread->rand.Next();
size_t sz = GenerateValue(value_base, value, sizeof(value)); size_t sz = GenerateValue(value_base, value, sizeof(value));
Slice v(value, sz); Slice v(value, sz);
{ {
MutexLock l(thread->shared->GetMutexForKey(rand_key)); MutexLock l(thread->shared->GetMutexForKey(rand_key));
if (FLAGS_verify_before_write) { if (FLAGS_verify_before_write) {
VerifyValue(rand_key, read_opts, *(thread->shared), prev_value, VerifyValue(rand_key, read_opts, *(thread->shared), &from_db, true);
sizeof(prev_value), &from_db, true);
} }
thread->shared->Put(rand_key, value_base); thread->shared->Put(rand_key, value_base);
db_->Put(write_opts, key, v); db_->Put(write_opts, key, v);
@ -525,12 +550,11 @@ class StressTest {
void VerifyDb(const SharedState &shared, long start) const { void VerifyDb(const SharedState &shared, long start) const {
ReadOptions options(FLAGS_verify_checksum, true); ReadOptions options(FLAGS_verify_checksum, true);
char value[100];
long max_key = shared.GetMaxKey(); long max_key = shared.GetMaxKey();
long step = shared.GetNumThreads(); long step = shared.GetNumThreads();
for (long i = start; i < max_key; i+= step) { for (long i = start; i < max_key; i+= step) {
std::string from_db; std::string from_db;
VerifyValue(i, options, shared, value, sizeof(value), &from_db); VerifyValue(i, options, shared, &from_db, true);
if (from_db.length()) { if (from_db.length()) {
PrintKeyValue(i, from_db.data(), from_db.length()); PrintKeyValue(i, from_db.data(), from_db.length());
} }
@ -538,15 +562,16 @@ class StressTest {
} }
void VerificationAbort(std::string msg, long key) const { void VerificationAbort(std::string msg, long key) const {
fprintf(stderr, "Verification failed for key %ld: %s\n", fprintf(stderr, "Verification failed for key %ld: %s\n",
key, msg.c_str()); key, msg.c_str());
exit(1); exit(1);
} }
void VerifyValue(long key, const ReadOptions &opts, const SharedState &shared, void VerifyValue(long key, const ReadOptions &opts, const SharedState &shared,
char *value, size_t value_sz,
std::string *value_from_db, bool strict=false) const { std::string *value_from_db, bool strict=false) const {
Slice k((char*)&key, sizeof(key)); Slice k((char*)&key, sizeof(key));
char value[100];
size_t value_sz = 0;
uint32_t value_base = shared.Get(key); uint32_t value_base = shared.Get(key);
if (value_base == SharedState::SENTINEL && !strict) { if (value_base == SharedState::SENTINEL && !strict) {
return; return;
@ -594,7 +619,8 @@ class StressTest {
kMajorVersion, kMinorVersion); kMajorVersion, kMinorVersion);
fprintf(stdout, "Number of threads : %d\n", FLAGS_threads); fprintf(stdout, "Number of threads : %d\n", FLAGS_threads);
fprintf(stdout, "Ops per thread : %d\n", FLAGS_ops_per_thread); fprintf(stdout, "Ops per thread : %d\n", FLAGS_ops_per_thread);
fprintf(stdout, "Read percentage : %d\n", FLAGS_readwritepercent); fprintf(stdout, "Read percentage : %d\n", FLAGS_readpercent);
fprintf(stdout, "Delete percentage : %d\n", FLAGS_delpercent);
fprintf(stdout, "Max key : %ld\n", FLAGS_max_key); fprintf(stdout, "Max key : %ld\n", FLAGS_max_key);
fprintf(stdout, "Num keys per lock : %d\n", fprintf(stdout, "Num keys per lock : %d\n",
1 << FLAGS_log2_keys_per_lock); 1 << FLAGS_log2_keys_per_lock);
@ -733,9 +759,12 @@ int main(int argc, char** argv) {
} else if (sscanf(argv[i], "--sync=%d%c", &n, &junk) == 1 && } else if (sscanf(argv[i], "--sync=%d%c", &n, &junk) == 1 &&
(n == 0 || n == 1)) { (n == 0 || n == 1)) {
FLAGS_sync = n; FLAGS_sync = n;
} else if (sscanf(argv[i], "--readwritepercent=%d%c", &n, &junk) == 1 && } else if (sscanf(argv[i], "--readpercent=%d%c", &n, &junk) == 1 &&
(n > 0 || n < 100)) { (n >= 0 && n <= 100)) {
FLAGS_readwritepercent = n; FLAGS_readpercent = n;
} else if (sscanf(argv[i], "--delpercent=%d%c", &n, &junk) == 1 &&
(n >= 0 && n <= 100)) {
FLAGS_delpercent = n;
} else if (sscanf(argv[i], "--disable_data_sync=%d%c", &n, &junk) == 1 && } else if (sscanf(argv[i], "--disable_data_sync=%d%c", &n, &junk) == 1 &&
(n == 0 || n == 1)) { (n == 0 || n == 1)) {
FLAGS_disable_data_sync = n; FLAGS_disable_data_sync = n;
@ -787,6 +816,11 @@ int main(int argc, char** argv) {
} }
} }
if ((FLAGS_readpercent + FLAGS_delpercent) > 100) {
fprintf(stderr, "Error: Read + Delete percents > 100!\n");
exit(1);
}
// Choose a location for the test database if none given with --db=<path> // Choose a location for the test database if none given with --db=<path>
if (FLAGS_db == NULL) { if (FLAGS_db == NULL) {
leveldb::Env::Default()->GetTestDirectory(&default_db_path); leveldb::Env::Default()->GetTestDirectory(&default_db_path);

Loading…
Cancel
Save