From 0a0860a5fb85cb3a5565cb86beb78cb5e20ac000 Mon Sep 17 00:00:00 2001 From: Yanqin Jin Date: Thu, 7 Jun 2018 10:34:56 -0700 Subject: [PATCH] Refactoring db_stress.cc (#3902) Summary: We use `db_stress.cc` intensively to test and verify the behavior of RocksDB. Sometimes we need to add new tests for recently added features. The original `StressTest` class provides a lot of general functionality that can be leveraged by other tests. Therefore, in this refactoring PR, I try to identify the general operations as well as operations that future tests most likely want to customize. Future tests can inherit from `StressTest` and override the virtual functions to test custom logic. Closes https://github.com/facebook/rocksdb/pull/3902 Differential Revision: D8284607 Pulled By: riversand963 fbshipit-source-id: 019302d04665a2b18334b6d05d04a477168c8ea4 --- tools/db_stress.cc | 1825 +++++++++++++++++++++++--------------------- 1 file changed, 975 insertions(+), 850 deletions(-) diff --git a/tools/db_stress.cc b/tools/db_stress.cc index ff1abaebf..3866a754c 100644 --- a/tools/db_stress.cc +++ b/tools/db_stress.cc @@ -1306,7 +1306,7 @@ class StressTest { } } - ~StressTest() { + virtual ~StressTest() { for (auto cf : column_families_) { delete cf; } @@ -1503,39 +1503,7 @@ class StressTest { return true; } - private: - Status AssertSame(DB* db, ColumnFamilyHandle* cf, - ThreadState::SnapshotState& snap_state) { - Status s; - if (cf->GetName() != snap_state.cf_at_name) { - return s; - } - ReadOptions ropt; - ropt.snapshot = snap_state.snapshot; - PinnableSlice exp_v(&snap_state.value); - exp_v.PinSelf(); - PinnableSlice v; - s = db->Get(ropt, cf, snap_state.key, &v); - if (!s.ok() && !s.IsNotFound()) { - return s; - } - if (snap_state.status != s) { - return Status::Corruption( - "The snapshot gave inconsistent results for key " + - ToString(Hash(snap_state.key.c_str(), snap_state.key.size(), 0)) + - " in cf " + cf->GetName() + ": (" + snap_state.status.ToString() + - ") vs. 
(" + s.ToString() + ")"); - } - if (s.ok()) { - if (exp_v != v) { - return Status::Corruption("The snapshot gave inconsistent values: (" + - exp_v.ToString() + ") vs. (" + v.ToString() + - ")"); - } - } - return Status::OK(); - } - + protected: static void ThreadBody(void* v) { ThreadState* thread = reinterpret_cast(v); SharedState* shared = thread->shared; @@ -1566,9 +1534,7 @@ class StressTest { } } - if (!FLAGS_test_batches_snapshots) { - thread->shared->GetStressTest()->VerifyDb(thread); - } + thread->shared->GetStressTest()->VerifyDb(thread); { MutexLock l(shared->GetMutex()); @@ -1577,7 +1543,6 @@ class StressTest { shared->GetCondVar()->SignalAll(); } } - } static void PoolSizeChangeThread(void* v) { @@ -1612,298 +1577,138 @@ class StressTest { } } - // Given a key K and value V, this puts ("0"+K, "0"+V), ("1"+K, "1"+V), ... - // ("9"+K, "9"+V) in DB atomically i.e in a single batch. - // Also refer MultiGet. - Status MultiPut(ThreadState* thread, const WriteOptions& writeoptions, - ColumnFamilyHandle* column_family, const Slice& key, - const Slice& value, size_t sz) { - std::string keys[10] = {"9", "8", "7", "6", "5", - "4", "3", "2", "1", "0"}; - std::string values[10] = {"9", "8", "7", "6", "5", - "4", "3", "2", "1", "0"}; - Slice value_slices[10]; - WriteBatch batch; - Status s; - for (int i = 0; i < 10; i++) { - keys[i] += key.ToString(); - values[i] += value.ToString(); - value_slices[i] = values[i]; - if (FLAGS_use_merge) { - batch.Merge(column_family, keys[i], value_slices[i]); - } else { - batch.Put(column_family, keys[i], value_slices[i]); - } + static void PrintKeyValue(int cf, uint64_t key, const char* value, + size_t sz) { + if (!FLAGS_verbose) { + return; } - - s = db_->Write(writeoptions, &batch); - if (!s.ok()) { - fprintf(stderr, "multiput error: %s\n", s.ToString().c_str()); - thread->stats.AddErrors(1); - } else { - // we did 10 writes each of size sz + 1 - thread->stats.AddBytesForWrites(10, (sz + 1) * 10); + fprintf(stdout, "[CF %d] %" 
PRIi64 " == > (%" ROCKSDB_PRIszt ") ", cf, key, sz); + for (size_t i = 0; i < sz; i++) { + fprintf(stdout, "%X", value[i]); } + fprintf(stdout, "\n"); + } - return s; + static int64_t GenerateOneKey(ThreadState* thread, uint64_t iteration) { + const double completed_ratio = + static_cast(iteration) / FLAGS_ops_per_thread; + const int64_t base_key = static_cast( + completed_ratio * (FLAGS_max_key - FLAGS_active_width)); + return base_key + thread->rand.Next() % FLAGS_active_width; } - // Given a key K, this deletes ("0"+K), ("1"+K),... ("9"+K) - // in DB atomically i.e in a single batch. Also refer MultiGet. - Status MultiDelete(ThreadState* thread, const WriteOptions& writeoptions, - ColumnFamilyHandle* column_family, const Slice& key) { - std::string keys[10] = {"9", "7", "5", "3", "1", - "8", "6", "4", "2", "0"}; + static size_t GenerateValue(uint32_t rand, char *v, size_t max_sz) { + size_t value_sz = + ((rand % kRandomValueMaxFactor) + 1) * FLAGS_value_size_mult; + assert(value_sz <= max_sz && value_sz >= sizeof(uint32_t)); + (void) max_sz; + *((uint32_t*)v) = rand; + for (size_t i=sizeof(uint32_t); i < value_sz; i++) { + v[i] = (char)(rand ^ i); + } + v[value_sz] = '\0'; + return value_sz; // the size of the value set. 
+ } - WriteBatch batch; + Status AssertSame(DB* db, ColumnFamilyHandle* cf, + ThreadState::SnapshotState& snap_state) { Status s; - for (int i = 0; i < 10; i++) { - keys[i] += key.ToString(); - batch.Delete(column_family, keys[i]); + if (cf->GetName() != snap_state.cf_at_name) { + return s; + } + ReadOptions ropt; + ropt.snapshot = snap_state.snapshot; + PinnableSlice exp_v(&snap_state.value); + exp_v.PinSelf(); + PinnableSlice v; + s = db->Get(ropt, cf, snap_state.key, &v); + if (!s.ok() && !s.IsNotFound()) { + return s; + } + if (snap_state.status != s) { + return Status::Corruption( + "The snapshot gave inconsistent results for key " + + ToString(Hash(snap_state.key.c_str(), snap_state.key.size(), 0)) + + " in cf " + cf->GetName() + ": (" + snap_state.status.ToString() + + ") vs. (" + s.ToString() + ")"); + } + if (s.ok()) { + if (exp_v != v) { + return Status::Corruption("The snapshot gave inconsistent values: (" + + exp_v.ToString() + ") vs. (" + v.ToString() + + ")"); + } } + return Status::OK(); + } - s = db_->Write(writeoptions, &batch); - if (!s.ok()) { - fprintf(stderr, "multidelete error: %s\n", s.ToString().c_str()); - thread->stats.AddErrors(1); + Status SetOptions(ThreadState* thread) { + assert(FLAGS_set_options_one_in > 0); + std::unordered_map opts; + std::string name = options_index_[ + thread->rand.Next() % options_index_.size()]; + int value_idx = thread->rand.Next() % options_table_[name].size(); + if (name == "soft_rate_limit" || name == "hard_rate_limit") { + opts["soft_rate_limit"] = options_table_["soft_rate_limit"][value_idx]; + opts["hard_rate_limit"] = options_table_["hard_rate_limit"][value_idx]; + } else if (name == "level0_file_num_compaction_trigger" || + name == "level0_slowdown_writes_trigger" || + name == "level0_stop_writes_trigger") { + opts["level0_file_num_compaction_trigger"] = + options_table_["level0_file_num_compaction_trigger"][value_idx]; + opts["level0_slowdown_writes_trigger"] = + 
options_table_["level0_slowdown_writes_trigger"][value_idx]; + opts["level0_stop_writes_trigger"] = + options_table_["level0_stop_writes_trigger"][value_idx]; } else { - thread->stats.AddDeletes(10); + opts[name] = options_table_[name][value_idx]; } - return s; + int rand_cf_idx = thread->rand.Next() % FLAGS_column_families; + auto cfh = column_families_[rand_cf_idx]; + return db_->SetOptions(cfh, opts); } - // Given a key K, this gets values for "0"+K, "1"+K,..."9"+K - // in the same snapshot, and verifies that all the values are of the form - // "0"+V, "1"+V,..."9"+V. - // ASSUMES that MultiPut was used to put (K, V) into the DB. - Status MultiGet(ThreadState* thread, const ReadOptions& readoptions, - ColumnFamilyHandle* column_family, const Slice& key, - std::string* value) { - std::string keys[10] = {"0", "1", "2", "3", "4", "5", "6", "7", "8", "9"}; - Slice key_slices[10]; - std::string values[10]; - ReadOptions readoptionscopy = readoptions; - readoptionscopy.snapshot = db_->GetSnapshot(); - Status s; - for (int i = 0; i < 10; i++) { - keys[i] += key.ToString(); - key_slices[i] = keys[i]; - s = db_->Get(readoptionscopy, column_family, key_slices[i], value); - if (!s.ok() && !s.IsNotFound()) { - fprintf(stderr, "get error: %s\n", s.ToString().c_str()); - values[i] = ""; - thread->stats.AddErrors(1); - // we continue after error rather than exiting so that we can - // find more errors if any - } else if (s.IsNotFound()) { - values[i] = ""; - thread->stats.AddGets(1, 0); - } else { - values[i] = *value; - - char expected_prefix = (keys[i])[0]; - char actual_prefix = (values[i])[0]; - if (actual_prefix != expected_prefix) { - fprintf(stderr, "error expected prefix = %c actual = %c\n", - expected_prefix, actual_prefix); - } - (values[i])[0] = ' '; // blank out the differing character - thread->stats.AddGets(1, 1); - } +#ifndef ROCKSDB_LITE + Status NewTxn(WriteOptions& write_opts, Transaction** txn) { + if (!FLAGS_use_txn) { + return 
Status::InvalidArgument("NewTxn when FLAGS_use_txn is not set"); } - db_->ReleaseSnapshot(readoptionscopy.snapshot); + static std::atomic txn_id = {0}; + TransactionOptions txn_options; + *txn = txn_db_->BeginTransaction(write_opts, txn_options); + auto istr = std::to_string(txn_id.fetch_add(1)); + Status s = (*txn)->SetName("xid" + istr); + return s; + } - // Now that we retrieved all values, check that they all match - for (int i = 1; i < 10; i++) { - if (values[i] != values[0]) { - fprintf(stderr, "error : inconsistent values for key %s: %s, %s\n", - key.ToString(true).c_str(), StringToHex(values[0]).c_str(), - StringToHex(values[i]).c_str()); - // we continue after error rather than exiting so that we can - // find more errors if any - } + Status CommitTxn(Transaction* txn) { + if (!FLAGS_use_txn) { + return Status::InvalidArgument("CommitTxn when FLAGS_use_txn is not set"); } - + Status s = txn->Prepare(); + if (s.ok()) { + s = txn->Commit(); + } + delete txn; return s; } +#endif - // Given a key, this does prefix scans for "0"+P, "1"+P,..."9"+P - // in the same snapshot where P is the first FLAGS_prefix_size - 1 bytes - // of the key. Each of these 10 scans returns a series of values; - // each series should be the same length, and it is verified for each - // index i that all the i'th values are of the form "0"+V, "1"+V,..."9"+V. 
- // ASSUMES that MultiPut was used to put (K, V) - Status MultiPrefixScan(ThreadState* thread, const ReadOptions& readoptions, - ColumnFamilyHandle* column_family, - const Slice& key) { - std::string prefixes[10] = {"0", "1", "2", "3", "4", - "5", "6", "7", "8", "9"}; - Slice prefix_slices[10]; - ReadOptions readoptionscopy[10]; - const Snapshot* snapshot = db_->GetSnapshot(); - Iterator* iters[10]; - Status s = Status::OK(); - for (int i = 0; i < 10; i++) { - prefixes[i] += key.ToString(); - prefixes[i].resize(FLAGS_prefix_size); - prefix_slices[i] = Slice(prefixes[i]); - readoptionscopy[i] = readoptions; - readoptionscopy[i].snapshot = snapshot; - iters[i] = db_->NewIterator(readoptionscopy[i], column_family); - iters[i]->Seek(prefix_slices[i]); + virtual void OperateDb(ThreadState* thread) { + ReadOptions read_opts(FLAGS_verify_checksum, true); + WriteOptions write_opts; + auto shared = thread->shared; + char value[100]; + std::string from_db; + if (FLAGS_sync) { + write_opts.sync = true; } - - int count = 0; - while (iters[0]->Valid() && iters[0]->key().starts_with(prefix_slices[0])) { - count++; - std::string values[10]; - // get list of all values for this iteration - for (int i = 0; i < 10; i++) { - // no iterator should finish before the first one - assert(iters[i]->Valid() && - iters[i]->key().starts_with(prefix_slices[i])); - values[i] = iters[i]->value().ToString(); - - char expected_first = (prefixes[i])[0]; - char actual_first = (values[i])[0]; - - if (actual_first != expected_first) { - fprintf(stderr, "error expected first = %c actual = %c\n", - expected_first, actual_first); - } - (values[i])[0] = ' '; // blank out the differing character - } - // make sure all values are equivalent - for (int i = 0; i < 10; i++) { - if (values[i] != values[0]) { - fprintf(stderr, "error : %d, inconsistent values for prefix %s: %s, %s\n", - i, prefixes[i].c_str(), StringToHex(values[0]).c_str(), - StringToHex(values[i]).c_str()); - // we continue after error rather 
than exiting so that we can - // find more errors if any - } - iters[i]->Next(); - } - } - - // cleanup iterators and snapshot - for (int i = 0; i < 10; i++) { - // if the first iterator finished, they should have all finished - assert(!iters[i]->Valid() || - !iters[i]->key().starts_with(prefix_slices[i])); - assert(iters[i]->status().ok()); - delete iters[i]; - } - db_->ReleaseSnapshot(snapshot); - - if (s.ok()) { - thread->stats.AddPrefixes(1, count); - } else { - thread->stats.AddErrors(1); - } - - return s; - } - - // Given a key K, this creates an iterator which scans to K and then - // does a random sequence of Next/Prev operations. - Status MultiIterate(ThreadState* thread, const ReadOptions& readoptions, - ColumnFamilyHandle* column_family, const Slice& key) { - Status s; - const Snapshot* snapshot = db_->GetSnapshot(); - ReadOptions readoptionscopy = readoptions; - readoptionscopy.snapshot = snapshot; - unique_ptr iter(db_->NewIterator(readoptionscopy, column_family)); - - iter->Seek(key); - for (uint64_t i = 0; i < FLAGS_num_iterations && iter->Valid(); i++) { - if (thread->rand.OneIn(2)) { - iter->Next(); - } else { - iter->Prev(); - } - } - - if (s.ok()) { - thread->stats.AddIterations(1); - } else { - thread->stats.AddErrors(1); - } - - db_->ReleaseSnapshot(snapshot); - - return s; - } - - Status SetOptions(ThreadState* thread) { - assert(FLAGS_set_options_one_in > 0); - std::unordered_map opts; - std::string name = options_index_[ - thread->rand.Next() % options_index_.size()]; - int value_idx = thread->rand.Next() % options_table_[name].size(); - if (name == "soft_rate_limit" || name == "hard_rate_limit") { - opts["soft_rate_limit"] = options_table_["soft_rate_limit"][value_idx]; - opts["hard_rate_limit"] = options_table_["hard_rate_limit"][value_idx]; - } else if (name == "level0_file_num_compaction_trigger" || - name == "level0_slowdown_writes_trigger" || - name == "level0_stop_writes_trigger") { - opts["level0_file_num_compaction_trigger"] = - 
options_table_["level0_file_num_compaction_trigger"][value_idx]; - opts["level0_slowdown_writes_trigger"] = - options_table_["level0_slowdown_writes_trigger"][value_idx]; - opts["level0_stop_writes_trigger"] = - options_table_["level0_stop_writes_trigger"][value_idx]; - } else { - opts[name] = options_table_[name][value_idx]; - } - - int rand_cf_idx = thread->rand.Next() % FLAGS_column_families; - auto cfh = column_families_[rand_cf_idx]; - return db_->SetOptions(cfh, opts); - } - -#ifndef ROCKSDB_LITE - Status NewTxn(WriteOptions& write_opts, Transaction** txn) { - if (!FLAGS_use_txn) { - return Status::InvalidArgument("NewTxn when FLAGS_use_txn is not set"); - } - static std::atomic txn_id = {0}; - TransactionOptions txn_options; - *txn = txn_db_->BeginTransaction(write_opts, txn_options); - auto istr = std::to_string(txn_id.fetch_add(1)); - Status s = (*txn)->SetName("xid" + istr); - return s; - } - - Status CommitTxn(Transaction* txn) { - if (!FLAGS_use_txn) { - return Status::InvalidArgument("CommitTxn when FLAGS_use_txn is not set"); - } - Status s = txn->Prepare(); - if (s.ok()) { - s = txn->Commit(); - } - delete txn; - return s; - } -#endif - - void OperateDb(ThreadState* thread) { - ReadOptions read_opts(FLAGS_verify_checksum, true); - WriteOptions write_opts; - auto shared = thread->shared; - char value[100]; - auto max_key = thread->shared->GetMaxKey(); - std::string from_db; - if (FLAGS_sync) { - write_opts.sync = true; - } - write_opts.disableWAL = FLAGS_disable_wal; - const int prefixBound = (int)FLAGS_readpercent + (int)FLAGS_prefixpercent; - const int writeBound = prefixBound + (int)FLAGS_writepercent; - const int delBound = writeBound + (int)FLAGS_delpercent; - const int delRangeBound = delBound + (int)FLAGS_delrangepercent; + write_opts.disableWAL = FLAGS_disable_wal; + const int prefixBound = (int)FLAGS_readpercent + (int)FLAGS_prefixpercent; + const int writeBound = prefixBound + (int)FLAGS_writepercent; + const int delBound = writeBound + 
(int)FLAGS_delpercent; + const int delRangeBound = delBound + (int)FLAGS_delrangepercent; thread->stats.Start(); for (uint64_t i = 0; i < FLAGS_ops_per_thread; i++) { @@ -1943,41 +1748,7 @@ class StressTest { options_.inplace_update_support ^= options_.inplace_update_support; } - if (!FLAGS_test_batches_snapshots && - FLAGS_clear_column_family_one_in != 0 && FLAGS_column_families > 1) { - if (thread->rand.OneIn(FLAGS_clear_column_family_one_in)) { - // drop column family and then create it again (can't drop default) - int cf = thread->rand.Next() % (FLAGS_column_families - 1) + 1; - std::string new_name = - ToString(new_column_family_name_.fetch_add(1)); - { - MutexLock l(thread->shared->GetMutex()); - fprintf( - stdout, - "[CF %d] Dropping and recreating column family. new name: %s\n", - cf, new_name.c_str()); - } - thread->shared->LockColumnFamily(cf); - Status s __attribute__((__unused__)); - s = db_->DropColumnFamily(column_families_[cf]); - delete column_families_[cf]; - if (!s.ok()) { - fprintf(stderr, "dropping column family error: %s\n", - s.ToString().c_str()); - std::terminate(); - } - s = db_->CreateColumnFamily(ColumnFamilyOptions(options_), new_name, - &column_families_[cf]); - column_family_names_[cf] = new_name; - thread->shared->ClearColumnFamily(cf); - if (!s.ok()) { - fprintf(stderr, "creating column family error: %s\n", - s.ToString().c_str()); - std::terminate(); - } - thread->shared->UnlockColumnFamily(cf); - } - } + MaybeClearOneColumnFamily(thread); #ifndef ROCKSDB_LITE // Lite does not support GetColumnFamilyMetaData if (FLAGS_compact_files_one_in > 0 && @@ -2030,17 +1801,13 @@ class StressTest { } } #endif // !ROCKSDB_LITE - const double completed_ratio = - static_cast(i) / FLAGS_ops_per_thread; - const int64_t base_key = static_cast( - completed_ratio * (FLAGS_max_key - FLAGS_active_width)); - int64_t rand_key = base_key + thread->rand.Next() % FLAGS_active_width; + int64_t rand_key = GenerateOneKey(thread, i); int rand_column_family = 
thread->rand.Next() % FLAGS_column_families; std::string keystr = Key(rand_key); Slice key = keystr; - std::unique_ptr l; - if (!FLAGS_test_batches_snapshots) { - l.reset(new MutexLock( + std::unique_ptr lock; + if (ShouldAcquireMutexOnKey()) { + lock.reset(new MutexLock( shared->GetMutexForKey(rand_column_family, rand_key))); } @@ -2064,7 +1831,7 @@ class StressTest { snap_state); } while (!thread->snapshot_queue.empty() && - i == thread->snapshot_queue.front().first) { + i == thread->snapshot_queue.front().first) { auto snap_state = thread->snapshot_queue.front().second; assert(snap_state.snapshot); // Note: this is unsafe as the cf might be dropped concurrently. But it @@ -2082,245 +1849,28 @@ class StressTest { int prob_op = thread->rand.Uniform(100); if (prob_op >= 0 && prob_op < (int)FLAGS_readpercent) { // OPERATION read - if (!FLAGS_test_batches_snapshots) { - Status s = db_->Get(read_opts, column_family, key, &from_db); - if (s.ok()) { - // found case - thread->stats.AddGets(1, 1); - } else if (s.IsNotFound()) { - // not found case - thread->stats.AddGets(1, 0); - } else { - // errors case - thread->stats.AddErrors(1); - } - } else { - MultiGet(thread, read_opts, column_family, key, &from_db); - } + TestGet(thread, read_opts, {rand_column_family}, {rand_key}); } else if ((int)FLAGS_readpercent <= prob_op && prob_op < prefixBound) { // OPERATION prefix scan // keys are 8 bytes long, prefix size is FLAGS_prefix_size. There are // (8 - FLAGS_prefix_size) bytes besides the prefix. 
So there will // be 2 ^ ((8 - FLAGS_prefix_size) * 8) possible keys with the same // prefix - if (!FLAGS_test_batches_snapshots) { - Slice prefix = Slice(key.data(), FLAGS_prefix_size); - Iterator* iter = db_->NewIterator(read_opts, column_family); - int64_t count = 0; - for (iter->Seek(prefix); - iter->Valid() && iter->key().starts_with(prefix); iter->Next()) { - ++count; - } - assert(count <= - (static_cast(1) << ((8 - FLAGS_prefix_size) * 8))); - if (iter->status().ok()) { - thread->stats.AddPrefixes(1, static_cast(count)); - } else { - thread->stats.AddErrors(1); - } - delete iter; - } else { - MultiPrefixScan(thread, read_opts, column_family, key); - } + TestPrefixScan(thread, read_opts, {rand_column_family}, {rand_key}); } else if (prefixBound <= prob_op && prob_op < writeBound) { // OPERATION write - uint32_t value_base = thread->rand.Next() % shared->UNKNOWN_SENTINEL; - size_t sz = GenerateValue(value_base, value, sizeof(value)); - Slice v(value, sz); - if (!FLAGS_test_batches_snapshots) { - // If the chosen key does not allow overwrite and it already - // exists, choose another key. Also avoid using merge operands in - // no-overwrite positions (where single delete will be called later), - // as those features have undefined behavior when used together. 
- while (!shared->AllowsOverwrite(rand_column_family, rand_key) && - (FLAGS_use_merge || - shared->Exists(rand_column_family, rand_key))) { - l.reset(); - rand_key = thread->rand.Next() % max_key; - rand_column_family = thread->rand.Next() % FLAGS_column_families; - l.reset(new MutexLock( - shared->GetMutexForKey(rand_column_family, rand_key))); - } - - keystr = Key(rand_key); - key = keystr; - column_family = column_families_[rand_column_family]; - - if (FLAGS_verify_before_write) { - std::string keystr2 = Key(rand_key); - Slice k = keystr2; - Status s = db_->Get(read_opts, column_family, k, &from_db); - if (!VerifyValue(rand_column_family, rand_key, read_opts, - thread->shared, from_db, s, true)) { - break; - } - } - shared->Put(rand_column_family, rand_key, value_base, - true /* pending */); - Status s; - if (FLAGS_use_merge) { - if (!FLAGS_use_txn) { - s = db_->Merge(write_opts, column_family, key, v); - } else { -#ifndef ROCKSDB_LITE - Transaction* txn; - s = NewTxn(write_opts, &txn); - if (s.ok()) { - s = txn->Merge(column_family, key, v); - if (s.ok()) { - s = CommitTxn(txn); - } - } -#endif - } - } else { - if (!FLAGS_use_txn) { - s = db_->Put(write_opts, column_family, key, v); - } else { -#ifndef ROCKSDB_LITE - Transaction* txn; - s = NewTxn(write_opts, &txn); - if (s.ok()) { - s = txn->Put(column_family, key, v); - if (s.ok()) { - s = CommitTxn(txn); - } - } -#endif - } - } - shared->Put(rand_column_family, rand_key, value_base, - false /* pending */); - if (!s.ok()) { - fprintf(stderr, "put or merge error: %s\n", s.ToString().c_str()); - std::terminate(); - } - thread->stats.AddBytesForWrites(1, sz); - } else { - MultiPut(thread, write_opts, column_family, key, v, sz); - } - PrintKeyValue(rand_column_family, static_cast(rand_key), - value, sz); + TestPut(thread, write_opts, read_opts, {rand_column_family}, {rand_key}, + value, lock); } else if (writeBound <= prob_op && prob_op < delBound) { // OPERATION delete - if (!FLAGS_test_batches_snapshots) { - // 
If the chosen key does not allow overwrite and it does not exist, - // choose another key. - while (!shared->AllowsOverwrite(rand_column_family, rand_key) && - !shared->Exists(rand_column_family, rand_key)) { - l.reset(); - rand_key = thread->rand.Next() % max_key; - rand_column_family = thread->rand.Next() % FLAGS_column_families; - l.reset(new MutexLock( - shared->GetMutexForKey(rand_column_family, rand_key))); - } - - keystr = Key(rand_key); - key = keystr; - column_family = column_families_[rand_column_family]; - - // Use delete if the key may be overwritten and a single deletion - // otherwise. - if (shared->AllowsOverwrite(rand_column_family, rand_key)) { - shared->Delete(rand_column_family, rand_key, true /* pending */); - Status s; - if (!FLAGS_use_txn) { - s = db_->Delete(write_opts, column_family, key); - } else { -#ifndef ROCKSDB_LITE - Transaction* txn; - s = NewTxn(write_opts, &txn); - if (s.ok()) { - s = txn->Delete(column_family, key); - if (s.ok()) { - s = CommitTxn(txn); - } - } -#endif - } - shared->Delete(rand_column_family, rand_key, false /* pending */); - thread->stats.AddDeletes(1); - if (!s.ok()) { - fprintf(stderr, "delete error: %s\n", s.ToString().c_str()); - std::terminate(); - } - } else { - shared->SingleDelete(rand_column_family, rand_key, - true /* pending */); - Status s; - if (!FLAGS_use_txn) { - s = db_->SingleDelete(write_opts, column_family, key); - } else { -#ifndef ROCKSDB_LITE - Transaction* txn; - s = NewTxn(write_opts, &txn); - if (s.ok()) { - s = txn->SingleDelete(column_family, key); - if (s.ok()) { - s = CommitTxn(txn); - } - } -#endif - } - shared->SingleDelete(rand_column_family, rand_key, - false /* pending */); - thread->stats.AddSingleDeletes(1); - if (!s.ok()) { - fprintf(stderr, "single delete error: %s\n", - s.ToString().c_str()); - std::terminate(); - } - } - } else { - MultiDelete(thread, write_opts, column_family, key); - } + TestDelete(thread, write_opts, {rand_column_family}, {rand_key}, lock); } else if 
(delBound <= prob_op && prob_op < delRangeBound) { // OPERATION delete range - if (!FLAGS_test_batches_snapshots) { - std::vector> range_locks; - // delete range does not respect disallowed overwrites. the keys for - // which overwrites are disallowed are randomly distributed so it - // could be expensive to find a range where each key allows - // overwrites. - if (rand_key > max_key - FLAGS_range_deletion_width) { - l.reset(); - rand_key = thread->rand.Next() % - (max_key - FLAGS_range_deletion_width + 1); - range_locks.emplace_back(new MutexLock( - shared->GetMutexForKey(rand_column_family, rand_key))); - } else { - range_locks.emplace_back(std::move(l)); - } - for (int j = 1; j < FLAGS_range_deletion_width; ++j) { - if (((rand_key + j) & ((1 << FLAGS_log2_keys_per_lock) - 1)) == 0) { - range_locks.emplace_back(new MutexLock( - shared->GetMutexForKey(rand_column_family, rand_key + j))); - } - } - shared->DeleteRange(rand_column_family, rand_key, - rand_key + FLAGS_range_deletion_width, - true /* pending */); - - keystr = Key(rand_key); - key = keystr; - column_family = column_families_[rand_column_family]; - std::string end_keystr = Key(rand_key + FLAGS_range_deletion_width); - Slice end_key = end_keystr; - Status s = db_->DeleteRange(write_opts, column_family, key, end_key); - if (!s.ok()) { - fprintf(stderr, "delete range error: %s\n", - s.ToString().c_str()); - std::terminate(); - } - int covered = shared->DeleteRange( - rand_column_family, rand_key, - rand_key + FLAGS_range_deletion_width, false /* pending */); - thread->stats.AddRangeDeletions(1); - thread->stats.AddCoveredByRangeDeletions(covered); - } + TestDeleteRange(thread, write_opts, {rand_column_family}, {rand_key}, + lock); } else { // OPERATION iterate - MultiIterate(thread, read_opts, column_family, key); + TestIterate(thread, read_opts, {rand_column_family}, {rand_key}); } thread->stats.FinishedSingleOp(); } @@ -2328,80 +1878,71 @@ class StressTest { thread->stats.Stop(); } - void 
VerifyDb(ThreadState* thread) const { - ReadOptions options(FLAGS_verify_checksum, true); - auto shared = thread->shared; - const int64_t max_key = shared->GetMaxKey(); - const int64_t keys_per_thread = max_key / shared->GetNumThreads(); - int64_t start = keys_per_thread * thread->tid; - int64_t end = start + keys_per_thread; - if (thread->tid == shared->GetNumThreads() - 1) { - end = max_key; - } - for (size_t cf = 0; cf < column_families_.size(); ++cf) { - if (thread->shared->HasVerificationFailedYet()) { - break; - } - if (!thread->rand.OneIn(2)) { - // Use iterator to verify this range - unique_ptr iter( - db_->NewIterator(options, column_families_[cf])); - iter->Seek(Key(start)); - for (auto i = start; i < end; i++) { - if (thread->shared->HasVerificationFailedYet()) { - break; - } - // TODO(ljin): update "long" to uint64_t - // Reseek when the prefix changes - if (i % (static_cast(1) << 8 * (8 - FLAGS_prefix_size)) == - 0) { - iter->Seek(Key(i)); - } - std::string from_db; - std::string keystr = Key(i); - Slice k = keystr; - Status s = iter->status(); - if (iter->Valid()) { - if (iter->key().compare(k) > 0) { - s = Status::NotFound(Slice()); - } else if (iter->key().compare(k) == 0) { - from_db = iter->value().ToString(); - iter->Next(); - } else if (iter->key().compare(k) < 0) { - VerificationAbort(shared, "An out of range key was found", - static_cast(cf), i); - } - } else { - // The iterator found no value for the key in question, so do not - // move to the next item in the iterator - s = Status::NotFound(Slice()); - } - VerifyValue(static_cast(cf), i, options, shared, from_db, s, - true); - if (from_db.length()) { - PrintKeyValue(static_cast(cf), static_cast(i), - from_db.data(), from_db.length()); - } - } - } else { - // Use Get to verify this range - for (auto i = start; i < end; i++) { - if (thread->shared->HasVerificationFailedYet()) { - break; - } - std::string from_db; - std::string keystr = Key(i); - Slice k = keystr; - Status s = db_->Get(options, 
column_families_[cf], k, &from_db); - VerifyValue(static_cast(cf), i, options, shared, from_db, s, - true); - if (from_db.length()) { - PrintKeyValue(static_cast(cf), static_cast(i), - from_db.data(), from_db.length()); - } - } + virtual void VerifyDb(ThreadState* thread) const = 0; + + virtual void MaybeClearOneColumnFamily(ThreadState* /* thread */) {} + + virtual bool ShouldAcquireMutexOnKey() const { return false; } + + virtual Status TestGet(ThreadState* thread, + const ReadOptions& read_opts, + const std::vector& rand_column_families, + const std::vector& rand_keys) = 0; + + virtual Status TestPrefixScan(ThreadState* thread, + const ReadOptions& read_opts, + const std::vector& rand_column_families, + const std::vector& rand_keys) = 0; + + virtual Status TestPut(ThreadState* thread, + WriteOptions& write_opts, const ReadOptions& read_opts, + const std::vector& cf_ids, const std::vector& keys, + char (&value)[100], std::unique_ptr& lock) = 0; + + virtual Status TestDelete(ThreadState* thread, WriteOptions& write_opts, + const std::vector& rand_column_families, + const std::vector& rand_keys, + std::unique_ptr& lock) = 0; + + virtual Status TestDeleteRange(ThreadState* thread, + WriteOptions& write_opts, + const std::vector& rand_column_families, + const std::vector& rand_keys, + std::unique_ptr& lock) = 0; + + // Given a key K, this creates an iterator which scans to K and then + // does a random sequence of Next/Prev operations. 
+ virtual Status TestIterate(ThreadState* thread, + const ReadOptions& read_opts, + const std::vector& rand_column_families, + const std::vector& rand_keys) { + Status s; + const Snapshot* snapshot = db_->GetSnapshot(); + ReadOptions readoptionscopy = read_opts; + readoptionscopy.snapshot = snapshot; + auto cfh = column_families_[rand_column_families[0]]; + std::unique_ptr iter(db_->NewIterator(readoptionscopy, cfh)); + + std::string key_str = Key(rand_keys[0]); + Slice key = key_str; + iter->Seek(key); + for (uint64_t i = 0; i < FLAGS_num_iterations && iter->Valid(); i++) { + if (thread->rand.OneIn(2)) { + iter->Next(); + } else { + iter->Prev(); } } + + if (s.ok()) { + thread->stats.AddIterations(1); + } else { + thread->stats.AddErrors(1); + } + + db_->ReleaseSnapshot(snapshot); + + return s; } void VerificationAbort(SharedState* shared, std::string msg, Status s) const { @@ -2417,71 +1958,6 @@ class StressTest { shared->SetVerificationFailure(); } - bool VerifyValue(int cf, int64_t key, const ReadOptions& /*opts*/, - SharedState* shared, const std::string& value_from_db, - Status s, bool strict = false) const { - if (shared->HasVerificationFailedYet()) { - return false; - } - // compare value_from_db with the value in the shared state - char value[kValueMaxLen]; - uint32_t value_base = shared->Get(cf, key); - if (value_base == SharedState::UNKNOWN_SENTINEL) { - return true; - } - if (value_base == SharedState::DELETION_SENTINEL && !strict) { - return true; - } - - if (s.ok()) { - if (value_base == SharedState::DELETION_SENTINEL) { - VerificationAbort(shared, "Unexpected value found", cf, key); - return false; - } - size_t sz = GenerateValue(value_base, value, sizeof(value)); - if (value_from_db.length() != sz) { - VerificationAbort(shared, "Length of value read is not equal", cf, key); - return false; - } - if (memcmp(value_from_db.data(), value, sz) != 0) { - VerificationAbort(shared, "Contents of value read don't match", cf, - key); - return false; - } - } 
else { - if (value_base != SharedState::DELETION_SENTINEL) { - VerificationAbort(shared, "Value not found: " + s.ToString(), cf, key); - return false; - } - } - return true; - } - - static void PrintKeyValue(int cf, int64_t key, const char* value, - size_t sz) { - if (!FLAGS_verbose) { - return; - } - fprintf(stdout, "[CF %d] %" PRIi64 " == > (%" ROCKSDB_PRIszt ") ", cf, key, sz); - for (size_t i = 0; i < sz; i++) { - fprintf(stdout, "%X", value[i]); - } - fprintf(stdout, "\n"); - } - - static size_t GenerateValue(uint32_t rand, char *v, size_t max_sz) { - size_t value_sz = - ((rand % kRandomValueMaxFactor) + 1) * FLAGS_value_size_mult; - assert(value_sz <= max_sz && value_sz >= sizeof(uint32_t)); - (void) max_sz; - *((uint32_t*)v) = rand; - for (size_t i=sizeof(uint32_t); i < value_sz; i++) { - v[i] = (char)(rand ^ i); - } - v[value_sz] = '\0'; - return value_sz; // the size of the value set. - } - void PrintEnv() const { fprintf(stdout, "RocksDB version : %d.%d\n", kMajorVersion, kMinorVersion); @@ -2684,156 +2160,800 @@ class StressTest { #endif // ROCKSDB_LITE } - if (FLAGS_use_full_merge_v1) { - options_.merge_operator = MergeOperators::CreateDeprecatedPutOperator(); + if (FLAGS_use_full_merge_v1) { + options_.merge_operator = MergeOperators::CreateDeprecatedPutOperator(); + } else { + options_.merge_operator = MergeOperators::CreatePutOperator(); + } + + fprintf(stdout, "DB path: [%s]\n", FLAGS_db.c_str()); + + Status s; + if (FLAGS_ttl == -1) { + std::vector existing_column_families; + s = DB::ListColumnFamilies(DBOptions(options_), FLAGS_db, + &existing_column_families); // ignore errors + if (!s.ok()) { + // DB doesn't exist + assert(existing_column_families.empty()); + assert(column_family_names_.empty()); + column_family_names_.push_back(kDefaultColumnFamilyName); + } else if (column_family_names_.empty()) { + // this is the first call to the function Open() + column_family_names_ = existing_column_families; + } else { + // this is a reopen. 
just assert that existing column_family_names are + // equivalent to what we remember + auto sorted_cfn = column_family_names_; + std::sort(sorted_cfn.begin(), sorted_cfn.end()); + std::sort(existing_column_families.begin(), + existing_column_families.end()); + if (sorted_cfn != existing_column_families) { + fprintf(stderr, + "Expected column families differ from the existing:\n"); + printf("Expected: {"); + for (auto cf : sorted_cfn) { + printf("%s ", cf.c_str()); + } + printf("}\n"); + printf("Existing: {"); + for (auto cf : existing_column_families) { + printf("%s ", cf.c_str()); + } + printf("}\n"); + } + assert(sorted_cfn == existing_column_families); + } + std::vector cf_descriptors; + for (auto name : column_family_names_) { + if (name != kDefaultColumnFamilyName) { + new_column_family_name_ = + std::max(new_column_family_name_.load(), std::stoi(name) + 1); + } + cf_descriptors.emplace_back(name, ColumnFamilyOptions(options_)); + } + while (cf_descriptors.size() < (size_t)FLAGS_column_families) { + std::string name = ToString(new_column_family_name_.load()); + new_column_family_name_++; + cf_descriptors.emplace_back(name, ColumnFamilyOptions(options_)); + column_family_names_.push_back(name); + } + options_.listeners.clear(); + options_.listeners.emplace_back( + new DbStressListener(FLAGS_db, options_.db_paths, cf_descriptors)); + options_.create_missing_column_families = true; + if (!FLAGS_use_txn) { + s = DB::Open(DBOptions(options_), FLAGS_db, cf_descriptors, + &column_families_, &db_); + } else { +#ifndef ROCKSDB_LITE + TransactionDBOptions txn_db_options; + // For the moment it is sufficient to test WRITE_PREPARED policy + txn_db_options.write_policy = TxnDBWritePolicy::WRITE_PREPARED; + s = TransactionDB::Open(options_, txn_db_options, FLAGS_db, + cf_descriptors, &column_families_, &txn_db_); + db_ = txn_db_; + // after a crash, rollback to commit recovered transactions + std::vector trans; + txn_db_->GetAllPreparedTransactions(&trans); + Random 
rand(static_cast(FLAGS_seed)); + for (auto txn : trans) { + if (rand.OneIn(2)) { + s = txn->Commit(); + assert(s.ok()); + } else { + s = txn->Rollback(); + assert(s.ok()); + } + delete txn; + } + trans.clear(); + txn_db_->GetAllPreparedTransactions(&trans); + assert(trans.size() == 0); +#endif + } + assert(!s.ok() || column_families_.size() == + static_cast(FLAGS_column_families)); + } else { +#ifndef ROCKSDB_LITE + DBWithTTL* db_with_ttl; + s = DBWithTTL::Open(options_, FLAGS_db, &db_with_ttl, FLAGS_ttl); + db_ = db_with_ttl; +#else + fprintf(stderr, "TTL is not supported in RocksDBLite\n"); + exit(1); +#endif + } + if (!s.ok()) { + fprintf(stderr, "open error: %s\n", s.ToString().c_str()); + exit(1); + } + } + + void Reopen() { + for (auto cf : column_families_) { + delete cf; + } + column_families_.clear(); + delete db_; + db_ = nullptr; +#ifndef ROCKSDB_LITE + txn_db_ = nullptr; +#endif + + num_times_reopened_++; + auto now = FLAGS_env->NowMicros(); + fprintf(stdout, "%s Reopening database for the %dth time\n", + FLAGS_env->TimeToString(now/1000000).c_str(), + num_times_reopened_); + Open(); + } + + void PrintStatistics() { + if (dbstats) { + fprintf(stdout, "STATISTICS:\n%s\n", dbstats->ToString().c_str()); + } + } + + std::shared_ptr cache_; + std::shared_ptr compressed_cache_; + std::shared_ptr filter_policy_; + DB* db_; +#ifndef ROCKSDB_LITE + TransactionDB* txn_db_; +#endif + Options options_; + std::vector column_families_; + std::vector column_family_names_; + std::atomic new_column_family_name_; + int num_times_reopened_; + std::unordered_map> options_table_; + std::vector options_index_; +}; + +class NonBatchedOpsStressTest : public StressTest { + public: + NonBatchedOpsStressTest() {} + + virtual ~NonBatchedOpsStressTest() {} + + virtual void VerifyDb(ThreadState* thread) const { + ReadOptions options(FLAGS_verify_checksum, true); + auto shared = thread->shared; + const int64_t max_key = shared->GetMaxKey(); + const int64_t keys_per_thread = max_key / 
shared->GetNumThreads(); + int64_t start = keys_per_thread * thread->tid; + int64_t end = start + keys_per_thread; + if (thread->tid == shared->GetNumThreads() - 1) { + end = max_key; + } + for (size_t cf = 0; cf < column_families_.size(); ++cf) { + if (thread->shared->HasVerificationFailedYet()) { + break; + } + if (!thread->rand.OneIn(2)) { + // Use iterator to verify this range + unique_ptr iter( + db_->NewIterator(options, column_families_[cf])); + iter->Seek(Key(start)); + for (auto i = start; i < end; i++) { + if (thread->shared->HasVerificationFailedYet()) { + break; + } + // TODO(ljin): update "long" to uint64_t + // Reseek when the prefix changes + if (i % (static_cast(1) << 8 * (8 - FLAGS_prefix_size)) == + 0) { + iter->Seek(Key(i)); + } + std::string from_db; + std::string keystr = Key(i); + Slice k = keystr; + Status s = iter->status(); + if (iter->Valid()) { + if (iter->key().compare(k) > 0) { + s = Status::NotFound(Slice()); + } else if (iter->key().compare(k) == 0) { + from_db = iter->value().ToString(); + iter->Next(); + } else if (iter->key().compare(k) < 0) { + VerificationAbort(shared, "An out of range key was found", + static_cast(cf), i); + } + } else { + // The iterator found no value for the key in question, so do not + // move to the next item in the iterator + s = Status::NotFound(Slice()); + } + VerifyValue(static_cast(cf), i, options, shared, from_db, s, + true); + if (from_db.length()) { + PrintKeyValue(static_cast(cf), static_cast(i), + from_db.data(), from_db.length()); + } + } + } else { + // Use Get to verify this range + for (auto i = start; i < end; i++) { + if (thread->shared->HasVerificationFailedYet()) { + break; + } + std::string from_db; + std::string keystr = Key(i); + Slice k = keystr; + Status s = db_->Get(options, column_families_[cf], k, &from_db); + VerifyValue(static_cast(cf), i, options, shared, from_db, s, + true); + if (from_db.length()) { + PrintKeyValue(static_cast(cf), static_cast(i), + from_db.data(), 
from_db.length()); + } + } + } + } + } + + virtual void MaybeClearOneColumnFamily(ThreadState* thread) { + if (FLAGS_clear_column_family_one_in != 0 && FLAGS_column_families > 1) { + if (thread->rand.OneIn(FLAGS_clear_column_family_one_in)) { + // drop column family and then create it again (can't drop default) + int cf = thread->rand.Next() % (FLAGS_column_families - 1) + 1; + std::string new_name = + ToString(new_column_family_name_.fetch_add(1)); + { + MutexLock l(thread->shared->GetMutex()); + fprintf( + stdout, + "[CF %d] Dropping and recreating column family. new name: %s\n", + cf, new_name.c_str()); + } + thread->shared->LockColumnFamily(cf); + Status s = db_->DropColumnFamily(column_families_[cf]); + delete column_families_[cf]; + if (!s.ok()) { + fprintf(stderr, "dropping column family error: %s\n", + s.ToString().c_str()); + std::terminate(); + } + s = db_->CreateColumnFamily(ColumnFamilyOptions(options_), new_name, + &column_families_[cf]); + column_family_names_[cf] = new_name; + thread->shared->ClearColumnFamily(cf); + if (!s.ok()) { + fprintf(stderr, "creating column family error: %s\n", + s.ToString().c_str()); + std::terminate(); + } + thread->shared->UnlockColumnFamily(cf); + } + } + } + + virtual bool ShouldAcquireMutexOnKey() const { return true; } + + virtual Status TestGet(ThreadState* thread, + const ReadOptions& read_opts, + const std::vector& rand_column_families, + const std::vector& rand_keys) { + auto cfh = column_families_[rand_column_families[0]]; + std::string key_str = Key(rand_keys[0]); + Slice key = key_str; + std::string from_db; + Status s = db_->Get(read_opts, cfh, key, &from_db); + if (s.ok()) { + // found case + thread->stats.AddGets(1, 1); + } else if (s.IsNotFound()) { + // not found case + thread->stats.AddGets(1, 0); + } else { + // errors case + thread->stats.AddErrors(1); + } + return s; + } + + virtual Status TestPrefixScan(ThreadState* thread, + const ReadOptions& read_opts, + const std::vector& rand_column_families, + 
const std::vector& rand_keys) { + auto cfh = column_families_[rand_column_families[0]]; + std::string key_str = Key(rand_keys[0]); + Slice key = key_str; + Slice prefix = Slice(key.data(), FLAGS_prefix_size); + Iterator* iter = db_->NewIterator(read_opts, cfh); + int64_t count = 0; + for (iter->Seek(prefix); + iter->Valid() && iter->key().starts_with(prefix); iter->Next()) { + ++count; + } + assert(count <= + (static_cast(1) << ((8 - FLAGS_prefix_size) * 8))); + Status s = iter->status(); + if (iter->status().ok()) { + thread->stats.AddPrefixes(1, static_cast(count)); + } else { + thread->stats.AddErrors(1); + } + delete iter; + return s; + } + + virtual Status TestPut(ThreadState* thread, + WriteOptions& write_opts, const ReadOptions& read_opts, + const std::vector& rand_column_families, + const std::vector& rand_keys, + char (&value) [100], std::unique_ptr& lock) { + auto shared = thread->shared; + int64_t max_key = shared->GetMaxKey(); + int64_t rand_key = rand_keys[0]; + int rand_column_family = rand_column_families[0]; + while (!shared->AllowsOverwrite(rand_column_family, rand_key) && + (FLAGS_use_merge || shared->Exists(rand_column_family, rand_key))) { + lock.reset(); + rand_key = thread->rand.Next() % max_key; + rand_column_family = thread->rand.Next() % FLAGS_column_families; + lock.reset(new MutexLock( + shared->GetMutexForKey(rand_column_family, rand_key))); + } + + std::string key_str = Key(rand_key); + Slice key = key_str; + ColumnFamilyHandle* cfh = column_families_[rand_column_family]; + + if (FLAGS_verify_before_write) { + std::string key_str2 = Key(rand_key); + Slice k = key_str2; + std::string from_db; + Status s = db_->Get(read_opts, cfh, k, &from_db); + if (!VerifyValue(rand_column_family, rand_key, read_opts, shared, + from_db, s, true)) { + return s; + } + } + uint32_t value_base = thread->rand.Next() % shared->UNKNOWN_SENTINEL; + size_t sz = GenerateValue(value_base, value, sizeof(value)); + Slice v(value, sz); + 
shared->Put(rand_column_family, rand_key, value_base, true /* pending */); + Status s; + if (FLAGS_use_merge) { + if (!FLAGS_use_txn) { + s = db_->Merge(write_opts, cfh, key, v); + } else { +#ifndef ROCKSDB_LITE + Transaction* txn; + s = NewTxn(write_opts, &txn); + if (s.ok()) { + s = txn->Merge(cfh, key, v); + if (s.ok()) { + s = CommitTxn(txn); + } + } +#endif + } + } else { + if (!FLAGS_use_txn) { + s = db_->Put(write_opts, cfh, key, v); + } else { +#ifndef ROCKSDB_LITE + Transaction* txn; + s = NewTxn(write_opts, &txn); + if (s.ok()) { + s = txn->Put(cfh, key, v); + if (s.ok()) { + s = CommitTxn(txn); + } + } +#endif + } + } + shared->Put(rand_column_family, rand_key, value_base, false /* pending */); + if (!s.ok()) { + fprintf(stderr, "put or merge error: %s\n", s.ToString().c_str()); + std::terminate(); + } + thread->stats.AddBytesForWrites(1, sz); + PrintKeyValue(rand_column_family, static_cast(rand_key), + value, sz); + return s; + } + + virtual Status TestDelete(ThreadState* thread, WriteOptions& write_opts, + const std::vector& rand_column_families, + const std::vector& rand_keys, + std::unique_ptr& lock) { + int64_t rand_key = rand_keys[0]; + int rand_column_family = rand_column_families[0]; + auto shared = thread->shared; + int max_key = shared->GetMaxKey(); + + // OPERATION delete + // If the chosen key does not allow overwrite and it does not exist, + // choose another key. + while (!shared->AllowsOverwrite(rand_column_family, rand_key) && + !shared->Exists(rand_column_family, rand_key)) { + lock.reset(); + rand_key = thread->rand.Next() % max_key; + rand_column_family = thread->rand.Next() % FLAGS_column_families; + lock.reset(new MutexLock( + shared->GetMutexForKey(rand_column_family, rand_key))); + } + + std::string key_str = Key(rand_key); + Slice key = key_str; + auto cfh = column_families_[rand_column_family]; + + // Use delete if the key may be overwritten and a single deletion + // otherwise. 
+ Status s; + if (shared->AllowsOverwrite(rand_column_family, rand_key)) { + shared->Delete(rand_column_family, rand_key, true /* pending */); + if (!FLAGS_use_txn) { + s = db_->Delete(write_opts, cfh, key); + } else { +#ifndef ROCKSDB_LITE + Transaction* txn; + s = NewTxn(write_opts, &txn); + if (s.ok()) { + s = txn->Delete(cfh, key); + if (s.ok()) { + s = CommitTxn(txn); + } + } +#endif + } + shared->Delete(rand_column_family, rand_key, false /* pending */); + thread->stats.AddDeletes(1); + if (!s.ok()) { + fprintf(stderr, "delete error: %s\n", s.ToString().c_str()); + std::terminate(); + } + } else { + shared->SingleDelete(rand_column_family, rand_key, true /* pending */); + if (!FLAGS_use_txn) { + s = db_->SingleDelete(write_opts, cfh, key); + } else { +#ifndef ROCKSDB_LITE + Transaction* txn; + s = NewTxn(write_opts, &txn); + if (s.ok()) { + s = txn->SingleDelete(cfh, key); + if (s.ok()) { + s = CommitTxn(txn); + } + } +#endif + } + shared->SingleDelete(rand_column_family, rand_key, false /* pending */); + thread->stats.AddSingleDeletes(1); + if (!s.ok()) { + fprintf(stderr, "single delete error: %s\n", + s.ToString().c_str()); + std::terminate(); + } + } + return s; + } + + virtual Status TestDeleteRange(ThreadState* thread, + WriteOptions& write_opts, + const std::vector& rand_column_families, + const std::vector& rand_keys, + std::unique_ptr& lock) { + // OPERATION delete range + std::vector> range_locks; + // delete range does not respect disallowed overwrites. the keys for + // which overwrites are disallowed are randomly distributed so it + // could be expensive to find a range where each key allows + // overwrites. 
+ int64_t rand_key = rand_keys[0]; + int rand_column_family = rand_column_families[0]; + auto shared = thread->shared; + int64_t max_key = shared->GetMaxKey(); + if (rand_key > max_key - FLAGS_range_deletion_width) { + lock.reset(); + rand_key = thread->rand.Next() % + (max_key - FLAGS_range_deletion_width + 1); + range_locks.emplace_back(new MutexLock( + shared->GetMutexForKey(rand_column_family, rand_key))); + } else { + range_locks.emplace_back(std::move(lock)); + } + for (int j = 1; j < FLAGS_range_deletion_width; ++j) { + if (((rand_key + j) & ((1 << FLAGS_log2_keys_per_lock) - 1)) == 0) { + range_locks.emplace_back(new MutexLock( + shared->GetMutexForKey(rand_column_family, rand_key + j))); + } + } + shared->DeleteRange(rand_column_family, rand_key, + rand_key + FLAGS_range_deletion_width, + true /* pending */); + + std::string keystr = Key(rand_key); + Slice key = keystr; + auto cfh = column_families_[rand_column_family]; + std::string end_keystr = Key(rand_key + FLAGS_range_deletion_width); + Slice end_key = end_keystr; + Status s = db_->DeleteRange(write_opts, cfh, key, end_key); + if (!s.ok()) { + fprintf(stderr, "delete range error: %s\n", + s.ToString().c_str()); + std::terminate(); + } + int covered = shared->DeleteRange( + rand_column_family, rand_key, + rand_key + FLAGS_range_deletion_width, false /* pending */); + thread->stats.AddRangeDeletions(1); + thread->stats.AddCoveredByRangeDeletions(covered); + return s; + } + + bool VerifyValue(int cf, int64_t key, const ReadOptions& /*opts*/, + SharedState* shared, const std::string& value_from_db, + Status s, bool strict = false) const { + if (shared->HasVerificationFailedYet()) { + return false; + } + // compare value_from_db with the value in the shared state + char value[kValueMaxLen]; + uint32_t value_base = shared->Get(cf, key); + if (value_base == SharedState::UNKNOWN_SENTINEL) { + return true; + } + if (value_base == SharedState::DELETION_SENTINEL && !strict) { + return true; + } + + if (s.ok()) { 
+ if (value_base == SharedState::DELETION_SENTINEL) { + VerificationAbort(shared, "Unexpected value found", cf, key); + return false; + } + size_t sz = GenerateValue(value_base, value, sizeof(value)); + if (value_from_db.length() != sz) { + VerificationAbort(shared, "Length of value read is not equal", cf, key); + return false; + } + if (memcmp(value_from_db.data(), value, sz) != 0) { + VerificationAbort(shared, "Contents of value read don't match", cf, + key); + return false; + } + } else { + if (value_base != SharedState::DELETION_SENTINEL) { + VerificationAbort(shared, "Value not found: " + s.ToString(), cf, key); + return false; + } + } + return true; + } +}; + +class BatchedOpsStressTest : public StressTest { + public: + BatchedOpsStressTest() {} + virtual ~BatchedOpsStressTest() {} + + // Given a key K and value V, this puts ("0"+K, "0"+V), ("1"+K, "1"+V), ... + // ("9"+K, "9"+V) in DB atomically i.e in a single batch. + // Also refer BatchedOpsStressTest::TestGet + virtual Status TestPut(ThreadState* thread, + WriteOptions& write_opts, const ReadOptions& /* read_opts */, + const std::vector& rand_column_families, const std::vector& rand_keys, + char (&value)[100], std::unique_ptr& /* lock */) { + uint32_t value_base = + thread->rand.Next() % thread->shared->UNKNOWN_SENTINEL; + size_t sz = GenerateValue(value_base, value, sizeof(value)); + Slice v(value, sz); + std::string keys[10] = {"9", "8", "7", "6", "5", + "4", "3", "2", "1", "0"}; + std::string values[10] = {"9", "8", "7", "6", "5", + "4", "3", "2", "1", "0"}; + Slice value_slices[10]; + WriteBatch batch; + Status s; + auto cfh = column_families_[rand_column_families[0]]; + std::string key_str = Key(rand_keys[0]); + for (int i = 0; i < 10; i++) { + keys[i] += key_str; + values[i] += v.ToString(); + value_slices[i] = values[i]; + if (FLAGS_use_merge) { + batch.Merge(cfh, keys[i], value_slices[i]); + } else { + batch.Put(cfh, keys[i], value_slices[i]); + } + } + + s = db_->Write(write_opts, &batch); + if 
(!s.ok()) { + fprintf(stderr, "multiput error: %s\n", s.ToString().c_str()); + thread->stats.AddErrors(1); + } else { + // we did 10 writes each of size sz + 1 + thread->stats.AddBytesForWrites(10, (sz + 1) * 10); + } + + return s; + } + + // Given a key K, this deletes ("0"+K), ("1"+K),... ("9"+K) + // in DB atomically i.e in a single batch. Also refer MultiGet. + virtual Status TestDelete(ThreadState* thread, WriteOptions& writeoptions, + const std::vector& rand_column_families, + const std::vector& rand_keys, + std::unique_ptr& /* lock */) { + std::string keys[10] = {"9", "7", "5", "3", "1", + "8", "6", "4", "2", "0"}; + + WriteBatch batch; + Status s; + auto cfh = column_families_[rand_column_families[0]]; + std::string key_str = Key(rand_keys[0]); + for (int i = 0; i < 10; i++) { + keys[i] += key_str; + batch.Delete(cfh, keys[i]); + } + + s = db_->Write(writeoptions, &batch); + if (!s.ok()) { + fprintf(stderr, "multidelete error: %s\n", s.ToString().c_str()); + thread->stats.AddErrors(1); } else { - options_.merge_operator = MergeOperators::CreatePutOperator(); + thread->stats.AddDeletes(10); } - fprintf(stdout, "DB path: [%s]\n", FLAGS_db.c_str()); + return s; + } + + virtual Status TestDeleteRange(ThreadState* /* thread */, + WriteOptions& /* write_opts */, + const std::vector& /* rand_column_families */, + const std::vector& /* rand_keys */, + std::unique_ptr& /* lock */) { assert(false); } + // Given a key K, this gets values for "0"+K, "1"+K,..."9"+K + // in the same snapshot, and verifies that all the values are of the form + // "0"+V, "1"+V,..."9"+V. + // ASSUMES that BatchedOpsStressTest::TestPut was used to put (K, V) into + // the DB. 
+ virtual Status TestGet(ThreadState* thread, const ReadOptions& readoptions, + const std::vector& rand_column_families, + const std::vector& rand_keys) { + std::string keys[10] = {"0", "1", "2", "3", "4", "5", "6", "7", "8", "9"}; + Slice key_slices[10]; + std::string values[10]; + ReadOptions readoptionscopy = readoptions; + readoptionscopy.snapshot = db_->GetSnapshot(); + std::string key_str = Key(rand_keys[0]); + Slice key = key_str; + auto cfh = column_families_[rand_column_families[0]]; + std::string from_db; Status s; - if (FLAGS_ttl == -1) { - std::vector existing_column_families; - s = DB::ListColumnFamilies(DBOptions(options_), FLAGS_db, - &existing_column_families); // ignore errors - if (!s.ok()) { - // DB doesn't exist - assert(existing_column_families.empty()); - assert(column_family_names_.empty()); - column_family_names_.push_back(kDefaultColumnFamilyName); - } else if (column_family_names_.empty()) { - // this is the first call to the function Open() - column_family_names_ = existing_column_families; + for (int i = 0; i < 10; i++) { + keys[i] += key.ToString(); + key_slices[i] = keys[i]; + s = db_->Get(readoptionscopy, cfh, key_slices[i], &from_db); + if (!s.ok() && !s.IsNotFound()) { + fprintf(stderr, "get error: %s\n", s.ToString().c_str()); + values[i] = ""; + thread->stats.AddErrors(1); + // we continue after error rather than exiting so that we can + // find more errors if any + } else if (s.IsNotFound()) { + values[i] = ""; + thread->stats.AddGets(1, 0); } else { - // this is a reopen. 
just assert that existing column_family_names are - // equivalent to what we remember - auto sorted_cfn = column_family_names_; - std::sort(sorted_cfn.begin(), sorted_cfn.end()); - std::sort(existing_column_families.begin(), - existing_column_families.end()); - if (sorted_cfn != existing_column_families) { - fprintf(stderr, - "Expected column families differ from the existing:\n"); - printf("Expected: {"); - for (auto cf : sorted_cfn) { - printf("%s ", cf.c_str()); - } - printf("}\n"); - printf("Existing: {"); - for (auto cf : existing_column_families) { - printf("%s ", cf.c_str()); - } - printf("}\n"); + values[i] = from_db; + + char expected_prefix = (keys[i])[0]; + char actual_prefix = (values[i])[0]; + if (actual_prefix != expected_prefix) { + fprintf(stderr, "error expected prefix = %c actual = %c\n", + expected_prefix, actual_prefix); } - assert(sorted_cfn == existing_column_families); + (values[i])[0] = ' '; // blank out the differing character + thread->stats.AddGets(1, 1); } - std::vector cf_descriptors; - for (auto name : column_family_names_) { - if (name != kDefaultColumnFamilyName) { - new_column_family_name_ = - std::max(new_column_family_name_.load(), std::stoi(name) + 1); - } - cf_descriptors.emplace_back(name, ColumnFamilyOptions(options_)); + } + db_->ReleaseSnapshot(readoptionscopy.snapshot); + + // Now that we retrieved all values, check that they all match + for (int i = 1; i < 10; i++) { + if (values[i] != values[0]) { + fprintf(stderr, "error : inconsistent values for key %s: %s, %s\n", + key.ToString(true).c_str(), StringToHex(values[0]).c_str(), + StringToHex(values[i]).c_str()); + // we continue after error rather than exiting so that we can + // find more errors if any } - while (cf_descriptors.size() < (size_t)FLAGS_column_families) { - std::string name = ToString(new_column_family_name_.load()); - new_column_family_name_++; - cf_descriptors.emplace_back(name, ColumnFamilyOptions(options_)); - column_family_names_.push_back(name); + } + 
+ return s; + } + + // Given a key, this does prefix scans for "0"+P, "1"+P,..."9"+P + // in the same snapshot where P is the first FLAGS_prefix_size - 1 bytes + // of the key. Each of these 10 scans returns a series of values; + // each series should be the same length, and it is verified for each + // index i that all the i'th values are of the form "0"+V, "1"+V,..."9"+V. + // ASSUMES that MultiPut was used to put (K, V) + virtual Status TestPrefixScan(ThreadState* thread, const ReadOptions& readoptions, + const std::vector& rand_column_families, + const std::vector& rand_keys) { + std::string key_str = Key(rand_keys[0]); + Slice key = key_str; + auto cfh = column_families_[rand_column_families[0]]; + std::string prefixes[10] = {"0", "1", "2", "3", "4", + "5", "6", "7", "8", "9"}; + Slice prefix_slices[10]; + ReadOptions readoptionscopy[10]; + const Snapshot* snapshot = db_->GetSnapshot(); + Iterator* iters[10]; + Status s = Status::OK(); + for (int i = 0; i < 10; i++) { + prefixes[i] += key.ToString(); + prefixes[i].resize(FLAGS_prefix_size); + prefix_slices[i] = Slice(prefixes[i]); + readoptionscopy[i] = readoptions; + readoptionscopy[i].snapshot = snapshot; + iters[i] = db_->NewIterator(readoptionscopy[i], cfh); + iters[i]->Seek(prefix_slices[i]); + } + + int count = 0; + while (iters[0]->Valid() && iters[0]->key().starts_with(prefix_slices[0])) { + count++; + std::string values[10]; + // get list of all values for this iteration + for (int i = 0; i < 10; i++) { + // no iterator should finish before the first one + assert(iters[i]->Valid() && + iters[i]->key().starts_with(prefix_slices[i])); + values[i] = iters[i]->value().ToString(); + + char expected_first = (prefixes[i])[0]; + char actual_first = (values[i])[0]; + + if (actual_first != expected_first) { + fprintf(stderr, "error expected first = %c actual = %c\n", + expected_first, actual_first); + } + (values[i])[0] = ' '; // blank out the differing character } - options_.listeners.clear(); - 
options_.listeners.emplace_back( - new DbStressListener(FLAGS_db, options_.db_paths, cf_descriptors)); - options_.create_missing_column_families = true; - if (!FLAGS_use_txn) { - s = DB::Open(DBOptions(options_), FLAGS_db, cf_descriptors, - &column_families_, &db_); - } else { -#ifndef ROCKSDB_LITE - TransactionDBOptions txn_db_options; - // For the moment it is sufficient to test WRITE_PREPARED policy - txn_db_options.write_policy = TxnDBWritePolicy::WRITE_PREPARED; - s = TransactionDB::Open(options_, txn_db_options, FLAGS_db, - cf_descriptors, &column_families_, &txn_db_); - db_ = txn_db_; - // after a crash, rollback to commit recovered transactions - std::vector trans; - txn_db_->GetAllPreparedTransactions(&trans); - Random rand(static_cast(FLAGS_seed)); - for (auto txn : trans) { - if (rand.OneIn(2)) { - s = txn->Commit(); - assert(s.ok()); - } else { - s = txn->Rollback(); - assert(s.ok()); - } - delete txn; + // make sure all values are equivalent + for (int i = 0; i < 10; i++) { + if (values[i] != values[0]) { + fprintf(stderr, "error : %d, inconsistent values for prefix %s: %s, %s\n", + i, prefixes[i].c_str(), StringToHex(values[0]).c_str(), + StringToHex(values[i]).c_str()); + // we continue after error rather than exiting so that we can + // find more errors if any } - trans.clear(); - txn_db_->GetAllPreparedTransactions(&trans); - assert(trans.size() == 0); -#endif + iters[i]->Next(); } - assert(!s.ok() || column_families_.size() == - static_cast(FLAGS_column_families)); - } else { -#ifndef ROCKSDB_LITE - DBWithTTL* db_with_ttl; - s = DBWithTTL::Open(options_, FLAGS_db, &db_with_ttl, FLAGS_ttl); - db_ = db_with_ttl; -#else - fprintf(stderr, "TTL is not supported in RocksDBLite\n"); - exit(1); -#endif - } - if (!s.ok()) { - fprintf(stderr, "open error: %s\n", s.ToString().c_str()); - exit(1); } - } - void Reopen() { - for (auto cf : column_families_) { - delete cf; + // cleanup iterators and snapshot + for (int i = 0; i < 10; i++) { + // if the first 
iterator finished, they should have all finished + assert(!iters[i]->Valid() || + !iters[i]->key().starts_with(prefix_slices[i])); + assert(iters[i]->status().ok()); + delete iters[i]; } - column_families_.clear(); - delete db_; - db_ = nullptr; -#ifndef ROCKSDB_LITE - txn_db_ = nullptr; -#endif - - num_times_reopened_++; - auto now = FLAGS_env->NowMicros(); - fprintf(stdout, "%s Reopening database for the %dth time\n", - FLAGS_env->TimeToString(now/1000000).c_str(), - num_times_reopened_); - Open(); - } + db_->ReleaseSnapshot(snapshot); - void PrintStatistics() { - if (dbstats) { - fprintf(stdout, "STATISTICS:\n%s\n", dbstats->ToString().c_str()); + if (s.ok()) { + thread->stats.AddPrefixes(1, count); + } else { + thread->stats.AddErrors(1); } + + return s; } - private: - std::shared_ptr cache_; - std::shared_ptr compressed_cache_; - std::shared_ptr filter_policy_; - DB* db_; -#ifndef ROCKSDB_LITE - TransactionDB* txn_db_; -#endif - Options options_; - std::vector column_families_; - std::vector column_family_names_; - std::atomic new_column_family_name_; - int num_times_reopened_; - std::unordered_map> options_table_; - std::vector options_index_; + virtual void VerifyDb(ThreadState* /* thread */) const {} }; } // namespace rocksdb @@ -2886,8 +3006,8 @@ int main(int argc, char** argv) { exit(1); } if (FLAGS_disable_wal == 1 && FLAGS_reopen > 0) { - fprintf(stderr, "Error: Db cannot reopen safely with disable_wal set!\n"); - exit(1); + fprintf(stderr, "Error: Db cannot reopen safely with disable_wal set!\n"); + exit(1); } if ((unsigned)FLAGS_reopen >= FLAGS_ops_per_thread) { fprintf(stderr, @@ -2931,8 +3051,13 @@ int main(int argc, char** argv) { rocksdb_kill_odds = FLAGS_kill_random_test; rocksdb_kill_prefix_blacklist = SplitString(FLAGS_kill_prefix_blacklist); - rocksdb::StressTest stress; - if (stress.Run()) { + std::unique_ptr stress; + if (FLAGS_test_batches_snapshots) { + stress.reset(new rocksdb::BatchedOpsStressTest()); + } else { + stress.reset(new 
rocksdb::NonBatchedOpsStressTest()); + } + if (stress->Run()) { return 0; } else { return 1;