Refactor pulling out parts of StressTest::OperateDb (#6195)

Summary:
Complete some refactoring called for in https://github.com/facebook/rocksdb/issues/6148. Somehow I got some 'make format' in here for files I didn't change, but that should be OK. (I'm not sure why "hide whitespace changes" doesn't seem to help in review.)

Not addressed in this PR: some operations simply print to stdout rather than failing on discovering a bad status or inconsistency.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6195

Differential Revision: D19131067

Pulled By: pdillinger

fbshipit-source-id: 4f416e6b792023989ef119f385fe122426cb825d
main
Peter Dillinger 5 years ago committed by Facebook Github Bot
parent 77d5ba7887
commit 873331fe49
  1. 305
      db_stress_tool/db_stress_test_base.cc
  2. 11
      db_stress_tool/db_stress_test_base.h

@ -519,58 +519,14 @@ void StressTest::OperateDb(ThreadState* thread) {
}
}
#ifndef ROCKSDB_LITE
if (thread->rand.OneInOpt(FLAGS_compact_files_one_in)) {
auto* random_cf =
column_families_[thread->rand.Next() % FLAGS_column_families];
rocksdb::ColumnFamilyMetaData cf_meta_data;
db_->GetColumnFamilyMetaData(random_cf, &cf_meta_data);
// Randomly compact up to three consecutive files from a level
const int kMaxRetry = 3;
for (int attempt = 0; attempt < kMaxRetry; ++attempt) {
size_t random_level = thread->rand.Uniform(
static_cast<int>(cf_meta_data.levels.size()));
const auto& files = cf_meta_data.levels[random_level].files;
if (files.size() > 0) {
size_t random_file_index =
thread->rand.Uniform(static_cast<int>(files.size()));
if (files[random_file_index].being_compacted) {
// Retry as the selected file is currently being compacted
continue;
}
int rand_column_family = thread->rand.Next() % FLAGS_column_families;
ColumnFamilyHandle* column_family = column_families_[rand_column_family];
std::vector<std::string> input_files;
input_files.push_back(files[random_file_index].name);
if (random_file_index > 0 &&
!files[random_file_index - 1].being_compacted) {
input_files.push_back(files[random_file_index - 1].name);
}
if (random_file_index + 1 < files.size() &&
!files[random_file_index + 1].being_compacted) {
input_files.push_back(files[random_file_index + 1].name);
if (thread->rand.OneInOpt(FLAGS_compact_files_one_in)) {
TestCompactFiles(thread, column_family);
}
size_t output_level =
std::min(random_level + 1, cf_meta_data.levels.size() - 1);
auto s =
db_->CompactFiles(CompactionOptions(), random_cf, input_files,
static_cast<int>(output_level));
if (!s.ok()) {
fprintf(stdout, "Unable to perform CompactFiles(): %s\n",
s.ToString().c_str());
thread->stats.AddNumCompactFilesFailed(1);
} else {
thread->stats.AddNumCompactFilesSucceed(1);
}
break;
}
}
}
#endif // !ROCKSDB_LITE
int64_t rand_key = GenerateOneKey(thread, i);
int rand_column_family = thread->rand.Next() % FLAGS_column_families;
std::string keystr = Key(rand_key);
Slice key = keystr;
std::unique_ptr<MutexLock> lock;
@ -579,8 +535,6 @@ void StressTest::OperateDb(ThreadState* thread) {
shared->GetMutexForKey(rand_column_family, rand_key)));
}
ColumnFamilyHandle* column_family = column_families_[rand_column_family];
if (thread->rand.OneInOpt(FLAGS_compact_range_one_in)) {
TestCompactRange(thread, rand_key, key, column_family);
if (thread->shared->HasVerificationFailedYet()) {
@ -592,12 +546,7 @@ void StressTest::OperateDb(ThreadState* thread) {
GenerateColumnFamilies(FLAGS_column_families, rand_column_family);
if (thread->rand.OneInOpt(FLAGS_flush_one_in)) {
FlushOptions flush_opts;
std::vector<ColumnFamilyHandle*> cfhs;
std::for_each(
rand_column_families.begin(), rand_column_families.end(),
[this, &cfhs](int k) { cfhs.push_back(column_families_[k]); });
Status status = db_->Flush(flush_opts, cfhs);
Status status = TestFlush(rand_column_families);
if (!status.ok()) {
fprintf(stdout, "Unable to perform Flush(): %s\n",
status.ToString().c_str());
@ -605,23 +554,10 @@ void StressTest::OperateDb(ThreadState* thread) {
}
if (thread->rand.OneInOpt(FLAGS_pause_background_one_in)) {
Status status = db_->PauseBackgroundWork();
if (!status.ok()) {
VerificationAbort(shared, "PauseBackgroundWork status not OK",
status);
}
// To avoid stalling/deadlocking ourself in this thread, just
// sleep here during pause and let other threads do db operations.
// Sleep up to ~16 seconds (2**24 microseconds), but very skewed
// toward short pause. (1 chance in 25 of pausing >= 1s;
// 1 chance in 625 of pausing full 16s.)
int pwr2_micros =
std::min(thread->rand.Uniform(25), thread->rand.Uniform(25));
FLAGS_env->SleepForMicroseconds(1 << pwr2_micros);
status = db_->ContinueBackgroundWork();
Status status = TestPauseBackground(thread);
if (!status.ok()) {
VerificationAbort(shared, "ContinueBackgroundWork status not OK",
status);
VerificationAbort(
shared, "Pause/ContinueBackgroundWork status not OK", status);
}
}
@ -656,73 +592,14 @@ void StressTest::OperateDb(ThreadState* thread) {
}
if (thread->rand.OneInOpt(FLAGS_acquire_snapshot_one_in)) {
#ifndef ROCKSDB_LITE
auto db_impl = reinterpret_cast<DBImpl*>(db_->GetRootDB());
const bool ww_snapshot = thread->rand.OneIn(10);
const Snapshot* snapshot =
ww_snapshot ? db_impl->GetSnapshotForWriteConflictBoundary()
: db_->GetSnapshot();
#else
const Snapshot* snapshot = db_->GetSnapshot();
#endif // !ROCKSDB_LITE
ReadOptions ropt;
ropt.snapshot = snapshot;
std::string value_at;
// When taking a snapshot, we also read a key from that snapshot. We
// will later read the same key before releasing the snapshot and
// verify that the results are the same.
auto status_at = db_->Get(ropt, column_family, key, &value_at);
std::vector<bool>* key_vec = nullptr;
if (FLAGS_compare_full_db_state_snapshot && (thread->tid == 0)) {
key_vec = new std::vector<bool>(FLAGS_max_key);
// When `prefix_extractor` is set, seeking to beginning and scanning
// across prefixes are only supported with `total_order_seek` set.
ropt.total_order_seek = true;
std::unique_ptr<Iterator> iterator(db_->NewIterator(ropt));
for (iterator->SeekToFirst(); iterator->Valid(); iterator->Next()) {
uint64_t key_val;
if (GetIntVal(iterator->key().ToString(), &key_val)) {
(*key_vec)[key_val] = true;
}
}
TestAcquireSnapshot(thread, rand_column_family, keystr, i);
}
ThreadState::SnapshotState snap_state = {
snapshot, rand_column_family, column_family->GetName(),
keystr, status_at, value_at,
key_vec};
uint64_t hold_for = FLAGS_snapshot_hold_ops;
if (FLAGS_long_running_snapshots) {
// Hold 10% of snapshots for 10x more
if (thread->rand.OneIn(10)) {
assert(hold_for < port::kMaxInt64 / 10);
hold_for *= 10;
// Hold 1% of snapshots for 100x more
if (thread->rand.OneIn(10)) {
assert(hold_for < port::kMaxInt64 / 10);
hold_for *= 10;
}
}
}
uint64_t release_at = std::min(FLAGS_ops_per_thread - 1, i + hold_for);
thread->snapshot_queue.emplace(release_at, snap_state);
}
while (!thread->snapshot_queue.empty() &&
i >= thread->snapshot_queue.front().first) {
auto snap_state = thread->snapshot_queue.front().second;
assert(snap_state.snapshot);
// Note: this is unsafe as the cf might be dropped concurrently. But
// it is ok since unclean cf drop is cunnrently not supported by write
// prepared transactions.
Status s =
AssertSame(db_, column_families_[snap_state.cf_at], snap_state);
/*always*/ {
Status s = MaybeReleaseSnapshots(thread, i);
if (!s.ok()) {
VerificationAbort(shared, "Snapshot gave inconsistent state", s);
}
db_->ReleaseSnapshot(snap_state.snapshot);
delete snap_state.key_vec;
thread->snapshot_queue.pop();
}
int prob_op = thread->rand.Uniform(100);
@ -784,8 +661,7 @@ void StressTest::OperateDb(ThreadState* thread) {
uint32_t tid = thread->tid;
assert(secondaries_.empty() ||
static_cast<size_t>(tid) < secondaries_.size());
if (FLAGS_secondary_catch_up_one_in > 0 &&
thread->rand.Uniform(FLAGS_secondary_catch_up_one_in) == 0) {
if (thread->rand.OneInOpt(FLAGS_secondary_catch_up_one_in)) {
Status s = secondaries_[tid]->TryCatchUpWithPrimary();
if (!s.ok()) {
VerificationAbort(shared, "Secondary instance failed to catch up", s);
@ -1134,6 +1010,15 @@ Status StressTest::TestCheckpoint(
"TestCheckpoint\n");
std::terminate();
}
void StressTest::TestCompactFiles(ThreadState* /* thread */,
ColumnFamilyHandle* /* column_family */) {
assert(false);
fprintf(stderr,
"RocksDB lite does not support "
"CompactFiles\n");
std::terminate();
}
#else // ROCKSDB_LITE
Status StressTest::TestBackupRestore(
ThreadState* thread, const std::vector<int>& rand_column_families,
@ -1298,8 +1183,156 @@ Status StressTest::TestCheckpoint(ThreadState* thread,
}
return s;
}
void StressTest::TestCompactFiles(ThreadState* thread,
ColumnFamilyHandle* column_family) {
rocksdb::ColumnFamilyMetaData cf_meta_data;
db_->GetColumnFamilyMetaData(column_family, &cf_meta_data);
// Randomly compact up to three consecutive files from a level
const int kMaxRetry = 3;
for (int attempt = 0; attempt < kMaxRetry; ++attempt) {
size_t random_level =
thread->rand.Uniform(static_cast<int>(cf_meta_data.levels.size()));
const auto& files = cf_meta_data.levels[random_level].files;
if (files.size() > 0) {
size_t random_file_index =
thread->rand.Uniform(static_cast<int>(files.size()));
if (files[random_file_index].being_compacted) {
// Retry as the selected file is currently being compacted
continue;
}
std::vector<std::string> input_files;
input_files.push_back(files[random_file_index].name);
if (random_file_index > 0 &&
!files[random_file_index - 1].being_compacted) {
input_files.push_back(files[random_file_index - 1].name);
}
if (random_file_index + 1 < files.size() &&
!files[random_file_index + 1].being_compacted) {
input_files.push_back(files[random_file_index + 1].name);
}
size_t output_level =
std::min(random_level + 1, cf_meta_data.levels.size() - 1);
auto s = db_->CompactFiles(CompactionOptions(), column_family,
input_files, static_cast<int>(output_level));
if (!s.ok()) {
fprintf(stdout, "Unable to perform CompactFiles(): %s\n",
s.ToString().c_str());
thread->stats.AddNumCompactFilesFailed(1);
} else {
thread->stats.AddNumCompactFilesSucceed(1);
}
break;
}
}
}
#endif // ROCKSDB_LITE
Status StressTest::TestFlush(const std::vector<int>& rand_column_families) {
FlushOptions flush_opts;
std::vector<ColumnFamilyHandle*> cfhs;
std::for_each(rand_column_families.begin(), rand_column_families.end(),
[this, &cfhs](int k) { cfhs.push_back(column_families_[k]); });
return db_->Flush(flush_opts, cfhs);
}
Status StressTest::TestPauseBackground(ThreadState* thread) {
Status status = db_->PauseBackgroundWork();
if (!status.ok()) {
return status;
}
// To avoid stalling/deadlocking ourself in this thread, just
// sleep here during pause and let other threads do db operations.
// Sleep up to ~16 seconds (2**24 microseconds), but very skewed
// toward short pause. (1 chance in 25 of pausing >= 1s;
// 1 chance in 625 of pausing full 16s.)
int pwr2_micros =
std::min(thread->rand.Uniform(25), thread->rand.Uniform(25));
FLAGS_env->SleepForMicroseconds(1 << pwr2_micros);
return db_->ContinueBackgroundWork();
}
void StressTest::TestAcquireSnapshot(ThreadState* thread,
int rand_column_family,
const std::string& keystr, uint64_t i) {
Slice key = keystr;
ColumnFamilyHandle* column_family = column_families_[rand_column_family];
#ifndef ROCKSDB_LITE
auto db_impl = reinterpret_cast<DBImpl*>(db_->GetRootDB());
const bool ww_snapshot = thread->rand.OneIn(10);
const Snapshot* snapshot =
ww_snapshot ? db_impl->GetSnapshotForWriteConflictBoundary()
: db_->GetSnapshot();
#else
const Snapshot* snapshot = db_->GetSnapshot();
#endif // !ROCKSDB_LITE
ReadOptions ropt;
ropt.snapshot = snapshot;
std::string value_at;
// When taking a snapshot, we also read a key from that snapshot. We
// will later read the same key before releasing the snapshot and
// verify that the results are the same.
auto status_at = db_->Get(ropt, column_family, key, &value_at);
std::vector<bool>* key_vec = nullptr;
if (FLAGS_compare_full_db_state_snapshot && (thread->tid == 0)) {
key_vec = new std::vector<bool>(FLAGS_max_key);
// When `prefix_extractor` is set, seeking to beginning and scanning
// across prefixes are only supported with `total_order_seek` set.
ropt.total_order_seek = true;
std::unique_ptr<Iterator> iterator(db_->NewIterator(ropt));
for (iterator->SeekToFirst(); iterator->Valid(); iterator->Next()) {
uint64_t key_val;
if (GetIntVal(iterator->key().ToString(), &key_val)) {
(*key_vec)[key_val] = true;
}
}
}
ThreadState::SnapshotState snap_state = {
snapshot, rand_column_family, column_family->GetName(),
keystr, status_at, value_at,
key_vec};
uint64_t hold_for = FLAGS_snapshot_hold_ops;
if (FLAGS_long_running_snapshots) {
// Hold 10% of snapshots for 10x more
if (thread->rand.OneIn(10)) {
assert(hold_for < port::kMaxInt64 / 10);
hold_for *= 10;
// Hold 1% of snapshots for 100x more
if (thread->rand.OneIn(10)) {
assert(hold_for < port::kMaxInt64 / 10);
hold_for *= 10;
}
}
}
uint64_t release_at = std::min(FLAGS_ops_per_thread - 1, i + hold_for);
thread->snapshot_queue.emplace(release_at, snap_state);
}
Status StressTest::MaybeReleaseSnapshots(ThreadState* thread, uint64_t i) {
while (!thread->snapshot_queue.empty() &&
i >= thread->snapshot_queue.front().first) {
auto snap_state = thread->snapshot_queue.front().second;
assert(snap_state.snapshot);
// Note: this is unsafe as the cf might be dropped concurrently. But
// it is ok since unclean cf drop is cunnrently not supported by write
// prepared transactions.
Status s = AssertSame(db_, column_families_[snap_state.cf_at], snap_state);
db_->ReleaseSnapshot(snap_state.snapshot);
delete snap_state.key_vec;
thread->snapshot_queue.pop();
if (!s.ok()) {
return s;
}
}
return Status::OK();
}
void StressTest::TestCompactRange(ThreadState* thread, int64_t rand_key,
const Slice& start_key,
ColumnFamilyHandle* column_family) {

@ -162,6 +162,17 @@ class StressTest {
const std::vector<int>& rand_column_families,
const std::vector<int64_t>& rand_keys);
void TestCompactFiles(ThreadState* thread, ColumnFamilyHandle* column_family);
Status TestFlush(const std::vector<int>& rand_column_families);
Status TestPauseBackground(ThreadState* thread);
void TestAcquireSnapshot(ThreadState* thread, int rand_column_family,
const std::string& keystr, uint64_t i);
Status MaybeReleaseSnapshots(ThreadState* thread, uint64_t i);
void VerificationAbort(SharedState* shared, std::string msg, Status s) const;
void VerificationAbort(SharedState* shared, std::string msg, int cf,

Loading…
Cancel
Save