diff --git a/db_stress_tool/batched_ops_stress.cc b/db_stress_tool/batched_ops_stress.cc index 1aa1cc264..13f3aba5c 100644 --- a/db_stress_tool/batched_ops_stress.cc +++ b/db_stress_tool/batched_ops_stress.cc @@ -340,6 +340,8 @@ class BatchedOpsStressTest : public StressTest { } void VerifyDb(ThreadState* /* thread */) const override {} + + void ContinuouslyVerifyDb(ThreadState* /* thread */) const override {} }; StressTest* CreateBatchedOpsStressTest() { return new BatchedOpsStressTest(); } diff --git a/db_stress_tool/cf_consistency_stress.cc b/db_stress_tool/cf_consistency_stress.cc index ae1e57314..b7cc4c376 100644 --- a/db_stress_tool/cf_consistency_stress.cc +++ b/db_stress_tool/cf_consistency_stress.cc @@ -448,21 +448,24 @@ class CfConsistencyStressTest : public StressTest { DB* db_ptr = cmp_db_ ? cmp_db_ : db_; const auto& cfhs = cmp_db_ ? cmp_cfhs_ : column_families_; - const auto ss_deleter = [&](const Snapshot* ss) { - db_ptr->ReleaseSnapshot(ss); - }; - std::unique_ptr snapshot_guard( - db_ptr->GetSnapshot(), ss_deleter); - if (cmp_db_) { - status = cmp_db_->TryCatchUpWithPrimary(); - } + + // Take a snapshot to preserve the state of primary db. + ManagedSnapshot snapshot_guard(db_); + SharedState* shared = thread->shared; assert(shared); - if (!status.ok()) { - shared->SetShouldStopTest(); - return; + + if (cmp_db_) { + status = cmp_db_->TryCatchUpWithPrimary(); + if (!status.ok()) { + fprintf(stderr, "TryCatchUpWithPrimary: %s\n", + status.ToString().c_str()); + shared->SetShouldStopTest(); + assert(false); + return; + } } - assert(cmp_db_ || snapshot_guard.get()); + const auto checksum_column_family = [](Iterator* iter, uint32_t* checksum) -> Status { assert(nullptr != checksum); @@ -476,17 +479,29 @@ class CfConsistencyStressTest : public StressTest { }; // This `ReadOptions` is for validation purposes. Ignore // `FLAGS_rate_limit_user_ops` to avoid slowing any validation. - ReadOptions ropts; + ReadOptions ropts(FLAGS_verify_checksum, true); ropts.total_order_seek = true; - ropts.snapshot = snapshot_guard.get(); + if (nullptr == cmp_db_) { + ropts.snapshot = snapshot_guard.snapshot(); + } uint32_t crc = 0; { // Compute crc for all key-values of default column family. std::unique_ptr it(db_ptr->NewIterator(ropts)); status = checksum_column_family(it.get(), &crc); + if (!status.ok()) { + fprintf(stderr, "Computing checksum of default cf: %s\n", + status.ToString().c_str()); + assert(false); + } } - uint32_t tmp_crc = 0; - if (status.ok()) { + // Since we currently intentionally disallow reading from the secondary + // instance with snapshot, we cannot achieve cross-cf consistency if WAL is + // enabled because there is no guarantee that secondary instance replays + // the primary's WAL to a consistent point where all cfs have the same + // data. + if (status.ok() && FLAGS_disable_wal) { + uint32_t tmp_crc = 0; for (ColumnFamilyHandle* cfh : cfhs) { if (cfh == db_ptr->DefaultColumnFamily()) { continue; @@ -497,11 +512,19 @@ class CfConsistencyStressTest : public StressTest { break; } } - } - if (!status.ok() || tmp_crc != crc) { - shared->SetShouldStopTest(); + if (!status.ok()) { + fprintf(stderr, "status: %s\n", status.ToString().c_str()); + shared->SetShouldStopTest(); + assert(false); + } else if (tmp_crc != crc) { + fprintf(stderr, "tmp_crc=%" PRIu32 " crc=%" PRIu32 "\n", tmp_crc, crc); + shared->SetShouldStopTest(); + assert(false); + } } } +#else // ROCKSDB_LITE + void ContinuouslyVerifyDb(ThreadState* /*thread*/) const override {} #endif // !ROCKSDB_LITE std::vector GenerateColumnFamilies( diff --git a/db_stress_tool/db_stress_common.h b/db_stress_tool/db_stress_common.h index b50ea25b2..9d392d75a 100644 --- a/db_stress_tool/db_stress_common.h +++ b/db_stress_tool/db_stress_common.h @@ -229,7 +229,6 @@ DECLARE_uint64(ops_per_thread); DECLARE_uint64(log2_keys_per_lock); DECLARE_uint64(max_manifest_file_size); DECLARE_bool(in_place_update); -DECLARE_int32(secondary_catch_up_one_in); DECLARE_string(memtablerep); DECLARE_int32(prefix_size); DECLARE_bool(use_merge); diff --git a/db_stress_tool/db_stress_driver.cc b/db_stress_tool/db_stress_driver.cc index 438a840ad..009168ae3 100644 --- a/db_stress_tool/db_stress_driver.cc +++ b/db_stress_tool/db_stress_driver.cc @@ -177,10 +177,6 @@ bool RunStressTest(StressTest* stress) { } } - if (!stress->VerifySecondaries()) { - return false; - } - if (shared.HasVerificationFailedYet()) { fprintf(stderr, "Verification failed :(\n"); return false; diff --git a/db_stress_tool/db_stress_gflags.cc b/db_stress_tool/db_stress_gflags.cc index 8b0c6fe35..547fc7a53 100644 --- a/db_stress_tool/db_stress_gflags.cc +++ b/db_stress_tool/db_stress_gflags.cc @@ -500,7 +500,9 @@ DEFINE_string(db, "", "Use the db with the following name."); DEFINE_string(secondaries_base, "", "Use this path as the base path for secondary instances."); -DEFINE_bool(test_secondary, false, "Test secondary instance."); +DEFINE_bool(test_secondary, false, + "If true, start an additional secondary instance which can be used " + "for verification."); DEFINE_string( expected_values_dir, "", @@ -790,11 +792,6 @@ DEFINE_uint64(max_manifest_file_size, 16384, "Maximum size of a MANIFEST file"); DEFINE_bool(in_place_update, false, "On true, does inplace update in memtable"); -DEFINE_int32(secondary_catch_up_one_in, 0, - "If non-zero, the secondaries attemp to catch up with the primary " - "once for every N operations on average. 0 indicates the " - "secondaries do not try to catch up after open."); - DEFINE_string(memtablerep, "skip_list", ""); inline static bool ValidatePrefixSize(const char* flagname, int32_t value) { diff --git a/db_stress_tool/db_stress_test_base.cc b/db_stress_tool/db_stress_test_base.cc index 2720a133b..e0179c999 100644 --- a/db_stress_tool/db_stress_test_base.cc +++ b/db_stress_tool/db_stress_test_base.cc @@ -107,17 +107,6 @@ StressTest::~StressTest() { column_families_.clear(); delete db_; - assert(secondaries_.size() == secondary_cfh_lists_.size()); - size_t n = secondaries_.size(); - for (size_t i = 0; i != n; ++i) { - for (auto* cf : secondary_cfh_lists_[i]) { - delete cf; - } - secondary_cfh_lists_[i].clear(); - delete secondaries_[i]; - } - secondaries_.clear(); - for (auto* cf : cmp_cfhs_) { delete cf; } @@ -347,63 +336,6 @@ void StressTest::TrackExpectedState(SharedState* shared) { } } -bool StressTest::VerifySecondaries() { -#ifndef ROCKSDB_LITE - if (FLAGS_test_secondary) { - uint64_t now = clock_->NowMicros(); - fprintf(stdout, "%s Start to verify secondaries against primary\n", - clock_->TimeToString(static_cast(now) / 1000000).c_str()); - } - for (size_t k = 0; k != secondaries_.size(); ++k) { - Status s = secondaries_[k]->TryCatchUpWithPrimary(); - if (!s.ok()) { - fprintf(stderr, "Secondary failed to catch up with primary\n"); - return false; - } - // This `ReadOptions` is for validation purposes. Ignore - // `FLAGS_rate_limit_user_ops` to avoid slowing any validation. - ReadOptions ropts; - ropts.total_order_seek = true; - // Verify only the default column family since the primary may have - // dropped other column families after most recent reopen. - std::unique_ptr iter1(db_->NewIterator(ropts)); - std::unique_ptr iter2(secondaries_[k]->NewIterator(ropts)); - for (iter1->SeekToFirst(), iter2->SeekToFirst(); - iter1->Valid() && iter2->Valid(); iter1->Next(), iter2->Next()) { - if (iter1->key().compare(iter2->key()) != 0 || - iter1->value().compare(iter2->value())) { - fprintf(stderr, - "Secondary %d contains different data from " - "primary.\nPrimary: %s : %s\nSecondary: %s : %s\n", - static_cast(k), - iter1->key().ToString(/*hex=*/true).c_str(), - iter1->value().ToString(/*hex=*/true).c_str(), - iter2->key().ToString(/*hex=*/true).c_str(), - iter2->value().ToString(/*hex=*/true).c_str()); - return false; - } - } - if (iter1->Valid() && !iter2->Valid()) { - fprintf(stderr, - "Secondary %d record count is smaller than that of primary\n", - static_cast(k)); - return false; - } else if (!iter1->Valid() && iter2->Valid()) { - fprintf(stderr, - "Secondary %d record count is larger than that of primary\n", - static_cast(k)); - return false; - } - } - if (FLAGS_test_secondary) { - uint64_t now = clock_->NowMicros(); - fprintf(stdout, "%s Verification of secondaries succeeded\n", - clock_->TimeToString(static_cast(now) / 1000000).c_str()); - } -#endif // ROCKSDB_LITE - return true; -} - Status StressTest::AssertSame(DB* db, ColumnFamilyHandle* cf, ThreadState::SnapshotState& snap_state) { Status s; @@ -968,18 +900,6 @@ void StressTest::OperateDb(ThreadState* thread) { TestCustomOperations(thread, rand_column_families); } thread->stats.FinishedSingleOp(); -#ifndef ROCKSDB_LITE - uint32_t tid = thread->tid; - assert(secondaries_.empty() || - static_cast(tid) < secondaries_.size()); - if (thread->rand.OneInOpt(FLAGS_secondary_catch_up_one_in)) { - Status s = secondaries_[tid]->TryCatchUpWithPrimary(); - if (!s.ok()) { - VerificationAbort(shared, "Secondary instance failed to catch up", s); - break; - } - } -#endif } } while (!thread->snapshot_queue.empty()) { @@ -2619,72 +2539,34 @@ void StressTest::Open(SharedState* shared) { assert(trans.size() == 0); #endif } - assert(!s.ok() || column_families_.size() == - static_cast(FLAGS_column_families)); + assert(s.ok()); + assert(column_families_.size() == + static_cast(FLAGS_column_families)); - if (s.ok() && FLAGS_test_secondary) { -#ifndef ROCKSDB_LITE - secondaries_.resize(FLAGS_threads); - std::fill(secondaries_.begin(), secondaries_.end(), nullptr); - secondary_cfh_lists_.clear(); - secondary_cfh_lists_.resize(FLAGS_threads); - Options tmp_opts; - // TODO(yanqin) support max_open_files != -1 for secondary instance. - tmp_opts.max_open_files = -1; - tmp_opts.statistics = dbstats_secondaries; - tmp_opts.env = db_stress_env; - for (size_t i = 0; i != static_cast(FLAGS_threads); ++i) { - const std::string secondary_path = - FLAGS_secondaries_base + "/" + std::to_string(i); - s = DB::OpenAsSecondary(tmp_opts, FLAGS_db, secondary_path, - cf_descriptors, &secondary_cfh_lists_[i], - &secondaries_[i]); - if (!s.ok()) { - break; - } - } -#else - fprintf(stderr, "Secondary is not supported in RocksDBLite\n"); - exit(1); -#endif - } // Secondary instance does not support write-prepared/write-unprepared // transactions, thus just disable secondary instance if we use // transaction. - if (s.ok() && FLAGS_continuous_verification_interval > 0 && - !FLAGS_use_txn && !cmp_db_) { + if (s.ok() && FLAGS_test_secondary && !FLAGS_use_txn) { +#ifndef ROCKSDB_LITE Options tmp_opts; // TODO(yanqin) support max_open_files != -1 for secondary instance. tmp_opts.max_open_files = -1; tmp_opts.env = db_stress_env; - std::string secondary_path = FLAGS_secondaries_base + "/cmp_database"; + const std::string& secondary_path = FLAGS_secondaries_base; s = DB::OpenAsSecondary(tmp_opts, FLAGS_db, secondary_path, cf_descriptors, &cmp_cfhs_, &cmp_db_); - assert(!s.ok() || - cmp_cfhs_.size() == static_cast(FLAGS_column_families)); + assert(s.ok()); + assert(cmp_cfhs_.size() == static_cast(FLAGS_column_families)); +#else + fprintf(stderr, "Secondary is not supported in RocksDBLite\n"); + exit(1); +#endif // !ROCKSDB_LITE } } else { #ifndef ROCKSDB_LITE DBWithTTL* db_with_ttl; s = DBWithTTL::Open(options_, FLAGS_db, &db_with_ttl, FLAGS_ttl); db_ = db_with_ttl; - if (FLAGS_test_secondary) { - secondaries_.resize(FLAGS_threads); - std::fill(secondaries_.begin(), secondaries_.end(), nullptr); - Options tmp_opts; - tmp_opts.env = options_.env; - // TODO(yanqin) support max_open_files != -1 for secondary instance. - tmp_opts.max_open_files = -1; - for (size_t i = 0; i != static_cast(FLAGS_threads); ++i) { - const std::string secondary_path = - FLAGS_secondaries_base + "/" + std::to_string(i); - s = DB::OpenAsSecondary(tmp_opts, FLAGS_db, secondary_path, - &secondaries_[i]); - if (!s.ok()) { - break; - } - } - } #else fprintf(stderr, "TTL is not supported in RocksDBLite\n"); exit(1); @@ -2735,17 +2617,6 @@ void StressTest::Reopen(ThreadState* thread) { txn_db_ = nullptr; #endif - assert(secondaries_.size() == secondary_cfh_lists_.size()); - size_t n = secondaries_.size(); - for (size_t i = 0; i != n; ++i) { - for (auto* cf : secondary_cfh_lists_[i]) { - delete cf; - } - secondary_cfh_lists_[i].clear(); - delete secondaries_[i]; - } - secondaries_.clear(); - num_times_reopened_++; auto now = clock_->NowMicros(); fprintf(stdout, "%s Reopening database for the %dth time\n", @@ -2784,15 +2655,6 @@ void CheckAndSetOptionsForUserTimestamp(Options& options) { fprintf(stderr, "TransactionDB does not support timestamp yet.\n"); exit(1); } - if (FLAGS_read_only) { - fprintf(stderr, "When opened as read-only, timestamp not supported.\n"); - exit(1); - } - if (FLAGS_test_secondary || FLAGS_secondary_catch_up_one_in > 0 || - FLAGS_continuous_verification_interval > 0) { - fprintf(stderr, "Secondary instance does not support timestamp.\n"); - exit(1); - } #ifndef ROCKSDB_LITE if (FLAGS_enable_blob_files || FLAGS_use_blob_db) { fprintf(stderr, "BlobDB not supported with timestamp.\n"); diff --git a/db_stress_tool/db_stress_test_base.h b/db_stress_tool/db_stress_test_base.h index a0e5c1cf8..cc792fa50 100644 --- a/db_stress_tool/db_stress_test_base.h +++ b/db_stress_tool/db_stress_test_base.h @@ -37,12 +37,9 @@ class StressTest { void TrackExpectedState(SharedState* shared); - // Return false if verification fails. - bool VerifySecondaries(); - void OperateDb(ThreadState* thread); virtual void VerifyDb(ThreadState* thread) const = 0; - virtual void ContinuouslyVerifyDb(ThreadState* /*thread*/) const {} + virtual void ContinuouslyVerifyDb(ThreadState* /*thread*/) const = 0; void PrintStatistics(); @@ -247,10 +244,6 @@ class StressTest { std::vector options_index_; std::atomic db_preload_finished_; - // Fields used for stress-testing secondary instance in the same process - std::vector secondaries_; - std::vector> secondary_cfh_lists_; - // Fields used for continuous verification from another thread DB* cmp_db_; std::vector cmp_cfhs_; diff --git a/db_stress_tool/db_stress_tool.cc b/db_stress_tool/db_stress_tool.cc index 8155f0757..d04bd96f2 100644 --- a/db_stress_tool/db_stress_tool.cc +++ b/db_stress_tool/db_stress_tool.cc @@ -235,12 +235,6 @@ int db_stress_tool(int argc, char** argv) { FLAGS_secondaries_base = default_secondaries_path; } - if (!FLAGS_test_secondary && FLAGS_secondary_catch_up_one_in > 0) { - fprintf( - stderr, - "Must set -test_secondary=true if secondary_catch_up_one_in > 0.\n"); - exit(1); - } if (FLAGS_best_efforts_recovery && !FLAGS_skip_verifydb && !FLAGS_disable_wal) { fprintf(stderr, diff --git a/db_stress_tool/no_batched_ops_stress.cc b/db_stress_tool/no_batched_ops_stress.cc index a0250f4bc..cb40913d7 100644 --- a/db_stress_tool/no_batched_ops_stress.cc +++ b/db_stress_tool/no_batched_ops_stress.cc @@ -184,6 +184,104 @@ class NonBatchedOpsStressTest : public StressTest { } } +#ifndef ROCKSDB_LITE + void ContinuouslyVerifyDb(ThreadState* thread) const override { + if (!cmp_db_) { + return; + } + assert(cmp_db_); + assert(!cmp_cfhs_.empty()); + Status s = cmp_db_->TryCatchUpWithPrimary(); + if (!s.ok()) { + assert(false); + exit(1); + } + + const auto checksum_column_family = [](Iterator* iter, + uint32_t* checksum) -> Status { + assert(nullptr != checksum); + uint32_t ret = 0; + for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { + ret = crc32c::Extend(ret, iter->key().data(), iter->key().size()); + ret = crc32c::Extend(ret, iter->value().data(), iter->value().size()); + } + *checksum = ret; + return iter->status(); + }; + + auto* shared = thread->shared; + assert(shared); + const int64_t max_key = shared->GetMaxKey(); + ReadOptions read_opts(FLAGS_verify_checksum, true); + std::string ts_str; + Slice ts; + if (FLAGS_user_timestamp_size > 0) { + ts_str = GenerateTimestampForRead(); + ts = ts_str; + read_opts.timestamp = &ts; + } + + static Random64 rand64(shared->GetSeed()); + + { + uint32_t crc = 0; + std::unique_ptr it(cmp_db_->NewIterator(read_opts)); + s = checksum_column_family(it.get(), &crc); + if (!s.ok()) { + fprintf(stderr, "Computing checksum of default cf: %s\n", + s.ToString().c_str()); + assert(false); + } + } + + for (auto* handle : cmp_cfhs_) { + if (thread->rand.OneInOpt(3)) { + // Use Get() + uint64_t key = rand64.Uniform(static_cast(max_key)); + std::string key_str = Key(key); + std::string value; + std::string key_ts; + s = cmp_db_->Get(read_opts, handle, key_str, &value, + FLAGS_user_timestamp_size > 0 ? &key_ts : nullptr); + s.PermitUncheckedError(); + } else { + // Use range scan + std::unique_ptr iter(cmp_db_->NewIterator(read_opts, handle)); + uint32_t rnd = (thread->rand.Next()) % 4; + if (0 == rnd) { + // SeekToFirst() + Next()*5 + read_opts.total_order_seek = true; + iter->SeekToFirst(); + for (int i = 0; i < 5 && iter->Valid(); ++i, iter->Next()) { + } + } else if (1 == rnd) { + // SeekToLast() + Prev()*5 + read_opts.total_order_seek = true; + iter->SeekToLast(); + for (int i = 0; i < 5 && iter->Valid(); ++i, iter->Prev()) { + } + } else if (2 == rnd) { + // Seek() +Next()*5 + uint64_t key = rand64.Uniform(static_cast(max_key)); + std::string key_str = Key(key); + iter->Seek(key_str); + for (int i = 0; i < 5 && iter->Valid(); ++i, iter->Next()) { + } + } else { + // SeekForPrev() + Prev()*5 + uint64_t key = rand64.Uniform(static_cast(max_key)); + std::string key_str = Key(key); + iter->SeekForPrev(key_str); + for (int i = 0; i < 5 && iter->Valid(); ++i, iter->Prev()) { + } + } + } + } + } +#else + void ContinuouslyVerifyDb(ThreadState* /*thread*/) const override {} +#endif // ROCKSDB_LITE + void MaybeClearOneColumnFamily(ThreadState* thread) override { if (FLAGS_column_families > 1) { if (thread->rand.OneInOpt(FLAGS_clear_column_family_one_in)) { diff --git a/tools/db_crashtest.py b/tools/db_crashtest.py index b1187f865..3d0facc68 100644 --- a/tools/db_crashtest.py +++ b/tools/db_crashtest.py @@ -353,8 +353,6 @@ ts_params = { "use_merge": 0, "use_full_merge_v1": 0, "use_txn": 0, - "secondary_catch_up_one_in": 0, - "continuous_verification_interval": 0, "enable_blob_files": 0, "use_blob_db": 0, "enable_compaction_filter": 0,