From cd2e5cae7f134aae9ded3403e7f4ed5f1a23abd7 Mon Sep 17 00:00:00 2001 From: Maysam Yabandeh Date: Wed, 13 Dec 2017 11:51:12 -0800 Subject: [PATCH] WritePrepared Txn: make db_stress transactional Summary: Add "--use_txn" option to use transactional API in db_stress, default being WRITE_PREPARED policy, which is the main intention of modifying db_stress. It also extends the existing snapshots to verify that, before releasing a snapshot, a read from it returns the same value as before. Closes https://github.com/facebook/rocksdb/pull/3243 Differential Revision: D6556912 Pulled By: maysamyabandeh fbshipit-source-id: 1ae31465be362d44bd06e635e2e9e49a1da11268 --- tools/db_stress.cc | 218 ++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 195 insertions(+), 23 deletions(-) diff --git a/tools/db_stress.cc b/tools/db_stress.cc index 592f30027..b8536f33f 100644 --- a/tools/db_stress.cc +++ b/tools/db_stress.cc @@ -51,6 +51,8 @@ int main() { #include "rocksdb/slice_transform.h" #include "rocksdb/statistics.h" #include "rocksdb/utilities/db_ttl.h" +#include "rocksdb/utilities/transaction.h" +#include "rocksdb/utilities/transaction_db.h" #include "rocksdb/write_batch.h" #include "util/coding.h" #include "util/compression.h" @@ -350,6 +352,10 @@ DEFINE_uint64(rate_limiter_bytes_per_sec, 0, "Set options.rate_limiter value."); DEFINE_bool(rate_limit_bg_reads, false, "Use options.rate_limiter on compaction reads"); +DEFINE_bool(use_txn, false, + "Use TransactionDB. Currently the default write policy is " + "TxnDBWritePolicy::WRITE_PREPARED"); + // Temporarily disable this to allows it to detect new bugs DEFINE_int32(compact_files_one_in, 0, "If non-zero, then CompactFiles() will be called one for every N " @@ -981,11 +987,22 @@ const uint32_t SharedState::SENTINEL = 0xffffffff; // Per-thread state for concurrent executions of the same benchmark. 
struct ThreadState { - uint32_t tid; // 0..n-1 - Random rand; // Has different seeds for different threads + uint32_t tid; // 0..n-1 + Random rand; // Has different seeds for different threads SharedState* shared; Stats stats; - std::queue > snapshot_queue; + struct SnapshotState { + const Snapshot* snapshot; + // The cf from which we did a Get at this snapshot + int cf_at; + // The key with which we did a Get at this snapshot + std::string key; + // The status of the Get + Status status; + // The value of the Get + std::string value; + }; + std::queue > snapshot_queue; ThreadState(uint32_t index, SharedState* _shared) : tid(index), rand(1000 + index + _shared->GetSeed()), shared(_shared) {} @@ -1109,6 +1126,9 @@ class StressTest { : NewBloomFilterPolicy(FLAGS_bloom_bits, false) : nullptr), db_(nullptr), +#ifndef ROCKSDB_LITE + txn_db_(nullptr), +#endif new_column_family_name_(1), num_times_reopened_(0) { if (FLAGS_destroy_db_initially) { @@ -1316,6 +1336,32 @@ class StressTest { } private: + Status AssertSame(DB* db, ColumnFamilyHandle* cf, + ThreadState::SnapshotState& snap_state) { + ReadOptions ropt; + ropt.snapshot = snap_state.snapshot; + PinnableSlice exp_v(&snap_state.value); + exp_v.PinSelf(); + Status s; + PinnableSlice v; + s = db->Get(ropt, cf, snap_state.key, &v); + if (!s.ok() && !s.IsNotFound()) { + return s; + } + if (snap_state.status != s) { + return Status::Corruption("The snapshot gave inconsistent results: (" + + snap_state.status.ToString() + ") vs. (" + + s.ToString() + ")"); + } + if (s.ok()) { + if (exp_v != v) { + return Status::Corruption("The snapshot gave inconsistent values: (" + + exp_v.ToString() + ") vs. 
(" + v.ToString() + + ")"); + } + } + return Status::OK(); + } static void ThreadBody(void* v) { ThreadState* thread = reinterpret_cast(v); @@ -1641,6 +1687,32 @@ class StressTest { return db_->SetOptions(cfh, opts); } +#ifndef ROCKSDB_LITE + Status NewTxn(WriteOptions& write_opts, Transaction** txn) { + if (!FLAGS_use_txn) { + return Status::InvalidArgument("NewTxn when FLAGS_use_txn is not set"); + } + static std::atomic txn_id = {0}; + TransactionOptions txn_options; + *txn = txn_db_->BeginTransaction(write_opts, txn_options); + auto istr = std::to_string(txn_id.fetch_add(1)); + Status s = (*txn)->SetName("xid" + istr); + return s; + } + + Status CommitTxn(Transaction* txn) { + if (!FLAGS_use_txn) { + return Status::InvalidArgument("CommitTxn when FLAGS_use_txn is not set"); + } + Status s = txn->Prepare(); + if (s.ok()) { + s = txn->Commit(); + } + delete txn; + return s; + } +#endif + void OperateDb(ThreadState* thread) { ReadOptions read_opts(FLAGS_verify_checksum, true); WriteOptions write_opts; @@ -1667,7 +1739,8 @@ class StressTest { thread->stats.FinishedSingleOp(); MutexLock l(thread->shared->GetMutex()); while (!thread->snapshot_queue.empty()) { - db_->ReleaseSnapshot(thread->snapshot_queue.front().second); + db_->ReleaseSnapshot( + thread->snapshot_queue.front().second.snapshot); thread->snapshot_queue.pop(); } thread->shared->IncVotedReopen(); @@ -1781,19 +1854,6 @@ class StressTest { } } #endif // !ROCKSDB_LITE - if (FLAGS_acquire_snapshot_one_in > 0 && - thread->rand.Uniform(FLAGS_acquire_snapshot_one_in) == 0) { - thread->snapshot_queue.emplace( - std::min(FLAGS_ops_per_thread - 1, i + FLAGS_snapshot_hold_ops), - db_->GetSnapshot()); - } - if (!thread->snapshot_queue.empty()) { - while (i == thread->snapshot_queue.front().first) { - db_->ReleaseSnapshot(thread->snapshot_queue.front().second); - thread->snapshot_queue.pop(); - } - } - const double completed_ratio = static_cast(i) / FLAGS_ops_per_thread; const int64_t base_key = static_cast( @@ 
-1807,8 +1867,38 @@ class StressTest { l.reset(new MutexLock( shared->GetMutexForKey(rand_column_family, rand_key))); } + auto column_family = column_families_[rand_column_family]; + if (FLAGS_acquire_snapshot_one_in > 0 && + thread->rand.Uniform(FLAGS_acquire_snapshot_one_in) == 0) { + auto snapshot = db_->GetSnapshot(); + ReadOptions ropt; + ropt.snapshot = snapshot; + std::string value_at; + // When taking a snapshot, we also read a key from that snapshot. We + // will later read the same key before releasing the snapshot and verify + // that the results are the same. + auto status_at = db_->Get(ropt, column_family, key, &value_at); + ThreadState::SnapshotState snap_state = {snapshot, rand_column_family, + keystr, status_at, value_at}; + thread->snapshot_queue.emplace( + std::min(FLAGS_ops_per_thread - 1, i + FLAGS_snapshot_hold_ops), + snap_state); + } + while (!thread->snapshot_queue.empty() && + i == thread->snapshot_queue.front().first) { + auto snap_state = thread->snapshot_queue.front().second; + assert(snap_state.snapshot); + Status s = + AssertSame(db_, column_families_[snap_state.cf_at], snap_state); + if (!s.ok()) { + VerificationAbort(shared, "Snapshot gave inconsistent state", s); + } + db_->ReleaseSnapshot(snap_state.snapshot); + thread->snapshot_queue.pop(); + } + int prob_op = thread->rand.Uniform(100); if (prob_op >= 0 && prob_op < (int)FLAGS_readpercent) { // OPERATION read @@ -1885,9 +1975,35 @@ class StressTest { shared->Put(rand_column_family, rand_key, value_base); Status s; if (FLAGS_use_merge) { - s = db_->Merge(write_opts, column_family, key, v); + if (!FLAGS_use_txn) { + s = db_->Merge(write_opts, column_family, key, v); + } else { +#ifndef ROCKSDB_LITE + Transaction* txn; + s = NewTxn(write_opts, &txn); + if (s.ok()) { + s = txn->Merge(column_family, key, v); + if (s.ok()) { + s = CommitTxn(txn); + } + } +#endif + } } else { - s = db_->Put(write_opts, column_family, key, v); + if (!FLAGS_use_txn) { + s = db_->Put(write_opts, 
column_family, key, v); + } else { +#ifndef ROCKSDB_LITE + Transaction* txn; + s = NewTxn(write_opts, &txn); + if (s.ok()) { + s = txn->Put(column_family, key, v); + if (s.ok()) { + s = CommitTxn(txn); + } + } +#endif + } } if (!s.ok()) { fprintf(stderr, "put or merge error: %s\n", s.ToString().c_str()); @@ -1921,7 +2037,21 @@ class StressTest { // otherwise. if (shared->AllowsOverwrite(rand_column_family, rand_key)) { shared->Delete(rand_column_family, rand_key); - Status s = db_->Delete(write_opts, column_family, key); + Status s; + if (!FLAGS_use_txn) { + s = db_->Delete(write_opts, column_family, key); + } else { +#ifndef ROCKSDB_LITE + Transaction* txn; + s = NewTxn(write_opts, &txn); + if (s.ok()) { + s = txn->Delete(column_family, key); + if (s.ok()) { + s = CommitTxn(txn); + } + } +#endif + } thread->stats.AddDeletes(1); if (!s.ok()) { fprintf(stderr, "delete error: %s\n", s.ToString().c_str()); @@ -1929,7 +2059,21 @@ class StressTest { } } else { shared->SingleDelete(rand_column_family, rand_key); - Status s = db_->SingleDelete(write_opts, column_family, key); + Status s; + if (!FLAGS_use_txn) { + s = db_->SingleDelete(write_opts, column_family, key); + } else { +#ifndef ROCKSDB_LITE + Transaction* txn; + s = NewTxn(write_opts, &txn); + if (s.ok()) { + s = txn->SingleDelete(column_family, key); + if (s.ok()) { + s = CommitTxn(txn); + } + } +#endif + } thread->stats.AddSingleDeletes(1); if (!s.ok()) { fprintf(stderr, "single delete error: %s\n", @@ -2067,6 +2211,12 @@ class StressTest { } } + void VerificationAbort(SharedState* shared, std::string msg, Status s) const { + printf("Verification failed: %s. 
Status is %s\n", msg.c_str(), + s.ToString().c_str()); + shared->SetVerificationFailure(); + } + void VerificationAbort(SharedState* shared, std::string msg, int cf, int64_t key) const { printf("Verification failed for column family %d key %" PRIi64 ": %s\n", cf, key, @@ -2138,6 +2288,8 @@ class StressTest { void PrintEnv() const { fprintf(stdout, "RocksDB version : %d.%d\n", kMajorVersion, kMinorVersion); + fprintf(stdout, "TransactionDB : %s\n", + FLAGS_use_txn ? "true" : "false"); fprintf(stdout, "Column families : %d\n", FLAGS_column_families); if (!FLAGS_test_batches_snapshots) { fprintf(stdout, "Clear CFs one in : %d\n", @@ -2210,6 +2362,9 @@ class StressTest { void Open() { assert(db_ == nullptr); +#ifndef ROCKSDB_LITE + assert(txn_db_ == nullptr); +#endif BlockBasedTableOptions block_based_options; block_based_options.block_cache = cache_; block_based_options.block_cache_compressed = compressed_cache_; @@ -2382,8 +2537,19 @@ class StressTest { options_.listeners.emplace_back( new DbStressListener(FLAGS_db, options_.db_paths)); options_.create_missing_column_families = true; - s = DB::Open(DBOptions(options_), FLAGS_db, cf_descriptors, - &column_families_, &db_); + if (!FLAGS_use_txn) { + s = DB::Open(DBOptions(options_), FLAGS_db, cf_descriptors, + &column_families_, &db_); + } else { +#ifndef ROCKSDB_LITE + TransactionDBOptions txn_db_options; + // For the moment it is sufficient to test WRITE_PREPARED policy + txn_db_options.write_policy = TxnDBWritePolicy::WRITE_PREPARED; + s = TransactionDB::Open(options_, txn_db_options, FLAGS_db, + cf_descriptors, &column_families_, &txn_db_); + db_ = txn_db_; +#endif + } assert(!s.ok() || column_families_.size() == static_cast(FLAGS_column_families)); } else { @@ -2409,6 +2575,9 @@ class StressTest { column_families_.clear(); delete db_; db_ = nullptr; +#ifndef ROCKSDB_LITE + txn_db_ = nullptr; +#endif num_times_reopened_++; auto now = FLAGS_env->NowMicros(); @@ -2429,6 +2598,9 @@ class StressTest { std::shared_ptr 
compressed_cache_; std::shared_ptr filter_policy_; DB* db_; +#ifndef ROCKSDB_LITE + TransactionDB* txn_db_; +#endif Options options_; std::vector column_families_; std::vector column_family_names_;