Add a file system parameter: --fs_uri to db_stress and db_bench (#6878)

Summary:
This pull request adds the parameter --fs_uri to db_bench and db_stress, creating a composite env combining the default env with a specified registered rocksdb file system.

This makes it easier to develop and test new RocksDB FileSystems.

The pull request also registers the posix file system for testing purposes.

Examples:
```
$./db_bench --fs_uri=posix:// --benchmarks=fillseq

$./db_stress --fs_uri=zenfs://nullb1
```

zenfs is a RocksDB FileSystem I'm developing to add support for zoned block devices, and in that case the zoned block device is specified in the uri (a zoned null block device in the above example).

Pull Request resolved: https://github.com/facebook/rocksdb/pull/6878

Reviewed By: siying

Differential Revision: D23023063

Pulled By: ajkr

fbshipit-source-id: 8b3fe7193ce45e683043b021779b7a4d547af247
main
Hans Holmberg 4 years ago committed by Facebook GitHub Bot
parent 59ebab654e
commit 2a0d3c7054
  1. 1
      db_stress_tool/db_stress_common.h
  2. 14
      db_stress_tool/db_stress_gflags.cc
  3. 23
      db_stress_tool/db_stress_tool.cc
  4. 12
      env/fs_posix.cc
  5. 42
      tools/db_bench_tool.cc

@ -204,6 +204,7 @@ DECLARE_int32(compression_parallel_threads);
DECLARE_string(checksum_type); DECLARE_string(checksum_type);
DECLARE_string(hdfs); DECLARE_string(hdfs);
DECLARE_string(env_uri); DECLARE_string(env_uri);
DECLARE_string(fs_uri);
DECLARE_uint64(ops_per_thread); DECLARE_uint64(ops_per_thread);
DECLARE_uint64(log2_keys_per_lock); DECLARE_uint64(log2_keys_per_lock);
DECLARE_uint64(max_manifest_file_size); DECLARE_uint64(max_manifest_file_size);

@ -607,10 +607,18 @@ DEFINE_string(bottommost_compression_type, "disable",
DEFINE_string(checksum_type, "kCRC32c", "Algorithm to use to checksum blocks"); DEFINE_string(checksum_type, "kCRC32c", "Algorithm to use to checksum blocks");
DEFINE_string(hdfs, "", "Name of hdfs environment"); DEFINE_string(hdfs, "",
"Name of hdfs environment. Mutually exclusive with"
" --env_uri and --fs_uri.");
DEFINE_string(env_uri, "", DEFINE_string(
"URI for env lookup. Mutually exclusive with --hdfs"); env_uri, "",
"URI for env lookup. Mutually exclusive with --hdfs and --fs_uri");
DEFINE_string(fs_uri, "",
"URI for registry Filesystem lookup. Mutually exclusive"
" with --hdfs and --env_uri."
" Creates a default environment with the specified filesystem.");
DEFINE_uint64(ops_per_thread, 1200000, "Number of operations per thread."); DEFINE_uint64(ops_per_thread, 1200000, "Number of operations per thread.");
static const bool FLAGS_ops_per_thread_dummy __attribute__((__unused__)) = static const bool FLAGS_ops_per_thread_dummy __attribute__((__unused__)) =

@ -34,6 +34,11 @@ static std::shared_ptr<ROCKSDB_NAMESPACE::DbStressEnvWrapper> env_wrapper_guard;
static std::shared_ptr<CompositeEnvWrapper> fault_env_guard; static std::shared_ptr<CompositeEnvWrapper> fault_env_guard;
} // namespace } // namespace
static Env* GetCompositeEnv(std::shared_ptr<FileSystem> fs) {
static std::shared_ptr<Env> composite_env = NewCompositeEnv(fs);
return composite_env.get();
}
KeyGenContext key_gen_ctx; KeyGenContext key_gen_ctx;
int db_stress_tool(int argc, char** argv) { int db_stress_tool(int argc, char** argv) {
@ -63,11 +68,15 @@ int db_stress_tool(int argc, char** argv) {
Env* raw_env; Env* raw_env;
if (!FLAGS_hdfs.empty()) { int env_opts =
if (!FLAGS_env_uri.empty()) { !FLAGS_hdfs.empty() + !FLAGS_env_uri.empty() + !FLAGS_fs_uri.empty();
fprintf(stderr, "Cannot specify both --hdfs and --env_uri.\n"); if (env_opts > 1) {
fprintf(stderr,
"Error: --hdfs, --env_uri and --fs_uri are mutually exclusive\n");
exit(1); exit(1);
} }
if (!FLAGS_hdfs.empty()) {
raw_env = new ROCKSDB_NAMESPACE::HdfsEnv(FLAGS_hdfs); raw_env = new ROCKSDB_NAMESPACE::HdfsEnv(FLAGS_hdfs);
} else if (!FLAGS_env_uri.empty()) { } else if (!FLAGS_env_uri.empty()) {
Status s = Env::LoadEnv(FLAGS_env_uri, &raw_env, &env_guard); Status s = Env::LoadEnv(FLAGS_env_uri, &raw_env, &env_guard);
@ -75,6 +84,14 @@ int db_stress_tool(int argc, char** argv) {
fprintf(stderr, "No Env registered for URI: %s\n", FLAGS_env_uri.c_str()); fprintf(stderr, "No Env registered for URI: %s\n", FLAGS_env_uri.c_str());
exit(1); exit(1);
} }
} else if (!FLAGS_fs_uri.empty()) {
std::shared_ptr<FileSystem> fs;
Status s = FileSystem::Load(FLAGS_fs_uri, &fs);
if (!s.ok()) {
fprintf(stderr, "Error: %s\n", s.ToString().c_str());
exit(1);
}
raw_env = GetCompositeEnv(fs);
} else { } else {
raw_env = Env::Default(); raw_env = Env::Default();
} }

12
env/fs_posix.cc vendored

@ -56,6 +56,7 @@
#include "port/port.h" #include "port/port.h"
#include "rocksdb/options.h" #include "rocksdb/options.h"
#include "rocksdb/slice.h" #include "rocksdb/slice.h"
#include "rocksdb/utilities/object_registry.h"
#include "test_util/sync_point.h" #include "test_util/sync_point.h"
#include "util/coding.h" #include "util/coding.h"
#include "util/compression_context_cache.h" #include "util/compression_context_cache.h"
@ -1046,4 +1047,15 @@ std::shared_ptr<FileSystem> FileSystem::Default() {
return default_fs_ptr; return default_fs_ptr;
} }
#ifndef ROCKSDB_LITE
static FactoryFunc<FileSystem> posix_filesystem_reg =
ObjectLibrary::Default()->Register<FileSystem>(
"posix://.*",
[](const std::string& /* uri */, std::unique_ptr<FileSystem>* f,
std::string* /* errmsg */) {
f->reset(new PosixFileSystem());
return f->get();
});
#endif
} // namespace ROCKSDB_NAMESPACE } // namespace ROCKSDB_NAMESPACE

@ -952,11 +952,17 @@ static bool ValidateTableCacheNumshardbits(const char* flagname,
DEFINE_int32(table_cache_numshardbits, 4, ""); DEFINE_int32(table_cache_numshardbits, 4, "");
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
DEFINE_string(env_uri, "", "URI for registry Env lookup. Mutually exclusive" DEFINE_string(env_uri, "",
" with --hdfs."); "URI for registry Env lookup. Mutually exclusive"
" with --hdfs and --fs_uri");
DEFINE_string(fs_uri, "",
"URI for registry Filesystem lookup. Mutually exclusive"
" with --hdfs and --env_uri."
" Creates a default environment with the specified filesystem.");
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE
DEFINE_string(hdfs, "", "Name of hdfs environment. Mutually exclusive with" DEFINE_string(hdfs, "",
" --env_uri."); "Name of hdfs environment. Mutually exclusive with"
" --env_uri and --fs_uri");
static std::shared_ptr<ROCKSDB_NAMESPACE::Env> env_guard; static std::shared_ptr<ROCKSDB_NAMESPACE::Env> env_guard;
@ -986,6 +992,15 @@ DEFINE_int32(thread_status_per_interval, 0,
DEFINE_int32(perf_level, ROCKSDB_NAMESPACE::PerfLevel::kDisable, DEFINE_int32(perf_level, ROCKSDB_NAMESPACE::PerfLevel::kDisable,
"Level of perf collection"); "Level of perf collection");
#ifndef ROCKSDB_LITE
static ROCKSDB_NAMESPACE::Env* GetCompositeEnv(
std::shared_ptr<ROCKSDB_NAMESPACE::FileSystem> fs) {
static std::shared_ptr<ROCKSDB_NAMESPACE::Env> composite_env =
ROCKSDB_NAMESPACE::NewCompositeEnv(fs);
return composite_env.get();
}
#endif
static bool ValidateRateLimit(const char* flagname, double value) { static bool ValidateRateLimit(const char* flagname, double value) {
const double EPSILON = 1e-10; const double EPSILON = 1e-10;
if ( value < -EPSILON ) { if ( value < -EPSILON ) {
@ -7097,15 +7112,28 @@ int db_bench_tool(int argc, char** argv) {
FLAGS_blob_db_compression_type_e = FLAGS_blob_db_compression_type_e =
StringToCompressionType(FLAGS_blob_db_compression_type.c_str()); StringToCompressionType(FLAGS_blob_db_compression_type.c_str());
if (!FLAGS_hdfs.empty() && !FLAGS_env_uri.empty()) { int env_opts =
fprintf(stderr, "Cannot provide both --hdfs and --env_uri.\n"); !FLAGS_hdfs.empty() + !FLAGS_env_uri.empty() + !FLAGS_fs_uri.empty();
if (env_opts > 1) {
fprintf(stderr,
"Error: --hdfs, --env_uri and --fs_uri are mutually exclusive\n");
exit(1); exit(1);
} else if (!FLAGS_env_uri.empty()) { }
if (!FLAGS_env_uri.empty()) {
Status s = Env::LoadEnv(FLAGS_env_uri, &FLAGS_env, &env_guard); Status s = Env::LoadEnv(FLAGS_env_uri, &FLAGS_env, &env_guard);
if (FLAGS_env == nullptr) { if (FLAGS_env == nullptr) {
fprintf(stderr, "No Env registered for URI: %s\n", FLAGS_env_uri.c_str()); fprintf(stderr, "No Env registered for URI: %s\n", FLAGS_env_uri.c_str());
exit(1); exit(1);
} }
} else if (!FLAGS_fs_uri.empty()) {
std::shared_ptr<FileSystem> fs;
Status s = FileSystem::Load(FLAGS_fs_uri, &fs);
if (fs == nullptr) {
fprintf(stderr, "Error: %s\n", s.ToString().c_str());
exit(1);
}
FLAGS_env = GetCompositeEnv(fs);
} }
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE
if (FLAGS_use_existing_keys && !FLAGS_use_existing_db) { if (FLAGS_use_existing_keys && !FLAGS_use_existing_db) {

Loading…
Cancel
Save