Update test for secondary instance in stress test (#10121)

Summary:
This PR updates secondary instance testing in stress test by default.

When enabled (it is disabled by default), a background thread is started that runs a secondary instance tailing the logs of the primary.

Periodically (every 1 sec), this thread calls `TryCatchUpWithPrimary()` and uses point lookup or range scan
to read some random keys with only very basic verification to make sure no assertion failure is triggered.

Thanks to https://github.com/facebook/rocksdb/issues/10061 , we can enable secondary instance when user-defined timestamp is enabled.

Also removed a less useful test configuration, `secondary_catch_up_one_in`. This is very similar to the periodic
catch-up.

In the last commit, I decided not to enable it for now and only update the tests, since the secondary instance does not
work well when an underlying file is renamed by the primary, e.g. by SstFileManager.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/10121

Test Plan:
```
TEST_TMPDIR=/dev/shm/rocksdb make crash_test
TEST_TMPDIR=/dev/shm/rocksdb make crash_test_with_ts
TEST_TMPDIR=/dev/shm/rocksdb make crash_test_with_atomic_flush
```

Reviewed By: ajkr

Differential Revision: D36939458

Pulled By: riversand963

fbshipit-source-id: 1c065b7efc3690fc341569b9d369a5cbd8ef6b3e
main
Yanqin Jin 3 years ago committed by Facebook GitHub Bot
parent ff32346415
commit f890527b16
  1. 2
      db_stress_tool/batched_ops_stress.cc
  2. 61
      db_stress_tool/cf_consistency_stress.cc
  3. 1
      db_stress_tool/db_stress_common.h
  4. 4
      db_stress_tool/db_stress_driver.cc
  5. 9
      db_stress_tool/db_stress_gflags.cc
  6. 162
      db_stress_tool/db_stress_test_base.cc
  7. 9
      db_stress_tool/db_stress_test_base.h
  8. 6
      db_stress_tool/db_stress_tool.cc
  9. 98
      db_stress_tool/no_batched_ops_stress.cc
  10. 2
      tools/db_crashtest.py

@ -340,6 +340,8 @@ class BatchedOpsStressTest : public StressTest {
}
// Empty override: this test class does nothing in VerifyDb(); the thread
// argument is unused.
void VerifyDb(ThreadState* /* thread */) const override {}
// Empty override: this test class does nothing in ContinuouslyVerifyDb(); the
// thread argument is unused.
void ContinuouslyVerifyDb(ThreadState* /* thread */) const override {}
};
// Factory for the batched-ops stress test. Allocates a new
// BatchedOpsStressTest on the heap and returns it through the StressTest base
// pointer.
StressTest* CreateBatchedOpsStressTest() {
  StressTest* const batched_ops_test = new BatchedOpsStressTest();
  return batched_ops_test;
}

@ -448,21 +448,24 @@ class CfConsistencyStressTest : public StressTest {
DB* db_ptr = cmp_db_ ? cmp_db_ : db_;
const auto& cfhs = cmp_db_ ? cmp_cfhs_ : column_families_;
const auto ss_deleter = [&](const Snapshot* ss) {
db_ptr->ReleaseSnapshot(ss);
};
std::unique_ptr<const Snapshot, decltype(ss_deleter)> snapshot_guard(
db_ptr->GetSnapshot(), ss_deleter);
if (cmp_db_) {
status = cmp_db_->TryCatchUpWithPrimary();
}
// Take a snapshot to preserve the state of primary db.
ManagedSnapshot snapshot_guard(db_);
SharedState* shared = thread->shared;
assert(shared);
if (!status.ok()) {
shared->SetShouldStopTest();
return;
if (cmp_db_) {
status = cmp_db_->TryCatchUpWithPrimary();
if (!status.ok()) {
fprintf(stderr, "TryCatchUpWithPrimary: %s\n",
status.ToString().c_str());
shared->SetShouldStopTest();
assert(false);
return;
}
}
assert(cmp_db_ || snapshot_guard.get());
const auto checksum_column_family = [](Iterator* iter,
uint32_t* checksum) -> Status {
assert(nullptr != checksum);
@ -476,17 +479,29 @@ class CfConsistencyStressTest : public StressTest {
};
// This `ReadOptions` is for validation purposes. Ignore
// `FLAGS_rate_limit_user_ops` to avoid slowing any validation.
ReadOptions ropts;
ReadOptions ropts(FLAGS_verify_checksum, true);
ropts.total_order_seek = true;
ropts.snapshot = snapshot_guard.get();
if (nullptr == cmp_db_) {
ropts.snapshot = snapshot_guard.snapshot();
}
uint32_t crc = 0;
{
// Compute crc for all key-values of default column family.
std::unique_ptr<Iterator> it(db_ptr->NewIterator(ropts));
status = checksum_column_family(it.get(), &crc);
if (!status.ok()) {
fprintf(stderr, "Computing checksum of default cf: %s\n",
status.ToString().c_str());
assert(false);
}
}
uint32_t tmp_crc = 0;
if (status.ok()) {
// Since we currently intentionally disallow reading from the secondary
// instance with snapshot, we cannot achieve cross-cf consistency if WAL is
// enabled because there is no guarantee that secondary instance replays
// the primary's WAL to a consistent point where all cfs have the same
// data.
if (status.ok() && FLAGS_disable_wal) {
uint32_t tmp_crc = 0;
for (ColumnFamilyHandle* cfh : cfhs) {
if (cfh == db_ptr->DefaultColumnFamily()) {
continue;
@ -497,11 +512,19 @@ class CfConsistencyStressTest : public StressTest {
break;
}
}
}
if (!status.ok() || tmp_crc != crc) {
shared->SetShouldStopTest();
if (!status.ok()) {
fprintf(stderr, "status: %s\n", status.ToString().c_str());
shared->SetShouldStopTest();
assert(false);
} else if (tmp_crc != crc) {
fprintf(stderr, "tmp_crc=%" PRIu32 " crc=%" PRIu32 "\n", tmp_crc, crc);
shared->SetShouldStopTest();
assert(false);
}
}
}
#else // ROCKSDB_LITE
// ROCKSDB_LITE variant: ContinuouslyVerifyDb() is an empty override and the
// thread argument is unused.
void ContinuouslyVerifyDb(ThreadState* /*thread*/) const override {}
#endif // !ROCKSDB_LITE
std::vector<int> GenerateColumnFamilies(

@ -229,7 +229,6 @@ DECLARE_uint64(ops_per_thread);
DECLARE_uint64(log2_keys_per_lock);
DECLARE_uint64(max_manifest_file_size);
DECLARE_bool(in_place_update);
DECLARE_int32(secondary_catch_up_one_in);
DECLARE_string(memtablerep);
DECLARE_int32(prefix_size);
DECLARE_bool(use_merge);

@ -177,10 +177,6 @@ bool RunStressTest(StressTest* stress) {
}
}
if (!stress->VerifySecondaries()) {
return false;
}
if (shared.HasVerificationFailedYet()) {
fprintf(stderr, "Verification failed :(\n");
return false;

@ -500,7 +500,9 @@ DEFINE_string(db, "", "Use the db with the following name.");
DEFINE_string(secondaries_base, "",
"Use this path as the base path for secondary instances.");
DEFINE_bool(test_secondary, false, "Test secondary instance.");
DEFINE_bool(test_secondary, false,
"If true, start an additional secondary instance which can be used "
"for verification.");
DEFINE_string(
expected_values_dir, "",
@ -790,11 +792,6 @@ DEFINE_uint64(max_manifest_file_size, 16384, "Maximum size of a MANIFEST file");
DEFINE_bool(in_place_update, false, "On true, does inplace update in memtable");
DEFINE_int32(secondary_catch_up_one_in, 0,
"If non-zero, the secondaries attemp to catch up with the primary "
"once for every N operations on average. 0 indicates the "
"secondaries do not try to catch up after open.");
DEFINE_string(memtablerep, "skip_list", "");
inline static bool ValidatePrefixSize(const char* flagname, int32_t value) {

@ -107,17 +107,6 @@ StressTest::~StressTest() {
column_families_.clear();
delete db_;
assert(secondaries_.size() == secondary_cfh_lists_.size());
size_t n = secondaries_.size();
for (size_t i = 0; i != n; ++i) {
for (auto* cf : secondary_cfh_lists_[i]) {
delete cf;
}
secondary_cfh_lists_[i].clear();
delete secondaries_[i];
}
secondaries_.clear();
for (auto* cf : cmp_cfhs_) {
delete cf;
}
@ -347,63 +336,6 @@ void StressTest::TrackExpectedState(SharedState* shared) {
}
}
bool StressTest::VerifySecondaries() {
#ifndef ROCKSDB_LITE
if (FLAGS_test_secondary) {
uint64_t now = clock_->NowMicros();
fprintf(stdout, "%s Start to verify secondaries against primary\n",
clock_->TimeToString(static_cast<uint64_t>(now) / 1000000).c_str());
}
for (size_t k = 0; k != secondaries_.size(); ++k) {
Status s = secondaries_[k]->TryCatchUpWithPrimary();
if (!s.ok()) {
fprintf(stderr, "Secondary failed to catch up with primary\n");
return false;
}
// This `ReadOptions` is for validation purposes. Ignore
// `FLAGS_rate_limit_user_ops` to avoid slowing any validation.
ReadOptions ropts;
ropts.total_order_seek = true;
// Verify only the default column family since the primary may have
// dropped other column families after most recent reopen.
std::unique_ptr<Iterator> iter1(db_->NewIterator(ropts));
std::unique_ptr<Iterator> iter2(secondaries_[k]->NewIterator(ropts));
for (iter1->SeekToFirst(), iter2->SeekToFirst();
iter1->Valid() && iter2->Valid(); iter1->Next(), iter2->Next()) {
if (iter1->key().compare(iter2->key()) != 0 ||
iter1->value().compare(iter2->value())) {
fprintf(stderr,
"Secondary %d contains different data from "
"primary.\nPrimary: %s : %s\nSecondary: %s : %s\n",
static_cast<int>(k),
iter1->key().ToString(/*hex=*/true).c_str(),
iter1->value().ToString(/*hex=*/true).c_str(),
iter2->key().ToString(/*hex=*/true).c_str(),
iter2->value().ToString(/*hex=*/true).c_str());
return false;
}
}
if (iter1->Valid() && !iter2->Valid()) {
fprintf(stderr,
"Secondary %d record count is smaller than that of primary\n",
static_cast<int>(k));
return false;
} else if (!iter1->Valid() && iter2->Valid()) {
fprintf(stderr,
"Secondary %d record count is larger than that of primary\n",
static_cast<int>(k));
return false;
}
}
if (FLAGS_test_secondary) {
uint64_t now = clock_->NowMicros();
fprintf(stdout, "%s Verification of secondaries succeeded\n",
clock_->TimeToString(static_cast<uint64_t>(now) / 1000000).c_str());
}
#endif // ROCKSDB_LITE
return true;
}
Status StressTest::AssertSame(DB* db, ColumnFamilyHandle* cf,
ThreadState::SnapshotState& snap_state) {
Status s;
@ -968,18 +900,6 @@ void StressTest::OperateDb(ThreadState* thread) {
TestCustomOperations(thread, rand_column_families);
}
thread->stats.FinishedSingleOp();
#ifndef ROCKSDB_LITE
uint32_t tid = thread->tid;
assert(secondaries_.empty() ||
static_cast<size_t>(tid) < secondaries_.size());
if (thread->rand.OneInOpt(FLAGS_secondary_catch_up_one_in)) {
Status s = secondaries_[tid]->TryCatchUpWithPrimary();
if (!s.ok()) {
VerificationAbort(shared, "Secondary instance failed to catch up", s);
break;
}
}
#endif
}
}
while (!thread->snapshot_queue.empty()) {
@ -2619,72 +2539,34 @@ void StressTest::Open(SharedState* shared) {
assert(trans.size() == 0);
#endif
}
assert(!s.ok() || column_families_.size() ==
static_cast<size_t>(FLAGS_column_families));
assert(s.ok());
assert(column_families_.size() ==
static_cast<size_t>(FLAGS_column_families));
if (s.ok() && FLAGS_test_secondary) {
#ifndef ROCKSDB_LITE
secondaries_.resize(FLAGS_threads);
std::fill(secondaries_.begin(), secondaries_.end(), nullptr);
secondary_cfh_lists_.clear();
secondary_cfh_lists_.resize(FLAGS_threads);
Options tmp_opts;
// TODO(yanqin) support max_open_files != -1 for secondary instance.
tmp_opts.max_open_files = -1;
tmp_opts.statistics = dbstats_secondaries;
tmp_opts.env = db_stress_env;
for (size_t i = 0; i != static_cast<size_t>(FLAGS_threads); ++i) {
const std::string secondary_path =
FLAGS_secondaries_base + "/" + std::to_string(i);
s = DB::OpenAsSecondary(tmp_opts, FLAGS_db, secondary_path,
cf_descriptors, &secondary_cfh_lists_[i],
&secondaries_[i]);
if (!s.ok()) {
break;
}
}
#else
fprintf(stderr, "Secondary is not supported in RocksDBLite\n");
exit(1);
#endif
}
// Secondary instance does not support write-prepared/write-unprepared
// transactions, thus just disable secondary instance if we use
// transaction.
if (s.ok() && FLAGS_continuous_verification_interval > 0 &&
!FLAGS_use_txn && !cmp_db_) {
if (s.ok() && FLAGS_test_secondary && !FLAGS_use_txn) {
#ifndef ROCKSDB_LITE
Options tmp_opts;
// TODO(yanqin) support max_open_files != -1 for secondary instance.
tmp_opts.max_open_files = -1;
tmp_opts.env = db_stress_env;
std::string secondary_path = FLAGS_secondaries_base + "/cmp_database";
const std::string& secondary_path = FLAGS_secondaries_base;
s = DB::OpenAsSecondary(tmp_opts, FLAGS_db, secondary_path,
cf_descriptors, &cmp_cfhs_, &cmp_db_);
assert(!s.ok() ||
cmp_cfhs_.size() == static_cast<size_t>(FLAGS_column_families));
assert(s.ok());
assert(cmp_cfhs_.size() == static_cast<size_t>(FLAGS_column_families));
#else
fprintf(stderr, "Secondary is not supported in RocksDBLite\n");
exit(1);
#endif // !ROCKSDB_LITE
}
} else {
#ifndef ROCKSDB_LITE
DBWithTTL* db_with_ttl;
s = DBWithTTL::Open(options_, FLAGS_db, &db_with_ttl, FLAGS_ttl);
db_ = db_with_ttl;
if (FLAGS_test_secondary) {
secondaries_.resize(FLAGS_threads);
std::fill(secondaries_.begin(), secondaries_.end(), nullptr);
Options tmp_opts;
tmp_opts.env = options_.env;
// TODO(yanqin) support max_open_files != -1 for secondary instance.
tmp_opts.max_open_files = -1;
for (size_t i = 0; i != static_cast<size_t>(FLAGS_threads); ++i) {
const std::string secondary_path =
FLAGS_secondaries_base + "/" + std::to_string(i);
s = DB::OpenAsSecondary(tmp_opts, FLAGS_db, secondary_path,
&secondaries_[i]);
if (!s.ok()) {
break;
}
}
}
#else
fprintf(stderr, "TTL is not supported in RocksDBLite\n");
exit(1);
@ -2735,17 +2617,6 @@ void StressTest::Reopen(ThreadState* thread) {
txn_db_ = nullptr;
#endif
assert(secondaries_.size() == secondary_cfh_lists_.size());
size_t n = secondaries_.size();
for (size_t i = 0; i != n; ++i) {
for (auto* cf : secondary_cfh_lists_[i]) {
delete cf;
}
secondary_cfh_lists_[i].clear();
delete secondaries_[i];
}
secondaries_.clear();
num_times_reopened_++;
auto now = clock_->NowMicros();
fprintf(stdout, "%s Reopening database for the %dth time\n",
@ -2784,15 +2655,6 @@ void CheckAndSetOptionsForUserTimestamp(Options& options) {
fprintf(stderr, "TransactionDB does not support timestamp yet.\n");
exit(1);
}
if (FLAGS_read_only) {
fprintf(stderr, "When opened as read-only, timestamp not supported.\n");
exit(1);
}
if (FLAGS_test_secondary || FLAGS_secondary_catch_up_one_in > 0 ||
FLAGS_continuous_verification_interval > 0) {
fprintf(stderr, "Secondary instance does not support timestamp.\n");
exit(1);
}
#ifndef ROCKSDB_LITE
if (FLAGS_enable_blob_files || FLAGS_use_blob_db) {
fprintf(stderr, "BlobDB not supported with timestamp.\n");

@ -37,12 +37,9 @@ class StressTest {
void TrackExpectedState(SharedState* shared);
// Return false if verification fails.
bool VerifySecondaries();
void OperateDb(ThreadState* thread);
virtual void VerifyDb(ThreadState* thread) const = 0;
virtual void ContinuouslyVerifyDb(ThreadState* /*thread*/) const {}
virtual void ContinuouslyVerifyDb(ThreadState* /*thread*/) const = 0;
void PrintStatistics();
@ -247,10 +244,6 @@ class StressTest {
std::vector<std::string> options_index_;
std::atomic<bool> db_preload_finished_;
// Fields used for stress-testing secondary instance in the same process
std::vector<DB*> secondaries_;
std::vector<std::vector<ColumnFamilyHandle*>> secondary_cfh_lists_;
// Fields used for continuous verification from another thread
DB* cmp_db_;
std::vector<ColumnFamilyHandle*> cmp_cfhs_;

@ -235,12 +235,6 @@ int db_stress_tool(int argc, char** argv) {
FLAGS_secondaries_base = default_secondaries_path;
}
if (!FLAGS_test_secondary && FLAGS_secondary_catch_up_one_in > 0) {
fprintf(
stderr,
"Must set -test_secondary=true if secondary_catch_up_one_in > 0.\n");
exit(1);
}
if (FLAGS_best_efforts_recovery && !FLAGS_skip_verifydb &&
!FLAGS_disable_wal) {
fprintf(stderr,

@ -184,6 +184,104 @@ class NonBatchedOpsStressTest : public StressTest {
}
}
#ifndef ROCKSDB_LITE
// Exercises the secondary instance `cmp_db_` with a few reads after asking it
// to catch up with the primary.
//
// The checking is deliberately shallow: results are not compared against the
// primary. The reads exist to make sure they do not trip an assertion and that
// the full scan finishes without an iterator error.
//
// Steps:
//   1. Return immediately if no secondary instance is open.
//   2. Call TryCatchUpWithPrimary(); on failure print the status and exit.
//   3. Scan the whole default column family, folding keys and values into a
//      CRC and checking the iterator status afterwards.
//   4. For each handle in `cmp_cfhs_`, do either a point lookup of a random
//      key or a short forward/backward range scan from a random position.
void ContinuouslyVerifyDb(ThreadState* thread) const override {
  if (!cmp_db_) {
    return;
  }
  assert(!cmp_cfhs_.empty());

  Status s = cmp_db_->TryCatchUpWithPrimary();
  if (!s.ok()) {
    // Report why catch-up failed before bailing out; assert(false) is
    // compiled out in release builds, so exit(1) is the effective stop there.
    fprintf(stderr, "TryCatchUpWithPrimary: %s\n", s.ToString().c_str());
    assert(false);
    exit(1);
  }

  // Walks `iter` from the first entry to the end, folding every key and value
  // into a CRC32C. Stores the result in `*checksum` and returns the iterator's
  // final status.
  const auto checksum_column_family = [](Iterator* iter,
                                         uint32_t* checksum) -> Status {
    assert(nullptr != checksum);
    uint32_t ret = 0;
    for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
      ret = crc32c::Extend(ret, iter->key().data(), iter->key().size());
      ret = crc32c::Extend(ret, iter->value().data(), iter->value().size());
    }
    *checksum = ret;
    return iter->status();
  };

  auto* shared = thread->shared;
  assert(shared);
  const int64_t max_key = shared->GetMaxKey();

  ReadOptions read_opts(FLAGS_verify_checksum, true);
  std::string ts_str;
  Slice ts;
  if (FLAGS_user_timestamp_size > 0) {
    // `ts` must outlive every read below because ReadOptions only stores a
    // pointer to it.
    ts_str = GenerateTimestampForRead();
    ts = ts_str;
    read_opts.timestamp = &ts;
  }

  // Options for iterators positioned with SeekToFirst()/SeekToLast(). The flag
  // has to be set before NewIterator() is called: changing a ReadOptions
  // object after an iterator was created from it does not affect that
  // iterator.
  ReadOptions total_order_opts = read_opts;
  total_order_opts.total_order_seek = true;

  // Function-local static: seeded once, on the first call, and reused by
  // every later call.
  static Random64 rand64(shared->GetSeed());

  {
    // Full scan of the default column family. The CRC value itself is not
    // compared with anything; the scan is there to surface iterator errors.
    uint32_t crc = 0;
    std::unique_ptr<Iterator> it(cmp_db_->NewIterator(total_order_opts));
    s = checksum_column_family(it.get(), &crc);
    if (!s.ok()) {
      fprintf(stderr, "Computing checksum of default cf: %s\n",
              s.ToString().c_str());
      assert(false);
    }
  }

  for (auto* handle : cmp_cfhs_) {
    if (thread->rand.OneInOpt(3)) {
      // Use Get()
      uint64_t key = rand64.Uniform(static_cast<uint64_t>(max_key));
      std::string key_str = Key(key);
      std::string value;
      std::string key_ts;
      s = cmp_db_->Get(read_opts, handle, key_str, &value,
                       FLAGS_user_timestamp_size > 0 ? &key_ts : nullptr);
      // Any outcome is acceptable here; the lookup only has to complete.
      s.PermitUncheckedError();
    } else {
      // Use range scan. Pick the scan shape first so the iterator can be
      // created with the options that shape needs.
      const uint32_t rnd = (thread->rand.Next()) % 4;
      const bool seeks_to_boundary = (0 == rnd || 1 == rnd);
      const ReadOptions& iter_opts =
          seeks_to_boundary ? total_order_opts : read_opts;
      std::unique_ptr<Iterator> iter(cmp_db_->NewIterator(iter_opts, handle));
      if (0 == rnd) {
        // SeekToFirst() + Next()*5
        iter->SeekToFirst();
        for (int i = 0; i < 5 && iter->Valid(); ++i, iter->Next()) {
        }
      } else if (1 == rnd) {
        // SeekToLast() + Prev()*5
        iter->SeekToLast();
        for (int i = 0; i < 5 && iter->Valid(); ++i, iter->Prev()) {
        }
      } else if (2 == rnd) {
        // Seek() + Next()*5
        uint64_t key = rand64.Uniform(static_cast<uint64_t>(max_key));
        std::string key_str = Key(key);
        iter->Seek(key_str);
        for (int i = 0; i < 5 && iter->Valid(); ++i, iter->Next()) {
        }
      } else {
        // SeekForPrev() + Prev()*5
        uint64_t key = rand64.Uniform(static_cast<uint64_t>(max_key));
        std::string key_str = Key(key);
        iter->SeekForPrev(key_str);
        for (int i = 0; i < 5 && iter->Valid(); ++i, iter->Prev()) {
        }
      }
    }
  }
}
#else
// ROCKSDB_LITE variant: ContinuouslyVerifyDb() is an empty override and the
// thread argument is unused.
void ContinuouslyVerifyDb(ThreadState* /*thread*/) const override {}
#endif // ROCKSDB_LITE
void MaybeClearOneColumnFamily(ThreadState* thread) override {
if (FLAGS_column_families > 1) {
if (thread->rand.OneInOpt(FLAGS_clear_column_family_one_in)) {

@ -353,8 +353,6 @@ ts_params = {
"use_merge": 0,
"use_full_merge_v1": 0,
"use_txn": 0,
"secondary_catch_up_one_in": 0,
"continuous_verification_interval": 0,
"enable_blob_files": 0,
"use_blob_db": 0,
"enable_compaction_filter": 0,

Loading…
Cancel
Save