Add more verification to db_stress (#6173)

Summary:
Currently, db_stress performs verification by calling `VerifyDb()` at the end of the test and, optionally, before the test starts. If corruption or an incorrect result occurs while the test is running, it is not detected until that final verification, which is too late. This PR adds more verification in two ways.
1. For cf consistency test, each test thread takes a snapshot and verifies every N ops. N is configurable via `-verify_db_one_in`. This option is not supported in other stress tests.
2. For cf consistency test, we use another background thread in which a secondary instance periodically tails the primary (interval is configurable). We verify the secondary. Once an error is detected, we terminate the test and report. This does not affect other stress tests.

Test plan (devserver)
```
$./db_stress -test_cf_consistency -verify_db_one_in=0 -ops_per_thread=100000 -continuous_verification_interval=100
$./db_stress -test_cf_consistency -verify_db_one_in=1000 -ops_per_thread=10000 -continuous_verification_interval=0
$make crash_test
```
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6173

Differential Revision: D19047367

Pulled By: riversand963

fbshipit-source-id: aeed584ad71f9310c111445f34975e5ab47a0615
main
Yanqin Jin 5 years ago committed by Facebook Github Bot
parent 7a7ca8eb5b
commit 670a916d01
  1. 72
      db_stress_tool/cf_consistency_stress.cc
  2. 34
      db_stress_tool/db_stress_common.cc
  3. 6
      db_stress_tool/db_stress_common.h
  4. 16
      db_stress_tool/db_stress_driver.cc
  5. 10
      db_stress_tool/db_stress_gflags.cc
  6. 1
      db_stress_tool/db_stress_shared_state.cc
  7. 34
      db_stress_tool/db_stress_shared_state.h
  8. 89
      db_stress_tool/db_stress_test_base.cc
  9. 5
      db_stress_tool/db_stress_test_base.h
  10. 18
      db_stress_tool/db_stress_tool.cc
  11. 7
      db_stress_tool/no_batched_ops_stress.cc
  12. 4
      tools/db_crashtest.py

@ -15,7 +15,7 @@ class CfConsistencyStressTest : public StressTest {
public: public:
CfConsistencyStressTest() : batch_id_(0) {} CfConsistencyStressTest() : batch_id_(0) {}
virtual ~CfConsistencyStressTest() {} ~CfConsistencyStressTest() override {}
Status TestPut(ThreadState* thread, WriteOptions& write_opts, Status TestPut(ThreadState* thread, WriteOptions& write_opts,
const ReadOptions& /* read_opts */, const ReadOptions& /* read_opts */,
@ -350,6 +350,12 @@ class CfConsistencyStressTest : public StressTest {
// iterate the memtable using this iterator any more, although the memtable // iterate the memtable using this iterator any more, although the memtable
// contains the most up-to-date key-values. // contains the most up-to-date key-values.
options.total_order_seek = true; options.total_order_seek = true;
const auto ss_deleter = [this](const Snapshot* ss) {
db_->ReleaseSnapshot(ss);
};
std::unique_ptr<const Snapshot, decltype(ss_deleter)> snapshot_guard(
db_->GetSnapshot(), ss_deleter);
options.snapshot = snapshot_guard.get();
assert(thread != nullptr); assert(thread != nullptr);
auto shared = thread->shared; auto shared = thread->shared;
std::vector<std::unique_ptr<Iterator>> iters(column_families_.size()); std::vector<std::unique_ptr<Iterator>> iters(column_families_.size());
@ -388,9 +394,6 @@ class CfConsistencyStressTest : public StressTest {
shared->SetVerificationFailure(); shared->SetVerificationFailure();
} }
} }
if (status.ok()) {
fprintf(stdout, "Finished scanning all column families.\n");
}
break; break;
} else if (valid_cnt != iters.size()) { } else if (valid_cnt != iters.size()) {
shared->SetVerificationFailure(); shared->SetVerificationFailure();
@ -491,6 +494,67 @@ class CfConsistencyStressTest : public StressTest {
} while (true); } while (true);
} }
#ifndef ROCKSDB_LITE
// Verifies that all column families hold identical contents.
//
// Reads from cmp_db_ (with handles cmp_cfhs_) when it is set, after asking it
// to catch up with the primary; otherwise reads from db_ (with handles
// column_families_) through a snapshot. A CRC32C over every key and value of
// the default column family is compared with the same checksum computed for
// each non-default handle.
//
// Any failure (catch-up error, iterator error, or checksum mismatch) is
// reported on stderr and recorded via SharedState::SetShouldStopTest().
void ContinuouslyVerifyDb(ThreadState* thread) const override {
  assert(thread);
  Status status;

  DB* db_ptr = cmp_db_ ? cmp_db_ : db_;
  const auto& cfhs = cmp_db_ ? cmp_cfhs_ : column_families_;

  // Release the snapshot on every return path. std::unique_ptr does not
  // invoke the deleter when the held pointer is null.
  const auto ss_deleter = [&](const Snapshot* ss) {
    db_ptr->ReleaseSnapshot(ss);
  };
  std::unique_ptr<const Snapshot, decltype(ss_deleter)> snapshot_guard(
      db_ptr->GetSnapshot(), ss_deleter);

  if (cmp_db_) {
    status = cmp_db_->TryCatchUpWithPrimary();
  }

  SharedState* shared = thread->shared;
  assert(shared);
  if (!status.ok()) {
    fprintf(stderr, "TryCatchUpWithPrimary() failed: %s\n",
            status.ToString().c_str());
    shared->SetShouldStopTest();
    return;
  }
  assert(cmp_db_ || snapshot_guard.get());

  // Folds every key and value visited by `iter` into a CRC32C and returns
  // the iterator's final status. `*checksum` is written even on error.
  const auto checksum_column_family = [](Iterator* iter,
                                         uint32_t* checksum) -> Status {
    assert(nullptr != checksum);
    uint32_t ret = 0;
    for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
      ret = crc32c::Extend(ret, iter->key().data(), iter->key().size());
      ret = crc32c::Extend(ret, iter->value().data(), iter->value().size());
    }
    *checksum = ret;
    return iter->status();
  };

  ReadOptions ropts;
  ropts.total_order_seek = true;
  ropts.snapshot = snapshot_guard.get();

  uint32_t crc = 0;
  {
    // Compute crc for all key-values of default column family.
    std::unique_ptr<Iterator> it(db_ptr->NewIterator(ropts));
    status = checksum_column_family(it.get(), &crc);
  }
  if (!status.ok()) {
    fprintf(stderr, "Failed to checksum default column family: %s\n",
            status.ToString().c_str());
    shared->SetShouldStopTest();
    return;
  }

  for (ColumnFamilyHandle* cfh : cfhs) {
    if (cfh == db_ptr->DefaultColumnFamily()) {
      continue;
    }
    // Scoped per handle so that a mismatch is only ever reported for a
    // checksum that was actually computed, never for the initial zero.
    uint32_t tmp_crc = 0;
    std::unique_ptr<Iterator> it(db_ptr->NewIterator(ropts, cfh));
    status = checksum_column_family(it.get(), &tmp_crc);
    if (!status.ok()) {
      fprintf(stderr, "Failed to checksum column family: %s\n",
              status.ToString().c_str());
      shared->SetShouldStopTest();
      return;
    }
    if (tmp_crc != crc) {
      fprintf(stderr,
              "Column family checksum mismatch: default crc=%u, other crc=%u\n",
              static_cast<unsigned int>(crc),
              static_cast<unsigned int>(tmp_crc));
      shared->SetShouldStopTest();
      return;
    }
  }
}
#endif // !ROCKSDB_LITE
std::vector<int> GenerateColumnFamilies( std::vector<int> GenerateColumnFamilies(
const int /* num_column_families */, const int /* num_column_families */,
int /* rand_column_family */) const override { int /* rand_column_family */) const override {

@ -85,9 +85,11 @@ void PoolSizeChangeThread(void* v) {
while (true) { while (true) {
{ {
MutexLock l(shared->GetMutex()); MutexLock l(shared->GetMutex());
if (shared->ShoudStopBgThread()) { if (shared->ShouldStopBgThread()) {
shared->SetBgThreadFinish(); shared->IncBgThreadsFinished();
shared->GetCondVar()->SignalAll(); if (shared->BgThreadsFinished()) {
shared->GetCondVar()->SignalAll();
}
return; return;
} }
} }
@ -110,6 +112,32 @@ void PoolSizeChangeThread(void* v) {
} }
} }
void DbVerificationThread(void* v) {
assert(FLAGS_continuous_verification_interval > 0);
auto* thread = reinterpret_cast<ThreadState*>(v);
SharedState* shared = thread->shared;
StressTest* stress_test = shared->GetStressTest();
assert(stress_test != nullptr);
while (true) {
{
MutexLock l(shared->GetMutex());
if (shared->ShouldStopBgThread()) {
shared->IncBgThreadsFinished();
if (shared->BgThreadsFinished()) {
shared->GetCondVar()->SignalAll();
}
return;
}
}
if (!shared->HasVerificationFailedYet()) {
stress_test->ContinuouslyVerifyDb(thread);
}
FLAGS_env->SleepForMicroseconds(
thread->rand.Next() % FLAGS_continuous_verification_interval * 1000 +
1);
}
}
void PrintKeyValue(int cf, uint64_t key, const char* value, size_t sz) { void PrintKeyValue(int cf, uint64_t key, const char* value, size_t sz) {
if (!FLAGS_verbose) { if (!FLAGS_verbose) {
return; return;

@ -141,7 +141,7 @@ DECLARE_bool(partition_filters);
DECLARE_int32(index_type); DECLARE_int32(index_type);
DECLARE_string(db); DECLARE_string(db);
DECLARE_string(secondaries_base); DECLARE_string(secondaries_base);
DECLARE_bool(enable_secondary); DECLARE_bool(test_secondary);
DECLARE_string(expected_values_path); DECLARE_string(expected_values_path);
DECLARE_bool(verify_checksum); DECLARE_bool(verify_checksum);
DECLARE_bool(mmap_read); DECLARE_bool(mmap_read);
@ -208,6 +208,8 @@ DECLARE_bool(write_dbid_to_manifest);
DECLARE_uint64(max_write_batch_group_size_bytes); DECLARE_uint64(max_write_batch_group_size_bytes);
DECLARE_bool(level_compaction_dynamic_level_bytes); DECLARE_bool(level_compaction_dynamic_level_bytes);
DECLARE_int32(verify_checksum_one_in); DECLARE_int32(verify_checksum_one_in);
DECLARE_int32(verify_db_one_in);
DECLARE_int32(continuous_verification_interval);
const long KB = 1024; const long KB = 1024;
const int kRandomValueMaxFactor = 3; const int kRandomValueMaxFactor = 3;
@ -367,6 +369,8 @@ extern inline void SanitizeDoubleParam(double* param) {
extern void PoolSizeChangeThread(void* v); extern void PoolSizeChangeThread(void* v);
extern void DbVerificationThread(void* v);
extern void PrintKeyValue(int cf, uint64_t key, const char* value, size_t sz); extern void PrintKeyValue(int cf, uint64_t key, const char* value, size_t sz);
extern int64_t GenerateOneKey(ThreadState* thread, uint64_t iteration); extern int64_t GenerateOneKey(ThreadState* thread, uint64_t iteration);

@ -75,6 +75,11 @@ bool RunStressTest(StressTest* stress) {
if (FLAGS_compaction_thread_pool_adjust_interval > 0) { if (FLAGS_compaction_thread_pool_adjust_interval > 0) {
FLAGS_env->StartThread(PoolSizeChangeThread, &bg_thread); FLAGS_env->StartThread(PoolSizeChangeThread, &bg_thread);
} }
ThreadState continuous_verification_thread(0, &shared);
if (FLAGS_continuous_verification_interval > 0) {
FLAGS_env->StartThread(DbVerificationThread,
&continuous_verification_thread);
}
// Each thread goes through the following states: // Each thread goes through the following states:
// initializing -> wait for others to init -> read/populate/depopulate // initializing -> wait for others to init -> read/populate/depopulate
@ -87,9 +92,9 @@ bool RunStressTest(StressTest* stress) {
} }
if (shared.ShouldVerifyAtBeginning()) { if (shared.ShouldVerifyAtBeginning()) {
if (shared.HasVerificationFailedYet()) { if (shared.HasVerificationFailedYet()) {
printf("Crash-recovery verification failed :(\n"); fprintf(stderr, "Crash-recovery verification failed :(\n");
} else { } else {
printf("Crash-recovery verification passed :)\n"); fprintf(stdout, "Crash-recovery verification passed :)\n");
} }
} }
@ -135,10 +140,11 @@ bool RunStressTest(StressTest* stress) {
} }
stress->PrintStatistics(); stress->PrintStatistics();
if (FLAGS_compaction_thread_pool_adjust_interval > 0) { if (FLAGS_compaction_thread_pool_adjust_interval > 0 ||
FLAGS_continuous_verification_interval > 0) {
MutexLock l(shared.GetMutex()); MutexLock l(shared.GetMutex());
shared.SetShouldStopBgThread(); shared.SetShouldStopBgThread();
while (!shared.BgThreadFinished()) { while (!shared.BgThreadsFinished()) {
shared.GetCondVar()->Wait(); shared.GetCondVar()->Wait();
} }
} }
@ -148,7 +154,7 @@ bool RunStressTest(StressTest* stress) {
} }
if (shared.HasVerificationFailedYet()) { if (shared.HasVerificationFailedYet()) {
printf("Verification failed :(\n"); fprintf(stderr, "Verification failed :(\n");
return false; return false;
} }
return true; return true;

@ -307,7 +307,7 @@ DEFINE_string(db, "", "Use the db with the following name.");
DEFINE_string(secondaries_base, "", DEFINE_string(secondaries_base, "",
"Use this path as the base path for secondary instances."); "Use this path as the base path for secondary instances.");
DEFINE_bool(enable_secondary, false, "Enable secondary instance."); DEFINE_bool(test_secondary, false, "Test secondary instance.");
DEFINE_string( DEFINE_string(
expected_values_path, "", expected_values_path, "",
@ -582,4 +582,12 @@ DEFINE_int32(verify_checksum_one_in, 0,
" checksum verification of all the files in the database once for" " checksum verification of all the files in the database once for"
" every N ops on average. 0 indicates that calls to" " every N ops on average. 0 indicates that calls to"
" VerifyChecksum() are disabled."); " VerifyChecksum() are disabled.");
DEFINE_int32(verify_db_one_in, 0,
"If non-zero, call VerifyDb() once for every N ops. 0 indicates "
"that VerifyDb() will not be called in OperateDb(). Note that "
"enabling this can slow down tests.");
DEFINE_int32(continuous_verification_interval, 1000,
"While test is running, verify db every N milliseconds. 0 "
"disables continuous verification.");
#endif // GFLAGS #endif // GFLAGS

@ -13,7 +13,6 @@
namespace rocksdb { namespace rocksdb {
const uint32_t SharedState::UNKNOWN_SENTINEL = 0xfffffffe; const uint32_t SharedState::UNKNOWN_SENTINEL = 0xfffffffe;
// indicates a key should definitely be deleted
const uint32_t SharedState::DELETION_SENTINEL = 0xffffffff; const uint32_t SharedState::DELETION_SENTINEL = 0xffffffff;
} // namespace rocksdb } // namespace rocksdb
#endif // GFLAGS #endif // GFLAGS

@ -22,6 +22,8 @@ DECLARE_int32(nooverwritepercent);
DECLARE_string(expected_values_path); DECLARE_string(expected_values_path);
DECLARE_int32(clear_column_family_one_in); DECLARE_int32(clear_column_family_one_in);
DECLARE_bool(test_batches_snapshots); DECLARE_bool(test_batches_snapshots);
DECLARE_int32(compaction_thread_pool_adjust_interval);
DECLARE_int32(continuous_verification_interval);
namespace rocksdb { namespace rocksdb {
class StressTest; class StressTest;
@ -47,17 +49,19 @@ class SharedState {
num_done_(0), num_done_(0),
start_(false), start_(false),
start_verify_(false), start_verify_(false),
num_bg_threads_(0),
should_stop_bg_thread_(false), should_stop_bg_thread_(false),
bg_thread_finished_(false), bg_thread_finished_(0),
stress_test_(stress_test), stress_test_(stress_test),
verification_failure_(false), verification_failure_(false),
should_stop_test_(false),
no_overwrite_ids_(FLAGS_column_families), no_overwrite_ids_(FLAGS_column_families),
values_(nullptr), values_(nullptr),
printing_verification_results_(false) { printing_verification_results_(false) {
// Pick random keys in each column family that will not experience // Pick random keys in each column family that will not experience
// overwrite // overwrite
printf("Choosing random keys with no overwrite\n"); fprintf(stdout, "Choosing random keys with no overwrite\n");
Random64 rnd(seed_); Random64 rnd(seed_);
// Start with the identity permutation. Subsequent iterations of // Start with the identity permutation. Subsequent iterations of
// for loop below will start with perm of previous for loop // for loop below will start with perm of previous for loop
@ -159,6 +163,14 @@ class SharedState {
ptr.reset(new port::Mutex); ptr.reset(new port::Mutex);
} }
} }
if (FLAGS_compaction_thread_pool_adjust_interval > 0) {
++num_bg_threads_;
fprintf(stdout, "Starting compaction_thread_pool_adjust_thread\n");
}
if (FLAGS_continuous_verification_interval > 0) {
++num_bg_threads_;
fprintf(stdout, "Starting continuous_verification_thread\n");
}
} }
~SharedState() {} ~SharedState() {}
@ -199,7 +211,11 @@ class SharedState {
void SetVerificationFailure() { verification_failure_.store(true); } void SetVerificationFailure() { verification_failure_.store(true); }
bool HasVerificationFailedYet() { return verification_failure_.load(); } bool HasVerificationFailedYet() const { return verification_failure_.load(); }
void SetShouldStopTest() { should_stop_test_.store(true); }
bool ShouldStopTest() const { return should_stop_test_.load(); }
port::Mutex* GetMutexForKey(int cf, int64_t key) { port::Mutex* GetMutexForKey(int cf, int64_t key) {
return key_locks_[cf][key >> log2_keys_per_lock_].get(); return key_locks_[cf][key >> log2_keys_per_lock_].get();
@ -290,11 +306,13 @@ class SharedState {
void SetShouldStopBgThread() { should_stop_bg_thread_ = true; } void SetShouldStopBgThread() { should_stop_bg_thread_ = true; }
bool ShoudStopBgThread() { return should_stop_bg_thread_; } bool ShouldStopBgThread() { return should_stop_bg_thread_; }
void SetBgThreadFinish() { bg_thread_finished_ = true; } void IncBgThreadsFinished() { ++bg_thread_finished_; }
bool BgThreadFinished() const { return bg_thread_finished_; } bool BgThreadsFinished() const {
return bg_thread_finished_ == num_bg_threads_;
}
bool ShouldVerifyAtBeginning() const { bool ShouldVerifyAtBeginning() const {
return expected_mmap_buffer_.get() != nullptr; return expected_mmap_buffer_.get() != nullptr;
@ -323,10 +341,12 @@ class SharedState {
long num_done_; long num_done_;
bool start_; bool start_;
bool start_verify_; bool start_verify_;
int num_bg_threads_;
bool should_stop_bg_thread_; bool should_stop_bg_thread_;
bool bg_thread_finished_; int bg_thread_finished_;
StressTest* stress_test_; StressTest* stress_test_;
std::atomic<bool> verification_failure_; std::atomic<bool> verification_failure_;
std::atomic<bool> should_stop_test_;
// Keys that should not be overwritten // Keys that should not be overwritten
std::unordered_set<size_t> no_overwrite_ids_; std::unordered_set<size_t> no_overwrite_ids_;

@ -28,7 +28,8 @@ StressTest::StressTest()
#endif #endif
new_column_family_name_(1), new_column_family_name_(1),
num_times_reopened_(0), num_times_reopened_(0),
db_preload_finished_(false) { db_preload_finished_(false),
cmp_db_(nullptr) {
if (FLAGS_destroy_db_initially) { if (FLAGS_destroy_db_initially) {
std::vector<std::string> files; std::vector<std::string> files;
FLAGS_env->GetChildren(FLAGS_db, &files); FLAGS_env->GetChildren(FLAGS_db, &files);
@ -65,6 +66,12 @@ StressTest::~StressTest() {
delete secondaries_[i]; delete secondaries_[i];
} }
secondaries_.clear(); secondaries_.clear();
for (auto* cf : cmp_cfhs_) {
delete cf;
}
cmp_cfhs_.clear();
delete cmp_db_;
} }
std::shared_ptr<Cache> StressTest::NewCache(size_t capacity) { std::shared_ptr<Cache> StressTest::NewCache(size_t capacity) {
@ -187,7 +194,7 @@ void StressTest::InitReadonlyDb(SharedState* shared) {
bool StressTest::VerifySecondaries() { bool StressTest::VerifySecondaries() {
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
if (FLAGS_enable_secondary) { if (FLAGS_test_secondary) {
uint64_t now = FLAGS_env->NowMicros(); uint64_t now = FLAGS_env->NowMicros();
fprintf( fprintf(
stdout, "%s Start to verify secondaries against primary\n", stdout, "%s Start to verify secondaries against primary\n",
@ -232,7 +239,7 @@ bool StressTest::VerifySecondaries() {
return false; return false;
} }
} }
if (FLAGS_enable_secondary) { if (FLAGS_test_secondary) {
uint64_t now = FLAGS_env->NowMicros(); uint64_t now = FLAGS_env->NowMicros();
fprintf( fprintf(
stdout, "%s Verification of secondaries succeeded\n", stdout, "%s Verification of secondaries succeeded\n",
@ -294,15 +301,16 @@ Status StressTest::AssertSame(DB* db, ColumnFamilyHandle* cf,
void StressTest::VerificationAbort(SharedState* shared, std::string msg, void StressTest::VerificationAbort(SharedState* shared, std::string msg,
Status s) const { Status s) const {
printf("Verification failed: %s. Status is %s\n", msg.c_str(), fprintf(stderr, "Verification failed: %s. Status is %s\n", msg.c_str(),
s.ToString().c_str()); s.ToString().c_str());
shared->SetVerificationFailure(); shared->SetVerificationFailure();
} }
void StressTest::VerificationAbort(SharedState* shared, std::string msg, int cf, void StressTest::VerificationAbort(SharedState* shared, std::string msg, int cf,
int64_t key) const { int64_t key) const {
printf("Verification failed for column family %d key %" PRIi64 ": %s\n", cf, fprintf(stderr,
key, msg.c_str()); "Verification failed for column family %d key %" PRIi64 ": %s\n", cf,
key, msg.c_str());
shared->SetVerificationFailure(); shared->SetVerificationFailure();
} }
@ -466,15 +474,17 @@ void StressTest::OperateDb(ThreadState* thread) {
write_opts.sync = true; write_opts.sync = true;
} }
write_opts.disableWAL = FLAGS_disable_wal; write_opts.disableWAL = FLAGS_disable_wal;
const int prefixBound = (int)FLAGS_readpercent + (int)FLAGS_prefixpercent; const int prefixBound = static_cast<int>(FLAGS_readpercent) +
const int writeBound = prefixBound + (int)FLAGS_writepercent; static_cast<int>(FLAGS_prefixpercent);
const int delBound = writeBound + (int)FLAGS_delpercent; const int writeBound = prefixBound + static_cast<int>(FLAGS_writepercent);
const int delRangeBound = delBound + (int)FLAGS_delrangepercent; const int delBound = writeBound + static_cast<int>(FLAGS_delpercent);
const int delRangeBound = delBound + static_cast<int>(FLAGS_delrangepercent);
const uint64_t ops_per_open = FLAGS_ops_per_thread / (FLAGS_reopen + 1); const uint64_t ops_per_open = FLAGS_ops_per_thread / (FLAGS_reopen + 1);
thread->stats.Start(); thread->stats.Start();
for (int open_cnt = 0; open_cnt <= FLAGS_reopen; ++open_cnt) { for (int open_cnt = 0; open_cnt <= FLAGS_reopen; ++open_cnt) {
if (thread->shared->HasVerificationFailedYet()) { if (thread->shared->HasVerificationFailedYet() ||
thread->shared->ShouldStopTest()) {
break; break;
} }
if (open_cnt != 0) { if (open_cnt != 0) {
@ -510,12 +520,20 @@ void StressTest::OperateDb(ThreadState* thread) {
options_.inplace_update_support ^= options_.inplace_update_support; options_.inplace_update_support ^= options_.inplace_update_support;
} }
if (thread->tid == 0 && FLAGS_verify_db_one_in > 0 &&
thread->rand.OneIn(FLAGS_verify_db_one_in)) {
ContinuouslyVerifyDb(thread);
if (thread->shared->ShouldStopTest()) {
break;
}
}
MaybeClearOneColumnFamily(thread); MaybeClearOneColumnFamily(thread);
if (thread->rand.OneInOpt(FLAGS_sync_wal_one_in)) { if (thread->rand.OneInOpt(FLAGS_sync_wal_one_in)) {
Status s = db_->SyncWAL(); Status s = db_->SyncWAL();
if (!s.ok() && !s.IsNotSupported()) { if (!s.ok() && !s.IsNotSupported()) {
fprintf(stdout, "SyncWAL() failed: %s\n", s.ToString().c_str()); fprintf(stderr, "SyncWAL() failed: %s\n", s.ToString().c_str());
} }
} }
@ -606,7 +624,7 @@ void StressTest::OperateDb(ThreadState* thread) {
// Reset this in case we pick something other than a read op. We don't // Reset this in case we pick something other than a read op. We don't
// want to use a stale value when deciding at the beginning of the loop // want to use a stale value when deciding at the beginning of the loop
// whether to vote to reopen // whether to vote to reopen
if (prob_op < (int)FLAGS_readpercent) { if (prob_op >= 0 && prob_op < static_cast<int>(FLAGS_readpercent)) {
assert(0 <= prob_op); assert(0 <= prob_op);
// OPERATION read // OPERATION read
if (FLAGS_use_multiget) { if (FLAGS_use_multiget) {
@ -625,7 +643,7 @@ void StressTest::OperateDb(ThreadState* thread) {
TestGet(thread, read_opts, rand_column_families, rand_keys); TestGet(thread, read_opts, rand_column_families, rand_keys);
} }
} else if (prob_op < prefixBound) { } else if (prob_op < prefixBound) {
assert((int)FLAGS_readpercent <= prob_op); assert(static_cast<int>(FLAGS_readpercent) <= prob_op);
// OPERATION prefix scan // OPERATION prefix scan
// keys are 8 bytes long, prefix size is FLAGS_prefix_size. There are // keys are 8 bytes long, prefix size is FLAGS_prefix_size. There are
// (8 - FLAGS_prefix_size) bytes besides the prefix. So there will // (8 - FLAGS_prefix_size) bytes besides the prefix. So there will
@ -1098,8 +1116,8 @@ Status StressTest::TestBackupRestore(
restored_db = nullptr; restored_db = nullptr;
} }
if (!s.ok()) { if (!s.ok()) {
printf("A backup/restore operation failed with: %s\n", fprintf(stderr, "A backup/restore operation failed with: %s\n",
s.ToString().c_str()); s.ToString().c_str());
} }
return s; return s;
} }
@ -1372,7 +1390,8 @@ void StressTest::TestCompactRange(ThreadState* thread, int64_t rand_key,
Status status = db_->CompactRange(cro, column_family, &start_key, &end_key); Status status = db_->CompactRange(cro, column_family, &start_key, &end_key);
if (!status.ok()) { if (!status.ok()) {
printf("Unable to perform CompactRange(): %s\n", status.ToString().c_str()); fprintf(stdout, "Unable to perform CompactRange(): %s\n",
status.ToString().c_str());
} }
if (pre_snapshot != nullptr) { if (pre_snapshot != nullptr) {
@ -1699,16 +1718,16 @@ void StressTest::Open() {
existing_column_families.end()); existing_column_families.end());
if (sorted_cfn != existing_column_families) { if (sorted_cfn != existing_column_families) {
fprintf(stderr, "Expected column families differ from the existing:\n"); fprintf(stderr, "Expected column families differ from the existing:\n");
printf("Expected: {"); fprintf(stderr, "Expected: {");
for (auto cf : sorted_cfn) { for (auto cf : sorted_cfn) {
printf("%s ", cf.c_str()); fprintf(stderr, "%s ", cf.c_str());
} }
printf("}\n"); fprintf(stderr, "}\n");
printf("Existing: {"); fprintf(stderr, "Existing: {");
for (auto cf : existing_column_families) { for (auto cf : existing_column_families) {
printf("%s ", cf.c_str()); fprintf(stderr, "%s ", cf.c_str());
} }
printf("}\n"); fprintf(stderr, "}\n");
} }
assert(sorted_cfn == existing_column_families); assert(sorted_cfn == existing_column_families);
} }
@ -1775,14 +1794,15 @@ void StressTest::Open() {
assert(!s.ok() || column_families_.size() == assert(!s.ok() || column_families_.size() ==
static_cast<size_t>(FLAGS_column_families)); static_cast<size_t>(FLAGS_column_families));
if (FLAGS_enable_secondary) { if (FLAGS_test_secondary) {
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
secondaries_.resize(FLAGS_threads); secondaries_.resize(FLAGS_threads);
std::fill(secondaries_.begin(), secondaries_.end(), nullptr); std::fill(secondaries_.begin(), secondaries_.end(), nullptr);
secondary_cfh_lists_.clear(); secondary_cfh_lists_.clear();
secondary_cfh_lists_.resize(FLAGS_threads); secondary_cfh_lists_.resize(FLAGS_threads);
Options tmp_opts; Options tmp_opts;
tmp_opts.max_open_files = FLAGS_open_files; // TODO(yanqin) support max_open_files != -1 for secondary instance.
tmp_opts.max_open_files = -1;
tmp_opts.statistics = dbstats_secondaries; tmp_opts.statistics = dbstats_secondaries;
tmp_opts.env = FLAGS_env; tmp_opts.env = FLAGS_env;
for (size_t i = 0; i != static_cast<size_t>(FLAGS_threads); ++i) { for (size_t i = 0; i != static_cast<size_t>(FLAGS_threads); ++i) {
@ -1795,22 +1815,35 @@ void StressTest::Open() {
break; break;
} }
} }
assert(s.ok());
#else #else
fprintf(stderr, "Secondary is not supported in RocksDBLite\n"); fprintf(stderr, "Secondary is not supported in RocksDBLite\n");
exit(1); exit(1);
#endif #endif
} }
if (FLAGS_continuous_verification_interval > 0 && !cmp_db_) {
Options tmp_opts;
// TODO(yanqin) support max_open_files != -1 for secondary instance.
tmp_opts.max_open_files = -1;
tmp_opts.env = FLAGS_env;
std::string secondary_path = FLAGS_secondaries_base + "/cmp_database";
s = DB::OpenAsSecondary(tmp_opts, FLAGS_db, secondary_path,
cf_descriptors, &cmp_cfhs_, &cmp_db_);
assert(!s.ok() ||
cmp_cfhs_.size() == static_cast<size_t>(FLAGS_column_families));
}
} else { } else {
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
DBWithTTL* db_with_ttl; DBWithTTL* db_with_ttl;
s = DBWithTTL::Open(options_, FLAGS_db, &db_with_ttl, FLAGS_ttl); s = DBWithTTL::Open(options_, FLAGS_db, &db_with_ttl, FLAGS_ttl);
db_ = db_with_ttl; db_ = db_with_ttl;
if (FLAGS_enable_secondary) { if (FLAGS_test_secondary) {
secondaries_.resize(FLAGS_threads); secondaries_.resize(FLAGS_threads);
std::fill(secondaries_.begin(), secondaries_.end(), nullptr); std::fill(secondaries_.begin(), secondaries_.end(), nullptr);
Options tmp_opts; Options tmp_opts;
tmp_opts.env = options_.env; tmp_opts.env = options_.env;
tmp_opts.max_open_files = FLAGS_open_files; // TODO(yanqin) support max_open_files != -1 for secondary instance.
tmp_opts.max_open_files = -1;
for (size_t i = 0; i != static_cast<size_t>(FLAGS_threads); ++i) { for (size_t i = 0; i != static_cast<size_t>(FLAGS_threads); ++i) {
const std::string secondary_path = const std::string secondary_path =
FLAGS_secondaries_base + "/" + std::to_string(i); FLAGS_secondaries_base + "/" + std::to_string(i);

@ -34,6 +34,7 @@ class StressTest {
void OperateDb(ThreadState* thread); void OperateDb(ThreadState* thread);
virtual void VerifyDb(ThreadState* thread) const = 0; virtual void VerifyDb(ThreadState* thread) const = 0;
virtual void ContinuouslyVerifyDb(ThreadState* /*thread*/) const {}
void PrintStatistics(); void PrintStatistics();
@ -203,6 +204,10 @@ class StressTest {
// Fields used for stress-testing secondary instance in the same process // Fields used for stress-testing secondary instance in the same process
std::vector<DB*> secondaries_; std::vector<DB*> secondaries_;
std::vector<std::vector<ColumnFamilyHandle*>> secondary_cfh_lists_; std::vector<std::vector<ColumnFamilyHandle*>> secondary_cfh_lists_;
// Fields used for continuous verification from another thread
DB* cmp_db_;
std::vector<ColumnFamilyHandle*> cmp_cfhs_;
}; };
} // namespace rocksdb } // namespace rocksdb

@ -41,7 +41,7 @@ int db_stress_tool(int argc, char** argv) {
if (FLAGS_statistics) { if (FLAGS_statistics) {
dbstats = rocksdb::CreateDBStatistics(); dbstats = rocksdb::CreateDBStatistics();
if (FLAGS_enable_secondary) { if (FLAGS_test_secondary) {
dbstats_secondaries = rocksdb::CreateDBStatistics(); dbstats_secondaries = rocksdb::CreateDBStatistics();
} }
} }
@ -171,7 +171,8 @@ int db_stress_tool(int argc, char** argv) {
FLAGS_db = default_db_path; FLAGS_db = default_db_path;
} }
if (FLAGS_enable_secondary && FLAGS_secondaries_base.empty()) { if ((FLAGS_test_secondary || FLAGS_continuous_verification_interval > 0) &&
FLAGS_secondaries_base.empty()) {
std::string default_secondaries_path; std::string default_secondaries_path;
FLAGS_env->GetTestDirectory(&default_secondaries_path); FLAGS_env->GetTestDirectory(&default_secondaries_path);
default_secondaries_path += "/dbstress_secondaries"; default_secondaries_path += "/dbstress_secondaries";
@ -184,8 +185,17 @@ int db_stress_tool(int argc, char** argv) {
FLAGS_secondaries_base = default_secondaries_path; FLAGS_secondaries_base = default_secondaries_path;
} }
if (!FLAGS_enable_secondary && FLAGS_secondary_catch_up_one_in > 0) { if (!FLAGS_test_secondary && FLAGS_secondary_catch_up_one_in > 0) {
fprintf(stderr, "Secondary instance is disabled.\n"); fprintf(
stderr,
"Must set -test_secondary=true if secondary_catch_up_one_in > 0.\n");
exit(1);
}
if (!FLAGS_test_cf_consistency && FLAGS_verify_db_one_in > 0) {
fprintf(stderr,
"For non cf_consistency tests, VerifyDb() is called only before "
"and after test.\n");
exit(1); exit(1);
} }

@ -42,10 +42,9 @@ class NonBatchedOpsStressTest : public StressTest {
if (thread->shared->HasVerificationFailedYet()) { if (thread->shared->HasVerificationFailedYet()) {
break; break;
} }
// TODO(ljin): update "long" to uint64_t
// Reseek when the prefix changes // Reseek when the prefix changes
if (prefix_to_use > 0 && if (prefix_to_use > 0 &&
i % (static_cast<int64_t>(1) << 8 * (8 - prefix_to_use)) == 0) { i % (static_cast<uint64_t>(1) << 8 * (8 - prefix_to_use)) == 0) {
iter->Seek(Key(i)); iter->Seek(Key(i));
} }
std::string from_db; std::string from_db;
@ -65,7 +64,7 @@ class NonBatchedOpsStressTest : public StressTest {
} else { } else {
// The iterator found no value for the key in question, so do not // The iterator found no value for the key in question, so do not
// move to the next item in the iterator // move to the next item in the iterator
s = Status::NotFound(Slice()); s = Status::NotFound();
} }
VerifyValue(static_cast<int>(cf), i, options, shared, from_db, s, VerifyValue(static_cast<int>(cf), i, options, shared, from_db, s,
true); true);
@ -506,7 +505,7 @@ class NonBatchedOpsStressTest : public StressTest {
bool VerifyValue(int cf, int64_t key, const ReadOptions& /*opts*/, bool VerifyValue(int cf, int64_t key, const ReadOptions& /*opts*/,
SharedState* shared, const std::string& value_from_db, SharedState* shared, const std::string& value_from_db,
Status s, bool strict = false) const { const Status& s, bool strict = false) const {
if (shared->HasVerificationFailedYet()) { if (shared->HasVerificationFailedYet()) {
return false; return false;
} }

@ -99,7 +99,9 @@ default_params = {
# Temporarily disabled because of assertion violations in # Temporarily disabled because of assertion violations in
# BlockBasedTable::ApproximateSize # BlockBasedTable::ApproximateSize
# "level_compaction_dynamic_level_bytes" : True, # "level_compaction_dynamic_level_bytes" : True,
"verify_checksum_one_in": 1000000 "verify_checksum_one_in": 1000000,
"verify_db_one_in": 100000,
"continuous_verification_interval" : 0
} }
_TEST_DIR_ENV_VAR = 'TEST_TMPDIR' _TEST_DIR_ENV_VAR = 'TEST_TMPDIR'

Loading…
Cancel
Save