diff --git a/db/db_bench.cc b/db/db_bench.cc index e203f6b68..0ea723d88 100644 --- a/db/db_bench.cc +++ b/db/db_bench.cc @@ -157,6 +157,9 @@ static bool ValidateKeySize(const char* flagname, int32_t value) { DEFINE_int32(key_size, 16, "size of each key"); +DEFINE_int32(num_multi_db, 0, + "Number of DBs used in the benchmark. 0 means single DB."); + DEFINE_double(compression_ratio, 0.5, "Arrange to generate values that shrink" " to this fraction of their original size after compression"); @@ -814,6 +817,7 @@ class Benchmark { const FilterPolicy* filter_policy_; const SliceTransform* prefix_extractor_; DB* db_; + std::vector multi_dbs_; int64_t num_; int value_size_; int key_size_; @@ -1096,6 +1100,10 @@ class Benchmark { } } + std::string GetDbNameForMultiple(std::string base_name, size_t id) { + return base_name + std::to_string(id); + } + void Run() { PrintHeader(); Open(); @@ -1245,11 +1253,18 @@ class Benchmark { name.ToString().c_str()); method = nullptr; } else { - delete db_; - db_ = nullptr; - DestroyDB(FLAGS_db, Options()); - Open(); + if (db_ != nullptr) { + delete db_; + db_ = nullptr; + DestroyDB(FLAGS_db, Options()); + } + for (size_t i = 0; i < multi_dbs_.size(); i++) { + delete multi_dbs_[i]; + DestroyDB(GetDbNameForMultiple(FLAGS_db, i), Options()); + } + multi_dbs_.clear(); } + Open(); } if (method != nullptr) { @@ -1666,19 +1681,32 @@ class Benchmark { FLAGS_universal_compression_size_percent; } + if (FLAGS_num_multi_db <= 1) { + OpenDb(options, FLAGS_db, &db_); + } else { + multi_dbs_.clear(); + for (size_t i = 0; i < FLAGS_num_multi_db; i++) { + DB* db; + OpenDb(options, GetDbNameForMultiple(FLAGS_db, i), &db); + multi_dbs_.push_back(db); + } + } + if (FLAGS_min_level_to_compress >= 0) { + options.compression_per_level.clear(); + } + } + + void OpenDb(Options options, std::string db_name, DB** db) { Status s; if(FLAGS_readonly) { - s = DB::OpenForReadOnly(options, FLAGS_db, &db_); + s = DB::OpenForReadOnly(options, db_name, db); } else { - s = DB::Open(options, FLAGS_db, &db_); + s = DB::Open(options, db_name, db); } if (!s.ok()) { fprintf(stderr, "open error: %s\n", s.ToString().c_str()); exit(1); } - if (FLAGS_min_level_to_compress >= 0) { - options.compression_per_level.clear(); - } } enum WriteMode { @@ -1740,13 +1768,27 @@ class Benchmark { std::vector values_; }; + DB* SelectDB(ThreadState* thread) { + if (db_ != nullptr) { + return db_; + } else { + return multi_dbs_[thread->rand.Next() % multi_dbs_.size()]; + } + } void DoWrite(ThreadState* thread, WriteMode write_mode) { const int test_duration = write_mode == RANDOM ? FLAGS_duration : 0; const int64_t num_ops = writes_ == 0 ? num_ : writes_; - Duration duration(test_duration, num_ops); - KeyGenerator key_gen(&(thread->rand), write_mode, num_ops); + size_t num_key_gens = 1; + if (db_ == nullptr) { + num_key_gens = multi_dbs_.size(); + } + std::vector> key_gens(num_key_gens); + Duration duration(test_duration, num_ops * num_key_gens); + for (size_t i = 0; i < num_key_gens; i++) { + key_gens[i].reset(new KeyGenerator(&(thread->rand), write_mode, num_ops)); + } if (num_ != FLAGS_num) { char msg[100]; @@ -1762,14 +1804,20 @@ class Benchmark { Slice key = AllocateKey(); std::unique_ptr key_guard(key.data()); while (!duration.Done(entries_per_batch_)) { + size_t id = 0; + DB* db_to_write = db_; + if (db_to_write == nullptr) { + id = thread->rand.Next() % num_key_gens; + db_to_write = multi_dbs_[id]; + } batch.Clear(); for (int64_t j = 0; j < entries_per_batch_; j++) { - GenerateKeyFromInt(key_gen.Next(), FLAGS_num, &key); + GenerateKeyFromInt(key_gens[id]->Next(), FLAGS_num, &key); batch.Put(key, gen.Generate(value_size_)); bytes += value_size_ + key_size_; - thread->stats.FinishedSingleOp(db_); + thread->stats.FinishedSingleOp(db_to_write); } - s = db_->Write(write_options_, &batch); + s = db_to_write->Write(write_options_, &batch); if (!s.ok()) { fprintf(stderr, "put error: %s\n", s.ToString().c_str()); exit(1); @@ -1779,12 +1827,22 @@ class Benchmark { } void ReadSequential(ThreadState* thread) { - Iterator* iter = db_->NewIterator(ReadOptions(FLAGS_verify_checksum, true)); + if (db_ != nullptr) { + ReadSequential(thread, db_); + } else { + for (DB* db : multi_dbs_) { + ReadSequential(thread, db); + } + } + } + + void ReadSequential(ThreadState* thread, DB* db) { + Iterator* iter = db->NewIterator(ReadOptions(FLAGS_verify_checksum, true)); int64_t i = 0; int64_t bytes = 0; for (iter->SeekToFirst(); i < reads_ && iter->Valid(); iter->Next()) { bytes += iter->key().size() + iter->value().size(); - thread->stats.FinishedSingleOp(db_); + thread->stats.FinishedSingleOp(db); ++i; } delete iter; @@ -1792,7 +1850,17 @@ class Benchmark { } void ReadReverse(ThreadState* thread) { - Iterator* iter = db_->NewIterator(ReadOptions(FLAGS_verify_checksum, true)); + if (db_ != nullptr) { + ReadReverse(thread, db_); + } else { + for (DB* db : multi_dbs_) { + ReadReverse(thread, db); + } + } + } + + void ReadReverse(ThreadState* thread, DB* db) { + Iterator* iter = db->NewIterator(ReadOptions(FLAGS_verify_checksum, true)); int64_t i = 0; int64_t bytes = 0; for (iter->SeekToLast(); i < reads_ && iter->Valid(); iter->Prev()) { @@ -1814,9 +1882,10 @@ class Benchmark { Duration duration(FLAGS_duration, reads_); while (!duration.Done(1)) { + DB* db = SelectDB(thread); GenerateKeyFromInt(thread->rand.Next() % FLAGS_num, FLAGS_num, &key); read++; - if (db_->Get(options, key, &value).ok()) { + if (db->Get(options, key, &value).ok()) { found++; } thread->stats.FinishedSingleOp(db_); @@ -1847,11 +1916,12 @@ class Benchmark { Duration duration(FLAGS_duration, reads_); while (!duration.Done(1)) { + DB* db = SelectDB(thread); for (int64_t i = 0; i < entries_per_batch_; ++i) { GenerateKeyFromInt(thread->rand.Next() % FLAGS_num, FLAGS_num, &keys[i]); } - std::vector statuses = db_->MultiGet(options, keys, &values); + std::vector statuses = db->MultiGet(options, keys, &values); assert(statuses.size() == entries_per_batch_); read += entries_per_batch_; @@ -1876,9 +1946,10 @@ class Benchmark { ReadOptions options(FLAGS_verify_checksum, true); options.prefix_seek = (FLAGS_prefix_size > 0); while (!duration.Done(1)) { - Iterator* iter = db_->NewIterator(options); + DB* db = SelectDB(thread); + Iterator* iter = db->NewIterator(options); delete iter; - thread->stats.FinishedSingleOp(db_); + thread->stats.FinishedSingleOp(db); } } @@ -1896,21 +1967,40 @@ class Benchmark { ReadOptions options(FLAGS_verify_checksum, true); options.tailing = FLAGS_use_tailing_iterator; options.prefix_seek = (FLAGS_prefix_size > 0); - auto* iter = db_->NewIterator(options); + + Iterator* single_iter = nullptr; + std::vector multi_iters; + if (db_ != nullptr) { + single_iter = db_->NewIterator(options); + } else { + for (DB* db : multi_dbs_) { + multi_iters.push_back(db->NewIterator(options)); + } + } + Slice key = AllocateKey(); std::unique_ptr key_guard(key.data()); Duration duration(FLAGS_duration, reads_); while (!duration.Done(1)) { + // Pick a Iterator to use + Iterator* iter_to_use = single_iter; + if (single_iter == nullptr) { + iter_to_use = multi_iters[thread->rand.Next() % multi_iters.size()]; + } + GenerateKeyFromInt(thread->rand.Next() % FLAGS_num, FLAGS_num, &key); - iter->Seek(key); + iter_to_use->Seek(key); read++; - if (iter->Valid() && iter->key().compare(key) == 0) { + if (iter_to_use->Valid() && iter_to_use->key().compare(key) == 0) { found++; } thread->stats.FinishedSingleOp(db_); } - delete iter; + delete single_iter; + for (auto iter : multi_iters) { + delete iter; + } char msg[100]; snprintf(msg, sizeof(msg), "(%" PRIu64 " of %" PRIu64 " found)", @@ -1934,14 +2024,15 @@ class Benchmark { std::unique_ptr key_guard(key.data()); while (!duration.Done(entries_per_batch_)) { + DB* db = SelectDB(thread); batch.Clear(); for (int64_t j = 0; j < entries_per_batch_; ++j) { const int64_t k = seq ? i + j : (thread->rand.Next() % FLAGS_num); GenerateKeyFromInt(k, FLAGS_num, &key); batch.Delete(key); - thread->stats.FinishedSingleOp(db_); + thread->stats.FinishedSingleOp(db); } - auto s = db_->Write(write_options_, &batch); + auto s = db->Write(write_options_, &batch); if (!s.ok()) { fprintf(stderr, "del error: %s\n", s.ToString().c_str()); exit(1); @@ -1986,6 +2077,7 @@ class Benchmark { std::unique_ptr key_guard(key.data()); while (true) { + DB* db = SelectDB(thread); { MutexLock l(&thread->shared->mu); if (thread->shared->num_done + 1 >= thread->shared->num_initialized) { @@ -1995,7 +2087,7 @@ class Benchmark { } GenerateKeyFromInt(thread->rand.Next() % FLAGS_num, FLAGS_num, &key); - Status s = db_->Put(write_options_, key, gen.Generate(value_size_)); + Status s = db->Put(write_options_, key, gen.Generate(value_size_)); if (!s.ok()) { fprintf(stderr, "put error: %s\n", s.ToString().c_str()); exit(1); @@ -2020,8 +2112,8 @@ class Benchmark { // Given a key K and value V, this puts (K+"0", V), (K+"1", V), (K+"2", V) // in DB atomically i.e in a single batch. Also refer GetMany. - Status PutMany(const WriteOptions& writeoptions, - const Slice& key, const Slice& value) { + Status PutMany(DB* db, const WriteOptions& writeoptions, const Slice& key, + const Slice& value) { std::string suffixes[3] = {"2", "1", "0"}; std::string keys[3]; @@ -2032,15 +2124,15 @@ class Benchmark { batch.Put(keys[i], value); } - s = db_->Write(writeoptions, &batch); + s = db->Write(writeoptions, &batch); return s; } // Given a key K, this deletes (K+"0", V), (K+"1", V), (K+"2", V) // in DB atomically i.e in a single batch. Also refer GetMany. - Status DeleteMany(const WriteOptions& writeoptions, - const Slice& key) { + Status DeleteMany(DB* db, const WriteOptions& writeoptions, + const Slice& key) { std::string suffixes[3] = {"1", "2", "0"}; std::string keys[3]; @@ -2051,26 +2143,26 @@ class Benchmark { batch.Delete(keys[i]); } - s = db_->Write(writeoptions, &batch); + s = db->Write(writeoptions, &batch); return s; } // Given a key K and value V, this gets values for K+"0", K+"1" and K+"2" // in the same snapshot, and verifies that all the values are identical. // ASSUMES that PutMany was used to put (K, V) into the DB. - Status GetMany(const ReadOptions& readoptions, - const Slice& key, std::string* value) { + Status GetMany(DB* db, const ReadOptions& readoptions, const Slice& key, + std::string* value) { std::string suffixes[3] = {"0", "1", "2"}; std::string keys[3]; Slice key_slices[3]; std::string values[3]; ReadOptions readoptionscopy = readoptions; - readoptionscopy.snapshot = db_->GetSnapshot(); + readoptionscopy.snapshot = db->GetSnapshot(); Status s; for (int i = 0; i < 3; i++) { keys[i] = key.ToString() + suffixes[i]; key_slices[i] = keys[i]; - s = db_->Get(readoptionscopy, key_slices[i], value); + s = db->Get(readoptionscopy, key_slices[i], value); if (!s.ok() && !s.IsNotFound()) { fprintf(stderr, "get error: %s\n", s.ToString().c_str()); values[i] = ""; @@ -2082,7 +2174,7 @@ class Benchmark { values[i] = *value; } } - db_->ReleaseSnapshot(readoptionscopy.snapshot); + db->ReleaseSnapshot(readoptionscopy.snapshot); if ((values[0] != values[1]) || (values[1] != values[2])) { fprintf(stderr, "inconsistent values for key %s: %s, %s, %s\n", @@ -2119,6 +2211,7 @@ class Benchmark { // the number of iterations is the larger of read_ or write_ for (int64_t i = 0; i < readwrites_; i++) { + DB* db = SelectDB(thread); if (get_weight == 0 && put_weight == 0 && delete_weight == 0) { // one batch completed, reinitialize for next batch get_weight = FLAGS_readwritepercent; @@ -2129,7 +2222,7 @@ class Benchmark { FLAGS_numdistinct, &key); if (get_weight > 0) { // do all the gets first - Status s = GetMany(options, key, &value); + Status s = GetMany(db, options, key, &value); if (!s.ok() && !s.IsNotFound()) { fprintf(stderr, "getmany error: %s\n", s.ToString().c_str()); // we continue after error rather than exiting so that we can @@ -2142,7 +2235,7 @@ class Benchmark { } else if (put_weight > 0) { // then do all the corresponding number of puts // for all the gets we have done earlier - Status s = PutMany(write_options_, key, gen.Generate(value_size_)); + Status s = PutMany(db, write_options_, key, gen.Generate(value_size_)); if (!s.ok()) { fprintf(stderr, "putmany error: %s\n", s.ToString().c_str()); exit(1); @@ -2150,7 +2243,7 @@ class Benchmark { put_weight--; puts_done++; } else if (delete_weight > 0) { - Status s = DeleteMany(write_options_, key); + Status s = DeleteMany(db, write_options_, key); if (!s.ok()) { fprintf(stderr, "deletemany error: %s\n", s.ToString().c_str()); exit(1); @@ -2187,6 +2280,7 @@ class Benchmark { // the number of iterations is the larger of read_ or write_ while (!duration.Done(1)) { + DB* db = SelectDB(thread); GenerateKeyFromInt(thread->rand.Next() % FLAGS_num, FLAGS_num, &key); if (get_weight == 0 && put_weight == 0) { // one batch completed, reinitialize for next batch @@ -2195,7 +2289,7 @@ class Benchmark { } if (get_weight > 0) { // do all the gets first - Status s = db_->Get(options, key, &value); + Status s = db->Get(options, key, &value); if (!s.ok() && !s.IsNotFound()) { fprintf(stderr, "get error: %s\n", s.ToString().c_str()); // we continue after error rather than exiting so that we can @@ -2208,7 +2302,7 @@ class Benchmark { } else if (put_weight > 0) { // then do all the corresponding number of puts // for all the gets we have done earlier - Status s = db_->Put(write_options_, key, gen.Generate(value_size_)); + Status s = db->Put(write_options_, key, gen.Generate(value_size_)); if (!s.ok()) { fprintf(stderr, "put error: %s\n", s.ToString().c_str()); exit(1); @@ -2216,7 +2310,7 @@ class Benchmark { put_weight--; writes_done++; } - thread->stats.FinishedSingleOp(db_); + thread->stats.FinishedSingleOp(db); } char msg[100]; snprintf(msg, sizeof(msg), "( reads:%" PRIu64 " writes:%" PRIu64 \ @@ -2238,18 +2332,19 @@ class Benchmark { std::unique_ptr key_guard(key.data()); // the number of iterations is the larger of read_ or write_ while (!duration.Done(1)) { + DB* db = SelectDB(thread); GenerateKeyFromInt(thread->rand.Next() % FLAGS_num, FLAGS_num, &key); - if (db_->Get(options, key, &value).ok()) { + if (db->Get(options, key, &value).ok()) { found++; } - Status s = db_->Put(write_options_, key, gen.Generate(value_size_)); + Status s = db->Put(write_options_, key, gen.Generate(value_size_)); if (!s.ok()) { fprintf(stderr, "put error: %s\n", s.ToString().c_str()); exit(1); } - thread->stats.FinishedSingleOp(db_); + thread->stats.FinishedSingleOp(db); } char msg[100]; snprintf(msg, sizeof(msg), @@ -2271,10 +2366,11 @@ class Benchmark { // The number of iterations is the larger of read_ or write_ Duration duration(FLAGS_duration, readwrites_); while (!duration.Done(1)) { + DB* db = SelectDB(thread); GenerateKeyFromInt(thread->rand.Next() % FLAGS_num, FLAGS_num, &key); // Get the existing value - if (db_->Get(options, key, &value).ok()) { + if (db->Get(options, key, &value).ok()) { found++; } else { // If not existing, then just assume an empty string of data @@ -2290,7 +2386,7 @@ class Benchmark { value.append(operand.data(), operand.size()); // Write back to the database - Status s = db_->Put(write_options_, key, value); + Status s = db->Put(write_options_, key, value); if (!s.ok()) { fprintf(stderr, "put error: %s\n", s.ToString().c_str()); exit(1); @@ -2322,9 +2418,10 @@ class Benchmark { // The number of iterations is the larger of read_ or write_ Duration duration(FLAGS_duration, readwrites_); while (!duration.Done(1)) { + DB* db = SelectDB(thread); GenerateKeyFromInt(thread->rand.Next() % merge_keys_, merge_keys_, &key); - Status s = db_->Merge(write_options_, key, gen.Generate(value_size_)); + Status s = db->Merge(write_options_, key, gen.Generate(value_size_)); if (!s.ok()) { fprintf(stderr, "merge error: %s\n", s.ToString().c_str()); @@ -2360,12 +2457,13 @@ class Benchmark { // the number of iterations is the larger of read_ or write_ Duration duration(FLAGS_duration, readwrites_); while (!duration.Done(1)) { + DB* db = SelectDB(thread); GenerateKeyFromInt(thread->rand.Next() % merge_keys_, merge_keys_, &key); bool do_merge = int(thread->rand.Next() % 100) < FLAGS_mergereadpercent; if (do_merge) { - Status s = db_->Merge(write_options_, key, gen.Generate(value_size_)); + Status s = db->Merge(write_options_, key, gen.Generate(value_size_)); if (!s.ok()) { fprintf(stderr, "merge error: %s\n", s.ToString().c_str()); exit(1); @@ -2374,7 +2472,7 @@ class Benchmark { num_merges++; } else { - Status s = db_->Get(options, key, &value); + Status s = db->Get(options, key, &value); if (value.length() > max_length) max_length = value.length(); @@ -2402,12 +2500,25 @@ class Benchmark { } void Compact(ThreadState* thread) { - db_->CompactRange(nullptr, nullptr); + DB* db = SelectDB(thread); + db->CompactRange(nullptr, nullptr); } void PrintStats(const char* key) { + if (db_ != nullptr) { + PrintStats(db_, key, false); + } + for (DB* db : multi_dbs_) { + PrintStats(db, key, true); + } + } + + void PrintStats(DB* db, const char* key, bool print_header = false) { + if (print_header) { + fprintf(stdout, "\n==== DB: %s ===\n", db->GetName().c_str()); + } std::string stats; - if (!db_->GetProperty(key, &stats)) { + if (!db->GetProperty(key, &stats)) { stats = "(failed)"; } fprintf(stdout, "\n%s\n", stats.c_str());