crash_test to cover bottommost compression and some other changes (#6215)

Summary:
Several improvements to crash_test/stress_test:
(1) Stress_test to support an parameter of bottommost compression
(2) Rename those FLAGS_* variables that are not gflags to avoid confusion
(3) Crash_test to randomly generate compression type for bottommost compression with half the chance.
(4) Stress_test to sanitize unsupported compression type to snappy, so that crash_test to cover all possible compression types and people don't need to worry about they don't support all comrpession types in their environment.
(5) In crash_test, when generating db_stress command, sort arguments in alphabeta order, so that it is easier to find value for a specific argument.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6215

Test Plan: Run "make crash_test" for a while and see the botommost option shown in LOG files.

Differential Revision: D19171255

fbshipit-source-id: d7001e246c4ff9ee5760776eea0be97738650735
main
sdong 5 years ago committed by Facebook Github Bot
parent e55c2b3f0b
commit 338c149b92
  1. 2
      db_stress_tool/cf_consistency_stress.cc
  2. 15
      db_stress_tool/db_stress_common.cc
  3. 58
      db_stress_tool/db_stress_common.h
  4. 28
      db_stress_tool/db_stress_driver.cc
  5. 4
      db_stress_tool/db_stress_gflags.cc
  6. 64
      db_stress_tool/db_stress_test_base.cc
  7. 24
      db_stress_tool/db_stress_tool.cc
  8. 4
      db_stress_tool/no_batched_ops_stress.cc
  9. 10
      tools/db_crashtest.py
  10. 2
      util/compression.h

@ -295,7 +295,7 @@ class CfConsistencyStressTest : public StressTest {
// We need to clear DB including manifest files, so make a copy // We need to clear DB including manifest files, so make a copy
Options opt_copy = options_; Options opt_copy = options_;
opt_copy.env = FLAGS_env->target(); opt_copy.env = db_stress_env->target();
DestroyDB(checkpoint_dir, opt_copy); DestroyDB(checkpoint_dir, opt_copy);
Checkpoint* checkpoint = nullptr; Checkpoint* checkpoint = nullptr;

@ -12,10 +12,11 @@
#include "db_stress_tool/db_stress_common.h" #include "db_stress_tool/db_stress_common.h"
#include <cmath> #include <cmath>
rocksdb::DbStressEnvWrapper* FLAGS_env = nullptr; rocksdb::DbStressEnvWrapper* db_stress_env = nullptr;
enum rocksdb::CompressionType FLAGS_compression_type_e = enum rocksdb::CompressionType compression_type_e = rocksdb::kSnappyCompression;
enum rocksdb::CompressionType bottommost_compression_type_e =
rocksdb::kSnappyCompression; rocksdb::kSnappyCompression;
enum rocksdb::ChecksumType FLAGS_checksum_type_e = rocksdb::kCRC32c; enum rocksdb::ChecksumType checksum_type_e = rocksdb::kCRC32c;
enum RepFactory FLAGS_rep_factory = kSkipList; enum RepFactory FLAGS_rep_factory = kSkipList;
std::vector<double> sum_probs(100001); std::vector<double> sum_probs(100001);
int64_t zipf_sum_size = 100000; int64_t zipf_sum_size = 100000;
@ -102,10 +103,10 @@ void PoolSizeChangeThread(void* v) {
if (new_thread_pool_size < 1) { if (new_thread_pool_size < 1) {
new_thread_pool_size = 1; new_thread_pool_size = 1;
} }
FLAGS_env->SetBackgroundThreads(new_thread_pool_size, db_stress_env->SetBackgroundThreads(new_thread_pool_size,
rocksdb::Env::Priority::LOW); rocksdb::Env::Priority::LOW);
// Sleep up to 3 seconds // Sleep up to 3 seconds
FLAGS_env->SleepForMicroseconds( db_stress_env->SleepForMicroseconds(
thread->rand.Next() % FLAGS_compaction_thread_pool_adjust_interval * thread->rand.Next() % FLAGS_compaction_thread_pool_adjust_interval *
1000 + 1000 +
1); 1);
@ -132,7 +133,7 @@ void DbVerificationThread(void* v) {
if (!shared->HasVerificationFailedYet()) { if (!shared->HasVerificationFailedYet()) {
stress_test->ContinuouslyVerifyDb(thread); stress_test->ContinuouslyVerifyDb(thread);
} }
FLAGS_env->SleepForMicroseconds( db_stress_env->SleepForMicroseconds(
thread->rand.Next() % FLAGS_continuous_verification_interval * 1000 + thread->rand.Next() % FLAGS_continuous_verification_interval * 1000 +
1); 1);
} }

@ -190,6 +190,7 @@ DECLARE_int32(nooverwritepercent);
DECLARE_int32(iterpercent); DECLARE_int32(iterpercent);
DECLARE_uint64(num_iterations); DECLARE_uint64(num_iterations);
DECLARE_string(compression_type); DECLARE_string(compression_type);
DECLARE_string(bottommost_compression_type);
DECLARE_int32(compression_max_dict_bytes); DECLARE_int32(compression_max_dict_bytes);
DECLARE_int32(compression_zstd_max_train_bytes); DECLARE_int32(compression_zstd_max_train_bytes);
DECLARE_string(checksum_type); DECLARE_string(checksum_type);
@ -227,10 +228,11 @@ const int kRandomValueMaxFactor = 3;
const int kValueMaxLen = 100; const int kValueMaxLen = 100;
// wrapped posix or hdfs environment // wrapped posix or hdfs environment
extern rocksdb::DbStressEnvWrapper* FLAGS_env; extern rocksdb::DbStressEnvWrapper* db_stress_env;
extern enum rocksdb::CompressionType FLAGS_compression_type_e; extern enum rocksdb::CompressionType compression_type_e;
extern enum rocksdb::ChecksumType FLAGS_checksum_type_e; extern enum rocksdb::CompressionType bottommost_compression_type_e;
extern enum rocksdb::ChecksumType checksum_type_e;
enum RepFactory { kSkipList, kHashSkipList, kVectorRep }; enum RepFactory { kSkipList, kHashSkipList, kVectorRep };
@ -255,25 +257,37 @@ inline enum rocksdb::CompressionType StringToCompressionType(
const char* ctype) { const char* ctype) {
assert(ctype); assert(ctype);
if (!strcasecmp(ctype, "none")) rocksdb::CompressionType ret_compression_type;
return rocksdb::kNoCompression;
else if (!strcasecmp(ctype, "snappy")) if (!strcasecmp(ctype, "disable")) {
return rocksdb::kSnappyCompression; ret_compression_type = rocksdb::kDisableCompressionOption;
else if (!strcasecmp(ctype, "zlib")) } else if (!strcasecmp(ctype, "none")) {
return rocksdb::kZlibCompression; ret_compression_type = rocksdb::kNoCompression;
else if (!strcasecmp(ctype, "bzip2")) } else if (!strcasecmp(ctype, "snappy")) {
return rocksdb::kBZip2Compression; ret_compression_type = rocksdb::kSnappyCompression;
else if (!strcasecmp(ctype, "lz4")) } else if (!strcasecmp(ctype, "zlib")) {
return rocksdb::kLZ4Compression; ret_compression_type = rocksdb::kZlibCompression;
else if (!strcasecmp(ctype, "lz4hc")) } else if (!strcasecmp(ctype, "bzip2")) {
return rocksdb::kLZ4HCCompression; ret_compression_type = rocksdb::kBZip2Compression;
else if (!strcasecmp(ctype, "xpress")) } else if (!strcasecmp(ctype, "lz4")) {
return rocksdb::kXpressCompression; ret_compression_type = rocksdb::kLZ4Compression;
else if (!strcasecmp(ctype, "zstd")) } else if (!strcasecmp(ctype, "lz4hc")) {
return rocksdb::kZSTD; ret_compression_type = rocksdb::kLZ4HCCompression;
} else if (!strcasecmp(ctype, "xpress")) {
fprintf(stderr, "Cannot parse compression type '%s'\n", ctype); ret_compression_type = rocksdb::kXpressCompression;
return rocksdb::kSnappyCompression; // default value } else if (!strcasecmp(ctype, "zstd")) {
ret_compression_type = rocksdb::kZSTD;
} else {
fprintf(stderr, "Cannot parse compression type '%s'\n", ctype);
ret_compression_type = rocksdb::kSnappyCompression; // default value
}
if (ret_compression_type != rocksdb::kDisableCompressionOption &&
!CompressionTypeSupported(ret_compression_type)) {
// Use no compression will be more portable but considering this is
// only a stress test and snappy is widely available. Use snappy here.
ret_compression_type = rocksdb::kSnappyCompression;
}
return ret_compression_type;
} }
inline enum rocksdb::ChecksumType StringToChecksumType(const char* ctype) { inline enum rocksdb::ChecksumType StringToChecksumType(const char* ctype) {

@ -56,29 +56,29 @@ void ThreadBody(void* v) {
bool RunStressTest(StressTest* stress) { bool RunStressTest(StressTest* stress) {
stress->InitDb(); stress->InitDb();
SharedState shared(FLAGS_env, stress); SharedState shared(db_stress_env, stress);
if (FLAGS_read_only) { if (FLAGS_read_only) {
stress->InitReadonlyDb(&shared); stress->InitReadonlyDb(&shared);
} }
uint32_t n = shared.GetNumThreads(); uint32_t n = shared.GetNumThreads();
uint64_t now = FLAGS_env->NowMicros(); uint64_t now = db_stress_env->NowMicros();
fprintf(stdout, "%s Initializing worker threads\n", fprintf(stdout, "%s Initializing worker threads\n",
FLAGS_env->TimeToString(now / 1000000).c_str()); db_stress_env->TimeToString(now / 1000000).c_str());
std::vector<ThreadState*> threads(n); std::vector<ThreadState*> threads(n);
for (uint32_t i = 0; i < n; i++) { for (uint32_t i = 0; i < n; i++) {
threads[i] = new ThreadState(i, &shared); threads[i] = new ThreadState(i, &shared);
FLAGS_env->StartThread(ThreadBody, threads[i]); db_stress_env->StartThread(ThreadBody, threads[i]);
} }
ThreadState bg_thread(0, &shared); ThreadState bg_thread(0, &shared);
if (FLAGS_compaction_thread_pool_adjust_interval > 0) { if (FLAGS_compaction_thread_pool_adjust_interval > 0) {
FLAGS_env->StartThread(PoolSizeChangeThread, &bg_thread); db_stress_env->StartThread(PoolSizeChangeThread, &bg_thread);
} }
ThreadState continuous_verification_thread(0, &shared); ThreadState continuous_verification_thread(0, &shared);
if (FLAGS_continuous_verification_interval > 0) { if (FLAGS_continuous_verification_interval > 0) {
FLAGS_env->StartThread(DbVerificationThread, db_stress_env->StartThread(DbVerificationThread,
&continuous_verification_thread); &continuous_verification_thread);
} }
// Each thread goes through the following states: // Each thread goes through the following states:
@ -98,9 +98,9 @@ bool RunStressTest(StressTest* stress) {
} }
} }
now = FLAGS_env->NowMicros(); now = db_stress_env->NowMicros();
fprintf(stdout, "%s Starting database operations\n", fprintf(stdout, "%s Starting database operations\n",
FLAGS_env->TimeToString(now / 1000000).c_str()); db_stress_env->TimeToString(now / 1000000).c_str());
shared.SetStart(); shared.SetStart();
shared.GetCondVar()->SignalAll(); shared.GetCondVar()->SignalAll();
@ -108,13 +108,13 @@ bool RunStressTest(StressTest* stress) {
shared.GetCondVar()->Wait(); shared.GetCondVar()->Wait();
} }
now = FLAGS_env->NowMicros(); now = db_stress_env->NowMicros();
if (FLAGS_test_batches_snapshots) { if (FLAGS_test_batches_snapshots) {
fprintf(stdout, "%s Limited verification already done during gets\n", fprintf(stdout, "%s Limited verification already done during gets\n",
FLAGS_env->TimeToString((uint64_t)now / 1000000).c_str()); db_stress_env->TimeToString((uint64_t)now / 1000000).c_str());
} else { } else {
fprintf(stdout, "%s Starting verification\n", fprintf(stdout, "%s Starting verification\n",
FLAGS_env->TimeToString((uint64_t)now / 1000000).c_str()); db_stress_env->TimeToString((uint64_t)now / 1000000).c_str());
} }
shared.SetStartVerify(); shared.SetStartVerify();
@ -133,10 +133,10 @@ bool RunStressTest(StressTest* stress) {
delete threads[i]; delete threads[i];
threads[i] = nullptr; threads[i] = nullptr;
} }
now = FLAGS_env->NowMicros(); now = db_stress_env->NowMicros();
if (!FLAGS_test_batches_snapshots && !shared.HasVerificationFailedYet()) { if (!FLAGS_test_batches_snapshots && !shared.HasVerificationFailedYet()) {
fprintf(stdout, "%s Verification successful\n", fprintf(stdout, "%s Verification successful\n",
FLAGS_env->TimeToString(now / 1000000).c_str()); db_stress_env->TimeToString(now / 1000000).c_str());
} }
stress->PrintStatistics(); stress->PrintStatistics();

@ -540,6 +540,10 @@ DEFINE_int32(compression_zstd_max_train_bytes, 0,
"Maximum size of training data passed to zstd's dictionary " "Maximum size of training data passed to zstd's dictionary "
"trainer."); "trainer.");
DEFINE_string(bottommost_compression_type, "disable",
"Algorithm to use to compress bottommost level of the database. "
"\"disable\" means disabling the feature");
DEFINE_string(checksum_type, "kCRC32c", "Algorithm to use to checksum blocks"); DEFINE_string(checksum_type, "kCRC32c", "Algorithm to use to checksum blocks");
DEFINE_string(hdfs, "", "Name of hdfs environment"); DEFINE_string(hdfs, "", "Name of hdfs environment");

@ -32,17 +32,15 @@ StressTest::StressTest()
cmp_db_(nullptr) { cmp_db_(nullptr) {
if (FLAGS_destroy_db_initially) { if (FLAGS_destroy_db_initially) {
std::vector<std::string> files; std::vector<std::string> files;
FLAGS_env->GetChildren(FLAGS_db, &files); db_stress_env->GetChildren(FLAGS_db, &files);
for (unsigned int i = 0; i < files.size(); i++) { for (unsigned int i = 0; i < files.size(); i++) {
if (Slice(files[i]).starts_with("heap-")) { if (Slice(files[i]).starts_with("heap-")) {
FLAGS_env->DeleteFile(FLAGS_db + "/" + files[i]); db_stress_env->DeleteFile(FLAGS_db + "/" + files[i]);
} }
} }
Options options; Options options;
// Remove files without preserving manfiest files // Remove files without preserving manfiest files
options.env = FLAGS_env->target();
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
const Status s = !FLAGS_use_blob_db const Status s = !FLAGS_use_blob_db
? DestroyDB(FLAGS_db, options) ? DestroyDB(FLAGS_db, options)
@ -187,28 +185,28 @@ bool StressTest::BuildOptionsTable() {
} }
void StressTest::InitDb() { void StressTest::InitDb() {
uint64_t now = FLAGS_env->NowMicros(); uint64_t now = db_stress_env->NowMicros();
fprintf(stdout, "%s Initializing db_stress\n", fprintf(stdout, "%s Initializing db_stress\n",
FLAGS_env->TimeToString(now / 1000000).c_str()); db_stress_env->TimeToString(now / 1000000).c_str());
PrintEnv(); PrintEnv();
Open(); Open();
BuildOptionsTable(); BuildOptionsTable();
} }
void StressTest::InitReadonlyDb(SharedState* shared) { void StressTest::InitReadonlyDb(SharedState* shared) {
uint64_t now = FLAGS_env->NowMicros(); uint64_t now = db_stress_env->NowMicros();
fprintf(stdout, "%s Preloading db with %" PRIu64 " KVs\n", fprintf(stdout, "%s Preloading db with %" PRIu64 " KVs\n",
FLAGS_env->TimeToString(now / 1000000).c_str(), FLAGS_max_key); db_stress_env->TimeToString(now / 1000000).c_str(), FLAGS_max_key);
PreloadDbAndReopenAsReadOnly(FLAGS_max_key, shared); PreloadDbAndReopenAsReadOnly(FLAGS_max_key, shared);
} }
bool StressTest::VerifySecondaries() { bool StressTest::VerifySecondaries() {
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
if (FLAGS_test_secondary) { if (FLAGS_test_secondary) {
uint64_t now = FLAGS_env->NowMicros(); uint64_t now = db_stress_env->NowMicros();
fprintf( fprintf(
stdout, "%s Start to verify secondaries against primary\n", stdout, "%s Start to verify secondaries against primary\n",
FLAGS_env->TimeToString(static_cast<uint64_t>(now) / 1000000).c_str()); db_stress_env->TimeToString(static_cast<uint64_t>(now) / 1000000).c_str());
} }
for (size_t k = 0; k != secondaries_.size(); ++k) { for (size_t k = 0; k != secondaries_.size(); ++k) {
Status s = secondaries_[k]->TryCatchUpWithPrimary(); Status s = secondaries_[k]->TryCatchUpWithPrimary();
@ -250,10 +248,10 @@ bool StressTest::VerifySecondaries() {
} }
} }
if (FLAGS_test_secondary) { if (FLAGS_test_secondary) {
uint64_t now = FLAGS_env->NowMicros(); uint64_t now = db_stress_env->NowMicros();
fprintf( fprintf(
stdout, "%s Verification of secondaries succeeded\n", stdout, "%s Verification of secondaries succeeded\n",
FLAGS_env->TimeToString(static_cast<uint64_t>(now) / 1000000).c_str()); db_stress_env->TimeToString(static_cast<uint64_t>(now) / 1000000).c_str());
} }
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE
return true; return true;
@ -410,9 +408,9 @@ void StressTest::PreloadDbAndReopenAsReadOnly(int64_t number_of_keys,
#endif #endif
db_preload_finished_.store(true); db_preload_finished_.store(true);
auto now = FLAGS_env->NowMicros(); auto now = db_stress_env->NowMicros();
fprintf(stdout, "%s Reopening database in read-only\n", fprintf(stdout, "%s Reopening database in read-only\n",
FLAGS_env->TimeToString(now / 1000000).c_str()); db_stress_env->TimeToString(now / 1000000).c_str());
// Reopen as read-only, can ignore all options related to updates // Reopen as read-only, can ignore all options related to updates
Open(); Open();
} else { } else {
@ -1134,14 +1132,14 @@ Status StressTest::TestBackupRestore(
std::string restore_dir = FLAGS_db + "/.restore" + ToString(thread->tid); std::string restore_dir = FLAGS_db + "/.restore" + ToString(thread->tid);
BackupableDBOptions backup_opts(backup_dir); BackupableDBOptions backup_opts(backup_dir);
BackupEngine* backup_engine = nullptr; BackupEngine* backup_engine = nullptr;
Status s = BackupEngine::Open(FLAGS_env, backup_opts, &backup_engine); Status s = BackupEngine::Open(db_stress_env, backup_opts, &backup_engine);
if (s.ok()) { if (s.ok()) {
s = backup_engine->CreateNewBackup(db_); s = backup_engine->CreateNewBackup(db_);
} }
if (s.ok()) { if (s.ok()) {
delete backup_engine; delete backup_engine;
backup_engine = nullptr; backup_engine = nullptr;
s = BackupEngine::Open(FLAGS_env, backup_opts, &backup_engine); s = BackupEngine::Open(db_stress_env, backup_opts, &backup_engine);
} }
if (s.ok()) { if (s.ok()) {
s = backup_engine->RestoreDBFromLatestBackup(restore_dir /* db_dir */, s = backup_engine->RestoreDBFromLatestBackup(restore_dir /* db_dir */,
@ -1218,7 +1216,7 @@ Status StressTest::TestCheckpoint(ThreadState* thread,
FLAGS_db + "/.checkpoint" + ToString(thread->tid); FLAGS_db + "/.checkpoint" + ToString(thread->tid);
Options tmp_opts(options_); Options tmp_opts(options_);
tmp_opts.listeners.clear(); tmp_opts.listeners.clear();
tmp_opts.env = FLAGS_env->target(); tmp_opts.env = db_stress_env->target();
DestroyDB(checkpoint_dir, tmp_opts); DestroyDB(checkpoint_dir, tmp_opts);
@ -1355,7 +1353,7 @@ Status StressTest::TestPauseBackground(ThreadState* thread) {
// 1 chance in 625 of pausing full 16s.) // 1 chance in 625 of pausing full 16s.)
int pwr2_micros = int pwr2_micros =
std::min(thread->rand.Uniform(25), thread->rand.Uniform(25)); std::min(thread->rand.Uniform(25), thread->rand.Uniform(25));
FLAGS_env->SleepForMicroseconds(1 << pwr2_micros); db_stress_env->SleepForMicroseconds(1 << pwr2_micros);
return db_->ContinueBackgroundWork(); return db_->ContinueBackgroundWork();
} }
@ -1573,9 +1571,13 @@ void StressTest::PrintEnv() const {
fprintf(stdout, "Do update in place : %d\n", FLAGS_in_place_update); fprintf(stdout, "Do update in place : %d\n", FLAGS_in_place_update);
fprintf(stdout, "Num keys per lock : %d\n", fprintf(stdout, "Num keys per lock : %d\n",
1 << FLAGS_log2_keys_per_lock); 1 << FLAGS_log2_keys_per_lock);
std::string compression = CompressionTypeToString(FLAGS_compression_type_e); std::string compression = CompressionTypeToString(compression_type_e);
fprintf(stdout, "Compression : %s\n", compression.c_str()); fprintf(stdout, "Compression : %s\n", compression.c_str());
std::string checksum = ChecksumTypeToString(FLAGS_checksum_type_e); std::string bottommost_compression =
CompressionTypeToString(bottommost_compression_type_e);
fprintf(stdout, "Bottommost Compression : %s\n",
bottommost_compression.c_str());
std::string checksum = ChecksumTypeToString(checksum_type_e);
fprintf(stdout, "Checksum type : %s\n", checksum.c_str()); fprintf(stdout, "Checksum type : %s\n", checksum.c_str());
fprintf(stdout, "Bloom bits / key : %s\n", fprintf(stdout, "Bloom bits / key : %s\n",
FormatDoubleParam(FLAGS_bloom_bits).c_str()); FormatDoubleParam(FLAGS_bloom_bits).c_str());
@ -1633,7 +1635,7 @@ void StressTest::Open() {
block_based_options.cache_index_and_filter_blocks = block_based_options.cache_index_and_filter_blocks =
FLAGS_cache_index_and_filter_blocks; FLAGS_cache_index_and_filter_blocks;
block_based_options.block_cache_compressed = compressed_cache_; block_based_options.block_cache_compressed = compressed_cache_;
block_based_options.checksum = FLAGS_checksum_type_e; block_based_options.checksum = checksum_type_e;
block_based_options.block_size = FLAGS_block_size; block_based_options.block_size = FLAGS_block_size;
block_based_options.format_version = block_based_options.format_version =
static_cast<uint32_t>(FLAGS_format_version); static_cast<uint32_t>(FLAGS_format_version);
@ -1667,7 +1669,7 @@ void StressTest::Open() {
} }
options_.max_open_files = FLAGS_open_files; options_.max_open_files = FLAGS_open_files;
options_.statistics = dbstats; options_.statistics = dbstats;
options_.env = FLAGS_env; options_.env = db_stress_env;
options_.use_fsync = FLAGS_use_fsync; options_.use_fsync = FLAGS_use_fsync;
options_.compaction_readahead_size = FLAGS_compaction_readahead_size; options_.compaction_readahead_size = FLAGS_compaction_readahead_size;
options_.allow_mmap_reads = FLAGS_mmap_read; options_.allow_mmap_reads = FLAGS_mmap_read;
@ -1687,7 +1689,8 @@ void StressTest::Open() {
FLAGS_level0_slowdown_writes_trigger; FLAGS_level0_slowdown_writes_trigger;
options_.level0_file_num_compaction_trigger = options_.level0_file_num_compaction_trigger =
FLAGS_level0_file_num_compaction_trigger; FLAGS_level0_file_num_compaction_trigger;
options_.compression = FLAGS_compression_type_e; options_.compression = compression_type_e;
options_.bottommost_compression = bottommost_compression_type_e;
options_.compression_opts.max_dict_bytes = FLAGS_compression_max_dict_bytes; options_.compression_opts.max_dict_bytes = FLAGS_compression_max_dict_bytes;
options_.compression_opts.zstd_max_train_bytes = options_.compression_opts.zstd_max_train_bytes =
FLAGS_compression_zstd_max_train_bytes; FLAGS_compression_zstd_max_train_bytes;
@ -1725,9 +1728,9 @@ void StressTest::Open() {
#else #else
DBOptions db_options; DBOptions db_options;
std::vector<ColumnFamilyDescriptor> cf_descriptors; std::vector<ColumnFamilyDescriptor> cf_descriptors;
Status s = LoadOptionsFromFile(FLAGS_options_file, FLAGS_env, &db_options, Status s = LoadOptionsFromFile(FLAGS_options_file, db_stress_env,
&cf_descriptors); &db_options, &cf_descriptors);
db_options.env = new DbStressEnvWrapper(FLAGS_env); db_options.env = new DbStressEnvWrapper(db_stress_env);
if (!s.ok()) { if (!s.ok()) {
fprintf(stderr, "Unable to load options file %s --- %s\n", fprintf(stderr, "Unable to load options file %s --- %s\n",
FLAGS_options_file.c_str(), s.ToString().c_str()); FLAGS_options_file.c_str(), s.ToString().c_str());
@ -1912,7 +1915,7 @@ void StressTest::Open() {
// TODO(yanqin) support max_open_files != -1 for secondary instance. // TODO(yanqin) support max_open_files != -1 for secondary instance.
tmp_opts.max_open_files = -1; tmp_opts.max_open_files = -1;
tmp_opts.statistics = dbstats_secondaries; tmp_opts.statistics = dbstats_secondaries;
tmp_opts.env = FLAGS_env; tmp_opts.env = db_stress_env;
for (size_t i = 0; i != static_cast<size_t>(FLAGS_threads); ++i) { for (size_t i = 0; i != static_cast<size_t>(FLAGS_threads); ++i) {
const std::string secondary_path = const std::string secondary_path =
FLAGS_secondaries_base + "/" + std::to_string(i); FLAGS_secondaries_base + "/" + std::to_string(i);
@ -1933,7 +1936,7 @@ void StressTest::Open() {
Options tmp_opts; Options tmp_opts;
// TODO(yanqin) support max_open_files != -1 for secondary instance. // TODO(yanqin) support max_open_files != -1 for secondary instance.
tmp_opts.max_open_files = -1; tmp_opts.max_open_files = -1;
tmp_opts.env = FLAGS_env; tmp_opts.env = db_stress_env;
std::string secondary_path = FLAGS_secondaries_base + "/cmp_database"; std::string secondary_path = FLAGS_secondaries_base + "/cmp_database";
s = DB::OpenAsSecondary(tmp_opts, FLAGS_db, secondary_path, s = DB::OpenAsSecondary(tmp_opts, FLAGS_db, secondary_path,
cf_descriptors, &cmp_cfhs_, &cmp_db_); cf_descriptors, &cmp_cfhs_, &cmp_db_);
@ -2020,9 +2023,10 @@ void StressTest::Reopen(ThreadState* thread) {
secondaries_.clear(); secondaries_.clear();
num_times_reopened_++; num_times_reopened_++;
auto now = FLAGS_env->NowMicros(); auto now = db_stress_env->NowMicros();
fprintf(stdout, "%s Reopening database for the %dth time\n", fprintf(stdout, "%s Reopening database for the %dth time\n",
FLAGS_env->TimeToString(now / 1000000).c_str(), num_times_reopened_); db_stress_env->TimeToString(now / 1000000).c_str(),
num_times_reopened_);
Open(); Open();
} }
} // namespace rocksdb } // namespace rocksdb

@ -45,9 +45,10 @@ int db_stress_tool(int argc, char** argv) {
dbstats_secondaries = rocksdb::CreateDBStatistics(); dbstats_secondaries = rocksdb::CreateDBStatistics();
} }
} }
FLAGS_compression_type_e = compression_type_e = StringToCompressionType(FLAGS_compression_type.c_str());
StringToCompressionType(FLAGS_compression_type.c_str()); bottommost_compression_type_e =
FLAGS_checksum_type_e = StringToChecksumType(FLAGS_checksum_type.c_str()); StringToCompressionType(FLAGS_bottommost_compression_type.c_str());
checksum_type_e = StringToChecksumType(FLAGS_checksum_type.c_str());
Env* raw_env; Env* raw_env;
@ -67,16 +68,16 @@ int db_stress_tool(int argc, char** argv) {
raw_env = Env::Default(); raw_env = Env::Default();
} }
env_wrapper_guard = std::make_shared<DbStressEnvWrapper>(raw_env); env_wrapper_guard = std::make_shared<DbStressEnvWrapper>(raw_env);
FLAGS_env = env_wrapper_guard.get(); db_stress_env = env_wrapper_guard.get();
FLAGS_rep_factory = StringToRepFactory(FLAGS_memtablerep.c_str()); FLAGS_rep_factory = StringToRepFactory(FLAGS_memtablerep.c_str());
// The number of background threads should be at least as much the // The number of background threads should be at least as much the
// max number of concurrent compactions. // max number of concurrent compactions.
FLAGS_env->SetBackgroundThreads(FLAGS_max_background_compactions, db_stress_env->SetBackgroundThreads(FLAGS_max_background_compactions,
rocksdb::Env::Priority::LOW); rocksdb::Env::Priority::LOW);
FLAGS_env->SetBackgroundThreads(FLAGS_num_bottom_pri_threads, db_stress_env->SetBackgroundThreads(FLAGS_num_bottom_pri_threads,
rocksdb::Env::Priority::BOTTOM); rocksdb::Env::Priority::BOTTOM);
if (FLAGS_prefixpercent > 0 && FLAGS_prefix_size < 0) { if (FLAGS_prefixpercent > 0 && FLAGS_prefix_size < 0) {
fprintf(stderr, fprintf(stderr,
"Error: prefixpercent is non-zero while prefix_size is " "Error: prefixpercent is non-zero while prefix_size is "
@ -166,7 +167,7 @@ int db_stress_tool(int argc, char** argv) {
// Choose a location for the test database if none given with --db=<path> // Choose a location for the test database if none given with --db=<path>
if (FLAGS_db.empty()) { if (FLAGS_db.empty()) {
std::string default_db_path; std::string default_db_path;
FLAGS_env->GetTestDirectory(&default_db_path); db_stress_env->GetTestDirectory(&default_db_path);
default_db_path += "/dbstress"; default_db_path += "/dbstress";
FLAGS_db = default_db_path; FLAGS_db = default_db_path;
} }
@ -174,9 +175,10 @@ int db_stress_tool(int argc, char** argv) {
if ((FLAGS_test_secondary || FLAGS_continuous_verification_interval > 0) && if ((FLAGS_test_secondary || FLAGS_continuous_verification_interval > 0) &&
FLAGS_secondaries_base.empty()) { FLAGS_secondaries_base.empty()) {
std::string default_secondaries_path; std::string default_secondaries_path;
FLAGS_env->GetTestDirectory(&default_secondaries_path); db_stress_env->GetTestDirectory(&default_secondaries_path);
default_secondaries_path += "/dbstress_secondaries"; default_secondaries_path += "/dbstress_secondaries";
rocksdb::Status s = FLAGS_env->CreateDirIfMissing(default_secondaries_path); rocksdb::Status s =
db_stress_env->CreateDirIfMissing(default_secondaries_path);
if (!s.ok()) { if (!s.ok()) {
fprintf(stderr, "Failed to create directory %s: %s\n", fprintf(stderr, "Failed to create directory %s: %s\n",
default_secondaries_path.c_str(), s.ToString().c_str()); default_secondaries_path.c_str(), s.ToString().c_str());

@ -446,10 +446,10 @@ class NonBatchedOpsStressTest : public StressTest {
const std::string sst_filename = const std::string sst_filename =
FLAGS_db + "/." + ToString(thread->tid) + ".sst"; FLAGS_db + "/." + ToString(thread->tid) + ".sst";
Status s; Status s;
if (FLAGS_env->FileExists(sst_filename).ok()) { if (db_stress_env->FileExists(sst_filename).ok()) {
// Maybe we terminated abnormally before, so cleanup to give this file // Maybe we terminated abnormally before, so cleanup to give this file
// ingestion a clean slate // ingestion a clean slate
s = FLAGS_env->DeleteFile(sst_filename); s = db_stress_env->DeleteFile(sst_filename);
} }
SstFileWriter sst_file_writer(EnvOptions(options_), options_); SstFileWriter sst_file_writer(EnvOptions(options_), options_);

@ -33,7 +33,12 @@ default_params = {
"cache_size": 1048576, "cache_size": 1048576,
"checkpoint_one_in": 1000000, "checkpoint_one_in": 1000000,
"compression_type": lambda: random.choice( "compression_type": lambda: random.choice(
["snappy", "none", "zlib"]), ["none", "snappy", "zlib", "bzip2", "lz4", "lz4hc", "xpress", "zstd"]),
"bottommost_compression_type": lambda:
"disable" if random.randint(0, 1) == 0 else
random.choice(
["none", "snappy", "zlib", "bzip2", "lz4", "lz4hc", "xpress",
"zstd"]),
"checksum_type" : lambda: random.choice(["kCRC32c", "kxxHash", "kxxHash64"]), "checksum_type" : lambda: random.choice(["kCRC32c", "kxxHash", "kxxHash64"]),
"compression_max_dict_bytes": lambda: 16384 * random.randint(0, 1), "compression_max_dict_bytes": lambda: 16384 * random.randint(0, 1),
"compression_zstd_max_train_bytes": lambda: 65536 * random.randint(0, 1), "compression_zstd_max_train_bytes": lambda: 65536 * random.randint(0, 1),
@ -262,9 +267,10 @@ def gen_cmd_params(args):
def gen_cmd(params, unknown_params): def gen_cmd(params, unknown_params):
finalzied_params = finalize_and_sanitize(params)
cmd = ['./db_stress'] + [ cmd = ['./db_stress'] + [
'--{0}={1}'.format(k, v) '--{0}={1}'.format(k, v)
for k, v in finalize_and_sanitize(params).items() for k, v in [(k, finalzied_params[k]) for k in sorted(finalzied_params)]
if k not in set(['test_type', 'simple', 'duration', 'interval', if k not in set(['test_type', 'simple', 'duration', 'interval',
'random_kill_odd', 'cf_consistency', 'txn']) 'random_kill_odd', 'cf_consistency', 'txn'])
and v is not None] + unknown_params and v is not None] + unknown_params

@ -563,6 +563,8 @@ inline std::string CompressionTypeToString(CompressionType compression_type) {
return "ZSTD"; return "ZSTD";
case kZSTDNotFinalCompression: case kZSTDNotFinalCompression:
return "ZSTDNotFinal"; return "ZSTDNotFinal";
case kDisableCompressionOption:
return "DisableOption";
default: default:
assert(false); assert(false);
return ""; return "";

Loading…
Cancel
Save