From c0abc6bbc1b46138649926e3230f46954a50a21f Mon Sep 17 00:00:00 2001 From: Yanqin Jin Date: Tue, 22 Oct 2019 11:42:00 -0700 Subject: [PATCH] Use FLAGS_env for certain operations in db_bench (#5943) Summary: Since we already parse env_uri from command line and creates custom Env accordingly, we should invoke the methods of such Envs instead of using Env::Default(). Test Plan (on devserver): ``` $make db_bench db_stress $./db_bench -benchmarks=fillseq ./db_stress ``` Pull Request resolved: https://github.com/facebook/rocksdb/pull/5943 Differential Revision: D18018550 Pulled By: riversand963 fbshipit-source-id: 03b61329aaae0dfd914a0b902cc677f570f102e3 --- tools/db_bench_tool.cc | 16 +++++++++++----- tools/db_stress_tool.cc | 35 +++++++++++++++++++++++++++-------- 2 files changed, 38 insertions(+), 13 deletions(-) diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc index 7721ee476..743752b71 100644 --- a/tools/db_bench_tool.cc +++ b/tools/db_bench_tool.cc @@ -889,6 +889,9 @@ DEFINE_string(env_uri, "", "URI for registry Env lookup. Mutually exclusive" #endif // ROCKSDB_LITE DEFINE_string(hdfs, "", "Name of hdfs environment. Mutually exclusive with" " --env_uri."); + +static std::shared_ptr env_guard; + static rocksdb::Env* FLAGS_env = rocksdb::Env::Default(); DEFINE_int64(stats_interval, 0, "Stats are reported every N operations when " @@ -1738,7 +1741,7 @@ class Stats { "ElapsedTime", "Stage", "State", "OperationProperties"); int64_t current_time = 0; - Env::Default()->GetCurrentTime(¤t_time); + FLAGS_env->GetCurrentTime(¤t_time); for (auto ts : thread_list) { fprintf(stderr, "%18" PRIu64 " %10s %12s %20s %13s %45s %12s", ts.thread_id, @@ -2483,7 +2486,7 @@ class Benchmark { "at the same time"); exit(1); } - FLAGS_env = new ReportFileOpEnv(rocksdb::Env::Default()); + FLAGS_env = new ReportFileOpEnv(FLAGS_env); } if (FLAGS_prefix_size > FLAGS_key_size) { @@ -2500,6 +2503,7 @@ class Benchmark { } if (!FLAGS_use_existing_db) { Options options; + options.env = FLAGS_env; if (!FLAGS_wal_dir.empty()) { options.wal_dir = FLAGS_wal_dir; } @@ -3377,8 +3381,9 @@ class Benchmark { DBOptions db_opts; std::vector cf_descs; if (FLAGS_options_file != "") { - auto s = LoadOptionsFromFile(FLAGS_options_file, Env::Default(), &db_opts, + auto s = LoadOptionsFromFile(FLAGS_options_file, FLAGS_env, &db_opts, &cf_descs); + db_opts.env = FLAGS_env; if (s.ok()) { *opts = Options(db_opts, cf_descs[0].options); return true; @@ -3399,6 +3404,7 @@ class Benchmark { assert(db_.db == nullptr); + options.env = FLAGS_env; options.max_open_files = FLAGS_open_files; if (FLAGS_cost_write_buffer_to_cache || FLAGS_db_write_buffer_size != 0) { options.write_buffer_manager.reset( @@ -6596,7 +6602,7 @@ int db_bench_tool(int argc, char** argv) { fprintf(stderr, "Cannot provide both --hdfs and --env_uri.\n"); exit(1); } else if (!FLAGS_env_uri.empty()) { - Status s = Env::LoadEnv(FLAGS_env_uri, &FLAGS_env); + Status s = Env::LoadEnv(FLAGS_env_uri, &FLAGS_env, &env_guard); if (FLAGS_env == nullptr) { fprintf(stderr, "No Env registered for URI: %s\n", FLAGS_env_uri.c_str()); exit(1); @@ -6641,7 +6647,7 @@ int db_bench_tool(int argc, char** argv) { // Choose a location for the test database if none given with --db= if (FLAGS_db.empty()) { std::string default_db_path; - rocksdb::Env::Default()->GetTestDirectory(&default_db_path); + FLAGS_env->GetTestDirectory(&default_db_path); default_db_path += "/dbbench"; FLAGS_db = default_db_path; } diff --git a/tools/db_stress_tool.cc b/tools/db_stress_tool.cc index 2b9e063f2..fba2d0150 100644 --- a/tools/db_stress_tool.cc +++ b/tools/db_stress_tool.cc @@ -617,6 +617,11 @@ DEFINE_string(checksum_type, "kCRC32c", "Algorithm to use to checksum blocks"); static enum rocksdb::ChecksumType FLAGS_checksum_type_e = rocksdb::kCRC32c; DEFINE_string(hdfs, "", "Name of hdfs environment"); + +DEFINE_string(env_uri, "", + "URI for env lookup. Mutually exclusive with --hdfs"); + +static std::shared_ptr env_guard; // posix or hdfs environment static rocksdb::Env* FLAGS_env = rocksdb::Env::Default(); @@ -2721,7 +2726,9 @@ class StressTest { assert(rand_column_families.size() == rand_keys.size()); std::string checkpoint_dir = FLAGS_db + "/.checkpoint" + ToString(thread->tid); - DestroyDB(checkpoint_dir, Options()); + Options tmp_opts(options_); + tmp_opts.listeners.clear(); + DestroyDB(checkpoint_dir, tmp_opts); Checkpoint* checkpoint = nullptr; Status s = Checkpoint::Create(db_, &checkpoint); if (s.ok()) { @@ -2777,7 +2784,7 @@ class StressTest { delete checkpoint_db; checkpoint_db = nullptr; } - DestroyDB(checkpoint_dir, Options()); + DestroyDB(checkpoint_dir, tmp_opts); if (!s.ok()) { fprintf(stderr, "A checkpoint operation failed with: %s\n", s.ToString().c_str()); @@ -2984,8 +2991,9 @@ class StressTest { #else DBOptions db_options; std::vector cf_descriptors; - Status s = LoadOptionsFromFile(FLAGS_options_file, Env::Default(), - &db_options, &cf_descriptors); + Status s = LoadOptionsFromFile(FLAGS_options_file, FLAGS_env, &db_options, + &cf_descriptors); + db_options.env = FLAGS_env; if (!s.ok()) { fprintf(stderr, "Unable to load options file %s --- %s\n", FLAGS_options_file.c_str(), s.ToString().c_str()); @@ -3169,6 +3177,7 @@ class StressTest { secondaries_.resize(FLAGS_threads); std::fill(secondaries_.begin(), secondaries_.end(), nullptr); Options tmp_opts; + tmp_opts.env = options_.env; tmp_opts.max_open_files = FLAGS_open_files; for (size_t i = 0; i != static_cast(FLAGS_threads); ++i) { const std::string secondary_path = @@ -3694,7 +3703,7 @@ class NonBatchedOpsStressTest : public StressTest { s = FLAGS_env->DeleteFile(sst_filename); } - SstFileWriter sst_file_writer(EnvOptions(), options_); + SstFileWriter sst_file_writer(EnvOptions(options_), options_); if (s.ok()) { s = sst_file_writer.Open(sst_filename); } @@ -4324,7 +4333,7 @@ class CfConsistencyStressTest : public StressTest { const std::vector& /* rand_keys */) { std::string checkpoint_dir = FLAGS_db + "/.checkpoint" + ToString(thread->tid); - DestroyDB(checkpoint_dir, Options()); + DestroyDB(checkpoint_dir, options_); Checkpoint* checkpoint = nullptr; Status s = Checkpoint::Create(db_, &checkpoint); if (s.ok()) { @@ -4358,7 +4367,7 @@ class CfConsistencyStressTest : public StressTest { delete checkpoint_db; checkpoint_db = nullptr; } - DestroyDB(checkpoint_dir, Options()); + DestroyDB(checkpoint_dir, options_); if (!s.ok()) { fprintf(stderr, "A checkpoint operation failed with: %s\n", s.ToString().c_str()); @@ -4546,7 +4555,17 @@ int db_stress_tool(int argc, char** argv) { StringToCompressionType(FLAGS_compression_type.c_str()); FLAGS_checksum_type_e = StringToChecksumType(FLAGS_checksum_type.c_str()); if (!FLAGS_hdfs.empty()) { + if (!FLAGS_env_uri.empty()) { + fprintf(stderr, "Cannot specify both --hdfs and --env_uri.\n"); + exit(1); + } FLAGS_env = new rocksdb::HdfsEnv(FLAGS_hdfs); + } else if (!FLAGS_env_uri.empty()) { + Status s = Env::LoadEnv(FLAGS_env_uri, &FLAGS_env, &env_guard); + if (FLAGS_env == nullptr) { + fprintf(stderr, "No Env registered for URI: %s\n", FLAGS_env_uri.c_str()); + exit(1); + } } FLAGS_rep_factory = StringToRepFactory(FLAGS_memtablerep.c_str()); @@ -4644,7 +4663,7 @@ int db_stress_tool(int argc, char** argv) { // Choose a location for the test database if none given with --db= if (FLAGS_db.empty()) { std::string default_db_path; - rocksdb::Env::Default()->GetTestDirectory(&default_db_path); + FLAGS_env->GetTestDirectory(&default_db_path); default_db_path += "/dbstress"; FLAGS_db = default_db_path; }