From 670a916d01eaaea284bb6e0768ceacd23b8442a5 Mon Sep 17 00:00:00 2001 From: Yanqin Jin Date: Fri, 20 Dec 2019 08:46:52 -0800 Subject: [PATCH] Add more verification to db_stress (#6173) Summary: Currently, db_stress performs verification by calling `VerifyDb()` at the end of test and optionally before tests start. In case of corruption or incorrect result, it will be too late. This PR adds more verification in two ways. 1. For cf consistency test, each test thread takes a snapshot and verifies every N ops. N is configurable via `-verify_db_one_in`. This option is not supported in other stress tests. 2. For cf consistency test, we use another background thread in which a secondary instance periodically tails the primary (interval is configurable). We verify the secondary. Once an error is detected, we terminate the test and report. This does not affect other stress tests. Test plan (devserver) ``` $./db_stress -test_cf_consistency -verify_db_one_in=0 -ops_per_thread=100000 -continuous_verification_interval=100 $./db_stress -test_cf_consistency -verify_db_one_in=1000 -ops_per_thread=10000 -continuous_verification_interval=0 $make crash_test ``` Pull Request resolved: https://github.com/facebook/rocksdb/pull/6173 Differential Revision: D19047367 Pulled By: riversand963 fbshipit-source-id: aeed584ad71f9310c111445f34975e5ab47a0615 --- Makefile | 2 +- db_stress_tool/cf_consistency_stress.cc | 72 +++++++++++++++++-- db_stress_tool/db_stress_common.cc | 34 ++++++++- db_stress_tool/db_stress_common.h | 6 +- db_stress_tool/db_stress_driver.cc | 16 +++-- db_stress_tool/db_stress_gflags.cc | 10 ++- db_stress_tool/db_stress_shared_state.cc | 1 - db_stress_tool/db_stress_shared_state.h | 34 +++++++-- db_stress_tool/db_stress_test_base.cc | 89 ++++++++++++++++-------- db_stress_tool/db_stress_test_base.h | 5 ++ db_stress_tool/db_stress_tool.cc | 18 +++-- db_stress_tool/no_batched_ops_stress.cc | 7 +- tools/db_crashtest.py | 4 +- 13 files changed, 238 insertions(+), 60 deletions(-) diff --git a/Makefile b/Makefile index ad98ee7bc..c75c140af 100644 --- a/Makefile +++ b/Makefile @@ -745,7 +745,7 @@ endif # PLATFORM_SHARED_EXT dbg rocksdbjavastatic rocksdbjava install install-static install-shared uninstall \ analyze tools tools_lib \ blackbox_crash_test_with_atomic_flush whitebox_crash_test_with_atomic_flush \ - blackbox_crash_test_with_txn whitebox_crash_test_with_txn + blackbox_crash_test_with_txn whitebox_crash_test_with_txn all: $(LIBRARY) $(BENCHMARKS) tools tools_lib test_libs $(TESTS) diff --git a/db_stress_tool/cf_consistency_stress.cc b/db_stress_tool/cf_consistency_stress.cc index c3b11edbc..cb61880c1 100644 --- a/db_stress_tool/cf_consistency_stress.cc +++ b/db_stress_tool/cf_consistency_stress.cc @@ -15,7 +15,7 @@ class CfConsistencyStressTest : public StressTest { public: CfConsistencyStressTest() : batch_id_(0) {} - virtual ~CfConsistencyStressTest() {} + ~CfConsistencyStressTest() override {} Status TestPut(ThreadState* thread, WriteOptions& write_opts, const ReadOptions& /* read_opts */, @@ -350,6 +350,12 @@ class CfConsistencyStressTest : public StressTest { // iterate the memtable using this iterator any more, although the memtable // contains the most up-to-date key-values. options.total_order_seek = true; + const auto ss_deleter = [this](const Snapshot* ss) { + db_->ReleaseSnapshot(ss); + }; + std::unique_ptr snapshot_guard( + db_->GetSnapshot(), ss_deleter); + options.snapshot = snapshot_guard.get(); assert(thread != nullptr); auto shared = thread->shared; std::vector> iters(column_families_.size()); @@ -388,9 +394,6 @@ class CfConsistencyStressTest : public StressTest { shared->SetVerificationFailure(); } } - if (status.ok()) { - fprintf(stdout, "Finished scanning all column families.\n"); - } break; } else if (valid_cnt != iters.size()) { shared->SetVerificationFailure(); @@ -491,6 +494,67 @@ class CfConsistencyStressTest : public StressTest { } while (true); } +#ifndef ROCKSDB_LITE + void ContinuouslyVerifyDb(ThreadState* thread) const override { + assert(thread); + Status status; + + DB* db_ptr = cmp_db_ ? cmp_db_ : db_; + const auto& cfhs = cmp_db_ ? cmp_cfhs_ : column_families_; + const auto ss_deleter = [&](const Snapshot* ss) { + db_ptr->ReleaseSnapshot(ss); + }; + std::unique_ptr snapshot_guard( + db_ptr->GetSnapshot(), ss_deleter); + if (cmp_db_) { + status = cmp_db_->TryCatchUpWithPrimary(); + } + SharedState* shared = thread->shared; + assert(shared); + if (!status.ok()) { + shared->SetShouldStopTest(); + return; + } + assert(cmp_db_ || snapshot_guard.get()); + const auto checksum_column_family = [](Iterator* iter, + uint32_t* checksum) -> Status { + assert(nullptr != checksum); + uint32_t ret = 0; + for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { + ret = crc32c::Extend(ret, iter->key().data(), iter->key().size()); + ret = crc32c::Extend(ret, iter->value().data(), iter->value().size()); + } + *checksum = ret; + return iter->status(); + }; + ReadOptions ropts; + ropts.total_order_seek = true; + ropts.snapshot = snapshot_guard.get(); + uint32_t crc = 0; + { + // Compute crc for all key-values of default column family. + std::unique_ptr it(db_ptr->NewIterator(ropts)); + status = checksum_column_family(it.get(), &crc); + } + uint32_t tmp_crc = 0; + if (status.ok()) { + for (ColumnFamilyHandle* cfh : cfhs) { + if (cfh == db_ptr->DefaultColumnFamily()) { + continue; + } + std::unique_ptr it(db_ptr->NewIterator(ropts, cfh)); + status = checksum_column_family(it.get(), &tmp_crc); + if (!status.ok() || tmp_crc != crc) { + break; + } + } + } + if (!status.ok() || tmp_crc != crc) { + shared->SetShouldStopTest(); + } + } +#endif // !ROCKSDB_LITE + std::vector GenerateColumnFamilies( const int /* num_column_families */, int /* rand_column_family */) const override { diff --git a/db_stress_tool/db_stress_common.cc b/db_stress_tool/db_stress_common.cc index 59fede610..75aa8e5fc 100644 --- a/db_stress_tool/db_stress_common.cc +++ b/db_stress_tool/db_stress_common.cc @@ -85,9 +85,11 @@ void PoolSizeChangeThread(void* v) { while (true) { { MutexLock l(shared->GetMutex()); - if (shared->ShoudStopBgThread()) { - shared->SetBgThreadFinish(); - shared->GetCondVar()->SignalAll(); + if (shared->ShouldStopBgThread()) { + shared->IncBgThreadsFinished(); + if (shared->BgThreadsFinished()) { + shared->GetCondVar()->SignalAll(); + } return; } } @@ -110,6 +112,32 @@ void PoolSizeChangeThread(void* v) { } } +void DbVerificationThread(void* v) { + assert(FLAGS_continuous_verification_interval > 0); + auto* thread = reinterpret_cast(v); + SharedState* shared = thread->shared; + StressTest* stress_test = shared->GetStressTest(); + assert(stress_test != nullptr); + while (true) { + { + MutexLock l(shared->GetMutex()); + if (shared->ShouldStopBgThread()) { + shared->IncBgThreadsFinished(); + if (shared->BgThreadsFinished()) { + shared->GetCondVar()->SignalAll(); + } + return; + } + } + if (!shared->HasVerificationFailedYet()) { + stress_test->ContinuouslyVerifyDb(thread); + } + FLAGS_env->SleepForMicroseconds( + thread->rand.Next() % FLAGS_continuous_verification_interval * 1000 + + 1); + } +} + void PrintKeyValue(int cf, uint64_t key, const char* value, size_t sz) { if (!FLAGS_verbose) { return; diff --git a/db_stress_tool/db_stress_common.h b/db_stress_tool/db_stress_common.h index ade10a426..bb3db69a0 100644 --- a/db_stress_tool/db_stress_common.h +++ b/db_stress_tool/db_stress_common.h @@ -141,7 +141,7 @@ DECLARE_bool(partition_filters); DECLARE_int32(index_type); DECLARE_string(db); DECLARE_string(secondaries_base); -DECLARE_bool(enable_secondary); +DECLARE_bool(test_secondary); DECLARE_string(expected_values_path); DECLARE_bool(verify_checksum); DECLARE_bool(mmap_read); @@ -208,6 +208,8 @@ DECLARE_bool(write_dbid_to_manifest); DECLARE_uint64(max_write_batch_group_size_bytes); DECLARE_bool(level_compaction_dynamic_level_bytes); DECLARE_int32(verify_checksum_one_in); +DECLARE_int32(verify_db_one_in); +DECLARE_int32(continuous_verification_interval); const long KB = 1024; const int kRandomValueMaxFactor = 3; @@ -367,6 +369,8 @@ extern inline void SanitizeDoubleParam(double* param) { extern void PoolSizeChangeThread(void* v); +extern void DbVerificationThread(void* v); + extern void PrintKeyValue(int cf, uint64_t key, const char* value, size_t sz); extern int64_t GenerateOneKey(ThreadState* thread, uint64_t iteration); diff --git a/db_stress_tool/db_stress_driver.cc b/db_stress_tool/db_stress_driver.cc index 87292df47..b0ec49cab 100644 --- a/db_stress_tool/db_stress_driver.cc +++ b/db_stress_tool/db_stress_driver.cc @@ -75,6 +75,11 @@ bool RunStressTest(StressTest* stress) { if (FLAGS_compaction_thread_pool_adjust_interval > 0) { FLAGS_env->StartThread(PoolSizeChangeThread, &bg_thread); } + ThreadState continuous_verification_thread(0, &shared); + if (FLAGS_continuous_verification_interval > 0) { + FLAGS_env->StartThread(DbVerificationThread, + &continuous_verification_thread); + } // Each thread goes through the following states: // initializing -> wait for others to init -> read/populate/depopulate @@ -87,9 +92,9 @@ bool RunStressTest(StressTest* stress) { } if (shared.ShouldVerifyAtBeginning()) { if (shared.HasVerificationFailedYet()) { - printf("Crash-recovery verification failed :(\n"); + fprintf(stderr, "Crash-recovery verification failed :(\n"); } else { - printf("Crash-recovery verification passed :)\n"); + fprintf(stdout, "Crash-recovery verification passed :)\n"); } } @@ -135,10 +140,11 @@ bool RunStressTest(StressTest* stress) { } stress->PrintStatistics(); - if (FLAGS_compaction_thread_pool_adjust_interval > 0) { + if (FLAGS_compaction_thread_pool_adjust_interval > 0 || + FLAGS_continuous_verification_interval > 0) { MutexLock l(shared.GetMutex()); shared.SetShouldStopBgThread(); - while (!shared.BgThreadFinished()) { + while (!shared.BgThreadsFinished()) { shared.GetCondVar()->Wait(); } } @@ -148,7 +154,7 @@ bool RunStressTest(StressTest* stress) { } if (shared.HasVerificationFailedYet()) { - printf("Verification failed :(\n"); + fprintf(stderr, "Verification failed :(\n"); return false; } return true; diff --git a/db_stress_tool/db_stress_gflags.cc b/db_stress_tool/db_stress_gflags.cc index a66fbbf31..722292e52 100644 --- a/db_stress_tool/db_stress_gflags.cc +++ b/db_stress_tool/db_stress_gflags.cc @@ -307,7 +307,7 @@ DEFINE_string(db, "", "Use the db with the following name."); DEFINE_string(secondaries_base, "", "Use this path as the base path for secondary instances."); -DEFINE_bool(enable_secondary, false, "Enable secondary instance."); +DEFINE_bool(test_secondary, false, "Test secondary instance."); DEFINE_string( expected_values_path, "", @@ -582,4 +582,12 @@ DEFINE_int32(verify_checksum_one_in, 0, " checksum verification of all the files in the database once for" " every N ops on average. 0 indicates that calls to" " VerifyChecksum() are disabled."); +DEFINE_int32(verify_db_one_in, 0, + "If non-zero, call VerifyDb() once for every N ops. 0 indicates " + "that VerifyDb() will not be called in OperateDb(). Note that " + "enabling this can slow down tests."); + +DEFINE_int32(continuous_verification_interval, 1000, + "While test is running, verify db every N milliseconds. 0 " + "disables continuous verification."); #endif // GFLAGS diff --git a/db_stress_tool/db_stress_shared_state.cc b/db_stress_tool/db_stress_shared_state.cc index 82b0e5ea2..c6ff73ce2 100644 --- a/db_stress_tool/db_stress_shared_state.cc +++ b/db_stress_tool/db_stress_shared_state.cc @@ -13,7 +13,6 @@ namespace rocksdb { const uint32_t SharedState::UNKNOWN_SENTINEL = 0xfffffffe; -// indicates a key should definitely be deleted const uint32_t SharedState::DELETION_SENTINEL = 0xffffffff; } // namespace rocksdb #endif // GFLAGS diff --git a/db_stress_tool/db_stress_shared_state.h b/db_stress_tool/db_stress_shared_state.h index ffb420e5d..08c5d5254 100644 --- a/db_stress_tool/db_stress_shared_state.h +++ b/db_stress_tool/db_stress_shared_state.h @@ -22,6 +22,8 @@ DECLARE_int32(nooverwritepercent); DECLARE_string(expected_values_path); DECLARE_int32(clear_column_family_one_in); DECLARE_bool(test_batches_snapshots); +DECLARE_int32(compaction_thread_pool_adjust_interval); +DECLARE_int32(continuous_verification_interval); namespace rocksdb { class StressTest; @@ -47,17 +49,19 @@ class SharedState { num_done_(0), start_(false), start_verify_(false), + num_bg_threads_(0), should_stop_bg_thread_(false), - bg_thread_finished_(false), + bg_thread_finished_(0), stress_test_(stress_test), verification_failure_(false), + should_stop_test_(false), no_overwrite_ids_(FLAGS_column_families), values_(nullptr), printing_verification_results_(false) { // Pick random keys in each column family that will not experience // overwrite - printf("Choosing random keys with no overwrite\n"); + fprintf(stdout, "Choosing random keys with no overwrite\n"); Random64 rnd(seed_); // Start with the identity permutation. Subsequent iterations of // for loop below will start with perm of previous for loop @@ -159,6 +163,14 @@ class SharedState { ptr.reset(new port::Mutex); } } + if (FLAGS_compaction_thread_pool_adjust_interval > 0) { + ++num_bg_threads_; + fprintf(stdout, "Starting compaction_thread_pool_adjust_thread\n"); + } + if (FLAGS_continuous_verification_interval > 0) { + ++num_bg_threads_; + fprintf(stdout, "Starting continuous_verification_thread\n"); + } } ~SharedState() {} @@ -199,7 +211,11 @@ class SharedState { void SetVerificationFailure() { verification_failure_.store(true); } - bool HasVerificationFailedYet() { return verification_failure_.load(); } + bool HasVerificationFailedYet() const { return verification_failure_.load(); } + + void SetShouldStopTest() { should_stop_test_.store(true); } + + bool ShouldStopTest() const { return should_stop_test_.load(); } port::Mutex* GetMutexForKey(int cf, int64_t key) { return key_locks_[cf][key >> log2_keys_per_lock_].get(); @@ -290,11 +306,13 @@ class SharedState { void SetShouldStopBgThread() { should_stop_bg_thread_ = true; } - bool ShoudStopBgThread() { return should_stop_bg_thread_; } + bool ShouldStopBgThread() { return should_stop_bg_thread_; } - void SetBgThreadFinish() { bg_thread_finished_ = true; } + void IncBgThreadsFinished() { ++bg_thread_finished_; } - bool BgThreadFinished() const { return bg_thread_finished_; } + bool BgThreadsFinished() const { + return bg_thread_finished_ == num_bg_threads_; + } bool ShouldVerifyAtBeginning() const { return expected_mmap_buffer_.get() != nullptr; @@ -323,10 +341,12 @@ class SharedState { long num_done_; bool start_; bool start_verify_; + int num_bg_threads_; bool should_stop_bg_thread_; - bool bg_thread_finished_; + int bg_thread_finished_; StressTest* stress_test_; std::atomic verification_failure_; + std::atomic should_stop_test_; // Keys that should not be overwritten std::unordered_set no_overwrite_ids_; diff --git a/db_stress_tool/db_stress_test_base.cc b/db_stress_tool/db_stress_test_base.cc index 09d8745af..eba61041c 100644 --- a/db_stress_tool/db_stress_test_base.cc +++ b/db_stress_tool/db_stress_test_base.cc @@ -28,7 +28,8 @@ StressTest::StressTest() #endif new_column_family_name_(1), num_times_reopened_(0), - db_preload_finished_(false) { + db_preload_finished_(false), + cmp_db_(nullptr) { if (FLAGS_destroy_db_initially) { std::vector files; FLAGS_env->GetChildren(FLAGS_db, &files); @@ -65,6 +66,12 @@ StressTest::~StressTest() { delete secondaries_[i]; } secondaries_.clear(); + + for (auto* cf : cmp_cfhs_) { + delete cf; + } + cmp_cfhs_.clear(); + delete cmp_db_; } std::shared_ptr StressTest::NewCache(size_t capacity) { @@ -187,7 +194,7 @@ void StressTest::InitReadonlyDb(SharedState* shared) { bool StressTest::VerifySecondaries() { #ifndef ROCKSDB_LITE - if (FLAGS_enable_secondary) { + if (FLAGS_test_secondary) { uint64_t now = FLAGS_env->NowMicros(); fprintf( stdout, "%s Start to verify secondaries against primary\n", @@ -232,7 +239,7 @@ bool StressTest::VerifySecondaries() { return false; } } - if (FLAGS_enable_secondary) { + if (FLAGS_test_secondary) { uint64_t now = FLAGS_env->NowMicros(); fprintf( stdout, "%s Verification of secondaries succeeded\n", @@ -294,15 +301,16 @@ Status StressTest::AssertSame(DB* db, ColumnFamilyHandle* cf, void StressTest::VerificationAbort(SharedState* shared, std::string msg, Status s) const { - printf("Verification failed: %s. Status is %s\n", msg.c_str(), - s.ToString().c_str()); + fprintf(stderr, "Verification failed: %s. Status is %s\n", msg.c_str(), + s.ToString().c_str()); shared->SetVerificationFailure(); } void StressTest::VerificationAbort(SharedState* shared, std::string msg, int cf, int64_t key) const { - printf("Verification failed for column family %d key %" PRIi64 ": %s\n", cf, - key, msg.c_str()); + fprintf(stderr, + "Verification failed for column family %d key %" PRIi64 ": %s\n", cf, + key, msg.c_str()); shared->SetVerificationFailure(); } @@ -466,15 +474,17 @@ void StressTest::OperateDb(ThreadState* thread) { write_opts.sync = true; } write_opts.disableWAL = FLAGS_disable_wal; - const int prefixBound = (int)FLAGS_readpercent + (int)FLAGS_prefixpercent; - const int writeBound = prefixBound + (int)FLAGS_writepercent; - const int delBound = writeBound + (int)FLAGS_delpercent; - const int delRangeBound = delBound + (int)FLAGS_delrangepercent; + const int prefixBound = static_cast(FLAGS_readpercent) + + static_cast(FLAGS_prefixpercent); + const int writeBound = prefixBound + static_cast(FLAGS_writepercent); + const int delBound = writeBound + static_cast(FLAGS_delpercent); + const int delRangeBound = delBound + static_cast(FLAGS_delrangepercent); const uint64_t ops_per_open = FLAGS_ops_per_thread / (FLAGS_reopen + 1); thread->stats.Start(); for (int open_cnt = 0; open_cnt <= FLAGS_reopen; ++open_cnt) { - if (thread->shared->HasVerificationFailedYet()) { + if (thread->shared->HasVerificationFailedYet() || + thread->shared->ShouldStopTest()) { break; } if (open_cnt != 0) { @@ -510,12 +520,20 @@ void StressTest::OperateDb(ThreadState* thread) { options_.inplace_update_support ^= options_.inplace_update_support; } + if (thread->tid == 0 && FLAGS_verify_db_one_in > 0 && + thread->rand.OneIn(FLAGS_verify_db_one_in)) { + ContinuouslyVerifyDb(thread); + if (thread->shared->ShouldStopTest()) { + break; + } + } + MaybeClearOneColumnFamily(thread); if (thread->rand.OneInOpt(FLAGS_sync_wal_one_in)) { Status s = db_->SyncWAL(); if (!s.ok() && !s.IsNotSupported()) { - fprintf(stdout, "SyncWAL() failed: %s\n", s.ToString().c_str()); + fprintf(stderr, "SyncWAL() failed: %s\n", s.ToString().c_str()); } } @@ -606,7 +624,7 @@ void StressTest::OperateDb(ThreadState* thread) { // Reset this in case we pick something other than a read op. We don't // want to use a stale value when deciding at the beginning of the loop // whether to vote to reopen - if (prob_op < (int)FLAGS_readpercent) { + if (prob_op >= 0 && prob_op < static_cast(FLAGS_readpercent)) { assert(0 <= prob_op); // OPERATION read if (FLAGS_use_multiget) { @@ -625,7 +643,7 @@ void StressTest::OperateDb(ThreadState* thread) { TestGet(thread, read_opts, rand_column_families, rand_keys); } } else if (prob_op < prefixBound) { - assert((int)FLAGS_readpercent <= prob_op); + assert(static_cast(FLAGS_readpercent) <= prob_op); // OPERATION prefix scan // keys are 8 bytes long, prefix size is FLAGS_prefix_size. There are // (8 - FLAGS_prefix_size) bytes besides the prefix. So there will @@ -1098,8 +1116,8 @@ Status StressTest::TestBackupRestore( restored_db = nullptr; } if (!s.ok()) { - printf("A backup/restore operation failed with: %s\n", - s.ToString().c_str()); + fprintf(stderr, "A backup/restore operation failed with: %s\n", + s.ToString().c_str()); } return s; } @@ -1372,7 +1390,8 @@ void StressTest::TestCompactRange(ThreadState* thread, int64_t rand_key, Status status = db_->CompactRange(cro, column_family, &start_key, &end_key); if (!status.ok()) { - printf("Unable to perform CompactRange(): %s\n", status.ToString().c_str()); + fprintf(stdout, "Unable to perform CompactRange(): %s\n", + status.ToString().c_str()); } if (pre_snapshot != nullptr) { @@ -1699,16 +1718,16 @@ void StressTest::Open() { existing_column_families.end()); if (sorted_cfn != existing_column_families) { fprintf(stderr, "Expected column families differ from the existing:\n"); - printf("Expected: {"); + fprintf(stderr, "Expected: {"); for (auto cf : sorted_cfn) { - printf("%s ", cf.c_str()); + fprintf(stderr, "%s ", cf.c_str()); } - printf("}\n"); - printf("Existing: {"); + fprintf(stderr, "}\n"); + fprintf(stderr, "Existing: {"); for (auto cf : existing_column_families) { - printf("%s ", cf.c_str()); + fprintf(stderr, "%s ", cf.c_str()); } - printf("}\n"); + fprintf(stderr, "}\n"); } assert(sorted_cfn == existing_column_families); } @@ -1775,14 +1794,15 @@ void StressTest::Open() { assert(!s.ok() || column_families_.size() == static_cast(FLAGS_column_families)); - if (FLAGS_enable_secondary) { + if (FLAGS_test_secondary) { #ifndef ROCKSDB_LITE secondaries_.resize(FLAGS_threads); std::fill(secondaries_.begin(), secondaries_.end(), nullptr); secondary_cfh_lists_.clear(); secondary_cfh_lists_.resize(FLAGS_threads); Options tmp_opts; - tmp_opts.max_open_files = FLAGS_open_files; + // TODO(yanqin) support max_open_files != -1 for secondary instance. + tmp_opts.max_open_files = -1; tmp_opts.statistics = dbstats_secondaries; tmp_opts.env = FLAGS_env; for (size_t i = 0; i != static_cast(FLAGS_threads); ++i) { @@ -1795,22 +1815,35 @@ void StressTest::Open() { break; } } + assert(s.ok()); #else fprintf(stderr, "Secondary is not supported in RocksDBLite\n"); exit(1); #endif } + if (FLAGS_continuous_verification_interval > 0 && !cmp_db_) { + Options tmp_opts; + // TODO(yanqin) support max_open_files != -1 for secondary instance. + tmp_opts.max_open_files = -1; + tmp_opts.env = FLAGS_env; + std::string secondary_path = FLAGS_secondaries_base + "/cmp_database"; + s = DB::OpenAsSecondary(tmp_opts, FLAGS_db, secondary_path, + cf_descriptors, &cmp_cfhs_, &cmp_db_); + assert(!s.ok() || + cmp_cfhs_.size() == static_cast(FLAGS_column_families)); + } } else { #ifndef ROCKSDB_LITE DBWithTTL* db_with_ttl; s = DBWithTTL::Open(options_, FLAGS_db, &db_with_ttl, FLAGS_ttl); db_ = db_with_ttl; - if (FLAGS_enable_secondary) { + if (FLAGS_test_secondary) { secondaries_.resize(FLAGS_threads); std::fill(secondaries_.begin(), secondaries_.end(), nullptr); Options tmp_opts; tmp_opts.env = options_.env; - tmp_opts.max_open_files = FLAGS_open_files; + // TODO(yanqin) support max_open_files != -1 for secondary instance. + tmp_opts.max_open_files = -1; for (size_t i = 0; i != static_cast(FLAGS_threads); ++i) { const std::string secondary_path = FLAGS_secondaries_base + "/" + std::to_string(i); diff --git a/db_stress_tool/db_stress_test_base.h b/db_stress_tool/db_stress_test_base.h index c743a3780..c46e53657 100644 --- a/db_stress_tool/db_stress_test_base.h +++ b/db_stress_tool/db_stress_test_base.h @@ -34,6 +34,7 @@ class StressTest { void OperateDb(ThreadState* thread); virtual void VerifyDb(ThreadState* thread) const = 0; + virtual void ContinuouslyVerifyDb(ThreadState* /*thread*/) const {} void PrintStatistics(); @@ -203,6 +204,10 @@ class StressTest { // Fields used for stress-testing secondary instance in the same process std::vector secondaries_; std::vector> secondary_cfh_lists_; + + // Fields used for continuous verification from another thread + DB* cmp_db_; + std::vector cmp_cfhs_; }; } // namespace rocksdb diff --git a/db_stress_tool/db_stress_tool.cc b/db_stress_tool/db_stress_tool.cc index 7b08fea3f..709215696 100644 --- a/db_stress_tool/db_stress_tool.cc +++ b/db_stress_tool/db_stress_tool.cc @@ -41,7 +41,7 @@ int db_stress_tool(int argc, char** argv) { if (FLAGS_statistics) { dbstats = rocksdb::CreateDBStatistics(); - if (FLAGS_enable_secondary) { + if (FLAGS_test_secondary) { dbstats_secondaries = rocksdb::CreateDBStatistics(); } } @@ -171,7 +171,8 @@ int db_stress_tool(int argc, char** argv) { FLAGS_db = default_db_path; } - if (FLAGS_enable_secondary && FLAGS_secondaries_base.empty()) { + if ((FLAGS_test_secondary || FLAGS_continuous_verification_interval > 0) && + FLAGS_secondaries_base.empty()) { std::string default_secondaries_path; FLAGS_env->GetTestDirectory(&default_secondaries_path); default_secondaries_path += "/dbstress_secondaries"; @@ -184,8 +185,17 @@ int db_stress_tool(int argc, char** argv) { FLAGS_secondaries_base = default_secondaries_path; } - if (!FLAGS_enable_secondary && FLAGS_secondary_catch_up_one_in > 0) { - fprintf(stderr, "Secondary instance is disabled.\n"); + if (!FLAGS_test_secondary && FLAGS_secondary_catch_up_one_in > 0) { + fprintf( + stderr, + "Must set -test_secondary=true if secondary_catch_up_one_in > 0.\n"); + exit(1); + } + + if (!FLAGS_test_cf_consistency && FLAGS_verify_db_one_in > 0) { + fprintf(stderr, + "For non cf_consistency tests, VerifyDb() is called only before " + "and after test.\n"); exit(1); } diff --git a/db_stress_tool/no_batched_ops_stress.cc b/db_stress_tool/no_batched_ops_stress.cc index f75339737..6c50b9fa5 100644 --- a/db_stress_tool/no_batched_ops_stress.cc +++ b/db_stress_tool/no_batched_ops_stress.cc @@ -42,10 +42,9 @@ class NonBatchedOpsStressTest : public StressTest { if (thread->shared->HasVerificationFailedYet()) { break; } - // TODO(ljin): update "long" to uint64_t // Reseek when the prefix changes if (prefix_to_use > 0 && - i % (static_cast(1) << 8 * (8 - prefix_to_use)) == 0) { + i % (static_cast(1) << 8 * (8 - prefix_to_use)) == 0) { iter->Seek(Key(i)); } std::string from_db; @@ -65,7 +64,7 @@ class NonBatchedOpsStressTest : public StressTest { } else { // The iterator found no value for the key in question, so do not // move to the next item in the iterator - s = Status::NotFound(Slice()); + s = Status::NotFound(); } VerifyValue(static_cast(cf), i, options, shared, from_db, s, true); @@ -506,7 +505,7 @@ class NonBatchedOpsStressTest : public StressTest { bool VerifyValue(int cf, int64_t key, const ReadOptions& /*opts*/, SharedState* shared, const std::string& value_from_db, - Status s, bool strict = false) const { + const Status& s, bool strict = false) const { if (shared->HasVerificationFailedYet()) { return false; } diff --git a/tools/db_crashtest.py b/tools/db_crashtest.py index b76164ffc..413650035 100644 --- a/tools/db_crashtest.py +++ b/tools/db_crashtest.py @@ -99,7 +99,9 @@ default_params = { # Temporarily disabled because of assertion violations in # BlockBasedTable::ApproximateSize # "level_compaction_dynamic_level_bytes" : True, - "verify_checksum_one_in": 1000000 + "verify_checksum_one_in": 1000000, + "verify_db_one_in": 100000, + "continuous_verification_interval" : 0 } _TEST_DIR_ENV_VAR = 'TEST_TMPDIR'