From f890527b16e188bbcd9332e53a85abccb30346ad Mon Sep 17 00:00:00 2001 From: Yanqin Jin Date: Tue, 7 Jun 2022 21:07:47 -0700 Subject: [PATCH] Update test for secondary instance in stress test (#10121) Summary: This PR updates secondary instance testing in stress test by default. A background thread will be started (disabled by default), running a secondary instance tailing the logs of the primary. Periodically (every 1 sec), this thread calls `TryCatchUpWithPrimary()` and uses point lookup or range scan to read some random keys with only very basic verification to make sure no assertion failure is triggered. Thanks to https://github.com/facebook/rocksdb/issues/10061 , we can enable secondary instance when user-defined timestamp is enabled. Also removed a less useful test configuration, `secondary_catch_up_one_in`. This is very similar to the periodic catch-up. In the last commit, I decided not to enable it now, but just update the tests, since secondary instance does not work well when the underlying file is renamed by primary, e.g. SstFileManager. Pull Request resolved: https://github.com/facebook/rocksdb/pull/10121 Test Plan: ``` TEST_TMPDIR=/dev/shm/rocksdb make crash_test TEST_TMPDIR=/dev/shm/rocksdb make crash_test_with_ts TEST_TMPDIR=/dev/shm/rocksdb make crash_test_with_atomic_flush ``` Reviewed By: ajkr Differential Revision: D36939458 Pulled By: riversand963 fbshipit-source-id: 1c065b7efc3690fc341569b9d369a5cbd8ef6b3e --- db_stress_tool/batched_ops_stress.cc | 2 + db_stress_tool/cf_consistency_stress.cc | 61 ++++++--- db_stress_tool/db_stress_common.h | 1 - db_stress_tool/db_stress_driver.cc | 4 - db_stress_tool/db_stress_gflags.cc | 9 +- db_stress_tool/db_stress_test_base.cc | 162 ++---------------------- db_stress_tool/db_stress_test_base.h | 9 +- db_stress_tool/db_stress_tool.cc | 6 - db_stress_tool/no_batched_ops_stress.cc | 98 ++++++++++++++ tools/db_crashtest.py | 2 - 10 files changed, 158 insertions(+), 196 deletions(-) diff --git a/db_stress_tool/batched_ops_stress.cc b/db_stress_tool/batched_ops_stress.cc index 1aa1cc264..13f3aba5c 100644 --- a/db_stress_tool/batched_ops_stress.cc +++ b/db_stress_tool/batched_ops_stress.cc @@ -340,6 +340,8 @@ class BatchedOpsStressTest : public StressTest { } void VerifyDb(ThreadState* /* thread */) const override {} + + void ContinuouslyVerifyDb(ThreadState* /* thread */) const override {} }; StressTest* CreateBatchedOpsStressTest() { return new BatchedOpsStressTest(); } diff --git a/db_stress_tool/cf_consistency_stress.cc b/db_stress_tool/cf_consistency_stress.cc index ae1e57314..b7cc4c376 100644 --- a/db_stress_tool/cf_consistency_stress.cc +++ b/db_stress_tool/cf_consistency_stress.cc @@ -448,21 +448,24 @@ class CfConsistencyStressTest : public StressTest { DB* db_ptr = cmp_db_ ? cmp_db_ : db_; const auto& cfhs = cmp_db_ ? cmp_cfhs_ : column_families_; - const auto ss_deleter = [&](const Snapshot* ss) { - db_ptr->ReleaseSnapshot(ss); - }; - std::unique_ptr snapshot_guard( - db_ptr->GetSnapshot(), ss_deleter); - if (cmp_db_) { - status = cmp_db_->TryCatchUpWithPrimary(); - } + + // Take a snapshot to preserve the state of primary db. + ManagedSnapshot snapshot_guard(db_); + SharedState* shared = thread->shared; assert(shared); - if (!status.ok()) { - shared->SetShouldStopTest(); - return; + + if (cmp_db_) { + status = cmp_db_->TryCatchUpWithPrimary(); + if (!status.ok()) { + fprintf(stderr, "TryCatchUpWithPrimary: %s\n", + status.ToString().c_str()); + shared->SetShouldStopTest(); + assert(false); + return; + } } - assert(cmp_db_ || snapshot_guard.get()); + const auto checksum_column_family = [](Iterator* iter, uint32_t* checksum) -> Status { assert(nullptr != checksum); @@ -476,17 +479,29 @@ class CfConsistencyStressTest : public StressTest { }; // This `ReadOptions` is for validation purposes. Ignore // `FLAGS_rate_limit_user_ops` to avoid slowing any validation. - ReadOptions ropts; + ReadOptions ropts(FLAGS_verify_checksum, true); ropts.total_order_seek = true; - ropts.snapshot = snapshot_guard.get(); + if (nullptr == cmp_db_) { + ropts.snapshot = snapshot_guard.snapshot(); + } uint32_t crc = 0; { // Compute crc for all key-values of default column family. std::unique_ptr it(db_ptr->NewIterator(ropts)); status = checksum_column_family(it.get(), &crc); + if (!status.ok()) { + fprintf(stderr, "Computing checksum of default cf: %s\n", + status.ToString().c_str()); + assert(false); + } } - uint32_t tmp_crc = 0; - if (status.ok()) { + // Since we currently intentionally disallow reading from the secondary + // instance with snapshot, we cannot achieve cross-cf consistency if WAL is + // enabled because there is no guarantee that secondary instance replays + // the primary's WAL to a consistent point where all cfs have the same + // data. + if (status.ok() && FLAGS_disable_wal) { + uint32_t tmp_crc = 0; for (ColumnFamilyHandle* cfh : cfhs) { if (cfh == db_ptr->DefaultColumnFamily()) { continue; @@ -497,11 +512,19 @@ class CfConsistencyStressTest : public StressTest { break; } } - } - if (!status.ok() || tmp_crc != crc) { - shared->SetShouldStopTest(); + if (!status.ok()) { + fprintf(stderr, "status: %s\n", status.ToString().c_str()); + shared->SetShouldStopTest(); + assert(false); + } else if (tmp_crc != crc) { + fprintf(stderr, "tmp_crc=%" PRIu32 " crc=%" PRIu32 "\n", tmp_crc, crc); + shared->SetShouldStopTest(); + assert(false); + } } } +#else // ROCKSDB_LITE + void ContinuouslyVerifyDb(ThreadState* /*thread*/) const override {} #endif // !ROCKSDB_LITE std::vector GenerateColumnFamilies( diff --git a/db_stress_tool/db_stress_common.h b/db_stress_tool/db_stress_common.h index b50ea25b2..9d392d75a 100644 --- a/db_stress_tool/db_stress_common.h +++ b/db_stress_tool/db_stress_common.h @@ -229,7 +229,6 @@ DECLARE_uint64(ops_per_thread); DECLARE_uint64(log2_keys_per_lock); DECLARE_uint64(max_manifest_file_size); DECLARE_bool(in_place_update); -DECLARE_int32(secondary_catch_up_one_in); DECLARE_string(memtablerep); DECLARE_int32(prefix_size); DECLARE_bool(use_merge); diff --git a/db_stress_tool/db_stress_driver.cc b/db_stress_tool/db_stress_driver.cc index 438a840ad..009168ae3 100644 --- a/db_stress_tool/db_stress_driver.cc +++ b/db_stress_tool/db_stress_driver.cc @@ -177,10 +177,6 @@ bool RunStressTest(StressTest* stress) { } } - if (!stress->VerifySecondaries()) { - return false; - } - if (shared.HasVerificationFailedYet()) { fprintf(stderr, "Verification failed :(\n"); return false; diff --git a/db_stress_tool/db_stress_gflags.cc b/db_stress_tool/db_stress_gflags.cc index 8b0c6fe35..547fc7a53 100644 --- a/db_stress_tool/db_stress_gflags.cc +++ b/db_stress_tool/db_stress_gflags.cc @@ -500,7 +500,9 @@ DEFINE_string(db, "", "Use the db with the following name."); DEFINE_string(secondaries_base, "", "Use this path as the base path for secondary instances."); -DEFINE_bool(test_secondary, false, "Test secondary instance."); +DEFINE_bool(test_secondary, false, + "If true, start an additional secondary instance which can be used " + "for verification."); DEFINE_string( expected_values_dir, "", @@ -790,11 +792,6 @@ DEFINE_uint64(max_manifest_file_size, 16384, "Maximum size of a MANIFEST file"); DEFINE_bool(in_place_update, false, "On true, does inplace update in memtable"); -DEFINE_int32(secondary_catch_up_one_in, 0, - "If non-zero, the secondaries attemp to catch up with the primary " - "once for every N operations on average. 0 indicates the " - "secondaries do not try to catch up after open."); - DEFINE_string(memtablerep, "skip_list", ""); inline static bool ValidatePrefixSize(const char* flagname, int32_t value) { diff --git a/db_stress_tool/db_stress_test_base.cc b/db_stress_tool/db_stress_test_base.cc index 2720a133b..e0179c999 100644 --- a/db_stress_tool/db_stress_test_base.cc +++ b/db_stress_tool/db_stress_test_base.cc @@ -107,17 +107,6 @@ StressTest::~StressTest() { column_families_.clear(); delete db_; - assert(secondaries_.size() == secondary_cfh_lists_.size()); - size_t n = secondaries_.size(); - for (size_t i = 0; i != n; ++i) { - for (auto* cf : secondary_cfh_lists_[i]) { - delete cf; - } - secondary_cfh_lists_[i].clear(); - delete secondaries_[i]; - } - secondaries_.clear(); - for (auto* cf : cmp_cfhs_) { delete cf; } @@ -347,63 +336,6 @@ void StressTest::TrackExpectedState(SharedState* shared) { } } -bool StressTest::VerifySecondaries() { -#ifndef ROCKSDB_LITE - if (FLAGS_test_secondary) { - uint64_t now = clock_->NowMicros(); - fprintf(stdout, "%s Start to verify secondaries against primary\n", - clock_->TimeToString(static_cast(now) / 1000000).c_str()); - } - for (size_t k = 0; k != secondaries_.size(); ++k) { - Status s = secondaries_[k]->TryCatchUpWithPrimary(); - if (!s.ok()) { - fprintf(stderr, "Secondary failed to catch up with primary\n"); - return false; - } - // This `ReadOptions` is for validation purposes. Ignore - // `FLAGS_rate_limit_user_ops` to avoid slowing any validation. - ReadOptions ropts; - ropts.total_order_seek = true; - // Verify only the default column family since the primary may have - // dropped other column families after most recent reopen. - std::unique_ptr iter1(db_->NewIterator(ropts)); - std::unique_ptr iter2(secondaries_[k]->NewIterator(ropts)); - for (iter1->SeekToFirst(), iter2->SeekToFirst(); - iter1->Valid() && iter2->Valid(); iter1->Next(), iter2->Next()) { - if (iter1->key().compare(iter2->key()) != 0 || - iter1->value().compare(iter2->value())) { - fprintf(stderr, - "Secondary %d contains different data from " - "primary.\nPrimary: %s : %s\nSecondary: %s : %s\n", - static_cast(k), - iter1->key().ToString(/*hex=*/true).c_str(), - iter1->value().ToString(/*hex=*/true).c_str(), - iter2->key().ToString(/*hex=*/true).c_str(), - iter2->value().ToString(/*hex=*/true).c_str()); - return false; - } - } - if (iter1->Valid() && !iter2->Valid()) { - fprintf(stderr, - "Secondary %d record count is smaller than that of primary\n", - static_cast(k)); - return false; - } else if (!iter1->Valid() && iter2->Valid()) { - fprintf(stderr, - "Secondary %d record count is larger than that of primary\n", - static_cast(k)); - return false; - } - } - if (FLAGS_test_secondary) { - uint64_t now = clock_->NowMicros(); - fprintf(stdout, "%s Verification of secondaries succeeded\n", - clock_->TimeToString(static_cast(now) / 1000000).c_str()); - } -#endif // ROCKSDB_LITE - return true; -} - Status StressTest::AssertSame(DB* db, ColumnFamilyHandle* cf, ThreadState::SnapshotState& snap_state) { Status s; @@ -968,18 +900,6 @@ void StressTest::OperateDb(ThreadState* thread) { TestCustomOperations(thread, rand_column_families); } thread->stats.FinishedSingleOp(); -#ifndef ROCKSDB_LITE - uint32_t tid = thread->tid; - assert(secondaries_.empty() || - static_cast(tid) < secondaries_.size()); - if (thread->rand.OneInOpt(FLAGS_secondary_catch_up_one_in)) { - Status s = secondaries_[tid]->TryCatchUpWithPrimary(); - if (!s.ok()) { - VerificationAbort(shared, "Secondary instance failed to catch up", s); - break; - } - } -#endif } } while (!thread->snapshot_queue.empty()) { @@ -2619,72 +2539,34 @@ void StressTest::Open(SharedState* shared) { assert(trans.size() == 0); #endif } - assert(!s.ok() || column_families_.size() == - static_cast(FLAGS_column_families)); + assert(s.ok()); + assert(column_families_.size() == + static_cast(FLAGS_column_families)); - if (s.ok() && FLAGS_test_secondary) { -#ifndef ROCKSDB_LITE - secondaries_.resize(FLAGS_threads); - std::fill(secondaries_.begin(), secondaries_.end(), nullptr); - secondary_cfh_lists_.clear(); - secondary_cfh_lists_.resize(FLAGS_threads); - Options tmp_opts; - // TODO(yanqin) support max_open_files != -1 for secondary instance. - tmp_opts.max_open_files = -1; - tmp_opts.statistics = dbstats_secondaries; - tmp_opts.env = db_stress_env; - for (size_t i = 0; i != static_cast(FLAGS_threads); ++i) { - const std::string secondary_path = - FLAGS_secondaries_base + "/" + std::to_string(i); - s = DB::OpenAsSecondary(tmp_opts, FLAGS_db, secondary_path, - cf_descriptors, &secondary_cfh_lists_[i], - &secondaries_[i]); - if (!s.ok()) { - break; - } - } -#else - fprintf(stderr, "Secondary is not supported in RocksDBLite\n"); - exit(1); -#endif - } // Secondary instance does not support write-prepared/write-unprepared // transactions, thus just disable secondary instance if we use // transaction. - if (s.ok() && FLAGS_continuous_verification_interval > 0 && - !FLAGS_use_txn && !cmp_db_) { + if (s.ok() && FLAGS_test_secondary && !FLAGS_use_txn) { +#ifndef ROCKSDB_LITE Options tmp_opts; // TODO(yanqin) support max_open_files != -1 for secondary instance. tmp_opts.max_open_files = -1; tmp_opts.env = db_stress_env; - std::string secondary_path = FLAGS_secondaries_base + "/cmp_database"; + const std::string& secondary_path = FLAGS_secondaries_base; s = DB::OpenAsSecondary(tmp_opts, FLAGS_db, secondary_path, cf_descriptors, &cmp_cfhs_, &cmp_db_); - assert(!s.ok() || - cmp_cfhs_.size() == static_cast(FLAGS_column_families)); + assert(s.ok()); + assert(cmp_cfhs_.size() == static_cast(FLAGS_column_families)); +#else + fprintf(stderr, "Secondary is not supported in RocksDBLite\n"); + exit(1); +#endif // !ROCKSDB_LITE } } else { #ifndef ROCKSDB_LITE DBWithTTL* db_with_ttl; s = DBWithTTL::Open(options_, FLAGS_db, &db_with_ttl, FLAGS_ttl); db_ = db_with_ttl; - if (FLAGS_test_secondary) { - secondaries_.resize(FLAGS_threads); - std::fill(secondaries_.begin(), secondaries_.end(), nullptr); - Options tmp_opts; - tmp_opts.env = options_.env; - // TODO(yanqin) support max_open_files != -1 for secondary instance. - tmp_opts.max_open_files = -1; - for (size_t i = 0; i != static_cast(FLAGS_threads); ++i) { - const std::string secondary_path = - FLAGS_secondaries_base + "/" + std::to_string(i); - s = DB::OpenAsSecondary(tmp_opts, FLAGS_db, secondary_path, - &secondaries_[i]); - if (!s.ok()) { - break; - } - } - } #else fprintf(stderr, "TTL is not supported in RocksDBLite\n"); exit(1); @@ -2735,17 +2617,6 @@ void StressTest::Reopen(ThreadState* thread) { txn_db_ = nullptr; #endif - assert(secondaries_.size() == secondary_cfh_lists_.size()); - size_t n = secondaries_.size(); - for (size_t i = 0; i != n; ++i) { - for (auto* cf : secondary_cfh_lists_[i]) { - delete cf; - } - secondary_cfh_lists_[i].clear(); - delete secondaries_[i]; - } - secondaries_.clear(); - num_times_reopened_++; auto now = clock_->NowMicros(); fprintf(stdout, "%s Reopening database for the %dth time\n", @@ -2784,15 +2655,6 @@ void CheckAndSetOptionsForUserTimestamp(Options& options) { fprintf(stderr, "TransactionDB does not support timestamp yet.\n"); exit(1); } - if (FLAGS_read_only) { - fprintf(stderr, "When opened as read-only, timestamp not supported.\n"); - exit(1); - } - if (FLAGS_test_secondary || FLAGS_secondary_catch_up_one_in > 0 || - FLAGS_continuous_verification_interval > 0) { - fprintf(stderr, "Secondary instance does not support timestamp.\n"); - exit(1); - } #ifndef ROCKSDB_LITE if (FLAGS_enable_blob_files || FLAGS_use_blob_db) { fprintf(stderr, "BlobDB not supported with timestamp.\n"); diff --git a/db_stress_tool/db_stress_test_base.h b/db_stress_tool/db_stress_test_base.h index a0e5c1cf8..cc792fa50 100644 --- a/db_stress_tool/db_stress_test_base.h +++ b/db_stress_tool/db_stress_test_base.h @@ -37,12 +37,9 @@ class StressTest { void TrackExpectedState(SharedState* shared); - // Return false if verification fails. - bool VerifySecondaries(); - void OperateDb(ThreadState* thread); virtual void VerifyDb(ThreadState* thread) const = 0; - virtual void ContinuouslyVerifyDb(ThreadState* /*thread*/) const {} + virtual void ContinuouslyVerifyDb(ThreadState* /*thread*/) const = 0; void PrintStatistics(); @@ -247,10 +244,6 @@ class StressTest { std::vector options_index_; std::atomic db_preload_finished_; - // Fields used for stress-testing secondary instance in the same process - std::vector secondaries_; - std::vector> secondary_cfh_lists_; - // Fields used for continuous verification from another thread DB* cmp_db_; std::vector cmp_cfhs_; diff --git a/db_stress_tool/db_stress_tool.cc b/db_stress_tool/db_stress_tool.cc index 8155f0757..d04bd96f2 100644 --- a/db_stress_tool/db_stress_tool.cc +++ b/db_stress_tool/db_stress_tool.cc @@ -235,12 +235,6 @@ int db_stress_tool(int argc, char** argv) { FLAGS_secondaries_base = default_secondaries_path; } - if (!FLAGS_test_secondary && FLAGS_secondary_catch_up_one_in > 0) { - fprintf( - stderr, - "Must set -test_secondary=true if secondary_catch_up_one_in > 0.\n"); - exit(1); - } if (FLAGS_best_efforts_recovery && !FLAGS_skip_verifydb && !FLAGS_disable_wal) { fprintf(stderr, diff --git a/db_stress_tool/no_batched_ops_stress.cc b/db_stress_tool/no_batched_ops_stress.cc index a0250f4bc..cb40913d7 100644 --- a/db_stress_tool/no_batched_ops_stress.cc +++ b/db_stress_tool/no_batched_ops_stress.cc @@ -184,6 +184,104 @@ class NonBatchedOpsStressTest : public StressTest { } } +#ifndef ROCKSDB_LITE + void ContinuouslyVerifyDb(ThreadState* thread) const override { + if (!cmp_db_) { + return; + } + assert(cmp_db_); + assert(!cmp_cfhs_.empty()); + Status s = cmp_db_->TryCatchUpWithPrimary(); + if (!s.ok()) { + assert(false); + exit(1); + } + + const auto checksum_column_family = [](Iterator* iter, + uint32_t* checksum) -> Status { + assert(nullptr != checksum); + uint32_t ret = 0; + for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { + ret = crc32c::Extend(ret, iter->key().data(), iter->key().size()); + ret = crc32c::Extend(ret, iter->value().data(), iter->value().size()); + } + *checksum = ret; + return iter->status(); + }; + + auto* shared = thread->shared; + assert(shared); + const int64_t max_key = shared->GetMaxKey(); + ReadOptions read_opts(FLAGS_verify_checksum, true); + std::string ts_str; + Slice ts; + if (FLAGS_user_timestamp_size > 0) { + ts_str = GenerateTimestampForRead(); + ts = ts_str; + read_opts.timestamp = &ts; + } + + static Random64 rand64(shared->GetSeed()); + + { + uint32_t crc = 0; + std::unique_ptr it(cmp_db_->NewIterator(read_opts)); + s = checksum_column_family(it.get(), &crc); + if (!s.ok()) { + fprintf(stderr, "Computing checksum of default cf: %s\n", + s.ToString().c_str()); + assert(false); + } + } + + for (auto* handle : cmp_cfhs_) { + if (thread->rand.OneInOpt(3)) { + // Use Get() + uint64_t key = rand64.Uniform(static_cast(max_key)); + std::string key_str = Key(key); + std::string value; + std::string key_ts; + s = cmp_db_->Get(read_opts, handle, key_str, &value, + FLAGS_user_timestamp_size > 0 ? &key_ts : nullptr); + s.PermitUncheckedError(); + } else { + // Use range scan + std::unique_ptr iter(cmp_db_->NewIterator(read_opts, handle)); + uint32_t rnd = (thread->rand.Next()) % 4; + if (0 == rnd) { + // SeekToFirst() + Next()*5 + read_opts.total_order_seek = true; + iter->SeekToFirst(); + for (int i = 0; i < 5 && iter->Valid(); ++i, iter->Next()) { + } + } else if (1 == rnd) { + // SeekToLast() + Prev()*5 + read_opts.total_order_seek = true; + iter->SeekToLast(); + for (int i = 0; i < 5 && iter->Valid(); ++i, iter->Prev()) { + } + } else if (2 == rnd) { + // Seek() +Next()*5 + uint64_t key = rand64.Uniform(static_cast(max_key)); + std::string key_str = Key(key); + iter->Seek(key_str); + for (int i = 0; i < 5 && iter->Valid(); ++i, iter->Next()) { + } + } else { + // SeekForPrev() + Prev()*5 + uint64_t key = rand64.Uniform(static_cast(max_key)); + std::string key_str = Key(key); + iter->SeekForPrev(key_str); + for (int i = 0; i < 5 && iter->Valid(); ++i, iter->Prev()) { + } + } + } + } + } +#else + void ContinuouslyVerifyDb(ThreadState* /*thread*/) const override {} +#endif // ROCKSDB_LITE + void MaybeClearOneColumnFamily(ThreadState* thread) override { if (FLAGS_column_families > 1) { if (thread->rand.OneInOpt(FLAGS_clear_column_family_one_in)) { diff --git a/tools/db_crashtest.py b/tools/db_crashtest.py index b1187f865..3d0facc68 100644 --- a/tools/db_crashtest.py +++ b/tools/db_crashtest.py @@ -353,8 +353,6 @@ ts_params = { "use_merge": 0, "use_full_merge_v1": 0, "use_txn": 0, - "secondary_catch_up_one_in": 0, - "continuous_verification_interval": 0, "enable_blob_files": 0, "use_blob_db": 0, "enable_compaction_filter": 0,