db_stress snapshot compatibility with reopens

Summary:
- Release all snapshots before crashing and reopening the DB. Without this, we may attempt to release snapshots from an old DB using a new DB. That tripped an assertion.
- Release multiple snapshots in the same operation if needed. Without this, we would sometimes leak snapshots.
Closes https://github.com/facebook/rocksdb/pull/3098

Differential Revision: D6194923

Pulled By: ajkr

fbshipit-source-id: b9c89bcca7ebcbb6c7802c616f9d1175a005aadf
main
Andrew Kryczka 7 years ago committed by Facebook Github Bot
parent b7bc9cc038
commit 4d43c6a6a4
  1. 16
      tools/db_stress.cc

@ -985,6 +985,7 @@ struct ThreadState {
Random rand; // Has different seeds for different threads Random rand; // Has different seeds for different threads
SharedState* shared; SharedState* shared;
Stats stats; Stats stats;
std::queue<std::pair<uint64_t, const Snapshot*> > snapshot_queue;
ThreadState(uint32_t index, SharedState* _shared) ThreadState(uint32_t index, SharedState* _shared)
: tid(index), rand(1000 + index + _shared->GetSeed()), shared(_shared) {} : tid(index), rand(1000 + index + _shared->GetSeed()), shared(_shared) {}
@ -1657,7 +1658,6 @@ class StressTest {
const int delRangeBound = delBound + (int)FLAGS_delrangepercent; const int delRangeBound = delBound + (int)FLAGS_delrangepercent;
thread->stats.Start(); thread->stats.Start();
std::queue<std::pair<uint64_t, const Snapshot*> > snapshot_queue;
for (uint64_t i = 0; i < FLAGS_ops_per_thread; i++) { for (uint64_t i = 0; i < FLAGS_ops_per_thread; i++) {
if (thread->shared->HasVerificationFailedYet()) { if (thread->shared->HasVerificationFailedYet()) {
break; break;
@ -1666,6 +1666,10 @@ class StressTest {
{ {
thread->stats.FinishedSingleOp(); thread->stats.FinishedSingleOp();
MutexLock l(thread->shared->GetMutex()); MutexLock l(thread->shared->GetMutex());
while (!thread->snapshot_queue.empty()) {
db_->ReleaseSnapshot(thread->snapshot_queue.front().second);
thread->snapshot_queue.pop();
}
thread->shared->IncVotedReopen(); thread->shared->IncVotedReopen();
if (thread->shared->AllVotedReopen()) { if (thread->shared->AllVotedReopen()) {
thread->shared->GetStressTest()->Reopen(); thread->shared->GetStressTest()->Reopen();
@ -1779,13 +1783,15 @@ class StressTest {
#endif // !ROCKSDB_LITE #endif // !ROCKSDB_LITE
if (FLAGS_acquire_snapshot_one_in > 0 && if (FLAGS_acquire_snapshot_one_in > 0 &&
thread->rand.Uniform(FLAGS_acquire_snapshot_one_in) == 0) { thread->rand.Uniform(FLAGS_acquire_snapshot_one_in) == 0) {
snapshot_queue.emplace( thread->snapshot_queue.emplace(
std::min(FLAGS_ops_per_thread - 1, i + FLAGS_snapshot_hold_ops), std::min(FLAGS_ops_per_thread - 1, i + FLAGS_snapshot_hold_ops),
db_->GetSnapshot()); db_->GetSnapshot());
} }
if (!snapshot_queue.empty() && i == snapshot_queue.front().first) { if (!thread->snapshot_queue.empty()) {
db_->ReleaseSnapshot(snapshot_queue.front().second); while (i == thread->snapshot_queue.front().first) {
snapshot_queue.pop(); db_->ReleaseSnapshot(thread->snapshot_queue.front().second);
thread->snapshot_queue.pop();
}
} }
const double completed_ratio = const double completed_ratio =

Loading…
Cancel
Save