diff --git a/db/db_bench.cc b/db/db_bench.cc
index f208b8181..8b0456dfd 100644
--- a/db/db_bench.cc
+++ b/db/db_bench.cc
@@ -146,6 +146,7 @@ DEFINE_int64(merge_keys, -1,
              "Number of distinct keys to use for MergeRandom and "
              "ReadRandomMergeRandom. "
              "If negative, there will be FLAGS_num keys.");
+DEFINE_int32(num_column_families, 1, "Number of Column Families to use.");

 DEFINE_int64(reads, -1, "Number of read operations to do. "
              "If negative, do FLAGS_num reads.");
@@ -847,8 +848,15 @@ class Benchmark {
   shared_ptr<Cache> compressed_cache_;
   const FilterPolicy* filter_policy_;
   const SliceTransform* prefix_extractor_;
-  DB* db_;
-  std::vector<DB*> multi_dbs_;
+  struct DBWithColumnFamilies {
+    std::vector<ColumnFamilyHandle*> cfh;
+    DB* db;
+    DBWithColumnFamilies() : db(nullptr) {
+      cfh.clear();
+    }
+  };
+  DBWithColumnFamilies db_;
+  std::vector<DBWithColumnFamilies> multi_dbs_;
   int64_t num_;
   int value_size_;
   int key_size_;
@@ -1068,7 +1076,6 @@ class Benchmark {
                    ? NewBloomFilterPolicy(FLAGS_bloom_bits)
                    : nullptr),
     prefix_extractor_(NewFixedPrefixTransform(FLAGS_prefix_size)),
-    db_(nullptr),
     num_(FLAGS_num),
     value_size_(FLAGS_value_size),
     key_size_(FLAGS_key_size),
@@ -1099,7 +1106,7 @@ class Benchmark {
   }

   ~Benchmark() {
-    delete db_;
+    delete db_.db;
     delete filter_policy_;
     delete prefix_extractor_;
   }
@@ -1159,6 +1166,16 @@ class Benchmark {
     return base_name + std::to_string(id);
   }

+  std::string ColumnFamilyName(int i) {
+    if (i == 0) {
+      return kDefaultColumnFamilyName;
+    } else {
+      char name[100];
+      snprintf(name, sizeof(name), "column_family_name_%06d", i);
+      return std::string(name);
+    }
+  }
+
   void Run() {
     if (!SanityCheck()) {
       exit(1);
@@ -1313,13 +1330,14 @@ class Benchmark {
                   name.ToString().c_str());
           method = nullptr;
         } else {
-          if (db_ != nullptr) {
-            delete db_;
-            db_ = nullptr;
+          if (db_.db != nullptr) {
+            delete db_.db;
+            db_.db = nullptr;
+            db_.cfh.clear();
             DestroyDB(FLAGS_db, Options());
           }
           for (size_t i = 0; i < multi_dbs_.size(); i++) {
-            delete multi_dbs_[i];
+            delete multi_dbs_[i].db;
             DestroyDB(GetDbNameForMultiple(FLAGS_db, i), Options());
           }
           multi_dbs_.clear();
         }
@@ -1617,9 +1635,10 @@ class Benchmark {
   }

   void Open() {
-    assert(db_ == nullptr);
+    assert(db_.db == nullptr);
     Options options;
     options.create_if_missing = !FLAGS_use_existing_db;
+    options.create_missing_column_families = FLAGS_num_column_families > 1;
     options.block_cache = cache_;
     options.block_cache_compressed = compressed_cache_;
     if (cache_ == nullptr) {
@@ -1816,10 +1835,9 @@ class Benchmark {
       OpenDb(options, FLAGS_db, &db_);
     } else {
       multi_dbs_.clear();
+      multi_dbs_.resize(FLAGS_num_multi_db);
       for (int i = 0; i < FLAGS_num_multi_db; i++) {
-        DB* db;
-        OpenDb(options, GetDbNameForMultiple(FLAGS_db, i), &db);
-        multi_dbs_.push_back(db);
+        OpenDb(options, GetDbNameForMultiple(FLAGS_db, i), &multi_dbs_[i]);
       }
     }
     if (FLAGS_min_level_to_compress >= 0) {
@@ -1827,12 +1845,27 @@ class Benchmark {
     }
   }

-  void OpenDb(Options options, std::string db_name, DB** db) {
+  void OpenDb(const Options& options, const std::string& db_name,
+              DBWithColumnFamilies* db) {
     Status s;
-    if(FLAGS_readonly) {
-      s = DB::OpenForReadOnly(options, db_name, db);
+    // Open with column families if necessary.
+    if (FLAGS_num_column_families > 1) {
+      db->cfh.resize(FLAGS_num_column_families);
+      std::vector<ColumnFamilyDescriptor> column_families;
+      for (int i = 0; i < FLAGS_num_column_families; i++) {
+        column_families.push_back(ColumnFamilyDescriptor(
+            ColumnFamilyName(i), ColumnFamilyOptions(options)));
+      }
+      if (FLAGS_readonly) {
+        s = DB::OpenForReadOnly(options, db_name, column_families,
+                                &db->cfh, &db->db);
+      } else {
+        s = DB::Open(options, db_name, column_families, &db->cfh, &db->db);
+      }
+    } else if (FLAGS_readonly) {
+      s = DB::OpenForReadOnly(options, db_name, &db->db);
     } else {
-      s = DB::Open(options, db_name, db);
+      s = DB::Open(options, db_name, &db->db);
     }
     if (!s.ok()) {
       fprintf(stderr, "open error: %s\n", s.ToString().c_str());
       exit(1);
     }
@@ -1900,10 +1933,18 @@ class Benchmark {
   };

   DB* SelectDB(ThreadState* thread) {
-    if (db_ != nullptr) {
-      return db_;
-    } else {
-      return multi_dbs_[thread->rand.Next() % multi_dbs_.size()];
+    return SelectDBWithCfh(thread)->db;
+  }
+
+  DBWithColumnFamilies* SelectDBWithCfh(ThreadState* thread) {
+    return SelectDBWithCfh(thread->rand.Next());
+  }
+
+  DBWithColumnFamilies* SelectDBWithCfh(uint64_t rand_int) {
+    if (db_.db != nullptr) {
+      return &db_;
+    } else {
+      return &multi_dbs_[rand_int % multi_dbs_.size()];
     }
   }
@@ -1912,7 +1953,7 @@ class Benchmark {
     const int64_t num_ops = writes_ == 0 ? num_ : writes_;

     size_t num_key_gens = 1;
-    if (db_ == nullptr) {
+    if (db_.db == nullptr) {
       num_key_gens = multi_dbs_.size();
     }
     std::vector<std::unique_ptr<KeyGenerator>> key_gens(num_key_gens);
@@ -1935,20 +1976,25 @@ class Benchmark {
     Slice key = AllocateKey();
     std::unique_ptr<const char[]> key_guard(key.data());
     while (!duration.Done(entries_per_batch_)) {
-      size_t id = 0;
-      DB* db_to_write = db_;
-      if (db_to_write == nullptr) {
-        id = thread->rand.Next() % num_key_gens;
-        db_to_write = multi_dbs_[id];
-      }
+      size_t id = thread->rand.Next() % num_key_gens;
+      DBWithColumnFamilies* db_with_cfh = SelectDBWithCfh(id);
       batch.Clear();
       for (int64_t j = 0; j < entries_per_batch_; j++) {
-        GenerateKeyFromInt(key_gens[id]->Next(), FLAGS_num, &key);
-        batch.Put(key, gen.Generate(value_size_));
+        int64_t rand_num = key_gens[id]->Next();
+        GenerateKeyFromInt(rand_num, FLAGS_num, &key);
+        if (FLAGS_num_column_families <= 1) {
+          batch.Put(key, gen.Generate(value_size_));
+        } else {
+          // We use same rand_num as seed for key and column family so that we
+          // can deterministically find the cfh corresponding to a particular
+          // key while reading the key.
+          batch.Put(db_with_cfh->cfh[rand_num % db_with_cfh->cfh.size()],
+                    key, gen.Generate(value_size_));
+        }
         bytes += value_size_ + key_size_;
       }
-      s = db_to_write->Write(write_options_, &batch);
-      thread->stats.FinishedOps(db_to_write, entries_per_batch_);
+      s = db_with_cfh->db->Write(write_options_, &batch);
+      thread->stats.FinishedOps(db_with_cfh->db, entries_per_batch_);
       if (!s.ok()) {
         fprintf(stderr, "put error: %s\n", s.ToString().c_str());
         exit(1);
@@ -1958,11 +2004,11 @@ class Benchmark {
   }

   void ReadSequential(ThreadState* thread) {
-    if (db_ != nullptr) {
-      ReadSequential(thread, db_);
+    if (db_.db != nullptr) {
+      ReadSequential(thread, db_.db);
     } else {
-      for (DB* db : multi_dbs_) {
-        ReadSequential(thread, db);
+      for (const auto& db_with_cfh : multi_dbs_) {
+        ReadSequential(thread, db_with_cfh.db);
       }
     }
   }
@@ -1981,11 +2027,11 @@ class Benchmark {
   }

   void ReadReverse(ThreadState* thread) {
-    if (db_ != nullptr) {
-      ReadReverse(thread, db_);
+    if (db_.db != nullptr) {
+      ReadReverse(thread, db_.db);
     } else {
-      for (DB* db : multi_dbs_) {
-        ReadReverse(thread, db);
+      for (const auto& db_with_cfh : multi_dbs_) {
+        ReadReverse(thread, db_with_cfh.db);
       }
     }
   }
@@ -1996,7 +2042,7 @@ class Benchmark {
     int64_t bytes = 0;
     for (iter->SeekToLast(); i < reads_ && iter->Valid(); iter->Prev()) {
       bytes += iter->key().size() + iter->value().size();
-      thread->stats.FinishedOps(db_, 1);
+      thread->stats.FinishedOps(db, 1);
       ++i;
     }
     delete iter;
@@ -2013,13 +2059,24 @@ class Benchmark {
     Duration duration(FLAGS_duration, reads_);
     while (!duration.Done(1)) {
-      DB* db = SelectDB(thread);
-      GenerateKeyFromInt(thread->rand.Next() % FLAGS_num, FLAGS_num, &key);
+      DBWithColumnFamilies* db_with_cfh = SelectDBWithCfh(thread);
+      // We use same key_rand as seed for key and column family so that we can
+      // deterministically find the cfh corresponding to a particular key, as it
+      // is done in DoWrite method.
+      int64_t key_rand = thread->rand.Next() % FLAGS_num;
+      GenerateKeyFromInt(key_rand, FLAGS_num, &key);
       read++;
-      if (db->Get(options, key, &value).ok()) {
+      Status s;
+      if (FLAGS_num_column_families > 1) {
+        s = db_with_cfh->db->Get(options,
+            db_with_cfh->cfh[key_rand % db_with_cfh->cfh.size()], key, &value);
+      } else {
+        s = db_with_cfh->db->Get(options, key, &value);
+      }
+      if (s.ok()) {
         found++;
       }
-      thread->stats.FinishedOps(db_, 1);
+      thread->stats.FinishedOps(db_with_cfh->db, 1);
     }

     char msg[100];
@@ -2061,6 +2118,7 @@ class Benchmark {
           ++found;
         }
       }
+      thread->stats.FinishedOps(db, entries_per_batch_);
     }
     for (auto& k : keys) {
       delete k.data();
     }
@@ -2099,11 +2157,11 @@ class Benchmark {
     Iterator* single_iter = nullptr;
     std::vector<Iterator*> multi_iters;
-    if (db_ != nullptr) {
-      single_iter = db_->NewIterator(options);
+    if (db_.db != nullptr) {
+      single_iter = db_.db->NewIterator(options);
     } else {
-      for (DB* db : multi_dbs_) {
-        multi_iters.push_back(db->NewIterator(options));
+      for (const auto& db_with_cfh : multi_dbs_) {
+        multi_iters.push_back(db_with_cfh.db->NewIterator(options));
       }
     }
     uint64_t last_refresh = FLAGS_env->NowMicros();
@@ -2116,16 +2174,16 @@ class Benchmark {
       if (!FLAGS_use_tailing_iterator && FLAGS_iter_refresh_interval_us >= 0) {
         uint64_t now = FLAGS_env->NowMicros();
         if (now - last_refresh > (uint64_t)FLAGS_iter_refresh_interval_us) {
-          if (db_ != nullptr) {
+          if (db_.db != nullptr) {
             delete single_iter;
-            single_iter = db_->NewIterator(options);
+            single_iter = db_.db->NewIterator(options);
           } else {
             for (auto iter : multi_iters) {
               delete iter;
             }
             multi_iters.clear();
-            for (DB* db : multi_dbs_) {
-              multi_iters.push_back(db->NewIterator(options));
+            for (const auto& db_with_cfh : multi_dbs_) {
+              multi_iters.push_back(db_with_cfh.db->NewIterator(options));
             }
           }
         }
@@ -2143,7 +2201,7 @@ class Benchmark {
       if (iter_to_use->Valid() && iter_to_use->key().compare(key) == 0) {
         found++;
       }
-      thread->stats.FinishedOps(db_, 1);
+      thread->stats.FinishedOps(db_.db, 1);
     }
     delete single_iter;
     for (auto iter : multi_iters) {
@@ -2243,7 +2301,7 @@ class Benchmark {
         fprintf(stderr, "put error: %s\n", s.ToString().c_str());
         exit(1);
       }
-      thread->stats.FinishedOps(db_, 1);
+      thread->stats.FinishedOps(db_.db, 1);
       ++num_writes;
       if (writes_per_second_by_10 &&
           num_writes >= writes_per_second_by_10) {
@@ -2403,7 +2461,7 @@ class Benchmark {
         deletes_done++;
       }

-      thread->stats.FinishedOps(db_, 1);
+      thread->stats.FinishedOps(db_.db, 1);
     }
     char msg[100];
     snprintf(msg, sizeof(msg),
@@ -2542,7 +2600,7 @@ class Benchmark {
         fprintf(stderr, "put error: %s\n", s.ToString().c_str());
         exit(1);
       }
-      thread->stats.FinishedOps(db_, 1);
+      thread->stats.FinishedOps(db, 1);
     }

     char msg[100];
@@ -2578,7 +2636,7 @@ class Benchmark {
         fprintf(stderr, "merge error: %s\n", s.ToString().c_str());
         exit(1);
       }
-      thread->stats.FinishedOps(db_, 1);
+      thread->stats.FinishedOps(db, 1);
     }

     // Print some statistics
@@ -2639,7 +2697,7 @@ class Benchmark {
       }

-      thread->stats.FinishedOps(db_, 1);
+      thread->stats.FinishedOps(db, 1);
     }

     char msg[100];
@@ -2656,11 +2714,11 @@ class Benchmark {
   }

   void PrintStats(const char* key) {
-    if (db_ != nullptr) {
-      PrintStats(db_, key, false);
+    if (db_.db != nullptr) {
+      PrintStats(db_.db, key, false);
     }
-    for (DB* db : multi_dbs_) {
-      PrintStats(db, key, true);
+    for (const auto& db_with_cfh : multi_dbs_) {
+      PrintStats(db_with_cfh.db, key, true);
     }
   }
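
Below the patch, two standalone sketches for reviewers; neither is part of the diff itself.

First, a minimal illustration of what the new OpenDb() path does when --num_column_families > 1: descriptors are built with the patch's naming scheme ("default", then "column_family_name_%06d"), and create_missing_column_families lets the first run create them. The database path and the kNumColumnFamilies constant are made up for the example; the calls themselves are the public RocksDB column-family API.

#include <cassert>
#include <cstdio>
#include <string>
#include <vector>

#include "rocksdb/db.h"
#include "rocksdb/options.h"

int main() {
  const int kNumColumnFamilies = 4;  // stand-in for FLAGS_num_column_families

  rocksdb::Options options;
  options.create_if_missing = true;
  // Mirrors the patch: only ask for auto-creation when using multiple CFs.
  options.create_missing_column_families = kNumColumnFamilies > 1;

  // Same naming scheme as ColumnFamilyName() in the patch.
  std::vector<rocksdb::ColumnFamilyDescriptor> column_families;
  for (int i = 0; i < kNumColumnFamilies; i++) {
    std::string name = rocksdb::kDefaultColumnFamilyName;
    if (i != 0) {
      char buf[100];
      snprintf(buf, sizeof(buf), "column_family_name_%06d", i);
      name = buf;
    }
    column_families.emplace_back(name, rocksdb::ColumnFamilyOptions(options));
  }

  std::vector<rocksdb::ColumnFamilyHandle*> cfh;
  rocksdb::DB* db = nullptr;
  rocksdb::Status s = rocksdb::DB::Open(options, "/tmp/db_bench_cf_sketch",
                                        column_families, &cfh, &db);
  assert(s.ok());

  // Column family handles are owned by the caller; release them before the DB.
  for (auto* handle : cfh) {
    delete handle;
  }
  delete db;
  return 0;
}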
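
Second, a sketch of the invariant the new comments in DoWrite and ReadRandom rely on: both paths compute the column family index as rand % cfh.size() from the same per-key random value, so a key written to column family k is later looked up in column family k. The database path and the simplified "cf_N" names here are illustrative only, not the patch's code.

#include <cassert>
#include <cstdint>
#include <string>
#include <vector>

#include "rocksdb/db.h"
#include "rocksdb/options.h"

int main() {
  rocksdb::Options options;
  options.create_if_missing = true;
  options.create_missing_column_families = true;

  std::vector<rocksdb::ColumnFamilyDescriptor> descriptors;
  for (int i = 0; i < 4; i++) {
    std::string name = (i == 0) ? rocksdb::kDefaultColumnFamilyName
                                : "cf_" + std::to_string(i);
    descriptors.emplace_back(name, rocksdb::ColumnFamilyOptions(options));
  }

  std::vector<rocksdb::ColumnFamilyHandle*> cfh;
  rocksdb::DB* db = nullptr;
  rocksdb::Status s = rocksdb::DB::Open(options, "/tmp/db_bench_cf_consistency",
                                        descriptors, &cfh, &db);
  assert(s.ok());

  // "Write path": the key's random seed selects the column family,
  // just like batch.Put(cfh[rand_num % cfh.size()], key, value) in DoWrite.
  const int64_t rand_num = 123456;
  const std::string key = "key" + std::to_string(rand_num);
  s = db->Put(rocksdb::WriteOptions(), cfh[rand_num % cfh.size()], key, "value");
  assert(s.ok());

  // "Read path": the same seed deterministically picks the same handle,
  // so the Get finds the key without consulting other column families.
  std::string value;
  s = db->Get(rocksdb::ReadOptions(), cfh[rand_num % cfh.size()], key, &value);
  assert(s.ok() && value == "value");

  for (auto* handle : cfh) {
    delete handle;
  }
  delete db;
  return 0;
}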