diff --git a/db/db_bench.cc b/db/db_bench.cc index 8562d04aa..56e649add 100644 --- a/db/db_bench.cc +++ b/db/db_bench.cc @@ -153,6 +153,13 @@ DEFINE_int64(merge_keys, -1, "If negative, there will be FLAGS_num keys."); DEFINE_int32(num_column_families, 1, "Number of Column Families to use."); +DEFINE_int32( + num_hot_column_families, 8, + "Number of Hot Column Families. If more than 0, only write to this " + "number of column families. After finishing all the writes to them, " + "create new set of column families and insert to them. Only used " + "when num_column_families > 1."); + DEFINE_int64(reads, -1, "Number of read operations to do. " "If negative, do FLAGS_num reads."); @@ -390,6 +397,16 @@ enum rocksdb::CompressionType StringToCompressionType(const char* ctype) { fprintf(stdout, "Cannot parse compression type '%s'\n", ctype); return rocksdb::kSnappyCompression; //default value } + +std::string ColumnFamilyName(int i) { + if (i == 0) { + return rocksdb::kDefaultColumnFamilyName; + } else { + char name[100]; + snprintf(name, sizeof(name), "column_family_name_%06d", i); + return std::string(name); + } +} } // namespace DEFINE_string(compression_type, "snappy", @@ -475,6 +492,7 @@ DEFINE_int32(source_compaction_factor, 1, "Cap the size of data in level-K for" DEFINE_uint64(wal_ttl_seconds, 0, "Set the TTL for the WAL Files in seconds."); DEFINE_uint64(wal_size_limit_MB, 0, "Set the size limit for the WAL Files" " in MB."); +DEFINE_uint64(max_total_wal_size, 0, "Set total max WAL size"); DEFINE_bool(bufferedio, rocksdb::EnvOptions().use_os_buffer, "Allow buffered io using OS buffers"); @@ -779,9 +797,50 @@ static void AppendWithSpace(std::string* str, Slice msg) { struct DBWithColumnFamilies { std::vector cfh; DB* db; + std::atomic num_created; // Need to be updated after all the + // new entries in cfh are set. + size_t num_hot; // Number of column families to be queried at each moment. + // After each CreateNewCf(), another num_hot number of new + // Column families will be created and used to be queried. + port::Mutex create_cf_mutex; // Only one thread can execute CreateNewCf() + DBWithColumnFamilies() : db(nullptr) { cfh.clear(); } + + DBWithColumnFamilies(const DBWithColumnFamilies& other) + : cfh(other.cfh), + db(other.db), + num_created(other.num_created.load()), + num_hot(other.num_hot) {} + + ColumnFamilyHandle* GetCfh(int64_t rand_num) { + assert(num_hot > 0); + return cfh[num_created.load(std::memory_order_acquire) - num_hot + + rand_num % num_hot]; + } + + // stage: assume CF from 0 to stage * num_hot has be created. Need to create + // stage * num_hot + 1 to stage * (num_hot + 1). + void CreateNewCf(ColumnFamilyOptions options, int64_t stage) { + MutexLock l(&create_cf_mutex); + if ((stage + 1) * num_hot <= num_created) { + // Already created. + return; + } + auto new_num_created = num_created + num_hot; + assert(new_num_created <= cfh.size()); + for (size_t i = num_created; i < new_num_created; i++) { + Status s = + db->CreateColumnFamily(options, ColumnFamilyName(i), &(cfh[i])); + if (!s.ok()) { + fprintf(stderr, "create column family error: %s\n", + s.ToString().c_str()); + abort(); + } + } + num_created.store(new_num_created, std::memory_order_release); + } }; class Stats { @@ -888,8 +947,8 @@ class Stats { if (FLAGS_stats_per_interval) { std::string stats; - if (db_with_cfh && db_with_cfh->cfh.size()) { - for (size_t i = 0; i < db_with_cfh->cfh.size(); ++i) { + if (db_with_cfh && db_with_cfh->num_created.load()) { + for (size_t i = 0; i < db_with_cfh->num_created.load(); ++i) { if (db->GetProperty(db_with_cfh->cfh[i], "rocksdb.cfstats", &stats)) fprintf(stderr, "%s\n", stats.c_str()); @@ -994,13 +1053,16 @@ struct ThreadState { class Duration { public: - Duration(int max_seconds, int64_t max_ops) { + Duration(int max_seconds, int64_t max_ops, int64_t ops_per_stage = 0) { max_seconds_ = max_seconds; max_ops_= max_ops; + ops_per_stage_ = (ops_per_stage > 0) ? ops_per_stage : max_ops; ops_ = 0; start_at_ = FLAGS_env->NowMicros(); } + int64_t GetStage() { return std::min(ops_, max_ops_ - 1) / ops_per_stage_; } + bool Done(int64_t increment) { if (increment <= 0) increment = 1; // avoid Done(0) and infinite loops ops_ += increment; @@ -1021,6 +1083,7 @@ class Duration { private: int max_seconds_; int64_t max_ops_; + int64_t ops_per_stage_; int64_t ops_; double start_at_; }; @@ -1040,6 +1103,7 @@ class Benchmark { int64_t keys_per_prefix_; int64_t entries_per_batch_; WriteOptions write_options_; + Options open_options_; // keep options around to properly destroy db later int64_t reads_; int64_t writes_; int64_t readwrites_; @@ -1355,24 +1419,12 @@ class Benchmark { return base_name + ToString(id); } - std::string ColumnFamilyName(int i) { - if (i == 0) { - return kDefaultColumnFamilyName; - } else { - char name[100]; - snprintf(name, sizeof(name), "column_family_name_%06d", i); - return std::string(name); - } - } - void Run() { - Options open_options; // keep options around to properly destroy db later - if (!SanityCheck()) { exit(1); } PrintHeader(); - Open(&open_options); + Open(&open_options_); const char* benchmarks = FLAGS_benchmarks.c_str(); while (benchmarks != nullptr) { const char* sep = strchr(benchmarks, ','); @@ -1533,15 +1585,15 @@ class Benchmark { delete db_.db; db_.db = nullptr; db_.cfh.clear(); - DestroyDB(FLAGS_db, open_options); + DestroyDB(FLAGS_db, open_options_); } for (size_t i = 0; i < multi_dbs_.size(); i++) { delete multi_dbs_[i].db; - DestroyDB(GetDbNameForMultiple(FLAGS_db, i), open_options); + DestroyDB(GetDbNameForMultiple(FLAGS_db, i), open_options_); } multi_dbs_.clear(); } - Open(&open_options); // use open_options for the last accessed + Open(&open_options_); // use open_options for the last accessed } if (method != nullptr) { @@ -1996,6 +2048,8 @@ class Benchmark { options.compression_opts.level = FLAGS_compression_level; options.WAL_ttl_seconds = FLAGS_wal_ttl_seconds; options.WAL_size_limit_MB = FLAGS_wal_size_limit_MB; + options.max_total_wal_size = FLAGS_max_total_wal_size; + if (FLAGS_min_level_to_compress >= 0) { assert(FLAGS_min_level_to_compress <= FLAGS_num_levels); options.compression_per_level.resize(FLAGS_num_levels); @@ -2077,9 +2131,15 @@ class Benchmark { Status s; // Open with column families if necessary. if (FLAGS_num_column_families > 1) { - db->cfh.resize(FLAGS_num_column_families); + size_t num_hot = FLAGS_num_column_families; + if (FLAGS_num_hot_column_families > 0 && + FLAGS_num_hot_column_families < FLAGS_num_column_families) { + num_hot = FLAGS_num_hot_column_families; + } else { + FLAGS_num_hot_column_families = FLAGS_num_column_families; + } std::vector column_families; - for (int i = 0; i < FLAGS_num_column_families; i++) { + for (size_t i = 0; i < num_hot; i++) { column_families.push_back(ColumnFamilyDescriptor( ColumnFamilyName(i), ColumnFamilyOptions(options))); } @@ -2089,6 +2149,10 @@ class Benchmark { } else { s = DB::Open(options, db_name, column_families, &db->cfh, &db->db); } + db->cfh.resize(FLAGS_num_column_families); + db->num_created = num_hot; + db->num_hot = num_hot; + } else if (FLAGS_readonly) { s = DB::OpenForReadOnly(options, db_name, &db->db); } else { @@ -2185,9 +2249,18 @@ class Benchmark { num_key_gens = multi_dbs_.size(); } std::vector> key_gens(num_key_gens); - Duration duration(test_duration, num_ops * num_key_gens); + int64_t max_ops = num_ops * num_key_gens; + int64_t ops_per_stage = max_ops; + if (FLAGS_num_column_families > 1 && FLAGS_num_hot_column_families > 0) { + ops_per_stage = (max_ops - 1) / (FLAGS_num_column_families / + FLAGS_num_hot_column_families) + + 1; + } + + Duration duration(test_duration, max_ops, ops_per_stage); for (size_t i = 0; i < num_key_gens; i++) { - key_gens[i].reset(new KeyGenerator(&(thread->rand), write_mode, num_ops)); + key_gens[i].reset(new KeyGenerator(&(thread->rand), write_mode, num_ops, + ops_per_stage)); } if (num_ != FLAGS_num) { @@ -2203,7 +2276,18 @@ class Benchmark { Slice key = AllocateKey(); std::unique_ptr key_guard(key.data()); + int64_t stage = 0; while (!duration.Done(entries_per_batch_)) { + if (duration.GetStage() != stage) { + stage = duration.GetStage(); + if (db_.db != nullptr) { + db_.CreateNewCf(open_options_, stage); + } else { + for (auto& db : multi_dbs_) { + db.CreateNewCf(open_options_, stage); + } + } + } size_t id = thread->rand.Next() % num_key_gens; DBWithColumnFamilies* db_with_cfh = SelectDBWithCfh(id); batch.Clear(); @@ -2216,8 +2300,8 @@ class Benchmark { // We use same rand_num as seed for key and column family so that we // can deterministically find the cfh corresponding to a particular // key while reading the key. - batch.Put(db_with_cfh->cfh[rand_num % db_with_cfh->cfh.size()], - key, gen.Generate(value_size_)); + batch.Put(db_with_cfh->GetCfh(rand_num), key, + gen.Generate(value_size_)); } bytes += value_size_ + key_size_; } @@ -2343,8 +2427,8 @@ class Benchmark { read++; Status s; if (FLAGS_num_column_families > 1) { - s = db_with_cfh->db->Get(options, - db_with_cfh->cfh[key_rand % db_with_cfh->cfh.size()], key, &value); + s = db_with_cfh->db->Get(options, db_with_cfh->GetCfh(key_rand), key, + &value); } else { s = db_with_cfh->db->Get(options, key, &value); }