Allow db_stress to use a secondary cache (#8455)

Summary:
Add a ```-secondary_cache_uri``` to db_stress to allow the user to specify a custom ```SecondaryCache``` object from the object registry. Also allow db_crashtest.py to be run with an alternate db_stress location. Together, these changes will allow us to run db_stress using FB internal components.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8455

Reviewed By: zhichao-cao

Differential Revision: D29371972

Pulled By: anand1976

fbshipit-source-id: dd1b1fd80ebbedc11aa63d9246ea6ae49edb77c4
main
anand76 4 years ago committed by Facebook GitHub Bot
parent be8199cdb9
commit 6f9ed59b1d
  1. 2
      db_stress_tool/db_stress_common.h
  2. 10
      db_stress_tool/db_stress_gflags.cc
  3. 26
      db_stress_tool/db_stress_test_base.cc
  4. 2
      db_stress_tool/db_stress_test_base.h
  5. 10
      tools/db_crashtest.py

@ -131,6 +131,7 @@ DECLARE_int32(get_current_wal_file_one_in);
DECLARE_int32(set_options_one_in); DECLARE_int32(set_options_one_in);
DECLARE_int32(set_in_place_one_in); DECLARE_int32(set_in_place_one_in);
DECLARE_int64(cache_size); DECLARE_int64(cache_size);
DECLARE_int32(cache_numshardbits);
DECLARE_bool(cache_index_and_filter_blocks); DECLARE_bool(cache_index_and_filter_blocks);
DECLARE_int32(top_level_index_pinning); DECLARE_int32(top_level_index_pinning);
DECLARE_int32(partition_pinning); DECLARE_int32(partition_pinning);
@ -262,6 +263,7 @@ DECLARE_bool(fail_if_options_file_error);
DECLARE_uint64(batch_protection_bytes_per_key); DECLARE_uint64(batch_protection_bytes_per_key);
DECLARE_uint64(user_timestamp_size); DECLARE_uint64(user_timestamp_size);
DECLARE_string(secondary_cache_uri);
constexpr long KB = 1024; constexpr long KB = 1024;
constexpr int kRandomValueMaxFactor = 3; constexpr int kRandomValueMaxFactor = 3;

@ -284,6 +284,11 @@ DEFINE_int32(set_in_place_one_in, 0,
DEFINE_int64(cache_size, 2LL * KB * KB * KB, DEFINE_int64(cache_size, 2LL * KB * KB * KB,
"Number of bytes to use as a cache of uncompressed data."); "Number of bytes to use as a cache of uncompressed data.");
DEFINE_int32(cache_numshardbits, 6,
"Number of shards for the block cache"
" is 2 ** cache_numshardbits. Negative means use default settings."
" This is applied only if FLAGS_cache_size is non-negative.");
DEFINE_bool(cache_index_and_filter_blocks, false, DEFINE_bool(cache_index_and_filter_blocks, false,
"True if indexes/filters should be cached in block cache."); "True if indexes/filters should be cached in block cache.");
@ -815,4 +820,9 @@ DEFINE_int32(open_metadata_write_fault_one_in, 0,
"On non-zero, enables fault injection on file metadata write " "On non-zero, enables fault injection on file metadata write "
"during DB reopen."); "during DB reopen.");
#ifndef ROCKSDB_LITE
DEFINE_string(secondary_cache_uri, "",
"Full URI for creating a customized secondary cache object");
#endif // ROCKSDB_LITE
#endif // GFLAGS #endif // GFLAGS

@ -14,8 +14,10 @@
#include "db_stress_tool/db_stress_driver.h" #include "db_stress_tool/db_stress_driver.h"
#include "db_stress_tool/db_stress_table_properties_collector.h" #include "db_stress_tool/db_stress_table_properties_collector.h"
#include "rocksdb/convenience.h" #include "rocksdb/convenience.h"
#include "rocksdb/secondary_cache.h"
#include "rocksdb/sst_file_manager.h" #include "rocksdb/sst_file_manager.h"
#include "rocksdb/types.h" #include "rocksdb/types.h"
#include "rocksdb/utilities/object_registry.h"
#include "util/cast_util.h" #include "util/cast_util.h"
#include "utilities/backupable/backupable_db_impl.h" #include "utilities/backupable/backupable_db_impl.h"
#include "utilities/fault_injection_fs.h" #include "utilities/fault_injection_fs.h"
@ -49,7 +51,7 @@ std::shared_ptr<const FilterPolicy> CreateFilterPolicy() {
} // namespace } // namespace
StressTest::StressTest() StressTest::StressTest()
: cache_(NewCache(FLAGS_cache_size)), : cache_(NewCache(FLAGS_cache_size, FLAGS_cache_numshardbits)),
compressed_cache_(NewLRUCache(FLAGS_compressed_cache_size)), compressed_cache_(NewLRUCache(FLAGS_compressed_cache_size)),
filter_policy_(CreateFilterPolicy()), filter_policy_(CreateFilterPolicy()),
db_(nullptr), db_(nullptr),
@ -114,7 +116,8 @@ StressTest::~StressTest() {
delete cmp_db_; delete cmp_db_;
} }
std::shared_ptr<Cache> StressTest::NewCache(size_t capacity) { std::shared_ptr<Cache> StressTest::NewCache(size_t capacity,
int32_t num_shard_bits) {
if (capacity <= 0) { if (capacity <= 0) {
return nullptr; return nullptr;
} }
@ -126,7 +129,24 @@ std::shared_ptr<Cache> StressTest::NewCache(size_t capacity) {
} }
return cache; return cache;
} else { } else {
return NewLRUCache((size_t)capacity); LRUCacheOptions opts;
opts.capacity = capacity;
opts.num_shard_bits = num_shard_bits;
#ifndef ROCKSDB_LITE
std::shared_ptr<SecondaryCache> secondary_cache;
if (!FLAGS_secondary_cache_uri.empty()) {
Status s = ObjectRegistry::NewInstance()->NewSharedObject<SecondaryCache>(
FLAGS_secondary_cache_uri, &secondary_cache);
if (secondary_cache == nullptr) {
fprintf(stderr,
"No secondary cache registered matching string: %s status=%s\n",
FLAGS_secondary_cache_uri.c_str(), s.ToString().c_str());
exit(1);
}
opts.secondary_cache = secondary_cache;
}
#endif
return NewLRUCache(opts);
} }
} }

@ -23,7 +23,7 @@ class StressTest {
virtual ~StressTest(); virtual ~StressTest();
std::shared_ptr<Cache> NewCache(size_t capacity); std::shared_ptr<Cache> NewCache(size_t capacity, int32_t num_shard_bits);
static std::vector<std::string> GetBlobCompressionTags(); static std::vector<std::string> GetBlobCompressionTags();

@ -151,6 +151,7 @@ default_params = {
_TEST_DIR_ENV_VAR = 'TEST_TMPDIR' _TEST_DIR_ENV_VAR = 'TEST_TMPDIR'
_DEBUG_LEVEL_ENV_VAR = 'DEBUG_LEVEL' _DEBUG_LEVEL_ENV_VAR = 'DEBUG_LEVEL'
stress_cmd = "./db_stress"
def is_release_mode(): def is_release_mode():
return os.environ.get(_DEBUG_LEVEL_ENV_VAR) == "0" return os.environ.get(_DEBUG_LEVEL_ENV_VAR) == "0"
@ -413,12 +414,12 @@ def gen_cmd_params(args):
def gen_cmd(params, unknown_params): def gen_cmd(params, unknown_params):
finalzied_params = finalize_and_sanitize(params) finalzied_params = finalize_and_sanitize(params)
cmd = ['./db_stress'] + [ cmd = [stress_cmd] + [
'--{0}={1}'.format(k, v) '--{0}={1}'.format(k, v)
for k, v in [(k, finalzied_params[k]) for k in sorted(finalzied_params)] for k, v in [(k, finalzied_params[k]) for k in sorted(finalzied_params)]
if k not in set(['test_type', 'simple', 'duration', 'interval', if k not in set(['test_type', 'simple', 'duration', 'interval',
'random_kill_odd', 'cf_consistency', 'txn', 'random_kill_odd', 'cf_consistency', 'txn',
'test_best_efforts_recovery', 'enable_ts']) 'test_best_efforts_recovery', 'enable_ts', 'stress_cmd'])
and v is not None] + unknown_params and v is not None] + unknown_params
return cmd return cmd
@ -669,6 +670,8 @@ def whitebox_crash_main(args, unknown_args):
def main(): def main():
global stress_cmd
parser = argparse.ArgumentParser(description="This script runs and kills \ parser = argparse.ArgumentParser(description="This script runs and kills \
db_stress multiple times") db_stress multiple times")
parser.add_argument("test_type", choices=["blackbox", "whitebox"]) parser.add_argument("test_type", choices=["blackbox", "whitebox"])
@ -677,6 +680,7 @@ def main():
parser.add_argument("--txn", action='store_true') parser.add_argument("--txn", action='store_true')
parser.add_argument("--test_best_efforts_recovery", action='store_true') parser.add_argument("--test_best_efforts_recovery", action='store_true')
parser.add_argument("--enable_ts", action='store_true') parser.add_argument("--enable_ts", action='store_true')
parser.add_argument("--stress_cmd")
all_params = dict(list(default_params.items()) all_params = dict(list(default_params.items())
+ list(blackbox_default_params.items()) + list(blackbox_default_params.items())
@ -698,6 +702,8 @@ def main():
(_TEST_DIR_ENV_VAR, test_tmpdir)) (_TEST_DIR_ENV_VAR, test_tmpdir))
sys.exit(1) sys.exit(1)
if args.stress_cmd:
stress_cmd = args.stress_cmd
if args.test_type == 'blackbox': if args.test_type == 'blackbox':
blackbox_crash_main(args, unknown_args) blackbox_crash_main(args, unknown_args)
if args.test_type == 'whitebox': if args.test_type == 'whitebox':

Loading…
Cancel
Save